quiche/recovery/gcongestion/
pacer.rs

1// Copyright (c) 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5// Copyright (C) 2023, Cloudflare, Inc.
6// All rights reserved.
7//
8// Redistribution and use in source and binary forms, with or without
9// modification, are permitted provided that the following conditions are
10// met:
11//
12//     * Redistributions of source code must retain the above copyright notice,
13//       this list of conditions and the following disclaimer.
14//
15//     * Redistributions in binary form must reproduce the above copyright
16//       notice, this list of conditions and the following disclaimer in the
17//       documentation and/or other materials provided with the distribution.
18//
19// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
20// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
21// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
23// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
24// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
25// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
26// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
27// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
28// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
29// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30
31use std::time::Instant;
32
33use crate::recovery::gcongestion::Bandwidth;
34use crate::recovery::rtt::RttStats;
35use crate::recovery::RecoveryStats;
36use crate::recovery::ReleaseDecision;
37use crate::recovery::ReleaseTime;
38
39use super::Acked;
40use super::Congestion;
41use super::CongestionControl;
42use super::Lost;
43
/// Fraction of the congestion window (measured in packets) that bounds how
/// many packets the pacing sender may release back-to-back during pacing.
///
/// The product is truncated to a whole number of packets before it is
/// combined with `LUMPY_PACING_SIZE`.
const LUMPY_PACING_CWND_FRACTION: f64 = 0.25;
47
/// Upper bound on the number of packets the pacing sender allows in a burst
/// during pacing.
///
/// The bound is ignored (a single packet is released at a time) when the
/// flow's estimated bandwidth is lower than 1200 kbps, or when the sender is
/// congestion-window limited.
const LUMPY_PACING_SIZE: usize = 2;
51
52/// The minimum estimated client bandwidth below which the pacing sender will
53/// not allow bursts.
54const LUMPY_PACING_MIN_BANDWIDTH_KBPS: Bandwidth =
55    Bandwidth::from_kbits_per_second(1_200);
56
/// Configured maximum size, in packets, of the unpaced burst sent when the
/// connection starts or comes out of quiescence.
///
/// The burst is never larger than the current CWND in packets.
const INITIAL_UNPACED_BURST: usize = 10;
60
61#[derive(Debug)]
62pub struct Pacer {
63    /// Should this [`Pacer`] be making any release decisions?
64    enabled: bool,
65    /// Underlying sender
66    sender: Congestion,
67    /// The maximum rate the [`Pacer`] will use.
68    max_pacing_rate: Option<Bandwidth>,
69    /// Number of unpaced packets to be sent before packets are delayed.
70    burst_tokens: usize,
71    /// When can the next packet be sent.
72    ideal_next_packet_send_time: ReleaseTime,
73    initial_burst_size: usize,
74    /// Number of unpaced packets to be sent before packets are delayed. This
75    /// token is consumed after [`Self::burst_tokens`] ran out.
76    lumpy_tokens: usize,
77    /// Indicates whether pacing throttles the sending. If true, make up for
78    /// lost time.
79    pacing_limited: bool,
80}
81
82impl Pacer {
83    /// Create a new [`Pacer`] with and underlying [`Congestion`]
84    /// implementation, and an optional throttling as specified by
85    /// `max_pacing_rate`.
86    pub(crate) fn new(
87        enabled: bool, congestion: Congestion, max_pacing_rate: Option<Bandwidth>,
88    ) -> Self {
89        Pacer {
90            enabled,
91            sender: congestion,
92            max_pacing_rate,
93            burst_tokens: INITIAL_UNPACED_BURST,
94            ideal_next_packet_send_time: ReleaseTime::Immediate,
95            initial_burst_size: INITIAL_UNPACED_BURST,
96            lumpy_tokens: 0,
97            pacing_limited: false,
98        }
99    }
100
101    pub fn get_next_release_time(&self) -> ReleaseDecision {
102        if !self.enabled {
103            return ReleaseDecision {
104                time: ReleaseTime::Immediate,
105                allow_burst: true,
106            };
107        }
108
109        let allow_burst = self.burst_tokens > 0 || self.lumpy_tokens > 0;
110        ReleaseDecision {
111            time: self.ideal_next_packet_send_time,
112            allow_burst,
113        }
114    }
115}
116
117impl CongestionControl for Pacer {
118    #[cfg(feature = "qlog")]
119    fn state_str(&self) -> &'static str {
120        self.sender.state_str()
121    }
122
123    fn get_congestion_window(&self) -> usize {
124        self.sender.get_congestion_window()
125    }
126
127    fn get_congestion_window_in_packets(&self) -> usize {
128        self.sender.get_congestion_window_in_packets()
129    }
130
131    fn can_send(&self, bytes_in_flight: usize) -> bool {
132        self.sender.can_send(bytes_in_flight)
133    }
134
135    fn on_packet_sent(
136        &mut self, sent_time: Instant, bytes_in_flight: usize,
137        packet_number: u64, bytes: usize, is_retransmissible: bool,
138        rtt_stats: &RttStats,
139    ) {
140        self.sender.on_packet_sent(
141            sent_time,
142            bytes_in_flight,
143            packet_number,
144            bytes,
145            is_retransmissible,
146            rtt_stats,
147        );
148
149        if !self.enabled || !is_retransmissible {
150            return;
151        }
152
153        // If in recovery, the connection is not coming out of quiescence.
154        if bytes_in_flight == 0 && !self.sender.is_in_recovery() {
155            // Add more burst tokens anytime the connection is leaving quiescence,
156            // but limit it to the equivalent of a single bulk write,
157            // not exceeding the current CWND in packets.
158            self.burst_tokens = self
159                .initial_burst_size
160                .min(self.sender.get_congestion_window_in_packets());
161        }
162
163        if self.burst_tokens > 0 {
164            self.burst_tokens -= 1;
165            self.ideal_next_packet_send_time = ReleaseTime::Immediate;
166            self.pacing_limited = false;
167            return;
168        }
169
170        // The next packet should be sent as soon as the current packet has been
171        // transferred. PacingRate is based on bytes in flight including this
172        // packet.
173        let delay = self
174            .pacing_rate(bytes_in_flight + bytes, rtt_stats)
175            .transfer_time(bytes);
176
177        if !self.pacing_limited || self.lumpy_tokens == 0 {
178            // Reset lumpy_tokens_ if either application or cwnd throttles sending
179            // or token runs out.
180            self.lumpy_tokens = 1.max(LUMPY_PACING_SIZE.min(
181                (self.sender.get_congestion_window_in_packets() as f64 *
182                    LUMPY_PACING_CWND_FRACTION) as usize,
183            ));
184
185            if self.sender.bandwidth_estimate(rtt_stats) <
186                LUMPY_PACING_MIN_BANDWIDTH_KBPS
187            {
188                // Below 1.2Mbps, send 1 packet at once, because one full-sized
189                // packet is about 10ms of queueing.
190                self.lumpy_tokens = 1;
191            }
192
193            if bytes_in_flight + bytes >= self.sender.get_congestion_window() {
194                // Don't add lumpy_tokens if the congestion controller is CWND
195                // limited.
196                self.lumpy_tokens = 1;
197            }
198        }
199
200        self.lumpy_tokens -= 1;
201        self.ideal_next_packet_send_time.set_max(sent_time);
202        self.ideal_next_packet_send_time.inc(delay);
203        // Stop making up for lost time if underlying sender prevents sending.
204        self.pacing_limited = self.sender.can_send(bytes_in_flight + bytes);
205    }
206
207    #[inline]
208    fn on_congestion_event(
209        &mut self, rtt_updated: bool, prior_in_flight: usize,
210        bytes_in_flight: usize, event_time: Instant, acked_packets: &[Acked],
211        lost_packets: &[Lost], least_unacked: u64, rtt_stats: &RttStats,
212        recovery_stats: &mut RecoveryStats,
213    ) {
214        self.sender.on_congestion_event(
215            rtt_updated,
216            prior_in_flight,
217            bytes_in_flight,
218            event_time,
219            acked_packets,
220            lost_packets,
221            least_unacked,
222            rtt_stats,
223            recovery_stats,
224        );
225
226        if !self.enabled {
227            return;
228        }
229
230        if !lost_packets.is_empty() {
231            // Clear any burst tokens when entering recovery.
232            self.burst_tokens = 0;
233        }
234
235        if let Some(max_pacing_rate) = self.max_pacing_rate {
236            if rtt_updated {
237                let max_rate = max_pacing_rate * 1.25f32;
238                let max_cwnd =
239                    max_rate.to_bytes_per_period(rtt_stats.smoothed_rtt);
240                self.sender.limit_cwnd(max_cwnd as usize);
241            }
242        }
243    }
244
245    fn on_packet_neutered(&mut self, packet_number: u64) {
246        self.sender.on_packet_neutered(packet_number);
247    }
248
249    fn on_retransmission_timeout(&mut self, packets_retransmitted: bool) {
250        self.sender.on_retransmission_timeout(packets_retransmitted)
251    }
252
253    fn on_connection_migration(&mut self) {
254        self.sender.on_connection_migration()
255    }
256
257    fn is_cwnd_limited(&self, bytes_in_flight: usize) -> bool {
258        !self.pacing_limited && self.sender.is_cwnd_limited(bytes_in_flight)
259    }
260
261    fn is_in_recovery(&self) -> bool {
262        self.sender.is_in_recovery()
263    }
264
265    fn pacing_rate(
266        &self, bytes_in_flight: usize, rtt_stats: &RttStats,
267    ) -> Bandwidth {
268        let sender_rate = self.sender.pacing_rate(bytes_in_flight, rtt_stats);
269        match self.max_pacing_rate {
270            Some(rate) if self.enabled => rate.min(sender_rate),
271            _ => sender_rate,
272        }
273    }
274
275    fn bandwidth_estimate(&self, rtt_stats: &RttStats) -> Bandwidth {
276        self.sender.bandwidth_estimate(rtt_stats)
277    }
278
279    fn on_app_limited(&mut self, bytes_in_flight: usize) {
280        self.pacing_limited = false;
281        self.sender.on_app_limited(bytes_in_flight);
282    }
283
284    fn update_mss(&mut self, new_mss: usize) {
285        self.sender.update_mss(new_mss)
286    }
287
288    #[cfg(feature = "qlog")]
289    fn ssthresh(&self) -> Option<u64> {
290        self.sender.ssthresh()
291    }
292}