Skip to main content

quiche/recovery/congestion/
recovery.rs

1// Copyright (C) 2018-2019, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::cmp;
28
29use std::time::Duration;
30use std::time::Instant;
31
32use std::collections::VecDeque;
33
34use super::RecoveryConfig;
35use super::Sent;
36
37use crate::packet::Epoch;
38use crate::ranges::RangeSet;
39use crate::recovery::Bandwidth;
40use crate::recovery::HandshakeStatus;
41use crate::recovery::OnLossDetectionTimeoutOutcome;
42use crate::recovery::RecoveryOps;
43use crate::recovery::StartupExit;
44use crate::Error;
45use crate::Result;
46
47#[cfg(feature = "qlog")]
48use crate::recovery::QlogMetrics;
49
50use crate::frame;
51
52#[cfg(feature = "qlog")]
53use qlog::events::EventData;
54
55use super::Congestion;
56use crate::recovery::bytes_in_flight::BytesInFlight;
57use crate::recovery::rtt::RttStats;
58use crate::recovery::LossDetectionTimer;
59use crate::recovery::OnAckReceivedOutcome;
60use crate::recovery::ReleaseDecision;
61use crate::recovery::ReleaseTime;
62use crate::recovery::GRANULARITY;
63use crate::recovery::INITIAL_PACKET_THRESHOLD;
64use crate::recovery::INITIAL_TIME_THRESHOLD;
65use crate::recovery::MAX_OUTSTANDING_NON_ACK_ELICITING;
66use crate::recovery::MAX_PACKET_THRESHOLD;
67use crate::recovery::MAX_PTO_PROBES_COUNT;
68use crate::recovery::PACKET_REORDER_TIME_THRESHOLD;
69
70#[derive(Default)]
71struct RecoveryEpoch {
72    /// The time the most recent ack-eliciting packet was sent.
73    time_of_last_ack_eliciting_packet: Option<Instant>,
74
75    /// The largest packet number acknowledged in the packet number space so
76    /// far.
77    largest_acked_packet: Option<u64>,
78
79    /// The time at which the next packet in that packet number space can be
80    /// considered lost based on exceeding the reordering window in time.
81    loss_time: Option<Instant>,
82
83    /// An association of packet numbers in a packet number space to information
84    /// about them.
85    sent_packets: VecDeque<Sent>,
86
87    loss_probes: usize,
88    in_flight_count: usize,
89
90    acked_frames: Vec<frame::Frame>,
91    lost_frames: Vec<frame::Frame>,
92
93    /// The largest packet number sent in the packet number space so far.
94    #[cfg(test)]
95    test_largest_sent_pkt_num_on_path: Option<u64>,
96}
97
98struct AckedDetectionResult {
99    acked_bytes: usize,
100    spurious_losses: usize,
101    spurious_pkt_thresh: Option<u64>,
102    has_ack_eliciting: bool,
103    has_in_flight_spurious_loss: bool,
104}
105
106struct LossDetectionResult {
107    largest_lost_pkt: Option<Sent>,
108    lost_packets: usize,
109    lost_bytes: usize,
110    pmtud_lost_bytes: usize,
111}
112
113impl RecoveryEpoch {
114    // `peer_sent_ack_ranges` should not be used without validation.
115    fn detect_and_remove_acked_packets(
116        &mut self, now: Instant, peer_sent_ack_ranges: &RangeSet,
117        newly_acked: &mut Vec<Acked>, rtt_stats: &RttStats, skip_pn: Option<u64>,
118        trace_id: &str,
119    ) -> Result<AckedDetectionResult> {
120        newly_acked.clear();
121
122        let mut acked_bytes = 0;
123        let mut spurious_losses = 0;
124        let mut spurious_pkt_thresh = None;
125        let mut has_ack_eliciting = false;
126        let mut has_in_flight_spurious_loss = false;
127
128        let largest_ack_received = peer_sent_ack_ranges
129            .last()
130            .expect("ACK frames should always have at least one ack range");
131        let largest_acked = self
132            .largest_acked_packet
133            .unwrap_or(0)
134            .max(largest_ack_received);
135
136        for peer_sent_range in peer_sent_ack_ranges.iter() {
137            if skip_pn.is_some_and(|skip_pn| peer_sent_range.contains(&skip_pn)) {
138                // https://www.rfc-editor.org/rfc/rfc9000#section-13.1
139                // An endpoint SHOULD treat receipt of an acknowledgment
140                // for a packet it did not send as
141                // a connection error of type PROTOCOL_VIOLATION
142                return Err(Error::OptimisticAckDetected);
143            }
144
145            // Because packets always have incrementing numbers, they are always
146            // in sorted order.
147            let start = if self
148                .sent_packets
149                .front()
150                .filter(|e| e.pkt_num >= peer_sent_range.start)
151                .is_some()
152            {
153                // Usually it will be the first packet.
154                0
155            } else {
156                self.sent_packets
157                    .binary_search_by_key(&peer_sent_range.start, |p| p.pkt_num)
158                    .unwrap_or_else(|e| e)
159            };
160
161            for unacked in self.sent_packets.range_mut(start..) {
162                if unacked.pkt_num >= peer_sent_range.end {
163                    break;
164                }
165
166                if unacked.time_acked.is_some() {
167                    // Already acked.
168                } else if unacked.time_lost.is_some() {
169                    // An acked packet was already declared lost.
170                    spurious_losses += 1;
171                    spurious_pkt_thresh
172                        .get_or_insert(largest_acked - unacked.pkt_num + 1);
173                    unacked.time_acked = Some(now);
174
175                    if unacked.in_flight {
176                        has_in_flight_spurious_loss = true;
177                    }
178                } else {
179                    if unacked.in_flight {
180                        self.in_flight_count -= 1;
181                        acked_bytes += unacked.size;
182                    }
183
184                    newly_acked.push(Acked {
185                        pkt_num: unacked.pkt_num,
186                        time_sent: unacked.time_sent,
187                        size: unacked.size,
188
189                        rtt: now.saturating_duration_since(unacked.time_sent),
190                        delivered: unacked.delivered,
191                        delivered_time: unacked.delivered_time,
192                        first_sent_time: unacked.first_sent_time,
193                        is_app_limited: unacked.is_app_limited,
194                    });
195
196                    trace!("{} packet newly acked {}", trace_id, unacked.pkt_num);
197
198                    self.acked_frames
199                        .extend(std::mem::take(&mut unacked.frames));
200
201                    has_ack_eliciting |= unacked.ack_eliciting;
202                    unacked.time_acked = Some(now);
203                }
204            }
205        }
206
207        self.drain_acked_and_lost_packets(now - rtt_stats.rtt());
208
209        Ok(AckedDetectionResult {
210            acked_bytes,
211            spurious_losses,
212            spurious_pkt_thresh,
213            has_ack_eliciting,
214            has_in_flight_spurious_loss,
215        })
216    }
217
218    fn detect_lost_packets(
219        &mut self, loss_delay: Duration, pkt_thresh: u64, now: Instant,
220        trace_id: &str, epoch: Epoch,
221    ) -> LossDetectionResult {
222        self.loss_time = None;
223
224        // Minimum time of kGranularity before packets are deemed lost.
225        let loss_delay = cmp::max(loss_delay, GRANULARITY);
226        let largest_acked = self.largest_acked_packet.unwrap_or(0);
227
228        // Packets sent before this time are deemed lost.
229        let lost_send_time = now.checked_sub(loss_delay).unwrap();
230
231        let mut lost_packets = 0;
232        let mut lost_bytes = 0;
233        let mut pmtud_lost_bytes = 0;
234
235        let mut largest_lost_pkt = None;
236
237        let unacked_iter = self.sent_packets
238        .iter_mut()
239        // Skip packets that follow the largest acked packet.
240        .take_while(|p| p.pkt_num <= largest_acked)
241        // Skip packets that have already been acked or lost.
242        .filter(|p| p.time_acked.is_none() && p.time_lost.is_none());
243
244        for unacked in unacked_iter {
245            // Mark packet as lost, or set time when it should be marked.
246            if unacked.time_sent <= lost_send_time ||
247                largest_acked >= unacked.pkt_num + pkt_thresh
248            {
249                self.lost_frames.extend(unacked.frames.drain(..));
250
251                unacked.time_lost = Some(now);
252
253                if unacked.is_pmtud_probe {
254                    pmtud_lost_bytes += unacked.size;
255                    self.in_flight_count -= 1;
256
257                    // Do not track PMTUD probes losses.
258                    continue;
259                }
260
261                if unacked.in_flight {
262                    lost_bytes += unacked.size;
263
264                    // Frames have already been removed from the packet, so
265                    // cloning the whole packet should be relatively cheap.
266                    largest_lost_pkt = Some(unacked.clone());
267
268                    self.in_flight_count -= 1;
269
270                    trace!(
271                        "{} packet {} lost on epoch {}",
272                        trace_id,
273                        unacked.pkt_num,
274                        epoch
275                    );
276                }
277
278                lost_packets += 1;
279            } else {
280                let loss_time = match self.loss_time {
281                    None => unacked.time_sent + loss_delay,
282
283                    Some(loss_time) =>
284                        cmp::min(loss_time, unacked.time_sent + loss_delay),
285                };
286
287                self.loss_time = Some(loss_time);
288                break;
289            }
290        }
291
292        LossDetectionResult {
293            largest_lost_pkt,
294            lost_packets,
295            lost_bytes,
296            pmtud_lost_bytes,
297        }
298    }
299
300    fn drain_acked_and_lost_packets(&mut self, loss_thresh: Instant) {
301        // In order to avoid removing elements from the middle of the list
302        // (which would require copying other elements to compact the list),
303        // we only remove a contiguous range of elements from the start of the
304        // list.
305        //
306        // This means that acked or lost elements coming after this will not
307        // be removed at this point, but their removal is delayed for a later
308        // time, once the gaps have been filled.
309        while let Some(pkt) = self.sent_packets.front() {
310            if let Some(time_lost) = pkt.time_lost {
311                if time_lost > loss_thresh {
312                    break;
313                }
314            }
315
316            if pkt.time_acked.is_none() && pkt.time_lost.is_none() {
317                break;
318            }
319
320            self.sent_packets.pop_front();
321        }
322    }
323}
324
325pub struct LegacyRecovery {
326    epochs: [RecoveryEpoch; Epoch::count()],
327
328    loss_timer: LossDetectionTimer,
329
330    pto_count: u32,
331
332    rtt_stats: RttStats,
333
334    lost_spurious_count: usize,
335
336    pkt_thresh: u64,
337
338    time_thresh: f64,
339
340    bytes_in_flight: BytesInFlight,
341
342    bytes_sent: usize,
343
344    bytes_lost: u64,
345
346    pub max_datagram_size: usize,
347
348    #[cfg(feature = "qlog")]
349    qlog_metrics: QlogMetrics,
350
351    #[cfg(feature = "qlog")]
352    qlog_prev_cc_state: &'static str,
353
354    /// How many non-ack-eliciting packets have been sent.
355    outstanding_non_ack_eliciting: usize,
356
357    pub congestion: Congestion,
358
359    /// A resusable list of acks.
360    newly_acked: Vec<Acked>,
361}
362
363impl LegacyRecovery {
364    pub fn new_with_config(recovery_config: &RecoveryConfig) -> Self {
365        Self {
366            epochs: Default::default(),
367
368            loss_timer: Default::default(),
369
370            pto_count: 0,
371
372            rtt_stats: RttStats::new(
373                recovery_config.initial_rtt,
374                recovery_config.max_ack_delay,
375            ),
376
377            lost_spurious_count: 0,
378
379            pkt_thresh: INITIAL_PACKET_THRESHOLD,
380
381            time_thresh: INITIAL_TIME_THRESHOLD,
382
383            bytes_in_flight: Default::default(),
384
385            bytes_sent: 0,
386
387            bytes_lost: 0,
388
389            max_datagram_size: recovery_config.max_send_udp_payload_size,
390
391            #[cfg(feature = "qlog")]
392            qlog_metrics: QlogMetrics::default(),
393
394            #[cfg(feature = "qlog")]
395            qlog_prev_cc_state: "",
396
397            outstanding_non_ack_eliciting: 0,
398
399            congestion: Congestion::from_config(recovery_config),
400
401            newly_acked: Vec::new(),
402        }
403    }
404
405    #[cfg(test)]
406    pub fn new(config: &crate::Config) -> Self {
407        Self::new_with_config(&RecoveryConfig::from_config(config))
408    }
409
410    fn loss_time_and_space(&self) -> (Option<Instant>, Epoch) {
411        let mut epoch = Epoch::Initial;
412        let mut time = self.epochs[epoch].loss_time;
413
414        // Iterate over all packet number spaces starting from Handshake.
415        for e in [Epoch::Handshake, Epoch::Application] {
416            let new_time = self.epochs[e].loss_time;
417            if time.is_none() || new_time < time {
418                time = new_time;
419                epoch = e;
420            }
421        }
422
423        (time, epoch)
424    }
425
426    fn pto_time_and_space(
427        &self, handshake_status: HandshakeStatus, now: Instant,
428    ) -> (Option<Instant>, Epoch) {
429        let mut duration = self.pto() * 2_u32.pow(self.pto_count);
430
431        // Arm PTO from now when there are no inflight packets.
432        if self.bytes_in_flight.is_zero() {
433            if handshake_status.has_handshake_keys {
434                return (Some(now + duration), Epoch::Handshake);
435            } else {
436                return (Some(now + duration), Epoch::Initial);
437            }
438        }
439
440        let mut pto_timeout = None;
441        let mut pto_space = Epoch::Initial;
442
443        // Iterate over all packet number spaces.
444        for e in [Epoch::Initial, Epoch::Handshake, Epoch::Application] {
445            let epoch = &self.epochs[e];
446            if epoch.in_flight_count == 0 {
447                continue;
448            }
449
450            if e == Epoch::Application {
451                // Skip Application Data until handshake completes.
452                if !handshake_status.completed {
453                    return (pto_timeout, pto_space);
454                }
455
456                // Include max_ack_delay and backoff for Application Data.
457                duration +=
458                    self.rtt_stats.max_ack_delay * 2_u32.pow(self.pto_count);
459            }
460
461            let new_time = epoch
462                .time_of_last_ack_eliciting_packet
463                .map(|t| t + duration);
464
465            if pto_timeout.is_none() || new_time < pto_timeout {
466                pto_timeout = new_time;
467                pto_space = e;
468            }
469        }
470
471        (pto_timeout, pto_space)
472    }
473
474    fn set_loss_detection_timer(
475        &mut self, handshake_status: HandshakeStatus, now: Instant,
476    ) {
477        let (earliest_loss_time, _) = self.loss_time_and_space();
478
479        if let Some(to) = earliest_loss_time {
480            // Time threshold loss detection.
481            self.loss_timer.update(to);
482            return;
483        }
484
485        if self.bytes_in_flight.is_zero() &&
486            handshake_status.peer_verified_address
487        {
488            self.loss_timer.clear();
489            return;
490        }
491
492        // PTO timer.
493        if let (Some(timeout), _) = self.pto_time_and_space(handshake_status, now)
494        {
495            self.loss_timer.update(timeout);
496        }
497    }
498
499    fn detect_lost_packets(
500        &mut self, epoch: Epoch, now: Instant, trace_id: &str,
501    ) -> (usize, usize) {
502        let loss_delay = cmp::max(self.rtt_stats.latest_rtt, self.rtt())
503            .mul_f64(self.time_thresh);
504
505        let loss = self.epochs[epoch].detect_lost_packets(
506            loss_delay,
507            self.pkt_thresh,
508            now,
509            trace_id,
510            epoch,
511        );
512
513        if let Some(pkt) = loss.largest_lost_pkt {
514            if !self.congestion.in_congestion_recovery(pkt.time_sent) {
515                (self.congestion.cc_ops.checkpoint)(&mut self.congestion);
516            }
517
518            (self.congestion.cc_ops.congestion_event)(
519                &mut self.congestion,
520                self.bytes_in_flight.get(),
521                loss.lost_bytes,
522                &pkt,
523                now,
524            );
525
526            self.bytes_in_flight
527                .saturating_subtract(loss.lost_bytes, now);
528        };
529
530        self.bytes_in_flight
531            .saturating_subtract(loss.pmtud_lost_bytes, now);
532
533        self.epochs[epoch]
534            .drain_acked_and_lost_packets(now - self.rtt_stats.rtt());
535
536        self.congestion.lost_count += loss.lost_packets;
537
538        (loss.lost_packets, loss.lost_bytes)
539    }
540}
541
542impl RecoveryOps for LegacyRecovery {
543    /// Returns whether or not we should elicit an ACK even if we wouldn't
544    /// otherwise have constructed an ACK eliciting packet.
545    fn should_elicit_ack(&self, epoch: Epoch) -> bool {
546        self.epochs[epoch].loss_probes > 0 ||
547            self.outstanding_non_ack_eliciting >=
548                MAX_OUTSTANDING_NON_ACK_ELICITING
549    }
550
551    fn next_acked_frame(&mut self, epoch: Epoch) -> Option<frame::Frame> {
552        self.epochs[epoch].acked_frames.pop()
553    }
554
555    fn next_lost_frame(&mut self, epoch: Epoch) -> Option<frame::Frame> {
556        self.epochs[epoch].lost_frames.pop()
557    }
558
559    fn get_largest_acked_on_epoch(&self, epoch: Epoch) -> Option<u64> {
560        self.epochs[epoch].largest_acked_packet
561    }
562
563    fn has_lost_frames(&self, epoch: Epoch) -> bool {
564        !self.epochs[epoch].lost_frames.is_empty()
565    }
566
567    fn loss_probes(&self, epoch: Epoch) -> usize {
568        self.epochs[epoch].loss_probes
569    }
570
571    #[cfg(test)]
572    fn inc_loss_probes(&mut self, epoch: Epoch) {
573        self.epochs[epoch].loss_probes += 1;
574    }
575
576    fn ping_sent(&mut self, epoch: Epoch) {
577        self.epochs[epoch].loss_probes =
578            self.epochs[epoch].loss_probes.saturating_sub(1);
579    }
580
581    fn on_packet_sent(
582        &mut self, mut pkt: Sent, epoch: Epoch,
583        handshake_status: HandshakeStatus, now: Instant, trace_id: &str,
584    ) {
585        let ack_eliciting = pkt.ack_eliciting;
586        let in_flight = pkt.in_flight;
587        let sent_bytes = pkt.size;
588
589        if ack_eliciting {
590            self.outstanding_non_ack_eliciting = 0;
591        } else {
592            self.outstanding_non_ack_eliciting += 1;
593        }
594
595        if in_flight && ack_eliciting {
596            self.epochs[epoch].time_of_last_ack_eliciting_packet = Some(now);
597        }
598
599        self.congestion.on_packet_sent(
600            self.bytes_in_flight.get(),
601            sent_bytes,
602            now,
603            &mut pkt,
604            self.bytes_lost,
605            in_flight,
606        );
607
608        if in_flight {
609            self.epochs[epoch].in_flight_count += 1;
610            self.bytes_in_flight.add(sent_bytes, now);
611
612            self.set_loss_detection_timer(handshake_status, now);
613        }
614
615        self.bytes_sent += sent_bytes;
616
617        #[cfg(test)]
618        {
619            self.epochs[epoch].test_largest_sent_pkt_num_on_path = self.epochs
620                [epoch]
621                .test_largest_sent_pkt_num_on_path
622                .max(Some(pkt.pkt_num));
623        }
624
625        self.epochs[epoch].sent_packets.push_back(pkt);
626
627        trace!("{trace_id} {self:?}");
628    }
629
630    fn get_packet_send_time(&self, now: Instant) -> Instant {
631        now
632    }
633
634    // `peer_sent_ack_ranges` should not be used without validation.
635    fn on_ack_received(
636        &mut self, peer_sent_ack_ranges: &RangeSet, ack_delay: u64, epoch: Epoch,
637        handshake_status: HandshakeStatus, now: Instant, skip_pn: Option<u64>,
638        trace_id: &str,
639    ) -> Result<OnAckReceivedOutcome> {
640        let AckedDetectionResult {
641            acked_bytes,
642            spurious_losses,
643            spurious_pkt_thresh,
644            has_ack_eliciting,
645            has_in_flight_spurious_loss,
646        } = self.epochs[epoch].detect_and_remove_acked_packets(
647            now,
648            peer_sent_ack_ranges,
649            &mut self.newly_acked,
650            &self.rtt_stats,
651            skip_pn,
652            trace_id,
653        )?;
654
655        self.lost_spurious_count += spurious_losses;
656        if let Some(thresh) = spurious_pkt_thresh {
657            self.pkt_thresh =
658                self.pkt_thresh.max(thresh.min(MAX_PACKET_THRESHOLD));
659            self.time_thresh = PACKET_REORDER_TIME_THRESHOLD;
660        }
661
662        // Undo congestion window update.
663        if has_in_flight_spurious_loss {
664            (self.congestion.cc_ops.rollback)(&mut self.congestion);
665        }
666
667        if self.newly_acked.is_empty() {
668            return Ok(OnAckReceivedOutcome::default());
669        }
670
671        let largest_newly_acked = self.newly_acked.last().unwrap();
672
673        // Update `largest_acked_packet` based on the validated `newly_acked`
674        // value.
675        let largest_acked_pkt_num = self.epochs[epoch]
676            .largest_acked_packet
677            .unwrap_or(0)
678            .max(largest_newly_acked.pkt_num);
679        self.epochs[epoch].largest_acked_packet = Some(largest_acked_pkt_num);
680
681        // Check if largest packet is newly acked.
682        if largest_newly_acked.pkt_num == largest_acked_pkt_num &&
683            has_ack_eliciting
684        {
685            let latest_rtt = now - largest_newly_acked.time_sent;
686            self.rtt_stats.update_rtt(
687                latest_rtt,
688                Duration::from_micros(ack_delay),
689                now,
690                handshake_status.completed,
691            );
692        }
693
694        // Detect and mark lost packets without removing them from the sent
695        // packets list.
696        let (lost_packets, lost_bytes) =
697            self.detect_lost_packets(epoch, now, trace_id);
698
699        self.congestion.on_packets_acked(
700            self.bytes_in_flight.get(),
701            &mut self.newly_acked,
702            &self.rtt_stats,
703            now,
704        );
705
706        self.bytes_in_flight.saturating_subtract(acked_bytes, now);
707
708        self.pto_count = 0;
709
710        self.set_loss_detection_timer(handshake_status, now);
711
712        self.epochs[epoch]
713            .drain_acked_and_lost_packets(now - self.rtt_stats.rtt());
714
715        Ok(OnAckReceivedOutcome {
716            lost_packets,
717            lost_bytes,
718            acked_bytes,
719            spurious_losses,
720        })
721    }
722
723    fn on_loss_detection_timeout(
724        &mut self, handshake_status: HandshakeStatus, now: Instant,
725        trace_id: &str,
726    ) -> OnLossDetectionTimeoutOutcome {
727        let (earliest_loss_time, epoch) = self.loss_time_and_space();
728
729        if earliest_loss_time.is_some() {
730            // Time threshold loss detection.
731            let (lost_packets, lost_bytes) =
732                self.detect_lost_packets(epoch, now, trace_id);
733
734            self.set_loss_detection_timer(handshake_status, now);
735
736            trace!("{trace_id} {self:?}");
737            return OnLossDetectionTimeoutOutcome {
738                lost_packets,
739                lost_bytes,
740            };
741        }
742
743        let epoch = if self.bytes_in_flight.get() > 0 {
744            // Send new data if available, else retransmit old data. If neither
745            // is available, send a single PING frame.
746            let (_, e) = self.pto_time_and_space(handshake_status, now);
747
748            e
749        } else {
750            // Client sends an anti-deadlock packet: Initial is padded to earn
751            // more anti-amplification credit, a Handshake packet proves address
752            // ownership.
753            if handshake_status.has_handshake_keys {
754                Epoch::Handshake
755            } else {
756                Epoch::Initial
757            }
758        };
759
760        self.pto_count += 1;
761
762        let epoch = &mut self.epochs[epoch];
763
764        epoch.loss_probes =
765            cmp::min(self.pto_count as usize, MAX_PTO_PROBES_COUNT);
766
767        let unacked_iter = epoch.sent_packets
768            .iter_mut()
769            // Skip packets that have already been acked or lost, and packets
770            // that don't contain either CRYPTO or STREAM frames.
771            .filter(|p| p.has_data && p.time_acked.is_none() && p.time_lost.is_none())
772            // Only return as many packets as the number of probe packets that
773            // will be sent.
774            .take(epoch.loss_probes);
775
776        // Retransmit the frames from the oldest sent packets on PTO. However
777        // the packets are not actually declared lost (so there is no effect to
778        // congestion control), we just reschedule the data they carried.
779        //
780        // This will also trigger sending an ACK and retransmitting frames like
781        // HANDSHAKE_DONE and MAX_DATA / MAX_STREAM_DATA as well, in addition
782        // to CRYPTO and STREAM, if the original packet carried them.
783        for unacked in unacked_iter {
784            epoch.lost_frames.extend_from_slice(&unacked.frames);
785        }
786
787        self.set_loss_detection_timer(handshake_status, now);
788
789        trace!("{trace_id} {self:?}");
790
791        OnLossDetectionTimeoutOutcome {
792            lost_packets: 0,
793            lost_bytes: 0,
794        }
795    }
796
797    fn on_pkt_num_space_discarded(
798        &mut self, epoch: Epoch, handshake_status: HandshakeStatus, now: Instant,
799    ) {
800        let epoch = &mut self.epochs[epoch];
801
802        let unacked_bytes = epoch
803            .sent_packets
804            .iter()
805            .filter(|p| {
806                p.in_flight && p.time_acked.is_none() && p.time_lost.is_none()
807            })
808            .fold(0, |acc, p| acc + p.size);
809
810        self.bytes_in_flight.saturating_subtract(unacked_bytes, now);
811
812        epoch.sent_packets.clear();
813        epoch.lost_frames.clear();
814        epoch.acked_frames.clear();
815
816        epoch.time_of_last_ack_eliciting_packet = None;
817        epoch.loss_time = None;
818        epoch.loss_probes = 0;
819        epoch.in_flight_count = 0;
820
821        self.set_loss_detection_timer(handshake_status, now);
822    }
823
824    fn on_path_change(
825        &mut self, epoch: Epoch, now: Instant, trace_id: &str,
826    ) -> (usize, usize) {
827        // Time threshold loss detection.
828        self.detect_lost_packets(epoch, now, trace_id)
829    }
830
831    fn loss_detection_timer(&self) -> Option<Instant> {
832        self.loss_timer.time
833    }
834
835    fn cwnd(&self) -> usize {
836        self.congestion.congestion_window()
837    }
838
839    fn cwnd_available(&self) -> usize {
840        // Ignore cwnd when sending probe packets.
841        if self.epochs.iter().any(|e| e.loss_probes > 0) {
842            return usize::MAX;
843        }
844
845        // Open more space (snd_cnt) for PRR when allowed.
846        self.cwnd().saturating_sub(self.bytes_in_flight.get()) +
847            self.congestion.prr.snd_cnt
848    }
849
850    fn rtt(&self) -> Duration {
851        self.rtt_stats.rtt()
852    }
853
854    fn min_rtt(&self) -> Option<Duration> {
855        self.rtt_stats.min_rtt()
856    }
857
858    fn max_rtt(&self) -> Option<Duration> {
859        self.rtt_stats.max_rtt()
860    }
861
862    fn rttvar(&self) -> Duration {
863        self.rtt_stats.rttvar
864    }
865
866    fn pto(&self) -> Duration {
867        self.rtt() + cmp::max(self.rtt_stats.rttvar * 4, GRANULARITY)
868    }
869
870    /// The most recent data delivery rate estimate.
871    fn delivery_rate(&self) -> Bandwidth {
872        self.congestion.delivery_rate()
873    }
874
875    fn max_bandwidth(&self) -> Option<Bandwidth> {
876        // TODO implement
877        None
878    }
879
880    /// Statistics from when a CCA first exited the startup phase.
881    fn startup_exit(&self) -> Option<StartupExit> {
882        self.congestion.ssthresh.startup_exit()
883    }
884
885    fn max_datagram_size(&self) -> usize {
886        self.max_datagram_size
887    }
888
889    fn pmtud_update_max_datagram_size(&mut self, new_max_datagram_size: usize) {
890        // Congestion Window is updated only when it's not updated already.
891        // Update cwnd if it hasn't been updated yet.
892        if self.cwnd() ==
893            self.max_datagram_size *
894                self.congestion.initial_congestion_window_packets
895        {
896            self.congestion.congestion_window = new_max_datagram_size *
897                self.congestion.initial_congestion_window_packets;
898        }
899
900        self.max_datagram_size = new_max_datagram_size;
901    }
902
903    fn update_max_datagram_size(&mut self, new_max_datagram_size: usize) {
904        self.pmtud_update_max_datagram_size(
905            self.max_datagram_size.min(new_max_datagram_size),
906        )
907    }
908
909    #[cfg(test)]
910    fn sent_packets_len(&self, epoch: Epoch) -> usize {
911        self.epochs[epoch].sent_packets.len()
912    }
913
914    #[cfg(test)]
915    fn in_flight_count(&self, epoch: Epoch) -> usize {
916        self.epochs[epoch].in_flight_count
917    }
918
919    fn bytes_in_flight(&self) -> usize {
920        self.bytes_in_flight.get()
921    }
922
923    fn bytes_in_flight_duration(&self) -> Duration {
924        self.bytes_in_flight.get_duration()
925    }
926
927    #[cfg(test)]
928    fn pacing_rate(&self) -> u64 {
929        0
930    }
931
932    #[cfg(test)]
933    fn pto_count(&self) -> u32 {
934        self.pto_count
935    }
936
937    #[cfg(test)]
938    fn pkt_thresh(&self) -> Option<u64> {
939        Some(self.pkt_thresh)
940    }
941
942    #[cfg(test)]
943    fn time_thresh(&self) -> f64 {
944        self.time_thresh
945    }
946
947    #[cfg(test)]
948    fn lost_spurious_count(&self) -> usize {
949        self.lost_spurious_count
950    }
951
952    #[cfg(test)]
953    fn detect_lost_packets_for_test(
954        &mut self, epoch: Epoch, now: Instant,
955    ) -> (usize, usize) {
956        self.detect_lost_packets(epoch, now, "")
957    }
958
959    // FIXME only used by gcongestion
960    fn on_app_limited(&mut self) {
961        // Not implemented for legacy recovery, update_app_limited and
962        // delivery_rate_update_app_limited used instead.
963    }
964
965    #[cfg(test)]
966    fn largest_sent_pkt_num_on_path(&self, epoch: Epoch) -> Option<u64> {
967        self.epochs[epoch].test_largest_sent_pkt_num_on_path
968    }
969
970    #[cfg(test)]
971    fn app_limited(&self) -> bool {
972        self.congestion.app_limited
973    }
974
975    fn update_app_limited(&mut self, v: bool) {
976        self.congestion.update_app_limited(v);
977    }
978
979    // FIXME only used by congestion
980    fn delivery_rate_update_app_limited(&mut self, v: bool) {
981        self.congestion.delivery_rate.update_app_limited(v);
982    }
983
984    // FIXME only used by congestion
985    fn update_max_ack_delay(&mut self, max_ack_delay: Duration) {
986        self.rtt_stats.max_ack_delay = max_ack_delay;
987    }
988
989    #[cfg(feature = "qlog")]
990    fn state_str(&self, now: Instant) -> &'static str {
991        (self.congestion.cc_ops.state_str)(&self.congestion, now)
992    }
993
994    #[cfg(feature = "qlog")]
995    fn get_updated_qlog_event_data(&mut self) -> Option<EventData> {
996        let qlog_metrics = QlogMetrics {
997            min_rtt: *self.rtt_stats.min_rtt,
998            smoothed_rtt: self.rtt(),
999            latest_rtt: self.rtt_stats.latest_rtt,
1000            rttvar: self.rtt_stats.rttvar,
1001            cwnd: self.cwnd() as u64,
1002            bytes_in_flight: self.bytes_in_flight.get() as u64,
1003            ssthresh: Some(self.congestion.ssthresh.get() as u64),
1004            pacing_rate: 0,
1005        };
1006
1007        self.qlog_metrics.maybe_update(qlog_metrics)
1008    }
1009
1010    #[cfg(feature = "qlog")]
1011    fn get_updated_qlog_cc_state(
1012        &mut self, now: Instant,
1013    ) -> Option<&'static str> {
1014        let cc_state = self.state_str(now);
1015        if cc_state != self.qlog_prev_cc_state {
1016            self.qlog_prev_cc_state = cc_state;
1017            Some(cc_state)
1018        } else {
1019            None
1020        }
1021    }
1022
1023    fn send_quantum(&self) -> usize {
1024        self.congestion.send_quantum()
1025    }
1026
1027    fn get_next_release_time(&self) -> ReleaseDecision {
1028        ReleaseDecision {
1029            time: ReleaseTime::Immediate,
1030            allow_burst: false,
1031        }
1032    }
1033
1034    fn gcongestion_enabled(&self) -> bool {
1035        false
1036    }
1037
1038    fn lost_count(&self) -> usize {
1039        self.congestion.lost_count
1040    }
1041
1042    fn bytes_lost(&self) -> u64 {
1043        self.bytes_lost
1044    }
1045}
1046
1047impl std::fmt::Debug for LegacyRecovery {
1048    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1049        write!(f, "timer={:?} ", self.loss_timer)?;
1050        write!(f, "latest_rtt={:?} ", self.rtt_stats.latest_rtt)?;
1051        write!(f, "srtt={:?} ", self.rtt_stats.smoothed_rtt)?;
1052        write!(f, "min_rtt={:?} ", *self.rtt_stats.min_rtt)?;
1053        write!(f, "rttvar={:?} ", self.rtt_stats.rttvar)?;
1054        write!(f, "cwnd={} ", self.cwnd())?;
1055        write!(f, "ssthresh={} ", self.congestion.ssthresh.get())?;
1056        write!(f, "bytes_in_flight={} ", self.bytes_in_flight.get())?;
1057        write!(f, "app_limited={} ", self.congestion.app_limited)?;
1058        write!(
1059            f,
1060            "congestion_recovery_start_time={:?} ",
1061            self.congestion.congestion_recovery_start_time
1062        )?;
1063        write!(f, "{:?} ", self.congestion.delivery_rate)?;
1064
1065        if self.congestion.hystart.enabled() {
1066            write!(f, "hystart={:?} ", self.congestion.hystart)?;
1067        }
1068
1069        // CC-specific debug info
1070        (self.congestion.cc_ops.debug_fmt)(&self.congestion, f)?;
1071
1072        Ok(())
1073    }
1074}
1075
1076#[derive(Clone)]
1077pub struct Acked {
1078    pub pkt_num: u64,
1079
1080    pub time_sent: Instant,
1081
1082    pub size: usize,
1083
1084    pub rtt: Duration,
1085
1086    pub delivered: usize,
1087
1088    pub delivered_time: Instant,
1089
1090    pub first_sent_time: Instant,
1091
1092    pub is_app_limited: bool,
1093}