datagram_socket/
socket_stats.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::ops::Deref;
28use std::sync::atomic::AtomicI64;
29use std::sync::atomic::AtomicU64;
30use std::sync::atomic::AtomicU8;
31use std::sync::atomic::Ordering;
32use std::sync::Arc;
33use std::sync::RwLock;
34use std::time::Duration;
35use std::time::SystemTime;
36
37pub trait AsSocketStats {
38    fn as_socket_stats(&self) -> SocketStats;
39
40    fn as_quic_stats(&self) -> Option<&Arc<QuicAuditStats>> {
41        None
42    }
43}
44
45#[derive(Debug, Clone, Copy, Default)]
46pub struct SocketStats {
47    pub pmtu: u16,
48    pub rtt_us: i64,
49    pub min_rtt_us: i64,
50    pub max_rtt_us: i64,
51    pub rtt_var_us: i64,
52    pub cwnd: u64,
53    pub total_pto_count: u64,
54    pub packets_sent: u64,
55    pub packets_recvd: u64,
56    pub packets_lost: u64,
57    pub packets_lost_spurious: u64,
58    pub packets_retrans: u64,
59    pub bytes_sent: u64,
60    pub bytes_recvd: u64,
61    pub bytes_lost: u64,
62    pub bytes_retrans: u64,
63    pub bytes_unsent: u64,
64    pub delivery_rate: u64,
65    pub startup_exit: Option<StartupExit>,
66    pub bytes_in_flight_duration_us: u64,
67}
68
69/// Statistics from when a CCA first exited the startup phase.
70#[derive(Debug, Clone, Copy, PartialEq)]
71pub struct StartupExit {
72    pub cwnd: usize,
73    pub reason: StartupExitReason,
74}
75
76/// The reason a CCA exited the startup phase.
77#[derive(Debug, Clone, Copy, PartialEq)]
78pub enum StartupExitReason {
79    /// Exit startup due to excessive loss
80    Loss,
81
82    /// Exit startup due to bandwidth plateau.
83    BandwidthPlateau,
84
85    /// Exit startup due to persistent queue.
86    PersistentQueue,
87}
88
89type BoxError = Box<dyn std::error::Error + Send + Sync>;
90
91#[derive(Debug)]
92pub struct QuicAuditStats {
93    /// A transport-level connection error code received from the client.
94    recvd_conn_close_transport_error_code: AtomicI64,
95    /// A transport-level connection error code sent to the client.
96    sent_conn_close_transport_error_code: AtomicI64,
97    /// An application-level connection error code received from the client.
98    recvd_conn_close_application_error_code: AtomicI64,
99    /// An application-level connection error code sent to the client.
100    sent_conn_close_application_error_code: AtomicI64,
101    /// Time taken for the QUIC handshake in microseconds.
102    transport_handshake_duration_us: AtomicI64,
103    /// The start time of the handshake.
104    transport_handshake_start: Arc<RwLock<Option<SystemTime>>>,
105    /// The reason the QUIC connection was closed
106    connection_close_reason: RwLock<Option<BoxError>>,
107    /// Max recorded bandwidth.
108    max_bandwidth: AtomicU64,
109    /// Loss at max recorded bandwidth.
110    max_loss_pct: AtomicU8,
111    /// The server's chosen QUIC connection ID.
112    ///
113    /// The QUIC connection ID is presently an array of 20 bytes (160 bits)
114    pub quic_connection_id: Vec<u8>,
115}
116
117impl QuicAuditStats {
118    #[inline]
119    pub fn new(quic_connection_id: Vec<u8>) -> Self {
120        Self {
121            recvd_conn_close_transport_error_code: AtomicI64::new(-1),
122            sent_conn_close_transport_error_code: AtomicI64::new(-1),
123            recvd_conn_close_application_error_code: AtomicI64::new(-1),
124            sent_conn_close_application_error_code: AtomicI64::new(-1),
125            transport_handshake_duration_us: AtomicI64::new(-1),
126            transport_handshake_start: Arc::new(RwLock::new(None)),
127            connection_close_reason: RwLock::new(None),
128            max_bandwidth: AtomicU64::new(0),
129            max_loss_pct: AtomicU8::new(0),
130            quic_connection_id,
131        }
132    }
133
134    #[inline]
135    pub fn recvd_conn_close_transport_error_code(&self) -> i64 {
136        self.recvd_conn_close_transport_error_code
137            .load(Ordering::SeqCst)
138    }
139
140    #[inline]
141    pub fn sent_conn_close_transport_error_code(&self) -> i64 {
142        self.sent_conn_close_transport_error_code
143            .load(Ordering::SeqCst)
144    }
145
146    #[inline]
147    pub fn recvd_conn_close_application_error_code(&self) -> i64 {
148        self.recvd_conn_close_application_error_code
149            .load(Ordering::SeqCst)
150    }
151
152    #[inline]
153    pub fn sent_conn_close_application_error_code(&self) -> i64 {
154        self.sent_conn_close_application_error_code
155            .load(Ordering::SeqCst)
156    }
157
158    #[inline]
159    pub fn set_recvd_conn_close_transport_error_code(
160        &self, recvd_conn_close_transport_error_code: i64,
161    ) {
162        self.recvd_conn_close_transport_error_code
163            .store(recvd_conn_close_transport_error_code, Ordering::SeqCst)
164    }
165
166    #[inline]
167    pub fn set_sent_conn_close_transport_error_code(
168        &self, sent_conn_close_transport_error_code: i64,
169    ) {
170        self.sent_conn_close_transport_error_code
171            .store(sent_conn_close_transport_error_code, Ordering::SeqCst)
172    }
173
174    #[inline]
175    pub fn set_recvd_conn_close_application_error_code(
176        &self, recvd_conn_close_application_error_code: i64,
177    ) {
178        self.recvd_conn_close_application_error_code
179            .store(recvd_conn_close_application_error_code, Ordering::SeqCst)
180    }
181
182    #[inline]
183    pub fn set_sent_conn_close_application_error_code(
184        &self, sent_conn_close_application_error_code: i64,
185    ) {
186        self.sent_conn_close_application_error_code
187            .store(sent_conn_close_application_error_code, Ordering::SeqCst)
188    }
189
190    #[inline]
191    pub fn transport_handshake_duration_us(&self) -> i64 {
192        self.transport_handshake_duration_us.load(Ordering::SeqCst)
193    }
194
195    #[inline]
196    pub fn set_transport_handshake_start(&self, start_time: SystemTime) {
197        *self.transport_handshake_start.write().unwrap() = Some(start_time);
198    }
199
200    #[inline]
201    pub fn set_transport_handshake_duration(&self, duration: Duration) {
202        let dur = i64::try_from(duration.as_micros()).unwrap_or(-1);
203        self.transport_handshake_duration_us
204            .store(dur, Ordering::SeqCst);
205    }
206
207    #[inline]
208    pub fn transport_handshake_start(&self) -> Arc<RwLock<Option<SystemTime>>> {
209        Arc::clone(&self.transport_handshake_start)
210    }
211
212    #[inline]
213    pub fn connection_close_reason(
214        &self,
215    ) -> impl Deref<Target = Option<BoxError>> + '_ {
216        self.connection_close_reason.read().unwrap()
217    }
218
219    #[inline]
220    pub fn set_connection_close_reason(&self, error: BoxError) {
221        *self.connection_close_reason.write().unwrap() = Some(error);
222    }
223
224    #[inline]
225    pub fn set_max_bandwidth(&self, max_bandwidth: u64) {
226        self.max_bandwidth.store(max_bandwidth, Ordering::Release)
227    }
228
229    #[inline]
230    pub fn max_bandwidth(&self) -> u64 {
231        self.max_bandwidth.load(Ordering::Acquire)
232    }
233
234    #[inline]
235    pub fn set_max_loss_pct(&self, max_loss_pct: u8) {
236        self.max_loss_pct.store(max_loss_pct, Ordering::Release)
237    }
238
239    #[inline]
240    pub fn max_loss_pct(&self) -> u8 {
241        self.max_loss_pct.load(Ordering::Acquire)
242    }
243}
244
245#[derive(Debug, Copy, Clone, PartialEq, Eq)]
246pub enum StreamClosureKind {
247    None,
248    Implicit,
249    Explicit,
250}