1use std::cmp;
28
29use std::sync::Arc;
30
31use std::collections::hash_map;
32use std::collections::HashMap;
33use std::collections::HashSet;
34
35use intrusive_collections::intrusive_adapter;
36use intrusive_collections::KeyAdapter;
37use intrusive_collections::RBTree;
38use intrusive_collections::RBTreeAtomicLink;
39
40use smallvec::SmallVec;
41
42use crate::range_buf::DefaultBufFactory;
43use crate::BufFactory;
44use crate::Error;
45use crate::Result;
46
/// Urgency assigned to a stream priority key when none is specified.
const DEFAULT_URGENCY: u8 = 127;

/// Default per-stream window, in bytes (32 KiB).
const DEFAULT_STREAM_WINDOW: u64 = 32 * 1_024;

/// Largest per-stream window, in bytes (16 MiB).
pub const MAX_STREAM_WINDOW: u64 = 16 * 1_024 * 1_024;
54
/// Hasher state for maps and sets keyed by stream ID.
///
/// It only remembers the last `u64` it was fed; the accompanying `Hasher`
/// implementation returns that value unchanged as the hash.
#[derive(Default)]
pub struct StreamIdHasher {
    /// The most recently written stream ID (0 until something is written).
    id: u64,
}
63
/// Pair of flow-control amounts reported when a receive buffer is reset.
// NOTE(review): the precise meaning of each field is defined by the
// receive-buffer `reset()` implementation, which is not part of this view.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct RecvBufResetReturn {
    /// Change in max data caused by the reset.
    pub max_data_delta: u64,

    /// Amount of flow control reported as consumed by the reset.
    pub consumed_flowcontrol: u64,
}
75
76impl RecvBufResetReturn {
77 pub fn zero() -> Self {
78 Self {
79 max_data_delta: 0,
80 consumed_flowcontrol: 0,
81 }
82 }
83}
84
/// What to do with received stream data.
pub enum RecvAction<'a> {
    /// Hand the data out through the provided buffer.
    Emit {
        /// Destination buffer.
        out: &'a mut [u8],
    },

    /// Drop data without copying it anywhere.
    Discard {
        /// Number of bytes concerned.
        len: usize,
    },
}
92
93impl std::hash::Hasher for StreamIdHasher {
94 #[inline]
95 fn finish(&self) -> u64 {
96 self.id
97 }
98
99 #[inline]
100 fn write_u64(&mut self, id: u64) {
101 self.id = id;
102 }
103
104 #[inline]
105 fn write(&mut self, _: &[u8]) {
106 unimplemented!()
109 }
110}
111
112type BuildStreamIdHasher = std::hash::BuildHasherDefault<StreamIdHasher>;
113
114pub type StreamIdHashMap<V> = HashMap<u64, V, BuildStreamIdHasher>;
115pub type StreamIdHashSet = HashSet<u64, BuildStreamIdHasher>;
116
117#[derive(Default)]
119pub struct StreamMap<F: BufFactory = DefaultBufFactory> {
120 streams: StreamIdHashMap<Stream<F>>,
122
123 collected: StreamIdHashSet,
129
130 peer_max_streams_bidi: u64,
132
133 peer_max_streams_uni: u64,
135
136 peer_opened_streams_bidi: u64,
138
139 peer_opened_streams_uni: u64,
141
142 local_max_streams_bidi: u64,
144 local_max_streams_bidi_next: u64,
145
146 initial_max_streams_bidi: u64,
148
149 local_max_streams_uni: u64,
151 local_max_streams_uni_next: u64,
152
153 initial_max_streams_uni: u64,
155
156 local_opened_streams_bidi: u64,
158
159 local_opened_streams_uni: u64,
161
162 flushable: RBTree<StreamFlushablePriorityAdapter>,
166
167 pub readable: RBTree<StreamReadablePriorityAdapter>,
171
172 pub writable: RBTree<StreamWritablePriorityAdapter>,
177
178 almost_full: StreamIdHashSet,
183
184 blocked: StreamIdHashMap<u64>,
188
189 reset: StreamIdHashMap<(u64, u64)>,
193
194 stopped: StreamIdHashMap<u64>,
198
199 max_stream_window: u64,
201}
202
203impl<F: BufFactory> StreamMap<F> {
204 pub fn new(
205 max_streams_bidi: u64, max_streams_uni: u64, max_stream_window: u64,
206 ) -> Self {
207 StreamMap {
208 local_max_streams_bidi: max_streams_bidi,
209 local_max_streams_bidi_next: max_streams_bidi,
210 initial_max_streams_bidi: max_streams_bidi,
211
212 local_max_streams_uni: max_streams_uni,
213 local_max_streams_uni_next: max_streams_uni,
214 initial_max_streams_uni: max_streams_uni,
215
216 max_stream_window,
217
218 ..StreamMap::default()
219 }
220 }
221
222 pub fn get(&self, id: u64) -> Option<&Stream<F>> {
224 self.streams.get(&id)
225 }
226
227 pub fn get_mut(&mut self, id: u64) -> Option<&mut Stream<F>> {
229 self.streams.get_mut(&id)
230 }
231
232 pub(crate) fn get_or_create(
245 &mut self, id: u64, local_params: &crate::TransportParams,
246 peer_params: &crate::TransportParams, local: bool, is_server: bool,
247 ) -> Result<&mut Stream<F>> {
248 let (stream, is_new_and_writable) = match self.streams.entry(id) {
249 hash_map::Entry::Vacant(v) => {
250 if self.collected.contains(&id) {
252 return Err(Error::Done);
253 }
254
255 if local != is_local(id, is_server) {
256 return Err(Error::InvalidStreamState(id));
257 }
258
259 let (max_rx_data, max_tx_data) = match (local, is_bidi(id)) {
260 (true, true) => (
262 local_params.initial_max_stream_data_bidi_local,
263 peer_params.initial_max_stream_data_bidi_remote,
264 ),
265
266 (true, false) => (0, peer_params.initial_max_stream_data_uni),
268
269 (false, true) => (
271 local_params.initial_max_stream_data_bidi_remote,
272 peer_params.initial_max_stream_data_bidi_local,
273 ),
274
275 (false, false) =>
277 (local_params.initial_max_stream_data_uni, 0),
278 };
279
280 let stream_sequence = id >> 2;
284
285 match (is_local(id, is_server), is_bidi(id)) {
287 (true, true) => {
288 let n = cmp::max(
289 self.local_opened_streams_bidi,
290 stream_sequence + 1,
291 );
292
293 if n > self.peer_max_streams_bidi {
294 return Err(Error::StreamLimit);
295 }
296
297 self.local_opened_streams_bidi = n;
298 },
299
300 (true, false) => {
301 let n = cmp::max(
302 self.local_opened_streams_uni,
303 stream_sequence + 1,
304 );
305
306 if n > self.peer_max_streams_uni {
307 return Err(Error::StreamLimit);
308 }
309
310 self.local_opened_streams_uni = n;
311 },
312
313 (false, true) => {
314 let n = cmp::max(
315 self.peer_opened_streams_bidi,
316 stream_sequence + 1,
317 );
318
319 if n > self.local_max_streams_bidi {
320 return Err(Error::StreamLimit);
321 }
322
323 self.peer_opened_streams_bidi = n;
324 },
325
326 (false, false) => {
327 let n = cmp::max(
328 self.peer_opened_streams_uni,
329 stream_sequence + 1,
330 );
331
332 if n > self.local_max_streams_uni {
333 return Err(Error::StreamLimit);
334 }
335
336 self.peer_opened_streams_uni = n;
337 },
338 };
339
340 let s = Stream::new(
341 id,
342 max_rx_data,
343 max_tx_data,
344 is_bidi(id),
345 local,
346 self.max_stream_window,
347 );
348
349 let is_writable = s.is_writable();
350
351 (v.insert(s), is_writable)
352 },
353
354 hash_map::Entry::Occupied(v) => (v.into_mut(), false),
355 };
356
357 if is_new_and_writable {
360 self.writable.insert(Arc::clone(&stream.priority_key));
361 }
362
363 Ok(stream)
364 }
365
366 pub fn insert_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
370 if !priority_key.readable.is_linked() {
371 self.readable.insert(Arc::clone(priority_key));
372 }
373 }
374
375 pub fn remove_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
377 if !priority_key.readable.is_linked() {
378 return;
379 }
380
381 let mut c = {
382 let ptr = Arc::as_ptr(priority_key);
383 unsafe { self.readable.cursor_mut_from_ptr(ptr) }
384 };
385
386 c.remove();
387 }
388
389 pub fn insert_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
396 if !priority_key.writable.is_linked() {
397 self.writable.insert(Arc::clone(priority_key));
398 }
399 }
400
401 pub fn remove_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
406 if !priority_key.writable.is_linked() {
407 return;
408 }
409
410 let mut c = {
411 let ptr = Arc::as_ptr(priority_key);
412 unsafe { self.writable.cursor_mut_from_ptr(ptr) }
413 };
414
415 c.remove();
416 }
417
418 pub fn insert_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
422 if !priority_key.flushable.is_linked() {
423 self.flushable.insert(Arc::clone(priority_key));
424 }
425 }
426
427 pub fn remove_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
429 if !priority_key.flushable.is_linked() {
430 return;
431 }
432
433 let mut c = {
434 let ptr = Arc::as_ptr(priority_key);
435 unsafe { self.flushable.cursor_mut_from_ptr(ptr) }
436 };
437
438 c.remove();
439 }
440
441 pub fn peek_flushable(&self) -> Option<Arc<StreamPriorityKey>> {
442 self.flushable.front().clone_pointer()
443 }
444
445 pub fn update_priority(
447 &mut self, old: &Arc<StreamPriorityKey>, new: &Arc<StreamPriorityKey>,
448 ) {
449 if old.readable.is_linked() {
450 self.remove_readable(old);
451 self.readable.insert(Arc::clone(new));
452 }
453
454 if old.writable.is_linked() {
455 self.remove_writable(old);
456 self.writable.insert(Arc::clone(new));
457 }
458
459 if old.flushable.is_linked() {
460 self.remove_flushable(old);
461 self.flushable.insert(Arc::clone(new));
462 }
463 }
464
465 pub fn insert_almost_full(&mut self, stream_id: u64) {
469 self.almost_full.insert(stream_id);
470 }
471
472 pub fn remove_almost_full(&mut self, stream_id: u64) {
474 self.almost_full.remove(&stream_id);
475 }
476
477 pub fn insert_blocked(&mut self, stream_id: u64, off: u64) {
482 self.blocked.insert(stream_id, off);
483 }
484
485 pub fn remove_blocked(&mut self, stream_id: u64) {
487 self.blocked.remove(&stream_id);
488 }
489
490 pub fn insert_reset(
495 &mut self, stream_id: u64, error_code: u64, final_size: u64,
496 ) {
497 self.reset.insert(stream_id, (error_code, final_size));
498 }
499
500 pub fn remove_reset(&mut self, stream_id: u64) {
502 self.reset.remove(&stream_id);
503 }
504
505 pub fn insert_stopped(&mut self, stream_id: u64, error_code: u64) {
510 self.stopped.insert(stream_id, error_code);
511 }
512
513 pub fn remove_stopped(&mut self, stream_id: u64) {
515 self.stopped.remove(&stream_id);
516 }
517
518 pub fn update_peer_max_streams_bidi(&mut self, v: u64) {
520 self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v);
521 }
522
523 pub fn update_peer_max_streams_uni(&mut self, v: u64) {
525 self.peer_max_streams_uni = cmp::max(self.peer_max_streams_uni, v);
526 }
527
528 pub fn update_max_streams_bidi(&mut self) {
530 self.local_max_streams_bidi = self.local_max_streams_bidi_next;
531 }
532
533 pub fn set_max_streams_bidi(&mut self, max: u64) {
535 self.local_max_streams_bidi = max;
536 self.local_max_streams_bidi_next = max;
537 self.initial_max_streams_bidi = max;
538 }
539
540 pub fn max_streams_bidi(&self) -> u64 {
542 self.local_max_streams_bidi
543 }
544
545 pub fn max_streams_bidi_next(&mut self) -> u64 {
547 self.local_max_streams_bidi_next
548 }
549
550 pub fn update_max_streams_uni(&mut self) {
552 self.local_max_streams_uni = self.local_max_streams_uni_next;
553 }
554
555 pub fn max_streams_uni_next(&mut self) -> u64 {
557 self.local_max_streams_uni_next
558 }
559
560 pub fn peer_streams_left_bidi(&self) -> u64 {
563 self.peer_max_streams_bidi - self.local_opened_streams_bidi
564 }
565
566 pub fn peer_streams_left_uni(&self) -> u64 {
569 self.peer_max_streams_uni - self.local_opened_streams_uni
570 }
571
572 pub fn collect(&mut self, stream_id: u64, local: bool) {
577 if !local {
578 if is_bidi(stream_id) {
581 self.local_max_streams_bidi_next =
582 self.local_max_streams_bidi_next.saturating_add(1);
583 } else {
584 self.local_max_streams_uni_next =
585 self.local_max_streams_uni_next.saturating_add(1);
586 }
587 }
588
589 let s = self.streams.remove(&stream_id).unwrap();
590
591 self.remove_readable(&s.priority_key);
592
593 self.remove_writable(&s.priority_key);
594
595 self.remove_flushable(&s.priority_key);
596
597 self.collected.insert(stream_id);
598 }
599
600 pub fn readable(&self) -> StreamIter {
602 StreamIter {
603 streams: self.readable.iter().map(|s| s.id).collect(),
604 index: 0,
605 }
606 }
607
608 pub fn writable(&self) -> StreamIter {
610 StreamIter {
611 streams: self.writable.iter().map(|s| s.id).collect(),
612 index: 0,
613 }
614 }
615
616 pub fn almost_full(&self) -> StreamIter {
618 StreamIter::from(&self.almost_full)
619 }
620
621 pub fn blocked(&self) -> hash_map::Iter<'_, u64, u64> {
623 self.blocked.iter()
624 }
625
626 pub fn reset(&self) -> hash_map::Iter<'_, u64, (u64, u64)> {
628 self.reset.iter()
629 }
630
631 pub fn stopped(&self) -> hash_map::Iter<'_, u64, u64> {
633 self.stopped.iter()
634 }
635
636 pub fn is_collected(&self, stream_id: u64) -> bool {
638 self.collected.contains(&stream_id)
639 }
640
641 pub fn has_flushable(&self) -> bool {
643 !self.flushable.is_empty()
644 }
645
646 pub fn has_readable(&self) -> bool {
648 !self.readable.is_empty()
649 }
650
651 pub fn has_almost_full(&self) -> bool {
654 !self.almost_full.is_empty()
655 }
656
657 pub fn has_blocked(&self) -> bool {
659 !self.blocked.is_empty()
660 }
661
662 pub fn has_reset(&self) -> bool {
664 !self.reset.is_empty()
665 }
666
667 pub fn has_stopped(&self) -> bool {
669 !self.stopped.is_empty()
670 }
671
672 pub fn should_update_max_streams_bidi(&self) -> bool {
678 let available = self
679 .local_max_streams_bidi
680 .saturating_sub(self.peer_opened_streams_bidi);
681 self.local_max_streams_bidi_next != self.local_max_streams_bidi &&
682 available <= self.initial_max_streams_bidi / 2
683 }
684
685 pub fn should_update_max_streams_uni(&self) -> bool {
691 let available = self
692 .local_max_streams_uni
693 .saturating_sub(self.peer_opened_streams_uni);
694 self.local_max_streams_uni_next != self.local_max_streams_uni &&
695 available <= self.initial_max_streams_uni / 2
696 }
697
698 #[cfg(test)]
700 pub fn len(&self) -> usize {
701 self.streams.len()
702 }
703}
704
705pub struct Stream<F: BufFactory = DefaultBufFactory> {
707 pub recv: recv_buf::RecvBuf,
709
710 pub send: send_buf::SendBuf<F>,
712
713 pub send_lowat: usize,
714
715 pub bidi: bool,
717
718 pub local: bool,
720
721 pub urgency: u8,
723
724 pub incremental: bool,
726
727 pub priority_key: Arc<StreamPriorityKey>,
728}
729
730impl<F: BufFactory> Stream<F> {
731 pub fn new(
733 id: u64, max_rx_data: u64, max_tx_data: u64, bidi: bool, local: bool,
734 max_window: u64,
735 ) -> Self {
736 let priority_key = Arc::new(StreamPriorityKey {
737 id,
738 ..Default::default()
739 });
740
741 Stream {
742 recv: recv_buf::RecvBuf::new(max_rx_data, max_window),
743 send: send_buf::SendBuf::new(max_tx_data),
744 send_lowat: 1,
745 bidi,
746 local,
747 urgency: priority_key.urgency,
748 incremental: priority_key.incremental,
749 priority_key,
750 }
751 }
752
753 pub fn is_readable(&self) -> bool {
755 self.recv.ready()
756 }
757
758 pub fn is_writable(&self) -> bool {
761 !self.send.is_shutdown() &&
762 !self.send.is_fin() &&
763 (self.send.off_back() + self.send_lowat as u64) <
764 self.send.max_off()
765 }
766
767 pub fn is_flushable(&self) -> bool {
770 let off_front = self.send.off_front();
771
772 !self.send.is_empty() &&
773 off_front < self.send.off_back() &&
774 off_front < self.send.max_off()
775 }
776
777 pub fn is_complete(&self) -> bool {
787 match (self.bidi, self.local) {
788 (true, _) => self.recv.is_fin() && self.send.is_complete(),
791
792 (false, true) => self.send.is_complete(),
795
796 (false, false) => self.recv.is_fin(),
799 }
800 }
801}
802
/// Returns `true` if the stream was initiated by this endpoint.
///
/// The lowest bit of a stream ID identifies the initiator (0 for the
/// client, 1 for the server), so the stream is local when that bit matches
/// this endpoint's role.
pub fn is_local(stream_id: u64, is_server: bool) -> bool {
    let initiator_bit = stream_id & 1;
    let own_role_bit = u64::from(is_server);

    initiator_bit == own_role_bit
}
807
/// Returns `true` if the stream is bidirectional, i.e. the second-lowest
/// bit of its ID is clear.
pub fn is_bidi(stream_id: u64) -> bool {
    const UNI_BIT: u64 = 0b10;

    stream_id & UNI_BIT == 0
}
812
813#[derive(Clone, Debug)]
814pub struct StreamPriorityKey {
815 pub urgency: u8,
816 pub incremental: bool,
817 pub id: u64,
818
819 pub readable: RBTreeAtomicLink,
820 pub writable: RBTreeAtomicLink,
821 pub flushable: RBTreeAtomicLink,
822}
823
824impl Default for StreamPriorityKey {
825 fn default() -> Self {
826 Self {
827 urgency: DEFAULT_URGENCY,
828 incremental: true,
829 id: Default::default(),
830 readable: Default::default(),
831 writable: Default::default(),
832 flushable: Default::default(),
833 }
834 }
835}
836
837impl PartialEq for StreamPriorityKey {
838 fn eq(&self, other: &Self) -> bool {
839 self.id == other.id
840 }
841}
842
843impl Eq for StreamPriorityKey {}
844
845impl PartialOrd for StreamPriorityKey {
846 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
847 Some(self.cmp(other))
848 }
849}
850
851impl Ord for StreamPriorityKey {
852 fn cmp(&self, other: &Self) -> cmp::Ordering {
853 if self.id == other.id {
855 return cmp::Ordering::Equal;
856 }
857
858 if self.urgency != other.urgency {
860 return self.urgency.cmp(&other.urgency);
861 }
862
863 if !self.incremental && !other.incremental {
866 return self.id.cmp(&other.id);
867 }
868
869 if self.incremental && !other.incremental {
871 return cmp::Ordering::Greater;
872 }
873 if !self.incremental && other.incremental {
874 return cmp::Ordering::Less;
875 }
876
877 cmp::Ordering::Greater
881 }
882}
883
884intrusive_adapter!(pub StreamWritablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { writable: RBTreeAtomicLink });
885
886impl KeyAdapter<'_> for StreamWritablePriorityAdapter {
887 type Key = StreamPriorityKey;
888
889 fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
890 s.clone()
891 }
892}
893
894intrusive_adapter!(pub StreamReadablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { readable: RBTreeAtomicLink });
895
896impl KeyAdapter<'_> for StreamReadablePriorityAdapter {
897 type Key = StreamPriorityKey;
898
899 fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
900 s.clone()
901 }
902}
903
904intrusive_adapter!(pub StreamFlushablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { flushable: RBTreeAtomicLink });
905
906impl KeyAdapter<'_> for StreamFlushablePriorityAdapter {
907 type Key = StreamPriorityKey;
908
909 fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
910 s.clone()
911 }
912}
913
914#[derive(Default)]
916pub struct StreamIter {
917 streams: SmallVec<[u64; 8]>,
918 index: usize,
919}
920
921impl StreamIter {
922 #[inline]
923 fn from(streams: &StreamIdHashSet) -> Self {
924 StreamIter {
925 streams: streams.iter().copied().collect(),
926 index: 0,
927 }
928 }
929}
930
931impl Iterator for StreamIter {
932 type Item = u64;
933
934 #[inline]
935 fn next(&mut self) -> Option<Self::Item> {
936 let v = self.streams.get(self.index)?;
937 self.index += 1;
938 Some(*v)
939 }
940}
941
942impl ExactSizeIterator for StreamIter {
943 #[inline]
944 fn len(&self) -> usize {
945 self.streams.len() - self.index
946 }
947}
948
949#[cfg(test)]
950mod tests {
951 use crate::range_buf::RangeBuf;
952
953 use super::*;
954
955 #[test]
956 fn recv_flow_control() {
957 let mut stream =
958 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
959 assert!(!stream.recv.almost_full());
960
961 let mut buf = [0; 32];
962
963 let first = RangeBuf::from(b"hello", 0, false);
964 let second = RangeBuf::from(b"world", 5, false);
965 let third = RangeBuf::from(b"something", 10, false);
966
967 assert_eq!(stream.recv.write(second), Ok(()));
968 assert_eq!(stream.recv.write(first), Ok(()));
969 assert!(!stream.recv.almost_full());
970
971 assert_eq!(stream.recv.write(third), Err(Error::FlowControl));
972
973 let (len, fin) = stream.recv.emit(&mut buf).unwrap();
974 assert_eq!(&buf[..len], b"helloworld");
975 assert!(!fin);
976
977 assert!(stream.recv.almost_full());
978
979 stream.recv.update_max_data(std::time::Instant::now());
980 assert_eq!(stream.recv.max_data_next(), 25);
981 assert!(!stream.recv.almost_full());
982
983 let third = RangeBuf::from(b"something", 10, false);
984 assert_eq!(stream.recv.write(third), Ok(()));
985 }
986
987 #[test]
988 fn recv_past_fin() {
989 let mut stream =
990 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
991 assert!(!stream.recv.almost_full());
992
993 let first = RangeBuf::from(b"hello", 0, true);
994 let second = RangeBuf::from(b"world", 5, false);
995
996 assert_eq!(stream.recv.write(first), Ok(()));
997 assert_eq!(stream.recv.write(second), Err(Error::FinalSize));
998 }
999
1000 #[test]
1001 fn recv_fin_dup() {
1002 let mut stream =
1003 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1004 assert!(!stream.recv.almost_full());
1005
1006 let first = RangeBuf::from(b"hello", 0, true);
1007 let second = RangeBuf::from(b"hello", 0, true);
1008
1009 assert_eq!(stream.recv.write(first), Ok(()));
1010 assert_eq!(stream.recv.write(second), Ok(()));
1011
1012 let mut buf = [0; 32];
1013
1014 let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1015 assert_eq!(&buf[..len], b"hello");
1016 assert!(fin);
1017 }
1018
1019 #[test]
1020 fn recv_fin_change() {
1021 let mut stream =
1022 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1023 assert!(!stream.recv.almost_full());
1024
1025 let first = RangeBuf::from(b"hello", 0, true);
1026 let second = RangeBuf::from(b"world", 5, true);
1027
1028 assert_eq!(stream.recv.write(second), Ok(()));
1029 assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
1030 }
1031
1032 #[test]
1033 fn recv_fin_lower_than_received() {
1034 let mut stream =
1035 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1036 assert!(!stream.recv.almost_full());
1037
1038 let first = RangeBuf::from(b"hello", 0, true);
1039 let second = RangeBuf::from(b"world", 5, false);
1040
1041 assert_eq!(stream.recv.write(second), Ok(()));
1042 assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
1043 }
1044
1045 #[test]
1046 fn recv_fin_flow_control() {
1047 let mut stream =
1048 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1049 assert!(!stream.recv.almost_full());
1050
1051 let mut buf = [0; 32];
1052
1053 let first = RangeBuf::from(b"hello", 0, false);
1054 let second = RangeBuf::from(b"world", 5, true);
1055
1056 assert_eq!(stream.recv.write(first), Ok(()));
1057 assert_eq!(stream.recv.write(second), Ok(()));
1058
1059 let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1060 assert_eq!(&buf[..len], b"helloworld");
1061 assert!(fin);
1062
1063 assert!(!stream.recv.almost_full());
1064 }
1065
1066 #[test]
1067 fn recv_fin_reset_mismatch() {
1068 let mut stream =
1069 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1070 assert!(!stream.recv.almost_full());
1071
1072 let first = RangeBuf::from(b"hello", 0, true);
1073
1074 assert_eq!(stream.recv.write(first), Ok(()));
1075 assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
1076 }
1077
1078 #[test]
1079 fn recv_reset_with_gap() {
1080 let mut stream =
1081 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1082 assert!(!stream.recv.almost_full());
1083
1084 let first = RangeBuf::from(b"hello", 0, false);
1085
1086 assert_eq!(stream.recv.write(first), Ok(()));
1087 assert_eq!(stream.recv.emit(&mut [0; 1]), Ok((1, false)));
1089 assert_eq!(
1091 stream.recv.reset(0, 10),
1092 Ok(RecvBufResetReturn {
1093 max_data_delta: 5,
1094 consumed_flowcontrol: 9
1096 })
1097 );
1098 assert_eq!(stream.recv.reset(0, 10), Ok(RecvBufResetReturn::zero()));
1099 }
1100
1101 #[test]
1102 fn recv_reset_dup() {
1103 let mut stream =
1104 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1105 assert!(!stream.recv.almost_full());
1106
1107 let first = RangeBuf::from(b"hello", 0, false);
1108
1109 assert_eq!(stream.recv.write(first), Ok(()));
1110 assert_eq!(
1111 stream.recv.reset(0, 5),
1112 Ok(RecvBufResetReturn {
1113 max_data_delta: 0,
1114 consumed_flowcontrol: 5
1115 })
1116 );
1117 assert_eq!(stream.recv.reset(0, 5), Ok(RecvBufResetReturn::zero()));
1118 }
1119
1120 #[test]
1121 fn recv_reset_change() {
1122 let mut stream =
1123 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1124 assert!(!stream.recv.almost_full());
1125
1126 let first = RangeBuf::from(b"hello", 0, false);
1127
1128 assert_eq!(stream.recv.write(first), Ok(()));
1129 assert_eq!(
1130 stream.recv.reset(0, 5),
1131 Ok(RecvBufResetReturn {
1132 max_data_delta: 0,
1133 consumed_flowcontrol: 5
1134 })
1135 );
1136 assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
1137 }
1138
1139 #[test]
1140 fn recv_reset_lower_than_received() {
1141 let mut stream =
1142 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1143 assert!(!stream.recv.almost_full());
1144
1145 let first = RangeBuf::from(b"hello", 0, false);
1146
1147 assert_eq!(stream.recv.write(first), Ok(()));
1148 assert_eq!(stream.recv.reset(0, 4), Err(Error::FinalSize));
1149 }
1150
1151 #[test]
1152 fn send_flow_control() {
1153 let mut buf = [0; 25];
1154
1155 let mut stream =
1156 <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1157
1158 let first = b"hello";
1159 let second = b"world";
1160 let third = b"something";
1161
1162 assert!(stream.send.write(first, false).is_ok());
1163 assert!(stream.send.write(second, false).is_ok());
1164 assert!(stream.send.write(third, false).is_ok());
1165
1166 assert_eq!(stream.send.off_front(), 0);
1167
1168 let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
1169 assert_eq!(written, 15);
1170 assert!(!fin);
1171 assert_eq!(&buf[..written], b"helloworldsomet");
1172
1173 assert_eq!(stream.send.off_front(), 15);
1174
1175 let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
1176 assert_eq!(written, 0);
1177 assert!(!fin);
1178 assert_eq!(&buf[..written], b"");
1179
1180 stream.send.retransmit(0, 15);
1181
1182 assert_eq!(stream.send.off_front(), 0);
1183
1184 let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
1185 assert_eq!(written, 10);
1186 assert!(!fin);
1187 assert_eq!(&buf[..written], b"helloworld");
1188
1189 assert_eq!(stream.send.off_front(), 10);
1190
1191 let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
1192 assert_eq!(written, 5);
1193 assert!(!fin);
1194 assert_eq!(&buf[..written], b"somet");
1195 }
1196
1197 #[test]
1198 fn send_past_fin() {
1199 let mut stream =
1200 <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1201
1202 let first = b"hello";
1203 let second = b"world";
1204 let third = b"third";
1205
1206 assert_eq!(stream.send.write(first, false), Ok(5));
1207
1208 assert_eq!(stream.send.write(second, true), Ok(5));
1209 assert!(stream.send.is_fin());
1210
1211 assert_eq!(stream.send.write(third, false), Err(Error::FinalSize));
1212 }
1213
1214 #[test]
1215 fn send_fin_dup() {
1216 let mut stream =
1217 <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1218
1219 assert_eq!(stream.send.write(b"hello", true), Ok(5));
1220 assert!(stream.send.is_fin());
1221
1222 assert_eq!(stream.send.write(b"", true), Ok(0));
1223 assert!(stream.send.is_fin());
1224 }
1225
1226 #[test]
1227 fn send_undo_fin() {
1228 let mut stream =
1229 <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1230
1231 assert_eq!(stream.send.write(b"hello", true), Ok(5));
1232 assert!(stream.send.is_fin());
1233
1234 assert_eq!(
1235 stream.send.write(b"helloworld", true),
1236 Err(Error::FinalSize)
1237 );
1238 }
1239
1240 #[test]
1241 fn send_fin_max_data_match() {
1242 let mut buf = [0; 15];
1243
1244 let mut stream =
1245 <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1246
1247 let slice = b"hellohellohello";
1248
1249 assert!(stream.send.write(slice, true).is_ok());
1250
1251 let (written, fin) = stream.send.emit(&mut buf[..15]).unwrap();
1252 assert_eq!(written, 15);
1253 assert!(fin);
1254 assert_eq!(&buf[..written], slice);
1255 }
1256
1257 #[test]
1258 fn send_fin_zero_length() {
1259 let mut buf = [0; 5];
1260
1261 let mut stream =
1262 <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1263
1264 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1265 assert_eq!(stream.send.write(b"", true), Ok(0));
1266 assert!(stream.send.is_fin());
1267
1268 let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1269 assert_eq!(written, 5);
1270 assert!(fin);
1271 assert_eq!(&buf[..written], b"hello");
1272 }
1273
1274 #[test]
1275 fn send_ack() {
1276 let mut buf = [0; 5];
1277
1278 let mut stream =
1279 <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1280
1281 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1282 assert_eq!(stream.send.write(b"world", false), Ok(5));
1283 assert_eq!(stream.send.write(b"", true), Ok(0));
1284 assert!(stream.send.is_fin());
1285
1286 assert_eq!(stream.send.off_front(), 0);
1287
1288 let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1289 assert_eq!(written, 5);
1290 assert!(!fin);
1291 assert_eq!(&buf[..written], b"hello");
1292
1293 stream.send.ack_and_drop(0, 5);
1294
1295 stream.send.retransmit(0, 5);
1296
1297 assert_eq!(stream.send.off_front(), 5);
1298
1299 let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1300 assert_eq!(written, 5);
1301 assert!(fin);
1302 assert_eq!(&buf[..written], b"world");
1303 }
1304
1305 #[test]
1306 fn send_ack_reordering() {
1307 let mut buf = [0; 5];
1308
1309 let mut stream =
1310 <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1311
1312 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1313 assert_eq!(stream.send.write(b"world", false), Ok(5));
1314 assert_eq!(stream.send.write(b"", true), Ok(0));
1315 assert!(stream.send.is_fin());
1316
1317 assert_eq!(stream.send.off_front(), 0);
1318
1319 let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1320 assert_eq!(written, 5);
1321 assert!(!fin);
1322 assert_eq!(&buf[..written], b"hello");
1323
1324 assert_eq!(stream.send.off_front(), 5);
1325
1326 let (written, fin) = stream.send.emit(&mut buf[..1]).unwrap();
1327 assert_eq!(written, 1);
1328 assert!(!fin);
1329 assert_eq!(&buf[..written], b"w");
1330
1331 stream.send.ack_and_drop(5, 1);
1332 stream.send.ack_and_drop(0, 5);
1333
1334 stream.send.retransmit(0, 5);
1335 stream.send.retransmit(5, 1);
1336
1337 assert_eq!(stream.send.off_front(), 6);
1338
1339 let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1340 assert_eq!(written, 4);
1341 assert!(fin);
1342 assert_eq!(&buf[..written], b"orld");
1343 }
1344
1345 #[test]
1346 fn recv_data_below_off() {
1347 let mut stream =
1348 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1349
1350 let first = RangeBuf::from(b"hello", 0, false);
1351
1352 assert_eq!(stream.recv.write(first), Ok(()));
1353
1354 let mut buf = [0; 10];
1355
1356 let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1357 assert_eq!(&buf[..len], b"hello");
1358 assert!(!fin);
1359
1360 let first = RangeBuf::from(b"elloworld", 1, true);
1361 assert_eq!(stream.recv.write(first), Ok(()));
1362
1363 let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1364 assert_eq!(&buf[..len], b"world");
1365 assert!(fin);
1366 }
1367
1368 #[test]
1369 fn stream_complete() {
1370 let mut stream =
1371 <Stream>::new(0, 30, 30, true, true, DEFAULT_STREAM_WINDOW);
1372
1373 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1374 assert_eq!(stream.send.write(b"world", false), Ok(5));
1375
1376 assert!(!stream.send.is_complete());
1377 assert!(!stream.send.is_fin());
1378
1379 assert_eq!(stream.send.write(b"", true), Ok(0));
1380
1381 assert!(!stream.send.is_complete());
1382 assert!(stream.send.is_fin());
1383
1384 let buf = RangeBuf::from(b"hello", 0, true);
1385 assert!(stream.recv.write(buf).is_ok());
1386 assert!(!stream.recv.is_fin());
1387
1388 stream.send.ack(6, 4);
1389 assert!(!stream.send.is_complete());
1390
1391 let mut buf = [0; 2];
1392 assert_eq!(stream.recv.emit(&mut buf), Ok((2, false)));
1393 assert!(!stream.recv.is_fin());
1394
1395 stream.send.ack(1, 5);
1396 assert!(!stream.send.is_complete());
1397
1398 stream.send.ack(0, 1);
1399 assert!(stream.send.is_complete());
1400
1401 assert!(!stream.is_complete());
1402
1403 let mut buf = [0; 3];
1404 assert_eq!(stream.recv.emit(&mut buf), Ok((3, true)));
1405 assert!(stream.recv.is_fin());
1406
1407 assert!(stream.is_complete());
1408 }
1409
1410 #[test]
1411 fn send_fin_zero_length_output() {
1412 let mut buf = [0; 5];
1413
1414 let mut stream =
1415 <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1416
1417 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1418 assert_eq!(stream.send.off_front(), 0);
1419 assert!(!stream.send.is_fin());
1420
1421 let (written, fin) = stream.send.emit(&mut buf).unwrap();
1422 assert_eq!(written, 5);
1423 assert!(!fin);
1424 assert_eq!(&buf[..written], b"hello");
1425
1426 assert_eq!(stream.send.write(b"", true), Ok(0));
1427 assert!(stream.send.is_fin());
1428 assert_eq!(stream.send.off_front(), 5);
1429
1430 let (written, fin) = stream.send.emit(&mut buf).unwrap();
1431 assert_eq!(written, 0);
1432 assert!(fin);
1433 assert_eq!(&buf[..written], b"");
1434 }
1435
1436 fn stream_send_ready(stream: &Stream) -> bool {
1437 !stream.send.is_empty() &&
1438 stream.send.off_front() < stream.send.off_back()
1439 }
1440
1441 #[test]
1442 fn send_emit() {
1443 let mut buf = [0; 5];
1444
1445 let mut stream =
1446 <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1447
1448 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1449 assert_eq!(stream.send.write(b"world", false), Ok(5));
1450 assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1451 assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1452 assert_eq!(stream.send.off_front(), 0);
1453 assert_eq!(stream.send.bufs_count(), 4);
1454
1455 assert!(stream.is_flushable());
1456
1457 assert!(stream_send_ready(&stream));
1458 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1459 assert_eq!(stream.send.off_front(), 4);
1460 assert_eq!(&buf[..4], b"hell");
1461
1462 assert!(stream_send_ready(&stream));
1463 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1464 assert_eq!(stream.send.off_front(), 8);
1465 assert_eq!(&buf[..4], b"owor");
1466
1467 assert!(stream_send_ready(&stream));
1468 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1469 assert_eq!(stream.send.off_front(), 10);
1470 assert_eq!(&buf[..2], b"ld");
1471
1472 assert!(stream_send_ready(&stream));
1473 assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1474 assert_eq!(stream.send.off_front(), 11);
1475 assert_eq!(&buf[..1], b"o");
1476
1477 assert!(stream_send_ready(&stream));
1478 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1479 assert_eq!(stream.send.off_front(), 16);
1480 assert_eq!(&buf[..5], b"llehd");
1481
1482 assert!(stream_send_ready(&stream));
1483 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1484 assert_eq!(stream.send.off_front(), 20);
1485 assert_eq!(&buf[..4], b"lrow");
1486
1487 assert!(!stream.is_flushable());
1488
1489 assert!(!stream_send_ready(&stream));
1490 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1491 assert_eq!(stream.send.off_front(), 20);
1492 }
1493
1494 #[test]
1495 fn send_emit_ack() {
1496 let mut buf = [0; 5];
1497
1498 let mut stream =
1499 <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1500
1501 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1502 assert_eq!(stream.send.write(b"world", false), Ok(5));
1503 assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1504 assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1505 assert_eq!(stream.send.off_front(), 0);
1506 assert_eq!(stream.send.bufs_count(), 4);
1507
1508 assert!(stream.is_flushable());
1509
1510 assert!(stream_send_ready(&stream));
1511 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1512 assert_eq!(stream.send.off_front(), 4);
1513 assert_eq!(&buf[..4], b"hell");
1514
1515 assert!(stream_send_ready(&stream));
1516 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1517 assert_eq!(stream.send.off_front(), 8);
1518 assert_eq!(&buf[..4], b"owor");
1519
1520 stream.send.ack_and_drop(0, 5);
1521 assert_eq!(stream.send.bufs_count(), 3);
1522
1523 assert!(stream_send_ready(&stream));
1524 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1525 assert_eq!(stream.send.off_front(), 10);
1526 assert_eq!(&buf[..2], b"ld");
1527
1528 stream.send.ack_and_drop(7, 5);
1529 assert_eq!(stream.send.bufs_count(), 3);
1530
1531 assert!(stream_send_ready(&stream));
1532 assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1533 assert_eq!(stream.send.off_front(), 11);
1534 assert_eq!(&buf[..1], b"o");
1535
1536 assert!(stream_send_ready(&stream));
1537 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1538 assert_eq!(stream.send.off_front(), 16);
1539 assert_eq!(&buf[..5], b"llehd");
1540
1541 stream.send.ack_and_drop(5, 7);
1542 assert_eq!(stream.send.bufs_count(), 2);
1543
1544 assert!(stream_send_ready(&stream));
1545 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1546 assert_eq!(stream.send.off_front(), 20);
1547 assert_eq!(&buf[..4], b"lrow");
1548
1549 assert!(!stream.is_flushable());
1550
1551 assert!(!stream_send_ready(&stream));
1552 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1553 assert_eq!(stream.send.off_front(), 20);
1554
1555 stream.send.ack_and_drop(22, 4);
1556 assert_eq!(stream.send.bufs_count(), 2);
1557
1558 stream.send.ack_and_drop(20, 1);
1559 assert_eq!(stream.send.bufs_count(), 2);
1560 }
1561
1562 #[test]
1563 fn send_emit_retransmit() {
1564 let mut buf = [0; 5];
1565
1566 let mut stream =
1567 <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1568
1569 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1570 assert_eq!(stream.send.write(b"world", false), Ok(5));
1571 assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1572 assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1573 assert_eq!(stream.send.off_front(), 0);
1574 assert_eq!(stream.send.bufs_count(), 4);
1575
1576 assert!(stream.is_flushable());
1577
1578 assert!(stream_send_ready(&stream));
1579 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1580 assert_eq!(stream.send.off_front(), 4);
1581 assert_eq!(&buf[..4], b"hell");
1582
1583 assert!(stream_send_ready(&stream));
1584 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1585 assert_eq!(stream.send.off_front(), 8);
1586 assert_eq!(&buf[..4], b"owor");
1587
1588 stream.send.retransmit(3, 3);
1589 assert_eq!(stream.send.off_front(), 3);
1590
1591 assert!(stream_send_ready(&stream));
1592 assert_eq!(stream.send.emit(&mut buf[..3]), Ok((3, false)));
1593 assert_eq!(stream.send.off_front(), 8);
1594 assert_eq!(&buf[..3], b"low");
1595
1596 assert!(stream_send_ready(&stream));
1597 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1598 assert_eq!(stream.send.off_front(), 10);
1599 assert_eq!(&buf[..2], b"ld");
1600
1601 stream.send.ack_and_drop(7, 2);
1602
1603 stream.send.retransmit(8, 2);
1604
1605 assert!(stream_send_ready(&stream));
1606 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1607 assert_eq!(stream.send.off_front(), 10);
1608 assert_eq!(&buf[..2], b"ld");
1609
1610 assert!(stream_send_ready(&stream));
1611 assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1612 assert_eq!(stream.send.off_front(), 11);
1613 assert_eq!(&buf[..1], b"o");
1614
1615 assert!(stream_send_ready(&stream));
1616 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1617 assert_eq!(stream.send.off_front(), 16);
1618 assert_eq!(&buf[..5], b"llehd");
1619
1620 stream.send.retransmit(12, 2);
1621
1622 assert!(stream_send_ready(&stream));
1623 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1624 assert_eq!(stream.send.off_front(), 16);
1625 assert_eq!(&buf[..2], b"le");
1626
1627 assert!(stream_send_ready(&stream));
1628 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1629 assert_eq!(stream.send.off_front(), 20);
1630 assert_eq!(&buf[..4], b"lrow");
1631
1632 assert!(!stream.is_flushable());
1633
1634 assert!(!stream_send_ready(&stream));
1635 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1636 assert_eq!(stream.send.off_front(), 20);
1637
1638 stream.send.retransmit(7, 12);
1639
1640 assert!(stream_send_ready(&stream));
1641 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1642 assert_eq!(stream.send.off_front(), 12);
1643 assert_eq!(&buf[..5], b"rldol");
1644
1645 assert!(stream_send_ready(&stream));
1646 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1647 assert_eq!(stream.send.off_front(), 17);
1648 assert_eq!(&buf[..5], b"lehdl");
1649
1650 assert!(stream_send_ready(&stream));
1651 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
1652 assert_eq!(stream.send.off_front(), 20);
1653 assert_eq!(&buf[..2], b"ro");
1654
1655 stream.send.ack_and_drop(12, 7);
1656
1657 stream.send.retransmit(7, 12);
1658
1659 assert!(stream_send_ready(&stream));
1660 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1661 assert_eq!(stream.send.off_front(), 12);
1662 assert_eq!(&buf[..5], b"rldol");
1663
1664 assert!(stream_send_ready(&stream));
1665 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1666 assert_eq!(stream.send.off_front(), 17);
1667 assert_eq!(&buf[..5], b"lehdl");
1668
1669 assert!(stream_send_ready(&stream));
1670 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
1671 assert_eq!(stream.send.off_front(), 20);
1672 assert_eq!(&buf[..2], b"ro");
1673 }
1674
1675 #[test]
1676 fn rangebuf_split_off() {
1677 let mut buf = <RangeBuf>::from(b"helloworld", 5, true);
1678 assert_eq!(buf.start, 0);
1679 assert_eq!(buf.pos, 0);
1680 assert_eq!(buf.len, 10);
1681 assert_eq!(buf.off, 5);
1682 assert!(buf.fin);
1683
1684 assert_eq!(buf.len(), 10);
1685 assert_eq!(buf.off(), 5);
1686 assert!(buf.fin());
1687
1688 assert_eq!(&buf[..], b"helloworld");
1689
1690 buf.consume(5);
1692
1693 assert_eq!(buf.start, 0);
1694 assert_eq!(buf.pos, 5);
1695 assert_eq!(buf.len, 10);
1696 assert_eq!(buf.off, 5);
1697 assert!(buf.fin);
1698
1699 assert_eq!(buf.len(), 5);
1700 assert_eq!(buf.off(), 10);
1701 assert!(buf.fin());
1702
1703 assert_eq!(&buf[..], b"world");
1704
1705 let mut new_buf = buf.split_off(3);
1707
1708 assert_eq!(buf.start, 0);
1709 assert_eq!(buf.pos, 3);
1710 assert_eq!(buf.len, 3);
1711 assert_eq!(buf.off, 5);
1712 assert!(!buf.fin);
1713
1714 assert_eq!(buf.len(), 0);
1715 assert_eq!(buf.off(), 8);
1716 assert!(!buf.fin());
1717
1718 assert_eq!(&buf[..], b"");
1719
1720 assert_eq!(new_buf.start, 3);
1721 assert_eq!(new_buf.pos, 5);
1722 assert_eq!(new_buf.len, 7);
1723 assert_eq!(new_buf.off, 8);
1724 assert!(new_buf.fin);
1725
1726 assert_eq!(new_buf.len(), 5);
1727 assert_eq!(new_buf.off(), 10);
1728 assert!(new_buf.fin());
1729
1730 assert_eq!(&new_buf[..], b"world");
1731
1732 new_buf.consume(2);
1734
1735 assert_eq!(new_buf.start, 3);
1736 assert_eq!(new_buf.pos, 7);
1737 assert_eq!(new_buf.len, 7);
1738 assert_eq!(new_buf.off, 8);
1739 assert!(new_buf.fin);
1740
1741 assert_eq!(new_buf.len(), 3);
1742 assert_eq!(new_buf.off(), 12);
1743 assert!(new_buf.fin());
1744
1745 assert_eq!(&new_buf[..], b"rld");
1746
1747 let mut new_new_buf = new_buf.split_off(5);
1749
1750 assert_eq!(new_buf.start, 3);
1751 assert_eq!(new_buf.pos, 7);
1752 assert_eq!(new_buf.len, 5);
1753 assert_eq!(new_buf.off, 8);
1754 assert!(!new_buf.fin);
1755
1756 assert_eq!(new_buf.len(), 1);
1757 assert_eq!(new_buf.off(), 12);
1758 assert!(!new_buf.fin());
1759
1760 assert_eq!(&new_buf[..], b"r");
1761
1762 assert_eq!(new_new_buf.start, 8);
1763 assert_eq!(new_new_buf.pos, 8);
1764 assert_eq!(new_new_buf.len, 2);
1765 assert_eq!(new_new_buf.off, 13);
1766 assert!(new_new_buf.fin);
1767
1768 assert_eq!(new_new_buf.len(), 2);
1769 assert_eq!(new_new_buf.off(), 13);
1770 assert!(new_new_buf.fin());
1771
1772 assert_eq!(&new_new_buf[..], b"ld");
1773
1774 new_new_buf.consume(2);
1776
1777 assert_eq!(new_new_buf.start, 8);
1778 assert_eq!(new_new_buf.pos, 10);
1779 assert_eq!(new_new_buf.len, 2);
1780 assert_eq!(new_new_buf.off, 13);
1781 assert!(new_new_buf.fin);
1782
1783 assert_eq!(new_new_buf.len(), 0);
1784 assert_eq!(new_new_buf.off(), 15);
1785 assert!(new_new_buf.fin());
1786
1787 assert_eq!(&new_new_buf[..], b"");
1788 }
1789
1790 #[test]
1793 fn stream_limit_auto_open() {
1794 let local_tp = crate::TransportParams::default();
1795 let peer_tp = crate::TransportParams::default();
1796
1797 let mut streams = <StreamMap>::new(5, 5, 5);
1798
1799 let stream_id = 500;
1800 assert!(!is_local(stream_id, true), "stream id is peer initiated");
1801 assert!(is_bidi(stream_id), "stream id is bidirectional");
1802 assert_eq!(
1803 streams
1804 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1805 .err(),
1806 Some(Error::StreamLimit),
1807 "stream limit should be exceeded"
1808 );
1809 }
1810
1811 #[test]
1814 fn stream_create_out_of_order() {
1815 let local_tp = crate::TransportParams::default();
1816 let peer_tp = crate::TransportParams::default();
1817
1818 let mut streams = <StreamMap>::new(5, 5, 5);
1819
1820 for stream_id in [8, 12, 4] {
1821 assert!(is_local(stream_id, false), "stream id is client initiated");
1822 assert!(is_bidi(stream_id), "stream id is bidirectional");
1823 assert!(streams
1824 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1825 .is_ok());
1826 }
1827 }
1828
1829 #[test]
1831 fn stream_limit_edge() {
1832 let local_tp = crate::TransportParams::default();
1833 let peer_tp = crate::TransportParams::default();
1834
1835 let mut streams = <StreamMap>::new(3, 3, 3);
1836
1837 let stream_id = 8;
1839 assert!(streams
1840 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1841 .is_ok());
1842
1843 let stream_id = 12;
1845 assert_eq!(
1846 streams
1847 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1848 .err(),
1849 Some(Error::StreamLimit)
1850 );
1851 }
1852
1853 fn cycle_stream_priority(stream_id: u64, streams: &mut StreamMap) {
1854 let key = streams.get(stream_id).unwrap().priority_key.clone();
1855 streams.update_priority(&key.clone(), &key);
1856 }
1857
1858 #[test]
1859 fn writable_prioritized_default_priority() {
1860 let local_tp = crate::TransportParams::default();
1861 let peer_tp = crate::TransportParams {
1862 initial_max_stream_data_bidi_local: 100,
1863 initial_max_stream_data_uni: 100,
1864 ..Default::default()
1865 };
1866
1867 let mut streams = StreamMap::new(100, 100, 100);
1868
1869 for id in [0, 4, 8, 12] {
1870 assert!(streams
1871 .get_or_create(id, &local_tp, &peer_tp, false, true)
1872 .is_ok());
1873 }
1874
1875 let walk_1: Vec<u64> = streams.writable().collect();
1876 cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1877 let walk_2: Vec<u64> = streams.writable().collect();
1878 cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1879 let walk_3: Vec<u64> = streams.writable().collect();
1880 cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1881 let walk_4: Vec<u64> = streams.writable().collect();
1882 cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1883 let walk_5: Vec<u64> = streams.writable().collect();
1884
1885 assert_eq!(walk_1, vec![0, 4, 8, 12]);
1888 assert_eq!(walk_2, vec![4, 8, 12, 0]);
1889 assert_eq!(walk_3, vec![8, 12, 0, 4]);
1890 assert_eq!(walk_4, vec![12, 0, 4, 8,]);
1891 assert_eq!(walk_5, vec![0, 4, 8, 12]);
1892 }
1893
1894 #[test]
1895 fn writable_prioritized_insert_order() {
1896 let local_tp = crate::TransportParams::default();
1897 let peer_tp = crate::TransportParams {
1898 initial_max_stream_data_bidi_local: 100,
1899 initial_max_stream_data_uni: 100,
1900 ..Default::default()
1901 };
1902
1903 let mut streams = StreamMap::new(100, 100, 100);
1904
1905 for id in [12, 4, 8, 0] {
1908 assert!(streams
1909 .get_or_create(id, &local_tp, &peer_tp, false, true)
1910 .is_ok());
1911 }
1912
1913 let walk_1: Vec<u64> = streams.writable().collect();
1914 cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1915 let walk_2: Vec<u64> = streams.writable().collect();
1916 cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1917 let walk_3: Vec<u64> = streams.writable().collect();
1918 cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1919 let walk_4: Vec<u64> = streams.writable().collect();
1920 cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1921 let walk_5: Vec<u64> = streams.writable().collect();
1922 assert_eq!(walk_1, vec![12, 4, 8, 0]);
1923 assert_eq!(walk_2, vec![4, 8, 0, 12]);
1924 assert_eq!(walk_3, vec![8, 0, 12, 4,]);
1925 assert_eq!(walk_4, vec![0, 12, 4, 8]);
1926 assert_eq!(walk_5, vec![12, 4, 8, 0]);
1927 }
1928
1929 #[test]
1930 fn writable_prioritized_mixed_urgency() {
1931 let local_tp = crate::TransportParams::default();
1932 let peer_tp = crate::TransportParams {
1933 initial_max_stream_data_bidi_local: 100,
1934 initial_max_stream_data_uni: 100,
1935 ..Default::default()
1936 };
1937
1938 let mut streams = <StreamMap>::new(100, 100, 100);
1939
1940 let input = vec![
1943 (0, 100),
1944 (4, 90),
1945 (8, 80),
1946 (12, 70),
1947 (16, 60),
1948 (20, 50),
1949 (24, 40),
1950 (28, 30),
1951 (32, 20),
1952 (36, 10),
1953 (40, 0),
1954 ];
1955
1956 for (id, urgency) in input.clone() {
1957 let stream = streams
1960 .get_or_create(id, &local_tp, &peer_tp, false, true)
1961 .unwrap();
1962
1963 stream.urgency = urgency;
1964
1965 let new_priority_key = Arc::new(StreamPriorityKey {
1966 urgency: stream.urgency,
1967 incremental: stream.incremental,
1968 id,
1969 ..Default::default()
1970 });
1971
1972 let old_priority_key = std::mem::replace(
1973 &mut stream.priority_key,
1974 new_priority_key.clone(),
1975 );
1976
1977 streams.update_priority(&old_priority_key, &new_priority_key);
1978 }
1979
1980 let walk_1: Vec<u64> = streams.writable().collect();
1981 assert_eq!(walk_1, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
1982
1983 for (id, urgency) in input {
1985 let stream = streams
1988 .get_or_create(id, &local_tp, &peer_tp, false, true)
1989 .unwrap();
1990
1991 stream.urgency = urgency;
1992
1993 let new_priority_key = Arc::new(StreamPriorityKey {
1994 urgency: stream.urgency,
1995 incremental: stream.incremental,
1996 id,
1997 ..Default::default()
1998 });
1999
2000 let old_priority_key = std::mem::replace(
2001 &mut stream.priority_key,
2002 new_priority_key.clone(),
2003 );
2004
2005 streams.update_priority(&old_priority_key, &new_priority_key);
2006 }
2007
2008 let walk_2: Vec<u64> = streams.writable().collect();
2009 assert_eq!(walk_2, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
2010
2011 streams.collect(24, true);
2013
2014 let walk_3: Vec<u64> = streams.writable().collect();
2015 assert_eq!(walk_3, vec![40, 36, 32, 28, 20, 16, 12, 8, 4, 0]);
2016
2017 streams.collect(40, true);
2018 streams.collect(0, true);
2019
2020 let walk_4: Vec<u64> = streams.writable().collect();
2021 assert_eq!(walk_4, vec![36, 32, 28, 20, 16, 12, 8, 4]);
2022
2023 streams
2025 .get_or_create(44, &local_tp, &peer_tp, false, true)
2026 .unwrap();
2027
2028 let walk_5: Vec<u64> = streams.writable().collect();
2029 assert_eq!(walk_5, vec![36, 32, 28, 20, 16, 12, 8, 4, 44]);
2030 }
2031
2032 #[test]
2033 fn writable_prioritized_mixed_urgencies_incrementals() {
2034 let local_tp = crate::TransportParams::default();
2035 let peer_tp = crate::TransportParams {
2036 initial_max_stream_data_bidi_local: 100,
2037 initial_max_stream_data_uni: 100,
2038 ..Default::default()
2039 };
2040
2041 let mut streams = StreamMap::new(100, 100, 100);
2042
2043 let input = vec![
2045 (0, 100),
2046 (4, 20),
2047 (8, 100),
2048 (12, 20),
2049 (16, 90),
2050 (20, 25),
2051 (24, 90),
2052 (28, 30),
2053 (32, 80),
2054 (36, 20),
2055 (40, 0),
2056 ];
2057
2058 for (id, urgency) in input.clone() {
2059 let stream = streams
2062 .get_or_create(id, &local_tp, &peer_tp, false, true)
2063 .unwrap();
2064
2065 stream.urgency = urgency;
2066
2067 let new_priority_key = Arc::new(StreamPriorityKey {
2068 urgency: stream.urgency,
2069 incremental: stream.incremental,
2070 id,
2071 ..Default::default()
2072 });
2073
2074 let old_priority_key = std::mem::replace(
2075 &mut stream.priority_key,
2076 new_priority_key.clone(),
2077 );
2078
2079 streams.update_priority(&old_priority_key, &new_priority_key);
2080 }
2081
2082 let walk_1: Vec<u64> = streams.writable().collect();
2083 cycle_stream_priority(4, &mut streams);
2084 cycle_stream_priority(16, &mut streams);
2085 cycle_stream_priority(0, &mut streams);
2086 let walk_2: Vec<u64> = streams.writable().collect();
2087 cycle_stream_priority(12, &mut streams);
2088 cycle_stream_priority(24, &mut streams);
2089 cycle_stream_priority(8, &mut streams);
2090 let walk_3: Vec<u64> = streams.writable().collect();
2091 cycle_stream_priority(36, &mut streams);
2092 cycle_stream_priority(16, &mut streams);
2093 cycle_stream_priority(0, &mut streams);
2094 let walk_4: Vec<u64> = streams.writable().collect();
2095 cycle_stream_priority(4, &mut streams);
2096 cycle_stream_priority(24, &mut streams);
2097 cycle_stream_priority(8, &mut streams);
2098 let walk_5: Vec<u64> = streams.writable().collect();
2099 cycle_stream_priority(12, &mut streams);
2100 cycle_stream_priority(16, &mut streams);
2101 cycle_stream_priority(0, &mut streams);
2102 let walk_6: Vec<u64> = streams.writable().collect();
2103 cycle_stream_priority(36, &mut streams);
2104 cycle_stream_priority(24, &mut streams);
2105 cycle_stream_priority(8, &mut streams);
2106 let walk_7: Vec<u64> = streams.writable().collect();
2107 cycle_stream_priority(4, &mut streams);
2108 cycle_stream_priority(16, &mut streams);
2109 cycle_stream_priority(0, &mut streams);
2110 let walk_8: Vec<u64> = streams.writable().collect();
2111 cycle_stream_priority(12, &mut streams);
2112 cycle_stream_priority(24, &mut streams);
2113 cycle_stream_priority(8, &mut streams);
2114 let walk_9: Vec<u64> = streams.writable().collect();
2115 cycle_stream_priority(36, &mut streams);
2116 cycle_stream_priority(16, &mut streams);
2117 cycle_stream_priority(0, &mut streams);
2118
2119 assert_eq!(walk_1, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2120 assert_eq!(walk_2, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2121 assert_eq!(walk_3, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2122 assert_eq!(walk_4, vec![40, 4, 12, 36, 20, 28, 32, 24, 16, 8, 0]);
2123 assert_eq!(walk_5, vec![40, 12, 36, 4, 20, 28, 32, 16, 24, 0, 8]);
2124 assert_eq!(walk_6, vec![40, 36, 4, 12, 20, 28, 32, 24, 16, 8, 0]);
2125 assert_eq!(walk_7, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2126 assert_eq!(walk_8, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2127 assert_eq!(walk_9, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2128
2129 streams.collect(20, true);
2131
2132 let walk_10: Vec<u64> = streams.writable().collect();
2133 assert_eq!(walk_10, vec![40, 4, 12, 36, 28, 32, 24, 16, 8, 0]);
2134
2135 let stream = streams
2137 .get_or_create(44, &local_tp, &peer_tp, false, true)
2138 .unwrap();
2139
2140 stream.urgency = 20;
2141 stream.incremental = true;
2142
2143 let new_priority_key = Arc::new(StreamPriorityKey {
2144 urgency: stream.urgency,
2145 incremental: stream.incremental,
2146 id: 44,
2147 ..Default::default()
2148 });
2149
2150 let old_priority_key =
2151 std::mem::replace(&mut stream.priority_key, new_priority_key.clone());
2152
2153 streams.update_priority(&old_priority_key, &new_priority_key);
2154
2155 let walk_11: Vec<u64> = streams.writable().collect();
2156 assert_eq!(walk_11, vec![40, 4, 12, 36, 44, 28, 32, 24, 16, 8, 0]);
2157 }
2158
2159 #[test]
2160 fn priority_tree_dupes() {
2161 let mut prioritized_writable: RBTree<StreamWritablePriorityAdapter> =
2162 Default::default();
2163
2164 for id in [0, 4, 8, 12] {
2165 let s = Arc::new(StreamPriorityKey {
2166 urgency: 0,
2167 incremental: false,
2168 id,
2169 ..Default::default()
2170 });
2171
2172 prioritized_writable.insert(s);
2173 }
2174
2175 let walk_1: Vec<u64> =
2176 prioritized_writable.iter().map(|s| s.id).collect();
2177 assert_eq!(walk_1, vec![0, 4, 8, 12]);
2178
2179 for id in [0, 4, 8, 12] {
2182 let s = Arc::new(StreamPriorityKey {
2183 urgency: 0,
2184 incremental: false,
2185 id,
2186 ..Default::default()
2187 });
2188
2189 prioritized_writable.insert(s);
2190 }
2191
2192 let walk_2: Vec<u64> =
2193 prioritized_writable.iter().map(|s| s.id).collect();
2194 assert_eq!(walk_2, vec![0, 0, 4, 4, 8, 8, 12, 12]);
2195 }
2196}
2197
2198mod recv_buf;
2199mod send_buf;