1use std::str::FromStr;
28use std::time::Duration;
29use std::time::Instant;
30
31use crate::frame;
32use crate::packet;
33use crate::ranges::RangeSet;
34pub(crate) use crate::recovery::bandwidth::Bandwidth;
35use crate::Config;
36use crate::Result;
37
38#[cfg(feature = "qlog")]
39use qlog::events::EventData;
40#[cfg(feature = "qlog")]
41use serde::Serialize;
42
43use smallvec::SmallVec;
44
45use self::congestion::recovery::LegacyRecovery;
46use self::gcongestion::GRecovery;
47pub use gcongestion::BbrBwLoReductionStrategy;
48pub use gcongestion::BbrParams;
49
// Loss detection and congestion control tunables shared by the recovery
// implementations in this module.

// Initial packet reordering threshold. The tests in this file expect
// `pkt_thresh()` to report this value before any adjustment.
// NOTE(review): looks like kPacketThreshold from RFC 9002 — confirm.
const INITIAL_PACKET_THRESHOLD: u64 = 3;

// NOTE(review): presumably the ceiling the packet threshold may be raised
// to — confirm against the users of this constant.
const MAX_PACKET_THRESHOLD: u64 = 20;

// Initial time threshold, a dimensionless factor of 9/8. The tests in this
// file expect `time_thresh()` to report this value initially.
const INITIAL_TIME_THRESHOLD: f64 = 9.0 / 8.0;

// A larger factor (5/4) than `INITIAL_TIME_THRESHOLD`.
// NOTE(review): presumably the time threshold applied once packet
// reordering has been observed — confirm.
const PACKET_REORDER_TIME_THRESHOLD: f64 = 5.0 / 4.0;

// 1/8, i.e. the part of `INITIAL_TIME_THRESHOLD` that exceeds 1.0.
const INITIAL_TIME_THRESHOLD_OVERHEAD: f64 = 1.0 / 8.0;
// NOTE(review): presumably the factor by which the overhead above is grown
// when the threshold is relaxed — confirm.
const TIME_THRESHOLD_OVERHEAD_MULTIPLIER: f64 = 2.0;

// One millisecond.
// NOTE(review): presumably the assumed timer granularity — confirm.
const GRANULARITY: Duration = Duration::from_millis(1);

// NOTE(review): presumably the maximum number of probe packets sent per
// PTO — confirm.
const MAX_PTO_PROBES_COUNT: usize = 2;

// NOTE(review): presumably the minimum congestion window, expressed in
// packets — confirm.
const MINIMUM_WINDOW_PACKETS: usize = 2;

// NOTE(review): presumably the multiplicative window decrease applied on
// loss — confirm.
const LOSS_REDUCTION_FACTOR: f64 = 0.5;

// Visible to the parent module.
// NOTE(review): presumably caps how many non-ack-eliciting packets may be
// outstanding before an ACK is solicited — confirm.
pub(super) const MAX_OUTSTANDING_NON_ACK_ELICITING: usize = 24;

// NOTE(review): presumably bounds the exponent used for PTO backoff —
// confirm.
const MAX_PTO_EXPONENT: u32 = 20;
101
/// Holds the deadline of the loss detection timer.
///
/// `time` is `None` while no deadline is set.
#[derive(Default)]
struct LossDetectionTimer {
    time: Option<Instant>,
}

impl LossDetectionTimer {
    /// Stores `timeout` as the current deadline, replacing any previous one.
    fn update(&mut self, timeout: Instant) {
        self.time = Some(timeout);
    }

    /// Removes the current deadline, if any.
    fn clear(&mut self) {
        self.time = None;
    }
}

impl std::fmt::Debug for LossDetectionTimer {
    /// Prints `none` when no deadline is set, `exp` when the deadline is not
    /// in the future, and otherwise the time remaining until the deadline.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let deadline = match self.time {
            Some(deadline) => deadline,
            None => return write!(f, "none"),
        };

        // A zero remainder means the deadline is exactly "now", which counts
        // as expired just like a deadline in the past.
        match deadline.checked_duration_since(Instant::now()) {
            Some(d) if !d.is_zero() => write!(f, "{d:?}"),
            _ => write!(f, "exp"),
        }
    }
}
133
/// Recovery-related settings, copied out of [`Config`] by
/// [`RecoveryConfig::from_config`].
#[derive(Clone, Copy, PartialEq)]
pub struct RecoveryConfig {
    /// Copied from `Config::initial_rtt`.
    pub initial_rtt: Duration,
    /// Copied from `Config::max_send_udp_payload_size`.
    pub max_send_udp_payload_size: usize,
    /// Not read from `Config`; `from_config` initializes it to zero.
    // NOTE(review): presumably refreshed later through
    // `RecoveryOps::update_max_ack_delay` — confirm.
    pub max_ack_delay: Duration,
    /// Congestion control algorithm to use.
    pub cc_algorithm: CongestionControlAlgorithm,
    /// Optional BBR parameter overrides.
    pub custom_bbr_params: Option<BbrParams>,
    /// Copied from `Config::hystart`.
    pub hystart: bool,
    /// Copied from `Config::pacing`.
    pub pacing: bool,
    /// Copied from `Config::max_pacing_rate`.
    // NOTE(review): units are not visible here — confirm.
    pub max_pacing_rate: Option<u64>,
    /// Initial congestion window, expressed in packets.
    pub initial_congestion_window_packets: usize,
    /// Copied from `Config::enable_relaxed_loss_threshold`.
    pub enable_relaxed_loss_threshold: bool,
    /// Copied from `Config::enable_cubic_idle_restart_fix`.
    pub enable_cubic_idle_restart_fix: bool,
}

impl RecoveryConfig {
    /// Builds a `RecoveryConfig` by copying the recovery-related fields of
    /// `config`.
    ///
    /// Every field is taken from the identically named `Config` field except
    /// `max_ack_delay`, which always starts at [`Duration::ZERO`].
    pub fn from_config(config: &Config) -> Self {
        Self {
            initial_rtt: config.initial_rtt,
            max_send_udp_payload_size: config.max_send_udp_payload_size,
            max_ack_delay: Duration::ZERO,
            cc_algorithm: config.cc_algorithm,
            custom_bbr_params: config.custom_bbr_params,
            hystart: config.hystart,
            pacing: config.pacing,
            max_pacing_rate: config.max_pacing_rate,
            initial_congestion_window_packets: config
                .initial_congestion_window_packets,
            enable_relaxed_loss_threshold: config.enable_relaxed_loss_threshold,
            enable_cubic_idle_restart_fix: config.enable_cubic_idle_restart_fix,
        }
    }
}
167
/// The recovery implementation in use, one of two backends.
///
/// `enum_dispatch` implements [`RecoveryOps`] for this enum by forwarding
/// each call to the active variant, and provides the `From` conversions
/// used by [`Recovery::new_with_config`].
#[enum_dispatch::enum_dispatch(RecoveryOps)]
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub(crate) enum Recovery {
    /// Backend built by `LegacyRecovery::new_with_config`.
    Legacy(LegacyRecovery),
    /// Backend built by `GRecovery::new`, when that returns `Some`.
    GCongestion(GRecovery),
}
175
/// Summary returned by [`RecoveryOps::on_ack_received`].
#[derive(Debug, Default, PartialEq)]
pub struct OnAckReceivedOutcome {
    /// Number of packets reported as lost.
    pub lost_packets: usize,
    /// Number of bytes reported as lost.
    pub lost_bytes: usize,
    /// Number of bytes reported as acknowledged.
    pub acked_bytes: usize,
    /// Number of spurious losses reported.
    // NOTE(review): presumably packets that were declared lost and then
    // acknowledged — confirm.
    pub spurious_losses: usize,
}
183
/// Summary returned by [`RecoveryOps::on_loss_detection_timeout`].
#[derive(Debug, Default)]
pub struct OnLossDetectionTimeoutOutcome {
    /// Number of packets reported as lost.
    pub lost_packets: usize,
    /// Number of bytes reported as lost.
    pub lost_bytes: usize,
}
189
/// Operations every recovery backend provides.
///
/// [`Recovery`] implements this trait through `enum_dispatch`. Methods
/// marked `#[cfg(test)]` exist only in test builds, and methods marked
/// `#[cfg(feature = "qlog")]` only when the `qlog` feature is enabled.
#[enum_dispatch::enum_dispatch]
pub trait RecoveryOps {
    /// Returns the number of packets counted as lost.
    fn lost_count(&self) -> usize;
    /// Returns the number of bytes counted as lost.
    fn bytes_lost(&self) -> u64;

    // NOTE(review): presumably tells the caller whether the next packet in
    // `epoch` should be made ack-eliciting — confirm.
    fn should_elicit_ack(&self, epoch: packet::Epoch) -> bool;

    /// Returns the next frame queued as acknowledged for `epoch`, if any.
    fn next_acked_frame(&mut self, epoch: packet::Epoch) -> Option<frame::Frame>;

    /// Returns the next frame queued as lost for `epoch`, if any.
    fn next_lost_frame(&mut self, epoch: packet::Epoch) -> Option<frame::Frame>;

    /// Returns the largest acknowledged packet number for `epoch`, if any.
    fn get_largest_acked_on_epoch(&self, epoch: packet::Epoch) -> Option<u64>;
    /// Returns whether `epoch` has lost frames pending.
    fn has_lost_frames(&self, epoch: packet::Epoch) -> bool;
    /// Returns the number of loss probes pending for `epoch`.
    fn loss_probes(&self, epoch: packet::Epoch) -> usize;
    /// Test-only: increments the loss probe counter for `epoch`.
    #[cfg(test)]
    fn inc_loss_probes(&mut self, epoch: packet::Epoch);
    /// Test-only: returns the number of lost frames tracked for `epoch`.
    #[cfg(test)]
    fn lost_frames_count(&self, epoch: packet::Epoch) -> usize;

    // NOTE(review): presumably records that a PING was sent in `epoch` —
    // confirm.
    fn ping_sent(&mut self, epoch: packet::Epoch);

    /// Records that `pkt` was sent in `epoch` at time `now`.
    fn on_packet_sent(
        &mut self, pkt: Sent, epoch: packet::Epoch,
        handshake_status: HandshakeStatus, now: Instant, trace_id: &str,
    );
    // NOTE(review): presumably the time at which the next packet may be
    // sent given pacing — confirm.
    fn get_packet_send_time(&self, now: Instant) -> Instant;

    /// Processes an acknowledgement covering the packet numbers in `ranges`
    /// for `epoch`, returning a summary of what was acked and lost.
    // NOTE(review): the units of `ack_delay` and the role of `skip_pn` are
    // not visible here — confirm.
    #[allow(clippy::too_many_arguments)]
    fn on_ack_received(
        &mut self, ranges: &RangeSet, ack_delay: u64, epoch: packet::Epoch,
        handshake_status: HandshakeStatus, now: Instant, skip_pn: Option<u64>,
        trace_id: &str,
    ) -> Result<OnAckReceivedOutcome>;

    /// Handles expiry of the loss detection timer at time `now`.
    fn on_loss_detection_timeout(
        &mut self, handshake_status: HandshakeStatus, now: Instant,
        trace_id: &str,
    ) -> OnLossDetectionTimeoutOutcome;
    /// Notifies the backend that the packet number space of `epoch` was
    /// discarded.
    fn on_pkt_num_space_discarded(
        &mut self, epoch: packet::Epoch, handshake_status: HandshakeStatus,
        now: Instant,
    );
    /// Notifies the backend of a path change.
    // NOTE(review): the returned pair is presumably
    // (lost packets, lost bytes) — confirm.
    fn on_path_change(
        &mut self, epoch: packet::Epoch, now: Instant, _trace_id: &str,
    ) -> (usize, usize);
    /// Returns the loss detection timer deadline, or `None` when unset.
    fn loss_detection_timer(&self) -> Option<Instant>;
    /// Returns the congestion window.
    fn cwnd(&self) -> usize;
    /// Returns the part of the congestion window still available.
    fn cwnd_available(&self) -> usize;
    /// Returns the current RTT estimate.
    fn rtt(&self) -> Duration;

    /// Returns the minimum RTT, if one is available.
    fn min_rtt(&self) -> Option<Duration>;

    /// Returns the maximum RTT, if one is available.
    fn max_rtt(&self) -> Option<Duration>;

    /// Returns the RTT variance.
    fn rttvar(&self) -> Duration;

    /// Returns the probe timeout duration.
    fn pto(&self) -> Duration;

    /// Returns the delivery rate estimate.
    fn delivery_rate(&self) -> Bandwidth;

    /// Returns the maximum bandwidth estimate, if one is available.
    fn max_bandwidth(&self) -> Option<Bandwidth>;

    /// Returns the recorded startup exit, if any.
    fn startup_exit(&self) -> Option<StartupExit>;

    /// Returns the maximum datagram size in use.
    fn max_datagram_size(&self) -> usize;

    // NOTE(review): presumably the PMTUD-driven variant of
    // `update_max_datagram_size` — confirm how the two differ.
    fn pmtud_update_max_datagram_size(&mut self, new_max_datagram_size: usize);

    /// Updates the maximum datagram size.
    fn update_max_datagram_size(&mut self, new_max_datagram_size: usize);

    // NOTE(review): presumably signals that the sender is application
    // limited — confirm.
    fn on_app_limited(&mut self);

    /// Test-only: returns the largest packet number sent in `epoch`.
    #[cfg(test)]
    fn largest_sent_pkt_num_on_path(&self, epoch: packet::Epoch) -> Option<u64>;

    /// Test-only: returns the application-limited flag.
    #[cfg(test)]
    fn app_limited(&self) -> bool;

    /// Test-only: returns the number of sent packets tracked for `epoch`.
    #[cfg(test)]
    fn sent_packets_len(&self, epoch: packet::Epoch) -> usize;

    /// Returns the number of bytes in flight.
    fn bytes_in_flight(&self) -> usize;

    // NOTE(review): presumably the accumulated time during which bytes
    // were in flight — confirm.
    fn bytes_in_flight_duration(&self) -> Duration;

    /// Test-only: returns the number of in-flight packets for `epoch`.
    #[cfg(test)]
    fn in_flight_count(&self, epoch: packet::Epoch) -> usize;

    /// Test-only: returns the pacing rate.
    #[cfg(test)]
    fn pacing_rate(&self) -> u64;

    /// Test-only: returns the PTO counter.
    #[cfg(test)]
    fn pto_count(&self) -> u32;

    /// Test-only: returns the packet reordering threshold, if any.
    #[cfg(test)]
    fn pkt_thresh(&self) -> Option<u64>;

    /// Test-only: returns the time reordering threshold.
    #[cfg(test)]
    fn time_thresh(&self) -> f64;

    /// Test-only: returns the number of spurious losses counted.
    #[cfg(test)]
    fn lost_spurious_count(&self) -> usize;

    /// Test-only: runs loss detection for `epoch` at time `now`.
    // NOTE(review): the returned pair is presumably
    // (lost packets, lost bytes) — confirm.
    #[cfg(test)]
    fn detect_lost_packets_for_test(
        &mut self, epoch: packet::Epoch, now: Instant,
    ) -> (usize, usize);

    /// Sets the application-limited flag to `v`.
    fn update_app_limited(&mut self, v: bool);

    // NOTE(review): presumably sets the application-limited flag used by
    // the delivery rate estimator — confirm.
    fn delivery_rate_update_app_limited(&mut self, v: bool);

    /// Updates the maximum ACK delay.
    fn update_max_ack_delay(&mut self, max_ack_delay: Duration);

    /// Returns a static label for the backend's state at time `now`.
    #[cfg(feature = "qlog")]
    fn state_str(&self, now: Instant) -> &'static str;

    /// Returns qlog event data to log, or `None` when there is nothing to
    /// report.
    #[cfg(feature = "qlog")]
    fn get_updated_qlog_event_data(&mut self) -> Option<EventData>;

    /// Returns the congestion control state to log, or `None` when there is
    /// nothing to report.
    #[cfg(feature = "qlog")]
    fn get_updated_qlog_cc_state(&mut self, now: Instant)
        -> Option<&'static str>;

    // NOTE(review): presumably the amount of data that may be sent in one
    // burst — confirm.
    fn send_quantum(&self) -> usize;

    /// Returns the decision on when the next packet may be released.
    fn get_next_release_time(&self) -> ReleaseDecision;

    /// Returns whether this backend is the gcongestion implementation.
    ///
    /// The tests in this file expect `false` for Reno and CUBIC and `true`
    /// for the BBR aliases.
    fn gcongestion_enabled(&self) -> bool;
}
330
331impl Recovery {
332 pub fn new_with_config(recovery_config: &RecoveryConfig) -> Self {
333 let grecovery = GRecovery::new(recovery_config);
334 if let Some(grecovery) = grecovery {
335 Recovery::from(grecovery)
336 } else {
337 Recovery::from(LegacyRecovery::new_with_config(recovery_config))
338 }
339 }
340
341 #[cfg(feature = "qlog")]
342 pub fn maybe_qlog(
343 &mut self, qlog: &mut qlog::streamer::QlogStreamer, now: Instant,
344 ) {
345 if let Some(ev_data) = self.get_updated_qlog_event_data() {
346 qlog.add_event_data_with_instant(ev_data, now).ok();
347 }
348
349 if let Some(cc_state) = self.get_updated_qlog_cc_state(now) {
350 let ev_data = EventData::QuicCongestionStateUpdated(
351 qlog::events::quic::CongestionStateUpdated {
352 old: None,
353 new: cc_state.to_string(),
354 trigger: None,
355 },
356 );
357
358 qlog.add_event_data_with_instant(ev_data, now).ok();
359 }
360 }
361
362 #[cfg(test)]
363 pub fn new(config: &Config) -> Self {
364 Self::new_with_config(&RecoveryConfig::from_config(config))
365 }
366}
367
/// Available congestion control algorithms.
///
/// The enum is `#[repr(C)]` with explicit discriminants. Values 2 and 3 are
/// unused.
// NOTE(review): the gap presumably preserves the values of variants that
// were removed — confirm before reusing 2 or 3.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[repr(C)]
pub enum CongestionControlAlgorithm {
    /// Reno. Parsed from `"reno"`.
    Reno = 0,
    /// CUBIC. Parsed from `"cubic"`.
    CUBIC = 1,
    /// BBRv2 provided by the gcongestion backend. Parsed from `"bbr"`,
    /// `"bbr2"` and `"bbr2_gcongestion"`.
    Bbr2Gcongestion = 4,
}
383
384impl FromStr for CongestionControlAlgorithm {
385 type Err = crate::Error;
386
387 fn from_str(name: &str) -> std::result::Result<Self, Self::Err> {
391 match name {
392 "reno" => Ok(CongestionControlAlgorithm::Reno),
393 "cubic" => Ok(CongestionControlAlgorithm::CUBIC),
394 "bbr" => Ok(CongestionControlAlgorithm::Bbr2Gcongestion),
395 "bbr2" => Ok(CongestionControlAlgorithm::Bbr2Gcongestion),
396 "bbr2_gcongestion" => Ok(CongestionControlAlgorithm::Bbr2Gcongestion),
397 _ => Err(crate::Error::CongestionControl),
398 }
399 }
400}
401
/// Bookkeeping record for a sent packet, handed to
/// [`RecoveryOps::on_packet_sent`].
#[derive(Clone)]
pub struct Sent {
    /// Packet number.
    pub pkt_num: u64,

    /// Frames associated with the packet.
    pub frames: SmallVec<[frame::Frame; 1]>,

    /// Time at which the packet was sent.
    pub time_sent: Instant,

    /// Time at which the packet was acknowledged, if it has been.
    pub time_acked: Option<Instant>,

    /// Time at which the packet was declared lost, if it has been.
    pub time_lost: Option<Instant>,

    /// Size of the packet.
    // NOTE(review): presumably in bytes; the tests in this file compare it
    // against `bytes_in_flight()` — confirm.
    pub size: usize,

    /// Whether the packet is ack-eliciting.
    pub ack_eliciting: bool,

    /// Whether the packet counts towards bytes in flight.
    pub in_flight: bool,

    // NOTE(review): `delivered`, `delivered_time`, `first_sent_time`,
    // `is_app_limited`, `tx_in_flight` and `lost` look like delivery rate
    // sampling state captured at send time — confirm.
    pub delivered: usize,

    pub delivered_time: Instant,

    pub first_sent_time: Instant,

    pub is_app_limited: bool,

    pub tx_in_flight: usize,

    pub lost: u64,

    // NOTE(review): presumably set when the packet carries application
    // data — confirm.
    pub has_data: bool,

    /// Whether the packet is a PMTUD probe.
    pub is_pmtud_probe: bool,
}
436
437impl std::fmt::Debug for Sent {
438 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
439 write!(f, "pkt_num={:?} ", self.pkt_num)?;
440 write!(f, "pkt_sent_time={:?} ", self.time_sent)?;
441 write!(f, "pkt_size={:?} ", self.size)?;
442 write!(f, "delivered={:?} ", self.delivered)?;
443 write!(f, "delivered_time={:?} ", self.delivered_time)?;
444 write!(f, "first_sent_time={:?} ", self.first_sent_time)?;
445 write!(f, "is_app_limited={} ", self.is_app_limited)?;
446 write!(f, "tx_in_flight={} ", self.tx_in_flight)?;
447 write!(f, "lost={} ", self.lost)?;
448 write!(f, "has_data={} ", self.has_data)?;
449 write!(f, "is_pmtud_probe={}", self.is_pmtud_probe)?;
450
451 Ok(())
452 }
453}
454
/// Handshake progress flags passed to the recovery callbacks.
#[derive(Clone, Copy, Debug)]
pub struct HandshakeStatus {
    /// Whether handshake keys are available.
    pub has_handshake_keys: bool,

    /// Whether the peer has verified our address.
    pub peer_verified_address: bool,

    /// Whether the handshake has completed.
    pub completed: bool,
}

/// Test-only default with every flag set.
#[cfg(test)]
impl Default for HandshakeStatus {
    fn default() -> Self {
        let (has_handshake_keys, peer_verified_address, completed) =
            (true, true, true);

        Self {
            has_handshake_keys,
            peer_verified_address,
            completed,
        }
    }
}
476
/// Last recovery metrics reported to qlog.
///
/// [`QlogMetrics::maybe_update`] compares a fresh snapshot against these
/// values so that only fields that changed are emitted.
#[derive(Default)]
#[cfg(feature = "qlog")]
struct QlogMetrics {
    min_rtt: Duration,
    smoothed_rtt: Duration,
    latest_rtt: Duration,
    rttvar: Duration,
    cwnd: u64,
    bytes_in_flight: u64,
    ssthresh: Option<u64>,
    pacing_rate: Option<u64>,
    // The fields below are reported through the event's `ex_data` map
    // (under `cf_`-prefixed keys) rather than through dedicated event
    // fields, with the exception of `pto_count`.
    delivery_rate: Option<u64>,
    send_rate: Option<u64>,
    ack_rate: Option<u64>,
    // Cumulative totals; `maybe_update` also reports the delta since the
    // previously stored total.
    lost_packets: Option<u64>,
    lost_bytes: Option<u64>,
    pto_count: Option<u32>,
}
499
/// A named custom field that can be rendered as JSON for a qlog event's
/// extra data.
#[cfg(feature = "qlog")]
trait CustomCfQlogField {
    /// Returns the key under which the field is stored.
    fn name(&self) -> &'static str;
    /// Returns the field's value as a JSON value.
    fn as_json_value(&self) -> serde_json::Value;
}
505
/// Serializable pair of a running total and the change since the previous
/// report. Fields that are `None` are omitted from the serialized output.
#[cfg(feature = "qlog")]
#[serde_with::skip_serializing_none]
#[derive(Serialize)]
struct TotalAndDelta {
    total: Option<u64>,
    delta: Option<u64>,
}
513
/// Generic [`CustomCfQlogField`] implementation pairing a static name with
/// any serializable value.
#[cfg(feature = "qlog")]
struct CustomQlogField<T> {
    name: &'static str,
    value: T,
}

#[cfg(feature = "qlog")]
impl<T> CustomQlogField<T> {
    /// Creates a field called `name` holding `value`.
    fn new(name: &'static str, value: T) -> Self {
        Self { name, value }
    }
}

#[cfg(feature = "qlog")]
impl<T: Serialize> CustomCfQlogField for CustomQlogField<T> {
    fn name(&self) -> &'static str {
        self.name
    }

    /// Serializes the stored value with `serde_json::json!`.
    fn as_json_value(&self) -> serde_json::Value {
        serde_json::json!(&self.value)
    }
}
537
/// Thin wrapper around qlog's `ExData` map that accepts any serializable
/// value.
#[cfg(feature = "qlog")]
struct CfExData(qlog::events::ExData);

#[cfg(feature = "qlog")]
impl CfExData {
    /// Creates an empty map.
    fn new() -> Self {
        let fields = qlog::events::ExData::new();
        Self(fields)
    }

    /// Serializes `value` to JSON and stores it under `name`.
    fn insert<T: Serialize>(&mut self, name: &'static str, value: T) {
        let field = CustomQlogField::new(name, value);
        let key = field.name().to_string();
        let json = field.as_json_value();

        self.0.insert(key, json);
    }

    /// Unwraps the underlying `ExData` map.
    fn into_inner(self) -> qlog::events::ExData {
        let CfExData(fields) = self;
        fields
    }
}
557
#[cfg(feature = "qlog")]
impl QlogMetrics {
    /// Stores `latest` into `current` when the two differ.
    ///
    /// Returns the newly stored value, or `None` when nothing changed.
    fn replace_if_changed<T: Copy + PartialEq>(
        current: &mut T, latest: T,
    ) -> Option<T> {
        if *current == latest {
            return None;
        }

        *current = latest;
        Some(latest)
    }

    /// Like `replace_if_changed`, but for optional metrics where a `latest`
    /// of `None` means "not reported": it never overwrites the stored value
    /// and never counts as a change.
    fn replace_if_reported<T: Copy + PartialEq>(
        current: &mut Option<T>, latest: Option<T>,
    ) -> Option<T> {
        let value = latest?;
        Self::replace_if_changed(current, latest).map(|_| value)
    }

    /// Merges the `latest` snapshot into `self` and builds the qlog event
    /// describing what changed.
    ///
    /// Only fields whose value differs from the stored one are set in the
    /// returned event. RTT values are reported in milliseconds. Returns
    /// `None` when nothing changed.
    ///
    /// `ssthresh` and `pacing_rate` are stored (and trigger an event) even
    /// when they change to `None`; the remaining optional metrics are only
    /// stored when `latest` actually carries a value.
    fn maybe_update(&mut self, latest: Self) -> Option<EventData> {
        fn millis(rtt: Duration) -> f32 {
            rtt.as_secs_f32() * 1000.0
        }

        let new_min_rtt =
            Self::replace_if_changed(&mut self.min_rtt, latest.min_rtt)
                .map(millis);
        let new_smoothed_rtt = Self::replace_if_changed(
            &mut self.smoothed_rtt,
            latest.smoothed_rtt,
        )
        .map(millis);
        let new_latest_rtt =
            Self::replace_if_changed(&mut self.latest_rtt, latest.latest_rtt)
                .map(millis);
        let new_rttvar =
            Self::replace_if_changed(&mut self.rttvar, latest.rttvar)
                .map(millis);

        let new_cwnd = Self::replace_if_changed(&mut self.cwnd, latest.cwnd);
        let new_bytes_in_flight = Self::replace_if_changed(
            &mut self.bytes_in_flight,
            latest.bytes_in_flight,
        );

        // These are `Option<Option<u64>>`: the outer layer says whether the
        // stored value changed, the inner one is the new value (which may
        // itself be `None`).
        let ssthresh_change =
            Self::replace_if_changed(&mut self.ssthresh, latest.ssthresh);
        let pacing_rate_change =
            Self::replace_if_changed(&mut self.pacing_rate, latest.pacing_rate);

        // The qlog field is 16 bits wide; clamp instead of letting the cast
        // wrap around for out-of-range counts.
        let new_pto_count =
            Self::replace_if_reported(&mut self.pto_count, latest.pto_count)
                .map(|count| count.min(u32::from(u16::MAX)) as u16);

        let mut emit_event = new_min_rtt.is_some() ||
            new_smoothed_rtt.is_some() ||
            new_latest_rtt.is_some() ||
            new_rttvar.is_some() ||
            new_cwnd.is_some() ||
            new_bytes_in_flight.is_some() ||
            ssthresh_change.is_some() ||
            pacing_rate_change.is_some() ||
            new_pto_count.is_some();

        let mut ex_data = CfExData::new();

        if let Some(rate) = Self::replace_if_reported(
            &mut self.delivery_rate,
            latest.delivery_rate,
        ) {
            emit_event = true;
            ex_data.insert("cf_delivery_rate", rate);
        }
        if let Some(rate) =
            Self::replace_if_reported(&mut self.send_rate, latest.send_rate)
        {
            emit_event = true;
            ex_data.insert("cf_send_rate", rate);
        }
        if let Some(rate) =
            Self::replace_if_reported(&mut self.ack_rate, latest.ack_rate)
        {
            emit_event = true;
            ex_data.insert("cf_ack_rate", rate);
        }

        // Loss counters are reported as a total plus the delta since the
        // previous report. The delta saturates at zero so that a counter
        // that unexpectedly goes backwards cannot underflow.
        let previous_lost_packets = self.lost_packets.unwrap_or(0);
        if let Some(total) = Self::replace_if_reported(
            &mut self.lost_packets,
            latest.lost_packets,
        ) {
            emit_event = true;
            ex_data.insert("cf_lost_packets", TotalAndDelta {
                total: Some(total),
                delta: Some(total.saturating_sub(previous_lost_packets)),
            });
        }

        let previous_lost_bytes = self.lost_bytes.unwrap_or(0);
        if let Some(total) =
            Self::replace_if_reported(&mut self.lost_bytes, latest.lost_bytes)
        {
            emit_event = true;
            ex_data.insert("cf_lost_bytes", TotalAndDelta {
                total: Some(total),
                delta: Some(total.saturating_sub(previous_lost_bytes)),
            });
        }

        if !emit_event {
            return None;
        }

        Some(EventData::QuicMetricsUpdated(
            qlog::events::quic::RecoveryMetricsUpdated {
                min_rtt: new_min_rtt,
                smoothed_rtt: new_smoothed_rtt,
                latest_rtt: new_latest_rtt,
                rtt_variance: new_rttvar,
                congestion_window: new_cwnd,
                bytes_in_flight: new_bytes_in_flight,
                ssthresh: ssthresh_change.flatten(),
                pacing_rate: pacing_rate_change.flatten(),
                pto_count: new_pto_count,
                ex_data: ex_data.into_inner(),
                ..Default::default()
            },
        ))
    }
}
708
/// When a packet may be released.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReleaseTime {
    /// No wait is required.
    Immediate,
    /// Release at the given instant.
    At(Instant),
}

/// A release time together with a flag saying whether bursting is allowed.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ReleaseDecision {
    time: ReleaseTime,
    allow_burst: bool,
}

impl ReleaseTime {
    /// Pushes a scheduled release time back by `delay`.
    ///
    /// `Immediate` is left untouched.
    fn inc(&mut self, delay: Duration) {
        if let ReleaseTime::At(at) = self {
            *at += delay;
        }
    }

    /// Makes the release time no earlier than `other`.
    ///
    /// `Immediate` becomes `At(other)`; a scheduled time becomes the later of
    /// itself and `other`.
    fn set_max(&mut self, other: Instant) {
        let not_before = match *self {
            ReleaseTime::Immediate => other,
            ReleaseTime::At(current) => current.max(other),
        };

        *self = ReleaseTime::At(not_before);
    }
}

impl ReleaseDecision {
    /// Largest difference at which two release times still compare equal in
    /// [`ReleaseDecision::time_eq`].
    pub(crate) const EQUAL_THRESHOLD: Duration = Duration::from_micros(50);

    /// Returns the release instant when it lies strictly after `now`.
    ///
    /// Returns `None` for an immediate release and for a scheduled time that
    /// is not in the future.
    #[inline]
    pub fn time(&self, now: Instant) -> Option<Instant> {
        match self.time {
            ReleaseTime::At(at) if at > now => Some(at),
            _ => None,
        }
    }

    /// Returns whether bursting is allowed.
    #[inline]
    pub fn can_burst(&self) -> bool {
        self.allow_burst
    }

    /// Returns whether `self` and `other` release at approximately the same
    /// time, i.e. within [`Self::EQUAL_THRESHOLD`] of each other.
    ///
    /// A decision that does not require waiting (see [`Self::time`]) is
    /// treated as releasing at `now`.
    #[inline]
    pub fn time_eq(&self, other: &Self, now: Instant) -> bool {
        let lhs = self.time(now).unwrap_or(now);
        let rhs = other.time(now).unwrap_or(now);

        let delta = if lhs < rhs {
            rhs.duration_since(lhs)
        } else {
            lhs.duration_since(rhs)
        };

        delta <= Self::EQUAL_THRESHOLD
    }
}
773
774#[derive(Default, Debug)]
776pub struct RecoveryStats {
777 startup_exit: Option<StartupExit>,
778}
779
780impl RecoveryStats {
781 pub fn set_startup_exit(&mut self, startup_exit: StartupExit) {
783 if self.startup_exit.is_none() {
784 self.startup_exit = Some(startup_exit);
785 }
786 }
787}
788
789#[derive(Debug, Clone, Copy, PartialEq)]
791pub struct StartupExit {
792 pub cwnd: usize,
794
795 pub bandwidth: Option<u64>,
797
798 pub reason: StartupExitReason,
800}
801
802impl StartupExit {
803 fn new(
804 cwnd: usize, bandwidth: Option<Bandwidth>, reason: StartupExitReason,
805 ) -> Self {
806 let bandwidth = bandwidth.map(Bandwidth::to_bytes_per_second);
807 Self {
808 cwnd,
809 bandwidth,
810 reason,
811 }
812 }
813}
814
/// Reason recorded in [`StartupExit`] for leaving the startup phase.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum StartupExitReason {
    /// Startup ended because of packet loss. The tests in this file expect
    /// this reason for Reno and CUBIC after a loss.
    Loss,

    // NOTE(review): presumably the bandwidth estimate stopped growing —
    // confirm.
    BandwidthPlateau,

    // NOTE(review): presumably a standing queue was detected — confirm.
    PersistentQueue,

    // NOTE(review): presumably a limit on conservative slow start rounds
    // was reached — confirm.
    ConservativeSlowStartRounds,
}
830
831#[cfg(test)]
832mod tests {
833 use super::*;
834 use crate::packet;
835 use crate::range_buf::RangeBuf;
836 use crate::test_utils;
837 use crate::CongestionControlAlgorithm;
838 use crate::DEFAULT_INITIAL_RTT;
839 use rstest::rstest;
840 use smallvec::smallvec;
841 use std::str::FromStr;
842
843 fn recovery_for_alg(algo: CongestionControlAlgorithm) -> Recovery {
844 let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
845 cfg.set_cc_algorithm(algo);
846 Recovery::new(&cfg)
847 }
848
/// Every accepted algorithm name parses to the expected variant, and only
/// the BBR aliases select the gcongestion backend.
#[test]
fn lookup_cc_algo_ok() {
    // (name, expected variant, whether gcongestion is expected)
    let cases = [
        ("reno", CongestionControlAlgorithm::Reno, false),
        ("cubic", CongestionControlAlgorithm::CUBIC, false),
        ("bbr", CongestionControlAlgorithm::Bbr2Gcongestion, true),
        ("bbr2", CongestionControlAlgorithm::Bbr2Gcongestion, true),
        (
            "bbr2_gcongestion",
            CongestionControlAlgorithm::Bbr2Gcongestion,
            true,
        ),
    ];

    for (name, expected, uses_gcongestion) in cases {
        let algo = CongestionControlAlgorithm::from_str(name).unwrap();
        assert_eq!(algo, expected);
        assert_eq!(
            recovery_for_alg(algo).gcongestion_enabled(),
            uses_gcongestion
        );
    }
}
872
/// An unknown algorithm name is rejected with `Error::CongestionControl`.
#[test]
fn lookup_cc_algo_bad() {
    let parsed = CongestionControlAlgorithm::from_str("???");

    assert_eq!(parsed, Err(crate::Error::CongestionControl));
}
880
881 #[rstest]
882 fn loss_on_pto(
883 #[values("reno", "cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
884 ) {
885 let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
886 assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
887
888 let mut r = Recovery::new(&cfg);
889
890 let mut now = Instant::now();
891
892 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
893
894 let p = Sent {
896 pkt_num: 0,
897 frames: smallvec![],
898 time_sent: now,
899 time_acked: None,
900 time_lost: None,
901 size: 1000,
902 ack_eliciting: true,
903 in_flight: true,
904 delivered: 0,
905 delivered_time: now,
906 first_sent_time: now,
907 is_app_limited: false,
908 tx_in_flight: 0,
909 lost: 0,
910 has_data: false,
911 is_pmtud_probe: false,
912 };
913
914 r.on_packet_sent(
915 p,
916 packet::Epoch::Application,
917 HandshakeStatus::default(),
918 now,
919 "",
920 );
921
922 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 1);
923 assert_eq!(r.bytes_in_flight(), 1000);
924 assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
925
926 let p = Sent {
927 pkt_num: 1,
928 frames: smallvec![],
929 time_sent: now,
930 time_acked: None,
931 time_lost: None,
932 size: 1000,
933 ack_eliciting: true,
934 in_flight: true,
935 delivered: 0,
936 delivered_time: now,
937 first_sent_time: now,
938 is_app_limited: false,
939 tx_in_flight: 0,
940 lost: 0,
941 has_data: false,
942 is_pmtud_probe: false,
943 };
944
945 r.on_packet_sent(
946 p,
947 packet::Epoch::Application,
948 HandshakeStatus::default(),
949 now,
950 "",
951 );
952
953 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
954 assert_eq!(r.bytes_in_flight(), 2000);
955 assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
956
957 let p = Sent {
958 pkt_num: 2,
959 frames: smallvec![],
960 time_sent: now,
961 time_acked: None,
962 time_lost: None,
963 size: 1000,
964 ack_eliciting: true,
965 in_flight: true,
966 delivered: 0,
967 delivered_time: now,
968 first_sent_time: now,
969 is_app_limited: false,
970 tx_in_flight: 0,
971 lost: 0,
972 has_data: false,
973 is_pmtud_probe: false,
974 };
975
976 r.on_packet_sent(
977 p,
978 packet::Epoch::Application,
979 HandshakeStatus::default(),
980 now,
981 "",
982 );
983 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 3);
984 assert_eq!(r.bytes_in_flight(), 3000);
985 assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
986
987 let p = Sent {
988 pkt_num: 3,
989 frames: smallvec![],
990 time_sent: now,
991 time_acked: None,
992 time_lost: None,
993 size: 1000,
994 ack_eliciting: true,
995 in_flight: true,
996 delivered: 0,
997 delivered_time: now,
998 first_sent_time: now,
999 is_app_limited: false,
1000 tx_in_flight: 0,
1001 lost: 0,
1002 has_data: false,
1003 is_pmtud_probe: false,
1004 };
1005
1006 r.on_packet_sent(
1007 p,
1008 packet::Epoch::Application,
1009 HandshakeStatus::default(),
1010 now,
1011 "",
1012 );
1013 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 4);
1014 assert_eq!(r.bytes_in_flight(), 4000);
1015 assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
1016
1017 now += Duration::from_millis(10);
1019
1020 let mut acked = RangeSet::default();
1022 acked.insert(0..2);
1023
1024 assert_eq!(
1025 r.on_ack_received(
1026 &acked,
1027 25,
1028 packet::Epoch::Application,
1029 HandshakeStatus::default(),
1030 now,
1031 None,
1032 "",
1033 )
1034 .unwrap(),
1035 OnAckReceivedOutcome {
1036 lost_packets: 0,
1037 lost_bytes: 0,
1038 acked_bytes: 2 * 1000,
1039 spurious_losses: 0,
1040 }
1041 );
1042
1043 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
1044 assert_eq!(r.bytes_in_flight(), 2000);
1045 assert_eq!(r.bytes_in_flight_duration(), Duration::from_millis(10));
1046 assert_eq!(r.lost_count(), 0);
1047
1048 now = r.loss_detection_timer().unwrap();
1050
1051 r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
1053 assert_eq!(r.loss_probes(packet::Epoch::Application), 1);
1054 assert_eq!(r.lost_count(), 0);
1055 assert_eq!(r.pto_count(), 1);
1056
1057 let p = Sent {
1058 pkt_num: 4,
1059 frames: smallvec![],
1060 time_sent: now,
1061 time_acked: None,
1062 time_lost: None,
1063 size: 1000,
1064 ack_eliciting: true,
1065 in_flight: true,
1066 delivered: 0,
1067 delivered_time: now,
1068 first_sent_time: now,
1069 is_app_limited: false,
1070 tx_in_flight: 0,
1071 lost: 0,
1072 has_data: false,
1073 is_pmtud_probe: false,
1074 };
1075
1076 r.on_packet_sent(
1077 p,
1078 packet::Epoch::Application,
1079 HandshakeStatus::default(),
1080 now,
1081 "",
1082 );
1083 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 3);
1084 assert_eq!(r.bytes_in_flight(), 3000);
1085 assert_eq!(r.bytes_in_flight_duration(), Duration::from_millis(30));
1086
1087 let p = Sent {
1088 pkt_num: 5,
1089 frames: smallvec![],
1090 time_sent: now,
1091 time_acked: None,
1092 time_lost: None,
1093 size: 1000,
1094 ack_eliciting: true,
1095 in_flight: true,
1096 delivered: 0,
1097 delivered_time: now,
1098 first_sent_time: now,
1099 is_app_limited: false,
1100 tx_in_flight: 0,
1101 lost: 0,
1102 has_data: false,
1103 is_pmtud_probe: false,
1104 };
1105
1106 r.on_packet_sent(
1107 p,
1108 packet::Epoch::Application,
1109 HandshakeStatus::default(),
1110 now,
1111 "",
1112 );
1113 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 4);
1114 assert_eq!(r.bytes_in_flight(), 4000);
1115 assert_eq!(r.bytes_in_flight_duration(), Duration::from_millis(30));
1116 assert_eq!(r.lost_count(), 0);
1117
1118 now += Duration::from_millis(10);
1120
1121 let mut acked = RangeSet::default();
1123 acked.insert(4..6);
1124
1125 assert_eq!(
1126 r.on_ack_received(
1127 &acked,
1128 25,
1129 packet::Epoch::Application,
1130 HandshakeStatus::default(),
1131 now,
1132 None,
1133 "",
1134 )
1135 .unwrap(),
1136 OnAckReceivedOutcome {
1137 lost_packets: 2,
1138 lost_bytes: 2000,
1139 acked_bytes: 2 * 1000,
1140 spurious_losses: 0,
1141 }
1142 );
1143
1144 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 4);
1145 assert_eq!(r.bytes_in_flight(), 0);
1146 assert_eq!(r.bytes_in_flight_duration(), Duration::from_millis(40));
1147
1148 assert_eq!(r.lost_count(), 2);
1149
1150 now += r.rtt();
1152
1153 assert_eq!(
1154 r.detect_lost_packets_for_test(packet::Epoch::Application, now),
1155 (0, 0)
1156 );
1157
1158 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1159 if cc_algorithm_name == "reno" || cc_algorithm_name == "cubic" {
1160 assert!(r.startup_exit().is_some());
1161 assert_eq!(r.startup_exit().unwrap().reason, StartupExitReason::Loss);
1162 } else {
1163 assert_eq!(r.startup_exit(), None);
1164 }
1165 }
1166
1167 #[rstest]
1168 fn loss_on_timer(
1169 #[values("reno", "cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
1170 ) {
1171 let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
1172 assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
1173
1174 let mut r = Recovery::new(&cfg);
1175
1176 let mut now = Instant::now();
1177
1178 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1179
1180 let p = Sent {
1182 pkt_num: 0,
1183 frames: smallvec![],
1184 time_sent: now,
1185 time_acked: None,
1186 time_lost: None,
1187 size: 1000,
1188 ack_eliciting: true,
1189 in_flight: true,
1190 delivered: 0,
1191 delivered_time: now,
1192 first_sent_time: now,
1193 is_app_limited: false,
1194 tx_in_flight: 0,
1195 lost: 0,
1196 has_data: false,
1197 is_pmtud_probe: false,
1198 };
1199
1200 r.on_packet_sent(
1201 p,
1202 packet::Epoch::Application,
1203 HandshakeStatus::default(),
1204 now,
1205 "",
1206 );
1207 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 1);
1208 assert_eq!(r.bytes_in_flight(), 1000);
1209 assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
1210
1211 let p = Sent {
1212 pkt_num: 1,
1213 frames: smallvec![],
1214 time_sent: now,
1215 time_acked: None,
1216 time_lost: None,
1217 size: 1000,
1218 ack_eliciting: true,
1219 in_flight: true,
1220 delivered: 0,
1221 delivered_time: now,
1222 first_sent_time: now,
1223 is_app_limited: false,
1224 tx_in_flight: 0,
1225 lost: 0,
1226 has_data: false,
1227 is_pmtud_probe: false,
1228 };
1229
1230 r.on_packet_sent(
1231 p,
1232 packet::Epoch::Application,
1233 HandshakeStatus::default(),
1234 now,
1235 "",
1236 );
1237 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
1238 assert_eq!(r.bytes_in_flight(), 2000);
1239 assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
1240
1241 let p = Sent {
1242 pkt_num: 2,
1243 frames: smallvec![],
1244 time_sent: now,
1245 time_acked: None,
1246 time_lost: None,
1247 size: 1000,
1248 ack_eliciting: true,
1249 in_flight: true,
1250 delivered: 0,
1251 delivered_time: now,
1252 first_sent_time: now,
1253 is_app_limited: false,
1254 tx_in_flight: 0,
1255 lost: 0,
1256 has_data: false,
1257 is_pmtud_probe: false,
1258 };
1259
1260 r.on_packet_sent(
1261 p,
1262 packet::Epoch::Application,
1263 HandshakeStatus::default(),
1264 now,
1265 "",
1266 );
1267 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 3);
1268 assert_eq!(r.bytes_in_flight(), 3000);
1269 assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
1270
1271 let p = Sent {
1272 pkt_num: 3,
1273 frames: smallvec![],
1274 time_sent: now,
1275 time_acked: None,
1276 time_lost: None,
1277 size: 1000,
1278 ack_eliciting: true,
1279 in_flight: true,
1280 delivered: 0,
1281 delivered_time: now,
1282 first_sent_time: now,
1283 is_app_limited: false,
1284 tx_in_flight: 0,
1285 lost: 0,
1286 has_data: false,
1287 is_pmtud_probe: false,
1288 };
1289
1290 r.on_packet_sent(
1291 p,
1292 packet::Epoch::Application,
1293 HandshakeStatus::default(),
1294 now,
1295 "",
1296 );
1297 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 4);
1298 assert_eq!(r.bytes_in_flight(), 4000);
1299 assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
1300
1301 now += Duration::from_millis(10);
1303
1304 let mut acked = RangeSet::default();
1306 acked.insert(0..2);
1307 acked.insert(3..4);
1308
1309 assert_eq!(
1310 r.on_ack_received(
1311 &acked,
1312 25,
1313 packet::Epoch::Application,
1314 HandshakeStatus::default(),
1315 now,
1316 None,
1317 "",
1318 )
1319 .unwrap(),
1320 OnAckReceivedOutcome {
1321 lost_packets: 0,
1322 lost_bytes: 0,
1323 acked_bytes: 3 * 1000,
1324 spurious_losses: 0,
1325 }
1326 );
1327
1328 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
1329 assert_eq!(r.bytes_in_flight(), 1000);
1330 assert_eq!(r.bytes_in_flight_duration(), Duration::from_millis(10));
1331 assert_eq!(r.lost_count(), 0);
1332
1333 now = r.loss_detection_timer().unwrap();
1335
1336 r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
1338 assert_eq!(r.loss_probes(packet::Epoch::Application), 0);
1339
1340 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
1341 assert_eq!(r.bytes_in_flight(), 0);
1342 assert_eq!(r.bytes_in_flight_duration(), Duration::from_micros(11250));
1343
1344 assert_eq!(r.lost_count(), 1);
1345
1346 now += r.rtt();
1348
1349 assert_eq!(
1350 r.detect_lost_packets_for_test(packet::Epoch::Application, now),
1351 (0, 0)
1352 );
1353
1354 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1355 if cc_algorithm_name == "reno" || cc_algorithm_name == "cubic" {
1356 assert!(r.startup_exit().is_some());
1357 assert_eq!(r.startup_exit().unwrap().reason, StartupExitReason::Loss);
1358 } else {
1359 assert_eq!(r.startup_exit(), None);
1360 }
1361 }
1362
1363 #[rstest]
1364 fn loss_on_reordering(
1365 #[values("reno", "cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
1366 ) {
1367 let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
1368 assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
1369
1370 let mut r = Recovery::new(&cfg);
1371
1372 let mut now = Instant::now();
1373
1374 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1375
1376 for i in 0..4 {
1380 let p = test_utils::helper_packet_sent(i, now, 1000);
1381 r.on_packet_sent(
1382 p,
1383 packet::Epoch::Application,
1384 HandshakeStatus::default(),
1385 now,
1386 "",
1387 );
1388
1389 let pkt_count = (i + 1) as usize;
1390 assert_eq!(r.sent_packets_len(packet::Epoch::Application), pkt_count);
1391 assert_eq!(r.bytes_in_flight(), pkt_count * 1000);
1392 assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
1393 }
1394
1395 now += Duration::from_millis(10);
1397
1398 let mut acked = RangeSet::default();
1400 acked.insert(2..4);
1401 assert_eq!(
1402 r.on_ack_received(
1403 &acked,
1404 25,
1405 packet::Epoch::Application,
1406 HandshakeStatus::default(),
1407 now,
1408 None,
1409 "",
1410 )
1411 .unwrap(),
1412 OnAckReceivedOutcome {
1413 lost_packets: 1,
1414 lost_bytes: 1000,
1415 acked_bytes: 1000 * 2,
1416 spurious_losses: 0,
1417 }
1418 );
1419 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 4);
1422 assert_eq!(r.pkt_thresh().unwrap(), INITIAL_PACKET_THRESHOLD);
1423 assert_eq!(r.time_thresh(), INITIAL_TIME_THRESHOLD);
1424
1425 now += Duration::from_millis(10);
1427
1428 let mut acked = RangeSet::default();
1430 acked.insert(0..2);
1431 assert_eq!(
1432 r.on_ack_received(
1433 &acked,
1434 25,
1435 packet::Epoch::Application,
1436 HandshakeStatus::default(),
1437 now,
1438 None,
1439 "",
1440 )
1441 .unwrap(),
1442 OnAckReceivedOutcome {
1443 lost_packets: 0,
1444 lost_bytes: 0,
1445 acked_bytes: 1000,
1446 spurious_losses: 1,
1447 }
1448 );
1449 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1450 assert_eq!(r.bytes_in_flight(), 0);
1451 assert_eq!(r.bytes_in_flight_duration(), Duration::from_millis(20));
1452
1453 assert_eq!(r.lost_count(), 1);
1455 assert_eq!(r.lost_spurious_count(), 1);
1456
1457 assert_eq!(r.pkt_thresh().unwrap(), 4);
1459 assert_eq!(r.time_thresh(), PACKET_REORDER_TIME_THRESHOLD);
1460
1461 now += r.rtt();
1463
1464 assert_eq!(
1466 r.detect_lost_packets_for_test(packet::Epoch::Application, now),
1467 (0, 0)
1468 );
1469 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1470
1471 if cc_algorithm_name == "reno" || cc_algorithm_name == "cubic" {
1472 assert!(r.startup_exit().is_some());
1473 assert_eq!(r.startup_exit().unwrap().reason, StartupExitReason::Loss);
1474 } else {
1475 assert_eq!(r.startup_exit(), None);
1476 }
1477 }
1478
1479 #[rstest]
1484 fn time_thresholds_on_reordering(
1485 #[values("bbr2_gcongestion")] cc_algorithm_name: &str,
1486 ) {
1487 let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
1488 assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
1489
1490 let mut now = Instant::now();
1491 let mut r = Recovery::new(&cfg);
1492 assert_eq!(r.rtt(), DEFAULT_INITIAL_RTT);
1493
1494 const THRESH_GAP: Duration = Duration::from_millis(30);
1508 let initial_thresh_ms =
1510 DEFAULT_INITIAL_RTT.mul_f64(INITIAL_TIME_THRESHOLD);
1511 let spurious_thresh_ms: Duration =
1513 DEFAULT_INITIAL_RTT.mul_f64(PACKET_REORDER_TIME_THRESHOLD);
1514 let between_thresh_ms = initial_thresh_ms + THRESH_GAP;
1516 assert!(between_thresh_ms > initial_thresh_ms);
1517 assert!(between_thresh_ms < spurious_thresh_ms);
1518 assert!(between_thresh_ms + THRESH_GAP > spurious_thresh_ms);
1519
1520 for i in 0..6 {
1521 let send_time = now + i * between_thresh_ms;
1522
1523 let p = test_utils::helper_packet_sent(i.into(), send_time, 1000);
1524 r.on_packet_sent(
1525 p,
1526 packet::Epoch::Application,
1527 HandshakeStatus::default(),
1528 send_time,
1529 "",
1530 );
1531 }
1532
1533 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 6);
1534 assert_eq!(r.bytes_in_flight(), 6 * 1000);
1535 assert_eq!(r.pkt_thresh().unwrap(), INITIAL_PACKET_THRESHOLD);
1536 assert_eq!(r.time_thresh(), INITIAL_TIME_THRESHOLD);
1537
1538 now += between_thresh_ms;
1541
1542 let mut acked = RangeSet::default();
1547 acked.insert(1..2);
1548 assert_eq!(
1549 r.on_ack_received(
1550 &acked,
1551 25,
1552 packet::Epoch::Application,
1553 HandshakeStatus::default(),
1554 now,
1555 None,
1556 "",
1557 )
1558 .unwrap(),
1559 OnAckReceivedOutcome {
1560 lost_packets: 1,
1561 lost_bytes: 1000,
1562 acked_bytes: 1000,
1563 spurious_losses: 0,
1564 }
1565 );
1566 assert_eq!(r.pkt_thresh().unwrap(), INITIAL_PACKET_THRESHOLD);
1567 assert_eq!(r.time_thresh(), INITIAL_TIME_THRESHOLD);
1568
1569 let mut acked = RangeSet::default();
1574 acked.insert(0..1);
1575 assert_eq!(
1576 r.on_ack_received(
1577 &acked,
1578 25,
1579 packet::Epoch::Application,
1580 HandshakeStatus::default(),
1581 now,
1582 None,
1583 "",
1584 )
1585 .unwrap(),
1586 OnAckReceivedOutcome {
1587 lost_packets: 0,
1588 lost_bytes: 0,
1589 acked_bytes: 0,
1590 spurious_losses: 1,
1591 }
1592 );
1593 assert_eq!(r.time_thresh(), PACKET_REORDER_TIME_THRESHOLD);
1595
1596 now += between_thresh_ms;
1599
1600 let mut acked = RangeSet::default();
1605 acked.insert(3..4);
1606 assert_eq!(
1607 r.on_ack_received(
1608 &acked,
1609 25,
1610 packet::Epoch::Application,
1611 HandshakeStatus::default(),
1612 now,
1613 None,
1614 "",
1615 )
1616 .unwrap(),
1617 OnAckReceivedOutcome {
1618 lost_packets: 0,
1619 lost_bytes: 0,
1620 acked_bytes: 1000,
1621 spurious_losses: 0,
1622 }
1623 );
1624
1625 now += THRESH_GAP;
1628
1629 let mut acked = RangeSet::default();
1634 acked.insert(4..5);
1635 assert_eq!(
1636 r.on_ack_received(
1637 &acked,
1638 25,
1639 packet::Epoch::Application,
1640 HandshakeStatus::default(),
1641 now,
1642 None,
1643 "",
1644 )
1645 .unwrap(),
1646 OnAckReceivedOutcome {
1647 lost_packets: 1,
1648 lost_bytes: 1000,
1649 acked_bytes: 1000,
1650 spurious_losses: 0,
1651 }
1652 );
1653 }
1654
1655 #[rstest]
1658 fn relaxed_thresholds_on_reordering(
1659 #[values("bbr2_gcongestion")] cc_algorithm_name: &str,
1660 ) {
1661 let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
1662 cfg.enable_relaxed_loss_threshold = true;
1663 assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
1664
1665 let mut now = Instant::now();
1666 let mut r = Recovery::new(&cfg);
1667 assert_eq!(r.rtt(), DEFAULT_INITIAL_RTT);
1668
1669 const THRESH_GAP: Duration = Duration::from_millis(30);
1682 let initial_thresh_ms =
1684 DEFAULT_INITIAL_RTT.mul_f64(INITIAL_TIME_THRESHOLD);
1685 let spurious_thresh_ms: Duration =
1687 DEFAULT_INITIAL_RTT.mul_f64(PACKET_REORDER_TIME_THRESHOLD);
1688 let between_thresh_ms = initial_thresh_ms + THRESH_GAP;
1690 assert!(between_thresh_ms > initial_thresh_ms);
1691 assert!(between_thresh_ms < spurious_thresh_ms);
1692 assert!(between_thresh_ms + THRESH_GAP > spurious_thresh_ms);
1693
1694 for i in 0..6 {
1695 let send_time = now + i * between_thresh_ms;
1696
1697 let p = test_utils::helper_packet_sent(i.into(), send_time, 1000);
1698 r.on_packet_sent(
1699 p,
1700 packet::Epoch::Application,
1701 HandshakeStatus::default(),
1702 send_time,
1703 "",
1704 );
1705 }
1706
1707 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 6);
1708 assert_eq!(r.bytes_in_flight(), 6 * 1000);
1709 assert_eq!(r.pkt_thresh().unwrap(), INITIAL_PACKET_THRESHOLD);
1711 assert_eq!(r.time_thresh(), INITIAL_TIME_THRESHOLD);
1712
1713 now += between_thresh_ms;
1716
1717 let mut acked = RangeSet::default();
1722 acked.insert(1..2);
1723 assert_eq!(
1724 r.on_ack_received(
1725 &acked,
1726 25,
1727 packet::Epoch::Application,
1728 HandshakeStatus::default(),
1729 now,
1730 None,
1731 "",
1732 )
1733 .unwrap(),
1734 OnAckReceivedOutcome {
1735 lost_packets: 1,
1736 lost_bytes: 1000,
1737 acked_bytes: 1000,
1738 spurious_losses: 0,
1739 }
1740 );
1741 assert_eq!(r.pkt_thresh().unwrap(), INITIAL_PACKET_THRESHOLD);
1743 assert_eq!(r.time_thresh(), INITIAL_TIME_THRESHOLD);
1744
1745 let mut acked = RangeSet::default();
1750 acked.insert(0..1);
1751 assert_eq!(
1752 r.on_ack_received(
1753 &acked,
1754 25,
1755 packet::Epoch::Application,
1756 HandshakeStatus::default(),
1757 now,
1758 None,
1759 "",
1760 )
1761 .unwrap(),
1762 OnAckReceivedOutcome {
1763 lost_packets: 0,
1764 lost_bytes: 0,
1765 acked_bytes: 0,
1766 spurious_losses: 1,
1767 }
1768 );
1769 assert_eq!(r.pkt_thresh(), None);
1774 assert_eq!(r.time_thresh(), INITIAL_TIME_THRESHOLD);
1775
1776 now += between_thresh_ms;
1779 now += between_thresh_ms;
1783
1784 let mut acked = RangeSet::default();
1789 acked.insert(3..4);
1790 assert_eq!(
1791 r.on_ack_received(
1792 &acked,
1793 25,
1794 packet::Epoch::Application,
1795 HandshakeStatus::default(),
1796 now,
1797 None,
1798 "",
1799 )
1800 .unwrap(),
1801 OnAckReceivedOutcome {
1802 lost_packets: 1,
1803 lost_bytes: 1000,
1804 acked_bytes: 1000,
1805 spurious_losses: 0,
1806 }
1807 );
1808 assert_eq!(r.pkt_thresh(), None);
1810 assert_eq!(r.time_thresh(), INITIAL_TIME_THRESHOLD);
1811
1812 let mut acked = RangeSet::default();
1821 acked.insert(2..3);
1822 assert_eq!(
1823 r.on_ack_received(
1824 &acked,
1825 25,
1826 packet::Epoch::Application,
1827 HandshakeStatus::default(),
1828 now,
1829 None,
1830 "",
1831 )
1832 .unwrap(),
1833 OnAckReceivedOutcome {
1834 lost_packets: 0,
1835 lost_bytes: 0,
1836 acked_bytes: 0,
1837 spurious_losses: 1,
1838 }
1839 );
1840 assert_eq!(r.pkt_thresh(), None);
1844 let double_time_thresh_overhead =
1845 1.0 + 2.0 * INITIAL_TIME_THRESHOLD_OVERHEAD;
1846 assert_eq!(r.time_thresh(), double_time_thresh_overhead);
1847 }
1848
1849 #[rstest]
1850 fn pacing(
1851 #[values("reno", "cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
1852 #[values(false, true)] time_sent_set_to_now: bool,
1853 ) {
1854 let pacing_enabled = cc_algorithm_name == "bbr2" ||
1855 cc_algorithm_name == "bbr2_gcongestion";
1856
1857 let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
1858 assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
1859
1860 #[cfg(feature = "internal")]
1861 cfg.set_custom_bbr_params(BbrParams {
1862 time_sent_set_to_now: Some(time_sent_set_to_now),
1863 ..Default::default()
1864 });
1865
1866 let mut r = Recovery::new(&cfg);
1867
1868 let mut now = Instant::now();
1869
1870 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1871
1872 for i in 0..10 {
1874 let p = Sent {
1875 pkt_num: i,
1876 frames: smallvec![],
1877 time_sent: now,
1878 time_acked: None,
1879 time_lost: None,
1880 size: 1200,
1881 ack_eliciting: true,
1882 in_flight: true,
1883 delivered: 0,
1884 delivered_time: now,
1885 first_sent_time: now,
1886 is_app_limited: false,
1887 tx_in_flight: 0,
1888 lost: 0,
1889 has_data: true,
1890 is_pmtud_probe: false,
1891 };
1892
1893 r.on_packet_sent(
1894 p,
1895 packet::Epoch::Application,
1896 HandshakeStatus::default(),
1897 now,
1898 "",
1899 );
1900 }
1901
1902 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 10);
1903 assert_eq!(r.bytes_in_flight(), 12000);
1904 assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
1905
1906 if !pacing_enabled {
1907 assert_eq!(r.pacing_rate(), 0);
1908 } else {
1909 assert_eq!(r.pacing_rate(), 103963);
1910 }
1911 assert_eq!(r.get_packet_send_time(now), now);
1912
1913 assert_eq!(r.cwnd(), 12000);
1914 assert_eq!(r.cwnd_available(), 0);
1915
1916 let initial_rtt = Duration::from_millis(50);
1918 now += initial_rtt;
1919
1920 let mut acked = RangeSet::default();
1921 acked.insert(0..10);
1922
1923 assert_eq!(
1924 r.on_ack_received(
1925 &acked,
1926 10,
1927 packet::Epoch::Application,
1928 HandshakeStatus::default(),
1929 now,
1930 None,
1931 "",
1932 )
1933 .unwrap(),
1934 OnAckReceivedOutcome {
1935 lost_packets: 0,
1936 lost_bytes: 0,
1937 acked_bytes: 12000,
1938 spurious_losses: 0,
1939 }
1940 );
1941
1942 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
1943 assert_eq!(r.bytes_in_flight(), 0);
1944 assert_eq!(r.bytes_in_flight_duration(), initial_rtt);
1945 assert_eq!(r.min_rtt(), Some(initial_rtt));
1946 assert_eq!(r.rtt(), initial_rtt);
1947
1948 assert_eq!(r.cwnd(), 12000 + 1200 * 10);
1950
1951 let p = Sent {
1953 pkt_num: 10,
1954 frames: smallvec![],
1955 time_sent: now,
1956 time_acked: None,
1957 time_lost: None,
1958 size: 6000,
1959 ack_eliciting: true,
1960 in_flight: true,
1961 delivered: 0,
1962 delivered_time: now,
1963 first_sent_time: now,
1964 is_app_limited: false,
1965 tx_in_flight: 0,
1966 lost: 0,
1967 has_data: true,
1968 is_pmtud_probe: false,
1969 };
1970
1971 r.on_packet_sent(
1972 p,
1973 packet::Epoch::Application,
1974 HandshakeStatus::default(),
1975 now,
1976 "",
1977 );
1978
1979 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 1);
1980 assert_eq!(r.bytes_in_flight(), 6000);
1981 assert_eq!(r.bytes_in_flight_duration(), initial_rtt);
1982
1983 if !pacing_enabled {
1984 assert_eq!(r.get_packet_send_time(now), now);
1986 } else {
1987 assert_ne!(r.get_packet_send_time(now), now);
1989 }
1990
1991 let p = Sent {
1993 pkt_num: 11,
1994 frames: smallvec![],
1995 time_sent: now,
1996 time_acked: None,
1997 time_lost: None,
1998 size: 6000,
1999 ack_eliciting: true,
2000 in_flight: true,
2001 delivered: 0,
2002 delivered_time: now,
2003 first_sent_time: now,
2004 is_app_limited: false,
2005 tx_in_flight: 0,
2006 lost: 0,
2007 has_data: true,
2008 is_pmtud_probe: false,
2009 };
2010
2011 r.on_packet_sent(
2012 p,
2013 packet::Epoch::Application,
2014 HandshakeStatus::default(),
2015 now,
2016 "",
2017 );
2018
2019 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
2020 assert_eq!(r.bytes_in_flight(), 12000);
2021 assert_eq!(r.bytes_in_flight_duration(), initial_rtt);
2022
2023 let p = Sent {
2025 pkt_num: 12,
2026 frames: smallvec![],
2027 time_sent: now,
2028 time_acked: None,
2029 time_lost: None,
2030 size: 1000,
2031 ack_eliciting: true,
2032 in_flight: true,
2033 delivered: 0,
2034 delivered_time: now,
2035 first_sent_time: now,
2036 is_app_limited: false,
2037 tx_in_flight: 0,
2038 lost: 0,
2039 has_data: true,
2040 is_pmtud_probe: false,
2041 };
2042
2043 r.on_packet_sent(
2044 p,
2045 packet::Epoch::Application,
2046 HandshakeStatus::default(),
2047 now,
2048 "",
2049 );
2050
2051 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 3);
2052 assert_eq!(r.bytes_in_flight(), 13000);
2053 assert_eq!(r.bytes_in_flight_duration(), initial_rtt);
2054
2055 let pacing_rate = if pacing_enabled {
2058 let cwnd_gain: f64 = 2.0;
2059 let bw = r.cwnd() as f64 / cwnd_gain / initial_rtt.as_secs_f64();
2062 bw as u64
2063 } else {
2064 0
2065 };
2066 assert_eq!(r.pacing_rate(), pacing_rate);
2067
2068 let scale_factor = if pacing_enabled {
2069 1.08333332
2072 } else {
2073 1.0
2074 };
2075 assert_eq!(
2076 r.get_packet_send_time(now) - now,
2077 if pacing_enabled {
2078 Duration::from_secs_f64(
2079 scale_factor * 12000.0 / pacing_rate as f64,
2080 )
2081 } else {
2082 Duration::ZERO
2083 }
2084 );
2085 assert_eq!(r.startup_exit(), None);
2086
2087 let reduced_rtt = Duration::from_millis(40);
2088 now += reduced_rtt;
2089
2090 let mut acked = RangeSet::default();
2091 acked.insert(10..11);
2092
2093 assert_eq!(
2094 r.on_ack_received(
2095 &acked,
2096 0,
2097 packet::Epoch::Application,
2098 HandshakeStatus::default(),
2099 now,
2100 None,
2101 "",
2102 )
2103 .unwrap(),
2104 OnAckReceivedOutcome {
2105 lost_packets: 0,
2106 lost_bytes: 0,
2107 acked_bytes: 6000,
2108 spurious_losses: 0,
2109 }
2110 );
2111
2112 let expected_srtt = (7 * initial_rtt + reduced_rtt) / 8;
2113 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
2114 assert_eq!(r.bytes_in_flight(), 7000);
2115 assert_eq!(r.bytes_in_flight_duration(), initial_rtt + reduced_rtt);
2116 assert_eq!(r.min_rtt(), Some(reduced_rtt));
2117 assert_eq!(r.rtt(), expected_srtt);
2118
2119 let mut acked = RangeSet::default();
2120 acked.insert(11..12);
2121
2122 assert_eq!(
2123 r.on_ack_received(
2124 &acked,
2125 0,
2126 packet::Epoch::Application,
2127 HandshakeStatus::default(),
2128 now,
2129 None,
2130 "",
2131 )
2132 .unwrap(),
2133 OnAckReceivedOutcome {
2134 lost_packets: 0,
2135 lost_bytes: 0,
2136 acked_bytes: 6000,
2137 spurious_losses: 0,
2138 }
2139 );
2140
2141 let expected_min_rtt = if pacing_enabled &&
2145 !time_sent_set_to_now &&
2146 cfg!(feature = "internal")
2147 {
2148 reduced_rtt - Duration::from_millis(25)
2149 } else {
2150 reduced_rtt
2151 };
2152
2153 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 1);
2154 assert_eq!(r.bytes_in_flight(), 1000);
2155 assert_eq!(r.bytes_in_flight_duration(), initial_rtt + reduced_rtt);
2156 assert_eq!(r.min_rtt(), Some(expected_min_rtt));
2157
2158 let expected_srtt = (7 * expected_srtt + expected_min_rtt) / 8;
2159 assert_eq!(r.rtt(), expected_srtt);
2160
2161 let mut acked = RangeSet::default();
2162 acked.insert(12..13);
2163
2164 assert_eq!(
2165 r.on_ack_received(
2166 &acked,
2167 0,
2168 packet::Epoch::Application,
2169 HandshakeStatus::default(),
2170 now,
2171 None,
2172 "",
2173 )
2174 .unwrap(),
2175 OnAckReceivedOutcome {
2176 lost_packets: 0,
2177 lost_bytes: 0,
2178 acked_bytes: 1000,
2179 spurious_losses: 0,
2180 }
2181 );
2182
2183 let expected_min_rtt = if pacing_enabled &&
2186 !time_sent_set_to_now &&
2187 cfg!(feature = "internal")
2188 {
2189 Duration::from_millis(0)
2190 } else {
2191 reduced_rtt
2192 };
2193 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
2194 assert_eq!(r.bytes_in_flight(), 0);
2195 assert_eq!(r.bytes_in_flight_duration(), initial_rtt + reduced_rtt);
2196 assert_eq!(r.min_rtt(), Some(expected_min_rtt));
2197
2198 let expected_srtt = (7 * expected_srtt + expected_min_rtt) / 8;
2199 assert_eq!(r.rtt(), expected_srtt);
2200 }
2201
2202 #[rstest]
2203 #[case::bw_estimate_equal_after_first_rtt(1.0, 1.0)]
2206 #[case::bw_estimate_decrease_after_first_rtt(2.0, 1.0)]
2209 #[case::bw_estimate_increase_after_first_rtt(0.5, 0.5)]
2215 #[cfg(feature = "internal")]
2216 fn initial_pacing_rate_override(
2217 #[case] initial_multipler: f64, #[case] expected_multiplier: f64,
2218 ) {
2219 let rtt = Duration::from_millis(50);
2220 let bw = Bandwidth::from_bytes_and_time_delta(12000, rtt);
2221 let initial_pacing_rate_hint = bw * initial_multipler;
2222 let expected_pacing_with_rtt_measurement = bw * expected_multiplier;
2223
2224 let cc_algorithm_name = "bbr2_gcongestion";
2225 let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
2226 assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
2227 cfg.set_custom_bbr_params(BbrParams {
2228 initial_pacing_rate_bytes_per_second: Some(
2229 initial_pacing_rate_hint.to_bytes_per_second(),
2230 ),
2231 ..Default::default()
2232 });
2233
2234 let mut r = Recovery::new(&cfg);
2235
2236 let mut now = Instant::now();
2237
2238 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
2239
2240 for i in 0..2 {
2242 let p = test_utils::helper_packet_sent(i, now, 1200);
2243 r.on_packet_sent(
2244 p,
2245 packet::Epoch::Application,
2246 HandshakeStatus::default(),
2247 now,
2248 "",
2249 );
2250 }
2251
2252 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
2253 assert_eq!(r.bytes_in_flight(), 2400);
2254 assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
2255
2256 assert_eq!(
2258 r.pacing_rate(),
2259 initial_pacing_rate_hint.to_bytes_per_second()
2260 );
2261 assert_eq!(r.get_packet_send_time(now), now);
2262
2263 assert_eq!(r.cwnd(), 12000);
2264 assert_eq!(r.cwnd_available(), 9600);
2265
2266 now += rtt;
2268
2269 let mut acked = RangeSet::default();
2270 acked.insert(0..2);
2271
2272 assert_eq!(
2273 r.on_ack_received(
2274 &acked,
2275 10,
2276 packet::Epoch::Application,
2277 HandshakeStatus::default(),
2278 now,
2279 None,
2280 "",
2281 )
2282 .unwrap(),
2283 OnAckReceivedOutcome {
2284 lost_packets: 0,
2285 lost_bytes: 0,
2286 acked_bytes: 2400,
2287 spurious_losses: 0,
2288 }
2289 );
2290
2291 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
2292 assert_eq!(r.bytes_in_flight(), 0);
2293 assert_eq!(r.bytes_in_flight_duration(), rtt);
2294 assert_eq!(r.rtt(), rtt);
2295
2296 assert_eq!(
2299 r.pacing_rate(),
2300 expected_pacing_with_rtt_measurement.to_bytes_per_second()
2301 );
2302 }
2303
2304 #[rstest]
2305 fn validate_ack_range_on_ack_received(
2306 #[values("cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
2307 ) {
2308 let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
2309 cfg.set_cc_algorithm_name(cc_algorithm_name).unwrap();
2310
2311 let epoch = packet::Epoch::Application;
2312 let mut r = Recovery::new(&cfg);
2313 let mut now = Instant::now();
2314 assert_eq!(r.sent_packets_len(epoch), 0);
2315
2316 let pkt_size = 1000;
2318 let pkt_count = 4;
2319 for pkt_num in 0..pkt_count {
2320 let sent = test_utils::helper_packet_sent(pkt_num, now, pkt_size);
2321 r.on_packet_sent(sent, epoch, HandshakeStatus::default(), now, "");
2322 }
2323 assert_eq!(r.sent_packets_len(epoch), pkt_count as usize);
2324 assert_eq!(r.bytes_in_flight(), pkt_count as usize * pkt_size);
2325 assert!(r.get_largest_acked_on_epoch(epoch).is_none());
2326 assert_eq!(r.largest_sent_pkt_num_on_path(epoch).unwrap(), 3);
2327
2328 now += Duration::from_millis(10);
2330
2331 let mut acked = RangeSet::default();
2333 acked.insert(0..2);
2334
2335 assert_eq!(
2336 r.on_ack_received(
2337 &acked,
2338 25,
2339 epoch,
2340 HandshakeStatus::default(),
2341 now,
2342 None,
2343 "",
2344 )
2345 .unwrap(),
2346 OnAckReceivedOutcome {
2347 lost_packets: 0,
2348 lost_bytes: 0,
2349 acked_bytes: 2 * 1000,
2350 spurious_losses: 0,
2351 }
2352 );
2353
2354 assert_eq!(r.sent_packets_len(epoch), 2);
2355 assert_eq!(r.bytes_in_flight(), 2 * 1000);
2356
2357 assert_eq!(r.get_largest_acked_on_epoch(epoch).unwrap(), 1);
2358 assert_eq!(r.largest_sent_pkt_num_on_path(epoch).unwrap(), 3);
2359
2360 let mut acked = RangeSet::default();
2362 acked.insert(0..10);
2363 assert_eq!(
2364 r.on_ack_received(
2365 &acked,
2366 25,
2367 epoch,
2368 HandshakeStatus::default(),
2369 now,
2370 None,
2371 "",
2372 )
2373 .unwrap(),
2374 OnAckReceivedOutcome {
2375 lost_packets: 0,
2376 lost_bytes: 0,
2377 acked_bytes: 2 * 1000,
2378 spurious_losses: 0,
2379 }
2380 );
2381 assert_eq!(r.sent_packets_len(epoch), 0);
2382 assert_eq!(r.bytes_in_flight(), 0);
2383
2384 assert_eq!(r.get_largest_acked_on_epoch(epoch).unwrap(), 3);
2385 assert_eq!(r.largest_sent_pkt_num_on_path(epoch).unwrap(), 3);
2386 }
2387
2388 #[rstest]
2389 fn pmtud_loss_on_timer(
2390 #[values("reno", "cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
2391 ) {
2392 let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
2393 assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
2394
2395 let mut r = Recovery::new(&cfg);
2396 assert_eq!(r.cwnd(), 12000);
2397
2398 let mut now = Instant::now();
2399
2400 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
2401
2402 let p = Sent {
2404 pkt_num: 0,
2405 frames: smallvec![],
2406 time_sent: now,
2407 time_acked: None,
2408 time_lost: None,
2409 size: 1000,
2410 ack_eliciting: true,
2411 in_flight: true,
2412 delivered: 0,
2413 delivered_time: now,
2414 first_sent_time: now,
2415 is_app_limited: false,
2416 tx_in_flight: 0,
2417 lost: 0,
2418 has_data: false,
2419 is_pmtud_probe: false,
2420 };
2421
2422 r.on_packet_sent(
2423 p,
2424 packet::Epoch::Application,
2425 HandshakeStatus::default(),
2426 now,
2427 "",
2428 );
2429
2430 assert_eq!(r.in_flight_count(packet::Epoch::Application), 1);
2431 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 1);
2432 assert_eq!(r.bytes_in_flight(), 1000);
2433 assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
2434
2435 let p = Sent {
2436 pkt_num: 1,
2437 frames: smallvec![],
2438 time_sent: now,
2439 time_acked: None,
2440 time_lost: None,
2441 size: 1000,
2442 ack_eliciting: true,
2443 in_flight: true,
2444 delivered: 0,
2445 delivered_time: now,
2446 first_sent_time: now,
2447 is_app_limited: false,
2448 tx_in_flight: 0,
2449 lost: 0,
2450 has_data: false,
2451 is_pmtud_probe: true,
2452 };
2453
2454 r.on_packet_sent(
2455 p,
2456 packet::Epoch::Application,
2457 HandshakeStatus::default(),
2458 now,
2459 "",
2460 );
2461
2462 assert_eq!(r.in_flight_count(packet::Epoch::Application), 2);
2463
2464 let p = Sent {
2465 pkt_num: 2,
2466 frames: smallvec![],
2467 time_sent: now,
2468 time_acked: None,
2469 time_lost: None,
2470 size: 1000,
2471 ack_eliciting: true,
2472 in_flight: true,
2473 delivered: 0,
2474 delivered_time: now,
2475 first_sent_time: now,
2476 is_app_limited: false,
2477 tx_in_flight: 0,
2478 lost: 0,
2479 has_data: false,
2480 is_pmtud_probe: false,
2481 };
2482
2483 r.on_packet_sent(
2484 p,
2485 packet::Epoch::Application,
2486 HandshakeStatus::default(),
2487 now,
2488 "",
2489 );
2490
2491 assert_eq!(r.in_flight_count(packet::Epoch::Application), 3);
2492
2493 now += Duration::from_millis(10);
2495
2496 let mut acked = RangeSet::default();
2498 acked.insert(0..1);
2499 acked.insert(2..3);
2500
2501 assert_eq!(
2502 r.on_ack_received(
2503 &acked,
2504 25,
2505 packet::Epoch::Application,
2506 HandshakeStatus::default(),
2507 now,
2508 None,
2509 "",
2510 )
2511 .unwrap(),
2512 OnAckReceivedOutcome {
2513 lost_packets: 0,
2514 lost_bytes: 0,
2515 acked_bytes: 2 * 1000,
2516 spurious_losses: 0,
2517 }
2518 );
2519
2520 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
2521 assert_eq!(r.bytes_in_flight(), 1000);
2522 assert_eq!(r.bytes_in_flight_duration(), Duration::from_millis(10));
2523 assert_eq!(r.lost_count(), 0);
2524
2525 now = r.loss_detection_timer().unwrap();
2527
2528 r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
2530 assert_eq!(r.loss_probes(packet::Epoch::Application), 0);
2531
2532 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 2);
2533 assert_eq!(r.in_flight_count(packet::Epoch::Application), 0);
2534 assert_eq!(r.bytes_in_flight(), 0);
2535 assert_eq!(r.bytes_in_flight_duration(), Duration::from_micros(11250));
2536 assert_eq!(r.cwnd(), 12000);
2537
2538 assert_eq!(r.lost_count(), 0);
2539
2540 now += r.rtt();
2542
2543 assert_eq!(
2544 r.detect_lost_packets_for_test(packet::Epoch::Application, now),
2545 (0, 0)
2546 );
2547
2548 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
2549 assert_eq!(r.in_flight_count(packet::Epoch::Application), 0);
2550 assert_eq!(r.bytes_in_flight(), 0);
2551 assert_eq!(r.bytes_in_flight_duration(), Duration::from_micros(11250));
2552 assert_eq!(r.lost_count(), 0);
2553 assert_eq!(r.startup_exit(), None);
2554 }
2555
2556 #[rstest]
2559 fn congestion_delivery_rate(
2560 #[values("reno", "cubic", "bbr2")] cc_algorithm_name: &str,
2561 ) {
2562 let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
2563 assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
2564
2565 let mut r = Recovery::new(&cfg);
2566 assert_eq!(r.cwnd(), 12000);
2567
2568 let now = Instant::now();
2569
2570 let mut total_bytes_sent = 0;
2571 for pn in 0..10 {
2572 let bytes = 1000;
2574 let sent = test_utils::helper_packet_sent(pn, now, bytes);
2575 r.on_packet_sent(
2576 sent,
2577 packet::Epoch::Application,
2578 HandshakeStatus::default(),
2579 now,
2580 "",
2581 );
2582
2583 total_bytes_sent += bytes;
2584 }
2585
2586 let interval = Duration::from_secs(10);
2588 let mut acked = RangeSet::default();
2589 acked.insert(0..10);
2590 assert_eq!(
2591 r.on_ack_received(
2592 &acked,
2593 25,
2594 packet::Epoch::Application,
2595 HandshakeStatus::default(),
2596 now + interval,
2597 None,
2598 "",
2599 )
2600 .unwrap(),
2601 OnAckReceivedOutcome {
2602 lost_packets: 0,
2603 lost_bytes: 0,
2604 acked_bytes: total_bytes_sent,
2605 spurious_losses: 0,
2606 }
2607 );
2608 assert_eq!(r.delivery_rate().to_bytes_per_second(), 1000);
2609 assert_eq!(r.min_rtt().unwrap(), interval);
2610 assert_eq!(
2612 total_bytes_sent as u64 / interval.as_secs(),
2613 r.delivery_rate().to_bytes_per_second()
2614 );
2615 assert_eq!(r.startup_exit(), None);
2616 }
2617
2618 #[rstest]
2619 fn acks_with_no_retransmittable_data(
2620 #[values("reno", "cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
2621 ) {
2622 let rtt = Duration::from_millis(100);
2623
2624 let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
2625 assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
2626
2627 let mut r = Recovery::new(&cfg);
2628
2629 let mut now = Instant::now();
2630
2631 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
2632
2633 let mut next_packet = 0;
2634 for _ in 0..3 {
2636 let p = test_utils::helper_packet_sent(next_packet, now, 1200);
2637 next_packet += 1;
2638 r.on_packet_sent(
2639 p,
2640 packet::Epoch::Application,
2641 HandshakeStatus::default(),
2642 now,
2643 "",
2644 );
2645 }
2646
2647 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 3);
2648 assert_eq!(r.bytes_in_flight(), 3600);
2649 assert_eq!(r.bytes_in_flight_duration(), Duration::ZERO);
2650
2651 assert_eq!(
2652 r.pacing_rate(),
2653 if cc_algorithm_name == "bbr2_gcongestion" {
2654 103963
2655 } else {
2656 0
2657 },
2658 );
2659 assert_eq!(r.get_packet_send_time(now), now);
2660 assert_eq!(r.cwnd(), 12000);
2661 assert_eq!(r.cwnd_available(), 8400);
2662
2663 now += rtt;
2665
2666 let mut acked = RangeSet::default();
2667 acked.insert(0..3);
2668
2669 assert_eq!(
2670 r.on_ack_received(
2671 &acked,
2672 10,
2673 packet::Epoch::Application,
2674 HandshakeStatus::default(),
2675 now,
2676 None,
2677 "",
2678 )
2679 .unwrap(),
2680 OnAckReceivedOutcome {
2681 lost_packets: 0,
2682 lost_bytes: 0,
2683 acked_bytes: 3600,
2684 spurious_losses: 0,
2685 }
2686 );
2687
2688 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 0);
2689 assert_eq!(r.bytes_in_flight(), 0);
2690 assert_eq!(r.bytes_in_flight_duration(), rtt);
2691 assert_eq!(r.rtt(), rtt);
2692
2693 assert_eq!(
2696 r.pacing_rate(),
2697 if cc_algorithm_name == "bbr2_gcongestion" {
2698 120000
2699 } else {
2700 0
2701 },
2702 );
2703
2704 for iter in 3..1000 {
2706 let mut p = test_utils::helper_packet_sent(next_packet, now, 1200);
2707 p.in_flight = false;
2710 next_packet += 1;
2711 r.on_packet_sent(
2712 p,
2713 packet::Epoch::Application,
2714 HandshakeStatus::default(),
2715 now,
2716 "",
2717 );
2718
2719 now += rtt;
2720
2721 let mut acked = RangeSet::default();
2722 acked.insert(iter..(iter + 1));
2723
2724 assert_eq!(
2725 r.on_ack_received(
2726 &acked,
2727 10,
2728 packet::Epoch::Application,
2729 HandshakeStatus::default(),
2730 now,
2731 None,
2732 "",
2733 )
2734 .unwrap(),
2735 OnAckReceivedOutcome {
2736 lost_packets: 0,
2737 lost_bytes: 0,
2738 acked_bytes: 0,
2739 spurious_losses: 0,
2740 }
2741 );
2742
2743 assert_eq!(r.startup_exit(), None, "{iter}");
2745
2746 assert_eq!(
2748 r.sent_packets_len(packet::Epoch::Application),
2749 0,
2750 "{iter}"
2751 );
2752 assert_eq!(r.bytes_in_flight(), 0, "{iter}");
2753 assert_eq!(r.bytes_in_flight_duration(), rtt, "{iter}");
2754 assert_eq!(
2755 r.pacing_rate(),
2756 if cc_algorithm_name == "bbr2_gcongestion" ||
2757 cc_algorithm_name == "bbr2"
2758 {
2759 120000
2760 } else {
2761 0
2762 },
2763 "{iter}"
2764 );
2765 }
2766 }
2767 #[rstest]
2768 fn pto_overflow_reproduction(
2769 #[values("reno", "cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
2770 ) {
2771 let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
2772 assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
2773 let mut r = Recovery::new(&cfg);
2774 let now = Instant::now();
2775
2776 let handshake_status = HandshakeStatus {
2778 has_handshake_keys: true,
2779 peer_verified_address: true,
2780 completed: false,
2781 };
2782
2783 let p_initial = Sent {
2785 pkt_num: 0,
2786 frames: smallvec::smallvec![],
2787 time_sent: now,
2788 time_acked: None,
2789 time_lost: None,
2790 size: 1000,
2791 ack_eliciting: true,
2792 in_flight: true,
2793 delivered: 0,
2794 delivered_time: now,
2795 first_sent_time: now,
2796 is_app_limited: false,
2797 tx_in_flight: 0,
2798 lost: 0,
2799 has_data: false,
2800 is_pmtud_probe: false,
2801 };
2802 r.on_packet_sent(
2803 p_initial,
2804 packet::Epoch::Initial,
2805 handshake_status,
2806 now,
2807 "",
2808 );
2809
2810 assert!(r.loss_detection_timer().is_some());
2812
2813 let p_app = Sent {
2815 pkt_num: 0, frames: smallvec::smallvec![],
2817 time_sent: now,
2818 time_acked: None,
2819 time_lost: None,
2820 size: 1000,
2821 ack_eliciting: true,
2822 in_flight: true,
2823 delivered: 0,
2824 delivered_time: now,
2825 first_sent_time: now,
2826 is_app_limited: false,
2827 tx_in_flight: 0,
2828
2829 lost: 0,
2830 has_data: true,
2831 is_pmtud_probe: false,
2832 };
2833 r.on_packet_sent(
2834 p_app,
2835 packet::Epoch::Application,
2836 handshake_status,
2837 now,
2838 "",
2839 );
2840
2841 let mut ranges = RangeSet::default();
2845 ranges.insert(0..1);
2846 r.on_ack_received(
2847 &ranges,
2848 0,
2849 packet::Epoch::Initial,
2850 handshake_status,
2851 now,
2852 None,
2853 "",
2854 )
2855 .unwrap();
2856
2857 assert!(r.loss_detection_timer().is_none());
2863 }
2864
2865 #[rstest]
2868 fn pto_does_not_duplicate_frames_on_consecutive_timeouts(
2869 #[values("reno", "cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
2870 ) {
2871 let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
2872 assert_eq!(cfg.set_cc_algorithm_name(cc_algorithm_name), Ok(()));
2873
2874 let mut r = Recovery::new(&cfg);
2875 let mut now = Instant::now();
2876
2877 let frames = smallvec![frame::Frame::Stream {
2879 stream_id: 4,
2880 data: RangeBuf::from(b"test", 0, false),
2881 },];
2882
2883 let p = Sent {
2884 pkt_num: 0,
2885 frames: frames.clone(),
2886 time_sent: now,
2887 time_acked: None,
2888 time_lost: None,
2889 size: 1000,
2890 ack_eliciting: true,
2891 in_flight: true,
2892 delivered: 0,
2893 delivered_time: now,
2894 first_sent_time: now,
2895 is_app_limited: false,
2896 tx_in_flight: 0,
2897 lost: 0,
2898 has_data: true,
2899 is_pmtud_probe: false,
2900 };
2901
2902 r.on_packet_sent(
2903 p,
2904 packet::Epoch::Application,
2905 HandshakeStatus::default(),
2906 now,
2907 "",
2908 );
2909
2910 assert_eq!(r.lost_frames_count(packet::Epoch::Application), 0);
2912 assert_eq!(r.lost_count(), 0);
2913
2914 now = r.loss_detection_timer().unwrap();
2916 r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
2917
2918 assert_eq!(r.pto_count(), 1);
2919 let frames_after_first_pto =
2920 r.lost_frames_count(packet::Epoch::Application);
2921 assert_eq!(
2922 frames_after_first_pto, 1,
2923 "First PTO should add exactly 1 frame"
2924 );
2925 assert_eq!(
2926 r.lost_count(),
2927 0,
2928 "PTO doesn't declare packets lost (no CC impact)"
2929 );
2930
2931 now = r.loss_detection_timer().unwrap();
2935 r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
2936
2937 assert_eq!(r.pto_count(), 2);
2938 let frames_after_second_pto =
2939 r.lost_frames_count(packet::Epoch::Application);
2940 assert_eq!(
2941 frames_after_second_pto, frames_after_first_pto,
2942 "Second PTO must NOT add duplicate frames (fix: `if \
2943 lost_frames.is_empty()`)"
2944 );
2945
2946 now = r.loss_detection_timer().unwrap();
2948 r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
2949
2950 assert_eq!(r.pto_count(), 3);
2951 let frames_after_third_pto =
2952 r.lost_frames_count(packet::Epoch::Application);
2953 assert_eq!(
2954 frames_after_third_pto, frames_after_first_pto,
2955 "Third PTO must NOT add duplicate frames"
2956 );
2957
2958 assert_eq!(r.sent_packets_len(packet::Epoch::Application), 1);
2960 assert_eq!(r.lost_count(), 0);
2962 }
2963
2964 #[rstest]
2967 fn pto_send_on_path_retransmits_without_loss(
2968 #[values("reno", "cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
2969 ) {
2970 use crate::test_utils;
2971
2972 let mut pipe = test_utils::Pipe::new(cc_algorithm_name).unwrap();
2973
2974 assert_eq!(pipe.handshake(), Ok(()));
2976
2977 assert_eq!(pipe.client.stream_send(4, b"hello", false), Ok(5));
2979
2980 let mut buf = [0; 65535];
2981
2982 let (len1, _) = pipe.client.send(&mut buf).unwrap();
2984 assert!(len1 > 0);
2985
2986 let initial_lost_count = pipe
2988 .client
2989 .paths
2990 .get_active()
2991 .unwrap()
2992 .recovery
2993 .lost_count();
2994 assert_eq!(initial_lost_count, 0, "No packets should be lost initially");
2995
2996 let initial_lost_frames = pipe
2998 .client
2999 .paths
3000 .get_active()
3001 .unwrap()
3002 .recovery
3003 .lost_frames_count(packet::Epoch::Application);
3004 assert_eq!(
3005 initial_lost_frames, 0,
3006 "No frames should be in lost_frames initially"
3007 );
3008
3009 let timer = pipe.client.timeout().unwrap();
3011 std::thread::sleep(timer + Duration::from_millis(1));
3012
3013 pipe.client.on_timeout();
3015
3016 let lost_frames_after_pto = pipe
3018 .client
3019 .paths
3020 .get_active()
3021 .unwrap()
3022 .recovery
3023 .lost_frames_count(packet::Epoch::Application);
3024 assert!(
3025 lost_frames_after_pto > 0,
3026 "PTO should add frames to lost_frames for retransmission"
3027 );
3028
3029 let lost_count_after_pto = pipe
3031 .client
3032 .paths
3033 .get_active()
3034 .unwrap()
3035 .recovery
3036 .lost_count();
3037 assert_eq!(
3038 lost_count_after_pto, 0,
3039 "PTO should not increment lost_count"
3040 );
3041
3042 let (len2, _) = pipe.client.send(&mut buf).unwrap();
3044 assert!(len2 > 0, "Should send PTO probe packet");
3045
3046 let lost_count_after_send = pipe
3048 .client
3049 .paths
3050 .get_active()
3051 .unwrap()
3052 .recovery
3053 .lost_count();
3054 assert_eq!(
3055 lost_count_after_send, 0,
3056 "Sending PTO probe should not increment lost_count"
3057 );
3058
3059 assert_eq!(pipe.server_recv(&mut buf[..len2]), Ok(len2));
3061
3062 let mut recv_buf = [0; 100];
3064 assert_eq!(pipe.server.stream_recv(4, &mut recv_buf), Ok((5, false)));
3065 assert_eq!(&recv_buf[..5], b"hello");
3066
3067 let final_lost_count = pipe
3069 .client
3070 .paths
3071 .get_active()
3072 .unwrap()
3073 .recovery
3074 .lost_count();
3075 assert_eq!(
3076 final_lost_count, 0,
3077 "No packets should be marked as lost - PTO only retransmits"
3078 );
3079 }
3080}
3081
3082mod bandwidth;
3083mod bytes_in_flight;
3084mod congestion;
3085mod gcongestion;
3086mod rtt;