Skip to main content

quiche/recovery/gcongestion/
pacer.rs

1// Copyright (c) 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5// Copyright (C) 2023, Cloudflare, Inc.
6// All rights reserved.
7//
8// Redistribution and use in source and binary forms, with or without
9// modification, are permitted provided that the following conditions are
10// met:
11//
12//     * Redistributions of source code must retain the above copyright notice,
13//       this list of conditions and the following disclaimer.
14//
15//     * Redistributions in binary form must reproduce the above copyright
16//       notice, this list of conditions and the following disclaimer in the
17//       documentation and/or other materials provided with the distribution.
18//
19// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
20// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
21// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
23// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
24// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
25// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
26// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
27// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
28// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
29// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30
31use std::time::Instant;
32
33use crate::recovery::gcongestion::bbr2::BBRv2;
34use crate::recovery::gcongestion::Bandwidth;
35use crate::recovery::gcongestion::CongestionControl;
36use crate::recovery::rtt::RttStats;
37use crate::recovery::RecoveryStats;
38use crate::recovery::ReleaseDecision;
39use crate::recovery::ReleaseTime;
40
41use super::Acked;
42use super::Lost;
43
/// Fraction of the congestion window (measured in packets) that the pacing
/// sender allows to be sent as a burst while pacing. The resulting token
/// count is further capped by [`LUMPY_PACING_SIZE`].
const LUMPY_PACING_CWND_FRACTION: f64 = 0.25;

/// Maximum number of packets that the pacing sender allows in a burst during
/// pacing. This is ignored (a single packet is used instead) if a flow's
/// estimated bandwidth is lower than [`LUMPY_PACING_MIN_BANDWIDTH_KBPS`].
const LUMPY_PACING_SIZE: usize = 2;

/// The minimum estimated client bandwidth (1200 kbps) below which the pacing
/// sender will not allow bursts.
const LUMPY_PACING_MIN_BANDWIDTH_KBPS: Bandwidth =
    Bandwidth::from_kbits_per_second(1_200);

/// Configured maximum size, in packets, of the burst coming out of
/// quiescence. The burst is never larger than the current CWND in packets.
const INITIAL_UNPACED_BURST: usize = 10;
60
/// Pacing layer wrapped around a [`BBRv2`] congestion controller.
///
/// Every congestion-control callback is forwarded to the wrapped sender. When
/// pacing is enabled, the [`Pacer`] additionally tracks burst allowances and
/// the time at which the next packet should be released.
#[derive(Debug)]
pub struct Pacer {
    /// Should this [`Pacer`] be making any release decisions?
    enabled: bool,
    /// Underlying sender
    sender: BBRv2,
    /// The maximum rate the [`Pacer`] will use.
    max_pacing_rate: Option<Bandwidth>,
    /// Number of unpaced packets to be sent before packets are delayed.
    burst_tokens: usize,
    /// When can the next packet be sent.
    ideal_next_packet_send_time: ReleaseTime,
    /// Upper bound used when refilling [`Self::burst_tokens`] after a packet
    /// is sent with no bytes in flight while the sender is not in recovery;
    /// the refill is also capped by the congestion window in packets.
    initial_burst_size: usize,
    /// Number of unpaced packets to be sent before packets are delayed. This
    /// token is consumed after [`Self::burst_tokens`] ran out.
    lumpy_tokens: usize,
    /// Indicates whether pacing throttles the sending. If true, make up for
    /// lost time.
    pacing_limited: bool,
}
81
82impl Pacer {
83    /// Create a new [`Pacer`] with and underlying [`Congestion`]
84    /// implementation, and an optional throttling as specified by
85    /// `max_pacing_rate`.
86    pub(crate) fn new(
87        enabled: bool, congestion: BBRv2, max_pacing_rate: Option<Bandwidth>,
88    ) -> Self {
89        Pacer {
90            enabled,
91            sender: congestion,
92            max_pacing_rate,
93            burst_tokens: INITIAL_UNPACED_BURST,
94            ideal_next_packet_send_time: ReleaseTime::Immediate,
95            initial_burst_size: INITIAL_UNPACED_BURST,
96            lumpy_tokens: 0,
97            pacing_limited: false,
98        }
99    }
100
101    pub fn get_next_release_time(&self) -> ReleaseDecision {
102        if !self.enabled {
103            return ReleaseDecision {
104                time: ReleaseTime::Immediate,
105                allow_burst: true,
106            };
107        }
108
109        let allow_burst = self.burst_tokens > 0 || self.lumpy_tokens > 0;
110        ReleaseDecision {
111            time: self.ideal_next_packet_send_time,
112            allow_burst,
113        }
114    }
115
116    #[cfg(feature = "qlog")]
117    pub fn state_str(&self) -> &'static str {
118        self.sender.state_str()
119    }
120
121    pub fn get_congestion_window(&self) -> usize {
122        self.sender.get_congestion_window()
123    }
124
125    pub fn on_packet_sent(
126        &mut self, sent_time: Instant, bytes_in_flight: usize,
127        packet_number: u64, bytes: usize, is_retransmissible: bool,
128        rtt_stats: &RttStats,
129    ) {
130        self.sender.on_packet_sent(
131            sent_time,
132            bytes_in_flight,
133            packet_number,
134            bytes,
135            is_retransmissible,
136        );
137
138        if !self.enabled || !is_retransmissible {
139            return;
140        }
141
142        // If in recovery, the connection is not coming out of quiescence.
143        if bytes_in_flight == 0 && !self.sender.is_in_recovery() {
144            // Add more burst tokens anytime the connection is leaving quiescence,
145            // but limit it to the equivalent of a single bulk write,
146            // not exceeding the current CWND in packets.
147            self.burst_tokens = self
148                .initial_burst_size
149                .min(self.sender.get_congestion_window_in_packets());
150        }
151
152        if self.burst_tokens > 0 {
153            self.burst_tokens -= 1;
154            self.ideal_next_packet_send_time = ReleaseTime::Immediate;
155            self.pacing_limited = false;
156            return;
157        }
158
159        // The next packet should be sent as soon as the current packet has been
160        // transferred. PacingRate is based on bytes in flight including this
161        // packet.
162        let delay = self
163            .pacing_rate(bytes_in_flight + bytes, rtt_stats)
164            .transfer_time(bytes);
165
166        if !self.pacing_limited || self.lumpy_tokens == 0 {
167            // Reset lumpy_tokens_ if either application or cwnd throttles sending
168            // or token runs out.
169            self.lumpy_tokens = 1.max(LUMPY_PACING_SIZE.min(
170                (self.sender.get_congestion_window_in_packets() as f64 *
171                    LUMPY_PACING_CWND_FRACTION) as usize,
172            ));
173
174            if self.sender.bandwidth_estimate(rtt_stats) <
175                LUMPY_PACING_MIN_BANDWIDTH_KBPS
176            {
177                // Below 1.2Mbps, send 1 packet at once, because one full-sized
178                // packet is about 10ms of queueing.
179                self.lumpy_tokens = 1;
180            }
181
182            if bytes_in_flight + bytes >= self.sender.get_congestion_window() {
183                // Don't add lumpy_tokens if the congestion controller is CWND
184                // limited.
185                self.lumpy_tokens = 1;
186            }
187        }
188
189        self.lumpy_tokens -= 1;
190        self.ideal_next_packet_send_time.set_max(sent_time);
191        self.ideal_next_packet_send_time.inc(delay);
192        // Stop making up for lost time if underlying sender prevents sending.
193        self.pacing_limited = self.sender.can_send(bytes_in_flight + bytes);
194    }
195
196    #[allow(clippy::too_many_arguments)]
197    #[inline]
198    pub fn on_congestion_event(
199        &mut self, rtt_updated: bool, prior_in_flight: usize,
200        bytes_in_flight: usize, event_time: Instant, acked_packets: &[Acked],
201        lost_packets: &[Lost], least_unacked: u64, rtt_stats: &RttStats,
202        recovery_stats: &mut RecoveryStats,
203    ) {
204        self.sender.on_congestion_event(
205            rtt_updated,
206            prior_in_flight,
207            bytes_in_flight,
208            event_time,
209            acked_packets,
210            lost_packets,
211            least_unacked,
212            rtt_stats,
213            recovery_stats,
214        );
215
216        if !self.enabled {
217            return;
218        }
219
220        if !lost_packets.is_empty() {
221            // Clear any burst tokens when entering recovery.
222            self.burst_tokens = 0;
223        }
224
225        if let Some(max_pacing_rate) = self.max_pacing_rate {
226            if rtt_updated {
227                let max_rate = max_pacing_rate * 1.25f32;
228                let max_cwnd =
229                    max_rate.to_bytes_per_period(rtt_stats.smoothed_rtt);
230                self.sender.limit_cwnd(max_cwnd as usize);
231            }
232        }
233    }
234
235    pub fn on_packet_neutered(&mut self, packet_number: u64) {
236        self.sender.on_packet_neutered(packet_number);
237    }
238
239    pub fn on_retransmission_timeout(&mut self, packets_retransmitted: bool) {
240        self.sender.on_retransmission_timeout(packets_retransmitted)
241    }
242
243    pub fn pacing_rate(
244        &self, bytes_in_flight: usize, rtt_stats: &RttStats,
245    ) -> Bandwidth {
246        let sender_rate = self.sender.pacing_rate(bytes_in_flight, rtt_stats);
247        match self.max_pacing_rate {
248            Some(rate) if self.enabled => rate.min(sender_rate),
249            _ => sender_rate,
250        }
251    }
252
253    pub fn bandwidth_estimate(&self, rtt_stats: &RttStats) -> Bandwidth {
254        self.sender.bandwidth_estimate(rtt_stats)
255    }
256
257    pub fn max_bandwidth(&self) -> Bandwidth {
258        self.sender.max_bandwidth()
259    }
260
261    pub fn on_app_limited(&mut self, bytes_in_flight: usize) {
262        self.pacing_limited = false;
263        self.sender.on_app_limited(bytes_in_flight);
264    }
265
266    pub fn update_mss(&mut self, new_mss: usize) {
267        self.sender.update_mss(new_mss)
268    }
269
270    #[cfg(feature = "qlog")]
271    pub fn ssthresh(&self) -> Option<u64> {
272        self.sender.ssthresh()
273    }
274
275    #[cfg(test)]
276    pub fn is_app_limited(&self, bytes_in_flight: usize) -> bool {
277        !self.is_cwnd_limited(bytes_in_flight)
278    }
279
280    #[cfg(test)]
281    fn is_cwnd_limited(&self, bytes_in_flight: usize) -> bool {
282        !self.pacing_limited && self.sender.is_cwnd_limited(bytes_in_flight)
283    }
284}