1use std::cmp;
28
29use std::sync::Arc;
30
31use std::collections::hash_map;
32use std::collections::HashMap;
33use std::collections::HashSet;
34
35use intrusive_collections::intrusive_adapter;
36use intrusive_collections::KeyAdapter;
37use intrusive_collections::RBTree;
38use intrusive_collections::RBTreeAtomicLink;
39
40use smallvec::SmallVec;
41
42use crate::range_buf::DefaultBufFactory;
43use crate::BufFactory;
44use crate::Error;
45use crate::Result;
46
/// Urgency given to a `StreamPriorityKey` built through `Default`.
const DEFAULT_URGENCY: u8 = 127;

/// Default stream window size (32 * 1024).
// NOTE(review): presumably a byte count; in the visible part of this file it
// is only passed as the `max_window` argument of `Stream::new()` in tests.
const DEFAULT_STREAM_WINDOW: u64 = 32 * 1024;

/// Largest stream window value (16 * 1024 * 1024).
// NOTE(review): presumably a byte count enforced by callers outside this
// file section -- confirm.
pub const MAX_STREAM_WINDOW: u64 = 16 * 1024 * 1024;
54
/// Hasher state used for collections keyed by stream ID.
///
/// The hasher stores the `u64` written to it and hands it back unchanged
/// from `finish()`; no mixing is applied to the key.
#[derive(Default)]
pub struct StreamIdHasher {
    /// Most recent value passed to `write_u64()` (zero before any write).
    id: u64,
}
63
/// Pair of counters returned when a receive buffer is reset.
// NOTE(review): the producer of these values is outside this part of the
// file. Judging by the unit tests here, `max_data_delta` looks like the
// amount by which the final size exceeds the data received so far, and
// `consumed_flowcontrol` like the final size minus the bytes already read --
// confirm against `recv_buf`.
#[derive(Debug, PartialEq, Clone, Copy)]
pub struct RecvBufResetReturn {
    pub max_data_delta: u64,

    pub consumed_flowcontrol: u64,
}

impl RecvBufResetReturn {
    /// Returns a value in which both counters are zero.
    pub fn zero() -> Self {
        const NOTHING: u64 = 0;

        Self {
            max_data_delta: NOTHING,
            consumed_flowcontrol: NOTHING,
        }
    }
}
84
/// Action to apply to data held by a receive buffer.
// NOTE(review): the consumer of this enum is not in this part of the file;
// the variant descriptions below only restate the payload each one carries.
pub enum RecvAction<'a> {
    /// Carries a caller-provided mutable output slice.
    Emit { out: &'a mut [u8] },
    /// Carries only a length; no output buffer is involved.
    Discard { len: usize },
}
92
93impl std::hash::Hasher for StreamIdHasher {
94 #[inline]
95 fn finish(&self) -> u64 {
96 self.id
97 }
98
99 #[inline]
100 fn write_u64(&mut self, id: u64) {
101 self.id = id;
102 }
103
104 #[inline]
105 fn write(&mut self, _: &[u8]) {
106 unimplemented!()
109 }
110}
111
/// `BuildHasher` that produces default-constructed `StreamIdHasher`s.
type BuildStreamIdHasher = std::hash::BuildHasherDefault<StreamIdHasher>;

/// Hash map keyed by `u64` that hashes through `StreamIdHasher`.
pub type StreamIdHashMap<V> = HashMap<u64, V, BuildStreamIdHasher>;
/// Hash set of `u64` values that hashes through `StreamIdHasher`.
pub type StreamIdHashSet = HashSet<u64, BuildStreamIdHasher>;
116
/// Keeps track of the streams of a connection together with the stream-count
/// limits and the various per-state stream sets.
#[derive(Default)]
pub struct StreamMap<F: BufFactory = DefaultBufFactory> {
    /// Live streams, keyed by stream ID.
    streams: StreamIdHashMap<Stream<F>>,

    /// IDs of streams removed through `collect()`. `get_or_create()` refuses
    /// to create a stream whose ID is in this set.
    collected: StreamIdHashSet,

    /// Limit checked when a locally-initiated bidirectional stream is
    /// created; only ever raised by `update_peer_max_streams_bidi()`.
    peer_max_streams_bidi: u64,

    /// Limit checked when a locally-initiated unidirectional stream is
    /// created; only ever raised by `update_peer_max_streams_uni()`.
    peer_max_streams_uni: u64,

    /// Highest peer-initiated bidirectional stream sequence seen, plus one.
    peer_opened_streams_bidi: u64,

    /// Highest peer-initiated unidirectional stream sequence seen, plus one.
    peer_opened_streams_uni: u64,

    /// Limit checked when a peer-initiated bidirectional stream is created.
    local_max_streams_bidi: u64,
    /// Pending value for `local_max_streams_bidi`; incremented when a
    /// peer-initiated bidirectional stream is collected and applied by
    /// `update_max_streams_bidi()`.
    local_max_streams_bidi_next: u64,

    /// Limit checked when a peer-initiated unidirectional stream is created.
    local_max_streams_uni: u64,
    /// Pending value for `local_max_streams_uni`; incremented when a
    /// peer-initiated unidirectional stream is collected and applied by
    /// `update_max_streams_uni()`.
    local_max_streams_uni_next: u64,

    /// Highest locally-initiated bidirectional stream sequence used, plus
    /// one.
    local_opened_streams_bidi: u64,

    /// Highest locally-initiated unidirectional stream sequence used, plus
    /// one.
    local_opened_streams_uni: u64,

    /// Priority keys of streams registered through `insert_flushable()`,
    /// ordered by `StreamPriorityKey`'s `Ord` implementation.
    flushable: RBTree<StreamFlushablePriorityAdapter>,

    /// Priority keys of streams registered through `insert_readable()`,
    /// ordered by `StreamPriorityKey`'s `Ord` implementation.
    pub readable: RBTree<StreamReadablePriorityAdapter>,

    /// Priority keys of streams registered through `insert_writable()` (or
    /// added by `get_or_create()` for new writable streams), ordered by
    /// `StreamPriorityKey`'s `Ord` implementation.
    pub writable: RBTree<StreamWritablePriorityAdapter>,

    /// IDs registered through `insert_almost_full()`.
    // NOTE(review): presumably streams whose receive window is nearly
    // used up -- the callers are outside this part of the file.
    almost_full: StreamIdHashSet,

    /// Stream ID -> offset, as registered through `insert_blocked()`.
    blocked: StreamIdHashMap<u64>,

    /// Stream ID -> `(error_code, final_size)`, as registered through
    /// `insert_reset()`.
    reset: StreamIdHashMap<(u64, u64)>,

    /// Stream ID -> error code, as registered through `insert_stopped()`.
    stopped: StreamIdHashMap<u64>,

    /// Value handed to every new `Stream` as its `max_window` argument.
    max_stream_window: u64,
}
196
197impl<F: BufFactory> StreamMap<F> {
198 pub fn new(
199 max_streams_bidi: u64, max_streams_uni: u64, max_stream_window: u64,
200 ) -> Self {
201 StreamMap {
202 local_max_streams_bidi: max_streams_bidi,
203 local_max_streams_bidi_next: max_streams_bidi,
204
205 local_max_streams_uni: max_streams_uni,
206 local_max_streams_uni_next: max_streams_uni,
207
208 max_stream_window,
209
210 ..StreamMap::default()
211 }
212 }
213
214 pub fn get(&self, id: u64) -> Option<&Stream<F>> {
216 self.streams.get(&id)
217 }
218
219 pub fn get_mut(&mut self, id: u64) -> Option<&mut Stream<F>> {
221 self.streams.get_mut(&id)
222 }
223
224 pub(crate) fn get_or_create(
237 &mut self, id: u64, local_params: &crate::TransportParams,
238 peer_params: &crate::TransportParams, local: bool, is_server: bool,
239 ) -> Result<&mut Stream<F>> {
240 let (stream, is_new_and_writable) = match self.streams.entry(id) {
241 hash_map::Entry::Vacant(v) => {
242 if self.collected.contains(&id) {
244 return Err(Error::Done);
245 }
246
247 if local != is_local(id, is_server) {
248 return Err(Error::InvalidStreamState(id));
249 }
250
251 let (max_rx_data, max_tx_data) = match (local, is_bidi(id)) {
252 (true, true) => (
254 local_params.initial_max_stream_data_bidi_local,
255 peer_params.initial_max_stream_data_bidi_remote,
256 ),
257
258 (true, false) => (0, peer_params.initial_max_stream_data_uni),
260
261 (false, true) => (
263 local_params.initial_max_stream_data_bidi_remote,
264 peer_params.initial_max_stream_data_bidi_local,
265 ),
266
267 (false, false) =>
269 (local_params.initial_max_stream_data_uni, 0),
270 };
271
272 let stream_sequence = id >> 2;
276
277 match (is_local(id, is_server), is_bidi(id)) {
279 (true, true) => {
280 let n = cmp::max(
281 self.local_opened_streams_bidi,
282 stream_sequence + 1,
283 );
284
285 if n > self.peer_max_streams_bidi {
286 return Err(Error::StreamLimit);
287 }
288
289 self.local_opened_streams_bidi = n;
290 },
291
292 (true, false) => {
293 let n = cmp::max(
294 self.local_opened_streams_uni,
295 stream_sequence + 1,
296 );
297
298 if n > self.peer_max_streams_uni {
299 return Err(Error::StreamLimit);
300 }
301
302 self.local_opened_streams_uni = n;
303 },
304
305 (false, true) => {
306 let n = cmp::max(
307 self.peer_opened_streams_bidi,
308 stream_sequence + 1,
309 );
310
311 if n > self.local_max_streams_bidi {
312 return Err(Error::StreamLimit);
313 }
314
315 self.peer_opened_streams_bidi = n;
316 },
317
318 (false, false) => {
319 let n = cmp::max(
320 self.peer_opened_streams_uni,
321 stream_sequence + 1,
322 );
323
324 if n > self.local_max_streams_uni {
325 return Err(Error::StreamLimit);
326 }
327
328 self.peer_opened_streams_uni = n;
329 },
330 };
331
332 let s = Stream::new(
333 id,
334 max_rx_data,
335 max_tx_data,
336 is_bidi(id),
337 local,
338 self.max_stream_window,
339 );
340
341 let is_writable = s.is_writable();
342
343 (v.insert(s), is_writable)
344 },
345
346 hash_map::Entry::Occupied(v) => (v.into_mut(), false),
347 };
348
349 if is_new_and_writable {
352 self.writable.insert(Arc::clone(&stream.priority_key));
353 }
354
355 Ok(stream)
356 }
357
358 pub fn insert_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
362 if !priority_key.readable.is_linked() {
363 self.readable.insert(Arc::clone(priority_key));
364 }
365 }
366
367 pub fn remove_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
369 if !priority_key.readable.is_linked() {
370 return;
371 }
372
373 let mut c = {
374 let ptr = Arc::as_ptr(priority_key);
375 unsafe { self.readable.cursor_mut_from_ptr(ptr) }
376 };
377
378 c.remove();
379 }
380
381 pub fn insert_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
388 if !priority_key.writable.is_linked() {
389 self.writable.insert(Arc::clone(priority_key));
390 }
391 }
392
393 pub fn remove_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
398 if !priority_key.writable.is_linked() {
399 return;
400 }
401
402 let mut c = {
403 let ptr = Arc::as_ptr(priority_key);
404 unsafe { self.writable.cursor_mut_from_ptr(ptr) }
405 };
406
407 c.remove();
408 }
409
410 pub fn insert_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
414 if !priority_key.flushable.is_linked() {
415 self.flushable.insert(Arc::clone(priority_key));
416 }
417 }
418
419 pub fn remove_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
421 if !priority_key.flushable.is_linked() {
422 return;
423 }
424
425 let mut c = {
426 let ptr = Arc::as_ptr(priority_key);
427 unsafe { self.flushable.cursor_mut_from_ptr(ptr) }
428 };
429
430 c.remove();
431 }
432
433 pub fn peek_flushable(&self) -> Option<Arc<StreamPriorityKey>> {
434 self.flushable.front().clone_pointer()
435 }
436
437 pub fn update_priority(
439 &mut self, old: &Arc<StreamPriorityKey>, new: &Arc<StreamPriorityKey>,
440 ) {
441 if old.readable.is_linked() {
442 self.remove_readable(old);
443 self.readable.insert(Arc::clone(new));
444 }
445
446 if old.writable.is_linked() {
447 self.remove_writable(old);
448 self.writable.insert(Arc::clone(new));
449 }
450
451 if old.flushable.is_linked() {
452 self.remove_flushable(old);
453 self.flushable.insert(Arc::clone(new));
454 }
455 }
456
457 pub fn insert_almost_full(&mut self, stream_id: u64) {
461 self.almost_full.insert(stream_id);
462 }
463
464 pub fn remove_almost_full(&mut self, stream_id: u64) {
466 self.almost_full.remove(&stream_id);
467 }
468
469 pub fn insert_blocked(&mut self, stream_id: u64, off: u64) {
474 self.blocked.insert(stream_id, off);
475 }
476
477 pub fn remove_blocked(&mut self, stream_id: u64) {
479 self.blocked.remove(&stream_id);
480 }
481
482 pub fn insert_reset(
487 &mut self, stream_id: u64, error_code: u64, final_size: u64,
488 ) {
489 self.reset.insert(stream_id, (error_code, final_size));
490 }
491
492 pub fn remove_reset(&mut self, stream_id: u64) {
494 self.reset.remove(&stream_id);
495 }
496
497 pub fn insert_stopped(&mut self, stream_id: u64, error_code: u64) {
502 self.stopped.insert(stream_id, error_code);
503 }
504
505 pub fn remove_stopped(&mut self, stream_id: u64) {
507 self.stopped.remove(&stream_id);
508 }
509
510 pub fn update_peer_max_streams_bidi(&mut self, v: u64) {
512 self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v);
513 }
514
515 pub fn update_peer_max_streams_uni(&mut self, v: u64) {
517 self.peer_max_streams_uni = cmp::max(self.peer_max_streams_uni, v);
518 }
519
520 pub fn update_max_streams_bidi(&mut self) {
522 self.local_max_streams_bidi = self.local_max_streams_bidi_next;
523 }
524
525 pub fn set_max_streams_bidi(&mut self, max: u64) {
527 self.local_max_streams_bidi = max;
528 self.local_max_streams_bidi_next = max;
529 }
530
531 pub fn max_streams_bidi(&self) -> u64 {
533 self.local_max_streams_bidi
534 }
535
536 pub fn max_streams_bidi_next(&mut self) -> u64 {
538 self.local_max_streams_bidi_next
539 }
540
541 pub fn update_max_streams_uni(&mut self) {
543 self.local_max_streams_uni = self.local_max_streams_uni_next;
544 }
545
546 pub fn max_streams_uni_next(&mut self) -> u64 {
548 self.local_max_streams_uni_next
549 }
550
551 pub fn peer_streams_left_bidi(&self) -> u64 {
554 self.peer_max_streams_bidi - self.local_opened_streams_bidi
555 }
556
557 pub fn peer_streams_left_uni(&self) -> u64 {
560 self.peer_max_streams_uni - self.local_opened_streams_uni
561 }
562
563 pub fn collect(&mut self, stream_id: u64, local: bool) {
568 if !local {
569 if is_bidi(stream_id) {
572 self.local_max_streams_bidi_next =
573 self.local_max_streams_bidi_next.saturating_add(1);
574 } else {
575 self.local_max_streams_uni_next =
576 self.local_max_streams_uni_next.saturating_add(1);
577 }
578 }
579
580 let s = self.streams.remove(&stream_id).unwrap();
581
582 self.remove_readable(&s.priority_key);
583
584 self.remove_writable(&s.priority_key);
585
586 self.remove_flushable(&s.priority_key);
587
588 self.collected.insert(stream_id);
589 }
590
591 pub fn readable(&self) -> StreamIter {
593 StreamIter {
594 streams: self.readable.iter().map(|s| s.id).collect(),
595 index: 0,
596 }
597 }
598
599 pub fn writable(&self) -> StreamIter {
601 StreamIter {
602 streams: self.writable.iter().map(|s| s.id).collect(),
603 index: 0,
604 }
605 }
606
607 pub fn almost_full(&self) -> StreamIter {
609 StreamIter::from(&self.almost_full)
610 }
611
612 pub fn blocked(&self) -> hash_map::Iter<'_, u64, u64> {
614 self.blocked.iter()
615 }
616
617 pub fn reset(&self) -> hash_map::Iter<'_, u64, (u64, u64)> {
619 self.reset.iter()
620 }
621
622 pub fn stopped(&self) -> hash_map::Iter<'_, u64, u64> {
624 self.stopped.iter()
625 }
626
627 pub fn is_collected(&self, stream_id: u64) -> bool {
629 self.collected.contains(&stream_id)
630 }
631
632 pub fn has_flushable(&self) -> bool {
634 !self.flushable.is_empty()
635 }
636
637 pub fn has_readable(&self) -> bool {
639 !self.readable.is_empty()
640 }
641
642 pub fn has_almost_full(&self) -> bool {
645 !self.almost_full.is_empty()
646 }
647
648 pub fn has_blocked(&self) -> bool {
650 !self.blocked.is_empty()
651 }
652
653 pub fn has_reset(&self) -> bool {
655 !self.reset.is_empty()
656 }
657
658 pub fn has_stopped(&self) -> bool {
660 !self.stopped.is_empty()
661 }
662
663 pub fn should_update_max_streams_bidi(&self) -> bool {
666 self.local_max_streams_bidi_next != self.local_max_streams_bidi &&
667 self.local_max_streams_bidi_next / 2 >
668 self.local_max_streams_bidi - self.peer_opened_streams_bidi
669 }
670
671 pub fn should_update_max_streams_uni(&self) -> bool {
674 self.local_max_streams_uni_next != self.local_max_streams_uni &&
675 self.local_max_streams_uni_next / 2 >
676 self.local_max_streams_uni - self.peer_opened_streams_uni
677 }
678
679 #[cfg(test)]
681 pub fn len(&self) -> usize {
682 self.streams.len()
683 }
684}
685
/// State kept for a single stream.
pub struct Stream<F: BufFactory = DefaultBufFactory> {
    /// Receive-side buffer.
    pub recv: recv_buf::RecvBuf,

    /// Send-side buffer.
    pub send: send_buf::SendBuf<F>,

    /// Amount added to the send buffer's back offset when `is_writable()`
    /// compares it against the send limit; initialised to 1.
    pub send_lowat: usize,

    /// Whether the stream is bidirectional.
    pub bidi: bool,

    /// Whether the stream was created locally.
    pub local: bool,

    /// Urgency copied from the priority key at creation time.
    pub urgency: u8,

    /// Incremental flag copied from the priority key at creation time.
    pub incremental: bool,

    /// Shared key that links this stream into the stream map's readable,
    /// writable and flushable sets.
    pub priority_key: Arc<StreamPriorityKey>,
}
710
711impl<F: BufFactory> Stream<F> {
712 pub fn new(
714 id: u64, max_rx_data: u64, max_tx_data: u64, bidi: bool, local: bool,
715 max_window: u64,
716 ) -> Self {
717 let priority_key = Arc::new(StreamPriorityKey {
718 id,
719 ..Default::default()
720 });
721
722 Stream {
723 recv: recv_buf::RecvBuf::new(max_rx_data, max_window),
724 send: send_buf::SendBuf::new(max_tx_data),
725 send_lowat: 1,
726 bidi,
727 local,
728 urgency: priority_key.urgency,
729 incremental: priority_key.incremental,
730 priority_key,
731 }
732 }
733
734 pub fn is_readable(&self) -> bool {
736 self.recv.ready()
737 }
738
739 pub fn is_writable(&self) -> bool {
742 !self.send.is_shutdown() &&
743 !self.send.is_fin() &&
744 (self.send.off_back() + self.send_lowat as u64) <
745 self.send.max_off()
746 }
747
748 pub fn is_flushable(&self) -> bool {
751 let off_front = self.send.off_front();
752
753 !self.send.is_empty() &&
754 off_front < self.send.off_back() &&
755 off_front < self.send.max_off()
756 }
757
758 pub fn is_complete(&self) -> bool {
768 match (self.bidi, self.local) {
769 (true, _) => self.recv.is_fin() && self.send.is_complete(),
772
773 (false, true) => self.send.is_complete(),
776
777 (false, false) => self.recv.is_fin(),
780 }
781 }
782}
783
/// Returns `true` if the stream was initiated by this endpoint.
///
/// The lowest bit of the stream ID is compared with `is_server`: odd IDs are
/// local for a server, even IDs are local for a client.
pub fn is_local(stream_id: u64, is_server: bool) -> bool {
    let initiator_bit = stream_id % 2;

    initiator_bit == u64::from(is_server)
}
788
/// Returns `true` if the stream is bidirectional, i.e. the second-lowest bit
/// of the stream ID is clear.
pub fn is_bidi(stream_id: u64) -> bool {
    // Only the two low bits encode the stream type; 0 and 1 have bit 1
    // clear.
    matches!(stream_id % 4, 0 | 1)
}
793
/// Key used to order streams inside the readable, writable and flushable
/// trees.
///
/// Equality only looks at `id`; ordering additionally takes `urgency` and
/// `incremental` into account (see the `Ord` implementation).
#[derive(Clone, Debug)]
pub struct StreamPriorityKey {
    /// Urgency of the stream; lower values sort first.
    pub urgency: u8,
    /// Whether the stream is incremental.
    pub incremental: bool,
    /// ID of the stream this key belongs to.
    pub id: u64,

    /// Intrusive link for the readable tree.
    pub readable: RBTreeAtomicLink,
    /// Intrusive link for the writable tree.
    pub writable: RBTreeAtomicLink,
    /// Intrusive link for the flushable tree.
    pub flushable: RBTreeAtomicLink,
}
804
805impl Default for StreamPriorityKey {
806 fn default() -> Self {
807 Self {
808 urgency: DEFAULT_URGENCY,
809 incremental: true,
810 id: Default::default(),
811 readable: Default::default(),
812 writable: Default::default(),
813 flushable: Default::default(),
814 }
815 }
816}
817
818impl PartialEq for StreamPriorityKey {
819 fn eq(&self, other: &Self) -> bool {
820 self.id == other.id
821 }
822}
823
// Marker impl: key equality is plain `u64` equality on `id`.
impl Eq for StreamPriorityKey {}
825
826impl PartialOrd for StreamPriorityKey {
827 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
828 Some(self.cmp(other))
829 }
830}
831
832impl Ord for StreamPriorityKey {
833 fn cmp(&self, other: &Self) -> cmp::Ordering {
834 if self.id == other.id {
836 return cmp::Ordering::Equal;
837 }
838
839 if self.urgency != other.urgency {
841 return self.urgency.cmp(&other.urgency);
842 }
843
844 if !self.incremental && !other.incremental {
847 return self.id.cmp(&other.id);
848 }
849
850 if self.incremental && !other.incremental {
852 return cmp::Ordering::Greater;
853 }
854 if !self.incremental && other.incremental {
855 return cmp::Ordering::Less;
856 }
857
858 cmp::Ordering::Greater
862 }
863}
864
865intrusive_adapter!(pub StreamWritablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { writable: RBTreeAtomicLink });
866
867impl KeyAdapter<'_> for StreamWritablePriorityAdapter {
868 type Key = StreamPriorityKey;
869
870 fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
871 s.clone()
872 }
873}
874
875intrusive_adapter!(pub StreamReadablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { readable: RBTreeAtomicLink });
876
877impl KeyAdapter<'_> for StreamReadablePriorityAdapter {
878 type Key = StreamPriorityKey;
879
880 fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
881 s.clone()
882 }
883}
884
885intrusive_adapter!(pub StreamFlushablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { flushable: RBTreeAtomicLink });
886
887impl KeyAdapter<'_> for StreamFlushablePriorityAdapter {
888 type Key = StreamPriorityKey;
889
890 fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
891 s.clone()
892 }
893}
894
/// Iterator over a snapshot of stream IDs.
#[derive(Default)]
pub struct StreamIter {
    /// The stream IDs to yield, in iteration order.
    streams: SmallVec<[u64; 8]>,
    /// Position in `streams` of the next ID to yield.
    index: usize,
}
901
902impl StreamIter {
903 #[inline]
904 fn from(streams: &StreamIdHashSet) -> Self {
905 StreamIter {
906 streams: streams.iter().copied().collect(),
907 index: 0,
908 }
909 }
910}
911
912impl Iterator for StreamIter {
913 type Item = u64;
914
915 #[inline]
916 fn next(&mut self) -> Option<Self::Item> {
917 let v = self.streams.get(self.index)?;
918 self.index += 1;
919 Some(*v)
920 }
921}
922
923impl ExactSizeIterator for StreamIter {
924 #[inline]
925 fn len(&self) -> usize {
926 self.streams.len() - self.index
927 }
928}
929
930#[cfg(test)]
931mod tests {
932 use crate::range_buf::RangeBuf;
933
934 use super::*;
935
    /// Data beyond the receive limit is rejected until the limit is raised.
    #[test]
    fn recv_flow_control() {
        // Receive limit of 15.
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let mut buf = [0; 32];

        let first = RangeBuf::from(b"hello", 0, false);
        let second = RangeBuf::from(b"world", 5, false);
        let third = RangeBuf::from(b"something", 10, false);

        // Out-of-order delivery within the limit is accepted.
        assert_eq!(stream.recv.write(second), Ok(()));
        assert_eq!(stream.recv.write(first), Ok(()));
        assert!(!stream.recv.almost_full());

        // Offsets 10..19 go past the limit of 15.
        assert_eq!(stream.recv.write(third), Err(Error::FlowControl));

        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
        assert_eq!(&buf[..len], b"helloworld");
        assert!(!fin);

        assert!(stream.recv.almost_full());

        stream.recv.update_max_data(std::time::Instant::now());
        assert_eq!(stream.recv.max_data_next(), 25);
        assert!(!stream.recv.almost_full());

        // The same data fits once the limit was updated.
        let third = RangeBuf::from(b"something", 10, false);
        assert_eq!(stream.recv.write(third), Ok(()));
    }
967
    /// Data located after an already received fin is rejected.
    #[test]
    fn recv_past_fin() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, true);
        let second = RangeBuf::from(b"world", 5, false);

        assert_eq!(stream.recv.write(first), Ok(()));
        assert_eq!(stream.recv.write(second), Err(Error::FinalSize));
    }
980
    /// Receiving the same fin-carrying data twice is accepted and the data
    /// is delivered once.
    #[test]
    fn recv_fin_dup() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, true);
        let second = RangeBuf::from(b"hello", 0, true);

        assert_eq!(stream.recv.write(first), Ok(()));
        assert_eq!(stream.recv.write(second), Ok(()));

        let mut buf = [0; 32];

        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
        assert_eq!(&buf[..len], b"hello");
        assert!(fin);
    }
999
    /// A second fin that implies a different final size is rejected.
    #[test]
    fn recv_fin_change() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, true);
        let second = RangeBuf::from(b"world", 5, true);

        // Final size 10 is established first, then a fin at 5 arrives.
        assert_eq!(stream.recv.write(second), Ok(()));
        assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
    }
1012
    /// A fin placed below data that was already received is rejected.
    #[test]
    fn recv_fin_lower_than_received() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, true);
        let second = RangeBuf::from(b"world", 5, false);

        // Data up to offset 10 arrives before a fin at offset 5.
        assert_eq!(stream.recv.write(second), Ok(()));
        assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
    }
1025
    /// After the fin has been read, the receive buffer does not report
    /// itself as almost full.
    #[test]
    fn recv_fin_flow_control() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let mut buf = [0; 32];

        let first = RangeBuf::from(b"hello", 0, false);
        let second = RangeBuf::from(b"world", 5, true);

        assert_eq!(stream.recv.write(first), Ok(()));
        assert_eq!(stream.recv.write(second), Ok(()));

        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
        assert_eq!(&buf[..len], b"helloworld");
        assert!(fin);

        assert!(!stream.recv.almost_full());
    }
1046
    /// A reset whose final size differs from the one set by a fin is
    /// rejected.
    #[test]
    fn recv_fin_reset_mismatch() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, true);

        // The fin fixes the final size at 5; the reset claims 10.
        assert_eq!(stream.recv.write(first), Ok(()));
        assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
    }
1058
    /// A reset with a final size above the received data reports the
    /// resulting counters once; repeating it reports zeros.
    #[test]
    fn recv_reset_with_gap() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, false);

        // 5 bytes received, 1 of them read, then reset at final size 10.
        assert_eq!(stream.recv.write(first), Ok(()));
        assert_eq!(stream.recv.emit(&mut [0; 1]), Ok((1, false)));
        assert_eq!(
            stream.recv.reset(0, 10),
            Ok(RecvBufResetReturn {
                max_data_delta: 5,
                consumed_flowcontrol: 9
            })
        );
        assert_eq!(stream.recv.reset(0, 10), Ok(RecvBufResetReturn::zero()));
    }
1081
    /// Repeating an identical reset succeeds and reports zeros the second
    /// time.
    #[test]
    fn recv_reset_dup() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, false);

        assert_eq!(stream.recv.write(first), Ok(()));
        assert_eq!(
            stream.recv.reset(0, 5),
            Ok(RecvBufResetReturn {
                max_data_delta: 0,
                consumed_flowcontrol: 5
            })
        );
        assert_eq!(stream.recv.reset(0, 5), Ok(RecvBufResetReturn::zero()));
    }
1100
    /// A second reset with a different final size is rejected.
    #[test]
    fn recv_reset_change() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, false);

        assert_eq!(stream.recv.write(first), Ok(()));
        assert_eq!(
            stream.recv.reset(0, 5),
            Ok(RecvBufResetReturn {
                max_data_delta: 0,
                consumed_flowcontrol: 5
            })
        );
        assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
    }
1119
    /// A reset whose final size is below the data already received is
    /// rejected.
    #[test]
    fn recv_reset_lower_than_received() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, false);

        // 5 bytes were received; a final size of 4 contradicts that.
        assert_eq!(stream.recv.write(first), Ok(()));
        assert_eq!(stream.recv.reset(0, 4), Err(Error::FinalSize));
    }
1131
    /// Emitting stops at the send limit, and retransmitted data is emitted
    /// again from the start.
    #[test]
    fn send_flow_control() {
        let mut buf = [0; 25];

        // Send limit of 15.
        let mut stream =
            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

        let first = b"hello";
        let second = b"world";
        let third = b"something";

        assert!(stream.send.write(first, false).is_ok());
        assert!(stream.send.write(second, false).is_ok());
        assert!(stream.send.write(third, false).is_ok());

        assert_eq!(stream.send.off_front(), 0);

        // Only 15 bytes come out even though the output buffer is larger.
        let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
        assert_eq!(written, 15);
        assert!(!fin);
        assert_eq!(&buf[..written], b"helloworldsomet");

        assert_eq!(stream.send.off_front(), 15);

        let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
        assert_eq!(written, 0);
        assert!(!fin);
        assert_eq!(&buf[..written], b"");

        // Mark everything sent so far for retransmission.
        stream.send.retransmit(0, 15);

        assert_eq!(stream.send.off_front(), 0);

        let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
        assert_eq!(written, 10);
        assert!(!fin);
        assert_eq!(&buf[..written], b"helloworld");

        assert_eq!(stream.send.off_front(), 10);

        let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
        assert_eq!(written, 5);
        assert!(!fin);
        assert_eq!(&buf[..written], b"somet");
    }
1177
    /// Writing more data after the fin was written is rejected.
    #[test]
    fn send_past_fin() {
        let mut stream =
            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

        let first = b"hello";
        let second = b"world";
        let third = b"third";

        assert_eq!(stream.send.write(first, false), Ok(5));

        assert_eq!(stream.send.write(second, true), Ok(5));
        assert!(stream.send.is_fin());

        assert_eq!(stream.send.write(third, false), Err(Error::FinalSize));
    }
1194
    /// Writing an empty fin after the fin was already written is accepted.
    #[test]
    fn send_fin_dup() {
        let mut stream =
            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", true), Ok(5));
        assert!(stream.send.is_fin());

        assert_eq!(stream.send.write(b"", true), Ok(0));
        assert!(stream.send.is_fin());
    }
1206
    /// A later fin write carrying more data cannot move the final size.
    #[test]
    fn send_undo_fin() {
        let mut stream =
            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", true), Ok(5));
        assert!(stream.send.is_fin());

        assert_eq!(
            stream.send.write(b"helloworld", true),
            Err(Error::FinalSize)
        );
    }
1220
    /// The fin is reported when the data ends exactly at the send limit.
    #[test]
    fn send_fin_max_data_match() {
        let mut buf = [0; 15];

        let mut stream =
            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

        // 15 bytes, same as the send limit.
        let slice = b"hellohellohello";

        assert!(stream.send.write(slice, true).is_ok());

        let (written, fin) = stream.send.emit(&mut buf[..15]).unwrap();
        assert_eq!(written, 15);
        assert!(fin);
        assert_eq!(&buf[..written], slice);
    }
1237
    /// A fin written as a separate empty write is reported together with
    /// the preceding data.
    #[test]
    fn send_fin_zero_length() {
        let mut buf = [0; 5];

        let mut stream =
            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"", true), Ok(0));
        assert!(stream.send.is_fin());

        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
        assert_eq!(written, 5);
        assert!(fin);
        assert_eq!(&buf[..written], b"hello");
    }
1254
    /// Retransmitting a range that was already acked does not rewind the
    /// send buffer.
    #[test]
    fn send_ack() {
        let mut buf = [0; 5];

        let mut stream =
            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"world", false), Ok(5));
        assert_eq!(stream.send.write(b"", true), Ok(0));
        assert!(stream.send.is_fin());

        assert_eq!(stream.send.off_front(), 0);

        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
        assert_eq!(written, 5);
        assert!(!fin);
        assert_eq!(&buf[..written], b"hello");

        stream.send.ack_and_drop(0, 5);

        // The acked range is asked for again; the front offset must stay.
        stream.send.retransmit(0, 5);

        assert_eq!(stream.send.off_front(), 5);

        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
        assert_eq!(written, 5);
        assert!(fin);
        assert_eq!(&buf[..written], b"world");
    }
1285
    /// Acks arriving out of order followed by retransmit requests for the
    /// acked ranges leave the send position untouched.
    #[test]
    fn send_ack_reordering() {
        let mut buf = [0; 5];

        let mut stream =
            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"world", false), Ok(5));
        assert_eq!(stream.send.write(b"", true), Ok(0));
        assert!(stream.send.is_fin());

        assert_eq!(stream.send.off_front(), 0);

        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
        assert_eq!(written, 5);
        assert!(!fin);
        assert_eq!(&buf[..written], b"hello");

        assert_eq!(stream.send.off_front(), 5);

        let (written, fin) = stream.send.emit(&mut buf[..1]).unwrap();
        assert_eq!(written, 1);
        assert!(!fin);
        assert_eq!(&buf[..written], b"w");

        // The later range is acked before the earlier one.
        stream.send.ack_and_drop(5, 1);
        stream.send.ack_and_drop(0, 5);

        stream.send.retransmit(0, 5);
        stream.send.retransmit(5, 1);

        assert_eq!(stream.send.off_front(), 6);

        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
        assert_eq!(written, 4);
        assert!(fin);
        assert_eq!(&buf[..written], b"orld");
    }
1325
    /// Data overlapping a range that was already read only delivers the new
    /// part.
    #[test]
    fn recv_data_below_off() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);

        let first = RangeBuf::from(b"hello", 0, false);

        assert_eq!(stream.recv.write(first), Ok(()));

        let mut buf = [0; 10];

        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
        assert_eq!(&buf[..len], b"hello");
        assert!(!fin);

        // Starts at offset 1, i.e. below the 5 bytes already read.
        let first = RangeBuf::from(b"elloworld", 1, true);
        assert_eq!(stream.recv.write(first), Ok(()));

        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
        assert_eq!(&buf[..len], b"world");
        assert!(fin);
    }
1348
    /// A bidirectional stream is complete only once all sent data was acked
    /// and the received data was read up to the fin.
    #[test]
    fn stream_complete() {
        let mut stream =
            <Stream>::new(0, 30, 30, true, true, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"world", false), Ok(5));

        assert!(!stream.send.is_complete());
        assert!(!stream.send.is_fin());

        assert_eq!(stream.send.write(b"", true), Ok(0));

        assert!(!stream.send.is_complete());
        assert!(stream.send.is_fin());

        let buf = RangeBuf::from(b"hello", 0, true);
        assert!(stream.recv.write(buf).is_ok());
        assert!(!stream.recv.is_fin());

        // Acks arrive in pieces and out of order.
        stream.send.ack(6, 4);
        assert!(!stream.send.is_complete());

        let mut buf = [0; 2];
        assert_eq!(stream.recv.emit(&mut buf), Ok((2, false)));
        assert!(!stream.recv.is_fin());

        stream.send.ack(1, 5);
        assert!(!stream.send.is_complete());

        stream.send.ack(0, 1);
        assert!(stream.send.is_complete());

        // The send side is done but received data is still unread.
        assert!(!stream.is_complete());

        let mut buf = [0; 3];
        assert_eq!(stream.recv.emit(&mut buf), Ok((3, true)));
        assert!(stream.recv.is_fin());

        assert!(stream.is_complete());
    }
1390
    /// A fin written after all data was emitted is reported by an emit that
    /// produces no bytes.
    #[test]
    fn send_fin_zero_length_output() {
        let mut buf = [0; 5];

        let mut stream =
            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.off_front(), 0);
        assert!(!stream.send.is_fin());

        let (written, fin) = stream.send.emit(&mut buf).unwrap();
        assert_eq!(written, 5);
        assert!(!fin);
        assert_eq!(&buf[..written], b"hello");

        assert_eq!(stream.send.write(b"", true), Ok(0));
        assert!(stream.send.is_fin());
        assert_eq!(stream.send.off_front(), 5);

        let (written, fin) = stream.send.emit(&mut buf).unwrap();
        assert_eq!(written, 0);
        assert!(fin);
        assert_eq!(&buf[..written], b"");
    }
1416
1417 fn stream_send_ready(stream: &Stream) -> bool {
1418 !stream.send.is_empty() &&
1419 stream.send.off_front() < stream.send.off_back()
1420 }
1421
    /// Emits four buffered writes through output slices of varying size and
    /// checks the front offset, the data and the fin flag after each step.
    #[test]
    fn send_emit() {
        let mut buf = [0; 5];

        let mut stream =
            <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"world", false), Ok(5));
        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
        assert_eq!(stream.send.off_front(), 0);
        assert_eq!(stream.send.bufs_count(), 4);

        assert!(stream.is_flushable());

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
        assert_eq!(stream.send.off_front(), 4);
        assert_eq!(&buf[..4], b"hell");

        // This read spans the end of the first write and the start of the
        // second.
        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
        assert_eq!(stream.send.off_front(), 8);
        assert_eq!(&buf[..4], b"owor");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
        assert_eq!(stream.send.off_front(), 10);
        assert_eq!(&buf[..2], b"ld");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
        assert_eq!(stream.send.off_front(), 11);
        assert_eq!(&buf[..1], b"o");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
        assert_eq!(stream.send.off_front(), 16);
        assert_eq!(&buf[..5], b"llehd");

        // Only 4 bytes are left; the fin is reported with them.
        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
        assert_eq!(stream.send.off_front(), 20);
        assert_eq!(&buf[..4], b"lrow");

        assert!(!stream.is_flushable());

        // Nothing left: emit yields no bytes but still reports the fin.
        assert!(!stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
        assert_eq!(stream.send.off_front(), 20);
    }
1474
/// Same drain sequence as a plain emit test, interleaved with
/// `ack_and_drop()` calls; the number of buffers held by the send side is
/// checked after each acknowledgement.
#[test]
fn send_emit_ack() {
    let mut out = [0; 5];

    let mut stream =
        <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);

    // Emits into `out[..$cap]` and checks the result, the front offset and
    // the bytes that were produced.
    macro_rules! expect_emit {
        ($cap:expr, $emitted:expr, $front:expr, $bytes:expr) => {{
            assert!(stream_send_ready(&stream));
            assert_eq!(stream.send.emit(&mut out[..$cap]), Ok($emitted));
            assert_eq!(stream.send.off_front(), $front);
            assert_eq!(&out[..$bytes.len()], $bytes);
        }};
    }

    assert_eq!(stream.send.write(b"hello", false), Ok(5));
    assert_eq!(stream.send.write(b"world", false), Ok(5));
    assert_eq!(stream.send.write(b"olleh", false), Ok(5));
    assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
    assert_eq!(stream.send.off_front(), 0);
    assert_eq!(stream.send.bufs_count(), 4);

    assert!(stream.is_flushable());

    expect_emit!(4, (4, false), 4, b"hell");
    expect_emit!(4, (4, false), 8, b"owor");

    // Acknowledging (0, 5) leaves three buffers.
    stream.send.ack_and_drop(0, 5);
    assert_eq!(stream.send.bufs_count(), 3);

    expect_emit!(2, (2, false), 10, b"ld");

    // Acknowledging (7, 5) does not change the buffer count.
    stream.send.ack_and_drop(7, 5);
    assert_eq!(stream.send.bufs_count(), 3);

    expect_emit!(1, (1, false), 11, b"o");
    expect_emit!(5, (5, false), 16, b"llehd");

    // Acknowledging (5, 7) leaves two buffers.
    stream.send.ack_and_drop(5, 7);
    assert_eq!(stream.send.bufs_count(), 2);

    expect_emit!(5, (4, true), 20, b"lrow");

    assert!(!stream.is_flushable());

    // Once drained, further emits yield no bytes but keep reporting FIN.
    assert!(!stream_send_ready(&stream));
    assert_eq!(stream.send.emit(&mut out[..5]), Ok((0, true)));
    assert_eq!(stream.send.off_front(), 20);

    // Neither of these acknowledgements changes the buffer count.
    stream.send.ack_and_drop(22, 4);
    assert_eq!(stream.send.bufs_count(), 2);

    stream.send.ack_and_drop(20, 1);
    assert_eq!(stream.send.bufs_count(), 2);
}
1542
/// Drains a four-chunk stream while repeatedly calling `retransmit()` (and
/// occasionally `ack_and_drop()`), checking which bytes are emitted again
/// and where the front offset lands after every emit.
#[test]
fn send_emit_retransmit() {
    let mut out = [0; 5];

    let mut stream =
        <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);

    // Emits into `out[..$cap]` and checks the result, the front offset and
    // the bytes that were produced.
    macro_rules! expect_emit {
        ($cap:expr, $emitted:expr, $front:expr, $bytes:expr) => {{
            assert!(stream_send_ready(&stream));
            assert_eq!(stream.send.emit(&mut out[..$cap]), Ok($emitted));
            assert_eq!(stream.send.off_front(), $front);
            assert_eq!(&out[..$bytes.len()], $bytes);
        }};
    }

    assert_eq!(stream.send.write(b"hello", false), Ok(5));
    assert_eq!(stream.send.write(b"world", false), Ok(5));
    assert_eq!(stream.send.write(b"olleh", false), Ok(5));
    assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
    assert_eq!(stream.send.off_front(), 0);
    assert_eq!(stream.send.bufs_count(), 4);

    assert!(stream.is_flushable());

    expect_emit!(4, (4, false), 4, b"hell");
    expect_emit!(4, (4, false), 8, b"owor");

    // Retransmitting (3, 3) moves the front offset back to 3.
    stream.send.retransmit(3, 3);
    assert_eq!(stream.send.off_front(), 3);

    // The three bytes come out again, after which the front offset is 8.
    expect_emit!(3, (3, false), 8, b"low");
    expect_emit!(2, (2, false), 10, b"ld");

    stream.send.ack_and_drop(7, 2);

    stream.send.retransmit(8, 2);

    // "ld" is emitted a second time.
    expect_emit!(2, (2, false), 10, b"ld");
    expect_emit!(1, (1, false), 11, b"o");
    expect_emit!(5, (5, false), 16, b"llehd");

    stream.send.retransmit(12, 2);

    // "le" is emitted again, after which the front offset is 16.
    expect_emit!(2, (2, false), 16, b"le");
    expect_emit!(5, (4, true), 20, b"lrow");

    assert!(!stream.is_flushable());

    // Once drained, further emits yield no bytes but keep reporting FIN.
    assert!(!stream_send_ready(&stream));
    assert_eq!(stream.send.emit(&mut out[..5]), Ok((0, true)));
    assert_eq!(stream.send.off_front(), 20);

    stream.send.retransmit(7, 12);

    // Twelve bytes are emitted again in three steps, without FIN.
    expect_emit!(5, (5, false), 12, b"rldol");
    expect_emit!(5, (5, false), 17, b"lehdl");
    expect_emit!(5, (2, false), 20, b"ro");

    stream.send.ack_and_drop(12, 7);

    stream.send.retransmit(7, 12);

    // The same three emits are observed after the acknowledgement.
    expect_emit!(5, (5, false), 12, b"rldol");
    expect_emit!(5, (5, false), 17, b"lehdl");
    expect_emit!(5, (2, false), 20, b"ro");
}
1655
/// Exercises `RangeBuf::consume()` and `RangeBuf::split_off()`, checking the
/// raw fields, the accessor methods and the visible bytes after each step.
#[test]
fn rangebuf_split_off() {
    // Checks one buffer: `raw` lists the `start`, `pos`, `len`, `off` and
    // `fin` fields, `view` lists the `len()`, `off()` and `fin()` accessors,
    // and `data` is the expected content of `&buf[..]`.
    macro_rules! expect_buf {
        (
            $rb:ident,
            raw: ($start:expr, $pos:expr, $len:expr, $off:expr, $fin:expr),
            view: ($vlen:expr, $voff:expr, $vfin:expr),
            data: $data:expr
        ) => {{
            assert_eq!($rb.start, $start);
            assert_eq!($rb.pos, $pos);
            assert_eq!($rb.len, $len);
            assert_eq!($rb.off, $off);
            assert_eq!($rb.fin, $fin);

            assert_eq!($rb.len(), $vlen);
            assert_eq!($rb.off(), $voff);
            assert_eq!($rb.fin(), $vfin);

            assert_eq!(&$rb[..], $data);
        }};
    }

    let mut buf = <RangeBuf>::from(b"helloworld", 5, true);
    expect_buf!(
        buf,
        raw: (0, 0, 10, 5, true),
        view: (10, 5, true),
        data: b"helloworld"
    );

    // Consuming advances `pos`; `len()` shrinks and `off()` grows.
    buf.consume(5);
    expect_buf!(
        buf,
        raw: (0, 5, 10, 5, true),
        view: (5, 10, true),
        data: b"world"
    );

    // Splitting at 3 leaves the original with nothing visible and without
    // FIN; the new buffer exposes "world" and carries the FIN.
    let mut new_buf = buf.split_off(3);
    expect_buf!(
        buf,
        raw: (0, 3, 3, 5, false),
        view: (0, 8, false),
        data: b""
    );
    expect_buf!(
        new_buf,
        raw: (3, 5, 7, 8, true),
        view: (5, 10, true),
        data: b"world"
    );

    new_buf.consume(2);
    expect_buf!(
        new_buf,
        raw: (3, 7, 7, 8, true),
        view: (3, 12, true),
        data: b"rld"
    );

    // Splitting at 5 leaves "r" in the first half and "ld" plus FIN in the
    // second half.
    let mut new_new_buf = new_buf.split_off(5);
    expect_buf!(
        new_buf,
        raw: (3, 7, 5, 8, false),
        view: (1, 12, false),
        data: b"r"
    );
    expect_buf!(
        new_new_buf,
        raw: (8, 8, 2, 13, true),
        view: (2, 13, true),
        data: b"ld"
    );

    // Consuming the remaining bytes leaves an empty view that keeps the FIN.
    new_new_buf.consume(2);
    expect_buf!(
        new_new_buf,
        raw: (8, 10, 2, 13, true),
        view: (0, 15, true),
        data: b""
    );
}
1770
/// Requesting peer-initiated bidirectional stream 500 from a map built with
/// `new(5, 5, 5)` must fail with `Error::StreamLimit`.
#[test]
fn stream_limit_auto_open() {
    let local_tp = crate::TransportParams::default();
    let peer_tp = crate::TransportParams::default();

    let mut streams = <StreamMap>::new(5, 5, 5);

    let far_id = 500;
    assert!(!is_local(far_id, true), "stream id is peer initiated");
    assert!(is_bidi(far_id), "stream id is bidirectional");

    let outcome =
        streams.get_or_create(far_id, &local_tp, &peer_tp, false, true);
    assert_eq!(
        outcome.err(),
        Some(Error::StreamLimit),
        "stream limit should be exceeded"
    );
}
1791
/// Streams 8, 12 and 4 are requested in that (non-ascending) order and every
/// request must succeed on a map built with `new(5, 5, 5)`.
#[test]
fn stream_create_out_of_order() {
    let local_tp = crate::TransportParams::default();
    let peer_tp = crate::TransportParams::default();

    let mut streams = <StreamMap>::new(5, 5, 5);

    for id in [8, 12, 4] {
        assert!(is_local(id, false), "stream id is client initiated");
        assert!(is_bidi(id), "stream id is bidirectional");

        let opened =
            streams.get_or_create(id, &local_tp, &peer_tp, false, true);
        assert!(opened.is_ok());
    }
}
1809
/// On a map built with `new(3, 3, 3)`, stream 8 can still be created while
/// stream 12 is rejected with `Error::StreamLimit`.
#[test]
fn stream_limit_edge() {
    let local_tp = crate::TransportParams::default();
    let peer_tp = crate::TransportParams::default();

    let mut streams = <StreamMap>::new(3, 3, 3);

    // Attempts to create the stream and returns the error, if any.
    let mut try_open = |id| {
        streams
            .get_or_create(id, &local_tp, &peer_tp, false, true)
            .err()
    };

    assert!(try_open(8).is_none());
    assert_eq!(try_open(12), Some(Error::StreamLimit));
}
1833
1834 fn cycle_stream_priority(stream_id: u64, streams: &mut StreamMap) {
1835 let key = streams.get(stream_id).unwrap().priority_key.clone();
1836 streams.update_priority(&key.clone(), &key);
1837 }
1838
/// Streams created in ascending ID order without any priority change are
/// walked in that order; cycling the front stream moves it behind the
/// others, and four cycles restore the initial order.
#[test]
fn writable_prioritized_default_priority() {
    let local_tp = crate::TransportParams::default();
    let peer_tp = crate::TransportParams {
        initial_max_stream_data_bidi_local: 100,
        initial_max_stream_data_uni: 100,
        ..Default::default()
    };

    let mut streams = StreamMap::new(100, 100, 100);

    for id in [0, 4, 8, 12] {
        let created =
            streams.get_or_create(id, &local_tp, &peer_tp, false, true);
        assert!(created.is_ok());
    }

    // Take an initial walk, then cycle whichever stream is at the front and
    // walk again, four times over.
    let mut walks: Vec<Vec<u64>> = vec![streams.writable().collect()];
    for _ in 0..4 {
        let front = *walks.last().unwrap().first().unwrap();
        cycle_stream_priority(front, &mut streams);
        walks.push(streams.writable().collect());
    }

    assert_eq!(walks[0], [0, 4, 8, 12]);
    assert_eq!(walks[1], [4, 8, 12, 0]);
    assert_eq!(walks[2], [8, 12, 0, 4]);
    assert_eq!(walks[3], [12, 0, 4, 8]);
    assert_eq!(walks[4], [0, 4, 8, 12]);
}
1874
/// Streams created in the order 12, 4, 8, 0 without any priority change are
/// walked in creation order rather than ID order; cycling the front stream
/// moves it behind the others, and four cycles restore the initial order.
#[test]
fn writable_prioritized_insert_order() {
    let local_tp = crate::TransportParams::default();
    let peer_tp = crate::TransportParams {
        initial_max_stream_data_bidi_local: 100,
        initial_max_stream_data_uni: 100,
        ..Default::default()
    };

    let mut streams = StreamMap::new(100, 100, 100);

    for id in [12, 4, 8, 0] {
        let created =
            streams.get_or_create(id, &local_tp, &peer_tp, false, true);
        assert!(created.is_ok());
    }

    // Take an initial walk, then cycle whichever stream is at the front and
    // walk again, four times over.
    let mut walks: Vec<Vec<u64>> = vec![streams.writable().collect()];
    for _ in 0..4 {
        let front = *walks.last().unwrap().first().unwrap();
        cycle_stream_priority(front, &mut streams);
        walks.push(streams.writable().collect());
    }

    assert_eq!(walks[0], [12, 4, 8, 0]);
    assert_eq!(walks[1], [4, 8, 0, 12]);
    assert_eq!(walks[2], [8, 0, 12, 4]);
    assert_eq!(walks[3], [0, 12, 4, 8]);
    assert_eq!(walks[4], [12, 4, 8, 0]);
}
1909
/// Gives eleven streams distinct urgencies and checks that the writable walk
/// lists them from the lowest urgency value to the highest, that applying
/// the same priorities a second time changes nothing, and that collected
/// streams drop out of the walk.
#[test]
fn writable_prioritized_mixed_urgency() {
    let local_tp = crate::TransportParams::default();
    let peer_tp = crate::TransportParams {
        initial_max_stream_data_bidi_local: 100,
        initial_max_stream_data_uni: 100,
        ..Default::default()
    };

    let mut streams = <StreamMap>::new(100, 100, 100);

    // (stream ID, urgency): the urgency value falls as the stream ID grows.
    let priorities = [
        (0, 100),
        (4, 90),
        (8, 80),
        (12, 70),
        (16, 60),
        (20, 50),
        (24, 40),
        (28, 30),
        (32, 20),
        (36, 10),
        (40, 0),
    ];

    let by_urgency = vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0];

    // The priorities are applied twice; the second pass hits streams that
    // already exist and must leave the walk unchanged.
    for _ in 0..2 {
        for &(id, urgency) in &priorities {
            let stream = streams
                .get_or_create(id, &local_tp, &peer_tp, false, true)
                .unwrap();

            stream.urgency = urgency;

            let fresh_key = Arc::new(StreamPriorityKey {
                urgency: stream.urgency,
                incremental: stream.incremental,
                id,
                ..Default::default()
            });

            let stale_key = std::mem::replace(
                &mut stream.priority_key,
                Arc::clone(&fresh_key),
            );

            streams.update_priority(&stale_key, &fresh_key);
        }

        let walk: Vec<u64> = streams.writable().collect();
        assert_eq!(walk, by_urgency);
    }

    // A collected stream no longer shows up.
    streams.collect(24, true);

    let walk: Vec<u64> = streams.writable().collect();
    assert_eq!(walk, vec![40, 36, 32, 28, 20, 16, 12, 8, 4, 0]);

    // Collecting both ends of the walk works as well.
    streams.collect(40, true);
    streams.collect(0, true);

    let walk: Vec<u64> = streams.writable().collect();
    assert_eq!(walk, vec![36, 32, 28, 20, 16, 12, 8, 4]);

    // A stream created without an explicit priority is walked last here.
    streams
        .get_or_create(44, &local_tp, &peer_tp, false, true)
        .unwrap();

    let walk: Vec<u64> = streams.writable().collect();
    assert_eq!(walk, vec![36, 32, 28, 20, 16, 12, 8, 4, 44]);
}
2012
/// Gives eleven streams urgencies that partly coincide, then alternates
/// writable walks with priority cycling of selected streams and checks the
/// resulting orders. Finally a stream is collected and a new one is added at
/// an urgency that is already in use.
#[test]
fn writable_prioritized_mixed_urgencies_incrementals() {
    let local_tp = crate::TransportParams::default();
    let peer_tp = crate::TransportParams {
        initial_max_stream_data_bidi_local: 100,
        initial_max_stream_data_uni: 100,
        ..Default::default()
    };

    let mut streams = StreamMap::new(100, 100, 100);

    // (stream ID, urgency); several streams share an urgency value.
    let priorities = [
        (0, 100),
        (4, 20),
        (8, 100),
        (12, 20),
        (16, 90),
        (20, 25),
        (24, 90),
        (28, 30),
        (32, 80),
        (36, 20),
        (40, 0),
    ];

    for (id, urgency) in priorities {
        let stream = streams
            .get_or_create(id, &local_tp, &peer_tp, false, true)
            .unwrap();

        stream.urgency = urgency;

        let fresh_key = Arc::new(StreamPriorityKey {
            urgency: stream.urgency,
            incremental: stream.incremental,
            id,
            ..Default::default()
        });

        let stale_key = std::mem::replace(
            &mut stream.priority_key,
            Arc::clone(&fresh_key),
        );

        streams.update_priority(&stale_key, &fresh_key);
    }

    // Each round records one walk and then cycles the listed streams, in
    // the listed order.
    let rotations = [
        [4, 16, 0],
        [12, 24, 8],
        [36, 16, 0],
        [4, 24, 8],
        [12, 16, 0],
        [36, 24, 8],
        [4, 16, 0],
        [12, 24, 8],
        [36, 16, 0],
    ];

    let mut walks: Vec<Vec<u64>> = Vec::with_capacity(rotations.len());
    for round in rotations {
        walks.push(streams.writable().collect());

        for id in round {
            cycle_stream_priority(id, &mut streams);
        }
    }

    // The walks are only compared once all rounds have run.
    assert_eq!(walks[0], [40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
    assert_eq!(walks[1], [40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
    assert_eq!(walks[2], [40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
    assert_eq!(walks[3], [40, 4, 12, 36, 20, 28, 32, 24, 16, 8, 0]);
    assert_eq!(walks[4], [40, 12, 36, 4, 20, 28, 32, 16, 24, 0, 8]);
    assert_eq!(walks[5], [40, 36, 4, 12, 20, 28, 32, 24, 16, 8, 0]);
    assert_eq!(walks[6], [40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
    assert_eq!(walks[7], [40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
    assert_eq!(walks[8], [40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);

    // A collected stream no longer shows up.
    streams.collect(20, true);

    let after_collect: Vec<u64> = streams.writable().collect();
    assert_eq!(after_collect, vec![40, 4, 12, 36, 28, 32, 24, 16, 8, 0]);

    // Add an incremental stream at urgency 20, which streams 4, 12 and 36
    // already use.
    let stream = streams
        .get_or_create(44, &local_tp, &peer_tp, false, true)
        .unwrap();

    stream.urgency = 20;
    stream.incremental = true;

    let fresh_key = Arc::new(StreamPriorityKey {
        urgency: stream.urgency,
        incremental: stream.incremental,
        id: 44,
        ..Default::default()
    });

    let stale_key =
        std::mem::replace(&mut stream.priority_key, Arc::clone(&fresh_key));

    streams.update_priority(&stale_key, &fresh_key);

    // The new stream is walked right after the other urgency-20 streams.
    let after_insert: Vec<u64> = streams.writable().collect();
    assert_eq!(after_insert, vec![40, 4, 12, 36, 44, 28, 32, 24, 16, 8, 0]);
}
2139
/// Inserting keys with the same urgency, incremental flag and ID into the
/// writable priority tree a second time does not replace the first entries:
/// every ID then appears twice in the walk.
#[test]
fn priority_tree_dupes() {
    let mut tree: RBTree<StreamWritablePriorityAdapter> = Default::default();

    // Expected walk after the first and after the second round of inserts.
    let expected_walks: [&[u64]; 2] =
        [&[0, 4, 8, 12], &[0, 0, 4, 4, 8, 8, 12, 12]];

    for expected in expected_walks {
        for id in [0, 4, 8, 12] {
            let key = Arc::new(StreamPriorityKey {
                urgency: 0,
                incremental: false,
                id,
                ..Default::default()
            });

            tree.insert(key);
        }

        let walk: Vec<u64> = tree.iter().map(|key| key.id).collect();
        assert_eq!(walk, expected);
    }
}
2177}
2178
2179mod recv_buf;
2180mod send_buf;