Skip to main content

quiche/recovery/
mod.rs

1// Copyright (C) 2018-2019, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::str::FromStr;
28use std::time::Duration;
29use std::time::Instant;
30
31use crate::frame;
32use crate::packet;
33use crate::ranges::RangeSet;
34pub(crate) use crate::recovery::bandwidth::Bandwidth;
35use crate::Config;
36use crate::Result;
37
38#[cfg(feature = "qlog")]
39use qlog::events::EventData;
40#[cfg(feature = "qlog")]
41use serde::Serialize;
42
43use smallvec::SmallVec;
44
45use self::congestion::recovery::LegacyRecovery;
46use self::gcongestion::GRecovery;
47pub use gcongestion::BbrBwLoReductionStrategy;
48pub use gcongestion::BbrParams;
49
// Loss Recovery

// Initial value of the packet reordering threshold.
//
// NOTE(review): presumably the starting point for RFC 9002's kPacketThreshold
// (the RFC recommends 3) -- confirm against the loss detection code.
const INITIAL_PACKET_THRESHOLD: u64 = 3;

// Upper bound for the packet reordering threshold.
//
// NOTE(review): the name suggests the threshold can grow from
// INITIAL_PACKET_THRESHOLD up to this cap; the code that does so is not part
// of this file section.
const MAX_PACKET_THRESHOLD: u64 = 20;
54
55// Time threshold used to calculate the loss time.
56//
57// https://www.rfc-editor.org/rfc/rfc9002.html#section-6.1.2
58const INITIAL_TIME_THRESHOLD: f64 = 9.0 / 8.0;
59
60// Reduce the sensitivity to packet reordering after the first reordering event.
61//
62// Packet reorder is not a real loss event so quickly reduce the sensitivity to
// avoid penalizing subsequent packet reordering.
64//
65// https://www.rfc-editor.org/rfc/rfc9002.html#section-6.1.2
66//
67// Implementations MAY experiment with absolute thresholds, thresholds from
68// previous connections, adaptive thresholds, or the including of RTT variation.
69// Smaller thresholds reduce reordering resilience and increase spurious
70// retransmissions, and larger thresholds increase loss detection delay.
71const PACKET_REORDER_TIME_THRESHOLD: f64 = 5.0 / 4.0;
72
73// # Experiment: enable_relaxed_loss_threshold
74//
75// Time threshold overhead used to calculate the loss time.
76//
// The actual threshold is calculated as 1 + INITIAL_TIME_THRESHOLD_OVERHEAD and
// is equivalent to INITIAL_TIME_THRESHOLD.
79const INITIAL_TIME_THRESHOLD_OVERHEAD: f64 = 1.0 / 8.0;
80// # Experiment: enable_relaxed_loss_threshold
81//
82// The factor by which to increase the time threshold on spurious loss.
83const TIME_THRESHOLD_OVERHEAD_MULTIPLIER: f64 = 2.0;
84
// NOTE(review): presumably the timer granularity (kGranularity in RFC 9002,
// which recommends 1ms) -- confirm at the use sites.
const GRANULARITY: Duration = Duration::from_millis(1);

// NOTE(review): judging by the name, the maximum number of probe packets sent
// when a PTO fires -- confirm at the use sites.
const MAX_PTO_PROBES_COUNT: usize = 2;

// NOTE(review): judging by the name, the minimum congestion window expressed
// in packets (RFC 9002's kMinimumWindow is two datagrams) -- confirm.
const MINIMUM_WINDOW_PACKETS: usize = 2;

// NOTE(review): judging by the name, the multiplicative window reduction
// applied on loss (kLossReductionFactor in RFC 9002) -- confirm.
const LOSS_REDUCTION_FACTOR: f64 = 0.5;
92
93// How many non ACK eliciting packets we send before including a PING to solicit
94// an ACK.
95pub(super) const MAX_OUTSTANDING_NON_ACK_ELICITING: usize = 24;
96
97// The upper cap on the exponent when using exponential backoff for probes. With
98// a value of 20, the maximum possible value is 2^20 = 1,048,576 times the
99// minimum PTO. This prevents arithmetic overflow.
100const MAX_PTO_EXPONENT: u32 = 20;
101
/// Holds the deadline of the loss detection timer, if one is set.
#[derive(Default)]
struct LossDetectionTimer {
    /// Instant at which the timer fires; `None` while no deadline is set.
    time: Option<Instant>,
}

impl LossDetectionTimer {
    /// Sets the deadline to `timeout`, replacing any previous one.
    fn update(&mut self, timeout: Instant) {
        self.time = Some(timeout);
    }

    /// Removes the deadline.
    fn clear(&mut self) {
        self.time = None;
    }
}

impl std::fmt::Debug for LossDetectionTimer {
    /// Renders the timer as the time remaining until the deadline, `exp` once
    /// the deadline has been reached, or `none` when no deadline is set.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let deadline = match self.time {
            Some(deadline) => deadline,
            None => return f.write_str("none"),
        };

        // The remaining time is measured from the moment of formatting.
        let now = Instant::now();
        if deadline <= now {
            return f.write_str("exp");
        }

        write!(f, "{:?}", deadline.duration_since(now))
    }
}
133
/// Recovery-related settings, detached from the connection-wide [`Config`].
///
/// [`RecoveryConfig::from_config`] copies every field from the [`Config`]
/// field of the same name, except `max_ack_delay`, which it initializes to
/// zero.
#[derive(Clone, Copy, PartialEq)]
pub struct RecoveryConfig {
    /// RTT assumed before any measurement is available.
    // NOTE(review): inferred from the name -- confirm at the use sites.
    pub initial_rtt: Duration,
    pub max_send_udp_payload_size: usize,
    /// Starts at zero when built through `from_config()`.
    // NOTE(review): `RecoveryOps::update_max_ack_delay()` looks like the way
    // the real value is supplied later -- confirm.
    pub max_ack_delay: Duration,
    /// Selected congestion control algorithm.
    pub cc_algorithm: CongestionControlAlgorithm,
    /// Optional BBR tuning overrides.
    pub custom_bbr_params: Option<BbrParams>,
    pub hystart: bool,
    pub pacing: bool,
    // NOTE(review): unit is not visible here (presumably bytes per second).
    pub max_pacing_rate: Option<u64>,
    pub initial_congestion_window_packets: usize,
    /// Toggles the `enable_relaxed_loss_threshold` experiment.
    pub enable_relaxed_loss_threshold: bool,
    pub enable_cubic_idle_restart_fix: bool,
}
148
149impl RecoveryConfig {
150    pub fn from_config(config: &Config) -> Self {
151        Self {
152            initial_rtt: config.initial_rtt,
153            max_send_udp_payload_size: config.max_send_udp_payload_size,
154            max_ack_delay: Duration::ZERO,
155            cc_algorithm: config.cc_algorithm,
156            custom_bbr_params: config.custom_bbr_params,
157            hystart: config.hystart,
158            pacing: config.pacing,
159            max_pacing_rate: config.max_pacing_rate,
160            initial_congestion_window_packets: config
161                .initial_congestion_window_packets,
162            enable_relaxed_loss_threshold: config.enable_relaxed_loss_threshold,
163            enable_cubic_idle_restart_fix: config.enable_cubic_idle_restart_fix,
164        }
165    }
166}
167
#[enum_dispatch::enum_dispatch(RecoveryOps)]
// NOTE(review): the lint is silenced instead of boxing a variant; the size
// difference between the two variants is not visible from this file.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
/// Loss recovery state, dispatching [`RecoveryOps`] calls to either the legacy
/// or the gcongestion implementation.
///
/// [`Recovery::new_with_config`] decides which variant is used.
pub(crate) enum Recovery {
    /// Backed by [`LegacyRecovery`].
    Legacy(LegacyRecovery),
    /// Backed by [`GRecovery`].
    GCongestion(GRecovery),
}
175
/// Summary returned by [`RecoveryOps::on_ack_received`].
///
/// All fields are plain counters, so the type supports total equality
/// (`Eq`) in addition to `PartialEq`.
// NOTE(review): the field meanings below are inferred from their names;
// confirm against the implementations of `on_ack_received()`.
#[derive(Debug, Default, PartialEq, Eq)]
pub struct OnAckReceivedOutcome {
    /// Packets declared lost while processing the ACK.
    pub lost_packets: usize,
    /// Bytes declared lost while processing the ACK.
    pub lost_bytes: usize,
    /// Bytes acknowledged by the ACK.
    pub acked_bytes: usize,
    /// Losses found to be spurious while processing the ACK.
    pub spurious_losses: usize,
}
183
/// Summary returned by [`RecoveryOps::on_loss_detection_timeout`].
///
/// Derives `PartialEq`/`Eq` like [`OnAckReceivedOutcome`], so outcomes can be
/// compared directly (for example with `assert_eq!`).
// NOTE(review): the field meanings below are inferred from their names;
// confirm against the implementations of `on_loss_detection_timeout()`.
#[derive(Debug, Default, PartialEq, Eq)]
pub struct OnLossDetectionTimeoutOutcome {
    /// Packets declared lost when the timer fired.
    pub lost_packets: usize,
    /// Bytes declared lost when the timer fired.
    pub lost_bytes: usize,
}
189
#[enum_dispatch::enum_dispatch]
/// Api for the Recovery implementation
///
/// [`Recovery`] is annotated with `enum_dispatch(RecoveryOps)`, so calls made
/// on it are forwarded to whichever variant is active.
pub trait RecoveryOps {
    // NOTE(review): `lost_count` and `bytes_lost` look like cumulative loss
    // counters (packets and bytes respectively) -- confirm in the
    // implementations.
    fn lost_count(&self) -> usize;
    fn bytes_lost(&self) -> u64;

    /// Returns whether or not we should elicit an ACK even if we wouldn't
    /// otherwise have constructed an ACK eliciting packet.
    fn should_elicit_ack(&self, epoch: packet::Epoch) -> bool;

    // NOTE(review): presumably yields, one at a time, the frames of packets
    // that were acknowledged in `epoch` -- confirm.
    fn next_acked_frame(&mut self, epoch: packet::Epoch) -> Option<frame::Frame>;

    // NOTE(review): presumably yields, one at a time, the frames of packets
    // that were declared lost in `epoch` -- confirm.
    fn next_lost_frame(&mut self, epoch: packet::Epoch) -> Option<frame::Frame>;

    fn get_largest_acked_on_epoch(&self, epoch: packet::Epoch) -> Option<u64>;
    fn has_lost_frames(&self, epoch: packet::Epoch) -> bool;
    fn loss_probes(&self, epoch: packet::Epoch) -> usize;
    #[cfg(test)]
    fn inc_loss_probes(&mut self, epoch: packet::Epoch);
    #[cfg(test)]
    fn lost_frames_count(&self, epoch: packet::Epoch) -> usize;

    fn ping_sent(&mut self, epoch: packet::Epoch);

    /// Notifies the recovery state that `pkt` was sent in `epoch`.
    fn on_packet_sent(
        &mut self, pkt: Sent, epoch: packet::Epoch,
        handshake_status: HandshakeStatus, now: Instant, trace_id: &str,
    );
    fn get_packet_send_time(&self, now: Instant) -> Instant;

    /// Processes an ACK covering the packet number `ranges` in `epoch` and
    /// reports what changed as an [`OnAckReceivedOutcome`].
    // NOTE(review): the unit of `ack_delay` and the role of `skip_pn` are not
    // visible from this declaration -- confirm in the implementations.
    #[allow(clippy::too_many_arguments)]
    fn on_ack_received(
        &mut self, ranges: &RangeSet, ack_delay: u64, epoch: packet::Epoch,
        handshake_status: HandshakeStatus, now: Instant, skip_pn: Option<u64>,
        trace_id: &str,
    ) -> Result<OnAckReceivedOutcome>;

    /// Handles expiry of the loss detection timer and reports the outcome as
    /// an [`OnLossDetectionTimeoutOutcome`].
    fn on_loss_detection_timeout(
        &mut self, handshake_status: HandshakeStatus, now: Instant,
        trace_id: &str,
    ) -> OnLossDetectionTimeoutOutcome;
    fn on_pkt_num_space_discarded(
        &mut self, epoch: packet::Epoch, handshake_status: HandshakeStatus,
        now: Instant,
    );
    // NOTE(review): the returned pair is presumably (lost packets, lost
    // bytes), mirroring the outcome structs -- confirm.
    fn on_path_change(
        &mut self, epoch: packet::Epoch, now: Instant, _trace_id: &str,
    ) -> (usize, usize);
    /// Deadline of the loss detection timer, if one is set.
    fn loss_detection_timer(&self) -> Option<Instant>;
    fn cwnd(&self) -> usize;
    fn cwnd_available(&self) -> usize;
    fn rtt(&self) -> Duration;

    fn min_rtt(&self) -> Option<Duration>;

    fn max_rtt(&self) -> Option<Duration>;

    fn rttvar(&self) -> Duration;

    fn pto(&self) -> Duration;

    /// The most recent data delivery rate estimate.
    fn delivery_rate(&self) -> Bandwidth;

    /// Maximum bandwidth estimate, if one is available.
    fn max_bandwidth(&self) -> Option<Bandwidth>;

    /// Statistics from when a CCA first exited the startup phase.
    fn startup_exit(&self) -> Option<StartupExit>;

    fn max_datagram_size(&self) -> usize;

    fn pmtud_update_max_datagram_size(&mut self, new_max_datagram_size: usize);

    fn update_max_datagram_size(&mut self, new_max_datagram_size: usize);

    fn on_app_limited(&mut self);

    // Since a recovery module is path specific, this tracks the largest packet
    // sent per path.
    #[cfg(test)]
    fn largest_sent_pkt_num_on_path(&self, epoch: packet::Epoch) -> Option<u64>;

    #[cfg(test)]
    fn app_limited(&self) -> bool;

    // Number of sent packets currently tracked for `epoch`.
    #[cfg(test)]
    fn sent_packets_len(&self, epoch: packet::Epoch) -> usize;

    /// Bytes currently counted as in flight.
    fn bytes_in_flight(&self) -> usize;

    fn bytes_in_flight_duration(&self) -> Duration;

    #[cfg(test)]
    fn in_flight_count(&self, epoch: packet::Epoch) -> usize;

    #[cfg(test)]
    fn pacing_rate(&self) -> u64;

    #[cfg(test)]
    fn pto_count(&self) -> u32;

    // This value might be `None` when experiment `enable_relaxed_loss_threshold`
    // is enabled for gcongestion
    #[cfg(test)]
    fn pkt_thresh(&self) -> Option<u64>;

    #[cfg(test)]
    fn time_thresh(&self) -> f64;

    #[cfg(test)]
    fn lost_spurious_count(&self) -> usize;

    // NOTE(review): the returned pair is presumably (lost packets, lost
    // bytes) -- confirm.
    #[cfg(test)]
    fn detect_lost_packets_for_test(
        &mut self, epoch: packet::Epoch, now: Instant,
    ) -> (usize, usize);

    fn update_app_limited(&mut self, v: bool);

    fn delivery_rate_update_app_limited(&mut self, v: bool);

    fn update_max_ack_delay(&mut self, max_ack_delay: Duration);

    #[cfg(feature = "qlog")]
    fn state_str(&self, now: Instant) -> &'static str;

    // Used by `Recovery::maybe_qlog()`: yields the event data to log, if any.
    #[cfg(feature = "qlog")]
    fn get_updated_qlog_event_data(&mut self) -> Option<EventData>;

    // Used by `Recovery::maybe_qlog()`: yields the congestion control state
    // name to log, if any.
    #[cfg(feature = "qlog")]
    fn get_updated_qlog_cc_state(&mut self, now: Instant)
        -> Option<&'static str>;

    fn send_quantum(&self) -> usize;

    /// The pacer's decision on when the next packet may be released.
    fn get_next_release_time(&self) -> ReleaseDecision;

    /// Whether the gcongestion implementation is the one in use.
    fn gcongestion_enabled(&self) -> bool;
}
330
331impl Recovery {
332    pub fn new_with_config(recovery_config: &RecoveryConfig) -> Self {
333        let grecovery = GRecovery::new(recovery_config);
334        if let Some(grecovery) = grecovery {
335            Recovery::from(grecovery)
336        } else {
337            Recovery::from(LegacyRecovery::new_with_config(recovery_config))
338        }
339    }
340
341    #[cfg(feature = "qlog")]
342    pub fn maybe_qlog(
343        &mut self, qlog: &mut qlog::streamer::QlogStreamer, now: Instant,
344    ) {
345        if let Some(ev_data) = self.get_updated_qlog_event_data() {
346            qlog.add_event_data_with_instant(ev_data, now).ok();
347        }
348
349        if let Some(cc_state) = self.get_updated_qlog_cc_state(now) {
350            let ev_data = EventData::QuicCongestionStateUpdated(
351                qlog::events::quic::CongestionStateUpdated {
352                    old: None,
353                    new: cc_state.to_string(),
354                    trigger: None,
355                },
356            );
357
358            qlog.add_event_data_with_instant(ev_data, now).ok();
359        }
360    }
361
362    #[cfg(test)]
363    pub fn new(config: &Config) -> Self {
364        Self::new_with_config(&RecoveryConfig::from_config(config))
365    }
366}
367
/// Available congestion control algorithms.
///
/// This enum lists the currently available congestion control algorithms.
/// The string forms mentioned on each variant are the names accepted by the
/// [`FromStr`] implementation.
// NOTE(review): discriminants 2 and 3 are unassigned. With `#[repr(C)]` the
// numeric values are presumably part of the C-facing interface, so they
// should not be renumbered -- confirm.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[repr(C)]
pub enum CongestionControlAlgorithm {
    /// Reno congestion control algorithm. `reno` in a string form.
    Reno            = 0,
    /// CUBIC congestion control algorithm (default). `cubic` in a string form.
    CUBIC           = 1,
    /// BBRv2 congestion control algorithm implementation from gcongestion
    /// branch. `bbr2_gcongestion` in a string form; `bbr` and `bbr2` are also
    /// accepted and select this same variant.
    Bbr2Gcongestion = 4,
}
383
384impl FromStr for CongestionControlAlgorithm {
385    type Err = crate::Error;
386
387    /// Converts a string to `CongestionControlAlgorithm`.
388    ///
389    /// If `name` is not valid, `Error::CongestionControl` is returned.
390    fn from_str(name: &str) -> std::result::Result<Self, Self::Err> {
391        match name {
392            "reno" => Ok(CongestionControlAlgorithm::Reno),
393            "cubic" => Ok(CongestionControlAlgorithm::CUBIC),
394            "bbr" => Ok(CongestionControlAlgorithm::Bbr2Gcongestion),
395            "bbr2" => Ok(CongestionControlAlgorithm::Bbr2Gcongestion),
396            "bbr2_gcongestion" => Ok(CongestionControlAlgorithm::Bbr2Gcongestion),
397            _ => Err(crate::Error::CongestionControl),
398        }
399    }
400}
401
/// Bookkeeping for a single sent packet, as handed to
/// [`RecoveryOps::on_packet_sent`].
#[derive(Clone)]
pub struct Sent {
    /// Packet number of the packet.
    pub pkt_num: u64,

    /// Frames carried by the packet.
    pub frames: SmallVec<[frame::Frame; 1]>,

    /// When the packet was sent.
    pub time_sent: Instant,

    /// When the packet was acknowledged, if it has been.
    pub time_acked: Option<Instant>,

    /// When the packet was declared lost, if it has been.
    pub time_lost: Option<Instant>,

    /// Size of the packet.
    // NOTE(review): presumably in bytes -- the tests in this file send
    // packets of size 1000 and expect `bytes_in_flight()` to grow by 1000.
    pub size: usize,

    /// Whether the packet is ACK eliciting.
    pub ack_eliciting: bool,

    /// Whether the packet counts as in flight.
    pub in_flight: bool,

    // NOTE(review): `delivered`, `delivered_time`, `first_sent_time`,
    // `is_app_limited`, `tx_in_flight` and `lost` look like delivery rate
    // estimation state captured when the packet was sent -- confirm against
    // the code that fills them in.
    pub delivered: usize,

    pub delivered_time: Instant,

    pub first_sent_time: Instant,

    pub is_app_limited: bool,

    pub tx_in_flight: usize,

    pub lost: u64,

    // NOTE(review): presumably set when the packet carries application
    // data -- confirm.
    pub has_data: bool,

    /// Whether the packet is a PMTUD probe.
    pub is_pmtud_probe: bool,
}
436
437impl std::fmt::Debug for Sent {
438    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
439        write!(f, "pkt_num={:?} ", self.pkt_num)?;
440        write!(f, "pkt_sent_time={:?} ", self.time_sent)?;
441        write!(f, "pkt_size={:?} ", self.size)?;
442        write!(f, "delivered={:?} ", self.delivered)?;
443        write!(f, "delivered_time={:?} ", self.delivered_time)?;
444        write!(f, "first_sent_time={:?} ", self.first_sent_time)?;
445        write!(f, "is_app_limited={} ", self.is_app_limited)?;
446        write!(f, "tx_in_flight={} ", self.tx_in_flight)?;
447        write!(f, "lost={} ", self.lost)?;
448        write!(f, "has_data={} ", self.has_data)?;
449        write!(f, "is_pmtud_probe={}", self.is_pmtud_probe)?;
450
451        Ok(())
452    }
453}
454
/// Handshake progress flags handed to the recovery callbacks (for example
/// [`RecoveryOps::on_packet_sent`]).
// NOTE(review): the exact meaning of each flag is defined by the code that
// fills this struct in, which is outside this file.
#[derive(Clone, Copy, Debug)]
pub struct HandshakeStatus {
    pub has_handshake_keys: bool,

    pub peer_verified_address: bool,

    pub completed: bool,
}

/// Test-only default with every flag set to `true`.
#[cfg(test)]
impl Default for HandshakeStatus {
    fn default() -> Self {
        Self {
            has_handshake_keys: true,
            peer_verified_address: true,
            completed: true,
        }
    }
}
476
// We don't need to log all qlog metrics every time there is a recovery event.
// Instead, we can log only the MetricsUpdated event data fields that we care
// about, only when they change. To support this, the QlogMetrics structure
// keeps a running picture of the fields.
//
// `maybe_update()` compares a fresh snapshot against this running picture and
// reports the RTT fields in milliseconds.
#[derive(Default)]
#[cfg(feature = "qlog")]
struct QlogMetrics {
    min_rtt: Duration,
    smoothed_rtt: Duration,
    latest_rtt: Duration,
    rttvar: Duration,
    cwnd: u64,
    bytes_in_flight: u64,
    ssthresh: Option<u64>,
    pacing_rate: Option<u64>,
    // The three rates below are logged as the custom `cf_delivery_rate`,
    // `cf_send_rate` and `cf_ack_rate` fields.
    delivery_rate: Option<u64>,
    send_rate: Option<u64>,
    ack_rate: Option<u64>,
    // Logged as `cf_lost_packets` / `cf_lost_bytes`, each with the total and
    // the change since the previously logged value.
    lost_packets: Option<u64>,
    lost_bytes: Option<u64>,
    pto_count: Option<u32>,
}
499
// A custom qlog field: a name plus a value that can be rendered as JSON.
#[cfg(feature = "qlog")]
trait CustomCfQlogField {
    // Name under which the field is logged.
    fn name(&self) -> &'static str;
    // The field's value as a JSON value.
    fn as_json_value(&self) -> serde_json::Value;
}
505
// A running total together with its change since the previously logged
// value; `QlogMetrics::maybe_update()` uses it for the lost packet and lost
// byte counters.
//
// `skip_serializing_none` leaves `None` fields out of the serialized output.
#[cfg(feature = "qlog")]
#[serde_with::skip_serializing_none]
#[derive(Serialize)]
struct TotalAndDelta {
    total: Option<u64>,
    delta: Option<u64>,
}
513
// Generic `CustomCfQlogField`: pairs a static field name with any
// serializable value.
#[cfg(feature = "qlog")]
struct CustomQlogField<T> {
    name: &'static str,
    value: T,
}

#[cfg(feature = "qlog")]
impl<T> CustomQlogField<T> {
    // Creates a field called `name` holding `value`.
    fn new(name: &'static str, value: T) -> Self {
        Self { name, value }
    }
}

#[cfg(feature = "qlog")]
impl<T: Serialize> CustomCfQlogField for CustomQlogField<T> {
    fn name(&self) -> &'static str {
        self.name
    }

    // Serializes the stored value through `serde_json::json!`.
    fn as_json_value(&self) -> serde_json::Value {
        serde_json::json!(&self.value)
    }
}
537
// Wrapper around qlog's `ExData` that accepts any serializable value and
// stores it as JSON under a static field name.
#[cfg(feature = "qlog")]
struct CfExData(qlog::events::ExData);

#[cfg(feature = "qlog")]
impl CfExData {
    // Starts out without any custom fields.
    fn new() -> Self {
        let fields = qlog::events::ExData::new();
        Self(fields)
    }

    // Stores `value`, converted to JSON, under `name`.
    fn insert<T: Serialize>(&mut self, name: &'static str, value: T) {
        let field = CustomQlogField::new(name, value);
        let key = field.name().to_owned();
        let json = field.as_json_value();

        self.0.insert(key, json);
    }

    // Hands back the underlying qlog `ExData`.
    fn into_inner(self) -> qlog::events::ExData {
        let Self(fields) = self;
        fields
    }
}
557
#[cfg(feature = "qlog")]
impl QlogMetrics {
    // Overwrites `current` with `latest` when the two differ.
    //
    // Returns `Some(latest)` and raises `changed` when an update took place,
    // `None` when the values were already equal.
    fn replace_if_changed<T: Copy + PartialEq>(
        current: &mut T, latest: T, changed: &mut bool,
    ) -> Option<T> {
        if *current == latest {
            return None;
        }

        *current = latest;
        *changed = true;

        Some(latest)
    }

    // Expresses `d` in fractional milliseconds, the unit used for the RTT
    // fields of the emitted event.
    fn as_millis_f32(d: Duration) -> f32 {
        d.as_secs_f32() * 1000.0
    }

    // Make a qlog event if the latest instance of QlogMetrics is different.
    //
    // This function diffs each of the fields. A qlog MetricsUpdated event is
    // only generated if at least one field is different. Where fields are
    // different, the qlog event contains the latest value, and the running
    // picture kept in `self` is updated to match.
    //
    // Returns `None` when nothing changed.
    fn maybe_update(&mut self, latest: Self) -> Option<EventData> {
        let mut emit_event = false;

        let new_min_rtt = Self::replace_if_changed(
            &mut self.min_rtt,
            latest.min_rtt,
            &mut emit_event,
        )
        .map(Self::as_millis_f32);

        let new_smoothed_rtt = Self::replace_if_changed(
            &mut self.smoothed_rtt,
            latest.smoothed_rtt,
            &mut emit_event,
        )
        .map(Self::as_millis_f32);

        let new_latest_rtt = Self::replace_if_changed(
            &mut self.latest_rtt,
            latest.latest_rtt,
            &mut emit_event,
        )
        .map(Self::as_millis_f32);

        let new_rttvar = Self::replace_if_changed(
            &mut self.rttvar,
            latest.rttvar,
            &mut emit_event,
        )
        .map(Self::as_millis_f32);

        let new_cwnd = Self::replace_if_changed(
            &mut self.cwnd,
            latest.cwnd,
            &mut emit_event,
        );

        let new_bytes_in_flight = Self::replace_if_changed(
            &mut self.bytes_in_flight,
            latest.bytes_in_flight,
            &mut emit_event,
        );

        // For the optional fields a transition to `None` still counts as a
        // change (and triggers an event), but carries no value to log.
        let new_ssthresh = Self::replace_if_changed(
            &mut self.ssthresh,
            latest.ssthresh,
            &mut emit_event,
        )
        .flatten();

        let new_pacing_rate = Self::replace_if_changed(
            &mut self.pacing_rate,
            latest.pacing_rate,
            &mut emit_event,
        )
        .flatten();

        // Unlike the fields above, a missing PTO count leaves the remembered
        // value untouched.
        let new_pto_count = if latest.pto_count.is_some() {
            Self::replace_if_changed(
                &mut self.pto_count,
                latest.pto_count,
                &mut emit_event,
            )
            .flatten()
            // The qlog field is narrower than our counter: saturate instead
            // of silently wrapping around.
            .map(|count| u16::try_from(count).unwrap_or(u16::MAX))
        } else {
            None
        };

        // Build ex_data for rate metrics. A rate is only reported when a new
        // value is available; `None` leaves the remembered rate untouched.
        let mut ex_data = CfExData::new();
        for (name, current, new) in [
            (
                "cf_delivery_rate",
                &mut self.delivery_rate,
                latest.delivery_rate,
            ),
            ("cf_send_rate", &mut self.send_rate, latest.send_rate),
            ("cf_ack_rate", &mut self.ack_rate, latest.ack_rate),
        ] {
            if let Some(rate) = new {
                let changed =
                    Self::replace_if_changed(current, new, &mut emit_event);
                if changed.is_some() {
                    ex_data.insert(name, rate);
                }
            }
        }

        // Loss counters are reported as the new total plus the increase since
        // the previously reported total.
        for (name, current, new) in [
            ("cf_lost_packets", &mut self.lost_packets, latest.lost_packets),
            ("cf_lost_bytes", &mut self.lost_bytes, latest.lost_bytes),
        ] {
            if let Some(total) = new {
                if *current != new {
                    // The counters are expected to only grow; saturate so
                    // that a counter going backwards cannot underflow.
                    let delta = total.saturating_sub(current.unwrap_or(0));

                    emit_event = true;
                    ex_data.insert(name, TotalAndDelta {
                        total: new,
                        delta: Some(delta),
                    });
                    *current = new;
                }
            }
        }

        if !emit_event {
            return None;
        }

        Some(EventData::QuicMetricsUpdated(
            qlog::events::quic::RecoveryMetricsUpdated {
                min_rtt: new_min_rtt,
                smoothed_rtt: new_smoothed_rtt,
                latest_rtt: new_latest_rtt,
                rtt_variance: new_rttvar,
                congestion_window: new_cwnd,
                bytes_in_flight: new_bytes_in_flight,
                ssthresh: new_ssthresh,
                pacing_rate: new_pacing_rate,
                pto_count: new_pto_count,
                ex_data: ex_data.into_inner(),
                ..Default::default()
            },
        ))
    }
}
708
/// The moment at which the pacer considers it appropriate to release the next
/// packet.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReleaseTime {
    /// The packet may be released right away.
    Immediate,
    /// The packet should be held back until the given instant.
    At(Instant),
}

/// When the next packet should be released and whether it may be part of a
/// burst.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ReleaseDecision {
    time: ReleaseTime,
    allow_burst: bool,
}

impl ReleaseTime {
    /// Pushes a scheduled release time back by `delay`.
    ///
    /// An `Immediate` release is left unchanged.
    fn inc(&mut self, delay: Duration) {
        if let ReleaseTime::At(at) = self {
            *at += delay;
        }
    }

    /// Moves the release time to `other` unless it is already scheduled for
    /// a later instant.
    ///
    /// An `Immediate` release always becomes `At(other)`.
    fn set_max(&mut self, other: Instant) {
        let scheduled = match *self {
            ReleaseTime::Immediate => other,
            ReleaseTime::At(current) => current.max(other),
        };

        *self = ReleaseTime::At(scheduled);
    }
}

impl ReleaseDecision {
    /// Two release times at most this far apart are treated as equal by
    /// [`ReleaseDecision::time_eq`].
    pub(crate) const EQUAL_THRESHOLD: Duration = Duration::from_micros(50);

    /// Get the [`Instant`] the next packet should be released at.
    ///
    /// The result is never in the past: `None` is returned both for an
    /// immediate release and for a release time that is not after `now`.
    #[inline]
    pub fn time(&self, now: Instant) -> Option<Instant> {
        match self.time {
            ReleaseTime::At(at) if at > now => Some(at),
            _ => None,
        }
    }

    /// Whether this packet can be appended to a previous burst.
    #[inline]
    pub fn can_burst(&self) -> bool {
        self.allow_burst
    }

    /// Check if the two packets can be released at the same time, i.e. their
    /// release times are no more than `EQUAL_THRESHOLD` apart.
    ///
    /// A release that is due (see [`ReleaseDecision::time`]) is treated as
    /// happening at `now`.
    #[inline]
    pub fn time_eq(&self, other: &Self, now: Instant) -> bool {
        let mine = self.time(now).unwrap_or(now);
        let theirs = other.time(now).unwrap_or(now);

        let delta = if mine < theirs {
            theirs.duration_since(mine)
        } else {
            mine.duration_since(theirs)
        };

        delta <= Self::EQUAL_THRESHOLD
    }
}
773
/// Recovery statistics
#[derive(Default, Debug)]
pub struct RecoveryStats {
    /// Statistics recorded by the first call to `set_startup_exit()`; `None`
    /// until then.
    startup_exit: Option<StartupExit>,
}
779
780impl RecoveryStats {
781    // Record statistics when a CCA first exits startup.
782    pub fn set_startup_exit(&mut self, startup_exit: StartupExit) {
783        if self.startup_exit.is_none() {
784            self.startup_exit = Some(startup_exit);
785        }
786    }
787}
788
/// Statistics from when a CCA first exited the startup phase.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct StartupExit {
    /// The congestion_window recorded at Startup exit.
    pub cwnd: usize,

    /// The bandwidth estimate recorded at Startup exit, in bytes per second.
    /// `None` when no estimate was supplied.
    pub bandwidth: Option<u64>,

    /// The reason a CCA exited the startup phase.
    pub reason: StartupExitReason,
}
801
802impl StartupExit {
803    fn new(
804        cwnd: usize, bandwidth: Option<Bandwidth>, reason: StartupExitReason,
805    ) -> Self {
806        let bandwidth = bandwidth.map(Bandwidth::to_bytes_per_second);
807        Self {
808            cwnd,
809            bandwidth,
810            reason,
811        }
812    }
813}
814
/// The reason a CCA exited the startup phase.
///
/// This is a fieldless enum, so it derives `Eq` alongside `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StartupExitReason {
    /// Exit slow start or BBR startup due to excessive loss
    Loss,

    /// Exit BBR startup due to bandwidth plateau.
    BandwidthPlateau,

    /// Exit BBR startup due to persistent queue.
    PersistentQueue,

    /// Exit HyStart++ conservative slow start after the max rounds allowed.
    ConservativeSlowStartRounds,
}
830
831#[cfg(test)]
832mod tests {
833    use super::*;
834    use crate::packet;
835    use crate::range_buf::RangeBuf;
836    use crate::test_utils;
837    use crate::CongestionControlAlgorithm;
838    use crate::DEFAULT_INITIAL_RTT;
839    use rstest::rstest;
840    use smallvec::smallvec;
841    use std::str::FromStr;
842
843    fn recovery_for_alg(algo: CongestionControlAlgorithm) -> Recovery {
844        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
845        cfg.set_cc_algorithm(algo);
846        Recovery::new(&cfg)
847    }
848
849    #[test]
850    fn lookup_cc_algo_ok() {
851        let algo = CongestionControlAlgorithm::from_str("reno").unwrap();
852        assert_eq!(algo, CongestionControlAlgorithm::Reno);
853        assert!(!recovery_for_alg(algo).gcongestion_enabled());
854
855        let algo = CongestionControlAlgorithm::from_str("cubic").unwrap();
856        assert_eq!(algo, CongestionControlAlgorithm::CUBIC);
857        assert!(!recovery_for_alg(algo).gcongestion_enabled());
858
859        let algo = CongestionControlAlgorithm::from_str("bbr").unwrap();
860        assert_eq!(algo, CongestionControlAlgorithm::Bbr2Gcongestion);
861        assert!(recovery_for_alg(algo).gcongestion_enabled());
862
863        let algo = CongestionControlAlgorithm::from_str("bbr2").unwrap();
864        assert_eq!(algo, CongestionControlAlgorithm::Bbr2Gcongestion);
865        assert!(recovery_for_alg(algo).gcongestion_enabled());
866
867        let algo =
868            CongestionControlAlgorithm::from_str("bbr2_gcongestion").unwrap();
869        assert_eq!(algo, CongestionControlAlgorithm::Bbr2Gcongestion);
870        assert!(recovery_for_alg(algo).gcongestion_enabled());
871    }
872
873    #[test]
874    fn lookup_cc_algo_bad() {
875        assert_eq!(
876            CongestionControlAlgorithm::from_str("???"),
877            Err(crate::Error::CongestionControl)
878        );
879    }
880
881    #[rstest]
882    fn loss_on_pto(
883        #[values("reno", "cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
884    ) {
885        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
886        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
887
888        let mut r = Recovery::new(&cfg);
889
890        let mut now = Instant::now();
891
892        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
893
894        // Start by sending a few packets.
895        let p = Sent {
896            pkt_num: 0,
897            frames: smallvec![],
898            time_sent: now,
899            time_acked: None,
900            time_lost: None,
901            size: 1000,
902            ack_eliciting: true,
903            in_flight: true,
904            delivered: 0,
905            delivered_time: now,
906            first_sent_time: now,
907            is_app_limited: false,
908            tx_in_flight: 0,
909            lost: 0,
910            has_data: false,
911            is_pmtud_probe: false,
912        };
913
914        r.on_packet_sent(
915            p,
916            packet::Epoch::Application,
917            HandshakeStatus::default(),
918            now,
919            "",
920        );
921
922        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 1);
923        assert_eq!(r.bytes_in_flight(), 1000);
924        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
925
926        let p = Sent {
927            pkt_num: 1,
928            frames: smallvec![],
929            time_sent: now,
930            time_acked: None,
931            time_lost: None,
932            size: 1000,
933            ack_eliciting: true,
934            in_flight: true,
935            delivered: 0,
936            delivered_time: now,
937            first_sent_time: now,
938            is_app_limited: false,
939            tx_in_flight: 0,
940            lost: 0,
941            has_data: false,
942            is_pmtud_probe: false,
943        };
944
945        r.on_packet_sent(
946            p,
947            packet::Epoch::Application,
948            HandshakeStatus::default(),
949            now,
950            "",
951        );
952
953        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
954        assert_eq!(r.bytes_in_flight(), 2000);
955        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
956
957        let p = Sent {
958            pkt_num: 2,
959            frames: smallvec![],
960            time_sent: now,
961            time_acked: None,
962            time_lost: None,
963            size: 1000,
964            ack_eliciting: true,
965            in_flight: true,
966            delivered: 0,
967            delivered_time: now,
968            first_sent_time: now,
969            is_app_limited: false,
970            tx_in_flight: 0,
971            lost: 0,
972            has_data: false,
973            is_pmtud_probe: false,
974        };
975
976        r.on_packet_sent(
977            p,
978            packet::Epoch::Application,
979            HandshakeStatus::default(),
980            now,
981            "",
982        );
983        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 3);
984        assert_eq!(r.bytes_in_flight(), 3000);
985        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
986
987        let p = Sent {
988            pkt_num: 3,
989            frames: smallvec![],
990            time_sent: now,
991            time_acked: None,
992            time_lost: None,
993            size: 1000,
994            ack_eliciting: true,
995            in_flight: true,
996            delivered: 0,
997            delivered_time: now,
998            first_sent_time: now,
999            is_app_limited: false,
1000            tx_in_flight: 0,
1001            lost: 0,
1002            has_data: false,
1003            is_pmtud_probe: false,
1004        };
1005
1006        r.on_packet_sent(
1007            p,
1008            packet::Epoch::Application,
1009            HandshakeStatus::default(),
1010            now,
1011            "",
1012        );
1013        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 4);
1014        assert_eq!(r.bytes_in_flight(), 4000);
1015        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
1016
1017        // Wait for 10ms.
1018        now += Duration::from_millis(10);
1019
1020        // Only the first 2 packets are acked.
1021        let mut acked = RangeSet::default();
1022        acked.insert(0..2);
1023
1024        assert_eq!(
1025            r.on_ack_received(
1026                &acked,
1027                25,
1028                packet::Epoch::Application,
1029                HandshakeStatus::default(),
1030                now,
1031                None,
1032                "",
1033            )
1034            .unwrap(),
1035            OnAckReceivedOutcome {
1036                lost_packets: 0,
1037                lost_bytes: 0,
1038                acked_bytes: 2 * 1000,
1039                spurious_losses: 0,
1040            }
1041        );
1042
1043        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
1044        assert_eq!(r.bytes_in_flight(), 2000);
1045        assert_eq!(r.bytes_in_flight_duration(), Duration::from_millis(10));
1046        assert_eq!(r.lost_count(), 0);
1047
1048        // Wait until loss detection timer expires.
1049        now = r.loss_detection_timer().unwrap();
1050
1051        // PTO.
1052        r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
1053        assert_eq!(r.loss_probes(packet::Epoch::Application), 1);
1054        assert_eq!(r.lost_count(), 0);
1055        assert_eq!(r.pto_count(), 1);
1056
1057        let p = Sent {
1058            pkt_num: 4,
1059            frames: smallvec![],
1060            time_sent: now,
1061            time_acked: None,
1062            time_lost: None,
1063            size: 1000,
1064            ack_eliciting: true,
1065            in_flight: true,
1066            delivered: 0,
1067            delivered_time: now,
1068            first_sent_time: now,
1069            is_app_limited: false,
1070            tx_in_flight: 0,
1071            lost: 0,
1072            has_data: false,
1073            is_pmtud_probe: false,
1074        };
1075
1076        r.on_packet_sent(
1077            p,
1078            packet::Epoch::Application,
1079            HandshakeStatus::default(),
1080            now,
1081            "",
1082        );
1083        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 3);
1084        assert_eq!(r.bytes_in_flight(), 3000);
1085        assert_eq!(r.bytes_in_flight_duration(), Duration::from_millis(30));
1086
1087        let p = Sent {
1088            pkt_num: 5,
1089            frames: smallvec![],
1090            time_sent: now,
1091            time_acked: None,
1092            time_lost: None,
1093            size: 1000,
1094            ack_eliciting: true,
1095            in_flight: true,
1096            delivered: 0,
1097            delivered_time: now,
1098            first_sent_time: now,
1099            is_app_limited: false,
1100            tx_in_flight: 0,
1101            lost: 0,
1102            has_data: false,
1103            is_pmtud_probe: false,
1104        };
1105
1106        r.on_packet_sent(
1107            p,
1108            packet::Epoch::Application,
1109            HandshakeStatus::default(),
1110            now,
1111            "",
1112        );
1113        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 4);
1114        assert_eq!(r.bytes_in_flight(), 4000);
1115        assert_eq!(r.bytes_in_flight_duration(), Duration::from_millis(30));
1116        assert_eq!(r.lost_count(), 0);
1117
1118        // Wait for 10ms.
1119        now += Duration::from_millis(10);
1120
1121        // PTO packets are acked.
1122        let mut acked = RangeSet::default();
1123        acked.insert(4..6);
1124
1125        assert_eq!(
1126            r.on_ack_received(
1127                &acked,
1128                25,
1129                packet::Epoch::Application,
1130                HandshakeStatus::default(),
1131                now,
1132                None,
1133                "",
1134            )
1135            .unwrap(),
1136            OnAckReceivedOutcome {
1137                lost_packets: 2,
1138                lost_bytes: 2000,
1139                acked_bytes: 2 * 1000,
1140                spurious_losses: 0,
1141            }
1142        );
1143
1144        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 4);
1145        assert_eq!(r.bytes_in_flight(), 0);
1146        assert_eq!(r.bytes_in_flight_duration(), Duration::from_millis(40));
1147
1148        assert_eq!(r.lost_count(), 2);
1149
1150        // Wait 1 RTT.
1151        now += r.rtt();
1152
1153        assert_eq!(
1154            r.detect_lost_packets_for_test(packet::Epoch::Application, now),
1155            (0, 0)
1156        );
1157
1158        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1159        if cc_algorithm_name == "reno" || cc_algorithm_name == "cubic" {
1160            assert!(r.startup_exit().is_some());
1161            assert_eq!(r.startup_exit().unwrap().reason, StartupExitReason::Loss);
1162        } else {
1163            assert_eq!(r.startup_exit(), None);
1164        }
1165    }
1166
1167    #[rstest]
1168    fn loss_on_timer(
1169        #[values("reno", "cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
1170    ) {
1171        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
1172        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
1173
1174        let mut r = Recovery::new(&cfg);
1175
1176        let mut now = Instant::now();
1177
1178        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1179
1180        // Start by sending a few packets.
1181        let p = Sent {
1182            pkt_num: 0,
1183            frames: smallvec![],
1184            time_sent: now,
1185            time_acked: None,
1186            time_lost: None,
1187            size: 1000,
1188            ack_eliciting: true,
1189            in_flight: true,
1190            delivered: 0,
1191            delivered_time: now,
1192            first_sent_time: now,
1193            is_app_limited: false,
1194            tx_in_flight: 0,
1195            lost: 0,
1196            has_data: false,
1197            is_pmtud_probe: false,
1198        };
1199
1200        r.on_packet_sent(
1201            p,
1202            packet::Epoch::Application,
1203            HandshakeStatus::default(),
1204            now,
1205            "",
1206        );
1207        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 1);
1208        assert_eq!(r.bytes_in_flight(), 1000);
1209        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
1210
1211        let p = Sent {
1212            pkt_num: 1,
1213            frames: smallvec![],
1214            time_sent: now,
1215            time_acked: None,
1216            time_lost: None,
1217            size: 1000,
1218            ack_eliciting: true,
1219            in_flight: true,
1220            delivered: 0,
1221            delivered_time: now,
1222            first_sent_time: now,
1223            is_app_limited: false,
1224            tx_in_flight: 0,
1225            lost: 0,
1226            has_data: false,
1227            is_pmtud_probe: false,
1228        };
1229
1230        r.on_packet_sent(
1231            p,
1232            packet::Epoch::Application,
1233            HandshakeStatus::default(),
1234            now,
1235            "",
1236        );
1237        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
1238        assert_eq!(r.bytes_in_flight(), 2000);
1239        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
1240
1241        let p = Sent {
1242            pkt_num: 2,
1243            frames: smallvec![],
1244            time_sent: now,
1245            time_acked: None,
1246            time_lost: None,
1247            size: 1000,
1248            ack_eliciting: true,
1249            in_flight: true,
1250            delivered: 0,
1251            delivered_time: now,
1252            first_sent_time: now,
1253            is_app_limited: false,
1254            tx_in_flight: 0,
1255            lost: 0,
1256            has_data: false,
1257            is_pmtud_probe: false,
1258        };
1259
1260        r.on_packet_sent(
1261            p,
1262            packet::Epoch::Application,
1263            HandshakeStatus::default(),
1264            now,
1265            "",
1266        );
1267        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 3);
1268        assert_eq!(r.bytes_in_flight(), 3000);
1269        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
1270
1271        let p = Sent {
1272            pkt_num: 3,
1273            frames: smallvec![],
1274            time_sent: now,
1275            time_acked: None,
1276            time_lost: None,
1277            size: 1000,
1278            ack_eliciting: true,
1279            in_flight: true,
1280            delivered: 0,
1281            delivered_time: now,
1282            first_sent_time: now,
1283            is_app_limited: false,
1284            tx_in_flight: 0,
1285            lost: 0,
1286            has_data: false,
1287            is_pmtud_probe: false,
1288        };
1289
1290        r.on_packet_sent(
1291            p,
1292            packet::Epoch::Application,
1293            HandshakeStatus::default(),
1294            now,
1295            "",
1296        );
1297        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 4);
1298        assert_eq!(r.bytes_in_flight(), 4000);
1299        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
1300
1301        // Wait for 10ms.
1302        now += Duration::from_millis(10);
1303
1304        // Only the first 2 packets and the last one are acked.
1305        let mut acked = RangeSet::default();
1306        acked.insert(0..2);
1307        acked.insert(3..4);
1308
1309        assert_eq!(
1310            r.on_ack_received(
1311                &acked,
1312                25,
1313                packet::Epoch::Application,
1314                HandshakeStatus::default(),
1315                now,
1316                None,
1317                "",
1318            )
1319            .unwrap(),
1320            OnAckReceivedOutcome {
1321                lost_packets: 0,
1322                lost_bytes: 0,
1323                acked_bytes: 3 * 1000,
1324                spurious_losses: 0,
1325            }
1326        );
1327
1328        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
1329        assert_eq!(r.bytes_in_flight(), 1000);
1330        assert_eq!(r.bytes_in_flight_duration(), Duration::from_millis(10));
1331        assert_eq!(r.lost_count(), 0);
1332
1333        // Wait until loss detection timer expires.
1334        now = r.loss_detection_timer().unwrap();
1335
1336        // Packet is declared lost.
1337        r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
1338        assert_eq!(r.loss_probes(packet::Epoch::Application), 0);
1339
1340        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
1341        assert_eq!(r.bytes_in_flight(), 0);
1342        assert_eq!(r.bytes_in_flight_duration(), Duration::from_micros(11250));
1343
1344        assert_eq!(r.lost_count(), 1);
1345
1346        // Wait 1 RTT.
1347        now += r.rtt();
1348
1349        assert_eq!(
1350            r.detect_lost_packets_for_test(packet::Epoch::Application, now),
1351            (0, 0)
1352        );
1353
1354        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1355        if cc_algorithm_name == "reno" || cc_algorithm_name == "cubic" {
1356            assert!(r.startup_exit().is_some());
1357            assert_eq!(r.startup_exit().unwrap().reason, StartupExitReason::Loss);
1358        } else {
1359            assert_eq!(r.startup_exit(), None);
1360        }
1361    }
1362
1363    #[rstest]
1364    fn loss_on_reordering(
1365        #[values("reno", "cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
1366    ) {
1367        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
1368        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
1369
1370        let mut r = Recovery::new(&cfg);
1371
1372        let mut now = Instant::now();
1373
1374        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1375
1376        // Start by sending a few packets.
1377        //
1378        // pkt number: [0, 1, 2, 3]
1379        for i in 0..4 {
1380            let p = test_utils::helper_packet_sent(i, now, 1000);
1381            r.on_packet_sent(
1382                p,
1383                packet::Epoch::Application,
1384                HandshakeStatus::default(),
1385                now,
1386                "",
1387            );
1388
1389            let pkt_count = (i + 1) as usize;
1390            assert_eq!(r.sent_packets_len(packet::Epoch::Application), pkt_count);
1391            assert_eq!(r.bytes_in_flight(), pkt_count * 1000);
1392            assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
1393        }
1394
1395        // Wait for 10ms after sending.
1396        now += Duration::from_millis(10);
1397
1398        // Recieve reordered ACKs, i.e. pkt_num [2, 3]
1399        let mut acked = RangeSet::default();
1400        acked.insert(2..4);
1401        assert_eq!(
1402            r.on_ack_received(
1403                &acked,
1404                25,
1405                packet::Epoch::Application,
1406                HandshakeStatus::default(),
1407                now,
1408                None,
1409                "",
1410            )
1411            .unwrap(),
1412            OnAckReceivedOutcome {
1413                lost_packets: 1,
1414                lost_bytes: 1000,
1415                acked_bytes: 1000 * 2,
1416                spurious_losses: 0,
1417            }
1418        );
1419        // Since we only remove packets from the back to avoid compaction, the
1420        // send length remains the same after receiving reordered ACKs
1421        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 4);
1422        assert_eq!(r.pkt_thresh().unwrap(), INITIAL_PACKET_THRESHOLD);
1423        assert_eq!(r.time_thresh(), INITIAL_TIME_THRESHOLD);
1424
1425        // Wait for 10ms after receiving first set of ACKs.
1426        now += Duration::from_millis(10);
1427
1428        // Recieve remaining ACKs, i.e. pkt_num [0, 1]
1429        let mut acked = RangeSet::default();
1430        acked.insert(0..2);
1431        assert_eq!(
1432            r.on_ack_received(
1433                &acked,
1434                25,
1435                packet::Epoch::Application,
1436                HandshakeStatus::default(),
1437                now,
1438                None,
1439                "",
1440            )
1441            .unwrap(),
1442            OnAckReceivedOutcome {
1443                lost_packets: 0,
1444                lost_bytes: 0,
1445                acked_bytes: 1000,
1446                spurious_losses: 1,
1447            }
1448        );
1449        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1450        assert_eq!(r.bytes_in_flight(), 0);
1451        assert_eq!(r.bytes_in_flight_duration(), Duration::from_millis(20));
1452
1453        // Spurious loss.
1454        assert_eq!(r.lost_count(), 1);
1455        assert_eq!(r.lost_spurious_count(), 1);
1456
1457        // Packet threshold was increased.
1458        assert_eq!(r.pkt_thresh().unwrap(), 4);
1459        assert_eq!(r.time_thresh(), PACKET_REORDER_TIME_THRESHOLD);
1460
1461        // Wait 1 RTT.
1462        now += r.rtt();
1463
1464        // All packets have been ACKed so dont expect additional lost packets
1465        assert_eq!(
1466            r.detect_lost_packets_for_test(packet::Epoch::Application, now),
1467            (0, 0)
1468        );
1469        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1470
1471        if cc_algorithm_name == "reno" || cc_algorithm_name == "cubic" {
1472            assert!(r.startup_exit().is_some());
1473            assert_eq!(r.startup_exit().unwrap().reason, StartupExitReason::Loss);
1474        } else {
1475            assert_eq!(r.startup_exit(), None);
1476        }
1477    }
1478
1479    // TODO: This should run agains both `congestion` and `gcongestion`.
1480    // `congestion` and `gcongestion` behave differently. That might be ok
1481    // given the different algorithms but it would be ideal to merge and share
1482    // the logic.
1483    #[rstest]
1484    fn time_thresholds_on_reordering(
1485        #[values("bbr2_gcongestion")] cc_algorithm_name: &str,
1486    ) {
1487        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
1488        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
1489
1490        let mut now = Instant::now();
1491        let mut r = Recovery::new(&cfg);
1492        assert_eq!(r.rtt(), DEFAULT_INITIAL_RTT);
1493
1494        // Pick time between and above thresholds for testing threshold increase.
1495        //
1496        //```
1497        //              between_thresh_ms
1498        //                         |
1499        //    initial_thresh_ms    |     spurious_thresh_ms
1500        //      v                  v             v
1501        // --------------------------------------------------
1502        //      | ................ | ..................... |
1503        //            THRESH_GAP         THRESH_GAP
1504        // ```
1505        // 
1506        // Threshold gap time.
1507        const THRESH_GAP: Duration = Duration::from_millis(30);
1508        // Initial time theshold based on inital RTT.
1509        let initial_thresh_ms =
1510            DEFAULT_INITIAL_RTT.mul_f64(INITIAL_TIME_THRESHOLD);
1511        // The time threshold after spurious loss.
1512        let spurious_thresh_ms: Duration =
1513            DEFAULT_INITIAL_RTT.mul_f64(PACKET_REORDER_TIME_THRESHOLD);
1514        // Time between the two thresholds
1515        let between_thresh_ms = initial_thresh_ms + THRESH_GAP;
1516        assert!(between_thresh_ms > initial_thresh_ms);
1517        assert!(between_thresh_ms < spurious_thresh_ms);
1518        assert!(between_thresh_ms + THRESH_GAP > spurious_thresh_ms);
1519
1520        for i in 0..6 {
1521            let send_time = now + i * between_thresh_ms;
1522
1523            let p = test_utils::helper_packet_sent(i.into(), send_time, 1000);
1524            r.on_packet_sent(
1525                p,
1526                packet::Epoch::Application,
1527                HandshakeStatus::default(),
1528                send_time,
1529                "",
1530            );
1531        }
1532
1533        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 6);
1534        assert_eq!(r.bytes_in_flight(), 6 * 1000);
1535        assert_eq!(r.pkt_thresh().unwrap(), INITIAL_PACKET_THRESHOLD);
1536        assert_eq!(r.time_thresh(), INITIAL_TIME_THRESHOLD);
1537
1538        // Wait for `between_thresh_ms` after sending to trigger loss based on
1539        // loss threshold.
1540        now += between_thresh_ms;
1541
1542        // Ack packet: 1
1543        //
1544        // [0, 1, 2, 3, 4, 5]
1545        //     ^
1546        let mut acked = RangeSet::default();
1547        acked.insert(1..2);
1548        assert_eq!(
1549            r.on_ack_received(
1550                &acked,
1551                25,
1552                packet::Epoch::Application,
1553                HandshakeStatus::default(),
1554                now,
1555                None,
1556                "",
1557            )
1558            .unwrap(),
1559            OnAckReceivedOutcome {
1560                lost_packets: 1,
1561                lost_bytes: 1000,
1562                acked_bytes: 1000,
1563                spurious_losses: 0,
1564            }
1565        );
1566        assert_eq!(r.pkt_thresh().unwrap(), INITIAL_PACKET_THRESHOLD);
1567        assert_eq!(r.time_thresh(), INITIAL_TIME_THRESHOLD);
1568
1569        // Ack packet: 0
1570        //
1571        // [0, 1, 2, 3, 4, 5]
1572        //  ^  x
1573        let mut acked = RangeSet::default();
1574        acked.insert(0..1);
1575        assert_eq!(
1576            r.on_ack_received(
1577                &acked,
1578                25,
1579                packet::Epoch::Application,
1580                HandshakeStatus::default(),
1581                now,
1582                None,
1583                "",
1584            )
1585            .unwrap(),
1586            OnAckReceivedOutcome {
1587                lost_packets: 0,
1588                lost_bytes: 0,
1589                acked_bytes: 0,
1590                spurious_losses: 1,
1591            }
1592        );
1593        // The time_thresh after spurious loss
1594        assert_eq!(r.time_thresh(), PACKET_REORDER_TIME_THRESHOLD);
1595
1596        // Wait for `between_thresh_ms` after sending. However, since the
1597        // threshold has increased, we do not expect loss.
1598        now += between_thresh_ms;
1599
1600        // Ack packet: 3
1601        //
1602        // [2, 3, 4, 5]
1603        //     ^
1604        let mut acked = RangeSet::default();
1605        acked.insert(3..4);
1606        assert_eq!(
1607            r.on_ack_received(
1608                &acked,
1609                25,
1610                packet::Epoch::Application,
1611                HandshakeStatus::default(),
1612                now,
1613                None,
1614                "",
1615            )
1616            .unwrap(),
1617            OnAckReceivedOutcome {
1618                lost_packets: 0,
1619                lost_bytes: 0,
1620                acked_bytes: 1000,
1621                spurious_losses: 0,
1622            }
1623        );
1624
1625        // Wait for and additional `plus_overhead` to trigger loss based on the
1626        // new time threshold.
1627        now += THRESH_GAP;
1628
1629        // Ack packet: 4
1630        //
1631        // [2, 3, 4, 5]
1632        //     x  ^
1633        let mut acked = RangeSet::default();
1634        acked.insert(4..5);
1635        assert_eq!(
1636            r.on_ack_received(
1637                &acked,
1638                25,
1639                packet::Epoch::Application,
1640                HandshakeStatus::default(),
1641                now,
1642                None,
1643                "",
1644            )
1645            .unwrap(),
1646            OnAckReceivedOutcome {
1647                lost_packets: 1,
1648                lost_bytes: 1000,
1649                acked_bytes: 1000,
1650                spurious_losses: 0,
1651            }
1652        );
1653    }
1654
1655    // TODO: Implement enable_relaxed_loss_threshold and enable this test for the
1656    // congestion module.
1657    #[rstest]
1658    fn relaxed_thresholds_on_reordering(
1659        #[values("bbr2_gcongestion")] cc_algorithm_name: &str,
1660    ) {
1661        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
1662        cfg.enable_relaxed_loss_threshold = true;
1663        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
1664
1665        let mut now = Instant::now();
1666        let mut r = Recovery::new(&cfg);
1667        assert_eq!(r.rtt(), DEFAULT_INITIAL_RTT);
1668
1669        // Pick time between and above thresholds for testing threshold increase.
1670        //
1671        //```
1672        //              between_thresh_ms
1673        //                         |
1674        //    initial_thresh_ms    |     spurious_thresh_ms
1675        //      v                  v             v
1676        // --------------------------------------------------
1677        //      | ................ | ..................... |
1678        //            THRESH_GAP         THRESH_GAP
1679        // ```
1680        // Threshold gap time.
1681        const THRESH_GAP: Duration = Duration::from_millis(30);
1682        // Initial time theshold based on inital RTT.
1683        let initial_thresh_ms =
1684            DEFAULT_INITIAL_RTT.mul_f64(INITIAL_TIME_THRESHOLD);
1685        // The time threshold after spurious loss.
1686        let spurious_thresh_ms: Duration =
1687            DEFAULT_INITIAL_RTT.mul_f64(PACKET_REORDER_TIME_THRESHOLD);
1688        // Time between the two thresholds
1689        let between_thresh_ms = initial_thresh_ms + THRESH_GAP;
1690        assert!(between_thresh_ms > initial_thresh_ms);
1691        assert!(between_thresh_ms < spurious_thresh_ms);
1692        assert!(between_thresh_ms + THRESH_GAP > spurious_thresh_ms);
1693
1694        for i in 0..6 {
1695            let send_time = now + i * between_thresh_ms;
1696
1697            let p = test_utils::helper_packet_sent(i.into(), send_time, 1000);
1698            r.on_packet_sent(
1699                p,
1700                packet::Epoch::Application,
1701                HandshakeStatus::default(),
1702                send_time,
1703                "",
1704            );
1705        }
1706
1707        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 6);
1708        assert_eq!(r.bytes_in_flight(), 6 * 1000);
1709        // Intitial thresholds
1710        assert_eq!(r.pkt_thresh().unwrap(), INITIAL_PACKET_THRESHOLD);
1711        assert_eq!(r.time_thresh(), INITIAL_TIME_THRESHOLD);
1712
1713        // Wait for `between_thresh_ms` after sending to trigger loss based on
1714        // loss threshold.
1715        now += between_thresh_ms;
1716
1717        // Ack packet: 1
1718        //
1719        // [0, 1, 2, 3, 4, 5]
1720        //     ^
1721        let mut acked = RangeSet::default();
1722        acked.insert(1..2);
1723        assert_eq!(
1724            r.on_ack_received(
1725                &acked,
1726                25,
1727                packet::Epoch::Application,
1728                HandshakeStatus::default(),
1729                now,
1730                None,
1731                "",
1732            )
1733            .unwrap(),
1734            OnAckReceivedOutcome {
1735                lost_packets: 1,
1736                lost_bytes: 1000,
1737                acked_bytes: 1000,
1738                spurious_losses: 0,
1739            }
1740        );
1741        // Thresholds after 1st loss
1742        assert_eq!(r.pkt_thresh().unwrap(), INITIAL_PACKET_THRESHOLD);
1743        assert_eq!(r.time_thresh(), INITIAL_TIME_THRESHOLD);
1744
1745        // Ack packet: 0
1746        //
1747        // [0, 1, 2, 3, 4, 5]
1748        //  ^  x
1749        let mut acked = RangeSet::default();
1750        acked.insert(0..1);
1751        assert_eq!(
1752            r.on_ack_received(
1753                &acked,
1754                25,
1755                packet::Epoch::Application,
1756                HandshakeStatus::default(),
1757                now,
1758                None,
1759                "",
1760            )
1761            .unwrap(),
1762            OnAckReceivedOutcome {
1763                lost_packets: 0,
1764                lost_bytes: 0,
1765                acked_bytes: 0,
1766                spurious_losses: 1,
1767            }
1768        );
1769        // Thresholds after 1st spurious loss
1770        //
1771        // Packet threshold should be disabled. Time threshold overhead should
1772        // stay the same.
1773        assert_eq!(r.pkt_thresh(), None);
1774        assert_eq!(r.time_thresh(), INITIAL_TIME_THRESHOLD);
1775
1776        // Set now to send time of packet 2 so we can trigger spurious loss for
1777        // packet 2.
1778        now += between_thresh_ms;
1779        // Then wait for `between_thresh_ms` after sending packet 2 to trigger
1780        // loss. Since the time threshold has NOT increased, expect a
1781        // loss.
1782        now += between_thresh_ms;
1783
1784        // Ack packet: 3
1785        //
1786        // [2, 3, 4, 5]
1787        //     ^
1788        let mut acked = RangeSet::default();
1789        acked.insert(3..4);
1790        assert_eq!(
1791            r.on_ack_received(
1792                &acked,
1793                25,
1794                packet::Epoch::Application,
1795                HandshakeStatus::default(),
1796                now,
1797                None,
1798                "",
1799            )
1800            .unwrap(),
1801            OnAckReceivedOutcome {
1802                lost_packets: 1,
1803                lost_bytes: 1000,
1804                acked_bytes: 1000,
1805                spurious_losses: 0,
1806            }
1807        );
1808        // Thresholds after 2nd loss.
1809        assert_eq!(r.pkt_thresh(), None);
1810        assert_eq!(r.time_thresh(), INITIAL_TIME_THRESHOLD);
1811
1812        // Wait for and additional `plus_overhead` to trigger loss based on the
1813        // new time threshold.
1814        // now += THRESH_GAP;
1815
1816        // Ack packet: 2
1817        //
1818        // [2, 3, 4, 5]
1819        //  ^  x
1820        let mut acked = RangeSet::default();
1821        acked.insert(2..3);
1822        assert_eq!(
1823            r.on_ack_received(
1824                &acked,
1825                25,
1826                packet::Epoch::Application,
1827                HandshakeStatus::default(),
1828                now,
1829                None,
1830                "",
1831            )
1832            .unwrap(),
1833            OnAckReceivedOutcome {
1834                lost_packets: 0,
1835                lost_bytes: 0,
1836                acked_bytes: 0,
1837                spurious_losses: 1,
1838            }
1839        );
1840        // Thresholds after 2nd spurious loss.
1841        //
1842        // Time threshold overhead should double.
1843        assert_eq!(r.pkt_thresh(), None);
1844        let double_time_thresh_overhead =
1845            1.0 + 2.0 * INITIAL_TIME_THRESHOLD_OVERHEAD;
1846        assert_eq!(r.time_thresh(), double_time_thresh_overhead);
1847    }
1848
1849    #[rstest]
1850    fn pacing(
1851        #[values("reno", "cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
1852        #[values(false, true)] time_sent_set_to_now: bool,
1853    ) {
1854        let pacing_enabled = cc_algorithm_name == "bbr2" ||
1855            cc_algorithm_name == "bbr2_gcongestion";
1856
1857        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
1858        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
1859
1860        #[cfg(feature = "internal")]
1861        cfg.set_custom_bbr_params(BbrParams {
1862            time_sent_set_to_now: Some(time_sent_set_to_now),
1863            ..Default::default()
1864        });
1865
1866        let mut r = Recovery::new(&cfg);
1867
1868        let mut now = Instant::now();
1869
1870        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1871
1872        // send out first packet burst (a full initcwnd).
1873        for i in 0..10 {
1874            let p = Sent {
1875                pkt_num: i,
1876                frames: smallvec![],
1877                time_sent: now,
1878                time_acked: None,
1879                time_lost: None,
1880                size: 1200,
1881                ack_eliciting: true,
1882                in_flight: true,
1883                delivered: 0,
1884                delivered_time: now,
1885                first_sent_time: now,
1886                is_app_limited: false,
1887                tx_in_flight: 0,
1888                lost: 0,
1889                has_data: true,
1890                is_pmtud_probe: false,
1891            };
1892
1893            r.on_packet_sent(
1894                p,
1895                packet::Epoch::Application,
1896                HandshakeStatus::default(),
1897                now,
1898                "",
1899            );
1900        }
1901
1902        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 10);
1903        assert_eq!(r.bytes_in_flight(), 12000);
1904        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
1905
1906        if !pacing_enabled {
1907            assert_eq!(r.pacing_rate(), 0);
1908        } else {
1909            assert_eq!(r.pacing_rate(), 103963);
1910        }
1911        assert_eq!(r.get_packet_send_time(now), now);
1912
1913        assert_eq!(r.cwnd(), 12000);
1914        assert_eq!(r.cwnd_available(), 0);
1915
1916        // Wait 50ms for ACK.
1917        let initial_rtt = Duration::from_millis(50);
1918        now += initial_rtt;
1919
1920        let mut acked = RangeSet::default();
1921        acked.insert(0..10);
1922
1923        assert_eq!(
1924            r.on_ack_received(
1925                &acked,
1926                10,
1927                packet::Epoch::Application,
1928                HandshakeStatus::default(),
1929                now,
1930                None,
1931                "",
1932            )
1933            .unwrap(),
1934            OnAckReceivedOutcome {
1935                lost_packets: 0,
1936                lost_bytes: 0,
1937                acked_bytes: 12000,
1938                spurious_losses: 0,
1939            }
1940        );
1941
1942        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1943        assert_eq!(r.bytes_in_flight(), 0);
1944        assert_eq!(r.bytes_in_flight_duration(), initial_rtt);
1945        assert_eq!(r.min_rtt(), Some(initial_rtt));
1946        assert_eq!(r.rtt(), initial_rtt);
1947
1948        // 10 MSS increased due to acks.
1949        assert_eq!(r.cwnd(), 12000 + 1200 * 10);
1950
1951        // Send the second packet burst.
1952        let p = Sent {
1953            pkt_num: 10,
1954            frames: smallvec![],
1955            time_sent: now,
1956            time_acked: None,
1957            time_lost: None,
1958            size: 6000,
1959            ack_eliciting: true,
1960            in_flight: true,
1961            delivered: 0,
1962            delivered_time: now,
1963            first_sent_time: now,
1964            is_app_limited: false,
1965            tx_in_flight: 0,
1966            lost: 0,
1967            has_data: true,
1968            is_pmtud_probe: false,
1969        };
1970
1971        r.on_packet_sent(
1972            p,
1973            packet::Epoch::Application,
1974            HandshakeStatus::default(),
1975            now,
1976            "",
1977        );
1978
1979        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 1);
1980        assert_eq!(r.bytes_in_flight(), 6000);
1981        assert_eq!(r.bytes_in_flight_duration(), initial_rtt);
1982
1983        if !pacing_enabled {
1984            // Pacing is disabled.
1985            assert_eq!(r.get_packet_send_time(now), now);
1986        } else {
1987            // Pacing is done from the beginning.
1988            assert_ne!(r.get_packet_send_time(now), now);
1989        }
1990
1991        // Send the third and fourth packet bursts together.
1992        let p = Sent {
1993            pkt_num: 11,
1994            frames: smallvec![],
1995            time_sent: now,
1996            time_acked: None,
1997            time_lost: None,
1998            size: 6000,
1999            ack_eliciting: true,
2000            in_flight: true,
2001            delivered: 0,
2002            delivered_time: now,
2003            first_sent_time: now,
2004            is_app_limited: false,
2005            tx_in_flight: 0,
2006            lost: 0,
2007            has_data: true,
2008            is_pmtud_probe: false,
2009        };
2010
2011        r.on_packet_sent(
2012            p,
2013            packet::Epoch::Application,
2014            HandshakeStatus::default(),
2015            now,
2016            "",
2017        );
2018
2019        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
2020        assert_eq!(r.bytes_in_flight(), 12000);
2021        assert_eq!(r.bytes_in_flight_duration(), initial_rtt);
2022
2023        // Send the fourth packet burst.
2024        let p = Sent {
2025            pkt_num: 12,
2026            frames: smallvec![],
2027            time_sent: now,
2028            time_acked: None,
2029            time_lost: None,
2030            size: 1000,
2031            ack_eliciting: true,
2032            in_flight: true,
2033            delivered: 0,
2034            delivered_time: now,
2035            first_sent_time: now,
2036            is_app_limited: false,
2037            tx_in_flight: 0,
2038            lost: 0,
2039            has_data: true,
2040            is_pmtud_probe: false,
2041        };
2042
2043        r.on_packet_sent(
2044            p,
2045            packet::Epoch::Application,
2046            HandshakeStatus::default(),
2047            now,
2048            "",
2049        );
2050
2051        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 3);
2052        assert_eq!(r.bytes_in_flight(), 13000);
2053        assert_eq!(r.bytes_in_flight_duration(), initial_rtt);
2054
2055        // We pace this outgoing packet. as all conditions for pacing
2056        // are passed.
2057        let pacing_rate = if pacing_enabled {
2058            let cwnd_gain: f64 = 2.0;
2059            // Adjust for cwnd_gain.  BW estimate was made before the CWND
2060            // increase.
2061            let bw = r.cwnd() as f64 / cwnd_gain / initial_rtt.as_secs_f64();
2062            bw as u64
2063        } else {
2064            0
2065        };
2066        assert_eq!(r.pacing_rate(), pacing_rate);
2067
2068        let scale_factor = if pacing_enabled {
2069            // For bbr2_gcongestion, send time is almost 13000 / pacing_rate.
2070            // Don't know where 13000 comes from.
2071            1.08333332
2072        } else {
2073            1.0
2074        };
2075        assert_eq!(
2076            r.get_packet_send_time(now) - now,
2077            if pacing_enabled {
2078                Duration::from_secs_f64(
2079                    scale_factor * 12000.0 / pacing_rate as f64,
2080                )
2081            } else {
2082                Duration::ZERO
2083            }
2084        );
2085        assert_eq!(r.startup_exit(), None);
2086
2087        let reduced_rtt = Duration::from_millis(40);
2088        now += reduced_rtt;
2089
2090        let mut acked = RangeSet::default();
2091        acked.insert(10..11);
2092
2093        assert_eq!(
2094            r.on_ack_received(
2095                &acked,
2096                0,
2097                packet::Epoch::Application,
2098                HandshakeStatus::default(),
2099                now,
2100                None,
2101                "",
2102            )
2103            .unwrap(),
2104            OnAckReceivedOutcome {
2105                lost_packets: 0,
2106                lost_bytes: 0,
2107                acked_bytes: 6000,
2108                spurious_losses: 0,
2109            }
2110        );
2111
2112        let expected_srtt = (7 * initial_rtt + reduced_rtt) / 8;
2113        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
2114        assert_eq!(r.bytes_in_flight(), 7000);
2115        assert_eq!(r.bytes_in_flight_duration(), initial_rtt + reduced_rtt);
2116        assert_eq!(r.min_rtt(), Some(reduced_rtt));
2117        assert_eq!(r.rtt(), expected_srtt);
2118
2119        let mut acked = RangeSet::default();
2120        acked.insert(11..12);
2121
2122        assert_eq!(
2123            r.on_ack_received(
2124                &acked,
2125                0,
2126                packet::Epoch::Application,
2127                HandshakeStatus::default(),
2128                now,
2129                None,
2130                "",
2131            )
2132            .unwrap(),
2133            OnAckReceivedOutcome {
2134                lost_packets: 0,
2135                lost_bytes: 0,
2136                acked_bytes: 6000,
2137                spurious_losses: 0,
2138            }
2139        );
2140
2141        // When enabled, the pacer adds a 25msec delay to the packet
2142        // sends which will be applied to the sent times tracked by
2143        // the recovery module, bringing down RTT to 15msec.
2144        let expected_min_rtt = if pacing_enabled &&
2145            !time_sent_set_to_now &&
2146            cfg!(feature = "internal")
2147        {
2148            reduced_rtt - Duration::from_millis(25)
2149        } else {
2150            reduced_rtt
2151        };
2152
2153        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 1);
2154        assert_eq!(r.bytes_in_flight(), 1000);
2155        assert_eq!(r.bytes_in_flight_duration(), initial_rtt + reduced_rtt);
2156        assert_eq!(r.min_rtt(), Some(expected_min_rtt));
2157
2158        let expected_srtt = (7 * expected_srtt + expected_min_rtt) / 8;
2159        assert_eq!(r.rtt(), expected_srtt);
2160
2161        let mut acked = RangeSet::default();
2162        acked.insert(12..13);
2163
2164        assert_eq!(
2165            r.on_ack_received(
2166                &acked,
2167                0,
2168                packet::Epoch::Application,
2169                HandshakeStatus::default(),
2170                now,
2171                None,
2172                "",
2173            )
2174            .unwrap(),
2175            OnAckReceivedOutcome {
2176                lost_packets: 0,
2177                lost_bytes: 0,
2178                acked_bytes: 1000,
2179                spurious_losses: 0,
2180            }
2181        );
2182
2183        // Pacer adds 50msec delay to the second packet, resulting in
2184        // an effective RTT of 0.
2185        let expected_min_rtt = if pacing_enabled &&
2186            !time_sent_set_to_now &&
2187            cfg!(feature = "internal")
2188        {
2189            Duration::from_millis(0)
2190        } else {
2191            reduced_rtt
2192        };
2193        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
2194        assert_eq!(r.bytes_in_flight(), 0);
2195        assert_eq!(r.bytes_in_flight_duration(), initial_rtt + reduced_rtt);
2196        assert_eq!(r.min_rtt(), Some(expected_min_rtt));
2197
2198        let expected_srtt = (7 * expected_srtt + expected_min_rtt) / 8;
2199        assert_eq!(r.rtt(), expected_srtt);
2200    }
2201
2202    #[rstest]
2203    // initial_cwnd / first_rtt == initial_pacing_rate.  Pacing is 1.0 * bw before
2204    // and after.
2205    #[case::bw_estimate_equal_after_first_rtt(1.0, 1.0)]
2206    // initial_cwnd / first_rtt < initial_pacing_rate.  Pacing decreases from 2 *
2207    // bw to 1.0 * bw.
2208    #[case::bw_estimate_decrease_after_first_rtt(2.0, 1.0)]
2209    // initial_cwnd / first_rtt > initial_pacing_rate from 0.5 * bw to 1.0 * bw.
2210    // Initial pacing remains 0.5 * bw because the initial_pacing_rate parameter
2211    // is used an upper bound for the pacing rate after the first RTT.
2212    // Pacing rate after the first ACK should be:
2213    // min(initial_pacing_rate_bytes_per_second, init_cwnd / first_rtt)
2214    #[case::bw_estimate_increase_after_first_rtt(0.5, 0.5)]
2215    #[cfg(feature = "internal")]
2216    fn initial_pacing_rate_override(
2217        #[case] initial_multipler: f64, #[case] expected_multiplier: f64,
2218    ) {
2219        let rtt = Duration::from_millis(50);
2220        let bw = Bandwidth::from_bytes_and_time_delta(12000, rtt);
2221        let initial_pacing_rate_hint = bw * initial_multipler;
2222        let expected_pacing_with_rtt_measurement = bw * expected_multiplier;
2223
2224        let cc_algorithm_name = "bbr2_gcongestion";
2225        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
2226        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
2227        cfg.set_custom_bbr_params(BbrParams {
2228            initial_pacing_rate_bytes_per_second: Some(
2229                initial_pacing_rate_hint.to_bytes_per_second(),
2230            ),
2231            ..Default::default()
2232        });
2233
2234        let mut r = Recovery::new(&cfg);
2235
2236        let mut now = Instant::now();
2237
2238        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
2239
2240        // send some packets.
2241        for i in 0..2 {
2242            let p = test_utils::helper_packet_sent(i, now, 1200);
2243            r.on_packet_sent(
2244                p,
2245                packet::Epoch::Application,
2246                HandshakeStatus::default(),
2247                now,
2248                "",
2249            );
2250        }
2251
2252        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
2253        assert_eq!(r.bytes_in_flight(), 2400);
2254        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
2255
2256        // Initial pacing rate matches the override value.
2257        assert_eq!(
2258            r.pacing_rate(),
2259            initial_pacing_rate_hint.to_bytes_per_second()
2260        );
2261        assert_eq!(r.get_packet_send_time(now), now);
2262
2263        assert_eq!(r.cwnd(), 12000);
2264        assert_eq!(r.cwnd_available(), 9600);
2265
2266        // Wait 1 rtt for ACK.
2267        now += rtt;
2268
2269        let mut acked = RangeSet::default();
2270        acked.insert(0..2);
2271
2272        assert_eq!(
2273            r.on_ack_received(
2274                &acked,
2275                10,
2276                packet::Epoch::Application,
2277                HandshakeStatus::default(),
2278                now,
2279                None,
2280                "",
2281            )
2282            .unwrap(),
2283            OnAckReceivedOutcome {
2284                lost_packets: 0,
2285                lost_bytes: 0,
2286                acked_bytes: 2400,
2287                spurious_losses: 0,
2288            }
2289        );
2290
2291        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
2292        assert_eq!(r.bytes_in_flight(), 0);
2293        assert_eq!(r.bytes_in_flight_duration(), rtt);
2294        assert_eq!(r.rtt(), rtt);
2295
2296        // Pacing rate is recalculated based on initial cwnd when the
2297        // first RTT estimate is available.
2298        assert_eq!(
2299            r.pacing_rate(),
2300            expected_pacing_with_rtt_measurement.to_bytes_per_second()
2301        );
2302    }
2303
2304    #[rstest]
2305    fn validate_ack_range_on_ack_received(
2306        #[values("cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
2307    ) {
2308        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
2309        cfg.set_cc_algorithm_name(cc_algorithm_name).unwrap();
2310
2311        let epoch = packet::Epoch::Application;
2312        let mut r = Recovery::new(&cfg);
2313        let mut now = Instant::now();
2314        assert_eq!(r.sent_packets_len(epoch), 0);
2315
2316        // Send 4 packets
2317        let pkt_size = 1000;
2318        let pkt_count = 4;
2319        for pkt_num in 0..pkt_count {
2320            let sent = test_utils::helper_packet_sent(pkt_num, now, pkt_size);
2321            r.on_packet_sent(sent, epoch, HandshakeStatus::default(), now, "");
2322        }
2323        assert_eq!(r.sent_packets_len(epoch), pkt_count as usize);
2324        assert_eq!(r.bytes_in_flight(), pkt_count as usize * pkt_size);
2325        assert!(r.get_largest_acked_on_epoch(epoch).is_none());
2326        assert_eq!(r.largest_sent_pkt_num_on_path(epoch).unwrap(), 3);
2327
2328        // Wait for 10ms.
2329        now += Duration::from_millis(10);
2330
2331        // ACK 2 packets
2332        let mut acked = RangeSet::default();
2333        acked.insert(0..2);
2334
2335        assert_eq!(
2336            r.on_ack_received(
2337                &acked,
2338                25,
2339                epoch,
2340                HandshakeStatus::default(),
2341                now,
2342                None,
2343                "",
2344            )
2345            .unwrap(),
2346            OnAckReceivedOutcome {
2347                lost_packets: 0,
2348                lost_bytes: 0,
2349                acked_bytes: 2 * 1000,
2350                spurious_losses: 0,
2351            }
2352        );
2353
2354        assert_eq!(r.sent_packets_len(epoch), 2);
2355        assert_eq!(r.bytes_in_flight(), 2 * 1000);
2356
2357        assert_eq!(r.get_largest_acked_on_epoch(epoch).unwrap(), 1);
2358        assert_eq!(r.largest_sent_pkt_num_on_path(epoch).unwrap(), 3);
2359
2360        // ACK large range
2361        let mut acked = RangeSet::default();
2362        acked.insert(0..10);
2363        assert_eq!(
2364            r.on_ack_received(
2365                &acked,
2366                25,
2367                epoch,
2368                HandshakeStatus::default(),
2369                now,
2370                None,
2371                "",
2372            )
2373            .unwrap(),
2374            OnAckReceivedOutcome {
2375                lost_packets: 0,
2376                lost_bytes: 0,
2377                acked_bytes: 2 * 1000,
2378                spurious_losses: 0,
2379            }
2380        );
2381        assert_eq!(r.sent_packets_len(epoch), 0);
2382        assert_eq!(r.bytes_in_flight(), 0);
2383
2384        assert_eq!(r.get_largest_acked_on_epoch(epoch).unwrap(), 3);
2385        assert_eq!(r.largest_sent_pkt_num_on_path(epoch).unwrap(), 3);
2386    }
2387
2388    #[rstest]
2389    fn pmtud_loss_on_timer(
2390        #[values("reno", "cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
2391    ) {
2392        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
2393        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
2394
2395        let mut r = Recovery::new(&cfg);
2396        assert_eq!(r.cwnd(), 12000);
2397
2398        let mut now = Instant::now();
2399
2400        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
2401
2402        // Start by sending a few packets.
2403        let p = Sent {
2404            pkt_num: 0,
2405            frames: smallvec![],
2406            time_sent: now,
2407            time_acked: None,
2408            time_lost: None,
2409            size: 1000,
2410            ack_eliciting: true,
2411            in_flight: true,
2412            delivered: 0,
2413            delivered_time: now,
2414            first_sent_time: now,
2415            is_app_limited: false,
2416            tx_in_flight: 0,
2417            lost: 0,
2418            has_data: false,
2419            is_pmtud_probe: false,
2420        };
2421
2422        r.on_packet_sent(
2423            p,
2424            packet::Epoch::Application,
2425            HandshakeStatus::default(),
2426            now,
2427            "",
2428        );
2429
2430        assert_eq!(r.in_flight_count(packet::Epoch::Application), 1);
2431        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 1);
2432        assert_eq!(r.bytes_in_flight(), 1000);
2433        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
2434
2435        let p = Sent {
2436            pkt_num: 1,
2437            frames: smallvec![],
2438            time_sent: now,
2439            time_acked: None,
2440            time_lost: None,
2441            size: 1000,
2442            ack_eliciting: true,
2443            in_flight: true,
2444            delivered: 0,
2445            delivered_time: now,
2446            first_sent_time: now,
2447            is_app_limited: false,
2448            tx_in_flight: 0,
2449            lost: 0,
2450            has_data: false,
2451            is_pmtud_probe: true,
2452        };
2453
2454        r.on_packet_sent(
2455            p,
2456            packet::Epoch::Application,
2457            HandshakeStatus::default(),
2458            now,
2459            "",
2460        );
2461
2462        assert_eq!(r.in_flight_count(packet::Epoch::Application), 2);
2463
2464        let p = Sent {
2465            pkt_num: 2,
2466            frames: smallvec![],
2467            time_sent: now,
2468            time_acked: None,
2469            time_lost: None,
2470            size: 1000,
2471            ack_eliciting: true,
2472            in_flight: true,
2473            delivered: 0,
2474            delivered_time: now,
2475            first_sent_time: now,
2476            is_app_limited: false,
2477            tx_in_flight: 0,
2478            lost: 0,
2479            has_data: false,
2480            is_pmtud_probe: false,
2481        };
2482
2483        r.on_packet_sent(
2484            p,
2485            packet::Epoch::Application,
2486            HandshakeStatus::default(),
2487            now,
2488            "",
2489        );
2490
2491        assert_eq!(r.in_flight_count(packet::Epoch::Application), 3);
2492
2493        // Wait for 10ms.
2494        now += Duration::from_millis(10);
2495
2496        // Only the first  packets and the last one are acked.
2497        let mut acked = RangeSet::default();
2498        acked.insert(0..1);
2499        acked.insert(2..3);
2500
2501        assert_eq!(
2502            r.on_ack_received(
2503                &acked,
2504                25,
2505                packet::Epoch::Application,
2506                HandshakeStatus::default(),
2507                now,
2508                None,
2509                "",
2510            )
2511            .unwrap(),
2512            OnAckReceivedOutcome {
2513                lost_packets: 0,
2514                lost_bytes: 0,
2515                acked_bytes: 2 * 1000,
2516                spurious_losses: 0,
2517            }
2518        );
2519
2520        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
2521        assert_eq!(r.bytes_in_flight(), 1000);
2522        assert_eq!(r.bytes_in_flight_duration(), Duration::from_millis(10));
2523        assert_eq!(r.lost_count(), 0);
2524
2525        // Wait until loss detection timer expires.
2526        now = r.loss_detection_timer().unwrap();
2527
2528        // Packet is declared lost.
2529        r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
2530        assert_eq!(r.loss_probes(packet::Epoch::Application), 0);
2531
2532        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
2533        assert_eq!(r.in_flight_count(packet::Epoch::Application), 0);
2534        assert_eq!(r.bytes_in_flight(), 0);
2535        assert_eq!(r.bytes_in_flight_duration(), Duration::from_micros(11250));
2536        assert_eq!(r.cwnd(), 12000);
2537
2538        assert_eq!(r.lost_count(), 0);
2539
2540        // Wait 1 RTT.
2541        now += r.rtt();
2542
2543        assert_eq!(
2544            r.detect_lost_packets_for_test(packet::Epoch::Application, now),
2545            (0, 0)
2546        );
2547
2548        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
2549        assert_eq!(r.in_flight_count(packet::Epoch::Application), 0);
2550        assert_eq!(r.bytes_in_flight(), 0);
2551        assert_eq!(r.bytes_in_flight_duration(), Duration::from_micros(11250));
2552        assert_eq!(r.lost_count(), 0);
2553        assert_eq!(r.startup_exit(), None);
2554    }
2555
2556    // Modeling delivery_rate for gcongestion is non-trivial so we only test the
2557    // congestion specific algorithms.
2558    #[rstest]
2559    fn congestion_delivery_rate(
2560        #[values("reno", "cubic", "bbr2")] cc_algorithm_name: &str,
2561    ) {
2562        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
2563        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
2564
2565        let mut r = Recovery::new(&cfg);
2566        assert_eq!(r.cwnd(), 12000);
2567
2568        let now = Instant::now();
2569
2570        let mut total_bytes_sent = 0;
2571        for pn in 0..10 {
2572            // Start by sending a few packets.
2573            let bytes = 1000;
2574            let sent = test_utils::helper_packet_sent(pn, now, bytes);
2575            r.on_packet_sent(
2576                sent,
2577                packet::Epoch::Application,
2578                HandshakeStatus::default(),
2579                now,
2580                "",
2581            );
2582
2583            total_bytes_sent += bytes;
2584        }
2585
2586        // Ack
2587        let interval = Duration::from_secs(10);
2588        let mut acked = RangeSet::default();
2589        acked.insert(0..10);
2590        assert_eq!(
2591            r.on_ack_received(
2592                &acked,
2593                25,
2594                packet::Epoch::Application,
2595                HandshakeStatus::default(),
2596                now + interval,
2597                None,
2598                "",
2599            )
2600            .unwrap(),
2601            OnAckReceivedOutcome {
2602                lost_packets: 0,
2603                lost_bytes: 0,
2604                acked_bytes: total_bytes_sent,
2605                spurious_losses: 0,
2606            }
2607        );
2608        assert_eq!(r.delivery_rate().to_bytes_per_second(), 1000);
2609        assert_eq!(r.min_rtt().unwrap(), interval);
2610        // delivery rate should be in units bytes/sec
2611        assert_eq!(
2612            total_bytes_sent as u64 / interval.as_secs(),
2613            r.delivery_rate().to_bytes_per_second()
2614        );
2615        assert_eq!(r.startup_exit(), None);
2616    }
2617
2618    #[rstest]
2619    fn acks_with_no_retransmittable_data(
2620        #[values("reno", "cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
2621    ) {
2622        let rtt = Duration::from_millis(100);
2623
2624        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
2625        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
2626
2627        let mut r = Recovery::new(&cfg);
2628
2629        let mut now = Instant::now();
2630
2631        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
2632
2633        let mut next_packet = 0;
2634        // send some packets.
2635        for _ in 0..3 {
2636            let p = test_utils::helper_packet_sent(next_packet, now, 1200);
2637            next_packet += 1;
2638            r.on_packet_sent(
2639                p,
2640                packet::Epoch::Application,
2641                HandshakeStatus::default(),
2642                now,
2643                "",
2644            );
2645        }
2646
2647        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 3);
2648        assert_eq!(r.bytes_in_flight(), 3600);
2649        assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
2650
2651        assert_eq!(
2652            r.pacing_rate(),
2653            if cc_algorithm_name == "bbr2_gcongestion" {
2654                103963
2655            } else {
2656                0
2657            },
2658        );
2659        assert_eq!(r.get_packet_send_time(now), now);
2660        assert_eq!(r.cwnd(), 12000);
2661        assert_eq!(r.cwnd_available(), 8400);
2662
2663        // Wait 1 rtt for ACK.
2664        now += rtt;
2665
2666        let mut acked = RangeSet::default();
2667        acked.insert(0..3);
2668
2669        assert_eq!(
2670            r.on_ack_received(
2671                &acked,
2672                10,
2673                packet::Epoch::Application,
2674                HandshakeStatus::default(),
2675                now,
2676                None,
2677                "",
2678            )
2679            .unwrap(),
2680            OnAckReceivedOutcome {
2681                lost_packets: 0,
2682                lost_bytes: 0,
2683                acked_bytes: 3600,
2684                spurious_losses: 0,
2685            }
2686        );
2687
2688        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
2689        assert_eq!(r.bytes_in_flight(), 0);
2690        assert_eq!(r.bytes_in_flight_duration(), rtt);
2691        assert_eq!(r.rtt(), rtt);
2692
2693        // Pacing rate is recalculated based on initial cwnd when the
2694        // first RTT estimate is available.
2695        assert_eq!(
2696            r.pacing_rate(),
2697            if cc_algorithm_name == "bbr2_gcongestion" {
2698                120000
2699            } else {
2700                0
2701            },
2702        );
2703
2704        // Send some no "in_flight" packets
2705        for iter in 3..1000 {
2706            let mut p = test_utils::helper_packet_sent(next_packet, now, 1200);
2707            // `in_flight = false` marks packets as if they only contained ACK
2708            // frames.
2709            p.in_flight = false;
2710            next_packet += 1;
2711            r.on_packet_sent(
2712                p,
2713                packet::Epoch::Application,
2714                HandshakeStatus::default(),
2715                now,
2716                "",
2717            );
2718
2719            now += rtt;
2720
2721            let mut acked = RangeSet::default();
2722            acked.insert(iter..(iter + 1));
2723
2724            assert_eq!(
2725                r.on_ack_received(
2726                    &acked,
2727                    10,
2728                    packet::Epoch::Application,
2729                    HandshakeStatus::default(),
2730                    now,
2731                    None,
2732                    "",
2733                )
2734                .unwrap(),
2735                OnAckReceivedOutcome {
2736                    lost_packets: 0,
2737                    lost_bytes: 0,
2738                    acked_bytes: 0,
2739                    spurious_losses: 0,
2740                }
2741            );
2742
2743            // Verify that connection has not exited startup.
2744            assert_eq!(r.startup_exit(), None, "{iter}");
2745
2746            // Unchanged metrics.
2747            assert_eq!(
2748                r.sent_packets_len(packet::Epoch::Application),
2749                0,
2750                "{iter}"
2751            );
2752            assert_eq!(r.bytes_in_flight(), 0, "{iter}");
2753            assert_eq!(r.bytes_in_flight_duration(), rtt, "{iter}");
2754            assert_eq!(
2755                r.pacing_rate(),
2756                if cc_algorithm_name == "bbr2_gcongestion" ||
2757                    cc_algorithm_name == "bbr2"
2758                {
2759                    120000
2760                } else {
2761                    0
2762                },
2763                "{iter}"
2764            );
2765        }
2766    }
2767    #[rstest]
2768    fn pto_overflow_reproduction(
2769        #[values("reno", "cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
2770    ) {
2771        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
2772        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
2773        let mut r = Recovery::new(&cfg);
2774        let now = Instant::now();
2775
2776        // Scenario: Handshake not completed
2777        let handshake_status = HandshakeStatus {
2778            has_handshake_keys: true,
2779            peer_verified_address: true,
2780            completed: false,
2781        };
2782
2783        // 1. Send Initial packet to arm the timer
2784        let p_initial = Sent {
2785            pkt_num: 0,
2786            frames: smallvec::smallvec![],
2787            time_sent: now,
2788            time_acked: None,
2789            time_lost: None,
2790            size: 1000,
2791            ack_eliciting: true,
2792            in_flight: true,
2793            delivered: 0,
2794            delivered_time: now,
2795            first_sent_time: now,
2796            is_app_limited: false,
2797            tx_in_flight: 0,
2798            lost: 0,
2799            has_data: false,
2800            is_pmtud_probe: false,
2801        };
2802        r.on_packet_sent(
2803            p_initial,
2804            packet::Epoch::Initial,
2805            handshake_status,
2806            now,
2807            "",
2808        );
2809
2810        // The timer should now be set for the Initial packet.
2811        assert!(r.loss_detection_timer().is_some());
2812
2813        // 2. Send Application packet (0-RTT)
2814        let p_app = Sent {
2815            pkt_num: 0, // Application space has its own packet numbers
2816            frames: smallvec::smallvec![],
2817            time_sent: now,
2818            time_acked: None,
2819            time_lost: None,
2820            size: 1000,
2821            ack_eliciting: true,
2822            in_flight: true,
2823            delivered: 0,
2824            delivered_time: now,
2825            first_sent_time: now,
2826            is_app_limited: false,
2827            tx_in_flight: 0,
2828
2829            lost: 0,
2830            has_data: true,
2831            is_pmtud_probe: false,
2832        };
2833        r.on_packet_sent(
2834            p_app,
2835            packet::Epoch::Application,
2836            handshake_status,
2837            now,
2838            "",
2839        );
2840
2841        // 3. Acknowledge the Initial packet.
2842        // This empties the Initial space, but Application space still has data in
2843        // flight.
2844        let mut ranges = RangeSet::default();
2845        ranges.insert(0..1);
2846        r.on_ack_received(
2847            &ranges,
2848            0,
2849            packet::Epoch::Initial,
2850            handshake_status,
2851            now,
2852            None,
2853            "",
2854        )
2855        .unwrap();
2856
2857        // The timer should be cleared at this point.
2858        // Although there is Application data in flight, the handshake is not
2859        // confirmed, so it cannot be used to arm the PTO timer. Since there are
2860        // no packets in flight in Initial or Handshake spaces either, no timer
2861        // should be set.
2862        assert!(r.loss_detection_timer().is_none());
2863    }
2864
2865    // Test that consecutive PTOs don't add duplicate frames to lost_frames.
2866    // This validates the fix: `if epoch.lost_frames.is_empty()` guard.
2867    #[rstest]
2868    fn pto_does_not_duplicate_frames_on_consecutive_timeouts(
2869        #[values("reno", "cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
2870    ) {
2871        let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
2872        assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
2873
2874        let mut r = Recovery::new(&cfg);
2875        let mut now = Instant::now();
2876
2877        // Send a packet with a STREAM frame.
2878        let frames = smallvec![frame::Frame::Stream {
2879            stream_id: 4,
2880            data: RangeBuf::from(b"test", 0, false),
2881        },];
2882
2883        let p = Sent {
2884            pkt_num: 0,
2885            frames: frames.clone(),
2886            time_sent: now,
2887            time_acked: None,
2888            time_lost: None,
2889            size: 1000,
2890            ack_eliciting: true,
2891            in_flight: true,
2892            delivered: 0,
2893            delivered_time: now,
2894            first_sent_time: now,
2895            is_app_limited: false,
2896            tx_in_flight: 0,
2897            lost: 0,
2898            has_data: true,
2899            is_pmtud_probe: false,
2900        };
2901
2902        r.on_packet_sent(
2903            p,
2904            packet::Epoch::Application,
2905            HandshakeStatus::default(),
2906            now,
2907            "",
2908        );
2909
2910        // Verify initial state
2911        assert_eq!(r.lost_frames_count(packet::Epoch::Application), 0);
2912        assert_eq!(r.lost_count(), 0);
2913
2914        // First PTO - should add frames when lost_frames is empty.
2915        now = r.loss_detection_timer().unwrap();
2916        r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
2917
2918        assert_eq!(r.pto_count(), 1);
2919        let frames_after_first_pto =
2920            r.lost_frames_count(packet::Epoch::Application);
2921        assert_eq!(
2922            frames_after_first_pto, 1,
2923            "First PTO should add exactly 1 frame"
2924        );
2925        assert_eq!(
2926            r.lost_count(),
2927            0,
2928            "PTO doesn't declare packets lost (no CC impact)"
2929        );
2930
2931        // Second PTO while lost_frames is still populated.
2932        // WITHOUT the fix: would add duplicate frame (count becomes 2).
2933        // WITH the fix: skips adding (count stays 1).
2934        now = r.loss_detection_timer().unwrap();
2935        r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
2936
2937        assert_eq!(r.pto_count(), 2);
2938        let frames_after_second_pto =
2939            r.lost_frames_count(packet::Epoch::Application);
2940        assert_eq!(
2941            frames_after_second_pto, frames_after_first_pto,
2942            "Second PTO must NOT add duplicate frames (fix: `if \
2943             lost_frames.is_empty()`)"
2944        );
2945
2946        // Third PTO for extra validation
2947        now = r.loss_detection_timer().unwrap();
2948        r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
2949
2950        assert_eq!(r.pto_count(), 3);
2951        let frames_after_third_pto =
2952            r.lost_frames_count(packet::Epoch::Application);
2953        assert_eq!(
2954            frames_after_third_pto, frames_after_first_pto,
2955            "Third PTO must NOT add duplicate frames"
2956        );
2957
2958        // Verify packets are still tracked (not removed)
2959        assert_eq!(r.sent_packets_len(packet::Epoch::Application), 1);
2960        // Verify lost_count never increased (PTO doesn't trigger CC)
2961        assert_eq!(r.lost_count(), 0);
2962    }
2963
2964    // Test that send_on_path after PTO timeout properly sends retransmissions
2965    // and doesn't mark packets as lost (lost_count should remain 0).
2966    #[rstest]
2967    fn pto_send_on_path_retransmits_without_loss(
2968        #[values("reno", "cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
2969    ) {
2970        use crate::test_utils;
2971
2972        let mut pipe = test_utils::Pipe::new(cc_algorithm_name).unwrap();
2973
2974        // Complete handshake
2975        assert_eq!(pipe.handshake(), Ok(()));
2976
2977        // Client sends stream data
2978        assert_eq!(pipe.client.stream_send(4, b"hello", false), Ok(5));
2979
2980        let mut buf = [0; 65535];
2981
2982        // Send the packet but drop it (don't deliver to server)
2983        let (len1, _) = pipe.client.send(&mut buf).unwrap();
2984        assert!(len1 > 0);
2985
2986        // Verify lost_count is 0 (no losses yet)
2987        let initial_lost_count = pipe
2988            .client
2989            .paths
2990            .get_active()
2991            .unwrap()
2992            .recovery
2993            .lost_count();
2994        assert_eq!(initial_lost_count, 0, "No packets should be lost initially");
2995
2996        // Verify frames are not yet in lost_frames
2997        let initial_lost_frames = pipe
2998            .client
2999            .paths
3000            .get_active()
3001            .unwrap()
3002            .recovery
3003            .lost_frames_count(packet::Epoch::Application);
3004        assert_eq!(
3005            initial_lost_frames, 0,
3006            "No frames should be in lost_frames initially"
3007        );
3008
3009        // Wait for PTO timeout
3010        let timer = pipe.client.timeout().unwrap();
3011        std::thread::sleep(timer + Duration::from_millis(1));
3012
3013        // Trigger PTO via on_timeout()
3014        pipe.client.on_timeout();
3015
3016        // After PTO, frames should be in lost_frames for retransmission
3017        let lost_frames_after_pto = pipe
3018            .client
3019            .paths
3020            .get_active()
3021            .unwrap()
3022            .recovery
3023            .lost_frames_count(packet::Epoch::Application);
3024        assert!(
3025            lost_frames_after_pto > 0,
3026            "PTO should add frames to lost_frames for retransmission"
3027        );
3028
3029        // But lost_count should still be 0 (PTO doesn't declare packets lost)
3030        let lost_count_after_pto = pipe
3031            .client
3032            .paths
3033            .get_active()
3034            .unwrap()
3035            .recovery
3036            .lost_count();
3037        assert_eq!(
3038            lost_count_after_pto, 0,
3039            "PTO should not increment lost_count"
3040        );
3041
3042        // Now send the retransmission via send_on_path
3043        let (len2, _) = pipe.client.send(&mut buf).unwrap();
3044        assert!(len2 > 0, "Should send PTO probe packet");
3045
3046        // After sending, lost_count should still be 0
3047        let lost_count_after_send = pipe
3048            .client
3049            .paths
3050            .get_active()
3051            .unwrap()
3052            .recovery
3053            .lost_count();
3054        assert_eq!(
3055            lost_count_after_send, 0,
3056            "Sending PTO probe should not increment lost_count"
3057        );
3058
3059        // Deliver the retransmission to server
3060        assert_eq!(pipe.server_recv(&mut buf[..len2]), Ok(len2));
3061
3062        // Server should receive the stream data
3063        let mut recv_buf = [0; 100];
3064        assert_eq!(pipe.server.stream_recv(4, &mut recv_buf), Ok((5, false)));
3065        assert_eq!(&recv_buf[..5], b"hello");
3066
3067        // Final verification: lost_count on client should still be 0
3068        let final_lost_count = pipe
3069            .client
3070            .paths
3071            .get_active()
3072            .unwrap()
3073            .recovery
3074            .lost_count();
3075        assert_eq!(
3076            final_lost_count, 0,
3077            "No packets should be marked as lost - PTO only retransmits"
3078        );
3079    }
3080}
3081
3082mod bandwidth;
3083mod bytes_in_flight;
3084mod congestion;
3085mod gcongestion;
3086mod rtt;