quiche/recovery/
mod.rs

1// Copyright (C) 2018-2019, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::cmp;
28
29use std::time::Duration;
30use std::time::Instant;
31
32use std::collections::VecDeque;
33
34use crate::packet::Epoch;
35use crate::ranges::RangeSet;
36use crate::Config;
37use crate::CongestionControlAlgorithm;
38use crate::Result;
39
40use crate::frame;
41use crate::packet;
42use crate::ranges;
43
44#[cfg(feature = "qlog")]
45use qlog::events::EventData;
46
47use smallvec::SmallVec;
48
49use self::congestion::pacer;
50use self::congestion::Congestion;
51use self::rtt::RttStats;
52
// Loss Recovery
/// Initial packet reordering threshold; the starting value of
/// `Recovery::pkt_thresh`.
const INITIAL_PACKET_THRESHOLD: u64 = 3;

/// Upper bound applied to the packet threshold suggested by a spurious loss
/// before it is adopted.
const MAX_PACKET_THRESHOLD: u64 = 20;

/// Initial time reordering threshold, as a multiplier of the RTT; the
/// starting value of `Recovery::time_thresh`.
const INITIAL_TIME_THRESHOLD: f64 = 9.0 / 8.0;

/// Timer granularity: lower bound for the loss delay and for the RTT
/// variance term of the PTO.
const GRANULARITY: Duration = Duration::from_millis(1);

/// Maximum number of probe packets requested when a PTO fires.
const MAX_PTO_PROBES_COUNT: usize = 2;

// NOTE(review): not referenced in the visible part of this file; presumably
// the minimum congestion window expressed in packets -- confirm against the
// congestion controllers.
const MINIMUM_WINDOW_PACKETS: usize = 2;

// NOTE(review): not referenced in the visible part of this file; presumably
// the multiplicative window decrease applied on loss -- confirm against the
// congestion controllers.
const LOSS_REDUCTION_FACTOR: f64 = 0.5;

// How many non ACK eliciting packets we send before including a PING to solicit
// an ACK.
pub(super) const MAX_OUTSTANDING_NON_ACK_ELICITING: usize = 24;
71
/// Loss recovery state kept separately for each packet number space.
#[derive(Default)]
struct RecoveryEpoch {
    /// The time the most recent ack-eliciting packet was sent.
    time_of_last_ack_eliciting_packet: Option<Instant>,

    /// The largest packet number acknowledged in the packet number space so
    /// far.
    largest_acked_packet: Option<u64>,

    /// The time at which the next packet in that packet number space can be
    /// considered lost based on exceeding the reordering window in time.
    loss_time: Option<Instant>,

    /// An association of packet numbers in a packet number space to information
    /// about them.
    sent_packets: VecDeque<Sent>,

    /// Number of probe packets requested for this space. Set when a PTO
    /// fires and decremented by `Recovery::ping_sent()`.
    loss_probes: usize,
    /// Number of in-flight packets of this space that have been neither
    /// acknowledged nor declared lost.
    in_flight_count: usize,

    /// Frames carried by newly acknowledged packets, held until drained
    /// through `Recovery::get_acked_frames()`.
    acked_frames: Vec<frame::Frame>,
    /// Frames taken from lost packets, or copied from unacknowledged packets
    /// when a PTO fires, held until drained through
    /// `Recovery::get_lost_frames()`.
    lost_frames: Vec<frame::Frame>,
}
95
/// Summary returned by `RecoveryEpoch::detect_and_remove_acked_packets()`.
struct AckedDetectionResult {
    /// Total size of the newly acknowledged packets that were in flight.
    acked_bytes: usize,
    /// Number of acknowledged packets that had already been declared lost.
    spurious_losses: usize,
    /// Distance between the largest acknowledged packet and the first
    /// spuriously lost packet found, plus one. `None` without spurious loss.
    spurious_pkt_thresh: Option<u64>,
    /// Whether any newly acknowledged packet was ack-eliciting.
    has_ack_eliciting: bool,
    /// Whether any spuriously lost packet was an in-flight packet.
    has_in_flight_spurious_loss: bool,
}
103
/// Summary returned by `RecoveryEpoch::detect_lost_packets()`.
struct LossDetectionResult {
    /// Copy of the last in-flight packet declared lost (PMTUD probes
    /// excluded), taken after its frames were removed.
    largest_lost_pkt: Option<Sent>,
    /// Number of packets declared lost, not counting PMTUD probes.
    lost_packets: usize,
    /// Total size of the in-flight packets declared lost, not counting PMTUD
    /// probes.
    lost_bytes: usize,
    /// Total size of the PMTUD probes declared lost.
    pmtud_lost_bytes: usize,
}
110
111impl RecoveryEpoch {
112    fn detect_and_remove_acked_packets(
113        &mut self, now: Instant, acked: &RangeSet, newly_acked: &mut Vec<Acked>,
114        rtt_stats: &RttStats, trace_id: &str,
115    ) -> AckedDetectionResult {
116        newly_acked.clear();
117
118        let mut acked_bytes = 0;
119        let mut spurious_losses = 0;
120        let mut spurious_pkt_thresh = None;
121        let mut has_ack_eliciting = false;
122        let mut has_in_flight_spurious_loss = false;
123
124        let largest_acked = self.largest_acked_packet.unwrap();
125
126        for ack in acked.iter() {
127            // Because packets always have incrementing numbers, they are always
128            // in sorted order.
129            let start = if self
130                .sent_packets
131                .front()
132                .filter(|e| e.pkt_num >= ack.start)
133                .is_some()
134            {
135                // Usually it will be the first packet.
136                0
137            } else {
138                self.sent_packets
139                    .binary_search_by_key(&ack.start, |p| p.pkt_num)
140                    .unwrap_or_else(|e| e)
141            };
142
143            for unacked in self.sent_packets.range_mut(start..) {
144                if unacked.pkt_num >= ack.end {
145                    break;
146                }
147
148                if unacked.time_acked.is_some() {
149                    // Already acked.
150                } else if unacked.time_lost.is_some() {
151                    // An acked packet was already declared lost.
152                    spurious_losses += 1;
153                    spurious_pkt_thresh
154                        .get_or_insert(largest_acked - unacked.pkt_num + 1);
155                    unacked.time_acked = Some(now);
156
157                    if unacked.in_flight {
158                        has_in_flight_spurious_loss = true;
159                    }
160                } else {
161                    if unacked.in_flight {
162                        self.in_flight_count -= 1;
163                        acked_bytes += unacked.size;
164                    }
165
166                    newly_acked.push(Acked {
167                        pkt_num: unacked.pkt_num,
168                        time_sent: unacked.time_sent,
169                        size: unacked.size,
170
171                        rtt: now.saturating_duration_since(unacked.time_sent),
172                        delivered: unacked.delivered,
173                        delivered_time: unacked.delivered_time,
174                        first_sent_time: unacked.first_sent_time,
175                        is_app_limited: unacked.is_app_limited,
176                    });
177
178                    trace!("{} packet newly acked {}", trace_id, unacked.pkt_num);
179
180                    self.acked_frames
181                        .extend(std::mem::take(&mut unacked.frames));
182
183                    has_ack_eliciting |= unacked.ack_eliciting;
184                    unacked.time_acked = Some(now);
185                }
186            }
187        }
188
189        self.drain_acked_and_lost_packets(now - rtt_stats.rtt());
190
191        AckedDetectionResult {
192            acked_bytes,
193            spurious_losses,
194            spurious_pkt_thresh,
195            has_ack_eliciting,
196            has_in_flight_spurious_loss,
197        }
198    }
199
200    fn detect_lost_packets(
201        &mut self, loss_delay: Duration, pkt_thresh: u64, now: Instant,
202        trace_id: &str, epoch: Epoch,
203    ) -> LossDetectionResult {
204        self.loss_time = None;
205
206        // Minimum time of kGranularity before packets are deemed lost.
207        let loss_delay = cmp::max(loss_delay, GRANULARITY);
208        let largest_acked = self.largest_acked_packet.unwrap_or(0);
209
210        // Packets sent before this time are deemed lost.
211        let lost_send_time = now.checked_sub(loss_delay).unwrap();
212
213        let mut lost_packets = 0;
214        let mut lost_bytes = 0;
215        let mut pmtud_lost_bytes = 0;
216
217        let mut largest_lost_pkt = None;
218
219        let unacked_iter = self.sent_packets
220        .iter_mut()
221        // Skip packets that follow the largest acked packet.
222        .take_while(|p| p.pkt_num <= largest_acked)
223        // Skip packets that have already been acked or lost.
224        .filter(|p| p.time_acked.is_none() && p.time_lost.is_none());
225
226        for unacked in unacked_iter {
227            // Mark packet as lost, or set time when it should be marked.
228            if unacked.time_sent <= lost_send_time ||
229                largest_acked >= unacked.pkt_num + pkt_thresh
230            {
231                self.lost_frames.extend(unacked.frames.drain(..));
232
233                unacked.time_lost = Some(now);
234
235                if unacked.pmtud {
236                    pmtud_lost_bytes += unacked.size;
237                    self.in_flight_count -= 1;
238
239                    // Do not track PMTUD probes losses.
240                    continue;
241                }
242
243                if unacked.in_flight {
244                    lost_bytes += unacked.size;
245
246                    // Frames have already been removed from the packet, so
247                    // cloning the whole packet should be relatively cheap.
248                    largest_lost_pkt = Some(unacked.clone());
249
250                    self.in_flight_count -= 1;
251
252                    trace!(
253                        "{} packet {} lost on epoch {}",
254                        trace_id,
255                        unacked.pkt_num,
256                        epoch
257                    );
258                }
259
260                lost_packets += 1;
261            } else {
262                let loss_time = match self.loss_time {
263                    None => unacked.time_sent + loss_delay,
264
265                    Some(loss_time) =>
266                        cmp::min(loss_time, unacked.time_sent + loss_delay),
267                };
268
269                self.loss_time = Some(loss_time);
270                break;
271            }
272        }
273
274        LossDetectionResult {
275            largest_lost_pkt,
276            lost_packets,
277            lost_bytes,
278            pmtud_lost_bytes,
279        }
280    }
281
282    fn drain_acked_and_lost_packets(&mut self, loss_thresh: Instant) {
283        // In order to avoid removing elements from the middle of the list
284        // (which would require copying other elements to compact the list),
285        // we only remove a contiguous range of elements from the start of the
286        // list.
287        //
288        // This means that acked or lost elements coming after this will not
289        // be removed at this point, but their removal is delayed for a later
290        // time, once the gaps have been filled.
291        while let Some(pkt) = self.sent_packets.front() {
292            if let Some(time_lost) = pkt.time_lost {
293                if time_lost > loss_thresh {
294                    break;
295                }
296            }
297
298            if pkt.time_acked.is_none() && pkt.time_lost.is_none() {
299                break;
300            }
301
302            self.sent_packets.pop_front();
303        }
304    }
305}
306
/// Holds the deadline of the loss detection timer, when it is armed.
#[derive(Default)]
struct LossDetectionTimer {
    /// The instant at which the timer expires, or `None` when disarmed.
    time: Option<Instant>,
}

impl LossDetectionTimer {
    /// Arms the timer so that it expires at `timeout`, replacing any
    /// previous deadline.
    fn update(&mut self, timeout: Instant) {
        *self = Self {
            time: Some(timeout),
        };
    }

    /// Disarms the timer.
    fn clear(&mut self) {
        *self = Self::default();
    }
}
/// Loss detection and congestion control state of a connection path.
pub struct Recovery {
    /// Per packet number space recovery state.
    epochs: [RecoveryEpoch; packet::Epoch::count()],

    /// Deadline of the loss detection timer, if armed.
    loss_timer: LossDetectionTimer,

    /// Number of consecutive PTO expirations; reset when an ACK newly
    /// acknowledges packets.
    pto_count: u32,

    /// RTT estimator state.
    rtt_stats: RttStats,

    /// Number of packets that were acknowledged after having been declared
    /// lost.
    pub lost_spurious_count: usize,

    /// Current packet reordering threshold.
    pkt_thresh: u64,

    /// Current time reordering threshold, as a multiplier of the RTT.
    time_thresh: f64,

    /// Total size of the packets currently counted as in flight.
    bytes_in_flight: usize,

    /// Total size of all the packets passed to `on_packet_sent()`.
    bytes_sent: usize,

    // NOTE(review): never modified in the visible part of this file, only
    // passed to the congestion controller -- confirm who updates it.
    pub bytes_lost: u64,

    /// Maximum datagram size currently in use.
    max_datagram_size: usize,

    #[cfg(feature = "qlog")]
    qlog_metrics: QlogMetrics,

    /// How many non-ack-eliciting packets have been sent.
    outstanding_non_ack_eliciting: usize,

    /// Congestion controller state.
    congestion: Congestion,

    /// A reusable list of acks.
    newly_acked: Vec<Acked>,
}
355
/// Settings used to construct a [`Recovery`].
#[derive(Clone, Copy, Eq, PartialEq)]
pub struct RecoveryConfig {
    /// Initial value of the maximum datagram size.
    pub max_send_udp_payload_size: usize,
    /// Max ack delay handed to the RTT estimator on construction.
    pub max_ack_delay: Duration,
    // The remaining fields are consumed by `Congestion::from_config()`, which
    // is not part of this file section.
    pub cc_algorithm: CongestionControlAlgorithm,
    pub hystart: bool,
    pub pacing: bool,
    pub max_pacing_rate: Option<u64>,
    pub initial_congestion_window_packets: usize,
}
366
367impl RecoveryConfig {
368    pub fn from_config(config: &Config) -> Self {
369        Self {
370            max_send_udp_payload_size: config.max_send_udp_payload_size,
371            max_ack_delay: Duration::ZERO,
372            cc_algorithm: config.cc_algorithm,
373            hystart: config.hystart,
374            pacing: config.pacing,
375            max_pacing_rate: config.max_pacing_rate,
376            initial_congestion_window_packets: config
377                .initial_congestion_window_packets,
378        }
379    }
380}
381
382impl Recovery {
383    pub fn new_with_config(recovery_config: &RecoveryConfig) -> Self {
384        Recovery {
385            epochs: Default::default(),
386
387            loss_timer: Default::default(),
388
389            pto_count: 0,
390
391            rtt_stats: RttStats::new(recovery_config.max_ack_delay),
392
393            lost_spurious_count: 0,
394
395            pkt_thresh: INITIAL_PACKET_THRESHOLD,
396
397            time_thresh: INITIAL_TIME_THRESHOLD,
398
399            bytes_in_flight: 0,
400
401            bytes_sent: 0,
402
403            bytes_lost: 0,
404
405            max_datagram_size: recovery_config.max_send_udp_payload_size,
406
407            #[cfg(feature = "qlog")]
408            qlog_metrics: QlogMetrics::default(),
409
410            outstanding_non_ack_eliciting: 0,
411
412            congestion: Congestion::from_config(recovery_config),
413
414            newly_acked: Vec::new(),
415        }
416    }
417
418    #[cfg(test)]
419    pub fn new(config: &Config) -> Self {
420        Self::new_with_config(&RecoveryConfig::from_config(config))
421    }
422
423    /// Returns whether or not we should elicit an ACK even if we wouldn't
424    /// otherwise have constructed an ACK eliciting packet.
425    pub fn should_elicit_ack(&self, epoch: packet::Epoch) -> bool {
426        self.epochs[epoch].loss_probes > 0 ||
427            self.outstanding_non_ack_eliciting >=
428                MAX_OUTSTANDING_NON_ACK_ELICITING
429    }
430
    /// Removes and returns the frames collected from packets acknowledged in
    /// `epoch` since the last call.
    pub fn get_acked_frames(
        &mut self, epoch: packet::Epoch,
    ) -> impl Iterator<Item = frame::Frame> + '_ {
        self.epochs[epoch].acked_frames.drain(..)
    }
436
    /// Removes and returns the frames queued in `epoch` by loss detection or
    /// by a PTO since the last call.
    pub fn get_lost_frames(
        &mut self, epoch: packet::Epoch,
    ) -> impl Iterator<Item = frame::Frame> + '_ {
        self.epochs[epoch].lost_frames.drain(..)
    }
442
    /// Returns the largest packet number acknowledged so far in `epoch`, or
    /// `None` if nothing has been acknowledged there yet.
    pub fn get_largest_acked_on_epoch(
        &self, epoch: packet::Epoch,
    ) -> Option<u64> {
        self.epochs[epoch].largest_acked_packet
    }
448
    /// Returns whether `epoch` has queued lost frames that have not been
    /// drained yet.
    pub fn has_lost_frames(&self, epoch: packet::Epoch) -> bool {
        !self.epochs[epoch].lost_frames.is_empty()
    }
452
    /// Returns the number of probe packets currently pending in `epoch`.
    pub fn loss_probes(&self, epoch: packet::Epoch) -> usize {
        self.epochs[epoch].loss_probes
    }
456
    /// Test helper that adds one pending probe packet to `epoch`.
    #[cfg(test)]
    pub fn inc_loss_probes(&mut self, epoch: packet::Epoch) {
        self.epochs[epoch].loss_probes += 1;
    }
461
462    pub fn ping_sent(&mut self, epoch: packet::Epoch) {
463        self.epochs[epoch].loss_probes =
464            self.epochs[epoch].loss_probes.saturating_sub(1);
465    }
466
467    pub fn on_packet_sent(
468        &mut self, mut pkt: Sent, epoch: packet::Epoch,
469        handshake_status: HandshakeStatus, now: Instant, trace_id: &str,
470    ) {
471        let ack_eliciting = pkt.ack_eliciting;
472        let in_flight = pkt.in_flight;
473        let sent_bytes = pkt.size;
474
475        if ack_eliciting {
476            self.outstanding_non_ack_eliciting = 0;
477        } else {
478            self.outstanding_non_ack_eliciting += 1;
479        }
480
481        if in_flight && ack_eliciting {
482            self.epochs[epoch].time_of_last_ack_eliciting_packet = Some(now);
483        }
484
485        self.congestion.on_packet_sent(
486            self.bytes_in_flight,
487            sent_bytes,
488            now,
489            &mut pkt,
490            &self.rtt_stats,
491            self.bytes_lost,
492            in_flight,
493        );
494
495        if in_flight {
496            self.epochs[epoch].in_flight_count += 1;
497            self.bytes_in_flight += sent_bytes;
498
499            self.set_loss_detection_timer(handshake_status, now);
500        }
501
502        self.bytes_sent += sent_bytes;
503
504        self.epochs[epoch].sent_packets.push_back(pkt);
505
506        trace!("{} {:?}", trace_id, self);
507    }
508
    /// Returns the send time computed by the congestion controller state for
    /// the next packet.
    // NOTE(review): presumably the pacer's release time -- confirm in the
    // congestion module.
    pub fn get_packet_send_time(&self) -> Instant {
        self.congestion.get_packet_send_time()
    }
512
513    #[allow(clippy::too_many_arguments)]
514    pub fn on_ack_received(
515        &mut self, ranges: &ranges::RangeSet, ack_delay: u64,
516        epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant,
517        trace_id: &str,
518    ) -> Result<(usize, usize, usize)> {
519        let largest_acked = ranges.last().unwrap();
520
521        // Update the largest acked packet.
522        let largest_acked = self.epochs[epoch]
523            .largest_acked_packet
524            .unwrap_or(0)
525            .max(largest_acked);
526
527        self.epochs[epoch].largest_acked_packet = Some(largest_acked);
528
529        let AckedDetectionResult {
530            acked_bytes,
531            spurious_losses,
532            spurious_pkt_thresh,
533            has_ack_eliciting,
534            has_in_flight_spurious_loss,
535        } = self.epochs[epoch].detect_and_remove_acked_packets(
536            now,
537            ranges,
538            &mut self.newly_acked,
539            &self.rtt_stats,
540            trace_id,
541        );
542
543        self.lost_spurious_count += spurious_losses;
544        if let Some(thresh) = spurious_pkt_thresh {
545            self.pkt_thresh =
546                self.pkt_thresh.max(thresh.min(MAX_PACKET_THRESHOLD));
547        }
548
549        // Undo congestion window update.
550        if has_in_flight_spurious_loss {
551            (self.congestion.cc_ops.rollback)(&mut self.congestion);
552        }
553
554        if self.newly_acked.is_empty() {
555            return Ok((0, 0, 0));
556        }
557
558        // Check if largest packet is newly acked.
559        let largest_newly_acked = self.newly_acked.last().unwrap();
560
561        if largest_newly_acked.pkt_num == largest_acked && has_ack_eliciting {
562            let latest_rtt = now - largest_newly_acked.time_sent;
563            self.rtt_stats.update_rtt(
564                latest_rtt,
565                Duration::from_micros(ack_delay),
566                now,
567                handshake_status.completed,
568            );
569        }
570
571        // Detect and mark lost packets without removing them from the sent
572        // packets list.
573        let loss = self.detect_lost_packets(epoch, now, trace_id);
574
575        self.congestion.on_packets_acked(
576            self.bytes_in_flight,
577            &mut self.newly_acked,
578            &self.rtt_stats,
579            now,
580        );
581
582        self.bytes_in_flight -= acked_bytes;
583
584        self.pto_count = 0;
585
586        self.set_loss_detection_timer(handshake_status, now);
587
588        self.epochs[epoch]
589            .drain_acked_and_lost_packets(now - self.rtt_stats.rtt());
590
591        Ok((loss.0, loss.1, acked_bytes))
592    }
593
594    pub fn on_loss_detection_timeout(
595        &mut self, handshake_status: HandshakeStatus, now: Instant,
596        trace_id: &str,
597    ) -> (usize, usize) {
598        let (earliest_loss_time, epoch) = self.loss_time_and_space();
599
600        if earliest_loss_time.is_some() {
601            // Time threshold loss detection.
602            let loss = self.detect_lost_packets(epoch, now, trace_id);
603
604            self.set_loss_detection_timer(handshake_status, now);
605
606            trace!("{} {:?}", trace_id, self);
607            return loss;
608        }
609
610        let epoch = if self.bytes_in_flight > 0 {
611            // Send new data if available, else retransmit old data. If neither
612            // is available, send a single PING frame.
613            let (_, e) = self.pto_time_and_space(handshake_status, now);
614
615            e
616        } else {
617            // Client sends an anti-deadlock packet: Initial is padded to earn
618            // more anti-amplification credit, a Handshake packet proves address
619            // ownership.
620            if handshake_status.has_handshake_keys {
621                packet::Epoch::Handshake
622            } else {
623                packet::Epoch::Initial
624            }
625        };
626
627        self.pto_count += 1;
628
629        let epoch = &mut self.epochs[epoch];
630
631        epoch.loss_probes =
632            cmp::min(self.pto_count as usize, MAX_PTO_PROBES_COUNT);
633
634        let unacked_iter = epoch.sent_packets
635            .iter_mut()
636            // Skip packets that have already been acked or lost, and packets
637            // that don't contain either CRYPTO or STREAM frames.
638            .filter(|p| p.has_data && p.time_acked.is_none() && p.time_lost.is_none())
639            // Only return as many packets as the number of probe packets that
640            // will be sent.
641            .take(epoch.loss_probes);
642
643        // Retransmit the frames from the oldest sent packets on PTO. However
644        // the packets are not actually declared lost (so there is no effect to
645        // congestion control), we just reschedule the data they carried.
646        //
647        // This will also trigger sending an ACK and retransmitting frames like
648        // HANDSHAKE_DONE and MAX_DATA / MAX_STREAM_DATA as well, in addition
649        // to CRYPTO and STREAM, if the original packet carried them.
650        for unacked in unacked_iter {
651            epoch.lost_frames.extend_from_slice(&unacked.frames);
652        }
653
654        self.set_loss_detection_timer(handshake_status, now);
655
656        trace!("{} {:?}", trace_id, self);
657
658        (0, 0)
659    }
660
661    pub fn on_pkt_num_space_discarded(
662        &mut self, epoch: packet::Epoch, handshake_status: HandshakeStatus,
663        now: Instant,
664    ) {
665        let epoch = &mut self.epochs[epoch];
666
667        let unacked_bytes = epoch
668            .sent_packets
669            .iter()
670            .filter(|p| {
671                p.in_flight && p.time_acked.is_none() && p.time_lost.is_none()
672            })
673            .fold(0, |acc, p| acc + p.size);
674
675        self.bytes_in_flight -= unacked_bytes;
676
677        epoch.sent_packets.clear();
678        epoch.lost_frames.clear();
679        epoch.acked_frames.clear();
680
681        epoch.time_of_last_ack_eliciting_packet = None;
682        epoch.loss_time = None;
683        epoch.loss_probes = 0;
684        epoch.in_flight_count = 0;
685
686        self.set_loss_detection_timer(handshake_status, now);
687    }
688
    /// Runs loss detection for `epoch` and returns `(lost_packets,
    /// lost_bytes)`.
    pub fn on_path_change(
        &mut self, epoch: packet::Epoch, now: Instant, trace_id: &str,
    ) -> (usize, usize) {
        // Time threshold loss detection.
        self.detect_lost_packets(epoch, now, trace_id)
    }
695
    /// Returns the deadline of the loss detection timer, or `None` when the
    /// timer is not armed.
    pub fn loss_detection_timer(&self) -> Option<Instant> {
        self.loss_timer.time
    }
699
    /// Returns the congestion window reported by the congestion controller
    /// state.
    pub fn cwnd(&self) -> usize {
        self.congestion.congestion_window()
    }
703
704    pub fn cwnd_available(&self) -> usize {
705        // Ignore cwnd when sending probe packets.
706        if self.epochs.iter().any(|e| e.loss_probes > 0) {
707            return usize::MAX;
708        }
709
710        // Open more space (snd_cnt) for PRR when allowed.
711        self.cwnd().saturating_sub(self.bytes_in_flight) +
712            self.congestion.prr.snd_cnt
713    }
714
    /// Returns the current RTT estimate, as reported by `RttStats::rtt()`.
    pub fn rtt(&self) -> Duration {
        self.rtt_stats.rtt()
    }
718
    /// Returns the minimum RTT reported by the RTT estimator, if it has one.
    pub fn min_rtt(&self) -> Option<Duration> {
        self.rtt_stats.min_rtt()
    }
722
    /// Returns the current RTT variance estimate.
    pub fn rttvar(&self) -> Duration {
        self.rtt_stats.rttvar
    }
726
727    pub fn pto(&self) -> Duration {
728        self.rtt() + cmp::max(self.rtt_stats.rttvar * 4, GRANULARITY)
729    }
730
    /// Returns the delivery rate reported by the congestion controller
    /// state.
    // NOTE(review): the unit is not visible here -- presumably bytes per
    // second; confirm in the congestion module.
    pub fn delivery_rate(&self) -> u64 {
        self.congestion.delivery_rate()
    }
734
    /// Returns the maximum datagram size currently in use.
    pub fn max_datagram_size(&self) -> usize {
        self.max_datagram_size
    }
738
739    pub fn pmtud_update_max_datagram_size(
740        &mut self, new_max_datagram_size: usize,
741    ) {
742        // Congestion Window is updated only when it's not updated already.
743        // Update cwnd if it hasn't been updated yet.
744        if self.cwnd() ==
745            self.max_datagram_size *
746                self.congestion.initial_congestion_window_packets
747        {
748            self.congestion.congestion_window = new_max_datagram_size *
749                self.congestion.initial_congestion_window_packets;
750        }
751
752        self.congestion.pacer = pacer::Pacer::new(
753            self.congestion.pacer.enabled(),
754            self.cwnd(),
755            0,
756            new_max_datagram_size,
757            self.congestion.pacer.max_pacing_rate(),
758        );
759
760        self.max_datagram_size = new_max_datagram_size;
761    }
762
763    pub fn update_max_datagram_size(&mut self, new_max_datagram_size: usize) {
764        self.pmtud_update_max_datagram_size(
765            self.max_datagram_size.min(new_max_datagram_size),
766        )
767    }
768
769    fn loss_time_and_space(&self) -> (Option<Instant>, packet::Epoch) {
770        let mut epoch = packet::Epoch::Initial;
771        let mut time = self.epochs[epoch].loss_time;
772
773        // Iterate over all packet number spaces starting from Handshake.
774        for e in [packet::Epoch::Handshake, packet::Epoch::Application] {
775            let new_time = self.epochs[e].loss_time;
776            if time.is_none() || new_time < time {
777                time = new_time;
778                epoch = e;
779            }
780        }
781
782        (time, epoch)
783    }
784
785    fn pto_time_and_space(
786        &self, handshake_status: HandshakeStatus, now: Instant,
787    ) -> (Option<Instant>, packet::Epoch) {
788        let mut duration = self.pto() * 2_u32.pow(self.pto_count);
789
790        // Arm PTO from now when there are no inflight packets.
791        if self.bytes_in_flight == 0 {
792            if handshake_status.has_handshake_keys {
793                return (Some(now + duration), packet::Epoch::Handshake);
794            } else {
795                return (Some(now + duration), packet::Epoch::Initial);
796            }
797        }
798
799        let mut pto_timeout = None;
800        let mut pto_space = packet::Epoch::Initial;
801
802        // Iterate over all packet number spaces.
803        for e in [
804            packet::Epoch::Initial,
805            packet::Epoch::Handshake,
806            packet::Epoch::Application,
807        ] {
808            let epoch = &self.epochs[e];
809            if epoch.in_flight_count == 0 {
810                continue;
811            }
812
813            if e == packet::Epoch::Application {
814                // Skip Application Data until handshake completes.
815                if !handshake_status.completed {
816                    return (pto_timeout, pto_space);
817                }
818
819                // Include max_ack_delay and backoff for Application Data.
820                duration +=
821                    self.rtt_stats.max_ack_delay * 2_u32.pow(self.pto_count);
822            }
823
824            let new_time = epoch
825                .time_of_last_ack_eliciting_packet
826                .map(|t| t + duration);
827
828            if pto_timeout.is_none() || new_time < pto_timeout {
829                pto_timeout = new_time;
830                pto_space = e;
831            }
832        }
833
834        (pto_timeout, pto_space)
835    }
836
837    fn set_loss_detection_timer(
838        &mut self, handshake_status: HandshakeStatus, now: Instant,
839    ) {
840        let (earliest_loss_time, _) = self.loss_time_and_space();
841
842        if let Some(to) = earliest_loss_time {
843            // Time threshold loss detection.
844            self.loss_timer.update(to);
845            return;
846        }
847
848        if self.bytes_in_flight == 0 && handshake_status.peer_verified_address {
849            self.loss_timer.clear();
850            return;
851        }
852
853        // PTO timer.
854        if let (Some(timeout), _) = self.pto_time_and_space(handshake_status, now)
855        {
856            self.loss_timer.update(timeout);
857        }
858    }
859
860    fn detect_lost_packets(
861        &mut self, epoch: packet::Epoch, now: Instant, trace_id: &str,
862    ) -> (usize, usize) {
863        let loss_delay = cmp::max(self.rtt_stats.latest_rtt, self.rtt())
864            .mul_f64(self.time_thresh);
865
866        let loss = self.epochs[epoch].detect_lost_packets(
867            loss_delay,
868            self.pkt_thresh,
869            now,
870            trace_id,
871            epoch,
872        );
873
874        if let Some(pkt) = loss.largest_lost_pkt {
875            if !self.congestion.in_congestion_recovery(pkt.time_sent) {
876                (self.congestion.cc_ops.checkpoint)(&mut self.congestion);
877            }
878
879            (self.congestion.cc_ops.congestion_event)(
880                &mut self.congestion,
881                self.bytes_in_flight,
882                loss.lost_bytes,
883                &pkt,
884                now,
885            );
886
887            self.bytes_in_flight -= loss.lost_bytes;
888        };
889
890        self.bytes_in_flight -= loss.pmtud_lost_bytes;
891
892        self.epochs[epoch]
893            .drain_acked_and_lost_packets(now - self.rtt_stats.rtt());
894
895        self.congestion.lost_count += loss.lost_packets;
896
897        (loss.lost_packets, loss.lost_bytes)
898    }
899
    /// Sets the application-limited flag of the congestion controller state.
    pub fn update_app_limited(&mut self, v: bool) {
        self.congestion.app_limited = v;
    }
903
    /// Test helper returning the application-limited flag of the congestion
    /// controller state.
    #[cfg(test)]
    pub fn app_limited(&self) -> bool {
        self.congestion.app_limited
    }
908
    /// Forwards the application-limited state to the delivery rate
    /// estimator.
    pub fn delivery_rate_update_app_limited(&mut self, v: bool) {
        self.congestion.delivery_rate.update_app_limited(v);
    }
912
    /// Replaces the max ack delay stored in the RTT statistics, which is
    /// also added to the PTO of the Application packet number space.
    pub fn update_max_ack_delay(&mut self, max_ack_delay: Duration) {
        self.rtt_stats.max_ack_delay = max_ack_delay;
    }
916
    /// Takes a snapshot of the current recovery metrics and hands it to the
    /// stored qlog metrics, returning whatever event they produce.
    // NOTE(review): presumably `maybe_update()` yields an event only when a
    // metric changed since the previous snapshot -- confirm in
    // `QlogMetrics`.
    #[cfg(feature = "qlog")]
    pub fn maybe_qlog(&mut self) -> Option<EventData> {
        let qlog_metrics = QlogMetrics {
            min_rtt: *self.rtt_stats.min_rtt,
            smoothed_rtt: self.rtt(),
            latest_rtt: self.rtt_stats.latest_rtt,
            rttvar: self.rtt_stats.rttvar,
            cwnd: self.cwnd() as u64,
            bytes_in_flight: self.bytes_in_flight as u64,
            ssthresh: self.congestion.ssthresh as u64,
            pacing_rate: self.congestion.pacer.rate(),
        };

        self.qlog_metrics.maybe_update(qlog_metrics)
    }
932
    /// Returns the send quantum reported by the congestion controller state.
    pub fn send_quantum(&self) -> usize {
        self.congestion.send_quantum()
    }
936
    /// Returns the lost packet counter of the congestion controller state,
    /// which every loss detection pass adds its lost packets to.
    pub fn lost_count(&self) -> usize {
        self.congestion.lost_count
    }
940}
941
942impl std::fmt::Debug for Recovery {
943    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
944        match self.loss_timer.time {
945            Some(v) => {
946                let now = Instant::now();
947
948                if v > now {
949                    let d = v.duration_since(now);
950                    write!(f, "timer={d:?} ")?;
951                } else {
952                    write!(f, "timer=exp ")?;
953                }
954            },
955
956            None => {
957                write!(f, "timer=none ")?;
958            },
959        };
960
961        write!(f, "latest_rtt={:?} ", self.rtt_stats.latest_rtt)?;
962        write!(f, "srtt={:?} ", self.rtt_stats.smoothed_rtt)?;
963        write!(f, "min_rtt={:?} ", *self.rtt_stats.min_rtt)?;
964        write!(f, "rttvar={:?} ", self.rtt_stats.rttvar)?;
965        write!(f, "cwnd={} ", self.cwnd())?;
966        write!(f, "ssthresh={} ", self.congestion.ssthresh)?;
967        write!(f, "bytes_in_flight={} ", self.bytes_in_flight)?;
968        write!(f, "app_limited={} ", self.congestion.app_limited)?;
969        write!(
970            f,
971            "congestion_recovery_start_time={:?} ",
972            self.congestion.congestion_recovery_start_time
973        )?;
974        write!(f, "{:?} ", self.congestion.delivery_rate)?;
975        write!(f, "pacer={:?} ", self.congestion.pacer)?;
976
977        if self.congestion.hystart.enabled() {
978            write!(f, "hystart={:?} ", self.congestion.hystart)?;
979        }
980
981        // CC-specific debug info
982        (self.congestion.cc_ops.debug_fmt)(&self.congestion, f)?;
983
984        Ok(())
985    }
986}
987
988#[derive(Clone)]
989pub struct Sent {
990    pub pkt_num: u64,
991
992    pub frames: SmallVec<[frame::Frame; 1]>,
993
994    pub time_sent: Instant,
995
996    pub time_acked: Option<Instant>,
997
998    pub time_lost: Option<Instant>,
999
1000    pub size: usize,
1001
1002    pub ack_eliciting: bool,
1003
1004    pub in_flight: bool,
1005
1006    pub delivered: usize,
1007
1008    pub delivered_time: Instant,
1009
1010    pub first_sent_time: Instant,
1011
1012    pub is_app_limited: bool,
1013
1014    pub tx_in_flight: usize,
1015
1016    pub lost: u64,
1017
1018    pub has_data: bool,
1019
1020    pub pmtud: bool,
1021}
1022
1023impl std::fmt::Debug for Sent {
1024    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1025        write!(f, "pkt_num={:?} ", self.pkt_num)?;
1026        write!(f, "pkt_sent_time={:?} ", self.time_sent)?;
1027        write!(f, "pkt_size={:?} ", self.size)?;
1028        write!(f, "delivered={:?} ", self.delivered)?;
1029        write!(f, "delivered_time={:?} ", self.delivered_time)?;
1030        write!(f, "first_sent_time={:?} ", self.first_sent_time)?;
1031        write!(f, "is_app_limited={} ", self.is_app_limited)?;
1032        write!(f, "tx_in_flight={} ", self.tx_in_flight)?;
1033        write!(f, "lost={} ", self.lost)?;
1034        write!(f, "has_data={} ", self.has_data)?;
1035        write!(f, "pmtud={}", self.pmtud)?;
1036
1037        Ok(())
1038    }
1039}
1040
/// Record describing one acknowledged packet.
///
/// Apart from `rtt`, every field has the same name and type as a field of
/// `Sent`.
#[derive(Clone)]
pub struct Acked {
    /// Packet number.
    pub pkt_num: u64,
    /// Time at which the packet was sent.
    pub time_sent: Instant,
    /// Size of the packet.
    pub size: usize,
    /// RTT associated with this packet -- presumably the sample measured
    /// when it was acknowledged; confirm against the code that builds it.
    pub rtt: Duration,
    // NOTE(review): the remaining fields look like delivery-rate
    // bookkeeping carried over from `Sent` -- confirm.
    pub delivered: usize,
    pub delivered_time: Instant,
    pub first_sent_time: Instant,
    pub is_app_limited: bool,
}
1059
/// Handshake progress flags passed to the recovery entry points
/// (`on_packet_sent()`, `on_ack_received()`, `on_loss_detection_timeout()`).
#[derive(Clone, Copy, Debug)]
pub struct HandshakeStatus {
    /// Whether handshake keys are available.
    pub has_handshake_keys: bool,
    /// Whether the peer has verified our address.
    pub peer_verified_address: bool,
    /// Whether the handshake has completed.
    pub completed: bool,
}
1068
#[cfg(test)]
impl Default for HandshakeStatus {
    /// Test-only default: every flag is set to `true`.
    fn default() -> Self {
        Self {
            has_handshake_keys: true,
            peer_verified_address: true,
            completed: true,
        }
    }
}
1081
// We don't need to log all qlog metrics every time there is a recovery event.
// Instead, we can log only the MetricsUpdated event data fields that we care
// about, only when they change. To support this, the QlogMetrics structure
// keeps a running picture of the fields.
#[cfg(feature = "qlog")]
#[derive(Default)]
struct QlogMetrics {
    // RTT figures, as last seen by `maybe_update()`.
    min_rtt: Duration,
    smoothed_rtt: Duration,
    latest_rtt: Duration,
    rttvar: Duration,

    // Congestion figures, as last seen by `maybe_update()`.
    cwnd: u64,
    bytes_in_flight: u64,
    ssthresh: u64,
    pacing_rate: u64,
}
1098
#[cfg(feature = "qlog")]
impl QlogMetrics {
    // Make a qlog event if the latest instance of QlogMetrics is different.
    //
    // Every field of `latest` is compared against the stored value. A field
    // that changed is stored and reported in the event; an unchanged field
    // is left out (`None`). No event is produced when nothing changed.
    fn maybe_update(&mut self, latest: Self) -> Option<EventData> {
        // Stores `new` in `slot` and returns it, but only when it differs
        // from what `slot` already holds.
        fn refresh<T: PartialEq + Copy>(slot: &mut T, new: T) -> Option<T> {
            if *slot == new {
                return None;
            }

            *slot = new;
            Some(new)
        }

        // Durations are reported as fractional milliseconds.
        fn to_millis(d: Duration) -> f32 {
            d.as_secs_f32() * 1000.0
        }

        let min_rtt = refresh(&mut self.min_rtt, latest.min_rtt).map(to_millis);
        let smoothed_rtt =
            refresh(&mut self.smoothed_rtt, latest.smoothed_rtt).map(to_millis);
        let latest_rtt =
            refresh(&mut self.latest_rtt, latest.latest_rtt).map(to_millis);
        let rtt_variance = refresh(&mut self.rttvar, latest.rttvar).map(to_millis);

        let congestion_window = refresh(&mut self.cwnd, latest.cwnd);
        let bytes_in_flight =
            refresh(&mut self.bytes_in_flight, latest.bytes_in_flight);
        let ssthresh = refresh(&mut self.ssthresh, latest.ssthresh);
        let pacing_rate = refresh(&mut self.pacing_rate, latest.pacing_rate);

        let changed = min_rtt.is_some() ||
            smoothed_rtt.is_some() ||
            latest_rtt.is_some() ||
            rtt_variance.is_some() ||
            congestion_window.is_some() ||
            bytes_in_flight.is_some() ||
            ssthresh.is_some() ||
            pacing_rate.is_some();

        if !changed {
            return None;
        }

        // QVis can't use all these fields and they can be large.
        Some(EventData::MetricsUpdated(
            qlog::events::quic::MetricsUpdated {
                min_rtt,
                smoothed_rtt,
                latest_rtt,
                rtt_variance,
                congestion_window,
                bytes_in_flight,
                ssthresh,
                pacing_rate,
                ..Default::default()
            },
        ))
    }
}
1194
1195#[cfg(test)]
1196mod tests {
1197    use super::*;
1198    use smallvec::smallvec;
1199    use std::str::FromStr;
1200
1201    #[test]
1202    fn lookup_cc_algo_ok() {
1203        let algo = CongestionControlAlgorithm::from_str("reno").unwrap();
1204        assert_eq!(algo, CongestionControlAlgorithm::Reno);
1205    }
1206
1207    #[test]
1208    fn lookup_cc_algo_bad() {
1209        assert_eq!(
1210            CongestionControlAlgorithm::from_str("???"),
1211            Err(crate::Error::CongestionControl)
1212        );
1213    }
1214
1215    #[test]
1216    fn loss_on_pto() {
1217        let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
1218        cfg.set_cc_algorithm(CongestionControlAlgorithm::Reno);
1219
1220        let mut r = Recovery::new(&cfg);
1221
1222        let mut now = Instant::now();
1223
1224        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 0);
1225
1226        // Start by sending a few packets.
1227        let p = Sent {
1228            pkt_num: 0,
1229            frames: smallvec![],
1230            time_sent: now,
1231            time_acked: None,
1232            time_lost: None,
1233            size: 1000,
1234            ack_eliciting: true,
1235            in_flight: true,
1236            delivered: 0,
1237            delivered_time: now,
1238            first_sent_time: now,
1239            is_app_limited: false,
1240            tx_in_flight: 0,
1241            lost: 0,
1242            has_data: false,
1243            pmtud: false,
1244        };
1245
1246        r.on_packet_sent(
1247            p,
1248            packet::Epoch::Application,
1249            HandshakeStatus::default(),
1250            now,
1251            "",
1252        );
1253
1254        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 1);
1255        assert_eq!(r.bytes_in_flight, 1000);
1256
1257        let p = Sent {
1258            pkt_num: 1,
1259            frames: smallvec![],
1260            time_sent: now,
1261            time_acked: None,
1262            time_lost: None,
1263            size: 1000,
1264            ack_eliciting: true,
1265            in_flight: true,
1266            delivered: 0,
1267            delivered_time: now,
1268            first_sent_time: now,
1269            is_app_limited: false,
1270            tx_in_flight: 0,
1271            lost: 0,
1272            has_data: false,
1273            pmtud: false,
1274        };
1275
1276        r.on_packet_sent(
1277            p,
1278            packet::Epoch::Application,
1279            HandshakeStatus::default(),
1280            now,
1281            "",
1282        );
1283
1284        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 2);
1285        assert_eq!(r.bytes_in_flight, 2000);
1286
1287        let p = Sent {
1288            pkt_num: 2,
1289            frames: smallvec![],
1290            time_sent: now,
1291            time_acked: None,
1292            time_lost: None,
1293            size: 1000,
1294            ack_eliciting: true,
1295            in_flight: true,
1296            delivered: 0,
1297            delivered_time: now,
1298            first_sent_time: now,
1299            is_app_limited: false,
1300            tx_in_flight: 0,
1301            lost: 0,
1302            has_data: false,
1303            pmtud: false,
1304        };
1305
1306        r.on_packet_sent(
1307            p,
1308            packet::Epoch::Application,
1309            HandshakeStatus::default(),
1310            now,
1311            "",
1312        );
1313        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 3);
1314        assert_eq!(r.bytes_in_flight, 3000);
1315
1316        let p = Sent {
1317            pkt_num: 3,
1318            frames: smallvec![],
1319            time_sent: now,
1320            time_acked: None,
1321            time_lost: None,
1322            size: 1000,
1323            ack_eliciting: true,
1324            in_flight: true,
1325            delivered: 0,
1326            delivered_time: now,
1327            first_sent_time: now,
1328            is_app_limited: false,
1329            tx_in_flight: 0,
1330            lost: 0,
1331            has_data: false,
1332            pmtud: false,
1333        };
1334
1335        r.on_packet_sent(
1336            p,
1337            packet::Epoch::Application,
1338            HandshakeStatus::default(),
1339            now,
1340            "",
1341        );
1342        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 4);
1343        assert_eq!(r.bytes_in_flight, 4000);
1344
1345        // Wait for 10ms.
1346        now += Duration::from_millis(10);
1347
1348        // Only the first 2 packets are acked.
1349        let mut acked = ranges::RangeSet::default();
1350        acked.insert(0..2);
1351
1352        assert_eq!(
1353            r.on_ack_received(
1354                &acked,
1355                25,
1356                packet::Epoch::Application,
1357                HandshakeStatus::default(),
1358                now,
1359                "",
1360            ),
1361            Ok((0, 0, 2 * 1000))
1362        );
1363
1364        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 2);
1365        assert_eq!(r.bytes_in_flight, 2000);
1366        assert_eq!(r.congestion.lost_count, 0);
1367
1368        // Wait until loss detection timer expires.
1369        now = r.loss_detection_timer().unwrap();
1370
1371        // PTO.
1372        r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
1373        assert_eq!(r.epochs[packet::Epoch::Application].loss_probes, 1);
1374        assert_eq!(r.congestion.lost_count, 0);
1375        assert_eq!(r.pto_count, 1);
1376
1377        let p = Sent {
1378            pkt_num: 4,
1379            frames: smallvec![],
1380            time_sent: now,
1381            time_acked: None,
1382            time_lost: None,
1383            size: 1000,
1384            ack_eliciting: true,
1385            in_flight: true,
1386            delivered: 0,
1387            delivered_time: now,
1388            first_sent_time: now,
1389            is_app_limited: false,
1390            tx_in_flight: 0,
1391            lost: 0,
1392            has_data: false,
1393            pmtud: false,
1394        };
1395
1396        r.on_packet_sent(
1397            p,
1398            packet::Epoch::Application,
1399            HandshakeStatus::default(),
1400            now,
1401            "",
1402        );
1403        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 3);
1404        assert_eq!(r.bytes_in_flight, 3000);
1405
1406        let p = Sent {
1407            pkt_num: 5,
1408            frames: smallvec![],
1409            time_sent: now,
1410            time_acked: None,
1411            time_lost: None,
1412            size: 1000,
1413            ack_eliciting: true,
1414            in_flight: true,
1415            delivered: 0,
1416            delivered_time: now,
1417            first_sent_time: now,
1418            is_app_limited: false,
1419            tx_in_flight: 0,
1420            lost: 0,
1421            has_data: false,
1422            pmtud: false,
1423        };
1424
1425        r.on_packet_sent(
1426            p,
1427            packet::Epoch::Application,
1428            HandshakeStatus::default(),
1429            now,
1430            "",
1431        );
1432        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 4);
1433        assert_eq!(r.bytes_in_flight, 4000);
1434        assert_eq!(r.congestion.lost_count, 0);
1435
1436        // Wait for 10ms.
1437        now += Duration::from_millis(10);
1438
1439        // PTO packets are acked.
1440        let mut acked = ranges::RangeSet::default();
1441        acked.insert(4..6);
1442
1443        assert_eq!(
1444            r.on_ack_received(
1445                &acked,
1446                25,
1447                packet::Epoch::Application,
1448                HandshakeStatus::default(),
1449                now,
1450                "",
1451            ),
1452            Ok((2, 2000, 2 * 1000))
1453        );
1454
1455        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 4);
1456        assert_eq!(r.bytes_in_flight, 0);
1457
1458        assert_eq!(r.congestion.lost_count, 2);
1459
1460        // Wait 1 RTT.
1461        now += r.rtt();
1462
1463        r.detect_lost_packets(packet::Epoch::Application, now, "");
1464
1465        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 0);
1466    }
1467
1468    #[test]
1469    fn loss_on_timer() {
1470        let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
1471        cfg.set_cc_algorithm(CongestionControlAlgorithm::Reno);
1472
1473        let mut r = Recovery::new(&cfg);
1474
1475        let mut now = Instant::now();
1476
1477        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 0);
1478
1479        // Start by sending a few packets.
1480        let p = Sent {
1481            pkt_num: 0,
1482            frames: smallvec![],
1483            time_sent: now,
1484            time_acked: None,
1485            time_lost: None,
1486            size: 1000,
1487            ack_eliciting: true,
1488            in_flight: true,
1489            delivered: 0,
1490            delivered_time: now,
1491            first_sent_time: now,
1492            is_app_limited: false,
1493            tx_in_flight: 0,
1494            lost: 0,
1495            has_data: false,
1496            pmtud: false,
1497        };
1498
1499        r.on_packet_sent(
1500            p,
1501            packet::Epoch::Application,
1502            HandshakeStatus::default(),
1503            now,
1504            "",
1505        );
1506        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 1);
1507        assert_eq!(r.bytes_in_flight, 1000);
1508
1509        let p = Sent {
1510            pkt_num: 1,
1511            frames: smallvec![],
1512            time_sent: now,
1513            time_acked: None,
1514            time_lost: None,
1515            size: 1000,
1516            ack_eliciting: true,
1517            in_flight: true,
1518            delivered: 0,
1519            delivered_time: now,
1520            first_sent_time: now,
1521            is_app_limited: false,
1522            tx_in_flight: 0,
1523            lost: 0,
1524            has_data: false,
1525            pmtud: false,
1526        };
1527
1528        r.on_packet_sent(
1529            p,
1530            packet::Epoch::Application,
1531            HandshakeStatus::default(),
1532            now,
1533            "",
1534        );
1535        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 2);
1536        assert_eq!(r.bytes_in_flight, 2000);
1537
1538        let p = Sent {
1539            pkt_num: 2,
1540            frames: smallvec![],
1541            time_sent: now,
1542            time_acked: None,
1543            time_lost: None,
1544            size: 1000,
1545            ack_eliciting: true,
1546            in_flight: true,
1547            delivered: 0,
1548            delivered_time: now,
1549            first_sent_time: now,
1550            is_app_limited: false,
1551            tx_in_flight: 0,
1552            lost: 0,
1553            has_data: false,
1554            pmtud: false,
1555        };
1556
1557        r.on_packet_sent(
1558            p,
1559            packet::Epoch::Application,
1560            HandshakeStatus::default(),
1561            now,
1562            "",
1563        );
1564        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 3);
1565        assert_eq!(r.bytes_in_flight, 3000);
1566
1567        let p = Sent {
1568            pkt_num: 3,
1569            frames: smallvec![],
1570            time_sent: now,
1571            time_acked: None,
1572            time_lost: None,
1573            size: 1000,
1574            ack_eliciting: true,
1575            in_flight: true,
1576            delivered: 0,
1577            delivered_time: now,
1578            first_sent_time: now,
1579            is_app_limited: false,
1580            tx_in_flight: 0,
1581            lost: 0,
1582            has_data: false,
1583            pmtud: false,
1584        };
1585
1586        r.on_packet_sent(
1587            p,
1588            packet::Epoch::Application,
1589            HandshakeStatus::default(),
1590            now,
1591            "",
1592        );
1593        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 4);
1594        assert_eq!(r.bytes_in_flight, 4000);
1595
1596        // Wait for 10ms.
1597        now += Duration::from_millis(10);
1598
1599        // Only the first 2 packets and the last one are acked.
1600        let mut acked = ranges::RangeSet::default();
1601        acked.insert(0..2);
1602        acked.insert(3..4);
1603
1604        assert_eq!(
1605            r.on_ack_received(
1606                &acked,
1607                25,
1608                packet::Epoch::Application,
1609                HandshakeStatus::default(),
1610                now,
1611                "",
1612            ),
1613            Ok((0, 0, 3 * 1000))
1614        );
1615
1616        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 2);
1617        assert_eq!(r.bytes_in_flight, 1000);
1618        assert_eq!(r.congestion.lost_count, 0);
1619
1620        // Wait until loss detection timer expires.
1621        now = r.loss_detection_timer().unwrap();
1622
1623        // Packet is declared lost.
1624        r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
1625        assert_eq!(r.epochs[packet::Epoch::Application].loss_probes, 0);
1626
1627        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 2);
1628        assert_eq!(r.bytes_in_flight, 0);
1629
1630        assert_eq!(r.congestion.lost_count, 1);
1631
1632        // Wait 1 RTT.
1633        now += r.rtt();
1634
1635        r.detect_lost_packets(packet::Epoch::Application, now, "");
1636
1637        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 0);
1638    }
1639
1640    #[test]
1641    fn loss_on_reordering() {
1642        let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
1643        cfg.set_cc_algorithm(CongestionControlAlgorithm::Reno);
1644
1645        let mut r = Recovery::new(&cfg);
1646
1647        let mut now = Instant::now();
1648
1649        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 0);
1650
1651        // Start by sending a few packets.
1652        let p = Sent {
1653            pkt_num: 0,
1654            frames: smallvec![],
1655            time_sent: now,
1656            time_acked: None,
1657            time_lost: None,
1658            size: 1000,
1659            ack_eliciting: true,
1660            in_flight: true,
1661            delivered: 0,
1662            delivered_time: now,
1663            first_sent_time: now,
1664            is_app_limited: false,
1665            tx_in_flight: 0,
1666            lost: 0,
1667            has_data: false,
1668            pmtud: false,
1669        };
1670
1671        r.on_packet_sent(
1672            p,
1673            packet::Epoch::Application,
1674            HandshakeStatus::default(),
1675            now,
1676            "",
1677        );
1678        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 1);
1679        assert_eq!(r.bytes_in_flight, 1000);
1680
1681        let p = Sent {
1682            pkt_num: 1,
1683            frames: smallvec![],
1684            time_sent: now,
1685            time_acked: None,
1686            time_lost: None,
1687            size: 1000,
1688            ack_eliciting: true,
1689            in_flight: true,
1690            delivered: 0,
1691            delivered_time: now,
1692            first_sent_time: now,
1693            is_app_limited: false,
1694            tx_in_flight: 0,
1695            lost: 0,
1696            has_data: false,
1697            pmtud: false,
1698        };
1699
1700        r.on_packet_sent(
1701            p,
1702            packet::Epoch::Application,
1703            HandshakeStatus::default(),
1704            now,
1705            "",
1706        );
1707        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 2);
1708        assert_eq!(r.bytes_in_flight, 2000);
1709
1710        let p = Sent {
1711            pkt_num: 2,
1712            frames: smallvec![],
1713            time_sent: now,
1714            time_acked: None,
1715            time_lost: None,
1716            size: 1000,
1717            ack_eliciting: true,
1718            in_flight: true,
1719            delivered: 0,
1720            delivered_time: now,
1721            first_sent_time: now,
1722            is_app_limited: false,
1723            tx_in_flight: 0,
1724            lost: 0,
1725            has_data: false,
1726            pmtud: false,
1727        };
1728
1729        r.on_packet_sent(
1730            p,
1731            packet::Epoch::Application,
1732            HandshakeStatus::default(),
1733            now,
1734            "",
1735        );
1736        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 3);
1737        assert_eq!(r.bytes_in_flight, 3000);
1738
1739        let p = Sent {
1740            pkt_num: 3,
1741            frames: smallvec![],
1742            time_sent: now,
1743            time_acked: None,
1744            time_lost: None,
1745            size: 1000,
1746            ack_eliciting: true,
1747            in_flight: true,
1748            delivered: 0,
1749            delivered_time: now,
1750            first_sent_time: now,
1751            is_app_limited: false,
1752            tx_in_flight: 0,
1753            lost: 0,
1754            has_data: false,
1755            pmtud: false,
1756        };
1757
1758        r.on_packet_sent(
1759            p,
1760            packet::Epoch::Application,
1761            HandshakeStatus::default(),
1762            now,
1763            "",
1764        );
1765        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 4);
1766        assert_eq!(r.bytes_in_flight, 4000);
1767
1768        // Wait for 10ms.
1769        now += Duration::from_millis(10);
1770
1771        // ACKs are reordered.
1772        let mut acked = ranges::RangeSet::default();
1773        acked.insert(2..4);
1774
1775        assert_eq!(
1776            r.on_ack_received(
1777                &acked,
1778                25,
1779                packet::Epoch::Application,
1780                HandshakeStatus::default(),
1781                now,
1782                "",
1783            ),
1784            Ok((1, 1000, 1000 * 2))
1785        );
1786
1787        now += Duration::from_millis(10);
1788
1789        let mut acked = ranges::RangeSet::default();
1790        acked.insert(0..2);
1791
1792        assert_eq!(r.pkt_thresh, INITIAL_PACKET_THRESHOLD);
1793
1794        assert_eq!(
1795            r.on_ack_received(
1796                &acked,
1797                25,
1798                packet::Epoch::Application,
1799                HandshakeStatus::default(),
1800                now,
1801                "",
1802            ),
1803            Ok((0, 0, 1000))
1804        );
1805
1806        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 0);
1807        assert_eq!(r.bytes_in_flight, 0);
1808
1809        // Spurious loss.
1810        assert_eq!(r.congestion.lost_count, 1);
1811        assert_eq!(r.lost_spurious_count, 1);
1812
1813        // Packet threshold was increased.
1814        assert_eq!(r.pkt_thresh, 4);
1815
1816        // Wait 1 RTT.
1817        now += r.rtt();
1818
1819        r.detect_lost_packets(packet::Epoch::Application, now, "");
1820
1821        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 0);
1822    }
1823
1824    #[test]
1825    fn pacing() {
1826        let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
1827        cfg.set_cc_algorithm(CongestionControlAlgorithm::CUBIC);
1828
1829        let mut r = Recovery::new(&cfg);
1830
1831        let mut now = Instant::now();
1832
1833        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 0);
1834
1835        // send out first packet (a full initcwnd).
1836        let p = Sent {
1837            pkt_num: 0,
1838            frames: smallvec![],
1839            time_sent: now,
1840            time_acked: None,
1841            time_lost: None,
1842            size: 12000,
1843            ack_eliciting: true,
1844            in_flight: true,
1845            delivered: 0,
1846            delivered_time: now,
1847            first_sent_time: now,
1848            is_app_limited: false,
1849            tx_in_flight: 0,
1850            lost: 0,
1851            has_data: false,
1852            pmtud: false,
1853        };
1854
1855        r.on_packet_sent(
1856            p,
1857            packet::Epoch::Application,
1858            HandshakeStatus::default(),
1859            now,
1860            "",
1861        );
1862
1863        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 1);
1864        assert_eq!(r.bytes_in_flight, 12000);
1865
1866        // First packet will be sent out immediately.
1867        assert_eq!(r.congestion.pacer.rate(), 0);
1868        assert_eq!(r.get_packet_send_time(), now);
1869
1870        // Wait 50ms for ACK.
1871        now += Duration::from_millis(50);
1872
1873        let mut acked = ranges::RangeSet::default();
1874        acked.insert(0..1);
1875
1876        assert_eq!(
1877            r.on_ack_received(
1878                &acked,
1879                10,
1880                packet::Epoch::Application,
1881                HandshakeStatus::default(),
1882                now,
1883                "",
1884            ),
1885            Ok((0, 0, 12000))
1886        );
1887
1888        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 0);
1889        assert_eq!(r.bytes_in_flight, 0);
1890        assert_eq!(r.rtt_stats.smoothed_rtt, Duration::from_millis(50));
1891
1892        // 1 MSS increased.
1893        assert_eq!(r.cwnd(), 12000 + 1200);
1894
1895        // Send out second packet.
1896        let p = Sent {
1897            pkt_num: 1,
1898            frames: smallvec![],
1899            time_sent: now,
1900            time_acked: None,
1901            time_lost: None,
1902            size: 6000,
1903            ack_eliciting: true,
1904            in_flight: true,
1905            delivered: 0,
1906            delivered_time: now,
1907            first_sent_time: now,
1908            is_app_limited: false,
1909            tx_in_flight: 0,
1910            lost: 0,
1911            has_data: false,
1912            pmtud: false,
1913        };
1914
1915        r.on_packet_sent(
1916            p,
1917            packet::Epoch::Application,
1918            HandshakeStatus::default(),
1919            now,
1920            "",
1921        );
1922
1923        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 1);
1924        assert_eq!(r.bytes_in_flight, 6000);
1925
1926        // Pacing is not done during initial phase of connection.
1927        assert_eq!(r.get_packet_send_time(), now);
1928
1929        // Send the third packet out.
1930        let p = Sent {
1931            pkt_num: 2,
1932            frames: smallvec![],
1933            time_sent: now,
1934            time_acked: None,
1935            time_lost: None,
1936            size: 6000,
1937            ack_eliciting: true,
1938            in_flight: true,
1939            delivered: 0,
1940            delivered_time: now,
1941            first_sent_time: now,
1942            is_app_limited: false,
1943            tx_in_flight: 0,
1944            lost: 0,
1945            has_data: false,
1946            pmtud: false,
1947        };
1948
1949        r.on_packet_sent(
1950            p,
1951            packet::Epoch::Application,
1952            HandshakeStatus::default(),
1953            now,
1954            "",
1955        );
1956
1957        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 2);
1958        assert_eq!(r.bytes_in_flight, 12000);
1959
        // Send the fourth packet out.
1961        let p = Sent {
1962            pkt_num: 3,
1963            frames: smallvec![],
1964            time_sent: now,
1965            time_acked: None,
1966            time_lost: None,
1967            size: 1000,
1968            ack_eliciting: true,
1969            in_flight: true,
1970            delivered: 0,
1971            delivered_time: now,
1972            first_sent_time: now,
1973            is_app_limited: false,
1974            tx_in_flight: 0,
1975            lost: 0,
1976            has_data: false,
1977            pmtud: false,
1978        };
1979
1980        r.on_packet_sent(
1981            p,
1982            packet::Epoch::Application,
1983            HandshakeStatus::default(),
1984            now,
1985            "",
1986        );
1987
1988        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 3);
1989        assert_eq!(r.bytes_in_flight, 13000);
1990
        // We pace this outgoing packet, as all conditions for pacing
        // are met.
1993        let pacing_rate =
1994            (r.cwnd() as f64 * congestion::PACING_MULTIPLIER / 0.05) as u64;
1995        assert_eq!(r.congestion.pacer.rate(), pacing_rate);
1996
1997        assert_eq!(
1998            r.get_packet_send_time(),
1999            now + Duration::from_secs_f64(12000.0 / pacing_rate as f64)
2000        );
2001    }
2002
2003    #[test]
2004    fn pmtud_loss_on_timer() {
2005        let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
2006        cfg.set_cc_algorithm(CongestionControlAlgorithm::Reno);
2007
2008        let mut r = Recovery::new(&cfg);
2009
2010        let mut now = Instant::now();
2011
2012        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 0);
2013
2014        // Start by sending a few packets.
2015        let p = Sent {
2016            pkt_num: 0,
2017            frames: smallvec![],
2018            time_sent: now,
2019            time_acked: None,
2020            time_lost: None,
2021            size: 1000,
2022            ack_eliciting: true,
2023            in_flight: true,
2024            delivered: 0,
2025            delivered_time: now,
2026            first_sent_time: now,
2027            is_app_limited: false,
2028            tx_in_flight: 0,
2029            lost: 0,
2030            has_data: false,
2031            pmtud: false,
2032        };
2033
2034        r.on_packet_sent(
2035            p,
2036            packet::Epoch::Application,
2037            HandshakeStatus::default(),
2038            now,
2039            "",
2040        );
2041
2042        assert_eq!(r.epochs[packet::Epoch::Application].in_flight_count, 1);
2043        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 1);
2044        assert_eq!(r.bytes_in_flight, 1000);
2045
2046        let p = Sent {
2047            pkt_num: 1,
2048            frames: smallvec![],
2049            time_sent: now,
2050            time_acked: None,
2051            time_lost: None,
2052            size: 1000,
2053            ack_eliciting: true,
2054            in_flight: true,
2055            delivered: 0,
2056            delivered_time: now,
2057            first_sent_time: now,
2058            is_app_limited: false,
2059            tx_in_flight: 0,
2060            lost: 0,
2061            has_data: false,
2062            pmtud: true,
2063        };
2064
2065        r.on_packet_sent(
2066            p,
2067            packet::Epoch::Application,
2068            HandshakeStatus::default(),
2069            now,
2070            "",
2071        );
2072
2073        assert_eq!(r.epochs[packet::Epoch::Application].in_flight_count, 2);
2074
2075        let p = Sent {
2076            pkt_num: 2,
2077            frames: smallvec![],
2078            time_sent: now,
2079            time_acked: None,
2080            time_lost: None,
2081            size: 1000,
2082            ack_eliciting: true,
2083            in_flight: true,
2084            delivered: 0,
2085            delivered_time: now,
2086            first_sent_time: now,
2087            is_app_limited: false,
2088            tx_in_flight: 0,
2089            lost: 0,
2090            has_data: false,
2091            pmtud: false,
2092        };
2093
2094        r.on_packet_sent(
2095            p,
2096            packet::Epoch::Application,
2097            HandshakeStatus::default(),
2098            now,
2099            "",
2100        );
2101
2102        assert_eq!(r.epochs[packet::Epoch::Application].in_flight_count, 3);
2103
2104        // Wait for 10ms.
2105        now += Duration::from_millis(10);
2106
2107        // Only the first  packets and the last one are acked.
2108        let mut acked = ranges::RangeSet::default();
2109        acked.insert(0..1);
2110        acked.insert(2..3);
2111
2112        assert_eq!(
2113            r.on_ack_received(
2114                &acked,
2115                25,
2116                packet::Epoch::Application,
2117                HandshakeStatus::default(),
2118                now,
2119                "",
2120            ),
2121            Ok((0, 0, 2 * 1000))
2122        );
2123
2124        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 2);
2125        assert_eq!(r.bytes_in_flight, 1000);
2126        assert_eq!(r.congestion.lost_count, 0);
2127
2128        // Wait until loss detection timer expires.
2129        now = r.loss_detection_timer().unwrap();
2130
2131        // Packet is declared lost.
2132        r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
2133        assert_eq!(r.epochs[packet::Epoch::Application].loss_probes, 0);
2134
2135        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 2);
2136        assert_eq!(r.epochs[packet::Epoch::Application].in_flight_count, 0);
2137        assert_eq!(r.bytes_in_flight, 0);
2138        assert_eq!(r.cwnd(), 12000);
2139
2140        assert_eq!(r.congestion.lost_count, 0);
2141
2142        // Wait 1 RTT.
2143        now += r.rtt();
2144
2145        r.detect_lost_packets(packet::Epoch::Application, now, "");
2146
2147        assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 0);
2148        assert_eq!(r.epochs[packet::Epoch::Application].in_flight_count, 0);
2149        assert_eq!(r.bytes_in_flight, 0);
2150        assert_eq!(r.congestion.lost_count, 0);
2151    }
2152}
2153
2154pub mod congestion;
2155mod rtt;