tokio_quiche/http3/driver/mod.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27mod client;
28/// Wrapper for running HTTP/3 connections.
29pub mod connection;
30mod datagram;
31// `DriverHooks` must stay private to prevent users from creating their own
32// H3Drivers.
33mod hooks;
34mod server;
35mod streams;
36#[cfg(test)]
37pub mod test_utils;
38#[cfg(test)]
39mod tests;
40
41use std::collections::BTreeMap;
42use std::error::Error;
43use std::fmt;
44use std::marker::PhantomData;
45use std::sync::Arc;
46use std::time::Instant;
47
48use datagram_socket::StreamClosureKind;
49use foundations::telemetry::log;
50use futures::FutureExt;
51use futures_util::stream::FuturesUnordered;
52use quiche::h3;
53use quiche::h3::WireErrorCode;
54use tokio::select;
55use tokio::sync::mpsc;
56use tokio::sync::mpsc::error::TryRecvError;
57use tokio::sync::mpsc::error::TrySendError;
58use tokio::sync::mpsc::UnboundedReceiver;
59use tokio::sync::mpsc::UnboundedSender;
60use tokio_stream::StreamExt;
61use tokio_util::sync::PollSender;
62
63use self::hooks::DriverHooks;
64use self::hooks::InboundHeaders;
65use self::streams::FlowCtx;
66use self::streams::HaveUpstreamCapacity;
67use self::streams::ReceivedDownstreamData;
68use self::streams::StreamCtx;
69use self::streams::StreamReady;
70use self::streams::WaitForDownstreamData;
71use self::streams::WaitForStream;
72use self::streams::WaitForUpstreamCapacity;
73use crate::buf_factory::BufFactory;
74use crate::buf_factory::PooledBuf;
75use crate::buf_factory::PooledDgram;
76use crate::http3::settings::Http3Settings;
77use crate::http3::H3AuditStats;
78use crate::metrics::Metrics;
79use crate::quic::HandshakeInfo;
80use crate::quic::QuicCommand;
81use crate::quic::QuicheConnection;
82use crate::ApplicationOverQuic;
83use crate::QuicResult;
84
85pub use self::client::ClientEventStream;
86pub use self::client::ClientH3Command;
87pub use self::client::ClientH3Controller;
88pub use self::client::ClientH3Driver;
89pub use self::client::ClientH3Event;
90pub use self::client::ClientRequestSender;
91pub use self::client::NewClientRequest;
92pub use self::server::IsInEarlyData;
93pub use self::server::RawPriorityValue;
94pub use self::server::ServerEventStream;
95pub use self::server::ServerH3Command;
96pub use self::server::ServerH3Controller;
97pub use self::server::ServerH3Driver;
98pub use self::server::ServerH3Event;
99
100// The default priority for HTTP/3 responses if the application didn't provide
101// one.
102const DEFAULT_PRIO: h3::Priority = h3::Priority::new(3, true);
103
104// For a stream use a channel with 16 entries, which works out to 16 * 64KB =
105// 1MB of max buffered data.
106#[cfg(not(any(test, debug_assertions)))]
107const STREAM_CAPACITY: usize = 16;
108#[cfg(any(test, debug_assertions))]
109const STREAM_CAPACITY: usize = 1; // Set to 1 to stress write_pending under test conditions
110
111// For *all* flows use a shared channel with 2048 entries, which works out
112// to 3MB of max buffered data at 1500 bytes per datagram.
113const FLOW_CAPACITY: usize = 2048;
114
115/// Used by a local task to send [`OutboundFrame`]s to a peer on the
116/// stream or flow associated with this channel.
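///
/// A minimal sketch of pushing a body chunk through this sender, assuming the
/// `futures` `Sink` impl of [`PollSender`] (`SinkExt::send`); the surrounding
/// async context and error handling are illustrative only:
///
/// ```ignore
/// use futures::SinkExt;
///
/// async fn write_chunk(sender: &mut OutboundFrameSender, chunk: PooledBuf) {
///     // An error here means the driver half of the channel is gone and the
///     // stream is effectively dead.
///     if sender.send(OutboundFrame::body(chunk, false)).await.is_err() {
///         // Drop any remaining data for this stream.
///     }
/// }
/// ```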
117pub type OutboundFrameSender = PollSender<OutboundFrame>;
118
119/// Used internally to receive [`OutboundFrame`]s which should be sent to a peer
120/// on the stream or flow associated with this channel.
121type OutboundFrameStream = mpsc::Receiver<OutboundFrame>;
122
123/// Used internally to send [`InboundFrame`]s (data) from the peer to a local
124/// task on the stream or flow associated with this channel.
125type InboundFrameSender = PollSender<InboundFrame>;
126
127/// Used by a local task to receive [`InboundFrame`]s (data) on the stream or
128/// flow associated with this channel.
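///
/// A sketch of draining body data until the peer signals fin; the byte
/// accounting and surrounding task are assumptions for illustration:
///
/// ```ignore
/// async fn read_body(mut recv: InboundFrameStream) -> u64 {
///     let mut total = 0;
///     while let Some(frame) = recv.recv().await {
///         match frame {
///             InboundFrame::Body(chunk, fin) => {
///                 total += chunk.len() as u64;
///                 if fin {
///                     break;
///                 }
///             },
///             // Only seen on DATAGRAM flows, not on request/response streams.
///             InboundFrame::Datagram(_) => {},
///         }
///     }
///     total
/// }
/// ```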
129pub type InboundFrameStream = mpsc::Receiver<InboundFrame>;
130
131/// The error type used internally in [H3Driver].
132///
133/// Note that [`ApplicationOverQuic`] errors are not exposed to users at this
134/// time. The type is public to document the failure modes in [H3Driver].
135#[derive(Debug, PartialEq, Eq)]
136#[non_exhaustive]
137pub enum H3ConnectionError {
138    /// The controller task was shut down and is no longer listening.
139    ControllerWentAway,
140    /// Other error at the connection level, but not at the stream level.
141    H3(h3::Error),
142    /// Received a GOAWAY frame from the peer.
143    GoAway,
144    /// Received data for a stream that was closed or never opened.
145    NonexistentStream,
146    /// The server's post-accept timeout was hit.
147    /// The timeout can be configured in [`Http3Settings`].
148    PostAcceptTimeout,
149}
150
151impl From<h3::Error> for H3ConnectionError {
152    fn from(err: h3::Error) -> Self {
153        H3ConnectionError::H3(err)
154    }
155}
156
157impl From<quiche::Error> for H3ConnectionError {
158    fn from(err: quiche::Error) -> Self {
159        H3ConnectionError::H3(h3::Error::TransportError(err))
160    }
161}
162
163impl Error for H3ConnectionError {}
164
165impl fmt::Display for H3ConnectionError {
166    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
167        let s: &dyn fmt::Display = match self {
168            Self::ControllerWentAway => &"controller went away",
169            Self::H3(e) => e,
170            Self::GoAway => &"goaway",
171            Self::NonexistentStream => &"nonexistent stream",
172            Self::PostAcceptTimeout => &"post accept timeout hit",
173        };
174
175        write!(f, "H3ConnectionError: {s}")
176    }
177}
178
179type H3ConnectionResult<T> = Result<T, H3ConnectionError>;
180
181/// HTTP/3 headers that were received on a stream.
182///
183/// `recv` is used to read the message body, while `send` is used to transmit
184/// data back to the peer.
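///
/// A rough server-side sketch of answering a request described by these
/// headers. The status line, the use of `futures::SinkExt` for the
/// [`OutboundFrameSender`], and the lack of error handling are assumptions
/// made for illustration:
///
/// ```ignore
/// use futures::SinkExt;
///
/// async fn respond(mut incoming: IncomingH3Headers) {
///     // Send response headers with the driver's default priority.
///     let headers = vec![h3::Header::new(b":status", b"200")];
///     let _ = incoming.send.send(OutboundFrame::Headers(headers, None)).await;
///
///     // Drain any request body until the driver signals fin.
///     while let Some(InboundFrame::Body(_chunk, fin)) = incoming.recv.recv().await {
///         if fin {
///             break;
///         }
///     }
///
///     // Finish the response with an empty, fin-marked body frame.
///     let _ = incoming
///         .send
///         .send(OutboundFrame::body(BufFactory::get_empty_buf(), true))
///         .await;
/// }
/// ```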
185pub struct IncomingH3Headers {
186    /// Stream ID of the frame.
187    pub stream_id: u64,
188    /// The actual [`h3::Header`]s which were received.
189    pub headers: Vec<h3::Header>,
190    /// An [`OutboundFrameSender`] for streaming body data to the peer. For
191    /// [ClientH3Driver], note that the request body can also be passed a
192    /// cloned sender via [`NewClientRequest`].
193    pub send: OutboundFrameSender,
194    /// An [`InboundFrameStream`] of body data received from the peer.
195    pub recv: InboundFrameStream,
196    /// Whether there is a body associated with the incoming headers.
197    pub read_fin: bool,
198    /// Handle to the [`H3AuditStats`] for the message's stream.
199    pub h3_audit_stats: Arc<H3AuditStats>,
200}
201
202impl fmt::Debug for IncomingH3Headers {
203    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
204        f.debug_struct("IncomingH3Headers")
205            .field("stream_id", &self.stream_id)
206            .field("headers", &self.headers)
207            .field("read_fin", &self.read_fin)
208            .field("h3_audit_stats", &self.h3_audit_stats)
209            .finish()
210    }
211}
212
213/// [`H3Event`]s are produced by an [H3Driver] to describe HTTP/3 state updates.
214///
215/// Both [ServerH3Driver] and [ClientH3Driver] may extend this enum with
216/// endpoint-specific variants. The events must be consumed by users of the
217/// drivers, like a higher-level `Server` or `Client` controller.
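///
/// A sketch of dispatching on the shared variants; how the endpoint-specific
/// wrappers (e.g. [`ServerH3Event`]) expose the inner [`H3Event`] is not shown
/// here:
///
/// ```ignore
/// fn handle_event(event: H3Event) {
///     match event {
///         H3Event::IncomingHeaders(headers) => {
///             // Hand the request/response off to application logic.
///         },
///         H3Event::BodyBytesReceived { stream_id, num_bytes, fin } => {
///             // Update per-stream accounting.
///         },
///         H3Event::ResetStream { stream_id } |
///         H3Event::StreamClosed { stream_id } => {
///             // Drop any state kept for `stream_id`.
///         },
///         H3Event::ConnectionError(_) | H3Event::ConnectionShutdown(_) => {
///             // Tear down connection-level state.
///         },
///         _ => {},
///     }
/// }
/// ```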
218#[derive(Debug)]
219pub enum H3Event {
220    /// A SETTINGS frame was received.
221    IncomingSettings {
222        /// Raw HTTP/3 setting pairs, in the order received from the peer.
223        settings: Vec<(u64, u64)>,
224    },
225
226    /// A HEADERS frame was received on the given stream. This is either a
227    /// request or a response depending on the perspective of the [`H3Event`]
228    /// receiver.
229    IncomingHeaders(IncomingH3Headers),
230
231    /// A DATAGRAM flow was created and associated with the given `flow_id`.
232    /// This event is fired before a HEADERS event for CONNECT[-UDP] requests.
233    NewFlow {
234        /// Flow ID of the new flow.
235        flow_id: u64,
236        /// An [`OutboundFrameSender`] for transmitting datagrams to the peer.
237        send: OutboundFrameSender,
238        /// An [`InboundFrameStream`] for receiving datagrams from the peer.
239        recv: InboundFrameStream,
240    },
241    /// A RST_STREAM frame was seen on the given `stream_id`. The user of the
242    /// driver should clean up any state allocated for this stream.
243    ResetStream { stream_id: u64 },
244    /// The connection has irrecoverably errored and is shutting down.
245    ConnectionError(h3::Error),
246    /// The connection has been shutdown, optionally due to an
247    /// [`H3ConnectionError`].
248    ConnectionShutdown(Option<H3ConnectionError>),
249    /// Body data has been received over a stream.
250    BodyBytesReceived {
251        /// Stream ID of the body data.
252        stream_id: u64,
253        /// Number of bytes received.
254        num_bytes: u64,
255        /// Whether the stream is finished and won't yield any more data.
256        fin: bool,
257    },
258    /// The stream has been closed. This is used to signal stream closures that
259    /// don't result from RST_STREAM frames, unlike the
260    /// [`H3Event::ResetStream`] variant.
261    StreamClosed { stream_id: u64 },
262}
263
264impl H3Event {
265    /// Generates an event from an applicable [`H3ConnectionError`].
266    fn from_error(err: &H3ConnectionError) -> Option<Self> {
267        Some(match err {
268            H3ConnectionError::H3(e) => Self::ConnectionError(*e),
269            H3ConnectionError::PostAcceptTimeout => Self::ConnectionShutdown(
270                Some(H3ConnectionError::PostAcceptTimeout),
271            ),
272            _ => return None,
273        })
274    }
275}
276
277/// An [`OutboundFrame`] is a data frame that should be sent from a local task
278/// to a peer over a [`quiche::h3::Connection`].
279///
280/// This is used, for example, to send response body data to a peer, or proxied
281/// UDP datagrams.
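///
/// A small sketch of constructing frames; where the pooled buffers and the
/// trailer values come from is left as an assumption:
///
/// ```ignore
/// // Body data for a request/response stream; `fin = false` means more
/// // frames will follow on this stream.
/// let body = OutboundFrame::body(BufFactory::get_max_buf(), false);
///
/// // Trailers that close the stream, using the driver's default priority.
/// let trailers = OutboundFrame::Trailers(
///     vec![h3::Header::new(b"x-result", b"ok")],
///     None,
/// );
/// ```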
282#[derive(Debug)]
283pub enum OutboundFrame {
284    /// Response headers to be sent to the peer, with optional priority.
285    Headers(Vec<h3::Header>, Option<quiche::h3::Priority>),
286    /// Response body/CONNECT downstream data plus FIN flag.
287    #[cfg(feature = "zero-copy")]
288    Body(crate::buf_factory::QuicheBuf, bool),
289    /// Response body/CONNECT downstream data plus FIN flag.
290    #[cfg(not(feature = "zero-copy"))]
291    Body(PooledBuf, bool),
292    /// CONNECT-UDP (DATAGRAM) downstream data plus flow ID.
293    Datagram(PooledDgram, u64),
294    /// Close the stream with trailers, with optional priority.
295    Trailers(Vec<h3::Header>, Option<quiche::h3::Priority>),
296    /// An error encountered when serving the request. Stream should be closed.
297    PeerStreamError,
298    /// DATAGRAM flow explicitly closed.
299    FlowShutdown { flow_id: u64, stream_id: u64 },
300}
301
302impl OutboundFrame {
303    /// Creates a body frame with the provided buffer.
304    pub fn body(body: PooledBuf, fin: bool) -> Self {
305        #[cfg(feature = "zero-copy")]
306        let body = crate::buf_factory::QuicheBuf::new(body);
307
308        OutboundFrame::Body(body, fin)
309    }
310}
311
312/// An [`InboundFrame`] is a data frame that was received from the peer over a
313/// [`quiche::h3::Connection`]. This is used by peers to send body or datagrams
314/// to the local task.
315#[derive(Debug)]
316pub enum InboundFrame {
317    /// Request body/CONNECT upstream data plus FIN flag.
318    Body(PooledBuf, bool),
319    /// CONNECT-UDP (DATAGRAM) upstream data.
320    Datagram(PooledDgram),
321}
322
323/// A ready-made [`ApplicationOverQuic`] which can handle HTTP/3 and MASQUE.
324/// Depending on the `DriverHooks` in use, it powers either a client or a
325/// server.
326///
327/// Use the [ClientH3Driver] and [ServerH3Driver] aliases to access the
328/// respective driver types. The driver is passed into an I/O loop and
329/// communicates with the driver's user (e.g., an HTTP client or a server) via
330/// its associated [H3Controller]. The controller allows the application to both
331/// listen for [`H3Event`]s of note and send [`H3Command`]s into the I/O loop.
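///
/// A minimal server-side sketch; `Http3Settings::default()` and the exact way
/// the driver is handed to the I/O loop are assumptions made for illustration:
///
/// ```ignore
/// let (driver, mut controller) = ServerH3Driver::new(Http3Settings::default());
///
/// // `driver` is moved into the QUIC I/O loop (e.g. via
/// // `InitialQuicConnection::start`), while the controller stays with the
/// // application:
/// while let Some(event) = controller.event_receiver_mut().recv().await {
///     // React to `ServerH3Event`s here.
/// }
/// ```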
332pub struct H3Driver<H: DriverHooks> {
333    /// Configuration used to initialize `conn`. Created from [`Http3Settings`]
334    /// in the constructor.
335    h3_config: h3::Config,
336    /// The underlying HTTP/3 connection. Initialized in
337    /// `ApplicationOverQuic::on_conn_established`.
338    conn: Option<h3::Connection>,
339    /// State required by the client/server hooks.
340    hooks: H,
341    /// Sends [`H3Event`]s to the [H3Controller] paired with this driver.
342    h3_event_sender: mpsc::UnboundedSender<H::Event>,
343    /// Receives [`H3Command`]s from the [H3Controller] paired with this driver.
344    cmd_recv: mpsc::UnboundedReceiver<H::Command>,
345
346    /// A map of stream IDs to their [StreamCtx]. This is mainly used to
347    /// retrieve the internal Tokio channels associated with the stream.
348    stream_map: BTreeMap<u64, StreamCtx>,
349    /// A map of flow IDs to their [FlowCtx]. This is mainly used to retrieve
350    /// the internal Tokio channels associated with the flow.
351    flow_map: BTreeMap<u64, FlowCtx>,
352    /// Set of [`WaitForStream`] futures. A stream is added to this set if
353    /// we need to send to it and its channel is at capacity, or if we need
354    /// data from its channel and the channel is empty.
355    waiting_streams: FuturesUnordered<WaitForStream>,
356
357    /// Receives [`OutboundFrame`]s from all datagram flows on the connection.
358    dgram_recv: OutboundFrameStream,
359    /// Keeps the datagram channel open such that datagram flows can be created.
360    dgram_send: OutboundFrameSender,
361
362    /// The buffer used to interact with the underlying IoWorker.
363    pooled_buf: PooledBuf,
364    /// The maximum HTTP/3 stream ID seen on this connection.
365    max_stream_seen: u64,
366
367    /// Tracks whether we have forwarded the HTTP/3 SETTINGS frame
368    /// to the [H3Controller] once.
369    settings_received_and_forwarded: bool,
370}
371
372impl<H: DriverHooks> H3Driver<H> {
373    /// Builds a new [H3Driver] and an associated [H3Controller].
374    ///
375    /// The driver should then be passed to
376    /// [`InitialQuicConnection`](crate::InitialQuicConnection)'s `start`
377    /// method.
378    pub fn new(http3_settings: Http3Settings) -> (Self, H3Controller<H>) {
379        let (dgram_send, dgram_recv) = mpsc::channel(FLOW_CAPACITY);
380        let (cmd_sender, cmd_recv) = mpsc::unbounded_channel();
381        let (h3_event_sender, h3_event_recv) = mpsc::unbounded_channel();
382
383        (
384            H3Driver {
385                h3_config: (&http3_settings).into(),
386                conn: None,
387                hooks: H::new(&http3_settings),
388                h3_event_sender,
389                cmd_recv,
390
391                stream_map: BTreeMap::new(),
392                flow_map: BTreeMap::new(),
393
394                dgram_recv,
395                dgram_send: PollSender::new(dgram_send),
396                pooled_buf: BufFactory::get_max_buf(),
397                max_stream_seen: 0,
398
399                waiting_streams: FuturesUnordered::new(),
400
401                settings_received_and_forwarded: false,
402            },
403            H3Controller {
404                cmd_sender,
405                h3_event_recv: Some(h3_event_recv),
406            },
407        )
408    }
409
410    /// Retrieve the [FlowCtx] associated with the given `flow_id`. If no
411    /// context is found, a new one will be created.
412    fn get_or_insert_flow(
413        &mut self, flow_id: u64,
414    ) -> H3ConnectionResult<&mut FlowCtx> {
415        use std::collections::btree_map::Entry;
416        Ok(match self.flow_map.entry(flow_id) {
417            Entry::Vacant(e) => {
418                // This is a datagram for a new flow we haven't seen before
419                let (flow, recv) = FlowCtx::new(FLOW_CAPACITY);
420                let flow_req = H3Event::NewFlow {
421                    flow_id,
422                    recv,
423                    send: self.dgram_send.clone(),
424                };
425                self.h3_event_sender
426                    .send(flow_req.into())
427                    .map_err(|_| H3ConnectionError::ControllerWentAway)?;
428                e.insert(flow)
429            },
430            Entry::Occupied(e) => e.into_mut(),
431        })
432    }
433
434    /// Adds a [StreamCtx] to the stream map with the given `stream_id`.
435    fn insert_stream(&mut self, stream_id: u64, ctx: StreamCtx) {
436        self.stream_map.insert(stream_id, ctx);
437        self.max_stream_seen = self.max_stream_seen.max(stream_id);
438    }
439
440    /// Fetches body chunks from the [`quiche::h3::Connection`] and forwards
441    /// them to the stream's associated [`InboundFrameStream`].
442    fn process_h3_data(
443        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
444    ) -> H3ConnectionResult<()> {
445        // Split self borrow between conn and stream_map
446        let conn = self.conn.as_mut().ok_or(Self::connection_not_present())?;
447        let ctx = self
448            .stream_map
449            .get_mut(&stream_id)
450            .ok_or(H3ConnectionError::NonexistentStream)?;
451
452        enum StreamStatus {
453            Done { close: bool },
454            Reset { wire_err_code: u64 },
455            Blocked,
456        }
457
458        let status = loop {
459            let Some(sender) = ctx.send.as_ref().and_then(PollSender::get_ref)
460            else {
461                // already waiting for capacity
462                break StreamStatus::Done { close: false };
463            };
464
465            let try_reserve_result = sender.try_reserve();
466            let permit = match try_reserve_result {
467                Ok(permit) => permit,
468                Err(TrySendError::Closed(())) => {
469                    // The channel has closed before we delivered a fin or reset
470                    // to the application.
471                    if !ctx.fin_or_reset_recv &&
472                        ctx.associated_dgram_flow_id.is_none()
473                    // The channel might be closed if the stream was used to
474                    // initiate a datagram exchange.
475                    // TODO: ideally, the application would still shut down the
476                    // stream properly. Once application code
477                    // is fixed, we can remove this check.
478                    {
479                        let err = h3::WireErrorCode::RequestCancelled as u64;
480                        let _ = qconn.stream_shutdown(
481                            stream_id,
482                            quiche::Shutdown::Read,
483                            err,
484                        );
485                        drop(try_reserve_result); // needed to drop the borrow on ctx.
486                        ctx.handle_sent_stop_sending(err);
487                        // TODO: should we send an H3Event event to
488                        // h3_event_sender? We can only get here if the app
489                        // actively closed or dropped
490                        // the channel so any event we send would be more for
491                        // logging or auditing
492                    }
493                    break StreamStatus::Done {
494                        close: ctx.both_directions_done(),
495                    };
496                },
497                Err(TrySendError::Full(())) => {
498                    if ctx.fin_or_reset_recv || qconn.stream_readable(stream_id) {
499                        break StreamStatus::Blocked;
500                    }
501                    break StreamStatus::Done { close: false };
502                },
503            };
504
505            if ctx.fin_or_reset_recv {
506                // Signal end-of-body to upstream
507                permit
508                    .send(InboundFrame::Body(BufFactory::get_empty_buf(), true));
509                break StreamStatus::Done {
510                    close: ctx.fin_or_reset_sent,
511                };
512            }
513
514            match conn.recv_body(qconn, stream_id, &mut self.pooled_buf) {
515                Ok(n) => {
516                    let mut body = std::mem::replace(
517                        &mut self.pooled_buf,
518                        BufFactory::get_max_buf(),
519                    );
520                    body.truncate(n);
521
522                    ctx.audit_stats.add_downstream_bytes_recvd(n as u64);
523                    let event = H3Event::BodyBytesReceived {
524                        stream_id,
525                        num_bytes: n as u64,
526                        fin: false,
527                    };
528                    let _ = self.h3_event_sender.send(event.into());
529
530                    permit.send(InboundFrame::Body(body, false));
531                },
532                Err(h3::Error::Done) =>
533                    break StreamStatus::Done { close: false },
534                Err(h3::Error::TransportError(quiche::Error::StreamReset(
535                    code,
536                ))) => {
537                    break StreamStatus::Reset {
538                        wire_err_code: code,
539                    };
540                },
541                Err(_) => break StreamStatus::Done { close: true },
542            }
543        };
544
545        match status {
546            StreamStatus::Done { close } => {
547                if close {
548                    return self.cleanup_stream(qconn, stream_id);
549                }
550
551                // The QUIC stream is finished, manually invoke `process_h3_fin`
552                // in case `h3::poll()` is never called again.
553                //
554                // Note that this case will not conflict with StreamStatus::Done
555                // being returned due to the body channel being
556                // blocked. qconn.stream_finished() will guarantee
557                // that we've fully parsed the body as it only returns true
558                // if we've seen a Fin for the read half of the stream.
559                if !ctx.fin_or_reset_recv && qconn.stream_finished(stream_id) {
560                    return self.process_h3_fin(qconn, stream_id);
561                }
562            },
563            StreamStatus::Reset { wire_err_code } => {
564                debug_assert!(ctx.send.is_some());
565                ctx.handle_recvd_reset(wire_err_code);
566                self.h3_event_sender
567                    .send(H3Event::ResetStream { stream_id }.into())
568                    .map_err(|_| H3ConnectionError::ControllerWentAway)?;
569                if ctx.both_directions_done() {
570                    return self.cleanup_stream(qconn, stream_id);
571                }
572            },
573            StreamStatus::Blocked => {
574                self.waiting_streams.push(ctx.wait_for_send(stream_id));
575            },
576        }
577
578        Ok(())
579    }
580
581    /// Processes an end-of-stream event from the [`quiche::h3::Connection`].
582    fn process_h3_fin(
583        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
584    ) -> H3ConnectionResult<()> {
585        let ctx = self
586            .stream_map
587            .get_mut(&stream_id)
588            .filter(|c| !c.fin_or_reset_recv);
589        let Some(ctx) = ctx else {
590            // Stream is already finished, nothing to do
591            return Ok(());
592        };
593
594        ctx.fin_or_reset_recv = true;
595        ctx.audit_stats
596            .set_recvd_stream_fin(StreamClosureKind::Explicit);
597
598        // It's important to send this H3Event before process_h3_data so that
599        // a server can (potentially) generate the control response before the
600        // corresponding receiver drops.
601        let event = H3Event::BodyBytesReceived {
602            stream_id,
603            num_bytes: 0,
604            fin: true,
605        };
606        let _ = self.h3_event_sender.send(event.into());
607
608        // Communicate fin to upstream. Since `ctx.fin_or_reset_recv` is true now,
609        // there can't be a recursive loop.
610        self.process_h3_data(qconn, stream_id)
611    }
612
613    /// Processes a single [`quiche::h3::Event`] received from the underlying
614    /// [`quiche::h3::Connection`]. Some events are dispatched to helper
615    /// methods.
616    fn process_read_event(
617        &mut self, qconn: &mut QuicheConnection, stream_id: u64, event: h3::Event,
618    ) -> H3ConnectionResult<()> {
619        self.forward_settings()?;
620
621        match event {
622            // Requests/responses are exclusively handled by hooks.
623            h3::Event::Headers { list, more_frames } =>
624                H::headers_received(self, qconn, InboundHeaders {
625                    stream_id,
626                    headers: list,
627                    has_body: more_frames,
628                }),
629
630            h3::Event::Data => self.process_h3_data(qconn, stream_id),
631            h3::Event::Finished => self.process_h3_fin(qconn, stream_id),
632
633            h3::Event::Reset(code) => {
634                if let Some(ctx) = self.stream_map.get_mut(&stream_id) {
635                    ctx.handle_recvd_reset(code);
636                    // See if we are waiting on this stream and close the channel
637                    // if we are. If we are not waiting, `handle_recvd_reset()`
638                    // will have taken care of closing.
639                    for pending in self.waiting_streams.iter_mut() {
640                        match pending {
641                            WaitForStream::Upstream(
642                                WaitForUpstreamCapacity {
643                                    stream_id: id,
644                                    chan: Some(chan),
645                                },
646                            ) if stream_id == *id => {
647                                chan.close();
648                            },
649                            _ => {},
650                        }
651                    }
652
653                    self.h3_event_sender
654                        .send(H3Event::ResetStream { stream_id }.into())
655                        .map_err(|_| H3ConnectionError::ControllerWentAway)?;
656                    if ctx.both_directions_done() {
657                        return self.cleanup_stream(qconn, stream_id);
658                    }
659                }
660
661                // TODO: if we don't have the stream in our map: should we
662                // send the H3Event::ResetStream?
663                Ok(())
664            },
665
666            h3::Event::PriorityUpdate => Ok(()),
667            h3::Event::GoAway => Err(H3ConnectionError::GoAway),
668        }
669    }
670
671    /// The SETTINGS frame can be received at any point, so we
672    /// need to check `peer_settings_raw` to decide if we've received it.
673    ///
674    /// Settings should only be sent once, so we generate a single event
675    /// when `peer_settings_raw` transitions from None to Some.
676    fn forward_settings(&mut self) -> H3ConnectionResult<()> {
677        if self.settings_received_and_forwarded {
678            return Ok(());
679        }
680
681        // Capture the peer settings and forward them.
682        if let Some(settings) = self.conn_mut()?.peer_settings_raw() {
683            let incoming_settings = H3Event::IncomingSettings {
684                settings: settings.to_vec(),
685            };
686
687            self.h3_event_sender
688                .send(incoming_settings.into())
689                .map_err(|_| H3ConnectionError::ControllerWentAway)?;
690
691            self.settings_received_and_forwarded = true;
692        }
693        Ok(())
694    }
695
696    /// Send an individual frame to the underlying [`quiche::h3::Connection`] to
697    /// be flushed at a later time.
698    ///
699    /// `Self::process_writes` will iterate over all writable streams and call
700    /// this method in a loop for each stream to flush all pending frames.
701    fn process_write_frame(
702        conn: &mut h3::Connection, qconn: &mut QuicheConnection,
703        ctx: &mut StreamCtx,
704    ) -> h3::Result<()> {
705        let Some(frame) = &mut ctx.queued_frame else {
706            return Ok(());
707        };
708
709        let audit_stats = &ctx.audit_stats;
710        let stream_id = audit_stats.stream_id();
711
712        match frame {
713            OutboundFrame::Headers(headers, priority) => {
714                let prio = priority.as_ref().unwrap_or(&DEFAULT_PRIO);
715
716                let res = if ctx.initial_headers_sent {
717                    // Initial headers were already sent, send additional
718                    // headers now.
719                    conn.send_additional_headers_with_priority(
720                        qconn, stream_id, headers, prio, false, false,
721                    )
722                } else {
723                    // Send initial headers.
724                    conn.send_response_with_priority(
725                        qconn, stream_id, headers, prio, false,
726                    )
727                    .inspect(|_| ctx.initial_headers_sent = true)
728                };
729
730                if let Err(h3::Error::StreamBlocked) = res {
731                    ctx.first_full_headers_flush_fail_time
732                        .get_or_insert(Instant::now());
733                }
734
735                if res.is_ok() {
736                    if let Some(first) =
737                        ctx.first_full_headers_flush_fail_time.take()
738                    {
739                        ctx.audit_stats.add_header_flush_duration(
740                            Instant::now().duration_since(first),
741                        );
742                    }
743                }
744
745                res
746            },
747
748            OutboundFrame::Body(body, fin) => {
749                let len = body.as_ref().len();
750                if len == 0 && !*fin {
751                    // quiche doesn't allow sending an empty body when the fin
752                    // flag is not set
753                    return Ok(());
754                }
755                if *fin {
756                    // If this is the last body frame, drop the receiver in the
757                    // stream map to signal that we shouldn't receive any more
758                    // frames. NOTE: we can't use `mpsc::Receiver::close()`
759                    // due to an inconsistency in how tokio handles reading
760                    // from a closed mpsc channel https://github.com/tokio-rs/tokio/issues/7631
761                    ctx.recv = None;
762                }
763                #[cfg(feature = "zero-copy")]
764                let n = conn.send_body_zc(qconn, stream_id, body, *fin)?;
765
766                #[cfg(not(feature = "zero-copy"))]
767                let n = conn.send_body(qconn, stream_id, body, *fin)?;
768
769                audit_stats.add_downstream_bytes_sent(n as _);
770                if n != len {
771                    // Couldn't write the entire body, keep what remains for
772                    // future retry.
773                    #[cfg(not(feature = "zero-copy"))]
774                    body.pop_front(n);
775
776                    Err(h3::Error::StreamBlocked)
777                } else {
778                    if *fin {
779                        Self::on_fin_sent(ctx)?;
780                    }
781                    Ok(())
782                }
783            },
784
785            OutboundFrame::Trailers(headers, priority) => {
786                let prio = priority.as_ref().unwrap_or(&DEFAULT_PRIO);
787
788                // trailers always set fin=true
789                let res = conn.send_additional_headers_with_priority(
790                    qconn, stream_id, headers, prio, true, true,
791                );
792
793                if res.is_ok() {
794                    Self::on_fin_sent(ctx)?;
795                }
796                res
797            },
798
799            OutboundFrame::PeerStreamError => Err(h3::Error::MessageError),
800
801            OutboundFrame::FlowShutdown { .. } => {
802                unreachable!("Only flows send shutdowns")
803            },
804
805            OutboundFrame::Datagram(..) => {
806                unreachable!("Only flows send datagrams")
807            },
808        }
809    }
810
811    fn on_fin_sent(ctx: &mut StreamCtx) -> h3::Result<()> {
812        ctx.recv = None;
813        ctx.fin_or_reset_sent = true;
814        ctx.audit_stats
815            .set_sent_stream_fin(StreamClosureKind::Explicit);
816        if ctx.fin_or_reset_recv {
817            // Return a TransportError to trigger stream cleanup
818            // instead of h3::Error::Done
819            Err(h3::Error::TransportError(quiche::Error::Done))
820        } else {
821            Ok(())
822        }
823    }
824
825    /// Resumes reads or writes to the connection when a stream channel becomes
826    /// unblocked.
827    ///
828    /// If we were waiting for more data from a channel, we resume writing to
829    /// the connection. Otherwise, we were blocked on channel capacity and
830    /// continue reading from the connection. `Upstream` in this context is
831    /// the consumer of the stream.
832    fn upstream_ready(
833        &mut self, qconn: &mut QuicheConnection, ready: StreamReady,
834    ) -> H3ConnectionResult<()> {
835        match ready {
836            StreamReady::Downstream(r) => self.upstream_read_ready(qconn, r),
837            StreamReady::Upstream(r) => self.upstream_write_ready(qconn, r),
838        }
839    }
840
841    fn upstream_read_ready(
842        &mut self, qconn: &mut QuicheConnection,
843        read_ready: ReceivedDownstreamData,
844    ) -> H3ConnectionResult<()> {
845        let ReceivedDownstreamData {
846            stream_id,
847            chan,
848            data,
849        } = read_ready;
850
851        match self.stream_map.get_mut(&stream_id) {
852            None => Ok(()),
853            Some(stream) => {
854                stream.recv = Some(chan);
855                stream.queued_frame = data;
856                self.process_writable_stream(qconn, stream_id)
857            },
858        }
859    }
860
861    fn upstream_write_ready(
862        &mut self, qconn: &mut QuicheConnection,
863        write_ready: HaveUpstreamCapacity,
864    ) -> H3ConnectionResult<()> {
865        let HaveUpstreamCapacity {
866            stream_id,
867            mut chan,
868        } = write_ready;
869
870        match self.stream_map.get_mut(&stream_id) {
871            None => Ok(()),
872            Some(stream) => {
873                chan.abort_send(); // Have to do it to release the associated permit
874                stream.send = Some(chan);
875                self.process_h3_data(qconn, stream_id)
876            },
877        }
878    }
879
880    /// Processes all queued outbound datagrams from the `dgram_recv` channel.
881    fn dgram_ready(
882        &mut self, qconn: &mut QuicheConnection, frame: OutboundFrame,
883    ) -> H3ConnectionResult<()> {
884        let mut frame = Ok(frame);
885
886        loop {
887            match frame {
888                Ok(OutboundFrame::Datagram(dgram, flow_id)) => {
889                    // Drop datagrams if there is no capacity
890                    let _ = datagram::send_h3_dgram(qconn, flow_id, dgram);
891                },
892                Ok(OutboundFrame::FlowShutdown { flow_id, stream_id }) => {
893                    self.shutdown_stream(
894                        qconn,
895                        stream_id,
896                        StreamShutdown::Both {
897                            read_error_code: WireErrorCode::NoError as u64,
898                            write_error_code: WireErrorCode::NoError as u64,
899                        },
900                    )?;
901                    self.flow_map.remove(&flow_id);
902                    break;
903                },
904                Ok(_) => unreachable!("Flows can't send frames of other types"),
905                Err(TryRecvError::Empty) => break,
906                Err(TryRecvError::Disconnected) =>
907                    return Err(H3ConnectionError::ControllerWentAway),
908            }
909
910            frame = self.dgram_recv.try_recv();
911        }
912
913        Ok(())
914    }
915
916    /// Return a mutable reference to the driver's HTTP/3 connection.
917    ///
918    /// If the connection doesn't exist yet, this function returns
919    /// a `Self::connection_not_present()` error.
920    fn conn_mut(&mut self) -> H3ConnectionResult<&mut h3::Connection> {
921        self.conn.as_mut().ok_or(Self::connection_not_present())
922    }
923
924    /// Alias for [`quiche::Error::TlsFail`], which is used in the case where
925    /// this driver doesn't have an established HTTP/3 connection attached
926    /// to it yet.
927    const fn connection_not_present() -> H3ConnectionError {
928        H3ConnectionError::H3(h3::Error::TransportError(quiche::Error::TlsFail))
929    }
930
931    /// Cleans up internal state for the indicated HTTP/3 stream.
932    ///
933    /// This function removes the stream from the stream map, closes any pending
934    /// futures, removes associated DATAGRAM flows, and sends a
935    /// [`H3Event::StreamClosed`] event (for servers).
936    fn cleanup_stream(
937        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
938    ) -> H3ConnectionResult<()> {
939        let Some(stream_ctx) = self.stream_map.remove(&stream_id) else {
940            return Ok(());
941        };
942
943        // Find if the stream also has any pending futures associated with it
944        for pending in self.waiting_streams.iter_mut() {
945            match pending {
946                WaitForStream::Downstream(WaitForDownstreamData {
947                    stream_id: id,
948                    chan: Some(chan),
949                }) if stream_id == *id => {
950                    chan.close();
951                },
952                WaitForStream::Upstream(WaitForUpstreamCapacity {
953                    stream_id: id,
954                    chan: Some(chan),
955                }) if stream_id == *id => {
956                    chan.close();
957                },
958                _ => {},
959            }
960        }
961
962        // Close any DATAGRAM-proxying channels when we close the stream, if they
963        // exist
964        if let Some(mapped_flow_id) = stream_ctx.associated_dgram_flow_id {
965            self.flow_map.remove(&mapped_flow_id);
966        }
967
968        if qconn.is_server() {
969            // Signal the server to remove the stream from its map
970            let _ = self
971                .h3_event_sender
972                .send(H3Event::StreamClosed { stream_id }.into());
973        }
974
975        Ok(())
976    }
977
978    /// Shuts down the indicated HTTP/3 stream by sending the appropriate
979    /// frames, then cleans up internal state by calling
980    /// [`Self::cleanup_stream`].
981    fn shutdown_stream(
982        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
983        shutdown: StreamShutdown,
984    ) -> H3ConnectionResult<()> {
985        let Some(stream_ctx) = self.stream_map.get(&stream_id) else {
986            return Ok(());
987        };
988
989        let audit_stats = &stream_ctx.audit_stats;
990
991        match shutdown {
992            StreamShutdown::Read { error_code } => {
993                audit_stats.set_sent_stop_sending_error_code(error_code as _);
994                let _ = qconn.stream_shutdown(
995                    stream_id,
996                    quiche::Shutdown::Read,
997                    error_code,
998                );
999            },
1000            StreamShutdown::Write { error_code } => {
1001                audit_stats.set_sent_reset_stream_error_code(error_code as _);
1002                let _ = qconn.stream_shutdown(
1003                    stream_id,
1004                    quiche::Shutdown::Write,
1005                    error_code,
1006                );
1007            },
1008            StreamShutdown::Both {
1009                read_error_code,
1010                write_error_code,
1011            } => {
1012                audit_stats
1013                    .set_sent_stop_sending_error_code(read_error_code as _);
1014                let _ = qconn.stream_shutdown(
1015                    stream_id,
1016                    quiche::Shutdown::Read,
1017                    read_error_code,
1018                );
1019                audit_stats
1020                    .set_sent_reset_stream_error_code(write_error_code as _);
1021                let _ = qconn.stream_shutdown(
1022                    stream_id,
1023                    quiche::Shutdown::Write,
1024                    write_error_code,
1025                );
1026            },
1027        }
1028
1029        self.cleanup_stream(qconn, stream_id)
1030    }
1031
1032    /// Handles a regular [`H3Command`]. May be called internally by
1033    /// [DriverHooks] for non-endpoint-specific [`H3Command`]s.
1034    fn handle_core_command(
1035        &mut self, qconn: &mut QuicheConnection, cmd: H3Command,
1036    ) -> H3ConnectionResult<()> {
1037        match cmd {
1038            H3Command::QuicCmd(cmd) => cmd.execute(qconn),
1039            H3Command::GoAway => {
1040                let max_id = self.max_stream_seen;
1041                self.conn_mut()
1042                    .expect("connection should be established")
1043                    .send_goaway(qconn, max_id)?;
1044            },
1045            H3Command::ShutdownStream {
1046                stream_id,
1047                shutdown,
1048            } => {
1049                self.shutdown_stream(qconn, stream_id, shutdown)?;
1050            },
1051        }
1052        Ok(())
1053    }
1054}
1055
1056impl<H: DriverHooks> H3Driver<H> {
1057    /// Reads all buffered datagrams out of `qconn` and distributes them to
1058    /// their flow channels.
1059    fn process_available_dgrams(
1060        &mut self, qconn: &mut QuicheConnection,
1061    ) -> H3ConnectionResult<()> {
1062        loop {
1063            match datagram::receive_h3_dgram(qconn) {
1064                Ok((flow_id, dgram)) => {
1065                    self.get_or_insert_flow(flow_id)?.send_best_effort(dgram);
1066                },
1067                Err(quiche::Error::Done) => return Ok(()),
1068                Err(err) => return Err(H3ConnectionError::from(err)),
1069            }
1070        }
1071    }
1072
1073    /// Flushes any queued-up frames for `stream_id` into `qconn` until either
1074    /// there is no more capacity in `qconn` or no more frames to send.
1075    fn process_writable_stream(
1076        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
1077    ) -> H3ConnectionResult<()> {
1078        // Split self borrow between conn and stream_map
1079        let conn = self.conn.as_mut().ok_or(Self::connection_not_present())?;
1080        let Some(ctx) = self.stream_map.get_mut(&stream_id) else {
1081            return Ok(()); // Unknown stream_id
1082        };
1083
1084        loop {
1085            // Process each writable frame, queue the next frame for processing
1086            // and shut down any errored streams.
1087            match Self::process_write_frame(conn, qconn, ctx) {
1088                Ok(()) => ctx.queued_frame = None,
1089                Err(h3::Error::StreamBlocked | h3::Error::Done) => break,
1090                Err(h3::Error::MessageError) => {
1091                    return self.shutdown_stream(
1092                        qconn,
1093                        stream_id,
1094                        StreamShutdown::Both {
1095                            read_error_code: WireErrorCode::MessageError as u64,
1096                            write_error_code: WireErrorCode::MessageError as u64,
1097                        },
1098                    );
1099                },
1100                Err(h3::Error::TransportError(quiche::Error::StreamStopped(
1101                    e,
1102                ))) => {
1103                    ctx.handle_recvd_stop_sending(e);
1104                    if ctx.both_directions_done() {
1105                        return self.cleanup_stream(qconn, stream_id);
1106                    } else {
1107                        return Ok(());
1108                    }
1109                },
1110                Err(h3::Error::TransportError(
1111                    quiche::Error::InvalidStreamState(stream),
1112                )) => {
1113                    return self.cleanup_stream(qconn, stream);
1114                },
1115                Err(_) => {
1116                    return self.cleanup_stream(qconn, stream_id);
1117                },
1118            }
1119
1120            let Some(recv) = ctx.recv.as_mut() else {
1121                // This stream is already waiting for data or we wrote a fin and
1122                // closed the channel.
1123                debug_assert!(
1124                    ctx.queued_frame.is_none(),
1125                    "We MUST NOT have a queued frame if we are already waiting on \
1126                     more data from the channel"
1127                );
1128                return Ok(());
1129            };
1130
1131            // Attempt to queue the next frame for processing. The corresponding
1132            // sender is created at the same time as the `StreamCtx`
1133            // and ultimately ends up in an `H3Body`. The body then
1134            // determines which frames to send to the peer via
1135            // this processing loop.
1136            match recv.try_recv() {
1137                Ok(frame) => ctx.queued_frame = Some(frame),
1138                Err(TryRecvError::Disconnected) => {
1139                    if !ctx.fin_or_reset_sent &&
1140                        ctx.associated_dgram_flow_id.is_none()
1141                    // The channel might be closed if the stream was used to
1142                    // initiate a datagram exchange.
1143                    // TODO: ideally, the application would still shut down the
1144                    // stream properly. Once application code
1145                    // is fixed, we can remove this check.
1146                    {
1147                        // The channel closed without having written a fin. Send a
1148                        // RESET_STREAM to indicate we won't be writing anything
1149                        // else
1150                        let err = h3::WireErrorCode::RequestCancelled as u64;
1151                        let _ = qconn.stream_shutdown(
1152                            stream_id,
1153                            quiche::Shutdown::Write,
1154                            err,
1155                        );
1156                        ctx.handle_sent_reset(err);
1157                        if ctx.both_directions_done() {
1158                            return self.cleanup_stream(qconn, stream_id);
1159                        }
1160                    }
1161                    break;
1162                },
1163                Err(TryRecvError::Empty) => {
1164                    self.waiting_streams.push(ctx.wait_for_recv(stream_id));
1165                    break;
1166                },
1167            }
1168        }
1169
1170        Ok(())
1171    }
1172
1173    /// Tests `qconn` for either a local or peer error and increments
1174    /// the associated HTTP/3 or QUIC error counter.
1175    fn record_quiche_error(qconn: &mut QuicheConnection, metrics: &impl Metrics) {
1176        // split metrics between local/peer and QUIC/HTTP/3 level errors
1177        if let Some(err) = qconn.local_error() {
1178            if err.is_app {
1179                metrics.local_h3_conn_close_error_count(err.error_code.into())
1180            } else {
1181                metrics.local_quic_conn_close_error_count(err.error_code.into())
1182            }
1183            .inc();
1184        } else if let Some(err) = qconn.peer_error() {
1185            if err.is_app {
1186                metrics.peer_h3_conn_close_error_count(err.error_code.into())
1187            } else {
1188                metrics.peer_quic_conn_close_error_count(err.error_code.into())
1189            }
1190            .inc();
1191        }
1192    }
1193}
1194
1195impl<H: DriverHooks> ApplicationOverQuic for H3Driver<H> {
1196    fn on_conn_established(
1197        &mut self, quiche_conn: &mut QuicheConnection,
1198        handshake_info: &HandshakeInfo,
1199    ) -> QuicResult<()> {
1200        let conn = h3::Connection::with_transport(quiche_conn, &self.h3_config)?;
1201        self.conn = Some(conn);
1202
1203        H::conn_established(self, quiche_conn, handshake_info)?;
1204        Ok(())
1205    }
1206
1207    #[inline]
1208    fn should_act(&self) -> bool {
1209        self.conn.is_some()
1210    }
1211
1212    #[inline]
1213    fn buffer(&mut self) -> &mut [u8] {
1214        &mut self.pooled_buf
1215    }
1216
1217    /// Poll the underlying [`quiche::h3::Connection`] for
1218    /// [`quiche::h3::Event`]s and DATAGRAMs, delegating processing to
1219    /// `Self::process_read_event`.
1220    ///
1221    /// If a DATAGRAM is found, it is sent to the receiver on its channel.
1222    fn process_reads(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
1223        loop {
1224            match self.conn_mut()?.poll(qconn) {
1225                Ok((stream_id, event)) =>
1226                    self.process_read_event(qconn, stream_id, event)?,
1227                Err(h3::Error::Done) => break,
1228                Err(err) => {
1229                    // Don't bubble error up, instead keep the worker loop going
1230                    // until quiche reports the connection is
1231                    // closed.
1232                    log::debug!("connection closed due to h3 protocol error"; "error"=>?err);
1233                    return Ok(());
1234                },
1235            };
1236        }
1237
1238        self.process_available_dgrams(qconn)?;
1239        Ok(())
1240    }
1241
1242    /// Write as much data as possible into the [`quiche::h3::Connection`] from
1243    /// all sources. This will attempt to write any queued frames into their
1244    /// respective streams, if writable.
1245    fn process_writes(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
1246        while let Some(stream_id) = qconn.stream_writable_next() {
1247            self.process_writable_stream(qconn, stream_id)?;
1248        }
1249
1250        // Also optimistically check for any ready streams
1251        while let Some(Some(ready)) = self.waiting_streams.next().now_or_never() {
1252            self.upstream_ready(qconn, ready)?;
1253        }
1254
1255        Ok(())
1256    }
1257
1258    /// Reports connection-level error metrics and forwards
1259    /// IoWorker errors to the associated [H3Controller].
1260    fn on_conn_close<M: Metrics>(
1261        &mut self, quiche_conn: &mut QuicheConnection, metrics: &M,
1262        work_loop_result: &QuicResult<()>,
1263    ) {
1264        let max_stream_seen = self.max_stream_seen;
1265        metrics
1266            .maximum_writable_streams()
1267            .observe(max_stream_seen as f64);
1268
1269        let Err(work_loop_error) = work_loop_result else {
1270            return;
1271        };
1272
1273        Self::record_quiche_error(quiche_conn, metrics);
1274
1275        let Some(h3_err) = work_loop_error.downcast_ref::<H3ConnectionError>()
1276        else {
1277            log::error!("Found non-H3ConnectionError"; "error" => %work_loop_error);
1278            return;
1279        };
1280
1281        if matches!(h3_err, H3ConnectionError::ControllerWentAway) {
1282            // Inform client that we won't (can't) respond anymore
1283            let _ = quiche_conn.close(true, WireErrorCode::NoError as u64, &[]);
1284            return;
1285        }
1286
1287        if let Some(ev) = H3Event::from_error(h3_err) {
1288            let _ = self.h3_event_sender.send(ev.into());
1289            #[expect(clippy::needless_return)]
1290            return; // avoid accidental fallthrough in the future
1291        }
1292    }
1293
1294    /// Wait for incoming data from the [H3Controller]. The next iteration of
1295    /// the I/O loop commences when one of the `select!`ed futures triggers.
1296    #[inline]
1297    async fn wait_for_data(
1298        &mut self, qconn: &mut QuicheConnection,
1299    ) -> QuicResult<()> {
1300        select! {
1301            biased;
1302            Some(ready) = self.waiting_streams.next() => self.upstream_ready(qconn, ready),
1303            Some(dgram) = self.dgram_recv.recv() => self.dgram_ready(qconn, dgram),
1304            Some(cmd) = self.cmd_recv.recv() => H::conn_command(self, qconn, cmd),
1305            r = self.hooks.wait_for_action(qconn), if H::has_wait_action(self) => r,
1306        }?;
1307
1308        // Make sure the controller is not starved, but also not prioritized in
1309        // the biased select: it is polled last above, and we additionally do a
1310        // try_recv here on every iteration.
1311        if let Ok(cmd) = self.cmd_recv.try_recv() {
1312            H::conn_command(self, qconn, cmd)?;
1313        }
1314
1315        Ok(())
1316    }
1317}
1318
1319impl<H: DriverHooks> Drop for H3Driver<H> {
1320    fn drop(&mut self) {
1321        for stream in self.stream_map.values() {
1322            stream
1323                .audit_stats
1324                .set_recvd_stream_fin(StreamClosureKind::Implicit);
1325        }
1326    }
1327}
1328
1329/// [`H3Command`]s are sent by the [H3Controller] to alter the [H3Driver]'s
1330/// state.
1331///
1332/// Both [ServerH3Driver] and [ClientH3Driver] may extend this enum with
1333/// endpoint-specific variants.
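///
/// A sketch of issuing a command through a controller's
/// [`H3Controller::h3_cmd_sender`]; the controller, stream ID and error code
/// are placeholder assumptions:
///
/// ```ignore
/// let cmd_sender = controller.h3_cmd_sender();
/// let _ = cmd_sender.send(H3Command::ShutdownStream {
///     stream_id: 0,
///     shutdown: StreamShutdown::Write {
///         error_code: quiche::h3::WireErrorCode::RequestCancelled as u64,
///     },
/// });
/// ```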
1334#[derive(Debug)]
1335pub enum H3Command {
1336    /// A connection-level command that executes directly on the
1337    /// [`quiche::Connection`].
1338    QuicCmd(QuicCommand),
1339    /// Send a GOAWAY frame to the peer to initiate a graceful connection
1340    /// shutdown.
1341    GoAway,
1342    /// Shuts down a stream in the specified direction(s) and removes it from
1343    /// local state.
1344    ///
1345    /// This removes the stream from local state and sends a `RESET_STREAM`
1346    /// frame (for write direction) and/or a `STOP_SENDING` frame (for read
1347    /// direction) to the peer. See [`quiche::Connection::stream_shutdown`]
1348    /// for details.
1349    ShutdownStream {
1350        stream_id: u64,
1351        shutdown: StreamShutdown,
1352    },
1353}
1354
1355/// Specifies which direction(s) of a stream to shut down.
1356///
1357/// Used with [`H3Controller::shutdown_stream`] and the internal
1358/// `shutdown_stream` function to control whether to send a `STOP_SENDING` frame
1359/// (read direction), and/or a `RESET_STREAM` frame (write direction)
1360///
1361/// Note: Despite its name, "shutdown" here refers to signaling the peer about
1362/// stream termination, not sending a FIN flag. `STOP_SENDING` asks the peer to
1363/// stop sending data, while `RESET_STREAM` abruptly terminates the write side.
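///
/// A sketch of tearing down both directions of a stream with `H3_NO_ERROR`;
/// `controller` and the stream ID are assumptions:
///
/// ```ignore
/// use quiche::h3::WireErrorCode;
///
/// controller.shutdown_stream(4, StreamShutdown::Both {
///     read_error_code: WireErrorCode::NoError as u64,
///     write_error_code: WireErrorCode::NoError as u64,
/// });
/// ```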
1364#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1365pub enum StreamShutdown {
1366    /// Shut down only the read direction (sends `STOP_SENDING` frame with the
1367    /// given error code).
1368    Read { error_code: u64 },
1369    /// Shut down only the write direction (sends `RESET_STREAM` frame with the
1370    /// given error code).
1371    Write { error_code: u64 },
1372    /// Shut down both directions (sends both `STOP_SENDING` and `RESET_STREAM`
1373    /// frames).
1374    Both {
1375        read_error_code: u64,
1376        write_error_code: u64,
1377    },
1378}
1379
1380/// Sends [`H3Command`]s to an [H3Driver]. The sender is typed and internally
1381/// wraps instances of `T` in the appropriate `H3Command` variant.
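///
/// A sketch of cloning a sender out of a controller and moving it into another
/// task; constructing the actual [`QuicCommand`] is out of scope here:
///
/// ```ignore
/// fn spawn_command(controller: &ServerH3Controller, cmd: QuicCommand) {
///     let sender = controller.cmd_sender();
///     tokio::spawn(async move {
///         // This fails only if the driver (and thus the connection) is gone.
///         let _ = sender.send(cmd);
///     });
/// }
/// ```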
1382pub struct RequestSender<C, T> {
1383    sender: UnboundedSender<C>,
1384    // Required to work around dangling type parameter
1385    _r: PhantomData<fn() -> T>,
1386}
1387
1388impl<C, T: Into<C>> RequestSender<C, T> {
1389    /// Send a request to the [H3Driver]. This can only fail if the driver is
1390    /// gone.
1391    #[inline(always)]
1392    pub fn send(&self, v: T) -> Result<(), mpsc::error::SendError<C>> {
1393        self.sender.send(v.into())
1394    }
1395}
1396
1397impl<C, T> Clone for RequestSender<C, T> {
1398    fn clone(&self) -> Self {
1399        Self {
1400            sender: self.sender.clone(),
1401            _r: Default::default(),
1402        }
1403    }
1404}
1405
1406/// Interface to communicate with a paired [H3Driver].
1407///
1408/// An [H3Controller] receives [`H3Event`]s from its driver, which must be
1409/// consumed by the application built on top of the driver to react to incoming
1410/// events. The controller also allows the application to send ad-hoc
1411/// [`H3Command`]s to the driver, which will be processed when the driver waits
1412/// for incoming data.
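///
/// A sketch of splitting the controller: the event stream moves to a dedicated
/// task while the command side stays behind. `Http3Settings::default()` is an
/// assumption:
///
/// ```ignore
/// let (driver, mut controller) = ServerH3Driver::new(Http3Settings::default());
/// let mut events = controller.take_event_receiver();
///
/// tokio::spawn(async move {
///     while let Some(event) = events.recv().await {
///         // Consume `ServerH3Event`s here.
///     }
/// });
///
/// // Later, start a graceful shutdown from the application side.
/// controller.send_goaway();
/// ```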
1413pub struct H3Controller<H: DriverHooks> {
1414    /// Sends [`H3Command`]s to the [H3Driver], like [`QuicCommand`]s or
1415    /// outbound HTTP requests.
1416    cmd_sender: UnboundedSender<H::Command>,
1417    /// Receives [`H3Event`]s from the [H3Driver]. Can be extracted and
1418    /// used independently of the [H3Controller].
1419    h3_event_recv: Option<UnboundedReceiver<H::Event>>,
1420}
1421
1422impl<H: DriverHooks> H3Controller<H> {
1423    /// Gets a mutable reference to the [`H3Event`] receiver for the paired
1424    /// [H3Driver].
1425    pub fn event_receiver_mut(&mut self) -> &mut UnboundedReceiver<H::Event> {
1426        self.h3_event_recv
1427            .as_mut()
1428            .expect("No event receiver on H3Controller")
1429    }
1430
1431    /// Takes the [`H3Event`] receiver for the paired [H3Driver].
1432    pub fn take_event_receiver(&mut self) -> UnboundedReceiver<H::Event> {
1433        self.h3_event_recv
1434            .take()
1435            .expect("No event receiver on H3Controller")
1436    }
1437
1438    /// Creates a [`QuicCommand`] sender for the paired [H3Driver].
1439    pub fn cmd_sender(&self) -> RequestSender<H::Command, QuicCommand> {
1440        RequestSender {
1441            sender: self.cmd_sender.clone(),
1442            _r: Default::default(),
1443        }
1444    }
1445
1446    /// Sends a GOAWAY frame to initiate a graceful connection shutdown.
1447    pub fn send_goaway(&self) {
1448        let _ = self.cmd_sender.send(H3Command::GoAway.into());
1449    }
1450
1451    /// Creates an [`H3Command`] sender for the paired [H3Driver].
1452    pub fn h3_cmd_sender(&self) -> RequestSender<H::Command, H3Command> {
1453        RequestSender {
1454            sender: self.cmd_sender.clone(),
1455            _r: Default::default(),
1456        }
1457    }
1458
1459    /// Shuts down a stream in the specified direction(s) and removes it from
1460    /// local state.
1461    ///
1462    /// This removes the stream from local state and sends a `RESET_STREAM`
1463    /// frame (for write direction) and/or a `STOP_SENDING` frame (for read
1464    /// direction) to the peer, depending on the [`StreamShutdown`] variant.
1465    pub fn shutdown_stream(&self, stream_id: u64, shutdown: StreamShutdown) {
1466        let _ = self.cmd_sender.send(
1467            H3Command::ShutdownStream {
1468                stream_id,
1469                shutdown,
1470            }
1471            .into(),
1472        );
1473    }
1474}