1use std::cmp;
28
29use std::sync::Arc;
30
31use std::collections::hash_map;
32use std::collections::HashMap;
33use std::collections::HashSet;
34
35use intrusive_collections::intrusive_adapter;
36use intrusive_collections::KeyAdapter;
37use intrusive_collections::RBTree;
38use intrusive_collections::RBTreeAtomicLink;
39
40use smallvec::SmallVec;
41
42use crate::buffers::DefaultBufFactory;
43use crate::BufFactory;
44use crate::Error;
45use crate::Result;
46
47const DEFAULT_URGENCY: u8 = 127;
48
49pub(crate) const DEFAULT_STREAM_WINDOW: u64 = 32 * 1024;
51
52pub const MAX_STREAM_WINDOW: u64 = 16 * 1024 * 1024;
54
55#[derive(Default)]
60pub struct StreamIdHasher {
61 id: u64,
62}
63
64#[derive(Debug, PartialEq, Clone, Copy)]
66pub struct RecvBufResetReturn {
67 pub max_data_delta: u64,
70
71 pub consumed_flowcontrol: u64,
74}
75
76impl RecvBufResetReturn {
77 pub fn zero() -> Self {
78 Self {
79 max_data_delta: 0,
80 consumed_flowcontrol: 0,
81 }
82 }
83}
84
85pub enum RecvAction<T: bytes::BufMut> {
87 Emit { out: T },
89 Discard { len: usize },
91}
92
93impl std::hash::Hasher for StreamIdHasher {
94 #[inline]
95 fn finish(&self) -> u64 {
96 self.id
97 }
98
99 #[inline]
100 fn write_u64(&mut self, id: u64) {
101 self.id = id;
102 }
103
104 #[inline]
105 fn write(&mut self, _: &[u8]) {
106 unimplemented!()
109 }
110}
111
112type BuildStreamIdHasher = std::hash::BuildHasherDefault<StreamIdHasher>;
113
114pub type StreamIdHashMap<V> = HashMap<u64, V, BuildStreamIdHasher>;
115pub type StreamIdHashSet = HashSet<u64, BuildStreamIdHasher>;
116
117#[derive(Default)]
119pub struct StreamMap<F: BufFactory = DefaultBufFactory> {
120 streams: StreamIdHashMap<Stream<F>>,
122
123 collected: StreamIdHashSet,
129
130 peer_max_streams_bidi: u64,
132
133 peer_max_streams_uni: u64,
135
136 peer_opened_streams_bidi: u64,
138
139 peer_opened_streams_uni: u64,
141
142 local_max_streams_bidi: u64,
144 local_max_streams_bidi_next: u64,
145
146 initial_max_streams_bidi: u64,
148
149 local_max_streams_uni: u64,
151 local_max_streams_uni_next: u64,
152
153 initial_max_streams_uni: u64,
155
156 local_opened_streams_bidi: u64,
158
159 local_opened_streams_uni: u64,
161
162 flushable: RBTree<StreamFlushablePriorityAdapter>,
166
167 pub readable: RBTree<StreamReadablePriorityAdapter>,
171
172 pub writable: RBTree<StreamWritablePriorityAdapter>,
177
178 almost_full: StreamIdHashSet,
183
184 blocked: StreamIdHashMap<u64>,
188
189 reset: StreamIdHashMap<(u64, u64)>,
193
194 stopped: StreamIdHashMap<u64>,
198
199 max_stream_window: u64,
201
202 use_initial_max_data_as_flow_control_win: bool,
205}
206
207impl<F: BufFactory> StreamMap<F> {
208 pub fn new(
209 max_streams_bidi: u64, max_streams_uni: u64, max_stream_window: u64,
210 ) -> Self {
211 StreamMap {
212 local_max_streams_bidi: max_streams_bidi,
213 local_max_streams_bidi_next: max_streams_bidi,
214 initial_max_streams_bidi: max_streams_bidi,
215
216 local_max_streams_uni: max_streams_uni,
217 local_max_streams_uni_next: max_streams_uni,
218 initial_max_streams_uni: max_streams_uni,
219
220 max_stream_window,
221
222 ..StreamMap::default()
223 }
224 }
225
226 pub fn get(&self, id: u64) -> Option<&Stream<F>> {
228 self.streams.get(&id)
229 }
230
231 pub fn get_mut(&mut self, id: u64) -> Option<&mut Stream<F>> {
233 self.streams.get_mut(&id)
234 }
235
236 pub(crate) fn get_or_create(
249 &mut self, id: u64, local_params: &crate::TransportParams,
250 peer_params: &crate::TransportParams, local: bool, is_server: bool,
251 ) -> Result<&mut Stream<F>> {
252 let (stream, is_new_and_writable) = match self.streams.entry(id) {
253 hash_map::Entry::Vacant(v) => {
254 if self.collected.contains(&id) {
256 return Err(Error::Done);
257 }
258
259 if local != is_local(id, is_server) {
260 return Err(Error::InvalidStreamState(id));
261 }
262
263 let (max_rx_data, max_tx_data) = match (local, is_bidi(id)) {
264 (true, true) => (
266 local_params.initial_max_stream_data_bidi_local,
267 peer_params.initial_max_stream_data_bidi_remote,
268 ),
269
270 (true, false) => (0, peer_params.initial_max_stream_data_uni),
272
273 (false, true) => (
275 local_params.initial_max_stream_data_bidi_remote,
276 peer_params.initial_max_stream_data_bidi_local,
277 ),
278
279 (false, false) =>
281 (local_params.initial_max_stream_data_uni, 0),
282 };
283
284 let stream_sequence = id >> 2;
288
289 match (is_local(id, is_server), is_bidi(id)) {
291 (true, true) => {
292 let n = cmp::max(
293 self.local_opened_streams_bidi,
294 stream_sequence + 1,
295 );
296
297 if n > self.peer_max_streams_bidi {
298 return Err(Error::StreamLimit);
299 }
300
301 self.local_opened_streams_bidi = n;
302 },
303
304 (true, false) => {
305 let n = cmp::max(
306 self.local_opened_streams_uni,
307 stream_sequence + 1,
308 );
309
310 if n > self.peer_max_streams_uni {
311 return Err(Error::StreamLimit);
312 }
313
314 self.local_opened_streams_uni = n;
315 },
316
317 (false, true) => {
318 let n = cmp::max(
319 self.peer_opened_streams_bidi,
320 stream_sequence + 1,
321 );
322
323 if n > self.local_max_streams_bidi {
324 return Err(Error::StreamLimit);
325 }
326
327 self.peer_opened_streams_bidi = n;
328 },
329
330 (false, false) => {
331 let n = cmp::max(
332 self.peer_opened_streams_uni,
333 stream_sequence + 1,
334 );
335
336 if n > self.local_max_streams_uni {
337 return Err(Error::StreamLimit);
338 }
339
340 self.peer_opened_streams_uni = n;
341 },
342 };
343
344 let initial_window =
345 if self.use_initial_max_data_as_flow_control_win {
346 max_rx_data
347 } else {
348 cmp::min(max_rx_data, DEFAULT_STREAM_WINDOW)
349 };
350 let s = Stream::new(
351 id,
352 max_rx_data,
353 max_tx_data,
354 local,
355 initial_window,
356 self.max_stream_window,
357 );
358
359 let is_writable = s.is_writable();
360
361 (v.insert(s), is_writable)
362 },
363
364 hash_map::Entry::Occupied(v) => (v.into_mut(), false),
365 };
366
367 if is_new_and_writable {
370 self.writable.insert(Arc::clone(&stream.priority_key));
371 }
372
373 Ok(stream)
374 }
375
376 pub fn insert_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
380 if !priority_key.readable.is_linked() {
381 self.readable.insert(Arc::clone(priority_key));
382 }
383 }
384
385 pub fn remove_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
387 if !priority_key.readable.is_linked() {
388 return;
389 }
390
391 let mut c = {
392 let ptr = Arc::as_ptr(priority_key);
393 unsafe { self.readable.cursor_mut_from_ptr(ptr) }
394 };
395
396 c.remove();
397 }
398
399 pub fn insert_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
406 if !priority_key.writable.is_linked() {
407 self.writable.insert(Arc::clone(priority_key));
408 }
409 }
410
411 pub fn remove_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
416 if !priority_key.writable.is_linked() {
417 return;
418 }
419
420 let mut c = {
421 let ptr = Arc::as_ptr(priority_key);
422 unsafe { self.writable.cursor_mut_from_ptr(ptr) }
423 };
424
425 c.remove();
426 }
427
428 pub fn insert_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
432 if !priority_key.flushable.is_linked() {
433 self.flushable.insert(Arc::clone(priority_key));
434 }
435 }
436
437 pub fn remove_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
439 if !priority_key.flushable.is_linked() {
440 return;
441 }
442
443 let mut c = {
444 let ptr = Arc::as_ptr(priority_key);
445 unsafe { self.flushable.cursor_mut_from_ptr(ptr) }
446 };
447
448 c.remove();
449 }
450
451 pub fn peek_flushable(&self) -> Option<Arc<StreamPriorityKey>> {
452 self.flushable.front().clone_pointer()
453 }
454
455 pub fn update_priority(
457 &mut self, old: &Arc<StreamPriorityKey>, new: &Arc<StreamPriorityKey>,
458 ) {
459 if old.readable.is_linked() {
460 self.remove_readable(old);
461 self.readable.insert(Arc::clone(new));
462 }
463
464 if old.writable.is_linked() {
465 self.remove_writable(old);
466 self.writable.insert(Arc::clone(new));
467 }
468
469 if old.flushable.is_linked() {
470 self.remove_flushable(old);
471 self.flushable.insert(Arc::clone(new));
472 }
473 }
474
475 pub fn insert_almost_full(&mut self, stream_id: u64) {
479 self.almost_full.insert(stream_id);
480 }
481
482 pub fn remove_almost_full(&mut self, stream_id: u64) {
484 self.almost_full.remove(&stream_id);
485 }
486
487 pub fn insert_blocked(&mut self, stream_id: u64, off: u64) {
492 self.blocked.insert(stream_id, off);
493 }
494
495 pub fn remove_blocked(&mut self, stream_id: u64) {
497 self.blocked.remove(&stream_id);
498 }
499
500 pub fn insert_reset(
505 &mut self, stream_id: u64, error_code: u64, final_size: u64,
506 ) {
507 self.reset.insert(stream_id, (error_code, final_size));
508 }
509
510 pub fn remove_reset(&mut self, stream_id: u64) {
512 self.reset.remove(&stream_id);
513 }
514
515 pub fn insert_stopped(&mut self, stream_id: u64, error_code: u64) {
520 self.stopped.insert(stream_id, error_code);
521 }
522
523 pub fn remove_stopped(&mut self, stream_id: u64) {
525 self.stopped.remove(&stream_id);
526 }
527
528 pub fn update_peer_max_streams_bidi(&mut self, v: u64) {
530 self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v);
531 }
532
533 pub fn update_peer_max_streams_uni(&mut self, v: u64) {
535 self.peer_max_streams_uni = cmp::max(self.peer_max_streams_uni, v);
536 }
537
538 pub fn update_max_streams_bidi(&mut self) {
540 self.local_max_streams_bidi = self.local_max_streams_bidi_next;
541 }
542
543 pub fn set_max_streams_bidi(&mut self, max: u64) {
545 self.local_max_streams_bidi = max;
546 self.local_max_streams_bidi_next = max;
547 self.initial_max_streams_bidi = max;
548 }
549
550 pub fn max_streams_bidi(&self) -> u64 {
552 self.local_max_streams_bidi
553 }
554
555 pub fn max_streams_bidi_next(&mut self) -> u64 {
557 self.local_max_streams_bidi_next
558 }
559
560 pub fn update_max_streams_uni(&mut self) {
562 self.local_max_streams_uni = self.local_max_streams_uni_next;
563 }
564
565 pub fn max_streams_uni_next(&mut self) -> u64 {
567 self.local_max_streams_uni_next
568 }
569
570 pub fn peer_max_streams_bidi(&self) -> u64 {
572 self.peer_max_streams_bidi
573 }
574
575 pub fn peer_streams_left_bidi(&self) -> u64 {
578 self.peer_max_streams_bidi - self.local_opened_streams_bidi
579 }
580
581 pub fn peer_max_streams_uni(&self) -> u64 {
583 self.peer_max_streams_uni
584 }
585
586 pub fn peer_streams_left_uni(&self) -> u64 {
589 self.peer_max_streams_uni - self.local_opened_streams_uni
590 }
591
592 pub fn collect(&mut self, stream_id: u64, local: bool) {
597 if !local {
598 if is_bidi(stream_id) {
601 self.local_max_streams_bidi_next =
602 self.local_max_streams_bidi_next.saturating_add(1);
603 } else {
604 self.local_max_streams_uni_next =
605 self.local_max_streams_uni_next.saturating_add(1);
606 }
607 }
608
609 let s = self.streams.remove(&stream_id).unwrap();
610
611 self.remove_readable(&s.priority_key);
612
613 self.remove_writable(&s.priority_key);
614
615 self.remove_flushable(&s.priority_key);
616
617 self.collected.insert(stream_id);
618 }
619
620 pub fn readable(&self) -> StreamIter {
622 StreamIter {
623 streams: self.readable.iter().map(|s| s.id).collect(),
624 index: 0,
625 }
626 }
627
628 pub fn writable(&self) -> StreamIter {
630 StreamIter {
631 streams: self.writable.iter().map(|s| s.id).collect(),
632 index: 0,
633 }
634 }
635
636 pub fn almost_full(&self) -> StreamIter {
638 StreamIter::from(&self.almost_full)
639 }
640
641 pub fn blocked(&self) -> hash_map::Iter<'_, u64, u64> {
643 self.blocked.iter()
644 }
645
646 pub fn reset(&self) -> hash_map::Iter<'_, u64, (u64, u64)> {
648 self.reset.iter()
649 }
650
651 pub fn stopped(&self) -> hash_map::Iter<'_, u64, u64> {
653 self.stopped.iter()
654 }
655
656 pub fn is_collected(&self, stream_id: u64) -> bool {
658 self.collected.contains(&stream_id)
659 }
660
661 pub fn has_flushable(&self) -> bool {
663 !self.flushable.is_empty()
664 }
665
666 pub fn has_readable(&self) -> bool {
668 !self.readable.is_empty()
669 }
670
671 pub fn has_almost_full(&self) -> bool {
674 !self.almost_full.is_empty()
675 }
676
677 pub fn has_blocked(&self) -> bool {
679 !self.blocked.is_empty()
680 }
681
682 pub fn has_reset(&self) -> bool {
684 !self.reset.is_empty()
685 }
686
687 pub fn has_stopped(&self) -> bool {
689 !self.stopped.is_empty()
690 }
691
692 pub fn should_update_max_streams_bidi(&self) -> bool {
698 let available = self
699 .local_max_streams_bidi
700 .saturating_sub(self.peer_opened_streams_bidi);
701 self.local_max_streams_bidi_next != self.local_max_streams_bidi &&
702 available <= self.initial_max_streams_bidi / 2
703 }
704
705 pub fn should_update_max_streams_uni(&self) -> bool {
711 let available = self
712 .local_max_streams_uni
713 .saturating_sub(self.peer_opened_streams_uni);
714 self.local_max_streams_uni_next != self.local_max_streams_uni &&
715 available <= self.initial_max_streams_uni / 2
716 }
717
718 #[cfg(test)]
720 pub fn len(&self) -> usize {
721 self.streams.len()
722 }
723
724 pub(crate) fn set_use_initial_max_data_as_flow_control_win(
727 &mut self, v: bool,
728 ) {
729 self.use_initial_max_data_as_flow_control_win = v;
730 }
731}
732
733pub struct Stream<F: BufFactory = DefaultBufFactory> {
735 pub recv: recv_buf::RecvBuf,
737
738 pub send: send_buf::SendBuf<F>,
740
741 pub send_lowat: usize,
742
743 pub bidi: bool,
745
746 pub local: bool,
748
749 pub urgency: u8,
751
752 pub incremental: bool,
754
755 pub priority_key: Arc<StreamPriorityKey>,
756}
757
758impl<F: BufFactory> Stream<F> {
759 pub fn new(
761 id: u64, max_rx_data: u64, max_tx_data: u64, local: bool,
762 initial_window: u64, max_window: u64,
763 ) -> Self {
764 let priority_key = Arc::new(StreamPriorityKey {
765 id,
766 ..Default::default()
767 });
768
769 Stream {
770 recv: recv_buf::RecvBuf::new(max_rx_data, initial_window, max_window),
771 send: send_buf::SendBuf::new(max_tx_data),
772 send_lowat: 1,
773 bidi: is_bidi(id),
774 local,
775 urgency: priority_key.urgency,
776 incremental: priority_key.incremental,
777 priority_key,
778 }
779 }
780
781 pub fn is_readable(&self) -> bool {
783 self.recv.ready()
784 }
785
786 pub fn is_writable(&self) -> bool {
789 !self.send.is_shutdown() &&
790 !self.send.is_fin() &&
791 (self.send.off_back() + self.send_lowat as u64) <
792 self.send.max_off()
793 }
794
795 pub fn is_flushable(&self) -> bool {
798 let off_front = self.send.off_front();
799
800 !self.send.is_empty() &&
801 off_front < self.send.off_back() &&
802 off_front < self.send.max_off()
803 }
804
805 pub fn is_complete(&self) -> bool {
815 match (self.bidi, self.local) {
816 (true, _) => self.recv.is_fin() && self.send.is_complete(),
819
820 (false, true) => self.send.is_complete(),
823
824 (false, false) => self.recv.is_fin(),
827 }
828 }
829}
830
831pub fn is_local(stream_id: u64, is_server: bool) -> bool {
833 (stream_id & 0x1) == (is_server as u64)
834}
835
836pub fn is_bidi(stream_id: u64) -> bool {
838 (stream_id & 0x2) == 0
839}
840
841#[derive(Clone, Debug)]
842pub struct StreamPriorityKey {
843 pub urgency: u8,
844 pub incremental: bool,
845 pub id: u64,
846
847 pub readable: RBTreeAtomicLink,
848 pub writable: RBTreeAtomicLink,
849 pub flushable: RBTreeAtomicLink,
850}
851
852impl Default for StreamPriorityKey {
853 fn default() -> Self {
854 Self {
855 urgency: DEFAULT_URGENCY,
856 incremental: true,
857 id: Default::default(),
858 readable: Default::default(),
859 writable: Default::default(),
860 flushable: Default::default(),
861 }
862 }
863}
864
865impl PartialEq for StreamPriorityKey {
866 fn eq(&self, other: &Self) -> bool {
867 self.id == other.id
868 }
869}
870
871impl Eq for StreamPriorityKey {}
872
873impl PartialOrd for StreamPriorityKey {
874 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
875 Some(self.cmp(other))
876 }
877}
878
879impl Ord for StreamPriorityKey {
880 fn cmp(&self, other: &Self) -> cmp::Ordering {
881 if self.id == other.id {
883 return cmp::Ordering::Equal;
884 }
885
886 if self.urgency != other.urgency {
888 return self.urgency.cmp(&other.urgency);
889 }
890
891 if !self.incremental && !other.incremental {
894 return self.id.cmp(&other.id);
895 }
896
897 if self.incremental && !other.incremental {
899 return cmp::Ordering::Greater;
900 }
901 if !self.incremental && other.incremental {
902 return cmp::Ordering::Less;
903 }
904
905 cmp::Ordering::Greater
909 }
910}
911
912intrusive_adapter!(pub StreamWritablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { writable: RBTreeAtomicLink });
913
914impl KeyAdapter<'_> for StreamWritablePriorityAdapter {
915 type Key = StreamPriorityKey;
916
917 fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
918 s.clone()
919 }
920}
921
922intrusive_adapter!(pub StreamReadablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { readable: RBTreeAtomicLink });
923
924impl KeyAdapter<'_> for StreamReadablePriorityAdapter {
925 type Key = StreamPriorityKey;
926
927 fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
928 s.clone()
929 }
930}
931
932intrusive_adapter!(pub StreamFlushablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { flushable: RBTreeAtomicLink });
933
934impl KeyAdapter<'_> for StreamFlushablePriorityAdapter {
935 type Key = StreamPriorityKey;
936
937 fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
938 s.clone()
939 }
940}
941
942#[derive(Default)]
944pub struct StreamIter {
945 streams: SmallVec<[u64; 8]>,
946 index: usize,
947}
948
949impl StreamIter {
950 #[inline]
951 fn from(streams: &StreamIdHashSet) -> Self {
952 StreamIter {
953 streams: streams.iter().copied().collect(),
954 index: 0,
955 }
956 }
957}
958
959impl Iterator for StreamIter {
960 type Item = u64;
961
962 #[inline]
963 fn next(&mut self) -> Option<Self::Item> {
964 let v = self.streams.get(self.index)?;
965 self.index += 1;
966 Some(*v)
967 }
968}
969
970impl ExactSizeIterator for StreamIter {
971 #[inline]
972 fn len(&self) -> usize {
973 self.streams.len() - self.index
974 }
975}
976
977#[cfg(test)]
978mod tests {
979 use crate::range_buf::RangeBuf;
980
981 use super::*;
982
983 #[test]
984 fn recv_flow_control() {
985 let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
986 assert!(!stream.recv.almost_full());
987
988 let mut buf = [0; 32];
989
990 let first = RangeBuf::from(b"hello", 0, false);
991 let second = RangeBuf::from(b"world", 5, false);
992 let third = RangeBuf::from(b"something", 10, false);
993
994 assert_eq!(stream.recv.write(second), Ok(()));
995 assert_eq!(stream.recv.write(first), Ok(()));
996 assert!(!stream.recv.almost_full());
997
998 assert_eq!(stream.recv.write(third), Err(Error::FlowControl));
999
1000 let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1001 assert_eq!(&buf[..len], b"helloworld");
1002 assert!(!fin);
1003
1004 assert!(stream.recv.almost_full());
1005
1006 stream.recv.update_max_data(std::time::Instant::now());
1007 assert_eq!(stream.recv.max_data_next(), 25);
1008 assert!(!stream.recv.almost_full());
1009
1010 let third = RangeBuf::from(b"something", 10, false);
1011 assert_eq!(stream.recv.write(third), Ok(()));
1012 }
1013
1014 #[test]
1015 fn recv_past_fin() {
1016 let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1017 assert!(!stream.recv.almost_full());
1018
1019 let first = RangeBuf::from(b"hello", 0, true);
1020 let second = RangeBuf::from(b"world", 5, false);
1021
1022 assert_eq!(stream.recv.write(first), Ok(()));
1023 assert_eq!(stream.recv.write(second), Err(Error::FinalSize));
1024 }
1025
1026 #[test]
1027 fn recv_fin_dup() {
1028 let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1029 assert!(!stream.recv.almost_full());
1030
1031 let first = RangeBuf::from(b"hello", 0, true);
1032 let second = RangeBuf::from(b"hello", 0, true);
1033
1034 assert_eq!(stream.recv.write(first), Ok(()));
1035 assert_eq!(stream.recv.write(second), Ok(()));
1036
1037 let mut buf = [0; 32];
1038
1039 let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1040 assert_eq!(&buf[..len], b"hello");
1041 assert!(fin);
1042 }
1043
1044 #[test]
1045 fn recv_fin_change() {
1046 let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1047 assert!(!stream.recv.almost_full());
1048
1049 let first = RangeBuf::from(b"hello", 0, true);
1050 let second = RangeBuf::from(b"world", 5, true);
1051
1052 assert_eq!(stream.recv.write(second), Ok(()));
1053 assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
1054 }
1055
1056 #[test]
1057 fn recv_fin_lower_than_received() {
1058 let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1059 assert!(!stream.recv.almost_full());
1060
1061 let first = RangeBuf::from(b"hello", 0, true);
1062 let second = RangeBuf::from(b"world", 5, false);
1063
1064 assert_eq!(stream.recv.write(second), Ok(()));
1065 assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
1066 }
1067
1068 #[test]
1069 fn recv_fin_flow_control() {
1070 let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1071 assert!(!stream.recv.almost_full());
1072
1073 let mut buf = [0; 32];
1074
1075 let first = RangeBuf::from(b"hello", 0, false);
1076 let second = RangeBuf::from(b"world", 5, true);
1077
1078 assert_eq!(stream.recv.write(first), Ok(()));
1079 assert_eq!(stream.recv.write(second), Ok(()));
1080
1081 let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1082 assert_eq!(&buf[..len], b"helloworld");
1083 assert!(fin);
1084
1085 assert!(!stream.recv.almost_full());
1086 }
1087
1088 #[test]
1089 fn recv_fin_reset_mismatch() {
1090 let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1091 assert!(!stream.recv.almost_full());
1092
1093 let first = RangeBuf::from(b"hello", 0, true);
1094
1095 assert_eq!(stream.recv.write(first), Ok(()));
1096 assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
1097 }
1098
1099 #[test]
1100 fn recv_reset_with_gap() {
1101 let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1102 assert!(!stream.recv.almost_full());
1103
1104 let first = RangeBuf::from(b"hello", 0, false);
1105
1106 assert_eq!(stream.recv.write(first), Ok(()));
1107 assert_eq!(stream.recv.emit(&mut [0; 1]), Ok((1, false)));
1109 assert_eq!(
1111 stream.recv.reset(0, 10),
1112 Ok(RecvBufResetReturn {
1113 max_data_delta: 5,
1114 consumed_flowcontrol: 9
1116 })
1117 );
1118 assert_eq!(stream.recv.reset(0, 10), Ok(RecvBufResetReturn::zero()));
1119 }
1120
1121 #[test]
1122 fn recv_reset_dup() {
1123 let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1124 assert!(!stream.recv.almost_full());
1125
1126 let first = RangeBuf::from(b"hello", 0, false);
1127
1128 assert_eq!(stream.recv.write(first), Ok(()));
1129 assert_eq!(
1130 stream.recv.reset(0, 5),
1131 Ok(RecvBufResetReturn {
1132 max_data_delta: 0,
1133 consumed_flowcontrol: 5
1134 })
1135 );
1136 assert_eq!(stream.recv.reset(0, 5), Ok(RecvBufResetReturn::zero()));
1137 }
1138
1139 #[test]
1140 fn recv_reset_change() {
1141 let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1142 assert!(!stream.recv.almost_full());
1143
1144 let first = RangeBuf::from(b"hello", 0, false);
1145
1146 assert_eq!(stream.recv.write(first), Ok(()));
1147 assert_eq!(
1148 stream.recv.reset(0, 5),
1149 Ok(RecvBufResetReturn {
1150 max_data_delta: 0,
1151 consumed_flowcontrol: 5
1152 })
1153 );
1154 assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
1155 }
1156
1157 #[test]
1158 fn recv_reset_lower_than_received() {
1159 let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1160 assert!(!stream.recv.almost_full());
1161
1162 let first = RangeBuf::from(b"hello", 0, false);
1163
1164 assert_eq!(stream.recv.write(first), Ok(()));
1165 assert_eq!(stream.recv.reset(0, 4), Err(Error::FinalSize));
1166 }
1167
1168 #[test]
1169 fn send_flow_control() {
1170 let mut buf = [0; 25];
1171
1172 let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);
1173
1174 let first = b"hello";
1175 let second = b"world";
1176 let third = b"something";
1177
1178 assert!(stream.send.write(first, false).is_ok());
1179 assert!(stream.send.write(second, false).is_ok());
1180 assert!(stream.send.write(third, false).is_ok());
1181
1182 assert_eq!(stream.send.off_front(), 0);
1183
1184 let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
1185 assert_eq!(written, 15);
1186 assert!(!fin);
1187 assert_eq!(&buf[..written], b"helloworldsomet");
1188
1189 assert_eq!(stream.send.off_front(), 15);
1190
1191 let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
1192 assert_eq!(written, 0);
1193 assert!(!fin);
1194 assert_eq!(&buf[..written], b"");
1195
1196 stream.send.retransmit(0, 15);
1197
1198 assert_eq!(stream.send.off_front(), 0);
1199
1200 let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
1201 assert_eq!(written, 10);
1202 assert!(!fin);
1203 assert_eq!(&buf[..written], b"helloworld");
1204
1205 assert_eq!(stream.send.off_front(), 10);
1206
1207 let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
1208 assert_eq!(written, 5);
1209 assert!(!fin);
1210 assert_eq!(&buf[..written], b"somet");
1211 }
1212
1213 #[test]
1214 fn send_past_fin() {
1215 let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);
1216
1217 let first = b"hello";
1218 let second = b"world";
1219 let third = b"third";
1220
1221 assert_eq!(stream.send.write(first, false), Ok(5));
1222
1223 assert_eq!(stream.send.write(second, true), Ok(5));
1224 assert!(stream.send.is_fin());
1225
1226 assert_eq!(stream.send.write(third, false), Err(Error::FinalSize));
1227 }
1228
1229 #[test]
1230 fn send_fin_dup() {
1231 let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);
1232
1233 assert_eq!(stream.send.write(b"hello", true), Ok(5));
1234 assert!(stream.send.is_fin());
1235
1236 assert_eq!(stream.send.write(b"", true), Ok(0));
1237 assert!(stream.send.is_fin());
1238 }
1239
1240 #[test]
1241 fn send_undo_fin() {
1242 let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);
1243
1244 assert_eq!(stream.send.write(b"hello", true), Ok(5));
1245 assert!(stream.send.is_fin());
1246
1247 assert_eq!(
1248 stream.send.write(b"helloworld", true),
1249 Err(Error::FinalSize)
1250 );
1251 }
1252
1253 #[test]
1254 fn send_fin_max_data_match() {
1255 let mut buf = [0; 15];
1256
1257 let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);
1258
1259 let slice = b"hellohellohello";
1260
1261 assert!(stream.send.write(slice, true).is_ok());
1262
1263 let (written, fin) = stream.send.emit(&mut buf[..15]).unwrap();
1264 assert_eq!(written, 15);
1265 assert!(fin);
1266 assert_eq!(&buf[..written], slice);
1267 }
1268
1269 #[test]
1270 fn send_fin_zero_length() {
1271 let mut buf = [0; 5];
1272
1273 let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);
1274
1275 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1276 assert_eq!(stream.send.write(b"", true), Ok(0));
1277 assert!(stream.send.is_fin());
1278
1279 let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1280 assert_eq!(written, 5);
1281 assert!(fin);
1282 assert_eq!(&buf[..written], b"hello");
1283 }
1284
1285 #[test]
1286 fn send_ack() {
1287 let mut buf = [0; 5];
1288
1289 let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);
1290
1291 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1292 assert_eq!(stream.send.write(b"world", false), Ok(5));
1293 assert_eq!(stream.send.write(b"", true), Ok(0));
1294 assert!(stream.send.is_fin());
1295
1296 assert_eq!(stream.send.off_front(), 0);
1297
1298 let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1299 assert_eq!(written, 5);
1300 assert!(!fin);
1301 assert_eq!(&buf[..written], b"hello");
1302
1303 stream.send.ack_and_drop(0, 5);
1304
1305 stream.send.retransmit(0, 5);
1306
1307 assert_eq!(stream.send.off_front(), 5);
1308
1309 let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1310 assert_eq!(written, 5);
1311 assert!(fin);
1312 assert_eq!(&buf[..written], b"world");
1313 }
1314
1315 #[test]
1316 fn send_ack_reordering() {
1317 let mut buf = [0; 5];
1318
1319 let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);
1320
1321 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1322 assert_eq!(stream.send.write(b"world", false), Ok(5));
1323 assert_eq!(stream.send.write(b"", true), Ok(0));
1324 assert!(stream.send.is_fin());
1325
1326 assert_eq!(stream.send.off_front(), 0);
1327
1328 let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1329 assert_eq!(written, 5);
1330 assert!(!fin);
1331 assert_eq!(&buf[..written], b"hello");
1332
1333 assert_eq!(stream.send.off_front(), 5);
1334
1335 let (written, fin) = stream.send.emit(&mut buf[..1]).unwrap();
1336 assert_eq!(written, 1);
1337 assert!(!fin);
1338 assert_eq!(&buf[..written], b"w");
1339
1340 stream.send.ack_and_drop(5, 1);
1341 stream.send.ack_and_drop(0, 5);
1342
1343 stream.send.retransmit(0, 5);
1344 stream.send.retransmit(5, 1);
1345
1346 assert_eq!(stream.send.off_front(), 6);
1347
1348 let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1349 assert_eq!(written, 4);
1350 assert!(fin);
1351 assert_eq!(&buf[..written], b"orld");
1352 }
1353
1354 #[test]
1355 fn recv_data_below_off() {
1356 let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1357
1358 let first = RangeBuf::from(b"hello", 0, false);
1359
1360 assert_eq!(stream.recv.write(first), Ok(()));
1361
1362 let mut buf = [0; 10];
1363
1364 let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1365 assert_eq!(&buf[..len], b"hello");
1366 assert!(!fin);
1367
1368 let first = RangeBuf::from(b"elloworld", 1, true);
1369 assert_eq!(stream.recv.write(first), Ok(()));
1370
1371 let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1372 assert_eq!(&buf[..len], b"world");
1373 assert!(fin);
1374 }
1375
1376 #[test]
1377 fn stream_complete() {
1378 let mut stream =
1379 <Stream>::new(0, 30, 30, true, 30, DEFAULT_STREAM_WINDOW);
1380
1381 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1382 assert_eq!(stream.send.write(b"world", false), Ok(5));
1383
1384 assert!(!stream.send.is_complete());
1385 assert!(!stream.send.is_fin());
1386
1387 assert_eq!(stream.send.write(b"", true), Ok(0));
1388
1389 assert!(!stream.send.is_complete());
1390 assert!(stream.send.is_fin());
1391
1392 let buf = RangeBuf::from(b"hello", 0, true);
1393 assert!(stream.recv.write(buf).is_ok());
1394 assert!(!stream.recv.is_fin());
1395
1396 stream.send.ack(6, 4);
1397 assert!(!stream.send.is_complete());
1398
1399 let mut buf = [0; 2];
1400 assert_eq!(stream.recv.emit(&mut buf), Ok((2, false)));
1401 assert!(!stream.recv.is_fin());
1402
1403 stream.send.ack(1, 5);
1404 assert!(!stream.send.is_complete());
1405
1406 stream.send.ack(0, 1);
1407 assert!(stream.send.is_complete());
1408
1409 assert!(!stream.is_complete());
1410
1411 let mut buf = [0; 3];
1412 assert_eq!(stream.recv.emit(&mut buf), Ok((3, true)));
1413 assert!(stream.recv.is_fin());
1414
1415 assert!(stream.is_complete());
1416 }
1417
1418 #[test]
1419 fn send_fin_zero_length_output() {
1420 let mut buf = [0; 5];
1421
1422 let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);
1423
1424 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1425 assert_eq!(stream.send.off_front(), 0);
1426 assert!(!stream.send.is_fin());
1427
1428 let (written, fin) = stream.send.emit(&mut buf).unwrap();
1429 assert_eq!(written, 5);
1430 assert!(!fin);
1431 assert_eq!(&buf[..written], b"hello");
1432
1433 assert_eq!(stream.send.write(b"", true), Ok(0));
1434 assert!(stream.send.is_fin());
1435 assert_eq!(stream.send.off_front(), 5);
1436
1437 let (written, fin) = stream.send.emit(&mut buf).unwrap();
1438 assert_eq!(written, 0);
1439 assert!(fin);
1440 assert_eq!(&buf[..written], b"");
1441 }
1442
1443 fn stream_send_ready(stream: &Stream) -> bool {
1444 !stream.send.is_empty() &&
1445 stream.send.off_front() < stream.send.off_back()
1446 }
1447
1448 #[test]
1449 fn send_emit() {
1450 let mut buf = [0; 5];
1451
1452 let mut stream = <Stream>::new(0, 0, 20, true, 0, DEFAULT_STREAM_WINDOW);
1453
1454 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1455 assert_eq!(stream.send.write(b"world", false), Ok(5));
1456 assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1457 assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1458 assert_eq!(stream.send.off_front(), 0);
1459 assert_eq!(stream.send.bufs_count(), 4);
1460
1461 assert!(stream.is_flushable());
1462
1463 assert!(stream_send_ready(&stream));
1464 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1465 assert_eq!(stream.send.off_front(), 4);
1466 assert_eq!(&buf[..4], b"hell");
1467
1468 assert!(stream_send_ready(&stream));
1469 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1470 assert_eq!(stream.send.off_front(), 8);
1471 assert_eq!(&buf[..4], b"owor");
1472
1473 assert!(stream_send_ready(&stream));
1474 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1475 assert_eq!(stream.send.off_front(), 10);
1476 assert_eq!(&buf[..2], b"ld");
1477
1478 assert!(stream_send_ready(&stream));
1479 assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1480 assert_eq!(stream.send.off_front(), 11);
1481 assert_eq!(&buf[..1], b"o");
1482
1483 assert!(stream_send_ready(&stream));
1484 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1485 assert_eq!(stream.send.off_front(), 16);
1486 assert_eq!(&buf[..5], b"llehd");
1487
1488 assert!(stream_send_ready(&stream));
1489 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1490 assert_eq!(stream.send.off_front(), 20);
1491 assert_eq!(&buf[..4], b"lrow");
1492
1493 assert!(!stream.is_flushable());
1494
1495 assert!(!stream_send_ready(&stream));
1496 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1497 assert_eq!(stream.send.off_front(), 20);
1498 }
1499
1500 #[test]
1501 fn send_emit_ack() {
1502 let mut buf = [0; 5];
1503
1504 let mut stream = <Stream>::new(0, 0, 20, true, 0, DEFAULT_STREAM_WINDOW);
1505
1506 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1507 assert_eq!(stream.send.write(b"world", false), Ok(5));
1508 assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1509 assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1510 assert_eq!(stream.send.off_front(), 0);
1511 assert_eq!(stream.send.bufs_count(), 4);
1512
1513 assert!(stream.is_flushable());
1514
1515 assert!(stream_send_ready(&stream));
1516 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1517 assert_eq!(stream.send.off_front(), 4);
1518 assert_eq!(&buf[..4], b"hell");
1519
1520 assert!(stream_send_ready(&stream));
1521 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1522 assert_eq!(stream.send.off_front(), 8);
1523 assert_eq!(&buf[..4], b"owor");
1524
1525 stream.send.ack_and_drop(0, 5);
1526 assert_eq!(stream.send.bufs_count(), 3);
1527
1528 assert!(stream_send_ready(&stream));
1529 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1530 assert_eq!(stream.send.off_front(), 10);
1531 assert_eq!(&buf[..2], b"ld");
1532
1533 stream.send.ack_and_drop(7, 5);
1534 assert_eq!(stream.send.bufs_count(), 3);
1535
1536 assert!(stream_send_ready(&stream));
1537 assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1538 assert_eq!(stream.send.off_front(), 11);
1539 assert_eq!(&buf[..1], b"o");
1540
1541 assert!(stream_send_ready(&stream));
1542 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1543 assert_eq!(stream.send.off_front(), 16);
1544 assert_eq!(&buf[..5], b"llehd");
1545
1546 stream.send.ack_and_drop(5, 7);
1547 assert_eq!(stream.send.bufs_count(), 2);
1548
1549 assert!(stream_send_ready(&stream));
1550 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1551 assert_eq!(stream.send.off_front(), 20);
1552 assert_eq!(&buf[..4], b"lrow");
1553
1554 assert!(!stream.is_flushable());
1555
1556 assert!(!stream_send_ready(&stream));
1557 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1558 assert_eq!(stream.send.off_front(), 20);
1559
1560 stream.send.ack_and_drop(22, 4);
1561 assert_eq!(stream.send.bufs_count(), 2);
1562
1563 stream.send.ack_and_drop(20, 1);
1564 assert_eq!(stream.send.bufs_count(), 2);
1565 }
1566
1567 #[test]
1568 fn send_emit_retransmit() {
1569 let mut buf = [0; 5];
1570
1571 let mut stream = <Stream>::new(
1572 0,
1573 0,
1574 20,
1575 true,
1576 DEFAULT_STREAM_WINDOW,
1577 DEFAULT_STREAM_WINDOW,
1578 );
1579
1580 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1581 assert_eq!(stream.send.write(b"world", false), Ok(5));
1582 assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1583 assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1584 assert_eq!(stream.send.off_front(), 0);
1585 assert_eq!(stream.send.bufs_count(), 4);
1586
1587 assert!(stream.is_flushable());
1588
1589 assert!(stream_send_ready(&stream));
1590 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1591 assert_eq!(stream.send.off_front(), 4);
1592 assert_eq!(&buf[..4], b"hell");
1593
1594 assert!(stream_send_ready(&stream));
1595 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1596 assert_eq!(stream.send.off_front(), 8);
1597 assert_eq!(&buf[..4], b"owor");
1598
1599 stream.send.retransmit(3, 3);
1600 assert_eq!(stream.send.off_front(), 3);
1601
1602 assert!(stream_send_ready(&stream));
1603 assert_eq!(stream.send.emit(&mut buf[..3]), Ok((3, false)));
1604 assert_eq!(stream.send.off_front(), 8);
1605 assert_eq!(&buf[..3], b"low");
1606
1607 assert!(stream_send_ready(&stream));
1608 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1609 assert_eq!(stream.send.off_front(), 10);
1610 assert_eq!(&buf[..2], b"ld");
1611
1612 stream.send.ack_and_drop(7, 2);
1613
1614 stream.send.retransmit(8, 2);
1615
1616 assert!(stream_send_ready(&stream));
1617 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1618 assert_eq!(stream.send.off_front(), 10);
1619 assert_eq!(&buf[..2], b"ld");
1620
1621 assert!(stream_send_ready(&stream));
1622 assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1623 assert_eq!(stream.send.off_front(), 11);
1624 assert_eq!(&buf[..1], b"o");
1625
1626 assert!(stream_send_ready(&stream));
1627 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1628 assert_eq!(stream.send.off_front(), 16);
1629 assert_eq!(&buf[..5], b"llehd");
1630
1631 stream.send.retransmit(12, 2);
1632
1633 assert!(stream_send_ready(&stream));
1634 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1635 assert_eq!(stream.send.off_front(), 16);
1636 assert_eq!(&buf[..2], b"le");
1637
1638 assert!(stream_send_ready(&stream));
1639 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1640 assert_eq!(stream.send.off_front(), 20);
1641 assert_eq!(&buf[..4], b"lrow");
1642
1643 assert!(!stream.is_flushable());
1644
1645 assert!(!stream_send_ready(&stream));
1646 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1647 assert_eq!(stream.send.off_front(), 20);
1648
1649 stream.send.retransmit(7, 12);
1650
1651 assert!(stream_send_ready(&stream));
1652 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1653 assert_eq!(stream.send.off_front(), 12);
1654 assert_eq!(&buf[..5], b"rldol");
1655
1656 assert!(stream_send_ready(&stream));
1657 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1658 assert_eq!(stream.send.off_front(), 17);
1659 assert_eq!(&buf[..5], b"lehdl");
1660
1661 assert!(stream_send_ready(&stream));
1662 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
1663 assert_eq!(stream.send.off_front(), 20);
1664 assert_eq!(&buf[..2], b"ro");
1665
1666 stream.send.ack_and_drop(12, 7);
1667
1668 stream.send.retransmit(7, 12);
1669
1670 assert!(stream_send_ready(&stream));
1671 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1672 assert_eq!(stream.send.off_front(), 12);
1673 assert_eq!(&buf[..5], b"rldol");
1674
1675 assert!(stream_send_ready(&stream));
1676 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1677 assert_eq!(stream.send.off_front(), 17);
1678 assert_eq!(&buf[..5], b"lehdl");
1679
1680 assert!(stream_send_ready(&stream));
1681 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
1682 assert_eq!(stream.send.off_front(), 20);
1683 assert_eq!(&buf[..2], b"ro");
1684 }
1685
1686 #[test]
1687 fn rangebuf_split_off() {
1688 let mut buf = <RangeBuf>::from(b"helloworld", 5, true);
1689 assert_eq!(buf.start, 0);
1690 assert_eq!(buf.pos, 0);
1691 assert_eq!(buf.len, 10);
1692 assert_eq!(buf.off, 5);
1693 assert!(buf.fin);
1694
1695 assert_eq!(buf.len(), 10);
1696 assert_eq!(buf.off(), 5);
1697 assert!(buf.fin());
1698
1699 assert_eq!(&buf[..], b"helloworld");
1700
1701 buf.consume(5);
1703
1704 assert_eq!(buf.start, 0);
1705 assert_eq!(buf.pos, 5);
1706 assert_eq!(buf.len, 10);
1707 assert_eq!(buf.off, 5);
1708 assert!(buf.fin);
1709
1710 assert_eq!(buf.len(), 5);
1711 assert_eq!(buf.off(), 10);
1712 assert!(buf.fin());
1713
1714 assert_eq!(&buf[..], b"world");
1715
1716 let mut new_buf = buf.split_off(3);
1718
1719 assert_eq!(buf.start, 0);
1720 assert_eq!(buf.pos, 3);
1721 assert_eq!(buf.len, 3);
1722 assert_eq!(buf.off, 5);
1723 assert!(!buf.fin);
1724
1725 assert_eq!(buf.len(), 0);
1726 assert_eq!(buf.off(), 8);
1727 assert!(!buf.fin());
1728
1729 assert_eq!(&buf[..], b"");
1730
1731 assert_eq!(new_buf.start, 3);
1732 assert_eq!(new_buf.pos, 5);
1733 assert_eq!(new_buf.len, 7);
1734 assert_eq!(new_buf.off, 8);
1735 assert!(new_buf.fin);
1736
1737 assert_eq!(new_buf.len(), 5);
1738 assert_eq!(new_buf.off(), 10);
1739 assert!(new_buf.fin());
1740
1741 assert_eq!(&new_buf[..], b"world");
1742
1743 new_buf.consume(2);
1745
1746 assert_eq!(new_buf.start, 3);
1747 assert_eq!(new_buf.pos, 7);
1748 assert_eq!(new_buf.len, 7);
1749 assert_eq!(new_buf.off, 8);
1750 assert!(new_buf.fin);
1751
1752 assert_eq!(new_buf.len(), 3);
1753 assert_eq!(new_buf.off(), 12);
1754 assert!(new_buf.fin());
1755
1756 assert_eq!(&new_buf[..], b"rld");
1757
1758 let mut new_new_buf = new_buf.split_off(5);
1760
1761 assert_eq!(new_buf.start, 3);
1762 assert_eq!(new_buf.pos, 7);
1763 assert_eq!(new_buf.len, 5);
1764 assert_eq!(new_buf.off, 8);
1765 assert!(!new_buf.fin);
1766
1767 assert_eq!(new_buf.len(), 1);
1768 assert_eq!(new_buf.off(), 12);
1769 assert!(!new_buf.fin());
1770
1771 assert_eq!(&new_buf[..], b"r");
1772
1773 assert_eq!(new_new_buf.start, 8);
1774 assert_eq!(new_new_buf.pos, 8);
1775 assert_eq!(new_new_buf.len, 2);
1776 assert_eq!(new_new_buf.off, 13);
1777 assert!(new_new_buf.fin);
1778
1779 assert_eq!(new_new_buf.len(), 2);
1780 assert_eq!(new_new_buf.off(), 13);
1781 assert!(new_new_buf.fin());
1782
1783 assert_eq!(&new_new_buf[..], b"ld");
1784
1785 new_new_buf.consume(2);
1787
1788 assert_eq!(new_new_buf.start, 8);
1789 assert_eq!(new_new_buf.pos, 10);
1790 assert_eq!(new_new_buf.len, 2);
1791 assert_eq!(new_new_buf.off, 13);
1792 assert!(new_new_buf.fin);
1793
1794 assert_eq!(new_new_buf.len(), 0);
1795 assert_eq!(new_new_buf.off(), 15);
1796 assert!(new_new_buf.fin());
1797
1798 assert_eq!(&new_new_buf[..], b"");
1799 }
1800
1801 #[test]
1804 fn stream_limit_auto_open() {
1805 let local_tp = crate::TransportParams::default();
1806 let peer_tp = crate::TransportParams::default();
1807
1808 let mut streams = <StreamMap>::new(5, 5, 5);
1809
1810 let stream_id = 500;
1811 assert!(!is_local(stream_id, true), "stream id is peer initiated");
1812 assert!(is_bidi(stream_id), "stream id is bidirectional");
1813 assert_eq!(
1814 streams
1815 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1816 .err(),
1817 Some(Error::StreamLimit),
1818 "stream limit should be exceeded"
1819 );
1820 }
1821
1822 #[test]
1825 fn stream_create_out_of_order() {
1826 let local_tp = crate::TransportParams::default();
1827 let peer_tp = crate::TransportParams::default();
1828
1829 let mut streams = <StreamMap>::new(5, 5, 5);
1830
1831 for stream_id in [8, 12, 4] {
1832 assert!(is_local(stream_id, false), "stream id is client initiated");
1833 assert!(is_bidi(stream_id), "stream id is bidirectional");
1834 assert!(streams
1835 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1836 .is_ok());
1837 }
1838 }
1839
1840 #[test]
1842 fn stream_limit_edge() {
1843 let local_tp = crate::TransportParams::default();
1844 let peer_tp = crate::TransportParams::default();
1845
1846 let mut streams = <StreamMap>::new(3, 3, 3);
1847
1848 let stream_id = 8;
1850 assert!(streams
1851 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1852 .is_ok());
1853
1854 let stream_id = 12;
1856 assert_eq!(
1857 streams
1858 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1859 .err(),
1860 Some(Error::StreamLimit)
1861 );
1862 }
1863
1864 fn cycle_stream_priority(stream_id: u64, streams: &mut StreamMap) {
1865 let key = streams.get(stream_id).unwrap().priority_key.clone();
1866 streams.update_priority(&key.clone(), &key);
1867 }
1868
1869 #[test]
1870 fn writable_prioritized_default_priority() {
1871 let local_tp = crate::TransportParams::default();
1872 let peer_tp = crate::TransportParams {
1873 initial_max_stream_data_bidi_local: 100,
1874 initial_max_stream_data_uni: 100,
1875 ..Default::default()
1876 };
1877
1878 let mut streams = StreamMap::new(100, 100, 100);
1879
1880 for id in [0, 4, 8, 12] {
1881 assert!(streams
1882 .get_or_create(id, &local_tp, &peer_tp, false, true)
1883 .is_ok());
1884 }
1885
1886 let walk_1: Vec<u64> = streams.writable().collect();
1887 cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1888 let walk_2: Vec<u64> = streams.writable().collect();
1889 cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1890 let walk_3: Vec<u64> = streams.writable().collect();
1891 cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1892 let walk_4: Vec<u64> = streams.writable().collect();
1893 cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1894 let walk_5: Vec<u64> = streams.writable().collect();
1895
1896 assert_eq!(walk_1, vec![0, 4, 8, 12]);
1899 assert_eq!(walk_2, vec![4, 8, 12, 0]);
1900 assert_eq!(walk_3, vec![8, 12, 0, 4]);
1901 assert_eq!(walk_4, vec![12, 0, 4, 8,]);
1902 assert_eq!(walk_5, vec![0, 4, 8, 12]);
1903 }
1904
1905 #[test]
1906 fn writable_prioritized_insert_order() {
1907 let local_tp = crate::TransportParams::default();
1908 let peer_tp = crate::TransportParams {
1909 initial_max_stream_data_bidi_local: 100,
1910 initial_max_stream_data_uni: 100,
1911 ..Default::default()
1912 };
1913
1914 let mut streams = StreamMap::new(100, 100, 100);
1915
1916 for id in [12, 4, 8, 0] {
1919 assert!(streams
1920 .get_or_create(id, &local_tp, &peer_tp, false, true)
1921 .is_ok());
1922 }
1923
1924 let walk_1: Vec<u64> = streams.writable().collect();
1925 cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1926 let walk_2: Vec<u64> = streams.writable().collect();
1927 cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1928 let walk_3: Vec<u64> = streams.writable().collect();
1929 cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1930 let walk_4: Vec<u64> = streams.writable().collect();
1931 cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1932 let walk_5: Vec<u64> = streams.writable().collect();
1933 assert_eq!(walk_1, vec![12, 4, 8, 0]);
1934 assert_eq!(walk_2, vec![4, 8, 0, 12]);
1935 assert_eq!(walk_3, vec![8, 0, 12, 4,]);
1936 assert_eq!(walk_4, vec![0, 12, 4, 8]);
1937 assert_eq!(walk_5, vec![12, 4, 8, 0]);
1938 }
1939
1940 #[test]
1941 fn writable_prioritized_mixed_urgency() {
1942 let local_tp = crate::TransportParams::default();
1943 let peer_tp = crate::TransportParams {
1944 initial_max_stream_data_bidi_local: 100,
1945 initial_max_stream_data_uni: 100,
1946 ..Default::default()
1947 };
1948
1949 let mut streams = <StreamMap>::new(100, 100, 100);
1950
1951 let input = vec![
1954 (0, 100),
1955 (4, 90),
1956 (8, 80),
1957 (12, 70),
1958 (16, 60),
1959 (20, 50),
1960 (24, 40),
1961 (28, 30),
1962 (32, 20),
1963 (36, 10),
1964 (40, 0),
1965 ];
1966
1967 for (id, urgency) in input.clone() {
1968 let stream = streams
1971 .get_or_create(id, &local_tp, &peer_tp, false, true)
1972 .unwrap();
1973
1974 stream.urgency = urgency;
1975
1976 let new_priority_key = Arc::new(StreamPriorityKey {
1977 urgency: stream.urgency,
1978 incremental: stream.incremental,
1979 id,
1980 ..Default::default()
1981 });
1982
1983 let old_priority_key = std::mem::replace(
1984 &mut stream.priority_key,
1985 new_priority_key.clone(),
1986 );
1987
1988 streams.update_priority(&old_priority_key, &new_priority_key);
1989 }
1990
1991 let walk_1: Vec<u64> = streams.writable().collect();
1992 assert_eq!(walk_1, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
1993
1994 for (id, urgency) in input {
1996 let stream = streams
1999 .get_or_create(id, &local_tp, &peer_tp, false, true)
2000 .unwrap();
2001
2002 stream.urgency = urgency;
2003
2004 let new_priority_key = Arc::new(StreamPriorityKey {
2005 urgency: stream.urgency,
2006 incremental: stream.incremental,
2007 id,
2008 ..Default::default()
2009 });
2010
2011 let old_priority_key = std::mem::replace(
2012 &mut stream.priority_key,
2013 new_priority_key.clone(),
2014 );
2015
2016 streams.update_priority(&old_priority_key, &new_priority_key);
2017 }
2018
2019 let walk_2: Vec<u64> = streams.writable().collect();
2020 assert_eq!(walk_2, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
2021
2022 streams.collect(24, true);
2024
2025 let walk_3: Vec<u64> = streams.writable().collect();
2026 assert_eq!(walk_3, vec![40, 36, 32, 28, 20, 16, 12, 8, 4, 0]);
2027
2028 streams.collect(40, true);
2029 streams.collect(0, true);
2030
2031 let walk_4: Vec<u64> = streams.writable().collect();
2032 assert_eq!(walk_4, vec![36, 32, 28, 20, 16, 12, 8, 4]);
2033
2034 streams
2036 .get_or_create(44, &local_tp, &peer_tp, false, true)
2037 .unwrap();
2038
2039 let walk_5: Vec<u64> = streams.writable().collect();
2040 assert_eq!(walk_5, vec![36, 32, 28, 20, 16, 12, 8, 4, 44]);
2041 }
2042
2043 #[test]
2044 fn writable_prioritized_mixed_urgencies_incrementals() {
2045 let local_tp = crate::TransportParams::default();
2046 let peer_tp = crate::TransportParams {
2047 initial_max_stream_data_bidi_local: 100,
2048 initial_max_stream_data_uni: 100,
2049 ..Default::default()
2050 };
2051
2052 let mut streams = StreamMap::new(100, 100, 100);
2053
2054 let input = vec![
2056 (0, 100),
2057 (4, 20),
2058 (8, 100),
2059 (12, 20),
2060 (16, 90),
2061 (20, 25),
2062 (24, 90),
2063 (28, 30),
2064 (32, 80),
2065 (36, 20),
2066 (40, 0),
2067 ];
2068
2069 for (id, urgency) in input.clone() {
2070 let stream = streams
2073 .get_or_create(id, &local_tp, &peer_tp, false, true)
2074 .unwrap();
2075
2076 stream.urgency = urgency;
2077
2078 let new_priority_key = Arc::new(StreamPriorityKey {
2079 urgency: stream.urgency,
2080 incremental: stream.incremental,
2081 id,
2082 ..Default::default()
2083 });
2084
2085 let old_priority_key = std::mem::replace(
2086 &mut stream.priority_key,
2087 new_priority_key.clone(),
2088 );
2089
2090 streams.update_priority(&old_priority_key, &new_priority_key);
2091 }
2092
2093 let walk_1: Vec<u64> = streams.writable().collect();
2094 cycle_stream_priority(4, &mut streams);
2095 cycle_stream_priority(16, &mut streams);
2096 cycle_stream_priority(0, &mut streams);
2097 let walk_2: Vec<u64> = streams.writable().collect();
2098 cycle_stream_priority(12, &mut streams);
2099 cycle_stream_priority(24, &mut streams);
2100 cycle_stream_priority(8, &mut streams);
2101 let walk_3: Vec<u64> = streams.writable().collect();
2102 cycle_stream_priority(36, &mut streams);
2103 cycle_stream_priority(16, &mut streams);
2104 cycle_stream_priority(0, &mut streams);
2105 let walk_4: Vec<u64> = streams.writable().collect();
2106 cycle_stream_priority(4, &mut streams);
2107 cycle_stream_priority(24, &mut streams);
2108 cycle_stream_priority(8, &mut streams);
2109 let walk_5: Vec<u64> = streams.writable().collect();
2110 cycle_stream_priority(12, &mut streams);
2111 cycle_stream_priority(16, &mut streams);
2112 cycle_stream_priority(0, &mut streams);
2113 let walk_6: Vec<u64> = streams.writable().collect();
2114 cycle_stream_priority(36, &mut streams);
2115 cycle_stream_priority(24, &mut streams);
2116 cycle_stream_priority(8, &mut streams);
2117 let walk_7: Vec<u64> = streams.writable().collect();
2118 cycle_stream_priority(4, &mut streams);
2119 cycle_stream_priority(16, &mut streams);
2120 cycle_stream_priority(0, &mut streams);
2121 let walk_8: Vec<u64> = streams.writable().collect();
2122 cycle_stream_priority(12, &mut streams);
2123 cycle_stream_priority(24, &mut streams);
2124 cycle_stream_priority(8, &mut streams);
2125 let walk_9: Vec<u64> = streams.writable().collect();
2126 cycle_stream_priority(36, &mut streams);
2127 cycle_stream_priority(16, &mut streams);
2128 cycle_stream_priority(0, &mut streams);
2129
2130 assert_eq!(walk_1, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2131 assert_eq!(walk_2, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2132 assert_eq!(walk_3, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2133 assert_eq!(walk_4, vec![40, 4, 12, 36, 20, 28, 32, 24, 16, 8, 0]);
2134 assert_eq!(walk_5, vec![40, 12, 36, 4, 20, 28, 32, 16, 24, 0, 8]);
2135 assert_eq!(walk_6, vec![40, 36, 4, 12, 20, 28, 32, 24, 16, 8, 0]);
2136 assert_eq!(walk_7, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2137 assert_eq!(walk_8, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2138 assert_eq!(walk_9, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2139
2140 streams.collect(20, true);
2142
2143 let walk_10: Vec<u64> = streams.writable().collect();
2144 assert_eq!(walk_10, vec![40, 4, 12, 36, 28, 32, 24, 16, 8, 0]);
2145
2146 let stream = streams
2148 .get_or_create(44, &local_tp, &peer_tp, false, true)
2149 .unwrap();
2150
2151 stream.urgency = 20;
2152 stream.incremental = true;
2153
2154 let new_priority_key = Arc::new(StreamPriorityKey {
2155 urgency: stream.urgency,
2156 incremental: stream.incremental,
2157 id: 44,
2158 ..Default::default()
2159 });
2160
2161 let old_priority_key =
2162 std::mem::replace(&mut stream.priority_key, new_priority_key.clone());
2163
2164 streams.update_priority(&old_priority_key, &new_priority_key);
2165
2166 let walk_11: Vec<u64> = streams.writable().collect();
2167 assert_eq!(walk_11, vec![40, 4, 12, 36, 44, 28, 32, 24, 16, 8, 0]);
2168 }
2169
2170 #[test]
2171 fn priority_tree_dupes() {
2172 let mut prioritized_writable: RBTree<StreamWritablePriorityAdapter> =
2173 Default::default();
2174
2175 for id in [0, 4, 8, 12] {
2176 let s = Arc::new(StreamPriorityKey {
2177 urgency: 0,
2178 incremental: false,
2179 id,
2180 ..Default::default()
2181 });
2182
2183 prioritized_writable.insert(s);
2184 }
2185
2186 let walk_1: Vec<u64> =
2187 prioritized_writable.iter().map(|s| s.id).collect();
2188 assert_eq!(walk_1, vec![0, 4, 8, 12]);
2189
2190 for id in [0, 4, 8, 12] {
2193 let s = Arc::new(StreamPriorityKey {
2194 urgency: 0,
2195 incremental: false,
2196 id,
2197 ..Default::default()
2198 });
2199
2200 prioritized_writable.insert(s);
2201 }
2202
2203 let walk_2: Vec<u64> =
2204 prioritized_writable.iter().map(|s| s.id).collect();
2205 assert_eq!(walk_2, vec![0, 0, 4, 4, 8, 8, 12, 12]);
2206 }
2207}
2208
2209mod recv_buf;
2210mod send_buf;