tokio_quiche/http3/driver/mod.rs
// Copyright (C) 2025, Cloudflare, Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
//  * Redistributions of source code must retain the above copyright notice,
//    this list of conditions and the following disclaimer.
//
//  * Redistributions in binary form must reproduce the above copyright
//    notice, this list of conditions and the following disclaimer in the
//    documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

mod client;
/// Wrapper for running HTTP/3 connections.
pub mod connection;
mod datagram;
// `DriverHooks` must stay private to prevent users from creating their own
// H3Drivers.
mod hooks;
mod server;
mod streams;
#[cfg(test)]
pub mod test_utils;
#[cfg(test)]
mod tests;

use std::collections::BTreeMap;
use std::error::Error;
use std::fmt;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Instant;

use datagram_socket::StreamClosureKind;
use foundations::telemetry::log;
use futures::FutureExt;
use futures_util::stream::FuturesUnordered;
use quiche::h3;
use tokio::select;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::StreamExt;
use tokio_util::sync::PollSender;

use self::hooks::DriverHooks;
use self::hooks::InboundHeaders;
use self::streams::FlowCtx;
use self::streams::HaveUpstreamCapacity;
use self::streams::ReceivedDownstreamData;
use self::streams::StreamCtx;
use self::streams::StreamReady;
use self::streams::WaitForDownstreamData;
use self::streams::WaitForStream;
use self::streams::WaitForUpstreamCapacity;
use crate::buf_factory::BufFactory;
use crate::buf_factory::PooledBuf;
use crate::buf_factory::PooledDgram;
use crate::http3::settings::Http3Settings;
use crate::http3::H3AuditStats;
use crate::metrics::Metrics;
use crate::quic::HandshakeInfo;
use crate::quic::QuicCommand;
use crate::quic::QuicheConnection;
use crate::ApplicationOverQuic;
use crate::QuicResult;

pub use self::client::ClientEventStream;
pub use self::client::ClientH3Command;
pub use self::client::ClientH3Controller;
pub use self::client::ClientH3Driver;
pub use self::client::ClientH3Event;
pub use self::client::ClientRequestSender;
pub use self::client::NewClientRequest;
pub use self::server::RawPriorityValue;
pub use self::server::ServerEventStream;
pub use self::server::ServerH3Command;
pub use self::server::ServerH3Controller;
pub use self::server::ServerH3Driver;
pub use self::server::ServerH3Event;

// The default priority for HTTP/3 responses if the application didn't provide
// one.
const DEFAULT_PRIO: h3::Priority = h3::Priority::new(3, true);

// For a stream use a channel with 16 entries, which works out to 16 * 64KB =
// 1MB of max buffered data.
#[cfg(not(any(test, debug_assertions)))]
const STREAM_CAPACITY: usize = 16;
#[cfg(any(test, debug_assertions))]
const STREAM_CAPACITY: usize = 1; // Set to 1 to stress write_pending under test conditions

// For *all* flows use a shared channel with 2048 entries, which works out
// to 3MB of max buffered data at 1500 bytes per datagram.
const FLOW_CAPACITY: usize = 2048;

/// Used by a local task to send [`OutboundFrame`]s to a peer on the
/// stream or flow associated with this channel.
pub type OutboundFrameSender = PollSender<OutboundFrame>;

/// Used internally to receive [`OutboundFrame`]s which should be sent to a peer
/// on the stream or flow associated with this channel.
type OutboundFrameStream = mpsc::Receiver<OutboundFrame>;

/// Used internally to send [`InboundFrame`]s (data) from the peer to a local
/// task on the stream or flow associated with this channel.
type InboundFrameSender = PollSender<InboundFrame>;

/// Used by a local task to receive [`InboundFrame`]s (data) on the stream or
/// flow associated with this channel.
pub type InboundFrameStream = mpsc::Receiver<InboundFrame>;

/// The error type used internally in [H3Driver].
///
/// Note that [`ApplicationOverQuic`] errors are not exposed to users at this
/// time. The type is public to document the failure modes in [H3Driver].
#[derive(Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum H3ConnectionError {
    /// The controller task was shut down and is no longer listening.
    ControllerWentAway,
    /// Other error at the connection, but not stream level.
    H3(h3::Error),
    /// Received a GOAWAY frame from the peer.
    GoAway,
    /// Received data for a stream that was closed or never opened.
    NonexistentStream,
    /// The server's post-accept timeout was hit.
    /// The timeout can be configured in [`Http3Settings`].
    PostAcceptTimeout,
}

impl From<h3::Error> for H3ConnectionError {
    fn from(err: h3::Error) -> Self {
        H3ConnectionError::H3(err)
    }
}

impl From<quiche::Error> for H3ConnectionError {
    fn from(err: quiche::Error) -> Self {
        H3ConnectionError::H3(h3::Error::TransportError(err))
    }
}

impl Error for H3ConnectionError {}

impl fmt::Display for H3ConnectionError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let s: &dyn fmt::Display = match self {
            Self::ControllerWentAway => &"controller went away",
            Self::H3(e) => e,
            Self::GoAway => &"goaway",
            Self::NonexistentStream => &"nonexistent stream",
            Self::PostAcceptTimeout => &"post accept timeout hit",
        };

        write!(f, "H3ConnectionError: {s}")
    }
}

type H3ConnectionResult<T> = Result<T, H3ConnectionError>;

/// HTTP/3 headers that were received on a stream.
///
/// `recv` is used to read the message body, while `send` is used to transmit
/// data back to the peer.
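///
/// Below is a minimal sketch of draining the message body from `recv`
/// (error handling and the surrounding event loop are elided, and the
/// `tokio_quiche` paths are assumed re-exports):
///
/// ```ignore
/// use tokio_quiche::http3::driver::{InboundFrame, IncomingH3Headers};
///
/// async fn drain_body(mut headers: IncomingH3Headers) {
///     while let Some(frame) = headers.recv.recv().await {
///         match frame {
///             InboundFrame::Body(chunk, fin) => {
///                 println!("got {} body bytes (fin: {fin})", chunk.as_ref().len());
///                 if fin {
///                     break;
///                 }
///             },
///             InboundFrame::Datagram(_) => { /* only used for CONNECT-UDP flows */ },
///         }
///     }
/// }
/// ```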
pub struct IncomingH3Headers {
    /// Stream ID of the frame.
    pub stream_id: u64,
    /// The actual [`h3::Header`]s which were received.
    pub headers: Vec<h3::Header>,
    /// An [`OutboundFrameSender`] for streaming body data to the peer. For
    /// [ClientH3Driver], note that the request body can also be passed via a
    /// cloned sender obtained from [`NewClientRequest`].
    pub send: OutboundFrameSender,
    /// An [`InboundFrameStream`] of body data received from the peer.
    pub recv: InboundFrameStream,
    /// Whether there is a body associated with the incoming headers.
    pub read_fin: bool,
    /// Handle to the [`H3AuditStats`] for the message's stream.
    pub h3_audit_stats: Arc<H3AuditStats>,
}

impl fmt::Debug for IncomingH3Headers {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("IncomingH3Headers")
            .field("stream_id", &self.stream_id)
            .field("headers", &self.headers)
            .field("read_fin", &self.read_fin)
            .field("h3_audit_stats", &self.h3_audit_stats)
            .finish()
    }
}

/// [`H3Event`]s are produced by an [H3Driver] to describe HTTP/3 state updates.
///
/// Both [ServerH3Driver] and [ClientH3Driver] may extend this enum with
/// endpoint-specific variants. The events must be consumed by users of the
/// drivers, like a higher-level `Server` or `Client` controller.
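///
/// A minimal sketch of dispatching on the core variants (the loop that pulls
/// events from an [H3Controller] is elided, and the `tokio_quiche` path is an
/// assumed re-export):
///
/// ```ignore
/// use tokio_quiche::http3::driver::H3Event;
///
/// fn on_h3_event(event: H3Event) {
///     match event {
///         H3Event::IncomingHeaders(headers) => {
///             // Inspect `headers.headers` and stream the response body
///             // through `headers.send`.
///         },
///         H3Event::BodyBytesReceived { stream_id, num_bytes, fin } => {
///             println!("stream {stream_id}: {num_bytes} bytes (fin: {fin})");
///         },
///         H3Event::ResetStream { stream_id } => {
///             // Drop any state held for `stream_id`.
///         },
///         _ => { /* settings, new flows, shutdown, ... */ },
///     }
/// }
/// ```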
#[derive(Debug)]
pub enum H3Event {
    /// A SETTINGS frame was received.
    IncomingSettings {
        /// Raw HTTP/3 setting pairs, in the order received from the peer.
        settings: Vec<(u64, u64)>,
    },

    /// A HEADERS frame was received on the given stream. This is either a
    /// request or a response depending on the perspective of the [`H3Event`]
    /// receiver.
    IncomingHeaders(IncomingH3Headers),

    /// A DATAGRAM flow was created and associated with the given `flow_id`.
    /// This event is fired before a HEADERS event for CONNECT[-UDP] requests.
    NewFlow {
        /// Flow ID of the new flow.
        flow_id: u64,
        /// An [`OutboundFrameSender`] for transmitting datagrams to the peer.
        send: OutboundFrameSender,
        /// An [`InboundFrameStream`] for receiving datagrams from the peer.
        recv: InboundFrameStream,
    },
    /// A RST_STREAM frame was seen on the given `stream_id`. The user of the
    /// driver should clean up any state allocated for this stream.
    ResetStream { stream_id: u64 },
    /// The connection has irrecoverably errored and is shutting down.
    ConnectionError(h3::Error),
    /// The connection has been shut down, optionally due to an
    /// [`H3ConnectionError`].
    ConnectionShutdown(Option<H3ConnectionError>),
    /// Body data has been received over a stream.
    BodyBytesReceived {
        /// Stream ID of the body data.
        stream_id: u64,
        /// Number of bytes received.
        num_bytes: u64,
        /// Whether the stream is finished and won't yield any more data.
        fin: bool,
    },
    /// The stream has been closed. This is used to signal stream closures that
    /// don't result from RST_STREAM frames, unlike the
    /// [`H3Event::ResetStream`] variant.
    StreamClosed { stream_id: u64 },
}

impl H3Event {
    /// Generates an event from an applicable [`H3ConnectionError`].
    fn from_error(err: &H3ConnectionError) -> Option<Self> {
        Some(match err {
            H3ConnectionError::H3(e) => Self::ConnectionError(*e),
            H3ConnectionError::PostAcceptTimeout => Self::ConnectionShutdown(
                Some(H3ConnectionError::PostAcceptTimeout),
            ),
            _ => return None,
        })
    }
}

/// An [`OutboundFrame`] is a data frame that should be sent from a local task
/// to a peer over a [`quiche::h3::Connection`].
///
/// This is used, for example, to send response body data to a peer, or proxied
/// UDP datagrams.
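///
/// A minimal sketch of streaming a final body chunk through an
/// [`OutboundFrameSender`] (assuming the sender was taken from
/// [`IncomingH3Headers`] or a [`NewClientRequest`]; `SinkExt` applies because
/// the sender is a `PollSender`):
///
/// ```ignore
/// use futures::SinkExt;
/// use tokio_quiche::buf_factory::PooledBuf;
/// use tokio_quiche::http3::driver::{OutboundFrame, OutboundFrameSender};
///
/// async fn send_last_chunk(mut send: OutboundFrameSender, chunk: PooledBuf) {
///     // `true` marks this chunk as the final body frame (FIN).
///     if send.send(OutboundFrame::body(chunk, true)).await.is_err() {
///         // The driver end of the channel is gone; the stream is dead.
///     }
/// }
/// ```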
#[derive(Debug)]
pub enum OutboundFrame {
    /// Response headers to be sent to the peer, with optional priority.
    Headers(Vec<h3::Header>, Option<quiche::h3::Priority>),
    /// Response body/CONNECT downstream data plus FIN flag.
    #[cfg(feature = "zero-copy")]
    Body(crate::buf_factory::QuicheBuf, bool),
    /// Response body/CONNECT downstream data plus FIN flag.
    #[cfg(not(feature = "zero-copy"))]
    Body(PooledBuf, bool),
    /// CONNECT-UDP (DATAGRAM) downstream data plus flow ID.
    Datagram(PooledDgram, u64),
    /// An error encountered when serving the request. Stream should be closed.
    PeerStreamError,
    /// DATAGRAM flow explicitly closed.
    FlowShutdown { flow_id: u64, stream_id: u64 },
}

impl OutboundFrame {
    /// Creates a body frame with the provided buffer.
    pub fn body(body: PooledBuf, fin: bool) -> Self {
        #[cfg(feature = "zero-copy")]
        let body = crate::buf_factory::QuicheBuf::new(body);

        OutboundFrame::Body(body, fin)
    }
}

/// An [`InboundFrame`] is a data frame that was received from the peer over a
/// [`quiche::h3::Connection`]. This is used by peers to send body or datagrams
/// to the local task.
#[derive(Debug)]
pub enum InboundFrame {
    /// Request body/CONNECT upstream data plus FIN flag.
    Body(PooledBuf, bool),
    /// CONNECT-UDP (DATAGRAM) upstream data.
    Datagram(PooledDgram),
}

/// A ready-made [`ApplicationOverQuic`] which can handle HTTP/3 and MASQUE.
/// Depending on the `DriverHooks` in use, it powers either a client or a
/// server.
///
/// Use the [ClientH3Driver] and [ServerH3Driver] aliases to access the
/// respective driver types. The driver is passed into an I/O loop and
/// communicates with the driver's user (e.g., an HTTP client or a server) via
/// its associated [H3Controller]. The controller allows the application to both
/// listen for [`H3Event`]s of note and send [`H3Command`]s into the I/O loop.
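///
/// A minimal sketch of wiring up a server-side driver (QUIC connection setup
/// and the [`Http3Settings`] value are elided; the `tokio_quiche` paths are
/// assumed re-exports):
///
/// ```ignore
/// use tokio_quiche::http3::driver::ServerH3Driver;
/// use tokio_quiche::http3::settings::Http3Settings;
///
/// fn build_driver(settings: Http3Settings) {
///     let (h3_driver, mut h3_controller) = ServerH3Driver::new(settings);
///     // Hand `h3_driver` to `InitialQuicConnection::start` and poll
///     // `h3_controller.event_receiver_mut()` for `ServerH3Event`s.
/// }
/// ```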
pub struct H3Driver<H: DriverHooks> {
    /// Configuration used to initialize `conn`. Created from [`Http3Settings`]
    /// in the constructor.
    h3_config: h3::Config,
    /// The underlying HTTP/3 connection. Initialized in
    /// `ApplicationOverQuic::on_conn_established`.
    conn: Option<h3::Connection>,
    /// State required by the client/server hooks.
    hooks: H,
    /// Sends [`H3Event`]s to the [H3Controller] paired with this driver.
    h3_event_sender: mpsc::UnboundedSender<H::Event>,
    /// Receives [`H3Command`]s from the [H3Controller] paired with this driver.
    cmd_recv: mpsc::UnboundedReceiver<H::Command>,

    /// A map of stream IDs to their [StreamCtx]. This is mainly used to
    /// retrieve the internal Tokio channels associated with the stream.
    stream_map: BTreeMap<u64, StreamCtx>,
    /// A map of flow IDs to their [FlowCtx]. This is mainly used to retrieve
    /// the internal Tokio channels associated with the flow.
    flow_map: BTreeMap<u64, FlowCtx>,
    /// Set of [`WaitForStream`] futures. A stream is added to this set if
    /// we need to send to it and its channel is at capacity, or if we need
    /// data from its channel and the channel is empty.
    waiting_streams: FuturesUnordered<WaitForStream>,

    /// Receives [`OutboundFrame`]s from all datagram flows on the connection.
    dgram_recv: OutboundFrameStream,
    /// Keeps the datagram channel open such that datagram flows can be created.
    dgram_send: OutboundFrameSender,

    /// The buffer used to interact with the underlying IoWorker.
    pooled_buf: PooledBuf,
    /// The maximum HTTP/3 stream ID seen on this connection.
    max_stream_seen: u64,

    /// Tracks whether we have forwarded the HTTP/3 SETTINGS frame
    /// to the [H3Controller] once.
    settings_received_and_forwarded: bool,
}

impl<H: DriverHooks> H3Driver<H> {
    /// Builds a new [H3Driver] and an associated [H3Controller].
    ///
    /// The driver should then be passed to
    /// [`InitialQuicConnection`](crate::InitialQuicConnection)'s `start`
    /// method.
    pub fn new(http3_settings: Http3Settings) -> (Self, H3Controller<H>) {
        let (dgram_send, dgram_recv) = mpsc::channel(FLOW_CAPACITY);
        let (cmd_sender, cmd_recv) = mpsc::unbounded_channel();
        let (h3_event_sender, h3_event_recv) = mpsc::unbounded_channel();

        (
            H3Driver {
                h3_config: (&http3_settings).into(),
                conn: None,
                hooks: H::new(&http3_settings),
                h3_event_sender,
                cmd_recv,

                stream_map: BTreeMap::new(),
                flow_map: BTreeMap::new(),

                dgram_recv,
                dgram_send: PollSender::new(dgram_send),
                pooled_buf: BufFactory::get_max_buf(),
                max_stream_seen: 0,

                waiting_streams: FuturesUnordered::new(),

                settings_received_and_forwarded: false,
            },
            H3Controller {
                cmd_sender,
                h3_event_recv: Some(h3_event_recv),
            },
        )
    }

    /// Retrieve the [FlowCtx] associated with the given `flow_id`. If no
    /// context is found, a new one will be created.
    fn get_or_insert_flow(
        &mut self, flow_id: u64,
    ) -> H3ConnectionResult<&mut FlowCtx> {
        use std::collections::btree_map::Entry;
        Ok(match self.flow_map.entry(flow_id) {
            Entry::Vacant(e) => {
                // This is a datagram for a new flow we haven't seen before
                let (flow, recv) = FlowCtx::new(FLOW_CAPACITY);
                let flow_req = H3Event::NewFlow {
                    flow_id,
                    recv,
                    send: self.dgram_send.clone(),
                };
                self.h3_event_sender
                    .send(flow_req.into())
                    .map_err(|_| H3ConnectionError::ControllerWentAway)?;
                e.insert(flow)
            },
            Entry::Occupied(e) => e.into_mut(),
        })
    }

    /// Adds a [StreamCtx] to the stream map with the given `stream_id`.
    fn insert_stream(&mut self, stream_id: u64, ctx: StreamCtx) {
        self.stream_map.insert(stream_id, ctx);
        self.max_stream_seen = self.max_stream_seen.max(stream_id);
    }

    /// Fetches body chunks from the [`quiche::h3::Connection`] and forwards
    /// them to the stream's associated [`InboundFrameStream`].
    fn process_h3_data(
        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
    ) -> H3ConnectionResult<()> {
        // Split self borrow between conn and stream_map
        let conn = self.conn.as_mut().ok_or(Self::connection_not_present())?;
        let ctx = self
            .stream_map
            .get_mut(&stream_id)
            .ok_or(H3ConnectionError::NonexistentStream)?;

        enum StreamStatus {
            Done { close: bool },
            Reset { wire_err_code: u64 },
            Blocked,
        }

        let status = loop {
            let Some(sender) = ctx.send.as_ref().and_then(PollSender::get_ref)
            else {
                // already waiting for capacity
                break StreamStatus::Done { close: false };
            };

            let try_reserve_result = sender.try_reserve();
            let permit = match try_reserve_result {
                Ok(permit) => permit,
                Err(TrySendError::Closed(())) => {
                    // The channel has closed before we delivered a fin or
                    // reset to the application.
                    if !ctx.fin_or_reset_recv &&
                        ctx.associated_dgram_flow_id.is_none()
                    // The channel might be closed if the stream was used to
                    // initiate a datagram exchange.
                    // TODO: ideally, the application would still shut down
                    // the stream properly. Once application code is fixed,
                    // we can remove this check.
                    {
                        let err = h3::WireErrorCode::RequestCancelled as u64;
                        let _ = qconn.stream_shutdown(
                            stream_id,
                            quiche::Shutdown::Read,
                            err,
                        );
                        drop(try_reserve_result); // needed to drop the borrow on ctx.
                        ctx.handle_sent_stop_sending(err);
                        // TODO: should we send an H3Event to h3_event_sender?
                        // We can only get here if the app actively closed or
                        // dropped the channel, so any event we send would be
                        // more for logging or auditing purposes.
                    }
                    break StreamStatus::Done {
                        close: ctx.both_directions_done(),
                    };
                },
                Err(TrySendError::Full(())) => {
                    if ctx.fin_or_reset_recv || qconn.stream_readable(stream_id) {
                        break StreamStatus::Blocked;
                    }
                    break StreamStatus::Done { close: false };
                },
            };

            if ctx.fin_or_reset_recv {
                // Signal end-of-body to upstream
                permit
                    .send(InboundFrame::Body(BufFactory::get_empty_buf(), true));
                break StreamStatus::Done {
                    close: ctx.fin_or_reset_sent,
                };
            }

            match conn.recv_body(qconn, stream_id, &mut self.pooled_buf) {
                Ok(n) => {
                    let mut body = std::mem::replace(
                        &mut self.pooled_buf,
                        BufFactory::get_max_buf(),
                    );
                    body.truncate(n);

                    ctx.audit_stats.add_downstream_bytes_recvd(n as u64);
                    let event = H3Event::BodyBytesReceived {
                        stream_id,
                        num_bytes: n as u64,
                        fin: false,
                    };
                    let _ = self.h3_event_sender.send(event.into());

                    permit.send(InboundFrame::Body(body, false));
                },
                Err(h3::Error::Done) =>
                    break StreamStatus::Done { close: false },
                Err(h3::Error::TransportError(quiche::Error::StreamReset(
                    code,
                ))) => {
                    break StreamStatus::Reset {
                        wire_err_code: code,
                    };
                },
                Err(_) => break StreamStatus::Done { close: true },
            }
        };

        match status {
            StreamStatus::Done { close } => {
                if close {
                    return self.finish_stream(qconn, stream_id, None, None);
                }

                // The QUIC stream is finished, manually invoke `process_h3_fin`
                // in case `h3::poll()` is never called again.
                //
                // Note that this case will not conflict with StreamStatus::Done
                // being returned due to the body channel being blocked.
                // qconn.stream_finished() will guarantee that we've fully
                // parsed the body, as it only returns true if we've seen a Fin
                // for the read half of the stream.
                if !ctx.fin_or_reset_recv && qconn.stream_finished(stream_id) {
                    return self.process_h3_fin(qconn, stream_id);
                }
            },
            StreamStatus::Reset { wire_err_code } => {
                debug_assert!(ctx.send.is_some());
                ctx.handle_recvd_reset(wire_err_code);
                self.h3_event_sender
                    .send(H3Event::ResetStream { stream_id }.into())
                    .map_err(|_| H3ConnectionError::ControllerWentAway)?;
                if ctx.both_directions_done() {
                    return self.finish_stream(qconn, stream_id, None, None);
                }
            },
            StreamStatus::Blocked => {
                self.waiting_streams.push(ctx.wait_for_send(stream_id));
            },
        }

        Ok(())
    }

    /// Processes an end-of-stream event from the [`quiche::h3::Connection`].
    fn process_h3_fin(
        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
    ) -> H3ConnectionResult<()> {
        let ctx = self
            .stream_map
            .get_mut(&stream_id)
            .filter(|c| !c.fin_or_reset_recv);
        let Some(ctx) = ctx else {
            // Stream is already finished, nothing to do
            return Ok(());
        };

        ctx.fin_or_reset_recv = true;
        ctx.audit_stats
            .set_recvd_stream_fin(StreamClosureKind::Explicit);

        // It's important to send this H3Event before process_h3_data so that
        // a server can (potentially) generate the control response before the
        // corresponding receiver drops.
        let event = H3Event::BodyBytesReceived {
            stream_id,
            num_bytes: 0,
            fin: true,
        };
        let _ = self.h3_event_sender.send(event.into());

        // Communicate fin to upstream. Since `ctx.fin_or_reset_recv` is true
        // now, there can't be a recursive loop.
        self.process_h3_data(qconn, stream_id)
    }

    /// Processes a single [`quiche::h3::Event`] received from the underlying
    /// [`quiche::h3::Connection`]. Some events are dispatched to helper
    /// methods.
    fn process_read_event(
        &mut self, qconn: &mut QuicheConnection, stream_id: u64, event: h3::Event,
    ) -> H3ConnectionResult<()> {
        self.forward_settings()?;

        match event {
            // Requests/responses are exclusively handled by hooks.
            h3::Event::Headers { list, more_frames } =>
                H::headers_received(self, qconn, InboundHeaders {
                    stream_id,
                    headers: list,
                    has_body: more_frames,
                }),

            h3::Event::Data => self.process_h3_data(qconn, stream_id),
            h3::Event::Finished => self.process_h3_fin(qconn, stream_id),

            h3::Event::Reset(code) => {
                if let Some(ctx) = self.stream_map.get_mut(&stream_id) {
                    ctx.handle_recvd_reset(code);
                    // See if we are waiting on this stream and close the channel
                    // if we are. If we are not waiting, `handle_recvd_reset()`
                    // will have taken care of closing.
                    for pending in self.waiting_streams.iter_mut() {
                        match pending {
                            WaitForStream::Upstream(
                                WaitForUpstreamCapacity {
                                    stream_id: id,
                                    chan: Some(chan),
                                },
                            ) if stream_id == *id => {
                                chan.close();
                            },
                            _ => {},
                        }
                    }

                    self.h3_event_sender
                        .send(H3Event::ResetStream { stream_id }.into())
                        .map_err(|_| H3ConnectionError::ControllerWentAway)?;
                    if ctx.both_directions_done() {
                        return self.finish_stream(qconn, stream_id, None, None);
                    }
                }

                // TODO: if we don't have the stream in our map: should we
                // send the H3Event::ResetStream?
                Ok(())
            },

            h3::Event::PriorityUpdate => Ok(()),
            h3::Event::GoAway => Err(H3ConnectionError::GoAway),
        }
    }

    /// The SETTINGS frame can be received at any point, so we
    /// need to check `peer_settings_raw` to decide if we've received it.
    ///
    /// Settings should only be sent once, so we generate a single event
    /// when `peer_settings_raw` transitions from None to Some.
    fn forward_settings(&mut self) -> H3ConnectionResult<()> {
        if self.settings_received_and_forwarded {
            return Ok(());
        }

        // capture the peer settings and forward it
        if let Some(settings) = self.conn_mut()?.peer_settings_raw() {
            let incoming_settings = H3Event::IncomingSettings {
                settings: settings.to_vec(),
            };

            self.h3_event_sender
                .send(incoming_settings.into())
                .map_err(|_| H3ConnectionError::ControllerWentAway)?;

            self.settings_received_and_forwarded = true;
        }
        Ok(())
    }

    /// Send an individual frame to the underlying [`quiche::h3::Connection`] to
    /// be flushed at a later time.
    ///
    /// `Self::process_writes` will iterate over all writable streams and call
    /// this method in a loop for each stream to send all writable packets.
    fn process_write_frame(
        conn: &mut h3::Connection, qconn: &mut QuicheConnection,
        ctx: &mut StreamCtx,
    ) -> h3::Result<()> {
        let Some(frame) = &mut ctx.queued_frame else {
            return Ok(());
        };
        debug_assert!(
            ctx.recv.is_some(),
            "If we have a queued frame in the context, we MUST NOT be waiting
            on data from the channel"
        );

        let audit_stats = &ctx.audit_stats;
        let stream_id = audit_stats.stream_id();

        match frame {
            OutboundFrame::Headers(headers, priority) => {
                let prio = priority.as_ref().unwrap_or(&DEFAULT_PRIO);

                let res = if ctx.initial_headers_sent {
                    // Initial headers were already sent, send additional
                    // headers now.
                    conn.send_additional_headers_with_priority(
                        qconn, stream_id, headers, prio, false, false,
                    )
                } else {
                    // Send initial headers.
                    conn.send_response_with_priority(
                        qconn, stream_id, headers, prio, false,
                    )
                    .inspect(|_| ctx.initial_headers_sent = true)
                };

                if let Err(h3::Error::StreamBlocked) = res {
                    ctx.first_full_headers_flush_fail_time
                        .get_or_insert(Instant::now());
                }

                if res.is_ok() {
                    if let Some(first) =
                        ctx.first_full_headers_flush_fail_time.take()
                    {
                        ctx.audit_stats.add_header_flush_duration(
                            Instant::now().duration_since(first),
                        );
                    }
                }

                res
            },

            OutboundFrame::Body(body, fin) => {
                let len = body.as_ref().len();
                if len == 0 && !*fin {
                    // quiche doesn't allow sending an empty body when the fin
                    // flag is not set
                    return Ok(());
                }
                if *fin {
                    // If this is the last body frame, drop the receiver in the
                    // stream map to signal that we shouldn't receive any more
                    // frames. NOTE: we can't use `mpsc::Receiver::close()`
                    // due to an inconsistency in how tokio handles reading
                    // from a closed mpsc channel https://github.com/tokio-rs/tokio/issues/7631
                    ctx.recv = None;
                }
                #[cfg(feature = "zero-copy")]
                let n = conn.send_body_zc(qconn, stream_id, body, *fin)?;

                #[cfg(not(feature = "zero-copy"))]
                let n = conn.send_body(qconn, stream_id, body, *fin)?;

                audit_stats.add_downstream_bytes_sent(n as _);
                if n != len {
                    // Couldn't write the entire body, keep what remains for
                    // future retry.
                    #[cfg(not(feature = "zero-copy"))]
                    body.pop_front(n);

                    Err(h3::Error::StreamBlocked)
                } else {
                    if *fin {
                        ctx.fin_or_reset_sent = true;
                        audit_stats
                            .set_sent_stream_fin(StreamClosureKind::Explicit);
                        if ctx.fin_or_reset_recv {
                            // Return a TransportError to trigger stream cleanup
                            // instead of h3::Error::Done
                            return Err(h3::Error::TransportError(
                                quiche::Error::Done,
                            ));
                        }
                    }
                    Ok(())
                }
            },

            OutboundFrame::PeerStreamError => Err(h3::Error::MessageError),

            OutboundFrame::FlowShutdown { .. } => {
                unreachable!("Only flows send shutdowns")
            },

            OutboundFrame::Datagram(..) => {
                unreachable!("Only flows send datagrams")
            },
        }
    }

    /// Resumes reads or writes to the connection when a stream channel becomes
    /// unblocked.
    ///
    /// If we were waiting for more data from a channel, we resume writing to
    /// the connection. Otherwise, we were blocked on channel capacity and
    /// continue reading from the connection. `Upstream` in this context is
    /// the consumer of the stream.
    fn upstream_ready(
        &mut self, qconn: &mut QuicheConnection, ready: StreamReady,
    ) -> H3ConnectionResult<()> {
        match ready {
            StreamReady::Downstream(r) => self.upstream_read_ready(qconn, r),
            StreamReady::Upstream(r) => self.upstream_write_ready(qconn, r),
        }
    }

    fn upstream_read_ready(
        &mut self, qconn: &mut QuicheConnection,
        read_ready: ReceivedDownstreamData,
    ) -> H3ConnectionResult<()> {
        let ReceivedDownstreamData {
            stream_id,
            chan,
            data,
        } = read_ready;

        match self.stream_map.get_mut(&stream_id) {
            None => Ok(()),
            Some(stream) => {
                stream.recv = Some(chan);
                stream.queued_frame = data;
                self.process_writable_stream(qconn, stream_id)
            },
        }
    }

    fn upstream_write_ready(
        &mut self, qconn: &mut QuicheConnection,
        write_ready: HaveUpstreamCapacity,
    ) -> H3ConnectionResult<()> {
        let HaveUpstreamCapacity {
            stream_id,
            mut chan,
        } = write_ready;

        match self.stream_map.get_mut(&stream_id) {
            None => Ok(()),
            Some(stream) => {
                chan.abort_send(); // Have to do it to release the associated permit
                stream.send = Some(chan);
                self.process_h3_data(qconn, stream_id)
            },
        }
    }

    /// Processes all queued outbound datagrams from the `dgram_recv` channel.
    fn dgram_ready(
        &mut self, qconn: &mut QuicheConnection, frame: OutboundFrame,
    ) -> H3ConnectionResult<()> {
        let mut frame = Ok(frame);

        loop {
            match frame {
                Ok(OutboundFrame::Datagram(dgram, flow_id)) => {
                    // Drop datagrams if there is no capacity
                    let _ = datagram::send_h3_dgram(qconn, flow_id, dgram);
                },
                Ok(OutboundFrame::FlowShutdown { flow_id, stream_id }) => {
                    self.finish_stream(
                        qconn,
                        stream_id,
                        Some(quiche::h3::WireErrorCode::NoError as u64),
                        Some(quiche::h3::WireErrorCode::NoError as u64),
                    )?;
                    self.flow_map.remove(&flow_id);
                    break;
                },
                Ok(_) => unreachable!("Flows can't send frame of other types"),
                Err(TryRecvError::Empty) => break,
                Err(TryRecvError::Disconnected) =>
                    return Err(H3ConnectionError::ControllerWentAway),
            }

            frame = self.dgram_recv.try_recv();
        }

        Ok(())
    }

    /// Return a mutable reference to the driver's HTTP/3 connection.
    ///
    /// If the connection doesn't exist yet, this function returns
    /// a `Self::connection_not_present()` error.
    fn conn_mut(&mut self) -> H3ConnectionResult<&mut h3::Connection> {
        self.conn.as_mut().ok_or(Self::connection_not_present())
    }

    /// Alias for [`quiche::Error::TlsFail`], which is used in the case where
    /// this driver doesn't have an established HTTP/3 connection attached
    /// to it yet.
    const fn connection_not_present() -> H3ConnectionError {
        H3ConnectionError::H3(h3::Error::TransportError(quiche::Error::TlsFail))
    }

    /// Removes a stream from the stream map if it exists. Also optionally sends
    /// `RESET` or `STOP_SENDING` frames if `write` or `read` is set to an
    /// error code, respectively.
    fn finish_stream(
        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
        read: Option<u64>, write: Option<u64>,
    ) -> H3ConnectionResult<()> {
        let Some(stream_ctx) = self.stream_map.remove(&stream_id) else {
            return Ok(());
        };

        let audit_stats = &stream_ctx.audit_stats;

        if let Some(err) = read {
            audit_stats.set_sent_stop_sending_error_code(err as _);
            let _ = qconn.stream_shutdown(stream_id, quiche::Shutdown::Read, err);
        }

        if let Some(err) = write {
            audit_stats.set_sent_reset_stream_error_code(err as _);
            let _ =
                qconn.stream_shutdown(stream_id, quiche::Shutdown::Write, err);
        }

        // Find if the stream also has any pending futures associated with it
        for pending in self.waiting_streams.iter_mut() {
            match pending {
                WaitForStream::Downstream(WaitForDownstreamData {
                    stream_id: id,
                    chan: Some(chan),
                }) if stream_id == *id => {
                    chan.close();
                },
                WaitForStream::Upstream(WaitForUpstreamCapacity {
                    stream_id: id,
                    chan: Some(chan),
                }) if stream_id == *id => {
                    chan.close();
                },
                _ => {},
            }
        }

        // Close any DATAGRAM-proxying channels when we close the stream, if they
        // exist
        if let Some(mapped_flow_id) = stream_ctx.associated_dgram_flow_id {
            self.flow_map.remove(&mapped_flow_id);
        }

        if qconn.is_server() {
            // Signal the server to remove the stream from its map
            let _ = self
                .h3_event_sender
                .send(H3Event::StreamClosed { stream_id }.into());
        }

        Ok(())
    }

    /// Handles a regular [`H3Command`]. May be called internally by
    /// [DriverHooks] for non-endpoint-specific [`H3Command`]s.
    fn handle_core_command(
        &mut self, qconn: &mut QuicheConnection, cmd: H3Command,
    ) -> H3ConnectionResult<()> {
        match cmd {
            H3Command::QuicCmd(cmd) => cmd.execute(qconn),
            H3Command::GoAway => {
                let max_id = self.max_stream_seen;
                self.conn_mut()
                    .expect("connection should be established")
                    .send_goaway(qconn, max_id)?;
            },
        }
        Ok(())
    }
}

impl<H: DriverHooks> H3Driver<H> {
    /// Reads all buffered datagrams out of `qconn` and distributes them to
    /// their flow channels.
    fn process_available_dgrams(
        &mut self, qconn: &mut QuicheConnection,
    ) -> H3ConnectionResult<()> {
        loop {
            match datagram::receive_h3_dgram(qconn) {
                Ok((flow_id, dgram)) => {
                    self.get_or_insert_flow(flow_id)?.send_best_effort(dgram);
                },
                Err(quiche::Error::Done) => return Ok(()),
                Err(err) => return Err(H3ConnectionError::from(err)),
            }
        }
    }

    /// Flushes any queued-up frames for `stream_id` into `qconn` until either
    /// there is no more capacity in `qconn` or no more frames to send.
    fn process_writable_stream(
        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
    ) -> H3ConnectionResult<()> {
        // Split self borrow between conn and stream_map
        let conn = self.conn.as_mut().ok_or(Self::connection_not_present())?;
        let Some(ctx) = self.stream_map.get_mut(&stream_id) else {
            return Ok(()); // Unknown stream_id
        };

        loop {
            // Process each writable frame, queue the next frame for processing
            // and shut down any errored streams.
            match Self::process_write_frame(conn, qconn, ctx) {
                Ok(()) => ctx.queued_frame = None,
                Err(h3::Error::StreamBlocked | h3::Error::Done) => break,
                Err(h3::Error::MessageError) => {
                    return self.finish_stream(
                        qconn,
                        stream_id,
                        Some(quiche::h3::WireErrorCode::MessageError as u64),
                        Some(quiche::h3::WireErrorCode::MessageError as u64),
                    );
                },
                Err(h3::Error::TransportError(quiche::Error::StreamStopped(
                    e,
                ))) => {
                    ctx.handle_recvd_stop_sending(e);
                    if ctx.both_directions_done() {
                        return self.finish_stream(qconn, stream_id, None, None);
                    } else {
                        return Ok(());
                    }
                },
                Err(h3::Error::TransportError(
                    quiche::Error::InvalidStreamState(stream),
                )) => {
                    return self.finish_stream(qconn, stream, None, None);
                },
                Err(_) => {
                    return self.finish_stream(qconn, stream_id, None, None);
                },
            }

            let Some(recv) = ctx.recv.as_mut() else {
                // This stream is already waiting for data or we wrote a fin and
                // closed the channel.
                debug_assert!(
                    ctx.queued_frame.is_none(),
                    "We MUST NOT have a queued frame if we are already waiting on
                    more data from the channel"
                );
                return Ok(());
            };

            // Attempt to queue the next frame for processing. The
            // corresponding sender is created at the same time as the
            // `StreamCtx` and ultimately ends up in an `H3Body`. The body then
            // determines which frames to send to the peer via this processing
            // loop.
            match recv.try_recv() {
                Ok(frame) => ctx.queued_frame = Some(frame),
                Err(TryRecvError::Disconnected) => {
                    if !ctx.fin_or_reset_sent &&
                        ctx.associated_dgram_flow_id.is_none()
                    // The channel might be closed if the stream was used to
                    // initiate a datagram exchange.
                    // TODO: ideally, the application would still shut down
                    // the stream properly. Once application code is fixed,
                    // we can remove this check.
                    {
                        // The channel closed without having written a fin.
                        // Send a RESET_STREAM to indicate we won't be writing
                        // anything else.
                        let err = h3::WireErrorCode::RequestCancelled as u64;
                        let _ = qconn.stream_shutdown(
                            stream_id,
                            quiche::Shutdown::Write,
                            err,
                        );
                        ctx.handle_sent_reset(err);
                        if ctx.both_directions_done() {
                            return self
                                .finish_stream(qconn, stream_id, None, None);
                        }
                    }
                    break;
                },
                Err(TryRecvError::Empty) => {
                    self.waiting_streams.push(ctx.wait_for_recv(stream_id));
                    break;
                },
            }
        }

        Ok(())
    }

    /// Tests `qconn` for either a local or peer error and increments
    /// the associated HTTP/3 or QUIC error counter.
    fn record_quiche_error(qconn: &mut QuicheConnection, metrics: &impl Metrics) {
        // split metrics between local/peer and QUIC/HTTP/3 level errors
        if let Some(err) = qconn.local_error() {
            if err.is_app {
                metrics.local_h3_conn_close_error_count(err.error_code.into())
            } else {
                metrics.local_quic_conn_close_error_count(err.error_code.into())
            }
            .inc();
        } else if let Some(err) = qconn.peer_error() {
            if err.is_app {
                metrics.peer_h3_conn_close_error_count(err.error_code.into())
            } else {
                metrics.peer_quic_conn_close_error_count(err.error_code.into())
            }
            .inc();
        }
    }
}

impl<H: DriverHooks> ApplicationOverQuic for H3Driver<H> {
    fn on_conn_established(
        &mut self, quiche_conn: &mut QuicheConnection,
        handshake_info: &HandshakeInfo,
    ) -> QuicResult<()> {
        let conn = h3::Connection::with_transport(quiche_conn, &self.h3_config)?;
        self.conn = Some(conn);

        H::conn_established(self, quiche_conn, handshake_info)?;
        Ok(())
    }

    #[inline]
    fn should_act(&self) -> bool {
        self.conn.is_some()
    }

    #[inline]
    fn buffer(&mut self) -> &mut [u8] {
        &mut self.pooled_buf
    }

    /// Poll the underlying [`quiche::h3::Connection`] for
    /// [`quiche::h3::Event`]s and DATAGRAMs, delegating processing to
    /// `Self::process_read_event`.
    ///
    /// If a DATAGRAM is found, it is sent to the receiver on its channel.
    fn process_reads(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
        loop {
            match self.conn_mut()?.poll(qconn) {
                Ok((stream_id, event)) =>
                    self.process_read_event(qconn, stream_id, event)?,
                Err(h3::Error::Done) => break,
                Err(err) => {
                    // Don't bubble the error up, instead keep the worker loop
                    // going until quiche reports the connection is closed.
                    log::debug!("connection closed due to h3 protocol error"; "error"=>?err);
                    return Ok(());
                },
            };
        }

        self.process_available_dgrams(qconn)?;
        Ok(())
    }

    /// Write as much data as possible into the [`quiche::h3::Connection`] from
    /// all sources. This will attempt to write any queued frames into their
    /// respective streams, if writable.
    fn process_writes(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
        while let Some(stream_id) = qconn.stream_writable_next() {
            self.process_writable_stream(qconn, stream_id)?;
        }

        // Also optimistically check for any ready streams
        while let Some(Some(ready)) = self.waiting_streams.next().now_or_never() {
            self.upstream_ready(qconn, ready)?;
        }

        Ok(())
    }

    /// Reports connection-level error metrics and forwards
    /// IOWorker errors to the associated [H3Controller].
    fn on_conn_close<M: Metrics>(
        &mut self, quiche_conn: &mut QuicheConnection, metrics: &M,
        work_loop_result: &QuicResult<()>,
    ) {
        let max_stream_seen = self.max_stream_seen;
        metrics
            .maximum_writable_streams()
            .observe(max_stream_seen as f64);

        let Err(work_loop_error) = work_loop_result else {
            return;
        };

        Self::record_quiche_error(quiche_conn, metrics);

        let Some(h3_err) = work_loop_error.downcast_ref::<H3ConnectionError>()
        else {
            log::error!("Found non-H3ConnectionError"; "error" => %work_loop_error);
            return;
        };

        if matches!(h3_err, H3ConnectionError::ControllerWentAway) {
            // Inform client that we won't (can't) respond anymore
            let _ =
                quiche_conn.close(true, h3::WireErrorCode::NoError as u64, &[]);
            return;
        }

        if let Some(ev) = H3Event::from_error(h3_err) {
            let _ = self.h3_event_sender.send(ev.into());
            #[expect(clippy::needless_return)]
            return; // avoid accidental fallthrough in the future
        }
    }

    /// Wait for incoming data from the [H3Controller]. The next iteration of
    /// the I/O loop commences when one of the `select!`ed futures triggers.
    #[inline]
    async fn wait_for_data(
        &mut self, qconn: &mut QuicheConnection,
    ) -> QuicResult<()> {
        select! {
            biased;
            Some(ready) = self.waiting_streams.next() => self.upstream_ready(qconn, ready),
            Some(dgram) = self.dgram_recv.recv() => self.dgram_ready(qconn, dgram),
            Some(cmd) = self.cmd_recv.recv() => H::conn_command(self, qconn, cmd),
            r = self.hooks.wait_for_action(qconn), if H::has_wait_action(self) => r,
        }?;

        // Make sure the controller is not starved, but also not prioritized in
        // the biased select: poll it last, but also perform a try_recv on each
        // iteration.
        if let Ok(cmd) = self.cmd_recv.try_recv() {
            H::conn_command(self, qconn, cmd)?;
        }

        Ok(())
    }
}

impl<H: DriverHooks> Drop for H3Driver<H> {
    fn drop(&mut self) {
        for stream in self.stream_map.values() {
            stream
                .audit_stats
                .set_recvd_stream_fin(StreamClosureKind::Implicit);
        }
    }
}

/// [`H3Command`]s are sent by the [H3Controller] to alter the [H3Driver]'s
/// state.
///
/// Both [ServerH3Driver] and [ClientH3Driver] may extend this enum with
/// endpoint-specific variants.
#[derive(Debug)]
pub enum H3Command {
    /// A connection-level command that executes directly on the
    /// [`quiche::Connection`].
    QuicCmd(QuicCommand),
    /// Send a GOAWAY frame to the peer to initiate a graceful connection
    /// shutdown.
    GoAway,
}

/// Sends [`H3Command`]s to an [H3Driver]. The sender is typed and internally
/// wraps instances of `T` in the appropriate `H3Command` variant.
pub struct RequestSender<C, T> {
    sender: UnboundedSender<C>,
    // Required to work around dangling type parameter
    _r: PhantomData<fn() -> T>,
}

impl<C, T: Into<C>> RequestSender<C, T> {
    /// Send a request to the [H3Driver]. This can only fail if the driver is
    /// gone.
    #[inline(always)]
    pub fn send(&self, v: T) -> Result<(), mpsc::error::SendError<C>> {
        self.sender.send(v.into())
    }
}

impl<C, T> Clone for RequestSender<C, T> {
    fn clone(&self) -> Self {
        Self {
            sender: self.sender.clone(),
            _r: Default::default(),
        }
    }
}

/// Interface to communicate with a paired [H3Driver].
///
/// An [H3Controller] receives [`H3Event`]s from its driver, which must be
/// consumed by the application built on top of the driver to react to incoming
/// events. The controller also allows the application to send ad-hoc
/// [`H3Command`]s to the driver, which will be processed when the driver waits
/// for incoming data.
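///
/// A minimal sketch of a server-side event loop built on the controller
/// (event handling is elided; `controller` is assumed to be the
/// [ServerH3Controller] returned by [ServerH3Driver]'s constructor):
///
/// ```ignore
/// use tokio_quiche::http3::driver::ServerH3Controller;
///
/// async fn run(mut controller: ServerH3Controller) {
///     while let Some(event) = controller.event_receiver_mut().recv().await {
///         // Dispatch the ServerH3Event here, e.g. respond to incoming
///         // headers or clean up state for closed streams.
///     }
///     // The driver has shut down once the receiver yields `None`.
/// }
/// ```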
pub struct H3Controller<H: DriverHooks> {
    /// Sends [`H3Command`]s to the [H3Driver], like [`QuicCommand`]s or
    /// outbound HTTP requests.
    cmd_sender: UnboundedSender<H::Command>,
    /// Receives [`H3Event`]s from the [H3Driver]. Can be extracted and
    /// used independently of the [H3Controller].
    h3_event_recv: Option<UnboundedReceiver<H::Event>>,
}

impl<H: DriverHooks> H3Controller<H> {
    /// Gets a mut reference to the [`H3Event`] receiver for the paired
    /// [H3Driver].
    pub fn event_receiver_mut(&mut self) -> &mut UnboundedReceiver<H::Event> {
        self.h3_event_recv
            .as_mut()
            .expect("No event receiver on H3Controller")
    }

    /// Takes the [`H3Event`] receiver for the paired [H3Driver].
    pub fn take_event_receiver(&mut self) -> UnboundedReceiver<H::Event> {
        self.h3_event_recv
            .take()
            .expect("No event receiver on H3Controller")
    }

    /// Creates a [`QuicCommand`] sender for the paired [H3Driver].
    pub fn cmd_sender(&self) -> RequestSender<H::Command, QuicCommand> {
        RequestSender {
            sender: self.cmd_sender.clone(),
            _r: Default::default(),
        }
    }

    /// Sends a GOAWAY frame to initiate a graceful connection shutdown.
    pub fn send_goaway(&self) {
        let _ = self.cmd_sender.send(H3Command::GoAway.into());
    }
}