tokio_quiche/http3/driver/
streams.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::future::Future;
28use std::pin::Pin;
29use std::sync::Arc;
30use std::task::Context;
31use std::task::Poll;
32
33use tokio::sync::mpsc;
34use tokio_util::sync::PollSender;
35
36use super::InboundFrame;
37use super::InboundFrameSender;
38use super::InboundFrameStream;
39use super::OutboundFrame;
40use super::OutboundFrameSender;
41use super::OutboundFrameStream;
42use crate::http3::H3AuditStats;
43
44pub(crate) struct StreamCtx {
45    /// Sends [`InboundFrame`]s to a local task, for example an `H3Body`.
46    pub(crate) send: Option<InboundFrameSender>,
47    /// Receives [`OutboundFrame`]s from a local task.
48    pub(crate) recv: Option<OutboundFrameStream>,
49    /// Stores the next [`OutboundFrame`] to write to the connection.
50    /// This is used as temporary storage when waiting for `recv`.
51    pub(crate) queued_frame: Option<OutboundFrame>,
52    pub(crate) audit_stats: Arc<H3AuditStats>,
53    /// Indicates the stream sent initial headers.
54    pub(crate) initial_headers_sent: bool,
55    /// Indicates the stream received fin. No more data will be received.
56    pub(crate) fin_recv: bool,
57    /// Indicates the stream sent fin. No more data will be sent.
58    pub(crate) fin_sent: bool,
59    /// The flow ID for proxying datagrams over this stream. If `None`,
60    /// the stream has no associated DATAGRAM flow.
61    pub(crate) associated_dgram_flow_id: Option<u64>,
62}
63
64impl StreamCtx {
65    /// Creates a new [StreamCtx]. This method returns the [StreamCtx] itself
66    /// as well as the sender/receiver that it communicates with.
67    pub(crate) fn new(
68        stream_id: u64, capacity: usize,
69    ) -> (Self, OutboundFrameSender, InboundFrameStream) {
70        let (forward_sender, forward_receiver) = mpsc::channel(capacity);
71        let (backward_sender, backward_receiver) = mpsc::channel(capacity);
72
73        let ctx = StreamCtx {
74            send: Some(PollSender::new(forward_sender)),
75            recv: Some(backward_receiver),
76            queued_frame: None,
77            audit_stats: Arc::new(H3AuditStats::new(stream_id)),
78
79            initial_headers_sent: false,
80
81            fin_recv: false,
82            fin_sent: false,
83
84            associated_dgram_flow_id: None,
85        };
86
87        (ctx, PollSender::new(backward_sender), forward_receiver)
88    }
89
90    /// Creates a [Future] that resolves when `send` has capacity again.
91    pub(crate) fn wait_for_send(&mut self, stream_id: u64) -> WaitForStream {
92        WaitForStream::Upstream(WaitForUpstreamCapacity {
93            stream_id,
94            chan: self.send.take(),
95        })
96    }
97
98    /// Creates a [Future] that resolves when `recv` has data again.
99    pub(crate) fn wait_for_recv(&mut self, stream_id: u64) -> WaitForStream {
100        WaitForStream::Downstream(WaitForDownstreamData {
101            stream_id,
102            chan: self.recv.take(),
103        })
104    }
105}
106
107pub(crate) struct FlowCtx {
108    /// Sends inbound datagrams to a local task.
109    send: mpsc::Sender<InboundFrame>,
110    // No `recv`: all outbound datagrams are sent on a shared channel in H3Driver
111}
112
113impl FlowCtx {
114    /// Creates a new [FlowCtx]. This method returns the context itself
115    /// as well as the datagram receiver for this flow.
116    pub(crate) fn new(capacity: usize) -> (Self, InboundFrameStream) {
117        let (forward_sender, forward_receiver) = mpsc::channel(capacity);
118        let ctx = FlowCtx {
119            send: forward_sender,
120        };
121        (ctx, forward_receiver)
122    }
123
124    /// Tries to send a datagram to the flow receiver, but drops it if the
125    /// channel is full.
126    pub(crate) fn send_best_effort(&self, datagram: InboundFrame) {
127        let _ = self.send.try_send(datagram);
128    }
129}
130
131pub(crate) enum WaitForStream {
132    Downstream(WaitForDownstreamData),
133    Upstream(WaitForUpstreamCapacity),
134}
135
136pub(crate) enum StreamReady {
137    Downstream(ReceivedDownstreamData),
138    Upstream(HaveUpstreamCapacity),
139}
140
141impl Future for WaitForStream {
142    type Output = StreamReady;
143
144    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
145        match self.get_mut() {
146            WaitForStream::Downstream(d) =>
147                Pin::new(d).poll(cx).map(StreamReady::Downstream),
148            WaitForStream::Upstream(u) =>
149                Pin::new(u).poll(cx).map(StreamReady::Upstream),
150        }
151    }
152}
153
154pub(crate) struct WaitForDownstreamData {
155    pub(crate) stream_id: u64,
156    pub(crate) chan: Option<OutboundFrameStream>,
157}
158
159pub(crate) struct ReceivedDownstreamData {
160    pub(crate) stream_id: u64,
161    pub(crate) chan: OutboundFrameStream,
162    pub(crate) data: Option<OutboundFrame>,
163}
164
165impl Future for WaitForDownstreamData {
166    type Output = ReceivedDownstreamData;
167
168    fn poll(
169        mut self: Pin<&mut Self>, cx: &mut Context<'_>,
170    ) -> Poll<Self::Output> {
171        // Unwraps below are Ok because chan will only be None after first
172        // Poll::Ready, which is fine to panic for non fused future.
173        self.chan.as_mut().unwrap().poll_recv(cx).map(|data| {
174            ReceivedDownstreamData {
175                stream_id: self.stream_id,
176                chan: self.chan.take().unwrap(),
177                data,
178            }
179        })
180    }
181}
182
183pub(crate) struct WaitForUpstreamCapacity {
184    pub(crate) stream_id: u64,
185    pub(crate) chan: Option<InboundFrameSender>,
186}
187
188pub(crate) struct HaveUpstreamCapacity {
189    pub(crate) stream_id: u64,
190    pub(crate) chan: InboundFrameSender,
191}
192
193impl Future for WaitForUpstreamCapacity {
194    type Output = HaveUpstreamCapacity;
195
196    fn poll(
197        mut self: Pin<&mut Self>, cx: &mut Context<'_>,
198    ) -> Poll<Self::Output> {
199        // Unwraps below are Ok because chan will only be None after first
200        // Poll::Ready, which is fine to panic for non fused future.
201        match self.chan.as_mut().unwrap().poll_reserve(cx) {
202            Poll::Ready(_) => Poll::Ready(HaveUpstreamCapacity {
203                stream_id: self.stream_id,
204                chan: self.chan.take().unwrap(),
205            }),
206            Poll::Pending => Poll::Pending,
207        }
208    }
209}