1use std::time::Duration;
28use std::time::Instant;
29
30const WINDOW_INCREASE_FACTOR: u64 = 2;
33
34const WINDOW_TRIGGER_FACTOR: u32 = 2;
37
38#[derive(Default, Debug)]
39pub struct FlowControl {
40 consumed: u64,
42
43 max_data: u64,
45
46 window: u64,
49
50 max_window: u64,
52
53 last_update: Option<Instant>,
55}
56
57impl FlowControl {
58 pub fn new(max_data: u64, window: u64, max_window: u64) -> Self {
59 Self {
60 max_data,
61
62 window,
63
64 max_window,
65
66 ..Default::default()
67 }
68 }
69
70 pub fn window(&self) -> u64 {
72 self.window
73 }
74
75 pub fn max_data(&self) -> u64 {
77 self.max_data
78 }
79
80 pub fn add_consumed(&mut self, consumed: u64) {
82 self.consumed += consumed;
83 }
84
85 pub fn should_update_max_data(&self) -> bool {
90 let available_window = self.max_data - self.consumed;
91
92 available_window < (self.window / 2)
93 }
94
95 pub fn max_data_next(&self) -> u64 {
97 self.consumed + self.window
98 }
99
100 pub fn update_max_data(&mut self, now: Instant) {
102 self.max_data = self.max_data_next();
103 self.last_update = Some(now);
104 }
105
106 pub fn autotune_window(&mut self, now: Instant, rtt: Duration) {
110 if let Some(last_update) = self.last_update {
111 if now - last_update < rtt * WINDOW_TRIGGER_FACTOR {
112 self.window = std::cmp::min(
113 self.window * WINDOW_INCREASE_FACTOR,
114 self.max_window,
115 );
116 }
117 }
118 }
119
120 pub fn ensure_window_lower_bound(&mut self, min_window: u64) {
123 if min_window > self.window {
124 self.window = min_window;
125 }
126 }
127}
128
129#[cfg(test)]
130mod tests {
131 use super::*;
132
133 #[test]
134 fn max_data() {
135 let fc = FlowControl::new(100, 20, 100);
136
137 assert_eq!(fc.max_data(), 100);
138 }
139
140 #[test]
141 fn should_update_max_data() {
142 let mut fc = FlowControl::new(100, 20, 100);
143
144 fc.add_consumed(85);
145 assert!(!fc.should_update_max_data());
146
147 fc.add_consumed(10);
148 assert!(fc.should_update_max_data());
149 }
150
151 #[test]
152 fn max_data_next() {
153 let mut fc = FlowControl::new(100, 20, 100);
154
155 let consumed = 95;
156
157 fc.add_consumed(consumed);
158 assert!(fc.should_update_max_data());
159 assert_eq!(fc.max_data_next(), consumed + 20);
160 }
161
162 #[test]
163 fn update_max_data() {
164 let mut fc = FlowControl::new(100, 20, 100);
165
166 let consumed = 95;
167
168 fc.add_consumed(consumed);
169 assert!(fc.should_update_max_data());
170
171 let max_data_next = fc.max_data_next();
172 assert_eq!(fc.max_data_next(), consumed + 20);
173
174 fc.update_max_data(Instant::now());
175 assert_eq!(fc.max_data(), max_data_next);
176 }
177
178 #[test]
179 fn autotune_window() {
180 let w = 20;
181 let mut fc = FlowControl::new(100, w, 100);
182
183 let consumed = 95;
184
185 fc.add_consumed(consumed);
186 assert!(fc.should_update_max_data());
187
188 let max_data_next = fc.max_data_next();
189 assert_eq!(max_data_next, consumed + w);
190
191 fc.update_max_data(Instant::now());
192 assert_eq!(fc.max_data(), max_data_next);
193
194 fc.autotune_window(Instant::now(), Duration::from_millis(100));
196
197 let w = w * 2;
198 let consumed_inc = 15;
199
200 fc.add_consumed(consumed_inc);
201 assert!(fc.should_update_max_data());
202
203 let max_data_next = fc.max_data_next();
204 assert_eq!(max_data_next, consumed + consumed_inc + w);
205 }
206
207 #[test]
208 fn ensure_window_lower_bound() {
209 let w = 20;
210 let mut fc = FlowControl::new(100, w, 100);
211
212 fc.ensure_window_lower_bound(w);
214 assert_eq!(fc.window(), 20);
215
216 fc.ensure_window_lower_bound(w * 2);
218 assert_eq!(fc.window(), 40);
219 }
220}