quiche/recovery/congestion/
mod.rs

1// Copyright (C) 2024, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use debug_panic::debug_panic;
28use std::time::Duration;
29use std::time::Instant;
30
31use self::recovery::Acked;
32use super::bandwidth::Bandwidth;
33use super::RecoveryConfig;
34use super::Sent;
35use crate::recovery::rtt;
36use crate::recovery::rtt::RttStats;
37use crate::recovery::CongestionControlAlgorithm;
38use crate::StartupExit;
39use crate::StartupExitReason;
40
41pub const PACING_MULTIPLIER: f64 = 1.25;
42
43pub struct SsThresh {
44    // Current slow start threshold.  Defaults to usize::MAX which
45    // indicates we're still in the initial slow start phase.
46    ssthresh: usize,
47
48    // Information about the slow start exit, if it already happened.
49    // Set on the first call to update().
50    startup_exit: Option<StartupExit>,
51}
52
53impl Default for SsThresh {
54    fn default() -> Self {
55        Self {
56            ssthresh: usize::MAX,
57            startup_exit: None,
58        }
59    }
60}
61
62impl SsThresh {
63    fn get(&self) -> usize {
64        self.ssthresh
65    }
66
67    fn startup_exit(&self) -> Option<StartupExit> {
68        self.startup_exit
69    }
70
71    fn update(&mut self, ssthresh: usize, in_css: bool) {
72        if self.startup_exit.is_none() {
73            let reason = if in_css {
74                // Exit happened in conservative slow start, attribute
75                // the exit to persistent queues.
76                StartupExitReason::PersistentQueue
77            } else {
78                // In normal slow start, attribute the exit to loss.
79                StartupExitReason::Loss
80            };
81            self.startup_exit = Some(StartupExit::new(ssthresh, reason));
82        }
83        self.ssthresh = ssthresh;
84    }
85}
86
87pub struct Congestion {
88    // Congestion control.
89    pub(crate) cc_ops: &'static CongestionControlOps,
90
91    cubic_state: cubic::State,
92
93    // HyStart++.
94    pub(crate) hystart: hystart::Hystart,
95
96    // Pacing.
97    pub(crate) pacer: pacer::Pacer,
98
99    // RFC6937 PRR.
100    pub(crate) prr: prr::PRR,
101
102    // The maximum size of a data aggregate scheduled and
103    // transmitted together.
104    send_quantum: usize,
105
106    // BBR state.
107    bbr_state: bbr::State,
108
109    // BBRv2 state.
110    bbr2_state: bbr2::State,
111
112    pub(crate) congestion_window: usize,
113
114    pub(crate) ssthresh: SsThresh,
115
116    bytes_acked_sl: usize,
117
118    bytes_acked_ca: usize,
119
120    pub(crate) congestion_recovery_start_time: Option<Instant>,
121
122    pub(crate) app_limited: bool,
123
124    pub(crate) delivery_rate: delivery_rate::Rate,
125
126    initial_rtt: Duration,
127
128    /// Initial congestion window size in terms of packet count.
129    pub(crate) initial_congestion_window_packets: usize,
130
131    max_datagram_size: usize,
132
133    pub(crate) lost_count: usize,
134}
135
136impl Congestion {
137    pub(crate) fn from_config(recovery_config: &RecoveryConfig) -> Self {
138        let initial_congestion_window = recovery_config.max_send_udp_payload_size *
139            recovery_config.initial_congestion_window_packets;
140
141        let mut cc = Congestion {
142            congestion_window: initial_congestion_window,
143
144            ssthresh: Default::default(),
145
146            bytes_acked_sl: 0,
147
148            bytes_acked_ca: 0,
149
150            congestion_recovery_start_time: None,
151
152            cc_ops: recovery_config.cc_algorithm.into(),
153
154            cubic_state: cubic::State::default(),
155
156            app_limited: false,
157
158            lost_count: 0,
159
160            initial_rtt: recovery_config.initial_rtt,
161
162            initial_congestion_window_packets: recovery_config
163                .initial_congestion_window_packets,
164
165            max_datagram_size: recovery_config.max_send_udp_payload_size,
166
167            send_quantum: initial_congestion_window,
168
169            delivery_rate: delivery_rate::Rate::default(),
170
171            hystart: hystart::Hystart::new(recovery_config.hystart),
172
173            pacer: pacer::Pacer::new(
174                recovery_config.pacing,
175                initial_congestion_window,
176                0,
177                recovery_config.max_send_udp_payload_size,
178                recovery_config.max_pacing_rate,
179            ),
180
181            prr: prr::PRR::default(),
182
183            bbr_state: bbr::State::new(),
184
185            bbr2_state: bbr2::State::new(),
186        };
187
188        (cc.cc_ops.on_init)(&mut cc);
189
190        cc
191    }
192
193    pub(crate) fn in_congestion_recovery(&self, sent_time: Instant) -> bool {
194        match self.congestion_recovery_start_time {
195            Some(congestion_recovery_start_time) =>
196                sent_time <= congestion_recovery_start_time,
197
198            None => false,
199        }
200    }
201
202    /// The most recent data delivery rate estimate.
203    pub(crate) fn delivery_rate(&self) -> Bandwidth {
204        self.delivery_rate.sample_delivery_rate()
205    }
206
207    pub(crate) fn send_quantum(&self) -> usize {
208        self.send_quantum
209    }
210
211    pub(crate) fn set_pacing_rate(&mut self, rate: u64, now: Instant) {
212        self.pacer.update(self.send_quantum, rate, now);
213    }
214
215    pub(crate) fn congestion_window(&self) -> usize {
216        self.congestion_window
217    }
218
219    fn update_app_limited(&mut self, v: bool) {
220        self.app_limited = v;
221    }
222
223    #[allow(clippy::too_many_arguments)]
224    pub(crate) fn on_packet_sent(
225        &mut self, bytes_in_flight: usize, sent_bytes: usize, now: Instant,
226        pkt: &mut Sent, rtt_stats: &RttStats, bytes_lost: u64, in_flight: bool,
227    ) {
228        if in_flight {
229            self.update_app_limited(
230                (bytes_in_flight + sent_bytes) < self.congestion_window,
231            );
232
233            (self.cc_ops.on_packet_sent)(self, sent_bytes, bytes_in_flight, now);
234
235            self.prr.on_packet_sent(sent_bytes);
236
237            // HyStart++: Start of the round in a slow start.
238            if self.hystart.enabled() &&
239                self.congestion_window < self.ssthresh.get()
240            {
241                self.hystart.start_round(pkt.pkt_num);
242            }
243        }
244
245        // Pacing: Set the pacing rate if CC doesn't do its own.
246        if !(self.cc_ops.has_custom_pacing)() && rtt_stats.has_first_rtt_sample {
247            let rate = PACING_MULTIPLIER * self.congestion_window as f64 /
248                rtt_stats.smoothed_rtt.as_secs_f64();
249            self.set_pacing_rate(rate as u64, now);
250        }
251
252        self.schedule_next_packet(now, sent_bytes);
253
254        pkt.time_sent = self.get_packet_send_time();
255
256        // bytes_in_flight is already updated. Use previous value.
257        self.delivery_rate
258            .on_packet_sent(pkt, bytes_in_flight, bytes_lost);
259    }
260
261    pub(crate) fn on_packets_acked(
262        &mut self, bytes_in_flight: usize, acked: &mut Vec<Acked>,
263        rtt_stats: &RttStats, now: Instant,
264    ) {
265        // Update delivery rate sample per acked packet.
266        for pkt in acked.iter() {
267            self.delivery_rate.update_rate_sample(pkt, now);
268        }
269
270        // Fill in a rate sample.
271        self.delivery_rate.generate_rate_sample(*rtt_stats.min_rtt);
272
273        // Call congestion control hooks.
274        (self.cc_ops.on_packets_acked)(
275            self,
276            bytes_in_flight,
277            acked,
278            now,
279            rtt_stats,
280        );
281    }
282
283    fn schedule_next_packet(&mut self, now: Instant, packet_size: usize) {
284        // Don't pace in any of these cases:
285        //   * Packet contains no data.
286        //   * The congestion window is within initcwnd.
287
288        let in_initcwnd = self.congestion_window <
289            self.max_datagram_size * self.initial_congestion_window_packets;
290
291        let sent_bytes = if !self.pacer.enabled() || in_initcwnd {
292            0
293        } else {
294            packet_size
295        };
296
297        self.pacer.send(sent_bytes, now);
298    }
299
300    pub(crate) fn get_packet_send_time(&self) -> Instant {
301        self.pacer.next_time()
302    }
303}
304
305pub(crate) struct CongestionControlOps {
306    pub on_init: fn(r: &mut Congestion),
307
308    pub on_packet_sent: fn(
309        r: &mut Congestion,
310        sent_bytes: usize,
311        bytes_in_flight: usize,
312        now: Instant,
313    ),
314
315    pub on_packets_acked: fn(
316        r: &mut Congestion,
317        bytes_in_flight: usize,
318        packets: &mut Vec<Acked>,
319        now: Instant,
320        rtt_stats: &RttStats,
321    ),
322
323    pub congestion_event: fn(
324        r: &mut Congestion,
325        bytes_in_flight: usize,
326        lost_bytes: usize,
327        largest_lost_packet: &Sent,
328        now: Instant,
329    ),
330
331    pub checkpoint: fn(r: &mut Congestion),
332
333    pub rollback: fn(r: &mut Congestion) -> bool,
334
335    pub has_custom_pacing: fn() -> bool,
336
337    #[cfg(feature = "qlog")]
338    pub state_str: fn(r: &Congestion, now: Instant) -> &'static str,
339
340    pub debug_fmt: fn(
341        r: &Congestion,
342        formatter: &mut std::fmt::Formatter,
343    ) -> std::fmt::Result,
344}
345
346impl From<CongestionControlAlgorithm> for &'static CongestionControlOps {
347    fn from(algo: CongestionControlAlgorithm) -> Self {
348        match algo {
349            CongestionControlAlgorithm::Reno => &reno::RENO,
350            CongestionControlAlgorithm::CUBIC => &cubic::CUBIC,
351            CongestionControlAlgorithm::BBR => &bbr::BBR,
352            CongestionControlAlgorithm::BBR2 => &bbr2::BBR2,
353            CongestionControlAlgorithm::Bbr2Gcongestion => {
354                debug_panic!("legacy implementation, not gcongestion");
355                &bbr2::BBR2
356            },
357        }
358    }
359}
360
361#[cfg(test)]
362mod tests {
363    use super::*;
364
365    #[test]
366    fn ssthresh_init() {
367        let ssthresh: SsThresh = Default::default();
368        assert_eq!(ssthresh.get(), usize::MAX);
369        assert_eq!(ssthresh.startup_exit(), None);
370    }
371
372    #[test]
373    fn ssthresh_in_css() {
374        let expected_startup_exit =
375            StartupExit::new(1000, StartupExitReason::PersistentQueue);
376        let mut ssthresh: SsThresh = Default::default();
377        ssthresh.update(1000, true);
378        assert_eq!(ssthresh.get(), 1000);
379        assert_eq!(ssthresh.startup_exit(), Some(expected_startup_exit));
380
381        ssthresh.update(2000, true);
382        assert_eq!(ssthresh.get(), 2000);
383        // startup_exit is only updated on the first update.
384        assert_eq!(ssthresh.startup_exit(), Some(expected_startup_exit));
385
386        ssthresh.update(500, false);
387        assert_eq!(ssthresh.get(), 500);
388        assert_eq!(ssthresh.startup_exit(), Some(expected_startup_exit));
389    }
390
391    #[test]
392    fn ssthresh_in_slow_start() {
393        let expected_startup_exit =
394            StartupExit::new(1000, StartupExitReason::Loss);
395        let mut ssthresh: SsThresh = Default::default();
396        ssthresh.update(1000, false);
397        assert_eq!(ssthresh.get(), 1000);
398        assert_eq!(ssthresh.startup_exit(), Some(expected_startup_exit));
399
400        ssthresh.update(2000, true);
401        assert_eq!(ssthresh.get(), 2000);
402        // startup_exit is only updated on the first update.
403        assert_eq!(ssthresh.startup_exit(), Some(expected_startup_exit));
404
405        ssthresh.update(500, false);
406        assert_eq!(ssthresh.get(), 500);
407        assert_eq!(ssthresh.startup_exit(), Some(expected_startup_exit));
408    }
409}
410
411mod bbr;
412mod bbr2;
413mod cubic;
414mod delivery_rate;
415mod hystart;
416pub(crate) mod pacer;
417mod prr;
418pub(crate) mod recovery;
419mod reno;
420
421#[cfg(test)]
422mod test_sender;