Skip to main content

quiche/stream/
mod.rs

1// Copyright (C) 2018-2019, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::cmp;
28
29use std::sync::Arc;
30
31use std::collections::hash_map;
32use std::collections::HashMap;
33use std::collections::HashSet;
34
35use intrusive_collections::intrusive_adapter;
36use intrusive_collections::KeyAdapter;
37use intrusive_collections::RBTree;
38use intrusive_collections::RBTreeAtomicLink;
39
40use smallvec::SmallVec;
41
42use crate::buffers::DefaultBufFactory;
43use crate::BufFactory;
44use crate::Error;
45use crate::Result;
46
/// The urgency given to a stream priority key by default (lower values are
/// treated as more urgent).
const DEFAULT_URGENCY: u8 = 127;

/// The default size of the receiver stream flow control window.
const DEFAULT_STREAM_WINDOW: u64 = 32 * 1024;

/// The maximum size of the receiver stream flow control window.
pub const MAX_STREAM_WINDOW: u64 = 16 * 1024 * 1024;
54
/// A simple no-op hasher for Stream IDs.
///
/// The QUIC protocol and the quiche library guarantee stream ID uniqueness,
/// so we can save effort by avoiding a more complicated algorithm: the
/// resulting hash is simply the `u64` that was written last.
#[derive(Default)]
pub struct StreamIdHasher {
    /// The most recently written `u64`, returned verbatim by `finish()`.
    id: u64,
}
63
/// The outcome of a `RecvBuf::reset()` call.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct RecvBufResetReturn {
    /// Difference between the final size reported by the reset and the
    /// previous max_data offset received.
    pub max_data_delta: u64,

    /// Amount of flow control credit that should be handed back to the
    /// connection-level flow control.
    pub consumed_flowcontrol: u64,
}

impl RecvBufResetReturn {
    /// Builds a value in which both counters are zero.
    pub fn zero() -> Self {
        RecvBufResetReturn {
            consumed_flowcontrol: 0,
            max_data_delta: 0,
        }
    }
}
84
/// Action to perform when reading from a stream's receive buffer.
pub enum RecvAction<'a> {
    /// Emit data by copying it into the provided buffer `out`.
    Emit { out: &'a mut [u8] },
    /// Discard up to `len` bytes without copying them.
    Discard { len: usize },
}
92
/// `Hasher` implementation that passes a `u64` through unchanged.
impl std::hash::Hasher for StreamIdHasher {
    /// Returns the last value given to `write_u64()`, or zero if a
    /// default-constructed hasher was never written to.
    #[inline]
    fn finish(&self) -> u64 {
        self.id
    }

    /// Stores `id` as the hash value, replacing any previous one.
    #[inline]
    fn write_u64(&mut self, id: u64) {
        self.id = id;
    }

    /// Unsupported: always panics.
    #[inline]
    fn write(&mut self, _: &[u8]) {
        // The trait requires a write() method, but stream IDs will always
        // be a u64 and are therefore hashed through write_u64(). Nothing is
        // delegated here: reaching this method means a non-u64 value was
        // hashed, which is treated as a bug.
        unimplemented!()
    }
}
111
/// `BuildHasher` that creates default-initialized `StreamIdHasher`s.
type BuildStreamIdHasher = std::hash::BuildHasherDefault<StreamIdHasher>;

/// A `HashMap` keyed by stream ID that hashes with `StreamIdHasher`.
pub type StreamIdHashMap<V> = HashMap<u64, V, BuildStreamIdHasher>;
/// A `HashSet` of stream IDs that hashes with `StreamIdHasher`.
pub type StreamIdHashSet = HashSet<u64, BuildStreamIdHasher>;
116
/// Keeps track of QUIC streams and enforces stream limits.
#[derive(Default)]
pub struct StreamMap<F: BufFactory = DefaultBufFactory> {
    /// Map of streams indexed by stream ID.
    streams: StreamIdHashMap<Stream<F>>,

    /// Set of streams that were completed and garbage collected.
    ///
    /// Instead of keeping the full stream state forever, we collect completed
    /// streams to save memory, but we still need to keep track of previously
    /// created streams, to prevent peers from re-creating them.
    collected: StreamIdHashSet,

    /// Peer's maximum bidirectional stream count limit. This caps the
    /// bidirectional streams opened by the local endpoint.
    peer_max_streams_bidi: u64,

    /// Peer's maximum unidirectional stream count limit. This caps the
    /// unidirectional streams opened by the local endpoint.
    peer_max_streams_uni: u64,

    /// The total number of bidirectional streams opened by the peer.
    peer_opened_streams_bidi: u64,

    /// The total number of unidirectional streams opened by the peer.
    peer_opened_streams_uni: u64,

    /// Local maximum bidirectional stream count limit. This caps the
    /// bidirectional streams opened by the peer.
    local_max_streams_bidi: u64,

    /// Pending value for `local_max_streams_bidi`. It grows by one whenever
    /// a peer-created bidirectional stream is collected, and replaces the
    /// current limit when `update_max_streams_bidi()` is called.
    local_max_streams_bidi_next: u64,

    /// Initial maximum bidirectional stream count. Used as the reference for
    /// the threshold in `should_update_max_streams_bidi()`.
    initial_max_streams_bidi: u64,

    /// Local maximum unidirectional stream count limit. This caps the
    /// unidirectional streams opened by the peer.
    local_max_streams_uni: u64,

    /// Pending value for `local_max_streams_uni`. It grows by one whenever
    /// a peer-created unidirectional stream is collected, and replaces the
    /// current limit when `update_max_streams_uni()` is called.
    local_max_streams_uni_next: u64,

    /// Initial maximum unidirectional stream count. Used as the reference for
    /// the threshold in `should_update_max_streams_uni()`.
    initial_max_streams_uni: u64,

    /// The total number of bidirectional streams opened by the local endpoint.
    local_opened_streams_bidi: u64,

    /// The total number of unidirectional streams opened by the local endpoint.
    local_opened_streams_uni: u64,

    /// Priority-ordered tree of the keys of streams that have buffered data
    /// ready to be sent to the peer. This also implies that the stream has
    /// enough flow control credits to send at least some of that data.
    flushable: RBTree<StreamFlushablePriorityAdapter>,

    /// Priority-ordered tree of the keys of streams that have outstanding
    /// data to read. This is used to generate a `StreamIter` of streams
    /// without having to iterate over the full list of streams.
    pub readable: RBTree<StreamReadablePriorityAdapter>,

    /// Priority-ordered tree of the keys of streams that have enough flow
    /// control capacity to be written to, and are not finished. This is used
    /// to generate a `StreamIter` of streams without having to iterate over
    /// the full list of streams.
    pub writable: RBTree<StreamWritablePriorityAdapter>,

    /// Set of stream IDs corresponding to streams that are almost out of flow
    /// control credit and need to send MAX_STREAM_DATA. This is used to
    /// generate a `StreamIter` of streams without having to iterate over the
    /// full list of streams.
    almost_full: StreamIdHashSet,

    /// Set of stream IDs corresponding to streams that are blocked. The value
    /// of the map elements represents the offset of the stream at which the
    /// blocking occurred.
    blocked: StreamIdHashMap<u64>,

    /// Set of stream IDs corresponding to streams that are reset. The value
    /// of the map elements is a tuple of the error code and final size values
    /// to include in the RESET_STREAM frame.
    reset: StreamIdHashMap<(u64, u64)>,

    /// Set of stream IDs corresponding to streams that are shutdown on the
    /// receive side, and need to send a STOP_SENDING frame. The value of the
    /// map elements is the error code to include in the STOP_SENDING frame.
    stopped: StreamIdHashMap<u64>,

    /// The maximum size of a stream window. Passed to every stream created
    /// by `get_or_create()`.
    max_stream_window: u64,
}
202
203impl<F: BufFactory> StreamMap<F> {
204    pub fn new(
205        max_streams_bidi: u64, max_streams_uni: u64, max_stream_window: u64,
206    ) -> Self {
207        StreamMap {
208            local_max_streams_bidi: max_streams_bidi,
209            local_max_streams_bidi_next: max_streams_bidi,
210            initial_max_streams_bidi: max_streams_bidi,
211
212            local_max_streams_uni: max_streams_uni,
213            local_max_streams_uni_next: max_streams_uni,
214            initial_max_streams_uni: max_streams_uni,
215
216            max_stream_window,
217
218            ..StreamMap::default()
219        }
220    }
221
    /// Returns a shared reference to the stream with the given ID, or `None`
    /// if no such stream is currently stored in the map.
    pub fn get(&self, id: u64) -> Option<&Stream<F>> {
        self.streams.get(&id)
    }
226
    /// Returns a mutable reference to the stream with the given ID, or `None`
    /// if no such stream is currently stored in the map.
    ///
    /// Unlike `get_or_create()`, this never creates a stream.
    pub fn get_mut(&mut self, id: u64) -> Option<&mut Stream<F>> {
        self.streams.get_mut(&id)
    }
231
232    /// Returns the mutable stream with the given ID if it exists, or creates
233    /// a new one otherwise.
234    ///
235    /// The `local` parameter indicates whether the stream's creation was
236    /// requested by the local application rather than the peer, and is
237    /// used to validate the requested stream ID, and to select the initial
238    /// flow control values from the local and remote transport parameters
239    /// (also passed as arguments).
240    ///
241    /// This also takes care of enforcing both local and the peer's stream
242    /// count limits. If one of these limits is violated, the `StreamLimit`
243    /// error is returned.
244    pub(crate) fn get_or_create(
245        &mut self, id: u64, local_params: &crate::TransportParams,
246        peer_params: &crate::TransportParams, local: bool, is_server: bool,
247    ) -> Result<&mut Stream<F>> {
248        let (stream, is_new_and_writable) = match self.streams.entry(id) {
249            hash_map::Entry::Vacant(v) => {
250                // Stream has already been closed and garbage collected.
251                if self.collected.contains(&id) {
252                    return Err(Error::Done);
253                }
254
255                if local != is_local(id, is_server) {
256                    return Err(Error::InvalidStreamState(id));
257                }
258
259                let (max_rx_data, max_tx_data) = match (local, is_bidi(id)) {
260                    // Locally-initiated bidirectional stream.
261                    (true, true) => (
262                        local_params.initial_max_stream_data_bidi_local,
263                        peer_params.initial_max_stream_data_bidi_remote,
264                    ),
265
266                    // Locally-initiated unidirectional stream.
267                    (true, false) => (0, peer_params.initial_max_stream_data_uni),
268
269                    // Remotely-initiated bidirectional stream.
270                    (false, true) => (
271                        local_params.initial_max_stream_data_bidi_remote,
272                        peer_params.initial_max_stream_data_bidi_local,
273                    ),
274
275                    // Remotely-initiated unidirectional stream.
276                    (false, false) =>
277                        (local_params.initial_max_stream_data_uni, 0),
278                };
279
280                // The two least significant bits from a stream id identify the
281                // type of stream. Truncate those bits to get the sequence for
282                // that stream type.
283                let stream_sequence = id >> 2;
284
285                // Enforce stream count limits.
286                match (is_local(id, is_server), is_bidi(id)) {
287                    (true, true) => {
288                        let n = cmp::max(
289                            self.local_opened_streams_bidi,
290                            stream_sequence + 1,
291                        );
292
293                        if n > self.peer_max_streams_bidi {
294                            return Err(Error::StreamLimit);
295                        }
296
297                        self.local_opened_streams_bidi = n;
298                    },
299
300                    (true, false) => {
301                        let n = cmp::max(
302                            self.local_opened_streams_uni,
303                            stream_sequence + 1,
304                        );
305
306                        if n > self.peer_max_streams_uni {
307                            return Err(Error::StreamLimit);
308                        }
309
310                        self.local_opened_streams_uni = n;
311                    },
312
313                    (false, true) => {
314                        let n = cmp::max(
315                            self.peer_opened_streams_bidi,
316                            stream_sequence + 1,
317                        );
318
319                        if n > self.local_max_streams_bidi {
320                            return Err(Error::StreamLimit);
321                        }
322
323                        self.peer_opened_streams_bidi = n;
324                    },
325
326                    (false, false) => {
327                        let n = cmp::max(
328                            self.peer_opened_streams_uni,
329                            stream_sequence + 1,
330                        );
331
332                        if n > self.local_max_streams_uni {
333                            return Err(Error::StreamLimit);
334                        }
335
336                        self.peer_opened_streams_uni = n;
337                    },
338                };
339
340                let s = Stream::new(
341                    id,
342                    max_rx_data,
343                    max_tx_data,
344                    is_bidi(id),
345                    local,
346                    self.max_stream_window,
347                );
348
349                let is_writable = s.is_writable();
350
351                (v.insert(s), is_writable)
352            },
353
354            hash_map::Entry::Occupied(v) => (v.into_mut(), false),
355        };
356
357        // Newly created stream might already be writable due to initial flow
358        // control limits.
359        if is_new_and_writable {
360            self.writable.insert(Arc::clone(&stream.priority_key));
361        }
362
363        Ok(stream)
364    }
365
366    /// Adds the stream ID to the readable streams set.
367    ///
368    /// If the stream was already in the list, this does nothing.
369    pub fn insert_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
370        if !priority_key.readable.is_linked() {
371            self.readable.insert(Arc::clone(priority_key));
372        }
373    }
374
375    /// Removes the stream ID from the readable streams set.
376    pub fn remove_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
377        if !priority_key.readable.is_linked() {
378            return;
379        }
380
381        let mut c = {
382            let ptr = Arc::as_ptr(priority_key);
383            unsafe { self.readable.cursor_mut_from_ptr(ptr) }
384        };
385
386        c.remove();
387    }
388
389    /// Adds the stream ID to the writable streams set.
390    ///
391    /// This should also be called anytime a new stream is created, in addition
392    /// to when an existing stream becomes writable.
393    ///
394    /// If the stream was already in the list, this does nothing.
395    pub fn insert_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
396        if !priority_key.writable.is_linked() {
397            self.writable.insert(Arc::clone(priority_key));
398        }
399    }
400
401    /// Removes the stream ID from the writable streams set.
402    ///
403    /// This should also be called anytime an existing stream stops being
404    /// writable.
405    pub fn remove_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
406        if !priority_key.writable.is_linked() {
407            return;
408        }
409
410        let mut c = {
411            let ptr = Arc::as_ptr(priority_key);
412            unsafe { self.writable.cursor_mut_from_ptr(ptr) }
413        };
414
415        c.remove();
416    }
417
418    /// Adds the stream ID to the flushable streams set.
419    ///
420    /// If the stream was already in the list, this does nothing.
421    pub fn insert_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
422        if !priority_key.flushable.is_linked() {
423            self.flushable.insert(Arc::clone(priority_key));
424        }
425    }
426
427    /// Removes the stream ID from the flushable streams set.
428    pub fn remove_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
429        if !priority_key.flushable.is_linked() {
430            return;
431        }
432
433        let mut c = {
434            let ptr = Arc::as_ptr(priority_key);
435            unsafe { self.flushable.cursor_mut_from_ptr(ptr) }
436        };
437
438        c.remove();
439    }
440
    /// Returns the priority key at the front of the flushable streams set
    /// without removing it, or `None` if the set is empty.
    ///
    /// The returned `Arc` is a new handle to the same key.
    pub fn peek_flushable(&self) -> Option<Arc<StreamPriorityKey>> {
        self.flushable.front().clone_pointer()
    }
444
445    /// Updates the priorities of a stream.
446    pub fn update_priority(
447        &mut self, old: &Arc<StreamPriorityKey>, new: &Arc<StreamPriorityKey>,
448    ) {
449        if old.readable.is_linked() {
450            self.remove_readable(old);
451            self.readable.insert(Arc::clone(new));
452        }
453
454        if old.writable.is_linked() {
455            self.remove_writable(old);
456            self.writable.insert(Arc::clone(new));
457        }
458
459        if old.flushable.is_linked() {
460            self.remove_flushable(old);
461            self.flushable.insert(Arc::clone(new));
462        }
463    }
464
    /// Adds the stream ID to the almost full streams set, i.e. the streams
    /// that need to send MAX_STREAM_DATA.
    ///
    /// If the stream was already in the set, this does nothing.
    pub fn insert_almost_full(&mut self, stream_id: u64) {
        self.almost_full.insert(stream_id);
    }
471
    /// Removes the stream ID from the almost full streams set.
    ///
    /// If the stream was not in the set, this does nothing.
    pub fn remove_almost_full(&mut self, stream_id: u64) {
        self.almost_full.remove(&stream_id);
    }
476
    /// Adds the stream ID to the blocked streams set with the
    /// given offset value.
    ///
    /// If the stream was already in the set, its recorded offset is
    /// replaced by `off`.
    pub fn insert_blocked(&mut self, stream_id: u64, off: u64) {
        self.blocked.insert(stream_id, off);
    }
484
    /// Removes the stream ID, and its recorded offset, from the blocked
    /// streams set.
    ///
    /// If the stream was not in the set, this does nothing.
    pub fn remove_blocked(&mut self, stream_id: u64) {
        self.blocked.remove(&stream_id);
    }
489
    /// Adds the stream ID to the reset streams set with the
    /// given error code and final size values.
    ///
    /// If the stream was already in the set, its recorded error code and
    /// final size are replaced by the new values.
    pub fn insert_reset(
        &mut self, stream_id: u64, error_code: u64, final_size: u64,
    ) {
        self.reset.insert(stream_id, (error_code, final_size));
    }
499
    /// Removes the stream ID, and its recorded error code and final size,
    /// from the reset streams set.
    ///
    /// If the stream was not in the set, this does nothing.
    pub fn remove_reset(&mut self, stream_id: u64) {
        self.reset.remove(&stream_id);
    }
504
    /// Adds the stream ID to the stopped streams set with the
    /// given error code.
    ///
    /// If the stream was already in the set, its recorded error code is
    /// replaced by `error_code`.
    pub fn insert_stopped(&mut self, stream_id: u64, error_code: u64) {
        self.stopped.insert(stream_id, error_code);
    }
512
    /// Removes the stream ID, and its recorded error code, from the stopped
    /// streams set.
    ///
    /// If the stream was not in the set, this does nothing.
    pub fn remove_stopped(&mut self, stream_id: u64) {
        self.stopped.remove(&stream_id);
    }
517
    /// Updates the peer's maximum bidirectional stream count limit.
    ///
    /// The limit never decreases: a value of `v` lower than the current
    /// limit is ignored.
    pub fn update_peer_max_streams_bidi(&mut self, v: u64) {
        self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v);
    }
522
    /// Updates the peer's maximum unidirectional stream count limit.
    ///
    /// The limit never decreases: a value of `v` lower than the current
    /// limit is ignored.
    pub fn update_peer_max_streams_uni(&mut self, v: u64) {
        self.peer_max_streams_uni = cmp::max(self.peer_max_streams_uni, v);
    }
527
    /// Commits the new max_streams_bidi limit, i.e. makes the pending
    /// ("next") value the current local limit.
    pub fn update_max_streams_bidi(&mut self) {
        self.local_max_streams_bidi = self.local_max_streams_bidi_next;
    }
532
    /// Sets the max_streams_bidi limit to the given value.
    ///
    /// The current limit, the pending ("next") limit and the initial limit
    /// are all overwritten with `max`.
    pub fn set_max_streams_bidi(&mut self, max: u64) {
        self.local_max_streams_bidi = max;
        self.local_max_streams_bidi_next = max;
        self.initial_max_streams_bidi = max;
    }
539
    /// Returns the current (committed) local max_streams_bidi limit.
    pub fn max_streams_bidi(&self) -> u64 {
        self.local_max_streams_bidi
    }
544
545    /// Returns the new max_streams_bidi limit.
546    pub fn max_streams_bidi_next(&mut self) -> u64 {
547        self.local_max_streams_bidi_next
548    }
549
    /// Commits the new max_streams_uni limit, i.e. makes the pending
    /// ("next") value the current local limit.
    pub fn update_max_streams_uni(&mut self) {
        self.local_max_streams_uni = self.local_max_streams_uni_next;
    }
554
555    /// Returns the new max_streams_uni limit.
556    pub fn max_streams_uni_next(&mut self) -> u64 {
557        self.local_max_streams_uni_next
558    }
559
    /// Returns the peer's current maximum bidirectional stream count limit,
    /// which caps the bidirectional streams the local endpoint may open.
    pub fn peer_max_streams_bidi(&self) -> u64 {
        self.peer_max_streams_bidi
    }
564
    /// Returns the number of bidirectional streams that can be created
    /// before the peer's stream count limit is reached.
    ///
    /// The subtraction relies on the locally opened count never exceeding
    /// the peer's limit, which `get_or_create()` enforces before updating
    /// the count.
    pub fn peer_streams_left_bidi(&self) -> u64 {
        self.peer_max_streams_bidi - self.local_opened_streams_bidi
    }
570
    /// Returns the peer's current maximum unidirectional stream count limit,
    /// which caps the unidirectional streams the local endpoint may open.
    pub fn peer_max_streams_uni(&self) -> u64 {
        self.peer_max_streams_uni
    }
575
    /// Returns the number of unidirectional streams that can be created
    /// before the peer's stream count limit is reached.
    ///
    /// The subtraction relies on the locally opened count never exceeding
    /// the peer's limit, which `get_or_create()` enforces before updating
    /// the count.
    pub fn peer_streams_left_uni(&self) -> u64 {
        self.peer_max_streams_uni - self.local_opened_streams_uni
    }
581
582    /// Drops completed stream.
583    ///
584    /// This should only be called when Stream::is_complete() returns true for
585    /// the given stream.
586    pub fn collect(&mut self, stream_id: u64, local: bool) {
587        if !local {
588            // If the stream was created by the peer, give back a max streams
589            // credit.
590            if is_bidi(stream_id) {
591                self.local_max_streams_bidi_next =
592                    self.local_max_streams_bidi_next.saturating_add(1);
593            } else {
594                self.local_max_streams_uni_next =
595                    self.local_max_streams_uni_next.saturating_add(1);
596            }
597        }
598
599        let s = self.streams.remove(&stream_id).unwrap();
600
601        self.remove_readable(&s.priority_key);
602
603        self.remove_writable(&s.priority_key);
604
605        self.remove_flushable(&s.priority_key);
606
607        self.collected.insert(stream_id);
608    }
609
610    /// Creates an iterator over streams that have outstanding data to read.
611    pub fn readable(&self) -> StreamIter {
612        StreamIter {
613            streams: self.readable.iter().map(|s| s.id).collect(),
614            index: 0,
615        }
616    }
617
618    /// Creates an iterator over streams that can be written to.
619    pub fn writable(&self) -> StreamIter {
620        StreamIter {
621            streams: self.writable.iter().map(|s| s.id).collect(),
622            index: 0,
623        }
624    }
625
    /// Creates an iterator over streams that need to send MAX_STREAM_DATA,
    /// built from the almost full streams set.
    pub fn almost_full(&self) -> StreamIter {
        StreamIter::from(&self.almost_full)
    }
630
    /// Creates an iterator over streams that need to send STREAM_DATA_BLOCKED.
    ///
    /// Each item pairs a stream ID with the offset at which the stream was
    /// recorded as blocked.
    pub fn blocked(&self) -> hash_map::Iter<'_, u64, u64> {
        self.blocked.iter()
    }
635
    /// Creates an iterator over streams that need to send RESET_STREAM.
    ///
    /// Each item pairs a stream ID with its recorded
    /// `(error_code, final_size)` tuple.
    pub fn reset(&self) -> hash_map::Iter<'_, u64, (u64, u64)> {
        self.reset.iter()
    }
640
    /// Creates an iterator over streams that need to send STOP_SENDING.
    ///
    /// Each item pairs a stream ID with its recorded error code.
    pub fn stopped(&self) -> hash_map::Iter<'_, u64, u64> {
        self.stopped.iter()
    }
645
    /// Returns true if the stream has been collected, i.e. it was dropped
    /// through `collect()` and its ID must not be used again.
    pub fn is_collected(&self, stream_id: u64) -> bool {
        self.collected.contains(&stream_id)
    }
650
    /// Returns true if there are any streams that have data to write, i.e.
    /// the flushable streams set is not empty.
    pub fn has_flushable(&self) -> bool {
        !self.flushable.is_empty()
    }
655
    /// Returns true if there are any streams that have data to read, i.e.
    /// the readable streams set is not empty.
    pub fn has_readable(&self) -> bool {
        !self.readable.is_empty()
    }
660
    /// Returns true if there are any streams that need to update the local
    /// flow control limit, i.e. the almost full streams set is not empty.
    pub fn has_almost_full(&self) -> bool {
        !self.almost_full.is_empty()
    }
666
    /// Returns true if there are any streams that are blocked, i.e. the
    /// blocked streams set is not empty.
    pub fn has_blocked(&self) -> bool {
        !self.blocked.is_empty()
    }
671
    /// Returns true if there are any streams that are reset, i.e. the reset
    /// streams set is not empty.
    pub fn has_reset(&self) -> bool {
        !self.reset.is_empty()
    }
676
    /// Returns true if there are any streams that need to send STOP_SENDING,
    /// i.e. the stopped streams set is not empty.
    pub fn has_stopped(&self) -> bool {
        !self.stopped.is_empty()
    }
681
682    /// Returns true if the max bidirectional streams count needs to be updated
683    /// by sending a MAX_STREAMS frame to the peer.
684    ///
685    /// This only sends MAX_STREAMS when available capacity is at or below 50%
686    /// of the initial maximum streams target.
687    pub fn should_update_max_streams_bidi(&self) -> bool {
688        let available = self
689            .local_max_streams_bidi
690            .saturating_sub(self.peer_opened_streams_bidi);
691        self.local_max_streams_bidi_next != self.local_max_streams_bidi &&
692            available <= self.initial_max_streams_bidi / 2
693    }
694
695    /// Returns true if the max unidirectional streams count needs to be updated
696    /// by sending a MAX_STREAMS frame to the peer.
697    ///
698    /// This only send MAX_STREAMS when available capacity is at or below 50% of
699    /// the initial maximum streams target.
700    pub fn should_update_max_streams_uni(&self) -> bool {
701        let available = self
702            .local_max_streams_uni
703            .saturating_sub(self.peer_opened_streams_uni);
704        self.local_max_streams_uni_next != self.local_max_streams_uni &&
705            available <= self.initial_max_streams_uni / 2
706    }
707
    /// Returns the number of active streams in the map. Collected streams
    /// are not counted, as they are no longer stored.
    ///
    /// Only available in test builds.
    #[cfg(test)]
    pub fn len(&self) -> usize {
        self.streams.len()
    }
713}
714
/// A QUIC stream.
pub struct Stream<F: BufFactory = DefaultBufFactory> {
    /// Receive-side stream buffer.
    pub recv: recv_buf::RecvBuf,

    /// Send-side stream buffer.
    pub send: send_buf::SendBuf<F>,

    /// Low watermark consulted by `is_writable()`: the stream only counts as
    /// writable while `send.off_back() + send_lowat` is below
    /// `send.max_off()`. Initialized to 1.
    pub send_lowat: usize,

    /// Whether the stream is bidirectional.
    pub bidi: bool,

    /// Whether the stream was created by the local endpoint.
    pub local: bool,

    /// The stream's urgency (lower is better). Default is `DEFAULT_URGENCY`.
    pub urgency: u8,

    /// Whether the stream can be flushed incrementally. Default is `true`.
    pub incremental: bool,

    /// The key under which `StreamMap` links this stream into its readable,
    /// writable and flushable sets.
    pub priority_key: Arc<StreamPriorityKey>,
}
739
740impl<F: BufFactory> Stream<F> {
741    /// Creates a new stream with the given flow control limits.
742    pub fn new(
743        id: u64, max_rx_data: u64, max_tx_data: u64, bidi: bool, local: bool,
744        max_window: u64,
745    ) -> Self {
746        let priority_key = Arc::new(StreamPriorityKey {
747            id,
748            ..Default::default()
749        });
750
751        Stream {
752            recv: recv_buf::RecvBuf::new(max_rx_data, max_window),
753            send: send_buf::SendBuf::new(max_tx_data),
754            send_lowat: 1,
755            bidi,
756            local,
757            urgency: priority_key.urgency,
758            incremental: priority_key.incremental,
759            priority_key,
760        }
761    }
762
    /// Returns true if the stream has data to read, as reported by the
    /// receive buffer's `ready()`.
    pub fn is_readable(&self) -> bool {
        self.recv.ready()
    }
767
768    /// Returns true if the stream has enough flow control capacity to be
769    /// written to, and is not finished.
770    pub fn is_writable(&self) -> bool {
771        !self.send.is_shutdown() &&
772            !self.send.is_fin() &&
773            (self.send.off_back() + self.send_lowat as u64) <
774                self.send.max_off()
775    }
776
777    /// Returns true if the stream has data to send and is allowed to send at
778    /// least some of it.
779    pub fn is_flushable(&self) -> bool {
780        let off_front = self.send.off_front();
781
782        !self.send.is_empty() &&
783            off_front < self.send.off_back() &&
784            off_front < self.send.max_off()
785    }
786
787    /// Returns true if the stream is complete.
788    ///
789    /// For bidirectional streams this happens when both the receive and send
790    /// sides are complete. That is when all incoming data has been read by the
791    /// application, and when all outgoing data has been acked by the peer.
792    ///
793    /// For unidirectional streams this happens when either the receive or send
794    /// side is complete, depending on whether the stream was created locally
795    /// or not.
796    pub fn is_complete(&self) -> bool {
797        match (self.bidi, self.local) {
798            // For bidirectional streams we need to check both receive and send
799            // sides for completion.
800            (true, _) => self.recv.is_fin() && self.send.is_complete(),
801
802            // For unidirectional streams generated locally, we only need to
803            // check the send side for completion.
804            (false, true) => self.send.is_complete(),
805
806            // For unidirectional streams generated by the peer, we only need
807            // to check the receive side for completion.
808            (false, false) => self.recv.is_fin(),
809        }
810    }
811}
812
/// Returns true if the stream was created locally.
///
/// The least significant bit of a stream ID tells which endpoint initiated
/// the stream: it is set for server-initiated streams and clear for
/// client-initiated ones. The stream is local when that matches the role
/// given by `is_server`.
pub fn is_local(stream_id: u64, is_server: bool) -> bool {
    let server_initiated = stream_id & 0x1 == 1;

    server_initiated == is_server
}
817
/// Returns true if the stream is bidirectional.
///
/// The second least significant bit of a stream ID is clear for
/// bidirectional streams and set for unidirectional ones.
pub fn is_bidi(stream_id: u64) -> bool {
    let unidirectional = (stream_id >> 1) & 0x1 == 1;

    !unidirectional
}
822
/// Sort key under which a stream is stored in the priority-ordered readable,
/// writable and flushable sets.
///
/// Each set uses its own link field, so the same key can be a member of
/// several sets at once.
#[derive(Clone, Debug)]
pub struct StreamPriorityKey {
    /// The stream's urgency (lower sorts first).
    pub urgency: u8,
    /// Whether the stream is incremental. At equal urgency, non-incremental
    /// keys sort before incremental ones.
    pub incremental: bool,
    /// The ID of the stream this key belongs to. Two keys with the same ID
    /// compare as equal.
    pub id: u64,

    /// Link used when the key is stored in the readable set.
    pub readable: RBTreeAtomicLink,
    /// Link used when the key is stored in the writable set.
    pub writable: RBTreeAtomicLink,
    /// Link used when the key is stored in the flushable set.
    pub flushable: RBTreeAtomicLink,
}
833
834impl Default for StreamPriorityKey {
835    fn default() -> Self {
836        Self {
837            urgency: DEFAULT_URGENCY,
838            incremental: true,
839            id: Default::default(),
840            readable: Default::default(),
841            writable: Default::default(),
842            flushable: Default::default(),
843        }
844    }
845}
846
847impl PartialEq for StreamPriorityKey {
848    fn eq(&self, other: &Self) -> bool {
849        self.id == other.id
850    }
851}
852
853impl Eq for StreamPriorityKey {}
854
855impl PartialOrd for StreamPriorityKey {
856    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
857        Some(self.cmp(other))
858    }
859}
860
861impl Ord for StreamPriorityKey {
862    fn cmp(&self, other: &Self) -> cmp::Ordering {
863        // Ignore priority if ID matches.
864        if self.id == other.id {
865            return cmp::Ordering::Equal;
866        }
867
868        // First, order by urgency...
869        if self.urgency != other.urgency {
870            return self.urgency.cmp(&other.urgency);
871        }
872
873        // ...when the urgency is the same, and both are not incremental, order
874        // by stream ID...
875        if !self.incremental && !other.incremental {
876            return self.id.cmp(&other.id);
877        }
878
879        // ...non-incremental takes priority over incremental...
880        if self.incremental && !other.incremental {
881            return cmp::Ordering::Greater;
882        }
883        if !self.incremental && other.incremental {
884            return cmp::Ordering::Less;
885        }
886
887        // ...finally, when both are incremental, `other` takes precedence (so
888        // `self` is always sorted after other same-urgency incremental
889        // entries).
890        cmp::Ordering::Greater
891    }
892}
893
894intrusive_adapter!(pub StreamWritablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { writable: RBTreeAtomicLink });
895
896impl KeyAdapter<'_> for StreamWritablePriorityAdapter {
897    type Key = StreamPriorityKey;
898
899    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
900        s.clone()
901    }
902}
903
904intrusive_adapter!(pub StreamReadablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { readable: RBTreeAtomicLink });
905
906impl KeyAdapter<'_> for StreamReadablePriorityAdapter {
907    type Key = StreamPriorityKey;
908
909    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
910        s.clone()
911    }
912}
913
914intrusive_adapter!(pub StreamFlushablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { flushable: RBTreeAtomicLink });
915
916impl KeyAdapter<'_> for StreamFlushablePriorityAdapter {
917    type Key = StreamPriorityKey;
918
919    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
920        s.clone()
921    }
922}
923
924/// An iterator over QUIC streams.
925#[derive(Default)]
926pub struct StreamIter {
927    streams: SmallVec<[u64; 8]>,
928    index: usize,
929}
930
931impl StreamIter {
932    #[inline]
933    fn from(streams: &StreamIdHashSet) -> Self {
934        StreamIter {
935            streams: streams.iter().copied().collect(),
936            index: 0,
937        }
938    }
939}
940
941impl Iterator for StreamIter {
942    type Item = u64;
943
944    #[inline]
945    fn next(&mut self) -> Option<Self::Item> {
946        let v = self.streams.get(self.index)?;
947        self.index += 1;
948        Some(*v)
949    }
950}
951
952impl ExactSizeIterator for StreamIter {
953    #[inline]
954    fn len(&self) -> usize {
955        self.streams.len() - self.index
956    }
957}
958
959#[cfg(test)]
960mod tests {
961    use crate::range_buf::RangeBuf;
962
963    use super::*;
964
965    #[test]
966    fn recv_flow_control() {
967        let mut stream =
968            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
969        assert!(!stream.recv.almost_full());
970
971        let mut buf = [0; 32];
972
973        let first = RangeBuf::from(b"hello", 0, false);
974        let second = RangeBuf::from(b"world", 5, false);
975        let third = RangeBuf::from(b"something", 10, false);
976
977        assert_eq!(stream.recv.write(second), Ok(()));
978        assert_eq!(stream.recv.write(first), Ok(()));
979        assert!(!stream.recv.almost_full());
980
981        assert_eq!(stream.recv.write(third), Err(Error::FlowControl));
982
983        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
984        assert_eq!(&buf[..len], b"helloworld");
985        assert!(!fin);
986
987        assert!(stream.recv.almost_full());
988
989        stream.recv.update_max_data(std::time::Instant::now());
990        assert_eq!(stream.recv.max_data_next(), 25);
991        assert!(!stream.recv.almost_full());
992
993        let third = RangeBuf::from(b"something", 10, false);
994        assert_eq!(stream.recv.write(third), Ok(()));
995    }
996
997    #[test]
998    fn recv_past_fin() {
999        let mut stream =
1000            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1001        assert!(!stream.recv.almost_full());
1002
1003        let first = RangeBuf::from(b"hello", 0, true);
1004        let second = RangeBuf::from(b"world", 5, false);
1005
1006        assert_eq!(stream.recv.write(first), Ok(()));
1007        assert_eq!(stream.recv.write(second), Err(Error::FinalSize));
1008    }
1009
1010    #[test]
1011    fn recv_fin_dup() {
1012        let mut stream =
1013            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1014        assert!(!stream.recv.almost_full());
1015
1016        let first = RangeBuf::from(b"hello", 0, true);
1017        let second = RangeBuf::from(b"hello", 0, true);
1018
1019        assert_eq!(stream.recv.write(first), Ok(()));
1020        assert_eq!(stream.recv.write(second), Ok(()));
1021
1022        let mut buf = [0; 32];
1023
1024        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1025        assert_eq!(&buf[..len], b"hello");
1026        assert!(fin);
1027    }
1028
1029    #[test]
1030    fn recv_fin_change() {
1031        let mut stream =
1032            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1033        assert!(!stream.recv.almost_full());
1034
1035        let first = RangeBuf::from(b"hello", 0, true);
1036        let second = RangeBuf::from(b"world", 5, true);
1037
1038        assert_eq!(stream.recv.write(second), Ok(()));
1039        assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
1040    }
1041
1042    #[test]
1043    fn recv_fin_lower_than_received() {
1044        let mut stream =
1045            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1046        assert!(!stream.recv.almost_full());
1047
1048        let first = RangeBuf::from(b"hello", 0, true);
1049        let second = RangeBuf::from(b"world", 5, false);
1050
1051        assert_eq!(stream.recv.write(second), Ok(()));
1052        assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
1053    }
1054
1055    #[test]
1056    fn recv_fin_flow_control() {
1057        let mut stream =
1058            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1059        assert!(!stream.recv.almost_full());
1060
1061        let mut buf = [0; 32];
1062
1063        let first = RangeBuf::from(b"hello", 0, false);
1064        let second = RangeBuf::from(b"world", 5, true);
1065
1066        assert_eq!(stream.recv.write(first), Ok(()));
1067        assert_eq!(stream.recv.write(second), Ok(()));
1068
1069        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1070        assert_eq!(&buf[..len], b"helloworld");
1071        assert!(fin);
1072
1073        assert!(!stream.recv.almost_full());
1074    }
1075
1076    #[test]
1077    fn recv_fin_reset_mismatch() {
1078        let mut stream =
1079            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1080        assert!(!stream.recv.almost_full());
1081
1082        let first = RangeBuf::from(b"hello", 0, true);
1083
1084        assert_eq!(stream.recv.write(first), Ok(()));
1085        assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
1086    }
1087
1088    #[test]
1089    fn recv_reset_with_gap() {
1090        let mut stream =
1091            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1092        assert!(!stream.recv.almost_full());
1093
1094        let first = RangeBuf::from(b"hello", 0, false);
1095
1096        assert_eq!(stream.recv.write(first), Ok(()));
1097        // Read one byte.
1098        assert_eq!(stream.recv.emit(&mut [0; 1]), Ok((1, false)));
1099        // Reset with a final size > than max previously received
1100        assert_eq!(
1101            stream.recv.reset(0, 10),
1102            Ok(RecvBufResetReturn {
1103                max_data_delta: 5,
1104                // consumed_flowcontrol is 9, since we already read 1 byte
1105                consumed_flowcontrol: 9
1106            })
1107        );
1108        assert_eq!(stream.recv.reset(0, 10), Ok(RecvBufResetReturn::zero()));
1109    }
1110
1111    #[test]
1112    fn recv_reset_dup() {
1113        let mut stream =
1114            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1115        assert!(!stream.recv.almost_full());
1116
1117        let first = RangeBuf::from(b"hello", 0, false);
1118
1119        assert_eq!(stream.recv.write(first), Ok(()));
1120        assert_eq!(
1121            stream.recv.reset(0, 5),
1122            Ok(RecvBufResetReturn {
1123                max_data_delta: 0,
1124                consumed_flowcontrol: 5
1125            })
1126        );
1127        assert_eq!(stream.recv.reset(0, 5), Ok(RecvBufResetReturn::zero()));
1128    }
1129
1130    #[test]
1131    fn recv_reset_change() {
1132        let mut stream =
1133            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1134        assert!(!stream.recv.almost_full());
1135
1136        let first = RangeBuf::from(b"hello", 0, false);
1137
1138        assert_eq!(stream.recv.write(first), Ok(()));
1139        assert_eq!(
1140            stream.recv.reset(0, 5),
1141            Ok(RecvBufResetReturn {
1142                max_data_delta: 0,
1143                consumed_flowcontrol: 5
1144            })
1145        );
1146        assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
1147    }
1148
1149    #[test]
1150    fn recv_reset_lower_than_received() {
1151        let mut stream =
1152            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1153        assert!(!stream.recv.almost_full());
1154
1155        let first = RangeBuf::from(b"hello", 0, false);
1156
1157        assert_eq!(stream.recv.write(first), Ok(()));
1158        assert_eq!(stream.recv.reset(0, 4), Err(Error::FinalSize));
1159    }
1160
1161    #[test]
1162    fn send_flow_control() {
1163        let mut buf = [0; 25];
1164
1165        let mut stream =
1166            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1167
1168        let first = b"hello";
1169        let second = b"world";
1170        let third = b"something";
1171
1172        assert!(stream.send.write(first, false).is_ok());
1173        assert!(stream.send.write(second, false).is_ok());
1174        assert!(stream.send.write(third, false).is_ok());
1175
1176        assert_eq!(stream.send.off_front(), 0);
1177
1178        let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
1179        assert_eq!(written, 15);
1180        assert!(!fin);
1181        assert_eq!(&buf[..written], b"helloworldsomet");
1182
1183        assert_eq!(stream.send.off_front(), 15);
1184
1185        let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
1186        assert_eq!(written, 0);
1187        assert!(!fin);
1188        assert_eq!(&buf[..written], b"");
1189
1190        stream.send.retransmit(0, 15);
1191
1192        assert_eq!(stream.send.off_front(), 0);
1193
1194        let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
1195        assert_eq!(written, 10);
1196        assert!(!fin);
1197        assert_eq!(&buf[..written], b"helloworld");
1198
1199        assert_eq!(stream.send.off_front(), 10);
1200
1201        let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
1202        assert_eq!(written, 5);
1203        assert!(!fin);
1204        assert_eq!(&buf[..written], b"somet");
1205    }
1206
1207    #[test]
1208    fn send_past_fin() {
1209        let mut stream =
1210            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1211
1212        let first = b"hello";
1213        let second = b"world";
1214        let third = b"third";
1215
1216        assert_eq!(stream.send.write(first, false), Ok(5));
1217
1218        assert_eq!(stream.send.write(second, true), Ok(5));
1219        assert!(stream.send.is_fin());
1220
1221        assert_eq!(stream.send.write(third, false), Err(Error::FinalSize));
1222    }
1223
1224    #[test]
1225    fn send_fin_dup() {
1226        let mut stream =
1227            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1228
1229        assert_eq!(stream.send.write(b"hello", true), Ok(5));
1230        assert!(stream.send.is_fin());
1231
1232        assert_eq!(stream.send.write(b"", true), Ok(0));
1233        assert!(stream.send.is_fin());
1234    }
1235
1236    #[test]
1237    fn send_undo_fin() {
1238        let mut stream =
1239            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1240
1241        assert_eq!(stream.send.write(b"hello", true), Ok(5));
1242        assert!(stream.send.is_fin());
1243
1244        assert_eq!(
1245            stream.send.write(b"helloworld", true),
1246            Err(Error::FinalSize)
1247        );
1248    }
1249
1250    #[test]
1251    fn send_fin_max_data_match() {
1252        let mut buf = [0; 15];
1253
1254        let mut stream =
1255            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1256
1257        let slice = b"hellohellohello";
1258
1259        assert!(stream.send.write(slice, true).is_ok());
1260
1261        let (written, fin) = stream.send.emit(&mut buf[..15]).unwrap();
1262        assert_eq!(written, 15);
1263        assert!(fin);
1264        assert_eq!(&buf[..written], slice);
1265    }
1266
1267    #[test]
1268    fn send_fin_zero_length() {
1269        let mut buf = [0; 5];
1270
1271        let mut stream =
1272            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1273
1274        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1275        assert_eq!(stream.send.write(b"", true), Ok(0));
1276        assert!(stream.send.is_fin());
1277
1278        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1279        assert_eq!(written, 5);
1280        assert!(fin);
1281        assert_eq!(&buf[..written], b"hello");
1282    }
1283
1284    #[test]
1285    fn send_ack() {
1286        let mut buf = [0; 5];
1287
1288        let mut stream =
1289            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1290
1291        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1292        assert_eq!(stream.send.write(b"world", false), Ok(5));
1293        assert_eq!(stream.send.write(b"", true), Ok(0));
1294        assert!(stream.send.is_fin());
1295
1296        assert_eq!(stream.send.off_front(), 0);
1297
1298        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1299        assert_eq!(written, 5);
1300        assert!(!fin);
1301        assert_eq!(&buf[..written], b"hello");
1302
1303        stream.send.ack_and_drop(0, 5);
1304
1305        stream.send.retransmit(0, 5);
1306
1307        assert_eq!(stream.send.off_front(), 5);
1308
1309        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1310        assert_eq!(written, 5);
1311        assert!(fin);
1312        assert_eq!(&buf[..written], b"world");
1313    }
1314
1315    #[test]
1316    fn send_ack_reordering() {
1317        let mut buf = [0; 5];
1318
1319        let mut stream =
1320            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1321
1322        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1323        assert_eq!(stream.send.write(b"world", false), Ok(5));
1324        assert_eq!(stream.send.write(b"", true), Ok(0));
1325        assert!(stream.send.is_fin());
1326
1327        assert_eq!(stream.send.off_front(), 0);
1328
1329        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1330        assert_eq!(written, 5);
1331        assert!(!fin);
1332        assert_eq!(&buf[..written], b"hello");
1333
1334        assert_eq!(stream.send.off_front(), 5);
1335
1336        let (written, fin) = stream.send.emit(&mut buf[..1]).unwrap();
1337        assert_eq!(written, 1);
1338        assert!(!fin);
1339        assert_eq!(&buf[..written], b"w");
1340
1341        stream.send.ack_and_drop(5, 1);
1342        stream.send.ack_and_drop(0, 5);
1343
1344        stream.send.retransmit(0, 5);
1345        stream.send.retransmit(5, 1);
1346
1347        assert_eq!(stream.send.off_front(), 6);
1348
1349        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1350        assert_eq!(written, 4);
1351        assert!(fin);
1352        assert_eq!(&buf[..written], b"orld");
1353    }
1354
1355    #[test]
1356    fn recv_data_below_off() {
1357        let mut stream =
1358            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1359
1360        let first = RangeBuf::from(b"hello", 0, false);
1361
1362        assert_eq!(stream.recv.write(first), Ok(()));
1363
1364        let mut buf = [0; 10];
1365
1366        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1367        assert_eq!(&buf[..len], b"hello");
1368        assert!(!fin);
1369
1370        let first = RangeBuf::from(b"elloworld", 1, true);
1371        assert_eq!(stream.recv.write(first), Ok(()));
1372
1373        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1374        assert_eq!(&buf[..len], b"world");
1375        assert!(fin);
1376    }
1377
1378    #[test]
1379    fn stream_complete() {
1380        let mut stream =
1381            <Stream>::new(0, 30, 30, true, true, DEFAULT_STREAM_WINDOW);
1382
1383        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1384        assert_eq!(stream.send.write(b"world", false), Ok(5));
1385
1386        assert!(!stream.send.is_complete());
1387        assert!(!stream.send.is_fin());
1388
1389        assert_eq!(stream.send.write(b"", true), Ok(0));
1390
1391        assert!(!stream.send.is_complete());
1392        assert!(stream.send.is_fin());
1393
1394        let buf = RangeBuf::from(b"hello", 0, true);
1395        assert!(stream.recv.write(buf).is_ok());
1396        assert!(!stream.recv.is_fin());
1397
1398        stream.send.ack(6, 4);
1399        assert!(!stream.send.is_complete());
1400
1401        let mut buf = [0; 2];
1402        assert_eq!(stream.recv.emit(&mut buf), Ok((2, false)));
1403        assert!(!stream.recv.is_fin());
1404
1405        stream.send.ack(1, 5);
1406        assert!(!stream.send.is_complete());
1407
1408        stream.send.ack(0, 1);
1409        assert!(stream.send.is_complete());
1410
1411        assert!(!stream.is_complete());
1412
1413        let mut buf = [0; 3];
1414        assert_eq!(stream.recv.emit(&mut buf), Ok((3, true)));
1415        assert!(stream.recv.is_fin());
1416
1417        assert!(stream.is_complete());
1418    }
1419
1420    #[test]
1421    fn send_fin_zero_length_output() {
1422        let mut buf = [0; 5];
1423
1424        let mut stream =
1425            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1426
1427        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1428        assert_eq!(stream.send.off_front(), 0);
1429        assert!(!stream.send.is_fin());
1430
1431        let (written, fin) = stream.send.emit(&mut buf).unwrap();
1432        assert_eq!(written, 5);
1433        assert!(!fin);
1434        assert_eq!(&buf[..written], b"hello");
1435
1436        assert_eq!(stream.send.write(b"", true), Ok(0));
1437        assert!(stream.send.is_fin());
1438        assert_eq!(stream.send.off_front(), 5);
1439
1440        let (written, fin) = stream.send.emit(&mut buf).unwrap();
1441        assert_eq!(written, 0);
1442        assert!(fin);
1443        assert_eq!(&buf[..written], b"");
1444    }
1445
1446    fn stream_send_ready(stream: &Stream) -> bool {
1447        !stream.send.is_empty() &&
1448            stream.send.off_front() < stream.send.off_back()
1449    }
1450
1451    #[test]
1452    fn send_emit() {
1453        let mut buf = [0; 5];
1454
1455        let mut stream =
1456            <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1457
1458        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1459        assert_eq!(stream.send.write(b"world", false), Ok(5));
1460        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1461        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1462        assert_eq!(stream.send.off_front(), 0);
1463        assert_eq!(stream.send.bufs_count(), 4);
1464
1465        assert!(stream.is_flushable());
1466
1467        assert!(stream_send_ready(&stream));
1468        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1469        assert_eq!(stream.send.off_front(), 4);
1470        assert_eq!(&buf[..4], b"hell");
1471
1472        assert!(stream_send_ready(&stream));
1473        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1474        assert_eq!(stream.send.off_front(), 8);
1475        assert_eq!(&buf[..4], b"owor");
1476
1477        assert!(stream_send_ready(&stream));
1478        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1479        assert_eq!(stream.send.off_front(), 10);
1480        assert_eq!(&buf[..2], b"ld");
1481
1482        assert!(stream_send_ready(&stream));
1483        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1484        assert_eq!(stream.send.off_front(), 11);
1485        assert_eq!(&buf[..1], b"o");
1486
1487        assert!(stream_send_ready(&stream));
1488        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1489        assert_eq!(stream.send.off_front(), 16);
1490        assert_eq!(&buf[..5], b"llehd");
1491
1492        assert!(stream_send_ready(&stream));
1493        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1494        assert_eq!(stream.send.off_front(), 20);
1495        assert_eq!(&buf[..4], b"lrow");
1496
1497        assert!(!stream.is_flushable());
1498
1499        assert!(!stream_send_ready(&stream));
1500        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1501        assert_eq!(stream.send.off_front(), 20);
1502    }
1503
1504    #[test]
1505    fn send_emit_ack() {
1506        let mut buf = [0; 5];
1507
1508        let mut stream =
1509            <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1510
1511        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1512        assert_eq!(stream.send.write(b"world", false), Ok(5));
1513        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1514        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1515        assert_eq!(stream.send.off_front(), 0);
1516        assert_eq!(stream.send.bufs_count(), 4);
1517
1518        assert!(stream.is_flushable());
1519
1520        assert!(stream_send_ready(&stream));
1521        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1522        assert_eq!(stream.send.off_front(), 4);
1523        assert_eq!(&buf[..4], b"hell");
1524
1525        assert!(stream_send_ready(&stream));
1526        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1527        assert_eq!(stream.send.off_front(), 8);
1528        assert_eq!(&buf[..4], b"owor");
1529
1530        stream.send.ack_and_drop(0, 5);
1531        assert_eq!(stream.send.bufs_count(), 3);
1532
1533        assert!(stream_send_ready(&stream));
1534        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1535        assert_eq!(stream.send.off_front(), 10);
1536        assert_eq!(&buf[..2], b"ld");
1537
1538        stream.send.ack_and_drop(7, 5);
1539        assert_eq!(stream.send.bufs_count(), 3);
1540
1541        assert!(stream_send_ready(&stream));
1542        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1543        assert_eq!(stream.send.off_front(), 11);
1544        assert_eq!(&buf[..1], b"o");
1545
1546        assert!(stream_send_ready(&stream));
1547        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1548        assert_eq!(stream.send.off_front(), 16);
1549        assert_eq!(&buf[..5], b"llehd");
1550
1551        stream.send.ack_and_drop(5, 7);
1552        assert_eq!(stream.send.bufs_count(), 2);
1553
1554        assert!(stream_send_ready(&stream));
1555        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1556        assert_eq!(stream.send.off_front(), 20);
1557        assert_eq!(&buf[..4], b"lrow");
1558
1559        assert!(!stream.is_flushable());
1560
1561        assert!(!stream_send_ready(&stream));
1562        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1563        assert_eq!(stream.send.off_front(), 20);
1564
1565        stream.send.ack_and_drop(22, 4);
1566        assert_eq!(stream.send.bufs_count(), 2);
1567
1568        stream.send.ack_and_drop(20, 1);
1569        assert_eq!(stream.send.bufs_count(), 2);
1570    }
1571
1572    #[test]
1573    fn send_emit_retransmit() {
1574        let mut buf = [0; 5];
1575
1576        let mut stream =
1577            <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1578
1579        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1580        assert_eq!(stream.send.write(b"world", false), Ok(5));
1581        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1582        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1583        assert_eq!(stream.send.off_front(), 0);
1584        assert_eq!(stream.send.bufs_count(), 4);
1585
1586        assert!(stream.is_flushable());
1587
1588        assert!(stream_send_ready(&stream));
1589        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1590        assert_eq!(stream.send.off_front(), 4);
1591        assert_eq!(&buf[..4], b"hell");
1592
1593        assert!(stream_send_ready(&stream));
1594        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1595        assert_eq!(stream.send.off_front(), 8);
1596        assert_eq!(&buf[..4], b"owor");
1597
1598        stream.send.retransmit(3, 3);
1599        assert_eq!(stream.send.off_front(), 3);
1600
1601        assert!(stream_send_ready(&stream));
1602        assert_eq!(stream.send.emit(&mut buf[..3]), Ok((3, false)));
1603        assert_eq!(stream.send.off_front(), 8);
1604        assert_eq!(&buf[..3], b"low");
1605
1606        assert!(stream_send_ready(&stream));
1607        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1608        assert_eq!(stream.send.off_front(), 10);
1609        assert_eq!(&buf[..2], b"ld");
1610
1611        stream.send.ack_and_drop(7, 2);
1612
1613        stream.send.retransmit(8, 2);
1614
1615        assert!(stream_send_ready(&stream));
1616        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1617        assert_eq!(stream.send.off_front(), 10);
1618        assert_eq!(&buf[..2], b"ld");
1619
1620        assert!(stream_send_ready(&stream));
1621        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1622        assert_eq!(stream.send.off_front(), 11);
1623        assert_eq!(&buf[..1], b"o");
1624
1625        assert!(stream_send_ready(&stream));
1626        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1627        assert_eq!(stream.send.off_front(), 16);
1628        assert_eq!(&buf[..5], b"llehd");
1629
1630        stream.send.retransmit(12, 2);
1631
1632        assert!(stream_send_ready(&stream));
1633        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1634        assert_eq!(stream.send.off_front(), 16);
1635        assert_eq!(&buf[..2], b"le");
1636
1637        assert!(stream_send_ready(&stream));
1638        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1639        assert_eq!(stream.send.off_front(), 20);
1640        assert_eq!(&buf[..4], b"lrow");
1641
1642        assert!(!stream.is_flushable());
1643
1644        assert!(!stream_send_ready(&stream));
1645        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1646        assert_eq!(stream.send.off_front(), 20);
1647
1648        stream.send.retransmit(7, 12);
1649
1650        assert!(stream_send_ready(&stream));
1651        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1652        assert_eq!(stream.send.off_front(), 12);
1653        assert_eq!(&buf[..5], b"rldol");
1654
1655        assert!(stream_send_ready(&stream));
1656        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1657        assert_eq!(stream.send.off_front(), 17);
1658        assert_eq!(&buf[..5], b"lehdl");
1659
1660        assert!(stream_send_ready(&stream));
1661        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
1662        assert_eq!(stream.send.off_front(), 20);
1663        assert_eq!(&buf[..2], b"ro");
1664
1665        stream.send.ack_and_drop(12, 7);
1666
1667        stream.send.retransmit(7, 12);
1668
1669        assert!(stream_send_ready(&stream));
1670        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1671        assert_eq!(stream.send.off_front(), 12);
1672        assert_eq!(&buf[..5], b"rldol");
1673
1674        assert!(stream_send_ready(&stream));
1675        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1676        assert_eq!(stream.send.off_front(), 17);
1677        assert_eq!(&buf[..5], b"lehdl");
1678
1679        assert!(stream_send_ready(&stream));
1680        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
1681        assert_eq!(stream.send.off_front(), 20);
1682        assert_eq!(&buf[..2], b"ro");
1683    }
1684
1685    #[test]
1686    fn rangebuf_split_off() {
1687        let mut buf = <RangeBuf>::from(b"helloworld", 5, true);
1688        assert_eq!(buf.start, 0);
1689        assert_eq!(buf.pos, 0);
1690        assert_eq!(buf.len, 10);
1691        assert_eq!(buf.off, 5);
1692        assert!(buf.fin);
1693
1694        assert_eq!(buf.len(), 10);
1695        assert_eq!(buf.off(), 5);
1696        assert!(buf.fin());
1697
1698        assert_eq!(&buf[..], b"helloworld");
1699
1700        // Advance buffer.
1701        buf.consume(5);
1702
1703        assert_eq!(buf.start, 0);
1704        assert_eq!(buf.pos, 5);
1705        assert_eq!(buf.len, 10);
1706        assert_eq!(buf.off, 5);
1707        assert!(buf.fin);
1708
1709        assert_eq!(buf.len(), 5);
1710        assert_eq!(buf.off(), 10);
1711        assert!(buf.fin());
1712
1713        assert_eq!(&buf[..], b"world");
1714
1715        // Split buffer before position.
1716        let mut new_buf = buf.split_off(3);
1717
1718        assert_eq!(buf.start, 0);
1719        assert_eq!(buf.pos, 3);
1720        assert_eq!(buf.len, 3);
1721        assert_eq!(buf.off, 5);
1722        assert!(!buf.fin);
1723
1724        assert_eq!(buf.len(), 0);
1725        assert_eq!(buf.off(), 8);
1726        assert!(!buf.fin());
1727
1728        assert_eq!(&buf[..], b"");
1729
1730        assert_eq!(new_buf.start, 3);
1731        assert_eq!(new_buf.pos, 5);
1732        assert_eq!(new_buf.len, 7);
1733        assert_eq!(new_buf.off, 8);
1734        assert!(new_buf.fin);
1735
1736        assert_eq!(new_buf.len(), 5);
1737        assert_eq!(new_buf.off(), 10);
1738        assert!(new_buf.fin());
1739
1740        assert_eq!(&new_buf[..], b"world");
1741
1742        // Advance buffer.
1743        new_buf.consume(2);
1744
1745        assert_eq!(new_buf.start, 3);
1746        assert_eq!(new_buf.pos, 7);
1747        assert_eq!(new_buf.len, 7);
1748        assert_eq!(new_buf.off, 8);
1749        assert!(new_buf.fin);
1750
1751        assert_eq!(new_buf.len(), 3);
1752        assert_eq!(new_buf.off(), 12);
1753        assert!(new_buf.fin());
1754
1755        assert_eq!(&new_buf[..], b"rld");
1756
1757        // Split buffer after position.
1758        let mut new_new_buf = new_buf.split_off(5);
1759
1760        assert_eq!(new_buf.start, 3);
1761        assert_eq!(new_buf.pos, 7);
1762        assert_eq!(new_buf.len, 5);
1763        assert_eq!(new_buf.off, 8);
1764        assert!(!new_buf.fin);
1765
1766        assert_eq!(new_buf.len(), 1);
1767        assert_eq!(new_buf.off(), 12);
1768        assert!(!new_buf.fin());
1769
1770        assert_eq!(&new_buf[..], b"r");
1771
1772        assert_eq!(new_new_buf.start, 8);
1773        assert_eq!(new_new_buf.pos, 8);
1774        assert_eq!(new_new_buf.len, 2);
1775        assert_eq!(new_new_buf.off, 13);
1776        assert!(new_new_buf.fin);
1777
1778        assert_eq!(new_new_buf.len(), 2);
1779        assert_eq!(new_new_buf.off(), 13);
1780        assert!(new_new_buf.fin());
1781
1782        assert_eq!(&new_new_buf[..], b"ld");
1783
1784        // Advance buffer.
1785        new_new_buf.consume(2);
1786
1787        assert_eq!(new_new_buf.start, 8);
1788        assert_eq!(new_new_buf.pos, 10);
1789        assert_eq!(new_new_buf.len, 2);
1790        assert_eq!(new_new_buf.off, 13);
1791        assert!(new_new_buf.fin);
1792
1793        assert_eq!(new_new_buf.len(), 0);
1794        assert_eq!(new_new_buf.off(), 15);
1795        assert!(new_new_buf.fin());
1796
1797        assert_eq!(&new_new_buf[..], b"");
1798    }
1799
1800    /// RFC9000 2.1: A stream ID that is used out of order results in all
1801    /// streams of that type with lower-numbered stream IDs also being opened.
1802    #[test]
1803    fn stream_limit_auto_open() {
1804        let local_tp = crate::TransportParams::default();
1805        let peer_tp = crate::TransportParams::default();
1806
1807        let mut streams = <StreamMap>::new(5, 5, 5);
1808
1809        let stream_id = 500;
1810        assert!(!is_local(stream_id, true), "stream id is peer initiated");
1811        assert!(is_bidi(stream_id), "stream id is bidirectional");
1812        assert_eq!(
1813            streams
1814                .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1815                .err(),
1816            Some(Error::StreamLimit),
1817            "stream limit should be exceeded"
1818        );
1819    }
1820
1821    /// Stream limit should be satisfied regardless of what order we open
1822    /// streams
1823    #[test]
1824    fn stream_create_out_of_order() {
1825        let local_tp = crate::TransportParams::default();
1826        let peer_tp = crate::TransportParams::default();
1827
1828        let mut streams = <StreamMap>::new(5, 5, 5);
1829
1830        for stream_id in [8, 12, 4] {
1831            assert!(is_local(stream_id, false), "stream id is client initiated");
1832            assert!(is_bidi(stream_id), "stream id is bidirectional");
1833            assert!(streams
1834                .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1835                .is_ok());
1836        }
1837    }
1838
1839    /// Check stream limit boundary cases
1840    #[test]
1841    fn stream_limit_edge() {
1842        let local_tp = crate::TransportParams::default();
1843        let peer_tp = crate::TransportParams::default();
1844
1845        let mut streams = <StreamMap>::new(3, 3, 3);
1846
1847        // Highest permitted
1848        let stream_id = 8;
1849        assert!(streams
1850            .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1851            .is_ok());
1852
1853        // One more than highest permitted
1854        let stream_id = 12;
1855        assert_eq!(
1856            streams
1857                .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1858                .err(),
1859            Some(Error::StreamLimit)
1860        );
1861    }
1862
1863    fn cycle_stream_priority(stream_id: u64, streams: &mut StreamMap) {
1864        let key = streams.get(stream_id).unwrap().priority_key.clone();
1865        streams.update_priority(&key.clone(), &key);
1866    }
1867
1868    #[test]
1869    fn writable_prioritized_default_priority() {
1870        let local_tp = crate::TransportParams::default();
1871        let peer_tp = crate::TransportParams {
1872            initial_max_stream_data_bidi_local: 100,
1873            initial_max_stream_data_uni: 100,
1874            ..Default::default()
1875        };
1876
1877        let mut streams = StreamMap::new(100, 100, 100);
1878
1879        for id in [0, 4, 8, 12] {
1880            assert!(streams
1881                .get_or_create(id, &local_tp, &peer_tp, false, true)
1882                .is_ok());
1883        }
1884
1885        let walk_1: Vec<u64> = streams.writable().collect();
1886        cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1887        let walk_2: Vec<u64> = streams.writable().collect();
1888        cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1889        let walk_3: Vec<u64> = streams.writable().collect();
1890        cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1891        let walk_4: Vec<u64> = streams.writable().collect();
1892        cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1893        let walk_5: Vec<u64> = streams.writable().collect();
1894
1895        // All streams are non-incremental and same urgency by default. Multiple
1896        // visits shuffle their order.
1897        assert_eq!(walk_1, vec![0, 4, 8, 12]);
1898        assert_eq!(walk_2, vec![4, 8, 12, 0]);
1899        assert_eq!(walk_3, vec![8, 12, 0, 4]);
1900        assert_eq!(walk_4, vec![12, 0, 4, 8,]);
1901        assert_eq!(walk_5, vec![0, 4, 8, 12]);
1902    }
1903
1904    #[test]
1905    fn writable_prioritized_insert_order() {
1906        let local_tp = crate::TransportParams::default();
1907        let peer_tp = crate::TransportParams {
1908            initial_max_stream_data_bidi_local: 100,
1909            initial_max_stream_data_uni: 100,
1910            ..Default::default()
1911        };
1912
1913        let mut streams = StreamMap::new(100, 100, 100);
1914
1915        // Inserting same-urgency incremental streams in a "random" order yields
1916        // same order to start with.
1917        for id in [12, 4, 8, 0] {
1918            assert!(streams
1919                .get_or_create(id, &local_tp, &peer_tp, false, true)
1920                .is_ok());
1921        }
1922
1923        let walk_1: Vec<u64> = streams.writable().collect();
1924        cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1925        let walk_2: Vec<u64> = streams.writable().collect();
1926        cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1927        let walk_3: Vec<u64> = streams.writable().collect();
1928        cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1929        let walk_4: Vec<u64> = streams.writable().collect();
1930        cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1931        let walk_5: Vec<u64> = streams.writable().collect();
1932        assert_eq!(walk_1, vec![12, 4, 8, 0]);
1933        assert_eq!(walk_2, vec![4, 8, 0, 12]);
1934        assert_eq!(walk_3, vec![8, 0, 12, 4,]);
1935        assert_eq!(walk_4, vec![0, 12, 4, 8]);
1936        assert_eq!(walk_5, vec![12, 4, 8, 0]);
1937    }
1938
1939    #[test]
1940    fn writable_prioritized_mixed_urgency() {
1941        let local_tp = crate::TransportParams::default();
1942        let peer_tp = crate::TransportParams {
1943            initial_max_stream_data_bidi_local: 100,
1944            initial_max_stream_data_uni: 100,
1945            ..Default::default()
1946        };
1947
1948        let mut streams = <StreamMap>::new(100, 100, 100);
1949
1950        // Streams where the urgency descends (becomes more important). No stream
1951        // shares an urgency.
1952        let input = vec![
1953            (0, 100),
1954            (4, 90),
1955            (8, 80),
1956            (12, 70),
1957            (16, 60),
1958            (20, 50),
1959            (24, 40),
1960            (28, 30),
1961            (32, 20),
1962            (36, 10),
1963            (40, 0),
1964        ];
1965
1966        for (id, urgency) in input.clone() {
1967            // this duplicates some code from stream_priority in order to access
1968            // streams and the collection they're in
1969            let stream = streams
1970                .get_or_create(id, &local_tp, &peer_tp, false, true)
1971                .unwrap();
1972
1973            stream.urgency = urgency;
1974
1975            let new_priority_key = Arc::new(StreamPriorityKey {
1976                urgency: stream.urgency,
1977                incremental: stream.incremental,
1978                id,
1979                ..Default::default()
1980            });
1981
1982            let old_priority_key = std::mem::replace(
1983                &mut stream.priority_key,
1984                new_priority_key.clone(),
1985            );
1986
1987            streams.update_priority(&old_priority_key, &new_priority_key);
1988        }
1989
1990        let walk_1: Vec<u64> = streams.writable().collect();
1991        assert_eq!(walk_1, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
1992
1993        // Re-applying priority to a stream does not cause duplication.
1994        for (id, urgency) in input {
1995            // this duplicates some code from stream_priority in order to access
1996            // streams and the collection they're in
1997            let stream = streams
1998                .get_or_create(id, &local_tp, &peer_tp, false, true)
1999                .unwrap();
2000
2001            stream.urgency = urgency;
2002
2003            let new_priority_key = Arc::new(StreamPriorityKey {
2004                urgency: stream.urgency,
2005                incremental: stream.incremental,
2006                id,
2007                ..Default::default()
2008            });
2009
2010            let old_priority_key = std::mem::replace(
2011                &mut stream.priority_key,
2012                new_priority_key.clone(),
2013            );
2014
2015            streams.update_priority(&old_priority_key, &new_priority_key);
2016        }
2017
2018        let walk_2: Vec<u64> = streams.writable().collect();
2019        assert_eq!(walk_2, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
2020
2021        // Removing streams doesn't break expected ordering.
2022        streams.collect(24, true);
2023
2024        let walk_3: Vec<u64> = streams.writable().collect();
2025        assert_eq!(walk_3, vec![40, 36, 32, 28, 20, 16, 12, 8, 4, 0]);
2026
2027        streams.collect(40, true);
2028        streams.collect(0, true);
2029
2030        let walk_4: Vec<u64> = streams.writable().collect();
2031        assert_eq!(walk_4, vec![36, 32, 28, 20, 16, 12, 8, 4]);
2032
2033        // Adding streams doesn't break expected ordering.
2034        streams
2035            .get_or_create(44, &local_tp, &peer_tp, false, true)
2036            .unwrap();
2037
2038        let walk_5: Vec<u64> = streams.writable().collect();
2039        assert_eq!(walk_5, vec![36, 32, 28, 20, 16, 12, 8, 4, 44]);
2040    }
2041
2042    #[test]
2043    fn writable_prioritized_mixed_urgencies_incrementals() {
2044        let local_tp = crate::TransportParams::default();
2045        let peer_tp = crate::TransportParams {
2046            initial_max_stream_data_bidi_local: 100,
2047            initial_max_stream_data_uni: 100,
2048            ..Default::default()
2049        };
2050
2051        let mut streams = StreamMap::new(100, 100, 100);
2052
2053        // Streams that share some urgency level
2054        let input = vec![
2055            (0, 100),
2056            (4, 20),
2057            (8, 100),
2058            (12, 20),
2059            (16, 90),
2060            (20, 25),
2061            (24, 90),
2062            (28, 30),
2063            (32, 80),
2064            (36, 20),
2065            (40, 0),
2066        ];
2067
2068        for (id, urgency) in input.clone() {
2069            // this duplicates some code from stream_priority in order to access
2070            // streams and the collection they're in
2071            let stream = streams
2072                .get_or_create(id, &local_tp, &peer_tp, false, true)
2073                .unwrap();
2074
2075            stream.urgency = urgency;
2076
2077            let new_priority_key = Arc::new(StreamPriorityKey {
2078                urgency: stream.urgency,
2079                incremental: stream.incremental,
2080                id,
2081                ..Default::default()
2082            });
2083
2084            let old_priority_key = std::mem::replace(
2085                &mut stream.priority_key,
2086                new_priority_key.clone(),
2087            );
2088
2089            streams.update_priority(&old_priority_key, &new_priority_key);
2090        }
2091
2092        let walk_1: Vec<u64> = streams.writable().collect();
2093        cycle_stream_priority(4, &mut streams);
2094        cycle_stream_priority(16, &mut streams);
2095        cycle_stream_priority(0, &mut streams);
2096        let walk_2: Vec<u64> = streams.writable().collect();
2097        cycle_stream_priority(12, &mut streams);
2098        cycle_stream_priority(24, &mut streams);
2099        cycle_stream_priority(8, &mut streams);
2100        let walk_3: Vec<u64> = streams.writable().collect();
2101        cycle_stream_priority(36, &mut streams);
2102        cycle_stream_priority(16, &mut streams);
2103        cycle_stream_priority(0, &mut streams);
2104        let walk_4: Vec<u64> = streams.writable().collect();
2105        cycle_stream_priority(4, &mut streams);
2106        cycle_stream_priority(24, &mut streams);
2107        cycle_stream_priority(8, &mut streams);
2108        let walk_5: Vec<u64> = streams.writable().collect();
2109        cycle_stream_priority(12, &mut streams);
2110        cycle_stream_priority(16, &mut streams);
2111        cycle_stream_priority(0, &mut streams);
2112        let walk_6: Vec<u64> = streams.writable().collect();
2113        cycle_stream_priority(36, &mut streams);
2114        cycle_stream_priority(24, &mut streams);
2115        cycle_stream_priority(8, &mut streams);
2116        let walk_7: Vec<u64> = streams.writable().collect();
2117        cycle_stream_priority(4, &mut streams);
2118        cycle_stream_priority(16, &mut streams);
2119        cycle_stream_priority(0, &mut streams);
2120        let walk_8: Vec<u64> = streams.writable().collect();
2121        cycle_stream_priority(12, &mut streams);
2122        cycle_stream_priority(24, &mut streams);
2123        cycle_stream_priority(8, &mut streams);
2124        let walk_9: Vec<u64> = streams.writable().collect();
2125        cycle_stream_priority(36, &mut streams);
2126        cycle_stream_priority(16, &mut streams);
2127        cycle_stream_priority(0, &mut streams);
2128
2129        assert_eq!(walk_1, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2130        assert_eq!(walk_2, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2131        assert_eq!(walk_3, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2132        assert_eq!(walk_4, vec![40, 4, 12, 36, 20, 28, 32, 24, 16, 8, 0]);
2133        assert_eq!(walk_5, vec![40, 12, 36, 4, 20, 28, 32, 16, 24, 0, 8]);
2134        assert_eq!(walk_6, vec![40, 36, 4, 12, 20, 28, 32, 24, 16, 8, 0]);
2135        assert_eq!(walk_7, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2136        assert_eq!(walk_8, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2137        assert_eq!(walk_9, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2138
2139        // Removing streams doesn't break expected ordering.
2140        streams.collect(20, true);
2141
2142        let walk_10: Vec<u64> = streams.writable().collect();
2143        assert_eq!(walk_10, vec![40, 4, 12, 36, 28, 32, 24, 16, 8, 0]);
2144
2145        // Adding streams doesn't break expected ordering.
2146        let stream = streams
2147            .get_or_create(44, &local_tp, &peer_tp, false, true)
2148            .unwrap();
2149
2150        stream.urgency = 20;
2151        stream.incremental = true;
2152
2153        let new_priority_key = Arc::new(StreamPriorityKey {
2154            urgency: stream.urgency,
2155            incremental: stream.incremental,
2156            id: 44,
2157            ..Default::default()
2158        });
2159
2160        let old_priority_key =
2161            std::mem::replace(&mut stream.priority_key, new_priority_key.clone());
2162
2163        streams.update_priority(&old_priority_key, &new_priority_key);
2164
2165        let walk_11: Vec<u64> = streams.writable().collect();
2166        assert_eq!(walk_11, vec![40, 4, 12, 36, 44, 28, 32, 24, 16, 8, 0]);
2167    }
2168
2169    #[test]
2170    fn priority_tree_dupes() {
2171        let mut prioritized_writable: RBTree<StreamWritablePriorityAdapter> =
2172            Default::default();
2173
2174        for id in [0, 4, 8, 12] {
2175            let s = Arc::new(StreamPriorityKey {
2176                urgency: 0,
2177                incremental: false,
2178                id,
2179                ..Default::default()
2180            });
2181
2182            prioritized_writable.insert(s);
2183        }
2184
2185        let walk_1: Vec<u64> =
2186            prioritized_writable.iter().map(|s| s.id).collect();
2187        assert_eq!(walk_1, vec![0, 4, 8, 12]);
2188
2189        // Default keys could cause duplicate entries, this is normally protected
2190        // against via StreamMap.
2191        for id in [0, 4, 8, 12] {
2192            let s = Arc::new(StreamPriorityKey {
2193                urgency: 0,
2194                incremental: false,
2195                id,
2196                ..Default::default()
2197            });
2198
2199            prioritized_writable.insert(s);
2200        }
2201
2202        let walk_2: Vec<u64> =
2203            prioritized_writable.iter().map(|s| s.id).collect();
2204        assert_eq!(walk_2, vec![0, 0, 4, 4, 8, 8, 12, 12]);
2205    }
2206}
2207
2208mod recv_buf;
2209mod send_buf;