quiche/recovery/gcongestion/
pacer.rs

1// Copyright (c) 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5// Copyright (C) 2023, Cloudflare, Inc.
6// All rights reserved.
7//
8// Redistribution and use in source and binary forms, with or without
9// modification, are permitted provided that the following conditions are
10// met:
11//
12//     * Redistributions of source code must retain the above copyright notice,
13//       this list of conditions and the following disclaimer.
14//
15//     * Redistributions in binary form must reproduce the above copyright
16//       notice, this list of conditions and the following disclaimer in the
17//       documentation and/or other materials provided with the distribution.
18//
19// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
20// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
21// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
23// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
24// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
25// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
26// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
27// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
28// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
29// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30
31use std::time::Instant;
32
33use crate::recovery::rtt::RttStats;
34use crate::recovery::ReleaseDecision;
35use crate::recovery::ReleaseTime;
36
37use super::bandwidth::Bandwidth;
38use super::Acked;
39use super::Congestion;
40use super::CongestionControl;
41use super::Lost;
42
43/// Congestion window fraction that the pacing sender allows in bursts during
44/// pacing.
45const LUMPY_PACING_CWND_FRACTION: f64 = 0.25;
46
47/// Number of packets that the pacing sender allows in bursts during pacing.
48/// This is ignored if a flow's estimated bandwidth is lower than 1200 kbps.
49const LUMPY_PACING_SIZE: usize = 2;
50
51/// The minimum estimated client bandwidth below which the pacing sender will
52/// not allow bursts.
53const LUMPY_PACING_MIN_BANDWIDTH_KBPS: Bandwidth =
54    Bandwidth::from_kbits_per_second(1_200);
55
56/// Configured maximum size of the burst coming out of quiescence.  The burst is
57/// never larger than the current CWND in packets.
58const INITIAL_UNPACED_BURST: usize = 10;
59
60#[derive(Debug)]
61pub struct Pacer {
62    /// Should this [`Pacer`] be making any release decisions?
63    enabled: bool,
64    /// Underlying sender
65    sender: Congestion,
66    /// The maximum rate the [`Pacer`] will use.
67    max_pacing_rate: Option<Bandwidth>,
68    /// Number of unpaced packets to be sent before packets are delayed.
69    burst_tokens: usize,
70    /// When can the next packet be sent.
71    ideal_next_packet_send_time: ReleaseTime,
72    initial_burst_size: usize,
73    /// Number of unpaced packets to be sent before packets are delayed. This
74    /// token is consumed after [`burst_tokens`] ran out.
75    lumpy_tokens: usize,
76    /// Indicates whether pacing throttles the sending. If true, make up for
77    /// lost time.
78    pacing_limited: bool,
79}
80
81impl Pacer {
82    /// Create a new [`Pacer`] with and underlying [`Congestion`]
83    /// implementation, and an optional throttling as specified by
84    /// [`max_pacing_rate`].
85    pub(crate) fn new(
86        enabled: bool, congestion: Congestion, max_pacing_rate: Option<Bandwidth>,
87    ) -> Self {
88        Pacer {
89            enabled,
90            sender: congestion,
91            max_pacing_rate,
92            burst_tokens: INITIAL_UNPACED_BURST,
93            ideal_next_packet_send_time: ReleaseTime::Immediate,
94            initial_burst_size: INITIAL_UNPACED_BURST,
95            lumpy_tokens: 0,
96            pacing_limited: false,
97        }
98    }
99
100    pub fn get_next_release_time(&self) -> ReleaseDecision {
101        if !self.enabled {
102            return ReleaseDecision {
103                time: ReleaseTime::Immediate,
104                allow_burst: true,
105            };
106        }
107
108        let allow_burst = self.burst_tokens > 0 || self.lumpy_tokens > 0;
109        ReleaseDecision {
110            time: self.ideal_next_packet_send_time,
111            allow_burst,
112        }
113    }
114}
115
116impl CongestionControl for Pacer {
117    fn get_congestion_window(&self) -> usize {
118        self.sender.get_congestion_window()
119    }
120
121    fn get_congestion_window_in_packets(&self) -> usize {
122        self.sender.get_congestion_window_in_packets()
123    }
124
125    fn can_send(&self, bytes_in_flight: usize) -> bool {
126        self.sender.can_send(bytes_in_flight)
127    }
128
129    fn on_packet_sent(
130        &mut self, sent_time: Instant, bytes_in_flight: usize,
131        packet_number: u64, bytes: usize, is_retransmissible: bool,
132        rtt_stats: &RttStats,
133    ) {
134        self.sender.on_packet_sent(
135            sent_time,
136            bytes_in_flight,
137            packet_number,
138            bytes,
139            is_retransmissible,
140            rtt_stats,
141        );
142
143        if !self.enabled || !is_retransmissible {
144            return;
145        }
146
147        // If in recovery, the connection is not coming out of quiescence.
148        if bytes_in_flight == 0 && !self.sender.is_in_recovery() {
149            // Add more burst tokens anytime the connection is leaving quiescence,
150            // but limit it to the equivalent of a single bulk write,
151            // not exceeding the current CWND in packets.
152            self.burst_tokens = self
153                .initial_burst_size
154                .min(self.sender.get_congestion_window_in_packets());
155        }
156
157        if self.burst_tokens > 0 {
158            self.burst_tokens -= 1;
159            self.ideal_next_packet_send_time = ReleaseTime::Immediate;
160            self.pacing_limited = false;
161            return;
162        }
163
164        // The next packet should be sent as soon as the current packet has been
165        // transferred. PacingRate is based on bytes in flight including this
166        // packet.
167        let delay = self
168            .pacing_rate(bytes_in_flight + bytes, rtt_stats)
169            .transfer_time(bytes);
170
171        if !self.pacing_limited || self.lumpy_tokens == 0 {
172            // Reset lumpy_tokens_ if either application or cwnd throttles sending
173            // or token runs out.
174            self.lumpy_tokens = 1.max(LUMPY_PACING_SIZE.min(
175                (self.sender.get_congestion_window_in_packets() as f64 *
176                    LUMPY_PACING_CWND_FRACTION) as usize,
177            ));
178
179            if self.sender.bandwidth_estimate(rtt_stats) <
180                LUMPY_PACING_MIN_BANDWIDTH_KBPS
181            {
182                // Below 1.2Mbps, send 1 packet at once, because one full-sized
183                // packet is about 10ms of queueing.
184                self.lumpy_tokens = 1;
185            }
186
187            if bytes_in_flight + bytes >= self.sender.get_congestion_window() {
188                // Don't add lumpy_tokens if the congestion controller is CWND
189                // limited.
190                self.lumpy_tokens = 1;
191            }
192        }
193
194        self.lumpy_tokens -= 1;
195        self.ideal_next_packet_send_time.set_max(sent_time);
196        self.ideal_next_packet_send_time.inc(delay);
197        // Stop making up for lost time if underlying sender prevents sending.
198        self.pacing_limited = self.sender.can_send(bytes_in_flight + bytes);
199    }
200
201    #[inline]
202    fn on_congestion_event(
203        &mut self, rtt_updated: bool, prior_in_flight: usize,
204        bytes_in_flight: usize, event_time: Instant, acked_packets: &[Acked],
205        lost_packets: &[Lost], least_unacked: u64, rtt_stats: &RttStats,
206    ) {
207        self.sender.on_congestion_event(
208            rtt_updated,
209            prior_in_flight,
210            bytes_in_flight,
211            event_time,
212            acked_packets,
213            lost_packets,
214            least_unacked,
215            rtt_stats,
216        );
217
218        if !self.enabled {
219            return;
220        }
221
222        if !lost_packets.is_empty() {
223            // Clear any burst tokens when entering recovery.
224            self.burst_tokens = 0;
225        }
226
227        if let Some(max_pacing_rate) = self.max_pacing_rate {
228            if rtt_updated {
229                let max_rate = max_pacing_rate * 1.25f32;
230                let max_cwnd =
231                    max_rate.to_bytes_per_period(rtt_stats.smoothed_rtt);
232                self.sender.limit_cwnd(max_cwnd as usize);
233            }
234        }
235    }
236
237    fn on_packet_neutered(&mut self, packet_number: u64) {
238        self.sender.on_packet_neutered(packet_number);
239    }
240
241    fn on_retransmission_timeout(&mut self, packets_retransmitted: bool) {
242        self.sender.on_retransmission_timeout(packets_retransmitted)
243    }
244
245    fn on_connection_migration(&mut self) {
246        self.sender.on_connection_migration()
247    }
248
249    fn is_cwnd_limited(&self, bytes_in_flight: usize) -> bool {
250        !self.pacing_limited && self.sender.is_cwnd_limited(bytes_in_flight)
251    }
252
253    fn is_in_recovery(&self) -> bool {
254        self.sender.is_in_recovery()
255    }
256
257    fn pacing_rate(
258        &self, bytes_in_flight: usize, rtt_stats: &RttStats,
259    ) -> Bandwidth {
260        let sender_rate = self.sender.pacing_rate(bytes_in_flight, rtt_stats);
261        match self.max_pacing_rate {
262            Some(rate) if self.enabled => rate.min(sender_rate),
263            _ => sender_rate,
264        }
265    }
266
267    fn bandwidth_estimate(&self, rtt_stats: &RttStats) -> Bandwidth {
268        self.sender.bandwidth_estimate(rtt_stats)
269    }
270
271    fn on_app_limited(&mut self, bytes_in_flight: usize) {
272        self.pacing_limited = false;
273        self.sender.on_app_limited(bytes_in_flight);
274    }
275
276    fn update_mss(&mut self, new_mss: usize) {
277        self.sender.update_mss(new_mss)
278    }
279
280    #[cfg(feature = "qlog")]
281    fn ssthresh(&self) -> Option<u64> {
282        self.sender.ssthresh()
283    }
284}