1use crate::packet;
2use crate::recovery::OnLossDetectionTimeoutOutcome;
3use crate::recovery::INITIAL_TIME_THRESHOLD_OVERHEAD;
4use crate::recovery::TIME_THRESHOLD_OVERHEAD_MULTIPLIER;
5use crate::Error;
6use crate::Result;
7
8use std::collections::VecDeque;
9use std::time::Duration;
10use std::time::Instant;
11
12use smallvec::SmallVec;
13
14#[cfg(feature = "qlog")]
15use qlog::events::EventData;
16
17#[cfg(feature = "qlog")]
18use crate::recovery::QlogMetrics;
19
20use crate::frame;
21
22use crate::recovery::bytes_in_flight::BytesInFlight;
23use crate::recovery::gcongestion::Bandwidth;
24use crate::recovery::rtt::RttStats;
25use crate::recovery::CongestionControlAlgorithm;
26use crate::recovery::HandshakeStatus;
27use crate::recovery::LossDetectionTimer;
28use crate::recovery::OnAckReceivedOutcome;
29use crate::recovery::RangeSet;
30use crate::recovery::RecoveryConfig;
31use crate::recovery::RecoveryOps;
32use crate::recovery::RecoveryStats;
33use crate::recovery::ReleaseDecision;
34use crate::recovery::Sent;
35use crate::recovery::StartupExit;
36use crate::recovery::GRANULARITY;
37use crate::recovery::INITIAL_PACKET_THRESHOLD;
38use crate::recovery::INITIAL_TIME_THRESHOLD;
39use crate::recovery::MAX_OUTSTANDING_NON_ACK_ELICITING;
40use crate::recovery::MAX_PACKET_THRESHOLD;
41use crate::recovery::MAX_PTO_PROBES_COUNT;
42use crate::recovery::PACKET_REORDER_TIME_THRESHOLD;
43
44use super::bbr2::BBRv2;
45use super::pacer::Pacer;
46use super::Acked;
47use super::Lost;
48
/// Window size, in packets, handed to `BBRv2::new` right after the initial
/// congestion window when the congestion controller is constructed.
// NOTE(review): presumably the upper bound on the congestion window — confirm
// against `BBRv2::new`.
const MAX_WINDOW_PACKETS: usize = 20_000;
51
52#[derive(Debug)]
53struct SentPacket {
54 pkt_num: u64,
55 status: SentStatus,
56}
57
58#[derive(Debug)]
59enum SentStatus {
60 Sent {
61 time_sent: Instant,
62 ack_eliciting: bool,
63 in_flight: bool,
64 has_data: bool,
65 is_pmtud_probe: bool,
66 sent_bytes: usize,
67 frames: SmallVec<[frame::Frame; 1]>,
68 },
69 Acked,
70 Lost,
71}
72
73impl SentStatus {
74 fn ack(&mut self) -> Self {
75 std::mem::replace(self, SentStatus::Acked)
76 }
77
78 fn lose(&mut self) -> Self {
79 if !matches!(self, SentStatus::Acked) {
80 std::mem::replace(self, SentStatus::Lost)
81 } else {
82 SentStatus::Acked
83 }
84 }
85}
86
87#[derive(Default)]
88struct RecoveryEpoch {
89 time_of_last_ack_eliciting_packet: Option<Instant>,
91
92 largest_acked_packet: Option<u64>,
95
96 loss_time: Option<Instant>,
99
100 sent_packets: VecDeque<SentPacket>,
103
104 loss_probes: usize,
105 pkts_in_flight: usize,
106
107 acked_frames: VecDeque<frame::Frame>,
108 lost_frames: VecDeque<frame::Frame>,
109
110 #[allow(dead_code)]
112 test_largest_sent_pkt_num_on_path: Option<u64>,
113}
114
/// Summary of one pass of `RecoveryEpoch::detect_and_remove_acked_packets`.
struct AckedDetectionResult {
    /// Total size of the newly acked packets that were in flight.
    acked_bytes: usize,
    /// Number of acked packets that had previously been declared lost.
    spurious_losses: usize,
    /// Reordering distance (`largest_acked - pkt_num + 1`) of the first
    /// spuriously lost packet seen, if there was one.
    spurious_pkt_thresh: Option<u64>,
    /// Whether at least one newly acked packet was ack-eliciting.
    has_ack_eliciting: bool,
}
121
122struct LossDetectionResult {
123 lost_bytes: usize,
124 lost_packets: usize,
125
126 pmtud_lost_bytes: usize,
127 pmtud_lost_packets: SmallVec<[u64; 1]>,
128}
129
130impl RecoveryEpoch {
131 fn discard(&mut self, cc: &mut Pacer) -> usize {
134 let unacked_bytes = self
135 .sent_packets
136 .drain(..)
137 .map(|p| {
138 if let SentPacket {
139 status:
140 SentStatus::Sent {
141 in_flight,
142 sent_bytes,
143 ..
144 },
145 pkt_num,
146 } = p
147 {
148 cc.on_packet_neutered(pkt_num);
149 if in_flight {
150 return sent_bytes;
151 }
152 }
153 0
154 })
155 .sum();
156
157 std::mem::take(&mut self.sent_packets);
158 self.time_of_last_ack_eliciting_packet = None;
159 self.loss_time = None;
160 self.loss_probes = 0;
161 self.pkts_in_flight = 0;
162
163 unacked_bytes
164 }
165
166 fn detect_and_remove_acked_packets(
168 &mut self, peer_sent_ack_ranges: &RangeSet, newly_acked: &mut Vec<Acked>,
169 skip_pn: Option<u64>, trace_id: &str,
170 ) -> Result<AckedDetectionResult> {
171 newly_acked.clear();
172
173 let mut acked_bytes = 0;
174 let mut spurious_losses = 0;
175 let mut spurious_pkt_thresh = None;
176 let mut has_ack_eliciting = false;
177
178 let largest_ack_received = peer_sent_ack_ranges.last().unwrap();
179 let largest_acked = self
180 .largest_acked_packet
181 .unwrap_or(0)
182 .max(largest_ack_received);
183
184 for peer_sent_range in peer_sent_ack_ranges.iter() {
185 if skip_pn.is_some_and(|skip_pn| peer_sent_range.contains(&skip_pn)) {
186 return Err(Error::OptimisticAckDetected);
191 }
192
193 let start = if self
196 .sent_packets
197 .front()
198 .filter(|e| e.pkt_num >= peer_sent_range.start)
199 .is_some()
200 {
201 0
203 } else {
204 self.sent_packets
205 .binary_search_by_key(&peer_sent_range.start, |p| p.pkt_num)
206 .unwrap_or_else(|e| e)
207 };
208
209 for SentPacket { pkt_num, status } in
210 self.sent_packets.range_mut(start..)
211 {
212 if *pkt_num < peer_sent_range.end {
213 match status.ack() {
214 SentStatus::Sent {
215 time_sent,
216 in_flight,
217 sent_bytes,
218 frames,
219 ack_eliciting,
220 ..
221 } => {
222 if in_flight {
223 self.pkts_in_flight -= 1;
224 acked_bytes += sent_bytes;
225 }
226 newly_acked.push(Acked {
227 pkt_num: *pkt_num,
228 time_sent,
229 });
230
231 self.acked_frames.extend(frames);
232
233 has_ack_eliciting |= ack_eliciting;
234
235 trace!("{trace_id} packet newly acked {pkt_num}");
236 },
237
238 SentStatus::Acked => {},
239 SentStatus::Lost => {
240 spurious_losses += 1;
242 spurious_pkt_thresh
243 .get_or_insert(largest_acked - *pkt_num + 1);
244 },
245 }
246 } else {
247 break;
248 }
249 }
250 }
251
252 self.drain_acked_and_lost_packets();
253
254 Ok(AckedDetectionResult {
255 acked_bytes,
256 spurious_losses,
257 spurious_pkt_thresh,
258 has_ack_eliciting,
259 })
260 }
261
262 fn detect_and_remove_lost_packets(
263 &mut self, loss_delay: Duration, pkt_thresh: Option<u64>, now: Instant,
264 newly_lost: &mut Vec<Lost>,
265 ) -> LossDetectionResult {
266 newly_lost.clear();
267 let mut lost_bytes = 0;
268 self.loss_time = None;
269
270 let lost_send_time = now.checked_sub(loss_delay).unwrap();
271 let largest_acked = self.largest_acked_packet.unwrap_or(0);
272 let mut pmtud_lost_bytes = 0;
273 let mut pmtud_lost_packets = SmallVec::new();
274
275 for SentPacket { pkt_num, status } in &mut self.sent_packets {
276 if *pkt_num > largest_acked {
277 break;
278 }
279
280 if let SentStatus::Sent { time_sent, .. } = status {
281 let loss_by_time = *time_sent <= lost_send_time;
282 let loss_by_pkt = match pkt_thresh {
283 Some(pkt_thresh) => largest_acked >= *pkt_num + pkt_thresh,
284 None => false,
285 };
286
287 if loss_by_time || loss_by_pkt {
288 if let SentStatus::Sent {
289 in_flight,
290 sent_bytes,
291 frames,
292 is_pmtud_probe,
293 ..
294 } = status.lose()
295 {
296 self.lost_frames.extend(frames);
297
298 if in_flight {
299 self.pkts_in_flight -= 1;
300
301 if is_pmtud_probe {
302 pmtud_lost_bytes += sent_bytes;
303 pmtud_lost_packets.push(*pkt_num);
304 continue;
306 }
307
308 lost_bytes += sent_bytes;
309 }
310
311 newly_lost.push(Lost {
312 packet_number: *pkt_num,
313 bytes_lost: sent_bytes,
314 });
315 }
316 } else {
317 self.loss_time = Some(*time_sent + loss_delay);
318 break;
319 }
320 }
321 }
322
323 LossDetectionResult {
324 lost_bytes,
325 lost_packets: newly_lost.len(),
326
327 pmtud_lost_bytes,
328 pmtud_lost_packets,
329 }
330 }
331
332 fn drain_acked_and_lost_packets(&mut self) {
336 while let Some(SentPacket {
337 status: SentStatus::Acked | SentStatus::Lost,
338 ..
339 }) = self.sent_packets.front()
340 {
341 self.sent_packets.pop_front();
342 }
343 }
344
345 fn least_unacked(&self) -> u64 {
346 for pkt in self.sent_packets.iter() {
347 if let SentPacket {
348 pkt_num,
349 status: SentStatus::Sent { .. },
350 } = pkt
351 {
352 return *pkt_num;
353 }
354 }
355
356 self.largest_acked_packet.unwrap_or(0) + 1
357 }
358}
359
/// Thresholds used to decide when an outstanding packet is declared lost.
struct LossThreshold {
    /// Packet reordering threshold; `None` disables packet-threshold loss
    /// detection.
    pkt_thresh: Option<u64>,
    /// Time threshold, passed to `RttStats::loss_delay` to derive the loss
    /// delay.
    time_thresh: f64,

    /// Overhead added on top of `1.0` to form `time_thresh` once it starts
    /// being adapted. `Some` exactly when the relaxed loss threshold is
    /// enabled in the recovery configuration.
    time_thresh_overhead: Option<f64>,
}
374
375impl LossThreshold {
376 fn new(recovery_config: &RecoveryConfig) -> Self {
377 let time_thresh_overhead =
378 if recovery_config.enable_relaxed_loss_threshold {
379 Some(INITIAL_TIME_THRESHOLD_OVERHEAD)
380 } else {
381 None
382 };
383 LossThreshold {
384 pkt_thresh: Some(INITIAL_PACKET_THRESHOLD),
385 time_thresh: INITIAL_TIME_THRESHOLD,
386 time_thresh_overhead,
387 }
388 }
389
390 fn pkt_thresh(&self) -> Option<u64> {
391 self.pkt_thresh
392 }
393
394 fn time_thresh(&self) -> f64 {
395 self.time_thresh
396 }
397
398 fn on_spurious_loss(&mut self, new_pkt_thresh: u64) {
399 match &mut self.time_thresh_overhead {
400 Some(time_thresh_overhead) => {
401 if self.pkt_thresh.is_some() {
402 self.pkt_thresh = None;
404 } else {
405 *time_thresh_overhead *= TIME_THRESHOLD_OVERHEAD_MULTIPLIER;
408 *time_thresh_overhead = time_thresh_overhead.min(1.0);
409
410 self.time_thresh = 1.0 + *time_thresh_overhead;
411 }
412 },
413 None => {
414 let new_packet_threshold = self
415 .pkt_thresh
416 .expect("packet threshold should always be Some when `enable_relaxed_loss_threshold` is false")
417 .max(new_pkt_thresh.min(MAX_PACKET_THRESHOLD));
418 self.pkt_thresh = Some(new_packet_threshold);
419
420 self.time_thresh = PACKET_REORDER_TIME_THRESHOLD;
421 },
422 }
423 }
424}
425
426pub struct GRecovery {
427 epochs: [RecoveryEpoch; packet::Epoch::count()],
428
429 loss_timer: LossDetectionTimer,
430
431 pto_count: u32,
432
433 rtt_stats: RttStats,
434
435 recovery_stats: RecoveryStats,
436
437 pub lost_count: usize,
438
439 pub lost_spurious_count: usize,
440
441 loss_thresh: LossThreshold,
442
443 bytes_in_flight: BytesInFlight,
444
445 bytes_sent: usize,
446
447 pub bytes_lost: u64,
448
449 max_datagram_size: usize,
450 time_sent_set_to_now: bool,
451
452 #[cfg(feature = "qlog")]
453 qlog_metrics: QlogMetrics,
454
455 #[cfg(feature = "qlog")]
456 qlog_prev_cc_state: &'static str,
457
458 outstanding_non_ack_eliciting: usize,
460
461 newly_acked: Vec<Acked>,
463
464 lost_reuse: Vec<Lost>,
467
468 pacer: Pacer,
469}
470
471impl GRecovery {
472 #[cfg(feature = "qlog")]
473 fn send_rate(&self) -> Bandwidth {
474 self.pacer.send_rate().unwrap_or(Bandwidth::zero())
475 }
476
477 #[cfg(feature = "qlog")]
478 fn ack_rate(&self) -> Bandwidth {
479 self.pacer.ack_rate().unwrap_or(Bandwidth::zero())
480 }
481
482 pub fn new(recovery_config: &RecoveryConfig) -> Option<Self> {
483 let cc = match recovery_config.cc_algorithm {
484 CongestionControlAlgorithm::Bbr2Gcongestion => BBRv2::new(
485 recovery_config.initial_congestion_window_packets,
486 MAX_WINDOW_PACKETS,
487 recovery_config.max_send_udp_payload_size,
488 recovery_config.initial_rtt,
489 recovery_config.custom_bbr_params.as_ref(),
490 ),
491 _ => return None,
492 };
493
494 Some(Self {
495 epochs: Default::default(),
496 rtt_stats: RttStats::new(
497 recovery_config.initial_rtt,
498 recovery_config.max_ack_delay,
499 ),
500 recovery_stats: RecoveryStats::default(),
501 loss_timer: Default::default(),
502 pto_count: 0,
503
504 lost_count: 0,
505 lost_spurious_count: 0,
506
507 loss_thresh: LossThreshold::new(recovery_config),
508 bytes_in_flight: Default::default(),
509 bytes_sent: 0,
510 bytes_lost: 0,
511
512 max_datagram_size: recovery_config.max_send_udp_payload_size,
513 time_sent_set_to_now: cc.time_sent_set_to_now(),
514
515 #[cfg(feature = "qlog")]
516 qlog_metrics: QlogMetrics::default(),
517
518 #[cfg(feature = "qlog")]
519 qlog_prev_cc_state: "",
520
521 outstanding_non_ack_eliciting: 0,
522
523 pacer: Pacer::new(
524 recovery_config.pacing,
525 cc,
526 recovery_config
527 .max_pacing_rate
528 .map(Bandwidth::from_mbits_per_second),
529 ),
530
531 newly_acked: Vec::new(),
532 lost_reuse: Vec::new(),
533 })
534 }
535
536 fn detect_and_remove_lost_packets(
537 &mut self, epoch: packet::Epoch, now: Instant,
538 ) -> (usize, usize) {
539 let loss_delay =
540 self.rtt_stats.loss_delay(self.loss_thresh.time_thresh());
541 let lost = &mut self.lost_reuse;
542
543 let LossDetectionResult {
544 lost_bytes,
545 lost_packets,
546 pmtud_lost_bytes,
547 pmtud_lost_packets,
548 } = self.epochs[epoch].detect_and_remove_lost_packets(
549 loss_delay,
550 self.loss_thresh.pkt_thresh(),
551 now,
552 lost,
553 );
554
555 self.bytes_in_flight
556 .saturating_subtract(lost_bytes + pmtud_lost_bytes, now);
557
558 for pkt in pmtud_lost_packets {
559 self.pacer.on_packet_neutered(pkt);
560 }
561
562 (lost_bytes, lost_packets)
563 }
564
565 fn loss_time_and_space(&self) -> (Option<Instant>, packet::Epoch) {
566 let mut epoch = packet::Epoch::Initial;
567 let mut time = self.epochs[epoch].loss_time;
568
569 for e in [packet::Epoch::Handshake, packet::Epoch::Application] {
571 let new_time = self.epochs[e].loss_time;
572 if time.is_none() || new_time < time {
573 time = new_time;
574 epoch = e;
575 }
576 }
577
578 (time, epoch)
579 }
580
581 fn pto_time_and_space(
582 &self, handshake_status: HandshakeStatus, now: Instant,
583 ) -> (Option<Instant>, packet::Epoch) {
584 let mut duration = self.pto() * (1 << self.pto_count);
585
586 if self.bytes_in_flight.is_zero() {
588 if handshake_status.has_handshake_keys {
589 return (Some(now + duration), packet::Epoch::Handshake);
590 } else {
591 return (Some(now + duration), packet::Epoch::Initial);
592 }
593 }
594
595 let mut pto_timeout = None;
596 let mut pto_space = packet::Epoch::Initial;
597
598 for &e in packet::Epoch::epochs(
600 packet::Epoch::Initial..=packet::Epoch::Application,
601 ) {
602 if self.epochs[e].pkts_in_flight == 0 {
603 continue;
604 }
605
606 if e == packet::Epoch::Application {
607 if !handshake_status.completed {
609 return (pto_timeout, pto_space);
610 }
611
612 duration +=
614 self.rtt_stats.max_ack_delay * 2_u32.pow(self.pto_count);
615 }
616
617 let new_time = self.epochs[e]
618 .time_of_last_ack_eliciting_packet
619 .map(|t| t + duration);
620
621 if pto_timeout.is_none() || new_time < pto_timeout {
622 pto_timeout = new_time;
623 pto_space = e;
624 }
625 }
626
627 (pto_timeout, pto_space)
628 }
629
630 fn set_loss_detection_timer(
631 &mut self, handshake_status: HandshakeStatus, now: Instant,
632 ) {
633 if let (Some(earliest_loss_time), _) = self.loss_time_and_space() {
634 self.loss_timer.update(earliest_loss_time);
636 return;
637 }
638
639 if self.bytes_in_flight.is_zero() &&
640 handshake_status.peer_verified_address
641 {
642 self.loss_timer.clear();
643 return;
644 }
645
646 if let (Some(timeout), _) = self.pto_time_and_space(handshake_status, now)
648 {
649 self.loss_timer.update(timeout);
650 }
651 }
652}
653
654impl RecoveryOps for GRecovery {
655 fn lost_count(&self) -> usize {
656 self.lost_count
657 }
658
659 fn bytes_lost(&self) -> u64 {
660 self.bytes_lost
661 }
662
663 fn should_elicit_ack(&self, epoch: packet::Epoch) -> bool {
664 self.epochs[epoch].loss_probes > 0 ||
665 self.outstanding_non_ack_eliciting >=
666 MAX_OUTSTANDING_NON_ACK_ELICITING
667 }
668
669 fn next_acked_frame(&mut self, epoch: packet::Epoch) -> Option<frame::Frame> {
670 self.epochs[epoch].acked_frames.pop_front()
671 }
672
673 fn next_lost_frame(&mut self, epoch: packet::Epoch) -> Option<frame::Frame> {
674 self.epochs[epoch].lost_frames.pop_front()
675 }
676
677 fn get_largest_acked_on_epoch(&self, epoch: packet::Epoch) -> Option<u64> {
678 self.epochs[epoch].largest_acked_packet
679 }
680
681 fn has_lost_frames(&self, epoch: packet::Epoch) -> bool {
682 !self.epochs[epoch].lost_frames.is_empty()
683 }
684
685 fn loss_probes(&self, epoch: packet::Epoch) -> usize {
686 self.epochs[epoch].loss_probes
687 }
688
689 #[cfg(test)]
690 fn inc_loss_probes(&mut self, epoch: packet::Epoch) {
691 self.epochs[epoch].loss_probes += 1;
692 }
693
694 fn ping_sent(&mut self, epoch: packet::Epoch) {
695 self.epochs[epoch].loss_probes =
696 self.epochs[epoch].loss_probes.saturating_sub(1);
697 }
698
699 fn on_packet_sent(
700 &mut self, pkt: Sent, epoch: packet::Epoch,
701 handshake_status: HandshakeStatus, now: Instant, trace_id: &str,
702 ) {
703 let time_sent = if self.time_sent_set_to_now {
704 now
705 } else {
706 self.get_next_release_time().time(now).unwrap_or(now)
707 };
708
709 let epoch = &mut self.epochs[epoch];
710
711 let ack_eliciting = pkt.ack_eliciting;
712 let in_flight = pkt.in_flight;
713 let is_pmtud_probe = pkt.is_pmtud_probe;
714 let pkt_num = pkt.pkt_num;
715 let sent_bytes = pkt.size;
716
717 if let Some(SentPacket { pkt_num, .. }) = epoch.sent_packets.back() {
718 assert!(*pkt_num < pkt.pkt_num, "Packet numbers must increase");
719 }
720
721 let status = SentStatus::Sent {
722 time_sent,
723 ack_eliciting,
724 in_flight,
725 is_pmtud_probe,
726 has_data: pkt.has_data,
727 sent_bytes,
728 frames: pkt.frames,
729 };
730
731 #[cfg(test)]
732 {
733 epoch.test_largest_sent_pkt_num_on_path = epoch
734 .test_largest_sent_pkt_num_on_path
735 .max(Some(pkt.pkt_num));
736 }
737
738 epoch.sent_packets.push_back(SentPacket { pkt_num, status });
739
740 if ack_eliciting {
741 epoch.time_of_last_ack_eliciting_packet = Some(time_sent);
742 self.outstanding_non_ack_eliciting = 0;
743 } else {
744 self.outstanding_non_ack_eliciting += 1;
745 }
746
747 if in_flight {
748 self.pacer.on_packet_sent(
749 time_sent,
750 self.bytes_in_flight.get(),
751 pkt_num,
752 sent_bytes,
753 pkt.has_data,
754 &self.rtt_stats,
755 );
756
757 self.bytes_in_flight.add(sent_bytes, now);
758 epoch.pkts_in_flight += 1;
759 self.set_loss_detection_timer(handshake_status, time_sent);
760 }
761
762 self.bytes_sent += sent_bytes;
763
764 trace!("{trace_id} {self:?}");
765 }
766
767 fn get_packet_send_time(&self, now: Instant) -> Instant {
768 self.pacer.get_next_release_time().time(now).unwrap_or(now)
769 }
770
771 fn on_ack_received(
773 &mut self, peer_sent_ack_ranges: &RangeSet, ack_delay: u64,
774 epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant,
775 skip_pn: Option<u64>, trace_id: &str,
776 ) -> Result<OnAckReceivedOutcome> {
777 let prior_in_flight = self.bytes_in_flight.get();
778
779 let AckedDetectionResult {
780 acked_bytes,
781 spurious_losses,
782 spurious_pkt_thresh,
783 has_ack_eliciting,
784 } = self.epochs[epoch].detect_and_remove_acked_packets(
785 peer_sent_ack_ranges,
786 &mut self.newly_acked,
787 skip_pn,
788 trace_id,
789 )?;
790
791 self.lost_spurious_count += spurious_losses;
792 if let Some(thresh) = spurious_pkt_thresh {
793 self.loss_thresh.on_spurious_loss(thresh);
794 }
795
796 if self.newly_acked.is_empty() {
797 return Ok(OnAckReceivedOutcome {
798 acked_bytes,
799 spurious_losses,
800 ..Default::default()
801 });
802 }
803
804 self.bytes_in_flight.saturating_subtract(acked_bytes, now);
805
806 let largest_newly_acked = self.newly_acked.last().unwrap();
807
808 let largest_acked_pkt_num = self.epochs[epoch]
811 .largest_acked_packet
812 .unwrap_or(0)
813 .max(largest_newly_acked.pkt_num);
814 self.epochs[epoch].largest_acked_packet = Some(largest_acked_pkt_num);
815
816 let update_rtt = largest_newly_acked.pkt_num == largest_acked_pkt_num &&
818 has_ack_eliciting;
819 if update_rtt {
820 let latest_rtt = now - largest_newly_acked.time_sent;
821 self.rtt_stats.update_rtt(
822 latest_rtt,
823 Duration::from_micros(ack_delay),
824 now,
825 handshake_status.completed,
826 );
827 }
828
829 let (lost_bytes, lost_packets) =
830 self.detect_and_remove_lost_packets(epoch, now);
831
832 self.pacer.on_congestion_event(
833 update_rtt,
834 prior_in_flight,
835 self.bytes_in_flight.get(),
836 now,
837 &self.newly_acked,
838 &self.lost_reuse,
839 self.epochs[epoch].least_unacked(),
840 &self.rtt_stats,
841 &mut self.recovery_stats,
842 );
843
844 self.pto_count = 0;
845 self.lost_count += lost_packets;
846
847 self.set_loss_detection_timer(handshake_status, now);
848
849 trace!("{trace_id} {self:?}");
850
851 Ok(OnAckReceivedOutcome {
852 lost_packets,
853 lost_bytes,
854 acked_bytes,
855 spurious_losses,
856 })
857 }
858
859 fn on_loss_detection_timeout(
860 &mut self, handshake_status: HandshakeStatus, now: Instant,
861 trace_id: &str,
862 ) -> OnLossDetectionTimeoutOutcome {
863 let (earliest_loss_time, epoch) = self.loss_time_and_space();
864
865 if earliest_loss_time.is_some() {
866 let prior_in_flight = self.bytes_in_flight.get();
867
868 let (lost_bytes, lost_packets) =
869 self.detect_and_remove_lost_packets(epoch, now);
870
871 self.pacer.on_congestion_event(
872 false,
873 prior_in_flight,
874 self.bytes_in_flight.get(),
875 now,
876 &[],
877 &self.lost_reuse,
878 self.epochs[epoch].least_unacked(),
879 &self.rtt_stats,
880 &mut self.recovery_stats,
881 );
882
883 self.lost_count += lost_packets;
884
885 self.set_loss_detection_timer(handshake_status, now);
886
887 trace!("{trace_id} {self:?}");
888 return OnLossDetectionTimeoutOutcome {
889 lost_packets,
890 lost_bytes,
891 };
892 }
893
894 let epoch = if self.bytes_in_flight.get() > 0 {
895 let (_, e) = self.pto_time_and_space(handshake_status, now);
898
899 e
900 } else {
901 if handshake_status.has_handshake_keys {
905 packet::Epoch::Handshake
906 } else {
907 packet::Epoch::Initial
908 }
909 };
910
911 self.pto_count += 1;
912
913 let epoch = &mut self.epochs[epoch];
914
915 epoch.loss_probes = MAX_PTO_PROBES_COUNT.min(self.pto_count as usize);
916
917 let unacked_frames = epoch
921 .sent_packets
922 .iter_mut()
923 .filter_map(|p| {
924 if let SentStatus::Sent {
925 has_data: true,
926 frames,
927 ..
928 } = &p.status
929 {
930 Some(frames)
931 } else {
932 None
933 }
934 })
935 .take(epoch.loss_probes)
936 .flatten()
937 .filter(|f| !matches!(f, frame::Frame::DatagramHeader { .. }));
938
939 epoch.lost_frames.extend(unacked_frames.cloned());
947
948 self.pacer
949 .on_retransmission_timeout(!epoch.lost_frames.is_empty());
950
951 self.set_loss_detection_timer(handshake_status, now);
952
953 trace!("{trace_id} {self:?}");
954 OnLossDetectionTimeoutOutcome {
955 lost_packets: 0,
956 lost_bytes: 0,
957 }
958 }
959
960 fn on_pkt_num_space_discarded(
961 &mut self, epoch: packet::Epoch, handshake_status: HandshakeStatus,
962 now: Instant,
963 ) {
964 let epoch = &mut self.epochs[epoch];
965 self.bytes_in_flight
966 .saturating_subtract(epoch.discard(&mut self.pacer), now);
967 self.set_loss_detection_timer(handshake_status, now);
968 }
969
970 fn on_path_change(
971 &mut self, epoch: packet::Epoch, now: Instant, _trace_id: &str,
972 ) -> (usize, usize) {
973 let (lost_bytes, lost_packets) =
974 self.detect_and_remove_lost_packets(epoch, now);
975
976 (lost_packets, lost_bytes)
977 }
978
979 fn loss_detection_timer(&self) -> Option<Instant> {
980 self.loss_timer.time
981 }
982
983 fn cwnd(&self) -> usize {
984 self.pacer.get_congestion_window()
985 }
986
987 fn cwnd_available(&self) -> usize {
988 if self.epochs.iter().any(|e| e.loss_probes > 0) {
990 return usize::MAX;
991 }
992
993 self.cwnd().saturating_sub(self.bytes_in_flight.get())
994 }
995
996 fn rtt(&self) -> Duration {
997 self.rtt_stats.rtt()
998 }
999
1000 fn min_rtt(&self) -> Option<Duration> {
1001 self.rtt_stats.min_rtt()
1002 }
1003
1004 fn max_rtt(&self) -> Option<Duration> {
1005 self.rtt_stats.max_rtt()
1006 }
1007
1008 fn rttvar(&self) -> Duration {
1009 self.rtt_stats.rttvar()
1010 }
1011
1012 fn pto(&self) -> Duration {
1013 let r = &self.rtt_stats;
1014 r.rtt() + (r.rttvar() * 4).max(GRANULARITY)
1015 }
1016
1017 fn delivery_rate(&self) -> Bandwidth {
1019 self.pacer.bandwidth_estimate(&self.rtt_stats)
1020 }
1021
1022 fn max_bandwidth(&self) -> Option<Bandwidth> {
1023 Some(self.pacer.max_bandwidth())
1024 }
1025
1026 fn startup_exit(&self) -> Option<StartupExit> {
1028 self.recovery_stats.startup_exit
1029 }
1030
1031 fn max_datagram_size(&self) -> usize {
1032 self.max_datagram_size
1033 }
1034
1035 fn pmtud_update_max_datagram_size(&mut self, new_max_datagram_size: usize) {
1036 self.max_datagram_size = new_max_datagram_size;
1037 self.pacer.update_mss(self.max_datagram_size);
1038 }
1039
1040 fn update_max_datagram_size(&mut self, new_max_datagram_size: usize) {
1041 self.pmtud_update_max_datagram_size(
1042 self.max_datagram_size.min(new_max_datagram_size),
1043 )
1044 }
1045
1046 fn on_app_limited(&mut self) {
1048 self.pacer.on_app_limited(self.bytes_in_flight.get())
1049 }
1050
1051 #[cfg(test)]
1052 fn sent_packets_len(&self, epoch: packet::Epoch) -> usize {
1053 self.epochs[epoch].sent_packets.len()
1054 }
1055
1056 #[cfg(test)]
1057 fn in_flight_count(&self, epoch: packet::Epoch) -> usize {
1058 self.epochs[epoch].pkts_in_flight
1059 }
1060
1061 fn bytes_in_flight(&self) -> usize {
1062 self.bytes_in_flight.get()
1063 }
1064
1065 fn bytes_in_flight_duration(&self) -> Duration {
1066 self.bytes_in_flight.get_duration()
1067 }
1068
1069 #[cfg(test)]
1070 fn pacing_rate(&self) -> u64 {
1071 self.pacer
1072 .pacing_rate(self.bytes_in_flight.get(), &self.rtt_stats)
1073 .to_bytes_per_period(Duration::from_secs(1))
1074 }
1075
1076 #[cfg(test)]
1077 fn pto_count(&self) -> u32 {
1078 self.pto_count
1079 }
1080
1081 #[cfg(test)]
1082 fn pkt_thresh(&self) -> Option<u64> {
1083 self.loss_thresh.pkt_thresh()
1084 }
1085
1086 #[cfg(test)]
1087 fn time_thresh(&self) -> f64 {
1088 self.loss_thresh.time_thresh()
1089 }
1090
1091 #[cfg(test)]
1092 fn lost_spurious_count(&self) -> usize {
1093 self.lost_spurious_count
1094 }
1095
1096 #[cfg(test)]
1097 fn detect_lost_packets_for_test(
1098 &mut self, epoch: packet::Epoch, now: Instant,
1099 ) -> (usize, usize) {
1100 let ret = self.detect_and_remove_lost_packets(epoch, now);
1101 self.epochs[epoch].drain_acked_and_lost_packets();
1102 ret
1103 }
1104
1105 #[cfg(test)]
1106 fn largest_sent_pkt_num_on_path(&self, epoch: packet::Epoch) -> Option<u64> {
1107 self.epochs[epoch].test_largest_sent_pkt_num_on_path
1108 }
1109
1110 #[cfg(test)]
1111 fn app_limited(&self) -> bool {
1112 self.pacer.is_app_limited(self.bytes_in_flight.get())
1113 }
1114
1115 fn update_app_limited(&mut self, _v: bool) {
1117 }
1119
1120 fn delivery_rate_update_app_limited(&mut self, _v: bool) {
1122 }
1124
1125 fn update_max_ack_delay(&mut self, max_ack_delay: Duration) {
1126 self.rtt_stats.max_ack_delay = max_ack_delay;
1127 }
1128
1129 fn get_next_release_time(&self) -> ReleaseDecision {
1130 self.pacer.get_next_release_time()
1131 }
1132
1133 fn gcongestion_enabled(&self) -> bool {
1134 true
1135 }
1136
1137 #[cfg(feature = "qlog")]
1138 fn state_str(&self, _now: Instant) -> &'static str {
1139 self.pacer.state_str()
1140 }
1141
1142 #[cfg(feature = "qlog")]
1143 fn get_updated_qlog_event_data(&mut self) -> Option<EventData> {
1144 let qlog_metrics = QlogMetrics {
1145 min_rtt: *self.rtt_stats.min_rtt,
1146 smoothed_rtt: self.rtt(),
1147 latest_rtt: self.rtt_stats.latest_rtt(),
1148 rttvar: self.rtt_stats.rttvar(),
1149 cwnd: self.cwnd() as u64,
1150 bytes_in_flight: self.bytes_in_flight.get() as u64,
1151 ssthresh: self.pacer.ssthresh(),
1152
1153 pacing_rate: Some(
1154 self.pacer
1155 .pacing_rate(self.bytes_in_flight.get(), &self.rtt_stats)
1156 .to_bytes_per_second(),
1157 ),
1158 delivery_rate: Some(self.delivery_rate().to_bytes_per_second()),
1159 send_rate: Some(self.send_rate().to_bytes_per_second()),
1160 ack_rate: Some(self.ack_rate().to_bytes_per_second()),
1161 lost_packets: Some(self.lost_count as u64),
1162 lost_bytes: Some(self.bytes_lost),
1163 pto_count: Some(self.pto_count),
1164 };
1165
1166 self.qlog_metrics.maybe_update(qlog_metrics)
1167 }
1168
1169 #[cfg(feature = "qlog")]
1170 fn get_updated_qlog_cc_state(
1171 &mut self, now: Instant,
1172 ) -> Option<&'static str> {
1173 let cc_state = self.state_str(now);
1174 if cc_state != self.qlog_prev_cc_state {
1175 self.qlog_prev_cc_state = cc_state;
1176 Some(cc_state)
1177 } else {
1178 None
1179 }
1180 }
1181
1182 fn send_quantum(&self) -> usize {
1183 let pacing_rate = self
1184 .pacer
1185 .pacing_rate(self.bytes_in_flight.get(), &self.rtt_stats);
1186
1187 let floor = if pacing_rate < Bandwidth::from_kbits_per_second(1200) {
1188 self.max_datagram_size
1189 } else {
1190 2 * self.max_datagram_size
1191 };
1192
1193 pacing_rate
1194 .to_bytes_per_period(ReleaseDecision::EQUAL_THRESHOLD)
1195 .min(64 * 1024)
1196 .max(floor as u64) as usize
1197 }
1198}
1199
1200impl std::fmt::Debug for GRecovery {
1201 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1202 write!(f, "timer={:?} ", self.loss_detection_timer())?;
1203 write!(f, "rtt_stats={:?} ", self.rtt_stats)?;
1204 write!(f, "bytes_in_flight={} ", self.bytes_in_flight.get())?;
1205 write!(f, "{:?} ", self.pacer)?;
1206 Ok(())
1207 }
1208}
1209
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Config;

    /// Default configuration: the packet threshold only ever grows, up to
    /// `MAX_PACKET_THRESHOLD`, and the time threshold switches to
    /// `PACKET_REORDER_TIME_THRESHOLD` on the first spurious loss.
    #[test]
    fn loss_threshold() {
        let config = Config::new(crate::PROTOCOL_VERSION).unwrap();
        let recovery_config = RecoveryConfig::from_config(&config);
        assert!(!recovery_config.enable_relaxed_loss_threshold);

        let mut loss_thresh = LossThreshold::new(&recovery_config);
        assert_eq!(loss_thresh.time_thresh_overhead, None);
        assert_eq!(loss_thresh.pkt_thresh().unwrap(), INITIAL_PACKET_THRESHOLD);
        assert_eq!(loss_thresh.time_thresh(), INITIAL_TIME_THRESHOLD);

        // First spurious loss.
        loss_thresh.on_spurious_loss(INITIAL_PACKET_THRESHOLD);
        assert_eq!(loss_thresh.pkt_thresh().unwrap(), INITIAL_PACKET_THRESHOLD);
        assert_eq!(loss_thresh.time_thresh(), PACKET_REORDER_TIME_THRESHOLD);

        // Gaps below the initial threshold never lower it.
        for packet_gap in 0..INITIAL_PACKET_THRESHOLD {
            loss_thresh.on_spurious_loss(packet_gap);

            assert_eq!(
                loss_thresh.pkt_thresh().unwrap(),
                INITIAL_PACKET_THRESHOLD
            );
            assert_eq!(loss_thresh.time_thresh(), PACKET_REORDER_TIME_THRESHOLD);
        }

        // Larger gaps raise the threshold until the cap is reached.
        for packet_gap in INITIAL_PACKET_THRESHOLD + 1..MAX_PACKET_THRESHOLD * 2 {
            loss_thresh.on_spurious_loss(packet_gap);

            let expected_pkt_thresh = packet_gap.min(MAX_PACKET_THRESHOLD);
            assert_eq!(loss_thresh.pkt_thresh().unwrap(), expected_pkt_thresh);
            assert_eq!(loss_thresh.time_thresh(), PACKET_REORDER_TIME_THRESHOLD);
        }
        assert_eq!(loss_thresh.pkt_thresh().unwrap(), MAX_PACKET_THRESHOLD);
        assert_eq!(loss_thresh.time_thresh(), PACKET_REORDER_TIME_THRESHOLD);

        // A small gap afterwards does not lower the threshold again.
        loss_thresh.on_spurious_loss(INITIAL_PACKET_THRESHOLD);
        assert_eq!(loss_thresh.pkt_thresh().unwrap(), MAX_PACKET_THRESHOLD);
        assert_eq!(loss_thresh.time_thresh(), PACKET_REORDER_TIME_THRESHOLD);
    }

    /// Relaxed configuration: the first spurious loss disables the packet
    /// threshold, later ones double the time-threshold overhead up to a time
    /// threshold of 2.0.
    #[test]
    fn relaxed_loss_threshold() {
        const MAX_TIME_THRESHOLD: f64 = 2.0;

        let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
        config.set_enable_relaxed_loss_threshold(true);
        let recovery_config = RecoveryConfig::from_config(&config);
        assert!(recovery_config.enable_relaxed_loss_threshold);

        let mut loss_thresh = LossThreshold::new(&recovery_config);
        assert_eq!(
            loss_thresh.time_thresh_overhead,
            Some(INITIAL_TIME_THRESHOLD_OVERHEAD)
        );
        assert_eq!(loss_thresh.pkt_thresh().unwrap(), INITIAL_PACKET_THRESHOLD);
        assert_eq!(loss_thresh.time_thresh(), INITIAL_TIME_THRESHOLD);

        // First spurious loss: packet threshold off, time threshold as is.
        loss_thresh.on_spurious_loss(INITIAL_PACKET_THRESHOLD);
        assert_eq!(loss_thresh.pkt_thresh(), None);
        assert_eq!(loss_thresh.time_thresh(), INITIAL_TIME_THRESHOLD);

        for subsequent_loss_count in 1..100 {
            // The overhead doubles on each of the first three subsequent
            // losses; after that the cap applies.
            let new_time_threshold = if subsequent_loss_count <= 3 {
                1.0 + INITIAL_TIME_THRESHOLD_OVERHEAD *
                    2_f64.powi(subsequent_loss_count as i32)
            } else {
                2.0
            };

            loss_thresh.on_spurious_loss(subsequent_loss_count);
            assert_eq!(loss_thresh.pkt_thresh(), None);
            assert_eq!(loss_thresh.time_thresh(), new_time_threshold);
        }
        assert_eq!(loss_thresh.pkt_thresh(), None);
        assert_eq!(loss_thresh.time_thresh(), MAX_TIME_THRESHOLD);
    }
}