Skip to main content

tokio_quiche/quic/connection/
mod.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27mod error;
28mod id;
29mod map;
30
31pub use self::error::HandshakeError;
32pub use self::id::ConnectionIdGenerator;
33pub use self::id::SharedConnectionIdGenerator;
34pub use self::id::SimpleConnectionIdGenerator;
35pub(crate) use self::map::ConnectionMap;
36
37use boring::ssl::SslRef;
38use datagram_socket::AsSocketStats;
39use datagram_socket::DatagramSocketSend;
40use datagram_socket::MaybeConnectedSocket;
41use datagram_socket::QuicAuditStats;
42use datagram_socket::ShutdownConnection;
43use datagram_socket::SocketStats;
44use foundations::telemetry::log;
45use futures::future::BoxFuture;
46use futures::Future;
47use quiche::ConnectionId;
48use std::fmt;
49use std::io;
50use std::net::SocketAddr;
51use std::sync::Arc;
52use std::sync::Mutex;
53use std::task::Poll;
54use std::time::Duration;
55use std::time::Instant;
56use std::time::SystemTime;
57use tokio::sync::mpsc;
58use tokio_util::task::AbortOnDropHandle;
59
60use self::error::make_handshake_result;
61use super::io::connection_stage::Close;
62use super::io::connection_stage::ConnectionStageContext;
63use super::io::connection_stage::Handshake;
64use super::io::connection_stage::RunningApplication;
65use super::io::worker::Closing;
66use super::io::worker::IoWorkerParams;
67use super::io::worker::Running;
68use super::io::worker::RunningOrClosing;
69use super::io::worker::WriteState;
70use super::QuicheConnection;
71use crate::metrics::Metrics;
72use crate::quic::io::worker::IoWorker;
73use crate::quic::io::worker::WriterConfig;
74use crate::quic::io::worker::INCOMING_QUEUE_SIZE;
75use crate::quic::router::ConnectionMapCommand;
76use crate::QuicResult;
77
/// Wrapper for connection statistics recorded by [quiche].
///
/// A value of this type is a point-in-time snapshot: it is filled in once from
/// a connection (see `QuicConnectionStats::from_conn`) and does not update
/// itself afterwards.
#[derive(Debug)]
pub struct QuicConnectionStats {
    /// Aggregate connection statistics across all paths.
    pub stats: quiche::Stats,
    /// Specific statistics about the connection's active path.
    ///
    /// This is `None` when the connection yielded no path statistics at the
    /// time the snapshot was taken.
    pub path_stats: Option<quiche::PathStats>,
}
/// A [`QuicConnectionStats`] snapshot that is shared via reference counting and
/// guarded by a mutex.
pub(crate) type QuicConnectionStatsShared = Arc<Mutex<QuicConnectionStats>>;
87
88impl QuicConnectionStats {
89    pub(crate) fn from_conn(qconn: &QuicheConnection) -> Self {
90        Self {
91            stats: qconn.stats(),
92            path_stats: qconn.path_stats().next(),
93        }
94    }
95
96    fn startup_exit_to_socket_stats(
97        value: quiche::StartupExit,
98    ) -> datagram_socket::StartupExit {
99        let reason = match value.reason {
100            quiche::StartupExitReason::Loss =>
101                datagram_socket::StartupExitReason::Loss,
102            quiche::StartupExitReason::BandwidthPlateau =>
103                datagram_socket::StartupExitReason::BandwidthPlateau,
104            quiche::StartupExitReason::PersistentQueue =>
105                datagram_socket::StartupExitReason::PersistentQueue,
106            quiche::StartupExitReason::ConservativeSlowStartRounds =>
107                datagram_socket::StartupExitReason::ConservativeSlowStartRounds,
108        };
109
110        datagram_socket::StartupExit {
111            cwnd: value.cwnd,
112            bandwidth: value.bandwidth,
113            reason,
114        }
115    }
116}
117
118impl AsSocketStats for QuicConnectionStats {
119    fn as_socket_stats(&self) -> SocketStats {
120        SocketStats {
121            pmtu: self
122                .path_stats
123                .as_ref()
124                .map(|p| p.pmtu as u16)
125                .unwrap_or_default(),
126            rtt_us: self
127                .path_stats
128                .as_ref()
129                .map(|p| p.rtt.as_micros() as i64)
130                .unwrap_or_default(),
131            min_rtt_us: self
132                .path_stats
133                .as_ref()
134                .and_then(|p| p.min_rtt.map(|x| x.as_micros() as i64))
135                .unwrap_or_default(),
136            max_rtt_us: self
137                .path_stats
138                .as_ref()
139                .and_then(|p| p.max_rtt.map(|x| x.as_micros() as i64))
140                .unwrap_or_default(),
141            rtt_var_us: self
142                .path_stats
143                .as_ref()
144                .map(|p| p.rttvar.as_micros() as i64)
145                .unwrap_or_default(),
146            cwnd: self
147                .path_stats
148                .as_ref()
149                .map(|p| p.cwnd as u64)
150                .unwrap_or_default(),
151            total_pto_count: self
152                .path_stats
153                .as_ref()
154                .map(|p| p.total_pto_count as u64)
155                .unwrap_or_default(),
156            packets_sent: self.stats.sent as u64,
157            packets_recvd: self.stats.recv as u64,
158            packets_lost: self.stats.lost as u64,
159            packets_lost_spurious: self.stats.spurious_lost as u64,
160            packets_retrans: self.stats.retrans as u64,
161            bytes_sent: self.stats.sent_bytes,
162            bytes_recvd: self.stats.recv_bytes,
163            bytes_lost: self.stats.lost_bytes,
164            bytes_retrans: self.stats.stream_retrans_bytes,
165            bytes_unsent: 0, /* not implemented yet, kept for compatibility
166                              * with TCP */
167            delivery_rate: self
168                .path_stats
169                .as_ref()
170                .map(|p| p.delivery_rate)
171                .unwrap_or_default(),
172            max_bandwidth: self.path_stats.as_ref().and_then(|p| p.max_bandwidth),
173            startup_exit: self
174                .path_stats
175                .as_ref()
176                .and_then(|p| p.startup_exit)
177                .map(QuicConnectionStats::startup_exit_to_socket_stats),
178            bytes_in_flight_duration_us: self
179                .stats
180                .bytes_in_flight_duration
181                .as_micros() as u64,
182        }
183    }
184}
185
/// A received network packet with additional metadata.
#[derive(Debug)]
pub struct Incoming {
    /// The address that sent the inbound packet.
    pub peer_addr: SocketAddr,
    /// The address on which we received the inbound packet.
    pub local_addr: SocketAddr,
    /// The receive timestamp of the packet.
    ///
    /// Used for the `perf-quic-listener-metrics` feature.
    pub rx_time: Option<SystemTime>,
    /// The packet's contents.
    pub buf: Vec<u8>,
    /// If set, then `buf` is a GRO buffer containing multiple packets.
    /// Each individual packet has a size of `gro` (except for the last one).
    pub gro: Option<i32>,
    /// [SO_MARK] control message value received from the socket.
    ///
    /// This will always be `None` after the connection has been spawned as
    /// the message is `take()`n before spawning.
    ///
    /// [SO_MARK]: https://man7.org/linux/man-pages/man7/socket.7.html
    #[cfg(target_os = "linux")]
    pub so_mark_data: Option<[u8; 4]>,
}
211
/// A QUIC connection that has not performed a handshake yet.
///
/// This type is currently only used for server-side connections. It is created
/// and added to the listener's connection stream after an initial packet from
/// a client has been received and (optionally) the client's IP address has been
/// validated.
///
/// To turn the initial connection into a fully established one, a QUIC
/// handshake must be performed. Users have multiple options to facilitate this:
/// - `start` is a simple entrypoint which spawns a task to handle the entire
///   lifetime of the QUIC connection. The caller can then only communicate with
///   the connection via their [`ApplicationOverQuic`].
/// - `handshake` spawns a task for the handshake and awaits its completion.
///   Afterwards, it pauses the connection and allows the caller to resume it
///   later via an opaque struct. We spawn a separate task to allow the tokio
///   scheduler free choice in where to run the handshake.
/// - `handshake_fut` returns a future to drive the handshake for maximum
///   flexibility.
#[must_use = "call InitialQuicConnection::start to establish the connection"]
pub struct InitialQuicConnection<Tx, M>
where
    Tx: DatagramSocketSend + Send + 'static + ?Sized,
    M: Metrics,
{
    // Everything needed to drive the connection; taken apart by
    // `handshake_fut`.
    params: QuicConnectionParams<Tx, M>,
    // Audit statistics, created in `new` from the bytes of the connection's
    // source connection ID.
    pub(crate) audit_log_stats: Arc<QuicAuditStats>,
    // Shared statistics snapshot, initialized in `new` from the quiche
    // connection.
    stats: QuicConnectionStatsShared,
    // Sending half of the bounded channel (capacity `INCOMING_QUEUE_SIZE`)
    // for inbound packets.
    pub(crate) incoming_ev_sender: mpsc::Sender<Incoming>,
    // Receiving half of that channel; handed to the handshake stage as its
    // `incoming_pkt_receiver`.
    incoming_ev_receiver: mpsc::Receiver<Incoming>,
}
242
243impl<Tx, M> InitialQuicConnection<Tx, M>
244where
245    Tx: DatagramSocketSend + Send + 'static + ?Sized,
246    M: Metrics,
247{
248    #[inline]
249    pub(crate) fn new(params: QuicConnectionParams<Tx, M>) -> Self {
250        let (incoming_ev_sender, incoming_ev_receiver) =
251            mpsc::channel(INCOMING_QUEUE_SIZE);
252        let audit_log_stats = Arc::new(QuicAuditStats::new(params.scid.to_vec()));
253
254        let stats = Arc::new(Mutex::new(QuicConnectionStats::from_conn(
255            &params.quiche_conn,
256        )));
257
258        Self {
259            params,
260            audit_log_stats,
261            stats,
262            incoming_ev_sender,
263            incoming_ev_receiver,
264        }
265    }
266
    /// The local address this connection listens on.
    ///
    /// This is the `local_addr` the connection was created with.
    pub fn local_addr(&self) -> SocketAddr {
        self.params.local_addr
    }
271
    /// The remote address for this connection.
    ///
    /// This is the `peer_addr` the connection was created with.
    pub fn peer_addr(&self) -> SocketAddr {
        self.params.peer_addr
    }
276
    /// [boring]'s SSL object for this connection.
    ///
    /// The reference is obtained from the underlying [quiche] connection via
    /// its `AsMut` conversion.
    #[doc(hidden)]
    pub fn ssl_mut(&mut self) -> &mut SslRef {
        self.params.quiche_conn.as_mut()
    }
282
    /// A handle to the [`QuicAuditStats`] for this connection.
    ///
    /// Each call returns a new reference-counted handle to the same underlying
    /// stats object.
    ///
    /// # Note
    /// These stats are updated during the lifetime of the connection.
    /// The getter exists to grab a handle early on, which can then
    /// be stowed away and read out after the connection has closed.
    #[inline]
    pub fn audit_log_stats(&self) -> Arc<QuicAuditStats> {
        Arc::clone(&self.audit_log_stats)
    }
293
    /// A handle to the [`QuicConnectionStats`] for this connection.
    ///
    /// The handle is returned by reference; clone the `Arc` to keep it beyond
    /// the lifetime of this object.
    ///
    /// # Note
    /// Initially, these stats represent the state when the [quiche::Connection]
    /// was created. They are updated when the connection is closed, so this
    /// getter exists primarily to grab a handle early on.
    #[inline]
    pub fn stats(&self) -> &QuicConnectionStatsShared {
        &self.stats
    }
304
    /// Creates a future to drive the connection's handshake.
    ///
    /// This is a lower-level alternative to the `handshake` function which
    /// gives the caller more control over execution of the future. See
    /// `handshake` for details on the return values.
    ///
    /// This function does not spawn a task itself: the handshake only makes
    /// progress while the returned future is polled. The
    /// `connections_in_memory` metric, however, is incremented immediately
    /// when this function is called.
    ///
    /// The future resolves to the `Running` worker state if the handshake
    /// stage hands over to the running stage. Otherwise the connection is
    /// driven through the close stage first and the future then resolves to
    /// the result derived from the worker loop's outcome.
    #[allow(clippy::type_complexity)]
    pub fn handshake_fut<A: ApplicationOverQuic>(
        self, app: A,
    ) -> (
        QuicConnection,
        BoxFuture<'static, io::Result<Running<Arc<Tx>, M, A>>>,
    ) {
        self.params.metrics.connections_in_memory().inc();

        // Caller-facing metadata. It shares the audit and connection stats
        // handles with the worker state built below.
        let conn = QuicConnection {
            local_addr: self.params.local_addr,
            peer_addr: self.params.peer_addr,
            audit_log_stats: Arc::clone(&self.audit_log_stats),
            stats: Arc::clone(&self.stats),
            scid: self.params.scid,
        };
        // State handed to the connection stages: the initial packet, the
        // receiving end of the inbound packet channel and the application.
        let context = ConnectionStageContext {
            in_pkt: self.params.initial_pkt,
            incoming_pkt_receiver: self.incoming_ev_receiver,
            application: app,
            stats: Arc::clone(&self.stats),
        };
        let conn_stage = Handshake {
            handshake_info: self.params.handshake_info,
        };
        // `self` is taken apart field by field here; the remaining
        // `quiche_conn` is moved into the future below.
        let params = IoWorkerParams {
            socket: MaybeConnectedSocket::new(self.params.socket),
            shutdown_tx: self.params.shutdown_tx,
            cfg: self.params.writer_cfg,
            audit_log_stats: self.audit_log_stats,
            write_state: WriteState::default(),
            conn_map_cmd_tx: self.params.conn_map_cmd_tx,
            cid_generator: self.params.cid_generator,
            #[cfg(feature = "perf-quic-listener-metrics")]
            init_rx_time: self.params.init_rx_time,
            metrics: self.params.metrics.clone(),
        };

        let handshake_fut = async move {
            let qconn = self.params.quiche_conn;
            let handshake_done =
                IoWorker::new(params, conn_stage).run(qconn, context).await;

            match handshake_done {
                RunningOrClosing::Running(r) => Ok(r),
                RunningOrClosing::Closing(Closing {
                    params,
                    work_loop_result,
                    mut context,
                    mut qconn,
                }) => {
                    // The result must be derived before `work_loop_result`
                    // is moved into the close stage.
                    let hs_result = make_handshake_result(&work_loop_result);
                    IoWorker::new(params, Close { work_loop_result })
                        .close(&mut qconn, &mut context)
                        .await;
                    hs_result
                },
            }
        };

        (conn, Box::pin(handshake_fut))
    }
372
373    /// Performs the QUIC handshake in a separate tokio task and awaits its
374    /// completion.
375    ///
376    /// The returned [`QuicConnection`] holds metadata about the established
377    /// connection. The connection itself is paused after `handshake`
378    /// returns and must be resumed by passing the opaque `Running` value to
379    /// [`InitialQuicConnection::resume`]. This two-step process
380    /// allows callers to collect telemetry and run code before serving their
381    /// [`ApplicationOverQuic`].
382    pub async fn handshake<A: ApplicationOverQuic>(
383        self, app: A,
384    ) -> io::Result<(QuicConnection, Running<Arc<Tx>, M, A>)> {
385        let task_metrics = self.params.metrics.clone();
386        let (conn, handshake_fut) = Self::handshake_fut(self, app);
387
388        let handshake_handle = crate::metrics::tokio_task::spawn(
389            "quic_handshake_worker",
390            task_metrics,
391            handshake_fut,
392        );
393
394        // `AbortOnDropHandle` simulates task-killswitch behavior without needing
395        // to give up ownership of the `JoinHandle`.
396        let handshake_abort_handle = AbortOnDropHandle::new(handshake_handle);
397
398        let worker = handshake_abort_handle.await??;
399
400        Ok((conn, worker))
401    }
402
403    /// Resumes a QUIC connection which was paused after a successful handshake.
404    pub fn resume<A: ApplicationOverQuic>(pre_running: Running<Arc<Tx>, M, A>) {
405        let task_metrics = pre_running.params.metrics.clone();
406        let fut = async move {
407            let Running {
408                params,
409                context,
410                qconn,
411            } = pre_running;
412            let running_worker = IoWorker::new(params, RunningApplication);
413
414            let Closing {
415                params,
416                mut context,
417                work_loop_result,
418                mut qconn,
419            } = running_worker.run(qconn, context).await;
420
421            IoWorker::new(params, Close { work_loop_result })
422                .close(&mut qconn, &mut context)
423                .await;
424        };
425
426        crate::metrics::tokio_task::spawn_with_killswitch(
427            "quic_io_worker",
428            task_metrics,
429            fut,
430        );
431    }
432
433    /// Drives a QUIC connection from handshake to close in separate tokio
434    /// tasks.
435    ///
436    /// It combines [`InitialQuicConnection::handshake`] and
437    /// [`InitialQuicConnection::resume`] into a single call.
438    pub fn start<A: ApplicationOverQuic>(self, app: A) -> QuicConnection {
439        let task_metrics = self.params.metrics.clone();
440        let (conn, handshake_fut) = Self::handshake_fut(self, app);
441
442        let fut = async move {
443            match handshake_fut.await {
444                Ok(running) => Self::resume(running),
445                Err(e) => {
446                    log::error!("QUIC handshake failed in IQC::start"; "error" => e)
447                },
448            }
449        };
450
451        crate::metrics::tokio_task::spawn_with_killswitch(
452            "quic_handshake_worker",
453            task_metrics,
454            fut,
455        );
456
457        conn
458    }
459}
460
/// The inputs from which an [`InitialQuicConnection`] is built.
///
/// `InitialQuicConnection::handshake_fut` takes this struct apart and
/// distributes its fields between the caller-facing [`QuicConnection`], the
/// connection stage context and the I/O worker parameters.
pub(crate) struct QuicConnectionParams<Tx, M>
where
    Tx: DatagramSocketSend + Send + 'static + ?Sized,
    M: Metrics,
{
    /// Writer configuration, passed to the I/O worker as its `cfg`.
    pub writer_cfg: WriterConfig,
    /// Packet handed to the connection stages as their first `in_pkt`, if any.
    pub initial_pkt: Option<Incoming>,
    /// Shutdown channel sender, forwarded to the I/O worker.
    pub shutdown_tx: mpsc::Sender<()>,
    /// Channel that signals connection map changes.
    pub conn_map_cmd_tx: mpsc::UnboundedSender<ConnectionMapCommand>,
    /// The connection's source connection ID.
    pub scid: ConnectionId<'static>,
    /// Optional connection ID generator, forwarded to the I/O worker.
    pub cid_generator: Option<SharedConnectionIdGenerator>,
    /// Metrics sink used for the connection and the tasks spawned for it.
    pub metrics: M,
    /// Receive timestamp forwarded to the I/O worker; only present with the
    /// `perf-quic-listener-metrics` feature.
    #[cfg(feature = "perf-quic-listener-metrics")]
    pub init_rx_time: Option<SystemTime>,
    /// Start time and timeout of the handshake, given to the handshake stage.
    pub handshake_info: HandshakeInfo,
    /// The underlying [quiche] connection.
    pub quiche_conn: QuicheConnection,
    /// Socket used by the I/O worker, which wraps it in a
    /// `MaybeConnectedSocket`.
    pub socket: Arc<Tx>,
    /// The local address of the connection.
    pub local_addr: SocketAddr,
    /// The remote address of the connection.
    pub peer_addr: SocketAddr,
}
481
/// Metadata about an established QUIC connection.
///
/// While this struct allows access to some facets of a QUIC connection, it
/// notably does not represent the [quiche::Connection] itself. The crate
/// handles most interactions with [quiche] internally in a worker task. Users
/// can only access the connection directly via their [`ApplicationOverQuic`]
/// implementation.
///
/// See the [module-level docs](crate::quic) for an overview of how a QUIC
/// connection is handled internally.
pub struct QuicConnection {
    // Address the connection listens on.
    local_addr: SocketAddr,
    // Address of the remote peer.
    peer_addr: SocketAddr,
    // Handle to the audit stats; the same object is also given to the I/O
    // worker.
    audit_log_stats: Arc<QuicAuditStats>,
    // Handle to the shared statistics snapshot of the connection.
    stats: QuicConnectionStatsShared,
    // Source connection ID of the connection.
    scid: ConnectionId<'static>,
}
499
impl QuicConnection {
    /// The local address this connection listens on.
    #[inline]
    pub fn local_addr(&self) -> SocketAddr {
        self.local_addr
    }

    /// The remote address for this connection.
    #[inline]
    pub fn peer_addr(&self) -> SocketAddr {
        self.peer_addr
    }

    /// A handle to the [`QuicAuditStats`] for this connection.
    ///
    /// The handle is returned by reference; clone the `Arc` to keep it beyond
    /// the lifetime of this object.
    ///
    /// # Note
    /// These stats are updated during the lifetime of the connection.
    /// The getter exists to grab a handle early on, which can then
    /// be stowed away and read out after the connection has closed.
    #[inline]
    pub fn audit_log_stats(&self) -> &Arc<QuicAuditStats> {
        &self.audit_log_stats
    }

    /// A handle to the [`QuicConnectionStats`] for this connection.
    ///
    /// The handle is returned by reference; clone the `Arc` to keep it beyond
    /// the lifetime of this object.
    ///
    /// # Note
    /// Initially, these stats represent the state when the [quiche::Connection]
    /// was created. They are updated when the connection is closed, so this
    /// getter exists primarily to grab a handle early on.
    #[inline]
    pub fn stats(&self) -> &QuicConnectionStatsShared {
        &self.stats
    }

    /// The QUIC source connection ID used by this connection.
    #[inline]
    pub fn scid(&self) -> &ConnectionId<'static> {
        &self.scid
    }
}
541
542impl AsSocketStats for QuicConnection {
543    #[inline]
544    fn as_socket_stats(&self) -> SocketStats {
545        // It is important to note that those stats are only updated when
546        // the connection stops, which is fine, since this is only used to
547        // log after the connection is finished.
548        self.stats.lock().unwrap().as_socket_stats()
549    }
550
551    #[inline]
552    fn as_quic_stats(&self) -> Option<&Arc<QuicAuditStats>> {
553        Some(&self.audit_log_stats)
554    }
555}
556
557impl<Tx, M> AsSocketStats for InitialQuicConnection<Tx, M>
558where
559    Tx: DatagramSocketSend + Send + 'static + ?Sized,
560    M: Metrics,
561{
562    #[inline]
563    fn as_socket_stats(&self) -> SocketStats {
564        // It is important to note that those stats are only updated when
565        // the connection stops, which is fine, since this is only used to
566        // log after the connection is finished.
567        self.stats.lock().unwrap().as_socket_stats()
568    }
569
570    #[inline]
571    fn as_quic_stats(&self) -> Option<&Arc<QuicAuditStats>> {
572        Some(&self.audit_log_stats)
573    }
574}
575
576impl<Tx, M> ShutdownConnection for InitialQuicConnection<Tx, M>
577where
578    Tx: DatagramSocketSend + Send + 'static + ?Sized,
579    M: Metrics,
580{
581    #[inline]
582    fn poll_shutdown(
583        &mut self, _cx: &mut std::task::Context,
584    ) -> std::task::Poll<io::Result<()>> {
585        // TODO: Does nothing at the moment. We always call Self::start
586        // anyway so it's not really important at this moment.
587        Poll::Ready(Ok(()))
588    }
589}
590
591impl ShutdownConnection for QuicConnection {
592    #[inline]
593    fn poll_shutdown(
594        &mut self, _cx: &mut std::task::Context,
595    ) -> std::task::Poll<io::Result<()>> {
596        // TODO: does nothing at the moment
597        Poll::Ready(Ok(()))
598    }
599}
600
/// Details about a connection's QUIC handshake.
#[derive(Debug, Clone)]
pub struct HandshakeInfo {
    /// The time at which the connection was created.
    start_time: Instant,
    /// The timeout before which the handshake must complete.
    timeout: Option<Duration>,
    /// The real duration that the handshake took to complete.
    ///
    /// `None` until [`HandshakeInfo::set_elapsed`] has been called.
    time_handshake: Option<Duration>,
}

impl HandshakeInfo {
    /// Creates the handshake details for a connection created at `start_time`.
    ///
    /// A `timeout` of `None` means the handshake has no deadline.
    pub(crate) fn new(start_time: Instant, timeout: Option<Duration>) -> Self {
        Self {
            start_time,
            timeout,
            time_handshake: None,
        }
    }

    /// The time at which the connection was created.
    #[inline]
    pub fn start_time(&self) -> Instant {
        self.start_time
    }

    /// How long the handshake took to complete.
    ///
    /// Returns a zero duration as long as no handshake duration has been
    /// recorded.
    #[inline]
    pub fn elapsed(&self) -> Duration {
        self.time_handshake.unwrap_or_default()
    }

    /// Records the time passed since `start_time` as the handshake duration.
    pub(crate) fn set_elapsed(&mut self) {
        let elapsed = self.start_time.elapsed();
        self.time_handshake = Some(elapsed)
    }

    /// The point in time at which the handshake times out.
    ///
    /// Returns `None` if no timeout is configured, or if the timeout is so
    /// large that the deadline cannot be represented as an [`Instant`]. The
    /// latter is consistent with `is_expired`, which can never report such a
    /// timeout as expired either.
    pub(crate) fn deadline(&self) -> Option<Instant> {
        // `Instant + Duration` panics on overflow, so use the checked variant.
        self.timeout
            .and_then(|timeout| self.start_time.checked_add(timeout))
    }

    /// Whether the configured timeout has passed since `start_time`.
    ///
    /// Always `false` if no timeout is configured.
    pub(crate) fn is_expired(&self) -> bool {
        self.timeout
            .is_some_and(|timeout| self.start_time.elapsed() >= timeout)
    }
}
647
/// A trait to implement an application served over QUIC.
///
/// The application is driven by an internal worker task, which also handles I/O
/// for the connection. The worker feeds inbound packets into the
/// [quiche::Connection], calls [`ApplicationOverQuic::process_reads`] followed
/// by [`ApplicationOverQuic::process_writes`], and then flushes any pending
/// outbound packets to the network. This repeats in a loop until either the
/// connection is closed or the [`ApplicationOverQuic`] returns an error.
///
/// In between loop iterations, the worker yields until a new packet arrives, a
/// timer expires, or [`ApplicationOverQuic::wait_for_data`] resolves.
/// Implementors can interact with the underlying connection via the mutable
/// reference passed to trait methods.
#[allow(unused_variables)] // for default functions
pub trait ApplicationOverQuic: Send + 'static {
    /// Callback to customize the [`ApplicationOverQuic`] after the QUIC
    /// handshake completed successfully.
    ///
    /// # Errors
    /// Returning an error from this method immediately stops the worker loop
    /// and transitions to the connection closing stage.
    fn on_conn_established(
        &mut self, qconn: &mut QuicheConnection, handshake_info: &HandshakeInfo,
    ) -> QuicResult<()>;

    /// Determines whether the application's methods will be called by the
    /// worker.
    ///
    /// The function is checked in each iteration of the worker loop. Only
    /// `on_conn_established()` and `buffer()` bypass this check.
    fn should_act(&self) -> bool;

    /// A borrowed buffer for the worker to write outbound packets into.
    ///
    /// This method allows sharing a buffer between the worker and the
    /// application, efficiently using the allocated memory while the
    /// application is inactive. It can also be used to artificially
    /// restrict the size of outbound network packets.
    ///
    /// Any data in the buffer may be overwritten by the worker. If necessary,
    /// the application should save the contents when this method is called.
    fn buffer(&mut self) -> &mut [u8];

    /// Waits for an event to trigger the next iteration of the worker loop.
    ///
    /// The returned future is awaited in parallel to inbound packets and the
    /// connection's timers. Any one of those futures resolving triggers the
    /// next loop iteration, so implementations should not rely on
    /// `wait_for_data` for the bulk of their processing. Instead, after
    /// `wait_for_data` resolves, `process_writes` should be used to pull all
    /// available data out of the event source (for example, a channel).
    ///
    /// As for any future, it is **very important** that this method does not
    /// block the runtime. If it does, the other concurrent futures will be
    /// starved.
    ///
    /// # Cancel safety
    /// This method MUST be cancel safe. It gets called inside `select!` and
    /// could be (repeatedly) cancelled.
    ///
    /// # Errors
    /// Returning an error from this method immediately stops the worker loop
    /// and transitions to the connection closing stage.
    fn wait_for_data(
        &mut self, qconn: &mut QuicheConnection,
    ) -> impl Future<Output = QuicResult<()>> + Send;

    /// Processes data received on the connection.
    ///
    /// This method is only called if `should_act()` returns `true` and any
    /// packets were received since the last worker loop iteration. It
    /// should be used to read from the connection's open streams.
    ///
    /// # Errors
    /// Returning an error from this method immediately stops the worker loop
    /// and transitions to the connection closing stage.
    fn process_reads(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()>;

    /// Adds data to be sent on the connection.
    ///
    /// Unlike `process_reads`, this method is called on every iteration of the
    /// worker loop (provided `should_act()` returns `true`). It is called
    /// after `process_reads` and immediately before packets are pushed to
    /// the socket. The main use case is providing already-buffered data to
    /// the [quiche::Connection].
    ///
    /// # Errors
    /// Returning an error from this method immediately stops the worker loop
    /// and transitions to the connection closing stage.
    fn process_writes(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()>;

    /// Callback to inspect the result of the worker task, before a final packet
    /// with a `CONNECTION_CLOSE` frame is flushed to the network.
    ///
    /// `connection_result` is [`Ok`] only if the connection was closed without
    /// any local error. Otherwise, the state of `qconn` depends on the
    /// error type and application behavior.
    ///
    /// The default implementation does nothing.
    fn on_conn_close<M: Metrics>(
        &mut self, qconn: &mut QuicheConnection, metrics: &M,
        connection_result: &QuicResult<()>,
    ) {
    }
}
751
/// A command to execute on a [quiche::Connection] in the context of an
/// [`ApplicationOverQuic`].
///
/// We expect most [`ApplicationOverQuic`] implementations (such as
/// [H3Driver](crate::http3::driver::H3Driver)) will provide some way to submit
/// actions for them to take, for example via a channel. This enum may be
/// accepted as part of those actions to inspect or alter the state of the
/// underlying connection.
///
/// A command has no effect until [`QuicCommand::execute`] is called on it.
pub enum QuicCommand {
    /// Close the connection with the given parameters.
    ///
    /// Some packets may still be sent after this command has been executed, so
    /// the worker task may continue running for a bit. See
    /// [`quiche::Connection::close`] for details.
    ConnectionClose(ConnectionShutdownBehaviour),
    /// Execute a custom callback on the connection.
    ///
    /// The callback is invoked exactly once with mutable access to the
    /// connection.
    Custom(Box<dyn FnOnce(&mut QuicheConnection) + Send + 'static>),
    /// Collect the current [`SocketStats`] from the connection.
    ///
    /// Unlike [`QuicConnection::stats()`], these statistics are not cached and
    /// instead are retrieved right before the command is executed. They are
    /// delivered by invoking the callback.
    Stats(Box<dyn FnOnce(datagram_socket::SocketStats) + Send + 'static>),
    /// Collect the current [`QuicConnectionStats`] from the connection.
    ///
    /// These statistics are not cached and instead are retrieved right before
    /// the command is executed. They are delivered by invoking the callback.
    ConnectionStats(Box<dyn FnOnce(QuicConnectionStats) + Send + 'static>),
}
780
781impl QuicCommand {
782    /// Consume the command and perform its operation on `qconn`.
783    ///
784    /// This method should be called by [`ApplicationOverQuic`] implementations
785    /// when they receive a [`QuicCommand`] to execute.
786    pub fn execute(self, qconn: &mut QuicheConnection) {
787        match self {
788            Self::ConnectionClose(behavior) => {
789                let ConnectionShutdownBehaviour {
790                    send_application_close,
791                    error_code,
792                    reason,
793                } = behavior;
794
795                let _ = qconn.close(send_application_close, error_code, &reason);
796            },
797            Self::Custom(f) => {
798                (f)(qconn);
799            },
800            Self::Stats(callback) => {
801                let stats_pair = QuicConnectionStats::from_conn(qconn);
802                (callback)(stats_pair.as_socket_stats());
803            },
804            Self::ConnectionStats(callback) => {
805                let stats_pair = QuicConnectionStats::from_conn(qconn);
806                (callback)(stats_pair);
807            },
808        }
809    }
810}
811
812impl fmt::Debug for QuicCommand {
813    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
814        match self {
815            Self::ConnectionClose(b) =>
816                f.debug_tuple("ConnectionClose").field(b).finish(),
817            Self::Custom(_) => f.debug_tuple("Custom").finish_non_exhaustive(),
818            Self::Stats(_) => f.debug_tuple("Stats").finish_non_exhaustive(),
819            Self::ConnectionStats(_) =>
820                f.debug_tuple("ConnectionStats").finish_non_exhaustive(),
821        }
822    }
823}
824
/// Parameters to close a [quiche::Connection].
///
/// The connection will use these parameters for the `CONNECTION_CLOSE` frame
/// it sends to its peer.
#[derive(Debug, Clone)]
pub struct ConnectionShutdownBehaviour {
    /// Whether to send an application close or a regular close to the peer.
    ///
    /// If this is true but the connection is not in a state where it is safe to
    /// send an application error (not established nor in early data), in
    /// accordance with [RFC 9000][rfc-close], the error code is changed to
    /// `APPLICATION_ERROR` and the reason phrase is cleared.
    ///
    /// [rfc-close]: https://www.rfc-editor.org/rfc/rfc9000.html#section-10.2.3-3
    pub send_application_close: bool,
    /// The [QUIC][proto-err] or [application-level][app-err] error code to send
    /// to the peer.
    ///
    /// [proto-err]: https://www.rfc-editor.org/rfc/rfc9000.html#section-20.1
    /// [app-err]: https://www.rfc-editor.org/rfc/rfc9000.html#section-20.2
    pub error_code: u64,
    /// The reason phrase to send to the peer.
    pub reason: Vec<u8>,
}