Skip to main content

quiche/stream/
mod.rs

1// Copyright (C) 2018-2019, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::cmp;
28
29use std::sync::Arc;
30
31use std::collections::hash_map;
32use std::collections::HashMap;
33use std::collections::HashSet;
34
35use intrusive_collections::intrusive_adapter;
36use intrusive_collections::KeyAdapter;
37use intrusive_collections::RBTree;
38use intrusive_collections::RBTreeAtomicLink;
39
40use smallvec::SmallVec;
41
42use crate::range_buf::DefaultBufFactory;
43use crate::BufFactory;
44use crate::Error;
45use crate::Result;
46
47const DEFAULT_URGENCY: u8 = 127;
48
49// The default size of the receiver stream flow control window.
50const DEFAULT_STREAM_WINDOW: u64 = 32 * 1024;
51
52/// The maximum size of the receiver stream flow control window.
53pub const MAX_STREAM_WINDOW: u64 = 16 * 1024 * 1024;
54
55/// A simple no-op hasher for Stream IDs.
56///
57/// The QUIC protocol and quiche library guarantees stream ID uniqueness, so
58/// we can save effort by avoiding using a more complicated algorithm.
59#[derive(Default)]
60pub struct StreamIdHasher {
61    id: u64,
62}
63
64/// Return value type of `RecvBuf::reset()`
65#[derive(Debug, PartialEq, Clone, Copy)]
66pub struct RecvBufResetReturn {
67    /// Returns the difference between the previous max_data offset
68    /// received and the final size reported by the reset
69    pub max_data_delta: u64,
70
71    /// The amount of flow control credit that should be returned to the
72    /// connection level flow control.
73    pub consumed_flowcontrol: u64,
74}
75
76impl RecvBufResetReturn {
77    pub fn zero() -> Self {
78        Self {
79            max_data_delta: 0,
80            consumed_flowcontrol: 0,
81        }
82    }
83}
84
85/// Action to perform when reading from a stream's receive buffer.
86pub enum RecvAction<'a> {
87    /// Emit data by copying it into the provided buffer.
88    Emit { out: &'a mut [u8] },
89    /// Discard up to the specified number of bytes without copying.
90    Discard { len: usize },
91}
92
93impl std::hash::Hasher for StreamIdHasher {
94    #[inline]
95    fn finish(&self) -> u64 {
96        self.id
97    }
98
99    #[inline]
100    fn write_u64(&mut self, id: u64) {
101        self.id = id;
102    }
103
104    #[inline]
105    fn write(&mut self, _: &[u8]) {
106        // We need a default write() for the trait but stream IDs will always
107        // be a u64 so we just delegate to write_u64.
108        unimplemented!()
109    }
110}
111
112type BuildStreamIdHasher = std::hash::BuildHasherDefault<StreamIdHasher>;
113
114pub type StreamIdHashMap<V> = HashMap<u64, V, BuildStreamIdHasher>;
115pub type StreamIdHashSet = HashSet<u64, BuildStreamIdHasher>;
116
117/// Keeps track of QUIC streams and enforces stream limits.
118#[derive(Default)]
119pub struct StreamMap<F: BufFactory = DefaultBufFactory> {
120    /// Map of streams indexed by stream ID.
121    streams: StreamIdHashMap<Stream<F>>,
122
123    /// Set of streams that were completed and garbage collected.
124    ///
125    /// Instead of keeping the full stream state forever, we collect completed
126    /// streams to save memory, but we still need to keep track of previously
127    /// created streams, to prevent peers from re-creating them.
128    collected: StreamIdHashSet,
129
130    /// Peer's maximum bidirectional stream count limit.
131    peer_max_streams_bidi: u64,
132
133    /// Peer's maximum unidirectional stream count limit.
134    peer_max_streams_uni: u64,
135
136    /// The total number of bidirectional streams opened by the peer.
137    peer_opened_streams_bidi: u64,
138
139    /// The total number of unidirectional streams opened by the peer.
140    peer_opened_streams_uni: u64,
141
142    /// Local maximum bidirectional stream count limit.
143    local_max_streams_bidi: u64,
144    local_max_streams_bidi_next: u64,
145
146    /// Local maximum unidirectional stream count limit.
147    local_max_streams_uni: u64,
148    local_max_streams_uni_next: u64,
149
150    /// The total number of bidirectional streams opened by the local endpoint.
151    local_opened_streams_bidi: u64,
152
153    /// The total number of unidirectional streams opened by the local endpoint.
154    local_opened_streams_uni: u64,
155
156    /// Queue of stream IDs corresponding to streams that have buffered data
157    /// ready to be sent to the peer. This also implies that the stream has
158    /// enough flow control credits to send at least some of that data.
159    flushable: RBTree<StreamFlushablePriorityAdapter>,
160
161    /// Set of stream IDs corresponding to streams that have outstanding data
162    /// to read. This is used to generate a `StreamIter` of streams without
163    /// having to iterate over the full list of streams.
164    pub readable: RBTree<StreamReadablePriorityAdapter>,
165
166    /// Set of stream IDs corresponding to streams that have enough flow control
167    /// capacity to be written to, and is not finished. This is used to generate
168    /// a `StreamIter` of streams without having to iterate over the full list
169    /// of streams.
170    pub writable: RBTree<StreamWritablePriorityAdapter>,
171
172    /// Set of stream IDs corresponding to streams that are almost out of flow
173    /// control credit and need to send MAX_STREAM_DATA. This is used to
174    /// generate a `StreamIter` of streams without having to iterate over the
175    /// full list of streams.
176    almost_full: StreamIdHashSet,
177
178    /// Set of stream IDs corresponding to streams that are blocked. The value
179    /// of the map elements represents the offset of the stream at which the
180    /// blocking occurred.
181    blocked: StreamIdHashMap<u64>,
182
183    /// Set of stream IDs corresponding to streams that are reset. The value
184    /// of the map elements is a tuple of the error code and final size values
185    /// to include in the RESET_STREAM frame.
186    reset: StreamIdHashMap<(u64, u64)>,
187
188    /// Set of stream IDs corresponding to streams that are shutdown on the
189    /// receive side, and need to send a STOP_SENDING frame. The value of the
190    /// map elements is the error code to include in the STOP_SENDING frame.
191    stopped: StreamIdHashMap<u64>,
192
193    /// The maximum size of a stream window.
194    max_stream_window: u64,
195}
196
197impl<F: BufFactory> StreamMap<F> {
198    pub fn new(
199        max_streams_bidi: u64, max_streams_uni: u64, max_stream_window: u64,
200    ) -> Self {
201        StreamMap {
202            local_max_streams_bidi: max_streams_bidi,
203            local_max_streams_bidi_next: max_streams_bidi,
204
205            local_max_streams_uni: max_streams_uni,
206            local_max_streams_uni_next: max_streams_uni,
207
208            max_stream_window,
209
210            ..StreamMap::default()
211        }
212    }
213
214    /// Returns the stream with the given ID if it exists.
215    pub fn get(&self, id: u64) -> Option<&Stream<F>> {
216        self.streams.get(&id)
217    }
218
219    /// Returns the mutable stream with the given ID if it exists.
220    pub fn get_mut(&mut self, id: u64) -> Option<&mut Stream<F>> {
221        self.streams.get_mut(&id)
222    }
223
224    /// Returns the mutable stream with the given ID if it exists, or creates
225    /// a new one otherwise.
226    ///
227    /// The `local` parameter indicates whether the stream's creation was
228    /// requested by the local application rather than the peer, and is
229    /// used to validate the requested stream ID, and to select the initial
230    /// flow control values from the local and remote transport parameters
231    /// (also passed as arguments).
232    ///
233    /// This also takes care of enforcing both local and the peer's stream
234    /// count limits. If one of these limits is violated, the `StreamLimit`
235    /// error is returned.
236    pub(crate) fn get_or_create(
237        &mut self, id: u64, local_params: &crate::TransportParams,
238        peer_params: &crate::TransportParams, local: bool, is_server: bool,
239    ) -> Result<&mut Stream<F>> {
240        let (stream, is_new_and_writable) = match self.streams.entry(id) {
241            hash_map::Entry::Vacant(v) => {
242                // Stream has already been closed and garbage collected.
243                if self.collected.contains(&id) {
244                    return Err(Error::Done);
245                }
246
247                if local != is_local(id, is_server) {
248                    return Err(Error::InvalidStreamState(id));
249                }
250
251                let (max_rx_data, max_tx_data) = match (local, is_bidi(id)) {
252                    // Locally-initiated bidirectional stream.
253                    (true, true) => (
254                        local_params.initial_max_stream_data_bidi_local,
255                        peer_params.initial_max_stream_data_bidi_remote,
256                    ),
257
258                    // Locally-initiated unidirectional stream.
259                    (true, false) => (0, peer_params.initial_max_stream_data_uni),
260
261                    // Remotely-initiated bidirectional stream.
262                    (false, true) => (
263                        local_params.initial_max_stream_data_bidi_remote,
264                        peer_params.initial_max_stream_data_bidi_local,
265                    ),
266
267                    // Remotely-initiated unidirectional stream.
268                    (false, false) =>
269                        (local_params.initial_max_stream_data_uni, 0),
270                };
271
272                // The two least significant bits from a stream id identify the
273                // type of stream. Truncate those bits to get the sequence for
274                // that stream type.
275                let stream_sequence = id >> 2;
276
277                // Enforce stream count limits.
278                match (is_local(id, is_server), is_bidi(id)) {
279                    (true, true) => {
280                        let n = cmp::max(
281                            self.local_opened_streams_bidi,
282                            stream_sequence + 1,
283                        );
284
285                        if n > self.peer_max_streams_bidi {
286                            return Err(Error::StreamLimit);
287                        }
288
289                        self.local_opened_streams_bidi = n;
290                    },
291
292                    (true, false) => {
293                        let n = cmp::max(
294                            self.local_opened_streams_uni,
295                            stream_sequence + 1,
296                        );
297
298                        if n > self.peer_max_streams_uni {
299                            return Err(Error::StreamLimit);
300                        }
301
302                        self.local_opened_streams_uni = n;
303                    },
304
305                    (false, true) => {
306                        let n = cmp::max(
307                            self.peer_opened_streams_bidi,
308                            stream_sequence + 1,
309                        );
310
311                        if n > self.local_max_streams_bidi {
312                            return Err(Error::StreamLimit);
313                        }
314
315                        self.peer_opened_streams_bidi = n;
316                    },
317
318                    (false, false) => {
319                        let n = cmp::max(
320                            self.peer_opened_streams_uni,
321                            stream_sequence + 1,
322                        );
323
324                        if n > self.local_max_streams_uni {
325                            return Err(Error::StreamLimit);
326                        }
327
328                        self.peer_opened_streams_uni = n;
329                    },
330                };
331
332                let s = Stream::new(
333                    id,
334                    max_rx_data,
335                    max_tx_data,
336                    is_bidi(id),
337                    local,
338                    self.max_stream_window,
339                );
340
341                let is_writable = s.is_writable();
342
343                (v.insert(s), is_writable)
344            },
345
346            hash_map::Entry::Occupied(v) => (v.into_mut(), false),
347        };
348
349        // Newly created stream might already be writable due to initial flow
350        // control limits.
351        if is_new_and_writable {
352            self.writable.insert(Arc::clone(&stream.priority_key));
353        }
354
355        Ok(stream)
356    }
357
358    /// Adds the stream ID to the readable streams set.
359    ///
360    /// If the stream was already in the list, this does nothing.
361    pub fn insert_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
362        if !priority_key.readable.is_linked() {
363            self.readable.insert(Arc::clone(priority_key));
364        }
365    }
366
367    /// Removes the stream ID from the readable streams set.
368    pub fn remove_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
369        if !priority_key.readable.is_linked() {
370            return;
371        }
372
373        let mut c = {
374            let ptr = Arc::as_ptr(priority_key);
375            unsafe { self.readable.cursor_mut_from_ptr(ptr) }
376        };
377
378        c.remove();
379    }
380
381    /// Adds the stream ID to the writable streams set.
382    ///
383    /// This should also be called anytime a new stream is created, in addition
384    /// to when an existing stream becomes writable.
385    ///
386    /// If the stream was already in the list, this does nothing.
387    pub fn insert_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
388        if !priority_key.writable.is_linked() {
389            self.writable.insert(Arc::clone(priority_key));
390        }
391    }
392
393    /// Removes the stream ID from the writable streams set.
394    ///
395    /// This should also be called anytime an existing stream stops being
396    /// writable.
397    pub fn remove_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
398        if !priority_key.writable.is_linked() {
399            return;
400        }
401
402        let mut c = {
403            let ptr = Arc::as_ptr(priority_key);
404            unsafe { self.writable.cursor_mut_from_ptr(ptr) }
405        };
406
407        c.remove();
408    }
409
410    /// Adds the stream ID to the flushable streams set.
411    ///
412    /// If the stream was already in the list, this does nothing.
413    pub fn insert_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
414        if !priority_key.flushable.is_linked() {
415            self.flushable.insert(Arc::clone(priority_key));
416        }
417    }
418
419    /// Removes the stream ID from the flushable streams set.
420    pub fn remove_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
421        if !priority_key.flushable.is_linked() {
422            return;
423        }
424
425        let mut c = {
426            let ptr = Arc::as_ptr(priority_key);
427            unsafe { self.flushable.cursor_mut_from_ptr(ptr) }
428        };
429
430        c.remove();
431    }
432
433    pub fn peek_flushable(&self) -> Option<Arc<StreamPriorityKey>> {
434        self.flushable.front().clone_pointer()
435    }
436
437    /// Updates the priorities of a stream.
438    pub fn update_priority(
439        &mut self, old: &Arc<StreamPriorityKey>, new: &Arc<StreamPriorityKey>,
440    ) {
441        if old.readable.is_linked() {
442            self.remove_readable(old);
443            self.readable.insert(Arc::clone(new));
444        }
445
446        if old.writable.is_linked() {
447            self.remove_writable(old);
448            self.writable.insert(Arc::clone(new));
449        }
450
451        if old.flushable.is_linked() {
452            self.remove_flushable(old);
453            self.flushable.insert(Arc::clone(new));
454        }
455    }
456
457    /// Adds the stream ID to the almost full streams set.
458    ///
459    /// If the stream was already in the list, this does nothing.
460    pub fn insert_almost_full(&mut self, stream_id: u64) {
461        self.almost_full.insert(stream_id);
462    }
463
464    /// Removes the stream ID from the almost full streams set.
465    pub fn remove_almost_full(&mut self, stream_id: u64) {
466        self.almost_full.remove(&stream_id);
467    }
468
469    /// Adds the stream ID to the blocked streams set with the
470    /// given offset value.
471    ///
472    /// If the stream was already in the list, this does nothing.
473    pub fn insert_blocked(&mut self, stream_id: u64, off: u64) {
474        self.blocked.insert(stream_id, off);
475    }
476
477    /// Removes the stream ID from the blocked streams set.
478    pub fn remove_blocked(&mut self, stream_id: u64) {
479        self.blocked.remove(&stream_id);
480    }
481
482    /// Adds the stream ID to the reset streams set with the
483    /// given error code and final size values.
484    ///
485    /// If the stream was already in the list, this does nothing.
486    pub fn insert_reset(
487        &mut self, stream_id: u64, error_code: u64, final_size: u64,
488    ) {
489        self.reset.insert(stream_id, (error_code, final_size));
490    }
491
492    /// Removes the stream ID from the reset streams set.
493    pub fn remove_reset(&mut self, stream_id: u64) {
494        self.reset.remove(&stream_id);
495    }
496
497    /// Adds the stream ID to the stopped streams set with the
498    /// given error code.
499    ///
500    /// If the stream was already in the list, this does nothing.
501    pub fn insert_stopped(&mut self, stream_id: u64, error_code: u64) {
502        self.stopped.insert(stream_id, error_code);
503    }
504
505    /// Removes the stream ID from the stopped streams set.
506    pub fn remove_stopped(&mut self, stream_id: u64) {
507        self.stopped.remove(&stream_id);
508    }
509
510    /// Updates the peer's maximum bidirectional stream count limit.
511    pub fn update_peer_max_streams_bidi(&mut self, v: u64) {
512        self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v);
513    }
514
515    /// Updates the peer's maximum unidirectional stream count limit.
516    pub fn update_peer_max_streams_uni(&mut self, v: u64) {
517        self.peer_max_streams_uni = cmp::max(self.peer_max_streams_uni, v);
518    }
519
520    /// Commits the new max_streams_bidi limit.
521    pub fn update_max_streams_bidi(&mut self) {
522        self.local_max_streams_bidi = self.local_max_streams_bidi_next;
523    }
524
525    /// Sets the max_streams_bidi limit to the given value.
526    pub fn set_max_streams_bidi(&mut self, max: u64) {
527        self.local_max_streams_bidi = max;
528        self.local_max_streams_bidi_next = max;
529    }
530
531    /// Returns the current max_streams_bidi limit.
532    pub fn max_streams_bidi(&self) -> u64 {
533        self.local_max_streams_bidi
534    }
535
536    /// Returns the new max_streams_bidi limit.
537    pub fn max_streams_bidi_next(&mut self) -> u64 {
538        self.local_max_streams_bidi_next
539    }
540
541    /// Commits the new max_streams_uni limit.
542    pub fn update_max_streams_uni(&mut self) {
543        self.local_max_streams_uni = self.local_max_streams_uni_next;
544    }
545
546    /// Returns the new max_streams_uni limit.
547    pub fn max_streams_uni_next(&mut self) -> u64 {
548        self.local_max_streams_uni_next
549    }
550
551    /// Returns the number of bidirectional streams that can be created
552    /// before the peer's stream count limit is reached.
553    pub fn peer_streams_left_bidi(&self) -> u64 {
554        self.peer_max_streams_bidi - self.local_opened_streams_bidi
555    }
556
557    /// Returns the number of unidirectional streams that can be created
558    /// before the peer's stream count limit is reached.
559    pub fn peer_streams_left_uni(&self) -> u64 {
560        self.peer_max_streams_uni - self.local_opened_streams_uni
561    }
562
563    /// Drops completed stream.
564    ///
565    /// This should only be called when Stream::is_complete() returns true for
566    /// the given stream.
567    pub fn collect(&mut self, stream_id: u64, local: bool) {
568        if !local {
569            // If the stream was created by the peer, give back a max streams
570            // credit.
571            if is_bidi(stream_id) {
572                self.local_max_streams_bidi_next =
573                    self.local_max_streams_bidi_next.saturating_add(1);
574            } else {
575                self.local_max_streams_uni_next =
576                    self.local_max_streams_uni_next.saturating_add(1);
577            }
578        }
579
580        let s = self.streams.remove(&stream_id).unwrap();
581
582        self.remove_readable(&s.priority_key);
583
584        self.remove_writable(&s.priority_key);
585
586        self.remove_flushable(&s.priority_key);
587
588        self.collected.insert(stream_id);
589    }
590
591    /// Creates an iterator over streams that have outstanding data to read.
592    pub fn readable(&self) -> StreamIter {
593        StreamIter {
594            streams: self.readable.iter().map(|s| s.id).collect(),
595            index: 0,
596        }
597    }
598
599    /// Creates an iterator over streams that can be written to.
600    pub fn writable(&self) -> StreamIter {
601        StreamIter {
602            streams: self.writable.iter().map(|s| s.id).collect(),
603            index: 0,
604        }
605    }
606
607    /// Creates an iterator over streams that need to send MAX_STREAM_DATA.
608    pub fn almost_full(&self) -> StreamIter {
609        StreamIter::from(&self.almost_full)
610    }
611
612    /// Creates an iterator over streams that need to send STREAM_DATA_BLOCKED.
613    pub fn blocked(&self) -> hash_map::Iter<'_, u64, u64> {
614        self.blocked.iter()
615    }
616
617    /// Creates an iterator over streams that need to send RESET_STREAM.
618    pub fn reset(&self) -> hash_map::Iter<'_, u64, (u64, u64)> {
619        self.reset.iter()
620    }
621
622    /// Creates an iterator over streams that need to send STOP_SENDING.
623    pub fn stopped(&self) -> hash_map::Iter<'_, u64, u64> {
624        self.stopped.iter()
625    }
626
627    /// Returns true if the stream has been collected.
628    pub fn is_collected(&self, stream_id: u64) -> bool {
629        self.collected.contains(&stream_id)
630    }
631
632    /// Returns true if there are any streams that have data to write.
633    pub fn has_flushable(&self) -> bool {
634        !self.flushable.is_empty()
635    }
636
637    /// Returns true if there are any streams that have data to read.
638    pub fn has_readable(&self) -> bool {
639        !self.readable.is_empty()
640    }
641
642    /// Returns true if there are any streams that need to update the local
643    /// flow control limit.
644    pub fn has_almost_full(&self) -> bool {
645        !self.almost_full.is_empty()
646    }
647
648    /// Returns true if there are any streams that are blocked.
649    pub fn has_blocked(&self) -> bool {
650        !self.blocked.is_empty()
651    }
652
653    /// Returns true if there are any streams that are reset.
654    pub fn has_reset(&self) -> bool {
655        !self.reset.is_empty()
656    }
657
658    /// Returns true if there are any streams that need to send STOP_SENDING.
659    pub fn has_stopped(&self) -> bool {
660        !self.stopped.is_empty()
661    }
662
663    /// Returns true if the max bidirectional streams count needs to be updated
664    /// by sending a MAX_STREAMS frame to the peer.
665    pub fn should_update_max_streams_bidi(&self) -> bool {
666        self.local_max_streams_bidi_next != self.local_max_streams_bidi &&
667            self.local_max_streams_bidi_next / 2 >
668                self.local_max_streams_bidi - self.peer_opened_streams_bidi
669    }
670
671    /// Returns true if the max unidirectional streams count needs to be updated
672    /// by sending a MAX_STREAMS frame to the peer.
673    pub fn should_update_max_streams_uni(&self) -> bool {
674        self.local_max_streams_uni_next != self.local_max_streams_uni &&
675            self.local_max_streams_uni_next / 2 >
676                self.local_max_streams_uni - self.peer_opened_streams_uni
677    }
678
679    /// Returns the number of active streams in the map.
680    #[cfg(test)]
681    pub fn len(&self) -> usize {
682        self.streams.len()
683    }
684}
685
686/// A QUIC stream.
687pub struct Stream<F: BufFactory = DefaultBufFactory> {
688    /// Receive-side stream buffer.
689    pub recv: recv_buf::RecvBuf,
690
691    /// Send-side stream buffer.
692    pub send: send_buf::SendBuf<F>,
693
694    pub send_lowat: usize,
695
696    /// Whether the stream is bidirectional.
697    pub bidi: bool,
698
699    /// Whether the stream was created by the local endpoint.
700    pub local: bool,
701
702    /// The stream's urgency (lower is better). Default is `DEFAULT_URGENCY`.
703    pub urgency: u8,
704
705    /// Whether the stream can be flushed incrementally. Default is `true`.
706    pub incremental: bool,
707
708    pub priority_key: Arc<StreamPriorityKey>,
709}
710
711impl<F: BufFactory> Stream<F> {
712    /// Creates a new stream with the given flow control limits.
713    pub fn new(
714        id: u64, max_rx_data: u64, max_tx_data: u64, bidi: bool, local: bool,
715        max_window: u64,
716    ) -> Self {
717        let priority_key = Arc::new(StreamPriorityKey {
718            id,
719            ..Default::default()
720        });
721
722        Stream {
723            recv: recv_buf::RecvBuf::new(max_rx_data, max_window),
724            send: send_buf::SendBuf::new(max_tx_data),
725            send_lowat: 1,
726            bidi,
727            local,
728            urgency: priority_key.urgency,
729            incremental: priority_key.incremental,
730            priority_key,
731        }
732    }
733
734    /// Returns true if the stream has data to read.
735    pub fn is_readable(&self) -> bool {
736        self.recv.ready()
737    }
738
739    /// Returns true if the stream has enough flow control capacity to be
740    /// written to, and is not finished.
741    pub fn is_writable(&self) -> bool {
742        !self.send.is_shutdown() &&
743            !self.send.is_fin() &&
744            (self.send.off_back() + self.send_lowat as u64) <
745                self.send.max_off()
746    }
747
748    /// Returns true if the stream has data to send and is allowed to send at
749    /// least some of it.
750    pub fn is_flushable(&self) -> bool {
751        let off_front = self.send.off_front();
752
753        !self.send.is_empty() &&
754            off_front < self.send.off_back() &&
755            off_front < self.send.max_off()
756    }
757
758    /// Returns true if the stream is complete.
759    ///
760    /// For bidirectional streams this happens when both the receive and send
761    /// sides are complete. That is when all incoming data has been read by the
762    /// application, and when all outgoing data has been acked by the peer.
763    ///
764    /// For unidirectional streams this happens when either the receive or send
765    /// side is complete, depending on whether the stream was created locally
766    /// or not.
767    pub fn is_complete(&self) -> bool {
768        match (self.bidi, self.local) {
769            // For bidirectional streams we need to check both receive and send
770            // sides for completion.
771            (true, _) => self.recv.is_fin() && self.send.is_complete(),
772
773            // For unidirectional streams generated locally, we only need to
774            // check the send side for completion.
775            (false, true) => self.send.is_complete(),
776
777            // For unidirectional streams generated by the peer, we only need
778            // to check the receive side for completion.
779            (false, false) => self.recv.is_fin(),
780        }
781    }
782}
783
784/// Returns true if the stream was created locally.
785pub fn is_local(stream_id: u64, is_server: bool) -> bool {
786    (stream_id & 0x1) == (is_server as u64)
787}
788
789/// Returns true if the stream is bidirectional.
790pub fn is_bidi(stream_id: u64) -> bool {
791    (stream_id & 0x2) == 0
792}
793
794#[derive(Clone, Debug)]
795pub struct StreamPriorityKey {
796    pub urgency: u8,
797    pub incremental: bool,
798    pub id: u64,
799
800    pub readable: RBTreeAtomicLink,
801    pub writable: RBTreeAtomicLink,
802    pub flushable: RBTreeAtomicLink,
803}
804
805impl Default for StreamPriorityKey {
806    fn default() -> Self {
807        Self {
808            urgency: DEFAULT_URGENCY,
809            incremental: true,
810            id: Default::default(),
811            readable: Default::default(),
812            writable: Default::default(),
813            flushable: Default::default(),
814        }
815    }
816}
817
818impl PartialEq for StreamPriorityKey {
819    fn eq(&self, other: &Self) -> bool {
820        self.id == other.id
821    }
822}
823
824impl Eq for StreamPriorityKey {}
825
826impl PartialOrd for StreamPriorityKey {
827    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
828        Some(self.cmp(other))
829    }
830}
831
832impl Ord for StreamPriorityKey {
833    fn cmp(&self, other: &Self) -> cmp::Ordering {
834        // Ignore priority if ID matches.
835        if self.id == other.id {
836            return cmp::Ordering::Equal;
837        }
838
839        // First, order by urgency...
840        if self.urgency != other.urgency {
841            return self.urgency.cmp(&other.urgency);
842        }
843
844        // ...when the urgency is the same, and both are not incremental, order
845        // by stream ID...
846        if !self.incremental && !other.incremental {
847            return self.id.cmp(&other.id);
848        }
849
850        // ...non-incremental takes priority over incremental...
851        if self.incremental && !other.incremental {
852            return cmp::Ordering::Greater;
853        }
854        if !self.incremental && other.incremental {
855            return cmp::Ordering::Less;
856        }
857
858        // ...finally, when both are incremental, `other` takes precedence (so
859        // `self` is always sorted after other same-urgency incremental
860        // entries).
861        cmp::Ordering::Greater
862    }
863}
864
865intrusive_adapter!(pub StreamWritablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { writable: RBTreeAtomicLink });
866
867impl KeyAdapter<'_> for StreamWritablePriorityAdapter {
868    type Key = StreamPriorityKey;
869
870    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
871        s.clone()
872    }
873}
874
875intrusive_adapter!(pub StreamReadablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { readable: RBTreeAtomicLink });
876
877impl KeyAdapter<'_> for StreamReadablePriorityAdapter {
878    type Key = StreamPriorityKey;
879
880    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
881        s.clone()
882    }
883}
884
885intrusive_adapter!(pub StreamFlushablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { flushable: RBTreeAtomicLink });
886
887impl KeyAdapter<'_> for StreamFlushablePriorityAdapter {
888    type Key = StreamPriorityKey;
889
890    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
891        s.clone()
892    }
893}
894
895/// An iterator over QUIC streams.
896#[derive(Default)]
897pub struct StreamIter {
898    streams: SmallVec<[u64; 8]>,
899    index: usize,
900}
901
902impl StreamIter {
903    #[inline]
904    fn from(streams: &StreamIdHashSet) -> Self {
905        StreamIter {
906            streams: streams.iter().copied().collect(),
907            index: 0,
908        }
909    }
910}
911
912impl Iterator for StreamIter {
913    type Item = u64;
914
915    #[inline]
916    fn next(&mut self) -> Option<Self::Item> {
917        let v = self.streams.get(self.index)?;
918        self.index += 1;
919        Some(*v)
920    }
921}
922
923impl ExactSizeIterator for StreamIter {
924    #[inline]
925    fn len(&self) -> usize {
926        self.streams.len() - self.index
927    }
928}
929
930#[cfg(test)]
931mod tests {
932    use crate::range_buf::RangeBuf;
933
934    use super::*;
935
936    #[test]
937    fn recv_flow_control() {
938        let mut stream =
939            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
940        assert!(!stream.recv.almost_full());
941
942        let mut buf = [0; 32];
943
944        let first = RangeBuf::from(b"hello", 0, false);
945        let second = RangeBuf::from(b"world", 5, false);
946        let third = RangeBuf::from(b"something", 10, false);
947
948        assert_eq!(stream.recv.write(second), Ok(()));
949        assert_eq!(stream.recv.write(first), Ok(()));
950        assert!(!stream.recv.almost_full());
951
952        assert_eq!(stream.recv.write(third), Err(Error::FlowControl));
953
954        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
955        assert_eq!(&buf[..len], b"helloworld");
956        assert!(!fin);
957
958        assert!(stream.recv.almost_full());
959
960        stream.recv.update_max_data(std::time::Instant::now());
961        assert_eq!(stream.recv.max_data_next(), 25);
962        assert!(!stream.recv.almost_full());
963
964        let third = RangeBuf::from(b"something", 10, false);
965        assert_eq!(stream.recv.write(third), Ok(()));
966    }
967
968    #[test]
969    fn recv_past_fin() {
970        let mut stream =
971            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
972        assert!(!stream.recv.almost_full());
973
974        let first = RangeBuf::from(b"hello", 0, true);
975        let second = RangeBuf::from(b"world", 5, false);
976
977        assert_eq!(stream.recv.write(first), Ok(()));
978        assert_eq!(stream.recv.write(second), Err(Error::FinalSize));
979    }
980
981    #[test]
982    fn recv_fin_dup() {
983        let mut stream =
984            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
985        assert!(!stream.recv.almost_full());
986
987        let first = RangeBuf::from(b"hello", 0, true);
988        let second = RangeBuf::from(b"hello", 0, true);
989
990        assert_eq!(stream.recv.write(first), Ok(()));
991        assert_eq!(stream.recv.write(second), Ok(()));
992
993        let mut buf = [0; 32];
994
995        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
996        assert_eq!(&buf[..len], b"hello");
997        assert!(fin);
998    }
999
1000    #[test]
1001    fn recv_fin_change() {
1002        let mut stream =
1003            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1004        assert!(!stream.recv.almost_full());
1005
1006        let first = RangeBuf::from(b"hello", 0, true);
1007        let second = RangeBuf::from(b"world", 5, true);
1008
1009        assert_eq!(stream.recv.write(second), Ok(()));
1010        assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
1011    }
1012
1013    #[test]
1014    fn recv_fin_lower_than_received() {
1015        let mut stream =
1016            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1017        assert!(!stream.recv.almost_full());
1018
1019        let first = RangeBuf::from(b"hello", 0, true);
1020        let second = RangeBuf::from(b"world", 5, false);
1021
1022        assert_eq!(stream.recv.write(second), Ok(()));
1023        assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
1024    }
1025
1026    #[test]
1027    fn recv_fin_flow_control() {
1028        let mut stream =
1029            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1030        assert!(!stream.recv.almost_full());
1031
1032        let mut buf = [0; 32];
1033
1034        let first = RangeBuf::from(b"hello", 0, false);
1035        let second = RangeBuf::from(b"world", 5, true);
1036
1037        assert_eq!(stream.recv.write(first), Ok(()));
1038        assert_eq!(stream.recv.write(second), Ok(()));
1039
1040        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1041        assert_eq!(&buf[..len], b"helloworld");
1042        assert!(fin);
1043
1044        assert!(!stream.recv.almost_full());
1045    }
1046
1047    #[test]
1048    fn recv_fin_reset_mismatch() {
1049        let mut stream =
1050            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1051        assert!(!stream.recv.almost_full());
1052
1053        let first = RangeBuf::from(b"hello", 0, true);
1054
1055        assert_eq!(stream.recv.write(first), Ok(()));
1056        assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
1057    }
1058
1059    #[test]
1060    fn recv_reset_with_gap() {
1061        let mut stream =
1062            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1063        assert!(!stream.recv.almost_full());
1064
1065        let first = RangeBuf::from(b"hello", 0, false);
1066
1067        assert_eq!(stream.recv.write(first), Ok(()));
1068        // Read one byte.
1069        assert_eq!(stream.recv.emit(&mut [0; 1]), Ok((1, false)));
1070        // Reset with a final size > than max previously received
1071        assert_eq!(
1072            stream.recv.reset(0, 10),
1073            Ok(RecvBufResetReturn {
1074                max_data_delta: 5,
1075                // consumed_flowcontrol is 9, since we already read 1 byte
1076                consumed_flowcontrol: 9
1077            })
1078        );
1079        assert_eq!(stream.recv.reset(0, 10), Ok(RecvBufResetReturn::zero()));
1080    }
1081
1082    #[test]
1083    fn recv_reset_dup() {
1084        let mut stream =
1085            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1086        assert!(!stream.recv.almost_full());
1087
1088        let first = RangeBuf::from(b"hello", 0, false);
1089
1090        assert_eq!(stream.recv.write(first), Ok(()));
1091        assert_eq!(
1092            stream.recv.reset(0, 5),
1093            Ok(RecvBufResetReturn {
1094                max_data_delta: 0,
1095                consumed_flowcontrol: 5
1096            })
1097        );
1098        assert_eq!(stream.recv.reset(0, 5), Ok(RecvBufResetReturn::zero()));
1099    }
1100
1101    #[test]
1102    fn recv_reset_change() {
1103        let mut stream =
1104            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1105        assert!(!stream.recv.almost_full());
1106
1107        let first = RangeBuf::from(b"hello", 0, false);
1108
1109        assert_eq!(stream.recv.write(first), Ok(()));
1110        assert_eq!(
1111            stream.recv.reset(0, 5),
1112            Ok(RecvBufResetReturn {
1113                max_data_delta: 0,
1114                consumed_flowcontrol: 5
1115            })
1116        );
1117        assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
1118    }
1119
1120    #[test]
1121    fn recv_reset_lower_than_received() {
1122        let mut stream =
1123            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1124        assert!(!stream.recv.almost_full());
1125
1126        let first = RangeBuf::from(b"hello", 0, false);
1127
1128        assert_eq!(stream.recv.write(first), Ok(()));
1129        assert_eq!(stream.recv.reset(0, 4), Err(Error::FinalSize));
1130    }
1131
1132    #[test]
1133    fn send_flow_control() {
1134        let mut buf = [0; 25];
1135
1136        let mut stream =
1137            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1138
1139        let first = b"hello";
1140        let second = b"world";
1141        let third = b"something";
1142
1143        assert!(stream.send.write(first, false).is_ok());
1144        assert!(stream.send.write(second, false).is_ok());
1145        assert!(stream.send.write(third, false).is_ok());
1146
1147        assert_eq!(stream.send.off_front(), 0);
1148
1149        let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
1150        assert_eq!(written, 15);
1151        assert!(!fin);
1152        assert_eq!(&buf[..written], b"helloworldsomet");
1153
1154        assert_eq!(stream.send.off_front(), 15);
1155
1156        let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
1157        assert_eq!(written, 0);
1158        assert!(!fin);
1159        assert_eq!(&buf[..written], b"");
1160
1161        stream.send.retransmit(0, 15);
1162
1163        assert_eq!(stream.send.off_front(), 0);
1164
1165        let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
1166        assert_eq!(written, 10);
1167        assert!(!fin);
1168        assert_eq!(&buf[..written], b"helloworld");
1169
1170        assert_eq!(stream.send.off_front(), 10);
1171
1172        let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
1173        assert_eq!(written, 5);
1174        assert!(!fin);
1175        assert_eq!(&buf[..written], b"somet");
1176    }
1177
1178    #[test]
1179    fn send_past_fin() {
1180        let mut stream =
1181            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1182
1183        let first = b"hello";
1184        let second = b"world";
1185        let third = b"third";
1186
1187        assert_eq!(stream.send.write(first, false), Ok(5));
1188
1189        assert_eq!(stream.send.write(second, true), Ok(5));
1190        assert!(stream.send.is_fin());
1191
1192        assert_eq!(stream.send.write(third, false), Err(Error::FinalSize));
1193    }
1194
1195    #[test]
1196    fn send_fin_dup() {
1197        let mut stream =
1198            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1199
1200        assert_eq!(stream.send.write(b"hello", true), Ok(5));
1201        assert!(stream.send.is_fin());
1202
1203        assert_eq!(stream.send.write(b"", true), Ok(0));
1204        assert!(stream.send.is_fin());
1205    }
1206
1207    #[test]
1208    fn send_undo_fin() {
1209        let mut stream =
1210            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1211
1212        assert_eq!(stream.send.write(b"hello", true), Ok(5));
1213        assert!(stream.send.is_fin());
1214
1215        assert_eq!(
1216            stream.send.write(b"helloworld", true),
1217            Err(Error::FinalSize)
1218        );
1219    }
1220
1221    #[test]
1222    fn send_fin_max_data_match() {
1223        let mut buf = [0; 15];
1224
1225        let mut stream =
1226            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1227
1228        let slice = b"hellohellohello";
1229
1230        assert!(stream.send.write(slice, true).is_ok());
1231
1232        let (written, fin) = stream.send.emit(&mut buf[..15]).unwrap();
1233        assert_eq!(written, 15);
1234        assert!(fin);
1235        assert_eq!(&buf[..written], slice);
1236    }
1237
1238    #[test]
1239    fn send_fin_zero_length() {
1240        let mut buf = [0; 5];
1241
1242        let mut stream =
1243            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1244
1245        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1246        assert_eq!(stream.send.write(b"", true), Ok(0));
1247        assert!(stream.send.is_fin());
1248
1249        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1250        assert_eq!(written, 5);
1251        assert!(fin);
1252        assert_eq!(&buf[..written], b"hello");
1253    }
1254
1255    #[test]
1256    fn send_ack() {
1257        let mut buf = [0; 5];
1258
1259        let mut stream =
1260            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1261
1262        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1263        assert_eq!(stream.send.write(b"world", false), Ok(5));
1264        assert_eq!(stream.send.write(b"", true), Ok(0));
1265        assert!(stream.send.is_fin());
1266
1267        assert_eq!(stream.send.off_front(), 0);
1268
1269        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1270        assert_eq!(written, 5);
1271        assert!(!fin);
1272        assert_eq!(&buf[..written], b"hello");
1273
1274        stream.send.ack_and_drop(0, 5);
1275
1276        stream.send.retransmit(0, 5);
1277
1278        assert_eq!(stream.send.off_front(), 5);
1279
1280        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1281        assert_eq!(written, 5);
1282        assert!(fin);
1283        assert_eq!(&buf[..written], b"world");
1284    }
1285
1286    #[test]
1287    fn send_ack_reordering() {
1288        let mut buf = [0; 5];
1289
1290        let mut stream =
1291            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1292
1293        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1294        assert_eq!(stream.send.write(b"world", false), Ok(5));
1295        assert_eq!(stream.send.write(b"", true), Ok(0));
1296        assert!(stream.send.is_fin());
1297
1298        assert_eq!(stream.send.off_front(), 0);
1299
1300        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1301        assert_eq!(written, 5);
1302        assert!(!fin);
1303        assert_eq!(&buf[..written], b"hello");
1304
1305        assert_eq!(stream.send.off_front(), 5);
1306
1307        let (written, fin) = stream.send.emit(&mut buf[..1]).unwrap();
1308        assert_eq!(written, 1);
1309        assert!(!fin);
1310        assert_eq!(&buf[..written], b"w");
1311
1312        stream.send.ack_and_drop(5, 1);
1313        stream.send.ack_and_drop(0, 5);
1314
1315        stream.send.retransmit(0, 5);
1316        stream.send.retransmit(5, 1);
1317
1318        assert_eq!(stream.send.off_front(), 6);
1319
1320        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1321        assert_eq!(written, 4);
1322        assert!(fin);
1323        assert_eq!(&buf[..written], b"orld");
1324    }
1325
1326    #[test]
1327    fn recv_data_below_off() {
1328        let mut stream =
1329            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1330
1331        let first = RangeBuf::from(b"hello", 0, false);
1332
1333        assert_eq!(stream.recv.write(first), Ok(()));
1334
1335        let mut buf = [0; 10];
1336
1337        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1338        assert_eq!(&buf[..len], b"hello");
1339        assert!(!fin);
1340
1341        let first = RangeBuf::from(b"elloworld", 1, true);
1342        assert_eq!(stream.recv.write(first), Ok(()));
1343
1344        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1345        assert_eq!(&buf[..len], b"world");
1346        assert!(fin);
1347    }
1348
1349    #[test]
1350    fn stream_complete() {
1351        let mut stream =
1352            <Stream>::new(0, 30, 30, true, true, DEFAULT_STREAM_WINDOW);
1353
1354        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1355        assert_eq!(stream.send.write(b"world", false), Ok(5));
1356
1357        assert!(!stream.send.is_complete());
1358        assert!(!stream.send.is_fin());
1359
1360        assert_eq!(stream.send.write(b"", true), Ok(0));
1361
1362        assert!(!stream.send.is_complete());
1363        assert!(stream.send.is_fin());
1364
1365        let buf = RangeBuf::from(b"hello", 0, true);
1366        assert!(stream.recv.write(buf).is_ok());
1367        assert!(!stream.recv.is_fin());
1368
1369        stream.send.ack(6, 4);
1370        assert!(!stream.send.is_complete());
1371
1372        let mut buf = [0; 2];
1373        assert_eq!(stream.recv.emit(&mut buf), Ok((2, false)));
1374        assert!(!stream.recv.is_fin());
1375
1376        stream.send.ack(1, 5);
1377        assert!(!stream.send.is_complete());
1378
1379        stream.send.ack(0, 1);
1380        assert!(stream.send.is_complete());
1381
1382        assert!(!stream.is_complete());
1383
1384        let mut buf = [0; 3];
1385        assert_eq!(stream.recv.emit(&mut buf), Ok((3, true)));
1386        assert!(stream.recv.is_fin());
1387
1388        assert!(stream.is_complete());
1389    }
1390
1391    #[test]
1392    fn send_fin_zero_length_output() {
1393        let mut buf = [0; 5];
1394
1395        let mut stream =
1396            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1397
1398        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1399        assert_eq!(stream.send.off_front(), 0);
1400        assert!(!stream.send.is_fin());
1401
1402        let (written, fin) = stream.send.emit(&mut buf).unwrap();
1403        assert_eq!(written, 5);
1404        assert!(!fin);
1405        assert_eq!(&buf[..written], b"hello");
1406
1407        assert_eq!(stream.send.write(b"", true), Ok(0));
1408        assert!(stream.send.is_fin());
1409        assert_eq!(stream.send.off_front(), 5);
1410
1411        let (written, fin) = stream.send.emit(&mut buf).unwrap();
1412        assert_eq!(written, 0);
1413        assert!(fin);
1414        assert_eq!(&buf[..written], b"");
1415    }
1416
1417    fn stream_send_ready(stream: &Stream) -> bool {
1418        !stream.send.is_empty() &&
1419            stream.send.off_front() < stream.send.off_back()
1420    }
1421
1422    #[test]
1423    fn send_emit() {
1424        let mut buf = [0; 5];
1425
1426        let mut stream =
1427            <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1428
1429        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1430        assert_eq!(stream.send.write(b"world", false), Ok(5));
1431        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1432        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1433        assert_eq!(stream.send.off_front(), 0);
1434        assert_eq!(stream.send.bufs_count(), 4);
1435
1436        assert!(stream.is_flushable());
1437
1438        assert!(stream_send_ready(&stream));
1439        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1440        assert_eq!(stream.send.off_front(), 4);
1441        assert_eq!(&buf[..4], b"hell");
1442
1443        assert!(stream_send_ready(&stream));
1444        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1445        assert_eq!(stream.send.off_front(), 8);
1446        assert_eq!(&buf[..4], b"owor");
1447
1448        assert!(stream_send_ready(&stream));
1449        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1450        assert_eq!(stream.send.off_front(), 10);
1451        assert_eq!(&buf[..2], b"ld");
1452
1453        assert!(stream_send_ready(&stream));
1454        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1455        assert_eq!(stream.send.off_front(), 11);
1456        assert_eq!(&buf[..1], b"o");
1457
1458        assert!(stream_send_ready(&stream));
1459        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1460        assert_eq!(stream.send.off_front(), 16);
1461        assert_eq!(&buf[..5], b"llehd");
1462
1463        assert!(stream_send_ready(&stream));
1464        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1465        assert_eq!(stream.send.off_front(), 20);
1466        assert_eq!(&buf[..4], b"lrow");
1467
1468        assert!(!stream.is_flushable());
1469
1470        assert!(!stream_send_ready(&stream));
1471        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1472        assert_eq!(stream.send.off_front(), 20);
1473    }
1474
1475    #[test]
1476    fn send_emit_ack() {
1477        let mut buf = [0; 5];
1478
1479        let mut stream =
1480            <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1481
1482        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1483        assert_eq!(stream.send.write(b"world", false), Ok(5));
1484        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1485        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1486        assert_eq!(stream.send.off_front(), 0);
1487        assert_eq!(stream.send.bufs_count(), 4);
1488
1489        assert!(stream.is_flushable());
1490
1491        assert!(stream_send_ready(&stream));
1492        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1493        assert_eq!(stream.send.off_front(), 4);
1494        assert_eq!(&buf[..4], b"hell");
1495
1496        assert!(stream_send_ready(&stream));
1497        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1498        assert_eq!(stream.send.off_front(), 8);
1499        assert_eq!(&buf[..4], b"owor");
1500
1501        stream.send.ack_and_drop(0, 5);
1502        assert_eq!(stream.send.bufs_count(), 3);
1503
1504        assert!(stream_send_ready(&stream));
1505        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1506        assert_eq!(stream.send.off_front(), 10);
1507        assert_eq!(&buf[..2], b"ld");
1508
1509        stream.send.ack_and_drop(7, 5);
1510        assert_eq!(stream.send.bufs_count(), 3);
1511
1512        assert!(stream_send_ready(&stream));
1513        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1514        assert_eq!(stream.send.off_front(), 11);
1515        assert_eq!(&buf[..1], b"o");
1516
1517        assert!(stream_send_ready(&stream));
1518        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1519        assert_eq!(stream.send.off_front(), 16);
1520        assert_eq!(&buf[..5], b"llehd");
1521
1522        stream.send.ack_and_drop(5, 7);
1523        assert_eq!(stream.send.bufs_count(), 2);
1524
1525        assert!(stream_send_ready(&stream));
1526        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1527        assert_eq!(stream.send.off_front(), 20);
1528        assert_eq!(&buf[..4], b"lrow");
1529
1530        assert!(!stream.is_flushable());
1531
1532        assert!(!stream_send_ready(&stream));
1533        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1534        assert_eq!(stream.send.off_front(), 20);
1535
1536        stream.send.ack_and_drop(22, 4);
1537        assert_eq!(stream.send.bufs_count(), 2);
1538
1539        stream.send.ack_and_drop(20, 1);
1540        assert_eq!(stream.send.bufs_count(), 2);
1541    }
1542
1543    #[test]
1544    fn send_emit_retransmit() {
1545        let mut buf = [0; 5];
1546
1547        let mut stream =
1548            <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1549
1550        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1551        assert_eq!(stream.send.write(b"world", false), Ok(5));
1552        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1553        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1554        assert_eq!(stream.send.off_front(), 0);
1555        assert_eq!(stream.send.bufs_count(), 4);
1556
1557        assert!(stream.is_flushable());
1558
1559        assert!(stream_send_ready(&stream));
1560        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1561        assert_eq!(stream.send.off_front(), 4);
1562        assert_eq!(&buf[..4], b"hell");
1563
1564        assert!(stream_send_ready(&stream));
1565        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1566        assert_eq!(stream.send.off_front(), 8);
1567        assert_eq!(&buf[..4], b"owor");
1568
1569        stream.send.retransmit(3, 3);
1570        assert_eq!(stream.send.off_front(), 3);
1571
1572        assert!(stream_send_ready(&stream));
1573        assert_eq!(stream.send.emit(&mut buf[..3]), Ok((3, false)));
1574        assert_eq!(stream.send.off_front(), 8);
1575        assert_eq!(&buf[..3], b"low");
1576
1577        assert!(stream_send_ready(&stream));
1578        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1579        assert_eq!(stream.send.off_front(), 10);
1580        assert_eq!(&buf[..2], b"ld");
1581
1582        stream.send.ack_and_drop(7, 2);
1583
1584        stream.send.retransmit(8, 2);
1585
1586        assert!(stream_send_ready(&stream));
1587        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1588        assert_eq!(stream.send.off_front(), 10);
1589        assert_eq!(&buf[..2], b"ld");
1590
1591        assert!(stream_send_ready(&stream));
1592        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1593        assert_eq!(stream.send.off_front(), 11);
1594        assert_eq!(&buf[..1], b"o");
1595
1596        assert!(stream_send_ready(&stream));
1597        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1598        assert_eq!(stream.send.off_front(), 16);
1599        assert_eq!(&buf[..5], b"llehd");
1600
1601        stream.send.retransmit(12, 2);
1602
1603        assert!(stream_send_ready(&stream));
1604        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1605        assert_eq!(stream.send.off_front(), 16);
1606        assert_eq!(&buf[..2], b"le");
1607
1608        assert!(stream_send_ready(&stream));
1609        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1610        assert_eq!(stream.send.off_front(), 20);
1611        assert_eq!(&buf[..4], b"lrow");
1612
1613        assert!(!stream.is_flushable());
1614
1615        assert!(!stream_send_ready(&stream));
1616        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1617        assert_eq!(stream.send.off_front(), 20);
1618
1619        stream.send.retransmit(7, 12);
1620
1621        assert!(stream_send_ready(&stream));
1622        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1623        assert_eq!(stream.send.off_front(), 12);
1624        assert_eq!(&buf[..5], b"rldol");
1625
1626        assert!(stream_send_ready(&stream));
1627        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1628        assert_eq!(stream.send.off_front(), 17);
1629        assert_eq!(&buf[..5], b"lehdl");
1630
1631        assert!(stream_send_ready(&stream));
1632        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
1633        assert_eq!(stream.send.off_front(), 20);
1634        assert_eq!(&buf[..2], b"ro");
1635
1636        stream.send.ack_and_drop(12, 7);
1637
1638        stream.send.retransmit(7, 12);
1639
1640        assert!(stream_send_ready(&stream));
1641        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1642        assert_eq!(stream.send.off_front(), 12);
1643        assert_eq!(&buf[..5], b"rldol");
1644
1645        assert!(stream_send_ready(&stream));
1646        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1647        assert_eq!(stream.send.off_front(), 17);
1648        assert_eq!(&buf[..5], b"lehdl");
1649
1650        assert!(stream_send_ready(&stream));
1651        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
1652        assert_eq!(stream.send.off_front(), 20);
1653        assert_eq!(&buf[..2], b"ro");
1654    }
1655
1656    #[test]
1657    fn rangebuf_split_off() {
1658        let mut buf = <RangeBuf>::from(b"helloworld", 5, true);
1659        assert_eq!(buf.start, 0);
1660        assert_eq!(buf.pos, 0);
1661        assert_eq!(buf.len, 10);
1662        assert_eq!(buf.off, 5);
1663        assert!(buf.fin);
1664
1665        assert_eq!(buf.len(), 10);
1666        assert_eq!(buf.off(), 5);
1667        assert!(buf.fin());
1668
1669        assert_eq!(&buf[..], b"helloworld");
1670
1671        // Advance buffer.
1672        buf.consume(5);
1673
1674        assert_eq!(buf.start, 0);
1675        assert_eq!(buf.pos, 5);
1676        assert_eq!(buf.len, 10);
1677        assert_eq!(buf.off, 5);
1678        assert!(buf.fin);
1679
1680        assert_eq!(buf.len(), 5);
1681        assert_eq!(buf.off(), 10);
1682        assert!(buf.fin());
1683
1684        assert_eq!(&buf[..], b"world");
1685
1686        // Split buffer before position.
1687        let mut new_buf = buf.split_off(3);
1688
1689        assert_eq!(buf.start, 0);
1690        assert_eq!(buf.pos, 3);
1691        assert_eq!(buf.len, 3);
1692        assert_eq!(buf.off, 5);
1693        assert!(!buf.fin);
1694
1695        assert_eq!(buf.len(), 0);
1696        assert_eq!(buf.off(), 8);
1697        assert!(!buf.fin());
1698
1699        assert_eq!(&buf[..], b"");
1700
1701        assert_eq!(new_buf.start, 3);
1702        assert_eq!(new_buf.pos, 5);
1703        assert_eq!(new_buf.len, 7);
1704        assert_eq!(new_buf.off, 8);
1705        assert!(new_buf.fin);
1706
1707        assert_eq!(new_buf.len(), 5);
1708        assert_eq!(new_buf.off(), 10);
1709        assert!(new_buf.fin());
1710
1711        assert_eq!(&new_buf[..], b"world");
1712
1713        // Advance buffer.
1714        new_buf.consume(2);
1715
1716        assert_eq!(new_buf.start, 3);
1717        assert_eq!(new_buf.pos, 7);
1718        assert_eq!(new_buf.len, 7);
1719        assert_eq!(new_buf.off, 8);
1720        assert!(new_buf.fin);
1721
1722        assert_eq!(new_buf.len(), 3);
1723        assert_eq!(new_buf.off(), 12);
1724        assert!(new_buf.fin());
1725
1726        assert_eq!(&new_buf[..], b"rld");
1727
1728        // Split buffer after position.
1729        let mut new_new_buf = new_buf.split_off(5);
1730
1731        assert_eq!(new_buf.start, 3);
1732        assert_eq!(new_buf.pos, 7);
1733        assert_eq!(new_buf.len, 5);
1734        assert_eq!(new_buf.off, 8);
1735        assert!(!new_buf.fin);
1736
1737        assert_eq!(new_buf.len(), 1);
1738        assert_eq!(new_buf.off(), 12);
1739        assert!(!new_buf.fin());
1740
1741        assert_eq!(&new_buf[..], b"r");
1742
1743        assert_eq!(new_new_buf.start, 8);
1744        assert_eq!(new_new_buf.pos, 8);
1745        assert_eq!(new_new_buf.len, 2);
1746        assert_eq!(new_new_buf.off, 13);
1747        assert!(new_new_buf.fin);
1748
1749        assert_eq!(new_new_buf.len(), 2);
1750        assert_eq!(new_new_buf.off(), 13);
1751        assert!(new_new_buf.fin());
1752
1753        assert_eq!(&new_new_buf[..], b"ld");
1754
1755        // Advance buffer.
1756        new_new_buf.consume(2);
1757
1758        assert_eq!(new_new_buf.start, 8);
1759        assert_eq!(new_new_buf.pos, 10);
1760        assert_eq!(new_new_buf.len, 2);
1761        assert_eq!(new_new_buf.off, 13);
1762        assert!(new_new_buf.fin);
1763
1764        assert_eq!(new_new_buf.len(), 0);
1765        assert_eq!(new_new_buf.off(), 15);
1766        assert!(new_new_buf.fin());
1767
1768        assert_eq!(&new_new_buf[..], b"");
1769    }
1770
1771    /// RFC9000 2.1: A stream ID that is used out of order results in all
1772    /// streams of that type with lower-numbered stream IDs also being opened.
1773    #[test]
1774    fn stream_limit_auto_open() {
1775        let local_tp = crate::TransportParams::default();
1776        let peer_tp = crate::TransportParams::default();
1777
1778        let mut streams = <StreamMap>::new(5, 5, 5);
1779
1780        let stream_id = 500;
1781        assert!(!is_local(stream_id, true), "stream id is peer initiated");
1782        assert!(is_bidi(stream_id), "stream id is bidirectional");
1783        assert_eq!(
1784            streams
1785                .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1786                .err(),
1787            Some(Error::StreamLimit),
1788            "stream limit should be exceeded"
1789        );
1790    }
1791
1792    /// Stream limit should be satisfied regardless of what order we open
1793    /// streams
1794    #[test]
1795    fn stream_create_out_of_order() {
1796        let local_tp = crate::TransportParams::default();
1797        let peer_tp = crate::TransportParams::default();
1798
1799        let mut streams = <StreamMap>::new(5, 5, 5);
1800
1801        for stream_id in [8, 12, 4] {
1802            assert!(is_local(stream_id, false), "stream id is client initiated");
1803            assert!(is_bidi(stream_id), "stream id is bidirectional");
1804            assert!(streams
1805                .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1806                .is_ok());
1807        }
1808    }
1809
1810    /// Check stream limit boundary cases
1811    #[test]
1812    fn stream_limit_edge() {
1813        let local_tp = crate::TransportParams::default();
1814        let peer_tp = crate::TransportParams::default();
1815
1816        let mut streams = <StreamMap>::new(3, 3, 3);
1817
1818        // Highest permitted
1819        let stream_id = 8;
1820        assert!(streams
1821            .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1822            .is_ok());
1823
1824        // One more than highest permitted
1825        let stream_id = 12;
1826        assert_eq!(
1827            streams
1828                .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1829                .err(),
1830            Some(Error::StreamLimit)
1831        );
1832    }
1833
1834    fn cycle_stream_priority(stream_id: u64, streams: &mut StreamMap) {
1835        let key = streams.get(stream_id).unwrap().priority_key.clone();
1836        streams.update_priority(&key.clone(), &key);
1837    }
1838
1839    #[test]
1840    fn writable_prioritized_default_priority() {
1841        let local_tp = crate::TransportParams::default();
1842        let peer_tp = crate::TransportParams {
1843            initial_max_stream_data_bidi_local: 100,
1844            initial_max_stream_data_uni: 100,
1845            ..Default::default()
1846        };
1847
1848        let mut streams = StreamMap::new(100, 100, 100);
1849
1850        for id in [0, 4, 8, 12] {
1851            assert!(streams
1852                .get_or_create(id, &local_tp, &peer_tp, false, true)
1853                .is_ok());
1854        }
1855
1856        let walk_1: Vec<u64> = streams.writable().collect();
1857        cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1858        let walk_2: Vec<u64> = streams.writable().collect();
1859        cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1860        let walk_3: Vec<u64> = streams.writable().collect();
1861        cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1862        let walk_4: Vec<u64> = streams.writable().collect();
1863        cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1864        let walk_5: Vec<u64> = streams.writable().collect();
1865
1866        // All streams are non-incremental and same urgency by default. Multiple
1867        // visits shuffle their order.
1868        assert_eq!(walk_1, vec![0, 4, 8, 12]);
1869        assert_eq!(walk_2, vec![4, 8, 12, 0]);
1870        assert_eq!(walk_3, vec![8, 12, 0, 4]);
1871        assert_eq!(walk_4, vec![12, 0, 4, 8,]);
1872        assert_eq!(walk_5, vec![0, 4, 8, 12]);
1873    }
1874
1875    #[test]
1876    fn writable_prioritized_insert_order() {
1877        let local_tp = crate::TransportParams::default();
1878        let peer_tp = crate::TransportParams {
1879            initial_max_stream_data_bidi_local: 100,
1880            initial_max_stream_data_uni: 100,
1881            ..Default::default()
1882        };
1883
1884        let mut streams = StreamMap::new(100, 100, 100);
1885
1886        // Inserting same-urgency incremental streams in a "random" order yields
1887        // same order to start with.
1888        for id in [12, 4, 8, 0] {
1889            assert!(streams
1890                .get_or_create(id, &local_tp, &peer_tp, false, true)
1891                .is_ok());
1892        }
1893
1894        let walk_1: Vec<u64> = streams.writable().collect();
1895        cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1896        let walk_2: Vec<u64> = streams.writable().collect();
1897        cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1898        let walk_3: Vec<u64> = streams.writable().collect();
1899        cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1900        let walk_4: Vec<u64> = streams.writable().collect();
1901        cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1902        let walk_5: Vec<u64> = streams.writable().collect();
1903        assert_eq!(walk_1, vec![12, 4, 8, 0]);
1904        assert_eq!(walk_2, vec![4, 8, 0, 12]);
1905        assert_eq!(walk_3, vec![8, 0, 12, 4,]);
1906        assert_eq!(walk_4, vec![0, 12, 4, 8]);
1907        assert_eq!(walk_5, vec![12, 4, 8, 0]);
1908    }
1909
1910    #[test]
1911    fn writable_prioritized_mixed_urgency() {
1912        let local_tp = crate::TransportParams::default();
1913        let peer_tp = crate::TransportParams {
1914            initial_max_stream_data_bidi_local: 100,
1915            initial_max_stream_data_uni: 100,
1916            ..Default::default()
1917        };
1918
1919        let mut streams = <StreamMap>::new(100, 100, 100);
1920
1921        // Streams where the urgency descends (becomes more important). No stream
1922        // shares an urgency.
1923        let input = vec![
1924            (0, 100),
1925            (4, 90),
1926            (8, 80),
1927            (12, 70),
1928            (16, 60),
1929            (20, 50),
1930            (24, 40),
1931            (28, 30),
1932            (32, 20),
1933            (36, 10),
1934            (40, 0),
1935        ];
1936
1937        for (id, urgency) in input.clone() {
1938            // this duplicates some code from stream_priority in order to access
1939            // streams and the collection they're in
1940            let stream = streams
1941                .get_or_create(id, &local_tp, &peer_tp, false, true)
1942                .unwrap();
1943
1944            stream.urgency = urgency;
1945
1946            let new_priority_key = Arc::new(StreamPriorityKey {
1947                urgency: stream.urgency,
1948                incremental: stream.incremental,
1949                id,
1950                ..Default::default()
1951            });
1952
1953            let old_priority_key = std::mem::replace(
1954                &mut stream.priority_key,
1955                new_priority_key.clone(),
1956            );
1957
1958            streams.update_priority(&old_priority_key, &new_priority_key);
1959        }
1960
1961        let walk_1: Vec<u64> = streams.writable().collect();
1962        assert_eq!(walk_1, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
1963
1964        // Re-applying priority to a stream does not cause duplication.
1965        for (id, urgency) in input {
1966            // this duplicates some code from stream_priority in order to access
1967            // streams and the collection they're in
1968            let stream = streams
1969                .get_or_create(id, &local_tp, &peer_tp, false, true)
1970                .unwrap();
1971
1972            stream.urgency = urgency;
1973
1974            let new_priority_key = Arc::new(StreamPriorityKey {
1975                urgency: stream.urgency,
1976                incremental: stream.incremental,
1977                id,
1978                ..Default::default()
1979            });
1980
1981            let old_priority_key = std::mem::replace(
1982                &mut stream.priority_key,
1983                new_priority_key.clone(),
1984            );
1985
1986            streams.update_priority(&old_priority_key, &new_priority_key);
1987        }
1988
1989        let walk_2: Vec<u64> = streams.writable().collect();
1990        assert_eq!(walk_2, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
1991
1992        // Removing streams doesn't break expected ordering.
1993        streams.collect(24, true);
1994
1995        let walk_3: Vec<u64> = streams.writable().collect();
1996        assert_eq!(walk_3, vec![40, 36, 32, 28, 20, 16, 12, 8, 4, 0]);
1997
1998        streams.collect(40, true);
1999        streams.collect(0, true);
2000
2001        let walk_4: Vec<u64> = streams.writable().collect();
2002        assert_eq!(walk_4, vec![36, 32, 28, 20, 16, 12, 8, 4]);
2003
2004        // Adding streams doesn't break expected ordering.
2005        streams
2006            .get_or_create(44, &local_tp, &peer_tp, false, true)
2007            .unwrap();
2008
2009        let walk_5: Vec<u64> = streams.writable().collect();
2010        assert_eq!(walk_5, vec![36, 32, 28, 20, 16, 12, 8, 4, 44]);
2011    }
2012
2013    #[test]
2014    fn writable_prioritized_mixed_urgencies_incrementals() {
2015        let local_tp = crate::TransportParams::default();
2016        let peer_tp = crate::TransportParams {
2017            initial_max_stream_data_bidi_local: 100,
2018            initial_max_stream_data_uni: 100,
2019            ..Default::default()
2020        };
2021
2022        let mut streams = StreamMap::new(100, 100, 100);
2023
2024        // Streams that share some urgency level
2025        let input = vec![
2026            (0, 100),
2027            (4, 20),
2028            (8, 100),
2029            (12, 20),
2030            (16, 90),
2031            (20, 25),
2032            (24, 90),
2033            (28, 30),
2034            (32, 80),
2035            (36, 20),
2036            (40, 0),
2037        ];
2038
2039        for (id, urgency) in input.clone() {
2040            // this duplicates some code from stream_priority in order to access
2041            // streams and the collection they're in
2042            let stream = streams
2043                .get_or_create(id, &local_tp, &peer_tp, false, true)
2044                .unwrap();
2045
2046            stream.urgency = urgency;
2047
2048            let new_priority_key = Arc::new(StreamPriorityKey {
2049                urgency: stream.urgency,
2050                incremental: stream.incremental,
2051                id,
2052                ..Default::default()
2053            });
2054
2055            let old_priority_key = std::mem::replace(
2056                &mut stream.priority_key,
2057                new_priority_key.clone(),
2058            );
2059
2060            streams.update_priority(&old_priority_key, &new_priority_key);
2061        }
2062
2063        let walk_1: Vec<u64> = streams.writable().collect();
2064        cycle_stream_priority(4, &mut streams);
2065        cycle_stream_priority(16, &mut streams);
2066        cycle_stream_priority(0, &mut streams);
2067        let walk_2: Vec<u64> = streams.writable().collect();
2068        cycle_stream_priority(12, &mut streams);
2069        cycle_stream_priority(24, &mut streams);
2070        cycle_stream_priority(8, &mut streams);
2071        let walk_3: Vec<u64> = streams.writable().collect();
2072        cycle_stream_priority(36, &mut streams);
2073        cycle_stream_priority(16, &mut streams);
2074        cycle_stream_priority(0, &mut streams);
2075        let walk_4: Vec<u64> = streams.writable().collect();
2076        cycle_stream_priority(4, &mut streams);
2077        cycle_stream_priority(24, &mut streams);
2078        cycle_stream_priority(8, &mut streams);
2079        let walk_5: Vec<u64> = streams.writable().collect();
2080        cycle_stream_priority(12, &mut streams);
2081        cycle_stream_priority(16, &mut streams);
2082        cycle_stream_priority(0, &mut streams);
2083        let walk_6: Vec<u64> = streams.writable().collect();
2084        cycle_stream_priority(36, &mut streams);
2085        cycle_stream_priority(24, &mut streams);
2086        cycle_stream_priority(8, &mut streams);
2087        let walk_7: Vec<u64> = streams.writable().collect();
2088        cycle_stream_priority(4, &mut streams);
2089        cycle_stream_priority(16, &mut streams);
2090        cycle_stream_priority(0, &mut streams);
2091        let walk_8: Vec<u64> = streams.writable().collect();
2092        cycle_stream_priority(12, &mut streams);
2093        cycle_stream_priority(24, &mut streams);
2094        cycle_stream_priority(8, &mut streams);
2095        let walk_9: Vec<u64> = streams.writable().collect();
2096        cycle_stream_priority(36, &mut streams);
2097        cycle_stream_priority(16, &mut streams);
2098        cycle_stream_priority(0, &mut streams);
2099
2100        assert_eq!(walk_1, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2101        assert_eq!(walk_2, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2102        assert_eq!(walk_3, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2103        assert_eq!(walk_4, vec![40, 4, 12, 36, 20, 28, 32, 24, 16, 8, 0]);
2104        assert_eq!(walk_5, vec![40, 12, 36, 4, 20, 28, 32, 16, 24, 0, 8]);
2105        assert_eq!(walk_6, vec![40, 36, 4, 12, 20, 28, 32, 24, 16, 8, 0]);
2106        assert_eq!(walk_7, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2107        assert_eq!(walk_8, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2108        assert_eq!(walk_9, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2109
2110        // Removing streams doesn't break expected ordering.
2111        streams.collect(20, true);
2112
2113        let walk_10: Vec<u64> = streams.writable().collect();
2114        assert_eq!(walk_10, vec![40, 4, 12, 36, 28, 32, 24, 16, 8, 0]);
2115
2116        // Adding streams doesn't break expected ordering.
2117        let stream = streams
2118            .get_or_create(44, &local_tp, &peer_tp, false, true)
2119            .unwrap();
2120
2121        stream.urgency = 20;
2122        stream.incremental = true;
2123
2124        let new_priority_key = Arc::new(StreamPriorityKey {
2125            urgency: stream.urgency,
2126            incremental: stream.incremental,
2127            id: 44,
2128            ..Default::default()
2129        });
2130
2131        let old_priority_key =
2132            std::mem::replace(&mut stream.priority_key, new_priority_key.clone());
2133
2134        streams.update_priority(&old_priority_key, &new_priority_key);
2135
2136        let walk_11: Vec<u64> = streams.writable().collect();
2137        assert_eq!(walk_11, vec![40, 4, 12, 36, 44, 28, 32, 24, 16, 8, 0]);
2138    }
2139
2140    #[test]
2141    fn priority_tree_dupes() {
2142        let mut prioritized_writable: RBTree<StreamWritablePriorityAdapter> =
2143            Default::default();
2144
2145        for id in [0, 4, 8, 12] {
2146            let s = Arc::new(StreamPriorityKey {
2147                urgency: 0,
2148                incremental: false,
2149                id,
2150                ..Default::default()
2151            });
2152
2153            prioritized_writable.insert(s);
2154        }
2155
2156        let walk_1: Vec<u64> =
2157            prioritized_writable.iter().map(|s| s.id).collect();
2158        assert_eq!(walk_1, vec![0, 4, 8, 12]);
2159
2160        // Default keys could cause duplicate entries, this is normally protected
2161        // against via StreamMap.
2162        for id in [0, 4, 8, 12] {
2163            let s = Arc::new(StreamPriorityKey {
2164                urgency: 0,
2165                incremental: false,
2166                id,
2167                ..Default::default()
2168            });
2169
2170            prioritized_writable.insert(s);
2171        }
2172
2173        let walk_2: Vec<u64> =
2174            prioritized_writable.iter().map(|s| s.id).collect();
2175        assert_eq!(walk_2, vec![0, 0, 4, 4, 8, 8, 12, 12]);
2176    }
2177}
2178
2179mod recv_buf;
2180mod send_buf;