1use std::cmp;
28
29use std::time::Duration;
30use std::time::Instant;
31
32use std::collections::VecDeque;
33
34use crate::packet::Epoch;
35use crate::ranges::RangeSet;
36use crate::Config;
37use crate::CongestionControlAlgorithm;
38use crate::Result;
39
40use crate::frame;
41use crate::packet;
42use crate::ranges;
43
44#[cfg(feature = "qlog")]
45use qlog::events::EventData;
46
47use smallvec::SmallVec;
48
49use self::congestion::pacer;
50use self::congestion::Congestion;
51use self::rtt::RttStats;
52
/// Packet reordering threshold that `Recovery` starts with for
/// packet-threshold loss detection.
const INITIAL_PACKET_THRESHOLD: u64 = 3;

/// Upper bound for the packet threshold when it is raised after a spurious
/// loss.
const MAX_PACKET_THRESHOLD: u64 = 20;

/// Multiplier applied to the RTT to obtain the time-threshold loss delay.
const INITIAL_TIME_THRESHOLD: f64 = 9.0 / 8.0;

/// Lower bound for the loss delay and for the variance term of the PTO.
const GRANULARITY: Duration = Duration::from_millis(1);

/// Maximum number of loss probes scheduled after a PTO.
const MAX_PTO_PROBES_COUNT: usize = 2;

// NOTE(review): the two constants below are not referenced in this part of
// the file; presumably they are consumed by the congestion control code.
const MINIMUM_WINDOW_PACKETS: usize = 2;

const LOSS_REDUCTION_FACTOR: f64 = 0.5;
67
68pub(super) const MAX_OUTSTANDING_NON_ACK_ELICITING: usize = 24;
71
72#[derive(Default)]
73struct RecoveryEpoch {
74 time_of_last_ack_eliciting_packet: Option<Instant>,
76
77 largest_acked_packet: Option<u64>,
80
81 loss_time: Option<Instant>,
84
85 sent_packets: VecDeque<Sent>,
88
89 loss_probes: usize,
90 in_flight_count: usize,
91
92 acked_frames: Vec<frame::Frame>,
93 lost_frames: Vec<frame::Frame>,
94}
95
/// Summary returned by `RecoveryEpoch::detect_and_remove_acked_packets`.
struct AckedDetectionResult {
    /// Total size of the newly acked in-flight packets.
    acked_bytes: usize,

    /// Number of acked packets that had already been declared lost.
    spurious_losses: usize,

    /// Packet threshold derived from the first spurious loss found, if any.
    spurious_pkt_thresh: Option<u64>,

    /// Whether at least one newly acked packet was ack-eliciting.
    has_ack_eliciting: bool,

    /// Whether at least one spuriously lost packet was in flight.
    has_in_flight_spurious_loss: bool,
}
103
104struct LossDetectionResult {
105 largest_lost_pkt: Option<Sent>,
106 lost_packets: usize,
107 lost_bytes: usize,
108 pmtud_lost_bytes: usize,
109}
110
111impl RecoveryEpoch {
112 fn detect_and_remove_acked_packets(
113 &mut self, now: Instant, acked: &RangeSet, newly_acked: &mut Vec<Acked>,
114 rtt_stats: &RttStats, trace_id: &str,
115 ) -> AckedDetectionResult {
116 newly_acked.clear();
117
118 let mut acked_bytes = 0;
119 let mut spurious_losses = 0;
120 let mut spurious_pkt_thresh = None;
121 let mut has_ack_eliciting = false;
122 let mut has_in_flight_spurious_loss = false;
123
124 let largest_acked = self.largest_acked_packet.unwrap();
125
126 for ack in acked.iter() {
127 let start = if self
130 .sent_packets
131 .front()
132 .filter(|e| e.pkt_num >= ack.start)
133 .is_some()
134 {
135 0
137 } else {
138 self.sent_packets
139 .binary_search_by_key(&ack.start, |p| p.pkt_num)
140 .unwrap_or_else(|e| e)
141 };
142
143 for unacked in self.sent_packets.range_mut(start..) {
144 if unacked.pkt_num >= ack.end {
145 break;
146 }
147
148 if unacked.time_acked.is_some() {
149 } else if unacked.time_lost.is_some() {
151 spurious_losses += 1;
153 spurious_pkt_thresh
154 .get_or_insert(largest_acked - unacked.pkt_num + 1);
155 unacked.time_acked = Some(now);
156
157 if unacked.in_flight {
158 has_in_flight_spurious_loss = true;
159 }
160 } else {
161 if unacked.in_flight {
162 self.in_flight_count -= 1;
163 acked_bytes += unacked.size;
164 }
165
166 newly_acked.push(Acked {
167 pkt_num: unacked.pkt_num,
168 time_sent: unacked.time_sent,
169 size: unacked.size,
170
171 rtt: now.saturating_duration_since(unacked.time_sent),
172 delivered: unacked.delivered,
173 delivered_time: unacked.delivered_time,
174 first_sent_time: unacked.first_sent_time,
175 is_app_limited: unacked.is_app_limited,
176 });
177
178 trace!("{} packet newly acked {}", trace_id, unacked.pkt_num);
179
180 self.acked_frames
181 .extend(std::mem::take(&mut unacked.frames));
182
183 has_ack_eliciting |= unacked.ack_eliciting;
184 unacked.time_acked = Some(now);
185 }
186 }
187 }
188
189 self.drain_acked_and_lost_packets(now - rtt_stats.rtt());
190
191 AckedDetectionResult {
192 acked_bytes,
193 spurious_losses,
194 spurious_pkt_thresh,
195 has_ack_eliciting,
196 has_in_flight_spurious_loss,
197 }
198 }
199
200 fn detect_lost_packets(
201 &mut self, loss_delay: Duration, pkt_thresh: u64, now: Instant,
202 trace_id: &str, epoch: Epoch,
203 ) -> LossDetectionResult {
204 self.loss_time = None;
205
206 let loss_delay = cmp::max(loss_delay, GRANULARITY);
208 let largest_acked = self.largest_acked_packet.unwrap_or(0);
209
210 let lost_send_time = now.checked_sub(loss_delay).unwrap();
212
213 let mut lost_packets = 0;
214 let mut lost_bytes = 0;
215 let mut pmtud_lost_bytes = 0;
216
217 let mut largest_lost_pkt = None;
218
219 let unacked_iter = self.sent_packets
220 .iter_mut()
221 .take_while(|p| p.pkt_num <= largest_acked)
223 .filter(|p| p.time_acked.is_none() && p.time_lost.is_none());
225
226 for unacked in unacked_iter {
227 if unacked.time_sent <= lost_send_time ||
229 largest_acked >= unacked.pkt_num + pkt_thresh
230 {
231 self.lost_frames.extend(unacked.frames.drain(..));
232
233 unacked.time_lost = Some(now);
234
235 if unacked.pmtud {
236 pmtud_lost_bytes += unacked.size;
237 self.in_flight_count -= 1;
238
239 continue;
241 }
242
243 if unacked.in_flight {
244 lost_bytes += unacked.size;
245
246 largest_lost_pkt = Some(unacked.clone());
249
250 self.in_flight_count -= 1;
251
252 trace!(
253 "{} packet {} lost on epoch {}",
254 trace_id,
255 unacked.pkt_num,
256 epoch
257 );
258 }
259
260 lost_packets += 1;
261 } else {
262 let loss_time = match self.loss_time {
263 None => unacked.time_sent + loss_delay,
264
265 Some(loss_time) =>
266 cmp::min(loss_time, unacked.time_sent + loss_delay),
267 };
268
269 self.loss_time = Some(loss_time);
270 break;
271 }
272 }
273
274 LossDetectionResult {
275 largest_lost_pkt,
276 lost_packets,
277 lost_bytes,
278 pmtud_lost_bytes,
279 }
280 }
281
282 fn drain_acked_and_lost_packets(&mut self, loss_thresh: Instant) {
283 while let Some(pkt) = self.sent_packets.front() {
292 if let Some(time_lost) = pkt.time_lost {
293 if time_lost > loss_thresh {
294 break;
295 }
296 }
297
298 if pkt.time_acked.is_none() && pkt.time_lost.is_none() {
299 break;
300 }
301
302 self.sent_packets.pop_front();
303 }
304 }
305}
306
/// Deadline of the loss detection timer, or `None` while it is disarmed.
#[derive(Default)]
struct LossDetectionTimer {
    time: Option<Instant>,
}

impl LossDetectionTimer {
    /// Arms the timer so that it expires at `timeout`.
    fn update(&mut self, timeout: Instant) {
        self.time = timeout.into();
    }

    /// Disarms the timer.
    fn clear(&mut self) {
        self.time = None;
    }
}
321pub struct Recovery {
322 epochs: [RecoveryEpoch; packet::Epoch::count()],
323
324 loss_timer: LossDetectionTimer,
325
326 pto_count: u32,
327
328 rtt_stats: RttStats,
329
330 pub lost_spurious_count: usize,
331
332 pkt_thresh: u64,
333
334 time_thresh: f64,
335
336 bytes_in_flight: usize,
337
338 bytes_sent: usize,
339
340 pub bytes_lost: u64,
341
342 max_datagram_size: usize,
343
344 #[cfg(feature = "qlog")]
345 qlog_metrics: QlogMetrics,
346
347 outstanding_non_ack_eliciting: usize,
349
350 congestion: Congestion,
351
352 newly_acked: Vec<Acked>,
354}
355
356#[derive(Clone, Copy, Eq, PartialEq)]
357pub struct RecoveryConfig {
358 pub max_send_udp_payload_size: usize,
359 pub max_ack_delay: Duration,
360 pub cc_algorithm: CongestionControlAlgorithm,
361 pub hystart: bool,
362 pub pacing: bool,
363 pub max_pacing_rate: Option<u64>,
364 pub initial_congestion_window_packets: usize,
365}
366
367impl RecoveryConfig {
368 pub fn from_config(config: &Config) -> Self {
369 Self {
370 max_send_udp_payload_size: config.max_send_udp_payload_size,
371 max_ack_delay: Duration::ZERO,
372 cc_algorithm: config.cc_algorithm,
373 hystart: config.hystart,
374 pacing: config.pacing,
375 max_pacing_rate: config.max_pacing_rate,
376 initial_congestion_window_packets: config
377 .initial_congestion_window_packets,
378 }
379 }
380}
381
382impl Recovery {
383 pub fn new_with_config(recovery_config: &RecoveryConfig) -> Self {
384 Recovery {
385 epochs: Default::default(),
386
387 loss_timer: Default::default(),
388
389 pto_count: 0,
390
391 rtt_stats: RttStats::new(recovery_config.max_ack_delay),
392
393 lost_spurious_count: 0,
394
395 pkt_thresh: INITIAL_PACKET_THRESHOLD,
396
397 time_thresh: INITIAL_TIME_THRESHOLD,
398
399 bytes_in_flight: 0,
400
401 bytes_sent: 0,
402
403 bytes_lost: 0,
404
405 max_datagram_size: recovery_config.max_send_udp_payload_size,
406
407 #[cfg(feature = "qlog")]
408 qlog_metrics: QlogMetrics::default(),
409
410 outstanding_non_ack_eliciting: 0,
411
412 congestion: Congestion::from_config(recovery_config),
413
414 newly_acked: Vec::new(),
415 }
416 }
417
418 #[cfg(test)]
419 pub fn new(config: &Config) -> Self {
420 Self::new_with_config(&RecoveryConfig::from_config(config))
421 }
422
423 pub fn should_elicit_ack(&self, epoch: packet::Epoch) -> bool {
426 self.epochs[epoch].loss_probes > 0 ||
427 self.outstanding_non_ack_eliciting >=
428 MAX_OUTSTANDING_NON_ACK_ELICITING
429 }
430
431 pub fn get_acked_frames(
432 &mut self, epoch: packet::Epoch,
433 ) -> impl Iterator<Item = frame::Frame> + '_ {
434 self.epochs[epoch].acked_frames.drain(..)
435 }
436
437 pub fn get_lost_frames(
438 &mut self, epoch: packet::Epoch,
439 ) -> impl Iterator<Item = frame::Frame> + '_ {
440 self.epochs[epoch].lost_frames.drain(..)
441 }
442
443 pub fn get_largest_acked_on_epoch(
444 &self, epoch: packet::Epoch,
445 ) -> Option<u64> {
446 self.epochs[epoch].largest_acked_packet
447 }
448
449 pub fn has_lost_frames(&self, epoch: packet::Epoch) -> bool {
450 !self.epochs[epoch].lost_frames.is_empty()
451 }
452
453 pub fn loss_probes(&self, epoch: packet::Epoch) -> usize {
454 self.epochs[epoch].loss_probes
455 }
456
457 #[cfg(test)]
458 pub fn inc_loss_probes(&mut self, epoch: packet::Epoch) {
459 self.epochs[epoch].loss_probes += 1;
460 }
461
462 pub fn ping_sent(&mut self, epoch: packet::Epoch) {
463 self.epochs[epoch].loss_probes =
464 self.epochs[epoch].loss_probes.saturating_sub(1);
465 }
466
467 pub fn on_packet_sent(
468 &mut self, mut pkt: Sent, epoch: packet::Epoch,
469 handshake_status: HandshakeStatus, now: Instant, trace_id: &str,
470 ) {
471 let ack_eliciting = pkt.ack_eliciting;
472 let in_flight = pkt.in_flight;
473 let sent_bytes = pkt.size;
474
475 if ack_eliciting {
476 self.outstanding_non_ack_eliciting = 0;
477 } else {
478 self.outstanding_non_ack_eliciting += 1;
479 }
480
481 if in_flight && ack_eliciting {
482 self.epochs[epoch].time_of_last_ack_eliciting_packet = Some(now);
483 }
484
485 self.congestion.on_packet_sent(
486 self.bytes_in_flight,
487 sent_bytes,
488 now,
489 &mut pkt,
490 &self.rtt_stats,
491 self.bytes_lost,
492 in_flight,
493 );
494
495 if in_flight {
496 self.epochs[epoch].in_flight_count += 1;
497 self.bytes_in_flight += sent_bytes;
498
499 self.set_loss_detection_timer(handshake_status, now);
500 }
501
502 self.bytes_sent += sent_bytes;
503
504 self.epochs[epoch].sent_packets.push_back(pkt);
505
506 trace!("{} {:?}", trace_id, self);
507 }
508
509 pub fn get_packet_send_time(&self) -> Instant {
510 self.congestion.get_packet_send_time()
511 }
512
513 #[allow(clippy::too_many_arguments)]
514 pub fn on_ack_received(
515 &mut self, ranges: &ranges::RangeSet, ack_delay: u64,
516 epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant,
517 trace_id: &str,
518 ) -> Result<(usize, usize, usize)> {
519 let largest_acked = ranges.last().unwrap();
520
521 let largest_acked = self.epochs[epoch]
523 .largest_acked_packet
524 .unwrap_or(0)
525 .max(largest_acked);
526
527 self.epochs[epoch].largest_acked_packet = Some(largest_acked);
528
529 let AckedDetectionResult {
530 acked_bytes,
531 spurious_losses,
532 spurious_pkt_thresh,
533 has_ack_eliciting,
534 has_in_flight_spurious_loss,
535 } = self.epochs[epoch].detect_and_remove_acked_packets(
536 now,
537 ranges,
538 &mut self.newly_acked,
539 &self.rtt_stats,
540 trace_id,
541 );
542
543 self.lost_spurious_count += spurious_losses;
544 if let Some(thresh) = spurious_pkt_thresh {
545 self.pkt_thresh =
546 self.pkt_thresh.max(thresh.min(MAX_PACKET_THRESHOLD));
547 }
548
549 if has_in_flight_spurious_loss {
551 (self.congestion.cc_ops.rollback)(&mut self.congestion);
552 }
553
554 if self.newly_acked.is_empty() {
555 return Ok((0, 0, 0));
556 }
557
558 let largest_newly_acked = self.newly_acked.last().unwrap();
560
561 if largest_newly_acked.pkt_num == largest_acked && has_ack_eliciting {
562 let latest_rtt = now - largest_newly_acked.time_sent;
563 self.rtt_stats.update_rtt(
564 latest_rtt,
565 Duration::from_micros(ack_delay),
566 now,
567 handshake_status.completed,
568 );
569 }
570
571 let loss = self.detect_lost_packets(epoch, now, trace_id);
574
575 self.congestion.on_packets_acked(
576 self.bytes_in_flight,
577 &mut self.newly_acked,
578 &self.rtt_stats,
579 now,
580 );
581
582 self.bytes_in_flight -= acked_bytes;
583
584 self.pto_count = 0;
585
586 self.set_loss_detection_timer(handshake_status, now);
587
588 self.epochs[epoch]
589 .drain_acked_and_lost_packets(now - self.rtt_stats.rtt());
590
591 Ok((loss.0, loss.1, acked_bytes))
592 }
593
594 pub fn on_loss_detection_timeout(
595 &mut self, handshake_status: HandshakeStatus, now: Instant,
596 trace_id: &str,
597 ) -> (usize, usize) {
598 let (earliest_loss_time, epoch) = self.loss_time_and_space();
599
600 if earliest_loss_time.is_some() {
601 let loss = self.detect_lost_packets(epoch, now, trace_id);
603
604 self.set_loss_detection_timer(handshake_status, now);
605
606 trace!("{} {:?}", trace_id, self);
607 return loss;
608 }
609
610 let epoch = if self.bytes_in_flight > 0 {
611 let (_, e) = self.pto_time_and_space(handshake_status, now);
614
615 e
616 } else {
617 if handshake_status.has_handshake_keys {
621 packet::Epoch::Handshake
622 } else {
623 packet::Epoch::Initial
624 }
625 };
626
627 self.pto_count += 1;
628
629 let epoch = &mut self.epochs[epoch];
630
631 epoch.loss_probes =
632 cmp::min(self.pto_count as usize, MAX_PTO_PROBES_COUNT);
633
634 let unacked_iter = epoch.sent_packets
635 .iter_mut()
636 .filter(|p| p.has_data && p.time_acked.is_none() && p.time_lost.is_none())
639 .take(epoch.loss_probes);
642
643 for unacked in unacked_iter {
651 epoch.lost_frames.extend_from_slice(&unacked.frames);
652 }
653
654 self.set_loss_detection_timer(handshake_status, now);
655
656 trace!("{} {:?}", trace_id, self);
657
658 (0, 0)
659 }
660
661 pub fn on_pkt_num_space_discarded(
662 &mut self, epoch: packet::Epoch, handshake_status: HandshakeStatus,
663 now: Instant,
664 ) {
665 let epoch = &mut self.epochs[epoch];
666
667 let unacked_bytes = epoch
668 .sent_packets
669 .iter()
670 .filter(|p| {
671 p.in_flight && p.time_acked.is_none() && p.time_lost.is_none()
672 })
673 .fold(0, |acc, p| acc + p.size);
674
675 self.bytes_in_flight -= unacked_bytes;
676
677 epoch.sent_packets.clear();
678 epoch.lost_frames.clear();
679 epoch.acked_frames.clear();
680
681 epoch.time_of_last_ack_eliciting_packet = None;
682 epoch.loss_time = None;
683 epoch.loss_probes = 0;
684 epoch.in_flight_count = 0;
685
686 self.set_loss_detection_timer(handshake_status, now);
687 }
688
689 pub fn on_path_change(
690 &mut self, epoch: packet::Epoch, now: Instant, trace_id: &str,
691 ) -> (usize, usize) {
692 self.detect_lost_packets(epoch, now, trace_id)
694 }
695
696 pub fn loss_detection_timer(&self) -> Option<Instant> {
697 self.loss_timer.time
698 }
699
700 pub fn cwnd(&self) -> usize {
701 self.congestion.congestion_window()
702 }
703
704 pub fn cwnd_available(&self) -> usize {
705 if self.epochs.iter().any(|e| e.loss_probes > 0) {
707 return usize::MAX;
708 }
709
710 self.cwnd().saturating_sub(self.bytes_in_flight) +
712 self.congestion.prr.snd_cnt
713 }
714
715 pub fn rtt(&self) -> Duration {
716 self.rtt_stats.rtt()
717 }
718
719 pub fn min_rtt(&self) -> Option<Duration> {
720 self.rtt_stats.min_rtt()
721 }
722
723 pub fn rttvar(&self) -> Duration {
724 self.rtt_stats.rttvar
725 }
726
727 pub fn pto(&self) -> Duration {
728 self.rtt() + cmp::max(self.rtt_stats.rttvar * 4, GRANULARITY)
729 }
730
731 pub fn delivery_rate(&self) -> u64 {
732 self.congestion.delivery_rate()
733 }
734
735 pub fn max_datagram_size(&self) -> usize {
736 self.max_datagram_size
737 }
738
739 pub fn pmtud_update_max_datagram_size(
740 &mut self, new_max_datagram_size: usize,
741 ) {
742 if self.cwnd() ==
745 self.max_datagram_size *
746 self.congestion.initial_congestion_window_packets
747 {
748 self.congestion.congestion_window = new_max_datagram_size *
749 self.congestion.initial_congestion_window_packets;
750 }
751
752 self.congestion.pacer = pacer::Pacer::new(
753 self.congestion.pacer.enabled(),
754 self.cwnd(),
755 0,
756 new_max_datagram_size,
757 self.congestion.pacer.max_pacing_rate(),
758 );
759
760 self.max_datagram_size = new_max_datagram_size;
761 }
762
763 pub fn update_max_datagram_size(&mut self, new_max_datagram_size: usize) {
764 self.pmtud_update_max_datagram_size(
765 self.max_datagram_size.min(new_max_datagram_size),
766 )
767 }
768
769 fn loss_time_and_space(&self) -> (Option<Instant>, packet::Epoch) {
770 let mut epoch = packet::Epoch::Initial;
771 let mut time = self.epochs[epoch].loss_time;
772
773 for e in [packet::Epoch::Handshake, packet::Epoch::Application] {
775 let new_time = self.epochs[e].loss_time;
776 if time.is_none() || new_time < time {
777 time = new_time;
778 epoch = e;
779 }
780 }
781
782 (time, epoch)
783 }
784
785 fn pto_time_and_space(
786 &self, handshake_status: HandshakeStatus, now: Instant,
787 ) -> (Option<Instant>, packet::Epoch) {
788 let mut duration = self.pto() * 2_u32.pow(self.pto_count);
789
790 if self.bytes_in_flight == 0 {
792 if handshake_status.has_handshake_keys {
793 return (Some(now + duration), packet::Epoch::Handshake);
794 } else {
795 return (Some(now + duration), packet::Epoch::Initial);
796 }
797 }
798
799 let mut pto_timeout = None;
800 let mut pto_space = packet::Epoch::Initial;
801
802 for e in [
804 packet::Epoch::Initial,
805 packet::Epoch::Handshake,
806 packet::Epoch::Application,
807 ] {
808 let epoch = &self.epochs[e];
809 if epoch.in_flight_count == 0 {
810 continue;
811 }
812
813 if e == packet::Epoch::Application {
814 if !handshake_status.completed {
816 return (pto_timeout, pto_space);
817 }
818
819 duration +=
821 self.rtt_stats.max_ack_delay * 2_u32.pow(self.pto_count);
822 }
823
824 let new_time = epoch
825 .time_of_last_ack_eliciting_packet
826 .map(|t| t + duration);
827
828 if pto_timeout.is_none() || new_time < pto_timeout {
829 pto_timeout = new_time;
830 pto_space = e;
831 }
832 }
833
834 (pto_timeout, pto_space)
835 }
836
837 fn set_loss_detection_timer(
838 &mut self, handshake_status: HandshakeStatus, now: Instant,
839 ) {
840 let (earliest_loss_time, _) = self.loss_time_and_space();
841
842 if let Some(to) = earliest_loss_time {
843 self.loss_timer.update(to);
845 return;
846 }
847
848 if self.bytes_in_flight == 0 && handshake_status.peer_verified_address {
849 self.loss_timer.clear();
850 return;
851 }
852
853 if let (Some(timeout), _) = self.pto_time_and_space(handshake_status, now)
855 {
856 self.loss_timer.update(timeout);
857 }
858 }
859
860 fn detect_lost_packets(
861 &mut self, epoch: packet::Epoch, now: Instant, trace_id: &str,
862 ) -> (usize, usize) {
863 let loss_delay = cmp::max(self.rtt_stats.latest_rtt, self.rtt())
864 .mul_f64(self.time_thresh);
865
866 let loss = self.epochs[epoch].detect_lost_packets(
867 loss_delay,
868 self.pkt_thresh,
869 now,
870 trace_id,
871 epoch,
872 );
873
874 if let Some(pkt) = loss.largest_lost_pkt {
875 if !self.congestion.in_congestion_recovery(pkt.time_sent) {
876 (self.congestion.cc_ops.checkpoint)(&mut self.congestion);
877 }
878
879 (self.congestion.cc_ops.congestion_event)(
880 &mut self.congestion,
881 self.bytes_in_flight,
882 loss.lost_bytes,
883 &pkt,
884 now,
885 );
886
887 self.bytes_in_flight -= loss.lost_bytes;
888 };
889
890 self.bytes_in_flight -= loss.pmtud_lost_bytes;
891
892 self.epochs[epoch]
893 .drain_acked_and_lost_packets(now - self.rtt_stats.rtt());
894
895 self.congestion.lost_count += loss.lost_packets;
896
897 (loss.lost_packets, loss.lost_bytes)
898 }
899
900 pub fn update_app_limited(&mut self, v: bool) {
901 self.congestion.app_limited = v;
902 }
903
904 #[cfg(test)]
905 pub fn app_limited(&self) -> bool {
906 self.congestion.app_limited
907 }
908
909 pub fn delivery_rate_update_app_limited(&mut self, v: bool) {
910 self.congestion.delivery_rate.update_app_limited(v);
911 }
912
913 pub fn update_max_ack_delay(&mut self, max_ack_delay: Duration) {
914 self.rtt_stats.max_ack_delay = max_ack_delay;
915 }
916
917 #[cfg(feature = "qlog")]
918 pub fn maybe_qlog(&mut self) -> Option<EventData> {
919 let qlog_metrics = QlogMetrics {
920 min_rtt: *self.rtt_stats.min_rtt,
921 smoothed_rtt: self.rtt(),
922 latest_rtt: self.rtt_stats.latest_rtt,
923 rttvar: self.rtt_stats.rttvar,
924 cwnd: self.cwnd() as u64,
925 bytes_in_flight: self.bytes_in_flight as u64,
926 ssthresh: self.congestion.ssthresh as u64,
927 pacing_rate: self.congestion.pacer.rate(),
928 };
929
930 self.qlog_metrics.maybe_update(qlog_metrics)
931 }
932
933 pub fn send_quantum(&self) -> usize {
934 self.congestion.send_quantum()
935 }
936
937 pub fn lost_count(&self) -> usize {
938 self.congestion.lost_count
939 }
940}
941
942impl std::fmt::Debug for Recovery {
943 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
944 match self.loss_timer.time {
945 Some(v) => {
946 let now = Instant::now();
947
948 if v > now {
949 let d = v.duration_since(now);
950 write!(f, "timer={d:?} ")?;
951 } else {
952 write!(f, "timer=exp ")?;
953 }
954 },
955
956 None => {
957 write!(f, "timer=none ")?;
958 },
959 };
960
961 write!(f, "latest_rtt={:?} ", self.rtt_stats.latest_rtt)?;
962 write!(f, "srtt={:?} ", self.rtt_stats.smoothed_rtt)?;
963 write!(f, "min_rtt={:?} ", *self.rtt_stats.min_rtt)?;
964 write!(f, "rttvar={:?} ", self.rtt_stats.rttvar)?;
965 write!(f, "cwnd={} ", self.cwnd())?;
966 write!(f, "ssthresh={} ", self.congestion.ssthresh)?;
967 write!(f, "bytes_in_flight={} ", self.bytes_in_flight)?;
968 write!(f, "app_limited={} ", self.congestion.app_limited)?;
969 write!(
970 f,
971 "congestion_recovery_start_time={:?} ",
972 self.congestion.congestion_recovery_start_time
973 )?;
974 write!(f, "{:?} ", self.congestion.delivery_rate)?;
975 write!(f, "pacer={:?} ", self.congestion.pacer)?;
976
977 if self.congestion.hystart.enabled() {
978 write!(f, "hystart={:?} ", self.congestion.hystart)?;
979 }
980
981 (self.congestion.cc_ops.debug_fmt)(&self.congestion, f)?;
983
984 Ok(())
985 }
986}
987
988#[derive(Clone)]
989pub struct Sent {
990 pub pkt_num: u64,
991
992 pub frames: SmallVec<[frame::Frame; 1]>,
993
994 pub time_sent: Instant,
995
996 pub time_acked: Option<Instant>,
997
998 pub time_lost: Option<Instant>,
999
1000 pub size: usize,
1001
1002 pub ack_eliciting: bool,
1003
1004 pub in_flight: bool,
1005
1006 pub delivered: usize,
1007
1008 pub delivered_time: Instant,
1009
1010 pub first_sent_time: Instant,
1011
1012 pub is_app_limited: bool,
1013
1014 pub tx_in_flight: usize,
1015
1016 pub lost: u64,
1017
1018 pub has_data: bool,
1019
1020 pub pmtud: bool,
1021}
1022
1023impl std::fmt::Debug for Sent {
1024 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1025 write!(f, "pkt_num={:?} ", self.pkt_num)?;
1026 write!(f, "pkt_sent_time={:?} ", self.time_sent)?;
1027 write!(f, "pkt_size={:?} ", self.size)?;
1028 write!(f, "delivered={:?} ", self.delivered)?;
1029 write!(f, "delivered_time={:?} ", self.delivered_time)?;
1030 write!(f, "first_sent_time={:?} ", self.first_sent_time)?;
1031 write!(f, "is_app_limited={} ", self.is_app_limited)?;
1032 write!(f, "tx_in_flight={} ", self.tx_in_flight)?;
1033 write!(f, "lost={} ", self.lost)?;
1034 write!(f, "has_data={} ", self.has_data)?;
1035 write!(f, "pmtud={}", self.pmtud)?;
1036
1037 Ok(())
1038 }
1039}
1040
/// A newly acknowledged packet, as reported to the congestion controller.
#[derive(Clone)]
pub struct Acked {
    /// Packet number.
    pub pkt_num: u64,

    /// When the packet was sent.
    pub time_sent: Instant,

    /// Size of the packet.
    pub size: usize,

    /// Time elapsed between sending the packet and processing its ACK.
    pub rtt: Duration,

    // The fields below are copied verbatim from the `Sent` entry of the
    // packet.
    pub delivered: usize,

    pub delivered_time: Instant,

    pub first_sent_time: Instant,

    pub is_app_limited: bool,
}
1059
/// Handshake progress, as needed by the loss detection timer logic.
#[derive(Clone, Copy, Debug)]
pub struct HandshakeStatus {
    /// Whether handshake keys are available.
    pub has_handshake_keys: bool,

    /// Whether the peer has verified our address.
    pub peer_verified_address: bool,

    /// Whether the handshake is complete.
    pub completed: bool,
}

#[cfg(test)]
impl Default for HandshakeStatus {
    /// Test-only default: a completed handshake with every flag set.
    fn default() -> Self {
        Self {
            has_handshake_keys: true,
            peer_verified_address: true,
            completed: true,
        }
    }
}
1081
/// Metrics most recently reported through qlog. They are kept so that only
/// the values that changed are reported the next time.
#[cfg(feature = "qlog")]
#[derive(Default)]
struct QlogMetrics {
    min_rtt: Duration,
    smoothed_rtt: Duration,
    latest_rtt: Duration,
    rttvar: Duration,
    cwnd: u64,
    bytes_in_flight: u64,
    ssthresh: u64,
    pacing_rate: u64,
}
1098
#[cfg(feature = "qlog")]
impl QlogMetrics {
    /// Stores the metrics in `latest` and reports the ones that changed.
    ///
    /// Returns `None` when nothing changed. Otherwise the returned event
    /// carries only the changed metrics, with RTT values in milliseconds.
    fn maybe_update(&mut self, latest: Self) -> Option<EventData> {
        /// Overwrites `stored` with `fresh` and returns the new value, but
        /// only when the two differ.
        fn take_change<T: Copy + PartialEq>(
            stored: &mut T, fresh: T,
        ) -> Option<T> {
            if *stored == fresh {
                return None;
            }

            *stored = fresh;
            Some(fresh)
        }

        /// Converts a duration to fractional milliseconds.
        fn to_millis(d: Duration) -> f32 {
            d.as_secs_f32() * 1000.0
        }

        let min_rtt =
            take_change(&mut self.min_rtt, latest.min_rtt).map(to_millis);
        let smoothed_rtt =
            take_change(&mut self.smoothed_rtt, latest.smoothed_rtt)
                .map(to_millis);
        let latest_rtt = take_change(&mut self.latest_rtt, latest.latest_rtt)
            .map(to_millis);
        let rtt_variance =
            take_change(&mut self.rttvar, latest.rttvar).map(to_millis);

        let congestion_window = take_change(&mut self.cwnd, latest.cwnd);
        let bytes_in_flight =
            take_change(&mut self.bytes_in_flight, latest.bytes_in_flight);
        let ssthresh = take_change(&mut self.ssthresh, latest.ssthresh);
        let pacing_rate =
            take_change(&mut self.pacing_rate, latest.pacing_rate);

        let anything_changed = min_rtt.is_some() ||
            smoothed_rtt.is_some() ||
            latest_rtt.is_some() ||
            rtt_variance.is_some() ||
            congestion_window.is_some() ||
            bytes_in_flight.is_some() ||
            ssthresh.is_some() ||
            pacing_rate.is_some();

        if !anything_changed {
            return None;
        }

        Some(EventData::MetricsUpdated(
            qlog::events::quic::MetricsUpdated {
                min_rtt,
                smoothed_rtt,
                latest_rtt,
                rtt_variance,
                congestion_window,
                bytes_in_flight,
                ssthresh,
                pacing_rate,
                ..Default::default()
            },
        ))
    }
}
1194
1195#[cfg(test)]
1196mod tests {
1197 use super::*;
1198 use smallvec::smallvec;
1199 use std::str::FromStr;
1200
1201 #[test]
1202 fn lookup_cc_algo_ok() {
1203 let algo = CongestionControlAlgorithm::from_str("reno").unwrap();
1204 assert_eq!(algo, CongestionControlAlgorithm::Reno);
1205 }
1206
1207 #[test]
1208 fn lookup_cc_algo_bad() {
1209 assert_eq!(
1210 CongestionControlAlgorithm::from_str("???"),
1211 Err(crate::Error::CongestionControl)
1212 );
1213 }
1214
1215 #[test]
1216 fn loss_on_pto() {
1217 let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
1218 cfg.set_cc_algorithm(CongestionControlAlgorithm::Reno);
1219
1220 let mut r = Recovery::new(&cfg);
1221
1222 let mut now = Instant::now();
1223
1224 assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 0);
1225
1226 let p = Sent {
1228 pkt_num: 0,
1229 frames: smallvec![],
1230 time_sent: now,
1231 time_acked: None,
1232 time_lost: None,
1233 size: 1000,
1234 ack_eliciting: true,
1235 in_flight: true,
1236 delivered: 0,
1237 delivered_time: now,
1238 first_sent_time: now,
1239 is_app_limited: false,
1240 tx_in_flight: 0,
1241 lost: 0,
1242 has_data: false,
1243 pmtud: false,
1244 };
1245
1246 r.on_packet_sent(
1247 p,
1248 packet::Epoch::Application,
1249 HandshakeStatus::default(),
1250 now,
1251 "",
1252 );
1253
1254 assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 1);
1255 assert_eq!(r.bytes_in_flight, 1000);
1256
1257 let p = Sent {
1258 pkt_num: 1,
1259 frames: smallvec![],
1260 time_sent: now,
1261 time_acked: None,
1262 time_lost: None,
1263 size: 1000,
1264 ack_eliciting: true,
1265 in_flight: true,
1266 delivered: 0,
1267 delivered_time: now,
1268 first_sent_time: now,
1269 is_app_limited: false,
1270 tx_in_flight: 0,
1271 lost: 0,
1272 has_data: false,
1273 pmtud: false,
1274 };
1275
1276 r.on_packet_sent(
1277 p,
1278 packet::Epoch::Application,
1279 HandshakeStatus::default(),
1280 now,
1281 "",
1282 );
1283
1284 assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 2);
1285 assert_eq!(r.bytes_in_flight, 2000);
1286
1287 let p = Sent {
1288 pkt_num: 2,
1289 frames: smallvec![],
1290 time_sent: now,
1291 time_acked: None,
1292 time_lost: None,
1293 size: 1000,
1294 ack_eliciting: true,
1295 in_flight: true,
1296 delivered: 0,
1297 delivered_time: now,
1298 first_sent_time: now,
1299 is_app_limited: false,
1300 tx_in_flight: 0,
1301 lost: 0,
1302 has_data: false,
1303 pmtud: false,
1304 };
1305
1306 r.on_packet_sent(
1307 p,
1308 packet::Epoch::Application,
1309 HandshakeStatus::default(),
1310 now,
1311 "",
1312 );
1313 assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 3);
1314 assert_eq!(r.bytes_in_flight, 3000);
1315
1316 let p = Sent {
1317 pkt_num: 3,
1318 frames: smallvec![],
1319 time_sent: now,
1320 time_acked: None,
1321 time_lost: None,
1322 size: 1000,
1323 ack_eliciting: true,
1324 in_flight: true,
1325 delivered: 0,
1326 delivered_time: now,
1327 first_sent_time: now,
1328 is_app_limited: false,
1329 tx_in_flight: 0,
1330 lost: 0,
1331 has_data: false,
1332 pmtud: false,
1333 };
1334
1335 r.on_packet_sent(
1336 p,
1337 packet::Epoch::Application,
1338 HandshakeStatus::default(),
1339 now,
1340 "",
1341 );
1342 assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 4);
1343 assert_eq!(r.bytes_in_flight, 4000);
1344
1345 now += Duration::from_millis(10);
1347
1348 let mut acked = ranges::RangeSet::default();
1350 acked.insert(0..2);
1351
1352 assert_eq!(
1353 r.on_ack_received(
1354 &acked,
1355 25,
1356 packet::Epoch::Application,
1357 HandshakeStatus::default(),
1358 now,
1359 "",
1360 ),
1361 Ok((0, 0, 2 * 1000))
1362 );
1363
1364 assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 2);
1365 assert_eq!(r.bytes_in_flight, 2000);
1366 assert_eq!(r.congestion.lost_count, 0);
1367
1368 now = r.loss_detection_timer().unwrap();
1370
1371 r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
1373 assert_eq!(r.epochs[packet::Epoch::Application].loss_probes, 1);
1374 assert_eq!(r.congestion.lost_count, 0);
1375 assert_eq!(r.pto_count, 1);
1376
1377 let p = Sent {
1378 pkt_num: 4,
1379 frames: smallvec![],
1380 time_sent: now,
1381 time_acked: None,
1382 time_lost: None,
1383 size: 1000,
1384 ack_eliciting: true,
1385 in_flight: true,
1386 delivered: 0,
1387 delivered_time: now,
1388 first_sent_time: now,
1389 is_app_limited: false,
1390 tx_in_flight: 0,
1391 lost: 0,
1392 has_data: false,
1393 pmtud: false,
1394 };
1395
1396 r.on_packet_sent(
1397 p,
1398 packet::Epoch::Application,
1399 HandshakeStatus::default(),
1400 now,
1401 "",
1402 );
1403 assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 3);
1404 assert_eq!(r.bytes_in_flight, 3000);
1405
1406 let p = Sent {
1407 pkt_num: 5,
1408 frames: smallvec![],
1409 time_sent: now,
1410 time_acked: None,
1411 time_lost: None,
1412 size: 1000,
1413 ack_eliciting: true,
1414 in_flight: true,
1415 delivered: 0,
1416 delivered_time: now,
1417 first_sent_time: now,
1418 is_app_limited: false,
1419 tx_in_flight: 0,
1420 lost: 0,
1421 has_data: false,
1422 pmtud: false,
1423 };
1424
1425 r.on_packet_sent(
1426 p,
1427 packet::Epoch::Application,
1428 HandshakeStatus::default(),
1429 now,
1430 "",
1431 );
1432 assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 4);
1433 assert_eq!(r.bytes_in_flight, 4000);
1434 assert_eq!(r.congestion.lost_count, 0);
1435
1436 now += Duration::from_millis(10);
1438
1439 let mut acked = ranges::RangeSet::default();
1441 acked.insert(4..6);
1442
1443 assert_eq!(
1444 r.on_ack_received(
1445 &acked,
1446 25,
1447 packet::Epoch::Application,
1448 HandshakeStatus::default(),
1449 now,
1450 "",
1451 ),
1452 Ok((2, 2000, 2 * 1000))
1453 );
1454
1455 assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 4);
1456 assert_eq!(r.bytes_in_flight, 0);
1457
1458 assert_eq!(r.congestion.lost_count, 2);
1459
1460 now += r.rtt();
1462
1463 r.detect_lost_packets(packet::Epoch::Application, now, "");
1464
1465 assert_eq!(r.epochs[packet::Epoch::Application].sent_packets.len(), 0);
1466 }
1467
/// Checks that a packet left unacknowledged behind a later-acked packet is
/// counted as lost once the loss detection timer fires, without a PTO probe
/// being scheduled.
#[test]
fn loss_on_timer() {
    // Every packet in this test is sent in the application epoch.
    const APP: packet::Epoch = packet::Epoch::Application;

    let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    config.set_cc_algorithm(CongestionControlAlgorithm::Reno);

    let mut recovery = Recovery::new(&config);
    let mut now = Instant::now();

    assert_eq!(recovery.epochs[APP].sent_packets.len(), 0);

    // Send four 1000-byte ack-eliciting packets at the same instant. Each
    // row is (packet number, tracked packets after the send, bytes in
    // flight after the send).
    for (pkt_num, tracked, in_flight) in
        [(0, 1, 1000), (1, 2, 2000), (2, 3, 3000), (3, 4, 4000)]
    {
        let pkt = Sent {
            pkt_num,
            frames: smallvec![],
            time_sent: now,
            time_acked: None,
            time_lost: None,
            size: 1000,
            ack_eliciting: true,
            in_flight: true,
            delivered: 0,
            delivered_time: now,
            first_sent_time: now,
            is_app_limited: false,
            tx_in_flight: 0,
            lost: 0,
            has_data: false,
            pmtud: false,
        };

        recovery.on_packet_sent(pkt, APP, HandshakeStatus::default(), now, "");

        assert_eq!(recovery.epochs[APP].sent_packets.len(), tracked);
        assert_eq!(recovery.bytes_in_flight, in_flight);
    }

    // Let 10ms pass before the ACK is processed.
    now += Duration::from_millis(10);

    // The ACK covers packets 0, 1 and 3; packet 2 is left out.
    let mut ack_ranges = ranges::RangeSet::default();
    ack_ranges.insert(0..2);
    ack_ranges.insert(3..4);

    let ack_outcome = recovery.on_ack_received(
        &ack_ranges,
        25,
        APP,
        HandshakeStatus::default(),
        now,
        "",
    );
    assert_eq!(ack_outcome, Ok((0, 0, 3000)));

    // Nothing has been counted as lost yet, and one packet's worth of
    // bytes is still in flight.
    assert_eq!(recovery.epochs[APP].sent_packets.len(), 2);
    assert_eq!(recovery.bytes_in_flight, 1000);
    assert_eq!(recovery.congestion.lost_count, 0);

    // Jump straight to the moment the loss detection timer is due.
    now = recovery.loss_detection_timer().unwrap();

    // Firing the timer records the loss rather than arming a probe.
    recovery.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
    assert_eq!(recovery.epochs[APP].loss_probes, 0);

    assert_eq!(recovery.epochs[APP].sent_packets.len(), 2);
    assert_eq!(recovery.bytes_in_flight, 0);

    assert_eq!(recovery.congestion.lost_count, 1);

    // One RTT later, another detection pass leaves no tracked packets.
    now += recovery.rtt();

    recovery.detect_lost_packets(APP, now, "");

    assert_eq!(recovery.epochs[APP].sent_packets.len(), 0);
}
1639
/// Checks that when ACKs arrive out of order, a packet first counted as lost
/// and then acknowledged is recorded as a spurious loss, and that the packet
/// reordering threshold grows as a result.
#[test]
fn loss_on_reordering() {
    // Every packet in this test is sent in the application epoch.
    const APP: packet::Epoch = packet::Epoch::Application;

    let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    config.set_cc_algorithm(CongestionControlAlgorithm::Reno);

    let mut recovery = Recovery::new(&config);
    let mut now = Instant::now();

    assert_eq!(recovery.epochs[APP].sent_packets.len(), 0);

    // Send four 1000-byte ack-eliciting packets at the same instant. Each
    // row is (packet number, tracked packets after the send, bytes in
    // flight after the send).
    for (pkt_num, tracked, in_flight) in
        [(0, 1, 1000), (1, 2, 2000), (2, 3, 3000), (3, 4, 4000)]
    {
        let pkt = Sent {
            pkt_num,
            frames: smallvec![],
            time_sent: now,
            time_acked: None,
            time_lost: None,
            size: 1000,
            ack_eliciting: true,
            in_flight: true,
            delivered: 0,
            delivered_time: now,
            first_sent_time: now,
            is_app_limited: false,
            tx_in_flight: 0,
            lost: 0,
            has_data: false,
            pmtud: false,
        };

        recovery.on_packet_sent(pkt, APP, HandshakeStatus::default(), now, "");

        assert_eq!(recovery.epochs[APP].sent_packets.len(), tracked);
        assert_eq!(recovery.bytes_in_flight, in_flight);
    }

    // Let 10ms pass before the first ACK is processed.
    now += Duration::from_millis(10);

    // The ACK for the two most recent packets (2 and 3) arrives first.
    let mut late_pkts_ack = ranges::RangeSet::default();
    late_pkts_ack.insert(2..4);

    let first_outcome = recovery.on_ack_received(
        &late_pkts_ack,
        25,
        APP,
        HandshakeStatus::default(),
        now,
        "",
    );
    assert_eq!(first_outcome, Ok((1, 1000, 2000)));

    // Another 10ms later the ACK for packets 0 and 1 shows up.
    now += Duration::from_millis(10);

    let mut early_pkts_ack = ranges::RangeSet::default();
    early_pkts_ack.insert(0..2);

    // Up to this point the threshold still has its initial value.
    assert_eq!(recovery.pkt_thresh, INITIAL_PACKET_THRESHOLD);

    let second_outcome = recovery.on_ack_received(
        &early_pkts_ack,
        25,
        APP,
        HandshakeStatus::default(),
        now,
        "",
    );
    assert_eq!(second_outcome, Ok((0, 0, 1000)));

    assert_eq!(recovery.epochs[APP].sent_packets.len(), 0);
    assert_eq!(recovery.bytes_in_flight, 0);

    // The earlier loss is still counted, but is now flagged as spurious.
    assert_eq!(recovery.congestion.lost_count, 1);
    assert_eq!(recovery.lost_spurious_count, 1);

    // The packet threshold has been raised.
    assert_eq!(recovery.pkt_thresh, 4);

    // One RTT later, another detection pass leaves no tracked packets.
    now += recovery.rtt();

    recovery.detect_lost_packets(APP, now, "");

    assert_eq!(recovery.epochs[APP].sent_packets.len(), 0);
}
1823
/// Checks the pacer under CUBIC: the pacing rate is zero and packets are
/// released immediately at first, and after an ACK and further sends the
/// pacing rate and the next send time match the values computed from the
/// congestion window.
#[test]
fn pacing() {
    // Every packet in this test is sent in the application epoch.
    const APP: packet::Epoch = packet::Epoch::Application;

    let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    config.set_cc_algorithm(CongestionControlAlgorithm::CUBIC);

    let mut recovery = Recovery::new(&config);
    let mut now = Instant::now();

    assert_eq!(recovery.epochs[APP].sent_packets.len(), 0);

    // Builds an ack-eliciting, in-flight packet; only the packet number,
    // the size and the timestamps differ between the packets sent below.
    let new_pkt = |pkt_num, size, sent_at: Instant| Sent {
        pkt_num,
        frames: smallvec![],
        time_sent: sent_at,
        time_acked: None,
        time_lost: None,
        size,
        ack_eliciting: true,
        in_flight: true,
        delivered: 0,
        delivered_time: sent_at,
        first_sent_time: sent_at,
        is_app_limited: false,
        tx_in_flight: 0,
        lost: 0,
        has_data: false,
        pmtud: false,
    };

    // Packet 0: 12000 bytes.
    recovery.on_packet_sent(
        new_pkt(0, 12000, now),
        APP,
        HandshakeStatus::default(),
        now,
        "",
    );

    assert_eq!(recovery.epochs[APP].sent_packets.len(), 1);
    assert_eq!(recovery.bytes_in_flight, 12000);

    // No pacing rate has been set yet, so the send time is `now`.
    assert_eq!(recovery.congestion.pacer.rate(), 0);
    assert_eq!(recovery.get_packet_send_time(), now);

    // The ACK for packet 0 is processed 50ms later.
    now += Duration::from_millis(50);

    let mut ack_ranges = ranges::RangeSet::default();
    ack_ranges.insert(0..1);

    let ack_outcome = recovery.on_ack_received(
        &ack_ranges,
        10,
        APP,
        HandshakeStatus::default(),
        now,
        "",
    );
    assert_eq!(ack_outcome, Ok((0, 0, 12000)));

    assert_eq!(recovery.epochs[APP].sent_packets.len(), 0);
    assert_eq!(recovery.bytes_in_flight, 0);
    assert_eq!(recovery.rtt_stats.smoothed_rtt, Duration::from_millis(50));

    // The congestion window is now 1200 bytes above 12000.
    assert_eq!(recovery.cwnd(), 13200);

    // Packet 1: 6000 bytes.
    recovery.on_packet_sent(
        new_pkt(1, 6000, now),
        APP,
        HandshakeStatus::default(),
        now,
        "",
    );

    assert_eq!(recovery.epochs[APP].sent_packets.len(), 1);
    assert_eq!(recovery.bytes_in_flight, 6000);

    // The next packet may still be sent without delay.
    assert_eq!(recovery.get_packet_send_time(), now);

    // Packet 2: 6000 bytes.
    recovery.on_packet_sent(
        new_pkt(2, 6000, now),
        APP,
        HandshakeStatus::default(),
        now,
        "",
    );

    assert_eq!(recovery.epochs[APP].sent_packets.len(), 2);
    assert_eq!(recovery.bytes_in_flight, 12000);

    // Packet 3: 1000 bytes.
    recovery.on_packet_sent(
        new_pkt(3, 1000, now),
        APP,
        HandshakeStatus::default(),
        now,
        "",
    );

    assert_eq!(recovery.epochs[APP].sent_packets.len(), 3);
    assert_eq!(recovery.bytes_in_flight, 13000);

    // Expected pacing rate: window times the pacing multiplier, divided by
    // 0.05 (the 50ms smoothed RTT asserted above, expressed in seconds).
    let window = recovery.cwnd() as f64;
    let pacing_rate = (window * congestion::PACING_MULTIPLIER / 0.05) as u64;
    assert_eq!(recovery.congestion.pacer.rate(), pacing_rate);

    // The next send is delayed by the time 12000 bytes take at that rate.
    let pacing_delay = Duration::from_secs_f64(12000.0 / pacing_rate as f64);
    assert_eq!(recovery.get_packet_send_time(), now + pacing_delay);
}
2002
/// Checks that when the unacknowledged packet is one flagged `pmtud`, the
/// loss detection timer removes it from flight without incrementing the
/// congestion controller's lost count.
#[test]
fn pmtud_loss_on_timer() {
    // Every packet in this test is sent in the application epoch.
    const APP: packet::Epoch = packet::Epoch::Application;

    let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    config.set_cc_algorithm(CongestionControlAlgorithm::Reno);

    let mut recovery = Recovery::new(&config);
    let mut now = Instant::now();

    assert_eq!(recovery.epochs[APP].sent_packets.len(), 0);

    // Send three 1000-byte packets; only packet 1 carries the `pmtud`
    // flag. Each row is (packet number, pmtud flag, in-flight packet count
    // after the send).
    for (pkt_num, pmtud, in_flight_count) in
        [(0, false, 1), (1, true, 2), (2, false, 3)]
    {
        let pkt = Sent {
            pkt_num,
            frames: smallvec![],
            time_sent: now,
            time_acked: None,
            time_lost: None,
            size: 1000,
            ack_eliciting: true,
            in_flight: true,
            delivered: 0,
            delivered_time: now,
            first_sent_time: now,
            is_app_limited: false,
            tx_in_flight: 0,
            lost: 0,
            has_data: false,
            pmtud,
        };

        recovery.on_packet_sent(pkt, APP, HandshakeStatus::default(), now, "");

        assert_eq!(recovery.epochs[APP].in_flight_count, in_flight_count);

        // Queue length and byte count are only checked after packet 0.
        if pkt_num == 0 {
            assert_eq!(recovery.epochs[APP].sent_packets.len(), 1);
            assert_eq!(recovery.bytes_in_flight, 1000);
        }
    }

    // Let 10ms pass before the ACK is processed.
    now += Duration::from_millis(10);

    // The ACK covers packets 0 and 2; the `pmtud` packet 1 is left out.
    let mut ack_ranges = ranges::RangeSet::default();
    ack_ranges.insert(0..1);
    ack_ranges.insert(2..3);

    let ack_outcome = recovery.on_ack_received(
        &ack_ranges,
        25,
        APP,
        HandshakeStatus::default(),
        now,
        "",
    );
    assert_eq!(ack_outcome, Ok((0, 0, 2000)));

    assert_eq!(recovery.epochs[APP].sent_packets.len(), 2);
    assert_eq!(recovery.bytes_in_flight, 1000);
    assert_eq!(recovery.congestion.lost_count, 0);

    // Jump straight to the moment the loss detection timer is due.
    now = recovery.loss_detection_timer().unwrap();

    // Firing the timer does not arm a probe.
    recovery.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
    assert_eq!(recovery.epochs[APP].loss_probes, 0);

    // Nothing is in flight any more and the window is 12000.
    assert_eq!(recovery.epochs[APP].sent_packets.len(), 2);
    assert_eq!(recovery.epochs[APP].in_flight_count, 0);
    assert_eq!(recovery.bytes_in_flight, 0);
    assert_eq!(recovery.cwnd(), 12000);

    // The lost count is still zero.
    assert_eq!(recovery.congestion.lost_count, 0);

    // One RTT later, another detection pass leaves no tracked packets.
    now += recovery.rtt();

    recovery.detect_lost_packets(APP, now, "");

    assert_eq!(recovery.epochs[APP].sent_packets.len(), 0);
    assert_eq!(recovery.epochs[APP].in_flight_count, 0);
    assert_eq!(recovery.bytes_in_flight, 0);
    assert_eq!(recovery.congestion.lost_count, 0);
}
2152}
2153
2154pub mod congestion;
2155mod rtt;