tokio_quiche/quic/io/utilization_estimator.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use foundations::telemetry::metrics::Gauge;
28
29use std::collections::VecDeque;
30use std::ops::Div;
31use std::ops::Sub;
32use std::time::Duration;
33use std::time::Instant;
34
35use crate::quic::QuicheConnection;
36
/// Number of most recent rounds kept by the bandwidth estimator; no estimate
/// is reported until at least half of this many rounds have been recorded.
const EST_WIN: usize = 10;
38
39/// [`BandwidthReporter`] is responsible to track the bandwidth estimate for the
40/// connection
41pub(super) struct BandwidthReporter {
42    /// Time of last update
43    last_update: Instant,
44    /// Period between update (set using rtt)
45    update_period: Duration,
46    /// Estimate at last update
47    last_bandwidth: u64,
48    /// Bytes sent at last update
49    last_sent: u64,
50    /// Bytes lost at last update
51    last_lost: u64,
52    /// Bytes acked at last update
53    last_acked: u64,
54    /// Max recorded bandwidth
55    pub(super) max_bandwidth: u64,
56    /// Loss at max recorded bandwidth
57    pub(super) max_loss_pct: f32,
58
59    estimator: MaxUtilizedBandwidthEstimator,
60
61    gauge: Gauge,
62}
63
64impl BandwidthReporter {
65    pub(super) fn new(gauge: Gauge) -> Self {
66        BandwidthReporter {
67            last_update: Instant::now(),
68            update_period: Duration::from_millis(50),
69
70            last_bandwidth: 0,
71
72            last_sent: 0,
73            last_lost: 0,
74            last_acked: 0,
75
76            max_bandwidth: 0,
77            max_loss_pct: 0.,
78
79            estimator: MaxUtilizedBandwidthEstimator::new(),
80
81            gauge,
82        }
83    }
84
85    #[inline]
86    pub(super) fn update(&mut self, quiche: &QuicheConnection, now: Instant) {
87        if now.duration_since(self.last_update) < self.update_period {
88            return;
89        }
90
91        let stats = quiche.stats();
92
93        let bytes_sent = stats.sent_bytes - self.last_sent;
94        let bytes_lost = stats.lost_bytes - self.last_lost;
95        let bytes_acked = stats.acked_bytes - self.last_acked;
96
97        self.estimator.new_round(
98            self.last_update,
99            bytes_sent,
100            bytes_lost,
101            bytes_acked,
102        );
103
104        self.last_sent = stats.sent_bytes;
105        self.last_lost = stats.lost_bytes;
106        self.last_acked = stats.acked_bytes;
107
108        self.last_update = now;
109
110        let bw_estimate = self.estimator.get();
111
112        if self.last_bandwidth != bw_estimate.bandwidth {
113            self.gauge.dec_by(self.last_bandwidth);
114
115            self.last_bandwidth = bw_estimate.bandwidth;
116
117            self.gauge.inc_by(self.last_bandwidth);
118
119            self.max_bandwidth = self.max_bandwidth.max(self.last_bandwidth);
120            self.max_loss_pct = self.max_loss_pct.max(bw_estimate.loss);
121        }
122
123        if let Some(p) = quiche.path_stats().find(|s| s.active) {
124            self.update_period = p.rtt;
125        }
126    }
127}
128
129impl Drop for BandwidthReporter {
130    fn drop(&mut self) {
131        self.gauge.dec_by(self.last_bandwidth);
132    }
133}
134
/// A bandwidth sample together with the loss observed alongside it.
///
/// Equality and ordering look at `bandwidth` only; `loss` is carried along as
/// metadata, so a max-filter over `Estimate`s ranks them by bandwidth alone.
#[derive(Clone, Copy, Default)]
pub struct Estimate {
    /// Estimated bandwidth in bits per second.
    pub bandwidth: u64,
    /// Ratio of lost bytes to sent bytes over the estimation window.
    pub loss: f32,
}

impl PartialEq for Estimate {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == std::cmp::Ordering::Equal
    }
}

impl Eq for Estimate {}

impl PartialOrd for Estimate {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for Estimate {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        Ord::cmp(&self.bandwidth, &other.bandwidth)
    }
}
160
/// Byte counts recorded for one estimation round.
struct Round {
    /// Bytes sent during the round *preceding* this one: sent counts lag the
    /// acked and lost counts by one round.
    bytes_sent: u64,
    /// Bytes acknowledged during this round.
    bytes_acked: u64,
    /// Bytes declared lost during this round.
    bytes_lost: u64,
    /// Timestamp supplied to `new_round` for this round; the oldest retained
    /// round's value anchors the start of the estimation window.
    start: Instant,
}
167
168pub(super) struct MaxUtilizedBandwidthEstimator {
169    rounds: VecDeque<Round>,
170    estimate: WindowedFilter<Estimate, Instant, Duration>,
171    bytes_sent_prev_round: u64,
172}
173
174impl MaxUtilizedBandwidthEstimator {
175    fn new() -> Self {
176        let rounds = VecDeque::with_capacity(EST_WIN);
177
178        MaxUtilizedBandwidthEstimator {
179            rounds,
180            estimate: WindowedFilter::new(Duration::from_secs(120)),
181            bytes_sent_prev_round: 0,
182        }
183    }
184
185    fn new_round(
186        &mut self, time: Instant, bytes_sent: u64, bytes_lost: u64,
187        bytes_acked: u64,
188    ) {
189        if self.rounds.len() == EST_WIN {
190            let _ = self.rounds.pop_front();
191        }
192
193        self.rounds.push_back(Round {
194            bytes_sent: self.bytes_sent_prev_round,
195            bytes_acked,
196            bytes_lost,
197            start: time,
198        });
199
200        // Unlike acked and lost count, sent count is computed over a window 1 rtt
201        // in the past.
202        self.bytes_sent_prev_round = bytes_sent;
203
204        let bytes_acked = self.rounds.iter().map(|v| v.bytes_acked).sum::<u64>();
205        let bytes_lost = self.rounds.iter().map(|v| v.bytes_lost).sum::<u64>();
206        let bytes_sent = self.rounds.iter().map(|v| v.bytes_sent).sum::<u64>();
207
208        let loss = if bytes_lost == 0 {
209            0.
210        } else {
211            bytes_lost as f32 / bytes_sent as f32
212        };
213
214        let time_delta = time.duration_since(self.rounds.front().unwrap().start);
215
216        if bytes_acked > 0 {
217            let ack_rate =
218                bandwidth_from_bytes_and_time_delta(bytes_acked, time_delta);
219            let send_rate =
220                bandwidth_from_bytes_and_time_delta(bytes_sent, time_delta);
221            let estimate = Estimate {
222                bandwidth: ack_rate.min(send_rate),
223                loss,
224            };
225
226            if self.rounds.len() < EST_WIN / 2 {
227                self.estimate.reset(estimate, time)
228            } else {
229                self.estimate.update(estimate, time)
230            }
231        }
232    }
233
234    pub(super) fn get(&self) -> Estimate {
235        // Too few rounds
236        if self.rounds.len() < EST_WIN / 2 {
237            return Default::default();
238        }
239
240        self.estimate.get_best().unwrap_or_default()
241    }
242}
243
/// Bandwidth in bits per second from bytes and time period.
///
/// Returns 0 when `bytes` is 0. A zero `time_delta` is treated as one
/// nanosecond. Any non-zero byte count yields at least 1 bit/s. Results that
/// do not fit in a `u64` saturate at `u64::MAX` instead of being truncated.
fn bandwidth_from_bytes_and_time_delta(bytes: u64, time_delta: Duration) -> u64 {
    if bytes == 0 {
        return 0;
    }

    // Avoid dividing by zero for an instantaneous window.
    let nanos = time_delta.as_nanos().max(1);

    // bits * 1e9 / nanos == bits per second. This cannot overflow u128:
    // u64::MAX * 8 * 1e9 < 2^97.
    let num_nano_bits = 8 * u128::from(bytes) * 1_000_000_000;
    if num_nano_bits < nanos {
        // Report a tiny, non-zero rate rather than rounding down to 0.
        return 1;
    }

    // A plain `as u64` would silently keep only the low 64 bits.
    u64::try_from(num_nano_bits / nanos).unwrap_or(u64::MAX)
}
262
// The windowed filter below is adapted from quiche.

/// A value recorded by [`WindowedFilter`] together with the time it was
/// observed.
#[derive(Clone, Copy)]
struct Sample<T, I> {
    sample: T,
    time: I,
}

/// Tracks the best (largest, per `T`'s ordering) sample seen within a sliding
/// time window. The best, second-best and third-best candidates are kept so a
/// replacement is at hand when the best one ages out.
pub struct WindowedFilter<T, I, D> {
    /// Span of time over which a sample remains eligible to be the best.
    window_length: D,
    /// Best, second-best and third-best samples; all `None` until the first
    /// sample arrives.
    estimates: [Option<Sample<T, I>>; 3],
}
274
impl<T, I, D> WindowedFilter<T, I, D>
where
    T: Ord + Copy,
    I: Sub<I, Output = D> + Copy,
    D: Ord + Div<u32, Output = D> + Copy,
{
    /// Creates an empty filter. A best sample is expired once more than
    /// `window_length` has elapsed since it was recorded.
    pub fn new(window_length: D) -> Self {
        WindowedFilter {
            window_length,
            estimates: [None, None, None],
        }
    }

    /// Discards all history and makes `new_sample`, observed at `new_time`,
    /// the best, second-best and third-best estimate at once.
    pub fn reset(&mut self, new_sample: T, new_time: I) {
        let sample = Some(Sample {
            sample: new_sample,
            time: new_time,
        });

        self.estimates = [sample, sample, sample];
    }

    /// Returns the current best sample, or `None` if nothing has been
    /// recorded yet.
    pub fn get_best(&self) -> Option<T> {
        self.estimates[0].as_ref().map(|e| e.sample)
    }

    /// Feeds `new_sample`, observed at `new_time`, into the filter. A larger
    /// sample (per `T: Ord`) is a better one.
    ///
    /// The `unwrap()` calls rely on the invariant that the three slots are
    /// either all `None` (before the first sample) or all `Some`: `reset`
    /// fills every slot, and every other assignment stores a `Some` or copies
    /// another slot. Once the first check below falls through, slot 0 is
    /// `Some`, so all three are.
    pub fn update(&mut self, new_sample: T, new_time: I) {
        // Reset all estimates if they have not yet been initialized, if new
        // sample is a new best, or if the newest recorded estimate is too
        // old.
        if match &self.estimates[0] {
            None => true,
            Some(best) if new_sample > best.sample => true,
            _ =>
                new_time - self.estimates[2].as_ref().unwrap().time >
                    self.window_length,
        } {
            return self.reset(new_sample, new_time);
        }

        // Not a new best: slot the sample in as second- or third-best if it
        // beats the current holder. A new second-best also replaces the
        // third-best.
        if new_sample > self.estimates[1].unwrap().sample {
            self.estimates[1] = Some(Sample {
                sample: new_sample,
                time: new_time,
            });
            self.estimates[2] = self.estimates[1];
        } else if new_sample > self.estimates[2].unwrap().sample {
            self.estimates[2] = Some(Sample {
                sample: new_sample,
                time: new_time,
            });
        }

        // Expire and update estimates as necessary.
        if new_time - self.estimates[0].unwrap().time > self.window_length {
            // The best estimate hasn't been updated for an entire window, so
            // promote second and third best estimates.
            self.estimates[0] = self.estimates[1];
            self.estimates[1] = self.estimates[2];
            self.estimates[2] = Some(Sample {
                sample: new_sample,
                time: new_time,
            });
            // Need to iterate one more time. Check if the new best estimate is
            // outside the window as well, since it may also have been recorded a
            // long time ago. Don't need to iterate once more since we cover that
            // case at the beginning of the method.
            if new_time - self.estimates[0].unwrap().time > self.window_length {
                self.estimates[0] = self.estimates[1];
                self.estimates[1] = self.estimates[2];
            }
            return;
        }

        if self.estimates[1].unwrap().sample == self.estimates[0].unwrap().sample &&
            new_time - self.estimates[1].unwrap().time > self.window_length / 4
        {
            // A quarter of the window has passed without a better sample, so the
            // second-best estimate is taken from the second quarter of the
            // window.
            self.estimates[1] = Some(Sample {
                sample: new_sample,
                time: new_time,
            });
            self.estimates[2] = self.estimates[1];
            return;
        }

        if self.estimates[2].unwrap().sample == self.estimates[1].unwrap().sample &&
            new_time - self.estimates[2].unwrap().time > self.window_length / 2
        {
            // We've passed a half of the window without a better estimate, so
            // take a third-best estimate from the second half of the
            // window.
            self.estimates[2] = Some(Sample {
                sample: new_sample,
                time: new_time,
            });
        }
    }
}
376
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn estimate() {
        let mut now = Instant::now();

        let mut estimator = MaxUtilizedBandwidthEstimator::new();

        assert_eq!(estimator.get().bandwidth, 0);
        assert!(estimator.estimate.get_best().is_none());

        // First round: send 30M, nothing gets acked yet
        estimator.new_round(now, 30_000_000, 0, 0);

        assert_eq!(estimator.get().bandwidth, 0); // Not enough rounds for estimate yet
        assert!(estimator.estimate.get_best().is_none());

        now += Duration::from_secs(30);

        // Send 60M, previous 30M gets acked
        estimator.new_round(now, 60_000_000, 0, 30_000_000);

        // 30M over 30s = 1MBps = 8Mbps
        assert_eq!(estimator.get().bandwidth, 0); // Not enough rounds for estimate yet
        assert_eq!(estimator.estimate.get_best().unwrap().bandwidth, 8_000_000);

        now += Duration::from_secs(30);

        // Send 90M, previous 60M gets acked
        estimator.new_round(now, 90_000_000, 0, 60_000_000);

        // 30M + 60M = 90M acked over 60s = 1.5MBps = 12Mbps
        assert_eq!(estimator.get().bandwidth, 0); // Not enough rounds for estimate yet
        assert_eq!(estimator.estimate.get_best().unwrap().bandwidth, 12_000_000);

        now += Duration::from_secs(30);

        // Send 30M, previous 90M gets acked
        estimator.new_round(now, 30_000_000, 0, 90_000_000);

        // 30M + 60M + 90M = 180M acked over 90s = 2MBps = 16Mbps
        assert_eq!(estimator.get().bandwidth, 0); // Not enough rounds for estimate yet
        assert_eq!(estimator.estimate.get_best().unwrap().bandwidth, 16_000_000);

        for _ in 0..4 {
            now += Duration::from_secs(30);
            // Send another 30M, previous 30M gets acked
            estimator.new_round(now, 30_000_000, 0, 30_000_000);
            // The bandwidth is lower but it doesn't matter, we record highest
            // bandwidth, so it remains as before for two minutes
            assert_eq!(estimator.get().bandwidth, 16_000_000);
        }

        // Once the 16Mbps sample is more than two minutes old it expires from
        // the filter, and the max bandwidth is reduced
        now += Duration::from_secs(30);
        // Send another 30M, previous 30M gets acked
        estimator.new_round(now, 30_000_000, 0, 30_000_000);

        assert!(estimator.get().bandwidth < 8 * 2_000_000);
    }
}
440}