Skip to main content

quiche/stream/
mod.rs

1// Copyright (C) 2018-2019, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::cmp;
28
29use std::sync::Arc;
30
31use std::collections::hash_map;
32use std::collections::HashMap;
33use std::collections::HashSet;
34
35use intrusive_collections::intrusive_adapter;
36use intrusive_collections::KeyAdapter;
37use intrusive_collections::RBTree;
38use intrusive_collections::RBTreeAtomicLink;
39
40use smallvec::SmallVec;
41
42use crate::range_buf::DefaultBufFactory;
43use crate::BufFactory;
44use crate::Error;
45use crate::Result;
46
// Urgency given to a stream's priority key by `StreamPriorityKey::default()`.
const DEFAULT_URGENCY: u8 = 127;

// The default size of the receiver stream flow control window (32 KiB).
const DEFAULT_STREAM_WINDOW: u64 = 32 * 1024;

/// The maximum size of the receiver stream flow control window (16 MiB).
pub const MAX_STREAM_WINDOW: u64 = 16 * 1024 * 1024;
54
/// A simple no-op hasher for Stream IDs.
///
/// The QUIC protocol and the quiche library guarantee stream ID uniqueness,
/// so we can save effort by avoiding a more complicated algorithm: the
/// stream ID itself is used as the hash value.
#[derive(Default)]
pub struct StreamIdHasher {
    // Last value passed to `write_u64()`; returned unchanged by `finish()`.
    id: u64,
}
63
/// Outcome reported by `RecvBuf::reset()`.
#[derive(Debug, PartialEq, Clone, Copy)]
pub struct RecvBufResetReturn {
    /// Difference between the previous max_data offset received and the
    /// final size reported by the reset.
    pub max_data_delta: u64,

    /// Amount of flow control credit that should be handed back to the
    /// connection-level flow control.
    pub consumed_flowcontrol: u64,
}

impl RecvBufResetReturn {
    /// Returns a value in which both counters are zero.
    pub fn zero() -> Self {
        let (max_data_delta, consumed_flowcontrol) = (0, 0);

        Self {
            max_data_delta,
            consumed_flowcontrol,
        }
    }
}
84
/// Action to perform when reading from a stream's receive buffer.
///
/// The lifetime `'a` is the borrow of the caller-provided output buffer used
/// by the `Emit` variant.
pub enum RecvAction<'a> {
    /// Emit data by copying it into the provided buffer.
    Emit { out: &'a mut [u8] },
    /// Discard up to the specified number of bytes without copying.
    Discard { len: usize },
}
92
impl std::hash::Hasher for StreamIdHasher {
    // Returns the last value written through `write_u64()`, unchanged.
    #[inline]
    fn finish(&self) -> u64 {
        self.id
    }

    // Stores the stream ID itself as the hash state.
    #[inline]
    fn write_u64(&mut self, id: u64) {
        self.id = id;
    }

    #[inline]
    fn write(&mut self, _: &[u8]) {
        // The trait requires a write() method, but stream IDs are always a
        // u64 and are hashed through write_u64(). This path is therefore
        // intentionally left unimplemented and panics if it is ever reached.
        unimplemented!()
    }
}
111
// Builds `StreamIdHasher` instances through its `Default` implementation.
type BuildStreamIdHasher = std::hash::BuildHasherDefault<StreamIdHasher>;

/// A `HashMap` keyed by stream ID that uses the no-op `StreamIdHasher`.
pub type StreamIdHashMap<V> = HashMap<u64, V, BuildStreamIdHasher>;
/// A `HashSet` of stream IDs that uses the no-op `StreamIdHasher`.
pub type StreamIdHashSet = HashSet<u64, BuildStreamIdHasher>;
116
/// Keeps track of QUIC streams and enforces stream limits.
#[derive(Default)]
pub struct StreamMap<F: BufFactory = DefaultBufFactory> {
    /// Map of streams indexed by stream ID.
    streams: StreamIdHashMap<Stream<F>>,

    /// Set of streams that were completed and garbage collected.
    ///
    /// Instead of keeping the full stream state forever, we collect completed
    /// streams to save memory, but we still need to keep track of previously
    /// created streams, to prevent peers from re-creating them.
    collected: StreamIdHashSet,

    /// Peer's maximum bidirectional stream count limit.
    peer_max_streams_bidi: u64,

    /// Peer's maximum unidirectional stream count limit.
    peer_max_streams_uni: u64,

    /// The total number of bidirectional streams opened by the peer.
    peer_opened_streams_bidi: u64,

    /// The total number of unidirectional streams opened by the peer.
    peer_opened_streams_uni: u64,

    /// Local maximum bidirectional stream count limit.
    local_max_streams_bidi: u64,
    /// Pending value of the local bidirectional stream count limit. It grows
    /// by one each time `collect()` drops a peer-initiated bidirectional
    /// stream, and is committed by `update_max_streams_bidi()`.
    local_max_streams_bidi_next: u64,

    /// Initial maximum bidirectional stream count.
    initial_max_streams_bidi: u64,

    /// Local maximum unidirectional stream count limit.
    local_max_streams_uni: u64,
    /// Pending value of the local unidirectional stream count limit. It grows
    /// by one each time `collect()` drops a peer-initiated unidirectional
    /// stream, and is committed by `update_max_streams_uni()`.
    local_max_streams_uni_next: u64,

    /// Initial maximum unidirectional stream count.
    initial_max_streams_uni: u64,

    /// The total number of bidirectional streams opened by the local endpoint.
    local_opened_streams_bidi: u64,

    /// The total number of unidirectional streams opened by the local endpoint.
    local_opened_streams_uni: u64,

    /// Queue of stream IDs corresponding to streams that have buffered data
    /// ready to be sent to the peer. This also implies that the stream has
    /// enough flow control credits to send at least some of that data.
    flushable: RBTree<StreamFlushablePriorityAdapter>,

    /// Set of stream IDs corresponding to streams that have outstanding data
    /// to read. This is used to generate a `StreamIter` of streams without
    /// having to iterate over the full list of streams.
    pub readable: RBTree<StreamReadablePriorityAdapter>,

    /// Set of stream IDs corresponding to streams that have enough flow control
    /// capacity to be written to, and are not finished. This is used to
    /// generate a `StreamIter` of streams without having to iterate over the
    /// full list of streams.
    pub writable: RBTree<StreamWritablePriorityAdapter>,

    /// Set of stream IDs corresponding to streams that are almost out of flow
    /// control credit and need to send MAX_STREAM_DATA. This is used to
    /// generate a `StreamIter` of streams without having to iterate over the
    /// full list of streams.
    almost_full: StreamIdHashSet,

    /// Set of stream IDs corresponding to streams that are blocked. The value
    /// of the map elements represents the offset of the stream at which the
    /// blocking occurred.
    blocked: StreamIdHashMap<u64>,

    /// Set of stream IDs corresponding to streams that are reset. The value
    /// of the map elements is a tuple of the error code and final size values
    /// to include in the RESET_STREAM frame.
    reset: StreamIdHashMap<(u64, u64)>,

    /// Set of stream IDs corresponding to streams that are shutdown on the
    /// receive side, and need to send a STOP_SENDING frame. The value of the
    /// map elements is the error code to include in the STOP_SENDING frame.
    stopped: StreamIdHashMap<u64>,

    /// The maximum size of a stream window.
    max_stream_window: u64,
}
202
203impl<F: BufFactory> StreamMap<F> {
204    pub fn new(
205        max_streams_bidi: u64, max_streams_uni: u64, max_stream_window: u64,
206    ) -> Self {
207        StreamMap {
208            local_max_streams_bidi: max_streams_bidi,
209            local_max_streams_bidi_next: max_streams_bidi,
210            initial_max_streams_bidi: max_streams_bidi,
211
212            local_max_streams_uni: max_streams_uni,
213            local_max_streams_uni_next: max_streams_uni,
214            initial_max_streams_uni: max_streams_uni,
215
216            max_stream_window,
217
218            ..StreamMap::default()
219        }
220    }
221
    /// Returns the stream with the given ID if it exists.
    ///
    /// `None` is returned when no stream with that ID is currently stored in
    /// the map.
    pub fn get(&self, id: u64) -> Option<&Stream<F>> {
        self.streams.get(&id)
    }
226
    /// Returns the mutable stream with the given ID if it exists.
    ///
    /// `None` is returned when no stream with that ID is currently stored in
    /// the map.
    pub fn get_mut(&mut self, id: u64) -> Option<&mut Stream<F>> {
        self.streams.get_mut(&id)
    }
231
    /// Returns the mutable stream with the given ID if it exists, or creates
    /// a new one otherwise.
    ///
    /// The `local` parameter indicates whether the stream's creation was
    /// requested by the local application rather than the peer, and is
    /// used to validate the requested stream ID, and to select the initial
    /// flow control values from the local and remote transport parameters
    /// (also passed as arguments).
    ///
    /// This also takes care of enforcing both local and the peer's stream
    /// count limits. If one of these limits is violated, the `StreamLimit`
    /// error is returned.
    ///
    /// A newly created stream that is already writable is also linked into
    /// the writable set.
    ///
    /// # Errors
    ///
    /// * `Error::Done` if the stream does not exist anymore because it was
    ///   already completed and garbage collected.
    /// * `Error::InvalidStreamState` if `local` does not match the initiator
    ///   encoded in the stream ID.
    /// * `Error::StreamLimit` if creating the stream would exceed the
    ///   applicable stream count limit.
    pub(crate) fn get_or_create(
        &mut self, id: u64, local_params: &crate::TransportParams,
        peer_params: &crate::TransportParams, local: bool, is_server: bool,
    ) -> Result<&mut Stream<F>> {
        let (stream, is_new_and_writable) = match self.streams.entry(id) {
            hash_map::Entry::Vacant(v) => {
                // Stream has already been closed and garbage collected.
                if self.collected.contains(&id) {
                    return Err(Error::Done);
                }

                // The initiator encoded in the ID must agree with who is
                // asking for the stream to be created.
                if local != is_local(id, is_server) {
                    return Err(Error::InvalidStreamState(id));
                }

                let (max_rx_data, max_tx_data) = match (local, is_bidi(id)) {
                    // Locally-initiated bidirectional stream.
                    (true, true) => (
                        local_params.initial_max_stream_data_bidi_local,
                        peer_params.initial_max_stream_data_bidi_remote,
                    ),

                    // Locally-initiated unidirectional stream.
                    (true, false) => (0, peer_params.initial_max_stream_data_uni),

                    // Remotely-initiated bidirectional stream.
                    (false, true) => (
                        local_params.initial_max_stream_data_bidi_remote,
                        peer_params.initial_max_stream_data_bidi_local,
                    ),

                    // Remotely-initiated unidirectional stream.
                    (false, false) =>
                        (local_params.initial_max_stream_data_uni, 0),
                };

                // The two least significant bits from a stream id identify the
                // type of stream. Truncate those bits to get the sequence for
                // that stream type.
                let stream_sequence = id >> 2;

                // Enforce stream count limits. Opening a stream with a given
                // sequence number counts every lower-numbered stream of the
                // same type as opened too, hence the `max()` with
                // `stream_sequence + 1`.
                match (is_local(id, is_server), is_bidi(id)) {
                    (true, true) => {
                        let n = cmp::max(
                            self.local_opened_streams_bidi,
                            stream_sequence + 1,
                        );

                        if n > self.peer_max_streams_bidi {
                            return Err(Error::StreamLimit);
                        }

                        self.local_opened_streams_bidi = n;
                    },

                    (true, false) => {
                        let n = cmp::max(
                            self.local_opened_streams_uni,
                            stream_sequence + 1,
                        );

                        if n > self.peer_max_streams_uni {
                            return Err(Error::StreamLimit);
                        }

                        self.local_opened_streams_uni = n;
                    },

                    (false, true) => {
                        let n = cmp::max(
                            self.peer_opened_streams_bidi,
                            stream_sequence + 1,
                        );

                        if n > self.local_max_streams_bidi {
                            return Err(Error::StreamLimit);
                        }

                        self.peer_opened_streams_bidi = n;
                    },

                    (false, false) => {
                        let n = cmp::max(
                            self.peer_opened_streams_uni,
                            stream_sequence + 1,
                        );

                        if n > self.local_max_streams_uni {
                            return Err(Error::StreamLimit);
                        }

                        self.peer_opened_streams_uni = n;
                    },
                };

                let s = Stream::new(
                    id,
                    max_rx_data,
                    max_tx_data,
                    is_bidi(id),
                    local,
                    self.max_stream_window,
                );

                let is_writable = s.is_writable();

                (v.insert(s), is_writable)
            },

            // Existing streams are returned as they are.
            hash_map::Entry::Occupied(v) => (v.into_mut(), false),
        };

        // Newly created stream might already be writable due to initial flow
        // control limits.
        if is_new_and_writable {
            self.writable.insert(Arc::clone(&stream.priority_key));
        }

        Ok(stream)
    }
365
366    /// Adds the stream ID to the readable streams set.
367    ///
368    /// If the stream was already in the list, this does nothing.
369    pub fn insert_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
370        if !priority_key.readable.is_linked() {
371            self.readable.insert(Arc::clone(priority_key));
372        }
373    }
374
375    /// Removes the stream ID from the readable streams set.
376    pub fn remove_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
377        if !priority_key.readable.is_linked() {
378            return;
379        }
380
381        let mut c = {
382            let ptr = Arc::as_ptr(priority_key);
383            unsafe { self.readable.cursor_mut_from_ptr(ptr) }
384        };
385
386        c.remove();
387    }
388
389    /// Adds the stream ID to the writable streams set.
390    ///
391    /// This should also be called anytime a new stream is created, in addition
392    /// to when an existing stream becomes writable.
393    ///
394    /// If the stream was already in the list, this does nothing.
395    pub fn insert_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
396        if !priority_key.writable.is_linked() {
397            self.writable.insert(Arc::clone(priority_key));
398        }
399    }
400
401    /// Removes the stream ID from the writable streams set.
402    ///
403    /// This should also be called anytime an existing stream stops being
404    /// writable.
405    pub fn remove_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
406        if !priority_key.writable.is_linked() {
407            return;
408        }
409
410        let mut c = {
411            let ptr = Arc::as_ptr(priority_key);
412            unsafe { self.writable.cursor_mut_from_ptr(ptr) }
413        };
414
415        c.remove();
416    }
417
418    /// Adds the stream ID to the flushable streams set.
419    ///
420    /// If the stream was already in the list, this does nothing.
421    pub fn insert_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
422        if !priority_key.flushable.is_linked() {
423            self.flushable.insert(Arc::clone(priority_key));
424        }
425    }
426
427    /// Removes the stream ID from the flushable streams set.
428    pub fn remove_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
429        if !priority_key.flushable.is_linked() {
430            return;
431        }
432
433        let mut c = {
434            let ptr = Arc::as_ptr(priority_key);
435            unsafe { self.flushable.cursor_mut_from_ptr(ptr) }
436        };
437
438        c.remove();
439    }
440
    /// Returns a new reference to the priority key at the front of the
    /// flushable queue, without removing it.
    ///
    /// Returns `None` when the flushable queue is empty.
    pub fn peek_flushable(&self) -> Option<Arc<StreamPriorityKey>> {
        self.flushable.front().clone_pointer()
    }
444
445    /// Updates the priorities of a stream.
446    pub fn update_priority(
447        &mut self, old: &Arc<StreamPriorityKey>, new: &Arc<StreamPriorityKey>,
448    ) {
449        if old.readable.is_linked() {
450            self.remove_readable(old);
451            self.readable.insert(Arc::clone(new));
452        }
453
454        if old.writable.is_linked() {
455            self.remove_writable(old);
456            self.writable.insert(Arc::clone(new));
457        }
458
459        if old.flushable.is_linked() {
460            self.remove_flushable(old);
461            self.flushable.insert(Arc::clone(new));
462        }
463    }
464
    /// Adds the stream ID to the almost full streams set.
    ///
    /// If the stream ID was already in the set, the set is left unchanged.
    pub fn insert_almost_full(&mut self, stream_id: u64) {
        self.almost_full.insert(stream_id);
    }
471
    /// Removes the stream ID from the almost full streams set.
    ///
    /// Does nothing if the stream ID is not in the set.
    pub fn remove_almost_full(&mut self, stream_id: u64) {
        self.almost_full.remove(&stream_id);
    }
476
    /// Adds the stream ID to the blocked streams set with the
    /// given offset value.
    ///
    /// If the stream was already in the set, its recorded offset is replaced
    /// by `off`.
    pub fn insert_blocked(&mut self, stream_id: u64, off: u64) {
        self.blocked.insert(stream_id, off);
    }
484
    /// Removes the stream ID from the blocked streams set.
    ///
    /// Does nothing if the stream ID is not in the set.
    pub fn remove_blocked(&mut self, stream_id: u64) {
        self.blocked.remove(&stream_id);
    }
489
    /// Adds the stream ID to the reset streams set with the
    /// given error code and final size values.
    ///
    /// If the stream was already in the set, its recorded error code and
    /// final size are replaced by the new values.
    pub fn insert_reset(
        &mut self, stream_id: u64, error_code: u64, final_size: u64,
    ) {
        self.reset.insert(stream_id, (error_code, final_size));
    }
499
    /// Removes the stream ID from the reset streams set.
    ///
    /// Does nothing if the stream ID is not in the set.
    pub fn remove_reset(&mut self, stream_id: u64) {
        self.reset.remove(&stream_id);
    }
504
    /// Adds the stream ID to the stopped streams set with the
    /// given error code.
    ///
    /// If the stream was already in the set, its recorded error code is
    /// replaced by `error_code`.
    pub fn insert_stopped(&mut self, stream_id: u64, error_code: u64) {
        self.stopped.insert(stream_id, error_code);
    }
512
    /// Removes the stream ID from the stopped streams set.
    ///
    /// Does nothing if the stream ID is not in the set.
    pub fn remove_stopped(&mut self, stream_id: u64) {
        self.stopped.remove(&stream_id);
    }
517
518    /// Updates the peer's maximum bidirectional stream count limit.
519    pub fn update_peer_max_streams_bidi(&mut self, v: u64) {
520        self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v);
521    }
522
523    /// Updates the peer's maximum unidirectional stream count limit.
524    pub fn update_peer_max_streams_uni(&mut self, v: u64) {
525        self.peer_max_streams_uni = cmp::max(self.peer_max_streams_uni, v);
526    }
527
    /// Commits the new max_streams_bidi limit.
    ///
    /// The pending ("next") value becomes the current local limit.
    pub fn update_max_streams_bidi(&mut self) {
        self.local_max_streams_bidi = self.local_max_streams_bidi_next;
    }
532
    /// Sets the max_streams_bidi limit to the given value.
    ///
    /// The current, pending ("next") and initial values are all overwritten
    /// with `max`.
    pub fn set_max_streams_bidi(&mut self, max: u64) {
        self.local_max_streams_bidi = max;
        self.local_max_streams_bidi_next = max;
        self.initial_max_streams_bidi = max;
    }
539
    /// Returns the current (committed) max_streams_bidi limit.
    pub fn max_streams_bidi(&self) -> u64 {
        self.local_max_streams_bidi
    }
544
545    /// Returns the new max_streams_bidi limit.
546    pub fn max_streams_bidi_next(&mut self) -> u64 {
547        self.local_max_streams_bidi_next
548    }
549
    /// Commits the new max_streams_uni limit.
    ///
    /// The pending ("next") value becomes the current local limit.
    pub fn update_max_streams_uni(&mut self) {
        self.local_max_streams_uni = self.local_max_streams_uni_next;
    }
554
555    /// Returns the new max_streams_uni limit.
556    pub fn max_streams_uni_next(&mut self) -> u64 {
557        self.local_max_streams_uni_next
558    }
559
560    /// Returns the number of bidirectional streams that can be created
561    /// before the peer's stream count limit is reached.
562    pub fn peer_streams_left_bidi(&self) -> u64 {
563        self.peer_max_streams_bidi - self.local_opened_streams_bidi
564    }
565
566    /// Returns the number of unidirectional streams that can be created
567    /// before the peer's stream count limit is reached.
568    pub fn peer_streams_left_uni(&self) -> u64 {
569        self.peer_max_streams_uni - self.local_opened_streams_uni
570    }
571
572    /// Drops completed stream.
573    ///
574    /// This should only be called when Stream::is_complete() returns true for
575    /// the given stream.
576    pub fn collect(&mut self, stream_id: u64, local: bool) {
577        if !local {
578            // If the stream was created by the peer, give back a max streams
579            // credit.
580            if is_bidi(stream_id) {
581                self.local_max_streams_bidi_next =
582                    self.local_max_streams_bidi_next.saturating_add(1);
583            } else {
584                self.local_max_streams_uni_next =
585                    self.local_max_streams_uni_next.saturating_add(1);
586            }
587        }
588
589        let s = self.streams.remove(&stream_id).unwrap();
590
591        self.remove_readable(&s.priority_key);
592
593        self.remove_writable(&s.priority_key);
594
595        self.remove_flushable(&s.priority_key);
596
597        self.collected.insert(stream_id);
598    }
599
600    /// Creates an iterator over streams that have outstanding data to read.
601    pub fn readable(&self) -> StreamIter {
602        StreamIter {
603            streams: self.readable.iter().map(|s| s.id).collect(),
604            index: 0,
605        }
606    }
607
608    /// Creates an iterator over streams that can be written to.
609    pub fn writable(&self) -> StreamIter {
610        StreamIter {
611            streams: self.writable.iter().map(|s| s.id).collect(),
612            index: 0,
613        }
614    }
615
    /// Creates an iterator over streams that need to send MAX_STREAM_DATA.
    ///
    /// The iterator is built from the almost full streams set.
    pub fn almost_full(&self) -> StreamIter {
        StreamIter::from(&self.almost_full)
    }
620
    /// Creates an iterator over streams that need to send STREAM_DATA_BLOCKED.
    ///
    /// Each item is a stream ID paired with the offset at which the stream
    /// got blocked.
    pub fn blocked(&self) -> hash_map::Iter<'_, u64, u64> {
        self.blocked.iter()
    }
625
    /// Creates an iterator over streams that need to send RESET_STREAM.
    ///
    /// Each item is a stream ID paired with the `(error code, final size)`
    /// tuple to include in the frame.
    pub fn reset(&self) -> hash_map::Iter<'_, u64, (u64, u64)> {
        self.reset.iter()
    }
630
    /// Creates an iterator over streams that need to send STOP_SENDING.
    ///
    /// Each item is a stream ID paired with the error code to include in the
    /// frame.
    pub fn stopped(&self) -> hash_map::Iter<'_, u64, u64> {
        self.stopped.iter()
    }
635
    /// Returns true if the stream has been collected, i.e. its ID is in the
    /// set of completed and garbage collected streams.
    pub fn is_collected(&self, stream_id: u64) -> bool {
        self.collected.contains(&stream_id)
    }
640
    /// Returns true if there are any streams that have data to write, i.e.
    /// the flushable queue is not empty.
    pub fn has_flushable(&self) -> bool {
        !self.flushable.is_empty()
    }
645
    /// Returns true if there are any streams that have data to read, i.e.
    /// the readable queue is not empty.
    pub fn has_readable(&self) -> bool {
        !self.readable.is_empty()
    }
650
    /// Returns true if there are any streams that need to update the local
    /// flow control limit, i.e. the almost full set is not empty.
    pub fn has_almost_full(&self) -> bool {
        !self.almost_full.is_empty()
    }
656
    /// Returns true if there are any streams that are blocked, i.e. the
    /// blocked set is not empty.
    pub fn has_blocked(&self) -> bool {
        !self.blocked.is_empty()
    }
661
    /// Returns true if there are any streams that are reset, i.e. the reset
    /// set is not empty.
    pub fn has_reset(&self) -> bool {
        !self.reset.is_empty()
    }
666
    /// Returns true if there are any streams that need to send STOP_SENDING,
    /// i.e. the stopped set is not empty.
    pub fn has_stopped(&self) -> bool {
        !self.stopped.is_empty()
    }
671
672    /// Returns true if the max bidirectional streams count needs to be updated
673    /// by sending a MAX_STREAMS frame to the peer.
674    ///
675    /// This only sends MAX_STREAMS when available capacity is at or below 50%
676    /// of the initial maximum streams target.
677    pub fn should_update_max_streams_bidi(&self) -> bool {
678        let available = self
679            .local_max_streams_bidi
680            .saturating_sub(self.peer_opened_streams_bidi);
681        self.local_max_streams_bidi_next != self.local_max_streams_bidi &&
682            available <= self.initial_max_streams_bidi / 2
683    }
684
685    /// Returns true if the max unidirectional streams count needs to be updated
686    /// by sending a MAX_STREAMS frame to the peer.
687    ///
688    /// This only send MAX_STREAMS when available capacity is at or below 50% of
689    /// the initial maximum streams target.
690    pub fn should_update_max_streams_uni(&self) -> bool {
691        let available = self
692            .local_max_streams_uni
693            .saturating_sub(self.peer_opened_streams_uni);
694        self.local_max_streams_uni_next != self.local_max_streams_uni &&
695            available <= self.initial_max_streams_uni / 2
696    }
697
    /// Returns the number of active streams in the map.
    ///
    /// Collected streams are no longer stored in the map, so they are not
    /// counted. Only available in test builds.
    #[cfg(test)]
    pub fn len(&self) -> usize {
        self.streams.len()
    }
703}
704
/// A QUIC stream.
pub struct Stream<F: BufFactory = DefaultBufFactory> {
    /// Receive-side stream buffer.
    pub recv: recv_buf::RecvBuf,

    /// Send-side stream buffer.
    pub send: send_buf::SendBuf<F>,

    /// Low-water mark used by `is_writable()`: the stream only counts as
    /// writable while more than this many bytes can still be buffered before
    /// reaching the send-side maximum offset. Set to 1 by `Stream::new()`.
    pub send_lowat: usize,

    /// Whether the stream is bidirectional.
    pub bidi: bool,

    /// Whether the stream was created by the local endpoint.
    pub local: bool,

    /// The stream's urgency (lower is better). Default is `DEFAULT_URGENCY`.
    pub urgency: u8,

    /// Whether the stream can be flushed incrementally. Default is `true`.
    pub incremental: bool,

    /// Shared key carrying the stream's ID and priority, through which the
    /// stream is linked into the readable, writable and flushable queues.
    pub priority_key: Arc<StreamPriorityKey>,
}
729
730impl<F: BufFactory> Stream<F> {
731    /// Creates a new stream with the given flow control limits.
732    pub fn new(
733        id: u64, max_rx_data: u64, max_tx_data: u64, bidi: bool, local: bool,
734        max_window: u64,
735    ) -> Self {
736        let priority_key = Arc::new(StreamPriorityKey {
737            id,
738            ..Default::default()
739        });
740
741        Stream {
742            recv: recv_buf::RecvBuf::new(max_rx_data, max_window),
743            send: send_buf::SendBuf::new(max_tx_data),
744            send_lowat: 1,
745            bidi,
746            local,
747            urgency: priority_key.urgency,
748            incremental: priority_key.incremental,
749            priority_key,
750        }
751    }
752
753    /// Returns true if the stream has data to read.
754    pub fn is_readable(&self) -> bool {
755        self.recv.ready()
756    }
757
758    /// Returns true if the stream has enough flow control capacity to be
759    /// written to, and is not finished.
760    pub fn is_writable(&self) -> bool {
761        !self.send.is_shutdown() &&
762            !self.send.is_fin() &&
763            (self.send.off_back() + self.send_lowat as u64) <
764                self.send.max_off()
765    }
766
767    /// Returns true if the stream has data to send and is allowed to send at
768    /// least some of it.
769    pub fn is_flushable(&self) -> bool {
770        let off_front = self.send.off_front();
771
772        !self.send.is_empty() &&
773            off_front < self.send.off_back() &&
774            off_front < self.send.max_off()
775    }
776
777    /// Returns true if the stream is complete.
778    ///
779    /// For bidirectional streams this happens when both the receive and send
780    /// sides are complete. That is when all incoming data has been read by the
781    /// application, and when all outgoing data has been acked by the peer.
782    ///
783    /// For unidirectional streams this happens when either the receive or send
784    /// side is complete, depending on whether the stream was created locally
785    /// or not.
786    pub fn is_complete(&self) -> bool {
787        match (self.bidi, self.local) {
788            // For bidirectional streams we need to check both receive and send
789            // sides for completion.
790            (true, _) => self.recv.is_fin() && self.send.is_complete(),
791
792            // For unidirectional streams generated locally, we only need to
793            // check the send side for completion.
794            (false, true) => self.send.is_complete(),
795
796            // For unidirectional streams generated by the peer, we only need
797            // to check the receive side for completion.
798            (false, false) => self.recv.is_fin(),
799        }
800    }
801}
802
/// Returns true if the stream was created locally.
///
/// The least significant bit of a stream ID is set for server-initiated
/// streams, so the stream is local when that bit agrees with `is_server`.
pub fn is_local(stream_id: u64, is_server: bool) -> bool {
    let server_initiated = stream_id & 0x1 == 1;

    server_initiated == is_server
}
807
/// Returns true if the stream is bidirectional.
///
/// The second least significant bit of a stream ID is set for
/// unidirectional streams and clear for bidirectional ones.
pub fn is_bidi(stream_id: u64) -> bool {
    let uni_bit = (stream_id >> 1) & 0x1;

    uni_bit == 0
}
812
/// Priority and identity of a stream, used as the element type of the
/// readable, writable and flushable queues.
///
/// One intrusive link is embedded per queue, so the same key can be linked
/// into the three queues at once.
#[derive(Clone, Debug)]
pub struct StreamPriorityKey {
    /// The stream's urgency; keys with a lower value sort first.
    pub urgency: u8,
    /// Whether the stream can be flushed incrementally.
    pub incremental: bool,
    /// The stream ID.
    pub id: u64,

    /// Link used by the readable queue.
    pub readable: RBTreeAtomicLink,
    /// Link used by the writable queue.
    pub writable: RBTreeAtomicLink,
    /// Link used by the flushable queue.
    pub flushable: RBTreeAtomicLink,
}
823
impl Default for StreamPriorityKey {
    /// Returns a key with the default priority: `DEFAULT_URGENCY` urgency
    /// and incremental flushing enabled. Every other field takes its own
    /// default value.
    fn default() -> Self {
        Self {
            urgency: DEFAULT_URGENCY,
            incremental: true,
            id: Default::default(),
            readable: Default::default(),
            writable: Default::default(),
            flushable: Default::default(),
        }
    }
}
836
impl PartialEq for StreamPriorityKey {
    /// Two keys are equal when they refer to the same stream ID; urgency and
    /// the incremental flag are not compared.
    fn eq(&self, other: &Self) -> bool {
        self.id == other.id
    }
}
842
// Equality is plain stream ID equality (see `PartialEq`).
impl Eq for StreamPriorityKey {}
844
impl PartialOrd for StreamPriorityKey {
    /// Always returns `Some`, delegating to the `Ord` implementation.
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}
850
851impl Ord for StreamPriorityKey {
852    fn cmp(&self, other: &Self) -> cmp::Ordering {
853        // Ignore priority if ID matches.
854        if self.id == other.id {
855            return cmp::Ordering::Equal;
856        }
857
858        // First, order by urgency...
859        if self.urgency != other.urgency {
860            return self.urgency.cmp(&other.urgency);
861        }
862
863        // ...when the urgency is the same, and both are not incremental, order
864        // by stream ID...
865        if !self.incremental && !other.incremental {
866            return self.id.cmp(&other.id);
867        }
868
869        // ...non-incremental takes priority over incremental...
870        if self.incremental && !other.incremental {
871            return cmp::Ordering::Greater;
872        }
873        if !self.incremental && other.incremental {
874            return cmp::Ordering::Less;
875        }
876
877        // ...finally, when both are incremental, `other` takes precedence (so
878        // `self` is always sorted after other same-urgency incremental
879        // entries).
880        cmp::Ordering::Greater
881    }
882}
883
884intrusive_adapter!(pub StreamWritablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { writable: RBTreeAtomicLink });
885
886impl KeyAdapter<'_> for StreamWritablePriorityAdapter {
887    type Key = StreamPriorityKey;
888
889    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
890        s.clone()
891    }
892}
893
894intrusive_adapter!(pub StreamReadablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { readable: RBTreeAtomicLink });
895
896impl KeyAdapter<'_> for StreamReadablePriorityAdapter {
897    type Key = StreamPriorityKey;
898
899    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
900        s.clone()
901    }
902}
903
904intrusive_adapter!(pub StreamFlushablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { flushable: RBTreeAtomicLink });
905
906impl KeyAdapter<'_> for StreamFlushablePriorityAdapter {
907    type Key = StreamPriorityKey;
908
909    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
910        s.clone()
911    }
912}
913
/// An iterator over QUIC streams.
///
/// The iterator yields stream IDs. The IDs are copied into the iterator
/// when it is built, so it keeps no borrow of the set it was built from.
#[derive(Default)]
pub struct StreamIter {
    /// Stream IDs to yield, in the order they will be returned.
    streams: SmallVec<[u64; 8]>,
    /// Position in `streams` of the next ID to yield.
    index: usize,
}
920
921impl StreamIter {
922    #[inline]
923    fn from(streams: &StreamIdHashSet) -> Self {
924        StreamIter {
925            streams: streams.iter().copied().collect(),
926            index: 0,
927        }
928    }
929}
930
931impl Iterator for StreamIter {
932    type Item = u64;
933
934    #[inline]
935    fn next(&mut self) -> Option<Self::Item> {
936        let v = self.streams.get(self.index)?;
937        self.index += 1;
938        Some(*v)
939    }
940}
941
942impl ExactSizeIterator for StreamIter {
943    #[inline]
944    fn len(&self) -> usize {
945        self.streams.len() - self.index
946    }
947}
948
949#[cfg(test)]
950mod tests {
951    use crate::range_buf::RangeBuf;
952
953    use super::*;
954
955    #[test]
956    fn recv_flow_control() {
957        let mut stream =
958            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
959        assert!(!stream.recv.almost_full());
960
961        let mut buf = [0; 32];
962
963        let first = RangeBuf::from(b"hello", 0, false);
964        let second = RangeBuf::from(b"world", 5, false);
965        let third = RangeBuf::from(b"something", 10, false);
966
967        assert_eq!(stream.recv.write(second), Ok(()));
968        assert_eq!(stream.recv.write(first), Ok(()));
969        assert!(!stream.recv.almost_full());
970
971        assert_eq!(stream.recv.write(third), Err(Error::FlowControl));
972
973        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
974        assert_eq!(&buf[..len], b"helloworld");
975        assert!(!fin);
976
977        assert!(stream.recv.almost_full());
978
979        stream.recv.update_max_data(std::time::Instant::now());
980        assert_eq!(stream.recv.max_data_next(), 25);
981        assert!(!stream.recv.almost_full());
982
983        let third = RangeBuf::from(b"something", 10, false);
984        assert_eq!(stream.recv.write(third), Ok(()));
985    }
986
987    #[test]
988    fn recv_past_fin() {
989        let mut stream =
990            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
991        assert!(!stream.recv.almost_full());
992
993        let first = RangeBuf::from(b"hello", 0, true);
994        let second = RangeBuf::from(b"world", 5, false);
995
996        assert_eq!(stream.recv.write(first), Ok(()));
997        assert_eq!(stream.recv.write(second), Err(Error::FinalSize));
998    }
999
1000    #[test]
1001    fn recv_fin_dup() {
1002        let mut stream =
1003            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1004        assert!(!stream.recv.almost_full());
1005
1006        let first = RangeBuf::from(b"hello", 0, true);
1007        let second = RangeBuf::from(b"hello", 0, true);
1008
1009        assert_eq!(stream.recv.write(first), Ok(()));
1010        assert_eq!(stream.recv.write(second), Ok(()));
1011
1012        let mut buf = [0; 32];
1013
1014        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1015        assert_eq!(&buf[..len], b"hello");
1016        assert!(fin);
1017    }
1018
1019    #[test]
1020    fn recv_fin_change() {
1021        let mut stream =
1022            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1023        assert!(!stream.recv.almost_full());
1024
1025        let first = RangeBuf::from(b"hello", 0, true);
1026        let second = RangeBuf::from(b"world", 5, true);
1027
1028        assert_eq!(stream.recv.write(second), Ok(()));
1029        assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
1030    }
1031
1032    #[test]
1033    fn recv_fin_lower_than_received() {
1034        let mut stream =
1035            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1036        assert!(!stream.recv.almost_full());
1037
1038        let first = RangeBuf::from(b"hello", 0, true);
1039        let second = RangeBuf::from(b"world", 5, false);
1040
1041        assert_eq!(stream.recv.write(second), Ok(()));
1042        assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
1043    }
1044
1045    #[test]
1046    fn recv_fin_flow_control() {
1047        let mut stream =
1048            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1049        assert!(!stream.recv.almost_full());
1050
1051        let mut buf = [0; 32];
1052
1053        let first = RangeBuf::from(b"hello", 0, false);
1054        let second = RangeBuf::from(b"world", 5, true);
1055
1056        assert_eq!(stream.recv.write(first), Ok(()));
1057        assert_eq!(stream.recv.write(second), Ok(()));
1058
1059        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1060        assert_eq!(&buf[..len], b"helloworld");
1061        assert!(fin);
1062
1063        assert!(!stream.recv.almost_full());
1064    }
1065
1066    #[test]
1067    fn recv_fin_reset_mismatch() {
1068        let mut stream =
1069            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1070        assert!(!stream.recv.almost_full());
1071
1072        let first = RangeBuf::from(b"hello", 0, true);
1073
1074        assert_eq!(stream.recv.write(first), Ok(()));
1075        assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
1076    }
1077
1078    #[test]
1079    fn recv_reset_with_gap() {
1080        let mut stream =
1081            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1082        assert!(!stream.recv.almost_full());
1083
1084        let first = RangeBuf::from(b"hello", 0, false);
1085
1086        assert_eq!(stream.recv.write(first), Ok(()));
1087        // Read one byte.
1088        assert_eq!(stream.recv.emit(&mut [0; 1]), Ok((1, false)));
1089        // Reset with a final size > than max previously received
1090        assert_eq!(
1091            stream.recv.reset(0, 10),
1092            Ok(RecvBufResetReturn {
1093                max_data_delta: 5,
1094                // consumed_flowcontrol is 9, since we already read 1 byte
1095                consumed_flowcontrol: 9
1096            })
1097        );
1098        assert_eq!(stream.recv.reset(0, 10), Ok(RecvBufResetReturn::zero()));
1099    }
1100
1101    #[test]
1102    fn recv_reset_dup() {
1103        let mut stream =
1104            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1105        assert!(!stream.recv.almost_full());
1106
1107        let first = RangeBuf::from(b"hello", 0, false);
1108
1109        assert_eq!(stream.recv.write(first), Ok(()));
1110        assert_eq!(
1111            stream.recv.reset(0, 5),
1112            Ok(RecvBufResetReturn {
1113                max_data_delta: 0,
1114                consumed_flowcontrol: 5
1115            })
1116        );
1117        assert_eq!(stream.recv.reset(0, 5), Ok(RecvBufResetReturn::zero()));
1118    }
1119
1120    #[test]
1121    fn recv_reset_change() {
1122        let mut stream =
1123            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1124        assert!(!stream.recv.almost_full());
1125
1126        let first = RangeBuf::from(b"hello", 0, false);
1127
1128        assert_eq!(stream.recv.write(first), Ok(()));
1129        assert_eq!(
1130            stream.recv.reset(0, 5),
1131            Ok(RecvBufResetReturn {
1132                max_data_delta: 0,
1133                consumed_flowcontrol: 5
1134            })
1135        );
1136        assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
1137    }
1138
1139    #[test]
1140    fn recv_reset_lower_than_received() {
1141        let mut stream =
1142            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1143        assert!(!stream.recv.almost_full());
1144
1145        let first = RangeBuf::from(b"hello", 0, false);
1146
1147        assert_eq!(stream.recv.write(first), Ok(()));
1148        assert_eq!(stream.recv.reset(0, 4), Err(Error::FinalSize));
1149    }
1150
1151    #[test]
1152    fn send_flow_control() {
1153        let mut buf = [0; 25];
1154
1155        let mut stream =
1156            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1157
1158        let first = b"hello";
1159        let second = b"world";
1160        let third = b"something";
1161
1162        assert!(stream.send.write(first, false).is_ok());
1163        assert!(stream.send.write(second, false).is_ok());
1164        assert!(stream.send.write(third, false).is_ok());
1165
1166        assert_eq!(stream.send.off_front(), 0);
1167
1168        let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
1169        assert_eq!(written, 15);
1170        assert!(!fin);
1171        assert_eq!(&buf[..written], b"helloworldsomet");
1172
1173        assert_eq!(stream.send.off_front(), 15);
1174
1175        let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
1176        assert_eq!(written, 0);
1177        assert!(!fin);
1178        assert_eq!(&buf[..written], b"");
1179
1180        stream.send.retransmit(0, 15);
1181
1182        assert_eq!(stream.send.off_front(), 0);
1183
1184        let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
1185        assert_eq!(written, 10);
1186        assert!(!fin);
1187        assert_eq!(&buf[..written], b"helloworld");
1188
1189        assert_eq!(stream.send.off_front(), 10);
1190
1191        let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
1192        assert_eq!(written, 5);
1193        assert!(!fin);
1194        assert_eq!(&buf[..written], b"somet");
1195    }
1196
1197    #[test]
1198    fn send_past_fin() {
1199        let mut stream =
1200            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1201
1202        let first = b"hello";
1203        let second = b"world";
1204        let third = b"third";
1205
1206        assert_eq!(stream.send.write(first, false), Ok(5));
1207
1208        assert_eq!(stream.send.write(second, true), Ok(5));
1209        assert!(stream.send.is_fin());
1210
1211        assert_eq!(stream.send.write(third, false), Err(Error::FinalSize));
1212    }
1213
1214    #[test]
1215    fn send_fin_dup() {
1216        let mut stream =
1217            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1218
1219        assert_eq!(stream.send.write(b"hello", true), Ok(5));
1220        assert!(stream.send.is_fin());
1221
1222        assert_eq!(stream.send.write(b"", true), Ok(0));
1223        assert!(stream.send.is_fin());
1224    }
1225
1226    #[test]
1227    fn send_undo_fin() {
1228        let mut stream =
1229            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1230
1231        assert_eq!(stream.send.write(b"hello", true), Ok(5));
1232        assert!(stream.send.is_fin());
1233
1234        assert_eq!(
1235            stream.send.write(b"helloworld", true),
1236            Err(Error::FinalSize)
1237        );
1238    }
1239
1240    #[test]
1241    fn send_fin_max_data_match() {
1242        let mut buf = [0; 15];
1243
1244        let mut stream =
1245            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1246
1247        let slice = b"hellohellohello";
1248
1249        assert!(stream.send.write(slice, true).is_ok());
1250
1251        let (written, fin) = stream.send.emit(&mut buf[..15]).unwrap();
1252        assert_eq!(written, 15);
1253        assert!(fin);
1254        assert_eq!(&buf[..written], slice);
1255    }
1256
1257    #[test]
1258    fn send_fin_zero_length() {
1259        let mut buf = [0; 5];
1260
1261        let mut stream =
1262            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1263
1264        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1265        assert_eq!(stream.send.write(b"", true), Ok(0));
1266        assert!(stream.send.is_fin());
1267
1268        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1269        assert_eq!(written, 5);
1270        assert!(fin);
1271        assert_eq!(&buf[..written], b"hello");
1272    }
1273
1274    #[test]
1275    fn send_ack() {
1276        let mut buf = [0; 5];
1277
1278        let mut stream =
1279            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1280
1281        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1282        assert_eq!(stream.send.write(b"world", false), Ok(5));
1283        assert_eq!(stream.send.write(b"", true), Ok(0));
1284        assert!(stream.send.is_fin());
1285
1286        assert_eq!(stream.send.off_front(), 0);
1287
1288        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1289        assert_eq!(written, 5);
1290        assert!(!fin);
1291        assert_eq!(&buf[..written], b"hello");
1292
1293        stream.send.ack_and_drop(0, 5);
1294
1295        stream.send.retransmit(0, 5);
1296
1297        assert_eq!(stream.send.off_front(), 5);
1298
1299        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1300        assert_eq!(written, 5);
1301        assert!(fin);
1302        assert_eq!(&buf[..written], b"world");
1303    }
1304
1305    #[test]
1306    fn send_ack_reordering() {
1307        let mut buf = [0; 5];
1308
1309        let mut stream =
1310            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1311
1312        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1313        assert_eq!(stream.send.write(b"world", false), Ok(5));
1314        assert_eq!(stream.send.write(b"", true), Ok(0));
1315        assert!(stream.send.is_fin());
1316
1317        assert_eq!(stream.send.off_front(), 0);
1318
1319        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1320        assert_eq!(written, 5);
1321        assert!(!fin);
1322        assert_eq!(&buf[..written], b"hello");
1323
1324        assert_eq!(stream.send.off_front(), 5);
1325
1326        let (written, fin) = stream.send.emit(&mut buf[..1]).unwrap();
1327        assert_eq!(written, 1);
1328        assert!(!fin);
1329        assert_eq!(&buf[..written], b"w");
1330
1331        stream.send.ack_and_drop(5, 1);
1332        stream.send.ack_and_drop(0, 5);
1333
1334        stream.send.retransmit(0, 5);
1335        stream.send.retransmit(5, 1);
1336
1337        assert_eq!(stream.send.off_front(), 6);
1338
1339        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1340        assert_eq!(written, 4);
1341        assert!(fin);
1342        assert_eq!(&buf[..written], b"orld");
1343    }
1344
1345    #[test]
1346    fn recv_data_below_off() {
1347        let mut stream =
1348            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1349
1350        let first = RangeBuf::from(b"hello", 0, false);
1351
1352        assert_eq!(stream.recv.write(first), Ok(()));
1353
1354        let mut buf = [0; 10];
1355
1356        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1357        assert_eq!(&buf[..len], b"hello");
1358        assert!(!fin);
1359
1360        let first = RangeBuf::from(b"elloworld", 1, true);
1361        assert_eq!(stream.recv.write(first), Ok(()));
1362
1363        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1364        assert_eq!(&buf[..len], b"world");
1365        assert!(fin);
1366    }
1367
    /// Follows a bidirectional stream until both directions are done: the
    /// send side once everything written has been acked, the receive side
    /// once everything up to the fin has been read.
    #[test]
    fn stream_complete() {
        let mut stream =
            <Stream>::new(0, 30, 30, true, true, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"world", false), Ok(5));

        // No fin has been written yet.
        assert!(!stream.send.is_complete());
        assert!(!stream.send.is_fin());

        assert_eq!(stream.send.write(b"", true), Ok(0));

        // The send side is finished, but nothing has been acked.
        assert!(!stream.send.is_complete());
        assert!(stream.send.is_fin());

        // The receive side got its fin, but the data has not been read.
        let buf = RangeBuf::from(b"hello", 0, true);
        assert!(stream.recv.write(buf).is_ok());
        assert!(!stream.recv.is_fin());

        // Ack the tail of the sent data first.
        stream.send.ack(6, 4);
        assert!(!stream.send.is_complete());

        // Read part of the received data.
        let mut buf = [0; 2];
        assert_eq!(stream.recv.emit(&mut buf), Ok((2, false)));
        assert!(!stream.recv.is_fin());

        stream.send.ack(1, 5);
        assert!(!stream.send.is_complete());

        // Acking the first byte completes the send side.
        stream.send.ack(0, 1);
        assert!(stream.send.is_complete());

        // The stream as a whole still waits for the receive side.
        assert!(!stream.is_complete());

        let mut buf = [0; 3];
        assert_eq!(stream.recv.emit(&mut buf), Ok((3, true)));
        assert!(stream.recv.is_fin());

        assert!(stream.is_complete());
    }
1409
1410    #[test]
1411    fn send_fin_zero_length_output() {
1412        let mut buf = [0; 5];
1413
1414        let mut stream =
1415            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1416
1417        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1418        assert_eq!(stream.send.off_front(), 0);
1419        assert!(!stream.send.is_fin());
1420
1421        let (written, fin) = stream.send.emit(&mut buf).unwrap();
1422        assert_eq!(written, 5);
1423        assert!(!fin);
1424        assert_eq!(&buf[..written], b"hello");
1425
1426        assert_eq!(stream.send.write(b"", true), Ok(0));
1427        assert!(stream.send.is_fin());
1428        assert_eq!(stream.send.off_front(), 5);
1429
1430        let (written, fin) = stream.send.emit(&mut buf).unwrap();
1431        assert_eq!(written, 0);
1432        assert!(fin);
1433        assert_eq!(&buf[..written], b"");
1434    }
1435
1436    fn stream_send_ready(stream: &Stream) -> bool {
1437        !stream.send.is_empty() &&
1438            stream.send.off_front() < stream.send.off_back()
1439    }
1440
    /// Emits four buffered 5-byte chunks through output buffers of various
    /// sizes, checking the emitted bytes and `off_front()` after each call.
    #[test]
    fn send_emit() {
        let mut buf = [0; 5];

        let mut stream =
            <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);

        // Buffer 20 bytes in four writes; the last one carries the fin.
        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"world", false), Ok(5));
        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
        assert_eq!(stream.send.off_front(), 0);
        assert_eq!(stream.send.bufs_count(), 4);

        assert!(stream.is_flushable());

        // Less than one chunk.
        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
        assert_eq!(stream.send.off_front(), 4);
        assert_eq!(&buf[..4], b"hell");

        // The output spans the boundary between the first two chunks.
        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
        assert_eq!(stream.send.off_front(), 8);
        assert_eq!(&buf[..4], b"owor");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
        assert_eq!(stream.send.off_front(), 10);
        assert_eq!(&buf[..2], b"ld");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
        assert_eq!(stream.send.off_front(), 11);
        assert_eq!(&buf[..1], b"o");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
        assert_eq!(stream.send.off_front(), 16);
        assert_eq!(&buf[..5], b"llehd");

        // The last four bytes come out together with the fin.
        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
        assert_eq!(stream.send.off_front(), 20);
        assert_eq!(&buf[..4], b"lrow");

        assert!(!stream.is_flushable());

        // Nothing is left: further calls emit no bytes but still report
        // the fin.
        assert!(!stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
        assert_eq!(stream.send.off_front(), 20);
    }
1493
    /// Same emission sequence as `send_emit`, interleaved with
    /// `ack_and_drop()` calls whose effect is observed through
    /// `bufs_count()`.
    #[test]
    fn send_emit_ack() {
        let mut buf = [0; 5];

        let mut stream =
            <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);

        // Buffer 20 bytes in four writes; the last one carries the fin.
        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"world", false), Ok(5));
        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
        assert_eq!(stream.send.off_front(), 0);
        assert_eq!(stream.send.bufs_count(), 4);

        assert!(stream.is_flushable());

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
        assert_eq!(stream.send.off_front(), 4);
        assert_eq!(&buf[..4], b"hell");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
        assert_eq!(stream.send.off_front(), 8);
        assert_eq!(&buf[..4], b"owor");

        // Acking exactly the first chunk drops one buffer.
        stream.send.ack_and_drop(0, 5);
        assert_eq!(stream.send.bufs_count(), 3);

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
        assert_eq!(stream.send.off_front(), 10);
        assert_eq!(&buf[..2], b"ld");

        // Acking a range that starts at offset 7 drops no buffer.
        stream.send.ack_and_drop(7, 5);
        assert_eq!(stream.send.bufs_count(), 3);

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
        assert_eq!(stream.send.off_front(), 11);
        assert_eq!(&buf[..1], b"o");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
        assert_eq!(stream.send.off_front(), 16);
        assert_eq!(&buf[..5], b"llehd");

        // Acking from offset 5 drops one more buffer.
        stream.send.ack_and_drop(5, 7);
        assert_eq!(stream.send.bufs_count(), 2);

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
        assert_eq!(stream.send.off_front(), 20);
        assert_eq!(&buf[..4], b"lrow");

        assert!(!stream.is_flushable());

        assert!(!stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
        assert_eq!(stream.send.off_front(), 20);

        // Acks for ranges starting at or beyond offset 20, the end of the
        // written data, leave the buffer count unchanged.
        stream.send.ack_and_drop(22, 4);
        assert_eq!(stream.send.bufs_count(), 2);

        stream.send.ack_and_drop(20, 1);
        assert_eq!(stream.send.bufs_count(), 2);
    }
1561
    /// Interleaves `emit()`, `retransmit()` and `ack_and_drop()` on a
    /// stream holding four 5-byte chunks, checking the emitted bytes and
    /// `off_front()` after every step.
    #[test]
    fn send_emit_retransmit() {
        let mut buf = [0; 5];

        let mut stream =
            <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);

        // Buffer 20 bytes in four writes; the last one carries the fin.
        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"world", false), Ok(5));
        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
        assert_eq!(stream.send.off_front(), 0);
        assert_eq!(stream.send.bufs_count(), 4);

        assert!(stream.is_flushable());

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
        assert_eq!(stream.send.off_front(), 4);
        assert_eq!(&buf[..4], b"hell");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
        assert_eq!(stream.send.off_front(), 8);
        assert_eq!(&buf[..4], b"owor");

        // Retransmit 3 bytes starting at offset 3; the front offset moves
        // back to the start of that range.
        stream.send.retransmit(3, 3);
        assert_eq!(stream.send.off_front(), 3);

        // Once the range has been re-emitted, the front offset jumps
        // forward to where emission had stopped.
        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..3]), Ok((3, false)));
        assert_eq!(stream.send.off_front(), 8);
        assert_eq!(&buf[..3], b"low");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
        assert_eq!(stream.send.off_front(), 10);
        assert_eq!(&buf[..2], b"ld");

        // An ack for a range in the middle of the data, followed by a
        // retransmission request for the last two bytes emitted.
        stream.send.ack_and_drop(7, 2);

        stream.send.retransmit(8, 2);

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
        assert_eq!(stream.send.off_front(), 10);
        assert_eq!(&buf[..2], b"ld");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
        assert_eq!(stream.send.off_front(), 11);
        assert_eq!(&buf[..1], b"o");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
        assert_eq!(stream.send.off_front(), 16);
        assert_eq!(&buf[..5], b"llehd");

        // Retransmit 2 bytes from the middle of the third chunk.
        stream.send.retransmit(12, 2);

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
        assert_eq!(stream.send.off_front(), 16);
        assert_eq!(&buf[..2], b"le");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
        assert_eq!(stream.send.off_front(), 20);
        assert_eq!(&buf[..4], b"lrow");

        assert!(!stream.is_flushable());

        assert!(!stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
        assert_eq!(stream.send.off_front(), 20);

        // Retransmit 12 bytes starting at offset 7, a range that covers
        // parts of three chunks.
        stream.send.retransmit(7, 12);

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
        assert_eq!(stream.send.off_front(), 12);
        assert_eq!(&buf[..5], b"rldol");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
        assert_eq!(stream.send.off_front(), 17);
        assert_eq!(&buf[..5], b"lehdl");

        // Only 2 bytes of the retransmitted range are left, and the fin is
        // not reported with them.
        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
        assert_eq!(stream.send.off_front(), 20);
        assert_eq!(&buf[..2], b"ro");

        // The same retransmission after an ack in the middle of the data
        // produces the same output.
        stream.send.ack_and_drop(12, 7);

        stream.send.retransmit(7, 12);

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
        assert_eq!(stream.send.off_front(), 12);
        assert_eq!(&buf[..5], b"rldol");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
        assert_eq!(stream.send.off_front(), 17);
        assert_eq!(&buf[..5], b"lehdl");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
        assert_eq!(stream.send.off_front(), 20);
        assert_eq!(&buf[..2], b"ro");
    }
1674
    /// Checks the raw fields (`start`, `pos`, `len`, `off`, `fin`) and the
    /// accessors (`len()`, `off()`, `fin()`) of a `RangeBuf` across
    /// `consume()` and `split_off()` calls.
    #[test]
    fn rangebuf_split_off() {
        let mut buf = <RangeBuf>::from(b"helloworld", 5, true);
        assert_eq!(buf.start, 0);
        assert_eq!(buf.pos, 0);
        assert_eq!(buf.len, 10);
        assert_eq!(buf.off, 5);
        assert!(buf.fin);

        assert_eq!(buf.len(), 10);
        assert_eq!(buf.off(), 5);
        assert!(buf.fin());

        assert_eq!(&buf[..], b"helloworld");

        // Advance buffer.
        buf.consume(5);

        // Consuming moves `pos` only; the accessors report what is left.
        assert_eq!(buf.start, 0);
        assert_eq!(buf.pos, 5);
        assert_eq!(buf.len, 10);
        assert_eq!(buf.off, 5);
        assert!(buf.fin);

        assert_eq!(buf.len(), 5);
        assert_eq!(buf.off(), 10);
        assert!(buf.fin());

        assert_eq!(&buf[..], b"world");

        // Split buffer before position.
        let mut new_buf = buf.split_off(3);

        // The first half keeps 3 bytes, all of them consumed already, and
        // no longer carries the fin.
        assert_eq!(buf.start, 0);
        assert_eq!(buf.pos, 3);
        assert_eq!(buf.len, 3);
        assert_eq!(buf.off, 5);
        assert!(!buf.fin);

        assert_eq!(buf.len(), 0);
        assert_eq!(buf.off(), 8);
        assert!(!buf.fin());

        assert_eq!(&buf[..], b"");

        // The second half holds the rest, including the fin.
        assert_eq!(new_buf.start, 3);
        assert_eq!(new_buf.pos, 5);
        assert_eq!(new_buf.len, 7);
        assert_eq!(new_buf.off, 8);
        assert!(new_buf.fin);

        assert_eq!(new_buf.len(), 5);
        assert_eq!(new_buf.off(), 10);
        assert!(new_buf.fin());

        assert_eq!(&new_buf[..], b"world");

        // Advance buffer.
        new_buf.consume(2);

        assert_eq!(new_buf.start, 3);
        assert_eq!(new_buf.pos, 7);
        assert_eq!(new_buf.len, 7);
        assert_eq!(new_buf.off, 8);
        assert!(new_buf.fin);

        assert_eq!(new_buf.len(), 3);
        assert_eq!(new_buf.off(), 12);
        assert!(new_buf.fin());

        assert_eq!(&new_buf[..], b"rld");

        // Split buffer after position.
        let mut new_new_buf = new_buf.split_off(5);

        // One unread byte stays in the first half.
        assert_eq!(new_buf.start, 3);
        assert_eq!(new_buf.pos, 7);
        assert_eq!(new_buf.len, 5);
        assert_eq!(new_buf.off, 8);
        assert!(!new_buf.fin);

        assert_eq!(new_buf.len(), 1);
        assert_eq!(new_buf.off(), 12);
        assert!(!new_buf.fin());

        assert_eq!(&new_buf[..], b"r");

        // The second half starts unread and carries the fin.
        assert_eq!(new_new_buf.start, 8);
        assert_eq!(new_new_buf.pos, 8);
        assert_eq!(new_new_buf.len, 2);
        assert_eq!(new_new_buf.off, 13);
        assert!(new_new_buf.fin);

        assert_eq!(new_new_buf.len(), 2);
        assert_eq!(new_new_buf.off(), 13);
        assert!(new_new_buf.fin());

        assert_eq!(&new_new_buf[..], b"ld");

        // Advance buffer.
        new_new_buf.consume(2);

        assert_eq!(new_new_buf.start, 8);
        assert_eq!(new_new_buf.pos, 10);
        assert_eq!(new_new_buf.len, 2);
        assert_eq!(new_new_buf.off, 13);
        assert!(new_new_buf.fin);

        assert_eq!(new_new_buf.len(), 0);
        assert_eq!(new_new_buf.off(), 15);
        assert!(new_new_buf.fin());

        assert_eq!(&new_new_buf[..], b"");
    }
1789
1790    /// RFC9000 2.1: A stream ID that is used out of order results in all
1791    /// streams of that type with lower-numbered stream IDs also being opened.
1792    #[test]
1793    fn stream_limit_auto_open() {
1794        let local_tp = crate::TransportParams::default();
1795        let peer_tp = crate::TransportParams::default();
1796
1797        let mut streams = <StreamMap>::new(5, 5, 5);
1798
1799        let stream_id = 500;
1800        assert!(!is_local(stream_id, true), "stream id is peer initiated");
1801        assert!(is_bidi(stream_id), "stream id is bidirectional");
1802        assert_eq!(
1803            streams
1804                .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1805                .err(),
1806            Some(Error::StreamLimit),
1807            "stream limit should be exceeded"
1808        );
1809    }
1810
1811    /// Stream limit should be satisfied regardless of what order we open
1812    /// streams
1813    #[test]
1814    fn stream_create_out_of_order() {
1815        let local_tp = crate::TransportParams::default();
1816        let peer_tp = crate::TransportParams::default();
1817
1818        let mut streams = <StreamMap>::new(5, 5, 5);
1819
1820        for stream_id in [8, 12, 4] {
1821            assert!(is_local(stream_id, false), "stream id is client initiated");
1822            assert!(is_bidi(stream_id), "stream id is bidirectional");
1823            assert!(streams
1824                .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1825                .is_ok());
1826        }
1827    }
1828
1829    /// Check stream limit boundary cases
1830    #[test]
1831    fn stream_limit_edge() {
1832        let local_tp = crate::TransportParams::default();
1833        let peer_tp = crate::TransportParams::default();
1834
1835        let mut streams = <StreamMap>::new(3, 3, 3);
1836
1837        // Highest permitted
1838        let stream_id = 8;
1839        assert!(streams
1840            .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1841            .is_ok());
1842
1843        // One more than highest permitted
1844        let stream_id = 12;
1845        assert_eq!(
1846            streams
1847                .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1848                .err(),
1849            Some(Error::StreamLimit)
1850        );
1851    }
1852
1853    fn cycle_stream_priority(stream_id: u64, streams: &mut StreamMap) {
1854        let key = streams.get(stream_id).unwrap().priority_key.clone();
1855        streams.update_priority(&key.clone(), &key);
1856    }
1857
1858    #[test]
1859    fn writable_prioritized_default_priority() {
1860        let local_tp = crate::TransportParams::default();
1861        let peer_tp = crate::TransportParams {
1862            initial_max_stream_data_bidi_local: 100,
1863            initial_max_stream_data_uni: 100,
1864            ..Default::default()
1865        };
1866
1867        let mut streams = StreamMap::new(100, 100, 100);
1868
1869        for id in [0, 4, 8, 12] {
1870            assert!(streams
1871                .get_or_create(id, &local_tp, &peer_tp, false, true)
1872                .is_ok());
1873        }
1874
1875        let walk_1: Vec<u64> = streams.writable().collect();
1876        cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1877        let walk_2: Vec<u64> = streams.writable().collect();
1878        cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1879        let walk_3: Vec<u64> = streams.writable().collect();
1880        cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1881        let walk_4: Vec<u64> = streams.writable().collect();
1882        cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1883        let walk_5: Vec<u64> = streams.writable().collect();
1884
1885        // All streams are non-incremental and same urgency by default. Multiple
1886        // visits shuffle their order.
1887        assert_eq!(walk_1, vec![0, 4, 8, 12]);
1888        assert_eq!(walk_2, vec![4, 8, 12, 0]);
1889        assert_eq!(walk_3, vec![8, 12, 0, 4]);
1890        assert_eq!(walk_4, vec![12, 0, 4, 8,]);
1891        assert_eq!(walk_5, vec![0, 4, 8, 12]);
1892    }
1893
1894    #[test]
1895    fn writable_prioritized_insert_order() {
1896        let local_tp = crate::TransportParams::default();
1897        let peer_tp = crate::TransportParams {
1898            initial_max_stream_data_bidi_local: 100,
1899            initial_max_stream_data_uni: 100,
1900            ..Default::default()
1901        };
1902
1903        let mut streams = StreamMap::new(100, 100, 100);
1904
1905        // Inserting same-urgency incremental streams in a "random" order yields
1906        // same order to start with.
1907        for id in [12, 4, 8, 0] {
1908            assert!(streams
1909                .get_or_create(id, &local_tp, &peer_tp, false, true)
1910                .is_ok());
1911        }
1912
1913        let walk_1: Vec<u64> = streams.writable().collect();
1914        cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1915        let walk_2: Vec<u64> = streams.writable().collect();
1916        cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1917        let walk_3: Vec<u64> = streams.writable().collect();
1918        cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1919        let walk_4: Vec<u64> = streams.writable().collect();
1920        cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1921        let walk_5: Vec<u64> = streams.writable().collect();
1922        assert_eq!(walk_1, vec![12, 4, 8, 0]);
1923        assert_eq!(walk_2, vec![4, 8, 0, 12]);
1924        assert_eq!(walk_3, vec![8, 0, 12, 4,]);
1925        assert_eq!(walk_4, vec![0, 12, 4, 8]);
1926        assert_eq!(walk_5, vec![12, 4, 8, 0]);
1927    }
1928
1929    #[test]
1930    fn writable_prioritized_mixed_urgency() {
1931        let local_tp = crate::TransportParams::default();
1932        let peer_tp = crate::TransportParams {
1933            initial_max_stream_data_bidi_local: 100,
1934            initial_max_stream_data_uni: 100,
1935            ..Default::default()
1936        };
1937
1938        let mut streams = <StreamMap>::new(100, 100, 100);
1939
1940        // Streams where the urgency descends (becomes more important). No stream
1941        // shares an urgency.
1942        let input = vec![
1943            (0, 100),
1944            (4, 90),
1945            (8, 80),
1946            (12, 70),
1947            (16, 60),
1948            (20, 50),
1949            (24, 40),
1950            (28, 30),
1951            (32, 20),
1952            (36, 10),
1953            (40, 0),
1954        ];
1955
1956        for (id, urgency) in input.clone() {
1957            // this duplicates some code from stream_priority in order to access
1958            // streams and the collection they're in
1959            let stream = streams
1960                .get_or_create(id, &local_tp, &peer_tp, false, true)
1961                .unwrap();
1962
1963            stream.urgency = urgency;
1964
1965            let new_priority_key = Arc::new(StreamPriorityKey {
1966                urgency: stream.urgency,
1967                incremental: stream.incremental,
1968                id,
1969                ..Default::default()
1970            });
1971
1972            let old_priority_key = std::mem::replace(
1973                &mut stream.priority_key,
1974                new_priority_key.clone(),
1975            );
1976
1977            streams.update_priority(&old_priority_key, &new_priority_key);
1978        }
1979
1980        let walk_1: Vec<u64> = streams.writable().collect();
1981        assert_eq!(walk_1, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
1982
1983        // Re-applying priority to a stream does not cause duplication.
1984        for (id, urgency) in input {
1985            // this duplicates some code from stream_priority in order to access
1986            // streams and the collection they're in
1987            let stream = streams
1988                .get_or_create(id, &local_tp, &peer_tp, false, true)
1989                .unwrap();
1990
1991            stream.urgency = urgency;
1992
1993            let new_priority_key = Arc::new(StreamPriorityKey {
1994                urgency: stream.urgency,
1995                incremental: stream.incremental,
1996                id,
1997                ..Default::default()
1998            });
1999
2000            let old_priority_key = std::mem::replace(
2001                &mut stream.priority_key,
2002                new_priority_key.clone(),
2003            );
2004
2005            streams.update_priority(&old_priority_key, &new_priority_key);
2006        }
2007
2008        let walk_2: Vec<u64> = streams.writable().collect();
2009        assert_eq!(walk_2, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
2010
2011        // Removing streams doesn't break expected ordering.
2012        streams.collect(24, true);
2013
2014        let walk_3: Vec<u64> = streams.writable().collect();
2015        assert_eq!(walk_3, vec![40, 36, 32, 28, 20, 16, 12, 8, 4, 0]);
2016
2017        streams.collect(40, true);
2018        streams.collect(0, true);
2019
2020        let walk_4: Vec<u64> = streams.writable().collect();
2021        assert_eq!(walk_4, vec![36, 32, 28, 20, 16, 12, 8, 4]);
2022
2023        // Adding streams doesn't break expected ordering.
2024        streams
2025            .get_or_create(44, &local_tp, &peer_tp, false, true)
2026            .unwrap();
2027
2028        let walk_5: Vec<u64> = streams.writable().collect();
2029        assert_eq!(walk_5, vec![36, 32, 28, 20, 16, 12, 8, 4, 44]);
2030    }
2031
2032    #[test]
2033    fn writable_prioritized_mixed_urgencies_incrementals() {
2034        let local_tp = crate::TransportParams::default();
2035        let peer_tp = crate::TransportParams {
2036            initial_max_stream_data_bidi_local: 100,
2037            initial_max_stream_data_uni: 100,
2038            ..Default::default()
2039        };
2040
2041        let mut streams = StreamMap::new(100, 100, 100);
2042
2043        // Streams that share some urgency level
2044        let input = vec![
2045            (0, 100),
2046            (4, 20),
2047            (8, 100),
2048            (12, 20),
2049            (16, 90),
2050            (20, 25),
2051            (24, 90),
2052            (28, 30),
2053            (32, 80),
2054            (36, 20),
2055            (40, 0),
2056        ];
2057
2058        for (id, urgency) in input.clone() {
2059            // this duplicates some code from stream_priority in order to access
2060            // streams and the collection they're in
2061            let stream = streams
2062                .get_or_create(id, &local_tp, &peer_tp, false, true)
2063                .unwrap();
2064
2065            stream.urgency = urgency;
2066
2067            let new_priority_key = Arc::new(StreamPriorityKey {
2068                urgency: stream.urgency,
2069                incremental: stream.incremental,
2070                id,
2071                ..Default::default()
2072            });
2073
2074            let old_priority_key = std::mem::replace(
2075                &mut stream.priority_key,
2076                new_priority_key.clone(),
2077            );
2078
2079            streams.update_priority(&old_priority_key, &new_priority_key);
2080        }
2081
2082        let walk_1: Vec<u64> = streams.writable().collect();
2083        cycle_stream_priority(4, &mut streams);
2084        cycle_stream_priority(16, &mut streams);
2085        cycle_stream_priority(0, &mut streams);
2086        let walk_2: Vec<u64> = streams.writable().collect();
2087        cycle_stream_priority(12, &mut streams);
2088        cycle_stream_priority(24, &mut streams);
2089        cycle_stream_priority(8, &mut streams);
2090        let walk_3: Vec<u64> = streams.writable().collect();
2091        cycle_stream_priority(36, &mut streams);
2092        cycle_stream_priority(16, &mut streams);
2093        cycle_stream_priority(0, &mut streams);
2094        let walk_4: Vec<u64> = streams.writable().collect();
2095        cycle_stream_priority(4, &mut streams);
2096        cycle_stream_priority(24, &mut streams);
2097        cycle_stream_priority(8, &mut streams);
2098        let walk_5: Vec<u64> = streams.writable().collect();
2099        cycle_stream_priority(12, &mut streams);
2100        cycle_stream_priority(16, &mut streams);
2101        cycle_stream_priority(0, &mut streams);
2102        let walk_6: Vec<u64> = streams.writable().collect();
2103        cycle_stream_priority(36, &mut streams);
2104        cycle_stream_priority(24, &mut streams);
2105        cycle_stream_priority(8, &mut streams);
2106        let walk_7: Vec<u64> = streams.writable().collect();
2107        cycle_stream_priority(4, &mut streams);
2108        cycle_stream_priority(16, &mut streams);
2109        cycle_stream_priority(0, &mut streams);
2110        let walk_8: Vec<u64> = streams.writable().collect();
2111        cycle_stream_priority(12, &mut streams);
2112        cycle_stream_priority(24, &mut streams);
2113        cycle_stream_priority(8, &mut streams);
2114        let walk_9: Vec<u64> = streams.writable().collect();
2115        cycle_stream_priority(36, &mut streams);
2116        cycle_stream_priority(16, &mut streams);
2117        cycle_stream_priority(0, &mut streams);
2118
2119        assert_eq!(walk_1, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2120        assert_eq!(walk_2, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2121        assert_eq!(walk_3, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2122        assert_eq!(walk_4, vec![40, 4, 12, 36, 20, 28, 32, 24, 16, 8, 0]);
2123        assert_eq!(walk_5, vec![40, 12, 36, 4, 20, 28, 32, 16, 24, 0, 8]);
2124        assert_eq!(walk_6, vec![40, 36, 4, 12, 20, 28, 32, 24, 16, 8, 0]);
2125        assert_eq!(walk_7, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2126        assert_eq!(walk_8, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2127        assert_eq!(walk_9, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2128
2129        // Removing streams doesn't break expected ordering.
2130        streams.collect(20, true);
2131
2132        let walk_10: Vec<u64> = streams.writable().collect();
2133        assert_eq!(walk_10, vec![40, 4, 12, 36, 28, 32, 24, 16, 8, 0]);
2134
2135        // Adding streams doesn't break expected ordering.
2136        let stream = streams
2137            .get_or_create(44, &local_tp, &peer_tp, false, true)
2138            .unwrap();
2139
2140        stream.urgency = 20;
2141        stream.incremental = true;
2142
2143        let new_priority_key = Arc::new(StreamPriorityKey {
2144            urgency: stream.urgency,
2145            incremental: stream.incremental,
2146            id: 44,
2147            ..Default::default()
2148        });
2149
2150        let old_priority_key =
2151            std::mem::replace(&mut stream.priority_key, new_priority_key.clone());
2152
2153        streams.update_priority(&old_priority_key, &new_priority_key);
2154
2155        let walk_11: Vec<u64> = streams.writable().collect();
2156        assert_eq!(walk_11, vec![40, 4, 12, 36, 44, 28, 32, 24, 16, 8, 0]);
2157    }
2158
2159    #[test]
2160    fn priority_tree_dupes() {
2161        let mut prioritized_writable: RBTree<StreamWritablePriorityAdapter> =
2162            Default::default();
2163
2164        for id in [0, 4, 8, 12] {
2165            let s = Arc::new(StreamPriorityKey {
2166                urgency: 0,
2167                incremental: false,
2168                id,
2169                ..Default::default()
2170            });
2171
2172            prioritized_writable.insert(s);
2173        }
2174
2175        let walk_1: Vec<u64> =
2176            prioritized_writable.iter().map(|s| s.id).collect();
2177        assert_eq!(walk_1, vec![0, 4, 8, 12]);
2178
2179        // Default keys could cause duplicate entries, this is normally protected
2180        // against via StreamMap.
2181        for id in [0, 4, 8, 12] {
2182            let s = Arc::new(StreamPriorityKey {
2183                urgency: 0,
2184                incremental: false,
2185                id,
2186                ..Default::default()
2187            });
2188
2189            prioritized_writable.insert(s);
2190        }
2191
2192        let walk_2: Vec<u64> =
2193            prioritized_writable.iter().map(|s| s.id).collect();
2194        assert_eq!(walk_2, vec![0, 0, 4, 4, 8, 8, 12, 12]);
2195    }
2196}
2197
2198mod recv_buf;
2199mod send_buf;