quiche/recovery/gcongestion/
recovery.rs

1use crate::packet;
2
3use std::collections::VecDeque;
4use std::time::Duration;
5use std::time::Instant;
6
7use smallvec::SmallVec;
8
9#[cfg(feature = "qlog")]
10use qlog::events::EventData;
11
12#[cfg(feature = "qlog")]
13use crate::recovery::QlogMetrics;
14
15use crate::frame;
16
17use crate::recovery::rtt::RttStats;
18use crate::recovery::CongestionControlAlgorithm;
19use crate::recovery::HandshakeStatus;
20use crate::recovery::LossDetectionTimer;
21use crate::recovery::RangeSet;
22use crate::recovery::RecoveryConfig;
23use crate::recovery::RecoveryOps;
24use crate::recovery::ReleaseDecision;
25use crate::recovery::Sent;
26use crate::recovery::GRANULARITY;
27use crate::recovery::INITIAL_PACKET_THRESHOLD;
28use crate::recovery::INITIAL_TIME_THRESHOLD;
29use crate::recovery::MAX_OUTSTANDING_NON_ACK_ELICITING;
30use crate::recovery::MAX_PACKET_THRESHOLD;
31use crate::recovery::MAX_PTO_PROBES_COUNT;
32
33use super::bandwidth::Bandwidth;
34use super::pacer::Pacer;
35use super::Acked;
36use super::Congestion;
37use super::CongestionControl;
38use super::Lost;
39
// Congestion control tuning.

/// First window argument handed to `Congestion::bbrv2` by `GRecovery::new`;
/// presumably the initial congestion window, counted in packets.
const INITIAL_WINDOW_PACKETS: usize = 10;

/// Second window argument handed to `Congestion::bbrv2` by `GRecovery::new`;
/// presumably the upper bound of the congestion window, counted in packets.
const MAX_WINDOW_PACKETS: usize = 20_000;
44
45#[derive(Debug)]
46struct SentPacket {
47    pkt_num: u64,
48    status: SentStatus,
49}
50
51#[derive(Debug)]
52enum SentStatus {
53    Sent {
54        time_sent: Instant,
55        ack_eliciting: bool,
56        in_flight: bool,
57        has_data: bool,
58        pmtud: bool,
59        sent_bytes: usize,
60        frames: SmallVec<[frame::Frame; 1]>,
61    },
62    Acked,
63    Lost,
64}
65
66impl SentStatus {
67    fn ack(&mut self) -> Self {
68        std::mem::replace(self, SentStatus::Acked)
69    }
70
71    fn lose(&mut self) -> Self {
72        if !matches!(self, SentStatus::Acked) {
73            std::mem::replace(self, SentStatus::Lost)
74        } else {
75            SentStatus::Acked
76        }
77    }
78}
79
80#[derive(Default)]
81struct RecoveryEpoch {
82    /// The time the most recent ack-eliciting packet was sent.
83    time_of_last_ack_eliciting_packet: Option<Instant>,
84
85    /// The largest packet number acknowledged in the packet number space so
86    /// far.
87    largest_acked_packet: Option<u64>,
88
89    /// The time at which the next packet in that packet number space can be
90    /// considered lost based on exceeding the reordering window in time.
91    loss_time: Option<Instant>,
92
93    /// An association of packet numbers in a packet number space to information
94    /// about them.
95    sent_packets: VecDeque<SentPacket>,
96
97    loss_probes: usize,
98    pkts_in_flight: usize,
99
100    acked_frames: Vec<frame::Frame>,
101    lost_frames: Vec<frame::Frame>,
102}
103
/// Summary of one pass of
/// `RecoveryEpoch::detect_and_remove_acked_packets`.
struct AckedDetectionResult {
    /// Total size of the newly acknowledged packets that were in flight.
    acked_bytes: usize,

    /// Number of acknowledged packets that had previously been declared lost.
    spurious_losses: usize,

    /// For the first spurious loss seen in the pass: distance from the largest
    /// acknowledged packet number to that packet, plus one.
    spurious_pkt_thresh: Option<u64>,

    /// Whether at least one newly acknowledged packet was ack-eliciting.
    has_ack_eliciting: bool,
}
110
111struct LossDetectionResult {
112    lost_bytes: usize,
113    lost_packets: usize,
114
115    pmtud_lost_bytes: usize,
116    pmtud_lost_packets: SmallVec<[u64; 1]>,
117}
118
119impl RecoveryEpoch {
120    /// Discard the Epoch state and return the total size of unacked packets
121    /// that were discarded
122    fn discard(&mut self, cc: &mut impl CongestionControl) -> usize {
123        let unacked_bytes = self
124            .sent_packets
125            .drain(..)
126            .map(|p| {
127                if let SentPacket {
128                    status:
129                        SentStatus::Sent {
130                            in_flight,
131                            sent_bytes,
132                            ..
133                        },
134                    pkt_num,
135                } = p
136                {
137                    cc.on_packet_neutered(pkt_num);
138                    if in_flight {
139                        return sent_bytes;
140                    }
141                }
142                0
143            })
144            .sum();
145
146        std::mem::take(&mut self.sent_packets);
147        self.time_of_last_ack_eliciting_packet = None;
148        self.loss_time = None;
149        self.loss_probes = 0;
150        self.pkts_in_flight = 0;
151
152        unacked_bytes
153    }
154
155    fn detect_and_remove_acked_packets(
156        &mut self, acked: &RangeSet, newly_acked: &mut Vec<Acked>, trace_id: &str,
157    ) -> AckedDetectionResult {
158        // Update the largest acked packet
159        self.largest_acked_packet.replace(
160            self.largest_acked_packet
161                .unwrap_or(0)
162                .max(acked.last().unwrap()),
163        );
164
165        newly_acked.clear();
166
167        let mut acked_bytes = 0;
168        let mut spurious_losses = 0;
169        let mut spurious_pkt_thresh = None;
170        let mut has_ack_eliciting = false;
171
172        let largest_acked = self.largest_acked_packet.unwrap();
173
174        for ack in acked.iter() {
175            // Because packets always have incrementing numbers, they are always
176            // in sorted order.
177            let start = if self
178                .sent_packets
179                .front()
180                .filter(|e| e.pkt_num >= ack.start)
181                .is_some()
182            {
183                // Usually it will be the first packet.
184                0
185            } else {
186                self.sent_packets
187                    .binary_search_by_key(&ack.start, |p| p.pkt_num)
188                    .unwrap_or_else(|e| e)
189            };
190
191            for SentPacket { pkt_num, status } in
192                self.sent_packets.range_mut(start..)
193            {
194                if *pkt_num < ack.end {
195                    match status.ack() {
196                        SentStatus::Sent {
197                            time_sent,
198                            in_flight,
199                            sent_bytes,
200                            frames,
201                            ack_eliciting,
202                            ..
203                        } => {
204                            if in_flight {
205                                self.pkts_in_flight -= 1;
206                                acked_bytes += sent_bytes;
207                            }
208                            newly_acked.push(Acked {
209                                pkt_num: *pkt_num,
210                                time_sent,
211                            });
212
213                            self.acked_frames.extend(frames);
214
215                            has_ack_eliciting |= ack_eliciting;
216
217                            trace!("{} packet newly acked {}", trace_id, pkt_num);
218                        },
219
220                        SentStatus::Acked => {},
221                        SentStatus::Lost => {
222                            // An acked packet was already declared lost
223                            spurious_losses += 1;
224                            spurious_pkt_thresh
225                                .get_or_insert(largest_acked - *pkt_num + 1);
226                        },
227                    }
228                } else {
229                    break;
230                }
231            }
232        }
233
234        self.drain_acked_and_lost_packets();
235
236        AckedDetectionResult {
237            acked_bytes,
238            spurious_losses,
239            spurious_pkt_thresh,
240            has_ack_eliciting,
241        }
242    }
243
244    fn detect_and_remove_lost_packets(
245        &mut self, loss_delay: Duration, pkt_thresh: u64, now: Instant,
246        newly_lost: &mut Vec<Lost>,
247    ) -> LossDetectionResult {
248        newly_lost.clear();
249        let mut lost_bytes = 0;
250        self.loss_time = None;
251
252        let lost_send_time = now.checked_sub(loss_delay).unwrap();
253        let largest_acked = self.largest_acked_packet.unwrap_or(0);
254        let mut pmtud_lost_bytes = 0;
255        let mut pmtud_lost_packets = SmallVec::new();
256
257        for SentPacket { pkt_num, status } in &mut self.sent_packets {
258            if *pkt_num > largest_acked {
259                break;
260            }
261
262            if let SentStatus::Sent { time_sent, .. } = status {
263                if *time_sent <= lost_send_time ||
264                    largest_acked >= *pkt_num + pkt_thresh
265                {
266                    if let SentStatus::Sent {
267                        in_flight,
268                        sent_bytes,
269                        frames,
270                        pmtud,
271                        ..
272                    } = status.lose()
273                    {
274                        self.lost_frames.extend(frames);
275
276                        if in_flight {
277                            self.pkts_in_flight -= 1;
278
279                            if pmtud {
280                                pmtud_lost_bytes += sent_bytes;
281                                pmtud_lost_packets.push(*pkt_num);
282                                // Do not track PMTUD probes losses
283                                continue;
284                            }
285
286                            lost_bytes += sent_bytes;
287                        }
288
289                        newly_lost.push(Lost {
290                            packet_number: *pkt_num,
291                            bytes_lost: sent_bytes,
292                        });
293                    }
294                } else {
295                    self.loss_time = Some(*time_sent + loss_delay);
296                    break;
297                }
298            }
299        }
300
301        LossDetectionResult {
302            lost_bytes,
303            lost_packets: newly_lost.len(),
304
305            pmtud_lost_bytes,
306            pmtud_lost_packets,
307        }
308    }
309
310    /// Remove packets that were already handled from the front of the queue,
311    /// but avoid removing packets from the middle of the queue to avoid
312    /// compaction
313    fn drain_acked_and_lost_packets(&mut self) {
314        while let Some(SentPacket {
315            status: SentStatus::Acked | SentStatus::Lost,
316            ..
317        }) = self.sent_packets.front()
318        {
319            self.sent_packets.pop_front();
320        }
321    }
322
323    fn least_unacked(&self) -> u64 {
324        for pkt in self.sent_packets.iter() {
325            if let SentPacket {
326                pkt_num,
327                status: SentStatus::Sent { .. },
328            } = pkt
329            {
330                return *pkt_num;
331            }
332        }
333
334        self.largest_acked_packet.unwrap_or(0) + 1
335    }
336}
337
338pub struct GRecovery {
339    epochs: [RecoveryEpoch; packet::Epoch::count()],
340
341    loss_timer: LossDetectionTimer,
342
343    pto_count: u32,
344
345    rtt_stats: RttStats,
346
347    pub lost_count: usize,
348
349    pub lost_spurious_count: usize,
350
351    pkt_thresh: u64,
352
353    time_thresh: f64,
354
355    bytes_in_flight: usize,
356
357    bytes_sent: usize,
358
359    pub bytes_lost: u64,
360
361    max_datagram_size: usize,
362
363    #[cfg(feature = "qlog")]
364    qlog_metrics: QlogMetrics,
365
366    /// How many non-ack-eliciting packets have been sent.
367    outstanding_non_ack_eliciting: usize,
368
369    /// A resusable list of acks.
370    newly_acked: Vec<Acked>,
371
372    /// A [`Vec`] that can be reused for calls of
373    /// [`detect_and_remove_lost_packets`] to avoid allocations
374    lost_reuse: Vec<Lost>,
375
376    pacer: Pacer,
377}
378
379impl GRecovery {
380    pub fn new(recovery_config: &RecoveryConfig) -> Option<Self> {
381        let cc = match recovery_config.cc_algorithm {
382            CongestionControlAlgorithm::Bbr2Gcongestion => Congestion::bbrv2(
383                INITIAL_WINDOW_PACKETS,
384                MAX_WINDOW_PACKETS,
385                recovery_config.max_send_udp_payload_size,
386            ),
387            _ => return None,
388        };
389
390        Some(Self {
391            epochs: Default::default(),
392            rtt_stats: RttStats::new(recovery_config.max_ack_delay),
393            loss_timer: Default::default(),
394            pto_count: 0,
395
396            lost_count: 0,
397            lost_spurious_count: 0,
398
399            pkt_thresh: INITIAL_PACKET_THRESHOLD,
400            time_thresh: INITIAL_TIME_THRESHOLD,
401
402            bytes_in_flight: 0,
403            bytes_sent: 0,
404            bytes_lost: 0,
405
406            max_datagram_size: recovery_config.max_send_udp_payload_size,
407
408            #[cfg(feature = "qlog")]
409            qlog_metrics: QlogMetrics::default(),
410
411            outstanding_non_ack_eliciting: 0,
412
413            pacer: Pacer::new(
414                recovery_config.pacing,
415                cc,
416                recovery_config
417                    .max_pacing_rate
418                    .map(Bandwidth::from_mbits_per_second),
419            ),
420
421            newly_acked: Vec::new(),
422            lost_reuse: Vec::new(),
423        })
424    }
425
426    fn detect_and_remove_lost_packets(
427        &mut self, epoch: packet::Epoch, now: Instant,
428    ) -> (usize, usize) {
429        let lost = &mut self.lost_reuse;
430
431        let LossDetectionResult {
432            lost_bytes,
433            lost_packets,
434
435            pmtud_lost_bytes,
436            pmtud_lost_packets,
437        } = self.epochs[epoch].detect_and_remove_lost_packets(
438            self.rtt_stats.loss_delay(self.time_thresh),
439            self.pkt_thresh,
440            now,
441            lost,
442        );
443
444        self.bytes_in_flight -= lost_bytes + pmtud_lost_bytes;
445
446        for pkt in pmtud_lost_packets {
447            self.pacer.on_packet_neutered(pkt);
448        }
449
450        (lost_bytes, lost_packets)
451    }
452
453    fn loss_time_and_space(&self) -> (Option<Instant>, packet::Epoch) {
454        let mut epoch = packet::Epoch::Initial;
455        let mut time = self.epochs[epoch].loss_time;
456
457        // Iterate over all packet number spaces starting from Handshake.
458        for e in [packet::Epoch::Handshake, packet::Epoch::Application] {
459            let new_time = self.epochs[e].loss_time;
460            if time.is_none() || new_time < time {
461                time = new_time;
462                epoch = e;
463            }
464        }
465
466        (time, epoch)
467    }
468
469    fn pto_time_and_space(
470        &self, handshake_status: HandshakeStatus, now: Instant,
471    ) -> (Option<Instant>, packet::Epoch) {
472        let mut duration = self.pto() * (1 << self.pto_count);
473
474        // Arm PTO from now when there are no inflight packets.
475        if self.bytes_in_flight == 0 {
476            if handshake_status.has_handshake_keys {
477                return (Some(now + duration), packet::Epoch::Handshake);
478            } else {
479                return (Some(now + duration), packet::Epoch::Initial);
480            }
481        }
482
483        let mut pto_timeout = None;
484        let mut pto_space = packet::Epoch::Initial;
485
486        // Iterate over all packet number spaces.
487        for &e in packet::Epoch::epochs(
488            packet::Epoch::Initial..=packet::Epoch::Application,
489        ) {
490            if self.epochs[e].pkts_in_flight == 0 {
491                continue;
492            }
493
494            if e == packet::Epoch::Application {
495                // Skip Application Data until handshake completes.
496                if !handshake_status.completed {
497                    return (pto_timeout, pto_space);
498                }
499
500                // Include max_ack_delay and backoff for Application Data.
501                duration +=
502                    self.rtt_stats.max_ack_delay * 2_u32.pow(self.pto_count);
503            }
504
505            let new_time = self.epochs[e]
506                .time_of_last_ack_eliciting_packet
507                .map(|t| t + duration);
508
509            if pto_timeout.is_none() || new_time < pto_timeout {
510                pto_timeout = new_time;
511                pto_space = e;
512            }
513        }
514
515        (pto_timeout, pto_space)
516    }
517
518    fn set_loss_detection_timer(
519        &mut self, handshake_status: HandshakeStatus, now: Instant,
520    ) {
521        if let (Some(earliest_loss_time), _) = self.loss_time_and_space() {
522            // Time threshold loss detection.
523            self.loss_timer.update(earliest_loss_time);
524            return;
525        }
526
527        if self.bytes_in_flight == 0 && handshake_status.peer_verified_address {
528            self.loss_timer.clear();
529            return;
530        }
531
532        // PTO timer.
533        if let (Some(timeout), _) = self.pto_time_and_space(handshake_status, now)
534        {
535            self.loss_timer.update(timeout);
536        }
537    }
538}
539
540impl RecoveryOps for GRecovery {
541    fn lost_count(&self) -> usize {
542        self.lost_count
543    }
544
545    fn bytes_lost(&self) -> u64 {
546        self.bytes_lost
547    }
548
549    fn should_elicit_ack(&self, epoch: packet::Epoch) -> bool {
550        self.epochs[epoch].loss_probes > 0 ||
551            self.outstanding_non_ack_eliciting >=
552                MAX_OUTSTANDING_NON_ACK_ELICITING
553    }
554
555    fn get_acked_frames(&mut self, epoch: packet::Epoch) -> Vec<frame::Frame> {
556        std::mem::take(&mut self.epochs[epoch].acked_frames)
557    }
558
559    fn get_lost_frames(&mut self, epoch: packet::Epoch) -> Vec<frame::Frame> {
560        std::mem::take(&mut self.epochs[epoch].lost_frames)
561    }
562
563    fn get_largest_acked_on_epoch(&self, epoch: packet::Epoch) -> Option<u64> {
564        self.epochs[epoch].largest_acked_packet
565    }
566
567    fn has_lost_frames(&self, epoch: packet::Epoch) -> bool {
568        !self.epochs[epoch].lost_frames.is_empty()
569    }
570
571    fn loss_probes(&self, epoch: packet::Epoch) -> usize {
572        self.epochs[epoch].loss_probes
573    }
574
575    #[cfg(test)]
576    fn inc_loss_probes(&mut self, epoch: packet::Epoch) {
577        self.epochs[epoch].loss_probes += 1;
578    }
579
580    fn ping_sent(&mut self, epoch: packet::Epoch) {
581        self.epochs[epoch].loss_probes =
582            self.epochs[epoch].loss_probes.saturating_sub(1);
583    }
584
585    fn on_packet_sent(
586        &mut self, pkt: Sent, epoch: packet::Epoch,
587        handshake_status: HandshakeStatus, now: Instant, trace_id: &str,
588    ) {
589        let time_sent = self.get_next_release_time().time(now).unwrap_or(now);
590
591        let epoch = &mut self.epochs[epoch];
592
593        let ack_eliciting = pkt.ack_eliciting;
594        let in_flight = pkt.in_flight;
595        let sent_bytes = pkt.size;
596        let pkt_num = pkt.pkt_num;
597
598        if let Some(SentPacket { pkt_num, .. }) = epoch.sent_packets.back() {
599            assert!(*pkt_num < pkt.pkt_num, "Packet numbers must increase");
600        }
601
602        let status = SentStatus::Sent {
603            time_sent,
604            ack_eliciting,
605            in_flight,
606            pmtud: pkt.pmtud,
607            has_data: pkt.has_data,
608            sent_bytes,
609            frames: pkt.frames,
610        };
611
612        epoch.sent_packets.push_back(SentPacket { pkt_num, status });
613
614        if ack_eliciting {
615            epoch.time_of_last_ack_eliciting_packet = Some(time_sent);
616            self.outstanding_non_ack_eliciting = 0;
617        } else {
618            self.outstanding_non_ack_eliciting += 1;
619        }
620
621        if in_flight {
622            self.pacer.on_packet_sent(
623                time_sent,
624                self.bytes_in_flight,
625                pkt_num,
626                sent_bytes,
627                pkt.has_data,
628                &self.rtt_stats,
629            );
630
631            self.bytes_in_flight += sent_bytes;
632            epoch.pkts_in_flight += 1;
633            self.set_loss_detection_timer(handshake_status, time_sent);
634        }
635
636        self.bytes_sent += sent_bytes;
637
638        trace!("{} {:?}", trace_id, self);
639    }
640
641    fn get_packet_send_time(&self) -> Instant {
642        let now = Instant::now();
643        self.pacer.get_next_release_time().time(now).unwrap_or(now)
644    }
645
646    fn on_ack_received(
647        &mut self, ranges: &RangeSet, ack_delay: u64, epoch: packet::Epoch,
648        handshake_status: HandshakeStatus, now: Instant, trace_id: &str,
649    ) -> (usize, usize, usize) {
650        let prior_in_flight = self.bytes_in_flight;
651
652        let AckedDetectionResult {
653            acked_bytes,
654            spurious_losses,
655            spurious_pkt_thresh,
656            has_ack_eliciting,
657        } = self.epochs[epoch].detect_and_remove_acked_packets(
658            ranges,
659            &mut self.newly_acked,
660            trace_id,
661        );
662
663        self.lost_spurious_count += spurious_losses;
664        if let Some(thresh) = spurious_pkt_thresh {
665            self.pkt_thresh =
666                self.pkt_thresh.max(thresh.min(MAX_PACKET_THRESHOLD));
667        }
668
669        if self.newly_acked.is_empty() {
670            return (0, 0, 0);
671        }
672
673        self.bytes_in_flight -= acked_bytes;
674
675        // Check if largest packet is newly acked.
676        let largest_newly_acked = self.newly_acked.last().unwrap();
677        let update_rtt: bool = largest_newly_acked.pkt_num ==
678            ranges.last().unwrap() &&
679            has_ack_eliciting;
680        if update_rtt {
681            let latest_rtt = now - largest_newly_acked.time_sent;
682            self.rtt_stats.update_rtt(
683                latest_rtt,
684                Duration::from_micros(ack_delay),
685                now,
686                handshake_status.completed,
687            );
688        }
689
690        let (lost_bytes, lost_packets) =
691            self.detect_and_remove_lost_packets(epoch, now);
692
693        self.pacer.on_congestion_event(
694            update_rtt,
695            prior_in_flight,
696            self.bytes_in_flight,
697            now,
698            &self.newly_acked,
699            &self.lost_reuse,
700            self.epochs[epoch].least_unacked(),
701            &self.rtt_stats,
702        );
703
704        self.pto_count = 0;
705        self.lost_count += lost_packets;
706
707        self.set_loss_detection_timer(handshake_status, now);
708
709        trace!("{} {:?}", trace_id, self);
710
711        (lost_packets, lost_bytes, acked_bytes)
712    }
713
714    fn on_loss_detection_timeout(
715        &mut self, handshake_status: HandshakeStatus, now: Instant,
716        trace_id: &str,
717    ) -> (usize, usize) {
718        let (earliest_loss_time, epoch) = self.loss_time_and_space();
719
720        if earliest_loss_time.is_some() {
721            let prior_in_flight = self.bytes_in_flight;
722
723            let (lost_bytes, lost_packets) =
724                self.detect_and_remove_lost_packets(epoch, now);
725
726            self.pacer.on_congestion_event(
727                false,
728                prior_in_flight,
729                self.bytes_in_flight,
730                now,
731                &[],
732                &self.lost_reuse,
733                self.epochs[epoch].least_unacked(),
734                &self.rtt_stats,
735            );
736
737            self.lost_count += lost_packets;
738
739            self.set_loss_detection_timer(handshake_status, now);
740
741            trace!("{} {:?}", trace_id, self);
742            return (lost_packets, lost_bytes);
743        }
744
745        let epoch = if self.bytes_in_flight > 0 {
746            // Send new data if available, else retransmit old data. If neither
747            // is available, send a single PING frame.
748            let (_, e) = self.pto_time_and_space(handshake_status, now);
749
750            e
751        } else {
752            // Client sends an anti-deadlock packet: Initial is padded to earn
753            // more anti-amplification credit, a Handshake packet proves address
754            // ownership.
755            if handshake_status.has_handshake_keys {
756                packet::Epoch::Handshake
757            } else {
758                packet::Epoch::Initial
759            }
760        };
761
762        self.pto_count += 1;
763
764        let epoch = &mut self.epochs[epoch];
765
766        epoch.loss_probes = MAX_PTO_PROBES_COUNT.min(self.pto_count as usize);
767
768        // Skip packets that have already been acked or lost, and packets
769        // that don't contain either CRYPTO or STREAM frames and only return as
770        // many packets as the number of probe packets that will be sent.
771        let unacked_frames = epoch
772            .sent_packets
773            .iter_mut()
774            .filter_map(|p| {
775                if let SentStatus::Sent {
776                    has_data: true,
777                    frames,
778                    ..
779                } = &p.status
780                {
781                    Some(frames)
782                } else {
783                    None
784                }
785            })
786            .take(epoch.loss_probes)
787            .flatten()
788            .filter(|f| !matches!(f, frame::Frame::DatagramHeader { .. }));
789
790        // Retransmit the frames from the oldest sent packets on PTO. However
791        // the packets are not actually declared lost (so there is no effect to
792        // congestion control), we just reschedule the data they carried.
793        //
794        // This will also trigger sending an ACK and retransmitting frames like
795        // HANDSHAKE_DONE and MAX_DATA / MAX_STREAM_DATA as well, in addition
796        // to CRYPTO and STREAM, if the original packet carried them.
797        epoch.lost_frames.extend(unacked_frames.cloned());
798
799        self.pacer
800            .on_retransmission_timeout(!epoch.lost_frames.is_empty());
801
802        self.set_loss_detection_timer(handshake_status, now);
803
804        trace!("{} {:?}", trace_id, self);
805        (0, 0)
806    }
807
808    fn on_pkt_num_space_discarded(
809        &mut self, epoch: packet::Epoch, handshake_status: HandshakeStatus,
810        now: Instant,
811    ) {
812        let epoch = &mut self.epochs[epoch];
813        self.bytes_in_flight = self
814            .bytes_in_flight
815            .saturating_sub(epoch.discard(&mut self.pacer));
816        self.set_loss_detection_timer(handshake_status, now);
817    }
818
819    fn on_path_change(
820        &mut self, epoch: packet::Epoch, now: Instant, _trace_id: &str,
821    ) -> (usize, usize) {
822        let (lost_bytes, lost_packets) =
823            self.detect_and_remove_lost_packets(epoch, now);
824
825        (lost_packets, lost_bytes)
826    }
827
828    fn loss_detection_timer(&self) -> Option<Instant> {
829        self.loss_timer.time
830    }
831
832    fn cwnd(&self) -> usize {
833        self.pacer.get_congestion_window()
834    }
835
836    fn cwnd_available(&self) -> usize {
837        // Ignore cwnd when sending probe packets.
838        if self.epochs.iter().any(|e| e.loss_probes > 0) {
839            return usize::MAX;
840        }
841
842        self.cwnd().saturating_sub(self.bytes_in_flight)
843    }
844
845    fn rtt(&self) -> Duration {
846        self.rtt_stats.rtt()
847    }
848
849    fn min_rtt(&self) -> Option<Duration> {
850        self.rtt_stats.min_rtt()
851    }
852
853    fn rttvar(&self) -> Duration {
854        self.rtt_stats.rttvar()
855    }
856
857    fn pto(&self) -> Duration {
858        let r = &self.rtt_stats;
859        r.rtt() + (r.rttvar() * 4).max(GRANULARITY)
860    }
861
862    fn delivery_rate(&self) -> u64 {
863        self.pacer
864            .bandwidth_estimate(&self.rtt_stats)
865            .to_bits_per_second()
866    }
867
868    fn max_datagram_size(&self) -> usize {
869        self.max_datagram_size
870    }
871
872    fn pmtud_update_max_datagram_size(&mut self, new_max_datagram_size: usize) {
873        self.max_datagram_size = new_max_datagram_size;
874        self.pacer.update_mss(self.max_datagram_size);
875    }
876
877    fn update_max_datagram_size(&mut self, new_max_datagram_size: usize) {
878        self.pmtud_update_max_datagram_size(
879            self.max_datagram_size.min(new_max_datagram_size),
880        )
881    }
882
883    fn on_app_limited(&mut self) {
884        self.pacer.on_app_limited(self.bytes_in_flight)
885    }
886
887    #[cfg(test)]
888    fn sent_packets_len(&self, epoch: packet::Epoch) -> usize {
889        self.epochs[epoch].sent_packets.len()
890    }
891
892    #[cfg(test)]
893    fn in_flight_count(&self, epoch: packet::Epoch) -> usize {
894        self.epochs[epoch].pkts_in_flight
895    }
896
897    #[cfg(test)]
898    fn bytes_in_flight(&self) -> usize {
899        self.bytes_in_flight
900    }
901
902    #[cfg(test)]
903    fn pacing_rate(&self) -> u64 {
904        self.pacer
905            .pacing_rate(self.bytes_in_flight, &self.rtt_stats)
906            .to_bytes_per_period(Duration::from_secs(1))
907    }
908
909    #[cfg(test)]
910    fn pto_count(&self) -> u32 {
911        self.pto_count
912    }
913
914    #[cfg(test)]
915    fn pkt_thresh(&self) -> u64 {
916        self.pkt_thresh
917    }
918
919    #[cfg(test)]
920    fn lost_spurious_count(&self) -> usize {
921        self.lost_spurious_count
922    }
923
924    #[cfg(test)]
925    fn detect_lost_packets_for_test(
926        &mut self, epoch: packet::Epoch, now: Instant,
927    ) -> (usize, usize) {
928        let ret = self.detect_and_remove_lost_packets(epoch, now);
929        self.epochs[epoch].drain_acked_and_lost_packets();
930        ret
931    }
932
933    #[cfg(test)]
934    fn app_limited(&self) -> bool {
935        self.pacer.is_app_limited(self.bytes_in_flight)
936    }
937
938    fn update_app_limited(&mut self, _v: bool) {
939        // TODO
940    }
941
942    fn delivery_rate_update_app_limited(&mut self, _v: bool) {
943        // TODO
944    }
945
946    fn update_max_ack_delay(&mut self, max_ack_delay: Duration) {
947        self.rtt_stats.max_ack_delay = max_ack_delay;
948    }
949
950    fn get_next_release_time(&self) -> ReleaseDecision {
951        self.pacer.get_next_release_time()
952    }
953
954    fn gcongestion_enabled(&self) -> bool {
955        true
956    }
957
958    #[cfg(feature = "qlog")]
959    fn maybe_qlog(&mut self) -> Option<EventData> {
960        let qlog_metrics = QlogMetrics {
961            min_rtt: self.rtt_stats.min_rtt().unwrap_or(Duration::MAX),
962            smoothed_rtt: self.rtt(),
963            latest_rtt: self.rtt_stats.latest_rtt(),
964            rttvar: self.rtt_stats.rttvar(),
965            cwnd: self.cwnd() as u64,
966            bytes_in_flight: self.bytes_in_flight as u64,
967            ssthresh: self.pacer.ssthresh(),
968            pacing_rate: self.delivery_rate(),
969        };
970
971        self.qlog_metrics.maybe_update(qlog_metrics)
972    }
973
974    fn send_quantum(&self) -> usize {
975        let pacing_rate = self
976            .pacer
977            .pacing_rate(self.bytes_in_flight, &self.rtt_stats);
978
979        let floor = if pacing_rate < Bandwidth::from_kbits_per_second(1200) {
980            self.max_datagram_size
981        } else {
982            2 * self.max_datagram_size
983        };
984
985        pacing_rate
986            .to_bytes_per_period(ReleaseDecision::EQUAL_THRESHOLD)
987            .min(64 * 1024)
988            .max(floor as u64) as usize
989    }
990}
991
992impl std::fmt::Debug for GRecovery {
993    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
994        write!(f, "timer={:?} ", self.loss_detection_timer())?;
995        write!(f, "rtt_stats={:?} ", self.rtt_stats)?;
996        write!(f, "bytes_in_flight={} ", self.bytes_in_flight)?;
997        write!(f, "{:?} ", self.pacer)?;
998        Ok(())
999    }
1000}