quiche/recovery/gcongestion/
recovery.rs

1use crate::packet;
2use crate::recovery::OnLossDetectionTimeoutOutcome;
3
4use std::collections::VecDeque;
5use std::time::Duration;
6use std::time::Instant;
7
8use smallvec::SmallVec;
9
10#[cfg(feature = "qlog")]
11use qlog::events::EventData;
12
13#[cfg(feature = "qlog")]
14use crate::recovery::QlogMetrics;
15
16use crate::frame;
17
18use crate::recovery::bytes_in_flight::BytesInFlight;
19use crate::recovery::gcongestion::Bandwidth;
20use crate::recovery::rtt::RttStats;
21use crate::recovery::CongestionControlAlgorithm;
22use crate::recovery::HandshakeStatus;
23use crate::recovery::LossDetectionTimer;
24use crate::recovery::OnAckReceivedOutcome;
25use crate::recovery::RangeSet;
26use crate::recovery::RecoveryConfig;
27use crate::recovery::RecoveryOps;
28use crate::recovery::RecoveryStats;
29use crate::recovery::ReleaseDecision;
30use crate::recovery::Sent;
31use crate::recovery::StartupExit;
32use crate::recovery::GRANULARITY;
33use crate::recovery::INITIAL_PACKET_THRESHOLD;
34use crate::recovery::INITIAL_TIME_THRESHOLD;
35use crate::recovery::MAX_OUTSTANDING_NON_ACK_ELICITING;
36use crate::recovery::MAX_PACKET_THRESHOLD;
37use crate::recovery::MAX_PTO_PROBES_COUNT;
38use crate::Error;
39use crate::Result;
40
41use super::pacer::Pacer;
42use super::Acked;
43use super::Congestion;
44use super::CongestionControl;
45use super::Lost;
46
47// Congestion Control
48const MAX_WINDOW_PACKETS: usize = 20_000;
49
50#[derive(Debug)]
51struct SentPacket {
52    pkt_num: u64,
53    status: SentStatus,
54}
55
56#[derive(Debug)]
57enum SentStatus {
58    Sent {
59        time_sent: Instant,
60        ack_eliciting: bool,
61        in_flight: bool,
62        has_data: bool,
63        is_pmtud_probe: bool,
64        sent_bytes: usize,
65        frames: SmallVec<[frame::Frame; 1]>,
66    },
67    Acked,
68    Lost,
69}
70
71impl SentStatus {
72    fn ack(&mut self) -> Self {
73        std::mem::replace(self, SentStatus::Acked)
74    }
75
76    fn lose(&mut self) -> Self {
77        if !matches!(self, SentStatus::Acked) {
78            std::mem::replace(self, SentStatus::Lost)
79        } else {
80            SentStatus::Acked
81        }
82    }
83}
84
85#[derive(Default)]
86struct RecoveryEpoch {
87    /// The time the most recent ack-eliciting packet was sent.
88    time_of_last_ack_eliciting_packet: Option<Instant>,
89
90    /// The largest packet number acknowledged in the packet number space so
91    /// far.
92    largest_acked_packet: Option<u64>,
93
94    /// The time at which the next packet in that packet number space can be
95    /// considered lost based on exceeding the reordering window in time.
96    loss_time: Option<Instant>,
97
98    /// An association of packet numbers in a packet number space to information
99    /// about them.
100    sent_packets: VecDeque<SentPacket>,
101
102    loss_probes: usize,
103    pkts_in_flight: usize,
104
105    acked_frames: Vec<frame::Frame>,
106    lost_frames: Vec<frame::Frame>,
107
108    /// The largest packet number sent in the packet number space so far.
109    #[allow(dead_code)]
110    test_largest_sent_pkt_num_on_path: Option<u64>,
111}
112
113struct AckedDetectionResult {
114    acked_bytes: usize,
115    spurious_losses: usize,
116    spurious_pkt_thresh: Option<u64>,
117    has_ack_eliciting: bool,
118}
119
120struct LossDetectionResult {
121    lost_bytes: usize,
122    lost_packets: usize,
123
124    pmtud_lost_bytes: usize,
125    pmtud_lost_packets: SmallVec<[u64; 1]>,
126}
127
128impl RecoveryEpoch {
129    /// Discard the Epoch state and return the total size of unacked packets
130    /// that were discarded
131    fn discard(&mut self, cc: &mut impl CongestionControl) -> usize {
132        let unacked_bytes = self
133            .sent_packets
134            .drain(..)
135            .map(|p| {
136                if let SentPacket {
137                    status:
138                        SentStatus::Sent {
139                            in_flight,
140                            sent_bytes,
141                            ..
142                        },
143                    pkt_num,
144                } = p
145                {
146                    cc.on_packet_neutered(pkt_num);
147                    if in_flight {
148                        return sent_bytes;
149                    }
150                }
151                0
152            })
153            .sum();
154
155        std::mem::take(&mut self.sent_packets);
156        self.time_of_last_ack_eliciting_packet = None;
157        self.loss_time = None;
158        self.loss_probes = 0;
159        self.pkts_in_flight = 0;
160
161        unacked_bytes
162    }
163
164    // `peer_sent_ack_ranges` should not be used without validation.
165    fn detect_and_remove_acked_packets(
166        &mut self, peer_sent_ack_ranges: &RangeSet, newly_acked: &mut Vec<Acked>,
167        skip_pn: Option<u64>, trace_id: &str,
168    ) -> Result<AckedDetectionResult> {
169        newly_acked.clear();
170
171        let mut acked_bytes = 0;
172        let mut spurious_losses = 0;
173        let mut spurious_pkt_thresh = None;
174        let mut has_ack_eliciting = false;
175
176        let largest_ack_received = peer_sent_ack_ranges.last().unwrap();
177        let largest_acked = self
178            .largest_acked_packet
179            .unwrap_or(0)
180            .max(largest_ack_received);
181
182        for peer_sent_range in peer_sent_ack_ranges.iter() {
183            if skip_pn.is_some_and(|skip_pn| peer_sent_range.contains(&skip_pn)) {
184                // https://www.rfc-editor.org/rfc/rfc9000#section-13.1
185                // An endpoint SHOULD treat receipt of an acknowledgment
186                // for a packet it did not send as
187                // a connection error of type PROTOCOL_VIOLATION
188                return Err(Error::OptimisticAckDetected);
189            }
190
191            // Because packets always have incrementing numbers, they are always
192            // in sorted order.
193            let start = if self
194                .sent_packets
195                .front()
196                .filter(|e| e.pkt_num >= peer_sent_range.start)
197                .is_some()
198            {
199                // Usually it will be the first packet.
200                0
201            } else {
202                self.sent_packets
203                    .binary_search_by_key(&peer_sent_range.start, |p| p.pkt_num)
204                    .unwrap_or_else(|e| e)
205            };
206
207            for SentPacket { pkt_num, status } in
208                self.sent_packets.range_mut(start..)
209            {
210                if *pkt_num < peer_sent_range.end {
211                    match status.ack() {
212                        SentStatus::Sent {
213                            time_sent,
214                            in_flight,
215                            sent_bytes,
216                            frames,
217                            ack_eliciting,
218                            ..
219                        } => {
220                            if in_flight {
221                                self.pkts_in_flight -= 1;
222                                acked_bytes += sent_bytes;
223                            }
224                            newly_acked.push(Acked {
225                                pkt_num: *pkt_num,
226                                time_sent,
227                            });
228
229                            self.acked_frames.extend(frames);
230
231                            has_ack_eliciting |= ack_eliciting;
232
233                            trace!("{trace_id} packet newly acked {pkt_num}");
234                        },
235
236                        SentStatus::Acked => {},
237                        SentStatus::Lost => {
238                            // An acked packet was already declared lost
239                            spurious_losses += 1;
240                            spurious_pkt_thresh
241                                .get_or_insert(largest_acked - *pkt_num + 1);
242                        },
243                    }
244                } else {
245                    break;
246                }
247            }
248        }
249
250        self.drain_acked_and_lost_packets();
251
252        Ok(AckedDetectionResult {
253            acked_bytes,
254            spurious_losses,
255            spurious_pkt_thresh,
256            has_ack_eliciting,
257        })
258    }
259
260    fn detect_and_remove_lost_packets(
261        &mut self, loss_delay: Duration, pkt_thresh: u64, now: Instant,
262        newly_lost: &mut Vec<Lost>,
263    ) -> LossDetectionResult {
264        newly_lost.clear();
265        let mut lost_bytes = 0;
266        self.loss_time = None;
267
268        let lost_send_time = now.checked_sub(loss_delay).unwrap();
269        let largest_acked = self.largest_acked_packet.unwrap_or(0);
270        let mut pmtud_lost_bytes = 0;
271        let mut pmtud_lost_packets = SmallVec::new();
272
273        for SentPacket { pkt_num, status } in &mut self.sent_packets {
274            if *pkt_num > largest_acked {
275                break;
276            }
277
278            if let SentStatus::Sent { time_sent, .. } = status {
279                if *time_sent <= lost_send_time ||
280                    largest_acked >= *pkt_num + pkt_thresh
281                {
282                    if let SentStatus::Sent {
283                        in_flight,
284                        sent_bytes,
285                        frames,
286                        is_pmtud_probe,
287                        ..
288                    } = status.lose()
289                    {
290                        self.lost_frames.extend(frames);
291
292                        if in_flight {
293                            self.pkts_in_flight -= 1;
294
295                            if is_pmtud_probe {
296                                pmtud_lost_bytes += sent_bytes;
297                                pmtud_lost_packets.push(*pkt_num);
298                                // Do not track PMTUD probes losses
299                                continue;
300                            }
301
302                            lost_bytes += sent_bytes;
303                        }
304
305                        newly_lost.push(Lost {
306                            packet_number: *pkt_num,
307                            bytes_lost: sent_bytes,
308                        });
309                    }
310                } else {
311                    self.loss_time = Some(*time_sent + loss_delay);
312                    break;
313                }
314            }
315        }
316
317        LossDetectionResult {
318            lost_bytes,
319            lost_packets: newly_lost.len(),
320
321            pmtud_lost_bytes,
322            pmtud_lost_packets,
323        }
324    }
325
326    /// Remove packets that were already handled from the front of the queue,
327    /// but avoid removing packets from the middle of the queue to avoid
328    /// compaction
329    fn drain_acked_and_lost_packets(&mut self) {
330        while let Some(SentPacket {
331            status: SentStatus::Acked | SentStatus::Lost,
332            ..
333        }) = self.sent_packets.front()
334        {
335            self.sent_packets.pop_front();
336        }
337    }
338
339    fn least_unacked(&self) -> u64 {
340        for pkt in self.sent_packets.iter() {
341            if let SentPacket {
342                pkt_num,
343                status: SentStatus::Sent { .. },
344            } = pkt
345            {
346                return *pkt_num;
347            }
348        }
349
350        self.largest_acked_packet.unwrap_or(0) + 1
351    }
352}
353
354pub struct GRecovery {
355    epochs: [RecoveryEpoch; packet::Epoch::count()],
356
357    loss_timer: LossDetectionTimer,
358
359    pto_count: u32,
360
361    rtt_stats: RttStats,
362
363    recovery_stats: RecoveryStats,
364
365    pub lost_count: usize,
366
367    pub lost_spurious_count: usize,
368
369    pkt_thresh: u64,
370
371    time_thresh: f64,
372
373    bytes_in_flight: BytesInFlight,
374
375    bytes_sent: usize,
376
377    pub bytes_lost: u64,
378
379    max_datagram_size: usize,
380
381    #[cfg(feature = "qlog")]
382    qlog_metrics: QlogMetrics,
383
384    #[cfg(feature = "qlog")]
385    qlog_prev_cc_state: &'static str,
386
387    /// How many non-ack-eliciting packets have been sent.
388    outstanding_non_ack_eliciting: usize,
389
390    /// A resusable list of acks.
391    newly_acked: Vec<Acked>,
392
393    /// A [`Vec`] that can be reused for calls of
394    /// [`Self::detect_and_remove_lost_packets`] to avoid allocations
395    lost_reuse: Vec<Lost>,
396
397    pacer: Pacer,
398}
399
400impl GRecovery {
401    pub fn new(recovery_config: &RecoveryConfig) -> Option<Self> {
402        let cc = match recovery_config.cc_algorithm {
403            CongestionControlAlgorithm::Bbr2Gcongestion => Congestion::bbrv2(
404                recovery_config.initial_congestion_window_packets,
405                MAX_WINDOW_PACKETS,
406                recovery_config,
407            ),
408            _ => return None,
409        };
410
411        Some(Self {
412            epochs: Default::default(),
413            rtt_stats: RttStats::new(recovery_config.max_ack_delay),
414            recovery_stats: RecoveryStats::default(),
415            loss_timer: Default::default(),
416            pto_count: 0,
417
418            lost_count: 0,
419            lost_spurious_count: 0,
420
421            pkt_thresh: INITIAL_PACKET_THRESHOLD,
422            time_thresh: INITIAL_TIME_THRESHOLD,
423
424            bytes_in_flight: Default::default(),
425            bytes_sent: 0,
426            bytes_lost: 0,
427
428            max_datagram_size: recovery_config.max_send_udp_payload_size,
429
430            #[cfg(feature = "qlog")]
431            qlog_metrics: QlogMetrics::default(),
432
433            #[cfg(feature = "qlog")]
434            qlog_prev_cc_state: "",
435
436            outstanding_non_ack_eliciting: 0,
437
438            pacer: Pacer::new(
439                recovery_config.pacing,
440                cc,
441                recovery_config
442                    .max_pacing_rate
443                    .map(Bandwidth::from_mbits_per_second),
444            ),
445
446            newly_acked: Vec::new(),
447            lost_reuse: Vec::new(),
448        })
449    }
450
451    fn detect_and_remove_lost_packets(
452        &mut self, epoch: packet::Epoch, now: Instant,
453    ) -> (usize, usize) {
454        let lost = &mut self.lost_reuse;
455
456        let LossDetectionResult {
457            lost_bytes,
458            lost_packets,
459
460            pmtud_lost_bytes,
461            pmtud_lost_packets,
462        } = self.epochs[epoch].detect_and_remove_lost_packets(
463            self.rtt_stats.loss_delay(self.time_thresh),
464            self.pkt_thresh,
465            now,
466            lost,
467        );
468
469        self.bytes_in_flight
470            .saturating_subtract(lost_bytes + pmtud_lost_bytes, now);
471
472        for pkt in pmtud_lost_packets {
473            self.pacer.on_packet_neutered(pkt);
474        }
475
476        (lost_bytes, lost_packets)
477    }
478
479    fn loss_time_and_space(&self) -> (Option<Instant>, packet::Epoch) {
480        let mut epoch = packet::Epoch::Initial;
481        let mut time = self.epochs[epoch].loss_time;
482
483        // Iterate over all packet number spaces starting from Handshake.
484        for e in [packet::Epoch::Handshake, packet::Epoch::Application] {
485            let new_time = self.epochs[e].loss_time;
486            if time.is_none() || new_time < time {
487                time = new_time;
488                epoch = e;
489            }
490        }
491
492        (time, epoch)
493    }
494
495    fn pto_time_and_space(
496        &self, handshake_status: HandshakeStatus, now: Instant,
497    ) -> (Option<Instant>, packet::Epoch) {
498        let mut duration = self.pto() * (1 << self.pto_count);
499
500        // Arm PTO from now when there are no inflight packets.
501        if self.bytes_in_flight.is_zero() {
502            if handshake_status.has_handshake_keys {
503                return (Some(now + duration), packet::Epoch::Handshake);
504            } else {
505                return (Some(now + duration), packet::Epoch::Initial);
506            }
507        }
508
509        let mut pto_timeout = None;
510        let mut pto_space = packet::Epoch::Initial;
511
512        // Iterate over all packet number spaces.
513        for &e in packet::Epoch::epochs(
514            packet::Epoch::Initial..=packet::Epoch::Application,
515        ) {
516            if self.epochs[e].pkts_in_flight == 0 {
517                continue;
518            }
519
520            if e == packet::Epoch::Application {
521                // Skip Application Data until handshake completes.
522                if !handshake_status.completed {
523                    return (pto_timeout, pto_space);
524                }
525
526                // Include max_ack_delay and backoff for Application Data.
527                duration +=
528                    self.rtt_stats.max_ack_delay * 2_u32.pow(self.pto_count);
529            }
530
531            let new_time = self.epochs[e]
532                .time_of_last_ack_eliciting_packet
533                .map(|t| t + duration);
534
535            if pto_timeout.is_none() || new_time < pto_timeout {
536                pto_timeout = new_time;
537                pto_space = e;
538            }
539        }
540
541        (pto_timeout, pto_space)
542    }
543
544    fn set_loss_detection_timer(
545        &mut self, handshake_status: HandshakeStatus, now: Instant,
546    ) {
547        if let (Some(earliest_loss_time), _) = self.loss_time_and_space() {
548            // Time threshold loss detection.
549            self.loss_timer.update(earliest_loss_time);
550            return;
551        }
552
553        if self.bytes_in_flight.is_zero() &&
554            handshake_status.peer_verified_address
555        {
556            self.loss_timer.clear();
557            return;
558        }
559
560        // PTO timer.
561        if let (Some(timeout), _) = self.pto_time_and_space(handshake_status, now)
562        {
563            self.loss_timer.update(timeout);
564        }
565    }
566}
567
568impl RecoveryOps for GRecovery {
569    fn lost_count(&self) -> usize {
570        self.lost_count
571    }
572
573    fn bytes_lost(&self) -> u64 {
574        self.bytes_lost
575    }
576
577    fn should_elicit_ack(&self, epoch: packet::Epoch) -> bool {
578        self.epochs[epoch].loss_probes > 0 ||
579            self.outstanding_non_ack_eliciting >=
580                MAX_OUTSTANDING_NON_ACK_ELICITING
581    }
582
583    fn get_acked_frames(&mut self, epoch: packet::Epoch) -> Vec<frame::Frame> {
584        std::mem::take(&mut self.epochs[epoch].acked_frames)
585    }
586
587    fn get_lost_frames(&mut self, epoch: packet::Epoch) -> Vec<frame::Frame> {
588        std::mem::take(&mut self.epochs[epoch].lost_frames)
589    }
590
591    fn get_largest_acked_on_epoch(&self, epoch: packet::Epoch) -> Option<u64> {
592        self.epochs[epoch].largest_acked_packet
593    }
594
595    fn has_lost_frames(&self, epoch: packet::Epoch) -> bool {
596        !self.epochs[epoch].lost_frames.is_empty()
597    }
598
599    fn loss_probes(&self, epoch: packet::Epoch) -> usize {
600        self.epochs[epoch].loss_probes
601    }
602
603    #[cfg(test)]
604    fn inc_loss_probes(&mut self, epoch: packet::Epoch) {
605        self.epochs[epoch].loss_probes += 1;
606    }
607
608    fn ping_sent(&mut self, epoch: packet::Epoch) {
609        self.epochs[epoch].loss_probes =
610            self.epochs[epoch].loss_probes.saturating_sub(1);
611    }
612
613    fn on_packet_sent(
614        &mut self, pkt: Sent, epoch: packet::Epoch,
615        handshake_status: HandshakeStatus, now: Instant, trace_id: &str,
616    ) {
617        let time_sent = self.get_next_release_time().time(now).unwrap_or(now);
618
619        let epoch = &mut self.epochs[epoch];
620
621        let ack_eliciting = pkt.ack_eliciting;
622        let in_flight = pkt.in_flight;
623        let is_pmtud_probe = pkt.is_pmtud_probe;
624        let pkt_num = pkt.pkt_num;
625        let sent_bytes = pkt.size;
626
627        if let Some(SentPacket { pkt_num, .. }) = epoch.sent_packets.back() {
628            assert!(*pkt_num < pkt.pkt_num, "Packet numbers must increase");
629        }
630
631        let status = SentStatus::Sent {
632            time_sent,
633            ack_eliciting,
634            in_flight,
635            is_pmtud_probe,
636            has_data: pkt.has_data,
637            sent_bytes,
638            frames: pkt.frames,
639        };
640
641        #[cfg(test)]
642        {
643            epoch.test_largest_sent_pkt_num_on_path = epoch
644                .test_largest_sent_pkt_num_on_path
645                .max(Some(pkt.pkt_num));
646        }
647
648        epoch.sent_packets.push_back(SentPacket { pkt_num, status });
649
650        if ack_eliciting {
651            epoch.time_of_last_ack_eliciting_packet = Some(time_sent);
652            self.outstanding_non_ack_eliciting = 0;
653        } else {
654            self.outstanding_non_ack_eliciting += 1;
655        }
656
657        if in_flight {
658            self.pacer.on_packet_sent(
659                time_sent,
660                self.bytes_in_flight.get(),
661                pkt_num,
662                sent_bytes,
663                pkt.has_data,
664                &self.rtt_stats,
665            );
666
667            self.bytes_in_flight.add(sent_bytes, now);
668            epoch.pkts_in_flight += 1;
669            self.set_loss_detection_timer(handshake_status, time_sent);
670        }
671
672        self.bytes_sent += sent_bytes;
673
674        trace!("{trace_id} {self:?}");
675    }
676
677    fn get_packet_send_time(&self, now: Instant) -> Instant {
678        self.pacer.get_next_release_time().time(now).unwrap_or(now)
679    }
680
681    // `peer_sent_ack_ranges` should not be used without validation.
682    fn on_ack_received(
683        &mut self, peer_sent_ack_ranges: &RangeSet, ack_delay: u64,
684        epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant,
685        skip_pn: Option<u64>, trace_id: &str,
686    ) -> Result<OnAckReceivedOutcome> {
687        let prior_in_flight = self.bytes_in_flight.get();
688
689        let AckedDetectionResult {
690            acked_bytes,
691            spurious_losses,
692            spurious_pkt_thresh,
693            has_ack_eliciting,
694        } = self.epochs[epoch].detect_and_remove_acked_packets(
695            peer_sent_ack_ranges,
696            &mut self.newly_acked,
697            skip_pn,
698            trace_id,
699        )?;
700
701        self.lost_spurious_count += spurious_losses;
702        if let Some(thresh) = spurious_pkt_thresh {
703            self.pkt_thresh =
704                self.pkt_thresh.max(thresh.min(MAX_PACKET_THRESHOLD));
705        }
706
707        if self.newly_acked.is_empty() {
708            return Ok(OnAckReceivedOutcome::default());
709        }
710
711        self.bytes_in_flight.saturating_subtract(acked_bytes, now);
712
713        let largest_newly_acked = self.newly_acked.last().unwrap();
714
715        // Update `largest_acked_packet` based on the validated `newly_acked`
716        // value.
717        let largest_acked_pkt_num = self.epochs[epoch]
718            .largest_acked_packet
719            .unwrap_or(0)
720            .max(largest_newly_acked.pkt_num);
721        self.epochs[epoch].largest_acked_packet = Some(largest_acked_pkt_num);
722
723        // Check if largest packet is newly acked.
724        let update_rtt = largest_newly_acked.pkt_num == largest_acked_pkt_num &&
725            has_ack_eliciting;
726        if update_rtt {
727            let latest_rtt = now - largest_newly_acked.time_sent;
728            self.rtt_stats.update_rtt(
729                latest_rtt,
730                Duration::from_micros(ack_delay),
731                now,
732                handshake_status.completed,
733            );
734        }
735
736        let (lost_bytes, lost_packets) =
737            self.detect_and_remove_lost_packets(epoch, now);
738
739        self.pacer.on_congestion_event(
740            update_rtt,
741            prior_in_flight,
742            self.bytes_in_flight.get(),
743            now,
744            &self.newly_acked,
745            &self.lost_reuse,
746            self.epochs[epoch].least_unacked(),
747            &self.rtt_stats,
748            &mut self.recovery_stats,
749        );
750
751        self.pto_count = 0;
752        self.lost_count += lost_packets;
753
754        self.set_loss_detection_timer(handshake_status, now);
755
756        trace!("{trace_id} {self:?}");
757
758        Ok(OnAckReceivedOutcome {
759            lost_packets,
760            lost_bytes,
761            acked_bytes,
762            spurious_losses,
763        })
764    }
765
766    fn on_loss_detection_timeout(
767        &mut self, handshake_status: HandshakeStatus, now: Instant,
768        trace_id: &str,
769    ) -> OnLossDetectionTimeoutOutcome {
770        let (earliest_loss_time, epoch) = self.loss_time_and_space();
771
772        if earliest_loss_time.is_some() {
773            let prior_in_flight = self.bytes_in_flight.get();
774
775            let (lost_bytes, lost_packets) =
776                self.detect_and_remove_lost_packets(epoch, now);
777
778            self.pacer.on_congestion_event(
779                false,
780                prior_in_flight,
781                self.bytes_in_flight.get(),
782                now,
783                &[],
784                &self.lost_reuse,
785                self.epochs[epoch].least_unacked(),
786                &self.rtt_stats,
787                &mut self.recovery_stats,
788            );
789
790            self.lost_count += lost_packets;
791
792            self.set_loss_detection_timer(handshake_status, now);
793
794            trace!("{trace_id} {self:?}");
795            return OnLossDetectionTimeoutOutcome {
796                lost_packets,
797                lost_bytes,
798            };
799        }
800
801        let epoch = if self.bytes_in_flight.get() > 0 {
802            // Send new data if available, else retransmit old data. If neither
803            // is available, send a single PING frame.
804            let (_, e) = self.pto_time_and_space(handshake_status, now);
805
806            e
807        } else {
808            // Client sends an anti-deadlock packet: Initial is padded to earn
809            // more anti-amplification credit, a Handshake packet proves address
810            // ownership.
811            if handshake_status.has_handshake_keys {
812                packet::Epoch::Handshake
813            } else {
814                packet::Epoch::Initial
815            }
816        };
817
818        self.pto_count += 1;
819
820        let epoch = &mut self.epochs[epoch];
821
822        epoch.loss_probes = MAX_PTO_PROBES_COUNT.min(self.pto_count as usize);
823
824        // Skip packets that have already been acked or lost, and packets
825        // that don't contain either CRYPTO or STREAM frames and only return as
826        // many packets as the number of probe packets that will be sent.
827        let unacked_frames = epoch
828            .sent_packets
829            .iter_mut()
830            .filter_map(|p| {
831                if let SentStatus::Sent {
832                    has_data: true,
833                    frames,
834                    ..
835                } = &p.status
836                {
837                    Some(frames)
838                } else {
839                    None
840                }
841            })
842            .take(epoch.loss_probes)
843            .flatten()
844            .filter(|f| !matches!(f, frame::Frame::DatagramHeader { .. }));
845
846        // Retransmit the frames from the oldest sent packets on PTO. However
847        // the packets are not actually declared lost (so there is no effect to
848        // congestion control), we just reschedule the data they carried.
849        //
850        // This will also trigger sending an ACK and retransmitting frames like
851        // HANDSHAKE_DONE and MAX_DATA / MAX_STREAM_DATA as well, in addition
852        // to CRYPTO and STREAM, if the original packet carried them.
853        epoch.lost_frames.extend(unacked_frames.cloned());
854
855        self.pacer
856            .on_retransmission_timeout(!epoch.lost_frames.is_empty());
857
858        self.set_loss_detection_timer(handshake_status, now);
859
860        trace!("{trace_id} {self:?}");
861        OnLossDetectionTimeoutOutcome {
862            lost_packets: 0,
863            lost_bytes: 0,
864        }
865    }
866
867    fn on_pkt_num_space_discarded(
868        &mut self, epoch: packet::Epoch, handshake_status: HandshakeStatus,
869        now: Instant,
870    ) {
871        let epoch = &mut self.epochs[epoch];
872        self.bytes_in_flight
873            .saturating_subtract(epoch.discard(&mut self.pacer), now);
874        self.set_loss_detection_timer(handshake_status, now);
875    }
876
877    fn on_path_change(
878        &mut self, epoch: packet::Epoch, now: Instant, _trace_id: &str,
879    ) -> (usize, usize) {
880        let (lost_bytes, lost_packets) =
881            self.detect_and_remove_lost_packets(epoch, now);
882
883        (lost_packets, lost_bytes)
884    }
885
886    fn loss_detection_timer(&self) -> Option<Instant> {
887        self.loss_timer.time
888    }
889
890    fn cwnd(&self) -> usize {
891        self.pacer.get_congestion_window()
892    }
893
894    fn cwnd_available(&self) -> usize {
895        // Ignore cwnd when sending probe packets.
896        if self.epochs.iter().any(|e| e.loss_probes > 0) {
897            return usize::MAX;
898        }
899
900        self.cwnd().saturating_sub(self.bytes_in_flight.get())
901    }
902
903    fn rtt(&self) -> Duration {
904        self.rtt_stats.rtt()
905    }
906
907    fn min_rtt(&self) -> Option<Duration> {
908        self.rtt_stats.min_rtt()
909    }
910
911    fn max_rtt(&self) -> Option<Duration> {
912        self.rtt_stats.max_rtt()
913    }
914
915    fn rttvar(&self) -> Duration {
916        self.rtt_stats.rttvar()
917    }
918
919    fn pto(&self) -> Duration {
920        let r = &self.rtt_stats;
921        r.rtt() + (r.rttvar() * 4).max(GRANULARITY)
922    }
923
924    /// The most recent data delivery rate estimate.
925    fn delivery_rate(&self) -> Bandwidth {
926        self.pacer.bandwidth_estimate(&self.rtt_stats)
927    }
928
929    /// Statistics from when a CCA first exited the startup phase.
930    fn startup_exit(&self) -> Option<StartupExit> {
931        self.recovery_stats.startup_exit
932    }
933
934    fn max_datagram_size(&self) -> usize {
935        self.max_datagram_size
936    }
937
938    fn pmtud_update_max_datagram_size(&mut self, new_max_datagram_size: usize) {
939        self.max_datagram_size = new_max_datagram_size;
940        self.pacer.update_mss(self.max_datagram_size);
941    }
942
943    fn update_max_datagram_size(&mut self, new_max_datagram_size: usize) {
944        self.pmtud_update_max_datagram_size(
945            self.max_datagram_size.min(new_max_datagram_size),
946        )
947    }
948
949    // FIXME only used by gcongestion
950    fn on_app_limited(&mut self) {
951        self.pacer.on_app_limited(self.bytes_in_flight.get())
952    }
953
954    #[cfg(test)]
955    fn sent_packets_len(&self, epoch: packet::Epoch) -> usize {
956        self.epochs[epoch].sent_packets.len()
957    }
958
959    #[cfg(test)]
960    fn in_flight_count(&self, epoch: packet::Epoch) -> usize {
961        self.epochs[epoch].pkts_in_flight
962    }
963
964    #[cfg(test)]
965    fn bytes_in_flight(&self) -> usize {
966        self.bytes_in_flight.get()
967    }
968
969    fn bytes_in_flight_duration(&self) -> Duration {
970        self.bytes_in_flight.get_duration()
971    }
972
973    #[cfg(test)]
974    fn pacing_rate(&self) -> u64 {
975        self.pacer
976            .pacing_rate(self.bytes_in_flight.get(), &self.rtt_stats)
977            .to_bytes_per_period(Duration::from_secs(1))
978    }
979
980    #[cfg(test)]
981    fn pto_count(&self) -> u32 {
982        self.pto_count
983    }
984
985    #[cfg(test)]
986    fn pkt_thresh(&self) -> u64 {
987        self.pkt_thresh
988    }
989
990    #[cfg(test)]
991    fn lost_spurious_count(&self) -> usize {
992        self.lost_spurious_count
993    }
994
995    #[cfg(test)]
996    fn detect_lost_packets_for_test(
997        &mut self, epoch: packet::Epoch, now: Instant,
998    ) -> (usize, usize) {
999        let ret = self.detect_and_remove_lost_packets(epoch, now);
1000        self.epochs[epoch].drain_acked_and_lost_packets();
1001        ret
1002    }
1003
1004    #[cfg(test)]
1005    fn largest_sent_pkt_num_on_path(&self, epoch: packet::Epoch) -> Option<u64> {
1006        self.epochs[epoch].test_largest_sent_pkt_num_on_path
1007    }
1008
1009    #[cfg(test)]
1010    fn app_limited(&self) -> bool {
1011        self.pacer.is_app_limited(self.bytes_in_flight.get())
1012    }
1013
1014    // FIXME only used by congestion
1015    fn update_app_limited(&mut self, _v: bool) {
1016        // TODO
1017    }
1018
1019    // FIXME only used by congestion
1020    fn delivery_rate_update_app_limited(&mut self, _v: bool) {
1021        // TODO
1022    }
1023
1024    fn update_max_ack_delay(&mut self, max_ack_delay: Duration) {
1025        self.rtt_stats.max_ack_delay = max_ack_delay;
1026    }
1027
1028    fn get_next_release_time(&self) -> ReleaseDecision {
1029        self.pacer.get_next_release_time()
1030    }
1031
1032    fn gcongestion_enabled(&self) -> bool {
1033        true
1034    }
1035
1036    #[cfg(feature = "qlog")]
1037    fn state_str(&self, _now: Instant) -> &'static str {
1038        self.pacer.state_str()
1039    }
1040
1041    #[cfg(feature = "qlog")]
1042    fn get_updated_qlog_event_data(&mut self) -> Option<EventData> {
1043        let qlog_metrics = QlogMetrics {
1044            min_rtt: *self.rtt_stats.min_rtt,
1045            smoothed_rtt: self.rtt(),
1046            latest_rtt: self.rtt_stats.latest_rtt(),
1047            rttvar: self.rtt_stats.rttvar(),
1048            cwnd: self.cwnd() as u64,
1049            bytes_in_flight: self.bytes_in_flight.get() as u64,
1050            ssthresh: self.pacer.ssthresh(),
1051            pacing_rate: self.delivery_rate().to_bytes_per_second(),
1052        };
1053
1054        self.qlog_metrics.maybe_update(qlog_metrics)
1055    }
1056
1057    #[cfg(feature = "qlog")]
1058    fn get_updated_qlog_cc_state(
1059        &mut self, now: Instant,
1060    ) -> Option<&'static str> {
1061        let cc_state = self.state_str(now);
1062        if cc_state != self.qlog_prev_cc_state {
1063            self.qlog_prev_cc_state = cc_state;
1064            Some(cc_state)
1065        } else {
1066            None
1067        }
1068    }
1069
1070    fn send_quantum(&self) -> usize {
1071        let pacing_rate = self
1072            .pacer
1073            .pacing_rate(self.bytes_in_flight.get(), &self.rtt_stats);
1074
1075        let floor = if pacing_rate < Bandwidth::from_kbits_per_second(1200) {
1076            self.max_datagram_size
1077        } else {
1078            2 * self.max_datagram_size
1079        };
1080
1081        pacing_rate
1082            .to_bytes_per_period(ReleaseDecision::EQUAL_THRESHOLD)
1083            .min(64 * 1024)
1084            .max(floor as u64) as usize
1085    }
1086}
1087
1088impl std::fmt::Debug for GRecovery {
1089    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1090        write!(f, "timer={:?} ", self.loss_detection_timer())?;
1091        write!(f, "rtt_stats={:?} ", self.rtt_stats)?;
1092        write!(f, "bytes_in_flight={} ", self.bytes_in_flight.get())?;
1093        write!(f, "{:?} ", self.pacer)?;
1094        Ok(())
1095    }
1096}