tokio_quiche/http3/driver/mod.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27mod client;
28/// Wrapper for running HTTP/3 connections.
29pub mod connection;
30mod datagram;
31// `DriverHooks` must stay private to prevent users from creating their own
32// H3Drivers.
33mod hooks;
34mod server;
35mod streams;
36#[cfg(test)]
37pub mod test_utils;
38#[cfg(test)]
39mod tests;
40
41use std::collections::BTreeMap;
42use std::error::Error;
43use std::fmt;
44use std::marker::PhantomData;
45use std::sync::Arc;
46use std::time::Instant;
47
48use datagram_socket::StreamClosureKind;
49use foundations::telemetry::log;
50use futures::FutureExt;
51use futures_util::stream::FuturesUnordered;
52use quiche::h3;
53use tokio::select;
54use tokio::sync::mpsc;
55use tokio::sync::mpsc::error::TryRecvError;
56use tokio::sync::mpsc::error::TrySendError;
57use tokio::sync::mpsc::UnboundedReceiver;
58use tokio::sync::mpsc::UnboundedSender;
59use tokio_stream::StreamExt;
60use tokio_util::sync::PollSender;
61
62use self::hooks::DriverHooks;
63use self::hooks::InboundHeaders;
64use self::streams::FlowCtx;
65use self::streams::HaveUpstreamCapacity;
66use self::streams::ReceivedDownstreamData;
67use self::streams::StreamCtx;
68use self::streams::StreamReady;
69use self::streams::WaitForDownstreamData;
70use self::streams::WaitForStream;
71use self::streams::WaitForUpstreamCapacity;
72use crate::buf_factory::BufFactory;
73use crate::buf_factory::PooledBuf;
74use crate::buf_factory::PooledDgram;
75use crate::http3::settings::Http3Settings;
76use crate::http3::H3AuditStats;
77use crate::metrics::Metrics;
78use crate::quic::HandshakeInfo;
79use crate::quic::QuicCommand;
80use crate::quic::QuicheConnection;
81use crate::ApplicationOverQuic;
82use crate::QuicResult;
83
84pub use self::client::ClientEventStream;
85pub use self::client::ClientH3Command;
86pub use self::client::ClientH3Controller;
87pub use self::client::ClientH3Driver;
88pub use self::client::ClientH3Event;
89pub use self::client::ClientRequestSender;
90pub use self::client::NewClientRequest;
91pub use self::server::RawPriorityValue;
92pub use self::server::ServerEventStream;
93pub use self::server::ServerH3Command;
94pub use self::server::ServerH3Controller;
95pub use self::server::ServerH3Driver;
96pub use self::server::ServerH3Event;
97
98// The default priority for HTTP/3 responses if the application didn't provide
99// one.
100const DEFAULT_PRIO: h3::Priority = h3::Priority::new(3, true);
101
102// For a stream use a channel with 16 entries, which works out to 16 * 64KB =
103// 1MB of max buffered data.
104#[cfg(not(any(test, debug_assertions)))]
105const STREAM_CAPACITY: usize = 16;
106#[cfg(any(test, debug_assertions))]
107const STREAM_CAPACITY: usize = 1; // Set to 1 to stress write_pending under test conditions
108
109// For *all* flows use a shared channel with 2048 entries, which works out
110// to 3MB of max buffered data at 1500 bytes per datagram.
111const FLOW_CAPACITY: usize = 2048;
112
113/// Used by a local task to send [`OutboundFrame`]s to a peer on the
114/// stream or flow associated with this channel.
115pub type OutboundFrameSender = PollSender<OutboundFrame>;
116
117/// Used internally to receive [`OutboundFrame`]s which should be sent to a peer
118/// on the stream or flow associated with this channel.
119type OutboundFrameStream = mpsc::Receiver<OutboundFrame>;
120
121/// Used internally to send [`InboundFrame`]s (data) from the peer to a local
122/// task on the stream or flow associated with this channel.
123type InboundFrameSender = PollSender<InboundFrame>;
124
125/// Used by a local task to receive [`InboundFrame`]s (data) on the stream or
126/// flow associated with this channel.
127pub type InboundFrameStream = mpsc::Receiver<InboundFrame>;
128
129/// The error type used internally in [H3Driver].
130///
131/// Note that [`ApplicationOverQuic`] errors are not exposed to users at this
132/// time. The type is public to document the failure modes in [H3Driver].
133#[derive(Debug, PartialEq, Eq)]
134#[non_exhaustive]
135pub enum H3ConnectionError {
136    /// The controller task was shut down and is no longer listening.
137    ControllerWentAway,
138    /// Any other error at the connection level, but not the stream level.
139    H3(h3::Error),
140    /// Received a GOAWAY frame from the peer.
141    GoAway,
142    /// Received data for a stream that was closed or never opened.
143    NonexistentStream,
144    /// The server's post-accept timeout was hit.
145    /// The timeout can be configured in [`Http3Settings`].
146    PostAcceptTimeout,
147}
148
149impl From<h3::Error> for H3ConnectionError {
150    fn from(err: h3::Error) -> Self {
151        H3ConnectionError::H3(err)
152    }
153}
154
155impl From<quiche::Error> for H3ConnectionError {
156    fn from(err: quiche::Error) -> Self {
157        H3ConnectionError::H3(h3::Error::TransportError(err))
158    }
159}
160
161impl Error for H3ConnectionError {}
162
163impl fmt::Display for H3ConnectionError {
164    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
165        let s: &dyn fmt::Display = match self {
166            Self::ControllerWentAway => &"controller went away",
167            Self::H3(e) => e,
168            Self::GoAway => &"goaway",
169            Self::NonexistentStream => &"nonexistent stream",
170            Self::PostAcceptTimeout => &"post accept timeout hit",
171        };
172
173        write!(f, "H3ConnectionError: {s}")
174    }
175}
176
177type H3ConnectionResult<T> = Result<T, H3ConnectionError>;
178
179/// HTTP/3 headers that were received on a stream.
180///
181/// `recv` is used to read the message body, while `send` is used to transmit
182/// data back to the peer.
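///
/// A rough sketch of how a handler might drain the body and reply (this
/// assumes a server-side task that received the struct via
/// [`H3Event::IncomingHeaders`]; the handler shape and error handling are
/// illustrative only):
///
/// ```ignore
/// async fn handle(mut headers: IncomingH3Headers) {
///     use futures::SinkExt; // `send` is a PollSender, i.e. a futures Sink
///
///     // Drain the request body from the peer.
///     while let Some(frame) = headers.recv.recv().await {
///         match frame {
///             InboundFrame::Body(chunk, fin) => {
///                 // ...process `chunk` here...
///                 if fin {
///                     break;
///                 }
///             },
///             InboundFrame::Datagram(_) => { /* CONNECT-UDP payload */ },
///         }
///     }
///
///     // Send response headers, then finish the stream with an empty body.
///     let resp = vec![h3::Header::new(b":status", b"200")];
///     let _ = headers.send.send(OutboundFrame::Headers(resp, None)).await;
///     let eob = OutboundFrame::body(BufFactory::get_empty_buf(), true);
///     let _ = headers.send.send(eob).await;
/// }
/// ```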
183pub struct IncomingH3Headers {
184    /// Stream ID of the frame.
185    pub stream_id: u64,
186    /// The actual [`h3::Header`]s which were received.
187    pub headers: Vec<h3::Header>,
188    /// An [`OutboundFrameSender`] for streaming body data to the peer. For
189    /// [ClientH3Driver], note that the request body can also be passed a
190    /// cloned sender via [`NewClientRequest`].
191    pub send: OutboundFrameSender,
192    /// An [`InboundFrameStream`] of body data received from the peer.
193    pub recv: InboundFrameStream,
194    /// Whether the stream is already finished, i.e. no body follows the headers.
195    pub read_fin: bool,
196    /// Handle to the [`H3AuditStats`] for the message's stream.
197    pub h3_audit_stats: Arc<H3AuditStats>,
198}
199
200impl fmt::Debug for IncomingH3Headers {
201    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
202        f.debug_struct("IncomingH3Headers")
203            .field("stream_id", &self.stream_id)
204            .field("headers", &self.headers)
205            .field("read_fin", &self.read_fin)
206            .field("h3_audit_stats", &self.h3_audit_stats)
207            .finish()
208    }
209}
210
211/// [`H3Event`]s are produced by an [H3Driver] to describe HTTP/3 state updates.
212///
213/// Both [ServerH3Driver] and [ClientH3Driver] may extend this enum with
214/// endpoint-specific variants. The events must be consumed by users of the
215/// drivers, like a higher-level `Server` or `Client` controller.
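///
/// A sketch of dispatching on the core events (the endpoint-specific wrapper
/// around [`H3Event`] is assumed to have been unwrapped already; only a few
/// variants are shown):
///
/// ```ignore
/// fn on_core_event(event: H3Event) {
///     match event {
///         H3Event::IncomingHeaders(headers) => {
///             // Hand `headers` off to a request/response handler.
///         },
///         H3Event::ResetStream { stream_id } => {
///             // Drop any per-stream state for `stream_id`.
///         },
///         H3Event::ConnectionError(_) => {
///             // The connection is going away; tear down associated state.
///         },
///         _ => {},
///     }
/// }
/// ```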
216#[derive(Debug)]
217pub enum H3Event {
218    /// A SETTINGS frame was received.
219    IncomingSettings {
220        /// Raw HTTP/3 setting pairs, in the order received from the peer.
221        settings: Vec<(u64, u64)>,
222    },
223
224    /// A HEADERS frame was received on the given stream. This is either a
225    /// request or a response depending on the perspective of the [`H3Event`]
226    /// receiver.
227    IncomingHeaders(IncomingH3Headers),
228
229    /// A DATAGRAM flow was created and associated with the given `flow_id`.
230    /// This event is fired before a HEADERS event for CONNECT[-UDP] requests.
231    NewFlow {
232        /// Flow ID of the new flow.
233        flow_id: u64,
234        /// An [`OutboundFrameSender`] for transmitting datagrams to the peer.
235        send: OutboundFrameSender,
236        /// An [`InboundFrameStream`] for receiving datagrams from the peer.
237        recv: InboundFrameStream,
238    },
239    /// A RST_STREAM frame was seen on the given `stream_id`. The user of the
240    /// driver should clean up any state allocated for this stream.
241    ResetStream { stream_id: u64 },
242    /// The connection has irrecoverably errored and is shutting down.
243    ConnectionError(h3::Error),
244    /// The connection has been shutdown, optionally due to an
245    /// [`H3ConnectionError`].
246    ConnectionShutdown(Option<H3ConnectionError>),
247    /// Body data has been received over a stream.
248    BodyBytesReceived {
249        /// Stream ID of the body data.
250        stream_id: u64,
251        /// Number of bytes received.
252        num_bytes: u64,
253        /// Whether the stream is finished and won't yield any more data.
254        fin: bool,
255    },
256    /// The stream has been closed. This is used to signal stream closures that
257    /// don't result from RST_STREAM frames, unlike the
258    /// [`H3Event::ResetStream`] variant.
259    StreamClosed { stream_id: u64 },
260}
261
262impl H3Event {
263    /// Generates an event from an applicable [`H3ConnectionError`].
264    fn from_error(err: &H3ConnectionError) -> Option<Self> {
265        Some(match err {
266            H3ConnectionError::H3(e) => Self::ConnectionError(*e),
267            H3ConnectionError::PostAcceptTimeout => Self::ConnectionShutdown(
268                Some(H3ConnectionError::PostAcceptTimeout),
269            ),
270            _ => return None,
271        })
272    }
273}
274
275/// An [`OutboundFrame`] is a data frame that should be sent from a local task
276/// to a peer over a [`quiche::h3::Connection`].
277///
278/// This is used, for example, to send response body data to a peer, or proxied
279/// UDP datagrams.
280#[derive(Debug)]
281pub enum OutboundFrame {
282    /// Response headers to be sent to the peer, with optional priority.
283    Headers(Vec<h3::Header>, Option<quiche::h3::Priority>),
284    /// Response body/CONNECT downstream data plus FIN flag.
285    #[cfg(feature = "zero-copy")]
286    Body(crate::buf_factory::QuicheBuf, bool),
287    /// Response body/CONNECT downstream data plus FIN flag.
288    #[cfg(not(feature = "zero-copy"))]
289    Body(PooledBuf, bool),
290    /// CONNECT-UDP (DATAGRAM) downstream data plus flow ID.
291    Datagram(PooledDgram, u64),
292    /// Close the stream with trailers, with optional priority.
293    Trailers(Vec<h3::Header>, Option<quiche::h3::Priority>),
294    /// An error encountered when serving the request. Stream should be closed.
295    PeerStreamError,
296    /// DATAGRAM flow explicitly closed.
297    FlowShutdown { flow_id: u64, stream_id: u64 },
298}
299
300impl OutboundFrame {
301    /// Creates a body frame with the provided buffer.
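    ///
    /// For example, an empty buffer with `fin` set can signal end-of-body
    /// without transmitting more data (a sketch mirroring how this module
    /// signals end-of-body internally):
    ///
    /// ```ignore
    /// let eob = OutboundFrame::body(BufFactory::get_empty_buf(), true);
    /// ```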
302    pub fn body(body: PooledBuf, fin: bool) -> Self {
303        #[cfg(feature = "zero-copy")]
304        let body = crate::buf_factory::QuicheBuf::new(body);
305
306        OutboundFrame::Body(body, fin)
307    }
308}
309
310/// An [`InboundFrame`] is a data frame that was received from the peer over a
311/// [`quiche::h3::Connection`]. This is used by peers to send body or datagrams
312/// to the local task.
313#[derive(Debug)]
314pub enum InboundFrame {
315    /// Request body/CONNECT upstream data plus FIN flag.
316    Body(PooledBuf, bool),
317    /// CONNECT-UDP (DATAGRAM) upstream data.
318    Datagram(PooledDgram),
319}
320
321/// A ready-made [`ApplicationOverQuic`] which can handle HTTP/3 and MASQUE.
322/// Depending on the `DriverHooks` in use, it powers either a client or a
323/// server.
324///
325/// Use the [ClientH3Driver] and [ServerH3Driver] aliases to access the
326/// respective driver types. The driver is passed into an I/O loop and
327/// communicates with the driver's user (e.g., an HTTP client or a server) via
328/// its associated [H3Controller]. The controller allows the application to both
329/// listen for [`H3Event`]s of note and send [`H3Command`]s into the I/O loop.
330pub struct H3Driver<H: DriverHooks> {
331    /// Configuration used to initialize `conn`. Created from [`Http3Settings`]
332    /// in the constructor.
333    h3_config: h3::Config,
334    /// The underlying HTTP/3 connection. Initialized in
335    /// `ApplicationOverQuic::on_conn_established`.
336    conn: Option<h3::Connection>,
337    /// State required by the client/server hooks.
338    hooks: H,
339    /// Sends [`H3Event`]s to the [H3Controller] paired with this driver.
340    h3_event_sender: mpsc::UnboundedSender<H::Event>,
341    /// Receives [`H3Command`]s from the [H3Controller] paired with this driver.
342    cmd_recv: mpsc::UnboundedReceiver<H::Command>,
343
344    /// A map of stream IDs to their [StreamCtx]. This is mainly used to
345    /// retrieve the internal Tokio channels associated with the stream.
346    stream_map: BTreeMap<u64, StreamCtx>,
347    /// A map of flow IDs to their [FlowCtx]. This is mainly used to retrieve
348    /// the internal Tokio channels associated with the flow.
349    flow_map: BTreeMap<u64, FlowCtx>,
350    /// Set of [`WaitForStream`] futures. A stream is added to this set if
351    /// we need to send to it and its channel is at capacity, or if we need
352    /// data from its channel and the channel is empty.
353    waiting_streams: FuturesUnordered<WaitForStream>,
354
355    /// Receives [`OutboundFrame`]s from all datagram flows on the connection.
356    dgram_recv: OutboundFrameStream,
357    /// Keeps the datagram channel open such that datagram flows can be created.
358    dgram_send: OutboundFrameSender,
359
360    /// The buffer used to interact with the underlying IoWorker.
361    pooled_buf: PooledBuf,
362    /// The maximum HTTP/3 stream ID seen on this connection.
363    max_stream_seen: u64,
364
365    /// Tracks whether we have forwarded the HTTP/3 SETTINGS frame
366    /// to the [H3Controller] once.
367    settings_received_and_forwarded: bool,
368}
369
370impl<H: DriverHooks> H3Driver<H> {
371    /// Builds a new [H3Driver] and an associated [H3Controller].
372    ///
373    /// The driver should then be passed to
374    /// [`InitialQuicConnection`](crate::InitialQuicConnection)'s `start`
375    /// method.
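    ///
    /// A minimal wiring sketch (assuming a server endpoint, default settings,
    /// and an accepted `InitialQuicConnection` named `iqc`; the exact `start`
    /// call depends on how the connection was set up):
    ///
    /// ```ignore
    /// let (driver, mut controller) =
    ///     ServerH3Driver::new(Http3Settings::default());
    /// // The driver becomes part of the connection's I/O loop...
    /// iqc.start(driver);
    /// // ...while the application consumes events via the controller.
    /// while let Some(event) = controller.event_receiver_mut().recv().await {
    ///     // Handle ServerH3Event values here.
    /// }
    /// ```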
376    pub fn new(http3_settings: Http3Settings) -> (Self, H3Controller<H>) {
377        let (dgram_send, dgram_recv) = mpsc::channel(FLOW_CAPACITY);
378        let (cmd_sender, cmd_recv) = mpsc::unbounded_channel();
379        let (h3_event_sender, h3_event_recv) = mpsc::unbounded_channel();
380
381        (
382            H3Driver {
383                h3_config: (&http3_settings).into(),
384                conn: None,
385                hooks: H::new(&http3_settings),
386                h3_event_sender,
387                cmd_recv,
388
389                stream_map: BTreeMap::new(),
390                flow_map: BTreeMap::new(),
391
392                dgram_recv,
393                dgram_send: PollSender::new(dgram_send),
394                pooled_buf: BufFactory::get_max_buf(),
395                max_stream_seen: 0,
396
397                waiting_streams: FuturesUnordered::new(),
398
399                settings_received_and_forwarded: false,
400            },
401            H3Controller {
402                cmd_sender,
403                h3_event_recv: Some(h3_event_recv),
404            },
405        )
406    }
407
408    /// Retrieve the [FlowCtx] associated with the given `flow_id`. If no
409    /// context is found, a new one will be created.
410    fn get_or_insert_flow(
411        &mut self, flow_id: u64,
412    ) -> H3ConnectionResult<&mut FlowCtx> {
413        use std::collections::btree_map::Entry;
414        Ok(match self.flow_map.entry(flow_id) {
415            Entry::Vacant(e) => {
416                // This is a datagram for a new flow we haven't seen before
417                let (flow, recv) = FlowCtx::new(FLOW_CAPACITY);
418                let flow_req = H3Event::NewFlow {
419                    flow_id,
420                    recv,
421                    send: self.dgram_send.clone(),
422                };
423                self.h3_event_sender
424                    .send(flow_req.into())
425                    .map_err(|_| H3ConnectionError::ControllerWentAway)?;
426                e.insert(flow)
427            },
428            Entry::Occupied(e) => e.into_mut(),
429        })
430    }
431
432    /// Adds a [StreamCtx] to the stream map with the given `stream_id`.
433    fn insert_stream(&mut self, stream_id: u64, ctx: StreamCtx) {
434        self.stream_map.insert(stream_id, ctx);
435        self.max_stream_seen = self.max_stream_seen.max(stream_id);
436    }
437
438    /// Fetches body chunks from the [`quiche::h3::Connection`] and forwards
439    /// them to the stream's associated [`InboundFrameStream`].
440    fn process_h3_data(
441        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
442    ) -> H3ConnectionResult<()> {
443        // Split self borrow between conn and stream_map
444        let conn = self.conn.as_mut().ok_or(Self::connection_not_present())?;
445        let ctx = self
446            .stream_map
447            .get_mut(&stream_id)
448            .ok_or(H3ConnectionError::NonexistentStream)?;
449
450        enum StreamStatus {
451            Done { close: bool },
452            Reset { wire_err_code: u64 },
453            Blocked,
454        }
455
456        let status = loop {
457            let Some(sender) = ctx.send.as_ref().and_then(PollSender::get_ref)
458            else {
459                // already waiting for capacity
460                break StreamStatus::Done { close: false };
461            };
462
463            let try_reserve_result = sender.try_reserve();
464            let permit = match try_reserve_result {
465                Ok(permit) => permit,
466                Err(TrySendError::Closed(())) => {
467                    // The channel has closed before we delivered a fin or reset
468                    // to the application.
469                    if !ctx.fin_or_reset_recv &&
470                        ctx.associated_dgram_flow_id.is_none()
471                    // The channel might be closed if the stream was used to
472                    // initiate a datagram exchange.
473                    // TODO: ideally, the application would still shut down the
474                    // stream properly. Once application code
475                    // is fixed, we can remove this check.
476                    {
477                        let err = h3::WireErrorCode::RequestCancelled as u64;
478                        let _ = qconn.stream_shutdown(
479                            stream_id,
480                            quiche::Shutdown::Read,
481                            err,
482                        );
483                        drop(try_reserve_result); // needed to drop the borrow on ctx.
484                        ctx.handle_sent_stop_sending(err);
485                        // TODO: should we send an H3Event to
486                        // h3_event_sender? We can only get here if the app
487                        // actively closed or dropped the channel, so any
488                        // event we send would be more for logging or
489                        // auditing.
490                    }
491                    break StreamStatus::Done {
492                        close: ctx.both_directions_done(),
493                    };
494                },
495                Err(TrySendError::Full(())) => {
496                    if ctx.fin_or_reset_recv || qconn.stream_readable(stream_id) {
497                        break StreamStatus::Blocked;
498                    }
499                    break StreamStatus::Done { close: false };
500                },
501            };
502
503            if ctx.fin_or_reset_recv {
504                // Signal end-of-body to upstream
505                permit
506                    .send(InboundFrame::Body(BufFactory::get_empty_buf(), true));
507                break StreamStatus::Done {
508                    close: ctx.fin_or_reset_sent,
509                };
510            }
511
512            match conn.recv_body(qconn, stream_id, &mut self.pooled_buf) {
513                Ok(n) => {
514                    let mut body = std::mem::replace(
515                        &mut self.pooled_buf,
516                        BufFactory::get_max_buf(),
517                    );
518                    body.truncate(n);
519
520                    ctx.audit_stats.add_downstream_bytes_recvd(n as u64);
521                    let event = H3Event::BodyBytesReceived {
522                        stream_id,
523                        num_bytes: n as u64,
524                        fin: false,
525                    };
526                    let _ = self.h3_event_sender.send(event.into());
527
528                    permit.send(InboundFrame::Body(body, false));
529                },
530                Err(h3::Error::Done) =>
531                    break StreamStatus::Done { close: false },
532                Err(h3::Error::TransportError(quiche::Error::StreamReset(
533                    code,
534                ))) => {
535                    break StreamStatus::Reset {
536                        wire_err_code: code,
537                    };
538                },
539                Err(_) => break StreamStatus::Done { close: true },
540            }
541        };
542
543        match status {
544            StreamStatus::Done { close } => {
545                if close {
546                    return self.finish_stream(qconn, stream_id, None, None);
547                }
548
549                // The QUIC stream is finished, manually invoke `process_h3_fin`
550                // in case `h3::poll()` is never called again.
551                //
552                // Note that this case will not conflict with StreamStatus::Done
553                // being returned due to the body channel being
554                // blocked. qconn.stream_finished() will guarantee
555                // that we've fully parsed the body as it only returns true
556                // if we've seen a Fin for the read half of the stream.
557                if !ctx.fin_or_reset_recv && qconn.stream_finished(stream_id) {
558                    return self.process_h3_fin(qconn, stream_id);
559                }
560            },
561            StreamStatus::Reset { wire_err_code } => {
562                debug_assert!(ctx.send.is_some());
563                ctx.handle_recvd_reset(wire_err_code);
564                self.h3_event_sender
565                    .send(H3Event::ResetStream { stream_id }.into())
566                    .map_err(|_| H3ConnectionError::ControllerWentAway)?;
567                if ctx.both_directions_done() {
568                    return self.finish_stream(qconn, stream_id, None, None);
569                }
570            },
571            StreamStatus::Blocked => {
572                self.waiting_streams.push(ctx.wait_for_send(stream_id));
573            },
574        }
575
576        Ok(())
577    }
578
579    /// Processes an end-of-stream event from the [`quiche::h3::Connection`].
580    fn process_h3_fin(
581        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
582    ) -> H3ConnectionResult<()> {
583        let ctx = self
584            .stream_map
585            .get_mut(&stream_id)
586            .filter(|c| !c.fin_or_reset_recv);
587        let Some(ctx) = ctx else {
588            // Stream is already finished, nothing to do
589            return Ok(());
590        };
591
592        ctx.fin_or_reset_recv = true;
593        ctx.audit_stats
594            .set_recvd_stream_fin(StreamClosureKind::Explicit);
595
596        // It's important to send this H3Event before process_h3_data so that
597        // a server can (potentially) generate the control response before the
598        // corresponding receiver drops.
599        let event = H3Event::BodyBytesReceived {
600            stream_id,
601            num_bytes: 0,
602            fin: true,
603        };
604        let _ = self.h3_event_sender.send(event.into());
605
606        // Communicate fin to upstream. Since `ctx.fin_or_reset_recv` is true
607        // now, there can't be a recursive loop.
608        self.process_h3_data(qconn, stream_id)
609    }
610
611    /// Processes a single [`quiche::h3::Event`] received from the underlying
612    /// [`quiche::h3::Connection`]. Some events are dispatched to helper
613    /// methods.
614    fn process_read_event(
615        &mut self, qconn: &mut QuicheConnection, stream_id: u64, event: h3::Event,
616    ) -> H3ConnectionResult<()> {
617        self.forward_settings()?;
618
619        match event {
620            // Requests/responses are exclusively handled by hooks.
621            h3::Event::Headers { list, more_frames } =>
622                H::headers_received(self, qconn, InboundHeaders {
623                    stream_id,
624                    headers: list,
625                    has_body: more_frames,
626                }),
627
628            h3::Event::Data => self.process_h3_data(qconn, stream_id),
629            h3::Event::Finished => self.process_h3_fin(qconn, stream_id),
630
631            h3::Event::Reset(code) => {
632                if let Some(ctx) = self.stream_map.get_mut(&stream_id) {
633                    ctx.handle_recvd_reset(code);
634                    // See if we are waiting on this stream and close the channel
635                    // if we are. If we are not waiting, `handle_recvd_reset()`
636                    // will have taken care of closing.
637                    for pending in self.waiting_streams.iter_mut() {
638                        match pending {
639                            WaitForStream::Upstream(
640                                WaitForUpstreamCapacity {
641                                    stream_id: id,
642                                    chan: Some(chan),
643                                },
644                            ) if stream_id == *id => {
645                                chan.close();
646                            },
647                            _ => {},
648                        }
649                    }
650
651                    self.h3_event_sender
652                        .send(H3Event::ResetStream { stream_id }.into())
653                        .map_err(|_| H3ConnectionError::ControllerWentAway)?;
654                    if ctx.both_directions_done() {
655                        return self.finish_stream(qconn, stream_id, None, None);
656                    }
657                }
658
659                // TODO: if we don't have the stream in our map: should we
660                // send the H3Event::ResetStream?
661                Ok(())
662            },
663
664            h3::Event::PriorityUpdate => Ok(()),
665            h3::Event::GoAway => Err(H3ConnectionError::GoAway),
666        }
667    }
668
669    /// The SETTINGS frame can be received at any point, so we
670    /// need to check `peer_settings_raw` to decide if we've received it.
671    ///
672    /// Settings should only be sent once, so we generate a single event
673    /// when `peer_settings_raw` transitions from None to Some.
674    fn forward_settings(&mut self) -> H3ConnectionResult<()> {
675        if self.settings_received_and_forwarded {
676            return Ok(());
677        }
678
679        // Capture the peer settings and forward them.
680        if let Some(settings) = self.conn_mut()?.peer_settings_raw() {
681            let incoming_settings = H3Event::IncomingSettings {
682                settings: settings.to_vec(),
683            };
684
685            self.h3_event_sender
686                .send(incoming_settings.into())
687                .map_err(|_| H3ConnectionError::ControllerWentAway)?;
688
689            self.settings_received_and_forwarded = true;
690        }
691        Ok(())
692    }
693
694    /// Send an individual frame to the underlying [`quiche::h3::Connection`] to
695    /// be flushed at a later time.
696    ///
697    /// `Self::process_writes` will iterate over all writable streams and call
698    /// this method in a loop for each stream to flush all pending frames.
699    fn process_write_frame(
700        conn: &mut h3::Connection, qconn: &mut QuicheConnection,
701        ctx: &mut StreamCtx,
702    ) -> h3::Result<()> {
703        let Some(frame) = &mut ctx.queued_frame else {
704            return Ok(());
705        };
706
707        let audit_stats = &ctx.audit_stats;
708        let stream_id = audit_stats.stream_id();
709
710        match frame {
711            OutboundFrame::Headers(headers, priority) => {
712                let prio = priority.as_ref().unwrap_or(&DEFAULT_PRIO);
713
714                let res = if ctx.initial_headers_sent {
715                    // Initial headers were already sent, send additional
716                    // headers now.
717                    conn.send_additional_headers_with_priority(
718                        qconn, stream_id, headers, prio, false, false,
719                    )
720                } else {
721                    // Send initial headers.
722                    conn.send_response_with_priority(
723                        qconn, stream_id, headers, prio, false,
724                    )
725                    .inspect(|_| ctx.initial_headers_sent = true)
726                };
727
728                if let Err(h3::Error::StreamBlocked) = res {
729                    ctx.first_full_headers_flush_fail_time
730                        .get_or_insert(Instant::now());
731                }
732
733                if res.is_ok() {
734                    if let Some(first) =
735                        ctx.first_full_headers_flush_fail_time.take()
736                    {
737                        ctx.audit_stats.add_header_flush_duration(
738                            Instant::now().duration_since(first),
739                        );
740                    }
741                }
742
743                res
744            },
745
746            OutboundFrame::Body(body, fin) => {
747                let len = body.as_ref().len();
748                if len == 0 && !*fin {
749                    // quiche doesn't allow sending an empty body when the fin
750                    // flag is not set
751                    return Ok(());
752                }
753                if *fin {
754                    // If this is the last body frame, drop the receiver in the
755                    // stream map to signal that we shouldn't receive any more
756                    // frames. NOTE: we can't use `mpsc::Receiver::close()`
757                    // due to an inconsistency in how tokio handles reading
758                    // from a closed mpsc channel https://github.com/tokio-rs/tokio/issues/7631
759                    ctx.recv = None;
760                }
761                #[cfg(feature = "zero-copy")]
762                let n = conn.send_body_zc(qconn, stream_id, body, *fin)?;
763
764                #[cfg(not(feature = "zero-copy"))]
765                let n = conn.send_body(qconn, stream_id, body, *fin)?;
766
767                audit_stats.add_downstream_bytes_sent(n as _);
768                if n != len {
769                    // Couldn't write the entire body, keep what remains for
770                    // future retry.
771                    #[cfg(not(feature = "zero-copy"))]
772                    body.pop_front(n);
773
774                    Err(h3::Error::StreamBlocked)
775                } else {
776                    if *fin {
777                        Self::on_fin_sent(ctx)?;
778                    }
779                    Ok(())
780                }
781            },
782
783            OutboundFrame::Trailers(headers, priority) => {
784                let prio = priority.as_ref().unwrap_or(&DEFAULT_PRIO);
785
786                // trailers always set fin=true
787                let res = conn.send_additional_headers_with_priority(
788                    qconn, stream_id, headers, prio, true, true,
789                );
790
791                if res.is_ok() {
792                    Self::on_fin_sent(ctx)?;
793                }
794                res
795            },
796
797            OutboundFrame::PeerStreamError => Err(h3::Error::MessageError),
798
799            OutboundFrame::FlowShutdown { .. } => {
800                unreachable!("Only flows send shutdowns")
801            },
802
803            OutboundFrame::Datagram(..) => {
804                unreachable!("Only flows send datagrams")
805            },
806        }
807    }
808
809    fn on_fin_sent(ctx: &mut StreamCtx) -> h3::Result<()> {
810        ctx.recv = None;
811        ctx.fin_or_reset_sent = true;
812        ctx.audit_stats
813            .set_sent_stream_fin(StreamClosureKind::Explicit);
814        if ctx.fin_or_reset_recv {
815            // Return a TransportError to trigger stream cleanup
816            // instead of h3::Error::Done
817            Err(h3::Error::TransportError(quiche::Error::Done))
818        } else {
819            Ok(())
820        }
821    }
822
823    /// Resumes reads or writes to the connection when a stream channel becomes
824    /// unblocked.
825    ///
826    /// If we were waiting for more data from a channel, we resume writing to
827    /// the connection. Otherwise, we were blocked on channel capacity and
828    /// continue reading from the connection. `Upstream` in this context is
829    /// the consumer of the stream.
830    fn upstream_ready(
831        &mut self, qconn: &mut QuicheConnection, ready: StreamReady,
832    ) -> H3ConnectionResult<()> {
833        match ready {
834            StreamReady::Downstream(r) => self.upstream_read_ready(qconn, r),
835            StreamReady::Upstream(r) => self.upstream_write_ready(qconn, r),
836        }
837    }
838
839    fn upstream_read_ready(
840        &mut self, qconn: &mut QuicheConnection,
841        read_ready: ReceivedDownstreamData,
842    ) -> H3ConnectionResult<()> {
843        let ReceivedDownstreamData {
844            stream_id,
845            chan,
846            data,
847        } = read_ready;
848
849        match self.stream_map.get_mut(&stream_id) {
850            None => Ok(()),
851            Some(stream) => {
852                stream.recv = Some(chan);
853                stream.queued_frame = data;
854                self.process_writable_stream(qconn, stream_id)
855            },
856        }
857    }
858
859    fn upstream_write_ready(
860        &mut self, qconn: &mut QuicheConnection,
861        write_ready: HaveUpstreamCapacity,
862    ) -> H3ConnectionResult<()> {
863        let HaveUpstreamCapacity {
864            stream_id,
865            mut chan,
866        } = write_ready;
867
868        match self.stream_map.get_mut(&stream_id) {
869            None => Ok(()),
870            Some(stream) => {
871                chan.abort_send(); // Have to do it to release the associated permit
872                stream.send = Some(chan);
873                self.process_h3_data(qconn, stream_id)
874            },
875        }
876    }
877
878    /// Processes all queued outbound datagrams from the `dgram_recv` channel.
879    fn dgram_ready(
880        &mut self, qconn: &mut QuicheConnection, frame: OutboundFrame,
881    ) -> H3ConnectionResult<()> {
882        let mut frame = Ok(frame);
883
884        loop {
885            match frame {
886                Ok(OutboundFrame::Datagram(dgram, flow_id)) => {
887                    // Drop datagrams if there is no capacity
888                    let _ = datagram::send_h3_dgram(qconn, flow_id, dgram);
889                },
890                Ok(OutboundFrame::FlowShutdown { flow_id, stream_id }) => {
891                    self.finish_stream(
892                        qconn,
893                        stream_id,
894                        Some(quiche::h3::WireErrorCode::NoError as u64),
895                        Some(quiche::h3::WireErrorCode::NoError as u64),
896                    )?;
897                    self.flow_map.remove(&flow_id);
898                    break;
899                },
900                Ok(_) => unreachable!("Flows can't send frames of other types"),
901                Err(TryRecvError::Empty) => break,
902                Err(TryRecvError::Disconnected) =>
903                    return Err(H3ConnectionError::ControllerWentAway),
904            }
905
906            frame = self.dgram_recv.try_recv();
907        }
908
909        Ok(())
910    }
911
912    /// Return a mutable reference to the driver's HTTP/3 connection.
913    ///
914    /// If the connection doesn't exist yet, this function returns
915    /// a `Self::connection_not_present()` error.
916    fn conn_mut(&mut self) -> H3ConnectionResult<&mut h3::Connection> {
917        self.conn.as_mut().ok_or(Self::connection_not_present())
918    }
919
920    /// Alias for [`quiche::Error::TlsFail`], which is used in the case where
921    /// this driver doesn't have an established HTTP/3 connection attached
922    /// to it yet.
923    const fn connection_not_present() -> H3ConnectionError {
924        H3ConnectionError::H3(h3::Error::TransportError(quiche::Error::TlsFail))
925    }
926
927    /// Removes a stream from the stream map if it exists. Also optionally sends
928    /// `RESET_STREAM` or `STOP_SENDING` frames if `write` or `read` is set to an
929    /// error code, respectively.
930    fn finish_stream(
931        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
932        read: Option<u64>, write: Option<u64>,
933    ) -> H3ConnectionResult<()> {
934        let Some(stream_ctx) = self.stream_map.remove(&stream_id) else {
935            return Ok(());
936        };
937
938        let audit_stats = &stream_ctx.audit_stats;
939
940        if let Some(err) = read {
941            audit_stats.set_sent_stop_sending_error_code(err as _);
942            let _ = qconn.stream_shutdown(stream_id, quiche::Shutdown::Read, err);
943        }
944
945        if let Some(err) = write {
946            audit_stats.set_sent_reset_stream_error_code(err as _);
947            let _ =
948                qconn.stream_shutdown(stream_id, quiche::Shutdown::Write, err);
949        }
950
951        // Find if the stream also has any pending futures associated with it
952        for pending in self.waiting_streams.iter_mut() {
953            match pending {
954                WaitForStream::Downstream(WaitForDownstreamData {
955                    stream_id: id,
956                    chan: Some(chan),
957                }) if stream_id == *id => {
958                    chan.close();
959                },
960                WaitForStream::Upstream(WaitForUpstreamCapacity {
961                    stream_id: id,
962                    chan: Some(chan),
963                }) if stream_id == *id => {
964                    chan.close();
965                },
966                _ => {},
967            }
968        }
969
970        // Close any DATAGRAM-proxying channels when we close the stream, if they
971        // exist
972        if let Some(mapped_flow_id) = stream_ctx.associated_dgram_flow_id {
973            self.flow_map.remove(&mapped_flow_id);
974        }
975
976        if qconn.is_server() {
977            // Signal the server to remove the stream from its map
978            let _ = self
979                .h3_event_sender
980                .send(H3Event::StreamClosed { stream_id }.into());
981        }
982
983        Ok(())
984    }
985
986    /// Handles a regular [`H3Command`]. May be called internally by
987    /// [DriverHooks] for non-endpoint-specific [`H3Command`]s.
988    fn handle_core_command(
989        &mut self, qconn: &mut QuicheConnection, cmd: H3Command,
990    ) -> H3ConnectionResult<()> {
991        match cmd {
992            H3Command::QuicCmd(cmd) => cmd.execute(qconn),
993            H3Command::GoAway => {
994                let max_id = self.max_stream_seen;
995                self.conn_mut()
996                    .expect("connection should be established")
997                    .send_goaway(qconn, max_id)?;
998            },
999        }
1000        Ok(())
1001    }
1002}
1003
1004impl<H: DriverHooks> H3Driver<H> {
1005    /// Reads all buffered datagrams out of `qconn` and distributes them to
1006    /// their flow channels.
1007    fn process_available_dgrams(
1008        &mut self, qconn: &mut QuicheConnection,
1009    ) -> H3ConnectionResult<()> {
1010        loop {
1011            match datagram::receive_h3_dgram(qconn) {
1012                Ok((flow_id, dgram)) => {
1013                    self.get_or_insert_flow(flow_id)?.send_best_effort(dgram);
1014                },
1015                Err(quiche::Error::Done) => return Ok(()),
1016                Err(err) => return Err(H3ConnectionError::from(err)),
1017            }
1018        }
1019    }
1020
1021    /// Flushes any queued-up frames for `stream_id` into `qconn` until either
1022    /// there is no more capacity in `qconn` or no more frames to send.
1023    fn process_writable_stream(
1024        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
1025    ) -> H3ConnectionResult<()> {
1026        // Split self borrow between conn and stream_map
1027        let conn = self.conn.as_mut().ok_or(Self::connection_not_present())?;
1028        let Some(ctx) = self.stream_map.get_mut(&stream_id) else {
1029            return Ok(()); // Unknown stream_id
1030        };
1031
1032        loop {
1033            // Process each writable frame, queue the next frame for processing
1034            // and shut down any errored streams.
1035            match Self::process_write_frame(conn, qconn, ctx) {
1036                Ok(()) => ctx.queued_frame = None,
1037                Err(h3::Error::StreamBlocked | h3::Error::Done) => break,
1038                Err(h3::Error::MessageError) => {
1039                    return self.finish_stream(
1040                        qconn,
1041                        stream_id,
1042                        Some(quiche::h3::WireErrorCode::MessageError as u64),
1043                        Some(quiche::h3::WireErrorCode::MessageError as u64),
1044                    );
1045                },
1046                Err(h3::Error::TransportError(quiche::Error::StreamStopped(
1047                    e,
1048                ))) => {
1049                    ctx.handle_recvd_stop_sending(e);
1050                    if ctx.both_directions_done() {
1051                        return self.finish_stream(qconn, stream_id, None, None);
1052                    } else {
1053                        return Ok(());
1054                    }
1055                },
1056                Err(h3::Error::TransportError(
1057                    quiche::Error::InvalidStreamState(stream),
1058                )) => {
1059                    return self.finish_stream(qconn, stream, None, None);
1060                },
1061                Err(_) => {
1062                    return self.finish_stream(qconn, stream_id, None, None);
1063                },
1064            }
1065
1066            let Some(recv) = ctx.recv.as_mut() else {
1067                // This stream is already waiting for data or we wrote a fin and
1068                // closed the channel.
1069                debug_assert!(
1070                    ctx.queued_frame.is_none(),
1071                    "We MUST NOT have a queued frame if we are already waiting on 
1072                    more data from the channel"
1073                );
1074                return Ok(());
1075            };
1076
1077            // Attempt to queue the next frame for processing. The corresponding
1078            // sender is created at the same time as the `StreamCtx`
1079            // and ultimately ends up in an `H3Body`. The body then
1080            // determines which frames to send to the peer via
1081            // this processing loop.
1082            match recv.try_recv() {
1083                Ok(frame) => ctx.queued_frame = Some(frame),
1084                Err(TryRecvError::Disconnected) => {
1085                    if !ctx.fin_or_reset_sent &&
1086                        ctx.associated_dgram_flow_id.is_none()
1087                    // The channel might be closed if the stream was used to
1088                    // initiate a datagram exchange.
1089                    // TODO: ideally, the application would still shut down the
1090                    // stream properly. Once application code
1091                    // is fixed, we can remove this check.
1092                    {
1093                        // The channel closed without having written a fin. Send a
1094                        // RESET_STREAM to indicate we won't be writing anything
1095                        // else
1096                        let err = h3::WireErrorCode::RequestCancelled as u64;
1097                        let _ = qconn.stream_shutdown(
1098                            stream_id,
1099                            quiche::Shutdown::Write,
1100                            err,
1101                        );
1102                        ctx.handle_sent_reset(err);
1103                        if ctx.both_directions_done() {
1104                            return self
1105                                .finish_stream(qconn, stream_id, None, None);
1106                        }
1107                    }
1108                    break;
1109                },
1110                Err(TryRecvError::Empty) => {
1111                    self.waiting_streams.push(ctx.wait_for_recv(stream_id));
1112                    break;
1113                },
1114            }
1115        }
1116
1117        Ok(())
1118    }
1119
1120    /// Tests `qconn` for either a local or peer error and increments
1121    /// the associated HTTP/3 or QUIC error counter.
1122    fn record_quiche_error(qconn: &mut QuicheConnection, metrics: &impl Metrics) {
1123        // split metrics between local/peer and QUIC/HTTP/3 level errors
1124        if let Some(err) = qconn.local_error() {
1125            if err.is_app {
1126                metrics.local_h3_conn_close_error_count(err.error_code.into())
1127            } else {
1128                metrics.local_quic_conn_close_error_count(err.error_code.into())
1129            }
1130            .inc();
1131        } else if let Some(err) = qconn.peer_error() {
1132            if err.is_app {
1133                metrics.peer_h3_conn_close_error_count(err.error_code.into())
1134            } else {
1135                metrics.peer_quic_conn_close_error_count(err.error_code.into())
1136            }
1137            .inc();
1138        }
1139    }
1140}
1141
1142impl<H: DriverHooks> ApplicationOverQuic for H3Driver<H> {
1143    fn on_conn_established(
1144        &mut self, quiche_conn: &mut QuicheConnection,
1145        handshake_info: &HandshakeInfo,
1146    ) -> QuicResult<()> {
1147        let conn = h3::Connection::with_transport(quiche_conn, &self.h3_config)?;
1148        self.conn = Some(conn);
1149
1150        H::conn_established(self, quiche_conn, handshake_info)?;
1151        Ok(())
1152    }
1153
1154    #[inline]
1155    fn should_act(&self) -> bool {
1156        self.conn.is_some()
1157    }
1158
1159    #[inline]
1160    fn buffer(&mut self) -> &mut [u8] {
1161        &mut self.pooled_buf
1162    }
1163
1164    /// Poll the underlying [`quiche::h3::Connection`] for
1165    /// [`quiche::h3::Event`]s and DATAGRAMs, delegating processing to
1166    /// `Self::process_read_event`.
1167    ///
1168    /// If a DATAGRAM is found, it is sent to the receiver on its channel.
1169    fn process_reads(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
1170        loop {
1171            match self.conn_mut()?.poll(qconn) {
1172                Ok((stream_id, event)) =>
1173                    self.process_read_event(qconn, stream_id, event)?,
1174                Err(h3::Error::Done) => break,
1175                Err(err) => {
1176                    // Don't bubble the error up; instead, keep the worker
1177                    // loop going until quiche reports that the connection
1178                    // is closed.
1179                    log::debug!("connection closed due to h3 protocol error"; "error"=>?err);
1180                    return Ok(());
1181                },
1182            };
1183        }
1184
1185        self.process_available_dgrams(qconn)?;
1186        Ok(())
1187    }
1188
1189    /// Write as much data as possible into the [`quiche::h3::Connection`] from
1190    /// all sources. This will attempt to write any queued frames into their
1191    /// respective streams, if writable.
1192    fn process_writes(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
1193        while let Some(stream_id) = qconn.stream_writable_next() {
1194            self.process_writable_stream(qconn, stream_id)?;
1195        }
1196
1197        // Also optimistically check for any ready streams
1198        while let Some(Some(ready)) = self.waiting_streams.next().now_or_never() {
1199            self.upstream_ready(qconn, ready)?;
1200        }
1201
1202        Ok(())
1203    }
1204
1205    /// Reports connection-level error metrics and forwards
1206    /// IOWorker errors to the associated [H3Controller].
1207    fn on_conn_close<M: Metrics>(
1208        &mut self, quiche_conn: &mut QuicheConnection, metrics: &M,
1209        work_loop_result: &QuicResult<()>,
1210    ) {
1211        let max_stream_seen = self.max_stream_seen;
1212        metrics
1213            .maximum_writable_streams()
1214            .observe(max_stream_seen as f64);
1215
1216        let Err(work_loop_error) = work_loop_result else {
1217            return;
1218        };
1219
1220        Self::record_quiche_error(quiche_conn, metrics);
1221
1222        let Some(h3_err) = work_loop_error.downcast_ref::<H3ConnectionError>()
1223        else {
1224            log::error!("Found non-H3ConnectionError"; "error" => %work_loop_error);
1225            return;
1226        };
1227
1228        if matches!(h3_err, H3ConnectionError::ControllerWentAway) {
1229            // Inform client that we won't (can't) respond anymore
1230            let _ =
1231                quiche_conn.close(true, h3::WireErrorCode::NoError as u64, &[]);
1232            return;
1233        }
1234
1235        if let Some(ev) = H3Event::from_error(h3_err) {
1236            let _ = self.h3_event_sender.send(ev.into());
1237            #[expect(clippy::needless_return)]
1238            return; // avoid accidental fallthrough in the future
1239        }
1240    }
1241
1242    /// Wait for incoming data from the [H3Controller]. The next iteration of
1243    /// the I/O loop commences when one of the `select!`ed futures triggers.
1244    #[inline]
1245    async fn wait_for_data(
1246        &mut self, qconn: &mut QuicheConnection,
1247    ) -> QuicResult<()> {
1248        select! {
1249            biased;
1250            Some(ready) = self.waiting_streams.next() => self.upstream_ready(qconn, ready),
1251            Some(dgram) = self.dgram_recv.recv() => self.dgram_ready(qconn, dgram),
1252            Some(cmd) = self.cmd_recv.recv() => H::conn_command(self, qconn, cmd),
1253            r = self.hooks.wait_for_action(qconn), if H::has_wait_action(self) => r,
1254        }?;
1255
1256        // Make sure the controller is not starved, but also not prioritized
1257        // in the biased select: poll it last, but also perform a try_recv
1258        // on each iteration.
1259        if let Ok(cmd) = self.cmd_recv.try_recv() {
1260            H::conn_command(self, qconn, cmd)?;
1261        }
1262
1263        Ok(())
1264    }
1265}
1266
1267impl<H: DriverHooks> Drop for H3Driver<H> {
1268    fn drop(&mut self) {
1269        for stream in self.stream_map.values() {
1270            stream
1271                .audit_stats
1272                .set_recvd_stream_fin(StreamClosureKind::Implicit);
1273        }
1274    }
1275}
1276
1277/// [`H3Command`]s are sent by the [H3Controller] to alter the [H3Driver]'s
1278/// state.
1279///
1280/// Both [ServerH3Driver] and [ClientH3Driver] may extend this enum with
1281/// endpoint-specific variants.
1282#[derive(Debug)]
1283pub enum H3Command {
1284    /// A connection-level command that executes directly on the
1285    /// [`quiche::Connection`].
1286    QuicCmd(QuicCommand),
1287    /// Send a GOAWAY frame to the peer to initiate a graceful connection
1288    /// shutdown.
1289    GoAway,
1290}
1291
1292/// Sends [`H3Command`]s to an [H3Driver]. The sender is typed and internally
1293/// wraps instances of `T` in the appropriate `H3Command` variant.
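///
/// For example, the sender returned by [`H3Controller::cmd_sender`] accepts a
/// plain [`QuicCommand`] and performs the wrapping internally (a sketch;
/// constructing the `QuicCommand` itself is out of scope here):
///
/// ```ignore
/// let cmd_sender = controller.cmd_sender();
/// let _ = cmd_sender.send(quic_command);
/// ```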
1294pub struct RequestSender<C, T> {
1295    sender: UnboundedSender<C>,
1296    // Required to work around dangling type parameter
1297    _r: PhantomData<fn() -> T>,
1298}
1299
1300impl<C, T: Into<C>> RequestSender<C, T> {
1301    /// Send a request to the [H3Driver]. This can only fail if the driver is
1302    /// gone.
1303    #[inline(always)]
1304    pub fn send(&self, v: T) -> Result<(), mpsc::error::SendError<C>> {
1305        self.sender.send(v.into())
1306    }
1307}
1308
1309impl<C, T> Clone for RequestSender<C, T> {
1310    fn clone(&self) -> Self {
1311        Self {
1312            sender: self.sender.clone(),
1313            _r: Default::default(),
1314        }
1315    }
1316}
1317
1318/// Interface to communicate with a paired [H3Driver].
1319///
1320/// An [H3Controller] receives [`H3Event`]s from its driver, which must be
1321/// consumed by the application built on top of the driver to react to incoming
1322/// events. The controller also allows the application to send ad-hoc
1323/// [`H3Command`]s to the driver, which will be processed when the driver waits
1324/// for incoming data.
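///
/// A sketch of typical controller usage (assuming the paired driver is already
/// running inside the connection's I/O loop):
///
/// ```ignore
/// let mut events = controller.take_event_receiver();
/// tokio::spawn(async move {
///     while let Some(event) = events.recv().await {
///         // React to endpoint-specific events here.
///     }
/// });
///
/// // Later: request a graceful shutdown of the connection.
/// controller.send_goaway();
/// ```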
1325pub struct H3Controller<H: DriverHooks> {
1326    /// Sends [`H3Command`]s to the [H3Driver], like [`QuicCommand`]s or
1327    /// outbound HTTP requests.
1328    cmd_sender: UnboundedSender<H::Command>,
1329    /// Receives [`H3Event`]s from the [H3Driver]. Can be extracted and
1330    /// used independently of the [H3Controller].
1331    h3_event_recv: Option<UnboundedReceiver<H::Event>>,
1332}
1333
1334impl<H: DriverHooks> H3Controller<H> {
1335    /// Gets a mutable reference to the [`H3Event`] receiver for the paired
1336    /// [H3Driver].
1337    pub fn event_receiver_mut(&mut self) -> &mut UnboundedReceiver<H::Event> {
1338        self.h3_event_recv
1339            .as_mut()
1340            .expect("No event receiver on H3Controller")
1341    }
1342
1343    /// Takes the [`H3Event`] receiver for the paired [H3Driver].
1344    pub fn take_event_receiver(&mut self) -> UnboundedReceiver<H::Event> {
1345        self.h3_event_recv
1346            .take()
1347            .expect("No event receiver on H3Controller")
1348    }
1349
1350    /// Creates a [`QuicCommand`] sender for the paired [H3Driver].
1351    pub fn cmd_sender(&self) -> RequestSender<H::Command, QuicCommand> {
1352        RequestSender {
1353            sender: self.cmd_sender.clone(),
1354            _r: Default::default(),
1355        }
1356    }
1357
1358    /// Sends a GOAWAY frame to initiate a graceful connection shutdown.
1359    pub fn send_goaway(&self) {
1360        let _ = self.cmd_sender.send(H3Command::GoAway.into());
1361    }
1362}