tokio_quiche/http3/driver/mod.rs
1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8// * Redistributions of source code must retain the above copyright notice,
9// this list of conditions and the following disclaimer.
10//
11// * Redistributions in binary form must reproduce the above copyright
12// notice, this list of conditions and the following disclaimer in the
13// documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27mod client;
28/// Wrapper for running HTTP/3 connections.
29pub mod connection;
30mod datagram;
31// `DriverHooks` must stay private to prevent users from creating their own
32// H3Drivers.
33mod hooks;
34mod server;
35mod streams;
36#[cfg(test)]
37pub mod test_utils;
38#[cfg(test)]
39mod tests;
40
41use std::collections::BTreeMap;
42use std::error::Error;
43use std::fmt;
44use std::marker::PhantomData;
45use std::sync::Arc;
46use std::time::Instant;
47
48use bytes::BufMut as _;
49use bytes::Bytes;
50use bytes::BytesMut;
51use datagram_socket::DgramBuffer;
52use datagram_socket::StreamClosureKind;
53use foundations::telemetry::log;
54use futures::FutureExt;
55use futures_util::stream::FuturesUnordered;
56use quiche::h3;
57use quiche::h3::WireErrorCode;
58use tokio::select;
59use tokio::sync::mpsc;
60use tokio::sync::mpsc::error::TryRecvError;
61use tokio::sync::mpsc::error::TrySendError;
62use tokio::sync::mpsc::UnboundedReceiver;
63use tokio::sync::mpsc::UnboundedSender;
64use tokio_stream::StreamExt;
65use tokio_util::sync::PollSender;
66
67use self::hooks::DriverHooks;
68use self::hooks::InboundHeaders;
69use self::streams::FlowCtx;
70use self::streams::HaveUpstreamCapacity;
71use self::streams::ReceivedDownstreamData;
72use self::streams::StreamCtx;
73use self::streams::StreamReady;
74use self::streams::WaitForDownstreamData;
75use self::streams::WaitForStream;
76use self::streams::WaitForUpstreamCapacity;
77use crate::buf_factory::BufFactory;
78use crate::http3::settings::Http3Settings;
79use crate::http3::H3AuditStats;
80use crate::metrics::Metrics;
81use crate::quic::HandshakeInfo;
82use crate::quic::QuicCommand;
83use crate::quic::QuicheConnection;
84use crate::ApplicationOverQuic;
85use crate::QuicResult;
86
87pub use self::client::ClientEventStream;
88pub use self::client::ClientH3Command;
89pub use self::client::ClientH3Controller;
90pub use self::client::ClientH3Driver;
91pub use self::client::ClientH3Event;
92pub use self::client::ClientRequestSender;
93pub use self::client::NewClientRequest;
94pub use self::server::IsInEarlyData;
95pub use self::server::RawPriorityValue;
96pub use self::server::ServerEventStream;
97pub use self::server::ServerH3Command;
98pub use self::server::ServerH3Controller;
99pub use self::server::ServerH3Driver;
100pub use self::server::ServerH3Event;
101
102// The default priority for HTTP/3 responses if the application didn't provide
103// one.
104const DEFAULT_PRIO: h3::Priority = h3::Priority::new(3, true);
105
106// For a stream use a channel with 16 entries, which works out to 16 * 64KB =
107// 1MB of max buffered data.
108#[cfg(not(any(test, debug_assertions)))]
109const STREAM_CAPACITY: usize = 16;
110#[cfg(any(test, debug_assertions))]
111const STREAM_CAPACITY: usize = 1; // Set to 1 to stress write_pending under test conditions
112
113// For *all* flows use a shared channel with 2048 entries, which works out
114// to 3MB of max buffered data at 1500 bytes per datagram.
115const FLOW_CAPACITY: usize = 2048;
116
117/// Used by a local task to send [`OutboundFrame`]s to a peer on the
118/// stream or flow associated with this channel.
119pub type OutboundFrameSender = PollSender<OutboundFrame>;
120
121/// Used internally to receive [`OutboundFrame`]s which should be sent to a peer
122/// on the stream or flow associated with this channel.
123type OutboundFrameStream = mpsc::Receiver<OutboundFrame>;
124
125/// Used internally to send [`InboundFrame`]s (data) from the peer to a local
126/// task on the stream or flow associated with this channel.
127type InboundFrameSender = PollSender<InboundFrame>;
128
129/// Used by a local task to receive [`InboundFrame`]s (data) on the stream or
130/// flow associated with this channel.
131pub type InboundFrameStream = mpsc::Receiver<InboundFrame>;
132
133/// The error type used internally in [H3Driver].
134///
135/// Note that [`ApplicationOverQuic`] errors are not exposed to users at this
136/// time. The type is public to document the failure modes in [H3Driver].
137#[derive(Debug, PartialEq, Eq)]
138#[non_exhaustive]
139pub enum H3ConnectionError {
140 /// The controller task was shut down and is no longer listening.
141 ControllerWentAway,
142 /// Other error at the connection, but not stream level.
143 H3(h3::Error),
144 /// Received data for a stream that was closed or never opened.
145 NonexistentStream,
146 /// The server's post-accept timeout was hit.
147 /// The timeout can be configured in [`Http3Settings`].
148 PostAcceptTimeout,
149}
150
151impl From<h3::Error> for H3ConnectionError {
152 fn from(err: h3::Error) -> Self {
153 H3ConnectionError::H3(err)
154 }
155}
156
157impl From<quiche::Error> for H3ConnectionError {
158 fn from(err: quiche::Error) -> Self {
159 H3ConnectionError::H3(h3::Error::TransportError(err))
160 }
161}
162
163impl Error for H3ConnectionError {}
164
165impl fmt::Display for H3ConnectionError {
166 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
167 let s: &dyn fmt::Display = match self {
168 Self::ControllerWentAway => &"controller went away",
169 Self::H3(e) => e,
170 Self::NonexistentStream => &"nonexistent stream",
171 Self::PostAcceptTimeout => &"post accept timeout hit",
172 };
173
174 write!(f, "H3ConnectionError: {s}")
175 }
176}
177
178type H3ConnectionResult<T> = Result<T, H3ConnectionError>;
179
180/// HTTP/3 headers that were received on a stream.
181///
182/// `recv` is used to read the message body, while `send` is used to transmit
183/// data back to the peer.
184pub struct IncomingH3Headers {
185 /// Stream ID of the frame.
186 pub stream_id: u64,
187 /// The actual [`h3::Header`]s which were received.
188 pub headers: Vec<h3::Header>,
189 /// An [`OutboundFrameSender`] for streaming body data to the peer. For
190 /// [ClientH3Driver], note that the request body can also be passed a
191 /// cloned sender via [`NewClientRequest`].
192 pub send: OutboundFrameSender,
193 /// An [`InboundFrameStream`] of body data received from the peer.
194 pub recv: InboundFrameStream,
195 /// Whether there is a body associated with the incoming headers.
196 pub read_fin: bool,
197 /// Handle to the [`H3AuditStats`] for the message's stream.
198 pub h3_audit_stats: Arc<H3AuditStats>,
199}
200
201impl fmt::Debug for IncomingH3Headers {
202 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
203 f.debug_struct("IncomingH3Headers")
204 .field("stream_id", &self.stream_id)
205 .field("headers", &self.headers)
206 .field("read_fin", &self.read_fin)
207 .field("h3_audit_stats", &self.h3_audit_stats)
208 .finish()
209 }
210}
211
212/// [`H3Event`]s are produced by an [H3Driver] to describe HTTP/3 state updates.
213///
214/// Both [ServerH3Driver] and [ClientH3Driver] may extend this enum with
215/// endpoint-specific variants. The events must be consumed by users of the
216/// drivers, like a higher-level `Server` or `Client` controller.
217#[derive(Debug)]
218pub enum H3Event {
219 /// A SETTINGS frame was received.
220 IncomingSettings {
221 /// Raw HTTP/3 setting pairs, in the order received from the peer.
222 settings: Vec<(u64, u64)>,
223 },
224
225 /// A HEADERS frame was received on the given stream. This is either a
226 /// request or a response depending on the perspective of the [`H3Event`]
227 /// receiver.
228 IncomingHeaders(IncomingH3Headers),
229
230 /// A DATAGRAM flow was created and associated with the given `flow_id`.
231 /// This event is fired before a HEADERS event for CONNECT[-UDP] requests.
232 NewFlow {
233 /// Flow ID of the new flow.
234 flow_id: u64,
235 /// An [`OutboundFrameSender`] for transmitting datagrams to the peer.
236 send: OutboundFrameSender,
237 /// An [`InboundFrameStream`] for receiving datagrams from the peer.
238 recv: InboundFrameStream,
239 },
240 /// A RST_STREAM frame was seen on the given `stream_id`. The user of the
241 /// driver should clean up any state allocated for this stream.
242 ResetStream { stream_id: u64 },
243 /// The connection has irrecoverably errored and is shutting down.
244 ConnectionError(h3::Error),
245 /// The connection has been shutdown, optionally due to an
246 /// [`H3ConnectionError`].
247 ConnectionShutdown(Option<H3ConnectionError>),
248 /// Body data has been received over a stream.
249 BodyBytesReceived {
250 /// Stream ID of the body data.
251 stream_id: u64,
252 /// Number of bytes received.
253 num_bytes: u64,
254 /// Whether the stream is finished and won't yield any more data.
255 fin: bool,
256 },
257 /// The stream has been closed. This is used to signal stream closures that
258 /// don't result from RST_STREAM frames, unlike the
259 /// [`H3Event::ResetStream`] variant.
260 StreamClosed { stream_id: u64 },
261 /// A GOAWAY frame was received from the peer containing `id`,
262 /// as described in https://datatracker.ietf.org/doc/html/rfc9114#section-5.2.
263 GoAway { id: u64 },
264}
265
266impl H3Event {
267 /// Generates an event from an applicable [`H3ConnectionError`].
268 fn from_error(err: &H3ConnectionError) -> Option<Self> {
269 Some(match err {
270 H3ConnectionError::H3(e) => Self::ConnectionError(*e),
271 H3ConnectionError::PostAcceptTimeout => Self::ConnectionShutdown(
272 Some(H3ConnectionError::PostAcceptTimeout),
273 ),
274 _ => return None,
275 })
276 }
277}
278
279/// An [`OutboundFrame`] is a data frame that should be sent from a local task
280/// to a peer over a [`quiche::h3::Connection`].
281///
282/// This is used, for example, to send response body data to a peer, or proxied
283/// UDP datagrams.
284#[derive(Debug)]
285pub enum OutboundFrame {
286 /// Response headers to be sent to the peer, with optional priority.
287 Headers(Vec<h3::Header>, Option<quiche::h3::Priority>),
288 /// Response body/CONNECT downstream data plus FIN flag.
289 Body(Bytes, bool),
290 /// CONNECT-UDP (DATAGRAM) downstream data plus flow ID.
291 Datagram(DgramBuffer, u64),
292 /// Close the stream with a trailers, with optional priority.
293 Trailers(Vec<h3::Header>, Option<quiche::h3::Priority>),
294 /// An error encountered when serving the request. Stream should be closed.
295 PeerStreamError,
296 /// DATAGRAM flow explicitly closed.
297 FlowShutdown { flow_id: u64, stream_id: u64 },
298}
299
300/// An [`InboundFrame`] is a data frame that was received from the peer over a
301/// [`quiche::h3::Connection`]. This is used by peers to send body or datagrams
302/// to the local task.
303#[derive(Debug)]
304pub enum InboundFrame {
305 /// Request body/CONNECT upstream data plus FIN flag.
306 Body(BytesMut, bool),
307 /// CONNECT-UDP (DATAGRAM) upstream data.
308 Datagram(DgramBuffer),
309}
310
311/// A ready-made [`ApplicationOverQuic`] which can handle HTTP/3 and MASQUE.
312/// Depending on the `DriverHooks` in use, it powers either a client or a
313/// server.
314///
315/// Use the [ClientH3Driver] and [ServerH3Driver] aliases to access the
316/// respective driver types. The driver is passed into an I/O loop and
317/// communicates with the driver's user (e.g., an HTTP client or a server) via
318/// its associated [H3Controller]. The controller allows the application to both
319/// listen for [`H3Event`]s of note and send [`H3Command`]s into the I/O loop.
320pub struct H3Driver<H: DriverHooks> {
321 /// Configuration used to initialize `conn`. Created from [`Http3Settings`]
322 /// in the constructor.
323 h3_config: h3::Config,
324 /// The underlying HTTP/3 connection. Initialized in
325 /// `ApplicationOverQuic::on_conn_established`.
326 conn: Option<h3::Connection>,
327 /// State required by the client/server hooks.
328 hooks: H,
329 /// Sends [`H3Event`]s to the [H3Controller] paired with this driver.
330 h3_event_sender: mpsc::UnboundedSender<H::Event>,
331 /// Receives [`H3Command`]s from the [H3Controller] paired with this driver.
332 cmd_recv: mpsc::UnboundedReceiver<H::Command>,
333 /// A sender that feeds back into `cmd_recv`. Used by hooks that need to
334 /// re-queue commands (e.g. retrying blocked requests) without access to
335 /// the [H3Controller]'s copy of the sender.
336 cmd_sender: mpsc::UnboundedSender<H::Command>,
337
338 /// A map of stream IDs to their [StreamCtx]. This is mainly used to
339 /// retrieve the internal Tokio channels associated with the stream.
340 stream_map: BTreeMap<u64, StreamCtx>,
341 /// A map of flow IDs to their [FlowCtx]. This is mainly used to retrieve
342 /// the internal Tokio channels associated with the flow.
343 flow_map: BTreeMap<u64, FlowCtx>,
344 /// Set of [`WaitForStream`] futures. A stream is added to this set if
345 /// we need to send to it and its channel is at capacity, or if we need
346 /// data from its channel and the channel is empty.
347 waiting_streams: FuturesUnordered<WaitForStream>,
348
349 /// Receives [`OutboundFrame`]s from all datagram flows on the connection.
350 dgram_recv: OutboundFrameStream,
351 /// Keeps the datagram channel open such that datagram flows can be created.
352 dgram_send: OutboundFrameSender,
353 /// A buffer to receive H3 body data from quiche. We initialize a large
354 /// buffer and then `split()` off filled parts until we need to reallocate.
355 body_recv_buf: bytes::buf::Limit<BytesMut>,
356
357 /// The buffer used to interact with the underlying IoWorker.
358 io_worker_buf: Vec<u8>,
359 /// The maximum HTTP/3 stream ID seen on this connection.
360 max_stream_seen: u64,
361
362 /// Tracks whether we have forwarded the HTTP/3 SETTINGS frame
363 /// to the [H3Controller] once.
364 settings_received_and_forwarded: bool,
365}
366
367impl<H: DriverHooks> H3Driver<H> {
368 /// Builds a new [H3Driver] and an associated [H3Controller].
369 ///
370 /// The driver should then be passed to
371 /// [`InitialQuicConnection`](crate::InitialQuicConnection)'s `start`
372 /// method.
373 pub fn new(http3_settings: Http3Settings) -> (Self, H3Controller<H>) {
374 let (dgram_send, dgram_recv) = mpsc::channel(FLOW_CAPACITY);
375 let (cmd_sender, cmd_recv) = mpsc::unbounded_channel();
376 let (h3_event_sender, h3_event_recv) = mpsc::unbounded_channel();
377
378 (
379 H3Driver {
380 h3_config: (&http3_settings).into(),
381 conn: None,
382 hooks: H::new(&http3_settings),
383 h3_event_sender,
384 cmd_recv,
385 cmd_sender: cmd_sender.clone(),
386
387 stream_map: BTreeMap::new(),
388 flow_map: BTreeMap::new(),
389
390 dgram_recv,
391 dgram_send: PollSender::new(dgram_send),
392 max_stream_seen: 0,
393 body_recv_buf: BytesMut::with_capacity(BufFactory::MAX_BUF_SIZE)
394 .limit(BufFactory::MAX_BUF_SIZE),
395 io_worker_buf: vec![0u8; BufFactory::MAX_BUF_SIZE],
396
397 waiting_streams: FuturesUnordered::new(),
398
399 settings_received_and_forwarded: false,
400 },
401 H3Controller {
402 cmd_sender,
403 h3_event_recv: Some(h3_event_recv),
404 },
405 )
406 }
407
408 /// Returns a sender that feeds back into this driver's own `cmd_recv`.
409 ///
410 /// Hooks that need to re-queue commands (e.g. retrying a request that
411 /// was temporarily blocked) can use this sender without needing access
412 /// to the paired [H3Controller].
413 pub(crate) fn self_cmd_sender(&self) -> &mpsc::UnboundedSender<H::Command> {
414 &self.cmd_sender
415 }
416
417 /// Retrieve the [FlowCtx] associated with the given `flow_id`. If no
418 /// context is found, a new one will be created.
419 fn get_or_insert_flow(
420 &mut self, flow_id: u64,
421 ) -> H3ConnectionResult<&mut FlowCtx> {
422 use std::collections::btree_map::Entry;
423 Ok(match self.flow_map.entry(flow_id) {
424 Entry::Vacant(e) => {
425 // This is a datagram for a new flow we haven't seen before
426 let (flow, recv) = FlowCtx::new(FLOW_CAPACITY);
427 let flow_req = H3Event::NewFlow {
428 flow_id,
429 recv,
430 send: self.dgram_send.clone(),
431 };
432 self.h3_event_sender
433 .send(flow_req.into())
434 .map_err(|_| H3ConnectionError::ControllerWentAway)?;
435 e.insert(flow)
436 },
437 Entry::Occupied(e) => e.into_mut(),
438 })
439 }
440
441 /// Adds a [StreamCtx] to the stream map with the given `stream_id`.
442 fn insert_stream(&mut self, stream_id: u64, ctx: StreamCtx) {
443 self.stream_map.insert(stream_id, ctx);
444 self.max_stream_seen = self.max_stream_seen.max(stream_id);
445 }
446
447 /// Fetches body chunks from the [`quiche::h3::Connection`] and forwards
448 /// them to the stream's associated [`InboundFrameStream`].
449 fn process_h3_data(
450 &mut self, qconn: &mut QuicheConnection, stream_id: u64,
451 ) -> H3ConnectionResult<()> {
452 // Split self borrow between conn and stream_map
453 let conn = self.conn.as_mut().ok_or(Self::connection_not_present())?;
454 let ctx = self
455 .stream_map
456 .get_mut(&stream_id)
457 .ok_or(H3ConnectionError::NonexistentStream)?;
458
459 enum StreamStatus {
460 Done { close: bool },
461 Reset { wire_err_code: u64 },
462 Blocked,
463 }
464
465 let status = loop {
466 let Some(sender) = ctx.send.as_ref().and_then(PollSender::get_ref)
467 else {
468 // already waiting for capacity
469 break StreamStatus::Done { close: false };
470 };
471
472 let try_reserve_result = sender.try_reserve();
473 let permit = match try_reserve_result {
474 Ok(permit) => permit,
475 Err(TrySendError::Closed(())) => {
476 // The channel has closed before we delivered a fin or reset
477 // to the application.
478 if !ctx.fin_or_reset_recv &&
479 ctx.associated_dgram_flow_id.is_none()
480 // The channel might be closed if the stream was used to
481 // initiate a datagram exchange.
482 // TODO: ideally, the application would still shut down the
483 // stream properly. Once applications code
484 // is fixed, we can remove this check.
485 {
486 let err = h3::WireErrorCode::RequestCancelled as u64;
487 let _ = qconn.stream_shutdown(
488 stream_id,
489 quiche::Shutdown::Read,
490 err,
491 );
492 drop(try_reserve_result); // needed to drop the borrow on ctx.
493 ctx.handle_sent_stop_sending(err);
494 // TODO: should we send an H3Event event to
495 // h3_event_sender? We can only get here if the app
496 // actively closed or dropped
497 // the channel so any event we send would be more for
498 // logging or auditing
499 }
500 break StreamStatus::Done {
501 close: ctx.both_directions_done(),
502 };
503 },
504 Err(TrySendError::Full(())) => {
505 if ctx.fin_or_reset_recv || qconn.stream_readable(stream_id) {
506 break StreamStatus::Blocked;
507 }
508 break StreamStatus::Done { close: false };
509 },
510 };
511
512 if ctx.fin_or_reset_recv {
513 // Signal end-of-body to upstream
514 permit.send(InboundFrame::Body(Default::default(), true));
515 break StreamStatus::Done {
516 close: ctx.fin_or_reset_sent,
517 };
518 }
519
520 // NOTE: `self.body_recv_buf` is `Limit<BytesMut>` so
521 // `has_remaining_mut()` will indicate if the buffer
522 // has space available until the *limit* is
523 // reached. (A plain `BytesMut` can reallocate and would always
524 // return true)
525 if !self.body_recv_buf.has_remaining_mut() {
526 self.body_recv_buf =
527 BytesMut::with_capacity(BufFactory::MAX_BUF_SIZE)
528 .limit(BufFactory::MAX_BUF_SIZE)
529 };
530 match conn.recv_body_buf(qconn, stream_id, &mut self.body_recv_buf) {
531 Ok(n) => {
532 ctx.audit_stats.add_downstream_bytes_recvd(n as u64);
533 let event = H3Event::BodyBytesReceived {
534 stream_id,
535 num_bytes: n as u64,
536 fin: false,
537 };
538 let _ = self.h3_event_sender.send(event.into());
539 // Take the filled part, leave the remaining capacity
540 let filled_body = self.body_recv_buf.get_mut().split();
541 // Sanity check: the remaining spare capacity should equal
542 // the limit.
543 debug_assert_eq!(
544 self.body_recv_buf.get_mut().spare_capacity_mut().len(),
545 self.body_recv_buf.remaining_mut()
546 );
547 permit.send(InboundFrame::Body(filled_body, false));
548 },
549 Err(h3::Error::Done) =>
550 break StreamStatus::Done { close: false },
551 Err(h3::Error::TransportError(quiche::Error::StreamReset(
552 code,
553 ))) => {
554 break StreamStatus::Reset {
555 wire_err_code: code,
556 };
557 },
558 Err(_) => break StreamStatus::Done { close: true },
559 }
560 };
561
562 match status {
563 StreamStatus::Done { close } => {
564 if close {
565 return self.cleanup_stream(qconn, stream_id);
566 }
567
568 // The QUIC stream is finished, manually invoke `process_h3_fin`
569 // in case `h3::poll()` is never called again.
570 //
571 // Note that this case will not conflict with StreamStatus::Done
572 // being returned due to the body channel being
573 // blocked. qconn.stream_finished() will guarantee
574 // that we've fully parsed the body as it only returns true
575 // if we've seen a Fin for the read half of the stream.
576 if !ctx.fin_or_reset_recv && qconn.stream_finished(stream_id) {
577 return self.process_h3_fin(qconn, stream_id);
578 }
579 },
580 StreamStatus::Reset { wire_err_code } => {
581 debug_assert!(ctx.send.is_some());
582 ctx.handle_recvd_reset(wire_err_code);
583 self.h3_event_sender
584 .send(H3Event::ResetStream { stream_id }.into())
585 .map_err(|_| H3ConnectionError::ControllerWentAway)?;
586 if ctx.both_directions_done() {
587 return self.cleanup_stream(qconn, stream_id);
588 }
589 },
590 StreamStatus::Blocked => {
591 self.waiting_streams.push(ctx.wait_for_send(stream_id));
592 },
593 }
594
595 Ok(())
596 }
597
598 /// Processes an end-of-stream event from the [`quiche::h3::Connection`].
599 fn process_h3_fin(
600 &mut self, qconn: &mut QuicheConnection, stream_id: u64,
601 ) -> H3ConnectionResult<()> {
602 let ctx = self
603 .stream_map
604 .get_mut(&stream_id)
605 .filter(|c| !c.fin_or_reset_recv);
606 let Some(ctx) = ctx else {
607 // Stream is already finished, nothing to do
608 return Ok(());
609 };
610
611 ctx.fin_or_reset_recv = true;
612 ctx.audit_stats
613 .set_recvd_stream_fin(StreamClosureKind::Explicit);
614
615 // It's important to send this H3Event before process_h3_data so that
616 // a server can (potentially) generate the control response before the
617 // corresponding receiver drops.
618 let event = H3Event::BodyBytesReceived {
619 stream_id,
620 num_bytes: 0,
621 fin: true,
622 };
623 let _ = self.h3_event_sender.send(event.into());
624
625 // Communicate fin to upstream. Since `ctx.fin_recv` is true now,
626 // there can't be a recursive loop.
627 self.process_h3_data(qconn, stream_id)
628 }
629
630 /// Processes a single [`quiche::h3::Event`] received from the underlying
631 /// [`quiche::h3::Connection`]. Some events are dispatched to helper
632 /// methods.
633 fn process_read_event(
634 &mut self, qconn: &mut QuicheConnection, stream_id: u64, event: h3::Event,
635 ) -> H3ConnectionResult<()> {
636 self.forward_settings()?;
637
638 match event {
639 // Requests/responses are exclusively handled by hooks.
640 h3::Event::Headers { list, more_frames } =>
641 H::headers_received(self, qconn, InboundHeaders {
642 stream_id,
643 headers: list,
644 has_body: more_frames,
645 }),
646
647 h3::Event::Data => self.process_h3_data(qconn, stream_id),
648 h3::Event::Finished => self.process_h3_fin(qconn, stream_id),
649
650 h3::Event::Reset(code) => {
651 if let Some(ctx) = self.stream_map.get_mut(&stream_id) {
652 ctx.handle_recvd_reset(code);
653 // See if we are waiting on this stream and close the channel
654 // if we are. If we are not waiting, `handle_recvd_reset()`
655 // will have taken care of closing.
656 for pending in self.waiting_streams.iter_mut() {
657 match pending {
658 WaitForStream::Upstream(
659 WaitForUpstreamCapacity {
660 stream_id: id,
661 chan: Some(chan),
662 },
663 ) if stream_id == *id => {
664 chan.close();
665 },
666 _ => {},
667 }
668 }
669
670 self.h3_event_sender
671 .send(H3Event::ResetStream { stream_id }.into())
672 .map_err(|_| H3ConnectionError::ControllerWentAway)?;
673 if ctx.both_directions_done() {
674 return self.cleanup_stream(qconn, stream_id);
675 }
676 }
677
678 // TODO: if we don't have the stream in our map: should we
679 // send the H3Event::ResetStream?
680 Ok(())
681 },
682
683 h3::Event::PriorityUpdate => Ok(()),
684 h3::Event::GoAway => {
685 self.h3_event_sender
686 .send(H3Event::GoAway { id: stream_id }.into())
687 .map_err(|_| H3ConnectionError::ControllerWentAway)?;
688 Ok(())
689 },
690 }
691 }
692
693 /// The SETTINGS frame can be received at any point, so we
694 /// need to check `peer_settings_raw` to decide if we've received it.
695 ///
696 /// Settings should only be sent once, so we generate a single event
697 /// when `peer_settings_raw` transitions from None to Some.
698 fn forward_settings(&mut self) -> H3ConnectionResult<()> {
699 if self.settings_received_and_forwarded {
700 return Ok(());
701 }
702
703 // capture the peer settings and forward it
704 if let Some(settings) = self.conn_mut()?.peer_settings_raw() {
705 let incoming_settings = H3Event::IncomingSettings {
706 settings: settings.to_vec(),
707 };
708
709 self.h3_event_sender
710 .send(incoming_settings.into())
711 .map_err(|_| H3ConnectionError::ControllerWentAway)?;
712
713 self.settings_received_and_forwarded = true;
714 }
715 Ok(())
716 }
717
718 /// Send an individual frame to the underlying [`quiche::h3::Connection`] to
719 /// be flushed at a later time.
720 ///
721 /// `Self::process_writes` will iterate over all writable streams and call
722 /// this method in a loop for each stream to send all writable packets.
723 fn process_write_frame(
724 conn: &mut h3::Connection, qconn: &mut QuicheConnection,
725 ctx: &mut StreamCtx,
726 ) -> h3::Result<()> {
727 let Some(frame) = &mut ctx.queued_frame else {
728 return Ok(());
729 };
730
731 let audit_stats = &ctx.audit_stats;
732 let stream_id = audit_stats.stream_id();
733
734 match frame {
735 OutboundFrame::Headers(headers, priority) => {
736 let prio = priority.as_ref().unwrap_or(&DEFAULT_PRIO);
737
738 let res = if ctx.initial_headers_sent {
739 // Initial headers were already sent, send additional
740 // headers now.
741 conn.send_additional_headers_with_priority(
742 qconn, stream_id, headers, prio, false, false,
743 )
744 } else {
745 // Send initial headers.
746 conn.send_response_with_priority(
747 qconn, stream_id, headers, prio, false,
748 )
749 .inspect(|_| ctx.initial_headers_sent = true)
750 };
751
752 if let Err(h3::Error::StreamBlocked) = res {
753 ctx.first_full_headers_flush_fail_time
754 .get_or_insert(Instant::now());
755 }
756
757 if res.is_ok() {
758 if let Some(first) =
759 ctx.first_full_headers_flush_fail_time.take()
760 {
761 ctx.audit_stats.add_header_flush_duration(
762 Instant::now().duration_since(first),
763 );
764 }
765 }
766
767 res
768 },
769
770 OutboundFrame::Body(body, fin) => {
771 let len = body.len();
772 if len == 0 && !*fin {
773 // quiche doesn't allow sending an empty body when the fin
774 // flag is not set
775 return Ok(());
776 }
777 if *fin {
778 // If this is the last body frame, drop the receiver in the
779 // stream map to signal that we shouldn't receive any more
780 // frames. NOTE: we can't use `mpsc::Receiver::close()`
781 // due to an inconsistency in how tokio handles reading
782 // from a closed mpsc channel https://github.com/tokio-rs/tokio/issues/7631
783 ctx.recv = None;
784 }
785 let n = conn.send_body_zc(qconn, stream_id, body, *fin)?;
786
787 audit_stats.add_downstream_bytes_sent(n as _);
788 if n != len {
789 // Couldn't write the entire body, `send_body_zc` will
790 // have trimmed `body` accordingly. The driver keeps
791 // the remainder of the body to send in the future.
792 debug_assert_eq!(
793 n + body.len(),
794 len,
795 "send_body_zc() should have trimmed body but did not"
796 );
797 Err(h3::Error::StreamBlocked)
798 } else {
799 if *fin {
800 Self::on_fin_sent(ctx)?;
801 }
802 Ok(())
803 }
804 },
805
806 OutboundFrame::Trailers(headers, priority) => {
807 let prio = priority.as_ref().unwrap_or(&DEFAULT_PRIO);
808
809 // trailers always set fin=true
810 let res = conn.send_additional_headers_with_priority(
811 qconn, stream_id, headers, prio, true, true,
812 );
813
814 if res.is_ok() {
815 Self::on_fin_sent(ctx)?;
816 }
817 res
818 },
819
820 OutboundFrame::PeerStreamError => Err(h3::Error::MessageError),
821
822 OutboundFrame::FlowShutdown { .. } => {
823 unreachable!("Only flows send shutdowns")
824 },
825
826 OutboundFrame::Datagram(..) => {
827 unreachable!("Only flows send datagrams")
828 },
829 }
830 }
831
832 fn on_fin_sent(ctx: &mut StreamCtx) -> h3::Result<()> {
833 ctx.recv = None;
834 ctx.fin_or_reset_sent = true;
835 ctx.audit_stats
836 .set_sent_stream_fin(StreamClosureKind::Explicit);
837 if ctx.fin_or_reset_recv {
838 // Return a TransportError to trigger stream cleanup
839 // instead of h3::Error::Done
840 Err(h3::Error::TransportError(quiche::Error::Done))
841 } else {
842 Ok(())
843 }
844 }
845
846 /// Resumes reads or writes to the connection when a stream channel becomes
847 /// unblocked.
848 ///
849 /// If we were waiting for more data from a channel, we resume writing to
850 /// the connection. Otherwise, we were blocked on channel capacity and
851 /// continue reading from the connection. `Upstream` in this context is
852 /// the consumer of the stream.
853 fn upstream_ready(
854 &mut self, qconn: &mut QuicheConnection, ready: StreamReady,
855 ) -> H3ConnectionResult<()> {
856 match ready {
857 StreamReady::Downstream(r) => self.upstream_read_ready(qconn, r),
858 StreamReady::Upstream(r) => self.upstream_write_ready(qconn, r),
859 }
860 }
861
862 fn upstream_read_ready(
863 &mut self, qconn: &mut QuicheConnection,
864 read_ready: ReceivedDownstreamData,
865 ) -> H3ConnectionResult<()> {
866 let ReceivedDownstreamData {
867 stream_id,
868 chan,
869 data,
870 } = read_ready;
871
872 match self.stream_map.get_mut(&stream_id) {
873 None => Ok(()),
874 Some(stream) => {
875 stream.recv = Some(chan);
876 stream.queued_frame = data;
877 self.process_writable_stream(qconn, stream_id)
878 },
879 }
880 }
881
882 fn upstream_write_ready(
883 &mut self, qconn: &mut QuicheConnection,
884 write_ready: HaveUpstreamCapacity,
885 ) -> H3ConnectionResult<()> {
886 let HaveUpstreamCapacity {
887 stream_id,
888 mut chan,
889 } = write_ready;
890
891 match self.stream_map.get_mut(&stream_id) {
892 None => Ok(()),
893 Some(stream) => {
894 chan.abort_send(); // Have to do it to release the associated permit
895 stream.send = Some(chan);
896 self.process_h3_data(qconn, stream_id)
897 },
898 }
899 }
900
901 /// Processes all queued outbound datagrams from the `dgram_recv` channel.
902 fn dgram_ready(
903 &mut self, qconn: &mut QuicheConnection, frame: OutboundFrame,
904 ) -> H3ConnectionResult<()> {
905 let mut frame = Ok(frame);
906
907 loop {
908 match frame {
909 Ok(OutboundFrame::Datagram(dgram, flow_id)) => {
910 // Drop datagrams if there is no capacity
911 let _ = datagram::send_h3_dgram(qconn, flow_id, dgram);
912 },
913 Ok(OutboundFrame::FlowShutdown { flow_id, stream_id }) => {
914 self.shutdown_stream(
915 qconn,
916 stream_id,
917 StreamShutdown::Both {
918 read_error_code: WireErrorCode::NoError as u64,
919 write_error_code: WireErrorCode::NoError as u64,
920 },
921 )?;
922 self.flow_map.remove(&flow_id);
923 break;
924 },
925 Ok(_) => unreachable!("Flows can't send frame of other types"),
926 Err(TryRecvError::Empty) => break,
927 Err(TryRecvError::Disconnected) =>
928 return Err(H3ConnectionError::ControllerWentAway),
929 }
930
931 frame = self.dgram_recv.try_recv();
932 }
933
934 Ok(())
935 }
936
937 /// Return a mutable reference to the driver's HTTP/3 connection.
938 ///
939 /// If the connection doesn't exist yet, this function returns
940 /// a `Self::connection_not_present()` error.
941 fn conn_mut(&mut self) -> H3ConnectionResult<&mut h3::Connection> {
942 self.conn.as_mut().ok_or(Self::connection_not_present())
943 }
944
945 /// Alias for [`quiche::Error::TlsFail`], which is used in the case where
946 /// this driver doesn't have an established HTTP/3 connection attached
947 /// to it yet.
948 const fn connection_not_present() -> H3ConnectionError {
949 H3ConnectionError::H3(h3::Error::TransportError(quiche::Error::TlsFail))
950 }
951
952 /// Cleans up internal state for the indicated HTTP/3 stream.
953 ///
954 /// This function removes the stream from the stream map, closes any pending
955 /// futures, removes associated DATAGRAM flows, and sends a
956 /// [`H3Event::StreamClosed`] event (for servers).
957 fn cleanup_stream(
958 &mut self, qconn: &mut QuicheConnection, stream_id: u64,
959 ) -> H3ConnectionResult<()> {
960 let Some(stream_ctx) = self.stream_map.remove(&stream_id) else {
961 return Ok(());
962 };
963
964 // Find if the stream also has any pending futures associated with it
965 for pending in self.waiting_streams.iter_mut() {
966 match pending {
967 WaitForStream::Downstream(WaitForDownstreamData {
968 stream_id: id,
969 chan: Some(chan),
970 }) if stream_id == *id => {
971 chan.close();
972 },
973 WaitForStream::Upstream(WaitForUpstreamCapacity {
974 stream_id: id,
975 chan: Some(chan),
976 }) if stream_id == *id => {
977 chan.close();
978 },
979 _ => {},
980 }
981 }
982
983 // Close any DATAGRAM-proxying channels when we close the stream, if they
984 // exist
985 if let Some(mapped_flow_id) = stream_ctx.associated_dgram_flow_id {
986 self.flow_map.remove(&mapped_flow_id);
987 }
988
989 if qconn.is_server() {
990 // Signal the server to remove the stream from its map
991 let _ = self
992 .h3_event_sender
993 .send(H3Event::StreamClosed { stream_id }.into());
994 }
995
996 Ok(())
997 }
998
999 /// Shuts down the indicated HTTP/3 stream by sending frames and cleaning
1000 /// up then cleans up internal state by calling
1001 /// [`Self::cleanup_stream`].
1002 fn shutdown_stream(
1003 &mut self, qconn: &mut QuicheConnection, stream_id: u64,
1004 shutdown: StreamShutdown,
1005 ) -> H3ConnectionResult<()> {
1006 let Some(stream_ctx) = self.stream_map.get(&stream_id) else {
1007 return Ok(());
1008 };
1009
1010 let audit_stats = &stream_ctx.audit_stats;
1011
1012 match shutdown {
1013 StreamShutdown::Read { error_code } => {
1014 audit_stats.set_sent_stop_sending_error_code(error_code as _);
1015 let _ = qconn.stream_shutdown(
1016 stream_id,
1017 quiche::Shutdown::Read,
1018 error_code,
1019 );
1020 },
1021 StreamShutdown::Write { error_code } => {
1022 audit_stats.set_sent_reset_stream_error_code(error_code as _);
1023 let _ = qconn.stream_shutdown(
1024 stream_id,
1025 quiche::Shutdown::Write,
1026 error_code,
1027 );
1028 },
1029 StreamShutdown::Both {
1030 read_error_code,
1031 write_error_code,
1032 } => {
1033 audit_stats
1034 .set_sent_stop_sending_error_code(read_error_code as _);
1035 let _ = qconn.stream_shutdown(
1036 stream_id,
1037 quiche::Shutdown::Read,
1038 read_error_code,
1039 );
1040 audit_stats
1041 .set_sent_reset_stream_error_code(write_error_code as _);
1042 let _ = qconn.stream_shutdown(
1043 stream_id,
1044 quiche::Shutdown::Write,
1045 write_error_code,
1046 );
1047 },
1048 }
1049
1050 self.cleanup_stream(qconn, stream_id)
1051 }
1052
1053 /// Handles a regular [`H3Command`]. May be called internally by
1054 /// [DriverHooks] for non-endpoint-specific [`H3Command`]s.
1055 fn handle_core_command(
1056 &mut self, qconn: &mut QuicheConnection, cmd: H3Command,
1057 ) -> H3ConnectionResult<()> {
1058 match cmd {
1059 H3Command::QuicCmd(cmd) => cmd.execute(qconn),
1060 H3Command::GoAway => {
1061 let max_id = self.max_stream_seen;
1062 self.conn_mut()
1063 .expect("connection should be established")
1064 .send_goaway(qconn, max_id)?;
1065 },
1066 H3Command::ShutdownStream {
1067 stream_id,
1068 shutdown,
1069 } => {
1070 self.shutdown_stream(qconn, stream_id, shutdown)?;
1071 },
1072 }
1073 Ok(())
1074 }
1075}
1076
1077impl<H: DriverHooks> H3Driver<H> {
1078 /// Reads all buffered datagrams out of `qconn` and distributes them to
1079 /// their flow channels.
1080 fn process_available_dgrams(
1081 &mut self, qconn: &mut QuicheConnection,
1082 ) -> H3ConnectionResult<()> {
1083 loop {
1084 match datagram::receive_h3_dgram(qconn) {
1085 Ok((flow_id, dgram)) => {
1086 self.get_or_insert_flow(flow_id)?.send_best_effort(dgram);
1087 },
1088 Err(quiche::Error::Done) => return Ok(()),
1089 Err(err) => return Err(H3ConnectionError::from(err)),
1090 }
1091 }
1092 }
1093
1094 /// Flushes any queued-up frames for `stream_id` into `qconn` until either
1095 /// there is no more capacity in `qconn` or no more frames to send.
1096 fn process_writable_stream(
1097 &mut self, qconn: &mut QuicheConnection, stream_id: u64,
1098 ) -> H3ConnectionResult<()> {
1099 // Split self borrow between conn and stream_map
1100 let conn = self.conn.as_mut().ok_or(Self::connection_not_present())?;
1101 let Some(ctx) = self.stream_map.get_mut(&stream_id) else {
1102 return Ok(()); // Unknown stream_id
1103 };
1104
1105 loop {
1106 // Process each writable frame, queue the next frame for processing
1107 // and shut down any errored streams.
1108 match Self::process_write_frame(conn, qconn, ctx) {
1109 Ok(()) => ctx.queued_frame = None,
1110 Err(h3::Error::StreamBlocked | h3::Error::Done) => break,
1111 Err(h3::Error::MessageError) => {
1112 return self.shutdown_stream(
1113 qconn,
1114 stream_id,
1115 StreamShutdown::Both {
1116 read_error_code: WireErrorCode::MessageError as u64,
1117 write_error_code: WireErrorCode::MessageError as u64,
1118 },
1119 );
1120 },
1121 Err(h3::Error::TransportError(quiche::Error::StreamStopped(
1122 e,
1123 ))) => {
1124 ctx.handle_recvd_stop_sending(e);
1125 if ctx.both_directions_done() {
1126 return self.cleanup_stream(qconn, stream_id);
1127 } else {
1128 return Ok(());
1129 }
1130 },
1131 Err(h3::Error::TransportError(
1132 quiche::Error::InvalidStreamState(stream),
1133 )) => {
1134 return self.cleanup_stream(qconn, stream);
1135 },
1136 Err(_) => {
1137 return self.cleanup_stream(qconn, stream_id);
1138 },
1139 }
1140
1141 let Some(recv) = ctx.recv.as_mut() else {
1142 // This stream is already waiting for data or we wrote a fin and
1143 // closed the channel.
1144 debug_assert!(
1145 ctx.queued_frame.is_none(),
1146 "We MUST NOT have a queued frame if we are already waiting on
1147 more data from the channel"
1148 );
1149 return Ok(());
1150 };
1151
1152 // Attempt to queue the next frame for processing. The corresponding
1153 // sender is created at the same time as the `StreamCtx`
1154 // and ultimately ends up in an `H3Body`. The body then
1155 // determines which frames to send to the peer via
1156 // this processing loop.
1157 match recv.try_recv() {
1158 Ok(frame) => ctx.queued_frame = Some(frame),
1159 Err(TryRecvError::Disconnected) => {
1160 if !ctx.fin_or_reset_sent &&
1161 ctx.associated_dgram_flow_id.is_none()
1162 // The channel might be closed if the stream was used to
1163 // initiate a datagram exchange.
1164 // TODO: ideally, the application would still shut down the
1165 // stream properly. Once applications code
1166 // is fixed, we can remove this check.
1167 {
1168 // The channel closed without having written a fin. Send a
1169 // RESET_STREAM to indicate we won't be writing anything
1170 // else
1171 let err = h3::WireErrorCode::RequestCancelled as u64;
1172 let _ = qconn.stream_shutdown(
1173 stream_id,
1174 quiche::Shutdown::Write,
1175 err,
1176 );
1177 ctx.handle_sent_reset(err);
1178 if ctx.both_directions_done() {
1179 return self.cleanup_stream(qconn, stream_id);
1180 }
1181 }
1182 break;
1183 },
1184 Err(TryRecvError::Empty) => {
1185 self.waiting_streams.push(ctx.wait_for_recv(stream_id));
1186 break;
1187 },
1188 }
1189 }
1190
1191 Ok(())
1192 }
1193
1194 /// Tests `qconn` for either a local or peer error and increments
1195 /// the associated HTTP/3 or QUIC error counter.
1196 fn record_quiche_error(qconn: &mut QuicheConnection, metrics: &impl Metrics) {
1197 // split metrics between local/peer and QUIC/HTTP/3 level errors
1198 if let Some(err) = qconn.local_error() {
1199 if err.is_app {
1200 metrics.local_h3_conn_close_error_count(err.error_code.into())
1201 } else {
1202 metrics.local_quic_conn_close_error_count(err.error_code.into())
1203 }
1204 .inc();
1205 } else if let Some(err) = qconn.peer_error() {
1206 if err.is_app {
1207 metrics.peer_h3_conn_close_error_count(err.error_code.into())
1208 } else {
1209 metrics.peer_quic_conn_close_error_count(err.error_code.into())
1210 }
1211 .inc();
1212 }
1213 }
1214}
1215
1216impl<H: DriverHooks> ApplicationOverQuic for H3Driver<H> {
1217 fn on_conn_established(
1218 &mut self, quiche_conn: &mut QuicheConnection,
1219 handshake_info: &HandshakeInfo,
1220 ) -> QuicResult<()> {
1221 let conn = h3::Connection::with_transport(quiche_conn, &self.h3_config)?;
1222 self.conn = Some(conn);
1223
1224 H::conn_established(self, quiche_conn, handshake_info)?;
1225 Ok(())
1226 }
1227
1228 #[inline]
1229 fn should_act(&self) -> bool {
1230 self.conn.is_some()
1231 }
1232
1233 #[inline]
1234 fn buffer(&mut self) -> &mut [u8] {
1235 &mut self.io_worker_buf
1236 }
1237
1238 /// Poll the underlying [`quiche::h3::Connection`] for
1239 /// [`quiche::h3::Event`]s and DATAGRAMs, delegating processing to
1240 /// `Self::process_read_event`.
1241 ///
1242 /// If a DATAGRAM is found, it is sent to the receiver on its channel.
1243 fn process_reads(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
1244 loop {
1245 match self.conn_mut()?.poll(qconn) {
1246 Ok((stream_id, event)) =>
1247 self.process_read_event(qconn, stream_id, event)?,
1248 Err(h3::Error::Done) => break,
1249 Err(err) => {
1250 // Don't bubble error up, instead keep the worker loop going
1251 // until quiche reports the connection is
1252 // closed.
1253 log::debug!("connection closed due to h3 protocol error"; "error"=>?err);
1254 return Ok(());
1255 },
1256 };
1257 }
1258
1259 self.process_available_dgrams(qconn)?;
1260 Ok(())
1261 }
1262
1263 /// Write as much data as possible into the [`quiche::h3::Connection`] from
1264 /// all sources. This will attempt to write any queued frames into their
1265 /// respective streams, if writable.
1266 fn process_writes(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
1267 while let Some(stream_id) = qconn.stream_writable_next() {
1268 self.process_writable_stream(qconn, stream_id)?;
1269 }
1270
1271 // Also optimistically check for any ready streams
1272 while let Some(Some(ready)) = self.waiting_streams.next().now_or_never() {
1273 self.upstream_ready(qconn, ready)?;
1274 }
1275
1276 Ok(())
1277 }
1278
1279 /// Reports connection-level error metrics and forwards
1280 /// IOWorker errors to the associated [H3Controller].
1281 fn on_conn_close<M: Metrics>(
1282 &mut self, quiche_conn: &mut QuicheConnection, metrics: &M,
1283 work_loop_result: &QuicResult<()>,
1284 ) {
1285 let max_stream_seen = self.max_stream_seen;
1286 metrics
1287 .maximum_writable_streams()
1288 .observe(max_stream_seen as f64);
1289
1290 Self::record_quiche_error(quiche_conn, metrics);
1291
1292 let Err(work_loop_error) = work_loop_result else {
1293 return;
1294 };
1295
1296 let Some(h3_err) = work_loop_error.downcast_ref::<H3ConnectionError>()
1297 else {
1298 log::error!("Found non-H3ConnectionError"; "error" => %work_loop_error);
1299 return;
1300 };
1301
1302 if matches!(h3_err, H3ConnectionError::ControllerWentAway) {
1303 // Inform client that we won't (can't) respond anymore
1304 let _ = quiche_conn.close(true, WireErrorCode::NoError as u64, &[]);
1305 return;
1306 }
1307
1308 if let Some(ev) = H3Event::from_error(h3_err) {
1309 let _ = self.h3_event_sender.send(ev.into());
1310 #[expect(clippy::needless_return)]
1311 return; // avoid accidental fallthrough in the future
1312 }
1313 }
1314
1315 /// Wait for incoming data from the [H3Controller]. The next iteration of
1316 /// the I/O loop commences when one of the `select!`ed futures triggers.
1317 #[inline]
1318 async fn wait_for_data(
1319 &mut self, qconn: &mut QuicheConnection,
1320 ) -> QuicResult<()> {
1321 select! {
1322 biased;
1323 Some(ready) = self.waiting_streams.next() => self.upstream_ready(qconn, ready),
1324 Some(dgram) = self.dgram_recv.recv() => self.dgram_ready(qconn, dgram),
1325 Some(cmd) = self.cmd_recv.recv() => H::conn_command(self, qconn, cmd),
1326 r = self.hooks.wait_for_action(qconn), if H::has_wait_action(self) => r,
1327 }?;
1328
1329 // Make sure controller is not starved, but also not prioritized in the
1330 // biased select. So poll it last, however also perform a try_recv
1331 // each iteration.
1332 if let Ok(cmd) = self.cmd_recv.try_recv() {
1333 H::conn_command(self, qconn, cmd)?;
1334 }
1335
1336 Ok(())
1337 }
1338}
1339
1340impl<H: DriverHooks> Drop for H3Driver<H> {
1341 fn drop(&mut self) {
1342 for stream in self.stream_map.values() {
1343 stream
1344 .audit_stats
1345 .set_recvd_stream_fin(StreamClosureKind::Implicit);
1346 }
1347 }
1348}
1349
1350/// [`H3Command`]s are sent by the [H3Controller] to alter the [H3Driver]'s
1351/// state.
1352///
1353/// Both [ServerH3Driver] and [ClientH3Driver] may extend this enum with
1354/// endpoint-specific variants.
1355#[derive(Debug)]
1356pub enum H3Command {
1357 /// A connection-level command that executes directly on the
1358 /// [`quiche::Connection`].
1359 QuicCmd(QuicCommand),
1360 /// Send a GOAWAY frame to the peer to initiate a graceful connection
1361 /// shutdown.
1362 GoAway,
1363 /// Shuts down a stream in the specified direction(s) and removes it from
1364 /// local state.
1365 ///
1366 /// This removes the stream from local state and sends a `RESET_STREAM`
1367 /// frame (for write direction) and/or a `STOP_SENDING` frame (for read
1368 /// direction) to the peer. See [`quiche::Connection::stream_shutdown`]
1369 /// for details.
1370 ShutdownStream {
1371 stream_id: u64,
1372 shutdown: StreamShutdown,
1373 },
1374}
1375
1376/// Specifies which direction(s) of a stream to shut down.
1377///
1378/// Used with [`H3Controller::shutdown_stream`] and the internal
1379/// `shutdown_stream` function to control whether to send a `STOP_SENDING` frame
1380/// (read direction), and/or a `RESET_STREAM` frame (write direction)
1381///
1382/// Note: Despite its name, "shutdown" here refers to signaling the peer about
1383/// stream termination, not sending a FIN flag. `STOP_SENDING` asks the peer to
1384/// stop sending data, while `RESET_STREAM` abruptly terminates the write side.
1385#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1386pub enum StreamShutdown {
1387 /// Shut down only the read direction (sends `STOP_SENDING` frame with the
1388 /// given error code).
1389 Read { error_code: u64 },
1390 /// Shut down only the write direction (sends `RESET_STREAM` frame with the
1391 /// given error code).
1392 Write { error_code: u64 },
1393 /// Shut down both directions (sends both `STOP_SENDING` and `RESET_STREAM`
1394 /// frames).
1395 Both {
1396 read_error_code: u64,
1397 write_error_code: u64,
1398 },
1399}
1400
1401/// Sends [`H3Command`]s to an [H3Driver]. The sender is typed and internally
1402/// wraps instances of `T` in the appropriate `H3Command` variant.
1403pub struct RequestSender<C, T> {
1404 sender: UnboundedSender<C>,
1405 // Required to work around dangling type parameter
1406 _r: PhantomData<fn() -> T>,
1407}
1408
1409impl<C, T: Into<C>> RequestSender<C, T> {
1410 /// Send a request to the [H3Driver]. This can only fail if the driver is
1411 /// gone.
1412 #[inline(always)]
1413 pub fn send(&self, v: T) -> Result<(), mpsc::error::SendError<C>> {
1414 self.sender.send(v.into())
1415 }
1416}
1417
1418impl<C, T> Clone for RequestSender<C, T> {
1419 fn clone(&self) -> Self {
1420 Self {
1421 sender: self.sender.clone(),
1422 _r: Default::default(),
1423 }
1424 }
1425}
1426
1427/// Interface to communicate with a paired [H3Driver].
1428///
1429/// An [H3Controller] receives [`H3Event`]s from its driver, which must be
1430/// consumed by the application built on top of the driver to react to incoming
1431/// events. The controller also allows the application to send ad-hoc
1432/// [`H3Command`]s to the driver, which will be processed when the driver waits
1433/// for incoming data.
1434pub struct H3Controller<H: DriverHooks> {
1435 /// Sends [`H3Command`]s to the [H3Driver], like [`QuicCommand`]s or
1436 /// outbound HTTP requests.
1437 cmd_sender: UnboundedSender<H::Command>,
1438 /// Receives [`H3Event`]s from the [H3Driver]. Can be extracted and
1439 /// used independently of the [H3Controller].
1440 h3_event_recv: Option<UnboundedReceiver<H::Event>>,
1441}
1442
1443impl<H: DriverHooks> H3Controller<H> {
1444 /// Gets a mut reference to the [`H3Event`] receiver for the paired
1445 /// [H3Driver].
1446 pub fn event_receiver_mut(&mut self) -> &mut UnboundedReceiver<H::Event> {
1447 self.h3_event_recv
1448 .as_mut()
1449 .expect("No event receiver on H3Controller")
1450 }
1451
1452 /// Takes the [`H3Event`] receiver for the paired [H3Driver].
1453 pub fn take_event_receiver(&mut self) -> UnboundedReceiver<H::Event> {
1454 self.h3_event_recv
1455 .take()
1456 .expect("No event receiver on H3Controller")
1457 }
1458
1459 /// Creates a [`QuicCommand`] sender for the paired [H3Driver].
1460 pub fn cmd_sender(&self) -> RequestSender<H::Command, QuicCommand> {
1461 RequestSender {
1462 sender: self.cmd_sender.clone(),
1463 _r: Default::default(),
1464 }
1465 }
1466
1467 /// Sends a GOAWAY frame to initiate a graceful connection shutdown.
1468 pub fn send_goaway(&self) {
1469 let _ = self.cmd_sender.send(H3Command::GoAway.into());
1470 }
1471
1472 /// Creates an [`H3Command`] sender for the paired [H3Driver].
1473 pub fn h3_cmd_sender(&self) -> RequestSender<H::Command, H3Command> {
1474 RequestSender {
1475 sender: self.cmd_sender.clone(),
1476 _r: Default::default(),
1477 }
1478 }
1479
1480 /// Shuts down a stream in the specified direction(s) and removes it from
1481 /// local state.
1482 ///
1483 /// This removes the stream from local state and sends a `RESET_STREAM`
1484 /// frame (for write direction) and/or a `STOP_SENDING` frame (for read
1485 /// direction) to the peer, depending on the [`StreamShutdown`] variant.
1486 pub fn shutdown_stream(&self, stream_id: u64, shutdown: StreamShutdown) {
1487 let _ = self.cmd_sender.send(
1488 H3Command::ShutdownStream {
1489 stream_id,
1490 shutdown,
1491 }
1492 .into(),
1493 );
1494 }
1495}