1use std::cmp;
28
29use std::sync::Arc;
30
31use std::collections::hash_map;
32use std::collections::HashMap;
33use std::collections::HashSet;
34
35use intrusive_collections::intrusive_adapter;
36use intrusive_collections::KeyAdapter;
37use intrusive_collections::RBTree;
38use intrusive_collections::RBTreeAtomicLink;
39
40use smallvec::SmallVec;
41
42use crate::range_buf::DefaultBufFactory;
43use crate::BufFactory;
44use crate::Error;
45use crate::Result;
46
/// Urgency given to a stream's priority key when none has been set
/// explicitly (see `StreamPriorityKey::default()`).
const DEFAULT_URGENCY: u8 = 127;

/// A 32 KiB stream window. In the visible part of this file it is only used
/// by the unit tests, as the `max_window` argument of `Stream::new()`.
const DEFAULT_STREAM_WINDOW: u64 = 32 * 1024;

/// A 16 MiB upper bound for a stream window.
// NOTE(review): presumably the cap callers pass as `max_stream_window` to
// `StreamMap::new()` -- confirm against the call sites.
pub const MAX_STREAM_WINDOW: u64 = 16 * 1024 * 1024;
54
/// Hasher state for maps and sets keyed by a `u64` stream ID.
///
/// The `Hasher` implementation in this file stores the last `u64` written
/// and returns it unchanged from `finish()`.
#[derive(Default)]
pub struct StreamIdHasher {
    /// The most recently written `u64`; this is the resulting hash value.
    id: u64,
}
63
/// Pair of counters reported when a receive buffer is reset.
// NOTE(review): the exact accounting behind both fields lives in
// `RecvBuf::reset()`, which is not visible here -- confirm there.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct RecvBufResetReturn {
    /// Delta to apply to the "max data" accounting as a result of the reset.
    pub max_data_delta: u64,

    /// Amount of flow control considered consumed as a result of the reset.
    pub consumed_flowcontrol: u64,
}

impl RecvBufResetReturn {
    /// Returns a value with both counters set to zero.
    pub fn zero() -> Self {
        RecvBufResetReturn {
            consumed_flowcontrol: 0,
            max_data_delta: 0,
        }
    }
}
84
85impl std::hash::Hasher for StreamIdHasher {
86 #[inline]
87 fn finish(&self) -> u64 {
88 self.id
89 }
90
91 #[inline]
92 fn write_u64(&mut self, id: u64) {
93 self.id = id;
94 }
95
96 #[inline]
97 fn write(&mut self, _: &[u8]) {
98 unimplemented!()
101 }
102}
103
/// `BuildHasher` that produces default-constructed `StreamIdHasher`s.
type BuildStreamIdHasher = std::hash::BuildHasherDefault<StreamIdHasher>;

/// A `HashMap` keyed by `u64` stream ID that hashes with `StreamIdHasher`.
pub type StreamIdHashMap<V> = HashMap<u64, V, BuildStreamIdHasher>;
/// A `HashSet` of `u64` stream IDs that hashes with `StreamIdHasher`.
pub type StreamIdHashSet = HashSet<u64, BuildStreamIdHasher>;
108
/// Keeps track of streams, stream-count limits and per-stream bookkeeping
/// sets, all keyed by stream ID.
#[derive(Default)]
pub struct StreamMap<F: BufFactory = DefaultBufFactory> {
    /// Streams currently in the map, keyed by stream ID.
    streams: StreamIdHashMap<Stream<F>>,

    /// IDs of streams removed through `collect()`. `get_or_create()` refuses
    /// to re-create a stream whose ID is in this set.
    collected: StreamIdHashSet,

    /// Limit on locally-initiated bidirectional streams; only ever raised, by
    /// `update_peer_max_streams_bidi()`.
    peer_max_streams_bidi: u64,

    /// Limit on locally-initiated unidirectional streams; only ever raised,
    /// by `update_peer_max_streams_uni()`.
    peer_max_streams_uni: u64,

    /// Highest count of peer-initiated bidirectional streams opened so far.
    peer_opened_streams_bidi: u64,

    /// Highest count of peer-initiated unidirectional streams opened so far.
    peer_opened_streams_uni: u64,

    /// Limit on peer-initiated bidirectional streams currently enforced by
    /// `get_or_create()`.
    local_max_streams_bidi: u64,
    /// Pending value for `local_max_streams_bidi`; bumped by `collect()` and
    /// applied by `update_max_streams_bidi()`.
    local_max_streams_bidi_next: u64,

    /// Limit on peer-initiated unidirectional streams currently enforced by
    /// `get_or_create()`.
    local_max_streams_uni: u64,
    /// Pending value for `local_max_streams_uni`; bumped by `collect()` and
    /// applied by `update_max_streams_uni()`.
    local_max_streams_uni_next: u64,

    /// Highest count of locally-initiated bidirectional streams opened so far.
    local_opened_streams_bidi: u64,

    /// Highest count of locally-initiated unidirectional streams opened so
    /// far.
    local_opened_streams_uni: u64,

    /// Priority keys of streams registered through `insert_flushable()`,
    /// ordered by `StreamPriorityKey`'s `Ord` implementation.
    flushable: RBTree<StreamFlushablePriorityAdapter>,

    /// Priority keys of streams registered through `insert_readable()`,
    /// ordered by `StreamPriorityKey`'s `Ord` implementation.
    pub readable: RBTree<StreamReadablePriorityAdapter>,

    /// Priority keys of streams registered through `insert_writable()` (and
    /// of new writable streams created by `get_or_create()`).
    pub writable: RBTree<StreamWritablePriorityAdapter>,

    /// Stream IDs registered through `insert_almost_full()`.
    almost_full: StreamIdHashSet,

    /// Stream IDs registered through `insert_blocked()`, mapped to the offset
    /// passed alongside them.
    blocked: StreamIdHashMap<u64>,

    /// Stream IDs registered through `insert_reset()`, mapped to their
    /// `(error_code, final_size)` pair.
    reset: StreamIdHashMap<(u64, u64)>,

    /// Stream IDs registered through `insert_stopped()`, mapped to their
    /// error code.
    stopped: StreamIdHashMap<u64>,

    /// Value handed to `Stream::new()` as `max_window` for every stream
    /// created by this map.
    max_stream_window: u64,
}
188
189impl<F: BufFactory> StreamMap<F> {
190 pub fn new(
191 max_streams_bidi: u64, max_streams_uni: u64, max_stream_window: u64,
192 ) -> Self {
193 StreamMap {
194 local_max_streams_bidi: max_streams_bidi,
195 local_max_streams_bidi_next: max_streams_bidi,
196
197 local_max_streams_uni: max_streams_uni,
198 local_max_streams_uni_next: max_streams_uni,
199
200 max_stream_window,
201
202 ..StreamMap::default()
203 }
204 }
205
206 pub fn get(&self, id: u64) -> Option<&Stream<F>> {
208 self.streams.get(&id)
209 }
210
211 pub fn get_mut(&mut self, id: u64) -> Option<&mut Stream<F>> {
213 self.streams.get_mut(&id)
214 }
215
216 pub(crate) fn get_or_create(
229 &mut self, id: u64, local_params: &crate::TransportParams,
230 peer_params: &crate::TransportParams, local: bool, is_server: bool,
231 ) -> Result<&mut Stream<F>> {
232 let (stream, is_new_and_writable) = match self.streams.entry(id) {
233 hash_map::Entry::Vacant(v) => {
234 if self.collected.contains(&id) {
236 return Err(Error::Done);
237 }
238
239 if local != is_local(id, is_server) {
240 return Err(Error::InvalidStreamState(id));
241 }
242
243 let (max_rx_data, max_tx_data) = match (local, is_bidi(id)) {
244 (true, true) => (
246 local_params.initial_max_stream_data_bidi_local,
247 peer_params.initial_max_stream_data_bidi_remote,
248 ),
249
250 (true, false) => (0, peer_params.initial_max_stream_data_uni),
252
253 (false, true) => (
255 local_params.initial_max_stream_data_bidi_remote,
256 peer_params.initial_max_stream_data_bidi_local,
257 ),
258
259 (false, false) =>
261 (local_params.initial_max_stream_data_uni, 0),
262 };
263
264 let stream_sequence = id >> 2;
268
269 match (is_local(id, is_server), is_bidi(id)) {
271 (true, true) => {
272 let n = cmp::max(
273 self.local_opened_streams_bidi,
274 stream_sequence + 1,
275 );
276
277 if n > self.peer_max_streams_bidi {
278 return Err(Error::StreamLimit);
279 }
280
281 self.local_opened_streams_bidi = n;
282 },
283
284 (true, false) => {
285 let n = cmp::max(
286 self.local_opened_streams_uni,
287 stream_sequence + 1,
288 );
289
290 if n > self.peer_max_streams_uni {
291 return Err(Error::StreamLimit);
292 }
293
294 self.local_opened_streams_uni = n;
295 },
296
297 (false, true) => {
298 let n = cmp::max(
299 self.peer_opened_streams_bidi,
300 stream_sequence + 1,
301 );
302
303 if n > self.local_max_streams_bidi {
304 return Err(Error::StreamLimit);
305 }
306
307 self.peer_opened_streams_bidi = n;
308 },
309
310 (false, false) => {
311 let n = cmp::max(
312 self.peer_opened_streams_uni,
313 stream_sequence + 1,
314 );
315
316 if n > self.local_max_streams_uni {
317 return Err(Error::StreamLimit);
318 }
319
320 self.peer_opened_streams_uni = n;
321 },
322 };
323
324 let s = Stream::new(
325 id,
326 max_rx_data,
327 max_tx_data,
328 is_bidi(id),
329 local,
330 self.max_stream_window,
331 );
332
333 let is_writable = s.is_writable();
334
335 (v.insert(s), is_writable)
336 },
337
338 hash_map::Entry::Occupied(v) => (v.into_mut(), false),
339 };
340
341 if is_new_and_writable {
344 self.writable.insert(Arc::clone(&stream.priority_key));
345 }
346
347 Ok(stream)
348 }
349
350 pub fn insert_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
354 if !priority_key.readable.is_linked() {
355 self.readable.insert(Arc::clone(priority_key));
356 }
357 }
358
359 pub fn remove_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
361 if !priority_key.readable.is_linked() {
362 return;
363 }
364
365 let mut c = {
366 let ptr = Arc::as_ptr(priority_key);
367 unsafe { self.readable.cursor_mut_from_ptr(ptr) }
368 };
369
370 c.remove();
371 }
372
373 pub fn insert_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
380 if !priority_key.writable.is_linked() {
381 self.writable.insert(Arc::clone(priority_key));
382 }
383 }
384
385 pub fn remove_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
390 if !priority_key.writable.is_linked() {
391 return;
392 }
393
394 let mut c = {
395 let ptr = Arc::as_ptr(priority_key);
396 unsafe { self.writable.cursor_mut_from_ptr(ptr) }
397 };
398
399 c.remove();
400 }
401
402 pub fn insert_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
406 if !priority_key.flushable.is_linked() {
407 self.flushable.insert(Arc::clone(priority_key));
408 }
409 }
410
411 pub fn remove_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
413 if !priority_key.flushable.is_linked() {
414 return;
415 }
416
417 let mut c = {
418 let ptr = Arc::as_ptr(priority_key);
419 unsafe { self.flushable.cursor_mut_from_ptr(ptr) }
420 };
421
422 c.remove();
423 }
424
425 pub fn peek_flushable(&self) -> Option<Arc<StreamPriorityKey>> {
426 self.flushable.front().clone_pointer()
427 }
428
429 pub fn update_priority(
431 &mut self, old: &Arc<StreamPriorityKey>, new: &Arc<StreamPriorityKey>,
432 ) {
433 if old.readable.is_linked() {
434 self.remove_readable(old);
435 self.readable.insert(Arc::clone(new));
436 }
437
438 if old.writable.is_linked() {
439 self.remove_writable(old);
440 self.writable.insert(Arc::clone(new));
441 }
442
443 if old.flushable.is_linked() {
444 self.remove_flushable(old);
445 self.flushable.insert(Arc::clone(new));
446 }
447 }
448
449 pub fn insert_almost_full(&mut self, stream_id: u64) {
453 self.almost_full.insert(stream_id);
454 }
455
456 pub fn remove_almost_full(&mut self, stream_id: u64) {
458 self.almost_full.remove(&stream_id);
459 }
460
461 pub fn insert_blocked(&mut self, stream_id: u64, off: u64) {
466 self.blocked.insert(stream_id, off);
467 }
468
469 pub fn remove_blocked(&mut self, stream_id: u64) {
471 self.blocked.remove(&stream_id);
472 }
473
474 pub fn insert_reset(
479 &mut self, stream_id: u64, error_code: u64, final_size: u64,
480 ) {
481 self.reset.insert(stream_id, (error_code, final_size));
482 }
483
484 pub fn remove_reset(&mut self, stream_id: u64) {
486 self.reset.remove(&stream_id);
487 }
488
489 pub fn insert_stopped(&mut self, stream_id: u64, error_code: u64) {
494 self.stopped.insert(stream_id, error_code);
495 }
496
497 pub fn remove_stopped(&mut self, stream_id: u64) {
499 self.stopped.remove(&stream_id);
500 }
501
502 pub fn update_peer_max_streams_bidi(&mut self, v: u64) {
504 self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v);
505 }
506
507 pub fn update_peer_max_streams_uni(&mut self, v: u64) {
509 self.peer_max_streams_uni = cmp::max(self.peer_max_streams_uni, v);
510 }
511
512 pub fn update_max_streams_bidi(&mut self) {
514 self.local_max_streams_bidi = self.local_max_streams_bidi_next;
515 }
516
517 pub fn set_max_streams_bidi(&mut self, max: u64) {
519 self.local_max_streams_bidi = max;
520 self.local_max_streams_bidi_next = max;
521 }
522
523 pub fn max_streams_bidi(&self) -> u64 {
525 self.local_max_streams_bidi
526 }
527
528 pub fn max_streams_bidi_next(&mut self) -> u64 {
530 self.local_max_streams_bidi_next
531 }
532
533 pub fn update_max_streams_uni(&mut self) {
535 self.local_max_streams_uni = self.local_max_streams_uni_next;
536 }
537
538 pub fn max_streams_uni_next(&mut self) -> u64 {
540 self.local_max_streams_uni_next
541 }
542
543 pub fn peer_streams_left_bidi(&self) -> u64 {
546 self.peer_max_streams_bidi - self.local_opened_streams_bidi
547 }
548
549 pub fn peer_streams_left_uni(&self) -> u64 {
552 self.peer_max_streams_uni - self.local_opened_streams_uni
553 }
554
555 pub fn collect(&mut self, stream_id: u64, local: bool) {
560 if !local {
561 if is_bidi(stream_id) {
564 self.local_max_streams_bidi_next =
565 self.local_max_streams_bidi_next.saturating_add(1);
566 } else {
567 self.local_max_streams_uni_next =
568 self.local_max_streams_uni_next.saturating_add(1);
569 }
570 }
571
572 let s = self.streams.remove(&stream_id).unwrap();
573
574 self.remove_readable(&s.priority_key);
575
576 self.remove_writable(&s.priority_key);
577
578 self.remove_flushable(&s.priority_key);
579
580 self.collected.insert(stream_id);
581 }
582
583 pub fn readable(&self) -> StreamIter {
585 StreamIter {
586 streams: self.readable.iter().map(|s| s.id).collect(),
587 index: 0,
588 }
589 }
590
591 pub fn writable(&self) -> StreamIter {
593 StreamIter {
594 streams: self.writable.iter().map(|s| s.id).collect(),
595 index: 0,
596 }
597 }
598
599 pub fn almost_full(&self) -> StreamIter {
601 StreamIter::from(&self.almost_full)
602 }
603
604 pub fn blocked(&self) -> hash_map::Iter<'_, u64, u64> {
606 self.blocked.iter()
607 }
608
609 pub fn reset(&self) -> hash_map::Iter<'_, u64, (u64, u64)> {
611 self.reset.iter()
612 }
613
614 pub fn stopped(&self) -> hash_map::Iter<'_, u64, u64> {
616 self.stopped.iter()
617 }
618
619 pub fn is_collected(&self, stream_id: u64) -> bool {
621 self.collected.contains(&stream_id)
622 }
623
624 pub fn has_flushable(&self) -> bool {
626 !self.flushable.is_empty()
627 }
628
629 pub fn has_readable(&self) -> bool {
631 !self.readable.is_empty()
632 }
633
634 pub fn has_almost_full(&self) -> bool {
637 !self.almost_full.is_empty()
638 }
639
640 pub fn has_blocked(&self) -> bool {
642 !self.blocked.is_empty()
643 }
644
645 pub fn has_reset(&self) -> bool {
647 !self.reset.is_empty()
648 }
649
650 pub fn has_stopped(&self) -> bool {
652 !self.stopped.is_empty()
653 }
654
655 pub fn should_update_max_streams_bidi(&self) -> bool {
658 self.local_max_streams_bidi_next != self.local_max_streams_bidi &&
659 self.local_max_streams_bidi_next / 2 >
660 self.local_max_streams_bidi - self.peer_opened_streams_bidi
661 }
662
663 pub fn should_update_max_streams_uni(&self) -> bool {
666 self.local_max_streams_uni_next != self.local_max_streams_uni &&
667 self.local_max_streams_uni_next / 2 >
668 self.local_max_streams_uni - self.peer_opened_streams_uni
669 }
670
671 #[cfg(test)]
673 pub fn len(&self) -> usize {
674 self.streams.len()
675 }
676}
677
/// State of a single stream: its receive and send buffers plus the flags and
/// priority information attached to it.
pub struct Stream<F: BufFactory = DefaultBufFactory> {
    /// Receive-side buffer.
    pub recv: recv_buf::RecvBuf,

    /// Send-side buffer.
    pub send: send_buf::SendBuf<F>,

    /// Low-water mark used by `is_writable()`: the stream is writable only
    /// when `off_back + send_lowat` is below the send buffer's `max_off`.
    /// Initialized to 1.
    pub send_lowat: usize,

    /// Whether the stream is bidirectional.
    pub bidi: bool,

    /// Whether the stream was created by the local endpoint.
    pub local: bool,

    /// Urgency copied from the priority key when the stream is created.
    pub urgency: u8,

    /// Incremental flag copied from the priority key when the stream is
    /// created.
    pub incremental: bool,

    /// Key linking this stream into the `StreamMap` priority trees.
    pub priority_key: Arc<StreamPriorityKey>,
}
702
703impl<F: BufFactory> Stream<F> {
704 pub fn new(
706 id: u64, max_rx_data: u64, max_tx_data: u64, bidi: bool, local: bool,
707 max_window: u64,
708 ) -> Self {
709 let priority_key = Arc::new(StreamPriorityKey {
710 id,
711 ..Default::default()
712 });
713
714 Stream {
715 recv: recv_buf::RecvBuf::new(max_rx_data, max_window),
716 send: send_buf::SendBuf::new(max_tx_data),
717 send_lowat: 1,
718 bidi,
719 local,
720 urgency: priority_key.urgency,
721 incremental: priority_key.incremental,
722 priority_key,
723 }
724 }
725
726 pub fn is_readable(&self) -> bool {
728 self.recv.ready()
729 }
730
731 pub fn is_writable(&self) -> bool {
734 !self.send.is_shutdown() &&
735 !self.send.is_fin() &&
736 (self.send.off_back() + self.send_lowat as u64) <
737 self.send.max_off()
738 }
739
740 pub fn is_flushable(&self) -> bool {
743 let off_front = self.send.off_front();
744
745 !self.send.is_empty() &&
746 off_front < self.send.off_back() &&
747 off_front < self.send.max_off()
748 }
749
750 pub fn is_complete(&self) -> bool {
760 match (self.bidi, self.local) {
761 (true, _) => self.recv.is_fin() && self.send.is_complete(),
764
765 (false, true) => self.send.is_complete(),
768
769 (false, false) => self.recv.is_fin(),
772 }
773 }
774}
775
/// Returns true if the stream was initiated by this endpoint.
///
/// The least significant bit of a stream ID tells who initiated it: set for
/// the server, clear for the client. The stream is local when that bit
/// matches `is_server`.
pub fn is_local(stream_id: u64, is_server: bool) -> bool {
    let server_initiated = stream_id & 0x1 == 1;

    server_initiated == is_server
}
780
/// Returns true if the stream is bidirectional, i.e. the second least
/// significant bit of its ID is clear.
pub fn is_bidi(stream_id: u64) -> bool {
    let uni_bit = (stream_id >> 1) & 0x1;

    uni_bit == 0
}
785
/// Sort key for a stream, with the intrusive links that allow the same key
/// to sit in the readable, writable and flushable trees at once.
#[derive(Clone, Debug)]
pub struct StreamPriorityKey {
    /// Urgency of the stream; keys with a lower value sort first.
    pub urgency: u8,
    /// Incremental flag; at equal urgency, non-incremental keys sort before
    /// incremental ones.
    pub incremental: bool,
    /// ID of the stream this key belongs to. Equality is decided by this
    /// field alone.
    pub id: u64,

    /// Link used by the readable tree.
    pub readable: RBTreeAtomicLink,
    /// Link used by the writable tree.
    pub writable: RBTreeAtomicLink,
    /// Link used by the flushable tree.
    pub flushable: RBTreeAtomicLink,
}
796
797impl Default for StreamPriorityKey {
798 fn default() -> Self {
799 Self {
800 urgency: DEFAULT_URGENCY,
801 incremental: true,
802 id: Default::default(),
803 readable: Default::default(),
804 writable: Default::default(),
805 flushable: Default::default(),
806 }
807 }
808}
809
810impl PartialEq for StreamPriorityKey {
811 fn eq(&self, other: &Self) -> bool {
812 self.id == other.id
813 }
814}
815
816impl Eq for StreamPriorityKey {}
817
818impl PartialOrd for StreamPriorityKey {
819 #[allow(clippy::non_canonical_partial_ord_impl)]
821 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
822 if self.id == other.id {
824 return Some(cmp::Ordering::Equal);
825 }
826
827 if self.urgency != other.urgency {
829 return self.urgency.partial_cmp(&other.urgency);
830 }
831
832 if !self.incremental && !other.incremental {
835 return self.id.partial_cmp(&other.id);
836 }
837
838 if self.incremental && !other.incremental {
840 return Some(cmp::Ordering::Greater);
841 }
842 if !self.incremental && other.incremental {
843 return Some(cmp::Ordering::Less);
844 }
845
846 Some(cmp::Ordering::Greater)
850 }
851}
852
853impl Ord for StreamPriorityKey {
854 fn cmp(&self, other: &Self) -> cmp::Ordering {
855 self.partial_cmp(other).unwrap()
857 }
858}
859
860intrusive_adapter!(pub StreamWritablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { writable: RBTreeAtomicLink });
861
862impl KeyAdapter<'_> for StreamWritablePriorityAdapter {
863 type Key = StreamPriorityKey;
864
865 fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
866 s.clone()
867 }
868}
869
870intrusive_adapter!(pub StreamReadablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { readable: RBTreeAtomicLink });
871
872impl KeyAdapter<'_> for StreamReadablePriorityAdapter {
873 type Key = StreamPriorityKey;
874
875 fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
876 s.clone()
877 }
878}
879
880intrusive_adapter!(pub StreamFlushablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { flushable: RBTreeAtomicLink });
881
882impl KeyAdapter<'_> for StreamFlushablePriorityAdapter {
883 type Key = StreamPriorityKey;
884
885 fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
886 s.clone()
887 }
888}
889
/// An iterator over a snapshot of stream IDs.
///
/// The IDs are copied into the iterator when it is created, so it does not
/// borrow the `StreamMap` it came from.
#[derive(Default)]
pub struct StreamIter {
    /// The stream IDs to yield, in order.
    streams: SmallVec<[u64; 8]>,
    /// Position of the next ID to yield.
    index: usize,
}
896
897impl StreamIter {
898 #[inline]
899 fn from(streams: &StreamIdHashSet) -> Self {
900 StreamIter {
901 streams: streams.iter().copied().collect(),
902 index: 0,
903 }
904 }
905}
906
907impl Iterator for StreamIter {
908 type Item = u64;
909
910 #[inline]
911 fn next(&mut self) -> Option<Self::Item> {
912 let v = self.streams.get(self.index)?;
913 self.index += 1;
914 Some(*v)
915 }
916}
917
918impl ExactSizeIterator for StreamIter {
919 #[inline]
920 fn len(&self) -> usize {
921 self.streams.len() - self.index
922 }
923}
924
925#[cfg(test)]
926mod tests {
927 use crate::range_buf::RangeBuf;
928
929 use super::*;
930
    /// Receive-side flow control with a 15-byte `max_rx_data` limit.
    #[test]
    fn recv_flow_control() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let mut buf = [0; 32];

        let first = RangeBuf::from(b"hello", 0, false);
        let second = RangeBuf::from(b"world", 5, false);
        let third = RangeBuf::from(b"something", 10, false);

        // Ten bytes, delivered out of order, fit within the limit.
        assert_eq!(stream.recv.write(second), Ok(()));
        assert_eq!(stream.recv.write(first), Ok(()));
        assert!(!stream.recv.almost_full());

        // Nine more bytes at offset 10 would end at 19, past the limit of 15.
        assert_eq!(stream.recv.write(third), Err(Error::FlowControl));

        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
        assert_eq!(&buf[..len], b"helloworld");
        assert!(!fin);

        // Once the data has been read out the buffer reports almost full...
        assert!(stream.recv.almost_full());

        // ...until the limit is updated (to 25 here).
        stream.recv.update_max_data(std::time::Instant::now());
        assert_eq!(stream.recv.max_data_next(), 25);
        assert!(!stream.recv.almost_full());

        // The chunk rejected earlier is accepted under the new limit.
        let third = RangeBuf::from(b"something", 10, false);
        assert_eq!(stream.recv.write(third), Ok(()));
    }
962
    /// Data received beyond an already-known final size is rejected.
    #[test]
    fn recv_past_fin() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, true);
        let second = RangeBuf::from(b"world", 5, false);

        // `first` carries fin, fixing the final size at 5.
        assert_eq!(stream.recv.write(first), Ok(()));
        assert_eq!(stream.recv.write(second), Err(Error::FinalSize));
    }
975
    /// Receiving the same fin-carrying chunk twice is accepted, and the data
    /// is emitted once.
    #[test]
    fn recv_fin_dup() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, true);
        let second = RangeBuf::from(b"hello", 0, true);

        assert_eq!(stream.recv.write(first), Ok(()));
        assert_eq!(stream.recv.write(second), Ok(()));

        let mut buf = [0; 32];

        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
        assert_eq!(&buf[..len], b"hello");
        assert!(fin);
    }
994
    /// A second fin that implies a different final size is rejected.
    #[test]
    fn recv_fin_change() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, true);
        let second = RangeBuf::from(b"world", 5, true);

        // `second` fixes the final size at 10; `first` then claims 5.
        assert_eq!(stream.recv.write(second), Ok(()));
        assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
    }
1007
    /// A fin whose final size is below data already received is rejected.
    #[test]
    fn recv_fin_lower_than_received() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, true);
        let second = RangeBuf::from(b"world", 5, false);

        // Data up to offset 10 arrives first; a fin at offset 5 follows.
        assert_eq!(stream.recv.write(second), Ok(()));
        assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
    }
1020
    /// After a finished stream has been fully read, the receive buffer does
    /// not report almost full.
    #[test]
    fn recv_fin_flow_control() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let mut buf = [0; 32];

        let first = RangeBuf::from(b"hello", 0, false);
        let second = RangeBuf::from(b"world", 5, true);

        assert_eq!(stream.recv.write(first), Ok(()));
        assert_eq!(stream.recv.write(second), Ok(()));

        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
        assert_eq!(&buf[..len], b"helloworld");
        assert!(fin);

        // Same amount read as in `recv_flow_control`, but this time with fin.
        assert!(!stream.recv.almost_full());
    }
1041
    /// A reset whose final size differs from the one set by fin is rejected.
    #[test]
    fn recv_fin_reset_mismatch() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, true);

        // Fin fixes the final size at 5; the reset claims 10.
        assert_eq!(stream.recv.write(first), Ok(()));
        assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
    }
1053
    /// Reset with a final size beyond the received data, after part of the
    /// data has been read.
    #[test]
    fn recv_reset_with_gap() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, false);

        // Five bytes received, one of them read by the application.
        assert_eq!(stream.recv.write(first), Ok(()));
        assert_eq!(stream.recv.emit(&mut [0; 1]), Ok((1, false)));
        // Resetting at final size 10 reports a max-data delta of 5 and 9
        // bytes of consumed flow control.
        assert_eq!(
            stream.recv.reset(0, 10),
            Ok(RecvBufResetReturn {
                max_data_delta: 5,
                consumed_flowcontrol: 9
            })
        );
        // A repeated identical reset reports nothing further.
        assert_eq!(stream.recv.reset(0, 10), Ok(RecvBufResetReturn::zero()));
    }
1076
    /// A duplicate reset with the same final size succeeds and reports zero
    /// for both counters.
    #[test]
    fn recv_reset_dup() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, false);

        assert_eq!(stream.recv.write(first), Ok(()));
        // Final size equals the bytes received, none of which were read.
        assert_eq!(
            stream.recv.reset(0, 5),
            Ok(RecvBufResetReturn {
                max_data_delta: 0,
                consumed_flowcontrol: 5
            })
        );
        assert_eq!(stream.recv.reset(0, 5), Ok(RecvBufResetReturn::zero()));
    }
1095
    /// A second reset with a different final size is rejected.
    #[test]
    fn recv_reset_change() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, false);

        assert_eq!(stream.recv.write(first), Ok(()));
        assert_eq!(
            stream.recv.reset(0, 5),
            Ok(RecvBufResetReturn {
                max_data_delta: 0,
                consumed_flowcontrol: 5
            })
        );
        // The final size was fixed at 5 by the first reset.
        assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
    }
1114
    /// A reset whose final size is below data already received is rejected.
    #[test]
    fn recv_reset_lower_than_received() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, false);

        // Five bytes were received; the reset claims a final size of 4.
        assert_eq!(stream.recv.write(first), Ok(()));
        assert_eq!(stream.recv.reset(0, 4), Err(Error::FinalSize));
    }
1126
    /// Send-side flow control with a 15-byte `max_tx_data` limit, including
    /// retransmission of already-emitted data.
    #[test]
    fn send_flow_control() {
        let mut buf = [0; 25];

        let mut stream =
            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

        let first = b"hello";
        let second = b"world";
        let third = b"something";

        assert!(stream.send.write(first, false).is_ok());
        assert!(stream.send.write(second, false).is_ok());
        assert!(stream.send.write(third, false).is_ok());

        assert_eq!(stream.send.off_front(), 0);

        // Only 15 bytes come out even though the output buffer has room for
        // 25.
        let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
        assert_eq!(written, 15);
        assert!(!fin);
        assert_eq!(&buf[..written], b"helloworldsomet");

        assert_eq!(stream.send.off_front(), 15);

        // Nothing more can be emitted at this point.
        let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
        assert_eq!(written, 0);
        assert!(!fin);
        assert_eq!(&buf[..written], b"");

        // Marking the first 15 bytes for retransmission rewinds the front
        // offset to 0.
        stream.send.retransmit(0, 15);

        assert_eq!(stream.send.off_front(), 0);

        // The retransmitted data can be emitted in smaller pieces.
        let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
        assert_eq!(written, 10);
        assert!(!fin);
        assert_eq!(&buf[..written], b"helloworld");

        assert_eq!(stream.send.off_front(), 10);

        let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
        assert_eq!(written, 5);
        assert!(!fin);
        assert_eq!(&buf[..written], b"somet");
    }
1172
    /// Writing after the send side has been finished is rejected.
    #[test]
    fn send_past_fin() {
        let mut stream =
            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

        let first = b"hello";
        let second = b"world";
        let third = b"third";

        assert_eq!(stream.send.write(first, false), Ok(5));

        // The second write carries fin.
        assert_eq!(stream.send.write(second, true), Ok(5));
        assert!(stream.send.is_fin());

        assert_eq!(stream.send.write(third, false), Err(Error::FinalSize));
    }
1189
    /// Repeating fin with an empty write is accepted and leaves the stream
    /// finished.
    #[test]
    fn send_fin_dup() {
        let mut stream =
            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", true), Ok(5));
        assert!(stream.send.is_fin());

        assert_eq!(stream.send.write(b"", true), Ok(0));
        assert!(stream.send.is_fin());
    }
1201
    /// A finished send side rejects a further non-empty write, even one that
    /// also carries fin.
    #[test]
    fn send_undo_fin() {
        let mut stream =
            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", true), Ok(5));
        assert!(stream.send.is_fin());

        assert_eq!(
            stream.send.write(b"helloworld", true),
            Err(Error::FinalSize)
        );
    }
1215
    /// Data that exactly fills the 15-byte send limit is emitted in full,
    /// together with fin.
    #[test]
    fn send_fin_max_data_match() {
        let mut buf = [0; 15];

        let mut stream =
            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

        let slice = b"hellohellohello";

        assert!(stream.send.write(slice, true).is_ok());

        let (written, fin) = stream.send.emit(&mut buf[..15]).unwrap();
        assert_eq!(written, 15);
        assert!(fin);
        assert_eq!(&buf[..written], slice);
    }
1232
    /// A fin set through an empty write before any data is emitted is
    /// reported with the data it follows.
    #[test]
    fn send_fin_zero_length() {
        let mut buf = [0; 5];

        let mut stream =
            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"", true), Ok(0));
        assert!(stream.send.is_fin());

        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
        assert_eq!(written, 5);
        assert!(fin);
        assert_eq!(&buf[..written], b"hello");
    }
1249
    /// A retransmit request for data that was already acked and dropped has
    /// no visible effect.
    #[test]
    fn send_ack() {
        let mut buf = [0; 5];

        let mut stream =
            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"world", false), Ok(5));
        assert_eq!(stream.send.write(b"", true), Ok(0));
        assert!(stream.send.is_fin());

        assert_eq!(stream.send.off_front(), 0);

        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
        assert_eq!(written, 5);
        assert!(!fin);
        assert_eq!(&buf[..written], b"hello");

        // Ack the first five bytes, then ask for them to be retransmitted.
        stream.send.ack_and_drop(0, 5);

        stream.send.retransmit(0, 5);

        // The front offset did not move back...
        assert_eq!(stream.send.off_front(), 5);

        // ...and the next emit continues with the unsent data.
        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
        assert_eq!(written, 5);
        assert!(fin);
        assert_eq!(&buf[..written], b"world");
    }
1280
    /// Like `send_ack`, but the acks arrive in reverse order of sending.
    #[test]
    fn send_ack_reordering() {
        let mut buf = [0; 5];

        let mut stream =
            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"world", false), Ok(5));
        assert_eq!(stream.send.write(b"", true), Ok(0));
        assert!(stream.send.is_fin());

        assert_eq!(stream.send.off_front(), 0);

        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
        assert_eq!(written, 5);
        assert!(!fin);
        assert_eq!(&buf[..written], b"hello");

        assert_eq!(stream.send.off_front(), 5);

        // Emit a single byte of the second chunk.
        let (written, fin) = stream.send.emit(&mut buf[..1]).unwrap();
        assert_eq!(written, 1);
        assert!(!fin);
        assert_eq!(&buf[..written], b"w");

        // The later range is acked before the earlier one.
        stream.send.ack_and_drop(5, 1);
        stream.send.ack_and_drop(0, 5);

        // Retransmit requests for the acked ranges do not rewind the front
        // offset.
        stream.send.retransmit(0, 5);
        stream.send.retransmit(5, 1);

        assert_eq!(stream.send.off_front(), 6);

        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
        assert_eq!(written, 4);
        assert!(fin);
        assert_eq!(&buf[..written], b"orld");
    }
1320
    /// A chunk that overlaps data already read only contributes its new
    /// part.
    #[test]
    fn recv_data_below_off() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);

        let first = RangeBuf::from(b"hello", 0, false);

        assert_eq!(stream.recv.write(first), Ok(()));

        let mut buf = [0; 10];

        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
        assert_eq!(&buf[..len], b"hello");
        assert!(!fin);

        // Offsets 1..5 of this chunk were already read above.
        let first = RangeBuf::from(b"elloworld", 1, true);
        assert_eq!(stream.recv.write(first), Ok(()));

        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
        assert_eq!(&buf[..len], b"world");
        assert!(fin);
    }
1343
    /// A bidirectional stream is complete only once the send side is
    /// complete and the receive side has been read to fin.
    #[test]
    fn stream_complete() {
        let mut stream =
            <Stream>::new(0, 30, 30, true, true, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"world", false), Ok(5));

        assert!(!stream.send.is_complete());
        assert!(!stream.send.is_fin());

        assert_eq!(stream.send.write(b"", true), Ok(0));

        // Finished, but not complete while data is unacked.
        assert!(!stream.send.is_complete());
        assert!(stream.send.is_fin());

        // Fin received, but the data has not been read yet.
        let buf = RangeBuf::from(b"hello", 0, true);
        assert!(stream.recv.write(buf).is_ok());
        assert!(!stream.recv.is_fin());

        // Acks covering 6..10, then 1..6, leave byte 0 outstanding.
        stream.send.ack(6, 4);
        assert!(!stream.send.is_complete());

        let mut buf = [0; 2];
        assert_eq!(stream.recv.emit(&mut buf), Ok((2, false)));
        assert!(!stream.recv.is_fin());

        stream.send.ack(1, 5);
        assert!(!stream.send.is_complete());

        // The last outstanding byte is acked.
        stream.send.ack(0, 1);
        assert!(stream.send.is_complete());

        // The receive side still has unread data.
        assert!(!stream.is_complete());

        let mut buf = [0; 3];
        assert_eq!(stream.recv.emit(&mut buf), Ok((3, true)));
        assert!(stream.recv.is_fin());

        assert!(stream.is_complete());
    }
1385
    /// A fin set after all data has been emitted is reported by a
    /// zero-length emit.
    #[test]
    fn send_fin_zero_length_output() {
        let mut buf = [0; 5];

        let mut stream =
            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.off_front(), 0);
        assert!(!stream.send.is_fin());

        let (written, fin) = stream.send.emit(&mut buf).unwrap();
        assert_eq!(written, 5);
        assert!(!fin);
        assert_eq!(&buf[..written], b"hello");

        // Fin arrives only after the data went out.
        assert_eq!(stream.send.write(b"", true), Ok(0));
        assert!(stream.send.is_fin());
        assert_eq!(stream.send.off_front(), 5);

        let (written, fin) = stream.send.emit(&mut buf).unwrap();
        assert_eq!(written, 0);
        assert!(fin);
        assert_eq!(&buf[..written], b"");
    }
1411
1412 fn stream_send_ready(stream: &Stream) -> bool {
1413 !stream.send.is_empty() &&
1414 stream.send.off_front() < stream.send.off_back()
1415 }
1416
    /// Emits 20 bytes written as four 5-byte chunks through output buffers
    /// of various sizes, so that emits end inside chunks and span chunk
    /// boundaries.
    #[test]
    fn send_emit() {
        let mut buf = [0; 5];

        let mut stream =
            <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"world", false), Ok(5));
        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
        assert_eq!(stream.send.off_front(), 0);
        assert_eq!(stream.send.bufs_count(), 4);

        assert!(stream.is_flushable());

        // Ends inside the first chunk.
        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
        assert_eq!(stream.send.off_front(), 4);
        assert_eq!(&buf[..4], b"hell");

        // Spans the first and second chunks.
        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
        assert_eq!(stream.send.off_front(), 8);
        assert_eq!(&buf[..4], b"owor");

        // Ends exactly at a chunk boundary.
        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
        assert_eq!(stream.send.off_front(), 10);
        assert_eq!(&buf[..2], b"ld");

        // Single byte from the third chunk.
        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
        assert_eq!(stream.send.off_front(), 11);
        assert_eq!(&buf[..1], b"o");

        // Spans the third and fourth chunks.
        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
        assert_eq!(stream.send.off_front(), 16);
        assert_eq!(&buf[..5], b"llehd");

        // Last four bytes, reported together with fin.
        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
        assert_eq!(stream.send.off_front(), 20);
        assert_eq!(&buf[..4], b"lrow");

        assert!(!stream.is_flushable());

        // Nothing left: emit yields zero bytes and still reports fin.
        assert!(!stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
        assert_eq!(stream.send.off_front(), 20);
    }
1469
#[test]
/// Interleaves emits with `ack_and_drop` calls and checks how many buffers
/// the send side still holds after each acknowledgement.
fn send_emit_ack() {
    /// Asserts the stream is ready, emits into `out[..cap]`, and checks the
    /// emit result, the resulting front offset and the emitted bytes.
    #[track_caller]
    fn emit_expect(
        stream: &mut Stream, out: &mut [u8], cap: usize,
        outcome: (usize, bool), front: u64, data: &[u8],
    ) {
        assert!(stream_send_ready(&*stream));
        assert_eq!(stream.send.emit(&mut out[..cap]), Ok(outcome));
        assert_eq!(stream.send.off_front(), front);
        assert_eq!(&out[..outcome.0], data);
    }

    let mut out = [0u8; 5];

    let mut stream =
        <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);

    // Queue four 5-byte chunks; only the last one carries fin.
    assert_eq!(stream.send.write(b"hello", false), Ok(5));
    assert_eq!(stream.send.write(b"world", false), Ok(5));
    assert_eq!(stream.send.write(b"olleh", false), Ok(5));
    assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
    assert_eq!(stream.send.off_front(), 0);
    assert_eq!(stream.send.bufs_count(), 4);

    assert!(stream.is_flushable());

    emit_expect(&mut stream, &mut out, 4, (4, false), 4, b"hell");
    emit_expect(&mut stream, &mut out, 4, (4, false), 8, b"owor");

    // Acknowledging [0, 5) leaves three buffers.
    stream.send.ack_and_drop(0, 5);
    assert_eq!(stream.send.bufs_count(), 3);

    emit_expect(&mut stream, &mut out, 2, (2, false), 10, b"ld");

    // Acknowledging [7, 12) does not change the buffer count.
    stream.send.ack_and_drop(7, 5);
    assert_eq!(stream.send.bufs_count(), 3);

    emit_expect(&mut stream, &mut out, 1, (1, false), 11, b"o");
    emit_expect(&mut stream, &mut out, 5, (5, false), 16, b"llehd");

    // Acknowledging [5, 12) brings the count down to two.
    stream.send.ack_and_drop(5, 7);
    assert_eq!(stream.send.bufs_count(), 2);

    emit_expect(&mut stream, &mut out, 5, (4, true), 20, b"lrow");

    assert!(!stream.is_flushable());

    // Once drained, a further emit yields no bytes but still reports fin.
    assert!(!stream_send_ready(&stream));
    assert_eq!(stream.send.emit(&mut out[..5]), Ok((0, true)));
    assert_eq!(stream.send.off_front(), 20);

    // Acknowledgements starting at or past offset 20 leave the count as is.
    stream.send.ack_and_drop(22, 4);
    assert_eq!(stream.send.bufs_count(), 2);

    stream.send.ack_and_drop(20, 1);
    assert_eq!(stream.send.bufs_count(), 2);
}
1537
#[test]
/// Interleaves emits with `retransmit` (and a few `ack_and_drop`) calls and
/// checks which bytes come out next and where the front offset ends up.
fn send_emit_retransmit() {
    /// Asserts the stream is ready, emits into `out[..cap]`, and checks the
    /// emit result, the resulting front offset and the emitted bytes.
    #[track_caller]
    fn emit_expect(
        stream: &mut Stream, out: &mut [u8], cap: usize,
        outcome: (usize, bool), front: u64, data: &[u8],
    ) {
        assert!(stream_send_ready(&*stream));
        assert_eq!(stream.send.emit(&mut out[..cap]), Ok(outcome));
        assert_eq!(stream.send.off_front(), front);
        assert_eq!(&out[..outcome.0], data);
    }

    let mut out = [0u8; 5];

    let mut stream =
        <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);

    // Queue four 5-byte chunks; only the last one carries fin.
    assert_eq!(stream.send.write(b"hello", false), Ok(5));
    assert_eq!(stream.send.write(b"world", false), Ok(5));
    assert_eq!(stream.send.write(b"olleh", false), Ok(5));
    assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
    assert_eq!(stream.send.off_front(), 0);
    assert_eq!(stream.send.bufs_count(), 4);

    assert!(stream.is_flushable());

    emit_expect(&mut stream, &mut out, 4, (4, false), 4, b"hell");
    emit_expect(&mut stream, &mut out, 4, (4, false), 8, b"owor");

    // Retransmitting [3, 6) pulls the front offset back to 3.
    stream.send.retransmit(3, 3);
    assert_eq!(stream.send.off_front(), 3);

    emit_expect(&mut stream, &mut out, 3, (3, false), 8, b"low");
    emit_expect(&mut stream, &mut out, 2, (2, false), 10, b"ld");

    // Acknowledge [7, 9), then request retransmission of [8, 10).
    stream.send.ack_and_drop(7, 2);

    stream.send.retransmit(8, 2);

    emit_expect(&mut stream, &mut out, 2, (2, false), 10, b"ld");
    emit_expect(&mut stream, &mut out, 1, (1, false), 11, b"o");
    emit_expect(&mut stream, &mut out, 5, (5, false), 16, b"llehd");

    stream.send.retransmit(12, 2);

    emit_expect(&mut stream, &mut out, 2, (2, false), 16, b"le");
    emit_expect(&mut stream, &mut out, 5, (4, true), 20, b"lrow");

    assert!(!stream.is_flushable());

    // Once drained, a further emit yields no bytes but still reports fin.
    assert!(!stream_send_ready(&stream));
    assert_eq!(stream.send.emit(&mut out[..5]), Ok((0, true)));
    assert_eq!(stream.send.off_front(), 20);

    // Retransmit [7, 19) after everything has been sent.
    stream.send.retransmit(7, 12);

    emit_expect(&mut stream, &mut out, 5, (5, false), 12, b"rldol");
    emit_expect(&mut stream, &mut out, 5, (5, false), 17, b"lehdl");
    emit_expect(&mut stream, &mut out, 5, (2, false), 20, b"ro");

    // Acknowledge [12, 19), then request the same retransmission again; the
    // expected emits below are identical to the previous round.
    stream.send.ack_and_drop(12, 7);

    stream.send.retransmit(7, 12);

    emit_expect(&mut stream, &mut out, 5, (5, false), 12, b"rldol");
    emit_expect(&mut stream, &mut out, 5, (5, false), 17, b"lehdl");
    emit_expect(&mut stream, &mut out, 5, (2, false), 20, b"ro");
}
1650
#[test]
/// Walks a `RangeBuf` through `consume` and `split_off` calls and checks
/// the raw fields, the accessor results and the readable bytes of every
/// buffer after each step.
fn rangebuf_split_off() {
    // Snapshot check for one buffer. A macro keeps the integer literals
    // untyped and makes a failing assertion point at the call site.
    macro_rules! assert_rangebuf {
        (
            $buf:ident,
            fields: ($start:expr, $pos:expr, $len:expr, $off:expr, $fin:expr),
            view: ($view_len:expr, $view_off:expr, $view_fin:expr),
            data: $data:expr
        ) => {{
            assert_eq!($buf.start, $start);
            assert_eq!($buf.pos, $pos);
            assert_eq!($buf.len, $len);
            assert_eq!($buf.off, $off);
            assert_eq!($buf.fin, $fin);

            assert_eq!($buf.len(), $view_len);
            assert_eq!($buf.off(), $view_off);
            assert_eq!($buf.fin(), $view_fin);

            assert_eq!(&$buf[..], $data);
        }};
    }

    let mut first = <RangeBuf>::from(b"helloworld", 5, true);
    assert_rangebuf!(
        first,
        fields: (0, 0, 10, 5, true),
        view: (10, 5, true),
        data: b"helloworld"
    );

    // Consuming moves `pos` forward; `start`, `len` and `off` stay put.
    first.consume(5);
    assert_rangebuf!(
        first,
        fields: (0, 5, 10, 5, true),
        view: (5, 10, true),
        data: b"world"
    );

    // Split at 3, which is before the current position of 5.
    let mut second = first.split_off(3);
    assert_rangebuf!(
        first,
        fields: (0, 3, 3, 5, false),
        view: (0, 8, false),
        data: b""
    );
    assert_rangebuf!(
        second,
        fields: (3, 5, 7, 8, true),
        view: (5, 10, true),
        data: b"world"
    );

    second.consume(2);
    assert_rangebuf!(
        second,
        fields: (3, 7, 7, 8, true),
        view: (3, 12, true),
        data: b"rld"
    );

    // Split at 5, which is after the current position.
    let mut third = second.split_off(5);
    assert_rangebuf!(
        second,
        fields: (3, 7, 5, 8, false),
        view: (1, 12, false),
        data: b"r"
    );
    assert_rangebuf!(
        third,
        fields: (8, 8, 2, 13, true),
        view: (2, 13, true),
        data: b"ld"
    );

    // Consume everything that is left in the last piece.
    third.consume(2);
    assert_rangebuf!(
        third,
        fields: (8, 10, 2, 13, true),
        view: (0, 15, true),
        data: b""
    );
}
1765
#[test]
/// Requesting stream 500 on a map built with `new(5, 5, 5)` must fail with
/// `Error::StreamLimit`.
fn stream_limit_auto_open() {
    let local_tp = crate::TransportParams::default();
    let peer_tp = crate::TransportParams::default();

    let mut streams = <StreamMap>::new(5, 5, 5);

    // Sanity-check the kind of stream id used below.
    let stream_id = 500;
    assert!(!is_local(stream_id, true), "stream id is peer initiated");
    assert!(is_bidi(stream_id), "stream id is bidirectional");

    let failure = streams
        .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
        .err();

    assert_eq!(
        failure,
        Some(Error::StreamLimit),
        "stream limit should be exceeded"
    );
}
1786
#[test]
/// Streams 8, 12 and 4 can be created in that (non-ascending) order on a
/// map built with `new(5, 5, 5)`.
fn stream_create_out_of_order() {
    let local_tp = crate::TransportParams::default();
    let peer_tp = crate::TransportParams::default();

    let mut streams = <StreamMap>::new(5, 5, 5);

    let creation_order = [8, 12, 4];

    for stream_id in creation_order {
        // Sanity-check the kind of stream id before creating it.
        assert!(is_local(stream_id, false), "stream id is client initiated");
        assert!(is_bidi(stream_id), "stream id is bidirectional");

        let created =
            streams.get_or_create(stream_id, &local_tp, &peer_tp, false, true);
        assert!(created.is_ok());
    }
}
1804
#[test]
/// On a map built with `new(3, 3, 3)`, stream 8 can be created but stream
/// 12 is rejected with `Error::StreamLimit`.
fn stream_limit_edge() {
    let local_tp = crate::TransportParams::default();
    let peer_tp = crate::TransportParams::default();

    let mut streams = <StreamMap>::new(3, 3, 3);

    // Stream 8 is still accepted.
    let accepted = streams.get_or_create(8, &local_tp, &peer_tp, false, true);
    assert!(accepted.is_ok());

    // Stream 12 is refused.
    let rejected = streams
        .get_or_create(12, &local_tp, &peer_tp, false, true)
        .err();
    assert_eq!(rejected, Some(Error::StreamLimit));
}
1828
1829 fn cycle_stream_priority(stream_id: u64, streams: &mut StreamMap) {
1830 let key = streams.get(stream_id).unwrap().priority_key.clone();
1831 streams.update_priority(&key.clone(), &key);
1832 }
1833
#[test]
/// Creates streams 0, 4, 8 and 12 without touching their priority and
/// checks that cycling the first writable stream rotates the walk order.
fn writable_prioritized_default_priority() {
    let local_tp = crate::TransportParams::default();
    let peer_tp = crate::TransportParams {
        initial_max_stream_data_bidi_local: 100,
        initial_max_stream_data_uni: 100,
        ..Default::default()
    };

    let mut streams = <StreamMap>::new(100, 100, 100);

    for id in [0, 4, 8, 12] {
        let created = streams.get_or_create(id, &local_tp, &peer_tp, false, true);
        assert!(created.is_ok());
    }

    // Take five walks; after each of the first four, cycle whichever stream
    // came first in that walk.
    let mut walks: Vec<Vec<u64>> = Vec::with_capacity(5);
    for round in 0..5 {
        let walk: Vec<u64> = streams.writable().collect();
        if round < 4 {
            cycle_stream_priority(*walk.first().unwrap(), &mut streams);
        }
        walks.push(walk);
    }

    // The order rotates by one each time and is back at the start after
    // four cycles.
    let expected: Vec<Vec<u64>> = vec![
        vec![0, 4, 8, 12],
        vec![4, 8, 12, 0],
        vec![8, 12, 0, 4],
        vec![12, 0, 4, 8],
        vec![0, 4, 8, 12],
    ];
    assert_eq!(walks, expected);
}
1869
#[test]
/// Creates streams in the order 12, 4, 8, 0 and checks that the writable
/// walk follows creation order and rotates as the first stream is cycled.
fn writable_prioritized_insert_order() {
    let local_tp = crate::TransportParams::default();
    let peer_tp = crate::TransportParams {
        initial_max_stream_data_bidi_local: 100,
        initial_max_stream_data_uni: 100,
        ..Default::default()
    };

    let mut streams = <StreamMap>::new(100, 100, 100);

    // Deliberately not in ascending id order.
    for id in [12, 4, 8, 0] {
        let created = streams.get_or_create(id, &local_tp, &peer_tp, false, true);
        assert!(created.is_ok());
    }

    // Take five walks; after each of the first four, cycle whichever stream
    // came first in that walk.
    let mut walks: Vec<Vec<u64>> = Vec::with_capacity(5);
    for round in 0..5 {
        let walk: Vec<u64> = streams.writable().collect();
        if round < 4 {
            cycle_stream_priority(*walk.first().unwrap(), &mut streams);
        }
        walks.push(walk);
    }

    let expected: Vec<Vec<u64>> = vec![
        vec![12, 4, 8, 0],
        vec![4, 8, 0, 12],
        vec![8, 0, 12, 4],
        vec![0, 12, 4, 8],
        vec![12, 4, 8, 0],
    ];
    assert_eq!(walks, expected);
}
1904
#[test]
/// Gives eleven streams distinct urgencies and checks that the writable
/// walk lists them by ascending urgency value, that applying the same
/// priorities a second time leaves the walk unchanged, and how the walk
/// changes as streams are collected and a new one is created.
fn writable_prioritized_mixed_urgency() {
    let local_tp = crate::TransportParams::default();
    let peer_tp = crate::TransportParams {
        initial_max_stream_data_bidi_local: 100,
        initial_max_stream_data_uni: 100,
        ..Default::default()
    };

    let mut streams = <StreamMap>::new(100, 100, 100);

    // (stream id, urgency) pairs.
    let input = vec![
        (0, 100),
        (4, 90),
        (8, 80),
        (12, 70),
        (16, 60),
        (20, 50),
        (24, 40),
        (28, 30),
        (32, 20),
        (36, 10),
        (40, 0),
    ];

    let by_urgency: Vec<u64> = vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0];

    // The first pass creates the streams and assigns their urgency; the
    // second pass applies the very same priorities again. The walk must be
    // the same after both.
    for _pass in 0..2 {
        for (id, urgency) in input.clone() {
            let stream = streams
                .get_or_create(id, &local_tp, &peer_tp, false, true)
                .unwrap();

            stream.urgency = urgency;

            let fresh_key = Arc::new(StreamPriorityKey {
                urgency: stream.urgency,
                incremental: stream.incremental,
                id,
                ..Default::default()
            });

            let stale_key = std::mem::replace(
                &mut stream.priority_key,
                Arc::clone(&fresh_key),
            );

            streams.update_priority(&stale_key, &fresh_key);
        }

        let walk: Vec<u64> = streams.writable().collect();
        assert_eq!(walk, by_urgency);
    }

    // A collected stream disappears from the walk.
    streams.collect(24, true);

    let after_one_collected: Vec<u64> = streams.writable().collect();
    assert_eq!(after_one_collected, vec![40, 36, 32, 28, 20, 16, 12, 8, 4, 0]);

    // Collect both ends of the walk.
    streams.collect(40, true);
    streams.collect(0, true);

    let after_ends_collected: Vec<u64> = streams.writable().collect();
    assert_eq!(after_ends_collected, vec![36, 32, 28, 20, 16, 12, 8, 4]);

    // A stream created without an explicit priority ends up last here.
    streams
        .get_or_create(44, &local_tp, &peer_tp, false, true)
        .unwrap();

    let after_new_stream: Vec<u64> = streams.writable().collect();
    assert_eq!(after_new_stream, vec![36, 32, 28, 20, 16, 12, 8, 4, 44]);
}
2007
#[test]
/// Gives eleven streams a mix of shared and unique urgencies, then checks
/// the writable walk across nine rounds of priority cycling, after one
/// stream is collected, and after a new stream is added with urgency 20
/// and `incremental` set.
fn writable_prioritized_mixed_urgencies_incrementals() {
    let local_tp = crate::TransportParams::default();
    let peer_tp = crate::TransportParams {
        initial_max_stream_data_bidi_local: 100,
        initial_max_stream_data_uni: 100,
        ..Default::default()
    };

    let mut streams = <StreamMap>::new(100, 100, 100);

    // (stream id, urgency) pairs; several streams share an urgency.
    let input = vec![
        (0, 100),
        (4, 20),
        (8, 100),
        (12, 20),
        (16, 90),
        (20, 25),
        (24, 90),
        (28, 30),
        (32, 80),
        (36, 20),
        (40, 0),
    ];

    for (id, urgency) in input {
        let stream = streams
            .get_or_create(id, &local_tp, &peer_tp, false, true)
            .unwrap();

        stream.urgency = urgency;

        let fresh_key = Arc::new(StreamPriorityKey {
            urgency: stream.urgency,
            incremental: stream.incremental,
            id,
            ..Default::default()
        });

        let stale_key =
            std::mem::replace(&mut stream.priority_key, Arc::clone(&fresh_key));

        streams.update_priority(&stale_key, &fresh_key);
    }

    // Each round records the current walk and then cycles three streams.
    // Entries are (ids cycled after the walk, walk expected in that round).
    let rounds: [([u64; 3], [u64; 11]); 9] = [
        ([4, 16, 0], [40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]),
        ([12, 24, 8], [40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]),
        ([36, 16, 0], [40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]),
        ([4, 24, 8], [40, 4, 12, 36, 20, 28, 32, 24, 16, 8, 0]),
        ([12, 16, 0], [40, 12, 36, 4, 20, 28, 32, 16, 24, 0, 8]),
        ([36, 24, 8], [40, 36, 4, 12, 20, 28, 32, 24, 16, 8, 0]),
        ([4, 16, 0], [40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]),
        ([12, 24, 8], [40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]),
        ([36, 16, 0], [40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]),
    ];

    let mut observed: Vec<Vec<u64>> = Vec::with_capacity(rounds.len());
    for (cycled, _) in &rounds {
        observed.push(streams.writable().collect());

        for &id in cycled {
            cycle_stream_priority(id, &mut streams);
        }
    }

    // Verify only after every round has run.
    for (walk, (_, expected)) in observed.iter().zip(&rounds) {
        assert_eq!(walk, expected);
    }

    // A collected stream disappears from the walk.
    streams.collect(20, true);

    let after_collect: Vec<u64> = streams.writable().collect();
    assert_eq!(after_collect, vec![40, 4, 12, 36, 28, 32, 24, 16, 8, 0]);

    // Add a new stream with urgency 20 and the incremental flag set.
    let stream = streams
        .get_or_create(44, &local_tp, &peer_tp, false, true)
        .unwrap();

    stream.urgency = 20;
    stream.incremental = true;

    let fresh_key = Arc::new(StreamPriorityKey {
        urgency: stream.urgency,
        incremental: stream.incremental,
        id: 44,
        ..Default::default()
    });

    let stale_key =
        std::mem::replace(&mut stream.priority_key, Arc::clone(&fresh_key));

    streams.update_priority(&stale_key, &fresh_key);

    // Stream 44 is listed right after the other urgency-20 streams.
    let after_new_stream: Vec<u64> = streams.writable().collect();
    assert_eq!(after_new_stream, vec![40, 4, 12, 36, 44, 28, 32, 24, 16, 8, 0]);
}
2134
#[test]
/// Inserting keys with the same urgency, incremental flag and id into the
/// writable priority tree a second time keeps both copies; the tree does
/// not deduplicate.
fn priority_tree_dupes() {
    let mut tree: RBTree<StreamWritablePriorityAdapter> = RBTree::default();

    // Expected ids in the tree after the first and the second insertion pass.
    let expected_walks: [Vec<u64>; 2] =
        [vec![0, 4, 8, 12], vec![0, 0, 4, 4, 8, 8, 12, 12]];

    for expected in expected_walks {
        for id in [0, 4, 8, 12] {
            let key = Arc::new(StreamPriorityKey {
                urgency: 0,
                incremental: false,
                id,
                ..Default::default()
            });

            tree.insert(key);
        }

        let walk: Vec<u64> = tree.iter().map(|key| key.id).collect();
        assert_eq!(walk, expected);
    }
}
2172}
2173
2174mod recv_buf;
2175mod send_buf;