tokio_quiche/http3/driver/
mod.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27mod client;
28/// Wrapper for running HTTP/3 connections.
29pub mod connection;
30mod datagram;
31// `DriverHooks` must stay private to prevent users from creating their own
32// H3Drivers.
33mod hooks;
34mod server;
35mod streams;
36
37use std::collections::BTreeMap;
38use std::error::Error;
39use std::fmt;
40use std::marker::PhantomData;
41use std::sync::Arc;
42
43use datagram_socket::StreamClosureKind;
44use foundations::telemetry::log;
45use futures::FutureExt;
46use futures_util::stream::FuturesUnordered;
47use quiche::h3;
48use tokio::select;
49use tokio::sync::mpsc;
50use tokio::sync::mpsc::error::TryRecvError;
51use tokio::sync::mpsc::error::TrySendError;
52use tokio::sync::mpsc::UnboundedReceiver;
53use tokio::sync::mpsc::UnboundedSender;
54use tokio_stream::StreamExt;
55use tokio_util::sync::PollSender;
56
57use self::hooks::DriverHooks;
58use self::hooks::InboundHeaders;
59use self::streams::FlowCtx;
60use self::streams::HaveUpstreamCapacity;
61use self::streams::ReceivedDownstreamData;
62use self::streams::StreamCtx;
63use self::streams::StreamReady;
64use self::streams::WaitForDownstreamData;
65use self::streams::WaitForStream;
66use self::streams::WaitForUpstreamCapacity;
67use crate::buf_factory::BufFactory;
68use crate::buf_factory::PooledBuf;
69use crate::buf_factory::PooledDgram;
70use crate::http3::settings::Http3Settings;
71use crate::http3::H3AuditStats;
72use crate::metrics::Metrics;
73use crate::quic::HandshakeInfo;
74use crate::quic::QuicCommand;
75use crate::quic::QuicheConnection;
76use crate::ApplicationOverQuic;
77use crate::QuicResult;
78
79pub use self::client::ClientEventStream;
80pub use self::client::ClientH3Command;
81pub use self::client::ClientH3Controller;
82pub use self::client::ClientH3Driver;
83pub use self::client::ClientH3Event;
84pub use self::client::ClientRequestSender;
85pub use self::client::NewClientRequest;
86pub use self::server::ServerEventStream;
87pub use self::server::ServerH3Command;
88pub use self::server::ServerH3Controller;
89pub use self::server::ServerH3Driver;
90pub use self::server::ServerH3Event;
91
// The default priority for HTTP/3 responses if the application didn't provide
// one. Used by `process_write_frame` whenever an `OutboundFrame::Headers`
// carries no explicit priority.
const DEFAULT_PRIO: h3::Priority = h3::Priority::new(3, true);

// For a stream use a channel with 16 entries, which works out to 16 * 64KB =
// 1MB of max buffered data.
#[cfg(not(any(test, debug_assertions)))]
const STREAM_CAPACITY: usize = 16;
// Test and debug builds deliberately shrink the per-stream channel to a single
// entry so that the blocked-channel code paths are exercised constantly.
#[cfg(any(test, debug_assertions))]
const STREAM_CAPACITY: usize = 1; // Set to 1 to stress write_pending under test conditions

// For *all* flows use a shared channel with 2048 entries, which works out
// to 3MB of max buffered data at 1500 bytes per datagram.
// The same value also sizes each flow's own inbound channel in
// `get_or_insert_flow`.
const FLOW_CAPACITY: usize = 2048;

/// Used by a local task to send [`OutboundFrame`]s to a peer on the
/// stream or flow associated with this channel.
///
/// This is the sending half matching [`OutboundFrameStream`].
pub type OutboundFrameSender = PollSender<OutboundFrame>;

/// Used internally to receive [`OutboundFrame`]s which should be sent to a peer
/// on the stream or flow associated with this channel.
///
/// This is the receiving half matching [`OutboundFrameSender`].
type OutboundFrameStream = mpsc::Receiver<OutboundFrame>;

/// Used internally to send [`InboundFrame`]s (data) from the peer to a local
/// task on the stream or flow associated with this channel.
///
/// This is the sending half matching [`InboundFrameStream`].
type InboundFrameSender = PollSender<InboundFrame>;

/// Used by a local task to receive [`InboundFrame`]s (data) on the stream or
/// flow associated with this channel.
///
/// This is the receiving half matching [`InboundFrameSender`].
pub type InboundFrameStream = mpsc::Receiver<InboundFrame>;
122
/// The error type used internally in [H3Driver].
///
/// Note that [`ApplicationOverQuic`] errors are not exposed to users at this
/// time. The type is public to document the failure modes in [H3Driver].
///
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm.
#[derive(Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum H3ConnectionError {
    /// The controller task was shut down and is no longer listening.
    ///
    /// Returned when sending an event to the controller fails.
    ControllerWentAway,
    /// Other error at the connection, but not stream level.
    ///
    /// QUIC transport errors also end up here: the `From<quiche::Error>`
    /// impl wraps them in `h3::Error::TransportError`.
    H3(h3::Error),
    /// Received a GOAWAY frame from the peer.
    GoAway,
    /// Received data for a stream that was closed or never opened.
    NonexistentStream,
    /// The server's post-accept timeout was hit.
    /// The timeout can be configured in [`Http3Settings`].
    PostAcceptTimeout,
}
142
143impl From<h3::Error> for H3ConnectionError {
144    fn from(err: h3::Error) -> Self {
145        H3ConnectionError::H3(err)
146    }
147}
148
149impl From<quiche::Error> for H3ConnectionError {
150    fn from(err: quiche::Error) -> Self {
151        H3ConnectionError::H3(h3::Error::TransportError(err))
152    }
153}
154
// Marker impl: the default `Error` methods are used as-is; `Debug` is derived
// and `Display` is implemented by hand for this type.
impl Error for H3ConnectionError {}
156
157impl fmt::Display for H3ConnectionError {
158    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
159        let s: &dyn fmt::Display = match self {
160            Self::ControllerWentAway => &"controller went away",
161            Self::H3(e) => e,
162            Self::GoAway => &"goaway",
163            Self::NonexistentStream => &"nonexistent stream",
164            Self::PostAcceptTimeout => &"post accept timeout hit",
165        };
166
167        write!(f, "H3ConnectionError: {s}")
168    }
169}
170
/// Shorthand for results whose error type is [`H3ConnectionError`].
type H3ConnectionResult<T> = Result<T, H3ConnectionError>;
172
/// HTTP/3 headers that were received on a stream.
///
/// `recv` is used to read the message body, while `send` is used to transmit
/// data back to the peer.
///
/// The manual [`fmt::Debug`] impl for this type prints every field except
/// the two channel handles.
pub struct IncomingH3Headers {
    /// Stream ID of the frame.
    pub stream_id: u64,
    /// The actual [`h3::Header`]s which were received.
    pub headers: Vec<h3::Header>,
    /// An [`OutboundFrameSender`] for streaming body data to the peer. For
    /// [ClientH3Driver], note that the request body can also be passed a
    /// cloned sender via [`NewClientRequest`].
    pub send: OutboundFrameSender,
    /// An [`InboundFrameStream`] of body data received from the peer.
    pub recv: InboundFrameStream,
    /// Whether there is a body associated with the incoming headers.
    // NOTE(review): the field name suggests this is `true` when the read side
    // is already finished, i.e. when *no* body follows, which would be the
    // opposite of the sentence above. The code that populates it is not in
    // this file; confirm against the client/server hooks and fix the doc.
    pub read_fin: bool,
    /// Handle to the [`H3AuditStats`] for the message's stream.
    pub h3_audit_stats: Arc<H3AuditStats>,
}
193
194impl fmt::Debug for IncomingH3Headers {
195    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196        f.debug_struct("IncomingH3Headers")
197            .field("stream_id", &self.stream_id)
198            .field("headers", &self.headers)
199            .field("read_fin", &self.read_fin)
200            .field("h3_audit_stats", &self.h3_audit_stats)
201            .finish()
202    }
203}
204
/// [`H3Event`]s are produced by an [H3Driver] to describe HTTP/3 state updates.
///
/// Both [ServerH3Driver] and [ClientH3Driver] may extend this enum with
/// endpoint-specific variants. The events must be consumed by users of the
/// drivers, like a higher-level `Server` or `Client` controller.
#[derive(Debug)]
pub enum H3Event {
    /// A SETTINGS frame was received.
    ///
    /// The driver forwards this at most once per connection.
    IncomingSettings {
        /// Raw HTTP/3 setting pairs, in the order received from the peer.
        settings: Vec<(u64, u64)>,
    },

    /// A HEADERS frame was received on the given stream. This is either a
    /// request or a response depending on the perspective of the [`H3Event`]
    /// receiver.
    IncomingHeaders(IncomingH3Headers),

    /// A DATAGRAM flow was created and associated with the given `flow_id`.
    /// This event is fired before a HEADERS event for CONNECT[-UDP] requests.
    NewFlow {
        /// Flow ID of the new flow.
        flow_id: u64,
        /// An [`OutboundFrameSender`] for transmitting datagrams to the peer.
        /// This is a clone of the driver's shared datagram sender, so all
        /// flows feed the same outbound channel.
        send: OutboundFrameSender,
        /// An [`InboundFrameStream`] for receiving datagrams from the peer.
        recv: InboundFrameStream,
    },
    /// A RST_STREAM frame was seen on the given `stream_id`. The user of the
    /// driver should clean up any state allocated for this stream.
    ResetStream { stream_id: u64 },
    /// The connection has irrecoverably errored and is shutting down.
    ///
    /// This is the event generated for [`H3ConnectionError::H3`].
    ConnectionError(h3::Error),
    /// The connection has been shutdown, optionally due to an
    /// [`H3ConnectionError`].
    ConnectionShutdown(Option<H3ConnectionError>),
    /// Body data has been received over a stream.
    ///
    /// The driver emits this once per body chunk it reads (with `fin` set to
    /// `false`) and once more with `num_bytes == 0` and `fin == true` when
    /// the end of the stream is processed.
    BodyBytesReceived {
        /// Stream ID of the body data.
        stream_id: u64,
        /// Number of bytes received.
        num_bytes: u64,
        /// Whether the stream is finished and won't yield any more data.
        fin: bool,
    },
    /// The stream has been closed. This is used to signal stream closures that
    /// don't result from RST_STREAM frames, unlike the
    /// [`H3Event::ResetStream`] variant.
    StreamClosed { stream_id: u64 },
}
255
256impl H3Event {
257    /// Generates an event from an applicable [`H3ConnectionError`].
258    fn from_error(err: &H3ConnectionError) -> Option<Self> {
259        Some(match err {
260            H3ConnectionError::H3(e) => Self::ConnectionError(*e),
261            H3ConnectionError::PostAcceptTimeout => Self::ConnectionShutdown(
262                Some(H3ConnectionError::PostAcceptTimeout),
263            ),
264            _ => return None,
265        })
266    }
267}
268
/// An [`OutboundFrame`] is a data frame that should be sent from a local task
/// to a peer over a [`quiche::h3::Connection`].
///
/// This is used, for example, to send response body data to a peer, or proxied
/// UDP datagrams.
///
/// The driver expects `Headers`, `Body` and `PeerStreamError` on per-stream
/// channels, and `Datagram` and `FlowShutdown` on the shared datagram channel.
/// A frame arriving on the wrong kind of channel hits an `unreachable!()`.
#[derive(Debug)]
pub enum OutboundFrame {
    /// Response headers to be sent to the peer, with optional priority.
    /// Without a priority, the driver falls back to its default priority.
    Headers(Vec<h3::Header>, Option<quiche::h3::Priority>),
    /// Response body/CONNECT downstream data plus FIN flag.
    #[cfg(feature = "zero-copy")]
    Body(crate::buf_factory::QuicheBuf, bool),
    /// Response body/CONNECT downstream data plus FIN flag.
    #[cfg(not(feature = "zero-copy"))]
    Body(PooledBuf, bool),
    /// CONNECT-UDP (DATAGRAM) downstream data plus flow ID.
    Datagram(PooledDgram, u64),
    /// An error encountered when serving the request. Stream should be closed.
    PeerStreamError,
    /// DATAGRAM flow explicitly closed. The driver finishes the stream
    /// `stream_id` and forgets the flow `flow_id`.
    FlowShutdown { flow_id: u64, stream_id: u64 },
}
291
292impl OutboundFrame {
293    /// Creates a body frame with the provided buffer.
294    pub fn body(body: PooledBuf, fin: bool) -> Self {
295        #[cfg(feature = "zero-copy")]
296        let body = crate::buf_factory::QuicheBuf::new(body);
297
298        OutboundFrame::Body(body, fin)
299    }
300}
301
/// An [`InboundFrame`] is a data frame that was received from the peer over a
/// [`quiche::h3::Connection`]. This is used by peers to send body or datagrams
/// to the local task.
#[derive(Debug)]
pub enum InboundFrame {
    /// Request body/CONNECT upstream data plus FIN flag.
    ///
    /// The driver signals end-of-body with an empty buffer and the flag set
    /// to `true`; chunks carrying data are sent with the flag set to `false`.
    Body(PooledBuf, bool),
    /// CONNECT-UDP (DATAGRAM) upstream data.
    Datagram(PooledDgram),
}
312
/// A ready-made [`ApplicationOverQuic`] which can handle HTTP/3 and MASQUE.
/// Depending on the `DriverHooks` in use, it powers either a client or a
/// server.
///
/// Use the [ClientH3Driver] and [ServerH3Driver] aliases to access the
/// respective driver types. The driver is passed into an I/O loop and
/// communicates with the driver's user (e.g., an HTTP client or a server) via
/// its associated [H3Controller]. The controller allows the application to both
/// listen for [`H3Event`]s of note and send [`H3Command`]s into the I/O loop.
pub struct H3Driver<H: DriverHooks> {
    /// Configuration used to initialize `conn`. Created from [`Http3Settings`]
    /// in the constructor.
    h3_config: h3::Config,
    /// The underlying HTTP/3 connection. Initialized in
    /// `ApplicationOverQuic::on_conn_established`.
    ///
    /// While this is `None`, `conn_mut` and `process_h3_data` fail with the
    /// error built by `connection_not_present`.
    conn: Option<h3::Connection>,
    /// State required by the client/server hooks.
    hooks: H,
    /// Sends [`H3Event`]s to the [H3Controller] paired with this driver.
    h3_event_sender: mpsc::UnboundedSender<H::Event>,
    /// Receives [`H3Command`]s from the [H3Controller] paired with this driver.
    cmd_recv: mpsc::UnboundedReceiver<H::Command>,

    /// A map of stream IDs to their [StreamCtx]. This is mainly used to
    /// retrieve the internal Tokio channels associated with the stream.
    stream_map: BTreeMap<u64, StreamCtx>,
    /// A map of flow IDs to their [FlowCtx]. This is mainly used to retrieve
    /// the internal Tokio channels associated with the flow.
    flow_map: BTreeMap<u64, FlowCtx>,
    /// Set of [`WaitForStream`] futures. A stream is added to this set if
    /// we need to send to it and its channel is at capacity, or if we need
    /// data from its channel and the channel is empty.
    waiting_streams: FuturesUnordered<WaitForStream>,

    /// Receives [`OutboundFrame`]s from all datagram flows on the connection.
    dgram_recv: OutboundFrameStream,
    /// Keeps the datagram channel open such that datagram flows can be created.
    /// A clone of this sender is handed out with every [`H3Event::NewFlow`].
    dgram_send: OutboundFrameSender,

    /// The buffer used to interact with the underlying IoWorker.
    ///
    /// `process_h3_data` reads body data into this buffer, forwards the
    /// filled buffer as an [`InboundFrame::Body`] and installs a fresh
    /// max-size buffer in its place.
    pooled_buf: PooledBuf,
    /// The maximum HTTP/3 stream ID seen on this connection. Maintained by
    /// `insert_stream`.
    max_stream_seen: u64,

    /// Tracks whether we have forwarded the HTTP/3 SETTINGS frame
    /// to the [H3Controller] once.
    settings_received_and_forwarded: bool,
}
361
362impl<H: DriverHooks> H3Driver<H> {
363    /// Builds a new [H3Driver] and an associated [H3Controller].
364    ///
365    /// The driver should then be passed to
366    /// [`InitialQuicConnection`](crate::InitialQuicConnection)'s `start`
367    /// method.
368    pub fn new(http3_settings: Http3Settings) -> (Self, H3Controller<H>) {
369        let (dgram_send, dgram_recv) = mpsc::channel(FLOW_CAPACITY);
370        let (cmd_sender, cmd_recv) = mpsc::unbounded_channel();
371        let (h3_event_sender, h3_event_recv) = mpsc::unbounded_channel();
372
373        (
374            H3Driver {
375                h3_config: (&http3_settings).into(),
376                conn: None,
377                hooks: H::new(&http3_settings),
378                h3_event_sender,
379                cmd_recv,
380
381                stream_map: BTreeMap::new(),
382                flow_map: BTreeMap::new(),
383
384                dgram_recv,
385                dgram_send: PollSender::new(dgram_send),
386                pooled_buf: BufFactory::get_max_buf(),
387                max_stream_seen: 0,
388
389                waiting_streams: FuturesUnordered::new(),
390
391                settings_received_and_forwarded: false,
392            },
393            H3Controller {
394                cmd_sender,
395                h3_event_recv: Some(h3_event_recv),
396            },
397        )
398    }
399
400    /// Retrieve the [FlowCtx] associated with the given `flow_id`. If no
401    /// context is found, a new one will be created.
402    fn get_or_insert_flow(
403        &mut self, flow_id: u64,
404    ) -> H3ConnectionResult<&mut FlowCtx> {
405        use std::collections::btree_map::Entry;
406        Ok(match self.flow_map.entry(flow_id) {
407            Entry::Vacant(e) => {
408                // This is a datagram for a new flow we haven't seen before
409                let (flow, recv) = FlowCtx::new(FLOW_CAPACITY);
410                let flow_req = H3Event::NewFlow {
411                    flow_id,
412                    recv,
413                    send: self.dgram_send.clone(),
414                };
415                self.h3_event_sender
416                    .send(flow_req.into())
417                    .map_err(|_| H3ConnectionError::ControllerWentAway)?;
418                e.insert(flow)
419            },
420            Entry::Occupied(e) => e.into_mut(),
421        })
422    }
423
424    /// Adds a [StreamCtx] to the stream map with the given `stream_id`.
425    fn insert_stream(&mut self, stream_id: u64, ctx: StreamCtx) {
426        self.stream_map.insert(stream_id, ctx);
427        self.max_stream_seen = self.max_stream_seen.max(stream_id);
428    }
429
    /// Fetches body chunks from the [`quiche::h3::Connection`] and forwards
    /// them to the stream's associated [`InboundFrameStream`].
    ///
    /// Chunks are read for as long as a slot can be reserved in the stream's
    /// inbound channel. Once `fin_recv` is set on the stream, a final empty
    /// body frame with the FIN flag is forwarded instead of reading more
    /// data.
    ///
    /// # Errors
    ///
    /// Fails with the `connection_not_present()` error if the HTTP/3
    /// connection has not been set up, and with
    /// [`H3ConnectionError::NonexistentStream`] if `stream_id` is not in the
    /// stream map. Errors from `finish_stream` and `process_h3_fin` are
    /// passed through. Errors from `recv_body` other than `h3::Error::Done`
    /// are not returned; they cause the stream to be finished instead.
    fn process_h3_data(
        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
    ) -> H3ConnectionResult<()> {
        // Split self borrow between conn and stream_map
        let conn = self.conn.as_mut().ok_or(Self::connection_not_present())?;
        let ctx = self
            .stream_map
            .get_mut(&stream_id)
            .ok_or(H3ConnectionError::NonexistentStream)?;

        // Outcome of the read loop below.
        enum StreamStatus {
            // Nothing more to do right now; `close` requests removal of the
            // stream via `finish_stream`.
            Done { close: bool },
            // The inbound channel is full while there is still something to
            // deliver; the stream must wait for channel capacity.
            Blocked,
        }

        let status = loop {
            let Some(sender) = ctx.send.as_ref().and_then(PollSender::get_ref)
            else {
                // already waiting for capacity
                break StreamStatus::Done { close: false };
            };

            // Reserve the channel slot *before* reading from quiche so that
            // data is never pulled out of the connection without a place to
            // put it.
            let permit = match sender.try_reserve() {
                Ok(permit) => permit,
                Err(TrySendError::Closed(())) => {
                    // The receiving side is gone. Only tear the stream down
                    // once both directions have seen a FIN.
                    break StreamStatus::Done {
                        close: ctx.fin_sent && ctx.fin_recv,
                    };
                },
                Err(TrySendError::Full(())) => {
                    // Only wait for capacity if there is something pending:
                    // either the end-of-body marker or readable stream data.
                    if ctx.fin_recv || qconn.stream_readable(stream_id) {
                        break StreamStatus::Blocked;
                    }
                    break StreamStatus::Done { close: false };
                },
            };

            if ctx.fin_recv {
                // Signal end-of-body to upstream
                permit
                    .send(InboundFrame::Body(BufFactory::get_empty_buf(), true));
                break StreamStatus::Done {
                    close: ctx.fin_sent,
                };
            }

            match conn.recv_body(qconn, stream_id, &mut self.pooled_buf) {
                Ok(n) => {
                    // Hand the filled buffer to the consumer and install a
                    // fresh one for the next read.
                    let mut body = std::mem::replace(
                        &mut self.pooled_buf,
                        BufFactory::get_max_buf(),
                    );
                    body.truncate(n);

                    ctx.audit_stats.add_downstream_bytes_recvd(n as u64);
                    let event = H3Event::BodyBytesReceived {
                        stream_id,
                        num_bytes: n as u64,
                        fin: false,
                    };
                    // Best effort: a failed send to the controller is ignored
                    // here.
                    let _ = self.h3_event_sender.send(event.into());

                    permit.send(InboundFrame::Body(body, false));
                },
                Err(h3::Error::Done) =>
                    break StreamStatus::Done { close: false },
                // Any other read error closes the stream.
                Err(_) => break StreamStatus::Done { close: true },
            }
        };

        match status {
            StreamStatus::Done { close } => {
                if close {
                    return self.finish_stream(qconn, stream_id, None, None);
                }

                // The QUIC stream is finished, manually invoke `process_h3_fin`
                // in case `h3::poll()` is never called again.
                //
                // Note that this case will not conflict with StreamStatus::Done
                // being returned due to the body channel being
                // blocked. qconn.stream_finished() will guarantee
                // that we've fully parsed the body as it only returns true
                // if we've seen a Fin for the read half of the stream.
                if !ctx.fin_recv && qconn.stream_finished(stream_id) {
                    return self.process_h3_fin(qconn, stream_id);
                }
            },
            StreamStatus::Blocked => {
                // Park the stream until its inbound channel has capacity
                // again; `upstream_write_ready` resumes reading from there.
                self.waiting_streams.push(ctx.wait_for_send(stream_id));
            },
        }

        Ok(())
    }
527
528    /// Processes an end-of-stream event from the [`quiche::h3::Connection`].
529    fn process_h3_fin(
530        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
531    ) -> H3ConnectionResult<()> {
532        let ctx = self.stream_map.get_mut(&stream_id).filter(|c| !c.fin_recv);
533        let Some(ctx) = ctx else {
534            // Stream is already finished, nothing to do
535            return Ok(());
536        };
537
538        ctx.fin_recv = true;
539        ctx.audit_stats
540            .set_recvd_stream_fin(StreamClosureKind::Explicit);
541
542        // It's important to send this H3Event before process_h3_data so that
543        // a server can (potentially) generate the control response before the
544        // corresponding receiver drops.
545        let event = H3Event::BodyBytesReceived {
546            stream_id,
547            num_bytes: 0,
548            fin: true,
549        };
550        let _ = self.h3_event_sender.send(event.into());
551
552        // Communicate fin to upstream. Since `ctx.fin_recv` is true now,
553        // there can't be a recursive loop.
554        self.process_h3_data(qconn, stream_id)
555    }
556
557    /// Processes a single [`quiche::h3::Event`] received from the underlying
558    /// [`quiche::h3::Connection`]. Some events are dispatched to helper
559    /// methods.
560    fn process_read_event(
561        &mut self, qconn: &mut QuicheConnection, stream_id: u64, event: h3::Event,
562    ) -> H3ConnectionResult<()> {
563        self.forward_settings()?;
564
565        match event {
566            // Requests/responses are exclusively handled by hooks.
567            h3::Event::Headers { list, more_frames } =>
568                H::headers_received(self, qconn, InboundHeaders {
569                    stream_id,
570                    headers: list,
571                    has_body: more_frames,
572                }),
573
574            h3::Event::Data => self.process_h3_data(qconn, stream_id),
575            h3::Event::Finished => self.process_h3_fin(qconn, stream_id),
576
577            h3::Event::Reset(code) => {
578                if let Some(ctx) = self.stream_map.get(&stream_id) {
579                    ctx.audit_stats.set_recvd_reset_stream_error_code(code as _);
580                }
581
582                self.h3_event_sender
583                    .send(H3Event::ResetStream { stream_id }.into())
584                    .map_err(|_| H3ConnectionError::ControllerWentAway)?;
585
586                self.finish_stream(qconn, stream_id, None, None)
587            },
588
589            h3::Event::PriorityUpdate => Ok(()),
590            h3::Event::GoAway => Err(H3ConnectionError::GoAway),
591        }
592    }
593
594    /// The SETTINGS frame can be received at any point, so we
595    /// need to check `peer_settings_raw` to decide if we've received it.
596    ///
597    /// Settings should only be sent once, so we generate a single event
598    /// when `peer_settings_raw` transitions from None to Some.
599    fn forward_settings(&mut self) -> H3ConnectionResult<()> {
600        if self.settings_received_and_forwarded {
601            return Ok(());
602        }
603
604        // capture the peer settings and forward it
605        if let Some(settings) = self.conn_mut()?.peer_settings_raw() {
606            let incoming_settings = H3Event::IncomingSettings {
607                settings: settings.to_vec(),
608            };
609
610            self.h3_event_sender
611                .send(incoming_settings.into())
612                .map_err(|_| H3ConnectionError::ControllerWentAway)?;
613
614            self.settings_received_and_forwarded = true;
615        }
616        Ok(())
617    }
618
    /// Send an individual frame to the underlying [`quiche::h3::Connection`] to
    /// be flushed at a later time.
    ///
    /// `Self::process_writes` will iterate over all writable streams and call
    /// this method in a loop for each stream to send all writable packets.
    ///
    /// The frame is taken from `ctx.queued_frame` and is *not* cleared here.
    /// The stream ID is taken from the stream's audit stats.
    ///
    /// # Errors
    ///
    /// Besides errors returned by quiche itself, the return value carries
    /// these signals:
    ///
    /// * `h3::Error::StreamBlocked`: a body frame was only partially
    ///   written.
    /// * `h3::Error::TransportError(quiche::Error::Done)`: the final body
    ///   frame was fully written and the peer's FIN was already received.
    /// * `h3::Error::MessageError`: the queued frame was
    ///   [`OutboundFrame::PeerStreamError`].
    ///
    /// # Panics
    ///
    /// Panics if a body frame with the FIN flag is processed while
    /// `ctx.recv` is `None`, or if a datagram or flow shutdown frame is
    /// queued on a stream.
    fn process_write_frame(
        conn: &mut h3::Connection, qconn: &mut QuicheConnection,
        ctx: &mut StreamCtx,
    ) -> h3::Result<()> {
        // Nothing queued for this stream: nothing to write.
        let Some(frame) = &mut ctx.queued_frame else {
            return Ok(());
        };

        let audit_stats = &ctx.audit_stats;
        let stream_id = audit_stats.stream_id();

        match frame {
            OutboundFrame::Headers(headers, priority) => {
                let prio = priority.as_ref().unwrap_or(&DEFAULT_PRIO);

                if ctx.initial_headers_sent {
                    // Initial headers were already sent, send additional
                    // headers now.
                    conn.send_additional_headers_with_priority(
                        qconn, stream_id, headers, prio, false, false,
                    )
                } else {
                    // Send initial headers.
                    conn.send_response_with_priority(
                        qconn, stream_id, headers, prio, false,
                    )
                    // Only remember the headers as sent if quiche accepted
                    // them.
                    .inspect(|_| ctx.initial_headers_sent = true)
                }
            },

            OutboundFrame::Body(body, fin) => {
                let len = body.as_ref().len();
                if len == 0 && !*fin {
                    // quiche doesn't allow sending an empty body when the fin
                    // flag is not set
                    return Ok(());
                }
                if *fin {
                    // If this is the last body frame, close the receiver in the
                    // stream map to signal that we shouldn't
                    // receive any more frames.
                    ctx.recv.as_mut().expect("channel").close();
                }
                #[cfg(feature = "zero-copy")]
                let n = conn.send_body_zc(qconn, stream_id, body, *fin)?;

                #[cfg(not(feature = "zero-copy"))]
                let n = conn.send_body(qconn, stream_id, body, *fin)?;

                audit_stats.add_downstream_bytes_sent(n as _);
                if n != len {
                    // Couldn't write the entire body, keep what remains for
                    // future retry.
                    // The explicit trim below only happens without the
                    // `zero-copy` feature.
                    #[cfg(not(feature = "zero-copy"))]
                    body.pop_front(n);

                    Err(h3::Error::StreamBlocked)
                } else {
                    if *fin {
                        ctx.fin_sent = true;
                        audit_stats
                            .set_sent_stream_fin(StreamClosureKind::Explicit);
                        if ctx.fin_recv {
                            // Return a TransportError to trigger stream cleanup
                            // instead of h3::Error::Done
                            return Err(h3::Error::TransportError(
                                quiche::Error::Done,
                            ));
                        }
                    }
                    Ok(())
                }
            },

            OutboundFrame::PeerStreamError => Err(h3::Error::MessageError),

            OutboundFrame::FlowShutdown { .. } => {
                unreachable!("Only flows send shutdowns")
            },

            OutboundFrame::Datagram(..) => {
                unreachable!("Only flows send datagrams")
            },
        }
    }
709
710    /// Resumes reads or writes to the connection when a stream channel becomes
711    /// unblocked.
712    ///
713    /// If we were waiting for more data from a channel, we resume writing to
714    /// the connection. Otherwise, we were blocked on channel capacity and
715    /// continue reading from the connection. `Upstream` in this context is
716    /// the consumer of the stream.
717    fn upstream_ready(
718        &mut self, qconn: &mut QuicheConnection, ready: StreamReady,
719    ) -> H3ConnectionResult<()> {
720        match ready {
721            StreamReady::Downstream(r) => self.upstream_read_ready(qconn, r),
722            StreamReady::Upstream(r) => self.upstream_write_ready(qconn, r),
723        }
724    }
725
726    fn upstream_read_ready(
727        &mut self, qconn: &mut QuicheConnection,
728        read_ready: ReceivedDownstreamData,
729    ) -> H3ConnectionResult<()> {
730        let ReceivedDownstreamData {
731            stream_id,
732            chan,
733            data,
734        } = read_ready;
735
736        match self.stream_map.get_mut(&stream_id) {
737            None => Ok(()),
738            Some(stream) => {
739                stream.recv = Some(chan);
740                stream.queued_frame = data;
741                self.process_writable_stream(qconn, stream_id)
742            },
743        }
744    }
745
746    fn upstream_write_ready(
747        &mut self, qconn: &mut QuicheConnection,
748        write_ready: HaveUpstreamCapacity,
749    ) -> H3ConnectionResult<()> {
750        let HaveUpstreamCapacity {
751            stream_id,
752            mut chan,
753        } = write_ready;
754
755        match self.stream_map.get_mut(&stream_id) {
756            None => Ok(()),
757            Some(stream) => {
758                chan.abort_send(); // Have to do it to release the associated permit
759                stream.send = Some(chan);
760                self.process_h3_data(qconn, stream_id)
761            },
762        }
763    }
764
765    /// Processes all queued outbound datagrams from the `dgram_recv` channel.
766    fn dgram_ready(
767        &mut self, qconn: &mut QuicheConnection, frame: OutboundFrame,
768    ) -> H3ConnectionResult<()> {
769        let mut frame = Ok(frame);
770
771        loop {
772            match frame {
773                Ok(OutboundFrame::Datagram(dgram, flow_id)) => {
774                    // Drop datagrams if there is no capacity
775                    let _ = datagram::send_h3_dgram(qconn, flow_id, dgram);
776                },
777                Ok(OutboundFrame::FlowShutdown { flow_id, stream_id }) => {
778                    self.finish_stream(
779                        qconn,
780                        stream_id,
781                        Some(quiche::h3::WireErrorCode::NoError as u64),
782                        Some(quiche::h3::WireErrorCode::NoError as u64),
783                    )?;
784                    self.flow_map.remove(&flow_id);
785                    break;
786                },
787                Ok(_) => unreachable!("Flows can't send frame of other types"),
788                Err(TryRecvError::Empty) => break,
789                Err(TryRecvError::Disconnected) =>
790                    return Err(H3ConnectionError::ControllerWentAway),
791            }
792
793            frame = self.dgram_recv.try_recv();
794        }
795
796        Ok(())
797    }
798
799    /// Return a mutable reference to the driver's HTTP/3 connection.
800    ///
801    /// If the connection doesn't exist yet, this function returns
802    /// a `Self::connection_not_present()` error.
803    fn conn_mut(&mut self) -> H3ConnectionResult<&mut h3::Connection> {
804        self.conn.as_mut().ok_or(Self::connection_not_present())
805    }
806
807    /// Alias for [`quiche::Error::TlsFail`], which is used in the case where
808    /// this driver doesn't have an established HTTP/3 connection attached
809    /// to it yet.
810    const fn connection_not_present() -> H3ConnectionError {
811        H3ConnectionError::H3(h3::Error::TransportError(quiche::Error::TlsFail))
812    }
813
814    /// Removes a stream from the stream map if it exists. Also optionally sends
815    /// `RESET` or `STOP_SENDING` frames if `write` or `read` is set to an
816    /// error code, respectively.
817    fn finish_stream(
818        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
819        read: Option<u64>, write: Option<u64>,
820    ) -> H3ConnectionResult<()> {
821        let Some(stream_ctx) = self.stream_map.remove(&stream_id) else {
822            return Ok(());
823        };
824
825        let audit_stats = &stream_ctx.audit_stats;
826
827        if let Some(err) = read {
828            audit_stats.set_sent_stop_sending_error_code(err as _);
829            let _ = qconn.stream_shutdown(stream_id, quiche::Shutdown::Read, err);
830        }
831
832        if let Some(err) = write {
833            audit_stats.set_sent_reset_stream_error_code(err as _);
834            let _ =
835                qconn.stream_shutdown(stream_id, quiche::Shutdown::Write, err);
836        }
837
838        // Find if the stream also has any pending futures associated with it
839        for pending in self.waiting_streams.iter_mut() {
840            match pending {
841                WaitForStream::Downstream(WaitForDownstreamData {
842                    stream_id: id,
843                    chan: Some(chan),
844                }) if stream_id == *id => {
845                    chan.close();
846                },
847                WaitForStream::Upstream(WaitForUpstreamCapacity {
848                    stream_id: id,
849                    chan: Some(chan),
850                }) if stream_id == *id => {
851                    chan.close();
852                },
853                _ => {},
854            }
855        }
856
857        // Close any DATAGRAM-proxying channels when we close the stream, if they
858        // exist
859        if let Some(mapped_flow_id) = stream_ctx.associated_dgram_flow_id {
860            self.flow_map.remove(&mapped_flow_id);
861        }
862
863        if qconn.is_server() {
864            // Signal the server to remove the stream from its map
865            let _ = self
866                .h3_event_sender
867                .send(H3Event::StreamClosed { stream_id }.into());
868        }
869
870        Ok(())
871    }
872
873    /// Handles a regular [`H3Command`]. May be called internally by
874    /// [DriverHooks] for non-endpoint-specific [`H3Command`]s.
875    fn handle_core_command(
876        &mut self, qconn: &mut QuicheConnection, cmd: H3Command,
877    ) -> H3ConnectionResult<()> {
878        match cmd {
879            H3Command::QuicCmd(cmd) => cmd.execute(qconn),
880            H3Command::GoAway => {
881                let max_id = self.max_stream_seen;
882                self.conn_mut()
883                    .expect("connection should be established")
884                    .send_goaway(qconn, max_id)?;
885            },
886        }
887        Ok(())
888    }
889}
890
891impl<H: DriverHooks> H3Driver<H> {
892    /// Reads all buffered datagrams out of `qconn` and distributes them to
893    /// their flow channels.
894    fn process_available_dgrams(
895        &mut self, qconn: &mut QuicheConnection,
896    ) -> H3ConnectionResult<()> {
897        loop {
898            match datagram::receive_h3_dgram(qconn) {
899                Ok((flow_id, dgram)) => {
900                    self.get_or_insert_flow(flow_id)?.send_best_effort(dgram);
901                },
902                Err(quiche::Error::Done) => return Ok(()),
903                Err(err) => return Err(H3ConnectionError::from(err)),
904            }
905        }
906    }
907
908    /// Flushes any queued-up frames for `stream_id` into `qconn` until either
909    /// there is no more capacity in `qconn` or no more frames to send.
910    fn process_writable_stream(
911        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
912    ) -> H3ConnectionResult<()> {
913        // Split self borrow between conn and stream_map
914        let conn = self.conn.as_mut().ok_or(Self::connection_not_present())?;
915        let Some(ctx) = self.stream_map.get_mut(&stream_id) else {
916            return Ok(()); // Unknown stream_id
917        };
918
919        loop {
920            // Process each writable frame, queue the next frame for processing
921            // and shut down any errored streams.
922            match Self::process_write_frame(conn, qconn, ctx) {
923                Ok(()) => ctx.queued_frame = None,
924                Err(h3::Error::StreamBlocked | h3::Error::Done) => break,
925                Err(h3::Error::MessageError) => {
926                    return self.finish_stream(
927                        qconn,
928                        stream_id,
929                        Some(quiche::h3::WireErrorCode::MessageError as u64),
930                        Some(quiche::h3::WireErrorCode::MessageError as u64),
931                    );
932                },
933                Err(h3::Error::TransportError(quiche::Error::StreamStopped(
934                    e,
935                ))) => {
936                    ctx.audit_stats.set_recvd_stop_sending_error_code(e as i64);
937                    return self.finish_stream(qconn, stream_id, Some(e), None);
938                },
939                Err(h3::Error::TransportError(
940                    quiche::Error::InvalidStreamState(stream),
941                )) => {
942                    return self.finish_stream(qconn, stream, None, None);
943                },
944                Err(_) => {
945                    return self.finish_stream(qconn, stream_id, None, None);
946                },
947            }
948
949            let Some(recv) = ctx.recv.as_mut() else {
950                return Ok(()); // This stream is already waiting for data
951            };
952
953            // Attempt to queue the next frame for processing. The corresponding
954            // sender is created at the same time as the `StreamCtx`
955            // and ultimately ends up in an `H3Body`. The body then
956            // determines which frames to send to the peer via
957            // this processing loop.
958            match recv.try_recv() {
959                Ok(frame) => ctx.queued_frame = Some(frame),
960                Err(TryRecvError::Disconnected) => break,
961                Err(TryRecvError::Empty) => {
962                    self.waiting_streams.push(ctx.wait_for_recv(stream_id));
963                    break;
964                },
965            }
966        }
967
968        Ok(())
969    }
970
971    /// Tests `qconn` for either a local or peer error and increments
972    /// the associated HTTP/3 or QUIC error counter.
973    fn record_quiche_error(qconn: &mut QuicheConnection, metrics: &impl Metrics) {
974        // split metrics between local/peer and QUIC/HTTP/3 level errors
975        if let Some(err) = qconn.local_error() {
976            if err.is_app {
977                metrics.local_h3_conn_close_error_count(err.error_code.into())
978            } else {
979                metrics.local_quic_conn_close_error_count(err.error_code.into())
980            }
981            .inc();
982        } else if let Some(err) = qconn.peer_error() {
983            if err.is_app {
984                metrics.peer_h3_conn_close_error_count(err.error_code.into())
985            } else {
986                metrics.peer_quic_conn_close_error_count(err.error_code.into())
987            }
988            .inc();
989        }
990    }
991}
992
993impl<H: DriverHooks> ApplicationOverQuic for H3Driver<H> {
994    fn on_conn_established(
995        &mut self, quiche_conn: &mut QuicheConnection,
996        handshake_info: &HandshakeInfo,
997    ) -> QuicResult<()> {
998        let conn = h3::Connection::with_transport(quiche_conn, &self.h3_config)?;
999        self.conn = Some(conn);
1000
1001        H::conn_established(self, quiche_conn, handshake_info)?;
1002        Ok(())
1003    }
1004
1005    #[inline]
1006    fn should_act(&self) -> bool {
1007        self.conn.is_some()
1008    }
1009
1010    #[inline]
1011    fn buffer(&mut self) -> &mut [u8] {
1012        &mut self.pooled_buf
1013    }
1014
1015    /// Poll the underlying [`quiche::h3::Connection`] for
1016    /// [`quiche::h3::Event`]s and DATAGRAMs, delegating processing to
1017    /// `Self::process_read_event`.
1018    ///
1019    /// If a DATAGRAM is found, it is sent to the receiver on its channel.
1020    fn process_reads(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
1021        loop {
1022            match self.conn_mut()?.poll(qconn) {
1023                Ok((stream_id, event)) =>
1024                    self.process_read_event(qconn, stream_id, event)?,
1025                Err(h3::Error::Done) => break,
1026                Err(err) => {
1027                    // Don't bubble error up, instead keep the worker loop going
1028                    // until quiche reports the connection is
1029                    // closed.
1030                    log::debug!("connection closed due to h3 protocol error"; "error"=>?err);
1031                    return Ok(());
1032                },
1033            };
1034        }
1035
1036        self.process_available_dgrams(qconn)?;
1037        Ok(())
1038    }
1039
1040    /// Write as much data as possible into the [`quiche::h3::Connection`] from
1041    /// all sources. This will attempt to write any queued frames into their
1042    /// respective streams, if writable.
1043    fn process_writes(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
1044        while let Some(stream_id) = qconn.stream_writable_next() {
1045            self.process_writable_stream(qconn, stream_id)?;
1046        }
1047
1048        // Also optimistically check for any ready streams
1049        while let Some(Some(ready)) = self.waiting_streams.next().now_or_never() {
1050            self.upstream_ready(qconn, ready)?;
1051        }
1052
1053        Ok(())
1054    }
1055
1056    /// Reports connection-level error metrics and forwards
1057    /// IOWorker errors to the associated [H3Controller].
1058    fn on_conn_close<M: Metrics>(
1059        &mut self, quiche_conn: &mut QuicheConnection, metrics: &M,
1060        work_loop_result: &QuicResult<()>,
1061    ) {
1062        let max_stream_seen = self.max_stream_seen;
1063        metrics
1064            .maximum_writable_streams()
1065            .observe(max_stream_seen as f64);
1066
1067        let Err(work_loop_error) = work_loop_result else {
1068            return;
1069        };
1070
1071        Self::record_quiche_error(quiche_conn, metrics);
1072
1073        let Some(h3_err) = work_loop_error.downcast_ref::<H3ConnectionError>()
1074        else {
1075            log::error!("Found non-H3ConnectionError"; "error" => %work_loop_error);
1076            return;
1077        };
1078
1079        if matches!(h3_err, H3ConnectionError::ControllerWentAway) {
1080            // Inform client that we won't (can't) respond anymore
1081            let _ =
1082                quiche_conn.close(true, h3::WireErrorCode::NoError as u64, &[]);
1083            return;
1084        }
1085
1086        if let Some(ev) = H3Event::from_error(h3_err) {
1087            let _ = self.h3_event_sender.send(ev.into());
1088            #[expect(clippy::needless_return)]
1089            return; // avoid accidental fallthrough in the future
1090        }
1091    }
1092
1093    /// Wait for incoming data from the [H3Controller]. The next iteration of
1094    /// the I/O loop commences when one of the `select!`ed futures triggers.
1095    #[inline]
1096    async fn wait_for_data(
1097        &mut self, qconn: &mut QuicheConnection,
1098    ) -> QuicResult<()> {
1099        select! {
1100            biased;
1101            Some(ready) = self.waiting_streams.next() => self.upstream_ready(qconn, ready),
1102            Some(dgram) = self.dgram_recv.recv() => self.dgram_ready(qconn, dgram),
1103            Some(cmd) = self.cmd_recv.recv() => H::conn_command(self, qconn, cmd),
1104            r = self.hooks.wait_for_action(qconn), if H::has_wait_action(self) => r,
1105        }?;
1106
1107        // Make sure controller is not starved, but also not prioritized in the
1108        // biased select. So poll it last, however also perform a try_recv
1109        // each iteration.
1110        if let Ok(cmd) = self.cmd_recv.try_recv() {
1111            H::conn_command(self, qconn, cmd)?;
1112        }
1113
1114        Ok(())
1115    }
1116}
1117
1118impl<H: DriverHooks> Drop for H3Driver<H> {
1119    fn drop(&mut self) {
1120        for stream in self.stream_map.values() {
1121            stream
1122                .audit_stats
1123                .set_recvd_stream_fin(StreamClosureKind::Implicit);
1124        }
1125    }
1126}
1127
/// [`H3Command`]s are sent by the [H3Controller] to alter the [H3Driver]'s
/// state.
///
/// These are the commands common to both endpoints. Both [ServerH3Driver]
/// and [ClientH3Driver] may extend this enum with endpoint-specific
/// variants.
#[derive(Debug)]
pub enum H3Command {
    /// A connection-level command that executes directly on the
    /// [`quiche::Connection`].
    QuicCmd(QuicCommand),
    /// Send a GOAWAY frame to the peer to initiate a graceful connection
    /// shutdown. The frame carries the highest stream ID the driver has
    /// seen.
    GoAway,
}
1142
/// Sends [`H3Command`]s to an [H3Driver]. The sender is typed: it accepts
/// values of type `T` and converts them into the driver's command type `C`
/// before sending.
pub struct RequestSender<C, T> {
    /// Channel on which the converted commands are sent.
    sender: UnboundedSender<C>,
    // `T` only appears in method signatures, so a marker is required to
    // work around the otherwise dangling type parameter. No `T` is stored.
    _r: PhantomData<fn() -> T>,
}
1150
1151impl<C, T: Into<C>> RequestSender<C, T> {
1152    /// Send a request to the [H3Driver]. This can only fail if the driver is
1153    /// gone.
1154    #[inline(always)]
1155    pub fn send(&self, v: T) -> Result<(), mpsc::error::SendError<C>> {
1156        self.sender.send(v.into())
1157    }
1158}
1159
1160impl<C, T> Clone for RequestSender<C, T> {
1161    fn clone(&self) -> Self {
1162        Self {
1163            sender: self.sender.clone(),
1164            _r: Default::default(),
1165        }
1166    }
1167}
1168
/// Interface to communicate with a paired [H3Driver].
///
/// An [H3Controller] receives [`H3Event`]s from its driver, which must be
/// consumed by the application built on top of the driver to react to
/// incoming events. The controller also allows the application to send
/// ad-hoc [`H3Command`]s to the driver, which will be processed when the
/// driver waits for incoming data.
pub struct H3Controller<H: DriverHooks> {
    /// Sends [`H3Command`]s to the [H3Driver], like [`QuicCommand`]s or
    /// outbound HTTP requests.
    cmd_sender: UnboundedSender<H::Command>,
    /// Receives [`H3Event`]s from the [H3Driver]. Can be extracted and
    /// used independently of the [H3Controller]; this is `None` once the
    /// receiver has been taken.
    h3_event_recv: Option<UnboundedReceiver<H::Event>>,
}
1184
1185impl<H: DriverHooks> H3Controller<H> {
1186    /// Gets a mut reference to the [`H3Event`] receiver for the paired
1187    /// [H3Driver].
1188    pub fn event_receiver_mut(&mut self) -> &mut UnboundedReceiver<H::Event> {
1189        self.h3_event_recv
1190            .as_mut()
1191            .expect("No event receiver on H3Controller")
1192    }
1193
1194    /// Takes the [`H3Event`] receiver for the paired [H3Driver].
1195    pub fn take_event_receiver(&mut self) -> UnboundedReceiver<H::Event> {
1196        self.h3_event_recv
1197            .take()
1198            .expect("No event receiver on H3Controller")
1199    }
1200
1201    /// Creates a [`QuicCommand`] sender for the paired [H3Driver].
1202    pub fn cmd_sender(&self) -> RequestSender<H::Command, QuicCommand> {
1203        RequestSender {
1204            sender: self.cmd_sender.clone(),
1205            _r: Default::default(),
1206        }
1207    }
1208
1209    /// Sends a GOAWAY frame to initiate a graceful connection shutdown.
1210    pub fn send_goaway(&self) {
1211        let _ = self.cmd_sender.send(H3Command::GoAway.into());
1212    }
1213}