tokio_quiche/http3/driver/
streams.rs

// Copyright (C) 2025, Cloudflare, Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
//     * Redistributions of source code must retain the above copyright notice,
//       this list of conditions and the following disclaimer.
//
//     * Redistributions in binary form must reproduce the above copyright
//       notice, this list of conditions and the following disclaimer in the
//       documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::future::Future;
28use std::pin::Pin;
29use std::sync::Arc;
30use std::task::Context;
31use std::task::Poll;
32use std::time::Instant;
33
34use tokio::sync::mpsc;
35use tokio_util::sync::PollSender;
36
37use super::InboundFrame;
38use super::InboundFrameSender;
39use super::InboundFrameStream;
40use super::OutboundFrame;
41use super::OutboundFrameSender;
42use super::OutboundFrameStream;
43use crate::http3::H3AuditStats;
44
/// Per-stream bookkeeping used by the HTTP/3 driver: the channel endpoints
/// for exchanging frames with a local task, plus fin/reset state flags.
pub(crate) struct StreamCtx {
    /// Sends [`InboundFrame`]s to a local task, for example an `H3Body`.
    /// `None` while a [`WaitForUpstreamCapacity`] future owns the sender,
    /// or once the read direction is closed.
    pub(crate) send: Option<InboundFrameSender>,
    /// Receives [`OutboundFrame`]s from a local task.
    /// `None` while a [`WaitForDownstreamData`] future owns the receiver,
    /// or once the write direction is closed.
    pub(crate) recv: Option<OutboundFrameStream>,
    /// Stores the next [`OutboundFrame`] to write to the connection.
    /// This is used as temporary storage when waiting for `recv`.
    pub(crate) queued_frame: Option<OutboundFrame>,
    /// Audit statistics for this stream (created with the stream's ID),
    /// shared via [`Arc`].
    pub(crate) audit_stats: Arc<H3AuditStats>,
    /// Indicates the stream sent initial headers.
    pub(crate) initial_headers_sent: bool,
    /// First time that a HEADERS frame was not fully flushed.
    pub(crate) first_full_headers_flush_fail_time: Option<Instant>,
    /// Indicates the stream received fin or reset. No more data will be
    /// received.
    pub(crate) fin_or_reset_recv: bool,
    /// Indicates the stream sent fin or reset. No more data will be sent.
    pub(crate) fin_or_reset_sent: bool,
    /// The flow ID for proxying datagrams over this stream. If `None`,
    /// the stream has no associated DATAGRAM flow.
    pub(crate) associated_dgram_flow_id: Option<u64>,
}
67
68impl StreamCtx {
69    /// Creates a new [StreamCtx]. This method returns the [StreamCtx] itself
70    /// as well as the sender/receiver that it communicates with.
71    pub(crate) fn new(
72        stream_id: u64, capacity: usize,
73    ) -> (Self, OutboundFrameSender, InboundFrameStream) {
74        let (forward_sender, forward_receiver) = mpsc::channel(capacity);
75        let (backward_sender, backward_receiver) = mpsc::channel(capacity);
76
77        let ctx = StreamCtx {
78            send: Some(PollSender::new(forward_sender)),
79            recv: Some(backward_receiver),
80            queued_frame: None,
81            audit_stats: Arc::new(H3AuditStats::new(stream_id)),
82
83            initial_headers_sent: false,
84            first_full_headers_flush_fail_time: None,
85
86            fin_or_reset_recv: false,
87            fin_or_reset_sent: false,
88
89            associated_dgram_flow_id: None,
90        };
91
92        (ctx, PollSender::new(backward_sender), forward_receiver)
93    }
94
95    /// Creates a [Future] that resolves when `send` has capacity again.
96    pub(crate) fn wait_for_send(&mut self, stream_id: u64) -> WaitForStream {
97        WaitForStream::Upstream(WaitForUpstreamCapacity {
98            stream_id,
99            chan: self.send.take(),
100        })
101    }
102
103    /// Creates a [Future] that resolves when `recv` has data again.
104    pub(crate) fn wait_for_recv(&mut self, stream_id: u64) -> WaitForStream {
105        WaitForStream::Downstream(WaitForDownstreamData {
106            stream_id,
107            chan: self.recv.take(),
108        })
109    }
110
111    /// Handle the case when we received a STOP_SENDING frame. Note, that
112    /// we'll only learn about a STOP_SENDING frame from the write path.
113    /// Also note, that quiche will automatically send a RESET_STREAM frame
114    /// in response when it receives a STOP_SENDING (as recommended by the
115    /// RFC)
116    pub(crate) fn handle_recvd_stop_sending(&mut self, wire_err_code: u64) {
117        debug_assert!(!self.fin_or_reset_sent);
118        // We received a STOP_SENDING frame. This indicates that the
119        // write direction has closed. We still need to continue
120        // reading from the stream until we receive a RESET_FRAME or
121        // `fin`.
122        self.audit_stats
123            .set_recvd_stop_sending_error_code(wire_err_code as i64);
124        self.fin_or_reset_sent = true;
125        // Drop any pending data and close the write side.
126        // We can't accept additional frames
127        self.queued_frame = None;
128        debug_assert!(self.recv.is_some());
129        self.recv = None;
130    }
131
132    pub(crate) fn handle_recvd_reset(&mut self, wire_err_code: u64) {
133        debug_assert!(!self.fin_or_reset_recv);
134        // We received a RESET_STREAM frame, which closes the read direction
135        // but not the write direction. If the peer wants to shut down write,
136        // it must also send STOP_SENDING
137        self.audit_stats
138            .set_recvd_reset_stream_error_code(wire_err_code as i64);
139        self.fin_or_reset_recv = true;
140        self.send = None;
141    }
142
143    pub(crate) fn handle_sent_reset(&mut self, wire_err_code: u64) {
144        debug_assert!(!self.fin_or_reset_sent);
145        self.audit_stats
146            .set_sent_reset_stream_error_code(wire_err_code as i64);
147        self.fin_or_reset_sent = true;
148    }
149
150    pub(crate) fn handle_sent_stop_sending(&mut self, wire_err_code: u64) {
151        debug_assert!(!self.fin_or_reset_recv);
152        self.audit_stats
153            .set_sent_stop_sending_error_code(wire_err_code as i64);
154        // It is ok for us to set the `fin_reset_recv` flag here.  While the peer
155        // must still send a fin or reset_stream with its final size, we don't
156        // need to read it from the stream. Quiche will take care of that.
157        self.fin_or_reset_recv = true;
158        self.send = None;
159    }
160
161    pub(crate) fn both_directions_done(&self) -> bool {
162        self.fin_or_reset_recv && self.fin_or_reset_sent
163    }
164}
165
/// Per-flow state for a DATAGRAM flow.
pub(crate) struct FlowCtx {
    /// Sends inbound datagrams to a local task.
    send: mpsc::Sender<InboundFrame>,
    // No `recv`: all outbound datagrams are sent on a shared channel in H3Driver
}
171
172impl FlowCtx {
173    /// Creates a new [FlowCtx]. This method returns the context itself
174    /// as well as the datagram receiver for this flow.
175    pub(crate) fn new(capacity: usize) -> (Self, InboundFrameStream) {
176        let (forward_sender, forward_receiver) = mpsc::channel(capacity);
177        let ctx = FlowCtx {
178            send: forward_sender,
179        };
180        (ctx, forward_receiver)
181    }
182
183    /// Tries to send a datagram to the flow receiver, but drops it if the
184    /// channel is full.
185    pub(crate) fn send_best_effort(&self, datagram: InboundFrame) {
186        let _ = self.send.try_send(datagram);
187    }
188}
189
/// A [`Future`] waiting on one direction of a stream's channel pair.
pub(crate) enum WaitForStream {
    /// Waiting for a local task to yield more outbound data.
    Downstream(WaitForDownstreamData),
    /// Waiting for capacity to forward inbound data to a local task.
    Upstream(WaitForUpstreamCapacity),
}
194
/// Output of [`WaitForStream`]: the channel endpoint handed back to the
/// caller once the awaited direction is ready.
pub(crate) enum StreamReady {
    /// Data arrived from a local task.
    Downstream(ReceivedDownstreamData),
    /// The sender towards the local task has capacity again.
    Upstream(HaveUpstreamCapacity),
}
199
200impl Future for WaitForStream {
201    type Output = StreamReady;
202
203    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
204        match self.get_mut() {
205            WaitForStream::Downstream(d) =>
206                Pin::new(d).poll(cx).map(StreamReady::Downstream),
207            WaitForStream::Upstream(u) =>
208                Pin::new(u).poll(cx).map(StreamReady::Upstream),
209        }
210    }
211}
212
/// Future that resolves when a stream's [`OutboundFrameStream`] yields data
/// (or closes).
pub(crate) struct WaitForDownstreamData {
    /// ID of the stream being waited on.
    pub(crate) stream_id: u64,
    /// Receiver taken out of the [`StreamCtx`]; `None` once the future has
    /// resolved (it is not fused).
    pub(crate) chan: Option<OutboundFrameStream>,
}
217
/// Result of [`WaitForDownstreamData`]: the receiver handed back together
/// with whatever it produced.
pub(crate) struct ReceivedDownstreamData {
    /// ID of the stream the data belongs to.
    pub(crate) stream_id: u64,
    /// The receiver, returned so it can be stored back in the [`StreamCtx`].
    pub(crate) chan: OutboundFrameStream,
    /// The received frame, or `None` if the channel is closed.
    pub(crate) data: Option<OutboundFrame>,
}
223
224impl Future for WaitForDownstreamData {
225    type Output = ReceivedDownstreamData;
226
227    fn poll(
228        mut self: Pin<&mut Self>, cx: &mut Context<'_>,
229    ) -> Poll<Self::Output> {
230        // Unwraps below are Ok because chan will only be None after first
231        // Poll::Ready, which is fine to panic for non fused future.
232        self.chan.as_mut().unwrap().poll_recv(cx).map(|data| {
233            ReceivedDownstreamData {
234                stream_id: self.stream_id,
235                chan: self.chan.take().unwrap(),
236                data,
237            }
238        })
239    }
240}
241
/// Future that resolves when a stream's [`InboundFrameSender`] can reserve
/// capacity for another frame.
pub(crate) struct WaitForUpstreamCapacity {
    /// ID of the stream being waited on.
    pub(crate) stream_id: u64,
    /// Sender taken out of the [`StreamCtx`]; `None` once the future has
    /// resolved (it is not fused).
    pub(crate) chan: Option<InboundFrameSender>,
}
246
/// Result of [`WaitForUpstreamCapacity`]: the sender handed back once it
/// has capacity again.
pub(crate) struct HaveUpstreamCapacity {
    /// ID of the stream the sender belongs to.
    pub(crate) stream_id: u64,
    /// The sender, returned so it can be stored back in the [`StreamCtx`].
    pub(crate) chan: InboundFrameSender,
}
251
252impl Future for WaitForUpstreamCapacity {
253    type Output = HaveUpstreamCapacity;
254
255    fn poll(
256        mut self: Pin<&mut Self>, cx: &mut Context<'_>,
257    ) -> Poll<Self::Output> {
258        // Unwraps below are Ok because chan will only be None after first
259        // Poll::Ready, which is fine to panic for non fused future.
260        match self.chan.as_mut().unwrap().poll_reserve(cx) {
261            Poll::Ready(_) => Poll::Ready(HaveUpstreamCapacity {
262                stream_id: self.stream_id,
263                chan: self.chan.take().unwrap(),
264            }),
265            Poll::Pending => Poll::Pending,
266        }
267    }
268}