quiche/recovery/congestion/
recovery.rs

1// Copyright (C) 2018-2019, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::cmp;
28
29use std::time::Duration;
30use std::time::Instant;
31
32use std::collections::VecDeque;
33
34use super::RecoveryConfig;
35use super::Sent;
36
37use crate::packet::Epoch;
38use crate::ranges::RangeSet;
39use crate::recovery::Bandwidth;
40use crate::recovery::HandshakeStatus;
41use crate::recovery::OnLossDetectionTimeoutOutcome;
42use crate::recovery::RecoveryOps;
43use crate::recovery::StartupExit;
44use crate::Error;
45use crate::Result;
46
47#[cfg(feature = "qlog")]
48use crate::recovery::QlogMetrics;
49
50use crate::frame;
51
52#[cfg(feature = "qlog")]
53use qlog::events::EventData;
54
55use super::pacer;
56use super::Congestion;
57use crate::recovery::bytes_in_flight::BytesInFlight;
58use crate::recovery::rtt::RttStats;
59use crate::recovery::LossDetectionTimer;
60use crate::recovery::OnAckReceivedOutcome;
61use crate::recovery::ReleaseDecision;
62use crate::recovery::ReleaseTime;
63use crate::recovery::GRANULARITY;
64use crate::recovery::INITIAL_PACKET_THRESHOLD;
65use crate::recovery::INITIAL_TIME_THRESHOLD;
66use crate::recovery::MAX_OUTSTANDING_NON_ACK_ELICITING;
67use crate::recovery::MAX_PACKET_THRESHOLD;
68use crate::recovery::MAX_PTO_PROBES_COUNT;
69use crate::recovery::PACKET_REORDER_TIME_THRESHOLD;
70
/// Loss recovery state kept separately for each packet number space.
#[derive(Default)]
struct RecoveryEpoch {
    /// The time the most recent ack-eliciting packet was sent.
    time_of_last_ack_eliciting_packet: Option<Instant>,

    /// The largest packet number acknowledged in the packet number space so
    /// far.
    largest_acked_packet: Option<u64>,

    /// The time at which the next packet in that packet number space can be
    /// considered lost based on exceeding the reordering window in time.
    loss_time: Option<Instant>,

    /// An association of packet numbers in a packet number space to information
    /// about them.
    sent_packets: VecDeque<Sent>,

    /// Number of probe packets still to be sent in this space. Set when a
    /// PTO fires and decremented by `ping_sent()`.
    loss_probes: usize,
    /// Number of in-flight packets in this space that have been neither
    /// acked nor declared lost.
    in_flight_count: usize,

    /// Frames carried by newly acked packets, held until
    /// `get_acked_frames()` takes them.
    acked_frames: Vec<frame::Frame>,
    /// Frames carried by lost packets (or rescheduled on PTO), held until
    /// `get_lost_frames()` takes them.
    lost_frames: Vec<frame::Frame>,

    /// The largest packet number sent in the packet number space so far.
    #[cfg(test)]
    test_largest_sent_pkt_num_on_path: Option<u64>,
}
98
/// Summary of one pass of `RecoveryEpoch::detect_and_remove_acked_packets()`.
struct AckedDetectionResult {
    /// Total size of the newly acked packets that were in flight.
    acked_bytes: usize,
    /// Number of acked packets that had previously been declared lost.
    spurious_losses: usize,
    /// For the first spurious loss found, its distance from the largest
    /// acked packet number, plus one.
    spurious_pkt_thresh: Option<u64>,
    /// Whether any newly acked packet was ack-eliciting.
    has_ack_eliciting: bool,
    /// Whether any spuriously lost packet was in flight.
    has_in_flight_spurious_loss: bool,
}
106
/// Summary of one pass of `RecoveryEpoch::detect_lost_packets()`.
struct LossDetectionResult {
    /// Copy of the last in-flight, non-PMTUD packet declared lost in this
    /// pass (packets are visited in increasing packet number order).
    largest_lost_pkt: Option<Sent>,
    /// Number of packets declared lost, PMTUD probes excluded.
    lost_packets: usize,
    /// Total size of the in-flight packets declared lost, PMTUD probes
    /// excluded.
    lost_bytes: usize,
    /// Total size of the PMTUD probes declared lost.
    pmtud_lost_bytes: usize,
}
113
114impl RecoveryEpoch {
115    // `peer_sent_ack_ranges` should not be used without validation.
116    fn detect_and_remove_acked_packets(
117        &mut self, now: Instant, peer_sent_ack_ranges: &RangeSet,
118        newly_acked: &mut Vec<Acked>, rtt_stats: &RttStats, skip_pn: Option<u64>,
119        trace_id: &str,
120    ) -> Result<AckedDetectionResult> {
121        newly_acked.clear();
122
123        let mut acked_bytes = 0;
124        let mut spurious_losses = 0;
125        let mut spurious_pkt_thresh = None;
126        let mut has_ack_eliciting = false;
127        let mut has_in_flight_spurious_loss = false;
128
129        let largest_ack_received = peer_sent_ack_ranges
130            .last()
131            .expect("ACK frames should always have at least one ack range");
132        let largest_acked = self
133            .largest_acked_packet
134            .unwrap_or(0)
135            .max(largest_ack_received);
136
137        for peer_sent_range in peer_sent_ack_ranges.iter() {
138            if skip_pn.is_some_and(|skip_pn| peer_sent_range.contains(&skip_pn)) {
139                // https://www.rfc-editor.org/rfc/rfc9000#section-13.1
140                // An endpoint SHOULD treat receipt of an acknowledgment
141                // for a packet it did not send as
142                // a connection error of type PROTOCOL_VIOLATION
143                return Err(Error::OptimisticAckDetected);
144            }
145
146            // Because packets always have incrementing numbers, they are always
147            // in sorted order.
148            let start = if self
149                .sent_packets
150                .front()
151                .filter(|e| e.pkt_num >= peer_sent_range.start)
152                .is_some()
153            {
154                // Usually it will be the first packet.
155                0
156            } else {
157                self.sent_packets
158                    .binary_search_by_key(&peer_sent_range.start, |p| p.pkt_num)
159                    .unwrap_or_else(|e| e)
160            };
161
162            for unacked in self.sent_packets.range_mut(start..) {
163                if unacked.pkt_num >= peer_sent_range.end {
164                    break;
165                }
166
167                if unacked.time_acked.is_some() {
168                    // Already acked.
169                } else if unacked.time_lost.is_some() {
170                    // An acked packet was already declared lost.
171                    spurious_losses += 1;
172                    spurious_pkt_thresh
173                        .get_or_insert(largest_acked - unacked.pkt_num + 1);
174                    unacked.time_acked = Some(now);
175
176                    if unacked.in_flight {
177                        has_in_flight_spurious_loss = true;
178                    }
179                } else {
180                    if unacked.in_flight {
181                        self.in_flight_count -= 1;
182                        acked_bytes += unacked.size;
183                    }
184
185                    newly_acked.push(Acked {
186                        pkt_num: unacked.pkt_num,
187                        time_sent: unacked.time_sent,
188                        size: unacked.size,
189
190                        rtt: now.saturating_duration_since(unacked.time_sent),
191                        delivered: unacked.delivered,
192                        delivered_time: unacked.delivered_time,
193                        first_sent_time: unacked.first_sent_time,
194                        is_app_limited: unacked.is_app_limited,
195                    });
196
197                    trace!("{} packet newly acked {}", trace_id, unacked.pkt_num);
198
199                    self.acked_frames
200                        .extend(std::mem::take(&mut unacked.frames));
201
202                    has_ack_eliciting |= unacked.ack_eliciting;
203                    unacked.time_acked = Some(now);
204                }
205            }
206        }
207
208        self.drain_acked_and_lost_packets(now - rtt_stats.rtt());
209
210        Ok(AckedDetectionResult {
211            acked_bytes,
212            spurious_losses,
213            spurious_pkt_thresh,
214            has_ack_eliciting,
215            has_in_flight_spurious_loss,
216        })
217    }
218
219    fn detect_lost_packets(
220        &mut self, loss_delay: Duration, pkt_thresh: u64, now: Instant,
221        trace_id: &str, epoch: Epoch,
222    ) -> LossDetectionResult {
223        self.loss_time = None;
224
225        // Minimum time of kGranularity before packets are deemed lost.
226        let loss_delay = cmp::max(loss_delay, GRANULARITY);
227        let largest_acked = self.largest_acked_packet.unwrap_or(0);
228
229        // Packets sent before this time are deemed lost.
230        let lost_send_time = now.checked_sub(loss_delay).unwrap();
231
232        let mut lost_packets = 0;
233        let mut lost_bytes = 0;
234        let mut pmtud_lost_bytes = 0;
235
236        let mut largest_lost_pkt = None;
237
238        let unacked_iter = self.sent_packets
239        .iter_mut()
240        // Skip packets that follow the largest acked packet.
241        .take_while(|p| p.pkt_num <= largest_acked)
242        // Skip packets that have already been acked or lost.
243        .filter(|p| p.time_acked.is_none() && p.time_lost.is_none());
244
245        for unacked in unacked_iter {
246            // Mark packet as lost, or set time when it should be marked.
247            if unacked.time_sent <= lost_send_time ||
248                largest_acked >= unacked.pkt_num + pkt_thresh
249            {
250                self.lost_frames.extend(unacked.frames.drain(..));
251
252                unacked.time_lost = Some(now);
253
254                if unacked.is_pmtud_probe {
255                    pmtud_lost_bytes += unacked.size;
256                    self.in_flight_count -= 1;
257
258                    // Do not track PMTUD probes losses.
259                    continue;
260                }
261
262                if unacked.in_flight {
263                    lost_bytes += unacked.size;
264
265                    // Frames have already been removed from the packet, so
266                    // cloning the whole packet should be relatively cheap.
267                    largest_lost_pkt = Some(unacked.clone());
268
269                    self.in_flight_count -= 1;
270
271                    trace!(
272                        "{} packet {} lost on epoch {}",
273                        trace_id,
274                        unacked.pkt_num,
275                        epoch
276                    );
277                }
278
279                lost_packets += 1;
280            } else {
281                let loss_time = match self.loss_time {
282                    None => unacked.time_sent + loss_delay,
283
284                    Some(loss_time) =>
285                        cmp::min(loss_time, unacked.time_sent + loss_delay),
286                };
287
288                self.loss_time = Some(loss_time);
289                break;
290            }
291        }
292
293        LossDetectionResult {
294            largest_lost_pkt,
295            lost_packets,
296            lost_bytes,
297            pmtud_lost_bytes,
298        }
299    }
300
301    fn drain_acked_and_lost_packets(&mut self, loss_thresh: Instant) {
302        // In order to avoid removing elements from the middle of the list
303        // (which would require copying other elements to compact the list),
304        // we only remove a contiguous range of elements from the start of the
305        // list.
306        //
307        // This means that acked or lost elements coming after this will not
308        // be removed at this point, but their removal is delayed for a later
309        // time, once the gaps have been filled.
310        while let Some(pkt) = self.sent_packets.front() {
311            if let Some(time_lost) = pkt.time_lost {
312                if time_lost > loss_thresh {
313                    break;
314                }
315            }
316
317            if pkt.time_acked.is_none() && pkt.time_lost.is_none() {
318                break;
319            }
320
321            self.sent_packets.pop_front();
322        }
323    }
324}
325
/// Loss recovery state shared by all packet number spaces, together with
/// the congestion controller it drives.
pub struct LegacyRecovery {
    /// Per packet number space state, indexed by `Epoch`.
    epochs: [RecoveryEpoch; Epoch::count()],

    /// The loss detection timer (time threshold or PTO deadline).
    loss_timer: LossDetectionTimer,

    /// Number of PTOs fired since the last ACK that newly acknowledged
    /// packets; used as the exponent of the PTO backoff.
    pto_count: u32,

    /// RTT measurements.
    rtt_stats: RttStats,

    /// Number of packets that were declared lost and acknowledged later.
    lost_spurious_count: usize,

    /// Packet reordering threshold used for loss detection; raised (up to
    /// `MAX_PACKET_THRESHOLD`) when a spurious loss is detected.
    pkt_thresh: u64,

    /// Multiplier applied to the RTT to obtain the time-based loss delay.
    time_thresh: f64,

    /// Bytes currently in flight across all packet number spaces.
    bytes_in_flight: BytesInFlight,

    /// Total size of all packets passed to `on_packet_sent()`.
    bytes_sent: usize,

    /// Passed to the congestion controller on every sent packet.
    // NOTE(review): never updated in the code visible here -- confirm.
    bytes_lost: u64,

    /// The maximum datagram size currently in use.
    pub max_datagram_size: usize,

    #[cfg(feature = "qlog")]
    qlog_metrics: QlogMetrics,

    #[cfg(feature = "qlog")]
    qlog_prev_cc_state: &'static str,

    /// How many non-ack-eliciting packets have been sent since the last
    /// ack-eliciting one.
    outstanding_non_ack_eliciting: usize,

    /// Congestion control state.
    pub congestion: Congestion,

    /// A reusable list of acks.
    newly_acked: Vec<Acked>,
}
363
364impl LegacyRecovery {
365    pub fn new_with_config(recovery_config: &RecoveryConfig) -> Self {
366        Self {
367            epochs: Default::default(),
368
369            loss_timer: Default::default(),
370
371            pto_count: 0,
372
373            rtt_stats: RttStats::new(
374                recovery_config.initial_rtt,
375                recovery_config.max_ack_delay,
376            ),
377
378            lost_spurious_count: 0,
379
380            pkt_thresh: INITIAL_PACKET_THRESHOLD,
381
382            time_thresh: INITIAL_TIME_THRESHOLD,
383
384            bytes_in_flight: Default::default(),
385
386            bytes_sent: 0,
387
388            bytes_lost: 0,
389
390            max_datagram_size: recovery_config.max_send_udp_payload_size,
391
392            #[cfg(feature = "qlog")]
393            qlog_metrics: QlogMetrics::default(),
394
395            #[cfg(feature = "qlog")]
396            qlog_prev_cc_state: "",
397
398            outstanding_non_ack_eliciting: 0,
399
400            congestion: Congestion::from_config(recovery_config),
401
402            newly_acked: Vec::new(),
403        }
404    }
405
406    #[cfg(test)]
407    pub fn new(config: &crate::Config) -> Self {
408        Self::new_with_config(&RecoveryConfig::from_config(config))
409    }
410
411    fn loss_time_and_space(&self) -> (Option<Instant>, Epoch) {
412        let mut epoch = Epoch::Initial;
413        let mut time = self.epochs[epoch].loss_time;
414
415        // Iterate over all packet number spaces starting from Handshake.
416        for e in [Epoch::Handshake, Epoch::Application] {
417            let new_time = self.epochs[e].loss_time;
418            if time.is_none() || new_time < time {
419                time = new_time;
420                epoch = e;
421            }
422        }
423
424        (time, epoch)
425    }
426
427    fn pto_time_and_space(
428        &self, handshake_status: HandshakeStatus, now: Instant,
429    ) -> (Option<Instant>, Epoch) {
430        let mut duration = self.pto() * 2_u32.pow(self.pto_count);
431
432        // Arm PTO from now when there are no inflight packets.
433        if self.bytes_in_flight.is_zero() {
434            if handshake_status.has_handshake_keys {
435                return (Some(now + duration), Epoch::Handshake);
436            } else {
437                return (Some(now + duration), Epoch::Initial);
438            }
439        }
440
441        let mut pto_timeout = None;
442        let mut pto_space = Epoch::Initial;
443
444        // Iterate over all packet number spaces.
445        for e in [Epoch::Initial, Epoch::Handshake, Epoch::Application] {
446            let epoch = &self.epochs[e];
447            if epoch.in_flight_count == 0 {
448                continue;
449            }
450
451            if e == Epoch::Application {
452                // Skip Application Data until handshake completes.
453                if !handshake_status.completed {
454                    return (pto_timeout, pto_space);
455                }
456
457                // Include max_ack_delay and backoff for Application Data.
458                duration +=
459                    self.rtt_stats.max_ack_delay * 2_u32.pow(self.pto_count);
460            }
461
462            let new_time = epoch
463                .time_of_last_ack_eliciting_packet
464                .map(|t| t + duration);
465
466            if pto_timeout.is_none() || new_time < pto_timeout {
467                pto_timeout = new_time;
468                pto_space = e;
469            }
470        }
471
472        (pto_timeout, pto_space)
473    }
474
475    fn set_loss_detection_timer(
476        &mut self, handshake_status: HandshakeStatus, now: Instant,
477    ) {
478        let (earliest_loss_time, _) = self.loss_time_and_space();
479
480        if let Some(to) = earliest_loss_time {
481            // Time threshold loss detection.
482            self.loss_timer.update(to);
483            return;
484        }
485
486        if self.bytes_in_flight.is_zero() &&
487            handshake_status.peer_verified_address
488        {
489            self.loss_timer.clear();
490            return;
491        }
492
493        // PTO timer.
494        if let (Some(timeout), _) = self.pto_time_and_space(handshake_status, now)
495        {
496            self.loss_timer.update(timeout);
497        }
498    }
499
500    fn detect_lost_packets(
501        &mut self, epoch: Epoch, now: Instant, trace_id: &str,
502    ) -> (usize, usize) {
503        let loss_delay = cmp::max(self.rtt_stats.latest_rtt, self.rtt())
504            .mul_f64(self.time_thresh);
505
506        let loss = self.epochs[epoch].detect_lost_packets(
507            loss_delay,
508            self.pkt_thresh,
509            now,
510            trace_id,
511            epoch,
512        );
513
514        if let Some(pkt) = loss.largest_lost_pkt {
515            if !self.congestion.in_congestion_recovery(pkt.time_sent) {
516                (self.congestion.cc_ops.checkpoint)(&mut self.congestion);
517            }
518
519            (self.congestion.cc_ops.congestion_event)(
520                &mut self.congestion,
521                self.bytes_in_flight.get(),
522                loss.lost_bytes,
523                &pkt,
524                now,
525            );
526
527            self.bytes_in_flight
528                .saturating_subtract(loss.lost_bytes, now);
529        };
530
531        self.bytes_in_flight
532            .saturating_subtract(loss.pmtud_lost_bytes, now);
533
534        self.epochs[epoch]
535            .drain_acked_and_lost_packets(now - self.rtt_stats.rtt());
536
537        self.congestion.lost_count += loss.lost_packets;
538
539        (loss.lost_packets, loss.lost_bytes)
540    }
541}
542
543impl RecoveryOps for LegacyRecovery {
544    /// Returns whether or not we should elicit an ACK even if we wouldn't
545    /// otherwise have constructed an ACK eliciting packet.
546    fn should_elicit_ack(&self, epoch: Epoch) -> bool {
547        self.epochs[epoch].loss_probes > 0 ||
548            self.outstanding_non_ack_eliciting >=
549                MAX_OUTSTANDING_NON_ACK_ELICITING
550    }
551
552    fn get_acked_frames(&mut self, epoch: Epoch) -> Vec<frame::Frame> {
553        std::mem::take(&mut self.epochs[epoch].acked_frames)
554    }
555
556    fn get_lost_frames(&mut self, epoch: Epoch) -> Vec<frame::Frame> {
557        std::mem::take(&mut self.epochs[epoch].lost_frames)
558    }
559
560    fn get_largest_acked_on_epoch(&self, epoch: Epoch) -> Option<u64> {
561        self.epochs[epoch].largest_acked_packet
562    }
563
564    fn has_lost_frames(&self, epoch: Epoch) -> bool {
565        !self.epochs[epoch].lost_frames.is_empty()
566    }
567
568    fn loss_probes(&self, epoch: Epoch) -> usize {
569        self.epochs[epoch].loss_probes
570    }
571
572    #[cfg(test)]
573    fn inc_loss_probes(&mut self, epoch: Epoch) {
574        self.epochs[epoch].loss_probes += 1;
575    }
576
577    fn ping_sent(&mut self, epoch: Epoch) {
578        self.epochs[epoch].loss_probes =
579            self.epochs[epoch].loss_probes.saturating_sub(1);
580    }
581
582    fn on_packet_sent(
583        &mut self, mut pkt: Sent, epoch: Epoch,
584        handshake_status: HandshakeStatus, now: Instant, trace_id: &str,
585    ) {
586        let ack_eliciting = pkt.ack_eliciting;
587        let in_flight = pkt.in_flight;
588        let sent_bytes = pkt.size;
589
590        if ack_eliciting {
591            self.outstanding_non_ack_eliciting = 0;
592        } else {
593            self.outstanding_non_ack_eliciting += 1;
594        }
595
596        if in_flight && ack_eliciting {
597            self.epochs[epoch].time_of_last_ack_eliciting_packet = Some(now);
598        }
599
600        self.congestion.on_packet_sent(
601            self.bytes_in_flight.get(),
602            sent_bytes,
603            now,
604            &mut pkt,
605            &self.rtt_stats,
606            self.bytes_lost,
607            in_flight,
608        );
609
610        if in_flight {
611            self.epochs[epoch].in_flight_count += 1;
612            self.bytes_in_flight.add(sent_bytes, now);
613
614            self.set_loss_detection_timer(handshake_status, now);
615        }
616
617        self.bytes_sent += sent_bytes;
618
619        #[cfg(test)]
620        {
621            self.epochs[epoch].test_largest_sent_pkt_num_on_path = self.epochs
622                [epoch]
623                .test_largest_sent_pkt_num_on_path
624                .max(Some(pkt.pkt_num));
625        }
626
627        self.epochs[epoch].sent_packets.push_back(pkt);
628
629        trace!("{trace_id} {self:?}");
630    }
631
632    fn get_packet_send_time(&self, _now: Instant) -> Instant {
633        // TODO .max(now)
634        self.congestion.get_packet_send_time()
635    }
636
637    // `peer_sent_ack_ranges` should not be used without validation.
638    fn on_ack_received(
639        &mut self, peer_sent_ack_ranges: &RangeSet, ack_delay: u64, epoch: Epoch,
640        handshake_status: HandshakeStatus, now: Instant, skip_pn: Option<u64>,
641        trace_id: &str,
642    ) -> Result<OnAckReceivedOutcome> {
643        let AckedDetectionResult {
644            acked_bytes,
645            spurious_losses,
646            spurious_pkt_thresh,
647            has_ack_eliciting,
648            has_in_flight_spurious_loss,
649        } = self.epochs[epoch].detect_and_remove_acked_packets(
650            now,
651            peer_sent_ack_ranges,
652            &mut self.newly_acked,
653            &self.rtt_stats,
654            skip_pn,
655            trace_id,
656        )?;
657
658        self.lost_spurious_count += spurious_losses;
659        if let Some(thresh) = spurious_pkt_thresh {
660            self.pkt_thresh =
661                self.pkt_thresh.max(thresh.min(MAX_PACKET_THRESHOLD));
662            self.time_thresh = PACKET_REORDER_TIME_THRESHOLD;
663        }
664
665        // Undo congestion window update.
666        if has_in_flight_spurious_loss {
667            (self.congestion.cc_ops.rollback)(&mut self.congestion);
668        }
669
670        if self.newly_acked.is_empty() {
671            return Ok(OnAckReceivedOutcome::default());
672        }
673
674        let largest_newly_acked = self.newly_acked.last().unwrap();
675
676        // Update `largest_acked_packet` based on the validated `newly_acked`
677        // value.
678        let largest_acked_pkt_num = self.epochs[epoch]
679            .largest_acked_packet
680            .unwrap_or(0)
681            .max(largest_newly_acked.pkt_num);
682        self.epochs[epoch].largest_acked_packet = Some(largest_acked_pkt_num);
683
684        // Check if largest packet is newly acked.
685        if largest_newly_acked.pkt_num == largest_acked_pkt_num &&
686            has_ack_eliciting
687        {
688            let latest_rtt = now - largest_newly_acked.time_sent;
689            self.rtt_stats.update_rtt(
690                latest_rtt,
691                Duration::from_micros(ack_delay),
692                now,
693                handshake_status.completed,
694            );
695        }
696
697        // Detect and mark lost packets without removing them from the sent
698        // packets list.
699        let (lost_packets, lost_bytes) =
700            self.detect_lost_packets(epoch, now, trace_id);
701
702        self.congestion.on_packets_acked(
703            self.bytes_in_flight.get(),
704            &mut self.newly_acked,
705            &self.rtt_stats,
706            now,
707        );
708
709        self.bytes_in_flight.saturating_subtract(acked_bytes, now);
710
711        self.pto_count = 0;
712
713        self.set_loss_detection_timer(handshake_status, now);
714
715        self.epochs[epoch]
716            .drain_acked_and_lost_packets(now - self.rtt_stats.rtt());
717
718        Ok(OnAckReceivedOutcome {
719            lost_packets,
720            lost_bytes,
721            acked_bytes,
722            spurious_losses,
723        })
724    }
725
726    fn on_loss_detection_timeout(
727        &mut self, handshake_status: HandshakeStatus, now: Instant,
728        trace_id: &str,
729    ) -> OnLossDetectionTimeoutOutcome {
730        let (earliest_loss_time, epoch) = self.loss_time_and_space();
731
732        if earliest_loss_time.is_some() {
733            // Time threshold loss detection.
734            let (lost_packets, lost_bytes) =
735                self.detect_lost_packets(epoch, now, trace_id);
736
737            self.set_loss_detection_timer(handshake_status, now);
738
739            trace!("{trace_id} {self:?}");
740            return OnLossDetectionTimeoutOutcome {
741                lost_packets,
742                lost_bytes,
743            };
744        }
745
746        let epoch = if self.bytes_in_flight.get() > 0 {
747            // Send new data if available, else retransmit old data. If neither
748            // is available, send a single PING frame.
749            let (_, e) = self.pto_time_and_space(handshake_status, now);
750
751            e
752        } else {
753            // Client sends an anti-deadlock packet: Initial is padded to earn
754            // more anti-amplification credit, a Handshake packet proves address
755            // ownership.
756            if handshake_status.has_handshake_keys {
757                Epoch::Handshake
758            } else {
759                Epoch::Initial
760            }
761        };
762
763        self.pto_count += 1;
764
765        let epoch = &mut self.epochs[epoch];
766
767        epoch.loss_probes =
768            cmp::min(self.pto_count as usize, MAX_PTO_PROBES_COUNT);
769
770        let unacked_iter = epoch.sent_packets
771            .iter_mut()
772            // Skip packets that have already been acked or lost, and packets
773            // that don't contain either CRYPTO or STREAM frames.
774            .filter(|p| p.has_data && p.time_acked.is_none() && p.time_lost.is_none())
775            // Only return as many packets as the number of probe packets that
776            // will be sent.
777            .take(epoch.loss_probes);
778
779        // Retransmit the frames from the oldest sent packets on PTO. However
780        // the packets are not actually declared lost (so there is no effect to
781        // congestion control), we just reschedule the data they carried.
782        //
783        // This will also trigger sending an ACK and retransmitting frames like
784        // HANDSHAKE_DONE and MAX_DATA / MAX_STREAM_DATA as well, in addition
785        // to CRYPTO and STREAM, if the original packet carried them.
786        for unacked in unacked_iter {
787            epoch.lost_frames.extend_from_slice(&unacked.frames);
788        }
789
790        self.set_loss_detection_timer(handshake_status, now);
791
792        trace!("{trace_id} {self:?}");
793
794        OnLossDetectionTimeoutOutcome {
795            lost_packets: 0,
796            lost_bytes: 0,
797        }
798    }
799
800    fn on_pkt_num_space_discarded(
801        &mut self, epoch: Epoch, handshake_status: HandshakeStatus, now: Instant,
802    ) {
803        let epoch = &mut self.epochs[epoch];
804
805        let unacked_bytes = epoch
806            .sent_packets
807            .iter()
808            .filter(|p| {
809                p.in_flight && p.time_acked.is_none() && p.time_lost.is_none()
810            })
811            .fold(0, |acc, p| acc + p.size);
812
813        self.bytes_in_flight.saturating_subtract(unacked_bytes, now);
814
815        epoch.sent_packets.clear();
816        epoch.lost_frames.clear();
817        epoch.acked_frames.clear();
818
819        epoch.time_of_last_ack_eliciting_packet = None;
820        epoch.loss_time = None;
821        epoch.loss_probes = 0;
822        epoch.in_flight_count = 0;
823
824        self.set_loss_detection_timer(handshake_status, now);
825    }
826
827    fn on_path_change(
828        &mut self, epoch: Epoch, now: Instant, trace_id: &str,
829    ) -> (usize, usize) {
830        // Time threshold loss detection.
831        self.detect_lost_packets(epoch, now, trace_id)
832    }
833
834    fn loss_detection_timer(&self) -> Option<Instant> {
835        self.loss_timer.time
836    }
837
838    fn cwnd(&self) -> usize {
839        self.congestion.congestion_window()
840    }
841
842    fn cwnd_available(&self) -> usize {
843        // Ignore cwnd when sending probe packets.
844        if self.epochs.iter().any(|e| e.loss_probes > 0) {
845            return usize::MAX;
846        }
847
848        // Open more space (snd_cnt) for PRR when allowed.
849        self.cwnd().saturating_sub(self.bytes_in_flight.get()) +
850            self.congestion.prr.snd_cnt
851    }
852
853    fn rtt(&self) -> Duration {
854        self.rtt_stats.rtt()
855    }
856
857    fn min_rtt(&self) -> Option<Duration> {
858        self.rtt_stats.min_rtt()
859    }
860
861    fn max_rtt(&self) -> Option<Duration> {
862        self.rtt_stats.max_rtt()
863    }
864
865    fn rttvar(&self) -> Duration {
866        self.rtt_stats.rttvar
867    }
868
869    fn pto(&self) -> Duration {
870        self.rtt() + cmp::max(self.rtt_stats.rttvar * 4, GRANULARITY)
871    }
872
873    /// The most recent data delivery rate estimate.
874    fn delivery_rate(&self) -> Bandwidth {
875        self.congestion.delivery_rate()
876    }
877
878    fn max_bandwidth(&self) -> Option<Bandwidth> {
879        // TODO implement
880        None
881    }
882
883    /// Statistics from when a CCA first exited the startup phase.
884    fn startup_exit(&self) -> Option<StartupExit> {
885        self.congestion.ssthresh.startup_exit()
886    }
887
888    fn max_datagram_size(&self) -> usize {
889        self.max_datagram_size
890    }
891
892    fn pmtud_update_max_datagram_size(&mut self, new_max_datagram_size: usize) {
893        // Congestion Window is updated only when it's not updated already.
894        // Update cwnd if it hasn't been updated yet.
895        if self.cwnd() ==
896            self.max_datagram_size *
897                self.congestion.initial_congestion_window_packets
898        {
899            self.congestion.congestion_window = new_max_datagram_size *
900                self.congestion.initial_congestion_window_packets;
901        }
902
903        self.congestion.pacer = pacer::Pacer::new(
904            self.congestion.pacer.enabled(),
905            self.cwnd(),
906            0,
907            new_max_datagram_size,
908            self.congestion.pacer.max_pacing_rate(),
909        );
910
911        self.max_datagram_size = new_max_datagram_size;
912    }
913
914    fn update_max_datagram_size(&mut self, new_max_datagram_size: usize) {
915        self.pmtud_update_max_datagram_size(
916            self.max_datagram_size.min(new_max_datagram_size),
917        )
918    }
919
/// Test helper: number of entries in `sent_packets` for `epoch`.
#[cfg(test)]
fn sent_packets_len(&self, epoch: Epoch) -> usize {
    let space = &self.epochs[epoch];
    space.sent_packets.len()
}
924
/// Test helper: the `in_flight_count` recorded for `epoch`.
#[cfg(test)]
fn in_flight_count(&self, epoch: Epoch) -> usize {
    let space = &self.epochs[epoch];
    space.in_flight_count
}
929
/// Test helper: current value of the bytes-in-flight tracker.
#[cfg(test)]
fn bytes_in_flight(&self) -> usize {
    let tracker = &self.bytes_in_flight;
    tracker.get()
}
934
935    fn bytes_in_flight_duration(&self) -> Duration {
936        self.bytes_in_flight.get_duration()
937    }
938
/// Test helper: the rate currently reported by the pacer.
#[cfg(test)]
fn pacing_rate(&self) -> u64 {
    let pacer = &self.congestion.pacer;
    pacer.rate()
}
943
/// Test helper: current value of the `pto_count` field.
#[cfg(test)]
fn pto_count(&self) -> u32 {
    self.pto_count
}
948
/// Test helper: the packet threshold.
///
/// This implementation always has one, so the result is always `Some`.
#[cfg(test)]
fn pkt_thresh(&self) -> Option<u64> {
    Some(self.pkt_thresh)
}
953
/// Test helper: current value of the `time_thresh` field.
#[cfg(test)]
fn time_thresh(&self) -> f64 {
    self.time_thresh
}
958
/// Test helper: current value of the `lost_spurious_count` counter.
#[cfg(test)]
fn lost_spurious_count(&self) -> usize {
    self.lost_spurious_count
}
963
/// Test helper: runs `detect_lost_packets` for `epoch` at `now`, passing an
/// empty string as its third argument, and returns its result unchanged.
#[cfg(test)]
fn detect_lost_packets_for_test(
    &mut self, epoch: Epoch, now: Instant,
) -> (usize, usize) {
    let outcome = self.detect_lost_packets(epoch, now, "");
    outcome
}
970
971    // FIXME only used by gcongestion
972    fn on_app_limited(&mut self) {
973        // Not implemented for legacy recovery, update_app_limited and
974        // delivery_rate_update_app_limited used instead.
975    }
976
/// Test helper: the `test_largest_sent_pkt_num_on_path` value stored for
/// `epoch`.
#[cfg(test)]
fn largest_sent_pkt_num_on_path(&self, epoch: Epoch) -> Option<u64> {
    let space = &self.epochs[epoch];
    space.test_largest_sent_pkt_num_on_path
}
981
/// Test helper: the congestion state's `app_limited` flag.
#[cfg(test)]
fn app_limited(&self) -> bool {
    let cc = &self.congestion;
    cc.app_limited
}
986
987    fn update_app_limited(&mut self, v: bool) {
988        self.congestion.update_app_limited(v);
989    }
990
991    // FIXME only used by congestion
992    fn delivery_rate_update_app_limited(&mut self, v: bool) {
993        self.congestion.delivery_rate.update_app_limited(v);
994    }
995
996    // FIXME only used by congestion
997    fn update_max_ack_delay(&mut self, max_ack_delay: Duration) {
998        self.rtt_stats.max_ack_delay = max_ack_delay;
999    }
1000
/// Asks the active congestion controller's `state_str` hook for a static
/// description of its state at `now`.
#[cfg(feature = "qlog")]
fn state_str(&self, now: Instant) -> &'static str {
    let cc = &self.congestion;
    (cc.cc_ops.state_str)(cc, now)
}
1005
/// Snapshots the current RTT and congestion figures into a `QlogMetrics`
/// and passes it to `qlog_metrics.maybe_update`, returning that call's
/// result.
#[cfg(feature = "qlog")]
fn get_updated_qlog_event_data(&mut self) -> Option<EventData> {
    let rtt = &self.rtt_stats;
    let cc = &self.congestion;

    let snapshot = QlogMetrics {
        min_rtt: *rtt.min_rtt,
        smoothed_rtt: self.rtt(),
        latest_rtt: rtt.latest_rtt,
        rttvar: rtt.rttvar,
        cwnd: self.cwnd() as u64,
        bytes_in_flight: self.bytes_in_flight.get() as u64,
        ssthresh: Some(cc.ssthresh.get() as u64),
        pacing_rate: cc.pacer.rate(),
    };

    self.qlog_metrics.maybe_update(snapshot)
}
1021
/// Returns the congestion controller state string at `now` if it differs
/// from the one remembered in `qlog_prev_cc_state`, updating the remembered
/// value; returns `None` when the state is unchanged.
#[cfg(feature = "qlog")]
fn get_updated_qlog_cc_state(
    &mut self, now: Instant,
) -> Option<&'static str> {
    let current = self.state_str(now);

    // Nothing to report while the state string stays the same.
    if current == self.qlog_prev_cc_state {
        return None;
    }

    self.qlog_prev_cc_state = current;
    Some(current)
}
1034
1035    fn send_quantum(&self) -> usize {
1036        self.congestion.send_quantum()
1037    }
1038
1039    // TODO tests
1040    fn get_next_release_time(&self) -> ReleaseDecision {
1041        let now = Instant::now();
1042        let next_send_time = self.congestion.get_packet_send_time();
1043        if next_send_time > now {
1044            ReleaseDecision {
1045                time: ReleaseTime::At(next_send_time),
1046                allow_burst: false,
1047            }
1048        } else {
1049            ReleaseDecision {
1050                time: ReleaseTime::Immediate,
1051                allow_burst: false,
1052            }
1053        }
1054    }
1055
1056    fn gcongestion_enabled(&self) -> bool {
1057        false
1058    }
1059
1060    fn lost_count(&self) -> usize {
1061        self.congestion.lost_count
1062    }
1063
1064    fn bytes_lost(&self) -> u64 {
1065        self.bytes_lost
1066    }
1067}
1068
1069impl std::fmt::Debug for LegacyRecovery {
1070    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1071        write!(f, "timer={:?} ", self.loss_timer)?;
1072        write!(f, "latest_rtt={:?} ", self.rtt_stats.latest_rtt)?;
1073        write!(f, "srtt={:?} ", self.rtt_stats.smoothed_rtt)?;
1074        write!(f, "min_rtt={:?} ", *self.rtt_stats.min_rtt)?;
1075        write!(f, "rttvar={:?} ", self.rtt_stats.rttvar)?;
1076        write!(f, "cwnd={} ", self.cwnd())?;
1077        write!(f, "ssthresh={} ", self.congestion.ssthresh.get())?;
1078        write!(f, "bytes_in_flight={} ", self.bytes_in_flight.get())?;
1079        write!(f, "app_limited={} ", self.congestion.app_limited)?;
1080        write!(
1081            f,
1082            "congestion_recovery_start_time={:?} ",
1083            self.congestion.congestion_recovery_start_time
1084        )?;
1085        write!(f, "{:?} ", self.congestion.delivery_rate)?;
1086        write!(f, "pacer={:?} ", self.congestion.pacer)?;
1087
1088        if self.congestion.hystart.enabled() {
1089            write!(f, "hystart={:?} ", self.congestion.hystart)?;
1090        }
1091
1092        // CC-specific debug info
1093        (self.congestion.cc_ops.debug_fmt)(&self.congestion, f)?;
1094
1095        Ok(())
1096    }
1097}
1098
/// Plain-data record describing a single packet, identified by `pkt_num`.
///
/// NOTE(review): judging by the name this is built for acknowledged packets;
/// confirm against the code that constructs it.
#[derive(Clone)]
pub struct Acked {
    /// Packet number this record refers to.
    pub pkt_num: u64,

    /// Send timestamp recorded for the packet.
    pub time_sent: Instant,

    /// Size recorded for the packet (presumably in bytes; confirm at the
    /// construction site).
    pub size: usize,

    /// RTT value carried along with this record.
    pub rtt: Duration,

    /// NOTE(review): `delivered`, `delivered_time`, `first_sent_time` and
    /// `is_app_limited` look like delivery-rate sampling state attached to
    /// the packet; verify against the code that fills them in.
    pub delivered: usize,

    /// Timestamp paired with `delivered` (see the note on `delivered`).
    pub delivered_time: Instant,

    /// See the note on `delivered`.
    pub first_sent_time: Instant,

    /// See the note on `delivered`.
    pub is_app_limited: bool,
}
1114
1115    pub is_app_limited: bool,
1116}