tokio_quiche/http3/driver/mod.rs

// Copyright (C) 2025, Cloudflare, Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
//     * Redistributions of source code must retain the above copyright notice,
//       this list of conditions and the following disclaimer.
//
//     * Redistributions in binary form must reproduce the above copyright
//       notice, this list of conditions and the following disclaimer in the
//       documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

mod client;
/// Wrapper for running HTTP/3 connections.
pub mod connection;
mod datagram;
// `DriverHooks` must stay private to prevent users from creating their own
// H3Drivers.
mod hooks;
mod server;
mod streams;

use std::collections::BTreeMap;
use std::error::Error;
use std::fmt;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Instant;

use datagram_socket::StreamClosureKind;
use foundations::telemetry::log;
use futures::FutureExt;
use futures_util::stream::FuturesUnordered;
use quiche::h3;
use tokio::select;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::StreamExt;
use tokio_util::sync::PollSender;

use self::hooks::DriverHooks;
use self::hooks::InboundHeaders;
use self::streams::FlowCtx;
use self::streams::HaveUpstreamCapacity;
use self::streams::ReceivedDownstreamData;
use self::streams::StreamCtx;
use self::streams::StreamReady;
use self::streams::WaitForDownstreamData;
use self::streams::WaitForStream;
use self::streams::WaitForUpstreamCapacity;
use crate::buf_factory::BufFactory;
use crate::buf_factory::PooledBuf;
use crate::buf_factory::PooledDgram;
use crate::http3::settings::Http3Settings;
use crate::http3::H3AuditStats;
use crate::metrics::Metrics;
use crate::quic::HandshakeInfo;
use crate::quic::QuicCommand;
use crate::quic::QuicheConnection;
use crate::ApplicationOverQuic;
use crate::QuicResult;

pub use self::client::ClientEventStream;
pub use self::client::ClientH3Command;
pub use self::client::ClientH3Controller;
pub use self::client::ClientH3Driver;
pub use self::client::ClientH3Event;
pub use self::client::ClientRequestSender;
pub use self::client::NewClientRequest;
pub use self::server::ServerEventStream;
pub use self::server::ServerH3Command;
pub use self::server::ServerH3Controller;
pub use self::server::ServerH3Driver;
pub use self::server::ServerH3Event;

// The default priority for HTTP/3 responses if the application didn't provide
// one.
const DEFAULT_PRIO: h3::Priority = h3::Priority::new(3, true);

// For a stream use a channel with 16 entries, which works out to 16 * 64KB =
// 1MB of max buffered data.
#[cfg(not(any(test, debug_assertions)))]
const STREAM_CAPACITY: usize = 16;
#[cfg(any(test, debug_assertions))]
const STREAM_CAPACITY: usize = 1; // Set to 1 to stress write_pending under test conditions

// For *all* flows use a shared channel with 2048 entries, which works out
// to 3MB of max buffered data at 1500 bytes per datagram.
const FLOW_CAPACITY: usize = 2048;

/// Used by a local task to send [`OutboundFrame`]s to a peer on the
/// stream or flow associated with this channel.
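///
/// A minimal usage sketch (not compiled here): pushing a proxied datagram into
/// this sender, assuming `send`, `flow_id` and `dgram` were obtained from an
/// [`H3Event::NewFlow`] event and its surrounding task.
///
/// ```ignore
/// use futures::SinkExt;
///
/// // `PollSender` implements `Sink`, so `send` waits for channel capacity.
/// let _ = send.send(OutboundFrame::Datagram(dgram, flow_id)).await;
/// ```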
pub type OutboundFrameSender = PollSender<OutboundFrame>;

/// Used internally to receive [`OutboundFrame`]s which should be sent to a peer
/// on the stream or flow associated with this channel.
type OutboundFrameStream = mpsc::Receiver<OutboundFrame>;

/// Used internally to send [`InboundFrame`]s (data) from the peer to a local
/// task on the stream or flow associated with this channel.
type InboundFrameSender = PollSender<InboundFrame>;

/// Used by a local task to receive [`InboundFrame`]s (data) on the stream or
/// flow associated with this channel.
pub type InboundFrameStream = mpsc::Receiver<InboundFrame>;

/// The error type used internally in [H3Driver].
///
/// Note that [`ApplicationOverQuic`] errors are not exposed to users at this
/// time. The type is public to document the failure modes in [H3Driver].
#[derive(Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum H3ConnectionError {
    /// The controller task was shut down and is no longer listening.
    ControllerWentAway,
    /// Any other error at the connection (rather than stream) level.
    H3(h3::Error),
    /// Received a GOAWAY frame from the peer.
    GoAway,
    /// Received data for a stream that was closed or never opened.
    NonexistentStream,
    /// The server's post-accept timeout was hit.
    /// The timeout can be configured in [`Http3Settings`].
    PostAcceptTimeout,
}

impl From<h3::Error> for H3ConnectionError {
    fn from(err: h3::Error) -> Self {
        H3ConnectionError::H3(err)
    }
}

impl From<quiche::Error> for H3ConnectionError {
    fn from(err: quiche::Error) -> Self {
        H3ConnectionError::H3(h3::Error::TransportError(err))
    }
}

impl Error for H3ConnectionError {}

impl fmt::Display for H3ConnectionError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let s: &dyn fmt::Display = match self {
            Self::ControllerWentAway => &"controller went away",
            Self::H3(e) => e,
            Self::GoAway => &"goaway",
            Self::NonexistentStream => &"nonexistent stream",
            Self::PostAcceptTimeout => &"post accept timeout hit",
        };

        write!(f, "H3ConnectionError: {s}")
    }
}

type H3ConnectionResult<T> = Result<T, H3ConnectionError>;

/// HTTP/3 headers that were received on a stream.
///
/// `recv` is used to read the message body, while `send` is used to transmit
/// data back to the peer.
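///
/// A minimal sketch of consuming such a message, assuming it arrived via an
/// [`H3Event::IncomingHeaders`] event on a server; the handler name and the
/// `200` status are illustrative only, and the block is not compiled here.
///
/// ```ignore
/// use futures::SinkExt;
///
/// async fn handle(mut msg: IncomingH3Headers) {
///     // Drain the request body unless the headers already carried the FIN.
///     if !msg.read_fin {
///         while let Some(InboundFrame::Body(chunk, fin)) = msg.recv.recv().await {
///             // process `chunk` here
///             if fin {
///                 break;
///             }
///         }
///     }
///
///     // Reply with a header section followed by an empty, FIN-flagged body.
///     let status = vec![h3::Header::new(b":status", b"200")];
///     let _ = msg.send.send(OutboundFrame::Headers(status, None)).await;
///     let _ = msg
///         .send
///         .send(OutboundFrame::body(BufFactory::get_empty_buf(), true))
///         .await;
/// }
/// ```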
pub struct IncomingH3Headers {
    /// Stream ID of the frame.
    pub stream_id: u64,
    /// The actual [`h3::Header`]s which were received.
    pub headers: Vec<h3::Header>,
    /// An [`OutboundFrameSender`] for streaming body data to the peer. For
    /// [ClientH3Driver], note that the request body can also be passed through
    /// a cloned sender via [`NewClientRequest`].
    pub send: OutboundFrameSender,
    /// An [`InboundFrameStream`] of body data received from the peer.
    pub recv: InboundFrameStream,
    /// Whether the read side of the stream was already finished by these
    /// headers, i.e. `true` means no body will follow.
    pub read_fin: bool,
    /// Handle to the [`H3AuditStats`] for the message's stream.
    pub h3_audit_stats: Arc<H3AuditStats>,
}

impl fmt::Debug for IncomingH3Headers {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("IncomingH3Headers")
            .field("stream_id", &self.stream_id)
            .field("headers", &self.headers)
            .field("read_fin", &self.read_fin)
            .field("h3_audit_stats", &self.h3_audit_stats)
            .finish()
    }
}

/// [`H3Event`]s are produced by an [H3Driver] to describe HTTP/3 state updates.
///
/// Both [ServerH3Driver] and [ClientH3Driver] may extend this enum with
/// endpoint-specific variants. The events must be consumed by users of the
/// drivers, like a higher-level `Server` or `Client` controller.
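///
/// A minimal sketch of draining events on the server side, assuming a
/// `controller` obtained from [ServerH3Driver]'s constructor and that the
/// server event type wraps core events in a `Core` variant; the match is
/// deliberately not exhaustive and the block is not compiled here.
///
/// ```ignore
/// let mut events = controller.take_event_receiver();
/// while let Some(event) = events.recv().await {
///     match event {
///         ServerH3Event::Core(H3Event::IncomingHeaders(headers)) => {
///             // spawn a task that serves the request
///         },
///         ServerH3Event::Core(H3Event::ConnectionShutdown(_err)) => break,
///         _ => {},
///     }
/// }
/// ```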
#[derive(Debug)]
pub enum H3Event {
    /// A SETTINGS frame was received.
    IncomingSettings {
        /// Raw HTTP/3 setting pairs, in the order received from the peer.
        settings: Vec<(u64, u64)>,
    },

    /// A HEADERS frame was received on the given stream. This is either a
    /// request or a response depending on the perspective of the [`H3Event`]
    /// receiver.
    IncomingHeaders(IncomingH3Headers),

    /// A DATAGRAM flow was created and associated with the given `flow_id`.
    /// This event is fired before a HEADERS event for CONNECT[-UDP] requests.
    NewFlow {
        /// Flow ID of the new flow.
        flow_id: u64,
        /// An [`OutboundFrameSender`] for transmitting datagrams to the peer.
        send: OutboundFrameSender,
        /// An [`InboundFrameStream`] for receiving datagrams from the peer.
        recv: InboundFrameStream,
    },
    /// A RST_STREAM frame was seen on the given `stream_id`. The user of the
    /// driver should clean up any state allocated for this stream.
    ResetStream { stream_id: u64 },
    /// The connection has irrecoverably errored and is shutting down.
    ConnectionError(h3::Error),
    /// The connection has been shut down, optionally due to an
    /// [`H3ConnectionError`].
    ConnectionShutdown(Option<H3ConnectionError>),
    /// Body data has been received over a stream.
    BodyBytesReceived {
        /// Stream ID of the body data.
        stream_id: u64,
        /// Number of bytes received.
        num_bytes: u64,
        /// Whether the stream is finished and won't yield any more data.
        fin: bool,
    },
    /// The stream has been closed. This is used to signal stream closures that
    /// don't result from RST_STREAM frames, unlike the
    /// [`H3Event::ResetStream`] variant.
    StreamClosed { stream_id: u64 },
}

impl H3Event {
    /// Generates an event from an applicable [`H3ConnectionError`].
    fn from_error(err: &H3ConnectionError) -> Option<Self> {
        Some(match err {
            H3ConnectionError::H3(e) => Self::ConnectionError(*e),
            H3ConnectionError::PostAcceptTimeout => Self::ConnectionShutdown(
                Some(H3ConnectionError::PostAcceptTimeout),
            ),
            _ => return None,
        })
    }
}

/// An [`OutboundFrame`] is a data frame that should be sent from a local task
/// to a peer over a [`quiche::h3::Connection`].
///
/// This is used, for example, to send response body data to a peer, or proxied
/// UDP datagrams.
#[derive(Debug)]
pub enum OutboundFrame {
    /// Response headers to be sent to the peer, with optional priority.
    Headers(Vec<h3::Header>, Option<quiche::h3::Priority>),
    /// Response body/CONNECT downstream data plus FIN flag.
    #[cfg(feature = "zero-copy")]
    Body(crate::buf_factory::QuicheBuf, bool),
    /// Response body/CONNECT downstream data plus FIN flag.
    #[cfg(not(feature = "zero-copy"))]
    Body(PooledBuf, bool),
    /// CONNECT-UDP (DATAGRAM) downstream data plus flow ID.
    Datagram(PooledDgram, u64),
    /// An error encountered when serving the request. Stream should be closed.
    PeerStreamError,
    /// DATAGRAM flow explicitly closed.
    FlowShutdown { flow_id: u64, stream_id: u64 },
}

impl OutboundFrame {
    /// Creates a body frame with the provided buffer.
    pub fn body(body: PooledBuf, fin: bool) -> Self {
        #[cfg(feature = "zero-copy")]
        let body = crate::buf_factory::QuicheBuf::new(body);

        OutboundFrame::Body(body, fin)
    }
}

/// An [`InboundFrame`] is a data frame that was received from the peer over a
/// [`quiche::h3::Connection`]. This is used by peers to send body or datagrams
/// to the local task.
#[derive(Debug)]
pub enum InboundFrame {
    /// Request body/CONNECT upstream data plus FIN flag.
    Body(PooledBuf, bool),
    /// CONNECT-UDP (DATAGRAM) upstream data.
    Datagram(PooledDgram),
}

/// A ready-made [`ApplicationOverQuic`] which can handle HTTP/3 and MASQUE.
/// Depending on the `DriverHooks` in use, it powers either a client or a
/// server.
///
/// Use the [ClientH3Driver] and [ServerH3Driver] aliases to access the
/// respective driver types. The driver is passed into an I/O loop and
/// communicates with the driver's user (e.g., an HTTP client or a server) via
/// its associated [H3Controller]. The controller allows the application to both
/// listen for [`H3Event`]s of note and send [`H3Command`]s into the I/O loop.
pub struct H3Driver<H: DriverHooks> {
    /// Configuration used to initialize `conn`. Created from [`Http3Settings`]
    /// in the constructor.
    h3_config: h3::Config,
    /// The underlying HTTP/3 connection. Initialized in
    /// `ApplicationOverQuic::on_conn_established`.
    conn: Option<h3::Connection>,
    /// State required by the client/server hooks.
    hooks: H,
    /// Sends [`H3Event`]s to the [H3Controller] paired with this driver.
    h3_event_sender: mpsc::UnboundedSender<H::Event>,
    /// Receives [`H3Command`]s from the [H3Controller] paired with this driver.
    cmd_recv: mpsc::UnboundedReceiver<H::Command>,

    /// A map of stream IDs to their [StreamCtx]. This is mainly used to
    /// retrieve the internal Tokio channels associated with the stream.
    stream_map: BTreeMap<u64, StreamCtx>,
    /// A map of flow IDs to their [FlowCtx]. This is mainly used to retrieve
    /// the internal Tokio channels associated with the flow.
    flow_map: BTreeMap<u64, FlowCtx>,
    /// Set of [`WaitForStream`] futures. A stream is added to this set if
    /// we need to send to it and its channel is at capacity, or if we need
    /// data from its channel and the channel is empty.
    waiting_streams: FuturesUnordered<WaitForStream>,

    /// Receives [`OutboundFrame`]s from all datagram flows on the connection.
    dgram_recv: OutboundFrameStream,
    /// Keeps the datagram channel open such that datagram flows can be created.
    dgram_send: OutboundFrameSender,

    /// The buffer used to interact with the underlying IoWorker.
    pooled_buf: PooledBuf,
    /// The maximum HTTP/3 stream ID seen on this connection.
    max_stream_seen: u64,

    /// Tracks whether we have forwarded the HTTP/3 SETTINGS frame
    /// to the [H3Controller] once.
    settings_received_and_forwarded: bool,
}

impl<H: DriverHooks> H3Driver<H> {
    /// Builds a new [H3Driver] and an associated [H3Controller].
    ///
    /// The driver should then be passed to
    /// [`InitialQuicConnection`](crate::InitialQuicConnection)'s `start`
    /// method.
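    ///
    /// A minimal sketch for a server endpoint, assuming `Http3Settings`
    /// implements `Default` and `quic_conn` is an
    /// [`InitialQuicConnection`](crate::InitialQuicConnection); the block is
    /// not compiled here.
    ///
    /// ```ignore
    /// let (driver, mut controller) = ServerH3Driver::new(Http3Settings::default());
    /// let _connection = quic_conn.start(driver);
    ///
    /// // The controller half receives H3Events and sends H3Commands.
    /// while let Some(event) = controller.event_receiver_mut().recv().await {
    ///     // handle ServerH3Event values here
    /// }
    /// ```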
    pub fn new(http3_settings: Http3Settings) -> (Self, H3Controller<H>) {
        let (dgram_send, dgram_recv) = mpsc::channel(FLOW_CAPACITY);
        let (cmd_sender, cmd_recv) = mpsc::unbounded_channel();
        let (h3_event_sender, h3_event_recv) = mpsc::unbounded_channel();

        (
            H3Driver {
                h3_config: (&http3_settings).into(),
                conn: None,
                hooks: H::new(&http3_settings),
                h3_event_sender,
                cmd_recv,

                stream_map: BTreeMap::new(),
                flow_map: BTreeMap::new(),

                dgram_recv,
                dgram_send: PollSender::new(dgram_send),
                pooled_buf: BufFactory::get_max_buf(),
                max_stream_seen: 0,

                waiting_streams: FuturesUnordered::new(),

                settings_received_and_forwarded: false,
            },
            H3Controller {
                cmd_sender,
                h3_event_recv: Some(h3_event_recv),
            },
        )
    }

    /// Retrieve the [FlowCtx] associated with the given `flow_id`. If no
    /// context is found, a new one will be created.
    fn get_or_insert_flow(
        &mut self, flow_id: u64,
    ) -> H3ConnectionResult<&mut FlowCtx> {
        use std::collections::btree_map::Entry;
        Ok(match self.flow_map.entry(flow_id) {
            Entry::Vacant(e) => {
                // This is a datagram for a new flow we haven't seen before
                let (flow, recv) = FlowCtx::new(FLOW_CAPACITY);
                let flow_req = H3Event::NewFlow {
                    flow_id,
                    recv,
                    send: self.dgram_send.clone(),
                };
                self.h3_event_sender
                    .send(flow_req.into())
                    .map_err(|_| H3ConnectionError::ControllerWentAway)?;
                e.insert(flow)
            },
            Entry::Occupied(e) => e.into_mut(),
        })
    }

    /// Adds a [StreamCtx] to the stream map with the given `stream_id`.
    fn insert_stream(&mut self, stream_id: u64, ctx: StreamCtx) {
        self.stream_map.insert(stream_id, ctx);
        self.max_stream_seen = self.max_stream_seen.max(stream_id);
    }

    /// Fetches body chunks from the [`quiche::h3::Connection`] and forwards
    /// them to the stream's associated [`InboundFrameStream`].
    fn process_h3_data(
        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
    ) -> H3ConnectionResult<()> {
        // Split self borrow between conn and stream_map
        let conn = self.conn.as_mut().ok_or(Self::connection_not_present())?;
        let ctx = self
            .stream_map
            .get_mut(&stream_id)
            .ok_or(H3ConnectionError::NonexistentStream)?;

        enum StreamStatus {
            Done { close: bool },
            Blocked,
        }

        let status = loop {
            let Some(sender) = ctx.send.as_ref().and_then(PollSender::get_ref)
            else {
                // already waiting for capacity
                break StreamStatus::Done { close: false };
            };

            let permit = match sender.try_reserve() {
                Ok(permit) => permit,
                Err(TrySendError::Closed(())) => {
                    break StreamStatus::Done {
                        close: ctx.fin_sent && ctx.fin_recv,
                    };
                },
                Err(TrySendError::Full(())) => {
                    if ctx.fin_recv || qconn.stream_readable(stream_id) {
                        break StreamStatus::Blocked;
                    }
                    break StreamStatus::Done { close: false };
                },
            };

            if ctx.fin_recv {
                // Signal end-of-body to upstream
                permit
                    .send(InboundFrame::Body(BufFactory::get_empty_buf(), true));
                break StreamStatus::Done {
                    close: ctx.fin_sent,
                };
            }

            match conn.recv_body(qconn, stream_id, &mut self.pooled_buf) {
                Ok(n) => {
                    let mut body = std::mem::replace(
                        &mut self.pooled_buf,
                        BufFactory::get_max_buf(),
                    );
                    body.truncate(n);

                    ctx.audit_stats.add_downstream_bytes_recvd(n as u64);
                    let event = H3Event::BodyBytesReceived {
                        stream_id,
                        num_bytes: n as u64,
                        fin: false,
                    };
                    let _ = self.h3_event_sender.send(event.into());

                    permit.send(InboundFrame::Body(body, false));
                },
                Err(h3::Error::Done) =>
                    break StreamStatus::Done { close: false },
                Err(_) => break StreamStatus::Done { close: true },
            }
        };

        match status {
            StreamStatus::Done { close } => {
                if close {
                    return self.finish_stream(qconn, stream_id, None, None);
                }

                // The QUIC stream is finished, manually invoke `process_h3_fin`
                // in case `h3::poll()` is never called again.
                //
                // Note that this case will not conflict with StreamStatus::Done
                // being returned due to the body channel being
                // blocked. qconn.stream_finished() will guarantee
                // that we've fully parsed the body as it only returns true
                // if we've seen a Fin for the read half of the stream.
                if !ctx.fin_recv && qconn.stream_finished(stream_id) {
                    return self.process_h3_fin(qconn, stream_id);
                }
            },
            StreamStatus::Blocked => {
                self.waiting_streams.push(ctx.wait_for_send(stream_id));
            },
        }

        Ok(())
    }

    /// Processes an end-of-stream event from the [`quiche::h3::Connection`].
    fn process_h3_fin(
        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
    ) -> H3ConnectionResult<()> {
        let ctx = self.stream_map.get_mut(&stream_id).filter(|c| !c.fin_recv);
        let Some(ctx) = ctx else {
            // Stream is already finished, nothing to do
            return Ok(());
        };

        ctx.fin_recv = true;
        ctx.audit_stats
            .set_recvd_stream_fin(StreamClosureKind::Explicit);

        // It's important to send this H3Event before process_h3_data so that
        // a server can (potentially) generate the control response before the
        // corresponding receiver drops.
        let event = H3Event::BodyBytesReceived {
            stream_id,
            num_bytes: 0,
            fin: true,
        };
        let _ = self.h3_event_sender.send(event.into());

        // Communicate fin to upstream. Since `ctx.fin_recv` is true now,
        // there can't be a recursive loop.
        self.process_h3_data(qconn, stream_id)
    }

    /// Processes a single [`quiche::h3::Event`] received from the underlying
    /// [`quiche::h3::Connection`]. Some events are dispatched to helper
    /// methods.
    fn process_read_event(
        &mut self, qconn: &mut QuicheConnection, stream_id: u64, event: h3::Event,
    ) -> H3ConnectionResult<()> {
        self.forward_settings()?;

        match event {
            // Requests/responses are exclusively handled by hooks.
            h3::Event::Headers { list, more_frames } =>
                H::headers_received(self, qconn, InboundHeaders {
                    stream_id,
                    headers: list,
                    has_body: more_frames,
                }),

            h3::Event::Data => self.process_h3_data(qconn, stream_id),
            h3::Event::Finished => self.process_h3_fin(qconn, stream_id),

            h3::Event::Reset(code) => {
                if let Some(ctx) = self.stream_map.get(&stream_id) {
                    ctx.audit_stats.set_recvd_reset_stream_error_code(code as _);
                }

                self.h3_event_sender
                    .send(H3Event::ResetStream { stream_id }.into())
                    .map_err(|_| H3ConnectionError::ControllerWentAway)?;

                self.finish_stream(qconn, stream_id, None, None)
            },

            h3::Event::PriorityUpdate => Ok(()),
            h3::Event::GoAway => Err(H3ConnectionError::GoAway),
        }
    }

    /// The SETTINGS frame can be received at any point, so we
    /// need to check `peer_settings_raw` to decide if we've received it.
    ///
    /// Settings should only be sent once, so we generate a single event
    /// when `peer_settings_raw` transitions from None to Some.
    fn forward_settings(&mut self) -> H3ConnectionResult<()> {
        if self.settings_received_and_forwarded {
            return Ok(());
        }

        // capture the peer settings and forward it
        if let Some(settings) = self.conn_mut()?.peer_settings_raw() {
            let incoming_settings = H3Event::IncomingSettings {
                settings: settings.to_vec(),
            };

            self.h3_event_sender
                .send(incoming_settings.into())
                .map_err(|_| H3ConnectionError::ControllerWentAway)?;

            self.settings_received_and_forwarded = true;
        }
        Ok(())
    }

    /// Send an individual frame to the underlying [`quiche::h3::Connection`] to
    /// be flushed at a later time.
    ///
    /// `Self::process_writes` will iterate over all writable streams and call
    /// this method in a loop for each stream to send all writable packets.
    fn process_write_frame(
        conn: &mut h3::Connection, qconn: &mut QuicheConnection,
        ctx: &mut StreamCtx,
    ) -> h3::Result<()> {
        let Some(frame) = &mut ctx.queued_frame else {
            return Ok(());
        };

        let audit_stats = &ctx.audit_stats;
        let stream_id = audit_stats.stream_id();

        match frame {
            OutboundFrame::Headers(headers, priority) => {
                let prio = priority.as_ref().unwrap_or(&DEFAULT_PRIO);

                let res = if ctx.initial_headers_sent {
                    // Initial headers were already sent, send additional
                    // headers now.
                    conn.send_additional_headers_with_priority(
                        qconn, stream_id, headers, prio, false, false,
                    )
                } else {
                    // Send initial headers.
                    conn.send_response_with_priority(
                        qconn, stream_id, headers, prio, false,
                    )
                    .inspect(|_| ctx.initial_headers_sent = true)
                };

                if let Err(h3::Error::StreamBlocked) = res {
                    ctx.first_full_headers_flush_fail_time
                        .get_or_insert(Instant::now());
                }

                if res.is_ok() {
                    if let Some(first) =
                        ctx.first_full_headers_flush_fail_time.take()
                    {
                        ctx.audit_stats.add_header_flush_duration(
                            Instant::now().duration_since(first),
                        );
                    }
                }

                res
            },

            OutboundFrame::Body(body, fin) => {
                let len = body.as_ref().len();
                if len == 0 && !*fin {
                    // quiche doesn't allow sending an empty body when the fin
                    // flag is not set
                    return Ok(());
                }
                if *fin {
                    // If this is the last body frame, drop the receiver in the
                    // stream map to signal that we shouldn't receive any more
                    // frames. NOTE: we can't use `mpsc::Receiver::close()`
                    // due to an inconsistency in how tokio handles reading
                    // from a closed mpsc channel https://github.com/tokio-rs/tokio/issues/7631
                    ctx.recv = None;
                }
                #[cfg(feature = "zero-copy")]
                let n = conn.send_body_zc(qconn, stream_id, body, *fin)?;

                #[cfg(not(feature = "zero-copy"))]
                let n = conn.send_body(qconn, stream_id, body, *fin)?;

                audit_stats.add_downstream_bytes_sent(n as _);
                if n != len {
                    // Couldn't write the entire body, keep what remains for
                    // future retry.
                    #[cfg(not(feature = "zero-copy"))]
                    body.pop_front(n);

                    Err(h3::Error::StreamBlocked)
                } else {
                    if *fin {
                        ctx.fin_sent = true;
                        audit_stats
                            .set_sent_stream_fin(StreamClosureKind::Explicit);
                        if ctx.fin_recv {
                            // Return a TransportError to trigger stream cleanup
                            // instead of h3::Error::Done
                            return Err(h3::Error::TransportError(
                                quiche::Error::Done,
                            ));
                        }
                    }
                    Ok(())
                }
            },

            OutboundFrame::PeerStreamError => Err(h3::Error::MessageError),

            OutboundFrame::FlowShutdown { .. } => {
                unreachable!("Only flows send shutdowns")
            },

            OutboundFrame::Datagram(..) => {
                unreachable!("Only flows send datagrams")
            },
        }
    }

    /// Resumes reads or writes to the connection when a stream channel becomes
    /// unblocked.
    ///
    /// If we were waiting for more data from a channel, we resume writing to
    /// the connection. Otherwise, we were blocked on channel capacity and
    /// continue reading from the connection. `Upstream` in this context is
    /// the consumer of the stream.
    fn upstream_ready(
        &mut self, qconn: &mut QuicheConnection, ready: StreamReady,
    ) -> H3ConnectionResult<()> {
        match ready {
            StreamReady::Downstream(r) => self.upstream_read_ready(qconn, r),
            StreamReady::Upstream(r) => self.upstream_write_ready(qconn, r),
        }
    }

    fn upstream_read_ready(
        &mut self, qconn: &mut QuicheConnection,
        read_ready: ReceivedDownstreamData,
    ) -> H3ConnectionResult<()> {
        let ReceivedDownstreamData {
            stream_id,
            chan,
            data,
        } = read_ready;

        match self.stream_map.get_mut(&stream_id) {
            None => Ok(()),
            Some(stream) => {
                stream.recv = Some(chan);
                stream.queued_frame = data;
                self.process_writable_stream(qconn, stream_id)
            },
        }
    }

    fn upstream_write_ready(
        &mut self, qconn: &mut QuicheConnection,
        write_ready: HaveUpstreamCapacity,
    ) -> H3ConnectionResult<()> {
        let HaveUpstreamCapacity {
            stream_id,
            mut chan,
        } = write_ready;

        match self.stream_map.get_mut(&stream_id) {
            None => Ok(()),
            Some(stream) => {
                chan.abort_send(); // Have to do it to release the associated permit
                stream.send = Some(chan);
                self.process_h3_data(qconn, stream_id)
            },
        }
    }

    /// Processes all queued outbound datagrams from the `dgram_recv` channel.
    fn dgram_ready(
        &mut self, qconn: &mut QuicheConnection, frame: OutboundFrame,
    ) -> H3ConnectionResult<()> {
        let mut frame = Ok(frame);

        loop {
            match frame {
                Ok(OutboundFrame::Datagram(dgram, flow_id)) => {
                    // Drop datagrams if there is no capacity
                    let _ = datagram::send_h3_dgram(qconn, flow_id, dgram);
                },
                Ok(OutboundFrame::FlowShutdown { flow_id, stream_id }) => {
                    self.finish_stream(
                        qconn,
                        stream_id,
                        Some(quiche::h3::WireErrorCode::NoError as u64),
                        Some(quiche::h3::WireErrorCode::NoError as u64),
                    )?;
                    self.flow_map.remove(&flow_id);
                    break;
                },
                Ok(_) => unreachable!("Flows can't send frame of other types"),
                Err(TryRecvError::Empty) => break,
                Err(TryRecvError::Disconnected) =>
                    return Err(H3ConnectionError::ControllerWentAway),
            }

            frame = self.dgram_recv.try_recv();
        }

        Ok(())
    }

    /// Return a mutable reference to the driver's HTTP/3 connection.
    ///
    /// If the connection doesn't exist yet, this function returns
    /// a `Self::connection_not_present()` error.
    fn conn_mut(&mut self) -> H3ConnectionResult<&mut h3::Connection> {
        self.conn.as_mut().ok_or(Self::connection_not_present())
    }

    /// Alias for [`quiche::Error::TlsFail`], which is used in the case where
    /// this driver doesn't have an established HTTP/3 connection attached
    /// to it yet.
    const fn connection_not_present() -> H3ConnectionError {
        H3ConnectionError::H3(h3::Error::TransportError(quiche::Error::TlsFail))
    }

    /// Removes a stream from the stream map if it exists. Also optionally sends
    /// `RESET` or `STOP_SENDING` frames if `write` or `read` is set to an
    /// error code, respectively.
    fn finish_stream(
        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
        read: Option<u64>, write: Option<u64>,
    ) -> H3ConnectionResult<()> {
        let Some(stream_ctx) = self.stream_map.remove(&stream_id) else {
            return Ok(());
        };

        let audit_stats = &stream_ctx.audit_stats;

        if let Some(err) = read {
            audit_stats.set_sent_stop_sending_error_code(err as _);
            let _ = qconn.stream_shutdown(stream_id, quiche::Shutdown::Read, err);
        }

        if let Some(err) = write {
            audit_stats.set_sent_reset_stream_error_code(err as _);
            let _ =
                qconn.stream_shutdown(stream_id, quiche::Shutdown::Write, err);
        }

        // Find if the stream also has any pending futures associated with it
        for pending in self.waiting_streams.iter_mut() {
            match pending {
                WaitForStream::Downstream(WaitForDownstreamData {
                    stream_id: id,
                    chan: Some(chan),
                }) if stream_id == *id => {
                    chan.close();
                },
                WaitForStream::Upstream(WaitForUpstreamCapacity {
                    stream_id: id,
                    chan: Some(chan),
                }) if stream_id == *id => {
                    chan.close();
                },
                _ => {},
            }
        }

        // Close any DATAGRAM-proxying channels when we close the stream, if they
        // exist
        if let Some(mapped_flow_id) = stream_ctx.associated_dgram_flow_id {
            self.flow_map.remove(&mapped_flow_id);
        }

        if qconn.is_server() {
            // Signal the server to remove the stream from its map
            let _ = self
                .h3_event_sender
                .send(H3Event::StreamClosed { stream_id }.into());
        }

        Ok(())
    }

    /// Handles a regular [`H3Command`]. May be called internally by
    /// [DriverHooks] for non-endpoint-specific [`H3Command`]s.
    fn handle_core_command(
        &mut self, qconn: &mut QuicheConnection, cmd: H3Command,
    ) -> H3ConnectionResult<()> {
        match cmd {
            H3Command::QuicCmd(cmd) => cmd.execute(qconn),
            H3Command::GoAway => {
                let max_id = self.max_stream_seen;
                self.conn_mut()
                    .expect("connection should be established")
                    .send_goaway(qconn, max_id)?;
            },
        }
        Ok(())
    }
}

impl<H: DriverHooks> H3Driver<H> {
    /// Reads all buffered datagrams out of `qconn` and distributes them to
    /// their flow channels.
    fn process_available_dgrams(
        &mut self, qconn: &mut QuicheConnection,
    ) -> H3ConnectionResult<()> {
        loop {
            match datagram::receive_h3_dgram(qconn) {
                Ok((flow_id, dgram)) => {
                    self.get_or_insert_flow(flow_id)?.send_best_effort(dgram);
                },
                Err(quiche::Error::Done) => return Ok(()),
                Err(err) => return Err(H3ConnectionError::from(err)),
            }
        }
    }

    /// Flushes any queued-up frames for `stream_id` into `qconn` until either
    /// there is no more capacity in `qconn` or no more frames to send.
    fn process_writable_stream(
        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
    ) -> H3ConnectionResult<()> {
        // Split self borrow between conn and stream_map
        let conn = self.conn.as_mut().ok_or(Self::connection_not_present())?;
        let Some(ctx) = self.stream_map.get_mut(&stream_id) else {
            return Ok(()); // Unknown stream_id
        };

        loop {
            // Process each writable frame, queue the next frame for processing
            // and shut down any errored streams.
            match Self::process_write_frame(conn, qconn, ctx) {
                Ok(()) => ctx.queued_frame = None,
                Err(h3::Error::StreamBlocked | h3::Error::Done) => break,
                Err(h3::Error::MessageError) => {
                    return self.finish_stream(
                        qconn,
                        stream_id,
                        Some(quiche::h3::WireErrorCode::MessageError as u64),
                        Some(quiche::h3::WireErrorCode::MessageError as u64),
                    );
                },
                Err(h3::Error::TransportError(quiche::Error::StreamStopped(
                    e,
                ))) => {
                    ctx.audit_stats.set_recvd_stop_sending_error_code(e as i64);
                    return self.finish_stream(qconn, stream_id, Some(e), None);
                },
                Err(h3::Error::TransportError(
                    quiche::Error::InvalidStreamState(stream),
                )) => {
                    return self.finish_stream(qconn, stream, None, None);
                },
                Err(_) => {
                    return self.finish_stream(qconn, stream_id, None, None);
                },
            }

            let Some(recv) = ctx.recv.as_mut() else {
                // This stream is already waiting for data or we wrote a fin and
                // closed the channel.
                return Ok(());
            };

            // Attempt to queue the next frame for processing. The corresponding
            // sender is created at the same time as the `StreamCtx`
            // and ultimately ends up in an `H3Body`. The body then
            // determines which frames to send to the peer via
            // this processing loop.
            match recv.try_recv() {
                Ok(frame) => ctx.queued_frame = Some(frame),
                Err(TryRecvError::Disconnected) => break,
                Err(TryRecvError::Empty) => {
                    self.waiting_streams.push(ctx.wait_for_recv(stream_id));
                    break;
                },
            }
        }

        Ok(())
    }

    /// Tests `qconn` for either a local or peer error and increments
    /// the associated HTTP/3 or QUIC error counter.
    fn record_quiche_error(qconn: &mut QuicheConnection, metrics: &impl Metrics) {
        // split metrics between local/peer and QUIC/HTTP/3 level errors
        if let Some(err) = qconn.local_error() {
            if err.is_app {
                metrics.local_h3_conn_close_error_count(err.error_code.into())
            } else {
                metrics.local_quic_conn_close_error_count(err.error_code.into())
            }
            .inc();
        } else if let Some(err) = qconn.peer_error() {
            if err.is_app {
                metrics.peer_h3_conn_close_error_count(err.error_code.into())
            } else {
                metrics.peer_quic_conn_close_error_count(err.error_code.into())
            }
            .inc();
        }
    }
}

impl<H: DriverHooks> ApplicationOverQuic for H3Driver<H> {
    fn on_conn_established(
        &mut self, quiche_conn: &mut QuicheConnection,
        handshake_info: &HandshakeInfo,
    ) -> QuicResult<()> {
        let conn = h3::Connection::with_transport(quiche_conn, &self.h3_config)?;
        self.conn = Some(conn);

        H::conn_established(self, quiche_conn, handshake_info)?;
        Ok(())
    }

    #[inline]
    fn should_act(&self) -> bool {
        self.conn.is_some()
    }

    #[inline]
    fn buffer(&mut self) -> &mut [u8] {
        &mut self.pooled_buf
    }

    /// Poll the underlying [`quiche::h3::Connection`] for
    /// [`quiche::h3::Event`]s and DATAGRAMs, delegating processing to
    /// `Self::process_read_event`.
    ///
    /// If a DATAGRAM is found, it is sent to the receiver on its channel.
    fn process_reads(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
        loop {
            match self.conn_mut()?.poll(qconn) {
                Ok((stream_id, event)) =>
                    self.process_read_event(qconn, stream_id, event)?,
                Err(h3::Error::Done) => break,
                Err(err) => {
                    // Don't bubble error up, instead keep the worker loop going
                    // until quiche reports the connection is
                    // closed.
                    log::debug!("connection closed due to h3 protocol error"; "error"=>?err);
                    return Ok(());
                },
            };
        }

        self.process_available_dgrams(qconn)?;
        Ok(())
    }

    /// Write as much data as possible into the [`quiche::h3::Connection`] from
    /// all sources. This will attempt to write any queued frames into their
    /// respective streams, if writable.
    fn process_writes(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
        while let Some(stream_id) = qconn.stream_writable_next() {
            self.process_writable_stream(qconn, stream_id)?;
        }

        // Also optimistically check for any ready streams
        while let Some(Some(ready)) = self.waiting_streams.next().now_or_never() {
            self.upstream_ready(qconn, ready)?;
        }

        Ok(())
    }

    /// Reports connection-level error metrics and forwards
    /// IOWorker errors to the associated [H3Controller].
    fn on_conn_close<M: Metrics>(
        &mut self, quiche_conn: &mut QuicheConnection, metrics: &M,
        work_loop_result: &QuicResult<()>,
    ) {
        let max_stream_seen = self.max_stream_seen;
        metrics
            .maximum_writable_streams()
            .observe(max_stream_seen as f64);

        let Err(work_loop_error) = work_loop_result else {
            return;
        };

        Self::record_quiche_error(quiche_conn, metrics);

        let Some(h3_err) = work_loop_error.downcast_ref::<H3ConnectionError>()
        else {
            log::error!("Found non-H3ConnectionError"; "error" => %work_loop_error);
            return;
        };

        if matches!(h3_err, H3ConnectionError::ControllerWentAway) {
            // Inform client that we won't (can't) respond anymore
            let _ =
                quiche_conn.close(true, h3::WireErrorCode::NoError as u64, &[]);
            return;
        }

        if let Some(ev) = H3Event::from_error(h3_err) {
            let _ = self.h3_event_sender.send(ev.into());
            #[expect(clippy::needless_return)]
            return; // avoid accidental fallthrough in the future
        }
    }

    /// Wait for incoming data from the [H3Controller]. The next iteration of
    /// the I/O loop commences when one of the `select!`ed futures triggers.
    #[inline]
    async fn wait_for_data(
        &mut self, qconn: &mut QuicheConnection,
    ) -> QuicResult<()> {
        select! {
            biased;
            Some(ready) = self.waiting_streams.next() => self.upstream_ready(qconn, ready),
            Some(dgram) = self.dgram_recv.recv() => self.dgram_ready(qconn, dgram),
            Some(cmd) = self.cmd_recv.recv() => H::conn_command(self, qconn, cmd),
            r = self.hooks.wait_for_action(qconn), if H::has_wait_action(self) => r,
        }?;

        // Make sure the controller is not starved, but also not prioritized in
        // the biased select. So poll it last, but still perform a try_recv on
        // each iteration.
        if let Ok(cmd) = self.cmd_recv.try_recv() {
            H::conn_command(self, qconn, cmd)?;
        }

        Ok(())
    }
}

impl<H: DriverHooks> Drop for H3Driver<H> {
    fn drop(&mut self) {
        for stream in self.stream_map.values() {
            stream
                .audit_stats
                .set_recvd_stream_fin(StreamClosureKind::Implicit);
        }
    }
}

/// [`H3Command`]s are sent by the [H3Controller] to alter the [H3Driver]'s
/// state.
///
/// Both [ServerH3Driver] and [ClientH3Driver] may extend this enum with
/// endpoint-specific variants.
#[derive(Debug)]
pub enum H3Command {
    /// A connection-level command that executes directly on the
    /// [`quiche::Connection`].
    QuicCmd(QuicCommand),
    /// Send a GOAWAY frame to the peer to initiate a graceful connection
    /// shutdown.
    GoAway,
}

/// Sends [`H3Command`]s to an [H3Driver]. The sender is typed and internally
/// wraps instances of `T` in the appropriate `H3Command` variant.
pub struct RequestSender<C, T> {
    sender: UnboundedSender<C>,
    // Required to work around dangling type parameter
    _r: PhantomData<fn() -> T>,
}

impl<C, T: Into<C>> RequestSender<C, T> {
    /// Send a request to the [H3Driver]. This can only fail if the driver is
    /// gone.
    #[inline(always)]
    pub fn send(&self, v: T) -> Result<(), mpsc::error::SendError<C>> {
        self.sender.send(v.into())
    }
}

impl<C, T> Clone for RequestSender<C, T> {
    fn clone(&self) -> Self {
        Self {
            sender: self.sender.clone(),
            _r: Default::default(),
        }
    }
}

/// Interface to communicate with a paired [H3Driver].
///
/// An [H3Controller] receives [`H3Event`]s from its driver, which must be
/// consumed by the application built on top of the driver to react to incoming
/// events. The controller also allows the application to send ad-hoc
/// [`H3Command`]s to the driver, which will be processed when the driver waits
/// for incoming data.
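///
/// A minimal sketch of the controller side, where `controller` comes from the
/// driver constructor and `cmd` stands in for any [`QuicCommand`] the
/// application wants to run; the block is not compiled here.
///
/// ```ignore
/// // Forward a connection-level command into the I/O loop.
/// let cmd_sender = controller.cmd_sender();
/// let _ = cmd_sender.send(cmd);
///
/// // Ask the driver to start a graceful shutdown via GOAWAY.
/// controller.send_goaway();
///
/// // Keep consuming events until the driver goes away with the connection.
/// while let Some(event) = controller.event_receiver_mut().recv().await {
///     // ...
/// }
/// ```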
pub struct H3Controller<H: DriverHooks> {
    /// Sends [`H3Command`]s to the [H3Driver], like [`QuicCommand`]s or
    /// outbound HTTP requests.
    cmd_sender: UnboundedSender<H::Command>,
    /// Receives [`H3Event`]s from the [H3Driver]. Can be extracted and
    /// used independently of the [H3Controller].
    h3_event_recv: Option<UnboundedReceiver<H::Event>>,
}

impl<H: DriverHooks> H3Controller<H> {
    /// Gets a mut reference to the [`H3Event`] receiver for the paired
    /// [H3Driver].
    pub fn event_receiver_mut(&mut self) -> &mut UnboundedReceiver<H::Event> {
        self.h3_event_recv
            .as_mut()
            .expect("No event receiver on H3Controller")
    }

    /// Takes the [`H3Event`] receiver for the paired [H3Driver].
    pub fn take_event_receiver(&mut self) -> UnboundedReceiver<H::Event> {
        self.h3_event_recv
            .take()
            .expect("No event receiver on H3Controller")
    }

    /// Creates a [`QuicCommand`] sender for the paired [H3Driver].
    pub fn cmd_sender(&self) -> RequestSender<H::Command, QuicCommand> {
        RequestSender {
            sender: self.cmd_sender.clone(),
            _r: Default::default(),
        }
    }

    /// Sends a GOAWAY frame to initiate a graceful connection shutdown.
    pub fn send_goaway(&self) {
        let _ = self.cmd_sender.send(H3Command::GoAway.into());
    }
}