1use std::cmp;
28
29use std::collections::BTreeMap;
30use std::collections::VecDeque;
31
32use std::time::Duration;
33use std::time::Instant;
34
35use crate::Error;
36use crate::Result;
37
38use crate::flowcontrol;
39
40use crate::range_buf::RangeBuf;
41
42use super::DEFAULT_STREAM_WINDOW;
43
44#[derive(Debug, Default)]
50pub struct RecvBuf {
51 data: BTreeMap<u64, RangeBuf>,
54
55 off: u64,
57
58 len: u64,
60
61 flow_control: flowcontrol::FlowControl,
63
64 fin_off: Option<u64>,
66
67 error: Option<u64>,
69
70 drain: bool,
72}
73
74impl RecvBuf {
75 pub fn new(max_data: u64, max_window: u64) -> RecvBuf {
77 RecvBuf {
78 flow_control: flowcontrol::FlowControl::new(
79 max_data,
80 cmp::min(max_data, DEFAULT_STREAM_WINDOW),
81 max_window,
82 ),
83 ..RecvBuf::default()
84 }
85 }
86
87 pub fn write(&mut self, buf: RangeBuf) -> Result<()> {
93 if buf.max_off() > self.max_data() {
94 return Err(Error::FlowControl);
95 }
96
97 if let Some(fin_off) = self.fin_off {
98 if buf.max_off() > fin_off {
100 return Err(Error::FinalSize);
101 }
102
103 if buf.fin() && fin_off != buf.max_off() {
105 return Err(Error::FinalSize);
106 }
107 }
108
109 if buf.fin() && buf.max_off() < self.len {
111 return Err(Error::FinalSize);
112 }
113
114 if self.fin_off.is_some() && buf.is_empty() {
117 return Ok(());
118 }
119
120 if buf.fin() {
121 self.fin_off = Some(buf.max_off());
122 }
123
124 if !buf.fin() && buf.is_empty() {
126 return Ok(());
127 }
128
129 if self.off >= buf.max_off() {
132 if !buf.is_empty() {
138 return Ok(());
139 }
140 }
141
142 let mut tmp_bufs = VecDeque::with_capacity(2);
143 tmp_bufs.push_back(buf);
144
145 'tmp: while let Some(mut buf) = tmp_bufs.pop_front() {
146 if self.off_front() > buf.off() {
152 buf = buf.split_off((self.off_front() - buf.off()) as usize);
153 }
154
155 if buf.off() < self.max_off() || buf.is_empty() {
162 for (_, b) in self.data.range(buf.off()..) {
163 let off = buf.off();
164
165 if b.off() > buf.max_off() {
167 break;
168 }
169
170 if off >= b.off() && buf.max_off() <= b.max_off() {
172 continue 'tmp;
173 }
174
175 if off >= b.off() && off < b.max_off() {
177 buf = buf.split_off((b.max_off() - off) as usize);
178 }
179
180 if off < b.off() && buf.max_off() > b.off() {
182 tmp_bufs
183 .push_back(buf.split_off((b.off() - off) as usize));
184 }
185 }
186 }
187
188 self.len = cmp::max(self.len, buf.max_off());
189
190 if !self.drain {
191 self.data.insert(buf.max_off(), buf);
192 }
193 }
194
195 Ok(())
196 }
197
198 pub fn emit(&mut self, out: &mut [u8]) -> Result<(usize, bool)> {
208 let mut len = 0;
209 let mut cap = out.len();
210
211 if !self.ready() {
212 return Err(Error::Done);
213 }
214
215 if let Some(e) = self.error {
218 self.data.clear();
219 return Err(Error::StreamReset(e));
220 }
221
222 while cap > 0 && self.ready() {
223 let mut entry = match self.data.first_entry() {
224 Some(entry) => entry,
225 None => break,
226 };
227
228 let buf = entry.get_mut();
229
230 let buf_len = cmp::min(buf.len(), cap);
231
232 out[len..len + buf_len].copy_from_slice(&buf[..buf_len]);
233
234 self.off += buf_len as u64;
235
236 len += buf_len;
237 cap -= buf_len;
238
239 if buf_len < buf.len() {
240 buf.consume(buf_len);
241
242 break;
244 }
245
246 entry.remove();
247 }
248
249 self.flow_control.add_consumed(len as u64);
251
252 Ok((len, self.is_fin()))
253 }
254
255 pub fn reset(&mut self, error_code: u64, final_size: u64) -> Result<usize> {
257 if let Some(fin_off) = self.fin_off {
259 if fin_off != final_size {
260 return Err(Error::FinalSize);
261 }
262 }
263
264 if final_size < self.len {
266 return Err(Error::FinalSize);
267 }
268
269 let max_data_delta = final_size - self.len;
272
273 if self.error.is_some() {
274 return Ok(max_data_delta as usize);
275 }
276
277 self.error = Some(error_code);
278
279 self.off = final_size;
281
282 self.data.clear();
283
284 let buf = RangeBuf::from(b"", final_size, true);
287 self.write(buf)?;
288
289 Ok(max_data_delta as usize)
290 }
291
292 pub fn update_max_data(&mut self, now: Instant) {
294 self.flow_control.update_max_data(now);
295 }
296
297 pub fn max_data_next(&mut self) -> u64 {
299 self.flow_control.max_data_next()
300 }
301
302 pub fn max_data(&self) -> u64 {
304 self.flow_control.max_data()
305 }
306
307 pub fn window(&self) -> u64 {
309 self.flow_control.window()
310 }
311
312 pub fn autotune_window(&mut self, now: Instant, rtt: Duration) {
314 self.flow_control.autotune_window(now, rtt);
315 }
316
317 pub fn shutdown(&mut self) -> Result<()> {
319 if self.drain {
320 return Err(Error::Done);
321 }
322
323 self.drain = true;
324
325 self.data.clear();
326
327 self.off = self.max_off();
328
329 Ok(())
330 }
331
332 pub fn off_front(&self) -> u64 {
334 self.off
335 }
336
337 pub fn almost_full(&self) -> bool {
339 self.fin_off.is_none() && self.flow_control.should_update_max_data()
340 }
341
342 pub fn max_off(&self) -> u64 {
344 self.len
345 }
346
347 pub fn is_fin(&self) -> bool {
352 if self.fin_off == Some(self.off) {
353 return true;
354 }
355
356 false
357 }
358
359 pub fn is_draining(&self) -> bool {
361 self.drain
362 }
363
364 pub fn ready(&self) -> bool {
366 let (_, buf) = match self.data.first_key_value() {
367 Some(v) => v,
368 None => return false,
369 };
370
371 buf.off() == self.off
372 }
373}
374
375#[cfg(test)]
376mod tests {
377 use super::*;
378
379 #[test]
380 fn empty_read() {
381 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
382 assert_eq!(recv.len, 0);
383
384 let mut buf = [0; 32];
385
386 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
387 }
388
389 #[test]
390 fn empty_stream_frame() {
391 let mut recv = RecvBuf::new(15, DEFAULT_STREAM_WINDOW);
392 assert_eq!(recv.len, 0);
393
394 let buf = RangeBuf::from(b"hello", 0, false);
395 assert!(recv.write(buf).is_ok());
396 assert_eq!(recv.len, 5);
397 assert_eq!(recv.off, 0);
398 assert_eq!(recv.data.len(), 1);
399
400 let mut buf = [0; 32];
401 assert_eq!(recv.emit(&mut buf), Ok((5, false)));
402
403 let buf = RangeBuf::from(b"", 10, false);
405 assert!(recv.write(buf).is_ok());
406 assert_eq!(recv.len, 5);
407 assert_eq!(recv.off, 5);
408 assert_eq!(recv.data.len(), 0);
409
410 let buf = RangeBuf::from(b"", 16, false);
412 assert_eq!(recv.write(buf), Err(Error::FlowControl));
413
414 let buf = RangeBuf::from(b"", 5, true);
416 assert!(recv.write(buf).is_ok());
417 assert_eq!(recv.len, 5);
418 assert_eq!(recv.off, 5);
419 assert_eq!(recv.data.len(), 1);
420
421 let buf = RangeBuf::from(b"", 5, true);
423 assert!(recv.write(buf).is_ok());
424 assert_eq!(recv.len, 5);
425 assert_eq!(recv.off, 5);
426 assert_eq!(recv.data.len(), 1);
427
428 let buf = RangeBuf::from(b"aa", 3, true);
430 assert!(recv.write(buf).is_ok());
431 assert_eq!(recv.len, 5);
432 assert_eq!(recv.off, 5);
433 assert_eq!(recv.data.len(), 1);
434
435 let buf = RangeBuf::from(b"", 6, true);
437 assert_eq!(recv.write(buf), Err(Error::FinalSize));
438 let buf = RangeBuf::from(b"", 4, true);
439 assert_eq!(recv.write(buf), Err(Error::FinalSize));
440
441 let mut buf = [0; 32];
442 assert_eq!(recv.emit(&mut buf), Ok((0, true)));
443 }
444
445 #[test]
446 fn ordered_read() {
447 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
448 assert_eq!(recv.len, 0);
449
450 let mut buf = [0; 32];
451
452 let first = RangeBuf::from(b"hello", 0, false);
453 let second = RangeBuf::from(b"world", 5, false);
454 let third = RangeBuf::from(b"something", 10, true);
455
456 assert!(recv.write(second).is_ok());
457 assert_eq!(recv.len, 10);
458 assert_eq!(recv.off, 0);
459
460 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
461
462 assert!(recv.write(third).is_ok());
463 assert_eq!(recv.len, 19);
464 assert_eq!(recv.off, 0);
465
466 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
467
468 assert!(recv.write(first).is_ok());
469 assert_eq!(recv.len, 19);
470 assert_eq!(recv.off, 0);
471
472 let (len, fin) = recv.emit(&mut buf).unwrap();
473 assert_eq!(len, 19);
474 assert!(fin);
475 assert_eq!(&buf[..len], b"helloworldsomething");
476 assert_eq!(recv.len, 19);
477 assert_eq!(recv.off, 19);
478
479 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
480 }
481
482 #[test]
483 fn split_read() {
484 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
485 assert_eq!(recv.len, 0);
486
487 let mut buf = [0; 32];
488
489 let first = RangeBuf::from(b"something", 0, false);
490 let second = RangeBuf::from(b"helloworld", 9, true);
491
492 assert!(recv.write(first).is_ok());
493 assert_eq!(recv.len, 9);
494 assert_eq!(recv.off, 0);
495
496 assert!(recv.write(second).is_ok());
497 assert_eq!(recv.len, 19);
498 assert_eq!(recv.off, 0);
499
500 let (len, fin) = recv.emit(&mut buf[..10]).unwrap();
501 assert_eq!(len, 10);
502 assert!(!fin);
503 assert_eq!(&buf[..len], b"somethingh");
504 assert_eq!(recv.len, 19);
505 assert_eq!(recv.off, 10);
506
507 let (len, fin) = recv.emit(&mut buf[..5]).unwrap();
508 assert_eq!(len, 5);
509 assert!(!fin);
510 assert_eq!(&buf[..len], b"ellow");
511 assert_eq!(recv.len, 19);
512 assert_eq!(recv.off, 15);
513
514 let (len, fin) = recv.emit(&mut buf[..10]).unwrap();
515 assert_eq!(len, 4);
516 assert!(fin);
517 assert_eq!(&buf[..len], b"orld");
518 assert_eq!(recv.len, 19);
519 assert_eq!(recv.off, 19);
520 }
521
522 #[test]
523 fn incomplete_read() {
524 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
525 assert_eq!(recv.len, 0);
526
527 let mut buf = [0; 32];
528
529 let first = RangeBuf::from(b"something", 0, false);
530 let second = RangeBuf::from(b"helloworld", 9, true);
531
532 assert!(recv.write(second).is_ok());
533 assert_eq!(recv.len, 19);
534 assert_eq!(recv.off, 0);
535
536 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
537
538 assert!(recv.write(first).is_ok());
539 assert_eq!(recv.len, 19);
540 assert_eq!(recv.off, 0);
541
542 let (len, fin) = recv.emit(&mut buf).unwrap();
543 assert_eq!(len, 19);
544 assert!(fin);
545 assert_eq!(&buf[..len], b"somethinghelloworld");
546 assert_eq!(recv.len, 19);
547 assert_eq!(recv.off, 19);
548 }
549
550 #[test]
551 fn zero_len_read() {
552 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
553 assert_eq!(recv.len, 0);
554
555 let mut buf = [0; 32];
556
557 let first = RangeBuf::from(b"something", 0, false);
558 let second = RangeBuf::from(b"", 9, true);
559
560 assert!(recv.write(first).is_ok());
561 assert_eq!(recv.len, 9);
562 assert_eq!(recv.off, 0);
563 assert_eq!(recv.data.len(), 1);
564
565 assert!(recv.write(second).is_ok());
566 assert_eq!(recv.len, 9);
567 assert_eq!(recv.off, 0);
568 assert_eq!(recv.data.len(), 1);
569
570 let (len, fin) = recv.emit(&mut buf).unwrap();
571 assert_eq!(len, 9);
572 assert!(fin);
573 assert_eq!(&buf[..len], b"something");
574 assert_eq!(recv.len, 9);
575 assert_eq!(recv.off, 9);
576 }
577
578 #[test]
579 fn past_read() {
580 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
581 assert_eq!(recv.len, 0);
582
583 let mut buf = [0; 32];
584
585 let first = RangeBuf::from(b"something", 0, false);
586 let second = RangeBuf::from(b"hello", 3, false);
587 let third = RangeBuf::from(b"ello", 4, true);
588 let fourth = RangeBuf::from(b"ello", 5, true);
589
590 assert!(recv.write(first).is_ok());
591 assert_eq!(recv.len, 9);
592 assert_eq!(recv.off, 0);
593 assert_eq!(recv.data.len(), 1);
594
595 let (len, fin) = recv.emit(&mut buf).unwrap();
596 assert_eq!(len, 9);
597 assert!(!fin);
598 assert_eq!(&buf[..len], b"something");
599 assert_eq!(recv.len, 9);
600 assert_eq!(recv.off, 9);
601
602 assert!(recv.write(second).is_ok());
603 assert_eq!(recv.len, 9);
604 assert_eq!(recv.off, 9);
605 assert_eq!(recv.data.len(), 0);
606
607 assert_eq!(recv.write(third), Err(Error::FinalSize));
608
609 assert!(recv.write(fourth).is_ok());
610 assert_eq!(recv.len, 9);
611 assert_eq!(recv.off, 9);
612 assert_eq!(recv.data.len(), 0);
613
614 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
615 }
616
617 #[test]
618 fn fully_overlapping_read() {
619 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
620 assert_eq!(recv.len, 0);
621
622 let mut buf = [0; 32];
623
624 let first = RangeBuf::from(b"something", 0, false);
625 let second = RangeBuf::from(b"hello", 4, false);
626
627 assert!(recv.write(first).is_ok());
628 assert_eq!(recv.len, 9);
629 assert_eq!(recv.off, 0);
630 assert_eq!(recv.data.len(), 1);
631
632 assert!(recv.write(second).is_ok());
633 assert_eq!(recv.len, 9);
634 assert_eq!(recv.off, 0);
635 assert_eq!(recv.data.len(), 1);
636
637 let (len, fin) = recv.emit(&mut buf).unwrap();
638 assert_eq!(len, 9);
639 assert!(!fin);
640 assert_eq!(&buf[..len], b"something");
641 assert_eq!(recv.len, 9);
642 assert_eq!(recv.off, 9);
643 assert_eq!(recv.data.len(), 0);
644
645 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
646 }
647
648 #[test]
649 fn fully_overlapping_read2() {
650 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
651 assert_eq!(recv.len, 0);
652
653 let mut buf = [0; 32];
654
655 let first = RangeBuf::from(b"something", 0, false);
656 let second = RangeBuf::from(b"hello", 4, false);
657
658 assert!(recv.write(second).is_ok());
659 assert_eq!(recv.len, 9);
660 assert_eq!(recv.off, 0);
661 assert_eq!(recv.data.len(), 1);
662
663 assert!(recv.write(first).is_ok());
664 assert_eq!(recv.len, 9);
665 assert_eq!(recv.off, 0);
666 assert_eq!(recv.data.len(), 2);
667
668 let (len, fin) = recv.emit(&mut buf).unwrap();
669 assert_eq!(len, 9);
670 assert!(!fin);
671 assert_eq!(&buf[..len], b"somehello");
672 assert_eq!(recv.len, 9);
673 assert_eq!(recv.off, 9);
674 assert_eq!(recv.data.len(), 0);
675
676 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
677 }
678
679 #[test]
680 fn fully_overlapping_read3() {
681 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
682 assert_eq!(recv.len, 0);
683
684 let mut buf = [0; 32];
685
686 let first = RangeBuf::from(b"something", 0, false);
687 let second = RangeBuf::from(b"hello", 3, false);
688
689 assert!(recv.write(second).is_ok());
690 assert_eq!(recv.len, 8);
691 assert_eq!(recv.off, 0);
692 assert_eq!(recv.data.len(), 1);
693
694 assert!(recv.write(first).is_ok());
695 assert_eq!(recv.len, 9);
696 assert_eq!(recv.off, 0);
697 assert_eq!(recv.data.len(), 3);
698
699 let (len, fin) = recv.emit(&mut buf).unwrap();
700 assert_eq!(len, 9);
701 assert!(!fin);
702 assert_eq!(&buf[..len], b"somhellog");
703 assert_eq!(recv.len, 9);
704 assert_eq!(recv.off, 9);
705 assert_eq!(recv.data.len(), 0);
706
707 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
708 }
709
710 #[test]
711 fn fully_overlapping_read_multi() {
712 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
713 assert_eq!(recv.len, 0);
714
715 let mut buf = [0; 32];
716
717 let first = RangeBuf::from(b"somethingsomething", 0, false);
718 let second = RangeBuf::from(b"hello", 3, false);
719 let third = RangeBuf::from(b"hello", 12, false);
720
721 assert!(recv.write(second).is_ok());
722 assert_eq!(recv.len, 8);
723 assert_eq!(recv.off, 0);
724 assert_eq!(recv.data.len(), 1);
725
726 assert!(recv.write(third).is_ok());
727 assert_eq!(recv.len, 17);
728 assert_eq!(recv.off, 0);
729 assert_eq!(recv.data.len(), 2);
730
731 assert!(recv.write(first).is_ok());
732 assert_eq!(recv.len, 18);
733 assert_eq!(recv.off, 0);
734 assert_eq!(recv.data.len(), 5);
735
736 let (len, fin) = recv.emit(&mut buf).unwrap();
737 assert_eq!(len, 18);
738 assert!(!fin);
739 assert_eq!(&buf[..len], b"somhellogsomhellog");
740 assert_eq!(recv.len, 18);
741 assert_eq!(recv.off, 18);
742 assert_eq!(recv.data.len(), 0);
743
744 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
745 }
746
747 #[test]
748 fn overlapping_start_read() {
749 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
750 assert_eq!(recv.len, 0);
751
752 let mut buf = [0; 32];
753
754 let first = RangeBuf::from(b"something", 0, false);
755 let second = RangeBuf::from(b"hello", 8, true);
756
757 assert!(recv.write(first).is_ok());
758 assert_eq!(recv.len, 9);
759 assert_eq!(recv.off, 0);
760 assert_eq!(recv.data.len(), 1);
761
762 assert!(recv.write(second).is_ok());
763 assert_eq!(recv.len, 13);
764 assert_eq!(recv.off, 0);
765 assert_eq!(recv.data.len(), 2);
766
767 let (len, fin) = recv.emit(&mut buf).unwrap();
768 assert_eq!(len, 13);
769 assert!(fin);
770 assert_eq!(&buf[..len], b"somethingello");
771 assert_eq!(recv.len, 13);
772 assert_eq!(recv.off, 13);
773
774 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
775 }
776
777 #[test]
778 fn overlapping_end_read() {
779 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
780 assert_eq!(recv.len, 0);
781
782 let mut buf = [0; 32];
783
784 let first = RangeBuf::from(b"hello", 0, false);
785 let second = RangeBuf::from(b"something", 3, true);
786
787 assert!(recv.write(second).is_ok());
788 assert_eq!(recv.len, 12);
789 assert_eq!(recv.off, 0);
790 assert_eq!(recv.data.len(), 1);
791
792 assert!(recv.write(first).is_ok());
793 assert_eq!(recv.len, 12);
794 assert_eq!(recv.off, 0);
795 assert_eq!(recv.data.len(), 2);
796
797 let (len, fin) = recv.emit(&mut buf).unwrap();
798 assert_eq!(len, 12);
799 assert!(fin);
800 assert_eq!(&buf[..len], b"helsomething");
801 assert_eq!(recv.len, 12);
802 assert_eq!(recv.off, 12);
803
804 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
805 }
806
807 #[test]
808 fn overlapping_end_twice_read() {
809 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
810 assert_eq!(recv.len, 0);
811
812 let mut buf = [0; 32];
813
814 let first = RangeBuf::from(b"he", 0, false);
815 let second = RangeBuf::from(b"ow", 4, false);
816 let third = RangeBuf::from(b"rl", 7, false);
817 let fourth = RangeBuf::from(b"helloworld", 0, true);
818
819 assert!(recv.write(third).is_ok());
820 assert_eq!(recv.len, 9);
821 assert_eq!(recv.off, 0);
822 assert_eq!(recv.data.len(), 1);
823
824 assert!(recv.write(second).is_ok());
825 assert_eq!(recv.len, 9);
826 assert_eq!(recv.off, 0);
827 assert_eq!(recv.data.len(), 2);
828
829 assert!(recv.write(first).is_ok());
830 assert_eq!(recv.len, 9);
831 assert_eq!(recv.off, 0);
832 assert_eq!(recv.data.len(), 3);
833
834 assert!(recv.write(fourth).is_ok());
835 assert_eq!(recv.len, 10);
836 assert_eq!(recv.off, 0);
837 assert_eq!(recv.data.len(), 6);
838
839 let (len, fin) = recv.emit(&mut buf).unwrap();
840 assert_eq!(len, 10);
841 assert!(fin);
842 assert_eq!(&buf[..len], b"helloworld");
843 assert_eq!(recv.len, 10);
844 assert_eq!(recv.off, 10);
845
846 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
847 }
848
849 #[test]
850 fn overlapping_end_twice_and_contained_read() {
851 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
852 assert_eq!(recv.len, 0);
853
854 let mut buf = [0; 32];
855
856 let first = RangeBuf::from(b"hellow", 0, false);
857 let second = RangeBuf::from(b"barfoo", 10, true);
858 let third = RangeBuf::from(b"rl", 7, false);
859 let fourth = RangeBuf::from(b"elloworldbarfoo", 1, true);
860
861 assert!(recv.write(third).is_ok());
862 assert_eq!(recv.len, 9);
863 assert_eq!(recv.off, 0);
864 assert_eq!(recv.data.len(), 1);
865
866 assert!(recv.write(second).is_ok());
867 assert_eq!(recv.len, 16);
868 assert_eq!(recv.off, 0);
869 assert_eq!(recv.data.len(), 2);
870
871 assert!(recv.write(first).is_ok());
872 assert_eq!(recv.len, 16);
873 assert_eq!(recv.off, 0);
874 assert_eq!(recv.data.len(), 3);
875
876 assert!(recv.write(fourth).is_ok());
877 assert_eq!(recv.len, 16);
878 assert_eq!(recv.off, 0);
879 assert_eq!(recv.data.len(), 5);
880
881 let (len, fin) = recv.emit(&mut buf).unwrap();
882 assert_eq!(len, 16);
883 assert!(fin);
884 assert_eq!(&buf[..len], b"helloworldbarfoo");
885 assert_eq!(recv.len, 16);
886 assert_eq!(recv.off, 16);
887
888 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
889 }
890
891 #[test]
892 fn partially_multi_overlapping_reordered_read() {
893 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
894 assert_eq!(recv.len, 0);
895
896 let mut buf = [0; 32];
897
898 let first = RangeBuf::from(b"hello", 8, false);
899 let second = RangeBuf::from(b"something", 0, false);
900 let third = RangeBuf::from(b"moar", 11, true);
901
902 assert!(recv.write(first).is_ok());
903 assert_eq!(recv.len, 13);
904 assert_eq!(recv.off, 0);
905 assert_eq!(recv.data.len(), 1);
906
907 assert!(recv.write(second).is_ok());
908 assert_eq!(recv.len, 13);
909 assert_eq!(recv.off, 0);
910 assert_eq!(recv.data.len(), 2);
911
912 assert!(recv.write(third).is_ok());
913 assert_eq!(recv.len, 15);
914 assert_eq!(recv.off, 0);
915 assert_eq!(recv.data.len(), 3);
916
917 let (len, fin) = recv.emit(&mut buf).unwrap();
918 assert_eq!(len, 15);
919 assert!(fin);
920 assert_eq!(&buf[..len], b"somethinhelloar");
921 assert_eq!(recv.len, 15);
922 assert_eq!(recv.off, 15);
923 assert_eq!(recv.data.len(), 0);
924
925 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
926 }
927
928 #[test]
929 fn partially_multi_overlapping_reordered_read2() {
930 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
931 assert_eq!(recv.len, 0);
932
933 let mut buf = [0; 32];
934
935 let first = RangeBuf::from(b"aaa", 0, false);
936 let second = RangeBuf::from(b"bbb", 2, false);
937 let third = RangeBuf::from(b"ccc", 4, false);
938 let fourth = RangeBuf::from(b"ddd", 6, false);
939 let fifth = RangeBuf::from(b"eee", 9, false);
940 let sixth = RangeBuf::from(b"fff", 11, false);
941
942 assert!(recv.write(second).is_ok());
943 assert_eq!(recv.len, 5);
944 assert_eq!(recv.off, 0);
945 assert_eq!(recv.data.len(), 1);
946
947 assert!(recv.write(fourth).is_ok());
948 assert_eq!(recv.len, 9);
949 assert_eq!(recv.off, 0);
950 assert_eq!(recv.data.len(), 2);
951
952 assert!(recv.write(third).is_ok());
953 assert_eq!(recv.len, 9);
954 assert_eq!(recv.off, 0);
955 assert_eq!(recv.data.len(), 3);
956
957 assert!(recv.write(first).is_ok());
958 assert_eq!(recv.len, 9);
959 assert_eq!(recv.off, 0);
960 assert_eq!(recv.data.len(), 4);
961
962 assert!(recv.write(sixth).is_ok());
963 assert_eq!(recv.len, 14);
964 assert_eq!(recv.off, 0);
965 assert_eq!(recv.data.len(), 5);
966
967 assert!(recv.write(fifth).is_ok());
968 assert_eq!(recv.len, 14);
969 assert_eq!(recv.off, 0);
970 assert_eq!(recv.data.len(), 6);
971
972 let (len, fin) = recv.emit(&mut buf).unwrap();
973 assert_eq!(len, 14);
974 assert!(!fin);
975 assert_eq!(&buf[..len], b"aabbbcdddeefff");
976 assert_eq!(recv.len, 14);
977 assert_eq!(recv.off, 14);
978 assert_eq!(recv.data.len(), 0);
979
980 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
981 }
982}