1use foundations::telemetry::metrics::Gauge;
28
29use std::collections::VecDeque;
30use std::ops::Div;
31use std::ops::Sub;
32use std::time::Duration;
33use std::time::Instant;
34
35use crate::quic::QuicheConnection;
36
/// Maximum number of rounds kept in the estimator's sliding window. An
/// estimate is only reported once at least half this many rounds have been
/// recorded.
const EST_WIN: usize = 10;
38
/// Tracks the utilized-bandwidth estimate of a connection and publishes it to
/// a gauge.
pub(super) struct BandwidthReporter {
    /// Time of the last update that was not skipped.
    last_update: Instant,
    /// Minimum time between two updates. Starts at 50ms and afterwards follows
    /// the RTT of the active path.
    update_period: Duration,
    /// Bandwidth value this reporter currently contributes to `gauge`.
    last_bandwidth: u64,
    /// Cumulative `sent_bytes` read from the connection stats at the last
    /// update.
    last_sent: u64,
    /// Cumulative `lost_bytes` read from the connection stats at the last
    /// update.
    last_lost: u64,
    /// Cumulative `acked_bytes` read from the connection stats at the last
    /// update.
    last_acked: u64,
    /// Highest bandwidth published so far.
    pub(super) max_bandwidth: u64,
    /// Highest loss value recorded so far. It is only refreshed when the
    /// published bandwidth changes.
    // NOTE(review): the name implies a percentage; the scale actually stored
    // is whatever `update` writes here -- confirm against that method.
    pub(super) max_loss_pct: f32,

    /// Turns per-round byte counts into a bandwidth estimate.
    estimator: MaxUtilizedBandwidthEstimator,

    /// Gauge that `last_bandwidth` is added to and removed from.
    gauge: Gauge,
}
63
64impl BandwidthReporter {
65 pub(super) fn new(gauge: Gauge) -> Self {
66 BandwidthReporter {
67 last_update: Instant::now(),
68 update_period: Duration::from_millis(50),
69
70 last_bandwidth: 0,
71
72 last_sent: 0,
73 last_lost: 0,
74 last_acked: 0,
75
76 max_bandwidth: 0,
77 max_loss_pct: 0.,
78
79 estimator: MaxUtilizedBandwidthEstimator::new(),
80
81 gauge,
82 }
83 }
84
85 #[inline]
86 pub(super) fn update(&mut self, quiche: &QuicheConnection, now: Instant) {
87 if now.duration_since(self.last_update) < self.update_period {
88 return;
89 }
90
91 let stats = quiche.stats();
92
93 let bytes_sent = stats.sent_bytes - self.last_sent;
94 let bytes_lost = stats.lost_bytes - self.last_lost;
95 let bytes_acked = stats.acked_bytes - self.last_acked;
96
97 self.estimator.new_round(
98 self.last_update,
99 bytes_sent,
100 bytes_lost,
101 bytes_acked,
102 );
103
104 self.last_sent = stats.sent_bytes;
105 self.last_lost = stats.lost_bytes;
106 self.last_acked = stats.acked_bytes;
107
108 self.last_update = now;
109
110 let bw_estimate = self.estimator.get();
111
112 if self.last_bandwidth != bw_estimate.bandwidth {
113 self.gauge.dec_by(self.last_bandwidth);
114
115 self.last_bandwidth = bw_estimate.bandwidth;
116
117 self.gauge.inc_by(self.last_bandwidth);
118
119 self.max_bandwidth = self.max_bandwidth.max(self.last_bandwidth);
120 self.max_loss_pct = self.max_loss_pct.max(bw_estimate.loss);
121 }
122
123 if let Some(p) = quiche.path_stats().find(|s| s.active) {
124 self.update_period = p.rtt;
125 }
126 }
127}
128
impl Drop for BandwidthReporter {
    /// Subtracts the bandwidth this reporter last added to the gauge, so the
    /// gauge stops counting it once the reporter is gone.
    fn drop(&mut self) {
        self.gauge.dec_by(self.last_bandwidth);
    }
}
134
/// A bandwidth estimate together with the loss observed over the same window.
///
/// Equality and ordering consider `bandwidth` only; `loss` is carried along
/// as a payload and is never compared.
#[derive(Clone, Copy, Default)]
pub struct Estimate {
    /// Estimated bandwidth.
    pub bandwidth: u64,
    /// Loss associated with this estimate.
    pub loss: f32,
}

impl PartialEq for Estimate {
    /// Two estimates are equal when their bandwidths are equal.
    fn eq(&self, other: &Self) -> bool {
        self.bandwidth == other.bandwidth
    }
}

impl Eq for Estimate {}

impl PartialOrd for Estimate {
    /// Delegates to [`Ord::cmp`] so both orderings can never disagree.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Estimate {
    /// Orders estimates by bandwidth alone.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.bandwidth.cmp(&other.bandwidth)
    }
}
161
/// Byte counters attributed to one estimation round.
struct Round {
    /// Bytes sent as reported with the round *before* this one (the estimator
    /// stores each round's sent count one round late).
    bytes_sent: u64,
    /// Bytes acknowledged as reported with this round.
    bytes_acked: u64,
    /// Bytes lost as reported with this round.
    bytes_lost: u64,
    /// Timestamp supplied by the caller for this round.
    start: Instant,
}
168
/// Derives a bandwidth estimate from the most recent rounds and tracks the
/// maximum of those estimates in a windowed filter.
pub(super) struct MaxUtilizedBandwidthEstimator {
    /// Most recent rounds, oldest first; holds at most `EST_WIN` entries.
    rounds: VecDeque<Round>,
    /// Windowed max filter over the estimates computed per round.
    estimate: WindowedFilter<Estimate, Instant, Duration>,
    /// Sent-bytes value reported with the latest round; it is stored with the
    /// next round that gets recorded.
    bytes_sent_prev_round: u64,
}
174
175impl MaxUtilizedBandwidthEstimator {
176 fn new() -> Self {
177 let rounds = VecDeque::with_capacity(EST_WIN);
178
179 MaxUtilizedBandwidthEstimator {
180 rounds,
181 estimate: WindowedFilter::new(Duration::from_secs(120)),
182 bytes_sent_prev_round: 0,
183 }
184 }
185
186 fn new_round(
187 &mut self, time: Instant, bytes_sent: u64, bytes_lost: u64,
188 bytes_acked: u64,
189 ) {
190 if self.rounds.len() == EST_WIN {
191 let _ = self.rounds.pop_front();
192 }
193
194 self.rounds.push_back(Round {
195 bytes_sent: self.bytes_sent_prev_round,
196 bytes_acked,
197 bytes_lost,
198 start: time,
199 });
200
201 self.bytes_sent_prev_round = bytes_sent;
204
205 let bytes_acked = self.rounds.iter().map(|v| v.bytes_acked).sum::<u64>();
206 let bytes_lost = self.rounds.iter().map(|v| v.bytes_lost).sum::<u64>();
207 let bytes_sent = self.rounds.iter().map(|v| v.bytes_sent).sum::<u64>();
208
209 let loss = if bytes_lost == 0 {
210 0.
211 } else {
212 bytes_lost as f32 / bytes_sent as f32
213 };
214
215 let time_delta = time.duration_since(self.rounds.front().unwrap().start);
216
217 if bytes_acked > 0 {
218 let ack_rate =
219 bandwidth_from_bytes_and_time_delta(bytes_acked, time_delta);
220 let send_rate =
221 bandwidth_from_bytes_and_time_delta(bytes_sent, time_delta);
222 let estimate = Estimate {
223 bandwidth: ack_rate.min(send_rate),
224 loss,
225 };
226
227 if self.rounds.len() < EST_WIN / 2 {
228 self.estimate.reset(estimate, time)
229 } else {
230 self.estimate.update(estimate, time)
231 }
232 }
233 }
234
235 pub(super) fn get(&self) -> Estimate {
236 if self.rounds.len() < EST_WIN / 2 {
238 return Default::default();
239 }
240
241 self.estimate.get_best().unwrap_or_default()
242 }
243}
244
/// Converts a byte count transferred over `time_delta` into bits per second.
///
/// * Returns 0 when `bytes` is 0.
/// * A zero-length interval is treated as one nanosecond.
/// * A non-zero transfer whose rate rounds below 1 bit/s is reported as 1.
/// * Rates that do not fit in a `u64` saturate at `u64::MAX` instead of
///   wrapping around.
fn bandwidth_from_bytes_and_time_delta(bytes: u64, time_delta: Duration) -> u64 {
    if bytes == 0 {
        return 0;
    }

    // Avoid dividing by zero for an empty interval.
    let nanos = time_delta.as_nanos().max(1);

    // bits * nanoseconds-per-second. This cannot overflow a u128 because
    // 8e9 * u64::MAX is below 2^98.
    let num_nano_bits = 8 * u128::from(bytes) * 1_000_000_000;
    if num_nano_bits < nanos {
        return 1;
    }

    // Clamp before narrowing so that very large rates saturate rather than
    // silently truncating to an unrelated small value.
    (num_nano_bits / nanos).min(u128::from(u64::MAX)) as u64
}
263
/// A value together with the time at which it was recorded.
#[derive(Clone, Copy)]
struct Sample<T, I> {
    sample: T,
    time: I,
}

/// Windowed max filter holding the best, second-best and third-best samples.
///
/// `T` is the sample type, `I` the timestamp type and `D` the difference of
/// two timestamps. The three slots are either all empty or all populated.
pub struct WindowedFilter<T, I, D> {
    window_length: D,
    estimates: [Option<Sample<T, I>>; 3],
}

impl<T, I, D> WindowedFilter<T, I, D>
where
    T: Ord + Copy,
    I: Sub<I, Output = D> + Copy,
    D: Ord + Div<u32, Output = D> + Copy,
{
    /// Creates an empty filter whose samples expire after `window_length`.
    pub fn new(window_length: D) -> Self {
        WindowedFilter {
            window_length,
            estimates: [None, None, None],
        }
    }

    /// Overwrites all three slots with the given sample.
    pub fn reset(&mut self, new_sample: T, new_time: I) {
        let only = Sample {
            sample: new_sample,
            time: new_time,
        };

        self.estimates = [Some(only); 3];
    }

    /// Returns the current best sample, or `None` if nothing was recorded.
    pub fn get_best(&self) -> Option<T> {
        self.estimates[0].map(|best| best.sample)
    }

    /// Offers a new sample taken at `new_time` to the filter.
    pub fn update(&mut self, new_sample: T, new_time: I) {
        let window = self.window_length;

        // Start over when the filter is empty, when the sample beats the
        // current best, or when even the newest slot is older than the window.
        let mut best = match self.estimates[0] {
            Some(current) => current,
            None => return self.reset(new_sample, new_time),
        };
        if new_sample > best.sample {
            return self.reset(new_sample, new_time);
        }
        let mut third = self.estimates[2].unwrap();
        if new_time - third.time > window {
            return self.reset(new_sample, new_time);
        }
        let mut second = self.estimates[1].unwrap();

        let fresh = Sample {
            sample: new_sample,
            time: new_time,
        };

        // Slot the sample in below the best if it beats a lower-ranked one.
        if new_sample > second.sample {
            second = fresh;
            third = fresh;
        } else if new_sample > third.sample {
            third = fresh;
        }

        if new_time - best.time > window {
            // The best sample has aged out: promote the runners-up and take
            // the new sample as third.
            best = second;
            second = third;
            third = fresh;

            // The promoted best may itself be too old; promote once more.
            // A third round is covered by the reset checks at the top.
            if new_time - best.time > window {
                best = second;
                second = third;
            }
        } else if second.sample == best.sample &&
            new_time - second.time > window / 4
        {
            // A quarter window passed without a distinct second-best, so
            // take it from the newer part of the window.
            second = fresh;
            third = fresh;
        } else if third.sample == second.sample &&
            new_time - third.time > window / 2
        {
            // Half a window passed without a distinct third-best.
            third = fresh;
        }

        self.estimates = [Some(best), Some(second), Some(third)];
    }
}
377
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks the estimator through nine 30-second rounds and checks both the
    /// value exposed by `get()` and the raw content of the max filter.
    #[test]
    fn estimate() {
        let mut now = Instant::now();

        let mut estimator = MaxUtilizedBandwidthEstimator::new();

        // Nothing recorded yet.
        assert_eq!(estimator.get().bandwidth, 0);
        assert!(estimator.estimate.get_best().is_none());

        // First round: nothing acked, so no estimate is produced.
        estimator.new_round(now, 30_000_000, 0, 0);

        assert_eq!(estimator.get().bandwidth, 0);
        assert!(estimator.estimate.get_best().is_none());

        now += Duration::from_secs(30);

        // 30MB acked over 30s -> 8Mbps in the filter. `get()` stays at 0
        // until EST_WIN / 2 rounds have been recorded.
        estimator.new_round(now, 60_000_000, 0, 30_000_000);

        assert_eq!(estimator.get().bandwidth, 0);
        assert_eq!(estimator.estimate.get_best().unwrap().bandwidth, 8_000_000);

        now += Duration::from_secs(30);

        // 90MB acked in total over 60s -> 12Mbps.
        estimator.new_round(now, 90_000_000, 0, 60_000_000);

        assert_eq!(estimator.get().bandwidth, 0);
        assert_eq!(estimator.estimate.get_best().unwrap().bandwidth, 12_000_000);

        now += Duration::from_secs(30);

        // 180MB acked in total over 90s -> 16Mbps.
        estimator.new_round(now, 30_000_000, 0, 90_000_000);

        assert_eq!(estimator.get().bandwidth, 0);
        assert_eq!(estimator.estimate.get_best().unwrap().bandwidth, 16_000_000);

        // From the fifth round on `get()` reports the filter's best value.
        // The per-round rate now declines, but the 16Mbps maximum is still
        // inside the filter window and therefore retained.
        for _ in 0..4 {
            now += Duration::from_secs(30);
            estimator.new_round(now, 30_000_000, 0, 30_000_000);
            assert_eq!(estimator.get().bandwidth, 16_000_000);
        }

        now += Duration::from_secs(30);
        // The 16Mbps sample is now older than the filter window and expires,
        // so a lower estimate takes over.
        estimator.new_round(now, 30_000_000, 0, 30_000_000);

        assert!(estimator.get().bandwidth < 8 * 2_000_000);
    }
}