Skip to main content

quiche/recovery/
mod.rs

1// Copyright (C) 2018-2019, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::str::FromStr;
28use std::time::Duration;
29use std::time::Instant;
30
31use crate::frame;
32use crate::packet;
33use crate::ranges::RangeSet;
34pub(crate) use crate::recovery::bandwidth::Bandwidth;
35use crate::Config;
36use crate::Result;
37
38#[cfg(feature = "qlog")]
39use qlog::events::EventData;
40#[cfg(feature = "qlog")]
41use serde::Serialize;
42
43use smallvec::SmallVec;
44
45use self::congestion::recovery::LegacyRecovery;
46use self::gcongestion::GRecovery;
47pub use gcongestion::BbrBwLoReductionStrategy;
48pub use gcongestion::BbrParams;
49
50// Loss Recovery
51const INITIAL_PACKET_THRESHOLD: u64 = 3;
52
53const MAX_PACKET_THRESHOLD: u64 = 20;
54
55// Time threshold used to calculate the loss time.
56//
57// https://www.rfc-editor.org/rfc/rfc9002.html#section-6.1.2
58const INITIAL_TIME_THRESHOLD: f64 = 9.0 / 8.0;
59
60// Reduce the sensitivity to packet reordering after the first reordering event.
61//
62// Packet reorder is not a real loss event so quickly reduce the sensitivity to
63// avoid penializing subsequent packet reordering.
64//
65// https://www.rfc-editor.org/rfc/rfc9002.html#section-6.1.2
66//
67// Implementations MAY experiment with absolute thresholds, thresholds from
68// previous connections, adaptive thresholds, or the including of RTT variation.
69// Smaller thresholds reduce reordering resilience and increase spurious
70// retransmissions, and larger thresholds increase loss detection delay.
71const PACKET_REORDER_TIME_THRESHOLD: f64 = 5.0 / 4.0;
72
73// # Experiment: enable_relaxed_loss_threshold
74//
75// Time threshold overhead used to calculate the loss time.
76//
77// The actual threshold is calcualted as 1 + INITIAL_TIME_THRESHOLD_OVERHEAD and
78// equivalent to INITIAL_TIME_THRESHOLD.
79const INITIAL_TIME_THRESHOLD_OVERHEAD: f64 = 1.0 / 8.0;
80// # Experiment: enable_relaxed_loss_threshold
81//
82// The factor by which to increase the time threshold on spurious loss.
83const TIME_THRESHOLD_OVERHEAD_MULTIPLIER: f64 = 2.0;
84
85const GRANULARITY: Duration = Duration::from_millis(1);
86
87const MAX_PTO_PROBES_COUNT: usize = 2;
88
89const MINIMUM_WINDOW_PACKETS: usize = 2;
90
91const LOSS_REDUCTION_FACTOR: f64 = 0.5;
92
93// How many non ACK eliciting packets we send before including a PING to solicit
94// an ACK.
95pub(super) const MAX_OUTSTANDING_NON_ACK_ELICITING: usize = 24;
96
97#[derive(Default)]
98struct LossDetectionTimer {
99    time: Option<Instant>,
100}
101
102impl LossDetectionTimer {
103    fn update(&mut self, timeout: Instant) {
104        self.time = Some(timeout);
105    }
106
107    fn clear(&mut self) {
108        self.time = None;
109    }
110}
111
112impl std::fmt::Debug for LossDetectionTimer {
113    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
114        match self.time {
115            Some(v) => {
116                let now = Instant::now();
117                if v > now {
118                    let d = v.duration_since(now);
119                    write!(f, "{d:?}")
120                } else {
121                    write!(f, "exp")
122                }
123            },
124            None => write!(f, "none"),
125        }
126    }
127}
128
129#[derive(Clone, Copy, PartialEq)]
130pub struct RecoveryConfig {
131    pub initial_rtt: Duration,
132    pub max_send_udp_payload_size: usize,
133    pub max_ack_delay: Duration,
134    pub cc_algorithm: CongestionControlAlgorithm,
135    pub custom_bbr_params: Option<BbrParams>,
136    pub hystart: bool,
137    pub pacing: bool,
138    pub max_pacing_rate: Option<u64>,
139    pub initial_congestion_window_packets: usize,
140    pub enable_relaxed_loss_threshold: bool,
141    pub enable_cubic_idle_restart_fix: bool,
142}
143
144impl RecoveryConfig {
145    pub fn from_config(config: &Config) -> Self {
146        Self {
147            initial_rtt: config.initial_rtt,
148            max_send_udp_payload_size: config.max_send_udp_payload_size,
149            max_ack_delay: Duration::ZERO,
150            cc_algorithm: config.cc_algorithm,
151            custom_bbr_params: config.custom_bbr_params,
152            hystart: config.hystart,
153            pacing: config.pacing,
154            max_pacing_rate: config.max_pacing_rate,
155            initial_congestion_window_packets: config
156                .initial_congestion_window_packets,
157            enable_relaxed_loss_threshold: config.enable_relaxed_loss_threshold,
158            enable_cubic_idle_restart_fix: config.enable_cubic_idle_restart_fix,
159        }
160    }
161}
162
163#[enum_dispatch::enum_dispatch(RecoveryOps)]
164#[allow(clippy::large_enum_variant)]
165#[derive(Debug)]
166pub(crate) enum Recovery {
167    Legacy(LegacyRecovery),
168    GCongestion(GRecovery),
169}
170
171#[derive(Debug, Default, PartialEq)]
172pub struct OnAckReceivedOutcome {
173    pub lost_packets: usize,
174    pub lost_bytes: usize,
175    pub acked_bytes: usize,
176    pub spurious_losses: usize,
177}
178
179#[derive(Debug, Default)]
180pub struct OnLossDetectionTimeoutOutcome {
181    pub lost_packets: usize,
182    pub lost_bytes: usize,
183}
184
185#[enum_dispatch::enum_dispatch]
186/// Api for the Recovery implementation
187pub trait RecoveryOps {
188    fn lost_count(&self) -> usize;
189    fn bytes_lost(&self) -> u64;
190
191    /// Returns whether or not we should elicit an ACK even if we wouldn't
192    /// otherwise have constructed an ACK eliciting packet.
193    fn should_elicit_ack(&self, epoch: packet::Epoch) -> bool;
194
195    fn next_acked_frame(&mut self, epoch: packet::Epoch) -> Option<frame::Frame>;
196
197    fn next_lost_frame(&mut self, epoch: packet::Epoch) -> Option<frame::Frame>;
198
199    fn get_largest_acked_on_epoch(&self, epoch: packet::Epoch) -> Option<u64>;
200    fn has_lost_frames(&self, epoch: packet::Epoch) -> bool;
201    fn loss_probes(&self, epoch: packet::Epoch) -> usize;
202    #[cfg(test)]
203    fn inc_loss_probes(&mut self, epoch: packet::Epoch);
204
205    fn ping_sent(&mut self, epoch: packet::Epoch);
206
207    fn on_packet_sent(
208        &mut self, pkt: Sent, epoch: packet::Epoch,
209        handshake_status: HandshakeStatus, now: Instant, trace_id: &str,
210    );
211    fn get_packet_send_time(&self, now: Instant) -> Instant;
212
213    #[allow(clippy::too_many_arguments)]
214    fn on_ack_received(
215        &mut self, ranges: &RangeSet, ack_delay: u64, epoch: packet::Epoch,
216        handshake_status: HandshakeStatus, now: Instant, skip_pn: Option<u64>,
217        trace_id: &str,
218    ) -> Result<OnAckReceivedOutcome>;
219
220    fn on_loss_detection_timeout(
221        &mut self, handshake_status: HandshakeStatus, now: Instant,
222        trace_id: &str,
223    ) -> OnLossDetectionTimeoutOutcome;
224    fn on_pkt_num_space_discarded(
225        &mut self, epoch: packet::Epoch, handshake_status: HandshakeStatus,
226        now: Instant,
227    );
228    fn on_path_change(
229        &mut self, epoch: packet::Epoch, now: Instant, _trace_id: &str,
230    ) -> (usize, usize);
231    fn loss_detection_timer(&self) -> Option<Instant>;
232    fn cwnd(&self) -> usize;
233    fn cwnd_available(&self) -> usize;
234    fn rtt(&self) -> Duration;
235
236    fn min_rtt(&self) -> Option<Duration>;
237
238    fn max_rtt(&self) -> Option<Duration>;
239
240    fn rttvar(&self) -> Duration;
241
242    fn pto(&self) -> Duration;
243
244    /// The most recent data delivery rate estimate.
245    fn delivery_rate(&self) -> Bandwidth;
246
247    /// Maximum bandwidth estimate, if one is available.
248    fn max_bandwidth(&self) -> Option<Bandwidth>;
249
250    /// Statistics from when a CCA first exited the startup phase.
251    fn startup_exit(&self) -> Option<StartupExit>;
252
253    fn max_datagram_size(&self) -> usize;
254
255    fn pmtud_update_max_datagram_size(&mut self, new_max_datagram_size: usize);
256
257    fn update_max_datagram_size(&mut self, new_max_datagram_size: usize);
258
259    fn on_app_limited(&mut self);
260
261    // Since a recovery module is path specific, this tracks the largest packet
262    // sent per path.
263    #[cfg(test)]
264    fn largest_sent_pkt_num_on_path(&self, epoch: packet::Epoch) -> Option<u64>;
265
266    #[cfg(test)]
267    fn app_limited(&self) -> bool;
268
269    #[cfg(test)]
270    fn sent_packets_len(&self, epoch: packet::Epoch) -> usize;
271
272    fn bytes_in_flight(&self) -> usize;
273
274    fn bytes_in_flight_duration(&self) -> Duration;
275
276    #[cfg(test)]
277    fn in_flight_count(&self, epoch: packet::Epoch) -> usize;
278
279    #[cfg(test)]
280    fn pacing_rate(&self) -> u64;
281
282    #[cfg(test)]
283    fn pto_count(&self) -> u32;
284
285    // This value might be `None` when experiment `enable_relaxed_loss_threshold`
286    // is enabled for gcongestion
287    #[cfg(test)]
288    fn pkt_thresh(&self) -> Option<u64>;
289
290    #[cfg(test)]
291    fn time_thresh(&self) -> f64;
292
293    #[cfg(test)]
294    fn lost_spurious_count(&self) -> usize;
295
296    #[cfg(test)]
297    fn detect_lost_packets_for_test(
298        &mut self, epoch: packet::Epoch, now: Instant,
299    ) -> (usize, usize);
300
301    fn update_app_limited(&mut self, v: bool);
302
303    fn delivery_rate_update_app_limited(&mut self, v: bool);
304
305    fn update_max_ack_delay(&mut self, max_ack_delay: Duration);
306
307    #[cfg(feature = "qlog")]
308    fn state_str(&self, now: Instant) -> &'static str;
309
310    #[cfg(feature = "qlog")]
311    fn get_updated_qlog_event_data(&mut self) -> Option<EventData>;
312
313    #[cfg(feature = "qlog")]
314    fn get_updated_qlog_cc_state(&mut self, now: Instant)
315        -> Option<&'static str>;
316
317    fn send_quantum(&self) -> usize;
318
319    fn get_next_release_time(&self) -> ReleaseDecision;
320
321    fn gcongestion_enabled(&self) -> bool;
322}
323
324impl Recovery {
325    pub fn new_with_config(recovery_config: &RecoveryConfig) -> Self {
326        let grecovery = GRecovery::new(recovery_config);
327        if let Some(grecovery) = grecovery {
328            Recovery::from(grecovery)
329        } else {
330            Recovery::from(LegacyRecovery::new_with_config(recovery_config))
331        }
332    }
333
334    #[cfg(feature = "qlog")]
335    pub fn maybe_qlog(
336        &mut self, qlog: &mut qlog::streamer::QlogStreamer, now: Instant,
337    ) {
338        if let Some(ev_data) = self.get_updated_qlog_event_data() {
339            qlog.add_event_data_with_instant(ev_data, now).ok();
340        }
341
342        if let Some(cc_state) = self.get_updated_qlog_cc_state(now) {
343            let ev_data = EventData::QuicCongestionStateUpdated(
344                qlog::events::quic::CongestionStateUpdated {
345                    old: None,
346                    new: cc_state.to_string(),
347                    trigger: None,
348                },
349            );
350
351            qlog.add_event_data_with_instant(ev_data, now).ok();
352        }
353    }
354
355    #[cfg(test)]
356    pub fn new(config: &Config) -> Self {
357        Self::new_with_config(&RecoveryConfig::from_config(config))
358    }
359}
360
361/// Available congestion control algorithms.
362///
363/// This enum provides currently available list of congestion control
364/// algorithms.
365#[derive(Debug, Copy, Clone, PartialEq, Eq)]
366#[repr(C)]
367pub enum CongestionControlAlgorithm {
368    /// Reno congestion control algorithm. `reno` in a string form.
369    Reno            = 0,
370    /// CUBIC congestion control algorithm (default). `cubic` in a string form.
371    CUBIC           = 1,
372    /// BBRv2 congestion control algorithm implementation from gcongestion
373    /// branch. `bbr2_gcongestion` in a string form.
374    Bbr2Gcongestion = 4,
375}
376
377impl FromStr for CongestionControlAlgorithm {
378    type Err = crate::Error;
379
380    /// Converts a string to `CongestionControlAlgorithm`.
381    ///
382    /// If `name` is not valid, `Error::CongestionControl` is returned.
383    fn from_str(name: &str) -> std::result::Result<Self, Self::Err> {
384        match name {
385            "reno" => Ok(CongestionControlAlgorithm::Reno),
386            "cubic" => Ok(CongestionControlAlgorithm::CUBIC),
387            "bbr" => Ok(CongestionControlAlgorithm::Bbr2Gcongestion),
388            "bbr2" => Ok(CongestionControlAlgorithm::Bbr2Gcongestion),
389            "bbr2_gcongestion" => Ok(CongestionControlAlgorithm::Bbr2Gcongestion),
390            _ => Err(crate::Error::CongestionControl),
391        }
392    }
393}
394
395#[derive(Clone)]
396pub struct Sent {
397    pub pkt_num: u64,
398
399    pub frames: SmallVec<[frame::Frame; 1]>,
400
401    pub time_sent: Instant,
402
403    pub time_acked: Option<Instant>,
404
405    pub time_lost: Option<Instant>,
406
407    pub size: usize,
408
409    pub ack_eliciting: bool,
410
411    pub in_flight: bool,
412
413    pub delivered: usize,
414
415    pub delivered_time: Instant,
416
417    pub first_sent_time: Instant,
418
419    pub is_app_limited: bool,
420
421    pub tx_in_flight: usize,
422
423    pub lost: u64,
424
425    pub has_data: bool,
426
427    pub is_pmtud_probe: bool,
428}
429
430impl std::fmt::Debug for Sent {
431    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
432        write!(f, "pkt_num={:?} ", self.pkt_num)?;
433        write!(f, "pkt_sent_time={:?} ", self.time_sent)?;
434        write!(f, "pkt_size={:?} ", self.size)?;
435        write!(f, "delivered={:?} ", self.delivered)?;
436        write!(f, "delivered_time={:?} ", self.delivered_time)?;
437        write!(f, "first_sent_time={:?} ", self.first_sent_time)?;
438        write!(f, "is_app_limited={} ", self.is_app_limited)?;
439        write!(f, "tx_in_flight={} ", self.tx_in_flight)?;
440        write!(f, "lost={} ", self.lost)?;
441        write!(f, "has_data={} ", self.has_data)?;
442        write!(f, "is_pmtud_probe={}", self.is_pmtud_probe)?;
443
444        Ok(())
445    }
446}
447
448#[derive(Clone, Copy, Debug)]
449pub struct HandshakeStatus {
450    pub has_handshake_keys: bool,
451
452    pub peer_verified_address: bool,
453
454    pub completed: bool,
455}
456
457#[cfg(test)]
458impl Default for HandshakeStatus {
459    fn default() -> HandshakeStatus {
460        HandshakeStatus {
461            has_handshake_keys: true,
462
463            peer_verified_address: true,
464
465            completed: true,
466        }
467    }
468}
469
470// We don't need to log all qlog metrics every time there is a recovery event.
471// Instead, we can log only the MetricsUpdated event data fields that we care
472// about, only when they change. To support this, the QLogMetrics structure
473// keeps a running picture of the fields.
474#[derive(Default)]
475#[cfg(feature = "qlog")]
476struct QlogMetrics {
477    min_rtt: Duration,
478    smoothed_rtt: Duration,
479    latest_rtt: Duration,
480    rttvar: Duration,
481    cwnd: u64,
482    bytes_in_flight: u64,
483    ssthresh: Option<u64>,
484    pacing_rate: Option<u64>,
485    delivery_rate: Option<u64>,
486    send_rate: Option<u64>,
487    ack_rate: Option<u64>,
488    lost_packets: Option<u64>,
489    lost_bytes: Option<u64>,
490    pto_count: Option<u32>,
491}
492
493#[cfg(feature = "qlog")]
494trait CustomCfQlogField {
495    fn name(&self) -> &'static str;
496    fn as_json_value(&self) -> serde_json::Value;
497}
498
499#[cfg(feature = "qlog")]
500#[serde_with::skip_serializing_none]
501#[derive(Serialize)]
502struct TotalAndDelta {
503    total: Option<u64>,
504    delta: Option<u64>,
505}
506
507#[cfg(feature = "qlog")]
508struct CustomQlogField<T> {
509    name: &'static str,
510    value: T,
511}
512
513#[cfg(feature = "qlog")]
514impl<T> CustomQlogField<T> {
515    fn new(name: &'static str, value: T) -> Self {
516        Self { name, value }
517    }
518}
519
520#[cfg(feature = "qlog")]
521impl<T: Serialize> CustomCfQlogField for CustomQlogField<T> {
522    fn name(&self) -> &'static str {
523        self.name
524    }
525
526    fn as_json_value(&self) -> serde_json::Value {
527        serde_json::json!(&self.value)
528    }
529}
530
531#[cfg(feature = "qlog")]
532struct CfExData(qlog::events::ExData);
533
534#[cfg(feature = "qlog")]
535impl CfExData {
536    fn new() -> Self {
537        Self(qlog::events::ExData::new())
538    }
539
540    fn insert<T: Serialize>(&mut self, name: &'static str, value: T) {
541        let field = CustomQlogField::new(name, value);
542        self.0
543            .insert(field.name().to_string(), field.as_json_value());
544    }
545
546    fn into_inner(self) -> qlog::events::ExData {
547        self.0
548    }
549}
550
551#[cfg(feature = "qlog")]
552impl QlogMetrics {
553    // Make a qlog event if the latest instance of QlogMetrics is different.
554    //
555    // This function diffs each of the fields. A qlog MetricsUpdated event is
556    // only generated if at least one field is different. Where fields are
557    // different, the qlog event contains the latest value.
558    fn maybe_update(&mut self, latest: Self) -> Option<EventData> {
559        let mut emit_event = false;
560
561        let new_min_rtt = if self.min_rtt != latest.min_rtt {
562            self.min_rtt = latest.min_rtt;
563            emit_event = true;
564            Some(latest.min_rtt.as_secs_f32() * 1000.0)
565        } else {
566            None
567        };
568
569        let new_smoothed_rtt = if self.smoothed_rtt != latest.smoothed_rtt {
570            self.smoothed_rtt = latest.smoothed_rtt;
571            emit_event = true;
572            Some(latest.smoothed_rtt.as_secs_f32() * 1000.0)
573        } else {
574            None
575        };
576
577        let new_latest_rtt = if self.latest_rtt != latest.latest_rtt {
578            self.latest_rtt = latest.latest_rtt;
579            emit_event = true;
580            Some(latest.latest_rtt.as_secs_f32() * 1000.0)
581        } else {
582            None
583        };
584
585        let new_rttvar = if self.rttvar != latest.rttvar {
586            self.rttvar = latest.rttvar;
587            emit_event = true;
588            Some(latest.rttvar.as_secs_f32() * 1000.0)
589        } else {
590            None
591        };
592
593        let new_cwnd = if self.cwnd != latest.cwnd {
594            self.cwnd = latest.cwnd;
595            emit_event = true;
596            Some(latest.cwnd)
597        } else {
598            None
599        };
600
601        let new_bytes_in_flight =
602            if self.bytes_in_flight != latest.bytes_in_flight {
603                self.bytes_in_flight = latest.bytes_in_flight;
604                emit_event = true;
605                Some(latest.bytes_in_flight)
606            } else {
607                None
608            };
609
610        let new_ssthresh = if self.ssthresh != latest.ssthresh {
611            self.ssthresh = latest.ssthresh;
612            emit_event = true;
613            latest.ssthresh
614        } else {
615            None
616        };
617
618        let new_pacing_rate = if self.pacing_rate != latest.pacing_rate {
619            self.pacing_rate = latest.pacing_rate;
620            emit_event = true;
621            latest.pacing_rate
622        } else {
623            None
624        };
625
626        let new_pto_count =
627            if latest.pto_count.is_some() && self.pto_count != latest.pto_count {
628                self.pto_count = latest.pto_count;
629                emit_event = true;
630                latest.pto_count.map(|v| v as u16)
631            } else {
632                None
633            };
634
635        // Build ex_data for rate metrics
636        let mut ex_data = CfExData::new();
637        if self.delivery_rate != latest.delivery_rate {
638            if let Some(rate) = latest.delivery_rate {
639                self.delivery_rate = latest.delivery_rate;
640                emit_event = true;
641                ex_data.insert("cf_delivery_rate", rate);
642            }
643        }
644        if self.send_rate != latest.send_rate {
645            if let Some(rate) = latest.send_rate {
646                self.send_rate = latest.send_rate;
647                emit_event = true;
648                ex_data.insert("cf_send_rate", rate);
649            }
650        }
651        if self.ack_rate != latest.ack_rate {
652            if let Some(rate) = latest.ack_rate {
653                self.ack_rate = latest.ack_rate;
654                emit_event = true;
655                ex_data.insert("cf_ack_rate", rate);
656            }
657        }
658
659        if self.lost_packets != latest.lost_packets {
660            if let Some(val) = latest.lost_packets {
661                emit_event = true;
662                ex_data.insert("cf_lost_packets", TotalAndDelta {
663                    total: latest.lost_packets,
664                    delta: Some(val - self.lost_packets.unwrap_or(0)),
665                });
666                self.lost_packets = latest.lost_packets;
667            }
668        }
669        if self.lost_bytes != latest.lost_bytes {
670            if let Some(val) = latest.lost_bytes {
671                emit_event = true;
672                ex_data.insert("cf_lost_bytes", TotalAndDelta {
673                    total: latest.lost_bytes,
674                    delta: Some(val - self.lost_bytes.unwrap_or(0)),
675                });
676                self.lost_bytes = latest.lost_bytes;
677            }
678        }
679
680        if emit_event {
681            return Some(EventData::QuicMetricsUpdated(
682                qlog::events::quic::RecoveryMetricsUpdated {
683                    min_rtt: new_min_rtt,
684                    smoothed_rtt: new_smoothed_rtt,
685                    latest_rtt: new_latest_rtt,
686                    rtt_variance: new_rttvar,
687                    congestion_window: new_cwnd,
688                    bytes_in_flight: new_bytes_in_flight,
689                    ssthresh: new_ssthresh,
690                    pacing_rate: new_pacing_rate,
691                    pto_count: new_pto_count,
692                    ex_data: ex_data.into_inner(),
693                    ..Default::default()
694                },
695            ));
696        }
697
698        None
699    }
700}
701
702/// When the pacer thinks is a good time to release the next packet
703#[derive(Debug, Clone, Copy, PartialEq, Eq)]
704pub enum ReleaseTime {
705    Immediate,
706    At(Instant),
707}
708
709/// When the next packet should be release and if it can be part of a burst
710#[derive(Clone, Copy, Debug, PartialEq, Eq)]
711pub struct ReleaseDecision {
712    time: ReleaseTime,
713    allow_burst: bool,
714}
715
716impl ReleaseTime {
717    /// Add the specific delay to the current time
718    fn inc(&mut self, delay: Duration) {
719        match self {
720            ReleaseTime::Immediate => {},
721            ReleaseTime::At(time) => *time += delay,
722        }
723    }
724
725    /// Set the time to the later of two times
726    fn set_max(&mut self, other: Instant) {
727        match self {
728            ReleaseTime::Immediate => *self = ReleaseTime::At(other),
729            ReleaseTime::At(time) => *self = ReleaseTime::At(other.max(*time)),
730        }
731    }
732}
733
734impl ReleaseDecision {
735    pub(crate) const EQUAL_THRESHOLD: Duration = Duration::from_micros(50);
736
737    /// Get the [`Instant`] the next packet should be released. It will never be
738    /// in the past.
739    #[inline]
740    pub fn time(&self, now: Instant) -> Option<Instant> {
741        match self.time {
742            ReleaseTime::Immediate => None,
743            ReleaseTime::At(other) => other.gt(&now).then_some(other),
744        }
745    }
746
747    /// Can this packet be appended to a previous burst
748    #[inline]
749    pub fn can_burst(&self) -> bool {
750        self.allow_burst
751    }
752
753    /// Check if the two packets can be released at the same time
754    #[inline]
755    pub fn time_eq(&self, other: &Self, now: Instant) -> bool {
756        let delta = match (self.time(now), other.time(now)) {
757            (None, None) => Duration::ZERO,
758            (Some(t), None) | (None, Some(t)) => t.duration_since(now),
759            (Some(t1), Some(t2)) if t1 < t2 => t2.duration_since(t1),
760            (Some(t1), Some(t2)) => t1.duration_since(t2),
761        };
762
763        delta <= Self::EQUAL_THRESHOLD
764    }
765}
766
767/// Recovery statistics
768#[derive(Default, Debug)]
769pub struct RecoveryStats {
770    startup_exit: Option<StartupExit>,
771}
772
773impl RecoveryStats {
774    // Record statistics when a CCA first exits startup.
775    pub fn set_startup_exit(&mut self, startup_exit: StartupExit) {
776        if self.startup_exit.is_none() {
777            self.startup_exit = Some(startup_exit);
778        }
779    }
780}
781
782/// Statistics from when a CCA first exited the startup phase.
783#[derive(Debug, Clone, Copy, PartialEq)]
784pub struct StartupExit {
785    /// The congestion_window recorded at Startup exit.
786    pub cwnd: usize,
787
788    /// The bandwidth estimate recorded at Startup exit.
789    pub bandwidth: Option<u64>,
790
791    /// The reason a CCA exited the startup phase.
792    pub reason: StartupExitReason,
793}
794
795impl StartupExit {
796    fn new(
797        cwnd: usize, bandwidth: Option<Bandwidth>, reason: StartupExitReason,
798    ) -> Self {
799        let bandwidth = bandwidth.map(Bandwidth::to_bytes_per_second);
800        Self {
801            cwnd,
802            bandwidth,
803            reason,
804        }
805    }
806}
807
808/// The reason a CCA exited the startup phase.
809#[derive(Debug, Clone, Copy, PartialEq)]
810pub enum StartupExitReason {
811    /// Exit slow start or BBR startup due to excessive loss
812    Loss,
813
814    /// Exit BBR startup due to bandwidth plateau.
815    BandwidthPlateau,
816
817    /// Exit BBR startup due to persistent queue.
818    PersistentQueue,
819
820    /// Exit HyStart++ conservative slow start after the max rounds allowed.
821    ConservativeSlowStartRounds,
822}
823
824#[cfg(test)]
825mod tests {
826    use super::*;
827    use crate::packet;
828    use crate::test_utils;
829    use crate::CongestionControlAlgorithm;
830    use crate::DEFAULT_INITIAL_RTT;
831    use rstest::rstest;
832    use smallvec::smallvec;
833    use std::str::FromStr;
834
835    fn recovery_for_alg(algo: CongestionControlAlgorithm) -> Recovery {
836        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
837        cfg.set_cc_algorithm(algo);
838        Recovery::new(&cfg)
839    }
840
841    #[test]
842    fn lookup_cc_algo_ok() {
843        let algo = CongestionControlAlgorithm::from_str("reno").unwrap();
844        assert_eq!(algo, CongestionControlAlgorithm::Reno);
845        assert!(!recovery_for_alg(algo).gcongestion_enabled());
846
847        let algo = CongestionControlAlgorithm::from_str("cubic").unwrap();
848        assert_eq!(algo, CongestionControlAlgorithm::CUBIC);
849        assert!(!recovery_for_alg(algo).gcongestion_enabled());
850
851        let algo = CongestionControlAlgorithm::from_str("bbr").unwrap();
852        assert_eq!(algo, CongestionControlAlgorithm::Bbr2Gcongestion);
853        assert!(recovery_for_alg(algo).gcongestion_enabled());
854
855        let algo = CongestionControlAlgorithm::from_str("bbr2").unwrap();
856        assert_eq!(algo, CongestionControlAlgorithm::Bbr2Gcongestion);
857        assert!(recovery_for_alg(algo).gcongestion_enabled());
858
859        let algo =
860            CongestionControlAlgorithm::from_str("bbr2_gcongestion").unwrap();
861        assert_eq!(algo, CongestionControlAlgorithm::Bbr2Gcongestion);
862        assert!(recovery_for_alg(algo).gcongestion_enabled());
863    }
864
865    #[test]
866    fn lookup_cc_algo_bad() {
867        assert_eq!(
868            CongestionControlAlgorithm::from_str("???"),
869            Err(crate::Error::CongestionControl)
870        );
871    }
872
873    #[rstest]
874    fn loss_on_pto(
875        #[values("reno", "cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
876    ) {
877        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
878        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
879
880        let mut r = Recovery::new(&cfg);
881
882        let mut now = Instant::now();
883
884        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
885
886        // Start by sending a few packets.
887        let p = Sent {
888            pkt_num: 0,
889            frames: smallvec![],
890            time_sent: now,
891            time_acked: None,
892            time_lost: None,
893            size: 1000,
894            ack_eliciting: true,
895            in_flight: true,
896            delivered: 0,
897            delivered_time: now,
898            first_sent_time: now,
899            is_app_limited: false,
900            tx_in_flight: 0,
901            lost: 0,
902            has_data: false,
903            is_pmtud_probe: false,
904        };
905
906        r.on_packet_sent(
907            p,
908            packet::Epoch::Application,
909            HandshakeStatus::default(),
910            now,
911            "",
912        );
913
914        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 1);
915        assert_eq!(r.bytes_in_flight(), 1000);
916        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
917
918        let p = Sent {
919            pkt_num: 1,
920            frames: smallvec![],
921            time_sent: now,
922            time_acked: None,
923            time_lost: None,
924            size: 1000,
925            ack_eliciting: true,
926            in_flight: true,
927            delivered: 0,
928            delivered_time: now,
929            first_sent_time: now,
930            is_app_limited: false,
931            tx_in_flight: 0,
932            lost: 0,
933            has_data: false,
934            is_pmtud_probe: false,
935        };
936
937        r.on_packet_sent(
938            p,
939            packet::Epoch::Application,
940            HandshakeStatus::default(),
941            now,
942            "",
943        );
944
945        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
946        assert_eq!(r.bytes_in_flight(), 2000);
947        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
948
949        let p = Sent {
950            pkt_num: 2,
951            frames: smallvec![],
952            time_sent: now,
953            time_acked: None,
954            time_lost: None,
955            size: 1000,
956            ack_eliciting: true,
957            in_flight: true,
958            delivered: 0,
959            delivered_time: now,
960            first_sent_time: now,
961            is_app_limited: false,
962            tx_in_flight: 0,
963            lost: 0,
964            has_data: false,
965            is_pmtud_probe: false,
966        };
967
968        r.on_packet_sent(
969            p,
970            packet::Epoch::Application,
971            HandshakeStatus::default(),
972            now,
973            "",
974        );
975        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 3);
976        assert_eq!(r.bytes_in_flight(), 3000);
977        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
978
979        let p = Sent {
980            pkt_num: 3,
981            frames: smallvec![],
982            time_sent: now,
983            time_acked: None,
984            time_lost: None,
985            size: 1000,
986            ack_eliciting: true,
987            in_flight: true,
988            delivered: 0,
989            delivered_time: now,
990            first_sent_time: now,
991            is_app_limited: false,
992            tx_in_flight: 0,
993            lost: 0,
994            has_data: false,
995            is_pmtud_probe: false,
996        };
997
998        r.on_packet_sent(
999            p,
1000            packet::Epoch::Application,
1001            HandshakeStatus::default(),
1002            now,
1003            "",
1004        );
1005        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 4);
1006        assert_eq!(r.bytes_in_flight(), 4000);
1007        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
1008
1009        // Wait for 10ms.
1010        now += Duration::from_millis(10);
1011
1012        // Only the first 2 packets are acked.
1013        let mut acked = RangeSet::default();
1014        acked.insert(0..2);
1015
1016        assert_eq!(
1017            r.on_ack_received(
1018                &acked,
1019                25,
1020                packet::Epoch::Application,
1021                HandshakeStatus::default(),
1022                now,
1023                None,
1024                "",
1025            )
1026            .unwrap(),
1027            OnAckReceivedOutcome {
1028                lost_packets: 0,
1029                lost_bytes: 0,
1030                acked_bytes: 2 * 1000,
1031                spurious_losses: 0,
1032            }
1033        );
1034
1035        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
1036        assert_eq!(r.bytes_in_flight(), 2000);
1037        assert_eq!(r.bytes_in_flight_duration(), Duration::from_millis(10));
1038        assert_eq!(r.lost_count(), 0);
1039
1040        // Wait until loss detection timer expires.
1041        now = r.loss_detection_timer().unwrap();
1042
1043        // PTO.
1044        r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
1045        assert_eq!(r.loss_probes(packet::Epoch::Application), 1);
1046        assert_eq!(r.lost_count(), 0);
1047        assert_eq!(r.pto_count(), 1);
1048
1049        let p = Sent {
1050            pkt_num: 4,
1051            frames: smallvec![],
1052            time_sent: now,
1053            time_acked: None,
1054            time_lost: None,
1055            size: 1000,
1056            ack_eliciting: true,
1057            in_flight: true,
1058            delivered: 0,
1059            delivered_time: now,
1060            first_sent_time: now,
1061            is_app_limited: false,
1062            tx_in_flight: 0,
1063            lost: 0,
1064            has_data: false,
1065            is_pmtud_probe: false,
1066        };
1067
1068        r.on_packet_sent(
1069            p,
1070            packet::Epoch::Application,
1071            HandshakeStatus::default(),
1072            now,
1073            "",
1074        );
1075        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 3);
1076        assert_eq!(r.bytes_in_flight(), 3000);
1077        assert_eq!(r.bytes_in_flight_duration(), Duration::from_millis(30));
1078
1079        let p = Sent {
1080            pkt_num: 5,
1081            frames: smallvec![],
1082            time_sent: now,
1083            time_acked: None,
1084            time_lost: None,
1085            size: 1000,
1086            ack_eliciting: true,
1087            in_flight: true,
1088            delivered: 0,
1089            delivered_time: now,
1090            first_sent_time: now,
1091            is_app_limited: false,
1092            tx_in_flight: 0,
1093            lost: 0,
1094            has_data: false,
1095            is_pmtud_probe: false,
1096        };
1097
1098        r.on_packet_sent(
1099            p,
1100            packet::Epoch::Application,
1101            HandshakeStatus::default(),
1102            now,
1103            "",
1104        );
1105        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 4);
1106        assert_eq!(r.bytes_in_flight(), 4000);
1107        assert_eq!(r.bytes_in_flight_duration(), Duration::from_millis(30));
1108        assert_eq!(r.lost_count(), 0);
1109
1110        // Wait for 10ms.
1111        now += Duration::from_millis(10);
1112
1113        // PTO packets are acked.
1114        let mut acked = RangeSet::default();
1115        acked.insert(4..6);
1116
1117        assert_eq!(
1118            r.on_ack_received(
1119                &acked,
1120                25,
1121                packet::Epoch::Application,
1122                HandshakeStatus::default(),
1123                now,
1124                None,
1125                "",
1126            )
1127            .unwrap(),
1128            OnAckReceivedOutcome {
1129                lost_packets: 2,
1130                lost_bytes: 2000,
1131                acked_bytes: 2 * 1000,
1132                spurious_losses: 0,
1133            }
1134        );
1135
1136        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 4);
1137        assert_eq!(r.bytes_in_flight(), 0);
1138        assert_eq!(r.bytes_in_flight_duration(), Duration::from_millis(40));
1139
1140        assert_eq!(r.lost_count(), 2);
1141
1142        // Wait 1 RTT.
1143        now += r.rtt();
1144
1145        assert_eq!(
1146            r.detect_lost_packets_for_test(packet::Epoch::Application, now),
1147            (0, 0)
1148        );
1149
1150        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1151        if cc_algorithm_name == "reno" || cc_algorithm_name == "cubic" {
1152            assert!(r.startup_exit().is_some());
1153            assert_eq!(r.startup_exit().unwrap().reason, StartupExitReason::Loss);
1154        } else {
1155            assert_eq!(r.startup_exit(), None);
1156        }
1157    }
1158
1159    #[rstest]
1160    fn loss_on_timer(
1161        #[values("reno", "cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
1162    ) {
1163        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
1164        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
1165
1166        let mut r = Recovery::new(&cfg);
1167
1168        let mut now = Instant::now();
1169
1170        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1171
1172        // Start by sending a few packets.
1173        let p = Sent {
1174            pkt_num: 0,
1175            frames: smallvec![],
1176            time_sent: now,
1177            time_acked: None,
1178            time_lost: None,
1179            size: 1000,
1180            ack_eliciting: true,
1181            in_flight: true,
1182            delivered: 0,
1183            delivered_time: now,
1184            first_sent_time: now,
1185            is_app_limited: false,
1186            tx_in_flight: 0,
1187            lost: 0,
1188            has_data: false,
1189            is_pmtud_probe: false,
1190        };
1191
1192        r.on_packet_sent(
1193            p,
1194            packet::Epoch::Application,
1195            HandshakeStatus::default(),
1196            now,
1197            "",
1198        );
1199        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 1);
1200        assert_eq!(r.bytes_in_flight(), 1000);
1201        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
1202
1203        let p = Sent {
1204            pkt_num: 1,
1205            frames: smallvec![],
1206            time_sent: now,
1207            time_acked: None,
1208            time_lost: None,
1209            size: 1000,
1210            ack_eliciting: true,
1211            in_flight: true,
1212            delivered: 0,
1213            delivered_time: now,
1214            first_sent_time: now,
1215            is_app_limited: false,
1216            tx_in_flight: 0,
1217            lost: 0,
1218            has_data: false,
1219            is_pmtud_probe: false,
1220        };
1221
1222        r.on_packet_sent(
1223            p,
1224            packet::Epoch::Application,
1225            HandshakeStatus::default(),
1226            now,
1227            "",
1228        );
1229        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
1230        assert_eq!(r.bytes_in_flight(), 2000);
1231        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
1232
1233        let p = Sent {
1234            pkt_num: 2,
1235            frames: smallvec![],
1236            time_sent: now,
1237            time_acked: None,
1238            time_lost: None,
1239            size: 1000,
1240            ack_eliciting: true,
1241            in_flight: true,
1242            delivered: 0,
1243            delivered_time: now,
1244            first_sent_time: now,
1245            is_app_limited: false,
1246            tx_in_flight: 0,
1247            lost: 0,
1248            has_data: false,
1249            is_pmtud_probe: false,
1250        };
1251
1252        r.on_packet_sent(
1253            p,
1254            packet::Epoch::Application,
1255            HandshakeStatus::default(),
1256            now,
1257            "",
1258        );
1259        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 3);
1260        assert_eq!(r.bytes_in_flight(), 3000);
1261        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
1262
1263        let p = Sent {
1264            pkt_num: 3,
1265            frames: smallvec![],
1266            time_sent: now,
1267            time_acked: None,
1268            time_lost: None,
1269            size: 1000,
1270            ack_eliciting: true,
1271            in_flight: true,
1272            delivered: 0,
1273            delivered_time: now,
1274            first_sent_time: now,
1275            is_app_limited: false,
1276            tx_in_flight: 0,
1277            lost: 0,
1278            has_data: false,
1279            is_pmtud_probe: false,
1280        };
1281
1282        r.on_packet_sent(
1283            p,
1284            packet::Epoch::Application,
1285            HandshakeStatus::default(),
1286            now,
1287            "",
1288        );
1289        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 4);
1290        assert_eq!(r.bytes_in_flight(), 4000);
1291        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
1292
1293        // Wait for 10ms.
1294        now += Duration::from_millis(10);
1295
1296        // Only the first 2 packets and the last one are acked.
1297        let mut acked = RangeSet::default();
1298        acked.insert(0..2);
1299        acked.insert(3..4);
1300
1301        assert_eq!(
1302            r.on_ack_received(
1303                &acked,
1304                25,
1305                packet::Epoch::Application,
1306                HandshakeStatus::default(),
1307                now,
1308                None,
1309                "",
1310            )
1311            .unwrap(),
1312            OnAckReceivedOutcome {
1313                lost_packets: 0,
1314                lost_bytes: 0,
1315                acked_bytes: 3 * 1000,
1316                spurious_losses: 0,
1317            }
1318        );
1319
1320        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
1321        assert_eq!(r.bytes_in_flight(), 1000);
1322        assert_eq!(r.bytes_in_flight_duration(), Duration::from_millis(10));
1323        assert_eq!(r.lost_count(), 0);
1324
1325        // Wait until loss detection timer expires.
1326        now = r.loss_detection_timer().unwrap();
1327
1328        // Packet is declared lost.
1329        r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
1330        assert_eq!(r.loss_probes(packet::Epoch::Application), 0);
1331
1332        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
1333        assert_eq!(r.bytes_in_flight(), 0);
1334        assert_eq!(r.bytes_in_flight_duration(), Duration::from_micros(11250));
1335
1336        assert_eq!(r.lost_count(), 1);
1337
1338        // Wait 1 RTT.
1339        now += r.rtt();
1340
1341        assert_eq!(
1342            r.detect_lost_packets_for_test(packet::Epoch::Application, now),
1343            (0, 0)
1344        );
1345
1346        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1347        if cc_algorithm_name == "reno" || cc_algorithm_name == "cubic" {
1348            assert!(r.startup_exit().is_some());
1349            assert_eq!(r.startup_exit().unwrap().reason, StartupExitReason::Loss);
1350        } else {
1351            assert_eq!(r.startup_exit(), None);
1352        }
1353    }
1354
1355    #[rstest]
1356    fn loss_on_reordering(
1357        #[values("reno", "cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
1358    ) {
1359        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
1360        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
1361
1362        let mut r = Recovery::new(&cfg);
1363
1364        let mut now = Instant::now();
1365
1366        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1367
1368        // Start by sending a few packets.
1369        //
1370        // pkt number: [0, 1, 2, 3]
1371        for i in 0..4 {
1372            let p = test_utils::helper_packet_sent(i, now, 1000);
1373            r.on_packet_sent(
1374                p,
1375                packet::Epoch::Application,
1376                HandshakeStatus::default(),
1377                now,
1378                "",
1379            );
1380
1381            let pkt_count = (i + 1) as usize;
1382            assert_eq!(r.sent_packets_len(packet::Epoch::Application), pkt_count);
1383            assert_eq!(r.bytes_in_flight(), pkt_count * 1000);
1384            assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
1385        }
1386
1387        // Wait for 10ms after sending.
1388        now += Duration::from_millis(10);
1389
1390        // Recieve reordered ACKs, i.e. pkt_num [2, 3]
1391        let mut acked = RangeSet::default();
1392        acked.insert(2..4);
1393        assert_eq!(
1394            r.on_ack_received(
1395                &acked,
1396                25,
1397                packet::Epoch::Application,
1398                HandshakeStatus::default(),
1399                now,
1400                None,
1401                "",
1402            )
1403            .unwrap(),
1404            OnAckReceivedOutcome {
1405                lost_packets: 1,
1406                lost_bytes: 1000,
1407                acked_bytes: 1000 * 2,
1408                spurious_losses: 0,
1409            }
1410        );
1411        // Since we only remove packets from the back to avoid compaction, the
1412        // send length remains the same after receiving reordered ACKs
1413        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 4);
1414        assert_eq!(r.pkt_thresh().unwrap(), INITIAL_PACKET_THRESHOLD);
1415        assert_eq!(r.time_thresh(), INITIAL_TIME_THRESHOLD);
1416
1417        // Wait for 10ms after receiving first set of ACKs.
1418        now += Duration::from_millis(10);
1419
1420        // Recieve remaining ACKs, i.e. pkt_num [0, 1]
1421        let mut acked = RangeSet::default();
1422        acked.insert(0..2);
1423        assert_eq!(
1424            r.on_ack_received(
1425                &acked,
1426                25,
1427                packet::Epoch::Application,
1428                HandshakeStatus::default(),
1429                now,
1430                None,
1431                "",
1432            )
1433            .unwrap(),
1434            OnAckReceivedOutcome {
1435                lost_packets: 0,
1436                lost_bytes: 0,
1437                acked_bytes: 1000,
1438                spurious_losses: 1,
1439            }
1440        );
1441        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1442        assert_eq!(r.bytes_in_flight(), 0);
1443        assert_eq!(r.bytes_in_flight_duration(), Duration::from_millis(20));
1444
1445        // Spurious loss.
1446        assert_eq!(r.lost_count(), 1);
1447        assert_eq!(r.lost_spurious_count(), 1);
1448
1449        // Packet threshold was increased.
1450        assert_eq!(r.pkt_thresh().unwrap(), 4);
1451        assert_eq!(r.time_thresh(), PACKET_REORDER_TIME_THRESHOLD);
1452
1453        // Wait 1 RTT.
1454        now += r.rtt();
1455
1456        // All packets have been ACKed so dont expect additional lost packets
1457        assert_eq!(
1458            r.detect_lost_packets_for_test(packet::Epoch::Application, now),
1459            (0, 0)
1460        );
1461        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1462
1463        if cc_algorithm_name == "reno" || cc_algorithm_name == "cubic" {
1464            assert!(r.startup_exit().is_some());
1465            assert_eq!(r.startup_exit().unwrap().reason, StartupExitReason::Loss);
1466        } else {
1467            assert_eq!(r.startup_exit(), None);
1468        }
1469    }
1470
1471    // TODO: This should run agains both `congestion` and `gcongestion`.
1472    // `congestion` and `gcongestion` behave differently. That might be ok
1473    // given the different algorithms but it would be ideal to merge and share
1474    // the logic.
1475    #[rstest]
1476    fn time_thresholds_on_reordering(
1477        #[values("bbr2_gcongestion")] cc_algorithm_name: &str,
1478    ) {
1479        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
1480        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
1481
1482        let mut now = Instant::now();
1483        let mut r = Recovery::new(&cfg);
1484        assert_eq!(r.rtt(), DEFAULT_INITIAL_RTT);
1485
1486        // Pick time between and above thresholds for testing threshold increase.
1487        //
1488        //```
1489        //              between_thresh_ms
1490        //                         |
1491        //    initial_thresh_ms    |     spurious_thresh_ms
1492        //      v                  v             v
1493        // --------------------------------------------------
1494        //      | ................ | ..................... |
1495        //            THRESH_GAP         THRESH_GAP
1496        // ```
1497        // 
1498        // Threshold gap time.
1499        const THRESH_GAP: Duration = Duration::from_millis(30);
1500        // Initial time theshold based on inital RTT.
1501        let initial_thresh_ms =
1502            DEFAULT_INITIAL_RTT.mul_f64(INITIAL_TIME_THRESHOLD);
1503        // The time threshold after spurious loss.
1504        let spurious_thresh_ms: Duration =
1505            DEFAULT_INITIAL_RTT.mul_f64(PACKET_REORDER_TIME_THRESHOLD);
1506        // Time between the two thresholds
1507        let between_thresh_ms = initial_thresh_ms + THRESH_GAP;
1508        assert!(between_thresh_ms > initial_thresh_ms);
1509        assert!(between_thresh_ms < spurious_thresh_ms);
1510        assert!(between_thresh_ms + THRESH_GAP > spurious_thresh_ms);
1511
1512        for i in 0..6 {
1513            let send_time = now + i * between_thresh_ms;
1514
1515            let p = test_utils::helper_packet_sent(i.into(), send_time, 1000);
1516            r.on_packet_sent(
1517                p,
1518                packet::Epoch::Application,
1519                HandshakeStatus::default(),
1520                send_time,
1521                "",
1522            );
1523        }
1524
1525        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 6);
1526        assert_eq!(r.bytes_in_flight(), 6 * 1000);
1527        assert_eq!(r.pkt_thresh().unwrap(), INITIAL_PACKET_THRESHOLD);
1528        assert_eq!(r.time_thresh(), INITIAL_TIME_THRESHOLD);
1529
1530        // Wait for `between_thresh_ms` after sending to trigger loss based on
1531        // loss threshold.
1532        now += between_thresh_ms;
1533
1534        // Ack packet: 1
1535        //
1536        // [0, 1, 2, 3, 4, 5]
1537        //     ^
1538        let mut acked = RangeSet::default();
1539        acked.insert(1..2);
1540        assert_eq!(
1541            r.on_ack_received(
1542                &acked,
1543                25,
1544                packet::Epoch::Application,
1545                HandshakeStatus::default(),
1546                now,
1547                None,
1548                "",
1549            )
1550            .unwrap(),
1551            OnAckReceivedOutcome {
1552                lost_packets: 1,
1553                lost_bytes: 1000,
1554                acked_bytes: 1000,
1555                spurious_losses: 0,
1556            }
1557        );
1558        assert_eq!(r.pkt_thresh().unwrap(), INITIAL_PACKET_THRESHOLD);
1559        assert_eq!(r.time_thresh(), INITIAL_TIME_THRESHOLD);
1560
1561        // Ack packet: 0
1562        //
1563        // [0, 1, 2, 3, 4, 5]
1564        //  ^  x
1565        let mut acked = RangeSet::default();
1566        acked.insert(0..1);
1567        assert_eq!(
1568            r.on_ack_received(
1569                &acked,
1570                25,
1571                packet::Epoch::Application,
1572                HandshakeStatus::default(),
1573                now,
1574                None,
1575                "",
1576            )
1577            .unwrap(),
1578            OnAckReceivedOutcome {
1579                lost_packets: 0,
1580                lost_bytes: 0,
1581                acked_bytes: 0,
1582                spurious_losses: 1,
1583            }
1584        );
1585        // The time_thresh after spurious loss
1586        assert_eq!(r.time_thresh(), PACKET_REORDER_TIME_THRESHOLD);
1587
1588        // Wait for `between_thresh_ms` after sending. However, since the
1589        // threshold has increased, we do not expect loss.
1590        now += between_thresh_ms;
1591
1592        // Ack packet: 3
1593        //
1594        // [2, 3, 4, 5]
1595        //     ^
1596        let mut acked = RangeSet::default();
1597        acked.insert(3..4);
1598        assert_eq!(
1599            r.on_ack_received(
1600                &acked,
1601                25,
1602                packet::Epoch::Application,
1603                HandshakeStatus::default(),
1604                now,
1605                None,
1606                "",
1607            )
1608            .unwrap(),
1609            OnAckReceivedOutcome {
1610                lost_packets: 0,
1611                lost_bytes: 0,
1612                acked_bytes: 1000,
1613                spurious_losses: 0,
1614            }
1615        );
1616
1617        // Wait for and additional `plus_overhead` to trigger loss based on the
1618        // new time threshold.
1619        now += THRESH_GAP;
1620
1621        // Ack packet: 4
1622        //
1623        // [2, 3, 4, 5]
1624        //     x  ^
1625        let mut acked = RangeSet::default();
1626        acked.insert(4..5);
1627        assert_eq!(
1628            r.on_ack_received(
1629                &acked,
1630                25,
1631                packet::Epoch::Application,
1632                HandshakeStatus::default(),
1633                now,
1634                None,
1635                "",
1636            )
1637            .unwrap(),
1638            OnAckReceivedOutcome {
1639                lost_packets: 1,
1640                lost_bytes: 1000,
1641                acked_bytes: 1000,
1642                spurious_losses: 0,
1643            }
1644        );
1645    }
1646
1647    // TODO: Implement enable_relaxed_loss_threshold and enable this test for the
1648    // congestion module.
1649    #[rstest]
1650    fn relaxed_thresholds_on_reordering(
1651        #[values("bbr2_gcongestion")] cc_algorithm_name: &str,
1652    ) {
1653        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
1654        cfg.enable_relaxed_loss_threshold = true;
1655        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
1656
1657        let mut now = Instant::now();
1658        let mut r = Recovery::new(&cfg);
1659        assert_eq!(r.rtt(), DEFAULT_INITIAL_RTT);
1660
1661        // Pick time between and above thresholds for testing threshold increase.
1662        //
1663        //```
1664        //              between_thresh_ms
1665        //                         |
1666        //    initial_thresh_ms    |     spurious_thresh_ms
1667        //      v                  v             v
1668        // --------------------------------------------------
1669        //      | ................ | ..................... |
1670        //            THRESH_GAP         THRESH_GAP
1671        // ```
1672        // Threshold gap time.
1673        const THRESH_GAP: Duration = Duration::from_millis(30);
1674        // Initial time theshold based on inital RTT.
1675        let initial_thresh_ms =
1676            DEFAULT_INITIAL_RTT.mul_f64(INITIAL_TIME_THRESHOLD);
1677        // The time threshold after spurious loss.
1678        let spurious_thresh_ms: Duration =
1679            DEFAULT_INITIAL_RTT.mul_f64(PACKET_REORDER_TIME_THRESHOLD);
1680        // Time between the two thresholds
1681        let between_thresh_ms = initial_thresh_ms + THRESH_GAP;
1682        assert!(between_thresh_ms > initial_thresh_ms);
1683        assert!(between_thresh_ms < spurious_thresh_ms);
1684        assert!(between_thresh_ms + THRESH_GAP > spurious_thresh_ms);
1685
1686        for i in 0..6 {
1687            let send_time = now + i * between_thresh_ms;
1688
1689            let p = test_utils::helper_packet_sent(i.into(), send_time, 1000);
1690            r.on_packet_sent(
1691                p,
1692                packet::Epoch::Application,
1693                HandshakeStatus::default(),
1694                send_time,
1695                "",
1696            );
1697        }
1698
1699        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 6);
1700        assert_eq!(r.bytes_in_flight(), 6 * 1000);
1701        // Intitial thresholds
1702        assert_eq!(r.pkt_thresh().unwrap(), INITIAL_PACKET_THRESHOLD);
1703        assert_eq!(r.time_thresh(), INITIAL_TIME_THRESHOLD);
1704
1705        // Wait for `between_thresh_ms` after sending to trigger loss based on
1706        // loss threshold.
1707        now += between_thresh_ms;
1708
1709        // Ack packet: 1
1710        //
1711        // [0, 1, 2, 3, 4, 5]
1712        //     ^
1713        let mut acked = RangeSet::default();
1714        acked.insert(1..2);
1715        assert_eq!(
1716            r.on_ack_received(
1717                &acked,
1718                25,
1719                packet::Epoch::Application,
1720                HandshakeStatus::default(),
1721                now,
1722                None,
1723                "",
1724            )
1725            .unwrap(),
1726            OnAckReceivedOutcome {
1727                lost_packets: 1,
1728                lost_bytes: 1000,
1729                acked_bytes: 1000,
1730                spurious_losses: 0,
1731            }
1732        );
1733        // Thresholds after 1st loss
1734        assert_eq!(r.pkt_thresh().unwrap(), INITIAL_PACKET_THRESHOLD);
1735        assert_eq!(r.time_thresh(), INITIAL_TIME_THRESHOLD);
1736
1737        // Ack packet: 0
1738        //
1739        // [0, 1, 2, 3, 4, 5]
1740        //  ^  x
1741        let mut acked = RangeSet::default();
1742        acked.insert(0..1);
1743        assert_eq!(
1744            r.on_ack_received(
1745                &acked,
1746                25,
1747                packet::Epoch::Application,
1748                HandshakeStatus::default(),
1749                now,
1750                None,
1751                "",
1752            )
1753            .unwrap(),
1754            OnAckReceivedOutcome {
1755                lost_packets: 0,
1756                lost_bytes: 0,
1757                acked_bytes: 0,
1758                spurious_losses: 1,
1759            }
1760        );
1761        // Thresholds after 1st spurious loss
1762        //
1763        // Packet threshold should be disabled. Time threshold overhead should
1764        // stay the same.
1765        assert_eq!(r.pkt_thresh(), None);
1766        assert_eq!(r.time_thresh(), INITIAL_TIME_THRESHOLD);
1767
1768        // Set now to send time of packet 2 so we can trigger spurious loss for
1769        // packet 2.
1770        now += between_thresh_ms;
1771        // Then wait for `between_thresh_ms` after sending packet 2 to trigger
1772        // loss. Since the time threshold has NOT increased, expect a
1773        // loss.
1774        now += between_thresh_ms;
1775
1776        // Ack packet: 3
1777        //
1778        // [2, 3, 4, 5]
1779        //     ^
1780        let mut acked = RangeSet::default();
1781        acked.insert(3..4);
1782        assert_eq!(
1783            r.on_ack_received(
1784                &acked,
1785                25,
1786                packet::Epoch::Application,
1787                HandshakeStatus::default(),
1788                now,
1789                None,
1790                "",
1791            )
1792            .unwrap(),
1793            OnAckReceivedOutcome {
1794                lost_packets: 1,
1795                lost_bytes: 1000,
1796                acked_bytes: 1000,
1797                spurious_losses: 0,
1798            }
1799        );
1800        // Thresholds after 2nd loss.
1801        assert_eq!(r.pkt_thresh(), None);
1802        assert_eq!(r.time_thresh(), INITIAL_TIME_THRESHOLD);
1803
1804        // Wait for and additional `plus_overhead` to trigger loss based on the
1805        // new time threshold.
1806        // now += THRESH_GAP;
1807
1808        // Ack packet: 2
1809        //
1810        // [2, 3, 4, 5]
1811        //  ^  x
1812        let mut acked = RangeSet::default();
1813        acked.insert(2..3);
1814        assert_eq!(
1815            r.on_ack_received(
1816                &acked,
1817                25,
1818                packet::Epoch::Application,
1819                HandshakeStatus::default(),
1820                now,
1821                None,
1822                "",
1823            )
1824            .unwrap(),
1825            OnAckReceivedOutcome {
1826                lost_packets: 0,
1827                lost_bytes: 0,
1828                acked_bytes: 0,
1829                spurious_losses: 1,
1830            }
1831        );
1832        // Thresholds after 2nd spurious loss.
1833        //
1834        // Time threshold overhead should double.
1835        assert_eq!(r.pkt_thresh(), None);
1836        let double_time_thresh_overhead =
1837            1.0 + 2.0 * INITIAL_TIME_THRESHOLD_OVERHEAD;
1838        assert_eq!(r.time_thresh(), double_time_thresh_overhead);
1839    }
1840
1841    #[rstest]
1842    fn pacing(
1843        #[values("reno", "cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
1844        #[values(false, true)] time_sent_set_to_now: bool,
1845    ) {
1846        let pacing_enabled = cc_algorithm_name == "bbr2" ||
1847            cc_algorithm_name == "bbr2_gcongestion";
1848
1849        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
1850        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
1851
1852        #[cfg(feature = "internal")]
1853        cfg.set_custom_bbr_params(BbrParams {
1854            time_sent_set_to_now: Some(time_sent_set_to_now),
1855            ..Default::default()
1856        });
1857
1858        let mut r = Recovery::new(&cfg);
1859
1860        let mut now = Instant::now();
1861
1862        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1863
1864        // send out first packet burst (a full initcwnd).
1865        for i in 0..10 {
1866            let p = Sent {
1867                pkt_num: i,
1868                frames: smallvec![],
1869                time_sent: now,
1870                time_acked: None,
1871                time_lost: None,
1872                size: 1200,
1873                ack_eliciting: true,
1874                in_flight: true,
1875                delivered: 0,
1876                delivered_time: now,
1877                first_sent_time: now,
1878                is_app_limited: false,
1879                tx_in_flight: 0,
1880                lost: 0,
1881                has_data: true,
1882                is_pmtud_probe: false,
1883            };
1884
1885            r.on_packet_sent(
1886                p,
1887                packet::Epoch::Application,
1888                HandshakeStatus::default(),
1889                now,
1890                "",
1891            );
1892        }
1893
1894        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 10);
1895        assert_eq!(r.bytes_in_flight(), 12000);
1896        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
1897
1898        if !pacing_enabled {
1899            assert_eq!(r.pacing_rate(), 0);
1900        } else {
1901            assert_eq!(r.pacing_rate(), 103963);
1902        }
1903        assert_eq!(r.get_packet_send_time(now), now);
1904
1905        assert_eq!(r.cwnd(), 12000);
1906        assert_eq!(r.cwnd_available(), 0);
1907
1908        // Wait 50ms for ACK.
1909        let initial_rtt = Duration::from_millis(50);
1910        now += initial_rtt;
1911
1912        let mut acked = RangeSet::default();
1913        acked.insert(0..10);
1914
1915        assert_eq!(
1916            r.on_ack_received(
1917                &acked,
1918                10,
1919                packet::Epoch::Application,
1920                HandshakeStatus::default(),
1921                now,
1922                None,
1923                "",
1924            )
1925            .unwrap(),
1926            OnAckReceivedOutcome {
1927                lost_packets: 0,
1928                lost_bytes: 0,
1929                acked_bytes: 12000,
1930                spurious_losses: 0,
1931            }
1932        );
1933
1934        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1935        assert_eq!(r.bytes_in_flight(), 0);
1936        assert_eq!(r.bytes_in_flight_duration(), initial_rtt);
1937        assert_eq!(r.min_rtt(), Some(initial_rtt));
1938        assert_eq!(r.rtt(), initial_rtt);
1939
1940        // 10 MSS increased due to acks.
1941        assert_eq!(r.cwnd(), 12000 + 1200 * 10);
1942
1943        // Send the second packet burst.
1944        let p = Sent {
1945            pkt_num: 10,
1946            frames: smallvec![],
1947            time_sent: now,
1948            time_acked: None,
1949            time_lost: None,
1950            size: 6000,
1951            ack_eliciting: true,
1952            in_flight: true,
1953            delivered: 0,
1954            delivered_time: now,
1955            first_sent_time: now,
1956            is_app_limited: false,
1957            tx_in_flight: 0,
1958            lost: 0,
1959            has_data: true,
1960            is_pmtud_probe: false,
1961        };
1962
1963        r.on_packet_sent(
1964            p,
1965            packet::Epoch::Application,
1966            HandshakeStatus::default(),
1967            now,
1968            "",
1969        );
1970
1971        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 1);
1972        assert_eq!(r.bytes_in_flight(), 6000);
1973        assert_eq!(r.bytes_in_flight_duration(), initial_rtt);
1974
1975        if !pacing_enabled {
1976            // Pacing is disabled.
1977            assert_eq!(r.get_packet_send_time(now), now);
1978        } else {
1979            // Pacing is done from the beginning.
1980            assert_ne!(r.get_packet_send_time(now), now);
1981        }
1982
1983        // Send the third and fourth packet bursts together.
1984        let p = Sent {
1985            pkt_num: 11,
1986            frames: smallvec![],
1987            time_sent: now,
1988            time_acked: None,
1989            time_lost: None,
1990            size: 6000,
1991            ack_eliciting: true,
1992            in_flight: true,
1993            delivered: 0,
1994            delivered_time: now,
1995            first_sent_time: now,
1996            is_app_limited: false,
1997            tx_in_flight: 0,
1998            lost: 0,
1999            has_data: true,
2000            is_pmtud_probe: false,
2001        };
2002
2003        r.on_packet_sent(
2004            p,
2005            packet::Epoch::Application,
2006            HandshakeStatus::default(),
2007            now,
2008            "",
2009        );
2010
2011        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
2012        assert_eq!(r.bytes_in_flight(), 12000);
2013        assert_eq!(r.bytes_in_flight_duration(), initial_rtt);
2014
2015        // Send the fourth packet burst.
2016        let p = Sent {
2017            pkt_num: 12,
2018            frames: smallvec![],
2019            time_sent: now,
2020            time_acked: None,
2021            time_lost: None,
2022            size: 1000,
2023            ack_eliciting: true,
2024            in_flight: true,
2025            delivered: 0,
2026            delivered_time: now,
2027            first_sent_time: now,
2028            is_app_limited: false,
2029            tx_in_flight: 0,
2030            lost: 0,
2031            has_data: true,
2032            is_pmtud_probe: false,
2033        };
2034
2035        r.on_packet_sent(
2036            p,
2037            packet::Epoch::Application,
2038            HandshakeStatus::default(),
2039            now,
2040            "",
2041        );
2042
2043        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 3);
2044        assert_eq!(r.bytes_in_flight(), 13000);
2045        assert_eq!(r.bytes_in_flight_duration(), initial_rtt);
2046
2047        // We pace this outgoing packet. as all conditions for pacing
2048        // are passed.
2049        let pacing_rate = if pacing_enabled {
2050            let cwnd_gain: f64 = 2.0;
2051            // Adjust for cwnd_gain.  BW estimate was made before the CWND
2052            // increase.
2053            let bw = r.cwnd() as f64 / cwnd_gain / initial_rtt.as_secs_f64();
2054            bw as u64
2055        } else {
2056            0
2057        };
2058        assert_eq!(r.pacing_rate(), pacing_rate);
2059
2060        let scale_factor = if pacing_enabled {
2061            // For bbr2_gcongestion, send time is almost 13000 / pacing_rate.
2062            // Don't know where 13000 comes from.
2063            1.08333332
2064        } else {
2065            1.0
2066        };
2067        assert_eq!(
2068            r.get_packet_send_time(now) - now,
2069            if pacing_enabled {
2070                Duration::from_secs_f64(
2071                    scale_factor * 12000.0 / pacing_rate as f64,
2072                )
2073            } else {
2074                Duration::ZERO
2075            }
2076        );
2077        assert_eq!(r.startup_exit(), None);
2078
2079        let reduced_rtt = Duration::from_millis(40);
2080        now += reduced_rtt;
2081
2082        let mut acked = RangeSet::default();
2083        acked.insert(10..11);
2084
2085        assert_eq!(
2086            r.on_ack_received(
2087                &acked,
2088                0,
2089                packet::Epoch::Application,
2090                HandshakeStatus::default(),
2091                now,
2092                None,
2093                "",
2094            )
2095            .unwrap(),
2096            OnAckReceivedOutcome {
2097                lost_packets: 0,
2098                lost_bytes: 0,
2099                acked_bytes: 6000,
2100                spurious_losses: 0,
2101            }
2102        );
2103
2104        let expected_srtt = (7 * initial_rtt + reduced_rtt) / 8;
2105        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
2106        assert_eq!(r.bytes_in_flight(), 7000);
2107        assert_eq!(r.bytes_in_flight_duration(), initial_rtt + reduced_rtt);
2108        assert_eq!(r.min_rtt(), Some(reduced_rtt));
2109        assert_eq!(r.rtt(), expected_srtt);
2110
2111        let mut acked = RangeSet::default();
2112        acked.insert(11..12);
2113
2114        assert_eq!(
2115            r.on_ack_received(
2116                &acked,
2117                0,
2118                packet::Epoch::Application,
2119                HandshakeStatus::default(),
2120                now,
2121                None,
2122                "",
2123            )
2124            .unwrap(),
2125            OnAckReceivedOutcome {
2126                lost_packets: 0,
2127                lost_bytes: 0,
2128                acked_bytes: 6000,
2129                spurious_losses: 0,
2130            }
2131        );
2132
2133        // When enabled, the pacer adds a 25msec delay to the packet
2134        // sends which will be applied to the sent times tracked by
2135        // the recovery module, bringing down RTT to 15msec.
2136        let expected_min_rtt = if pacing_enabled &&
2137            !time_sent_set_to_now &&
2138            cfg!(feature = "internal")
2139        {
2140            reduced_rtt - Duration::from_millis(25)
2141        } else {
2142            reduced_rtt
2143        };
2144
2145        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 1);
2146        assert_eq!(r.bytes_in_flight(), 1000);
2147        assert_eq!(r.bytes_in_flight_duration(), initial_rtt + reduced_rtt);
2148        assert_eq!(r.min_rtt(), Some(expected_min_rtt));
2149
2150        let expected_srtt = (7 * expected_srtt + expected_min_rtt) / 8;
2151        assert_eq!(r.rtt(), expected_srtt);
2152
2153        let mut acked = RangeSet::default();
2154        acked.insert(12..13);
2155
2156        assert_eq!(
2157            r.on_ack_received(
2158                &acked,
2159                0,
2160                packet::Epoch::Application,
2161                HandshakeStatus::default(),
2162                now,
2163                None,
2164                "",
2165            )
2166            .unwrap(),
2167            OnAckReceivedOutcome {
2168                lost_packets: 0,
2169                lost_bytes: 0,
2170                acked_bytes: 1000,
2171                spurious_losses: 0,
2172            }
2173        );
2174
2175        // Pacer adds 50msec delay to the second packet, resulting in
2176        // an effective RTT of 0.
2177        let expected_min_rtt = if pacing_enabled &&
2178            !time_sent_set_to_now &&
2179            cfg!(feature = "internal")
2180        {
2181            Duration::from_millis(0)
2182        } else {
2183            reduced_rtt
2184        };
2185        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
2186        assert_eq!(r.bytes_in_flight(), 0);
2187        assert_eq!(r.bytes_in_flight_duration(), initial_rtt + reduced_rtt);
2188        assert_eq!(r.min_rtt(), Some(expected_min_rtt));
2189
2190        let expected_srtt = (7 * expected_srtt + expected_min_rtt) / 8;
2191        assert_eq!(r.rtt(), expected_srtt);
2192    }
2193
2194    #[rstest]
2195    // initial_cwnd / first_rtt == initial_pacing_rate.  Pacing is 1.0 * bw before
2196    // and after.
2197    #[case::bw_estimate_equal_after_first_rtt(1.0, 1.0)]
2198    // initial_cwnd / first_rtt < initial_pacing_rate.  Pacing decreases from 2 *
2199    // bw to 1.0 * bw.
2200    #[case::bw_estimate_decrease_after_first_rtt(2.0, 1.0)]
2201    // initial_cwnd / first_rtt > initial_pacing_rate from 0.5 * bw to 1.0 * bw.
2202    // Initial pacing remains 0.5 * bw because the initial_pacing_rate parameter
2203    // is used an upper bound for the pacing rate after the first RTT.
2204    // Pacing rate after the first ACK should be:
2205    // min(initial_pacing_rate_bytes_per_second, init_cwnd / first_rtt)
2206    #[case::bw_estimate_increase_after_first_rtt(0.5, 0.5)]
2207    #[cfg(feature = "internal")]
2208    fn initial_pacing_rate_override(
2209        #[case] initial_multipler: f64, #[case] expected_multiplier: f64,
2210    ) {
2211        let rtt = Duration::from_millis(50);
2212        let bw = Bandwidth::from_bytes_and_time_delta(12000, rtt);
2213        let initial_pacing_rate_hint = bw * initial_multipler;
2214        let expected_pacing_with_rtt_measurement = bw * expected_multiplier;
2215
2216        let cc_algorithm_name = "bbr2_gcongestion";
2217        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
2218        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
2219        cfg.set_custom_bbr_params(BbrParams {
2220            initial_pacing_rate_bytes_per_second: Some(
2221                initial_pacing_rate_hint.to_bytes_per_second(),
2222            ),
2223            ..Default::default()
2224        });
2225
2226        let mut r = Recovery::new(&cfg);
2227
2228        let mut now = Instant::now();
2229
2230        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
2231
2232        // send some packets.
2233        for i in 0..2 {
2234            let p = test_utils::helper_packet_sent(i, now, 1200);
2235            r.on_packet_sent(
2236                p,
2237                packet::Epoch::Application,
2238                HandshakeStatus::default(),
2239                now,
2240                "",
2241            );
2242        }
2243
2244        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
2245        assert_eq!(r.bytes_in_flight(), 2400);
2246        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
2247
2248        // Initial pacing rate matches the override value.
2249        assert_eq!(
2250            r.pacing_rate(),
2251            initial_pacing_rate_hint.to_bytes_per_second()
2252        );
2253        assert_eq!(r.get_packet_send_time(now), now);
2254
2255        assert_eq!(r.cwnd(), 12000);
2256        assert_eq!(r.cwnd_available(), 9600);
2257
2258        // Wait 1 rtt for ACK.
2259        now += rtt;
2260
2261        let mut acked = RangeSet::default();
2262        acked.insert(0..2);
2263
2264        assert_eq!(
2265            r.on_ack_received(
2266                &acked,
2267                10,
2268                packet::Epoch::Application,
2269                HandshakeStatus::default(),
2270                now,
2271                None,
2272                "",
2273            )
2274            .unwrap(),
2275            OnAckReceivedOutcome {
2276                lost_packets: 0,
2277                lost_bytes: 0,
2278                acked_bytes: 2400,
2279                spurious_losses: 0,
2280            }
2281        );
2282
2283        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
2284        assert_eq!(r.bytes_in_flight(), 0);
2285        assert_eq!(r.bytes_in_flight_duration(), rtt);
2286        assert_eq!(r.rtt(), rtt);
2287
2288        // Pacing rate is recalculated based on initial cwnd when the
2289        // first RTT estimate is available.
2290        assert_eq!(
2291            r.pacing_rate(),
2292            expected_pacing_with_rtt_measurement.to_bytes_per_second()
2293        );
2294    }
2295
2296    #[rstest]
2297    fn validate_ack_range_on_ack_received(
2298        #[values("cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
2299    ) {
2300        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
2301        cfg.set_cc_algorithm_name(cc_algorithm_name).unwrap();
2302
2303        let epoch = packet::Epoch::Application;
2304        let mut r = Recovery::new(&cfg);
2305        let mut now = Instant::now();
2306        assert_eq!(r.sent_packets_len(epoch), 0);
2307
2308        // Send 4 packets
2309        let pkt_size = 1000;
2310        let pkt_count = 4;
2311        for pkt_num in 0..pkt_count {
2312            let sent = test_utils::helper_packet_sent(pkt_num, now, pkt_size);
2313            r.on_packet_sent(sent, epoch, HandshakeStatus::default(), now, "");
2314        }
2315        assert_eq!(r.sent_packets_len(epoch), pkt_count as usize);
2316        assert_eq!(r.bytes_in_flight(), pkt_count as usize * pkt_size);
2317        assert!(r.get_largest_acked_on_epoch(epoch).is_none());
2318        assert_eq!(r.largest_sent_pkt_num_on_path(epoch).unwrap(), 3);
2319
2320        // Wait for 10ms.
2321        now += Duration::from_millis(10);
2322
2323        // ACK 2 packets
2324        let mut acked = RangeSet::default();
2325        acked.insert(0..2);
2326
2327        assert_eq!(
2328            r.on_ack_received(
2329                &acked,
2330                25,
2331                epoch,
2332                HandshakeStatus::default(),
2333                now,
2334                None,
2335                "",
2336            )
2337            .unwrap(),
2338            OnAckReceivedOutcome {
2339                lost_packets: 0,
2340                lost_bytes: 0,
2341                acked_bytes: 2 * 1000,
2342                spurious_losses: 0,
2343            }
2344        );
2345
2346        assert_eq!(r.sent_packets_len(epoch), 2);
2347        assert_eq!(r.bytes_in_flight(), 2 * 1000);
2348
2349        assert_eq!(r.get_largest_acked_on_epoch(epoch).unwrap(), 1);
2350        assert_eq!(r.largest_sent_pkt_num_on_path(epoch).unwrap(), 3);
2351
2352        // ACK large range
2353        let mut acked = RangeSet::default();
2354        acked.insert(0..10);
2355        assert_eq!(
2356            r.on_ack_received(
2357                &acked,
2358                25,
2359                epoch,
2360                HandshakeStatus::default(),
2361                now,
2362                None,
2363                "",
2364            )
2365            .unwrap(),
2366            OnAckReceivedOutcome {
2367                lost_packets: 0,
2368                lost_bytes: 0,
2369                acked_bytes: 2 * 1000,
2370                spurious_losses: 0,
2371            }
2372        );
2373        assert_eq!(r.sent_packets_len(epoch), 0);
2374        assert_eq!(r.bytes_in_flight(), 0);
2375
2376        assert_eq!(r.get_largest_acked_on_epoch(epoch).unwrap(), 3);
2377        assert_eq!(r.largest_sent_pkt_num_on_path(epoch).unwrap(), 3);
2378    }
2379
2380    #[rstest]
2381    fn pmtud_loss_on_timer(
2382        #[values("reno", "cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
2383    ) {
2384        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
2385        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
2386
2387        let mut r = Recovery::new(&cfg);
2388        assert_eq!(r.cwnd(), 12000);
2389
2390        let mut now = Instant::now();
2391
2392        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
2393
2394        // Start by sending a few packets.
2395        let p = Sent {
2396            pkt_num: 0,
2397            frames: smallvec![],
2398            time_sent: now,
2399            time_acked: None,
2400            time_lost: None,
2401            size: 1000,
2402            ack_eliciting: true,
2403            in_flight: true,
2404            delivered: 0,
2405            delivered_time: now,
2406            first_sent_time: now,
2407            is_app_limited: false,
2408            tx_in_flight: 0,
2409            lost: 0,
2410            has_data: false,
2411            is_pmtud_probe: false,
2412        };
2413
2414        r.on_packet_sent(
2415            p,
2416            packet::Epoch::Application,
2417            HandshakeStatus::default(),
2418            now,
2419            "",
2420        );
2421
2422        assert_eq!(r.in_flight_count(packet::Epoch::Application), 1);
2423        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 1);
2424        assert_eq!(r.bytes_in_flight(), 1000);
2425        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
2426
2427        let p = Sent {
2428            pkt_num: 1,
2429            frames: smallvec![],
2430            time_sent: now,
2431            time_acked: None,
2432            time_lost: None,
2433            size: 1000,
2434            ack_eliciting: true,
2435            in_flight: true,
2436            delivered: 0,
2437            delivered_time: now,
2438            first_sent_time: now,
2439            is_app_limited: false,
2440            tx_in_flight: 0,
2441            lost: 0,
2442            has_data: false,
2443            is_pmtud_probe: true,
2444        };
2445
2446        r.on_packet_sent(
2447            p,
2448            packet::Epoch::Application,
2449            HandshakeStatus::default(),
2450            now,
2451            "",
2452        );
2453
2454        assert_eq!(r.in_flight_count(packet::Epoch::Application), 2);
2455
2456        let p = Sent {
2457            pkt_num: 2,
2458            frames: smallvec![],
2459            time_sent: now,
2460            time_acked: None,
2461            time_lost: None,
2462            size: 1000,
2463            ack_eliciting: true,
2464            in_flight: true,
2465            delivered: 0,
2466            delivered_time: now,
2467            first_sent_time: now,
2468            is_app_limited: false,
2469            tx_in_flight: 0,
2470            lost: 0,
2471            has_data: false,
2472            is_pmtud_probe: false,
2473        };
2474
2475        r.on_packet_sent(
2476            p,
2477            packet::Epoch::Application,
2478            HandshakeStatus::default(),
2479            now,
2480            "",
2481        );
2482
2483        assert_eq!(r.in_flight_count(packet::Epoch::Application), 3);
2484
2485        // Wait for 10ms.
2486        now += Duration::from_millis(10);
2487
2488        // Only the first  packets and the last one are acked.
2489        let mut acked = RangeSet::default();
2490        acked.insert(0..1);
2491        acked.insert(2..3);
2492
2493        assert_eq!(
2494            r.on_ack_received(
2495                &acked,
2496                25,
2497                packet::Epoch::Application,
2498                HandshakeStatus::default(),
2499                now,
2500                None,
2501                "",
2502            )
2503            .unwrap(),
2504            OnAckReceivedOutcome {
2505                lost_packets: 0,
2506                lost_bytes: 0,
2507                acked_bytes: 2 * 1000,
2508                spurious_losses: 0,
2509            }
2510        );
2511
2512        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
2513        assert_eq!(r.bytes_in_flight(), 1000);
2514        assert_eq!(r.bytes_in_flight_duration(), Duration::from_millis(10));
2515        assert_eq!(r.lost_count(), 0);
2516
2517        // Wait until loss detection timer expires.
2518        now = r.loss_detection_timer().unwrap();
2519
2520        // Packet is declared lost.
2521        r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
2522        assert_eq!(r.loss_probes(packet::Epoch::Application), 0);
2523
2524        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
2525        assert_eq!(r.in_flight_count(packet::Epoch::Application), 0);
2526        assert_eq!(r.bytes_in_flight(), 0);
2527        assert_eq!(r.bytes_in_flight_duration(), Duration::from_micros(11250));
2528        assert_eq!(r.cwnd(), 12000);
2529
2530        assert_eq!(r.lost_count(), 0);
2531
2532        // Wait 1 RTT.
2533        now += r.rtt();
2534
2535        assert_eq!(
2536            r.detect_lost_packets_for_test(packet::Epoch::Application, now),
2537            (0, 0)
2538        );
2539
2540        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
2541        assert_eq!(r.in_flight_count(packet::Epoch::Application), 0);
2542        assert_eq!(r.bytes_in_flight(), 0);
2543        assert_eq!(r.bytes_in_flight_duration(), Duration::from_micros(11250));
2544        assert_eq!(r.lost_count(), 0);
2545        assert_eq!(r.startup_exit(), None);
2546    }
2547
2548    // Modeling delivery_rate for gcongestion is non-trivial so we only test the
2549    // congestion specific algorithms.
2550    #[rstest]
2551    fn congestion_delivery_rate(
2552        #[values("reno", "cubic", "bbr2")] cc_algorithm_name: &str,
2553    ) {
2554        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
2555        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
2556
2557        let mut r = Recovery::new(&cfg);
2558        assert_eq!(r.cwnd(), 12000);
2559
2560        let now = Instant::now();
2561
2562        let mut total_bytes_sent = 0;
2563        for pn in 0..10 {
2564            // Start by sending a few packets.
2565            let bytes = 1000;
2566            let sent = test_utils::helper_packet_sent(pn, now, bytes);
2567            r.on_packet_sent(
2568                sent,
2569                packet::Epoch::Application,
2570                HandshakeStatus::default(),
2571                now,
2572                "",
2573            );
2574
2575            total_bytes_sent += bytes;
2576        }
2577
2578        // Ack
2579        let interval = Duration::from_secs(10);
2580        let mut acked = RangeSet::default();
2581        acked.insert(0..10);
2582        assert_eq!(
2583            r.on_ack_received(
2584                &acked,
2585                25,
2586                packet::Epoch::Application,
2587                HandshakeStatus::default(),
2588                now + interval,
2589                None,
2590                "",
2591            )
2592            .unwrap(),
2593            OnAckReceivedOutcome {
2594                lost_packets: 0,
2595                lost_bytes: 0,
2596                acked_bytes: total_bytes_sent,
2597                spurious_losses: 0,
2598            }
2599        );
2600        assert_eq!(r.delivery_rate().to_bytes_per_second(), 1000);
2601        assert_eq!(r.min_rtt().unwrap(), interval);
2602        // delivery rate should be in units bytes/sec
2603        assert_eq!(
2604            total_bytes_sent as u64 / interval.as_secs(),
2605            r.delivery_rate().to_bytes_per_second()
2606        );
2607        assert_eq!(r.startup_exit(), None);
2608    }
2609
2610    #[rstest]
2611    fn acks_with_no_retransmittable_data(
2612        #[values("reno", "cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
2613    ) {
2614        let rtt = Duration::from_millis(100);
2615
2616        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
2617        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
2618
2619        let mut r = Recovery::new(&cfg);
2620
2621        let mut now = Instant::now();
2622
2623        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
2624
2625        let mut next_packet = 0;
2626        // send some packets.
2627        for _ in 0..3 {
2628            let p = test_utils::helper_packet_sent(next_packet, now, 1200);
2629            next_packet += 1;
2630            r.on_packet_sent(
2631                p,
2632                packet::Epoch::Application,
2633                HandshakeStatus::default(),
2634                now,
2635                "",
2636            );
2637        }
2638
2639        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 3);
2640        assert_eq!(r.bytes_in_flight(), 3600);
2641        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
2642
2643        assert_eq!(
2644            r.pacing_rate(),
2645            if cc_algorithm_name == "bbr2_gcongestion" {
2646                103963
2647            } else {
2648                0
2649            },
2650        );
2651        assert_eq!(r.get_packet_send_time(now), now);
2652        assert_eq!(r.cwnd(), 12000);
2653        assert_eq!(r.cwnd_available(), 8400);
2654
2655        // Wait 1 rtt for ACK.
2656        now += rtt;
2657
2658        let mut acked = RangeSet::default();
2659        acked.insert(0..3);
2660
2661        assert_eq!(
2662            r.on_ack_received(
2663                &acked,
2664                10,
2665                packet::Epoch::Application,
2666                HandshakeStatus::default(),
2667                now,
2668                None,
2669                "",
2670            )
2671            .unwrap(),
2672            OnAckReceivedOutcome {
2673                lost_packets: 0,
2674                lost_bytes: 0,
2675                acked_bytes: 3600,
2676                spurious_losses: 0,
2677            }
2678        );
2679
2680        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
2681        assert_eq!(r.bytes_in_flight(), 0);
2682        assert_eq!(r.bytes_in_flight_duration(), rtt);
2683        assert_eq!(r.rtt(), rtt);
2684
2685        // Pacing rate is recalculated based on initial cwnd when the
2686        // first RTT estimate is available.
2687        assert_eq!(
2688            r.pacing_rate(),
2689            if cc_algorithm_name == "bbr2_gcongestion" {
2690                120000
2691            } else {
2692                0
2693            },
2694        );
2695
2696        // Send some no "in_flight" packets
2697        for iter in 3..1000 {
2698            let mut p = test_utils::helper_packet_sent(next_packet, now, 1200);
2699            // `in_flight = false` marks packets as if they only contained ACK
2700            // frames.
2701            p.in_flight = false;
2702            next_packet += 1;
2703            r.on_packet_sent(
2704                p,
2705                packet::Epoch::Application,
2706                HandshakeStatus::default(),
2707                now,
2708                "",
2709            );
2710
2711            now += rtt;
2712
2713            let mut acked = RangeSet::default();
2714            acked.insert(iter..(iter + 1));
2715
2716            assert_eq!(
2717                r.on_ack_received(
2718                    &acked,
2719                    10,
2720                    packet::Epoch::Application,
2721                    HandshakeStatus::default(),
2722                    now,
2723                    None,
2724                    "",
2725                )
2726                .unwrap(),
2727                OnAckReceivedOutcome {
2728                    lost_packets: 0,
2729                    lost_bytes: 0,
2730                    acked_bytes: 0,
2731                    spurious_losses: 0,
2732                }
2733            );
2734
2735            // Verify that connection has not exited startup.
2736            assert_eq!(r.startup_exit(), None, "{iter}");
2737
2738            // Unchanged metrics.
2739            assert_eq!(
2740                r.sent_packets_len(packet::Epoch::Application),
2741                0,
2742                "{iter}"
2743            );
2744            assert_eq!(r.bytes_in_flight(), 0, "{iter}");
2745            assert_eq!(r.bytes_in_flight_duration(), rtt, "{iter}");
2746            assert_eq!(
2747                r.pacing_rate(),
2748                if cc_algorithm_name == "bbr2_gcongestion" ||
2749                    cc_algorithm_name == "bbr2"
2750                {
2751                    120000
2752                } else {
2753                    0
2754                },
2755                "{iter}"
2756            );
2757        }
2758    }
2759}
2760
2761mod bandwidth;
2762mod bytes_in_flight;
2763mod congestion;
2764mod gcongestion;
2765mod rtt;