quiche/recovery/gcongestion/
pacer.rs1use std::time::Instant;
32
33use crate::recovery::gcongestion::bbr2::BBRv2;
34use crate::recovery::gcongestion::Bandwidth;
35use crate::recovery::gcongestion::CongestionControl;
36use crate::recovery::rtt::RttStats;
37use crate::recovery::RecoveryStats;
38use crate::recovery::ReleaseDecision;
39use crate::recovery::ReleaseTime;
40
41use super::Acked;
42use super::Lost;
43
44const LUMPY_PACING_CWND_FRACTION: f64 = 0.25;
47
48const LUMPY_PACING_SIZE: usize = 2;
51
52const LUMPY_PACING_MIN_BANDWIDTH_KBPS: Bandwidth =
55 Bandwidth::from_kbits_per_second(1_200);
56
57const INITIAL_UNPACED_BURST: usize = 10;
60
61#[derive(Debug)]
62pub struct Pacer {
63 enabled: bool,
65 sender: BBRv2,
67 max_pacing_rate: Option<Bandwidth>,
69 burst_tokens: usize,
71 ideal_next_packet_send_time: ReleaseTime,
73 initial_burst_size: usize,
74 lumpy_tokens: usize,
77 pacing_limited: bool,
80}
81
82impl Pacer {
83 pub(crate) fn new(
87 enabled: bool, congestion: BBRv2, max_pacing_rate: Option<Bandwidth>,
88 ) -> Self {
89 Pacer {
90 enabled,
91 sender: congestion,
92 max_pacing_rate,
93 burst_tokens: INITIAL_UNPACED_BURST,
94 ideal_next_packet_send_time: ReleaseTime::Immediate,
95 initial_burst_size: INITIAL_UNPACED_BURST,
96 lumpy_tokens: 0,
97 pacing_limited: false,
98 }
99 }
100
101 pub fn get_next_release_time(&self) -> ReleaseDecision {
102 if !self.enabled {
103 return ReleaseDecision {
104 time: ReleaseTime::Immediate,
105 allow_burst: true,
106 };
107 }
108
109 let allow_burst = self.burst_tokens > 0 || self.lumpy_tokens > 0;
110 ReleaseDecision {
111 time: self.ideal_next_packet_send_time,
112 allow_burst,
113 }
114 }
115
116 #[cfg(feature = "qlog")]
117 pub fn state_str(&self) -> &'static str {
118 self.sender.state_str()
119 }
120
121 pub fn get_congestion_window(&self) -> usize {
122 self.sender.get_congestion_window()
123 }
124
125 pub fn on_packet_sent(
126 &mut self, sent_time: Instant, bytes_in_flight: usize,
127 packet_number: u64, bytes: usize, is_retransmissible: bool,
128 rtt_stats: &RttStats,
129 ) {
130 self.sender.on_packet_sent(
131 sent_time,
132 bytes_in_flight,
133 packet_number,
134 bytes,
135 is_retransmissible,
136 );
137
138 if !self.enabled || !is_retransmissible {
139 return;
140 }
141
142 if bytes_in_flight == 0 && !self.sender.is_in_recovery() {
144 self.burst_tokens = self
148 .initial_burst_size
149 .min(self.sender.get_congestion_window_in_packets());
150 }
151
152 if self.burst_tokens > 0 {
153 self.burst_tokens -= 1;
154 self.ideal_next_packet_send_time = ReleaseTime::Immediate;
155 self.pacing_limited = false;
156 return;
157 }
158
159 let delay = self
163 .pacing_rate(bytes_in_flight + bytes, rtt_stats)
164 .transfer_time(bytes);
165
166 if !self.pacing_limited || self.lumpy_tokens == 0 {
167 self.lumpy_tokens = 1.max(LUMPY_PACING_SIZE.min(
170 (self.sender.get_congestion_window_in_packets() as f64 *
171 LUMPY_PACING_CWND_FRACTION) as usize,
172 ));
173
174 if self.sender.bandwidth_estimate(rtt_stats) <
175 LUMPY_PACING_MIN_BANDWIDTH_KBPS
176 {
177 self.lumpy_tokens = 1;
180 }
181
182 if bytes_in_flight + bytes >= self.sender.get_congestion_window() {
183 self.lumpy_tokens = 1;
186 }
187 }
188
189 self.lumpy_tokens -= 1;
190 self.ideal_next_packet_send_time.set_max(sent_time);
191 self.ideal_next_packet_send_time.inc(delay);
192 self.pacing_limited = self.sender.can_send(bytes_in_flight + bytes);
194 }
195
196 #[allow(clippy::too_many_arguments)]
197 #[inline]
198 pub fn on_congestion_event(
199 &mut self, rtt_updated: bool, prior_in_flight: usize,
200 bytes_in_flight: usize, event_time: Instant, acked_packets: &[Acked],
201 lost_packets: &[Lost], least_unacked: u64, rtt_stats: &RttStats,
202 recovery_stats: &mut RecoveryStats,
203 ) {
204 self.sender.on_congestion_event(
205 rtt_updated,
206 prior_in_flight,
207 bytes_in_flight,
208 event_time,
209 acked_packets,
210 lost_packets,
211 least_unacked,
212 rtt_stats,
213 recovery_stats,
214 );
215
216 if !self.enabled {
217 return;
218 }
219
220 if !lost_packets.is_empty() {
221 self.burst_tokens = 0;
223 }
224
225 if let Some(max_pacing_rate) = self.max_pacing_rate {
226 if rtt_updated {
227 let max_rate = max_pacing_rate * 1.25f32;
228 let max_cwnd =
229 max_rate.to_bytes_per_period(rtt_stats.smoothed_rtt);
230 self.sender.limit_cwnd(max_cwnd as usize);
231 }
232 }
233 }
234
235 pub fn on_packet_neutered(&mut self, packet_number: u64) {
236 self.sender.on_packet_neutered(packet_number);
237 }
238
239 pub fn on_retransmission_timeout(&mut self, packets_retransmitted: bool) {
240 self.sender.on_retransmission_timeout(packets_retransmitted)
241 }
242
243 pub fn pacing_rate(
244 &self, bytes_in_flight: usize, rtt_stats: &RttStats,
245 ) -> Bandwidth {
246 let sender_rate = self.sender.pacing_rate(bytes_in_flight, rtt_stats);
247 match self.max_pacing_rate {
248 Some(rate) if self.enabled => rate.min(sender_rate),
249 _ => sender_rate,
250 }
251 }
252
253 pub fn bandwidth_estimate(&self, rtt_stats: &RttStats) -> Bandwidth {
254 self.sender.bandwidth_estimate(rtt_stats)
255 }
256
257 pub fn max_bandwidth(&self) -> Bandwidth {
258 self.sender.max_bandwidth()
259 }
260
261 pub fn on_app_limited(&mut self, bytes_in_flight: usize) {
262 self.pacing_limited = false;
263 self.sender.on_app_limited(bytes_in_flight);
264 }
265
266 pub fn update_mss(&mut self, new_mss: usize) {
267 self.sender.update_mss(new_mss)
268 }
269
270 #[cfg(feature = "qlog")]
271 pub fn ssthresh(&self) -> Option<u64> {
272 self.sender.ssthresh()
273 }
274
275 #[cfg(test)]
276 pub fn is_app_limited(&self, bytes_in_flight: usize) -> bool {
277 !self.is_cwnd_limited(bytes_in_flight)
278 }
279
280 #[cfg(test)]
281 fn is_cwnd_limited(&self, bytes_in_flight: usize) -> bool {
282 !self.pacing_limited && self.sender.is_cwnd_limited(bytes_in_flight)
283 }
284}