Skip to main content

quiche/recovery/gcongestion/
recovery.rs

1use crate::packet;
2use crate::recovery::OnLossDetectionTimeoutOutcome;
3use crate::recovery::INITIAL_TIME_THRESHOLD_OVERHEAD;
4use crate::recovery::TIME_THRESHOLD_OVERHEAD_MULTIPLIER;
5use crate::Error;
6use crate::Result;
7
8use std::collections::VecDeque;
9use std::time::Duration;
10use std::time::Instant;
11
12use smallvec::SmallVec;
13
14#[cfg(feature = "qlog")]
15use qlog::events::EventData;
16
17#[cfg(feature = "qlog")]
18use crate::recovery::QlogMetrics;
19
20use crate::frame;
21
22use crate::recovery::bytes_in_flight::BytesInFlight;
23use crate::recovery::gcongestion::Bandwidth;
24use crate::recovery::rtt::RttStats;
25use crate::recovery::CongestionControlAlgorithm;
26use crate::recovery::HandshakeStatus;
27use crate::recovery::LossDetectionTimer;
28use crate::recovery::OnAckReceivedOutcome;
29use crate::recovery::RangeSet;
30use crate::recovery::RecoveryConfig;
31use crate::recovery::RecoveryOps;
32use crate::recovery::RecoveryStats;
33use crate::recovery::ReleaseDecision;
34use crate::recovery::Sent;
35use crate::recovery::StartupExit;
36use crate::recovery::GRANULARITY;
37use crate::recovery::INITIAL_PACKET_THRESHOLD;
38use crate::recovery::INITIAL_TIME_THRESHOLD;
39use crate::recovery::MAX_OUTSTANDING_NON_ACK_ELICITING;
40use crate::recovery::MAX_PACKET_THRESHOLD;
41use crate::recovery::MAX_PTO_PROBES_COUNT;
42use crate::recovery::PACKET_REORDER_TIME_THRESHOLD;
43
44use super::bbr2::BBRv2;
45use super::pacer::Pacer;
46use super::Acked;
47use super::Lost;
48
// Congestion Control

/// Upper bound on the congestion window handed to [`BBRv2::new`] by
/// `GRecovery::new`. The `_PACKETS` suffix indicates the unit is packets
/// rather than bytes.
const MAX_WINDOW_PACKETS: usize = 20_000;
51
/// A packet tracked by a [`RecoveryEpoch`], identified by its packet number.
#[derive(Debug)]
struct SentPacket {
    /// Packet number within its packet number space.
    pkt_num: u64,
    /// Whether the packet is still outstanding, acknowledged, or declared
    /// lost.
    status: SentStatus,
}
57
/// Lifecycle state of a tracked packet.
#[derive(Debug)]
enum SentStatus {
    /// Sent, and neither acknowledged nor declared lost yet.
    Sent {
        /// Timestamp recorded at send time; used for RTT samples and
        /// time-threshold loss detection.
        time_sent: Instant,
        /// Whether the packet elicits an ACK from the peer.
        ack_eliciting: bool,
        /// Whether the packet counts towards bytes in flight.
        in_flight: bool,
        /// Whether the packet carried data. Only such packets have their
        /// frames rescheduled when a PTO fires.
        has_data: bool,
        /// Whether this is a PMTUD probe. When such a probe is lost, it is
        /// kept out of `newly_lost` and reported to the pacer via
        /// `on_packet_neutered` instead.
        is_pmtud_probe: bool,
        /// Size of the packet in bytes.
        sent_bytes: usize,
        /// Frames carried by the packet.
        frames: SmallVec<[frame::Frame; 1]>,
    },
    /// Acknowledged by the peer. Its frames were moved to `acked_frames`.
    Acked,
    /// Declared lost. The entry is kept so that a later ACK for it can be
    /// counted as a spurious loss.
    Lost,
}
72
73impl SentStatus {
74    fn ack(&mut self) -> Self {
75        std::mem::replace(self, SentStatus::Acked)
76    }
77
78    fn lose(&mut self) -> Self {
79        if !matches!(self, SentStatus::Acked) {
80            std::mem::replace(self, SentStatus::Lost)
81        } else {
82            SentStatus::Acked
83        }
84    }
85}
86
/// Loss-recovery state for a single packet number space.
#[derive(Default)]
struct RecoveryEpoch {
    /// The time the most recent ack-eliciting packet was sent.
    time_of_last_ack_eliciting_packet: Option<Instant>,

    /// The largest packet number acknowledged in the packet number space so
    /// far.
    largest_acked_packet: Option<u64>,

    /// The time at which the next packet in that packet number space can be
    /// considered lost based on exceeding the reordering window in time.
    loss_time: Option<Instant>,

    /// Packets sent in this packet number space, with their status.
    ///
    /// Entries are in strictly increasing packet-number order (asserted in
    /// `on_packet_sent`), which allows binary search by packet number.
    /// Acked and lost entries are only removed once they reach the front of
    /// the queue.
    sent_packets: VecDeque<SentPacket>,

    /// Number of PTO probe packets still to be sent in this space.
    loss_probes: usize,
    /// Number of in-flight packets in this space that have been neither
    /// acked nor declared lost.
    pkts_in_flight: usize,

    /// Frames of newly acknowledged packets, handed out by
    /// `next_acked_frame`.
    acked_frames: VecDeque<frame::Frame>,
    /// Frames to retransmit: either from lost packets or rescheduled on PTO.
    /// Handed out by `next_lost_frame`.
    lost_frames: VecDeque<frame::Frame>,

    /// The largest packet number sent in the packet number space so far.
    ///
    /// Only written in test builds, hence `allow(dead_code)`.
    #[allow(dead_code)]
    test_largest_sent_pkt_num_on_path: Option<u64>,
}
114
/// Summary returned by `RecoveryEpoch::detect_and_remove_acked_packets`.
struct AckedDetectionResult {
    /// Total size of newly acked packets that were counted as in flight.
    acked_bytes: usize,
    /// Number of acked packets that had previously been declared lost.
    spurious_losses: usize,
    /// Reordering distance (largest acked - pkt_num + 1) of the first
    /// spurious loss encountered, if any.
    spurious_pkt_thresh: Option<u64>,
    /// Whether any newly acked packet was ack-eliciting.
    has_ack_eliciting: bool,
}
121
/// Summary returned by `RecoveryEpoch::detect_and_remove_lost_packets`.
struct LossDetectionResult {
    /// Total size of newly lost in-flight packets, excluding PMTUD probes.
    lost_bytes: usize,
    /// Number of packets pushed to `newly_lost` (lost in-flight PMTUD probes
    /// are not included).
    lost_packets: usize,

    /// Total size of newly lost in-flight PMTUD probes.
    pmtud_lost_bytes: usize,
    /// Packet numbers of newly lost in-flight PMTUD probes.
    pmtud_lost_packets: SmallVec<[u64; 1]>,
}
129
130impl RecoveryEpoch {
131    /// Discard the Epoch state and return the total size of unacked packets
132    /// that were discarded
133    fn discard(&mut self, cc: &mut Pacer) -> usize {
134        let unacked_bytes = self
135            .sent_packets
136            .drain(..)
137            .map(|p| {
138                if let SentPacket {
139                    status:
140                        SentStatus::Sent {
141                            in_flight,
142                            sent_bytes,
143                            ..
144                        },
145                    pkt_num,
146                } = p
147                {
148                    cc.on_packet_neutered(pkt_num);
149                    if in_flight {
150                        return sent_bytes;
151                    }
152                }
153                0
154            })
155            .sum();
156
157        std::mem::take(&mut self.sent_packets);
158        self.time_of_last_ack_eliciting_packet = None;
159        self.loss_time = None;
160        self.loss_probes = 0;
161        self.pkts_in_flight = 0;
162
163        unacked_bytes
164    }
165
166    // `peer_sent_ack_ranges` should not be used without validation.
167    fn detect_and_remove_acked_packets(
168        &mut self, peer_sent_ack_ranges: &RangeSet, newly_acked: &mut Vec<Acked>,
169        skip_pn: Option<u64>, trace_id: &str,
170    ) -> Result<AckedDetectionResult> {
171        newly_acked.clear();
172
173        let mut acked_bytes = 0;
174        let mut spurious_losses = 0;
175        let mut spurious_pkt_thresh = None;
176        let mut has_ack_eliciting = false;
177
178        let largest_ack_received = peer_sent_ack_ranges.last().unwrap();
179        let largest_acked = self
180            .largest_acked_packet
181            .unwrap_or(0)
182            .max(largest_ack_received);
183
184        for peer_sent_range in peer_sent_ack_ranges.iter() {
185            if skip_pn.is_some_and(|skip_pn| peer_sent_range.contains(&skip_pn)) {
186                // https://www.rfc-editor.org/rfc/rfc9000#section-13.1
187                // An endpoint SHOULD treat receipt of an acknowledgment
188                // for a packet it did not send as
189                // a connection error of type PROTOCOL_VIOLATION
190                return Err(Error::OptimisticAckDetected);
191            }
192
193            // Because packets always have incrementing numbers, they are always
194            // in sorted order.
195            let start = if self
196                .sent_packets
197                .front()
198                .filter(|e| e.pkt_num >= peer_sent_range.start)
199                .is_some()
200            {
201                // Usually it will be the first packet.
202                0
203            } else {
204                self.sent_packets
205                    .binary_search_by_key(&peer_sent_range.start, |p| p.pkt_num)
206                    .unwrap_or_else(|e| e)
207            };
208
209            for SentPacket { pkt_num, status } in
210                self.sent_packets.range_mut(start..)
211            {
212                if *pkt_num < peer_sent_range.end {
213                    match status.ack() {
214                        SentStatus::Sent {
215                            time_sent,
216                            in_flight,
217                            sent_bytes,
218                            frames,
219                            ack_eliciting,
220                            ..
221                        } => {
222                            if in_flight {
223                                self.pkts_in_flight -= 1;
224                                acked_bytes += sent_bytes;
225                            }
226                            newly_acked.push(Acked {
227                                pkt_num: *pkt_num,
228                                time_sent,
229                            });
230
231                            self.acked_frames.extend(frames);
232
233                            has_ack_eliciting |= ack_eliciting;
234
235                            trace!("{trace_id} packet newly acked {pkt_num}");
236                        },
237
238                        SentStatus::Acked => {},
239                        SentStatus::Lost => {
240                            // An acked packet was already declared lost
241                            spurious_losses += 1;
242                            spurious_pkt_thresh
243                                .get_or_insert(largest_acked - *pkt_num + 1);
244                        },
245                    }
246                } else {
247                    break;
248                }
249            }
250        }
251
252        self.drain_acked_and_lost_packets();
253
254        Ok(AckedDetectionResult {
255            acked_bytes,
256            spurious_losses,
257            spurious_pkt_thresh,
258            has_ack_eliciting,
259        })
260    }
261
262    fn detect_and_remove_lost_packets(
263        &mut self, loss_delay: Duration, pkt_thresh: Option<u64>, now: Instant,
264        newly_lost: &mut Vec<Lost>,
265    ) -> LossDetectionResult {
266        newly_lost.clear();
267        let mut lost_bytes = 0;
268        self.loss_time = None;
269
270        let lost_send_time = now.checked_sub(loss_delay).unwrap();
271        let largest_acked = self.largest_acked_packet.unwrap_or(0);
272        let mut pmtud_lost_bytes = 0;
273        let mut pmtud_lost_packets = SmallVec::new();
274
275        for SentPacket { pkt_num, status } in &mut self.sent_packets {
276            if *pkt_num > largest_acked {
277                break;
278            }
279
280            if let SentStatus::Sent { time_sent, .. } = status {
281                let loss_by_time = *time_sent <= lost_send_time;
282                let loss_by_pkt = match pkt_thresh {
283                    Some(pkt_thresh) => largest_acked >= *pkt_num + pkt_thresh,
284                    None => false,
285                };
286
287                if loss_by_time || loss_by_pkt {
288                    if let SentStatus::Sent {
289                        in_flight,
290                        sent_bytes,
291                        frames,
292                        is_pmtud_probe,
293                        ..
294                    } = status.lose()
295                    {
296                        self.lost_frames.extend(frames);
297
298                        if in_flight {
299                            self.pkts_in_flight -= 1;
300
301                            if is_pmtud_probe {
302                                pmtud_lost_bytes += sent_bytes;
303                                pmtud_lost_packets.push(*pkt_num);
304                                // Do not track PMTUD probes losses
305                                continue;
306                            }
307
308                            lost_bytes += sent_bytes;
309                        }
310
311                        newly_lost.push(Lost {
312                            packet_number: *pkt_num,
313                            bytes_lost: sent_bytes,
314                        });
315                    }
316                } else {
317                    self.loss_time = Some(*time_sent + loss_delay);
318                    break;
319                }
320            }
321        }
322
323        LossDetectionResult {
324            lost_bytes,
325            lost_packets: newly_lost.len(),
326
327            pmtud_lost_bytes,
328            pmtud_lost_packets,
329        }
330    }
331
332    /// Remove packets that were already handled from the front of the queue,
333    /// but avoid removing packets from the middle of the queue to avoid
334    /// compaction
335    fn drain_acked_and_lost_packets(&mut self) {
336        while let Some(SentPacket {
337            status: SentStatus::Acked | SentStatus::Lost,
338            ..
339        }) = self.sent_packets.front()
340        {
341            self.sent_packets.pop_front();
342        }
343    }
344
345    fn least_unacked(&self) -> u64 {
346        for pkt in self.sent_packets.iter() {
347            if let SentPacket {
348                pkt_num,
349                status: SentStatus::Sent { .. },
350            } = pkt
351            {
352                return *pkt_num;
353            }
354        }
355
356        self.largest_acked_packet.unwrap_or(0) + 1
357    }
358}
359
/// Packet- and time-threshold parameters for loss detection. They are
/// adapted whenever a spurious loss is detected.
struct LossThreshold {
    /// Reordering threshold in packets. `None` disables packet-threshold
    /// loss detection.
    pkt_thresh: Option<u64>,
    /// Time threshold passed to `RttStats::loss_delay` to derive the
    /// time-threshold loss delay.
    time_thresh: f64,

    // # Experiment: enable_relaxed_loss_threshold
    //
    // If `Some`, the first spurious loss disables `pkt_thresh`. Each
    // subsequent spurious loss multiplies this overhead by
    // TIME_THRESHOLD_OVERHEAD_MULTIPLIER (capped at `1.0`) and raises
    // `time_thresh` accordingly.
    //
    // The effective time threshold is calculated as
    // `1.0 + time_thresh_overhead`. Its starting point,
    // `1.0 + INITIAL_TIME_THRESHOLD_OVERHEAD`, is equivalent to the initial
    // value of INITIAL_TIME_THRESHOLD.
    time_thresh_overhead: Option<f64>,
}
374
375impl LossThreshold {
376    fn new(recovery_config: &RecoveryConfig) -> Self {
377        let time_thresh_overhead =
378            if recovery_config.enable_relaxed_loss_threshold {
379                Some(INITIAL_TIME_THRESHOLD_OVERHEAD)
380            } else {
381                None
382            };
383        LossThreshold {
384            pkt_thresh: Some(INITIAL_PACKET_THRESHOLD),
385            time_thresh: INITIAL_TIME_THRESHOLD,
386            time_thresh_overhead,
387        }
388    }
389
390    fn pkt_thresh(&self) -> Option<u64> {
391        self.pkt_thresh
392    }
393
394    fn time_thresh(&self) -> f64 {
395        self.time_thresh
396    }
397
398    fn on_spurious_loss(&mut self, new_pkt_thresh: u64) {
399        match &mut self.time_thresh_overhead {
400            Some(time_thresh_overhead) => {
401                if self.pkt_thresh.is_some() {
402                    // Disable packet threshold on first spurious loss.
403                    self.pkt_thresh = None;
404                } else {
405                    // Double time threshold but cap it at `1.0`, which ends up
406                    // being 2x the RTT.
407                    *time_thresh_overhead *= TIME_THRESHOLD_OVERHEAD_MULTIPLIER;
408                    *time_thresh_overhead = time_thresh_overhead.min(1.0);
409
410                    self.time_thresh = 1.0 + *time_thresh_overhead;
411                }
412            },
413            None => {
414                let new_packet_threshold = self
415                    .pkt_thresh
416                    .expect("packet threshold should always be Some when `enable_relaxed_loss_threshold` is false")
417                    .max(new_pkt_thresh.min(MAX_PACKET_THRESHOLD));
418                self.pkt_thresh = Some(new_packet_threshold);
419
420                self.time_thresh = PACKET_REORDER_TIME_THRESHOLD;
421            },
422        }
423    }
424}
425
/// Loss detection and recovery built on the gcongestion [`Pacer`].
///
/// `GRecovery::new` only accepts `CongestionControlAlgorithm::Bbr2Gcongestion`
/// and wraps a [`BBRv2`] instance in the pacer.
pub struct GRecovery {
    /// Per packet number space state, indexed by [`packet::Epoch`].
    epochs: [RecoveryEpoch; packet::Epoch::count()],

    /// Deadline of the next loss-detection or PTO event.
    loss_timer: LossDetectionTimer,

    /// Number of consecutive PTO expirations. It scales the PTO backoff and
    /// is reset when an ACK newly acknowledges packets.
    pto_count: u32,

    /// RTT estimator.
    rtt_stats: RttStats,

    /// Statistics passed to the congestion controller on congestion events.
    recovery_stats: RecoveryStats,

    /// Number of packets declared lost (in-flight PMTUD probes excluded).
    pub lost_count: usize,

    /// Number of packets that were declared lost and later acknowledged.
    pub lost_spurious_count: usize,

    /// Packet and time thresholds used for loss detection.
    loss_thresh: LossThreshold,

    /// Bytes currently in flight, summed over all packet number spaces.
    bytes_in_flight: BytesInFlight,

    /// Total bytes sent.
    bytes_sent: usize,

    /// Value reported by `RecoveryOps::bytes_lost`.
    pub bytes_lost: u64,

    /// `max_send_udp_payload_size` from the recovery config.
    max_datagram_size: usize,
    /// When true, packets are stamped with `now` when sent; otherwise they
    /// are stamped with the pacer's next release time.
    time_sent_set_to_now: bool,

    #[cfg(feature = "qlog")]
    qlog_metrics: QlogMetrics,

    #[cfg(feature = "qlog")]
    qlog_prev_cc_state: &'static str,

    /// How many non-ack-eliciting packets have been sent.
    outstanding_non_ack_eliciting: usize,

    /// A reusable list of acks.
    newly_acked: Vec<Acked>,

    /// A [`Vec`] that can be reused for calls of
    /// [`Self::detect_and_remove_lost_packets`] to avoid allocations
    lost_reuse: Vec<Lost>,

    /// Pacer wrapping the congestion controller.
    pacer: Pacer,
}
470
471impl GRecovery {
472    #[cfg(feature = "qlog")]
473    fn send_rate(&self) -> Bandwidth {
474        self.pacer.send_rate().unwrap_or(Bandwidth::zero())
475    }
476
477    #[cfg(feature = "qlog")]
478    fn ack_rate(&self) -> Bandwidth {
479        self.pacer.ack_rate().unwrap_or(Bandwidth::zero())
480    }
481
482    pub fn new(recovery_config: &RecoveryConfig) -> Option<Self> {
483        let cc = match recovery_config.cc_algorithm {
484            CongestionControlAlgorithm::Bbr2Gcongestion => BBRv2::new(
485                recovery_config.initial_congestion_window_packets,
486                MAX_WINDOW_PACKETS,
487                recovery_config.max_send_udp_payload_size,
488                recovery_config.initial_rtt,
489                recovery_config.custom_bbr_params.as_ref(),
490            ),
491            _ => return None,
492        };
493
494        Some(Self {
495            epochs: Default::default(),
496            rtt_stats: RttStats::new(
497                recovery_config.initial_rtt,
498                recovery_config.max_ack_delay,
499            ),
500            recovery_stats: RecoveryStats::default(),
501            loss_timer: Default::default(),
502            pto_count: 0,
503
504            lost_count: 0,
505            lost_spurious_count: 0,
506
507            loss_thresh: LossThreshold::new(recovery_config),
508            bytes_in_flight: Default::default(),
509            bytes_sent: 0,
510            bytes_lost: 0,
511
512            max_datagram_size: recovery_config.max_send_udp_payload_size,
513            time_sent_set_to_now: cc.time_sent_set_to_now(),
514
515            #[cfg(feature = "qlog")]
516            qlog_metrics: QlogMetrics::default(),
517
518            #[cfg(feature = "qlog")]
519            qlog_prev_cc_state: "",
520
521            outstanding_non_ack_eliciting: 0,
522
523            pacer: Pacer::new(
524                recovery_config.pacing,
525                cc,
526                recovery_config
527                    .max_pacing_rate
528                    .map(Bandwidth::from_mbits_per_second),
529            ),
530
531            newly_acked: Vec::new(),
532            lost_reuse: Vec::new(),
533        })
534    }
535
536    fn detect_and_remove_lost_packets(
537        &mut self, epoch: packet::Epoch, now: Instant,
538    ) -> (usize, usize) {
539        let loss_delay =
540            self.rtt_stats.loss_delay(self.loss_thresh.time_thresh());
541        let lost = &mut self.lost_reuse;
542
543        let LossDetectionResult {
544            lost_bytes,
545            lost_packets,
546            pmtud_lost_bytes,
547            pmtud_lost_packets,
548        } = self.epochs[epoch].detect_and_remove_lost_packets(
549            loss_delay,
550            self.loss_thresh.pkt_thresh(),
551            now,
552            lost,
553        );
554
555        self.bytes_in_flight
556            .saturating_subtract(lost_bytes + pmtud_lost_bytes, now);
557
558        for pkt in pmtud_lost_packets {
559            self.pacer.on_packet_neutered(pkt);
560        }
561
562        (lost_bytes, lost_packets)
563    }
564
565    fn loss_time_and_space(&self) -> (Option<Instant>, packet::Epoch) {
566        let mut epoch = packet::Epoch::Initial;
567        let mut time = self.epochs[epoch].loss_time;
568
569        // Iterate over all packet number spaces starting from Handshake.
570        for e in [packet::Epoch::Handshake, packet::Epoch::Application] {
571            let new_time = self.epochs[e].loss_time;
572            if time.is_none() || new_time < time {
573                time = new_time;
574                epoch = e;
575            }
576        }
577
578        (time, epoch)
579    }
580
581    fn pto_time_and_space(
582        &self, handshake_status: HandshakeStatus, now: Instant,
583    ) -> (Option<Instant>, packet::Epoch) {
584        let mut duration = self.pto() * (1 << self.pto_count);
585
586        // Arm PTO from now when there are no inflight packets.
587        if self.bytes_in_flight.is_zero() {
588            if handshake_status.has_handshake_keys {
589                return (Some(now + duration), packet::Epoch::Handshake);
590            } else {
591                return (Some(now + duration), packet::Epoch::Initial);
592            }
593        }
594
595        let mut pto_timeout = None;
596        let mut pto_space = packet::Epoch::Initial;
597
598        // Iterate over all packet number spaces.
599        for &e in packet::Epoch::epochs(
600            packet::Epoch::Initial..=packet::Epoch::Application,
601        ) {
602            if self.epochs[e].pkts_in_flight == 0 {
603                continue;
604            }
605
606            if e == packet::Epoch::Application {
607                // Skip Application Data until handshake completes.
608                if !handshake_status.completed {
609                    return (pto_timeout, pto_space);
610                }
611
612                // Include max_ack_delay and backoff for Application Data.
613                duration +=
614                    self.rtt_stats.max_ack_delay * 2_u32.pow(self.pto_count);
615            }
616
617            let new_time = self.epochs[e]
618                .time_of_last_ack_eliciting_packet
619                .map(|t| t + duration);
620
621            if pto_timeout.is_none() || new_time < pto_timeout {
622                pto_timeout = new_time;
623                pto_space = e;
624            }
625        }
626
627        (pto_timeout, pto_space)
628    }
629
630    fn set_loss_detection_timer(
631        &mut self, handshake_status: HandshakeStatus, now: Instant,
632    ) {
633        if let (Some(earliest_loss_time), _) = self.loss_time_and_space() {
634            // Time threshold loss detection.
635            self.loss_timer.update(earliest_loss_time);
636            return;
637        }
638
639        if self.bytes_in_flight.is_zero() &&
640            handshake_status.peer_verified_address
641        {
642            self.loss_timer.clear();
643            return;
644        }
645
646        // PTO timer.
647        if let (Some(timeout), _) = self.pto_time_and_space(handshake_status, now)
648        {
649            self.loss_timer.update(timeout);
650        }
651    }
652}
653
654impl RecoveryOps for GRecovery {
655    fn lost_count(&self) -> usize {
656        self.lost_count
657    }
658
659    fn bytes_lost(&self) -> u64 {
660        self.bytes_lost
661    }
662
663    fn should_elicit_ack(&self, epoch: packet::Epoch) -> bool {
664        self.epochs[epoch].loss_probes > 0 ||
665            self.outstanding_non_ack_eliciting >=
666                MAX_OUTSTANDING_NON_ACK_ELICITING
667    }
668
669    fn next_acked_frame(&mut self, epoch: packet::Epoch) -> Option<frame::Frame> {
670        self.epochs[epoch].acked_frames.pop_front()
671    }
672
673    fn next_lost_frame(&mut self, epoch: packet::Epoch) -> Option<frame::Frame> {
674        self.epochs[epoch].lost_frames.pop_front()
675    }
676
677    fn get_largest_acked_on_epoch(&self, epoch: packet::Epoch) -> Option<u64> {
678        self.epochs[epoch].largest_acked_packet
679    }
680
681    fn has_lost_frames(&self, epoch: packet::Epoch) -> bool {
682        !self.epochs[epoch].lost_frames.is_empty()
683    }
684
685    fn loss_probes(&self, epoch: packet::Epoch) -> usize {
686        self.epochs[epoch].loss_probes
687    }
688
689    #[cfg(test)]
690    fn inc_loss_probes(&mut self, epoch: packet::Epoch) {
691        self.epochs[epoch].loss_probes += 1;
692    }
693
694    fn ping_sent(&mut self, epoch: packet::Epoch) {
695        self.epochs[epoch].loss_probes =
696            self.epochs[epoch].loss_probes.saturating_sub(1);
697    }
698
699    fn on_packet_sent(
700        &mut self, pkt: Sent, epoch: packet::Epoch,
701        handshake_status: HandshakeStatus, now: Instant, trace_id: &str,
702    ) {
703        let time_sent = if self.time_sent_set_to_now {
704            now
705        } else {
706            self.get_next_release_time().time(now).unwrap_or(now)
707        };
708
709        let epoch = &mut self.epochs[epoch];
710
711        let ack_eliciting = pkt.ack_eliciting;
712        let in_flight = pkt.in_flight;
713        let is_pmtud_probe = pkt.is_pmtud_probe;
714        let pkt_num = pkt.pkt_num;
715        let sent_bytes = pkt.size;
716
717        if let Some(SentPacket { pkt_num, .. }) = epoch.sent_packets.back() {
718            assert!(*pkt_num < pkt.pkt_num, "Packet numbers must increase");
719        }
720
721        let status = SentStatus::Sent {
722            time_sent,
723            ack_eliciting,
724            in_flight,
725            is_pmtud_probe,
726            has_data: pkt.has_data,
727            sent_bytes,
728            frames: pkt.frames,
729        };
730
731        #[cfg(test)]
732        {
733            epoch.test_largest_sent_pkt_num_on_path = epoch
734                .test_largest_sent_pkt_num_on_path
735                .max(Some(pkt.pkt_num));
736        }
737
738        epoch.sent_packets.push_back(SentPacket { pkt_num, status });
739
740        if ack_eliciting {
741            epoch.time_of_last_ack_eliciting_packet = Some(time_sent);
742            self.outstanding_non_ack_eliciting = 0;
743        } else {
744            self.outstanding_non_ack_eliciting += 1;
745        }
746
747        if in_flight {
748            self.pacer.on_packet_sent(
749                time_sent,
750                self.bytes_in_flight.get(),
751                pkt_num,
752                sent_bytes,
753                pkt.has_data,
754                &self.rtt_stats,
755            );
756
757            self.bytes_in_flight.add(sent_bytes, now);
758            epoch.pkts_in_flight += 1;
759            self.set_loss_detection_timer(handshake_status, time_sent);
760        }
761
762        self.bytes_sent += sent_bytes;
763
764        trace!("{trace_id} {self:?}");
765    }
766
767    fn get_packet_send_time(&self, now: Instant) -> Instant {
768        self.pacer.get_next_release_time().time(now).unwrap_or(now)
769    }
770
771    // `peer_sent_ack_ranges` should not be used without validation.
772    fn on_ack_received(
773        &mut self, peer_sent_ack_ranges: &RangeSet, ack_delay: u64,
774        epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant,
775        skip_pn: Option<u64>, trace_id: &str,
776    ) -> Result<OnAckReceivedOutcome> {
777        let prior_in_flight = self.bytes_in_flight.get();
778
779        let AckedDetectionResult {
780            acked_bytes,
781            spurious_losses,
782            spurious_pkt_thresh,
783            has_ack_eliciting,
784        } = self.epochs[epoch].detect_and_remove_acked_packets(
785            peer_sent_ack_ranges,
786            &mut self.newly_acked,
787            skip_pn,
788            trace_id,
789        )?;
790
791        self.lost_spurious_count += spurious_losses;
792        if let Some(thresh) = spurious_pkt_thresh {
793            self.loss_thresh.on_spurious_loss(thresh);
794        }
795
796        if self.newly_acked.is_empty() {
797            return Ok(OnAckReceivedOutcome {
798                acked_bytes,
799                spurious_losses,
800                ..Default::default()
801            });
802        }
803
804        self.bytes_in_flight.saturating_subtract(acked_bytes, now);
805
806        let largest_newly_acked = self.newly_acked.last().unwrap();
807
808        // Update `largest_acked_packet` based on the validated `newly_acked`
809        // value.
810        let largest_acked_pkt_num = self.epochs[epoch]
811            .largest_acked_packet
812            .unwrap_or(0)
813            .max(largest_newly_acked.pkt_num);
814        self.epochs[epoch].largest_acked_packet = Some(largest_acked_pkt_num);
815
816        // Check if largest packet is newly acked.
817        let update_rtt = largest_newly_acked.pkt_num == largest_acked_pkt_num &&
818            has_ack_eliciting;
819        if update_rtt {
820            let latest_rtt = now - largest_newly_acked.time_sent;
821            self.rtt_stats.update_rtt(
822                latest_rtt,
823                Duration::from_micros(ack_delay),
824                now,
825                handshake_status.completed,
826            );
827        }
828
829        let (lost_bytes, lost_packets) =
830            self.detect_and_remove_lost_packets(epoch, now);
831
832        self.pacer.on_congestion_event(
833            update_rtt,
834            prior_in_flight,
835            self.bytes_in_flight.get(),
836            now,
837            &self.newly_acked,
838            &self.lost_reuse,
839            self.epochs[epoch].least_unacked(),
840            &self.rtt_stats,
841            &mut self.recovery_stats,
842        );
843
844        self.pto_count = 0;
845        self.lost_count += lost_packets;
846
847        self.set_loss_detection_timer(handshake_status, now);
848
849        trace!("{trace_id} {self:?}");
850
851        Ok(OnAckReceivedOutcome {
852            lost_packets,
853            lost_bytes,
854            acked_bytes,
855            spurious_losses,
856        })
857    }
858
859    fn on_loss_detection_timeout(
860        &mut self, handshake_status: HandshakeStatus, now: Instant,
861        trace_id: &str,
862    ) -> OnLossDetectionTimeoutOutcome {
863        let (earliest_loss_time, epoch) = self.loss_time_and_space();
864
865        if earliest_loss_time.is_some() {
866            let prior_in_flight = self.bytes_in_flight.get();
867
868            let (lost_bytes, lost_packets) =
869                self.detect_and_remove_lost_packets(epoch, now);
870
871            self.pacer.on_congestion_event(
872                false,
873                prior_in_flight,
874                self.bytes_in_flight.get(),
875                now,
876                &[],
877                &self.lost_reuse,
878                self.epochs[epoch].least_unacked(),
879                &self.rtt_stats,
880                &mut self.recovery_stats,
881            );
882
883            self.lost_count += lost_packets;
884
885            self.set_loss_detection_timer(handshake_status, now);
886
887            trace!("{trace_id} {self:?}");
888            return OnLossDetectionTimeoutOutcome {
889                lost_packets,
890                lost_bytes,
891            };
892        }
893
894        let epoch = if self.bytes_in_flight.get() > 0 {
895            // Send new data if available, else retransmit old data. If neither
896            // is available, send a single PING frame.
897            let (_, e) = self.pto_time_and_space(handshake_status, now);
898
899            e
900        } else {
901            // Client sends an anti-deadlock packet: Initial is padded to earn
902            // more anti-amplification credit, a Handshake packet proves address
903            // ownership.
904            if handshake_status.has_handshake_keys {
905                packet::Epoch::Handshake
906            } else {
907                packet::Epoch::Initial
908            }
909        };
910
911        self.pto_count += 1;
912
913        let epoch = &mut self.epochs[epoch];
914
915        epoch.loss_probes = MAX_PTO_PROBES_COUNT.min(self.pto_count as usize);
916
917        // Skip packets that have already been acked or lost, and packets
918        // that don't contain either CRYPTO or STREAM frames and only return as
919        // many packets as the number of probe packets that will be sent.
920        let unacked_frames = epoch
921            .sent_packets
922            .iter_mut()
923            .filter_map(|p| {
924                if let SentStatus::Sent {
925                    has_data: true,
926                    frames,
927                    ..
928                } = &p.status
929                {
930                    Some(frames)
931                } else {
932                    None
933                }
934            })
935            .take(epoch.loss_probes)
936            .flatten()
937            .filter(|f| !matches!(f, frame::Frame::DatagramHeader { .. }));
938
939        // Retransmit the frames from the oldest sent packets on PTO. However
940        // the packets are not actually declared lost (so there is no effect to
941        // congestion control), we just reschedule the data they carried.
942        //
943        // This will also trigger sending an ACK and retransmitting frames like
944        // HANDSHAKE_DONE and MAX_DATA / MAX_STREAM_DATA as well, in addition
945        // to CRYPTO and STREAM, if the original packet carried them.
946        epoch.lost_frames.extend(unacked_frames.cloned());
947
948        self.pacer
949            .on_retransmission_timeout(!epoch.lost_frames.is_empty());
950
951        self.set_loss_detection_timer(handshake_status, now);
952
953        trace!("{trace_id} {self:?}");
954        OnLossDetectionTimeoutOutcome {
955            lost_packets: 0,
956            lost_bytes: 0,
957        }
958    }
959
960    fn on_pkt_num_space_discarded(
961        &mut self, epoch: packet::Epoch, handshake_status: HandshakeStatus,
962        now: Instant,
963    ) {
964        let epoch = &mut self.epochs[epoch];
965        self.bytes_in_flight
966            .saturating_subtract(epoch.discard(&mut self.pacer), now);
967        self.set_loss_detection_timer(handshake_status, now);
968    }
969
970    fn on_path_change(
971        &mut self, epoch: packet::Epoch, now: Instant, _trace_id: &str,
972    ) -> (usize, usize) {
973        let (lost_bytes, lost_packets) =
974            self.detect_and_remove_lost_packets(epoch, now);
975
976        (lost_packets, lost_bytes)
977    }
978
979    fn loss_detection_timer(&self) -> Option<Instant> {
980        self.loss_timer.time
981    }
982
983    fn cwnd(&self) -> usize {
984        self.pacer.get_congestion_window()
985    }
986
987    fn cwnd_available(&self) -> usize {
988        // Ignore cwnd when sending probe packets.
989        if self.epochs.iter().any(|e| e.loss_probes > 0) {
990            return usize::MAX;
991        }
992
993        self.cwnd().saturating_sub(self.bytes_in_flight.get())
994    }
995
996    fn rtt(&self) -> Duration {
997        self.rtt_stats.rtt()
998    }
999
1000    fn min_rtt(&self) -> Option<Duration> {
1001        self.rtt_stats.min_rtt()
1002    }
1003
1004    fn max_rtt(&self) -> Option<Duration> {
1005        self.rtt_stats.max_rtt()
1006    }
1007
1008    fn rttvar(&self) -> Duration {
1009        self.rtt_stats.rttvar()
1010    }
1011
1012    fn pto(&self) -> Duration {
1013        let r = &self.rtt_stats;
1014        r.rtt() + (r.rttvar() * 4).max(GRANULARITY)
1015    }
1016
1017    /// The most recent data delivery rate estimate.
1018    fn delivery_rate(&self) -> Bandwidth {
1019        self.pacer.bandwidth_estimate(&self.rtt_stats)
1020    }
1021
1022    fn max_bandwidth(&self) -> Option<Bandwidth> {
1023        Some(self.pacer.max_bandwidth())
1024    }
1025
1026    /// Statistics from when a CCA first exited the startup phase.
1027    fn startup_exit(&self) -> Option<StartupExit> {
1028        self.recovery_stats.startup_exit
1029    }
1030
1031    fn max_datagram_size(&self) -> usize {
1032        self.max_datagram_size
1033    }
1034
1035    fn pmtud_update_max_datagram_size(&mut self, new_max_datagram_size: usize) {
1036        self.max_datagram_size = new_max_datagram_size;
1037        self.pacer.update_mss(self.max_datagram_size);
1038    }
1039
1040    fn update_max_datagram_size(&mut self, new_max_datagram_size: usize) {
1041        self.pmtud_update_max_datagram_size(
1042            self.max_datagram_size.min(new_max_datagram_size),
1043        )
1044    }
1045
1046    // FIXME only used by gcongestion
1047    fn on_app_limited(&mut self) {
1048        self.pacer.on_app_limited(self.bytes_in_flight.get())
1049    }
1050
1051    #[cfg(test)]
1052    fn sent_packets_len(&self, epoch: packet::Epoch) -> usize {
1053        self.epochs[epoch].sent_packets.len()
1054    }
1055
1056    #[cfg(test)]
1057    fn in_flight_count(&self, epoch: packet::Epoch) -> usize {
1058        self.epochs[epoch].pkts_in_flight
1059    }
1060
1061    fn bytes_in_flight(&self) -> usize {
1062        self.bytes_in_flight.get()
1063    }
1064
1065    fn bytes_in_flight_duration(&self) -> Duration {
1066        self.bytes_in_flight.get_duration()
1067    }
1068
1069    #[cfg(test)]
1070    fn pacing_rate(&self) -> u64 {
1071        self.pacer
1072            .pacing_rate(self.bytes_in_flight.get(), &self.rtt_stats)
1073            .to_bytes_per_period(Duration::from_secs(1))
1074    }
1075
1076    #[cfg(test)]
1077    fn pto_count(&self) -> u32 {
1078        self.pto_count
1079    }
1080
1081    #[cfg(test)]
1082    fn pkt_thresh(&self) -> Option<u64> {
1083        self.loss_thresh.pkt_thresh()
1084    }
1085
1086    #[cfg(test)]
1087    fn time_thresh(&self) -> f64 {
1088        self.loss_thresh.time_thresh()
1089    }
1090
1091    #[cfg(test)]
1092    fn lost_spurious_count(&self) -> usize {
1093        self.lost_spurious_count
1094    }
1095
1096    #[cfg(test)]
1097    fn detect_lost_packets_for_test(
1098        &mut self, epoch: packet::Epoch, now: Instant,
1099    ) -> (usize, usize) {
1100        let ret = self.detect_and_remove_lost_packets(epoch, now);
1101        self.epochs[epoch].drain_acked_and_lost_packets();
1102        ret
1103    }
1104
1105    #[cfg(test)]
1106    fn largest_sent_pkt_num_on_path(&self, epoch: packet::Epoch) -> Option<u64> {
1107        self.epochs[epoch].test_largest_sent_pkt_num_on_path
1108    }
1109
1110    #[cfg(test)]
1111    fn app_limited(&self) -> bool {
1112        self.pacer.is_app_limited(self.bytes_in_flight.get())
1113    }
1114
1115    // FIXME only used by congestion
1116    fn update_app_limited(&mut self, _v: bool) {
1117        // TODO
1118    }
1119
1120    // FIXME only used by congestion
1121    fn delivery_rate_update_app_limited(&mut self, _v: bool) {
1122        // TODO
1123    }
1124
1125    fn update_max_ack_delay(&mut self, max_ack_delay: Duration) {
1126        self.rtt_stats.max_ack_delay = max_ack_delay;
1127    }
1128
1129    fn get_next_release_time(&self) -> ReleaseDecision {
1130        self.pacer.get_next_release_time()
1131    }
1132
1133    fn gcongestion_enabled(&self) -> bool {
1134        true
1135    }
1136
1137    #[cfg(feature = "qlog")]
1138    fn state_str(&self, _now: Instant) -> &'static str {
1139        self.pacer.state_str()
1140    }
1141
1142    #[cfg(feature = "qlog")]
1143    fn get_updated_qlog_event_data(&mut self) -> Option<EventData> {
1144        let qlog_metrics = QlogMetrics {
1145            min_rtt: *self.rtt_stats.min_rtt,
1146            smoothed_rtt: self.rtt(),
1147            latest_rtt: self.rtt_stats.latest_rtt(),
1148            rttvar: self.rtt_stats.rttvar(),
1149            cwnd: self.cwnd() as u64,
1150            bytes_in_flight: self.bytes_in_flight.get() as u64,
1151            ssthresh: self.pacer.ssthresh(),
1152
1153            pacing_rate: Some(
1154                self.pacer
1155                    .pacing_rate(self.bytes_in_flight.get(), &self.rtt_stats)
1156                    .to_bytes_per_second(),
1157            ),
1158            delivery_rate: Some(self.delivery_rate().to_bytes_per_second()),
1159            send_rate: Some(self.send_rate().to_bytes_per_second()),
1160            ack_rate: Some(self.ack_rate().to_bytes_per_second()),
1161            lost_packets: Some(self.lost_count as u64),
1162            lost_bytes: Some(self.bytes_lost),
1163            pto_count: Some(self.pto_count),
1164        };
1165
1166        self.qlog_metrics.maybe_update(qlog_metrics)
1167    }
1168
1169    #[cfg(feature = "qlog")]
1170    fn get_updated_qlog_cc_state(
1171        &mut self, now: Instant,
1172    ) -> Option<&'static str> {
1173        let cc_state = self.state_str(now);
1174        if cc_state != self.qlog_prev_cc_state {
1175            self.qlog_prev_cc_state = cc_state;
1176            Some(cc_state)
1177        } else {
1178            None
1179        }
1180    }
1181
1182    fn send_quantum(&self) -> usize {
1183        let pacing_rate = self
1184            .pacer
1185            .pacing_rate(self.bytes_in_flight.get(), &self.rtt_stats);
1186
1187        let floor = if pacing_rate < Bandwidth::from_kbits_per_second(1200) {
1188            self.max_datagram_size
1189        } else {
1190            2 * self.max_datagram_size
1191        };
1192
1193        pacing_rate
1194            .to_bytes_per_period(ReleaseDecision::EQUAL_THRESHOLD)
1195            .min(64 * 1024)
1196            .max(floor as u64) as usize
1197    }
1198}
1199
1200impl std::fmt::Debug for GRecovery {
1201    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1202        write!(f, "timer={:?} ", self.loss_detection_timer())?;
1203        write!(f, "rtt_stats={:?} ", self.rtt_stats)?;
1204        write!(f, "bytes_in_flight={} ", self.bytes_in_flight.get())?;
1205        write!(f, "{:?} ", self.pacer)?;
1206        Ok(())
1207    }
1208}
1209
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Config;

    /// Default (non-relaxed) mode: the first spurious loss switches the time
    /// threshold to `PACKET_REORDER_TIME_THRESHOLD`; the packet threshold
    /// only grows with larger reordering gaps and is capped at
    /// `MAX_PACKET_THRESHOLD`.
    #[test]
    fn loss_threshold() {
        let config = Config::new(crate::PROTOCOL_VERSION).unwrap();
        let recovery_config = RecoveryConfig::from_config(&config);
        assert!(!recovery_config.enable_relaxed_loss_threshold);

        let mut loss_thresh = LossThreshold::new(&recovery_config);
        assert_eq!(loss_thresh.time_thresh_overhead, None);
        assert_eq!(loss_thresh.pkt_thresh().unwrap(), INITIAL_PACKET_THRESHOLD);
        assert_eq!(loss_thresh.time_thresh(), INITIAL_TIME_THRESHOLD);

        // First spurious loss.
        loss_thresh.on_spurious_loss(INITIAL_PACKET_THRESHOLD);
        assert_eq!(loss_thresh.pkt_thresh().unwrap(), INITIAL_PACKET_THRESHOLD);
        assert_eq!(loss_thresh.time_thresh(), PACKET_REORDER_TIME_THRESHOLD);

        // Packet gaps < INITIAL_PACKET_THRESHOLD do NOT change the packet
        // threshold.
        for packet_gap in 0..INITIAL_PACKET_THRESHOLD {
            loss_thresh.on_spurious_loss(packet_gap);

            // Packet threshold only increases once the packet gap increases.
            assert_eq!(
                loss_thresh.pkt_thresh().unwrap(),
                INITIAL_PACKET_THRESHOLD
            );
            assert_eq!(loss_thresh.time_thresh(), PACKET_REORDER_TIME_THRESHOLD);
        }

        // Subsequent spurious losses with packet gaps > INITIAL_PACKET_THRESHOLD,
        // including gaps well beyond MAX_PACKET_THRESHOLD (up to
        // `MAX_PACKET_THRESHOLD * 2`).
        for packet_gap in INITIAL_PACKET_THRESHOLD + 1..MAX_PACKET_THRESHOLD * 2 {
            loss_thresh.on_spurious_loss(packet_gap);

            // The packet threshold follows the packet gap, capped at
            // MAX_PACKET_THRESHOLD.
            let new_packet_threshold = packet_gap.min(MAX_PACKET_THRESHOLD);
            assert_eq!(loss_thresh.pkt_thresh().unwrap(), new_packet_threshold);
            assert_eq!(loss_thresh.time_thresh(), PACKET_REORDER_TIME_THRESHOLD);
        }
        // Packet threshold is capped at MAX_PACKET_THRESHOLD.
        assert_eq!(loss_thresh.pkt_thresh().unwrap(), MAX_PACKET_THRESHOLD);
        assert_eq!(loss_thresh.time_thresh(), PACKET_REORDER_TIME_THRESHOLD);

        // Packet threshold is monotonically non-decreasing.
        loss_thresh.on_spurious_loss(INITIAL_PACKET_THRESHOLD);
        assert_eq!(loss_thresh.pkt_thresh().unwrap(), MAX_PACKET_THRESHOLD);
        assert_eq!(loss_thresh.time_thresh(), PACKET_REORDER_TIME_THRESHOLD);
    }

    /// Relaxed mode: after the first spurious loss the packet threshold is
    /// disabled (`None`), and each further spurious loss doubles the time
    /// threshold overhead until the threshold reaches `MAX_TIME_THRESHOLD`.
    #[test]
    fn relaxed_loss_threshold() {
        // The max time threshold when operating in relaxed loss mode.
        const MAX_TIME_THRESHOLD: f64 = 2.0;

        let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
        config.set_enable_relaxed_loss_threshold(true);
        let recovery_config = RecoveryConfig::from_config(&config);
        assert!(recovery_config.enable_relaxed_loss_threshold);

        let mut loss_thresh = LossThreshold::new(&recovery_config);
        assert_eq!(
            loss_thresh.time_thresh_overhead,
            Some(INITIAL_TIME_THRESHOLD_OVERHEAD)
        );
        assert_eq!(loss_thresh.pkt_thresh().unwrap(), INITIAL_PACKET_THRESHOLD);
        assert_eq!(loss_thresh.time_thresh(), INITIAL_TIME_THRESHOLD);

        // First spurious loss.
        loss_thresh.on_spurious_loss(INITIAL_PACKET_THRESHOLD);
        assert_eq!(loss_thresh.pkt_thresh(), None);
        assert_eq!(loss_thresh.time_thresh(), INITIAL_TIME_THRESHOLD);

        // Subsequent spurious losses.
        for subsequent_loss_count in 1..100 {
            // The overhead doubles on every spurious loss until the time
            // threshold (1.0 + overhead) caps at MAX_TIME_THRESHOLD.
            //
            // It takes `3` rounds of doubling for INITIAL_TIME_THRESHOLD_OVERHEAD
            // to equal `1.0`.
            let new_time_threshold = if subsequent_loss_count <= 3 {
                1.0 + INITIAL_TIME_THRESHOLD_OVERHEAD *
                    2_f64.powi(subsequent_loss_count as i32)
            } else {
                MAX_TIME_THRESHOLD
            };

            loss_thresh.on_spurious_loss(subsequent_loss_count);
            assert_eq!(loss_thresh.pkt_thresh(), None);
            assert_eq!(loss_thresh.time_thresh(), new_time_threshold);
        }
        // Time threshold is capped at MAX_TIME_THRESHOLD.
        assert_eq!(loss_thresh.pkt_thresh(), None);
        assert_eq!(loss_thresh.time_thresh(), MAX_TIME_THRESHOLD);
    }
}