tokio_quiche/quic/connection/mod.rs
// Copyright (C) 2025, Cloudflare, Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
//     * Redistributions of source code must retain the above copyright notice,
//       this list of conditions and the following disclaimer.
//
//     * Redistributions in binary form must reproduce the above copyright
//       notice, this list of conditions and the following disclaimer in the
//       documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27mod error;
28mod id;
29mod map;
30
31pub use self::error::HandshakeError;
32pub use self::id::ConnectionIdGenerator;
33pub use self::id::SimpleConnectionIdGenerator;
34pub(crate) use self::map::ConnectionMap;
35
36use boring::ssl::SslRef;
37use datagram_socket::AsSocketStats;
38use datagram_socket::DatagramSocketSend;
39use datagram_socket::MaybeConnectedSocket;
40use datagram_socket::QuicAuditStats;
41use datagram_socket::ShutdownConnection;
42use datagram_socket::SocketStats;
43use foundations::telemetry::log;
44use futures::future::BoxFuture;
45use futures::Future;
46use quiche::ConnectionId;
47use std::fmt;
48use std::io;
49use std::net::SocketAddr;
50use std::sync::Arc;
51use std::sync::Mutex;
52use std::task::Poll;
53use std::time::Duration;
54use std::time::Instant;
55use std::time::SystemTime;
56use tokio::sync::mpsc;
57use tokio_util::task::AbortOnDropHandle;
58
59use self::error::make_handshake_result;
60use super::io::connection_stage::Close;
61use super::io::connection_stage::ConnectionStageContext;
62use super::io::connection_stage::Handshake;
63use super::io::connection_stage::RunningApplication;
64use super::io::worker::Closing;
65use super::io::worker::IoWorkerParams;
66use super::io::worker::Running;
67use super::io::worker::RunningOrClosing;
68use super::io::worker::WriteState;
69use super::QuicheConnection;
70use crate::buf_factory::PooledBuf;
71use crate::metrics::Metrics;
72use crate::quic::io::worker::IoWorker;
73use crate::quic::io::worker::WriterConfig;
74use crate::quic::io::worker::INCOMING_QUEUE_SIZE;
75use crate::quic::router::ConnectionMapCommand;
76use crate::QuicResult;
77
78/// Wrapper for connection statistics recorded by [quiche].
79#[derive(Debug)]
80pub struct QuicConnectionStats {
81 /// Aggregate connection statistics across all paths.
82 pub stats: quiche::Stats,
83 /// Specific statistics about the connection's active path.
84 pub path_stats: Option<quiche::PathStats>,
85}
86pub(crate) type QuicConnectionStatsShared = Arc<Mutex<QuicConnectionStats>>;
87
88impl QuicConnectionStats {
89 pub(crate) fn from_conn(qconn: &QuicheConnection) -> Self {
90 Self {
91 stats: qconn.stats(),
92 path_stats: qconn.path_stats().next(),
93 }
94 }
95
96 fn startup_exit_to_socket_stats(
97 value: quiche::StartupExit,
98 ) -> datagram_socket::StartupExit {
99 let reason = match value.reason {
100 quiche::StartupExitReason::Loss =>
101 datagram_socket::StartupExitReason::Loss,
102 quiche::StartupExitReason::BandwidthPlateau =>
103 datagram_socket::StartupExitReason::BandwidthPlateau,
104 quiche::StartupExitReason::PersistentQueue =>
105 datagram_socket::StartupExitReason::PersistentQueue,
106 };
107
108 datagram_socket::StartupExit {
109 cwnd: value.cwnd,
110 bandwidth: value.bandwidth,
111 reason,
112 }
113 }
114}
115
116impl AsSocketStats for QuicConnectionStats {
117 fn as_socket_stats(&self) -> SocketStats {
118 SocketStats {
119 pmtu: self
120 .path_stats
121 .as_ref()
122 .map(|p| p.pmtu as u16)
123 .unwrap_or_default(),
124 rtt_us: self
125 .path_stats
126 .as_ref()
127 .map(|p| p.rtt.as_micros() as i64)
128 .unwrap_or_default(),
129 min_rtt_us: self
130 .path_stats
131 .as_ref()
132 .and_then(|p| p.min_rtt.map(|x| x.as_micros() as i64))
133 .unwrap_or_default(),
134 max_rtt_us: self
135 .path_stats
136 .as_ref()
137 .and_then(|p| p.max_rtt.map(|x| x.as_micros() as i64))
138 .unwrap_or_default(),
139 rtt_var_us: self
140 .path_stats
141 .as_ref()
142 .map(|p| p.rttvar.as_micros() as i64)
143 .unwrap_or_default(),
144 cwnd: self
145 .path_stats
146 .as_ref()
147 .map(|p| p.cwnd as u64)
148 .unwrap_or_default(),
149 total_pto_count: self
150 .path_stats
151 .as_ref()
152 .map(|p| p.total_pto_count as u64)
153 .unwrap_or_default(),
154 packets_sent: self.stats.sent as u64,
155 packets_recvd: self.stats.recv as u64,
156 packets_lost: self.stats.lost as u64,
157 packets_lost_spurious: self.stats.spurious_lost as u64,
158 packets_retrans: self.stats.retrans as u64,
159 bytes_sent: self.stats.sent_bytes,
160 bytes_recvd: self.stats.recv_bytes,
161 bytes_lost: self.stats.lost_bytes,
162 bytes_retrans: self.stats.stream_retrans_bytes,
163 bytes_unsent: 0, /* not implemented yet, kept for compatibility
164 * with TCP */
165 delivery_rate: self
166 .path_stats
167 .as_ref()
168 .map(|p| p.delivery_rate)
169 .unwrap_or_default(),
170 max_bandwidth: self.path_stats.as_ref().and_then(|p| p.max_bandwidth),
171 startup_exit: self
172 .path_stats
173 .as_ref()
174 .and_then(|p| p.startup_exit)
175 .map(QuicConnectionStats::startup_exit_to_socket_stats),
176 bytes_in_flight_duration_us: self
177 .stats
178 .bytes_in_flight_duration
179 .as_micros() as u64,
180 }
181 }
182}
183
184/// A received network packet with additional metadata.
185#[derive(Debug)]
186pub struct Incoming {
187 /// The address that sent the inbound packet.
188 pub peer_addr: SocketAddr,
189 /// The address on which we received the inbound packet.
190 pub local_addr: SocketAddr,
191 /// The receive timestamp of the packet.
192 ///
193 /// Used for the `perf-quic-listener-metrics` feature.
194 pub rx_time: Option<SystemTime>,
195 /// The packet's contents.
196 pub buf: PooledBuf,
197 /// If set, then `buf` is a GRO buffer containing multiple packets.
198 /// Each individual packet has a size of `gso` (except for the last one).
199 pub gro: Option<i32>,
200 /// [SO_MARK] control message value received from the socket.
201 ///
202 /// This will always be `None` after the connection has been spawned as
203 /// the message is `take()`d before spawning.
204 ///
205 /// [SO_MARK]: https://man7.org/linux/man-pages/man7/socket.7.html
206 #[cfg(target_os = "linux")]
207 pub so_mark_data: Option<[u8; 4]>,
208}
209
210/// A QUIC connection that has not performed a handshake yet.
211///
212/// This type is currently only used for server-side connections. It is created
213/// and added to the listener's connection stream after an initial packet from
214/// a client has been received and (optionally) the client's IP address has been
215/// validated.
216///
217/// To turn the initial connection into a fully established one, a QUIC
218/// handshake must be performed. Users have multiple options to facilitate this:
219/// - `start` is a simple entrypoint which spawns a task to handle the entire
220/// lifetime of the QUIC connection. The caller can then only communicate with
221/// the connection via their [`ApplicationOverQuic`].
222/// - `handshake` spawns a task for the handshake and awaits its completion.
223/// Afterwards, it pauses the connection and allows the caller to resume it
224/// later via an opaque struct. We spawn a separate task to allow the tokio
225/// scheduler free choice in where to run the handshake.
226/// - `handshake_fut` returns a future to drive the handshake for maximum
227/// flexibility.
228#[must_use = "call InitialQuicConnection::start to establish the connection"]
229pub struct InitialQuicConnection<Tx, M>
230where
231 Tx: DatagramSocketSend + Send + 'static + ?Sized,
232 M: Metrics,
233{
234 /// An internal ID, to uniquely identify the connection across multiple QUIC
235 /// connection IDs.
236 pub(crate) id: u64,
237 params: QuicConnectionParams<Tx, M>,
238 pub(crate) audit_log_stats: Arc<QuicAuditStats>,
239 stats: QuicConnectionStatsShared,
240 pub(crate) incoming_ev_sender: mpsc::Sender<Incoming>,
241 incoming_ev_receiver: mpsc::Receiver<Incoming>,
242}
243
244impl<Tx, M> InitialQuicConnection<Tx, M>
245where
246 Tx: DatagramSocketSend + Send + 'static + ?Sized,
247 M: Metrics,
248{
249 #[inline]
250 pub(crate) fn new(params: QuicConnectionParams<Tx, M>) -> Self {
251 let (incoming_ev_sender, incoming_ev_receiver) =
252 mpsc::channel(INCOMING_QUEUE_SIZE);
253 let audit_log_stats = Arc::new(QuicAuditStats::new(params.scid.to_vec()));
254
255 let stats = Arc::new(Mutex::new(QuicConnectionStats::from_conn(
256 ¶ms.quiche_conn,
257 )));
258
259 Self {
260 id: Self::generate_id(),
261 params,
262 audit_log_stats,
263 stats,
264 incoming_ev_sender,
265 incoming_ev_receiver,
266 }
267 }
268
269 /// The local address this connection listens on.
270 pub fn local_addr(&self) -> SocketAddr {
271 self.params.local_addr
272 }
273
274 /// The remote address for this connection.
275 pub fn peer_addr(&self) -> SocketAddr {
276 self.params.peer_addr
277 }
278
279 /// [boring]'s SSL object for this connection.
280 #[doc(hidden)]
281 pub fn ssl_mut(&mut self) -> &mut SslRef {
282 self.params.quiche_conn.as_mut()
283 }
284
285 /// A handle to the [`QuicAuditStats`] for this connection.
286 ///
287 /// # Note
288 /// These stats are updated during the lifetime of the connection.
289 /// The getter exists to grab a handle early on, which can then
290 /// be stowed away and read out after the connection has closed.
291 #[inline]
292 pub fn audit_log_stats(&self) -> Arc<QuicAuditStats> {
293 Arc::clone(&self.audit_log_stats)
294 }
295
296 /// A handle to the [`QuicConnectionStats`] for this connection.
297 ///
298 /// # Note
299 /// Initially, these stats represent the state when the [quiche::Connection]
300 /// was created. They are updated when the connection is closed, so this
301 /// getter exists primarily to grab a handle early on.
302 #[inline]
303 pub fn stats(&self) -> &QuicConnectionStatsShared {
304 &self.stats
305 }
306
307 /// Creates a future to drive the connection's handshake.
308 ///
309 /// This is a lower-level alternative to the `handshake` function which
310 /// gives the caller more control over execution of the future. See
311 /// `handshake` for details on the return values.
312 #[allow(clippy::type_complexity)]
313 pub fn handshake_fut<A: ApplicationOverQuic>(
314 self, app: A,
315 ) -> (
316 QuicConnection,
317 BoxFuture<'static, io::Result<Running<Arc<Tx>, M, A>>>,
318 ) {
319 self.params.metrics.connections_in_memory().inc();
320
321 let conn = QuicConnection {
322 local_addr: self.params.local_addr,
323 peer_addr: self.params.peer_addr,
324 audit_log_stats: Arc::clone(&self.audit_log_stats),
325 stats: Arc::clone(&self.stats),
326 scid: self.params.scid,
327 };
328 let context = ConnectionStageContext {
329 in_pkt: self.params.initial_pkt,
330 incoming_pkt_receiver: self.incoming_ev_receiver,
331 application: app,
332 stats: Arc::clone(&self.stats),
333 };
334 let conn_stage = Handshake {
335 handshake_info: self.params.handshake_info,
336 };
337 let params = IoWorkerParams {
338 socket: MaybeConnectedSocket::new(self.params.socket),
339 shutdown_tx: self.params.shutdown_tx,
340 cfg: self.params.writer_cfg,
341 audit_log_stats: self.audit_log_stats,
342 write_state: WriteState::default(),
343 conn_map_cmd_tx: self.params.conn_map_cmd_tx,
344 #[cfg(feature = "perf-quic-listener-metrics")]
345 init_rx_time: self.params.init_rx_time,
346 metrics: self.params.metrics.clone(),
347 };
348
349 let handshake_fut = async move {
350 let qconn = self.params.quiche_conn;
351 let handshake_done =
352 IoWorker::new(params, conn_stage).run(qconn, context).await;
353
354 match handshake_done {
355 RunningOrClosing::Running(r) => Ok(r),
356 RunningOrClosing::Closing(Closing {
357 params,
358 work_loop_result,
359 mut context,
360 mut qconn,
361 }) => {
362 let hs_result = make_handshake_result(&work_loop_result);
363 IoWorker::new(params, Close { work_loop_result })
364 .close(&mut qconn, &mut context)
365 .await;
366 hs_result
367 },
368 }
369 };
370
371 (conn, Box::pin(handshake_fut))
372 }
373
374 /// Performs the QUIC handshake in a separate tokio task and awaits its
375 /// completion.
376 ///
377 /// The returned [`QuicConnection`] holds metadata about the established
378 /// connection. The connection itself is paused after `handshake`
379 /// returns and must be resumed by passing the opaque `Running` value to
380 /// [`InitialQuicConnection::resume`]. This two-step process
381 /// allows callers to collect telemetry and run code before serving their
382 /// [`ApplicationOverQuic`].
383 pub async fn handshake<A: ApplicationOverQuic>(
384 self, app: A,
385 ) -> io::Result<(QuicConnection, Running<Arc<Tx>, M, A>)> {
386 let task_metrics = self.params.metrics.clone();
387 let (conn, handshake_fut) = Self::handshake_fut(self, app);
388
389 let handshake_handle = crate::metrics::tokio_task::spawn(
390 "quic_handshake_worker",
391 task_metrics,
392 handshake_fut,
393 );
394
395 // `AbortOnDropHandle` simulates task-killswitch behavior without needing
396 // to give up ownership of the `JoinHandle`.
397 let handshake_abort_handle = AbortOnDropHandle::new(handshake_handle);
398
399 let worker = handshake_abort_handle.await??;
400
401 Ok((conn, worker))
402 }
403
404 /// Resumes a QUIC connection which was paused after a successful handshake.
405 pub fn resume<A: ApplicationOverQuic>(pre_running: Running<Arc<Tx>, M, A>) {
406 let task_metrics = pre_running.params.metrics.clone();
407 let fut = async move {
408 let Running {
409 params,
410 context,
411 qconn,
412 } = pre_running;
413 let running_worker = IoWorker::new(params, RunningApplication);
414
415 let Closing {
416 params,
417 mut context,
418 work_loop_result,
419 mut qconn,
420 } = running_worker.run(qconn, context).await;
421
422 IoWorker::new(params, Close { work_loop_result })
423 .close(&mut qconn, &mut context)
424 .await;
425 };
426
427 crate::metrics::tokio_task::spawn_with_killswitch(
428 "quic_io_worker",
429 task_metrics,
430 fut,
431 );
432 }
433
434 /// Drives a QUIC connection from handshake to close in separate tokio
435 /// tasks.
436 ///
437 /// It combines [`InitialQuicConnection::handshake`] and
438 /// [`InitialQuicConnection::resume`] into a single call.
439 pub fn start<A: ApplicationOverQuic>(self, app: A) -> QuicConnection {
440 let task_metrics = self.params.metrics.clone();
441 let (conn, handshake_fut) = Self::handshake_fut(self, app);
442
443 let fut = async move {
444 match handshake_fut.await {
445 Ok(running) => Self::resume(running),
446 Err(e) => {
447 log::error!("QUIC handshake failed in IQC::start"; "error" => e)
448 },
449 }
450 };
451
452 crate::metrics::tokio_task::spawn_with_killswitch(
453 "quic_handshake_worker",
454 task_metrics,
455 fut,
456 );
457
458 conn
459 }
460
461 fn generate_id() -> u64 {
462 let mut buf = [0; 8];
463
464 boring::rand::rand_bytes(&mut buf).unwrap();
465
466 u64::from_ne_bytes(buf)
467 }
468}
469
470pub(crate) struct QuicConnectionParams<Tx, M>
471where
472 Tx: DatagramSocketSend + Send + 'static + ?Sized,
473 M: Metrics,
474{
475 pub writer_cfg: WriterConfig,
476 pub initial_pkt: Option<Incoming>,
477 pub shutdown_tx: mpsc::Sender<()>,
478 pub conn_map_cmd_tx: mpsc::UnboundedSender<ConnectionMapCommand>, /* channel that signals connection map changes */
479 pub scid: ConnectionId<'static>,
480 pub metrics: M,
481 #[cfg(feature = "perf-quic-listener-metrics")]
482 pub init_rx_time: Option<SystemTime>,
483 pub handshake_info: HandshakeInfo,
484 pub quiche_conn: QuicheConnection,
485 pub socket: Arc<Tx>,
486 pub local_addr: SocketAddr,
487 pub peer_addr: SocketAddr,
488}
489
490/// Metadata about an established QUIC connection.
491///
492/// While this struct allows access to some facets of a QUIC connection, it
493/// notably does not represent the [quiche::Connection] itself. The crate
494/// handles most interactions with [quiche] internally in a worker task. Users
495/// can only access the connection directly via their [`ApplicationOverQuic`]
496/// implementation.
497///
498/// See the [module-level docs](crate::quic) for an overview of how a QUIC
499/// connection is handled internally.
500pub struct QuicConnection {
501 local_addr: SocketAddr,
502 peer_addr: SocketAddr,
503 audit_log_stats: Arc<QuicAuditStats>,
504 stats: QuicConnectionStatsShared,
505 scid: ConnectionId<'static>,
506}
507
508impl QuicConnection {
509 /// The local address this connection listens on.
510 #[inline]
511 pub fn local_addr(&self) -> SocketAddr {
512 self.local_addr
513 }
514
515 /// The remote address for this connection.
516 #[inline]
517 pub fn peer_addr(&self) -> SocketAddr {
518 self.peer_addr
519 }
520
521 /// A handle to the [`QuicAuditStats`] for this connection.
522 ///
523 /// # Note
524 /// These stats are updated during the lifetime of the connection.
525 /// The getter exists to grab a handle early on, which can then
526 /// be stowed away and read out after the connection has closed.
527 #[inline]
528 pub fn audit_log_stats(&self) -> &Arc<QuicAuditStats> {
529 &self.audit_log_stats
530 }
531
532 /// A handle to the [`QuicConnectionStats`] for this connection.
533 ///
534 /// # Note
535 /// Initially, these stats represent the state when the [quiche::Connection]
536 /// was created. They are updated when the connection is closed, so this
537 /// getter exists primarily to grab a handle early on.
538 #[inline]
539 pub fn stats(&self) -> &QuicConnectionStatsShared {
540 &self.stats
541 }
542
543 /// The QUIC source connection ID used by this connection.
544 #[inline]
545 pub fn scid(&self) -> &ConnectionId<'static> {
546 &self.scid
547 }
548}
549
550impl AsSocketStats for QuicConnection {
551 #[inline]
552 fn as_socket_stats(&self) -> SocketStats {
553 // It is important to note that those stats are only updated when
554 // the connection stops, which is fine, since this is only used to
555 // log after the connection is finished.
556 self.stats.lock().unwrap().as_socket_stats()
557 }
558
559 #[inline]
560 fn as_quic_stats(&self) -> Option<&Arc<QuicAuditStats>> {
561 Some(&self.audit_log_stats)
562 }
563}
564
565impl<Tx, M> AsSocketStats for InitialQuicConnection<Tx, M>
566where
567 Tx: DatagramSocketSend + Send + 'static + ?Sized,
568 M: Metrics,
569{
570 #[inline]
571 fn as_socket_stats(&self) -> SocketStats {
572 // It is important to note that those stats are only updated when
573 // the connection stops, which is fine, since this is only used to
574 // log after the connection is finished.
575 self.stats.lock().unwrap().as_socket_stats()
576 }
577
578 #[inline]
579 fn as_quic_stats(&self) -> Option<&Arc<QuicAuditStats>> {
580 Some(&self.audit_log_stats)
581 }
582}
583
584impl<Tx, M> ShutdownConnection for InitialQuicConnection<Tx, M>
585where
586 Tx: DatagramSocketSend + Send + 'static + ?Sized,
587 M: Metrics,
588{
589 #[inline]
590 fn poll_shutdown(
591 &mut self, _cx: &mut std::task::Context,
592 ) -> std::task::Poll<io::Result<()>> {
593 // TODO: Does nothing at the moment. We always call Self::start
594 // anyway so it's not really important at this moment.
595 Poll::Ready(Ok(()))
596 }
597}
598
599impl ShutdownConnection for QuicConnection {
600 #[inline]
601 fn poll_shutdown(
602 &mut self, _cx: &mut std::task::Context,
603 ) -> std::task::Poll<io::Result<()>> {
604 // TODO: does nothing at the moment
605 Poll::Ready(Ok(()))
606 }
607}
608
/// Details about a connection's QUIC handshake.
#[derive(Debug, Clone)]
pub struct HandshakeInfo {
    /// The time at which the connection was created.
    start_time: Instant,
    /// The timeout before which the handshake must complete.
    timeout: Option<Duration>,
    /// The real duration that the handshake took to complete.
    time_handshake: Option<Duration>,
}

impl HandshakeInfo {
    /// Creates the handshake details for a connection created at `start_time`
    /// with an optional handshake `timeout`.
    ///
    /// The handshake duration is unset until [`Self::set_elapsed`] is called.
    pub(crate) fn new(start_time: Instant, timeout: Option<Duration>) -> Self {
        Self {
            start_time,
            timeout,
            time_handshake: None,
        }
    }

    /// The time at which the connection was created.
    #[inline]
    pub fn start_time(&self) -> Instant {
        self.start_time
    }

    /// How long the handshake took to complete.
    ///
    /// Returns a zero duration if the handshake duration has not been
    /// recorded yet.
    #[inline]
    pub fn elapsed(&self) -> Duration {
        self.time_handshake.unwrap_or_default()
    }

    /// Records the time elapsed since `start_time` as the handshake duration.
    pub(crate) fn set_elapsed(&mut self) {
        let elapsed = self.start_time.elapsed();
        self.time_handshake = Some(elapsed)
    }

    /// The instant by which the handshake must complete.
    ///
    /// Returns `None` if no timeout is configured. It also returns `None` if
    /// the timeout is so large that the deadline cannot be represented as an
    /// [`Instant`]; such a handshake never expires in practice, and a plain
    /// `start_time + timeout` would panic.
    pub(crate) fn deadline(&self) -> Option<Instant> {
        self.timeout
            .and_then(|timeout| self.start_time.checked_add(timeout))
    }

    /// Whether the configured timeout has passed since `start_time`.
    ///
    /// Always `false` when no timeout is configured.
    pub(crate) fn is_expired(&self) -> bool {
        self.timeout
            .is_some_and(|timeout| self.start_time.elapsed() >= timeout)
    }
}
655
656/// A trait to implement an application served over QUIC.
657///
658/// The application is driven by an internal worker task, which also handles I/O
659/// for the connection. The worker feeds inbound packets into the
660/// [quiche::Connection], calls [`ApplicationOverQuic::process_reads`] followed
661/// by [`ApplicationOverQuic::process_writes`], and then flushes any pending
662/// outbound packets to the network. This repeats in a loop until either the
663/// connection is closed or the [`ApplicationOverQuic`] returns an error.
664///
665/// In between loop iterations, the worker yields until a new packet arrives, a
666/// timer expires, or [`ApplicationOverQuic::wait_for_data`] resolves.
667/// Implementors can interact with the underlying connection via the mutable
668/// reference passed to trait methods.
669#[allow(unused_variables)] // for default functions
670pub trait ApplicationOverQuic: Send + 'static {
671 /// Callback to customize the [`ApplicationOverQuic`] after the QUIC
672 /// handshake completed successfully.
673 ///
674 /// # Errors
675 /// Returning an error from this method immediately stops the worker loop
676 /// and transitions to the connection closing stage.
677 fn on_conn_established(
678 &mut self, qconn: &mut QuicheConnection, handshake_info: &HandshakeInfo,
679 ) -> QuicResult<()>;
680
681 /// Determines whether the application's methods will be called by the
682 /// worker.
683 ///
684 /// The function is checked in each iteration of the worker loop. Only
685 /// `on_conn_established()` and `buffer()` bypass this check.
686 fn should_act(&self) -> bool;
687
688 /// A borrowed buffer for the worker to write outbound packets into.
689 ///
690 /// This method allows sharing a buffer between the worker and the
691 /// application, efficiently using the allocated memory while the
692 /// application is inactive. It can also be used to artificially
693 /// restrict the size of outbound network packets.
694 ///
695 /// Any data in the buffer may be overwritten by the worker. If necessary,
696 /// the application should save the contents when this method is called.
697 fn buffer(&mut self) -> &mut [u8];
698
699 /// Waits for an event to trigger the next iteration of the worker loop.
700 ///
701 /// The returned future is awaited in parallel to inbound packets and the
702 /// connection's timers. Any one of those futures resolving triggers the
703 /// next loop iteration, so implementations should not rely on
704 /// `wait_for_data` for the bulk of their processing. Instead, after
705 /// `wait_for_data` resolves, `process_writes` should be used to pull all
706 /// available data out of the event source (for example, a channel).
707 ///
708 /// As for any future, it is **very important** that this method does not
709 /// block the runtime. If it does, the other concurrent futures will be
710 /// starved.
711 ///
712 /// # Errors
713 /// Returning an error from this method immediately stops the worker loop
714 /// and transitions to the connection closing stage.
715 fn wait_for_data(
716 &mut self, qconn: &mut QuicheConnection,
717 ) -> impl Future<Output = QuicResult<()>> + Send;
718
719 /// Processes data received on the connection.
720 ///
721 /// This method is only called if `should_act()` returns `true` and any
722 /// packets were received since the last worker loop iteration. It
723 /// should be used to read from the connection's open streams.
724 ///
725 /// # Errors
726 /// Returning an error from this method immediately stops the worker loop
727 /// and transitions to the connection closing stage.
728 fn process_reads(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()>;
729
730 /// Adds data to be sent on the connection.
731 ///
732 /// Unlike `process_reads`, this method is called on every iteration of the
733 /// worker loop (provided `should_act()` returns true). It is called
734 /// after `process_reads` and immediately before packets are pushed to
735 /// the socket. The main use case is providing already-buffered data to
736 /// the [quiche::Connection].
737 ///
738 /// # Errors
739 /// Returning an error from this method immediately stops the worker loop
740 /// and transitions to the connection closing stage.
741 fn process_writes(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()>;
742
743 /// Callback to inspect the result of the worker task, before a final packet
744 /// with a `CONNECTION_CLOSE` frame is flushed to the network.
745 ///
746 /// `connection_result` is [`Ok`] only if the connection was closed without
747 /// any local error. Otherwise, the state of `qconn` depends on the
748 /// error type and application behavior.
749 fn on_conn_close<M: Metrics>(
750 &mut self, qconn: &mut QuicheConnection, metrics: &M,
751 connection_result: &QuicResult<()>,
752 ) {
753 }
754}
755
756/// A command to execute on a [quiche::Connection] in the context of an
757/// [`ApplicationOverQuic`].
758///
759/// We expect most [`ApplicationOverQuic`] implementations (such as
760/// [H3Driver](crate::http3::driver::H3Driver)) will provide some way to submit
761/// actions for them to take, for example via a channel. This enum may be
762/// accepted as part of those actions to inspect or alter the state of the
763/// underlying connection.
764pub enum QuicCommand {
765 /// Close the connection with the given parameters.
766 ///
767 /// Some packets may still be sent after this command has been executed, so
768 /// the worker task may continue running for a bit. See
769 /// [`quiche::Connection::close`] for details.
770 ConnectionClose(ConnectionShutdownBehaviour),
771 /// Execute a custom callback on the connection.
772 Custom(Box<dyn FnOnce(&mut QuicheConnection) + Send + 'static>),
773 /// Collect the current [`SocketStats`] from the connection.
774 ///
775 /// Unlike [`QuicConnection::stats()`], these statistics are not cached and
776 /// instead are retrieved right before the command is executed.
777 Stats(Box<dyn FnOnce(datagram_socket::SocketStats) + Send + 'static>),
778}
779
780impl QuicCommand {
781 /// Consume the command and perform its operation on `qconn`.
782 ///
783 /// This method should be called by [`ApplicationOverQuic`] implementations
784 /// when they receive a [`QuicCommand`] to execute.
785 pub fn execute(self, qconn: &mut QuicheConnection) {
786 match self {
787 Self::ConnectionClose(behavior) => {
788 let ConnectionShutdownBehaviour {
789 send_application_close,
790 error_code,
791 reason,
792 } = behavior;
793
794 let _ = qconn.close(send_application_close, error_code, &reason);
795 },
796 Self::Custom(f) => {
797 (f)(qconn);
798 },
799 Self::Stats(callback) => {
800 let stats_pair = QuicConnectionStats::from_conn(qconn);
801 (callback)(stats_pair.as_socket_stats());
802 },
803 }
804 }
805}
806
807impl fmt::Debug for QuicCommand {
808 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
809 match self {
810 Self::ConnectionClose(b) =>
811 f.debug_tuple("ConnectionClose").field(b).finish(),
812 Self::Custom(_) => f.debug_tuple("Custom").finish_non_exhaustive(),
813 Self::Stats(_) => f.debug_tuple("Stats").finish_non_exhaustive(),
814 }
815 }
816}
817
/// The parameters with which a [quiche::Connection] is closed.
///
/// They determine the contents of the `CONNECTION_CLOSE` frame which the
/// connection sends to its peer.
#[derive(Debug, Clone)]
pub struct ConnectionShutdownBehaviour {
    /// Selects between an application close (`true`) and a regular close
    /// (`false`) towards the peer.
    ///
    /// If this is true while the connection is not in a state where sending
    /// an application error is safe (not established nor in early data), the
    /// error code is changed to `APPLICATION_ERROR` and the reason phrase is
    /// cleared, in accordance with
    /// [RFC 9000](https://www.rfc-editor.org/rfc/rfc9000.html#section-10.2.3-3).
    pub send_application_close: bool,
    /// The [QUIC][proto-err] or [application-level][app-err] error code which
    /// is sent to the peer.
    ///
    /// [proto-err]: https://www.rfc-editor.org/rfc/rfc9000.html#section-20.1
    /// [app-err]: https://www.rfc-editor.org/rfc/rfc9000.html#section-20.2
    pub error_code: u64,
    /// The reason phrase which is sent to the peer.
    pub reason: Vec<u8>,
}
840}