Skip to main content

tokio_quiche/http3/driver/
mod.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27mod client;
28/// Wrapper for running HTTP/3 connections.
29pub mod connection;
30mod datagram;
31// `DriverHooks` must stay private to prevent users from creating their own
32// H3Drivers.
33mod hooks;
34mod server;
35mod streams;
36#[cfg(test)]
37pub mod test_utils;
38#[cfg(test)]
39mod tests;
40
41use std::collections::BTreeMap;
42use std::error::Error;
43use std::fmt;
44use std::marker::PhantomData;
45use std::sync::Arc;
46use std::time::Instant;
47
48use bytes::BufMut as _;
49use bytes::Bytes;
50use bytes::BytesMut;
51use datagram_socket::DgramBuffer;
52use datagram_socket::StreamClosureKind;
53use foundations::telemetry::log;
54use futures::FutureExt;
55use futures_util::stream::FuturesUnordered;
56use quiche::h3;
57use quiche::h3::WireErrorCode;
58use tokio::select;
59use tokio::sync::mpsc;
60use tokio::sync::mpsc::error::TryRecvError;
61use tokio::sync::mpsc::error::TrySendError;
62use tokio::sync::mpsc::UnboundedReceiver;
63use tokio::sync::mpsc::UnboundedSender;
64use tokio_stream::StreamExt;
65use tokio_util::sync::PollSender;
66
67use self::hooks::DriverHooks;
68use self::hooks::InboundHeaders;
69use self::streams::FlowCtx;
70use self::streams::HaveUpstreamCapacity;
71use self::streams::ReceivedDownstreamData;
72use self::streams::StreamCtx;
73use self::streams::StreamReady;
74use self::streams::WaitForDownstreamData;
75use self::streams::WaitForStream;
76use self::streams::WaitForUpstreamCapacity;
77use crate::buf_factory::BufFactory;
78use crate::http3::settings::Http3Settings;
79use crate::http3::H3AuditStats;
80use crate::metrics::Metrics;
81use crate::quic::HandshakeInfo;
82use crate::quic::QuicCommand;
83use crate::quic::QuicheConnection;
84use crate::ApplicationOverQuic;
85use crate::QuicResult;
86
87pub use self::client::ClientEventStream;
88pub use self::client::ClientH3Command;
89pub use self::client::ClientH3Controller;
90pub use self::client::ClientH3Driver;
91pub use self::client::ClientH3Event;
92pub use self::client::ClientRequestSender;
93pub use self::client::NewClientRequest;
94pub use self::server::IsInEarlyData;
95pub use self::server::RawPriorityValue;
96pub use self::server::ServerEventStream;
97pub use self::server::ServerH3Command;
98pub use self::server::ServerH3Controller;
99pub use self::server::ServerH3Driver;
100pub use self::server::ServerH3Event;
101
// The default priority for HTTP/3 responses if the application didn't provide
// one. The arguments to `h3::Priority::new` are the urgency (3) and the
// incremental flag (true).
const DEFAULT_PRIO: h3::Priority = h3::Priority::new(3, true);
105
// For a stream use a channel with 16 entries, which works out to 16 * 64KB =
// 1MB of max buffered data.
//
// Test and debug builds shrink the channel to a single entry instead, so that
// a full channel is hit almost immediately.
#[cfg(not(any(test, debug_assertions)))]
const STREAM_CAPACITY: usize = 16;
#[cfg(any(test, debug_assertions))]
const STREAM_CAPACITY: usize = 1; // Set to 1 to stress write_pending under test conditions
112
// For *all* flows use a shared channel with 2048 entries, which works out
// to 3MB of max buffered data at 1500 bytes per datagram.
//
// The same value is also handed to `FlowCtx::new` whenever a new flow is
// created.
const FLOW_CAPACITY: usize = 2048;
116
/// Used by a local task to send [`OutboundFrame`]s to a peer on the
/// stream or flow associated with this channel.
///
/// This is the sending half; the driver holds the matching
/// `OutboundFrameStream`.
pub type OutboundFrameSender = PollSender<OutboundFrame>;

/// Used internally to receive [`OutboundFrame`]s which should be sent to a peer
/// on the stream or flow associated with this channel.
type OutboundFrameStream = mpsc::Receiver<OutboundFrame>;

/// Used internally to send [`InboundFrame`]s (data) from the peer to a local
/// task on the stream or flow associated with this channel.
type InboundFrameSender = PollSender<InboundFrame>;

/// Used by a local task to receive [`InboundFrame`]s (data) on the stream or
/// flow associated with this channel.
///
/// This is the receiving half; the driver holds the matching
/// `InboundFrameSender`.
pub type InboundFrameStream = mpsc::Receiver<InboundFrame>;
132
/// The error type used internally in [H3Driver].
///
/// Note that [`ApplicationOverQuic`] errors are not exposed to users at this
/// time. The type is public to document the failure modes in [H3Driver].
///
/// Both [`h3::Error`] and [`quiche::Error`] convert into this type via
/// `From`; either one ends up in the [`H3ConnectionError::H3`] variant.
#[derive(Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum H3ConnectionError {
    /// The controller task was shut down and is no longer listening.
    ///
    /// Produced when sending an event on the driver's event channel fails.
    ControllerWentAway,
    /// Other error at the connection, but not stream level.
    ///
    /// QUIC transport errors are stored here wrapped in
    /// `h3::Error::TransportError`.
    H3(h3::Error),
    /// Received data for a stream that was closed or never opened.
    NonexistentStream,
    /// The server's post-accept timeout was hit.
    /// The timeout can be configured in [`Http3Settings`].
    PostAcceptTimeout,
}
150
151impl From<h3::Error> for H3ConnectionError {
152    fn from(err: h3::Error) -> Self {
153        H3ConnectionError::H3(err)
154    }
155}
156
157impl From<quiche::Error> for H3ConnectionError {
158    fn from(err: quiche::Error) -> Self {
159        H3ConnectionError::H3(h3::Error::TransportError(err))
160    }
161}
162
// Only the default `Error` methods are used; no `source()` override is
// provided, so a wrapped `h3::Error` is reachable through the `H3` variant
// only.
impl Error for H3ConnectionError {}
164
165impl fmt::Display for H3ConnectionError {
166    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
167        let s: &dyn fmt::Display = match self {
168            Self::ControllerWentAway => &"controller went away",
169            Self::H3(e) => e,
170            Self::NonexistentStream => &"nonexistent stream",
171            Self::PostAcceptTimeout => &"post accept timeout hit",
172        };
173
174        write!(f, "H3ConnectionError: {s}")
175    }
176}
177
/// Shorthand for a [`Result`] whose error type is [`H3ConnectionError`].
type H3ConnectionResult<T> = Result<T, H3ConnectionError>;
179
/// HTTP/3 headers that were received on a stream.
///
/// `recv` is used to read the message body, while `send` is used to transmit
/// data back to the peer.
///
/// The manual [`fmt::Debug`] implementation omits the `send` and `recv`
/// channel halves.
pub struct IncomingH3Headers {
    /// Stream ID of the frame.
    pub stream_id: u64,
    /// The actual [`h3::Header`]s which were received.
    pub headers: Vec<h3::Header>,
    /// An [`OutboundFrameSender`] for streaming body data to the peer. For
    /// [ClientH3Driver], note that the request body can also be passed a
    /// cloned sender via [`NewClientRequest`].
    pub send: OutboundFrameSender,
    /// An [`InboundFrameStream`] of body data received from the peer.
    pub recv: InboundFrameStream,
    /// Whether there is a body associated with the incoming headers.
    ///
    // NOTE(review): the field name suggests that `true` means the stream's
    // FIN was already read, i.e. that *no* body follows. Confirm the polarity
    // against the code that populates this field and reword the line above
    // if needed.
    pub read_fin: bool,
    /// Handle to the [`H3AuditStats`] for the message's stream.
    pub h3_audit_stats: Arc<H3AuditStats>,
}
200
201impl fmt::Debug for IncomingH3Headers {
202    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
203        f.debug_struct("IncomingH3Headers")
204            .field("stream_id", &self.stream_id)
205            .field("headers", &self.headers)
206            .field("read_fin", &self.read_fin)
207            .field("h3_audit_stats", &self.h3_audit_stats)
208            .finish()
209    }
210}
211
/// [`H3Event`]s are produced by an [H3Driver] to describe HTTP/3 state updates.
///
/// Both [ServerH3Driver] and [ClientH3Driver] may extend this enum with
/// endpoint-specific variants. The events must be consumed by users of the
/// drivers, like a higher-level `Server` or `Client` controller.
#[derive(Debug)]
pub enum H3Event {
    /// A SETTINGS frame was received.
    IncomingSettings {
        /// Raw HTTP/3 setting pairs, in the order received from the peer.
        settings: Vec<(u64, u64)>,
    },

    /// A HEADERS frame was received on the given stream. This is either a
    /// request or a response depending on the perspective of the [`H3Event`]
    /// receiver.
    IncomingHeaders(IncomingH3Headers),

    /// A DATAGRAM flow was created and associated with the given `flow_id`.
    /// This event is fired before a HEADERS event for CONNECT[-UDP] requests.
    NewFlow {
        /// Flow ID of the new flow.
        flow_id: u64,
        /// An [`OutboundFrameSender`] for transmitting datagrams to the peer.
        send: OutboundFrameSender,
        /// An [`InboundFrameStream`] for receiving datagrams from the peer.
        recv: InboundFrameStream,
    },
    /// A RST_STREAM frame was seen on the given `stream_id`. The user of the
    /// driver should clean up any state allocated for this stream.
    ResetStream {
        /// Stream ID of the stream that was reset.
        stream_id: u64,
    },
    /// The connection has irrecoverably errored and is shutting down.
    ConnectionError(h3::Error),
    /// The connection has been shutdown, optionally due to an
    /// [`H3ConnectionError`].
    ConnectionShutdown(Option<H3ConnectionError>),
    /// Body data has been received over a stream.
    BodyBytesReceived {
        /// Stream ID of the body data.
        stream_id: u64,
        /// Number of bytes received.
        num_bytes: u64,
        /// Whether the stream is finished and won't yield any more data.
        fin: bool,
    },
    /// The stream has been closed. This is used to signal stream closures that
    /// don't result from RST_STREAM frames, unlike the
    /// [`H3Event::ResetStream`] variant.
    StreamClosed {
        /// Stream ID of the stream that was closed.
        stream_id: u64,
    },
    /// A GOAWAY frame was received from the peer containing `id`,
    /// as described in <https://datatracker.ietf.org/doc/html/rfc9114#section-5.2>.
    GoAway {
        /// The identifier carried by the GOAWAY frame.
        id: u64,
    },
}
265
266impl H3Event {
267    /// Generates an event from an applicable [`H3ConnectionError`].
268    fn from_error(err: &H3ConnectionError) -> Option<Self> {
269        Some(match err {
270            H3ConnectionError::H3(e) => Self::ConnectionError(*e),
271            H3ConnectionError::PostAcceptTimeout => Self::ConnectionShutdown(
272                Some(H3ConnectionError::PostAcceptTimeout),
273            ),
274            _ => return None,
275        })
276    }
277}
278
/// An [`OutboundFrame`] is a data frame that should be sent from a local task
/// to a peer over a [`quiche::h3::Connection`].
///
/// This is used, for example, to send response body data to a peer, or proxied
/// UDP datagrams.
#[derive(Debug)]
pub enum OutboundFrame {
    /// Response headers to be sent to the peer, with optional priority.
    Headers(Vec<h3::Header>, Option<quiche::h3::Priority>),
    /// Response body/CONNECT downstream data plus FIN flag.
    Body(Bytes, bool),
    /// CONNECT-UDP (DATAGRAM) downstream data plus flow ID.
    Datagram(DgramBuffer, u64),
    /// Close the stream with trailers, with optional priority.
    Trailers(Vec<h3::Header>, Option<quiche::h3::Priority>),
    /// An error encountered when serving the request. Stream should be closed.
    PeerStreamError,
    /// DATAGRAM flow explicitly closed.
    FlowShutdown {
        /// Flow ID of the flow being shut down.
        flow_id: u64,
        /// Stream ID given alongside the flow.
        // NOTE(review): presumably the stream that established the flow —
        // confirm against the code that handles this variant.
        stream_id: u64,
    },
}
299
/// An [`InboundFrame`] is a data frame that was received from the peer over a
/// [`quiche::h3::Connection`]. It carries body data or datagrams from the peer
/// to the local task.
#[derive(Debug)]
pub enum InboundFrame {
    /// Request body/CONNECT upstream data plus FIN flag.
    ///
    /// End-of-body is signalled with an empty buffer and the flag set to
    /// `true`.
    Body(BytesMut, bool),
    /// CONNECT-UDP (DATAGRAM) upstream data.
    Datagram(DgramBuffer),
}
310
/// A ready-made [`ApplicationOverQuic`] which can handle HTTP/3 and MASQUE.
/// Depending on the `DriverHooks` in use, it powers either a client or a
/// server.
///
/// Use the [ClientH3Driver] and [ServerH3Driver] aliases to access the
/// respective driver types. The driver is passed into an I/O loop and
/// communicates with the driver's user (e.g., an HTTP client or a server) via
/// its associated [H3Controller]. The controller allows the application to both
/// listen for [`H3Event`]s of note and send [`H3Command`]s into the I/O loop.
pub struct H3Driver<H: DriverHooks> {
    /// Configuration used to initialize `conn`. Created from [`Http3Settings`]
    /// in the constructor.
    h3_config: h3::Config,
    /// The underlying HTTP/3 connection. Initialized in
    /// `ApplicationOverQuic::on_conn_established`; `None` until then.
    conn: Option<h3::Connection>,
    /// State required by the client/server hooks.
    hooks: H,
    /// Sends [`H3Event`]s to the [H3Controller] paired with this driver.
    h3_event_sender: mpsc::UnboundedSender<H::Event>,
    /// Receives [`H3Command`]s from the [H3Controller] paired with this driver.
    cmd_recv: mpsc::UnboundedReceiver<H::Command>,
    /// A sender that feeds back into `cmd_recv`. Used by hooks that need to
    /// re-queue commands (e.g. retrying blocked requests) without access to
    /// the [H3Controller]'s copy of the sender.
    cmd_sender: mpsc::UnboundedSender<H::Command>,

    /// A map of stream IDs to their [StreamCtx]. This is mainly used to
    /// retrieve the internal Tokio channels associated with the stream.
    stream_map: BTreeMap<u64, StreamCtx>,
    /// A map of flow IDs to their [FlowCtx]. This is mainly used to retrieve
    /// the internal Tokio channels associated with the flow.
    flow_map: BTreeMap<u64, FlowCtx>,
    /// Set of [`WaitForStream`] futures. A stream is added to this set if
    /// we need to send to it and its channel is at capacity, or if we need
    /// data from its channel and the channel is empty.
    waiting_streams: FuturesUnordered<WaitForStream>,

    /// Receives [`OutboundFrame`]s from all datagram flows on the connection.
    dgram_recv: OutboundFrameStream,
    /// Keeps the datagram channel open such that datagram flows can be created.
    /// Every new flow receives a clone of this sender.
    dgram_send: OutboundFrameSender,
    /// A buffer to receive H3 body data from quiche. We initialize a large
    /// buffer and then `split()` off filled parts until we need to reallocate.
    /// The buffer is limited to `BufFactory::MAX_BUF_SIZE` bytes.
    body_recv_buf: bytes::buf::Limit<BytesMut>,

    /// The buffer used to interact with the underlying IoWorker.
    io_worker_buf: Vec<u8>,
    /// The maximum HTTP/3 stream ID seen on this connection. Updated whenever
    /// a stream is added to `stream_map`.
    max_stream_seen: u64,

    /// Tracks whether we have forwarded the HTTP/3 SETTINGS frame
    /// to the [H3Controller] once.
    settings_received_and_forwarded: bool,
    /// Tracks whether the H3 event receiver has been dropped.
    /// Used to avoid busy-looping on `h3_event_sender.closed()`.
    h3_event_receiver_dropped: bool,
}
369
370impl<H: DriverHooks> H3Driver<H> {
371    /// Builds a new [H3Driver] and an associated [H3Controller].
372    ///
373    /// The driver should then be passed to
374    /// [`InitialQuicConnection`](crate::InitialQuicConnection)'s `start`
375    /// method.
376    pub fn new(http3_settings: Http3Settings) -> (Self, H3Controller<H>) {
377        let (dgram_send, dgram_recv) = mpsc::channel(FLOW_CAPACITY);
378        let (cmd_sender, cmd_recv) = mpsc::unbounded_channel();
379        let (h3_event_sender, h3_event_recv) = mpsc::unbounded_channel();
380
381        (
382            H3Driver {
383                h3_config: (&http3_settings).into(),
384                conn: None,
385                hooks: H::new(&http3_settings),
386                h3_event_sender,
387                cmd_recv,
388                cmd_sender: cmd_sender.clone(),
389
390                stream_map: BTreeMap::new(),
391                flow_map: BTreeMap::new(),
392
393                dgram_recv,
394                dgram_send: PollSender::new(dgram_send),
395                max_stream_seen: 0,
396                body_recv_buf: BytesMut::with_capacity(BufFactory::MAX_BUF_SIZE)
397                    .limit(BufFactory::MAX_BUF_SIZE),
398                io_worker_buf: vec![0u8; BufFactory::MAX_BUF_SIZE],
399
400                waiting_streams: FuturesUnordered::new(),
401
402                settings_received_and_forwarded: false,
403                h3_event_receiver_dropped: false,
404            },
405            H3Controller {
406                cmd_sender,
407                h3_event_recv: Some(h3_event_recv),
408            },
409        )
410    }
411
    /// Returns a sender that feeds back into this driver's own `cmd_recv`.
    ///
    /// Hooks that need to re-queue commands (e.g. retrying a request that
    /// was temporarily blocked) can use this sender without needing access
    /// to the paired [H3Controller].
    ///
    /// The sender is returned by reference; callers that need an owned handle
    /// have to clone it.
    pub(crate) fn self_cmd_sender(&self) -> &mpsc::UnboundedSender<H::Command> {
        &self.cmd_sender
    }
420
421    /// Retrieve the [FlowCtx] associated with the given `flow_id`. If no
422    /// context is found, a new one will be created.
423    fn get_or_insert_flow(
424        &mut self, flow_id: u64,
425    ) -> H3ConnectionResult<&mut FlowCtx> {
426        use std::collections::btree_map::Entry;
427        Ok(match self.flow_map.entry(flow_id) {
428            Entry::Vacant(e) => {
429                // This is a datagram for a new flow we haven't seen before
430                let (flow, recv) = FlowCtx::new(FLOW_CAPACITY);
431                let flow_req = H3Event::NewFlow {
432                    flow_id,
433                    recv,
434                    send: self.dgram_send.clone(),
435                };
436                self.h3_event_sender
437                    .send(flow_req.into())
438                    .map_err(|_| H3ConnectionError::ControllerWentAway)?;
439                e.insert(flow)
440            },
441            Entry::Occupied(e) => e.into_mut(),
442        })
443    }
444
445    /// Adds a [StreamCtx] to the stream map with the given `stream_id`.
446    fn insert_stream(&mut self, stream_id: u64, ctx: StreamCtx) {
447        self.stream_map.insert(stream_id, ctx);
448        self.max_stream_seen = self.max_stream_seen.max(stream_id);
449    }
450
    /// Fetches body chunks from the [`quiche::h3::Connection`] and forwards
    /// them to the stream's associated [`InboundFrameStream`].
    ///
    /// Chunks are read in a loop until there is nothing more to read, the
    /// stream's inbound channel is full or closed, the peer reset the stream,
    /// or an error occurs. If a fin or reset was already recorded for the
    /// stream, an empty end-of-body frame is delivered instead of reading.
    ///
    /// # Errors
    ///
    /// Returns [`H3ConnectionError::NonexistentStream`] if `stream_id` is not
    /// in the stream map, and [`H3ConnectionError::ControllerWentAway`] if a
    /// [`H3Event::ResetStream`] cannot be delivered. Errors from
    /// `cleanup_stream` and `process_h3_fin` are passed through.
    fn process_h3_data(
        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
    ) -> H3ConnectionResult<()> {
        // Split self borrow between conn and stream_map
        let conn = self.conn.as_mut().ok_or(Self::connection_not_present())?;
        let ctx = self
            .stream_map
            .get_mut(&stream_id)
            .ok_or(H3ConnectionError::NonexistentStream)?;

        // Outcome of the read loop below.
        enum StreamStatus {
            // Reading stopped; `close` requests that the stream be cleaned up.
            Done { close: bool },
            // The peer reset the stream with the given wire error code.
            Reset { wire_err_code: u64 },
            // The inbound channel is full while there is still something to
            // deliver; the stream gets parked on `waiting_streams`.
            Blocked,
        }

        let status = loop {
            let Some(sender) = ctx.send.as_ref().and_then(PollSender::get_ref)
            else {
                // already waiting for capacity
                break StreamStatus::Done { close: false };
            };

            // Reserve a channel slot *before* reading from quiche, so that a
            // chunk is only read when it can be handed over.
            let try_reserve_result = sender.try_reserve();
            let permit = match try_reserve_result {
                Ok(permit) => permit,
                Err(TrySendError::Closed(())) => {
                    // The channel has closed before we delivered a fin or reset
                    // to the application.
                    //
                    // The channel might also be closed if the stream was used
                    // to initiate a datagram exchange; that case is excluded
                    // by the flow ID check.
                    // TODO: ideally, the application would still shut down the
                    // stream properly. Once applications code
                    // is fixed, we can remove this check.
                    if !ctx.fin_or_reset_recv &&
                        ctx.associated_dgram_flow_id.is_none()
                    {
                        let err = h3::WireErrorCode::RequestCancelled as u64;
                        // The result of the shutdown call is ignored.
                        let _ = qconn.stream_shutdown(
                            stream_id,
                            quiche::Shutdown::Read,
                            err,
                        );
                        drop(try_reserve_result); // needed to drop the borrow on ctx.
                        ctx.handle_sent_stop_sending(err);
                        // TODO: should we send an H3Event event to
                        // h3_event_sender? We can only get here if the app
                        // actively closed or dropped
                        // the channel so any event we send would be more for
                        // logging or auditing
                    }
                    break StreamStatus::Done {
                        close: ctx.both_directions_done(),
                    };
                },
                Err(TrySendError::Full(())) => {
                    // Only wait for capacity if there is something left to
                    // deliver: a pending fin/reset or readable stream data.
                    if ctx.fin_or_reset_recv || qconn.stream_readable(stream_id) {
                        break StreamStatus::Blocked;
                    }
                    break StreamStatus::Done { close: false };
                },
            };

            if ctx.fin_or_reset_recv {
                // Signal end-of-body to upstream
                permit.send(InboundFrame::Body(Default::default(), true));
                break StreamStatus::Done {
                    close: ctx.fin_or_reset_sent,
                };
            }

            // NOTE: `self.body_recv_buf` is `Limit<BytesMut>` so
            // `has_remaining_mut()` will indicate if the buffer
            // has space available until the *limit* is
            // reached. (A plain `BytesMut` can reallocate and would always
            // return true)
            if !self.body_recv_buf.has_remaining_mut() {
                self.body_recv_buf =
                    BytesMut::with_capacity(BufFactory::MAX_BUF_SIZE)
                        .limit(BufFactory::MAX_BUF_SIZE)
            };
            match conn.recv_body_buf(qconn, stream_id, &mut self.body_recv_buf) {
                Ok(n) => {
                    ctx.audit_stats.add_downstream_bytes_recvd(n as u64);
                    let event = H3Event::BodyBytesReceived {
                        stream_id,
                        num_bytes: n as u64,
                        fin: false,
                    };
                    // A failure to deliver this event is ignored.
                    let _ = self.h3_event_sender.send(event.into());
                    // Take the filled part, leave the remaining capacity
                    let filled_body = self.body_recv_buf.get_mut().split();
                    // Sanity check: the remaining spare capacity should equal
                    // the limit.
                    debug_assert_eq!(
                        self.body_recv_buf.get_mut().spare_capacity_mut().len(),
                        self.body_recv_buf.remaining_mut()
                    );
                    permit.send(InboundFrame::Body(filled_body, false));
                },
                // Nothing more to read right now.
                Err(h3::Error::Done) =>
                    break StreamStatus::Done { close: false },
                Err(h3::Error::TransportError(quiche::Error::StreamReset(
                    code,
                ))) => {
                    break StreamStatus::Reset {
                        wire_err_code: code,
                    };
                },
                // Any other error: stop reading and clean up the stream.
                Err(_) => break StreamStatus::Done { close: true },
            }
        };

        match status {
            StreamStatus::Done { close } => {
                if close {
                    return self.cleanup_stream(qconn, stream_id);
                }

                // The QUIC stream is finished, manually invoke `process_h3_fin`
                // in case `h3::poll()` is never called again.
                //
                // Note that this case will not conflict with StreamStatus::Done
                // being returned due to the body channel being
                // blocked. qconn.stream_finished() will guarantee
                // that we've fully parsed the body as it only returns true
                // if we've seen a Fin for the read half of the stream.
                if !ctx.fin_or_reset_recv && qconn.stream_finished(stream_id) {
                    return self.process_h3_fin(qconn, stream_id);
                }
            },
            StreamStatus::Reset { wire_err_code } => {
                debug_assert!(ctx.send.is_some());
                ctx.handle_recvd_reset(wire_err_code);
                self.h3_event_sender
                    .send(H3Event::ResetStream { stream_id }.into())
                    .map_err(|_| H3ConnectionError::ControllerWentAway)?;
                if ctx.both_directions_done() {
                    return self.cleanup_stream(qconn, stream_id);
                }
            },
            StreamStatus::Blocked => {
                // Park the stream until its inbound channel has capacity.
                self.waiting_streams.push(ctx.wait_for_send(stream_id));
            },
        }

        Ok(())
    }
601
602    /// Processes an end-of-stream event from the [`quiche::h3::Connection`].
603    fn process_h3_fin(
604        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
605    ) -> H3ConnectionResult<()> {
606        let ctx = self
607            .stream_map
608            .get_mut(&stream_id)
609            .filter(|c| !c.fin_or_reset_recv);
610        let Some(ctx) = ctx else {
611            // Stream is already finished, nothing to do
612            return Ok(());
613        };
614
615        ctx.fin_or_reset_recv = true;
616        ctx.audit_stats
617            .set_recvd_stream_fin(StreamClosureKind::Explicit);
618
619        // It's important to send this H3Event before process_h3_data so that
620        // a server can (potentially) generate the control response before the
621        // corresponding receiver drops.
622        let event = H3Event::BodyBytesReceived {
623            stream_id,
624            num_bytes: 0,
625            fin: true,
626        };
627        let _ = self.h3_event_sender.send(event.into());
628
629        // Communicate fin to upstream. Since `ctx.fin_recv` is true now,
630        // there can't be a recursive loop.
631        self.process_h3_data(qconn, stream_id)
632    }
633
634    /// Processes a single [`quiche::h3::Event`] received from the underlying
635    /// [`quiche::h3::Connection`]. Some events are dispatched to helper
636    /// methods.
637    fn process_read_event(
638        &mut self, qconn: &mut QuicheConnection, stream_id: u64, event: h3::Event,
639    ) -> H3ConnectionResult<()> {
640        self.forward_settings()?;
641
642        match event {
643            // Requests/responses are exclusively handled by hooks.
644            h3::Event::Headers { list, more_frames } =>
645                H::headers_received(self, qconn, InboundHeaders {
646                    stream_id,
647                    headers: list,
648                    has_body: more_frames,
649                }),
650
651            h3::Event::Data => self.process_h3_data(qconn, stream_id),
652            h3::Event::Finished => self.process_h3_fin(qconn, stream_id),
653
654            h3::Event::Reset(code) => {
655                if let Some(ctx) = self.stream_map.get_mut(&stream_id) {
656                    ctx.handle_recvd_reset(code);
657                    // See if we are waiting on this stream and close the channel
658                    // if we are. If we are not waiting, `handle_recvd_reset()`
659                    // will have taken care of closing.
660                    for pending in self.waiting_streams.iter_mut() {
661                        match pending {
662                            WaitForStream::Upstream(
663                                WaitForUpstreamCapacity {
664                                    stream_id: id,
665                                    chan: Some(chan),
666                                },
667                            ) if stream_id == *id => {
668                                chan.close();
669                            },
670                            _ => {},
671                        }
672                    }
673
674                    self.h3_event_sender
675                        .send(H3Event::ResetStream { stream_id }.into())
676                        .map_err(|_| H3ConnectionError::ControllerWentAway)?;
677                    if ctx.both_directions_done() {
678                        return self.cleanup_stream(qconn, stream_id);
679                    }
680                }
681
682                // TODO: if we don't have the stream in our map: should we
683                // send the H3Event::ResetStream?
684                Ok(())
685            },
686
687            h3::Event::PriorityUpdate => Ok(()),
688            h3::Event::GoAway => {
689                self.h3_event_sender
690                    .send(H3Event::GoAway { id: stream_id }.into())
691                    .map_err(|_| H3ConnectionError::ControllerWentAway)?;
692                Ok(())
693            },
694        }
695    }
696
697    /// The SETTINGS frame can be received at any point, so we
698    /// need to check `peer_settings_raw` to decide if we've received it.
699    ///
700    /// Settings should only be sent once, so we generate a single event
701    /// when `peer_settings_raw` transitions from None to Some.
702    fn forward_settings(&mut self) -> H3ConnectionResult<()> {
703        if self.settings_received_and_forwarded {
704            return Ok(());
705        }
706
707        // capture the peer settings and forward it
708        if let Some(settings) = self.conn_mut()?.peer_settings_raw() {
709            let incoming_settings = H3Event::IncomingSettings {
710                settings: settings.to_vec(),
711            };
712
713            self.h3_event_sender
714                .send(incoming_settings.into())
715                .map_err(|_| H3ConnectionError::ControllerWentAway)?;
716
717            self.settings_received_and_forwarded = true;
718        }
719        Ok(())
720    }
721
722    /// Send an individual frame to the underlying [`quiche::h3::Connection`] to
723    /// be flushed at a later time.
724    ///
725    /// `Self::process_writes` will iterate over all writable streams and call
726    /// this method in a loop for each stream to send all writable packets.
727    fn process_write_frame(
728        conn: &mut h3::Connection, qconn: &mut QuicheConnection,
729        ctx: &mut StreamCtx,
730    ) -> h3::Result<()> {
731        let Some(frame) = &mut ctx.queued_frame else {
732            return Ok(());
733        };
734
735        let audit_stats = &ctx.audit_stats;
736        let stream_id = audit_stats.stream_id();
737
738        match frame {
739            OutboundFrame::Headers(headers, priority) => {
740                let prio = priority.as_ref().unwrap_or(&DEFAULT_PRIO);
741
742                let res = if ctx.initial_headers_sent {
743                    // Initial headers were already sent, send additional
744                    // headers now.
745                    conn.send_additional_headers_with_priority(
746                        qconn, stream_id, headers, prio, false, false,
747                    )
748                } else {
749                    // Send initial headers.
750                    conn.send_response_with_priority(
751                        qconn, stream_id, headers, prio, false,
752                    )
753                    .inspect(|_| ctx.initial_headers_sent = true)
754                };
755
756                if let Err(h3::Error::StreamBlocked) = res {
757                    ctx.first_full_headers_flush_fail_time
758                        .get_or_insert(Instant::now());
759                }
760
761                if res.is_ok() {
762                    if let Some(first) =
763                        ctx.first_full_headers_flush_fail_time.take()
764                    {
765                        ctx.audit_stats.add_header_flush_duration(
766                            Instant::now().duration_since(first),
767                        );
768                    }
769                }
770
771                res
772            },
773
774            OutboundFrame::Body(body, fin) => {
775                let len = body.len();
776                if len == 0 && !*fin {
777                    // quiche doesn't allow sending an empty body when the fin
778                    // flag is not set
779                    return Ok(());
780                }
781                if *fin {
782                    // If this is the last body frame, drop the receiver in the
783                    // stream map to signal that we shouldn't receive any more
784                    // frames. NOTE: we can't use `mpsc::Receiver::close()`
785                    // due to an inconsistency in how tokio handles reading
786                    // from a closed mpsc channel https://github.com/tokio-rs/tokio/issues/7631
787                    ctx.recv = None;
788                }
789                let n = conn.send_body_zc(qconn, stream_id, body, *fin)?;
790
791                audit_stats.add_downstream_bytes_sent(n as _);
792                if n != len {
793                    // Couldn't write the entire body, `send_body_zc` will
794                    // have trimmed `body` accordingly. The driver keeps
795                    // the remainder of the body to send in the future.
796                    debug_assert_eq!(
797                        n + body.len(),
798                        len,
799                        "send_body_zc() should have trimmed body but did not"
800                    );
801                    Err(h3::Error::StreamBlocked)
802                } else {
803                    if *fin {
804                        Self::on_fin_sent(ctx)?;
805                    }
806                    Ok(())
807                }
808            },
809
810            OutboundFrame::Trailers(headers, priority) => {
811                let prio = priority.as_ref().unwrap_or(&DEFAULT_PRIO);
812
813                // trailers always set fin=true
814                let res = conn.send_additional_headers_with_priority(
815                    qconn, stream_id, headers, prio, true, true,
816                );
817
818                if res.is_ok() {
819                    Self::on_fin_sent(ctx)?;
820                }
821                res
822            },
823
824            OutboundFrame::PeerStreamError => Err(h3::Error::MessageError),
825
826            OutboundFrame::FlowShutdown { .. } => {
827                unreachable!("Only flows send shutdowns")
828            },
829
830            OutboundFrame::Datagram(..) => {
831                unreachable!("Only flows send datagrams")
832            },
833        }
834    }
835
836    fn on_fin_sent(ctx: &mut StreamCtx) -> h3::Result<()> {
837        ctx.recv = None;
838        ctx.fin_or_reset_sent = true;
839        ctx.audit_stats
840            .set_sent_stream_fin(StreamClosureKind::Explicit);
841        if ctx.fin_or_reset_recv {
842            // Return a TransportError to trigger stream cleanup
843            // instead of h3::Error::Done
844            Err(h3::Error::TransportError(quiche::Error::Done))
845        } else {
846            Ok(())
847        }
848    }
849
850    /// Resumes reads or writes to the connection when a stream channel becomes
851    /// unblocked.
852    ///
853    /// If we were waiting for more data from a channel, we resume writing to
854    /// the connection. Otherwise, we were blocked on channel capacity and
855    /// continue reading from the connection. `Upstream` in this context is
856    /// the consumer of the stream.
857    fn upstream_ready(
858        &mut self, qconn: &mut QuicheConnection, ready: StreamReady,
859    ) -> H3ConnectionResult<()> {
860        match ready {
861            StreamReady::Downstream(r) => self.upstream_read_ready(qconn, r),
862            StreamReady::Upstream(r) => self.upstream_write_ready(qconn, r),
863        }
864    }
865
866    fn upstream_read_ready(
867        &mut self, qconn: &mut QuicheConnection,
868        read_ready: ReceivedDownstreamData,
869    ) -> H3ConnectionResult<()> {
870        let ReceivedDownstreamData {
871            stream_id,
872            chan,
873            data,
874        } = read_ready;
875
876        match self.stream_map.get_mut(&stream_id) {
877            None => Ok(()),
878            Some(stream) => {
879                stream.recv = Some(chan);
880                stream.queued_frame = data;
881                self.process_writable_stream(qconn, stream_id)
882            },
883        }
884    }
885
886    fn upstream_write_ready(
887        &mut self, qconn: &mut QuicheConnection,
888        write_ready: HaveUpstreamCapacity,
889    ) -> H3ConnectionResult<()> {
890        let HaveUpstreamCapacity {
891            stream_id,
892            mut chan,
893        } = write_ready;
894
895        match self.stream_map.get_mut(&stream_id) {
896            None => Ok(()),
897            Some(stream) => {
898                chan.abort_send(); // Have to do it to release the associated permit
899                stream.send = Some(chan);
900                self.process_h3_data(qconn, stream_id)
901            },
902        }
903    }
904
905    /// Processes all queued outbound datagrams from the `dgram_recv` channel.
906    fn dgram_ready(
907        &mut self, qconn: &mut QuicheConnection, frame: OutboundFrame,
908    ) -> H3ConnectionResult<()> {
909        let mut frame = Ok(frame);
910
911        loop {
912            match frame {
913                Ok(OutboundFrame::Datagram(dgram, flow_id)) => {
914                    // Drop datagrams if there is no capacity
915                    let _ = datagram::send_h3_dgram(qconn, flow_id, dgram);
916                },
917                Ok(OutboundFrame::FlowShutdown { flow_id, stream_id }) => {
918                    self.shutdown_stream(
919                        qconn,
920                        stream_id,
921                        StreamShutdown::Both {
922                            read_error_code: WireErrorCode::NoError as u64,
923                            write_error_code: WireErrorCode::NoError as u64,
924                        },
925                    )?;
926                    self.flow_map.remove(&flow_id);
927                    self.close_if_idle(qconn);
928                    break;
929                },
930                Ok(_) => unreachable!("Flows can't send frame of other types"),
931                Err(TryRecvError::Empty) => break,
932                Err(TryRecvError::Disconnected) =>
933                    return Err(H3ConnectionError::ControllerWentAway),
934            }
935
936            frame = self.dgram_recv.try_recv();
937        }
938
939        Ok(())
940    }
941
942    /// Return a mutable reference to the driver's HTTP/3 connection.
943    ///
944    /// If the connection doesn't exist yet, this function returns
945    /// a `Self::connection_not_present()` error.
946    fn conn_mut(&mut self) -> H3ConnectionResult<&mut h3::Connection> {
947        self.conn.as_mut().ok_or(Self::connection_not_present())
948    }
949
950    /// Alias for [`quiche::Error::TlsFail`], which is used in the case where
951    /// this driver doesn't have an established HTTP/3 connection attached
952    /// to it yet.
953    const fn connection_not_present() -> H3ConnectionError {
954        H3ConnectionError::H3(h3::Error::TransportError(quiche::Error::TlsFail))
955    }
956
957    /// Cleans up internal state for the indicated HTTP/3 stream.
958    ///
959    /// This function removes the stream from the stream map, closes any pending
960    /// futures, removes associated DATAGRAM flows, and sends a
961    /// [`H3Event::StreamClosed`] event (for servers).
962    fn cleanup_stream(
963        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
964    ) -> H3ConnectionResult<()> {
965        let Some(stream_ctx) = self.stream_map.remove(&stream_id) else {
966            return Ok(());
967        };
968
969        // Find if the stream also has any pending futures associated with it
970        for pending in self.waiting_streams.iter_mut() {
971            match pending {
972                WaitForStream::Downstream(WaitForDownstreamData {
973                    stream_id: id,
974                    chan: Some(chan),
975                }) if stream_id == *id => {
976                    chan.close();
977                },
978                WaitForStream::Upstream(WaitForUpstreamCapacity {
979                    stream_id: id,
980                    chan: Some(chan),
981                }) if stream_id == *id => {
982                    chan.close();
983                },
984                _ => {},
985            }
986        }
987
988        // Close any DATAGRAM-proxying channels when we close the stream, if they
989        // exist
990        if let Some(mapped_flow_id) = stream_ctx.associated_dgram_flow_id {
991            self.flow_map.remove(&mapped_flow_id);
992        }
993
994        if qconn.is_server() {
995            // Signal the server to remove the stream from its map
996            let _ = self
997                .h3_event_sender
998                .send(H3Event::StreamClosed { stream_id }.into());
999        }
1000
1001        self.close_if_idle(qconn);
1002
1003        Ok(())
1004    }
1005
1006    /// Closes the connection with `NoError` if the H3 event receiver
1007    /// has been dropped and there are no active streams or flows.
1008    fn close_if_idle(&self, qconn: &mut QuicheConnection) {
1009        if self.h3_event_receiver_dropped &&
1010            self.stream_map.is_empty() &&
1011            self.flow_map.is_empty()
1012        {
1013            let _ =
1014                qconn.close(true, quiche::h3::WireErrorCode::NoError as u64, &[]);
1015        }
1016    }
1017
1018    /// Shuts down the indicated HTTP/3 stream by sending frames and cleaning
1019    /// up then cleans up internal state by calling
1020    /// [`Self::cleanup_stream`].
1021    fn shutdown_stream(
1022        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
1023        shutdown: StreamShutdown,
1024    ) -> H3ConnectionResult<()> {
1025        let Some(stream_ctx) = self.stream_map.get(&stream_id) else {
1026            return Ok(());
1027        };
1028
1029        let audit_stats = &stream_ctx.audit_stats;
1030
1031        match shutdown {
1032            StreamShutdown::Read { error_code } => {
1033                audit_stats.set_sent_stop_sending_error_code(error_code as _);
1034                let _ = qconn.stream_shutdown(
1035                    stream_id,
1036                    quiche::Shutdown::Read,
1037                    error_code,
1038                );
1039            },
1040            StreamShutdown::Write { error_code } => {
1041                audit_stats.set_sent_reset_stream_error_code(error_code as _);
1042                let _ = qconn.stream_shutdown(
1043                    stream_id,
1044                    quiche::Shutdown::Write,
1045                    error_code,
1046                );
1047            },
1048            StreamShutdown::Both {
1049                read_error_code,
1050                write_error_code,
1051            } => {
1052                audit_stats
1053                    .set_sent_stop_sending_error_code(read_error_code as _);
1054                let _ = qconn.stream_shutdown(
1055                    stream_id,
1056                    quiche::Shutdown::Read,
1057                    read_error_code,
1058                );
1059                audit_stats
1060                    .set_sent_reset_stream_error_code(write_error_code as _);
1061                let _ = qconn.stream_shutdown(
1062                    stream_id,
1063                    quiche::Shutdown::Write,
1064                    write_error_code,
1065                );
1066            },
1067        }
1068
1069        self.cleanup_stream(qconn, stream_id)
1070    }
1071
1072    /// Handles a regular [`H3Command`]. May be called internally by
1073    /// [DriverHooks] for non-endpoint-specific [`H3Command`]s.
1074    fn handle_core_command(
1075        &mut self, qconn: &mut QuicheConnection, cmd: H3Command,
1076    ) -> H3ConnectionResult<()> {
1077        match cmd {
1078            H3Command::QuicCmd(cmd) => cmd.execute(qconn),
1079            H3Command::GoAway => {
1080                let max_id = self.max_stream_seen;
1081                self.conn_mut()
1082                    .expect("connection should be established")
1083                    .send_goaway(qconn, max_id)?;
1084            },
1085            H3Command::ShutdownStream {
1086                stream_id,
1087                shutdown,
1088            } => {
1089                self.shutdown_stream(qconn, stream_id, shutdown)?;
1090            },
1091        }
1092        Ok(())
1093    }
1094}
1095
1096impl<H: DriverHooks> H3Driver<H> {
1097    /// Reads all buffered datagrams out of `qconn` and distributes them to
1098    /// their flow channels.
1099    fn process_available_dgrams(
1100        &mut self, qconn: &mut QuicheConnection,
1101    ) -> H3ConnectionResult<()> {
1102        loop {
1103            match datagram::receive_h3_dgram(qconn) {
1104                Ok((flow_id, dgram))
1105                    if !qconn.is_server() ||
1106                        self.hooks.extended_connect_enabled() =>
1107                {
1108                    self.get_or_insert_flow(flow_id)?.send_best_effort(dgram);
1109                },
1110                Ok(_) => {},
1111                Err(quiche::Error::Done) => return Ok(()),
1112                Err(err) => return Err(H3ConnectionError::from(err)),
1113            }
1114        }
1115    }
1116
1117    /// Flushes any queued-up frames for `stream_id` into `qconn` until either
1118    /// there is no more capacity in `qconn` or no more frames to send.
1119    fn process_writable_stream(
1120        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
1121    ) -> H3ConnectionResult<()> {
1122        // Split self borrow between conn and stream_map
1123        let conn = self.conn.as_mut().ok_or(Self::connection_not_present())?;
1124        let Some(ctx) = self.stream_map.get_mut(&stream_id) else {
1125            return Ok(()); // Unknown stream_id
1126        };
1127
1128        loop {
1129            // Process each writable frame, queue the next frame for processing
1130            // and shut down any errored streams.
1131            match Self::process_write_frame(conn, qconn, ctx) {
1132                Ok(()) => ctx.queued_frame = None,
1133                Err(h3::Error::StreamBlocked | h3::Error::Done) => break,
1134                Err(h3::Error::MessageError) => {
1135                    return self.shutdown_stream(
1136                        qconn,
1137                        stream_id,
1138                        StreamShutdown::Both {
1139                            read_error_code: WireErrorCode::MessageError as u64,
1140                            write_error_code: WireErrorCode::MessageError as u64,
1141                        },
1142                    );
1143                },
1144                Err(h3::Error::TransportError(quiche::Error::StreamStopped(
1145                    e,
1146                ))) => {
1147                    ctx.handle_recvd_stop_sending(e);
1148                    if ctx.both_directions_done() {
1149                        return self.cleanup_stream(qconn, stream_id);
1150                    } else {
1151                        return Ok(());
1152                    }
1153                },
1154                Err(h3::Error::TransportError(
1155                    quiche::Error::InvalidStreamState(stream),
1156                )) => {
1157                    return self.cleanup_stream(qconn, stream);
1158                },
1159                Err(_) => {
1160                    return self.cleanup_stream(qconn, stream_id);
1161                },
1162            }
1163
1164            let Some(recv) = ctx.recv.as_mut() else {
1165                // This stream is already waiting for data or we wrote a fin and
1166                // closed the channel.
1167                debug_assert!(
1168                    ctx.queued_frame.is_none(),
1169                    "We MUST NOT have a queued frame if we are already waiting on 
1170                    more data from the channel"
1171                );
1172                return Ok(());
1173            };
1174
1175            // Attempt to queue the next frame for processing. The corresponding
1176            // sender is created at the same time as the `StreamCtx`
1177            // and ultimately ends up in an `H3Body`. The body then
1178            // determines which frames to send to the peer via
1179            // this processing loop.
1180            match recv.try_recv() {
1181                Ok(frame) => ctx.queued_frame = Some(frame),
1182                Err(TryRecvError::Disconnected) => {
1183                    if !ctx.fin_or_reset_sent &&
1184                        ctx.associated_dgram_flow_id.is_none()
1185                    // The channel might be closed if the stream was used to
1186                    // initiate a datagram exchange.
1187                    // TODO: ideally, the application would still shut down the
1188                    // stream properly. Once applications code
1189                    // is fixed, we can remove this check.
1190                    {
1191                        // The channel closed without having written a fin. Send a
1192                        // RESET_STREAM to indicate we won't be writing anything
1193                        // else
1194                        let err = h3::WireErrorCode::RequestCancelled as u64;
1195                        let _ = qconn.stream_shutdown(
1196                            stream_id,
1197                            quiche::Shutdown::Write,
1198                            err,
1199                        );
1200                        ctx.handle_sent_reset(err);
1201                        if ctx.both_directions_done() {
1202                            return self.cleanup_stream(qconn, stream_id);
1203                        }
1204                    }
1205                    break;
1206                },
1207                Err(TryRecvError::Empty) => {
1208                    self.waiting_streams.push(ctx.wait_for_recv(stream_id));
1209                    break;
1210                },
1211            }
1212        }
1213
1214        Ok(())
1215    }
1216
1217    /// Tests `qconn` for either a local or peer error and increments
1218    /// the associated HTTP/3 or QUIC error counter.
1219    fn record_quiche_error(qconn: &mut QuicheConnection, metrics: &impl Metrics) {
1220        // split metrics between local/peer and QUIC/HTTP/3 level errors
1221        if let Some(err) = qconn.local_error() {
1222            if err.is_app {
1223                metrics.local_h3_conn_close_error_count(err.error_code.into())
1224            } else {
1225                metrics.local_quic_conn_close_error_count(err.error_code.into())
1226            }
1227            .inc();
1228        } else if let Some(err) = qconn.peer_error() {
1229            if err.is_app {
1230                metrics.peer_h3_conn_close_error_count(err.error_code.into())
1231            } else {
1232                metrics.peer_quic_conn_close_error_count(err.error_code.into())
1233            }
1234            .inc();
1235        }
1236    }
1237}
1238
1239impl<H: DriverHooks> ApplicationOverQuic for H3Driver<H> {
1240    fn on_conn_established(
1241        &mut self, quiche_conn: &mut QuicheConnection,
1242        handshake_info: &HandshakeInfo,
1243    ) -> QuicResult<()> {
1244        let conn = h3::Connection::with_transport(quiche_conn, &self.h3_config)?;
1245        self.conn = Some(conn);
1246
1247        H::conn_established(self, quiche_conn, handshake_info)?;
1248        Ok(())
1249    }
1250
1251    #[inline]
1252    fn should_act(&self) -> bool {
1253        self.conn.is_some()
1254    }
1255
1256    #[inline]
1257    fn buffer(&mut self) -> &mut [u8] {
1258        &mut self.io_worker_buf
1259    }
1260
1261    /// Poll the underlying [`quiche::h3::Connection`] for
1262    /// [`quiche::h3::Event`]s and DATAGRAMs, delegating processing to
1263    /// `Self::process_read_event`.
1264    ///
1265    /// If a DATAGRAM is found, it is sent to the receiver on its channel.
1266    fn process_reads(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
1267        loop {
1268            match self.conn_mut()?.poll(qconn) {
1269                Ok((stream_id, event)) =>
1270                    self.process_read_event(qconn, stream_id, event)?,
1271                Err(h3::Error::Done) => break,
1272                Err(err) => {
1273                    // Don't bubble error up, instead keep the worker loop going
1274                    // until quiche reports the connection is
1275                    // closed.
1276                    log::debug!("connection closed due to h3 protocol error"; "error"=>?err);
1277                    return Ok(());
1278                },
1279            };
1280        }
1281
1282        self.process_available_dgrams(qconn)?;
1283        Ok(())
1284    }
1285
1286    /// Write as much data as possible into the [`quiche::h3::Connection`] from
1287    /// all sources. This will attempt to write any queued frames into their
1288    /// respective streams, if writable.
1289    fn process_writes(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
1290        while let Some(stream_id) = qconn.stream_writable_next() {
1291            self.process_writable_stream(qconn, stream_id)?;
1292        }
1293
1294        // Also optimistically check for any ready streams
1295        while let Some(Some(ready)) = self.waiting_streams.next().now_or_never() {
1296            self.upstream_ready(qconn, ready)?;
1297        }
1298
1299        Ok(())
1300    }
1301
1302    /// Reports connection-level error metrics and forwards
1303    /// IOWorker errors to the associated [H3Controller].
1304    fn on_conn_close<M: Metrics>(
1305        &mut self, quiche_conn: &mut QuicheConnection, metrics: &M,
1306        work_loop_result: &QuicResult<()>,
1307    ) {
1308        let max_stream_seen = self.max_stream_seen;
1309        metrics
1310            .maximum_writable_streams()
1311            .observe(max_stream_seen as f64);
1312
1313        Self::record_quiche_error(quiche_conn, metrics);
1314
1315        let Err(work_loop_error) = work_loop_result else {
1316            return;
1317        };
1318
1319        let Some(h3_err) = work_loop_error.downcast_ref::<H3ConnectionError>()
1320        else {
1321            log::error!("Found non-H3ConnectionError"; "error" => %work_loop_error);
1322            return;
1323        };
1324
1325        if matches!(h3_err, H3ConnectionError::ControllerWentAway) {
1326            // Inform client that we won't (can't) respond anymore
1327            let _ = quiche_conn.close(true, WireErrorCode::NoError as u64, &[]);
1328            return;
1329        }
1330
1331        if let Some(ev) = H3Event::from_error(h3_err) {
1332            let _ = self.h3_event_sender.send(ev.into());
1333            #[expect(clippy::needless_return)]
1334            return; // avoid accidental fallthrough in the future
1335        }
1336    }
1337
1338    /// Wait for incoming data from the [H3Controller]. The next iteration of
1339    /// the I/O loop commences when one of the `select!`ed futures triggers.
1340    #[inline]
1341    async fn wait_for_data(
1342        &mut self, qconn: &mut QuicheConnection,
1343    ) -> QuicResult<()> {
1344        select! {
1345            biased;
1346            Some(ready) = self.waiting_streams.next() => self.upstream_ready(qconn, ready),
1347            Some(dgram) = self.dgram_recv.recv() => self.dgram_ready(qconn, dgram),
1348            Some(cmd) = self.cmd_recv.recv() => H::conn_command(self, qconn, cmd),
1349            r = self.hooks.wait_for_action(qconn), if H::has_wait_action(self) => r,
1350            _ = self.h3_event_sender.closed(), if !self.h3_event_receiver_dropped => {
1351                self.h3_event_receiver_dropped = true;
1352                self.close_if_idle(qconn);
1353                Ok(())
1354            }
1355        }?;
1356
1357        // Make sure controller is not starved, but also not prioritized in the
1358        // biased select. So poll it last, however also perform a try_recv
1359        // each iteration.
1360        if let Ok(cmd) = self.cmd_recv.try_recv() {
1361            H::conn_command(self, qconn, cmd)?;
1362        }
1363
1364        Ok(())
1365    }
1366}
1367
1368impl<H: DriverHooks> Drop for H3Driver<H> {
1369    fn drop(&mut self) {
1370        for stream in self.stream_map.values() {
1371            stream
1372                .audit_stats
1373                .set_recvd_stream_fin(StreamClosureKind::Implicit);
1374        }
1375    }
1376}
1377
1378/// [`H3Command`]s are sent by the [H3Controller] to alter the [H3Driver]'s
1379/// state.
1380///
1381/// Both [ServerH3Driver] and [ClientH3Driver] may extend this enum with
1382/// endpoint-specific variants.
1383#[derive(Debug)]
1384pub enum H3Command {
1385    /// A connection-level command that executes directly on the
1386    /// [`quiche::Connection`].
1387    QuicCmd(QuicCommand),
1388    /// Send a GOAWAY frame to the peer to initiate a graceful connection
1389    /// shutdown.
1390    GoAway,
1391    /// Shuts down a stream in the specified direction(s) and removes it from
1392    /// local state.
1393    ///
1394    /// This removes the stream from local state and sends a `RESET_STREAM`
1395    /// frame (for write direction) and/or a `STOP_SENDING` frame (for read
1396    /// direction) to the peer. See [`quiche::Connection::stream_shutdown`]
1397    /// for details.
1398    ShutdownStream {
1399        stream_id: u64,
1400        shutdown: StreamShutdown,
1401    },
1402}
1403
/// Selects the direction(s) of a stream that should be shut down.
///
/// Used with [`H3Controller::shutdown_stream`] and the internal
/// `shutdown_stream` function to control whether a `STOP_SENDING` frame
/// (read direction) and/or a `RESET_STREAM` frame (write direction) is sent.
///
/// Note: Despite its name, "shutdown" here refers to signaling the peer about
/// stream termination, not to sending a FIN flag. `STOP_SENDING` asks the
/// peer to stop sending data, while `RESET_STREAM` abruptly terminates the
/// write side.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamShutdown {
    /// Read direction only: sends a `STOP_SENDING` frame with the given
    /// error code.
    Read { error_code: u64 },
    /// Write direction only: sends a `RESET_STREAM` frame with the given
    /// error code.
    Write { error_code: u64 },
    /// Both directions: sends a `STOP_SENDING` frame and a `RESET_STREAM`
    /// frame, each with its own error code.
    Both {
        read_error_code: u64,
        write_error_code: u64,
    },
}
1428
1429/// Sends [`H3Command`]s to an [H3Driver]. The sender is typed and internally
1430/// wraps instances of `T` in the appropriate `H3Command` variant.
1431pub struct RequestSender<C, T> {
1432    sender: UnboundedSender<C>,
1433    // Required to work around dangling type parameter
1434    _r: PhantomData<fn() -> T>,
1435}
1436
1437impl<C, T: Into<C>> RequestSender<C, T> {
1438    /// Send a request to the [H3Driver]. This can only fail if the driver is
1439    /// gone.
1440    #[inline(always)]
1441    pub fn send(&self, v: T) -> Result<(), mpsc::error::SendError<C>> {
1442        self.sender.send(v.into())
1443    }
1444}
1445
1446impl<C, T> Clone for RequestSender<C, T> {
1447    fn clone(&self) -> Self {
1448        Self {
1449            sender: self.sender.clone(),
1450            _r: Default::default(),
1451        }
1452    }
1453}
1454
1455/// Interface to communicate with a paired [H3Driver].
1456///
1457/// An [H3Controller] receives [`H3Event`]s from its driver, which must be
1458/// consumed by the application built on top of the driver to react to incoming
1459/// events. The controller also allows the application to send ad-hoc
1460/// [`H3Command`]s to the driver, which will be processed when the driver waits
1461/// for incoming data.
1462pub struct H3Controller<H: DriverHooks> {
1463    /// Sends [`H3Command`]s to the [H3Driver], like [`QuicCommand`]s or
1464    /// outbound HTTP requests.
1465    cmd_sender: UnboundedSender<H::Command>,
1466    /// Receives [`H3Event`]s from the [H3Driver]. Can be extracted and
1467    /// used independently of the [H3Controller].
1468    h3_event_recv: Option<UnboundedReceiver<H::Event>>,
1469}
1470
1471impl<H: DriverHooks> H3Controller<H> {
1472    /// Gets a mut reference to the [`H3Event`] receiver for the paired
1473    /// [H3Driver].
1474    pub fn event_receiver_mut(&mut self) -> &mut UnboundedReceiver<H::Event> {
1475        self.h3_event_recv
1476            .as_mut()
1477            .expect("No event receiver on H3Controller")
1478    }
1479
1480    /// Takes the [`H3Event`] receiver for the paired [H3Driver].
1481    pub fn take_event_receiver(&mut self) -> UnboundedReceiver<H::Event> {
1482        self.h3_event_recv
1483            .take()
1484            .expect("No event receiver on H3Controller")
1485    }
1486
1487    /// Creates a [`QuicCommand`] sender for the paired [H3Driver].
1488    pub fn cmd_sender(&self) -> RequestSender<H::Command, QuicCommand> {
1489        RequestSender {
1490            sender: self.cmd_sender.clone(),
1491            _r: Default::default(),
1492        }
1493    }
1494
1495    /// Sends a GOAWAY frame to initiate a graceful connection shutdown.
1496    pub fn send_goaway(&self) {
1497        let _ = self.cmd_sender.send(H3Command::GoAway.into());
1498    }
1499
1500    /// Creates an [`H3Command`] sender for the paired [H3Driver].
1501    pub fn h3_cmd_sender(&self) -> RequestSender<H::Command, H3Command> {
1502        RequestSender {
1503            sender: self.cmd_sender.clone(),
1504            _r: Default::default(),
1505        }
1506    }
1507
1508    /// Shuts down a stream in the specified direction(s) and removes it from
1509    /// local state.
1510    ///
1511    /// This removes the stream from local state and sends a `RESET_STREAM`
1512    /// frame (for write direction) and/or a `STOP_SENDING` frame (for read
1513    /// direction) to the peer, depending on the [`StreamShutdown`] variant.
1514    pub fn shutdown_stream(&self, stream_id: u64, shutdown: StreamShutdown) {
1515        let _ = self.cmd_sender.send(
1516            H3Command::ShutdownStream {
1517                stream_id,
1518                shutdown,
1519            }
1520            .into(),
1521        );
1522    }
1523}