1use std::cmp;
28
29use std::sync::Arc;
30
31use std::collections::hash_map;
32use std::collections::HashMap;
33use std::collections::HashSet;
34
35use intrusive_collections::intrusive_adapter;
36use intrusive_collections::KeyAdapter;
37use intrusive_collections::RBTree;
38use intrusive_collections::RBTreeAtomicLink;
39
40use smallvec::SmallVec;
41
42use crate::range_buf::DefaultBufFactory;
43use crate::BufFactory;
44use crate::Error;
45use crate::Result;
46
47const DEFAULT_URGENCY: u8 = 127;
48
49const DEFAULT_STREAM_WINDOW: u64 = 32 * 1024;
51
52pub const MAX_STREAM_WINDOW: u64 = 16 * 1024 * 1024;
54
55#[derive(Default)]
60pub struct StreamIdHasher {
61 id: u64,
62}
63
64impl std::hash::Hasher for StreamIdHasher {
65 #[inline]
66 fn finish(&self) -> u64 {
67 self.id
68 }
69
70 #[inline]
71 fn write_u64(&mut self, id: u64) {
72 self.id = id;
73 }
74
75 #[inline]
76 fn write(&mut self, _: &[u8]) {
77 unimplemented!()
80 }
81}
82
83type BuildStreamIdHasher = std::hash::BuildHasherDefault<StreamIdHasher>;
84
85pub type StreamIdHashMap<V> = HashMap<u64, V, BuildStreamIdHasher>;
86pub type StreamIdHashSet = HashSet<u64, BuildStreamIdHasher>;
87
88#[derive(Default)]
90pub struct StreamMap<F: BufFactory = DefaultBufFactory> {
91 streams: StreamIdHashMap<Stream<F>>,
93
94 collected: StreamIdHashSet,
100
101 peer_max_streams_bidi: u64,
103
104 peer_max_streams_uni: u64,
106
107 peer_opened_streams_bidi: u64,
109
110 peer_opened_streams_uni: u64,
112
113 local_max_streams_bidi: u64,
115 local_max_streams_bidi_next: u64,
116
117 local_max_streams_uni: u64,
119 local_max_streams_uni_next: u64,
120
121 local_opened_streams_bidi: u64,
123
124 local_opened_streams_uni: u64,
126
127 flushable: RBTree<StreamFlushablePriorityAdapter>,
131
132 pub readable: RBTree<StreamReadablePriorityAdapter>,
136
137 pub writable: RBTree<StreamWritablePriorityAdapter>,
142
143 almost_full: StreamIdHashSet,
148
149 blocked: StreamIdHashMap<u64>,
153
154 reset: StreamIdHashMap<(u64, u64)>,
158
159 stopped: StreamIdHashMap<u64>,
163
164 max_stream_window: u64,
166}
167
168impl<F: BufFactory> StreamMap<F> {
169 pub fn new(
170 max_streams_bidi: u64, max_streams_uni: u64, max_stream_window: u64,
171 ) -> Self {
172 StreamMap {
173 local_max_streams_bidi: max_streams_bidi,
174 local_max_streams_bidi_next: max_streams_bidi,
175
176 local_max_streams_uni: max_streams_uni,
177 local_max_streams_uni_next: max_streams_uni,
178
179 max_stream_window,
180
181 ..StreamMap::default()
182 }
183 }
184
185 pub fn get(&self, id: u64) -> Option<&Stream<F>> {
187 self.streams.get(&id)
188 }
189
190 pub fn get_mut(&mut self, id: u64) -> Option<&mut Stream<F>> {
192 self.streams.get_mut(&id)
193 }
194
195 pub(crate) fn get_or_create(
208 &mut self, id: u64, local_params: &crate::TransportParams,
209 peer_params: &crate::TransportParams, local: bool, is_server: bool,
210 ) -> Result<&mut Stream<F>> {
211 let (stream, is_new_and_writable) = match self.streams.entry(id) {
212 hash_map::Entry::Vacant(v) => {
213 if self.collected.contains(&id) {
215 return Err(Error::Done);
216 }
217
218 if local != is_local(id, is_server) {
219 return Err(Error::InvalidStreamState(id));
220 }
221
222 let (max_rx_data, max_tx_data) = match (local, is_bidi(id)) {
223 (true, true) => (
225 local_params.initial_max_stream_data_bidi_local,
226 peer_params.initial_max_stream_data_bidi_remote,
227 ),
228
229 (true, false) => (0, peer_params.initial_max_stream_data_uni),
231
232 (false, true) => (
234 local_params.initial_max_stream_data_bidi_remote,
235 peer_params.initial_max_stream_data_bidi_local,
236 ),
237
238 (false, false) =>
240 (local_params.initial_max_stream_data_uni, 0),
241 };
242
243 let stream_sequence = id >> 2;
247
248 match (is_local(id, is_server), is_bidi(id)) {
250 (true, true) => {
251 let n = std::cmp::max(
252 self.local_opened_streams_bidi,
253 stream_sequence + 1,
254 );
255
256 if n > self.peer_max_streams_bidi {
257 return Err(Error::StreamLimit);
258 }
259
260 self.local_opened_streams_bidi = n;
261 },
262
263 (true, false) => {
264 let n = std::cmp::max(
265 self.local_opened_streams_uni,
266 stream_sequence + 1,
267 );
268
269 if n > self.peer_max_streams_uni {
270 return Err(Error::StreamLimit);
271 }
272
273 self.local_opened_streams_uni = n;
274 },
275
276 (false, true) => {
277 let n = std::cmp::max(
278 self.peer_opened_streams_bidi,
279 stream_sequence + 1,
280 );
281
282 if n > self.local_max_streams_bidi {
283 return Err(Error::StreamLimit);
284 }
285
286 self.peer_opened_streams_bidi = n;
287 },
288
289 (false, false) => {
290 let n = std::cmp::max(
291 self.peer_opened_streams_uni,
292 stream_sequence + 1,
293 );
294
295 if n > self.local_max_streams_uni {
296 return Err(Error::StreamLimit);
297 }
298
299 self.peer_opened_streams_uni = n;
300 },
301 };
302
303 let s = Stream::new(
304 id,
305 max_rx_data,
306 max_tx_data,
307 is_bidi(id),
308 local,
309 self.max_stream_window,
310 );
311
312 let is_writable = s.is_writable();
313
314 (v.insert(s), is_writable)
315 },
316
317 hash_map::Entry::Occupied(v) => (v.into_mut(), false),
318 };
319
320 if is_new_and_writable {
323 self.writable.insert(Arc::clone(&stream.priority_key));
324 }
325
326 Ok(stream)
327 }
328
329 pub fn insert_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
333 if !priority_key.readable.is_linked() {
334 self.readable.insert(Arc::clone(priority_key));
335 }
336 }
337
338 pub fn remove_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
340 if !priority_key.readable.is_linked() {
341 return;
342 }
343
344 let mut c = {
345 let ptr = Arc::as_ptr(priority_key);
346 unsafe { self.readable.cursor_mut_from_ptr(ptr) }
347 };
348
349 c.remove();
350 }
351
352 pub fn insert_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
359 if !priority_key.writable.is_linked() {
360 self.writable.insert(Arc::clone(priority_key));
361 }
362 }
363
364 pub fn remove_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
369 if !priority_key.writable.is_linked() {
370 return;
371 }
372
373 let mut c = {
374 let ptr = Arc::as_ptr(priority_key);
375 unsafe { self.writable.cursor_mut_from_ptr(ptr) }
376 };
377
378 c.remove();
379 }
380
381 pub fn insert_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
385 if !priority_key.flushable.is_linked() {
386 self.flushable.insert(Arc::clone(priority_key));
387 }
388 }
389
390 pub fn remove_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
392 if !priority_key.flushable.is_linked() {
393 return;
394 }
395
396 let mut c = {
397 let ptr = Arc::as_ptr(priority_key);
398 unsafe { self.flushable.cursor_mut_from_ptr(ptr) }
399 };
400
401 c.remove();
402 }
403
404 pub fn peek_flushable(&self) -> Option<Arc<StreamPriorityKey>> {
405 self.flushable.front().clone_pointer()
406 }
407
408 pub fn update_priority(
410 &mut self, old: &Arc<StreamPriorityKey>, new: &Arc<StreamPriorityKey>,
411 ) {
412 if old.readable.is_linked() {
413 self.remove_readable(old);
414 self.readable.insert(Arc::clone(new));
415 }
416
417 if old.writable.is_linked() {
418 self.remove_writable(old);
419 self.writable.insert(Arc::clone(new));
420 }
421
422 if old.flushable.is_linked() {
423 self.remove_flushable(old);
424 self.flushable.insert(Arc::clone(new));
425 }
426 }
427
428 pub fn insert_almost_full(&mut self, stream_id: u64) {
432 self.almost_full.insert(stream_id);
433 }
434
435 pub fn remove_almost_full(&mut self, stream_id: u64) {
437 self.almost_full.remove(&stream_id);
438 }
439
440 pub fn insert_blocked(&mut self, stream_id: u64, off: u64) {
445 self.blocked.insert(stream_id, off);
446 }
447
448 pub fn remove_blocked(&mut self, stream_id: u64) {
450 self.blocked.remove(&stream_id);
451 }
452
453 pub fn insert_reset(
458 &mut self, stream_id: u64, error_code: u64, final_size: u64,
459 ) {
460 self.reset.insert(stream_id, (error_code, final_size));
461 }
462
463 pub fn remove_reset(&mut self, stream_id: u64) {
465 self.reset.remove(&stream_id);
466 }
467
468 pub fn insert_stopped(&mut self, stream_id: u64, error_code: u64) {
473 self.stopped.insert(stream_id, error_code);
474 }
475
476 pub fn remove_stopped(&mut self, stream_id: u64) {
478 self.stopped.remove(&stream_id);
479 }
480
481 pub fn update_peer_max_streams_bidi(&mut self, v: u64) {
483 self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v);
484 }
485
486 pub fn update_peer_max_streams_uni(&mut self, v: u64) {
488 self.peer_max_streams_uni = cmp::max(self.peer_max_streams_uni, v);
489 }
490
491 pub fn update_max_streams_bidi(&mut self) {
493 self.local_max_streams_bidi = self.local_max_streams_bidi_next;
494 }
495
496 pub fn max_streams_bidi(&self) -> u64 {
498 self.local_max_streams_bidi
499 }
500
501 pub fn max_streams_bidi_next(&mut self) -> u64 {
503 self.local_max_streams_bidi_next
504 }
505
506 pub fn update_max_streams_uni(&mut self) {
508 self.local_max_streams_uni = self.local_max_streams_uni_next;
509 }
510
511 pub fn max_streams_uni_next(&mut self) -> u64 {
513 self.local_max_streams_uni_next
514 }
515
516 pub fn peer_streams_left_bidi(&self) -> u64 {
519 self.peer_max_streams_bidi - self.local_opened_streams_bidi
520 }
521
522 pub fn peer_streams_left_uni(&self) -> u64 {
525 self.peer_max_streams_uni - self.local_opened_streams_uni
526 }
527
528 pub fn collect(&mut self, stream_id: u64, local: bool) {
533 if !local {
534 if is_bidi(stream_id) {
537 self.local_max_streams_bidi_next =
538 self.local_max_streams_bidi_next.saturating_add(1);
539 } else {
540 self.local_max_streams_uni_next =
541 self.local_max_streams_uni_next.saturating_add(1);
542 }
543 }
544
545 let s = self.streams.remove(&stream_id).unwrap();
546
547 self.remove_readable(&s.priority_key);
548
549 self.remove_writable(&s.priority_key);
550
551 self.remove_flushable(&s.priority_key);
552
553 self.collected.insert(stream_id);
554 }
555
556 pub fn readable(&self) -> StreamIter {
558 StreamIter {
559 streams: self.readable.iter().map(|s| s.id).collect(),
560 index: 0,
561 }
562 }
563
564 pub fn writable(&self) -> StreamIter {
566 StreamIter {
567 streams: self.writable.iter().map(|s| s.id).collect(),
568 index: 0,
569 }
570 }
571
572 pub fn almost_full(&self) -> StreamIter {
574 StreamIter::from(&self.almost_full)
575 }
576
577 pub fn blocked(&self) -> hash_map::Iter<u64, u64> {
579 self.blocked.iter()
580 }
581
582 pub fn reset(&self) -> hash_map::Iter<u64, (u64, u64)> {
584 self.reset.iter()
585 }
586
587 pub fn stopped(&self) -> hash_map::Iter<u64, u64> {
589 self.stopped.iter()
590 }
591
592 pub fn is_collected(&self, stream_id: u64) -> bool {
594 self.collected.contains(&stream_id)
595 }
596
597 pub fn has_flushable(&self) -> bool {
599 !self.flushable.is_empty()
600 }
601
602 pub fn has_readable(&self) -> bool {
604 !self.readable.is_empty()
605 }
606
607 pub fn has_almost_full(&self) -> bool {
610 !self.almost_full.is_empty()
611 }
612
613 pub fn has_blocked(&self) -> bool {
615 !self.blocked.is_empty()
616 }
617
618 pub fn has_reset(&self) -> bool {
620 !self.reset.is_empty()
621 }
622
623 pub fn has_stopped(&self) -> bool {
625 !self.stopped.is_empty()
626 }
627
628 pub fn should_update_max_streams_bidi(&self) -> bool {
631 self.local_max_streams_bidi_next != self.local_max_streams_bidi &&
632 self.local_max_streams_bidi_next / 2 >
633 self.local_max_streams_bidi - self.peer_opened_streams_bidi
634 }
635
636 pub fn should_update_max_streams_uni(&self) -> bool {
639 self.local_max_streams_uni_next != self.local_max_streams_uni &&
640 self.local_max_streams_uni_next / 2 >
641 self.local_max_streams_uni - self.peer_opened_streams_uni
642 }
643
644 #[cfg(test)]
646 pub fn len(&self) -> usize {
647 self.streams.len()
648 }
649}
650
651pub struct Stream<F: BufFactory = DefaultBufFactory> {
653 pub recv: recv_buf::RecvBuf,
655
656 pub send: send_buf::SendBuf<F>,
658
659 pub send_lowat: usize,
660
661 pub bidi: bool,
663
664 pub local: bool,
666
667 pub urgency: u8,
669
670 pub incremental: bool,
672
673 pub priority_key: Arc<StreamPriorityKey>,
674}
675
676impl<F: BufFactory> Stream<F> {
677 pub fn new(
679 id: u64, max_rx_data: u64, max_tx_data: u64, bidi: bool, local: bool,
680 max_window: u64,
681 ) -> Self {
682 let priority_key = Arc::new(StreamPriorityKey {
683 id,
684 ..Default::default()
685 });
686
687 Stream {
688 recv: recv_buf::RecvBuf::new(max_rx_data, max_window),
689 send: send_buf::SendBuf::new(max_tx_data),
690 send_lowat: 1,
691 bidi,
692 local,
693 urgency: priority_key.urgency,
694 incremental: priority_key.incremental,
695 priority_key,
696 }
697 }
698
699 pub fn is_readable(&self) -> bool {
701 self.recv.ready()
702 }
703
704 pub fn is_writable(&self) -> bool {
707 !self.send.is_shutdown() &&
708 !self.send.is_fin() &&
709 (self.send.off_back() + self.send_lowat as u64) <
710 self.send.max_off()
711 }
712
713 pub fn is_flushable(&self) -> bool {
716 let off_front = self.send.off_front();
717
718 !self.send.is_empty() &&
719 off_front < self.send.off_back() &&
720 off_front < self.send.max_off()
721 }
722
723 pub fn is_complete(&self) -> bool {
733 match (self.bidi, self.local) {
734 (true, _) => self.recv.is_fin() && self.send.is_complete(),
737
738 (false, true) => self.send.is_complete(),
741
742 (false, false) => self.recv.is_fin(),
745 }
746 }
747}
748
749pub fn is_local(stream_id: u64, is_server: bool) -> bool {
751 (stream_id & 0x1) == (is_server as u64)
752}
753
754pub fn is_bidi(stream_id: u64) -> bool {
756 (stream_id & 0x2) == 0
757}
758
759#[derive(Clone, Debug)]
760pub struct StreamPriorityKey {
761 pub urgency: u8,
762 pub incremental: bool,
763 pub id: u64,
764
765 pub readable: RBTreeAtomicLink,
766 pub writable: RBTreeAtomicLink,
767 pub flushable: RBTreeAtomicLink,
768}
769
770impl Default for StreamPriorityKey {
771 fn default() -> Self {
772 Self {
773 urgency: DEFAULT_URGENCY,
774 incremental: true,
775 id: Default::default(),
776 readable: Default::default(),
777 writable: Default::default(),
778 flushable: Default::default(),
779 }
780 }
781}
782
783impl PartialEq for StreamPriorityKey {
784 fn eq(&self, other: &Self) -> bool {
785 self.id == other.id
786 }
787}
788
789impl Eq for StreamPriorityKey {}
790
791impl PartialOrd for StreamPriorityKey {
792 #[allow(clippy::non_canonical_partial_ord_impl)]
794 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
795 if self.id == other.id {
797 return Some(std::cmp::Ordering::Equal);
798 }
799
800 if self.urgency != other.urgency {
802 return self.urgency.partial_cmp(&other.urgency);
803 }
804
805 if !self.incremental && !other.incremental {
808 return self.id.partial_cmp(&other.id);
809 }
810
811 if self.incremental && !other.incremental {
813 return Some(std::cmp::Ordering::Greater);
814 }
815 if !self.incremental && other.incremental {
816 return Some(std::cmp::Ordering::Less);
817 }
818
819 Some(std::cmp::Ordering::Greater)
823 }
824}
825
826impl Ord for StreamPriorityKey {
827 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
828 self.partial_cmp(other).unwrap()
830 }
831}
832
833intrusive_adapter!(pub StreamWritablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { writable: RBTreeAtomicLink });
834
835impl KeyAdapter<'_> for StreamWritablePriorityAdapter {
836 type Key = StreamPriorityKey;
837
838 fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
839 s.clone()
840 }
841}
842
843intrusive_adapter!(pub StreamReadablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { readable: RBTreeAtomicLink });
844
845impl KeyAdapter<'_> for StreamReadablePriorityAdapter {
846 type Key = StreamPriorityKey;
847
848 fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
849 s.clone()
850 }
851}
852
853intrusive_adapter!(pub StreamFlushablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { flushable: RBTreeAtomicLink });
854
855impl KeyAdapter<'_> for StreamFlushablePriorityAdapter {
856 type Key = StreamPriorityKey;
857
858 fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
859 s.clone()
860 }
861}
862
863#[derive(Default)]
865pub struct StreamIter {
866 streams: SmallVec<[u64; 8]>,
867 index: usize,
868}
869
870impl StreamIter {
871 #[inline]
872 fn from(streams: &StreamIdHashSet) -> Self {
873 StreamIter {
874 streams: streams.iter().copied().collect(),
875 index: 0,
876 }
877 }
878}
879
880impl Iterator for StreamIter {
881 type Item = u64;
882
883 #[inline]
884 fn next(&mut self) -> Option<Self::Item> {
885 let v = self.streams.get(self.index)?;
886 self.index += 1;
887 Some(*v)
888 }
889}
890
891impl ExactSizeIterator for StreamIter {
892 #[inline]
893 fn len(&self) -> usize {
894 self.streams.len() - self.index
895 }
896}
897
898#[cfg(test)]
899mod tests {
900 use crate::range_buf::RangeBuf;
901
902 use super::*;
903
904 #[test]
905 fn recv_flow_control() {
906 let mut stream =
907 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
908 assert!(!stream.recv.almost_full());
909
910 let mut buf = [0; 32];
911
912 let first = RangeBuf::from(b"hello", 0, false);
913 let second = RangeBuf::from(b"world", 5, false);
914 let third = RangeBuf::from(b"something", 10, false);
915
916 assert_eq!(stream.recv.write(second), Ok(()));
917 assert_eq!(stream.recv.write(first), Ok(()));
918 assert!(!stream.recv.almost_full());
919
920 assert_eq!(stream.recv.write(third), Err(Error::FlowControl));
921
922 let (len, fin) = stream.recv.emit(&mut buf).unwrap();
923 assert_eq!(&buf[..len], b"helloworld");
924 assert!(!fin);
925
926 assert!(stream.recv.almost_full());
927
928 stream.recv.update_max_data(std::time::Instant::now());
929 assert_eq!(stream.recv.max_data_next(), 25);
930 assert!(!stream.recv.almost_full());
931
932 let third = RangeBuf::from(b"something", 10, false);
933 assert_eq!(stream.recv.write(third), Ok(()));
934 }
935
936 #[test]
937 fn recv_past_fin() {
938 let mut stream =
939 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
940 assert!(!stream.recv.almost_full());
941
942 let first = RangeBuf::from(b"hello", 0, true);
943 let second = RangeBuf::from(b"world", 5, false);
944
945 assert_eq!(stream.recv.write(first), Ok(()));
946 assert_eq!(stream.recv.write(second), Err(Error::FinalSize));
947 }
948
949 #[test]
950 fn recv_fin_dup() {
951 let mut stream =
952 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
953 assert!(!stream.recv.almost_full());
954
955 let first = RangeBuf::from(b"hello", 0, true);
956 let second = RangeBuf::from(b"hello", 0, true);
957
958 assert_eq!(stream.recv.write(first), Ok(()));
959 assert_eq!(stream.recv.write(second), Ok(()));
960
961 let mut buf = [0; 32];
962
963 let (len, fin) = stream.recv.emit(&mut buf).unwrap();
964 assert_eq!(&buf[..len], b"hello");
965 assert!(fin);
966 }
967
968 #[test]
969 fn recv_fin_change() {
970 let mut stream =
971 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
972 assert!(!stream.recv.almost_full());
973
974 let first = RangeBuf::from(b"hello", 0, true);
975 let second = RangeBuf::from(b"world", 5, true);
976
977 assert_eq!(stream.recv.write(second), Ok(()));
978 assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
979 }
980
981 #[test]
982 fn recv_fin_lower_than_received() {
983 let mut stream =
984 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
985 assert!(!stream.recv.almost_full());
986
987 let first = RangeBuf::from(b"hello", 0, true);
988 let second = RangeBuf::from(b"world", 5, false);
989
990 assert_eq!(stream.recv.write(second), Ok(()));
991 assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
992 }
993
994 #[test]
995 fn recv_fin_flow_control() {
996 let mut stream =
997 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
998 assert!(!stream.recv.almost_full());
999
1000 let mut buf = [0; 32];
1001
1002 let first = RangeBuf::from(b"hello", 0, false);
1003 let second = RangeBuf::from(b"world", 5, true);
1004
1005 assert_eq!(stream.recv.write(first), Ok(()));
1006 assert_eq!(stream.recv.write(second), Ok(()));
1007
1008 let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1009 assert_eq!(&buf[..len], b"helloworld");
1010 assert!(fin);
1011
1012 assert!(!stream.recv.almost_full());
1013 }
1014
1015 #[test]
1016 fn recv_fin_reset_mismatch() {
1017 let mut stream =
1018 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1019 assert!(!stream.recv.almost_full());
1020
1021 let first = RangeBuf::from(b"hello", 0, true);
1022
1023 assert_eq!(stream.recv.write(first), Ok(()));
1024 assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
1025 }
1026
1027 #[test]
1028 fn recv_reset_dup() {
1029 let mut stream =
1030 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1031 assert!(!stream.recv.almost_full());
1032
1033 let first = RangeBuf::from(b"hello", 0, false);
1034
1035 assert_eq!(stream.recv.write(first), Ok(()));
1036 assert_eq!(stream.recv.reset(0, 5), Ok(0));
1037 assert_eq!(stream.recv.reset(0, 5), Ok(0));
1038 }
1039
1040 #[test]
1041 fn recv_reset_change() {
1042 let mut stream =
1043 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1044 assert!(!stream.recv.almost_full());
1045
1046 let first = RangeBuf::from(b"hello", 0, false);
1047
1048 assert_eq!(stream.recv.write(first), Ok(()));
1049 assert_eq!(stream.recv.reset(0, 5), Ok(0));
1050 assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
1051 }
1052
1053 #[test]
1054 fn recv_reset_lower_than_received() {
1055 let mut stream =
1056 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1057 assert!(!stream.recv.almost_full());
1058
1059 let first = RangeBuf::from(b"hello", 0, false);
1060
1061 assert_eq!(stream.recv.write(first), Ok(()));
1062 assert_eq!(stream.recv.reset(0, 4), Err(Error::FinalSize));
1063 }
1064
1065 #[test]
1066 fn send_flow_control() {
1067 let mut buf = [0; 25];
1068
1069 let mut stream =
1070 <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1071
1072 let first = b"hello";
1073 let second = b"world";
1074 let third = b"something";
1075
1076 assert!(stream.send.write(first, false).is_ok());
1077 assert!(stream.send.write(second, false).is_ok());
1078 assert!(stream.send.write(third, false).is_ok());
1079
1080 assert_eq!(stream.send.off_front(), 0);
1081
1082 let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
1083 assert_eq!(written, 15);
1084 assert!(!fin);
1085 assert_eq!(&buf[..written], b"helloworldsomet");
1086
1087 assert_eq!(stream.send.off_front(), 15);
1088
1089 let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
1090 assert_eq!(written, 0);
1091 assert!(!fin);
1092 assert_eq!(&buf[..written], b"");
1093
1094 stream.send.retransmit(0, 15);
1095
1096 assert_eq!(stream.send.off_front(), 0);
1097
1098 let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
1099 assert_eq!(written, 10);
1100 assert!(!fin);
1101 assert_eq!(&buf[..written], b"helloworld");
1102
1103 assert_eq!(stream.send.off_front(), 10);
1104
1105 let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
1106 assert_eq!(written, 5);
1107 assert!(!fin);
1108 assert_eq!(&buf[..written], b"somet");
1109 }
1110
1111 #[test]
1112 fn send_past_fin() {
1113 let mut stream =
1114 <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1115
1116 let first = b"hello";
1117 let second = b"world";
1118 let third = b"third";
1119
1120 assert_eq!(stream.send.write(first, false), Ok(5));
1121
1122 assert_eq!(stream.send.write(second, true), Ok(5));
1123 assert!(stream.send.is_fin());
1124
1125 assert_eq!(stream.send.write(third, false), Err(Error::FinalSize));
1126 }
1127
1128 #[test]
1129 fn send_fin_dup() {
1130 let mut stream =
1131 <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1132
1133 assert_eq!(stream.send.write(b"hello", true), Ok(5));
1134 assert!(stream.send.is_fin());
1135
1136 assert_eq!(stream.send.write(b"", true), Ok(0));
1137 assert!(stream.send.is_fin());
1138 }
1139
1140 #[test]
1141 fn send_undo_fin() {
1142 let mut stream =
1143 <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1144
1145 assert_eq!(stream.send.write(b"hello", true), Ok(5));
1146 assert!(stream.send.is_fin());
1147
1148 assert_eq!(
1149 stream.send.write(b"helloworld", true),
1150 Err(Error::FinalSize)
1151 );
1152 }
1153
1154 #[test]
1155 fn send_fin_max_data_match() {
1156 let mut buf = [0; 15];
1157
1158 let mut stream =
1159 <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1160
1161 let slice = b"hellohellohello";
1162
1163 assert!(stream.send.write(slice, true).is_ok());
1164
1165 let (written, fin) = stream.send.emit(&mut buf[..15]).unwrap();
1166 assert_eq!(written, 15);
1167 assert!(fin);
1168 assert_eq!(&buf[..written], slice);
1169 }
1170
1171 #[test]
1172 fn send_fin_zero_length() {
1173 let mut buf = [0; 5];
1174
1175 let mut stream =
1176 <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1177
1178 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1179 assert_eq!(stream.send.write(b"", true), Ok(0));
1180 assert!(stream.send.is_fin());
1181
1182 let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1183 assert_eq!(written, 5);
1184 assert!(fin);
1185 assert_eq!(&buf[..written], b"hello");
1186 }
1187
1188 #[test]
1189 fn send_ack() {
1190 let mut buf = [0; 5];
1191
1192 let mut stream =
1193 <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1194
1195 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1196 assert_eq!(stream.send.write(b"world", false), Ok(5));
1197 assert_eq!(stream.send.write(b"", true), Ok(0));
1198 assert!(stream.send.is_fin());
1199
1200 assert_eq!(stream.send.off_front(), 0);
1201
1202 let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1203 assert_eq!(written, 5);
1204 assert!(!fin);
1205 assert_eq!(&buf[..written], b"hello");
1206
1207 stream.send.ack_and_drop(0, 5);
1208
1209 stream.send.retransmit(0, 5);
1210
1211 assert_eq!(stream.send.off_front(), 5);
1212
1213 let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1214 assert_eq!(written, 5);
1215 assert!(fin);
1216 assert_eq!(&buf[..written], b"world");
1217 }
1218
1219 #[test]
1220 fn send_ack_reordering() {
1221 let mut buf = [0; 5];
1222
1223 let mut stream =
1224 <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1225
1226 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1227 assert_eq!(stream.send.write(b"world", false), Ok(5));
1228 assert_eq!(stream.send.write(b"", true), Ok(0));
1229 assert!(stream.send.is_fin());
1230
1231 assert_eq!(stream.send.off_front(), 0);
1232
1233 let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1234 assert_eq!(written, 5);
1235 assert!(!fin);
1236 assert_eq!(&buf[..written], b"hello");
1237
1238 assert_eq!(stream.send.off_front(), 5);
1239
1240 let (written, fin) = stream.send.emit(&mut buf[..1]).unwrap();
1241 assert_eq!(written, 1);
1242 assert!(!fin);
1243 assert_eq!(&buf[..written], b"w");
1244
1245 stream.send.ack_and_drop(5, 1);
1246 stream.send.ack_and_drop(0, 5);
1247
1248 stream.send.retransmit(0, 5);
1249 stream.send.retransmit(5, 1);
1250
1251 assert_eq!(stream.send.off_front(), 6);
1252
1253 let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1254 assert_eq!(written, 4);
1255 assert!(fin);
1256 assert_eq!(&buf[..written], b"orld");
1257 }
1258
1259 #[test]
1260 fn recv_data_below_off() {
1261 let mut stream =
1262 <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1263
1264 let first = RangeBuf::from(b"hello", 0, false);
1265
1266 assert_eq!(stream.recv.write(first), Ok(()));
1267
1268 let mut buf = [0; 10];
1269
1270 let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1271 assert_eq!(&buf[..len], b"hello");
1272 assert!(!fin);
1273
1274 let first = RangeBuf::from(b"elloworld", 1, true);
1275 assert_eq!(stream.recv.write(first), Ok(()));
1276
1277 let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1278 assert_eq!(&buf[..len], b"world");
1279 assert!(fin);
1280 }
1281
1282 #[test]
1283 fn stream_complete() {
1284 let mut stream =
1285 <Stream>::new(0, 30, 30, true, true, DEFAULT_STREAM_WINDOW);
1286
1287 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1288 assert_eq!(stream.send.write(b"world", false), Ok(5));
1289
1290 assert!(!stream.send.is_complete());
1291 assert!(!stream.send.is_fin());
1292
1293 assert_eq!(stream.send.write(b"", true), Ok(0));
1294
1295 assert!(!stream.send.is_complete());
1296 assert!(stream.send.is_fin());
1297
1298 let buf = RangeBuf::from(b"hello", 0, true);
1299 assert!(stream.recv.write(buf).is_ok());
1300 assert!(!stream.recv.is_fin());
1301
1302 stream.send.ack(6, 4);
1303 assert!(!stream.send.is_complete());
1304
1305 let mut buf = [0; 2];
1306 assert_eq!(stream.recv.emit(&mut buf), Ok((2, false)));
1307 assert!(!stream.recv.is_fin());
1308
1309 stream.send.ack(1, 5);
1310 assert!(!stream.send.is_complete());
1311
1312 stream.send.ack(0, 1);
1313 assert!(stream.send.is_complete());
1314
1315 assert!(!stream.is_complete());
1316
1317 let mut buf = [0; 3];
1318 assert_eq!(stream.recv.emit(&mut buf), Ok((3, true)));
1319 assert!(stream.recv.is_fin());
1320
1321 assert!(stream.is_complete());
1322 }
1323
1324 #[test]
1325 fn send_fin_zero_length_output() {
1326 let mut buf = [0; 5];
1327
1328 let mut stream =
1329 <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1330
1331 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1332 assert_eq!(stream.send.off_front(), 0);
1333 assert!(!stream.send.is_fin());
1334
1335 let (written, fin) = stream.send.emit(&mut buf).unwrap();
1336 assert_eq!(written, 5);
1337 assert!(!fin);
1338 assert_eq!(&buf[..written], b"hello");
1339
1340 assert_eq!(stream.send.write(b"", true), Ok(0));
1341 assert!(stream.send.is_fin());
1342 assert_eq!(stream.send.off_front(), 5);
1343
1344 let (written, fin) = stream.send.emit(&mut buf).unwrap();
1345 assert_eq!(written, 0);
1346 assert!(fin);
1347 assert_eq!(&buf[..written], b"");
1348 }
1349
1350 fn stream_send_ready(stream: &Stream) -> bool {
1351 !stream.send.is_empty() &&
1352 stream.send.off_front() < stream.send.off_back()
1353 }
1354
1355 #[test]
1356 fn send_emit() {
1357 let mut buf = [0; 5];
1358
1359 let mut stream =
1360 <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1361
1362 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1363 assert_eq!(stream.send.write(b"world", false), Ok(5));
1364 assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1365 assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1366 assert_eq!(stream.send.off_front(), 0);
1367 assert_eq!(stream.send.bufs_count(), 4);
1368
1369 assert!(stream.is_flushable());
1370
1371 assert!(stream_send_ready(&stream));
1372 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1373 assert_eq!(stream.send.off_front(), 4);
1374 assert_eq!(&buf[..4], b"hell");
1375
1376 assert!(stream_send_ready(&stream));
1377 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1378 assert_eq!(stream.send.off_front(), 8);
1379 assert_eq!(&buf[..4], b"owor");
1380
1381 assert!(stream_send_ready(&stream));
1382 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1383 assert_eq!(stream.send.off_front(), 10);
1384 assert_eq!(&buf[..2], b"ld");
1385
1386 assert!(stream_send_ready(&stream));
1387 assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1388 assert_eq!(stream.send.off_front(), 11);
1389 assert_eq!(&buf[..1], b"o");
1390
1391 assert!(stream_send_ready(&stream));
1392 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1393 assert_eq!(stream.send.off_front(), 16);
1394 assert_eq!(&buf[..5], b"llehd");
1395
1396 assert!(stream_send_ready(&stream));
1397 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1398 assert_eq!(stream.send.off_front(), 20);
1399 assert_eq!(&buf[..4], b"lrow");
1400
1401 assert!(!stream.is_flushable());
1402
1403 assert!(!stream_send_ready(&stream));
1404 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1405 assert_eq!(stream.send.off_front(), 20);
1406 }
1407
1408 #[test]
1409 fn send_emit_ack() {
1410 let mut buf = [0; 5];
1411
1412 let mut stream =
1413 <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1414
1415 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1416 assert_eq!(stream.send.write(b"world", false), Ok(5));
1417 assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1418 assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1419 assert_eq!(stream.send.off_front(), 0);
1420 assert_eq!(stream.send.bufs_count(), 4);
1421
1422 assert!(stream.is_flushable());
1423
1424 assert!(stream_send_ready(&stream));
1425 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1426 assert_eq!(stream.send.off_front(), 4);
1427 assert_eq!(&buf[..4], b"hell");
1428
1429 assert!(stream_send_ready(&stream));
1430 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1431 assert_eq!(stream.send.off_front(), 8);
1432 assert_eq!(&buf[..4], b"owor");
1433
1434 stream.send.ack_and_drop(0, 5);
1435 assert_eq!(stream.send.bufs_count(), 3);
1436
1437 assert!(stream_send_ready(&stream));
1438 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1439 assert_eq!(stream.send.off_front(), 10);
1440 assert_eq!(&buf[..2], b"ld");
1441
1442 stream.send.ack_and_drop(7, 5);
1443 assert_eq!(stream.send.bufs_count(), 3);
1444
1445 assert!(stream_send_ready(&stream));
1446 assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1447 assert_eq!(stream.send.off_front(), 11);
1448 assert_eq!(&buf[..1], b"o");
1449
1450 assert!(stream_send_ready(&stream));
1451 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1452 assert_eq!(stream.send.off_front(), 16);
1453 assert_eq!(&buf[..5], b"llehd");
1454
1455 stream.send.ack_and_drop(5, 7);
1456 assert_eq!(stream.send.bufs_count(), 2);
1457
1458 assert!(stream_send_ready(&stream));
1459 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1460 assert_eq!(stream.send.off_front(), 20);
1461 assert_eq!(&buf[..4], b"lrow");
1462
1463 assert!(!stream.is_flushable());
1464
1465 assert!(!stream_send_ready(&stream));
1466 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1467 assert_eq!(stream.send.off_front(), 20);
1468
1469 stream.send.ack_and_drop(22, 4);
1470 assert_eq!(stream.send.bufs_count(), 2);
1471
1472 stream.send.ack_and_drop(20, 1);
1473 assert_eq!(stream.send.bufs_count(), 2);
1474 }
1475
1476 #[test]
1477 fn send_emit_retransmit() {
1478 let mut buf = [0; 5];
1479
1480 let mut stream =
1481 <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1482
1483 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1484 assert_eq!(stream.send.write(b"world", false), Ok(5));
1485 assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1486 assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1487 assert_eq!(stream.send.off_front(), 0);
1488 assert_eq!(stream.send.bufs_count(), 4);
1489
1490 assert!(stream.is_flushable());
1491
1492 assert!(stream_send_ready(&stream));
1493 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1494 assert_eq!(stream.send.off_front(), 4);
1495 assert_eq!(&buf[..4], b"hell");
1496
1497 assert!(stream_send_ready(&stream));
1498 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1499 assert_eq!(stream.send.off_front(), 8);
1500 assert_eq!(&buf[..4], b"owor");
1501
1502 stream.send.retransmit(3, 3);
1503 assert_eq!(stream.send.off_front(), 3);
1504
1505 assert!(stream_send_ready(&stream));
1506 assert_eq!(stream.send.emit(&mut buf[..3]), Ok((3, false)));
1507 assert_eq!(stream.send.off_front(), 8);
1508 assert_eq!(&buf[..3], b"low");
1509
1510 assert!(stream_send_ready(&stream));
1511 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1512 assert_eq!(stream.send.off_front(), 10);
1513 assert_eq!(&buf[..2], b"ld");
1514
1515 stream.send.ack_and_drop(7, 2);
1516
1517 stream.send.retransmit(8, 2);
1518
1519 assert!(stream_send_ready(&stream));
1520 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1521 assert_eq!(stream.send.off_front(), 10);
1522 assert_eq!(&buf[..2], b"ld");
1523
1524 assert!(stream_send_ready(&stream));
1525 assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1526 assert_eq!(stream.send.off_front(), 11);
1527 assert_eq!(&buf[..1], b"o");
1528
1529 assert!(stream_send_ready(&stream));
1530 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1531 assert_eq!(stream.send.off_front(), 16);
1532 assert_eq!(&buf[..5], b"llehd");
1533
1534 stream.send.retransmit(12, 2);
1535
1536 assert!(stream_send_ready(&stream));
1537 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1538 assert_eq!(stream.send.off_front(), 16);
1539 assert_eq!(&buf[..2], b"le");
1540
1541 assert!(stream_send_ready(&stream));
1542 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1543 assert_eq!(stream.send.off_front(), 20);
1544 assert_eq!(&buf[..4], b"lrow");
1545
1546 assert!(!stream.is_flushable());
1547
1548 assert!(!stream_send_ready(&stream));
1549 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1550 assert_eq!(stream.send.off_front(), 20);
1551
1552 stream.send.retransmit(7, 12);
1553
1554 assert!(stream_send_ready(&stream));
1555 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1556 assert_eq!(stream.send.off_front(), 12);
1557 assert_eq!(&buf[..5], b"rldol");
1558
1559 assert!(stream_send_ready(&stream));
1560 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1561 assert_eq!(stream.send.off_front(), 17);
1562 assert_eq!(&buf[..5], b"lehdl");
1563
1564 assert!(stream_send_ready(&stream));
1565 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
1566 assert_eq!(stream.send.off_front(), 20);
1567 assert_eq!(&buf[..2], b"ro");
1568
1569 stream.send.ack_and_drop(12, 7);
1570
1571 stream.send.retransmit(7, 12);
1572
1573 assert!(stream_send_ready(&stream));
1574 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1575 assert_eq!(stream.send.off_front(), 12);
1576 assert_eq!(&buf[..5], b"rldol");
1577
1578 assert!(stream_send_ready(&stream));
1579 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1580 assert_eq!(stream.send.off_front(), 17);
1581 assert_eq!(&buf[..5], b"lehdl");
1582
1583 assert!(stream_send_ready(&stream));
1584 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
1585 assert_eq!(stream.send.off_front(), 20);
1586 assert_eq!(&buf[..2], b"ro");
1587 }
1588
1589 #[test]
1590 fn rangebuf_split_off() {
1591 let mut buf = <RangeBuf>::from(b"helloworld", 5, true);
1592 assert_eq!(buf.start, 0);
1593 assert_eq!(buf.pos, 0);
1594 assert_eq!(buf.len, 10);
1595 assert_eq!(buf.off, 5);
1596 assert!(buf.fin);
1597
1598 assert_eq!(buf.len(), 10);
1599 assert_eq!(buf.off(), 5);
1600 assert!(buf.fin());
1601
1602 assert_eq!(&buf[..], b"helloworld");
1603
1604 buf.consume(5);
1606
1607 assert_eq!(buf.start, 0);
1608 assert_eq!(buf.pos, 5);
1609 assert_eq!(buf.len, 10);
1610 assert_eq!(buf.off, 5);
1611 assert!(buf.fin);
1612
1613 assert_eq!(buf.len(), 5);
1614 assert_eq!(buf.off(), 10);
1615 assert!(buf.fin());
1616
1617 assert_eq!(&buf[..], b"world");
1618
1619 let mut new_buf = buf.split_off(3);
1621
1622 assert_eq!(buf.start, 0);
1623 assert_eq!(buf.pos, 3);
1624 assert_eq!(buf.len, 3);
1625 assert_eq!(buf.off, 5);
1626 assert!(!buf.fin);
1627
1628 assert_eq!(buf.len(), 0);
1629 assert_eq!(buf.off(), 8);
1630 assert!(!buf.fin());
1631
1632 assert_eq!(&buf[..], b"");
1633
1634 assert_eq!(new_buf.start, 3);
1635 assert_eq!(new_buf.pos, 5);
1636 assert_eq!(new_buf.len, 7);
1637 assert_eq!(new_buf.off, 8);
1638 assert!(new_buf.fin);
1639
1640 assert_eq!(new_buf.len(), 5);
1641 assert_eq!(new_buf.off(), 10);
1642 assert!(new_buf.fin());
1643
1644 assert_eq!(&new_buf[..], b"world");
1645
1646 new_buf.consume(2);
1648
1649 assert_eq!(new_buf.start, 3);
1650 assert_eq!(new_buf.pos, 7);
1651 assert_eq!(new_buf.len, 7);
1652 assert_eq!(new_buf.off, 8);
1653 assert!(new_buf.fin);
1654
1655 assert_eq!(new_buf.len(), 3);
1656 assert_eq!(new_buf.off(), 12);
1657 assert!(new_buf.fin());
1658
1659 assert_eq!(&new_buf[..], b"rld");
1660
1661 let mut new_new_buf = new_buf.split_off(5);
1663
1664 assert_eq!(new_buf.start, 3);
1665 assert_eq!(new_buf.pos, 7);
1666 assert_eq!(new_buf.len, 5);
1667 assert_eq!(new_buf.off, 8);
1668 assert!(!new_buf.fin);
1669
1670 assert_eq!(new_buf.len(), 1);
1671 assert_eq!(new_buf.off(), 12);
1672 assert!(!new_buf.fin());
1673
1674 assert_eq!(&new_buf[..], b"r");
1675
1676 assert_eq!(new_new_buf.start, 8);
1677 assert_eq!(new_new_buf.pos, 8);
1678 assert_eq!(new_new_buf.len, 2);
1679 assert_eq!(new_new_buf.off, 13);
1680 assert!(new_new_buf.fin);
1681
1682 assert_eq!(new_new_buf.len(), 2);
1683 assert_eq!(new_new_buf.off(), 13);
1684 assert!(new_new_buf.fin());
1685
1686 assert_eq!(&new_new_buf[..], b"ld");
1687
1688 new_new_buf.consume(2);
1690
1691 assert_eq!(new_new_buf.start, 8);
1692 assert_eq!(new_new_buf.pos, 10);
1693 assert_eq!(new_new_buf.len, 2);
1694 assert_eq!(new_new_buf.off, 13);
1695 assert!(new_new_buf.fin);
1696
1697 assert_eq!(new_new_buf.len(), 0);
1698 assert_eq!(new_new_buf.off(), 15);
1699 assert!(new_new_buf.fin());
1700
1701 assert_eq!(&new_new_buf[..], b"");
1702 }
1703
1704 #[test]
1707 fn stream_limit_auto_open() {
1708 let local_tp = crate::TransportParams::default();
1709 let peer_tp = crate::TransportParams::default();
1710
1711 let mut streams = <StreamMap>::new(5, 5, 5);
1712
1713 let stream_id = 500;
1714 assert!(!is_local(stream_id, true), "stream id is peer initiated");
1715 assert!(is_bidi(stream_id), "stream id is bidirectional");
1716 assert_eq!(
1717 streams
1718 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1719 .err(),
1720 Some(Error::StreamLimit),
1721 "stream limit should be exceeded"
1722 );
1723 }
1724
1725 #[test]
1728 fn stream_create_out_of_order() {
1729 let local_tp = crate::TransportParams::default();
1730 let peer_tp = crate::TransportParams::default();
1731
1732 let mut streams = <StreamMap>::new(5, 5, 5);
1733
1734 for stream_id in [8, 12, 4] {
1735 assert!(is_local(stream_id, false), "stream id is client initiated");
1736 assert!(is_bidi(stream_id), "stream id is bidirectional");
1737 assert!(streams
1738 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1739 .is_ok());
1740 }
1741 }
1742
1743 #[test]
1745 fn stream_limit_edge() {
1746 let local_tp = crate::TransportParams::default();
1747 let peer_tp = crate::TransportParams::default();
1748
1749 let mut streams = <StreamMap>::new(3, 3, 3);
1750
1751 let stream_id = 8;
1753 assert!(streams
1754 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1755 .is_ok());
1756
1757 let stream_id = 12;
1759 assert_eq!(
1760 streams
1761 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1762 .err(),
1763 Some(Error::StreamLimit)
1764 );
1765 }
1766
1767 fn cycle_stream_priority(stream_id: u64, streams: &mut StreamMap) {
1768 let key = streams.get(stream_id).unwrap().priority_key.clone();
1769 streams.update_priority(&key.clone(), &key);
1770 }
1771
1772 #[test]
1773 fn writable_prioritized_default_priority() {
1774 let local_tp = crate::TransportParams::default();
1775 let peer_tp = crate::TransportParams {
1776 initial_max_stream_data_bidi_local: 100,
1777 initial_max_stream_data_uni: 100,
1778 ..Default::default()
1779 };
1780
1781 let mut streams = StreamMap::new(100, 100, 100);
1782
1783 for id in [0, 4, 8, 12] {
1784 assert!(streams
1785 .get_or_create(id, &local_tp, &peer_tp, false, true)
1786 .is_ok());
1787 }
1788
1789 let walk_1: Vec<u64> = streams.writable().collect();
1790 cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1791 let walk_2: Vec<u64> = streams.writable().collect();
1792 cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1793 let walk_3: Vec<u64> = streams.writable().collect();
1794 cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1795 let walk_4: Vec<u64> = streams.writable().collect();
1796 cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1797 let walk_5: Vec<u64> = streams.writable().collect();
1798
1799 assert_eq!(walk_1, vec![0, 4, 8, 12]);
1802 assert_eq!(walk_2, vec![4, 8, 12, 0]);
1803 assert_eq!(walk_3, vec![8, 12, 0, 4]);
1804 assert_eq!(walk_4, vec![12, 0, 4, 8,]);
1805 assert_eq!(walk_5, vec![0, 4, 8, 12]);
1806 }
1807
1808 #[test]
1809 fn writable_prioritized_insert_order() {
1810 let local_tp = crate::TransportParams::default();
1811 let peer_tp = crate::TransportParams {
1812 initial_max_stream_data_bidi_local: 100,
1813 initial_max_stream_data_uni: 100,
1814 ..Default::default()
1815 };
1816
1817 let mut streams = StreamMap::new(100, 100, 100);
1818
1819 for id in [12, 4, 8, 0] {
1822 assert!(streams
1823 .get_or_create(id, &local_tp, &peer_tp, false, true)
1824 .is_ok());
1825 }
1826
1827 let walk_1: Vec<u64> = streams.writable().collect();
1828 cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1829 let walk_2: Vec<u64> = streams.writable().collect();
1830 cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1831 let walk_3: Vec<u64> = streams.writable().collect();
1832 cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1833 let walk_4: Vec<u64> = streams.writable().collect();
1834 cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1835 let walk_5: Vec<u64> = streams.writable().collect();
1836 assert_eq!(walk_1, vec![12, 4, 8, 0]);
1837 assert_eq!(walk_2, vec![4, 8, 0, 12]);
1838 assert_eq!(walk_3, vec![8, 0, 12, 4,]);
1839 assert_eq!(walk_4, vec![0, 12, 4, 8]);
1840 assert_eq!(walk_5, vec![12, 4, 8, 0]);
1841 }
1842
1843 #[test]
1844 fn writable_prioritized_mixed_urgency() {
1845 let local_tp = crate::TransportParams::default();
1846 let peer_tp = crate::TransportParams {
1847 initial_max_stream_data_bidi_local: 100,
1848 initial_max_stream_data_uni: 100,
1849 ..Default::default()
1850 };
1851
1852 let mut streams = <StreamMap>::new(100, 100, 100);
1853
1854 let input = vec![
1857 (0, 100),
1858 (4, 90),
1859 (8, 80),
1860 (12, 70),
1861 (16, 60),
1862 (20, 50),
1863 (24, 40),
1864 (28, 30),
1865 (32, 20),
1866 (36, 10),
1867 (40, 0),
1868 ];
1869
1870 for (id, urgency) in input.clone() {
1871 let stream = streams
1874 .get_or_create(id, &local_tp, &peer_tp, false, true)
1875 .unwrap();
1876
1877 stream.urgency = urgency;
1878
1879 let new_priority_key = Arc::new(StreamPriorityKey {
1880 urgency: stream.urgency,
1881 incremental: stream.incremental,
1882 id,
1883 ..Default::default()
1884 });
1885
1886 let old_priority_key = std::mem::replace(
1887 &mut stream.priority_key,
1888 new_priority_key.clone(),
1889 );
1890
1891 streams.update_priority(&old_priority_key, &new_priority_key);
1892 }
1893
1894 let walk_1: Vec<u64> = streams.writable().collect();
1895 assert_eq!(walk_1, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
1896
1897 for (id, urgency) in input {
1899 let stream = streams
1902 .get_or_create(id, &local_tp, &peer_tp, false, true)
1903 .unwrap();
1904
1905 stream.urgency = urgency;
1906
1907 let new_priority_key = Arc::new(StreamPriorityKey {
1908 urgency: stream.urgency,
1909 incremental: stream.incremental,
1910 id,
1911 ..Default::default()
1912 });
1913
1914 let old_priority_key = std::mem::replace(
1915 &mut stream.priority_key,
1916 new_priority_key.clone(),
1917 );
1918
1919 streams.update_priority(&old_priority_key, &new_priority_key);
1920 }
1921
1922 let walk_2: Vec<u64> = streams.writable().collect();
1923 assert_eq!(walk_2, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
1924
1925 streams.collect(24, true);
1927
1928 let walk_3: Vec<u64> = streams.writable().collect();
1929 assert_eq!(walk_3, vec![40, 36, 32, 28, 20, 16, 12, 8, 4, 0]);
1930
1931 streams.collect(40, true);
1932 streams.collect(0, true);
1933
1934 let walk_4: Vec<u64> = streams.writable().collect();
1935 assert_eq!(walk_4, vec![36, 32, 28, 20, 16, 12, 8, 4]);
1936
1937 streams
1939 .get_or_create(44, &local_tp, &peer_tp, false, true)
1940 .unwrap();
1941
1942 let walk_5: Vec<u64> = streams.writable().collect();
1943 assert_eq!(walk_5, vec![36, 32, 28, 20, 16, 12, 8, 4, 44]);
1944 }
1945
1946 #[test]
1947 fn writable_prioritized_mixed_urgencies_incrementals() {
1948 let local_tp = crate::TransportParams::default();
1949 let peer_tp = crate::TransportParams {
1950 initial_max_stream_data_bidi_local: 100,
1951 initial_max_stream_data_uni: 100,
1952 ..Default::default()
1953 };
1954
1955 let mut streams = StreamMap::new(100, 100, 100);
1956
1957 let input = vec![
1959 (0, 100),
1960 (4, 20),
1961 (8, 100),
1962 (12, 20),
1963 (16, 90),
1964 (20, 25),
1965 (24, 90),
1966 (28, 30),
1967 (32, 80),
1968 (36, 20),
1969 (40, 0),
1970 ];
1971
1972 for (id, urgency) in input.clone() {
1973 let stream = streams
1976 .get_or_create(id, &local_tp, &peer_tp, false, true)
1977 .unwrap();
1978
1979 stream.urgency = urgency;
1980
1981 let new_priority_key = Arc::new(StreamPriorityKey {
1982 urgency: stream.urgency,
1983 incremental: stream.incremental,
1984 id,
1985 ..Default::default()
1986 });
1987
1988 let old_priority_key = std::mem::replace(
1989 &mut stream.priority_key,
1990 new_priority_key.clone(),
1991 );
1992
1993 streams.update_priority(&old_priority_key, &new_priority_key);
1994 }
1995
1996 let walk_1: Vec<u64> = streams.writable().collect();
1997 cycle_stream_priority(4, &mut streams);
1998 cycle_stream_priority(16, &mut streams);
1999 cycle_stream_priority(0, &mut streams);
2000 let walk_2: Vec<u64> = streams.writable().collect();
2001 cycle_stream_priority(12, &mut streams);
2002 cycle_stream_priority(24, &mut streams);
2003 cycle_stream_priority(8, &mut streams);
2004 let walk_3: Vec<u64> = streams.writable().collect();
2005 cycle_stream_priority(36, &mut streams);
2006 cycle_stream_priority(16, &mut streams);
2007 cycle_stream_priority(0, &mut streams);
2008 let walk_4: Vec<u64> = streams.writable().collect();
2009 cycle_stream_priority(4, &mut streams);
2010 cycle_stream_priority(24, &mut streams);
2011 cycle_stream_priority(8, &mut streams);
2012 let walk_5: Vec<u64> = streams.writable().collect();
2013 cycle_stream_priority(12, &mut streams);
2014 cycle_stream_priority(16, &mut streams);
2015 cycle_stream_priority(0, &mut streams);
2016 let walk_6: Vec<u64> = streams.writable().collect();
2017 cycle_stream_priority(36, &mut streams);
2018 cycle_stream_priority(24, &mut streams);
2019 cycle_stream_priority(8, &mut streams);
2020 let walk_7: Vec<u64> = streams.writable().collect();
2021 cycle_stream_priority(4, &mut streams);
2022 cycle_stream_priority(16, &mut streams);
2023 cycle_stream_priority(0, &mut streams);
2024 let walk_8: Vec<u64> = streams.writable().collect();
2025 cycle_stream_priority(12, &mut streams);
2026 cycle_stream_priority(24, &mut streams);
2027 cycle_stream_priority(8, &mut streams);
2028 let walk_9: Vec<u64> = streams.writable().collect();
2029 cycle_stream_priority(36, &mut streams);
2030 cycle_stream_priority(16, &mut streams);
2031 cycle_stream_priority(0, &mut streams);
2032
2033 assert_eq!(walk_1, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2034 assert_eq!(walk_2, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2035 assert_eq!(walk_3, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2036 assert_eq!(walk_4, vec![40, 4, 12, 36, 20, 28, 32, 24, 16, 8, 0]);
2037 assert_eq!(walk_5, vec![40, 12, 36, 4, 20, 28, 32, 16, 24, 0, 8]);
2038 assert_eq!(walk_6, vec![40, 36, 4, 12, 20, 28, 32, 24, 16, 8, 0]);
2039 assert_eq!(walk_7, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2040 assert_eq!(walk_8, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2041 assert_eq!(walk_9, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2042
2043 streams.collect(20, true);
2045
2046 let walk_10: Vec<u64> = streams.writable().collect();
2047 assert_eq!(walk_10, vec![40, 4, 12, 36, 28, 32, 24, 16, 8, 0]);
2048
2049 let stream = streams
2051 .get_or_create(44, &local_tp, &peer_tp, false, true)
2052 .unwrap();
2053
2054 stream.urgency = 20;
2055 stream.incremental = true;
2056
2057 let new_priority_key = Arc::new(StreamPriorityKey {
2058 urgency: stream.urgency,
2059 incremental: stream.incremental,
2060 id: 44,
2061 ..Default::default()
2062 });
2063
2064 let old_priority_key =
2065 std::mem::replace(&mut stream.priority_key, new_priority_key.clone());
2066
2067 streams.update_priority(&old_priority_key, &new_priority_key);
2068
2069 let walk_11: Vec<u64> = streams.writable().collect();
2070 assert_eq!(walk_11, vec![40, 4, 12, 36, 44, 28, 32, 24, 16, 8, 0]);
2071 }
2072
2073 #[test]
2074 fn priority_tree_dupes() {
2075 let mut prioritized_writable: RBTree<StreamWritablePriorityAdapter> =
2076 Default::default();
2077
2078 for id in [0, 4, 8, 12] {
2079 let s = Arc::new(StreamPriorityKey {
2080 urgency: 0,
2081 incremental: false,
2082 id,
2083 ..Default::default()
2084 });
2085
2086 prioritized_writable.insert(s);
2087 }
2088
2089 let walk_1: Vec<u64> =
2090 prioritized_writable.iter().map(|s| s.id).collect();
2091 assert_eq!(walk_1, vec![0, 4, 8, 12]);
2092
2093 for id in [0, 4, 8, 12] {
2096 let s = Arc::new(StreamPriorityKey {
2097 urgency: 0,
2098 incremental: false,
2099 id,
2100 ..Default::default()
2101 });
2102
2103 prioritized_writable.insert(s);
2104 }
2105
2106 let walk_2: Vec<u64> =
2107 prioritized_writable.iter().map(|s| s.id).collect();
2108 assert_eq!(walk_2, vec![0, 0, 4, 4, 8, 8, 12, 12]);
2109 }
2110}
2111
2112mod recv_buf;
2113mod send_buf;