1use std::cmp;
28
29use std::collections::BTreeMap;
30use std::collections::VecDeque;
31
32use std::time::Duration;
33use std::time::Instant;
34
35use crate::stream::RecvAction;
36use crate::stream::RecvBufResetReturn;
37use crate::Error;
38use crate::Result;
39
40use crate::flowcontrol;
41
42use crate::range_buf::RangeBuf;
43
/// Receive-side stream buffer.
///
/// Incoming chunks are kept in an ordered map until the application reads
/// (or discards) them contiguously, starting at the current read offset.
#[derive(Debug, Default)]
pub struct RecvBuf {
    /// Buffered chunks that have not been read yet. Each chunk is keyed by
    /// its own maximum offset (see the insertion in `write()`).
    data: BTreeMap<u64, RangeBuf>,

    /// Offset of the next byte to be handed to the application.
    off: u64,

    /// Highest offset received so far on this stream.
    len: u64,

    /// Flow controller tracking the limits and consumed bytes for this
    /// buffer.
    flow_control: flowcontrol::FlowControl,

    /// Final offset of the stream, once a chunk carrying the fin flag (or a
    /// reset) has been received.
    fin_off: Option<u64>,

    /// Error code recorded by `reset()`, if the stream was reset.
    error: Option<u64>,

    /// When set (by `shutdown()`), incoming data is still validated but is
    /// no longer stored.
    drain: bool,
}
73
74impl RecvBuf {
75 pub fn new(max_data: u64, initial_window: u64, max_window: u64) -> RecvBuf {
77 RecvBuf {
78 flow_control: flowcontrol::FlowControl::new(
79 max_data,
80 initial_window,
81 max_window,
82 ),
83 ..RecvBuf::default()
84 }
85 }
86
    /// Inserts the given chunk of data into the buffer.
    ///
    /// The chunk is first validated against the flow control limit and the
    /// stream's final size. It is then trimmed so that only bytes that were
    /// neither read already nor buffered already get stored. In drain mode
    /// nothing is stored and the read offset simply follows the highest
    /// received offset.
    ///
    /// # Errors
    ///
    /// * `Error::FlowControl` if the chunk ends beyond `max_data()`.
    /// * `Error::FinalSize` if the chunk ends beyond an already known final
    ///   offset, if it carries fin with a different final offset, or if it
    ///   carries fin at an offset lower than data already received.
    pub fn write(&mut self, buf: RangeBuf) -> Result<()> {
        // The chunk must fit within the current flow control limit.
        if buf.max_off() > self.max_data() {
            return Err(Error::FlowControl);
        }

        if let Some(fin_off) = self.fin_off {
            // The final size is known: no data may extend past it.
            if buf.max_off() > fin_off {
                return Err(Error::FinalSize);
            }

            // The final size is known: a new fin must not change it.
            if buf.fin() && fin_off != buf.max_off() {
                return Err(Error::FinalSize);
            }
        }

        // A fin cannot declare a size smaller than what was already received.
        if buf.fin() && buf.max_off() < self.len {
            return Err(Error::FinalSize);
        }

        // The final offset is already recorded, so an empty chunk carries no
        // further information.
        if self.fin_off.is_some() && buf.is_empty() {
            return Ok(());
        }

        if buf.fin() {
            self.fin_off = Some(buf.max_off());
        }

        // An empty chunk without fin has nothing worth storing.
        if !buf.fin() && buf.is_empty() {
            return Ok(());
        }

        // Everything in this chunk was already read by the application.
        if self.off >= buf.max_off() {
            // Empty chunks are let through: the only ones that can reach this
            // point carry the fin flag (see the checks above) and still need
            // to be queued so the reader observes the end of the stream.
            if !buf.is_empty() {
                return Ok(());
            }
        }

        // Work queue of pieces still to be inserted; splitting around an
        // existing chunk can produce an additional piece.
        let mut tmp_bufs = VecDeque::with_capacity(2);
        tmp_bufs.push_back(buf);

        'tmp: while let Some(mut buf) = tmp_bufs.pop_front() {
            // Drop the part of the piece that lies below the read offset, so
            // already-consumed bytes are never buffered again.
            // NOTE(review): presumably `split_off()` returns the part from the
            // given position onward and leaves the head in the receiver.
            if self.off_front() > buf.off() {
                buf = buf.split_off((self.off_front() - buf.off()) as usize);
            }

            // Overlap handling is only needed when the piece starts below the
            // highest received offset; empty (fin) pieces are also checked so
            // they can be merged with an existing chunk.
            if buf.off() < self.max_off() || buf.is_empty() {
                // Keys are maximum offsets, so this visits the stored chunks
                // that end at or after the start of the new piece.
                for (_, b) in self.data.range(buf.off()..) {
                    let off = buf.off();

                    // The stored chunk starts after the new piece ends.
                    if b.off() > buf.max_off() {
                        break;
                    }

                    // The new piece is entirely covered by the stored chunk:
                    // drop it and move on to the next queued piece.
                    if off >= b.off() && buf.max_off() <= b.max_off() {
                        continue 'tmp;
                    }

                    // The start of the new piece overlaps the stored chunk:
                    // keep only what follows the stored chunk.
                    if off >= b.off() && off < b.max_off() {
                        buf = buf.split_off((b.max_off() - off) as usize);
                    }

                    // The end of the new piece overlaps the stored chunk:
                    // keep the head in `buf` and queue the remainder for
                    // another pass.
                    if off < b.off() && buf.max_off() > b.off() {
                        tmp_bufs
                            .push_back(buf.split_off((b.off() - off) as usize));
                    }
                }
            }

            self.len = cmp::max(self.len, buf.max_off());

            if !self.drain {
                self.data.insert(buf.max_off(), buf);
            } else {
                // Nothing is stored while draining, so the read offset
                // tracks the highest received offset.
                self.off = self.len;
            }
        }

        Ok(())
    }
200
201 #[inline]
212 pub fn emit(&mut self, mut out: &mut [u8]) -> Result<(usize, bool)> {
213 self.emit_or_discard(RecvAction::Emit { out: &mut out })
214 }
215
216 pub fn emit_or_discard<B: bytes::BufMut>(
231 &mut self, mut action: RecvAction<B>,
232 ) -> Result<(usize, bool)> {
233 let mut len = 0;
234 let mut cap = match &action {
235 RecvAction::Emit { out } => out.remaining_mut(),
236 RecvAction::Discard { len } => *len,
237 };
238
239 if !self.ready() {
240 return Err(Error::Done);
241 }
242
243 if let Some(e) = self.error {
246 self.data.clear();
247 return Err(Error::StreamReset(e));
248 }
249
250 while cap > 0 && self.ready() {
251 let mut entry = match self.data.first_entry() {
252 Some(entry) => entry,
253 None => break,
254 };
255
256 let buf = entry.get_mut();
257
258 let buf_len = cmp::min(buf.len(), cap);
259
260 if let RecvAction::Emit { ref mut out } = action {
262 debug_assert!(
267 cap <= out.remaining_mut(),
268 "We updated `cap` incorrectly"
269 );
270 out.put_slice(&buf[..buf_len])
271 }
272
273 self.off += buf_len as u64;
274
275 len += buf_len;
276 cap -= buf_len;
277
278 if buf_len < buf.len() {
279 buf.consume(buf_len);
280
281 break;
283 }
284
285 entry.remove();
286 }
287
288 self.flow_control.add_consumed(len as u64);
290
291 Ok((len, self.is_fin()))
292 }
293
294 pub fn reset(
296 &mut self, error_code: u64, final_size: u64,
297 ) -> Result<RecvBufResetReturn> {
298 if let Some(fin_off) = self.fin_off {
300 if fin_off != final_size {
301 return Err(Error::FinalSize);
302 }
303 }
304
305 if final_size < self.len {
307 return Err(Error::FinalSize);
308 }
309
310 if self.error.is_some() {
311 return Ok(RecvBufResetReturn::zero());
313 }
314
315 let result = RecvBufResetReturn {
318 max_data_delta: final_size - self.len,
319 consumed_flowcontrol: final_size - self.off,
320 };
321
322 self.error = Some(error_code);
323
324 self.off = final_size;
326
327 self.data.clear();
328
329 let buf = RangeBuf::from(b"", final_size, true);
332 self.write(buf)?;
333
334 Ok(result)
335 }
336
    /// Forwards to the flow controller's `update_max_data()` with the given
    /// timestamp.
    ///
    /// NOTE(review): presumably this commits the new flow control limit;
    /// confirm against `flowcontrol::FlowControl`.
    pub fn update_max_data(&mut self, now: Instant) {
        self.flow_control.update_max_data(now);
    }
341
    /// Returns the value of the flow controller's `max_data_next()`.
    ///
    /// NOTE(review): presumably the limit that would be advertised next;
    /// confirm against `flowcontrol::FlowControl`.
    pub fn max_data_next(&mut self) -> u64 {
        self.flow_control.max_data_next()
    }
346
    /// Returns the flow controller's current `max_data()` value, which
    /// `write()` uses as the upper bound for incoming offsets.
    pub fn max_data(&self) -> u64 {
        self.flow_control.max_data()
    }
351
    /// Returns the flow controller's current `window()` value.
    pub fn window(&self) -> u64 {
        self.flow_control.window()
    }
356
    /// Forwards to the flow controller's `autotune_window()` with the given
    /// timestamp and round-trip time.
    ///
    /// NOTE(review): presumably this adjusts the window size; confirm
    /// against `flowcontrol::FlowControl`.
    pub fn autotune_window(&mut self, now: Instant, rtt: Duration) {
        self.flow_control.autotune_window(now, rtt);
    }
361
362 pub fn shutdown(&mut self) -> Result<u64> {
366 if self.drain {
367 return Err(Error::Done);
368 }
369
370 self.drain = true;
371
372 self.data.clear();
373
374 let consumed = self.max_off() - self.off;
375 self.off = self.max_off();
376
377 Ok(consumed)
378 }
379
    /// Returns the current read offset, i.e. the offset of the next byte to
    /// be handed to the application.
    pub fn off_front(&self) -> u64 {
        self.off
    }
384
385 pub fn almost_full(&self) -> bool {
387 self.fin_off.is_none() && self.flow_control.should_update_max_data()
388 }
389
    /// Returns the highest offset received so far.
    pub fn max_off(&self) -> u64 {
        self.len
    }
394
395 pub fn is_fin(&self) -> bool {
400 if self.fin_off == Some(self.off) {
401 return true;
402 }
403
404 false
405 }
406
    /// Returns true if the buffer is in drain mode, i.e. `shutdown()` was
    /// called.
    pub fn is_draining(&self) -> bool {
        self.drain
    }
411
412 pub fn ready(&self) -> bool {
414 let (_, buf) = match self.data.first_key_value() {
415 Some(v) => v,
416 None => return false,
417 };
418
419 buf.off() == self.off
420 }
421
    /// Test-only accessor exposing the internal flow controller.
    #[cfg(test)]
    pub(crate) fn flow_control_for_tests(&self) -> &flowcontrol::FlowControl {
        &self.flow_control
    }
426}
427
428#[cfg(test)]
429mod tests {
430 use super::*;
431
    // Window size (32 KiB) passed as both the initial and the maximum window
    // to `RecvBuf::new()` in these tests.
    const DEFAULT_STREAM_WINDOW: u64 = 32 * 1024;
434 use bytes::BufMut as _;
435 use rstest::rstest;
436
    // Helper exercising either the emit or the discard path of `recv`.
    //
    // * `emit` selects `RecvAction::Emit` (true) or `RecvAction::Discard`.
    // * `target_len` is the maximum number of bytes the call may take.
    // * `result_len` is the number of bytes the call is expected to take.
    // * `is_fin` is the expected value of the returned fin flag.
    // * `test_bytes`, if given, is compared with the emitted bytes; it is
    //   only checked when `emit` is true.
    //
    // Panics if `emit_or_discard()` returns an error.
    fn assert_emit_discard(
        recv: &mut RecvBuf, emit: bool, target_len: usize, result_len: usize,
        is_fin: bool, test_bytes: Option<&[u8]>,
    ) {
        // The limit caps how much `emit_or_discard()` may write into the Vec.
        let mut buf = Vec::<u8>::with_capacity(512).limit(target_len);
        let action = if emit {
            RecvAction::Emit { out: &mut buf }
        } else {
            RecvAction::Discard { len: target_len }
        };

        let (read, fin) = recv.emit_or_discard(action).unwrap();

        let buf = buf.into_inner();
        if emit {
            // The amount written must match the reported length.
            assert_eq!(buf.len(), read);
            if let Some(v) = test_bytes {
                assert_eq!(&buf, v);
            }
        }

        assert_eq!(read, result_len);
        assert_eq!(is_fin, fin);
    }
478
    // Helper asserting that an emit (or discard, when `emit` is false) of up
    // to 32 bytes on `recv` fails with `Error::Done`.
    fn assert_emit_discard_done(recv: &mut RecvBuf, emit: bool) {
        let mut buf = [0u8; 32];
        let action = if emit {
            RecvAction::Emit {
                out: &mut buf.as_mut_slice(),
            }
        } else {
            RecvAction::Discard { len: 32 }
        };
        assert_eq!(recv.emit_or_discard(action), Err(Error::Done));
    }
491
    // Reading from a freshly created buffer yields `Error::Done`.
    #[rstest]
    fn empty_read(#[values(true, false)] emit: bool) {
        let mut recv =
            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        assert_emit_discard_done(&mut recv, emit);
    }
500
    // Handling of zero-length chunks, with and without the fin flag, on a
    // buffer whose flow control limit is 15.
    #[rstest]
    fn empty_stream_frame(#[values(true, false)] emit: bool) {
        let mut recv =
            RecvBuf::new(15, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let buf = RangeBuf::from(b"hello", 0, false);
        assert!(recv.write(buf).is_ok());
        assert_eq!(recv.len, 5);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        assert_emit_discard(&mut recv, emit, 32, 5, false, None);

        // An empty chunk without fin, beyond the received data, is accepted
        // but not stored.
        let buf = RangeBuf::from(b"", 10, false);
        assert!(recv.write(buf).is_ok());
        assert_eq!(recv.len, 5);
        assert_eq!(recv.off, 5);
        assert_eq!(recv.data.len(), 0);

        // An empty chunk beyond the flow control limit is rejected.
        let buf = RangeBuf::from(b"", 16, false);
        assert_eq!(recv.write(buf), Err(Error::FlowControl));

        // An empty fin at the end of the received data is stored.
        let buf = RangeBuf::from(b"", 5, true);
        assert!(recv.write(buf).is_ok());
        assert_eq!(recv.len, 5);
        assert_eq!(recv.off, 5);
        assert_eq!(recv.data.len(), 1);

        // Writing the same empty fin again changes nothing.
        let buf = RangeBuf::from(b"", 5, true);
        assert!(recv.write(buf).is_ok());
        assert_eq!(recv.len, 5);
        assert_eq!(recv.off, 5);
        assert_eq!(recv.data.len(), 1);

        // A non-empty fin with the same final size, covering data that was
        // already read, changes nothing either.
        let buf = RangeBuf::from(b"aa", 3, true);
        assert!(recv.write(buf).is_ok());
        assert_eq!(recv.len, 5);
        assert_eq!(recv.off, 5);
        assert_eq!(recv.data.len(), 1);

        // A fin with a different final size is rejected.
        let buf = RangeBuf::from(b"", 6, true);
        assert_eq!(recv.write(buf), Err(Error::FinalSize));
        let buf = RangeBuf::from(b"", 4, true);
        assert_eq!(recv.write(buf), Err(Error::FinalSize));

        // The stored empty fin is read as zero bytes with the fin flag set.
        assert_emit_discard(&mut recv, emit, 32, 0, true, None);
    }
555
    // Chunks written out of order become readable only once the chunk at the
    // read offset arrives; everything is then read in stream order.
    #[rstest]
    fn ordered_read(#[values(true, false)] emit: bool) {
        let mut recv =
            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"hello", 0, false);
        let second = RangeBuf::from(b"world", 5, false);
        let third = RangeBuf::from(b"something", 10, true);

        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 10);
        assert_eq!(recv.off, 0);

        // Nothing at offset 0 yet.
        assert_emit_discard_done(&mut recv, emit);

        assert!(recv.write(third).is_ok());
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 0);

        assert_emit_discard_done(&mut recv, emit);

        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 0);

        assert_emit_discard(
            &mut recv,
            emit,
            32,
            19,
            true,
            Some(b"helloworldsomething"),
        );
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 19);

        assert_emit_discard_done(&mut recv, emit);
    }
595
    // After `shutdown()` incoming data is accounted for but never stored,
    // and a later reset reports only the remaining delta.
    #[rstest]
    fn shutdown(#[values(true, false)] emit: bool) {
        let mut recv =
            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"hello", 0, false);
        let second = RangeBuf::from(b"world", 5, false);
        let third = RangeBuf::from(b"something", 10, false);

        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 10);
        assert_eq!(recv.off, 0);

        assert_emit_discard_done(&mut recv, emit);

        // Shutdown skips the 10 bytes received so far and drops the data.
        assert_eq!(recv.shutdown(), Ok(10));
        assert_eq!(recv.len, 10);
        assert_eq!(recv.off, 10);
        assert_eq!(recv.data.len(), 0);

        assert_emit_discard_done(&mut recv, emit);

        // Data below the read offset is ignored.
        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 10);
        assert_eq!(recv.off, 10);
        assert_eq!(recv.data.len(), 0);

        // New data advances both offsets but is not buffered.
        assert!(recv.write(third).is_ok());
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 19);
        assert_eq!(recv.data.len(), 0);

        assert_emit_discard_done(&mut recv, emit);
        // Reset at 123 with 19 bytes already accounted for: 123 - 19 = 104.
        assert_eq!(
            recv.reset(42, 123),
            Ok(RecvBufResetReturn {
                max_data_delta: 104,
                consumed_flowcontrol: 104,
            })
        );
        assert_eq!(recv.len, 123);
        assert_eq!(recv.off, 123);
        assert_eq!(recv.data.len(), 0);

        assert_emit_discard_done(&mut recv, emit);
    }
649
    // Two contiguous chunks are read in several calls whose sizes do not
    // line up with the chunk boundaries.
    #[rstest]
    fn split_read(#[values(true, false)] emit: bool) {
        let mut recv =
            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"something", 0, false);
        let second = RangeBuf::from(b"helloworld", 9, true);

        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);

        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 0);

        // The first read crosses from the first chunk into the second.
        assert_emit_discard(&mut recv, emit, 10, 10, false, Some(b"somethingh"));
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 10);

        assert_emit_discard(&mut recv, emit, 5, 5, false, Some(b"ellow"));
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 15);

        // Only 4 bytes remain, and they end the stream.
        assert_emit_discard(&mut recv, emit, 5, 4, true, Some(b"orld"));
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 19);
    }
679
    // Same reads as `split_read`, but emitting into one growing `Vec` whose
    // limit is re-armed before each call, so the output accumulates.
    #[test]
    fn split_read_incremental_buf() {
        let mut recv =
            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"something", 0, false);
        let second = RangeBuf::from(b"helloworld", 9, true);

        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);

        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 0);

        let mut buf = Vec::new().limit(10);
        assert_eq!(
            recv.emit_or_discard(RecvAction::Emit { out: &mut buf }),
            Ok((10, false))
        );
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 10);
        assert_eq!(buf.get_ref().len(), 10);
        assert_eq!(buf.get_ref().as_slice(), b"somethingh");

        // Allow 5 more bytes to be appended.
        buf.set_limit(5);
        assert_eq!(
            recv.emit_or_discard(RecvAction::Emit { out: &mut buf }),
            Ok((5, false))
        );
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 15);
        assert_eq!(buf.get_ref().len(), 15);
        assert_eq!(buf.get_ref().as_slice(), b"somethinghellow");

        // The limit exceeds what is left; only the remaining 4 bytes are read.
        buf.set_limit(42);
        assert_eq!(
            recv.emit_or_discard(RecvAction::Emit { out: &mut buf }),
            Ok((4, true))
        );
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 19);
        assert_eq!(buf.get_ref().len(), 19);
        assert_eq!(buf.get_ref().as_slice(), b"somethinghelloworld");
    }
727
728 #[rstest]
729 fn incomplete_read(#[values(true, false)] emit: bool) {
730 let mut recv =
731 RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
732 assert_eq!(recv.len, 0);
733
734 let mut buf = [0u8; 32];
735
736 let first = RangeBuf::from(b"something", 0, false);
737 let second = RangeBuf::from(b"helloworld", 9, true);
738
739 assert!(recv.write(second).is_ok());
740 assert_eq!(recv.len, 19);
741 assert_eq!(recv.off, 0);
742
743 let action = if emit {
744 RecvAction::Emit {
745 out: &mut buf.as_mut_slice(),
746 }
747 } else {
748 RecvAction::Discard { len: 32 }
749 };
750 assert_eq!(recv.emit_or_discard(action), Err(Error::Done));
751
752 assert!(recv.write(first).is_ok());
753 assert_eq!(recv.len, 19);
754 assert_eq!(recv.off, 0);
755
756 assert_emit_discard(
757 &mut recv,
758 emit,
759 32,
760 19,
761 true,
762 Some(b"somethinghelloworld"),
763 );
764 assert_eq!(recv.len, 19);
765 assert_eq!(recv.off, 19);
766 }
767
    // An empty fin placed right at the end of a buffered chunk is not stored
    // separately, yet the read still reports the end of the stream.
    #[rstest]
    fn zero_len_read(#[values(true, false)] emit: bool) {
        let mut recv =
            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"something", 0, false);
        let second = RangeBuf::from(b"", 9, true);

        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        // The chunk count stays at 1 after the empty fin.
        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        assert_emit_discard(&mut recv, emit, 32, 9, true, Some(b"something"));
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 9);
    }
791
    // Chunks lying entirely below the read offset are ignored, while a fin
    // that would shrink the stream is rejected.
    #[rstest]
    fn past_read(#[values(true, false)] emit: bool) {
        let mut recv =
            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"something", 0, false);
        let second = RangeBuf::from(b"hello", 3, false);
        let third = RangeBuf::from(b"ello", 4, true);
        let fourth = RangeBuf::from(b"ello", 5, true);

        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        assert_emit_discard(&mut recv, emit, 32, 9, false, Some(b"something"));
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 9);

        // Bytes 3..8 were already read: accepted but not stored.
        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 9);
        assert_eq!(recv.data.len(), 0);

        // A fin at offset 8 is below the 9 bytes already received.
        assert_eq!(recv.write(third), Err(Error::FinalSize));

        // A fin at offset 9 matches the received length; its data was
        // already read so nothing is stored.
        assert!(recv.write(fourth).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 9);
        assert_eq!(recv.data.len(), 0);

        assert_emit_discard_done(&mut recv, emit);
    }
826
    // A chunk entirely covered by an already buffered chunk is dropped.
    #[rstest]
    fn fully_overlapping_read(#[values(true, false)] emit: bool) {
        let mut recv =
            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"something", 0, false);
        let second = RangeBuf::from(b"hello", 4, false);

        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        // Bytes 4..9 are already buffered; the chunk count does not change.
        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        assert_emit_discard(&mut recv, emit, 32, 9, false, Some(b"something"));
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 9);
        assert_eq!(recv.data.len(), 0);

        assert_emit_discard_done(&mut recv, emit);
    }
853
    // A larger chunk arriving after a chunk it covers up to its end: only
    // the uncovered head is stored and the earlier data is kept.
    #[rstest]
    fn fully_overlapping_read2(#[values(true, false)] emit: bool) {
        let mut recv =
            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"something", 0, false);
        let second = RangeBuf::from(b"hello", 4, false);

        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        // Only bytes 0..4 of `first` are new.
        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 2);

        assert_emit_discard(&mut recv, emit, 32, 9, false, Some(b"somehello"));
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 9);
        assert_eq!(recv.data.len(), 0);

        assert_emit_discard_done(&mut recv, emit);
    }
880
    // A larger chunk arriving after a chunk sitting in its middle is split
    // into the parts before and after the buffered chunk.
    #[rstest]
    fn fully_overlapping_read3(#[values(true, false)] emit: bool) {
        let mut recv =
            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"something", 0, false);
        let second = RangeBuf::from(b"hello", 3, false);

        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 8);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        // Bytes 0..3 and 8..9 of `first` are stored around the existing chunk.
        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 3);

        assert_emit_discard(&mut recv, emit, 32, 9, false, Some(b"somhellog"));
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 9);
        assert_eq!(recv.data.len(), 0);

        assert_emit_discard_done(&mut recv, emit);
    }
907
    // A large chunk covering two already buffered chunks is split into the
    // three pieces filling the space around them.
    #[rstest]
    fn fully_overlapping_read_multi(#[values(true, false)] emit: bool) {
        let mut recv =
            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"somethingsomething", 0, false);
        let second = RangeBuf::from(b"hello", 3, false);
        let third = RangeBuf::from(b"hello", 12, false);

        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 8);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        assert!(recv.write(third).is_ok());
        assert_eq!(recv.len, 17);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 2);

        // New pieces: 0..3, 8..12 and 17..18.
        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 18);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 5);

        assert_emit_discard(
            &mut recv,
            emit,
            32,
            18,
            false,
            Some(b"somhellogsomhellog"),
        );
        assert_eq!(recv.len, 18);
        assert_eq!(recv.off, 18);
        assert_eq!(recv.data.len(), 0);

        assert_emit_discard_done(&mut recv, emit);
    }
947
    // A chunk whose start overlaps buffered data only contributes the bytes
    // following that data.
    #[rstest]
    fn overlapping_start_read(#[values(true, false)] emit: bool) {
        let mut recv =
            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"something", 0, false);
        let second = RangeBuf::from(b"hello", 8, true);

        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        // Byte 8 is already buffered; only 9..13 ("ello") is stored.
        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 13);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 2);

        assert_emit_discard(
            &mut recv,
            emit,
            32,
            13,
            true,
            Some(b"somethingello"),
        );

        assert_eq!(recv.len, 13);
        assert_eq!(recv.off, 13);

        assert_emit_discard_done(&mut recv, emit);
    }
981
    // A chunk whose end overlaps buffered data only contributes the bytes
    // preceding that data.
    #[rstest]
    fn overlapping_end_read(#[values(true, false)] emit: bool) {
        let mut recv =
            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"hello", 0, false);
        let second = RangeBuf::from(b"something", 3, true);

        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 12);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        // Bytes 3..5 are already buffered; only 0..3 ("hel") is stored.
        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 12);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 2);

        assert_emit_discard(&mut recv, emit, 32, 12, true, Some(b"helsomething"));
        assert_eq!(recv.len, 12);
        assert_eq!(recv.off, 12);

        assert_emit_discard_done(&mut recv, emit);
    }
1007
    // A chunk spanning the whole stream fills every gap left between three
    // previously buffered chunks.
    #[rstest]
    fn overlapping_end_twice_read(#[values(true, false)] emit: bool) {
        let mut recv =
            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"he", 0, false);
        let second = RangeBuf::from(b"ow", 4, false);
        let third = RangeBuf::from(b"rl", 7, false);
        let fourth = RangeBuf::from(b"helloworld", 0, true);

        assert!(recv.write(third).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 2);

        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 3);

        // The gaps 2..4, 6..7 and 9..10 each get a new chunk.
        assert!(recv.write(fourth).is_ok());
        assert_eq!(recv.len, 10);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 6);

        assert_emit_discard(&mut recv, emit, 32, 10, true, Some(b"helloworld"));
        assert_eq!(recv.len, 10);
        assert_eq!(recv.off, 10);

        assert_emit_discard_done(&mut recv, emit);
    }
1045
    // A chunk overlapping several buffered chunks, one of which reaches the
    // end of the stream, only fills the gaps between them.
    #[rstest]
    fn overlapping_end_twice_and_contained_read(
        #[values(true, false)] emit: bool,
    ) {
        let mut recv =
            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"hellow", 0, false);
        let second = RangeBuf::from(b"barfoo", 10, true);
        let third = RangeBuf::from(b"rl", 7, false);
        let fourth = RangeBuf::from(b"elloworldbarfoo", 1, true);

        assert!(recv.write(third).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 16);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 2);

        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 16);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 3);

        // Only the gaps 6..7 and 9..10 get a new chunk.
        assert!(recv.write(fourth).is_ok());
        assert_eq!(recv.len, 16);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 5);

        assert_emit_discard(
            &mut recv,
            emit,
            32,
            16,
            true,
            Some(b"helloworldbarfoo"),
        );
        assert_eq!(recv.len, 16);
        assert_eq!(recv.off, 16);

        assert_emit_discard_done(&mut recv, emit);
    }
1092
    // Reordered chunks that partially overlap on both sides of a buffered
    // chunk: the data received first wins for the overlapping bytes.
    #[rstest]
    fn partially_multi_overlapping_reordered_read(
        #[values(true, false)] emit: bool,
    ) {
        let mut recv =
            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"hello", 8, false);
        let second = RangeBuf::from(b"something", 0, false);
        let third = RangeBuf::from(b"moar", 11, true);

        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 13);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        // Only 0..8 ("somethin") is new.
        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 13);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 2);

        // Only 13..15 ("ar") is new.
        assert!(recv.write(third).is_ok());
        assert_eq!(recv.len, 15);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 3);

        assert_emit_discard(
            &mut recv,
            emit,
            32,
            15,
            true,
            Some(b"somethinhelloar"),
        );
        assert_eq!(recv.len, 15);
        assert_eq!(recv.off, 15);
        assert_eq!(recv.data.len(), 0);

        assert_emit_discard_done(&mut recv, emit);
    }
1134
    // Six 3-byte chunks, each overlapping a neighbour, written out of order:
    // every later write only contributes the bytes not yet buffered.
    #[rstest]
    fn partially_multi_overlapping_reordered_read2(
        #[values(true, false)] emit: bool,
    ) {
        let mut recv =
            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"aaa", 0, false);
        let second = RangeBuf::from(b"bbb", 2, false);
        let third = RangeBuf::from(b"ccc", 4, false);
        let fourth = RangeBuf::from(b"ddd", 6, false);
        let fifth = RangeBuf::from(b"eee", 9, false);
        let sixth = RangeBuf::from(b"fff", 11, false);

        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 5);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        assert!(recv.write(fourth).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 2);

        // Overlaps `second` at its start and `fourth` at its end.
        assert!(recv.write(third).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 3);

        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 4);

        assert!(recv.write(sixth).is_ok());
        assert_eq!(recv.len, 14);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 5);

        assert!(recv.write(fifth).is_ok());
        assert_eq!(recv.len, 14);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 6);

        assert_emit_discard(
            &mut recv,
            emit,
            32,
            14,
            false,
            Some(b"aabbbcdddeefff"),
        );
        assert_eq!(recv.len, 14);
        assert_eq!(recv.off, 14);
        assert_eq!(recv.data.len(), 0);

        assert_emit_discard_done(&mut recv, emit);
    }
1194
    // Emit and discard can be freely interleaved on the same buffer; both
    // advance the same read offset.
    #[test]
    fn mixed_read_actions() {
        let mut recv =
            RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"hello", 0, false);
        let second = RangeBuf::from(b"world", 5, false);
        let third = RangeBuf::from(b"something", 10, true);

        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 10);
        assert_eq!(recv.off, 0);

        // Nothing at offset 0 yet, for either action.
        assert_emit_discard_done(&mut recv, true);
        assert_emit_discard_done(&mut recv, false);

        assert!(recv.write(third).is_ok());
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 0);

        assert_emit_discard_done(&mut recv, true);
        assert_emit_discard_done(&mut recv, false);

        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 0);

        // Emit "hello", discard "world", then emit the rest.
        assert_emit_discard(&mut recv, true, 5, 5, false, Some(b"hello"));
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 5);

        assert_emit_discard(&mut recv, false, 5, 5, false, None);
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 10);

        assert_emit_discard(&mut recv, true, 9, 9, true, Some(b"something"));
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 19);

        assert_emit_discard_done(&mut recv, true);
        assert_emit_discard_done(&mut recv, false);
    }
1238}