tokio_quiche/quic/connection/
mod.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27mod error;
28mod id;
29mod map;
30
31pub use self::error::HandshakeError;
32pub use self::id::ConnectionIdGenerator;
33pub use self::id::SimpleConnectionIdGenerator;
34pub(crate) use self::map::ConnectionMap;
35
36use boring::ssl::SslRef;
37use datagram_socket::AsSocketStats;
38use datagram_socket::DatagramSocketSend;
39use datagram_socket::MaybeConnectedSocket;
40use datagram_socket::QuicAuditStats;
41use datagram_socket::ShutdownConnection;
42use datagram_socket::SocketStats;
43use foundations::telemetry::log;
44use futures::future::BoxFuture;
45use futures::Future;
46use quiche::ConnectionId;
47use std::fmt;
48use std::io;
49use std::net::SocketAddr;
50use std::sync::Arc;
51use std::sync::Mutex;
52use std::task::Poll;
53use std::time::Duration;
54use std::time::Instant;
55use std::time::SystemTime;
56use tokio::sync::mpsc;
57use tokio_util::task::AbortOnDropHandle;
58
59use self::error::make_handshake_result;
60use super::io::connection_stage::Close;
61use super::io::connection_stage::ConnectionStageContext;
62use super::io::connection_stage::Handshake;
63use super::io::connection_stage::RunningApplication;
64use super::io::worker::Closing;
65use super::io::worker::IoWorkerParams;
66use super::io::worker::Running;
67use super::io::worker::RunningOrClosing;
68use super::io::worker::WriteState;
69use super::QuicheConnection;
70use crate::buf_factory::PooledBuf;
71use crate::metrics::Metrics;
72use crate::quic::io::worker::IoWorker;
73use crate::quic::io::worker::WriterConfig;
74use crate::quic::io::worker::INCOMING_QUEUE_SIZE;
75use crate::quic::router::ConnectionMapCommand;
76use crate::QuicResult;
77
78/// Wrapper for connection statistics recorded by [quiche].
79#[derive(Debug)]
80pub struct QuicConnectionStats {
81    /// Aggregate connection statistics across all paths.
82    pub stats: quiche::Stats,
83    /// Specific statistics about the connection's active path.
84    pub path_stats: Option<quiche::PathStats>,
85}
86pub(crate) type QuicConnectionStatsShared = Arc<Mutex<QuicConnectionStats>>;
87
88impl QuicConnectionStats {
89    pub(crate) fn from_conn(qconn: &QuicheConnection) -> Self {
90        Self {
91            stats: qconn.stats(),
92            path_stats: qconn.path_stats().next(),
93        }
94    }
95}
96
97impl AsSocketStats for QuicConnectionStats {
98    fn as_socket_stats(&self) -> SocketStats {
99        SocketStats {
100            pmtu: self
101                .path_stats
102                .as_ref()
103                .map(|p| p.pmtu as u16)
104                .unwrap_or_default(),
105            rtt_us: self
106                .path_stats
107                .as_ref()
108                .map(|p| p.rtt.as_micros() as i64)
109                .unwrap_or_default(),
110            min_rtt_us: self
111                .path_stats
112                .as_ref()
113                .and_then(|p| p.min_rtt.map(|x| x.as_micros() as i64))
114                .unwrap_or_default(),
115            rtt_var_us: self
116                .path_stats
117                .as_ref()
118                .map(|p| p.rttvar.as_micros() as i64)
119                .unwrap_or_default(),
120            cwnd: self
121                .path_stats
122                .as_ref()
123                .map(|p| p.cwnd as u64)
124                .unwrap_or_default(),
125            packets_sent: self.stats.sent as u64,
126            packets_recvd: self.stats.recv as u64,
127            packets_lost: self.stats.lost as u64,
128            packets_retrans: self.stats.retrans as u64,
129            bytes_sent: self.stats.sent_bytes,
130            bytes_recvd: self.stats.recv_bytes,
131            bytes_lost: self.stats.lost_bytes,
132            bytes_retrans: self.stats.stream_retrans_bytes,
133            bytes_unsent: 0, /* not implemented yet, kept for compatibility
134                              * with TCP */
135            delivery_rate: self
136                .path_stats
137                .as_ref()
138                .map(|p| p.delivery_rate)
139                .unwrap_or_default(),
140        }
141    }
142}
143
144/// A received network packet with additional metadata.
145#[derive(Debug)]
146pub struct Incoming {
147    /// The address that sent the inbound packet.
148    pub peer_addr: SocketAddr,
149    /// The address on which we received the inbound packet.
150    pub local_addr: SocketAddr,
151    /// The receive timestamp of the packet.
152    ///
153    /// Used for the `perf-quic-listener-metrics` feature.
154    pub rx_time: Option<SystemTime>,
155    /// The packet's contents.
156    pub buf: PooledBuf,
157    /// If set, then `buf` is a GRO buffer containing multiple packets.
158    /// Each individual packet has a size of `gso` (except for the last one).
159    pub gro: Option<u16>,
160}
161
162/// A QUIC connection that has not performed a handshake yet.
163///
164/// This type is currently only used for server-side connections. It is created
165/// and added to the listener's connection stream after an initial packet from
166/// a client has been received and (optionally) the client's IP address has been
167/// validated.
168///
169/// To turn the initial connection into a fully established one, a QUIC
170/// handshake must be performed. Users have multiple options to facilitate this:
171/// - `start` is a simple entrypoint which spawns a task to handle the entire
172///   lifetime of the QUIC connection. The caller can then only communicate with
173///   the connection via their [`ApplicationOverQuic`].
174/// - `handshake` spawns a task for the handshake and awaits its completion.
175///   Afterwards, it pauses the connection and allows the caller to resume it
176///   later via an opaque struct. We spawn a separate task to allow the tokio
177///   scheduler free choice in where to run the handshake.
178/// - `handshake_fut` returns a future to drive the handshake for maximum
179///   flexibility.
180#[must_use = "call InitialQuicConnection::start to establish the connection"]
181pub struct InitialQuicConnection<Tx, M>
182where
183    Tx: DatagramSocketSend + Send + 'static + ?Sized,
184    M: Metrics,
185{
186    /// An internal ID, to uniquely identify the connection across multiple QUIC
187    /// connection IDs.
188    pub(crate) id: u64,
189    params: QuicConnectionParams<Tx, M>,
190    pub(crate) audit_log_stats: Arc<QuicAuditStats>,
191    stats: QuicConnectionStatsShared,
192    pub(crate) incoming_ev_sender: mpsc::Sender<Incoming>,
193    incoming_ev_receiver: mpsc::Receiver<Incoming>,
194}
195
196impl<Tx, M> InitialQuicConnection<Tx, M>
197where
198    Tx: DatagramSocketSend + Send + 'static + ?Sized,
199    M: Metrics,
200{
201    #[inline]
202    pub(crate) fn new(params: QuicConnectionParams<Tx, M>) -> Self {
203        let (incoming_ev_sender, incoming_ev_receiver) =
204            mpsc::channel(INCOMING_QUEUE_SIZE);
205        let audit_log_stats = Arc::new(QuicAuditStats::new(params.scid.to_vec()));
206
207        let stats = Arc::new(Mutex::new(QuicConnectionStats::from_conn(
208            &params.quiche_conn,
209        )));
210
211        Self {
212            id: Self::generate_id(),
213            params,
214            audit_log_stats,
215            stats,
216            incoming_ev_sender,
217            incoming_ev_receiver,
218        }
219    }
220
221    /// The local address this connection listens on.
222    pub fn local_addr(&self) -> SocketAddr {
223        self.params.local_addr
224    }
225
226    /// The remote address for this connection.
227    pub fn peer_addr(&self) -> SocketAddr {
228        self.params.peer_addr
229    }
230
231    /// [boring]'s SSL object for this connection.
232    #[doc(hidden)]
233    pub fn ssl_mut(&mut self) -> &mut SslRef {
234        self.params.quiche_conn.as_mut()
235    }
236
237    /// A handle to the [`QuicAuditStats`] for this connection.
238    ///
239    /// # Note
240    /// These stats are updated during the lifetime of the connection.
241    /// The getter exists to grab a handle early on, which can then
242    /// be stowed away and read out after the connection has closed.
243    #[inline]
244    pub fn audit_log_stats(&self) -> Arc<QuicAuditStats> {
245        Arc::clone(&self.audit_log_stats)
246    }
247
248    /// A handle to the [`QuicConnectionStats`] for this connection.
249    ///
250    /// # Note
251    /// Initially, these stats represent the state when the [quiche::Connection]
252    /// was created. They are updated when the connection is closed, so this
253    /// getter exists primarily to grab a handle early on.
254    #[inline]
255    pub fn stats(&self) -> &QuicConnectionStatsShared {
256        &self.stats
257    }
258
259    /// Creates a future to drive the connection's handshake.
260    ///
261    /// This is a lower-level alternative to the `handshake` function which
262    /// gives the caller more control over execution of the future. See
263    /// `handshake` for details on the return values.
264    #[allow(clippy::type_complexity)]
265    pub fn handshake_fut<A: ApplicationOverQuic>(
266        self, app: A,
267    ) -> (
268        QuicConnection,
269        BoxFuture<'static, io::Result<Running<Arc<Tx>, M, A>>>,
270    ) {
271        self.params.metrics.connections_in_memory().inc();
272
273        let conn = QuicConnection {
274            local_addr: self.params.local_addr,
275            peer_addr: self.params.peer_addr,
276            audit_log_stats: Arc::clone(&self.audit_log_stats),
277            stats: Arc::clone(&self.stats),
278            scid: self.params.scid,
279        };
280        let context = ConnectionStageContext {
281            in_pkt: self.params.initial_pkt,
282            incoming_pkt_receiver: self.incoming_ev_receiver,
283            application: app,
284            stats: Arc::clone(&self.stats),
285        };
286        let conn_stage = Handshake {
287            handshake_info: self.params.handshake_info,
288        };
289        let params = IoWorkerParams {
290            socket: MaybeConnectedSocket::new(self.params.socket),
291            shutdown_tx: self.params.shutdown_tx,
292            cfg: self.params.writer_cfg,
293            audit_log_stats: self.audit_log_stats,
294            write_state: WriteState::default(),
295            conn_map_cmd_tx: self.params.conn_map_cmd_tx,
296            #[cfg(feature = "perf-quic-listener-metrics")]
297            init_rx_time: self.params.init_rx_time,
298            metrics: self.params.metrics.clone(),
299        };
300
301        let handshake_fut = async move {
302            let qconn = self.params.quiche_conn;
303            let handshake_done =
304                IoWorker::new(params, conn_stage).run(qconn, context).await;
305
306            match handshake_done {
307                RunningOrClosing::Running(r) => Ok(r),
308                RunningOrClosing::Closing(Closing {
309                    params,
310                    work_loop_result,
311                    mut context,
312                    mut qconn,
313                }) => {
314                    let hs_result = make_handshake_result(&work_loop_result);
315                    IoWorker::new(params, Close { work_loop_result })
316                        .close(&mut qconn, &mut context)
317                        .await;
318                    hs_result
319                },
320            }
321        };
322
323        (conn, Box::pin(handshake_fut))
324    }
325
326    /// Performs the QUIC handshake in a separate tokio task and awaits its
327    /// completion.
328    ///
329    /// The returned [`QuicConnection`] holds metadata about the established
330    /// connection. The connection itself is paused after `handshake`
331    /// returns and must be resumed by passing the opaque `Running` value to
332    /// [`InitialQuicConnection::resume`]. This two-step process
333    /// allows callers to collect telemetry and run code before serving their
334    /// [`ApplicationOverQuic`].
335    pub async fn handshake<A: ApplicationOverQuic>(
336        self, app: A,
337    ) -> io::Result<(QuicConnection, Running<Arc<Tx>, M, A>)> {
338        let task_metrics = self.params.metrics.clone();
339        let (conn, handshake_fut) = Self::handshake_fut(self, app);
340
341        let handshake_handle = crate::metrics::tokio_task::spawn(
342            "quic_handshake_worker",
343            task_metrics,
344            handshake_fut,
345        );
346
347        // `AbortOnDropHandle` simulates task-killswitch behavior without needing
348        // to give up ownership of the `JoinHandle`.
349        let handshake_abort_handle = AbortOnDropHandle::new(handshake_handle);
350
351        let worker = handshake_abort_handle.await??;
352
353        Ok((conn, worker))
354    }
355
356    /// Resumes a QUIC connection which was paused after a successful handshake.
357    pub fn resume<A: ApplicationOverQuic>(pre_running: Running<Arc<Tx>, M, A>) {
358        let task_metrics = pre_running.params.metrics.clone();
359        let fut = async move {
360            let Running {
361                params,
362                context,
363                qconn,
364            } = pre_running;
365            let running_worker = IoWorker::new(params, RunningApplication);
366
367            let Closing {
368                params,
369                mut context,
370                work_loop_result,
371                mut qconn,
372            } = running_worker.run(qconn, context).await;
373
374            IoWorker::new(params, Close { work_loop_result })
375                .close(&mut qconn, &mut context)
376                .await;
377        };
378
379        crate::metrics::tokio_task::spawn_with_killswitch(
380            "quic_io_worker",
381            task_metrics,
382            fut,
383        );
384    }
385
386    /// Drives a QUIC connection from handshake to close in separate tokio
387    /// tasks.
388    ///
389    /// It combines [`InitialQuicConnection::handshake`] and
390    /// [`InitialQuicConnection::resume`] into a single call.
391    pub fn start<A: ApplicationOverQuic>(self, app: A) -> QuicConnection {
392        let task_metrics = self.params.metrics.clone();
393        let (conn, handshake_fut) = Self::handshake_fut(self, app);
394
395        let fut = async move {
396            match handshake_fut.await {
397                Ok(running) => Self::resume(running),
398                Err(e) =>
399                    log::error!("QUIC handshake failed in IQC::start"; "error" => e),
400            }
401        };
402
403        crate::metrics::tokio_task::spawn_with_killswitch(
404            "quic_handshake_worker",
405            task_metrics,
406            fut,
407        );
408
409        conn
410    }
411
412    fn generate_id() -> u64 {
413        let mut buf = [0; 8];
414
415        boring::rand::rand_bytes(&mut buf).unwrap();
416
417        u64::from_ne_bytes(buf)
418    }
419}
420
421pub(crate) struct QuicConnectionParams<Tx, M>
422where
423    Tx: DatagramSocketSend + Send + 'static + ?Sized,
424    M: Metrics,
425{
426    pub writer_cfg: WriterConfig,
427    pub initial_pkt: Option<Incoming>,
428    pub shutdown_tx: mpsc::Sender<()>,
429    pub conn_map_cmd_tx: mpsc::UnboundedSender<ConnectionMapCommand>, /* channel that signals connection map changes */
430    pub scid: ConnectionId<'static>,
431    pub metrics: M,
432    #[cfg(feature = "perf-quic-listener-metrics")]
433    pub init_rx_time: Option<SystemTime>,
434    pub handshake_info: HandshakeInfo,
435    pub quiche_conn: QuicheConnection,
436    pub socket: Arc<Tx>,
437    pub local_addr: SocketAddr,
438    pub peer_addr: SocketAddr,
439}
440
441/// Metadata about an established QUIC connection.
442///
443/// While this struct allows access to some facets of a QUIC connection, it
444/// notably does not represent the [quiche::Connection] itself. The crate
445/// handles most interactions with [quiche] internally in a worker task. Users
446/// can only access the connection directly via their [`ApplicationOverQuic`]
447/// implementation.
448///
449/// See the [module-level docs](crate::quic) for an overview of how a QUIC
450/// connection is handled internally.
451pub struct QuicConnection {
452    local_addr: SocketAddr,
453    peer_addr: SocketAddr,
454    audit_log_stats: Arc<QuicAuditStats>,
455    stats: QuicConnectionStatsShared,
456    scid: ConnectionId<'static>,
457}
458
459impl QuicConnection {
460    /// The local address this connection listens on.
461    #[inline]
462    pub fn local_addr(&self) -> SocketAddr {
463        self.local_addr
464    }
465
466    /// The remote address for this connection.
467    #[inline]
468    pub fn peer_addr(&self) -> SocketAddr {
469        self.peer_addr
470    }
471
472    /// A handle to the [`QuicAuditStats`] for this connection.
473    ///
474    /// # Note
475    /// These stats are updated during the lifetime of the connection.
476    /// The getter exists to grab a handle early on, which can then
477    /// be stowed away and read out after the connection has closed.
478    #[inline]
479    pub fn audit_log_stats(&self) -> &Arc<QuicAuditStats> {
480        &self.audit_log_stats
481    }
482
483    /// A handle to the [`QuicConnectionStats`] for this connection.
484    ///
485    /// # Note
486    /// Initially, these stats represent the state when the [quiche::Connection]
487    /// was created. They are updated when the connection is closed, so this
488    /// getter exists primarily to grab a handle early on.
489    #[inline]
490    pub fn stats(&self) -> &QuicConnectionStatsShared {
491        &self.stats
492    }
493
494    /// The QUIC source connection ID used by this connection.
495    #[inline]
496    pub fn scid(&self) -> &ConnectionId<'static> {
497        &self.scid
498    }
499}
500
501impl AsSocketStats for QuicConnection {
502    #[inline]
503    fn as_socket_stats(&self) -> SocketStats {
504        // It is important to note that those stats are only updated when
505        // the connection stops, which is fine, since this is only used to
506        // log after the connection is finished.
507        self.stats.lock().unwrap().as_socket_stats()
508    }
509
510    #[inline]
511    fn as_quic_stats(&self) -> Option<&Arc<QuicAuditStats>> {
512        Some(&self.audit_log_stats)
513    }
514}
515
516impl<Tx, M> AsSocketStats for InitialQuicConnection<Tx, M>
517where
518    Tx: DatagramSocketSend + Send + 'static + ?Sized,
519    M: Metrics,
520{
521    #[inline]
522    fn as_socket_stats(&self) -> SocketStats {
523        // It is important to note that those stats are only updated when
524        // the connection stops, which is fine, since this is only used to
525        // log after the connection is finished.
526        self.stats.lock().unwrap().as_socket_stats()
527    }
528
529    #[inline]
530    fn as_quic_stats(&self) -> Option<&Arc<QuicAuditStats>> {
531        Some(&self.audit_log_stats)
532    }
533}
534
535impl<Tx, M> ShutdownConnection for InitialQuicConnection<Tx, M>
536where
537    Tx: DatagramSocketSend + Send + 'static + ?Sized,
538    M: Metrics,
539{
540    #[inline]
541    fn poll_shutdown(
542        &mut self, _cx: &mut std::task::Context,
543    ) -> std::task::Poll<io::Result<()>> {
544        // TODO: Does nothing at the moment. We always call Self::start
545        // anyway so it's not really important at this moment.
546        Poll::Ready(Ok(()))
547    }
548}
549
550impl ShutdownConnection for QuicConnection {
551    #[inline]
552    fn poll_shutdown(
553        &mut self, _cx: &mut std::task::Context,
554    ) -> std::task::Poll<io::Result<()>> {
555        // TODO: does nothing at the moment
556        Poll::Ready(Ok(()))
557    }
558}
559
560/// Details about a connection's QUIC handshake.
561#[derive(Debug, Clone)]
562pub struct HandshakeInfo {
563    /// The time at which the connection was created.
564    start_time: Instant,
565    /// The timeout before which the handshake must complete.
566    timeout: Option<Duration>,
567    /// The real duration that the handshake took to complete.
568    time_handshake: Option<Duration>,
569}
570
571impl HandshakeInfo {
572    pub(crate) fn new(start_time: Instant, timeout: Option<Duration>) -> Self {
573        Self {
574            start_time,
575            timeout,
576            time_handshake: None,
577        }
578    }
579
580    /// The time at which the connection was created.
581    #[inline]
582    pub fn start_time(&self) -> Instant {
583        self.start_time
584    }
585
586    /// How long the handshake took to complete.
587    #[inline]
588    pub fn elapsed(&self) -> Duration {
589        self.time_handshake.unwrap_or_default()
590    }
591
592    pub(crate) fn set_elapsed(&mut self) {
593        let elapsed = self.start_time.elapsed();
594        self.time_handshake = Some(elapsed)
595    }
596
597    pub(crate) fn deadline(&self) -> Option<Instant> {
598        self.timeout.map(|timeout| self.start_time + timeout)
599    }
600
601    pub(crate) fn is_expired(&self) -> bool {
602        self.timeout
603            .is_some_and(|timeout| self.start_time.elapsed() >= timeout)
604    }
605}
606
607/// A trait to implement an application served over QUIC.
608///
609/// The application is driven by an internal worker task, which also handles I/O
610/// for the connection. The worker feeds inbound packets into the
611/// [quiche::Connection], calls [`ApplicationOverQuic::process_reads`] followed
612/// by [`ApplicationOverQuic::process_writes`], and then flushes any pending
613/// outbound packets to the network. This repeats in a loop until either the
614/// connection is closed or the [`ApplicationOverQuic`] returns an error.
615///
616/// In between loop iterations, the worker yields until a new packet arrives, a
617/// timer expires, or [`ApplicationOverQuic::wait_for_data`] resolves.
618/// Implementors can interact with the underlying connection via the mutable
619/// reference passed to trait methods.
620#[allow(unused_variables)] // for default functions
621pub trait ApplicationOverQuic: Send + 'static {
622    /// Callback to customize the [`ApplicationOverQuic`] after the QUIC
623    /// handshake completed successfully.
624    ///
625    /// # Errors
626    /// Returning an error from this method immediately stops the worker loop
627    /// and transitions to the connection closing stage.
628    fn on_conn_established(
629        &mut self, qconn: &mut QuicheConnection, handshake_info: &HandshakeInfo,
630    ) -> QuicResult<()>;
631
632    /// Determines whether the application's methods will be called by the
633    /// worker.
634    ///
635    /// The function is checked in each iteration of the worker loop. Only
636    /// `on_conn_established()` and `buffer()` bypass this check.
637    fn should_act(&self) -> bool;
638
639    /// A borrowed buffer for the worker to write outbound packets into.
640    ///
641    /// This method allows sharing a buffer between the worker and the
642    /// application, efficiently using the allocated memory while the
643    /// application is inactive. It can also be used to artificially
644    /// restrict the size of outbound network packets.
645    ///
646    /// Any data in the buffer may be overwritten by the worker. If necessary,
647    /// the application should save the contents when this method is called.
648    fn buffer(&mut self) -> &mut [u8];
649
650    /// Waits for an event to trigger the next iteration of the worker loop.
651    ///
652    /// The returned future is awaited in parallel to inbound packets and the
653    /// connection's timers. Any one of those futures resolving triggers the
654    /// next loop iteration, so implementations should not rely on
655    /// `wait_for_data` for the bulk of their processing. Instead, after
656    /// `wait_for_data` resolves, `process_writes` should be used to pull all
657    /// available data out of the event source (for example, a channel).
658    ///
659    /// As for any future, it is **very important** that this method does not
660    /// block the runtime. If it does, the other concurrent futures will be
661    /// starved.
662    ///
663    /// # Errors
664    /// Returning an error from this method immediately stops the worker loop
665    /// and transitions to the connection closing stage.
666    fn wait_for_data(
667        &mut self, qconn: &mut QuicheConnection,
668    ) -> impl Future<Output = QuicResult<()>> + Send;
669
670    /// Processes data received on the connection.
671    ///
672    /// This method is only called if `should_act()` returns `true` and any
673    /// packets were received since the last worker loop iteration. It
674    /// should be used to read from the connection's open streams.
675    ///
676    /// # Errors
677    /// Returning an error from this method immediately stops the worker loop
678    /// and transitions to the connection closing stage.
679    fn process_reads(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()>;
680
681    /// Adds data to be sent on the connection.
682    ///
683    /// Unlike `process_reads`, this method is called on every iteration of the
684    /// worker loop (provided `should_act()` returns true). It is called
685    /// after `process_reads` and immediately before packets are pushed to
686    /// the socket. The main use case is providing already-buffered data to
687    /// the [quiche::Connection].
688    ///
689    /// # Errors
690    /// Returning an error from this method immediately stops the worker loop
691    /// and transitions to the connection closing stage.
692    fn process_writes(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()>;
693
694    /// Callback to inspect the result of the worker task, before a final packet
695    /// with a `CONNECTION_CLOSE` frame is flushed to the network.
696    ///
697    /// `connection_result` is [`Ok`] only if the connection was closed without
698    /// any local error. Otherwise, the state of `qconn` depends on the
699    /// error type and application behavior.
700    fn on_conn_close<M: Metrics>(
701        &mut self, qconn: &mut QuicheConnection, metrics: &M,
702        connection_result: &QuicResult<()>,
703    ) {
704    }
705}
706
707/// A command to execute on a [quiche::Connection] in the context of an
708/// [`ApplicationOverQuic`].
709///
710/// We expect most [`ApplicationOverQuic`] implementations (such as
711/// [H3Driver](crate::http3::driver::H3Driver)) will provide some way to submit
712/// actions for them to take, for example via a channel. This enum may be
713/// accepted as part of those actions to inspect or alter the state of the
714/// underlying connection.
715pub enum QuicCommand {
716    /// Close the connection with the given parameters.
717    ///
718    /// Some packets may still be sent after this command has been executed, so
719    /// the worker task may continue running for a bit. See
720    /// [`quiche::Connection::close`] for details.
721    ConnectionClose(ConnectionShutdownBehaviour),
722    /// Execute a custom callback on the connection.
723    Custom(Box<dyn FnOnce(&mut QuicheConnection) + Send + 'static>),
724    /// Collect the current [`SocketStats`] from the connection.
725    ///
726    /// Unlike [`QuicConnection::stats()`], these statistics are not cached and
727    /// instead are retrieved right before the command is executed.
728    Stats(Box<dyn FnOnce(datagram_socket::SocketStats) + Send + 'static>),
729}
730
731impl QuicCommand {
732    /// Consume the command and perform its operation on `qconn`.
733    ///
734    /// This method should be called by [`ApplicationOverQuic`] implementations
735    /// when they receive a [`QuicCommand`] to execute.
736    pub fn execute(self, qconn: &mut QuicheConnection) {
737        match self {
738            Self::ConnectionClose(behavior) => {
739                let ConnectionShutdownBehaviour {
740                    send_application_close,
741                    error_code,
742                    reason,
743                } = behavior;
744
745                let _ = qconn.close(send_application_close, error_code, &reason);
746            },
747            Self::Custom(f) => {
748                (f)(qconn);
749            },
750            Self::Stats(callback) => {
751                let stats_pair = QuicConnectionStats::from_conn(qconn);
752                (callback)(stats_pair.as_socket_stats());
753            },
754        }
755    }
756}
757
758impl fmt::Debug for QuicCommand {
759    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
760        match self {
761            Self::ConnectionClose(b) =>
762                f.debug_tuple("ConnectionClose").field(b).finish(),
763            Self::Custom(_) => f.debug_tuple("Custom").finish_non_exhaustive(),
764            Self::Stats(_) => f.debug_tuple("Stats").finish_non_exhaustive(),
765        }
766    }
767}
768
769/// Parameters to close a [quiche::Connection].
770///
771/// The connection will use these parameters for the `CONNECTION_CLOSE` frame
772/// it sends to its peer.
773#[derive(Debug, Clone)]
774pub struct ConnectionShutdownBehaviour {
775    /// Whether to send an application close or a regular close to the peer.
776    ///
777    /// If this is true but the connection is not in a state where it is safe to
778    /// send an application error (not established nor in early data), in
779    /// accordance with [RFC 9000](https://www.rfc-editor.org/rfc/rfc9000.html#section-10.2.3-3), the
780    /// error code is changed to `APPLICATION_ERROR` and the reason phrase is
781    /// cleared.
782    pub send_application_close: bool,
783    /// The [QUIC][proto-err] or [application-level][app-err] error code to send
784    /// to the peer.
785    ///
786    /// [proto-err]: https://www.rfc-editor.org/rfc/rfc9000.html#section-20.1
787    /// [app-err]: https://www.rfc-editor.org/rfc/rfc9000.html#section-20.2
788    pub error_code: u64,
789    /// The reason phrase to send to the peer.
790    pub reason: Vec<u8>,
791}