1use std::cmp;
28
29use std::collections::BTreeMap;
30use std::collections::VecDeque;
31
32use std::time::Duration;
33use std::time::Instant;
34
35use crate::stream::RecvAction;
36use crate::stream::RecvBufResetReturn;
37use crate::Error;
38use crate::Result;
39
40use crate::flowcontrol;
41
42use crate::range_buf::RangeBuf;
43
44use super::DEFAULT_STREAM_WINDOW;
45
46#[derive(Debug, Default)]
52pub struct RecvBuf {
53 data: BTreeMap<u64, RangeBuf>,
56
57 off: u64,
59
60 len: u64,
62
63 flow_control: flowcontrol::FlowControl,
65
66 fin_off: Option<u64>,
68
69 error: Option<u64>,
71
72 drain: bool,
74}
75
76impl RecvBuf {
77 pub fn new(max_data: u64, max_window: u64) -> RecvBuf {
79 RecvBuf {
80 flow_control: flowcontrol::FlowControl::new(
81 max_data,
82 cmp::min(max_data, DEFAULT_STREAM_WINDOW),
83 max_window,
84 ),
85 ..RecvBuf::default()
86 }
87 }
88
89 pub fn write(&mut self, buf: RangeBuf) -> Result<()> {
95 if buf.max_off() > self.max_data() {
96 return Err(Error::FlowControl);
97 }
98
99 if let Some(fin_off) = self.fin_off {
100 if buf.max_off() > fin_off {
102 return Err(Error::FinalSize);
103 }
104
105 if buf.fin() && fin_off != buf.max_off() {
107 return Err(Error::FinalSize);
108 }
109 }
110
111 if buf.fin() && buf.max_off() < self.len {
113 return Err(Error::FinalSize);
114 }
115
116 if self.fin_off.is_some() && buf.is_empty() {
119 return Ok(());
120 }
121
122 if buf.fin() {
123 self.fin_off = Some(buf.max_off());
124 }
125
126 if !buf.fin() && buf.is_empty() {
128 return Ok(());
129 }
130
131 if self.off >= buf.max_off() {
134 if !buf.is_empty() {
140 return Ok(());
141 }
142 }
143
144 let mut tmp_bufs = VecDeque::with_capacity(2);
145 tmp_bufs.push_back(buf);
146
147 'tmp: while let Some(mut buf) = tmp_bufs.pop_front() {
148 if self.off_front() > buf.off() {
154 buf = buf.split_off((self.off_front() - buf.off()) as usize);
155 }
156
157 if buf.off() < self.max_off() || buf.is_empty() {
164 for (_, b) in self.data.range(buf.off()..) {
165 let off = buf.off();
166
167 if b.off() > buf.max_off() {
169 break;
170 }
171
172 if off >= b.off() && buf.max_off() <= b.max_off() {
174 continue 'tmp;
175 }
176
177 if off >= b.off() && off < b.max_off() {
179 buf = buf.split_off((b.max_off() - off) as usize);
180 }
181
182 if off < b.off() && buf.max_off() > b.off() {
184 tmp_bufs
185 .push_back(buf.split_off((b.off() - off) as usize));
186 }
187 }
188 }
189
190 self.len = cmp::max(self.len, buf.max_off());
191
192 if !self.drain {
193 self.data.insert(buf.max_off(), buf);
194 } else {
195 self.off = self.len;
197 }
198 }
199
200 Ok(())
201 }
202
203 pub fn emit(&mut self, out: &mut [u8]) -> Result<(usize, bool)> {
214 self.emit_or_discard(RecvAction::Emit { out })
215 }
216
217 pub fn emit_or_discard(
232 &mut self, mut action: RecvAction,
233 ) -> Result<(usize, bool)> {
234 let mut len = 0;
235 let mut cap = match &action {
236 RecvAction::Emit { out } => out.len(),
237 RecvAction::Discard { len } => *len,
238 };
239
240 if !self.ready() {
241 return Err(Error::Done);
242 }
243
244 if let Some(e) = self.error {
247 self.data.clear();
248 return Err(Error::StreamReset(e));
249 }
250
251 while cap > 0 && self.ready() {
252 let mut entry = match self.data.first_entry() {
253 Some(entry) => entry,
254 None => break,
255 };
256
257 let buf = entry.get_mut();
258
259 let buf_len = cmp::min(buf.len(), cap);
260
261 if let RecvAction::Emit { ref mut out } = action {
263 out[len..len + buf_len].copy_from_slice(&buf[..buf_len]);
264 }
265
266 self.off += buf_len as u64;
267
268 len += buf_len;
269 cap -= buf_len;
270
271 if buf_len < buf.len() {
272 buf.consume(buf_len);
273
274 break;
276 }
277
278 entry.remove();
279 }
280
281 self.flow_control.add_consumed(len as u64);
283
284 Ok((len, self.is_fin()))
285 }
286
287 pub fn reset(
289 &mut self, error_code: u64, final_size: u64,
290 ) -> Result<RecvBufResetReturn> {
291 if let Some(fin_off) = self.fin_off {
293 if fin_off != final_size {
294 return Err(Error::FinalSize);
295 }
296 }
297
298 if final_size < self.len {
300 return Err(Error::FinalSize);
301 }
302
303 if self.error.is_some() {
304 return Ok(RecvBufResetReturn::zero());
306 }
307
308 let result = RecvBufResetReturn {
311 max_data_delta: final_size - self.len,
312 consumed_flowcontrol: final_size - self.off,
313 };
314
315 self.error = Some(error_code);
316
317 self.off = final_size;
319
320 self.data.clear();
321
322 let buf = RangeBuf::from(b"", final_size, true);
325 self.write(buf)?;
326
327 Ok(result)
328 }
329
330 pub fn update_max_data(&mut self, now: Instant) {
332 self.flow_control.update_max_data(now);
333 }
334
335 pub fn max_data_next(&mut self) -> u64 {
337 self.flow_control.max_data_next()
338 }
339
340 pub fn max_data(&self) -> u64 {
342 self.flow_control.max_data()
343 }
344
345 pub fn window(&self) -> u64 {
347 self.flow_control.window()
348 }
349
350 pub fn autotune_window(&mut self, now: Instant, rtt: Duration) {
352 self.flow_control.autotune_window(now, rtt);
353 }
354
355 pub fn shutdown(&mut self) -> Result<u64> {
359 if self.drain {
360 return Err(Error::Done);
361 }
362
363 self.drain = true;
364
365 self.data.clear();
366
367 let consumed = self.max_off() - self.off;
368 self.off = self.max_off();
369
370 Ok(consumed)
371 }
372
373 pub fn off_front(&self) -> u64 {
375 self.off
376 }
377
378 pub fn almost_full(&self) -> bool {
380 self.fin_off.is_none() && self.flow_control.should_update_max_data()
381 }
382
383 pub fn max_off(&self) -> u64 {
385 self.len
386 }
387
388 pub fn is_fin(&self) -> bool {
393 if self.fin_off == Some(self.off) {
394 return true;
395 }
396
397 false
398 }
399
400 pub fn is_draining(&self) -> bool {
402 self.drain
403 }
404
405 pub fn ready(&self) -> bool {
407 let (_, buf) = match self.data.first_key_value() {
408 Some(v) => v,
409 None => return false,
410 };
411
412 buf.off() == self.off
413 }
414}
415
#[cfg(test)]
mod tests {
    use super::*;
    use rstest::rstest;

    // Most tests run twice, once emitting into a slice and once discarding,
    // since both paths must move offsets identically.

    // Runs one successful emit (`emit == true`) or discard of up to
    // `target_len` bytes and checks the number of bytes consumed and the fin
    // flag. When emitting and `test_bytes` is given, the bytes read are
    // compared as well.
    fn assert_emit_discard(
        recv: &mut RecvBuf, emit: bool, target_len: usize, result_len: usize,
        is_fin: bool, test_bytes: Option<&[u8]>,
    ) {
        let mut buf = [0; 32];
        let action = if emit {
            RecvAction::Emit {
                out: &mut buf[..target_len],
            }
        } else {
            RecvAction::Discard { len: target_len }
        };

        let (read, fin) = recv.emit_or_discard(action).unwrap();

        // Content can only be verified when it was actually copied out.
        if let (true, Some(expected)) = (emit, test_bytes) {
            assert_eq!(&buf[..read], expected);
        }

        assert_eq!(read, result_len);
        assert_eq!(is_fin, fin);
    }

    // Checks that an emit or discard attempt reports `Error::Done`.
    fn assert_emit_discard_done(recv: &mut RecvBuf, emit: bool) {
        let mut buf = [0; 32];
        let action = if emit {
            RecvAction::Emit { out: &mut buf }
        } else {
            RecvAction::Discard { len: 32 }
        };
        assert_eq!(recv.emit_or_discard(action), Err(Error::Done));
    }

    #[rstest]
    fn empty_read(#[values(true, false)] emit: bool) {
        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        assert_emit_discard_done(&mut recv, emit);
    }

    #[rstest]
    fn empty_stream_frame(#[values(true, false)] emit: bool) {
        let mut recv = RecvBuf::new(15, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let buf = RangeBuf::from(b"hello", 0, false);
        assert!(recv.write(buf).is_ok());
        assert_eq!(recv.len, 5);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        assert_emit_discard(&mut recv, emit, 32, 5, false, None);

        // An empty chunk without fin is accepted but not stored.
        let buf = RangeBuf::from(b"", 10, false);
        assert!(recv.write(buf).is_ok());
        assert_eq!(recv.len, 5);
        assert_eq!(recv.off, 5);
        assert_eq!(recv.data.len(), 0);

        // An empty chunk past the flow control limit is still rejected.
        let buf = RangeBuf::from(b"", 16, false);
        assert_eq!(recv.write(buf), Err(Error::FlowControl));

        // An empty fin chunk is stored so the reader can observe the fin.
        let buf = RangeBuf::from(b"", 5, true);
        assert!(recv.write(buf).is_ok());
        assert_eq!(recv.len, 5);
        assert_eq!(recv.off, 5);
        assert_eq!(recv.data.len(), 1);

        // A duplicate empty fin is accepted and changes nothing.
        let buf = RangeBuf::from(b"", 5, true);
        assert!(recv.write(buf).is_ok());
        assert_eq!(recv.len, 5);
        assert_eq!(recv.off, 5);
        assert_eq!(recv.data.len(), 1);

        // Already-read data ending at the same final size is accepted and
        // dropped.
        let buf = RangeBuf::from(b"aa", 3, true);
        assert!(recv.write(buf).is_ok());
        assert_eq!(recv.len, 5);
        assert_eq!(recv.off, 5);
        assert_eq!(recv.data.len(), 1);

        // A fin at any other offset contradicts the known final size.
        let buf = RangeBuf::from(b"", 6, true);
        assert_eq!(recv.write(buf), Err(Error::FinalSize));
        let buf = RangeBuf::from(b"", 4, true);
        assert_eq!(recv.write(buf), Err(Error::FinalSize));

        assert_emit_discard(&mut recv, emit, 32, 0, true, None);
    }

    #[rstest]
    fn ordered_read(#[values(true, false)] emit: bool) {
        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"hello", 0, false);
        let second = RangeBuf::from(b"world", 5, false);
        let third = RangeBuf::from(b"something", 10, true);

        // Chunks arrive out of order; nothing is readable until the chunk
        // at offset 0 shows up.
        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 10);
        assert_eq!(recv.off, 0);

        assert_emit_discard_done(&mut recv, emit);

        assert!(recv.write(third).is_ok());
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 0);

        assert_emit_discard_done(&mut recv, emit);

        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 0);

        assert_emit_discard(
            &mut recv,
            emit,
            32,
            19,
            true,
            Some(b"helloworldsomething"),
        );
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 19);

        assert_emit_discard_done(&mut recv, emit);
    }

    #[rstest]
    fn shutdown(#[values(true, false)] emit: bool) {
        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"hello", 0, false);
        let second = RangeBuf::from(b"world", 5, false);
        let third = RangeBuf::from(b"something", 10, false);

        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 10);
        assert_eq!(recv.off, 0);

        assert_emit_discard_done(&mut recv, emit);

        // Shutdown drops the buffered chunk and reports the unread bytes.
        assert_eq!(recv.shutdown(), Ok(10));
        assert_eq!(recv.len, 10);
        assert_eq!(recv.off, 10);
        assert_eq!(recv.data.len(), 0);

        assert_emit_discard_done(&mut recv, emit);

        // Data below the current offset is ignored.
        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 10);
        assert_eq!(recv.off, 10);
        assert_eq!(recv.data.len(), 0);

        // New data advances len and off together but is not stored.
        assert!(recv.write(third).is_ok());
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 19);
        assert_eq!(recv.data.len(), 0);

        assert_emit_discard_done(&mut recv, emit);

        // A reset after shutdown reports the bytes up to the final size and
        // still leaves nothing to read.
        assert_eq!(
            recv.reset(42, 123),
            Ok(RecvBufResetReturn {
                max_data_delta: 104,
                consumed_flowcontrol: 104,
            })
        );
        assert_eq!(recv.len, 123);
        assert_eq!(recv.off, 123);
        assert_eq!(recv.data.len(), 0);

        assert_emit_discard_done(&mut recv, emit);
    }

    #[rstest]
    fn split_read(#[values(true, false)] emit: bool) {
        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"something", 0, false);
        let second = RangeBuf::from(b"helloworld", 9, true);

        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);

        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 0);

        // Reads smaller than the buffered data stop mid-chunk and resume
        // where they left off.
        assert_emit_discard(&mut recv, emit, 10, 10, false, Some(b"somethingh"));
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 10);

        assert_emit_discard(&mut recv, emit, 5, 5, false, Some(b"ellow"));
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 15);

        assert_emit_discard(&mut recv, emit, 5, 4, true, Some(b"orld"));
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 19);
    }

    #[rstest]
    fn incomplete_read(#[values(true, false)] emit: bool) {
        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"something", 0, false);
        let second = RangeBuf::from(b"helloworld", 9, true);

        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 0);

        // The chunk at offset 0 is still missing, so nothing is readable.
        assert_emit_discard_done(&mut recv, emit);

        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 0);

        assert_emit_discard(
            &mut recv,
            emit,
            32,
            19,
            true,
            Some(b"somethinghelloworld"),
        );
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 19);
    }

    #[rstest]
    fn zero_len_read(#[values(true, false)] emit: bool) {
        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"something", 0, false);
        let second = RangeBuf::from(b"", 9, true);

        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        // The empty fin only records the final size; no new chunk is
        // stored.
        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        assert_emit_discard(&mut recv, emit, 32, 9, true, Some(b"something"));
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 9);
    }

    #[rstest]
    fn past_read(#[values(true, false)] emit: bool) {
        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"something", 0, false);
        let second = RangeBuf::from(b"hello", 3, false);
        let third = RangeBuf::from(b"ello", 4, true);
        let fourth = RangeBuf::from(b"ello", 5, true);

        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        assert_emit_discard(&mut recv, emit, 32, 9, false, Some(b"something"));
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 9);

        // Data entirely below the read offset is dropped.
        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 9);
        assert_eq!(recv.data.len(), 0);

        // A fin below the received length is a final size violation.
        assert_eq!(recv.write(third), Err(Error::FinalSize));

        // A fin exactly at the received length is fine; its data is old.
        assert!(recv.write(fourth).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 9);
        assert_eq!(recv.data.len(), 0);

        assert_emit_discard_done(&mut recv, emit);
    }

    #[rstest]
    fn fully_overlapping_read(#[values(true, false)] emit: bool) {
        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"something", 0, false);
        let second = RangeBuf::from(b"hello", 4, false);

        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        // The second chunk is fully covered by the first and is dropped.
        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        assert_emit_discard(&mut recv, emit, 32, 9, false, Some(b"something"));
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 9);
        assert_eq!(recv.data.len(), 0);

        assert_emit_discard_done(&mut recv, emit);
    }

    #[rstest]
    fn fully_overlapping_read2(#[values(true, false)] emit: bool) {
        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"something", 0, false);
        let second = RangeBuf::from(b"hello", 4, false);

        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        // Only the part of the larger chunk before the stored one is kept.
        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 2);

        assert_emit_discard(&mut recv, emit, 32, 9, false, Some(b"somehello"));
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 9);
        assert_eq!(recv.data.len(), 0);

        assert_emit_discard_done(&mut recv, emit);
    }

    #[rstest]
    fn fully_overlapping_read3(#[values(true, false)] emit: bool) {
        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"something", 0, false);
        let second = RangeBuf::from(b"hello", 3, false);

        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 8);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        // The larger chunk is split around the stored one: head and tail
        // are kept as separate chunks.
        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 3);

        assert_emit_discard(&mut recv, emit, 32, 9, false, Some(b"somhellog"));
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 9);
        assert_eq!(recv.data.len(), 0);

        assert_emit_discard_done(&mut recv, emit);
    }

    #[rstest]
    fn fully_overlapping_read_multi(#[values(true, false)] emit: bool) {
        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"somethingsomething", 0, false);
        let second = RangeBuf::from(b"hello", 3, false);
        let third = RangeBuf::from(b"hello", 12, false);

        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 8);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        assert!(recv.write(third).is_ok());
        assert_eq!(recv.len, 17);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 2);

        // The large chunk fills the three gaps around the two stored ones.
        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 18);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 5);

        assert_emit_discard(
            &mut recv,
            emit,
            32,
            18,
            false,
            Some(b"somhellogsomhellog"),
        );
        assert_eq!(recv.len, 18);
        assert_eq!(recv.off, 18);
        assert_eq!(recv.data.len(), 0);

        assert_emit_discard_done(&mut recv, emit);
    }

    #[rstest]
    fn overlapping_start_read(#[values(true, false)] emit: bool) {
        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"something", 0, false);
        let second = RangeBuf::from(b"hello", 8, true);

        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        // The first byte of the second chunk overlaps stored data and is
        // dropped.
        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 13);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 2);

        assert_emit_discard(
            &mut recv,
            emit,
            32,
            13,
            true,
            Some(b"somethingello"),
        );

        assert_eq!(recv.len, 13);
        assert_eq!(recv.off, 13);

        assert_emit_discard_done(&mut recv, emit);
    }

    #[rstest]
    fn overlapping_end_read(#[values(true, false)] emit: bool) {
        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"hello", 0, false);
        let second = RangeBuf::from(b"something", 3, true);

        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 12);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        // The tail of the first chunk overlaps stored data and is dropped.
        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 12);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 2);

        assert_emit_discard(&mut recv, emit, 32, 12, true, Some(b"helsomething"));
        assert_eq!(recv.len, 12);
        assert_eq!(recv.off, 12);

        assert_emit_discard_done(&mut recv, emit);
    }

    #[rstest]
    fn overlapping_end_twice_read(#[values(true, false)] emit: bool) {
        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"he", 0, false);
        let second = RangeBuf::from(b"ow", 4, false);
        let third = RangeBuf::from(b"rl", 7, false);
        let fourth = RangeBuf::from(b"helloworld", 0, true);

        assert!(recv.write(third).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 2);

        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 3);

        // The full chunk only contributes the three gaps between and after
        // the stored chunks.
        assert!(recv.write(fourth).is_ok());
        assert_eq!(recv.len, 10);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 6);

        assert_emit_discard(&mut recv, emit, 32, 10, true, Some(b"helloworld"));
        assert_eq!(recv.len, 10);
        assert_eq!(recv.off, 10);

        assert_emit_discard_done(&mut recv, emit);
    }

    #[rstest]
    fn overlapping_end_twice_and_contained_read(
        #[values(true, false)] emit: bool,
    ) {
        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"hellow", 0, false);
        let second = RangeBuf::from(b"barfoo", 10, true);
        let third = RangeBuf::from(b"rl", 7, false);
        let fourth = RangeBuf::from(b"elloworldbarfoo", 1, true);

        assert!(recv.write(third).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 16);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 2);

        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 16);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 3);

        // Only the two remaining gaps are filled by the last chunk.
        assert!(recv.write(fourth).is_ok());
        assert_eq!(recv.len, 16);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 5);

        assert_emit_discard(
            &mut recv,
            emit,
            32,
            16,
            true,
            Some(b"helloworldbarfoo"),
        );
        assert_eq!(recv.len, 16);
        assert_eq!(recv.off, 16);

        assert_emit_discard_done(&mut recv, emit);
    }

    #[rstest]
    fn partially_multi_overlapping_reordered_read(
        #[values(true, false)] emit: bool,
    ) {
        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"hello", 8, false);
        let second = RangeBuf::from(b"something", 0, false);
        let third = RangeBuf::from(b"moar", 11, true);

        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 13);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 13);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 2);

        assert!(recv.write(third).is_ok());
        assert_eq!(recv.len, 15);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 3);

        // Earlier data wins wherever chunks overlap.
        assert_emit_discard(
            &mut recv,
            emit,
            32,
            15,
            true,
            Some(b"somethinhelloar"),
        );
        assert_eq!(recv.len, 15);
        assert_eq!(recv.off, 15);
        assert_eq!(recv.data.len(), 0);

        assert_emit_discard_done(&mut recv, emit);
    }

    #[rstest]
    fn partially_multi_overlapping_reordered_read2(
        #[values(true, false)] emit: bool,
    ) {
        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"aaa", 0, false);
        let second = RangeBuf::from(b"bbb", 2, false);
        let third = RangeBuf::from(b"ccc", 4, false);
        let fourth = RangeBuf::from(b"ddd", 6, false);
        let fifth = RangeBuf::from(b"eee", 9, false);
        let sixth = RangeBuf::from(b"fff", 11, false);

        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 5);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 1);

        assert!(recv.write(fourth).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 2);

        assert!(recv.write(third).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 3);

        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 9);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 4);

        assert!(recv.write(sixth).is_ok());
        assert_eq!(recv.len, 14);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 5);

        assert!(recv.write(fifth).is_ok());
        assert_eq!(recv.len, 14);
        assert_eq!(recv.off, 0);
        assert_eq!(recv.data.len(), 6);

        assert_emit_discard(
            &mut recv,
            emit,
            32,
            14,
            false,
            Some(b"aabbbcdddeefff"),
        );
        assert_eq!(recv.len, 14);
        assert_eq!(recv.off, 14);
        assert_eq!(recv.data.len(), 0);

        assert_emit_discard_done(&mut recv, emit);
    }

    #[test]
    fn mixed_read_actions() {
        let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);

        let first = RangeBuf::from(b"hello", 0, false);
        let second = RangeBuf::from(b"world", 5, false);
        let third = RangeBuf::from(b"something", 10, true);

        assert!(recv.write(second).is_ok());
        assert_eq!(recv.len, 10);
        assert_eq!(recv.off, 0);

        assert_emit_discard_done(&mut recv, true);
        assert_emit_discard_done(&mut recv, false);

        assert!(recv.write(third).is_ok());
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 0);

        assert_emit_discard_done(&mut recv, true);
        assert_emit_discard_done(&mut recv, false);

        assert!(recv.write(first).is_ok());
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 0);

        // Emit and discard can be interleaved on the same buffer.
        assert_emit_discard(&mut recv, true, 5, 5, false, Some(b"hello"));
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 5);

        assert_emit_discard(&mut recv, false, 5, 5, false, None);
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 10);

        assert_emit_discard(&mut recv, true, 9, 9, true, Some(b"something"));
        assert_eq!(recv.len, 19);
        assert_eq!(recv.off, 19);

        assert_emit_discard_done(&mut recv, true);
        assert_emit_discard_done(&mut recv, false);
    }
}