Skip to main content

quiche/stream/
recv_buf.rs

1// Copyright (C) 2023, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::cmp;
28
29use std::collections::BTreeMap;
30use std::collections::VecDeque;
31
32use std::time::Duration;
33use std::time::Instant;
34
35use crate::stream::RecvAction;
36use crate::stream::RecvBufResetReturn;
37use crate::Error;
38use crate::Result;
39
40use crate::flowcontrol;
41
42use crate::range_buf::RangeBuf;
43
44/// Receive-side stream buffer.
45///
46/// Stream data received by the peer is buffered in a list of data chunks
47/// ordered by offset in ascending order. Contiguous data can then be read
48/// into a slice.
49#[derive(Debug, Default)]
50pub struct RecvBuf {
51    /// Chunks of data received from the peer that have not yet been read by
52    /// the application, ordered by offset.
53    data: BTreeMap<u64, RangeBuf>,
54
55    /// The lowest data offset that has yet to be read by the application.
56    off: u64,
57
58    /// The total length of data received on this stream.
59    len: u64,
60
61    /// Receiver flow controller.
62    flow_control: flowcontrol::FlowControl,
63
64    /// The final stream offset received from the peer, if any.
65    fin_off: Option<u64>,
66
67    /// The error code received via RESET_STREAM.
68    error: Option<u64>,
69
70    /// Whether incoming data is validated but not buffered.
71    drain: bool,
72}
73
74impl RecvBuf {
75    /// Creates a new receive buffer.
76    pub fn new(max_data: u64, initial_window: u64, max_window: u64) -> RecvBuf {
77        RecvBuf {
78            flow_control: flowcontrol::FlowControl::new(
79                max_data,
80                initial_window,
81                max_window,
82            ),
83            ..RecvBuf::default()
84        }
85    }
86
87    /// Inserts the given chunk of data in the buffer.
88    ///
89    /// This also takes care of enforcing stream flow control limits, as well
90    /// as handling incoming data that overlaps data that is already in the
91    /// buffer.
92    pub fn write(&mut self, buf: RangeBuf) -> Result<()> {
93        if buf.max_off() > self.max_data() {
94            return Err(Error::FlowControl);
95        }
96
97        if let Some(fin_off) = self.fin_off {
98            // Stream's size is known, forbid data beyond that point.
99            if buf.max_off() > fin_off {
100                return Err(Error::FinalSize);
101            }
102
103            // Stream's size is already known, forbid changing it.
104            if buf.fin() && fin_off != buf.max_off() {
105                return Err(Error::FinalSize);
106            }
107        }
108
109        // Stream's known size is lower than data already received.
110        if buf.fin() && buf.max_off() < self.len {
111            return Err(Error::FinalSize);
112        }
113
114        // We already saved the final offset, so there's nothing else we
115        // need to keep from the RangeBuf if it's empty.
116        if self.fin_off.is_some() && buf.is_empty() {
117            return Ok(());
118        }
119
120        if buf.fin() {
121            self.fin_off = Some(buf.max_off());
122        }
123
124        // No need to store empty buffer that doesn't carry the fin flag.
125        if !buf.fin() && buf.is_empty() {
126            return Ok(());
127        }
128
129        // Check if data is fully duplicate, that is the buffer's max offset is
130        // lower or equal to the offset already stored in the recv buffer.
131        if self.off >= buf.max_off() {
132            // An exception is applied to empty range buffers, because an empty
133            // buffer's max offset matches the max offset of the recv buffer.
134            //
135            // By this point all spurious empty buffers should have already been
136            // discarded, so allowing empty buffers here should be safe.
137            if !buf.is_empty() {
138                return Ok(());
139            }
140        }
141
142        let mut tmp_bufs = VecDeque::with_capacity(2);
143        tmp_bufs.push_back(buf);
144
145        'tmp: while let Some(mut buf) = tmp_bufs.pop_front() {
146            // Discard incoming data below current stream offset. Bytes up to
147            // `self.off` have already been received so we should not buffer
148            // them again. This is also important to make sure `ready()` doesn't
149            // get stuck when a buffer with lower offset than the stream's is
150            // buffered.
151            if self.off_front() > buf.off() {
152                buf = buf.split_off((self.off_front() - buf.off()) as usize);
153            }
154
155            // Handle overlapping data. If the incoming data's starting offset
156            // is above the previous maximum received offset, there is clearly
157            // no overlap so this logic can be skipped. However do still try to
158            // merge an empty final buffer (i.e. an empty buffer with the fin
159            // flag set, which is the only kind of empty buffer that should
160            // reach this point).
161            if buf.off() < self.max_off() || buf.is_empty() {
162                for (_, b) in self.data.range(buf.off()..) {
163                    let off = buf.off();
164
165                    // We are past the current buffer.
166                    if b.off() > buf.max_off() {
167                        break;
168                    }
169
170                    // New buffer is fully contained in existing buffer.
171                    if off >= b.off() && buf.max_off() <= b.max_off() {
172                        continue 'tmp;
173                    }
174
175                    // New buffer's start overlaps existing buffer.
176                    if off >= b.off() && off < b.max_off() {
177                        buf = buf.split_off((b.max_off() - off) as usize);
178                    }
179
180                    // New buffer's end overlaps existing buffer.
181                    if off < b.off() && buf.max_off() > b.off() {
182                        tmp_bufs
183                            .push_back(buf.split_off((b.off() - off) as usize));
184                    }
185                }
186            }
187
188            self.len = cmp::max(self.len, buf.max_off());
189
190            if !self.drain {
191                self.data.insert(buf.max_off(), buf);
192            } else {
193                // we are not storing any data, off == len
194                self.off = self.len;
195            }
196        }
197
198        Ok(())
199    }
200
201    /// Reads contiguous data from the receive buffer.
202    ///
203    /// Data is written into the given `out` buffer, up to the length of `out`.
204    ///
205    /// Only contiguous data is removed, starting from offset 0. The offset is
206    /// incremented as data is taken out of the receive buffer. If there is no
207    /// data at the expected read offset, the `Done` error is returned.
208    ///
209    /// On success the amount of data read and a flag indicating
210    /// if there is no more data in the buffer, are returned as a tuple.
211    #[inline]
212    pub fn emit(&mut self, mut out: &mut [u8]) -> Result<(usize, bool)> {
213        self.emit_or_discard(RecvAction::Emit { out: &mut out })
214    }
215
216    /// Reads or discards contiguous data from the receive buffer.
217    ///
218    /// Passing an `action` of `StreamRecvAction::Emit` results in data being
219    /// written into the provided buffer, up to its length.
220    ///
221    /// Passing an `action` of `StreamRecvAction::Discard` results in up to
222    /// the indicated number of bytes being discarded without copying.
223    ///
224    /// Only contiguous data is removed, starting from offset 0. The offset is
225    /// incremented as data is taken out of the receive buffer. If there is no
226    /// data at the expected read offset, the `Done` error is returned.
227    ///
228    /// On success the amount of data read or discarded, and a flag indicating
229    /// if there is no more data in the buffer, are returned as a tuple.
230    pub fn emit_or_discard<B: bytes::BufMut>(
231        &mut self, mut action: RecvAction<B>,
232    ) -> Result<(usize, bool)> {
233        let mut len = 0;
234        let mut cap = match &action {
235            RecvAction::Emit { out } => out.remaining_mut(),
236            RecvAction::Discard { len } => *len,
237        };
238
239        if !self.ready() {
240            return Err(Error::Done);
241        }
242
243        // The stream was reset, so clear its data and return the error code
244        // instead.
245        if let Some(e) = self.error {
246            self.data.clear();
247            return Err(Error::StreamReset(e));
248        }
249
250        while cap > 0 && self.ready() {
251            let mut entry = match self.data.first_entry() {
252                Some(entry) => entry,
253                None => break,
254            };
255
256            let buf = entry.get_mut();
257
258            let buf_len = cmp::min(buf.len(), cap);
259
260            // Only copy data if we're emitting, not discarding.
261            if let RecvAction::Emit { ref mut out } = action {
262                // Note: `BufMut::remaining_mut()` cannot "shrink", but BufMut
263                // impls are allowed to grow the buffer, so we
264                // check here that we still have at least
265                // `cap` bytes, but we can't require equality
266                debug_assert!(
267                    cap <= out.remaining_mut(),
268                    "We updated `cap` incorrectly"
269                );
270                out.put_slice(&buf[..buf_len])
271            }
272
273            self.off += buf_len as u64;
274
275            len += buf_len;
276            cap -= buf_len;
277
278            if buf_len < buf.len() {
279                buf.consume(buf_len);
280
281                // We reached the maximum capacity, so end here.
282                break;
283            }
284
285            entry.remove();
286        }
287
288        // Update consumed bytes for flow control.
289        self.flow_control.add_consumed(len as u64);
290
291        Ok((len, self.is_fin()))
292    }
293
294    /// Resets the stream at the given offset.
295    pub fn reset(
296        &mut self, error_code: u64, final_size: u64,
297    ) -> Result<RecvBufResetReturn> {
298        // Stream's size is already known, forbid changing it.
299        if let Some(fin_off) = self.fin_off {
300            if fin_off != final_size {
301                return Err(Error::FinalSize);
302            }
303        }
304
305        // Stream's known size is lower than data already received.
306        if final_size < self.len {
307            return Err(Error::FinalSize);
308        }
309
310        if self.error.is_some() {
311            // We already verified that the final size matches
312            return Ok(RecvBufResetReturn::zero());
313        }
314
315        // Calculate how many bytes need to be removed from the connection flow
316        // control.
317        let result = RecvBufResetReturn {
318            max_data_delta: final_size - self.len,
319            consumed_flowcontrol: final_size - self.off,
320        };
321
322        self.error = Some(error_code);
323
324        // Clear all data already buffered.
325        self.off = final_size;
326
327        self.data.clear();
328
329        // In order to ensure the application is notified when the stream is
330        // reset, enqueue a zero-length buffer at the final size offset.
331        let buf = RangeBuf::from(b"", final_size, true);
332        self.write(buf)?;
333
334        Ok(result)
335    }
336
337    /// Commits the new max_data limit.
338    pub fn update_max_data(&mut self, now: Instant) {
339        self.flow_control.update_max_data(now);
340    }
341
342    /// Return the new max_data limit.
343    pub fn max_data_next(&mut self) -> u64 {
344        self.flow_control.max_data_next()
345    }
346
347    /// Return the current flow control limit.
348    pub fn max_data(&self) -> u64 {
349        self.flow_control.max_data()
350    }
351
352    /// Return the current window.
353    pub fn window(&self) -> u64 {
354        self.flow_control.window()
355    }
356
357    /// Autotune the window size.
358    pub fn autotune_window(&mut self, now: Instant, rtt: Duration) {
359        self.flow_control.autotune_window(now, rtt);
360    }
361
362    /// Shuts down receiving data and returns the number of bytes
363    /// that should be returned to the connection level flow
364    /// control
365    pub fn shutdown(&mut self) -> Result<u64> {
366        if self.drain {
367            return Err(Error::Done);
368        }
369
370        self.drain = true;
371
372        self.data.clear();
373
374        let consumed = self.max_off() - self.off;
375        self.off = self.max_off();
376
377        Ok(consumed)
378    }
379
380    /// Returns the lowest offset of data buffered.
381    pub fn off_front(&self) -> u64 {
382        self.off
383    }
384
385    /// Returns true if we need to update the local flow control limit.
386    pub fn almost_full(&self) -> bool {
387        self.fin_off.is_none() && self.flow_control.should_update_max_data()
388    }
389
390    /// Returns the largest offset ever received.
391    pub fn max_off(&self) -> u64 {
392        self.len
393    }
394
395    /// Returns true if the receive-side of the stream is complete.
396    ///
397    /// This happens when the stream's receive final size is known, and the
398    /// application has read all data from the stream.
399    pub fn is_fin(&self) -> bool {
400        if self.fin_off == Some(self.off) {
401            return true;
402        }
403
404        false
405    }
406
407    /// Returns true if the stream is not storing incoming data.
408    pub fn is_draining(&self) -> bool {
409        self.drain
410    }
411
412    /// Returns true if the stream has data to be read.
413    pub fn ready(&self) -> bool {
414        let (_, buf) = match self.data.first_key_value() {
415            Some(v) => v,
416            None => return false,
417        };
418
419        buf.off() == self.off
420    }
421
422    #[cfg(test)]
423    pub(crate) fn flow_control_for_tests(&self) -> &flowcontrol::FlowControl {
424        &self.flow_control
425    }
426}
427
428#[cfg(test)]
429mod tests {
430    use super::*;
431
432    /// The default size of the receiver stream flow control window.
433    const DEFAULT_STREAM_WINDOW: u64 = 32 * 1024;
434    use bytes::BufMut as _;
435    use rstest::rstest;
436
437    // Helper function for testing either buffer emit or discard.
438    //
439    // The `emit` parameter controls whether data is emitted or discarded from
440    // `recv`.
441    //
442    // The `target_len` parameter controls the maximum amount of bytes that
443    // could be read, up to the capacity of `recv`. The `result_len` is the
444    // actual number of bytes that were taken out of `recv`. An assert is
445    // performed on `result_len` to ensure the number of bytes read meets the
446    // caller expectations.
447    //
448    // The `is_fin` parameter relates to the buffer's finished status. An assert
449    // is performed on it to ensure the status meet the caller expectations.
450    //
451    // The `test_bytes` parameter carries an optional slice of bytes. Is set, an
452    // assert is performed against the bytes that were read out of the buffer,
453    // to ensure caller expectations are met.
454    fn assert_emit_discard(
455        recv: &mut RecvBuf, emit: bool, target_len: usize, result_len: usize,
456        is_fin: bool, test_bytes: Option<&[u8]>,
457    ) {
458        let mut buf = Vec::<u8>::with_capacity(512).limit(target_len);
459        let action = if emit {
460            RecvAction::Emit { out: &mut buf }
461        } else {
462            RecvAction::Discard { len: target_len }
463        };
464
465        let (read, fin) = recv.emit_or_discard(action).unwrap();
466
467        let buf = buf.into_inner();
468        if emit {
469            assert_eq!(buf.len(), read);
470            if let Some(v) = test_bytes {
471                assert_eq!(&buf, v);
472            }
473        }
474
475        assert_eq!(read, result_len);
476        assert_eq!(is_fin, fin);
477    }
478
479    // Helper function for testing buffer status for either emit or discard.
480    fn assert_emit_discard_done(recv: &mut RecvBuf, emit: bool) {
481        let mut buf = [0u8; 32];
482        let action = if emit {
483            RecvAction::Emit {
484                out: &mut buf.as_mut_slice(),
485            }
486        } else {
487            RecvAction::Discard { len: 32 }
488        };
489        assert_eq!(recv.emit_or_discard(action), Err(Error::Done));
490    }
491
492    #[rstest]
493    fn empty_read(#[values(true, false)] emit: bool) {
494        let mut recv =
495            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
496        assert_eq!(recv.len, 0);
497
498        assert_emit_discard_done(&mut recv, emit);
499    }
500
501    #[rstest]
502    fn empty_stream_frame(#[values(true, false)] emit: bool) {
503        let mut recv =
504            RecvBuf::new(15, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
505        assert_eq!(recv.len, 0);
506
507        let buf = RangeBuf::from(b"hello", 0, false);
508        assert!(recv.write(buf).is_ok());
509        assert_eq!(recv.len, 5);
510        assert_eq!(recv.off, 0);
511        assert_eq!(recv.data.len(), 1);
512
513        assert_emit_discard(&mut recv, emit, 32, 5, false, None);
514
515        // Don't store non-fin empty buffer.
516        let buf = RangeBuf::from(b"", 10, false);
517        assert!(recv.write(buf).is_ok());
518        assert_eq!(recv.len, 5);
519        assert_eq!(recv.off, 5);
520        assert_eq!(recv.data.len(), 0);
521
522        // Check flow control for empty buffer.
523        let buf = RangeBuf::from(b"", 16, false);
524        assert_eq!(recv.write(buf), Err(Error::FlowControl));
525
526        // Store fin empty buffer.
527        let buf = RangeBuf::from(b"", 5, true);
528        assert!(recv.write(buf).is_ok());
529        assert_eq!(recv.len, 5);
530        assert_eq!(recv.off, 5);
531        assert_eq!(recv.data.len(), 1);
532
533        // Don't store additional fin empty buffers.
534        let buf = RangeBuf::from(b"", 5, true);
535        assert!(recv.write(buf).is_ok());
536        assert_eq!(recv.len, 5);
537        assert_eq!(recv.off, 5);
538        assert_eq!(recv.data.len(), 1);
539
540        // Don't store additional fin non-empty buffers.
541        let buf = RangeBuf::from(b"aa", 3, true);
542        assert!(recv.write(buf).is_ok());
543        assert_eq!(recv.len, 5);
544        assert_eq!(recv.off, 5);
545        assert_eq!(recv.data.len(), 1);
546
547        // Validate final size with fin empty buffers.
548        let buf = RangeBuf::from(b"", 6, true);
549        assert_eq!(recv.write(buf), Err(Error::FinalSize));
550        let buf = RangeBuf::from(b"", 4, true);
551        assert_eq!(recv.write(buf), Err(Error::FinalSize));
552
553        assert_emit_discard(&mut recv, emit, 32, 0, true, None);
554    }
555
556    #[rstest]
557    fn ordered_read(#[values(true, false)] emit: bool) {
558        let mut recv =
559            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
560        assert_eq!(recv.len, 0);
561
562        let first = RangeBuf::from(b"hello", 0, false);
563        let second = RangeBuf::from(b"world", 5, false);
564        let third = RangeBuf::from(b"something", 10, true);
565
566        assert!(recv.write(second).is_ok());
567        assert_eq!(recv.len, 10);
568        assert_eq!(recv.off, 0);
569
570        assert_emit_discard_done(&mut recv, emit);
571
572        assert!(recv.write(third).is_ok());
573        assert_eq!(recv.len, 19);
574        assert_eq!(recv.off, 0);
575
576        assert_emit_discard_done(&mut recv, emit);
577
578        assert!(recv.write(first).is_ok());
579        assert_eq!(recv.len, 19);
580        assert_eq!(recv.off, 0);
581
582        assert_emit_discard(
583            &mut recv,
584            emit,
585            32,
586            19,
587            true,
588            Some(b"helloworldsomething"),
589        );
590        assert_eq!(recv.len, 19);
591        assert_eq!(recv.off, 19);
592
593        assert_emit_discard_done(&mut recv, emit);
594    }
595
596    /// Test shutdown behavior
597    #[rstest]
598    fn shutdown(#[values(true, false)] emit: bool) {
599        let mut recv =
600            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
601        assert_eq!(recv.len, 0);
602
603        let first = RangeBuf::from(b"hello", 0, false);
604        let second = RangeBuf::from(b"world", 5, false);
605        let third = RangeBuf::from(b"something", 10, false);
606
607        assert!(recv.write(second).is_ok());
608        assert_eq!(recv.len, 10);
609        assert_eq!(recv.off, 0);
610
611        assert_emit_discard_done(&mut recv, emit);
612
613        // shutdown the buffer. Buffer is dropped.
614        assert_eq!(recv.shutdown(), Ok(10));
615        assert_eq!(recv.len, 10);
616        assert_eq!(recv.off, 10);
617        assert_eq!(recv.data.len(), 0);
618
619        assert_emit_discard_done(&mut recv, emit);
620
621        // subsequent writes are validated but not added to the buffer
622        assert!(recv.write(first).is_ok());
623        assert_eq!(recv.len, 10);
624        assert_eq!(recv.off, 10);
625        assert_eq!(recv.data.len(), 0);
626
627        // the max offset of received data can increase and
628        // the recv.off must increase with it
629        assert!(recv.write(third).is_ok());
630        assert_eq!(recv.len, 19);
631        assert_eq!(recv.off, 19);
632        assert_eq!(recv.data.len(), 0);
633
634        // Send a reset
635        assert_emit_discard_done(&mut recv, emit);
636        assert_eq!(
637            recv.reset(42, 123),
638            Ok(RecvBufResetReturn {
639                max_data_delta: 104,
640                consumed_flowcontrol: 104,
641            })
642        );
643        assert_eq!(recv.len, 123);
644        assert_eq!(recv.off, 123);
645        assert_eq!(recv.data.len(), 0);
646
647        assert_emit_discard_done(&mut recv, emit);
648    }
649
650    #[rstest]
651    fn split_read(#[values(true, false)] emit: bool) {
652        let mut recv =
653            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
654        assert_eq!(recv.len, 0);
655
656        let first = RangeBuf::from(b"something", 0, false);
657        let second = RangeBuf::from(b"helloworld", 9, true);
658
659        assert!(recv.write(first).is_ok());
660        assert_eq!(recv.len, 9);
661        assert_eq!(recv.off, 0);
662
663        assert!(recv.write(second).is_ok());
664        assert_eq!(recv.len, 19);
665        assert_eq!(recv.off, 0);
666
667        assert_emit_discard(&mut recv, emit, 10, 10, false, Some(b"somethingh"));
668        assert_eq!(recv.len, 19);
669        assert_eq!(recv.off, 10);
670
671        assert_emit_discard(&mut recv, emit, 5, 5, false, Some(b"ellow"));
672        assert_eq!(recv.len, 19);
673        assert_eq!(recv.off, 15);
674
675        assert_emit_discard(&mut recv, emit, 5, 4, true, Some(b"orld"));
676        assert_eq!(recv.len, 19);
677        assert_eq!(recv.off, 19);
678    }
679
680    #[test]
681    fn split_read_incremental_buf() {
682        let mut recv =
683            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
684        assert_eq!(recv.len, 0);
685
686        let first = RangeBuf::from(b"something", 0, false);
687        let second = RangeBuf::from(b"helloworld", 9, true);
688
689        assert!(recv.write(first).is_ok());
690        assert_eq!(recv.len, 9);
691        assert_eq!(recv.off, 0);
692
693        assert!(recv.write(second).is_ok());
694        assert_eq!(recv.len, 19);
695        assert_eq!(recv.off, 0);
696
697        let mut buf = Vec::new().limit(10);
698        assert_eq!(
699            recv.emit_or_discard(RecvAction::Emit { out: &mut buf }),
700            Ok((10, false))
701        );
702        assert_eq!(recv.len, 19);
703        assert_eq!(recv.off, 10);
704        assert_eq!(buf.get_ref().len(), 10);
705        assert_eq!(buf.get_ref().as_slice(), b"somethingh");
706
707        buf.set_limit(5);
708        assert_eq!(
709            recv.emit_or_discard(RecvAction::Emit { out: &mut buf }),
710            Ok((5, false))
711        );
712        assert_eq!(recv.len, 19);
713        assert_eq!(recv.off, 15);
714        assert_eq!(buf.get_ref().len(), 15);
715        assert_eq!(buf.get_ref().as_slice(), b"somethinghellow");
716
717        buf.set_limit(42);
718        assert_eq!(
719            recv.emit_or_discard(RecvAction::Emit { out: &mut buf }),
720            Ok((4, true))
721        );
722        assert_eq!(recv.len, 19);
723        assert_eq!(recv.off, 19);
724        assert_eq!(buf.get_ref().len(), 19);
725        assert_eq!(buf.get_ref().as_slice(), b"somethinghelloworld");
726    }
727
728    #[rstest]
729    fn incomplete_read(#[values(true, false)] emit: bool) {
730        let mut recv =
731            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
732        assert_eq!(recv.len, 0);
733
734        let mut buf = [0u8; 32];
735
736        let first = RangeBuf::from(b"something", 0, false);
737        let second = RangeBuf::from(b"helloworld", 9, true);
738
739        assert!(recv.write(second).is_ok());
740        assert_eq!(recv.len, 19);
741        assert_eq!(recv.off, 0);
742
743        let action = if emit {
744            RecvAction::Emit {
745                out: &mut buf.as_mut_slice(),
746            }
747        } else {
748            RecvAction::Discard { len: 32 }
749        };
750        assert_eq!(recv.emit_or_discard(action), Err(Error::Done));
751
752        assert!(recv.write(first).is_ok());
753        assert_eq!(recv.len, 19);
754        assert_eq!(recv.off, 0);
755
756        assert_emit_discard(
757            &mut recv,
758            emit,
759            32,
760            19,
761            true,
762            Some(b"somethinghelloworld"),
763        );
764        assert_eq!(recv.len, 19);
765        assert_eq!(recv.off, 19);
766    }
767
768    #[rstest]
769    fn zero_len_read(#[values(true, false)] emit: bool) {
770        let mut recv =
771            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
772        assert_eq!(recv.len, 0);
773
774        let first = RangeBuf::from(b"something", 0, false);
775        let second = RangeBuf::from(b"", 9, true);
776
777        assert!(recv.write(first).is_ok());
778        assert_eq!(recv.len, 9);
779        assert_eq!(recv.off, 0);
780        assert_eq!(recv.data.len(), 1);
781
782        assert!(recv.write(second).is_ok());
783        assert_eq!(recv.len, 9);
784        assert_eq!(recv.off, 0);
785        assert_eq!(recv.data.len(), 1);
786
787        assert_emit_discard(&mut recv, emit, 32, 9, true, Some(b"something"));
788        assert_eq!(recv.len, 9);
789        assert_eq!(recv.off, 9);
790    }
791
792    #[rstest]
793    fn past_read(#[values(true, false)] emit: bool) {
794        let mut recv =
795            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
796        assert_eq!(recv.len, 0);
797
798        let first = RangeBuf::from(b"something", 0, false);
799        let second = RangeBuf::from(b"hello", 3, false);
800        let third = RangeBuf::from(b"ello", 4, true);
801        let fourth = RangeBuf::from(b"ello", 5, true);
802
803        assert!(recv.write(first).is_ok());
804        assert_eq!(recv.len, 9);
805        assert_eq!(recv.off, 0);
806        assert_eq!(recv.data.len(), 1);
807
808        assert_emit_discard(&mut recv, emit, 32, 9, false, Some(b"something"));
809        assert_eq!(recv.len, 9);
810        assert_eq!(recv.off, 9);
811
812        assert!(recv.write(second).is_ok());
813        assert_eq!(recv.len, 9);
814        assert_eq!(recv.off, 9);
815        assert_eq!(recv.data.len(), 0);
816
817        assert_eq!(recv.write(third), Err(Error::FinalSize));
818
819        assert!(recv.write(fourth).is_ok());
820        assert_eq!(recv.len, 9);
821        assert_eq!(recv.off, 9);
822        assert_eq!(recv.data.len(), 0);
823
824        assert_emit_discard_done(&mut recv, emit);
825    }
826
827    #[rstest]
828    fn fully_overlapping_read(#[values(true, false)] emit: bool) {
829        let mut recv =
830            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
831        assert_eq!(recv.len, 0);
832
833        let first = RangeBuf::from(b"something", 0, false);
834        let second = RangeBuf::from(b"hello", 4, false);
835
836        assert!(recv.write(first).is_ok());
837        assert_eq!(recv.len, 9);
838        assert_eq!(recv.off, 0);
839        assert_eq!(recv.data.len(), 1);
840
841        assert!(recv.write(second).is_ok());
842        assert_eq!(recv.len, 9);
843        assert_eq!(recv.off, 0);
844        assert_eq!(recv.data.len(), 1);
845
846        assert_emit_discard(&mut recv, emit, 32, 9, false, Some(b"something"));
847        assert_eq!(recv.len, 9);
848        assert_eq!(recv.off, 9);
849        assert_eq!(recv.data.len(), 0);
850
851        assert_emit_discard_done(&mut recv, emit);
852    }
853
854    #[rstest]
855    fn fully_overlapping_read2(#[values(true, false)] emit: bool) {
856        let mut recv =
857            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
858        assert_eq!(recv.len, 0);
859
860        let first = RangeBuf::from(b"something", 0, false);
861        let second = RangeBuf::from(b"hello", 4, false);
862
863        assert!(recv.write(second).is_ok());
864        assert_eq!(recv.len, 9);
865        assert_eq!(recv.off, 0);
866        assert_eq!(recv.data.len(), 1);
867
868        assert!(recv.write(first).is_ok());
869        assert_eq!(recv.len, 9);
870        assert_eq!(recv.off, 0);
871        assert_eq!(recv.data.len(), 2);
872
873        assert_emit_discard(&mut recv, emit, 32, 9, false, Some(b"somehello"));
874        assert_eq!(recv.len, 9);
875        assert_eq!(recv.off, 9);
876        assert_eq!(recv.data.len(), 0);
877
878        assert_emit_discard_done(&mut recv, emit);
879    }
880
881    #[rstest]
882    fn fully_overlapping_read3(#[values(true, false)] emit: bool) {
883        let mut recv =
884            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
885        assert_eq!(recv.len, 0);
886
887        let first = RangeBuf::from(b"something", 0, false);
888        let second = RangeBuf::from(b"hello", 3, false);
889
890        assert!(recv.write(second).is_ok());
891        assert_eq!(recv.len, 8);
892        assert_eq!(recv.off, 0);
893        assert_eq!(recv.data.len(), 1);
894
895        assert!(recv.write(first).is_ok());
896        assert_eq!(recv.len, 9);
897        assert_eq!(recv.off, 0);
898        assert_eq!(recv.data.len(), 3);
899
900        assert_emit_discard(&mut recv, emit, 32, 9, false, Some(b"somhellog"));
901        assert_eq!(recv.len, 9);
902        assert_eq!(recv.off, 9);
903        assert_eq!(recv.data.len(), 0);
904
905        assert_emit_discard_done(&mut recv, emit);
906    }
907
908    #[rstest]
909    fn fully_overlapping_read_multi(#[values(true, false)] emit: bool) {
910        let mut recv =
911            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
912        assert_eq!(recv.len, 0);
913
914        let first = RangeBuf::from(b"somethingsomething", 0, false);
915        let second = RangeBuf::from(b"hello", 3, false);
916        let third = RangeBuf::from(b"hello", 12, false);
917
918        assert!(recv.write(second).is_ok());
919        assert_eq!(recv.len, 8);
920        assert_eq!(recv.off, 0);
921        assert_eq!(recv.data.len(), 1);
922
923        assert!(recv.write(third).is_ok());
924        assert_eq!(recv.len, 17);
925        assert_eq!(recv.off, 0);
926        assert_eq!(recv.data.len(), 2);
927
928        assert!(recv.write(first).is_ok());
929        assert_eq!(recv.len, 18);
930        assert_eq!(recv.off, 0);
931        assert_eq!(recv.data.len(), 5);
932
933        assert_emit_discard(
934            &mut recv,
935            emit,
936            32,
937            18,
938            false,
939            Some(b"somhellogsomhellog"),
940        );
941        assert_eq!(recv.len, 18);
942        assert_eq!(recv.off, 18);
943        assert_eq!(recv.data.len(), 0);
944
945        assert_emit_discard_done(&mut recv, emit);
946    }
947
948    #[rstest]
949    fn overlapping_start_read(#[values(true, false)] emit: bool) {
950        let mut recv =
951            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
952        assert_eq!(recv.len, 0);
953
954        let first = RangeBuf::from(b"something", 0, false);
955        let second = RangeBuf::from(b"hello", 8, true);
956
957        assert!(recv.write(first).is_ok());
958        assert_eq!(recv.len, 9);
959        assert_eq!(recv.off, 0);
960        assert_eq!(recv.data.len(), 1);
961
962        assert!(recv.write(second).is_ok());
963        assert_eq!(recv.len, 13);
964        assert_eq!(recv.off, 0);
965        assert_eq!(recv.data.len(), 2);
966
967        assert_emit_discard(
968            &mut recv,
969            emit,
970            32,
971            13,
972            true,
973            Some(b"somethingello"),
974        );
975
976        assert_eq!(recv.len, 13);
977        assert_eq!(recv.off, 13);
978
979        assert_emit_discard_done(&mut recv, emit);
980    }
981
982    #[rstest]
983    fn overlapping_end_read(#[values(true, false)] emit: bool) {
984        let mut recv =
985            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
986        assert_eq!(recv.len, 0);
987
988        let first = RangeBuf::from(b"hello", 0, false);
989        let second = RangeBuf::from(b"something", 3, true);
990
991        assert!(recv.write(second).is_ok());
992        assert_eq!(recv.len, 12);
993        assert_eq!(recv.off, 0);
994        assert_eq!(recv.data.len(), 1);
995
996        assert!(recv.write(first).is_ok());
997        assert_eq!(recv.len, 12);
998        assert_eq!(recv.off, 0);
999        assert_eq!(recv.data.len(), 2);
1000
1001        assert_emit_discard(&mut recv, emit, 32, 12, true, Some(b"helsomething"));
1002        assert_eq!(recv.len, 12);
1003        assert_eq!(recv.off, 12);
1004
1005        assert_emit_discard_done(&mut recv, emit);
1006    }
1007
1008    #[rstest]
1009    fn overlapping_end_twice_read(#[values(true, false)] emit: bool) {
1010        let mut recv =
1011            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
1012        assert_eq!(recv.len, 0);
1013
1014        let first = RangeBuf::from(b"he", 0, false);
1015        let second = RangeBuf::from(b"ow", 4, false);
1016        let third = RangeBuf::from(b"rl", 7, false);
1017        let fourth = RangeBuf::from(b"helloworld", 0, true);
1018
1019        assert!(recv.write(third).is_ok());
1020        assert_eq!(recv.len, 9);
1021        assert_eq!(recv.off, 0);
1022        assert_eq!(recv.data.len(), 1);
1023
1024        assert!(recv.write(second).is_ok());
1025        assert_eq!(recv.len, 9);
1026        assert_eq!(recv.off, 0);
1027        assert_eq!(recv.data.len(), 2);
1028
1029        assert!(recv.write(first).is_ok());
1030        assert_eq!(recv.len, 9);
1031        assert_eq!(recv.off, 0);
1032        assert_eq!(recv.data.len(), 3);
1033
1034        assert!(recv.write(fourth).is_ok());
1035        assert_eq!(recv.len, 10);
1036        assert_eq!(recv.off, 0);
1037        assert_eq!(recv.data.len(), 6);
1038
1039        assert_emit_discard(&mut recv, emit, 32, 10, true, Some(b"helloworld"));
1040        assert_eq!(recv.len, 10);
1041        assert_eq!(recv.off, 10);
1042
1043        assert_emit_discard_done(&mut recv, emit);
1044    }
1045
1046    #[rstest]
1047    fn overlapping_end_twice_and_contained_read(
1048        #[values(true, false)] emit: bool,
1049    ) {
1050        let mut recv =
1051            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
1052        assert_eq!(recv.len, 0);
1053
1054        let first = RangeBuf::from(b"hellow", 0, false);
1055        let second = RangeBuf::from(b"barfoo", 10, true);
1056        let third = RangeBuf::from(b"rl", 7, false);
1057        let fourth = RangeBuf::from(b"elloworldbarfoo", 1, true);
1058
1059        assert!(recv.write(third).is_ok());
1060        assert_eq!(recv.len, 9);
1061        assert_eq!(recv.off, 0);
1062        assert_eq!(recv.data.len(), 1);
1063
1064        assert!(recv.write(second).is_ok());
1065        assert_eq!(recv.len, 16);
1066        assert_eq!(recv.off, 0);
1067        assert_eq!(recv.data.len(), 2);
1068
1069        assert!(recv.write(first).is_ok());
1070        assert_eq!(recv.len, 16);
1071        assert_eq!(recv.off, 0);
1072        assert_eq!(recv.data.len(), 3);
1073
1074        assert!(recv.write(fourth).is_ok());
1075        assert_eq!(recv.len, 16);
1076        assert_eq!(recv.off, 0);
1077        assert_eq!(recv.data.len(), 5);
1078
1079        assert_emit_discard(
1080            &mut recv,
1081            emit,
1082            32,
1083            16,
1084            true,
1085            Some(b"helloworldbarfoo"),
1086        );
1087        assert_eq!(recv.len, 16);
1088        assert_eq!(recv.off, 16);
1089
1090        assert_emit_discard_done(&mut recv, emit);
1091    }
1092
1093    #[rstest]
1094    fn partially_multi_overlapping_reordered_read(
1095        #[values(true, false)] emit: bool,
1096    ) {
1097        let mut recv =
1098            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
1099        assert_eq!(recv.len, 0);
1100
1101        let first = RangeBuf::from(b"hello", 8, false);
1102        let second = RangeBuf::from(b"something", 0, false);
1103        let third = RangeBuf::from(b"moar", 11, true);
1104
1105        assert!(recv.write(first).is_ok());
1106        assert_eq!(recv.len, 13);
1107        assert_eq!(recv.off, 0);
1108        assert_eq!(recv.data.len(), 1);
1109
1110        assert!(recv.write(second).is_ok());
1111        assert_eq!(recv.len, 13);
1112        assert_eq!(recv.off, 0);
1113        assert_eq!(recv.data.len(), 2);
1114
1115        assert!(recv.write(third).is_ok());
1116        assert_eq!(recv.len, 15);
1117        assert_eq!(recv.off, 0);
1118        assert_eq!(recv.data.len(), 3);
1119
1120        assert_emit_discard(
1121            &mut recv,
1122            emit,
1123            32,
1124            15,
1125            true,
1126            Some(b"somethinhelloar"),
1127        );
1128        assert_eq!(recv.len, 15);
1129        assert_eq!(recv.off, 15);
1130        assert_eq!(recv.data.len(), 0);
1131
1132        assert_emit_discard_done(&mut recv, emit);
1133    }
1134
1135    #[rstest]
1136    fn partially_multi_overlapping_reordered_read2(
1137        #[values(true, false)] emit: bool,
1138    ) {
1139        let mut recv =
1140            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
1141        assert_eq!(recv.len, 0);
1142
1143        let first = RangeBuf::from(b"aaa", 0, false);
1144        let second = RangeBuf::from(b"bbb", 2, false);
1145        let third = RangeBuf::from(b"ccc", 4, false);
1146        let fourth = RangeBuf::from(b"ddd", 6, false);
1147        let fifth = RangeBuf::from(b"eee", 9, false);
1148        let sixth = RangeBuf::from(b"fff", 11, false);
1149
1150        assert!(recv.write(second).is_ok());
1151        assert_eq!(recv.len, 5);
1152        assert_eq!(recv.off, 0);
1153        assert_eq!(recv.data.len(), 1);
1154
1155        assert!(recv.write(fourth).is_ok());
1156        assert_eq!(recv.len, 9);
1157        assert_eq!(recv.off, 0);
1158        assert_eq!(recv.data.len(), 2);
1159
1160        assert!(recv.write(third).is_ok());
1161        assert_eq!(recv.len, 9);
1162        assert_eq!(recv.off, 0);
1163        assert_eq!(recv.data.len(), 3);
1164
1165        assert!(recv.write(first).is_ok());
1166        assert_eq!(recv.len, 9);
1167        assert_eq!(recv.off, 0);
1168        assert_eq!(recv.data.len(), 4);
1169
1170        assert!(recv.write(sixth).is_ok());
1171        assert_eq!(recv.len, 14);
1172        assert_eq!(recv.off, 0);
1173        assert_eq!(recv.data.len(), 5);
1174
1175        assert!(recv.write(fifth).is_ok());
1176        assert_eq!(recv.len, 14);
1177        assert_eq!(recv.off, 0);
1178        assert_eq!(recv.data.len(), 6);
1179
1180        assert_emit_discard(
1181            &mut recv,
1182            emit,
1183            32,
1184            14,
1185            false,
1186            Some(b"aabbbcdddeefff"),
1187        );
1188        assert_eq!(recv.len, 14);
1189        assert_eq!(recv.off, 14);
1190        assert_eq!(recv.data.len(), 0);
1191
1192        assert_emit_discard_done(&mut recv, emit);
1193    }
1194
1195    #[test]
1196    fn mixed_read_actions() {
1197        let mut recv =
1198            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
1199        assert_eq!(recv.len, 0);
1200
1201        let first = RangeBuf::from(b"hello", 0, false);
1202        let second = RangeBuf::from(b"world", 5, false);
1203        let third = RangeBuf::from(b"something", 10, true);
1204
1205        assert!(recv.write(second).is_ok());
1206        assert_eq!(recv.len, 10);
1207        assert_eq!(recv.off, 0);
1208
1209        assert_emit_discard_done(&mut recv, true);
1210        assert_emit_discard_done(&mut recv, false);
1211
1212        assert!(recv.write(third).is_ok());
1213        assert_eq!(recv.len, 19);
1214        assert_eq!(recv.off, 0);
1215
1216        assert_emit_discard_done(&mut recv, true);
1217        assert_emit_discard_done(&mut recv, false);
1218
1219        assert!(recv.write(first).is_ok());
1220        assert_eq!(recv.len, 19);
1221        assert_eq!(recv.off, 0);
1222
1223        assert_emit_discard(&mut recv, true, 5, 5, false, Some(b"hello"));
1224        assert_eq!(recv.len, 19);
1225        assert_eq!(recv.off, 5);
1226
1227        assert_emit_discard(&mut recv, false, 5, 5, false, None);
1228        assert_eq!(recv.len, 19);
1229        assert_eq!(recv.off, 10);
1230
1231        assert_emit_discard(&mut recv, true, 9, 9, true, Some(b"something"));
1232        assert_eq!(recv.len, 19);
1233        assert_eq!(recv.off, 19);
1234
1235        assert_emit_discard_done(&mut recv, true);
1236        assert_emit_discard_done(&mut recv, false);
1237    }
1238}