quiche/stream/
send_buf.rs

1// Copyright (C) 2023, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::cmp;
28
29use std::collections::VecDeque;
30
31use crate::Error;
32use crate::Result;
33
34use crate::ranges;
35
36use super::RangeBuf;
37
/// Maximum size, in bytes, of each chunk that written data is split into.
///
/// Test builds use a tiny value so that even short writes span several chunks.
const SEND_BUFFER_SIZE: usize = if cfg!(test) { 5 } else { 4096 };
43
/// Send-side stream buffer.
///
/// Stream data scheduled to be sent to the peer is buffered in a list of data
/// chunks ordered by offset in ascending order. Contiguous data can then be
/// read into a slice with `emit()`.
///
/// New data is always appended at the end of the stream by `write()`. Chunks
/// that were already emitted stay in the list until `ack_and_drop()` removes
/// them, which lets `retransmit()` rewind them so they are sent again.
#[derive(Debug, Default)]
pub struct SendBuf {
    /// Chunks of data to be sent, ordered by offset.
    data: VecDeque<RangeBuf>,

    /// The index into `data` of the chunk that needs to be sent next.
    pos: usize,

    /// The maximum offset of data buffered in the stream, i.e. the offset the
    /// next written byte will get.
    off: u64,

    /// The maximum offset of data sent to the peer, regardless of
    /// retransmissions.
    emit_off: u64,

    /// The amount of data currently waiting to be emitted. Grows on `write()`
    /// and `retransmit()`, shrinks on `emit()`.
    len: u64,

    /// The maximum offset we are allowed to send to the peer.
    max_data: u64,

    /// The last offset the stream was blocked at, if any.
    blocked_at: Option<u64>,

    /// The final stream offset written to the stream, if any.
    fin_off: Option<u64>,

    /// Whether the stream's send-side has been shut down.
    shutdown: bool,

    /// Ranges of data offsets that have been acked.
    acked: ranges::RangeSet,

    /// The error code received via STOP_SENDING.
    error: Option<u64>,
}
89
90impl SendBuf {
91    /// Creates a new send buffer.
92    pub fn new(max_data: u64) -> SendBuf {
93        SendBuf {
94            max_data,
95            ..SendBuf::default()
96        }
97    }
98
99    /// Inserts the given slice of data at the end of the buffer.
100    ///
101    /// The number of bytes that were actually stored in the buffer is returned
102    /// (this may be lower than the size of the input buffer, in case of partial
103    /// writes).
104    pub fn write(&mut self, mut data: &[u8], mut fin: bool) -> Result<usize> {
105        let max_off = self.off + data.len() as u64;
106
107        // Get the stream send capacity. This will return an error if the stream
108        // was stopped.
109        let capacity = self.cap()?;
110
111        if data.len() > capacity {
112            // Truncate the input buffer according to the stream's capacity.
113            let len = capacity;
114            data = &data[..len];
115
116            // We are not buffering the full input, so clear the fin flag.
117            fin = false;
118        }
119
120        if let Some(fin_off) = self.fin_off {
121            // Can't write past final offset.
122            if max_off > fin_off {
123                return Err(Error::FinalSize);
124            }
125
126            // Can't "undo" final offset.
127            if max_off == fin_off && !fin {
128                return Err(Error::FinalSize);
129            }
130        }
131
132        if fin {
133            self.fin_off = Some(max_off);
134        }
135
136        // Don't queue data that was already fully acked.
137        if self.ack_off() >= max_off {
138            return Ok(data.len());
139        }
140
141        // We already recorded the final offset, so we can just discard the
142        // empty buffer now.
143        if data.is_empty() {
144            return Ok(data.len());
145        }
146
147        let mut len = 0;
148
149        // Split the remaining input data into consistently-sized buffers to
150        // avoid fragmentation.
151        for chunk in data.chunks(SEND_BUFFER_SIZE) {
152            len += chunk.len();
153
154            let fin = len == data.len() && fin;
155
156            let buf = RangeBuf::from(chunk, self.off, fin);
157
158            // The new data can simply be appended at the end of the send buffer.
159            self.data.push_back(buf);
160
161            self.off += chunk.len() as u64;
162            self.len += chunk.len() as u64;
163        }
164
165        Ok(len)
166    }
167
168    /// Writes data from the send buffer into the given output buffer.
169    pub fn emit(&mut self, out: &mut [u8]) -> Result<(usize, bool)> {
170        let mut out_len = out.len();
171        let out_off = self.off_front();
172
173        let mut next_off = out_off;
174
175        while out_len > 0 {
176            let off_front = self.off_front();
177
178            if self.is_empty() ||
179                off_front >= self.off ||
180                off_front != next_off ||
181                off_front >= self.max_data
182            {
183                break;
184            }
185
186            let buf = match self.data.get_mut(self.pos) {
187                Some(v) => v,
188
189                None => break,
190            };
191
192            if buf.is_empty() {
193                self.pos += 1;
194                continue;
195            }
196
197            let buf_len = cmp::min(buf.len(), out_len);
198            let partial = buf_len < buf.len();
199
200            // Copy data to the output buffer.
201            let out_pos = (next_off - out_off) as usize;
202            out[out_pos..out_pos + buf_len].copy_from_slice(&buf[..buf_len]);
203
204            self.len -= buf_len as u64;
205
206            out_len -= buf_len;
207
208            next_off = buf.off() + buf_len as u64;
209
210            buf.consume(buf_len);
211
212            if partial {
213                // We reached the maximum capacity, so end here.
214                break;
215            }
216
217            self.pos += 1;
218        }
219
220        // Override the `fin` flag set for the output buffer by matching the
221        // buffer's maximum offset against the stream's final offset (if known).
222        //
223        // This is more efficient than tracking `fin` using the range buffers
224        // themselves, and lets us avoid queueing empty buffers just so we can
225        // propagate the final size.
226        let fin = self.fin_off == Some(next_off);
227
228        // Record the largest offset that has been sent so we can accurately
229        // report final_size
230        self.emit_off = cmp::max(self.emit_off, next_off);
231
232        Ok((out.len() - out_len, fin))
233    }
234
235    /// Updates the max_data limit to the given value.
236    pub fn update_max_data(&mut self, max_data: u64) {
237        self.max_data = cmp::max(self.max_data, max_data);
238    }
239
    /// Replaces the last offset the stream was blocked at.
    ///
    /// Passing `None` clears the recorded offset.
    pub fn update_blocked_at(&mut self, blocked_at: Option<u64>) {
        self.blocked_at = blocked_at;
    }
244
    /// Returns the last offset the stream was blocked at, if any, as last set
    /// through `update_blocked_at()`.
    pub fn blocked_at(&self) -> Option<u64> {
        self.blocked_at
    }
249
250    /// Increments the acked data offset.
251    pub fn ack(&mut self, off: u64, len: usize) {
252        self.acked.insert(off..off + len as u64);
253    }
254
255    pub fn ack_and_drop(&mut self, off: u64, len: usize) {
256        self.ack(off, len);
257
258        let ack_off = self.ack_off();
259
260        if self.data.is_empty() {
261            return;
262        }
263
264        if off > ack_off {
265            return;
266        }
267
268        let mut drop_until = None;
269
270        // Drop contiguously acked data from the front of the buffer.
271        for (i, buf) in self.data.iter_mut().enumerate() {
272            // Newly acked range is past highest contiguous acked range, so we
273            // can't drop it.
274            if buf.off >= ack_off {
275                break;
276            }
277
278            // Highest contiguous acked range falls within newly acked range,
279            // so we can't drop it.
280            if buf.off < ack_off && ack_off < buf.max_off() {
281                break;
282            }
283
284            // Newly acked range can be dropped.
285            drop_until = Some(i);
286        }
287
288        if let Some(drop) = drop_until {
289            self.data.drain(..=drop);
290
291            // When a buffer is marked for retransmission, but then acked before
292            // it could be retransmitted, we might end up decreasing the SendBuf
293            // position too much, so make sure that doesn't happen.
294            self.pos = self.pos.saturating_sub(drop + 1);
295        }
296    }
297
298    pub fn retransmit(&mut self, off: u64, len: usize) {
299        let max_off = off + len as u64;
300        let ack_off = self.ack_off();
301
302        if self.data.is_empty() {
303            return;
304        }
305
306        if max_off <= ack_off {
307            return;
308        }
309
310        for i in 0..self.data.len() {
311            let buf = &mut self.data[i];
312
313            if buf.off >= max_off {
314                break;
315            }
316
317            if off > buf.max_off() {
318                continue;
319            }
320
321            // Split the buffer into 2 if the retransmit range ends before the
322            // buffer's final offset.
323            let new_buf = if buf.off < max_off && max_off < buf.max_off() {
324                Some(buf.split_off((max_off - buf.off) as usize))
325            } else {
326                None
327            };
328
329            let prev_pos = buf.pos;
330
331            // Reduce the buffer's position (expand the buffer) if the retransmit
332            // range is past the buffer's starting offset.
333            buf.pos = if off > buf.off && off <= buf.max_off() {
334                cmp::min(buf.pos, buf.start + (off - buf.off) as usize)
335            } else {
336                buf.start
337            };
338
339            self.pos = cmp::min(self.pos, i);
340
341            self.len += (prev_pos - buf.pos) as u64;
342
343            if let Some(b) = new_buf {
344                self.data.insert(i + 1, b);
345            }
346        }
347    }
348
349    /// Resets the stream at the current offset and clears all buffered data.
350    pub fn reset(&mut self) -> (u64, u64) {
351        let unsent_off = cmp::max(self.off_front(), self.emit_off);
352        let unsent_len = self.off_back().saturating_sub(unsent_off);
353
354        self.fin_off = Some(unsent_off);
355
356        // Drop all buffered data.
357        self.data.clear();
358
359        // Mark relevant data as acked.
360        self.off = unsent_off;
361        self.ack(0, self.off as usize);
362
363        self.pos = 0;
364        self.len = 0;
365
366        (self.emit_off, unsent_len)
367    }
368
369    /// Resets the streams and records the received error code.
370    ///
371    /// Calling this again after the first time has no effect.
372    pub fn stop(&mut self, error_code: u64) -> Result<(u64, u64)> {
373        if self.error.is_some() {
374            return Err(Error::Done);
375        }
376
377        let (max_off, unsent) = self.reset();
378
379        self.error = Some(error_code);
380
381        Ok((max_off, unsent))
382    }
383
384    /// Shuts down sending data.
385    pub fn shutdown(&mut self) -> Result<(u64, u64)> {
386        if self.shutdown {
387            return Err(Error::Done);
388        }
389
390        self.shutdown = true;
391
392        Ok(self.reset())
393    }
394
    /// Returns the largest offset of data buffered, i.e. the offset at which
    /// the next written byte will be placed.
    pub fn off_back(&self) -> u64 {
        self.off
    }
399
400    /// Returns the lowest offset of data buffered.
401    pub fn off_front(&self) -> u64 {
402        let mut pos = self.pos;
403
404        // Skip empty buffers from the start of the queue.
405        while let Some(b) = self.data.get(pos) {
406            if !b.is_empty() {
407                return b.off();
408            }
409
410            pos += 1;
411        }
412
413        self.off
414    }
415
    /// Returns the maximum offset we are allowed to send to the peer, as set
    /// at creation and raised through `update_max_data()`.
    pub fn max_off(&self) -> u64 {
        self.max_data
    }
420
421    /// Returns true if all data in the stream has been sent.
422    ///
423    /// This happens when the stream's send final size is known, and the
424    /// application has already written data up to that point.
425    pub fn is_fin(&self) -> bool {
426        if self.fin_off == Some(self.off) {
427            return true;
428        }
429
430        false
431    }
432
433    /// Returns true if the send-side of the stream is complete.
434    ///
435    /// This happens when the stream's send final size is known, and the peer
436    /// has already acked all stream data up to that point.
437    pub fn is_complete(&self) -> bool {
438        if let Some(fin_off) = self.fin_off {
439            if self.acked == (0..fin_off) {
440                return true;
441            }
442        }
443
444        false
445    }
446
    /// Returns true if the stream was stopped before completion, i.e. an
    /// error code was recorded by `stop()`.
    pub fn is_stopped(&self) -> bool {
        self.error.is_some()
    }
451
    /// Returns true if the stream's send-side was shut down through
    /// `shutdown()`.
    pub fn is_shutdown(&self) -> bool {
        self.shutdown
    }
456
    /// Returns true if no chunks are buffered.
    ///
    /// Chunks that were fully emitted but not yet dropped still count, so
    /// this can be false even when there is nothing left to send.
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }
461
462    /// Returns the highest contiguously acked offset.
463    pub fn ack_off(&self) -> u64 {
464        match self.acked.iter().next() {
465            // Only consider the initial range if it contiguously covers the
466            // start of the stream (i.e. from offset 0).
467            Some(std::ops::Range { start: 0, end }) => end,
468
469            Some(_) | None => 0,
470        }
471    }
472
473    /// Returns the outgoing flow control capacity.
474    pub fn cap(&self) -> Result<usize> {
475        // The stream was stopped, so return the error code instead.
476        if let Some(e) = self.error {
477            return Err(Error::StreamStopped(e));
478        }
479
480        Ok((self.max_data - self.off) as usize)
481    }
482
    /// Returns the number of separate chunks currently stored, including
    /// ones that were already emitted but not yet dropped.
    #[allow(dead_code)]
    pub fn bufs_count(&self) -> usize {
        self.data.len()
    }
488}
489
#[cfg(test)]
mod tests {
    use super::*;

    /// Emits from `send` into `out`, returning the bytes that were produced
    /// together with the reported fin flag.
    fn emit_into<'a>(
        send: &mut SendBuf, out: &'a mut [u8],
    ) -> (&'a [u8], bool) {
        let (written, fin) = send.emit(out).unwrap();

        (&out[..written], fin)
    }

    /// Emitting from a buffer nothing was written to yields no data.
    #[test]
    fn empty_write() {
        let mut out = [0u8; 5];

        let mut send = SendBuf::new(u64::MAX);
        assert_eq!(send.len, 0);

        assert_eq!(emit_into(&mut send, &mut out), (&b""[..], false));
    }

    /// Two writes are emitted back to back in a single call.
    #[test]
    fn multi_write() {
        let mut out = [0u8; 128];

        let mut send = SendBuf::new(u64::MAX);
        assert_eq!(send.len, 0);

        assert!(send.write(b"something", false).is_ok());
        assert_eq!(send.len, 9);

        assert!(send.write(b"helloworld", true).is_ok());
        assert_eq!(send.len, 19);

        assert_eq!(
            emit_into(&mut send, &mut out[..128]),
            (&b"somethinghelloworld"[..], true)
        );
        assert_eq!(send.len, 0);
    }

    /// Buffered data is emitted across several differently sized reads.
    #[test]
    fn split_write() {
        let mut out = [0u8; 10];

        let mut send = SendBuf::new(u64::MAX);
        assert_eq!(send.len, 0);

        assert!(send.write(b"something", false).is_ok());
        assert_eq!(send.len, 9);

        assert!(send.write(b"helloworld", true).is_ok());
        assert_eq!(send.len, 19);

        assert_eq!(send.off_front(), 0);

        assert_eq!(
            emit_into(&mut send, &mut out[..10]),
            (&b"somethingh"[..], false)
        );
        assert_eq!(send.len, 9);
        assert_eq!(send.off_front(), 10);

        assert_eq!(
            emit_into(&mut send, &mut out[..5]),
            (&b"ellow"[..], false)
        );
        assert_eq!(send.len, 4);
        assert_eq!(send.off_front(), 15);

        assert_eq!(
            emit_into(&mut send, &mut out[..10]),
            (&b"orld"[..], true)
        );
        assert_eq!(send.len, 0);
        assert_eq!(send.off_front(), 19);
    }

    /// Data marked for retransmission is emitted again before new data.
    #[test]
    fn resend() {
        let mut out = [0u8; 15];

        let mut send = SendBuf::new(u64::MAX);
        assert_eq!(send.len, 0);
        assert_eq!(send.off_front(), 0);

        assert!(send.write(b"something", false).is_ok());
        assert_eq!(send.off_front(), 0);

        assert!(send.write(b"helloworld", true).is_ok());
        assert_eq!(send.off_front(), 0);

        assert_eq!(send.len, 19);

        assert_eq!(
            emit_into(&mut send, &mut out[..4]),
            (&b"some"[..], false)
        );
        assert_eq!(send.len, 15);
        assert_eq!(send.off_front(), 4);

        assert_eq!(
            emit_into(&mut send, &mut out[..5]),
            (&b"thing"[..], false)
        );
        assert_eq!(send.len, 10);
        assert_eq!(send.off_front(), 9);

        assert_eq!(
            emit_into(&mut send, &mut out[..5]),
            (&b"hello"[..], false)
        );
        assert_eq!(send.len, 5);
        assert_eq!(send.off_front(), 14);

        send.retransmit(4, 5);
        assert_eq!(send.len, 10);
        assert_eq!(send.off_front(), 4);

        send.retransmit(0, 4);
        assert_eq!(send.len, 14);
        assert_eq!(send.off_front(), 0);

        // Only the retransmitted prefix is contiguous, so the read stops
        // after it even though the output has room for more.
        assert_eq!(
            emit_into(&mut send, &mut out[..11]),
            (&b"something"[..], false)
        );
        assert_eq!(send.len, 5);
        assert_eq!(send.off_front(), 14);

        assert_eq!(
            emit_into(&mut send, &mut out[..11]),
            (&b"world"[..], true)
        );
        assert_eq!(send.len, 0);
        assert_eq!(send.off_front(), 19);
    }

    /// Writes are truncated to the flow control limit as it is raised.
    #[test]
    fn write_blocked_by_off() {
        let mut out = [0u8; 10];

        // The default limit is zero, so nothing can be written at first.
        let mut send = SendBuf::default();
        assert_eq!(send.len, 0);

        let first = b"something";
        let second = b"helloworld";

        assert_eq!(send.write(first, false), Ok(0));
        assert_eq!(send.len, 0);

        assert_eq!(send.write(second, true), Ok(0));
        assert_eq!(send.len, 0);

        send.update_max_data(5);

        assert_eq!(send.write(first, false), Ok(5));
        assert_eq!(send.len, 5);

        assert_eq!(send.write(second, true), Ok(0));
        assert_eq!(send.len, 5);

        assert_eq!(send.off_front(), 0);

        assert_eq!(
            emit_into(&mut send, &mut out[..10]),
            (&b"somet"[..], false)
        );
        assert_eq!(send.len, 0);
        assert_eq!(send.off_front(), 5);

        assert_eq!(
            emit_into(&mut send, &mut out[..10]),
            (&b""[..], false)
        );
        assert_eq!(send.len, 0);

        send.update_max_data(15);

        assert_eq!(send.write(&first[5..], false), Ok(4));
        assert_eq!(send.len, 4);

        assert_eq!(send.write(second, true), Ok(6));
        assert_eq!(send.len, 10);

        assert_eq!(send.off_front(), 5);

        assert_eq!(
            emit_into(&mut send, &mut out[..10]),
            (&b"hinghellow"[..], false)
        );
        assert_eq!(send.len, 0);

        send.update_max_data(25);

        assert_eq!(send.write(&second[6..], true), Ok(4));
        assert_eq!(send.len, 4);

        assert_eq!(send.off_front(), 15);

        assert_eq!(
            emit_into(&mut send, &mut out[..10]),
            (&b"orld"[..], true)
        );
        assert_eq!(send.len, 0);
    }

    /// An empty write with fin set marks previously written data as final.
    #[test]
    fn zero_len_write() {
        let mut out = [0u8; 10];

        let mut send = SendBuf::new(u64::MAX);
        assert_eq!(send.len, 0);

        assert!(send.write(b"something", false).is_ok());
        assert_eq!(send.len, 9);

        assert!(send.write(&[], true).is_ok());
        assert_eq!(send.len, 9);

        assert_eq!(send.off_front(), 0);

        assert_eq!(
            emit_into(&mut send, &mut out[..10]),
            (&b"something"[..], true)
        );
        assert_eq!(send.len, 0);
    }

    /// Check SendBuf::len calculation on a retransmit case
    #[test]
    fn send_buf_len_on_retransmit() {
        let mut out = [0u8; 15];

        let mut send = SendBuf::new(u64::MAX);
        assert_eq!(send.len, 0);
        assert_eq!(send.off_front(), 0);

        assert!(send.write(b"something", false).is_ok());
        assert_eq!(send.off_front(), 0);

        assert_eq!(send.len, 9);

        assert_eq!(
            emit_into(&mut send, &mut out[..4]),
            (&b"some"[..], false)
        );
        assert_eq!(send.len, 5);
        assert_eq!(send.off_front(), 4);

        // Only one byte of the range was emitted before, so the pending
        // length grows by exactly one.
        send.retransmit(3, 5);
        assert_eq!(send.len, 6);
        assert_eq!(send.off_front(), 3);
    }

    /// The final size reported when stopping accounts for everything that
    /// was ever emitted, even if part of it was queued for retransmission.
    #[test]
    fn send_buf_final_size_retransmit() {
        let mut payload = [0u8; 50];
        let mut send = SendBuf::new(u64::MAX);

        send.write(&payload, false).unwrap();
        assert_eq!(send.off_front(), 0);

        // Emit the whole buffer
        let (written, _fin) = send.emit(&mut payload).unwrap();
        assert_eq!(written, payload.len());
        assert_eq!(send.off_front(), payload.len() as u64);

        // Server decides to retransmit the last 10 bytes. It's possible
        // it's not actually lost and that the client did receive it.
        send.retransmit(40, 10);

        // Server receives STOP_SENDING from client. The final_size we
        // send in the RESET_STREAM should be 50. If we send anything less,
        // it's a FINAL_SIZE_ERROR.
        assert_eq!(send.stop(0), Ok((50, 0)));
    }
}