1use std::cmp;
28
29use std::collections::BTreeMap;
30use std::collections::VecDeque;
31
32use std::time::Duration;
33use std::time::Instant;
34
35use crate::stream::RecvAction;
36use crate::stream::RecvBufResetReturn;
37use crate::Error;
38use crate::Result;
39
40use crate::flowcontrol;
41
42use crate::range_buf::RangeBuf;
43
44#[derive(Debug, Default)]
50pub struct RecvBuf {
51 data: BTreeMap<u64, RangeBuf>,
54
55 off: u64,
57
58 len: u64,
60
61 flow_control: flowcontrol::FlowControl,
63
64 fin_off: Option<u64>,
66
67 error: Option<u64>,
69
70 drain: bool,
72}
73
74impl RecvBuf {
75 pub fn new(max_data: u64, initial_window: u64, max_window: u64) -> RecvBuf {
77 RecvBuf {
78 flow_control: flowcontrol::FlowControl::new(
79 max_data,
80 initial_window,
81 max_window,
82 ),
83 ..RecvBuf::default()
84 }
85 }
86
87 pub fn write(&mut self, buf: RangeBuf) -> Result<()> {
93 if buf.max_off() > self.max_data() {
94 return Err(Error::FlowControl);
95 }
96
97 if let Some(fin_off) = self.fin_off {
98 if buf.max_off() > fin_off {
100 return Err(Error::FinalSize);
101 }
102
103 if buf.fin() && fin_off != buf.max_off() {
105 return Err(Error::FinalSize);
106 }
107 }
108
109 if buf.fin() && buf.max_off() < self.len {
111 return Err(Error::FinalSize);
112 }
113
114 if self.fin_off.is_some() && buf.is_empty() {
117 return Ok(());
118 }
119
120 if buf.fin() {
121 self.fin_off = Some(buf.max_off());
122 }
123
124 if !buf.fin() && buf.is_empty() {
126 return Ok(());
127 }
128
129 if self.off >= buf.max_off() {
132 if !buf.is_empty() {
138 return Ok(());
139 }
140 }
141
142 let mut tmp_bufs = VecDeque::with_capacity(2);
143 tmp_bufs.push_back(buf);
144
145 'tmp: while let Some(mut buf) = tmp_bufs.pop_front() {
146 if self.off_front() > buf.off() {
152 buf = buf.split_off((self.off_front() - buf.off()) as usize);
153 }
154
155 if buf.off() < self.max_off() || buf.is_empty() {
162 for (_, b) in self.data.range(buf.off()..) {
163 let off = buf.off();
164
165 if b.off() > buf.max_off() {
167 break;
168 }
169
170 if off >= b.off() && buf.max_off() <= b.max_off() {
172 continue 'tmp;
173 }
174
175 if off >= b.off() && off < b.max_off() {
177 buf = buf.split_off((b.max_off() - off) as usize);
178 }
179
180 if off < b.off() && buf.max_off() > b.off() {
182 tmp_bufs
183 .push_back(buf.split_off((b.off() - off) as usize));
184 }
185 }
186 }
187
188 self.len = cmp::max(self.len, buf.max_off());
189
190 if !self.drain {
191 self.data.insert(buf.max_off(), buf);
192 } else {
193 self.off = self.len;
195 }
196 }
197
198 Ok(())
199 }
200
201 #[inline]
212 pub fn emit(&mut self, mut out: &mut [u8]) -> Result<(usize, bool)> {
213 self.emit_or_discard(RecvAction::Emit { out: &mut out })
214 }
215
216 pub fn emit_or_discard<B: bytes::BufMut>(
231 &mut self, mut action: RecvAction<B>,
232 ) -> Result<(usize, bool)> {
233 let mut len = 0;
234 let mut cap = match &action {
235 RecvAction::Emit { out } => out.remaining_mut(),
236 RecvAction::Discard { len } => *len,
237 };
238
239 if !self.ready() {
240 return Err(Error::Done);
241 }
242
243 if let Some(e) = self.error {
246 self.data.clear();
247 return Err(Error::StreamReset(e));
248 }
249
250 while cap > 0 && self.ready() {
251 let mut entry = match self.data.first_entry() {
252 Some(entry) => entry,
253 None => break,
254 };
255
256 let buf = entry.get_mut();
257
258 let buf_len = cmp::min(buf.len(), cap);
259
260 if let RecvAction::Emit { ref mut out } = action {
262 debug_assert!(
267 cap <= out.remaining_mut(),
268 "We updated `cap` incorrectly"
269 );
270 out.put_slice(&buf[..buf_len])
271 }
272
273 self.off += buf_len as u64;
274
275 len += buf_len;
276 cap -= buf_len;
277
278 if buf_len < buf.len() {
279 buf.consume(buf_len);
280
281 break;
283 }
284
285 entry.remove();
286 }
287
288 self.flow_control.add_consumed(len as u64);
290
291 Ok((len, self.is_fin()))
292 }
293
294 pub fn reset(
296 &mut self, error_code: u64, final_size: u64,
297 ) -> Result<RecvBufResetReturn> {
298 if let Some(fin_off) = self.fin_off {
300 if fin_off != final_size {
301 return Err(Error::FinalSize);
302 }
303 }
304
305 if final_size < self.len {
307 return Err(Error::FinalSize);
308 }
309
310 if self.error.is_some() {
311 return Ok(RecvBufResetReturn::zero());
313 }
314
315 let result = RecvBufResetReturn {
318 max_data_delta: final_size - self.len,
319 consumed_flowcontrol: final_size - self.off,
320 };
321
322 self.error = Some(error_code);
323
324 self.off = final_size;
326
327 self.data.clear();
328
329 let buf = RangeBuf::from(b"", final_size, true);
332 self.write(buf)?;
333
334 Ok(result)
335 }
336
337 pub fn update_max_data(&mut self, now: Instant) {
339 self.flow_control.update_max_data(now);
340 }
341
342 pub fn max_data_next(&mut self) -> u64 {
344 self.flow_control.max_data_next()
345 }
346
347 pub fn max_data(&self) -> u64 {
349 self.flow_control.max_data()
350 }
351
352 pub fn window(&self) -> u64 {
354 self.flow_control.window()
355 }
356
357 pub fn autotune_window(&mut self, now: Instant, rtt: Duration) {
359 self.flow_control.autotune_window(now, rtt);
360 }
361
362 pub fn shutdown(&mut self) -> Result<u64> {
366 if self.drain {
367 return Err(Error::Done);
368 }
369
370 self.drain = true;
371
372 self.data.clear();
373
374 let consumed = self.max_off() - self.off;
375 self.off = self.max_off();
376
377 Ok(consumed)
378 }
379
380 pub fn off_front(&self) -> u64 {
382 self.off
383 }
384
385 pub fn almost_full(&self) -> bool {
387 self.fin_off.is_none() && self.flow_control.should_update_max_data()
388 }
389
390 pub fn max_off(&self) -> u64 {
392 self.len
393 }
394
395 pub fn is_fin(&self) -> bool {
400 if self.fin_off == Some(self.off) {
401 return true;
402 }
403
404 false
405 }
406
407 pub fn is_draining(&self) -> bool {
409 self.drain
410 }
411
412 pub fn ready(&self) -> bool {
414 let (_, buf) = match self.data.first_key_value() {
415 Some(v) => v,
416 None => return false,
417 };
418
419 buf.off() == self.off
420 }
421
422 #[cfg(test)]
423 pub(crate) fn flow_control_for_tests(&self) -> &flowcontrol::FlowControl {
424 &self.flow_control
425 }
426}
427
428#[cfg(test)]
429mod tests {
430 use super::*;
431 use crate::stream::DEFAULT_STREAM_WINDOW;
432 use bytes::BufMut as _;
433 use rstest::rstest;
434
435 fn assert_emit_discard(
453 recv: &mut RecvBuf, emit: bool, target_len: usize, result_len: usize,
454 is_fin: bool, test_bytes: Option<&[u8]>,
455 ) {
456 let mut buf = Vec::<u8>::with_capacity(512).limit(target_len);
457 let action = if emit {
458 RecvAction::Emit { out: &mut buf }
459 } else {
460 RecvAction::Discard { len: target_len }
461 };
462
463 let (read, fin) = recv.emit_or_discard(action).unwrap();
464
465 let buf = buf.into_inner();
466 if emit {
467 assert_eq!(buf.len(), read);
468 if let Some(v) = test_bytes {
469 assert_eq!(&buf, v);
470 }
471 }
472
473 assert_eq!(read, result_len);
474 assert_eq!(is_fin, fin);
475 }
476
477 fn assert_emit_discard_done(recv: &mut RecvBuf, emit: bool) {
479 let mut buf = [0u8; 32];
480 let action = if emit {
481 RecvAction::Emit {
482 out: &mut buf.as_mut_slice(),
483 }
484 } else {
485 RecvAction::Discard { len: 32 }
486 };
487 assert_eq!(recv.emit_or_discard(action), Err(Error::Done));
488 }
489
490 #[rstest]
491 fn empty_read(#[values(true, false)] emit: bool) {
492 let mut recv =
493 RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
494 assert_eq!(recv.len, 0);
495
496 assert_emit_discard_done(&mut recv, emit);
497 }
498
499 #[rstest]
500 fn empty_stream_frame(#[values(true, false)] emit: bool) {
501 let mut recv =
502 RecvBuf::new(15, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
503 assert_eq!(recv.len, 0);
504
505 let buf = RangeBuf::from(b"hello", 0, false);
506 assert!(recv.write(buf).is_ok());
507 assert_eq!(recv.len, 5);
508 assert_eq!(recv.off, 0);
509 assert_eq!(recv.data.len(), 1);
510
511 assert_emit_discard(&mut recv, emit, 32, 5, false, None);
512
513 let buf = RangeBuf::from(b"", 10, false);
515 assert!(recv.write(buf).is_ok());
516 assert_eq!(recv.len, 5);
517 assert_eq!(recv.off, 5);
518 assert_eq!(recv.data.len(), 0);
519
520 let buf = RangeBuf::from(b"", 16, false);
522 assert_eq!(recv.write(buf), Err(Error::FlowControl));
523
524 let buf = RangeBuf::from(b"", 5, true);
526 assert!(recv.write(buf).is_ok());
527 assert_eq!(recv.len, 5);
528 assert_eq!(recv.off, 5);
529 assert_eq!(recv.data.len(), 1);
530
531 let buf = RangeBuf::from(b"", 5, true);
533 assert!(recv.write(buf).is_ok());
534 assert_eq!(recv.len, 5);
535 assert_eq!(recv.off, 5);
536 assert_eq!(recv.data.len(), 1);
537
538 let buf = RangeBuf::from(b"aa", 3, true);
540 assert!(recv.write(buf).is_ok());
541 assert_eq!(recv.len, 5);
542 assert_eq!(recv.off, 5);
543 assert_eq!(recv.data.len(), 1);
544
545 let buf = RangeBuf::from(b"", 6, true);
547 assert_eq!(recv.write(buf), Err(Error::FinalSize));
548 let buf = RangeBuf::from(b"", 4, true);
549 assert_eq!(recv.write(buf), Err(Error::FinalSize));
550
551 assert_emit_discard(&mut recv, emit, 32, 0, true, None);
552 }
553
554 #[rstest]
555 fn ordered_read(#[values(true, false)] emit: bool) {
556 let mut recv =
557 RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
558 assert_eq!(recv.len, 0);
559
560 let first = RangeBuf::from(b"hello", 0, false);
561 let second = RangeBuf::from(b"world", 5, false);
562 let third = RangeBuf::from(b"something", 10, true);
563
564 assert!(recv.write(second).is_ok());
565 assert_eq!(recv.len, 10);
566 assert_eq!(recv.off, 0);
567
568 assert_emit_discard_done(&mut recv, emit);
569
570 assert!(recv.write(third).is_ok());
571 assert_eq!(recv.len, 19);
572 assert_eq!(recv.off, 0);
573
574 assert_emit_discard_done(&mut recv, emit);
575
576 assert!(recv.write(first).is_ok());
577 assert_eq!(recv.len, 19);
578 assert_eq!(recv.off, 0);
579
580 assert_emit_discard(
581 &mut recv,
582 emit,
583 32,
584 19,
585 true,
586 Some(b"helloworldsomething"),
587 );
588 assert_eq!(recv.len, 19);
589 assert_eq!(recv.off, 19);
590
591 assert_emit_discard_done(&mut recv, emit);
592 }
593
594 #[rstest]
596 fn shutdown(#[values(true, false)] emit: bool) {
597 let mut recv =
598 RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
599 assert_eq!(recv.len, 0);
600
601 let first = RangeBuf::from(b"hello", 0, false);
602 let second = RangeBuf::from(b"world", 5, false);
603 let third = RangeBuf::from(b"something", 10, false);
604
605 assert!(recv.write(second).is_ok());
606 assert_eq!(recv.len, 10);
607 assert_eq!(recv.off, 0);
608
609 assert_emit_discard_done(&mut recv, emit);
610
611 assert_eq!(recv.shutdown(), Ok(10));
613 assert_eq!(recv.len, 10);
614 assert_eq!(recv.off, 10);
615 assert_eq!(recv.data.len(), 0);
616
617 assert_emit_discard_done(&mut recv, emit);
618
619 assert!(recv.write(first).is_ok());
621 assert_eq!(recv.len, 10);
622 assert_eq!(recv.off, 10);
623 assert_eq!(recv.data.len(), 0);
624
625 assert!(recv.write(third).is_ok());
628 assert_eq!(recv.len, 19);
629 assert_eq!(recv.off, 19);
630 assert_eq!(recv.data.len(), 0);
631
632 assert_emit_discard_done(&mut recv, emit);
634 assert_eq!(
635 recv.reset(42, 123),
636 Ok(RecvBufResetReturn {
637 max_data_delta: 104,
638 consumed_flowcontrol: 104,
639 })
640 );
641 assert_eq!(recv.len, 123);
642 assert_eq!(recv.off, 123);
643 assert_eq!(recv.data.len(), 0);
644
645 assert_emit_discard_done(&mut recv, emit);
646 }
647
648 #[rstest]
649 fn split_read(#[values(true, false)] emit: bool) {
650 let mut recv =
651 RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
652 assert_eq!(recv.len, 0);
653
654 let first = RangeBuf::from(b"something", 0, false);
655 let second = RangeBuf::from(b"helloworld", 9, true);
656
657 assert!(recv.write(first).is_ok());
658 assert_eq!(recv.len, 9);
659 assert_eq!(recv.off, 0);
660
661 assert!(recv.write(second).is_ok());
662 assert_eq!(recv.len, 19);
663 assert_eq!(recv.off, 0);
664
665 assert_emit_discard(&mut recv, emit, 10, 10, false, Some(b"somethingh"));
666 assert_eq!(recv.len, 19);
667 assert_eq!(recv.off, 10);
668
669 assert_emit_discard(&mut recv, emit, 5, 5, false, Some(b"ellow"));
670 assert_eq!(recv.len, 19);
671 assert_eq!(recv.off, 15);
672
673 assert_emit_discard(&mut recv, emit, 5, 4, true, Some(b"orld"));
674 assert_eq!(recv.len, 19);
675 assert_eq!(recv.off, 19);
676 }
677
678 #[test]
679 fn split_read_incremental_buf() {
680 let mut recv =
681 RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
682 assert_eq!(recv.len, 0);
683
684 let first = RangeBuf::from(b"something", 0, false);
685 let second = RangeBuf::from(b"helloworld", 9, true);
686
687 assert!(recv.write(first).is_ok());
688 assert_eq!(recv.len, 9);
689 assert_eq!(recv.off, 0);
690
691 assert!(recv.write(second).is_ok());
692 assert_eq!(recv.len, 19);
693 assert_eq!(recv.off, 0);
694
695 let mut buf = Vec::new().limit(10);
696 assert_eq!(
697 recv.emit_or_discard(RecvAction::Emit { out: &mut buf }),
698 Ok((10, false))
699 );
700 assert_eq!(recv.len, 19);
701 assert_eq!(recv.off, 10);
702 assert_eq!(buf.get_ref().len(), 10);
703 assert_eq!(buf.get_ref().as_slice(), b"somethingh");
704
705 buf.set_limit(5);
706 assert_eq!(
707 recv.emit_or_discard(RecvAction::Emit { out: &mut buf }),
708 Ok((5, false))
709 );
710 assert_eq!(recv.len, 19);
711 assert_eq!(recv.off, 15);
712 assert_eq!(buf.get_ref().len(), 15);
713 assert_eq!(buf.get_ref().as_slice(), b"somethinghellow");
714
715 buf.set_limit(42);
716 assert_eq!(
717 recv.emit_or_discard(RecvAction::Emit { out: &mut buf }),
718 Ok((4, true))
719 );
720 assert_eq!(recv.len, 19);
721 assert_eq!(recv.off, 19);
722 assert_eq!(buf.get_ref().len(), 19);
723 assert_eq!(buf.get_ref().as_slice(), b"somethinghelloworld");
724 }
725
726 #[rstest]
727 fn incomplete_read(#[values(true, false)] emit: bool) {
728 let mut recv =
729 RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
730 assert_eq!(recv.len, 0);
731
732 let mut buf = [0u8; 32];
733
734 let first = RangeBuf::from(b"something", 0, false);
735 let second = RangeBuf::from(b"helloworld", 9, true);
736
737 assert!(recv.write(second).is_ok());
738 assert_eq!(recv.len, 19);
739 assert_eq!(recv.off, 0);
740
741 let action = if emit {
742 RecvAction::Emit {
743 out: &mut buf.as_mut_slice(),
744 }
745 } else {
746 RecvAction::Discard { len: 32 }
747 };
748 assert_eq!(recv.emit_or_discard(action), Err(Error::Done));
749
750 assert!(recv.write(first).is_ok());
751 assert_eq!(recv.len, 19);
752 assert_eq!(recv.off, 0);
753
754 assert_emit_discard(
755 &mut recv,
756 emit,
757 32,
758 19,
759 true,
760 Some(b"somethinghelloworld"),
761 );
762 assert_eq!(recv.len, 19);
763 assert_eq!(recv.off, 19);
764 }
765
766 #[rstest]
767 fn zero_len_read(#[values(true, false)] emit: bool) {
768 let mut recv =
769 RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
770 assert_eq!(recv.len, 0);
771
772 let first = RangeBuf::from(b"something", 0, false);
773 let second = RangeBuf::from(b"", 9, true);
774
775 assert!(recv.write(first).is_ok());
776 assert_eq!(recv.len, 9);
777 assert_eq!(recv.off, 0);
778 assert_eq!(recv.data.len(), 1);
779
780 assert!(recv.write(second).is_ok());
781 assert_eq!(recv.len, 9);
782 assert_eq!(recv.off, 0);
783 assert_eq!(recv.data.len(), 1);
784
785 assert_emit_discard(&mut recv, emit, 32, 9, true, Some(b"something"));
786 assert_eq!(recv.len, 9);
787 assert_eq!(recv.off, 9);
788 }
789
790 #[rstest]
791 fn past_read(#[values(true, false)] emit: bool) {
792 let mut recv =
793 RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
794 assert_eq!(recv.len, 0);
795
796 let first = RangeBuf::from(b"something", 0, false);
797 let second = RangeBuf::from(b"hello", 3, false);
798 let third = RangeBuf::from(b"ello", 4, true);
799 let fourth = RangeBuf::from(b"ello", 5, true);
800
801 assert!(recv.write(first).is_ok());
802 assert_eq!(recv.len, 9);
803 assert_eq!(recv.off, 0);
804 assert_eq!(recv.data.len(), 1);
805
806 assert_emit_discard(&mut recv, emit, 32, 9, false, Some(b"something"));
807 assert_eq!(recv.len, 9);
808 assert_eq!(recv.off, 9);
809
810 assert!(recv.write(second).is_ok());
811 assert_eq!(recv.len, 9);
812 assert_eq!(recv.off, 9);
813 assert_eq!(recv.data.len(), 0);
814
815 assert_eq!(recv.write(third), Err(Error::FinalSize));
816
817 assert!(recv.write(fourth).is_ok());
818 assert_eq!(recv.len, 9);
819 assert_eq!(recv.off, 9);
820 assert_eq!(recv.data.len(), 0);
821
822 assert_emit_discard_done(&mut recv, emit);
823 }
824
825 #[rstest]
826 fn fully_overlapping_read(#[values(true, false)] emit: bool) {
827 let mut recv =
828 RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
829 assert_eq!(recv.len, 0);
830
831 let first = RangeBuf::from(b"something", 0, false);
832 let second = RangeBuf::from(b"hello", 4, false);
833
834 assert!(recv.write(first).is_ok());
835 assert_eq!(recv.len, 9);
836 assert_eq!(recv.off, 0);
837 assert_eq!(recv.data.len(), 1);
838
839 assert!(recv.write(second).is_ok());
840 assert_eq!(recv.len, 9);
841 assert_eq!(recv.off, 0);
842 assert_eq!(recv.data.len(), 1);
843
844 assert_emit_discard(&mut recv, emit, 32, 9, false, Some(b"something"));
845 assert_eq!(recv.len, 9);
846 assert_eq!(recv.off, 9);
847 assert_eq!(recv.data.len(), 0);
848
849 assert_emit_discard_done(&mut recv, emit);
850 }
851
852 #[rstest]
853 fn fully_overlapping_read2(#[values(true, false)] emit: bool) {
854 let mut recv =
855 RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
856 assert_eq!(recv.len, 0);
857
858 let first = RangeBuf::from(b"something", 0, false);
859 let second = RangeBuf::from(b"hello", 4, false);
860
861 assert!(recv.write(second).is_ok());
862 assert_eq!(recv.len, 9);
863 assert_eq!(recv.off, 0);
864 assert_eq!(recv.data.len(), 1);
865
866 assert!(recv.write(first).is_ok());
867 assert_eq!(recv.len, 9);
868 assert_eq!(recv.off, 0);
869 assert_eq!(recv.data.len(), 2);
870
871 assert_emit_discard(&mut recv, emit, 32, 9, false, Some(b"somehello"));
872 assert_eq!(recv.len, 9);
873 assert_eq!(recv.off, 9);
874 assert_eq!(recv.data.len(), 0);
875
876 assert_emit_discard_done(&mut recv, emit);
877 }
878
879 #[rstest]
880 fn fully_overlapping_read3(#[values(true, false)] emit: bool) {
881 let mut recv =
882 RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
883 assert_eq!(recv.len, 0);
884
885 let first = RangeBuf::from(b"something", 0, false);
886 let second = RangeBuf::from(b"hello", 3, false);
887
888 assert!(recv.write(second).is_ok());
889 assert_eq!(recv.len, 8);
890 assert_eq!(recv.off, 0);
891 assert_eq!(recv.data.len(), 1);
892
893 assert!(recv.write(first).is_ok());
894 assert_eq!(recv.len, 9);
895 assert_eq!(recv.off, 0);
896 assert_eq!(recv.data.len(), 3);
897
898 assert_emit_discard(&mut recv, emit, 32, 9, false, Some(b"somhellog"));
899 assert_eq!(recv.len, 9);
900 assert_eq!(recv.off, 9);
901 assert_eq!(recv.data.len(), 0);
902
903 assert_emit_discard_done(&mut recv, emit);
904 }
905
906 #[rstest]
907 fn fully_overlapping_read_multi(#[values(true, false)] emit: bool) {
908 let mut recv =
909 RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
910 assert_eq!(recv.len, 0);
911
912 let first = RangeBuf::from(b"somethingsomething", 0, false);
913 let second = RangeBuf::from(b"hello", 3, false);
914 let third = RangeBuf::from(b"hello", 12, false);
915
916 assert!(recv.write(second).is_ok());
917 assert_eq!(recv.len, 8);
918 assert_eq!(recv.off, 0);
919 assert_eq!(recv.data.len(), 1);
920
921 assert!(recv.write(third).is_ok());
922 assert_eq!(recv.len, 17);
923 assert_eq!(recv.off, 0);
924 assert_eq!(recv.data.len(), 2);
925
926 assert!(recv.write(first).is_ok());
927 assert_eq!(recv.len, 18);
928 assert_eq!(recv.off, 0);
929 assert_eq!(recv.data.len(), 5);
930
931 assert_emit_discard(
932 &mut recv,
933 emit,
934 32,
935 18,
936 false,
937 Some(b"somhellogsomhellog"),
938 );
939 assert_eq!(recv.len, 18);
940 assert_eq!(recv.off, 18);
941 assert_eq!(recv.data.len(), 0);
942
943 assert_emit_discard_done(&mut recv, emit);
944 }
945
946 #[rstest]
947 fn overlapping_start_read(#[values(true, false)] emit: bool) {
948 let mut recv =
949 RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
950 assert_eq!(recv.len, 0);
951
952 let first = RangeBuf::from(b"something", 0, false);
953 let second = RangeBuf::from(b"hello", 8, true);
954
955 assert!(recv.write(first).is_ok());
956 assert_eq!(recv.len, 9);
957 assert_eq!(recv.off, 0);
958 assert_eq!(recv.data.len(), 1);
959
960 assert!(recv.write(second).is_ok());
961 assert_eq!(recv.len, 13);
962 assert_eq!(recv.off, 0);
963 assert_eq!(recv.data.len(), 2);
964
965 assert_emit_discard(
966 &mut recv,
967 emit,
968 32,
969 13,
970 true,
971 Some(b"somethingello"),
972 );
973
974 assert_eq!(recv.len, 13);
975 assert_eq!(recv.off, 13);
976
977 assert_emit_discard_done(&mut recv, emit);
978 }
979
980 #[rstest]
981 fn overlapping_end_read(#[values(true, false)] emit: bool) {
982 let mut recv =
983 RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
984 assert_eq!(recv.len, 0);
985
986 let first = RangeBuf::from(b"hello", 0, false);
987 let second = RangeBuf::from(b"something", 3, true);
988
989 assert!(recv.write(second).is_ok());
990 assert_eq!(recv.len, 12);
991 assert_eq!(recv.off, 0);
992 assert_eq!(recv.data.len(), 1);
993
994 assert!(recv.write(first).is_ok());
995 assert_eq!(recv.len, 12);
996 assert_eq!(recv.off, 0);
997 assert_eq!(recv.data.len(), 2);
998
999 assert_emit_discard(&mut recv, emit, 32, 12, true, Some(b"helsomething"));
1000 assert_eq!(recv.len, 12);
1001 assert_eq!(recv.off, 12);
1002
1003 assert_emit_discard_done(&mut recv, emit);
1004 }
1005
1006 #[rstest]
1007 fn overlapping_end_twice_read(#[values(true, false)] emit: bool) {
1008 let mut recv =
1009 RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
1010 assert_eq!(recv.len, 0);
1011
1012 let first = RangeBuf::from(b"he", 0, false);
1013 let second = RangeBuf::from(b"ow", 4, false);
1014 let third = RangeBuf::from(b"rl", 7, false);
1015 let fourth = RangeBuf::from(b"helloworld", 0, true);
1016
1017 assert!(recv.write(third).is_ok());
1018 assert_eq!(recv.len, 9);
1019 assert_eq!(recv.off, 0);
1020 assert_eq!(recv.data.len(), 1);
1021
1022 assert!(recv.write(second).is_ok());
1023 assert_eq!(recv.len, 9);
1024 assert_eq!(recv.off, 0);
1025 assert_eq!(recv.data.len(), 2);
1026
1027 assert!(recv.write(first).is_ok());
1028 assert_eq!(recv.len, 9);
1029 assert_eq!(recv.off, 0);
1030 assert_eq!(recv.data.len(), 3);
1031
1032 assert!(recv.write(fourth).is_ok());
1033 assert_eq!(recv.len, 10);
1034 assert_eq!(recv.off, 0);
1035 assert_eq!(recv.data.len(), 6);
1036
1037 assert_emit_discard(&mut recv, emit, 32, 10, true, Some(b"helloworld"));
1038 assert_eq!(recv.len, 10);
1039 assert_eq!(recv.off, 10);
1040
1041 assert_emit_discard_done(&mut recv, emit);
1042 }
1043
1044 #[rstest]
1045 fn overlapping_end_twice_and_contained_read(
1046 #[values(true, false)] emit: bool,
1047 ) {
1048 let mut recv =
1049 RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
1050 assert_eq!(recv.len, 0);
1051
1052 let first = RangeBuf::from(b"hellow", 0, false);
1053 let second = RangeBuf::from(b"barfoo", 10, true);
1054 let third = RangeBuf::from(b"rl", 7, false);
1055 let fourth = RangeBuf::from(b"elloworldbarfoo", 1, true);
1056
1057 assert!(recv.write(third).is_ok());
1058 assert_eq!(recv.len, 9);
1059 assert_eq!(recv.off, 0);
1060 assert_eq!(recv.data.len(), 1);
1061
1062 assert!(recv.write(second).is_ok());
1063 assert_eq!(recv.len, 16);
1064 assert_eq!(recv.off, 0);
1065 assert_eq!(recv.data.len(), 2);
1066
1067 assert!(recv.write(first).is_ok());
1068 assert_eq!(recv.len, 16);
1069 assert_eq!(recv.off, 0);
1070 assert_eq!(recv.data.len(), 3);
1071
1072 assert!(recv.write(fourth).is_ok());
1073 assert_eq!(recv.len, 16);
1074 assert_eq!(recv.off, 0);
1075 assert_eq!(recv.data.len(), 5);
1076
1077 assert_emit_discard(
1078 &mut recv,
1079 emit,
1080 32,
1081 16,
1082 true,
1083 Some(b"helloworldbarfoo"),
1084 );
1085 assert_eq!(recv.len, 16);
1086 assert_eq!(recv.off, 16);
1087
1088 assert_emit_discard_done(&mut recv, emit);
1089 }
1090
1091 #[rstest]
1092 fn partially_multi_overlapping_reordered_read(
1093 #[values(true, false)] emit: bool,
1094 ) {
1095 let mut recv =
1096 RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
1097 assert_eq!(recv.len, 0);
1098
1099 let first = RangeBuf::from(b"hello", 8, false);
1100 let second = RangeBuf::from(b"something", 0, false);
1101 let third = RangeBuf::from(b"moar", 11, true);
1102
1103 assert!(recv.write(first).is_ok());
1104 assert_eq!(recv.len, 13);
1105 assert_eq!(recv.off, 0);
1106 assert_eq!(recv.data.len(), 1);
1107
1108 assert!(recv.write(second).is_ok());
1109 assert_eq!(recv.len, 13);
1110 assert_eq!(recv.off, 0);
1111 assert_eq!(recv.data.len(), 2);
1112
1113 assert!(recv.write(third).is_ok());
1114 assert_eq!(recv.len, 15);
1115 assert_eq!(recv.off, 0);
1116 assert_eq!(recv.data.len(), 3);
1117
1118 assert_emit_discard(
1119 &mut recv,
1120 emit,
1121 32,
1122 15,
1123 true,
1124 Some(b"somethinhelloar"),
1125 );
1126 assert_eq!(recv.len, 15);
1127 assert_eq!(recv.off, 15);
1128 assert_eq!(recv.data.len(), 0);
1129
1130 assert_emit_discard_done(&mut recv, emit);
1131 }
1132
1133 #[rstest]
1134 fn partially_multi_overlapping_reordered_read2(
1135 #[values(true, false)] emit: bool,
1136 ) {
1137 let mut recv =
1138 RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
1139 assert_eq!(recv.len, 0);
1140
1141 let first = RangeBuf::from(b"aaa", 0, false);
1142 let second = RangeBuf::from(b"bbb", 2, false);
1143 let third = RangeBuf::from(b"ccc", 4, false);
1144 let fourth = RangeBuf::from(b"ddd", 6, false);
1145 let fifth = RangeBuf::from(b"eee", 9, false);
1146 let sixth = RangeBuf::from(b"fff", 11, false);
1147
1148 assert!(recv.write(second).is_ok());
1149 assert_eq!(recv.len, 5);
1150 assert_eq!(recv.off, 0);
1151 assert_eq!(recv.data.len(), 1);
1152
1153 assert!(recv.write(fourth).is_ok());
1154 assert_eq!(recv.len, 9);
1155 assert_eq!(recv.off, 0);
1156 assert_eq!(recv.data.len(), 2);
1157
1158 assert!(recv.write(third).is_ok());
1159 assert_eq!(recv.len, 9);
1160 assert_eq!(recv.off, 0);
1161 assert_eq!(recv.data.len(), 3);
1162
1163 assert!(recv.write(first).is_ok());
1164 assert_eq!(recv.len, 9);
1165 assert_eq!(recv.off, 0);
1166 assert_eq!(recv.data.len(), 4);
1167
1168 assert!(recv.write(sixth).is_ok());
1169 assert_eq!(recv.len, 14);
1170 assert_eq!(recv.off, 0);
1171 assert_eq!(recv.data.len(), 5);
1172
1173 assert!(recv.write(fifth).is_ok());
1174 assert_eq!(recv.len, 14);
1175 assert_eq!(recv.off, 0);
1176 assert_eq!(recv.data.len(), 6);
1177
1178 assert_emit_discard(
1179 &mut recv,
1180 emit,
1181 32,
1182 14,
1183 false,
1184 Some(b"aabbbcdddeefff"),
1185 );
1186 assert_eq!(recv.len, 14);
1187 assert_eq!(recv.off, 14);
1188 assert_eq!(recv.data.len(), 0);
1189
1190 assert_emit_discard_done(&mut recv, emit);
1191 }
1192
1193 #[test]
1194 fn mixed_read_actions() {
1195 let mut recv =
1196 RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
1197 assert_eq!(recv.len, 0);
1198
1199 let first = RangeBuf::from(b"hello", 0, false);
1200 let second = RangeBuf::from(b"world", 5, false);
1201 let third = RangeBuf::from(b"something", 10, true);
1202
1203 assert!(recv.write(second).is_ok());
1204 assert_eq!(recv.len, 10);
1205 assert_eq!(recv.off, 0);
1206
1207 assert_emit_discard_done(&mut recv, true);
1208 assert_emit_discard_done(&mut recv, false);
1209
1210 assert!(recv.write(third).is_ok());
1211 assert_eq!(recv.len, 19);
1212 assert_eq!(recv.off, 0);
1213
1214 assert_emit_discard_done(&mut recv, true);
1215 assert_emit_discard_done(&mut recv, false);
1216
1217 assert!(recv.write(first).is_ok());
1218 assert_eq!(recv.len, 19);
1219 assert_eq!(recv.off, 0);
1220
1221 assert_emit_discard(&mut recv, true, 5, 5, false, Some(b"hello"));
1222 assert_eq!(recv.len, 19);
1223 assert_eq!(recv.off, 5);
1224
1225 assert_emit_discard(&mut recv, false, 5, 5, false, None);
1226 assert_eq!(recv.len, 19);
1227 assert_eq!(recv.off, 10);
1228
1229 assert_emit_discard(&mut recv, true, 9, 9, true, Some(b"something"));
1230 assert_eq!(recv.len, 19);
1231 assert_eq!(recv.off, 19);
1232
1233 assert_emit_discard_done(&mut recv, true);
1234 assert_emit_discard_done(&mut recv, false);
1235 }
1236}