quiche/recovery/congestion/
recovery.rs

1// Copyright (C) 2018-2019, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::cmp;
28
29use std::time::Duration;
30use std::time::Instant;
31
32use std::collections::VecDeque;
33
34use super::RecoveryConfig;
35use super::Sent;
36
37use crate::packet::Epoch;
38use crate::ranges::RangeSet;
39use crate::recovery::Bandwidth;
40use crate::recovery::HandshakeStatus;
41use crate::recovery::OnLossDetectionTimeoutOutcome;
42use crate::recovery::RecoveryOps;
43use crate::recovery::StartupExit;
44use crate::Error;
45use crate::Result;
46
47#[cfg(feature = "qlog")]
48use crate::recovery::QlogMetrics;
49
50use crate::frame;
51
52#[cfg(feature = "qlog")]
53use qlog::events::EventData;
54
55use super::pacer;
56use super::Congestion;
57use crate::recovery::bytes_in_flight::BytesInFlight;
58use crate::recovery::rtt::RttStats;
59use crate::recovery::LossDetectionTimer;
60use crate::recovery::OnAckReceivedOutcome;
61use crate::recovery::ReleaseDecision;
62use crate::recovery::ReleaseTime;
63use crate::recovery::GRANULARITY;
64use crate::recovery::INITIAL_PACKET_THRESHOLD;
65use crate::recovery::INITIAL_TIME_THRESHOLD;
66use crate::recovery::MAX_OUTSTANDING_NON_ACK_ELICITING;
67use crate::recovery::MAX_PACKET_THRESHOLD;
68use crate::recovery::MAX_PTO_PROBES_COUNT;
69
/// Loss-recovery state tracked separately for one packet number space.
#[derive(Default)]
struct RecoveryEpoch {
    /// The time the most recent ack-eliciting packet was sent.
    time_of_last_ack_eliciting_packet: Option<Instant>,

    /// The largest packet number acknowledged in the packet number space so
    /// far.
    largest_acked_packet: Option<u64>,

    /// The time at which the next packet in that packet number space can be
    /// considered lost based on exceeding the reordering window in time.
    loss_time: Option<Instant>,

    /// An association of packet numbers in a packet number space to information
    /// about them.
    sent_packets: VecDeque<Sent>,

    /// Number of probe packets still to be sent in this space. Set when a PTO
    /// fires and decremented (saturating) by `ping_sent()`.
    loss_probes: usize,

    /// Number of in-flight packets in this space that have been neither
    /// acknowledged nor declared lost.
    in_flight_count: usize,

    /// Frames taken from newly acknowledged packets, held until collected
    /// through `get_acked_frames()`.
    acked_frames: Vec<frame::Frame>,

    /// Frames queued for retransmission: taken from packets declared lost, or
    /// copied from still-unacked packets when a PTO fires. Collected through
    /// `get_lost_frames()`.
    lost_frames: Vec<frame::Frame>,

    /// The largest packet number sent in the packet number space so far.
    #[cfg(test)]
    test_largest_sent_pkt_num_on_path: Option<u64>,
}
97
/// Summary of one pass of `RecoveryEpoch::detect_and_remove_acked_packets()`.
struct AckedDetectionResult {
    /// Sum of the sizes of the newly acknowledged packets that were in flight.
    acked_bytes: usize,
    /// Number of packets acknowledged after having been declared lost.
    spurious_losses: usize,
    /// For the first spurious loss found in the pass, the distance
    /// `largest_acked - pkt_num + 1`; `None` when there was no spurious loss.
    spurious_pkt_thresh: Option<u64>,
    /// Whether at least one newly acknowledged packet was ack-eliciting.
    has_ack_eliciting: bool,
    /// Whether at least one spuriously lost packet was marked in flight.
    has_in_flight_spurious_loss: bool,
}
105
/// Summary of one pass of `RecoveryEpoch::detect_lost_packets()`.
struct LossDetectionResult {
    /// Copy of the last in-flight, non-PMTUD packet declared lost in the pass
    /// (packets are visited in ascending packet number order).
    largest_lost_pkt: Option<Sent>,
    /// Number of packets declared lost, not counting PMTUD probes.
    lost_packets: usize,
    /// Sum of the sizes of the in-flight, non-PMTUD packets declared lost.
    lost_bytes: usize,
    /// Sum of the sizes of the PMTUD probes declared lost.
    pmtud_lost_bytes: usize,
}
112
113impl RecoveryEpoch {
114    // `peer_sent_ack_ranges` should not be used without validation.
115    fn detect_and_remove_acked_packets(
116        &mut self, now: Instant, peer_sent_ack_ranges: &RangeSet,
117        newly_acked: &mut Vec<Acked>, rtt_stats: &RttStats, skip_pn: Option<u64>,
118        trace_id: &str,
119    ) -> Result<AckedDetectionResult> {
120        newly_acked.clear();
121
122        let mut acked_bytes = 0;
123        let mut spurious_losses = 0;
124        let mut spurious_pkt_thresh = None;
125        let mut has_ack_eliciting = false;
126        let mut has_in_flight_spurious_loss = false;
127
128        let largest_ack_received = peer_sent_ack_ranges
129            .last()
130            .expect("ACK frames should always have at least one ack range");
131        let largest_acked = self
132            .largest_acked_packet
133            .unwrap_or(0)
134            .max(largest_ack_received);
135
136        for peer_sent_range in peer_sent_ack_ranges.iter() {
137            if skip_pn.is_some_and(|skip_pn| peer_sent_range.contains(&skip_pn)) {
138                // https://www.rfc-editor.org/rfc/rfc9000#section-13.1
139                // An endpoint SHOULD treat receipt of an acknowledgment
140                // for a packet it did not send as
141                // a connection error of type PROTOCOL_VIOLATION
142                return Err(Error::OptimisticAckDetected);
143            }
144
145            // Because packets always have incrementing numbers, they are always
146            // in sorted order.
147            let start = if self
148                .sent_packets
149                .front()
150                .filter(|e| e.pkt_num >= peer_sent_range.start)
151                .is_some()
152            {
153                // Usually it will be the first packet.
154                0
155            } else {
156                self.sent_packets
157                    .binary_search_by_key(&peer_sent_range.start, |p| p.pkt_num)
158                    .unwrap_or_else(|e| e)
159            };
160
161            for unacked in self.sent_packets.range_mut(start..) {
162                if unacked.pkt_num >= peer_sent_range.end {
163                    break;
164                }
165
166                if unacked.time_acked.is_some() {
167                    // Already acked.
168                } else if unacked.time_lost.is_some() {
169                    // An acked packet was already declared lost.
170                    spurious_losses += 1;
171                    spurious_pkt_thresh
172                        .get_or_insert(largest_acked - unacked.pkt_num + 1);
173                    unacked.time_acked = Some(now);
174
175                    if unacked.in_flight {
176                        has_in_flight_spurious_loss = true;
177                    }
178                } else {
179                    if unacked.in_flight {
180                        self.in_flight_count -= 1;
181                        acked_bytes += unacked.size;
182                    }
183
184                    newly_acked.push(Acked {
185                        pkt_num: unacked.pkt_num,
186                        time_sent: unacked.time_sent,
187                        size: unacked.size,
188
189                        rtt: now.saturating_duration_since(unacked.time_sent),
190                        delivered: unacked.delivered,
191                        delivered_time: unacked.delivered_time,
192                        first_sent_time: unacked.first_sent_time,
193                        is_app_limited: unacked.is_app_limited,
194                    });
195
196                    trace!("{} packet newly acked {}", trace_id, unacked.pkt_num);
197
198                    self.acked_frames
199                        .extend(std::mem::take(&mut unacked.frames));
200
201                    has_ack_eliciting |= unacked.ack_eliciting;
202                    unacked.time_acked = Some(now);
203                }
204            }
205        }
206
207        self.drain_acked_and_lost_packets(now - rtt_stats.rtt());
208
209        Ok(AckedDetectionResult {
210            acked_bytes,
211            spurious_losses,
212            spurious_pkt_thresh,
213            has_ack_eliciting,
214            has_in_flight_spurious_loss,
215        })
216    }
217
218    fn detect_lost_packets(
219        &mut self, loss_delay: Duration, pkt_thresh: u64, now: Instant,
220        trace_id: &str, epoch: Epoch,
221    ) -> LossDetectionResult {
222        self.loss_time = None;
223
224        // Minimum time of kGranularity before packets are deemed lost.
225        let loss_delay = cmp::max(loss_delay, GRANULARITY);
226        let largest_acked = self.largest_acked_packet.unwrap_or(0);
227
228        // Packets sent before this time are deemed lost.
229        let lost_send_time = now.checked_sub(loss_delay).unwrap();
230
231        let mut lost_packets = 0;
232        let mut lost_bytes = 0;
233        let mut pmtud_lost_bytes = 0;
234
235        let mut largest_lost_pkt = None;
236
237        let unacked_iter = self.sent_packets
238        .iter_mut()
239        // Skip packets that follow the largest acked packet.
240        .take_while(|p| p.pkt_num <= largest_acked)
241        // Skip packets that have already been acked or lost.
242        .filter(|p| p.time_acked.is_none() && p.time_lost.is_none());
243
244        for unacked in unacked_iter {
245            // Mark packet as lost, or set time when it should be marked.
246            if unacked.time_sent <= lost_send_time ||
247                largest_acked >= unacked.pkt_num + pkt_thresh
248            {
249                self.lost_frames.extend(unacked.frames.drain(..));
250
251                unacked.time_lost = Some(now);
252
253                if unacked.is_pmtud_probe {
254                    pmtud_lost_bytes += unacked.size;
255                    self.in_flight_count -= 1;
256
257                    // Do not track PMTUD probes losses.
258                    continue;
259                }
260
261                if unacked.in_flight {
262                    lost_bytes += unacked.size;
263
264                    // Frames have already been removed from the packet, so
265                    // cloning the whole packet should be relatively cheap.
266                    largest_lost_pkt = Some(unacked.clone());
267
268                    self.in_flight_count -= 1;
269
270                    trace!(
271                        "{} packet {} lost on epoch {}",
272                        trace_id,
273                        unacked.pkt_num,
274                        epoch
275                    );
276                }
277
278                lost_packets += 1;
279            } else {
280                let loss_time = match self.loss_time {
281                    None => unacked.time_sent + loss_delay,
282
283                    Some(loss_time) =>
284                        cmp::min(loss_time, unacked.time_sent + loss_delay),
285                };
286
287                self.loss_time = Some(loss_time);
288                break;
289            }
290        }
291
292        LossDetectionResult {
293            largest_lost_pkt,
294            lost_packets,
295            lost_bytes,
296            pmtud_lost_bytes,
297        }
298    }
299
300    fn drain_acked_and_lost_packets(&mut self, loss_thresh: Instant) {
301        // In order to avoid removing elements from the middle of the list
302        // (which would require copying other elements to compact the list),
303        // we only remove a contiguous range of elements from the start of the
304        // list.
305        //
306        // This means that acked or lost elements coming after this will not
307        // be removed at this point, but their removal is delayed for a later
308        // time, once the gaps have been filled.
309        while let Some(pkt) = self.sent_packets.front() {
310            if let Some(time_lost) = pkt.time_lost {
311                if time_lost > loss_thresh {
312                    break;
313                }
314            }
315
316            if pkt.time_acked.is_none() && pkt.time_lost.is_none() {
317                break;
318            }
319
320            self.sent_packets.pop_front();
321        }
322    }
323}
324
/// Loss detection and congestion control state for the legacy recovery
/// implementation.
pub struct LegacyRecovery {
    /// Per packet number space recovery state.
    epochs: [RecoveryEpoch; Epoch::count()],

    /// The loss detection timer, armed by `set_loss_detection_timer()`.
    loss_timer: LossDetectionTimer,

    /// Number of PTO expirations since the last ACK that newly acknowledged
    /// packets; used as the exponent of the PTO backoff.
    pto_count: u32,

    /// RTT estimator state.
    rtt_stats: RttStats,

    /// Number of packets that were acknowledged after being declared lost.
    lost_spurious_count: usize,

    /// Packet reordering threshold used for loss detection. Raised when a
    /// spurious loss is detected, up to `MAX_PACKET_THRESHOLD`.
    pkt_thresh: u64,

    /// Multiplier applied to the RTT to obtain the time-based loss delay.
    time_thresh: f64,

    /// Bytes currently in flight across all packet number spaces.
    bytes_in_flight: BytesInFlight,

    /// Sum of the sizes of all packets passed to `on_packet_sent()`.
    bytes_sent: usize,

    /// Passed to the congestion controller on every send.
    // NOTE(review): no update site for this counter is visible in this part
    // of the file -- confirm where it is maintained.
    bytes_lost: u64,

    /// Maximum datagram size currently in use.
    pub max_datagram_size: usize,

    // NOTE(review): presumably the last metrics emitted to qlog -- confirm.
    #[cfg(feature = "qlog")]
    qlog_metrics: QlogMetrics,

    // NOTE(review): presumably the last congestion state string emitted to
    // qlog -- confirm.
    #[cfg(feature = "qlog")]
    qlog_prev_cc_state: &'static str,

    /// How many non-ack-eliciting packets have been sent.
    outstanding_non_ack_eliciting: usize,

    /// Congestion controller state.
    pub congestion: Congestion,

    /// A reusable list of acks.
    newly_acked: Vec<Acked>,
}
362
363impl LegacyRecovery {
364    pub fn new_with_config(recovery_config: &RecoveryConfig) -> Self {
365        Self {
366            epochs: Default::default(),
367
368            loss_timer: Default::default(),
369
370            pto_count: 0,
371
372            rtt_stats: RttStats::new(
373                recovery_config.initial_rtt,
374                recovery_config.max_ack_delay,
375            ),
376
377            lost_spurious_count: 0,
378
379            pkt_thresh: INITIAL_PACKET_THRESHOLD,
380
381            time_thresh: INITIAL_TIME_THRESHOLD,
382
383            bytes_in_flight: Default::default(),
384
385            bytes_sent: 0,
386
387            bytes_lost: 0,
388
389            max_datagram_size: recovery_config.max_send_udp_payload_size,
390
391            #[cfg(feature = "qlog")]
392            qlog_metrics: QlogMetrics::default(),
393
394            #[cfg(feature = "qlog")]
395            qlog_prev_cc_state: "",
396
397            outstanding_non_ack_eliciting: 0,
398
399            congestion: Congestion::from_config(recovery_config),
400
401            newly_acked: Vec::new(),
402        }
403    }
404
405    #[cfg(test)]
406    pub fn new(config: &crate::Config) -> Self {
407        Self::new_with_config(&RecoveryConfig::from_config(config))
408    }
409
410    fn loss_time_and_space(&self) -> (Option<Instant>, Epoch) {
411        let mut epoch = Epoch::Initial;
412        let mut time = self.epochs[epoch].loss_time;
413
414        // Iterate over all packet number spaces starting from Handshake.
415        for e in [Epoch::Handshake, Epoch::Application] {
416            let new_time = self.epochs[e].loss_time;
417            if time.is_none() || new_time < time {
418                time = new_time;
419                epoch = e;
420            }
421        }
422
423        (time, epoch)
424    }
425
426    fn pto_time_and_space(
427        &self, handshake_status: HandshakeStatus, now: Instant,
428    ) -> (Option<Instant>, Epoch) {
429        let mut duration = self.pto() * 2_u32.pow(self.pto_count);
430
431        // Arm PTO from now when there are no inflight packets.
432        if self.bytes_in_flight.is_zero() {
433            if handshake_status.has_handshake_keys {
434                return (Some(now + duration), Epoch::Handshake);
435            } else {
436                return (Some(now + duration), Epoch::Initial);
437            }
438        }
439
440        let mut pto_timeout = None;
441        let mut pto_space = Epoch::Initial;
442
443        // Iterate over all packet number spaces.
444        for e in [Epoch::Initial, Epoch::Handshake, Epoch::Application] {
445            let epoch = &self.epochs[e];
446            if epoch.in_flight_count == 0 {
447                continue;
448            }
449
450            if e == Epoch::Application {
451                // Skip Application Data until handshake completes.
452                if !handshake_status.completed {
453                    return (pto_timeout, pto_space);
454                }
455
456                // Include max_ack_delay and backoff for Application Data.
457                duration +=
458                    self.rtt_stats.max_ack_delay * 2_u32.pow(self.pto_count);
459            }
460
461            let new_time = epoch
462                .time_of_last_ack_eliciting_packet
463                .map(|t| t + duration);
464
465            if pto_timeout.is_none() || new_time < pto_timeout {
466                pto_timeout = new_time;
467                pto_space = e;
468            }
469        }
470
471        (pto_timeout, pto_space)
472    }
473
474    fn set_loss_detection_timer(
475        &mut self, handshake_status: HandshakeStatus, now: Instant,
476    ) {
477        let (earliest_loss_time, _) = self.loss_time_and_space();
478
479        if let Some(to) = earliest_loss_time {
480            // Time threshold loss detection.
481            self.loss_timer.update(to);
482            return;
483        }
484
485        if self.bytes_in_flight.is_zero() &&
486            handshake_status.peer_verified_address
487        {
488            self.loss_timer.clear();
489            return;
490        }
491
492        // PTO timer.
493        if let (Some(timeout), _) = self.pto_time_and_space(handshake_status, now)
494        {
495            self.loss_timer.update(timeout);
496        }
497    }
498
499    fn detect_lost_packets(
500        &mut self, epoch: Epoch, now: Instant, trace_id: &str,
501    ) -> (usize, usize) {
502        let loss_delay = cmp::max(self.rtt_stats.latest_rtt, self.rtt())
503            .mul_f64(self.time_thresh);
504
505        let loss = self.epochs[epoch].detect_lost_packets(
506            loss_delay,
507            self.pkt_thresh,
508            now,
509            trace_id,
510            epoch,
511        );
512
513        if let Some(pkt) = loss.largest_lost_pkt {
514            if !self.congestion.in_congestion_recovery(pkt.time_sent) {
515                (self.congestion.cc_ops.checkpoint)(&mut self.congestion);
516            }
517
518            (self.congestion.cc_ops.congestion_event)(
519                &mut self.congestion,
520                self.bytes_in_flight.get(),
521                loss.lost_bytes,
522                &pkt,
523                now,
524            );
525
526            self.bytes_in_flight
527                .saturating_subtract(loss.lost_bytes, now);
528        };
529
530        self.bytes_in_flight
531            .saturating_subtract(loss.pmtud_lost_bytes, now);
532
533        self.epochs[epoch]
534            .drain_acked_and_lost_packets(now - self.rtt_stats.rtt());
535
536        self.congestion.lost_count += loss.lost_packets;
537
538        (loss.lost_packets, loss.lost_bytes)
539    }
540}
541
542impl RecoveryOps for LegacyRecovery {
543    /// Returns whether or not we should elicit an ACK even if we wouldn't
544    /// otherwise have constructed an ACK eliciting packet.
545    fn should_elicit_ack(&self, epoch: Epoch) -> bool {
546        self.epochs[epoch].loss_probes > 0 ||
547            self.outstanding_non_ack_eliciting >=
548                MAX_OUTSTANDING_NON_ACK_ELICITING
549    }
550
551    fn get_acked_frames(&mut self, epoch: Epoch) -> Vec<frame::Frame> {
552        std::mem::take(&mut self.epochs[epoch].acked_frames)
553    }
554
555    fn get_lost_frames(&mut self, epoch: Epoch) -> Vec<frame::Frame> {
556        std::mem::take(&mut self.epochs[epoch].lost_frames)
557    }
558
    /// Returns the largest packet number acknowledged so far on `epoch`, or
    /// `None` if nothing has been acknowledged there yet.
    fn get_largest_acked_on_epoch(&self, epoch: Epoch) -> Option<u64> {
        self.epochs[epoch].largest_acked_packet
    }
562
    /// Returns whether any frames are queued for retransmission on `epoch`.
    fn has_lost_frames(&self, epoch: Epoch) -> bool {
        !self.epochs[epoch].lost_frames.is_empty()
    }
566
    /// Returns the number of probe packets still to be sent on `epoch`.
    fn loss_probes(&self, epoch: Epoch) -> usize {
        self.epochs[epoch].loss_probes
    }
570
    /// Test-only helper: adds one pending probe packet to `epoch`.
    #[cfg(test)]
    fn inc_loss_probes(&mut self, epoch: Epoch) {
        self.epochs[epoch].loss_probes += 1;
    }
575
576    fn ping_sent(&mut self, epoch: Epoch) {
577        self.epochs[epoch].loss_probes =
578            self.epochs[epoch].loss_probes.saturating_sub(1);
579    }
580
581    fn on_packet_sent(
582        &mut self, mut pkt: Sent, epoch: Epoch,
583        handshake_status: HandshakeStatus, now: Instant, trace_id: &str,
584    ) {
585        let ack_eliciting = pkt.ack_eliciting;
586        let in_flight = pkt.in_flight;
587        let sent_bytes = pkt.size;
588
589        if ack_eliciting {
590            self.outstanding_non_ack_eliciting = 0;
591        } else {
592            self.outstanding_non_ack_eliciting += 1;
593        }
594
595        if in_flight && ack_eliciting {
596            self.epochs[epoch].time_of_last_ack_eliciting_packet = Some(now);
597        }
598
599        self.congestion.on_packet_sent(
600            self.bytes_in_flight.get(),
601            sent_bytes,
602            now,
603            &mut pkt,
604            &self.rtt_stats,
605            self.bytes_lost,
606            in_flight,
607        );
608
609        if in_flight {
610            self.epochs[epoch].in_flight_count += 1;
611            self.bytes_in_flight.add(sent_bytes, now);
612
613            self.set_loss_detection_timer(handshake_status, now);
614        }
615
616        self.bytes_sent += sent_bytes;
617
618        #[cfg(test)]
619        {
620            self.epochs[epoch].test_largest_sent_pkt_num_on_path = self.epochs
621                [epoch]
622                .test_largest_sent_pkt_num_on_path
623                .max(Some(pkt.pkt_num));
624        }
625
626        self.epochs[epoch].sent_packets.push_back(pkt);
627
628        trace!("{trace_id} {self:?}");
629    }
630
    /// Returns the send time reported by the congestion controller for the
    /// next packet. `_now` is currently unused (see the TODO below).
    fn get_packet_send_time(&self, _now: Instant) -> Instant {
        // TODO .max(now)
        self.congestion.get_packet_send_time()
    }
635
636    // `peer_sent_ack_ranges` should not be used without validation.
637    fn on_ack_received(
638        &mut self, peer_sent_ack_ranges: &RangeSet, ack_delay: u64, epoch: Epoch,
639        handshake_status: HandshakeStatus, now: Instant, skip_pn: Option<u64>,
640        trace_id: &str,
641    ) -> Result<OnAckReceivedOutcome> {
642        let AckedDetectionResult {
643            acked_bytes,
644            spurious_losses,
645            spurious_pkt_thresh,
646            has_ack_eliciting,
647            has_in_flight_spurious_loss,
648        } = self.epochs[epoch].detect_and_remove_acked_packets(
649            now,
650            peer_sent_ack_ranges,
651            &mut self.newly_acked,
652            &self.rtt_stats,
653            skip_pn,
654            trace_id,
655        )?;
656
657        self.lost_spurious_count += spurious_losses;
658        if let Some(thresh) = spurious_pkt_thresh {
659            self.pkt_thresh =
660                self.pkt_thresh.max(thresh.min(MAX_PACKET_THRESHOLD));
661        }
662
663        // Undo congestion window update.
664        if has_in_flight_spurious_loss {
665            (self.congestion.cc_ops.rollback)(&mut self.congestion);
666        }
667
668        if self.newly_acked.is_empty() {
669            return Ok(OnAckReceivedOutcome::default());
670        }
671
672        let largest_newly_acked = self.newly_acked.last().unwrap();
673
674        // Update `largest_acked_packet` based on the validated `newly_acked`
675        // value.
676        let largest_acked_pkt_num = self.epochs[epoch]
677            .largest_acked_packet
678            .unwrap_or(0)
679            .max(largest_newly_acked.pkt_num);
680        self.epochs[epoch].largest_acked_packet = Some(largest_acked_pkt_num);
681
682        // Check if largest packet is newly acked.
683        if largest_newly_acked.pkt_num == largest_acked_pkt_num &&
684            has_ack_eliciting
685        {
686            let latest_rtt = now - largest_newly_acked.time_sent;
687            self.rtt_stats.update_rtt(
688                latest_rtt,
689                Duration::from_micros(ack_delay),
690                now,
691                handshake_status.completed,
692            );
693        }
694
695        // Detect and mark lost packets without removing them from the sent
696        // packets list.
697        let (lost_packets, lost_bytes) =
698            self.detect_lost_packets(epoch, now, trace_id);
699
700        self.congestion.on_packets_acked(
701            self.bytes_in_flight.get(),
702            &mut self.newly_acked,
703            &self.rtt_stats,
704            now,
705        );
706
707        self.bytes_in_flight.saturating_subtract(acked_bytes, now);
708
709        self.pto_count = 0;
710
711        self.set_loss_detection_timer(handshake_status, now);
712
713        self.epochs[epoch]
714            .drain_acked_and_lost_packets(now - self.rtt_stats.rtt());
715
716        Ok(OnAckReceivedOutcome {
717            lost_packets,
718            lost_bytes,
719            acked_bytes,
720            spurious_losses,
721        })
722    }
723
724    fn on_loss_detection_timeout(
725        &mut self, handshake_status: HandshakeStatus, now: Instant,
726        trace_id: &str,
727    ) -> OnLossDetectionTimeoutOutcome {
728        let (earliest_loss_time, epoch) = self.loss_time_and_space();
729
730        if earliest_loss_time.is_some() {
731            // Time threshold loss detection.
732            let (lost_packets, lost_bytes) =
733                self.detect_lost_packets(epoch, now, trace_id);
734
735            self.set_loss_detection_timer(handshake_status, now);
736
737            trace!("{trace_id} {self:?}");
738            return OnLossDetectionTimeoutOutcome {
739                lost_packets,
740                lost_bytes,
741            };
742        }
743
744        let epoch = if self.bytes_in_flight.get() > 0 {
745            // Send new data if available, else retransmit old data. If neither
746            // is available, send a single PING frame.
747            let (_, e) = self.pto_time_and_space(handshake_status, now);
748
749            e
750        } else {
751            // Client sends an anti-deadlock packet: Initial is padded to earn
752            // more anti-amplification credit, a Handshake packet proves address
753            // ownership.
754            if handshake_status.has_handshake_keys {
755                Epoch::Handshake
756            } else {
757                Epoch::Initial
758            }
759        };
760
761        self.pto_count += 1;
762
763        let epoch = &mut self.epochs[epoch];
764
765        epoch.loss_probes =
766            cmp::min(self.pto_count as usize, MAX_PTO_PROBES_COUNT);
767
768        let unacked_iter = epoch.sent_packets
769            .iter_mut()
770            // Skip packets that have already been acked or lost, and packets
771            // that don't contain either CRYPTO or STREAM frames.
772            .filter(|p| p.has_data && p.time_acked.is_none() && p.time_lost.is_none())
773            // Only return as many packets as the number of probe packets that
774            // will be sent.
775            .take(epoch.loss_probes);
776
777        // Retransmit the frames from the oldest sent packets on PTO. However
778        // the packets are not actually declared lost (so there is no effect to
779        // congestion control), we just reschedule the data they carried.
780        //
781        // This will also trigger sending an ACK and retransmitting frames like
782        // HANDSHAKE_DONE and MAX_DATA / MAX_STREAM_DATA as well, in addition
783        // to CRYPTO and STREAM, if the original packet carried them.
784        for unacked in unacked_iter {
785            epoch.lost_frames.extend_from_slice(&unacked.frames);
786        }
787
788        self.set_loss_detection_timer(handshake_status, now);
789
790        trace!("{trace_id} {self:?}");
791
792        OnLossDetectionTimeoutOutcome {
793            lost_packets: 0,
794            lost_bytes: 0,
795        }
796    }
797
798    fn on_pkt_num_space_discarded(
799        &mut self, epoch: Epoch, handshake_status: HandshakeStatus, now: Instant,
800    ) {
801        let epoch = &mut self.epochs[epoch];
802
803        let unacked_bytes = epoch
804            .sent_packets
805            .iter()
806            .filter(|p| {
807                p.in_flight && p.time_acked.is_none() && p.time_lost.is_none()
808            })
809            .fold(0, |acc, p| acc + p.size);
810
811        self.bytes_in_flight.saturating_subtract(unacked_bytes, now);
812
813        epoch.sent_packets.clear();
814        epoch.lost_frames.clear();
815        epoch.acked_frames.clear();
816
817        epoch.time_of_last_ack_eliciting_packet = None;
818        epoch.loss_time = None;
819        epoch.loss_probes = 0;
820        epoch.in_flight_count = 0;
821
822        self.set_loss_detection_timer(handshake_status, now);
823    }
824
    /// Runs loss detection on `epoch` and returns
    /// `(lost_packets, lost_bytes)`.
    fn on_path_change(
        &mut self, epoch: Epoch, now: Instant, trace_id: &str,
    ) -> (usize, usize) {
        // Time threshold loss detection.
        self.detect_lost_packets(epoch, now, trace_id)
    }
831
    /// Returns the instant stored in the loss detection timer, if any.
    fn loss_detection_timer(&self) -> Option<Instant> {
        self.loss_timer.time
    }
835
836    fn cwnd(&self) -> usize {
837        self.congestion.congestion_window()
838    }
839
840    fn cwnd_available(&self) -> usize {
841        // Ignore cwnd when sending probe packets.
842        if self.epochs.iter().any(|e| e.loss_probes > 0) {
843            return usize::MAX;
844        }
845
846        // Open more space (snd_cnt) for PRR when allowed.
847        self.cwnd().saturating_sub(self.bytes_in_flight.get()) +
848            self.congestion.prr.snd_cnt
849    }
850
    /// Returns the current RTT estimate from the RTT statistics.
    fn rtt(&self) -> Duration {
        self.rtt_stats.rtt()
    }
854
    /// Returns the minimum RTT from the RTT statistics, if available.
    fn min_rtt(&self) -> Option<Duration> {
        self.rtt_stats.min_rtt()
    }
858
    /// Returns the maximum RTT from the RTT statistics, if available.
    fn max_rtt(&self) -> Option<Duration> {
        self.rtt_stats.max_rtt()
    }
862
    /// Returns the RTT variation stored in the RTT statistics.
    fn rttvar(&self) -> Duration {
        self.rtt_stats.rttvar
    }
866
867    fn pto(&self) -> Duration {
868        self.rtt() + cmp::max(self.rtt_stats.rttvar * 4, GRANULARITY)
869    }
870
    /// The most recent data delivery rate estimate, as reported by the
    /// congestion controller.
    fn delivery_rate(&self) -> Bandwidth {
        self.congestion.delivery_rate()
    }
875
    /// Statistics from when a CCA first exited the startup phase, as
    /// recorded by the congestion controller's `ssthresh` state.
    fn startup_exit(&self) -> Option<StartupExit> {
        self.congestion.ssthresh.startup_exit()
    }
880
    /// Returns the maximum datagram size currently in use.
    fn max_datagram_size(&self) -> usize {
        self.max_datagram_size
    }
884
885    fn pmtud_update_max_datagram_size(&mut self, new_max_datagram_size: usize) {
886        // Congestion Window is updated only when it's not updated already.
887        // Update cwnd if it hasn't been updated yet.
888        if self.cwnd() ==
889            self.max_datagram_size *
890                self.congestion.initial_congestion_window_packets
891        {
892            self.congestion.congestion_window = new_max_datagram_size *
893                self.congestion.initial_congestion_window_packets;
894        }
895
896        self.congestion.pacer = pacer::Pacer::new(
897            self.congestion.pacer.enabled(),
898            self.cwnd(),
899            0,
900            new_max_datagram_size,
901            self.congestion.pacer.max_pacing_rate(),
902        );
903
904        self.max_datagram_size = new_max_datagram_size;
905    }
906
907    fn update_max_datagram_size(&mut self, new_max_datagram_size: usize) {
908        self.pmtud_update_max_datagram_size(
909            self.max_datagram_size.min(new_max_datagram_size),
910        )
911    }
912
    /// Test-only accessor: number of entries in the sent-packet queue of
    /// `epoch`, including acked or lost entries that have not been drained
    /// yet.
    #[cfg(test)]
    fn sent_packets_len(&self, epoch: Epoch) -> usize {
        self.epochs[epoch].sent_packets.len()
    }
917
    /// Test-only accessor: number of in-flight packets tracked on `epoch`.
    #[cfg(test)]
    fn in_flight_count(&self, epoch: Epoch) -> usize {
        self.epochs[epoch].in_flight_count
    }
922
923    #[cfg(test)]
924    fn bytes_in_flight(&self) -> usize {
925        self.bytes_in_flight.get()
926    }
927
928    fn bytes_in_flight_duration(&self) -> Duration {
929        self.bytes_in_flight.get_duration()
930    }
931
932    #[cfg(test)]
933    fn pacing_rate(&self) -> u64 {
934        self.congestion.pacer.rate()
935    }
936
937    #[cfg(test)]
938    fn pto_count(&self) -> u32 {
939        self.pto_count
940    }
941
942    #[cfg(test)]
943    fn pkt_thresh(&self) -> u64 {
944        self.pkt_thresh
945    }
946
947    #[cfg(test)]
948    fn lost_spurious_count(&self) -> usize {
949        self.lost_spurious_count
950    }
951
952    #[cfg(test)]
953    fn detect_lost_packets_for_test(
954        &mut self, epoch: Epoch, now: Instant,
955    ) -> (usize, usize) {
956        self.detect_lost_packets(epoch, now, "")
957    }
958
959    // FIXME only used by gcongestion
960    fn on_app_limited(&mut self) {
961        // Not implemented for legacy recovery, update_app_limited and
962        // delivery_rate_update_app_limited used instead.
963    }
964
965    #[cfg(test)]
966    fn largest_sent_pkt_num_on_path(&self, epoch: Epoch) -> Option<u64> {
967        self.epochs[epoch].test_largest_sent_pkt_num_on_path
968    }
969
970    #[cfg(test)]
971    fn app_limited(&self) -> bool {
972        self.congestion.app_limited
973    }
974
975    fn update_app_limited(&mut self, v: bool) {
976        self.congestion.update_app_limited(v);
977    }
978
979    // FIXME only used by congestion
980    fn delivery_rate_update_app_limited(&mut self, v: bool) {
981        self.congestion.delivery_rate.update_app_limited(v);
982    }
983
984    // FIXME only used by congestion
985    fn update_max_ack_delay(&mut self, max_ack_delay: Duration) {
986        self.rtt_stats.max_ack_delay = max_ack_delay;
987    }
988
989    #[cfg(feature = "qlog")]
990    fn state_str(&self, now: Instant) -> &'static str {
991        (self.congestion.cc_ops.state_str)(&self.congestion, now)
992    }
993
994    #[cfg(feature = "qlog")]
995    fn get_updated_qlog_event_data(&mut self) -> Option<EventData> {
996        let qlog_metrics = QlogMetrics {
997            min_rtt: *self.rtt_stats.min_rtt,
998            smoothed_rtt: self.rtt(),
999            latest_rtt: self.rtt_stats.latest_rtt,
1000            rttvar: self.rtt_stats.rttvar,
1001            cwnd: self.cwnd() as u64,
1002            bytes_in_flight: self.bytes_in_flight.get() as u64,
1003            ssthresh: Some(self.congestion.ssthresh.get() as u64),
1004            pacing_rate: self.congestion.pacer.rate(),
1005        };
1006
1007        self.qlog_metrics.maybe_update(qlog_metrics)
1008    }
1009
1010    #[cfg(feature = "qlog")]
1011    fn get_updated_qlog_cc_state(
1012        &mut self, now: Instant,
1013    ) -> Option<&'static str> {
1014        let cc_state = self.state_str(now);
1015        if cc_state != self.qlog_prev_cc_state {
1016            self.qlog_prev_cc_state = cc_state;
1017            Some(cc_state)
1018        } else {
1019            None
1020        }
1021    }
1022
1023    fn send_quantum(&self) -> usize {
1024        self.congestion.send_quantum()
1025    }
1026
1027    // TODO tests
1028    fn get_next_release_time(&self) -> ReleaseDecision {
1029        let now = Instant::now();
1030        let next_send_time = self.congestion.get_packet_send_time();
1031        if next_send_time > now {
1032            ReleaseDecision {
1033                time: ReleaseTime::At(next_send_time),
1034                allow_burst: false,
1035            }
1036        } else {
1037            ReleaseDecision {
1038                time: ReleaseTime::Immediate,
1039                allow_burst: false,
1040            }
1041        }
1042    }
1043
1044    fn gcongestion_enabled(&self) -> bool {
1045        false
1046    }
1047
1048    fn lost_count(&self) -> usize {
1049        self.congestion.lost_count
1050    }
1051
1052    fn bytes_lost(&self) -> u64 {
1053        self.bytes_lost
1054    }
1055}
1056
1057impl std::fmt::Debug for LegacyRecovery {
1058    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1059        write!(f, "timer={:?} ", self.loss_timer)?;
1060        write!(f, "latest_rtt={:?} ", self.rtt_stats.latest_rtt)?;
1061        write!(f, "srtt={:?} ", self.rtt_stats.smoothed_rtt)?;
1062        write!(f, "min_rtt={:?} ", *self.rtt_stats.min_rtt)?;
1063        write!(f, "rttvar={:?} ", self.rtt_stats.rttvar)?;
1064        write!(f, "cwnd={} ", self.cwnd())?;
1065        write!(f, "ssthresh={} ", self.congestion.ssthresh.get())?;
1066        write!(f, "bytes_in_flight={} ", self.bytes_in_flight.get())?;
1067        write!(f, "app_limited={} ", self.congestion.app_limited)?;
1068        write!(
1069            f,
1070            "congestion_recovery_start_time={:?} ",
1071            self.congestion.congestion_recovery_start_time
1072        )?;
1073        write!(f, "{:?} ", self.congestion.delivery_rate)?;
1074        write!(f, "pacer={:?} ", self.congestion.pacer)?;
1075
1076        if self.congestion.hystart.enabled() {
1077            write!(f, "hystart={:?} ", self.congestion.hystart)?;
1078        }
1079
1080        // CC-specific debug info
1081        (self.congestion.cc_ops.debug_fmt)(&self.congestion, f)?;
1082
1083        Ok(())
1084    }
1085}
1086
/// Cloneable record describing a single packet, with all fields public.
///
/// NOTE(review): the field descriptions below are inferred from the field
/// names only; confirm them against the code that builds and consumes this
/// type.
#[derive(Clone)]
pub struct Acked {
    /// Packet number (presumably of the acknowledged packet).
    pub pkt_num: u64,
    /// Instant at which the packet was sent.
    pub time_sent: Instant,
    /// Size of the packet; presumably in bytes, TODO confirm.
    pub size: usize,
    /// RTT associated with this packet.
    pub rtt: Duration,
    /// Delivered counter recorded for this packet; units TODO confirm.
    pub delivered: usize,
    /// Instant paired with `delivered`.
    pub delivered_time: Instant,
    /// First-sent instant recorded for this packet.
    pub first_sent_time: Instant,
    /// Whether the packet is flagged as app-limited.
    pub is_app_limited: bool,
}