tokio_quiche/quic/connection/
mod.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27mod error;
28mod id;
29mod map;
30
31pub use self::error::HandshakeError;
32pub use self::id::ConnectionIdGenerator;
33pub use self::id::SimpleConnectionIdGenerator;
34pub(crate) use self::map::ConnectionMap;
35
36use boring::ssl::SslRef;
37use datagram_socket::AsSocketStats;
38use datagram_socket::DatagramSocketSend;
39use datagram_socket::MaybeConnectedSocket;
40use datagram_socket::QuicAuditStats;
41use datagram_socket::ShutdownConnection;
42use datagram_socket::SocketStats;
43use foundations::telemetry::log;
44use futures::future::BoxFuture;
45use futures::Future;
46use quiche::ConnectionId;
47use std::fmt;
48use std::io;
49use std::net::SocketAddr;
50use std::sync::Arc;
51use std::sync::Mutex;
52use std::task::Poll;
53use std::time::Duration;
54use std::time::Instant;
55use std::time::SystemTime;
56use tokio::sync::mpsc;
57use tokio_util::task::AbortOnDropHandle;
58
59use self::error::make_handshake_result;
60use super::io::connection_stage::Close;
61use super::io::connection_stage::ConnectionStageContext;
62use super::io::connection_stage::Handshake;
63use super::io::connection_stage::RunningApplication;
64use super::io::worker::Closing;
65use super::io::worker::IoWorkerParams;
66use super::io::worker::Running;
67use super::io::worker::RunningOrClosing;
68use super::io::worker::WriteState;
69use super::QuicheConnection;
70use crate::buf_factory::PooledBuf;
71use crate::metrics::Metrics;
72use crate::quic::io::worker::IoWorker;
73use crate::quic::io::worker::WriterConfig;
74use crate::quic::io::worker::INCOMING_QUEUE_SIZE;
75use crate::quic::router::ConnectionMapCommand;
76use crate::QuicResult;
77
78/// Wrapper for connection statistics recorded by [quiche].
79#[derive(Debug)]
80pub struct QuicConnectionStats {
81    /// Aggregate connection statistics across all paths.
82    pub stats: quiche::Stats,
83    /// Specific statistics about the connection's active path.
84    pub path_stats: Option<quiche::PathStats>,
85}
86pub(crate) type QuicConnectionStatsShared = Arc<Mutex<QuicConnectionStats>>;
87
88impl QuicConnectionStats {
89    pub(crate) fn from_conn(qconn: &QuicheConnection) -> Self {
90        Self {
91            stats: qconn.stats(),
92            path_stats: qconn.path_stats().next(),
93        }
94    }
95
96    fn startup_exit_to_socket_stats(
97        value: quiche::StartupExit,
98    ) -> datagram_socket::StartupExit {
99        let reason = match value.reason {
100            quiche::StartupExitReason::Loss =>
101                datagram_socket::StartupExitReason::Loss,
102            quiche::StartupExitReason::BandwidthPlateau =>
103                datagram_socket::StartupExitReason::BandwidthPlateau,
104            quiche::StartupExitReason::PersistentQueue =>
105                datagram_socket::StartupExitReason::PersistentQueue,
106        };
107
108        datagram_socket::StartupExit {
109            cwnd: value.cwnd,
110            bandwidth: value.bandwidth,
111            reason,
112        }
113    }
114}
115
116impl AsSocketStats for QuicConnectionStats {
117    fn as_socket_stats(&self) -> SocketStats {
118        SocketStats {
119            pmtu: self
120                .path_stats
121                .as_ref()
122                .map(|p| p.pmtu as u16)
123                .unwrap_or_default(),
124            rtt_us: self
125                .path_stats
126                .as_ref()
127                .map(|p| p.rtt.as_micros() as i64)
128                .unwrap_or_default(),
129            min_rtt_us: self
130                .path_stats
131                .as_ref()
132                .and_then(|p| p.min_rtt.map(|x| x.as_micros() as i64))
133                .unwrap_or_default(),
134            max_rtt_us: self
135                .path_stats
136                .as_ref()
137                .and_then(|p| p.max_rtt.map(|x| x.as_micros() as i64))
138                .unwrap_or_default(),
139            rtt_var_us: self
140                .path_stats
141                .as_ref()
142                .map(|p| p.rttvar.as_micros() as i64)
143                .unwrap_or_default(),
144            cwnd: self
145                .path_stats
146                .as_ref()
147                .map(|p| p.cwnd as u64)
148                .unwrap_or_default(),
149            total_pto_count: self
150                .path_stats
151                .as_ref()
152                .map(|p| p.total_pto_count as u64)
153                .unwrap_or_default(),
154            packets_sent: self.stats.sent as u64,
155            packets_recvd: self.stats.recv as u64,
156            packets_lost: self.stats.lost as u64,
157            packets_lost_spurious: self.stats.spurious_lost as u64,
158            packets_retrans: self.stats.retrans as u64,
159            bytes_sent: self.stats.sent_bytes,
160            bytes_recvd: self.stats.recv_bytes,
161            bytes_lost: self.stats.lost_bytes,
162            bytes_retrans: self.stats.stream_retrans_bytes,
163            bytes_unsent: 0, /* not implemented yet, kept for compatibility
164                              * with TCP */
165            delivery_rate: self
166                .path_stats
167                .as_ref()
168                .map(|p| p.delivery_rate)
169                .unwrap_or_default(),
170            max_bandwidth: self.path_stats.as_ref().and_then(|p| p.max_bandwidth),
171            startup_exit: self
172                .path_stats
173                .as_ref()
174                .and_then(|p| p.startup_exit)
175                .map(QuicConnectionStats::startup_exit_to_socket_stats),
176            bytes_in_flight_duration_us: self
177                .stats
178                .bytes_in_flight_duration
179                .as_micros() as u64,
180        }
181    }
182}
183
184/// A received network packet with additional metadata.
185#[derive(Debug)]
186pub struct Incoming {
187    /// The address that sent the inbound packet.
188    pub peer_addr: SocketAddr,
189    /// The address on which we received the inbound packet.
190    pub local_addr: SocketAddr,
191    /// The receive timestamp of the packet.
192    ///
193    /// Used for the `perf-quic-listener-metrics` feature.
194    pub rx_time: Option<SystemTime>,
195    /// The packet's contents.
196    pub buf: PooledBuf,
197    /// If set, then `buf` is a GRO buffer containing multiple packets.
198    /// Each individual packet has a size of `gso` (except for the last one).
199    pub gro: Option<u16>,
200}
201
202/// A QUIC connection that has not performed a handshake yet.
203///
204/// This type is currently only used for server-side connections. It is created
205/// and added to the listener's connection stream after an initial packet from
206/// a client has been received and (optionally) the client's IP address has been
207/// validated.
208///
209/// To turn the initial connection into a fully established one, a QUIC
210/// handshake must be performed. Users have multiple options to facilitate this:
211/// - `start` is a simple entrypoint which spawns a task to handle the entire
212///   lifetime of the QUIC connection. The caller can then only communicate with
213///   the connection via their [`ApplicationOverQuic`].
214/// - `handshake` spawns a task for the handshake and awaits its completion.
215///   Afterwards, it pauses the connection and allows the caller to resume it
216///   later via an opaque struct. We spawn a separate task to allow the tokio
217///   scheduler free choice in where to run the handshake.
218/// - `handshake_fut` returns a future to drive the handshake for maximum
219///   flexibility.
220#[must_use = "call InitialQuicConnection::start to establish the connection"]
221pub struct InitialQuicConnection<Tx, M>
222where
223    Tx: DatagramSocketSend + Send + 'static + ?Sized,
224    M: Metrics,
225{
226    /// An internal ID, to uniquely identify the connection across multiple QUIC
227    /// connection IDs.
228    pub(crate) id: u64,
229    params: QuicConnectionParams<Tx, M>,
230    pub(crate) audit_log_stats: Arc<QuicAuditStats>,
231    stats: QuicConnectionStatsShared,
232    pub(crate) incoming_ev_sender: mpsc::Sender<Incoming>,
233    incoming_ev_receiver: mpsc::Receiver<Incoming>,
234}
235
236impl<Tx, M> InitialQuicConnection<Tx, M>
237where
238    Tx: DatagramSocketSend + Send + 'static + ?Sized,
239    M: Metrics,
240{
241    #[inline]
242    pub(crate) fn new(params: QuicConnectionParams<Tx, M>) -> Self {
243        let (incoming_ev_sender, incoming_ev_receiver) =
244            mpsc::channel(INCOMING_QUEUE_SIZE);
245        let audit_log_stats = Arc::new(QuicAuditStats::new(params.scid.to_vec()));
246
247        let stats = Arc::new(Mutex::new(QuicConnectionStats::from_conn(
248            &params.quiche_conn,
249        )));
250
251        Self {
252            id: Self::generate_id(),
253            params,
254            audit_log_stats,
255            stats,
256            incoming_ev_sender,
257            incoming_ev_receiver,
258        }
259    }
260
261    /// The local address this connection listens on.
262    pub fn local_addr(&self) -> SocketAddr {
263        self.params.local_addr
264    }
265
266    /// The remote address for this connection.
267    pub fn peer_addr(&self) -> SocketAddr {
268        self.params.peer_addr
269    }
270
271    /// [boring]'s SSL object for this connection.
272    #[doc(hidden)]
273    pub fn ssl_mut(&mut self) -> &mut SslRef {
274        self.params.quiche_conn.as_mut()
275    }
276
277    /// A handle to the [`QuicAuditStats`] for this connection.
278    ///
279    /// # Note
280    /// These stats are updated during the lifetime of the connection.
281    /// The getter exists to grab a handle early on, which can then
282    /// be stowed away and read out after the connection has closed.
283    #[inline]
284    pub fn audit_log_stats(&self) -> Arc<QuicAuditStats> {
285        Arc::clone(&self.audit_log_stats)
286    }
287
288    /// A handle to the [`QuicConnectionStats`] for this connection.
289    ///
290    /// # Note
291    /// Initially, these stats represent the state when the [quiche::Connection]
292    /// was created. They are updated when the connection is closed, so this
293    /// getter exists primarily to grab a handle early on.
294    #[inline]
295    pub fn stats(&self) -> &QuicConnectionStatsShared {
296        &self.stats
297    }
298
299    /// Creates a future to drive the connection's handshake.
300    ///
301    /// This is a lower-level alternative to the `handshake` function which
302    /// gives the caller more control over execution of the future. See
303    /// `handshake` for details on the return values.
304    #[allow(clippy::type_complexity)]
305    pub fn handshake_fut<A: ApplicationOverQuic>(
306        self, app: A,
307    ) -> (
308        QuicConnection,
309        BoxFuture<'static, io::Result<Running<Arc<Tx>, M, A>>>,
310    ) {
311        self.params.metrics.connections_in_memory().inc();
312
313        let conn = QuicConnection {
314            local_addr: self.params.local_addr,
315            peer_addr: self.params.peer_addr,
316            audit_log_stats: Arc::clone(&self.audit_log_stats),
317            stats: Arc::clone(&self.stats),
318            scid: self.params.scid,
319        };
320        let context = ConnectionStageContext {
321            in_pkt: self.params.initial_pkt,
322            incoming_pkt_receiver: self.incoming_ev_receiver,
323            application: app,
324            stats: Arc::clone(&self.stats),
325        };
326        let conn_stage = Handshake {
327            handshake_info: self.params.handshake_info,
328        };
329        let params = IoWorkerParams {
330            socket: MaybeConnectedSocket::new(self.params.socket),
331            shutdown_tx: self.params.shutdown_tx,
332            cfg: self.params.writer_cfg,
333            audit_log_stats: self.audit_log_stats,
334            write_state: WriteState::default(),
335            conn_map_cmd_tx: self.params.conn_map_cmd_tx,
336            #[cfg(feature = "perf-quic-listener-metrics")]
337            init_rx_time: self.params.init_rx_time,
338            metrics: self.params.metrics.clone(),
339        };
340
341        let handshake_fut = async move {
342            let qconn = self.params.quiche_conn;
343            let handshake_done =
344                IoWorker::new(params, conn_stage).run(qconn, context).await;
345
346            match handshake_done {
347                RunningOrClosing::Running(r) => Ok(r),
348                RunningOrClosing::Closing(Closing {
349                    params,
350                    work_loop_result,
351                    mut context,
352                    mut qconn,
353                }) => {
354                    let hs_result = make_handshake_result(&work_loop_result);
355                    IoWorker::new(params, Close { work_loop_result })
356                        .close(&mut qconn, &mut context)
357                        .await;
358                    hs_result
359                },
360            }
361        };
362
363        (conn, Box::pin(handshake_fut))
364    }
365
366    /// Performs the QUIC handshake in a separate tokio task and awaits its
367    /// completion.
368    ///
369    /// The returned [`QuicConnection`] holds metadata about the established
370    /// connection. The connection itself is paused after `handshake`
371    /// returns and must be resumed by passing the opaque `Running` value to
372    /// [`InitialQuicConnection::resume`]. This two-step process
373    /// allows callers to collect telemetry and run code before serving their
374    /// [`ApplicationOverQuic`].
375    pub async fn handshake<A: ApplicationOverQuic>(
376        self, app: A,
377    ) -> io::Result<(QuicConnection, Running<Arc<Tx>, M, A>)> {
378        let task_metrics = self.params.metrics.clone();
379        let (conn, handshake_fut) = Self::handshake_fut(self, app);
380
381        let handshake_handle = crate::metrics::tokio_task::spawn(
382            "quic_handshake_worker",
383            task_metrics,
384            handshake_fut,
385        );
386
387        // `AbortOnDropHandle` simulates task-killswitch behavior without needing
388        // to give up ownership of the `JoinHandle`.
389        let handshake_abort_handle = AbortOnDropHandle::new(handshake_handle);
390
391        let worker = handshake_abort_handle.await??;
392
393        Ok((conn, worker))
394    }
395
396    /// Resumes a QUIC connection which was paused after a successful handshake.
397    pub fn resume<A: ApplicationOverQuic>(pre_running: Running<Arc<Tx>, M, A>) {
398        let task_metrics = pre_running.params.metrics.clone();
399        let fut = async move {
400            let Running {
401                params,
402                context,
403                qconn,
404            } = pre_running;
405            let running_worker = IoWorker::new(params, RunningApplication);
406
407            let Closing {
408                params,
409                mut context,
410                work_loop_result,
411                mut qconn,
412            } = running_worker.run(qconn, context).await;
413
414            IoWorker::new(params, Close { work_loop_result })
415                .close(&mut qconn, &mut context)
416                .await;
417        };
418
419        crate::metrics::tokio_task::spawn_with_killswitch(
420            "quic_io_worker",
421            task_metrics,
422            fut,
423        );
424    }
425
426    /// Drives a QUIC connection from handshake to close in separate tokio
427    /// tasks.
428    ///
429    /// It combines [`InitialQuicConnection::handshake`] and
430    /// [`InitialQuicConnection::resume`] into a single call.
431    pub fn start<A: ApplicationOverQuic>(self, app: A) -> QuicConnection {
432        let task_metrics = self.params.metrics.clone();
433        let (conn, handshake_fut) = Self::handshake_fut(self, app);
434
435        let fut = async move {
436            match handshake_fut.await {
437                Ok(running) => Self::resume(running),
438                Err(e) =>
439                    log::error!("QUIC handshake failed in IQC::start"; "error" => e),
440            }
441        };
442
443        crate::metrics::tokio_task::spawn_with_killswitch(
444            "quic_handshake_worker",
445            task_metrics,
446            fut,
447        );
448
449        conn
450    }
451
452    fn generate_id() -> u64 {
453        let mut buf = [0; 8];
454
455        boring::rand::rand_bytes(&mut buf).unwrap();
456
457        u64::from_ne_bytes(buf)
458    }
459}
460
461pub(crate) struct QuicConnectionParams<Tx, M>
462where
463    Tx: DatagramSocketSend + Send + 'static + ?Sized,
464    M: Metrics,
465{
466    pub writer_cfg: WriterConfig,
467    pub initial_pkt: Option<Incoming>,
468    pub shutdown_tx: mpsc::Sender<()>,
469    pub conn_map_cmd_tx: mpsc::UnboundedSender<ConnectionMapCommand>, /* channel that signals connection map changes */
470    pub scid: ConnectionId<'static>,
471    pub metrics: M,
472    #[cfg(feature = "perf-quic-listener-metrics")]
473    pub init_rx_time: Option<SystemTime>,
474    pub handshake_info: HandshakeInfo,
475    pub quiche_conn: QuicheConnection,
476    pub socket: Arc<Tx>,
477    pub local_addr: SocketAddr,
478    pub peer_addr: SocketAddr,
479}
480
481/// Metadata about an established QUIC connection.
482///
483/// While this struct allows access to some facets of a QUIC connection, it
484/// notably does not represent the [quiche::Connection] itself. The crate
485/// handles most interactions with [quiche] internally in a worker task. Users
486/// can only access the connection directly via their [`ApplicationOverQuic`]
487/// implementation.
488///
489/// See the [module-level docs](crate::quic) for an overview of how a QUIC
490/// connection is handled internally.
491pub struct QuicConnection {
492    local_addr: SocketAddr,
493    peer_addr: SocketAddr,
494    audit_log_stats: Arc<QuicAuditStats>,
495    stats: QuicConnectionStatsShared,
496    scid: ConnectionId<'static>,
497}
498
499impl QuicConnection {
500    /// The local address this connection listens on.
501    #[inline]
502    pub fn local_addr(&self) -> SocketAddr {
503        self.local_addr
504    }
505
506    /// The remote address for this connection.
507    #[inline]
508    pub fn peer_addr(&self) -> SocketAddr {
509        self.peer_addr
510    }
511
512    /// A handle to the [`QuicAuditStats`] for this connection.
513    ///
514    /// # Note
515    /// These stats are updated during the lifetime of the connection.
516    /// The getter exists to grab a handle early on, which can then
517    /// be stowed away and read out after the connection has closed.
518    #[inline]
519    pub fn audit_log_stats(&self) -> &Arc<QuicAuditStats> {
520        &self.audit_log_stats
521    }
522
523    /// A handle to the [`QuicConnectionStats`] for this connection.
524    ///
525    /// # Note
526    /// Initially, these stats represent the state when the [quiche::Connection]
527    /// was created. They are updated when the connection is closed, so this
528    /// getter exists primarily to grab a handle early on.
529    #[inline]
530    pub fn stats(&self) -> &QuicConnectionStatsShared {
531        &self.stats
532    }
533
534    /// The QUIC source connection ID used by this connection.
535    #[inline]
536    pub fn scid(&self) -> &ConnectionId<'static> {
537        &self.scid
538    }
539}
540
541impl AsSocketStats for QuicConnection {
542    #[inline]
543    fn as_socket_stats(&self) -> SocketStats {
544        // It is important to note that those stats are only updated when
545        // the connection stops, which is fine, since this is only used to
546        // log after the connection is finished.
547        self.stats.lock().unwrap().as_socket_stats()
548    }
549
550    #[inline]
551    fn as_quic_stats(&self) -> Option<&Arc<QuicAuditStats>> {
552        Some(&self.audit_log_stats)
553    }
554}
555
556impl<Tx, M> AsSocketStats for InitialQuicConnection<Tx, M>
557where
558    Tx: DatagramSocketSend + Send + 'static + ?Sized,
559    M: Metrics,
560{
561    #[inline]
562    fn as_socket_stats(&self) -> SocketStats {
563        // It is important to note that those stats are only updated when
564        // the connection stops, which is fine, since this is only used to
565        // log after the connection is finished.
566        self.stats.lock().unwrap().as_socket_stats()
567    }
568
569    #[inline]
570    fn as_quic_stats(&self) -> Option<&Arc<QuicAuditStats>> {
571        Some(&self.audit_log_stats)
572    }
573}
574
575impl<Tx, M> ShutdownConnection for InitialQuicConnection<Tx, M>
576where
577    Tx: DatagramSocketSend + Send + 'static + ?Sized,
578    M: Metrics,
579{
580    #[inline]
581    fn poll_shutdown(
582        &mut self, _cx: &mut std::task::Context,
583    ) -> std::task::Poll<io::Result<()>> {
584        // TODO: Does nothing at the moment. We always call Self::start
585        // anyway so it's not really important at this moment.
586        Poll::Ready(Ok(()))
587    }
588}
589
590impl ShutdownConnection for QuicConnection {
591    #[inline]
592    fn poll_shutdown(
593        &mut self, _cx: &mut std::task::Context,
594    ) -> std::task::Poll<io::Result<()>> {
595        // TODO: does nothing at the moment
596        Poll::Ready(Ok(()))
597    }
598}
599
600/// Details about a connection's QUIC handshake.
601#[derive(Debug, Clone)]
602pub struct HandshakeInfo {
603    /// The time at which the connection was created.
604    start_time: Instant,
605    /// The timeout before which the handshake must complete.
606    timeout: Option<Duration>,
607    /// The real duration that the handshake took to complete.
608    time_handshake: Option<Duration>,
609}
610
611impl HandshakeInfo {
612    pub(crate) fn new(start_time: Instant, timeout: Option<Duration>) -> Self {
613        Self {
614            start_time,
615            timeout,
616            time_handshake: None,
617        }
618    }
619
620    /// The time at which the connection was created.
621    #[inline]
622    pub fn start_time(&self) -> Instant {
623        self.start_time
624    }
625
626    /// How long the handshake took to complete.
627    #[inline]
628    pub fn elapsed(&self) -> Duration {
629        self.time_handshake.unwrap_or_default()
630    }
631
632    pub(crate) fn set_elapsed(&mut self) {
633        let elapsed = self.start_time.elapsed();
634        self.time_handshake = Some(elapsed)
635    }
636
637    pub(crate) fn deadline(&self) -> Option<Instant> {
638        self.timeout.map(|timeout| self.start_time + timeout)
639    }
640
641    pub(crate) fn is_expired(&self) -> bool {
642        self.timeout
643            .is_some_and(|timeout| self.start_time.elapsed() >= timeout)
644    }
645}
646
647/// A trait to implement an application served over QUIC.
648///
649/// The application is driven by an internal worker task, which also handles I/O
650/// for the connection. The worker feeds inbound packets into the
651/// [quiche::Connection], calls [`ApplicationOverQuic::process_reads`] followed
652/// by [`ApplicationOverQuic::process_writes`], and then flushes any pending
653/// outbound packets to the network. This repeats in a loop until either the
654/// connection is closed or the [`ApplicationOverQuic`] returns an error.
655///
656/// In between loop iterations, the worker yields until a new packet arrives, a
657/// timer expires, or [`ApplicationOverQuic::wait_for_data`] resolves.
658/// Implementors can interact with the underlying connection via the mutable
659/// reference passed to trait methods.
660#[allow(unused_variables)] // for default functions
661pub trait ApplicationOverQuic: Send + 'static {
662    /// Callback to customize the [`ApplicationOverQuic`] after the QUIC
663    /// handshake completed successfully.
664    ///
665    /// # Errors
666    /// Returning an error from this method immediately stops the worker loop
667    /// and transitions to the connection closing stage.
668    fn on_conn_established(
669        &mut self, qconn: &mut QuicheConnection, handshake_info: &HandshakeInfo,
670    ) -> QuicResult<()>;
671
672    /// Determines whether the application's methods will be called by the
673    /// worker.
674    ///
675    /// The function is checked in each iteration of the worker loop. Only
676    /// `on_conn_established()` and `buffer()` bypass this check.
677    fn should_act(&self) -> bool;
678
679    /// A borrowed buffer for the worker to write outbound packets into.
680    ///
681    /// This method allows sharing a buffer between the worker and the
682    /// application, efficiently using the allocated memory while the
683    /// application is inactive. It can also be used to artificially
684    /// restrict the size of outbound network packets.
685    ///
686    /// Any data in the buffer may be overwritten by the worker. If necessary,
687    /// the application should save the contents when this method is called.
688    fn buffer(&mut self) -> &mut [u8];
689
690    /// Waits for an event to trigger the next iteration of the worker loop.
691    ///
692    /// The returned future is awaited in parallel to inbound packets and the
693    /// connection's timers. Any one of those futures resolving triggers the
694    /// next loop iteration, so implementations should not rely on
695    /// `wait_for_data` for the bulk of their processing. Instead, after
696    /// `wait_for_data` resolves, `process_writes` should be used to pull all
697    /// available data out of the event source (for example, a channel).
698    ///
699    /// As for any future, it is **very important** that this method does not
700    /// block the runtime. If it does, the other concurrent futures will be
701    /// starved.
702    ///
703    /// # Errors
704    /// Returning an error from this method immediately stops the worker loop
705    /// and transitions to the connection closing stage.
706    fn wait_for_data(
707        &mut self, qconn: &mut QuicheConnection,
708    ) -> impl Future<Output = QuicResult<()>> + Send;
709
710    /// Processes data received on the connection.
711    ///
712    /// This method is only called if `should_act()` returns `true` and any
713    /// packets were received since the last worker loop iteration. It
714    /// should be used to read from the connection's open streams.
715    ///
716    /// # Errors
717    /// Returning an error from this method immediately stops the worker loop
718    /// and transitions to the connection closing stage.
719    fn process_reads(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()>;
720
721    /// Adds data to be sent on the connection.
722    ///
723    /// Unlike `process_reads`, this method is called on every iteration of the
724    /// worker loop (provided `should_act()` returns true). It is called
725    /// after `process_reads` and immediately before packets are pushed to
726    /// the socket. The main use case is providing already-buffered data to
727    /// the [quiche::Connection].
728    ///
729    /// # Errors
730    /// Returning an error from this method immediately stops the worker loop
731    /// and transitions to the connection closing stage.
732    fn process_writes(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()>;
733
734    /// Callback to inspect the result of the worker task, before a final packet
735    /// with a `CONNECTION_CLOSE` frame is flushed to the network.
736    ///
737    /// `connection_result` is [`Ok`] only if the connection was closed without
738    /// any local error. Otherwise, the state of `qconn` depends on the
739    /// error type and application behavior.
740    fn on_conn_close<M: Metrics>(
741        &mut self, qconn: &mut QuicheConnection, metrics: &M,
742        connection_result: &QuicResult<()>,
743    ) {
744    }
745}
746
747/// A command to execute on a [quiche::Connection] in the context of an
748/// [`ApplicationOverQuic`].
749///
750/// We expect most [`ApplicationOverQuic`] implementations (such as
751/// [H3Driver](crate::http3::driver::H3Driver)) will provide some way to submit
752/// actions for them to take, for example via a channel. This enum may be
753/// accepted as part of those actions to inspect or alter the state of the
754/// underlying connection.
755pub enum QuicCommand {
756    /// Close the connection with the given parameters.
757    ///
758    /// Some packets may still be sent after this command has been executed, so
759    /// the worker task may continue running for a bit. See
760    /// [`quiche::Connection::close`] for details.
761    ConnectionClose(ConnectionShutdownBehaviour),
762    /// Execute a custom callback on the connection.
763    Custom(Box<dyn FnOnce(&mut QuicheConnection) + Send + 'static>),
764    /// Collect the current [`SocketStats`] from the connection.
765    ///
766    /// Unlike [`QuicConnection::stats()`], these statistics are not cached and
767    /// instead are retrieved right before the command is executed.
768    Stats(Box<dyn FnOnce(datagram_socket::SocketStats) + Send + 'static>),
769}
770
771impl QuicCommand {
772    /// Consume the command and perform its operation on `qconn`.
773    ///
774    /// This method should be called by [`ApplicationOverQuic`] implementations
775    /// when they receive a [`QuicCommand`] to execute.
776    pub fn execute(self, qconn: &mut QuicheConnection) {
777        match self {
778            Self::ConnectionClose(behavior) => {
779                let ConnectionShutdownBehaviour {
780                    send_application_close,
781                    error_code,
782                    reason,
783                } = behavior;
784
785                let _ = qconn.close(send_application_close, error_code, &reason);
786            },
787            Self::Custom(f) => {
788                (f)(qconn);
789            },
790            Self::Stats(callback) => {
791                let stats_pair = QuicConnectionStats::from_conn(qconn);
792                (callback)(stats_pair.as_socket_stats());
793            },
794        }
795    }
796}
797
798impl fmt::Debug for QuicCommand {
799    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
800        match self {
801            Self::ConnectionClose(b) =>
802                f.debug_tuple("ConnectionClose").field(b).finish(),
803            Self::Custom(_) => f.debug_tuple("Custom").finish_non_exhaustive(),
804            Self::Stats(_) => f.debug_tuple("Stats").finish_non_exhaustive(),
805        }
806    }
807}
808
809/// Parameters to close a [quiche::Connection].
810///
811/// The connection will use these parameters for the `CONNECTION_CLOSE` frame
812/// it sends to its peer.
813#[derive(Debug, Clone)]
814pub struct ConnectionShutdownBehaviour {
815    /// Whether to send an application close or a regular close to the peer.
816    ///
817    /// If this is true but the connection is not in a state where it is safe to
818    /// send an application error (not established nor in early data), in
819    /// accordance with [RFC 9000](https://www.rfc-editor.org/rfc/rfc9000.html#section-10.2.3-3), the
820    /// error code is changed to `APPLICATION_ERROR` and the reason phrase is
821    /// cleared.
822    pub send_application_close: bool,
823    /// The [QUIC][proto-err] or [application-level][app-err] error code to send
824    /// to the peer.
825    ///
826    /// [proto-err]: https://www.rfc-editor.org/rfc/rfc9000.html#section-20.1
827    /// [app-err]: https://www.rfc-editor.org/rfc/rfc9000.html#section-20.2
828    pub error_code: u64,
829    /// The reason phrase to send to the peer.
830    pub reason: Vec<u8>,
831}