quiche/recovery/gcongestion/
pacer.rs1use std::time::Instant;
32
33use crate::recovery::gcongestion::bbr2::BBRv2;
34use crate::recovery::gcongestion::Bandwidth;
35use crate::recovery::gcongestion::CongestionControl;
36use crate::recovery::rtt::RttStats;
37use crate::recovery::RecoveryStats;
38use crate::recovery::ReleaseDecision;
39use crate::recovery::ReleaseTime;
40
41use super::Acked;
42use super::Lost;
43
44const LUMPY_PACING_CWND_FRACTION: f64 = 0.25;
47
48const LUMPY_PACING_SIZE: usize = 2;
51
52const LUMPY_PACING_MIN_BANDWIDTH_KBPS: Bandwidth =
55 Bandwidth::from_kbits_per_second(1_200);
56
57const INITIAL_UNPACED_BURST: usize = 10;
60
61#[derive(Debug)]
62pub struct Pacer {
63 enabled: bool,
65 sender: BBRv2,
67 max_pacing_rate: Option<Bandwidth>,
69 burst_tokens: usize,
71 ideal_next_packet_send_time: ReleaseTime,
73 initial_burst_size: usize,
74 lumpy_tokens: usize,
77 pacing_limited: bool,
80}
81
82impl Pacer {
83 pub(crate) fn new(
87 enabled: bool, congestion: BBRv2, max_pacing_rate: Option<Bandwidth>,
88 ) -> Self {
89 Pacer {
90 enabled,
91 sender: congestion,
92 max_pacing_rate,
93 burst_tokens: INITIAL_UNPACED_BURST,
94 ideal_next_packet_send_time: ReleaseTime::Immediate,
95 initial_burst_size: INITIAL_UNPACED_BURST,
96 lumpy_tokens: 0,
97 pacing_limited: false,
98 }
99 }
100
101 pub fn get_next_release_time(&self) -> ReleaseDecision {
102 if !self.enabled {
103 return ReleaseDecision {
104 time: ReleaseTime::Immediate,
105 allow_burst: true,
106 };
107 }
108
109 let allow_burst = self.burst_tokens > 0 || self.lumpy_tokens > 0;
110 ReleaseDecision {
111 time: self.ideal_next_packet_send_time,
112 allow_burst,
113 }
114 }
115
116 #[cfg(feature = "qlog")]
117 pub fn state_str(&self) -> &'static str {
118 self.sender.state_str()
119 }
120
121 pub fn get_congestion_window(&self) -> usize {
122 self.sender.get_congestion_window()
123 }
124
125 pub fn on_packet_sent(
126 &mut self, sent_time: Instant, bytes_in_flight: usize,
127 packet_number: u64, bytes: usize, is_retransmissible: bool,
128 rtt_stats: &RttStats,
129 ) {
130 self.sender.on_packet_sent(
131 sent_time,
132 bytes_in_flight,
133 packet_number,
134 bytes,
135 is_retransmissible,
136 rtt_stats,
137 );
138
139 if !self.enabled || !is_retransmissible {
140 return;
141 }
142
143 if bytes_in_flight == 0 && !self.sender.is_in_recovery() {
145 self.burst_tokens = self
149 .initial_burst_size
150 .min(self.sender.get_congestion_window_in_packets());
151 }
152
153 if self.burst_tokens > 0 {
154 self.burst_tokens -= 1;
155 self.ideal_next_packet_send_time = ReleaseTime::Immediate;
156 self.pacing_limited = false;
157 return;
158 }
159
160 let delay = self
164 .pacing_rate(bytes_in_flight + bytes, rtt_stats)
165 .transfer_time(bytes);
166
167 if !self.pacing_limited || self.lumpy_tokens == 0 {
168 self.lumpy_tokens = 1.max(LUMPY_PACING_SIZE.min(
171 (self.sender.get_congestion_window_in_packets() as f64 *
172 LUMPY_PACING_CWND_FRACTION) as usize,
173 ));
174
175 if self.sender.bandwidth_estimate(rtt_stats) <
176 LUMPY_PACING_MIN_BANDWIDTH_KBPS
177 {
178 self.lumpy_tokens = 1;
181 }
182
183 if bytes_in_flight + bytes >= self.sender.get_congestion_window() {
184 self.lumpy_tokens = 1;
187 }
188 }
189
190 self.lumpy_tokens -= 1;
191 self.ideal_next_packet_send_time.set_max(sent_time);
192 self.ideal_next_packet_send_time.inc(delay);
193 self.pacing_limited = self.sender.can_send(bytes_in_flight + bytes);
195 }
196
197 #[allow(clippy::too_many_arguments)]
198 #[inline]
199 pub fn on_congestion_event(
200 &mut self, rtt_updated: bool, prior_in_flight: usize,
201 bytes_in_flight: usize, event_time: Instant, acked_packets: &[Acked],
202 lost_packets: &[Lost], least_unacked: u64, rtt_stats: &RttStats,
203 recovery_stats: &mut RecoveryStats,
204 ) {
205 self.sender.on_congestion_event(
206 rtt_updated,
207 prior_in_flight,
208 bytes_in_flight,
209 event_time,
210 acked_packets,
211 lost_packets,
212 least_unacked,
213 rtt_stats,
214 recovery_stats,
215 );
216
217 if !self.enabled {
218 return;
219 }
220
221 if !lost_packets.is_empty() {
222 self.burst_tokens = 0;
224 }
225
226 if let Some(max_pacing_rate) = self.max_pacing_rate {
227 if rtt_updated {
228 let max_rate = max_pacing_rate * 1.25f32;
229 let max_cwnd =
230 max_rate.to_bytes_per_period(rtt_stats.smoothed_rtt);
231 self.sender.limit_cwnd(max_cwnd as usize);
232 }
233 }
234 }
235
236 pub fn on_packet_neutered(&mut self, packet_number: u64) {
237 self.sender.on_packet_neutered(packet_number);
238 }
239
240 pub fn on_retransmission_timeout(&mut self, packets_retransmitted: bool) {
241 self.sender.on_retransmission_timeout(packets_retransmitted)
242 }
243
244 pub fn pacing_rate(
245 &self, bytes_in_flight: usize, rtt_stats: &RttStats,
246 ) -> Bandwidth {
247 let sender_rate = self.sender.pacing_rate(bytes_in_flight, rtt_stats);
248 match self.max_pacing_rate {
249 Some(rate) if self.enabled => rate.min(sender_rate),
250 _ => sender_rate,
251 }
252 }
253
254 pub fn bandwidth_estimate(&self, rtt_stats: &RttStats) -> Bandwidth {
255 self.sender.bandwidth_estimate(rtt_stats)
256 }
257
258 pub fn max_bandwidth(&self) -> Bandwidth {
259 self.sender.max_bandwidth()
260 }
261
262 pub fn on_app_limited(&mut self, bytes_in_flight: usize) {
263 self.pacing_limited = false;
264 self.sender.on_app_limited(bytes_in_flight);
265 }
266
267 pub fn update_mss(&mut self, new_mss: usize) {
268 self.sender.update_mss(new_mss)
269 }
270
271 #[cfg(feature = "qlog")]
272 pub fn ssthresh(&self) -> Option<u64> {
273 self.sender.ssthresh()
274 }
275
276 #[cfg(test)]
277 pub fn is_app_limited(&self, bytes_in_flight: usize) -> bool {
278 !self.is_cwnd_limited(bytes_in_flight)
279 }
280
281 #[cfg(test)]
282 fn is_cwnd_limited(&self, bytes_in_flight: usize) -> bool {
283 !self.pacing_limited && self.sender.is_cwnd_limited(bytes_in_flight)
284 }
285}