quiche/recovery/gcongestion/
pacer.rs1use std::time::Instant;
32
33use crate::recovery::gcongestion::Bandwidth;
34use crate::recovery::rtt::RttStats;
35use crate::recovery::RecoveryStats;
36use crate::recovery::ReleaseDecision;
37use crate::recovery::ReleaseTime;
38
39use super::Acked;
40use super::Congestion;
41use super::CongestionControl;
42use super::Lost;
43
44const LUMPY_PACING_CWND_FRACTION: f64 = 0.25;
47
48const LUMPY_PACING_SIZE: usize = 2;
51
52const LUMPY_PACING_MIN_BANDWIDTH_KBPS: Bandwidth =
55 Bandwidth::from_kbits_per_second(1_200);
56
57const INITIAL_UNPACED_BURST: usize = 10;
60
61#[derive(Debug)]
62pub struct Pacer {
63 enabled: bool,
65 sender: Congestion,
67 max_pacing_rate: Option<Bandwidth>,
69 burst_tokens: usize,
71 ideal_next_packet_send_time: ReleaseTime,
73 initial_burst_size: usize,
74 lumpy_tokens: usize,
77 pacing_limited: bool,
80}
81
82impl Pacer {
83 pub(crate) fn new(
87 enabled: bool, congestion: Congestion, max_pacing_rate: Option<Bandwidth>,
88 ) -> Self {
89 Pacer {
90 enabled,
91 sender: congestion,
92 max_pacing_rate,
93 burst_tokens: INITIAL_UNPACED_BURST,
94 ideal_next_packet_send_time: ReleaseTime::Immediate,
95 initial_burst_size: INITIAL_UNPACED_BURST,
96 lumpy_tokens: 0,
97 pacing_limited: false,
98 }
99 }
100
101 pub fn get_next_release_time(&self) -> ReleaseDecision {
102 if !self.enabled {
103 return ReleaseDecision {
104 time: ReleaseTime::Immediate,
105 allow_burst: true,
106 };
107 }
108
109 let allow_burst = self.burst_tokens > 0 || self.lumpy_tokens > 0;
110 ReleaseDecision {
111 time: self.ideal_next_packet_send_time,
112 allow_burst,
113 }
114 }
115}
116
117impl CongestionControl for Pacer {
118 #[cfg(feature = "qlog")]
119 fn state_str(&self) -> &'static str {
120 self.sender.state_str()
121 }
122
123 fn get_congestion_window(&self) -> usize {
124 self.sender.get_congestion_window()
125 }
126
127 fn get_congestion_window_in_packets(&self) -> usize {
128 self.sender.get_congestion_window_in_packets()
129 }
130
131 fn can_send(&self, bytes_in_flight: usize) -> bool {
132 self.sender.can_send(bytes_in_flight)
133 }
134
135 fn on_packet_sent(
136 &mut self, sent_time: Instant, bytes_in_flight: usize,
137 packet_number: u64, bytes: usize, is_retransmissible: bool,
138 rtt_stats: &RttStats,
139 ) {
140 self.sender.on_packet_sent(
141 sent_time,
142 bytes_in_flight,
143 packet_number,
144 bytes,
145 is_retransmissible,
146 rtt_stats,
147 );
148
149 if !self.enabled || !is_retransmissible {
150 return;
151 }
152
153 if bytes_in_flight == 0 && !self.sender.is_in_recovery() {
155 self.burst_tokens = self
159 .initial_burst_size
160 .min(self.sender.get_congestion_window_in_packets());
161 }
162
163 if self.burst_tokens > 0 {
164 self.burst_tokens -= 1;
165 self.ideal_next_packet_send_time = ReleaseTime::Immediate;
166 self.pacing_limited = false;
167 return;
168 }
169
170 let delay = self
174 .pacing_rate(bytes_in_flight + bytes, rtt_stats)
175 .transfer_time(bytes);
176
177 if !self.pacing_limited || self.lumpy_tokens == 0 {
178 self.lumpy_tokens = 1.max(LUMPY_PACING_SIZE.min(
181 (self.sender.get_congestion_window_in_packets() as f64 *
182 LUMPY_PACING_CWND_FRACTION) as usize,
183 ));
184
185 if self.sender.bandwidth_estimate(rtt_stats) <
186 LUMPY_PACING_MIN_BANDWIDTH_KBPS
187 {
188 self.lumpy_tokens = 1;
191 }
192
193 if bytes_in_flight + bytes >= self.sender.get_congestion_window() {
194 self.lumpy_tokens = 1;
197 }
198 }
199
200 self.lumpy_tokens -= 1;
201 self.ideal_next_packet_send_time.set_max(sent_time);
202 self.ideal_next_packet_send_time.inc(delay);
203 self.pacing_limited = self.sender.can_send(bytes_in_flight + bytes);
205 }
206
207 #[inline]
208 fn on_congestion_event(
209 &mut self, rtt_updated: bool, prior_in_flight: usize,
210 bytes_in_flight: usize, event_time: Instant, acked_packets: &[Acked],
211 lost_packets: &[Lost], least_unacked: u64, rtt_stats: &RttStats,
212 recovery_stats: &mut RecoveryStats,
213 ) {
214 self.sender.on_congestion_event(
215 rtt_updated,
216 prior_in_flight,
217 bytes_in_flight,
218 event_time,
219 acked_packets,
220 lost_packets,
221 least_unacked,
222 rtt_stats,
223 recovery_stats,
224 );
225
226 if !self.enabled {
227 return;
228 }
229
230 if !lost_packets.is_empty() {
231 self.burst_tokens = 0;
233 }
234
235 if let Some(max_pacing_rate) = self.max_pacing_rate {
236 if rtt_updated {
237 let max_rate = max_pacing_rate * 1.25f32;
238 let max_cwnd =
239 max_rate.to_bytes_per_period(rtt_stats.smoothed_rtt);
240 self.sender.limit_cwnd(max_cwnd as usize);
241 }
242 }
243 }
244
245 fn on_packet_neutered(&mut self, packet_number: u64) {
246 self.sender.on_packet_neutered(packet_number);
247 }
248
249 fn on_retransmission_timeout(&mut self, packets_retransmitted: bool) {
250 self.sender.on_retransmission_timeout(packets_retransmitted)
251 }
252
253 fn on_connection_migration(&mut self) {
254 self.sender.on_connection_migration()
255 }
256
257 fn is_cwnd_limited(&self, bytes_in_flight: usize) -> bool {
258 !self.pacing_limited && self.sender.is_cwnd_limited(bytes_in_flight)
259 }
260
261 fn is_in_recovery(&self) -> bool {
262 self.sender.is_in_recovery()
263 }
264
265 fn pacing_rate(
266 &self, bytes_in_flight: usize, rtt_stats: &RttStats,
267 ) -> Bandwidth {
268 let sender_rate = self.sender.pacing_rate(bytes_in_flight, rtt_stats);
269 match self.max_pacing_rate {
270 Some(rate) if self.enabled => rate.min(sender_rate),
271 _ => sender_rate,
272 }
273 }
274
275 fn bandwidth_estimate(&self, rtt_stats: &RttStats) -> Bandwidth {
276 self.sender.bandwidth_estimate(rtt_stats)
277 }
278
279 fn on_app_limited(&mut self, bytes_in_flight: usize) {
280 self.pacing_limited = false;
281 self.sender.on_app_limited(bytes_in_flight);
282 }
283
284 fn update_mss(&mut self, new_mss: usize) {
285 self.sender.update_mss(new_mss)
286 }
287
288 #[cfg(feature = "qlog")]
289 fn ssthresh(&self) -> Option<u64> {
290 self.sender.ssthresh()
291 }
292}