quiche/recovery/congestion/
recovery.rs

1// Copyright (C) 2018-2019, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::cmp;
28
29use std::time::Duration;
30use std::time::Instant;
31
32use std::collections::VecDeque;
33
34use super::RecoveryConfig;
35use super::Sent;
36
37use crate::recovery::HandshakeStatus;
38use crate::recovery::RecoveryOps;
39
40use crate::packet::Epoch;
41use crate::ranges::RangeSet;
42
43#[cfg(feature = "qlog")]
44use crate::recovery::QlogMetrics;
45
46use crate::frame;
47use crate::packet;
48use crate::ranges;
49
50#[cfg(feature = "qlog")]
51use qlog::events::EventData;
52
53use super::pacer;
54use super::Congestion;
55use crate::recovery::rtt::RttStats;
56use crate::recovery::LossDetectionTimer;
57use crate::recovery::ReleaseDecision;
58use crate::recovery::ReleaseTime;
59use crate::recovery::GRANULARITY;
60use crate::recovery::INITIAL_PACKET_THRESHOLD;
61use crate::recovery::INITIAL_TIME_THRESHOLD;
62use crate::recovery::MAX_OUTSTANDING_NON_ACK_ELICITING;
63use crate::recovery::MAX_PACKET_THRESHOLD;
64use crate::recovery::MAX_PTO_PROBES_COUNT;
65
/// Loss recovery state kept separately for each packet number space.
#[derive(Default)]
struct RecoveryEpoch {
    /// The time the most recent ack-eliciting packet was sent.
    time_of_last_ack_eliciting_packet: Option<Instant>,

    /// The largest packet number acknowledged in the packet number space so
    /// far.
    largest_acked_packet: Option<u64>,

    /// The time at which the next packet in that packet number space can be
    /// considered lost based on exceeding the reordering window in time.
    loss_time: Option<Instant>,

    /// An association of packet numbers in a packet number space to information
    /// about them.
    sent_packets: VecDeque<Sent>,

    /// The number of probe packets still owed in this space: set when the
    /// PTO fires and decremented by `ping_sent()`.
    loss_probes: usize,

    /// The number of in-flight packets in this space that are still waiting
    /// for an ack or a loss declaration.
    in_flight_count: usize,

    /// Frames carried by newly acked packets, held until they are collected
    /// through `get_acked_frames()`.
    acked_frames: Vec<frame::Frame>,

    /// Frames that have to be sent again (taken from lost packets or queued
    /// on PTO), held until they are collected through `get_lost_frames()`.
    lost_frames: Vec<frame::Frame>,
}
89
/// Summary of one pass over the ranges of an ACK frame, as returned by
/// `RecoveryEpoch::detect_and_remove_acked_packets()`.
struct AckedDetectionResult {
    /// Total size of the newly acked packets that were in flight.
    acked_bytes: usize,
    /// Number of acked packets that had previously been declared lost.
    spurious_losses: usize,
    /// Reordering distance (`largest_acked - pkt_num + 1`) of the first
    /// spuriously lost packet encountered, if there was one.
    spurious_pkt_thresh: Option<u64>,
    /// Whether at least one newly acked packet was ack-eliciting.
    has_ack_eliciting: bool,
    /// Whether at least one spuriously lost packet was in flight.
    has_in_flight_spurious_loss: bool,
}
97
/// Summary of one loss detection pass, as returned by
/// `RecoveryEpoch::detect_lost_packets()`.
struct LossDetectionResult {
    /// Copy of the last in-flight, non-PMTUD packet declared lost during the
    /// pass (its frames have already been moved out).
    largest_lost_pkt: Option<Sent>,
    /// Number of packets declared lost, PMTUD probes excluded.
    lost_packets: usize,
    /// Total size of the in-flight packets declared lost, PMTUD probes
    /// excluded.
    lost_bytes: usize,
    /// Total size of the PMTUD probes declared lost.
    pmtud_lost_bytes: usize,
}
104
105impl RecoveryEpoch {
106    fn detect_and_remove_acked_packets(
107        &mut self, now: Instant, acked: &RangeSet, newly_acked: &mut Vec<Acked>,
108        rtt_stats: &RttStats, trace_id: &str,
109    ) -> AckedDetectionResult {
110        newly_acked.clear();
111
112        let mut acked_bytes = 0;
113        let mut spurious_losses = 0;
114        let mut spurious_pkt_thresh = None;
115        let mut has_ack_eliciting = false;
116        let mut has_in_flight_spurious_loss = false;
117
118        let largest_acked = self.largest_acked_packet.unwrap();
119
120        for ack in acked.iter() {
121            // Because packets always have incrementing numbers, they are always
122            // in sorted order.
123            let start = if self
124                .sent_packets
125                .front()
126                .filter(|e| e.pkt_num >= ack.start)
127                .is_some()
128            {
129                // Usually it will be the first packet.
130                0
131            } else {
132                self.sent_packets
133                    .binary_search_by_key(&ack.start, |p| p.pkt_num)
134                    .unwrap_or_else(|e| e)
135            };
136
137            for unacked in self.sent_packets.range_mut(start..) {
138                if unacked.pkt_num >= ack.end {
139                    break;
140                }
141
142                if unacked.time_acked.is_some() {
143                    // Already acked.
144                } else if unacked.time_lost.is_some() {
145                    // An acked packet was already declared lost.
146                    spurious_losses += 1;
147                    spurious_pkt_thresh
148                        .get_or_insert(largest_acked - unacked.pkt_num + 1);
149                    unacked.time_acked = Some(now);
150
151                    if unacked.in_flight {
152                        has_in_flight_spurious_loss = true;
153                    }
154                } else {
155                    if unacked.in_flight {
156                        self.in_flight_count -= 1;
157                        acked_bytes += unacked.size;
158                    }
159
160                    newly_acked.push(Acked {
161                        pkt_num: unacked.pkt_num,
162                        time_sent: unacked.time_sent,
163                        size: unacked.size,
164
165                        rtt: now.saturating_duration_since(unacked.time_sent),
166                        delivered: unacked.delivered,
167                        delivered_time: unacked.delivered_time,
168                        first_sent_time: unacked.first_sent_time,
169                        is_app_limited: unacked.is_app_limited,
170                    });
171
172                    trace!("{} packet newly acked {}", trace_id, unacked.pkt_num);
173
174                    self.acked_frames
175                        .extend(std::mem::take(&mut unacked.frames));
176
177                    has_ack_eliciting |= unacked.ack_eliciting;
178                    unacked.time_acked = Some(now);
179                }
180            }
181        }
182
183        self.drain_acked_and_lost_packets(now - rtt_stats.rtt());
184
185        AckedDetectionResult {
186            acked_bytes,
187            spurious_losses,
188            spurious_pkt_thresh,
189            has_ack_eliciting,
190            has_in_flight_spurious_loss,
191        }
192    }
193
    /// Declares packets of this space lost using the time and packet
    /// thresholds.
    ///
    /// Only packets numbered up to the largest acknowledged one that are
    /// neither acked nor already lost are examined. A packet is lost when it
    /// was sent at or before `now - loss_delay`, or when the largest acked
    /// packet number is at least `pkt_thresh` ahead of it. The frames of lost
    /// packets are moved to `lost_frames`. The first examined packet that is
    /// not lost yet sets `loss_time` and ends the scan.
    ///
    /// Returns the counters gathered during the pass.
    fn detect_lost_packets(
        &mut self, loss_delay: Duration, pkt_thresh: u64, now: Instant,
        trace_id: &str, epoch: Epoch,
    ) -> LossDetectionResult {
        self.loss_time = None;

        // Minimum time of kGranularity before packets are deemed lost.
        let loss_delay = cmp::max(loss_delay, GRANULARITY);
        // NOTE(review): with no ACK received yet this falls back to 0, so
        // packet number 0 is still examined below -- confirm this is intended.
        let largest_acked = self.largest_acked_packet.unwrap_or(0);

        // Packets sent before this time are deemed lost.
        // NOTE(review): panics if `now - loss_delay` cannot be represented as
        // an `Instant`.
        let lost_send_time = now.checked_sub(loss_delay).unwrap();

        let mut lost_packets = 0;
        let mut lost_bytes = 0;
        let mut pmtud_lost_bytes = 0;

        let mut largest_lost_pkt = None;

        let unacked_iter = self.sent_packets
        .iter_mut()
        // Skip packets that follow the largest acked packet.
        .take_while(|p| p.pkt_num <= largest_acked)
        // Skip packets that have already been acked or lost.
        .filter(|p| p.time_acked.is_none() && p.time_lost.is_none());

        for unacked in unacked_iter {
            // Mark packet as lost, or set time when it should be marked.
            if unacked.time_sent <= lost_send_time ||
                largest_acked >= unacked.pkt_num + pkt_thresh
            {
                self.lost_frames.extend(unacked.frames.drain(..));

                unacked.time_lost = Some(now);

                if unacked.pmtud {
                    // PMTUD probes are accounted for separately and do not
                    // increment `lost_packets`.
                    pmtud_lost_bytes += unacked.size;
                    self.in_flight_count -= 1;

                    // Do not track PMTUD probes losses.
                    continue;
                }

                if unacked.in_flight {
                    lost_bytes += unacked.size;

                    // Frames have already been removed from the packet, so
                    // cloning the whole packet should be relatively cheap.
                    largest_lost_pkt = Some(unacked.clone());

                    self.in_flight_count -= 1;

                    trace!(
                        "{} packet {} lost on epoch {}",
                        trace_id,
                        unacked.pkt_num,
                        epoch
                    );
                }

                lost_packets += 1;
            } else {
                // `loss_time` was reset at the top and the loop stops right
                // after this assignment, so within this function only the
                // `None` arm is taken.
                let loss_time = match self.loss_time {
                    None => unacked.time_sent + loss_delay,

                    Some(loss_time) =>
                        cmp::min(loss_time, unacked.time_sent + loss_delay),
                };

                self.loss_time = Some(loss_time);
                break;
            }
        }

        LossDetectionResult {
            largest_lost_pkt,
            lost_packets,
            lost_bytes,
            pmtud_lost_bytes,
        }
    }
275
276    fn drain_acked_and_lost_packets(&mut self, loss_thresh: Instant) {
277        // In order to avoid removing elements from the middle of the list
278        // (which would require copying other elements to compact the list),
279        // we only remove a contiguous range of elements from the start of the
280        // list.
281        //
282        // This means that acked or lost elements coming after this will not
283        // be removed at this point, but their removal is delayed for a later
284        // time, once the gaps have been filled.
285        while let Some(pkt) = self.sent_packets.front() {
286            if let Some(time_lost) = pkt.time_lost {
287                if time_lost > loss_thresh {
288                    break;
289                }
290            }
291
292            if pkt.time_acked.is_none() && pkt.time_lost.is_none() {
293                break;
294            }
295
296            self.sent_packets.pop_front();
297        }
298    }
299}
300
/// Loss detection and congestion control state of the legacy recovery
/// implementation.
pub struct LegacyRecovery {
    /// Per packet number space recovery state.
    epochs: [RecoveryEpoch; packet::Epoch::count()],

    /// The loss detection timer (time threshold or PTO).
    loss_timer: LossDetectionTimer,

    /// Number of PTO expirations since the last ACK that newly acknowledged
    /// packets; used as the exponent of the PTO backoff.
    pto_count: u32,

    /// RTT estimator state.
    rtt_stats: RttStats,

    /// Number of packets that were declared lost and acknowledged later.
    lost_spurious_count: usize,

    /// Packet reordering threshold used by loss detection.
    pkt_thresh: u64,

    /// Factor applied to the RTT to obtain the loss detection delay.
    time_thresh: f64,

    /// Total size of the packets currently in flight, across all spaces.
    pub bytes_in_flight: usize,

    /// Total size of all packets passed to `on_packet_sent()`.
    bytes_sent: usize,

    /// Cumulative number of bytes lost, as reported by `bytes_lost()`.
    bytes_lost: u64,

    /// The current maximum datagram size.
    pub max_datagram_size: usize,

    #[cfg(feature = "qlog")]
    qlog_metrics: QlogMetrics,

    /// How many non-ack-eliciting packets have been sent since the last
    /// ack-eliciting one.
    outstanding_non_ack_eliciting: usize,

    /// Congestion controller state.
    pub congestion: Congestion,

    /// A reusable list of acks.
    newly_acked: Vec<Acked>,
}
335
336impl LegacyRecovery {
337    pub fn new_with_config(recovery_config: &RecoveryConfig) -> Self {
338        Self {
339            epochs: Default::default(),
340
341            loss_timer: Default::default(),
342
343            pto_count: 0,
344
345            rtt_stats: RttStats::new(recovery_config.max_ack_delay),
346
347            lost_spurious_count: 0,
348
349            pkt_thresh: INITIAL_PACKET_THRESHOLD,
350
351            time_thresh: INITIAL_TIME_THRESHOLD,
352
353            bytes_in_flight: 0,
354
355            bytes_sent: 0,
356
357            bytes_lost: 0,
358
359            max_datagram_size: recovery_config.max_send_udp_payload_size,
360
361            #[cfg(feature = "qlog")]
362            qlog_metrics: QlogMetrics::default(),
363
364            outstanding_non_ack_eliciting: 0,
365
366            congestion: Congestion::from_config(recovery_config),
367
368            newly_acked: Vec::new(),
369        }
370    }
371
    /// Test-only constructor that derives the recovery configuration from a
    /// full `Config`.
    #[cfg(test)]
    pub fn new(config: &crate::Config) -> Self {
        Self::new_with_config(&RecoveryConfig::from_config(config))
    }
376
377    fn loss_time_and_space(&self) -> (Option<Instant>, packet::Epoch) {
378        let mut epoch = packet::Epoch::Initial;
379        let mut time = self.epochs[epoch].loss_time;
380
381        // Iterate over all packet number spaces starting from Handshake.
382        for e in [packet::Epoch::Handshake, packet::Epoch::Application] {
383            let new_time = self.epochs[e].loss_time;
384            if time.is_none() || new_time < time {
385                time = new_time;
386                epoch = e;
387            }
388        }
389
390        (time, epoch)
391    }
392
    /// Computes when the probe timeout (PTO) should fire and for which
    /// packet number space.
    ///
    /// The base timeout is `pto()` doubled once per consecutive PTO
    /// (`pto_count`). With nothing in flight the timer is armed relative to
    /// `now`, on Handshake if handshake keys are available and on Initial
    /// otherwise. Otherwise the earliest deadline among the spaces that have
    /// packets in flight is returned, measured from the time their last
    /// ack-eliciting packet was sent.
    fn pto_time_and_space(
        &self, handshake_status: HandshakeStatus, now: Instant,
    ) -> (Option<Instant>, packet::Epoch) {
        // NOTE(review): `2_u32.pow()` overflows once `pto_count` reaches 32;
        // confirm that many consecutive PTOs cannot happen in practice.
        let mut duration = self.pto() * 2_u32.pow(self.pto_count);

        // Arm PTO from now when there are no inflight packets.
        if self.bytes_in_flight == 0 {
            if handshake_status.has_handshake_keys {
                return (Some(now + duration), packet::Epoch::Handshake);
            } else {
                return (Some(now + duration), packet::Epoch::Initial);
            }
        }

        let mut pto_timeout = None;
        let mut pto_space = packet::Epoch::Initial;

        // Iterate over all packet number spaces.
        for e in [
            packet::Epoch::Initial,
            packet::Epoch::Handshake,
            packet::Epoch::Application,
        ] {
            let epoch = &self.epochs[e];
            if epoch.in_flight_count == 0 {
                continue;
            }

            if e == packet::Epoch::Application {
                // Skip Application Data until handshake completes.
                if !handshake_status.completed {
                    return (pto_timeout, pto_space);
                }

                // Include max_ack_delay and backoff for Application Data.
                // Application is the last space visited, so the longer
                // duration does not affect the other spaces.
                duration +=
                    self.rtt_stats.max_ack_delay * 2_u32.pow(self.pto_count);
            }

            let new_time = epoch
                .time_of_last_ack_eliciting_packet
                .map(|t| t + duration);

            // NOTE(review): this compares `Option`s and `None` orders before
            // `Some`; a space with packets in flight but no recorded
            // ack-eliciting send time would replace an already computed
            // timeout with `None` -- confirm whether that state is reachable.
            if pto_timeout.is_none() || new_time < pto_timeout {
                pto_timeout = new_time;
                pto_space = e;
            }
        }

        (pto_timeout, pto_space)
    }
444
445    fn set_loss_detection_timer(
446        &mut self, handshake_status: HandshakeStatus, now: Instant,
447    ) {
448        let (earliest_loss_time, _) = self.loss_time_and_space();
449
450        if let Some(to) = earliest_loss_time {
451            // Time threshold loss detection.
452            self.loss_timer.update(to);
453            return;
454        }
455
456        if self.bytes_in_flight == 0 && handshake_status.peer_verified_address {
457            self.loss_timer.clear();
458            return;
459        }
460
461        // PTO timer.
462        if let (Some(timeout), _) = self.pto_time_and_space(handshake_status, now)
463        {
464            self.loss_timer.update(timeout);
465        }
466    }
467
468    fn detect_lost_packets(
469        &mut self, epoch: packet::Epoch, now: Instant, trace_id: &str,
470    ) -> (usize, usize) {
471        let loss_delay = cmp::max(self.rtt_stats.latest_rtt, self.rtt())
472            .mul_f64(self.time_thresh);
473
474        let loss = self.epochs[epoch].detect_lost_packets(
475            loss_delay,
476            self.pkt_thresh,
477            now,
478            trace_id,
479            epoch,
480        );
481
482        if let Some(pkt) = loss.largest_lost_pkt {
483            if !self.congestion.in_congestion_recovery(pkt.time_sent) {
484                (self.congestion.cc_ops.checkpoint)(&mut self.congestion);
485            }
486
487            (self.congestion.cc_ops.congestion_event)(
488                &mut self.congestion,
489                self.bytes_in_flight,
490                loss.lost_bytes,
491                &pkt,
492                now,
493            );
494
495            self.bytes_in_flight -= loss.lost_bytes;
496        };
497
498        self.bytes_in_flight -= loss.pmtud_lost_bytes;
499
500        self.epochs[epoch]
501            .drain_acked_and_lost_packets(now - self.rtt_stats.rtt());
502
503        self.congestion.lost_count += loss.lost_packets;
504
505        (loss.lost_packets, loss.lost_bytes)
506    }
507}
508
509impl RecoveryOps for LegacyRecovery {
    /// Returns whether or not we should elicit an ACK even if we wouldn't
    /// otherwise have constructed an ACK eliciting packet.
    ///
    /// This is the case when probe packets are pending in `epoch`, or when
    /// the number of consecutive non-ack-eliciting packets sent has reached
    /// `MAX_OUTSTANDING_NON_ACK_ELICITING`.
    fn should_elicit_ack(&self, epoch: packet::Epoch) -> bool {
        self.epochs[epoch].loss_probes > 0 ||
            self.outstanding_non_ack_eliciting >=
                MAX_OUTSTANDING_NON_ACK_ELICITING
    }
517
    /// Takes the frames acknowledged in `epoch`, leaving the stored list
    /// empty.
    fn get_acked_frames(&mut self, epoch: packet::Epoch) -> Vec<frame::Frame> {
        std::mem::take(&mut self.epochs[epoch].acked_frames)
    }
521
    /// Takes the frames queued for retransmission in `epoch`, leaving the
    /// stored list empty.
    fn get_lost_frames(&mut self, epoch: packet::Epoch) -> Vec<frame::Frame> {
        std::mem::take(&mut self.epochs[epoch].lost_frames)
    }
525
    /// Returns the largest packet number acknowledged so far in `epoch`, or
    /// `None` if no ACK has been processed for it.
    fn get_largest_acked_on_epoch(&self, epoch: packet::Epoch) -> Option<u64> {
        self.epochs[epoch].largest_acked_packet
    }
529
    /// Returns whether `epoch` has frames waiting to be retransmitted.
    fn has_lost_frames(&self, epoch: packet::Epoch) -> bool {
        !self.epochs[epoch].lost_frames.is_empty()
    }
533
    /// Returns the number of probe packets still pending in `epoch`.
    fn loss_probes(&self, epoch: packet::Epoch) -> usize {
        self.epochs[epoch].loss_probes
    }
537
    /// Test-only helper that adds one pending probe packet to `epoch`.
    #[cfg(test)]
    fn inc_loss_probes(&mut self, epoch: packet::Epoch) {
        self.epochs[epoch].loss_probes += 1;
    }
542
543    fn ping_sent(&mut self, epoch: packet::Epoch) {
544        self.epochs[epoch].loss_probes =
545            self.epochs[epoch].loss_probes.saturating_sub(1);
546    }
547
548    fn on_packet_sent(
549        &mut self, mut pkt: Sent, epoch: packet::Epoch,
550        handshake_status: HandshakeStatus, now: Instant, trace_id: &str,
551    ) {
552        let ack_eliciting = pkt.ack_eliciting;
553        let in_flight = pkt.in_flight;
554        let sent_bytes = pkt.size;
555
556        if ack_eliciting {
557            self.outstanding_non_ack_eliciting = 0;
558        } else {
559            self.outstanding_non_ack_eliciting += 1;
560        }
561
562        if in_flight && ack_eliciting {
563            self.epochs[epoch].time_of_last_ack_eliciting_packet = Some(now);
564        }
565
566        self.congestion.on_packet_sent(
567            self.bytes_in_flight,
568            sent_bytes,
569            now,
570            &mut pkt,
571            &self.rtt_stats,
572            self.bytes_lost,
573            in_flight,
574        );
575
576        if in_flight {
577            self.epochs[epoch].in_flight_count += 1;
578            self.bytes_in_flight += sent_bytes;
579
580            self.set_loss_detection_timer(handshake_status, now);
581        }
582
583        self.bytes_sent += sent_bytes;
584
585        self.epochs[epoch].sent_packets.push_back(pkt);
586
587        trace!("{} {:?}", trace_id, self);
588    }
589
    /// Returns the send time for the next packet, as computed by the
    /// congestion controller.
    fn get_packet_send_time(&self) -> Instant {
        self.congestion.get_packet_send_time()
    }
593
    /// Processes an ACK frame received for `epoch`.
    ///
    /// `ranges` holds the acknowledged packet number ranges and `ack_delay`
    /// the peer-reported delay, interpreted as microseconds. The largest
    /// acked packet number is updated, covered packets are marked as acked,
    /// spurious losses are accounted for, the RTT estimate is refreshed when
    /// the largest acked packet is newly acked and ack-eliciting packets were
    /// acked, and loss detection is run before the congestion controller is
    /// told about the acked packets.
    ///
    /// Returns `(lost_packets, lost_bytes, acked_bytes)`, or `(0, 0, 0)` when
    /// the frame did not newly acknowledge anything.
    ///
    /// Panics if `ranges` is empty.
    #[allow(clippy::too_many_arguments)]
    fn on_ack_received(
        &mut self, ranges: &ranges::RangeSet, ack_delay: u64,
        epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant,
        trace_id: &str,
    ) -> (usize, usize, usize) {
        let largest_acked = ranges.last().unwrap();

        // Update the largest acked packet.
        let largest_acked = self.epochs[epoch]
            .largest_acked_packet
            .unwrap_or(0)
            .max(largest_acked);

        self.epochs[epoch].largest_acked_packet = Some(largest_acked);

        let AckedDetectionResult {
            acked_bytes,
            spurious_losses,
            spurious_pkt_thresh,
            has_ack_eliciting,
            has_in_flight_spurious_loss,
        } = self.epochs[epoch].detect_and_remove_acked_packets(
            now,
            ranges,
            &mut self.newly_acked,
            &self.rtt_stats,
            trace_id,
        );

        self.lost_spurious_count += spurious_losses;
        // Raise the packet reordering threshold to the observed reordering
        // distance, capped at MAX_PACKET_THRESHOLD; it is never lowered here.
        if let Some(thresh) = spurious_pkt_thresh {
            self.pkt_thresh =
                self.pkt_thresh.max(thresh.min(MAX_PACKET_THRESHOLD));
        }

        // Undo congestion window update.
        if has_in_flight_spurious_loss {
            (self.congestion.cc_ops.rollback)(&mut self.congestion);
        }

        if self.newly_acked.is_empty() {
            return (0, 0, 0);
        }

        // Check if largest packet is newly acked.
        let largest_newly_acked = self.newly_acked.last().unwrap();

        if largest_newly_acked.pkt_num == largest_acked && has_ack_eliciting {
            let latest_rtt = now - largest_newly_acked.time_sent;
            self.rtt_stats.update_rtt(
                latest_rtt,
                Duration::from_micros(ack_delay),
                now,
                handshake_status.completed,
            );
        }

        // Detect and mark lost packets without removing them from the sent
        // packets list.
        let loss = self.detect_lost_packets(epoch, now, trace_id);

        // The controller is given the in-flight total from before the acked
        // bytes are removed from it.
        self.congestion.on_packets_acked(
            self.bytes_in_flight,
            &mut self.newly_acked,
            &self.rtt_stats,
            now,
        );

        self.bytes_in_flight -= acked_bytes;

        // Newly acked packets reset the PTO backoff.
        self.pto_count = 0;

        self.set_loss_detection_timer(handshake_status, now);

        self.epochs[epoch]
            .drain_acked_and_lost_packets(now - self.rtt_stats.rtt());

        (loss.0, loss.1, acked_bytes)
    }
674
    /// Handles the expiry of the loss detection timer.
    ///
    /// If a time-threshold loss time is pending, loss detection is run for
    /// the corresponding space and its `(lost_packets, lost_bytes)` result is
    /// returned. Otherwise this is a probe timeout: the PTO count is
    /// incremented, probe packets are scheduled for the selected space and
    /// the frames of its oldest outstanding data packets are queued again.
    /// In that case `(0, 0)` is returned.
    fn on_loss_detection_timeout(
        &mut self, handshake_status: HandshakeStatus, now: Instant,
        trace_id: &str,
    ) -> (usize, usize) {
        let (earliest_loss_time, epoch) = self.loss_time_and_space();

        if earliest_loss_time.is_some() {
            // Time threshold loss detection.
            let loss = self.detect_lost_packets(epoch, now, trace_id);

            self.set_loss_detection_timer(handshake_status, now);

            trace!("{} {:?}", trace_id, self);
            return loss;
        }

        let epoch = if self.bytes_in_flight > 0 {
            // Send new data if available, else retransmit old data. If neither
            // is available, send a single PING frame.
            let (_, e) = self.pto_time_and_space(handshake_status, now);

            e
        } else {
            // Client sends an anti-deadlock packet: Initial is padded to earn
            // more anti-amplification credit, a Handshake packet proves address
            // ownership.
            if handshake_status.has_handshake_keys {
                packet::Epoch::Handshake
            } else {
                packet::Epoch::Initial
            }
        };

        self.pto_count += 1;

        let epoch = &mut self.epochs[epoch];

        // The number of probes grows with the PTO count, up to
        // MAX_PTO_PROBES_COUNT.
        epoch.loss_probes =
            cmp::min(self.pto_count as usize, MAX_PTO_PROBES_COUNT);

        let unacked_iter = epoch.sent_packets
            .iter_mut()
            // Skip packets that have already been acked or lost, and packets
            // that don't contain either CRYPTO or STREAM frames.
            .filter(|p| p.has_data && p.time_acked.is_none() && p.time_lost.is_none())
            // Only return as many packets as the number of probe packets that
            // will be sent.
            .take(epoch.loss_probes);

        // Retransmit the frames from the oldest sent packets on PTO. However
        // the packets are not actually declared lost (so there is no effect to
        // congestion control), we just reschedule the data they carried.
        //
        // This will also trigger sending an ACK and retransmitting frames like
        // HANDSHAKE_DONE and MAX_DATA / MAX_STREAM_DATA as well, in addition
        // to CRYPTO and STREAM, if the original packet carried them.
        for unacked in unacked_iter {
            // The frames are copied rather than moved, so the packet keeps
            // them in case it is acked or declared lost later.
            epoch.lost_frames.extend_from_slice(&unacked.frames);
        }

        self.set_loss_detection_timer(handshake_status, now);

        trace!("{} {:?}", trace_id, self);

        (0, 0)
    }
741
742    fn on_pkt_num_space_discarded(
743        &mut self, epoch: packet::Epoch, handshake_status: HandshakeStatus,
744        now: Instant,
745    ) {
746        let epoch = &mut self.epochs[epoch];
747
748        let unacked_bytes = epoch
749            .sent_packets
750            .iter()
751            .filter(|p| {
752                p.in_flight && p.time_acked.is_none() && p.time_lost.is_none()
753            })
754            .fold(0, |acc, p| acc + p.size);
755
756        self.bytes_in_flight -= unacked_bytes;
757
758        epoch.sent_packets.clear();
759        epoch.lost_frames.clear();
760        epoch.acked_frames.clear();
761
762        epoch.time_of_last_ack_eliciting_packet = None;
763        epoch.loss_time = None;
764        epoch.loss_probes = 0;
765        epoch.in_flight_count = 0;
766
767        self.set_loss_detection_timer(handshake_status, now);
768    }
769
    /// Runs loss detection for `epoch` after a path change and returns
    /// `(lost_packets, lost_bytes)`.
    fn on_path_change(
        &mut self, epoch: packet::Epoch, now: Instant, trace_id: &str,
    ) -> (usize, usize) {
        // Time threshold loss detection.
        self.detect_lost_packets(epoch, now, trace_id)
    }
776
    /// Returns the time at which the loss detection timer expires, if it is
    /// armed.
    fn loss_detection_timer(&self) -> Option<Instant> {
        self.loss_timer.time
    }
780
    /// Returns the current congestion window, as reported by the congestion
    /// controller.
    fn cwnd(&self) -> usize {
        self.congestion.congestion_window()
    }
784
785    fn cwnd_available(&self) -> usize {
786        // Ignore cwnd when sending probe packets.
787        if self.epochs.iter().any(|e| e.loss_probes > 0) {
788            return usize::MAX;
789        }
790
791        // Open more space (snd_cnt) for PRR when allowed.
792        self.cwnd().saturating_sub(self.bytes_in_flight) +
793            self.congestion.prr.snd_cnt
794    }
795
    /// Returns the current RTT estimate from the RTT statistics.
    fn rtt(&self) -> Duration {
        self.rtt_stats.rtt()
    }
799
    /// Returns the minimum RTT from the RTT statistics, if there is one.
    fn min_rtt(&self) -> Option<Duration> {
        self.rtt_stats.min_rtt()
    }
803
    /// Returns the current RTT variation.
    fn rttvar(&self) -> Duration {
        self.rtt_stats.rttvar
    }
807
    /// Returns the base probe timeout: the RTT plus four times the RTT
    /// variation, the latter being at least `GRANULARITY`.
    ///
    /// Neither the exponential backoff nor the peer's max ACK delay is
    /// included here.
    fn pto(&self) -> Duration {
        self.rtt() + cmp::max(self.rtt_stats.rttvar * 4, GRANULARITY)
    }
811
    /// Returns the delivery rate reported by the congestion controller.
    fn delivery_rate(&self) -> u64 {
        self.congestion.delivery_rate()
    }
815
    /// Returns the current maximum datagram size.
    fn max_datagram_size(&self) -> usize {
        self.max_datagram_size
    }
819
820    fn pmtud_update_max_datagram_size(&mut self, new_max_datagram_size: usize) {
821        // Congestion Window is updated only when it's not updated already.
822        // Update cwnd if it hasn't been updated yet.
823        if self.cwnd() ==
824            self.max_datagram_size *
825                self.congestion.initial_congestion_window_packets
826        {
827            self.congestion.congestion_window = new_max_datagram_size *
828                self.congestion.initial_congestion_window_packets;
829        }
830
831        self.congestion.pacer = pacer::Pacer::new(
832            self.congestion.pacer.enabled(),
833            self.cwnd(),
834            0,
835            new_max_datagram_size,
836            self.congestion.pacer.max_pacing_rate(),
837        );
838
839        self.max_datagram_size = new_max_datagram_size;
840    }
841
    /// Lowers the maximum datagram size to `new_max_datagram_size`.
    ///
    /// The size can only shrink through this method: a larger value leaves
    /// the size unchanged, although the update logic is still run.
    fn update_max_datagram_size(&mut self, new_max_datagram_size: usize) {
        self.pmtud_update_max_datagram_size(
            self.max_datagram_size.min(new_max_datagram_size),
        )
    }
847
    /// Test-only: number of entries in the sent packets list of `epoch`,
    /// including acked and lost ones that have not been drained yet.
    #[cfg(test)]
    fn sent_packets_len(&self, epoch: packet::Epoch) -> usize {
        self.epochs[epoch].sent_packets.len()
    }
852
    /// Test-only: number of packets currently counted as in flight in
    /// `epoch`.
    #[cfg(test)]
    fn in_flight_count(&self, epoch: packet::Epoch) -> usize {
        self.epochs[epoch].in_flight_count
    }
857
    /// Test-only: total number of bytes currently in flight.
    #[cfg(test)]
    fn bytes_in_flight(&self) -> usize {
        self.bytes_in_flight
    }
862
    /// Test-only: current rate of the pacer.
    #[cfg(test)]
    fn pacing_rate(&self) -> u64 {
        self.congestion.pacer.rate()
    }
867
    /// Test-only: number of consecutive probe timeouts.
    #[cfg(test)]
    fn pto_count(&self) -> u32 {
        self.pto_count
    }
872
    /// Test-only: current packet reordering threshold.
    #[cfg(test)]
    fn pkt_thresh(&self) -> u64 {
        self.pkt_thresh
    }
877
    /// Test-only: number of spurious losses detected so far.
    #[cfg(test)]
    fn lost_spurious_count(&self) -> usize {
        self.lost_spurious_count
    }
882
    /// Test-only: runs loss detection for `epoch` with an empty trace id and
    /// returns `(lost_packets, lost_bytes)`.
    #[cfg(test)]
    fn detect_lost_packets_for_test(
        &mut self, epoch: packet::Epoch, now: Instant,
    ) -> (usize, usize) {
        self.detect_lost_packets(epoch, now, "")
    }
889
    /// Does nothing in the legacy recovery implementation.
    fn on_app_limited(&mut self) {
        // Not implemented for legacy recovery, update_app_limited and
        // delivery_rate_update_app_limited used instead.
    }
894
    /// Test-only: current value of the congestion controller's app-limited
    /// flag.
    #[cfg(test)]
    fn app_limited(&self) -> bool {
        self.congestion.app_limited
    }
899
    /// Sets the congestion controller's app-limited flag to `v`.
    fn update_app_limited(&mut self, v: bool) {
        self.congestion.app_limited = v;
    }
903
    /// Forwards the app-limited state `v` to the delivery rate estimator.
    fn delivery_rate_update_app_limited(&mut self, v: bool) {
        self.congestion.delivery_rate.update_app_limited(v);
    }
907
    /// Replaces the max ACK delay stored in the RTT statistics, which is
    /// used when computing the PTO of the Application space.
    fn update_max_ack_delay(&mut self, max_ack_delay: Duration) {
        self.rtt_stats.max_ack_delay = max_ack_delay;
    }
911
    /// Takes a snapshot of the current recovery metrics and passes it to
    /// `QlogMetrics::maybe_update()`, returning the qlog event that call
    /// produces, if any.
    #[cfg(feature = "qlog")]
    fn maybe_qlog(&mut self) -> Option<EventData> {
        let qlog_metrics = QlogMetrics {
            min_rtt: *self.rtt_stats.min_rtt,
            smoothed_rtt: self.rtt(),
            latest_rtt: self.rtt_stats.latest_rtt,
            rttvar: self.rtt_stats.rttvar,
            cwnd: self.cwnd() as u64,
            bytes_in_flight: self.bytes_in_flight as u64,
            ssthresh: Some(self.congestion.ssthresh as u64),
            pacing_rate: self.congestion.pacer.rate(),
        };

        self.qlog_metrics.maybe_update(qlog_metrics)
    }
927
    /// Returns the send quantum reported by the congestion controller.
    fn send_quantum(&self) -> usize {
        self.congestion.send_quantum()
    }
931
932    // TODO tests
933    fn get_next_release_time(&self) -> ReleaseDecision {
934        let now = Instant::now();
935        let next_send_time = self.congestion.get_packet_send_time();
936        if next_send_time > now {
937            ReleaseDecision {
938                time: ReleaseTime::At(next_send_time),
939                allow_burst: false,
940            }
941        } else {
942            ReleaseDecision {
943                time: ReleaseTime::Immediate,
944                allow_burst: false,
945            }
946        }
947    }
948
    /// Always `false` for the legacy recovery implementation.
    fn gcongestion_enabled(&self) -> bool {
        false
    }
952
    /// Returns the number of packets declared lost so far, as counted in the
    /// congestion state.
    fn lost_count(&self) -> usize {
        self.congestion.lost_count
    }
956
    /// Returns the value of the cumulative `bytes_lost` counter.
    fn bytes_lost(&self) -> u64 {
        self.bytes_lost
    }
960}
961
/// Single-line `key=value` dump of the recovery state, used by the trace
/// logging in this file.
impl std::fmt::Debug for LegacyRecovery {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        // Every entry ends with a space so that entries can simply be
        // appended one after the other.
        write!(f, "timer={:?} ", self.loss_timer)?;
        write!(f, "latest_rtt={:?} ", self.rtt_stats.latest_rtt)?;
        write!(f, "srtt={:?} ", self.rtt_stats.smoothed_rtt)?;
        write!(f, "min_rtt={:?} ", *self.rtt_stats.min_rtt)?;
        write!(f, "rttvar={:?} ", self.rtt_stats.rttvar)?;
        write!(f, "cwnd={} ", self.cwnd())?;
        write!(f, "ssthresh={} ", self.congestion.ssthresh)?;
        write!(f, "bytes_in_flight={} ", self.bytes_in_flight)?;
        write!(f, "app_limited={} ", self.congestion.app_limited)?;
        write!(
            f,
            "congestion_recovery_start_time={:?} ",
            self.congestion.congestion_recovery_start_time
        )?;
        write!(f, "{:?} ", self.congestion.delivery_rate)?;
        write!(f, "pacer={:?} ", self.congestion.pacer)?;

        // HyStart state is only printed when HyStart is enabled.
        if self.congestion.hystart.enabled() {
            write!(f, "hystart={:?} ", self.congestion.hystart)?;
        }

        // CC-specific debug info
        (self.congestion.cc_ops.debug_fmt)(&self.congestion, f)?;

        Ok(())
    }
}
991
/// Information about a newly acknowledged packet, built from the sent packet
/// record when its acknowledgement is processed.
#[derive(Clone)]
pub struct Acked {
    /// The packet number.
    pub pkt_num: u64,

    /// The time the packet was sent.
    pub time_sent: Instant,

    /// The size of the packet.
    pub size: usize,

    /// The time elapsed between sending the packet and processing its
    /// acknowledgement (zero if the send time lies in the future).
    pub rtt: Duration,

    /// Copied from the sent packet's `delivered` field.
    pub delivered: usize,

    /// Copied from the sent packet's `delivered_time` field.
    pub delivered_time: Instant,

    /// Copied from the sent packet's `first_sent_time` field.
    pub first_sent_time: Instant,

    /// Copied from the sent packet's `is_app_limited` field.
    pub is_app_limited: bool,
}