quiche/stream/
recv_buf.rs

1// Copyright (C) 2023, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::cmp;
28use std::time;
29
30use std::collections::BTreeMap;
31use std::collections::VecDeque;
32
33use crate::Error;
34use crate::Result;
35
36use crate::flowcontrol;
37
38use super::RangeBuf;
39use super::DEFAULT_STREAM_WINDOW;
40
41/// Receive-side stream buffer.
42///
43/// Stream data received by the peer is buffered in a list of data chunks
44/// ordered by offset in ascending order. Contiguous data can then be read
45/// into a slice.
46#[derive(Debug, Default)]
47pub struct RecvBuf {
48    /// Chunks of data received from the peer that have not yet been read by
49    /// the application, ordered by offset.
50    data: BTreeMap<u64, RangeBuf>,
51
52    /// The lowest data offset that has yet to be read by the application.
53    off: u64,
54
55    /// The total length of data received on this stream.
56    len: u64,
57
58    /// Receiver flow controller.
59    flow_control: flowcontrol::FlowControl,
60
61    /// The final stream offset received from the peer, if any.
62    fin_off: Option<u64>,
63
64    /// The error code received via RESET_STREAM.
65    error: Option<u64>,
66
67    /// Whether incoming data is validated but not buffered.
68    drain: bool,
69}
70
71impl RecvBuf {
72    /// Creates a new receive buffer.
73    pub fn new(max_data: u64, max_window: u64) -> RecvBuf {
74        RecvBuf {
75            flow_control: flowcontrol::FlowControl::new(
76                max_data,
77                cmp::min(max_data, DEFAULT_STREAM_WINDOW),
78                max_window,
79            ),
80            ..RecvBuf::default()
81        }
82    }
83
84    /// Inserts the given chunk of data in the buffer.
85    ///
86    /// This also takes care of enforcing stream flow control limits, as well
87    /// as handling incoming data that overlaps data that is already in the
88    /// buffer.
89    pub fn write(&mut self, buf: RangeBuf) -> Result<()> {
90        if buf.max_off() > self.max_data() {
91            return Err(Error::FlowControl);
92        }
93
94        if let Some(fin_off) = self.fin_off {
95            // Stream's size is known, forbid data beyond that point.
96            if buf.max_off() > fin_off {
97                return Err(Error::FinalSize);
98            }
99
100            // Stream's size is already known, forbid changing it.
101            if buf.fin() && fin_off != buf.max_off() {
102                return Err(Error::FinalSize);
103            }
104        }
105
106        // Stream's known size is lower than data already received.
107        if buf.fin() && buf.max_off() < self.len {
108            return Err(Error::FinalSize);
109        }
110
111        // We already saved the final offset, so there's nothing else we
112        // need to keep from the RangeBuf if it's empty.
113        if self.fin_off.is_some() && buf.is_empty() {
114            return Ok(());
115        }
116
117        if buf.fin() {
118            self.fin_off = Some(buf.max_off());
119        }
120
121        // No need to store empty buffer that doesn't carry the fin flag.
122        if !buf.fin() && buf.is_empty() {
123            return Ok(());
124        }
125
126        // Check if data is fully duplicate, that is the buffer's max offset is
127        // lower or equal to the offset already stored in the recv buffer.
128        if self.off >= buf.max_off() {
129            // An exception is applied to empty range buffers, because an empty
130            // buffer's max offset matches the max offset of the recv buffer.
131            //
132            // By this point all spurious empty buffers should have already been
133            // discarded, so allowing empty buffers here should be safe.
134            if !buf.is_empty() {
135                return Ok(());
136            }
137        }
138
139        let mut tmp_bufs = VecDeque::with_capacity(2);
140        tmp_bufs.push_back(buf);
141
142        'tmp: while let Some(mut buf) = tmp_bufs.pop_front() {
143            // Discard incoming data below current stream offset. Bytes up to
144            // `self.off` have already been received so we should not buffer
145            // them again. This is also important to make sure `ready()` doesn't
146            // get stuck when a buffer with lower offset than the stream's is
147            // buffered.
148            if self.off_front() > buf.off() {
149                buf = buf.split_off((self.off_front() - buf.off()) as usize);
150            }
151
152            // Handle overlapping data. If the incoming data's starting offset
153            // is above the previous maximum received offset, there is clearly
154            // no overlap so this logic can be skipped. However do still try to
155            // merge an empty final buffer (i.e. an empty buffer with the fin
156            // flag set, which is the only kind of empty buffer that should
157            // reach this point).
158            if buf.off() < self.max_off() || buf.is_empty() {
159                for (_, b) in self.data.range(buf.off()..) {
160                    let off = buf.off();
161
162                    // We are past the current buffer.
163                    if b.off() > buf.max_off() {
164                        break;
165                    }
166
167                    // New buffer is fully contained in existing buffer.
168                    if off >= b.off() && buf.max_off() <= b.max_off() {
169                        continue 'tmp;
170                    }
171
172                    // New buffer's start overlaps existing buffer.
173                    if off >= b.off() && off < b.max_off() {
174                        buf = buf.split_off((b.max_off() - off) as usize);
175                    }
176
177                    // New buffer's end overlaps existing buffer.
178                    if off < b.off() && buf.max_off() > b.off() {
179                        tmp_bufs
180                            .push_back(buf.split_off((b.off() - off) as usize));
181                    }
182                }
183            }
184
185            self.len = cmp::max(self.len, buf.max_off());
186
187            if !self.drain {
188                self.data.insert(buf.max_off(), buf);
189            }
190        }
191
192        Ok(())
193    }
194
195    /// Writes data from the receive buffer into the given output buffer.
196    ///
197    /// Only contiguous data is written to the output buffer, starting from
198    /// offset 0. The offset is incremented as data is read out of the receive
199    /// buffer into the application buffer. If there is no data at the expected
200    /// read offset, the `Done` error is returned.
201    ///
202    /// On success the amount of data read, and a flag indicating if there is
203    /// no more data in the buffer, are returned as a tuple.
204    pub fn emit(&mut self, out: &mut [u8]) -> Result<(usize, bool)> {
205        let mut len = 0;
206        let mut cap = out.len();
207
208        if !self.ready() {
209            return Err(Error::Done);
210        }
211
212        // The stream was reset, so clear its data and return the error code
213        // instead.
214        if let Some(e) = self.error {
215            self.data.clear();
216            return Err(Error::StreamReset(e));
217        }
218
219        while cap > 0 && self.ready() {
220            let mut entry = match self.data.first_entry() {
221                Some(entry) => entry,
222                None => break,
223            };
224
225            let buf = entry.get_mut();
226
227            let buf_len = cmp::min(buf.len(), cap);
228
229            out[len..len + buf_len].copy_from_slice(&buf[..buf_len]);
230
231            self.off += buf_len as u64;
232
233            len += buf_len;
234            cap -= buf_len;
235
236            if buf_len < buf.len() {
237                buf.consume(buf_len);
238
239                // We reached the maximum capacity, so end here.
240                break;
241            }
242
243            entry.remove();
244        }
245
246        // Update consumed bytes for flow control.
247        self.flow_control.add_consumed(len as u64);
248
249        Ok((len, self.is_fin()))
250    }
251
252    /// Resets the stream at the given offset.
253    pub fn reset(&mut self, error_code: u64, final_size: u64) -> Result<usize> {
254        // Stream's size is already known, forbid changing it.
255        if let Some(fin_off) = self.fin_off {
256            if fin_off != final_size {
257                return Err(Error::FinalSize);
258            }
259        }
260
261        // Stream's known size is lower than data already received.
262        if final_size < self.len {
263            return Err(Error::FinalSize);
264        }
265
266        // Calculate how many bytes need to be removed from the connection flow
267        // control.
268        let max_data_delta = final_size - self.len;
269
270        if self.error.is_some() {
271            return Ok(max_data_delta as usize);
272        }
273
274        self.error = Some(error_code);
275
276        // Clear all data already buffered.
277        self.off = final_size;
278
279        self.data.clear();
280
281        // In order to ensure the application is notified when the stream is
282        // reset, enqueue a zero-length buffer at the final size offset.
283        let buf = RangeBuf::from(b"", final_size, true);
284        self.write(buf)?;
285
286        Ok(max_data_delta as usize)
287    }
288
289    /// Commits the new max_data limit.
290    pub fn update_max_data(&mut self, now: time::Instant) {
291        self.flow_control.update_max_data(now);
292    }
293
294    /// Return the new max_data limit.
295    pub fn max_data_next(&mut self) -> u64 {
296        self.flow_control.max_data_next()
297    }
298
299    /// Return the current flow control limit.
300    pub fn max_data(&self) -> u64 {
301        self.flow_control.max_data()
302    }
303
304    /// Return the current window.
305    pub fn window(&self) -> u64 {
306        self.flow_control.window()
307    }
308
309    /// Autotune the window size.
310    pub fn autotune_window(&mut self, now: time::Instant, rtt: time::Duration) {
311        self.flow_control.autotune_window(now, rtt);
312    }
313
314    /// Shuts down receiving data.
315    pub fn shutdown(&mut self) -> Result<()> {
316        if self.drain {
317            return Err(Error::Done);
318        }
319
320        self.drain = true;
321
322        self.data.clear();
323
324        self.off = self.max_off();
325
326        Ok(())
327    }
328
329    /// Returns the lowest offset of data buffered.
330    pub fn off_front(&self) -> u64 {
331        self.off
332    }
333
334    /// Returns true if we need to update the local flow control limit.
335    pub fn almost_full(&self) -> bool {
336        self.fin_off.is_none() && self.flow_control.should_update_max_data()
337    }
338
339    /// Returns the largest offset ever received.
340    pub fn max_off(&self) -> u64 {
341        self.len
342    }
343
344    /// Returns true if the receive-side of the stream is complete.
345    ///
346    /// This happens when the stream's receive final size is known, and the
347    /// application has read all data from the stream.
348    pub fn is_fin(&self) -> bool {
349        if self.fin_off == Some(self.off) {
350            return true;
351        }
352
353        false
354    }
355
356    /// Returns true if the stream is not storing incoming data.
357    pub fn is_draining(&self) -> bool {
358        self.drain
359    }
360
361    /// Returns true if the stream has data to be read.
362    pub fn ready(&self) -> bool {
363        let (_, buf) = match self.data.first_key_value() {
364            Some(v) => v,
365            None => return false,
366        };
367
368        buf.off() == self.off
369    }
370}
371
372#[cfg(test)]
373mod tests {
374    use super::*;
375
376    #[test]
377    fn empty_read() {
378        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
379        assert_eq!(recv.len, 0);
380
381        let mut buf = [0; 32];
382
383        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
384    }
385
386    #[test]
387    fn empty_stream_frame() {
388        let mut recv = RecvBuf::new(15, DEFAULT_STREAM_WINDOW);
389        assert_eq!(recv.len, 0);
390
391        let buf = RangeBuf::from(b"hello", 0, false);
392        assert!(recv.write(buf).is_ok());
393        assert_eq!(recv.len, 5);
394        assert_eq!(recv.off, 0);
395        assert_eq!(recv.data.len(), 1);
396
397        let mut buf = [0; 32];
398        assert_eq!(recv.emit(&mut buf), Ok((5, false)));
399
400        // Don't store non-fin empty buffer.
401        let buf = RangeBuf::from(b"", 10, false);
402        assert!(recv.write(buf).is_ok());
403        assert_eq!(recv.len, 5);
404        assert_eq!(recv.off, 5);
405        assert_eq!(recv.data.len(), 0);
406
407        // Check flow control for empty buffer.
408        let buf = RangeBuf::from(b"", 16, false);
409        assert_eq!(recv.write(buf), Err(Error::FlowControl));
410
411        // Store fin empty buffer.
412        let buf = RangeBuf::from(b"", 5, true);
413        assert!(recv.write(buf).is_ok());
414        assert_eq!(recv.len, 5);
415        assert_eq!(recv.off, 5);
416        assert_eq!(recv.data.len(), 1);
417
418        // Don't store additional fin empty buffers.
419        let buf = RangeBuf::from(b"", 5, true);
420        assert!(recv.write(buf).is_ok());
421        assert_eq!(recv.len, 5);
422        assert_eq!(recv.off, 5);
423        assert_eq!(recv.data.len(), 1);
424
425        // Don't store additional fin non-empty buffers.
426        let buf = RangeBuf::from(b"aa", 3, true);
427        assert!(recv.write(buf).is_ok());
428        assert_eq!(recv.len, 5);
429        assert_eq!(recv.off, 5);
430        assert_eq!(recv.data.len(), 1);
431
432        // Validate final size with fin empty buffers.
433        let buf = RangeBuf::from(b"", 6, true);
434        assert_eq!(recv.write(buf), Err(Error::FinalSize));
435        let buf = RangeBuf::from(b"", 4, true);
436        assert_eq!(recv.write(buf), Err(Error::FinalSize));
437
438        let mut buf = [0; 32];
439        assert_eq!(recv.emit(&mut buf), Ok((0, true)));
440    }
441
442    #[test]
443    fn ordered_read() {
444        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
445        assert_eq!(recv.len, 0);
446
447        let mut buf = [0; 32];
448
449        let first = RangeBuf::from(b"hello", 0, false);
450        let second = RangeBuf::from(b"world", 5, false);
451        let third = RangeBuf::from(b"something", 10, true);
452
453        assert!(recv.write(second).is_ok());
454        assert_eq!(recv.len, 10);
455        assert_eq!(recv.off, 0);
456
457        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
458
459        assert!(recv.write(third).is_ok());
460        assert_eq!(recv.len, 19);
461        assert_eq!(recv.off, 0);
462
463        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
464
465        assert!(recv.write(first).is_ok());
466        assert_eq!(recv.len, 19);
467        assert_eq!(recv.off, 0);
468
469        let (len, fin) = recv.emit(&mut buf).unwrap();
470        assert_eq!(len, 19);
471        assert!(fin);
472        assert_eq!(&buf[..len], b"helloworldsomething");
473        assert_eq!(recv.len, 19);
474        assert_eq!(recv.off, 19);
475
476        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
477    }
478
479    #[test]
480    fn split_read() {
481        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
482        assert_eq!(recv.len, 0);
483
484        let mut buf = [0; 32];
485
486        let first = RangeBuf::from(b"something", 0, false);
487        let second = RangeBuf::from(b"helloworld", 9, true);
488
489        assert!(recv.write(first).is_ok());
490        assert_eq!(recv.len, 9);
491        assert_eq!(recv.off, 0);
492
493        assert!(recv.write(second).is_ok());
494        assert_eq!(recv.len, 19);
495        assert_eq!(recv.off, 0);
496
497        let (len, fin) = recv.emit(&mut buf[..10]).unwrap();
498        assert_eq!(len, 10);
499        assert!(!fin);
500        assert_eq!(&buf[..len], b"somethingh");
501        assert_eq!(recv.len, 19);
502        assert_eq!(recv.off, 10);
503
504        let (len, fin) = recv.emit(&mut buf[..5]).unwrap();
505        assert_eq!(len, 5);
506        assert!(!fin);
507        assert_eq!(&buf[..len], b"ellow");
508        assert_eq!(recv.len, 19);
509        assert_eq!(recv.off, 15);
510
511        let (len, fin) = recv.emit(&mut buf[..10]).unwrap();
512        assert_eq!(len, 4);
513        assert!(fin);
514        assert_eq!(&buf[..len], b"orld");
515        assert_eq!(recv.len, 19);
516        assert_eq!(recv.off, 19);
517    }
518
519    #[test]
520    fn incomplete_read() {
521        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
522        assert_eq!(recv.len, 0);
523
524        let mut buf = [0; 32];
525
526        let first = RangeBuf::from(b"something", 0, false);
527        let second = RangeBuf::from(b"helloworld", 9, true);
528
529        assert!(recv.write(second).is_ok());
530        assert_eq!(recv.len, 19);
531        assert_eq!(recv.off, 0);
532
533        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
534
535        assert!(recv.write(first).is_ok());
536        assert_eq!(recv.len, 19);
537        assert_eq!(recv.off, 0);
538
539        let (len, fin) = recv.emit(&mut buf).unwrap();
540        assert_eq!(len, 19);
541        assert!(fin);
542        assert_eq!(&buf[..len], b"somethinghelloworld");
543        assert_eq!(recv.len, 19);
544        assert_eq!(recv.off, 19);
545    }
546
547    #[test]
548    fn zero_len_read() {
549        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
550        assert_eq!(recv.len, 0);
551
552        let mut buf = [0; 32];
553
554        let first = RangeBuf::from(b"something", 0, false);
555        let second = RangeBuf::from(b"", 9, true);
556
557        assert!(recv.write(first).is_ok());
558        assert_eq!(recv.len, 9);
559        assert_eq!(recv.off, 0);
560        assert_eq!(recv.data.len(), 1);
561
562        assert!(recv.write(second).is_ok());
563        assert_eq!(recv.len, 9);
564        assert_eq!(recv.off, 0);
565        assert_eq!(recv.data.len(), 1);
566
567        let (len, fin) = recv.emit(&mut buf).unwrap();
568        assert_eq!(len, 9);
569        assert!(fin);
570        assert_eq!(&buf[..len], b"something");
571        assert_eq!(recv.len, 9);
572        assert_eq!(recv.off, 9);
573    }
574
575    #[test]
576    fn past_read() {
577        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
578        assert_eq!(recv.len, 0);
579
580        let mut buf = [0; 32];
581
582        let first = RangeBuf::from(b"something", 0, false);
583        let second = RangeBuf::from(b"hello", 3, false);
584        let third = RangeBuf::from(b"ello", 4, true);
585        let fourth = RangeBuf::from(b"ello", 5, true);
586
587        assert!(recv.write(first).is_ok());
588        assert_eq!(recv.len, 9);
589        assert_eq!(recv.off, 0);
590        assert_eq!(recv.data.len(), 1);
591
592        let (len, fin) = recv.emit(&mut buf).unwrap();
593        assert_eq!(len, 9);
594        assert!(!fin);
595        assert_eq!(&buf[..len], b"something");
596        assert_eq!(recv.len, 9);
597        assert_eq!(recv.off, 9);
598
599        assert!(recv.write(second).is_ok());
600        assert_eq!(recv.len, 9);
601        assert_eq!(recv.off, 9);
602        assert_eq!(recv.data.len(), 0);
603
604        assert_eq!(recv.write(third), Err(Error::FinalSize));
605
606        assert!(recv.write(fourth).is_ok());
607        assert_eq!(recv.len, 9);
608        assert_eq!(recv.off, 9);
609        assert_eq!(recv.data.len(), 0);
610
611        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
612    }
613
614    #[test]
615    fn fully_overlapping_read() {
616        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
617        assert_eq!(recv.len, 0);
618
619        let mut buf = [0; 32];
620
621        let first = RangeBuf::from(b"something", 0, false);
622        let second = RangeBuf::from(b"hello", 4, false);
623
624        assert!(recv.write(first).is_ok());
625        assert_eq!(recv.len, 9);
626        assert_eq!(recv.off, 0);
627        assert_eq!(recv.data.len(), 1);
628
629        assert!(recv.write(second).is_ok());
630        assert_eq!(recv.len, 9);
631        assert_eq!(recv.off, 0);
632        assert_eq!(recv.data.len(), 1);
633
634        let (len, fin) = recv.emit(&mut buf).unwrap();
635        assert_eq!(len, 9);
636        assert!(!fin);
637        assert_eq!(&buf[..len], b"something");
638        assert_eq!(recv.len, 9);
639        assert_eq!(recv.off, 9);
640        assert_eq!(recv.data.len(), 0);
641
642        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
643    }
644
645    #[test]
646    fn fully_overlapping_read2() {
647        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
648        assert_eq!(recv.len, 0);
649
650        let mut buf = [0; 32];
651
652        let first = RangeBuf::from(b"something", 0, false);
653        let second = RangeBuf::from(b"hello", 4, false);
654
655        assert!(recv.write(second).is_ok());
656        assert_eq!(recv.len, 9);
657        assert_eq!(recv.off, 0);
658        assert_eq!(recv.data.len(), 1);
659
660        assert!(recv.write(first).is_ok());
661        assert_eq!(recv.len, 9);
662        assert_eq!(recv.off, 0);
663        assert_eq!(recv.data.len(), 2);
664
665        let (len, fin) = recv.emit(&mut buf).unwrap();
666        assert_eq!(len, 9);
667        assert!(!fin);
668        assert_eq!(&buf[..len], b"somehello");
669        assert_eq!(recv.len, 9);
670        assert_eq!(recv.off, 9);
671        assert_eq!(recv.data.len(), 0);
672
673        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
674    }
675
676    #[test]
677    fn fully_overlapping_read3() {
678        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
679        assert_eq!(recv.len, 0);
680
681        let mut buf = [0; 32];
682
683        let first = RangeBuf::from(b"something", 0, false);
684        let second = RangeBuf::from(b"hello", 3, false);
685
686        assert!(recv.write(second).is_ok());
687        assert_eq!(recv.len, 8);
688        assert_eq!(recv.off, 0);
689        assert_eq!(recv.data.len(), 1);
690
691        assert!(recv.write(first).is_ok());
692        assert_eq!(recv.len, 9);
693        assert_eq!(recv.off, 0);
694        assert_eq!(recv.data.len(), 3);
695
696        let (len, fin) = recv.emit(&mut buf).unwrap();
697        assert_eq!(len, 9);
698        assert!(!fin);
699        assert_eq!(&buf[..len], b"somhellog");
700        assert_eq!(recv.len, 9);
701        assert_eq!(recv.off, 9);
702        assert_eq!(recv.data.len(), 0);
703
704        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
705    }
706
707    #[test]
708    fn fully_overlapping_read_multi() {
709        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
710        assert_eq!(recv.len, 0);
711
712        let mut buf = [0; 32];
713
714        let first = RangeBuf::from(b"somethingsomething", 0, false);
715        let second = RangeBuf::from(b"hello", 3, false);
716        let third = RangeBuf::from(b"hello", 12, false);
717
718        assert!(recv.write(second).is_ok());
719        assert_eq!(recv.len, 8);
720        assert_eq!(recv.off, 0);
721        assert_eq!(recv.data.len(), 1);
722
723        assert!(recv.write(third).is_ok());
724        assert_eq!(recv.len, 17);
725        assert_eq!(recv.off, 0);
726        assert_eq!(recv.data.len(), 2);
727
728        assert!(recv.write(first).is_ok());
729        assert_eq!(recv.len, 18);
730        assert_eq!(recv.off, 0);
731        assert_eq!(recv.data.len(), 5);
732
733        let (len, fin) = recv.emit(&mut buf).unwrap();
734        assert_eq!(len, 18);
735        assert!(!fin);
736        assert_eq!(&buf[..len], b"somhellogsomhellog");
737        assert_eq!(recv.len, 18);
738        assert_eq!(recv.off, 18);
739        assert_eq!(recv.data.len(), 0);
740
741        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
742    }
743
744    #[test]
745    fn overlapping_start_read() {
746        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
747        assert_eq!(recv.len, 0);
748
749        let mut buf = [0; 32];
750
751        let first = RangeBuf::from(b"something", 0, false);
752        let second = RangeBuf::from(b"hello", 8, true);
753
754        assert!(recv.write(first).is_ok());
755        assert_eq!(recv.len, 9);
756        assert_eq!(recv.off, 0);
757        assert_eq!(recv.data.len(), 1);
758
759        assert!(recv.write(second).is_ok());
760        assert_eq!(recv.len, 13);
761        assert_eq!(recv.off, 0);
762        assert_eq!(recv.data.len(), 2);
763
764        let (len, fin) = recv.emit(&mut buf).unwrap();
765        assert_eq!(len, 13);
766        assert!(fin);
767        assert_eq!(&buf[..len], b"somethingello");
768        assert_eq!(recv.len, 13);
769        assert_eq!(recv.off, 13);
770
771        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
772    }
773
774    #[test]
775    fn overlapping_end_read() {
776        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
777        assert_eq!(recv.len, 0);
778
779        let mut buf = [0; 32];
780
781        let first = RangeBuf::from(b"hello", 0, false);
782        let second = RangeBuf::from(b"something", 3, true);
783
784        assert!(recv.write(second).is_ok());
785        assert_eq!(recv.len, 12);
786        assert_eq!(recv.off, 0);
787        assert_eq!(recv.data.len(), 1);
788
789        assert!(recv.write(first).is_ok());
790        assert_eq!(recv.len, 12);
791        assert_eq!(recv.off, 0);
792        assert_eq!(recv.data.len(), 2);
793
794        let (len, fin) = recv.emit(&mut buf).unwrap();
795        assert_eq!(len, 12);
796        assert!(fin);
797        assert_eq!(&buf[..len], b"helsomething");
798        assert_eq!(recv.len, 12);
799        assert_eq!(recv.off, 12);
800
801        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
802    }
803
804    #[test]
805    fn overlapping_end_twice_read() {
806        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
807        assert_eq!(recv.len, 0);
808
809        let mut buf = [0; 32];
810
811        let first = RangeBuf::from(b"he", 0, false);
812        let second = RangeBuf::from(b"ow", 4, false);
813        let third = RangeBuf::from(b"rl", 7, false);
814        let fourth = RangeBuf::from(b"helloworld", 0, true);
815
816        assert!(recv.write(third).is_ok());
817        assert_eq!(recv.len, 9);
818        assert_eq!(recv.off, 0);
819        assert_eq!(recv.data.len(), 1);
820
821        assert!(recv.write(second).is_ok());
822        assert_eq!(recv.len, 9);
823        assert_eq!(recv.off, 0);
824        assert_eq!(recv.data.len(), 2);
825
826        assert!(recv.write(first).is_ok());
827        assert_eq!(recv.len, 9);
828        assert_eq!(recv.off, 0);
829        assert_eq!(recv.data.len(), 3);
830
831        assert!(recv.write(fourth).is_ok());
832        assert_eq!(recv.len, 10);
833        assert_eq!(recv.off, 0);
834        assert_eq!(recv.data.len(), 6);
835
836        let (len, fin) = recv.emit(&mut buf).unwrap();
837        assert_eq!(len, 10);
838        assert!(fin);
839        assert_eq!(&buf[..len], b"helloworld");
840        assert_eq!(recv.len, 10);
841        assert_eq!(recv.off, 10);
842
843        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
844    }
845
846    #[test]
847    fn overlapping_end_twice_and_contained_read() {
848        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
849        assert_eq!(recv.len, 0);
850
851        let mut buf = [0; 32];
852
853        let first = RangeBuf::from(b"hellow", 0, false);
854        let second = RangeBuf::from(b"barfoo", 10, true);
855        let third = RangeBuf::from(b"rl", 7, false);
856        let fourth = RangeBuf::from(b"elloworldbarfoo", 1, true);
857
858        assert!(recv.write(third).is_ok());
859        assert_eq!(recv.len, 9);
860        assert_eq!(recv.off, 0);
861        assert_eq!(recv.data.len(), 1);
862
863        assert!(recv.write(second).is_ok());
864        assert_eq!(recv.len, 16);
865        assert_eq!(recv.off, 0);
866        assert_eq!(recv.data.len(), 2);
867
868        assert!(recv.write(first).is_ok());
869        assert_eq!(recv.len, 16);
870        assert_eq!(recv.off, 0);
871        assert_eq!(recv.data.len(), 3);
872
873        assert!(recv.write(fourth).is_ok());
874        assert_eq!(recv.len, 16);
875        assert_eq!(recv.off, 0);
876        assert_eq!(recv.data.len(), 5);
877
878        let (len, fin) = recv.emit(&mut buf).unwrap();
879        assert_eq!(len, 16);
880        assert!(fin);
881        assert_eq!(&buf[..len], b"helloworldbarfoo");
882        assert_eq!(recv.len, 16);
883        assert_eq!(recv.off, 16);
884
885        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
886    }
887
888    #[test]
889    fn partially_multi_overlapping_reordered_read() {
890        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
891        assert_eq!(recv.len, 0);
892
893        let mut buf = [0; 32];
894
895        let first = RangeBuf::from(b"hello", 8, false);
896        let second = RangeBuf::from(b"something", 0, false);
897        let third = RangeBuf::from(b"moar", 11, true);
898
899        assert!(recv.write(first).is_ok());
900        assert_eq!(recv.len, 13);
901        assert_eq!(recv.off, 0);
902        assert_eq!(recv.data.len(), 1);
903
904        assert!(recv.write(second).is_ok());
905        assert_eq!(recv.len, 13);
906        assert_eq!(recv.off, 0);
907        assert_eq!(recv.data.len(), 2);
908
909        assert!(recv.write(third).is_ok());
910        assert_eq!(recv.len, 15);
911        assert_eq!(recv.off, 0);
912        assert_eq!(recv.data.len(), 3);
913
914        let (len, fin) = recv.emit(&mut buf).unwrap();
915        assert_eq!(len, 15);
916        assert!(fin);
917        assert_eq!(&buf[..len], b"somethinhelloar");
918        assert_eq!(recv.len, 15);
919        assert_eq!(recv.off, 15);
920        assert_eq!(recv.data.len(), 0);
921
922        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
923    }
924
925    #[test]
926    fn partially_multi_overlapping_reordered_read2() {
927        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
928        assert_eq!(recv.len, 0);
929
930        let mut buf = [0; 32];
931
932        let first = RangeBuf::from(b"aaa", 0, false);
933        let second = RangeBuf::from(b"bbb", 2, false);
934        let third = RangeBuf::from(b"ccc", 4, false);
935        let fourth = RangeBuf::from(b"ddd", 6, false);
936        let fifth = RangeBuf::from(b"eee", 9, false);
937        let sixth = RangeBuf::from(b"fff", 11, false);
938
939        assert!(recv.write(second).is_ok());
940        assert_eq!(recv.len, 5);
941        assert_eq!(recv.off, 0);
942        assert_eq!(recv.data.len(), 1);
943
944        assert!(recv.write(fourth).is_ok());
945        assert_eq!(recv.len, 9);
946        assert_eq!(recv.off, 0);
947        assert_eq!(recv.data.len(), 2);
948
949        assert!(recv.write(third).is_ok());
950        assert_eq!(recv.len, 9);
951        assert_eq!(recv.off, 0);
952        assert_eq!(recv.data.len(), 3);
953
954        assert!(recv.write(first).is_ok());
955        assert_eq!(recv.len, 9);
956        assert_eq!(recv.off, 0);
957        assert_eq!(recv.data.len(), 4);
958
959        assert!(recv.write(sixth).is_ok());
960        assert_eq!(recv.len, 14);
961        assert_eq!(recv.off, 0);
962        assert_eq!(recv.data.len(), 5);
963
964        assert!(recv.write(fifth).is_ok());
965        assert_eq!(recv.len, 14);
966        assert_eq!(recv.off, 0);
967        assert_eq!(recv.data.len(), 6);
968
969        let (len, fin) = recv.emit(&mut buf).unwrap();
970        assert_eq!(len, 14);
971        assert!(!fin);
972        assert_eq!(&buf[..len], b"aabbbcdddeefff");
973        assert_eq!(recv.len, 14);
974        assert_eq!(recv.off, 14);
975        assert_eq!(recv.data.len(), 0);
976
977        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
978    }
979}