datagram_socket/
socket_stats.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::ops::Deref;
28use std::sync::atomic::AtomicI64;
29use std::sync::atomic::AtomicU64;
30use std::sync::atomic::AtomicU8;
31use std::sync::atomic::Ordering;
32use std::sync::Arc;
33#[cfg(target_os = "linux")]
34use std::sync::OnceLock;
35use std::sync::RwLock;
36use std::time::Duration;
37use std::time::SystemTime;
38
39pub trait AsSocketStats {
40    fn as_socket_stats(&self) -> SocketStats;
41
42    fn as_quic_stats(&self) -> Option<&Arc<QuicAuditStats>> {
43        None
44    }
45}
46
47#[derive(Debug, Clone, Copy, Default)]
48pub struct SocketStats {
49    pub pmtu: u16,
50    pub rtt_us: i64,
51    pub min_rtt_us: i64,
52    pub max_rtt_us: i64,
53    pub rtt_var_us: i64,
54    pub cwnd: u64,
55    pub total_pto_count: u64,
56    pub packets_sent: u64,
57    pub packets_recvd: u64,
58    pub packets_lost: u64,
59    pub packets_lost_spurious: u64,
60    pub packets_retrans: u64,
61    pub bytes_sent: u64,
62    pub bytes_recvd: u64,
63    pub bytes_lost: u64,
64    pub bytes_retrans: u64,
65    pub bytes_unsent: u64,
66    pub delivery_rate: u64,
67    pub max_bandwidth: Option<u64>,
68    pub startup_exit: Option<StartupExit>,
69    pub bytes_in_flight_duration_us: u64,
70}
71
72/// Statistics from when a CCA first exited the startup phase.
73#[derive(Debug, Clone, Copy, PartialEq)]
74pub struct StartupExit {
75    pub cwnd: usize,
76    pub bandwidth: Option<u64>,
77    pub reason: StartupExitReason,
78}
79
/// The reason a CCA exited the startup phase.
///
/// Derives `Eq` alongside `PartialEq`: equality on a fieldless enum is total,
/// and this matches `StreamClosureKind` in this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StartupExitReason {
    /// Exit startup due to excessive loss.
    Loss,

    /// Exit startup due to bandwidth plateau.
    BandwidthPlateau,

    /// Exit startup due to persistent queue.
    PersistentQueue,
}
92
/// Boxed, thread-safe error object used for the connection close reason.
type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// Audit information collected for a single QUIC connection.
///
/// Apart from the public connection ID, every field is private and uses
/// interior mutability (atomics, locks or a write-once cell), so the accessors
/// on this type only need `&self`.
#[derive(Debug)]
pub struct QuicAuditStats {
    /// Transport-level connection error code received from the client.
    recvd_conn_close_transport_error_code: AtomicI64,
    /// Transport-level connection error code sent to the client.
    sent_conn_close_transport_error_code: AtomicI64,
    /// Application-level connection error code received from the client.
    recvd_conn_close_application_error_code: AtomicI64,
    /// Application-level connection error code sent to the client.
    sent_conn_close_application_error_code: AtomicI64,
    /// How long the QUIC handshake took, in microseconds.
    transport_handshake_duration_us: AtomicI64,
    /// When the handshake started; kept behind an `Arc` so the same cell can
    /// be handed out to other holders.
    transport_handshake_start: Arc<RwLock<Option<SystemTime>>>,
    /// Why the QUIC connection was closed, once known.
    connection_close_reason: RwLock<Option<BoxError>>,
    /// Highest bandwidth recorded.
    max_bandwidth: AtomicU64,
    /// Loss observed at the highest recorded bandwidth.
    max_loss_pct: AtomicU8,
    /// Value of the first `SO_RECVMARK` control message received for the
    /// connection.
    ///
    /// Only present on Linux.
    #[cfg(target_os = "linux")]
    initial_so_mark: OnceLock<[u8; 4]>,
    /// The QUIC connection ID chosen by the server.
    ///
    /// At present the QUIC connection ID is an array of 20 bytes (160 bits).
    pub quic_connection_id: Vec<u8>,
}
126
127impl QuicAuditStats {
128    #[inline]
129    pub fn new(quic_connection_id: Vec<u8>) -> Self {
130        Self {
131            recvd_conn_close_transport_error_code: AtomicI64::new(-1),
132            sent_conn_close_transport_error_code: AtomicI64::new(-1),
133            recvd_conn_close_application_error_code: AtomicI64::new(-1),
134            sent_conn_close_application_error_code: AtomicI64::new(-1),
135            transport_handshake_duration_us: AtomicI64::new(-1),
136            transport_handshake_start: Arc::new(RwLock::new(None)),
137            connection_close_reason: RwLock::new(None),
138            max_bandwidth: AtomicU64::new(0),
139            max_loss_pct: AtomicU8::new(0),
140            #[cfg(target_os = "linux")]
141            initial_so_mark: OnceLock::new(),
142            quic_connection_id,
143        }
144    }
145
146    #[inline]
147    pub fn recvd_conn_close_transport_error_code(&self) -> i64 {
148        self.recvd_conn_close_transport_error_code
149            .load(Ordering::SeqCst)
150    }
151
152    #[inline]
153    pub fn sent_conn_close_transport_error_code(&self) -> i64 {
154        self.sent_conn_close_transport_error_code
155            .load(Ordering::SeqCst)
156    }
157
158    #[inline]
159    pub fn recvd_conn_close_application_error_code(&self) -> i64 {
160        self.recvd_conn_close_application_error_code
161            .load(Ordering::SeqCst)
162    }
163
164    #[inline]
165    pub fn sent_conn_close_application_error_code(&self) -> i64 {
166        self.sent_conn_close_application_error_code
167            .load(Ordering::SeqCst)
168    }
169
170    #[inline]
171    pub fn set_recvd_conn_close_transport_error_code(
172        &self, recvd_conn_close_transport_error_code: i64,
173    ) {
174        self.recvd_conn_close_transport_error_code
175            .store(recvd_conn_close_transport_error_code, Ordering::SeqCst)
176    }
177
178    #[inline]
179    pub fn set_sent_conn_close_transport_error_code(
180        &self, sent_conn_close_transport_error_code: i64,
181    ) {
182        self.sent_conn_close_transport_error_code
183            .store(sent_conn_close_transport_error_code, Ordering::SeqCst)
184    }
185
186    #[inline]
187    pub fn set_recvd_conn_close_application_error_code(
188        &self, recvd_conn_close_application_error_code: i64,
189    ) {
190        self.recvd_conn_close_application_error_code
191            .store(recvd_conn_close_application_error_code, Ordering::SeqCst)
192    }
193
194    #[inline]
195    pub fn set_sent_conn_close_application_error_code(
196        &self, sent_conn_close_application_error_code: i64,
197    ) {
198        self.sent_conn_close_application_error_code
199            .store(sent_conn_close_application_error_code, Ordering::SeqCst)
200    }
201
202    #[inline]
203    pub fn transport_handshake_duration_us(&self) -> i64 {
204        self.transport_handshake_duration_us.load(Ordering::SeqCst)
205    }
206
207    #[inline]
208    pub fn set_transport_handshake_start(&self, start_time: SystemTime) {
209        *self.transport_handshake_start.write().unwrap() = Some(start_time);
210    }
211
212    #[inline]
213    pub fn set_transport_handshake_duration(&self, duration: Duration) {
214        let dur = i64::try_from(duration.as_micros()).unwrap_or(-1);
215        self.transport_handshake_duration_us
216            .store(dur, Ordering::SeqCst);
217    }
218
219    #[inline]
220    pub fn transport_handshake_start(&self) -> Arc<RwLock<Option<SystemTime>>> {
221        Arc::clone(&self.transport_handshake_start)
222    }
223
224    #[inline]
225    pub fn connection_close_reason(
226        &self,
227    ) -> impl Deref<Target = Option<BoxError>> + '_ {
228        self.connection_close_reason.read().unwrap()
229    }
230
231    #[inline]
232    pub fn set_connection_close_reason(&self, error: BoxError) {
233        *self.connection_close_reason.write().unwrap() = Some(error);
234    }
235
236    #[inline]
237    pub fn set_max_bandwidth(&self, max_bandwidth: u64) {
238        self.max_bandwidth.store(max_bandwidth, Ordering::Release)
239    }
240
241    #[inline]
242    pub fn max_bandwidth(&self) -> u64 {
243        self.max_bandwidth.load(Ordering::Acquire)
244    }
245
246    #[inline]
247    pub fn set_max_loss_pct(&self, max_loss_pct: u8) {
248        self.max_loss_pct.store(max_loss_pct, Ordering::Release)
249    }
250
251    #[inline]
252    pub fn max_loss_pct(&self) -> u8 {
253        self.max_loss_pct.load(Ordering::Acquire)
254    }
255
256    #[inline]
257    #[cfg(target_os = "linux")]
258    pub fn set_initial_so_mark_data(&self, value: Option<[u8; 4]>) {
259        if let Some(inner) = value {
260            let _ = self.initial_so_mark.set(inner);
261        }
262    }
263
264    #[inline]
265    #[cfg(target_os = "linux")]
266    pub fn initial_so_mark_data(&self) -> Option<&[u8; 4]> {
267        self.initial_so_mark.get()
268    }
269}
270
/// Classifies a stream closure as `None`, `Implicit` or `Explicit`.
// NOTE(review): the precise meaning of each variant is not defined in this
// file; confirm with the code that produces these values before relying on
// the short descriptions below.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StreamClosureKind {
    /// No closure (presumably: the stream was not closed).
    None,
    /// Implicit closure.
    Implicit,
    /// Explicit closure.
    Explicit,
}