Skip to main content

quiche/recovery/
mod.rs

1// Copyright (C) 2018-2019, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::str::FromStr;
28use std::time::Duration;
29use std::time::Instant;
30
31use crate::frame;
32use crate::packet;
33use crate::ranges::RangeSet;
34pub(crate) use crate::recovery::bandwidth::Bandwidth;
35use crate::Config;
36use crate::Result;
37
38#[cfg(feature = "qlog")]
39use qlog::events::EventData;
40#[cfg(feature = "qlog")]
41use serde::Serialize;
42
43use smallvec::SmallVec;
44
45use self::congestion::recovery::LegacyRecovery;
46use self::gcongestion::GRecovery;
47pub use gcongestion::BbrBwLoReductionStrategy;
48pub use gcongestion::BbrParams;
49
// Loss Recovery

// Starting value of the packet reordering threshold, in packets.
//
// RFC 9002 recommends 3 for kPacketThreshold:
// https://www.rfc-editor.org/rfc/rfc9002.html#section-6.1.1
const INITIAL_PACKET_THRESHOLD: u64 = 3;

// Largest value the packet reordering threshold is allowed to reach.
//
// NOTE(review): presumably the cap used when the threshold is raised after
// spurious losses; the code consuming it is not in this file.
const MAX_PACKET_THRESHOLD: u64 = 20;

// Time threshold used to calculate the loss time, as a multiple of the RTT.
//
// https://www.rfc-editor.org/rfc/rfc9002.html#section-6.1.2
const INITIAL_TIME_THRESHOLD: f64 = 9.0 / 8.0;

// Time threshold to use once a first packet reordering event was observed.
//
// Packet reordering is not a real loss event, so quickly reduce the
// sensitivity to avoid penalizing subsequent packet reordering.
//
// https://www.rfc-editor.org/rfc/rfc9002.html#section-6.1.2
//
// Quoting the RFC:
//
// Implementations MAY experiment with absolute thresholds, thresholds from
// previous connections, adaptive thresholds, or the including of RTT variation.
// Smaller thresholds reduce reordering resilience and increase spurious
// retransmissions, and larger thresholds increase loss detection delay.
const PACKET_REORDER_TIME_THRESHOLD: f64 = 5.0 / 4.0;

// # Experiment: enable_relaxed_loss_threshold
//
// Time threshold overhead used to calculate the loss time.
//
// The actual threshold is calculated as 1 + INITIAL_TIME_THRESHOLD_OVERHEAD,
// which is equivalent to INITIAL_TIME_THRESHOLD.
const INITIAL_TIME_THRESHOLD_OVERHEAD: f64 = 1.0 / 8.0;
// # Experiment: enable_relaxed_loss_threshold
//
// The factor by which to increase the time threshold overhead on spurious
// loss.
const TIME_THRESHOLD_OVERHEAD_MULTIPLIER: f64 = 2.0;

// Timer granularity; RFC 9002 recommends 1ms for kGranularity.
const GRANULARITY: Duration = Duration::from_millis(1);

// NOTE(review): presumably the maximum number of probe packets to send when
// a PTO fires; confirm against the recovery implementations.
const MAX_PTO_PROBES_COUNT: usize = 2;

// Minimum congestion window, expressed in packets (RFC 9002 recommends a
// kMinimumWindow of two maximum-size datagrams).
const MINIMUM_WINDOW_PACKETS: usize = 2;

// Multiplicative decrease applied to the congestion window on loss (RFC 9002
// recommends 0.5 for kLossReductionFactor).
const LOSS_REDUCTION_FACTOR: f64 = 0.5;
92
// How many non-ACK-eliciting packets we send before including a PING to
// solicit an ACK.
95pub(super) const MAX_OUTSTANDING_NON_ACK_ELICITING: usize = 24;
96
/// Deadline of the loss detection timer, if one is currently armed.
#[derive(Default)]
struct LossDetectionTimer {
    /// Instant at which the timer fires; `None` while the timer is disarmed.
    time: Option<Instant>,
}

impl LossDetectionTimer {
    /// Arms the timer to fire at `timeout`, replacing any earlier deadline.
    fn update(&mut self, timeout: Instant) {
        self.time = Some(timeout);
    }

    /// Disarms the timer.
    fn clear(&mut self) {
        self.time = None;
    }
}

impl std::fmt::Debug for LossDetectionTimer {
    /// Prints `none` for a disarmed timer, `exp` for a deadline that is not
    /// in the future, and otherwise the time left until the deadline.
    ///
    /// The remaining time is measured against `Instant::now()` at the moment
    /// of formatting.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        // A deadline at or before "now" saturates to a zero duration.
        let remaining = self
            .time
            .map(|deadline| deadline.saturating_duration_since(Instant::now()));

        match remaining {
            None => f.write_str("none"),
            Some(left) if left.is_zero() => f.write_str("exp"),
            Some(left) => write!(f, "{left:?}"),
        }
    }
}
128
129#[derive(Clone, Copy, PartialEq)]
130pub struct RecoveryConfig {
131    pub initial_rtt: Duration,
132    pub max_send_udp_payload_size: usize,
133    pub max_ack_delay: Duration,
134    pub cc_algorithm: CongestionControlAlgorithm,
135    pub custom_bbr_params: Option<BbrParams>,
136    pub hystart: bool,
137    pub pacing: bool,
138    pub max_pacing_rate: Option<u64>,
139    pub initial_congestion_window_packets: usize,
140    pub enable_relaxed_loss_threshold: bool,
141}
142
143impl RecoveryConfig {
144    pub fn from_config(config: &Config) -> Self {
145        Self {
146            initial_rtt: config.initial_rtt,
147            max_send_udp_payload_size: config.max_send_udp_payload_size,
148            max_ack_delay: Duration::ZERO,
149            cc_algorithm: config.cc_algorithm,
150            custom_bbr_params: config.custom_bbr_params,
151            hystart: config.hystart,
152            pacing: config.pacing,
153            max_pacing_rate: config.max_pacing_rate,
154            initial_congestion_window_packets: config
155                .initial_congestion_window_packets,
156            enable_relaxed_loss_threshold: config.enable_relaxed_loss_threshold,
157        }
158    }
159}
160
161#[enum_dispatch::enum_dispatch(RecoveryOps)]
162#[allow(clippy::large_enum_variant)]
163#[derive(Debug)]
164pub(crate) enum Recovery {
165    Legacy(LegacyRecovery),
166    GCongestion(GRecovery),
167}
168
/// Counters returned by [`RecoveryOps::on_ack_received`].
///
/// All counters default to zero.
#[derive(Debug, Default, PartialEq)]
pub struct OnAckReceivedOutcome {
    /// Number of packets reported as lost.
    pub lost_packets: usize,
    /// Number of bytes reported as lost.
    pub lost_bytes: usize,
    /// Number of bytes reported as acknowledged.
    pub acked_bytes: usize,
    /// Number of losses reported as spurious.
    pub spurious_losses: usize,
}
176
/// Counters returned by [`RecoveryOps::on_loss_detection_timeout`].
///
/// All counters default to zero.
#[derive(Debug, Default)]
pub struct OnLossDetectionTimeoutOutcome {
    /// Number of packets reported as lost.
    pub lost_packets: usize,
    /// Number of bytes reported as lost.
    pub lost_bytes: usize,
}
182
183#[enum_dispatch::enum_dispatch]
184/// Api for the Recovery implementation
185pub trait RecoveryOps {
186    fn lost_count(&self) -> usize;
187    fn bytes_lost(&self) -> u64;
188
189    /// Returns whether or not we should elicit an ACK even if we wouldn't
190    /// otherwise have constructed an ACK eliciting packet.
191    fn should_elicit_ack(&self, epoch: packet::Epoch) -> bool;
192
193    fn next_acked_frame(&mut self, epoch: packet::Epoch) -> Option<frame::Frame>;
194
195    fn next_lost_frame(&mut self, epoch: packet::Epoch) -> Option<frame::Frame>;
196
197    fn get_largest_acked_on_epoch(&self, epoch: packet::Epoch) -> Option<u64>;
198    fn has_lost_frames(&self, epoch: packet::Epoch) -> bool;
199    fn loss_probes(&self, epoch: packet::Epoch) -> usize;
200    #[cfg(test)]
201    fn inc_loss_probes(&mut self, epoch: packet::Epoch);
202
203    fn ping_sent(&mut self, epoch: packet::Epoch);
204
205    fn on_packet_sent(
206        &mut self, pkt: Sent, epoch: packet::Epoch,
207        handshake_status: HandshakeStatus, now: Instant, trace_id: &str,
208    );
209    fn get_packet_send_time(&self, now: Instant) -> Instant;
210
211    #[allow(clippy::too_many_arguments)]
212    fn on_ack_received(
213        &mut self, ranges: &RangeSet, ack_delay: u64, epoch: packet::Epoch,
214        handshake_status: HandshakeStatus, now: Instant, skip_pn: Option<u64>,
215        trace_id: &str,
216    ) -> Result<OnAckReceivedOutcome>;
217
218    fn on_loss_detection_timeout(
219        &mut self, handshake_status: HandshakeStatus, now: Instant,
220        trace_id: &str,
221    ) -> OnLossDetectionTimeoutOutcome;
222    fn on_pkt_num_space_discarded(
223        &mut self, epoch: packet::Epoch, handshake_status: HandshakeStatus,
224        now: Instant,
225    );
226    fn on_path_change(
227        &mut self, epoch: packet::Epoch, now: Instant, _trace_id: &str,
228    ) -> (usize, usize);
229    fn loss_detection_timer(&self) -> Option<Instant>;
230    fn cwnd(&self) -> usize;
231    fn cwnd_available(&self) -> usize;
232    fn rtt(&self) -> Duration;
233
234    fn min_rtt(&self) -> Option<Duration>;
235
236    fn max_rtt(&self) -> Option<Duration>;
237
238    fn rttvar(&self) -> Duration;
239
240    fn pto(&self) -> Duration;
241
242    /// The most recent data delivery rate estimate.
243    fn delivery_rate(&self) -> Bandwidth;
244
245    /// Maximum bandwidth estimate, if one is available.
246    fn max_bandwidth(&self) -> Option<Bandwidth>;
247
248    /// Statistics from when a CCA first exited the startup phase.
249    fn startup_exit(&self) -> Option<StartupExit>;
250
251    fn max_datagram_size(&self) -> usize;
252
253    fn pmtud_update_max_datagram_size(&mut self, new_max_datagram_size: usize);
254
255    fn update_max_datagram_size(&mut self, new_max_datagram_size: usize);
256
257    fn on_app_limited(&mut self);
258
259    // Since a recovery module is path specific, this tracks the largest packet
260    // sent per path.
261    #[cfg(test)]
262    fn largest_sent_pkt_num_on_path(&self, epoch: packet::Epoch) -> Option<u64>;
263
264    #[cfg(test)]
265    fn app_limited(&self) -> bool;
266
267    #[cfg(test)]
268    fn sent_packets_len(&self, epoch: packet::Epoch) -> usize;
269
270    fn bytes_in_flight(&self) -> usize;
271
272    fn bytes_in_flight_duration(&self) -> Duration;
273
274    #[cfg(test)]
275    fn in_flight_count(&self, epoch: packet::Epoch) -> usize;
276
277    #[cfg(test)]
278    fn pacing_rate(&self) -> u64;
279
280    #[cfg(test)]
281    fn pto_count(&self) -> u32;
282
283    // This value might be `None` when experiment `enable_relaxed_loss_threshold`
284    // is enabled for gcongestion
285    #[cfg(test)]
286    fn pkt_thresh(&self) -> Option<u64>;
287
288    #[cfg(test)]
289    fn time_thresh(&self) -> f64;
290
291    #[cfg(test)]
292    fn lost_spurious_count(&self) -> usize;
293
294    #[cfg(test)]
295    fn detect_lost_packets_for_test(
296        &mut self, epoch: packet::Epoch, now: Instant,
297    ) -> (usize, usize);
298
299    fn update_app_limited(&mut self, v: bool);
300
301    fn delivery_rate_update_app_limited(&mut self, v: bool);
302
303    fn update_max_ack_delay(&mut self, max_ack_delay: Duration);
304
305    #[cfg(feature = "qlog")]
306    fn state_str(&self, now: Instant) -> &'static str;
307
308    #[cfg(feature = "qlog")]
309    fn get_updated_qlog_event_data(&mut self) -> Option<EventData>;
310
311    #[cfg(feature = "qlog")]
312    fn get_updated_qlog_cc_state(&mut self, now: Instant)
313        -> Option<&'static str>;
314
315    fn send_quantum(&self) -> usize;
316
317    fn get_next_release_time(&self) -> ReleaseDecision;
318
319    fn gcongestion_enabled(&self) -> bool;
320}
321
322impl Recovery {
323    pub fn new_with_config(recovery_config: &RecoveryConfig) -> Self {
324        let grecovery = GRecovery::new(recovery_config);
325        if let Some(grecovery) = grecovery {
326            Recovery::from(grecovery)
327        } else {
328            Recovery::from(LegacyRecovery::new_with_config(recovery_config))
329        }
330    }
331
332    #[cfg(feature = "qlog")]
333    pub fn maybe_qlog(
334        &mut self, qlog: &mut qlog::streamer::QlogStreamer, now: Instant,
335    ) {
336        if let Some(ev_data) = self.get_updated_qlog_event_data() {
337            qlog.add_event_data_with_instant(ev_data, now).ok();
338        }
339
340        if let Some(cc_state) = self.get_updated_qlog_cc_state(now) {
341            let ev_data = EventData::CongestionStateUpdated(
342                qlog::events::quic::CongestionStateUpdated {
343                    old: None,
344                    new: cc_state.to_string(),
345                    trigger: None,
346                },
347            );
348
349            qlog.add_event_data_with_instant(ev_data, now).ok();
350        }
351    }
352
353    #[cfg(test)]
354    pub fn new(config: &Config) -> Self {
355        Self::new_with_config(&RecoveryConfig::from_config(config))
356    }
357}
358
359/// Available congestion control algorithms.
360///
361/// This enum provides currently available list of congestion control
362/// algorithms.
363#[derive(Debug, Copy, Clone, PartialEq, Eq)]
364#[repr(C)]
365pub enum CongestionControlAlgorithm {
366    /// Reno congestion control algorithm. `reno` in a string form.
367    Reno            = 0,
368    /// CUBIC congestion control algorithm (default). `cubic` in a string form.
369    CUBIC           = 1,
370    /// BBRv2 congestion control algorithm implementation from gcongestion
371    /// branch. `bbr2_gcongestion` in a string form.
372    Bbr2Gcongestion = 4,
373}
374
375impl FromStr for CongestionControlAlgorithm {
376    type Err = crate::Error;
377
378    /// Converts a string to `CongestionControlAlgorithm`.
379    ///
380    /// If `name` is not valid, `Error::CongestionControl` is returned.
381    fn from_str(name: &str) -> std::result::Result<Self, Self::Err> {
382        match name {
383            "reno" => Ok(CongestionControlAlgorithm::Reno),
384            "cubic" => Ok(CongestionControlAlgorithm::CUBIC),
385            "bbr" => Ok(CongestionControlAlgorithm::Bbr2Gcongestion),
386            "bbr2" => Ok(CongestionControlAlgorithm::Bbr2Gcongestion),
387            "bbr2_gcongestion" => Ok(CongestionControlAlgorithm::Bbr2Gcongestion),
388            _ => Err(crate::Error::CongestionControl),
389        }
390    }
391}
392
393#[derive(Clone)]
394pub struct Sent {
395    pub pkt_num: u64,
396
397    pub frames: SmallVec<[frame::Frame; 1]>,
398
399    pub time_sent: Instant,
400
401    pub time_acked: Option<Instant>,
402
403    pub time_lost: Option<Instant>,
404
405    pub size: usize,
406
407    pub ack_eliciting: bool,
408
409    pub in_flight: bool,
410
411    pub delivered: usize,
412
413    pub delivered_time: Instant,
414
415    pub first_sent_time: Instant,
416
417    pub is_app_limited: bool,
418
419    pub tx_in_flight: usize,
420
421    pub lost: u64,
422
423    pub has_data: bool,
424
425    pub is_pmtud_probe: bool,
426}
427
428impl std::fmt::Debug for Sent {
429    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
430        write!(f, "pkt_num={:?} ", self.pkt_num)?;
431        write!(f, "pkt_sent_time={:?} ", self.time_sent)?;
432        write!(f, "pkt_size={:?} ", self.size)?;
433        write!(f, "delivered={:?} ", self.delivered)?;
434        write!(f, "delivered_time={:?} ", self.delivered_time)?;
435        write!(f, "first_sent_time={:?} ", self.first_sent_time)?;
436        write!(f, "is_app_limited={} ", self.is_app_limited)?;
437        write!(f, "tx_in_flight={} ", self.tx_in_flight)?;
438        write!(f, "lost={} ", self.lost)?;
439        write!(f, "has_data={} ", self.has_data)?;
440        write!(f, "is_pmtud_probe={}", self.is_pmtud_probe)?;
441
442        Ok(())
443    }
444}
445
/// Handshake progress flags handed to the recovery callbacks.
#[derive(Clone, Copy, Debug)]
pub struct HandshakeStatus {
    /// Whether handshake keys are available.
    pub has_handshake_keys: bool,

    /// Whether the peer has verified our address.
    pub peer_verified_address: bool,

    /// Whether the handshake is completed.
    pub completed: bool,
}

/// Test-only default: a fully established handshake (every flag set).
#[cfg(test)]
impl Default for HandshakeStatus {
    fn default() -> Self {
        Self {
            has_handshake_keys: true,
            peer_verified_address: true,
            completed: true,
        }
    }
}
467
// Logging every qlog metric on every recovery event is unnecessary. Instead,
// only the MetricsUpdated event data fields that we care about are logged,
// and only when they change. To support this, the QlogMetrics structure keeps
// a running picture of the last values seen for each field.
#[derive(Default)]
#[cfg(feature = "qlog")]
struct QlogMetrics {
    // RTT values; reported in milliseconds by `maybe_update()`.
    min_rtt: Duration,
    smoothed_rtt: Duration,
    latest_rtt: Duration,
    rttvar: Duration,
    // Congestion controller values.
    cwnd: u64,
    bytes_in_flight: u64,
    ssthresh: Option<u64>,
    pacing_rate: Option<u64>,
    // Rates, reported through the `cf_*` extension fields.
    delivery_rate: Option<u64>,
    send_rate: Option<u64>,
    ack_rate: Option<u64>,
    // Loss totals, reported through the `cf_*` extension fields together
    // with the change since the previous report.
    lost_packets: Option<u64>,
    lost_bytes: Option<u64>,
    pto_count: Option<u32>,
}
490
/// A custom qlog field: a static name paired with a JSON value.
#[cfg(feature = "qlog")]
trait CustomCfQlogField {
    /// Name under which the field is stored.
    fn name(&self) -> &'static str;

    /// The field's value rendered as JSON.
    fn as_json_value(&self) -> serde_json::Value;
}
496
/// A running total paired with the change since it was last reported.
///
/// Members that are `None` are omitted from the serialized form.
#[cfg(feature = "qlog")]
#[serde_with::skip_serializing_none]
#[derive(Serialize)]
struct TotalAndDelta {
    /// Current total.
    total: Option<u64>,
    /// Difference from the previously reported total.
    delta: Option<u64>,
}
504
/// Generic [`CustomCfQlogField`] holding a name and a value of type `T`.
#[cfg(feature = "qlog")]
struct CustomQlogField<T> {
    /// Field name.
    name: &'static str,
    /// Field value.
    value: T,
}

#[cfg(feature = "qlog")]
impl<T> CustomQlogField<T> {
    /// Pairs `name` with `value`.
    fn new(name: &'static str, value: T) -> Self {
        CustomQlogField { name, value }
    }
}

#[cfg(feature = "qlog")]
impl<T: Serialize> CustomCfQlogField for CustomQlogField<T> {
    /// Returns the name given at construction.
    fn name(&self) -> &'static str {
        self.name
    }

    /// Serializes the stored value to JSON.
    fn as_json_value(&self) -> serde_json::Value {
        let value = &self.value;

        serde_json::json!(value)
    }
}
528
/// Thin wrapper around qlog's `ExData` used to collect custom fields.
#[cfg(feature = "qlog")]
struct CfExData(qlog::events::ExData);

#[cfg(feature = "qlog")]
impl CfExData {
    /// Creates an empty collection.
    fn new() -> Self {
        CfExData(qlog::events::ExData::new())
    }

    /// Stores `value`, serialized to JSON, under `name`.
    fn insert<T: Serialize>(&mut self, name: &'static str, value: T) {
        let field = CustomQlogField::new(name, value);
        let key = field.name().to_string();
        let json = field.as_json_value();

        self.0.insert(key, json);
    }

    /// Unwraps the collected `ExData`.
    fn into_inner(self) -> qlog::events::ExData {
        let CfExData(inner) = self;

        inner
    }
}
548
#[cfg(feature = "qlog")]
impl QlogMetrics {
    /// Stores `latest` into `current` when the two differ.
    ///
    /// On a difference, `changed` is raised and `Some(latest)` is returned;
    /// otherwise nothing is touched and `None` is returned.
    fn replace_if_changed<T: Copy + PartialEq>(
        current: &mut T, latest: T, changed: &mut bool,
    ) -> Option<T> {
        if *current == latest {
            return None;
        }

        *current = latest;
        *changed = true;

        Some(latest)
    }

    /// Makes a qlog event if the latest instance of QlogMetrics is different.
    ///
    /// Each field of `latest` is diffed against the running picture kept in
    /// `self`, which is updated along the way. A qlog MetricsUpdated event is
    /// only generated if at least one field is different; it carries the
    /// latest value of the fields that changed and leaves the others unset.
    fn maybe_update(&mut self, latest: Self) -> Option<EventData> {
        let mut emit_event = false;

        // RTT values are reported in milliseconds.
        let to_millis = |rtt: Duration| rtt.as_secs_f32() * 1000.0;

        let new_min_rtt = Self::replace_if_changed(
            &mut self.min_rtt,
            latest.min_rtt,
            &mut emit_event,
        )
        .map(to_millis);

        let new_smoothed_rtt = Self::replace_if_changed(
            &mut self.smoothed_rtt,
            latest.smoothed_rtt,
            &mut emit_event,
        )
        .map(to_millis);

        let new_latest_rtt = Self::replace_if_changed(
            &mut self.latest_rtt,
            latest.latest_rtt,
            &mut emit_event,
        )
        .map(to_millis);

        let new_rttvar = Self::replace_if_changed(
            &mut self.rttvar,
            latest.rttvar,
            &mut emit_event,
        )
        .map(to_millis);

        let new_cwnd = Self::replace_if_changed(
            &mut self.cwnd,
            latest.cwnd,
            &mut emit_event,
        );

        let new_bytes_in_flight = Self::replace_if_changed(
            &mut self.bytes_in_flight,
            latest.bytes_in_flight,
            &mut emit_event,
        );

        // For the optional values a change to `None` still counts as a
        // change, even though there is then no value to put in the event.
        let new_ssthresh = Self::replace_if_changed(
            &mut self.ssthresh,
            latest.ssthresh,
            &mut emit_event,
        )
        .flatten();

        let new_pacing_rate = Self::replace_if_changed(
            &mut self.pacing_rate,
            latest.pacing_rate,
            &mut emit_event,
        )
        .flatten();

        // Unlike the values above, a missing PTO count is ignored entirely.
        let new_pto_count = if latest.pto_count.is_some() {
            Self::replace_if_changed(
                &mut self.pto_count,
                latest.pto_count,
                &mut emit_event,
            )
            .flatten()
            // NOTE(review): this narrows u32 to u16 and would wrap for
            // counts above u16::MAX.
            .map(|v| v as u16)
        } else {
            None
        };

        // Build ex_data for rate metrics. A rate is only recorded when the
        // latest value is present and differs from the stored one.
        let mut ex_data = CfExData::new();

        for (name, current, newest) in [
            ("cf_delivery_rate", &mut self.delivery_rate, latest.delivery_rate),
            ("cf_send_rate", &mut self.send_rate, latest.send_rate),
            ("cf_ack_rate", &mut self.ack_rate, latest.ack_rate),
        ] {
            if let Some(rate) = newest {
                if *current != newest {
                    *current = newest;
                    emit_event = true;
                    ex_data.insert(name, rate);
                }
            }
        }

        // Loss totals are reported together with the increase since the
        // previously stored total (a missing previous total counts as 0).
        for (name, current, newest) in [
            ("cf_lost_packets", &mut self.lost_packets, latest.lost_packets),
            ("cf_lost_bytes", &mut self.lost_bytes, latest.lost_bytes),
        ] {
            if let Some(total) = newest {
                if *current != newest {
                    emit_event = true;
                    ex_data.insert(name, TotalAndDelta {
                        total: newest,
                        delta: Some(total - current.unwrap_or(0)),
                    });
                    *current = newest;
                }
            }
        }

        if !emit_event {
            return None;
        }

        Some(EventData::MetricsUpdated(
            qlog::events::quic::MetricsUpdated {
                min_rtt: new_min_rtt,
                smoothed_rtt: new_smoothed_rtt,
                latest_rtt: new_latest_rtt,
                rtt_variance: new_rttvar,
                congestion_window: new_cwnd,
                bytes_in_flight: new_bytes_in_flight,
                ssthresh: new_ssthresh,
                pacing_rate: new_pacing_rate,
                pto_count: new_pto_count,
                ex_data: ex_data.into_inner(),
                ..Default::default()
            },
        ))
    }
}
699
/// The time at which the pacer considers it good to release the next packet.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReleaseTime {
    /// Release right away.
    Immediate,
    /// Release at the given instant.
    At(Instant),
}

/// When the next packet should be released and whether it can be part of a
/// burst.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ReleaseDecision {
    time: ReleaseTime,
    allow_burst: bool,
}

impl ReleaseTime {
    /// Pushes a scheduled release time back by `delay`.
    ///
    /// `Immediate` is left untouched.
    fn inc(&mut self, delay: Duration) {
        if let ReleaseTime::At(time) = self {
            *time += delay;
        }
    }

    /// Set the time to the later of the current release time and `other`.
    ///
    /// `Immediate` always becomes `At(other)`.
    fn set_max(&mut self, other: Instant) {
        let later = match *self {
            ReleaseTime::Immediate => other,
            ReleaseTime::At(time) => other.max(time),
        };

        *self = ReleaseTime::At(later);
    }
}

impl ReleaseDecision {
    /// Release times closer together than this are treated as equal by
    /// [`ReleaseDecision::time_eq`].
    pub(crate) const EQUAL_THRESHOLD: Duration = Duration::from_micros(50);

    /// Get the [`Instant`] the next packet should be released. It will never
    /// be in the past: `None` is returned for an immediate release or when
    /// the scheduled time is not later than `now`.
    #[inline]
    pub fn time(&self, now: Instant) -> Option<Instant> {
        match self.time {
            ReleaseTime::At(at) if at > now => Some(at),
            ReleaseTime::Immediate | ReleaseTime::At(_) => None,
        }
    }

    /// Can this packet be appended to a previous burst
    #[inline]
    pub fn can_burst(&self) -> bool {
        self.allow_burst
    }

    /// Check if the two packets can be released at the same time, that is,
    /// whether their release times are at most `EQUAL_THRESHOLD` apart.
    #[inline]
    pub fn time_eq(&self, other: &Self, now: Instant) -> bool {
        // A decision without a pending release time is due right `now`.
        let mine = self.time(now).unwrap_or(now);
        let theirs = other.time(now).unwrap_or(now);

        let delta = if mine < theirs {
            theirs.duration_since(mine)
        } else {
            mine.duration_since(theirs)
        };

        delta <= Self::EQUAL_THRESHOLD
    }
}
764
765/// Recovery statistics
766#[derive(Default, Debug)]
767pub struct RecoveryStats {
768    startup_exit: Option<StartupExit>,
769}
770
771impl RecoveryStats {
772    // Record statistics when a CCA first exits startup.
773    pub fn set_startup_exit(&mut self, startup_exit: StartupExit) {
774        if self.startup_exit.is_none() {
775            self.startup_exit = Some(startup_exit);
776        }
777    }
778}
779
780/// Statistics from when a CCA first exited the startup phase.
781#[derive(Debug, Clone, Copy, PartialEq)]
782pub struct StartupExit {
783    /// The congestion_window recorded at Startup exit.
784    pub cwnd: usize,
785
786    /// The bandwidth estimate recorded at Startup exit.
787    pub bandwidth: Option<u64>,
788
789    /// The reason a CCA exited the startup phase.
790    pub reason: StartupExitReason,
791}
792
793impl StartupExit {
794    fn new(
795        cwnd: usize, bandwidth: Option<Bandwidth>, reason: StartupExitReason,
796    ) -> Self {
797        let bandwidth = bandwidth.map(Bandwidth::to_bytes_per_second);
798        Self {
799            cwnd,
800            bandwidth,
801            reason,
802        }
803    }
804}
805
/// Why a congestion control algorithm left its startup phase.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum StartupExitReason {
    /// Slow start or BBR startup was exited because of excessive loss.
    Loss,

    /// BBR startup was exited because the bandwidth plateaued.
    BandwidthPlateau,

    /// BBR startup was exited because of a persistent queue.
    PersistentQueue,

    /// HyStart++ conservative slow start was exited after the maximum number
    /// of rounds allowed.
    ConservativeSlowStartRounds,
}
821
822#[cfg(test)]
823mod tests {
824    use super::*;
825    use crate::packet;
826    use crate::test_utils;
827    use crate::CongestionControlAlgorithm;
828    use crate::DEFAULT_INITIAL_RTT;
829    use rstest::rstest;
830    use smallvec::smallvec;
831    use std::str::FromStr;
832
833    fn recovery_for_alg(algo: CongestionControlAlgorithm) -> Recovery {
834        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
835        cfg.set_cc_algorithm(algo);
836        Recovery::new(&cfg)
837    }
838
839    #[test]
840    fn lookup_cc_algo_ok() {
841        let algo = CongestionControlAlgorithm::from_str("reno").unwrap();
842        assert_eq!(algo, CongestionControlAlgorithm::Reno);
843        assert!(!recovery_for_alg(algo).gcongestion_enabled());
844
845        let algo = CongestionControlAlgorithm::from_str("cubic").unwrap();
846        assert_eq!(algo, CongestionControlAlgorithm::CUBIC);
847        assert!(!recovery_for_alg(algo).gcongestion_enabled());
848
849        let algo = CongestionControlAlgorithm::from_str("bbr").unwrap();
850        assert_eq!(algo, CongestionControlAlgorithm::Bbr2Gcongestion);
851        assert!(recovery_for_alg(algo).gcongestion_enabled());
852
853        let algo = CongestionControlAlgorithm::from_str("bbr2").unwrap();
854        assert_eq!(algo, CongestionControlAlgorithm::Bbr2Gcongestion);
855        assert!(recovery_for_alg(algo).gcongestion_enabled());
856
857        let algo =
858            CongestionControlAlgorithm::from_str("bbr2_gcongestion").unwrap();
859        assert_eq!(algo, CongestionControlAlgorithm::Bbr2Gcongestion);
860        assert!(recovery_for_alg(algo).gcongestion_enabled());
861    }
862
863    #[test]
864    fn lookup_cc_algo_bad() {
865        assert_eq!(
866            CongestionControlAlgorithm::from_str("???"),
867            Err(crate::Error::CongestionControl)
868        );
869    }
870
871    #[rstest]
872    fn loss_on_pto(
873        #[values("reno", "cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
874    ) {
875        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
876        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
877
878        let mut r = Recovery::new(&cfg);
879
880        let mut now = Instant::now();
881
882        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
883
884        // Start by sending a few packets.
885        let p = Sent {
886            pkt_num: 0,
887            frames: smallvec![],
888            time_sent: now,
889            time_acked: None,
890            time_lost: None,
891            size: 1000,
892            ack_eliciting: true,
893            in_flight: true,
894            delivered: 0,
895            delivered_time: now,
896            first_sent_time: now,
897            is_app_limited: false,
898            tx_in_flight: 0,
899            lost: 0,
900            has_data: false,
901            is_pmtud_probe: false,
902        };
903
904        r.on_packet_sent(
905            p,
906            packet::Epoch::Application,
907            HandshakeStatus::default(),
908            now,
909            "",
910        );
911
912        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 1);
913        assert_eq!(r.bytes_in_flight(), 1000);
914        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
915
916        let p = Sent {
917            pkt_num: 1,
918            frames: smallvec![],
919            time_sent: now,
920            time_acked: None,
921            time_lost: None,
922            size: 1000,
923            ack_eliciting: true,
924            in_flight: true,
925            delivered: 0,
926            delivered_time: now,
927            first_sent_time: now,
928            is_app_limited: false,
929            tx_in_flight: 0,
930            lost: 0,
931            has_data: false,
932            is_pmtud_probe: false,
933        };
934
935        r.on_packet_sent(
936            p,
937            packet::Epoch::Application,
938            HandshakeStatus::default(),
939            now,
940            "",
941        );
942
943        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
944        assert_eq!(r.bytes_in_flight(), 2000);
945        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
946
947        let p = Sent {
948            pkt_num: 2,
949            frames: smallvec![],
950            time_sent: now,
951            time_acked: None,
952            time_lost: None,
953            size: 1000,
954            ack_eliciting: true,
955            in_flight: true,
956            delivered: 0,
957            delivered_time: now,
958            first_sent_time: now,
959            is_app_limited: false,
960            tx_in_flight: 0,
961            lost: 0,
962            has_data: false,
963            is_pmtud_probe: false,
964        };
965
966        r.on_packet_sent(
967            p,
968            packet::Epoch::Application,
969            HandshakeStatus::default(),
970            now,
971            "",
972        );
973        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 3);
974        assert_eq!(r.bytes_in_flight(), 3000);
975        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
976
977        let p = Sent {
978            pkt_num: 3,
979            frames: smallvec![],
980            time_sent: now,
981            time_acked: None,
982            time_lost: None,
983            size: 1000,
984            ack_eliciting: true,
985            in_flight: true,
986            delivered: 0,
987            delivered_time: now,
988            first_sent_time: now,
989            is_app_limited: false,
990            tx_in_flight: 0,
991            lost: 0,
992            has_data: false,
993            is_pmtud_probe: false,
994        };
995
996        r.on_packet_sent(
997            p,
998            packet::Epoch::Application,
999            HandshakeStatus::default(),
1000            now,
1001            "",
1002        );
1003        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 4);
1004        assert_eq!(r.bytes_in_flight(), 4000);
1005        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
1006
1007        // Wait for 10ms.
1008        now += Duration::from_millis(10);
1009
1010        // Only the first 2 packets are acked.
1011        let mut acked = RangeSet::default();
1012        acked.insert(0..2);
1013
1014        assert_eq!(
1015            r.on_ack_received(
1016                &acked,
1017                25,
1018                packet::Epoch::Application,
1019                HandshakeStatus::default(),
1020                now,
1021                None,
1022                "",
1023            )
1024            .unwrap(),
1025            OnAckReceivedOutcome {
1026                lost_packets: 0,
1027                lost_bytes: 0,
1028                acked_bytes: 2 * 1000,
1029                spurious_losses: 0,
1030            }
1031        );
1032
1033        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
1034        assert_eq!(r.bytes_in_flight(), 2000);
1035        assert_eq!(r.bytes_in_flight_duration(), Duration::from_millis(10));
1036        assert_eq!(r.lost_count(), 0);
1037
1038        // Wait until loss detection timer expires.
1039        now = r.loss_detection_timer().unwrap();
1040
1041        // PTO.
1042        r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
1043        assert_eq!(r.loss_probes(packet::Epoch::Application), 1);
1044        assert_eq!(r.lost_count(), 0);
1045        assert_eq!(r.pto_count(), 1);
1046
1047        let p = Sent {
1048            pkt_num: 4,
1049            frames: smallvec![],
1050            time_sent: now,
1051            time_acked: None,
1052            time_lost: None,
1053            size: 1000,
1054            ack_eliciting: true,
1055            in_flight: true,
1056            delivered: 0,
1057            delivered_time: now,
1058            first_sent_time: now,
1059            is_app_limited: false,
1060            tx_in_flight: 0,
1061            lost: 0,
1062            has_data: false,
1063            is_pmtud_probe: false,
1064        };
1065
1066        r.on_packet_sent(
1067            p,
1068            packet::Epoch::Application,
1069            HandshakeStatus::default(),
1070            now,
1071            "",
1072        );
1073        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 3);
1074        assert_eq!(r.bytes_in_flight(), 3000);
1075        assert_eq!(r.bytes_in_flight_duration(), Duration::from_millis(30));
1076
1077        let p = Sent {
1078            pkt_num: 5,
1079            frames: smallvec![],
1080            time_sent: now,
1081            time_acked: None,
1082            time_lost: None,
1083            size: 1000,
1084            ack_eliciting: true,
1085            in_flight: true,
1086            delivered: 0,
1087            delivered_time: now,
1088            first_sent_time: now,
1089            is_app_limited: false,
1090            tx_in_flight: 0,
1091            lost: 0,
1092            has_data: false,
1093            is_pmtud_probe: false,
1094        };
1095
1096        r.on_packet_sent(
1097            p,
1098            packet::Epoch::Application,
1099            HandshakeStatus::default(),
1100            now,
1101            "",
1102        );
1103        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 4);
1104        assert_eq!(r.bytes_in_flight(), 4000);
1105        assert_eq!(r.bytes_in_flight_duration(), Duration::from_millis(30));
1106        assert_eq!(r.lost_count(), 0);
1107
1108        // Wait for 10ms.
1109        now += Duration::from_millis(10);
1110
1111        // PTO packets are acked.
1112        let mut acked = RangeSet::default();
1113        acked.insert(4..6);
1114
1115        assert_eq!(
1116            r.on_ack_received(
1117                &acked,
1118                25,
1119                packet::Epoch::Application,
1120                HandshakeStatus::default(),
1121                now,
1122                None,
1123                "",
1124            )
1125            .unwrap(),
1126            OnAckReceivedOutcome {
1127                lost_packets: 2,
1128                lost_bytes: 2000,
1129                acked_bytes: 2 * 1000,
1130                spurious_losses: 0,
1131            }
1132        );
1133
1134        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 4);
1135        assert_eq!(r.bytes_in_flight(), 0);
1136        assert_eq!(r.bytes_in_flight_duration(), Duration::from_millis(40));
1137
1138        assert_eq!(r.lost_count(), 2);
1139
1140        // Wait 1 RTT.
1141        now += r.rtt();
1142
1143        assert_eq!(
1144            r.detect_lost_packets_for_test(packet::Epoch::Application, now),
1145            (0, 0)
1146        );
1147
1148        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1149        if cc_algorithm_name == "reno" || cc_algorithm_name == "cubic" {
1150            assert!(r.startup_exit().is_some());
1151            assert_eq!(r.startup_exit().unwrap().reason, StartupExitReason::Loss);
1152        } else {
1153            assert_eq!(r.startup_exit(), None);
1154        }
1155    }
1156
1157    #[rstest]
1158    fn loss_on_timer(
1159        #[values("reno", "cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
1160    ) {
1161        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
1162        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
1163
1164        let mut r = Recovery::new(&cfg);
1165
1166        let mut now = Instant::now();
1167
1168        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1169
1170        // Start by sending a few packets.
1171        let p = Sent {
1172            pkt_num: 0,
1173            frames: smallvec![],
1174            time_sent: now,
1175            time_acked: None,
1176            time_lost: None,
1177            size: 1000,
1178            ack_eliciting: true,
1179            in_flight: true,
1180            delivered: 0,
1181            delivered_time: now,
1182            first_sent_time: now,
1183            is_app_limited: false,
1184            tx_in_flight: 0,
1185            lost: 0,
1186            has_data: false,
1187            is_pmtud_probe: false,
1188        };
1189
1190        r.on_packet_sent(
1191            p,
1192            packet::Epoch::Application,
1193            HandshakeStatus::default(),
1194            now,
1195            "",
1196        );
1197        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 1);
1198        assert_eq!(r.bytes_in_flight(), 1000);
1199        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
1200
1201        let p = Sent {
1202            pkt_num: 1,
1203            frames: smallvec![],
1204            time_sent: now,
1205            time_acked: None,
1206            time_lost: None,
1207            size: 1000,
1208            ack_eliciting: true,
1209            in_flight: true,
1210            delivered: 0,
1211            delivered_time: now,
1212            first_sent_time: now,
1213            is_app_limited: false,
1214            tx_in_flight: 0,
1215            lost: 0,
1216            has_data: false,
1217            is_pmtud_probe: false,
1218        };
1219
1220        r.on_packet_sent(
1221            p,
1222            packet::Epoch::Application,
1223            HandshakeStatus::default(),
1224            now,
1225            "",
1226        );
1227        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
1228        assert_eq!(r.bytes_in_flight(), 2000);
1229        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
1230
1231        let p = Sent {
1232            pkt_num: 2,
1233            frames: smallvec![],
1234            time_sent: now,
1235            time_acked: None,
1236            time_lost: None,
1237            size: 1000,
1238            ack_eliciting: true,
1239            in_flight: true,
1240            delivered: 0,
1241            delivered_time: now,
1242            first_sent_time: now,
1243            is_app_limited: false,
1244            tx_in_flight: 0,
1245            lost: 0,
1246            has_data: false,
1247            is_pmtud_probe: false,
1248        };
1249
1250        r.on_packet_sent(
1251            p,
1252            packet::Epoch::Application,
1253            HandshakeStatus::default(),
1254            now,
1255            "",
1256        );
1257        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 3);
1258        assert_eq!(r.bytes_in_flight(), 3000);
1259        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
1260
1261        let p = Sent {
1262            pkt_num: 3,
1263            frames: smallvec![],
1264            time_sent: now,
1265            time_acked: None,
1266            time_lost: None,
1267            size: 1000,
1268            ack_eliciting: true,
1269            in_flight: true,
1270            delivered: 0,
1271            delivered_time: now,
1272            first_sent_time: now,
1273            is_app_limited: false,
1274            tx_in_flight: 0,
1275            lost: 0,
1276            has_data: false,
1277            is_pmtud_probe: false,
1278        };
1279
1280        r.on_packet_sent(
1281            p,
1282            packet::Epoch::Application,
1283            HandshakeStatus::default(),
1284            now,
1285            "",
1286        );
1287        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 4);
1288        assert_eq!(r.bytes_in_flight(), 4000);
1289        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
1290
1291        // Wait for 10ms.
1292        now += Duration::from_millis(10);
1293
1294        // Only the first 2 packets and the last one are acked.
1295        let mut acked = RangeSet::default();
1296        acked.insert(0..2);
1297        acked.insert(3..4);
1298
1299        assert_eq!(
1300            r.on_ack_received(
1301                &acked,
1302                25,
1303                packet::Epoch::Application,
1304                HandshakeStatus::default(),
1305                now,
1306                None,
1307                "",
1308            )
1309            .unwrap(),
1310            OnAckReceivedOutcome {
1311                lost_packets: 0,
1312                lost_bytes: 0,
1313                acked_bytes: 3 * 1000,
1314                spurious_losses: 0,
1315            }
1316        );
1317
1318        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
1319        assert_eq!(r.bytes_in_flight(), 1000);
1320        assert_eq!(r.bytes_in_flight_duration(), Duration::from_millis(10));
1321        assert_eq!(r.lost_count(), 0);
1322
1323        // Wait until loss detection timer expires.
1324        now = r.loss_detection_timer().unwrap();
1325
1326        // Packet is declared lost.
1327        r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
1328        assert_eq!(r.loss_probes(packet::Epoch::Application), 0);
1329
1330        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
1331        assert_eq!(r.bytes_in_flight(), 0);
1332        assert_eq!(r.bytes_in_flight_duration(), Duration::from_micros(11250));
1333
1334        assert_eq!(r.lost_count(), 1);
1335
1336        // Wait 1 RTT.
1337        now += r.rtt();
1338
1339        assert_eq!(
1340            r.detect_lost_packets_for_test(packet::Epoch::Application, now),
1341            (0, 0)
1342        );
1343
1344        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1345        if cc_algorithm_name == "reno" || cc_algorithm_name == "cubic" {
1346            assert!(r.startup_exit().is_some());
1347            assert_eq!(r.startup_exit().unwrap().reason, StartupExitReason::Loss);
1348        } else {
1349            assert_eq!(r.startup_exit(), None);
1350        }
1351    }
1352
1353    #[rstest]
1354    fn loss_on_reordering(
1355        #[values("reno", "cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
1356    ) {
1357        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
1358        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
1359
1360        let mut r = Recovery::new(&cfg);
1361
1362        let mut now = Instant::now();
1363
1364        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1365
1366        // Start by sending a few packets.
1367        //
1368        // pkt number: [0, 1, 2, 3]
1369        for i in 0..4 {
1370            let p = test_utils::helper_packet_sent(i, now, 1000);
1371            r.on_packet_sent(
1372                p,
1373                packet::Epoch::Application,
1374                HandshakeStatus::default(),
1375                now,
1376                "",
1377            );
1378
1379            let pkt_count = (i + 1) as usize;
1380            assert_eq!(r.sent_packets_len(packet::Epoch::Application), pkt_count);
1381            assert_eq!(r.bytes_in_flight(), pkt_count * 1000);
1382            assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
1383        }
1384
1385        // Wait for 10ms after sending.
1386        now += Duration::from_millis(10);
1387
1388        // Recieve reordered ACKs, i.e. pkt_num [2, 3]
1389        let mut acked = RangeSet::default();
1390        acked.insert(2..4);
1391        assert_eq!(
1392            r.on_ack_received(
1393                &acked,
1394                25,
1395                packet::Epoch::Application,
1396                HandshakeStatus::default(),
1397                now,
1398                None,
1399                "",
1400            )
1401            .unwrap(),
1402            OnAckReceivedOutcome {
1403                lost_packets: 1,
1404                lost_bytes: 1000,
1405                acked_bytes: 1000 * 2,
1406                spurious_losses: 0,
1407            }
1408        );
1409        // Since we only remove packets from the back to avoid compaction, the
1410        // send length remains the same after receiving reordered ACKs
1411        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 4);
1412        assert_eq!(r.pkt_thresh().unwrap(), INITIAL_PACKET_THRESHOLD);
1413        assert_eq!(r.time_thresh(), INITIAL_TIME_THRESHOLD);
1414
1415        // Wait for 10ms after receiving first set of ACKs.
1416        now += Duration::from_millis(10);
1417
1418        // Recieve remaining ACKs, i.e. pkt_num [0, 1]
1419        let mut acked = RangeSet::default();
1420        acked.insert(0..2);
1421        assert_eq!(
1422            r.on_ack_received(
1423                &acked,
1424                25,
1425                packet::Epoch::Application,
1426                HandshakeStatus::default(),
1427                now,
1428                None,
1429                "",
1430            )
1431            .unwrap(),
1432            OnAckReceivedOutcome {
1433                lost_packets: 0,
1434                lost_bytes: 0,
1435                acked_bytes: 1000,
1436                spurious_losses: 1,
1437            }
1438        );
1439        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1440        assert_eq!(r.bytes_in_flight(), 0);
1441        assert_eq!(r.bytes_in_flight_duration(), Duration::from_millis(20));
1442
1443        // Spurious loss.
1444        assert_eq!(r.lost_count(), 1);
1445        assert_eq!(r.lost_spurious_count(), 1);
1446
1447        // Packet threshold was increased.
1448        assert_eq!(r.pkt_thresh().unwrap(), 4);
1449        assert_eq!(r.time_thresh(), PACKET_REORDER_TIME_THRESHOLD);
1450
1451        // Wait 1 RTT.
1452        now += r.rtt();
1453
1454        // All packets have been ACKed so dont expect additional lost packets
1455        assert_eq!(
1456            r.detect_lost_packets_for_test(packet::Epoch::Application, now),
1457            (0, 0)
1458        );
1459        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1460
1461        if cc_algorithm_name == "reno" || cc_algorithm_name == "cubic" {
1462            assert!(r.startup_exit().is_some());
1463            assert_eq!(r.startup_exit().unwrap().reason, StartupExitReason::Loss);
1464        } else {
1465            assert_eq!(r.startup_exit(), None);
1466        }
1467    }
1468
1469    // TODO: This should run agains both `congestion` and `gcongestion`.
1470    // `congestion` and `gcongestion` behave differently. That might be ok
1471    // given the different algorithms but it would be ideal to merge and share
1472    // the logic.
1473    #[rstest]
1474    fn time_thresholds_on_reordering(
1475        #[values("bbr2_gcongestion")] cc_algorithm_name: &str,
1476    ) {
1477        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
1478        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
1479
1480        let mut now = Instant::now();
1481        let mut r = Recovery::new(&cfg);
1482        assert_eq!(r.rtt(), DEFAULT_INITIAL_RTT);
1483
1484        // Pick time between and above thresholds for testing threshold increase.
1485        //
1486        //```
1487        //              between_thresh_ms
1488        //                         |
1489        //    initial_thresh_ms    |     spurious_thresh_ms
1490        //      v                  v             v
1491        // --------------------------------------------------
1492        //      | ................ | ..................... |
1493        //            THRESH_GAP         THRESH_GAP
1494        // ```
1495        // 
1496        // Threshold gap time.
1497        const THRESH_GAP: Duration = Duration::from_millis(30);
1498        // Initial time theshold based on inital RTT.
1499        let initial_thresh_ms =
1500            DEFAULT_INITIAL_RTT.mul_f64(INITIAL_TIME_THRESHOLD);
1501        // The time threshold after spurious loss.
1502        let spurious_thresh_ms: Duration =
1503            DEFAULT_INITIAL_RTT.mul_f64(PACKET_REORDER_TIME_THRESHOLD);
1504        // Time between the two thresholds
1505        let between_thresh_ms = initial_thresh_ms + THRESH_GAP;
1506        assert!(between_thresh_ms > initial_thresh_ms);
1507        assert!(between_thresh_ms < spurious_thresh_ms);
1508        assert!(between_thresh_ms + THRESH_GAP > spurious_thresh_ms);
1509
1510        for i in 0..6 {
1511            let send_time = now + i * between_thresh_ms;
1512
1513            let p = test_utils::helper_packet_sent(i.into(), send_time, 1000);
1514            r.on_packet_sent(
1515                p,
1516                packet::Epoch::Application,
1517                HandshakeStatus::default(),
1518                send_time,
1519                "",
1520            );
1521        }
1522
1523        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 6);
1524        assert_eq!(r.bytes_in_flight(), 6 * 1000);
1525        assert_eq!(r.pkt_thresh().unwrap(), INITIAL_PACKET_THRESHOLD);
1526        assert_eq!(r.time_thresh(), INITIAL_TIME_THRESHOLD);
1527
1528        // Wait for `between_thresh_ms` after sending to trigger loss based on
1529        // loss threshold.
1530        now += between_thresh_ms;
1531
1532        // Ack packet: 1
1533        //
1534        // [0, 1, 2, 3, 4, 5]
1535        //     ^
1536        let mut acked = RangeSet::default();
1537        acked.insert(1..2);
1538        assert_eq!(
1539            r.on_ack_received(
1540                &acked,
1541                25,
1542                packet::Epoch::Application,
1543                HandshakeStatus::default(),
1544                now,
1545                None,
1546                "",
1547            )
1548            .unwrap(),
1549            OnAckReceivedOutcome {
1550                lost_packets: 1,
1551                lost_bytes: 1000,
1552                acked_bytes: 1000,
1553                spurious_losses: 0,
1554            }
1555        );
1556        assert_eq!(r.pkt_thresh().unwrap(), INITIAL_PACKET_THRESHOLD);
1557        assert_eq!(r.time_thresh(), INITIAL_TIME_THRESHOLD);
1558
1559        // Ack packet: 0
1560        //
1561        // [0, 1, 2, 3, 4, 5]
1562        //  ^  x
1563        let mut acked = RangeSet::default();
1564        acked.insert(0..1);
1565        assert_eq!(
1566            r.on_ack_received(
1567                &acked,
1568                25,
1569                packet::Epoch::Application,
1570                HandshakeStatus::default(),
1571                now,
1572                None,
1573                "",
1574            )
1575            .unwrap(),
1576            OnAckReceivedOutcome {
1577                lost_packets: 0,
1578                lost_bytes: 0,
1579                acked_bytes: 0,
1580                spurious_losses: 1,
1581            }
1582        );
1583        // The time_thresh after spurious loss
1584        assert_eq!(r.time_thresh(), PACKET_REORDER_TIME_THRESHOLD);
1585
1586        // Wait for `between_thresh_ms` after sending. However, since the
1587        // threshold has increased, we do not expect loss.
1588        now += between_thresh_ms;
1589
1590        // Ack packet: 3
1591        //
1592        // [2, 3, 4, 5]
1593        //     ^
1594        let mut acked = RangeSet::default();
1595        acked.insert(3..4);
1596        assert_eq!(
1597            r.on_ack_received(
1598                &acked,
1599                25,
1600                packet::Epoch::Application,
1601                HandshakeStatus::default(),
1602                now,
1603                None,
1604                "",
1605            )
1606            .unwrap(),
1607            OnAckReceivedOutcome {
1608                lost_packets: 0,
1609                lost_bytes: 0,
1610                acked_bytes: 1000,
1611                spurious_losses: 0,
1612            }
1613        );
1614
1615        // Wait for and additional `plus_overhead` to trigger loss based on the
1616        // new time threshold.
1617        now += THRESH_GAP;
1618
1619        // Ack packet: 4
1620        //
1621        // [2, 3, 4, 5]
1622        //     x  ^
1623        let mut acked = RangeSet::default();
1624        acked.insert(4..5);
1625        assert_eq!(
1626            r.on_ack_received(
1627                &acked,
1628                25,
1629                packet::Epoch::Application,
1630                HandshakeStatus::default(),
1631                now,
1632                None,
1633                "",
1634            )
1635            .unwrap(),
1636            OnAckReceivedOutcome {
1637                lost_packets: 1,
1638                lost_bytes: 1000,
1639                acked_bytes: 1000,
1640                spurious_losses: 0,
1641            }
1642        );
1643    }
1644
1645    // TODO: Implement enable_relaxed_loss_threshold and enable this test for the
1646    // congestion module.
1647    #[rstest]
1648    fn relaxed_thresholds_on_reordering(
1649        #[values("bbr2_gcongestion")] cc_algorithm_name: &str,
1650    ) {
1651        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
1652        cfg.enable_relaxed_loss_threshold = true;
1653        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
1654
1655        let mut now = Instant::now();
1656        let mut r = Recovery::new(&cfg);
1657        assert_eq!(r.rtt(), DEFAULT_INITIAL_RTT);
1658
1659        // Pick time between and above thresholds for testing threshold increase.
1660        //
1661        //```
1662        //              between_thresh_ms
1663        //                         |
1664        //    initial_thresh_ms    |     spurious_thresh_ms
1665        //      v                  v             v
1666        // --------------------------------------------------
1667        //      | ................ | ..................... |
1668        //            THRESH_GAP         THRESH_GAP
1669        // ```
1670        // Threshold gap time.
1671        const THRESH_GAP: Duration = Duration::from_millis(30);
1672        // Initial time theshold based on inital RTT.
1673        let initial_thresh_ms =
1674            DEFAULT_INITIAL_RTT.mul_f64(INITIAL_TIME_THRESHOLD);
1675        // The time threshold after spurious loss.
1676        let spurious_thresh_ms: Duration =
1677            DEFAULT_INITIAL_RTT.mul_f64(PACKET_REORDER_TIME_THRESHOLD);
1678        // Time between the two thresholds
1679        let between_thresh_ms = initial_thresh_ms + THRESH_GAP;
1680        assert!(between_thresh_ms > initial_thresh_ms);
1681        assert!(between_thresh_ms < spurious_thresh_ms);
1682        assert!(between_thresh_ms + THRESH_GAP > spurious_thresh_ms);
1683
1684        for i in 0..6 {
1685            let send_time = now + i * between_thresh_ms;
1686
1687            let p = test_utils::helper_packet_sent(i.into(), send_time, 1000);
1688            r.on_packet_sent(
1689                p,
1690                packet::Epoch::Application,
1691                HandshakeStatus::default(),
1692                send_time,
1693                "",
1694            );
1695        }
1696
1697        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 6);
1698        assert_eq!(r.bytes_in_flight(), 6 * 1000);
1699        // Intitial thresholds
1700        assert_eq!(r.pkt_thresh().unwrap(), INITIAL_PACKET_THRESHOLD);
1701        assert_eq!(r.time_thresh(), INITIAL_TIME_THRESHOLD);
1702
1703        // Wait for `between_thresh_ms` after sending to trigger loss based on
1704        // loss threshold.
1705        now += between_thresh_ms;
1706
1707        // Ack packet: 1
1708        //
1709        // [0, 1, 2, 3, 4, 5]
1710        //     ^
1711        let mut acked = RangeSet::default();
1712        acked.insert(1..2);
1713        assert_eq!(
1714            r.on_ack_received(
1715                &acked,
1716                25,
1717                packet::Epoch::Application,
1718                HandshakeStatus::default(),
1719                now,
1720                None,
1721                "",
1722            )
1723            .unwrap(),
1724            OnAckReceivedOutcome {
1725                lost_packets: 1,
1726                lost_bytes: 1000,
1727                acked_bytes: 1000,
1728                spurious_losses: 0,
1729            }
1730        );
1731        // Thresholds after 1st loss
1732        assert_eq!(r.pkt_thresh().unwrap(), INITIAL_PACKET_THRESHOLD);
1733        assert_eq!(r.time_thresh(), INITIAL_TIME_THRESHOLD);
1734
1735        // Ack packet: 0
1736        //
1737        // [0, 1, 2, 3, 4, 5]
1738        //  ^  x
1739        let mut acked = RangeSet::default();
1740        acked.insert(0..1);
1741        assert_eq!(
1742            r.on_ack_received(
1743                &acked,
1744                25,
1745                packet::Epoch::Application,
1746                HandshakeStatus::default(),
1747                now,
1748                None,
1749                "",
1750            )
1751            .unwrap(),
1752            OnAckReceivedOutcome {
1753                lost_packets: 0,
1754                lost_bytes: 0,
1755                acked_bytes: 0,
1756                spurious_losses: 1,
1757            }
1758        );
1759        // Thresholds after 1st spurious loss
1760        //
1761        // Packet threshold should be disabled. Time threshold overhead should
1762        // stay the same.
1763        assert_eq!(r.pkt_thresh(), None);
1764        assert_eq!(r.time_thresh(), INITIAL_TIME_THRESHOLD);
1765
1766        // Set now to send time of packet 2 so we can trigger spurious loss for
1767        // packet 2.
1768        now += between_thresh_ms;
1769        // Then wait for `between_thresh_ms` after sending packet 2 to trigger
1770        // loss. Since the time threshold has NOT increased, expect a
1771        // loss.
1772        now += between_thresh_ms;
1773
1774        // Ack packet: 3
1775        //
1776        // [2, 3, 4, 5]
1777        //     ^
1778        let mut acked = RangeSet::default();
1779        acked.insert(3..4);
1780        assert_eq!(
1781            r.on_ack_received(
1782                &acked,
1783                25,
1784                packet::Epoch::Application,
1785                HandshakeStatus::default(),
1786                now,
1787                None,
1788                "",
1789            )
1790            .unwrap(),
1791            OnAckReceivedOutcome {
1792                lost_packets: 1,
1793                lost_bytes: 1000,
1794                acked_bytes: 1000,
1795                spurious_losses: 0,
1796            }
1797        );
1798        // Thresholds after 2nd loss.
1799        assert_eq!(r.pkt_thresh(), None);
1800        assert_eq!(r.time_thresh(), INITIAL_TIME_THRESHOLD);
1801
1802        // Wait for and additional `plus_overhead` to trigger loss based on the
1803        // new time threshold.
1804        // now += THRESH_GAP;
1805
1806        // Ack packet: 2
1807        //
1808        // [2, 3, 4, 5]
1809        //  ^  x
1810        let mut acked = RangeSet::default();
1811        acked.insert(2..3);
1812        assert_eq!(
1813            r.on_ack_received(
1814                &acked,
1815                25,
1816                packet::Epoch::Application,
1817                HandshakeStatus::default(),
1818                now,
1819                None,
1820                "",
1821            )
1822            .unwrap(),
1823            OnAckReceivedOutcome {
1824                lost_packets: 0,
1825                lost_bytes: 0,
1826                acked_bytes: 0,
1827                spurious_losses: 1,
1828            }
1829        );
1830        // Thresholds after 2nd spurious loss.
1831        //
1832        // Time threshold overhead should double.
1833        assert_eq!(r.pkt_thresh(), None);
1834        let double_time_thresh_overhead =
1835            1.0 + 2.0 * INITIAL_TIME_THRESHOLD_OVERHEAD;
1836        assert_eq!(r.time_thresh(), double_time_thresh_overhead);
1837    }
1838
1839    #[rstest]
1840    fn pacing(
1841        #[values("reno", "cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
1842        #[values(false, true)] time_sent_set_to_now: bool,
1843    ) {
1844        let pacing_enabled = cc_algorithm_name == "bbr2" ||
1845            cc_algorithm_name == "bbr2_gcongestion";
1846
1847        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
1848        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
1849
1850        #[cfg(feature = "internal")]
1851        cfg.set_custom_bbr_params(BbrParams {
1852            time_sent_set_to_now: Some(time_sent_set_to_now),
1853            ..Default::default()
1854        });
1855
1856        let mut r = Recovery::new(&cfg);
1857
1858        let mut now = Instant::now();
1859
1860        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1861
1862        // send out first packet burst (a full initcwnd).
1863        for i in 0..10 {
1864            let p = Sent {
1865                pkt_num: i,
1866                frames: smallvec![],
1867                time_sent: now,
1868                time_acked: None,
1869                time_lost: None,
1870                size: 1200,
1871                ack_eliciting: true,
1872                in_flight: true,
1873                delivered: 0,
1874                delivered_time: now,
1875                first_sent_time: now,
1876                is_app_limited: false,
1877                tx_in_flight: 0,
1878                lost: 0,
1879                has_data: true,
1880                is_pmtud_probe: false,
1881            };
1882
1883            r.on_packet_sent(
1884                p,
1885                packet::Epoch::Application,
1886                HandshakeStatus::default(),
1887                now,
1888                "",
1889            );
1890        }
1891
1892        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 10);
1893        assert_eq!(r.bytes_in_flight(), 12000);
1894        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
1895
1896        if !pacing_enabled {
1897            assert_eq!(r.pacing_rate(), 0);
1898        } else {
1899            assert_eq!(r.pacing_rate(), 103963);
1900        }
1901        assert_eq!(r.get_packet_send_time(now), now);
1902
1903        assert_eq!(r.cwnd(), 12000);
1904        assert_eq!(r.cwnd_available(), 0);
1905
1906        // Wait 50ms for ACK.
1907        let initial_rtt = Duration::from_millis(50);
1908        now += initial_rtt;
1909
1910        let mut acked = RangeSet::default();
1911        acked.insert(0..10);
1912
1913        assert_eq!(
1914            r.on_ack_received(
1915                &acked,
1916                10,
1917                packet::Epoch::Application,
1918                HandshakeStatus::default(),
1919                now,
1920                None,
1921                "",
1922            )
1923            .unwrap(),
1924            OnAckReceivedOutcome {
1925                lost_packets: 0,
1926                lost_bytes: 0,
1927                acked_bytes: 12000,
1928                spurious_losses: 0,
1929            }
1930        );
1931
1932        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1933        assert_eq!(r.bytes_in_flight(), 0);
1934        assert_eq!(r.bytes_in_flight_duration(), initial_rtt);
1935        assert_eq!(r.min_rtt(), Some(initial_rtt));
1936        assert_eq!(r.rtt(), initial_rtt);
1937
1938        // 10 MSS increased due to acks.
1939        assert_eq!(r.cwnd(), 12000 + 1200 * 10);
1940
1941        // Send the second packet burst.
1942        let p = Sent {
1943            pkt_num: 10,
1944            frames: smallvec![],
1945            time_sent: now,
1946            time_acked: None,
1947            time_lost: None,
1948            size: 6000,
1949            ack_eliciting: true,
1950            in_flight: true,
1951            delivered: 0,
1952            delivered_time: now,
1953            first_sent_time: now,
1954            is_app_limited: false,
1955            tx_in_flight: 0,
1956            lost: 0,
1957            has_data: true,
1958            is_pmtud_probe: false,
1959        };
1960
1961        r.on_packet_sent(
1962            p,
1963            packet::Epoch::Application,
1964            HandshakeStatus::default(),
1965            now,
1966            "",
1967        );
1968
1969        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 1);
1970        assert_eq!(r.bytes_in_flight(), 6000);
1971        assert_eq!(r.bytes_in_flight_duration(), initial_rtt);
1972
1973        if !pacing_enabled {
1974            // Pacing is disabled.
1975            assert_eq!(r.get_packet_send_time(now), now);
1976        } else {
1977            // Pacing is done from the beginning.
1978            assert_ne!(r.get_packet_send_time(now), now);
1979        }
1980
1981        // Send the third and fourth packet bursts together.
1982        let p = Sent {
1983            pkt_num: 11,
1984            frames: smallvec![],
1985            time_sent: now,
1986            time_acked: None,
1987            time_lost: None,
1988            size: 6000,
1989            ack_eliciting: true,
1990            in_flight: true,
1991            delivered: 0,
1992            delivered_time: now,
1993            first_sent_time: now,
1994            is_app_limited: false,
1995            tx_in_flight: 0,
1996            lost: 0,
1997            has_data: true,
1998            is_pmtud_probe: false,
1999        };
2000
2001        r.on_packet_sent(
2002            p,
2003            packet::Epoch::Application,
2004            HandshakeStatus::default(),
2005            now,
2006            "",
2007        );
2008
2009        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
2010        assert_eq!(r.bytes_in_flight(), 12000);
2011        assert_eq!(r.bytes_in_flight_duration(), initial_rtt);
2012
2013        // Send the fourth packet burst.
2014        let p = Sent {
2015            pkt_num: 12,
2016            frames: smallvec![],
2017            time_sent: now,
2018            time_acked: None,
2019            time_lost: None,
2020            size: 1000,
2021            ack_eliciting: true,
2022            in_flight: true,
2023            delivered: 0,
2024            delivered_time: now,
2025            first_sent_time: now,
2026            is_app_limited: false,
2027            tx_in_flight: 0,
2028            lost: 0,
2029            has_data: true,
2030            is_pmtud_probe: false,
2031        };
2032
2033        r.on_packet_sent(
2034            p,
2035            packet::Epoch::Application,
2036            HandshakeStatus::default(),
2037            now,
2038            "",
2039        );
2040
2041        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 3);
2042        assert_eq!(r.bytes_in_flight(), 13000);
2043        assert_eq!(r.bytes_in_flight_duration(), initial_rtt);
2044
2045        // We pace this outgoing packet. as all conditions for pacing
2046        // are passed.
2047        let pacing_rate = if pacing_enabled {
2048            let cwnd_gain: f64 = 2.0;
2049            // Adjust for cwnd_gain.  BW estimate was made before the CWND
2050            // increase.
2051            let bw = r.cwnd() as f64 / cwnd_gain / initial_rtt.as_secs_f64();
2052            bw as u64
2053        } else {
2054            0
2055        };
2056        assert_eq!(r.pacing_rate(), pacing_rate);
2057
2058        let scale_factor = if pacing_enabled {
2059            // For bbr2_gcongestion, send time is almost 13000 / pacing_rate.
2060            // Don't know where 13000 comes from.
2061            1.08333332
2062        } else {
2063            1.0
2064        };
2065        assert_eq!(
2066            r.get_packet_send_time(now) - now,
2067            if pacing_enabled {
2068                Duration::from_secs_f64(
2069                    scale_factor * 12000.0 / pacing_rate as f64,
2070                )
2071            } else {
2072                Duration::ZERO
2073            }
2074        );
2075        assert_eq!(r.startup_exit(), None);
2076
2077        let reduced_rtt = Duration::from_millis(40);
2078        now += reduced_rtt;
2079
2080        let mut acked = RangeSet::default();
2081        acked.insert(10..11);
2082
2083        assert_eq!(
2084            r.on_ack_received(
2085                &acked,
2086                0,
2087                packet::Epoch::Application,
2088                HandshakeStatus::default(),
2089                now,
2090                None,
2091                "",
2092            )
2093            .unwrap(),
2094            OnAckReceivedOutcome {
2095                lost_packets: 0,
2096                lost_bytes: 0,
2097                acked_bytes: 6000,
2098                spurious_losses: 0,
2099            }
2100        );
2101
2102        let expected_srtt = (7 * initial_rtt + reduced_rtt) / 8;
2103        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
2104        assert_eq!(r.bytes_in_flight(), 7000);
2105        assert_eq!(r.bytes_in_flight_duration(), initial_rtt + reduced_rtt);
2106        assert_eq!(r.min_rtt(), Some(reduced_rtt));
2107        assert_eq!(r.rtt(), expected_srtt);
2108
2109        let mut acked = RangeSet::default();
2110        acked.insert(11..12);
2111
2112        assert_eq!(
2113            r.on_ack_received(
2114                &acked,
2115                0,
2116                packet::Epoch::Application,
2117                HandshakeStatus::default(),
2118                now,
2119                None,
2120                "",
2121            )
2122            .unwrap(),
2123            OnAckReceivedOutcome {
2124                lost_packets: 0,
2125                lost_bytes: 0,
2126                acked_bytes: 6000,
2127                spurious_losses: 0,
2128            }
2129        );
2130
2131        // When enabled, the pacer adds a 25msec delay to the packet
2132        // sends which will be applied to the sent times tracked by
2133        // the recovery module, bringing down RTT to 15msec.
2134        let expected_min_rtt = if pacing_enabled &&
2135            !time_sent_set_to_now &&
2136            cfg!(feature = "internal")
2137        {
2138            reduced_rtt - Duration::from_millis(25)
2139        } else {
2140            reduced_rtt
2141        };
2142
2143        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 1);
2144        assert_eq!(r.bytes_in_flight(), 1000);
2145        assert_eq!(r.bytes_in_flight_duration(), initial_rtt + reduced_rtt);
2146        assert_eq!(r.min_rtt(), Some(expected_min_rtt));
2147
2148        let expected_srtt = (7 * expected_srtt + expected_min_rtt) / 8;
2149        assert_eq!(r.rtt(), expected_srtt);
2150
2151        let mut acked = RangeSet::default();
2152        acked.insert(12..13);
2153
2154        assert_eq!(
2155            r.on_ack_received(
2156                &acked,
2157                0,
2158                packet::Epoch::Application,
2159                HandshakeStatus::default(),
2160                now,
2161                None,
2162                "",
2163            )
2164            .unwrap(),
2165            OnAckReceivedOutcome {
2166                lost_packets: 0,
2167                lost_bytes: 0,
2168                acked_bytes: 1000,
2169                spurious_losses: 0,
2170            }
2171        );
2172
2173        // Pacer adds 50msec delay to the second packet, resulting in
2174        // an effective RTT of 0.
2175        let expected_min_rtt = if pacing_enabled &&
2176            !time_sent_set_to_now &&
2177            cfg!(feature = "internal")
2178        {
2179            Duration::from_millis(0)
2180        } else {
2181            reduced_rtt
2182        };
2183        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
2184        assert_eq!(r.bytes_in_flight(), 0);
2185        assert_eq!(r.bytes_in_flight_duration(), initial_rtt + reduced_rtt);
2186        assert_eq!(r.min_rtt(), Some(expected_min_rtt));
2187
2188        let expected_srtt = (7 * expected_srtt + expected_min_rtt) / 8;
2189        assert_eq!(r.rtt(), expected_srtt);
2190    }
2191
2192    #[rstest]
2193    // initial_cwnd / first_rtt == initial_pacing_rate.  Pacing is 1.0 * bw before
2194    // and after.
2195    #[case::bw_estimate_equal_after_first_rtt(1.0, 1.0)]
2196    // initial_cwnd / first_rtt < initial_pacing_rate.  Pacing decreases from 2 *
2197    // bw to 1.0 * bw.
2198    #[case::bw_estimate_decrease_after_first_rtt(2.0, 1.0)]
2199    // initial_cwnd / first_rtt > initial_pacing_rate from 0.5 * bw to 1.0 * bw.
2200    // Initial pacing remains 0.5 * bw because the initial_pacing_rate parameter
2201    // is used an upper bound for the pacing rate after the first RTT.
2202    // Pacing rate after the first ACK should be:
2203    // min(initial_pacing_rate_bytes_per_second, init_cwnd / first_rtt)
2204    #[case::bw_estimate_increase_after_first_rtt(0.5, 0.5)]
2205    #[cfg(feature = "internal")]
2206    fn initial_pacing_rate_override(
2207        #[case] initial_multipler: f64, #[case] expected_multiplier: f64,
2208    ) {
2209        let rtt = Duration::from_millis(50);
2210        let bw = Bandwidth::from_bytes_and_time_delta(12000, rtt);
2211        let initial_pacing_rate_hint = bw * initial_multipler;
2212        let expected_pacing_with_rtt_measurement = bw * expected_multiplier;
2213
2214        let cc_algorithm_name = "bbr2_gcongestion";
2215        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
2216        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
2217        cfg.set_custom_bbr_params(BbrParams {
2218            initial_pacing_rate_bytes_per_second: Some(
2219                initial_pacing_rate_hint.to_bytes_per_second(),
2220            ),
2221            ..Default::default()
2222        });
2223
2224        let mut r = Recovery::new(&cfg);
2225
2226        let mut now = Instant::now();
2227
2228        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
2229
2230        // send some packets.
2231        for i in 0..2 {
2232            let p = test_utils::helper_packet_sent(i, now, 1200);
2233            r.on_packet_sent(
2234                p,
2235                packet::Epoch::Application,
2236                HandshakeStatus::default(),
2237                now,
2238                "",
2239            );
2240        }
2241
2242        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
2243        assert_eq!(r.bytes_in_flight(), 2400);
2244        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
2245
2246        // Initial pacing rate matches the override value.
2247        assert_eq!(
2248            r.pacing_rate(),
2249            initial_pacing_rate_hint.to_bytes_per_second()
2250        );
2251        assert_eq!(r.get_packet_send_time(now), now);
2252
2253        assert_eq!(r.cwnd(), 12000);
2254        assert_eq!(r.cwnd_available(), 9600);
2255
2256        // Wait 1 rtt for ACK.
2257        now += rtt;
2258
2259        let mut acked = RangeSet::default();
2260        acked.insert(0..2);
2261
2262        assert_eq!(
2263            r.on_ack_received(
2264                &acked,
2265                10,
2266                packet::Epoch::Application,
2267                HandshakeStatus::default(),
2268                now,
2269                None,
2270                "",
2271            )
2272            .unwrap(),
2273            OnAckReceivedOutcome {
2274                lost_packets: 0,
2275                lost_bytes: 0,
2276                acked_bytes: 2400,
2277                spurious_losses: 0,
2278            }
2279        );
2280
2281        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
2282        assert_eq!(r.bytes_in_flight(), 0);
2283        assert_eq!(r.bytes_in_flight_duration(), rtt);
2284        assert_eq!(r.rtt(), rtt);
2285
2286        // Pacing rate is recalculated based on initial cwnd when the
2287        // first RTT estimate is available.
2288        assert_eq!(
2289            r.pacing_rate(),
2290            expected_pacing_with_rtt_measurement.to_bytes_per_second()
2291        );
2292    }
2293
2294    #[rstest]
2295    fn validate_ack_range_on_ack_received(
2296        #[values("cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
2297    ) {
2298        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
2299        cfg.set_cc_algorithm_name(cc_algorithm_name).unwrap();
2300
2301        let epoch = packet::Epoch::Application;
2302        let mut r = Recovery::new(&cfg);
2303        let mut now = Instant::now();
2304        assert_eq!(r.sent_packets_len(epoch), 0);
2305
2306        // Send 4 packets
2307        let pkt_size = 1000;
2308        let pkt_count = 4;
2309        for pkt_num in 0..pkt_count {
2310            let sent = test_utils::helper_packet_sent(pkt_num, now, pkt_size);
2311            r.on_packet_sent(sent, epoch, HandshakeStatus::default(), now, "");
2312        }
2313        assert_eq!(r.sent_packets_len(epoch), pkt_count as usize);
2314        assert_eq!(r.bytes_in_flight(), pkt_count as usize * pkt_size);
2315        assert!(r.get_largest_acked_on_epoch(epoch).is_none());
2316        assert_eq!(r.largest_sent_pkt_num_on_path(epoch).unwrap(), 3);
2317
2318        // Wait for 10ms.
2319        now += Duration::from_millis(10);
2320
2321        // ACK 2 packets
2322        let mut acked = RangeSet::default();
2323        acked.insert(0..2);
2324
2325        assert_eq!(
2326            r.on_ack_received(
2327                &acked,
2328                25,
2329                epoch,
2330                HandshakeStatus::default(),
2331                now,
2332                None,
2333                "",
2334            )
2335            .unwrap(),
2336            OnAckReceivedOutcome {
2337                lost_packets: 0,
2338                lost_bytes: 0,
2339                acked_bytes: 2 * 1000,
2340                spurious_losses: 0,
2341            }
2342        );
2343
2344        assert_eq!(r.sent_packets_len(epoch), 2);
2345        assert_eq!(r.bytes_in_flight(), 2 * 1000);
2346
2347        assert_eq!(r.get_largest_acked_on_epoch(epoch).unwrap(), 1);
2348        assert_eq!(r.largest_sent_pkt_num_on_path(epoch).unwrap(), 3);
2349
2350        // ACK large range
2351        let mut acked = RangeSet::default();
2352        acked.insert(0..10);
2353        assert_eq!(
2354            r.on_ack_received(
2355                &acked,
2356                25,
2357                epoch,
2358                HandshakeStatus::default(),
2359                now,
2360                None,
2361                "",
2362            )
2363            .unwrap(),
2364            OnAckReceivedOutcome {
2365                lost_packets: 0,
2366                lost_bytes: 0,
2367                acked_bytes: 2 * 1000,
2368                spurious_losses: 0,
2369            }
2370        );
2371        assert_eq!(r.sent_packets_len(epoch), 0);
2372        assert_eq!(r.bytes_in_flight(), 0);
2373
2374        assert_eq!(r.get_largest_acked_on_epoch(epoch).unwrap(), 3);
2375        assert_eq!(r.largest_sent_pkt_num_on_path(epoch).unwrap(), 3);
2376    }
2377
2378    #[rstest]
2379    fn pmtud_loss_on_timer(
2380        #[values("reno", "cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
2381    ) {
2382        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
2383        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
2384
2385        let mut r = Recovery::new(&cfg);
2386        assert_eq!(r.cwnd(), 12000);
2387
2388        let mut now = Instant::now();
2389
2390        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
2391
2392        // Start by sending a few packets.
2393        let p = Sent {
2394            pkt_num: 0,
2395            frames: smallvec![],
2396            time_sent: now,
2397            time_acked: None,
2398            time_lost: None,
2399            size: 1000,
2400            ack_eliciting: true,
2401            in_flight: true,
2402            delivered: 0,
2403            delivered_time: now,
2404            first_sent_time: now,
2405            is_app_limited: false,
2406            tx_in_flight: 0,
2407            lost: 0,
2408            has_data: false,
2409            is_pmtud_probe: false,
2410        };
2411
2412        r.on_packet_sent(
2413            p,
2414            packet::Epoch::Application,
2415            HandshakeStatus::default(),
2416            now,
2417            "",
2418        );
2419
2420        assert_eq!(r.in_flight_count(packet::Epoch::Application), 1);
2421        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 1);
2422        assert_eq!(r.bytes_in_flight(), 1000);
2423        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
2424
2425        let p = Sent {
2426            pkt_num: 1,
2427            frames: smallvec![],
2428            time_sent: now,
2429            time_acked: None,
2430            time_lost: None,
2431            size: 1000,
2432            ack_eliciting: true,
2433            in_flight: true,
2434            delivered: 0,
2435            delivered_time: now,
2436            first_sent_time: now,
2437            is_app_limited: false,
2438            tx_in_flight: 0,
2439            lost: 0,
2440            has_data: false,
2441            is_pmtud_probe: true,
2442        };
2443
2444        r.on_packet_sent(
2445            p,
2446            packet::Epoch::Application,
2447            HandshakeStatus::default(),
2448            now,
2449            "",
2450        );
2451
2452        assert_eq!(r.in_flight_count(packet::Epoch::Application), 2);
2453
2454        let p = Sent {
2455            pkt_num: 2,
2456            frames: smallvec![],
2457            time_sent: now,
2458            time_acked: None,
2459            time_lost: None,
2460            size: 1000,
2461            ack_eliciting: true,
2462            in_flight: true,
2463            delivered: 0,
2464            delivered_time: now,
2465            first_sent_time: now,
2466            is_app_limited: false,
2467            tx_in_flight: 0,
2468            lost: 0,
2469            has_data: false,
2470            is_pmtud_probe: false,
2471        };
2472
2473        r.on_packet_sent(
2474            p,
2475            packet::Epoch::Application,
2476            HandshakeStatus::default(),
2477            now,
2478            "",
2479        );
2480
2481        assert_eq!(r.in_flight_count(packet::Epoch::Application), 3);
2482
2483        // Wait for 10ms.
2484        now += Duration::from_millis(10);
2485
2486        // Only the first  packets and the last one are acked.
2487        let mut acked = RangeSet::default();
2488        acked.insert(0..1);
2489        acked.insert(2..3);
2490
2491        assert_eq!(
2492            r.on_ack_received(
2493                &acked,
2494                25,
2495                packet::Epoch::Application,
2496                HandshakeStatus::default(),
2497                now,
2498                None,
2499                "",
2500            )
2501            .unwrap(),
2502            OnAckReceivedOutcome {
2503                lost_packets: 0,
2504                lost_bytes: 0,
2505                acked_bytes: 2 * 1000,
2506                spurious_losses: 0,
2507            }
2508        );
2509
2510        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
2511        assert_eq!(r.bytes_in_flight(), 1000);
2512        assert_eq!(r.bytes_in_flight_duration(), Duration::from_millis(10));
2513        assert_eq!(r.lost_count(), 0);
2514
2515        // Wait until loss detection timer expires.
2516        now = r.loss_detection_timer().unwrap();
2517
2518        // Packet is declared lost.
2519        r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
2520        assert_eq!(r.loss_probes(packet::Epoch::Application), 0);
2521
2522        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
2523        assert_eq!(r.in_flight_count(packet::Epoch::Application), 0);
2524        assert_eq!(r.bytes_in_flight(), 0);
2525        assert_eq!(r.bytes_in_flight_duration(), Duration::from_micros(11250));
2526        assert_eq!(r.cwnd(), 12000);
2527
2528        assert_eq!(r.lost_count(), 0);
2529
2530        // Wait 1 RTT.
2531        now += r.rtt();
2532
2533        assert_eq!(
2534            r.detect_lost_packets_for_test(packet::Epoch::Application, now),
2535            (0, 0)
2536        );
2537
2538        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
2539        assert_eq!(r.in_flight_count(packet::Epoch::Application), 0);
2540        assert_eq!(r.bytes_in_flight(), 0);
2541        assert_eq!(r.bytes_in_flight_duration(), Duration::from_micros(11250));
2542        assert_eq!(r.lost_count(), 0);
2543        assert_eq!(r.startup_exit(), None);
2544    }
2545
2546    // Modeling delivery_rate for gcongestion is non-trivial so we only test the
2547    // congestion specific algorithms.
2548    #[rstest]
2549    fn congestion_delivery_rate(
2550        #[values("reno", "cubic", "bbr2")] cc_algorithm_name: &str,
2551    ) {
2552        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
2553        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
2554
2555        let mut r = Recovery::new(&cfg);
2556        assert_eq!(r.cwnd(), 12000);
2557
2558        let now = Instant::now();
2559
2560        let mut total_bytes_sent = 0;
2561        for pn in 0..10 {
2562            // Start by sending a few packets.
2563            let bytes = 1000;
2564            let sent = test_utils::helper_packet_sent(pn, now, bytes);
2565            r.on_packet_sent(
2566                sent,
2567                packet::Epoch::Application,
2568                HandshakeStatus::default(),
2569                now,
2570                "",
2571            );
2572
2573            total_bytes_sent += bytes;
2574        }
2575
2576        // Ack
2577        let interval = Duration::from_secs(10);
2578        let mut acked = RangeSet::default();
2579        acked.insert(0..10);
2580        assert_eq!(
2581            r.on_ack_received(
2582                &acked,
2583                25,
2584                packet::Epoch::Application,
2585                HandshakeStatus::default(),
2586                now + interval,
2587                None,
2588                "",
2589            )
2590            .unwrap(),
2591            OnAckReceivedOutcome {
2592                lost_packets: 0,
2593                lost_bytes: 0,
2594                acked_bytes: total_bytes_sent,
2595                spurious_losses: 0,
2596            }
2597        );
2598        assert_eq!(r.delivery_rate().to_bytes_per_second(), 1000);
2599        assert_eq!(r.min_rtt().unwrap(), interval);
2600        // delivery rate should be in units bytes/sec
2601        assert_eq!(
2602            total_bytes_sent as u64 / interval.as_secs(),
2603            r.delivery_rate().to_bytes_per_second()
2604        );
2605        assert_eq!(r.startup_exit(), None);
2606    }
2607
2608    #[rstest]
2609    fn acks_with_no_retransmittable_data(
2610        #[values("reno", "cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
2611    ) {
2612        let rtt = Duration::from_millis(100);
2613
2614        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
2615        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
2616
2617        let mut r = Recovery::new(&cfg);
2618
2619        let mut now = Instant::now();
2620
2621        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
2622
2623        let mut next_packet = 0;
2624        // send some packets.
2625        for _ in 0..3 {
2626            let p = test_utils::helper_packet_sent(next_packet, now, 1200);
2627            next_packet += 1;
2628            r.on_packet_sent(
2629                p,
2630                packet::Epoch::Application,
2631                HandshakeStatus::default(),
2632                now,
2633                "",
2634            );
2635        }
2636
2637        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 3);
2638        assert_eq!(r.bytes_in_flight(), 3600);
2639        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
2640
2641        assert_eq!(
2642            r.pacing_rate(),
2643            if cc_algorithm_name == "bbr2_gcongestion" {
2644                103963
2645            } else {
2646                0
2647            },
2648        );
2649        assert_eq!(r.get_packet_send_time(now), now);
2650        assert_eq!(r.cwnd(), 12000);
2651        assert_eq!(r.cwnd_available(), 8400);
2652
2653        // Wait 1 rtt for ACK.
2654        now += rtt;
2655
2656        let mut acked = RangeSet::default();
2657        acked.insert(0..3);
2658
2659        assert_eq!(
2660            r.on_ack_received(
2661                &acked,
2662                10,
2663                packet::Epoch::Application,
2664                HandshakeStatus::default(),
2665                now,
2666                None,
2667                "",
2668            )
2669            .unwrap(),
2670            OnAckReceivedOutcome {
2671                lost_packets: 0,
2672                lost_bytes: 0,
2673                acked_bytes: 3600,
2674                spurious_losses: 0,
2675            }
2676        );
2677
2678        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
2679        assert_eq!(r.bytes_in_flight(), 0);
2680        assert_eq!(r.bytes_in_flight_duration(), rtt);
2681        assert_eq!(r.rtt(), rtt);
2682
2683        // Pacing rate is recalculated based on initial cwnd when the
2684        // first RTT estimate is available.
2685        assert_eq!(
2686            r.pacing_rate(),
2687            if cc_algorithm_name == "bbr2_gcongestion" {
2688                120000
2689            } else {
2690                0
2691            },
2692        );
2693
2694        // Send some no "in_flight" packets
2695        for iter in 3..1000 {
2696            let mut p = test_utils::helper_packet_sent(next_packet, now, 1200);
2697            // `in_flight = false` marks packets as if they only contained ACK
2698            // frames.
2699            p.in_flight = false;
2700            next_packet += 1;
2701            r.on_packet_sent(
2702                p,
2703                packet::Epoch::Application,
2704                HandshakeStatus::default(),
2705                now,
2706                "",
2707            );
2708
2709            now += rtt;
2710
2711            let mut acked = RangeSet::default();
2712            acked.insert(iter..(iter + 1));
2713
2714            assert_eq!(
2715                r.on_ack_received(
2716                    &acked,
2717                    10,
2718                    packet::Epoch::Application,
2719                    HandshakeStatus::default(),
2720                    now,
2721                    None,
2722                    "",
2723                )
2724                .unwrap(),
2725                OnAckReceivedOutcome {
2726                    lost_packets: 0,
2727                    lost_bytes: 0,
2728                    acked_bytes: 0,
2729                    spurious_losses: 0,
2730                }
2731            );
2732
2733            // Verify that connection has not exited startup.
2734            assert_eq!(r.startup_exit(), None, "{iter}");
2735
2736            // Unchanged metrics.
2737            assert_eq!(
2738                r.sent_packets_len(packet::Epoch::Application),
2739                0,
2740                "{iter}"
2741            );
2742            assert_eq!(r.bytes_in_flight(), 0, "{iter}");
2743            assert_eq!(r.bytes_in_flight_duration(), rtt, "{iter}");
2744            assert_eq!(
2745                r.pacing_rate(),
2746                if cc_algorithm_name == "bbr2_gcongestion" ||
2747                    cc_algorithm_name == "bbr2"
2748                {
2749                    120000
2750                } else {
2751                    0
2752                },
2753                "{iter}"
2754            );
2755        }
2756    }
2757}
2758
2759mod bandwidth;
2760mod bytes_in_flight;
2761mod congestion;
2762mod gcongestion;
2763mod rtt;