1use std::cmp;
28
29use std::sync::Arc;
30
31use std::collections::hash_map;
32use std::collections::HashMap;
33use std::collections::HashSet;
34
35use intrusive_collections::intrusive_adapter;
36use intrusive_collections::KeyAdapter;
37use intrusive_collections::RBTree;
38use intrusive_collections::RBTreeAtomicLink;
39
40use smallvec::SmallVec;
41
42use crate::range_buf::DefaultBufFactory;
43use crate::BufFactory;
44use crate::Error;
45use crate::Result;
46
/// Urgency given to a `StreamPriorityKey` built through its `Default` impl.
const DEFAULT_URGENCY: u8 = 127;

/// 32 KiB. In the visible part of this file it is only passed as the
/// `max_window` argument of `Stream::new()` by the unit tests.
const DEFAULT_STREAM_WINDOW: u64 = 32 * 1024;

/// 16 MiB. NOTE(review): presumably the upper bound for a stream's
/// flow-control window; it is not referenced in this part of the file, so
/// confirm against its users.
pub const MAX_STREAM_WINDOW: u64 = 16 * 1024 * 1024;
54
/// Hasher state used for maps and sets keyed by stream ID.
///
/// It stores the last `u64` written to it and reports that value unchanged
/// as the hash (see its `std::hash::Hasher` implementation).
#[derive(Default)]
pub struct StreamIdHasher {
    /// Most recent value passed to `write_u64()`; zero until then.
    id: u64,
}
63
/// Pair of counters reported when a receive buffer is reset (the unit tests
/// in this file compare it against the result of `stream.recv.reset()`).
#[derive(Debug, PartialEq, Clone, Copy)]
pub struct RecvBufResetReturn {
    /// NOTE(review): presumably the amount by which the connection-level
    /// received-data accounting must grow after the reset — confirm against
    /// `RecvBuf::reset()`.
    pub max_data_delta: u64,

    /// NOTE(review): presumably the amount of flow-control credit that is
    /// considered consumed by the reset — confirm against `RecvBuf::reset()`.
    pub consumed_flowcontrol: u64,
}
75
76impl RecvBufResetReturn {
77 pub fn zero() -> Self {
78 Self {
79 max_data_delta: 0,
80 consumed_flowcontrol: 0,
81 }
82 }
83}
84
85impl std::hash::Hasher for StreamIdHasher {
86 #[inline]
87 fn finish(&self) -> u64 {
88 self.id
89 }
90
91 #[inline]
92 fn write_u64(&mut self, id: u64) {
93 self.id = id;
94 }
95
96 #[inline]
97 fn write(&mut self, _: &[u8]) {
98 unimplemented!()
101 }
102}
103
/// `BuildHasher` that produces default-initialized `StreamIdHasher`s.
type BuildStreamIdHasher = std::hash::BuildHasherDefault<StreamIdHasher>;

/// A `HashMap` keyed by `u64` stream ID that hashes with `StreamIdHasher`.
pub type StreamIdHashMap<V> = HashMap<u64, V, BuildStreamIdHasher>;
/// A `HashSet` of `u64` stream IDs that hashes with `StreamIdHasher`.
pub type StreamIdHashSet = HashSet<u64, BuildStreamIdHasher>;
108
/// Keeps track of the streams of a connection together with the stream-count
/// limits and the per-state bookkeeping sets.
#[derive(Default)]
pub struct StreamMap<F: BufFactory = DefaultBufFactory> {
    /// Streams currently tracked, keyed by stream ID.
    streams: StreamIdHashMap<Stream<F>>,

    /// IDs of streams that were removed through `collect()`. Such IDs are
    /// never re-created by `get_or_create()`.
    collected: StreamIdHashSet,

    /// Limit applied when creating locally-initiated bidirectional streams.
    /// Only ever raised, through `update_peer_max_streams_bidi()`.
    peer_max_streams_bidi: u64,

    /// Limit applied when creating locally-initiated unidirectional streams.
    /// Only ever raised, through `update_peer_max_streams_uni()`.
    peer_max_streams_uni: u64,

    /// Highest peer-initiated bidirectional stream sequence seen so far,
    /// plus one.
    peer_opened_streams_bidi: u64,

    /// Highest peer-initiated unidirectional stream sequence seen so far,
    /// plus one.
    peer_opened_streams_uni: u64,

    /// Limit applied when the peer creates bidirectional streams.
    local_max_streams_bidi: u64,
    /// Pending value for `local_max_streams_bidi`: grows by one each time a
    /// peer-initiated bidirectional stream is collected and is committed by
    /// `update_max_streams_bidi()`.
    local_max_streams_bidi_next: u64,

    /// Limit applied when the peer creates unidirectional streams.
    local_max_streams_uni: u64,
    /// Pending value for `local_max_streams_uni`: grows by one each time a
    /// peer-initiated unidirectional stream is collected and is committed by
    /// `update_max_streams_uni()`.
    local_max_streams_uni_next: u64,

    /// Highest locally-initiated bidirectional stream sequence created so
    /// far, plus one.
    local_opened_streams_bidi: u64,

    /// Highest locally-initiated unidirectional stream sequence created so
    /// far, plus one.
    local_opened_streams_uni: u64,

    /// Priority keys of streams registered through `insert_flushable()`,
    /// ordered by `StreamPriorityKey`'s `Ord` implementation.
    flushable: RBTree<StreamFlushablePriorityAdapter>,

    /// Priority keys of streams registered through `insert_readable()`,
    /// ordered by `StreamPriorityKey`'s `Ord` implementation.
    pub readable: RBTree<StreamReadablePriorityAdapter>,

    /// Priority keys of streams registered through `insert_writable()` (and
    /// of newly created streams that are writable), ordered by
    /// `StreamPriorityKey`'s `Ord` implementation.
    pub writable: RBTree<StreamWritablePriorityAdapter>,

    /// IDs registered through `insert_almost_full()`.
    /// NOTE(review): presumably streams whose receive window is nearly used
    /// up — confirm against the callers.
    almost_full: StreamIdHashSet,

    /// Stream ID -> offset, as registered through `insert_blocked()`.
    blocked: StreamIdHashMap<u64>,

    /// Stream ID -> `(error_code, final_size)`, as registered through
    /// `insert_reset()`.
    reset: StreamIdHashMap<(u64, u64)>,

    /// Stream ID -> error code, as registered through `insert_stopped()`.
    stopped: StreamIdHashMap<u64>,

    /// Value handed to `Stream::new()` as `max_window` for every stream
    /// created by this map.
    max_stream_window: u64,
}
188
189impl<F: BufFactory> StreamMap<F> {
190 pub fn new(
191 max_streams_bidi: u64, max_streams_uni: u64, max_stream_window: u64,
192 ) -> Self {
193 StreamMap {
194 local_max_streams_bidi: max_streams_bidi,
195 local_max_streams_bidi_next: max_streams_bidi,
196
197 local_max_streams_uni: max_streams_uni,
198 local_max_streams_uni_next: max_streams_uni,
199
200 max_stream_window,
201
202 ..StreamMap::default()
203 }
204 }
205
206 pub fn get(&self, id: u64) -> Option<&Stream<F>> {
208 self.streams.get(&id)
209 }
210
211 pub fn get_mut(&mut self, id: u64) -> Option<&mut Stream<F>> {
213 self.streams.get_mut(&id)
214 }
215
216 pub(crate) fn get_or_create(
229 &mut self, id: u64, local_params: &crate::TransportParams,
230 peer_params: &crate::TransportParams, local: bool, is_server: bool,
231 ) -> Result<&mut Stream<F>> {
232 let (stream, is_new_and_writable) = match self.streams.entry(id) {
233 hash_map::Entry::Vacant(v) => {
234 if self.collected.contains(&id) {
236 return Err(Error::Done);
237 }
238
239 if local != is_local(id, is_server) {
240 return Err(Error::InvalidStreamState(id));
241 }
242
243 let (max_rx_data, max_tx_data) = match (local, is_bidi(id)) {
244 (true, true) => (
246 local_params.initial_max_stream_data_bidi_local,
247 peer_params.initial_max_stream_data_bidi_remote,
248 ),
249
250 (true, false) => (0, peer_params.initial_max_stream_data_uni),
252
253 (false, true) => (
255 local_params.initial_max_stream_data_bidi_remote,
256 peer_params.initial_max_stream_data_bidi_local,
257 ),
258
259 (false, false) =>
261 (local_params.initial_max_stream_data_uni, 0),
262 };
263
264 let stream_sequence = id >> 2;
268
269 match (is_local(id, is_server), is_bidi(id)) {
271 (true, true) => {
272 let n = cmp::max(
273 self.local_opened_streams_bidi,
274 stream_sequence + 1,
275 );
276
277 if n > self.peer_max_streams_bidi {
278 return Err(Error::StreamLimit);
279 }
280
281 self.local_opened_streams_bidi = n;
282 },
283
284 (true, false) => {
285 let n = cmp::max(
286 self.local_opened_streams_uni,
287 stream_sequence + 1,
288 );
289
290 if n > self.peer_max_streams_uni {
291 return Err(Error::StreamLimit);
292 }
293
294 self.local_opened_streams_uni = n;
295 },
296
297 (false, true) => {
298 let n = cmp::max(
299 self.peer_opened_streams_bidi,
300 stream_sequence + 1,
301 );
302
303 if n > self.local_max_streams_bidi {
304 return Err(Error::StreamLimit);
305 }
306
307 self.peer_opened_streams_bidi = n;
308 },
309
310 (false, false) => {
311 let n = cmp::max(
312 self.peer_opened_streams_uni,
313 stream_sequence + 1,
314 );
315
316 if n > self.local_max_streams_uni {
317 return Err(Error::StreamLimit);
318 }
319
320 self.peer_opened_streams_uni = n;
321 },
322 };
323
324 let s = Stream::new(
325 id,
326 max_rx_data,
327 max_tx_data,
328 is_bidi(id),
329 local,
330 self.max_stream_window,
331 );
332
333 let is_writable = s.is_writable();
334
335 (v.insert(s), is_writable)
336 },
337
338 hash_map::Entry::Occupied(v) => (v.into_mut(), false),
339 };
340
341 if is_new_and_writable {
344 self.writable.insert(Arc::clone(&stream.priority_key));
345 }
346
347 Ok(stream)
348 }
349
350 pub fn insert_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
354 if !priority_key.readable.is_linked() {
355 self.readable.insert(Arc::clone(priority_key));
356 }
357 }
358
359 pub fn remove_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
361 if !priority_key.readable.is_linked() {
362 return;
363 }
364
365 let mut c = {
366 let ptr = Arc::as_ptr(priority_key);
367 unsafe { self.readable.cursor_mut_from_ptr(ptr) }
368 };
369
370 c.remove();
371 }
372
373 pub fn insert_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
380 if !priority_key.writable.is_linked() {
381 self.writable.insert(Arc::clone(priority_key));
382 }
383 }
384
385 pub fn remove_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
390 if !priority_key.writable.is_linked() {
391 return;
392 }
393
394 let mut c = {
395 let ptr = Arc::as_ptr(priority_key);
396 unsafe { self.writable.cursor_mut_from_ptr(ptr) }
397 };
398
399 c.remove();
400 }
401
402 pub fn insert_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
406 if !priority_key.flushable.is_linked() {
407 self.flushable.insert(Arc::clone(priority_key));
408 }
409 }
410
411 pub fn remove_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
413 if !priority_key.flushable.is_linked() {
414 return;
415 }
416
417 let mut c = {
418 let ptr = Arc::as_ptr(priority_key);
419 unsafe { self.flushable.cursor_mut_from_ptr(ptr) }
420 };
421
422 c.remove();
423 }
424
425 pub fn peek_flushable(&self) -> Option<Arc<StreamPriorityKey>> {
426 self.flushable.front().clone_pointer()
427 }
428
429 pub fn update_priority(
431 &mut self, old: &Arc<StreamPriorityKey>, new: &Arc<StreamPriorityKey>,
432 ) {
433 if old.readable.is_linked() {
434 self.remove_readable(old);
435 self.readable.insert(Arc::clone(new));
436 }
437
438 if old.writable.is_linked() {
439 self.remove_writable(old);
440 self.writable.insert(Arc::clone(new));
441 }
442
443 if old.flushable.is_linked() {
444 self.remove_flushable(old);
445 self.flushable.insert(Arc::clone(new));
446 }
447 }
448
449 pub fn insert_almost_full(&mut self, stream_id: u64) {
453 self.almost_full.insert(stream_id);
454 }
455
456 pub fn remove_almost_full(&mut self, stream_id: u64) {
458 self.almost_full.remove(&stream_id);
459 }
460
461 pub fn insert_blocked(&mut self, stream_id: u64, off: u64) {
466 self.blocked.insert(stream_id, off);
467 }
468
469 pub fn remove_blocked(&mut self, stream_id: u64) {
471 self.blocked.remove(&stream_id);
472 }
473
474 pub fn insert_reset(
479 &mut self, stream_id: u64, error_code: u64, final_size: u64,
480 ) {
481 self.reset.insert(stream_id, (error_code, final_size));
482 }
483
484 pub fn remove_reset(&mut self, stream_id: u64) {
486 self.reset.remove(&stream_id);
487 }
488
489 pub fn insert_stopped(&mut self, stream_id: u64, error_code: u64) {
494 self.stopped.insert(stream_id, error_code);
495 }
496
497 pub fn remove_stopped(&mut self, stream_id: u64) {
499 self.stopped.remove(&stream_id);
500 }
501
502 pub fn update_peer_max_streams_bidi(&mut self, v: u64) {
504 self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v);
505 }
506
507 pub fn update_peer_max_streams_uni(&mut self, v: u64) {
509 self.peer_max_streams_uni = cmp::max(self.peer_max_streams_uni, v);
510 }
511
512 pub fn update_max_streams_bidi(&mut self) {
514 self.local_max_streams_bidi = self.local_max_streams_bidi_next;
515 }
516
517 pub fn set_max_streams_bidi(&mut self, max: u64) {
519 self.local_max_streams_bidi = max;
520 self.local_max_streams_bidi_next = max;
521 }
522
523 pub fn max_streams_bidi(&self) -> u64 {
525 self.local_max_streams_bidi
526 }
527
528 pub fn max_streams_bidi_next(&mut self) -> u64 {
530 self.local_max_streams_bidi_next
531 }
532
533 pub fn update_max_streams_uni(&mut self) {
535 self.local_max_streams_uni = self.local_max_streams_uni_next;
536 }
537
538 pub fn max_streams_uni_next(&mut self) -> u64 {
540 self.local_max_streams_uni_next
541 }
542
543 pub fn peer_streams_left_bidi(&self) -> u64 {
546 self.peer_max_streams_bidi - self.local_opened_streams_bidi
547 }
548
549 pub fn peer_streams_left_uni(&self) -> u64 {
552 self.peer_max_streams_uni - self.local_opened_streams_uni
553 }
554
555 pub fn collect(&mut self, stream_id: u64, local: bool) {
560 if !local {
561 if is_bidi(stream_id) {
564 self.local_max_streams_bidi_next =
565 self.local_max_streams_bidi_next.saturating_add(1);
566 } else {
567 self.local_max_streams_uni_next =
568 self.local_max_streams_uni_next.saturating_add(1);
569 }
570 }
571
572 let s = self.streams.remove(&stream_id).unwrap();
573
574 self.remove_readable(&s.priority_key);
575
576 self.remove_writable(&s.priority_key);
577
578 self.remove_flushable(&s.priority_key);
579
580 self.collected.insert(stream_id);
581 }
582
583 pub fn readable(&self) -> StreamIter {
585 StreamIter {
586 streams: self.readable.iter().map(|s| s.id).collect(),
587 index: 0,
588 }
589 }
590
591 pub fn writable(&self) -> StreamIter {
593 StreamIter {
594 streams: self.writable.iter().map(|s| s.id).collect(),
595 index: 0,
596 }
597 }
598
599 pub fn almost_full(&self) -> StreamIter {
601 StreamIter::from(&self.almost_full)
602 }
603
604 pub fn blocked(&self) -> hash_map::Iter<'_, u64, u64> {
606 self.blocked.iter()
607 }
608
609 pub fn reset(&self) -> hash_map::Iter<'_, u64, (u64, u64)> {
611 self.reset.iter()
612 }
613
614 pub fn stopped(&self) -> hash_map::Iter<'_, u64, u64> {
616 self.stopped.iter()
617 }
618
619 pub fn is_collected(&self, stream_id: u64) -> bool {
621 self.collected.contains(&stream_id)
622 }
623
624 pub fn has_flushable(&self) -> bool {
626 !self.flushable.is_empty()
627 }
628
629 pub fn has_readable(&self) -> bool {
631 !self.readable.is_empty()
632 }
633
634 pub fn has_almost_full(&self) -> bool {
637 !self.almost_full.is_empty()
638 }
639
640 pub fn has_blocked(&self) -> bool {
642 !self.blocked.is_empty()
643 }
644
645 pub fn has_reset(&self) -> bool {
647 !self.reset.is_empty()
648 }
649
650 pub fn has_stopped(&self) -> bool {
652 !self.stopped.is_empty()
653 }
654
655 pub fn should_update_max_streams_bidi(&self) -> bool {
658 self.local_max_streams_bidi_next != self.local_max_streams_bidi &&
659 self.local_max_streams_bidi_next / 2 >
660 self.local_max_streams_bidi - self.peer_opened_streams_bidi
661 }
662
663 pub fn should_update_max_streams_uni(&self) -> bool {
666 self.local_max_streams_uni_next != self.local_max_streams_uni &&
667 self.local_max_streams_uni_next / 2 >
668 self.local_max_streams_uni - self.peer_opened_streams_uni
669 }
670
671 #[cfg(test)]
673 pub fn len(&self) -> usize {
674 self.streams.len()
675 }
676}
677
/// State of a single stream: its receive and send buffers plus the
/// attributes that were fixed or configured for it.
pub struct Stream<F: BufFactory = DefaultBufFactory> {
    /// Receive-side buffer.
    pub recv: recv_buf::RecvBuf,

    /// Send-side buffer.
    pub send: send_buf::SendBuf<F>,

    /// Added to the send buffer's back offset when deciding whether the
    /// stream is writable (see `is_writable()`). Starts at 1.
    pub send_lowat: usize,

    /// Whether the stream is bidirectional.
    pub bidi: bool,

    /// Whether the stream was created as locally initiated.
    pub local: bool,

    /// Urgency of the stream; initialized from the priority key.
    pub urgency: u8,

    /// Incremental flag of the stream; initialized from the priority key.
    pub incremental: bool,

    /// Key through which the stream is linked into the readable, writable
    /// and flushable trees of the stream map.
    pub priority_key: Arc<StreamPriorityKey>,
}
702
703impl<F: BufFactory> Stream<F> {
704 pub fn new(
706 id: u64, max_rx_data: u64, max_tx_data: u64, bidi: bool, local: bool,
707 max_window: u64,
708 ) -> Self {
709 let priority_key = Arc::new(StreamPriorityKey {
710 id,
711 ..Default::default()
712 });
713
714 Stream {
715 recv: recv_buf::RecvBuf::new(max_rx_data, max_window),
716 send: send_buf::SendBuf::new(max_tx_data),
717 send_lowat: 1,
718 bidi,
719 local,
720 urgency: priority_key.urgency,
721 incremental: priority_key.incremental,
722 priority_key,
723 }
724 }
725
726 pub fn is_readable(&self) -> bool {
728 self.recv.ready()
729 }
730
731 pub fn is_writable(&self) -> bool {
734 !self.send.is_shutdown() &&
735 !self.send.is_fin() &&
736 (self.send.off_back() + self.send_lowat as u64) <
737 self.send.max_off()
738 }
739
740 pub fn is_flushable(&self) -> bool {
743 let off_front = self.send.off_front();
744
745 !self.send.is_empty() &&
746 off_front < self.send.off_back() &&
747 off_front < self.send.max_off()
748 }
749
750 pub fn is_complete(&self) -> bool {
760 match (self.bidi, self.local) {
761 (true, _) => self.recv.is_fin() && self.send.is_complete(),
764
765 (false, true) => self.send.is_complete(),
768
769 (false, false) => self.recv.is_fin(),
772 }
773 }
774}
775
/// Returns true if the stream was initiated by this endpoint.
///
/// The lowest bit of a stream ID identifies the initiator (set means
/// server), so the stream is local when that bit matches `is_server`.
pub fn is_local(stream_id: u64, is_server: bool) -> bool {
    let server_initiated = stream_id & 0x1 == 1;

    server_initiated == is_server
}
780
/// Returns true if the stream is bidirectional, i.e. the second-lowest bit
/// of its ID is clear.
pub fn is_bidi(stream_id: u64) -> bool {
    let uni_bit = (stream_id >> 1) & 0x1;

    uni_bit == 0
}
785
/// Key by which a stream is ordered inside the readable, writable and
/// flushable trees. It carries one intrusive link per tree, so the same key
/// can be a member of all three at once.
#[derive(Clone, Debug)]
pub struct StreamPriorityKey {
    /// Urgency; keys with a lower value sort first.
    pub urgency: u8,
    /// Incremental flag; at equal urgency, non-incremental keys sort before
    /// incremental ones.
    pub incremental: bool,
    /// ID of the stream the key belongs to. Equality is decided on this
    /// field alone.
    pub id: u64,

    /// Link used by the readable tree.
    pub readable: RBTreeAtomicLink,
    /// Link used by the writable tree.
    pub writable: RBTreeAtomicLink,
    /// Link used by the flushable tree.
    pub flushable: RBTreeAtomicLink,
}
796
797impl Default for StreamPriorityKey {
798 fn default() -> Self {
799 Self {
800 urgency: DEFAULT_URGENCY,
801 incremental: true,
802 id: Default::default(),
803 readable: Default::default(),
804 writable: Default::default(),
805 flushable: Default::default(),
806 }
807 }
808}
809
810impl PartialEq for StreamPriorityKey {
811 fn eq(&self, other: &Self) -> bool {
812 self.id == other.id
813 }
814}
815
// Marker impl: equality on the stream ID (see `PartialEq`) is a full
// equivalence relation.
impl Eq for StreamPriorityKey {}
817
818impl PartialOrd for StreamPriorityKey {
819 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
820 Some(self.cmp(other))
821 }
822}
823
824impl Ord for StreamPriorityKey {
825 fn cmp(&self, other: &Self) -> cmp::Ordering {
826 if self.id == other.id {
828 return cmp::Ordering::Equal;
829 }
830
831 if self.urgency != other.urgency {
833 return self.urgency.cmp(&other.urgency);
834 }
835
836 if !self.incremental && !other.incremental {
839 return self.id.cmp(&other.id);
840 }
841
842 if self.incremental && !other.incremental {
844 return cmp::Ordering::Greater;
845 }
846 if !self.incremental && other.incremental {
847 return cmp::Ordering::Less;
848 }
849
850 cmp::Ordering::Greater
854 }
855}
856
// Adapter that lets `Arc<StreamPriorityKey>` values be stored in an
// intrusive red-black tree through their `writable` link.
intrusive_adapter!(pub StreamWritablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { writable: RBTreeAtomicLink });

impl KeyAdapter<'_> for StreamWritablePriorityAdapter {
    type Key = StreamPriorityKey;

    // The tree key is a clone of the whole element, so ordering follows
    // `StreamPriorityKey`'s `Ord` implementation.
    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
        s.clone()
    }
}
866
// Adapter that lets `Arc<StreamPriorityKey>` values be stored in an
// intrusive red-black tree through their `readable` link.
intrusive_adapter!(pub StreamReadablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { readable: RBTreeAtomicLink });

impl KeyAdapter<'_> for StreamReadablePriorityAdapter {
    type Key = StreamPriorityKey;

    // The tree key is a clone of the whole element, so ordering follows
    // `StreamPriorityKey`'s `Ord` implementation.
    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
        s.clone()
    }
}
876
// Adapter that lets `Arc<StreamPriorityKey>` values be stored in an
// intrusive red-black tree through their `flushable` link.
intrusive_adapter!(pub StreamFlushablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { flushable: RBTreeAtomicLink });

impl KeyAdapter<'_> for StreamFlushablePriorityAdapter {
    type Key = StreamPriorityKey;

    // The tree key is a clone of the whole element, so ordering follows
    // `StreamPriorityKey`'s `Ord` implementation.
    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
        s.clone()
    }
}
886
/// An iterator over a snapshot of stream IDs.
#[derive(Default)]
pub struct StreamIter {
    /// The stream IDs, copied at the time the iterator was created.
    streams: SmallVec<[u64; 8]>,
    /// Position of the next ID to yield.
    index: usize,
}
893
894impl StreamIter {
895 #[inline]
896 fn from(streams: &StreamIdHashSet) -> Self {
897 StreamIter {
898 streams: streams.iter().copied().collect(),
899 index: 0,
900 }
901 }
902}
903
904impl Iterator for StreamIter {
905 type Item = u64;
906
907 #[inline]
908 fn next(&mut self) -> Option<Self::Item> {
909 let v = self.streams.get(self.index)?;
910 self.index += 1;
911 Some(*v)
912 }
913}
914
915impl ExactSizeIterator for StreamIter {
916 #[inline]
917 fn len(&self) -> usize {
918 self.streams.len() - self.index
919 }
920}
921
922#[cfg(test)]
923mod tests {
924 use crate::range_buf::RangeBuf;
925
926 use super::*;
927
    /// Data beyond the receive limit (15 here) is rejected until the limit
    /// has been updated.
    #[test]
    fn recv_flow_control() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let mut buf = [0; 32];

        let first = RangeBuf::from(b"hello", 0, false);
        let second = RangeBuf::from(b"world", 5, false);
        let third = RangeBuf::from(b"something", 10, false);

        // Out-of-order writes within the limit are accepted.
        assert_eq!(stream.recv.write(second), Ok(()));
        assert_eq!(stream.recv.write(first), Ok(()));
        assert!(!stream.recv.almost_full());

        // Offsets 10..19 would go past the limit of 15.
        assert_eq!(stream.recv.write(third), Err(Error::FlowControl));

        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
        assert_eq!(&buf[..len], b"helloworld");
        assert!(!fin);

        // After reading, the buffer is expected to report almost-full.
        assert!(stream.recv.almost_full());

        // After the update the next limit is expected to be 25.
        stream.recv.update_max_data(std::time::Instant::now());
        assert_eq!(stream.recv.max_data_next(), 25);
        assert!(!stream.recv.almost_full());

        // The write that failed before now fits.
        let third = RangeBuf::from(b"something", 10, false);
        assert_eq!(stream.recv.write(third), Ok(()));
    }
959
    /// Data received past an already-known final offset is rejected.
    #[test]
    fn recv_past_fin() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, true);
        let second = RangeBuf::from(b"world", 5, false);

        // `first` carries fin at offset 5; `second` starts right there.
        assert_eq!(stream.recv.write(first), Ok(()));
        assert_eq!(stream.recv.write(second), Err(Error::FinalSize));
    }
972
    /// Receiving the same fin-carrying data twice is accepted, and the data
    /// is emitted once.
    #[test]
    fn recv_fin_dup() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, true);
        let second = RangeBuf::from(b"hello", 0, true);

        assert_eq!(stream.recv.write(first), Ok(()));
        assert_eq!(stream.recv.write(second), Ok(()));

        let mut buf = [0; 32];

        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
        assert_eq!(&buf[..len], b"hello");
        assert!(fin);
    }
991
    /// A second fin at a different offset than the first one is rejected.
    #[test]
    fn recv_fin_change() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, true);
        let second = RangeBuf::from(b"world", 5, true);

        // Final size 10 is established first, then fin at 5 arrives.
        assert_eq!(stream.recv.write(second), Ok(()));
        assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
    }
1004
    /// A fin whose offset is below data that was already received is
    /// rejected.
    #[test]
    fn recv_fin_lower_than_received() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, true);
        let second = RangeBuf::from(b"world", 5, false);

        // Data up to offset 10 arrives first, then a fin at offset 5.
        assert_eq!(stream.recv.write(second), Ok(()));
        assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
    }
1017
    /// Once the fin has been read, the buffer no longer reports
    /// almost-full.
    #[test]
    fn recv_fin_flow_control() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let mut buf = [0; 32];

        let first = RangeBuf::from(b"hello", 0, false);
        let second = RangeBuf::from(b"world", 5, true);

        assert_eq!(stream.recv.write(first), Ok(()));
        assert_eq!(stream.recv.write(second), Ok(()));

        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
        assert_eq!(&buf[..len], b"helloworld");
        assert!(fin);

        assert!(!stream.recv.almost_full());
    }
1038
    /// A reset whose final size differs from the one set by an earlier fin
    /// is rejected.
    #[test]
    fn recv_fin_reset_mismatch() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, true);

        // Fin fixes the final size at 5; the reset claims 10.
        assert_eq!(stream.recv.write(first), Ok(()));
        assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
    }
1050
    /// A reset with a final size beyond the received data reports non-zero
    /// counters the first time and zero on a repeated reset.
    #[test]
    fn recv_reset_with_gap() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, false);

        // 5 bytes received, 1 of them read, then reset at final size 10.
        assert_eq!(stream.recv.write(first), Ok(()));
        assert_eq!(stream.recv.emit(&mut [0; 1]), Ok((1, false)));
        // NOTE(review): presumably 5 = 10 - 5 received and 9 = 10 - 1 read;
        // confirm against `RecvBuf::reset()`.
        assert_eq!(
            stream.recv.reset(0, 10),
            Ok(RecvBufResetReturn {
                max_data_delta: 5,
                consumed_flowcontrol: 9
            })
        );
        assert_eq!(stream.recv.reset(0, 10), Ok(RecvBufResetReturn::zero()));
    }
1073
    /// Repeating an identical reset is accepted and reports zero counters
    /// the second time.
    #[test]
    fn recv_reset_dup() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, false);

        assert_eq!(stream.recv.write(first), Ok(()));
        assert_eq!(
            stream.recv.reset(0, 5),
            Ok(RecvBufResetReturn {
                max_data_delta: 0,
                consumed_flowcontrol: 5
            })
        );
        assert_eq!(stream.recv.reset(0, 5), Ok(RecvBufResetReturn::zero()));
    }
1092
    /// A second reset with a different final size than the first one is
    /// rejected.
    #[test]
    fn recv_reset_change() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, false);

        assert_eq!(stream.recv.write(first), Ok(()));
        assert_eq!(
            stream.recv.reset(0, 5),
            Ok(RecvBufResetReturn {
                max_data_delta: 0,
                consumed_flowcontrol: 5
            })
        );
        assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
    }
1111
    /// A reset whose final size is below the data already received is
    /// rejected.
    #[test]
    fn recv_reset_lower_than_received() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, false);

        // 5 bytes were received, but the reset claims a final size of 4.
        assert_eq!(stream.recv.write(first), Ok(()));
        assert_eq!(stream.recv.reset(0, 4), Err(Error::FinalSize));
    }
1123
    /// The send buffer never emits past the send limit (15 here), also
    /// after a retransmission request.
    #[test]
    fn send_flow_control() {
        let mut buf = [0; 25];

        let mut stream =
            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

        let first = b"hello";
        let second = b"world";
        let third = b"something";

        assert!(stream.send.write(first, false).is_ok());
        assert!(stream.send.write(second, false).is_ok());
        assert!(stream.send.write(third, false).is_ok());

        assert_eq!(stream.send.off_front(), 0);

        // Only 15 bytes come out even though the output has room for 25.
        let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
        assert_eq!(written, 15);
        assert!(!fin);
        assert_eq!(&buf[..written], b"helloworldsomet");

        assert_eq!(stream.send.off_front(), 15);

        // Nothing more can be emitted at the limit.
        let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
        assert_eq!(written, 0);
        assert!(!fin);
        assert_eq!(&buf[..written], b"");

        // Requesting retransmission rewinds the front offset.
        stream.send.retransmit(0, 15);

        assert_eq!(stream.send.off_front(), 0);

        let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
        assert_eq!(written, 10);
        assert!(!fin);
        assert_eq!(&buf[..written], b"helloworld");

        assert_eq!(stream.send.off_front(), 10);

        let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
        assert_eq!(written, 5);
        assert!(!fin);
        assert_eq!(&buf[..written], b"somet");
    }
1169
    /// Writing more data after a fin was written is rejected.
    #[test]
    fn send_past_fin() {
        let mut stream =
            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

        let first = b"hello";
        let second = b"world";
        let third = b"third";

        assert_eq!(stream.send.write(first, false), Ok(5));

        assert_eq!(stream.send.write(second, true), Ok(5));
        assert!(stream.send.is_fin());

        assert_eq!(stream.send.write(third, false), Err(Error::FinalSize));
    }
1186
    /// Writing an empty fin after a fin was already written is accepted.
    #[test]
    fn send_fin_dup() {
        let mut stream =
            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", true), Ok(5));
        assert!(stream.send.is_fin());

        assert_eq!(stream.send.write(b"", true), Ok(0));
        assert!(stream.send.is_fin());
    }
1198
    /// A second fin-carrying write that would move the final size is
    /// rejected.
    #[test]
    fn send_undo_fin() {
        let mut stream =
            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", true), Ok(5));
        assert!(stream.send.is_fin());

        assert_eq!(
            stream.send.write(b"helloworld", true),
            Err(Error::FinalSize)
        );
    }
1212
    /// A fin-carrying write that ends exactly at the send limit is emitted
    /// in full, together with the fin.
    #[test]
    fn send_fin_max_data_match() {
        let mut buf = [0; 15];

        let mut stream =
            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

        let slice = b"hellohellohello";

        assert!(stream.send.write(slice, true).is_ok());

        let (written, fin) = stream.send.emit(&mut buf[..15]).unwrap();
        assert_eq!(written, 15);
        assert!(fin);
        assert_eq!(&buf[..written], slice);
    }
1229
    /// A zero-length fin written after data is reported together with that
    /// data when it is emitted.
    #[test]
    fn send_fin_zero_length() {
        let mut buf = [0; 5];

        let mut stream =
            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"", true), Ok(0));
        assert!(stream.send.is_fin());

        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
        assert_eq!(written, 5);
        assert!(fin);
        assert_eq!(&buf[..written], b"hello");
    }
1246
    /// Data that was acked is not emitted again, even when a retransmission
    /// of that range is requested afterwards.
    #[test]
    fn send_ack() {
        let mut buf = [0; 5];

        let mut stream =
            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"world", false), Ok(5));
        assert_eq!(stream.send.write(b"", true), Ok(0));
        assert!(stream.send.is_fin());

        assert_eq!(stream.send.off_front(), 0);

        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
        assert_eq!(written, 5);
        assert!(!fin);
        assert_eq!(&buf[..written], b"hello");

        stream.send.ack_and_drop(0, 5);

        // The range was already acked, so the front offset stays at 5.
        stream.send.retransmit(0, 5);

        assert_eq!(stream.send.off_front(), 5);

        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
        assert_eq!(written, 5);
        assert!(fin);
        assert_eq!(&buf[..written], b"world");
    }
1277
    /// Acks arriving out of order still prevent the acked ranges from being
    /// emitted again.
    #[test]
    fn send_ack_reordering() {
        let mut buf = [0; 5];

        let mut stream =
            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"world", false), Ok(5));
        assert_eq!(stream.send.write(b"", true), Ok(0));
        assert!(stream.send.is_fin());

        assert_eq!(stream.send.off_front(), 0);

        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
        assert_eq!(written, 5);
        assert!(!fin);
        assert_eq!(&buf[..written], b"hello");

        assert_eq!(stream.send.off_front(), 5);

        let (written, fin) = stream.send.emit(&mut buf[..1]).unwrap();
        assert_eq!(written, 1);
        assert!(!fin);
        assert_eq!(&buf[..written], b"w");

        // The later range is acked before the earlier one.
        stream.send.ack_and_drop(5, 1);
        stream.send.ack_and_drop(0, 5);

        // Both ranges were acked, so the front offset stays at 6.
        stream.send.retransmit(0, 5);
        stream.send.retransmit(5, 1);

        assert_eq!(stream.send.off_front(), 6);

        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
        assert_eq!(written, 4);
        assert!(fin);
        assert_eq!(&buf[..written], b"orld");
    }
1317
    /// Data overlapping a range that was already read is accepted, and only
    /// the new part is emitted.
    #[test]
    fn recv_data_below_off() {
        let mut stream =
            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);

        let first = RangeBuf::from(b"hello", 0, false);

        assert_eq!(stream.recv.write(first), Ok(()));

        let mut buf = [0; 10];

        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
        assert_eq!(&buf[..len], b"hello");
        assert!(!fin);

        // Starts at offset 1, i.e. inside the 5 bytes that were read above.
        let first = RangeBuf::from(b"elloworld", 1, true);
        assert_eq!(stream.recv.write(first), Ok(()));

        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
        assert_eq!(&buf[..len], b"world");
        assert!(fin);
    }
1340
    /// A bidirectional stream is complete only once all sent data was acked
    /// and all received data, including the fin, was read.
    #[test]
    fn stream_complete() {
        let mut stream =
            <Stream>::new(0, 30, 30, true, true, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"world", false), Ok(5));

        assert!(!stream.send.is_complete());
        assert!(!stream.send.is_fin());

        assert_eq!(stream.send.write(b"", true), Ok(0));

        // Fin is written, but nothing was acked yet.
        assert!(!stream.send.is_complete());
        assert!(stream.send.is_fin());

        // Receiving the fin is not enough; the data must also be read.
        let buf = RangeBuf::from(b"hello", 0, true);
        assert!(stream.recv.write(buf).is_ok());
        assert!(!stream.recv.is_fin());

        // Acks arrive out of order and leave gaps until the last one.
        stream.send.ack(6, 4);
        assert!(!stream.send.is_complete());

        let mut buf = [0; 2];
        assert_eq!(stream.recv.emit(&mut buf), Ok((2, false)));
        assert!(!stream.recv.is_fin());

        stream.send.ack(1, 5);
        assert!(!stream.send.is_complete());

        stream.send.ack(0, 1);
        assert!(stream.send.is_complete());

        // The send side is done, the receive side is not.
        assert!(!stream.is_complete());

        let mut buf = [0; 3];
        assert_eq!(stream.recv.emit(&mut buf), Ok((3, true)));
        assert!(stream.recv.is_fin());

        assert!(stream.is_complete());
    }
1382
    /// A zero-length fin written after all data was already emitted is
    /// emitted on its own, with no bytes.
    #[test]
    fn send_fin_zero_length_output() {
        let mut buf = [0; 5];

        let mut stream =
            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.off_front(), 0);
        assert!(!stream.send.is_fin());

        let (written, fin) = stream.send.emit(&mut buf).unwrap();
        assert_eq!(written, 5);
        assert!(!fin);
        assert_eq!(&buf[..written], b"hello");

        assert_eq!(stream.send.write(b"", true), Ok(0));
        assert!(stream.send.is_fin());
        assert_eq!(stream.send.off_front(), 5);

        let (written, fin) = stream.send.emit(&mut buf).unwrap();
        assert_eq!(written, 0);
        assert!(fin);
        assert_eq!(&buf[..written], b"");
    }
1408
1409 fn stream_send_ready(stream: &Stream) -> bool {
1410 !stream.send.is_empty() &&
1411 stream.send.off_front() < stream.send.off_back()
1412 }
1413
    /// Emits buffered data in chunks that do not line up with the original
    /// writes and checks the front offset after every step.
    #[test]
    fn send_emit() {
        let mut buf = [0; 5];

        let mut stream =
            <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);

        // Four writes of 5 bytes each, the last one carrying the fin.
        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"world", false), Ok(5));
        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
        assert_eq!(stream.send.off_front(), 0);
        assert_eq!(stream.send.bufs_count(), 4);

        assert!(stream.is_flushable());

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
        assert_eq!(stream.send.off_front(), 4);
        assert_eq!(&buf[..4], b"hell");

        // This chunk spans the end of the first write and the start of the
        // second one.
        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
        assert_eq!(stream.send.off_front(), 8);
        assert_eq!(&buf[..4], b"owor");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
        assert_eq!(stream.send.off_front(), 10);
        assert_eq!(&buf[..2], b"ld");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
        assert_eq!(stream.send.off_front(), 11);
        assert_eq!(&buf[..1], b"o");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
        assert_eq!(stream.send.off_front(), 16);
        assert_eq!(&buf[..5], b"llehd");

        // The last 4 bytes come out together with the fin.
        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
        assert_eq!(stream.send.off_front(), 20);
        assert_eq!(&buf[..4], b"lrow");

        assert!(!stream.is_flushable());

        // Emitting again yields no bytes but still reports the fin.
        assert!(!stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
        assert_eq!(stream.send.off_front(), 20);
    }
1466
1467 #[test]
1468 fn send_emit_ack() {
1469 let mut buf = [0; 5];
1470
1471 let mut stream =
1472 <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1473
1474 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1475 assert_eq!(stream.send.write(b"world", false), Ok(5));
1476 assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1477 assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1478 assert_eq!(stream.send.off_front(), 0);
1479 assert_eq!(stream.send.bufs_count(), 4);
1480
1481 assert!(stream.is_flushable());
1482
1483 assert!(stream_send_ready(&stream));
1484 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1485 assert_eq!(stream.send.off_front(), 4);
1486 assert_eq!(&buf[..4], b"hell");
1487
1488 assert!(stream_send_ready(&stream));
1489 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1490 assert_eq!(stream.send.off_front(), 8);
1491 assert_eq!(&buf[..4], b"owor");
1492
1493 stream.send.ack_and_drop(0, 5);
1494 assert_eq!(stream.send.bufs_count(), 3);
1495
1496 assert!(stream_send_ready(&stream));
1497 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1498 assert_eq!(stream.send.off_front(), 10);
1499 assert_eq!(&buf[..2], b"ld");
1500
1501 stream.send.ack_and_drop(7, 5);
1502 assert_eq!(stream.send.bufs_count(), 3);
1503
1504 assert!(stream_send_ready(&stream));
1505 assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1506 assert_eq!(stream.send.off_front(), 11);
1507 assert_eq!(&buf[..1], b"o");
1508
1509 assert!(stream_send_ready(&stream));
1510 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1511 assert_eq!(stream.send.off_front(), 16);
1512 assert_eq!(&buf[..5], b"llehd");
1513
1514 stream.send.ack_and_drop(5, 7);
1515 assert_eq!(stream.send.bufs_count(), 2);
1516
1517 assert!(stream_send_ready(&stream));
1518 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1519 assert_eq!(stream.send.off_front(), 20);
1520 assert_eq!(&buf[..4], b"lrow");
1521
1522 assert!(!stream.is_flushable());
1523
1524 assert!(!stream_send_ready(&stream));
1525 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1526 assert_eq!(stream.send.off_front(), 20);
1527
1528 stream.send.ack_and_drop(22, 4);
1529 assert_eq!(stream.send.bufs_count(), 2);
1530
1531 stream.send.ack_and_drop(20, 1);
1532 assert_eq!(stream.send.bufs_count(), 2);
1533 }
1534
1535 #[test]
1536 fn send_emit_retransmit() {
1537 let mut buf = [0; 5];
1538
1539 let mut stream =
1540 <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1541
1542 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1543 assert_eq!(stream.send.write(b"world", false), Ok(5));
1544 assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1545 assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1546 assert_eq!(stream.send.off_front(), 0);
1547 assert_eq!(stream.send.bufs_count(), 4);
1548
1549 assert!(stream.is_flushable());
1550
1551 assert!(stream_send_ready(&stream));
1552 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1553 assert_eq!(stream.send.off_front(), 4);
1554 assert_eq!(&buf[..4], b"hell");
1555
1556 assert!(stream_send_ready(&stream));
1557 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1558 assert_eq!(stream.send.off_front(), 8);
1559 assert_eq!(&buf[..4], b"owor");
1560
1561 stream.send.retransmit(3, 3);
1562 assert_eq!(stream.send.off_front(), 3);
1563
1564 assert!(stream_send_ready(&stream));
1565 assert_eq!(stream.send.emit(&mut buf[..3]), Ok((3, false)));
1566 assert_eq!(stream.send.off_front(), 8);
1567 assert_eq!(&buf[..3], b"low");
1568
1569 assert!(stream_send_ready(&stream));
1570 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1571 assert_eq!(stream.send.off_front(), 10);
1572 assert_eq!(&buf[..2], b"ld");
1573
1574 stream.send.ack_and_drop(7, 2);
1575
1576 stream.send.retransmit(8, 2);
1577
1578 assert!(stream_send_ready(&stream));
1579 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1580 assert_eq!(stream.send.off_front(), 10);
1581 assert_eq!(&buf[..2], b"ld");
1582
1583 assert!(stream_send_ready(&stream));
1584 assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1585 assert_eq!(stream.send.off_front(), 11);
1586 assert_eq!(&buf[..1], b"o");
1587
1588 assert!(stream_send_ready(&stream));
1589 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1590 assert_eq!(stream.send.off_front(), 16);
1591 assert_eq!(&buf[..5], b"llehd");
1592
1593 stream.send.retransmit(12, 2);
1594
1595 assert!(stream_send_ready(&stream));
1596 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1597 assert_eq!(stream.send.off_front(), 16);
1598 assert_eq!(&buf[..2], b"le");
1599
1600 assert!(stream_send_ready(&stream));
1601 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1602 assert_eq!(stream.send.off_front(), 20);
1603 assert_eq!(&buf[..4], b"lrow");
1604
1605 assert!(!stream.is_flushable());
1606
1607 assert!(!stream_send_ready(&stream));
1608 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1609 assert_eq!(stream.send.off_front(), 20);
1610
1611 stream.send.retransmit(7, 12);
1612
1613 assert!(stream_send_ready(&stream));
1614 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1615 assert_eq!(stream.send.off_front(), 12);
1616 assert_eq!(&buf[..5], b"rldol");
1617
1618 assert!(stream_send_ready(&stream));
1619 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1620 assert_eq!(stream.send.off_front(), 17);
1621 assert_eq!(&buf[..5], b"lehdl");
1622
1623 assert!(stream_send_ready(&stream));
1624 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
1625 assert_eq!(stream.send.off_front(), 20);
1626 assert_eq!(&buf[..2], b"ro");
1627
1628 stream.send.ack_and_drop(12, 7);
1629
1630 stream.send.retransmit(7, 12);
1631
1632 assert!(stream_send_ready(&stream));
1633 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1634 assert_eq!(stream.send.off_front(), 12);
1635 assert_eq!(&buf[..5], b"rldol");
1636
1637 assert!(stream_send_ready(&stream));
1638 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1639 assert_eq!(stream.send.off_front(), 17);
1640 assert_eq!(&buf[..5], b"lehdl");
1641
1642 assert!(stream_send_ready(&stream));
1643 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
1644 assert_eq!(stream.send.off_front(), 20);
1645 assert_eq!(&buf[..2], b"ro");
1646 }
1647
1648 #[test]
1649 fn rangebuf_split_off() {
1650 let mut buf = <RangeBuf>::from(b"helloworld", 5, true);
1651 assert_eq!(buf.start, 0);
1652 assert_eq!(buf.pos, 0);
1653 assert_eq!(buf.len, 10);
1654 assert_eq!(buf.off, 5);
1655 assert!(buf.fin);
1656
1657 assert_eq!(buf.len(), 10);
1658 assert_eq!(buf.off(), 5);
1659 assert!(buf.fin());
1660
1661 assert_eq!(&buf[..], b"helloworld");
1662
1663 buf.consume(5);
1665
1666 assert_eq!(buf.start, 0);
1667 assert_eq!(buf.pos, 5);
1668 assert_eq!(buf.len, 10);
1669 assert_eq!(buf.off, 5);
1670 assert!(buf.fin);
1671
1672 assert_eq!(buf.len(), 5);
1673 assert_eq!(buf.off(), 10);
1674 assert!(buf.fin());
1675
1676 assert_eq!(&buf[..], b"world");
1677
1678 let mut new_buf = buf.split_off(3);
1680
1681 assert_eq!(buf.start, 0);
1682 assert_eq!(buf.pos, 3);
1683 assert_eq!(buf.len, 3);
1684 assert_eq!(buf.off, 5);
1685 assert!(!buf.fin);
1686
1687 assert_eq!(buf.len(), 0);
1688 assert_eq!(buf.off(), 8);
1689 assert!(!buf.fin());
1690
1691 assert_eq!(&buf[..], b"");
1692
1693 assert_eq!(new_buf.start, 3);
1694 assert_eq!(new_buf.pos, 5);
1695 assert_eq!(new_buf.len, 7);
1696 assert_eq!(new_buf.off, 8);
1697 assert!(new_buf.fin);
1698
1699 assert_eq!(new_buf.len(), 5);
1700 assert_eq!(new_buf.off(), 10);
1701 assert!(new_buf.fin());
1702
1703 assert_eq!(&new_buf[..], b"world");
1704
1705 new_buf.consume(2);
1707
1708 assert_eq!(new_buf.start, 3);
1709 assert_eq!(new_buf.pos, 7);
1710 assert_eq!(new_buf.len, 7);
1711 assert_eq!(new_buf.off, 8);
1712 assert!(new_buf.fin);
1713
1714 assert_eq!(new_buf.len(), 3);
1715 assert_eq!(new_buf.off(), 12);
1716 assert!(new_buf.fin());
1717
1718 assert_eq!(&new_buf[..], b"rld");
1719
1720 let mut new_new_buf = new_buf.split_off(5);
1722
1723 assert_eq!(new_buf.start, 3);
1724 assert_eq!(new_buf.pos, 7);
1725 assert_eq!(new_buf.len, 5);
1726 assert_eq!(new_buf.off, 8);
1727 assert!(!new_buf.fin);
1728
1729 assert_eq!(new_buf.len(), 1);
1730 assert_eq!(new_buf.off(), 12);
1731 assert!(!new_buf.fin());
1732
1733 assert_eq!(&new_buf[..], b"r");
1734
1735 assert_eq!(new_new_buf.start, 8);
1736 assert_eq!(new_new_buf.pos, 8);
1737 assert_eq!(new_new_buf.len, 2);
1738 assert_eq!(new_new_buf.off, 13);
1739 assert!(new_new_buf.fin);
1740
1741 assert_eq!(new_new_buf.len(), 2);
1742 assert_eq!(new_new_buf.off(), 13);
1743 assert!(new_new_buf.fin());
1744
1745 assert_eq!(&new_new_buf[..], b"ld");
1746
1747 new_new_buf.consume(2);
1749
1750 assert_eq!(new_new_buf.start, 8);
1751 assert_eq!(new_new_buf.pos, 10);
1752 assert_eq!(new_new_buf.len, 2);
1753 assert_eq!(new_new_buf.off, 13);
1754 assert!(new_new_buf.fin);
1755
1756 assert_eq!(new_new_buf.len(), 0);
1757 assert_eq!(new_new_buf.off(), 15);
1758 assert!(new_new_buf.fin());
1759
1760 assert_eq!(&new_new_buf[..], b"");
1761 }
1762
1763 #[test]
1766 fn stream_limit_auto_open() {
1767 let local_tp = crate::TransportParams::default();
1768 let peer_tp = crate::TransportParams::default();
1769
1770 let mut streams = <StreamMap>::new(5, 5, 5);
1771
1772 let stream_id = 500;
1773 assert!(!is_local(stream_id, true), "stream id is peer initiated");
1774 assert!(is_bidi(stream_id), "stream id is bidirectional");
1775 assert_eq!(
1776 streams
1777 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1778 .err(),
1779 Some(Error::StreamLimit),
1780 "stream limit should be exceeded"
1781 );
1782 }
1783
1784 #[test]
1787 fn stream_create_out_of_order() {
1788 let local_tp = crate::TransportParams::default();
1789 let peer_tp = crate::TransportParams::default();
1790
1791 let mut streams = <StreamMap>::new(5, 5, 5);
1792
1793 for stream_id in [8, 12, 4] {
1794 assert!(is_local(stream_id, false), "stream id is client initiated");
1795 assert!(is_bidi(stream_id), "stream id is bidirectional");
1796 assert!(streams
1797 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1798 .is_ok());
1799 }
1800 }
1801
1802 #[test]
1804 fn stream_limit_edge() {
1805 let local_tp = crate::TransportParams::default();
1806 let peer_tp = crate::TransportParams::default();
1807
1808 let mut streams = <StreamMap>::new(3, 3, 3);
1809
1810 let stream_id = 8;
1812 assert!(streams
1813 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1814 .is_ok());
1815
1816 let stream_id = 12;
1818 assert_eq!(
1819 streams
1820 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1821 .err(),
1822 Some(Error::StreamLimit)
1823 );
1824 }
1825
1826 fn cycle_stream_priority(stream_id: u64, streams: &mut StreamMap) {
1827 let key = streams.get(stream_id).unwrap().priority_key.clone();
1828 streams.update_priority(&key.clone(), &key);
1829 }
1830
1831 #[test]
1832 fn writable_prioritized_default_priority() {
1833 let local_tp = crate::TransportParams::default();
1834 let peer_tp = crate::TransportParams {
1835 initial_max_stream_data_bidi_local: 100,
1836 initial_max_stream_data_uni: 100,
1837 ..Default::default()
1838 };
1839
1840 let mut streams = StreamMap::new(100, 100, 100);
1841
1842 for id in [0, 4, 8, 12] {
1843 assert!(streams
1844 .get_or_create(id, &local_tp, &peer_tp, false, true)
1845 .is_ok());
1846 }
1847
1848 let walk_1: Vec<u64> = streams.writable().collect();
1849 cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1850 let walk_2: Vec<u64> = streams.writable().collect();
1851 cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1852 let walk_3: Vec<u64> = streams.writable().collect();
1853 cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1854 let walk_4: Vec<u64> = streams.writable().collect();
1855 cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1856 let walk_5: Vec<u64> = streams.writable().collect();
1857
1858 assert_eq!(walk_1, vec![0, 4, 8, 12]);
1861 assert_eq!(walk_2, vec![4, 8, 12, 0]);
1862 assert_eq!(walk_3, vec![8, 12, 0, 4]);
1863 assert_eq!(walk_4, vec![12, 0, 4, 8,]);
1864 assert_eq!(walk_5, vec![0, 4, 8, 12]);
1865 }
1866
1867 #[test]
1868 fn writable_prioritized_insert_order() {
1869 let local_tp = crate::TransportParams::default();
1870 let peer_tp = crate::TransportParams {
1871 initial_max_stream_data_bidi_local: 100,
1872 initial_max_stream_data_uni: 100,
1873 ..Default::default()
1874 };
1875
1876 let mut streams = StreamMap::new(100, 100, 100);
1877
1878 for id in [12, 4, 8, 0] {
1881 assert!(streams
1882 .get_or_create(id, &local_tp, &peer_tp, false, true)
1883 .is_ok());
1884 }
1885
1886 let walk_1: Vec<u64> = streams.writable().collect();
1887 cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1888 let walk_2: Vec<u64> = streams.writable().collect();
1889 cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1890 let walk_3: Vec<u64> = streams.writable().collect();
1891 cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1892 let walk_4: Vec<u64> = streams.writable().collect();
1893 cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1894 let walk_5: Vec<u64> = streams.writable().collect();
1895 assert_eq!(walk_1, vec![12, 4, 8, 0]);
1896 assert_eq!(walk_2, vec![4, 8, 0, 12]);
1897 assert_eq!(walk_3, vec![8, 0, 12, 4,]);
1898 assert_eq!(walk_4, vec![0, 12, 4, 8]);
1899 assert_eq!(walk_5, vec![12, 4, 8, 0]);
1900 }
1901
1902 #[test]
1903 fn writable_prioritized_mixed_urgency() {
1904 let local_tp = crate::TransportParams::default();
1905 let peer_tp = crate::TransportParams {
1906 initial_max_stream_data_bidi_local: 100,
1907 initial_max_stream_data_uni: 100,
1908 ..Default::default()
1909 };
1910
1911 let mut streams = <StreamMap>::new(100, 100, 100);
1912
1913 let input = vec![
1916 (0, 100),
1917 (4, 90),
1918 (8, 80),
1919 (12, 70),
1920 (16, 60),
1921 (20, 50),
1922 (24, 40),
1923 (28, 30),
1924 (32, 20),
1925 (36, 10),
1926 (40, 0),
1927 ];
1928
1929 for (id, urgency) in input.clone() {
1930 let stream = streams
1933 .get_or_create(id, &local_tp, &peer_tp, false, true)
1934 .unwrap();
1935
1936 stream.urgency = urgency;
1937
1938 let new_priority_key = Arc::new(StreamPriorityKey {
1939 urgency: stream.urgency,
1940 incremental: stream.incremental,
1941 id,
1942 ..Default::default()
1943 });
1944
1945 let old_priority_key = std::mem::replace(
1946 &mut stream.priority_key,
1947 new_priority_key.clone(),
1948 );
1949
1950 streams.update_priority(&old_priority_key, &new_priority_key);
1951 }
1952
1953 let walk_1: Vec<u64> = streams.writable().collect();
1954 assert_eq!(walk_1, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
1955
1956 for (id, urgency) in input {
1958 let stream = streams
1961 .get_or_create(id, &local_tp, &peer_tp, false, true)
1962 .unwrap();
1963
1964 stream.urgency = urgency;
1965
1966 let new_priority_key = Arc::new(StreamPriorityKey {
1967 urgency: stream.urgency,
1968 incremental: stream.incremental,
1969 id,
1970 ..Default::default()
1971 });
1972
1973 let old_priority_key = std::mem::replace(
1974 &mut stream.priority_key,
1975 new_priority_key.clone(),
1976 );
1977
1978 streams.update_priority(&old_priority_key, &new_priority_key);
1979 }
1980
1981 let walk_2: Vec<u64> = streams.writable().collect();
1982 assert_eq!(walk_2, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
1983
1984 streams.collect(24, true);
1986
1987 let walk_3: Vec<u64> = streams.writable().collect();
1988 assert_eq!(walk_3, vec![40, 36, 32, 28, 20, 16, 12, 8, 4, 0]);
1989
1990 streams.collect(40, true);
1991 streams.collect(0, true);
1992
1993 let walk_4: Vec<u64> = streams.writable().collect();
1994 assert_eq!(walk_4, vec![36, 32, 28, 20, 16, 12, 8, 4]);
1995
1996 streams
1998 .get_or_create(44, &local_tp, &peer_tp, false, true)
1999 .unwrap();
2000
2001 let walk_5: Vec<u64> = streams.writable().collect();
2002 assert_eq!(walk_5, vec![36, 32, 28, 20, 16, 12, 8, 4, 44]);
2003 }
2004
2005 #[test]
2006 fn writable_prioritized_mixed_urgencies_incrementals() {
2007 let local_tp = crate::TransportParams::default();
2008 let peer_tp = crate::TransportParams {
2009 initial_max_stream_data_bidi_local: 100,
2010 initial_max_stream_data_uni: 100,
2011 ..Default::default()
2012 };
2013
2014 let mut streams = StreamMap::new(100, 100, 100);
2015
2016 let input = vec![
2018 (0, 100),
2019 (4, 20),
2020 (8, 100),
2021 (12, 20),
2022 (16, 90),
2023 (20, 25),
2024 (24, 90),
2025 (28, 30),
2026 (32, 80),
2027 (36, 20),
2028 (40, 0),
2029 ];
2030
2031 for (id, urgency) in input.clone() {
2032 let stream = streams
2035 .get_or_create(id, &local_tp, &peer_tp, false, true)
2036 .unwrap();
2037
2038 stream.urgency = urgency;
2039
2040 let new_priority_key = Arc::new(StreamPriorityKey {
2041 urgency: stream.urgency,
2042 incremental: stream.incremental,
2043 id,
2044 ..Default::default()
2045 });
2046
2047 let old_priority_key = std::mem::replace(
2048 &mut stream.priority_key,
2049 new_priority_key.clone(),
2050 );
2051
2052 streams.update_priority(&old_priority_key, &new_priority_key);
2053 }
2054
2055 let walk_1: Vec<u64> = streams.writable().collect();
2056 cycle_stream_priority(4, &mut streams);
2057 cycle_stream_priority(16, &mut streams);
2058 cycle_stream_priority(0, &mut streams);
2059 let walk_2: Vec<u64> = streams.writable().collect();
2060 cycle_stream_priority(12, &mut streams);
2061 cycle_stream_priority(24, &mut streams);
2062 cycle_stream_priority(8, &mut streams);
2063 let walk_3: Vec<u64> = streams.writable().collect();
2064 cycle_stream_priority(36, &mut streams);
2065 cycle_stream_priority(16, &mut streams);
2066 cycle_stream_priority(0, &mut streams);
2067 let walk_4: Vec<u64> = streams.writable().collect();
2068 cycle_stream_priority(4, &mut streams);
2069 cycle_stream_priority(24, &mut streams);
2070 cycle_stream_priority(8, &mut streams);
2071 let walk_5: Vec<u64> = streams.writable().collect();
2072 cycle_stream_priority(12, &mut streams);
2073 cycle_stream_priority(16, &mut streams);
2074 cycle_stream_priority(0, &mut streams);
2075 let walk_6: Vec<u64> = streams.writable().collect();
2076 cycle_stream_priority(36, &mut streams);
2077 cycle_stream_priority(24, &mut streams);
2078 cycle_stream_priority(8, &mut streams);
2079 let walk_7: Vec<u64> = streams.writable().collect();
2080 cycle_stream_priority(4, &mut streams);
2081 cycle_stream_priority(16, &mut streams);
2082 cycle_stream_priority(0, &mut streams);
2083 let walk_8: Vec<u64> = streams.writable().collect();
2084 cycle_stream_priority(12, &mut streams);
2085 cycle_stream_priority(24, &mut streams);
2086 cycle_stream_priority(8, &mut streams);
2087 let walk_9: Vec<u64> = streams.writable().collect();
2088 cycle_stream_priority(36, &mut streams);
2089 cycle_stream_priority(16, &mut streams);
2090 cycle_stream_priority(0, &mut streams);
2091
2092 assert_eq!(walk_1, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2093 assert_eq!(walk_2, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2094 assert_eq!(walk_3, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2095 assert_eq!(walk_4, vec![40, 4, 12, 36, 20, 28, 32, 24, 16, 8, 0]);
2096 assert_eq!(walk_5, vec![40, 12, 36, 4, 20, 28, 32, 16, 24, 0, 8]);
2097 assert_eq!(walk_6, vec![40, 36, 4, 12, 20, 28, 32, 24, 16, 8, 0]);
2098 assert_eq!(walk_7, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2099 assert_eq!(walk_8, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2100 assert_eq!(walk_9, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2101
2102 streams.collect(20, true);
2104
2105 let walk_10: Vec<u64> = streams.writable().collect();
2106 assert_eq!(walk_10, vec![40, 4, 12, 36, 28, 32, 24, 16, 8, 0]);
2107
2108 let stream = streams
2110 .get_or_create(44, &local_tp, &peer_tp, false, true)
2111 .unwrap();
2112
2113 stream.urgency = 20;
2114 stream.incremental = true;
2115
2116 let new_priority_key = Arc::new(StreamPriorityKey {
2117 urgency: stream.urgency,
2118 incremental: stream.incremental,
2119 id: 44,
2120 ..Default::default()
2121 });
2122
2123 let old_priority_key =
2124 std::mem::replace(&mut stream.priority_key, new_priority_key.clone());
2125
2126 streams.update_priority(&old_priority_key, &new_priority_key);
2127
2128 let walk_11: Vec<u64> = streams.writable().collect();
2129 assert_eq!(walk_11, vec![40, 4, 12, 36, 44, 28, 32, 24, 16, 8, 0]);
2130 }
2131
2132 #[test]
2133 fn priority_tree_dupes() {
2134 let mut prioritized_writable: RBTree<StreamWritablePriorityAdapter> =
2135 Default::default();
2136
2137 for id in [0, 4, 8, 12] {
2138 let s = Arc::new(StreamPriorityKey {
2139 urgency: 0,
2140 incremental: false,
2141 id,
2142 ..Default::default()
2143 });
2144
2145 prioritized_writable.insert(s);
2146 }
2147
2148 let walk_1: Vec<u64> =
2149 prioritized_writable.iter().map(|s| s.id).collect();
2150 assert_eq!(walk_1, vec![0, 4, 8, 12]);
2151
2152 for id in [0, 4, 8, 12] {
2155 let s = Arc::new(StreamPriorityKey {
2156 urgency: 0,
2157 incremental: false,
2158 id,
2159 ..Default::default()
2160 });
2161
2162 prioritized_writable.insert(s);
2163 }
2164
2165 let walk_2: Vec<u64> =
2166 prioritized_writable.iter().map(|s| s.id).collect();
2167 assert_eq!(walk_2, vec![0, 0, 4, 4, 8, 8, 12, 12]);
2168 }
2169}
2170
2171mod recv_buf;
2172mod send_buf;