Skip to main content

datagram_socket/
socket_stats.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::ops::Deref;
28use std::sync::atomic::AtomicI64;
29use std::sync::atomic::AtomicU64;
30use std::sync::atomic::AtomicU8;
31use std::sync::atomic::Ordering;
32use std::sync::Arc;
33#[cfg(target_os = "linux")]
34use std::sync::OnceLock;
35use std::sync::RwLock;
36use std::time::Duration;
37use std::time::SystemTime;
38
39pub trait AsSocketStats {
40    fn as_socket_stats(&self) -> SocketStats;
41
42    fn as_quic_stats(&self) -> Option<&Arc<QuicAuditStats>> {
43        None
44    }
45}
46
47#[derive(Debug, Clone, Copy, Default)]
48pub struct SocketStats {
49    pub pmtu: u16,
50    pub rtt_us: i64,
51    pub min_rtt_us: i64,
52    pub max_rtt_us: i64,
53    pub rtt_var_us: i64,
54    pub cwnd: u64,
55    pub total_pto_count: u64,
56    pub packets_sent: u64,
57    pub packets_recvd: u64,
58    pub packets_lost: u64,
59    pub packets_lost_spurious: u64,
60    pub packets_retrans: u64,
61    pub bytes_sent: u64,
62    pub bytes_recvd: u64,
63    pub bytes_lost: u64,
64    pub bytes_retrans: u64,
65    pub bytes_unsent: u64,
66    pub delivery_rate: u64,
67    pub max_bandwidth: Option<u64>,
68    pub startup_exit: Option<StartupExit>,
69    pub bytes_in_flight_duration_us: u64,
70}
71
72/// Statistics from when a CCA first exited the startup phase.
73#[derive(Debug, Clone, Copy, PartialEq)]
74pub struct StartupExit {
75    pub cwnd: usize,
76    pub bandwidth: Option<u64>,
77    pub reason: StartupExitReason,
78}
79
80/// The reason a CCA exited the startup phase.
81#[derive(Debug, Clone, Copy, PartialEq)]
82pub enum StartupExitReason {
83    /// Exit slow start or BBR startup due to excessive loss
84    Loss,
85
86    /// Exit BBR startup due to bandwidth plateau.
87    BandwidthPlateau,
88
89    /// Exit BBR startup due to persistent queue.
90    PersistentQueue,
91
92    /// Exit HyStart++ conservative slow start after the max rounds allowed.
93    ConservativeSlowStartRounds,
94}
95
96type BoxError = Box<dyn std::error::Error + Send + Sync>;
97
98#[derive(Debug)]
99pub struct QuicAuditStats {
100    /// A transport-level connection error code received from the client.
101    recvd_conn_close_transport_error_code: AtomicI64,
102    /// A transport-level connection error code sent to the client.
103    sent_conn_close_transport_error_code: AtomicI64,
104    /// An application-level connection error code received from the client.
105    recvd_conn_close_application_error_code: AtomicI64,
106    /// An application-level connection error code sent to the client.
107    sent_conn_close_application_error_code: AtomicI64,
108    /// Time taken for the QUIC handshake in microseconds.
109    transport_handshake_duration_us: AtomicI64,
110    /// The start time of the handshake.
111    transport_handshake_start: Arc<RwLock<Option<SystemTime>>>,
112    /// The reason the QUIC connection was closed
113    connection_close_reason: RwLock<Option<BoxError>>,
114    /// Max recorded bandwidth.
115    max_bandwidth: AtomicU64,
116    /// Loss at max recorded bandwidth.
117    max_loss_pct: AtomicU8,
118    /// The value of the first `SO_RECVMARK` control message received for the
119    /// connection.
120    ///
121    /// Linux-only.
122    #[cfg(target_os = "linux")]
123    initial_so_mark: OnceLock<[u8; 4]>,
124    /// The server's chosen QUIC connection ID.
125    ///
126    /// The QUIC connection ID is presently an array of 20 bytes (160 bits)
127    pub quic_connection_id: Vec<u8>,
128}
129
130impl QuicAuditStats {
131    #[inline]
132    pub fn new(quic_connection_id: Vec<u8>) -> Self {
133        Self {
134            recvd_conn_close_transport_error_code: AtomicI64::new(-1),
135            sent_conn_close_transport_error_code: AtomicI64::new(-1),
136            recvd_conn_close_application_error_code: AtomicI64::new(-1),
137            sent_conn_close_application_error_code: AtomicI64::new(-1),
138            transport_handshake_duration_us: AtomicI64::new(-1),
139            transport_handshake_start: Arc::new(RwLock::new(None)),
140            connection_close_reason: RwLock::new(None),
141            max_bandwidth: AtomicU64::new(0),
142            max_loss_pct: AtomicU8::new(0),
143            #[cfg(target_os = "linux")]
144            initial_so_mark: OnceLock::new(),
145            quic_connection_id,
146        }
147    }
148
149    #[inline]
150    pub fn recvd_conn_close_transport_error_code(&self) -> i64 {
151        self.recvd_conn_close_transport_error_code
152            .load(Ordering::SeqCst)
153    }
154
155    #[inline]
156    pub fn sent_conn_close_transport_error_code(&self) -> i64 {
157        self.sent_conn_close_transport_error_code
158            .load(Ordering::SeqCst)
159    }
160
161    #[inline]
162    pub fn recvd_conn_close_application_error_code(&self) -> i64 {
163        self.recvd_conn_close_application_error_code
164            .load(Ordering::SeqCst)
165    }
166
167    #[inline]
168    pub fn sent_conn_close_application_error_code(&self) -> i64 {
169        self.sent_conn_close_application_error_code
170            .load(Ordering::SeqCst)
171    }
172
173    #[inline]
174    pub fn set_recvd_conn_close_transport_error_code(
175        &self, recvd_conn_close_transport_error_code: i64,
176    ) {
177        self.recvd_conn_close_transport_error_code
178            .store(recvd_conn_close_transport_error_code, Ordering::SeqCst)
179    }
180
181    #[inline]
182    pub fn set_sent_conn_close_transport_error_code(
183        &self, sent_conn_close_transport_error_code: i64,
184    ) {
185        self.sent_conn_close_transport_error_code
186            .store(sent_conn_close_transport_error_code, Ordering::SeqCst)
187    }
188
189    #[inline]
190    pub fn set_recvd_conn_close_application_error_code(
191        &self, recvd_conn_close_application_error_code: i64,
192    ) {
193        self.recvd_conn_close_application_error_code
194            .store(recvd_conn_close_application_error_code, Ordering::SeqCst)
195    }
196
197    #[inline]
198    pub fn set_sent_conn_close_application_error_code(
199        &self, sent_conn_close_application_error_code: i64,
200    ) {
201        self.sent_conn_close_application_error_code
202            .store(sent_conn_close_application_error_code, Ordering::SeqCst)
203    }
204
205    #[inline]
206    pub fn transport_handshake_duration_us(&self) -> i64 {
207        self.transport_handshake_duration_us.load(Ordering::SeqCst)
208    }
209
210    #[inline]
211    pub fn set_transport_handshake_start(&self, start_time: SystemTime) {
212        *self.transport_handshake_start.write().unwrap() = Some(start_time);
213    }
214
215    #[inline]
216    pub fn set_transport_handshake_duration(&self, duration: Duration) {
217        let dur = i64::try_from(duration.as_micros()).unwrap_or(-1);
218        self.transport_handshake_duration_us
219            .store(dur, Ordering::SeqCst);
220    }
221
222    #[inline]
223    pub fn transport_handshake_start(&self) -> Arc<RwLock<Option<SystemTime>>> {
224        Arc::clone(&self.transport_handshake_start)
225    }
226
227    #[inline]
228    pub fn connection_close_reason(
229        &self,
230    ) -> impl Deref<Target = Option<BoxError>> + '_ {
231        self.connection_close_reason.read().unwrap()
232    }
233
234    #[inline]
235    pub fn set_connection_close_reason(&self, error: BoxError) {
236        *self.connection_close_reason.write().unwrap() = Some(error);
237    }
238
239    #[inline]
240    pub fn set_max_bandwidth(&self, max_bandwidth: u64) {
241        self.max_bandwidth.store(max_bandwidth, Ordering::Release)
242    }
243
244    #[inline]
245    pub fn max_bandwidth(&self) -> u64 {
246        self.max_bandwidth.load(Ordering::Acquire)
247    }
248
249    #[inline]
250    pub fn set_max_loss_pct(&self, max_loss_pct: u8) {
251        self.max_loss_pct.store(max_loss_pct, Ordering::Release)
252    }
253
254    #[inline]
255    pub fn max_loss_pct(&self) -> u8 {
256        self.max_loss_pct.load(Ordering::Acquire)
257    }
258
259    #[inline]
260    #[cfg(target_os = "linux")]
261    pub fn set_initial_so_mark_data(&self, value: Option<[u8; 4]>) {
262        if let Some(inner) = value {
263            let _ = self.initial_so_mark.set(inner);
264        }
265    }
266
267    #[inline]
268    #[cfg(target_os = "linux")]
269    pub fn initial_so_mark_data(&self) -> Option<&[u8; 4]> {
270        self.initial_so_mark.get()
271    }
272}
273
274#[derive(Debug, Copy, Clone, PartialEq, Eq)]
275pub enum StreamClosureKind {
276    None,
277    Implicit,
278    Explicit,
279}