1use std::cmp;
28
29use std::collections::BTreeMap;
30use std::collections::VecDeque;
31
32use std::time::Duration;
33use std::time::Instant;
34
35use crate::stream::RecvBufResetReturn;
36use crate::Error;
37use crate::Result;
38
39use crate::flowcontrol;
40
41use crate::range_buf::RangeBuf;
42
43use super::DEFAULT_STREAM_WINDOW;
44
/// Receive-side buffer for a single stream.
///
/// Incoming chunks may arrive out of order and may overlap; they are kept
/// ordered so that they can be handed out contiguously starting at the
/// current read offset.
#[derive(Debug, Default)]
pub struct RecvBuf {
    /// Buffered chunks, keyed by the maximum offset of each chunk.
    data: BTreeMap<u64, RangeBuf>,

    /// Current read offset: advanced as data is emitted, and moved forward
    /// directly when the stream is reset or shut down.
    off: u64,

    /// Highest offset seen so far across all accepted chunks.
    len: u64,

    /// Flow-control state for this buffer; also the source of the limit that
    /// incoming data is validated against.
    flow_control: flowcontrol::FlowControl,

    /// Final offset of the stream, recorded once a chunk carrying the fin
    /// flag (or a reset) has been accepted.
    fin_off: Option<u64>,

    /// Error code recorded when the stream was reset, if any.
    error: Option<u64>,

    /// Set by `shutdown()`; while set, incoming data is accounted for but
    /// not stored.
    drain: bool,
}
74
75impl RecvBuf {
76 pub fn new(max_data: u64, max_window: u64) -> RecvBuf {
78 RecvBuf {
79 flow_control: flowcontrol::FlowControl::new(
80 max_data,
81 cmp::min(max_data, DEFAULT_STREAM_WINDOW),
82 max_window,
83 ),
84 ..RecvBuf::default()
85 }
86 }
87
88 pub fn write(&mut self, buf: RangeBuf) -> Result<()> {
94 if buf.max_off() > self.max_data() {
95 return Err(Error::FlowControl);
96 }
97
98 if let Some(fin_off) = self.fin_off {
99 if buf.max_off() > fin_off {
101 return Err(Error::FinalSize);
102 }
103
104 if buf.fin() && fin_off != buf.max_off() {
106 return Err(Error::FinalSize);
107 }
108 }
109
110 if buf.fin() && buf.max_off() < self.len {
112 return Err(Error::FinalSize);
113 }
114
115 if self.fin_off.is_some() && buf.is_empty() {
118 return Ok(());
119 }
120
121 if buf.fin() {
122 self.fin_off = Some(buf.max_off());
123 }
124
125 if !buf.fin() && buf.is_empty() {
127 return Ok(());
128 }
129
130 if self.off >= buf.max_off() {
133 if !buf.is_empty() {
139 return Ok(());
140 }
141 }
142
143 let mut tmp_bufs = VecDeque::with_capacity(2);
144 tmp_bufs.push_back(buf);
145
146 'tmp: while let Some(mut buf) = tmp_bufs.pop_front() {
147 if self.off_front() > buf.off() {
153 buf = buf.split_off((self.off_front() - buf.off()) as usize);
154 }
155
156 if buf.off() < self.max_off() || buf.is_empty() {
163 for (_, b) in self.data.range(buf.off()..) {
164 let off = buf.off();
165
166 if b.off() > buf.max_off() {
168 break;
169 }
170
171 if off >= b.off() && buf.max_off() <= b.max_off() {
173 continue 'tmp;
174 }
175
176 if off >= b.off() && off < b.max_off() {
178 buf = buf.split_off((b.max_off() - off) as usize);
179 }
180
181 if off < b.off() && buf.max_off() > b.off() {
183 tmp_bufs
184 .push_back(buf.split_off((b.off() - off) as usize));
185 }
186 }
187 }
188
189 self.len = cmp::max(self.len, buf.max_off());
190
191 if !self.drain {
192 self.data.insert(buf.max_off(), buf);
193 } else {
194 self.off = self.len;
196 }
197 }
198
199 Ok(())
200 }
201
202 pub fn emit(&mut self, out: &mut [u8]) -> Result<(usize, bool)> {
212 let mut len = 0;
213 let mut cap = out.len();
214
215 if !self.ready() {
216 return Err(Error::Done);
217 }
218
219 if let Some(e) = self.error {
222 self.data.clear();
223 return Err(Error::StreamReset(e));
224 }
225
226 while cap > 0 && self.ready() {
227 let mut entry = match self.data.first_entry() {
228 Some(entry) => entry,
229 None => break,
230 };
231
232 let buf = entry.get_mut();
233
234 let buf_len = cmp::min(buf.len(), cap);
235
236 out[len..len + buf_len].copy_from_slice(&buf[..buf_len]);
237
238 self.off += buf_len as u64;
239
240 len += buf_len;
241 cap -= buf_len;
242
243 if buf_len < buf.len() {
244 buf.consume(buf_len);
245
246 break;
248 }
249
250 entry.remove();
251 }
252
253 self.flow_control.add_consumed(len as u64);
255
256 Ok((len, self.is_fin()))
257 }
258
259 pub fn reset(
261 &mut self, error_code: u64, final_size: u64,
262 ) -> Result<RecvBufResetReturn> {
263 if let Some(fin_off) = self.fin_off {
265 if fin_off != final_size {
266 return Err(Error::FinalSize);
267 }
268 }
269
270 if final_size < self.len {
272 return Err(Error::FinalSize);
273 }
274
275 if self.error.is_some() {
276 return Ok(RecvBufResetReturn::zero());
278 }
279
280 let result = RecvBufResetReturn {
283 max_data_delta: final_size - self.len,
284 consumed_flowcontrol: final_size - self.off,
285 };
286
287 self.error = Some(error_code);
288
289 self.off = final_size;
291
292 self.data.clear();
293
294 let buf = RangeBuf::from(b"", final_size, true);
297 self.write(buf)?;
298
299 Ok(result)
300 }
301
302 pub fn update_max_data(&mut self, now: Instant) {
304 self.flow_control.update_max_data(now);
305 }
306
307 pub fn max_data_next(&mut self) -> u64 {
309 self.flow_control.max_data_next()
310 }
311
312 pub fn max_data(&self) -> u64 {
314 self.flow_control.max_data()
315 }
316
317 pub fn window(&self) -> u64 {
319 self.flow_control.window()
320 }
321
322 pub fn autotune_window(&mut self, now: Instant, rtt: Duration) {
324 self.flow_control.autotune_window(now, rtt);
325 }
326
327 pub fn shutdown(&mut self) -> Result<u64> {
331 if self.drain {
332 return Err(Error::Done);
333 }
334
335 self.drain = true;
336
337 self.data.clear();
338
339 let consumed = self.max_off() - self.off;
340 self.off = self.max_off();
341
342 Ok(consumed)
343 }
344
345 pub fn off_front(&self) -> u64 {
347 self.off
348 }
349
350 pub fn almost_full(&self) -> bool {
352 self.fin_off.is_none() && self.flow_control.should_update_max_data()
353 }
354
355 pub fn max_off(&self) -> u64 {
357 self.len
358 }
359
360 pub fn is_fin(&self) -> bool {
365 if self.fin_off == Some(self.off) {
366 return true;
367 }
368
369 false
370 }
371
372 pub fn is_draining(&self) -> bool {
374 self.drain
375 }
376
377 pub fn ready(&self) -> bool {
379 let (_, buf) = match self.data.first_key_value() {
380 Some(v) => v,
381 None => return false,
382 };
383
384 buf.off() == self.off
385 }
386}
387
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a buffer with the given flow-control limit and checks that it
    /// starts out with nothing received.
    #[track_caller]
    fn new_recv(max_data: u64) -> RecvBuf {
        let recv = RecvBuf::new(max_data, DEFAULT_STREAM_WINDOW);
        assert_eq!(recv.len, 0);
        recv
    }

    /// Checks the highest received offset and the read offset.
    #[track_caller]
    fn assert_offsets(recv: &RecvBuf, len: u64, off: u64) {
        assert_eq!(recv.len, len);
        assert_eq!(recv.off, off);
    }

    /// Checks both offsets plus the number of buffered chunks.
    #[track_caller]
    fn assert_state(recv: &RecvBuf, len: u64, off: u64, chunks: usize) {
        assert_offsets(recv, len, off);
        assert_eq!(recv.data.len(), chunks);
    }

    /// Writes a chunk and expects it to be accepted.
    #[track_caller]
    fn write_ok(recv: &mut RecvBuf, data: &[u8], off: u64, fin: bool) {
        assert!(recv.write(RangeBuf::from(data, off, fin)).is_ok());
    }

    /// Writes a chunk and expects it to be rejected with `err`.
    #[track_caller]
    fn write_err(
        recv: &mut RecvBuf, data: &[u8], off: u64, fin: bool, err: Error,
    ) {
        assert_eq!(recv.write(RangeBuf::from(data, off, fin)), Err(err));
    }

    /// Reads into `out` and checks the length, the fin flag and the bytes.
    #[track_caller]
    fn assert_emit(
        recv: &mut RecvBuf, out: &mut [u8], expected: &[u8], fin: bool,
    ) {
        let (len, got_fin) = recv.emit(out).unwrap();
        assert_eq!(len, expected.len());
        assert_eq!(got_fin, fin);
        assert_eq!(&out[..len], expected);
    }

    /// Expects that nothing can be read right now.
    #[track_caller]
    fn assert_done(recv: &mut RecvBuf, out: &mut [u8]) {
        assert_eq!(recv.emit(out), Err(Error::Done));
    }

    #[test]
    fn empty_read() {
        let mut recv = new_recv(u64::MAX);
        let mut buf = [0u8; 32];

        assert_done(&mut recv, &mut buf);
    }

    #[test]
    fn empty_stream_frame() {
        let mut recv = new_recv(15);
        let mut buf = [0u8; 32];

        write_ok(&mut recv, b"hello", 0, false);
        assert_state(&recv, 5, 0, 1);

        assert_eq!(recv.emit(&mut buf), Ok((5, false)));

        // Empty chunk without fin: accepted but not stored.
        write_ok(&mut recv, b"", 10, false);
        assert_state(&recv, 5, 5, 0);

        // Empty chunk beyond the flow-control limit.
        write_err(&mut recv, b"", 16, false, Error::FlowControl);

        // Empty chunk carrying fin: stored.
        write_ok(&mut recv, b"", 5, true);
        assert_state(&recv, 5, 5, 1);

        // The same fin again changes nothing.
        write_ok(&mut recv, b"", 5, true);
        assert_state(&recv, 5, 5, 1);

        // Non-empty fin chunk ending at the same final offset.
        write_ok(&mut recv, b"aa", 3, true);
        assert_state(&recv, 5, 5, 1);

        // Fin chunks with a different final offset are rejected.
        write_err(&mut recv, b"", 6, true, Error::FinalSize);
        write_err(&mut recv, b"", 4, true, Error::FinalSize);

        assert_eq!(recv.emit(&mut buf), Ok((0, true)));
    }

    #[test]
    fn ordered_read() {
        let mut recv = new_recv(u64::MAX);
        let mut buf = [0u8; 32];

        write_ok(&mut recv, b"world", 5, false);
        assert_offsets(&recv, 10, 0);

        assert_done(&mut recv, &mut buf);

        write_ok(&mut recv, b"something", 10, true);
        assert_offsets(&recv, 19, 0);

        assert_done(&mut recv, &mut buf);

        write_ok(&mut recv, b"hello", 0, false);
        assert_offsets(&recv, 19, 0);

        assert_emit(&mut recv, &mut buf, b"helloworldsomething", true);
        assert_offsets(&recv, 19, 19);

        assert_done(&mut recv, &mut buf);
    }

    #[test]
    fn shutdown() {
        let mut recv = new_recv(u64::MAX);
        let mut buf = [0u8; 32];

        write_ok(&mut recv, b"world", 5, false);
        assert_offsets(&recv, 10, 0);

        assert_done(&mut recv, &mut buf);

        assert_eq!(recv.shutdown(), Ok(10));
        assert_state(&recv, 10, 10, 0);

        assert_done(&mut recv, &mut buf);

        // Data below the read offset is accepted but changes nothing.
        write_ok(&mut recv, b"hello", 0, false);
        assert_state(&recv, 10, 10, 0);

        // New data is accounted for but not stored while draining.
        write_ok(&mut recv, b"something", 10, false);
        assert_state(&recv, 19, 19, 0);

        assert_done(&mut recv, &mut buf);

        assert_eq!(
            recv.reset(42, 123),
            Ok(RecvBufResetReturn {
                max_data_delta: 104,
                consumed_flowcontrol: 104,
            })
        );
        assert_state(&recv, 123, 123, 0);

        assert_done(&mut recv, &mut buf);
    }

    #[test]
    fn split_read() {
        let mut recv = new_recv(u64::MAX);
        let mut buf = [0u8; 32];

        write_ok(&mut recv, b"something", 0, false);
        assert_offsets(&recv, 9, 0);

        write_ok(&mut recv, b"helloworld", 9, true);
        assert_offsets(&recv, 19, 0);

        assert_emit(&mut recv, &mut buf[..10], b"somethingh", false);
        assert_offsets(&recv, 19, 10);

        assert_emit(&mut recv, &mut buf[..5], b"ellow", false);
        assert_offsets(&recv, 19, 15);

        assert_emit(&mut recv, &mut buf[..10], b"orld", true);
        assert_offsets(&recv, 19, 19);
    }

    #[test]
    fn incomplete_read() {
        let mut recv = new_recv(u64::MAX);
        let mut buf = [0u8; 32];

        write_ok(&mut recv, b"helloworld", 9, true);
        assert_offsets(&recv, 19, 0);

        assert_done(&mut recv, &mut buf);

        write_ok(&mut recv, b"something", 0, false);
        assert_offsets(&recv, 19, 0);

        assert_emit(&mut recv, &mut buf, b"somethinghelloworld", true);
        assert_offsets(&recv, 19, 19);
    }

    #[test]
    fn zero_len_read() {
        let mut recv = new_recv(u64::MAX);
        let mut buf = [0u8; 32];

        write_ok(&mut recv, b"something", 0, false);
        assert_state(&recv, 9, 0, 1);

        write_ok(&mut recv, b"", 9, true);
        assert_state(&recv, 9, 0, 1);

        assert_emit(&mut recv, &mut buf, b"something", true);
        assert_offsets(&recv, 9, 9);
    }

    #[test]
    fn past_read() {
        let mut recv = new_recv(u64::MAX);
        let mut buf = [0u8; 32];

        write_ok(&mut recv, b"something", 0, false);
        assert_state(&recv, 9, 0, 1);

        assert_emit(&mut recv, &mut buf, b"something", false);
        assert_offsets(&recv, 9, 9);

        write_ok(&mut recv, b"hello", 3, false);
        assert_state(&recv, 9, 9, 0);

        write_err(&mut recv, b"ello", 4, true, Error::FinalSize);

        write_ok(&mut recv, b"ello", 5, true);
        assert_state(&recv, 9, 9, 0);

        assert_done(&mut recv, &mut buf);
    }

    #[test]
    fn fully_overlapping_read() {
        let mut recv = new_recv(u64::MAX);
        let mut buf = [0u8; 32];

        write_ok(&mut recv, b"something", 0, false);
        assert_state(&recv, 9, 0, 1);

        write_ok(&mut recv, b"hello", 4, false);
        assert_state(&recv, 9, 0, 1);

        assert_emit(&mut recv, &mut buf, b"something", false);
        assert_state(&recv, 9, 9, 0);

        assert_done(&mut recv, &mut buf);
    }

    #[test]
    fn fully_overlapping_read2() {
        let mut recv = new_recv(u64::MAX);
        let mut buf = [0u8; 32];

        write_ok(&mut recv, b"hello", 4, false);
        assert_state(&recv, 9, 0, 1);

        write_ok(&mut recv, b"something", 0, false);
        assert_state(&recv, 9, 0, 2);

        assert_emit(&mut recv, &mut buf, b"somehello", false);
        assert_state(&recv, 9, 9, 0);

        assert_done(&mut recv, &mut buf);
    }

    #[test]
    fn fully_overlapping_read3() {
        let mut recv = new_recv(u64::MAX);
        let mut buf = [0u8; 32];

        write_ok(&mut recv, b"hello", 3, false);
        assert_state(&recv, 8, 0, 1);

        write_ok(&mut recv, b"something", 0, false);
        assert_state(&recv, 9, 0, 3);

        assert_emit(&mut recv, &mut buf, b"somhellog", false);
        assert_state(&recv, 9, 9, 0);

        assert_done(&mut recv, &mut buf);
    }

    #[test]
    fn fully_overlapping_read_multi() {
        let mut recv = new_recv(u64::MAX);
        let mut buf = [0u8; 32];

        write_ok(&mut recv, b"hello", 3, false);
        assert_state(&recv, 8, 0, 1);

        write_ok(&mut recv, b"hello", 12, false);
        assert_state(&recv, 17, 0, 2);

        write_ok(&mut recv, b"somethingsomething", 0, false);
        assert_state(&recv, 18, 0, 5);

        assert_emit(&mut recv, &mut buf, b"somhellogsomhellog", false);
        assert_state(&recv, 18, 18, 0);

        assert_done(&mut recv, &mut buf);
    }

    #[test]
    fn overlapping_start_read() {
        let mut recv = new_recv(u64::MAX);
        let mut buf = [0u8; 32];

        write_ok(&mut recv, b"something", 0, false);
        assert_state(&recv, 9, 0, 1);

        write_ok(&mut recv, b"hello", 8, true);
        assert_state(&recv, 13, 0, 2);

        assert_emit(&mut recv, &mut buf, b"somethingello", true);
        assert_offsets(&recv, 13, 13);

        assert_done(&mut recv, &mut buf);
    }

    #[test]
    fn overlapping_end_read() {
        let mut recv = new_recv(u64::MAX);
        let mut buf = [0u8; 32];

        write_ok(&mut recv, b"something", 3, true);
        assert_state(&recv, 12, 0, 1);

        write_ok(&mut recv, b"hello", 0, false);
        assert_state(&recv, 12, 0, 2);

        assert_emit(&mut recv, &mut buf, b"helsomething", true);
        assert_offsets(&recv, 12, 12);

        assert_done(&mut recv, &mut buf);
    }

    #[test]
    fn overlapping_end_twice_read() {
        let mut recv = new_recv(u64::MAX);
        let mut buf = [0u8; 32];

        write_ok(&mut recv, b"rl", 7, false);
        assert_state(&recv, 9, 0, 1);

        write_ok(&mut recv, b"ow", 4, false);
        assert_state(&recv, 9, 0, 2);

        write_ok(&mut recv, b"he", 0, false);
        assert_state(&recv, 9, 0, 3);

        write_ok(&mut recv, b"helloworld", 0, true);
        assert_state(&recv, 10, 0, 6);

        assert_emit(&mut recv, &mut buf, b"helloworld", true);
        assert_offsets(&recv, 10, 10);

        assert_done(&mut recv, &mut buf);
    }

    #[test]
    fn overlapping_end_twice_and_contained_read() {
        let mut recv = new_recv(u64::MAX);
        let mut buf = [0u8; 32];

        write_ok(&mut recv, b"rl", 7, false);
        assert_state(&recv, 9, 0, 1);

        write_ok(&mut recv, b"barfoo", 10, true);
        assert_state(&recv, 16, 0, 2);

        write_ok(&mut recv, b"hellow", 0, false);
        assert_state(&recv, 16, 0, 3);

        write_ok(&mut recv, b"elloworldbarfoo", 1, true);
        assert_state(&recv, 16, 0, 5);

        assert_emit(&mut recv, &mut buf, b"helloworldbarfoo", true);
        assert_offsets(&recv, 16, 16);

        assert_done(&mut recv, &mut buf);
    }

    #[test]
    fn partially_multi_overlapping_reordered_read() {
        let mut recv = new_recv(u64::MAX);
        let mut buf = [0u8; 32];

        write_ok(&mut recv, b"hello", 8, false);
        assert_state(&recv, 13, 0, 1);

        write_ok(&mut recv, b"something", 0, false);
        assert_state(&recv, 13, 0, 2);

        write_ok(&mut recv, b"moar", 11, true);
        assert_state(&recv, 15, 0, 3);

        assert_emit(&mut recv, &mut buf, b"somethinhelloar", true);
        assert_state(&recv, 15, 15, 0);

        assert_done(&mut recv, &mut buf);
    }

    #[test]
    fn partially_multi_overlapping_reordered_read2() {
        let mut recv = new_recv(u64::MAX);
        let mut buf = [0u8; 32];

        write_ok(&mut recv, b"bbb", 2, false);
        assert_state(&recv, 5, 0, 1);

        write_ok(&mut recv, b"ddd", 6, false);
        assert_state(&recv, 9, 0, 2);

        write_ok(&mut recv, b"ccc", 4, false);
        assert_state(&recv, 9, 0, 3);

        write_ok(&mut recv, b"aaa", 0, false);
        assert_state(&recv, 9, 0, 4);

        write_ok(&mut recv, b"fff", 11, false);
        assert_state(&recv, 14, 0, 5);

        write_ok(&mut recv, b"eee", 9, false);
        assert_state(&recv, 14, 0, 6);

        assert_emit(&mut recv, &mut buf, b"aabbbcdddeefff", false);
        assert_state(&recv, 14, 14, 0);

        assert_done(&mut recv, &mut buf);
    }
}