1use std::cmp;
28
29use std::sync::Arc;
30
31use std::collections::hash_map;
32use std::collections::HashMap;
33use std::collections::HashSet;
34
35use intrusive_collections::intrusive_adapter;
36use intrusive_collections::KeyAdapter;
37use intrusive_collections::RBTree;
38use intrusive_collections::RBTreeAtomicLink;
39
40use smallvec::SmallVec;
41
42use crate::buffers::DefaultBufFactory;
43use crate::BufFactory;
44use crate::Error;
45use crate::Result;
46
/// Urgency given to a stream's priority key when none is set explicitly
/// (see `StreamPriorityKey::default()`).
const DEFAULT_URGENCY: u8 = 127;

// Default stream window size in bytes (32 KiB). In this part of the file it
// is only used by the unit tests, as the `max_window` argument of
// `Stream::new()`.
const DEFAULT_STREAM_WINDOW: u64 = 32 * 1024;

/// Upper bound for a stream window, in bytes (16 MiB).
// NOTE(review): not referenced in this part of the file; presumably the cap
// used when auto-tuning the receive window -- confirm against the callers.
pub const MAX_STREAM_WINDOW: u64 = 16 * 1024 * 1024;
54
/// Pass-through hasher for collections keyed by stream ID.
///
/// The "hash" of a key is simply the last `u64` written to the hasher (see
/// the `Hasher` implementation), so no actual hashing work is performed.
#[derive(Default)]
pub struct StreamIdHasher {
    /// Last value passed to `write_u64()`, returned verbatim by `finish()`.
    id: u64,
}
63
/// Result of resetting a stream's receive buffer.
///
/// Both fields are plain counters, so the type also derives `Eq` (equality is
/// total) and `Default` (all zeroes, the same value `zero()` produces).
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct RecvBufResetReturn {
    /// Difference between the final size announced by the reset and the
    /// amount of data that had been received so far.
    // NOTE(review): derived from the `recv_reset_*` tests in this file
    // (5 bytes received, final size 10 => 5); confirm against `RecvBuf::reset`.
    pub max_data_delta: u64,

    /// Difference between the final size announced by the reset and the
    /// amount of data already read by the application.
    // NOTE(review): derived from the `recv_reset_with_gap` test (1 byte read,
    // final size 10 => 9); confirm against `RecvBuf::reset`.
    pub consumed_flowcontrol: u64,
}
75
76impl RecvBufResetReturn {
77 pub fn zero() -> Self {
78 Self {
79 max_data_delta: 0,
80 consumed_flowcontrol: 0,
81 }
82 }
83}
84
/// Action to perform on data held by a receive buffer.
// NOTE(review): the consumer of this enum is not visible in this part of the
// file; the variant descriptions below are inferred from the payload types
// and should be confirmed against the receive buffer implementation.
pub enum RecvAction<'a> {
    /// Presumably: copy received data into the caller-provided `out` slice.
    Emit { out: &'a mut [u8] },

    /// Presumably: drop up to `len` bytes of received data without copying.
    Discard { len: usize },
}
92
impl std::hash::Hasher for StreamIdHasher {
    /// Returns the last stream ID written, unchanged.
    #[inline]
    fn finish(&self) -> u64 {
        self.id
    }

    /// Records the stream ID; `u64` keys are hashed through this method.
    #[inline]
    fn write_u64(&mut self, id: u64) {
        self.id = id;
    }

    /// Required by the trait but never expected to run.
    ///
    /// Keys of the stream ID collections are `u64`, which hash through
    /// `write_u64()`. Any other key type would end up here and panic.
    #[inline]
    fn write(&mut self, _: &[u8]) {
        unimplemented!()
    }
}
111
/// `BuildHasher` that creates `StreamIdHasher` instances.
type BuildStreamIdHasher = std::hash::BuildHasherDefault<StreamIdHasher>;

/// Hash map keyed by stream ID, using the pass-through `StreamIdHasher`.
pub type StreamIdHashMap<V> = HashMap<u64, V, BuildStreamIdHasher>;
/// Hash set of stream IDs, using the pass-through `StreamIdHasher`.
pub type StreamIdHashSet = HashSet<u64, BuildStreamIdHasher>;
116
/// Keeps track of the streams of a connection, their limits and the sets of
/// streams that need some kind of attention.
#[derive(Default)]
pub struct StreamMap<F: BufFactory = DefaultBufFactory> {
    /// Live streams, indexed by stream ID.
    streams: StreamIdHashMap<Stream<F>>,

    /// IDs of streams removed by `collect()`. `get_or_create()` refuses to
    /// create a stream whose ID is in this set.
    collected: StreamIdHashSet,

    /// Limit on locally-initiated bidirectional streams. Only ever raised,
    /// through `update_peer_max_streams_bidi()`.
    peer_max_streams_bidi: u64,

    /// Limit on locally-initiated unidirectional streams. Only ever raised,
    /// through `update_peer_max_streams_uni()`.
    peer_max_streams_uni: u64,

    /// Number of bidirectional streams opened by the peer (highest stream
    /// sequence number seen, plus one).
    peer_opened_streams_bidi: u64,

    /// Number of unidirectional streams opened by the peer (highest stream
    /// sequence number seen, plus one).
    peer_opened_streams_uni: u64,

    /// Limit currently enforced on peer-initiated bidirectional streams.
    local_max_streams_bidi: u64,
    /// Pending value for `local_max_streams_bidi`: grows by one each time a
    /// peer-initiated bidirectional stream is collected and is applied by
    /// `update_max_streams_bidi()`.
    local_max_streams_bidi_next: u64,

    /// Initial bidirectional limit; half of it is the threshold used by
    /// `should_update_max_streams_bidi()`.
    initial_max_streams_bidi: u64,

    /// Limit currently enforced on peer-initiated unidirectional streams.
    local_max_streams_uni: u64,
    /// Pending value for `local_max_streams_uni`: grows by one each time a
    /// peer-initiated unidirectional stream is collected and is applied by
    /// `update_max_streams_uni()`.
    local_max_streams_uni_next: u64,

    /// Initial unidirectional limit; half of it is the threshold used by
    /// `should_update_max_streams_uni()`.
    initial_max_streams_uni: u64,

    /// Number of bidirectional streams opened locally (highest stream
    /// sequence number used, plus one).
    local_opened_streams_bidi: u64,

    /// Number of unidirectional streams opened locally (highest stream
    /// sequence number used, plus one).
    local_opened_streams_uni: u64,

    /// Priority-ordered tree of the keys registered with
    /// `insert_flushable()`.
    // NOTE(review): presumably streams with buffered data that can be sent
    // (see `Stream::is_flushable()`) -- confirm against the callers.
    flushable: RBTree<StreamFlushablePriorityAdapter>,

    /// Priority-ordered tree of the keys registered with
    /// `insert_readable()`.
    pub readable: RBTree<StreamReadablePriorityAdapter>,

    /// Priority-ordered tree of the keys registered with
    /// `insert_writable()`; newly created streams that are already writable
    /// are added by `get_or_create()`.
    pub writable: RBTree<StreamWritablePriorityAdapter>,

    /// IDs registered with `insert_almost_full()`.
    // NOTE(review): presumably streams whose receive window is nearly used
    // up and needs to be extended -- confirm against the callers.
    almost_full: StreamIdHashSet,

    /// Stream ID => offset, as registered with `insert_blocked()`.
    // NOTE(review): presumably the offset at which the stream became
    // flow-control blocked -- confirm against the callers.
    blocked: StreamIdHashMap<u64>,

    /// Stream ID => `(error_code, final_size)`, as registered with
    /// `insert_reset()`.
    reset: StreamIdHashMap<(u64, u64)>,

    /// Stream ID => error code, as registered with `insert_stopped()`.
    stopped: StreamIdHashMap<u64>,

    /// Value handed to `Stream::new()` as `max_window` for every stream
    /// created by this map.
    max_stream_window: u64,
}
202
203impl<F: BufFactory> StreamMap<F> {
204 pub fn new(
205 max_streams_bidi: u64, max_streams_uni: u64, max_stream_window: u64,
206 ) -> Self {
207 StreamMap {
208 local_max_streams_bidi: max_streams_bidi,
209 local_max_streams_bidi_next: max_streams_bidi,
210 initial_max_streams_bidi: max_streams_bidi,
211
212 local_max_streams_uni: max_streams_uni,
213 local_max_streams_uni_next: max_streams_uni,
214 initial_max_streams_uni: max_streams_uni,
215
216 max_stream_window,
217
218 ..StreamMap::default()
219 }
220 }
221
/// Returns a shared reference to the stream with the given ID, or `None`
/// if the map holds no such stream.
pub fn get(&self, id: u64) -> Option<&Stream<F>> {
    self.streams.get(&id)
}
226
/// Returns a mutable reference to the stream with the given ID, or `None`
/// if the map holds no such stream.
pub fn get_mut(&mut self, id: u64) -> Option<&mut Stream<F>> {
    self.streams.get_mut(&id)
}
231
232 pub(crate) fn get_or_create(
245 &mut self, id: u64, local_params: &crate::TransportParams,
246 peer_params: &crate::TransportParams, local: bool, is_server: bool,
247 ) -> Result<&mut Stream<F>> {
248 let (stream, is_new_and_writable) = match self.streams.entry(id) {
249 hash_map::Entry::Vacant(v) => {
250 if self.collected.contains(&id) {
252 return Err(Error::Done);
253 }
254
255 if local != is_local(id, is_server) {
256 return Err(Error::InvalidStreamState(id));
257 }
258
259 let (max_rx_data, max_tx_data) = match (local, is_bidi(id)) {
260 (true, true) => (
262 local_params.initial_max_stream_data_bidi_local,
263 peer_params.initial_max_stream_data_bidi_remote,
264 ),
265
266 (true, false) => (0, peer_params.initial_max_stream_data_uni),
268
269 (false, true) => (
271 local_params.initial_max_stream_data_bidi_remote,
272 peer_params.initial_max_stream_data_bidi_local,
273 ),
274
275 (false, false) =>
277 (local_params.initial_max_stream_data_uni, 0),
278 };
279
280 let stream_sequence = id >> 2;
284
285 match (is_local(id, is_server), is_bidi(id)) {
287 (true, true) => {
288 let n = cmp::max(
289 self.local_opened_streams_bidi,
290 stream_sequence + 1,
291 );
292
293 if n > self.peer_max_streams_bidi {
294 return Err(Error::StreamLimit);
295 }
296
297 self.local_opened_streams_bidi = n;
298 },
299
300 (true, false) => {
301 let n = cmp::max(
302 self.local_opened_streams_uni,
303 stream_sequence + 1,
304 );
305
306 if n > self.peer_max_streams_uni {
307 return Err(Error::StreamLimit);
308 }
309
310 self.local_opened_streams_uni = n;
311 },
312
313 (false, true) => {
314 let n = cmp::max(
315 self.peer_opened_streams_bidi,
316 stream_sequence + 1,
317 );
318
319 if n > self.local_max_streams_bidi {
320 return Err(Error::StreamLimit);
321 }
322
323 self.peer_opened_streams_bidi = n;
324 },
325
326 (false, false) => {
327 let n = cmp::max(
328 self.peer_opened_streams_uni,
329 stream_sequence + 1,
330 );
331
332 if n > self.local_max_streams_uni {
333 return Err(Error::StreamLimit);
334 }
335
336 self.peer_opened_streams_uni = n;
337 },
338 };
339
340 let s = Stream::new(
341 id,
342 max_rx_data,
343 max_tx_data,
344 is_bidi(id),
345 local,
346 self.max_stream_window,
347 );
348
349 let is_writable = s.is_writable();
350
351 (v.insert(s), is_writable)
352 },
353
354 hash_map::Entry::Occupied(v) => (v.into_mut(), false),
355 };
356
357 if is_new_and_writable {
360 self.writable.insert(Arc::clone(&stream.priority_key));
361 }
362
363 Ok(stream)
364 }
365
366 pub fn insert_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
370 if !priority_key.readable.is_linked() {
371 self.readable.insert(Arc::clone(priority_key));
372 }
373 }
374
375 pub fn remove_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
377 if !priority_key.readable.is_linked() {
378 return;
379 }
380
381 let mut c = {
382 let ptr = Arc::as_ptr(priority_key);
383 unsafe { self.readable.cursor_mut_from_ptr(ptr) }
384 };
385
386 c.remove();
387 }
388
389 pub fn insert_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
396 if !priority_key.writable.is_linked() {
397 self.writable.insert(Arc::clone(priority_key));
398 }
399 }
400
401 pub fn remove_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
406 if !priority_key.writable.is_linked() {
407 return;
408 }
409
410 let mut c = {
411 let ptr = Arc::as_ptr(priority_key);
412 unsafe { self.writable.cursor_mut_from_ptr(ptr) }
413 };
414
415 c.remove();
416 }
417
418 pub fn insert_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
422 if !priority_key.flushable.is_linked() {
423 self.flushable.insert(Arc::clone(priority_key));
424 }
425 }
426
427 pub fn remove_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
429 if !priority_key.flushable.is_linked() {
430 return;
431 }
432
433 let mut c = {
434 let ptr = Arc::as_ptr(priority_key);
435 unsafe { self.flushable.cursor_mut_from_ptr(ptr) }
436 };
437
438 c.remove();
439 }
440
441 pub fn peek_flushable(&self) -> Option<Arc<StreamPriorityKey>> {
442 self.flushable.front().clone_pointer()
443 }
444
445 pub fn update_priority(
447 &mut self, old: &Arc<StreamPriorityKey>, new: &Arc<StreamPriorityKey>,
448 ) {
449 if old.readable.is_linked() {
450 self.remove_readable(old);
451 self.readable.insert(Arc::clone(new));
452 }
453
454 if old.writable.is_linked() {
455 self.remove_writable(old);
456 self.writable.insert(Arc::clone(new));
457 }
458
459 if old.flushable.is_linked() {
460 self.remove_flushable(old);
461 self.flushable.insert(Arc::clone(new));
462 }
463 }
464
/// Adds the stream ID to the almost-full set. Adding an ID that is already
/// present has no effect.
pub fn insert_almost_full(&mut self, stream_id: u64) {
    self.almost_full.insert(stream_id);
}
471
/// Removes the stream ID from the almost-full set, if present.
pub fn remove_almost_full(&mut self, stream_id: u64) {
    self.almost_full.remove(&stream_id);
}
476
/// Records the stream as blocked at offset `off`, replacing any offset
/// previously recorded for it.
pub fn insert_blocked(&mut self, stream_id: u64, off: u64) {
    self.blocked.insert(stream_id, off);
}
484
/// Removes the stream from the blocked map, if present.
pub fn remove_blocked(&mut self, stream_id: u64) {
    self.blocked.remove(&stream_id);
}
489
/// Records the stream as reset with the given error code and final size,
/// replacing any values previously recorded for it.
pub fn insert_reset(
    &mut self, stream_id: u64, error_code: u64, final_size: u64,
) {
    self.reset.insert(stream_id, (error_code, final_size));
}
499
/// Removes the stream from the reset map, if present.
pub fn remove_reset(&mut self, stream_id: u64) {
    self.reset.remove(&stream_id);
}
504
/// Records the stream as stopped with the given error code, replacing any
/// code previously recorded for it.
pub fn insert_stopped(&mut self, stream_id: u64, error_code: u64) {
    self.stopped.insert(stream_id, error_code);
}
512
/// Removes the stream from the stopped map, if present.
pub fn remove_stopped(&mut self, stream_id: u64) {
    self.stopped.remove(&stream_id);
}
517
518 pub fn update_peer_max_streams_bidi(&mut self, v: u64) {
520 self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v);
521 }
522
523 pub fn update_peer_max_streams_uni(&mut self, v: u64) {
525 self.peer_max_streams_uni = cmp::max(self.peer_max_streams_uni, v);
526 }
527
/// Makes the pending limit on peer-initiated bidirectional streams the
/// enforced one.
pub fn update_max_streams_bidi(&mut self) {
    self.local_max_streams_bidi = self.local_max_streams_bidi_next;
}
532
/// Overwrites the enforced, pending and initial limits on peer-initiated
/// bidirectional streams with `max`.
pub fn set_max_streams_bidi(&mut self, max: u64) {
    self.local_max_streams_bidi = max;
    self.local_max_streams_bidi_next = max;
    self.initial_max_streams_bidi = max;
}
539
/// Returns the limit currently enforced on peer-initiated bidirectional
/// streams.
pub fn max_streams_bidi(&self) -> u64 {
    self.local_max_streams_bidi
}
544
545 pub fn max_streams_bidi_next(&mut self) -> u64 {
547 self.local_max_streams_bidi_next
548 }
549
/// Makes the pending limit on peer-initiated unidirectional streams the
/// enforced one.
pub fn update_max_streams_uni(&mut self) {
    self.local_max_streams_uni = self.local_max_streams_uni_next;
}
554
555 pub fn max_streams_uni_next(&mut self) -> u64 {
557 self.local_max_streams_uni_next
558 }
559
/// Returns the limit on locally-initiated bidirectional streams.
pub fn peer_max_streams_bidi(&self) -> u64 {
    self.peer_max_streams_bidi
}
564
/// Returns how many more bidirectional streams can be opened locally
/// before hitting the limit.
pub fn peer_streams_left_bidi(&self) -> u64 {
    // Cannot underflow: `get_or_create()` never lets the opened count go
    // past the limit, and the limit is only ever raised.
    self.peer_max_streams_bidi - self.local_opened_streams_bidi
}
570
/// Returns the limit on locally-initiated unidirectional streams.
pub fn peer_max_streams_uni(&self) -> u64 {
    self.peer_max_streams_uni
}
575
/// Returns how many more unidirectional streams can be opened locally
/// before hitting the limit.
pub fn peer_streams_left_uni(&self) -> u64 {
    // Cannot underflow: `get_or_create()` never lets the opened count go
    // past the limit, and the limit is only ever raised.
    self.peer_max_streams_uni - self.local_opened_streams_uni
}
581
582 pub fn collect(&mut self, stream_id: u64, local: bool) {
587 if !local {
588 if is_bidi(stream_id) {
591 self.local_max_streams_bidi_next =
592 self.local_max_streams_bidi_next.saturating_add(1);
593 } else {
594 self.local_max_streams_uni_next =
595 self.local_max_streams_uni_next.saturating_add(1);
596 }
597 }
598
599 let s = self.streams.remove(&stream_id).unwrap();
600
601 self.remove_readable(&s.priority_key);
602
603 self.remove_writable(&s.priority_key);
604
605 self.remove_flushable(&s.priority_key);
606
607 self.collected.insert(stream_id);
608 }
609
610 pub fn readable(&self) -> StreamIter {
612 StreamIter {
613 streams: self.readable.iter().map(|s| s.id).collect(),
614 index: 0,
615 }
616 }
617
618 pub fn writable(&self) -> StreamIter {
620 StreamIter {
621 streams: self.writable.iter().map(|s| s.id).collect(),
622 index: 0,
623 }
624 }
625
/// Returns an iterator over a copy of the IDs in the almost-full set, in
/// the set's (unspecified) iteration order.
pub fn almost_full(&self) -> StreamIter {
    StreamIter::from(&self.almost_full)
}
630
/// Returns an iterator over the `(stream ID, offset)` entries recorded
/// with `insert_blocked()`.
pub fn blocked(&self) -> hash_map::Iter<'_, u64, u64> {
    self.blocked.iter()
}
635
/// Returns an iterator over the `(stream ID, (error code, final size))`
/// entries recorded with `insert_reset()`.
pub fn reset(&self) -> hash_map::Iter<'_, u64, (u64, u64)> {
    self.reset.iter()
}
640
/// Returns an iterator over the `(stream ID, error code)` entries recorded
/// with `insert_stopped()`.
pub fn stopped(&self) -> hash_map::Iter<'_, u64, u64> {
    self.stopped.iter()
}
645
/// Returns true if the stream was removed from the map by `collect()`.
pub fn is_collected(&self, stream_id: u64) -> bool {
    self.collected.contains(&stream_id)
}
650
/// Returns true if the flushable tree is not empty.
pub fn has_flushable(&self) -> bool {
    !self.flushable.is_empty()
}
655
/// Returns true if the readable tree is not empty.
pub fn has_readable(&self) -> bool {
    !self.readable.is_empty()
}
660
/// Returns true if the almost-full set is not empty.
pub fn has_almost_full(&self) -> bool {
    !self.almost_full.is_empty()
}
666
/// Returns true if at least one stream is recorded as blocked.
pub fn has_blocked(&self) -> bool {
    !self.blocked.is_empty()
}
671
/// Returns true if at least one stream is recorded as reset.
pub fn has_reset(&self) -> bool {
    !self.reset.is_empty()
}
676
/// Returns true if at least one stream is recorded as stopped.
pub fn has_stopped(&self) -> bool {
    !self.stopped.is_empty()
}
681
682 pub fn should_update_max_streams_bidi(&self) -> bool {
688 let available = self
689 .local_max_streams_bidi
690 .saturating_sub(self.peer_opened_streams_bidi);
691 self.local_max_streams_bidi_next != self.local_max_streams_bidi &&
692 available <= self.initial_max_streams_bidi / 2
693 }
694
695 pub fn should_update_max_streams_uni(&self) -> bool {
701 let available = self
702 .local_max_streams_uni
703 .saturating_sub(self.peer_opened_streams_uni);
704 self.local_max_streams_uni_next != self.local_max_streams_uni &&
705 available <= self.initial_max_streams_uni / 2
706 }
707
/// Returns the number of streams currently stored in the map (test builds
/// only).
#[cfg(test)]
pub fn len(&self) -> usize {
    self.streams.len()
}
713}
714
/// A single stream: its receive and send buffers plus scheduling metadata.
pub struct Stream<F: BufFactory = DefaultBufFactory> {
    /// Receive-side buffer.
    pub recv: recv_buf::RecvBuf,

    /// Send-side buffer.
    pub send: send_buf::SendBuf<F>,

    /// Headroom, in bytes, that must remain below the send limit for the
    /// stream to count as writable (see `is_writable()`). Starts at 1.
    pub send_lowat: usize,

    /// Whether the stream is bidirectional.
    pub bidi: bool,

    /// Whether the stream was created by the local endpoint.
    pub local: bool,

    /// Urgency of the stream; initialised from the priority key.
    pub urgency: u8,

    /// Whether the stream is incremental; initialised from the priority key.
    pub incremental: bool,

    /// Key that is linked into the readable / writable / flushable trees of
    /// the owning `StreamMap`.
    pub priority_key: Arc<StreamPriorityKey>,
}
739
740impl<F: BufFactory> Stream<F> {
741 pub fn new(
743 id: u64, max_rx_data: u64, max_tx_data: u64, bidi: bool, local: bool,
744 max_window: u64,
745 ) -> Self {
746 let priority_key = Arc::new(StreamPriorityKey {
747 id,
748 ..Default::default()
749 });
750
751 Stream {
752 recv: recv_buf::RecvBuf::new(max_rx_data, max_window),
753 send: send_buf::SendBuf::new(max_tx_data),
754 send_lowat: 1,
755 bidi,
756 local,
757 urgency: priority_key.urgency,
758 incremental: priority_key.incremental,
759 priority_key,
760 }
761 }
762
763 pub fn is_readable(&self) -> bool {
765 self.recv.ready()
766 }
767
768 pub fn is_writable(&self) -> bool {
771 !self.send.is_shutdown() &&
772 !self.send.is_fin() &&
773 (self.send.off_back() + self.send_lowat as u64) <
774 self.send.max_off()
775 }
776
777 pub fn is_flushable(&self) -> bool {
780 let off_front = self.send.off_front();
781
782 !self.send.is_empty() &&
783 off_front < self.send.off_back() &&
784 off_front < self.send.max_off()
785 }
786
787 pub fn is_complete(&self) -> bool {
797 match (self.bidi, self.local) {
798 (true, _) => self.recv.is_fin() && self.send.is_complete(),
801
802 (false, true) => self.send.is_complete(),
805
806 (false, false) => self.recv.is_fin(),
809 }
810 }
811}
812
/// Returns true if the stream was created locally.
///
/// The lowest bit of a stream ID identifies its initiator: 0 for the client
/// and 1 for the server. The stream is local when that bit matches the role
/// of this endpoint.
pub fn is_local(stream_id: u64, is_server: bool) -> bool {
    let initiator_bit = stream_id % 2;

    initiator_bit == u64::from(is_server)
}
817
/// Returns true if the stream is bidirectional.
///
/// The second lowest bit of a stream ID is 0 for bidirectional streams and
/// 1 for unidirectional ones.
pub fn is_bidi(stream_id: u64) -> bool {
    // The two low bits are 0b00 or 0b01 exactly when bit 1 is clear.
    stream_id % 4 < 2
}
822
/// Sort key that places a stream in the priority-ordered trees of a
/// `StreamMap`.
///
/// One key can be a member of the readable, writable and flushable trees at
/// the same time, through its three independent links.
#[derive(Clone, Debug)]
pub struct StreamPriorityKey {
    /// Urgency; lower values sort first.
    pub urgency: u8,
    /// Whether the stream is incremental. At equal urgency, non-incremental
    /// streams sort before incremental ones.
    pub incremental: bool,
    /// ID of the stream this key belongs to.
    pub id: u64,

    /// Link used by the readable tree.
    pub readable: RBTreeAtomicLink,
    /// Link used by the writable tree.
    pub writable: RBTreeAtomicLink,
    /// Link used by the flushable tree.
    pub flushable: RBTreeAtomicLink,
}
833
834impl Default for StreamPriorityKey {
835 fn default() -> Self {
836 Self {
837 urgency: DEFAULT_URGENCY,
838 incremental: true,
839 id: Default::default(),
840 readable: Default::default(),
841 writable: Default::default(),
842 flushable: Default::default(),
843 }
844 }
845}
846
impl PartialEq for StreamPriorityKey {
    /// Two keys are equal when they refer to the same stream ID; urgency,
    /// the incremental flag and the links are not compared.
    fn eq(&self, other: &Self) -> bool {
        self.id == other.id
    }
}
852
// Equality is decided by the `u64` stream ID alone, so it is total.
impl Eq for StreamPriorityKey {}
854
impl PartialOrd for StreamPriorityKey {
    /// Delegates to `Ord::cmp()`, so it always returns `Some`.
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}
860
861impl Ord for StreamPriorityKey {
862 fn cmp(&self, other: &Self) -> cmp::Ordering {
863 if self.id == other.id {
865 return cmp::Ordering::Equal;
866 }
867
868 if self.urgency != other.urgency {
870 return self.urgency.cmp(&other.urgency);
871 }
872
873 if !self.incremental && !other.incremental {
876 return self.id.cmp(&other.id);
877 }
878
879 if self.incremental && !other.incremental {
881 return cmp::Ordering::Greater;
882 }
883 if !self.incremental && other.incremental {
884 return cmp::Ordering::Less;
885 }
886
887 cmp::Ordering::Greater
891 }
892}
893
// Adapter that links `StreamPriorityKey`s into a tree through their
// `writable` link.
intrusive_adapter!(pub StreamWritablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { writable: RBTreeAtomicLink });

impl KeyAdapter<'_> for StreamWritablePriorityAdapter {
    type Key = StreamPriorityKey;

    /// The tree is sorted by the key itself, so a clone of it is returned.
    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
        s.clone()
    }
}
903
// Adapter that links `StreamPriorityKey`s into a tree through their
// `readable` link.
intrusive_adapter!(pub StreamReadablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { readable: RBTreeAtomicLink });

impl KeyAdapter<'_> for StreamReadablePriorityAdapter {
    type Key = StreamPriorityKey;

    /// The tree is sorted by the key itself, so a clone of it is returned.
    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
        s.clone()
    }
}
913
// Adapter that links `StreamPriorityKey`s into a tree through their
// `flushable` link.
intrusive_adapter!(pub StreamFlushablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { flushable: RBTreeAtomicLink });

impl KeyAdapter<'_> for StreamFlushablePriorityAdapter {
    type Key = StreamPriorityKey;

    /// The tree is sorted by the key itself, so a clone of it is returned.
    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
        s.clone()
    }
}
923
/// An iterator over a snapshot of stream IDs.
///
/// The IDs are copied when the iterator is created, so it does not borrow
/// the collection they came from.
#[derive(Default)]
pub struct StreamIter {
    /// The stream IDs to yield, in order.
    streams: SmallVec<[u64; 8]>,
    /// Position in `streams` of the next ID to yield.
    index: usize,
}
930
931impl StreamIter {
932 #[inline]
933 fn from(streams: &StreamIdHashSet) -> Self {
934 StreamIter {
935 streams: streams.iter().copied().collect(),
936 index: 0,
937 }
938 }
939}
940
941impl Iterator for StreamIter {
942 type Item = u64;
943
944 #[inline]
945 fn next(&mut self) -> Option<Self::Item> {
946 let v = self.streams.get(self.index)?;
947 self.index += 1;
948 Some(*v)
949 }
950}
951
impl ExactSizeIterator for StreamIter {
    /// Returns the number of stream IDs that have not been yielded yet.
    #[inline]
    fn len(&self) -> usize {
        self.streams.len() - self.index
    }
}
958
959#[cfg(test)]
960mod tests {
961 use crate::range_buf::RangeBuf;
962
963 use super::*;
964
// Data beyond the receive limit is rejected until the limit is raised.
#[test]
fn recv_flow_control() {
    let mut stream =
        <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
    assert!(!stream.recv.almost_full());

    let mut buf = [0; 32];

    let first = RangeBuf::from(b"hello", 0, false);
    let second = RangeBuf::from(b"world", 5, false);
    let third = RangeBuf::from(b"something", 10, false);

    // Out-of-order writes within the limit are accepted.
    assert_eq!(stream.recv.write(second), Ok(()));
    assert_eq!(stream.recv.write(first), Ok(()));
    assert!(!stream.recv.almost_full());

    // 10 + 9 bytes exceeds the limit of 15.
    assert_eq!(stream.recv.write(third), Err(Error::FlowControl));

    let (len, fin) = stream.recv.emit(&mut buf).unwrap();
    assert_eq!(&buf[..len], b"helloworld");
    assert!(!fin);

    assert!(stream.recv.almost_full());

    stream.recv.update_max_data(std::time::Instant::now());
    assert_eq!(stream.recv.max_data_next(), 25);
    assert!(!stream.recv.almost_full());

    // With the raised limit the same data is now accepted.
    let third = RangeBuf::from(b"something", 10, false);
    assert_eq!(stream.recv.write(third), Ok(()));
}
996
// Data past the final size set by an earlier fin is rejected.
#[test]
fn recv_past_fin() {
    let mut stream =
        <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
    assert!(!stream.recv.almost_full());

    let first = RangeBuf::from(b"hello", 0, true);
    let second = RangeBuf::from(b"world", 5, false);

    assert_eq!(stream.recv.write(first), Ok(()));
    assert_eq!(stream.recv.write(second), Err(Error::FinalSize));
}
1009
// Receiving the same fin-terminated data twice is accepted and the data
// is delivered once.
#[test]
fn recv_fin_dup() {
    let mut stream =
        <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
    assert!(!stream.recv.almost_full());

    let first = RangeBuf::from(b"hello", 0, true);
    let second = RangeBuf::from(b"hello", 0, true);

    assert_eq!(stream.recv.write(first), Ok(()));
    assert_eq!(stream.recv.write(second), Ok(()));

    let mut buf = [0; 32];

    let (len, fin) = stream.recv.emit(&mut buf).unwrap();
    assert_eq!(&buf[..len], b"hello");
    assert!(fin);
}
1028
// A second fin that implies a different final size is rejected.
#[test]
fn recv_fin_change() {
    let mut stream =
        <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
    assert!(!stream.recv.almost_full());

    let first = RangeBuf::from(b"hello", 0, true);
    let second = RangeBuf::from(b"world", 5, true);

    assert_eq!(stream.recv.write(second), Ok(()));
    assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
}
1041
// A fin whose final size is below data already received is rejected.
#[test]
fn recv_fin_lower_than_received() {
    let mut stream =
        <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
    assert!(!stream.recv.almost_full());

    let first = RangeBuf::from(b"hello", 0, true);
    let second = RangeBuf::from(b"world", 5, false);

    assert_eq!(stream.recv.write(second), Ok(()));
    assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
}
1054
// Once the fin has been read the stream is not reported as almost full,
// even though most of its limit was consumed.
#[test]
fn recv_fin_flow_control() {
    let mut stream =
        <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
    assert!(!stream.recv.almost_full());

    let mut buf = [0; 32];

    let first = RangeBuf::from(b"hello", 0, false);
    let second = RangeBuf::from(b"world", 5, true);

    assert_eq!(stream.recv.write(first), Ok(()));
    assert_eq!(stream.recv.write(second), Ok(()));

    let (len, fin) = stream.recv.emit(&mut buf).unwrap();
    assert_eq!(&buf[..len], b"helloworld");
    assert!(fin);

    assert!(!stream.recv.almost_full());
}
1075
// A reset whose final size differs from the one set by a fin is rejected.
#[test]
fn recv_fin_reset_mismatch() {
    let mut stream =
        <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
    assert!(!stream.recv.almost_full());

    let first = RangeBuf::from(b"hello", 0, true);

    assert_eq!(stream.recv.write(first), Ok(()));
    assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
}
1087
// A reset with a final size beyond the received data reports both the
// missing and the unread amounts.
#[test]
fn recv_reset_with_gap() {
    let mut stream =
        <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
    assert!(!stream.recv.almost_full());

    let first = RangeBuf::from(b"hello", 0, false);

    assert_eq!(stream.recv.write(first), Ok(()));
    // Read a single byte of the five received.
    assert_eq!(stream.recv.emit(&mut [0; 1]), Ok((1, false)));
    assert_eq!(
        stream.recv.reset(0, 10),
        Ok(RecvBufResetReturn {
            // Final size 10, 5 bytes received.
            max_data_delta: 5,
            // Final size 10, 1 byte read.
            consumed_flowcontrol: 9
        })
    );
    // Repeating the same reset reports nothing new.
    assert_eq!(stream.recv.reset(0, 10), Ok(RecvBufResetReturn::zero()));
}
1110
// A duplicate reset with the same final size is accepted and reports
// zeroes.
#[test]
fn recv_reset_dup() {
    let mut stream =
        <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
    assert!(!stream.recv.almost_full());

    let first = RangeBuf::from(b"hello", 0, false);

    assert_eq!(stream.recv.write(first), Ok(()));
    assert_eq!(
        stream.recv.reset(0, 5),
        Ok(RecvBufResetReturn {
            max_data_delta: 0,
            consumed_flowcontrol: 5
        })
    );
    assert_eq!(stream.recv.reset(0, 5), Ok(RecvBufResetReturn::zero()));
}
1129
// A second reset with a different final size is rejected.
#[test]
fn recv_reset_change() {
    let mut stream =
        <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
    assert!(!stream.recv.almost_full());

    let first = RangeBuf::from(b"hello", 0, false);

    assert_eq!(stream.recv.write(first), Ok(()));
    assert_eq!(
        stream.recv.reset(0, 5),
        Ok(RecvBufResetReturn {
            max_data_delta: 0,
            consumed_flowcontrol: 5
        })
    );
    assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
}
1148
// A reset whose final size is below data already received is rejected.
#[test]
fn recv_reset_lower_than_received() {
    let mut stream =
        <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
    assert!(!stream.recv.almost_full());

    let first = RangeBuf::from(b"hello", 0, false);

    assert_eq!(stream.recv.write(first), Ok(()));
    assert_eq!(stream.recv.reset(0, 4), Err(Error::FinalSize));
}
1160
// Emitting stops at the send limit, and retransmitted data is emitted
// again from the start.
#[test]
fn send_flow_control() {
    let mut buf = [0; 25];

    let mut stream =
        <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

    let first = b"hello";
    let second = b"world";
    let third = b"something";

    assert!(stream.send.write(first, false).is_ok());
    assert!(stream.send.write(second, false).is_ok());
    assert!(stream.send.write(third, false).is_ok());

    assert_eq!(stream.send.off_front(), 0);

    // Only 15 bytes (the send limit) come out.
    let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
    assert_eq!(written, 15);
    assert!(!fin);
    assert_eq!(&buf[..written], b"helloworldsomet");

    assert_eq!(stream.send.off_front(), 15);

    let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
    assert_eq!(written, 0);
    assert!(!fin);
    assert_eq!(&buf[..written], b"");

    stream.send.retransmit(0, 15);

    assert_eq!(stream.send.off_front(), 0);

    let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
    assert_eq!(written, 10);
    assert!(!fin);
    assert_eq!(&buf[..written], b"helloworld");

    assert_eq!(stream.send.off_front(), 10);

    let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
    assert_eq!(written, 5);
    assert!(!fin);
    assert_eq!(&buf[..written], b"somet");
}
1206
// Writing more data after the fin was written is rejected.
#[test]
fn send_past_fin() {
    let mut stream =
        <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

    let first = b"hello";
    let second = b"world";
    let third = b"third";

    assert_eq!(stream.send.write(first, false), Ok(5));

    assert_eq!(stream.send.write(second, true), Ok(5));
    assert!(stream.send.is_fin());

    assert_eq!(stream.send.write(third, false), Err(Error::FinalSize));
}
1223
// Writing an empty fin after the fin was already written is accepted.
#[test]
fn send_fin_dup() {
    let mut stream =
        <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

    assert_eq!(stream.send.write(b"hello", true), Ok(5));
    assert!(stream.send.is_fin());

    assert_eq!(stream.send.write(b"", true), Ok(0));
    assert!(stream.send.is_fin());
}
1235
// A later write cannot move the final size set by an earlier fin.
#[test]
fn send_undo_fin() {
    let mut stream =
        <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

    assert_eq!(stream.send.write(b"hello", true), Ok(5));
    assert!(stream.send.is_fin());

    assert_eq!(
        stream.send.write(b"helloworld", true),
        Err(Error::FinalSize)
    );
}
1249
// Data that exactly fills the send limit is emitted together with its fin.
#[test]
fn send_fin_max_data_match() {
    let mut buf = [0; 15];

    let mut stream =
        <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

    let slice = b"hellohellohello";

    assert!(stream.send.write(slice, true).is_ok());

    let (written, fin) = stream.send.emit(&mut buf[..15]).unwrap();
    assert_eq!(written, 15);
    assert!(fin);
    assert_eq!(&buf[..written], slice);
}
1266
// An empty fin written after the data is emitted along with that data.
#[test]
fn send_fin_zero_length() {
    let mut buf = [0; 5];

    let mut stream =
        <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

    assert_eq!(stream.send.write(b"hello", false), Ok(5));
    assert_eq!(stream.send.write(b"", true), Ok(0));
    assert!(stream.send.is_fin());

    let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
    assert_eq!(written, 5);
    assert!(fin);
    assert_eq!(&buf[..written], b"hello");
}
1283
// Data that was acked is not sent again when it is marked for
// retransmission.
#[test]
fn send_ack() {
    let mut buf = [0; 5];

    let mut stream =
        <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

    assert_eq!(stream.send.write(b"hello", false), Ok(5));
    assert_eq!(stream.send.write(b"world", false), Ok(5));
    assert_eq!(stream.send.write(b"", true), Ok(0));
    assert!(stream.send.is_fin());

    assert_eq!(stream.send.off_front(), 0);

    let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
    assert_eq!(written, 5);
    assert!(!fin);
    assert_eq!(&buf[..written], b"hello");

    stream.send.ack_and_drop(0, 5);

    // The range was already acked, so this has no effect.
    stream.send.retransmit(0, 5);

    assert_eq!(stream.send.off_front(), 5);

    let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
    assert_eq!(written, 5);
    assert!(fin);
    assert_eq!(&buf[..written], b"world");
}
1314
// Acks arriving out of order still prevent retransmission of acked data.
#[test]
fn send_ack_reordering() {
    let mut buf = [0; 5];

    let mut stream =
        <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

    assert_eq!(stream.send.write(b"hello", false), Ok(5));
    assert_eq!(stream.send.write(b"world", false), Ok(5));
    assert_eq!(stream.send.write(b"", true), Ok(0));
    assert!(stream.send.is_fin());

    assert_eq!(stream.send.off_front(), 0);

    let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
    assert_eq!(written, 5);
    assert!(!fin);
    assert_eq!(&buf[..written], b"hello");

    assert_eq!(stream.send.off_front(), 5);

    let (written, fin) = stream.send.emit(&mut buf[..1]).unwrap();
    assert_eq!(written, 1);
    assert!(!fin);
    assert_eq!(&buf[..written], b"w");

    // The later range is acked before the earlier one.
    stream.send.ack_and_drop(5, 1);
    stream.send.ack_and_drop(0, 5);

    // Both ranges were acked, so neither is sent again.
    stream.send.retransmit(0, 5);
    stream.send.retransmit(5, 1);

    assert_eq!(stream.send.off_front(), 6);

    let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
    assert_eq!(written, 4);
    assert!(fin);
    assert_eq!(&buf[..written], b"orld");
}
1354
// Data overlapping what was already read is trimmed: only the new part is
// delivered.
#[test]
fn recv_data_below_off() {
    let mut stream =
        <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);

    let first = RangeBuf::from(b"hello", 0, false);

    assert_eq!(stream.recv.write(first), Ok(()));

    let mut buf = [0; 10];

    let (len, fin) = stream.recv.emit(&mut buf).unwrap();
    assert_eq!(&buf[..len], b"hello");
    assert!(!fin);

    // Starts at offset 1, which has already been read.
    let first = RangeBuf::from(b"elloworld", 1, true);
    assert_eq!(stream.recv.write(first), Ok(()));

    let (len, fin) = stream.recv.emit(&mut buf).unwrap();
    assert_eq!(&buf[..len], b"world");
    assert!(fin);
}
1377
// A bidirectional stream is complete only once all sent data was acked and
// all received data, including the fin, was read.
#[test]
fn stream_complete() {
    let mut stream =
        <Stream>::new(0, 30, 30, true, true, DEFAULT_STREAM_WINDOW);

    assert_eq!(stream.send.write(b"hello", false), Ok(5));
    assert_eq!(stream.send.write(b"world", false), Ok(5));

    assert!(!stream.send.is_complete());
    assert!(!stream.send.is_fin());

    assert_eq!(stream.send.write(b"", true), Ok(0));

    assert!(!stream.send.is_complete());
    assert!(stream.send.is_fin());

    let buf = RangeBuf::from(b"hello", 0, true);
    assert!(stream.recv.write(buf).is_ok());
    // The fin was received but not read yet.
    assert!(!stream.recv.is_fin());

    stream.send.ack(6, 4);
    assert!(!stream.send.is_complete());

    let mut buf = [0; 2];
    assert_eq!(stream.recv.emit(&mut buf), Ok((2, false)));
    assert!(!stream.recv.is_fin());

    stream.send.ack(1, 5);
    assert!(!stream.send.is_complete());

    // The last missing byte is acked.
    stream.send.ack(0, 1);
    assert!(stream.send.is_complete());

    // The receive side still has unread data.
    assert!(!stream.is_complete());

    let mut buf = [0; 3];
    assert_eq!(stream.recv.emit(&mut buf), Ok((3, true)));
    assert!(stream.recv.is_fin());

    assert!(stream.is_complete());
}
1419
// A fin written after all data was emitted comes out as a zero-length
// emit with the fin flag set.
#[test]
fn send_fin_zero_length_output() {
    let mut buf = [0; 5];

    let mut stream =
        <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

    assert_eq!(stream.send.write(b"hello", false), Ok(5));
    assert_eq!(stream.send.off_front(), 0);
    assert!(!stream.send.is_fin());

    let (written, fin) = stream.send.emit(&mut buf).unwrap();
    assert_eq!(written, 5);
    assert!(!fin);
    assert_eq!(&buf[..written], b"hello");

    assert_eq!(stream.send.write(b"", true), Ok(0));
    assert!(stream.send.is_fin());
    assert_eq!(stream.send.off_front(), 5);

    let (written, fin) = stream.send.emit(&mut buf).unwrap();
    assert_eq!(written, 0);
    assert!(fin);
    assert_eq!(&buf[..written], b"");
}
1445
// Test helper: true if the send buffer is not empty and still has data
// that was not emitted. Unlike `Stream::is_flushable()` it ignores the
// send limit.
fn stream_send_ready(stream: &Stream) -> bool {
    !stream.send.is_empty() &&
        stream.send.off_front() < stream.send.off_back()
}
1450
// Emits buffered data in chunks of various sizes, crossing the boundaries
// of the individual writes, until the fin is reached.
#[test]
fn send_emit() {
    let mut buf = [0; 5];

    let mut stream =
        <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);

    assert_eq!(stream.send.write(b"hello", false), Ok(5));
    assert_eq!(stream.send.write(b"world", false), Ok(5));
    assert_eq!(stream.send.write(b"olleh", false), Ok(5));
    assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
    assert_eq!(stream.send.off_front(), 0);
    assert_eq!(stream.send.bufs_count(), 4);

    assert!(stream.is_flushable());

    assert!(stream_send_ready(&stream));
    assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
    assert_eq!(stream.send.off_front(), 4);
    assert_eq!(&buf[..4], b"hell");

    assert!(stream_send_ready(&stream));
    assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
    assert_eq!(stream.send.off_front(), 8);
    assert_eq!(&buf[..4], b"owor");

    assert!(stream_send_ready(&stream));
    assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
    assert_eq!(stream.send.off_front(), 10);
    assert_eq!(&buf[..2], b"ld");

    assert!(stream_send_ready(&stream));
    assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
    assert_eq!(stream.send.off_front(), 11);
    assert_eq!(&buf[..1], b"o");

    assert!(stream_send_ready(&stream));
    assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
    assert_eq!(stream.send.off_front(), 16);
    assert_eq!(&buf[..5], b"llehd");

    // The last chunk is shorter than the buffer and carries the fin.
    assert!(stream_send_ready(&stream));
    assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
    assert_eq!(stream.send.off_front(), 20);
    assert_eq!(&buf[..4], b"lrow");

    assert!(!stream.is_flushable());

    // Nothing is left; emitting again only repeats the fin.
    assert!(!stream_send_ready(&stream));
    assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
    assert_eq!(stream.send.off_front(), 20);
}
1503
#[test]
/// Interleaves emits with `ack_and_drop()` calls and tracks how many
/// buffers the send side still holds after each acknowledgement.
fn send_emit_ack() {
    let mut stream =
        <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
    let mut out = [0u8; 5];

    // Emits into `out[..cap]` and checks readiness beforehand, then the
    // `(length, fin)` result, the front offset and the emitted bytes.
    macro_rules! emit_and_check {
        ($cap:expr, $want:expr, $fin:expr, $off:expr) => {{
            assert!(stream_send_ready(&stream));
            assert_eq!(
                stream.send.emit(&mut out[..$cap]),
                Ok(($want.len(), $fin))
            );
            assert_eq!(stream.send.off_front(), $off);
            assert_eq!(&out[..$want.len()], $want);
        }};
    }

    // Four writes of five bytes each; only the last one carries fin.
    let chunks = [
        (b"hello", false),
        (b"world", false),
        (b"olleh", false),
        (b"dlrow", true),
    ];
    for (chunk, fin) in chunks {
        assert_eq!(stream.send.write(chunk, fin), Ok(5));
    }
    assert_eq!(stream.send.off_front(), 0);
    assert_eq!(stream.send.bufs_count(), 4);

    assert!(stream.is_flushable());

    emit_and_check!(4, b"hell", false, 4);
    emit_and_check!(4, b"owor", false, 8);

    // Acknowledging [0, 5) is expected to release one buffer.
    stream.send.ack_and_drop(0, 5);
    assert_eq!(stream.send.bufs_count(), 3);

    emit_and_check!(2, b"ld", false, 10);

    // Acknowledging [7, 12) leaves the buffer count unchanged.
    stream.send.ack_and_drop(7, 5);
    assert_eq!(stream.send.bufs_count(), 3);

    emit_and_check!(1, b"o", false, 11);
    emit_and_check!(5, b"llehd", false, 16);

    // Acknowledging [5, 12) is expected to release another buffer.
    stream.send.ack_and_drop(5, 7);
    assert_eq!(stream.send.bufs_count(), 2);

    emit_and_check!(5, b"lrow", true, 20);

    assert!(!stream.is_flushable());

    // Once drained, another emit yields no bytes but still reports fin.
    assert!(!stream_send_ready(&stream));
    assert_eq!(stream.send.emit(&mut out[..5]), Ok((0, true)));
    assert_eq!(stream.send.off_front(), 20);

    // Acknowledgements starting at or beyond offset 20 do not change the
    // buffer count.
    stream.send.ack_and_drop(22, 4);
    assert_eq!(stream.send.bufs_count(), 2);

    stream.send.ack_and_drop(20, 1);
    assert_eq!(stream.send.bufs_count(), 2);
}
1571
#[test]
/// Interleaves emits with `retransmit()` and `ack_and_drop()` calls and
/// checks which bytes are emitted, and where the front offset ends up,
/// after each step.
fn send_emit_retransmit() {
    let mut stream =
        <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
    let mut out = [0u8; 5];

    // Emits into `out[..cap]` and checks readiness beforehand, then the
    // `(length, fin)` result, the front offset and the emitted bytes.
    macro_rules! emit_and_check {
        ($cap:expr, $want:expr, $fin:expr, $off:expr) => {{
            assert!(stream_send_ready(&stream));
            assert_eq!(
                stream.send.emit(&mut out[..$cap]),
                Ok(($want.len(), $fin))
            );
            assert_eq!(stream.send.off_front(), $off);
            assert_eq!(&out[..$want.len()], $want);
        }};
    }

    // Four writes of five bytes each; only the last one carries fin.
    let chunks = [
        (b"hello", false),
        (b"world", false),
        (b"olleh", false),
        (b"dlrow", true),
    ];
    for (chunk, fin) in chunks {
        assert_eq!(stream.send.write(chunk, fin), Ok(5));
    }
    assert_eq!(stream.send.off_front(), 0);
    assert_eq!(stream.send.bufs_count(), 4);

    assert!(stream.is_flushable());

    emit_and_check!(4, b"hell", false, 4);
    emit_and_check!(4, b"owor", false, 8);

    // Marking [3, 6) for retransmission moves the front offset back to 3.
    stream.send.retransmit(3, 3);
    assert_eq!(stream.send.off_front(), 3);

    // Only the three retransmitted bytes come out, after which the front
    // offset jumps back to where emission had previously stopped.
    emit_and_check!(3, b"low", false, 8);
    emit_and_check!(2, b"ld", false, 10);

    // Acknowledge [7, 9) and then request retransmission of [8, 10).
    stream.send.ack_and_drop(7, 2);

    stream.send.retransmit(8, 2);

    emit_and_check!(2, b"ld", false, 10);
    emit_and_check!(1, b"o", false, 11);
    emit_and_check!(5, b"llehd", false, 16);

    stream.send.retransmit(12, 2);

    emit_and_check!(2, b"le", false, 16);
    emit_and_check!(5, b"lrow", true, 20);

    assert!(!stream.is_flushable());

    // Once drained, another emit yields no bytes but still reports fin.
    assert!(!stream_send_ready(&stream));
    assert_eq!(stream.send.emit(&mut out[..5]), Ok((0, true)));
    assert_eq!(stream.send.off_front(), 20);

    // Retransmitting [7, 19) is expected to produce the same three emits
    // both before and after [12, 19) has been acknowledged.
    for pass in 0..2 {
        if pass == 1 {
            stream.send.ack_and_drop(12, 7);
        }

        stream.send.retransmit(7, 12);

        emit_and_check!(5, b"rldol", false, 12);
        emit_and_check!(5, b"lehdl", false, 17);
        emit_and_check!(5, b"ro", false, 20);
    }
}
1684
#[test]
/// Walks a `RangeBuf` through alternating `consume()` and `split_off()`
/// calls, checking the raw fields, the accessor results and the visible
/// bytes of every buffer involved after each operation.
fn rangebuf_split_off() {
    // Checks, in order: the raw `start`/`pos`/`len`/`off`/`fin` fields, the
    // `len()`/`off()`/`fin()` accessors, and the bytes visible through
    // slicing.
    macro_rules! assert_state {
        (
            $buf:expr,
            raw: ($start:expr, $pos:expr, $len:expr, $off:expr),
            view: ($vlen:expr, $voff:expr),
            fin: $fin:expr,
            data: $data:expr
        ) => {{
            assert_eq!($buf.start, $start);
            assert_eq!($buf.pos, $pos);
            assert_eq!($buf.len, $len);
            assert_eq!($buf.off, $off);
            assert_eq!($buf.fin, $fin);

            assert_eq!($buf.len(), $vlen);
            assert_eq!($buf.off(), $voff);
            assert_eq!($buf.fin(), $fin);

            assert_eq!(&$buf[..], $data);
        }};
    }

    let mut head = <RangeBuf>::from(b"helloworld", 5, true);
    assert_state!(
        head,
        raw: (0, 0, 10, 5),
        view: (10, 5),
        fin: true,
        data: b"helloworld"
    );

    // Consuming advances `pos` and the accessor results, but leaves the
    // raw `len` and `off` fields untouched.
    head.consume(5);
    assert_state!(
        head,
        raw: (0, 5, 10, 5),
        view: (5, 10),
        fin: true,
        data: b"world"
    );

    // Split before the current position: the first half ends up with no
    // visible bytes and loses fin; the second half keeps "world" and fin.
    let mut middle = head.split_off(3);
    assert_state!(
        head,
        raw: (0, 3, 3, 5),
        view: (0, 8),
        fin: false,
        data: b""
    );
    assert_state!(
        middle,
        raw: (3, 5, 7, 8),
        view: (5, 10),
        fin: true,
        data: b"world"
    );

    middle.consume(2);
    assert_state!(
        middle,
        raw: (3, 7, 7, 8),
        view: (3, 12),
        fin: true,
        data: b"rld"
    );

    // Split after the current position: the first half keeps one visible
    // byte and loses fin; the second half gets the remaining two and fin.
    let mut tail = middle.split_off(5);
    assert_state!(
        middle,
        raw: (3, 7, 5, 8),
        view: (1, 12),
        fin: false,
        data: b"r"
    );
    assert_state!(
        tail,
        raw: (8, 8, 2, 13),
        view: (2, 13),
        fin: true,
        data: b"ld"
    );

    // Consuming everything leaves an empty view that still reports fin.
    tail.consume(2);
    assert_state!(
        tail,
        raw: (8, 10, 2, 13),
        view: (0, 15),
        fin: true,
        data: b""
    );
}
1799
#[test]
/// Asking a map built with `new(5, 5, 5)` for peer-initiated bidirectional
/// stream 500 must fail with `Error::StreamLimit`.
fn stream_limit_auto_open() {
    let local_tp = crate::TransportParams::default();
    let peer_tp = crate::TransportParams::default();

    let mut streams = <StreamMap>::new(5, 5, 5);

    // Sanity-check the ID's properties before using it.
    let stream_id = 500;
    assert!(!is_local(stream_id, true), "stream id is peer initiated");
    assert!(is_bidi(stream_id), "stream id is bidirectional");

    let failure = streams
        .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
        .err();
    assert_eq!(
        failure,
        Some(Error::StreamLimit),
        "stream limit should be exceeded"
    );
}
1820
#[test]
/// Streams 8, 12 and 4 can each be created successfully even though they
/// are requested in a non-increasing order.
fn stream_create_out_of_order() {
    let local_tp = crate::TransportParams::default();
    let peer_tp = crate::TransportParams::default();

    let mut streams = <StreamMap>::new(5, 5, 5);

    let ids = [8, 12, 4];
    for stream_id in ids {
        // Sanity-check the ID's properties before using it.
        assert!(is_local(stream_id, false), "stream id is client initiated");
        assert!(is_bidi(stream_id), "stream id is bidirectional");

        let created = streams
            .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
            .is_ok();
        assert!(created);
    }
}
1838
#[test]
/// With a map built with `new(3, 3, 3)`, stream 8 is still accepted while
/// stream 12 is rejected with `Error::StreamLimit`.
fn stream_limit_edge() {
    let local_tp = crate::TransportParams::default();
    let peer_tp = crate::TransportParams::default();

    let mut streams = <StreamMap>::new(3, 3, 3);

    // Stream 8 must be accepted.
    let accepted = streams
        .get_or_create(8, &local_tp, &peer_tp, false, true)
        .is_ok();
    assert!(accepted);

    // Stream 12 must be refused.
    let rejected = streams
        .get_or_create(12, &local_tp, &peer_tp, false, true)
        .err();
    assert_eq!(rejected, Some(Error::StreamLimit));
}
1862
1863 fn cycle_stream_priority(stream_id: u64, streams: &mut StreamMap) {
1864 let key = streams.get(stream_id).unwrap().priority_key.clone();
1865 streams.update_priority(&key.clone(), &key);
1866 }
1867
#[test]
/// Streams created in ascending ID order, with their priority left
/// untouched, are walked in that order; cycling the first stream of each
/// walk rotates it to the back.
fn writable_prioritized_default_priority() {
    let local_tp = crate::TransportParams::default();
    let peer_tp = crate::TransportParams {
        initial_max_stream_data_bidi_local: 100,
        initial_max_stream_data_uni: 100,
        ..Default::default()
    };

    let mut streams = StreamMap::new(100, 100, 100);

    for id in [0, 4, 8, 12] {
        let created = streams
            .get_or_create(id, &local_tp, &peer_tp, false, true)
            .is_ok();
        assert!(created);
    }

    // Record a walk, then cycle whichever stream came first. After four
    // rounds take one final walk without cycling.
    let mut walks: Vec<Vec<u64>> = Vec::with_capacity(5);
    for _ in 0..4 {
        let walk: Vec<u64> = streams.writable().collect();
        cycle_stream_priority(*walk.first().unwrap(), &mut streams);
        walks.push(walk);
    }
    walks.push(streams.writable().collect());

    // Each walk is the previous one rotated left by one; the fifth matches
    // the first again.
    assert_eq!(walks, vec![
        vec![0, 4, 8, 12],
        vec![4, 8, 12, 0],
        vec![8, 12, 0, 4],
        vec![12, 0, 4, 8],
        vec![0, 4, 8, 12],
    ]);
}
1903
#[test]
/// Streams created in the order 12, 4, 8, 0 are walked in that creation
/// order rather than by ID; cycling the first stream of each walk rotates
/// it to the back.
fn writable_prioritized_insert_order() {
    let local_tp = crate::TransportParams::default();
    let peer_tp = crate::TransportParams {
        initial_max_stream_data_bidi_local: 100,
        initial_max_stream_data_uni: 100,
        ..Default::default()
    };

    let mut streams = StreamMap::new(100, 100, 100);

    // Deliberately not in ascending ID order.
    for id in [12, 4, 8, 0] {
        let created = streams
            .get_or_create(id, &local_tp, &peer_tp, false, true)
            .is_ok();
        assert!(created);
    }

    // Record a walk, then cycle whichever stream came first. After four
    // rounds take one final walk without cycling.
    let mut walks: Vec<Vec<u64>> = Vec::with_capacity(5);
    for _ in 0..4 {
        let walk: Vec<u64> = streams.writable().collect();
        cycle_stream_priority(*walk.first().unwrap(), &mut streams);
        walks.push(walk);
    }
    walks.push(streams.writable().collect());

    // Each walk is the previous one rotated left by one; the fifth matches
    // the first again.
    assert_eq!(walks, vec![
        vec![12, 4, 8, 0],
        vec![4, 8, 0, 12],
        vec![8, 0, 12, 4],
        vec![0, 12, 4, 8],
        vec![12, 4, 8, 0],
    ]);
}
1938
#[test]
/// Streams that are each given a distinct urgency are walked from the
/// lowest urgency value to the highest. Re-applying the same priorities
/// does not change the walk, collected streams drop out of it, and a
/// stream created afterwards without a priority update is walked last.
fn writable_prioritized_mixed_urgency() {
    let local_tp = crate::TransportParams::default();
    let peer_tp = crate::TransportParams {
        initial_max_stream_data_bidi_local: 100,
        initial_max_stream_data_uni: 100,
        ..Default::default()
    };

    let mut streams = <StreamMap>::new(100, 100, 100);

    // (stream ID, urgency): urgency decreases as the ID increases.
    let input = [
        (0, 100),
        (4, 90),
        (8, 80),
        (12, 70),
        (16, 60),
        (20, 50),
        (24, 40),
        (28, 30),
        (32, 20),
        (36, 10),
        (40, 0),
    ];

    // Apply the priorities twice; the second pass must leave the walk
    // exactly as the first one did.
    for _ in 0..2 {
        for &(id, urgency) in &input {
            let stream = streams
                .get_or_create(id, &local_tp, &peer_tp, false, true)
                .unwrap();

            stream.urgency = urgency;

            let updated = Arc::new(StreamPriorityKey {
                urgency: stream.urgency,
                incremental: stream.incremental,
                id,
                ..Default::default()
            });

            // Swap the new key into the stream and tell the map about the
            // change.
            let previous = std::mem::replace(
                &mut stream.priority_key,
                updated.clone(),
            );

            streams.update_priority(&previous, &updated);
        }

        let walk: Vec<u64> = streams.writable().collect();
        assert_eq!(walk, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
    }

    // A collected stream disappears from the middle of the walk.
    streams.collect(24, true);

    let after_middle: Vec<u64> = streams.writable().collect();
    assert_eq!(after_middle, vec![40, 36, 32, 28, 20, 16, 12, 8, 4, 0]);

    // The same holds for both ends of the walk.
    streams.collect(40, true);
    streams.collect(0, true);

    let after_ends: Vec<u64> = streams.writable().collect();
    assert_eq!(after_ends, vec![36, 32, 28, 20, 16, 12, 8, 4]);

    // A new stream whose priority is left untouched is walked last.
    streams
        .get_or_create(44, &local_tp, &peer_tp, false, true)
        .unwrap();

    let with_new: Vec<u64> = streams.writable().collect();
    assert_eq!(with_new, vec![36, 32, 28, 20, 16, 12, 8, 4, 44]);
}
2041
#[test]
/// Several streams share an urgency. The walk is grouped by ascending
/// urgency value, and cycling a stream moves it to the back of its own
/// group without affecting the other groups.
fn writable_prioritized_mixed_urgencies_incrementals() {
    let local_tp = crate::TransportParams::default();
    let peer_tp = crate::TransportParams {
        initial_max_stream_data_bidi_local: 100,
        initial_max_stream_data_uni: 100,
        ..Default::default()
    };

    let mut streams = StreamMap::new(100, 100, 100);

    // (stream ID, urgency): some urgencies are shared by several streams.
    let input = [
        (0, 100),
        (4, 20),
        (8, 100),
        (12, 20),
        (16, 90),
        (20, 25),
        (24, 90),
        (28, 30),
        (32, 80),
        (36, 20),
        (40, 0),
    ];

    for &(id, urgency) in &input {
        let stream = streams
            .get_or_create(id, &local_tp, &peer_tp, false, true)
            .unwrap();

        stream.urgency = urgency;

        let updated = Arc::new(StreamPriorityKey {
            urgency: stream.urgency,
            incremental: stream.incremental,
            id,
            ..Default::default()
        });

        // Swap the new key into the stream and tell the map about the
        // change.
        let previous =
            std::mem::replace(&mut stream.priority_key, updated.clone());

        streams.update_priority(&previous, &updated);
    }

    // Each round records a walk and then cycles one stream from each of
    // the urgency-20, urgency-90 and urgency-100 groups.
    let rounds: [[u64; 3]; 9] = [
        [4, 16, 0],
        [12, 24, 8],
        [36, 16, 0],
        [4, 24, 8],
        [12, 16, 0],
        [36, 24, 8],
        [4, 16, 0],
        [12, 24, 8],
        [36, 16, 0],
    ];

    let mut walks: Vec<Vec<u64>> = Vec::with_capacity(rounds.len());
    for cycled in rounds {
        walks.push(streams.writable().collect());

        for id in cycled {
            cycle_stream_priority(id, &mut streams);
        }
    }

    assert_eq!(walks, vec![
        vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8],
        vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0],
        vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8],
        vec![40, 4, 12, 36, 20, 28, 32, 24, 16, 8, 0],
        vec![40, 12, 36, 4, 20, 28, 32, 16, 24, 0, 8],
        vec![40, 36, 4, 12, 20, 28, 32, 24, 16, 8, 0],
        vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8],
        vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0],
        vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8],
    ]);

    // A collected stream disappears from the walk.
    streams.collect(20, true);

    let after_collect: Vec<u64> = streams.writable().collect();
    assert_eq!(after_collect, vec![40, 4, 12, 36, 28, 32, 24, 16, 8, 0]);

    // A new stream given urgency 20 lands at the back of that group.
    let stream = streams
        .get_or_create(44, &local_tp, &peer_tp, false, true)
        .unwrap();

    stream.urgency = 20;
    stream.incremental = true;

    let updated = Arc::new(StreamPriorityKey {
        urgency: stream.urgency,
        incremental: stream.incremental,
        id: 44,
        ..Default::default()
    });

    let previous =
        std::mem::replace(&mut stream.priority_key, updated.clone());

    streams.update_priority(&previous, &updated);

    let with_new: Vec<u64> = streams.writable().collect();
    assert_eq!(with_new, vec![40, 4, 12, 36, 44, 28, 32, 24, 16, 8, 0]);
}
2168
#[test]
/// Inserting freshly allocated keys for IDs that are already in the tree
/// does not replace the existing entries: after the second pass every ID
/// shows up twice when iterating.
fn priority_tree_dupes() {
    let mut tree: RBTree<StreamWritablePriorityAdapter> = Default::default();

    // Expected iteration order after the first and the second insertion
    // pass over the same set of IDs.
    let expected_walks = [
        vec![0, 4, 8, 12],
        vec![0, 0, 4, 4, 8, 8, 12, 12],
    ];

    for expected in expected_walks {
        for id in [0, 4, 8, 12] {
            let key = Arc::new(StreamPriorityKey {
                urgency: 0,
                incremental: false,
                id,
                ..Default::default()
            });

            tree.insert(key);
        }

        let walk: Vec<u64> = tree.iter().map(|key| key.id).collect();
        assert_eq!(walk, expected);
    }
}
2206}
2207
2208mod recv_buf;
2209mod send_buf;