quiche/recovery/gcongestion/
pacer.rs1use std::time::Instant;
32
33use crate::recovery::rtt::RttStats;
34use crate::recovery::ReleaseDecision;
35use crate::recovery::ReleaseTime;
36
37use super::bandwidth::Bandwidth;
38use super::Acked;
39use super::Congestion;
40use super::CongestionControl;
41use super::Lost;
42
43const LUMPY_PACING_CWND_FRACTION: f64 = 0.25;
46
47const LUMPY_PACING_SIZE: usize = 2;
50
51const LUMPY_PACING_MIN_BANDWIDTH_KBPS: Bandwidth =
54 Bandwidth::from_kbits_per_second(1_200);
55
56const INITIAL_UNPACED_BURST: usize = 10;
59
60#[derive(Debug)]
61pub struct Pacer {
62 enabled: bool,
64 sender: Congestion,
66 max_pacing_rate: Option<Bandwidth>,
68 burst_tokens: usize,
70 ideal_next_packet_send_time: ReleaseTime,
72 initial_burst_size: usize,
73 lumpy_tokens: usize,
76 pacing_limited: bool,
79}
80
81impl Pacer {
82 pub(crate) fn new(
86 enabled: bool, congestion: Congestion, max_pacing_rate: Option<Bandwidth>,
87 ) -> Self {
88 Pacer {
89 enabled,
90 sender: congestion,
91 max_pacing_rate,
92 burst_tokens: INITIAL_UNPACED_BURST,
93 ideal_next_packet_send_time: ReleaseTime::Immediate,
94 initial_burst_size: INITIAL_UNPACED_BURST,
95 lumpy_tokens: 0,
96 pacing_limited: false,
97 }
98 }
99
100 pub fn get_next_release_time(&self) -> ReleaseDecision {
101 if !self.enabled {
102 return ReleaseDecision {
103 time: ReleaseTime::Immediate,
104 allow_burst: true,
105 };
106 }
107
108 let allow_burst = self.burst_tokens > 0 || self.lumpy_tokens > 0;
109 ReleaseDecision {
110 time: self.ideal_next_packet_send_time,
111 allow_burst,
112 }
113 }
114}
115
116impl CongestionControl for Pacer {
117 fn get_congestion_window(&self) -> usize {
118 self.sender.get_congestion_window()
119 }
120
121 fn get_congestion_window_in_packets(&self) -> usize {
122 self.sender.get_congestion_window_in_packets()
123 }
124
125 fn can_send(&self, bytes_in_flight: usize) -> bool {
126 self.sender.can_send(bytes_in_flight)
127 }
128
129 fn on_packet_sent(
130 &mut self, sent_time: Instant, bytes_in_flight: usize,
131 packet_number: u64, bytes: usize, is_retransmissible: bool,
132 rtt_stats: &RttStats,
133 ) {
134 self.sender.on_packet_sent(
135 sent_time,
136 bytes_in_flight,
137 packet_number,
138 bytes,
139 is_retransmissible,
140 rtt_stats,
141 );
142
143 if !self.enabled || !is_retransmissible {
144 return;
145 }
146
147 if bytes_in_flight == 0 && !self.sender.is_in_recovery() {
149 self.burst_tokens = self
153 .initial_burst_size
154 .min(self.sender.get_congestion_window_in_packets());
155 }
156
157 if self.burst_tokens > 0 {
158 self.burst_tokens -= 1;
159 self.ideal_next_packet_send_time = ReleaseTime::Immediate;
160 self.pacing_limited = false;
161 return;
162 }
163
164 let delay = self
168 .pacing_rate(bytes_in_flight + bytes, rtt_stats)
169 .transfer_time(bytes);
170
171 if !self.pacing_limited || self.lumpy_tokens == 0 {
172 self.lumpy_tokens = 1.max(LUMPY_PACING_SIZE.min(
175 (self.sender.get_congestion_window_in_packets() as f64 *
176 LUMPY_PACING_CWND_FRACTION) as usize,
177 ));
178
179 if self.sender.bandwidth_estimate(rtt_stats) <
180 LUMPY_PACING_MIN_BANDWIDTH_KBPS
181 {
182 self.lumpy_tokens = 1;
185 }
186
187 if bytes_in_flight + bytes >= self.sender.get_congestion_window() {
188 self.lumpy_tokens = 1;
191 }
192 }
193
194 self.lumpy_tokens -= 1;
195 self.ideal_next_packet_send_time.set_max(sent_time);
196 self.ideal_next_packet_send_time.inc(delay);
197 self.pacing_limited = self.sender.can_send(bytes_in_flight + bytes);
199 }
200
201 #[inline]
202 fn on_congestion_event(
203 &mut self, rtt_updated: bool, prior_in_flight: usize,
204 bytes_in_flight: usize, event_time: Instant, acked_packets: &[Acked],
205 lost_packets: &[Lost], least_unacked: u64, rtt_stats: &RttStats,
206 ) {
207 self.sender.on_congestion_event(
208 rtt_updated,
209 prior_in_flight,
210 bytes_in_flight,
211 event_time,
212 acked_packets,
213 lost_packets,
214 least_unacked,
215 rtt_stats,
216 );
217
218 if !self.enabled {
219 return;
220 }
221
222 if !lost_packets.is_empty() {
223 self.burst_tokens = 0;
225 }
226
227 if let Some(max_pacing_rate) = self.max_pacing_rate {
228 if rtt_updated {
229 let max_rate = max_pacing_rate * 1.25f32;
230 let max_cwnd =
231 max_rate.to_bytes_per_period(rtt_stats.smoothed_rtt);
232 self.sender.limit_cwnd(max_cwnd as usize);
233 }
234 }
235 }
236
237 fn on_packet_neutered(&mut self, packet_number: u64) {
238 self.sender.on_packet_neutered(packet_number);
239 }
240
241 fn on_retransmission_timeout(&mut self, packets_retransmitted: bool) {
242 self.sender.on_retransmission_timeout(packets_retransmitted)
243 }
244
245 fn on_connection_migration(&mut self) {
246 self.sender.on_connection_migration()
247 }
248
249 fn is_cwnd_limited(&self, bytes_in_flight: usize) -> bool {
250 !self.pacing_limited && self.sender.is_cwnd_limited(bytes_in_flight)
251 }
252
253 fn is_in_recovery(&self) -> bool {
254 self.sender.is_in_recovery()
255 }
256
257 fn pacing_rate(
258 &self, bytes_in_flight: usize, rtt_stats: &RttStats,
259 ) -> Bandwidth {
260 let sender_rate = self.sender.pacing_rate(bytes_in_flight, rtt_stats);
261 match self.max_pacing_rate {
262 Some(rate) if self.enabled => rate.min(sender_rate),
263 _ => sender_rate,
264 }
265 }
266
267 fn bandwidth_estimate(&self, rtt_stats: &RttStats) -> Bandwidth {
268 self.sender.bandwidth_estimate(rtt_stats)
269 }
270
271 fn on_app_limited(&mut self, bytes_in_flight: usize) {
272 self.pacing_limited = false;
273 self.sender.on_app_limited(bytes_in_flight);
274 }
275
276 fn update_mss(&mut self, new_mss: usize) {
277 self.sender.update_mss(new_mss)
278 }
279
280 #[cfg(feature = "qlog")]
281 fn ssthresh(&self) -> Option<u64> {
282 self.sender.ssthresh()
283 }
284}