Skip to main content

quiche/stream/
recv_buf.rs

1// Copyright (C) 2023, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::cmp;
28
29use std::collections::BTreeMap;
30use std::collections::VecDeque;
31
32use std::time::Duration;
33use std::time::Instant;
34
35use crate::stream::RecvAction;
36use crate::stream::RecvBufResetReturn;
37use crate::Error;
38use crate::Result;
39
40use crate::flowcontrol;
41
42use crate::range_buf::RangeBuf;
43
44/// Receive-side stream buffer.
45///
46/// Stream data received by the peer is buffered in a list of data chunks
47/// ordered by offset in ascending order. Contiguous data can then be read
48/// into a slice.
49#[derive(Debug, Default)]
50pub struct RecvBuf {
51    /// Chunks of data received from the peer that have not yet been read by
52    /// the application, ordered by offset.
53    data: BTreeMap<u64, RangeBuf>,
54
55    /// The lowest data offset that has yet to be read by the application.
56    off: u64,
57
58    /// The total length of data received on this stream.
59    len: u64,
60
61    /// Receiver flow controller.
62    flow_control: flowcontrol::FlowControl,
63
64    /// The final stream offset received from the peer, if any.
65    fin_off: Option<u64>,
66
67    /// The error code received via RESET_STREAM.
68    error: Option<u64>,
69
70    /// Whether incoming data is validated but not buffered.
71    drain: bool,
72}
73
74impl RecvBuf {
75    /// Creates a new receive buffer.
76    pub fn new(max_data: u64, initial_window: u64, max_window: u64) -> RecvBuf {
77        RecvBuf {
78            flow_control: flowcontrol::FlowControl::new(
79                max_data,
80                initial_window,
81                max_window,
82            ),
83            ..RecvBuf::default()
84        }
85    }
86
87    /// Inserts the given chunk of data in the buffer.
88    ///
89    /// This also takes care of enforcing stream flow control limits, as well
90    /// as handling incoming data that overlaps data that is already in the
91    /// buffer.
92    pub fn write(&mut self, buf: RangeBuf) -> Result<()> {
93        if buf.max_off() > self.max_data() {
94            return Err(Error::FlowControl);
95        }
96
97        if let Some(fin_off) = self.fin_off {
98            // Stream's size is known, forbid data beyond that point.
99            if buf.max_off() > fin_off {
100                return Err(Error::FinalSize);
101            }
102
103            // Stream's size is already known, forbid changing it.
104            if buf.fin() && fin_off != buf.max_off() {
105                return Err(Error::FinalSize);
106            }
107        }
108
109        // Stream's known size is lower than data already received.
110        if buf.fin() && buf.max_off() < self.len {
111            return Err(Error::FinalSize);
112        }
113
114        // We already saved the final offset, so there's nothing else we
115        // need to keep from the RangeBuf if it's empty.
116        if self.fin_off.is_some() && buf.is_empty() {
117            return Ok(());
118        }
119
120        if buf.fin() {
121            self.fin_off = Some(buf.max_off());
122        }
123
124        // No need to store empty buffer that doesn't carry the fin flag.
125        if !buf.fin() && buf.is_empty() {
126            return Ok(());
127        }
128
129        // Check if data is fully duplicate, that is the buffer's max offset is
130        // lower or equal to the offset already stored in the recv buffer.
131        if self.off >= buf.max_off() {
132            // An exception is applied to empty range buffers, because an empty
133            // buffer's max offset matches the max offset of the recv buffer.
134            //
135            // By this point all spurious empty buffers should have already been
136            // discarded, so allowing empty buffers here should be safe.
137            if !buf.is_empty() {
138                return Ok(());
139            }
140        }
141
142        let mut tmp_bufs = VecDeque::with_capacity(2);
143        tmp_bufs.push_back(buf);
144
145        'tmp: while let Some(mut buf) = tmp_bufs.pop_front() {
146            // Discard incoming data below current stream offset. Bytes up to
147            // `self.off` have already been received so we should not buffer
148            // them again. This is also important to make sure `ready()` doesn't
149            // get stuck when a buffer with lower offset than the stream's is
150            // buffered.
151            if self.off_front() > buf.off() {
152                buf = buf.split_off((self.off_front() - buf.off()) as usize);
153            }
154
155            // Handle overlapping data. If the incoming data's starting offset
156            // is above the previous maximum received offset, there is clearly
157            // no overlap so this logic can be skipped. However do still try to
158            // merge an empty final buffer (i.e. an empty buffer with the fin
159            // flag set, which is the only kind of empty buffer that should
160            // reach this point).
161            if buf.off() < self.max_off() || buf.is_empty() {
162                for (_, b) in self.data.range(buf.off()..) {
163                    let off = buf.off();
164
165                    // We are past the current buffer.
166                    if b.off() > buf.max_off() {
167                        break;
168                    }
169
170                    // New buffer is fully contained in existing buffer.
171                    if off >= b.off() && buf.max_off() <= b.max_off() {
172                        continue 'tmp;
173                    }
174
175                    // New buffer's start overlaps existing buffer.
176                    if off >= b.off() && off < b.max_off() {
177                        buf = buf.split_off((b.max_off() - off) as usize);
178                    }
179
180                    // New buffer's end overlaps existing buffer.
181                    if off < b.off() && buf.max_off() > b.off() {
182                        tmp_bufs
183                            .push_back(buf.split_off((b.off() - off) as usize));
184                    }
185                }
186            }
187
188            self.len = cmp::max(self.len, buf.max_off());
189
190            if !self.drain {
191                self.data.insert(buf.max_off(), buf);
192            } else {
193                // we are not storing any data, off == len
194                self.off = self.len;
195            }
196        }
197
198        Ok(())
199    }
200
201    /// Reads contiguous data from the receive buffer.
202    ///
203    /// Data is written into the given `out` buffer, up to the length of `out`.
204    ///
205    /// Only contiguous data is removed, starting from offset 0. The offset is
206    /// incremented as data is taken out of the receive buffer. If there is no
207    /// data at the expected read offset, the `Done` error is returned.
208    ///
209    /// On success the amount of data read and a flag indicating
210    /// if there is no more data in the buffer, are returned as a tuple.
211    #[inline]
212    pub fn emit(&mut self, mut out: &mut [u8]) -> Result<(usize, bool)> {
213        self.emit_or_discard(RecvAction::Emit { out: &mut out })
214    }
215
216    /// Reads or discards contiguous data from the receive buffer.
217    ///
218    /// Passing an `action` of `StreamRecvAction::Emit` results in data being
219    /// written into the provided buffer, up to its length.
220    ///
221    /// Passing an `action` of `StreamRecvAction::Discard` results in up to
222    /// the indicated number of bytes being discarded without copying.
223    ///
224    /// Only contiguous data is removed, starting from offset 0. The offset is
225    /// incremented as data is taken out of the receive buffer. If there is no
226    /// data at the expected read offset, the `Done` error is returned.
227    ///
228    /// On success the amount of data read or discarded, and a flag indicating
229    /// if there is no more data in the buffer, are returned as a tuple.
230    pub fn emit_or_discard<B: bytes::BufMut>(
231        &mut self, mut action: RecvAction<B>,
232    ) -> Result<(usize, bool)> {
233        let mut len = 0;
234        let mut cap = match &action {
235            RecvAction::Emit { out } => out.remaining_mut(),
236            RecvAction::Discard { len } => *len,
237        };
238
239        if !self.ready() {
240            return Err(Error::Done);
241        }
242
243        // The stream was reset, so clear its data and return the error code
244        // instead.
245        if let Some(e) = self.error {
246            self.data.clear();
247            return Err(Error::StreamReset(e));
248        }
249
250        while cap > 0 && self.ready() {
251            let mut entry = match self.data.first_entry() {
252                Some(entry) => entry,
253                None => break,
254            };
255
256            let buf = entry.get_mut();
257
258            let buf_len = cmp::min(buf.len(), cap);
259
260            // Only copy data if we're emitting, not discarding.
261            if let RecvAction::Emit { ref mut out } = action {
262                // Note: `BufMut::remaining_mut()` cannot "shrink", but BufMut
263                // impls are allowed to grow the buffer, so we
264                // check here that we still have at least
265                // `cap` bytes, but we can't require equality
266                debug_assert!(
267                    cap <= out.remaining_mut(),
268                    "We updated `cap` incorrectly"
269                );
270                out.put_slice(&buf[..buf_len])
271            }
272
273            self.off += buf_len as u64;
274
275            len += buf_len;
276            cap -= buf_len;
277
278            if buf_len < buf.len() {
279                buf.consume(buf_len);
280
281                // We reached the maximum capacity, so end here.
282                break;
283            }
284
285            entry.remove();
286        }
287
288        // Update consumed bytes for flow control.
289        self.flow_control.add_consumed(len as u64);
290
291        Ok((len, self.is_fin()))
292    }
293
294    /// Resets the stream at the given offset.
295    pub fn reset(
296        &mut self, error_code: u64, final_size: u64,
297    ) -> Result<RecvBufResetReturn> {
298        // Stream's size is already known, forbid changing it.
299        if let Some(fin_off) = self.fin_off {
300            if fin_off != final_size {
301                return Err(Error::FinalSize);
302            }
303        }
304
305        // Stream's known size is lower than data already received.
306        if final_size < self.len {
307            return Err(Error::FinalSize);
308        }
309
310        if self.error.is_some() {
311            // We already verified that the final size matches
312            return Ok(RecvBufResetReturn::zero());
313        }
314
315        // Calculate how many bytes need to be removed from the connection flow
316        // control.
317        let result = RecvBufResetReturn {
318            max_data_delta: final_size - self.len,
319            consumed_flowcontrol: final_size - self.off,
320        };
321
322        self.error = Some(error_code);
323
324        // Clear all data already buffered.
325        self.off = final_size;
326
327        self.data.clear();
328
329        // In order to ensure the application is notified when the stream is
330        // reset, enqueue a zero-length buffer at the final size offset.
331        let buf = RangeBuf::from(b"", final_size, true);
332        self.write(buf)?;
333
334        Ok(result)
335    }
336
337    /// Commits the new max_data limit.
338    pub fn update_max_data(&mut self, now: Instant) {
339        self.flow_control.update_max_data(now);
340    }
341
342    /// Return the new max_data limit.
343    pub fn max_data_next(&mut self) -> u64 {
344        self.flow_control.max_data_next()
345    }
346
347    /// Return the current flow control limit.
348    pub fn max_data(&self) -> u64 {
349        self.flow_control.max_data()
350    }
351
352    /// Return the current window.
353    pub fn window(&self) -> u64 {
354        self.flow_control.window()
355    }
356
357    /// Autotune the window size.
358    pub fn autotune_window(&mut self, now: Instant, rtt: Duration) {
359        self.flow_control.autotune_window(now, rtt);
360    }
361
362    /// Shuts down receiving data and returns the number of bytes
363    /// that should be returned to the connection level flow
364    /// control
365    pub fn shutdown(&mut self) -> Result<u64> {
366        if self.drain {
367            return Err(Error::Done);
368        }
369
370        self.drain = true;
371
372        self.data.clear();
373
374        let consumed = self.max_off() - self.off;
375        self.off = self.max_off();
376
377        Ok(consumed)
378    }
379
380    /// Returns the lowest offset of data buffered.
381    pub fn off_front(&self) -> u64 {
382        self.off
383    }
384
385    /// Returns true if we need to update the local flow control limit.
386    pub fn almost_full(&self) -> bool {
387        self.fin_off.is_none() && self.flow_control.should_update_max_data()
388    }
389
390    /// Returns the largest offset ever received.
391    pub fn max_off(&self) -> u64 {
392        self.len
393    }
394
395    /// Returns true if the receive-side of the stream is complete.
396    ///
397    /// This happens when the stream's receive final size is known, and the
398    /// application has read all data from the stream.
399    pub fn is_fin(&self) -> bool {
400        if self.fin_off == Some(self.off) {
401            return true;
402        }
403
404        false
405    }
406
407    /// Returns true if the stream is not storing incoming data.
408    pub fn is_draining(&self) -> bool {
409        self.drain
410    }
411
412    /// Returns true if the stream has data to be read.
413    pub fn ready(&self) -> bool {
414        let (_, buf) = match self.data.first_key_value() {
415            Some(v) => v,
416            None => return false,
417        };
418
419        buf.off() == self.off
420    }
421
422    #[cfg(test)]
423    pub(crate) fn flow_control_for_tests(&self) -> &flowcontrol::FlowControl {
424        &self.flow_control
425    }
426}
427
428#[cfg(test)]
429mod tests {
430    use super::*;
431    use crate::stream::DEFAULT_STREAM_WINDOW;
432    use bytes::BufMut as _;
433    use rstest::rstest;
434
435    // Helper function for testing either buffer emit or discard.
436    //
437    // The `emit` parameter controls whether data is emitted or discarded from
438    // `recv`.
439    //
440    // The `target_len` parameter controls the maximum amount of bytes that
441    // could be read, up to the capacity of `recv`. The `result_len` is the
442    // actual number of bytes that were taken out of `recv`. An assert is
443    // performed on `result_len` to ensure the number of bytes read meets the
444    // caller expectations.
445    //
446    // The `is_fin` parameter relates to the buffer's finished status. An assert
447    // is performed on it to ensure the status meet the caller expectations.
448    //
449    // The `test_bytes` parameter carries an optional slice of bytes. Is set, an
450    // assert is performed against the bytes that were read out of the buffer,
451    // to ensure caller expectations are met.
452    fn assert_emit_discard(
453        recv: &mut RecvBuf, emit: bool, target_len: usize, result_len: usize,
454        is_fin: bool, test_bytes: Option<&[u8]>,
455    ) {
456        let mut buf = Vec::<u8>::with_capacity(512).limit(target_len);
457        let action = if emit {
458            RecvAction::Emit { out: &mut buf }
459        } else {
460            RecvAction::Discard { len: target_len }
461        };
462
463        let (read, fin) = recv.emit_or_discard(action).unwrap();
464
465        let buf = buf.into_inner();
466        if emit {
467            assert_eq!(buf.len(), read);
468            if let Some(v) = test_bytes {
469                assert_eq!(&buf, v);
470            }
471        }
472
473        assert_eq!(read, result_len);
474        assert_eq!(is_fin, fin);
475    }
476
477    // Helper function for testing buffer status for either emit or discard.
478    fn assert_emit_discard_done(recv: &mut RecvBuf, emit: bool) {
479        let mut buf = [0u8; 32];
480        let action = if emit {
481            RecvAction::Emit {
482                out: &mut buf.as_mut_slice(),
483            }
484        } else {
485            RecvAction::Discard { len: 32 }
486        };
487        assert_eq!(recv.emit_or_discard(action), Err(Error::Done));
488    }
489
490    #[rstest]
491    fn empty_read(#[values(true, false)] emit: bool) {
492        let mut recv =
493            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
494        assert_eq!(recv.len, 0);
495
496        assert_emit_discard_done(&mut recv, emit);
497    }
498
499    #[rstest]
500    fn empty_stream_frame(#[values(true, false)] emit: bool) {
501        let mut recv =
502            RecvBuf::new(15, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
503        assert_eq!(recv.len, 0);
504
505        let buf = RangeBuf::from(b"hello", 0, false);
506        assert!(recv.write(buf).is_ok());
507        assert_eq!(recv.len, 5);
508        assert_eq!(recv.off, 0);
509        assert_eq!(recv.data.len(), 1);
510
511        assert_emit_discard(&mut recv, emit, 32, 5, false, None);
512
513        // Don't store non-fin empty buffer.
514        let buf = RangeBuf::from(b"", 10, false);
515        assert!(recv.write(buf).is_ok());
516        assert_eq!(recv.len, 5);
517        assert_eq!(recv.off, 5);
518        assert_eq!(recv.data.len(), 0);
519
520        // Check flow control for empty buffer.
521        let buf = RangeBuf::from(b"", 16, false);
522        assert_eq!(recv.write(buf), Err(Error::FlowControl));
523
524        // Store fin empty buffer.
525        let buf = RangeBuf::from(b"", 5, true);
526        assert!(recv.write(buf).is_ok());
527        assert_eq!(recv.len, 5);
528        assert_eq!(recv.off, 5);
529        assert_eq!(recv.data.len(), 1);
530
531        // Don't store additional fin empty buffers.
532        let buf = RangeBuf::from(b"", 5, true);
533        assert!(recv.write(buf).is_ok());
534        assert_eq!(recv.len, 5);
535        assert_eq!(recv.off, 5);
536        assert_eq!(recv.data.len(), 1);
537
538        // Don't store additional fin non-empty buffers.
539        let buf = RangeBuf::from(b"aa", 3, true);
540        assert!(recv.write(buf).is_ok());
541        assert_eq!(recv.len, 5);
542        assert_eq!(recv.off, 5);
543        assert_eq!(recv.data.len(), 1);
544
545        // Validate final size with fin empty buffers.
546        let buf = RangeBuf::from(b"", 6, true);
547        assert_eq!(recv.write(buf), Err(Error::FinalSize));
548        let buf = RangeBuf::from(b"", 4, true);
549        assert_eq!(recv.write(buf), Err(Error::FinalSize));
550
551        assert_emit_discard(&mut recv, emit, 32, 0, true, None);
552    }
553
554    #[rstest]
555    fn ordered_read(#[values(true, false)] emit: bool) {
556        let mut recv =
557            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
558        assert_eq!(recv.len, 0);
559
560        let first = RangeBuf::from(b"hello", 0, false);
561        let second = RangeBuf::from(b"world", 5, false);
562        let third = RangeBuf::from(b"something", 10, true);
563
564        assert!(recv.write(second).is_ok());
565        assert_eq!(recv.len, 10);
566        assert_eq!(recv.off, 0);
567
568        assert_emit_discard_done(&mut recv, emit);
569
570        assert!(recv.write(third).is_ok());
571        assert_eq!(recv.len, 19);
572        assert_eq!(recv.off, 0);
573
574        assert_emit_discard_done(&mut recv, emit);
575
576        assert!(recv.write(first).is_ok());
577        assert_eq!(recv.len, 19);
578        assert_eq!(recv.off, 0);
579
580        assert_emit_discard(
581            &mut recv,
582            emit,
583            32,
584            19,
585            true,
586            Some(b"helloworldsomething"),
587        );
588        assert_eq!(recv.len, 19);
589        assert_eq!(recv.off, 19);
590
591        assert_emit_discard_done(&mut recv, emit);
592    }
593
594    /// Test shutdown behavior
595    #[rstest]
596    fn shutdown(#[values(true, false)] emit: bool) {
597        let mut recv =
598            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
599        assert_eq!(recv.len, 0);
600
601        let first = RangeBuf::from(b"hello", 0, false);
602        let second = RangeBuf::from(b"world", 5, false);
603        let third = RangeBuf::from(b"something", 10, false);
604
605        assert!(recv.write(second).is_ok());
606        assert_eq!(recv.len, 10);
607        assert_eq!(recv.off, 0);
608
609        assert_emit_discard_done(&mut recv, emit);
610
611        // shutdown the buffer. Buffer is dropped.
612        assert_eq!(recv.shutdown(), Ok(10));
613        assert_eq!(recv.len, 10);
614        assert_eq!(recv.off, 10);
615        assert_eq!(recv.data.len(), 0);
616
617        assert_emit_discard_done(&mut recv, emit);
618
619        // subsequent writes are validated but not added to the buffer
620        assert!(recv.write(first).is_ok());
621        assert_eq!(recv.len, 10);
622        assert_eq!(recv.off, 10);
623        assert_eq!(recv.data.len(), 0);
624
625        // the max offset of received data can increase and
626        // the recv.off must increase with it
627        assert!(recv.write(third).is_ok());
628        assert_eq!(recv.len, 19);
629        assert_eq!(recv.off, 19);
630        assert_eq!(recv.data.len(), 0);
631
632        // Send a reset
633        assert_emit_discard_done(&mut recv, emit);
634        assert_eq!(
635            recv.reset(42, 123),
636            Ok(RecvBufResetReturn {
637                max_data_delta: 104,
638                consumed_flowcontrol: 104,
639            })
640        );
641        assert_eq!(recv.len, 123);
642        assert_eq!(recv.off, 123);
643        assert_eq!(recv.data.len(), 0);
644
645        assert_emit_discard_done(&mut recv, emit);
646    }
647
648    #[rstest]
649    fn split_read(#[values(true, false)] emit: bool) {
650        let mut recv =
651            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
652        assert_eq!(recv.len, 0);
653
654        let first = RangeBuf::from(b"something", 0, false);
655        let second = RangeBuf::from(b"helloworld", 9, true);
656
657        assert!(recv.write(first).is_ok());
658        assert_eq!(recv.len, 9);
659        assert_eq!(recv.off, 0);
660
661        assert!(recv.write(second).is_ok());
662        assert_eq!(recv.len, 19);
663        assert_eq!(recv.off, 0);
664
665        assert_emit_discard(&mut recv, emit, 10, 10, false, Some(b"somethingh"));
666        assert_eq!(recv.len, 19);
667        assert_eq!(recv.off, 10);
668
669        assert_emit_discard(&mut recv, emit, 5, 5, false, Some(b"ellow"));
670        assert_eq!(recv.len, 19);
671        assert_eq!(recv.off, 15);
672
673        assert_emit_discard(&mut recv, emit, 5, 4, true, Some(b"orld"));
674        assert_eq!(recv.len, 19);
675        assert_eq!(recv.off, 19);
676    }
677
// Same as `split_read`, but emitting into a single growing output buffer
// whose limit is changed between reads.
#[test]
fn split_read_incremental_buf() {
    let window = DEFAULT_STREAM_WINDOW;
    let mut recv = RecvBuf::new(u64::MAX, window, window);
    assert_eq!(recv.len, 0);

    assert_eq!(recv.write(RangeBuf::from(b"something", 0, false)), Ok(()));
    assert_eq!((recv.len, recv.off), (9, 0));

    assert_eq!(recv.write(RangeBuf::from(b"helloworld", 9, true)), Ok(()));
    assert_eq!((recv.len, recv.off), (19, 0));

    // Each step is: limit to apply to the output buffer, expected result of
    // the read, expected read offset and expected accumulated output.
    let steps: [(usize, (usize, bool), u64, &[u8]); 3] = [
        (10, (10, false), 10, &b"somethingh"[..]),
        (5, (5, false), 15, &b"somethinghellow"[..]),
        (42, (4, true), 19, &b"somethinghelloworld"[..]),
    ];

    let mut out = Vec::new().limit(0);

    for (limit, expected, off, contents) in steps {
        out.set_limit(limit);

        let res = recv.emit_or_discard(RecvAction::Emit { out: &mut out });
        assert_eq!(res, Ok(expected));

        assert_eq!((recv.len, recv.off), (19, off));
        assert_eq!(out.get_ref().len(), contents.len());
        assert_eq!(out.get_ref().as_slice(), contents);
    }
}
725
726    #[rstest]
727    fn incomplete_read(#[values(true, false)] emit: bool) {
728        let mut recv =
729            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
730        assert_eq!(recv.len, 0);
731
732        let mut buf = [0u8; 32];
733
734        let first = RangeBuf::from(b"something", 0, false);
735        let second = RangeBuf::from(b"helloworld", 9, true);
736
737        assert!(recv.write(second).is_ok());
738        assert_eq!(recv.len, 19);
739        assert_eq!(recv.off, 0);
740
741        let action = if emit {
742            RecvAction::Emit {
743                out: &mut buf.as_mut_slice(),
744            }
745        } else {
746            RecvAction::Discard { len: 32 }
747        };
748        assert_eq!(recv.emit_or_discard(action), Err(Error::Done));
749
750        assert!(recv.write(first).is_ok());
751        assert_eq!(recv.len, 19);
752        assert_eq!(recv.off, 0);
753
754        assert_emit_discard(
755            &mut recv,
756            emit,
757            32,
758            19,
759            true,
760            Some(b"somethinghelloworld"),
761        );
762        assert_eq!(recv.len, 19);
763        assert_eq!(recv.off, 19);
764    }
765
766    #[rstest]
767    fn zero_len_read(#[values(true, false)] emit: bool) {
768        let mut recv =
769            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
770        assert_eq!(recv.len, 0);
771
772        let first = RangeBuf::from(b"something", 0, false);
773        let second = RangeBuf::from(b"", 9, true);
774
775        assert!(recv.write(first).is_ok());
776        assert_eq!(recv.len, 9);
777        assert_eq!(recv.off, 0);
778        assert_eq!(recv.data.len(), 1);
779
780        assert!(recv.write(second).is_ok());
781        assert_eq!(recv.len, 9);
782        assert_eq!(recv.off, 0);
783        assert_eq!(recv.data.len(), 1);
784
785        assert_emit_discard(&mut recv, emit, 32, 9, true, Some(b"something"));
786        assert_eq!(recv.len, 9);
787        assert_eq!(recv.off, 9);
788    }
789
790    #[rstest]
791    fn past_read(#[values(true, false)] emit: bool) {
792        let mut recv =
793            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
794        assert_eq!(recv.len, 0);
795
796        let first = RangeBuf::from(b"something", 0, false);
797        let second = RangeBuf::from(b"hello", 3, false);
798        let third = RangeBuf::from(b"ello", 4, true);
799        let fourth = RangeBuf::from(b"ello", 5, true);
800
801        assert!(recv.write(first).is_ok());
802        assert_eq!(recv.len, 9);
803        assert_eq!(recv.off, 0);
804        assert_eq!(recv.data.len(), 1);
805
806        assert_emit_discard(&mut recv, emit, 32, 9, false, Some(b"something"));
807        assert_eq!(recv.len, 9);
808        assert_eq!(recv.off, 9);
809
810        assert!(recv.write(second).is_ok());
811        assert_eq!(recv.len, 9);
812        assert_eq!(recv.off, 9);
813        assert_eq!(recv.data.len(), 0);
814
815        assert_eq!(recv.write(third), Err(Error::FinalSize));
816
817        assert!(recv.write(fourth).is_ok());
818        assert_eq!(recv.len, 9);
819        assert_eq!(recv.off, 9);
820        assert_eq!(recv.data.len(), 0);
821
822        assert_emit_discard_done(&mut recv, emit);
823    }
824
825    #[rstest]
826    fn fully_overlapping_read(#[values(true, false)] emit: bool) {
827        let mut recv =
828            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
829        assert_eq!(recv.len, 0);
830
831        let first = RangeBuf::from(b"something", 0, false);
832        let second = RangeBuf::from(b"hello", 4, false);
833
834        assert!(recv.write(first).is_ok());
835        assert_eq!(recv.len, 9);
836        assert_eq!(recv.off, 0);
837        assert_eq!(recv.data.len(), 1);
838
839        assert!(recv.write(second).is_ok());
840        assert_eq!(recv.len, 9);
841        assert_eq!(recv.off, 0);
842        assert_eq!(recv.data.len(), 1);
843
844        assert_emit_discard(&mut recv, emit, 32, 9, false, Some(b"something"));
845        assert_eq!(recv.len, 9);
846        assert_eq!(recv.off, 9);
847        assert_eq!(recv.data.len(), 0);
848
849        assert_emit_discard_done(&mut recv, emit);
850    }
851
852    #[rstest]
853    fn fully_overlapping_read2(#[values(true, false)] emit: bool) {
854        let mut recv =
855            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
856        assert_eq!(recv.len, 0);
857
858        let first = RangeBuf::from(b"something", 0, false);
859        let second = RangeBuf::from(b"hello", 4, false);
860
861        assert!(recv.write(second).is_ok());
862        assert_eq!(recv.len, 9);
863        assert_eq!(recv.off, 0);
864        assert_eq!(recv.data.len(), 1);
865
866        assert!(recv.write(first).is_ok());
867        assert_eq!(recv.len, 9);
868        assert_eq!(recv.off, 0);
869        assert_eq!(recv.data.len(), 2);
870
871        assert_emit_discard(&mut recv, emit, 32, 9, false, Some(b"somehello"));
872        assert_eq!(recv.len, 9);
873        assert_eq!(recv.off, 9);
874        assert_eq!(recv.data.len(), 0);
875
876        assert_emit_discard_done(&mut recv, emit);
877    }
878
879    #[rstest]
880    fn fully_overlapping_read3(#[values(true, false)] emit: bool) {
881        let mut recv =
882            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
883        assert_eq!(recv.len, 0);
884
885        let first = RangeBuf::from(b"something", 0, false);
886        let second = RangeBuf::from(b"hello", 3, false);
887
888        assert!(recv.write(second).is_ok());
889        assert_eq!(recv.len, 8);
890        assert_eq!(recv.off, 0);
891        assert_eq!(recv.data.len(), 1);
892
893        assert!(recv.write(first).is_ok());
894        assert_eq!(recv.len, 9);
895        assert_eq!(recv.off, 0);
896        assert_eq!(recv.data.len(), 3);
897
898        assert_emit_discard(&mut recv, emit, 32, 9, false, Some(b"somhellog"));
899        assert_eq!(recv.len, 9);
900        assert_eq!(recv.off, 9);
901        assert_eq!(recv.data.len(), 0);
902
903        assert_emit_discard_done(&mut recv, emit);
904    }
905
906    #[rstest]
907    fn fully_overlapping_read_multi(#[values(true, false)] emit: bool) {
908        let mut recv =
909            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
910        assert_eq!(recv.len, 0);
911
912        let first = RangeBuf::from(b"somethingsomething", 0, false);
913        let second = RangeBuf::from(b"hello", 3, false);
914        let third = RangeBuf::from(b"hello", 12, false);
915
916        assert!(recv.write(second).is_ok());
917        assert_eq!(recv.len, 8);
918        assert_eq!(recv.off, 0);
919        assert_eq!(recv.data.len(), 1);
920
921        assert!(recv.write(third).is_ok());
922        assert_eq!(recv.len, 17);
923        assert_eq!(recv.off, 0);
924        assert_eq!(recv.data.len(), 2);
925
926        assert!(recv.write(first).is_ok());
927        assert_eq!(recv.len, 18);
928        assert_eq!(recv.off, 0);
929        assert_eq!(recv.data.len(), 5);
930
931        assert_emit_discard(
932            &mut recv,
933            emit,
934            32,
935            18,
936            false,
937            Some(b"somhellogsomhellog"),
938        );
939        assert_eq!(recv.len, 18);
940        assert_eq!(recv.off, 18);
941        assert_eq!(recv.data.len(), 0);
942
943        assert_emit_discard_done(&mut recv, emit);
944    }
945
946    #[rstest]
947    fn overlapping_start_read(#[values(true, false)] emit: bool) {
948        let mut recv =
949            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
950        assert_eq!(recv.len, 0);
951
952        let first = RangeBuf::from(b"something", 0, false);
953        let second = RangeBuf::from(b"hello", 8, true);
954
955        assert!(recv.write(first).is_ok());
956        assert_eq!(recv.len, 9);
957        assert_eq!(recv.off, 0);
958        assert_eq!(recv.data.len(), 1);
959
960        assert!(recv.write(second).is_ok());
961        assert_eq!(recv.len, 13);
962        assert_eq!(recv.off, 0);
963        assert_eq!(recv.data.len(), 2);
964
965        assert_emit_discard(
966            &mut recv,
967            emit,
968            32,
969            13,
970            true,
971            Some(b"somethingello"),
972        );
973
974        assert_eq!(recv.len, 13);
975        assert_eq!(recv.off, 13);
976
977        assert_emit_discard_done(&mut recv, emit);
978    }
979
980    #[rstest]
981    fn overlapping_end_read(#[values(true, false)] emit: bool) {
982        let mut recv =
983            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
984        assert_eq!(recv.len, 0);
985
986        let first = RangeBuf::from(b"hello", 0, false);
987        let second = RangeBuf::from(b"something", 3, true);
988
989        assert!(recv.write(second).is_ok());
990        assert_eq!(recv.len, 12);
991        assert_eq!(recv.off, 0);
992        assert_eq!(recv.data.len(), 1);
993
994        assert!(recv.write(first).is_ok());
995        assert_eq!(recv.len, 12);
996        assert_eq!(recv.off, 0);
997        assert_eq!(recv.data.len(), 2);
998
999        assert_emit_discard(&mut recv, emit, 32, 12, true, Some(b"helsomething"));
1000        assert_eq!(recv.len, 12);
1001        assert_eq!(recv.off, 12);
1002
1003        assert_emit_discard_done(&mut recv, emit);
1004    }
1005
1006    #[rstest]
1007    fn overlapping_end_twice_read(#[values(true, false)] emit: bool) {
1008        let mut recv =
1009            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
1010        assert_eq!(recv.len, 0);
1011
1012        let first = RangeBuf::from(b"he", 0, false);
1013        let second = RangeBuf::from(b"ow", 4, false);
1014        let third = RangeBuf::from(b"rl", 7, false);
1015        let fourth = RangeBuf::from(b"helloworld", 0, true);
1016
1017        assert!(recv.write(third).is_ok());
1018        assert_eq!(recv.len, 9);
1019        assert_eq!(recv.off, 0);
1020        assert_eq!(recv.data.len(), 1);
1021
1022        assert!(recv.write(second).is_ok());
1023        assert_eq!(recv.len, 9);
1024        assert_eq!(recv.off, 0);
1025        assert_eq!(recv.data.len(), 2);
1026
1027        assert!(recv.write(first).is_ok());
1028        assert_eq!(recv.len, 9);
1029        assert_eq!(recv.off, 0);
1030        assert_eq!(recv.data.len(), 3);
1031
1032        assert!(recv.write(fourth).is_ok());
1033        assert_eq!(recv.len, 10);
1034        assert_eq!(recv.off, 0);
1035        assert_eq!(recv.data.len(), 6);
1036
1037        assert_emit_discard(&mut recv, emit, 32, 10, true, Some(b"helloworld"));
1038        assert_eq!(recv.len, 10);
1039        assert_eq!(recv.off, 10);
1040
1041        assert_emit_discard_done(&mut recv, emit);
1042    }
1043
1044    #[rstest]
1045    fn overlapping_end_twice_and_contained_read(
1046        #[values(true, false)] emit: bool,
1047    ) {
1048        let mut recv =
1049            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
1050        assert_eq!(recv.len, 0);
1051
1052        let first = RangeBuf::from(b"hellow", 0, false);
1053        let second = RangeBuf::from(b"barfoo", 10, true);
1054        let third = RangeBuf::from(b"rl", 7, false);
1055        let fourth = RangeBuf::from(b"elloworldbarfoo", 1, true);
1056
1057        assert!(recv.write(third).is_ok());
1058        assert_eq!(recv.len, 9);
1059        assert_eq!(recv.off, 0);
1060        assert_eq!(recv.data.len(), 1);
1061
1062        assert!(recv.write(second).is_ok());
1063        assert_eq!(recv.len, 16);
1064        assert_eq!(recv.off, 0);
1065        assert_eq!(recv.data.len(), 2);
1066
1067        assert!(recv.write(first).is_ok());
1068        assert_eq!(recv.len, 16);
1069        assert_eq!(recv.off, 0);
1070        assert_eq!(recv.data.len(), 3);
1071
1072        assert!(recv.write(fourth).is_ok());
1073        assert_eq!(recv.len, 16);
1074        assert_eq!(recv.off, 0);
1075        assert_eq!(recv.data.len(), 5);
1076
1077        assert_emit_discard(
1078            &mut recv,
1079            emit,
1080            32,
1081            16,
1082            true,
1083            Some(b"helloworldbarfoo"),
1084        );
1085        assert_eq!(recv.len, 16);
1086        assert_eq!(recv.off, 16);
1087
1088        assert_emit_discard_done(&mut recv, emit);
1089    }
1090
1091    #[rstest]
1092    fn partially_multi_overlapping_reordered_read(
1093        #[values(true, false)] emit: bool,
1094    ) {
1095        let mut recv =
1096            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
1097        assert_eq!(recv.len, 0);
1098
1099        let first = RangeBuf::from(b"hello", 8, false);
1100        let second = RangeBuf::from(b"something", 0, false);
1101        let third = RangeBuf::from(b"moar", 11, true);
1102
1103        assert!(recv.write(first).is_ok());
1104        assert_eq!(recv.len, 13);
1105        assert_eq!(recv.off, 0);
1106        assert_eq!(recv.data.len(), 1);
1107
1108        assert!(recv.write(second).is_ok());
1109        assert_eq!(recv.len, 13);
1110        assert_eq!(recv.off, 0);
1111        assert_eq!(recv.data.len(), 2);
1112
1113        assert!(recv.write(third).is_ok());
1114        assert_eq!(recv.len, 15);
1115        assert_eq!(recv.off, 0);
1116        assert_eq!(recv.data.len(), 3);
1117
1118        assert_emit_discard(
1119            &mut recv,
1120            emit,
1121            32,
1122            15,
1123            true,
1124            Some(b"somethinhelloar"),
1125        );
1126        assert_eq!(recv.len, 15);
1127        assert_eq!(recv.off, 15);
1128        assert_eq!(recv.data.len(), 0);
1129
1130        assert_emit_discard_done(&mut recv, emit);
1131    }
1132
1133    #[rstest]
1134    fn partially_multi_overlapping_reordered_read2(
1135        #[values(true, false)] emit: bool,
1136    ) {
1137        let mut recv =
1138            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
1139        assert_eq!(recv.len, 0);
1140
1141        let first = RangeBuf::from(b"aaa", 0, false);
1142        let second = RangeBuf::from(b"bbb", 2, false);
1143        let third = RangeBuf::from(b"ccc", 4, false);
1144        let fourth = RangeBuf::from(b"ddd", 6, false);
1145        let fifth = RangeBuf::from(b"eee", 9, false);
1146        let sixth = RangeBuf::from(b"fff", 11, false);
1147
1148        assert!(recv.write(second).is_ok());
1149        assert_eq!(recv.len, 5);
1150        assert_eq!(recv.off, 0);
1151        assert_eq!(recv.data.len(), 1);
1152
1153        assert!(recv.write(fourth).is_ok());
1154        assert_eq!(recv.len, 9);
1155        assert_eq!(recv.off, 0);
1156        assert_eq!(recv.data.len(), 2);
1157
1158        assert!(recv.write(third).is_ok());
1159        assert_eq!(recv.len, 9);
1160        assert_eq!(recv.off, 0);
1161        assert_eq!(recv.data.len(), 3);
1162
1163        assert!(recv.write(first).is_ok());
1164        assert_eq!(recv.len, 9);
1165        assert_eq!(recv.off, 0);
1166        assert_eq!(recv.data.len(), 4);
1167
1168        assert!(recv.write(sixth).is_ok());
1169        assert_eq!(recv.len, 14);
1170        assert_eq!(recv.off, 0);
1171        assert_eq!(recv.data.len(), 5);
1172
1173        assert!(recv.write(fifth).is_ok());
1174        assert_eq!(recv.len, 14);
1175        assert_eq!(recv.off, 0);
1176        assert_eq!(recv.data.len(), 6);
1177
1178        assert_emit_discard(
1179            &mut recv,
1180            emit,
1181            32,
1182            14,
1183            false,
1184            Some(b"aabbbcdddeefff"),
1185        );
1186        assert_eq!(recv.len, 14);
1187        assert_eq!(recv.off, 14);
1188        assert_eq!(recv.data.len(), 0);
1189
1190        assert_emit_discard_done(&mut recv, emit);
1191    }
1192
1193    #[test]
1194    fn mixed_read_actions() {
1195        let mut recv =
1196            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
1197        assert_eq!(recv.len, 0);
1198
1199        let first = RangeBuf::from(b"hello", 0, false);
1200        let second = RangeBuf::from(b"world", 5, false);
1201        let third = RangeBuf::from(b"something", 10, true);
1202
1203        assert!(recv.write(second).is_ok());
1204        assert_eq!(recv.len, 10);
1205        assert_eq!(recv.off, 0);
1206
1207        assert_emit_discard_done(&mut recv, true);
1208        assert_emit_discard_done(&mut recv, false);
1209
1210        assert!(recv.write(third).is_ok());
1211        assert_eq!(recv.len, 19);
1212        assert_eq!(recv.off, 0);
1213
1214        assert_emit_discard_done(&mut recv, true);
1215        assert_emit_discard_done(&mut recv, false);
1216
1217        assert!(recv.write(first).is_ok());
1218        assert_eq!(recv.len, 19);
1219        assert_eq!(recv.off, 0);
1220
1221        assert_emit_discard(&mut recv, true, 5, 5, false, Some(b"hello"));
1222        assert_eq!(recv.len, 19);
1223        assert_eq!(recv.off, 5);
1224
1225        assert_emit_discard(&mut recv, false, 5, 5, false, None);
1226        assert_eq!(recv.len, 19);
1227        assert_eq!(recv.off, 10);
1228
1229        assert_emit_discard(&mut recv, true, 9, 9, true, Some(b"something"));
1230        assert_eq!(recv.len, 19);
1231        assert_eq!(recv.off, 19);
1232
1233        assert_emit_discard_done(&mut recv, true);
1234        assert_emit_discard_done(&mut recv, false);
1235    }
1236}