quiche/stream/
recv_buf.rs

1// Copyright (C) 2023, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::cmp;
28
29use std::collections::BTreeMap;
30use std::collections::VecDeque;
31
32use std::time::Duration;
33use std::time::Instant;
34
35use crate::stream::RecvBufResetReturn;
36use crate::Error;
37use crate::Result;
38
39use crate::flowcontrol;
40
41use crate::range_buf::RangeBuf;
42
43use super::DEFAULT_STREAM_WINDOW;
44
45/// Receive-side stream buffer.
46///
47/// Stream data received by the peer is buffered in a list of data chunks
48/// ordered by offset in ascending order. Contiguous data can then be read
49/// into a slice.
50#[derive(Debug, Default)]
51pub struct RecvBuf {
52    /// Chunks of data received from the peer that have not yet been read by
53    /// the application, ordered by offset.
54    data: BTreeMap<u64, RangeBuf>,
55
56    /// The lowest data offset that has yet to be read by the application.
57    off: u64,
58
59    /// The total length of data received on this stream.
60    len: u64,
61
62    /// Receiver flow controller.
63    flow_control: flowcontrol::FlowControl,
64
65    /// The final stream offset received from the peer, if any.
66    fin_off: Option<u64>,
67
68    /// The error code received via RESET_STREAM.
69    error: Option<u64>,
70
71    /// Whether incoming data is validated but not buffered.
72    drain: bool,
73}
74
75impl RecvBuf {
76    /// Creates a new receive buffer.
77    pub fn new(max_data: u64, max_window: u64) -> RecvBuf {
78        RecvBuf {
79            flow_control: flowcontrol::FlowControl::new(
80                max_data,
81                cmp::min(max_data, DEFAULT_STREAM_WINDOW),
82                max_window,
83            ),
84            ..RecvBuf::default()
85        }
86    }
87
88    /// Inserts the given chunk of data in the buffer.
89    ///
90    /// This also takes care of enforcing stream flow control limits, as well
91    /// as handling incoming data that overlaps data that is already in the
92    /// buffer.
93    pub fn write(&mut self, buf: RangeBuf) -> Result<()> {
94        if buf.max_off() > self.max_data() {
95            return Err(Error::FlowControl);
96        }
97
98        if let Some(fin_off) = self.fin_off {
99            // Stream's size is known, forbid data beyond that point.
100            if buf.max_off() > fin_off {
101                return Err(Error::FinalSize);
102            }
103
104            // Stream's size is already known, forbid changing it.
105            if buf.fin() && fin_off != buf.max_off() {
106                return Err(Error::FinalSize);
107            }
108        }
109
110        // Stream's known size is lower than data already received.
111        if buf.fin() && buf.max_off() < self.len {
112            return Err(Error::FinalSize);
113        }
114
115        // We already saved the final offset, so there's nothing else we
116        // need to keep from the RangeBuf if it's empty.
117        if self.fin_off.is_some() && buf.is_empty() {
118            return Ok(());
119        }
120
121        if buf.fin() {
122            self.fin_off = Some(buf.max_off());
123        }
124
125        // No need to store empty buffer that doesn't carry the fin flag.
126        if !buf.fin() && buf.is_empty() {
127            return Ok(());
128        }
129
130        // Check if data is fully duplicate, that is the buffer's max offset is
131        // lower or equal to the offset already stored in the recv buffer.
132        if self.off >= buf.max_off() {
133            // An exception is applied to empty range buffers, because an empty
134            // buffer's max offset matches the max offset of the recv buffer.
135            //
136            // By this point all spurious empty buffers should have already been
137            // discarded, so allowing empty buffers here should be safe.
138            if !buf.is_empty() {
139                return Ok(());
140            }
141        }
142
143        let mut tmp_bufs = VecDeque::with_capacity(2);
144        tmp_bufs.push_back(buf);
145
146        'tmp: while let Some(mut buf) = tmp_bufs.pop_front() {
147            // Discard incoming data below current stream offset. Bytes up to
148            // `self.off` have already been received so we should not buffer
149            // them again. This is also important to make sure `ready()` doesn't
150            // get stuck when a buffer with lower offset than the stream's is
151            // buffered.
152            if self.off_front() > buf.off() {
153                buf = buf.split_off((self.off_front() - buf.off()) as usize);
154            }
155
156            // Handle overlapping data. If the incoming data's starting offset
157            // is above the previous maximum received offset, there is clearly
158            // no overlap so this logic can be skipped. However do still try to
159            // merge an empty final buffer (i.e. an empty buffer with the fin
160            // flag set, which is the only kind of empty buffer that should
161            // reach this point).
162            if buf.off() < self.max_off() || buf.is_empty() {
163                for (_, b) in self.data.range(buf.off()..) {
164                    let off = buf.off();
165
166                    // We are past the current buffer.
167                    if b.off() > buf.max_off() {
168                        break;
169                    }
170
171                    // New buffer is fully contained in existing buffer.
172                    if off >= b.off() && buf.max_off() <= b.max_off() {
173                        continue 'tmp;
174                    }
175
176                    // New buffer's start overlaps existing buffer.
177                    if off >= b.off() && off < b.max_off() {
178                        buf = buf.split_off((b.max_off() - off) as usize);
179                    }
180
181                    // New buffer's end overlaps existing buffer.
182                    if off < b.off() && buf.max_off() > b.off() {
183                        tmp_bufs
184                            .push_back(buf.split_off((b.off() - off) as usize));
185                    }
186                }
187            }
188
189            self.len = cmp::max(self.len, buf.max_off());
190
191            if !self.drain {
192                self.data.insert(buf.max_off(), buf);
193            } else {
194                // we are not storing any data, off == len
195                self.off = self.len;
196            }
197        }
198
199        Ok(())
200    }
201
202    /// Writes data from the receive buffer into the given output buffer.
203    ///
204    /// Only contiguous data is written to the output buffer, starting from
205    /// offset 0. The offset is incremented as data is read out of the receive
206    /// buffer into the application buffer. If there is no data at the expected
207    /// read offset, the `Done` error is returned.
208    ///
209    /// On success the amount of data read, and a flag indicating if there is
210    /// no more data in the buffer, are returned as a tuple.
211    pub fn emit(&mut self, out: &mut [u8]) -> Result<(usize, bool)> {
212        let mut len = 0;
213        let mut cap = out.len();
214
215        if !self.ready() {
216            return Err(Error::Done);
217        }
218
219        // The stream was reset, so clear its data and return the error code
220        // instead.
221        if let Some(e) = self.error {
222            self.data.clear();
223            return Err(Error::StreamReset(e));
224        }
225
226        while cap > 0 && self.ready() {
227            let mut entry = match self.data.first_entry() {
228                Some(entry) => entry,
229                None => break,
230            };
231
232            let buf = entry.get_mut();
233
234            let buf_len = cmp::min(buf.len(), cap);
235
236            out[len..len + buf_len].copy_from_slice(&buf[..buf_len]);
237
238            self.off += buf_len as u64;
239
240            len += buf_len;
241            cap -= buf_len;
242
243            if buf_len < buf.len() {
244                buf.consume(buf_len);
245
246                // We reached the maximum capacity, so end here.
247                break;
248            }
249
250            entry.remove();
251        }
252
253        // Update consumed bytes for flow control.
254        self.flow_control.add_consumed(len as u64);
255
256        Ok((len, self.is_fin()))
257    }
258
259    /// Resets the stream at the given offset.
260    pub fn reset(
261        &mut self, error_code: u64, final_size: u64,
262    ) -> Result<RecvBufResetReturn> {
263        // Stream's size is already known, forbid changing it.
264        if let Some(fin_off) = self.fin_off {
265            if fin_off != final_size {
266                return Err(Error::FinalSize);
267            }
268        }
269
270        // Stream's known size is lower than data already received.
271        if final_size < self.len {
272            return Err(Error::FinalSize);
273        }
274
275        if self.error.is_some() {
276            // We already verified that the final size matches
277            return Ok(RecvBufResetReturn::zero());
278        }
279
280        // Calculate how many bytes need to be removed from the connection flow
281        // control.
282        let result = RecvBufResetReturn {
283            max_data_delta: final_size - self.len,
284            consumed_flowcontrol: final_size - self.off,
285        };
286
287        self.error = Some(error_code);
288
289        // Clear all data already buffered.
290        self.off = final_size;
291
292        self.data.clear();
293
294        // In order to ensure the application is notified when the stream is
295        // reset, enqueue a zero-length buffer at the final size offset.
296        let buf = RangeBuf::from(b"", final_size, true);
297        self.write(buf)?;
298
299        Ok(result)
300    }
301
302    /// Commits the new max_data limit.
303    pub fn update_max_data(&mut self, now: Instant) {
304        self.flow_control.update_max_data(now);
305    }
306
307    /// Return the new max_data limit.
308    pub fn max_data_next(&mut self) -> u64 {
309        self.flow_control.max_data_next()
310    }
311
312    /// Return the current flow control limit.
313    pub fn max_data(&self) -> u64 {
314        self.flow_control.max_data()
315    }
316
317    /// Return the current window.
318    pub fn window(&self) -> u64 {
319        self.flow_control.window()
320    }
321
322    /// Autotune the window size.
323    pub fn autotune_window(&mut self, now: Instant, rtt: Duration) {
324        self.flow_control.autotune_window(now, rtt);
325    }
326
327    /// Shuts down receiving data and returns the number of bytes
328    /// that should be returned to the connection level flow
329    /// control
330    pub fn shutdown(&mut self) -> Result<u64> {
331        if self.drain {
332            return Err(Error::Done);
333        }
334
335        self.drain = true;
336
337        self.data.clear();
338
339        let consumed = self.max_off() - self.off;
340        self.off = self.max_off();
341
342        Ok(consumed)
343    }
344
345    /// Returns the lowest offset of data buffered.
346    pub fn off_front(&self) -> u64 {
347        self.off
348    }
349
350    /// Returns true if we need to update the local flow control limit.
351    pub fn almost_full(&self) -> bool {
352        self.fin_off.is_none() && self.flow_control.should_update_max_data()
353    }
354
355    /// Returns the largest offset ever received.
356    pub fn max_off(&self) -> u64 {
357        self.len
358    }
359
360    /// Returns true if the receive-side of the stream is complete.
361    ///
362    /// This happens when the stream's receive final size is known, and the
363    /// application has read all data from the stream.
364    pub fn is_fin(&self) -> bool {
365        if self.fin_off == Some(self.off) {
366            return true;
367        }
368
369        false
370    }
371
372    /// Returns true if the stream is not storing incoming data.
373    pub fn is_draining(&self) -> bool {
374        self.drain
375    }
376
377    /// Returns true if the stream has data to be read.
378    pub fn ready(&self) -> bool {
379        let (_, buf) = match self.data.first_key_value() {
380            Some(v) => v,
381            None => return false,
382        };
383
384        buf.off() == self.off
385    }
386}
387
388#[cfg(test)]
389mod tests {
390    use super::*;
391
392    #[test]
393    fn empty_read() {
394        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
395        assert_eq!(recv.len, 0);
396
397        let mut buf = [0; 32];
398
399        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
400    }
401
402    #[test]
403    fn empty_stream_frame() {
404        let mut recv = RecvBuf::new(15, DEFAULT_STREAM_WINDOW);
405        assert_eq!(recv.len, 0);
406
407        let buf = RangeBuf::from(b"hello", 0, false);
408        assert!(recv.write(buf).is_ok());
409        assert_eq!(recv.len, 5);
410        assert_eq!(recv.off, 0);
411        assert_eq!(recv.data.len(), 1);
412
413        let mut buf = [0; 32];
414        assert_eq!(recv.emit(&mut buf), Ok((5, false)));
415
416        // Don't store non-fin empty buffer.
417        let buf = RangeBuf::from(b"", 10, false);
418        assert!(recv.write(buf).is_ok());
419        assert_eq!(recv.len, 5);
420        assert_eq!(recv.off, 5);
421        assert_eq!(recv.data.len(), 0);
422
423        // Check flow control for empty buffer.
424        let buf = RangeBuf::from(b"", 16, false);
425        assert_eq!(recv.write(buf), Err(Error::FlowControl));
426
427        // Store fin empty buffer.
428        let buf = RangeBuf::from(b"", 5, true);
429        assert!(recv.write(buf).is_ok());
430        assert_eq!(recv.len, 5);
431        assert_eq!(recv.off, 5);
432        assert_eq!(recv.data.len(), 1);
433
434        // Don't store additional fin empty buffers.
435        let buf = RangeBuf::from(b"", 5, true);
436        assert!(recv.write(buf).is_ok());
437        assert_eq!(recv.len, 5);
438        assert_eq!(recv.off, 5);
439        assert_eq!(recv.data.len(), 1);
440
441        // Don't store additional fin non-empty buffers.
442        let buf = RangeBuf::from(b"aa", 3, true);
443        assert!(recv.write(buf).is_ok());
444        assert_eq!(recv.len, 5);
445        assert_eq!(recv.off, 5);
446        assert_eq!(recv.data.len(), 1);
447
448        // Validate final size with fin empty buffers.
449        let buf = RangeBuf::from(b"", 6, true);
450        assert_eq!(recv.write(buf), Err(Error::FinalSize));
451        let buf = RangeBuf::from(b"", 4, true);
452        assert_eq!(recv.write(buf), Err(Error::FinalSize));
453
454        let mut buf = [0; 32];
455        assert_eq!(recv.emit(&mut buf), Ok((0, true)));
456    }
457
458    #[test]
459    fn ordered_read() {
460        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
461        assert_eq!(recv.len, 0);
462
463        let mut buf = [0; 32];
464
465        let first = RangeBuf::from(b"hello", 0, false);
466        let second = RangeBuf::from(b"world", 5, false);
467        let third = RangeBuf::from(b"something", 10, true);
468
469        assert!(recv.write(second).is_ok());
470        assert_eq!(recv.len, 10);
471        assert_eq!(recv.off, 0);
472
473        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
474
475        assert!(recv.write(third).is_ok());
476        assert_eq!(recv.len, 19);
477        assert_eq!(recv.off, 0);
478
479        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
480
481        assert!(recv.write(first).is_ok());
482        assert_eq!(recv.len, 19);
483        assert_eq!(recv.off, 0);
484
485        let (len, fin) = recv.emit(&mut buf).unwrap();
486        assert_eq!(len, 19);
487        assert!(fin);
488        assert_eq!(&buf[..len], b"helloworldsomething");
489        assert_eq!(recv.len, 19);
490        assert_eq!(recv.off, 19);
491
492        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
493    }
494
495    /// Test shutdown behavior
496    #[test]
497    fn shutdown() {
498        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
499        assert_eq!(recv.len, 0);
500
501        let mut buf = [0; 32];
502
503        let first = RangeBuf::from(b"hello", 0, false);
504        let second = RangeBuf::from(b"world", 5, false);
505        let third = RangeBuf::from(b"something", 10, false);
506
507        assert!(recv.write(second).is_ok());
508        assert_eq!(recv.len, 10);
509        assert_eq!(recv.off, 0);
510
511        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
512
513        // shutdown the buffer. Buffer is dropped.
514        assert_eq!(recv.shutdown(), Ok(10));
515        assert_eq!(recv.len, 10);
516        assert_eq!(recv.off, 10);
517        assert_eq!(recv.data.len(), 0);
518
519        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
520
521        // subsequent writes are validated but not added to the buffer
522        assert!(recv.write(first).is_ok());
523        assert_eq!(recv.len, 10);
524        assert_eq!(recv.off, 10);
525        assert_eq!(recv.data.len(), 0);
526
527        // the max offset of received data can increase and
528        // the recv.off must increase with it
529        assert!(recv.write(third).is_ok());
530        assert_eq!(recv.len, 19);
531        assert_eq!(recv.off, 19);
532        assert_eq!(recv.data.len(), 0);
533
534        // Send a reset
535        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
536        assert_eq!(
537            recv.reset(42, 123),
538            Ok(RecvBufResetReturn {
539                max_data_delta: 104,
540                consumed_flowcontrol: 104,
541            })
542        );
543        assert_eq!(recv.len, 123);
544        assert_eq!(recv.off, 123);
545        assert_eq!(recv.data.len(), 0);
546
547        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
548    }
549
550    #[test]
551    fn split_read() {
552        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
553        assert_eq!(recv.len, 0);
554
555        let mut buf = [0; 32];
556
557        let first = RangeBuf::from(b"something", 0, false);
558        let second = RangeBuf::from(b"helloworld", 9, true);
559
560        assert!(recv.write(first).is_ok());
561        assert_eq!(recv.len, 9);
562        assert_eq!(recv.off, 0);
563
564        assert!(recv.write(second).is_ok());
565        assert_eq!(recv.len, 19);
566        assert_eq!(recv.off, 0);
567
568        let (len, fin) = recv.emit(&mut buf[..10]).unwrap();
569        assert_eq!(len, 10);
570        assert!(!fin);
571        assert_eq!(&buf[..len], b"somethingh");
572        assert_eq!(recv.len, 19);
573        assert_eq!(recv.off, 10);
574
575        let (len, fin) = recv.emit(&mut buf[..5]).unwrap();
576        assert_eq!(len, 5);
577        assert!(!fin);
578        assert_eq!(&buf[..len], b"ellow");
579        assert_eq!(recv.len, 19);
580        assert_eq!(recv.off, 15);
581
582        let (len, fin) = recv.emit(&mut buf[..10]).unwrap();
583        assert_eq!(len, 4);
584        assert!(fin);
585        assert_eq!(&buf[..len], b"orld");
586        assert_eq!(recv.len, 19);
587        assert_eq!(recv.off, 19);
588    }
589
590    #[test]
591    fn incomplete_read() {
592        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
593        assert_eq!(recv.len, 0);
594
595        let mut buf = [0; 32];
596
597        let first = RangeBuf::from(b"something", 0, false);
598        let second = RangeBuf::from(b"helloworld", 9, true);
599
600        assert!(recv.write(second).is_ok());
601        assert_eq!(recv.len, 19);
602        assert_eq!(recv.off, 0);
603
604        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
605
606        assert!(recv.write(first).is_ok());
607        assert_eq!(recv.len, 19);
608        assert_eq!(recv.off, 0);
609
610        let (len, fin) = recv.emit(&mut buf).unwrap();
611        assert_eq!(len, 19);
612        assert!(fin);
613        assert_eq!(&buf[..len], b"somethinghelloworld");
614        assert_eq!(recv.len, 19);
615        assert_eq!(recv.off, 19);
616    }
617
618    #[test]
619    fn zero_len_read() {
620        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
621        assert_eq!(recv.len, 0);
622
623        let mut buf = [0; 32];
624
625        let first = RangeBuf::from(b"something", 0, false);
626        let second = RangeBuf::from(b"", 9, true);
627
628        assert!(recv.write(first).is_ok());
629        assert_eq!(recv.len, 9);
630        assert_eq!(recv.off, 0);
631        assert_eq!(recv.data.len(), 1);
632
633        assert!(recv.write(second).is_ok());
634        assert_eq!(recv.len, 9);
635        assert_eq!(recv.off, 0);
636        assert_eq!(recv.data.len(), 1);
637
638        let (len, fin) = recv.emit(&mut buf).unwrap();
639        assert_eq!(len, 9);
640        assert!(fin);
641        assert_eq!(&buf[..len], b"something");
642        assert_eq!(recv.len, 9);
643        assert_eq!(recv.off, 9);
644    }
645
646    #[test]
647    fn past_read() {
648        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
649        assert_eq!(recv.len, 0);
650
651        let mut buf = [0; 32];
652
653        let first = RangeBuf::from(b"something", 0, false);
654        let second = RangeBuf::from(b"hello", 3, false);
655        let third = RangeBuf::from(b"ello", 4, true);
656        let fourth = RangeBuf::from(b"ello", 5, true);
657
658        assert!(recv.write(first).is_ok());
659        assert_eq!(recv.len, 9);
660        assert_eq!(recv.off, 0);
661        assert_eq!(recv.data.len(), 1);
662
663        let (len, fin) = recv.emit(&mut buf).unwrap();
664        assert_eq!(len, 9);
665        assert!(!fin);
666        assert_eq!(&buf[..len], b"something");
667        assert_eq!(recv.len, 9);
668        assert_eq!(recv.off, 9);
669
670        assert!(recv.write(second).is_ok());
671        assert_eq!(recv.len, 9);
672        assert_eq!(recv.off, 9);
673        assert_eq!(recv.data.len(), 0);
674
675        assert_eq!(recv.write(third), Err(Error::FinalSize));
676
677        assert!(recv.write(fourth).is_ok());
678        assert_eq!(recv.len, 9);
679        assert_eq!(recv.off, 9);
680        assert_eq!(recv.data.len(), 0);
681
682        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
683    }
684
685    #[test]
686    fn fully_overlapping_read() {
687        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
688        assert_eq!(recv.len, 0);
689
690        let mut buf = [0; 32];
691
692        let first = RangeBuf::from(b"something", 0, false);
693        let second = RangeBuf::from(b"hello", 4, false);
694
695        assert!(recv.write(first).is_ok());
696        assert_eq!(recv.len, 9);
697        assert_eq!(recv.off, 0);
698        assert_eq!(recv.data.len(), 1);
699
700        assert!(recv.write(second).is_ok());
701        assert_eq!(recv.len, 9);
702        assert_eq!(recv.off, 0);
703        assert_eq!(recv.data.len(), 1);
704
705        let (len, fin) = recv.emit(&mut buf).unwrap();
706        assert_eq!(len, 9);
707        assert!(!fin);
708        assert_eq!(&buf[..len], b"something");
709        assert_eq!(recv.len, 9);
710        assert_eq!(recv.off, 9);
711        assert_eq!(recv.data.len(), 0);
712
713        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
714    }
715
716    #[test]
717    fn fully_overlapping_read2() {
718        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
719        assert_eq!(recv.len, 0);
720
721        let mut buf = [0; 32];
722
723        let first = RangeBuf::from(b"something", 0, false);
724        let second = RangeBuf::from(b"hello", 4, false);
725
726        assert!(recv.write(second).is_ok());
727        assert_eq!(recv.len, 9);
728        assert_eq!(recv.off, 0);
729        assert_eq!(recv.data.len(), 1);
730
731        assert!(recv.write(first).is_ok());
732        assert_eq!(recv.len, 9);
733        assert_eq!(recv.off, 0);
734        assert_eq!(recv.data.len(), 2);
735
736        let (len, fin) = recv.emit(&mut buf).unwrap();
737        assert_eq!(len, 9);
738        assert!(!fin);
739        assert_eq!(&buf[..len], b"somehello");
740        assert_eq!(recv.len, 9);
741        assert_eq!(recv.off, 9);
742        assert_eq!(recv.data.len(), 0);
743
744        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
745    }
746
747    #[test]
748    fn fully_overlapping_read3() {
749        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
750        assert_eq!(recv.len, 0);
751
752        let mut buf = [0; 32];
753
754        let first = RangeBuf::from(b"something", 0, false);
755        let second = RangeBuf::from(b"hello", 3, false);
756
757        assert!(recv.write(second).is_ok());
758        assert_eq!(recv.len, 8);
759        assert_eq!(recv.off, 0);
760        assert_eq!(recv.data.len(), 1);
761
762        assert!(recv.write(first).is_ok());
763        assert_eq!(recv.len, 9);
764        assert_eq!(recv.off, 0);
765        assert_eq!(recv.data.len(), 3);
766
767        let (len, fin) = recv.emit(&mut buf).unwrap();
768        assert_eq!(len, 9);
769        assert!(!fin);
770        assert_eq!(&buf[..len], b"somhellog");
771        assert_eq!(recv.len, 9);
772        assert_eq!(recv.off, 9);
773        assert_eq!(recv.data.len(), 0);
774
775        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
776    }
777
778    #[test]
779    fn fully_overlapping_read_multi() {
780        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
781        assert_eq!(recv.len, 0);
782
783        let mut buf = [0; 32];
784
785        let first = RangeBuf::from(b"somethingsomething", 0, false);
786        let second = RangeBuf::from(b"hello", 3, false);
787        let third = RangeBuf::from(b"hello", 12, false);
788
789        assert!(recv.write(second).is_ok());
790        assert_eq!(recv.len, 8);
791        assert_eq!(recv.off, 0);
792        assert_eq!(recv.data.len(), 1);
793
794        assert!(recv.write(third).is_ok());
795        assert_eq!(recv.len, 17);
796        assert_eq!(recv.off, 0);
797        assert_eq!(recv.data.len(), 2);
798
799        assert!(recv.write(first).is_ok());
800        assert_eq!(recv.len, 18);
801        assert_eq!(recv.off, 0);
802        assert_eq!(recv.data.len(), 5);
803
804        let (len, fin) = recv.emit(&mut buf).unwrap();
805        assert_eq!(len, 18);
806        assert!(!fin);
807        assert_eq!(&buf[..len], b"somhellogsomhellog");
808        assert_eq!(recv.len, 18);
809        assert_eq!(recv.off, 18);
810        assert_eq!(recv.data.len(), 0);
811
812        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
813    }
814
815    #[test]
816    fn overlapping_start_read() {
817        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
818        assert_eq!(recv.len, 0);
819
820        let mut buf = [0; 32];
821
822        let first = RangeBuf::from(b"something", 0, false);
823        let second = RangeBuf::from(b"hello", 8, true);
824
825        assert!(recv.write(first).is_ok());
826        assert_eq!(recv.len, 9);
827        assert_eq!(recv.off, 0);
828        assert_eq!(recv.data.len(), 1);
829
830        assert!(recv.write(second).is_ok());
831        assert_eq!(recv.len, 13);
832        assert_eq!(recv.off, 0);
833        assert_eq!(recv.data.len(), 2);
834
835        let (len, fin) = recv.emit(&mut buf).unwrap();
836        assert_eq!(len, 13);
837        assert!(fin);
838        assert_eq!(&buf[..len], b"somethingello");
839        assert_eq!(recv.len, 13);
840        assert_eq!(recv.off, 13);
841
842        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
843    }
844
845    #[test]
846    fn overlapping_end_read() {
847        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
848        assert_eq!(recv.len, 0);
849
850        let mut buf = [0; 32];
851
852        let first = RangeBuf::from(b"hello", 0, false);
853        let second = RangeBuf::from(b"something", 3, true);
854
855        assert!(recv.write(second).is_ok());
856        assert_eq!(recv.len, 12);
857        assert_eq!(recv.off, 0);
858        assert_eq!(recv.data.len(), 1);
859
860        assert!(recv.write(first).is_ok());
861        assert_eq!(recv.len, 12);
862        assert_eq!(recv.off, 0);
863        assert_eq!(recv.data.len(), 2);
864
865        let (len, fin) = recv.emit(&mut buf).unwrap();
866        assert_eq!(len, 12);
867        assert!(fin);
868        assert_eq!(&buf[..len], b"helsomething");
869        assert_eq!(recv.len, 12);
870        assert_eq!(recv.off, 12);
871
872        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
873    }
874
875    #[test]
876    fn overlapping_end_twice_read() {
877        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
878        assert_eq!(recv.len, 0);
879
880        let mut buf = [0; 32];
881
882        let first = RangeBuf::from(b"he", 0, false);
883        let second = RangeBuf::from(b"ow", 4, false);
884        let third = RangeBuf::from(b"rl", 7, false);
885        let fourth = RangeBuf::from(b"helloworld", 0, true);
886
887        assert!(recv.write(third).is_ok());
888        assert_eq!(recv.len, 9);
889        assert_eq!(recv.off, 0);
890        assert_eq!(recv.data.len(), 1);
891
892        assert!(recv.write(second).is_ok());
893        assert_eq!(recv.len, 9);
894        assert_eq!(recv.off, 0);
895        assert_eq!(recv.data.len(), 2);
896
897        assert!(recv.write(first).is_ok());
898        assert_eq!(recv.len, 9);
899        assert_eq!(recv.off, 0);
900        assert_eq!(recv.data.len(), 3);
901
902        assert!(recv.write(fourth).is_ok());
903        assert_eq!(recv.len, 10);
904        assert_eq!(recv.off, 0);
905        assert_eq!(recv.data.len(), 6);
906
907        let (len, fin) = recv.emit(&mut buf).unwrap();
908        assert_eq!(len, 10);
909        assert!(fin);
910        assert_eq!(&buf[..len], b"helloworld");
911        assert_eq!(recv.len, 10);
912        assert_eq!(recv.off, 10);
913
914        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
915    }
916
917    #[test]
918    fn overlapping_end_twice_and_contained_read() {
919        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
920        assert_eq!(recv.len, 0);
921
922        let mut buf = [0; 32];
923
924        let first = RangeBuf::from(b"hellow", 0, false);
925        let second = RangeBuf::from(b"barfoo", 10, true);
926        let third = RangeBuf::from(b"rl", 7, false);
927        let fourth = RangeBuf::from(b"elloworldbarfoo", 1, true);
928
929        assert!(recv.write(third).is_ok());
930        assert_eq!(recv.len, 9);
931        assert_eq!(recv.off, 0);
932        assert_eq!(recv.data.len(), 1);
933
934        assert!(recv.write(second).is_ok());
935        assert_eq!(recv.len, 16);
936        assert_eq!(recv.off, 0);
937        assert_eq!(recv.data.len(), 2);
938
939        assert!(recv.write(first).is_ok());
940        assert_eq!(recv.len, 16);
941        assert_eq!(recv.off, 0);
942        assert_eq!(recv.data.len(), 3);
943
944        assert!(recv.write(fourth).is_ok());
945        assert_eq!(recv.len, 16);
946        assert_eq!(recv.off, 0);
947        assert_eq!(recv.data.len(), 5);
948
949        let (len, fin) = recv.emit(&mut buf).unwrap();
950        assert_eq!(len, 16);
951        assert!(fin);
952        assert_eq!(&buf[..len], b"helloworldbarfoo");
953        assert_eq!(recv.len, 16);
954        assert_eq!(recv.off, 16);
955
956        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
957    }
958
959    #[test]
960    fn partially_multi_overlapping_reordered_read() {
961        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
962        assert_eq!(recv.len, 0);
963
964        let mut buf = [0; 32];
965
966        let first = RangeBuf::from(b"hello", 8, false);
967        let second = RangeBuf::from(b"something", 0, false);
968        let third = RangeBuf::from(b"moar", 11, true);
969
970        assert!(recv.write(first).is_ok());
971        assert_eq!(recv.len, 13);
972        assert_eq!(recv.off, 0);
973        assert_eq!(recv.data.len(), 1);
974
975        assert!(recv.write(second).is_ok());
976        assert_eq!(recv.len, 13);
977        assert_eq!(recv.off, 0);
978        assert_eq!(recv.data.len(), 2);
979
980        assert!(recv.write(third).is_ok());
981        assert_eq!(recv.len, 15);
982        assert_eq!(recv.off, 0);
983        assert_eq!(recv.data.len(), 3);
984
985        let (len, fin) = recv.emit(&mut buf).unwrap();
986        assert_eq!(len, 15);
987        assert!(fin);
988        assert_eq!(&buf[..len], b"somethinhelloar");
989        assert_eq!(recv.len, 15);
990        assert_eq!(recv.off, 15);
991        assert_eq!(recv.data.len(), 0);
992
993        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
994    }
995
996    #[test]
997    fn partially_multi_overlapping_reordered_read2() {
998        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
999        assert_eq!(recv.len, 0);
1000
1001        let mut buf = [0; 32];
1002
1003        let first = RangeBuf::from(b"aaa", 0, false);
1004        let second = RangeBuf::from(b"bbb", 2, false);
1005        let third = RangeBuf::from(b"ccc", 4, false);
1006        let fourth = RangeBuf::from(b"ddd", 6, false);
1007        let fifth = RangeBuf::from(b"eee", 9, false);
1008        let sixth = RangeBuf::from(b"fff", 11, false);
1009
1010        assert!(recv.write(second).is_ok());
1011        assert_eq!(recv.len, 5);
1012        assert_eq!(recv.off, 0);
1013        assert_eq!(recv.data.len(), 1);
1014
1015        assert!(recv.write(fourth).is_ok());
1016        assert_eq!(recv.len, 9);
1017        assert_eq!(recv.off, 0);
1018        assert_eq!(recv.data.len(), 2);
1019
1020        assert!(recv.write(third).is_ok());
1021        assert_eq!(recv.len, 9);
1022        assert_eq!(recv.off, 0);
1023        assert_eq!(recv.data.len(), 3);
1024
1025        assert!(recv.write(first).is_ok());
1026        assert_eq!(recv.len, 9);
1027        assert_eq!(recv.off, 0);
1028        assert_eq!(recv.data.len(), 4);
1029
1030        assert!(recv.write(sixth).is_ok());
1031        assert_eq!(recv.len, 14);
1032        assert_eq!(recv.off, 0);
1033        assert_eq!(recv.data.len(), 5);
1034
1035        assert!(recv.write(fifth).is_ok());
1036        assert_eq!(recv.len, 14);
1037        assert_eq!(recv.off, 0);
1038        assert_eq!(recv.data.len(), 6);
1039
1040        let (len, fin) = recv.emit(&mut buf).unwrap();
1041        assert_eq!(len, 14);
1042        assert!(!fin);
1043        assert_eq!(&buf[..len], b"aabbbcdddeefff");
1044        assert_eq!(recv.len, 14);
1045        assert_eq!(recv.off, 14);
1046        assert_eq!(recv.data.len(), 0);
1047
1048        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
1049    }
1050}