1use foundations::telemetry::metrics::Gauge;
28
29use std::collections::VecDeque;
30use std::ops::Div;
31use std::ops::Sub;
32use std::time::Duration;
33use std::time::Instant;
34
35use crate::quic::QuicheConnection;
36
/// Maximum number of rounds the estimator keeps in its sliding window.
///
/// Once this many rounds are stored, the oldest one is evicted before a new
/// round is pushed. Half of this value (`EST_WIN / 2`) is also the minimum
/// number of rounds required before an estimate is reported.
const EST_WIN: usize = 10;
38
39pub(super) struct BandwidthReporter {
42 last_update: Instant,
44 update_period: Duration,
46 last_bandwidth: u64,
48 last_sent: u64,
50 last_lost: u64,
52 last_acked: u64,
54 pub(super) max_bandwidth: u64,
56 pub(super) max_loss_pct: f32,
58
59 estimator: MaxUtilizedBandwidthEstimator,
60
61 gauge: Gauge,
62}
63
64impl BandwidthReporter {
65 pub(super) fn new(gauge: Gauge) -> Self {
66 BandwidthReporter {
67 last_update: Instant::now(),
68 update_period: Duration::from_millis(50),
69
70 last_bandwidth: 0,
71
72 last_sent: 0,
73 last_lost: 0,
74 last_acked: 0,
75
76 max_bandwidth: 0,
77 max_loss_pct: 0.,
78
79 estimator: MaxUtilizedBandwidthEstimator::new(),
80
81 gauge,
82 }
83 }
84
85 #[inline]
86 pub(super) fn update(&mut self, quiche: &QuicheConnection, now: Instant) {
87 if now.duration_since(self.last_update) < self.update_period {
88 return;
89 }
90
91 let stats = quiche.stats();
92
93 let bytes_sent = stats.sent_bytes - self.last_sent;
94 let bytes_lost = stats.lost_bytes - self.last_lost;
95 let bytes_acked = stats.acked_bytes - self.last_acked;
96
97 self.estimator.new_round(
98 self.last_update,
99 bytes_sent,
100 bytes_lost,
101 bytes_acked,
102 );
103
104 self.last_sent = stats.sent_bytes;
105 self.last_lost = stats.lost_bytes;
106 self.last_acked = stats.acked_bytes;
107
108 self.last_update = now;
109
110 let bw_estimate = self.estimator.get();
111
112 if self.last_bandwidth != bw_estimate.bandwidth {
113 self.gauge.dec_by(self.last_bandwidth);
114
115 self.last_bandwidth = bw_estimate.bandwidth;
116
117 self.gauge.inc_by(self.last_bandwidth);
118
119 self.max_bandwidth = self.max_bandwidth.max(self.last_bandwidth);
120 self.max_loss_pct = self.max_loss_pct.max(bw_estimate.loss);
121 }
122
123 if let Some(p) = quiche.path_stats().find(|s| s.active) {
124 self.update_period = p.rtt;
125 }
126 }
127}
128
129impl Drop for BandwidthReporter {
130 fn drop(&mut self) {
131 self.gauge.dec_by(self.last_bandwidth);
132 }
133}
134
/// A bandwidth estimate together with the loss measured over the same rounds.
///
/// Equality and ordering look at `bandwidth` only; `loss` is carried along as
/// a payload and never takes part in `==`, `<` or `cmp`.
#[derive(Clone, Copy, Default)]
pub struct Estimate {
    /// Estimated bandwidth, in bits per second.
    pub bandwidth: u64,
    /// Ratio of lost bytes to sent bytes over the rounds behind this estimate.
    pub loss: f32,
}

impl Ord for Estimate {
    /// Orders estimates by their bandwidth.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        Ord::cmp(&self.bandwidth, &other.bandwidth)
    }
}

impl PartialOrd for Estimate {
    /// Always `Some`, delegating to the total order defined by [`Ord`].
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl PartialEq for Estimate {
    /// Two estimates are equal when their bandwidths are equal.
    fn eq(&self, other: &Self) -> bool {
        self.bandwidth.eq(&other.bandwidth)
    }
}

impl Eq for Estimate {}
160
161struct Round {
162 bytes_sent: u64,
163 bytes_acked: u64,
164 bytes_lost: u64,
165 start: Instant,
166}
167
168pub(super) struct MaxUtilizedBandwidthEstimator {
169 rounds: VecDeque<Round>,
170 estimate: WindowedFilter<Estimate, Instant, Duration>,
171 bytes_sent_prev_round: u64,
172}
173
174impl MaxUtilizedBandwidthEstimator {
175 fn new() -> Self {
176 let rounds = VecDeque::with_capacity(EST_WIN);
177
178 MaxUtilizedBandwidthEstimator {
179 rounds,
180 estimate: WindowedFilter::new(Duration::from_secs(120)),
181 bytes_sent_prev_round: 0,
182 }
183 }
184
185 fn new_round(
186 &mut self, time: Instant, bytes_sent: u64, bytes_lost: u64,
187 bytes_acked: u64,
188 ) {
189 if self.rounds.len() == EST_WIN {
190 let _ = self.rounds.pop_front();
191 }
192
193 self.rounds.push_back(Round {
194 bytes_sent: self.bytes_sent_prev_round,
195 bytes_acked,
196 bytes_lost,
197 start: time,
198 });
199
200 self.bytes_sent_prev_round = bytes_sent;
203
204 let bytes_acked = self.rounds.iter().map(|v| v.bytes_acked).sum::<u64>();
205 let bytes_lost = self.rounds.iter().map(|v| v.bytes_lost).sum::<u64>();
206 let bytes_sent = self.rounds.iter().map(|v| v.bytes_sent).sum::<u64>();
207
208 let loss = if bytes_lost == 0 {
209 0.
210 } else {
211 bytes_lost as f32 / bytes_sent as f32
212 };
213
214 let time_delta = time.duration_since(self.rounds.front().unwrap().start);
215
216 if bytes_acked > 0 {
217 let ack_rate =
218 bandwidth_from_bytes_and_time_delta(bytes_acked, time_delta);
219 let send_rate =
220 bandwidth_from_bytes_and_time_delta(bytes_sent, time_delta);
221 let estimate = Estimate {
222 bandwidth: ack_rate.min(send_rate),
223 loss,
224 };
225
226 if self.rounds.len() < EST_WIN / 2 {
227 self.estimate.reset(estimate, time)
228 } else {
229 self.estimate.update(estimate, time)
230 }
231 }
232 }
233
234 pub(super) fn get(&self) -> Estimate {
235 if self.rounds.len() < EST_WIN / 2 {
237 return Default::default();
238 }
239
240 self.estimate.get_best().unwrap_or_default()
241 }
242}
243
/// Converts a byte count transferred over `time_delta` into a bandwidth in
/// bits per second.
///
/// * Returns 0 when `bytes` is 0.
/// * A zero `time_delta` is treated as one nanosecond.
/// * A non-zero amount of data never rounds down to 0: rates below one bit
///   per second are reported as 1.
/// * Rates that do not fit in a `u64` saturate at `u64::MAX` instead of
///   silently wrapping.
fn bandwidth_from_bytes_and_time_delta(bytes: u64, time_delta: Duration) -> u64 {
    if bytes == 0 {
        return 0;
    }

    // Avoid dividing by zero for back-to-back samples.
    let nanos = time_delta.as_nanos().max(1);

    // bits * (nanoseconds per second). Cannot overflow a u128:
    // u64::MAX * 8e9 < 2^97.
    let num_nano_bits = 8 * u128::from(bytes) * 1_000_000_000;
    if num_nano_bits < nanos {
        return 1;
    }

    u64::try_from(num_nano_bits / nanos).unwrap_or(u64::MAX)
}
262
/// A sample value together with the time it was recorded at.
#[derive(Clone, Copy)]
struct Sample<T, I> {
    sample: T,
    time: I,
}

/// Tracks the largest sample seen within a sliding time window.
///
/// Three candidates are kept: the best one at index 0, followed by a second
/// and third best. When the best candidate becomes older than the window, the
/// later candidates are promoted.
///
/// `T` is the sample type, `I` the point-in-time type and `D` the type of the
/// difference between two points in time.
pub struct WindowedFilter<T, I, D> {
    /// Length of the sliding window.
    window_length: D,
    /// Best, second best and third best candidate. Either all `None` (no
    /// sample yet) or all `Some`.
    estimates: [Option<Sample<T, I>>; 3],
}

impl<T, I, D> WindowedFilter<T, I, D>
where
    T: Ord + Copy,
    I: Sub<I, Output = D> + Copy,
    D: Ord + Div<u32, Output = D> + Copy,
{
    /// Creates an empty filter covering a window of `window_length`.
    pub fn new(window_length: D) -> Self {
        Self {
            window_length,
            estimates: [None; 3],
        }
    }

    /// Discards all candidates and makes `new_sample`, recorded at
    /// `new_time`, the best, second best and third best at once.
    pub fn reset(&mut self, new_sample: T, new_time: I) {
        let only = Sample {
            sample: new_sample,
            time: new_time,
        };

        self.estimates = [Some(only); 3];
    }

    /// Returns the best sample, or `None` if no sample was ever recorded.
    pub fn get_best(&self) -> Option<T> {
        self.estimates[0].map(|best| best.sample)
    }

    /// Feeds a new sample recorded at `new_time` into the filter.
    ///
    /// `new_time` is expected not to be earlier than the times of previously
    /// recorded samples.
    pub fn update(&mut self, new_sample: T, new_time: I) {
        // Start over if the filter is empty, the sample beats the current
        // best, or even the newest candidate has fallen out of the window.
        let start_over = match self.estimates[0] {
            None => true,
            Some(best) => {
                new_sample > best.sample ||
                    new_time - self.estimates[2].unwrap().time >
                        self.window_length
            },
        };

        if start_over {
            self.reset(new_sample, new_time);
            return;
        }

        let latest = Some(Sample {
            sample: new_sample,
            time: new_time,
        });

        // The sample may displace the second and/or third best candidate.
        if new_sample > self.estimates[1].unwrap().sample {
            self.estimates[1] = latest;
            self.estimates[2] = latest;
        } else if new_sample > self.estimates[2].unwrap().sample {
            self.estimates[2] = latest;
        }

        // The best candidate expired: promote the others and append the new
        // sample as third best.
        if new_time - self.estimates[0].unwrap().time > self.window_length {
            self.estimates = [self.estimates[1], self.estimates[2], latest];

            // The promoted best may have expired as well; promote once more.
            if new_time - self.estimates[0].unwrap().time > self.window_length {
                self.estimates[0] = self.estimates[1];
                self.estimates[1] = self.estimates[2];
            }
            return;
        }

        let best = self.estimates[0].unwrap();
        let second = self.estimates[1].unwrap();
        let third = self.estimates[2].unwrap();

        // A quarter of the window passed without a distinct second best:
        // take the new sample as second and third best.
        if second.sample == best.sample &&
            new_time - second.time > self.window_length / 4
        {
            self.estimates[1] = latest;
            self.estimates[2] = latest;
            return;
        }

        // Half of the window passed without a distinct third best: take the
        // new sample as third best.
        if third.sample == second.sample &&
            new_time - third.time > self.window_length / 2
        {
            self.estimates[2] = latest;
        }
    }
}
376
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks the estimator through a series of 30 second rounds and checks
    /// both the reported estimate and the filter's internal best sample.
    #[test]
    fn estimate() {
        let step = Duration::from_secs(30);
        let mut now = Instant::now();

        let mut estimator = MaxUtilizedBandwidthEstimator::new();

        // Nothing recorded yet.
        assert_eq!(estimator.get().bandwidth, 0);
        assert!(estimator.estimate.get_best().is_none());

        // First round: nothing acked, so the filter stays empty.
        estimator.new_round(now, 30_000_000, 0, 0);

        assert_eq!(estimator.get().bandwidth, 0);
        assert!(estimator.estimate.get_best().is_none());

        // Next three rounds: the filter already holds a sample, but `get`
        // keeps returning 0 until enough rounds were recorded.
        // (bytes sent, bytes acked, expected best sample in the filter)
        let warm_up = [
            (60_000_000, 30_000_000, 8_000_000),
            (90_000_000, 60_000_000, 12_000_000),
            (30_000_000, 90_000_000, 16_000_000),
        ];

        for &(sent, acked, expected_best) in &warm_up {
            now += step;

            estimator.new_round(now, sent, 0, acked);

            assert_eq!(estimator.get().bandwidth, 0);
            assert_eq!(
                estimator.estimate.get_best().unwrap().bandwidth,
                expected_best
            );
        }

        // Enough rounds now: the best estimate is reported and held.
        for _ in 0..4 {
            now += step;
            estimator.new_round(now, 30_000_000, 0, 30_000_000);
            assert_eq!(estimator.get().bandwidth, 16_000_000);
        }

        // One more round and the reported estimate drops below the old best.
        now += step;
        estimator.new_round(now, 30_000_000, 0, 30_000_000);

        assert!(estimator.get().bandwidth < 16_000_000);
    }
}