use std::cmp;
use std::str::FromStr;
use std::time::Duration;
use std::time::Instant;
use std::collections::VecDeque;
use crate::Config;
use crate::Result;
use crate::frame;
use crate::minmax;
use crate::packet;
use crate::ranges;
#[cfg(feature = "qlog")]
use qlog::events::EventData;
use smallvec::SmallVec;
/// Initial packet reordering threshold; seeds `Recovery::pkt_thresh`.
const INITIAL_PACKET_THRESHOLD: u64 = 3;
/// Upper bound applied when the packet threshold is raised after a spurious
/// loss is detected.
const MAX_PACKET_THRESHOLD: u64 = 20;
/// Initial time reordering threshold, as a multiplier of the RTT; seeds
/// `Recovery::time_thresh`.
const INITIAL_TIME_THRESHOLD: f64 = 9.0 / 8.0;
/// Lower bound for the loss delay and for the variance term of the PTO.
const GRANULARITY: Duration = Duration::from_millis(1);
/// RTT reported while no RTT sample exists; half of it seeds `rttvar`.
const INITIAL_RTT: Duration = Duration::from_millis(333);
/// Multiplier applied to the PTO to obtain the persistent congestion period.
const PERSISTENT_CONGESTION_THRESHOLD: u32 = 3;
/// Window handed to the min-RTT filter when updating the running minimum.
const RTT_WINDOW: Duration = Duration::from_secs(300);
/// Maximum number of probe packets scheduled for one PTO expiration.
const MAX_PTO_PROBES_COUNT: usize = 2;
/// NOTE(review): not used by the recovery code visible here; the
/// `collapse_cwnd` test expects the collapsed window to be this many
/// datagrams, so it is presumably the minimum congestion window in packets.
const MINIMUM_WINDOW_PACKETS: usize = 2;
/// NOTE(review): not referenced in the visible code; presumably the factor
/// applied to the congestion window on loss -- confirm in the CC modules.
const LOSS_REDUCTION_FACTOR: f64 = 0.5;
/// Multiplier applied to `cwnd / srtt` when deriving the default pacing rate.
const PACING_MULTIPLIER: f64 = 1.25;
/// Number of consecutive non-ack-eliciting packets after which
/// `Recovery::should_elicit_ack` starts returning `true`.
pub(super) const MAX_OUTSTANDING_NON_ACK_ELICITING: usize = 24;
/// Loss detection and congestion control state.
///
/// Arrays sized by `packet::Epoch::count()` hold one entry per packet number
/// space and are indexed by `packet::Epoch`.
pub struct Recovery {
// Deadline at which `on_loss_detection_timeout` should be invoked, if any.
loss_detection_timer: Option<Instant>,
// Incremented on every PTO expiration, reset to 0 when new packets are acked.
pto_count: u32,
// Send time of the most recent in-flight, ack-eliciting packet per epoch.
time_of_last_sent_ack_eliciting_pkt:
[Option<Instant>; packet::Epoch::count()],
// Largest acknowledged packet number per epoch; `u64::MAX` means "none yet".
largest_acked_pkt: [u64; packet::Epoch::count()],
// Largest packet number sent per epoch.
largest_sent_pkt: [u64; packet::Epoch::count()],
// Most recent RTT sample.
latest_rtt: Duration,
// Smoothed RTT; `None` until the first RTT sample is taken.
smoothed_rtt: Option<Duration>,
// RTT variation estimate.
rttvar: Duration,
// Filter producing the windowed minimum RTT stored in `min_rtt`.
minmax_filter: minmax::Minmax<Duration>,
// Minimum RTT; `Duration::ZERO` until the first RTT sample is taken.
min_rtt: Duration,
// Caps the ack delay subtracted from RTT samples and is added to the
// Application epoch PTO.
pub max_ack_delay: Duration,
// Time at which the next unacked packet becomes lost by time threshold.
loss_time: [Option<Instant>; packet::Epoch::count()],
// Sent packets per epoch; entries are marked acked/lost in place and
// drained from the front later.
sent: [VecDeque<Sent>; packet::Epoch::count()],
// Frames moved out of packets declared lost, plus copies of frames from
// packets selected when a PTO fires.
pub lost: [Vec<frame::Frame>; packet::Epoch::count()],
// Frames moved out of newly acknowledged packets.
pub acked: [Vec<frame::Frame>; packet::Epoch::count()],
// Number of packets declared lost (PMTUD probes excluded).
pub lost_count: usize,
// Number of packets acknowledged after having been declared lost.
pub lost_spurious_count: usize,
// Number of probe packets scheduled per epoch after a PTO.
pub loss_probes: [usize; packet::Epoch::count()],
// Number of in-flight packets per epoch.
in_flight_count: [usize; packet::Epoch::count()],
// Whether the last in-flight send left room in the congestion window.
app_limited: bool,
delivery_rate: delivery_rate::Rate,
// Current packet and time reordering thresholds used for loss detection.
pkt_thresh: u64,
time_thresh: f64,
// Callback table of the selected congestion control algorithm.
cc_ops: &'static CongestionControlOps,
congestion_window: usize,
// NOTE(review): read here but only decreased in this file; it appears to
// be increased by the congestion controller's `on_packet_sent` callback.
bytes_in_flight: usize,
ssthresh: usize,
bytes_acked_sl: usize,
bytes_acked_ca: usize,
// Total size of all packets passed to `on_packet_sent`.
bytes_sent: usize,
// Total size of in-flight packets declared lost.
pub bytes_lost: u64,
congestion_recovery_start_time: Option<Instant>,
max_datagram_size: usize,
cubic_state: cubic::State,
hystart: hystart::Hystart,
pub pacer: pacer::Pacer,
prr: prr::PRR,
#[cfg(feature = "qlog")]
qlog_metrics: QlogMetrics,
// Burst size handed to the pacer together with the pacing rate.
send_quantum: usize,
bbr_state: bbr::State,
bbr2_state: bbr2::State,
// Consecutive non-ack-eliciting packets sent since the last ack-eliciting one.
outstanding_non_ack_eliciting: usize,
// Initial congestion window expressed in datagrams.
initial_congestion_window_packets: usize,
}
/// Subset of the connection configuration needed to build a `Recovery`.
pub struct RecoveryConfig {
// Becomes the initial `max_datagram_size` and sizes the initial window.
max_send_udp_payload_size: usize,
// Copied into `Recovery::max_ack_delay`; `from_config` sets it to zero.
pub max_ack_delay: Duration,
// Callback table of the selected congestion control algorithm.
cc_ops: &'static CongestionControlOps,
// Whether HyStart is enabled.
hystart: bool,
// Whether pacing is enabled.
pacing: bool,
// Optional upper bound handed to the pacer.
max_pacing_rate: Option<u64>,
// Initial congestion window expressed in datagrams.
initial_congestion_window_packets: usize,
}
impl RecoveryConfig {
pub fn from_config(config: &Config) -> Self {
Self {
max_send_udp_payload_size: config.max_send_udp_payload_size,
max_ack_delay: Duration::ZERO,
cc_ops: config.cc_algorithm.into(),
hystart: config.hystart,
pacing: config.pacing,
max_pacing_rate: config.max_pacing_rate,
initial_congestion_window_packets: config
.initial_congestion_window_packets,
}
}
}
impl Recovery {
pub fn new_with_config(recovery_config: &RecoveryConfig) -> Self {
let initial_congestion_window = recovery_config.max_send_udp_payload_size *
recovery_config.initial_congestion_window_packets;
Recovery {
loss_detection_timer: None,
pto_count: 0,
time_of_last_sent_ack_eliciting_pkt: [None; packet::Epoch::count()],
largest_acked_pkt: [u64::MAX; packet::Epoch::count()],
largest_sent_pkt: [0; packet::Epoch::count()],
latest_rtt: Duration::ZERO,
smoothed_rtt: None,
minmax_filter: minmax::Minmax::new(Duration::ZERO),
min_rtt: Duration::ZERO,
rttvar: INITIAL_RTT / 2,
max_ack_delay: recovery_config.max_ack_delay,
loss_time: [None; packet::Epoch::count()],
sent: [VecDeque::new(), VecDeque::new(), VecDeque::new()],
lost: [Vec::new(), Vec::new(), Vec::new()],
acked: [Vec::new(), Vec::new(), Vec::new()],
lost_count: 0,
lost_spurious_count: 0,
loss_probes: [0; packet::Epoch::count()],
in_flight_count: [0; packet::Epoch::count()],
congestion_window: initial_congestion_window,
pkt_thresh: INITIAL_PACKET_THRESHOLD,
time_thresh: INITIAL_TIME_THRESHOLD,
bytes_in_flight: 0,
ssthresh: usize::MAX,
bytes_acked_sl: 0,
bytes_acked_ca: 0,
bytes_sent: 0,
bytes_lost: 0,
congestion_recovery_start_time: None,
max_datagram_size: recovery_config.max_send_udp_payload_size,
cc_ops: recovery_config.cc_ops,
delivery_rate: delivery_rate::Rate::default(),
cubic_state: cubic::State::default(),
app_limited: false,
hystart: hystart::Hystart::new(recovery_config.hystart),
pacer: pacer::Pacer::new(
recovery_config.pacing,
initial_congestion_window,
0,
recovery_config.max_send_udp_payload_size,
recovery_config.max_pacing_rate,
),
prr: prr::PRR::default(),
send_quantum: initial_congestion_window,
#[cfg(feature = "qlog")]
qlog_metrics: QlogMetrics::default(),
bbr_state: bbr::State::new(),
bbr2_state: bbr2::State::new(),
outstanding_non_ack_eliciting: 0,
initial_congestion_window_packets: recovery_config
.initial_congestion_window_packets,
}
}
/// Builds a recovery state directly from a connection `Config`.
pub fn new(config: &Config) -> Self {
    let recovery_config = RecoveryConfig::from_config(config);
    Self::new_with_config(&recovery_config)
}
/// Runs the `on_init` callback of the selected congestion controller.
pub fn on_init(&mut self) {
    let init = self.cc_ops.on_init;
    init(self);
}
/// Resets the congestion control state to its initial values.
///
/// The congestion window returns to the initial window for the current
/// datagram size, per-epoch in-flight counters are zeroed, and the
/// congestion controller, HyStart and PRR state are reset.
pub fn reset(&mut self) {
    let initial_cwnd =
        self.max_datagram_size * self.initial_congestion_window_packets;

    self.congestion_window = initial_cwnd;
    self.in_flight_count = [0; packet::Epoch::count()];
    self.congestion_recovery_start_time = None;
    self.ssthresh = usize::MAX;

    // The algorithm-specific reset runs after the generic fields above
    // have been restored.
    let cc_reset = self.cc_ops.reset;
    cc_reset(self);

    self.hystart.reset();
    self.prr = prr::PRR::default();
}
/// Returns whether the next packet sent on `epoch` should be ack-eliciting.
///
/// True when probe packets are pending for `epoch`, or when at least
/// `MAX_OUTSTANDING_NON_ACK_ELICITING` non-ack-eliciting packets were sent
/// in a row.
pub fn should_elicit_ack(&self, epoch: packet::Epoch) -> bool {
    if self.loss_probes[epoch] > 0 {
        return true;
    }

    self.outstanding_non_ack_eliciting >= MAX_OUTSTANDING_NON_ACK_ELICITING
}
// Records a newly sent packet and updates timers, congestion control and
// pacing state accordingly.
//
// `pkt.time_sent` is overwritten with the send time chosen by the pacer
// before the packet is stored in the per-epoch sent queue.
pub fn on_packet_sent(
&mut self, mut pkt: Sent, epoch: packet::Epoch,
handshake_status: HandshakeStatus, now: Instant, trace_id: &str,
) {
let ack_eliciting = pkt.ack_eliciting;
let in_flight = pkt.in_flight;
let sent_bytes = pkt.size;
let pkt_num = pkt.pkt_num;
// Track how many non-ack-eliciting packets were sent in a row.
if ack_eliciting {
self.outstanding_non_ack_eliciting = 0;
} else {
self.outstanding_non_ack_eliciting += 1;
}
self.largest_sent_pkt[epoch] =
cmp::max(self.largest_sent_pkt[epoch], pkt_num);
if in_flight {
if ack_eliciting {
self.time_of_last_sent_ack_eliciting_pkt[epoch] = Some(now);
}
self.in_flight_count[epoch] += 1;
// App-limited when this packet still leaves room in the window.
self.update_app_limited(
(self.bytes_in_flight + sent_bytes) < self.congestion_window,
);
self.on_packet_sent_cc(sent_bytes, now);
self.prr.on_packet_sent(sent_bytes);
self.set_loss_detection_timer(handshake_status, now);
}
// Start a HyStart round while the window is below ssthresh, Application
// epoch only.
if self.hystart.enabled() &&
epoch == packet::Epoch::Application &&
self.congestion_window < self.ssthresh
{
self.hystart.start_round(pkt_num);
}
// Default pacing rate, unless the congestion controller paces itself.
if !(self.cc_ops.has_custom_pacing)() {
if let Some(srtt) = self.smoothed_rtt {
let rate = PACING_MULTIPLIER * self.congestion_window as f64 /
srtt.as_secs_f64();
self.set_pacing_rate(rate as u64, now);
}
}
self.schedule_next_packet(epoch, now, sent_bytes);
pkt.time_sent = self.get_packet_send_time();
// NOTE(review): the subtraction assumes `bytes_in_flight` already includes
// `sent_bytes` (presumably added by the CC callback above, which only runs
// for in-flight packets); it would underflow for a non-in-flight packet
// larger than `bytes_in_flight` -- confirm the caller's invariants.
self.delivery_rate.on_packet_sent(
&mut pkt,
self.bytes_in_flight - sent_bytes,
self.bytes_lost,
);
self.sent[epoch].push_back(pkt);
self.bytes_sent += sent_bytes;
trace!("{} {:?}", trace_id, self);
}
/// Forwards a packet-sent notification to the congestion controller.
fn on_packet_sent_cc(&mut self, sent_bytes: usize, now: Instant) {
    let notify = self.cc_ops.on_packet_sent;
    notify(self, sent_bytes, now);
}
/// Updates the pacer with `rate`, using the current send quantum.
pub fn set_pacing_rate(&mut self, rate: u64, now: Instant) {
    let quantum = self.send_quantum;
    self.pacer.update(quantum, rate, now);
}
/// Returns the time reported by the pacer for the next packet.
pub fn get_packet_send_time(&self) -> Instant {
    let pacer = &self.pacer;
    pacer.next_time()
}
/// Informs the pacer that a packet was sent.
///
/// The packet size only counts against the pacer when pacing is enabled,
/// the packet belongs to the Application epoch and the bytes sent so far
/// have reached the initial congestion window; otherwise zero bytes are
/// reported.
fn schedule_next_packet(
    &mut self, epoch: packet::Epoch, now: Instant, packet_size: usize,
) {
    let initial_cwnd =
        self.max_datagram_size * self.initial_congestion_window_packets;
    let past_initial_cwnd = self.bytes_sent >= initial_cwnd;

    let paced = self.pacer.enabled() &&
        epoch == packet::Epoch::Application &&
        past_initial_cwnd;

    let paced_bytes = if paced { packet_size } else { 0 };

    self.pacer.send(paced_bytes, now);
}
// Processes an ACK frame covering `ranges` for `epoch`.
//
// Marks the covered packets as acked (without removing them from the sent
// queue), detects spurious losses, takes an RTT sample when applicable,
// runs loss detection and notifies the congestion controller.
//
// `ack_delay` is interpreted as microseconds. Newly acknowledged packets
// are appended to `newly_acked`. Returns `(lost_packets, lost_bytes)` as
// reported by loss detection, or `(0, 0)` when `newly_acked` is empty after
// processing.
//
// Panics if `ranges` is empty.
#[allow(clippy::too_many_arguments)]
pub fn on_ack_received(
&mut self, ranges: &ranges::RangeSet, ack_delay: u64,
epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant,
trace_id: &str, newly_acked: &mut Vec<Acked>,
) -> Result<(usize, usize)> {
let largest_acked = ranges.last().unwrap();
// `u64::MAX` is the "nothing acked yet" marker, so it cannot take part in
// the `max` below.
if self.largest_acked_pkt[epoch] == u64::MAX {
self.largest_acked_pkt[epoch] = largest_acked;
} else {
self.largest_acked_pkt[epoch] =
cmp::max(self.largest_acked_pkt[epoch], largest_acked);
}
let mut has_ack_eliciting = false;
let mut largest_newly_acked_pkt_num = 0;
let mut largest_newly_acked_sent_time = now;
let mut undo_cwnd = false;
let max_rtt = cmp::max(self.latest_rtt, self.rtt());
let sent = &mut self.sent[epoch];
for r in ranges.iter() {
// Range ends are exclusive.
let lowest_acked_in_block = r.start;
let largest_acked_in_block = r.end - 1;
// Index of the first queued packet that may fall in this range: 0 when
// the queue is empty or its head is already inside/after the range,
// otherwise found by binary search on the packet number.
let first_unacked = if sent
.front()
.map(|p| p.pkt_num >= lowest_acked_in_block)
.unwrap_or(true)
{
0
} else {
sent.binary_search_by_key(&lowest_acked_in_block, |e| e.pkt_num)
.unwrap_or_else(|i| i)
};
// Packets within the range that were not acknowledged before.
let unacked_iter = sent.range_mut(first_unacked..)
.take_while(|p| p.pkt_num <= largest_acked_in_block)
.filter(|p| p.time_acked.is_none());
for unacked in unacked_iter {
unacked.time_acked = Some(now);
// The packet had already been declared lost: spurious loss.
if unacked.time_lost.is_some() {
// Raise the packet reordering threshold, capped at
// MAX_PACKET_THRESHOLD.
let pkt_thresh =
self.largest_acked_pkt[epoch] - unacked.pkt_num + 1;
let pkt_thresh = cmp::min(MAX_PACKET_THRESHOLD, pkt_thresh);
self.pkt_thresh = cmp::max(self.pkt_thresh, pkt_thresh);
// Raise the time threshold when the packet took longer than the
// current loss delay to be acknowledged.
let loss_delay = max_rtt.mul_f64(self.time_thresh);
if now.saturating_duration_since(unacked.time_sent) >
loss_delay
{
self.time_thresh = 5_f64 / 4_f64;
}
if unacked.in_flight {
undo_cwnd = true;
}
self.lost_spurious_count += 1;
// Spuriously lost packets are not reported in `newly_acked`.
continue;
}
if unacked.ack_eliciting {
has_ack_eliciting = true;
}
largest_newly_acked_pkt_num = unacked.pkt_num;
largest_newly_acked_sent_time = unacked.time_sent;
self.acked[epoch].extend(unacked.frames.drain(..));
if unacked.in_flight {
self.in_flight_count[epoch] =
self.in_flight_count[epoch].saturating_sub(1);
}
newly_acked.push(Acked {
pkt_num: unacked.pkt_num,
time_sent: unacked.time_sent,
size: unacked.size,
rtt: now.saturating_duration_since(unacked.time_sent),
delivered: unacked.delivered,
delivered_time: unacked.delivered_time,
first_sent_time: unacked.first_sent_time,
is_app_limited: unacked.is_app_limited,
tx_in_flight: unacked.tx_in_flight,
lost: unacked.lost,
});
trace!("{} packet newly acked {}", trace_id, unacked.pkt_num);
}
}
// A spurious loss of an in-flight packet asks the congestion controller
// to roll back to its last checkpoint.
if undo_cwnd {
(self.cc_ops.rollback)(self);
}
if newly_acked.is_empty() {
return Ok((0, 0));
}
// Take an RTT sample only when the largest acked packet is newly acked
// and at least one newly acked packet was ack-eliciting.
if largest_newly_acked_pkt_num == largest_acked && has_ack_eliciting {
let latest_rtt =
now.saturating_duration_since(largest_newly_acked_sent_time);
// The peer-reported ack delay is only honored in the Application epoch.
let ack_delay = if epoch == packet::Epoch::Application {
Duration::from_micros(ack_delay)
} else {
Duration::from_micros(0)
};
if !latest_rtt.is_zero() {
self.update_rtt(latest_rtt, ack_delay, now);
}
}
let (lost_packets, lost_bytes) =
self.detect_lost_packets(epoch, now, trace_id);
self.on_packets_acked(newly_acked, epoch, now);
self.pto_count = 0;
self.set_loss_detection_timer(handshake_status, now);
self.drain_packets(epoch, now);
Ok((lost_packets, lost_bytes))
}
/// Handles expiration of the loss detection timer.
///
/// When a time-threshold loss deadline is pending, loss detection runs on
/// the corresponding epoch and its `(lost_packets, lost_bytes)` result is
/// returned. Otherwise the timeout is treated as a PTO: the PTO counter is
/// bumped, probe packets are scheduled for the selected epoch, frames of
/// the oldest outstanding data packets are copied into the `lost` queue,
/// and `(0, 0)` is returned.
pub fn on_loss_detection_timeout(
    &mut self, handshake_status: HandshakeStatus, now: Instant,
    trace_id: &str,
) -> (usize, usize) {
    // Time-threshold loss detection takes precedence over PTO handling.
    if let (Some(_), loss_epoch) = self.loss_time_and_space() {
        let lost = self.detect_lost_packets(loss_epoch, now, trace_id);
        self.set_loss_detection_timer(handshake_status, now);
        trace!("{} {:?}", trace_id, self);
        return lost;
    }

    // With nothing in flight the probe goes to the Handshake epoch when
    // its keys are available, and to the Initial epoch otherwise.
    let probe_epoch = if self.bytes_in_flight > 0 {
        self.pto_time_and_space(handshake_status, now).1
    } else if handshake_status.has_handshake_keys {
        packet::Epoch::Handshake
    } else {
        packet::Epoch::Initial
    };

    self.pto_count += 1;

    let probes = cmp::min(self.pto_count as usize, MAX_PTO_PROBES_COUNT);
    self.loss_probes[probe_epoch] = probes;

    // Copy the frames of up to `probes` outstanding packets carrying data.
    let outstanding_data = self.sent[probe_epoch]
        .iter()
        .filter(|p| {
            p.has_data && p.time_acked.is_none() && p.time_lost.is_none()
        })
        .take(probes);

    for pkt in outstanding_data {
        self.lost[probe_epoch].extend_from_slice(&pkt.frames);
    }

    self.set_loss_detection_timer(handshake_status, now);
    trace!("{} {:?}", trace_id, self);

    (0, 0)
}
/// Drops all recovery state of a discarded packet number space.
///
/// Bytes of packets that were still in flight (neither acked nor lost) are
/// removed from `bytes_in_flight`, the per-epoch queues and timers are
/// cleared, and the loss detection timer is re-armed.
pub fn on_pkt_num_space_discarded(
    &mut self, epoch: packet::Epoch, handshake_status: HandshakeStatus,
    now: Instant,
) {
    let mut unacked_bytes = 0;
    for pkt in self.sent[epoch].iter() {
        let outstanding = pkt.in_flight &&
            pkt.time_acked.is_none() &&
            pkt.time_lost.is_none();

        if outstanding {
            unacked_bytes += pkt.size;
        }
    }

    self.bytes_in_flight = self.bytes_in_flight.saturating_sub(unacked_bytes);

    self.sent[epoch].clear();
    self.lost[epoch].clear();
    self.acked[epoch].clear();

    self.time_of_last_sent_ack_eliciting_pkt[epoch] = None;
    self.loss_time[epoch] = None;
    self.loss_probes[epoch] = 0;
    self.in_flight_count[epoch] = 0;

    self.set_loss_detection_timer(handshake_status, now);
}
/// Runs loss detection for `epoch` after a path change and returns the
/// resulting `(lost_packets, lost_bytes)`.
pub fn on_path_change(
    &mut self, epoch: packet::Epoch, now: Instant, trace_id: &str,
) -> (usize, usize) {
    let lost = self.detect_lost_packets(epoch, now, trace_id);
    lost
}
/// Returns the current loss detection deadline, or `None` when the timer
/// is not armed.
pub fn loss_detection_timer(&self) -> Option<Instant> {
    let deadline = self.loss_detection_timer;
    deadline
}
/// Returns the current congestion window.
pub fn cwnd(&self) -> usize {
    let cwnd = self.congestion_window;
    cwnd
}
/// Returns how many bytes may currently be sent.
///
/// The window is ignored (`usize::MAX`) while probe packets are pending on
/// any epoch. Otherwise this is the unused part of the congestion window
/// plus the extra allowance granted by PRR.
pub fn cwnd_available(&self) -> usize {
    let probing = self.loss_probes.iter().any(|&probes| probes != 0);
    if probing {
        return usize::MAX;
    }

    let headroom = self.congestion_window.saturating_sub(self.bytes_in_flight);
    headroom + self.prr.snd_cnt
}
/// Returns the smoothed RTT, or `INITIAL_RTT` while no sample exists.
pub fn rtt(&self) -> Duration {
    match self.smoothed_rtt {
        Some(srtt) => srtt,
        None => INITIAL_RTT,
    }
}
/// Returns the minimum RTT, or `None` while it is still zero (no sample
/// has been recorded yet).
pub fn min_rtt(&self) -> Option<Duration> {
    Some(self.min_rtt).filter(|min_rtt| *min_rtt != Duration::ZERO)
}
/// Returns the current RTT variation estimate.
pub fn rttvar(&self) -> Duration {
    let rttvar = self.rttvar;
    rttvar
}
/// Returns the base probe timeout: the (smoothed or initial) RTT plus
/// four times the RTT variation, the latter floored at `GRANULARITY`.
pub fn pto(&self) -> Duration {
    let base_rtt = self.rtt();
    let variance_term = cmp::max(self.rttvar * 4, GRANULARITY);

    base_rtt + variance_term
}
/// Returns the most recent delivery rate sample reported by the delivery
/// rate estimator.
pub fn delivery_rate(&self) -> u64 {
    let estimator = &self.delivery_rate;
    estimator.sample_delivery_rate()
}
/// Returns the maximum datagram size currently used by recovery.
pub fn max_datagram_size(&self) -> usize {
    let size = self.max_datagram_size;
    size
}
/// Sets the maximum datagram size to `new_max_datagram_size`, allowing it
/// to grow as well as shrink.
///
/// If the congestion window still equals the initial window it is
/// recomputed for the new size. The pacer is rebuilt with the same
/// enabled flag and maximum rate.
pub fn pmtud_update_max_datagram_size(
    &mut self, new_max_datagram_size: usize,
) {
    let cwnd_packets = self.initial_congestion_window_packets;
    let at_initial_cwnd =
        self.congestion_window == self.max_datagram_size * cwnd_packets;

    if at_initial_cwnd {
        self.congestion_window = new_max_datagram_size * cwnd_packets;
    }

    let pacing_enabled = self.pacer.enabled();
    let max_pacing_rate = self.pacer.max_pacing_rate();

    self.pacer = pacer::Pacer::new(
        pacing_enabled,
        self.congestion_window,
        0,
        new_max_datagram_size,
        max_pacing_rate,
    );

    self.max_datagram_size = new_max_datagram_size;
}
/// Lowers the maximum datagram size to `new_max_datagram_size`; a larger
/// value than the current one leaves the size unchanged.
///
/// If the congestion window still equals the initial window it is
/// recomputed for the resulting size. The pacer is rebuilt with the same
/// enabled flag and maximum rate.
pub fn update_max_datagram_size(&mut self, new_max_datagram_size: usize) {
    // This variant never increases the datagram size.
    let clamped_size = cmp::min(self.max_datagram_size, new_max_datagram_size);

    let cwnd_packets = self.initial_congestion_window_packets;
    let at_initial_cwnd =
        self.congestion_window == self.max_datagram_size * cwnd_packets;

    if at_initial_cwnd {
        self.congestion_window = clamped_size * cwnd_packets;
    }

    let pacing_enabled = self.pacer.enabled();
    let max_pacing_rate = self.pacer.max_pacing_rate();

    self.pacer = pacer::Pacer::new(
        pacing_enabled,
        self.congestion_window,
        0,
        clamped_size,
        max_pacing_rate,
    );

    self.max_datagram_size = clamped_size;
}
/// Feeds a new RTT sample into the RTT estimator.
///
/// The first sample initializes the smoothed RTT, the variation (half the
/// sample) and the min-RTT filter. Later samples update the windowed
/// minimum and the exponentially weighted averages, after subtracting the
/// (capped) ack delay when the sample is larger than `min_rtt + ack_delay`.
fn update_rtt(
    &mut self, latest_rtt: Duration, ack_delay: Duration, now: Instant,
) {
    self.latest_rtt = latest_rtt;

    if let Some(prev_srtt) = self.smoothed_rtt {
        self.min_rtt =
            self.minmax_filter.running_min(RTT_WINDOW, now, latest_rtt);

        // The peer-reported delay is never trusted beyond max_ack_delay.
        let capped_delay = cmp::min(self.max_ack_delay, ack_delay);

        // Only subtract the delay when the result stays above min_rtt.
        let mut adjusted_rtt = latest_rtt;
        if latest_rtt > self.min_rtt + capped_delay {
            adjusted_rtt = latest_rtt - capped_delay;
        }

        let deviation = sub_abs(prev_srtt, adjusted_rtt);
        self.rttvar =
            self.rttvar.mul_f64(3.0 / 4.0) + deviation.mul_f64(1.0 / 4.0);

        let new_srtt =
            prev_srtt.mul_f64(7.0 / 8.0) + adjusted_rtt.mul_f64(1.0 / 8.0);
        self.smoothed_rtt = Some(new_srtt);
    } else {
        // First RTT sample.
        self.min_rtt = self.minmax_filter.reset(now, latest_rtt);
        self.smoothed_rtt = Some(latest_rtt);
        self.rttvar = latest_rtt / 2;
    }
}
/// Returns the earliest pending time-threshold loss deadline across all
/// packet number spaces, together with the epoch it belongs to.
///
/// The returned time is `None` only when no epoch has a loss time set; the
/// returned epoch carries no meaning in that case.
fn loss_time_and_space(&self) -> (Option<Instant>, packet::Epoch) {
    let mut epoch = packet::Epoch::Initial;
    let mut time = self.loss_time[epoch];

    for &e in packet::Epoch::epochs(
        packet::Epoch::Handshake..=packet::Epoch::Application,
    ) {
        let new_time = self.loss_time[e];

        // `Option` orders `None` before every `Some(_)`, so comparing the
        // two options directly would let an unset loss time of a later
        // epoch replace a real deadline found in an earlier one.
        let replace = match (time, new_time) {
            (None, _) => true,
            (Some(current), Some(candidate)) => candidate < current,
            (Some(_), None) => false,
        };

        if replace {
            time = new_time;
            epoch = e;
        }
    }

    (time, epoch)
}
/// Returns the earliest probe timeout deadline and the epoch it applies to.
///
/// The base PTO is doubled for every consecutive PTO expiration. With
/// nothing in flight the timer is armed relative to `now` for the
/// Handshake epoch (if its keys are available) or the Initial epoch.
/// Otherwise each epoch with in-flight packets is considered; the
/// Application epoch is skipped until the handshake completes and
/// additionally accounts for `max_ack_delay`.
fn pto_time_and_space(
    &self, handshake_status: HandshakeStatus, now: Instant,
) -> (Option<Instant>, packet::Epoch) {
    // Saturate instead of overflowing once pto_count reaches 32.
    let backoff = 2_u32.saturating_pow(self.pto_count);

    let mut duration = self.pto() * backoff;

    // Arm the timer from now when nothing is in flight.
    if self.bytes_in_flight == 0 {
        if handshake_status.has_handshake_keys {
            return (Some(now + duration), packet::Epoch::Handshake);
        } else {
            return (Some(now + duration), packet::Epoch::Initial);
        }
    }

    let mut pto_timeout = None;
    let mut pto_space = packet::Epoch::Initial;

    for &e in packet::Epoch::epochs(
        packet::Epoch::Initial..=packet::Epoch::Application,
    ) {
        if self.in_flight_count[e] == 0 {
            continue;
        }

        if e == packet::Epoch::Application {
            // Application data is not probed before the handshake completes.
            if !handshake_status.completed {
                return (pto_timeout, pto_space);
            }

            // Include max_ack_delay and backoff for Application data.
            duration += self.max_ack_delay * backoff;
        }

        let new_time =
            self.time_of_last_sent_ack_eliciting_pkt[e].map(|t| t + duration);

        // `Option` orders `None` before every `Some(_)`; comparing the
        // options directly would let an epoch without a recorded
        // ack-eliciting send time discard a deadline that was already found.
        let replace = match (pto_timeout, new_time) {
            (None, _) => true,
            (Some(current), Some(candidate)) => candidate < current,
            (Some(_), None) => false,
        };

        if replace {
            pto_timeout = new_time;
            pto_space = e;
        }
    }

    (pto_timeout, pto_space)
}
/// Re-arms the loss detection timer.
///
/// Priority order: a pending time-threshold loss deadline; no timer when
/// nothing is in flight and the peer has verified our address; otherwise
/// the PTO deadline.
fn set_loss_detection_timer(
    &mut self, handshake_status: HandshakeStatus, now: Instant,
) {
    let (earliest_loss_time, _) = self.loss_time_and_space();

    self.loss_detection_timer = if earliest_loss_time.is_some() {
        earliest_loss_time
    } else if self.bytes_in_flight == 0 &&
        handshake_status.peer_verified_address
    {
        None
    } else {
        self.pto_time_and_space(handshake_status, now).0
    };
}
/// Declares packets of `epoch` lost according to the time and packet
/// reordering thresholds.
///
/// Frames of lost packets are moved to the `lost` queue. When the first
/// outstanding packet is not lost yet, `loss_time[epoch]` is set to the
/// time at which it will be and the scan stops. Lost PMTUD probes are
/// removed from the in-flight accounting but are excluded from the returned
/// counters, from `lost_count`/`bytes_lost` and from the congestion
/// controller notification.
///
/// Returns `(lost_packets, lost_bytes)`.
fn detect_lost_packets(
    &mut self, epoch: packet::Epoch, now: Instant, trace_id: &str,
) -> (usize, usize) {
    let largest_acked = self.largest_acked_pkt[epoch];

    self.loss_time[epoch] = None;

    let loss_delay =
        cmp::max(self.latest_rtt, self.rtt()).mul_f64(self.time_thresh);

    // Never use a loss delay shorter than the timer granularity.
    let loss_delay = cmp::max(loss_delay, GRANULARITY);

    // Packets sent at or before this time are deemed lost. `None` means
    // `now - loss_delay` is not representable, in which case no packet can
    // be old enough to match the time threshold.
    let lost_send_time = now.checked_sub(loss_delay);

    let mut lost_packets = 0;
    let mut lost_bytes = 0;
    let mut pmtud_lost_bytes = 0;

    let mut largest_lost_pkt = None;

    let unacked_iter = self.sent[epoch]
        .iter_mut()
        // Stop at packets sent after the largest acked packet.
        .take_while(|p| p.pkt_num <= largest_acked)
        // Skip packets that have already been acked or lost.
        .filter(|p| p.time_acked.is_none() && p.time_lost.is_none());

    for unacked in unacked_iter {
        let sent_too_long_ago = lost_send_time
            .map_or(false, |threshold| unacked.time_sent <= threshold);

        if sent_too_long_ago ||
            largest_acked >= unacked.pkt_num + self.pkt_thresh
        {
            self.lost[epoch].extend(unacked.frames.drain(..));

            unacked.time_lost = Some(now);

            if unacked.in_flight {
                self.in_flight_count[epoch] =
                    self.in_flight_count[epoch].saturating_sub(1);

                if unacked.pmtud {
                    // A lost PMTUD probe is no longer in flight, but must
                    // not trigger a congestion response.
                    pmtud_lost_bytes += unacked.size;
                } else {
                    lost_bytes += unacked.size;

                    // Frames were already drained from the packet, so the
                    // clone does not copy them.
                    largest_lost_pkt = Some(unacked.clone());

                    trace!(
                        "{} packet {} lost on epoch {}",
                        trace_id,
                        unacked.pkt_num,
                        epoch
                    );
                }
            }

            if !unacked.pmtud {
                lost_packets += 1;
                self.lost_count += 1;
            }
        } else {
            let loss_time = match self.loss_time[epoch] {
                None => unacked.time_sent + loss_delay,

                Some(loss_time) =>
                    cmp::min(loss_time, unacked.time_sent + loss_delay),
            };

            self.loss_time[epoch] = Some(loss_time);
            break;
        }
    }

    self.bytes_lost += lost_bytes as u64;

    // Lost PMTUD probes bypass `on_packets_lost`, so release their bytes
    // here; otherwise they would stay in `bytes_in_flight` forever.
    self.bytes_in_flight =
        self.bytes_in_flight.saturating_sub(pmtud_lost_bytes);

    // `largest_lost_pkt` is only ever set from non-PMTUD packets.
    if let Some(pkt) = largest_lost_pkt {
        self.on_packets_lost(lost_bytes, &pkt, epoch, now);
    }

    self.drain_packets(epoch, now);

    (lost_packets, lost_bytes)
}
/// Removes the leading run of finished packets from the sent queue.
///
/// Only a contiguous prefix is removed. Scanning stops at the first packet
/// that is either still outstanding (neither acked nor lost) or was
/// declared lost less than one RTT before `now`.
fn drain_packets(&mut self, epoch: packet::Epoch, now: Instant) {
    let rtt = self.rtt();
    let queue = &mut self.sent[epoch];

    let keep_from = queue
        .iter()
        .position(|pkt| match pkt.time_lost {
            // Recently lost packets are kept so a late ack can still be
            // matched against them.
            Some(time_lost) => time_lost + rtt > now,

            // Not lost: keep it unless it has been acked.
            None => pkt.time_acked.is_none(),
        })
        .unwrap_or(queue.len());

    queue.drain(..keep_from);
}
/// Updates the delivery rate estimator with the newly acked packets and
/// then notifies the congestion controller.
fn on_packets_acked(
    &mut self, acked: &mut Vec<Acked>, epoch: packet::Epoch, now: Instant,
) {
    for acked_pkt in acked.iter() {
        self.delivery_rate.update_rate_sample(acked_pkt, now);
    }

    let min_rtt = self.min_rtt;
    self.delivery_rate.generate_rate_sample(min_rtt);

    let notify = self.cc_ops.on_packets_acked;
    notify(self, acked, epoch, now);
}
/// Returns whether a packet sent at `sent_time` falls within the current
/// congestion recovery period, i.e. was sent at or before its start time.
/// Always `false` when no recovery period has started.
fn in_congestion_recovery(&self, sent_time: Instant) -> bool {
    self.congestion_recovery_start_time
        .map_or(false, |recovery_start| sent_time <= recovery_start)
}
/// Persistent congestion check; currently always reports `false`.
///
/// The congestion period is computed but not used yet.
fn in_persistent_congestion(&mut self, _largest_lost_pkt_num: u64) -> bool {
    let _congestion_period = PERSISTENT_CONGESTION_THRESHOLD * self.pto();

    false
}
/// Accounts for lost in-flight bytes and signals a congestion event.
///
/// `bytes_in_flight` is reduced by `lost_bytes` (saturating at zero), the
/// congestion event is raised for `largest_lost_pkt`, and the window is
/// collapsed if persistent congestion is reported.
fn on_packets_lost(
    &mut self, lost_bytes: usize, largest_lost_pkt: &Sent,
    epoch: packet::Epoch, now: Instant,
) {
    let remaining_in_flight = self.bytes_in_flight.saturating_sub(lost_bytes);
    self.bytes_in_flight = remaining_in_flight;

    self.congestion_event(lost_bytes, largest_lost_pkt, epoch, now);

    let persistent = self.in_persistent_congestion(largest_lost_pkt.pkt_num);
    if persistent {
        self.collapse_cwnd();
    }
}
/// Notifies the congestion controller of a loss.
///
/// When the lost packet was sent outside the current recovery period the
/// controller is first asked to checkpoint its state, so that the event
/// can be rolled back if the loss turns out to be spurious.
fn congestion_event(
    &mut self, lost_bytes: usize, largest_lost_pkt: &Sent,
    epoch: packet::Epoch, now: Instant,
) {
    let already_recovering =
        self.in_congestion_recovery(largest_lost_pkt.time_sent);

    if !already_recovering {
        let checkpoint = self.cc_ops.checkpoint;
        checkpoint(self);
    }

    let notify = self.cc_ops.congestion_event;
    notify(self, lost_bytes, largest_lost_pkt, epoch, now);
}
/// Asks the congestion controller to collapse the congestion window.
fn collapse_cwnd(&mut self) {
    let collapse = self.cc_ops.collapse_cwnd;
    collapse(self);
}
/// Sets the application-limited flag to `v`.
pub fn update_app_limited(&mut self, v: bool) {
    let flag = &mut self.app_limited;
    *flag = v;
}
/// Returns the current application-limited flag.
pub fn app_limited(&self) -> bool {
    let limited = self.app_limited;
    limited
}
/// Forwards the application-limited flag to the delivery rate estimator.
pub fn delivery_rate_update_app_limited(&mut self, v: bool) {
    let estimator = &mut self.delivery_rate;
    estimator.update_app_limited(v);
}
/// Takes a snapshot of the current recovery metrics and returns a qlog
/// event when any of them changed since the previous snapshot.
#[cfg(feature = "qlog")]
pub fn maybe_qlog(&mut self) -> Option<EventData> {
    let snapshot = QlogMetrics {
        cwnd: self.cwnd() as u64,
        bytes_in_flight: self.bytes_in_flight as u64,
        ssthresh: self.ssthresh as u64,
        pacing_rate: self.pacer.rate(),
        min_rtt: self.min_rtt,
        smoothed_rtt: self.rtt(),
        latest_rtt: self.latest_rtt,
        rttvar: self.rttvar,
    };

    self.qlog_metrics.maybe_update(snapshot)
}
/// Returns the current send quantum.
pub fn send_quantum(&self) -> usize {
    let quantum = self.send_quantum;
    quantum
}
}
/// Available congestion control algorithms.
///
/// The enum is `#[repr(C)]` with explicit discriminants; the names accepted
/// by the `FromStr` implementation are `"reno"`, `"cubic"`, `"bbr"` and
/// `"bbr2"`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[repr(C)]
pub enum CongestionControlAlgorithm {
/// Reno; parsed from `"reno"`.
Reno = 0,
/// CUBIC; parsed from `"cubic"`.
CUBIC = 1,
/// BBR; parsed from `"bbr"`.
BBR = 2,
/// BBRv2; parsed from `"bbr2"`.
BBR2 = 3,
}
impl FromStr for CongestionControlAlgorithm {
type Err = crate::Error;
fn from_str(name: &str) -> std::result::Result<Self, Self::Err> {
match name {
"reno" => Ok(CongestionControlAlgorithm::Reno),
"cubic" => Ok(CongestionControlAlgorithm::CUBIC),
"bbr" => Ok(CongestionControlAlgorithm::BBR),
"bbr2" => Ok(CongestionControlAlgorithm::BBR2),
_ => Err(crate::Error::CongestionControl),
}
}
}
/// Table of callbacks implementing one congestion control algorithm.
///
/// `Recovery` holds a `&'static` reference to one of these tables and
/// invokes the callbacks with itself as the first argument.
pub struct CongestionControlOps {
// Invoked from `Recovery::on_init`.
pub on_init: fn(r: &mut Recovery),
// Invoked from `Recovery::reset`.
pub reset: fn(r: &mut Recovery),
// Invoked for every in-flight packet sent.
pub on_packet_sent: fn(r: &mut Recovery, sent_bytes: usize, now: Instant),
// Invoked with the packets newly acknowledged by an ACK frame.
pub on_packets_acked: fn(
r: &mut Recovery,
packets: &mut Vec<Acked>,
epoch: packet::Epoch,
now: Instant,
),
// Invoked when in-flight packets are declared lost.
pub congestion_event: fn(
r: &mut Recovery,
lost_bytes: usize,
largest_lost_packet: &Sent,
epoch: packet::Epoch,
now: Instant,
),
// Invoked when persistent congestion is reported.
pub collapse_cwnd: fn(r: &mut Recovery),
// Invoked before a congestion event that starts outside the current
// recovery period.
pub checkpoint: fn(r: &mut Recovery),
// Invoked when an in-flight packet declared lost is later acknowledged;
// the returned flag is ignored by the caller visible in this file.
pub rollback: fn(r: &mut Recovery) -> bool,
// When this returns true, `Recovery` does not set the default pacing rate.
pub has_custom_pacing: fn() -> bool,
// Appends algorithm-specific state to the `Debug` output of `Recovery`.
pub debug_fmt:
fn(r: &Recovery, formatter: &mut std::fmt::Formatter) -> std::fmt::Result,
}
impl From<CongestionControlAlgorithm> for &'static CongestionControlOps {
fn from(algo: CongestionControlAlgorithm) -> Self {
match algo {
CongestionControlAlgorithm::Reno => &reno::RENO,
CongestionControlAlgorithm::CUBIC => &cubic::CUBIC,
CongestionControlAlgorithm::BBR => &bbr::BBR,
CongestionControlAlgorithm::BBR2 => &bbr2::BBR2,
}
}
}
impl std::fmt::Debug for Recovery {
    /// Writes a single-line, space-separated summary of the recovery state.
    ///
    /// The loss detection timer is rendered relative to the current time
    /// (`timer=<remaining>`, `timer=exp` or `timer=none`); the congestion
    /// controller appends its own state at the end.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        if let Some(deadline) = self.loss_detection_timer {
            let now = Instant::now();

            if deadline > now {
                let d = deadline.duration_since(now);
                write!(f, "timer={d:?} ")?;
            } else {
                write!(f, "timer=exp ")?;
            }
        } else {
            write!(f, "timer=none ")?;
        }

        // RTT estimation.
        write!(f, "latest_rtt={:?} ", self.latest_rtt)?;
        write!(f, "srtt={:?} ", self.smoothed_rtt)?;
        write!(f, "min_rtt={:?} ", self.min_rtt)?;
        write!(f, "rttvar={:?} ", self.rttvar)?;

        // Loss detection.
        write!(f, "loss_time={:?} ", self.loss_time)?;
        write!(f, "loss_probes={:?} ", self.loss_probes)?;

        // Congestion control.
        write!(f, "cwnd={} ", self.congestion_window)?;
        write!(f, "ssthresh={} ", self.ssthresh)?;
        write!(f, "bytes_in_flight={} ", self.bytes_in_flight)?;
        write!(f, "app_limited={} ", self.app_limited)?;
        write!(
            f,
            "congestion_recovery_start_time={:?} ",
            self.congestion_recovery_start_time
        )?;
        write!(f, "{:?} ", self.delivery_rate)?;
        write!(f, "pacer={:?} ", self.pacer)?;

        if self.hystart.enabled() {
            write!(f, "hystart={:?} ", self.hystart)?;
        }

        let cc_debug_fmt = self.cc_ops.debug_fmt;
        cc_debug_fmt(self, f)
    }
}
/// Bookkeeping record for a packet handed to `Recovery::on_packet_sent`.
#[derive(Clone)]
pub struct Sent {
// Packet number within its epoch.
pub pkt_num: u64,
// Frames carried by the packet; moved out once it is acked or lost.
pub frames: SmallVec<[frame::Frame; 1]>,
// Send time; overwritten in `on_packet_sent` with the pacer's send time.
pub time_sent: Instant,
// Set when the packet is acknowledged.
pub time_acked: Option<Instant>,
// Set when the packet is declared lost.
pub time_lost: Option<Instant>,
// Packet size as accounted by recovery.
pub size: usize,
pub ack_eliciting: bool,
pub in_flight: bool,
// NOTE(review): the following fields are handed to the delivery rate
// estimator on send and copied into `Acked` on acknowledgment; their exact
// semantics are defined by that module.
pub delivered: usize,
pub delivered_time: Instant,
pub first_sent_time: Instant,
pub is_app_limited: bool,
pub tx_in_flight: usize,
pub lost: u64,
// Only packets with this flag set have their frames copied when a PTO fires.
pub has_data: bool,
// PMTUD probe; excluded from loss counters and congestion events when lost.
pub pmtud: bool,
}
impl std::fmt::Debug for Sent {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "pkt_num={:?} ", self.pkt_num)?;
write!(f, "pkt_sent_time={:?} ", self.time_sent)?;
write!(f, "pkt_size={:?} ", self.size)?;
write!(f, "delivered={:?} ", self.delivered)?;
write!(f, "delivered_time={:?} ", self.delivered_time)?;
write!(f, "first_sent_time={:?} ", self.first_sent_time)?;
write!(f, "is_app_limited={} ", self.is_app_limited)?;
write!(f, "tx_in_flight={} ", self.tx_in_flight)?;
write!(f, "lost={} ", self.lost)?;
write!(f, "has_data={} ", self.has_data)?;
write!(f, "pmtud={}", self.pmtud)?;
Ok(())
}
}
/// Summary of a newly acknowledged packet, built in
/// `Recovery::on_ack_received` from the matching `Sent` record.
#[derive(Clone)]
pub struct Acked {
pub pkt_num: u64,
pub time_sent: Instant,
pub size: usize,
// Time elapsed between `time_sent` and the ACK being processed
// (saturating at zero).
pub rtt: Duration,
// The remaining fields are copied unchanged from the `Sent` record.
pub delivered: usize,
pub delivered_time: Instant,
pub first_sent_time: Instant,
pub is_app_limited: bool,
pub tx_in_flight: usize,
pub lost: u64,
}
/// Handshake progress flags consulted by the loss detection timer logic.
#[derive(Clone, Copy, Debug)]
pub struct HandshakeStatus {
// Selects the Handshake (rather than Initial) epoch for probes sent while
// nothing is in flight.
pub has_handshake_keys: bool,
// When set and nothing is in flight, the loss detection timer is disarmed.
pub peer_verified_address: bool,
// The Application epoch is not considered for PTO until this is set.
pub completed: bool,
}
#[cfg(test)]
impl Default for HandshakeStatus {
    /// Test-only default: a fully completed handshake.
    fn default() -> HandshakeStatus {
        Self {
            completed: true,
            peer_verified_address: true,
            has_handshake_keys: true,
        }
    }
}
/// Returns the absolute difference between two durations.
fn sub_abs(lhs: Duration, rhs: Duration) -> Duration {
    match lhs.checked_sub(rhs) {
        // `lhs >= rhs`.
        Some(diff) => diff,

        // `lhs < rhs`, so the reverse subtraction cannot underflow.
        None => rhs - lhs,
    }
}
/// Last recovery metrics reported through qlog; used to emit only the
/// values that changed since the previous event.
#[derive(Default)]
#[cfg(feature = "qlog")]
struct QlogMetrics {
// RTT values are converted to milliseconds when an event is emitted.
min_rtt: Duration,
smoothed_rtt: Duration,
latest_rtt: Duration,
rttvar: Duration,
// The remaining values are reported unchanged.
cwnd: u64,
bytes_in_flight: u64,
ssthresh: u64,
pacing_rate: u64,
}
#[cfg(feature = "qlog")]
impl QlogMetrics {
    /// Stores the metrics in `latest` and returns a `MetricsUpdated` event
    /// listing only the values that differ from the previously stored ones.
    ///
    /// Returns `None` when nothing changed. RTT values are reported in
    /// milliseconds.
    fn maybe_update(&mut self, latest: Self) -> Option<EventData> {
        /// Overwrites `stored` with `fresh` and returns the new value when
        /// the two differ; returns `None` and leaves `stored` alone
        /// otherwise.
        fn take_change<T: Copy + PartialEq>(
            stored: &mut T, fresh: T,
        ) -> Option<T> {
            if *stored != fresh {
                *stored = fresh;
                Some(fresh)
            } else {
                None
            }
        }

        let to_millis = |d: Duration| d.as_secs_f32() * 1000.0;

        let min_rtt =
            take_change(&mut self.min_rtt, latest.min_rtt).map(to_millis);
        let smoothed_rtt =
            take_change(&mut self.smoothed_rtt, latest.smoothed_rtt)
                .map(to_millis);
        let latest_rtt =
            take_change(&mut self.latest_rtt, latest.latest_rtt).map(to_millis);
        let rtt_variance =
            take_change(&mut self.rttvar, latest.rttvar).map(to_millis);

        let congestion_window = take_change(&mut self.cwnd, latest.cwnd);
        let bytes_in_flight =
            take_change(&mut self.bytes_in_flight, latest.bytes_in_flight);
        let ssthresh = take_change(&mut self.ssthresh, latest.ssthresh);
        let pacing_rate =
            take_change(&mut self.pacing_rate, latest.pacing_rate);

        let anything_changed = min_rtt.is_some() ||
            smoothed_rtt.is_some() ||
            latest_rtt.is_some() ||
            rtt_variance.is_some() ||
            congestion_window.is_some() ||
            bytes_in_flight.is_some() ||
            ssthresh.is_some() ||
            pacing_rate.is_some();

        if !anything_changed {
            return None;
        }

        Some(EventData::MetricsUpdated(
            qlog::events::quic::MetricsUpdated {
                min_rtt,
                smoothed_rtt,
                latest_rtt,
                rtt_variance,
                pto_count: None,
                congestion_window,
                bytes_in_flight,
                ssthresh,
                packets_in_flight: None,
                pacing_rate,
            },
        ))
    }
}
#[cfg(test)]
mod tests {
use super::*;
use smallvec::smallvec;
/// A known algorithm name parses to the matching variant.
#[test]
fn lookup_cc_algo_ok() {
    let parsed = CongestionControlAlgorithm::from_str("reno");

    assert_eq!(parsed.unwrap(), CongestionControlAlgorithm::Reno);
}
/// An unknown algorithm name is rejected with `Error::CongestionControl`.
#[test]
fn lookup_cc_algo_bad() {
    let parsed = CongestionControlAlgorithm::from_str("???");

    assert_eq!(parsed, Err(crate::Error::CongestionControl));
}
/// Collapsing the window with Reno leaves the minimum window.
#[test]
fn collapse_cwnd() {
    let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    cfg.set_cc_algorithm(CongestionControlAlgorithm::Reno);

    let mut r = Recovery::new(&cfg);

    r.collapse_cwnd();

    let minimum_window = r.max_datagram_size * MINIMUM_WINDOW_PACKETS;
    assert_eq!(r.cwnd(), minimum_window);
}
/// Packets left unacknowledged are probed on PTO and declared lost once
/// the probe packets are acknowledged.
#[test]
fn loss_on_pto() {
    let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    cfg.set_cc_algorithm(CongestionControlAlgorithm::Reno);

    let mut r = Recovery::new(&cfg);

    let mut now = Instant::now();

    let app = packet::Epoch::Application;

    // Builds a 1000-byte, ack-eliciting, in-flight packet without frames.
    let new_pkt = |pkt_num: u64, now: Instant| Sent {
        pkt_num,
        frames: smallvec![],
        time_sent: now,
        time_acked: None,
        time_lost: None,
        size: 1000,
        ack_eliciting: true,
        in_flight: true,
        delivered: 0,
        delivered_time: now,
        first_sent_time: now,
        is_app_limited: false,
        tx_in_flight: 0,
        lost: 0,
        has_data: false,
        pmtud: false,
    };

    assert_eq!(r.sent[app].len(), 0);

    // Start by sending a few packets.
    for pkt_num in 0..4_u64 {
        r.on_packet_sent(
            new_pkt(pkt_num, now),
            app,
            HandshakeStatus::default(),
            now,
            "",
        );

        let sent_so_far = pkt_num as usize + 1;
        assert_eq!(r.sent[app].len(), sent_so_far);
        assert_eq!(r.bytes_in_flight, sent_so_far * 1000);
    }

    // Wait for 10ms.
    now += Duration::from_millis(10);

    // Only the first 2 packets are acked.
    let mut acked = ranges::RangeSet::default();
    acked.insert(0..2);

    assert_eq!(
        r.on_ack_received(
            &acked,
            25,
            app,
            HandshakeStatus::default(),
            now,
            "",
            &mut Vec::new(),
        ),
        Ok((0, 0))
    );

    assert_eq!(r.sent[app].len(), 2);
    assert_eq!(r.bytes_in_flight, 2000);
    assert_eq!(r.lost_count, 0);

    // Wait until the loss detection timer expires: this is a PTO.
    now = r.loss_detection_timer().unwrap();

    r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
    assert_eq!(r.loss_probes[app], 1);
    assert_eq!(r.lost_count, 0);
    assert_eq!(r.pto_count, 1);

    // Send two more packets after the PTO.
    for &(pkt_num, expected_len) in &[(4_u64, 3_usize), (5, 4)] {
        r.on_packet_sent(
            new_pkt(pkt_num, now),
            app,
            HandshakeStatus::default(),
            now,
            "",
        );

        assert_eq!(r.sent[app].len(), expected_len);
        assert_eq!(r.bytes_in_flight, expected_len * 1000);
    }

    assert_eq!(r.lost_count, 0);

    // Wait for 10ms.
    now += Duration::from_millis(10);

    // The packets sent after the PTO are acked.
    let mut acked = ranges::RangeSet::default();
    acked.insert(4..6);

    assert_eq!(
        r.on_ack_received(
            &acked,
            25,
            app,
            HandshakeStatus::default(),
            now,
            "",
            &mut Vec::new(),
        ),
        Ok((2, 2000))
    );

    assert_eq!(r.sent[app].len(), 4);
    assert_eq!(r.bytes_in_flight, 0);

    assert_eq!(r.lost_count, 2);

    // Wait 1 RTT so the lost packets expire from the sent queue.
    now += r.rtt();

    r.detect_lost_packets(app, now, "");

    assert_eq!(r.sent[app].len(), 0);
}
#[test]
fn loss_on_timer() {
    // Scenario: four 1000-byte packets are sent, all of them except packet
    // 2 are acknowledged, and the loss detection timer is then allowed to
    // fire for the packet that is still outstanding.
    const APP: packet::Epoch = packet::Epoch::Application;

    // Builds a 1000-byte, ack-eliciting, in-flight packet whose three
    // timestamps are all `at`.
    let data_pkt = |pkt_num, at| Sent {
        pkt_num,
        frames: smallvec![],
        time_sent: at,
        time_acked: None,
        time_lost: None,
        size: 1000,
        ack_eliciting: true,
        in_flight: true,
        delivered: 0,
        delivered_time: at,
        first_sent_time: at,
        is_app_limited: false,
        tx_in_flight: 0,
        lost: 0,
        has_data: false,
        pmtud: false,
    };

    let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    cfg.set_cc_algorithm(CongestionControlAlgorithm::Reno);

    let mut r = Recovery::new(&cfg);
    let mut now = Instant::now();

    assert_eq!(r.sent[APP].len(), 0);

    // Send packets 0..=3 back to back; each one adds an entry to the sent
    // queue and 1000 bytes to the in-flight total.
    for (pkt_num, sent_count) in (0..4).zip(1..) {
        r.on_packet_sent(
            data_pkt(pkt_num, now),
            APP,
            HandshakeStatus::default(),
            now,
            "",
        );

        assert_eq!(r.sent[APP].len(), sent_count);
        assert_eq!(r.bytes_in_flight, sent_count * 1000);
    }

    // 10 ms later an ACK arrives covering packets 0, 1 and 3.
    now += Duration::from_millis(10);

    let mut acked = ranges::RangeSet::default();
    acked.insert(0..2);
    acked.insert(3..4);

    let ack_outcome = r.on_ack_received(
        &acked,
        25,
        APP,
        HandshakeStatus::default(),
        now,
        "",
        &mut Vec::new(),
    );
    assert_eq!(ack_outcome, Ok((0, 0)));

    // Nothing has been counted as lost yet and 1000 bytes are still
    // outstanding.
    assert_eq!(r.sent[APP].len(), 2);
    assert_eq!(r.bytes_in_flight, 1000);
    assert_eq!(r.lost_count, 0);

    // Jump straight to the moment the loss detection timer is due.
    now = r.loss_detection_timer().unwrap();
    r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");

    // The timeout must count one loss without scheduling a probe; the
    // queue entries themselves are still present at this point.
    assert_eq!(r.loss_probes[APP], 0);
    assert_eq!(r.sent[APP].len(), 2);
    assert_eq!(r.bytes_in_flight, 0);
    assert_eq!(r.lost_count, 1);

    // One RTT later another detection pass leaves the sent queue empty.
    now += r.rtt();
    r.detect_lost_packets(APP, now, "");

    assert_eq!(r.sent[APP].len(), 0);
}
#[test]
fn loss_on_reordering() {
    // Scenario: four 1000-byte packets are sent and the acknowledgements
    // arrive out of order -- packets 2 and 3 first, packets 0 and 1 ten
    // milliseconds later.
    const APP: packet::Epoch = packet::Epoch::Application;

    // Builds a 1000-byte, ack-eliciting, in-flight packet whose three
    // timestamps are all `at`.
    let data_pkt = |pkt_num, at| Sent {
        pkt_num,
        frames: smallvec![],
        time_sent: at,
        time_acked: None,
        time_lost: None,
        size: 1000,
        ack_eliciting: true,
        in_flight: true,
        delivered: 0,
        delivered_time: at,
        first_sent_time: at,
        is_app_limited: false,
        tx_in_flight: 0,
        lost: 0,
        has_data: false,
        pmtud: false,
    };

    let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    cfg.set_cc_algorithm(CongestionControlAlgorithm::Reno);

    let mut r = Recovery::new(&cfg);
    let mut now = Instant::now();

    assert_eq!(r.sent[APP].len(), 0);

    // Send packets 0..=3 back to back; each one adds an entry to the sent
    // queue and 1000 bytes to the in-flight total.
    for (pkt_num, sent_count) in (0..4).zip(1..) {
        r.on_packet_sent(
            data_pkt(pkt_num, now),
            APP,
            HandshakeStatus::default(),
            now,
            "",
        );

        assert_eq!(r.sent[APP].len(), sent_count);
        assert_eq!(r.bytes_in_flight, sent_count * 1000);
    }

    // First ACK, 10 ms in: only the two newest packets (2 and 3). The call
    // is expected to report (1, 1000) -- presumably one lost packet worth
    // 1000 bytes; confirm against `on_ack_received`.
    now += Duration::from_millis(10);

    let mut newest_acked = ranges::RangeSet::default();
    newest_acked.insert(2..4);

    let first_outcome = r.on_ack_received(
        &newest_acked,
        25,
        APP,
        HandshakeStatus::default(),
        now,
        "",
        &mut Vec::new(),
    );
    assert_eq!(first_outcome, Ok((1, 1000)));

    // Second ACK, another 10 ms later: the two oldest packets (0 and 1).
    now += Duration::from_millis(10);

    let mut oldest_acked = ranges::RangeSet::default();
    oldest_acked.insert(0..2);

    // The packet threshold has not been touched up to this point.
    assert_eq!(r.pkt_thresh, INITIAL_PACKET_THRESHOLD);

    let second_outcome = r.on_ack_received(
        &oldest_acked,
        25,
        APP,
        HandshakeStatus::default(),
        now,
        "",
        &mut Vec::new(),
    );
    assert_eq!(second_outcome, Ok((0, 0)));

    assert_eq!(r.sent[APP].len(), 4);
    assert_eq!(r.bytes_in_flight, 0);

    // The earlier loss is now also counted as spurious, and the packet
    // threshold has moved up to 4.
    assert_eq!(r.lost_count, 1);
    assert_eq!(r.lost_spurious_count, 1);
    assert_eq!(r.pkt_thresh, 4);

    // One RTT later another detection pass leaves the sent queue empty.
    now += r.rtt();
    r.detect_lost_packets(APP, now, "");

    assert_eq!(r.sent[APP].len(), 0);
}
#[test]
fn pacing() {
    // Scenario: with CUBIC selected, a 12000-byte packet is sent and
    // acknowledged after 50 ms, then three more packets are sent at the
    // same instant. The test checks when the pacer starts reporting a
    // rate and a delayed send time.
    const APP: packet::Epoch = packet::Epoch::Application;

    // Builds an ack-eliciting, in-flight packet of `size` bytes whose
    // three timestamps are all `at`.
    let pkt_of_size = |pkt_num, size, at| Sent {
        pkt_num,
        frames: smallvec![],
        time_sent: at,
        time_acked: None,
        time_lost: None,
        size,
        ack_eliciting: true,
        in_flight: true,
        delivered: 0,
        delivered_time: at,
        first_sent_time: at,
        is_app_limited: false,
        tx_in_flight: 0,
        lost: 0,
        has_data: false,
        pmtud: false,
    };

    let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    cfg.set_cc_algorithm(CongestionControlAlgorithm::CUBIC);

    let mut r = Recovery::new(&cfg);
    let mut now = Instant::now();

    assert_eq!(r.sent[APP].len(), 0);

    // Packet 0: 12000 bytes.
    r.on_packet_sent(
        pkt_of_size(0, 12000, now),
        APP,
        HandshakeStatus::default(),
        now,
        "",
    );

    assert_eq!(r.sent[APP].len(), 1);
    assert_eq!(r.bytes_in_flight, 12000);

    // No pacing rate exists yet, so the send time is simply `now`.
    assert_eq!(r.pacer.rate(), 0);
    assert_eq!(r.get_packet_send_time(), now);

    // The ACK for packet 0 shows up 50 ms later.
    now += Duration::from_millis(50);

    let mut acked = ranges::RangeSet::default();
    acked.insert(0..1);

    let ack_outcome = r.on_ack_received(
        &acked,
        10,
        APP,
        HandshakeStatus::default(),
        now,
        "",
        &mut Vec::new(),
    );
    assert_eq!(ack_outcome, Ok((0, 0)));

    assert_eq!(r.sent[APP].len(), 0);
    assert_eq!(r.bytes_in_flight, 0);
    assert_eq!(r.smoothed_rtt.unwrap(), Duration::from_millis(50));

    // The congestion window is expected to have grown by 1200 bytes.
    assert_eq!(r.congestion_window, 12000 + 1200);

    // Packet 1: 6000 bytes. Its send time is still not delayed.
    r.on_packet_sent(
        pkt_of_size(1, 6000, now),
        APP,
        HandshakeStatus::default(),
        now,
        "",
    );

    assert_eq!(r.sent[APP].len(), 1);
    assert_eq!(r.bytes_in_flight, 6000);
    assert_eq!(r.get_packet_send_time(), now);

    // Packet 2: another 6000 bytes.
    r.on_packet_sent(
        pkt_of_size(2, 6000, now),
        APP,
        HandshakeStatus::default(),
        now,
        "",
    );

    assert_eq!(r.sent[APP].len(), 2);
    assert_eq!(r.bytes_in_flight, 12000);

    // Packet 3: 1000 bytes, taking the in-flight total to 13000.
    r.on_packet_sent(
        pkt_of_size(3, 1000, now),
        APP,
        HandshakeStatus::default(),
        now,
        "",
    );

    assert_eq!(r.sent[APP].len(), 3);
    assert_eq!(r.bytes_in_flight, 13000);

    // Expected rate: cwnd * PACING_MULTIPLIER per 0.05 s, i.e. per the
    // 50 ms smoothed RTT asserted above.
    let rtt_secs = 0.05;
    let pacing_rate =
        (r.congestion_window as f64 * PACING_MULTIPLIER / rtt_secs) as u64;
    assert_eq!(r.pacer.rate(), pacing_rate);

    // The next send time is pushed out by the time 12000 bytes take at
    // that rate.
    let pacing_delay = Duration::from_secs_f64(12000.0 / pacing_rate as f64);
    assert_eq!(r.get_packet_send_time(), now + pacing_delay);
}
#[test]
fn pmtud_loss_on_timer() {
    // Scenario: three 1000-byte packets are sent, the middle one flagged
    // as a PMTUD probe. Packets 0 and 2 are acknowledged and the loss
    // detection timer is then allowed to fire for the probe. The test
    // checks that this is not accounted like an ordinary loss: `lost_count`
    // and the congestion window stay as they were.
    const APP: packet::Epoch = packet::Epoch::Application;

    // Builds a 1000-byte, ack-eliciting, in-flight packet whose three
    // timestamps are all `at`; `pmtud` sets the packet's `pmtud` flag.
    let pkt = |pkt_num, pmtud, at| Sent {
        pkt_num,
        frames: smallvec![],
        time_sent: at,
        time_acked: None,
        time_lost: None,
        size: 1000,
        ack_eliciting: true,
        in_flight: true,
        delivered: 0,
        delivered_time: at,
        first_sent_time: at,
        is_app_limited: false,
        tx_in_flight: 0,
        lost: 0,
        has_data: false,
        pmtud,
    };

    let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    cfg.set_cc_algorithm(CongestionControlAlgorithm::Reno);

    let mut r = Recovery::new(&cfg);
    let mut now = Instant::now();

    assert_eq!(r.sent[APP].len(), 0);

    // Packet 0: ordinary packet.
    r.on_packet_sent(
        pkt(0, false, now),
        APP,
        HandshakeStatus::default(),
        now,
        "",
    );

    assert_eq!(r.sent[APP].len(), 1);
    assert_eq!(r.bytes_in_flight, 1000);

    // Packet 1: the PMTUD probe.
    r.on_packet_sent(
        pkt(1, true, now),
        APP,
        HandshakeStatus::default(),
        now,
        "",
    );

    // Packet 2: ordinary packet.
    r.on_packet_sent(
        pkt(2, false, now),
        APP,
        HandshakeStatus::default(),
        now,
        "",
    );

    // 10 ms later an ACK arrives covering packets 0 and 2 only.
    now += Duration::from_millis(10);

    let mut acked = ranges::RangeSet::default();
    acked.insert(0..1);
    acked.insert(2..3);

    let ack_outcome = r.on_ack_received(
        &acked,
        25,
        APP,
        HandshakeStatus::default(),
        now,
        "",
        &mut Vec::new(),
    );
    assert_eq!(ack_outcome, Ok((0, 0)));

    // Only the probe's 1000 bytes are still outstanding.
    assert_eq!(r.sent[APP].len(), 2);
    assert_eq!(r.bytes_in_flight, 1000);
    assert_eq!(r.lost_count, 0);

    // Jump straight to the moment the loss detection timer is due.
    now = r.loss_detection_timer().unwrap();
    r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");

    assert_eq!(r.loss_probes[APP], 0);
    assert_eq!(r.sent[APP].len(), 2);

    // Once the timer has handled the probe it is no longer outstanding,
    // so nothing may remain in flight (the same expectation `loss_on_timer`
    // has for an ordinary packet). Unlike an ordinary loss, the congestion
    // window keeps its value and `lost_count` is not incremented.
    assert_eq!(r.bytes_in_flight, 0);
    assert_eq!(r.congestion_window, 12000);
    assert_eq!(r.lost_count, 0);

    // One RTT later another detection pass leaves the sent queue empty.
    now += r.rtt();
    r.detect_lost_packets(APP, now, "");

    assert_eq!(r.sent[APP].len(), 0);
}
}
mod bbr;
mod bbr2;
mod cubic;
mod delivery_rate;
mod hystart;
mod pacer;
mod prr;
mod reno;