1use std::cmp;
28
29use std::sync::Arc;
30
31use std::collections::hash_map;
32use std::collections::HashMap;
33use std::collections::HashSet;
34
35use intrusive_collections::intrusive_adapter;
36use intrusive_collections::KeyAdapter;
37use intrusive_collections::RBTree;
38use intrusive_collections::RBTreeAtomicLink;
39
40use smallvec::SmallVec;
41
42use crate::range_buf::DefaultBufFactory;
43use crate::BufFactory;
44use crate::Error;
45use crate::Result;
46
47const DEFAULT_URGENCY: u8 = 127;
48
49const DEFAULT_STREAM_WINDOW: u64 = 32 * 1024;
51
52pub const MAX_STREAM_WINDOW: u64 = 16 * 1024 * 1024;
54
55#[derive(Default)]
60pub struct StreamIdHasher {
61 id: u64,
62}
63
64impl std::hash::Hasher for StreamIdHasher {
65 #[inline]
66 fn finish(&self) -> u64 {
67 self.id
68 }
69
70 #[inline]
71 fn write_u64(&mut self, id: u64) {
72 self.id = id;
73 }
74
75 #[inline]
76 fn write(&mut self, _: &[u8]) {
77 unimplemented!()
80 }
81}
82
83type BuildStreamIdHasher = std::hash::BuildHasherDefault<StreamIdHasher>;
84
85pub type StreamIdHashMap<V> = HashMap<u64, V, BuildStreamIdHasher>;
86pub type StreamIdHashSet = HashSet<u64, BuildStreamIdHasher>;
87
88#[derive(Default)]
90pub struct StreamMap<F: BufFactory = DefaultBufFactory> {
91 streams: StreamIdHashMap<Stream<F>>,
93
94 collected: StreamIdHashSet,
100
101 peer_max_streams_bidi: u64,
103
104 peer_max_streams_uni: u64,
106
107 peer_opened_streams_bidi: u64,
109
110 peer_opened_streams_uni: u64,
112
113 local_max_streams_bidi: u64,
115 local_max_streams_bidi_next: u64,
116
117 local_max_streams_uni: u64,
119 local_max_streams_uni_next: u64,
120
121 local_opened_streams_bidi: u64,
123
124 local_opened_streams_uni: u64,
126
127 flushable: RBTree<StreamFlushablePriorityAdapter>,
131
132 pub readable: RBTree<StreamReadablePriorityAdapter>,
136
137 pub writable: RBTree<StreamWritablePriorityAdapter>,
142
143 almost_full: StreamIdHashSet,
148
149 blocked: StreamIdHashMap<u64>,
153
154 reset: StreamIdHashMap<(u64, u64)>,
158
159 stopped: StreamIdHashMap<u64>,
163
164 max_stream_window: u64,
166}
167
168impl<F: BufFactory> StreamMap<F> {
169 pub fn new(
170 max_streams_bidi: u64, max_streams_uni: u64, max_stream_window: u64,
171 ) -> Self {
172 StreamMap {
173 local_max_streams_bidi: max_streams_bidi,
174 local_max_streams_bidi_next: max_streams_bidi,
175
176 local_max_streams_uni: max_streams_uni,
177 local_max_streams_uni_next: max_streams_uni,
178
179 max_stream_window,
180
181 ..StreamMap::default()
182 }
183 }
184
185 pub fn get(&self, id: u64) -> Option<&Stream<F>> {
187 self.streams.get(&id)
188 }
189
190 pub fn get_mut(&mut self, id: u64) -> Option<&mut Stream<F>> {
192 self.streams.get_mut(&id)
193 }
194
195 pub(crate) fn get_or_create(
208 &mut self, id: u64, local_params: &crate::TransportParams,
209 peer_params: &crate::TransportParams, local: bool, is_server: bool,
210 ) -> Result<&mut Stream<F>> {
211 let (stream, is_new_and_writable) = match self.streams.entry(id) {
212 hash_map::Entry::Vacant(v) => {
213 if self.collected.contains(&id) {
215 return Err(Error::Done);
216 }
217
218 if local != is_local(id, is_server) {
219 return Err(Error::InvalidStreamState(id));
220 }
221
222 let (max_rx_data, max_tx_data) = match (local, is_bidi(id)) {
223 (true, true) => (
225 local_params.initial_max_stream_data_bidi_local,
226 peer_params.initial_max_stream_data_bidi_remote,
227 ),
228
229 (true, false) => (0, peer_params.initial_max_stream_data_uni),
231
232 (false, true) => (
234 local_params.initial_max_stream_data_bidi_remote,
235 peer_params.initial_max_stream_data_bidi_local,
236 ),
237
238 (false, false) =>
240 (local_params.initial_max_stream_data_uni, 0),
241 };
242
243 let stream_sequence = id >> 2;
247
248 match (is_local(id, is_server), is_bidi(id)) {
250 (true, true) => {
251 let n = std::cmp::max(
252 self.local_opened_streams_bidi,
253 stream_sequence + 1,
254 );
255
256 if n > self.peer_max_streams_bidi {
257 return Err(Error::StreamLimit);
258 }
259
260 self.local_opened_streams_bidi = n;
261 },
262
263 (true, false) => {
264 let n = std::cmp::max(
265 self.local_opened_streams_uni,
266 stream_sequence + 1,
267 );
268
269 if n > self.peer_max_streams_uni {
270 return Err(Error::StreamLimit);
271 }
272
273 self.local_opened_streams_uni = n;
274 },
275
276 (false, true) => {
277 let n = std::cmp::max(
278 self.peer_opened_streams_bidi,
279 stream_sequence + 1,
280 );
281
282 if n > self.local_max_streams_bidi {
283 return Err(Error::StreamLimit);
284 }
285
286 self.peer_opened_streams_bidi = n;
287 },
288
289 (false, false) => {
290 let n = std::cmp::max(
291 self.peer_opened_streams_uni,
292 stream_sequence + 1,
293 );
294
295 if n > self.local_max_streams_uni {
296 return Err(Error::StreamLimit);
297 }
298
299 self.peer_opened_streams_uni = n;
300 },
301 };
302
303 let s = Stream::new(
304 id,
305 max_rx_data,
306 max_tx_data,
307 is_bidi(id),
308 local,
309 self.max_stream_window,
310 );
311
312 let is_writable = s.is_writable();
313
314 (v.insert(s), is_writable)
315 },
316
317 hash_map::Entry::Occupied(v) => (v.into_mut(), false),
318 };
319
320 if is_new_and_writable {
323 self.writable.insert(Arc::clone(&stream.priority_key));
324 }
325
326 Ok(stream)
327 }
328
329 pub fn insert_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
333 if !priority_key.readable.is_linked() {
334 self.readable.insert(Arc::clone(priority_key));
335 }
336 }
337
338 pub fn remove_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
340 if !priority_key.readable.is_linked() {
341 return;
342 }
343
344 let mut c = {
345 let ptr = Arc::as_ptr(priority_key);
346 unsafe { self.readable.cursor_mut_from_ptr(ptr) }
347 };
348
349 c.remove();
350 }
351
352 pub fn insert_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
359 if !priority_key.writable.is_linked() {
360 self.writable.insert(Arc::clone(priority_key));
361 }
362 }
363
364 pub fn remove_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
369 if !priority_key.writable.is_linked() {
370 return;
371 }
372
373 let mut c = {
374 let ptr = Arc::as_ptr(priority_key);
375 unsafe { self.writable.cursor_mut_from_ptr(ptr) }
376 };
377
378 c.remove();
379 }
380
381 pub fn insert_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
385 if !priority_key.flushable.is_linked() {
386 self.flushable.insert(Arc::clone(priority_key));
387 }
388 }
389
390 pub fn remove_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
392 if !priority_key.flushable.is_linked() {
393 return;
394 }
395
396 let mut c = {
397 let ptr = Arc::as_ptr(priority_key);
398 unsafe { self.flushable.cursor_mut_from_ptr(ptr) }
399 };
400
401 c.remove();
402 }
403
404 pub fn peek_flushable(&self) -> Option<Arc<StreamPriorityKey>> {
405 self.flushable.front().clone_pointer()
406 }
407
408 pub fn update_priority(
410 &mut self, old: &Arc<StreamPriorityKey>, new: &Arc<StreamPriorityKey>,
411 ) {
412 if old.readable.is_linked() {
413 self.remove_readable(old);
414 self.readable.insert(Arc::clone(new));
415 }
416
417 if old.writable.is_linked() {
418 self.remove_writable(old);
419 self.writable.insert(Arc::clone(new));
420 }
421
422 if old.flushable.is_linked() {
423 self.remove_flushable(old);
424 self.flushable.insert(Arc::clone(new));
425 }
426 }
427
428 pub fn insert_almost_full(&mut self, stream_id: u64) {
432 self.almost_full.insert(stream_id);
433 }
434
435 pub fn remove_almost_full(&mut self, stream_id: u64) {
437 self.almost_full.remove(&stream_id);
438 }
439
440 pub fn insert_blocked(&mut self, stream_id: u64, off: u64) {
445 self.blocked.insert(stream_id, off);
446 }
447
448 pub fn remove_blocked(&mut self, stream_id: u64) {
450 self.blocked.remove(&stream_id);
451 }
452
453 pub fn insert_reset(
458 &mut self, stream_id: u64, error_code: u64, final_size: u64,
459 ) {
460 self.reset.insert(stream_id, (error_code, final_size));
461 }
462
463 pub fn remove_reset(&mut self, stream_id: u64) {
465 self.reset.remove(&stream_id);
466 }
467
468 pub fn insert_stopped(&mut self, stream_id: u64, error_code: u64) {
473 self.stopped.insert(stream_id, error_code);
474 }
475
476 pub fn remove_stopped(&mut self, stream_id: u64) {
478 self.stopped.remove(&stream_id);
479 }
480
481 pub fn update_peer_max_streams_bidi(&mut self, v: u64) {
483 self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v);
484 }
485
486 pub fn update_peer_max_streams_uni(&mut self, v: u64) {
488 self.peer_max_streams_uni = cmp::max(self.peer_max_streams_uni, v);
489 }
490
491 pub fn update_max_streams_bidi(&mut self) {
493 self.local_max_streams_bidi = self.local_max_streams_bidi_next;
494 }
495
496 pub fn set_max_streams_bidi(&mut self, max: u64) {
498 self.local_max_streams_bidi = max;
499 self.local_max_streams_bidi_next = max;
500 }
501
502 pub fn max_streams_bidi(&self) -> u64 {
504 self.local_max_streams_bidi
505 }
506
507 pub fn max_streams_bidi_next(&mut self) -> u64 {
509 self.local_max_streams_bidi_next
510 }
511
512 pub fn update_max_streams_uni(&mut self) {
514 self.local_max_streams_uni = self.local_max_streams_uni_next;
515 }
516
517 pub fn max_streams_uni_next(&mut self) -> u64 {
519 self.local_max_streams_uni_next
520 }
521
522 pub fn peer_streams_left_bidi(&self) -> u64 {
525 self.peer_max_streams_bidi - self.local_opened_streams_bidi
526 }
527
528 pub fn peer_streams_left_uni(&self) -> u64 {
531 self.peer_max_streams_uni - self.local_opened_streams_uni
532 }
533
534 pub fn collect(&mut self, stream_id: u64, local: bool) {
539 if !local {
540 if is_bidi(stream_id) {
543 self.local_max_streams_bidi_next =
544 self.local_max_streams_bidi_next.saturating_add(1);
545 } else {
546 self.local_max_streams_uni_next =
547 self.local_max_streams_uni_next.saturating_add(1);
548 }
549 }
550
551 let s = self.streams.remove(&stream_id).unwrap();
552
553 self.remove_readable(&s.priority_key);
554
555 self.remove_writable(&s.priority_key);
556
557 self.remove_flushable(&s.priority_key);
558
559 self.collected.insert(stream_id);
560 }
561
562 pub fn readable(&self) -> StreamIter {
564 StreamIter {
565 streams: self.readable.iter().map(|s| s.id).collect(),
566 index: 0,
567 }
568 }
569
570 pub fn writable(&self) -> StreamIter {
572 StreamIter {
573 streams: self.writable.iter().map(|s| s.id).collect(),
574 index: 0,
575 }
576 }
577
578 pub fn almost_full(&self) -> StreamIter {
580 StreamIter::from(&self.almost_full)
581 }
582
583 pub fn blocked(&self) -> hash_map::Iter<u64, u64> {
585 self.blocked.iter()
586 }
587
588 pub fn reset(&self) -> hash_map::Iter<u64, (u64, u64)> {
590 self.reset.iter()
591 }
592
593 pub fn stopped(&self) -> hash_map::Iter<u64, u64> {
595 self.stopped.iter()
596 }
597
598 pub fn is_collected(&self, stream_id: u64) -> bool {
600 self.collected.contains(&stream_id)
601 }
602
603 pub fn has_flushable(&self) -> bool {
605 !self.flushable.is_empty()
606 }
607
608 pub fn has_readable(&self) -> bool {
610 !self.readable.is_empty()
611 }
612
613 pub fn has_almost_full(&self) -> bool {
616 !self.almost_full.is_empty()
617 }
618
619 pub fn has_blocked(&self) -> bool {
621 !self.blocked.is_empty()
622 }
623
624 pub fn has_reset(&self) -> bool {
626 !self.reset.is_empty()
627 }
628
629 pub fn has_stopped(&self) -> bool {
631 !self.stopped.is_empty()
632 }
633
634 pub fn should_update_max_streams_bidi(&self) -> bool {
637 self.local_max_streams_bidi_next != self.local_max_streams_bidi &&
638 self.local_max_streams_bidi_next / 2 >
639 self.local_max_streams_bidi - self.peer_opened_streams_bidi
640 }
641
642 pub fn should_update_max_streams_uni(&self) -> bool {
645 self.local_max_streams_uni_next != self.local_max_streams_uni &&
646 self.local_max_streams_uni_next / 2 >
647 self.local_max_streams_uni - self.peer_opened_streams_uni
648 }
649
650 #[cfg(test)]
652 pub fn len(&self) -> usize {
653 self.streams.len()
654 }
655}
656
657pub struct Stream<F: BufFactory = DefaultBufFactory> {
659 pub recv: recv_buf::RecvBuf,
661
662 pub send: send_buf::SendBuf<F>,
664
665 pub send_lowat: usize,
666
667 pub bidi: bool,
669
670 pub local: bool,
672
673 pub urgency: u8,
675
676 pub incremental: bool,
678
679 pub priority_key: Arc<StreamPriorityKey>,
680}
681
682impl<F: BufFactory> Stream<F> {
683 pub fn new(
685 id: u64, max_rx_data: u64, max_tx_data: u64, bidi: bool, local: bool,
686 max_window: u64,
687 ) -> Self {
688 let priority_key = Arc::new(StreamPriorityKey {
689 id,
690 ..Default::default()
691 });
692
693 Stream {
694 recv: recv_buf::RecvBuf::new(max_rx_data, max_window),
695 send: send_buf::SendBuf::new(max_tx_data),
696 send_lowat: 1,
697 bidi,
698 local,
699 urgency: priority_key.urgency,
700 incremental: priority_key.incremental,
701 priority_key,
702 }
703 }
704
705 pub fn is_readable(&self) -> bool {
707 self.recv.ready()
708 }
709
710 pub fn is_writable(&self) -> bool {
713 !self.send.is_shutdown() &&
714 !self.send.is_fin() &&
715 (self.send.off_back() + self.send_lowat as u64) <
716 self.send.max_off()
717 }
718
719 pub fn is_flushable(&self) -> bool {
722 let off_front = self.send.off_front();
723
724 !self.send.is_empty() &&
725 off_front < self.send.off_back() &&
726 off_front < self.send.max_off()
727 }
728
729 pub fn is_complete(&self) -> bool {
739 match (self.bidi, self.local) {
740 (true, _) => self.recv.is_fin() && self.send.is_complete(),
743
744 (false, true) => self.send.is_complete(),
747
748 (false, false) => self.recv.is_fin(),
751 }
752 }
753}
754
755pub fn is_local(stream_id: u64, is_server: bool) -> bool {
757 (stream_id & 0x1) == (is_server as u64)
758}
759
760pub fn is_bidi(stream_id: u64) -> bool {
762 (stream_id & 0x2) == 0
763}
764
765#[derive(Clone, Debug)]
766pub struct StreamPriorityKey {
767 pub urgency: u8,
768 pub incremental: bool,
769 pub id: u64,
770
771 pub readable: RBTreeAtomicLink,
772 pub writable: RBTreeAtomicLink,
773 pub flushable: RBTreeAtomicLink,
774}
775
776impl Default for StreamPriorityKey {
777 fn default() -> Self {
778 Self {
779 urgency: DEFAULT_URGENCY,
780 incremental: true,
781 id: Default::default(),
782 readable: Default::default(),
783 writable: Default::default(),
784 flushable: Default::default(),
785 }
786 }
787}
788
789impl PartialEq for StreamPriorityKey {
790 fn eq(&self, other: &Self) -> bool {
791 self.id == other.id
792 }
793}
794
795impl Eq for StreamPriorityKey {}
796
797impl PartialOrd for StreamPriorityKey {
798 #[allow(clippy::non_canonical_partial_ord_impl)]
800 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
801 if self.id == other.id {
803 return Some(std::cmp::Ordering::Equal);
804 }
805
806 if self.urgency != other.urgency {
808 return self.urgency.partial_cmp(&other.urgency);
809 }
810
811 if !self.incremental && !other.incremental {
814 return self.id.partial_cmp(&other.id);
815 }
816
817 if self.incremental && !other.incremental {
819 return Some(std::cmp::Ordering::Greater);
820 }
821 if !self.incremental && other.incremental {
822 return Some(std::cmp::Ordering::Less);
823 }
824
825 Some(std::cmp::Ordering::Greater)
829 }
830}
831
832impl Ord for StreamPriorityKey {
833 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
834 self.partial_cmp(other).unwrap()
836 }
837}
838
839intrusive_adapter!(pub StreamWritablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { writable: RBTreeAtomicLink });
840
841impl KeyAdapter<'_> for StreamWritablePriorityAdapter {
842 type Key = StreamPriorityKey;
843
844 fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
845 s.clone()
846 }
847}
848
849intrusive_adapter!(pub StreamReadablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { readable: RBTreeAtomicLink });
850
851impl KeyAdapter<'_> for StreamReadablePriorityAdapter {
852 type Key = StreamPriorityKey;
853
854 fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
855 s.clone()
856 }
857}
858
859intrusive_adapter!(pub StreamFlushablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { flushable: RBTreeAtomicLink });
860
861impl KeyAdapter<'_> for StreamFlushablePriorityAdapter {
862 type Key = StreamPriorityKey;
863
864 fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
865 s.clone()
866 }
867}
868
869#[derive(Default)]
871pub struct StreamIter {
872 streams: SmallVec<[u64; 8]>,
873 index: usize,
874}
875
876impl StreamIter {
877 #[inline]
878 fn from(streams: &StreamIdHashSet) -> Self {
879 StreamIter {
880 streams: streams.iter().copied().collect(),
881 index: 0,
882 }
883 }
884}
885
886impl Iterator for StreamIter {
887 type Item = u64;
888
889 #[inline]
890 fn next(&mut self) -> Option<Self::Item> {
891 let v = self.streams.get(self.index)?;
892 self.index += 1;
893 Some(*v)
894 }
895}
896
897impl ExactSizeIterator for StreamIter {
898 #[inline]
899 fn len(&self) -> usize {
900 self.streams.len() - self.index
901 }
902}
903
904#[cfg(test)]
905mod tests {
906 use crate::range_buf::RangeBuf;
907
908 use super::*;
909
910 #[test]
911 fn recv_flow_control() {
912 let mut stream =
913 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
914 assert!(!stream.recv.almost_full());
915
916 let mut buf = [0; 32];
917
918 let first = RangeBuf::from(b"hello", 0, false);
919 let second = RangeBuf::from(b"world", 5, false);
920 let third = RangeBuf::from(b"something", 10, false);
921
922 assert_eq!(stream.recv.write(second), Ok(()));
923 assert_eq!(stream.recv.write(first), Ok(()));
924 assert!(!stream.recv.almost_full());
925
926 assert_eq!(stream.recv.write(third), Err(Error::FlowControl));
927
928 let (len, fin) = stream.recv.emit(&mut buf).unwrap();
929 assert_eq!(&buf[..len], b"helloworld");
930 assert!(!fin);
931
932 assert!(stream.recv.almost_full());
933
934 stream.recv.update_max_data(std::time::Instant::now());
935 assert_eq!(stream.recv.max_data_next(), 25);
936 assert!(!stream.recv.almost_full());
937
938 let third = RangeBuf::from(b"something", 10, false);
939 assert_eq!(stream.recv.write(third), Ok(()));
940 }
941
942 #[test]
943 fn recv_past_fin() {
944 let mut stream =
945 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
946 assert!(!stream.recv.almost_full());
947
948 let first = RangeBuf::from(b"hello", 0, true);
949 let second = RangeBuf::from(b"world", 5, false);
950
951 assert_eq!(stream.recv.write(first), Ok(()));
952 assert_eq!(stream.recv.write(second), Err(Error::FinalSize));
953 }
954
955 #[test]
956 fn recv_fin_dup() {
957 let mut stream =
958 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
959 assert!(!stream.recv.almost_full());
960
961 let first = RangeBuf::from(b"hello", 0, true);
962 let second = RangeBuf::from(b"hello", 0, true);
963
964 assert_eq!(stream.recv.write(first), Ok(()));
965 assert_eq!(stream.recv.write(second), Ok(()));
966
967 let mut buf = [0; 32];
968
969 let (len, fin) = stream.recv.emit(&mut buf).unwrap();
970 assert_eq!(&buf[..len], b"hello");
971 assert!(fin);
972 }
973
974 #[test]
975 fn recv_fin_change() {
976 let mut stream =
977 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
978 assert!(!stream.recv.almost_full());
979
980 let first = RangeBuf::from(b"hello", 0, true);
981 let second = RangeBuf::from(b"world", 5, true);
982
983 assert_eq!(stream.recv.write(second), Ok(()));
984 assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
985 }
986
987 #[test]
988 fn recv_fin_lower_than_received() {
989 let mut stream =
990 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
991 assert!(!stream.recv.almost_full());
992
993 let first = RangeBuf::from(b"hello", 0, true);
994 let second = RangeBuf::from(b"world", 5, false);
995
996 assert_eq!(stream.recv.write(second), Ok(()));
997 assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
998 }
999
1000 #[test]
1001 fn recv_fin_flow_control() {
1002 let mut stream =
1003 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1004 assert!(!stream.recv.almost_full());
1005
1006 let mut buf = [0; 32];
1007
1008 let first = RangeBuf::from(b"hello", 0, false);
1009 let second = RangeBuf::from(b"world", 5, true);
1010
1011 assert_eq!(stream.recv.write(first), Ok(()));
1012 assert_eq!(stream.recv.write(second), Ok(()));
1013
1014 let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1015 assert_eq!(&buf[..len], b"helloworld");
1016 assert!(fin);
1017
1018 assert!(!stream.recv.almost_full());
1019 }
1020
1021 #[test]
1022 fn recv_fin_reset_mismatch() {
1023 let mut stream =
1024 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1025 assert!(!stream.recv.almost_full());
1026
1027 let first = RangeBuf::from(b"hello", 0, true);
1028
1029 assert_eq!(stream.recv.write(first), Ok(()));
1030 assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
1031 }
1032
1033 #[test]
1034 fn recv_reset_dup() {
1035 let mut stream =
1036 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1037 assert!(!stream.recv.almost_full());
1038
1039 let first = RangeBuf::from(b"hello", 0, false);
1040
1041 assert_eq!(stream.recv.write(first), Ok(()));
1042 assert_eq!(stream.recv.reset(0, 5), Ok(0));
1043 assert_eq!(stream.recv.reset(0, 5), Ok(0));
1044 }
1045
1046 #[test]
1047 fn recv_reset_change() {
1048 let mut stream =
1049 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1050 assert!(!stream.recv.almost_full());
1051
1052 let first = RangeBuf::from(b"hello", 0, false);
1053
1054 assert_eq!(stream.recv.write(first), Ok(()));
1055 assert_eq!(stream.recv.reset(0, 5), Ok(0));
1056 assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
1057 }
1058
1059 #[test]
1060 fn recv_reset_lower_than_received() {
1061 let mut stream =
1062 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1063 assert!(!stream.recv.almost_full());
1064
1065 let first = RangeBuf::from(b"hello", 0, false);
1066
1067 assert_eq!(stream.recv.write(first), Ok(()));
1068 assert_eq!(stream.recv.reset(0, 4), Err(Error::FinalSize));
1069 }
1070
1071 #[test]
1072 fn send_flow_control() {
1073 let mut buf = [0; 25];
1074
1075 let mut stream =
1076 <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1077
1078 let first = b"hello";
1079 let second = b"world";
1080 let third = b"something";
1081
1082 assert!(stream.send.write(first, false).is_ok());
1083 assert!(stream.send.write(second, false).is_ok());
1084 assert!(stream.send.write(third, false).is_ok());
1085
1086 assert_eq!(stream.send.off_front(), 0);
1087
1088 let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
1089 assert_eq!(written, 15);
1090 assert!(!fin);
1091 assert_eq!(&buf[..written], b"helloworldsomet");
1092
1093 assert_eq!(stream.send.off_front(), 15);
1094
1095 let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
1096 assert_eq!(written, 0);
1097 assert!(!fin);
1098 assert_eq!(&buf[..written], b"");
1099
1100 stream.send.retransmit(0, 15);
1101
1102 assert_eq!(stream.send.off_front(), 0);
1103
1104 let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
1105 assert_eq!(written, 10);
1106 assert!(!fin);
1107 assert_eq!(&buf[..written], b"helloworld");
1108
1109 assert_eq!(stream.send.off_front(), 10);
1110
1111 let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
1112 assert_eq!(written, 5);
1113 assert!(!fin);
1114 assert_eq!(&buf[..written], b"somet");
1115 }
1116
1117 #[test]
1118 fn send_past_fin() {
1119 let mut stream =
1120 <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1121
1122 let first = b"hello";
1123 let second = b"world";
1124 let third = b"third";
1125
1126 assert_eq!(stream.send.write(first, false), Ok(5));
1127
1128 assert_eq!(stream.send.write(second, true), Ok(5));
1129 assert!(stream.send.is_fin());
1130
1131 assert_eq!(stream.send.write(third, false), Err(Error::FinalSize));
1132 }
1133
1134 #[test]
1135 fn send_fin_dup() {
1136 let mut stream =
1137 <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1138
1139 assert_eq!(stream.send.write(b"hello", true), Ok(5));
1140 assert!(stream.send.is_fin());
1141
1142 assert_eq!(stream.send.write(b"", true), Ok(0));
1143 assert!(stream.send.is_fin());
1144 }
1145
1146 #[test]
1147 fn send_undo_fin() {
1148 let mut stream =
1149 <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1150
1151 assert_eq!(stream.send.write(b"hello", true), Ok(5));
1152 assert!(stream.send.is_fin());
1153
1154 assert_eq!(
1155 stream.send.write(b"helloworld", true),
1156 Err(Error::FinalSize)
1157 );
1158 }
1159
1160 #[test]
1161 fn send_fin_max_data_match() {
1162 let mut buf = [0; 15];
1163
1164 let mut stream =
1165 <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1166
1167 let slice = b"hellohellohello";
1168
1169 assert!(stream.send.write(slice, true).is_ok());
1170
1171 let (written, fin) = stream.send.emit(&mut buf[..15]).unwrap();
1172 assert_eq!(written, 15);
1173 assert!(fin);
1174 assert_eq!(&buf[..written], slice);
1175 }
1176
1177 #[test]
1178 fn send_fin_zero_length() {
1179 let mut buf = [0; 5];
1180
1181 let mut stream =
1182 <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1183
1184 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1185 assert_eq!(stream.send.write(b"", true), Ok(0));
1186 assert!(stream.send.is_fin());
1187
1188 let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1189 assert_eq!(written, 5);
1190 assert!(fin);
1191 assert_eq!(&buf[..written], b"hello");
1192 }
1193
1194 #[test]
1195 fn send_ack() {
1196 let mut buf = [0; 5];
1197
1198 let mut stream =
1199 <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1200
1201 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1202 assert_eq!(stream.send.write(b"world", false), Ok(5));
1203 assert_eq!(stream.send.write(b"", true), Ok(0));
1204 assert!(stream.send.is_fin());
1205
1206 assert_eq!(stream.send.off_front(), 0);
1207
1208 let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1209 assert_eq!(written, 5);
1210 assert!(!fin);
1211 assert_eq!(&buf[..written], b"hello");
1212
1213 stream.send.ack_and_drop(0, 5);
1214
1215 stream.send.retransmit(0, 5);
1216
1217 assert_eq!(stream.send.off_front(), 5);
1218
1219 let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1220 assert_eq!(written, 5);
1221 assert!(fin);
1222 assert_eq!(&buf[..written], b"world");
1223 }
1224
1225 #[test]
1226 fn send_ack_reordering() {
1227 let mut buf = [0; 5];
1228
1229 let mut stream =
1230 <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1231
1232 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1233 assert_eq!(stream.send.write(b"world", false), Ok(5));
1234 assert_eq!(stream.send.write(b"", true), Ok(0));
1235 assert!(stream.send.is_fin());
1236
1237 assert_eq!(stream.send.off_front(), 0);
1238
1239 let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1240 assert_eq!(written, 5);
1241 assert!(!fin);
1242 assert_eq!(&buf[..written], b"hello");
1243
1244 assert_eq!(stream.send.off_front(), 5);
1245
1246 let (written, fin) = stream.send.emit(&mut buf[..1]).unwrap();
1247 assert_eq!(written, 1);
1248 assert!(!fin);
1249 assert_eq!(&buf[..written], b"w");
1250
1251 stream.send.ack_and_drop(5, 1);
1252 stream.send.ack_and_drop(0, 5);
1253
1254 stream.send.retransmit(0, 5);
1255 stream.send.retransmit(5, 1);
1256
1257 assert_eq!(stream.send.off_front(), 6);
1258
1259 let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1260 assert_eq!(written, 4);
1261 assert!(fin);
1262 assert_eq!(&buf[..written], b"orld");
1263 }
1264
1265 #[test]
1266 fn recv_data_below_off() {
1267 let mut stream =
1268 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1269
1270 let first = RangeBuf::from(b"hello", 0, false);
1271
1272 assert_eq!(stream.recv.write(first), Ok(()));
1273
1274 let mut buf = [0; 10];
1275
1276 let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1277 assert_eq!(&buf[..len], b"hello");
1278 assert!(!fin);
1279
1280 let first = RangeBuf::from(b"elloworld", 1, true);
1281 assert_eq!(stream.recv.write(first), Ok(()));
1282
1283 let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1284 assert_eq!(&buf[..len], b"world");
1285 assert!(fin);
1286 }
1287
1288 #[test]
1289 fn stream_complete() {
1290 let mut stream =
1291 <Stream>::new(0, 30, 30, true, true, DEFAULT_STREAM_WINDOW);
1292
1293 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1294 assert_eq!(stream.send.write(b"world", false), Ok(5));
1295
1296 assert!(!stream.send.is_complete());
1297 assert!(!stream.send.is_fin());
1298
1299 assert_eq!(stream.send.write(b"", true), Ok(0));
1300
1301 assert!(!stream.send.is_complete());
1302 assert!(stream.send.is_fin());
1303
1304 let buf = RangeBuf::from(b"hello", 0, true);
1305 assert!(stream.recv.write(buf).is_ok());
1306 assert!(!stream.recv.is_fin());
1307
1308 stream.send.ack(6, 4);
1309 assert!(!stream.send.is_complete());
1310
1311 let mut buf = [0; 2];
1312 assert_eq!(stream.recv.emit(&mut buf), Ok((2, false)));
1313 assert!(!stream.recv.is_fin());
1314
1315 stream.send.ack(1, 5);
1316 assert!(!stream.send.is_complete());
1317
1318 stream.send.ack(0, 1);
1319 assert!(stream.send.is_complete());
1320
1321 assert!(!stream.is_complete());
1322
1323 let mut buf = [0; 3];
1324 assert_eq!(stream.recv.emit(&mut buf), Ok((3, true)));
1325 assert!(stream.recv.is_fin());
1326
1327 assert!(stream.is_complete());
1328 }
1329
1330 #[test]
1331 fn send_fin_zero_length_output() {
1332 let mut buf = [0; 5];
1333
1334 let mut stream =
1335 <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1336
1337 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1338 assert_eq!(stream.send.off_front(), 0);
1339 assert!(!stream.send.is_fin());
1340
1341 let (written, fin) = stream.send.emit(&mut buf).unwrap();
1342 assert_eq!(written, 5);
1343 assert!(!fin);
1344 assert_eq!(&buf[..written], b"hello");
1345
1346 assert_eq!(stream.send.write(b"", true), Ok(0));
1347 assert!(stream.send.is_fin());
1348 assert_eq!(stream.send.off_front(), 5);
1349
1350 let (written, fin) = stream.send.emit(&mut buf).unwrap();
1351 assert_eq!(written, 0);
1352 assert!(fin);
1353 assert_eq!(&buf[..written], b"");
1354 }
1355
1356 fn stream_send_ready(stream: &Stream) -> bool {
1357 !stream.send.is_empty() &&
1358 stream.send.off_front() < stream.send.off_back()
1359 }
1360
1361 #[test]
1362 fn send_emit() {
1363 let mut buf = [0; 5];
1364
1365 let mut stream =
1366 <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1367
1368 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1369 assert_eq!(stream.send.write(b"world", false), Ok(5));
1370 assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1371 assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1372 assert_eq!(stream.send.off_front(), 0);
1373 assert_eq!(stream.send.bufs_count(), 4);
1374
1375 assert!(stream.is_flushable());
1376
1377 assert!(stream_send_ready(&stream));
1378 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1379 assert_eq!(stream.send.off_front(), 4);
1380 assert_eq!(&buf[..4], b"hell");
1381
1382 assert!(stream_send_ready(&stream));
1383 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1384 assert_eq!(stream.send.off_front(), 8);
1385 assert_eq!(&buf[..4], b"owor");
1386
1387 assert!(stream_send_ready(&stream));
1388 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1389 assert_eq!(stream.send.off_front(), 10);
1390 assert_eq!(&buf[..2], b"ld");
1391
1392 assert!(stream_send_ready(&stream));
1393 assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1394 assert_eq!(stream.send.off_front(), 11);
1395 assert_eq!(&buf[..1], b"o");
1396
1397 assert!(stream_send_ready(&stream));
1398 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1399 assert_eq!(stream.send.off_front(), 16);
1400 assert_eq!(&buf[..5], b"llehd");
1401
1402 assert!(stream_send_ready(&stream));
1403 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1404 assert_eq!(stream.send.off_front(), 20);
1405 assert_eq!(&buf[..4], b"lrow");
1406
1407 assert!(!stream.is_flushable());
1408
1409 assert!(!stream_send_ready(&stream));
1410 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1411 assert_eq!(stream.send.off_front(), 20);
1412 }
1413
1414 #[test]
1415 fn send_emit_ack() {
1416 let mut buf = [0; 5];
1417
1418 let mut stream =
1419 <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1420
1421 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1422 assert_eq!(stream.send.write(b"world", false), Ok(5));
1423 assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1424 assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1425 assert_eq!(stream.send.off_front(), 0);
1426 assert_eq!(stream.send.bufs_count(), 4);
1427
1428 assert!(stream.is_flushable());
1429
1430 assert!(stream_send_ready(&stream));
1431 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1432 assert_eq!(stream.send.off_front(), 4);
1433 assert_eq!(&buf[..4], b"hell");
1434
1435 assert!(stream_send_ready(&stream));
1436 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1437 assert_eq!(stream.send.off_front(), 8);
1438 assert_eq!(&buf[..4], b"owor");
1439
1440 stream.send.ack_and_drop(0, 5);
1441 assert_eq!(stream.send.bufs_count(), 3);
1442
1443 assert!(stream_send_ready(&stream));
1444 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1445 assert_eq!(stream.send.off_front(), 10);
1446 assert_eq!(&buf[..2], b"ld");
1447
1448 stream.send.ack_and_drop(7, 5);
1449 assert_eq!(stream.send.bufs_count(), 3);
1450
1451 assert!(stream_send_ready(&stream));
1452 assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1453 assert_eq!(stream.send.off_front(), 11);
1454 assert_eq!(&buf[..1], b"o");
1455
1456 assert!(stream_send_ready(&stream));
1457 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1458 assert_eq!(stream.send.off_front(), 16);
1459 assert_eq!(&buf[..5], b"llehd");
1460
1461 stream.send.ack_and_drop(5, 7);
1462 assert_eq!(stream.send.bufs_count(), 2);
1463
1464 assert!(stream_send_ready(&stream));
1465 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1466 assert_eq!(stream.send.off_front(), 20);
1467 assert_eq!(&buf[..4], b"lrow");
1468
1469 assert!(!stream.is_flushable());
1470
1471 assert!(!stream_send_ready(&stream));
1472 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1473 assert_eq!(stream.send.off_front(), 20);
1474
1475 stream.send.ack_and_drop(22, 4);
1476 assert_eq!(stream.send.bufs_count(), 2);
1477
1478 stream.send.ack_and_drop(20, 1);
1479 assert_eq!(stream.send.bufs_count(), 2);
1480 }
1481
1482 #[test]
1483 fn send_emit_retransmit() {
1484 let mut buf = [0; 5];
1485
1486 let mut stream =
1487 <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1488
1489 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1490 assert_eq!(stream.send.write(b"world", false), Ok(5));
1491 assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1492 assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1493 assert_eq!(stream.send.off_front(), 0);
1494 assert_eq!(stream.send.bufs_count(), 4);
1495
1496 assert!(stream.is_flushable());
1497
1498 assert!(stream_send_ready(&stream));
1499 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1500 assert_eq!(stream.send.off_front(), 4);
1501 assert_eq!(&buf[..4], b"hell");
1502
1503 assert!(stream_send_ready(&stream));
1504 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1505 assert_eq!(stream.send.off_front(), 8);
1506 assert_eq!(&buf[..4], b"owor");
1507
1508 stream.send.retransmit(3, 3);
1509 assert_eq!(stream.send.off_front(), 3);
1510
1511 assert!(stream_send_ready(&stream));
1512 assert_eq!(stream.send.emit(&mut buf[..3]), Ok((3, false)));
1513 assert_eq!(stream.send.off_front(), 8);
1514 assert_eq!(&buf[..3], b"low");
1515
1516 assert!(stream_send_ready(&stream));
1517 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1518 assert_eq!(stream.send.off_front(), 10);
1519 assert_eq!(&buf[..2], b"ld");
1520
1521 stream.send.ack_and_drop(7, 2);
1522
1523 stream.send.retransmit(8, 2);
1524
1525 assert!(stream_send_ready(&stream));
1526 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1527 assert_eq!(stream.send.off_front(), 10);
1528 assert_eq!(&buf[..2], b"ld");
1529
1530 assert!(stream_send_ready(&stream));
1531 assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1532 assert_eq!(stream.send.off_front(), 11);
1533 assert_eq!(&buf[..1], b"o");
1534
1535 assert!(stream_send_ready(&stream));
1536 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1537 assert_eq!(stream.send.off_front(), 16);
1538 assert_eq!(&buf[..5], b"llehd");
1539
1540 stream.send.retransmit(12, 2);
1541
1542 assert!(stream_send_ready(&stream));
1543 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1544 assert_eq!(stream.send.off_front(), 16);
1545 assert_eq!(&buf[..2], b"le");
1546
1547 assert!(stream_send_ready(&stream));
1548 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1549 assert_eq!(stream.send.off_front(), 20);
1550 assert_eq!(&buf[..4], b"lrow");
1551
1552 assert!(!stream.is_flushable());
1553
1554 assert!(!stream_send_ready(&stream));
1555 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1556 assert_eq!(stream.send.off_front(), 20);
1557
1558 stream.send.retransmit(7, 12);
1559
1560 assert!(stream_send_ready(&stream));
1561 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1562 assert_eq!(stream.send.off_front(), 12);
1563 assert_eq!(&buf[..5], b"rldol");
1564
1565 assert!(stream_send_ready(&stream));
1566 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1567 assert_eq!(stream.send.off_front(), 17);
1568 assert_eq!(&buf[..5], b"lehdl");
1569
1570 assert!(stream_send_ready(&stream));
1571 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
1572 assert_eq!(stream.send.off_front(), 20);
1573 assert_eq!(&buf[..2], b"ro");
1574
1575 stream.send.ack_and_drop(12, 7);
1576
1577 stream.send.retransmit(7, 12);
1578
1579 assert!(stream_send_ready(&stream));
1580 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1581 assert_eq!(stream.send.off_front(), 12);
1582 assert_eq!(&buf[..5], b"rldol");
1583
1584 assert!(stream_send_ready(&stream));
1585 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1586 assert_eq!(stream.send.off_front(), 17);
1587 assert_eq!(&buf[..5], b"lehdl");
1588
1589 assert!(stream_send_ready(&stream));
1590 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
1591 assert_eq!(stream.send.off_front(), 20);
1592 assert_eq!(&buf[..2], b"ro");
1593 }
1594
1595 #[test]
1596 fn rangebuf_split_off() {
1597 let mut buf = <RangeBuf>::from(b"helloworld", 5, true);
1598 assert_eq!(buf.start, 0);
1599 assert_eq!(buf.pos, 0);
1600 assert_eq!(buf.len, 10);
1601 assert_eq!(buf.off, 5);
1602 assert!(buf.fin);
1603
1604 assert_eq!(buf.len(), 10);
1605 assert_eq!(buf.off(), 5);
1606 assert!(buf.fin());
1607
1608 assert_eq!(&buf[..], b"helloworld");
1609
1610 buf.consume(5);
1612
1613 assert_eq!(buf.start, 0);
1614 assert_eq!(buf.pos, 5);
1615 assert_eq!(buf.len, 10);
1616 assert_eq!(buf.off, 5);
1617 assert!(buf.fin);
1618
1619 assert_eq!(buf.len(), 5);
1620 assert_eq!(buf.off(), 10);
1621 assert!(buf.fin());
1622
1623 assert_eq!(&buf[..], b"world");
1624
1625 let mut new_buf = buf.split_off(3);
1627
1628 assert_eq!(buf.start, 0);
1629 assert_eq!(buf.pos, 3);
1630 assert_eq!(buf.len, 3);
1631 assert_eq!(buf.off, 5);
1632 assert!(!buf.fin);
1633
1634 assert_eq!(buf.len(), 0);
1635 assert_eq!(buf.off(), 8);
1636 assert!(!buf.fin());
1637
1638 assert_eq!(&buf[..], b"");
1639
1640 assert_eq!(new_buf.start, 3);
1641 assert_eq!(new_buf.pos, 5);
1642 assert_eq!(new_buf.len, 7);
1643 assert_eq!(new_buf.off, 8);
1644 assert!(new_buf.fin);
1645
1646 assert_eq!(new_buf.len(), 5);
1647 assert_eq!(new_buf.off(), 10);
1648 assert!(new_buf.fin());
1649
1650 assert_eq!(&new_buf[..], b"world");
1651
1652 new_buf.consume(2);
1654
1655 assert_eq!(new_buf.start, 3);
1656 assert_eq!(new_buf.pos, 7);
1657 assert_eq!(new_buf.len, 7);
1658 assert_eq!(new_buf.off, 8);
1659 assert!(new_buf.fin);
1660
1661 assert_eq!(new_buf.len(), 3);
1662 assert_eq!(new_buf.off(), 12);
1663 assert!(new_buf.fin());
1664
1665 assert_eq!(&new_buf[..], b"rld");
1666
1667 let mut new_new_buf = new_buf.split_off(5);
1669
1670 assert_eq!(new_buf.start, 3);
1671 assert_eq!(new_buf.pos, 7);
1672 assert_eq!(new_buf.len, 5);
1673 assert_eq!(new_buf.off, 8);
1674 assert!(!new_buf.fin);
1675
1676 assert_eq!(new_buf.len(), 1);
1677 assert_eq!(new_buf.off(), 12);
1678 assert!(!new_buf.fin());
1679
1680 assert_eq!(&new_buf[..], b"r");
1681
1682 assert_eq!(new_new_buf.start, 8);
1683 assert_eq!(new_new_buf.pos, 8);
1684 assert_eq!(new_new_buf.len, 2);
1685 assert_eq!(new_new_buf.off, 13);
1686 assert!(new_new_buf.fin);
1687
1688 assert_eq!(new_new_buf.len(), 2);
1689 assert_eq!(new_new_buf.off(), 13);
1690 assert!(new_new_buf.fin());
1691
1692 assert_eq!(&new_new_buf[..], b"ld");
1693
1694 new_new_buf.consume(2);
1696
1697 assert_eq!(new_new_buf.start, 8);
1698 assert_eq!(new_new_buf.pos, 10);
1699 assert_eq!(new_new_buf.len, 2);
1700 assert_eq!(new_new_buf.off, 13);
1701 assert!(new_new_buf.fin);
1702
1703 assert_eq!(new_new_buf.len(), 0);
1704 assert_eq!(new_new_buf.off(), 15);
1705 assert!(new_new_buf.fin());
1706
1707 assert_eq!(&new_new_buf[..], b"");
1708 }
1709
1710 #[test]
1713 fn stream_limit_auto_open() {
1714 let local_tp = crate::TransportParams::default();
1715 let peer_tp = crate::TransportParams::default();
1716
1717 let mut streams = <StreamMap>::new(5, 5, 5);
1718
1719 let stream_id = 500;
1720 assert!(!is_local(stream_id, true), "stream id is peer initiated");
1721 assert!(is_bidi(stream_id), "stream id is bidirectional");
1722 assert_eq!(
1723 streams
1724 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1725 .err(),
1726 Some(Error::StreamLimit),
1727 "stream limit should be exceeded"
1728 );
1729 }
1730
1731 #[test]
1734 fn stream_create_out_of_order() {
1735 let local_tp = crate::TransportParams::default();
1736 let peer_tp = crate::TransportParams::default();
1737
1738 let mut streams = <StreamMap>::new(5, 5, 5);
1739
1740 for stream_id in [8, 12, 4] {
1741 assert!(is_local(stream_id, false), "stream id is client initiated");
1742 assert!(is_bidi(stream_id), "stream id is bidirectional");
1743 assert!(streams
1744 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1745 .is_ok());
1746 }
1747 }
1748
1749 #[test]
1751 fn stream_limit_edge() {
1752 let local_tp = crate::TransportParams::default();
1753 let peer_tp = crate::TransportParams::default();
1754
1755 let mut streams = <StreamMap>::new(3, 3, 3);
1756
1757 let stream_id = 8;
1759 assert!(streams
1760 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1761 .is_ok());
1762
1763 let stream_id = 12;
1765 assert_eq!(
1766 streams
1767 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1768 .err(),
1769 Some(Error::StreamLimit)
1770 );
1771 }
1772
1773 fn cycle_stream_priority(stream_id: u64, streams: &mut StreamMap) {
1774 let key = streams.get(stream_id).unwrap().priority_key.clone();
1775 streams.update_priority(&key.clone(), &key);
1776 }
1777
1778 #[test]
1779 fn writable_prioritized_default_priority() {
1780 let local_tp = crate::TransportParams::default();
1781 let peer_tp = crate::TransportParams {
1782 initial_max_stream_data_bidi_local: 100,
1783 initial_max_stream_data_uni: 100,
1784 ..Default::default()
1785 };
1786
1787 let mut streams = StreamMap::new(100, 100, 100);
1788
1789 for id in [0, 4, 8, 12] {
1790 assert!(streams
1791 .get_or_create(id, &local_tp, &peer_tp, false, true)
1792 .is_ok());
1793 }
1794
1795 let walk_1: Vec<u64> = streams.writable().collect();
1796 cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1797 let walk_2: Vec<u64> = streams.writable().collect();
1798 cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1799 let walk_3: Vec<u64> = streams.writable().collect();
1800 cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1801 let walk_4: Vec<u64> = streams.writable().collect();
1802 cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1803 let walk_5: Vec<u64> = streams.writable().collect();
1804
1805 assert_eq!(walk_1, vec![0, 4, 8, 12]);
1808 assert_eq!(walk_2, vec![4, 8, 12, 0]);
1809 assert_eq!(walk_3, vec![8, 12, 0, 4]);
1810 assert_eq!(walk_4, vec![12, 0, 4, 8,]);
1811 assert_eq!(walk_5, vec![0, 4, 8, 12]);
1812 }
1813
1814 #[test]
1815 fn writable_prioritized_insert_order() {
1816 let local_tp = crate::TransportParams::default();
1817 let peer_tp = crate::TransportParams {
1818 initial_max_stream_data_bidi_local: 100,
1819 initial_max_stream_data_uni: 100,
1820 ..Default::default()
1821 };
1822
1823 let mut streams = StreamMap::new(100, 100, 100);
1824
1825 for id in [12, 4, 8, 0] {
1828 assert!(streams
1829 .get_or_create(id, &local_tp, &peer_tp, false, true)
1830 .is_ok());
1831 }
1832
1833 let walk_1: Vec<u64> = streams.writable().collect();
1834 cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1835 let walk_2: Vec<u64> = streams.writable().collect();
1836 cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1837 let walk_3: Vec<u64> = streams.writable().collect();
1838 cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1839 let walk_4: Vec<u64> = streams.writable().collect();
1840 cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1841 let walk_5: Vec<u64> = streams.writable().collect();
1842 assert_eq!(walk_1, vec![12, 4, 8, 0]);
1843 assert_eq!(walk_2, vec![4, 8, 0, 12]);
1844 assert_eq!(walk_3, vec![8, 0, 12, 4,]);
1845 assert_eq!(walk_4, vec![0, 12, 4, 8]);
1846 assert_eq!(walk_5, vec![12, 4, 8, 0]);
1847 }
1848
1849 #[test]
1850 fn writable_prioritized_mixed_urgency() {
1851 let local_tp = crate::TransportParams::default();
1852 let peer_tp = crate::TransportParams {
1853 initial_max_stream_data_bidi_local: 100,
1854 initial_max_stream_data_uni: 100,
1855 ..Default::default()
1856 };
1857
1858 let mut streams = <StreamMap>::new(100, 100, 100);
1859
1860 let input = vec![
1863 (0, 100),
1864 (4, 90),
1865 (8, 80),
1866 (12, 70),
1867 (16, 60),
1868 (20, 50),
1869 (24, 40),
1870 (28, 30),
1871 (32, 20),
1872 (36, 10),
1873 (40, 0),
1874 ];
1875
1876 for (id, urgency) in input.clone() {
1877 let stream = streams
1880 .get_or_create(id, &local_tp, &peer_tp, false, true)
1881 .unwrap();
1882
1883 stream.urgency = urgency;
1884
1885 let new_priority_key = Arc::new(StreamPriorityKey {
1886 urgency: stream.urgency,
1887 incremental: stream.incremental,
1888 id,
1889 ..Default::default()
1890 });
1891
1892 let old_priority_key = std::mem::replace(
1893 &mut stream.priority_key,
1894 new_priority_key.clone(),
1895 );
1896
1897 streams.update_priority(&old_priority_key, &new_priority_key);
1898 }
1899
1900 let walk_1: Vec<u64> = streams.writable().collect();
1901 assert_eq!(walk_1, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
1902
1903 for (id, urgency) in input {
1905 let stream = streams
1908 .get_or_create(id, &local_tp, &peer_tp, false, true)
1909 .unwrap();
1910
1911 stream.urgency = urgency;
1912
1913 let new_priority_key = Arc::new(StreamPriorityKey {
1914 urgency: stream.urgency,
1915 incremental: stream.incremental,
1916 id,
1917 ..Default::default()
1918 });
1919
1920 let old_priority_key = std::mem::replace(
1921 &mut stream.priority_key,
1922 new_priority_key.clone(),
1923 );
1924
1925 streams.update_priority(&old_priority_key, &new_priority_key);
1926 }
1927
1928 let walk_2: Vec<u64> = streams.writable().collect();
1929 assert_eq!(walk_2, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
1930
1931 streams.collect(24, true);
1933
1934 let walk_3: Vec<u64> = streams.writable().collect();
1935 assert_eq!(walk_3, vec![40, 36, 32, 28, 20, 16, 12, 8, 4, 0]);
1936
1937 streams.collect(40, true);
1938 streams.collect(0, true);
1939
1940 let walk_4: Vec<u64> = streams.writable().collect();
1941 assert_eq!(walk_4, vec![36, 32, 28, 20, 16, 12, 8, 4]);
1942
1943 streams
1945 .get_or_create(44, &local_tp, &peer_tp, false, true)
1946 .unwrap();
1947
1948 let walk_5: Vec<u64> = streams.writable().collect();
1949 assert_eq!(walk_5, vec![36, 32, 28, 20, 16, 12, 8, 4, 44]);
1950 }
1951
1952 #[test]
1953 fn writable_prioritized_mixed_urgencies_incrementals() {
1954 let local_tp = crate::TransportParams::default();
1955 let peer_tp = crate::TransportParams {
1956 initial_max_stream_data_bidi_local: 100,
1957 initial_max_stream_data_uni: 100,
1958 ..Default::default()
1959 };
1960
1961 let mut streams = StreamMap::new(100, 100, 100);
1962
1963 let input = vec![
1965 (0, 100),
1966 (4, 20),
1967 (8, 100),
1968 (12, 20),
1969 (16, 90),
1970 (20, 25),
1971 (24, 90),
1972 (28, 30),
1973 (32, 80),
1974 (36, 20),
1975 (40, 0),
1976 ];
1977
1978 for (id, urgency) in input.clone() {
1979 let stream = streams
1982 .get_or_create(id, &local_tp, &peer_tp, false, true)
1983 .unwrap();
1984
1985 stream.urgency = urgency;
1986
1987 let new_priority_key = Arc::new(StreamPriorityKey {
1988 urgency: stream.urgency,
1989 incremental: stream.incremental,
1990 id,
1991 ..Default::default()
1992 });
1993
1994 let old_priority_key = std::mem::replace(
1995 &mut stream.priority_key,
1996 new_priority_key.clone(),
1997 );
1998
1999 streams.update_priority(&old_priority_key, &new_priority_key);
2000 }
2001
2002 let walk_1: Vec<u64> = streams.writable().collect();
2003 cycle_stream_priority(4, &mut streams);
2004 cycle_stream_priority(16, &mut streams);
2005 cycle_stream_priority(0, &mut streams);
2006 let walk_2: Vec<u64> = streams.writable().collect();
2007 cycle_stream_priority(12, &mut streams);
2008 cycle_stream_priority(24, &mut streams);
2009 cycle_stream_priority(8, &mut streams);
2010 let walk_3: Vec<u64> = streams.writable().collect();
2011 cycle_stream_priority(36, &mut streams);
2012 cycle_stream_priority(16, &mut streams);
2013 cycle_stream_priority(0, &mut streams);
2014 let walk_4: Vec<u64> = streams.writable().collect();
2015 cycle_stream_priority(4, &mut streams);
2016 cycle_stream_priority(24, &mut streams);
2017 cycle_stream_priority(8, &mut streams);
2018 let walk_5: Vec<u64> = streams.writable().collect();
2019 cycle_stream_priority(12, &mut streams);
2020 cycle_stream_priority(16, &mut streams);
2021 cycle_stream_priority(0, &mut streams);
2022 let walk_6: Vec<u64> = streams.writable().collect();
2023 cycle_stream_priority(36, &mut streams);
2024 cycle_stream_priority(24, &mut streams);
2025 cycle_stream_priority(8, &mut streams);
2026 let walk_7: Vec<u64> = streams.writable().collect();
2027 cycle_stream_priority(4, &mut streams);
2028 cycle_stream_priority(16, &mut streams);
2029 cycle_stream_priority(0, &mut streams);
2030 let walk_8: Vec<u64> = streams.writable().collect();
2031 cycle_stream_priority(12, &mut streams);
2032 cycle_stream_priority(24, &mut streams);
2033 cycle_stream_priority(8, &mut streams);
2034 let walk_9: Vec<u64> = streams.writable().collect();
2035 cycle_stream_priority(36, &mut streams);
2036 cycle_stream_priority(16, &mut streams);
2037 cycle_stream_priority(0, &mut streams);
2038
2039 assert_eq!(walk_1, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2040 assert_eq!(walk_2, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2041 assert_eq!(walk_3, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2042 assert_eq!(walk_4, vec![40, 4, 12, 36, 20, 28, 32, 24, 16, 8, 0]);
2043 assert_eq!(walk_5, vec![40, 12, 36, 4, 20, 28, 32, 16, 24, 0, 8]);
2044 assert_eq!(walk_6, vec![40, 36, 4, 12, 20, 28, 32, 24, 16, 8, 0]);
2045 assert_eq!(walk_7, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2046 assert_eq!(walk_8, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2047 assert_eq!(walk_9, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2048
2049 streams.collect(20, true);
2051
2052 let walk_10: Vec<u64> = streams.writable().collect();
2053 assert_eq!(walk_10, vec![40, 4, 12, 36, 28, 32, 24, 16, 8, 0]);
2054
2055 let stream = streams
2057 .get_or_create(44, &local_tp, &peer_tp, false, true)
2058 .unwrap();
2059
2060 stream.urgency = 20;
2061 stream.incremental = true;
2062
2063 let new_priority_key = Arc::new(StreamPriorityKey {
2064 urgency: stream.urgency,
2065 incremental: stream.incremental,
2066 id: 44,
2067 ..Default::default()
2068 });
2069
2070 let old_priority_key =
2071 std::mem::replace(&mut stream.priority_key, new_priority_key.clone());
2072
2073 streams.update_priority(&old_priority_key, &new_priority_key);
2074
2075 let walk_11: Vec<u64> = streams.writable().collect();
2076 assert_eq!(walk_11, vec![40, 4, 12, 36, 44, 28, 32, 24, 16, 8, 0]);
2077 }
2078
2079 #[test]
2080 fn priority_tree_dupes() {
2081 let mut prioritized_writable: RBTree<StreamWritablePriorityAdapter> =
2082 Default::default();
2083
2084 for id in [0, 4, 8, 12] {
2085 let s = Arc::new(StreamPriorityKey {
2086 urgency: 0,
2087 incremental: false,
2088 id,
2089 ..Default::default()
2090 });
2091
2092 prioritized_writable.insert(s);
2093 }
2094
2095 let walk_1: Vec<u64> =
2096 prioritized_writable.iter().map(|s| s.id).collect();
2097 assert_eq!(walk_1, vec![0, 4, 8, 12]);
2098
2099 for id in [0, 4, 8, 12] {
2102 let s = Arc::new(StreamPriorityKey {
2103 urgency: 0,
2104 incremental: false,
2105 id,
2106 ..Default::default()
2107 });
2108
2109 prioritized_writable.insert(s);
2110 }
2111
2112 let walk_2: Vec<u64> =
2113 prioritized_writable.iter().map(|s| s.id).collect();
2114 assert_eq!(walk_2, vec![0, 0, 4, 4, 8, 8, 12, 12]);
2115 }
2116}
2117
2118mod recv_buf;
2119mod send_buf;