Skip to main content

quiche/stream/
recv_buf.rs

1// Copyright (C) 2023, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::cmp;
28
29use std::collections::BTreeMap;
30use std::collections::VecDeque;
31
32use std::time::Duration;
33use std::time::Instant;
34
35use crate::stream::RecvAction;
36use crate::stream::RecvBufResetReturn;
37use crate::Error;
38use crate::Result;
39
40use crate::flowcontrol;
41
42use crate::range_buf::RangeBuf;
43
44use super::DEFAULT_STREAM_WINDOW;
45
/// Receive-side stream buffer.
///
/// Stream data received from the peer is buffered as a set of data chunks
/// ordered by offset in ascending order. Contiguous data can then be read
/// into a slice.
#[derive(Debug, Default)]
pub struct RecvBuf {
    /// Chunks of data received from the peer that have not yet been read by
    /// the application, ordered by offset.
    ///
    /// `write()` keys every chunk by its maximum (end) offset.
    data: BTreeMap<u64, RangeBuf>,

    /// The lowest data offset that has yet to be read by the application.
    ///
    /// Besides reads, this is also moved forward by `shutdown()`, by
    /// `reset()`, and by `write()` while the buffer is draining.
    off: u64,

    /// The highest stream offset received so far on this stream, i.e. the
    /// largest `max_off()` of any chunk accepted by `write()`. Gaps below
    /// this offset may still be missing.
    len: u64,

    /// Receiver flow controller.
    flow_control: flowcontrol::FlowControl,

    /// The final stream offset received from the peer, if any.
    fin_off: Option<u64>,

    /// The error code received via RESET_STREAM (set by `reset()`).
    error: Option<u64>,

    /// Whether incoming data is validated but not buffered (set by
    /// `shutdown()`).
    drain: bool,
}
75
76impl RecvBuf {
77    /// Creates a new receive buffer.
78    pub fn new(max_data: u64, max_window: u64) -> RecvBuf {
79        RecvBuf {
80            flow_control: flowcontrol::FlowControl::new(
81                max_data,
82                cmp::min(max_data, DEFAULT_STREAM_WINDOW),
83                max_window,
84            ),
85            ..RecvBuf::default()
86        }
87    }
88
    /// Inserts the given chunk of data in the buffer.
    ///
    /// This also takes care of enforcing stream flow control limits, as well
    /// as handling incoming data that overlaps data that is already in the
    /// buffer. Data that was already read or is already buffered is dropped,
    /// and only the missing pieces of `buf` are stored.
    ///
    /// While the buffer is draining (see `shutdown()`), data is validated
    /// but not stored, and the read offset follows the highest received
    /// offset.
    ///
    /// # Errors
    ///
    /// * `Error::FlowControl` if `buf` ends beyond the current `max_data()`
    ///   limit.
    /// * `Error::FinalSize` if `buf` ends beyond an already known final
    ///   size, carries a fin flag at an offset different from the known
    ///   final size, or carries a fin flag at an offset lower than data that
    ///   was already received.
    pub fn write(&mut self, buf: RangeBuf) -> Result<()> {
        // Flow control is checked first, so it applies even to empty
        // buffers.
        if buf.max_off() > self.max_data() {
            return Err(Error::FlowControl);
        }

        if let Some(fin_off) = self.fin_off {
            // Stream's size is known, forbid data beyond that point.
            if buf.max_off() > fin_off {
                return Err(Error::FinalSize);
            }

            // Stream's size is already known, forbid changing it.
            if buf.fin() && fin_off != buf.max_off() {
                return Err(Error::FinalSize);
            }
        }

        // Stream's known size is lower than data already received.
        if buf.fin() && buf.max_off() < self.len {
            return Err(Error::FinalSize);
        }

        // We already saved the final offset, so there's nothing else we
        // need to keep from the RangeBuf if it's empty.
        if self.fin_off.is_some() && buf.is_empty() {
            return Ok(());
        }

        // Record the final size. All validation above has already passed.
        if buf.fin() {
            self.fin_off = Some(buf.max_off());
        }

        // No need to store empty buffer that doesn't carry the fin flag.
        if !buf.fin() && buf.is_empty() {
            return Ok(());
        }

        // Check if data is fully duplicate, that is the buffer's max offset is
        // lower or equal to the offset already stored in the recv buffer.
        if self.off >= buf.max_off() {
            // An exception is applied to empty range buffers, because an empty
            // buffer's max offset matches the max offset of the recv buffer.
            //
            // By this point all spurious empty buffers should have already been
            // discarded, so allowing empty buffers here should be safe.
            if !buf.is_empty() {
                return Ok(());
            }
        }

        // Work queue of pieces still to be inserted. It starts with the
        // incoming buffer, and grows whenever a piece has to be split around
        // a chunk that is already stored.
        let mut tmp_bufs = VecDeque::with_capacity(2);
        tmp_bufs.push_back(buf);

        'tmp: while let Some(mut buf) = tmp_bufs.pop_front() {
            // Discard incoming data below current stream offset. Bytes up to
            // `self.off` have already been received so we should not buffer
            // them again. This is also important to make sure `ready()` doesn't
            // get stuck when a buffer with lower offset than the stream's is
            // buffered.
            if self.off_front() > buf.off() {
                buf = buf.split_off((self.off_front() - buf.off()) as usize);
            }

            // Handle overlapping data. If the incoming data's starting offset
            // is above the previous maximum received offset, there is clearly
            // no overlap so this logic can be skipped. However do still try to
            // merge an empty final buffer (i.e. an empty buffer with the fin
            // flag set, which is the only kind of empty buffer that should
            // reach this point).
            if buf.off() < self.max_off() || buf.is_empty() {
                // Stored chunks are keyed by their max offset, so this range
                // visits the chunks that end at or after the start of `buf`.
                for (_, b) in self.data.range(buf.off()..) {
                    // Start offset of `buf` as of the beginning of this
                    // iteration. It is not refreshed if `buf` is trimmed
                    // below.
                    let off = buf.off();

                    // We are past the current buffer.
                    if b.off() > buf.max_off() {
                        break;
                    }

                    // New buffer is fully contained in existing buffer.
                    if off >= b.off() && buf.max_off() <= b.max_off() {
                        continue 'tmp;
                    }

                    // New buffer's start overlaps existing buffer.
                    if off >= b.off() && off < b.max_off() {
                        buf = buf.split_off((b.max_off() - off) as usize);
                    }

                    // New buffer's end overlaps existing buffer. The split
                    // off tail is queued so it gets its own overlap pass.
                    if off < b.off() && buf.max_off() > b.off() {
                        tmp_bufs
                            .push_back(buf.split_off((b.off() - off) as usize));
                    }
                }
            }

            self.len = cmp::max(self.len, buf.max_off());

            if !self.drain {
                // Chunks are keyed by their max (end) offset.
                self.data.insert(buf.max_off(), buf);
            } else {
                // we are not storing any data, off == len
                self.off = self.len;
            }
        }

        Ok(())
    }
202
203    /// Reads contiguous data from the receive buffer.
204    ///
205    /// Data is written into the given `out` buffer, up to the length of `out`.
206    ///
207    /// Only contiguous data is removed, starting from offset 0. The offset is
208    /// incremented as data is taken out of the receive buffer. If there is no
209    /// data at the expected read offset, the `Done` error is returned.
210    ///
211    /// On success the amount of data read and a flag indicating
212    /// if there is no more data in the buffer, are returned as a tuple.
213    pub fn emit(&mut self, out: &mut [u8]) -> Result<(usize, bool)> {
214        self.emit_or_discard(RecvAction::Emit { out })
215    }
216
217    /// Reads or discards contiguous data from the receive buffer.
218    ///
219    /// Passing an `action` of `StreamRecvAction::Emit` results in data being
220    /// written into the provided buffer, up to its length.
221    ///
222    /// Passing an `action` of `StreamRecvAction::Discard` results in up to
223    /// the indicated number of bytes being discarded without copying.
224    ///
225    /// Only contiguous data is removed, starting from offset 0. The offset is
226    /// incremented as data is taken out of the receive buffer. If there is no
227    /// data at the expected read offset, the `Done` error is returned.
228    ///
229    /// On success the amount of data read or discarded, and a flag indicating
230    /// if there is no more data in the buffer, are returned as a tuple.
231    pub fn emit_or_discard(
232        &mut self, mut action: RecvAction,
233    ) -> Result<(usize, bool)> {
234        let mut len = 0;
235        let mut cap = match &action {
236            RecvAction::Emit { out } => out.len(),
237            RecvAction::Discard { len } => *len,
238        };
239
240        if !self.ready() {
241            return Err(Error::Done);
242        }
243
244        // The stream was reset, so clear its data and return the error code
245        // instead.
246        if let Some(e) = self.error {
247            self.data.clear();
248            return Err(Error::StreamReset(e));
249        }
250
251        while cap > 0 && self.ready() {
252            let mut entry = match self.data.first_entry() {
253                Some(entry) => entry,
254                None => break,
255            };
256
257            let buf = entry.get_mut();
258
259            let buf_len = cmp::min(buf.len(), cap);
260
261            // Only copy data if we're emitting, not discarding.
262            if let RecvAction::Emit { ref mut out } = action {
263                out[len..len + buf_len].copy_from_slice(&buf[..buf_len]);
264            }
265
266            self.off += buf_len as u64;
267
268            len += buf_len;
269            cap -= buf_len;
270
271            if buf_len < buf.len() {
272                buf.consume(buf_len);
273
274                // We reached the maximum capacity, so end here.
275                break;
276            }
277
278            entry.remove();
279        }
280
281        // Update consumed bytes for flow control.
282        self.flow_control.add_consumed(len as u64);
283
284        Ok((len, self.is_fin()))
285    }
286
287    /// Resets the stream at the given offset.
288    pub fn reset(
289        &mut self, error_code: u64, final_size: u64,
290    ) -> Result<RecvBufResetReturn> {
291        // Stream's size is already known, forbid changing it.
292        if let Some(fin_off) = self.fin_off {
293            if fin_off != final_size {
294                return Err(Error::FinalSize);
295            }
296        }
297
298        // Stream's known size is lower than data already received.
299        if final_size < self.len {
300            return Err(Error::FinalSize);
301        }
302
303        if self.error.is_some() {
304            // We already verified that the final size matches
305            return Ok(RecvBufResetReturn::zero());
306        }
307
308        // Calculate how many bytes need to be removed from the connection flow
309        // control.
310        let result = RecvBufResetReturn {
311            max_data_delta: final_size - self.len,
312            consumed_flowcontrol: final_size - self.off,
313        };
314
315        self.error = Some(error_code);
316
317        // Clear all data already buffered.
318        self.off = final_size;
319
320        self.data.clear();
321
322        // In order to ensure the application is notified when the stream is
323        // reset, enqueue a zero-length buffer at the final size offset.
324        let buf = RangeBuf::from(b"", final_size, true);
325        self.write(buf)?;
326
327        Ok(result)
328    }
329
330    /// Commits the new max_data limit.
331    pub fn update_max_data(&mut self, now: Instant) {
332        self.flow_control.update_max_data(now);
333    }
334
335    /// Return the new max_data limit.
336    pub fn max_data_next(&mut self) -> u64 {
337        self.flow_control.max_data_next()
338    }
339
340    /// Return the current flow control limit.
341    pub fn max_data(&self) -> u64 {
342        self.flow_control.max_data()
343    }
344
345    /// Return the current window.
346    pub fn window(&self) -> u64 {
347        self.flow_control.window()
348    }
349
350    /// Autotune the window size.
351    pub fn autotune_window(&mut self, now: Instant, rtt: Duration) {
352        self.flow_control.autotune_window(now, rtt);
353    }
354
355    /// Shuts down receiving data and returns the number of bytes
356    /// that should be returned to the connection level flow
357    /// control
358    pub fn shutdown(&mut self) -> Result<u64> {
359        if self.drain {
360            return Err(Error::Done);
361        }
362
363        self.drain = true;
364
365        self.data.clear();
366
367        let consumed = self.max_off() - self.off;
368        self.off = self.max_off();
369
370        Ok(consumed)
371    }
372
373    /// Returns the lowest offset of data buffered.
374    pub fn off_front(&self) -> u64 {
375        self.off
376    }
377
378    /// Returns true if we need to update the local flow control limit.
379    pub fn almost_full(&self) -> bool {
380        self.fin_off.is_none() && self.flow_control.should_update_max_data()
381    }
382
383    /// Returns the largest offset ever received.
384    pub fn max_off(&self) -> u64 {
385        self.len
386    }
387
388    /// Returns true if the receive-side of the stream is complete.
389    ///
390    /// This happens when the stream's receive final size is known, and the
391    /// application has read all data from the stream.
392    pub fn is_fin(&self) -> bool {
393        if self.fin_off == Some(self.off) {
394            return true;
395        }
396
397        false
398    }
399
400    /// Returns true if the stream is not storing incoming data.
401    pub fn is_draining(&self) -> bool {
402        self.drain
403    }
404
405    /// Returns true if the stream has data to be read.
406    pub fn ready(&self) -> bool {
407        let (_, buf) = match self.data.first_key_value() {
408            Some(v) => v,
409            None => return false,
410        };
411
412        buf.off() == self.off
413    }
414}
415
416#[cfg(test)]
417mod tests {
418    use super::*;
419    use rstest::rstest;
420
421    // Helper function for testing either buffer emit or discard.
422    //
423    // The `emit` parameter controls whether data is emitted or discarded from
424    // `recv`.
425    //
426    // The `target_len` parameter controls the maximum amount of bytes that
427    // could be read, up to the capacity of `recv`. The `result_len` is the
428    // actual number of bytes that were taken out of `recv`. An assert is
429    // performed on `result_len` to ensure the number of bytes read meets the
430    // caller expectations.
431    //
432    // The `is_fin` parameter relates to the buffer's finished status. An assert
433    // is performed on it to ensure the status meet the caller expectations.
434    //
435    // The `test_bytes` parameter carries an optional slice of bytes. Is set, an
436    // assert is performed against the bytes that were read out of the buffer,
437    // to ensure caller expectations are met.
438    fn assert_emit_discard(
439        recv: &mut RecvBuf, emit: bool, target_len: usize, result_len: usize,
440        is_fin: bool, test_bytes: Option<&[u8]>,
441    ) {
442        let mut buf = [0; 32];
443        let action = if emit {
444            RecvAction::Emit {
445                out: &mut buf[..target_len],
446            }
447        } else {
448            RecvAction::Discard { len: target_len }
449        };
450
451        let (read, fin) = recv.emit_or_discard(action).unwrap();
452
453        if emit {
454            if let Some(v) = test_bytes {
455                assert_eq!(&buf[..read], v);
456            }
457        }
458
459        assert_eq!(read, result_len);
460        assert_eq!(is_fin, fin);
461    }
462
463    // Helper function for testing buffer status for either emit or discard.
464    fn assert_emit_discard_done(recv: &mut RecvBuf, emit: bool) {
465        let mut buf = [0; 32];
466        let action = if emit {
467            RecvAction::Emit { out: &mut buf }
468        } else {
469            RecvAction::Discard { len: 32 }
470        };
471        assert_eq!(recv.emit_or_discard(action), Err(Error::Done));
472    }
473
474    #[rstest]
475    fn empty_read(#[values(true, false)] emit: bool) {
476        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
477        assert_eq!(recv.len, 0);
478
479        assert_emit_discard_done(&mut recv, emit);
480    }
481
482    #[rstest]
483    fn empty_stream_frame(#[values(true, false)] emit: bool) {
484        let mut recv = RecvBuf::new(15, DEFAULT_STREAM_WINDOW);
485        assert_eq!(recv.len, 0);
486
487        let buf = RangeBuf::from(b"hello", 0, false);
488        assert!(recv.write(buf).is_ok());
489        assert_eq!(recv.len, 5);
490        assert_eq!(recv.off, 0);
491        assert_eq!(recv.data.len(), 1);
492
493        assert_emit_discard(&mut recv, emit, 32, 5, false, None);
494
495        // Don't store non-fin empty buffer.
496        let buf = RangeBuf::from(b"", 10, false);
497        assert!(recv.write(buf).is_ok());
498        assert_eq!(recv.len, 5);
499        assert_eq!(recv.off, 5);
500        assert_eq!(recv.data.len(), 0);
501
502        // Check flow control for empty buffer.
503        let buf = RangeBuf::from(b"", 16, false);
504        assert_eq!(recv.write(buf), Err(Error::FlowControl));
505
506        // Store fin empty buffer.
507        let buf = RangeBuf::from(b"", 5, true);
508        assert!(recv.write(buf).is_ok());
509        assert_eq!(recv.len, 5);
510        assert_eq!(recv.off, 5);
511        assert_eq!(recv.data.len(), 1);
512
513        // Don't store additional fin empty buffers.
514        let buf = RangeBuf::from(b"", 5, true);
515        assert!(recv.write(buf).is_ok());
516        assert_eq!(recv.len, 5);
517        assert_eq!(recv.off, 5);
518        assert_eq!(recv.data.len(), 1);
519
520        // Don't store additional fin non-empty buffers.
521        let buf = RangeBuf::from(b"aa", 3, true);
522        assert!(recv.write(buf).is_ok());
523        assert_eq!(recv.len, 5);
524        assert_eq!(recv.off, 5);
525        assert_eq!(recv.data.len(), 1);
526
527        // Validate final size with fin empty buffers.
528        let buf = RangeBuf::from(b"", 6, true);
529        assert_eq!(recv.write(buf), Err(Error::FinalSize));
530        let buf = RangeBuf::from(b"", 4, true);
531        assert_eq!(recv.write(buf), Err(Error::FinalSize));
532
533        assert_emit_discard(&mut recv, emit, 32, 0, true, None);
534    }
535
536    #[rstest]
537    fn ordered_read(#[values(true, false)] emit: bool) {
538        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
539        assert_eq!(recv.len, 0);
540
541        let first = RangeBuf::from(b"hello", 0, false);
542        let second = RangeBuf::from(b"world", 5, false);
543        let third = RangeBuf::from(b"something", 10, true);
544
545        assert!(recv.write(second).is_ok());
546        assert_eq!(recv.len, 10);
547        assert_eq!(recv.off, 0);
548
549        assert_emit_discard_done(&mut recv, emit);
550
551        assert!(recv.write(third).is_ok());
552        assert_eq!(recv.len, 19);
553        assert_eq!(recv.off, 0);
554
555        assert_emit_discard_done(&mut recv, emit);
556
557        assert!(recv.write(first).is_ok());
558        assert_eq!(recv.len, 19);
559        assert_eq!(recv.off, 0);
560
561        assert_emit_discard(
562            &mut recv,
563            emit,
564            32,
565            19,
566            true,
567            Some(b"helloworldsomething"),
568        );
569        assert_eq!(recv.len, 19);
570        assert_eq!(recv.off, 19);
571
572        assert_emit_discard_done(&mut recv, emit);
573    }
574
575    /// Test shutdown behavior
576    #[rstest]
577    fn shutdown(#[values(true, false)] emit: bool) {
578        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
579        assert_eq!(recv.len, 0);
580
581        let first = RangeBuf::from(b"hello", 0, false);
582        let second = RangeBuf::from(b"world", 5, false);
583        let third = RangeBuf::from(b"something", 10, false);
584
585        assert!(recv.write(second).is_ok());
586        assert_eq!(recv.len, 10);
587        assert_eq!(recv.off, 0);
588
589        assert_emit_discard_done(&mut recv, emit);
590
591        // shutdown the buffer. Buffer is dropped.
592        assert_eq!(recv.shutdown(), Ok(10));
593        assert_eq!(recv.len, 10);
594        assert_eq!(recv.off, 10);
595        assert_eq!(recv.data.len(), 0);
596
597        assert_emit_discard_done(&mut recv, emit);
598
599        // subsequent writes are validated but not added to the buffer
600        assert!(recv.write(first).is_ok());
601        assert_eq!(recv.len, 10);
602        assert_eq!(recv.off, 10);
603        assert_eq!(recv.data.len(), 0);
604
605        // the max offset of received data can increase and
606        // the recv.off must increase with it
607        assert!(recv.write(third).is_ok());
608        assert_eq!(recv.len, 19);
609        assert_eq!(recv.off, 19);
610        assert_eq!(recv.data.len(), 0);
611
612        // Send a reset
613        assert_emit_discard_done(&mut recv, emit);
614        assert_eq!(
615            recv.reset(42, 123),
616            Ok(RecvBufResetReturn {
617                max_data_delta: 104,
618                consumed_flowcontrol: 104,
619            })
620        );
621        assert_eq!(recv.len, 123);
622        assert_eq!(recv.off, 123);
623        assert_eq!(recv.data.len(), 0);
624
625        assert_emit_discard_done(&mut recv, emit);
626    }
627
628    #[rstest]
629    fn split_read(#[values(true, false)] emit: bool) {
630        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
631        assert_eq!(recv.len, 0);
632
633        let first = RangeBuf::from(b"something", 0, false);
634        let second = RangeBuf::from(b"helloworld", 9, true);
635
636        assert!(recv.write(first).is_ok());
637        assert_eq!(recv.len, 9);
638        assert_eq!(recv.off, 0);
639
640        assert!(recv.write(second).is_ok());
641        assert_eq!(recv.len, 19);
642        assert_eq!(recv.off, 0);
643
644        assert_emit_discard(&mut recv, emit, 10, 10, false, Some(b"somethingh"));
645        assert_eq!(recv.len, 19);
646        assert_eq!(recv.off, 10);
647
648        assert_emit_discard(&mut recv, emit, 5, 5, false, Some(b"ellow"));
649        assert_eq!(recv.len, 19);
650        assert_eq!(recv.off, 15);
651
652        assert_emit_discard(&mut recv, emit, 5, 4, true, Some(b"orld"));
653        assert_eq!(recv.len, 19);
654        assert_eq!(recv.off, 19);
655    }
656
657    #[rstest]
658    fn incomplete_read(#[values(true, false)] emit: bool) {
659        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
660        assert_eq!(recv.len, 0);
661
662        let mut buf = [0; 32];
663
664        let first = RangeBuf::from(b"something", 0, false);
665        let second = RangeBuf::from(b"helloworld", 9, true);
666
667        assert!(recv.write(second).is_ok());
668        assert_eq!(recv.len, 19);
669        assert_eq!(recv.off, 0);
670
671        let action = if emit {
672            RecvAction::Emit { out: &mut buf }
673        } else {
674            RecvAction::Discard { len: 32 }
675        };
676        assert_eq!(recv.emit_or_discard(action), Err(Error::Done));
677
678        assert!(recv.write(first).is_ok());
679        assert_eq!(recv.len, 19);
680        assert_eq!(recv.off, 0);
681
682        assert_emit_discard(
683            &mut recv,
684            emit,
685            32,
686            19,
687            true,
688            Some(b"somethinghelloworld"),
689        );
690        assert_eq!(recv.len, 19);
691        assert_eq!(recv.off, 19);
692    }
693
694    #[rstest]
695    fn zero_len_read(#[values(true, false)] emit: bool) {
696        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
697        assert_eq!(recv.len, 0);
698
699        let first = RangeBuf::from(b"something", 0, false);
700        let second = RangeBuf::from(b"", 9, true);
701
702        assert!(recv.write(first).is_ok());
703        assert_eq!(recv.len, 9);
704        assert_eq!(recv.off, 0);
705        assert_eq!(recv.data.len(), 1);
706
707        assert!(recv.write(second).is_ok());
708        assert_eq!(recv.len, 9);
709        assert_eq!(recv.off, 0);
710        assert_eq!(recv.data.len(), 1);
711
712        assert_emit_discard(&mut recv, emit, 32, 9, true, Some(b"something"));
713        assert_eq!(recv.len, 9);
714        assert_eq!(recv.off, 9);
715    }
716
717    #[rstest]
718    fn past_read(#[values(true, false)] emit: bool) {
719        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
720        assert_eq!(recv.len, 0);
721
722        let first = RangeBuf::from(b"something", 0, false);
723        let second = RangeBuf::from(b"hello", 3, false);
724        let third = RangeBuf::from(b"ello", 4, true);
725        let fourth = RangeBuf::from(b"ello", 5, true);
726
727        assert!(recv.write(first).is_ok());
728        assert_eq!(recv.len, 9);
729        assert_eq!(recv.off, 0);
730        assert_eq!(recv.data.len(), 1);
731
732        assert_emit_discard(&mut recv, emit, 32, 9, false, Some(b"something"));
733        assert_eq!(recv.len, 9);
734        assert_eq!(recv.off, 9);
735
736        assert!(recv.write(second).is_ok());
737        assert_eq!(recv.len, 9);
738        assert_eq!(recv.off, 9);
739        assert_eq!(recv.data.len(), 0);
740
741        assert_eq!(recv.write(third), Err(Error::FinalSize));
742
743        assert!(recv.write(fourth).is_ok());
744        assert_eq!(recv.len, 9);
745        assert_eq!(recv.off, 9);
746        assert_eq!(recv.data.len(), 0);
747
748        assert_emit_discard_done(&mut recv, emit);
749    }
750
751    #[rstest]
752    fn fully_overlapping_read(#[values(true, false)] emit: bool) {
753        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
754        assert_eq!(recv.len, 0);
755
756        let first = RangeBuf::from(b"something", 0, false);
757        let second = RangeBuf::from(b"hello", 4, false);
758
759        assert!(recv.write(first).is_ok());
760        assert_eq!(recv.len, 9);
761        assert_eq!(recv.off, 0);
762        assert_eq!(recv.data.len(), 1);
763
764        assert!(recv.write(second).is_ok());
765        assert_eq!(recv.len, 9);
766        assert_eq!(recv.off, 0);
767        assert_eq!(recv.data.len(), 1);
768
769        assert_emit_discard(&mut recv, emit, 32, 9, false, Some(b"something"));
770        assert_eq!(recv.len, 9);
771        assert_eq!(recv.off, 9);
772        assert_eq!(recv.data.len(), 0);
773
774        assert_emit_discard_done(&mut recv, emit);
775    }
776
777    #[rstest]
778    fn fully_overlapping_read2(#[values(true, false)] emit: bool) {
779        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
780        assert_eq!(recv.len, 0);
781
782        let first = RangeBuf::from(b"something", 0, false);
783        let second = RangeBuf::from(b"hello", 4, false);
784
785        assert!(recv.write(second).is_ok());
786        assert_eq!(recv.len, 9);
787        assert_eq!(recv.off, 0);
788        assert_eq!(recv.data.len(), 1);
789
790        assert!(recv.write(first).is_ok());
791        assert_eq!(recv.len, 9);
792        assert_eq!(recv.off, 0);
793        assert_eq!(recv.data.len(), 2);
794
795        assert_emit_discard(&mut recv, emit, 32, 9, false, Some(b"somehello"));
796        assert_eq!(recv.len, 9);
797        assert_eq!(recv.off, 9);
798        assert_eq!(recv.data.len(), 0);
799
800        assert_emit_discard_done(&mut recv, emit);
801    }
802
803    #[rstest]
804    fn fully_overlapping_read3(#[values(true, false)] emit: bool) {
805        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
806        assert_eq!(recv.len, 0);
807
808        let first = RangeBuf::from(b"something", 0, false);
809        let second = RangeBuf::from(b"hello", 3, false);
810
811        assert!(recv.write(second).is_ok());
812        assert_eq!(recv.len, 8);
813        assert_eq!(recv.off, 0);
814        assert_eq!(recv.data.len(), 1);
815
816        assert!(recv.write(first).is_ok());
817        assert_eq!(recv.len, 9);
818        assert_eq!(recv.off, 0);
819        assert_eq!(recv.data.len(), 3);
820
821        assert_emit_discard(&mut recv, emit, 32, 9, false, Some(b"somhellog"));
822        assert_eq!(recv.len, 9);
823        assert_eq!(recv.off, 9);
824        assert_eq!(recv.data.len(), 0);
825
826        assert_emit_discard_done(&mut recv, emit);
827    }
828
829    #[rstest]
830    fn fully_overlapping_read_multi(#[values(true, false)] emit: bool) {
831        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
832        assert_eq!(recv.len, 0);
833
834        let first = RangeBuf::from(b"somethingsomething", 0, false);
835        let second = RangeBuf::from(b"hello", 3, false);
836        let third = RangeBuf::from(b"hello", 12, false);
837
838        assert!(recv.write(second).is_ok());
839        assert_eq!(recv.len, 8);
840        assert_eq!(recv.off, 0);
841        assert_eq!(recv.data.len(), 1);
842
843        assert!(recv.write(third).is_ok());
844        assert_eq!(recv.len, 17);
845        assert_eq!(recv.off, 0);
846        assert_eq!(recv.data.len(), 2);
847
848        assert!(recv.write(first).is_ok());
849        assert_eq!(recv.len, 18);
850        assert_eq!(recv.off, 0);
851        assert_eq!(recv.data.len(), 5);
852
853        assert_emit_discard(
854            &mut recv,
855            emit,
856            32,
857            18,
858            false,
859            Some(b"somhellogsomhellog"),
860        );
861        assert_eq!(recv.len, 18);
862        assert_eq!(recv.off, 18);
863        assert_eq!(recv.data.len(), 0);
864
865        assert_emit_discard_done(&mut recv, emit);
866    }
867
868    #[rstest]
869    fn overlapping_start_read(#[values(true, false)] emit: bool) {
870        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
871        assert_eq!(recv.len, 0);
872
873        let first = RangeBuf::from(b"something", 0, false);
874        let second = RangeBuf::from(b"hello", 8, true);
875
876        assert!(recv.write(first).is_ok());
877        assert_eq!(recv.len, 9);
878        assert_eq!(recv.off, 0);
879        assert_eq!(recv.data.len(), 1);
880
881        assert!(recv.write(second).is_ok());
882        assert_eq!(recv.len, 13);
883        assert_eq!(recv.off, 0);
884        assert_eq!(recv.data.len(), 2);
885
886        assert_emit_discard(
887            &mut recv,
888            emit,
889            32,
890            13,
891            true,
892            Some(b"somethingello"),
893        );
894
895        assert_eq!(recv.len, 13);
896        assert_eq!(recv.off, 13);
897
898        assert_emit_discard_done(&mut recv, emit);
899    }
900
901    #[rstest]
902    fn overlapping_end_read(#[values(true, false)] emit: bool) {
903        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
904        assert_eq!(recv.len, 0);
905
906        let first = RangeBuf::from(b"hello", 0, false);
907        let second = RangeBuf::from(b"something", 3, true);
908
909        assert!(recv.write(second).is_ok());
910        assert_eq!(recv.len, 12);
911        assert_eq!(recv.off, 0);
912        assert_eq!(recv.data.len(), 1);
913
914        assert!(recv.write(first).is_ok());
915        assert_eq!(recv.len, 12);
916        assert_eq!(recv.off, 0);
917        assert_eq!(recv.data.len(), 2);
918
919        assert_emit_discard(&mut recv, emit, 32, 12, true, Some(b"helsomething"));
920        assert_eq!(recv.len, 12);
921        assert_eq!(recv.off, 12);
922
923        assert_emit_discard_done(&mut recv, emit);
924    }
925
926    #[rstest]
927    fn overlapping_end_twice_read(#[values(true, false)] emit: bool) {
928        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
929        assert_eq!(recv.len, 0);
930
931        let first = RangeBuf::from(b"he", 0, false);
932        let second = RangeBuf::from(b"ow", 4, false);
933        let third = RangeBuf::from(b"rl", 7, false);
934        let fourth = RangeBuf::from(b"helloworld", 0, true);
935
936        assert!(recv.write(third).is_ok());
937        assert_eq!(recv.len, 9);
938        assert_eq!(recv.off, 0);
939        assert_eq!(recv.data.len(), 1);
940
941        assert!(recv.write(second).is_ok());
942        assert_eq!(recv.len, 9);
943        assert_eq!(recv.off, 0);
944        assert_eq!(recv.data.len(), 2);
945
946        assert!(recv.write(first).is_ok());
947        assert_eq!(recv.len, 9);
948        assert_eq!(recv.off, 0);
949        assert_eq!(recv.data.len(), 3);
950
951        assert!(recv.write(fourth).is_ok());
952        assert_eq!(recv.len, 10);
953        assert_eq!(recv.off, 0);
954        assert_eq!(recv.data.len(), 6);
955
956        assert_emit_discard(&mut recv, emit, 32, 10, true, Some(b"helloworld"));
957        assert_eq!(recv.len, 10);
958        assert_eq!(recv.off, 10);
959
960        assert_emit_discard_done(&mut recv, emit);
961    }
962
963    #[rstest]
964    fn overlapping_end_twice_and_contained_read(
965        #[values(true, false)] emit: bool,
966    ) {
967        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
968        assert_eq!(recv.len, 0);
969
970        let first = RangeBuf::from(b"hellow", 0, false);
971        let second = RangeBuf::from(b"barfoo", 10, true);
972        let third = RangeBuf::from(b"rl", 7, false);
973        let fourth = RangeBuf::from(b"elloworldbarfoo", 1, true);
974
975        assert!(recv.write(third).is_ok());
976        assert_eq!(recv.len, 9);
977        assert_eq!(recv.off, 0);
978        assert_eq!(recv.data.len(), 1);
979
980        assert!(recv.write(second).is_ok());
981        assert_eq!(recv.len, 16);
982        assert_eq!(recv.off, 0);
983        assert_eq!(recv.data.len(), 2);
984
985        assert!(recv.write(first).is_ok());
986        assert_eq!(recv.len, 16);
987        assert_eq!(recv.off, 0);
988        assert_eq!(recv.data.len(), 3);
989
990        assert!(recv.write(fourth).is_ok());
991        assert_eq!(recv.len, 16);
992        assert_eq!(recv.off, 0);
993        assert_eq!(recv.data.len(), 5);
994
995        assert_emit_discard(
996            &mut recv,
997            emit,
998            32,
999            16,
1000            true,
1001            Some(b"helloworldbarfoo"),
1002        );
1003        assert_eq!(recv.len, 16);
1004        assert_eq!(recv.off, 16);
1005
1006        assert_emit_discard_done(&mut recv, emit);
1007    }
1008
1009    #[rstest]
1010    fn partially_multi_overlapping_reordered_read(
1011        #[values(true, false)] emit: bool,
1012    ) {
1013        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
1014        assert_eq!(recv.len, 0);
1015
1016        let first = RangeBuf::from(b"hello", 8, false);
1017        let second = RangeBuf::from(b"something", 0, false);
1018        let third = RangeBuf::from(b"moar", 11, true);
1019
1020        assert!(recv.write(first).is_ok());
1021        assert_eq!(recv.len, 13);
1022        assert_eq!(recv.off, 0);
1023        assert_eq!(recv.data.len(), 1);
1024
1025        assert!(recv.write(second).is_ok());
1026        assert_eq!(recv.len, 13);
1027        assert_eq!(recv.off, 0);
1028        assert_eq!(recv.data.len(), 2);
1029
1030        assert!(recv.write(third).is_ok());
1031        assert_eq!(recv.len, 15);
1032        assert_eq!(recv.off, 0);
1033        assert_eq!(recv.data.len(), 3);
1034
1035        assert_emit_discard(
1036            &mut recv,
1037            emit,
1038            32,
1039            15,
1040            true,
1041            Some(b"somethinhelloar"),
1042        );
1043        assert_eq!(recv.len, 15);
1044        assert_eq!(recv.off, 15);
1045        assert_eq!(recv.data.len(), 0);
1046
1047        assert_emit_discard_done(&mut recv, emit);
1048    }
1049
1050    #[rstest]
1051    fn partially_multi_overlapping_reordered_read2(
1052        #[values(true, false)] emit: bool,
1053    ) {
1054        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
1055        assert_eq!(recv.len, 0);
1056
1057        let first = RangeBuf::from(b"aaa", 0, false);
1058        let second = RangeBuf::from(b"bbb", 2, false);
1059        let third = RangeBuf::from(b"ccc", 4, false);
1060        let fourth = RangeBuf::from(b"ddd", 6, false);
1061        let fifth = RangeBuf::from(b"eee", 9, false);
1062        let sixth = RangeBuf::from(b"fff", 11, false);
1063
1064        assert!(recv.write(second).is_ok());
1065        assert_eq!(recv.len, 5);
1066        assert_eq!(recv.off, 0);
1067        assert_eq!(recv.data.len(), 1);
1068
1069        assert!(recv.write(fourth).is_ok());
1070        assert_eq!(recv.len, 9);
1071        assert_eq!(recv.off, 0);
1072        assert_eq!(recv.data.len(), 2);
1073
1074        assert!(recv.write(third).is_ok());
1075        assert_eq!(recv.len, 9);
1076        assert_eq!(recv.off, 0);
1077        assert_eq!(recv.data.len(), 3);
1078
1079        assert!(recv.write(first).is_ok());
1080        assert_eq!(recv.len, 9);
1081        assert_eq!(recv.off, 0);
1082        assert_eq!(recv.data.len(), 4);
1083
1084        assert!(recv.write(sixth).is_ok());
1085        assert_eq!(recv.len, 14);
1086        assert_eq!(recv.off, 0);
1087        assert_eq!(recv.data.len(), 5);
1088
1089        assert!(recv.write(fifth).is_ok());
1090        assert_eq!(recv.len, 14);
1091        assert_eq!(recv.off, 0);
1092        assert_eq!(recv.data.len(), 6);
1093
1094        assert_emit_discard(
1095            &mut recv,
1096            emit,
1097            32,
1098            14,
1099            false,
1100            Some(b"aabbbcdddeefff"),
1101        );
1102        assert_eq!(recv.len, 14);
1103        assert_eq!(recv.off, 14);
1104        assert_eq!(recv.data.len(), 0);
1105
1106        assert_emit_discard_done(&mut recv, emit);
1107    }
1108
1109    #[test]
1110    fn mixed_read_actions() {
1111        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
1112        assert_eq!(recv.len, 0);
1113
1114        let first = RangeBuf::from(b"hello", 0, false);
1115        let second = RangeBuf::from(b"world", 5, false);
1116        let third = RangeBuf::from(b"something", 10, true);
1117
1118        assert!(recv.write(second).is_ok());
1119        assert_eq!(recv.len, 10);
1120        assert_eq!(recv.off, 0);
1121
1122        assert_emit_discard_done(&mut recv, true);
1123        assert_emit_discard_done(&mut recv, false);
1124
1125        assert!(recv.write(third).is_ok());
1126        assert_eq!(recv.len, 19);
1127        assert_eq!(recv.off, 0);
1128
1129        assert_emit_discard_done(&mut recv, true);
1130        assert_emit_discard_done(&mut recv, false);
1131
1132        assert!(recv.write(first).is_ok());
1133        assert_eq!(recv.len, 19);
1134        assert_eq!(recv.off, 0);
1135
1136        assert_emit_discard(&mut recv, true, 5, 5, false, Some(b"hello"));
1137        assert_eq!(recv.len, 19);
1138        assert_eq!(recv.off, 5);
1139
1140        assert_emit_discard(&mut recv, false, 5, 5, false, None);
1141        assert_eq!(recv.len, 19);
1142        assert_eq!(recv.off, 10);
1143
1144        assert_emit_discard(&mut recv, true, 9, 9, true, Some(b"something"));
1145        assert_eq!(recv.len, 19);
1146        assert_eq!(recv.off, 19);
1147
1148        assert_emit_discard_done(&mut recv, true);
1149        assert_emit_discard_done(&mut recv, false);
1150    }
1151}