quiche/recovery/gcongestion/
pacer.rs

1// Copyright (c) 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5// Copyright (C) 2023, Cloudflare, Inc.
6// All rights reserved.
7//
8// Redistribution and use in source and binary forms, with or without
9// modification, are permitted provided that the following conditions are
10// met:
11//
12//     * Redistributions of source code must retain the above copyright notice,
13//       this list of conditions and the following disclaimer.
14//
15//     * Redistributions in binary form must reproduce the above copyright
16//       notice, this list of conditions and the following disclaimer in the
17//       documentation and/or other materials provided with the distribution.
18//
19// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
20// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
21// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
23// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
24// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
25// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
26// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
27// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
28// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
29// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30
31use std::time::Instant;
32
33use crate::recovery::gcongestion::bbr2::BBRv2;
34use crate::recovery::gcongestion::Bandwidth;
35use crate::recovery::gcongestion::CongestionControl;
36use crate::recovery::rtt::RttStats;
37use crate::recovery::RecoveryStats;
38use crate::recovery::ReleaseDecision;
39use crate::recovery::ReleaseTime;
40
41use super::Acked;
42use super::Lost;
43
/// Congestion window fraction that the pacing sender allows in bursts during
/// pacing.
///
/// The congestion window, in packets, is multiplied by this fraction to get
/// a candidate number of lumpy tokens, which is then capped by
/// [`LUMPY_PACING_SIZE`].
const LUMPY_PACING_CWND_FRACTION: f64 = 0.25;

/// Number of packets that the pacing sender allows in bursts during pacing.
/// This is ignored if a flow's estimated bandwidth is lower than 1200 kbps,
/// or if the sender is limited by the congestion window; in both cases only
/// a single packet is released at a time.
const LUMPY_PACING_SIZE: usize = 2;

/// The minimum estimated client bandwidth below which the pacing sender will
/// not allow bursts.
const LUMPY_PACING_MIN_BANDWIDTH_KBPS: Bandwidth =
    Bandwidth::from_kbits_per_second(1_200);

/// Configured maximum size of the burst coming out of quiescence.  The burst is
/// never larger than the current CWND in packets.
const INITIAL_UNPACED_BURST: usize = 10;
60
/// Wraps a [`BBRv2`] sender and decides when the next packet may be released,
/// optionally capping the pacing rate at a configured maximum.
#[derive(Debug)]
pub struct Pacer {
    /// Should this [`Pacer`] be making any release decisions?
    enabled: bool,
    /// Underlying sender
    sender: BBRv2,
    /// The maximum rate the [`Pacer`] will use.
    max_pacing_rate: Option<Bandwidth>,
    /// Number of unpaced packets to be sent before packets are delayed.
    burst_tokens: usize,
    /// When can the next packet be sent.
    ideal_next_packet_send_time: ReleaseTime,
    /// Number of burst tokens granted when the connection leaves quiescence
    /// (further limited by the congestion window in packets).
    initial_burst_size: usize,
    /// Number of unpaced packets to be sent before packets are delayed. This
    /// token is consumed after [`Self::burst_tokens`] ran out.
    lumpy_tokens: usize,
    /// Indicates whether pacing throttles the sending. If true, make up for
    /// lost time.
    pacing_limited: bool,
}
81
82impl Pacer {
83    /// Create a new [`Pacer`] with and underlying [`Congestion`]
84    /// implementation, and an optional throttling as specified by
85    /// `max_pacing_rate`.
86    pub(crate) fn new(
87        enabled: bool, congestion: BBRv2, max_pacing_rate: Option<Bandwidth>,
88    ) -> Self {
89        Pacer {
90            enabled,
91            sender: congestion,
92            max_pacing_rate,
93            burst_tokens: INITIAL_UNPACED_BURST,
94            ideal_next_packet_send_time: ReleaseTime::Immediate,
95            initial_burst_size: INITIAL_UNPACED_BURST,
96            lumpy_tokens: 0,
97            pacing_limited: false,
98        }
99    }
100
101    pub fn get_next_release_time(&self) -> ReleaseDecision {
102        if !self.enabled {
103            return ReleaseDecision {
104                time: ReleaseTime::Immediate,
105                allow_burst: true,
106            };
107        }
108
109        let allow_burst = self.burst_tokens > 0 || self.lumpy_tokens > 0;
110        ReleaseDecision {
111            time: self.ideal_next_packet_send_time,
112            allow_burst,
113        }
114    }
115
116    #[cfg(feature = "qlog")]
117    pub fn state_str(&self) -> &'static str {
118        self.sender.state_str()
119    }
120
121    pub fn get_congestion_window(&self) -> usize {
122        self.sender.get_congestion_window()
123    }
124
125    pub fn on_packet_sent(
126        &mut self, sent_time: Instant, bytes_in_flight: usize,
127        packet_number: u64, bytes: usize, is_retransmissible: bool,
128        rtt_stats: &RttStats,
129    ) {
130        self.sender.on_packet_sent(
131            sent_time,
132            bytes_in_flight,
133            packet_number,
134            bytes,
135            is_retransmissible,
136            rtt_stats,
137        );
138
139        if !self.enabled || !is_retransmissible {
140            return;
141        }
142
143        // If in recovery, the connection is not coming out of quiescence.
144        if bytes_in_flight == 0 && !self.sender.is_in_recovery() {
145            // Add more burst tokens anytime the connection is leaving quiescence,
146            // but limit it to the equivalent of a single bulk write,
147            // not exceeding the current CWND in packets.
148            self.burst_tokens = self
149                .initial_burst_size
150                .min(self.sender.get_congestion_window_in_packets());
151        }
152
153        if self.burst_tokens > 0 {
154            self.burst_tokens -= 1;
155            self.ideal_next_packet_send_time = ReleaseTime::Immediate;
156            self.pacing_limited = false;
157            return;
158        }
159
160        // The next packet should be sent as soon as the current packet has been
161        // transferred. PacingRate is based on bytes in flight including this
162        // packet.
163        let delay = self
164            .pacing_rate(bytes_in_flight + bytes, rtt_stats)
165            .transfer_time(bytes);
166
167        if !self.pacing_limited || self.lumpy_tokens == 0 {
168            // Reset lumpy_tokens_ if either application or cwnd throttles sending
169            // or token runs out.
170            self.lumpy_tokens = 1.max(LUMPY_PACING_SIZE.min(
171                (self.sender.get_congestion_window_in_packets() as f64 *
172                    LUMPY_PACING_CWND_FRACTION) as usize,
173            ));
174
175            if self.sender.bandwidth_estimate(rtt_stats) <
176                LUMPY_PACING_MIN_BANDWIDTH_KBPS
177            {
178                // Below 1.2Mbps, send 1 packet at once, because one full-sized
179                // packet is about 10ms of queueing.
180                self.lumpy_tokens = 1;
181            }
182
183            if bytes_in_flight + bytes >= self.sender.get_congestion_window() {
184                // Don't add lumpy_tokens if the congestion controller is CWND
185                // limited.
186                self.lumpy_tokens = 1;
187            }
188        }
189
190        self.lumpy_tokens -= 1;
191        self.ideal_next_packet_send_time.set_max(sent_time);
192        self.ideal_next_packet_send_time.inc(delay);
193        // Stop making up for lost time if underlying sender prevents sending.
194        self.pacing_limited = self.sender.can_send(bytes_in_flight + bytes);
195    }
196
197    #[allow(clippy::too_many_arguments)]
198    #[inline]
199    pub fn on_congestion_event(
200        &mut self, rtt_updated: bool, prior_in_flight: usize,
201        bytes_in_flight: usize, event_time: Instant, acked_packets: &[Acked],
202        lost_packets: &[Lost], least_unacked: u64, rtt_stats: &RttStats,
203        recovery_stats: &mut RecoveryStats,
204    ) {
205        self.sender.on_congestion_event(
206            rtt_updated,
207            prior_in_flight,
208            bytes_in_flight,
209            event_time,
210            acked_packets,
211            lost_packets,
212            least_unacked,
213            rtt_stats,
214            recovery_stats,
215        );
216
217        if !self.enabled {
218            return;
219        }
220
221        if !lost_packets.is_empty() {
222            // Clear any burst tokens when entering recovery.
223            self.burst_tokens = 0;
224        }
225
226        if let Some(max_pacing_rate) = self.max_pacing_rate {
227            if rtt_updated {
228                let max_rate = max_pacing_rate * 1.25f32;
229                let max_cwnd =
230                    max_rate.to_bytes_per_period(rtt_stats.smoothed_rtt);
231                self.sender.limit_cwnd(max_cwnd as usize);
232            }
233        }
234    }
235
236    pub fn on_packet_neutered(&mut self, packet_number: u64) {
237        self.sender.on_packet_neutered(packet_number);
238    }
239
240    pub fn on_retransmission_timeout(&mut self, packets_retransmitted: bool) {
241        self.sender.on_retransmission_timeout(packets_retransmitted)
242    }
243
244    pub fn pacing_rate(
245        &self, bytes_in_flight: usize, rtt_stats: &RttStats,
246    ) -> Bandwidth {
247        let sender_rate = self.sender.pacing_rate(bytes_in_flight, rtt_stats);
248        match self.max_pacing_rate {
249            Some(rate) if self.enabled => rate.min(sender_rate),
250            _ => sender_rate,
251        }
252    }
253
254    pub fn bandwidth_estimate(&self, rtt_stats: &RttStats) -> Bandwidth {
255        self.sender.bandwidth_estimate(rtt_stats)
256    }
257
258    pub fn max_bandwidth(&self) -> Bandwidth {
259        self.sender.max_bandwidth()
260    }
261
262    pub fn on_app_limited(&mut self, bytes_in_flight: usize) {
263        self.pacing_limited = false;
264        self.sender.on_app_limited(bytes_in_flight);
265    }
266
267    pub fn update_mss(&mut self, new_mss: usize) {
268        self.sender.update_mss(new_mss)
269    }
270
271    #[cfg(feature = "qlog")]
272    pub fn ssthresh(&self) -> Option<u64> {
273        self.sender.ssthresh()
274    }
275
276    #[cfg(test)]
277    pub fn is_app_limited(&self, bytes_in_flight: usize) -> bool {
278        !self.is_cwnd_limited(bytes_in_flight)
279    }
280
281    #[cfg(test)]
282    fn is_cwnd_limited(&self, bytes_in_flight: usize) -> bool {
283        !self.pacing_limited && self.sender.is_cwnd_limited(bytes_in_flight)
284    }
285}