Skip to main content

quiche/stream/
mod.rs

1// Copyright (C) 2018-2019, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::cmp;
28
29use std::sync::Arc;
30
31use std::collections::hash_map;
32use std::collections::HashMap;
33use std::collections::HashSet;
34
35use intrusive_collections::intrusive_adapter;
36use intrusive_collections::KeyAdapter;
37use intrusive_collections::RBTree;
38use intrusive_collections::RBTreeAtomicLink;
39
40use smallvec::SmallVec;
41
42use crate::buffers::DefaultBufFactory;
43use crate::BufFactory;
44use crate::Error;
45use crate::Result;
46
/// The urgency given to a stream priority key by default.
const DEFAULT_URGENCY: u8 = 127;

/// The default size of the receiver stream flow control window.
pub(crate) const DEFAULT_STREAM_WINDOW: u64 = 32 * 1024;

/// The maximum size of the receiver stream flow control window.
pub const MAX_STREAM_WINDOW: u64 = 16 * 1024 * 1024;
54
/// A simple no-op hasher for Stream IDs.
///
/// The QUIC protocol and quiche library guarantee stream ID uniqueness, so
/// we can save effort by avoiding using a more complicated algorithm.
#[derive(Default)]
pub struct StreamIdHasher {
    /// The last `u64` written to the hasher, returned as-is by `finish()`.
    id: u64,
}
63
/// Return value type of `RecvBuf::reset()`.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct RecvBufResetReturn {
    /// The difference between the previous max_data offset received and the
    /// final size reported by the reset.
    pub max_data_delta: u64,

    /// The amount of flow control credit that should be returned to the
    /// connection level flow control.
    pub consumed_flowcontrol: u64,
}

impl RecvBufResetReturn {
    /// Returns a value with both `max_data_delta` and `consumed_flowcontrol`
    /// set to zero.
    #[must_use]
    pub const fn zero() -> Self {
        Self {
            max_data_delta: 0,
            consumed_flowcontrol: 0,
        }
    }
}
84
/// Action to perform when reading from a stream's receive buffer.
///
/// `T` is the type of the destination buffer carried by `Emit`.
pub enum RecvAction<T: bytes::BufMut> {
    /// Emit data by copying it into the provided buffer `out`.
    Emit { out: T },
    /// Discard up to `len` bytes without copying them anywhere.
    Discard { len: usize },
}
92
93impl std::hash::Hasher for StreamIdHasher {
94    #[inline]
95    fn finish(&self) -> u64 {
96        self.id
97    }
98
99    #[inline]
100    fn write_u64(&mut self, id: u64) {
101        self.id = id;
102    }
103
104    #[inline]
105    fn write(&mut self, _: &[u8]) {
106        // We need a default write() for the trait but stream IDs will always
107        // be a u64 so we just delegate to write_u64.
108        unimplemented!()
109    }
110}
111
/// `BuildHasher` that produces default-constructed `StreamIdHasher`s.
type BuildStreamIdHasher = std::hash::BuildHasherDefault<StreamIdHasher>;

/// A `HashMap` keyed by stream ID that hashes its keys with `StreamIdHasher`.
pub type StreamIdHashMap<V> = HashMap<u64, V, BuildStreamIdHasher>;
/// A `HashSet` of stream IDs that hashes its elements with `StreamIdHasher`.
pub type StreamIdHashSet = HashSet<u64, BuildStreamIdHasher>;
116
/// Keeps track of QUIC streams and enforces stream limits.
#[derive(Default)]
pub struct StreamMap<F: BufFactory = DefaultBufFactory> {
    /// Map of streams indexed by stream ID.
    streams: StreamIdHashMap<Stream<F>>,

    /// Set of streams that were completed and garbage collected.
    ///
    /// Instead of keeping the full stream state forever, we collect completed
    /// streams to save memory, but we still need to keep track of previously
    /// created streams, to prevent peers from re-creating them.
    collected: StreamIdHashSet,

    /// Peer's maximum bidirectional stream count limit.
    peer_max_streams_bidi: u64,

    /// Peer's maximum unidirectional stream count limit.
    peer_max_streams_uni: u64,

    /// The total number of bidirectional streams opened by the peer.
    peer_opened_streams_bidi: u64,

    /// The total number of unidirectional streams opened by the peer.
    peer_opened_streams_uni: u64,

    /// Local maximum bidirectional stream count limit.
    local_max_streams_bidi: u64,
    /// Pending bidirectional limit. It grows by one each time a peer-created
    /// bidirectional stream is collected, and is copied into
    /// `local_max_streams_bidi` by `update_max_streams_bidi()`.
    local_max_streams_bidi_next: u64,

    /// Initial maximum bidirectional stream count.
    initial_max_streams_bidi: u64,

    /// Local maximum unidirectional stream count limit.
    local_max_streams_uni: u64,
    /// Pending unidirectional limit. It grows by one each time a peer-created
    /// unidirectional stream is collected, and is copied into
    /// `local_max_streams_uni` by `update_max_streams_uni()`.
    local_max_streams_uni_next: u64,

    /// Initial maximum unidirectional stream count.
    initial_max_streams_uni: u64,

    /// The total number of bidirectional streams opened by the local endpoint.
    local_opened_streams_bidi: u64,

    /// The total number of unidirectional streams opened by the local endpoint.
    local_opened_streams_uni: u64,

    /// Queue of stream IDs corresponding to streams that have buffered data
    /// ready to be sent to the peer. This also implies that the stream has
    /// enough flow control credits to send at least some of that data.
    flushable: RBTree<StreamFlushablePriorityAdapter>,

    /// Set of stream IDs corresponding to streams that have outstanding data
    /// to read. This is used to generate a `StreamIter` of streams without
    /// having to iterate over the full list of streams.
    pub readable: RBTree<StreamReadablePriorityAdapter>,

    /// Set of stream IDs corresponding to streams that have enough flow control
    /// capacity to be written to, and are not finished. This is used to
    /// generate a `StreamIter` of streams without having to iterate over the
    /// full list of streams.
    pub writable: RBTree<StreamWritablePriorityAdapter>,

    /// Set of stream IDs corresponding to streams that are almost out of flow
    /// control credit and need to send MAX_STREAM_DATA. This is used to
    /// generate a `StreamIter` of streams without having to iterate over the
    /// full list of streams.
    almost_full: StreamIdHashSet,

    /// Set of stream IDs corresponding to streams that are blocked. The value
    /// of the map elements represents the offset of the stream at which the
    /// blocking occurred.
    blocked: StreamIdHashMap<u64>,

    /// Set of stream IDs corresponding to streams that are reset. The value
    /// of the map elements is a tuple of the error code and final size values
    /// to include in the RESET_STREAM frame.
    reset: StreamIdHashMap<(u64, u64)>,

    /// Set of stream IDs corresponding to streams that are shutdown on the
    /// receive side, and need to send a STOP_SENDING frame. The value of the
    /// map elements is the error code to include in the STOP_SENDING frame.
    stopped: StreamIdHashMap<u64>,

    /// The maximum size of a stream window.
    max_stream_window: u64,

    /// When `true`, the initial flow control window of a new stream is set to
    /// its `max_rx_data`; when `false`, it is set to the smaller of
    /// `max_rx_data` and `DEFAULT_STREAM_WINDOW`.
    use_initial_max_data_as_flow_control_win: bool,
}
206
207impl<F: BufFactory> StreamMap<F> {
208    pub fn new(
209        max_streams_bidi: u64, max_streams_uni: u64, max_stream_window: u64,
210    ) -> Self {
211        StreamMap {
212            local_max_streams_bidi: max_streams_bidi,
213            local_max_streams_bidi_next: max_streams_bidi,
214            initial_max_streams_bidi: max_streams_bidi,
215
216            local_max_streams_uni: max_streams_uni,
217            local_max_streams_uni_next: max_streams_uni,
218            initial_max_streams_uni: max_streams_uni,
219
220            max_stream_window,
221
222            ..StreamMap::default()
223        }
224    }
225
226    /// Returns the stream with the given ID if it exists.
227    pub fn get(&self, id: u64) -> Option<&Stream<F>> {
228        self.streams.get(&id)
229    }
230
231    /// Returns the mutable stream with the given ID if it exists.
232    pub fn get_mut(&mut self, id: u64) -> Option<&mut Stream<F>> {
233        self.streams.get_mut(&id)
234    }
235
236    /// Returns the mutable stream with the given ID if it exists, or creates
237    /// a new one otherwise.
238    ///
239    /// The `local` parameter indicates whether the stream's creation was
240    /// requested by the local application rather than the peer, and is
241    /// used to validate the requested stream ID, and to select the initial
242    /// flow control values from the local and remote transport parameters
243    /// (also passed as arguments).
244    ///
245    /// This also takes care of enforcing both local and the peer's stream
246    /// count limits. If one of these limits is violated, the `StreamLimit`
247    /// error is returned.
248    pub(crate) fn get_or_create(
249        &mut self, id: u64, local_params: &crate::TransportParams,
250        peer_params: &crate::TransportParams, local: bool, is_server: bool,
251    ) -> Result<&mut Stream<F>> {
252        let (stream, is_new_and_writable) = match self.streams.entry(id) {
253            hash_map::Entry::Vacant(v) => {
254                // Stream has already been closed and garbage collected.
255                if self.collected.contains(&id) {
256                    return Err(Error::Done);
257                }
258
259                if local != is_local(id, is_server) {
260                    return Err(Error::InvalidStreamState(id));
261                }
262
263                let (max_rx_data, max_tx_data) = match (local, is_bidi(id)) {
264                    // Locally-initiated bidirectional stream.
265                    (true, true) => (
266                        local_params.initial_max_stream_data_bidi_local,
267                        peer_params.initial_max_stream_data_bidi_remote,
268                    ),
269
270                    // Locally-initiated unidirectional stream.
271                    (true, false) => (0, peer_params.initial_max_stream_data_uni),
272
273                    // Remotely-initiated bidirectional stream.
274                    (false, true) => (
275                        local_params.initial_max_stream_data_bidi_remote,
276                        peer_params.initial_max_stream_data_bidi_local,
277                    ),
278
279                    // Remotely-initiated unidirectional stream.
280                    (false, false) =>
281                        (local_params.initial_max_stream_data_uni, 0),
282                };
283
284                // The two least significant bits from a stream id identify the
285                // type of stream. Truncate those bits to get the sequence for
286                // that stream type.
287                let stream_sequence = id >> 2;
288
289                // Enforce stream count limits.
290                match (is_local(id, is_server), is_bidi(id)) {
291                    (true, true) => {
292                        let n = cmp::max(
293                            self.local_opened_streams_bidi,
294                            stream_sequence + 1,
295                        );
296
297                        if n > self.peer_max_streams_bidi {
298                            return Err(Error::StreamLimit);
299                        }
300
301                        self.local_opened_streams_bidi = n;
302                    },
303
304                    (true, false) => {
305                        let n = cmp::max(
306                            self.local_opened_streams_uni,
307                            stream_sequence + 1,
308                        );
309
310                        if n > self.peer_max_streams_uni {
311                            return Err(Error::StreamLimit);
312                        }
313
314                        self.local_opened_streams_uni = n;
315                    },
316
317                    (false, true) => {
318                        let n = cmp::max(
319                            self.peer_opened_streams_bidi,
320                            stream_sequence + 1,
321                        );
322
323                        if n > self.local_max_streams_bidi {
324                            return Err(Error::StreamLimit);
325                        }
326
327                        self.peer_opened_streams_bidi = n;
328                    },
329
330                    (false, false) => {
331                        let n = cmp::max(
332                            self.peer_opened_streams_uni,
333                            stream_sequence + 1,
334                        );
335
336                        if n > self.local_max_streams_uni {
337                            return Err(Error::StreamLimit);
338                        }
339
340                        self.peer_opened_streams_uni = n;
341                    },
342                };
343
344                let initial_window =
345                    if self.use_initial_max_data_as_flow_control_win {
346                        max_rx_data
347                    } else {
348                        cmp::min(max_rx_data, DEFAULT_STREAM_WINDOW)
349                    };
350                let s = Stream::new(
351                    id,
352                    max_rx_data,
353                    max_tx_data,
354                    local,
355                    initial_window,
356                    self.max_stream_window,
357                );
358
359                let is_writable = s.is_writable();
360
361                (v.insert(s), is_writable)
362            },
363
364            hash_map::Entry::Occupied(v) => (v.into_mut(), false),
365        };
366
367        // Newly created stream might already be writable due to initial flow
368        // control limits.
369        if is_new_and_writable {
370            self.writable.insert(Arc::clone(&stream.priority_key));
371        }
372
373        Ok(stream)
374    }
375
376    /// Adds the stream ID to the readable streams set.
377    ///
378    /// If the stream was already in the list, this does nothing.
379    pub fn insert_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
380        if !priority_key.readable.is_linked() {
381            self.readable.insert(Arc::clone(priority_key));
382        }
383    }
384
385    /// Removes the stream ID from the readable streams set.
386    pub fn remove_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
387        if !priority_key.readable.is_linked() {
388            return;
389        }
390
391        let mut c = {
392            let ptr = Arc::as_ptr(priority_key);
393            unsafe { self.readable.cursor_mut_from_ptr(ptr) }
394        };
395
396        c.remove();
397    }
398
399    /// Adds the stream ID to the writable streams set.
400    ///
401    /// This should also be called anytime a new stream is created, in addition
402    /// to when an existing stream becomes writable.
403    ///
404    /// If the stream was already in the list, this does nothing.
405    pub fn insert_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
406        if !priority_key.writable.is_linked() {
407            self.writable.insert(Arc::clone(priority_key));
408        }
409    }
410
411    /// Removes the stream ID from the writable streams set.
412    ///
413    /// This should also be called anytime an existing stream stops being
414    /// writable.
415    pub fn remove_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
416        if !priority_key.writable.is_linked() {
417            return;
418        }
419
420        let mut c = {
421            let ptr = Arc::as_ptr(priority_key);
422            unsafe { self.writable.cursor_mut_from_ptr(ptr) }
423        };
424
425        c.remove();
426    }
427
428    /// Adds the stream ID to the flushable streams set.
429    ///
430    /// If the stream was already in the list, this does nothing.
431    pub fn insert_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
432        if !priority_key.flushable.is_linked() {
433            self.flushable.insert(Arc::clone(priority_key));
434        }
435    }
436
437    /// Removes the stream ID from the flushable streams set.
438    pub fn remove_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
439        if !priority_key.flushable.is_linked() {
440            return;
441        }
442
443        let mut c = {
444            let ptr = Arc::as_ptr(priority_key);
445            unsafe { self.flushable.cursor_mut_from_ptr(ptr) }
446        };
447
448        c.remove();
449    }
450
451    pub fn peek_flushable(&self) -> Option<Arc<StreamPriorityKey>> {
452        self.flushable.front().clone_pointer()
453    }
454
455    /// Updates the priorities of a stream.
456    pub fn update_priority(
457        &mut self, old: &Arc<StreamPriorityKey>, new: &Arc<StreamPriorityKey>,
458    ) {
459        if old.readable.is_linked() {
460            self.remove_readable(old);
461            self.readable.insert(Arc::clone(new));
462        }
463
464        if old.writable.is_linked() {
465            self.remove_writable(old);
466            self.writable.insert(Arc::clone(new));
467        }
468
469        if old.flushable.is_linked() {
470            self.remove_flushable(old);
471            self.flushable.insert(Arc::clone(new));
472        }
473    }
474
475    /// Adds the stream ID to the almost full streams set.
476    ///
477    /// If the stream was already in the list, this does nothing.
478    pub fn insert_almost_full(&mut self, stream_id: u64) {
479        self.almost_full.insert(stream_id);
480    }
481
482    /// Removes the stream ID from the almost full streams set.
483    pub fn remove_almost_full(&mut self, stream_id: u64) {
484        self.almost_full.remove(&stream_id);
485    }
486
487    /// Adds the stream ID to the blocked streams set with the
488    /// given offset value.
489    ///
490    /// If the stream was already in the list, this does nothing.
491    pub fn insert_blocked(&mut self, stream_id: u64, off: u64) {
492        self.blocked.insert(stream_id, off);
493    }
494
495    /// Removes the stream ID from the blocked streams set.
496    pub fn remove_blocked(&mut self, stream_id: u64) {
497        self.blocked.remove(&stream_id);
498    }
499
500    /// Adds the stream ID to the reset streams set with the
501    /// given error code and final size values.
502    ///
503    /// If the stream was already in the list, this does nothing.
504    pub fn insert_reset(
505        &mut self, stream_id: u64, error_code: u64, final_size: u64,
506    ) {
507        self.reset.insert(stream_id, (error_code, final_size));
508    }
509
510    /// Removes the stream ID from the reset streams set.
511    pub fn remove_reset(&mut self, stream_id: u64) {
512        self.reset.remove(&stream_id);
513    }
514
515    /// Adds the stream ID to the stopped streams set with the
516    /// given error code.
517    ///
518    /// If the stream was already in the list, this does nothing.
519    pub fn insert_stopped(&mut self, stream_id: u64, error_code: u64) {
520        self.stopped.insert(stream_id, error_code);
521    }
522
523    /// Removes the stream ID from the stopped streams set.
524    pub fn remove_stopped(&mut self, stream_id: u64) {
525        self.stopped.remove(&stream_id);
526    }
527
528    /// Updates the peer's maximum bidirectional stream count limit.
529    pub fn update_peer_max_streams_bidi(&mut self, v: u64) {
530        self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v);
531    }
532
533    /// Updates the peer's maximum unidirectional stream count limit.
534    pub fn update_peer_max_streams_uni(&mut self, v: u64) {
535        self.peer_max_streams_uni = cmp::max(self.peer_max_streams_uni, v);
536    }
537
538    /// Commits the new max_streams_bidi limit.
539    pub fn update_max_streams_bidi(&mut self) {
540        self.local_max_streams_bidi = self.local_max_streams_bidi_next;
541    }
542
543    /// Sets the max_streams_bidi limit to the given value.
544    pub fn set_max_streams_bidi(&mut self, max: u64) {
545        self.local_max_streams_bidi = max;
546        self.local_max_streams_bidi_next = max;
547        self.initial_max_streams_bidi = max;
548    }
549
550    /// Returns the current max_streams_bidi limit.
551    pub fn max_streams_bidi(&self) -> u64 {
552        self.local_max_streams_bidi
553    }
554
555    /// Returns the new max_streams_bidi limit.
556    pub fn max_streams_bidi_next(&mut self) -> u64 {
557        self.local_max_streams_bidi_next
558    }
559
560    /// Commits the new max_streams_uni limit.
561    pub fn update_max_streams_uni(&mut self) {
562        self.local_max_streams_uni = self.local_max_streams_uni_next;
563    }
564
565    /// Returns the new max_streams_uni limit.
566    pub fn max_streams_uni_next(&mut self) -> u64 {
567        self.local_max_streams_uni_next
568    }
569
570    /// Returns the peer's current maximum bidirectional stream count limit.
571    pub fn peer_max_streams_bidi(&self) -> u64 {
572        self.peer_max_streams_bidi
573    }
574
575    /// Returns the number of bidirectional streams that can be created
576    /// before the peer's stream count limit is reached.
577    pub fn peer_streams_left_bidi(&self) -> u64 {
578        self.peer_max_streams_bidi - self.local_opened_streams_bidi
579    }
580
581    /// Returns the peer's current maximum unidirectional stream count limit.
582    pub fn peer_max_streams_uni(&self) -> u64 {
583        self.peer_max_streams_uni
584    }
585
586    /// Returns the number of unidirectional streams that can be created
587    /// before the peer's stream count limit is reached.
588    pub fn peer_streams_left_uni(&self) -> u64 {
589        self.peer_max_streams_uni - self.local_opened_streams_uni
590    }
591
592    /// Drops completed stream.
593    ///
594    /// This should only be called when Stream::is_complete() returns true for
595    /// the given stream.
596    pub fn collect(&mut self, stream_id: u64, local: bool) {
597        if !local {
598            // If the stream was created by the peer, give back a max streams
599            // credit.
600            if is_bidi(stream_id) {
601                self.local_max_streams_bidi_next =
602                    self.local_max_streams_bidi_next.saturating_add(1);
603            } else {
604                self.local_max_streams_uni_next =
605                    self.local_max_streams_uni_next.saturating_add(1);
606            }
607        }
608
609        let s = self.streams.remove(&stream_id).unwrap();
610
611        self.remove_readable(&s.priority_key);
612
613        self.remove_writable(&s.priority_key);
614
615        self.remove_flushable(&s.priority_key);
616
617        self.collected.insert(stream_id);
618    }
619
620    /// Creates an iterator over streams that have outstanding data to read.
621    pub fn readable(&self) -> StreamIter {
622        StreamIter {
623            streams: self.readable.iter().map(|s| s.id).collect(),
624            index: 0,
625        }
626    }
627
628    /// Creates an iterator over streams that can be written to.
629    pub fn writable(&self) -> StreamIter {
630        StreamIter {
631            streams: self.writable.iter().map(|s| s.id).collect(),
632            index: 0,
633        }
634    }
635
636    /// Creates an iterator over streams that need to send MAX_STREAM_DATA.
637    pub fn almost_full(&self) -> StreamIter {
638        StreamIter::from(&self.almost_full)
639    }
640
641    /// Creates an iterator over streams that need to send STREAM_DATA_BLOCKED.
642    pub fn blocked(&self) -> hash_map::Iter<'_, u64, u64> {
643        self.blocked.iter()
644    }
645
646    /// Creates an iterator over streams that need to send RESET_STREAM.
647    pub fn reset(&self) -> hash_map::Iter<'_, u64, (u64, u64)> {
648        self.reset.iter()
649    }
650
651    /// Creates an iterator over streams that need to send STOP_SENDING.
652    pub fn stopped(&self) -> hash_map::Iter<'_, u64, u64> {
653        self.stopped.iter()
654    }
655
656    /// Returns true if the stream has been collected.
657    pub fn is_collected(&self, stream_id: u64) -> bool {
658        self.collected.contains(&stream_id)
659    }
660
661    /// Returns true if there are any streams that have data to write.
662    pub fn has_flushable(&self) -> bool {
663        !self.flushable.is_empty()
664    }
665
666    /// Returns true if there are any streams that have data to read.
667    pub fn has_readable(&self) -> bool {
668        !self.readable.is_empty()
669    }
670
671    /// Returns true if there are any streams that need to update the local
672    /// flow control limit.
673    pub fn has_almost_full(&self) -> bool {
674        !self.almost_full.is_empty()
675    }
676
677    /// Returns true if there are any streams that are blocked.
678    pub fn has_blocked(&self) -> bool {
679        !self.blocked.is_empty()
680    }
681
682    /// Returns true if there are any streams that are reset.
683    pub fn has_reset(&self) -> bool {
684        !self.reset.is_empty()
685    }
686
687    /// Returns true if there are any streams that need to send STOP_SENDING.
688    pub fn has_stopped(&self) -> bool {
689        !self.stopped.is_empty()
690    }
691
692    /// Returns true if the max bidirectional streams count needs to be updated
693    /// by sending a MAX_STREAMS frame to the peer.
694    ///
695    /// This only sends MAX_STREAMS when available capacity is at or below 50%
696    /// of the initial maximum streams target.
697    pub fn should_update_max_streams_bidi(&self) -> bool {
698        let available = self
699            .local_max_streams_bidi
700            .saturating_sub(self.peer_opened_streams_bidi);
701        self.local_max_streams_bidi_next != self.local_max_streams_bidi &&
702            available <= self.initial_max_streams_bidi / 2
703    }
704
705    /// Returns true if the max unidirectional streams count needs to be updated
706    /// by sending a MAX_STREAMS frame to the peer.
707    ///
708    /// This only send MAX_STREAMS when available capacity is at or below 50% of
709    /// the initial maximum streams target.
710    pub fn should_update_max_streams_uni(&self) -> bool {
711        let available = self
712            .local_max_streams_uni
713            .saturating_sub(self.peer_opened_streams_uni);
714        self.local_max_streams_uni_next != self.local_max_streams_uni &&
715            available <= self.initial_max_streams_uni / 2
716    }
717
/// Returns how many streams are currently stored in the map.
#[cfg(test)]
pub fn len(&self) -> usize {
    let Self { streams, .. } = self;
    streams.len()
}
723
724    /// When `true`, the initial flow control window will be set to the
725    /// `max_rx_data`, if false, it will be set to `DEFAULT_STREAM_WINDOW`
726    pub(crate) fn set_use_initial_max_data_as_flow_control_win(
727        &mut self, v: bool,
728    ) {
729        self.use_initial_max_data_as_flow_control_win = v;
730    }
731}
732
/// A QUIC stream.
pub struct Stream<F: BufFactory = DefaultBufFactory> {
    /// Receive-side stream buffer.
    pub recv: recv_buf::RecvBuf,

    /// Send-side stream buffer.
    pub send: send_buf::SendBuf<F>,

    /// Send low watermark used by `is_writable()`: the stream only counts as
    /// writable while `send.off_back() + send_lowat` is below
    /// `send.max_off()`. `Stream::new()` sets it to 1.
    pub send_lowat: usize,

    /// Whether the stream is bidirectional.
    pub bidi: bool,

    /// Whether the stream was created by the local endpoint.
    pub local: bool,

    /// The stream's urgency (lower is better). Default is `DEFAULT_URGENCY`.
    pub urgency: u8,

    /// Whether the stream can be flushed incrementally. Default is `true`.
    pub incremental: bool,

    /// Shared key holding the stream's ID, urgency and incremental flag,
    /// together with the links used by the `StreamMap` readable, writable
    /// and flushable sets.
    pub priority_key: Arc<StreamPriorityKey>,
}
757
758impl<F: BufFactory> Stream<F> {
759    /// Creates a new stream with the given flow control limits.
760    pub fn new(
761        id: u64, max_rx_data: u64, max_tx_data: u64, local: bool,
762        initial_window: u64, max_window: u64,
763    ) -> Self {
764        let priority_key = Arc::new(StreamPriorityKey {
765            id,
766            ..Default::default()
767        });
768
769        Stream {
770            recv: recv_buf::RecvBuf::new(max_rx_data, initial_window, max_window),
771            send: send_buf::SendBuf::new(max_tx_data),
772            send_lowat: 1,
773            bidi: is_bidi(id),
774            local,
775            urgency: priority_key.urgency,
776            incremental: priority_key.incremental,
777            priority_key,
778        }
779    }
780
781    /// Returns true if the stream has data to read.
782    pub fn is_readable(&self) -> bool {
783        self.recv.ready()
784    }
785
786    /// Returns true if the stream has enough flow control capacity to be
787    /// written to, and is not finished.
788    pub fn is_writable(&self) -> bool {
789        !self.send.is_shutdown() &&
790            !self.send.is_fin() &&
791            (self.send.off_back() + self.send_lowat as u64) <
792                self.send.max_off()
793    }
794
795    /// Returns true if the stream has data to send and is allowed to send at
796    /// least some of it.
797    pub fn is_flushable(&self) -> bool {
798        let off_front = self.send.off_front();
799
800        !self.send.is_empty() &&
801            off_front < self.send.off_back() &&
802            off_front < self.send.max_off()
803    }
804
805    /// Returns true if the stream is complete.
806    ///
807    /// For bidirectional streams this happens when both the receive and send
808    /// sides are complete. That is when all incoming data has been read by the
809    /// application, and when all outgoing data has been acked by the peer.
810    ///
811    /// For unidirectional streams this happens when either the receive or send
812    /// side is complete, depending on whether the stream was created locally
813    /// or not.
814    pub fn is_complete(&self) -> bool {
815        match (self.bidi, self.local) {
816            // For bidirectional streams we need to check both receive and send
817            // sides for completion.
818            (true, _) => self.recv.is_fin() && self.send.is_complete(),
819
820            // For unidirectional streams generated locally, we only need to
821            // check the send side for completion.
822            (false, true) => self.send.is_complete(),
823
824            // For unidirectional streams generated by the peer, we only need
825            // to check the receive side for completion.
826            (false, false) => self.recv.is_fin(),
827        }
828    }
829}
830
/// Tells whether the stream was created by the local endpoint.
///
/// The least significant bit of the stream ID is compared with the local
/// role: the ID is local when that bit is 1 on a server and 0 otherwise.
pub fn is_local(stream_id: u64, is_server: bool) -> bool {
    let initiator_bit = stream_id & 0x1;
    let local_bit = u64::from(is_server);

    initiator_bit == local_bit
}
835
/// Tells whether the stream is bidirectional, i.e. whether the second
/// least significant bit of its ID is clear.
pub fn is_bidi(stream_id: u64) -> bool {
    const UNI_BIT: u64 = 0x2;

    (stream_id & UNI_BIT) != UNI_BIT
}
840
/// Priority key of a stream, which also carries the links that place the
/// stream in the `StreamMap` readable, writable and flushable sets.
#[derive(Clone, Debug)]
pub struct StreamPriorityKey {
    /// Urgency of the stream; `DEFAULT_URGENCY` by default.
    pub urgency: u8,
    /// Whether the stream is incremental; `true` by default.
    pub incremental: bool,
    /// ID of the stream; the only field compared by `PartialEq`.
    pub id: u64,

    /// Link checked by `StreamMap::insert_readable()`/`remove_readable()`.
    pub readable: RBTreeAtomicLink,
    /// Link checked by `StreamMap::insert_writable()`/`remove_writable()`.
    pub writable: RBTreeAtomicLink,
    /// Link checked by `StreamMap::insert_flushable()`/`remove_flushable()`.
    pub flushable: RBTreeAtomicLink,
}
851
852impl Default for StreamPriorityKey {
853    fn default() -> Self {
854        Self {
855            urgency: DEFAULT_URGENCY,
856            incremental: true,
857            id: Default::default(),
858            readable: Default::default(),
859            writable: Default::default(),
860            flushable: Default::default(),
861        }
862    }
863}
864
865impl PartialEq for StreamPriorityKey {
866    fn eq(&self, other: &Self) -> bool {
867        self.id == other.id
868    }
869}
870
871impl Eq for StreamPriorityKey {}
872
873impl PartialOrd for StreamPriorityKey {
874    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
875        Some(self.cmp(other))
876    }
877}
878
879impl Ord for StreamPriorityKey {
880    fn cmp(&self, other: &Self) -> cmp::Ordering {
881        // Ignore priority if ID matches.
882        if self.id == other.id {
883            return cmp::Ordering::Equal;
884        }
885
886        // First, order by urgency...
887        if self.urgency != other.urgency {
888            return self.urgency.cmp(&other.urgency);
889        }
890
891        // ...when the urgency is the same, and both are not incremental, order
892        // by stream ID...
893        if !self.incremental && !other.incremental {
894            return self.id.cmp(&other.id);
895        }
896
897        // ...non-incremental takes priority over incremental...
898        if self.incremental && !other.incremental {
899            return cmp::Ordering::Greater;
900        }
901        if !self.incremental && other.incremental {
902            return cmp::Ordering::Less;
903        }
904
905        // ...finally, when both are incremental, `other` takes precedence (so
906        // `self` is always sorted after other same-urgency incremental
907        // entries).
908        cmp::Ordering::Greater
909    }
910}
911
912intrusive_adapter!(pub StreamWritablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { writable: RBTreeAtomicLink });
913
914impl KeyAdapter<'_> for StreamWritablePriorityAdapter {
915    type Key = StreamPriorityKey;
916
917    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
918        s.clone()
919    }
920}
921
922intrusive_adapter!(pub StreamReadablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { readable: RBTreeAtomicLink });
923
924impl KeyAdapter<'_> for StreamReadablePriorityAdapter {
925    type Key = StreamPriorityKey;
926
927    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
928        s.clone()
929    }
930}
931
932intrusive_adapter!(pub StreamFlushablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { flushable: RBTreeAtomicLink });
933
934impl KeyAdapter<'_> for StreamFlushablePriorityAdapter {
935    type Key = StreamPriorityKey;
936
937    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
938        s.clone()
939    }
940}
941
/// An iterator over QUIC streams.
///
/// The iterator owns its own list of stream IDs and yields them one by one.
#[derive(Default)]
pub struct StreamIter {
    /// Stream IDs to yield, in the order they will be returned.
    streams: SmallVec<[u64; 8]>,
    /// Position in `streams` of the next ID to yield.
    index: usize,
}
948
949impl StreamIter {
950    #[inline]
951    fn from(streams: &StreamIdHashSet) -> Self {
952        StreamIter {
953            streams: streams.iter().copied().collect(),
954            index: 0,
955        }
956    }
957}
958
959impl Iterator for StreamIter {
960    type Item = u64;
961
962    #[inline]
963    fn next(&mut self) -> Option<Self::Item> {
964        let v = self.streams.get(self.index)?;
965        self.index += 1;
966        Some(*v)
967    }
968}
969
970impl ExactSizeIterator for StreamIter {
971    #[inline]
972    fn len(&self) -> usize {
973        self.streams.len() - self.index
974    }
975}
976
977#[cfg(test)]
978mod tests {
979    use crate::range_buf::RangeBuf;
980
981    use super::*;
982
    /// Checks that the receive side rejects data with a flow control error
    /// and accepts the same data once the limit has been raised.
    #[test]
    fn recv_flow_control() {
        let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let mut buf = [0; 32];

        let first = RangeBuf::from(b"hello", 0, false);
        let second = RangeBuf::from(b"world", 5, false);
        let third = RangeBuf::from(b"something", 10, false);

        // The first two chunks are delivered out of order.
        assert_eq!(stream.recv.write(second), Ok(()));
        assert_eq!(stream.recv.write(first), Ok(()));
        assert!(!stream.recv.almost_full());

        // The third chunk would end at offset 19 and is refused.
        assert_eq!(stream.recv.write(third), Err(Error::FlowControl));

        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
        assert_eq!(&buf[..len], b"helloworld");
        assert!(!fin);

        // Reading the data makes the window almost full...
        assert!(stream.recv.almost_full());

        // ...until the limit is updated.
        stream.recv.update_max_data(std::time::Instant::now());
        assert_eq!(stream.recv.max_data_next(), 25);
        assert!(!stream.recv.almost_full());

        // The chunk that was refused before is now accepted.
        let third = RangeBuf::from(b"something", 10, false);
        assert_eq!(stream.recv.write(third), Ok(()));
    }
1013
    /// Checks that data received past the final offset is rejected.
    #[test]
    fn recv_past_fin() {
        let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        // `first` carries the fin flag, so the stream ends at offset 5.
        let first = RangeBuf::from(b"hello", 0, true);
        let second = RangeBuf::from(b"world", 5, false);

        assert_eq!(stream.recv.write(first), Ok(()));
        assert_eq!(stream.recv.write(second), Err(Error::FinalSize));
    }
1025
    /// Checks that receiving the same final chunk twice is accepted and that
    /// the data is only emitted once.
    #[test]
    fn recv_fin_dup() {
        let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, true);
        let second = RangeBuf::from(b"hello", 0, true);

        assert_eq!(stream.recv.write(first), Ok(()));
        assert_eq!(stream.recv.write(second), Ok(()));

        let mut buf = [0; 32];

        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
        assert_eq!(&buf[..len], b"hello");
        assert!(fin);
    }
1043
    /// Checks that a second fin implying a different final size is rejected.
    #[test]
    fn recv_fin_change() {
        let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, true);
        let second = RangeBuf::from(b"world", 5, true);

        // `second` ends the stream at offset 10, `first` would end it at 5.
        assert_eq!(stream.recv.write(second), Ok(()));
        assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
    }
1055
    /// Checks that a fin whose final size is lower than the highest offset
    /// already received is rejected.
    #[test]
    fn recv_fin_lower_than_received() {
        let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, true);
        let second = RangeBuf::from(b"world", 5, false);

        // Data up to offset 10 arrives before the fin at offset 5.
        assert_eq!(stream.recv.write(second), Ok(()));
        assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
    }
1067
    /// Checks that a fully read, finished stream is not reported as almost
    /// full.
    #[test]
    fn recv_fin_flow_control() {
        let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let mut buf = [0; 32];

        let first = RangeBuf::from(b"hello", 0, false);
        let second = RangeBuf::from(b"world", 5, true);

        assert_eq!(stream.recv.write(first), Ok(()));
        assert_eq!(stream.recv.write(second), Ok(()));

        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
        assert_eq!(&buf[..len], b"helloworld");
        assert!(fin);

        // Same amount of data read as in `recv_flow_control`, but here the
        // stream is finished.
        assert!(!stream.recv.almost_full());
    }
1087
    /// Checks that a reset whose final size differs from the one set by a
    /// previously received fin is rejected.
    #[test]
    fn recv_fin_reset_mismatch() {
        let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, true);

        // The fin fixes the final size at 5, the reset claims 10.
        assert_eq!(stream.recv.write(first), Ok(()));
        assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
    }
1098
    /// Checks the values returned by a reset whose final size is beyond the
    /// data received so far, after part of that data was already read.
    #[test]
    fn recv_reset_with_gap() {
        let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, false);

        assert_eq!(stream.recv.write(first), Ok(()));
        // Read one byte.
        assert_eq!(stream.recv.emit(&mut [0; 1]), Ok((1, false)));
        // Reset with a final size greater than the highest offset previously
        // received.
        assert_eq!(
            stream.recv.reset(0, 10),
            Ok(RecvBufResetReturn {
                max_data_delta: 5,
                // consumed_flowcontrol is 9, since we already read 1 byte
                consumed_flowcontrol: 9
            })
        );
        // Repeating the same reset reports nothing new.
        assert_eq!(stream.recv.reset(0, 10), Ok(RecvBufResetReturn::zero()));
    }
1120
    /// Checks that a duplicate reset with the same final size succeeds and
    /// returns zeroed values the second time.
    #[test]
    fn recv_reset_dup() {
        let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, false);

        assert_eq!(stream.recv.write(first), Ok(()));
        // The final size matches the 5 bytes already received.
        assert_eq!(
            stream.recv.reset(0, 5),
            Ok(RecvBufResetReturn {
                max_data_delta: 0,
                consumed_flowcontrol: 5
            })
        );
        assert_eq!(stream.recv.reset(0, 5), Ok(RecvBufResetReturn::zero()));
    }
1138
    /// Checks that a second reset with a different final size is rejected.
    #[test]
    fn recv_reset_change() {
        let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, false);

        assert_eq!(stream.recv.write(first), Ok(()));
        assert_eq!(
            stream.recv.reset(0, 5),
            Ok(RecvBufResetReturn {
                max_data_delta: 0,
                consumed_flowcontrol: 5
            })
        );
        // The first reset fixed the final size at 5.
        assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
    }
1156
    /// Checks that a reset whose final size is lower than the amount of data
    /// already received is rejected.
    #[test]
    fn recv_reset_lower_than_received() {
        let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
        assert!(!stream.recv.almost_full());

        let first = RangeBuf::from(b"hello", 0, false);

        // 5 bytes were received, the reset claims a final size of 4.
        assert_eq!(stream.recv.write(first), Ok(()));
        assert_eq!(stream.recv.reset(0, 4), Err(Error::FinalSize));
    }
1167
    /// Checks that the send side never emits more than 15 bytes here, both
    /// on first transmission and after a retransmit request.
    #[test]
    fn send_flow_control() {
        let mut buf = [0; 25];

        let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);

        let first = b"hello";
        let second = b"world";
        let third = b"something";

        assert!(stream.send.write(first, false).is_ok());
        assert!(stream.send.write(second, false).is_ok());
        assert!(stream.send.write(third, false).is_ok());

        assert_eq!(stream.send.off_front(), 0);

        // Only 15 bytes come out even though the output buffer has room for
        // 25.
        let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
        assert_eq!(written, 15);
        assert!(!fin);
        assert_eq!(&buf[..written], b"helloworldsomet");

        assert_eq!(stream.send.off_front(), 15);

        // Nothing more can be emitted.
        let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
        assert_eq!(written, 0);
        assert!(!fin);
        assert_eq!(&buf[..written], b"");

        // Requesting retransmission of everything rewinds the front offset.
        stream.send.retransmit(0, 15);

        assert_eq!(stream.send.off_front(), 0);

        let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
        assert_eq!(written, 10);
        assert!(!fin);
        assert_eq!(&buf[..written], b"helloworld");

        assert_eq!(stream.send.off_front(), 10);

        let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
        assert_eq!(written, 5);
        assert!(!fin);
        assert_eq!(&buf[..written], b"somet");
    }
1212
    /// Checks that writing more data after the fin was written fails.
    #[test]
    fn send_past_fin() {
        let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);

        let first = b"hello";
        let second = b"world";
        let third = b"third";

        assert_eq!(stream.send.write(first, false), Ok(5));

        // The second write carries the fin flag.
        assert_eq!(stream.send.write(second, true), Ok(5));
        assert!(stream.send.is_fin());

        assert_eq!(stream.send.write(third, false), Err(Error::FinalSize));
    }
1228
    /// Checks that writing an empty fin after the fin was already written is
    /// accepted.
    #[test]
    fn send_fin_dup() {
        let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", true), Ok(5));
        assert!(stream.send.is_fin());

        assert_eq!(stream.send.write(b"", true), Ok(0));
        assert!(stream.send.is_fin());
    }
1239
    /// Checks that writing more data with a new fin after the stream was
    /// already finished fails.
    #[test]
    fn send_undo_fin() {
        let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", true), Ok(5));
        assert!(stream.send.is_fin());

        assert_eq!(
            stream.send.write(b"helloworld", true),
            Err(Error::FinalSize)
        );
    }
1252
    /// Checks that a 15 byte write with fin is emitted in full, fin
    /// included.
    #[test]
    fn send_fin_max_data_match() {
        let mut buf = [0; 15];

        let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);

        let slice = b"hellohellohello";

        assert!(stream.send.write(slice, true).is_ok());

        let (written, fin) = stream.send.emit(&mut buf[..15]).unwrap();
        assert_eq!(written, 15);
        assert!(fin);
        assert_eq!(&buf[..written], slice);
    }
1268
    /// Checks that a fin written as a separate empty write is reported
    /// together with the last data emitted.
    #[test]
    fn send_fin_zero_length() {
        let mut buf = [0; 5];

        let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"", true), Ok(0));
        assert!(stream.send.is_fin());

        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
        assert_eq!(written, 5);
        assert!(fin);
        assert_eq!(&buf[..written], b"hello");
    }
1284
    /// Checks that a retransmit request for data that was already acked does
    /// not rewind the send buffer.
    #[test]
    fn send_ack() {
        let mut buf = [0; 5];

        let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"world", false), Ok(5));
        assert_eq!(stream.send.write(b"", true), Ok(0));
        assert!(stream.send.is_fin());

        assert_eq!(stream.send.off_front(), 0);

        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
        assert_eq!(written, 5);
        assert!(!fin);
        assert_eq!(&buf[..written], b"hello");

        stream.send.ack_and_drop(0, 5);

        // The range was acked, so the front offset stays at 5.
        stream.send.retransmit(0, 5);

        assert_eq!(stream.send.off_front(), 5);

        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
        assert_eq!(written, 5);
        assert!(fin);
        assert_eq!(&buf[..written], b"world");
    }
1314
    /// Checks that acks arriving out of order are handled, and that
    /// retransmit requests for the acked ranges are ignored.
    #[test]
    fn send_ack_reordering() {
        let mut buf = [0; 5];

        let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"world", false), Ok(5));
        assert_eq!(stream.send.write(b"", true), Ok(0));
        assert!(stream.send.is_fin());

        assert_eq!(stream.send.off_front(), 0);

        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
        assert_eq!(written, 5);
        assert!(!fin);
        assert_eq!(&buf[..written], b"hello");

        assert_eq!(stream.send.off_front(), 5);

        let (written, fin) = stream.send.emit(&mut buf[..1]).unwrap();
        assert_eq!(written, 1);
        assert!(!fin);
        assert_eq!(&buf[..written], b"w");

        // The later range is acked before the earlier one.
        stream.send.ack_and_drop(5, 1);
        stream.send.ack_and_drop(0, 5);

        stream.send.retransmit(0, 5);
        stream.send.retransmit(5, 1);

        // Neither retransmit moved the front offset back.
        assert_eq!(stream.send.off_front(), 6);

        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
        assert_eq!(written, 4);
        assert!(fin);
        assert_eq!(&buf[..written], b"orld");
    }
1353
    /// Checks that a chunk overlapping data that was already read only
    /// contributes its new part.
    #[test]
    fn recv_data_below_off() {
        let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);

        let first = RangeBuf::from(b"hello", 0, false);

        assert_eq!(stream.recv.write(first), Ok(()));

        let mut buf = [0; 10];

        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
        assert_eq!(&buf[..len], b"hello");
        assert!(!fin);

        // This chunk starts at offset 1, below the 5 bytes already read.
        let first = RangeBuf::from(b"elloworld", 1, true);
        assert_eq!(stream.recv.write(first), Ok(()));

        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
        assert_eq!(&buf[..len], b"world");
        assert!(fin);
    }
1375
    /// Walks a stream through to completion, checking the send side, the
    /// receive side and the stream as a whole at each step.
    #[test]
    fn stream_complete() {
        let mut stream =
            <Stream>::new(0, 30, 30, true, 30, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"world", false), Ok(5));

        assert!(!stream.send.is_complete());
        assert!(!stream.send.is_fin());

        assert_eq!(stream.send.write(b"", true), Ok(0));

        // The fin was written but nothing was acked yet.
        assert!(!stream.send.is_complete());
        assert!(stream.send.is_fin());

        // The final chunk was received but not read yet.
        let buf = RangeBuf::from(b"hello", 0, true);
        assert!(stream.recv.write(buf).is_ok());
        assert!(!stream.recv.is_fin());

        stream.send.ack(6, 4);
        assert!(!stream.send.is_complete());

        let mut buf = [0; 2];
        assert_eq!(stream.recv.emit(&mut buf), Ok((2, false)));
        assert!(!stream.recv.is_fin());

        stream.send.ack(1, 5);
        assert!(!stream.send.is_complete());

        // The send side completes once the last missing range is acked.
        stream.send.ack(0, 1);
        assert!(stream.send.is_complete());

        // The receive side still has unread data.
        assert!(!stream.is_complete());

        let mut buf = [0; 3];
        assert_eq!(stream.recv.emit(&mut buf), Ok((3, true)));
        assert!(stream.recv.is_fin());

        assert!(stream.is_complete());
    }
1417
    /// Checks that a fin written after all data was already emitted is
    /// reported by a later emit that returns no data.
    #[test]
    fn send_fin_zero_length_output() {
        let mut buf = [0; 5];

        let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.off_front(), 0);
        assert!(!stream.send.is_fin());

        let (written, fin) = stream.send.emit(&mut buf).unwrap();
        assert_eq!(written, 5);
        assert!(!fin);
        assert_eq!(&buf[..written], b"hello");

        // The fin is only written now, after the data went out.
        assert_eq!(stream.send.write(b"", true), Ok(0));
        assert!(stream.send.is_fin());
        assert_eq!(stream.send.off_front(), 5);

        let (written, fin) = stream.send.emit(&mut buf).unwrap();
        assert_eq!(written, 0);
        assert!(fin);
        assert_eq!(&buf[..written], b"");
    }
1442
1443    fn stream_send_ready(stream: &Stream) -> bool {
1444        !stream.send.is_empty() &&
1445            stream.send.off_front() < stream.send.off_back()
1446    }
1447
    /// Emits four buffered writes through output buffers of various sizes
    /// and checks the data, the fin flag and the front offset at each step.
    #[test]
    fn send_emit() {
        let mut buf = [0; 5];

        let mut stream = <Stream>::new(0, 0, 20, true, 0, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"world", false), Ok(5));
        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
        assert_eq!(stream.send.off_front(), 0);
        assert_eq!(stream.send.bufs_count(), 4);

        assert!(stream.is_flushable());

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
        assert_eq!(stream.send.off_front(), 4);
        assert_eq!(&buf[..4], b"hell");

        // This emit spans the end of the first write and the start of the
        // second.
        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
        assert_eq!(stream.send.off_front(), 8);
        assert_eq!(&buf[..4], b"owor");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
        assert_eq!(stream.send.off_front(), 10);
        assert_eq!(&buf[..2], b"ld");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
        assert_eq!(stream.send.off_front(), 11);
        assert_eq!(&buf[..1], b"o");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
        assert_eq!(stream.send.off_front(), 16);
        assert_eq!(&buf[..5], b"llehd");

        // The last 4 bytes come out together with the fin flag.
        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
        assert_eq!(stream.send.off_front(), 20);
        assert_eq!(&buf[..4], b"lrow");

        assert!(!stream.is_flushable());

        // Emitting again yields no data but still reports the fin.
        assert!(!stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
        assert_eq!(stream.send.off_front(), 20);
    }
1499
    /// Interleaves emits with acks and checks how many buffers remain
    /// queued after each ack.
    #[test]
    fn send_emit_ack() {
        let mut buf = [0; 5];

        let mut stream = <Stream>::new(0, 0, 20, true, 0, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"world", false), Ok(5));
        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
        assert_eq!(stream.send.off_front(), 0);
        assert_eq!(stream.send.bufs_count(), 4);

        assert!(stream.is_flushable());

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
        assert_eq!(stream.send.off_front(), 4);
        assert_eq!(&buf[..4], b"hell");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
        assert_eq!(stream.send.off_front(), 8);
        assert_eq!(&buf[..4], b"owor");

        // Acking the whole first write drops one buffer.
        stream.send.ack_and_drop(0, 5);
        assert_eq!(stream.send.bufs_count(), 3);

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
        assert_eq!(stream.send.off_front(), 10);
        assert_eq!(&buf[..2], b"ld");

        // This ack leaves the buffer count unchanged.
        stream.send.ack_and_drop(7, 5);
        assert_eq!(stream.send.bufs_count(), 3);

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
        assert_eq!(stream.send.off_front(), 11);
        assert_eq!(&buf[..1], b"o");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
        assert_eq!(stream.send.off_front(), 16);
        assert_eq!(&buf[..5], b"llehd");

        // After this ack one more buffer is dropped.
        stream.send.ack_and_drop(5, 7);
        assert_eq!(stream.send.bufs_count(), 2);

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
        assert_eq!(stream.send.off_front(), 20);
        assert_eq!(&buf[..4], b"lrow");

        assert!(!stream.is_flushable());

        assert!(!stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
        assert_eq!(stream.send.off_front(), 20);

        // Acks at offsets 20 and above, where the written data ends, leave
        // the buffer count unchanged.
        stream.send.ack_and_drop(22, 4);
        assert_eq!(stream.send.bufs_count(), 2);

        stream.send.ack_and_drop(20, 1);
        assert_eq!(stream.send.bufs_count(), 2);
    }
1566
    /// Interleaves emits with retransmit requests and acks, and checks which
    /// bytes are emitted again and where the front offset ends up.
    #[test]
    fn send_emit_retransmit() {
        let mut buf = [0; 5];

        let mut stream = <Stream>::new(
            0,
            0,
            20,
            true,
            DEFAULT_STREAM_WINDOW,
            DEFAULT_STREAM_WINDOW,
        );

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"world", false), Ok(5));
        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
        assert_eq!(stream.send.off_front(), 0);
        assert_eq!(stream.send.bufs_count(), 4);

        assert!(stream.is_flushable());

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
        assert_eq!(stream.send.off_front(), 4);
        assert_eq!(&buf[..4], b"hell");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
        assert_eq!(stream.send.off_front(), 8);
        assert_eq!(&buf[..4], b"owor");

        // Retransmitting 3 bytes from offset 3 rewinds the front offset.
        stream.send.retransmit(3, 3);
        assert_eq!(stream.send.off_front(), 3);

        // Once the 3 bytes are emitted again the front offset is back at 8.
        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..3]), Ok((3, false)));
        assert_eq!(stream.send.off_front(), 8);
        assert_eq!(&buf[..3], b"low");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
        assert_eq!(stream.send.off_front(), 10);
        assert_eq!(&buf[..2], b"ld");

        stream.send.ack_and_drop(7, 2);

        stream.send.retransmit(8, 2);

        // Both bytes at offsets 8 and 9 are emitted again.
        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
        assert_eq!(stream.send.off_front(), 10);
        assert_eq!(&buf[..2], b"ld");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
        assert_eq!(stream.send.off_front(), 11);
        assert_eq!(&buf[..1], b"o");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
        assert_eq!(stream.send.off_front(), 16);
        assert_eq!(&buf[..5], b"llehd");

        stream.send.retransmit(12, 2);

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
        assert_eq!(stream.send.off_front(), 16);
        assert_eq!(&buf[..2], b"le");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
        assert_eq!(stream.send.off_front(), 20);
        assert_eq!(&buf[..4], b"lrow");

        assert!(!stream.is_flushable());

        assert!(!stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
        assert_eq!(stream.send.off_front(), 20);

        // Retransmit a range spanning several of the original writes.
        stream.send.retransmit(7, 12);

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
        assert_eq!(stream.send.off_front(), 12);
        assert_eq!(&buf[..5], b"rldol");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
        assert_eq!(stream.send.off_front(), 17);
        assert_eq!(&buf[..5], b"lehdl");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
        assert_eq!(stream.send.off_front(), 20);
        assert_eq!(&buf[..2], b"ro");

        stream.send.ack_and_drop(12, 7);

        // The same retransmit request is repeated after the ack.
        stream.send.retransmit(7, 12);

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
        assert_eq!(stream.send.off_front(), 12);
        assert_eq!(&buf[..5], b"rldol");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
        assert_eq!(stream.send.off_front(), 17);
        assert_eq!(&buf[..5], b"lehdl");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
        assert_eq!(stream.send.off_front(), 20);
        assert_eq!(&buf[..2], b"ro");
    }
1685
    /// Checks the fields and accessors of a `RangeBuf` and of the buffers
    /// returned by `split_off()`, before and after consuming data.
    #[test]
    fn rangebuf_split_off() {
        let mut buf = <RangeBuf>::from(b"helloworld", 5, true);
        assert_eq!(buf.start, 0);
        assert_eq!(buf.pos, 0);
        assert_eq!(buf.len, 10);
        assert_eq!(buf.off, 5);
        assert!(buf.fin);

        assert_eq!(buf.len(), 10);
        assert_eq!(buf.off(), 5);
        assert!(buf.fin());

        assert_eq!(&buf[..], b"helloworld");

        // Advance buffer.
        buf.consume(5);

        // Only `pos` changed; `len()` and `off()` reflect the consumed data.
        assert_eq!(buf.start, 0);
        assert_eq!(buf.pos, 5);
        assert_eq!(buf.len, 10);
        assert_eq!(buf.off, 5);
        assert!(buf.fin);

        assert_eq!(buf.len(), 5);
        assert_eq!(buf.off(), 10);
        assert!(buf.fin());

        assert_eq!(&buf[..], b"world");

        // Split buffer before position.
        let mut new_buf = buf.split_off(3);

        // The original buffer keeps the first 3 bytes and loses the fin.
        assert_eq!(buf.start, 0);
        assert_eq!(buf.pos, 3);
        assert_eq!(buf.len, 3);
        assert_eq!(buf.off, 5);
        assert!(!buf.fin);

        assert_eq!(buf.len(), 0);
        assert_eq!(buf.off(), 8);
        assert!(!buf.fin());

        assert_eq!(&buf[..], b"");

        // The new buffer holds the rest, including the fin.
        assert_eq!(new_buf.start, 3);
        assert_eq!(new_buf.pos, 5);
        assert_eq!(new_buf.len, 7);
        assert_eq!(new_buf.off, 8);
        assert!(new_buf.fin);

        assert_eq!(new_buf.len(), 5);
        assert_eq!(new_buf.off(), 10);
        assert!(new_buf.fin());

        assert_eq!(&new_buf[..], b"world");

        // Advance buffer.
        new_buf.consume(2);

        assert_eq!(new_buf.start, 3);
        assert_eq!(new_buf.pos, 7);
        assert_eq!(new_buf.len, 7);
        assert_eq!(new_buf.off, 8);
        assert!(new_buf.fin);

        assert_eq!(new_buf.len(), 3);
        assert_eq!(new_buf.off(), 12);
        assert!(new_buf.fin());

        assert_eq!(&new_buf[..], b"rld");

        // Split buffer after position.
        let mut new_new_buf = new_buf.split_off(5);

        assert_eq!(new_buf.start, 3);
        assert_eq!(new_buf.pos, 7);
        assert_eq!(new_buf.len, 5);
        assert_eq!(new_buf.off, 8);
        assert!(!new_buf.fin);

        assert_eq!(new_buf.len(), 1);
        assert_eq!(new_buf.off(), 12);
        assert!(!new_buf.fin());

        assert_eq!(&new_buf[..], b"r");

        assert_eq!(new_new_buf.start, 8);
        assert_eq!(new_new_buf.pos, 8);
        assert_eq!(new_new_buf.len, 2);
        assert_eq!(new_new_buf.off, 13);
        assert!(new_new_buf.fin);

        assert_eq!(new_new_buf.len(), 2);
        assert_eq!(new_new_buf.off(), 13);
        assert!(new_new_buf.fin());

        assert_eq!(&new_new_buf[..], b"ld");

        // Advance buffer.
        new_new_buf.consume(2);

        assert_eq!(new_new_buf.start, 8);
        assert_eq!(new_new_buf.pos, 10);
        assert_eq!(new_new_buf.len, 2);
        assert_eq!(new_new_buf.off, 13);
        assert!(new_new_buf.fin);

        assert_eq!(new_new_buf.len(), 0);
        assert_eq!(new_new_buf.off(), 15);
        assert!(new_new_buf.fin());

        assert_eq!(&new_new_buf[..], b"");
    }
1800
1801    /// RFC9000 2.1: A stream ID that is used out of order results in all
1802    /// streams of that type with lower-numbered stream IDs also being opened.
1803    #[test]
1804    fn stream_limit_auto_open() {
1805        let local_tp = crate::TransportParams::default();
1806        let peer_tp = crate::TransportParams::default();
1807
1808        let mut streams = <StreamMap>::new(5, 5, 5);
1809
1810        let stream_id = 500;
1811        assert!(!is_local(stream_id, true), "stream id is peer initiated");
1812        assert!(is_bidi(stream_id), "stream id is bidirectional");
1813        assert_eq!(
1814            streams
1815                .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1816                .err(),
1817            Some(Error::StreamLimit),
1818            "stream limit should be exceeded"
1819        );
1820    }
1821
1822    /// Stream limit should be satisfied regardless of what order we open
1823    /// streams
1824    #[test]
1825    fn stream_create_out_of_order() {
1826        let local_tp = crate::TransportParams::default();
1827        let peer_tp = crate::TransportParams::default();
1828
1829        let mut streams = <StreamMap>::new(5, 5, 5);
1830
1831        for stream_id in [8, 12, 4] {
1832            assert!(is_local(stream_id, false), "stream id is client initiated");
1833            assert!(is_bidi(stream_id), "stream id is bidirectional");
1834            assert!(streams
1835                .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1836                .is_ok());
1837        }
1838    }
1839
1840    /// Check stream limit boundary cases
1841    #[test]
1842    fn stream_limit_edge() {
1843        let local_tp = crate::TransportParams::default();
1844        let peer_tp = crate::TransportParams::default();
1845
1846        let mut streams = <StreamMap>::new(3, 3, 3);
1847
1848        // Highest permitted
1849        let stream_id = 8;
1850        assert!(streams
1851            .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1852            .is_ok());
1853
1854        // One more than highest permitted
1855        let stream_id = 12;
1856        assert_eq!(
1857            streams
1858                .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1859                .err(),
1860            Some(Error::StreamLimit)
1861        );
1862    }
1863
1864    fn cycle_stream_priority(stream_id: u64, streams: &mut StreamMap) {
1865        let key = streams.get(stream_id).unwrap().priority_key.clone();
1866        streams.update_priority(&key.clone(), &key);
1867    }
1868
1869    #[test]
1870    fn writable_prioritized_default_priority() {
1871        let local_tp = crate::TransportParams::default();
1872        let peer_tp = crate::TransportParams {
1873            initial_max_stream_data_bidi_local: 100,
1874            initial_max_stream_data_uni: 100,
1875            ..Default::default()
1876        };
1877
1878        let mut streams = StreamMap::new(100, 100, 100);
1879
1880        for id in [0, 4, 8, 12] {
1881            assert!(streams
1882                .get_or_create(id, &local_tp, &peer_tp, false, true)
1883                .is_ok());
1884        }
1885
1886        let walk_1: Vec<u64> = streams.writable().collect();
1887        cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1888        let walk_2: Vec<u64> = streams.writable().collect();
1889        cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1890        let walk_3: Vec<u64> = streams.writable().collect();
1891        cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1892        let walk_4: Vec<u64> = streams.writable().collect();
1893        cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1894        let walk_5: Vec<u64> = streams.writable().collect();
1895
1896        // All streams are non-incremental and same urgency by default. Multiple
1897        // visits shuffle their order.
1898        assert_eq!(walk_1, vec![0, 4, 8, 12]);
1899        assert_eq!(walk_2, vec![4, 8, 12, 0]);
1900        assert_eq!(walk_3, vec![8, 12, 0, 4]);
1901        assert_eq!(walk_4, vec![12, 0, 4, 8,]);
1902        assert_eq!(walk_5, vec![0, 4, 8, 12]);
1903    }
1904
1905    #[test]
1906    fn writable_prioritized_insert_order() {
1907        let local_tp = crate::TransportParams::default();
1908        let peer_tp = crate::TransportParams {
1909            initial_max_stream_data_bidi_local: 100,
1910            initial_max_stream_data_uni: 100,
1911            ..Default::default()
1912        };
1913
1914        let mut streams = StreamMap::new(100, 100, 100);
1915
1916        // Inserting same-urgency incremental streams in a "random" order yields
1917        // same order to start with.
1918        for id in [12, 4, 8, 0] {
1919            assert!(streams
1920                .get_or_create(id, &local_tp, &peer_tp, false, true)
1921                .is_ok());
1922        }
1923
1924        let walk_1: Vec<u64> = streams.writable().collect();
1925        cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1926        let walk_2: Vec<u64> = streams.writable().collect();
1927        cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1928        let walk_3: Vec<u64> = streams.writable().collect();
1929        cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1930        let walk_4: Vec<u64> = streams.writable().collect();
1931        cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1932        let walk_5: Vec<u64> = streams.writable().collect();
1933        assert_eq!(walk_1, vec![12, 4, 8, 0]);
1934        assert_eq!(walk_2, vec![4, 8, 0, 12]);
1935        assert_eq!(walk_3, vec![8, 0, 12, 4,]);
1936        assert_eq!(walk_4, vec![0, 12, 4, 8]);
1937        assert_eq!(walk_5, vec![12, 4, 8, 0]);
1938    }
1939
1940    #[test]
1941    fn writable_prioritized_mixed_urgency() {
1942        let local_tp = crate::TransportParams::default();
1943        let peer_tp = crate::TransportParams {
1944            initial_max_stream_data_bidi_local: 100,
1945            initial_max_stream_data_uni: 100,
1946            ..Default::default()
1947        };
1948
1949        let mut streams = <StreamMap>::new(100, 100, 100);
1950
1951        // Streams where the urgency descends (becomes more important). No stream
1952        // shares an urgency.
1953        let input = vec![
1954            (0, 100),
1955            (4, 90),
1956            (8, 80),
1957            (12, 70),
1958            (16, 60),
1959            (20, 50),
1960            (24, 40),
1961            (28, 30),
1962            (32, 20),
1963            (36, 10),
1964            (40, 0),
1965        ];
1966
1967        for (id, urgency) in input.clone() {
1968            // this duplicates some code from stream_priority in order to access
1969            // streams and the collection they're in
1970            let stream = streams
1971                .get_or_create(id, &local_tp, &peer_tp, false, true)
1972                .unwrap();
1973
1974            stream.urgency = urgency;
1975
1976            let new_priority_key = Arc::new(StreamPriorityKey {
1977                urgency: stream.urgency,
1978                incremental: stream.incremental,
1979                id,
1980                ..Default::default()
1981            });
1982
1983            let old_priority_key = std::mem::replace(
1984                &mut stream.priority_key,
1985                new_priority_key.clone(),
1986            );
1987
1988            streams.update_priority(&old_priority_key, &new_priority_key);
1989        }
1990
1991        let walk_1: Vec<u64> = streams.writable().collect();
1992        assert_eq!(walk_1, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
1993
1994        // Re-applying priority to a stream does not cause duplication.
1995        for (id, urgency) in input {
1996            // this duplicates some code from stream_priority in order to access
1997            // streams and the collection they're in
1998            let stream = streams
1999                .get_or_create(id, &local_tp, &peer_tp, false, true)
2000                .unwrap();
2001
2002            stream.urgency = urgency;
2003
2004            let new_priority_key = Arc::new(StreamPriorityKey {
2005                urgency: stream.urgency,
2006                incremental: stream.incremental,
2007                id,
2008                ..Default::default()
2009            });
2010
2011            let old_priority_key = std::mem::replace(
2012                &mut stream.priority_key,
2013                new_priority_key.clone(),
2014            );
2015
2016            streams.update_priority(&old_priority_key, &new_priority_key);
2017        }
2018
2019        let walk_2: Vec<u64> = streams.writable().collect();
2020        assert_eq!(walk_2, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
2021
2022        // Removing streams doesn't break expected ordering.
2023        streams.collect(24, true);
2024
2025        let walk_3: Vec<u64> = streams.writable().collect();
2026        assert_eq!(walk_3, vec![40, 36, 32, 28, 20, 16, 12, 8, 4, 0]);
2027
2028        streams.collect(40, true);
2029        streams.collect(0, true);
2030
2031        let walk_4: Vec<u64> = streams.writable().collect();
2032        assert_eq!(walk_4, vec![36, 32, 28, 20, 16, 12, 8, 4]);
2033
2034        // Adding streams doesn't break expected ordering.
2035        streams
2036            .get_or_create(44, &local_tp, &peer_tp, false, true)
2037            .unwrap();
2038
2039        let walk_5: Vec<u64> = streams.writable().collect();
2040        assert_eq!(walk_5, vec![36, 32, 28, 20, 16, 12, 8, 4, 44]);
2041    }
2042
2043    #[test]
2044    fn writable_prioritized_mixed_urgencies_incrementals() {
2045        let local_tp = crate::TransportParams::default();
2046        let peer_tp = crate::TransportParams {
2047            initial_max_stream_data_bidi_local: 100,
2048            initial_max_stream_data_uni: 100,
2049            ..Default::default()
2050        };
2051
2052        let mut streams = StreamMap::new(100, 100, 100);
2053
2054        // Streams that share some urgency level
2055        let input = vec![
2056            (0, 100),
2057            (4, 20),
2058            (8, 100),
2059            (12, 20),
2060            (16, 90),
2061            (20, 25),
2062            (24, 90),
2063            (28, 30),
2064            (32, 80),
2065            (36, 20),
2066            (40, 0),
2067        ];
2068
2069        for (id, urgency) in input.clone() {
2070            // this duplicates some code from stream_priority in order to access
2071            // streams and the collection they're in
2072            let stream = streams
2073                .get_or_create(id, &local_tp, &peer_tp, false, true)
2074                .unwrap();
2075
2076            stream.urgency = urgency;
2077
2078            let new_priority_key = Arc::new(StreamPriorityKey {
2079                urgency: stream.urgency,
2080                incremental: stream.incremental,
2081                id,
2082                ..Default::default()
2083            });
2084
2085            let old_priority_key = std::mem::replace(
2086                &mut stream.priority_key,
2087                new_priority_key.clone(),
2088            );
2089
2090            streams.update_priority(&old_priority_key, &new_priority_key);
2091        }
2092
2093        let walk_1: Vec<u64> = streams.writable().collect();
2094        cycle_stream_priority(4, &mut streams);
2095        cycle_stream_priority(16, &mut streams);
2096        cycle_stream_priority(0, &mut streams);
2097        let walk_2: Vec<u64> = streams.writable().collect();
2098        cycle_stream_priority(12, &mut streams);
2099        cycle_stream_priority(24, &mut streams);
2100        cycle_stream_priority(8, &mut streams);
2101        let walk_3: Vec<u64> = streams.writable().collect();
2102        cycle_stream_priority(36, &mut streams);
2103        cycle_stream_priority(16, &mut streams);
2104        cycle_stream_priority(0, &mut streams);
2105        let walk_4: Vec<u64> = streams.writable().collect();
2106        cycle_stream_priority(4, &mut streams);
2107        cycle_stream_priority(24, &mut streams);
2108        cycle_stream_priority(8, &mut streams);
2109        let walk_5: Vec<u64> = streams.writable().collect();
2110        cycle_stream_priority(12, &mut streams);
2111        cycle_stream_priority(16, &mut streams);
2112        cycle_stream_priority(0, &mut streams);
2113        let walk_6: Vec<u64> = streams.writable().collect();
2114        cycle_stream_priority(36, &mut streams);
2115        cycle_stream_priority(24, &mut streams);
2116        cycle_stream_priority(8, &mut streams);
2117        let walk_7: Vec<u64> = streams.writable().collect();
2118        cycle_stream_priority(4, &mut streams);
2119        cycle_stream_priority(16, &mut streams);
2120        cycle_stream_priority(0, &mut streams);
2121        let walk_8: Vec<u64> = streams.writable().collect();
2122        cycle_stream_priority(12, &mut streams);
2123        cycle_stream_priority(24, &mut streams);
2124        cycle_stream_priority(8, &mut streams);
2125        let walk_9: Vec<u64> = streams.writable().collect();
2126        cycle_stream_priority(36, &mut streams);
2127        cycle_stream_priority(16, &mut streams);
2128        cycle_stream_priority(0, &mut streams);
2129
2130        assert_eq!(walk_1, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2131        assert_eq!(walk_2, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2132        assert_eq!(walk_3, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2133        assert_eq!(walk_4, vec![40, 4, 12, 36, 20, 28, 32, 24, 16, 8, 0]);
2134        assert_eq!(walk_5, vec![40, 12, 36, 4, 20, 28, 32, 16, 24, 0, 8]);
2135        assert_eq!(walk_6, vec![40, 36, 4, 12, 20, 28, 32, 24, 16, 8, 0]);
2136        assert_eq!(walk_7, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2137        assert_eq!(walk_8, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2138        assert_eq!(walk_9, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2139
2140        // Removing streams doesn't break expected ordering.
2141        streams.collect(20, true);
2142
2143        let walk_10: Vec<u64> = streams.writable().collect();
2144        assert_eq!(walk_10, vec![40, 4, 12, 36, 28, 32, 24, 16, 8, 0]);
2145
2146        // Adding streams doesn't break expected ordering.
2147        let stream = streams
2148            .get_or_create(44, &local_tp, &peer_tp, false, true)
2149            .unwrap();
2150
2151        stream.urgency = 20;
2152        stream.incremental = true;
2153
2154        let new_priority_key = Arc::new(StreamPriorityKey {
2155            urgency: stream.urgency,
2156            incremental: stream.incremental,
2157            id: 44,
2158            ..Default::default()
2159        });
2160
2161        let old_priority_key =
2162            std::mem::replace(&mut stream.priority_key, new_priority_key.clone());
2163
2164        streams.update_priority(&old_priority_key, &new_priority_key);
2165
2166        let walk_11: Vec<u64> = streams.writable().collect();
2167        assert_eq!(walk_11, vec![40, 4, 12, 36, 44, 28, 32, 24, 16, 8, 0]);
2168    }
2169
2170    #[test]
2171    fn priority_tree_dupes() {
2172        let mut prioritized_writable: RBTree<StreamWritablePriorityAdapter> =
2173            Default::default();
2174
2175        for id in [0, 4, 8, 12] {
2176            let s = Arc::new(StreamPriorityKey {
2177                urgency: 0,
2178                incremental: false,
2179                id,
2180                ..Default::default()
2181            });
2182
2183            prioritized_writable.insert(s);
2184        }
2185
2186        let walk_1: Vec<u64> =
2187            prioritized_writable.iter().map(|s| s.id).collect();
2188        assert_eq!(walk_1, vec![0, 4, 8, 12]);
2189
2190        // Default keys could cause duplicate entries, this is normally protected
2191        // against via StreamMap.
2192        for id in [0, 4, 8, 12] {
2193            let s = Arc::new(StreamPriorityKey {
2194                urgency: 0,
2195                incremental: false,
2196                id,
2197                ..Default::default()
2198            });
2199
2200            prioritized_writable.insert(s);
2201        }
2202
2203        let walk_2: Vec<u64> =
2204            prioritized_writable.iter().map(|s| s.id).collect();
2205        assert_eq!(walk_2, vec![0, 0, 4, 4, 8, 8, 12, 12]);
2206    }
2207}
2208
2209mod recv_buf;
2210mod send_buf;