Skip to main content

tokio_quiche/quic/connection/
mod.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27mod error;
28mod id;
29mod map;
30
31pub use self::error::HandshakeError;
32pub use self::id::ConnectionIdGenerator;
33pub use self::id::SharedConnectionIdGenerator;
34pub use self::id::SimpleConnectionIdGenerator;
35pub(crate) use self::map::ConnectionMap;
36
37use boring::ssl::SslRef;
38use datagram_socket::AsSocketStats;
39use datagram_socket::DatagramSocketSend;
40use datagram_socket::MaybeConnectedSocket;
41use datagram_socket::QuicAuditStats;
42use datagram_socket::ShutdownConnection;
43use datagram_socket::SocketStats;
44use foundations::telemetry::log;
45use futures::Future;
46use quiche::ConnectionId;
47use std::fmt;
48use std::io;
49use std::net::SocketAddr;
50use std::sync::Arc;
51use std::sync::Mutex;
52use std::task::Poll;
53use std::time::Duration;
54use std::time::Instant;
55use std::time::SystemTime;
56use tokio::sync::mpsc;
57use tokio_util::task::AbortOnDropHandle;
58
59use self::error::make_handshake_result;
60use super::io::connection_stage::Close;
61use super::io::connection_stage::ConnectionStageContext;
62use super::io::connection_stage::Handshake;
63use super::io::connection_stage::RunningApplication;
64use super::io::worker::Closing;
65use super::io::worker::IoWorkerParams;
66use super::io::worker::Running;
67use super::io::worker::RunningOrClosing;
68use super::io::worker::WriteState;
69use super::QuicheConnection;
70use crate::metrics::Metrics;
71use crate::quic::io::worker::IoWorker;
72use crate::quic::io::worker::WriterConfig;
73use crate::quic::io::worker::INCOMING_QUEUE_SIZE;
74use crate::quic::router::ConnectionMapCommand;
75use crate::QuicResult;
76
77/// Wrapper for connection statistics recorded by [quiche].
78#[derive(Debug)]
79pub struct QuicConnectionStats {
80    /// Aggregate connection statistics across all paths.
81    pub stats: quiche::Stats,
82    /// Specific statistics about the connection's active path.
83    pub path_stats: Option<quiche::PathStats>,
84}
85pub(crate) type QuicConnectionStatsShared = Arc<Mutex<QuicConnectionStats>>;
86
87impl QuicConnectionStats {
88    pub(crate) fn from_conn(qconn: &QuicheConnection) -> Self {
89        Self {
90            stats: qconn.stats(),
91            path_stats: qconn.path_stats().next(),
92        }
93    }
94
95    fn startup_exit_to_socket_stats(
96        value: quiche::StartupExit,
97    ) -> datagram_socket::StartupExit {
98        let reason = match value.reason {
99            quiche::StartupExitReason::Loss =>
100                datagram_socket::StartupExitReason::Loss,
101            quiche::StartupExitReason::BandwidthPlateau =>
102                datagram_socket::StartupExitReason::BandwidthPlateau,
103            quiche::StartupExitReason::PersistentQueue =>
104                datagram_socket::StartupExitReason::PersistentQueue,
105            quiche::StartupExitReason::ConservativeSlowStartRounds =>
106                datagram_socket::StartupExitReason::ConservativeSlowStartRounds,
107        };
108
109        datagram_socket::StartupExit {
110            cwnd: value.cwnd,
111            bandwidth: value.bandwidth,
112            reason,
113        }
114    }
115}
116
117impl AsSocketStats for QuicConnectionStats {
118    fn as_socket_stats(&self) -> SocketStats {
119        SocketStats {
120            pmtu: self
121                .path_stats
122                .as_ref()
123                .map(|p| p.pmtu as u16)
124                .unwrap_or_default(),
125            rtt_us: self
126                .path_stats
127                .as_ref()
128                .map(|p| p.rtt.as_micros() as i64)
129                .unwrap_or_default(),
130            min_rtt_us: self
131                .path_stats
132                .as_ref()
133                .and_then(|p| p.min_rtt.map(|x| x.as_micros() as i64))
134                .unwrap_or_default(),
135            max_rtt_us: self
136                .path_stats
137                .as_ref()
138                .and_then(|p| p.max_rtt.map(|x| x.as_micros() as i64))
139                .unwrap_or_default(),
140            rtt_var_us: self
141                .path_stats
142                .as_ref()
143                .map(|p| p.rttvar.as_micros() as i64)
144                .unwrap_or_default(),
145            cwnd: self
146                .path_stats
147                .as_ref()
148                .map(|p| p.cwnd as u64)
149                .unwrap_or_default(),
150            total_pto_count: self
151                .path_stats
152                .as_ref()
153                .map(|p| p.total_pto_count as u64)
154                .unwrap_or_default(),
155            packets_sent: self.stats.sent as u64,
156            packets_recvd: self.stats.recv as u64,
157            packets_lost: self.stats.lost as u64,
158            packets_lost_spurious: self.stats.spurious_lost as u64,
159            packets_retrans: self.stats.retrans as u64,
160            bytes_sent: self.stats.sent_bytes,
161            bytes_recvd: self.stats.recv_bytes,
162            bytes_lost: self.stats.lost_bytes,
163            bytes_retrans: self.stats.stream_retrans_bytes,
164            bytes_unsent: 0, /* not implemented yet, kept for compatibility
165                              * with TCP */
166            delivery_rate: self
167                .path_stats
168                .as_ref()
169                .map(|p| p.delivery_rate)
170                .unwrap_or_default(),
171            max_bandwidth: self.path_stats.as_ref().and_then(|p| p.max_bandwidth),
172            startup_exit: self
173                .path_stats
174                .as_ref()
175                .and_then(|p| p.startup_exit)
176                .map(QuicConnectionStats::startup_exit_to_socket_stats),
177            bytes_in_flight_duration_us: self
178                .stats
179                .bytes_in_flight_duration
180                .as_micros() as u64,
181        }
182    }
183}
184
185/// A received network packet with additional metadata.
186#[derive(Debug)]
187pub struct Incoming {
188    /// The address that sent the inbound packet.
189    pub peer_addr: SocketAddr,
190    /// The address on which we received the inbound packet.
191    pub local_addr: SocketAddr,
192    /// The receive timestamp of the packet.
193    ///
194    /// Used for the `perf-quic-listener-metrics` feature.
195    pub rx_time: Option<SystemTime>,
196    /// The packet's contents.
197    pub buf: Vec<u8>,
198    /// If set, then `buf` is a GRO buffer containing multiple packets.
199    /// Each individual packet has a size of `gso` (except for the last one).
200    pub gro: Option<i32>,
201    /// [SO_MARK] control message value received from the socket.
202    ///
203    /// This will always be `None` after the connection has been spawned as
204    /// the message is `take()`d before spawning.
205    ///
206    /// [SO_MARK]: https://man7.org/linux/man-pages/man7/socket.7.html
207    #[cfg(target_os = "linux")]
208    pub so_mark_data: Option<[u8; 4]>,
209}
210
211/// A QUIC connection that has not performed a handshake yet.
212///
213/// This type is currently only used for server-side connections. It is created
214/// and added to the listener's connection stream after an initial packet from
215/// a client has been received and (optionally) the client's IP address has been
216/// validated.
217///
218/// To turn the initial connection into a fully established one, a QUIC
219/// handshake must be performed. Users have multiple options to facilitate this:
220/// - `start` is a simple entrypoint which spawns a task to handle the entire
221///   lifetime of the QUIC connection. The caller can then only communicate with
222///   the connection via their [`ApplicationOverQuic`].
223/// - `handshake` spawns a task for the handshake and awaits its completion.
224///   Afterwards, it pauses the connection and allows the caller to resume it
225///   later via an opaque struct. We spawn a separate task to allow the tokio
226///   scheduler free choice in where to run the handshake.
227/// - `handshake_fut` returns a future to drive the handshake for maximum
228///   flexibility.
229#[must_use = "call InitialQuicConnection::start to establish the connection"]
230pub struct InitialQuicConnection<Tx, M>
231where
232    Tx: DatagramSocketSend + Send + 'static + ?Sized,
233    M: Metrics,
234{
235    params: QuicConnectionParams<Tx, M>,
236    pub(crate) audit_log_stats: Arc<QuicAuditStats>,
237    stats: QuicConnectionStatsShared,
238    pub(crate) incoming_ev_sender: mpsc::Sender<Incoming>,
239    incoming_ev_receiver: mpsc::Receiver<Incoming>,
240}
241
242impl<Tx, M> InitialQuicConnection<Tx, M>
243where
244    Tx: DatagramSocketSend + Send + 'static + ?Sized,
245    M: Metrics,
246{
247    #[inline]
248    pub(crate) fn new(params: QuicConnectionParams<Tx, M>) -> Self {
249        let (incoming_ev_sender, incoming_ev_receiver) =
250            mpsc::channel(INCOMING_QUEUE_SIZE);
251        let audit_log_stats = Arc::new(QuicAuditStats::new(params.scid.to_vec()));
252
253        let stats = Arc::new(Mutex::new(QuicConnectionStats::from_conn(
254            &params.quiche_conn,
255        )));
256
257        Self {
258            params,
259            audit_log_stats,
260            stats,
261            incoming_ev_sender,
262            incoming_ev_receiver,
263        }
264    }
265
266    /// The local address this connection listens on.
267    pub fn local_addr(&self) -> SocketAddr {
268        self.params.local_addr
269    }
270
271    /// The remote address for this connection.
272    pub fn peer_addr(&self) -> SocketAddr {
273        self.params.peer_addr
274    }
275
276    /// [boring]'s SSL object for this connection.
277    #[doc(hidden)]
278    pub fn ssl_mut(&mut self) -> &mut SslRef {
279        // Deref to pick `Connection::as_mut` over `Box::as_mut`.
280        (*self.params.quiche_conn).as_mut()
281    }
282
283    /// A handle to the [`QuicAuditStats`] for this connection.
284    ///
285    /// # Note
286    /// These stats are updated during the lifetime of the connection.
287    /// The getter exists to grab a handle early on, which can then
288    /// be stowed away and read out after the connection has closed.
289    #[inline]
290    pub fn audit_log_stats(&self) -> Arc<QuicAuditStats> {
291        Arc::clone(&self.audit_log_stats)
292    }
293
294    /// A handle to the [`QuicConnectionStats`] for this connection.
295    ///
296    /// # Note
297    /// Initially, these stats represent the state when the [quiche::Connection]
298    /// was created. They are updated when the connection is closed, so this
299    /// getter exists primarily to grab a handle early on.
300    #[inline]
301    pub fn stats(&self) -> &QuicConnectionStatsShared {
302        &self.stats
303    }
304
305    /// Creates a future to drive the connection's handshake.
306    ///
307    /// This is a lower-level alternative to the `handshake` function which
308    /// gives the caller more control over execution of the future. See
309    /// `handshake` for details on the return values.
310    pub fn handshake_fut<A: ApplicationOverQuic>(
311        self, app: A,
312    ) -> (
313        QuicConnection,
314        impl Future<Output = io::Result<Running<Arc<Tx>, M, A>>> + Send + 'static,
315    ) {
316        self.params.metrics.connections_in_memory().inc();
317
318        let conn = QuicConnection {
319            local_addr: self.params.local_addr,
320            peer_addr: self.params.peer_addr,
321            audit_log_stats: Arc::clone(&self.audit_log_stats),
322            stats: Arc::clone(&self.stats),
323            scid: self.params.scid,
324        };
325        let context = ConnectionStageContext {
326            in_pkt: self.params.initial_pkt,
327            incoming_pkt_receiver: self.incoming_ev_receiver,
328            application: app,
329            stats: Arc::clone(&self.stats),
330        };
331        let conn_stage = Handshake {
332            handshake_info: self.params.handshake_info,
333        };
334        let params = IoWorkerParams {
335            socket: MaybeConnectedSocket::new(self.params.socket),
336            shutdown_tx: self.params.shutdown_tx,
337            cfg: self.params.writer_cfg,
338            audit_log_stats: self.audit_log_stats,
339            write_state: WriteState::default(),
340            conn_map_cmd_tx: self.params.conn_map_cmd_tx,
341            cid_generator: self.params.cid_generator,
342            #[cfg(feature = "perf-quic-listener-metrics")]
343            init_rx_time: self.params.init_rx_time,
344            metrics: self.params.metrics.clone(),
345        };
346
347        let handshake_fut = async move {
348            let qconn = self.params.quiche_conn;
349            let handshake_done =
350                IoWorker::new(params, conn_stage).run(qconn, context).await;
351
352            match handshake_done {
353                RunningOrClosing::Running(r) => Ok(r),
354                RunningOrClosing::Closing(Closing {
355                    params,
356                    work_loop_result,
357                    mut context,
358                    mut qconn,
359                }) => {
360                    let hs_result = make_handshake_result(&work_loop_result);
361                    IoWorker::new(params, Close { work_loop_result })
362                        .close(&mut qconn, &mut context)
363                        .await;
364                    hs_result
365                },
366            }
367        };
368
369        (conn, handshake_fut)
370    }
371
372    /// Performs the QUIC handshake in a separate tokio task and awaits its
373    /// completion.
374    ///
375    /// The returned [`QuicConnection`] holds metadata about the established
376    /// connection. The connection itself is paused after `handshake`
377    /// returns and must be resumed by passing the opaque `Running` value to
378    /// [`InitialQuicConnection::resume`]. This two-step process
379    /// allows callers to collect telemetry and run code before serving their
380    /// [`ApplicationOverQuic`].
381    pub async fn handshake<A: ApplicationOverQuic>(
382        self, app: A,
383    ) -> io::Result<(QuicConnection, Running<Arc<Tx>, M, A>)> {
384        let task_metrics = self.params.metrics.clone();
385        let (conn, handshake_fut) = Self::handshake_fut(self, app);
386
387        let handshake_handle = crate::metrics::tokio_task::spawn(
388            "quic_handshake_worker",
389            task_metrics,
390            handshake_fut,
391        );
392
393        // `AbortOnDropHandle` simulates task-killswitch behavior without needing
394        // to give up ownership of the `JoinHandle`.
395        let handshake_abort_handle = AbortOnDropHandle::new(handshake_handle);
396
397        let worker = handshake_abort_handle.await??;
398
399        Ok((conn, worker))
400    }
401
402    /// Resumes a QUIC connection which was paused after a successful handshake.
403    pub fn resume<A: ApplicationOverQuic>(pre_running: Running<Arc<Tx>, M, A>) {
404        let task_metrics = pre_running.params.metrics.clone();
405        let fut = async move {
406            let Running {
407                params,
408                context,
409                qconn,
410            } = pre_running;
411            let running_worker = IoWorker::new(params, RunningApplication);
412
413            let Closing {
414                params,
415                mut context,
416                work_loop_result,
417                mut qconn,
418            } = running_worker.run(qconn, context).await;
419
420            IoWorker::new(params, Close { work_loop_result })
421                .close(&mut qconn, &mut context)
422                .await;
423        };
424
425        crate::metrics::tokio_task::spawn_with_killswitch(
426            "quic_io_worker",
427            task_metrics,
428            fut,
429        );
430    }
431
432    /// Drives a QUIC connection from handshake to close in separate tokio
433    /// tasks.
434    ///
435    /// It combines [`InitialQuicConnection::handshake`] and
436    /// [`InitialQuicConnection::resume`] into a single call.
437    pub fn start<A: ApplicationOverQuic>(self, app: A) -> QuicConnection {
438        let task_metrics = self.params.metrics.clone();
439        let (conn, handshake_fut) = Self::handshake_fut(self, app);
440        // Pin to the heap so the spawned task only carries a pointer to it
441        // instead of inlining the full future state across the await.
442        let handshake_fut = Box::pin(handshake_fut);
443
444        let fut = async move {
445            match handshake_fut.await {
446                Ok(running) => Self::resume(running),
447                Err(e) => {
448                    log::error!("QUIC handshake failed in IQC::start"; "error" => e)
449                },
450            }
451        };
452
453        crate::metrics::tokio_task::spawn_with_killswitch(
454            "quic_handshake_worker",
455            task_metrics,
456            fut,
457        );
458
459        conn
460    }
461}
462
463pub(crate) struct QuicConnectionParams<Tx, M>
464where
465    Tx: DatagramSocketSend + Send + 'static + ?Sized,
466    M: Metrics,
467{
468    pub writer_cfg: WriterConfig,
469    pub initial_pkt: Option<Incoming>,
470    pub shutdown_tx: mpsc::Sender<()>,
471    pub conn_map_cmd_tx: mpsc::UnboundedSender<ConnectionMapCommand>, /* channel that signals connection map changes */
472    pub scid: ConnectionId<'static>,
473    pub cid_generator: Option<SharedConnectionIdGenerator>,
474    pub metrics: M,
475    #[cfg(feature = "perf-quic-listener-metrics")]
476    pub init_rx_time: Option<SystemTime>,
477    pub handshake_info: HandshakeInfo,
478    /// Boxed because this value is moved by-value through several nested
479    /// async state machines. Inlining a [`QuicheConnection`] here would
480    /// duplicate its payload across the future state slots that hold it
481    /// across an `.await`.
482    pub quiche_conn: Box<QuicheConnection>,
483    pub socket: Arc<Tx>,
484    pub local_addr: SocketAddr,
485    pub peer_addr: SocketAddr,
486}
487
488/// Metadata about an established QUIC connection.
489///
490/// While this struct allows access to some facets of a QUIC connection, it
491/// notably does not represent the [quiche::Connection] itself. The crate
492/// handles most interactions with [quiche] internally in a worker task. Users
493/// can only access the connection directly via their [`ApplicationOverQuic`]
494/// implementation.
495///
496/// See the [module-level docs](crate::quic) for an overview of how a QUIC
497/// connection is handled internally.
498pub struct QuicConnection {
499    local_addr: SocketAddr,
500    peer_addr: SocketAddr,
501    audit_log_stats: Arc<QuicAuditStats>,
502    stats: QuicConnectionStatsShared,
503    scid: ConnectionId<'static>,
504}
505
506impl QuicConnection {
507    /// The local address this connection listens on.
508    #[inline]
509    pub fn local_addr(&self) -> SocketAddr {
510        self.local_addr
511    }
512
513    /// The remote address for this connection.
514    #[inline]
515    pub fn peer_addr(&self) -> SocketAddr {
516        self.peer_addr
517    }
518
519    /// A handle to the [`QuicAuditStats`] for this connection.
520    ///
521    /// # Note
522    /// These stats are updated during the lifetime of the connection.
523    /// The getter exists to grab a handle early on, which can then
524    /// be stowed away and read out after the connection has closed.
525    #[inline]
526    pub fn audit_log_stats(&self) -> &Arc<QuicAuditStats> {
527        &self.audit_log_stats
528    }
529
530    /// A handle to the [`QuicConnectionStats`] for this connection.
531    ///
532    /// # Note
533    /// Initially, these stats represent the state when the [quiche::Connection]
534    /// was created. They are updated when the connection is closed, so this
535    /// getter exists primarily to grab a handle early on.
536    #[inline]
537    pub fn stats(&self) -> &QuicConnectionStatsShared {
538        &self.stats
539    }
540
541    /// The QUIC source connection ID used by this connection.
542    #[inline]
543    pub fn scid(&self) -> &ConnectionId<'static> {
544        &self.scid
545    }
546}
547
548impl AsSocketStats for QuicConnection {
549    #[inline]
550    fn as_socket_stats(&self) -> SocketStats {
551        // It is important to note that those stats are only updated when
552        // the connection stops, which is fine, since this is only used to
553        // log after the connection is finished.
554        self.stats.lock().unwrap().as_socket_stats()
555    }
556
557    #[inline]
558    fn as_quic_stats(&self) -> Option<&Arc<QuicAuditStats>> {
559        Some(&self.audit_log_stats)
560    }
561}
562
563impl<Tx, M> AsSocketStats for InitialQuicConnection<Tx, M>
564where
565    Tx: DatagramSocketSend + Send + 'static + ?Sized,
566    M: Metrics,
567{
568    #[inline]
569    fn as_socket_stats(&self) -> SocketStats {
570        // It is important to note that those stats are only updated when
571        // the connection stops, which is fine, since this is only used to
572        // log after the connection is finished.
573        self.stats.lock().unwrap().as_socket_stats()
574    }
575
576    #[inline]
577    fn as_quic_stats(&self) -> Option<&Arc<QuicAuditStats>> {
578        Some(&self.audit_log_stats)
579    }
580}
581
582impl<Tx, M> ShutdownConnection for InitialQuicConnection<Tx, M>
583where
584    Tx: DatagramSocketSend + Send + 'static + ?Sized,
585    M: Metrics,
586{
587    #[inline]
588    fn poll_shutdown(
589        &mut self, _cx: &mut std::task::Context,
590    ) -> std::task::Poll<io::Result<()>> {
591        // TODO: Does nothing at the moment. We always call Self::start
592        // anyway so it's not really important at this moment.
593        Poll::Ready(Ok(()))
594    }
595}
596
597impl ShutdownConnection for QuicConnection {
598    #[inline]
599    fn poll_shutdown(
600        &mut self, _cx: &mut std::task::Context,
601    ) -> std::task::Poll<io::Result<()>> {
602        // TODO: does nothing at the moment
603        Poll::Ready(Ok(()))
604    }
605}
606
607/// Details about a connection's QUIC handshake.
608#[derive(Debug, Clone)]
609pub struct HandshakeInfo {
610    /// The time at which the connection was created.
611    start_time: Instant,
612    /// The timeout before which the handshake must complete.
613    timeout: Option<Duration>,
614    /// The real duration that the handshake took to complete.
615    time_handshake: Option<Duration>,
616}
617
618impl HandshakeInfo {
619    pub(crate) fn new(start_time: Instant, timeout: Option<Duration>) -> Self {
620        Self {
621            start_time,
622            timeout,
623            time_handshake: None,
624        }
625    }
626
627    /// The time at which the connection was created.
628    #[inline]
629    pub fn start_time(&self) -> Instant {
630        self.start_time
631    }
632
633    /// How long the handshake took to complete.
634    #[inline]
635    pub fn elapsed(&self) -> Duration {
636        self.time_handshake.unwrap_or_default()
637    }
638
639    pub(crate) fn set_elapsed(&mut self) {
640        let elapsed = self.start_time.elapsed();
641        self.time_handshake = Some(elapsed)
642    }
643
644    pub(crate) fn deadline(&self) -> Option<Instant> {
645        self.timeout.map(|timeout| self.start_time + timeout)
646    }
647
648    pub(crate) fn is_expired(&self) -> bool {
649        self.timeout
650            .is_some_and(|timeout| self.start_time.elapsed() >= timeout)
651    }
652}
653
654/// A trait to implement an application served over QUIC.
655///
656/// The application is driven by an internal worker task, which also handles I/O
657/// for the connection. The worker feeds inbound packets into the
658/// [quiche::Connection], calls [`ApplicationOverQuic::process_reads`] followed
659/// by [`ApplicationOverQuic::process_writes`], and then flushes any pending
660/// outbound packets to the network. This repeats in a loop until either the
661/// connection is closed or the [`ApplicationOverQuic`] returns an error.
662///
663/// In between loop iterations, the worker yields until a new packet arrives, a
664/// timer expires, or [`ApplicationOverQuic::wait_for_data`] resolves.
665/// Implementors can interact with the underlying connection via the mutable
666/// reference passed to trait methods.
667#[allow(unused_variables)] // for default functions
668pub trait ApplicationOverQuic: Send + 'static {
669    /// Callback to customize the [`ApplicationOverQuic`] after the QUIC
670    /// handshake completed successfully.
671    ///
672    /// # Errors
673    /// Returning an error from this method immediately stops the worker loop
674    /// and transitions to the connection closing stage.
675    fn on_conn_established(
676        &mut self, qconn: &mut QuicheConnection, handshake_info: &HandshakeInfo,
677    ) -> QuicResult<()>;
678
679    /// Determines whether the application's methods will be called by the
680    /// worker.
681    ///
682    /// The function is checked in each iteration of the worker loop. Only
683    /// `on_conn_established()` and `buffer()` bypass this check.
684    fn should_act(&self) -> bool;
685
686    /// A borrowed buffer for the worker to write outbound packets into.
687    ///
688    /// This method allows sharing a buffer between the worker and the
689    /// application, efficiently using the allocated memory while the
690    /// application is inactive. It can also be used to artificially
691    /// restrict the size of outbound network packets.
692    ///
693    /// Any data in the buffer may be overwritten by the worker. If necessary,
694    /// the application should save the contents when this method is called.
695    fn buffer(&mut self) -> &mut [u8];
696
697    /// Waits for an event to trigger the next iteration of the worker loop.
698    ///
699    /// The returned future is awaited in parallel to inbound packets and the
700    /// connection's timers. Any one of those futures resolving triggers the
701    /// next loop iteration, so implementations should not rely on
702    /// `wait_for_data` for the bulk of their processing. Instead, after
703    /// `wait_for_data` resolves, `process_writes` should be used to pull all
704    /// available data out of the event source (for example, a channel).
705    ///
706    /// As for any future, it is **very important** that this method does not
707    /// block the runtime. If it does, the other concurrent futures will be
708    /// starved.
709    ///
710    /// # Cancel safety
711    /// This method MUST be cancel safe.
712    /// It gets called inside select! and could be (repeatedly) cancelled
713    ///
714    /// # Errors
715    /// Returning an error from this method immediately stops the worker loop
716    /// and transitions to the connection closing stage.
717    fn wait_for_data(
718        &mut self, qconn: &mut QuicheConnection,
719    ) -> impl Future<Output = QuicResult<()>> + Send;
720
721    /// Processes data received on the connection.
722    ///
723    /// This method is only called if `should_act()` returns `true` and any
724    /// packets were received since the last worker loop iteration. It
725    /// should be used to read from the connection's open streams.
726    ///
727    /// # Errors
728    /// Returning an error from this method immediately stops the worker loop
729    /// and transitions to the connection closing stage.
730    fn process_reads(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()>;
731
732    /// Adds data to be sent on the connection.
733    ///
734    /// Unlike `process_reads`, this method is called on every iteration of the
735    /// worker loop (provided `should_act()` returns true). It is called
736    /// after `process_reads` and immediately before packets are pushed to
737    /// the socket. The main use case is providing already-buffered data to
738    /// the [quiche::Connection].
739    ///
740    /// # Errors
741    /// Returning an error from this method immediately stops the worker loop
742    /// and transitions to the connection closing stage.
743    fn process_writes(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()>;
744
745    /// Callback to inspect the result of the worker task, before a final packet
746    /// with a `CONNECTION_CLOSE` frame is flushed to the network.
747    ///
748    /// `connection_result` is [`Ok`] only if the connection was closed without
749    /// any local error. Otherwise, the state of `qconn` depends on the
750    /// error type and application behavior.
751    fn on_conn_close<M: Metrics>(
752        &mut self, qconn: &mut QuicheConnection, metrics: &M,
753        connection_result: &QuicResult<()>,
754    ) {
755    }
756}
757
758/// A command to execute on a [quiche::Connection] in the context of an
759/// [`ApplicationOverQuic`].
760///
761/// We expect most [`ApplicationOverQuic`] implementations (such as
762/// [H3Driver](crate::http3::driver::H3Driver)) will provide some way to submit
763/// actions for them to take, for example via a channel. This enum may be
764/// accepted as part of those actions to inspect or alter the state of the
765/// underlying connection.
766pub enum QuicCommand {
767    /// Close the connection with the given parameters.
768    ///
769    /// Some packets may still be sent after this command has been executed, so
770    /// the worker task may continue running for a bit. See
771    /// [`quiche::Connection::close`] for details.
772    ConnectionClose(ConnectionShutdownBehaviour),
773    /// Execute a custom callback on the connection.
774    Custom(Box<dyn FnOnce(&mut QuicheConnection) + Send + 'static>),
775    /// Collect the current [`SocketStats`] from the connection.
776    ///
777    /// Unlike [`QuicConnection::stats()`], these statistics are not cached and
778    /// instead are retrieved right before the command is executed.
779    Stats(Box<dyn FnOnce(datagram_socket::SocketStats) + Send + 'static>),
780    /// Collect the current [`QuicConnectionStats`] from the connection.
781    ///
782    /// These statistics are not cached and instead are retrieved right before
783    /// the command is executed.
784    ConnectionStats(Box<dyn FnOnce(QuicConnectionStats) + Send + 'static>),
785}
786
787impl QuicCommand {
788    /// Consume the command and perform its operation on `qconn`.
789    ///
790    /// This method should be called by [`ApplicationOverQuic`] implementations
791    /// when they receive a [`QuicCommand`] to execute.
792    pub fn execute(self, qconn: &mut QuicheConnection) {
793        match self {
794            Self::ConnectionClose(behavior) => {
795                let ConnectionShutdownBehaviour {
796                    send_application_close,
797                    error_code,
798                    reason,
799                } = behavior;
800
801                let _ = qconn.close(send_application_close, error_code, &reason);
802            },
803            Self::Custom(f) => {
804                (f)(qconn);
805            },
806            Self::Stats(callback) => {
807                let stats_pair = QuicConnectionStats::from_conn(qconn);
808                (callback)(stats_pair.as_socket_stats());
809            },
810            Self::ConnectionStats(callback) => {
811                let stats_pair = QuicConnectionStats::from_conn(qconn);
812                (callback)(stats_pair);
813            },
814        }
815    }
816}
817
818impl fmt::Debug for QuicCommand {
819    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
820        match self {
821            Self::ConnectionClose(b) =>
822                f.debug_tuple("ConnectionClose").field(b).finish(),
823            Self::Custom(_) => f.debug_tuple("Custom").finish_non_exhaustive(),
824            Self::Stats(_) => f.debug_tuple("Stats").finish_non_exhaustive(),
825            Self::ConnectionStats(_) =>
826                f.debug_tuple("ConnectionStats").finish_non_exhaustive(),
827        }
828    }
829}
830
831/// Parameters to close a [quiche::Connection].
832///
833/// The connection will use these parameters for the `CONNECTION_CLOSE` frame
834/// it sends to its peer.
835#[derive(Debug, Clone)]
836pub struct ConnectionShutdownBehaviour {
837    /// Whether to send an application close or a regular close to the peer.
838    ///
839    /// If this is true but the connection is not in a state where it is safe to
840    /// send an application error (not established nor in early data), in
841    /// accordance with [RFC 9000](https://www.rfc-editor.org/rfc/rfc9000.html#section-10.2.3-3), the
842    /// error code is changed to `APPLICATION_ERROR` and the reason phrase is
843    /// cleared.
844    pub send_application_close: bool,
845    /// The [QUIC][proto-err] or [application-level][app-err] error code to send
846    /// to the peer.
847    ///
848    /// [proto-err]: https://www.rfc-editor.org/rfc/rfc9000.html#section-20.1
849    /// [app-err]: https://www.rfc-editor.org/rfc/rfc9000.html#section-20.2
850    pub error_code: u64,
851    /// The reason phrase to send to the peer.
852    pub reason: Vec<u8>,
853}