Skip to main content

tokio_quiche/quic/connection/
mod.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27mod error;
28mod id;
29mod map;
30
31pub use self::error::HandshakeError;
32pub use self::id::ConnectionIdGenerator;
33pub use self::id::SharedConnectionIdGenerator;
34pub use self::id::SimpleConnectionIdGenerator;
35pub(crate) use self::map::ConnectionMap;
36
37use boring::ssl::SslRef;
38use datagram_socket::AsSocketStats;
39use datagram_socket::DatagramSocketSend;
40use datagram_socket::MaybeConnectedSocket;
41use datagram_socket::QuicAuditStats;
42use datagram_socket::ShutdownConnection;
43use datagram_socket::SocketStats;
44use foundations::telemetry::log;
45use futures::future::BoxFuture;
46use futures::Future;
47use quiche::ConnectionId;
48use std::fmt;
49use std::io;
50use std::net::SocketAddr;
51use std::sync::Arc;
52use std::sync::Mutex;
53use std::task::Poll;
54use std::time::Duration;
55use std::time::Instant;
56use std::time::SystemTime;
57use tokio::sync::mpsc;
58use tokio_util::task::AbortOnDropHandle;
59
60use self::error::make_handshake_result;
61use super::io::connection_stage::Close;
62use super::io::connection_stage::ConnectionStageContext;
63use super::io::connection_stage::Handshake;
64use super::io::connection_stage::RunningApplication;
65use super::io::worker::Closing;
66use super::io::worker::IoWorkerParams;
67use super::io::worker::Running;
68use super::io::worker::RunningOrClosing;
69use super::io::worker::WriteState;
70use super::QuicheConnection;
71use crate::buf_factory::PooledBuf;
72use crate::metrics::Metrics;
73use crate::quic::io::worker::IoWorker;
74use crate::quic::io::worker::WriterConfig;
75use crate::quic::io::worker::INCOMING_QUEUE_SIZE;
76use crate::quic::router::ConnectionMapCommand;
77use crate::QuicResult;
78
/// Wrapper for connection statistics recorded by [quiche].
///
/// Bundles the connection-wide counters with the statistics of a single path
/// so that both can be stored and handed out as one snapshot.
#[derive(Debug)]
pub struct QuicConnectionStats {
    /// Aggregate connection statistics across all paths.
    pub stats: quiche::Stats,
    /// Specific statistics about the connection's active path.
    ///
    /// `QuicConnectionStats::from_conn` fills this with the first entry
    /// yielded by the connection's `path_stats()` iterator, so it is `None`
    /// when that iterator yields nothing.
    pub path_stats: Option<quiche::PathStats>,
}
/// Shared, mutex-protected handle to a [`QuicConnectionStats`] snapshot.
pub(crate) type QuicConnectionStatsShared = Arc<Mutex<QuicConnectionStats>>;
88
89impl QuicConnectionStats {
90    pub(crate) fn from_conn(qconn: &QuicheConnection) -> Self {
91        Self {
92            stats: qconn.stats(),
93            path_stats: qconn.path_stats().next(),
94        }
95    }
96
97    fn startup_exit_to_socket_stats(
98        value: quiche::StartupExit,
99    ) -> datagram_socket::StartupExit {
100        let reason = match value.reason {
101            quiche::StartupExitReason::Loss =>
102                datagram_socket::StartupExitReason::Loss,
103            quiche::StartupExitReason::BandwidthPlateau =>
104                datagram_socket::StartupExitReason::BandwidthPlateau,
105            quiche::StartupExitReason::PersistentQueue =>
106                datagram_socket::StartupExitReason::PersistentQueue,
107            quiche::StartupExitReason::ConservativeSlowStartRounds =>
108                datagram_socket::StartupExitReason::ConservativeSlowStartRounds,
109        };
110
111        datagram_socket::StartupExit {
112            cwnd: value.cwnd,
113            bandwidth: value.bandwidth,
114            reason,
115        }
116    }
117}
118
119impl AsSocketStats for QuicConnectionStats {
120    fn as_socket_stats(&self) -> SocketStats {
121        SocketStats {
122            pmtu: self
123                .path_stats
124                .as_ref()
125                .map(|p| p.pmtu as u16)
126                .unwrap_or_default(),
127            rtt_us: self
128                .path_stats
129                .as_ref()
130                .map(|p| p.rtt.as_micros() as i64)
131                .unwrap_or_default(),
132            min_rtt_us: self
133                .path_stats
134                .as_ref()
135                .and_then(|p| p.min_rtt.map(|x| x.as_micros() as i64))
136                .unwrap_or_default(),
137            max_rtt_us: self
138                .path_stats
139                .as_ref()
140                .and_then(|p| p.max_rtt.map(|x| x.as_micros() as i64))
141                .unwrap_or_default(),
142            rtt_var_us: self
143                .path_stats
144                .as_ref()
145                .map(|p| p.rttvar.as_micros() as i64)
146                .unwrap_or_default(),
147            cwnd: self
148                .path_stats
149                .as_ref()
150                .map(|p| p.cwnd as u64)
151                .unwrap_or_default(),
152            total_pto_count: self
153                .path_stats
154                .as_ref()
155                .map(|p| p.total_pto_count as u64)
156                .unwrap_or_default(),
157            packets_sent: self.stats.sent as u64,
158            packets_recvd: self.stats.recv as u64,
159            packets_lost: self.stats.lost as u64,
160            packets_lost_spurious: self.stats.spurious_lost as u64,
161            packets_retrans: self.stats.retrans as u64,
162            bytes_sent: self.stats.sent_bytes,
163            bytes_recvd: self.stats.recv_bytes,
164            bytes_lost: self.stats.lost_bytes,
165            bytes_retrans: self.stats.stream_retrans_bytes,
166            bytes_unsent: 0, /* not implemented yet, kept for compatibility
167                              * with TCP */
168            delivery_rate: self
169                .path_stats
170                .as_ref()
171                .map(|p| p.delivery_rate)
172                .unwrap_or_default(),
173            max_bandwidth: self.path_stats.as_ref().and_then(|p| p.max_bandwidth),
174            startup_exit: self
175                .path_stats
176                .as_ref()
177                .and_then(|p| p.startup_exit)
178                .map(QuicConnectionStats::startup_exit_to_socket_stats),
179            bytes_in_flight_duration_us: self
180                .stats
181                .bytes_in_flight_duration
182                .as_micros() as u64,
183        }
184    }
185}
186
/// A received network packet with additional metadata.
#[derive(Debug)]
pub struct Incoming {
    /// The address that sent the inbound packet.
    pub peer_addr: SocketAddr,
    /// The address on which we received the inbound packet.
    pub local_addr: SocketAddr,
    /// The receive timestamp of the packet.
    ///
    /// Used for the `perf-quic-listener-metrics` feature.
    pub rx_time: Option<SystemTime>,
    /// The packet's contents.
    pub buf: PooledBuf,
    /// If set, then `buf` is a GRO buffer containing multiple packets.
    /// Each individual packet has a size of `gro` (except for the last one).
    pub gro: Option<i32>,
    /// [SO_MARK] control message value received from the socket.
    ///
    /// This will always be `None` after the connection has been spawned as
    /// the message is `take()`d before spawning.
    ///
    /// [SO_MARK]: https://man7.org/linux/man-pages/man7/socket.7.html
    #[cfg(target_os = "linux")]
    pub so_mark_data: Option<[u8; 4]>,
}
212
/// A QUIC connection that has not performed a handshake yet.
///
/// This type is currently only used for server-side connections. It is created
/// and added to the listener's connection stream after an initial packet from
/// a client has been received and (optionally) the client's IP address has been
/// validated.
///
/// To turn the initial connection into a fully established one, a QUIC
/// handshake must be performed. Users have multiple options to facilitate this:
/// - `start` is a simple entrypoint which spawns a task to handle the entire
///   lifetime of the QUIC connection. The caller can then only communicate with
///   the connection via their [`ApplicationOverQuic`].
/// - `handshake` spawns a task for the handshake and awaits its completion.
///   Afterwards, it pauses the connection and allows the caller to resume it
///   later (via `resume`) using an opaque struct. We spawn a separate task to
///   allow the tokio scheduler free choice in where to run the handshake.
/// - `handshake_fut` returns a future to drive the handshake for maximum
///   flexibility.
#[must_use = "call InitialQuicConnection::start to establish the connection"]
pub struct InitialQuicConnection<Tx, M>
where
    Tx: DatagramSocketSend + Send + 'static + ?Sized,
    M: Metrics,
{
    /// Everything needed to build the I/O worker for this connection; it is
    /// consumed piece by piece in `handshake_fut`.
    params: QuicConnectionParams<Tx, M>,
    /// Audit statistics handle, created in `new` from the connection's SCID.
    pub(crate) audit_log_stats: Arc<QuicAuditStats>,
    /// Shared statistics snapshot, initialised in `new` from the
    /// not-yet-handshaken quiche connection.
    stats: QuicConnectionStatsShared,
    /// Sending half of the bounded channel (capacity `INCOMING_QUEUE_SIZE`)
    /// over which inbound packets are queued for this connection.
    pub(crate) incoming_ev_sender: mpsc::Sender<Incoming>,
    /// Receiving half of the same channel; handed to the worker's stage
    /// context in `handshake_fut`.
    incoming_ev_receiver: mpsc::Receiver<Incoming>,
}
243
244impl<Tx, M> InitialQuicConnection<Tx, M>
245where
246    Tx: DatagramSocketSend + Send + 'static + ?Sized,
247    M: Metrics,
248{
249    #[inline]
250    pub(crate) fn new(params: QuicConnectionParams<Tx, M>) -> Self {
251        let (incoming_ev_sender, incoming_ev_receiver) =
252            mpsc::channel(INCOMING_QUEUE_SIZE);
253        let audit_log_stats = Arc::new(QuicAuditStats::new(params.scid.to_vec()));
254
255        let stats = Arc::new(Mutex::new(QuicConnectionStats::from_conn(
256            &params.quiche_conn,
257        )));
258
259        Self {
260            params,
261            audit_log_stats,
262            stats,
263            incoming_ev_sender,
264            incoming_ev_receiver,
265        }
266    }
267
    /// The local address this connection listens on.
    ///
    /// Returns a copy of the `local_addr` stored in the connection
    /// parameters.
    pub fn local_addr(&self) -> SocketAddr {
        self.params.local_addr
    }
272
    /// The remote address for this connection.
    ///
    /// Returns a copy of the `peer_addr` stored in the connection
    /// parameters.
    pub fn peer_addr(&self) -> SocketAddr {
        self.params.peer_addr
    }
277
    /// [boring]'s SSL object for this connection.
    ///
    /// The reference is obtained through the `as_mut()` conversion on the
    /// underlying quiche connection. The method is hidden from the rendered
    /// documentation.
    #[doc(hidden)]
    pub fn ssl_mut(&mut self) -> &mut SslRef {
        self.params.quiche_conn.as_mut()
    }
283
    /// A handle to the [`QuicAuditStats`] for this connection.
    ///
    /// Returns a new `Arc` pointing at the same stats object (a reference
    /// count bump, not a copy of the data).
    ///
    /// # Note
    /// These stats are updated during the lifetime of the connection.
    /// The getter exists to grab a handle early on, which can then
    /// be stowed away and read out after the connection has closed.
    #[inline]
    pub fn audit_log_stats(&self) -> Arc<QuicAuditStats> {
        Arc::clone(&self.audit_log_stats)
    }
294
    /// A handle to the [`QuicConnectionStats`] for this connection.
    ///
    /// Returns a borrow of the shared handle; clone the `Arc` to keep it
    /// beyond the lifetime of `self`.
    ///
    /// # Note
    /// Initially, these stats represent the state when the [quiche::Connection]
    /// was created. They are updated when the connection is closed, so this
    /// getter exists primarily to grab a handle early on.
    #[inline]
    pub fn stats(&self) -> &QuicConnectionStatsShared {
        &self.stats
    }
305
    /// Creates a future to drive the connection's handshake.
    ///
    /// This is a lower-level alternative to the `handshake` function which
    /// gives the caller more control over execution of the future. See
    /// `handshake` for details on the return values.
    ///
    /// The returned [`QuicConnection`] is available immediately; the boxed
    /// future resolves to the paused `Running` worker on success. If the
    /// worker ends up in the closing state instead, the future runs the close
    /// stage itself and then resolves to the result derived by
    /// `make_handshake_result`.
    ///
    /// Note that the `connections_in_memory` metric is incremented when this
    /// method is called, not when the returned future is first polled.
    #[allow(clippy::type_complexity)]
    pub fn handshake_fut<A: ApplicationOverQuic>(
        self, app: A,
    ) -> (
        QuicConnection,
        BoxFuture<'static, io::Result<Running<Arc<Tx>, M, A>>>,
    ) {
        self.params.metrics.connections_in_memory().inc();

        // Metadata handle returned to the caller; it shares the stats handles
        // with the worker via `Arc`.
        let conn = QuicConnection {
            local_addr: self.params.local_addr,
            peer_addr: self.params.peer_addr,
            audit_log_stats: Arc::clone(&self.audit_log_stats),
            stats: Arc::clone(&self.stats),
            scid: self.params.scid,
        };
        // State that travels with the worker through its stages: the initial
        // packet, the inbound packet queue, the application and the stats.
        let context = ConnectionStageContext {
            in_pkt: self.params.initial_pkt,
            incoming_pkt_receiver: self.incoming_ev_receiver,
            application: app,
            stats: Arc::clone(&self.stats),
        };
        let conn_stage = Handshake {
            handshake_info: self.params.handshake_info,
        };
        let params = IoWorkerParams {
            socket: MaybeConnectedSocket::new(self.params.socket),
            shutdown_tx: self.params.shutdown_tx,
            cfg: self.params.writer_cfg,
            audit_log_stats: self.audit_log_stats,
            write_state: WriteState::default(),
            conn_map_cmd_tx: self.params.conn_map_cmd_tx,
            cid_generator: self.params.cid_generator,
            #[cfg(feature = "perf-quic-listener-metrics")]
            init_rx_time: self.params.init_rx_time,
            metrics: self.params.metrics.clone(),
        };

        let handshake_fut = async move {
            let qconn = self.params.quiche_conn;
            let handshake_done =
                IoWorker::new(params, conn_stage).run(qconn, context).await;

            match handshake_done {
                RunningOrClosing::Running(r) => Ok(r),
                RunningOrClosing::Closing(Closing {
                    params,
                    work_loop_result,
                    mut context,
                    mut qconn,
                }) => {
                    // Derive the caller-facing result first: `work_loop_result`
                    // is moved into the `Close` stage on the next line.
                    let hs_result = make_handshake_result(&work_loop_result);
                    IoWorker::new(params, Close { work_loop_result })
                        .close(&mut qconn, &mut context)
                        .await;
                    hs_result
                },
            }
        };

        (conn, Box::pin(handshake_fut))
    }
373
374    /// Performs the QUIC handshake in a separate tokio task and awaits its
375    /// completion.
376    ///
377    /// The returned [`QuicConnection`] holds metadata about the established
378    /// connection. The connection itself is paused after `handshake`
379    /// returns and must be resumed by passing the opaque `Running` value to
380    /// [`InitialQuicConnection::resume`]. This two-step process
381    /// allows callers to collect telemetry and run code before serving their
382    /// [`ApplicationOverQuic`].
383    pub async fn handshake<A: ApplicationOverQuic>(
384        self, app: A,
385    ) -> io::Result<(QuicConnection, Running<Arc<Tx>, M, A>)> {
386        let task_metrics = self.params.metrics.clone();
387        let (conn, handshake_fut) = Self::handshake_fut(self, app);
388
389        let handshake_handle = crate::metrics::tokio_task::spawn(
390            "quic_handshake_worker",
391            task_metrics,
392            handshake_fut,
393        );
394
395        // `AbortOnDropHandle` simulates task-killswitch behavior without needing
396        // to give up ownership of the `JoinHandle`.
397        let handshake_abort_handle = AbortOnDropHandle::new(handshake_handle);
398
399        let worker = handshake_abort_handle.await??;
400
401        Ok((conn, worker))
402    }
403
404    /// Resumes a QUIC connection which was paused after a successful handshake.
405    pub fn resume<A: ApplicationOverQuic>(pre_running: Running<Arc<Tx>, M, A>) {
406        let task_metrics = pre_running.params.metrics.clone();
407        let fut = async move {
408            let Running {
409                params,
410                context,
411                qconn,
412            } = pre_running;
413            let running_worker = IoWorker::new(params, RunningApplication);
414
415            let Closing {
416                params,
417                mut context,
418                work_loop_result,
419                mut qconn,
420            } = running_worker.run(qconn, context).await;
421
422            IoWorker::new(params, Close { work_loop_result })
423                .close(&mut qconn, &mut context)
424                .await;
425        };
426
427        crate::metrics::tokio_task::spawn_with_killswitch(
428            "quic_io_worker",
429            task_metrics,
430            fut,
431        );
432    }
433
434    /// Drives a QUIC connection from handshake to close in separate tokio
435    /// tasks.
436    ///
437    /// It combines [`InitialQuicConnection::handshake`] and
438    /// [`InitialQuicConnection::resume`] into a single call.
439    pub fn start<A: ApplicationOverQuic>(self, app: A) -> QuicConnection {
440        let task_metrics = self.params.metrics.clone();
441        let (conn, handshake_fut) = Self::handshake_fut(self, app);
442
443        let fut = async move {
444            match handshake_fut.await {
445                Ok(running) => Self::resume(running),
446                Err(e) => {
447                    log::error!("QUIC handshake failed in IQC::start"; "error" => e)
448                },
449            }
450        };
451
452        crate::metrics::tokio_task::spawn_with_killswitch(
453            "quic_handshake_worker",
454            task_metrics,
455            fut,
456        );
457
458        conn
459    }
460}
461
/// Inputs required to construct an [`InitialQuicConnection`] and, later, the
/// I/O worker that drives it.
pub(crate) struct QuicConnectionParams<Tx, M>
where
    Tx: DatagramSocketSend + Send + 'static + ?Sized,
    M: Metrics,
{
    /// Writer configuration, forwarded to the I/O worker.
    pub writer_cfg: WriterConfig,
    /// Packet handed to the worker's stage context as its first input, if
    /// any.
    pub initial_pkt: Option<Incoming>,
    /// Shutdown channel, forwarded to the I/O worker.
    pub shutdown_tx: mpsc::Sender<()>,
    /// Channel that signals connection map changes.
    pub conn_map_cmd_tx: mpsc::UnboundedSender<ConnectionMapCommand>,
    /// Source connection ID; also used to key the connection's audit stats.
    pub scid: ConnectionId<'static>,
    /// Optional connection ID generator, forwarded to the I/O worker.
    pub cid_generator: Option<SharedConnectionIdGenerator>,
    /// Metrics sink used for task and connection metrics.
    pub metrics: M,
    /// Receive timestamp forwarded to the I/O worker; only present with the
    /// `perf-quic-listener-metrics` feature.
    #[cfg(feature = "perf-quic-listener-metrics")]
    pub init_rx_time: Option<SystemTime>,
    /// Handshake timing information given to the handshake stage.
    pub handshake_info: HandshakeInfo,
    /// The quiche connection to be driven by the worker.
    pub quiche_conn: QuicheConnection,
    /// Socket used to send datagrams for this connection.
    pub socket: Arc<Tx>,
    /// Local address of the connection.
    pub local_addr: SocketAddr,
    /// Remote address of the connection.
    pub peer_addr: SocketAddr,
}
482
/// Metadata about an established QUIC connection.
///
/// While this struct allows access to some facets of a QUIC connection, it
/// notably does not represent the [quiche::Connection] itself. The crate
/// handles most interactions with [quiche] internally in a worker task. Users
/// can only access the connection directly via their [`ApplicationOverQuic`]
/// implementation.
///
/// See the [module-level docs](crate::quic) for an overview of how a QUIC
/// connection is handled internally.
pub struct QuicConnection {
    /// Local address of the connection.
    local_addr: SocketAddr,
    /// Remote address of the connection.
    peer_addr: SocketAddr,
    /// Handle to the audit stats, shared with the worker.
    audit_log_stats: Arc<QuicAuditStats>,
    /// Handle to the cached statistics snapshot, shared with the worker.
    stats: QuicConnectionStatsShared,
    /// Source connection ID of the connection.
    scid: ConnectionId<'static>,
}
500
impl QuicConnection {
    /// The local address this connection listens on.
    ///
    /// Returned by value, as stored in this handle.
    #[inline]
    pub fn local_addr(&self) -> SocketAddr {
        self.local_addr
    }

    /// The remote address for this connection.
    ///
    /// Returned by value, as stored in this handle.
    #[inline]
    pub fn peer_addr(&self) -> SocketAddr {
        self.peer_addr
    }

    /// A handle to the [`QuicAuditStats`] for this connection.
    ///
    /// Returns a borrow of the `Arc`; clone it to keep the handle beyond the
    /// lifetime of `self`.
    ///
    /// # Note
    /// These stats are updated during the lifetime of the connection.
    /// The getter exists to grab a handle early on, which can then
    /// be stowed away and read out after the connection has closed.
    #[inline]
    pub fn audit_log_stats(&self) -> &Arc<QuicAuditStats> {
        &self.audit_log_stats
    }

    /// A handle to the [`QuicConnectionStats`] for this connection.
    ///
    /// Returns a borrow of the shared, mutex-protected handle.
    ///
    /// # Note
    /// Initially, these stats represent the state when the [quiche::Connection]
    /// was created. They are updated when the connection is closed, so this
    /// getter exists primarily to grab a handle early on.
    #[inline]
    pub fn stats(&self) -> &QuicConnectionStatsShared {
        &self.stats
    }

    /// The QUIC source connection ID used by this connection.
    #[inline]
    pub fn scid(&self) -> &ConnectionId<'static> {
        &self.scid
    }
}
542
543impl AsSocketStats for QuicConnection {
544    #[inline]
545    fn as_socket_stats(&self) -> SocketStats {
546        // It is important to note that those stats are only updated when
547        // the connection stops, which is fine, since this is only used to
548        // log after the connection is finished.
549        self.stats.lock().unwrap().as_socket_stats()
550    }
551
552    #[inline]
553    fn as_quic_stats(&self) -> Option<&Arc<QuicAuditStats>> {
554        Some(&self.audit_log_stats)
555    }
556}
557
558impl<Tx, M> AsSocketStats for InitialQuicConnection<Tx, M>
559where
560    Tx: DatagramSocketSend + Send + 'static + ?Sized,
561    M: Metrics,
562{
563    #[inline]
564    fn as_socket_stats(&self) -> SocketStats {
565        // It is important to note that those stats are only updated when
566        // the connection stops, which is fine, since this is only used to
567        // log after the connection is finished.
568        self.stats.lock().unwrap().as_socket_stats()
569    }
570
571    #[inline]
572    fn as_quic_stats(&self) -> Option<&Arc<QuicAuditStats>> {
573        Some(&self.audit_log_stats)
574    }
575}
576
impl<Tx, M> ShutdownConnection for InitialQuicConnection<Tx, M>
where
    Tx: DatagramSocketSend + Send + 'static + ?Sized,
    M: Metrics,
{
    /// Placeholder shutdown hook: performs no work and reports completion
    /// immediately. The task context is unused.
    #[inline]
    fn poll_shutdown(
        &mut self, _cx: &mut std::task::Context,
    ) -> std::task::Poll<io::Result<()>> {
        // TODO: Does nothing at the moment. We always call Self::start
        // anyway so it's not really important at this moment.
        Poll::Ready(Ok(()))
    }
}
591
impl ShutdownConnection for QuicConnection {
    /// Placeholder shutdown hook: performs no work and reports completion
    /// immediately. The task context is unused.
    #[inline]
    fn poll_shutdown(
        &mut self, _cx: &mut std::task::Context,
    ) -> std::task::Poll<io::Result<()>> {
        // TODO: does nothing at the moment
        Poll::Ready(Ok(()))
    }
}
601
/// Details about a connection's QUIC handshake.
#[derive(Debug, Clone)]
pub struct HandshakeInfo {
    /// The time at which the connection was created.
    start_time: Instant,
    /// The timeout before which the handshake must complete.
    timeout: Option<Duration>,
    /// The real duration that the handshake took to complete.
    ///
    /// `None` until `set_elapsed` has been called.
    time_handshake: Option<Duration>,
}

impl HandshakeInfo {
    /// Creates handshake bookkeeping for a connection created at `start_time`
    /// with an optional handshake `timeout`. The handshake duration starts
    /// out unset.
    pub(crate) fn new(start_time: Instant, timeout: Option<Duration>) -> Self {
        Self {
            start_time,
            timeout,
            time_handshake: None,
        }
    }

    /// The time at which the connection was created.
    #[inline]
    pub fn start_time(&self) -> Instant {
        self.start_time
    }

    /// How long the handshake took to complete.
    ///
    /// Returns a zero duration if the handshake duration has not been
    /// recorded yet.
    #[inline]
    pub fn elapsed(&self) -> Duration {
        self.time_handshake.unwrap_or_default()
    }

    /// Records the time elapsed since `start_time` as the handshake duration.
    pub(crate) fn set_elapsed(&mut self) {
        let elapsed = self.start_time.elapsed();
        self.time_handshake = Some(elapsed)
    }

    /// The instant by which the handshake must have completed.
    ///
    /// Returns `None` when no timeout is configured. It also returns `None`
    /// when `start_time + timeout` is not representable as an [`Instant`]
    /// (for example with `Duration::MAX`): such a deadline can never be
    /// reached, and plain `+` would panic on the overflow.
    pub(crate) fn deadline(&self) -> Option<Instant> {
        self.timeout
            .and_then(|timeout| self.start_time.checked_add(timeout))
    }

    /// Whether the configured timeout has elapsed since `start_time`.
    ///
    /// Always `false` when no timeout is configured.
    pub(crate) fn is_expired(&self) -> bool {
        self.timeout
            .is_some_and(|timeout| self.start_time.elapsed() >= timeout)
    }
}
648
/// A trait to implement an application served over QUIC.
///
/// The application is driven by an internal worker task, which also handles I/O
/// for the connection. The worker feeds inbound packets into the
/// [quiche::Connection], calls [`ApplicationOverQuic::process_reads`] followed
/// by [`ApplicationOverQuic::process_writes`], and then flushes any pending
/// outbound packets to the network. This repeats in a loop until either the
/// connection is closed or the [`ApplicationOverQuic`] returns an error.
///
/// In between loop iterations, the worker yields until a new packet arrives, a
/// timer expires, or [`ApplicationOverQuic::wait_for_data`] resolves.
/// Implementors can interact with the underlying connection via the mutable
/// reference passed to trait methods.
///
/// All methods except [`ApplicationOverQuic::on_conn_close`] are required;
/// `on_conn_close` has an empty default implementation.
#[allow(unused_variables)] // for default functions
pub trait ApplicationOverQuic: Send + 'static {
    /// Callback to customize the [`ApplicationOverQuic`] after the QUIC
    /// handshake completed successfully.
    ///
    /// `handshake_info` carries the timing details of the handshake that
    /// just finished.
    ///
    /// # Errors
    /// Returning an error from this method immediately stops the worker loop
    /// and transitions to the connection closing stage.
    fn on_conn_established(
        &mut self, qconn: &mut QuicheConnection, handshake_info: &HandshakeInfo,
    ) -> QuicResult<()>;

    /// Determines whether the application's methods will be called by the
    /// worker.
    ///
    /// The function is checked in each iteration of the worker loop. Only
    /// `on_conn_established()` and `buffer()` bypass this check.
    fn should_act(&self) -> bool;

    /// A borrowed buffer for the worker to write outbound packets into.
    ///
    /// This method allows sharing a buffer between the worker and the
    /// application, efficiently using the allocated memory while the
    /// application is inactive. It can also be used to artificially
    /// restrict the size of outbound network packets.
    ///
    /// Any data in the buffer may be overwritten by the worker. If necessary,
    /// the application should save the contents when this method is called.
    fn buffer(&mut self) -> &mut [u8];

    /// Waits for an event to trigger the next iteration of the worker loop.
    ///
    /// The returned future is awaited in parallel to inbound packets and the
    /// connection's timers. Any one of those futures resolving triggers the
    /// next loop iteration, so implementations should not rely on
    /// `wait_for_data` for the bulk of their processing. Instead, after
    /// `wait_for_data` resolves, `process_writes` should be used to pull all
    /// available data out of the event source (for example, a channel).
    ///
    /// As for any future, it is **very important** that this method does not
    /// block the runtime. If it does, the other concurrent futures will be
    /// starved.
    ///
    /// # Errors
    /// Returning an error from this method immediately stops the worker loop
    /// and transitions to the connection closing stage.
    fn wait_for_data(
        &mut self, qconn: &mut QuicheConnection,
    ) -> impl Future<Output = QuicResult<()>> + Send;

    /// Processes data received on the connection.
    ///
    /// This method is only called if `should_act()` returns `true` and any
    /// packets were received since the last worker loop iteration. It
    /// should be used to read from the connection's open streams.
    ///
    /// # Errors
    /// Returning an error from this method immediately stops the worker loop
    /// and transitions to the connection closing stage.
    fn process_reads(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()>;

    /// Adds data to be sent on the connection.
    ///
    /// Unlike `process_reads`, this method is called on every iteration of the
    /// worker loop (provided `should_act()` returns true). It is called
    /// after `process_reads` and immediately before packets are pushed to
    /// the socket. The main use case is providing already-buffered data to
    /// the [quiche::Connection].
    ///
    /// # Errors
    /// Returning an error from this method immediately stops the worker loop
    /// and transitions to the connection closing stage.
    fn process_writes(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()>;

    /// Callback to inspect the result of the worker task, before a final packet
    /// with a `CONNECTION_CLOSE` frame is flushed to the network.
    ///
    /// `connection_result` is [`Ok`] only if the connection was closed without
    /// any local error. Otherwise, the state of `qconn` depends on the
    /// error type and application behavior.
    ///
    /// The default implementation does nothing.
    fn on_conn_close<M: Metrics>(
        &mut self, qconn: &mut QuicheConnection, metrics: &M,
        connection_result: &QuicResult<()>,
    ) {
    }
}
748
/// A command to execute on a [quiche::Connection] in the context of an
/// [`ApplicationOverQuic`].
///
/// We expect most [`ApplicationOverQuic`] implementations (such as
/// [H3Driver](crate::http3::driver::H3Driver)) will provide some way to submit
/// actions for them to take, for example via a channel. This enum may be
/// accepted as part of those actions to inspect or alter the state of the
/// underlying connection.
///
/// A command takes effect when [`QuicCommand::execute`] is called with the
/// connection.
pub enum QuicCommand {
    /// Close the connection with the given parameters.
    ///
    /// Some packets may still be sent after this command has been executed, so
    /// the worker task may continue running for a bit. See
    /// [`quiche::Connection::close`] for details.
    ConnectionClose(ConnectionShutdownBehaviour),
    /// Execute a custom callback on the connection.
    ///
    /// The callback receives a mutable reference to the connection and is
    /// invoked exactly once.
    Custom(Box<dyn FnOnce(&mut QuicheConnection) + Send + 'static>),
    /// Collect the current [`SocketStats`] from the connection.
    ///
    /// Unlike [`QuicConnection::stats()`], these statistics are not cached and
    /// instead are retrieved right before the command is executed.
    Stats(Box<dyn FnOnce(datagram_socket::SocketStats) + Send + 'static>),
    /// Collect the current [`QuicConnectionStats`] from the connection.
    ///
    /// These statistics are not cached and instead are retrieved right before
    /// the command is executed.
    ConnectionStats(Box<dyn FnOnce(QuicConnectionStats) + Send + 'static>),
}
777
778impl QuicCommand {
779    /// Consume the command and perform its operation on `qconn`.
780    ///
781    /// This method should be called by [`ApplicationOverQuic`] implementations
782    /// when they receive a [`QuicCommand`] to execute.
783    pub fn execute(self, qconn: &mut QuicheConnection) {
784        match self {
785            Self::ConnectionClose(behavior) => {
786                let ConnectionShutdownBehaviour {
787                    send_application_close,
788                    error_code,
789                    reason,
790                } = behavior;
791
792                let _ = qconn.close(send_application_close, error_code, &reason);
793            },
794            Self::Custom(f) => {
795                (f)(qconn);
796            },
797            Self::Stats(callback) => {
798                let stats_pair = QuicConnectionStats::from_conn(qconn);
799                (callback)(stats_pair.as_socket_stats());
800            },
801            Self::ConnectionStats(callback) => {
802                let stats_pair = QuicConnectionStats::from_conn(qconn);
803                (callback)(stats_pair);
804            },
805        }
806    }
807}
808
809impl fmt::Debug for QuicCommand {
810    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
811        match self {
812            Self::ConnectionClose(b) =>
813                f.debug_tuple("ConnectionClose").field(b).finish(),
814            Self::Custom(_) => f.debug_tuple("Custom").finish_non_exhaustive(),
815            Self::Stats(_) => f.debug_tuple("Stats").finish_non_exhaustive(),
816            Self::ConnectionStats(_) =>
817                f.debug_tuple("ConnectionStats").finish_non_exhaustive(),
818        }
819    }
820}
821
/// Parameters to close a [quiche::Connection].
///
/// The connection will use these parameters for the `CONNECTION_CLOSE` frame
/// it sends to its peer. `QuicCommand::execute` passes the three fields, in
/// declaration order, to the connection's `close` method.
#[derive(Debug, Clone)]
pub struct ConnectionShutdownBehaviour {
    /// Whether to send an application close or a regular close to the peer.
    ///
    /// If this is true but the connection is not in a state where it is safe to
    /// send an application error (not established nor in early data), in
    /// accordance with [RFC 9000](https://www.rfc-editor.org/rfc/rfc9000.html#section-10.2.3-3), the
    /// error code is changed to `APPLICATION_ERROR` and the reason phrase is
    /// cleared.
    pub send_application_close: bool,
    /// The [QUIC][proto-err] or [application-level][app-err] error code to send
    /// to the peer.
    ///
    /// [proto-err]: https://www.rfc-editor.org/rfc/rfc9000.html#section-20.1
    /// [app-err]: https://www.rfc-editor.org/rfc/rfc9000.html#section-20.2
    pub error_code: u64,
    /// The reason phrase to send to the peer, as raw bytes.
    pub reason: Vec<u8>,
}