1use std::cmp;
28
29use std::sync::Arc;
30
31use std::collections::hash_map;
32use std::collections::HashMap;
33use std::collections::HashSet;
34
35use intrusive_collections::intrusive_adapter;
36use intrusive_collections::KeyAdapter;
37use intrusive_collections::RBTree;
38use intrusive_collections::RBTreeAtomicLink;
39
40use smallvec::SmallVec;
41
42use crate::buffers::DefaultBufFactory;
43use crate::BufFactory;
44use crate::Error;
45use crate::Result;
46
47const DEFAULT_URGENCY: u8 = 127;
48
49pub(crate) const DEFAULT_STREAM_WINDOW: u64 = 32 * 1024;
51
52pub const MAX_STREAM_WINDOW: u64 = 16 * 1024 * 1024;
54
55#[derive(Default)]
60pub struct StreamIdHasher {
61 id: u64,
62}
63
64#[derive(Debug, PartialEq, Clone, Copy)]
66pub struct RecvBufResetReturn {
67 pub max_data_delta: u64,
70
71 pub consumed_flowcontrol: u64,
74}
75
76impl RecvBufResetReturn {
77 pub fn zero() -> Self {
78 Self {
79 max_data_delta: 0,
80 consumed_flowcontrol: 0,
81 }
82 }
83}
84
85pub enum RecvAction<T: bytes::BufMut> {
87 Emit { out: T },
89 Discard { len: usize },
91}
92
93impl std::hash::Hasher for StreamIdHasher {
94 #[inline]
95 fn finish(&self) -> u64 {
96 self.id
97 }
98
99 #[inline]
100 fn write_u64(&mut self, id: u64) {
101 self.id = id;
102 }
103
104 #[inline]
105 fn write(&mut self, _: &[u8]) {
106 unimplemented!()
109 }
110}
111
112type BuildStreamIdHasher = std::hash::BuildHasherDefault<StreamIdHasher>;
113
114pub type StreamIdHashMap<V> = HashMap<u64, V, BuildStreamIdHasher>;
115pub type StreamIdHashSet = HashSet<u64, BuildStreamIdHasher>;
116
117#[derive(Default)]
119pub struct StreamMap<F: BufFactory = DefaultBufFactory> {
120 streams: StreamIdHashMap<Stream<F>>,
122
123 collected: StreamIdHashSet,
129
130 peer_max_streams_bidi: u64,
132
133 peer_max_streams_uni: u64,
135
136 peer_opened_streams_bidi: u64,
138
139 peer_opened_streams_uni: u64,
141
142 local_max_streams_bidi: u64,
144 local_max_streams_bidi_next: u64,
145
146 initial_max_streams_bidi: u64,
148
149 local_max_streams_uni: u64,
151 local_max_streams_uni_next: u64,
152
153 initial_max_streams_uni: u64,
155
156 local_opened_streams_bidi: u64,
158
159 local_opened_streams_uni: u64,
161
162 flushable: RBTree<StreamFlushablePriorityAdapter>,
166
167 pub readable: RBTree<StreamReadablePriorityAdapter>,
171
172 pub writable: RBTree<StreamWritablePriorityAdapter>,
177
178 almost_full: StreamIdHashSet,
183
184 blocked: StreamIdHashMap<u64>,
188
189 reset: StreamIdHashMap<(u64, u64)>,
193
194 stopped: StreamIdHashMap<u64>,
198
199 max_stream_window: u64,
201
202 use_initial_max_data_as_flow_control_win: bool,
205
206 tx_buffered: usize,
208}
209
210impl<F: BufFactory> StreamMap<F> {
211 pub fn new(
212 max_streams_bidi: u64, max_streams_uni: u64, max_stream_window: u64,
213 ) -> Self {
214 StreamMap {
215 local_max_streams_bidi: max_streams_bidi,
216 local_max_streams_bidi_next: max_streams_bidi,
217 initial_max_streams_bidi: max_streams_bidi,
218
219 local_max_streams_uni: max_streams_uni,
220 local_max_streams_uni_next: max_streams_uni,
221 initial_max_streams_uni: max_streams_uni,
222
223 max_stream_window,
224
225 ..StreamMap::default()
226 }
227 }
228
229 pub fn get(&self, id: u64) -> Option<&Stream<F>> {
231 self.streams.get(&id)
232 }
233
234 pub fn get_mut(&mut self, id: u64) -> Option<&mut Stream<F>> {
236 self.streams.get_mut(&id)
237 }
238
239 pub(crate) fn get_or_create(
252 &mut self, id: u64, local_params: &crate::TransportParams,
253 peer_params: &crate::TransportParams, local: bool, is_server: bool,
254 ) -> Result<&mut Stream<F>> {
255 let (stream, is_new_and_writable) = match self.streams.entry(id) {
256 hash_map::Entry::Vacant(v) => {
257 if self.collected.contains(&id) {
259 return Err(Error::Done);
260 }
261
262 if local != is_local(id, is_server) {
263 return Err(Error::InvalidStreamState(id));
264 }
265
266 let (max_rx_data, max_tx_data) = match (local, is_bidi(id)) {
267 (true, true) => (
269 local_params.initial_max_stream_data_bidi_local,
270 peer_params.initial_max_stream_data_bidi_remote,
271 ),
272
273 (true, false) => (0, peer_params.initial_max_stream_data_uni),
275
276 (false, true) => (
278 local_params.initial_max_stream_data_bidi_remote,
279 peer_params.initial_max_stream_data_bidi_local,
280 ),
281
282 (false, false) =>
284 (local_params.initial_max_stream_data_uni, 0),
285 };
286
287 let stream_sequence = id >> 2;
291
292 match (is_local(id, is_server), is_bidi(id)) {
294 (true, true) => {
295 let n = cmp::max(
296 self.local_opened_streams_bidi,
297 stream_sequence + 1,
298 );
299
300 if n > self.peer_max_streams_bidi {
301 return Err(Error::StreamLimit);
302 }
303
304 self.local_opened_streams_bidi = n;
305 },
306
307 (true, false) => {
308 let n = cmp::max(
309 self.local_opened_streams_uni,
310 stream_sequence + 1,
311 );
312
313 if n > self.peer_max_streams_uni {
314 return Err(Error::StreamLimit);
315 }
316
317 self.local_opened_streams_uni = n;
318 },
319
320 (false, true) => {
321 let n = cmp::max(
322 self.peer_opened_streams_bidi,
323 stream_sequence + 1,
324 );
325
326 if n > self.local_max_streams_bidi {
327 return Err(Error::StreamLimit);
328 }
329
330 self.peer_opened_streams_bidi = n;
331 },
332
333 (false, false) => {
334 let n = cmp::max(
335 self.peer_opened_streams_uni,
336 stream_sequence + 1,
337 );
338
339 if n > self.local_max_streams_uni {
340 return Err(Error::StreamLimit);
341 }
342
343 self.peer_opened_streams_uni = n;
344 },
345 };
346
347 let initial_window =
348 if self.use_initial_max_data_as_flow_control_win {
349 max_rx_data
350 } else {
351 cmp::min(max_rx_data, DEFAULT_STREAM_WINDOW)
352 };
353 let s = Stream::new(
354 id,
355 max_rx_data,
356 max_tx_data,
357 local,
358 initial_window,
359 self.max_stream_window,
360 );
361
362 let is_writable = s.is_writable();
363
364 (v.insert(s), is_writable)
365 },
366
367 hash_map::Entry::Occupied(v) => (v.into_mut(), false),
368 };
369
370 if is_new_and_writable {
373 self.writable.insert(Arc::clone(&stream.priority_key));
374 }
375
376 Ok(stream)
377 }
378
379 pub fn insert_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
383 if !priority_key.readable.is_linked() {
384 self.readable.insert(Arc::clone(priority_key));
385 }
386 }
387
388 pub fn remove_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
390 if !priority_key.readable.is_linked() {
391 return;
392 }
393
394 let mut c = {
395 let ptr = Arc::as_ptr(priority_key);
396 unsafe { self.readable.cursor_mut_from_ptr(ptr) }
397 };
398
399 c.remove();
400 }
401
402 pub fn insert_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
409 if !priority_key.writable.is_linked() {
410 self.writable.insert(Arc::clone(priority_key));
411 }
412 }
413
414 pub fn remove_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
419 if !priority_key.writable.is_linked() {
420 return;
421 }
422
423 let mut c = {
424 let ptr = Arc::as_ptr(priority_key);
425 unsafe { self.writable.cursor_mut_from_ptr(ptr) }
426 };
427
428 c.remove();
429 }
430
431 pub fn insert_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
435 if !priority_key.flushable.is_linked() {
436 self.flushable.insert(Arc::clone(priority_key));
437 }
438 }
439
440 pub fn remove_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
442 if !priority_key.flushable.is_linked() {
443 return;
444 }
445
446 let mut c = {
447 let ptr = Arc::as_ptr(priority_key);
448 unsafe { self.flushable.cursor_mut_from_ptr(ptr) }
449 };
450
451 c.remove();
452 }
453
454 pub fn peek_flushable(&self) -> Option<Arc<StreamPriorityKey>> {
455 self.flushable.front().clone_pointer()
456 }
457
458 pub fn update_priority(
460 &mut self, old: &Arc<StreamPriorityKey>, new: &Arc<StreamPriorityKey>,
461 ) {
462 if old.readable.is_linked() {
463 self.remove_readable(old);
464 self.readable.insert(Arc::clone(new));
465 }
466
467 if old.writable.is_linked() {
468 self.remove_writable(old);
469 self.writable.insert(Arc::clone(new));
470 }
471
472 if old.flushable.is_linked() {
473 self.remove_flushable(old);
474 self.flushable.insert(Arc::clone(new));
475 }
476 }
477
478 pub fn insert_almost_full(&mut self, stream_id: u64) {
482 self.almost_full.insert(stream_id);
483 }
484
485 pub fn remove_almost_full(&mut self, stream_id: u64) {
487 self.almost_full.remove(&stream_id);
488 }
489
490 pub fn insert_blocked(&mut self, stream_id: u64, off: u64) {
495 self.blocked.insert(stream_id, off);
496 }
497
498 pub fn remove_blocked(&mut self, stream_id: u64) {
500 self.blocked.remove(&stream_id);
501 }
502
503 pub fn insert_reset(
508 &mut self, stream_id: u64, error_code: u64, final_size: u64,
509 ) {
510 self.reset.insert(stream_id, (error_code, final_size));
511 }
512
513 pub fn remove_reset(&mut self, stream_id: u64) {
515 self.reset.remove(&stream_id);
516 }
517
518 pub fn insert_stopped(&mut self, stream_id: u64, error_code: u64) {
523 self.stopped.insert(stream_id, error_code);
524 }
525
526 pub fn remove_stopped(&mut self, stream_id: u64) {
528 self.stopped.remove(&stream_id);
529 }
530
531 pub fn update_peer_max_streams_bidi(&mut self, v: u64) {
533 self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v);
534 }
535
536 pub fn update_peer_max_streams_uni(&mut self, v: u64) {
538 self.peer_max_streams_uni = cmp::max(self.peer_max_streams_uni, v);
539 }
540
541 pub fn update_max_streams_bidi(&mut self) {
543 self.local_max_streams_bidi = self.local_max_streams_bidi_next;
544 }
545
546 pub fn set_max_streams_bidi(&mut self, max: u64) {
548 self.local_max_streams_bidi = max;
549 self.local_max_streams_bidi_next = max;
550 self.initial_max_streams_bidi = max;
551 }
552
553 pub fn max_streams_bidi(&self) -> u64 {
555 self.local_max_streams_bidi
556 }
557
558 pub fn max_streams_bidi_next(&mut self) -> u64 {
560 self.local_max_streams_bidi_next
561 }
562
563 pub fn update_max_streams_uni(&mut self) {
565 self.local_max_streams_uni = self.local_max_streams_uni_next;
566 }
567
568 pub fn max_streams_uni_next(&mut self) -> u64 {
570 self.local_max_streams_uni_next
571 }
572
573 pub fn peer_max_streams_bidi(&self) -> u64 {
575 self.peer_max_streams_bidi
576 }
577
578 pub fn peer_streams_left_bidi(&self) -> u64 {
581 self.peer_max_streams_bidi - self.local_opened_streams_bidi
582 }
583
584 pub fn peer_max_streams_uni(&self) -> u64 {
586 self.peer_max_streams_uni
587 }
588
589 pub fn peer_streams_left_uni(&self) -> u64 {
592 self.peer_max_streams_uni - self.local_opened_streams_uni
593 }
594
595 pub fn collect(&mut self, stream_id: u64, local: bool) {
600 if !local {
601 if is_bidi(stream_id) {
604 self.local_max_streams_bidi_next =
605 self.local_max_streams_bidi_next.saturating_add(1);
606 } else {
607 self.local_max_streams_uni_next =
608 self.local_max_streams_uni_next.saturating_add(1);
609 }
610 }
611
612 let s = self.streams.remove(&stream_id).unwrap();
613
614 self.remove_readable(&s.priority_key);
615
616 self.remove_writable(&s.priority_key);
617
618 self.remove_flushable(&s.priority_key);
619
620 self.collected.insert(stream_id);
621 }
622
623 pub fn readable(&self) -> StreamIter {
625 StreamIter {
626 streams: self.readable.iter().map(|s| s.id).collect(),
627 index: 0,
628 }
629 }
630
631 pub fn writable(&self) -> StreamIter {
633 StreamIter {
634 streams: self.writable.iter().map(|s| s.id).collect(),
635 index: 0,
636 }
637 }
638
639 pub fn almost_full(&self) -> StreamIter {
641 StreamIter::from(&self.almost_full)
642 }
643
644 pub fn blocked(&self) -> hash_map::Iter<'_, u64, u64> {
646 self.blocked.iter()
647 }
648
649 pub fn reset(&self) -> hash_map::Iter<'_, u64, (u64, u64)> {
651 self.reset.iter()
652 }
653
654 pub fn stopped(&self) -> hash_map::Iter<'_, u64, u64> {
656 self.stopped.iter()
657 }
658
659 pub fn is_collected(&self, stream_id: u64) -> bool {
661 self.collected.contains(&stream_id)
662 }
663
664 pub fn has_flushable(&self) -> bool {
666 !self.flushable.is_empty()
667 }
668
669 pub fn has_readable(&self) -> bool {
671 !self.readable.is_empty()
672 }
673
674 pub fn has_almost_full(&self) -> bool {
677 !self.almost_full.is_empty()
678 }
679
680 pub fn has_blocked(&self) -> bool {
682 !self.blocked.is_empty()
683 }
684
685 pub fn has_reset(&self) -> bool {
687 !self.reset.is_empty()
688 }
689
690 pub fn has_stopped(&self) -> bool {
692 !self.stopped.is_empty()
693 }
694
695 pub fn should_update_max_streams_bidi(&self) -> bool {
701 let available = self
702 .local_max_streams_bidi
703 .saturating_sub(self.peer_opened_streams_bidi);
704 self.local_max_streams_bidi_next != self.local_max_streams_bidi &&
705 available <= self.initial_max_streams_bidi / 2
706 }
707
708 pub fn should_update_max_streams_uni(&self) -> bool {
714 let available = self
715 .local_max_streams_uni
716 .saturating_sub(self.peer_opened_streams_uni);
717 self.local_max_streams_uni_next != self.local_max_streams_uni &&
718 available <= self.initial_max_streams_uni / 2
719 }
720
721 #[cfg(test)]
723 pub fn len(&self) -> usize {
724 self.streams.len()
725 }
726
727 pub(crate) fn tx_buffered(&self) -> usize {
729 self.tx_buffered
730 }
731
732 fn tx_buffered_actual(&self) -> usize {
736 self.streams
737 .values()
738 .map(|s| s.send.buffered_bytes() as usize)
739 .sum()
740 }
741
742 pub(crate) fn tx_buffered_is_consistent(&self) -> bool {
745 self.tx_buffered == self.tx_buffered_actual()
746 }
747
748 pub(crate) fn add_tx_buffered(&mut self, delta: usize) {
750 self.tx_buffered += delta;
751
752 #[cfg(debug_assertions)]
753 self.debug_check_tx_buffered_consistency();
754 }
755
756 pub(crate) fn sub_tx_buffered(&mut self, delta: usize) {
758 debug_assert!(self.tx_buffered >= delta);
759 self.tx_buffered = self.tx_buffered.saturating_sub(delta);
760
761 #[cfg(debug_assertions)]
762 self.debug_check_tx_buffered_consistency();
763 }
764
765 #[cfg(debug_assertions)]
769 pub(crate) fn debug_check_tx_buffered_consistency(&self) {
770 if !self.tx_buffered_is_consistent() {
771 let buffered_per_stream = self
772 .streams
773 .iter()
774 .map(|(id, s)| (*id, s.send.buffered_bytes()))
775 .collect::<Vec<_>>();
776
777 let actual = self.tx_buffered_actual();
778 let stored = self.tx_buffered;
779 panic!(
780 "tx_buffered mismatch: stored={}, actual={}, diff={}, buffered_per_stream={:?}",
781 stored,
782 actual,
783 stored as i64 - actual as i64,
784 buffered_per_stream
785 );
786 }
787 }
788
789 pub(crate) fn set_use_initial_max_data_as_flow_control_win(
792 &mut self, v: bool,
793 ) {
794 self.use_initial_max_data_as_flow_control_win = v;
795 }
796}
797
798pub struct Stream<F: BufFactory = DefaultBufFactory> {
800 pub recv: recv_buf::RecvBuf,
802
803 pub send: send_buf::SendBuf<F>,
805
806 pub send_lowat: usize,
807
808 pub bidi: bool,
810
811 pub local: bool,
813
814 pub urgency: u8,
816
817 pub incremental: bool,
819
820 pub priority_key: Arc<StreamPriorityKey>,
821}
822
823impl<F: BufFactory> Stream<F> {
824 pub fn new(
826 id: u64, max_rx_data: u64, max_tx_data: u64, local: bool,
827 initial_window: u64, max_window: u64,
828 ) -> Self {
829 let priority_key = Arc::new(StreamPriorityKey {
830 id,
831 ..Default::default()
832 });
833
834 Stream {
835 recv: recv_buf::RecvBuf::new(max_rx_data, initial_window, max_window),
836 send: send_buf::SendBuf::new(max_tx_data),
837 send_lowat: 1,
838 bidi: is_bidi(id),
839 local,
840 urgency: priority_key.urgency,
841 incremental: priority_key.incremental,
842 priority_key,
843 }
844 }
845
846 pub fn is_readable(&self) -> bool {
848 self.recv.ready()
849 }
850
851 pub fn is_writable(&self) -> bool {
854 !self.send.is_shutdown() &&
855 !self.send.is_fin() &&
856 (self.send.off_back() + self.send_lowat as u64) <
857 self.send.max_off()
858 }
859
860 pub fn is_flushable(&self) -> bool {
863 let off_front = self.send.off_front();
864
865 !self.send.is_empty() &&
866 off_front < self.send.off_back() &&
867 off_front < self.send.max_off()
868 }
869
870 pub fn is_complete(&self) -> bool {
880 match (self.bidi, self.local) {
881 (true, _) => self.recv.is_fin() && self.send.is_complete(),
884
885 (false, true) => self.send.is_complete(),
888
889 (false, false) => self.recv.is_fin(),
892 }
893 }
894}
895
896pub fn is_local(stream_id: u64, is_server: bool) -> bool {
898 (stream_id & 0x1) == (is_server as u64)
899}
900
901pub fn is_bidi(stream_id: u64) -> bool {
903 (stream_id & 0x2) == 0
904}
905
906#[derive(Clone, Debug)]
907pub struct StreamPriorityKey {
908 pub urgency: u8,
909 pub incremental: bool,
910 pub id: u64,
911
912 pub readable: RBTreeAtomicLink,
913 pub writable: RBTreeAtomicLink,
914 pub flushable: RBTreeAtomicLink,
915}
916
917impl Default for StreamPriorityKey {
918 fn default() -> Self {
919 Self {
920 urgency: DEFAULT_URGENCY,
921 incremental: true,
922 id: Default::default(),
923 readable: Default::default(),
924 writable: Default::default(),
925 flushable: Default::default(),
926 }
927 }
928}
929
930impl PartialEq for StreamPriorityKey {
931 fn eq(&self, other: &Self) -> bool {
932 self.id == other.id
933 }
934}
935
936impl Eq for StreamPriorityKey {}
937
938impl PartialOrd for StreamPriorityKey {
939 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
940 Some(self.cmp(other))
941 }
942}
943
944impl Ord for StreamPriorityKey {
945 fn cmp(&self, other: &Self) -> cmp::Ordering {
946 if self.id == other.id {
948 return cmp::Ordering::Equal;
949 }
950
951 if self.urgency != other.urgency {
953 return self.urgency.cmp(&other.urgency);
954 }
955
956 if !self.incremental && !other.incremental {
959 return self.id.cmp(&other.id);
960 }
961
962 if self.incremental && !other.incremental {
964 return cmp::Ordering::Greater;
965 }
966 if !self.incremental && other.incremental {
967 return cmp::Ordering::Less;
968 }
969
970 cmp::Ordering::Greater
974 }
975}
976
977intrusive_adapter!(pub StreamWritablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { writable: RBTreeAtomicLink });
978
979impl KeyAdapter<'_> for StreamWritablePriorityAdapter {
980 type Key = StreamPriorityKey;
981
982 fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
983 s.clone()
984 }
985}
986
987intrusive_adapter!(pub StreamReadablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { readable: RBTreeAtomicLink });
988
989impl KeyAdapter<'_> for StreamReadablePriorityAdapter {
990 type Key = StreamPriorityKey;
991
992 fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
993 s.clone()
994 }
995}
996
997intrusive_adapter!(pub StreamFlushablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { flushable: RBTreeAtomicLink });
998
999impl KeyAdapter<'_> for StreamFlushablePriorityAdapter {
1000 type Key = StreamPriorityKey;
1001
1002 fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
1003 s.clone()
1004 }
1005}
1006
1007#[derive(Default)]
1009pub struct StreamIter {
1010 streams: SmallVec<[u64; 8]>,
1011 index: usize,
1012}
1013
1014impl StreamIter {
1015 #[inline]
1016 fn from(streams: &StreamIdHashSet) -> Self {
1017 StreamIter {
1018 streams: streams.iter().copied().collect(),
1019 index: 0,
1020 }
1021 }
1022}
1023
1024impl Iterator for StreamIter {
1025 type Item = u64;
1026
1027 #[inline]
1028 fn next(&mut self) -> Option<Self::Item> {
1029 let v = self.streams.get(self.index)?;
1030 self.index += 1;
1031 Some(*v)
1032 }
1033}
1034
1035impl ExactSizeIterator for StreamIter {
1036 #[inline]
1037 fn len(&self) -> usize {
1038 self.streams.len() - self.index
1039 }
1040}
1041
1042#[cfg(test)]
1043mod tests {
1044 use crate::range_buf::RangeBuf;
1045
1046 use super::*;
1047
1048 #[test]
1049 fn recv_flow_control() {
1050 let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1051 assert!(!stream.recv.almost_full());
1052
1053 let mut buf = [0; 32];
1054
1055 let first = RangeBuf::from(b"hello", 0, false);
1056 let second = RangeBuf::from(b"world", 5, false);
1057 let third = RangeBuf::from(b"something", 10, false);
1058
1059 assert_eq!(stream.recv.write(second), Ok(()));
1060 assert_eq!(stream.recv.write(first), Ok(()));
1061 assert!(!stream.recv.almost_full());
1062
1063 assert_eq!(stream.recv.write(third), Err(Error::FlowControl));
1064
1065 let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1066 assert_eq!(&buf[..len], b"helloworld");
1067 assert!(!fin);
1068
1069 assert!(stream.recv.almost_full());
1070
1071 stream.recv.update_max_data(std::time::Instant::now());
1072 assert_eq!(stream.recv.max_data_next(), 25);
1073 assert!(!stream.recv.almost_full());
1074
1075 let third = RangeBuf::from(b"something", 10, false);
1076 assert_eq!(stream.recv.write(third), Ok(()));
1077 }
1078
1079 #[test]
1080 fn recv_past_fin() {
1081 let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1082 assert!(!stream.recv.almost_full());
1083
1084 let first = RangeBuf::from(b"hello", 0, true);
1085 let second = RangeBuf::from(b"world", 5, false);
1086
1087 assert_eq!(stream.recv.write(first), Ok(()));
1088 assert_eq!(stream.recv.write(second), Err(Error::FinalSize));
1089 }
1090
1091 #[test]
1092 fn recv_fin_dup() {
1093 let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1094 assert!(!stream.recv.almost_full());
1095
1096 let first = RangeBuf::from(b"hello", 0, true);
1097 let second = RangeBuf::from(b"hello", 0, true);
1098
1099 assert_eq!(stream.recv.write(first), Ok(()));
1100 assert_eq!(stream.recv.write(second), Ok(()));
1101
1102 let mut buf = [0; 32];
1103
1104 let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1105 assert_eq!(&buf[..len], b"hello");
1106 assert!(fin);
1107 }
1108
1109 #[test]
1110 fn recv_fin_change() {
1111 let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1112 assert!(!stream.recv.almost_full());
1113
1114 let first = RangeBuf::from(b"hello", 0, true);
1115 let second = RangeBuf::from(b"world", 5, true);
1116
1117 assert_eq!(stream.recv.write(second), Ok(()));
1118 assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
1119 }
1120
1121 #[test]
1122 fn recv_fin_lower_than_received() {
1123 let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1124 assert!(!stream.recv.almost_full());
1125
1126 let first = RangeBuf::from(b"hello", 0, true);
1127 let second = RangeBuf::from(b"world", 5, false);
1128
1129 assert_eq!(stream.recv.write(second), Ok(()));
1130 assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
1131 }
1132
1133 #[test]
1134 fn recv_fin_flow_control() {
1135 let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1136 assert!(!stream.recv.almost_full());
1137
1138 let mut buf = [0; 32];
1139
1140 let first = RangeBuf::from(b"hello", 0, false);
1141 let second = RangeBuf::from(b"world", 5, true);
1142
1143 assert_eq!(stream.recv.write(first), Ok(()));
1144 assert_eq!(stream.recv.write(second), Ok(()));
1145
1146 let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1147 assert_eq!(&buf[..len], b"helloworld");
1148 assert!(fin);
1149
1150 assert!(!stream.recv.almost_full());
1151 }
1152
1153 #[test]
1154 fn recv_fin_reset_mismatch() {
1155 let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1156 assert!(!stream.recv.almost_full());
1157
1158 let first = RangeBuf::from(b"hello", 0, true);
1159
1160 assert_eq!(stream.recv.write(first), Ok(()));
1161 assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
1162 }
1163
1164 #[test]
1165 fn recv_reset_with_gap() {
1166 let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1167 assert!(!stream.recv.almost_full());
1168
1169 let first = RangeBuf::from(b"hello", 0, false);
1170
1171 assert_eq!(stream.recv.write(first), Ok(()));
1172 assert_eq!(stream.recv.emit(&mut [0; 1]), Ok((1, false)));
1174 assert_eq!(
1176 stream.recv.reset(0, 10),
1177 Ok(RecvBufResetReturn {
1178 max_data_delta: 5,
1179 consumed_flowcontrol: 9
1181 })
1182 );
1183 assert_eq!(stream.recv.reset(0, 10), Ok(RecvBufResetReturn::zero()));
1184 }
1185
1186 #[test]
1187 fn recv_reset_dup() {
1188 let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1189 assert!(!stream.recv.almost_full());
1190
1191 let first = RangeBuf::from(b"hello", 0, false);
1192
1193 assert_eq!(stream.recv.write(first), Ok(()));
1194 assert_eq!(
1195 stream.recv.reset(0, 5),
1196 Ok(RecvBufResetReturn {
1197 max_data_delta: 0,
1198 consumed_flowcontrol: 5
1199 })
1200 );
1201 assert_eq!(stream.recv.reset(0, 5), Ok(RecvBufResetReturn::zero()));
1202 }
1203
1204 #[test]
1205 fn recv_reset_change() {
1206 let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1207 assert!(!stream.recv.almost_full());
1208
1209 let first = RangeBuf::from(b"hello", 0, false);
1210
1211 assert_eq!(stream.recv.write(first), Ok(()));
1212 assert_eq!(
1213 stream.recv.reset(0, 5),
1214 Ok(RecvBufResetReturn {
1215 max_data_delta: 0,
1216 consumed_flowcontrol: 5
1217 })
1218 );
1219 assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
1220 }
1221
1222 #[test]
1223 fn recv_reset_lower_than_received() {
1224 let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1225 assert!(!stream.recv.almost_full());
1226
1227 let first = RangeBuf::from(b"hello", 0, false);
1228
1229 assert_eq!(stream.recv.write(first), Ok(()));
1230 assert_eq!(stream.recv.reset(0, 4), Err(Error::FinalSize));
1231 }
1232
1233 #[test]
1234 fn send_flow_control() {
1235 let mut buf = [0; 25];
1236
1237 let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);
1238
1239 let first = b"hello";
1240 let second = b"world";
1241 let third = b"something";
1242
1243 assert!(stream.send.write(first, false).is_ok());
1244 assert!(stream.send.write(second, false).is_ok());
1245 assert!(stream.send.write(third, false).is_ok());
1246
1247 assert_eq!(stream.send.off_front(), 0);
1248
1249 let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
1250 assert_eq!(written, 15);
1251 assert!(!fin);
1252 assert_eq!(&buf[..written], b"helloworldsomet");
1253
1254 assert_eq!(stream.send.off_front(), 15);
1255
1256 let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
1257 assert_eq!(written, 0);
1258 assert!(!fin);
1259 assert_eq!(&buf[..written], b"");
1260
1261 stream.send.retransmit(0, 15);
1262
1263 assert_eq!(stream.send.off_front(), 0);
1264
1265 let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
1266 assert_eq!(written, 10);
1267 assert!(!fin);
1268 assert_eq!(&buf[..written], b"helloworld");
1269
1270 assert_eq!(stream.send.off_front(), 10);
1271
1272 let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
1273 assert_eq!(written, 5);
1274 assert!(!fin);
1275 assert_eq!(&buf[..written], b"somet");
1276 }
1277
1278 #[test]
1279 fn send_past_fin() {
1280 let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);
1281
1282 let first = b"hello";
1283 let second = b"world";
1284 let third = b"third";
1285
1286 assert_eq!(stream.send.write(first, false), Ok(5));
1287
1288 assert_eq!(stream.send.write(second, true), Ok(5));
1289 assert!(stream.send.is_fin());
1290
1291 assert_eq!(stream.send.write(third, false), Err(Error::FinalSize));
1292 }
1293
1294 #[test]
1295 fn send_fin_dup() {
1296 let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);
1297
1298 assert_eq!(stream.send.write(b"hello", true), Ok(5));
1299 assert!(stream.send.is_fin());
1300
1301 assert_eq!(stream.send.write(b"", true), Ok(0));
1302 assert!(stream.send.is_fin());
1303 }
1304
1305 #[test]
1306 fn send_undo_fin() {
1307 let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);
1308
1309 assert_eq!(stream.send.write(b"hello", true), Ok(5));
1310 assert!(stream.send.is_fin());
1311
1312 assert_eq!(
1313 stream.send.write(b"helloworld", true),
1314 Err(Error::FinalSize)
1315 );
1316 }
1317
1318 #[test]
1319 fn send_fin_max_data_match() {
1320 let mut buf = [0; 15];
1321
1322 let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);
1323
1324 let slice = b"hellohellohello";
1325
1326 assert!(stream.send.write(slice, true).is_ok());
1327
1328 let (written, fin) = stream.send.emit(&mut buf[..15]).unwrap();
1329 assert_eq!(written, 15);
1330 assert!(fin);
1331 assert_eq!(&buf[..written], slice);
1332 }
1333
1334 #[test]
1335 fn send_fin_zero_length() {
1336 let mut buf = [0; 5];
1337
1338 let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);
1339
1340 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1341 assert_eq!(stream.send.write(b"", true), Ok(0));
1342 assert!(stream.send.is_fin());
1343
1344 let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1345 assert_eq!(written, 5);
1346 assert!(fin);
1347 assert_eq!(&buf[..written], b"hello");
1348 }
1349
1350 #[test]
1351 fn send_ack() {
1352 let mut buf = [0; 5];
1353
1354 let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);
1355
1356 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1357 assert_eq!(stream.send.write(b"world", false), Ok(5));
1358 assert_eq!(stream.send.write(b"", true), Ok(0));
1359 assert!(stream.send.is_fin());
1360
1361 assert_eq!(stream.send.off_front(), 0);
1362
1363 let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1364 assert_eq!(written, 5);
1365 assert!(!fin);
1366 assert_eq!(&buf[..written], b"hello");
1367
1368 stream.send.ack_and_drop(0, 5);
1369
1370 stream.send.retransmit(0, 5);
1371
1372 assert_eq!(stream.send.off_front(), 5);
1373
1374 let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1375 assert_eq!(written, 5);
1376 assert!(fin);
1377 assert_eq!(&buf[..written], b"world");
1378 }
1379
1380 #[test]
1381 fn send_ack_reordering() {
1382 let mut buf = [0; 5];
1383
1384 let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);
1385
1386 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1387 assert_eq!(stream.send.write(b"world", false), Ok(5));
1388 assert_eq!(stream.send.write(b"", true), Ok(0));
1389 assert!(stream.send.is_fin());
1390
1391 assert_eq!(stream.send.off_front(), 0);
1392
1393 let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1394 assert_eq!(written, 5);
1395 assert!(!fin);
1396 assert_eq!(&buf[..written], b"hello");
1397
1398 assert_eq!(stream.send.off_front(), 5);
1399
1400 let (written, fin) = stream.send.emit(&mut buf[..1]).unwrap();
1401 assert_eq!(written, 1);
1402 assert!(!fin);
1403 assert_eq!(&buf[..written], b"w");
1404
1405 stream.send.ack_and_drop(5, 1);
1406 stream.send.ack_and_drop(0, 5);
1407
1408 stream.send.retransmit(0, 5);
1409 stream.send.retransmit(5, 1);
1410
1411 assert_eq!(stream.send.off_front(), 6);
1412
1413 let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1414 assert_eq!(written, 4);
1415 assert!(fin);
1416 assert_eq!(&buf[..written], b"orld");
1417 }
1418
1419 #[test]
1420 fn recv_data_below_off() {
1421 let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1422
1423 let first = RangeBuf::from(b"hello", 0, false);
1424
1425 assert_eq!(stream.recv.write(first), Ok(()));
1426
1427 let mut buf = [0; 10];
1428
1429 let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1430 assert_eq!(&buf[..len], b"hello");
1431 assert!(!fin);
1432
1433 let first = RangeBuf::from(b"elloworld", 1, true);
1434 assert_eq!(stream.recv.write(first), Ok(()));
1435
1436 let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1437 assert_eq!(&buf[..len], b"world");
1438 assert!(fin);
1439 }
1440
1441 #[test]
1442 fn stream_complete() {
1443 let mut stream =
1444 <Stream>::new(0, 30, 30, true, 30, DEFAULT_STREAM_WINDOW);
1445
1446 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1447 assert_eq!(stream.send.write(b"world", false), Ok(5));
1448
1449 assert!(!stream.send.is_complete());
1450 assert!(!stream.send.is_fin());
1451
1452 assert_eq!(stream.send.write(b"", true), Ok(0));
1453
1454 assert!(!stream.send.is_complete());
1455 assert!(stream.send.is_fin());
1456
1457 let buf = RangeBuf::from(b"hello", 0, true);
1458 assert!(stream.recv.write(buf).is_ok());
1459 assert!(!stream.recv.is_fin());
1460
1461 stream.send.ack(6, 4);
1462 assert!(!stream.send.is_complete());
1463
1464 let mut buf = [0; 2];
1465 assert_eq!(stream.recv.emit(&mut buf), Ok((2, false)));
1466 assert!(!stream.recv.is_fin());
1467
1468 stream.send.ack(1, 5);
1469 assert!(!stream.send.is_complete());
1470
1471 stream.send.ack(0, 1);
1472 assert!(stream.send.is_complete());
1473
1474 assert!(!stream.is_complete());
1475
1476 let mut buf = [0; 3];
1477 assert_eq!(stream.recv.emit(&mut buf), Ok((3, true)));
1478 assert!(stream.recv.is_fin());
1479
1480 assert!(stream.is_complete());
1481 }
1482
1483 #[test]
1484 fn send_fin_zero_length_output() {
1485 let mut buf = [0; 5];
1486
1487 let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);
1488
1489 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1490 assert_eq!(stream.send.off_front(), 0);
1491 assert!(!stream.send.is_fin());
1492
1493 let (written, fin) = stream.send.emit(&mut buf).unwrap();
1494 assert_eq!(written, 5);
1495 assert!(!fin);
1496 assert_eq!(&buf[..written], b"hello");
1497
1498 assert_eq!(stream.send.write(b"", true), Ok(0));
1499 assert!(stream.send.is_fin());
1500 assert_eq!(stream.send.off_front(), 5);
1501
1502 let (written, fin) = stream.send.emit(&mut buf).unwrap();
1503 assert_eq!(written, 0);
1504 assert!(fin);
1505 assert_eq!(&buf[..written], b"");
1506 }
1507
1508 fn stream_send_ready(stream: &Stream) -> bool {
1509 !stream.send.is_empty() &&
1510 stream.send.off_front() < stream.send.off_back()
1511 }
1512
1513 #[test]
1514 fn send_emit() {
1515 let mut buf = [0; 5];
1516
1517 let mut stream = <Stream>::new(0, 0, 20, true, 0, DEFAULT_STREAM_WINDOW);
1518
1519 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1520 assert_eq!(stream.send.write(b"world", false), Ok(5));
1521 assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1522 assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1523 assert_eq!(stream.send.off_front(), 0);
1524 assert_eq!(stream.send.bufs_count(), 4);
1525
1526 assert!(stream.is_flushable());
1527
1528 assert!(stream_send_ready(&stream));
1529 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1530 assert_eq!(stream.send.off_front(), 4);
1531 assert_eq!(&buf[..4], b"hell");
1532
1533 assert!(stream_send_ready(&stream));
1534 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1535 assert_eq!(stream.send.off_front(), 8);
1536 assert_eq!(&buf[..4], b"owor");
1537
1538 assert!(stream_send_ready(&stream));
1539 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1540 assert_eq!(stream.send.off_front(), 10);
1541 assert_eq!(&buf[..2], b"ld");
1542
1543 assert!(stream_send_ready(&stream));
1544 assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1545 assert_eq!(stream.send.off_front(), 11);
1546 assert_eq!(&buf[..1], b"o");
1547
1548 assert!(stream_send_ready(&stream));
1549 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1550 assert_eq!(stream.send.off_front(), 16);
1551 assert_eq!(&buf[..5], b"llehd");
1552
1553 assert!(stream_send_ready(&stream));
1554 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1555 assert_eq!(stream.send.off_front(), 20);
1556 assert_eq!(&buf[..4], b"lrow");
1557
1558 assert!(!stream.is_flushable());
1559
1560 assert!(!stream_send_ready(&stream));
1561 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1562 assert_eq!(stream.send.off_front(), 20);
1563 }
1564
1565 #[test]
1566 fn send_emit_ack() {
1567 let mut buf = [0; 5];
1568
1569 let mut stream = <Stream>::new(0, 0, 20, true, 0, DEFAULT_STREAM_WINDOW);
1570
1571 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1572 assert_eq!(stream.send.write(b"world", false), Ok(5));
1573 assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1574 assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1575 assert_eq!(stream.send.off_front(), 0);
1576 assert_eq!(stream.send.bufs_count(), 4);
1577
1578 assert!(stream.is_flushable());
1579
1580 assert!(stream_send_ready(&stream));
1581 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1582 assert_eq!(stream.send.off_front(), 4);
1583 assert_eq!(&buf[..4], b"hell");
1584
1585 assert!(stream_send_ready(&stream));
1586 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1587 assert_eq!(stream.send.off_front(), 8);
1588 assert_eq!(&buf[..4], b"owor");
1589
1590 stream.send.ack_and_drop(0, 5);
1591 assert_eq!(stream.send.bufs_count(), 3);
1592
1593 assert!(stream_send_ready(&stream));
1594 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1595 assert_eq!(stream.send.off_front(), 10);
1596 assert_eq!(&buf[..2], b"ld");
1597
1598 stream.send.ack_and_drop(7, 5);
1599 assert_eq!(stream.send.bufs_count(), 3);
1600
1601 assert!(stream_send_ready(&stream));
1602 assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1603 assert_eq!(stream.send.off_front(), 11);
1604 assert_eq!(&buf[..1], b"o");
1605
1606 assert!(stream_send_ready(&stream));
1607 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1608 assert_eq!(stream.send.off_front(), 16);
1609 assert_eq!(&buf[..5], b"llehd");
1610
1611 stream.send.ack_and_drop(5, 7);
1612 assert_eq!(stream.send.bufs_count(), 2);
1613
1614 assert!(stream_send_ready(&stream));
1615 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1616 assert_eq!(stream.send.off_front(), 20);
1617 assert_eq!(&buf[..4], b"lrow");
1618
1619 assert!(!stream.is_flushable());
1620
1621 assert!(!stream_send_ready(&stream));
1622 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1623 assert_eq!(stream.send.off_front(), 20);
1624
1625 stream.send.ack_and_drop(22, 4);
1626 assert_eq!(stream.send.bufs_count(), 2);
1627
1628 stream.send.ack_and_drop(20, 1);
1629 assert_eq!(stream.send.bufs_count(), 2);
1630 }
1631
1632 #[test]
1633 fn send_emit_retransmit() {
1634 let mut buf = [0; 5];
1635
1636 let mut stream = <Stream>::new(
1637 0,
1638 0,
1639 20,
1640 true,
1641 DEFAULT_STREAM_WINDOW,
1642 DEFAULT_STREAM_WINDOW,
1643 );
1644
1645 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1646 assert_eq!(stream.send.write(b"world", false), Ok(5));
1647 assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1648 assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1649 assert_eq!(stream.send.off_front(), 0);
1650 assert_eq!(stream.send.bufs_count(), 4);
1651
1652 assert!(stream.is_flushable());
1653
1654 assert!(stream_send_ready(&stream));
1655 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1656 assert_eq!(stream.send.off_front(), 4);
1657 assert_eq!(&buf[..4], b"hell");
1658
1659 assert!(stream_send_ready(&stream));
1660 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1661 assert_eq!(stream.send.off_front(), 8);
1662 assert_eq!(&buf[..4], b"owor");
1663
1664 stream.send.retransmit(3, 3);
1665 assert_eq!(stream.send.off_front(), 3);
1666
1667 assert!(stream_send_ready(&stream));
1668 assert_eq!(stream.send.emit(&mut buf[..3]), Ok((3, false)));
1669 assert_eq!(stream.send.off_front(), 8);
1670 assert_eq!(&buf[..3], b"low");
1671
1672 assert!(stream_send_ready(&stream));
1673 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1674 assert_eq!(stream.send.off_front(), 10);
1675 assert_eq!(&buf[..2], b"ld");
1676
1677 stream.send.ack_and_drop(7, 2);
1678
1679 stream.send.retransmit(8, 2);
1680
1681 assert!(stream_send_ready(&stream));
1682 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1683 assert_eq!(stream.send.off_front(), 10);
1684 assert_eq!(&buf[..2], b"ld");
1685
1686 assert!(stream_send_ready(&stream));
1687 assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1688 assert_eq!(stream.send.off_front(), 11);
1689 assert_eq!(&buf[..1], b"o");
1690
1691 assert!(stream_send_ready(&stream));
1692 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1693 assert_eq!(stream.send.off_front(), 16);
1694 assert_eq!(&buf[..5], b"llehd");
1695
1696 stream.send.retransmit(12, 2);
1697
1698 assert!(stream_send_ready(&stream));
1699 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1700 assert_eq!(stream.send.off_front(), 16);
1701 assert_eq!(&buf[..2], b"le");
1702
1703 assert!(stream_send_ready(&stream));
1704 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1705 assert_eq!(stream.send.off_front(), 20);
1706 assert_eq!(&buf[..4], b"lrow");
1707
1708 assert!(!stream.is_flushable());
1709
1710 assert!(!stream_send_ready(&stream));
1711 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1712 assert_eq!(stream.send.off_front(), 20);
1713
1714 stream.send.retransmit(7, 12);
1715
1716 assert!(stream_send_ready(&stream));
1717 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1718 assert_eq!(stream.send.off_front(), 12);
1719 assert_eq!(&buf[..5], b"rldol");
1720
1721 assert!(stream_send_ready(&stream));
1722 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1723 assert_eq!(stream.send.off_front(), 17);
1724 assert_eq!(&buf[..5], b"lehdl");
1725
1726 assert!(stream_send_ready(&stream));
1727 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
1728 assert_eq!(stream.send.off_front(), 20);
1729 assert_eq!(&buf[..2], b"ro");
1730
1731 stream.send.ack_and_drop(12, 7);
1732
1733 stream.send.retransmit(7, 12);
1734
1735 assert!(stream_send_ready(&stream));
1736 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1737 assert_eq!(stream.send.off_front(), 12);
1738 assert_eq!(&buf[..5], b"rldol");
1739
1740 assert!(stream_send_ready(&stream));
1741 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1742 assert_eq!(stream.send.off_front(), 17);
1743 assert_eq!(&buf[..5], b"lehdl");
1744
1745 assert!(stream_send_ready(&stream));
1746 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
1747 assert_eq!(stream.send.off_front(), 20);
1748 assert_eq!(&buf[..2], b"ro");
1749 }
1750
1751 #[test]
1752 fn rangebuf_split_off() {
1753 let mut buf = <RangeBuf>::from(b"helloworld", 5, true);
1754 assert_eq!(buf.start, 0);
1755 assert_eq!(buf.pos, 0);
1756 assert_eq!(buf.len, 10);
1757 assert_eq!(buf.off, 5);
1758 assert!(buf.fin);
1759
1760 assert_eq!(buf.len(), 10);
1761 assert_eq!(buf.off(), 5);
1762 assert!(buf.fin());
1763
1764 assert_eq!(&buf[..], b"helloworld");
1765
1766 buf.consume(5);
1768
1769 assert_eq!(buf.start, 0);
1770 assert_eq!(buf.pos, 5);
1771 assert_eq!(buf.len, 10);
1772 assert_eq!(buf.off, 5);
1773 assert!(buf.fin);
1774
1775 assert_eq!(buf.len(), 5);
1776 assert_eq!(buf.off(), 10);
1777 assert!(buf.fin());
1778
1779 assert_eq!(&buf[..], b"world");
1780
1781 let mut new_buf = buf.split_off(3);
1783
1784 assert_eq!(buf.start, 0);
1785 assert_eq!(buf.pos, 3);
1786 assert_eq!(buf.len, 3);
1787 assert_eq!(buf.off, 5);
1788 assert!(!buf.fin);
1789
1790 assert_eq!(buf.len(), 0);
1791 assert_eq!(buf.off(), 8);
1792 assert!(!buf.fin());
1793
1794 assert_eq!(&buf[..], b"");
1795
1796 assert_eq!(new_buf.start, 3);
1797 assert_eq!(new_buf.pos, 5);
1798 assert_eq!(new_buf.len, 7);
1799 assert_eq!(new_buf.off, 8);
1800 assert!(new_buf.fin);
1801
1802 assert_eq!(new_buf.len(), 5);
1803 assert_eq!(new_buf.off(), 10);
1804 assert!(new_buf.fin());
1805
1806 assert_eq!(&new_buf[..], b"world");
1807
1808 new_buf.consume(2);
1810
1811 assert_eq!(new_buf.start, 3);
1812 assert_eq!(new_buf.pos, 7);
1813 assert_eq!(new_buf.len, 7);
1814 assert_eq!(new_buf.off, 8);
1815 assert!(new_buf.fin);
1816
1817 assert_eq!(new_buf.len(), 3);
1818 assert_eq!(new_buf.off(), 12);
1819 assert!(new_buf.fin());
1820
1821 assert_eq!(&new_buf[..], b"rld");
1822
1823 let mut new_new_buf = new_buf.split_off(5);
1825
1826 assert_eq!(new_buf.start, 3);
1827 assert_eq!(new_buf.pos, 7);
1828 assert_eq!(new_buf.len, 5);
1829 assert_eq!(new_buf.off, 8);
1830 assert!(!new_buf.fin);
1831
1832 assert_eq!(new_buf.len(), 1);
1833 assert_eq!(new_buf.off(), 12);
1834 assert!(!new_buf.fin());
1835
1836 assert_eq!(&new_buf[..], b"r");
1837
1838 assert_eq!(new_new_buf.start, 8);
1839 assert_eq!(new_new_buf.pos, 8);
1840 assert_eq!(new_new_buf.len, 2);
1841 assert_eq!(new_new_buf.off, 13);
1842 assert!(new_new_buf.fin);
1843
1844 assert_eq!(new_new_buf.len(), 2);
1845 assert_eq!(new_new_buf.off(), 13);
1846 assert!(new_new_buf.fin());
1847
1848 assert_eq!(&new_new_buf[..], b"ld");
1849
1850 new_new_buf.consume(2);
1852
1853 assert_eq!(new_new_buf.start, 8);
1854 assert_eq!(new_new_buf.pos, 10);
1855 assert_eq!(new_new_buf.len, 2);
1856 assert_eq!(new_new_buf.off, 13);
1857 assert!(new_new_buf.fin);
1858
1859 assert_eq!(new_new_buf.len(), 0);
1860 assert_eq!(new_new_buf.off(), 15);
1861 assert!(new_new_buf.fin());
1862
1863 assert_eq!(&new_new_buf[..], b"");
1864 }
1865
1866 #[test]
1869 fn stream_limit_auto_open() {
1870 let local_tp = crate::TransportParams::default();
1871 let peer_tp = crate::TransportParams::default();
1872
1873 let mut streams = <StreamMap>::new(5, 5, 5);
1874
1875 let stream_id = 500;
1876 assert!(!is_local(stream_id, true), "stream id is peer initiated");
1877 assert!(is_bidi(stream_id), "stream id is bidirectional");
1878 assert_eq!(
1879 streams
1880 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1881 .err(),
1882 Some(Error::StreamLimit),
1883 "stream limit should be exceeded"
1884 );
1885 }
1886
1887 #[test]
1890 fn stream_create_out_of_order() {
1891 let local_tp = crate::TransportParams::default();
1892 let peer_tp = crate::TransportParams::default();
1893
1894 let mut streams = <StreamMap>::new(5, 5, 5);
1895
1896 for stream_id in [8, 12, 4] {
1897 assert!(is_local(stream_id, false), "stream id is client initiated");
1898 assert!(is_bidi(stream_id), "stream id is bidirectional");
1899 assert!(streams
1900 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1901 .is_ok());
1902 }
1903 }
1904
1905 #[test]
1907 fn stream_limit_edge() {
1908 let local_tp = crate::TransportParams::default();
1909 let peer_tp = crate::TransportParams::default();
1910
1911 let mut streams = <StreamMap>::new(3, 3, 3);
1912
1913 let stream_id = 8;
1915 assert!(streams
1916 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1917 .is_ok());
1918
1919 let stream_id = 12;
1921 assert_eq!(
1922 streams
1923 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1924 .err(),
1925 Some(Error::StreamLimit)
1926 );
1927 }
1928
1929 fn cycle_stream_priority(stream_id: u64, streams: &mut StreamMap) {
1930 let key = streams.get(stream_id).unwrap().priority_key.clone();
1931 streams.update_priority(&key.clone(), &key);
1932 }
1933
1934 #[test]
1935 fn writable_prioritized_default_priority() {
1936 let local_tp = crate::TransportParams::default();
1937 let peer_tp = crate::TransportParams {
1938 initial_max_stream_data_bidi_local: 100,
1939 initial_max_stream_data_uni: 100,
1940 ..Default::default()
1941 };
1942
1943 let mut streams = StreamMap::new(100, 100, 100);
1944
1945 for id in [0, 4, 8, 12] {
1946 assert!(streams
1947 .get_or_create(id, &local_tp, &peer_tp, false, true)
1948 .is_ok());
1949 }
1950
1951 let walk_1: Vec<u64> = streams.writable().collect();
1952 cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1953 let walk_2: Vec<u64> = streams.writable().collect();
1954 cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1955 let walk_3: Vec<u64> = streams.writable().collect();
1956 cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1957 let walk_4: Vec<u64> = streams.writable().collect();
1958 cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1959 let walk_5: Vec<u64> = streams.writable().collect();
1960
1961 assert_eq!(walk_1, vec![0, 4, 8, 12]);
1964 assert_eq!(walk_2, vec![4, 8, 12, 0]);
1965 assert_eq!(walk_3, vec![8, 12, 0, 4]);
1966 assert_eq!(walk_4, vec![12, 0, 4, 8,]);
1967 assert_eq!(walk_5, vec![0, 4, 8, 12]);
1968 }
1969
1970 #[test]
1971 fn writable_prioritized_insert_order() {
1972 let local_tp = crate::TransportParams::default();
1973 let peer_tp = crate::TransportParams {
1974 initial_max_stream_data_bidi_local: 100,
1975 initial_max_stream_data_uni: 100,
1976 ..Default::default()
1977 };
1978
1979 let mut streams = StreamMap::new(100, 100, 100);
1980
1981 for id in [12, 4, 8, 0] {
1984 assert!(streams
1985 .get_or_create(id, &local_tp, &peer_tp, false, true)
1986 .is_ok());
1987 }
1988
1989 let walk_1: Vec<u64> = streams.writable().collect();
1990 cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1991 let walk_2: Vec<u64> = streams.writable().collect();
1992 cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1993 let walk_3: Vec<u64> = streams.writable().collect();
1994 cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1995 let walk_4: Vec<u64> = streams.writable().collect();
1996 cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1997 let walk_5: Vec<u64> = streams.writable().collect();
1998 assert_eq!(walk_1, vec![12, 4, 8, 0]);
1999 assert_eq!(walk_2, vec![4, 8, 0, 12]);
2000 assert_eq!(walk_3, vec![8, 0, 12, 4,]);
2001 assert_eq!(walk_4, vec![0, 12, 4, 8]);
2002 assert_eq!(walk_5, vec![12, 4, 8, 0]);
2003 }
2004
2005 #[test]
2006 fn writable_prioritized_mixed_urgency() {
2007 let local_tp = crate::TransportParams::default();
2008 let peer_tp = crate::TransportParams {
2009 initial_max_stream_data_bidi_local: 100,
2010 initial_max_stream_data_uni: 100,
2011 ..Default::default()
2012 };
2013
2014 let mut streams = <StreamMap>::new(100, 100, 100);
2015
2016 let input = vec![
2019 (0, 100),
2020 (4, 90),
2021 (8, 80),
2022 (12, 70),
2023 (16, 60),
2024 (20, 50),
2025 (24, 40),
2026 (28, 30),
2027 (32, 20),
2028 (36, 10),
2029 (40, 0),
2030 ];
2031
2032 for (id, urgency) in input.clone() {
2033 let stream = streams
2036 .get_or_create(id, &local_tp, &peer_tp, false, true)
2037 .unwrap();
2038
2039 stream.urgency = urgency;
2040
2041 let new_priority_key = Arc::new(StreamPriorityKey {
2042 urgency: stream.urgency,
2043 incremental: stream.incremental,
2044 id,
2045 ..Default::default()
2046 });
2047
2048 let old_priority_key = std::mem::replace(
2049 &mut stream.priority_key,
2050 new_priority_key.clone(),
2051 );
2052
2053 streams.update_priority(&old_priority_key, &new_priority_key);
2054 }
2055
2056 let walk_1: Vec<u64> = streams.writable().collect();
2057 assert_eq!(walk_1, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
2058
2059 for (id, urgency) in input {
2061 let stream = streams
2064 .get_or_create(id, &local_tp, &peer_tp, false, true)
2065 .unwrap();
2066
2067 stream.urgency = urgency;
2068
2069 let new_priority_key = Arc::new(StreamPriorityKey {
2070 urgency: stream.urgency,
2071 incremental: stream.incremental,
2072 id,
2073 ..Default::default()
2074 });
2075
2076 let old_priority_key = std::mem::replace(
2077 &mut stream.priority_key,
2078 new_priority_key.clone(),
2079 );
2080
2081 streams.update_priority(&old_priority_key, &new_priority_key);
2082 }
2083
2084 let walk_2: Vec<u64> = streams.writable().collect();
2085 assert_eq!(walk_2, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
2086
2087 streams.collect(24, true);
2089
2090 let walk_3: Vec<u64> = streams.writable().collect();
2091 assert_eq!(walk_3, vec![40, 36, 32, 28, 20, 16, 12, 8, 4, 0]);
2092
2093 streams.collect(40, true);
2094 streams.collect(0, true);
2095
2096 let walk_4: Vec<u64> = streams.writable().collect();
2097 assert_eq!(walk_4, vec![36, 32, 28, 20, 16, 12, 8, 4]);
2098
2099 streams
2101 .get_or_create(44, &local_tp, &peer_tp, false, true)
2102 .unwrap();
2103
2104 let walk_5: Vec<u64> = streams.writable().collect();
2105 assert_eq!(walk_5, vec![36, 32, 28, 20, 16, 12, 8, 4, 44]);
2106 }
2107
2108 #[test]
2109 fn writable_prioritized_mixed_urgencies_incrementals() {
2110 let local_tp = crate::TransportParams::default();
2111 let peer_tp = crate::TransportParams {
2112 initial_max_stream_data_bidi_local: 100,
2113 initial_max_stream_data_uni: 100,
2114 ..Default::default()
2115 };
2116
2117 let mut streams = StreamMap::new(100, 100, 100);
2118
2119 let input = vec![
2121 (0, 100),
2122 (4, 20),
2123 (8, 100),
2124 (12, 20),
2125 (16, 90),
2126 (20, 25),
2127 (24, 90),
2128 (28, 30),
2129 (32, 80),
2130 (36, 20),
2131 (40, 0),
2132 ];
2133
2134 for (id, urgency) in input.clone() {
2135 let stream = streams
2138 .get_or_create(id, &local_tp, &peer_tp, false, true)
2139 .unwrap();
2140
2141 stream.urgency = urgency;
2142
2143 let new_priority_key = Arc::new(StreamPriorityKey {
2144 urgency: stream.urgency,
2145 incremental: stream.incremental,
2146 id,
2147 ..Default::default()
2148 });
2149
2150 let old_priority_key = std::mem::replace(
2151 &mut stream.priority_key,
2152 new_priority_key.clone(),
2153 );
2154
2155 streams.update_priority(&old_priority_key, &new_priority_key);
2156 }
2157
2158 let walk_1: Vec<u64> = streams.writable().collect();
2159 cycle_stream_priority(4, &mut streams);
2160 cycle_stream_priority(16, &mut streams);
2161 cycle_stream_priority(0, &mut streams);
2162 let walk_2: Vec<u64> = streams.writable().collect();
2163 cycle_stream_priority(12, &mut streams);
2164 cycle_stream_priority(24, &mut streams);
2165 cycle_stream_priority(8, &mut streams);
2166 let walk_3: Vec<u64> = streams.writable().collect();
2167 cycle_stream_priority(36, &mut streams);
2168 cycle_stream_priority(16, &mut streams);
2169 cycle_stream_priority(0, &mut streams);
2170 let walk_4: Vec<u64> = streams.writable().collect();
2171 cycle_stream_priority(4, &mut streams);
2172 cycle_stream_priority(24, &mut streams);
2173 cycle_stream_priority(8, &mut streams);
2174 let walk_5: Vec<u64> = streams.writable().collect();
2175 cycle_stream_priority(12, &mut streams);
2176 cycle_stream_priority(16, &mut streams);
2177 cycle_stream_priority(0, &mut streams);
2178 let walk_6: Vec<u64> = streams.writable().collect();
2179 cycle_stream_priority(36, &mut streams);
2180 cycle_stream_priority(24, &mut streams);
2181 cycle_stream_priority(8, &mut streams);
2182 let walk_7: Vec<u64> = streams.writable().collect();
2183 cycle_stream_priority(4, &mut streams);
2184 cycle_stream_priority(16, &mut streams);
2185 cycle_stream_priority(0, &mut streams);
2186 let walk_8: Vec<u64> = streams.writable().collect();
2187 cycle_stream_priority(12, &mut streams);
2188 cycle_stream_priority(24, &mut streams);
2189 cycle_stream_priority(8, &mut streams);
2190 let walk_9: Vec<u64> = streams.writable().collect();
2191 cycle_stream_priority(36, &mut streams);
2192 cycle_stream_priority(16, &mut streams);
2193 cycle_stream_priority(0, &mut streams);
2194
2195 assert_eq!(walk_1, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2196 assert_eq!(walk_2, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2197 assert_eq!(walk_3, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2198 assert_eq!(walk_4, vec![40, 4, 12, 36, 20, 28, 32, 24, 16, 8, 0]);
2199 assert_eq!(walk_5, vec![40, 12, 36, 4, 20, 28, 32, 16, 24, 0, 8]);
2200 assert_eq!(walk_6, vec![40, 36, 4, 12, 20, 28, 32, 24, 16, 8, 0]);
2201 assert_eq!(walk_7, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2202 assert_eq!(walk_8, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2203 assert_eq!(walk_9, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2204
2205 streams.collect(20, true);
2207
2208 let walk_10: Vec<u64> = streams.writable().collect();
2209 assert_eq!(walk_10, vec![40, 4, 12, 36, 28, 32, 24, 16, 8, 0]);
2210
2211 let stream = streams
2213 .get_or_create(44, &local_tp, &peer_tp, false, true)
2214 .unwrap();
2215
2216 stream.urgency = 20;
2217 stream.incremental = true;
2218
2219 let new_priority_key = Arc::new(StreamPriorityKey {
2220 urgency: stream.urgency,
2221 incremental: stream.incremental,
2222 id: 44,
2223 ..Default::default()
2224 });
2225
2226 let old_priority_key =
2227 std::mem::replace(&mut stream.priority_key, new_priority_key.clone());
2228
2229 streams.update_priority(&old_priority_key, &new_priority_key);
2230
2231 let walk_11: Vec<u64> = streams.writable().collect();
2232 assert_eq!(walk_11, vec![40, 4, 12, 36, 44, 28, 32, 24, 16, 8, 0]);
2233 }
2234
2235 #[test]
2236 fn priority_tree_dupes() {
2237 let mut prioritized_writable: RBTree<StreamWritablePriorityAdapter> =
2238 Default::default();
2239
2240 for id in [0, 4, 8, 12] {
2241 let s = Arc::new(StreamPriorityKey {
2242 urgency: 0,
2243 incremental: false,
2244 id,
2245 ..Default::default()
2246 });
2247
2248 prioritized_writable.insert(s);
2249 }
2250
2251 let walk_1: Vec<u64> =
2252 prioritized_writable.iter().map(|s| s.id).collect();
2253 assert_eq!(walk_1, vec![0, 4, 8, 12]);
2254
2255 for id in [0, 4, 8, 12] {
2258 let s = Arc::new(StreamPriorityKey {
2259 urgency: 0,
2260 incremental: false,
2261 id,
2262 ..Default::default()
2263 });
2264
2265 prioritized_writable.insert(s);
2266 }
2267
2268 let walk_2: Vec<u64> =
2269 prioritized_writable.iter().map(|s| s.id).collect();
2270 assert_eq!(walk_2, vec![0, 0, 4, 4, 8, 8, 12, 12]);
2271 }
2272
2273 #[test]
2274 fn retransmit_returns_zero_when_already_acked() {
2275 let mut stream = <Stream>::new(0, 15, 15, true, 0, 15);
2276
2277 assert_eq!(stream.send.write(b"hello", false), Ok(5));
2279 assert_eq!(stream.send.buffered_bytes(), 5);
2280
2281 let mut buf = [0; 10];
2282 let (written, _) = stream.send.emit(&mut buf).unwrap();
2283 assert_eq!(written, 5);
2284 assert_eq!(stream.send.buffered_bytes(), 0);
2285
2286 let retransmitted = stream.send.retransmit(0, 5);
2288 assert_eq!(retransmitted, 5);
2289 assert_eq!(stream.send.buffered_bytes(), 5);
2290
2291 stream.send.ack_and_drop(0, 5);
2293 assert_eq!(stream.send.buffered_bytes(), 0);
2294
2295 let retransmitted = stream.send.retransmit(0, 5);
2297 assert_eq!(retransmitted, 0);
2298 assert_eq!(stream.send.buffered_bytes(), 0);
2299 }
2300
2301 #[test]
2302 fn retransmit_returns_partial_when_some_acked() {
2303 let mut stream = <Stream>::new(0, 15, 15, true, 0, 15);
2304
2305 assert_eq!(stream.send.write(b"helloworld", false), Ok(10));
2307 assert_eq!(stream.send.buffered_bytes(), 10);
2308
2309 let mut buf = [0; 10];
2310 let (written, _) = stream.send.emit(&mut buf).unwrap();
2311 assert_eq!(written, 10);
2312 assert_eq!(stream.send.buffered_bytes(), 0);
2313
2314 let retransmitted = stream.send.retransmit(0, 10);
2316 assert_eq!(retransmitted, 10);
2317 assert_eq!(stream.send.buffered_bytes(), 10);
2318
2319 let dropped = stream.send.ack_and_drop(0, 5);
2321 assert_eq!(dropped, 5);
2322 assert_eq!(stream.send.buffered_bytes(), 5);
2323
2324 let retransmitted = stream.send.retransmit(0, 10);
2327 assert_eq!(retransmitted, 0); assert_eq!(stream.send.buffered_bytes(), 5);
2329 }
2330
2331 #[test]
2332 fn ack_and_drop_decrements_len_and_returns_dropped() {
2333 let mut stream = <Stream>::new(0, 15, 15, true, 0, 15);
2334
2335 assert_eq!(stream.send.write(b"hello", false), Ok(5));
2337 assert_eq!(stream.send.buffered_bytes(), 5);
2338
2339 let mut buf = [0; 10];
2341 let (written, _) = stream.send.emit(&mut buf).unwrap();
2342 assert_eq!(written, 5);
2343 assert_eq!(stream.send.buffered_bytes(), 0);
2344
2345 let retransmitted = stream.send.retransmit(0, 5);
2347 assert_eq!(retransmitted, 5);
2348 assert_eq!(stream.send.buffered_bytes(), 5);
2349
2350 let dropped = stream.send.ack_and_drop(0, 5);
2352 assert_eq!(dropped, 5);
2353 assert_eq!(stream.send.buffered_bytes(), 0);
2354 }
2355
2356 #[test]
2357 fn ack_and_drop_partial_buffer() {
2358 let mut stream = <Stream>::new(0, 30, 30, true, 0, 30);
2359
2360 assert_eq!(stream.send.write(b"hello", false), Ok(5));
2362 assert_eq!(stream.send.write(b"world", false), Ok(5));
2363 assert_eq!(stream.send.buffered_bytes(), 10);
2364
2365 let mut buf = [0; 10];
2366 let (written, _) = stream.send.emit(&mut buf).unwrap();
2367 assert_eq!(written, 10);
2368 assert_eq!(stream.send.buffered_bytes(), 0);
2369
2370 let retransmitted = stream.send.retransmit(0, 10);
2372 assert_eq!(retransmitted, 10);
2373 assert_eq!(stream.send.buffered_bytes(), 10);
2374
2375 let dropped = stream.send.ack_and_drop(0, 5);
2377 assert_eq!(dropped, 5);
2378 assert_eq!(stream.send.buffered_bytes(), 5);
2379
2380 let dropped = stream.send.ack_and_drop(5, 5);
2382 assert_eq!(dropped, 5);
2383 assert_eq!(stream.send.buffered_bytes(), 0);
2384 }
2385
2386 #[test]
2387 fn ack_and_drop_returns_zero_when_nothing_dropped() {
2388 let mut stream = <Stream>::new(0, 15, 15, true, 0, 15);
2389
2390 assert_eq!(stream.send.write(b"hello", false), Ok(5));
2392 let mut buf = [0; 10];
2393 let (written, _) = stream.send.emit(&mut buf).unwrap();
2394 assert_eq!(written, 5);
2395
2396 let dropped = stream.send.ack_and_drop(0, 5);
2399 assert_eq!(dropped, 0);
2400 assert_eq!(stream.send.buffered_bytes(), 0);
2401 }
2402
2403 #[test]
2404 fn cache_consistency_through_full_lifecycle() {
2405 let mut streams = <StreamMap>::new(5, 5, 15);
2409
2410 let local_params = crate::TransportParams {
2412 initial_max_data: 30,
2413 initial_max_stream_data_bidi_local: 15,
2414 initial_max_stream_data_bidi_remote: 15,
2415 initial_max_stream_data_uni: 10,
2416 initial_max_streams_bidi: 5,
2417 initial_max_streams_uni: 5,
2418 ..Default::default()
2419 };
2420 let peer_params = local_params.clone();
2421
2422 streams.update_peer_max_streams_bidi(5);
2424 streams.update_peer_max_streams_uni(5);
2425
2426 let stream_id = 0u64;
2427
2428 {
2430 let stream = streams
2431 .get_or_create(
2432 stream_id,
2433 &local_params,
2434 &peer_params,
2435 true,
2436 false,
2437 )
2438 .unwrap();
2439 assert_eq!(stream.send.write(b"hello", false), Ok(5));
2440 }
2441 streams.add_tx_buffered(5);
2442 assert_eq!(streams.get(stream_id).unwrap().send.buffered_bytes(), 5);
2443 assert_eq!(streams.tx_buffered(), 5);
2444 assert!(streams.tx_buffered_is_consistent());
2445
2446 let mut buf = [0; 10];
2448 let written = {
2449 let stream = streams.get_mut(stream_id).unwrap();
2450 let (written, _) = stream.send.emit(&mut buf).unwrap();
2451 written
2452 };
2453 assert_eq!(written, 5);
2454 streams.sub_tx_buffered(5);
2455 assert_eq!(streams.get(stream_id).unwrap().send.buffered_bytes(), 0);
2456 assert_eq!(streams.tx_buffered(), 0);
2457 assert!(streams.tx_buffered_is_consistent());
2458
2459 let retransmitted = {
2462 let stream = streams.get_mut(stream_id).unwrap();
2463 stream.send.retransmit(0, 5)
2464 };
2465 assert_eq!(retransmitted, 5);
2466 streams.add_tx_buffered(retransmitted);
2467 assert_eq!(streams.get(stream_id).unwrap().send.buffered_bytes(), 5);
2468 assert_eq!(streams.tx_buffered(), 5);
2469 assert!(streams.tx_buffered_is_consistent());
2470
2471 let dropped = {
2474 let stream = streams.get_mut(stream_id).unwrap();
2475 stream.send.ack_and_drop(0, 5)
2476 };
2477 assert_eq!(dropped, 5);
2478 streams.sub_tx_buffered(dropped);
2479 assert_eq!(streams.get(stream_id).unwrap().send.buffered_bytes(), 0);
2480 assert_eq!(streams.tx_buffered(), 0);
2481 assert!(streams.tx_buffered_is_consistent());
2482 }
2483
2484 #[test]
2485 fn send_buf_len_reflects_buffered_data() {
2486 let mut stream = <Stream>::new(0, 15, 15, true, 0, 15);
2487
2488 assert_eq!(stream.send.buffered_bytes(), 0);
2490
2491 assert_eq!(stream.send.write(b"hello", false), Ok(5));
2493 assert_eq!(stream.send.buffered_bytes(), 5);
2494
2495 let mut buf = [0; 10];
2497 let (written, _) = stream.send.emit(&mut buf).unwrap();
2498 assert_eq!(written, 5);
2499 assert_eq!(stream.send.buffered_bytes(), 0);
2500
2501 let retransmitted = stream.send.retransmit(0, 5);
2503 assert_eq!(retransmitted, 5);
2504 assert_eq!(stream.send.buffered_bytes(), 5);
2505
2506 let dropped = stream.send.ack_and_drop(0, 5);
2508 assert_eq!(dropped, 5);
2509 assert_eq!(stream.send.buffered_bytes(), 0);
2510 }
2511}
2512
2513mod recv_buf;
2514mod send_buf;