datagram_socket/
socket_stats.rs

// Copyright (C) 2025, Cloudflare, Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
//     * Redistributions of source code must retain the above copyright notice,
//       this list of conditions and the following disclaimer.
//
//     * Redistributions in binary form must reproduce the above copyright
//       notice, this list of conditions and the following disclaimer in the
//       documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
use std::ops::Deref;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicU8;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::PoisonError;
use std::sync::RwLock;
use std::time::Duration;
use std::time::SystemTime;
36
37pub trait AsSocketStats {
38    fn as_socket_stats(&self) -> SocketStats;
39
40    fn as_quic_stats(&self) -> Option<&Arc<QuicAuditStats>> {
41        None
42    }
43}
44
45#[derive(Debug, Clone, Copy, Default)]
46pub struct SocketStats {
47    pub pmtu: u16,
48    pub rtt_us: i64,
49    pub min_rtt_us: i64,
50    pub max_rtt_us: i64,
51    pub rtt_var_us: i64,
52    pub cwnd: u64,
53    pub total_pto_count: u64,
54    pub packets_sent: u64,
55    pub packets_recvd: u64,
56    pub packets_lost: u64,
57    pub packets_lost_spurious: u64,
58    pub packets_retrans: u64,
59    pub bytes_sent: u64,
60    pub bytes_recvd: u64,
61    pub bytes_lost: u64,
62    pub bytes_retrans: u64,
63    pub bytes_unsent: u64,
64    pub delivery_rate: u64,
65    pub max_bandwidth: Option<u64>,
66    pub startup_exit: Option<StartupExit>,
67    pub bytes_in_flight_duration_us: u64,
68}
69
70/// Statistics from when a CCA first exited the startup phase.
71#[derive(Debug, Clone, Copy, PartialEq)]
72pub struct StartupExit {
73    pub cwnd: usize,
74    pub bandwidth: Option<u64>,
75    pub reason: StartupExitReason,
76}
77
/// The reason a CCA exited the startup phase.
///
/// The enum carries no data, so it is cheap to copy and can be used as a
/// set member or map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum StartupExitReason {
    /// Exit startup due to excessive loss.
    Loss,

    /// Exit startup due to bandwidth plateau.
    BandwidthPlateau,

    /// Exit startup due to persistent queue.
    PersistentQueue,
}
90
/// Boxed, thread-safe error type used to store the connection close reason.
type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// Audit statistics for a single QUIC connection.
///
/// All mutable state lives behind atomics or [`RwLock`]s, so every accessor
/// takes `&self`. The error codes and the handshake duration start out as
/// `-1` (see [`QuicAuditStats::new`]) until the matching setter is called.
#[derive(Debug)]
pub struct QuicAuditStats {
    /// A transport-level connection error code received from the client.
    recvd_conn_close_transport_error_code: AtomicI64,
    /// A transport-level connection error code sent to the client.
    sent_conn_close_transport_error_code: AtomicI64,
    /// An application-level connection error code received from the client.
    recvd_conn_close_application_error_code: AtomicI64,
    /// An application-level connection error code sent to the client.
    sent_conn_close_application_error_code: AtomicI64,
    /// Time taken for the QUIC handshake in microseconds.
    transport_handshake_duration_us: AtomicI64,
    /// The start time of the handshake.
    transport_handshake_start: Arc<RwLock<Option<SystemTime>>>,
    /// The reason the QUIC connection was closed.
    connection_close_reason: RwLock<Option<BoxError>>,
    /// Max recorded bandwidth.
    max_bandwidth: AtomicU64,
    /// Loss at max recorded bandwidth.
    max_loss_pct: AtomicU8,
    /// The server's chosen QUIC connection ID.
    ///
    /// The QUIC connection ID is presently an array of 20 bytes (160 bits).
    pub quic_connection_id: Vec<u8>,
}

impl QuicAuditStats {
    /// Creates statistics for the connection identified by
    /// `quic_connection_id`.
    ///
    /// The four error codes and the handshake duration are initialized to
    /// `-1`, the handshake start and close reason to `None`, and the
    /// bandwidth and loss figures to `0`.
    #[inline]
    pub fn new(quic_connection_id: Vec<u8>) -> Self {
        Self {
            recvd_conn_close_transport_error_code: AtomicI64::new(-1),
            sent_conn_close_transport_error_code: AtomicI64::new(-1),
            recvd_conn_close_application_error_code: AtomicI64::new(-1),
            sent_conn_close_application_error_code: AtomicI64::new(-1),
            transport_handshake_duration_us: AtomicI64::new(-1),
            transport_handshake_start: Arc::new(RwLock::new(None)),
            connection_close_reason: RwLock::new(None),
            max_bandwidth: AtomicU64::new(0),
            max_loss_pct: AtomicU8::new(0),
            quic_connection_id,
        }
    }

    /// Returns the transport-level error code received from the client, or
    /// the initial `-1` if none has been stored.
    #[inline]
    pub fn recvd_conn_close_transport_error_code(&self) -> i64 {
        self.recvd_conn_close_transport_error_code
            .load(Ordering::SeqCst)
    }

    /// Returns the transport-level error code sent to the client, or the
    /// initial `-1` if none has been stored.
    #[inline]
    pub fn sent_conn_close_transport_error_code(&self) -> i64 {
        self.sent_conn_close_transport_error_code
            .load(Ordering::SeqCst)
    }

    /// Returns the application-level error code received from the client,
    /// or the initial `-1` if none has been stored.
    #[inline]
    pub fn recvd_conn_close_application_error_code(&self) -> i64 {
        self.recvd_conn_close_application_error_code
            .load(Ordering::SeqCst)
    }

    /// Returns the application-level error code sent to the client, or the
    /// initial `-1` if none has been stored.
    #[inline]
    pub fn sent_conn_close_application_error_code(&self) -> i64 {
        self.sent_conn_close_application_error_code
            .load(Ordering::SeqCst)
    }

    /// Stores the transport-level error code received from the client.
    #[inline]
    pub fn set_recvd_conn_close_transport_error_code(
        &self, recvd_conn_close_transport_error_code: i64,
    ) {
        self.recvd_conn_close_transport_error_code
            .store(recvd_conn_close_transport_error_code, Ordering::SeqCst)
    }

    /// Stores the transport-level error code sent to the client.
    #[inline]
    pub fn set_sent_conn_close_transport_error_code(
        &self, sent_conn_close_transport_error_code: i64,
    ) {
        self.sent_conn_close_transport_error_code
            .store(sent_conn_close_transport_error_code, Ordering::SeqCst)
    }

    /// Stores the application-level error code received from the client.
    #[inline]
    pub fn set_recvd_conn_close_application_error_code(
        &self, recvd_conn_close_application_error_code: i64,
    ) {
        self.recvd_conn_close_application_error_code
            .store(recvd_conn_close_application_error_code, Ordering::SeqCst)
    }

    /// Stores the application-level error code sent to the client.
    #[inline]
    pub fn set_sent_conn_close_application_error_code(
        &self, sent_conn_close_application_error_code: i64,
    ) {
        self.sent_conn_close_application_error_code
            .store(sent_conn_close_application_error_code, Ordering::SeqCst)
    }

    /// Returns the recorded handshake duration in microseconds, or `-1` if
    /// none has been recorded (or the recorded duration did not fit).
    #[inline]
    pub fn transport_handshake_duration_us(&self) -> i64 {
        self.transport_handshake_duration_us.load(Ordering::SeqCst)
    }

    /// Records the time at which the handshake started.
    ///
    /// A poisoned lock is recovered rather than propagated as a panic: the
    /// guarded value is a plain `Option<SystemTime>` that is overwritten
    /// wholesale, so there is no partially-updated state to protect.
    #[inline]
    pub fn set_transport_handshake_start(&self, start_time: SystemTime) {
        let mut start = self
            .transport_handshake_start
            .write()
            .unwrap_or_else(PoisonError::into_inner);
        *start = Some(start_time);
    }

    /// Records the handshake duration, converted to microseconds.
    ///
    /// Durations whose microsecond count does not fit in an `i64` are
    /// recorded as `-1`.
    #[inline]
    pub fn set_transport_handshake_duration(&self, duration: Duration) {
        let dur = i64::try_from(duration.as_micros()).unwrap_or(-1);
        self.transport_handshake_duration_us
            .store(dur, Ordering::SeqCst);
    }

    /// Returns a new handle to the shared handshake start time.
    ///
    /// The handle points at the same lock that
    /// [`QuicAuditStats::set_transport_handshake_start`] writes through.
    /// Callers that lock it directly decide for themselves how to treat a
    /// poisoned lock.
    #[inline]
    pub fn transport_handshake_start(&self) -> Arc<RwLock<Option<SystemTime>>> {
        Arc::clone(&self.transport_handshake_start)
    }

    /// Returns a read guard over the connection close reason.
    ///
    /// The read lock is held for as long as the returned guard is alive, so
    /// drop it before calling
    /// [`QuicAuditStats::set_connection_close_reason`] on the same thread.
    /// A poisoned lock is recovered rather than propagated as a panic.
    #[inline]
    pub fn connection_close_reason(
        &self,
    ) -> impl Deref<Target = Option<BoxError>> + '_ {
        self.connection_close_reason
            .read()
            .unwrap_or_else(PoisonError::into_inner)
    }

    /// Records why the connection was closed, replacing any earlier reason.
    ///
    /// A poisoned lock is recovered rather than propagated as a panic.
    #[inline]
    pub fn set_connection_close_reason(&self, error: BoxError) {
        let mut reason = self
            .connection_close_reason
            .write()
            .unwrap_or_else(PoisonError::into_inner);
        *reason = Some(error);
    }

    /// Stores the max recorded bandwidth (`Release` ordering).
    #[inline]
    pub fn set_max_bandwidth(&self, max_bandwidth: u64) {
        self.max_bandwidth.store(max_bandwidth, Ordering::Release)
    }

    /// Returns the max recorded bandwidth (`Acquire` ordering); `0` until
    /// set.
    #[inline]
    pub fn max_bandwidth(&self) -> u64 {
        self.max_bandwidth.load(Ordering::Acquire)
    }

    /// Stores the loss at max recorded bandwidth (`Release` ordering).
    #[inline]
    pub fn set_max_loss_pct(&self, max_loss_pct: u8) {
        self.max_loss_pct.store(max_loss_pct, Ordering::Release)
    }

    /// Returns the loss at max recorded bandwidth (`Acquire` ordering); `0`
    /// until set.
    #[inline]
    pub fn max_loss_pct(&self) -> u8 {
        self.max_loss_pct.load(Ordering::Acquire)
    }
}
246
/// Classifies how a stream was closed.
///
/// NOTE(review): the precise meaning of each variant is decided by the code
/// that produces these values, which is not visible in this file; the
/// descriptions below follow the variant names only.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum StreamClosureKind {
    /// No closure is recorded (presumably the stream was not closed).
    None,
    /// The stream was closed implicitly.
    Implicit,
    /// The stream was closed explicitly.
    Explicit,
}