1use crate::packet;
2use crate::recovery::OnLossDetectionTimeoutOutcome;
3
4use std::collections::VecDeque;
5use std::time::Duration;
6use std::time::Instant;
7
8use smallvec::SmallVec;
9
10#[cfg(feature = "qlog")]
11use qlog::events::EventData;
12
13#[cfg(feature = "qlog")]
14use crate::recovery::QlogMetrics;
15
16use crate::frame;
17
18use crate::recovery::bytes_in_flight::BytesInFlight;
19use crate::recovery::gcongestion::Bandwidth;
20use crate::recovery::rtt::RttStats;
21use crate::recovery::CongestionControlAlgorithm;
22use crate::recovery::HandshakeStatus;
23use crate::recovery::LossDetectionTimer;
24use crate::recovery::OnAckReceivedOutcome;
25use crate::recovery::RangeSet;
26use crate::recovery::RecoveryConfig;
27use crate::recovery::RecoveryOps;
28use crate::recovery::RecoveryStats;
29use crate::recovery::ReleaseDecision;
30use crate::recovery::Sent;
31use crate::recovery::StartupExit;
32use crate::recovery::GRANULARITY;
33use crate::recovery::INITIAL_PACKET_THRESHOLD;
34use crate::recovery::INITIAL_TIME_THRESHOLD;
35use crate::recovery::MAX_OUTSTANDING_NON_ACK_ELICITING;
36use crate::recovery::MAX_PACKET_THRESHOLD;
37use crate::recovery::MAX_PTO_PROBES_COUNT;
38use crate::Error;
39use crate::Result;
40
41use super::pacer::Pacer;
42use super::Acked;
43use super::Congestion;
44use super::CongestionControl;
45use super::Lost;
46
/// Packet count passed as the second argument to `Congestion::bbrv2` in
/// [`GRecovery::new`]; presumably the upper bound on the congestion window,
/// expressed in packets (TODO confirm against the BBRv2 constructor).
const MAX_WINDOW_PACKETS: usize = 20_000;
49
/// A packet tracked by loss recovery: its packet number plus its current
/// lifecycle status.
#[derive(Debug)]
struct SentPacket {
    /// Packet number. `on_packet_sent` asserts that numbers are strictly
    /// increasing, so the queue of sent packets is always sorted by this.
    pkt_num: u64,
    /// Whether the packet is still outstanding, acked, or declared lost.
    status: SentStatus,
}
55
/// Lifecycle state of a tracked packet.
#[derive(Debug)]
enum SentStatus {
    /// The packet is outstanding: neither acknowledged nor declared lost.
    Sent {
        /// Send time recorded by `on_packet_sent` (the pacer's release time
        /// when it provides one, otherwise the caller's `now`).
        time_sent: Instant,
        /// Copied from `Sent::ack_eliciting`.
        ack_eliciting: bool,
        /// Whether the packet counts towards bytes / packets in flight.
        in_flight: bool,
        /// Copied from `Sent::has_data`; only packets with this flag set
        /// have their frames rescheduled when a PTO fires.
        has_data: bool,
        /// Whether this is a PMTUD probe. An in-flight probe that is lost is
        /// removed from the in-flight accounting but is not reported as a
        /// loss.
        is_pmtud_probe: bool,
        /// Size of the packet in bytes.
        sent_bytes: usize,
        /// Frames carried by the packet, handed back on ack or loss.
        frames: SmallVec<[frame::Frame; 1]>,
    },
    /// The packet was acknowledged by the peer.
    Acked,
    /// The packet was declared lost.
    Lost,
}
70
71impl SentStatus {
72 fn ack(&mut self) -> Self {
73 std::mem::replace(self, SentStatus::Acked)
74 }
75
76 fn lose(&mut self) -> Self {
77 if !matches!(self, SentStatus::Acked) {
78 std::mem::replace(self, SentStatus::Lost)
79 } else {
80 SentStatus::Acked
81 }
82 }
83}
84
/// Loss-recovery state kept separately for each packet number space.
#[derive(Default)]
struct RecoveryEpoch {
    /// Send time of the most recent ack-eliciting packet in this epoch.
    time_of_last_ack_eliciting_packet: Option<Instant>,

    /// Largest packet number acknowledged so far in this epoch, if any.
    largest_acked_packet: Option<u64>,

    /// Time at which the oldest still-outstanding packet (numbered at or
    /// below the largest acked) will cross the time threshold. Recomputed
    /// on every loss-detection pass.
    loss_time: Option<Instant>,

    /// Tracked packets in increasing packet-number order. Entries that are
    /// acked or lost are trimmed from the front only.
    sent_packets: VecDeque<SentPacket>,

    /// Number of probe packets still to be sent after a PTO.
    loss_probes: usize,
    /// Number of outstanding packets that count as in flight.
    pkts_in_flight: usize,

    /// Frames from newly acked packets, drained by `get_acked_frames`.
    acked_frames: Vec<frame::Frame>,
    /// Frames from lost packets (or rescheduled on PTO), drained by
    /// `get_lost_frames`.
    lost_frames: Vec<frame::Frame>,

    /// Largest packet number passed to `on_packet_sent`; only written in
    /// test builds, hence the `dead_code` allowance.
    #[allow(dead_code)]
    test_largest_sent_pkt_num_on_path: Option<u64>,
}
112
/// Summary returned by `RecoveryEpoch::detect_and_remove_acked_packets`.
struct AckedDetectionResult {
    /// Total size of the newly acked packets that were in flight.
    acked_bytes: usize,
    /// Number of acknowledged packets that had already been declared lost.
    spurious_losses: usize,
    /// Reordering distance (`largest_acked - pkt_num + 1`) of the first
    /// spuriously lost packet encountered, if any.
    spurious_pkt_thresh: Option<u64>,
    /// Whether at least one newly acked packet was ack-eliciting.
    has_ack_eliciting: bool,
}
119
/// Summary returned by `RecoveryEpoch::detect_and_remove_lost_packets`.
struct LossDetectionResult {
    /// Total size of the lost in-flight packets, excluding PMTUD probes.
    lost_bytes: usize,
    /// Number of packets reported in `newly_lost` (PMTUD probes that were in
    /// flight are not included).
    lost_packets: usize,

    /// Total size of lost in-flight PMTUD probes.
    pmtud_lost_bytes: usize,
    /// Packet numbers of lost in-flight PMTUD probes.
    pmtud_lost_packets: SmallVec<[u64; 1]>,
}
127
128impl RecoveryEpoch {
129 fn discard(&mut self, cc: &mut impl CongestionControl) -> usize {
132 let unacked_bytes = self
133 .sent_packets
134 .drain(..)
135 .map(|p| {
136 if let SentPacket {
137 status:
138 SentStatus::Sent {
139 in_flight,
140 sent_bytes,
141 ..
142 },
143 pkt_num,
144 } = p
145 {
146 cc.on_packet_neutered(pkt_num);
147 if in_flight {
148 return sent_bytes;
149 }
150 }
151 0
152 })
153 .sum();
154
155 std::mem::take(&mut self.sent_packets);
156 self.time_of_last_ack_eliciting_packet = None;
157 self.loss_time = None;
158 self.loss_probes = 0;
159 self.pkts_in_flight = 0;
160
161 unacked_bytes
162 }
163
    /// Applies the ranges of a received ACK frame to this epoch's sent
    /// packets.
    ///
    /// Every outstanding packet covered by a range is marked acked, pushed
    /// onto `newly_acked` (which is cleared first) and has its frames moved
    /// to `acked_frames`. Packets that were already declared lost are
    /// counted as spurious losses instead. Acked and lost entries at the
    /// front of the queue are trimmed before returning.
    ///
    /// # Errors
    ///
    /// Returns `Error::OptimisticAckDetected` when any range contains
    /// `skip_pn`.
    ///
    /// # Panics
    ///
    /// Panics if `peer_sent_ack_ranges.last()` returns `None` (presumably an
    /// empty range set — callers are expected to pass at least one range).
    fn detect_and_remove_acked_packets(
        &mut self, peer_sent_ack_ranges: &RangeSet, newly_acked: &mut Vec<Acked>,
        skip_pn: Option<u64>, trace_id: &str,
    ) -> Result<AckedDetectionResult> {
        newly_acked.clear();

        let mut acked_bytes = 0;
        let mut spurious_losses = 0;
        let mut spurious_pkt_thresh = None;
        let mut has_ack_eliciting = false;

        // Largest acked packet number including this ACK; only used to size
        // the reordering distance of spurious losses below.
        let largest_ack_received = peer_sent_ack_ranges.last().unwrap();
        let largest_acked = self
            .largest_acked_packet
            .unwrap_or(0)
            .max(largest_ack_received);

        for peer_sent_range in peer_sent_ack_ranges.iter() {
            // The peer acknowledged a packet number the caller flagged via
            // `skip_pn`: report it as an optimistic ACK.
            if skip_pn.is_some_and(|skip_pn| peer_sent_range.contains(&skip_pn)) {
                return Err(Error::OptimisticAckDetected);
            }

            // `sent_packets` is sorted by packet number, so the first entry
            // inside the range can be found without a linear scan.
            let start = if self
                .sent_packets
                .front()
                .filter(|e| e.pkt_num >= peer_sent_range.start)
                .is_some()
            {
                // Fast path: the range begins at or before the oldest
                // tracked packet.
                0
            } else {
                self.sent_packets
                    .binary_search_by_key(&peer_sent_range.start, |p| p.pkt_num)
                    .unwrap_or_else(|e| e)
            };

            for SentPacket { pkt_num, status } in
                self.sent_packets.range_mut(start..)
            {
                // The range end is exclusive.
                if *pkt_num < peer_sent_range.end {
                    match status.ack() {
                        SentStatus::Sent {
                            time_sent,
                            in_flight,
                            sent_bytes,
                            frames,
                            ack_eliciting,
                            ..
                        } => {
                            if in_flight {
                                self.pkts_in_flight -= 1;
                                acked_bytes += sent_bytes;
                            }
                            newly_acked.push(Acked {
                                pkt_num: *pkt_num,
                                time_sent,
                            });

                            self.acked_frames.extend(frames);

                            has_ack_eliciting |= ack_eliciting;

                            trace!("{trace_id} packet newly acked {pkt_num}");
                        },

                        // Duplicate acknowledgement; nothing to do.
                        SentStatus::Acked => {},
                        // The packet was declared lost earlier but has now
                        // been acknowledged. Note `status.ack()` has already
                        // flipped it to `Acked`.
                        SentStatus::Lost => {
                            spurious_losses += 1;
                            // Only the first spurious loss sets the
                            // suggested threshold.
                            spurious_pkt_thresh
                                .get_or_insert(largest_acked - *pkt_num + 1);
                        },
                    }
                } else {
                    break;
                }
            }
        }

        self.drain_acked_and_lost_packets();

        Ok(AckedDetectionResult {
            acked_bytes,
            spurious_losses,
            spurious_pkt_thresh,
            has_ack_eliciting,
        })
    }
259
260 fn detect_and_remove_lost_packets(
261 &mut self, loss_delay: Duration, pkt_thresh: u64, now: Instant,
262 newly_lost: &mut Vec<Lost>,
263 ) -> LossDetectionResult {
264 newly_lost.clear();
265 let mut lost_bytes = 0;
266 self.loss_time = None;
267
268 let lost_send_time = now.checked_sub(loss_delay).unwrap();
269 let largest_acked = self.largest_acked_packet.unwrap_or(0);
270 let mut pmtud_lost_bytes = 0;
271 let mut pmtud_lost_packets = SmallVec::new();
272
273 for SentPacket { pkt_num, status } in &mut self.sent_packets {
274 if *pkt_num > largest_acked {
275 break;
276 }
277
278 if let SentStatus::Sent { time_sent, .. } = status {
279 if *time_sent <= lost_send_time ||
280 largest_acked >= *pkt_num + pkt_thresh
281 {
282 if let SentStatus::Sent {
283 in_flight,
284 sent_bytes,
285 frames,
286 is_pmtud_probe,
287 ..
288 } = status.lose()
289 {
290 self.lost_frames.extend(frames);
291
292 if in_flight {
293 self.pkts_in_flight -= 1;
294
295 if is_pmtud_probe {
296 pmtud_lost_bytes += sent_bytes;
297 pmtud_lost_packets.push(*pkt_num);
298 continue;
300 }
301
302 lost_bytes += sent_bytes;
303 }
304
305 newly_lost.push(Lost {
306 packet_number: *pkt_num,
307 bytes_lost: sent_bytes,
308 });
309 }
310 } else {
311 self.loss_time = Some(*time_sent + loss_delay);
312 break;
313 }
314 }
315 }
316
317 LossDetectionResult {
318 lost_bytes,
319 lost_packets: newly_lost.len(),
320
321 pmtud_lost_bytes,
322 pmtud_lost_packets,
323 }
324 }
325
326 fn drain_acked_and_lost_packets(&mut self) {
330 while let Some(SentPacket {
331 status: SentStatus::Acked | SentStatus::Lost,
332 ..
333 }) = self.sent_packets.front()
334 {
335 self.sent_packets.pop_front();
336 }
337 }
338
339 fn least_unacked(&self) -> u64 {
340 for pkt in self.sent_packets.iter() {
341 if let SentPacket {
342 pkt_num,
343 status: SentStatus::Sent { .. },
344 } = pkt
345 {
346 return *pkt_num;
347 }
348 }
349
350 self.largest_acked_packet.unwrap_or(0) + 1
351 }
352}
353
/// Loss recovery and congestion control state for one path, backed by the
/// `gcongestion` pacer / congestion controller.
pub struct GRecovery {
    /// Per packet-number-space recovery state, indexed by `packet::Epoch`.
    epochs: [RecoveryEpoch; packet::Epoch::count()],

    /// Loss detection timer, maintained by `set_loss_detection_timer`.
    loss_timer: LossDetectionTimer,

    /// Number of PTO expirations since new packets were last acknowledged;
    /// used as the exponent of the PTO backoff.
    pto_count: u32,

    /// Round-trip time estimator.
    rtt_stats: RttStats,

    /// Statistics handed to the pacer on congestion events; source of
    /// `startup_exit`.
    recovery_stats: RecoveryStats,

    /// Total number of packets declared lost.
    pub lost_count: usize,

    /// Total number of packets acknowledged after being declared lost.
    pub lost_spurious_count: usize,

    /// Packet reordering threshold; raised (up to `MAX_PACKET_THRESHOLD`)
    /// when a spurious loss is detected.
    pkt_thresh: u64,

    /// Value passed to `RttStats::loss_delay` to derive the time threshold.
    time_thresh: f64,

    /// Bytes currently counted as in flight across all epochs.
    bytes_in_flight: BytesInFlight,

    /// Sum of the sizes of all packets passed to `on_packet_sent`.
    /// NOTE(review): never read in the visible code.
    bytes_sent: usize,

    /// Total bytes lost.
    /// NOTE(review): initialised to 0 and never updated in the visible
    /// code; confirm whether callers are expected to maintain it.
    pub bytes_lost: u64,

    /// Current maximum datagram size, mirrored into the pacer on update.
    max_datagram_size: usize,

    /// Last metrics snapshot emitted to qlog.
    #[cfg(feature = "qlog")]
    qlog_metrics: QlogMetrics,

    /// Last congestion-control state name emitted to qlog.
    #[cfg(feature = "qlog")]
    qlog_prev_cc_state: &'static str,

    /// Number of non-ack-eliciting packets sent since the last
    /// ack-eliciting one.
    outstanding_non_ack_eliciting: usize,

    /// Scratch buffer of newly acked packets, reused across ACKs.
    newly_acked: Vec<Acked>,

    /// Scratch buffer of newly lost packets, reused across detection passes.
    lost_reuse: Vec<Lost>,

    /// Pacer wrapping the congestion controller.
    pacer: Pacer,
}
399
400impl GRecovery {
401 pub fn new(recovery_config: &RecoveryConfig) -> Option<Self> {
402 let cc = match recovery_config.cc_algorithm {
403 CongestionControlAlgorithm::Bbr2Gcongestion => Congestion::bbrv2(
404 recovery_config.initial_congestion_window_packets,
405 MAX_WINDOW_PACKETS,
406 recovery_config,
407 ),
408 _ => return None,
409 };
410
411 Some(Self {
412 epochs: Default::default(),
413 rtt_stats: RttStats::new(recovery_config.max_ack_delay),
414 recovery_stats: RecoveryStats::default(),
415 loss_timer: Default::default(),
416 pto_count: 0,
417
418 lost_count: 0,
419 lost_spurious_count: 0,
420
421 pkt_thresh: INITIAL_PACKET_THRESHOLD,
422 time_thresh: INITIAL_TIME_THRESHOLD,
423
424 bytes_in_flight: Default::default(),
425 bytes_sent: 0,
426 bytes_lost: 0,
427
428 max_datagram_size: recovery_config.max_send_udp_payload_size,
429
430 #[cfg(feature = "qlog")]
431 qlog_metrics: QlogMetrics::default(),
432
433 #[cfg(feature = "qlog")]
434 qlog_prev_cc_state: "",
435
436 outstanding_non_ack_eliciting: 0,
437
438 pacer: Pacer::new(
439 recovery_config.pacing,
440 cc,
441 recovery_config
442 .max_pacing_rate
443 .map(Bandwidth::from_mbits_per_second),
444 ),
445
446 newly_acked: Vec::new(),
447 lost_reuse: Vec::new(),
448 })
449 }
450
451 fn detect_and_remove_lost_packets(
452 &mut self, epoch: packet::Epoch, now: Instant,
453 ) -> (usize, usize) {
454 let lost = &mut self.lost_reuse;
455
456 let LossDetectionResult {
457 lost_bytes,
458 lost_packets,
459
460 pmtud_lost_bytes,
461 pmtud_lost_packets,
462 } = self.epochs[epoch].detect_and_remove_lost_packets(
463 self.rtt_stats.loss_delay(self.time_thresh),
464 self.pkt_thresh,
465 now,
466 lost,
467 );
468
469 self.bytes_in_flight
470 .saturating_subtract(lost_bytes + pmtud_lost_bytes, now);
471
472 for pkt in pmtud_lost_packets {
473 self.pacer.on_packet_neutered(pkt);
474 }
475
476 (lost_bytes, lost_packets)
477 }
478
479 fn loss_time_and_space(&self) -> (Option<Instant>, packet::Epoch) {
480 let mut epoch = packet::Epoch::Initial;
481 let mut time = self.epochs[epoch].loss_time;
482
483 for e in [packet::Epoch::Handshake, packet::Epoch::Application] {
485 let new_time = self.epochs[e].loss_time;
486 if time.is_none() || new_time < time {
487 time = new_time;
488 epoch = e;
489 }
490 }
491
492 (time, epoch)
493 }
494
    /// Computes when the probe timeout (PTO) should fire and for which
    /// packet number space.
    ///
    /// The base timeout is `pto()` doubled once per `pto_count`. With
    /// nothing in flight the timeout is armed relative to `now` for the
    /// Handshake epoch if handshake keys are available, otherwise Initial.
    /// Otherwise each epoch with packets in flight is considered, using the
    /// send time of its last ack-eliciting packet.
    fn pto_time_and_space(
        &self, handshake_status: HandshakeStatus, now: Instant,
    ) -> (Option<Instant>, packet::Epoch) {
        // NOTE(review): `1 << pto_count` (and `2_u32.pow` below) overflow
        // once `pto_count` reaches 32 — confirm this cannot happen in
        // practice.
        let mut duration = self.pto() * (1 << self.pto_count);

        // Nothing in flight: arm an anti-deadlock timeout from `now`.
        if self.bytes_in_flight.is_zero() {
            if handshake_status.has_handshake_keys {
                return (Some(now + duration), packet::Epoch::Handshake);
            } else {
                return (Some(now + duration), packet::Epoch::Initial);
            }
        }

        let mut pto_timeout = None;
        let mut pto_space = packet::Epoch::Initial;

        // Walk every packet number space, from Initial to Application.
        for &e in packet::Epoch::epochs(
            packet::Epoch::Initial..=packet::Epoch::Application,
        ) {
            if self.epochs[e].pkts_in_flight == 0 {
                continue;
            }

            if e == packet::Epoch::Application {
                // Application data is not probed until the handshake has
                // completed.
                if !handshake_status.completed {
                    return (pto_timeout, pto_space);
                }

                // Only the Application epoch adds the peer's max ack delay,
                // scaled by the same backoff.
                duration +=
                    self.rtt_stats.max_ack_delay * 2_u32.pow(self.pto_count);
            }

            let new_time = self.epochs[e]
                .time_of_last_ack_eliciting_packet
                .map(|t| t + duration);

            // NOTE(review): `None` orders before every `Some(_)`, so an
            // epoch with packets in flight but no ack-eliciting send time
            // would replace an earlier epoch's timeout with `None` —
            // confirm that state is unreachable.
            if pto_timeout.is_none() || new_time < pto_timeout {
                pto_timeout = new_time;
                pto_space = e;
            }
        }

        (pto_timeout, pto_space)
    }
543
544 fn set_loss_detection_timer(
545 &mut self, handshake_status: HandshakeStatus, now: Instant,
546 ) {
547 if let (Some(earliest_loss_time), _) = self.loss_time_and_space() {
548 self.loss_timer.update(earliest_loss_time);
550 return;
551 }
552
553 if self.bytes_in_flight.is_zero() &&
554 handshake_status.peer_verified_address
555 {
556 self.loss_timer.clear();
557 return;
558 }
559
560 if let (Some(timeout), _) = self.pto_time_and_space(handshake_status, now)
562 {
563 self.loss_timer.update(timeout);
564 }
565 }
566}
567
568impl RecoveryOps for GRecovery {
    /// Returns the total number of packets declared lost.
    fn lost_count(&self) -> usize {
        self.lost_count
    }
572
    /// Returns the `bytes_lost` counter.
    fn bytes_lost(&self) -> u64 {
        self.bytes_lost
    }
576
577 fn should_elicit_ack(&self, epoch: packet::Epoch) -> bool {
578 self.epochs[epoch].loss_probes > 0 ||
579 self.outstanding_non_ack_eliciting >=
580 MAX_OUTSTANDING_NON_ACK_ELICITING
581 }
582
    /// Takes the frames of acked packets accumulated for `epoch`, leaving
    /// the internal list empty.
    fn get_acked_frames(&mut self, epoch: packet::Epoch) -> Vec<frame::Frame> {
        std::mem::take(&mut self.epochs[epoch].acked_frames)
    }
586
    /// Takes the frames scheduled for retransmission in `epoch` (from lost
    /// packets or PTO), leaving the internal list empty.
    fn get_lost_frames(&mut self, epoch: packet::Epoch) -> Vec<frame::Frame> {
        std::mem::take(&mut self.epochs[epoch].lost_frames)
    }
590
    /// Returns the largest packet number acknowledged in `epoch`, if any.
    fn get_largest_acked_on_epoch(&self, epoch: packet::Epoch) -> Option<u64> {
        self.epochs[epoch].largest_acked_packet
    }
594
    /// Returns whether `epoch` has frames waiting to be retransmitted.
    fn has_lost_frames(&self, epoch: packet::Epoch) -> bool {
        !self.epochs[epoch].lost_frames.is_empty()
    }
598
    /// Returns the number of probe packets still pending for `epoch`.
    fn loss_probes(&self, epoch: packet::Epoch) -> usize {
        self.epochs[epoch].loss_probes
    }
602
    /// Test helper: schedules one more probe packet for `epoch`.
    #[cfg(test)]
    fn inc_loss_probes(&mut self, epoch: packet::Epoch) {
        self.epochs[epoch].loss_probes += 1;
    }
607
608 fn ping_sent(&mut self, epoch: packet::Epoch) {
609 self.epochs[epoch].loss_probes =
610 self.epochs[epoch].loss_probes.saturating_sub(1);
611 }
612
    /// Registers a packet that has just been sent in `epoch`.
    ///
    /// The packet is appended to the epoch's sent queue. In-flight packets
    /// are additionally reported to the pacer, added to the in-flight
    /// accounting, and cause the loss detection timer to be re-armed.
    ///
    /// # Panics
    ///
    /// Panics if `pkt.pkt_num` is not larger than the packet number of the
    /// last packet tracked in this epoch.
    fn on_packet_sent(
        &mut self, pkt: Sent, epoch: packet::Epoch,
        handshake_status: HandshakeStatus, now: Instant, trace_id: &str,
    ) {
        // Use the pacer's release time as the send time when it yields one,
        // otherwise `now`.
        let time_sent = self.get_next_release_time().time(now).unwrap_or(now);

        let epoch = &mut self.epochs[epoch];

        let ack_eliciting = pkt.ack_eliciting;
        let in_flight = pkt.in_flight;
        let is_pmtud_probe = pkt.is_pmtud_probe;
        let pkt_num = pkt.pkt_num;
        let sent_bytes = pkt.size;

        // The sent queue relies on strictly increasing packet numbers (it
        // is binary-searched when processing ACKs).
        if let Some(SentPacket { pkt_num, .. }) = epoch.sent_packets.back() {
            assert!(*pkt_num < pkt.pkt_num, "Packet numbers must increase");
        }

        let status = SentStatus::Sent {
            time_sent,
            ack_eliciting,
            in_flight,
            is_pmtud_probe,
            has_data: pkt.has_data,
            sent_bytes,
            frames: pkt.frames,
        };

        #[cfg(test)]
        {
            epoch.test_largest_sent_pkt_num_on_path = epoch
                .test_largest_sent_pkt_num_on_path
                .max(Some(pkt.pkt_num));
        }

        epoch.sent_packets.push_back(SentPacket { pkt_num, status });

        if ack_eliciting {
            epoch.time_of_last_ack_eliciting_packet = Some(time_sent);
            self.outstanding_non_ack_eliciting = 0;
        } else {
            self.outstanding_non_ack_eliciting += 1;
        }

        if in_flight {
            // The pacer is given the bytes in flight *before* this packet
            // is added.
            self.pacer.on_packet_sent(
                time_sent,
                self.bytes_in_flight.get(),
                pkt_num,
                sent_bytes,
                pkt.has_data,
                &self.rtt_stats,
            );

            self.bytes_in_flight.add(sent_bytes, now);
            epoch.pkts_in_flight += 1;
            // Note the timer is computed relative to `time_sent`, not `now`.
            self.set_loss_detection_timer(handshake_status, time_sent);
        }

        self.bytes_sent += sent_bytes;

        trace!("{trace_id} {self:?}");
    }
676
677 fn get_packet_send_time(&self, now: Instant) -> Instant {
678 self.pacer.get_next_release_time().time(now).unwrap_or(now)
679 }
680
    /// Processes an ACK frame received for `epoch`.
    ///
    /// Marks the acknowledged packets, updates the RTT estimate when the
    /// largest newly acked packet is the largest acked overall and at least
    /// one newly acked packet was ack-eliciting, runs loss detection,
    /// notifies the pacer, resets the PTO backoff and re-arms the loss
    /// detection timer.
    ///
    /// `ack_delay` is interpreted as microseconds.
    ///
    /// # Errors
    ///
    /// Propagates `Error::OptimisticAckDetected` when a range covers
    /// `skip_pn`.
    fn on_ack_received(
        &mut self, peer_sent_ack_ranges: &RangeSet, ack_delay: u64,
        epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant,
        skip_pn: Option<u64>, trace_id: &str,
    ) -> Result<OnAckReceivedOutcome> {
        // Snapshot taken before any packet is removed from flight.
        let prior_in_flight = self.bytes_in_flight.get();

        let AckedDetectionResult {
            acked_bytes,
            spurious_losses,
            spurious_pkt_thresh,
            has_ack_eliciting,
        } = self.epochs[epoch].detect_and_remove_acked_packets(
            peer_sent_ack_ranges,
            &mut self.newly_acked,
            skip_pn,
            trace_id,
        )?;

        // Spurious losses are recorded even if nothing was newly acked.
        self.lost_spurious_count += spurious_losses;
        if let Some(thresh) = spurious_pkt_thresh {
            // Only ever raise the reordering threshold, capped at the
            // maximum.
            self.pkt_thresh =
                self.pkt_thresh.max(thresh.min(MAX_PACKET_THRESHOLD));
        }

        // Nothing new was acknowledged: report an empty outcome.
        if self.newly_acked.is_empty() {
            return Ok(OnAckReceivedOutcome::default());
        }

        self.bytes_in_flight.saturating_subtract(acked_bytes, now);

        // `newly_acked` is filled in the order the ranges are iterated;
        // presumably ascending, which makes the last entry the largest
        // (TODO confirm `RangeSet::iter` ordering).
        let largest_newly_acked = self.newly_acked.last().unwrap();

        let largest_acked_pkt_num = self.epochs[epoch]
            .largest_acked_packet
            .unwrap_or(0)
            .max(largest_newly_acked.pkt_num);
        self.epochs[epoch].largest_acked_packet = Some(largest_acked_pkt_num);

        let update_rtt = largest_newly_acked.pkt_num == largest_acked_pkt_num &&
            has_ack_eliciting;
        if update_rtt {
            let latest_rtt = now - largest_newly_acked.time_sent;
            self.rtt_stats.update_rtt(
                latest_rtt,
                Duration::from_micros(ack_delay),
                now,
                handshake_status.completed,
            );
        }

        // Loss detection runs after the RTT update so it uses the fresh
        // estimate; newly lost packets end up in `self.lost_reuse`.
        let (lost_bytes, lost_packets) =
            self.detect_and_remove_lost_packets(epoch, now);

        self.pacer.on_congestion_event(
            update_rtt,
            prior_in_flight,
            self.bytes_in_flight.get(),
            now,
            &self.newly_acked,
            &self.lost_reuse,
            self.epochs[epoch].least_unacked(),
            &self.rtt_stats,
            &mut self.recovery_stats,
        );

        // New packets were acknowledged: reset the PTO backoff.
        self.pto_count = 0;
        self.lost_count += lost_packets;

        self.set_loss_detection_timer(handshake_status, now);

        trace!("{trace_id} {self:?}");

        Ok(OnAckReceivedOutcome {
            lost_packets,
            lost_bytes,
            acked_bytes,
            spurious_losses,
        })
    }
765
    /// Handles expiry of the loss detection timer.
    ///
    /// If a time-threshold loss is pending, loss detection is run for the
    /// corresponding epoch and its outcome returned. Otherwise the expiry
    /// is treated as a PTO: the backoff counter is incremented, probes are
    /// scheduled, and frames from the oldest outstanding data packets are
    /// queued for retransmission without declaring the packets lost (the
    /// returned outcome is zero in that case).
    fn on_loss_detection_timeout(
        &mut self, handshake_status: HandshakeStatus, now: Instant,
        trace_id: &str,
    ) -> OnLossDetectionTimeoutOutcome {
        let (earliest_loss_time, epoch) = self.loss_time_and_space();

        if earliest_loss_time.is_some() {
            let prior_in_flight = self.bytes_in_flight.get();

            let (lost_bytes, lost_packets) =
                self.detect_and_remove_lost_packets(epoch, now);

            // No RTT update and no newly acked packets on this path.
            self.pacer.on_congestion_event(
                false,
                prior_in_flight,
                self.bytes_in_flight.get(),
                now,
                &[],
                &self.lost_reuse,
                self.epochs[epoch].least_unacked(),
                &self.rtt_stats,
                &mut self.recovery_stats,
            );

            self.lost_count += lost_packets;

            self.set_loss_detection_timer(handshake_status, now);

            trace!("{trace_id} {self:?}");
            return OnLossDetectionTimeoutOutcome {
                lost_packets,
                lost_bytes,
            };
        }

        // PTO: pick the packet number space to probe.
        let epoch = if self.bytes_in_flight.get() > 0 {
            let (_, e) = self.pto_time_and_space(handshake_status, now);

            e
        } else {
            // Nothing in flight: probe with the most advanced handshake
            // keys available.
            if handshake_status.has_handshake_keys {
                packet::Epoch::Handshake
            } else {
                packet::Epoch::Initial
            }
        };

        self.pto_count += 1;

        let epoch = &mut self.epochs[epoch];

        epoch.loss_probes = MAX_PTO_PROBES_COUNT.min(self.pto_count as usize);

        // Frames of the oldest outstanding packets that carry data, limited
        // to as many packets as there are probes, skipping
        // `DatagramHeader` frames.
        let unacked_frames = epoch
            .sent_packets
            .iter_mut()
            .filter_map(|p| {
                if let SentStatus::Sent {
                    has_data: true,
                    frames,
                    ..
                } = &p.status
                {
                    Some(frames)
                } else {
                    None
                }
            })
            .take(epoch.loss_probes)
            .flatten()
            .filter(|f| !matches!(f, frame::Frame::DatagramHeader { .. }));

        // The frames are cloned: the packets stay in the sent queue and are
        // not marked lost.
        epoch.lost_frames.extend(unacked_frames.cloned());

        self.pacer
            .on_retransmission_timeout(!epoch.lost_frames.is_empty());

        self.set_loss_detection_timer(handshake_status, now);

        trace!("{trace_id} {self:?}");
        OnLossDetectionTimeoutOutcome {
            lost_packets: 0,
            lost_bytes: 0,
        }
    }
866
867 fn on_pkt_num_space_discarded(
868 &mut self, epoch: packet::Epoch, handshake_status: HandshakeStatus,
869 now: Instant,
870 ) {
871 let epoch = &mut self.epochs[epoch];
872 self.bytes_in_flight
873 .saturating_subtract(epoch.discard(&mut self.pacer), now);
874 self.set_loss_detection_timer(handshake_status, now);
875 }
876
877 fn on_path_change(
878 &mut self, epoch: packet::Epoch, now: Instant, _trace_id: &str,
879 ) -> (usize, usize) {
880 let (lost_bytes, lost_packets) =
881 self.detect_and_remove_lost_packets(epoch, now);
882
883 (lost_packets, lost_bytes)
884 }
885
    /// Returns the instant at which the loss detection timer is set to
    /// fire, if it is armed.
    fn loss_detection_timer(&self) -> Option<Instant> {
        self.loss_timer.time
    }
889
    /// Returns the congestion window reported by the pacer.
    fn cwnd(&self) -> usize {
        self.pacer.get_congestion_window()
    }
893
894 fn cwnd_available(&self) -> usize {
895 if self.epochs.iter().any(|e| e.loss_probes > 0) {
897 return usize::MAX;
898 }
899
900 self.cwnd().saturating_sub(self.bytes_in_flight.get())
901 }
902
    /// Returns the RTT reported by `RttStats::rtt` (exposed as the smoothed
    /// RTT in qlog).
    fn rtt(&self) -> Duration {
        self.rtt_stats.rtt()
    }
906
    /// Returns the minimum RTT tracked by the estimator, if available.
    fn min_rtt(&self) -> Option<Duration> {
        self.rtt_stats.min_rtt()
    }
910
    /// Returns the maximum RTT tracked by the estimator, if available.
    fn max_rtt(&self) -> Option<Duration> {
        self.rtt_stats.max_rtt()
    }
914
    /// Returns the RTT variation tracked by the estimator.
    fn rttvar(&self) -> Duration {
        self.rtt_stats.rttvar()
    }
918
919 fn pto(&self) -> Duration {
920 let r = &self.rtt_stats;
921 r.rtt() + (r.rttvar() * 4).max(GRANULARITY)
922 }
923
    /// Returns the pacer's current bandwidth estimate.
    fn delivery_rate(&self) -> Bandwidth {
        self.pacer.bandwidth_estimate(&self.rtt_stats)
    }
928
    /// Returns the `startup_exit` value recorded in the recovery
    /// statistics, if any.
    fn startup_exit(&self) -> Option<StartupExit> {
        self.recovery_stats.startup_exit
    }
933
    /// Returns the current maximum datagram size.
    fn max_datagram_size(&self) -> usize {
        self.max_datagram_size
    }
937
938 fn pmtud_update_max_datagram_size(&mut self, new_max_datagram_size: usize) {
939 self.max_datagram_size = new_max_datagram_size;
940 self.pacer.update_mss(self.max_datagram_size);
941 }
942
943 fn update_max_datagram_size(&mut self, new_max_datagram_size: usize) {
944 self.pmtud_update_max_datagram_size(
945 self.max_datagram_size.min(new_max_datagram_size),
946 )
947 }
948
    /// Tells the pacer that the sender is application-limited, passing the
    /// current bytes in flight.
    fn on_app_limited(&mut self) {
        self.pacer.on_app_limited(self.bytes_in_flight.get())
    }
953
    /// Test helper: number of entries in the sent queue of `epoch`,
    /// including acked or lost entries that have not been trimmed yet.
    #[cfg(test)]
    fn sent_packets_len(&self, epoch: packet::Epoch) -> usize {
        self.epochs[epoch].sent_packets.len()
    }
958
    /// Test helper: number of packets in flight in `epoch`.
    #[cfg(test)]
    fn in_flight_count(&self, epoch: packet::Epoch) -> usize {
        self.epochs[epoch].pkts_in_flight
    }
963
    /// Test helper: bytes currently in flight across all epochs.
    #[cfg(test)]
    fn bytes_in_flight(&self) -> usize {
        self.bytes_in_flight.get()
    }
968
    /// Returns the duration reported by `BytesInFlight::get_duration`;
    /// presumably the accumulated time during which bytes were in flight
    /// (TODO confirm against `BytesInFlight`).
    fn bytes_in_flight_duration(&self) -> Duration {
        self.bytes_in_flight.get_duration()
    }
972
    /// Test helper: the pacer's current pacing rate, converted to bytes per
    /// second.
    #[cfg(test)]
    fn pacing_rate(&self) -> u64 {
        self.pacer
            .pacing_rate(self.bytes_in_flight.get(), &self.rtt_stats)
            .to_bytes_per_period(Duration::from_secs(1))
    }
979
    /// Test helper: current PTO backoff counter.
    #[cfg(test)]
    fn pto_count(&self) -> u32 {
        self.pto_count
    }
984
    /// Test helper: current packet reordering threshold.
    #[cfg(test)]
    fn pkt_thresh(&self) -> u64 {
        self.pkt_thresh
    }
989
    /// Test helper: number of packets acked after being declared lost.
    #[cfg(test)]
    fn lost_spurious_count(&self) -> usize {
        self.lost_spurious_count
    }
994
    /// Test helper: runs loss detection for `epoch`, then trims acked and
    /// lost entries from the front of its sent queue.
    ///
    /// Returns `(lost_bytes, lost_packets)`.
    #[cfg(test)]
    fn detect_lost_packets_for_test(
        &mut self, epoch: packet::Epoch, now: Instant,
    ) -> (usize, usize) {
        let ret = self.detect_and_remove_lost_packets(epoch, now);
        self.epochs[epoch].drain_acked_and_lost_packets();
        ret
    }
1003
    /// Test helper: largest packet number passed to `on_packet_sent` for
    /// `epoch`, if any.
    #[cfg(test)]
    fn largest_sent_pkt_num_on_path(&self, epoch: packet::Epoch) -> Option<u64> {
        self.epochs[epoch].test_largest_sent_pkt_num_on_path
    }
1008
    /// Test helper: whether the pacer considers the sender
    /// application-limited at the current bytes in flight.
    #[cfg(test)]
    fn app_limited(&self) -> bool {
        self.pacer.is_app_limited(self.bytes_in_flight.get())
    }
1013
    /// Intentionally a no-op in this implementation; the argument is
    /// ignored.
    fn update_app_limited(&mut self, _v: bool) {
    }
1018
    /// Intentionally a no-op in this implementation; the argument is
    /// ignored.
    fn delivery_rate_update_app_limited(&mut self, _v: bool) {
    }
1023
    /// Stores the max ack delay used when computing the Application-epoch
    /// PTO.
    fn update_max_ack_delay(&mut self, max_ack_delay: Duration) {
        self.rtt_stats.max_ack_delay = max_ack_delay;
    }
1027
    /// Returns the pacer's decision on when the next packet may be
    /// released.
    fn get_next_release_time(&self) -> ReleaseDecision {
        self.pacer.get_next_release_time()
    }
1031
    /// Always `true` for this implementation.
    fn gcongestion_enabled(&self) -> bool {
        true
    }
1035
    /// Returns the pacer's state name for qlog; `_now` is unused.
    #[cfg(feature = "qlog")]
    fn state_str(&self, _now: Instant) -> &'static str {
        self.pacer.state_str()
    }
1040
    /// Builds a snapshot of the current recovery metrics and passes it to
    /// `QlogMetrics::maybe_update`, returning whatever event data that
    /// yields (presumably only when something changed since the previous
    /// snapshot — TODO confirm).
    #[cfg(feature = "qlog")]
    fn get_updated_qlog_event_data(&mut self) -> Option<EventData> {
        let qlog_metrics = QlogMetrics {
            min_rtt: *self.rtt_stats.min_rtt,
            smoothed_rtt: self.rtt(),
            latest_rtt: self.rtt_stats.latest_rtt(),
            rttvar: self.rtt_stats.rttvar(),
            cwnd: self.cwnd() as u64,
            bytes_in_flight: self.bytes_in_flight.get() as u64,
            ssthresh: self.pacer.ssthresh(),
            // NOTE(review): filled from the bandwidth estimate
            // (`delivery_rate`), not from the pacer's pacing rate — confirm
            // this is intended.
            pacing_rate: self.delivery_rate().to_bytes_per_second(),
        };

        self.qlog_metrics.maybe_update(qlog_metrics)
    }
1056
1057 #[cfg(feature = "qlog")]
1058 fn get_updated_qlog_cc_state(
1059 &mut self, now: Instant,
1060 ) -> Option<&'static str> {
1061 let cc_state = self.state_str(now);
1062 if cc_state != self.qlog_prev_cc_state {
1063 self.qlog_prev_cc_state = cc_state;
1064 Some(cc_state)
1065 } else {
1066 None
1067 }
1068 }
1069
1070 fn send_quantum(&self) -> usize {
1071 let pacing_rate = self
1072 .pacer
1073 .pacing_rate(self.bytes_in_flight.get(), &self.rtt_stats);
1074
1075 let floor = if pacing_rate < Bandwidth::from_kbits_per_second(1200) {
1076 self.max_datagram_size
1077 } else {
1078 2 * self.max_datagram_size
1079 };
1080
1081 pacing_rate
1082 .to_bytes_per_period(ReleaseDecision::EQUAL_THRESHOLD)
1083 .min(64 * 1024)
1084 .max(floor as u64) as usize
1085 }
1086}
1087
1088impl std::fmt::Debug for GRecovery {
1089 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1090 write!(f, "timer={:?} ", self.loss_detection_timer())?;
1091 write!(f, "rtt_stats={:?} ", self.rtt_stats)?;
1092 write!(f, "bytes_in_flight={} ", self.bytes_in_flight.get())?;
1093 write!(f, "{:?} ", self.pacer)?;
1094 Ok(())
1095 }
1096}