tokio_quiche/quic/connection/
mod.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27mod error;
28mod id;
29mod map;
30
31pub use self::error::HandshakeError;
32pub use self::id::ConnectionIdGenerator;
33pub use self::id::SimpleConnectionIdGenerator;
34pub(crate) use self::map::ConnectionMap;
35
36use boring::ssl::SslRef;
37use datagram_socket::AsSocketStats;
38use datagram_socket::DatagramSocketSend;
39use datagram_socket::MaybeConnectedSocket;
40use datagram_socket::QuicAuditStats;
41use datagram_socket::ShutdownConnection;
42use datagram_socket::SocketStats;
43use foundations::telemetry::log;
44use futures::future::BoxFuture;
45use futures::Future;
46use quiche::ConnectionId;
47use std::fmt;
48use std::io;
49use std::net::SocketAddr;
50use std::sync::Arc;
51use std::sync::Mutex;
52use std::task::Poll;
53use std::time::Duration;
54use std::time::Instant;
55use std::time::SystemTime;
56use tokio::sync::mpsc;
57use tokio_util::task::AbortOnDropHandle;
58
59use self::error::make_handshake_result;
60use super::io::connection_stage::Close;
61use super::io::connection_stage::ConnectionStageContext;
62use super::io::connection_stage::Handshake;
63use super::io::connection_stage::RunningApplication;
64use super::io::worker::Closing;
65use super::io::worker::IoWorkerParams;
66use super::io::worker::Running;
67use super::io::worker::RunningOrClosing;
68use super::io::worker::WriteState;
69use super::QuicheConnection;
70use crate::buf_factory::PooledBuf;
71use crate::metrics::Metrics;
72use crate::quic::io::worker::IoWorker;
73use crate::quic::io::worker::WriterConfig;
74use crate::quic::io::worker::INCOMING_QUEUE_SIZE;
75use crate::quic::router::ConnectionMapCommand;
76use crate::QuicResult;
77
/// Wrapper for connection statistics recorded by [quiche].
///
/// A value of this type is a plain snapshot of the numbers reported by the
/// connection at the time it was built.
#[derive(Debug)]
pub struct QuicConnectionStats {
    /// Aggregate connection statistics across all paths.
    pub stats: quiche::Stats,
    /// Specific statistics about the connection's active path.
    ///
    /// `from_conn` fills this with the first entry yielded by the connection's
    /// `path_stats()` iterator, so it is `None` when that iterator is empty.
    pub path_stats: Option<quiche::PathStats>,
}
/// A [`QuicConnectionStats`] snapshot behind an `Arc<Mutex<_>>`, so several
/// owners can hold a handle to the same value.
pub(crate) type QuicConnectionStatsShared = Arc<Mutex<QuicConnectionStats>>;
87
88impl QuicConnectionStats {
89    pub(crate) fn from_conn(qconn: &QuicheConnection) -> Self {
90        Self {
91            stats: qconn.stats(),
92            path_stats: qconn.path_stats().next(),
93        }
94    }
95
96    fn startup_exit_to_socket_stats(
97        value: quiche::StartupExit,
98    ) -> datagram_socket::StartupExit {
99        let reason = match value.reason {
100            quiche::StartupExitReason::Loss =>
101                datagram_socket::StartupExitReason::Loss,
102            quiche::StartupExitReason::BandwidthPlateau =>
103                datagram_socket::StartupExitReason::BandwidthPlateau,
104            quiche::StartupExitReason::PersistentQueue =>
105                datagram_socket::StartupExitReason::PersistentQueue,
106        };
107
108        datagram_socket::StartupExit {
109            cwnd: value.cwnd,
110            reason,
111        }
112    }
113}
114
115impl AsSocketStats for QuicConnectionStats {
116    fn as_socket_stats(&self) -> SocketStats {
117        SocketStats {
118            pmtu: self
119                .path_stats
120                .as_ref()
121                .map(|p| p.pmtu as u16)
122                .unwrap_or_default(),
123            rtt_us: self
124                .path_stats
125                .as_ref()
126                .map(|p| p.rtt.as_micros() as i64)
127                .unwrap_or_default(),
128            min_rtt_us: self
129                .path_stats
130                .as_ref()
131                .and_then(|p| p.min_rtt.map(|x| x.as_micros() as i64))
132                .unwrap_or_default(),
133            max_rtt_us: self
134                .path_stats
135                .as_ref()
136                .and_then(|p| p.max_rtt.map(|x| x.as_micros() as i64))
137                .unwrap_or_default(),
138            rtt_var_us: self
139                .path_stats
140                .as_ref()
141                .map(|p| p.rttvar.as_micros() as i64)
142                .unwrap_or_default(),
143            cwnd: self
144                .path_stats
145                .as_ref()
146                .map(|p| p.cwnd as u64)
147                .unwrap_or_default(),
148            total_pto_count: self
149                .path_stats
150                .as_ref()
151                .map(|p| p.total_pto_count as u64)
152                .unwrap_or_default(),
153            packets_sent: self.stats.sent as u64,
154            packets_recvd: self.stats.recv as u64,
155            packets_lost: self.stats.lost as u64,
156            packets_lost_spurious: self.stats.spurious_lost as u64,
157            packets_retrans: self.stats.retrans as u64,
158            bytes_sent: self.stats.sent_bytes,
159            bytes_recvd: self.stats.recv_bytes,
160            bytes_lost: self.stats.lost_bytes,
161            bytes_retrans: self.stats.stream_retrans_bytes,
162            bytes_unsent: 0, /* not implemented yet, kept for compatibility
163                              * with TCP */
164            delivery_rate: self
165                .path_stats
166                .as_ref()
167                .map(|p| p.delivery_rate)
168                .unwrap_or_default(),
169            startup_exit: self
170                .path_stats
171                .as_ref()
172                .and_then(|p| p.startup_exit)
173                .map(QuicConnectionStats::startup_exit_to_socket_stats),
174            bytes_in_flight_duration_us: self
175                .stats
176                .bytes_in_flight_duration
177                .as_micros() as u64,
178        }
179    }
180}
181
/// A received network packet with additional metadata.
#[derive(Debug)]
pub struct Incoming {
    /// The address that sent the inbound packet.
    pub peer_addr: SocketAddr,
    /// The address on which we received the inbound packet.
    pub local_addr: SocketAddr,
    /// The receive timestamp of the packet.
    ///
    /// Used for the `perf-quic-listener-metrics` feature.
    pub rx_time: Option<SystemTime>,
    /// The packet's contents.
    pub buf: PooledBuf,
    /// If set, then `buf` is a GRO buffer containing multiple packets.
    /// Each individual packet has a size of `gro` (except for the last one).
    pub gro: Option<u16>,
}
199
/// A QUIC connection that has not performed a handshake yet.
///
/// This type is currently only used for server-side connections. It is created
/// and added to the listener's connection stream after an initial packet from
/// a client has been received and (optionally) the client's IP address has been
/// validated.
///
/// To turn the initial connection into a fully established one, a QUIC
/// handshake must be performed. Users have multiple options to facilitate this:
/// - `start` is a simple entrypoint which spawns a task to handle the entire
///   lifetime of the QUIC connection. The caller can then only communicate with
///   the connection via their [`ApplicationOverQuic`].
/// - `handshake` spawns a task for the handshake and awaits its completion.
///   Afterwards, it pauses the connection and allows the caller to resume it
///   later via an opaque struct. We spawn a separate task to allow the tokio
///   scheduler free choice in where to run the handshake.
/// - `handshake_fut` returns a future to drive the handshake for maximum
///   flexibility.
#[must_use = "call InitialQuicConnection::start to establish the connection"]
pub struct InitialQuicConnection<Tx, M>
where
    Tx: DatagramSocketSend + Send + 'static + ?Sized,
    M: Metrics,
{
    /// An internal ID, to uniquely identify the connection across multiple QUIC
    /// connection IDs.
    pub(crate) id: u64,
    /// Everything required to drive the connection. `handshake_fut` takes this
    /// apart and hands the pieces to the I/O worker.
    params: QuicConnectionParams<Tx, M>,
    /// Audit statistics, created in `new` from the connection's source
    /// connection ID.
    pub(crate) audit_log_stats: Arc<QuicAuditStats>,
    /// Shared statistics snapshot, initialised in `new` from the state of the
    /// [quiche::Connection] at that time.
    stats: QuicConnectionStatsShared,
    /// Sending half of the bounded channel (capacity `INCOMING_QUEUE_SIZE`)
    /// that carries inbound packets for this connection.
    pub(crate) incoming_ev_sender: mpsc::Sender<Incoming>,
    /// Receiving half of the inbound packet channel; moved into the worker's
    /// context by `handshake_fut`.
    incoming_ev_receiver: mpsc::Receiver<Incoming>,
}
233
234impl<Tx, M> InitialQuicConnection<Tx, M>
235where
236    Tx: DatagramSocketSend + Send + 'static + ?Sized,
237    M: Metrics,
238{
239    #[inline]
240    pub(crate) fn new(params: QuicConnectionParams<Tx, M>) -> Self {
241        let (incoming_ev_sender, incoming_ev_receiver) =
242            mpsc::channel(INCOMING_QUEUE_SIZE);
243        let audit_log_stats = Arc::new(QuicAuditStats::new(params.scid.to_vec()));
244
245        let stats = Arc::new(Mutex::new(QuicConnectionStats::from_conn(
246            &params.quiche_conn,
247        )));
248
249        Self {
250            id: Self::generate_id(),
251            params,
252            audit_log_stats,
253            stats,
254            incoming_ev_sender,
255            incoming_ev_receiver,
256        }
257    }
258
259    /// The local address this connection listens on.
260    pub fn local_addr(&self) -> SocketAddr {
261        self.params.local_addr
262    }
263
264    /// The remote address for this connection.
265    pub fn peer_addr(&self) -> SocketAddr {
266        self.params.peer_addr
267    }
268
269    /// [boring]'s SSL object for this connection.
270    #[doc(hidden)]
271    pub fn ssl_mut(&mut self) -> &mut SslRef {
272        self.params.quiche_conn.as_mut()
273    }
274
275    /// A handle to the [`QuicAuditStats`] for this connection.
276    ///
277    /// # Note
278    /// These stats are updated during the lifetime of the connection.
279    /// The getter exists to grab a handle early on, which can then
280    /// be stowed away and read out after the connection has closed.
281    #[inline]
282    pub fn audit_log_stats(&self) -> Arc<QuicAuditStats> {
283        Arc::clone(&self.audit_log_stats)
284    }
285
286    /// A handle to the [`QuicConnectionStats`] for this connection.
287    ///
288    /// # Note
289    /// Initially, these stats represent the state when the [quiche::Connection]
290    /// was created. They are updated when the connection is closed, so this
291    /// getter exists primarily to grab a handle early on.
292    #[inline]
293    pub fn stats(&self) -> &QuicConnectionStatsShared {
294        &self.stats
295    }
296
    /// Creates a future to drive the connection's handshake.
    ///
    /// This is a lower-level alternative to the `handshake` function which
    /// gives the caller more control over execution of the future. See
    /// `handshake` for details on the return values.
    ///
    /// The `connections_in_memory` metric is incremented as soon as this
    /// method is called, not when the returned future is first polled.
    ///
    /// The returned future resolves to `Ok` with the paused worker if the
    /// handshake stage ends in the running state. Otherwise it runs the close
    /// stage and resolves to the result derived from the worker's exit status
    /// via `make_handshake_result`.
    #[allow(clippy::type_complexity)]
    pub fn handshake_fut<A: ApplicationOverQuic>(
        self, app: A,
    ) -> (
        QuicConnection,
        BoxFuture<'static, io::Result<Running<Arc<Tx>, M, A>>>,
    ) {
        self.params.metrics.connections_in_memory().inc();

        // Metadata handle for the caller. It shares the stats `Arc`s with the
        // worker, so it observes whatever the worker records.
        let conn = QuicConnection {
            local_addr: self.params.local_addr,
            peer_addr: self.params.peer_addr,
            audit_log_stats: Arc::clone(&self.audit_log_stats),
            stats: Arc::clone(&self.stats),
            scid: self.params.scid,
        };
        // State the worker carries through the connection stages: the initial
        // packet, the inbound packet channel and the application itself.
        let context = ConnectionStageContext {
            in_pkt: self.params.initial_pkt,
            incoming_pkt_receiver: self.incoming_ev_receiver,
            application: app,
            stats: Arc::clone(&self.stats),
        };
        let conn_stage = Handshake {
            handshake_info: self.params.handshake_info,
        };
        // The fields of `self` are moved out one by one from here on; only
        // `metrics` is cloned.
        let params = IoWorkerParams {
            socket: MaybeConnectedSocket::new(self.params.socket),
            shutdown_tx: self.params.shutdown_tx,
            cfg: self.params.writer_cfg,
            audit_log_stats: self.audit_log_stats,
            write_state: WriteState::default(),
            conn_map_cmd_tx: self.params.conn_map_cmd_tx,
            #[cfg(feature = "perf-quic-listener-metrics")]
            init_rx_time: self.params.init_rx_time,
            metrics: self.params.metrics.clone(),
        };

        let handshake_fut = async move {
            let qconn = self.params.quiche_conn;
            let handshake_done =
                IoWorker::new(params, conn_stage).run(qconn, context).await;

            match handshake_done {
                RunningOrClosing::Running(r) => Ok(r),
                RunningOrClosing::Closing(Closing {
                    params,
                    work_loop_result,
                    mut context,
                    mut qconn,
                }) => {
                    // Derive the caller-facing result first: the close stage
                    // below takes ownership of `work_loop_result`.
                    let hs_result = make_handshake_result(&work_loop_result);
                    IoWorker::new(params, Close { work_loop_result })
                        .close(&mut qconn, &mut context)
                        .await;
                    hs_result
                },
            }
        };

        (conn, Box::pin(handshake_fut))
    }
363
364    /// Performs the QUIC handshake in a separate tokio task and awaits its
365    /// completion.
366    ///
367    /// The returned [`QuicConnection`] holds metadata about the established
368    /// connection. The connection itself is paused after `handshake`
369    /// returns and must be resumed by passing the opaque `Running` value to
370    /// [`InitialQuicConnection::resume`]. This two-step process
371    /// allows callers to collect telemetry and run code before serving their
372    /// [`ApplicationOverQuic`].
373    pub async fn handshake<A: ApplicationOverQuic>(
374        self, app: A,
375    ) -> io::Result<(QuicConnection, Running<Arc<Tx>, M, A>)> {
376        let task_metrics = self.params.metrics.clone();
377        let (conn, handshake_fut) = Self::handshake_fut(self, app);
378
379        let handshake_handle = crate::metrics::tokio_task::spawn(
380            "quic_handshake_worker",
381            task_metrics,
382            handshake_fut,
383        );
384
385        // `AbortOnDropHandle` simulates task-killswitch behavior without needing
386        // to give up ownership of the `JoinHandle`.
387        let handshake_abort_handle = AbortOnDropHandle::new(handshake_handle);
388
389        let worker = handshake_abort_handle.await??;
390
391        Ok((conn, worker))
392    }
393
394    /// Resumes a QUIC connection which was paused after a successful handshake.
395    pub fn resume<A: ApplicationOverQuic>(pre_running: Running<Arc<Tx>, M, A>) {
396        let task_metrics = pre_running.params.metrics.clone();
397        let fut = async move {
398            let Running {
399                params,
400                context,
401                qconn,
402            } = pre_running;
403            let running_worker = IoWorker::new(params, RunningApplication);
404
405            let Closing {
406                params,
407                mut context,
408                work_loop_result,
409                mut qconn,
410            } = running_worker.run(qconn, context).await;
411
412            IoWorker::new(params, Close { work_loop_result })
413                .close(&mut qconn, &mut context)
414                .await;
415        };
416
417        crate::metrics::tokio_task::spawn_with_killswitch(
418            "quic_io_worker",
419            task_metrics,
420            fut,
421        );
422    }
423
424    /// Drives a QUIC connection from handshake to close in separate tokio
425    /// tasks.
426    ///
427    /// It combines [`InitialQuicConnection::handshake`] and
428    /// [`InitialQuicConnection::resume`] into a single call.
429    pub fn start<A: ApplicationOverQuic>(self, app: A) -> QuicConnection {
430        let task_metrics = self.params.metrics.clone();
431        let (conn, handshake_fut) = Self::handshake_fut(self, app);
432
433        let fut = async move {
434            match handshake_fut.await {
435                Ok(running) => Self::resume(running),
436                Err(e) =>
437                    log::error!("QUIC handshake failed in IQC::start"; "error" => e),
438            }
439        };
440
441        crate::metrics::tokio_task::spawn_with_killswitch(
442            "quic_handshake_worker",
443            task_metrics,
444            fut,
445        );
446
447        conn
448    }
449
450    fn generate_id() -> u64 {
451        let mut buf = [0; 8];
452
453        boring::rand::rand_bytes(&mut buf).unwrap();
454
455        u64::from_ne_bytes(buf)
456    }
457}
458
/// The inputs required to construct an [`InitialQuicConnection`] and to drive
/// its I/O worker.
pub(crate) struct QuicConnectionParams<Tx, M>
where
    Tx: DatagramSocketSend + Send + 'static + ?Sized,
    M: Metrics,
{
    /// Writer configuration, handed to the I/O worker.
    pub writer_cfg: WriterConfig,
    /// An optional packet that is handed to the worker's context as its first
    /// inbound packet.
    pub initial_pkt: Option<Incoming>,
    /// Shutdown channel, handed to the I/O worker.
    pub shutdown_tx: mpsc::Sender<()>,
    /// Channel that signals connection map changes.
    pub conn_map_cmd_tx: mpsc::UnboundedSender<ConnectionMapCommand>,
    /// The source connection ID; also used to key the connection's
    /// [`QuicAuditStats`].
    pub scid: ConnectionId<'static>,
    /// Metrics implementation used by the connection and its spawned tasks.
    pub metrics: M,
    /// Receive time forwarded to the I/O worker.
    // NOTE(review): presumably the receive time of the initial packet —
    // confirm against the caller.
    #[cfg(feature = "perf-quic-listener-metrics")]
    pub init_rx_time: Option<SystemTime>,
    /// Handshake timing state, used to set up the handshake stage.
    pub handshake_info: HandshakeInfo,
    /// The underlying quiche connection.
    pub quiche_conn: QuicheConnection,
    /// The socket used by the I/O worker; wrapped in a
    /// [`MaybeConnectedSocket`] before use.
    pub socket: Arc<Tx>,
    /// The local address of the connection.
    pub local_addr: SocketAddr,
    /// The remote address of the connection.
    pub peer_addr: SocketAddr,
}
478
/// Metadata about an established QUIC connection.
///
/// While this struct allows access to some facets of a QUIC connection, it
/// notably does not represent the [quiche::Connection] itself. The crate
/// handles most interactions with [quiche] internally in a worker task. Users
/// can only access the connection directly via their [`ApplicationOverQuic`]
/// implementation.
///
/// See the [module-level docs](crate::quic) for an overview of how a QUIC
/// connection is handled internally.
pub struct QuicConnection {
    /// The local address of the connection.
    local_addr: SocketAddr,
    /// The remote address of the connection.
    peer_addr: SocketAddr,
    /// Audit statistics, shared with the connection's I/O worker.
    audit_log_stats: Arc<QuicAuditStats>,
    /// Statistics snapshot, shared with the connection's I/O worker.
    stats: QuicConnectionStatsShared,
    /// The QUIC source connection ID of the connection.
    scid: ConnectionId<'static>,
}
496
497impl QuicConnection {
498    /// The local address this connection listens on.
499    #[inline]
500    pub fn local_addr(&self) -> SocketAddr {
501        self.local_addr
502    }
503
504    /// The remote address for this connection.
505    #[inline]
506    pub fn peer_addr(&self) -> SocketAddr {
507        self.peer_addr
508    }
509
510    /// A handle to the [`QuicAuditStats`] for this connection.
511    ///
512    /// # Note
513    /// These stats are updated during the lifetime of the connection.
514    /// The getter exists to grab a handle early on, which can then
515    /// be stowed away and read out after the connection has closed.
516    #[inline]
517    pub fn audit_log_stats(&self) -> &Arc<QuicAuditStats> {
518        &self.audit_log_stats
519    }
520
521    /// A handle to the [`QuicConnectionStats`] for this connection.
522    ///
523    /// # Note
524    /// Initially, these stats represent the state when the [quiche::Connection]
525    /// was created. They are updated when the connection is closed, so this
526    /// getter exists primarily to grab a handle early on.
527    #[inline]
528    pub fn stats(&self) -> &QuicConnectionStatsShared {
529        &self.stats
530    }
531
532    /// The QUIC source connection ID used by this connection.
533    #[inline]
534    pub fn scid(&self) -> &ConnectionId<'static> {
535        &self.scid
536    }
537}
538
539impl AsSocketStats for QuicConnection {
540    #[inline]
541    fn as_socket_stats(&self) -> SocketStats {
542        // It is important to note that those stats are only updated when
543        // the connection stops, which is fine, since this is only used to
544        // log after the connection is finished.
545        self.stats.lock().unwrap().as_socket_stats()
546    }
547
548    #[inline]
549    fn as_quic_stats(&self) -> Option<&Arc<QuicAuditStats>> {
550        Some(&self.audit_log_stats)
551    }
552}
553
554impl<Tx, M> AsSocketStats for InitialQuicConnection<Tx, M>
555where
556    Tx: DatagramSocketSend + Send + 'static + ?Sized,
557    M: Metrics,
558{
559    #[inline]
560    fn as_socket_stats(&self) -> SocketStats {
561        // It is important to note that those stats are only updated when
562        // the connection stops, which is fine, since this is only used to
563        // log after the connection is finished.
564        self.stats.lock().unwrap().as_socket_stats()
565    }
566
567    #[inline]
568    fn as_quic_stats(&self) -> Option<&Arc<QuicAuditStats>> {
569        Some(&self.audit_log_stats)
570    }
571}
572
573impl<Tx, M> ShutdownConnection for InitialQuicConnection<Tx, M>
574where
575    Tx: DatagramSocketSend + Send + 'static + ?Sized,
576    M: Metrics,
577{
578    #[inline]
579    fn poll_shutdown(
580        &mut self, _cx: &mut std::task::Context,
581    ) -> std::task::Poll<io::Result<()>> {
582        // TODO: Does nothing at the moment. We always call Self::start
583        // anyway so it's not really important at this moment.
584        Poll::Ready(Ok(()))
585    }
586}
587
588impl ShutdownConnection for QuicConnection {
589    #[inline]
590    fn poll_shutdown(
591        &mut self, _cx: &mut std::task::Context,
592    ) -> std::task::Poll<io::Result<()>> {
593        // TODO: does nothing at the moment
594        Poll::Ready(Ok(()))
595    }
596}
597
/// Details about a connection's QUIC handshake.
#[derive(Debug, Clone)]
pub struct HandshakeInfo {
    /// The time at which the connection was created.
    start_time: Instant,
    /// The timeout before which the handshake must complete.
    timeout: Option<Duration>,
    /// The real duration that the handshake took to complete.
    time_handshake: Option<Duration>,
}

impl HandshakeInfo {
    /// Creates the handshake state for a connection created at `start_time`.
    ///
    /// A `timeout` of `None` means the handshake never expires.
    pub(crate) fn new(start_time: Instant, timeout: Option<Duration>) -> Self {
        Self {
            start_time,
            timeout,
            time_handshake: None,
        }
    }

    /// The time at which the connection was created.
    #[inline]
    pub fn start_time(&self) -> Instant {
        self.start_time
    }

    /// How long the handshake took to complete.
    ///
    /// Returns a zero duration until the elapsed time has been recorded.
    #[inline]
    pub fn elapsed(&self) -> Duration {
        self.time_handshake.unwrap_or_default()
    }

    /// Records the time elapsed since `start_time` as the handshake duration.
    pub(crate) fn set_elapsed(&mut self) {
        let elapsed = self.start_time.elapsed();
        self.time_handshake = Some(elapsed)
    }

    /// The instant at which the handshake times out.
    ///
    /// Returns `None` if no timeout is configured, or if the timeout is so
    /// large that the deadline cannot be represented as an [`Instant`]. Such a
    /// timeout can never be reached, so it is treated like having no deadline
    /// instead of panicking on the overflowing addition.
    pub(crate) fn deadline(&self) -> Option<Instant> {
        self.timeout
            .and_then(|timeout| self.start_time.checked_add(timeout))
    }

    /// Whether the configured timeout has passed since `start_time`.
    ///
    /// Always `false` when no timeout is configured.
    pub(crate) fn is_expired(&self) -> bool {
        self.timeout
            .is_some_and(|timeout| self.start_time.elapsed() >= timeout)
    }
}
644
/// A trait to implement an application served over QUIC.
///
/// The application is driven by an internal worker task, which also handles I/O
/// for the connection. The worker feeds inbound packets into the
/// [quiche::Connection], calls [`ApplicationOverQuic::process_reads`] followed
/// by [`ApplicationOverQuic::process_writes`], and then flushes any pending
/// outbound packets to the network. This repeats in a loop until either the
/// connection is closed or the [`ApplicationOverQuic`] returns an error.
///
/// In between loop iterations, the worker yields until a new packet arrives, a
/// timer expires, or [`ApplicationOverQuic::wait_for_data`] resolves.
/// Implementors can interact with the underlying connection via the mutable
/// reference passed to trait methods.
///
/// All methods except [`ApplicationOverQuic::on_conn_close`] are required.
#[allow(unused_variables)] // for default functions
pub trait ApplicationOverQuic: Send + 'static {
    /// Callback to customize the [`ApplicationOverQuic`] after the QUIC
    /// handshake completed successfully.
    ///
    /// # Errors
    /// Returning an error from this method immediately stops the worker loop
    /// and transitions to the connection closing stage.
    fn on_conn_established(
        &mut self, qconn: &mut QuicheConnection, handshake_info: &HandshakeInfo,
    ) -> QuicResult<()>;

    /// Determines whether the application's methods will be called by the
    /// worker.
    ///
    /// The function is checked in each iteration of the worker loop. Only
    /// `on_conn_established()` and `buffer()` bypass this check.
    fn should_act(&self) -> bool;

    /// A borrowed buffer for the worker to write outbound packets into.
    ///
    /// This method allows sharing a buffer between the worker and the
    /// application, efficiently using the allocated memory while the
    /// application is inactive. It can also be used to artificially
    /// restrict the size of outbound network packets.
    ///
    /// Any data in the buffer may be overwritten by the worker. If necessary,
    /// the application should save the contents when this method is called.
    fn buffer(&mut self) -> &mut [u8];

    /// Waits for an event to trigger the next iteration of the worker loop.
    ///
    /// The returned future is awaited in parallel to inbound packets and the
    /// connection's timers. Any one of those futures resolving triggers the
    /// next loop iteration, so implementations should not rely on
    /// `wait_for_data` for the bulk of their processing. Instead, after
    /// `wait_for_data` resolves, `process_writes` should be used to pull all
    /// available data out of the event source (for example, a channel).
    ///
    /// As for any future, it is **very important** that this method does not
    /// block the runtime. If it does, the other concurrent futures will be
    /// starved.
    ///
    /// # Errors
    /// Returning an error from this method immediately stops the worker loop
    /// and transitions to the connection closing stage.
    fn wait_for_data(
        &mut self, qconn: &mut QuicheConnection,
    ) -> impl Future<Output = QuicResult<()>> + Send;

    /// Processes data received on the connection.
    ///
    /// This method is only called if `should_act()` returns `true` and any
    /// packets were received since the last worker loop iteration. It
    /// should be used to read from the connection's open streams.
    ///
    /// # Errors
    /// Returning an error from this method immediately stops the worker loop
    /// and transitions to the connection closing stage.
    fn process_reads(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()>;

    /// Adds data to be sent on the connection.
    ///
    /// Unlike `process_reads`, this method is called on every iteration of the
    /// worker loop (provided `should_act()` returns true). It is called
    /// after `process_reads` and immediately before packets are pushed to
    /// the socket. The main use case is providing already-buffered data to
    /// the [quiche::Connection].
    ///
    /// # Errors
    /// Returning an error from this method immediately stops the worker loop
    /// and transitions to the connection closing stage.
    fn process_writes(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()>;

    /// Callback to inspect the result of the worker task, before a final packet
    /// with a `CONNECTION_CLOSE` frame is flushed to the network.
    ///
    /// `connection_result` is [`Ok`] only if the connection was closed without
    /// any local error. Otherwise, the state of `qconn` depends on the
    /// error type and application behavior.
    ///
    /// The default implementation does nothing.
    fn on_conn_close<M: Metrics>(
        &mut self, qconn: &mut QuicheConnection, metrics: &M,
        connection_result: &QuicResult<()>,
    ) {
    }
}
744
/// A command to execute on a [quiche::Connection] in the context of an
/// [`ApplicationOverQuic`].
///
/// We expect most [`ApplicationOverQuic`] implementations (such as
/// [H3Driver](crate::http3::driver::H3Driver)) will provide some way to submit
/// actions for them to take, for example via a channel. This enum may be
/// accepted as part of those actions to inspect or alter the state of the
/// underlying connection.
///
/// A command does nothing on its own; it takes effect when
/// [`QuicCommand::execute`] is called with the connection.
pub enum QuicCommand {
    /// Close the connection with the given parameters.
    ///
    /// Some packets may still be sent after this command has been executed, so
    /// the worker task may continue running for a bit. See
    /// [`quiche::Connection::close`] for details.
    ConnectionClose(ConnectionShutdownBehaviour),
    /// Execute a custom callback on the connection.
    ///
    /// The callback is invoked exactly once with mutable access to the
    /// connection.
    Custom(Box<dyn FnOnce(&mut QuicheConnection) + Send + 'static>),
    /// Collect the current [`SocketStats`] from the connection.
    ///
    /// Unlike [`QuicConnection::stats()`], these statistics are not cached and
    /// instead are retrieved right before the command is executed.
    ///
    /// The callback is invoked exactly once with the collected statistics.
    Stats(Box<dyn FnOnce(datagram_socket::SocketStats) + Send + 'static>),
}
768
769impl QuicCommand {
770    /// Consume the command and perform its operation on `qconn`.
771    ///
772    /// This method should be called by [`ApplicationOverQuic`] implementations
773    /// when they receive a [`QuicCommand`] to execute.
774    pub fn execute(self, qconn: &mut QuicheConnection) {
775        match self {
776            Self::ConnectionClose(behavior) => {
777                let ConnectionShutdownBehaviour {
778                    send_application_close,
779                    error_code,
780                    reason,
781                } = behavior;
782
783                let _ = qconn.close(send_application_close, error_code, &reason);
784            },
785            Self::Custom(f) => {
786                (f)(qconn);
787            },
788            Self::Stats(callback) => {
789                let stats_pair = QuicConnectionStats::from_conn(qconn);
790                (callback)(stats_pair.as_socket_stats());
791            },
792        }
793    }
794}
795
796impl fmt::Debug for QuicCommand {
797    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
798        match self {
799            Self::ConnectionClose(b) =>
800                f.debug_tuple("ConnectionClose").field(b).finish(),
801            Self::Custom(_) => f.debug_tuple("Custom").finish_non_exhaustive(),
802            Self::Stats(_) => f.debug_tuple("Stats").finish_non_exhaustive(),
803        }
804    }
805}
806
/// Parameters to close a [quiche::Connection].
///
/// The connection will use these parameters for the `CONNECTION_CLOSE` frame
/// it sends to its peer.
///
/// When executed as a [`QuicCommand::ConnectionClose`], the three fields are
/// passed to [`quiche::Connection::close`] in declaration order.
#[derive(Debug, Clone)]
pub struct ConnectionShutdownBehaviour {
    /// Whether to send an application close or a regular close to the peer.
    ///
    /// If this is true but the connection is not in a state where it is safe to
    /// send an application error (not established nor in early data), in
    /// accordance with [RFC 9000](https://www.rfc-editor.org/rfc/rfc9000.html#section-10.2.3-3), the
    /// error code is changed to `APPLICATION_ERROR` and the reason phrase is
    /// cleared.
    pub send_application_close: bool,
    /// The [QUIC][proto-err] or [application-level][app-err] error code to send
    /// to the peer.
    ///
    /// [proto-err]: https://www.rfc-editor.org/rfc/rfc9000.html#section-20.1
    /// [app-err]: https://www.rfc-editor.org/rfc/rfc9000.html#section-20.2
    pub error_code: u64,
    /// The reason phrase to send to the peer.
    pub reason: Vec<u8>,
}