tokio_quiche/quic/io/
utilization_estimator.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use foundations::telemetry::metrics::Gauge;
28
29use std::collections::VecDeque;
30use std::ops::Div;
31use std::ops::Sub;
32use std::time::Duration;
33use std::time::Instant;
34
35use crate::quic::QuicheConnection;
36
/// Maximum number of most recent rounds retained by
/// [`MaxUtilizedBandwidthEstimator`]. No estimate is reported until at least
/// half that many rounds have been recorded.
const EST_WIN: usize = 10;
38
39/// [`BandwidthReporter`] is responsible to track the bandwidth estimate for the
40/// connection
41pub(super) struct BandwidthReporter {
42    /// Time of last update
43    last_update: Instant,
44    /// Period between update (set using rtt)
45    update_period: Duration,
46    /// Estimate at last update
47    last_bandwidth: u64,
48    /// Bytes sent at last update
49    last_sent: u64,
50    /// Bytes lost at last update
51    last_lost: u64,
52    /// Bytes acked at last update
53    last_acked: u64,
54    /// Max recorded bandwidth
55    pub(super) max_bandwidth: u64,
56    /// Loss at max recorded bandwidth
57    pub(super) max_loss_pct: f32,
58
59    estimator: MaxUtilizedBandwidthEstimator,
60
61    gauge: Gauge,
62}
63
64impl BandwidthReporter {
65    pub(super) fn new(gauge: Gauge) -> Self {
66        BandwidthReporter {
67            last_update: Instant::now(),
68            update_period: Duration::from_millis(50),
69
70            last_bandwidth: 0,
71
72            last_sent: 0,
73            last_lost: 0,
74            last_acked: 0,
75
76            max_bandwidth: 0,
77            max_loss_pct: 0.,
78
79            estimator: MaxUtilizedBandwidthEstimator::new(),
80
81            gauge,
82        }
83    }
84
85    #[inline]
86    pub(super) fn update(&mut self, quiche: &QuicheConnection, now: Instant) {
87        if now.duration_since(self.last_update) < self.update_period {
88            return;
89        }
90
91        let stats = quiche.stats();
92
93        let bytes_sent = stats.sent_bytes - self.last_sent;
94        let bytes_lost = stats.lost_bytes - self.last_lost;
95        let bytes_acked = stats.acked_bytes - self.last_acked;
96
97        self.estimator.new_round(
98            self.last_update,
99            bytes_sent,
100            bytes_lost,
101            bytes_acked,
102        );
103
104        self.last_sent = stats.sent_bytes;
105        self.last_lost = stats.lost_bytes;
106        self.last_acked = stats.acked_bytes;
107
108        self.last_update = now;
109
110        let bw_estimate = self.estimator.get();
111
112        if self.last_bandwidth != bw_estimate.bandwidth {
113            self.gauge.dec_by(self.last_bandwidth);
114
115            self.last_bandwidth = bw_estimate.bandwidth;
116
117            self.gauge.inc_by(self.last_bandwidth);
118
119            self.max_bandwidth = self.max_bandwidth.max(self.last_bandwidth);
120            self.max_loss_pct = self.max_loss_pct.max(bw_estimate.loss);
121        }
122
123        if let Some(p) = quiche.path_stats().find(|s| s.active) {
124            self.update_period = p.rtt;
125        }
126    }
127}
128
129impl Drop for BandwidthReporter {
130    fn drop(&mut self) {
131        self.gauge.dec_by(self.last_bandwidth);
132    }
133}
134
/// A bandwidth estimate together with the loss ratio observed alongside it.
///
/// Equality and ordering look at `bandwidth` only, so that a max filter over
/// estimates picks the highest bandwidth; `loss` is carried along as
/// metadata and never takes part in comparisons.
#[derive(Clone, Copy, Debug, Default)]
pub struct Estimate {
    /// Estimated bandwidth
    pub bandwidth: u64,
    /// Ratio of bytes lost to bytes sent observed with this estimate
    pub loss: f32,
}

impl PartialEq for Estimate {
    fn eq(&self, other: &Self) -> bool {
        self.bandwidth == other.bandwidth
    }
}

impl Eq for Estimate {}

impl PartialOrd for Estimate {
    // Canonical form: defer to `Ord` so the two orderings cannot drift apart.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Estimate {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.bandwidth.cmp(&other.bandwidth)
    }
}
161
/// Byte counts attributed to one round of the estimation window.
struct Round {
    /// Time at which the round was recorded
    start: Instant,
    /// Bytes sent attributed to this round
    bytes_sent: u64,
    /// Bytes acknowledged during this round
    bytes_acked: u64,
    /// Bytes declared lost during this round
    bytes_lost: u64,
}
168
169pub(super) struct MaxUtilizedBandwidthEstimator {
170    rounds: VecDeque<Round>,
171    estimate: WindowedFilter<Estimate, Instant, Duration>,
172    bytes_sent_prev_round: u64,
173}
174
175impl MaxUtilizedBandwidthEstimator {
176    fn new() -> Self {
177        let rounds = VecDeque::with_capacity(EST_WIN);
178
179        MaxUtilizedBandwidthEstimator {
180            rounds,
181            estimate: WindowedFilter::new(Duration::from_secs(120)),
182            bytes_sent_prev_round: 0,
183        }
184    }
185
186    fn new_round(
187        &mut self, time: Instant, bytes_sent: u64, bytes_lost: u64,
188        bytes_acked: u64,
189    ) {
190        if self.rounds.len() == EST_WIN {
191            let _ = self.rounds.pop_front();
192        }
193
194        self.rounds.push_back(Round {
195            bytes_sent: self.bytes_sent_prev_round,
196            bytes_acked,
197            bytes_lost,
198            start: time,
199        });
200
201        // Unlike acked and lost count, sent count is computed over a window 1 rtt
202        // in the past.
203        self.bytes_sent_prev_round = bytes_sent;
204
205        let bytes_acked = self.rounds.iter().map(|v| v.bytes_acked).sum::<u64>();
206        let bytes_lost = self.rounds.iter().map(|v| v.bytes_lost).sum::<u64>();
207        let bytes_sent = self.rounds.iter().map(|v| v.bytes_sent).sum::<u64>();
208
209        let loss = if bytes_lost == 0 {
210            0.
211        } else {
212            bytes_lost as f32 / bytes_sent as f32
213        };
214
215        let time_delta = time.duration_since(self.rounds.front().unwrap().start);
216
217        if bytes_acked > 0 {
218            let ack_rate =
219                bandwidth_from_bytes_and_time_delta(bytes_acked, time_delta);
220            let send_rate =
221                bandwidth_from_bytes_and_time_delta(bytes_sent, time_delta);
222            let estimate = Estimate {
223                bandwidth: ack_rate.min(send_rate),
224                loss,
225            };
226
227            if self.rounds.len() < EST_WIN / 2 {
228                self.estimate.reset(estimate, time)
229            } else {
230                self.estimate.update(estimate, time)
231            }
232        }
233    }
234
235    pub(super) fn get(&self) -> Estimate {
236        // Too few rounds
237        if self.rounds.len() < EST_WIN / 2 {
238            return Default::default();
239        }
240
241        self.estimate.get_best().unwrap_or_default()
242    }
243}
244
/// Bandwidth in bits per second from bytes and time period
///
/// Returns 0 when `bytes` is 0. A zero `time_delta` is treated as one
/// nanosecond, a non-zero amount of data never yields less than 1, and a
/// rate that does not fit in a `u64` saturates at `u64::MAX` instead of
/// being truncated.
fn bandwidth_from_bytes_and_time_delta(bytes: u64, time_delta: Duration) -> u64 {
    const NANOS_PER_SEC: u128 = 1_000_000_000;

    if bytes == 0 {
        return 0;
    }

    // Avoid dividing by zero for an empty period.
    let nanos = time_delta.as_nanos().max(1);

    // Computed in u128: 8 * u64::MAX * 1e9 cannot overflow there.
    let num_nano_bits = 8 * u128::from(bytes) * NANOS_PER_SEC;
    if num_nano_bits < nanos {
        return 1;
    }

    // The quotient can exceed u64::MAX for huge byte counts over tiny
    // periods; clamp before narrowing so the cast is lossless.
    (num_nano_bits / nanos).min(u128::from(u64::MAX)) as u64
}
263
/// Below is windowed filter implementation from quiche
///
/// A value together with the time at which it was recorded.
#[derive(Clone, Copy)]
struct Sample<T, I> {
    sample: T,
    time: I,
}

/// Tracks the best (greatest) sample seen within a time window by keeping
/// the best, second-best and third-best candidates.
pub struct WindowedFilter<T, I, D> {
    window_length: D,
    estimates: [Option<Sample<T, I>>; 3],
}

impl<T, I, D> WindowedFilter<T, I, D>
where
    T: Ord + Copy,
    I: Sub<I, Output = D> + Copy,
    D: Ord + Div<u32, Output = D> + Copy,
{
    /// Creates an empty filter covering `window_length`.
    pub fn new(window_length: D) -> Self {
        Self {
            window_length,
            estimates: [None; 3],
        }
    }

    /// Discards all candidates and makes `new_sample` the best, second-best
    /// and third-best estimate at once.
    pub fn reset(&mut self, new_sample: T, new_time: I) {
        self.estimates = [Some(Sample {
            sample: new_sample,
            time: new_time,
        }); 3];
    }

    /// Returns the current best sample, or `None` if nothing was recorded.
    pub fn get_best(&self) -> Option<T> {
        self.estimates[0].map(|best| best.sample)
    }

    /// Offers `new_sample`, taken at `new_time`, to the filter.
    pub fn update(&mut self, new_sample: T, new_time: I) {
        let window = self.window_length;
        let fresh = Some(Sample {
            sample: new_sample,
            time: new_time,
        });

        // Start over when nothing has been recorded yet, when the new sample
        // beats the current best, or when even the newest candidate has
        // fallen out of the window.
        let start_over = match self.estimates[0] {
            None => true,
            Some(best) =>
                new_sample > best.sample ||
                    self.age_at(2, new_time) > window,
        };
        if start_over {
            self.reset(new_sample, new_time);
            return;
        }

        // Slot the sample in as second or third best where it qualifies.
        if new_sample > self.sample_at(1) {
            self.estimates[1] = fresh;
            self.estimates[2] = fresh;
        } else if new_sample > self.sample_at(2) {
            self.estimates[2] = fresh;
        }

        // The best estimate is older than the window: promote the runners-up
        // and take the new sample as third best.
        if self.age_at(0, new_time) > window {
            self.estimates = [self.estimates[1], self.estimates[2], fresh];

            // The promoted best may itself be too old. One extra step is
            // enough, because a fully stale filter was reset above.
            if self.age_at(0, new_time) > window {
                self.estimates[0] = self.estimates[1];
                self.estimates[1] = self.estimates[2];
            }
            return;
        }

        // A quarter of the window went by without a better sample: take the
        // second-best estimate from the second quarter of the window.
        if self.sample_at(1) == self.sample_at(0) &&
            self.age_at(1, new_time) > window / 4
        {
            self.estimates[1] = fresh;
            self.estimates[2] = fresh;
            return;
        }

        // Half of the window went by without a better sample: take the
        // third-best estimate from the second half of the window.
        if self.sample_at(2) == self.sample_at(1) &&
            self.age_at(2, new_time) > window / 2
        {
            self.estimates[2] = fresh;
        }
    }

    /// Value of the candidate at `rank` (0 = best). Panics if it is unset.
    fn sample_at(&self, rank: usize) -> T {
        self.estimates[rank].unwrap().sample
    }

    /// Time elapsed at `now` since the candidate at `rank` was recorded.
    /// Panics if it is unset.
    fn age_at(&self, rank: usize, now: I) -> D {
        now - self.estimates[rank].unwrap().time
    }
}
377
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn estimate() {
        const ROUND: Duration = Duration::from_secs(30);

        let mut now = Instant::now();
        let mut estimator = MaxUtilizedBandwidthEstimator::new();

        // Nothing has been recorded yet
        assert_eq!(estimator.get().bandwidth, 0);
        assert!(estimator.estimate.get_best().is_none());

        // First round: 30M sent, nothing acked, so no sample is produced
        estimator.new_round(now, 30_000_000, 0, 0);

        assert_eq!(estimator.get().bandwidth, 0); // Not enough rounds for estimate yet
        assert!(estimator.estimate.get_best().is_none());

        // Warm-up rounds as (bytes sent, bytes acked, best sample in filter):
        //  - 30M acked over 30s  = 1MBps   = 8Mbps
        //  - 90M acked over 60s  = 1.5MBps = 12Mbps
        //  - 180M acked over 90s = 2MBps   = 16Mbps
        let warm_up: [(u64, u64, u64); 3] = [
            (60_000_000, 30_000_000, 8_000_000),
            (90_000_000, 60_000_000, 12_000_000),
            (30_000_000, 90_000_000, 16_000_000),
        ];

        for &(sent, acked, best) in &warm_up {
            now += ROUND;
            estimator.new_round(now, sent, 0, acked);

            assert_eq!(estimator.get().bandwidth, 0); // Not enough rounds for estimate yet
            assert_eq!(estimator.estimate.get_best().unwrap().bandwidth, best);
        }

        for _ in 0..4 {
            now += ROUND;
            // Send another 30M, previous 30M gets acked
            estimator.new_round(now, 30_000_000, 0, 30_000_000);
            // The bandwidth is lower but it doesn't matter, we record highest
            // bandwidth, so it remains as before for two minutes
            assert_eq!(estimator.get().bandwidth, 16_000_000);
        }

        // After two minutes the filter is updated, and the max bandwidth is
        // reduced
        now += ROUND;
        // Send another 30M, previous 30M gets acked
        estimator.new_round(now, 30_000_000, 0, 30_000_000);

        assert!(estimator.get().bandwidth < 8 * 2_000_000);
    }
}