quiche/recovery/congestion/
pacer.rs

1// Copyright (C) 2022, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! Pacer provides the timestamp for the next packet to be sent based on the
28//! current send_quantum, pacing rate and last updated time.
29//!
30//! It's a kind of leaky bucket algorithm (RFC9002, 7.7 Pacing) but it considers
31//! max burst (send_quantum, in bytes) and provide the same timestamp for the
32//! same sized packets (except last one) to be GSO friendly, assuming we send
33//! packets using multiple sendmsg(), a sendmmsg(), or sendmsg() with GSO
34//! without waiting for new I/O events.
35//!
36//! After sending a burst of packets, the next timestamp will be updated based
37//! on the current pacing rate. It will make actual timestamp sent and recorded
38//! timestamp (Sent.time_sent) as close as possible. If GSO is not used, it will
39//! still try to provide close timestamp if the send burst is implemented.
40
41use std::time::Duration;
42use std::time::Instant;
43
44#[derive(Debug)]
45pub struct Pacer {
46    /// Whether pacing is enabled.
47    enabled: bool,
48
49    /// Bucket capacity (bytes).
50    capacity: usize,
51
52    /// Bucket used (bytes).
53    used: usize,
54
55    /// Sending pacing rate (bytes/sec).
56    rate: u64,
57
58    /// Timestamp of the last packet sent time update.
59    last_update: Instant,
60
61    /// Timestamp of the next packet to be sent.
62    next_time: Instant,
63
64    /// Current MSS.
65    max_datagram_size: usize,
66
67    /// Last packet size.
68    last_packet_size: Option<usize>,
69
70    /// Interval to be added in next burst.
71    iv: Duration,
72
73    /// Max pacing rate (bytes/sec).
74    max_pacing_rate: Option<u64>,
75}
76
77impl Pacer {
78    pub fn new(
79        enabled: bool, capacity: usize, rate: u64, max_datagram_size: usize,
80        max_pacing_rate: Option<u64>,
81    ) -> Self {
82        // Round capacity to MSS.
83        let capacity = capacity / max_datagram_size * max_datagram_size;
84        let pacing_rate = if let Some(max_rate) = max_pacing_rate {
85            max_rate.min(rate)
86        } else {
87            rate
88        };
89
90        Pacer {
91            enabled,
92
93            capacity,
94
95            used: 0,
96
97            rate: pacing_rate,
98
99            last_update: Instant::now(),
100
101            next_time: Instant::now(),
102
103            max_datagram_size,
104
105            last_packet_size: None,
106
107            iv: Duration::ZERO,
108
109            max_pacing_rate,
110        }
111    }
112
113    /// Returns whether pacing is enabled.
114    pub fn enabled(&self) -> bool {
115        self.enabled
116    }
117
118    /// Returns the current pacing rate.
119    pub fn rate(&self) -> u64 {
120        self.rate
121    }
122
123    /// Returns max pacing rate.
124    pub fn max_pacing_rate(&self) -> Option<u64> {
125        self.max_pacing_rate
126    }
127
128    /// Updates the bucket capacity or pacing_rate.
129    pub fn update(&mut self, capacity: usize, rate: u64, now: Instant) {
130        let capacity = capacity / self.max_datagram_size * self.max_datagram_size;
131
132        if self.capacity != capacity {
133            self.reset(now);
134        }
135
136        self.capacity = capacity;
137
138        self.rate = if let Some(max_rate) = self.max_pacing_rate {
139            max_rate.min(rate)
140        } else {
141            rate
142        };
143    }
144
145    /// Resets the pacer for the next burst.
146    fn reset(&mut self, now: Instant) {
147        self.used = 0;
148
149        self.last_update = now;
150
151        self.next_time = self.next_time.max(now);
152
153        self.last_packet_size = None;
154
155        self.iv = Duration::ZERO;
156    }
157
158    /// Updates the timestamp for the packet to send.
159    pub fn send(&mut self, packet_size: usize, now: Instant) {
160        if self.rate() == 0 {
161            self.reset(now);
162
163            return;
164        }
165
166        if !self.iv.is_zero() {
167            self.next_time = self.next_time.max(now) + self.iv;
168
169            self.iv = Duration::ZERO;
170        }
171
172        let interval =
173            Duration::from_secs_f64(self.capacity as f64 / self.rate() as f64);
174
175        let elapsed = now.saturating_duration_since(self.last_update);
176
177        // If too old, reset it.
178        if elapsed > interval {
179            self.reset(now);
180        }
181
182        self.used += packet_size;
183
184        let same_size = if let Some(last_packet_size) = self.last_packet_size {
185            last_packet_size == packet_size
186        } else {
187            true
188        };
189
190        self.last_packet_size = Some(packet_size);
191
192        if self.used >= self.capacity || !same_size {
193            self.iv =
194                Duration::from_secs_f64(self.used as f64 / self.rate() as f64);
195
196            self.used = 0;
197
198            self.last_update = now;
199
200            self.last_packet_size = None;
201        };
202    }
203
204    /// Returns the timestamp for the next packet.
205    pub fn next_time(&self) -> Instant {
206        self.next_time
207    }
208}
209
210#[cfg(test)]
211mod tests {
212    use super::*;
213
214    #[test]
215    fn pacer_update() {
216        let datagram_size = 1200;
217        let max_burst = datagram_size * 10;
218        let pacing_rate = 100_000;
219
220        let mut p = Pacer::new(true, max_burst, pacing_rate, datagram_size, None);
221
222        let now = Instant::now();
223
224        // Send 6000 (half of max_burst) -> no timestamp change yet.
225        p.send(6000, now);
226
227        assert!(now.duration_since(p.next_time()) < Duration::from_millis(1));
228
229        // Send 6000 bytes -> max_burst filled.
230        p.send(6000, now);
231
232        assert!(now.duration_since(p.next_time()) < Duration::from_millis(1));
233
234        // Start of a new burst.
235        let now = now + Duration::from_millis(5);
236
237        // Send 1000 bytes and next_time is updated.
238        p.send(1000, now);
239
240        let interval = max_burst as f64 / pacing_rate as f64;
241
242        assert_eq!(p.next_time() - now, Duration::from_secs_f64(interval));
243    }
244
245    #[test]
246    /// Same as pacer_update() but adds some idle time between transfers to
247    /// trigger a reset.
248    fn pacer_idle() {
249        let datagram_size = 1200;
250        let max_burst = datagram_size * 10;
251        let pacing_rate = 100_000;
252
253        let mut p = Pacer::new(true, max_burst, pacing_rate, datagram_size, None);
254
255        let now = Instant::now();
256
257        // Send 6000 (half of max_burst) -> no timestamp change yet.
258        p.send(6000, now);
259
260        assert!(now.duration_since(p.next_time()) < Duration::from_millis(1));
261
262        // Sleep 200ms to reset the idle pacer (at least 120ms).
263        let now = now + Duration::from_millis(200);
264
265        // Send 6000 bytes -> idle reset and a new burst  isstarted.
266        p.send(6000, now);
267
268        assert_eq!(p.next_time(), now);
269    }
270
271    #[test]
272    fn pacer_set_max_pacing_rate() {
273        let datagram_size = 1200;
274        let max_burst = datagram_size * 10;
275        let pacing_rate = 100_000;
276        let max_pacing_rate = 50_000;
277
278        // Use the max_pacing_rate.
279        let mut p = Pacer::new(
280            true,
281            max_burst,
282            pacing_rate,
283            datagram_size,
284            Some(max_pacing_rate),
285        );
286
287        let now = Instant::now();
288
289        // Send 6000 (half of max_burst) -> no timestamp change yet.
290        p.send(6000, now);
291
292        assert!(now.duration_since(p.next_time()) < Duration::from_millis(1));
293
294        // Send 6000 bytes -> max_burst filled.
295        p.send(6000, now);
296
297        assert!(now.duration_since(p.next_time()) < Duration::from_millis(1));
298
299        // Start of a second burst.
300        let now = now + Duration::from_millis(5);
301        p.send(12000, now);
302
303        let second_burst_send_time = p.next_time();
304
305        let interval = max_burst as f64 / max_pacing_rate as f64;
306
307        assert_eq!(
308            second_burst_send_time - now,
309            Duration::from_secs_f64(interval)
310        );
311
312        // Start of third burst
313        let now = now + Duration::from_millis(5);
314
315        // Update pacer rate.
316        p.update(max_burst, 75_000, now);
317
318        p.send(12000, now);
319
320        let third_burst_send_time = p.next_time();
321
322        assert_eq!(
323            third_burst_send_time - second_burst_send_time,
324            Duration::from_secs_f64(interval)
325        );
326    }
327}