Skip to main content

tokio_quiche/http3/driver/
streams.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::future::Future;
28use std::pin::Pin;
29use std::sync::Arc;
30use std::task::Context;
31use std::task::Poll;
32use std::time::Instant;
33
34use tokio::sync::mpsc;
35use tokio_util::sync::PollSender;
36
37use super::InboundFrame;
38use super::InboundFrameSender;
39use super::InboundFrameStream;
40use super::OutboundFrame;
41use super::OutboundFrameSender;
42use super::OutboundFrameStream;
43use crate::http3::H3AuditStats;
44
45pub(crate) struct StreamCtx {
46    /// Sends [`InboundFrame`]s to a local task, for example an `H3Body`.
47    pub(crate) send: Option<InboundFrameSender>,
48    /// Receives [`OutboundFrame`]s from a local task.
49    pub(crate) recv: Option<OutboundFrameStream>,
50    /// Stores the next [`OutboundFrame`] to write to the connection.
51    /// This is used as temporary storage when waiting for `recv`.
52    pub(crate) queued_frame: Option<OutboundFrame>,
53    pub(crate) audit_stats: Arc<H3AuditStats>,
54    /// Indicates the stream sent initial headers.
55    pub(crate) initial_headers_sent: bool,
56    /// First time that a HEADERS frame was not fully flushed.
57    pub(crate) first_full_headers_flush_fail_time: Option<Instant>,
58    /// Indicates the stream received fin or reset. No more data will be
59    /// received.
60    pub(crate) fin_or_reset_recv: bool,
61    /// Indicates the stream sent fin or reset. No more data will be sent.
62    pub(crate) fin_or_reset_sent: bool,
63    /// The flow ID for proxying datagrams over this stream. If `None`,
64    /// the stream has no associated DATAGRAM flow.
65    pub(crate) associated_dgram_flow_id: Option<u64>,
66}
67
68impl StreamCtx {
69    /// Creates a new [StreamCtx]. This method returns the [StreamCtx] itself
70    /// as well as the sender/receiver that it communicates with.
71    pub(crate) fn new(
72        stream_id: u64, capacity: usize,
73    ) -> (Self, OutboundFrameSender, InboundFrameStream) {
74        let (forward_sender, forward_receiver) = mpsc::channel(capacity);
75        let (backward_sender, backward_receiver) = mpsc::channel(capacity);
76
77        let ctx = StreamCtx {
78            send: Some(PollSender::new(forward_sender)),
79            recv: Some(backward_receiver),
80            queued_frame: None,
81            audit_stats: Arc::new(H3AuditStats::new(stream_id)),
82
83            initial_headers_sent: false,
84            first_full_headers_flush_fail_time: None,
85
86            fin_or_reset_recv: false,
87            fin_or_reset_sent: false,
88
89            associated_dgram_flow_id: None,
90        };
91
92        (ctx, PollSender::new(backward_sender), forward_receiver)
93    }
94
95    /// Creates a [Future] that resolves when `send` has capacity again.
96    pub(crate) fn wait_for_send(&mut self, stream_id: u64) -> WaitForStream {
97        WaitForStream::Upstream(WaitForUpstreamCapacity {
98            stream_id,
99            chan: self.send.take(),
100        })
101    }
102
103    /// Creates a [Future] that resolves when `recv` has data again.
104    pub(crate) fn wait_for_recv(&mut self, stream_id: u64) -> WaitForStream {
105        WaitForStream::Downstream(WaitForDownstreamData {
106            stream_id,
107            chan: self.recv.take(),
108        })
109    }
110
111    /// Handle the case when we received a STOP_SENDING frame. Note, that
112    /// we'll only learn about a STOP_SENDING frame from the write path.
113    /// Also note, that quiche will automatically send a RESET_STREAM frame
114    /// in response when it receives a STOP_SENDING (as recommended by the
115    /// RFC)
116    pub(crate) fn handle_recvd_stop_sending(&mut self, wire_err_code: u64) {
117        // Update stats even if the stream is already closing.
118        self.audit_stats
119            .set_recvd_stop_sending_error_code(wire_err_code as i64);
120
121        if self.fin_or_reset_sent {
122            // It is valid to receive STOP_SENDING after we already sent fin
123            // (or a reset): the write side is already finished and there is
124            // nothing left to do.
125            return;
126        }
127
128        // We received a STOP_SENDING frame. This indicates that the
129        // write direction has closed. We still need to continue
130        // reading from the stream until we receive a RESET_FRAME or
131        // `fin`.
132        self.fin_or_reset_sent = true;
133        // Drop any pending data and close the write side.
134        // We can't accept additional frames
135        self.queued_frame = None;
136        self.recv = None;
137    }
138
139    pub(crate) fn handle_recvd_reset(&mut self, wire_err_code: u64) {
140        debug_assert!(!self.fin_or_reset_recv);
141        // We received a RESET_STREAM frame, which closes the read direction
142        // but not the write direction. If the peer wants to shut down write,
143        // it must also send STOP_SENDING
144        self.audit_stats
145            .set_recvd_reset_stream_error_code(wire_err_code as i64);
146        self.fin_or_reset_recv = true;
147        self.send = None;
148    }
149
150    pub(crate) fn handle_sent_reset(&mut self, wire_err_code: u64) {
151        debug_assert!(!self.fin_or_reset_sent);
152        self.audit_stats
153            .set_sent_reset_stream_error_code(wire_err_code as i64);
154        self.fin_or_reset_sent = true;
155    }
156
157    pub(crate) fn handle_sent_stop_sending(&mut self, wire_err_code: u64) {
158        debug_assert!(!self.fin_or_reset_recv);
159        self.audit_stats
160            .set_sent_stop_sending_error_code(wire_err_code as i64);
161        // It is ok for us to set the `fin_reset_recv` flag here.  While the peer
162        // must still send a fin or reset_stream with its final size, we don't
163        // need to read it from the stream. Quiche will take care of that.
164        self.fin_or_reset_recv = true;
165        self.send = None;
166    }
167
168    pub(crate) fn both_directions_done(&self) -> bool {
169        self.fin_or_reset_recv && self.fin_or_reset_sent
170    }
171}
172
173pub(crate) struct FlowCtx {
174    /// Sends inbound datagrams to a local task.
175    send: mpsc::Sender<InboundFrame>,
176    // No `recv`: all outbound datagrams are sent on a shared channel in H3Driver
177}
178
179impl FlowCtx {
180    /// Creates a new [FlowCtx]. This method returns the context itself
181    /// as well as the datagram receiver for this flow.
182    pub(crate) fn new(capacity: usize) -> (Self, InboundFrameStream) {
183        let (forward_sender, forward_receiver) = mpsc::channel(capacity);
184        let ctx = FlowCtx {
185            send: forward_sender,
186        };
187        (ctx, forward_receiver)
188    }
189
190    /// Tries to send a datagram to the flow receiver, but drops it if the
191    /// channel is full.
192    pub(crate) fn send_best_effort(&self, datagram: InboundFrame) {
193        let _ = self.send.try_send(datagram);
194    }
195}
196
197pub(crate) enum WaitForStream {
198    Downstream(WaitForDownstreamData),
199    Upstream(WaitForUpstreamCapacity),
200}
201
202pub(crate) enum StreamReady {
203    Downstream(ReceivedDownstreamData),
204    Upstream(HaveUpstreamCapacity),
205}
206
207impl Future for WaitForStream {
208    type Output = StreamReady;
209
210    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
211        match self.get_mut() {
212            WaitForStream::Downstream(d) =>
213                Pin::new(d).poll(cx).map(StreamReady::Downstream),
214            WaitForStream::Upstream(u) =>
215                Pin::new(u).poll(cx).map(StreamReady::Upstream),
216        }
217    }
218}
219
220pub(crate) struct WaitForDownstreamData {
221    pub(crate) stream_id: u64,
222    pub(crate) chan: Option<OutboundFrameStream>,
223}
224
225pub(crate) struct ReceivedDownstreamData {
226    pub(crate) stream_id: u64,
227    pub(crate) chan: OutboundFrameStream,
228    pub(crate) data: Option<OutboundFrame>,
229}
230
231impl Future for WaitForDownstreamData {
232    type Output = ReceivedDownstreamData;
233
234    fn poll(
235        mut self: Pin<&mut Self>, cx: &mut Context<'_>,
236    ) -> Poll<Self::Output> {
237        // Unwraps below are Ok because chan will only be None after first
238        // Poll::Ready, which is fine to panic for non fused future.
239        self.chan.as_mut().unwrap().poll_recv(cx).map(|data| {
240            ReceivedDownstreamData {
241                stream_id: self.stream_id,
242                chan: self.chan.take().unwrap(),
243                data,
244            }
245        })
246    }
247}
248
249pub(crate) struct WaitForUpstreamCapacity {
250    pub(crate) stream_id: u64,
251    pub(crate) chan: Option<InboundFrameSender>,
252}
253
254pub(crate) struct HaveUpstreamCapacity {
255    pub(crate) stream_id: u64,
256    pub(crate) chan: InboundFrameSender,
257}
258
259impl Future for WaitForUpstreamCapacity {
260    type Output = HaveUpstreamCapacity;
261
262    fn poll(
263        mut self: Pin<&mut Self>, cx: &mut Context<'_>,
264    ) -> Poll<Self::Output> {
265        // Unwraps below are Ok because chan will only be None after first
266        // Poll::Ready, which is fine to panic for non fused future.
267        match self.chan.as_mut().unwrap().poll_reserve(cx) {
268            Poll::Ready(_) => Poll::Ready(HaveUpstreamCapacity {
269                stream_id: self.stream_id,
270                chan: self.chan.take().unwrap(),
271            }),
272            Poll::Pending => Poll::Pending,
273        }
274    }
275}