1use std::cmp;
28
29use std::sync::Arc;
30
31use std::collections::hash_map;
32use std::collections::HashMap;
33use std::collections::HashSet;
34
35use intrusive_collections::intrusive_adapter;
36use intrusive_collections::KeyAdapter;
37use intrusive_collections::RBTree;
38use intrusive_collections::RBTreeAtomicLink;
39
40use smallvec::SmallVec;
41
42use crate::Error;
43use crate::Result;
44
45const DEFAULT_URGENCY: u8 = 127;
46
47const DEFAULT_STREAM_WINDOW: u64 = 32 * 1024;
49
50pub const MAX_STREAM_WINDOW: u64 = 16 * 1024 * 1024;
52
53#[derive(Default)]
58pub struct StreamIdHasher {
59 id: u64,
60}
61
62impl std::hash::Hasher for StreamIdHasher {
63 #[inline]
64 fn finish(&self) -> u64 {
65 self.id
66 }
67
68 #[inline]
69 fn write_u64(&mut self, id: u64) {
70 self.id = id;
71 }
72
73 #[inline]
74 fn write(&mut self, _: &[u8]) {
75 unimplemented!()
78 }
79}
80
81type BuildStreamIdHasher = std::hash::BuildHasherDefault<StreamIdHasher>;
82
83pub type StreamIdHashMap<V> = HashMap<u64, V, BuildStreamIdHasher>;
84pub type StreamIdHashSet = HashSet<u64, BuildStreamIdHasher>;
85
86#[derive(Default)]
88pub struct StreamMap {
89 streams: StreamIdHashMap<Stream>,
91
92 collected: StreamIdHashSet,
98
99 peer_max_streams_bidi: u64,
101
102 peer_max_streams_uni: u64,
104
105 peer_opened_streams_bidi: u64,
107
108 peer_opened_streams_uni: u64,
110
111 local_max_streams_bidi: u64,
113 local_max_streams_bidi_next: u64,
114
115 local_max_streams_uni: u64,
117 local_max_streams_uni_next: u64,
118
119 local_opened_streams_bidi: u64,
121
122 local_opened_streams_uni: u64,
124
125 flushable: RBTree<StreamFlushablePriorityAdapter>,
129
130 pub readable: RBTree<StreamReadablePriorityAdapter>,
134
135 pub writable: RBTree<StreamWritablePriorityAdapter>,
140
141 almost_full: StreamIdHashSet,
146
147 blocked: StreamIdHashMap<u64>,
151
152 reset: StreamIdHashMap<(u64, u64)>,
156
157 stopped: StreamIdHashMap<u64>,
161
162 max_stream_window: u64,
164}
165
166impl StreamMap {
167 pub fn new(
168 max_streams_bidi: u64, max_streams_uni: u64, max_stream_window: u64,
169 ) -> StreamMap {
170 StreamMap {
171 local_max_streams_bidi: max_streams_bidi,
172 local_max_streams_bidi_next: max_streams_bidi,
173
174 local_max_streams_uni: max_streams_uni,
175 local_max_streams_uni_next: max_streams_uni,
176
177 max_stream_window,
178
179 ..StreamMap::default()
180 }
181 }
182
183 pub fn get(&self, id: u64) -> Option<&Stream> {
185 self.streams.get(&id)
186 }
187
188 pub fn get_mut(&mut self, id: u64) -> Option<&mut Stream> {
190 self.streams.get_mut(&id)
191 }
192
193 pub(crate) fn get_or_create(
206 &mut self, id: u64, local_params: &crate::TransportParams,
207 peer_params: &crate::TransportParams, local: bool, is_server: bool,
208 ) -> Result<&mut Stream> {
209 let (stream, is_new_and_writable) = match self.streams.entry(id) {
210 hash_map::Entry::Vacant(v) => {
211 if self.collected.contains(&id) {
213 return Err(Error::Done);
214 }
215
216 if local != is_local(id, is_server) {
217 return Err(Error::InvalidStreamState(id));
218 }
219
220 let (max_rx_data, max_tx_data) = match (local, is_bidi(id)) {
221 (true, true) => (
223 local_params.initial_max_stream_data_bidi_local,
224 peer_params.initial_max_stream_data_bidi_remote,
225 ),
226
227 (true, false) => (0, peer_params.initial_max_stream_data_uni),
229
230 (false, true) => (
232 local_params.initial_max_stream_data_bidi_remote,
233 peer_params.initial_max_stream_data_bidi_local,
234 ),
235
236 (false, false) =>
238 (local_params.initial_max_stream_data_uni, 0),
239 };
240
241 let stream_sequence = id >> 2;
245
246 match (is_local(id, is_server), is_bidi(id)) {
248 (true, true) => {
249 let n = std::cmp::max(
250 self.local_opened_streams_bidi,
251 stream_sequence + 1,
252 );
253
254 if n > self.peer_max_streams_bidi {
255 return Err(Error::StreamLimit);
256 }
257
258 self.local_opened_streams_bidi = n;
259 },
260
261 (true, false) => {
262 let n = std::cmp::max(
263 self.local_opened_streams_uni,
264 stream_sequence + 1,
265 );
266
267 if n > self.peer_max_streams_uni {
268 return Err(Error::StreamLimit);
269 }
270
271 self.local_opened_streams_uni = n;
272 },
273
274 (false, true) => {
275 let n = std::cmp::max(
276 self.peer_opened_streams_bidi,
277 stream_sequence + 1,
278 );
279
280 if n > self.local_max_streams_bidi {
281 return Err(Error::StreamLimit);
282 }
283
284 self.peer_opened_streams_bidi = n;
285 },
286
287 (false, false) => {
288 let n = std::cmp::max(
289 self.peer_opened_streams_uni,
290 stream_sequence + 1,
291 );
292
293 if n > self.local_max_streams_uni {
294 return Err(Error::StreamLimit);
295 }
296
297 self.peer_opened_streams_uni = n;
298 },
299 };
300
301 let s = Stream::new(
302 id,
303 max_rx_data,
304 max_tx_data,
305 is_bidi(id),
306 local,
307 self.max_stream_window,
308 );
309
310 let is_writable = s.is_writable();
311
312 (v.insert(s), is_writable)
313 },
314
315 hash_map::Entry::Occupied(v) => (v.into_mut(), false),
316 };
317
318 if is_new_and_writable {
321 self.writable.insert(Arc::clone(&stream.priority_key));
322 }
323
324 Ok(stream)
325 }
326
327 pub fn insert_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
331 if !priority_key.readable.is_linked() {
332 self.readable.insert(Arc::clone(priority_key));
333 }
334 }
335
336 pub fn remove_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
338 if !priority_key.readable.is_linked() {
339 return;
340 }
341
342 let mut c = {
343 let ptr = Arc::as_ptr(priority_key);
344 unsafe { self.readable.cursor_mut_from_ptr(ptr) }
345 };
346
347 c.remove();
348 }
349
350 pub fn insert_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
357 if !priority_key.writable.is_linked() {
358 self.writable.insert(Arc::clone(priority_key));
359 }
360 }
361
362 pub fn remove_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
367 if !priority_key.writable.is_linked() {
368 return;
369 }
370
371 let mut c = {
372 let ptr = Arc::as_ptr(priority_key);
373 unsafe { self.writable.cursor_mut_from_ptr(ptr) }
374 };
375
376 c.remove();
377 }
378
379 pub fn insert_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
383 if !priority_key.flushable.is_linked() {
384 self.flushable.insert(Arc::clone(priority_key));
385 }
386 }
387
388 pub fn remove_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
390 if !priority_key.flushable.is_linked() {
391 return;
392 }
393
394 let mut c = {
395 let ptr = Arc::as_ptr(priority_key);
396 unsafe { self.flushable.cursor_mut_from_ptr(ptr) }
397 };
398
399 c.remove();
400 }
401
402 pub fn peek_flushable(&self) -> Option<Arc<StreamPriorityKey>> {
403 self.flushable.front().clone_pointer()
404 }
405
406 pub fn update_priority(
408 &mut self, old: &Arc<StreamPriorityKey>, new: &Arc<StreamPriorityKey>,
409 ) {
410 if old.readable.is_linked() {
411 self.remove_readable(old);
412 self.readable.insert(Arc::clone(new));
413 }
414
415 if old.writable.is_linked() {
416 self.remove_writable(old);
417 self.writable.insert(Arc::clone(new));
418 }
419
420 if old.flushable.is_linked() {
421 self.remove_flushable(old);
422 self.flushable.insert(Arc::clone(new));
423 }
424 }
425
426 pub fn insert_almost_full(&mut self, stream_id: u64) {
430 self.almost_full.insert(stream_id);
431 }
432
433 pub fn remove_almost_full(&mut self, stream_id: u64) {
435 self.almost_full.remove(&stream_id);
436 }
437
438 pub fn insert_blocked(&mut self, stream_id: u64, off: u64) {
443 self.blocked.insert(stream_id, off);
444 }
445
446 pub fn remove_blocked(&mut self, stream_id: u64) {
448 self.blocked.remove(&stream_id);
449 }
450
451 pub fn insert_reset(
456 &mut self, stream_id: u64, error_code: u64, final_size: u64,
457 ) {
458 self.reset.insert(stream_id, (error_code, final_size));
459 }
460
461 pub fn remove_reset(&mut self, stream_id: u64) {
463 self.reset.remove(&stream_id);
464 }
465
466 pub fn insert_stopped(&mut self, stream_id: u64, error_code: u64) {
471 self.stopped.insert(stream_id, error_code);
472 }
473
474 pub fn remove_stopped(&mut self, stream_id: u64) {
476 self.stopped.remove(&stream_id);
477 }
478
479 pub fn update_peer_max_streams_bidi(&mut self, v: u64) {
481 self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v);
482 }
483
484 pub fn update_peer_max_streams_uni(&mut self, v: u64) {
486 self.peer_max_streams_uni = cmp::max(self.peer_max_streams_uni, v);
487 }
488
489 pub fn update_max_streams_bidi(&mut self) {
491 self.local_max_streams_bidi = self.local_max_streams_bidi_next;
492 }
493
494 pub fn max_streams_bidi(&self) -> u64 {
496 self.local_max_streams_bidi
497 }
498
499 pub fn max_streams_bidi_next(&mut self) -> u64 {
501 self.local_max_streams_bidi_next
502 }
503
504 pub fn update_max_streams_uni(&mut self) {
506 self.local_max_streams_uni = self.local_max_streams_uni_next;
507 }
508
509 pub fn max_streams_uni_next(&mut self) -> u64 {
511 self.local_max_streams_uni_next
512 }
513
514 pub fn peer_streams_left_bidi(&self) -> u64 {
517 self.peer_max_streams_bidi - self.local_opened_streams_bidi
518 }
519
520 pub fn peer_streams_left_uni(&self) -> u64 {
523 self.peer_max_streams_uni - self.local_opened_streams_uni
524 }
525
526 pub fn collect(&mut self, stream_id: u64, local: bool) {
531 if !local {
532 if is_bidi(stream_id) {
535 self.local_max_streams_bidi_next =
536 self.local_max_streams_bidi_next.saturating_add(1);
537 } else {
538 self.local_max_streams_uni_next =
539 self.local_max_streams_uni_next.saturating_add(1);
540 }
541 }
542
543 let s = self.streams.remove(&stream_id).unwrap();
544
545 self.remove_readable(&s.priority_key);
546
547 self.remove_writable(&s.priority_key);
548
549 self.remove_flushable(&s.priority_key);
550
551 self.collected.insert(stream_id);
552 }
553
554 pub fn readable(&self) -> StreamIter {
556 StreamIter {
557 streams: self.readable.iter().map(|s| s.id).collect(),
558 index: 0,
559 }
560 }
561
562 pub fn writable(&self) -> StreamIter {
564 StreamIter {
565 streams: self.writable.iter().map(|s| s.id).collect(),
566 index: 0,
567 }
568 }
569
570 pub fn almost_full(&self) -> StreamIter {
572 StreamIter::from(&self.almost_full)
573 }
574
575 pub fn blocked(&self) -> hash_map::Iter<u64, u64> {
577 self.blocked.iter()
578 }
579
580 pub fn reset(&self) -> hash_map::Iter<u64, (u64, u64)> {
582 self.reset.iter()
583 }
584
585 pub fn stopped(&self) -> hash_map::Iter<u64, u64> {
587 self.stopped.iter()
588 }
589
590 pub fn is_collected(&self, stream_id: u64) -> bool {
592 self.collected.contains(&stream_id)
593 }
594
595 pub fn has_flushable(&self) -> bool {
597 !self.flushable.is_empty()
598 }
599
600 pub fn has_readable(&self) -> bool {
602 !self.readable.is_empty()
603 }
604
605 pub fn has_almost_full(&self) -> bool {
608 !self.almost_full.is_empty()
609 }
610
611 pub fn has_blocked(&self) -> bool {
613 !self.blocked.is_empty()
614 }
615
616 pub fn has_reset(&self) -> bool {
618 !self.reset.is_empty()
619 }
620
621 pub fn has_stopped(&self) -> bool {
623 !self.stopped.is_empty()
624 }
625
626 pub fn should_update_max_streams_bidi(&self) -> bool {
629 self.local_max_streams_bidi_next != self.local_max_streams_bidi &&
630 self.local_max_streams_bidi_next / 2 >
631 self.local_max_streams_bidi - self.peer_opened_streams_bidi
632 }
633
634 pub fn should_update_max_streams_uni(&self) -> bool {
637 self.local_max_streams_uni_next != self.local_max_streams_uni &&
638 self.local_max_streams_uni_next / 2 >
639 self.local_max_streams_uni - self.peer_opened_streams_uni
640 }
641
642 #[cfg(test)]
644 pub fn len(&self) -> usize {
645 self.streams.len()
646 }
647}
648
649pub struct Stream {
651 pub recv: recv_buf::RecvBuf,
653
654 pub send: send_buf::SendBuf,
656
657 pub send_lowat: usize,
658
659 pub bidi: bool,
661
662 pub local: bool,
664
665 pub urgency: u8,
667
668 pub incremental: bool,
670
671 pub priority_key: Arc<StreamPriorityKey>,
672}
673
674impl Stream {
675 pub fn new(
677 id: u64, max_rx_data: u64, max_tx_data: u64, bidi: bool, local: bool,
678 max_window: u64,
679 ) -> Stream {
680 let priority_key = Arc::new(StreamPriorityKey {
681 id,
682 ..Default::default()
683 });
684
685 Stream {
686 recv: recv_buf::RecvBuf::new(max_rx_data, max_window),
687 send: send_buf::SendBuf::new(max_tx_data),
688 send_lowat: 1,
689 bidi,
690 local,
691 urgency: priority_key.urgency,
692 incremental: priority_key.incremental,
693 priority_key,
694 }
695 }
696
697 pub fn is_readable(&self) -> bool {
699 self.recv.ready()
700 }
701
702 pub fn is_writable(&self) -> bool {
705 !self.send.is_shutdown() &&
706 !self.send.is_fin() &&
707 (self.send.off_back() + self.send_lowat as u64) <
708 self.send.max_off()
709 }
710
711 pub fn is_flushable(&self) -> bool {
714 let off_front = self.send.off_front();
715
716 !self.send.is_empty() &&
717 off_front < self.send.off_back() &&
718 off_front < self.send.max_off()
719 }
720
721 pub fn is_complete(&self) -> bool {
731 match (self.bidi, self.local) {
732 (true, _) => self.recv.is_fin() && self.send.is_complete(),
735
736 (false, true) => self.send.is_complete(),
739
740 (false, false) => self.recv.is_fin(),
743 }
744 }
745}
746
747pub fn is_local(stream_id: u64, is_server: bool) -> bool {
749 (stream_id & 0x1) == (is_server as u64)
750}
751
752pub fn is_bidi(stream_id: u64) -> bool {
754 (stream_id & 0x2) == 0
755}
756
757#[derive(Clone, Debug)]
758pub struct StreamPriorityKey {
759 pub urgency: u8,
760 pub incremental: bool,
761 pub id: u64,
762
763 pub readable: RBTreeAtomicLink,
764 pub writable: RBTreeAtomicLink,
765 pub flushable: RBTreeAtomicLink,
766}
767
768impl Default for StreamPriorityKey {
769 fn default() -> Self {
770 Self {
771 urgency: DEFAULT_URGENCY,
772 incremental: true,
773 id: Default::default(),
774 readable: Default::default(),
775 writable: Default::default(),
776 flushable: Default::default(),
777 }
778 }
779}
780
781impl PartialEq for StreamPriorityKey {
782 fn eq(&self, other: &Self) -> bool {
783 self.id == other.id
784 }
785}
786
787impl Eq for StreamPriorityKey {}
788
789impl PartialOrd for StreamPriorityKey {
790 #[allow(clippy::non_canonical_partial_ord_impl)]
792 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
793 if self.id == other.id {
795 return Some(std::cmp::Ordering::Equal);
796 }
797
798 if self.urgency != other.urgency {
800 return self.urgency.partial_cmp(&other.urgency);
801 }
802
803 if !self.incremental && !other.incremental {
806 return self.id.partial_cmp(&other.id);
807 }
808
809 if self.incremental && !other.incremental {
811 return Some(std::cmp::Ordering::Greater);
812 }
813 if !self.incremental && other.incremental {
814 return Some(std::cmp::Ordering::Less);
815 }
816
817 Some(std::cmp::Ordering::Greater)
821 }
822}
823
824impl Ord for StreamPriorityKey {
825 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
826 self.partial_cmp(other).unwrap()
828 }
829}
830
831intrusive_adapter!(pub StreamWritablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { writable: RBTreeAtomicLink });
832
833impl KeyAdapter<'_> for StreamWritablePriorityAdapter {
834 type Key = StreamPriorityKey;
835
836 fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
837 s.clone()
838 }
839}
840
841intrusive_adapter!(pub StreamReadablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { readable: RBTreeAtomicLink });
842
843impl KeyAdapter<'_> for StreamReadablePriorityAdapter {
844 type Key = StreamPriorityKey;
845
846 fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
847 s.clone()
848 }
849}
850
851intrusive_adapter!(pub StreamFlushablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { flushable: RBTreeAtomicLink });
852
853impl KeyAdapter<'_> for StreamFlushablePriorityAdapter {
854 type Key = StreamPriorityKey;
855
856 fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
857 s.clone()
858 }
859}
860
861#[derive(Default)]
863pub struct StreamIter {
864 streams: SmallVec<[u64; 8]>,
865 index: usize,
866}
867
868impl StreamIter {
869 #[inline]
870 fn from(streams: &StreamIdHashSet) -> Self {
871 StreamIter {
872 streams: streams.iter().copied().collect(),
873 index: 0,
874 }
875 }
876}
877
878impl Iterator for StreamIter {
879 type Item = u64;
880
881 #[inline]
882 fn next(&mut self) -> Option<Self::Item> {
883 let v = self.streams.get(self.index)?;
884 self.index += 1;
885 Some(*v)
886 }
887}
888
889impl ExactSizeIterator for StreamIter {
890 #[inline]
891 fn len(&self) -> usize {
892 self.streams.len() - self.index
893 }
894}
895
896#[derive(Clone, Debug, Default, Eq)]
911pub struct RangeBuf {
912 data: Arc<Vec<u8>>,
918
919 start: usize,
921
922 pos: usize,
924
925 len: usize,
927
928 off: u64,
930
931 fin: bool,
933}
934
935impl RangeBuf {
936 pub fn from(buf: &[u8], off: u64, fin: bool) -> RangeBuf {
938 RangeBuf {
939 data: Arc::new(Vec::from(buf)),
940 start: 0,
941 pos: 0,
942 len: buf.len(),
943 off,
944 fin,
945 }
946 }
947
948 pub fn fin(&self) -> bool {
950 self.fin
951 }
952
953 pub fn off(&self) -> u64 {
955 (self.off - self.start as u64) + self.pos as u64
956 }
957
958 pub fn max_off(&self) -> u64 {
960 self.off() + self.len() as u64
961 }
962
963 pub fn len(&self) -> usize {
965 self.len - (self.pos - self.start)
966 }
967
968 pub fn is_empty(&self) -> bool {
970 self.len() == 0
971 }
972
973 pub fn consume(&mut self, count: usize) {
975 self.pos += count;
976 }
977
978 pub fn split_off(&mut self, at: usize) -> RangeBuf {
980 assert!(
981 at <= self.len,
982 "`at` split index (is {}) should be <= len (is {})",
983 at,
984 self.len
985 );
986
987 let buf = RangeBuf {
988 data: self.data.clone(),
989 start: self.start + at,
990 pos: cmp::max(self.pos, self.start + at),
991 len: self.len - at,
992 off: self.off + at as u64,
993 fin: self.fin,
994 };
995
996 self.pos = cmp::min(self.pos, self.start + at);
997 self.len = at;
998 self.fin = false;
999
1000 buf
1001 }
1002}
1003
1004impl std::ops::Deref for RangeBuf {
1005 type Target = [u8];
1006
1007 fn deref(&self) -> &[u8] {
1008 &self.data[self.pos..self.start + self.len]
1009 }
1010}
1011
1012impl Ord for RangeBuf {
1013 fn cmp(&self, other: &RangeBuf) -> cmp::Ordering {
1014 self.off.cmp(&other.off).reverse()
1016 }
1017}
1018
1019impl PartialOrd for RangeBuf {
1020 fn partial_cmp(&self, other: &RangeBuf) -> Option<cmp::Ordering> {
1021 Some(self.cmp(other))
1022 }
1023}
1024
1025impl PartialEq for RangeBuf {
1026 fn eq(&self, other: &RangeBuf) -> bool {
1027 self.off == other.off
1028 }
1029}
1030
1031#[cfg(test)]
1032mod tests {
1033 use super::*;
1034
1035 #[test]
1036 fn recv_flow_control() {
1037 let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1038 assert!(!stream.recv.almost_full());
1039
1040 let mut buf = [0; 32];
1041
1042 let first = RangeBuf::from(b"hello", 0, false);
1043 let second = RangeBuf::from(b"world", 5, false);
1044 let third = RangeBuf::from(b"something", 10, false);
1045
1046 assert_eq!(stream.recv.write(second), Ok(()));
1047 assert_eq!(stream.recv.write(first), Ok(()));
1048 assert!(!stream.recv.almost_full());
1049
1050 assert_eq!(stream.recv.write(third), Err(Error::FlowControl));
1051
1052 let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1053 assert_eq!(&buf[..len], b"helloworld");
1054 assert!(!fin);
1055
1056 assert!(stream.recv.almost_full());
1057
1058 stream.recv.update_max_data(std::time::Instant::now());
1059 assert_eq!(stream.recv.max_data_next(), 25);
1060 assert!(!stream.recv.almost_full());
1061
1062 let third = RangeBuf::from(b"something", 10, false);
1063 assert_eq!(stream.recv.write(third), Ok(()));
1064 }
1065
1066 #[test]
1067 fn recv_past_fin() {
1068 let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1069 assert!(!stream.recv.almost_full());
1070
1071 let first = RangeBuf::from(b"hello", 0, true);
1072 let second = RangeBuf::from(b"world", 5, false);
1073
1074 assert_eq!(stream.recv.write(first), Ok(()));
1075 assert_eq!(stream.recv.write(second), Err(Error::FinalSize));
1076 }
1077
1078 #[test]
1079 fn recv_fin_dup() {
1080 let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1081 assert!(!stream.recv.almost_full());
1082
1083 let first = RangeBuf::from(b"hello", 0, true);
1084 let second = RangeBuf::from(b"hello", 0, true);
1085
1086 assert_eq!(stream.recv.write(first), Ok(()));
1087 assert_eq!(stream.recv.write(second), Ok(()));
1088
1089 let mut buf = [0; 32];
1090
1091 let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1092 assert_eq!(&buf[..len], b"hello");
1093 assert!(fin);
1094 }
1095
1096 #[test]
1097 fn recv_fin_change() {
1098 let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1099 assert!(!stream.recv.almost_full());
1100
1101 let first = RangeBuf::from(b"hello", 0, true);
1102 let second = RangeBuf::from(b"world", 5, true);
1103
1104 assert_eq!(stream.recv.write(second), Ok(()));
1105 assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
1106 }
1107
1108 #[test]
1109 fn recv_fin_lower_than_received() {
1110 let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1111 assert!(!stream.recv.almost_full());
1112
1113 let first = RangeBuf::from(b"hello", 0, true);
1114 let second = RangeBuf::from(b"world", 5, false);
1115
1116 assert_eq!(stream.recv.write(second), Ok(()));
1117 assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
1118 }
1119
1120 #[test]
1121 fn recv_fin_flow_control() {
1122 let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1123 assert!(!stream.recv.almost_full());
1124
1125 let mut buf = [0; 32];
1126
1127 let first = RangeBuf::from(b"hello", 0, false);
1128 let second = RangeBuf::from(b"world", 5, true);
1129
1130 assert_eq!(stream.recv.write(first), Ok(()));
1131 assert_eq!(stream.recv.write(second), Ok(()));
1132
1133 let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1134 assert_eq!(&buf[..len], b"helloworld");
1135 assert!(fin);
1136
1137 assert!(!stream.recv.almost_full());
1138 }
1139
1140 #[test]
1141 fn recv_fin_reset_mismatch() {
1142 let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1143 assert!(!stream.recv.almost_full());
1144
1145 let first = RangeBuf::from(b"hello", 0, true);
1146
1147 assert_eq!(stream.recv.write(first), Ok(()));
1148 assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
1149 }
1150
1151 #[test]
1152 fn recv_reset_dup() {
1153 let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1154 assert!(!stream.recv.almost_full());
1155
1156 let first = RangeBuf::from(b"hello", 0, false);
1157
1158 assert_eq!(stream.recv.write(first), Ok(()));
1159 assert_eq!(stream.recv.reset(0, 5), Ok(0));
1160 assert_eq!(stream.recv.reset(0, 5), Ok(0));
1161 }
1162
1163 #[test]
1164 fn recv_reset_change() {
1165 let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1166 assert!(!stream.recv.almost_full());
1167
1168 let first = RangeBuf::from(b"hello", 0, false);
1169
1170 assert_eq!(stream.recv.write(first), Ok(()));
1171 assert_eq!(stream.recv.reset(0, 5), Ok(0));
1172 assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
1173 }
1174
1175 #[test]
1176 fn recv_reset_lower_than_received() {
1177 let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1178 assert!(!stream.recv.almost_full());
1179
1180 let first = RangeBuf::from(b"hello", 0, false);
1181
1182 assert_eq!(stream.recv.write(first), Ok(()));
1183 assert_eq!(stream.recv.reset(0, 4), Err(Error::FinalSize));
1184 }
1185
1186 #[test]
1187 fn send_flow_control() {
1188 let mut buf = [0; 25];
1189
1190 let mut stream = Stream::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1191
1192 let first = b"hello";
1193 let second = b"world";
1194 let third = b"something";
1195
1196 assert!(stream.send.write(first, false).is_ok());
1197 assert!(stream.send.write(second, false).is_ok());
1198 assert!(stream.send.write(third, false).is_ok());
1199
1200 assert_eq!(stream.send.off_front(), 0);
1201
1202 let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
1203 assert_eq!(written, 15);
1204 assert!(!fin);
1205 assert_eq!(&buf[..written], b"helloworldsomet");
1206
1207 assert_eq!(stream.send.off_front(), 15);
1208
1209 let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
1210 assert_eq!(written, 0);
1211 assert!(!fin);
1212 assert_eq!(&buf[..written], b"");
1213
1214 stream.send.retransmit(0, 15);
1215
1216 assert_eq!(stream.send.off_front(), 0);
1217
1218 let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
1219 assert_eq!(written, 10);
1220 assert!(!fin);
1221 assert_eq!(&buf[..written], b"helloworld");
1222
1223 assert_eq!(stream.send.off_front(), 10);
1224
1225 let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
1226 assert_eq!(written, 5);
1227 assert!(!fin);
1228 assert_eq!(&buf[..written], b"somet");
1229 }
1230
1231 #[test]
1232 fn send_past_fin() {
1233 let mut stream = Stream::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1234
1235 let first = b"hello";
1236 let second = b"world";
1237 let third = b"third";
1238
1239 assert_eq!(stream.send.write(first, false), Ok(5));
1240
1241 assert_eq!(stream.send.write(second, true), Ok(5));
1242 assert!(stream.send.is_fin());
1243
1244 assert_eq!(stream.send.write(third, false), Err(Error::FinalSize));
1245 }
1246
1247 #[test]
1248 fn send_fin_dup() {
1249 let mut stream = Stream::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1250
1251 assert_eq!(stream.send.write(b"hello", true), Ok(5));
1252 assert!(stream.send.is_fin());
1253
1254 assert_eq!(stream.send.write(b"", true), Ok(0));
1255 assert!(stream.send.is_fin());
1256 }
1257
1258 #[test]
1259 fn send_undo_fin() {
1260 let mut stream = Stream::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1261
1262 assert_eq!(stream.send.write(b"hello", true), Ok(5));
1263 assert!(stream.send.is_fin());
1264
1265 assert_eq!(
1266 stream.send.write(b"helloworld", true),
1267 Err(Error::FinalSize)
1268 );
1269 }
1270
1271 #[test]
1272 fn send_fin_max_data_match() {
1273 let mut buf = [0; 15];
1274
1275 let mut stream = Stream::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1276
1277 let slice = b"hellohellohello";
1278
1279 assert!(stream.send.write(slice, true).is_ok());
1280
1281 let (written, fin) = stream.send.emit(&mut buf[..15]).unwrap();
1282 assert_eq!(written, 15);
1283 assert!(fin);
1284 assert_eq!(&buf[..written], slice);
1285 }
1286
1287 #[test]
1288 fn send_fin_zero_length() {
1289 let mut buf = [0; 5];
1290
1291 let mut stream = Stream::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1292
1293 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1294 assert_eq!(stream.send.write(b"", true), Ok(0));
1295 assert!(stream.send.is_fin());
1296
1297 let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1298 assert_eq!(written, 5);
1299 assert!(fin);
1300 assert_eq!(&buf[..written], b"hello");
1301 }
1302
1303 #[test]
1304 fn send_ack() {
1305 let mut buf = [0; 5];
1306
1307 let mut stream = Stream::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1308
1309 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1310 assert_eq!(stream.send.write(b"world", false), Ok(5));
1311 assert_eq!(stream.send.write(b"", true), Ok(0));
1312 assert!(stream.send.is_fin());
1313
1314 assert_eq!(stream.send.off_front(), 0);
1315
1316 let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1317 assert_eq!(written, 5);
1318 assert!(!fin);
1319 assert_eq!(&buf[..written], b"hello");
1320
1321 stream.send.ack_and_drop(0, 5);
1322
1323 stream.send.retransmit(0, 5);
1324
1325 assert_eq!(stream.send.off_front(), 5);
1326
1327 let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1328 assert_eq!(written, 5);
1329 assert!(fin);
1330 assert_eq!(&buf[..written], b"world");
1331 }
1332
1333 #[test]
1334 fn send_ack_reordering() {
1335 let mut buf = [0; 5];
1336
1337 let mut stream = Stream::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1338
1339 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1340 assert_eq!(stream.send.write(b"world", false), Ok(5));
1341 assert_eq!(stream.send.write(b"", true), Ok(0));
1342 assert!(stream.send.is_fin());
1343
1344 assert_eq!(stream.send.off_front(), 0);
1345
1346 let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1347 assert_eq!(written, 5);
1348 assert!(!fin);
1349 assert_eq!(&buf[..written], b"hello");
1350
1351 assert_eq!(stream.send.off_front(), 5);
1352
1353 let (written, fin) = stream.send.emit(&mut buf[..1]).unwrap();
1354 assert_eq!(written, 1);
1355 assert!(!fin);
1356 assert_eq!(&buf[..written], b"w");
1357
1358 stream.send.ack_and_drop(5, 1);
1359 stream.send.ack_and_drop(0, 5);
1360
1361 stream.send.retransmit(0, 5);
1362 stream.send.retransmit(5, 1);
1363
1364 assert_eq!(stream.send.off_front(), 6);
1365
1366 let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1367 assert_eq!(written, 4);
1368 assert!(fin);
1369 assert_eq!(&buf[..written], b"orld");
1370 }
1371
1372 #[test]
1373 fn recv_data_below_off() {
1374 let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1375
1376 let first = RangeBuf::from(b"hello", 0, false);
1377
1378 assert_eq!(stream.recv.write(first), Ok(()));
1379
1380 let mut buf = [0; 10];
1381
1382 let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1383 assert_eq!(&buf[..len], b"hello");
1384 assert!(!fin);
1385
1386 let first = RangeBuf::from(b"elloworld", 1, true);
1387 assert_eq!(stream.recv.write(first), Ok(()));
1388
1389 let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1390 assert_eq!(&buf[..len], b"world");
1391 assert!(fin);
1392 }
1393
1394 #[test]
1395 fn stream_complete() {
1396 let mut stream =
1397 Stream::new(0, 30, 30, true, true, DEFAULT_STREAM_WINDOW);
1398
1399 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1400 assert_eq!(stream.send.write(b"world", false), Ok(5));
1401
1402 assert!(!stream.send.is_complete());
1403 assert!(!stream.send.is_fin());
1404
1405 assert_eq!(stream.send.write(b"", true), Ok(0));
1406
1407 assert!(!stream.send.is_complete());
1408 assert!(stream.send.is_fin());
1409
1410 let buf = RangeBuf::from(b"hello", 0, true);
1411 assert!(stream.recv.write(buf).is_ok());
1412 assert!(!stream.recv.is_fin());
1413
1414 stream.send.ack(6, 4);
1415 assert!(!stream.send.is_complete());
1416
1417 let mut buf = [0; 2];
1418 assert_eq!(stream.recv.emit(&mut buf), Ok((2, false)));
1419 assert!(!stream.recv.is_fin());
1420
1421 stream.send.ack(1, 5);
1422 assert!(!stream.send.is_complete());
1423
1424 stream.send.ack(0, 1);
1425 assert!(stream.send.is_complete());
1426
1427 assert!(!stream.is_complete());
1428
1429 let mut buf = [0; 3];
1430 assert_eq!(stream.recv.emit(&mut buf), Ok((3, true)));
1431 assert!(stream.recv.is_fin());
1432
1433 assert!(stream.is_complete());
1434 }
1435
1436 #[test]
1437 fn send_fin_zero_length_output() {
1438 let mut buf = [0; 5];
1439
1440 let mut stream = Stream::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1441
1442 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1443 assert_eq!(stream.send.off_front(), 0);
1444 assert!(!stream.send.is_fin());
1445
1446 let (written, fin) = stream.send.emit(&mut buf).unwrap();
1447 assert_eq!(written, 5);
1448 assert!(!fin);
1449 assert_eq!(&buf[..written], b"hello");
1450
1451 assert_eq!(stream.send.write(b"", true), Ok(0));
1452 assert!(stream.send.is_fin());
1453 assert_eq!(stream.send.off_front(), 5);
1454
1455 let (written, fin) = stream.send.emit(&mut buf).unwrap();
1456 assert_eq!(written, 0);
1457 assert!(fin);
1458 assert_eq!(&buf[..written], b"");
1459 }
1460
1461 fn stream_send_ready(stream: &Stream) -> bool {
1462 !stream.send.is_empty() &&
1463 stream.send.off_front() < stream.send.off_back()
1464 }
1465
1466 #[test]
1467 fn send_emit() {
1468 let mut buf = [0; 5];
1469
1470 let mut stream = Stream::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1471
1472 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1473 assert_eq!(stream.send.write(b"world", false), Ok(5));
1474 assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1475 assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1476 assert_eq!(stream.send.off_front(), 0);
1477 assert_eq!(stream.send.bufs_count(), 4);
1478
1479 assert!(stream.is_flushable());
1480
1481 assert!(stream_send_ready(&stream));
1482 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1483 assert_eq!(stream.send.off_front(), 4);
1484 assert_eq!(&buf[..4], b"hell");
1485
1486 assert!(stream_send_ready(&stream));
1487 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1488 assert_eq!(stream.send.off_front(), 8);
1489 assert_eq!(&buf[..4], b"owor");
1490
1491 assert!(stream_send_ready(&stream));
1492 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1493 assert_eq!(stream.send.off_front(), 10);
1494 assert_eq!(&buf[..2], b"ld");
1495
1496 assert!(stream_send_ready(&stream));
1497 assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1498 assert_eq!(stream.send.off_front(), 11);
1499 assert_eq!(&buf[..1], b"o");
1500
1501 assert!(stream_send_ready(&stream));
1502 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1503 assert_eq!(stream.send.off_front(), 16);
1504 assert_eq!(&buf[..5], b"llehd");
1505
1506 assert!(stream_send_ready(&stream));
1507 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1508 assert_eq!(stream.send.off_front(), 20);
1509 assert_eq!(&buf[..4], b"lrow");
1510
1511 assert!(!stream.is_flushable());
1512
1513 assert!(!stream_send_ready(&stream));
1514 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1515 assert_eq!(stream.send.off_front(), 20);
1516 }
1517
1518 #[test]
1519 fn send_emit_ack() {
1520 let mut buf = [0; 5];
1521
1522 let mut stream = Stream::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1523
1524 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1525 assert_eq!(stream.send.write(b"world", false), Ok(5));
1526 assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1527 assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1528 assert_eq!(stream.send.off_front(), 0);
1529 assert_eq!(stream.send.bufs_count(), 4);
1530
1531 assert!(stream.is_flushable());
1532
1533 assert!(stream_send_ready(&stream));
1534 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1535 assert_eq!(stream.send.off_front(), 4);
1536 assert_eq!(&buf[..4], b"hell");
1537
1538 assert!(stream_send_ready(&stream));
1539 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1540 assert_eq!(stream.send.off_front(), 8);
1541 assert_eq!(&buf[..4], b"owor");
1542
1543 stream.send.ack_and_drop(0, 5);
1544 assert_eq!(stream.send.bufs_count(), 3);
1545
1546 assert!(stream_send_ready(&stream));
1547 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1548 assert_eq!(stream.send.off_front(), 10);
1549 assert_eq!(&buf[..2], b"ld");
1550
1551 stream.send.ack_and_drop(7, 5);
1552 assert_eq!(stream.send.bufs_count(), 3);
1553
1554 assert!(stream_send_ready(&stream));
1555 assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1556 assert_eq!(stream.send.off_front(), 11);
1557 assert_eq!(&buf[..1], b"o");
1558
1559 assert!(stream_send_ready(&stream));
1560 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1561 assert_eq!(stream.send.off_front(), 16);
1562 assert_eq!(&buf[..5], b"llehd");
1563
1564 stream.send.ack_and_drop(5, 7);
1565 assert_eq!(stream.send.bufs_count(), 2);
1566
1567 assert!(stream_send_ready(&stream));
1568 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1569 assert_eq!(stream.send.off_front(), 20);
1570 assert_eq!(&buf[..4], b"lrow");
1571
1572 assert!(!stream.is_flushable());
1573
1574 assert!(!stream_send_ready(&stream));
1575 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1576 assert_eq!(stream.send.off_front(), 20);
1577
1578 stream.send.ack_and_drop(22, 4);
1579 assert_eq!(stream.send.bufs_count(), 2);
1580
1581 stream.send.ack_and_drop(20, 1);
1582 assert_eq!(stream.send.bufs_count(), 2);
1583 }
1584
1585 #[test]
1586 fn send_emit_retransmit() {
1587 let mut buf = [0; 5];
1588
1589 let mut stream = Stream::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1590
1591 assert_eq!(stream.send.write(b"hello", false), Ok(5));
1592 assert_eq!(stream.send.write(b"world", false), Ok(5));
1593 assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1594 assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1595 assert_eq!(stream.send.off_front(), 0);
1596 assert_eq!(stream.send.bufs_count(), 4);
1597
1598 assert!(stream.is_flushable());
1599
1600 assert!(stream_send_ready(&stream));
1601 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1602 assert_eq!(stream.send.off_front(), 4);
1603 assert_eq!(&buf[..4], b"hell");
1604
1605 assert!(stream_send_ready(&stream));
1606 assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1607 assert_eq!(stream.send.off_front(), 8);
1608 assert_eq!(&buf[..4], b"owor");
1609
1610 stream.send.retransmit(3, 3);
1611 assert_eq!(stream.send.off_front(), 3);
1612
1613 assert!(stream_send_ready(&stream));
1614 assert_eq!(stream.send.emit(&mut buf[..3]), Ok((3, false)));
1615 assert_eq!(stream.send.off_front(), 8);
1616 assert_eq!(&buf[..3], b"low");
1617
1618 assert!(stream_send_ready(&stream));
1619 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1620 assert_eq!(stream.send.off_front(), 10);
1621 assert_eq!(&buf[..2], b"ld");
1622
1623 stream.send.ack_and_drop(7, 2);
1624
1625 stream.send.retransmit(8, 2);
1626
1627 assert!(stream_send_ready(&stream));
1628 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1629 assert_eq!(stream.send.off_front(), 10);
1630 assert_eq!(&buf[..2], b"ld");
1631
1632 assert!(stream_send_ready(&stream));
1633 assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1634 assert_eq!(stream.send.off_front(), 11);
1635 assert_eq!(&buf[..1], b"o");
1636
1637 assert!(stream_send_ready(&stream));
1638 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1639 assert_eq!(stream.send.off_front(), 16);
1640 assert_eq!(&buf[..5], b"llehd");
1641
1642 stream.send.retransmit(12, 2);
1643
1644 assert!(stream_send_ready(&stream));
1645 assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1646 assert_eq!(stream.send.off_front(), 16);
1647 assert_eq!(&buf[..2], b"le");
1648
1649 assert!(stream_send_ready(&stream));
1650 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1651 assert_eq!(stream.send.off_front(), 20);
1652 assert_eq!(&buf[..4], b"lrow");
1653
1654 assert!(!stream.is_flushable());
1655
1656 assert!(!stream_send_ready(&stream));
1657 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1658 assert_eq!(stream.send.off_front(), 20);
1659
1660 stream.send.retransmit(7, 12);
1661
1662 assert!(stream_send_ready(&stream));
1663 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1664 assert_eq!(stream.send.off_front(), 12);
1665 assert_eq!(&buf[..5], b"rldol");
1666
1667 assert!(stream_send_ready(&stream));
1668 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1669 assert_eq!(stream.send.off_front(), 17);
1670 assert_eq!(&buf[..5], b"lehdl");
1671
1672 assert!(stream_send_ready(&stream));
1673 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
1674 assert_eq!(stream.send.off_front(), 20);
1675 assert_eq!(&buf[..2], b"ro");
1676
1677 stream.send.ack_and_drop(12, 7);
1678
1679 stream.send.retransmit(7, 12);
1680
1681 assert!(stream_send_ready(&stream));
1682 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1683 assert_eq!(stream.send.off_front(), 12);
1684 assert_eq!(&buf[..5], b"rldol");
1685
1686 assert!(stream_send_ready(&stream));
1687 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1688 assert_eq!(stream.send.off_front(), 17);
1689 assert_eq!(&buf[..5], b"lehdl");
1690
1691 assert!(stream_send_ready(&stream));
1692 assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
1693 assert_eq!(stream.send.off_front(), 20);
1694 assert_eq!(&buf[..2], b"ro");
1695 }
1696
1697 #[test]
1698 fn rangebuf_split_off() {
1699 let mut buf = RangeBuf::from(b"helloworld", 5, true);
1700 assert_eq!(buf.start, 0);
1701 assert_eq!(buf.pos, 0);
1702 assert_eq!(buf.len, 10);
1703 assert_eq!(buf.off, 5);
1704 assert!(buf.fin);
1705
1706 assert_eq!(buf.len(), 10);
1707 assert_eq!(buf.off(), 5);
1708 assert!(buf.fin());
1709
1710 assert_eq!(&buf[..], b"helloworld");
1711
1712 buf.consume(5);
1714
1715 assert_eq!(buf.start, 0);
1716 assert_eq!(buf.pos, 5);
1717 assert_eq!(buf.len, 10);
1718 assert_eq!(buf.off, 5);
1719 assert!(buf.fin);
1720
1721 assert_eq!(buf.len(), 5);
1722 assert_eq!(buf.off(), 10);
1723 assert!(buf.fin());
1724
1725 assert_eq!(&buf[..], b"world");
1726
1727 let mut new_buf = buf.split_off(3);
1729
1730 assert_eq!(buf.start, 0);
1731 assert_eq!(buf.pos, 3);
1732 assert_eq!(buf.len, 3);
1733 assert_eq!(buf.off, 5);
1734 assert!(!buf.fin);
1735
1736 assert_eq!(buf.len(), 0);
1737 assert_eq!(buf.off(), 8);
1738 assert!(!buf.fin());
1739
1740 assert_eq!(&buf[..], b"");
1741
1742 assert_eq!(new_buf.start, 3);
1743 assert_eq!(new_buf.pos, 5);
1744 assert_eq!(new_buf.len, 7);
1745 assert_eq!(new_buf.off, 8);
1746 assert!(new_buf.fin);
1747
1748 assert_eq!(new_buf.len(), 5);
1749 assert_eq!(new_buf.off(), 10);
1750 assert!(new_buf.fin());
1751
1752 assert_eq!(&new_buf[..], b"world");
1753
1754 new_buf.consume(2);
1756
1757 assert_eq!(new_buf.start, 3);
1758 assert_eq!(new_buf.pos, 7);
1759 assert_eq!(new_buf.len, 7);
1760 assert_eq!(new_buf.off, 8);
1761 assert!(new_buf.fin);
1762
1763 assert_eq!(new_buf.len(), 3);
1764 assert_eq!(new_buf.off(), 12);
1765 assert!(new_buf.fin());
1766
1767 assert_eq!(&new_buf[..], b"rld");
1768
1769 let mut new_new_buf = new_buf.split_off(5);
1771
1772 assert_eq!(new_buf.start, 3);
1773 assert_eq!(new_buf.pos, 7);
1774 assert_eq!(new_buf.len, 5);
1775 assert_eq!(new_buf.off, 8);
1776 assert!(!new_buf.fin);
1777
1778 assert_eq!(new_buf.len(), 1);
1779 assert_eq!(new_buf.off(), 12);
1780 assert!(!new_buf.fin());
1781
1782 assert_eq!(&new_buf[..], b"r");
1783
1784 assert_eq!(new_new_buf.start, 8);
1785 assert_eq!(new_new_buf.pos, 8);
1786 assert_eq!(new_new_buf.len, 2);
1787 assert_eq!(new_new_buf.off, 13);
1788 assert!(new_new_buf.fin);
1789
1790 assert_eq!(new_new_buf.len(), 2);
1791 assert_eq!(new_new_buf.off(), 13);
1792 assert!(new_new_buf.fin());
1793
1794 assert_eq!(&new_new_buf[..], b"ld");
1795
1796 new_new_buf.consume(2);
1798
1799 assert_eq!(new_new_buf.start, 8);
1800 assert_eq!(new_new_buf.pos, 10);
1801 assert_eq!(new_new_buf.len, 2);
1802 assert_eq!(new_new_buf.off, 13);
1803 assert!(new_new_buf.fin);
1804
1805 assert_eq!(new_new_buf.len(), 0);
1806 assert_eq!(new_new_buf.off(), 15);
1807 assert!(new_new_buf.fin());
1808
1809 assert_eq!(&new_new_buf[..], b"");
1810 }
1811
1812 #[test]
1815 fn stream_limit_auto_open() {
1816 let local_tp = crate::TransportParams::default();
1817 let peer_tp = crate::TransportParams::default();
1818
1819 let mut streams = StreamMap::new(5, 5, 5);
1820
1821 let stream_id = 500;
1822 assert!(!is_local(stream_id, true), "stream id is peer initiated");
1823 assert!(is_bidi(stream_id), "stream id is bidirectional");
1824 assert_eq!(
1825 streams
1826 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1827 .err(),
1828 Some(Error::StreamLimit),
1829 "stream limit should be exceeded"
1830 );
1831 }
1832
1833 #[test]
1836 fn stream_create_out_of_order() {
1837 let local_tp = crate::TransportParams::default();
1838 let peer_tp = crate::TransportParams::default();
1839
1840 let mut streams = StreamMap::new(5, 5, 5);
1841
1842 for stream_id in [8, 12, 4] {
1843 assert!(is_local(stream_id, false), "stream id is client initiated");
1844 assert!(is_bidi(stream_id), "stream id is bidirectional");
1845 assert!(streams
1846 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1847 .is_ok());
1848 }
1849 }
1850
1851 #[test]
1853 fn stream_limit_edge() {
1854 let local_tp = crate::TransportParams::default();
1855 let peer_tp = crate::TransportParams::default();
1856
1857 let mut streams = StreamMap::new(3, 3, 3);
1858
1859 let stream_id = 8;
1861 assert!(streams
1862 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1863 .is_ok());
1864
1865 let stream_id = 12;
1867 assert_eq!(
1868 streams
1869 .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1870 .err(),
1871 Some(Error::StreamLimit)
1872 );
1873 }
1874
1875 fn cycle_stream_priority(stream_id: u64, streams: &mut StreamMap) {
1876 let key = streams.get(stream_id).unwrap().priority_key.clone();
1877 streams.update_priority(&key.clone(), &key);
1878 }
1879
1880 #[test]
1881 fn writable_prioritized_default_priority() {
1882 let local_tp = crate::TransportParams::default();
1883 let peer_tp = crate::TransportParams {
1884 initial_max_stream_data_bidi_local: 100,
1885 initial_max_stream_data_uni: 100,
1886 ..Default::default()
1887 };
1888
1889 let mut streams = StreamMap::new(100, 100, 100);
1890
1891 for id in [0, 4, 8, 12] {
1892 assert!(streams
1893 .get_or_create(id, &local_tp, &peer_tp, false, true)
1894 .is_ok());
1895 }
1896
1897 let walk_1: Vec<u64> = streams.writable().collect();
1898 cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1899 let walk_2: Vec<u64> = streams.writable().collect();
1900 cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1901 let walk_3: Vec<u64> = streams.writable().collect();
1902 cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1903 let walk_4: Vec<u64> = streams.writable().collect();
1904 cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1905 let walk_5: Vec<u64> = streams.writable().collect();
1906
1907 assert_eq!(walk_1, vec![0, 4, 8, 12]);
1910 assert_eq!(walk_2, vec![4, 8, 12, 0]);
1911 assert_eq!(walk_3, vec![8, 12, 0, 4]);
1912 assert_eq!(walk_4, vec![12, 0, 4, 8,]);
1913 assert_eq!(walk_5, vec![0, 4, 8, 12]);
1914 }
1915
1916 #[test]
1917 fn writable_prioritized_insert_order() {
1918 let local_tp = crate::TransportParams::default();
1919 let peer_tp = crate::TransportParams {
1920 initial_max_stream_data_bidi_local: 100,
1921 initial_max_stream_data_uni: 100,
1922 ..Default::default()
1923 };
1924
1925 let mut streams = StreamMap::new(100, 100, 100);
1926
1927 for id in [12, 4, 8, 0] {
1930 assert!(streams
1931 .get_or_create(id, &local_tp, &peer_tp, false, true)
1932 .is_ok());
1933 }
1934
1935 let walk_1: Vec<u64> = streams.writable().collect();
1936 cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1937 let walk_2: Vec<u64> = streams.writable().collect();
1938 cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1939 let walk_3: Vec<u64> = streams.writable().collect();
1940 cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1941 let walk_4: Vec<u64> = streams.writable().collect();
1942 cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1943 let walk_5: Vec<u64> = streams.writable().collect();
1944 assert_eq!(walk_1, vec![12, 4, 8, 0]);
1945 assert_eq!(walk_2, vec![4, 8, 0, 12]);
1946 assert_eq!(walk_3, vec![8, 0, 12, 4,]);
1947 assert_eq!(walk_4, vec![0, 12, 4, 8]);
1948 assert_eq!(walk_5, vec![12, 4, 8, 0]);
1949 }
1950
1951 #[test]
1952 fn writable_prioritized_mixed_urgency() {
1953 let local_tp = crate::TransportParams::default();
1954 let peer_tp = crate::TransportParams {
1955 initial_max_stream_data_bidi_local: 100,
1956 initial_max_stream_data_uni: 100,
1957 ..Default::default()
1958 };
1959
1960 let mut streams = StreamMap::new(100, 100, 100);
1961
1962 let input = vec![
1965 (0, 100),
1966 (4, 90),
1967 (8, 80),
1968 (12, 70),
1969 (16, 60),
1970 (20, 50),
1971 (24, 40),
1972 (28, 30),
1973 (32, 20),
1974 (36, 10),
1975 (40, 0),
1976 ];
1977
1978 for (id, urgency) in input.clone() {
1979 let stream = streams
1982 .get_or_create(id, &local_tp, &peer_tp, false, true)
1983 .unwrap();
1984
1985 stream.urgency = urgency;
1986
1987 let new_priority_key = Arc::new(StreamPriorityKey {
1988 urgency: stream.urgency,
1989 incremental: stream.incremental,
1990 id,
1991 ..Default::default()
1992 });
1993
1994 let old_priority_key = std::mem::replace(
1995 &mut stream.priority_key,
1996 new_priority_key.clone(),
1997 );
1998
1999 streams.update_priority(&old_priority_key, &new_priority_key);
2000 }
2001
2002 let walk_1: Vec<u64> = streams.writable().collect();
2003 assert_eq!(walk_1, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
2004
2005 for (id, urgency) in input {
2007 let stream = streams
2010 .get_or_create(id, &local_tp, &peer_tp, false, true)
2011 .unwrap();
2012
2013 stream.urgency = urgency;
2014
2015 let new_priority_key = Arc::new(StreamPriorityKey {
2016 urgency: stream.urgency,
2017 incremental: stream.incremental,
2018 id,
2019 ..Default::default()
2020 });
2021
2022 let old_priority_key = std::mem::replace(
2023 &mut stream.priority_key,
2024 new_priority_key.clone(),
2025 );
2026
2027 streams.update_priority(&old_priority_key, &new_priority_key);
2028 }
2029
2030 let walk_2: Vec<u64> = streams.writable().collect();
2031 assert_eq!(walk_2, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
2032
2033 streams.collect(24, true);
2035
2036 let walk_3: Vec<u64> = streams.writable().collect();
2037 assert_eq!(walk_3, vec![40, 36, 32, 28, 20, 16, 12, 8, 4, 0]);
2038
2039 streams.collect(40, true);
2040 streams.collect(0, true);
2041
2042 let walk_4: Vec<u64> = streams.writable().collect();
2043 assert_eq!(walk_4, vec![36, 32, 28, 20, 16, 12, 8, 4]);
2044
2045 streams
2047 .get_or_create(44, &local_tp, &peer_tp, false, true)
2048 .unwrap();
2049
2050 let walk_5: Vec<u64> = streams.writable().collect();
2051 assert_eq!(walk_5, vec![36, 32, 28, 20, 16, 12, 8, 4, 44]);
2052 }
2053
2054 #[test]
2055 fn writable_prioritized_mixed_urgencies_incrementals() {
2056 let local_tp = crate::TransportParams::default();
2057 let peer_tp = crate::TransportParams {
2058 initial_max_stream_data_bidi_local: 100,
2059 initial_max_stream_data_uni: 100,
2060 ..Default::default()
2061 };
2062
2063 let mut streams = StreamMap::new(100, 100, 100);
2064
2065 let input = vec![
2067 (0, 100),
2068 (4, 20),
2069 (8, 100),
2070 (12, 20),
2071 (16, 90),
2072 (20, 25),
2073 (24, 90),
2074 (28, 30),
2075 (32, 80),
2076 (36, 20),
2077 (40, 0),
2078 ];
2079
2080 for (id, urgency) in input.clone() {
2081 let stream = streams
2084 .get_or_create(id, &local_tp, &peer_tp, false, true)
2085 .unwrap();
2086
2087 stream.urgency = urgency;
2088
2089 let new_priority_key = Arc::new(StreamPriorityKey {
2090 urgency: stream.urgency,
2091 incremental: stream.incremental,
2092 id,
2093 ..Default::default()
2094 });
2095
2096 let old_priority_key = std::mem::replace(
2097 &mut stream.priority_key,
2098 new_priority_key.clone(),
2099 );
2100
2101 streams.update_priority(&old_priority_key, &new_priority_key);
2102 }
2103
2104 let walk_1: Vec<u64> = streams.writable().collect();
2105 cycle_stream_priority(4, &mut streams);
2106 cycle_stream_priority(16, &mut streams);
2107 cycle_stream_priority(0, &mut streams);
2108 let walk_2: Vec<u64> = streams.writable().collect();
2109 cycle_stream_priority(12, &mut streams);
2110 cycle_stream_priority(24, &mut streams);
2111 cycle_stream_priority(8, &mut streams);
2112 let walk_3: Vec<u64> = streams.writable().collect();
2113 cycle_stream_priority(36, &mut streams);
2114 cycle_stream_priority(16, &mut streams);
2115 cycle_stream_priority(0, &mut streams);
2116 let walk_4: Vec<u64> = streams.writable().collect();
2117 cycle_stream_priority(4, &mut streams);
2118 cycle_stream_priority(24, &mut streams);
2119 cycle_stream_priority(8, &mut streams);
2120 let walk_5: Vec<u64> = streams.writable().collect();
2121 cycle_stream_priority(12, &mut streams);
2122 cycle_stream_priority(16, &mut streams);
2123 cycle_stream_priority(0, &mut streams);
2124 let walk_6: Vec<u64> = streams.writable().collect();
2125 cycle_stream_priority(36, &mut streams);
2126 cycle_stream_priority(24, &mut streams);
2127 cycle_stream_priority(8, &mut streams);
2128 let walk_7: Vec<u64> = streams.writable().collect();
2129 cycle_stream_priority(4, &mut streams);
2130 cycle_stream_priority(16, &mut streams);
2131 cycle_stream_priority(0, &mut streams);
2132 let walk_8: Vec<u64> = streams.writable().collect();
2133 cycle_stream_priority(12, &mut streams);
2134 cycle_stream_priority(24, &mut streams);
2135 cycle_stream_priority(8, &mut streams);
2136 let walk_9: Vec<u64> = streams.writable().collect();
2137 cycle_stream_priority(36, &mut streams);
2138 cycle_stream_priority(16, &mut streams);
2139 cycle_stream_priority(0, &mut streams);
2140
2141 assert_eq!(walk_1, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2142 assert_eq!(walk_2, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2143 assert_eq!(walk_3, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2144 assert_eq!(walk_4, vec![40, 4, 12, 36, 20, 28, 32, 24, 16, 8, 0]);
2145 assert_eq!(walk_5, vec![40, 12, 36, 4, 20, 28, 32, 16, 24, 0, 8]);
2146 assert_eq!(walk_6, vec![40, 36, 4, 12, 20, 28, 32, 24, 16, 8, 0]);
2147 assert_eq!(walk_7, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2148 assert_eq!(walk_8, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2149 assert_eq!(walk_9, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2150
2151 streams.collect(20, true);
2153
2154 let walk_10: Vec<u64> = streams.writable().collect();
2155 assert_eq!(walk_10, vec![40, 4, 12, 36, 28, 32, 24, 16, 8, 0]);
2156
2157 let stream = streams
2159 .get_or_create(44, &local_tp, &peer_tp, false, true)
2160 .unwrap();
2161
2162 stream.urgency = 20;
2163 stream.incremental = true;
2164
2165 let new_priority_key = Arc::new(StreamPriorityKey {
2166 urgency: stream.urgency,
2167 incremental: stream.incremental,
2168 id: 44,
2169 ..Default::default()
2170 });
2171
2172 let old_priority_key =
2173 std::mem::replace(&mut stream.priority_key, new_priority_key.clone());
2174
2175 streams.update_priority(&old_priority_key, &new_priority_key);
2176
2177 let walk_11: Vec<u64> = streams.writable().collect();
2178 assert_eq!(walk_11, vec![40, 4, 12, 36, 44, 28, 32, 24, 16, 8, 0]);
2179 }
2180
2181 #[test]
2182 fn priority_tree_dupes() {
2183 let mut prioritized_writable: RBTree<StreamWritablePriorityAdapter> =
2184 Default::default();
2185
2186 for id in [0, 4, 8, 12] {
2187 let s = Arc::new(StreamPriorityKey {
2188 urgency: 0,
2189 incremental: false,
2190 id,
2191 ..Default::default()
2192 });
2193
2194 prioritized_writable.insert(s);
2195 }
2196
2197 let walk_1: Vec<u64> =
2198 prioritized_writable.iter().map(|s| s.id).collect();
2199 assert_eq!(walk_1, vec![0, 4, 8, 12]);
2200
2201 for id in [0, 4, 8, 12] {
2204 let s = Arc::new(StreamPriorityKey {
2205 urgency: 0,
2206 incremental: false,
2207 id,
2208 ..Default::default()
2209 });
2210
2211 prioritized_writable.insert(s);
2212 }
2213
2214 let walk_2: Vec<u64> =
2215 prioritized_writable.iter().map(|s| s.id).collect();
2216 assert_eq!(walk_2, vec![0, 0, 4, 4, 8, 8, 12, 12]);
2217 }
2218}
2219
2220mod recv_buf;
2221mod send_buf;