1use std::cmp;
28use std::time;
29
30use std::collections::BTreeMap;
31use std::collections::VecDeque;
32
33use crate::Error;
34use crate::Result;
35
36use crate::flowcontrol;
37
38use super::RangeBuf;
39use super::DEFAULT_STREAM_WINDOW;
40
41#[derive(Debug, Default)]
47pub struct RecvBuf {
48 data: BTreeMap<u64, RangeBuf>,
51
52 off: u64,
54
55 len: u64,
57
58 flow_control: flowcontrol::FlowControl,
60
61 fin_off: Option<u64>,
63
64 error: Option<u64>,
66
67 drain: bool,
69}
70
71impl RecvBuf {
72 pub fn new(max_data: u64, max_window: u64) -> RecvBuf {
74 RecvBuf {
75 flow_control: flowcontrol::FlowControl::new(
76 max_data,
77 cmp::min(max_data, DEFAULT_STREAM_WINDOW),
78 max_window,
79 ),
80 ..RecvBuf::default()
81 }
82 }
83
84 pub fn write(&mut self, buf: RangeBuf) -> Result<()> {
90 if buf.max_off() > self.max_data() {
91 return Err(Error::FlowControl);
92 }
93
94 if let Some(fin_off) = self.fin_off {
95 if buf.max_off() > fin_off {
97 return Err(Error::FinalSize);
98 }
99
100 if buf.fin() && fin_off != buf.max_off() {
102 return Err(Error::FinalSize);
103 }
104 }
105
106 if buf.fin() && buf.max_off() < self.len {
108 return Err(Error::FinalSize);
109 }
110
111 if self.fin_off.is_some() && buf.is_empty() {
114 return Ok(());
115 }
116
117 if buf.fin() {
118 self.fin_off = Some(buf.max_off());
119 }
120
121 if !buf.fin() && buf.is_empty() {
123 return Ok(());
124 }
125
126 if self.off >= buf.max_off() {
129 if !buf.is_empty() {
135 return Ok(());
136 }
137 }
138
139 let mut tmp_bufs = VecDeque::with_capacity(2);
140 tmp_bufs.push_back(buf);
141
142 'tmp: while let Some(mut buf) = tmp_bufs.pop_front() {
143 if self.off_front() > buf.off() {
149 buf = buf.split_off((self.off_front() - buf.off()) as usize);
150 }
151
152 if buf.off() < self.max_off() || buf.is_empty() {
159 for (_, b) in self.data.range(buf.off()..) {
160 let off = buf.off();
161
162 if b.off() > buf.max_off() {
164 break;
165 }
166
167 if off >= b.off() && buf.max_off() <= b.max_off() {
169 continue 'tmp;
170 }
171
172 if off >= b.off() && off < b.max_off() {
174 buf = buf.split_off((b.max_off() - off) as usize);
175 }
176
177 if off < b.off() && buf.max_off() > b.off() {
179 tmp_bufs
180 .push_back(buf.split_off((b.off() - off) as usize));
181 }
182 }
183 }
184
185 self.len = cmp::max(self.len, buf.max_off());
186
187 if !self.drain {
188 self.data.insert(buf.max_off(), buf);
189 }
190 }
191
192 Ok(())
193 }
194
195 pub fn emit(&mut self, out: &mut [u8]) -> Result<(usize, bool)> {
205 let mut len = 0;
206 let mut cap = out.len();
207
208 if !self.ready() {
209 return Err(Error::Done);
210 }
211
212 if let Some(e) = self.error {
215 self.data.clear();
216 return Err(Error::StreamReset(e));
217 }
218
219 while cap > 0 && self.ready() {
220 let mut entry = match self.data.first_entry() {
221 Some(entry) => entry,
222 None => break,
223 };
224
225 let buf = entry.get_mut();
226
227 let buf_len = cmp::min(buf.len(), cap);
228
229 out[len..len + buf_len].copy_from_slice(&buf[..buf_len]);
230
231 self.off += buf_len as u64;
232
233 len += buf_len;
234 cap -= buf_len;
235
236 if buf_len < buf.len() {
237 buf.consume(buf_len);
238
239 break;
241 }
242
243 entry.remove();
244 }
245
246 self.flow_control.add_consumed(len as u64);
248
249 Ok((len, self.is_fin()))
250 }
251
252 pub fn reset(&mut self, error_code: u64, final_size: u64) -> Result<usize> {
254 if let Some(fin_off) = self.fin_off {
256 if fin_off != final_size {
257 return Err(Error::FinalSize);
258 }
259 }
260
261 if final_size < self.len {
263 return Err(Error::FinalSize);
264 }
265
266 let max_data_delta = final_size - self.len;
269
270 if self.error.is_some() {
271 return Ok(max_data_delta as usize);
272 }
273
274 self.error = Some(error_code);
275
276 self.off = final_size;
278
279 self.data.clear();
280
281 let buf = RangeBuf::from(b"", final_size, true);
284 self.write(buf)?;
285
286 Ok(max_data_delta as usize)
287 }
288
289 pub fn update_max_data(&mut self, now: time::Instant) {
291 self.flow_control.update_max_data(now);
292 }
293
294 pub fn max_data_next(&mut self) -> u64 {
296 self.flow_control.max_data_next()
297 }
298
299 pub fn max_data(&self) -> u64 {
301 self.flow_control.max_data()
302 }
303
304 pub fn window(&self) -> u64 {
306 self.flow_control.window()
307 }
308
309 pub fn autotune_window(&mut self, now: time::Instant, rtt: time::Duration) {
311 self.flow_control.autotune_window(now, rtt);
312 }
313
314 pub fn shutdown(&mut self) -> Result<()> {
316 if self.drain {
317 return Err(Error::Done);
318 }
319
320 self.drain = true;
321
322 self.data.clear();
323
324 self.off = self.max_off();
325
326 Ok(())
327 }
328
329 pub fn off_front(&self) -> u64 {
331 self.off
332 }
333
334 pub fn almost_full(&self) -> bool {
336 self.fin_off.is_none() && self.flow_control.should_update_max_data()
337 }
338
339 pub fn max_off(&self) -> u64 {
341 self.len
342 }
343
344 pub fn is_fin(&self) -> bool {
349 if self.fin_off == Some(self.off) {
350 return true;
351 }
352
353 false
354 }
355
356 pub fn is_draining(&self) -> bool {
358 self.drain
359 }
360
361 pub fn ready(&self) -> bool {
363 let (_, buf) = match self.data.first_key_value() {
364 Some(v) => v,
365 None => return false,
366 };
367
368 buf.off() == self.off
369 }
370}
371
372#[cfg(test)]
373mod tests {
374 use super::*;
375
376 #[test]
377 fn empty_read() {
378 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
379 assert_eq!(recv.len, 0);
380
381 let mut buf = [0; 32];
382
383 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
384 }
385
386 #[test]
387 fn empty_stream_frame() {
388 let mut recv = RecvBuf::new(15, DEFAULT_STREAM_WINDOW);
389 assert_eq!(recv.len, 0);
390
391 let buf = RangeBuf::from(b"hello", 0, false);
392 assert!(recv.write(buf).is_ok());
393 assert_eq!(recv.len, 5);
394 assert_eq!(recv.off, 0);
395 assert_eq!(recv.data.len(), 1);
396
397 let mut buf = [0; 32];
398 assert_eq!(recv.emit(&mut buf), Ok((5, false)));
399
400 let buf = RangeBuf::from(b"", 10, false);
402 assert!(recv.write(buf).is_ok());
403 assert_eq!(recv.len, 5);
404 assert_eq!(recv.off, 5);
405 assert_eq!(recv.data.len(), 0);
406
407 let buf = RangeBuf::from(b"", 16, false);
409 assert_eq!(recv.write(buf), Err(Error::FlowControl));
410
411 let buf = RangeBuf::from(b"", 5, true);
413 assert!(recv.write(buf).is_ok());
414 assert_eq!(recv.len, 5);
415 assert_eq!(recv.off, 5);
416 assert_eq!(recv.data.len(), 1);
417
418 let buf = RangeBuf::from(b"", 5, true);
420 assert!(recv.write(buf).is_ok());
421 assert_eq!(recv.len, 5);
422 assert_eq!(recv.off, 5);
423 assert_eq!(recv.data.len(), 1);
424
425 let buf = RangeBuf::from(b"aa", 3, true);
427 assert!(recv.write(buf).is_ok());
428 assert_eq!(recv.len, 5);
429 assert_eq!(recv.off, 5);
430 assert_eq!(recv.data.len(), 1);
431
432 let buf = RangeBuf::from(b"", 6, true);
434 assert_eq!(recv.write(buf), Err(Error::FinalSize));
435 let buf = RangeBuf::from(b"", 4, true);
436 assert_eq!(recv.write(buf), Err(Error::FinalSize));
437
438 let mut buf = [0; 32];
439 assert_eq!(recv.emit(&mut buf), Ok((0, true)));
440 }
441
442 #[test]
443 fn ordered_read() {
444 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
445 assert_eq!(recv.len, 0);
446
447 let mut buf = [0; 32];
448
449 let first = RangeBuf::from(b"hello", 0, false);
450 let second = RangeBuf::from(b"world", 5, false);
451 let third = RangeBuf::from(b"something", 10, true);
452
453 assert!(recv.write(second).is_ok());
454 assert_eq!(recv.len, 10);
455 assert_eq!(recv.off, 0);
456
457 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
458
459 assert!(recv.write(third).is_ok());
460 assert_eq!(recv.len, 19);
461 assert_eq!(recv.off, 0);
462
463 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
464
465 assert!(recv.write(first).is_ok());
466 assert_eq!(recv.len, 19);
467 assert_eq!(recv.off, 0);
468
469 let (len, fin) = recv.emit(&mut buf).unwrap();
470 assert_eq!(len, 19);
471 assert!(fin);
472 assert_eq!(&buf[..len], b"helloworldsomething");
473 assert_eq!(recv.len, 19);
474 assert_eq!(recv.off, 19);
475
476 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
477 }
478
479 #[test]
480 fn split_read() {
481 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
482 assert_eq!(recv.len, 0);
483
484 let mut buf = [0; 32];
485
486 let first = RangeBuf::from(b"something", 0, false);
487 let second = RangeBuf::from(b"helloworld", 9, true);
488
489 assert!(recv.write(first).is_ok());
490 assert_eq!(recv.len, 9);
491 assert_eq!(recv.off, 0);
492
493 assert!(recv.write(second).is_ok());
494 assert_eq!(recv.len, 19);
495 assert_eq!(recv.off, 0);
496
497 let (len, fin) = recv.emit(&mut buf[..10]).unwrap();
498 assert_eq!(len, 10);
499 assert!(!fin);
500 assert_eq!(&buf[..len], b"somethingh");
501 assert_eq!(recv.len, 19);
502 assert_eq!(recv.off, 10);
503
504 let (len, fin) = recv.emit(&mut buf[..5]).unwrap();
505 assert_eq!(len, 5);
506 assert!(!fin);
507 assert_eq!(&buf[..len], b"ellow");
508 assert_eq!(recv.len, 19);
509 assert_eq!(recv.off, 15);
510
511 let (len, fin) = recv.emit(&mut buf[..10]).unwrap();
512 assert_eq!(len, 4);
513 assert!(fin);
514 assert_eq!(&buf[..len], b"orld");
515 assert_eq!(recv.len, 19);
516 assert_eq!(recv.off, 19);
517 }
518
519 #[test]
520 fn incomplete_read() {
521 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
522 assert_eq!(recv.len, 0);
523
524 let mut buf = [0; 32];
525
526 let first = RangeBuf::from(b"something", 0, false);
527 let second = RangeBuf::from(b"helloworld", 9, true);
528
529 assert!(recv.write(second).is_ok());
530 assert_eq!(recv.len, 19);
531 assert_eq!(recv.off, 0);
532
533 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
534
535 assert!(recv.write(first).is_ok());
536 assert_eq!(recv.len, 19);
537 assert_eq!(recv.off, 0);
538
539 let (len, fin) = recv.emit(&mut buf).unwrap();
540 assert_eq!(len, 19);
541 assert!(fin);
542 assert_eq!(&buf[..len], b"somethinghelloworld");
543 assert_eq!(recv.len, 19);
544 assert_eq!(recv.off, 19);
545 }
546
547 #[test]
548 fn zero_len_read() {
549 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
550 assert_eq!(recv.len, 0);
551
552 let mut buf = [0; 32];
553
554 let first = RangeBuf::from(b"something", 0, false);
555 let second = RangeBuf::from(b"", 9, true);
556
557 assert!(recv.write(first).is_ok());
558 assert_eq!(recv.len, 9);
559 assert_eq!(recv.off, 0);
560 assert_eq!(recv.data.len(), 1);
561
562 assert!(recv.write(second).is_ok());
563 assert_eq!(recv.len, 9);
564 assert_eq!(recv.off, 0);
565 assert_eq!(recv.data.len(), 1);
566
567 let (len, fin) = recv.emit(&mut buf).unwrap();
568 assert_eq!(len, 9);
569 assert!(fin);
570 assert_eq!(&buf[..len], b"something");
571 assert_eq!(recv.len, 9);
572 assert_eq!(recv.off, 9);
573 }
574
575 #[test]
576 fn past_read() {
577 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
578 assert_eq!(recv.len, 0);
579
580 let mut buf = [0; 32];
581
582 let first = RangeBuf::from(b"something", 0, false);
583 let second = RangeBuf::from(b"hello", 3, false);
584 let third = RangeBuf::from(b"ello", 4, true);
585 let fourth = RangeBuf::from(b"ello", 5, true);
586
587 assert!(recv.write(first).is_ok());
588 assert_eq!(recv.len, 9);
589 assert_eq!(recv.off, 0);
590 assert_eq!(recv.data.len(), 1);
591
592 let (len, fin) = recv.emit(&mut buf).unwrap();
593 assert_eq!(len, 9);
594 assert!(!fin);
595 assert_eq!(&buf[..len], b"something");
596 assert_eq!(recv.len, 9);
597 assert_eq!(recv.off, 9);
598
599 assert!(recv.write(second).is_ok());
600 assert_eq!(recv.len, 9);
601 assert_eq!(recv.off, 9);
602 assert_eq!(recv.data.len(), 0);
603
604 assert_eq!(recv.write(third), Err(Error::FinalSize));
605
606 assert!(recv.write(fourth).is_ok());
607 assert_eq!(recv.len, 9);
608 assert_eq!(recv.off, 9);
609 assert_eq!(recv.data.len(), 0);
610
611 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
612 }
613
614 #[test]
615 fn fully_overlapping_read() {
616 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
617 assert_eq!(recv.len, 0);
618
619 let mut buf = [0; 32];
620
621 let first = RangeBuf::from(b"something", 0, false);
622 let second = RangeBuf::from(b"hello", 4, false);
623
624 assert!(recv.write(first).is_ok());
625 assert_eq!(recv.len, 9);
626 assert_eq!(recv.off, 0);
627 assert_eq!(recv.data.len(), 1);
628
629 assert!(recv.write(second).is_ok());
630 assert_eq!(recv.len, 9);
631 assert_eq!(recv.off, 0);
632 assert_eq!(recv.data.len(), 1);
633
634 let (len, fin) = recv.emit(&mut buf).unwrap();
635 assert_eq!(len, 9);
636 assert!(!fin);
637 assert_eq!(&buf[..len], b"something");
638 assert_eq!(recv.len, 9);
639 assert_eq!(recv.off, 9);
640 assert_eq!(recv.data.len(), 0);
641
642 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
643 }
644
645 #[test]
646 fn fully_overlapping_read2() {
647 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
648 assert_eq!(recv.len, 0);
649
650 let mut buf = [0; 32];
651
652 let first = RangeBuf::from(b"something", 0, false);
653 let second = RangeBuf::from(b"hello", 4, false);
654
655 assert!(recv.write(second).is_ok());
656 assert_eq!(recv.len, 9);
657 assert_eq!(recv.off, 0);
658 assert_eq!(recv.data.len(), 1);
659
660 assert!(recv.write(first).is_ok());
661 assert_eq!(recv.len, 9);
662 assert_eq!(recv.off, 0);
663 assert_eq!(recv.data.len(), 2);
664
665 let (len, fin) = recv.emit(&mut buf).unwrap();
666 assert_eq!(len, 9);
667 assert!(!fin);
668 assert_eq!(&buf[..len], b"somehello");
669 assert_eq!(recv.len, 9);
670 assert_eq!(recv.off, 9);
671 assert_eq!(recv.data.len(), 0);
672
673 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
674 }
675
676 #[test]
677 fn fully_overlapping_read3() {
678 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
679 assert_eq!(recv.len, 0);
680
681 let mut buf = [0; 32];
682
683 let first = RangeBuf::from(b"something", 0, false);
684 let second = RangeBuf::from(b"hello", 3, false);
685
686 assert!(recv.write(second).is_ok());
687 assert_eq!(recv.len, 8);
688 assert_eq!(recv.off, 0);
689 assert_eq!(recv.data.len(), 1);
690
691 assert!(recv.write(first).is_ok());
692 assert_eq!(recv.len, 9);
693 assert_eq!(recv.off, 0);
694 assert_eq!(recv.data.len(), 3);
695
696 let (len, fin) = recv.emit(&mut buf).unwrap();
697 assert_eq!(len, 9);
698 assert!(!fin);
699 assert_eq!(&buf[..len], b"somhellog");
700 assert_eq!(recv.len, 9);
701 assert_eq!(recv.off, 9);
702 assert_eq!(recv.data.len(), 0);
703
704 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
705 }
706
707 #[test]
708 fn fully_overlapping_read_multi() {
709 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
710 assert_eq!(recv.len, 0);
711
712 let mut buf = [0; 32];
713
714 let first = RangeBuf::from(b"somethingsomething", 0, false);
715 let second = RangeBuf::from(b"hello", 3, false);
716 let third = RangeBuf::from(b"hello", 12, false);
717
718 assert!(recv.write(second).is_ok());
719 assert_eq!(recv.len, 8);
720 assert_eq!(recv.off, 0);
721 assert_eq!(recv.data.len(), 1);
722
723 assert!(recv.write(third).is_ok());
724 assert_eq!(recv.len, 17);
725 assert_eq!(recv.off, 0);
726 assert_eq!(recv.data.len(), 2);
727
728 assert!(recv.write(first).is_ok());
729 assert_eq!(recv.len, 18);
730 assert_eq!(recv.off, 0);
731 assert_eq!(recv.data.len(), 5);
732
733 let (len, fin) = recv.emit(&mut buf).unwrap();
734 assert_eq!(len, 18);
735 assert!(!fin);
736 assert_eq!(&buf[..len], b"somhellogsomhellog");
737 assert_eq!(recv.len, 18);
738 assert_eq!(recv.off, 18);
739 assert_eq!(recv.data.len(), 0);
740
741 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
742 }
743
744 #[test]
745 fn overlapping_start_read() {
746 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
747 assert_eq!(recv.len, 0);
748
749 let mut buf = [0; 32];
750
751 let first = RangeBuf::from(b"something", 0, false);
752 let second = RangeBuf::from(b"hello", 8, true);
753
754 assert!(recv.write(first).is_ok());
755 assert_eq!(recv.len, 9);
756 assert_eq!(recv.off, 0);
757 assert_eq!(recv.data.len(), 1);
758
759 assert!(recv.write(second).is_ok());
760 assert_eq!(recv.len, 13);
761 assert_eq!(recv.off, 0);
762 assert_eq!(recv.data.len(), 2);
763
764 let (len, fin) = recv.emit(&mut buf).unwrap();
765 assert_eq!(len, 13);
766 assert!(fin);
767 assert_eq!(&buf[..len], b"somethingello");
768 assert_eq!(recv.len, 13);
769 assert_eq!(recv.off, 13);
770
771 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
772 }
773
774 #[test]
775 fn overlapping_end_read() {
776 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
777 assert_eq!(recv.len, 0);
778
779 let mut buf = [0; 32];
780
781 let first = RangeBuf::from(b"hello", 0, false);
782 let second = RangeBuf::from(b"something", 3, true);
783
784 assert!(recv.write(second).is_ok());
785 assert_eq!(recv.len, 12);
786 assert_eq!(recv.off, 0);
787 assert_eq!(recv.data.len(), 1);
788
789 assert!(recv.write(first).is_ok());
790 assert_eq!(recv.len, 12);
791 assert_eq!(recv.off, 0);
792 assert_eq!(recv.data.len(), 2);
793
794 let (len, fin) = recv.emit(&mut buf).unwrap();
795 assert_eq!(len, 12);
796 assert!(fin);
797 assert_eq!(&buf[..len], b"helsomething");
798 assert_eq!(recv.len, 12);
799 assert_eq!(recv.off, 12);
800
801 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
802 }
803
804 #[test]
805 fn overlapping_end_twice_read() {
806 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
807 assert_eq!(recv.len, 0);
808
809 let mut buf = [0; 32];
810
811 let first = RangeBuf::from(b"he", 0, false);
812 let second = RangeBuf::from(b"ow", 4, false);
813 let third = RangeBuf::from(b"rl", 7, false);
814 let fourth = RangeBuf::from(b"helloworld", 0, true);
815
816 assert!(recv.write(third).is_ok());
817 assert_eq!(recv.len, 9);
818 assert_eq!(recv.off, 0);
819 assert_eq!(recv.data.len(), 1);
820
821 assert!(recv.write(second).is_ok());
822 assert_eq!(recv.len, 9);
823 assert_eq!(recv.off, 0);
824 assert_eq!(recv.data.len(), 2);
825
826 assert!(recv.write(first).is_ok());
827 assert_eq!(recv.len, 9);
828 assert_eq!(recv.off, 0);
829 assert_eq!(recv.data.len(), 3);
830
831 assert!(recv.write(fourth).is_ok());
832 assert_eq!(recv.len, 10);
833 assert_eq!(recv.off, 0);
834 assert_eq!(recv.data.len(), 6);
835
836 let (len, fin) = recv.emit(&mut buf).unwrap();
837 assert_eq!(len, 10);
838 assert!(fin);
839 assert_eq!(&buf[..len], b"helloworld");
840 assert_eq!(recv.len, 10);
841 assert_eq!(recv.off, 10);
842
843 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
844 }
845
846 #[test]
847 fn overlapping_end_twice_and_contained_read() {
848 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
849 assert_eq!(recv.len, 0);
850
851 let mut buf = [0; 32];
852
853 let first = RangeBuf::from(b"hellow", 0, false);
854 let second = RangeBuf::from(b"barfoo", 10, true);
855 let third = RangeBuf::from(b"rl", 7, false);
856 let fourth = RangeBuf::from(b"elloworldbarfoo", 1, true);
857
858 assert!(recv.write(third).is_ok());
859 assert_eq!(recv.len, 9);
860 assert_eq!(recv.off, 0);
861 assert_eq!(recv.data.len(), 1);
862
863 assert!(recv.write(second).is_ok());
864 assert_eq!(recv.len, 16);
865 assert_eq!(recv.off, 0);
866 assert_eq!(recv.data.len(), 2);
867
868 assert!(recv.write(first).is_ok());
869 assert_eq!(recv.len, 16);
870 assert_eq!(recv.off, 0);
871 assert_eq!(recv.data.len(), 3);
872
873 assert!(recv.write(fourth).is_ok());
874 assert_eq!(recv.len, 16);
875 assert_eq!(recv.off, 0);
876 assert_eq!(recv.data.len(), 5);
877
878 let (len, fin) = recv.emit(&mut buf).unwrap();
879 assert_eq!(len, 16);
880 assert!(fin);
881 assert_eq!(&buf[..len], b"helloworldbarfoo");
882 assert_eq!(recv.len, 16);
883 assert_eq!(recv.off, 16);
884
885 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
886 }
887
888 #[test]
889 fn partially_multi_overlapping_reordered_read() {
890 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
891 assert_eq!(recv.len, 0);
892
893 let mut buf = [0; 32];
894
895 let first = RangeBuf::from(b"hello", 8, false);
896 let second = RangeBuf::from(b"something", 0, false);
897 let third = RangeBuf::from(b"moar", 11, true);
898
899 assert!(recv.write(first).is_ok());
900 assert_eq!(recv.len, 13);
901 assert_eq!(recv.off, 0);
902 assert_eq!(recv.data.len(), 1);
903
904 assert!(recv.write(second).is_ok());
905 assert_eq!(recv.len, 13);
906 assert_eq!(recv.off, 0);
907 assert_eq!(recv.data.len(), 2);
908
909 assert!(recv.write(third).is_ok());
910 assert_eq!(recv.len, 15);
911 assert_eq!(recv.off, 0);
912 assert_eq!(recv.data.len(), 3);
913
914 let (len, fin) = recv.emit(&mut buf).unwrap();
915 assert_eq!(len, 15);
916 assert!(fin);
917 assert_eq!(&buf[..len], b"somethinhelloar");
918 assert_eq!(recv.len, 15);
919 assert_eq!(recv.off, 15);
920 assert_eq!(recv.data.len(), 0);
921
922 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
923 }
924
925 #[test]
926 fn partially_multi_overlapping_reordered_read2() {
927 let mut recv = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
928 assert_eq!(recv.len, 0);
929
930 let mut buf = [0; 32];
931
932 let first = RangeBuf::from(b"aaa", 0, false);
933 let second = RangeBuf::from(b"bbb", 2, false);
934 let third = RangeBuf::from(b"ccc", 4, false);
935 let fourth = RangeBuf::from(b"ddd", 6, false);
936 let fifth = RangeBuf::from(b"eee", 9, false);
937 let sixth = RangeBuf::from(b"fff", 11, false);
938
939 assert!(recv.write(second).is_ok());
940 assert_eq!(recv.len, 5);
941 assert_eq!(recv.off, 0);
942 assert_eq!(recv.data.len(), 1);
943
944 assert!(recv.write(fourth).is_ok());
945 assert_eq!(recv.len, 9);
946 assert_eq!(recv.off, 0);
947 assert_eq!(recv.data.len(), 2);
948
949 assert!(recv.write(third).is_ok());
950 assert_eq!(recv.len, 9);
951 assert_eq!(recv.off, 0);
952 assert_eq!(recv.data.len(), 3);
953
954 assert!(recv.write(first).is_ok());
955 assert_eq!(recv.len, 9);
956 assert_eq!(recv.off, 0);
957 assert_eq!(recv.data.len(), 4);
958
959 assert!(recv.write(sixth).is_ok());
960 assert_eq!(recv.len, 14);
961 assert_eq!(recv.off, 0);
962 assert_eq!(recv.data.len(), 5);
963
964 assert!(recv.write(fifth).is_ok());
965 assert_eq!(recv.len, 14);
966 assert_eq!(recv.off, 0);
967 assert_eq!(recv.data.len(), 6);
968
969 let (len, fin) = recv.emit(&mut buf).unwrap();
970 assert_eq!(len, 14);
971 assert!(!fin);
972 assert_eq!(&buf[..len], b"aabbbcdddeefff");
973 assert_eq!(recv.len, 14);
974 assert_eq!(recv.off, 14);
975 assert_eq!(recv.data.len(), 0);
976
977 assert_eq!(recv.emit(&mut buf), Err(Error::Done));
978 }
979}