1use crate::packet;
2
3use std::collections::VecDeque;
4use std::time::Duration;
5use std::time::Instant;
6
7use smallvec::SmallVec;
8
9#[cfg(feature = "qlog")]
10use qlog::events::EventData;
11
12#[cfg(feature = "qlog")]
13use crate::recovery::QlogMetrics;
14
15use crate::frame;
16
17use crate::recovery::rtt::RttStats;
18use crate::recovery::CongestionControlAlgorithm;
19use crate::recovery::HandshakeStatus;
20use crate::recovery::LossDetectionTimer;
21use crate::recovery::RangeSet;
22use crate::recovery::RecoveryConfig;
23use crate::recovery::RecoveryOps;
24use crate::recovery::ReleaseDecision;
25use crate::recovery::Sent;
26use crate::recovery::GRANULARITY;
27use crate::recovery::INITIAL_PACKET_THRESHOLD;
28use crate::recovery::INITIAL_TIME_THRESHOLD;
29use crate::recovery::MAX_OUTSTANDING_NON_ACK_ELICITING;
30use crate::recovery::MAX_PACKET_THRESHOLD;
31use crate::recovery::MAX_PTO_PROBES_COUNT;
32
33use super::bandwidth::Bandwidth;
34use super::pacer::Pacer;
35use super::Acked;
36use super::Congestion;
37use super::CongestionControl;
38use super::Lost;
39
40const INITIAL_WINDOW_PACKETS: usize = 10;
42
43const MAX_WINDOW_PACKETS: usize = 20_000;
44
45#[derive(Debug)]
46struct SentPacket {
47 pkt_num: u64,
48 status: SentStatus,
49}
50
51#[derive(Debug)]
52enum SentStatus {
53 Sent {
54 time_sent: Instant,
55 ack_eliciting: bool,
56 in_flight: bool,
57 has_data: bool,
58 pmtud: bool,
59 sent_bytes: usize,
60 frames: SmallVec<[frame::Frame; 1]>,
61 },
62 Acked,
63 Lost,
64}
65
66impl SentStatus {
67 fn ack(&mut self) -> Self {
68 std::mem::replace(self, SentStatus::Acked)
69 }
70
71 fn lose(&mut self) -> Self {
72 if !matches!(self, SentStatus::Acked) {
73 std::mem::replace(self, SentStatus::Lost)
74 } else {
75 SentStatus::Acked
76 }
77 }
78}
79
80#[derive(Default)]
81struct RecoveryEpoch {
82 time_of_last_ack_eliciting_packet: Option<Instant>,
84
85 largest_acked_packet: Option<u64>,
88
89 loss_time: Option<Instant>,
92
93 sent_packets: VecDeque<SentPacket>,
96
97 loss_probes: usize,
98 pkts_in_flight: usize,
99
100 acked_frames: Vec<frame::Frame>,
101 lost_frames: Vec<frame::Frame>,
102}
103
104struct AckedDetectionResult {
105 acked_bytes: usize,
106 spurious_losses: usize,
107 spurious_pkt_thresh: Option<u64>,
108 has_ack_eliciting: bool,
109}
110
111struct LossDetectionResult {
112 lost_bytes: usize,
113 lost_packets: usize,
114
115 pmtud_lost_bytes: usize,
116 pmtud_lost_packets: SmallVec<[u64; 1]>,
117}
118
119impl RecoveryEpoch {
120 fn discard(&mut self, cc: &mut impl CongestionControl) -> usize {
123 let unacked_bytes = self
124 .sent_packets
125 .drain(..)
126 .map(|p| {
127 if let SentPacket {
128 status:
129 SentStatus::Sent {
130 in_flight,
131 sent_bytes,
132 ..
133 },
134 pkt_num,
135 } = p
136 {
137 cc.on_packet_neutered(pkt_num);
138 if in_flight {
139 return sent_bytes;
140 }
141 }
142 0
143 })
144 .sum();
145
146 std::mem::take(&mut self.sent_packets);
147 self.time_of_last_ack_eliciting_packet = None;
148 self.loss_time = None;
149 self.loss_probes = 0;
150 self.pkts_in_flight = 0;
151
152 unacked_bytes
153 }
154
155 fn detect_and_remove_acked_packets(
156 &mut self, acked: &RangeSet, newly_acked: &mut Vec<Acked>, trace_id: &str,
157 ) -> AckedDetectionResult {
158 self.largest_acked_packet.replace(
160 self.largest_acked_packet
161 .unwrap_or(0)
162 .max(acked.last().unwrap()),
163 );
164
165 newly_acked.clear();
166
167 let mut acked_bytes = 0;
168 let mut spurious_losses = 0;
169 let mut spurious_pkt_thresh = None;
170 let mut has_ack_eliciting = false;
171
172 let largest_acked = self.largest_acked_packet.unwrap();
173
174 for ack in acked.iter() {
175 let start = if self
178 .sent_packets
179 .front()
180 .filter(|e| e.pkt_num >= ack.start)
181 .is_some()
182 {
183 0
185 } else {
186 self.sent_packets
187 .binary_search_by_key(&ack.start, |p| p.pkt_num)
188 .unwrap_or_else(|e| e)
189 };
190
191 for SentPacket { pkt_num, status } in
192 self.sent_packets.range_mut(start..)
193 {
194 if *pkt_num < ack.end {
195 match status.ack() {
196 SentStatus::Sent {
197 time_sent,
198 in_flight,
199 sent_bytes,
200 frames,
201 ack_eliciting,
202 ..
203 } => {
204 if in_flight {
205 self.pkts_in_flight -= 1;
206 acked_bytes += sent_bytes;
207 }
208 newly_acked.push(Acked {
209 pkt_num: *pkt_num,
210 time_sent,
211 });
212
213 self.acked_frames.extend(frames);
214
215 has_ack_eliciting |= ack_eliciting;
216
217 trace!("{} packet newly acked {}", trace_id, pkt_num);
218 },
219
220 SentStatus::Acked => {},
221 SentStatus::Lost => {
222 spurious_losses += 1;
224 spurious_pkt_thresh
225 .get_or_insert(largest_acked - *pkt_num + 1);
226 },
227 }
228 } else {
229 break;
230 }
231 }
232 }
233
234 self.drain_acked_and_lost_packets();
235
236 AckedDetectionResult {
237 acked_bytes,
238 spurious_losses,
239 spurious_pkt_thresh,
240 has_ack_eliciting,
241 }
242 }
243
244 fn detect_and_remove_lost_packets(
245 &mut self, loss_delay: Duration, pkt_thresh: u64, now: Instant,
246 newly_lost: &mut Vec<Lost>,
247 ) -> LossDetectionResult {
248 newly_lost.clear();
249 let mut lost_bytes = 0;
250 self.loss_time = None;
251
252 let lost_send_time = now.checked_sub(loss_delay).unwrap();
253 let largest_acked = self.largest_acked_packet.unwrap_or(0);
254 let mut pmtud_lost_bytes = 0;
255 let mut pmtud_lost_packets = SmallVec::new();
256
257 for SentPacket { pkt_num, status } in &mut self.sent_packets {
258 if *pkt_num > largest_acked {
259 break;
260 }
261
262 if let SentStatus::Sent { time_sent, .. } = status {
263 if *time_sent <= lost_send_time ||
264 largest_acked >= *pkt_num + pkt_thresh
265 {
266 if let SentStatus::Sent {
267 in_flight,
268 sent_bytes,
269 frames,
270 pmtud,
271 ..
272 } = status.lose()
273 {
274 self.lost_frames.extend(frames);
275
276 if in_flight {
277 self.pkts_in_flight -= 1;
278
279 if pmtud {
280 pmtud_lost_bytes += sent_bytes;
281 pmtud_lost_packets.push(*pkt_num);
282 continue;
284 }
285
286 lost_bytes += sent_bytes;
287 }
288
289 newly_lost.push(Lost {
290 packet_number: *pkt_num,
291 bytes_lost: sent_bytes,
292 });
293 }
294 } else {
295 self.loss_time = Some(*time_sent + loss_delay);
296 break;
297 }
298 }
299 }
300
301 LossDetectionResult {
302 lost_bytes,
303 lost_packets: newly_lost.len(),
304
305 pmtud_lost_bytes,
306 pmtud_lost_packets,
307 }
308 }
309
310 fn drain_acked_and_lost_packets(&mut self) {
314 while let Some(SentPacket {
315 status: SentStatus::Acked | SentStatus::Lost,
316 ..
317 }) = self.sent_packets.front()
318 {
319 self.sent_packets.pop_front();
320 }
321 }
322
323 fn least_unacked(&self) -> u64 {
324 for pkt in self.sent_packets.iter() {
325 if let SentPacket {
326 pkt_num,
327 status: SentStatus::Sent { .. },
328 } = pkt
329 {
330 return *pkt_num;
331 }
332 }
333
334 self.largest_acked_packet.unwrap_or(0) + 1
335 }
336}
337
338pub struct GRecovery {
339 epochs: [RecoveryEpoch; packet::Epoch::count()],
340
341 loss_timer: LossDetectionTimer,
342
343 pto_count: u32,
344
345 rtt_stats: RttStats,
346
347 pub lost_count: usize,
348
349 pub lost_spurious_count: usize,
350
351 pkt_thresh: u64,
352
353 time_thresh: f64,
354
355 bytes_in_flight: usize,
356
357 bytes_sent: usize,
358
359 pub bytes_lost: u64,
360
361 max_datagram_size: usize,
362
363 #[cfg(feature = "qlog")]
364 qlog_metrics: QlogMetrics,
365
366 outstanding_non_ack_eliciting: usize,
368
369 newly_acked: Vec<Acked>,
371
372 lost_reuse: Vec<Lost>,
375
376 pacer: Pacer,
377}
378
379impl GRecovery {
380 pub fn new(recovery_config: &RecoveryConfig) -> Option<Self> {
381 let cc = match recovery_config.cc_algorithm {
382 CongestionControlAlgorithm::Bbr2Gcongestion => Congestion::bbrv2(
383 INITIAL_WINDOW_PACKETS,
384 MAX_WINDOW_PACKETS,
385 recovery_config.max_send_udp_payload_size,
386 ),
387 _ => return None,
388 };
389
390 Some(Self {
391 epochs: Default::default(),
392 rtt_stats: RttStats::new(recovery_config.max_ack_delay),
393 loss_timer: Default::default(),
394 pto_count: 0,
395
396 lost_count: 0,
397 lost_spurious_count: 0,
398
399 pkt_thresh: INITIAL_PACKET_THRESHOLD,
400 time_thresh: INITIAL_TIME_THRESHOLD,
401
402 bytes_in_flight: 0,
403 bytes_sent: 0,
404 bytes_lost: 0,
405
406 max_datagram_size: recovery_config.max_send_udp_payload_size,
407
408 #[cfg(feature = "qlog")]
409 qlog_metrics: QlogMetrics::default(),
410
411 outstanding_non_ack_eliciting: 0,
412
413 pacer: Pacer::new(
414 recovery_config.pacing,
415 cc,
416 recovery_config
417 .max_pacing_rate
418 .map(Bandwidth::from_mbits_per_second),
419 ),
420
421 newly_acked: Vec::new(),
422 lost_reuse: Vec::new(),
423 })
424 }
425
426 fn detect_and_remove_lost_packets(
427 &mut self, epoch: packet::Epoch, now: Instant,
428 ) -> (usize, usize) {
429 let lost = &mut self.lost_reuse;
430
431 let LossDetectionResult {
432 lost_bytes,
433 lost_packets,
434
435 pmtud_lost_bytes,
436 pmtud_lost_packets,
437 } = self.epochs[epoch].detect_and_remove_lost_packets(
438 self.rtt_stats.loss_delay(self.time_thresh),
439 self.pkt_thresh,
440 now,
441 lost,
442 );
443
444 self.bytes_in_flight -= lost_bytes + pmtud_lost_bytes;
445
446 for pkt in pmtud_lost_packets {
447 self.pacer.on_packet_neutered(pkt);
448 }
449
450 (lost_bytes, lost_packets)
451 }
452
453 fn loss_time_and_space(&self) -> (Option<Instant>, packet::Epoch) {
454 let mut epoch = packet::Epoch::Initial;
455 let mut time = self.epochs[epoch].loss_time;
456
457 for e in [packet::Epoch::Handshake, packet::Epoch::Application] {
459 let new_time = self.epochs[e].loss_time;
460 if time.is_none() || new_time < time {
461 time = new_time;
462 epoch = e;
463 }
464 }
465
466 (time, epoch)
467 }
468
469 fn pto_time_and_space(
470 &self, handshake_status: HandshakeStatus, now: Instant,
471 ) -> (Option<Instant>, packet::Epoch) {
472 let mut duration = self.pto() * (1 << self.pto_count);
473
474 if self.bytes_in_flight == 0 {
476 if handshake_status.has_handshake_keys {
477 return (Some(now + duration), packet::Epoch::Handshake);
478 } else {
479 return (Some(now + duration), packet::Epoch::Initial);
480 }
481 }
482
483 let mut pto_timeout = None;
484 let mut pto_space = packet::Epoch::Initial;
485
486 for &e in packet::Epoch::epochs(
488 packet::Epoch::Initial..=packet::Epoch::Application,
489 ) {
490 if self.epochs[e].pkts_in_flight == 0 {
491 continue;
492 }
493
494 if e == packet::Epoch::Application {
495 if !handshake_status.completed {
497 return (pto_timeout, pto_space);
498 }
499
500 duration +=
502 self.rtt_stats.max_ack_delay * 2_u32.pow(self.pto_count);
503 }
504
505 let new_time = self.epochs[e]
506 .time_of_last_ack_eliciting_packet
507 .map(|t| t + duration);
508
509 if pto_timeout.is_none() || new_time < pto_timeout {
510 pto_timeout = new_time;
511 pto_space = e;
512 }
513 }
514
515 (pto_timeout, pto_space)
516 }
517
518 fn set_loss_detection_timer(
519 &mut self, handshake_status: HandshakeStatus, now: Instant,
520 ) {
521 if let (Some(earliest_loss_time), _) = self.loss_time_and_space() {
522 self.loss_timer.update(earliest_loss_time);
524 return;
525 }
526
527 if self.bytes_in_flight == 0 && handshake_status.peer_verified_address {
528 self.loss_timer.clear();
529 return;
530 }
531
532 if let (Some(timeout), _) = self.pto_time_and_space(handshake_status, now)
534 {
535 self.loss_timer.update(timeout);
536 }
537 }
538}
539
540impl RecoveryOps for GRecovery {
541 fn lost_count(&self) -> usize {
542 self.lost_count
543 }
544
545 fn bytes_lost(&self) -> u64 {
546 self.bytes_lost
547 }
548
549 fn should_elicit_ack(&self, epoch: packet::Epoch) -> bool {
550 self.epochs[epoch].loss_probes > 0 ||
551 self.outstanding_non_ack_eliciting >=
552 MAX_OUTSTANDING_NON_ACK_ELICITING
553 }
554
555 fn get_acked_frames(&mut self, epoch: packet::Epoch) -> Vec<frame::Frame> {
556 std::mem::take(&mut self.epochs[epoch].acked_frames)
557 }
558
559 fn get_lost_frames(&mut self, epoch: packet::Epoch) -> Vec<frame::Frame> {
560 std::mem::take(&mut self.epochs[epoch].lost_frames)
561 }
562
563 fn get_largest_acked_on_epoch(&self, epoch: packet::Epoch) -> Option<u64> {
564 self.epochs[epoch].largest_acked_packet
565 }
566
567 fn has_lost_frames(&self, epoch: packet::Epoch) -> bool {
568 !self.epochs[epoch].lost_frames.is_empty()
569 }
570
571 fn loss_probes(&self, epoch: packet::Epoch) -> usize {
572 self.epochs[epoch].loss_probes
573 }
574
575 #[cfg(test)]
576 fn inc_loss_probes(&mut self, epoch: packet::Epoch) {
577 self.epochs[epoch].loss_probes += 1;
578 }
579
580 fn ping_sent(&mut self, epoch: packet::Epoch) {
581 self.epochs[epoch].loss_probes =
582 self.epochs[epoch].loss_probes.saturating_sub(1);
583 }
584
585 fn on_packet_sent(
586 &mut self, pkt: Sent, epoch: packet::Epoch,
587 handshake_status: HandshakeStatus, now: Instant, trace_id: &str,
588 ) {
589 let time_sent = self.get_next_release_time().time(now).unwrap_or(now);
590
591 let epoch = &mut self.epochs[epoch];
592
593 let ack_eliciting = pkt.ack_eliciting;
594 let in_flight = pkt.in_flight;
595 let sent_bytes = pkt.size;
596 let pkt_num = pkt.pkt_num;
597
598 if let Some(SentPacket { pkt_num, .. }) = epoch.sent_packets.back() {
599 assert!(*pkt_num < pkt.pkt_num, "Packet numbers must increase");
600 }
601
602 let status = SentStatus::Sent {
603 time_sent,
604 ack_eliciting,
605 in_flight,
606 pmtud: pkt.pmtud,
607 has_data: pkt.has_data,
608 sent_bytes,
609 frames: pkt.frames,
610 };
611
612 epoch.sent_packets.push_back(SentPacket { pkt_num, status });
613
614 if ack_eliciting {
615 epoch.time_of_last_ack_eliciting_packet = Some(time_sent);
616 self.outstanding_non_ack_eliciting = 0;
617 } else {
618 self.outstanding_non_ack_eliciting += 1;
619 }
620
621 if in_flight {
622 self.pacer.on_packet_sent(
623 time_sent,
624 self.bytes_in_flight,
625 pkt_num,
626 sent_bytes,
627 pkt.has_data,
628 &self.rtt_stats,
629 );
630
631 self.bytes_in_flight += sent_bytes;
632 epoch.pkts_in_flight += 1;
633 self.set_loss_detection_timer(handshake_status, time_sent);
634 }
635
636 self.bytes_sent += sent_bytes;
637
638 trace!("{} {:?}", trace_id, self);
639 }
640
641 fn get_packet_send_time(&self) -> Instant {
642 let now = Instant::now();
643 self.pacer.get_next_release_time().time(now).unwrap_or(now)
644 }
645
646 fn on_ack_received(
647 &mut self, ranges: &RangeSet, ack_delay: u64, epoch: packet::Epoch,
648 handshake_status: HandshakeStatus, now: Instant, trace_id: &str,
649 ) -> (usize, usize, usize) {
650 let prior_in_flight = self.bytes_in_flight;
651
652 let AckedDetectionResult {
653 acked_bytes,
654 spurious_losses,
655 spurious_pkt_thresh,
656 has_ack_eliciting,
657 } = self.epochs[epoch].detect_and_remove_acked_packets(
658 ranges,
659 &mut self.newly_acked,
660 trace_id,
661 );
662
663 self.lost_spurious_count += spurious_losses;
664 if let Some(thresh) = spurious_pkt_thresh {
665 self.pkt_thresh =
666 self.pkt_thresh.max(thresh.min(MAX_PACKET_THRESHOLD));
667 }
668
669 if self.newly_acked.is_empty() {
670 return (0, 0, 0);
671 }
672
673 self.bytes_in_flight -= acked_bytes;
674
675 let largest_newly_acked = self.newly_acked.last().unwrap();
677 let update_rtt: bool = largest_newly_acked.pkt_num ==
678 ranges.last().unwrap() &&
679 has_ack_eliciting;
680 if update_rtt {
681 let latest_rtt = now - largest_newly_acked.time_sent;
682 self.rtt_stats.update_rtt(
683 latest_rtt,
684 Duration::from_micros(ack_delay),
685 now,
686 handshake_status.completed,
687 );
688 }
689
690 let (lost_bytes, lost_packets) =
691 self.detect_and_remove_lost_packets(epoch, now);
692
693 self.pacer.on_congestion_event(
694 update_rtt,
695 prior_in_flight,
696 self.bytes_in_flight,
697 now,
698 &self.newly_acked,
699 &self.lost_reuse,
700 self.epochs[epoch].least_unacked(),
701 &self.rtt_stats,
702 );
703
704 self.pto_count = 0;
705 self.lost_count += lost_packets;
706
707 self.set_loss_detection_timer(handshake_status, now);
708
709 trace!("{} {:?}", trace_id, self);
710
711 (lost_packets, lost_bytes, acked_bytes)
712 }
713
714 fn on_loss_detection_timeout(
715 &mut self, handshake_status: HandshakeStatus, now: Instant,
716 trace_id: &str,
717 ) -> (usize, usize) {
718 let (earliest_loss_time, epoch) = self.loss_time_and_space();
719
720 if earliest_loss_time.is_some() {
721 let prior_in_flight = self.bytes_in_flight;
722
723 let (lost_bytes, lost_packets) =
724 self.detect_and_remove_lost_packets(epoch, now);
725
726 self.pacer.on_congestion_event(
727 false,
728 prior_in_flight,
729 self.bytes_in_flight,
730 now,
731 &[],
732 &self.lost_reuse,
733 self.epochs[epoch].least_unacked(),
734 &self.rtt_stats,
735 );
736
737 self.lost_count += lost_packets;
738
739 self.set_loss_detection_timer(handshake_status, now);
740
741 trace!("{} {:?}", trace_id, self);
742 return (lost_packets, lost_bytes);
743 }
744
745 let epoch = if self.bytes_in_flight > 0 {
746 let (_, e) = self.pto_time_and_space(handshake_status, now);
749
750 e
751 } else {
752 if handshake_status.has_handshake_keys {
756 packet::Epoch::Handshake
757 } else {
758 packet::Epoch::Initial
759 }
760 };
761
762 self.pto_count += 1;
763
764 let epoch = &mut self.epochs[epoch];
765
766 epoch.loss_probes = MAX_PTO_PROBES_COUNT.min(self.pto_count as usize);
767
768 let unacked_frames = epoch
772 .sent_packets
773 .iter_mut()
774 .filter_map(|p| {
775 if let SentStatus::Sent {
776 has_data: true,
777 frames,
778 ..
779 } = &p.status
780 {
781 Some(frames)
782 } else {
783 None
784 }
785 })
786 .take(epoch.loss_probes)
787 .flatten()
788 .filter(|f| !matches!(f, frame::Frame::DatagramHeader { .. }));
789
790 epoch.lost_frames.extend(unacked_frames.cloned());
798
799 self.pacer
800 .on_retransmission_timeout(!epoch.lost_frames.is_empty());
801
802 self.set_loss_detection_timer(handshake_status, now);
803
804 trace!("{} {:?}", trace_id, self);
805 (0, 0)
806 }
807
808 fn on_pkt_num_space_discarded(
809 &mut self, epoch: packet::Epoch, handshake_status: HandshakeStatus,
810 now: Instant,
811 ) {
812 let epoch = &mut self.epochs[epoch];
813 self.bytes_in_flight = self
814 .bytes_in_flight
815 .saturating_sub(epoch.discard(&mut self.pacer));
816 self.set_loss_detection_timer(handshake_status, now);
817 }
818
819 fn on_path_change(
820 &mut self, epoch: packet::Epoch, now: Instant, _trace_id: &str,
821 ) -> (usize, usize) {
822 let (lost_bytes, lost_packets) =
823 self.detect_and_remove_lost_packets(epoch, now);
824
825 (lost_packets, lost_bytes)
826 }
827
828 fn loss_detection_timer(&self) -> Option<Instant> {
829 self.loss_timer.time
830 }
831
832 fn cwnd(&self) -> usize {
833 self.pacer.get_congestion_window()
834 }
835
836 fn cwnd_available(&self) -> usize {
837 if self.epochs.iter().any(|e| e.loss_probes > 0) {
839 return usize::MAX;
840 }
841
842 self.cwnd().saturating_sub(self.bytes_in_flight)
843 }
844
845 fn rtt(&self) -> Duration {
846 self.rtt_stats.rtt()
847 }
848
849 fn min_rtt(&self) -> Option<Duration> {
850 self.rtt_stats.min_rtt()
851 }
852
853 fn rttvar(&self) -> Duration {
854 self.rtt_stats.rttvar()
855 }
856
857 fn pto(&self) -> Duration {
858 let r = &self.rtt_stats;
859 r.rtt() + (r.rttvar() * 4).max(GRANULARITY)
860 }
861
862 fn delivery_rate(&self) -> u64 {
863 self.pacer
864 .bandwidth_estimate(&self.rtt_stats)
865 .to_bits_per_second()
866 }
867
868 fn max_datagram_size(&self) -> usize {
869 self.max_datagram_size
870 }
871
872 fn pmtud_update_max_datagram_size(&mut self, new_max_datagram_size: usize) {
873 self.max_datagram_size = new_max_datagram_size;
874 self.pacer.update_mss(self.max_datagram_size);
875 }
876
877 fn update_max_datagram_size(&mut self, new_max_datagram_size: usize) {
878 self.pmtud_update_max_datagram_size(
879 self.max_datagram_size.min(new_max_datagram_size),
880 )
881 }
882
883 fn on_app_limited(&mut self) {
884 self.pacer.on_app_limited(self.bytes_in_flight)
885 }
886
887 #[cfg(test)]
888 fn sent_packets_len(&self, epoch: packet::Epoch) -> usize {
889 self.epochs[epoch].sent_packets.len()
890 }
891
892 #[cfg(test)]
893 fn in_flight_count(&self, epoch: packet::Epoch) -> usize {
894 self.epochs[epoch].pkts_in_flight
895 }
896
897 #[cfg(test)]
898 fn bytes_in_flight(&self) -> usize {
899 self.bytes_in_flight
900 }
901
902 #[cfg(test)]
903 fn pacing_rate(&self) -> u64 {
904 self.pacer
905 .pacing_rate(self.bytes_in_flight, &self.rtt_stats)
906 .to_bytes_per_period(Duration::from_secs(1))
907 }
908
909 #[cfg(test)]
910 fn pto_count(&self) -> u32 {
911 self.pto_count
912 }
913
914 #[cfg(test)]
915 fn pkt_thresh(&self) -> u64 {
916 self.pkt_thresh
917 }
918
919 #[cfg(test)]
920 fn lost_spurious_count(&self) -> usize {
921 self.lost_spurious_count
922 }
923
924 #[cfg(test)]
925 fn detect_lost_packets_for_test(
926 &mut self, epoch: packet::Epoch, now: Instant,
927 ) -> (usize, usize) {
928 let ret = self.detect_and_remove_lost_packets(epoch, now);
929 self.epochs[epoch].drain_acked_and_lost_packets();
930 ret
931 }
932
933 #[cfg(test)]
934 fn app_limited(&self) -> bool {
935 self.pacer.is_app_limited(self.bytes_in_flight)
936 }
937
938 fn update_app_limited(&mut self, _v: bool) {
939 }
941
942 fn delivery_rate_update_app_limited(&mut self, _v: bool) {
943 }
945
946 fn update_max_ack_delay(&mut self, max_ack_delay: Duration) {
947 self.rtt_stats.max_ack_delay = max_ack_delay;
948 }
949
950 fn get_next_release_time(&self) -> ReleaseDecision {
951 self.pacer.get_next_release_time()
952 }
953
954 fn gcongestion_enabled(&self) -> bool {
955 true
956 }
957
958 #[cfg(feature = "qlog")]
959 fn maybe_qlog(&mut self) -> Option<EventData> {
960 let qlog_metrics = QlogMetrics {
961 min_rtt: self.rtt_stats.min_rtt().unwrap_or(Duration::MAX),
962 smoothed_rtt: self.rtt(),
963 latest_rtt: self.rtt_stats.latest_rtt(),
964 rttvar: self.rtt_stats.rttvar(),
965 cwnd: self.cwnd() as u64,
966 bytes_in_flight: self.bytes_in_flight as u64,
967 ssthresh: self.pacer.ssthresh(),
968 pacing_rate: self.delivery_rate(),
969 };
970
971 self.qlog_metrics.maybe_update(qlog_metrics)
972 }
973
974 fn send_quantum(&self) -> usize {
975 let pacing_rate = self
976 .pacer
977 .pacing_rate(self.bytes_in_flight, &self.rtt_stats);
978
979 let floor = if pacing_rate < Bandwidth::from_kbits_per_second(1200) {
980 self.max_datagram_size
981 } else {
982 2 * self.max_datagram_size
983 };
984
985 pacing_rate
986 .to_bytes_per_period(ReleaseDecision::EQUAL_THRESHOLD)
987 .min(64 * 1024)
988 .max(floor as u64) as usize
989 }
990}
991
992impl std::fmt::Debug for GRecovery {
993 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
994 write!(f, "timer={:?} ", self.loss_detection_timer())?;
995 write!(f, "rtt_stats={:?} ", self.rtt_stats)?;
996 write!(f, "bytes_in_flight={} ", self.bytes_in_flight)?;
997 write!(f, "{:?} ", self.pacer)?;
998 Ok(())
999 }
1000}