// quiche/stream/recv_buf.rs

// Copyright (C) 2023, Cloudflare, Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
//     * Redistributions of source code must retain the above copyright notice,
//       this list of conditions and the following disclaimer.
//
//     * Redistributions in binary form must reproduce the above copyright
//       notice, this list of conditions and the following disclaimer in the
//       documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::cmp;
28
29use std::collections::BTreeMap;
30use std::collections::VecDeque;
31
32use std::time::Duration;
33use std::time::Instant;
34
35use crate::Error;
36use crate::Result;
37
38use crate::flowcontrol;
39
40use crate::range_buf::RangeBuf;
41
42use super::DEFAULT_STREAM_WINDOW;
43
44/// Receive-side stream buffer.
45///
46/// Stream data received by the peer is buffered in a list of data chunks
47/// ordered by offset in ascending order. Contiguous data can then be read
48/// into a slice.
49#[derive(Debug, Default)]
50pub struct RecvBuf {
51    /// Chunks of data received from the peer that have not yet been read by
52    /// the application, ordered by offset.
53    data: BTreeMap<u64, RangeBuf>,
54
55    /// The lowest data offset that has yet to be read by the application.
56    off: u64,
57
58    /// The total length of data received on this stream.
59    len: u64,
60
61    /// Receiver flow controller.
62    flow_control: flowcontrol::FlowControl,
63
64    /// The final stream offset received from the peer, if any.
65    fin_off: Option<u64>,
66
67    /// The error code received via RESET_STREAM.
68    error: Option<u64>,
69
70    /// Whether incoming data is validated but not buffered.
71    drain: bool,
72}
73
74impl RecvBuf {
75    /// Creates a new receive buffer.
76    pub fn new(max_data: u64, max_window: u64) -> RecvBuf {
77        RecvBuf {
78            flow_control: flowcontrol::FlowControl::new(
79                max_data,
80                cmp::min(max_data, DEFAULT_STREAM_WINDOW),
81                max_window,
82            ),
83            ..RecvBuf::default()
84        }
85    }
86
87    /// Inserts the given chunk of data in the buffer.
88    ///
89    /// This also takes care of enforcing stream flow control limits, as well
90    /// as handling incoming data that overlaps data that is already in the
91    /// buffer.
92    pub fn write(&mut self, buf: RangeBuf) -> Result<()> {
93        if buf.max_off() > self.max_data() {
94            return Err(Error::FlowControl);
95        }
96
97        if let Some(fin_off) = self.fin_off {
98            // Stream's size is known, forbid data beyond that point.
99            if buf.max_off() > fin_off {
100                return Err(Error::FinalSize);
101            }
102
103            // Stream's size is already known, forbid changing it.
104            if buf.fin() && fin_off != buf.max_off() {
105                return Err(Error::FinalSize);
106            }
107        }
108
109        // Stream's known size is lower than data already received.
110        if buf.fin() && buf.max_off() < self.len {
111            return Err(Error::FinalSize);
112        }
113
114        // We already saved the final offset, so there's nothing else we
115        // need to keep from the RangeBuf if it's empty.
116        if self.fin_off.is_some() && buf.is_empty() {
117            return Ok(());
118        }
119
120        if buf.fin() {
121            self.fin_off = Some(buf.max_off());
122        }
123
124        // No need to store empty buffer that doesn't carry the fin flag.
125        if !buf.fin() && buf.is_empty() {
126            return Ok(());
127        }
128
129        // Check if data is fully duplicate, that is the buffer's max offset is
130        // lower or equal to the offset already stored in the recv buffer.
131        if self.off >= buf.max_off() {
132            // An exception is applied to empty range buffers, because an empty
133            // buffer's max offset matches the max offset of the recv buffer.
134            //
135            // By this point all spurious empty buffers should have already been
136            // discarded, so allowing empty buffers here should be safe.
137            if !buf.is_empty() {
138                return Ok(());
139            }
140        }
141
142        let mut tmp_bufs = VecDeque::with_capacity(2);
143        tmp_bufs.push_back(buf);
144
145        'tmp: while let Some(mut buf) = tmp_bufs.pop_front() {
146            // Discard incoming data below current stream offset. Bytes up to
147            // `self.off` have already been received so we should not buffer
148            // them again. This is also important to make sure `ready()` doesn't
149            // get stuck when a buffer with lower offset than the stream's is
150            // buffered.
151            if self.off_front() > buf.off() {
152                buf = buf.split_off((self.off_front() - buf.off()) as usize);
153            }
154
155            // Handle overlapping data. If the incoming data's starting offset
156            // is above the previous maximum received offset, there is clearly
157            // no overlap so this logic can be skipped. However do still try to
158            // merge an empty final buffer (i.e. an empty buffer with the fin
159            // flag set, which is the only kind of empty buffer that should
160            // reach this point).
161            if buf.off() < self.max_off() || buf.is_empty() {
162                for (_, b) in self.data.range(buf.off()..) {
163                    let off = buf.off();
164
165                    // We are past the current buffer.
166                    if b.off() > buf.max_off() {
167                        break;
168                    }
169
170                    // New buffer is fully contained in existing buffer.
171                    if off >= b.off() && buf.max_off() <= b.max_off() {
172                        continue 'tmp;
173                    }
174
175                    // New buffer's start overlaps existing buffer.
176                    if off >= b.off() && off < b.max_off() {
177                        buf = buf.split_off((b.max_off() - off) as usize);
178                    }
179
180                    // New buffer's end overlaps existing buffer.
181                    if off < b.off() && buf.max_off() > b.off() {
182                        tmp_bufs
183                            .push_back(buf.split_off((b.off() - off) as usize));
184                    }
185                }
186            }
187
188            self.len = cmp::max(self.len, buf.max_off());
189
190            if !self.drain {
191                self.data.insert(buf.max_off(), buf);
192            }
193        }
194
195        Ok(())
196    }
197
198    /// Writes data from the receive buffer into the given output buffer.
199    ///
200    /// Only contiguous data is written to the output buffer, starting from
201    /// offset 0. The offset is incremented as data is read out of the receive
202    /// buffer into the application buffer. If there is no data at the expected
203    /// read offset, the `Done` error is returned.
204    ///
205    /// On success the amount of data read, and a flag indicating if there is
206    /// no more data in the buffer, are returned as a tuple.
207    pub fn emit(&mut self, out: &mut [u8]) -> Result<(usize, bool)> {
208        let mut len = 0;
209        let mut cap = out.len();
210
211        if !self.ready() {
212            return Err(Error::Done);
213        }
214
215        // The stream was reset, so clear its data and return the error code
216        // instead.
217        if let Some(e) = self.error {
218            self.data.clear();
219            return Err(Error::StreamReset(e));
220        }
221
222        while cap > 0 && self.ready() {
223            let mut entry = match self.data.first_entry() {
224                Some(entry) => entry,
225                None => break,
226            };
227
228            let buf = entry.get_mut();
229
230            let buf_len = cmp::min(buf.len(), cap);
231
232            out[len..len + buf_len].copy_from_slice(&buf[..buf_len]);
233
234            self.off += buf_len as u64;
235
236            len += buf_len;
237            cap -= buf_len;
238
239            if buf_len < buf.len() {
240                buf.consume(buf_len);
241
242                // We reached the maximum capacity, so end here.
243                break;
244            }
245
246            entry.remove();
247        }
248
249        // Update consumed bytes for flow control.
250        self.flow_control.add_consumed(len as u64);
251
252        Ok((len, self.is_fin()))
253    }
254
255    /// Resets the stream at the given offset.
256    pub fn reset(&mut self, error_code: u64, final_size: u64) -> Result<usize> {
257        // Stream's size is already known, forbid changing it.
258        if let Some(fin_off) = self.fin_off {
259            if fin_off != final_size {
260                return Err(Error::FinalSize);
261            }
262        }
263
264        // Stream's known size is lower than data already received.
265        if final_size < self.len {
266            return Err(Error::FinalSize);
267        }
268
269        // Calculate how many bytes need to be removed from the connection flow
270        // control.
271        let max_data_delta = final_size - self.len;
272
273        if self.error.is_some() {
274            return Ok(max_data_delta as usize);
275        }
276
277        self.error = Some(error_code);
278
279        // Clear all data already buffered.
280        self.off = final_size;
281
282        self.data.clear();
283
284        // In order to ensure the application is notified when the stream is
285        // reset, enqueue a zero-length buffer at the final size offset.
286        let buf = RangeBuf::from(b"", final_size, true);
287        self.write(buf)?;
288
289        Ok(max_data_delta as usize)
290    }
291
292    /// Commits the new max_data limit.
293    pub fn update_max_data(&mut self, now: Instant) {
294        self.flow_control.update_max_data(now);
295    }
296
297    /// Return the new max_data limit.
298    pub fn max_data_next(&mut self) -> u64 {
299        self.flow_control.max_data_next()
300    }
301
302    /// Return the current flow control limit.
303    pub fn max_data(&self) -> u64 {
304        self.flow_control.max_data()
305    }
306
307    /// Return the current window.
308    pub fn window(&self) -> u64 {
309        self.flow_control.window()
310    }
311
312    /// Autotune the window size.
313    pub fn autotune_window(&mut self, now: Instant, rtt: Duration) {
314        self.flow_control.autotune_window(now, rtt);
315    }
316
317    /// Shuts down receiving data.
318    pub fn shutdown(&mut self) -> Result<()> {
319        if self.drain {
320            return Err(Error::Done);
321        }
322
323        self.drain = true;
324
325        self.data.clear();
326
327        self.off = self.max_off();
328
329        Ok(())
330    }
331
332    /// Returns the lowest offset of data buffered.
333    pub fn off_front(&self) -> u64 {
334        self.off
335    }
336
337    /// Returns true if we need to update the local flow control limit.
338    pub fn almost_full(&self) -> bool {
339        self.fin_off.is_none() && self.flow_control.should_update_max_data()
340    }
341
342    /// Returns the largest offset ever received.
343    pub fn max_off(&self) -> u64 {
344        self.len
345    }
346
347    /// Returns true if the receive-side of the stream is complete.
348    ///
349    /// This happens when the stream's receive final size is known, and the
350    /// application has read all data from the stream.
351    pub fn is_fin(&self) -> bool {
352        if self.fin_off == Some(self.off) {
353            return true;
354        }
355
356        false
357    }
358
359    /// Returns true if the stream is not storing incoming data.
360    pub fn is_draining(&self) -> bool {
361        self.drain
362    }
363
364    /// Returns true if the stream has data to be read.
365    pub fn ready(&self) -> bool {
366        let (_, buf) = match self.data.first_key_value() {
367            Some(v) => v,
368            None => return false,
369        };
370
371        buf.off() == self.off
372    }
373}
374
375#[cfg(test)]
376mod tests {
377    use super::*;
378
379    #[test]
380    fn empty_read() {
381        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
382        assert_eq!(recv.len, 0);
383
384        let mut buf = [0; 32];
385
386        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
387    }
388
389    #[test]
390    fn empty_stream_frame() {
391        let mut recv = RecvBuf::new(15, DEFAULT_STREAM_WINDOW);
392        assert_eq!(recv.len, 0);
393
394        let buf = RangeBuf::from(b"hello", 0, false);
395        assert!(recv.write(buf).is_ok());
396        assert_eq!(recv.len, 5);
397        assert_eq!(recv.off, 0);
398        assert_eq!(recv.data.len(), 1);
399
400        let mut buf = [0; 32];
401        assert_eq!(recv.emit(&mut buf), Ok((5, false)));
402
403        // Don't store non-fin empty buffer.
404        let buf = RangeBuf::from(b"", 10, false);
405        assert!(recv.write(buf).is_ok());
406        assert_eq!(recv.len, 5);
407        assert_eq!(recv.off, 5);
408        assert_eq!(recv.data.len(), 0);
409
410        // Check flow control for empty buffer.
411        let buf = RangeBuf::from(b"", 16, false);
412        assert_eq!(recv.write(buf), Err(Error::FlowControl));
413
414        // Store fin empty buffer.
415        let buf = RangeBuf::from(b"", 5, true);
416        assert!(recv.write(buf).is_ok());
417        assert_eq!(recv.len, 5);
418        assert_eq!(recv.off, 5);
419        assert_eq!(recv.data.len(), 1);
420
421        // Don't store additional fin empty buffers.
422        let buf = RangeBuf::from(b"", 5, true);
423        assert!(recv.write(buf).is_ok());
424        assert_eq!(recv.len, 5);
425        assert_eq!(recv.off, 5);
426        assert_eq!(recv.data.len(), 1);
427
428        // Don't store additional fin non-empty buffers.
429        let buf = RangeBuf::from(b"aa", 3, true);
430        assert!(recv.write(buf).is_ok());
431        assert_eq!(recv.len, 5);
432        assert_eq!(recv.off, 5);
433        assert_eq!(recv.data.len(), 1);
434
435        // Validate final size with fin empty buffers.
436        let buf = RangeBuf::from(b"", 6, true);
437        assert_eq!(recv.write(buf), Err(Error::FinalSize));
438        let buf = RangeBuf::from(b"", 4, true);
439        assert_eq!(recv.write(buf), Err(Error::FinalSize));
440
441        let mut buf = [0; 32];
442        assert_eq!(recv.emit(&mut buf), Ok((0, true)));
443    }
444
445    #[test]
446    fn ordered_read() {
447        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
448        assert_eq!(recv.len, 0);
449
450        let mut buf = [0; 32];
451
452        let first = RangeBuf::from(b"hello", 0, false);
453        let second = RangeBuf::from(b"world", 5, false);
454        let third = RangeBuf::from(b"something", 10, true);
455
456        assert!(recv.write(second).is_ok());
457        assert_eq!(recv.len, 10);
458        assert_eq!(recv.off, 0);
459
460        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
461
462        assert!(recv.write(third).is_ok());
463        assert_eq!(recv.len, 19);
464        assert_eq!(recv.off, 0);
465
466        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
467
468        assert!(recv.write(first).is_ok());
469        assert_eq!(recv.len, 19);
470        assert_eq!(recv.off, 0);
471
472        let (len, fin) = recv.emit(&mut buf).unwrap();
473        assert_eq!(len, 19);
474        assert!(fin);
475        assert_eq!(&buf[..len], b"helloworldsomething");
476        assert_eq!(recv.len, 19);
477        assert_eq!(recv.off, 19);
478
479        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
480    }
481
482    #[test]
483    fn split_read() {
484        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
485        assert_eq!(recv.len, 0);
486
487        let mut buf = [0; 32];
488
489        let first = RangeBuf::from(b"something", 0, false);
490        let second = RangeBuf::from(b"helloworld", 9, true);
491
492        assert!(recv.write(first).is_ok());
493        assert_eq!(recv.len, 9);
494        assert_eq!(recv.off, 0);
495
496        assert!(recv.write(second).is_ok());
497        assert_eq!(recv.len, 19);
498        assert_eq!(recv.off, 0);
499
500        let (len, fin) = recv.emit(&mut buf[..10]).unwrap();
501        assert_eq!(len, 10);
502        assert!(!fin);
503        assert_eq!(&buf[..len], b"somethingh");
504        assert_eq!(recv.len, 19);
505        assert_eq!(recv.off, 10);
506
507        let (len, fin) = recv.emit(&mut buf[..5]).unwrap();
508        assert_eq!(len, 5);
509        assert!(!fin);
510        assert_eq!(&buf[..len], b"ellow");
511        assert_eq!(recv.len, 19);
512        assert_eq!(recv.off, 15);
513
514        let (len, fin) = recv.emit(&mut buf[..10]).unwrap();
515        assert_eq!(len, 4);
516        assert!(fin);
517        assert_eq!(&buf[..len], b"orld");
518        assert_eq!(recv.len, 19);
519        assert_eq!(recv.off, 19);
520    }
521
522    #[test]
523    fn incomplete_read() {
524        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
525        assert_eq!(recv.len, 0);
526
527        let mut buf = [0; 32];
528
529        let first = RangeBuf::from(b"something", 0, false);
530        let second = RangeBuf::from(b"helloworld", 9, true);
531
532        assert!(recv.write(second).is_ok());
533        assert_eq!(recv.len, 19);
534        assert_eq!(recv.off, 0);
535
536        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
537
538        assert!(recv.write(first).is_ok());
539        assert_eq!(recv.len, 19);
540        assert_eq!(recv.off, 0);
541
542        let (len, fin) = recv.emit(&mut buf).unwrap();
543        assert_eq!(len, 19);
544        assert!(fin);
545        assert_eq!(&buf[..len], b"somethinghelloworld");
546        assert_eq!(recv.len, 19);
547        assert_eq!(recv.off, 19);
548    }
549
550    #[test]
551    fn zero_len_read() {
552        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
553        assert_eq!(recv.len, 0);
554
555        let mut buf = [0; 32];
556
557        let first = RangeBuf::from(b"something", 0, false);
558        let second = RangeBuf::from(b"", 9, true);
559
560        assert!(recv.write(first).is_ok());
561        assert_eq!(recv.len, 9);
562        assert_eq!(recv.off, 0);
563        assert_eq!(recv.data.len(), 1);
564
565        assert!(recv.write(second).is_ok());
566        assert_eq!(recv.len, 9);
567        assert_eq!(recv.off, 0);
568        assert_eq!(recv.data.len(), 1);
569
570        let (len, fin) = recv.emit(&mut buf).unwrap();
571        assert_eq!(len, 9);
572        assert!(fin);
573        assert_eq!(&buf[..len], b"something");
574        assert_eq!(recv.len, 9);
575        assert_eq!(recv.off, 9);
576    }
577
578    #[test]
579    fn past_read() {
580        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
581        assert_eq!(recv.len, 0);
582
583        let mut buf = [0; 32];
584
585        let first = RangeBuf::from(b"something", 0, false);
586        let second = RangeBuf::from(b"hello", 3, false);
587        let third = RangeBuf::from(b"ello", 4, true);
588        let fourth = RangeBuf::from(b"ello", 5, true);
589
590        assert!(recv.write(first).is_ok());
591        assert_eq!(recv.len, 9);
592        assert_eq!(recv.off, 0);
593        assert_eq!(recv.data.len(), 1);
594
595        let (len, fin) = recv.emit(&mut buf).unwrap();
596        assert_eq!(len, 9);
597        assert!(!fin);
598        assert_eq!(&buf[..len], b"something");
599        assert_eq!(recv.len, 9);
600        assert_eq!(recv.off, 9);
601
602        assert!(recv.write(second).is_ok());
603        assert_eq!(recv.len, 9);
604        assert_eq!(recv.off, 9);
605        assert_eq!(recv.data.len(), 0);
606
607        assert_eq!(recv.write(third), Err(Error::FinalSize));
608
609        assert!(recv.write(fourth).is_ok());
610        assert_eq!(recv.len, 9);
611        assert_eq!(recv.off, 9);
612        assert_eq!(recv.data.len(), 0);
613
614        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
615    }
616
617    #[test]
618    fn fully_overlapping_read() {
619        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
620        assert_eq!(recv.len, 0);
621
622        let mut buf = [0; 32];
623
624        let first = RangeBuf::from(b"something", 0, false);
625        let second = RangeBuf::from(b"hello", 4, false);
626
627        assert!(recv.write(first).is_ok());
628        assert_eq!(recv.len, 9);
629        assert_eq!(recv.off, 0);
630        assert_eq!(recv.data.len(), 1);
631
632        assert!(recv.write(second).is_ok());
633        assert_eq!(recv.len, 9);
634        assert_eq!(recv.off, 0);
635        assert_eq!(recv.data.len(), 1);
636
637        let (len, fin) = recv.emit(&mut buf).unwrap();
638        assert_eq!(len, 9);
639        assert!(!fin);
640        assert_eq!(&buf[..len], b"something");
641        assert_eq!(recv.len, 9);
642        assert_eq!(recv.off, 9);
643        assert_eq!(recv.data.len(), 0);
644
645        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
646    }
647
648    #[test]
649    fn fully_overlapping_read2() {
650        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
651        assert_eq!(recv.len, 0);
652
653        let mut buf = [0; 32];
654
655        let first = RangeBuf::from(b"something", 0, false);
656        let second = RangeBuf::from(b"hello", 4, false);
657
658        assert!(recv.write(second).is_ok());
659        assert_eq!(recv.len, 9);
660        assert_eq!(recv.off, 0);
661        assert_eq!(recv.data.len(), 1);
662
663        assert!(recv.write(first).is_ok());
664        assert_eq!(recv.len, 9);
665        assert_eq!(recv.off, 0);
666        assert_eq!(recv.data.len(), 2);
667
668        let (len, fin) = recv.emit(&mut buf).unwrap();
669        assert_eq!(len, 9);
670        assert!(!fin);
671        assert_eq!(&buf[..len], b"somehello");
672        assert_eq!(recv.len, 9);
673        assert_eq!(recv.off, 9);
674        assert_eq!(recv.data.len(), 0);
675
676        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
677    }
678
679    #[test]
680    fn fully_overlapping_read3() {
681        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
682        assert_eq!(recv.len, 0);
683
684        let mut buf = [0; 32];
685
686        let first = RangeBuf::from(b"something", 0, false);
687        let second = RangeBuf::from(b"hello", 3, false);
688
689        assert!(recv.write(second).is_ok());
690        assert_eq!(recv.len, 8);
691        assert_eq!(recv.off, 0);
692        assert_eq!(recv.data.len(), 1);
693
694        assert!(recv.write(first).is_ok());
695        assert_eq!(recv.len, 9);
696        assert_eq!(recv.off, 0);
697        assert_eq!(recv.data.len(), 3);
698
699        let (len, fin) = recv.emit(&mut buf).unwrap();
700        assert_eq!(len, 9);
701        assert!(!fin);
702        assert_eq!(&buf[..len], b"somhellog");
703        assert_eq!(recv.len, 9);
704        assert_eq!(recv.off, 9);
705        assert_eq!(recv.data.len(), 0);
706
707        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
708    }
709
710    #[test]
711    fn fully_overlapping_read_multi() {
712        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
713        assert_eq!(recv.len, 0);
714
715        let mut buf = [0; 32];
716
717        let first = RangeBuf::from(b"somethingsomething", 0, false);
718        let second = RangeBuf::from(b"hello", 3, false);
719        let third = RangeBuf::from(b"hello", 12, false);
720
721        assert!(recv.write(second).is_ok());
722        assert_eq!(recv.len, 8);
723        assert_eq!(recv.off, 0);
724        assert_eq!(recv.data.len(), 1);
725
726        assert!(recv.write(third).is_ok());
727        assert_eq!(recv.len, 17);
728        assert_eq!(recv.off, 0);
729        assert_eq!(recv.data.len(), 2);
730
731        assert!(recv.write(first).is_ok());
732        assert_eq!(recv.len, 18);
733        assert_eq!(recv.off, 0);
734        assert_eq!(recv.data.len(), 5);
735
736        let (len, fin) = recv.emit(&mut buf).unwrap();
737        assert_eq!(len, 18);
738        assert!(!fin);
739        assert_eq!(&buf[..len], b"somhellogsomhellog");
740        assert_eq!(recv.len, 18);
741        assert_eq!(recv.off, 18);
742        assert_eq!(recv.data.len(), 0);
743
744        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
745    }
746
747    #[test]
748    fn overlapping_start_read() {
749        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
750        assert_eq!(recv.len, 0);
751
752        let mut buf = [0; 32];
753
754        let first = RangeBuf::from(b"something", 0, false);
755        let second = RangeBuf::from(b"hello", 8, true);
756
757        assert!(recv.write(first).is_ok());
758        assert_eq!(recv.len, 9);
759        assert_eq!(recv.off, 0);
760        assert_eq!(recv.data.len(), 1);
761
762        assert!(recv.write(second).is_ok());
763        assert_eq!(recv.len, 13);
764        assert_eq!(recv.off, 0);
765        assert_eq!(recv.data.len(), 2);
766
767        let (len, fin) = recv.emit(&mut buf).unwrap();
768        assert_eq!(len, 13);
769        assert!(fin);
770        assert_eq!(&buf[..len], b"somethingello");
771        assert_eq!(recv.len, 13);
772        assert_eq!(recv.off, 13);
773
774        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
775    }
776
777    #[test]
778    fn overlapping_end_read() {
779        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
780        assert_eq!(recv.len, 0);
781
782        let mut buf = [0; 32];
783
784        let first = RangeBuf::from(b"hello", 0, false);
785        let second = RangeBuf::from(b"something", 3, true);
786
787        assert!(recv.write(second).is_ok());
788        assert_eq!(recv.len, 12);
789        assert_eq!(recv.off, 0);
790        assert_eq!(recv.data.len(), 1);
791
792        assert!(recv.write(first).is_ok());
793        assert_eq!(recv.len, 12);
794        assert_eq!(recv.off, 0);
795        assert_eq!(recv.data.len(), 2);
796
797        let (len, fin) = recv.emit(&mut buf).unwrap();
798        assert_eq!(len, 12);
799        assert!(fin);
800        assert_eq!(&buf[..len], b"helsomething");
801        assert_eq!(recv.len, 12);
802        assert_eq!(recv.off, 12);
803
804        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
805    }
806
807    #[test]
808    fn overlapping_end_twice_read() {
809        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
810        assert_eq!(recv.len, 0);
811
812        let mut buf = [0; 32];
813
814        let first = RangeBuf::from(b"he", 0, false);
815        let second = RangeBuf::from(b"ow", 4, false);
816        let third = RangeBuf::from(b"rl", 7, false);
817        let fourth = RangeBuf::from(b"helloworld", 0, true);
818
819        assert!(recv.write(third).is_ok());
820        assert_eq!(recv.len, 9);
821        assert_eq!(recv.off, 0);
822        assert_eq!(recv.data.len(), 1);
823
824        assert!(recv.write(second).is_ok());
825        assert_eq!(recv.len, 9);
826        assert_eq!(recv.off, 0);
827        assert_eq!(recv.data.len(), 2);
828
829        assert!(recv.write(first).is_ok());
830        assert_eq!(recv.len, 9);
831        assert_eq!(recv.off, 0);
832        assert_eq!(recv.data.len(), 3);
833
834        assert!(recv.write(fourth).is_ok());
835        assert_eq!(recv.len, 10);
836        assert_eq!(recv.off, 0);
837        assert_eq!(recv.data.len(), 6);
838
839        let (len, fin) = recv.emit(&mut buf).unwrap();
840        assert_eq!(len, 10);
841        assert!(fin);
842        assert_eq!(&buf[..len], b"helloworld");
843        assert_eq!(recv.len, 10);
844        assert_eq!(recv.off, 10);
845
846        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
847    }
848
849    #[test]
850    fn overlapping_end_twice_and_contained_read() {
851        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
852        assert_eq!(recv.len, 0);
853
854        let mut buf = [0; 32];
855
856        let first = RangeBuf::from(b"hellow", 0, false);
857        let second = RangeBuf::from(b"barfoo", 10, true);
858        let third = RangeBuf::from(b"rl", 7, false);
859        let fourth = RangeBuf::from(b"elloworldbarfoo", 1, true);
860
861        assert!(recv.write(third).is_ok());
862        assert_eq!(recv.len, 9);
863        assert_eq!(recv.off, 0);
864        assert_eq!(recv.data.len(), 1);
865
866        assert!(recv.write(second).is_ok());
867        assert_eq!(recv.len, 16);
868        assert_eq!(recv.off, 0);
869        assert_eq!(recv.data.len(), 2);
870
871        assert!(recv.write(first).is_ok());
872        assert_eq!(recv.len, 16);
873        assert_eq!(recv.off, 0);
874        assert_eq!(recv.data.len(), 3);
875
876        assert!(recv.write(fourth).is_ok());
877        assert_eq!(recv.len, 16);
878        assert_eq!(recv.off, 0);
879        assert_eq!(recv.data.len(), 5);
880
881        let (len, fin) = recv.emit(&mut buf).unwrap();
882        assert_eq!(len, 16);
883        assert!(fin);
884        assert_eq!(&buf[..len], b"helloworldbarfoo");
885        assert_eq!(recv.len, 16);
886        assert_eq!(recv.off, 16);
887
888        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
889    }
890
891    #[test]
892    fn partially_multi_overlapping_reordered_read() {
893        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
894        assert_eq!(recv.len, 0);
895
896        let mut buf = [0; 32];
897
898        let first = RangeBuf::from(b"hello", 8, false);
899        let second = RangeBuf::from(b"something", 0, false);
900        let third = RangeBuf::from(b"moar", 11, true);
901
902        assert!(recv.write(first).is_ok());
903        assert_eq!(recv.len, 13);
904        assert_eq!(recv.off, 0);
905        assert_eq!(recv.data.len(), 1);
906
907        assert!(recv.write(second).is_ok());
908        assert_eq!(recv.len, 13);
909        assert_eq!(recv.off, 0);
910        assert_eq!(recv.data.len(), 2);
911
912        assert!(recv.write(third).is_ok());
913        assert_eq!(recv.len, 15);
914        assert_eq!(recv.off, 0);
915        assert_eq!(recv.data.len(), 3);
916
917        let (len, fin) = recv.emit(&mut buf).unwrap();
918        assert_eq!(len, 15);
919        assert!(fin);
920        assert_eq!(&buf[..len], b"somethinhelloar");
921        assert_eq!(recv.len, 15);
922        assert_eq!(recv.off, 15);
923        assert_eq!(recv.data.len(), 0);
924
925        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
926    }
927
928    #[test]
929    fn partially_multi_overlapping_reordered_read2() {
930        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
931        assert_eq!(recv.len, 0);
932
933        let mut buf = [0; 32];
934
935        let first = RangeBuf::from(b"aaa", 0, false);
936        let second = RangeBuf::from(b"bbb", 2, false);
937        let third = RangeBuf::from(b"ccc", 4, false);
938        let fourth = RangeBuf::from(b"ddd", 6, false);
939        let fifth = RangeBuf::from(b"eee", 9, false);
940        let sixth = RangeBuf::from(b"fff", 11, false);
941
942        assert!(recv.write(second).is_ok());
943        assert_eq!(recv.len, 5);
944        assert_eq!(recv.off, 0);
945        assert_eq!(recv.data.len(), 1);
946
947        assert!(recv.write(fourth).is_ok());
948        assert_eq!(recv.len, 9);
949        assert_eq!(recv.off, 0);
950        assert_eq!(recv.data.len(), 2);
951
952        assert!(recv.write(third).is_ok());
953        assert_eq!(recv.len, 9);
954        assert_eq!(recv.off, 0);
955        assert_eq!(recv.data.len(), 3);
956
957        assert!(recv.write(first).is_ok());
958        assert_eq!(recv.len, 9);
959        assert_eq!(recv.off, 0);
960        assert_eq!(recv.data.len(), 4);
961
962        assert!(recv.write(sixth).is_ok());
963        assert_eq!(recv.len, 14);
964        assert_eq!(recv.off, 0);
965        assert_eq!(recv.data.len(), 5);
966
967        assert!(recv.write(fifth).is_ok());
968        assert_eq!(recv.len, 14);
969        assert_eq!(recv.off, 0);
970        assert_eq!(recv.data.len(), 6);
971
972        let (len, fin) = recv.emit(&mut buf).unwrap();
973        assert_eq!(len, 14);
974        assert!(!fin);
975        assert_eq!(&buf[..len], b"aabbbcdddeefff");
976        assert_eq!(recv.len, 14);
977        assert_eq!(recv.off, 14);
978        assert_eq!(recv.data.len(), 0);
979
980        assert_eq!(recv.emit(&mut buf), Err(Error::Done));
981    }
982}