quiche/stream/
mod.rs

1// Copyright (C) 2018-2019, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::cmp;
28
29use std::sync::Arc;
30
31use std::collections::hash_map;
32use std::collections::HashMap;
33use std::collections::HashSet;
34
35use intrusive_collections::intrusive_adapter;
36use intrusive_collections::KeyAdapter;
37use intrusive_collections::RBTree;
38use intrusive_collections::RBTreeAtomicLink;
39
40use smallvec::SmallVec;
41
42use crate::range_buf::DefaultBufFactory;
43use crate::BufFactory;
44use crate::Error;
45use crate::Result;
46
/// The urgency given to a `StreamPriorityKey` by default (lower is better).
const DEFAULT_URGENCY: u8 = 127;

/// The default size of the receiver stream flow control window.
const DEFAULT_STREAM_WINDOW: u64 = 32 * 1024;

/// The maximum size of the receiver stream flow control window.
pub const MAX_STREAM_WINDOW: u64 = 16 * 1024 * 1024;
54
/// A simple no-op hasher for Stream IDs.
///
/// The QUIC protocol and quiche library guarantees stream ID uniqueness, so
/// we can save effort by avoiding using a more complicated algorithm.
///
/// The hash of a key is the last `u64` written through `write_u64()`. Only
/// `u64` keys are supported: feeding raw bytes through `write()` panics.
#[derive(Default)]
pub struct StreamIdHasher {
    id: u64,
}

impl std::hash::Hasher for StreamIdHasher {
    /// Returns the stored stream ID unchanged.
    #[inline]
    fn finish(&self) -> u64 {
        self.id
    }

    /// Stores the stream ID; it is later returned verbatim by `finish()`.
    #[inline]
    fn write_u64(&mut self, id: u64) {
        self.id = id;
    }

    /// Not supported: always panics.
    #[inline]
    fn write(&mut self, _: &[u8]) {
        // The trait requires a `write()` implementation, but stream IDs are
        // always hashed through `write_u64()`. This method does NOT delegate
        // to `write_u64()`: reaching it means a non-u64 key was hashed, which
        // is a programming error, so fail loudly.
        unimplemented!("StreamIdHasher only supports u64 keys")
    }
}
82
/// Builds `StreamIdHasher` instances for the stream ID collections below.
type BuildStreamIdHasher = std::hash::BuildHasherDefault<StreamIdHasher>;

/// A `HashMap` keyed by stream ID (`u64`) that hashes with `StreamIdHasher`.
pub type StreamIdHashMap<V> = HashMap<u64, V, BuildStreamIdHasher>;
/// A `HashSet` of stream IDs (`u64`) that hashes with `StreamIdHasher`.
pub type StreamIdHashSet = HashSet<u64, BuildStreamIdHasher>;
87
/// Keeps track of QUIC streams and enforces stream limits.
#[derive(Default)]
pub struct StreamMap<F: BufFactory = DefaultBufFactory> {
    /// Map of streams indexed by stream ID.
    streams: StreamIdHashMap<Stream<F>>,

    /// Set of streams that were completed and garbage collected.
    ///
    /// Instead of keeping the full stream state forever, we collect completed
    /// streams to save memory, but we still need to keep track of previously
    /// created streams, to prevent peers from re-creating them.
    collected: StreamIdHashSet,

    /// Peer's maximum bidirectional stream count limit.
    peer_max_streams_bidi: u64,

    /// Peer's maximum unidirectional stream count limit.
    peer_max_streams_uni: u64,

    /// The total number of bidirectional streams opened by the peer.
    peer_opened_streams_bidi: u64,

    /// The total number of unidirectional streams opened by the peer.
    peer_opened_streams_uni: u64,

    /// Local maximum bidirectional stream count limit.
    local_max_streams_bidi: u64,
    /// Pending value for `local_max_streams_bidi`. It is incremented when a
    /// peer-created bidirectional stream is collected, and copied into
    /// `local_max_streams_bidi` by `update_max_streams_bidi()`.
    local_max_streams_bidi_next: u64,

    /// Local maximum unidirectional stream count limit.
    local_max_streams_uni: u64,
    /// Pending value for `local_max_streams_uni`. It is incremented when a
    /// peer-created unidirectional stream is collected, and copied into
    /// `local_max_streams_uni` by `update_max_streams_uni()`.
    local_max_streams_uni_next: u64,

    /// The total number of bidirectional streams opened by the local endpoint.
    local_opened_streams_bidi: u64,

    /// The total number of unidirectional streams opened by the local endpoint.
    local_opened_streams_uni: u64,

    /// Queue of stream IDs corresponding to streams that have buffered data
    /// ready to be sent to the peer. This also implies that the stream has
    /// enough flow control credits to send at least some of that data.
    ///
    /// Entries are the streams' priority keys, linked through their
    /// `flushable` link.
    flushable: RBTree<StreamFlushablePriorityAdapter>,

    /// Set of stream IDs corresponding to streams that have outstanding data
    /// to read. This is used to generate a `StreamIter` of streams without
    /// having to iterate over the full list of streams.
    ///
    /// Entries are the streams' priority keys, linked through their
    /// `readable` link.
    pub readable: RBTree<StreamReadablePriorityAdapter>,

    /// Set of stream IDs corresponding to streams that have enough flow control
    /// capacity to be written to, and is not finished. This is used to generate
    /// a `StreamIter` of streams without having to iterate over the full list
    /// of streams.
    ///
    /// Entries are the streams' priority keys, linked through their
    /// `writable` link.
    pub writable: RBTree<StreamWritablePriorityAdapter>,

    /// Set of stream IDs corresponding to streams that are almost out of flow
    /// control credit and need to send MAX_STREAM_DATA. This is used to
    /// generate a `StreamIter` of streams without having to iterate over the
    /// full list of streams.
    almost_full: StreamIdHashSet,

    /// Set of stream IDs corresponding to streams that are blocked. The value
    /// of the map elements represents the offset of the stream at which the
    /// blocking occurred.
    blocked: StreamIdHashMap<u64>,

    /// Set of stream IDs corresponding to streams that are reset. The value
    /// of the map elements is a tuple of the error code and final size values
    /// to include in the RESET_STREAM frame.
    reset: StreamIdHashMap<(u64, u64)>,

    /// Set of stream IDs corresponding to streams that are shutdown on the
    /// receive side, and need to send a STOP_SENDING frame. The value of the
    /// map elements is the error code to include in the STOP_SENDING frame.
    stopped: StreamIdHashMap<u64>,

    /// The maximum size of a stream window. Passed to every `Stream` created
    /// by `get_or_create()`.
    max_stream_window: u64,
}
167
168impl<F: BufFactory> StreamMap<F> {
169    pub fn new(
170        max_streams_bidi: u64, max_streams_uni: u64, max_stream_window: u64,
171    ) -> Self {
172        StreamMap {
173            local_max_streams_bidi: max_streams_bidi,
174            local_max_streams_bidi_next: max_streams_bidi,
175
176            local_max_streams_uni: max_streams_uni,
177            local_max_streams_uni_next: max_streams_uni,
178
179            max_stream_window,
180
181            ..StreamMap::default()
182        }
183    }
184
    /// Returns a shared reference to the stream with the given ID, or `None`
    /// if the map currently holds no stream with that ID.
    pub fn get(&self, id: u64) -> Option<&Stream<F>> {
        self.streams.get(&id)
    }
189
    /// Returns a mutable reference to the stream with the given ID, or `None`
    /// if the map currently holds no stream with that ID.
    pub fn get_mut(&mut self, id: u64) -> Option<&mut Stream<F>> {
        self.streams.get_mut(&id)
    }
194
195    /// Returns the mutable stream with the given ID if it exists, or creates
196    /// a new one otherwise.
197    ///
198    /// The `local` parameter indicates whether the stream's creation was
199    /// requested by the local application rather than the peer, and is
200    /// used to validate the requested stream ID, and to select the initial
201    /// flow control values from the local and remote transport parameters
202    /// (also passed as arguments).
203    ///
204    /// This also takes care of enforcing both local and the peer's stream
205    /// count limits. If one of these limits is violated, the `StreamLimit`
206    /// error is returned.
207    pub(crate) fn get_or_create(
208        &mut self, id: u64, local_params: &crate::TransportParams,
209        peer_params: &crate::TransportParams, local: bool, is_server: bool,
210    ) -> Result<&mut Stream<F>> {
211        let (stream, is_new_and_writable) = match self.streams.entry(id) {
212            hash_map::Entry::Vacant(v) => {
213                // Stream has already been closed and garbage collected.
214                if self.collected.contains(&id) {
215                    return Err(Error::Done);
216                }
217
218                if local != is_local(id, is_server) {
219                    return Err(Error::InvalidStreamState(id));
220                }
221
222                let (max_rx_data, max_tx_data) = match (local, is_bidi(id)) {
223                    // Locally-initiated bidirectional stream.
224                    (true, true) => (
225                        local_params.initial_max_stream_data_bidi_local,
226                        peer_params.initial_max_stream_data_bidi_remote,
227                    ),
228
229                    // Locally-initiated unidirectional stream.
230                    (true, false) => (0, peer_params.initial_max_stream_data_uni),
231
232                    // Remotely-initiated bidirectional stream.
233                    (false, true) => (
234                        local_params.initial_max_stream_data_bidi_remote,
235                        peer_params.initial_max_stream_data_bidi_local,
236                    ),
237
238                    // Remotely-initiated unidirectional stream.
239                    (false, false) =>
240                        (local_params.initial_max_stream_data_uni, 0),
241                };
242
243                // The two least significant bits from a stream id identify the
244                // type of stream. Truncate those bits to get the sequence for
245                // that stream type.
246                let stream_sequence = id >> 2;
247
248                // Enforce stream count limits.
249                match (is_local(id, is_server), is_bidi(id)) {
250                    (true, true) => {
251                        let n = std::cmp::max(
252                            self.local_opened_streams_bidi,
253                            stream_sequence + 1,
254                        );
255
256                        if n > self.peer_max_streams_bidi {
257                            return Err(Error::StreamLimit);
258                        }
259
260                        self.local_opened_streams_bidi = n;
261                    },
262
263                    (true, false) => {
264                        let n = std::cmp::max(
265                            self.local_opened_streams_uni,
266                            stream_sequence + 1,
267                        );
268
269                        if n > self.peer_max_streams_uni {
270                            return Err(Error::StreamLimit);
271                        }
272
273                        self.local_opened_streams_uni = n;
274                    },
275
276                    (false, true) => {
277                        let n = std::cmp::max(
278                            self.peer_opened_streams_bidi,
279                            stream_sequence + 1,
280                        );
281
282                        if n > self.local_max_streams_bidi {
283                            return Err(Error::StreamLimit);
284                        }
285
286                        self.peer_opened_streams_bidi = n;
287                    },
288
289                    (false, false) => {
290                        let n = std::cmp::max(
291                            self.peer_opened_streams_uni,
292                            stream_sequence + 1,
293                        );
294
295                        if n > self.local_max_streams_uni {
296                            return Err(Error::StreamLimit);
297                        }
298
299                        self.peer_opened_streams_uni = n;
300                    },
301                };
302
303                let s = Stream::new(
304                    id,
305                    max_rx_data,
306                    max_tx_data,
307                    is_bidi(id),
308                    local,
309                    self.max_stream_window,
310                );
311
312                let is_writable = s.is_writable();
313
314                (v.insert(s), is_writable)
315            },
316
317            hash_map::Entry::Occupied(v) => (v.into_mut(), false),
318        };
319
320        // Newly created stream might already be writable due to initial flow
321        // control limits.
322        if is_new_and_writable {
323            self.writable.insert(Arc::clone(&stream.priority_key));
324        }
325
326        Ok(stream)
327    }
328
329    /// Adds the stream ID to the readable streams set.
330    ///
331    /// If the stream was already in the list, this does nothing.
332    pub fn insert_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
333        if !priority_key.readable.is_linked() {
334            self.readable.insert(Arc::clone(priority_key));
335        }
336    }
337
338    /// Removes the stream ID from the readable streams set.
339    pub fn remove_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
340        if !priority_key.readable.is_linked() {
341            return;
342        }
343
344        let mut c = {
345            let ptr = Arc::as_ptr(priority_key);
346            unsafe { self.readable.cursor_mut_from_ptr(ptr) }
347        };
348
349        c.remove();
350    }
351
352    /// Adds the stream ID to the writable streams set.
353    ///
354    /// This should also be called anytime a new stream is created, in addition
355    /// to when an existing stream becomes writable.
356    ///
357    /// If the stream was already in the list, this does nothing.
358    pub fn insert_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
359        if !priority_key.writable.is_linked() {
360            self.writable.insert(Arc::clone(priority_key));
361        }
362    }
363
364    /// Removes the stream ID from the writable streams set.
365    ///
366    /// This should also be called anytime an existing stream stops being
367    /// writable.
368    pub fn remove_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
369        if !priority_key.writable.is_linked() {
370            return;
371        }
372
373        let mut c = {
374            let ptr = Arc::as_ptr(priority_key);
375            unsafe { self.writable.cursor_mut_from_ptr(ptr) }
376        };
377
378        c.remove();
379    }
380
381    /// Adds the stream ID to the flushable streams set.
382    ///
383    /// If the stream was already in the list, this does nothing.
384    pub fn insert_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
385        if !priority_key.flushable.is_linked() {
386            self.flushable.insert(Arc::clone(priority_key));
387        }
388    }
389
390    /// Removes the stream ID from the flushable streams set.
391    pub fn remove_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
392        if !priority_key.flushable.is_linked() {
393            return;
394        }
395
396        let mut c = {
397            let ptr = Arc::as_ptr(priority_key);
398            unsafe { self.flushable.cursor_mut_from_ptr(ptr) }
399        };
400
401        c.remove();
402    }
403
    /// Returns a new handle to the priority key at the front of the flushable
    /// streams set, if any, without removing it from the set.
    pub fn peek_flushable(&self) -> Option<Arc<StreamPriorityKey>> {
        self.flushable.front().clone_pointer()
    }
407
408    /// Updates the priorities of a stream.
409    pub fn update_priority(
410        &mut self, old: &Arc<StreamPriorityKey>, new: &Arc<StreamPriorityKey>,
411    ) {
412        if old.readable.is_linked() {
413            self.remove_readable(old);
414            self.readable.insert(Arc::clone(new));
415        }
416
417        if old.writable.is_linked() {
418            self.remove_writable(old);
419            self.writable.insert(Arc::clone(new));
420        }
421
422        if old.flushable.is_linked() {
423            self.remove_flushable(old);
424            self.flushable.insert(Arc::clone(new));
425        }
426    }
427
    /// Adds the stream ID to the almost full streams set, i.e. the streams
    /// reported by `almost_full()`.
    ///
    /// If the stream was already in the set, this does nothing.
    pub fn insert_almost_full(&mut self, stream_id: u64) {
        self.almost_full.insert(stream_id);
    }
434
    /// Removes the stream ID from the almost full streams set.
    ///
    /// If the stream was not in the set, this does nothing.
    pub fn remove_almost_full(&mut self, stream_id: u64) {
        self.almost_full.remove(&stream_id);
    }
439
    /// Adds the stream ID to the blocked streams set with the
    /// given offset value.
    ///
    /// If the stream was already in the set, its recorded offset is replaced
    /// with `off`.
    pub fn insert_blocked(&mut self, stream_id: u64, off: u64) {
        self.blocked.insert(stream_id, off);
    }
447
    /// Removes the stream ID, and its recorded offset, from the blocked
    /// streams set.
    ///
    /// If the stream was not in the set, this does nothing.
    pub fn remove_blocked(&mut self, stream_id: u64) {
        self.blocked.remove(&stream_id);
    }
452
    /// Adds the stream ID to the reset streams set with the
    /// given error code and final size values.
    ///
    /// If the stream was already in the set, its recorded error code and
    /// final size are replaced with the given values.
    pub fn insert_reset(
        &mut self, stream_id: u64, error_code: u64, final_size: u64,
    ) {
        self.reset.insert(stream_id, (error_code, final_size));
    }
462
    /// Removes the stream ID, and its recorded error code and final size,
    /// from the reset streams set.
    ///
    /// If the stream was not in the set, this does nothing.
    pub fn remove_reset(&mut self, stream_id: u64) {
        self.reset.remove(&stream_id);
    }
467
    /// Adds the stream ID to the stopped streams set with the
    /// given error code.
    ///
    /// If the stream was already in the set, its recorded error code is
    /// replaced with `error_code`.
    pub fn insert_stopped(&mut self, stream_id: u64, error_code: u64) {
        self.stopped.insert(stream_id, error_code);
    }
475
    /// Removes the stream ID, and its recorded error code, from the stopped
    /// streams set.
    ///
    /// If the stream was not in the set, this does nothing.
    pub fn remove_stopped(&mut self, stream_id: u64) {
        self.stopped.remove(&stream_id);
    }
480
481    /// Updates the peer's maximum bidirectional stream count limit.
482    pub fn update_peer_max_streams_bidi(&mut self, v: u64) {
483        self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v);
484    }
485
486    /// Updates the peer's maximum unidirectional stream count limit.
487    pub fn update_peer_max_streams_uni(&mut self, v: u64) {
488        self.peer_max_streams_uni = cmp::max(self.peer_max_streams_uni, v);
489    }
490
    /// Commits the new max_streams_bidi limit, i.e. makes the pending limit
    /// returned by `max_streams_bidi_next()` the current one.
    pub fn update_max_streams_bidi(&mut self) {
        self.local_max_streams_bidi = self.local_max_streams_bidi_next;
    }
495
    /// Sets the max_streams_bidi limit to the given value.
    ///
    /// Both the current and the pending limit are overwritten with `max`.
    pub fn set_max_streams_bidi(&mut self, max: u64) {
        self.local_max_streams_bidi = max;
        self.local_max_streams_bidi_next = max;
    }
501
    /// Returns the current max_streams_bidi limit, which is the one enforced
    /// on peer-initiated bidirectional streams by `get_or_create()`.
    pub fn max_streams_bidi(&self) -> u64 {
        self.local_max_streams_bidi
    }
506
    /// Returns the new (pending) max_streams_bidi limit, i.e. the value that
    /// `update_max_streams_bidi()` would commit.
    // NOTE(review): takes `&mut self` although it only reads a field.
    pub fn max_streams_bidi_next(&mut self) -> u64 {
        self.local_max_streams_bidi_next
    }
511
    /// Commits the new max_streams_uni limit, i.e. makes the pending limit
    /// returned by `max_streams_uni_next()` the current one.
    pub fn update_max_streams_uni(&mut self) {
        self.local_max_streams_uni = self.local_max_streams_uni_next;
    }
516
    /// Returns the new (pending) max_streams_uni limit, i.e. the value that
    /// `update_max_streams_uni()` would commit.
    // NOTE(review): takes `&mut self` although it only reads a field.
    pub fn max_streams_uni_next(&mut self) -> u64 {
        self.local_max_streams_uni_next
    }
521
    /// Returns the number of bidirectional streams that can be created
    /// before the peer's stream count limit is reached.
    ///
    /// This is the peer's limit minus the number of bidirectional streams
    /// opened locally so far. `get_or_create()` refuses to raise that count
    /// above the peer's limit, which keeps the subtraction from underflowing.
    pub fn peer_streams_left_bidi(&self) -> u64 {
        self.peer_max_streams_bidi - self.local_opened_streams_bidi
    }
527
    /// Returns the number of unidirectional streams that can be created
    /// before the peer's stream count limit is reached.
    ///
    /// This is the peer's limit minus the number of unidirectional streams
    /// opened locally so far. `get_or_create()` refuses to raise that count
    /// above the peer's limit, which keeps the subtraction from underflowing.
    pub fn peer_streams_left_uni(&self) -> u64 {
        self.peer_max_streams_uni - self.local_opened_streams_uni
    }
533
534    /// Drops completed stream.
535    ///
536    /// This should only be called when Stream::is_complete() returns true for
537    /// the given stream.
538    pub fn collect(&mut self, stream_id: u64, local: bool) {
539        if !local {
540            // If the stream was created by the peer, give back a max streams
541            // credit.
542            if is_bidi(stream_id) {
543                self.local_max_streams_bidi_next =
544                    self.local_max_streams_bidi_next.saturating_add(1);
545            } else {
546                self.local_max_streams_uni_next =
547                    self.local_max_streams_uni_next.saturating_add(1);
548            }
549        }
550
551        let s = self.streams.remove(&stream_id).unwrap();
552
553        self.remove_readable(&s.priority_key);
554
555        self.remove_writable(&s.priority_key);
556
557        self.remove_flushable(&s.priority_key);
558
559        self.collected.insert(stream_id);
560    }
561
562    /// Creates an iterator over streams that have outstanding data to read.
563    pub fn readable(&self) -> StreamIter {
564        StreamIter {
565            streams: self.readable.iter().map(|s| s.id).collect(),
566            index: 0,
567        }
568    }
569
570    /// Creates an iterator over streams that can be written to.
571    pub fn writable(&self) -> StreamIter {
572        StreamIter {
573            streams: self.writable.iter().map(|s| s.id).collect(),
574            index: 0,
575        }
576    }
577
    /// Creates an iterator over streams that need to send MAX_STREAM_DATA.
    ///
    /// The iterator holds a copy of the IDs in the almost full streams set.
    pub fn almost_full(&self) -> StreamIter {
        StreamIter::from(&self.almost_full)
    }
582
    /// Creates an iterator over streams that need to send STREAM_DATA_BLOCKED.
    ///
    /// Items are pairs of stream ID and the offset recorded for that stream.
    pub fn blocked(&self) -> hash_map::Iter<u64, u64> {
        self.blocked.iter()
    }
587
    /// Creates an iterator over streams that need to send RESET_STREAM.
    ///
    /// Items are pairs of stream ID and the `(error code, final size)` tuple
    /// recorded for that stream.
    pub fn reset(&self) -> hash_map::Iter<u64, (u64, u64)> {
        self.reset.iter()
    }
592
    /// Creates an iterator over streams that need to send STOP_SENDING.
    ///
    /// Items are pairs of stream ID and the error code recorded for that
    /// stream.
    pub fn stopped(&self) -> hash_map::Iter<u64, u64> {
        self.stopped.iter()
    }
597
    /// Returns true if the stream has been collected, i.e. it was dropped
    /// through `collect()`.
    pub fn is_collected(&self, stream_id: u64) -> bool {
        self.collected.contains(&stream_id)
    }
602
    /// Returns true if there are any streams that have data to write, i.e.
    /// the flushable streams set is not empty.
    pub fn has_flushable(&self) -> bool {
        !self.flushable.is_empty()
    }
607
    /// Returns true if there are any streams that have data to read, i.e.
    /// the readable streams set is not empty.
    pub fn has_readable(&self) -> bool {
        !self.readable.is_empty()
    }
612
    /// Returns true if there are any streams that need to update the local
    /// flow control limit, i.e. the almost full streams set is not empty.
    pub fn has_almost_full(&self) -> bool {
        !self.almost_full.is_empty()
    }
618
    /// Returns true if there are any streams that are blocked, i.e. the
    /// blocked streams set is not empty.
    pub fn has_blocked(&self) -> bool {
        !self.blocked.is_empty()
    }
623
    /// Returns true if there are any streams that are reset, i.e. the reset
    /// streams set is not empty.
    pub fn has_reset(&self) -> bool {
        !self.reset.is_empty()
    }
628
    /// Returns true if there are any streams that need to send STOP_SENDING,
    /// i.e. the stopped streams set is not empty.
    pub fn has_stopped(&self) -> bool {
        !self.stopped.is_empty()
    }
633
634    /// Returns true if the max bidirectional streams count needs to be updated
635    /// by sending a MAX_STREAMS frame to the peer.
636    pub fn should_update_max_streams_bidi(&self) -> bool {
637        self.local_max_streams_bidi_next != self.local_max_streams_bidi &&
638            self.local_max_streams_bidi_next / 2 >
639                self.local_max_streams_bidi - self.peer_opened_streams_bidi
640    }
641
642    /// Returns true if the max unidirectional streams count needs to be updated
643    /// by sending a MAX_STREAMS frame to the peer.
644    pub fn should_update_max_streams_uni(&self) -> bool {
645        self.local_max_streams_uni_next != self.local_max_streams_uni &&
646            self.local_max_streams_uni_next / 2 >
647                self.local_max_streams_uni - self.peer_opened_streams_uni
648    }
649
    /// Returns the number of active streams in the map. Collected streams
    /// are not counted, as `collect()` removes them from the map.
    ///
    /// Only available in test builds.
    #[cfg(test)]
    pub fn len(&self) -> usize {
        self.streams.len()
    }
655}
656
/// A QUIC stream.
///
/// Bundles the receive and send buffers of a stream together with the
/// properties needed to schedule it.
pub struct Stream<F: BufFactory = DefaultBufFactory> {
    /// Receive-side stream buffer.
    pub recv: recv_buf::RecvBuf,

    /// Send-side stream buffer.
    pub send: send_buf::SendBuf<F>,

    /// Low watermark for writability: `is_writable()` only reports the stream
    /// as writable if the send buffer's `off_back()` plus this value is below
    /// its `max_off()`. Initialized to 1 by `Stream::new()`.
    pub send_lowat: usize,

    /// Whether the stream is bidirectional.
    pub bidi: bool,

    /// Whether the stream was created by the local endpoint.
    pub local: bool,

    /// The stream's urgency (lower is better). Default is `DEFAULT_URGENCY`.
    pub urgency: u8,

    /// Whether the stream can be flushed incrementally. Default is `true`.
    pub incremental: bool,

    /// Shared key that `StreamMap` links into its readable, writable and
    /// flushable sets on behalf of this stream.
    pub priority_key: Arc<StreamPriorityKey>,
}
681
682impl<F: BufFactory> Stream<F> {
683    /// Creates a new stream with the given flow control limits.
684    pub fn new(
685        id: u64, max_rx_data: u64, max_tx_data: u64, bidi: bool, local: bool,
686        max_window: u64,
687    ) -> Self {
688        let priority_key = Arc::new(StreamPriorityKey {
689            id,
690            ..Default::default()
691        });
692
693        Stream {
694            recv: recv_buf::RecvBuf::new(max_rx_data, max_window),
695            send: send_buf::SendBuf::new(max_tx_data),
696            send_lowat: 1,
697            bidi,
698            local,
699            urgency: priority_key.urgency,
700            incremental: priority_key.incremental,
701            priority_key,
702        }
703    }
704
    /// Returns true if the stream has data to read, as reported by the
    /// receive buffer's `ready()`.
    pub fn is_readable(&self) -> bool {
        self.recv.ready()
    }
709
710    /// Returns true if the stream has enough flow control capacity to be
711    /// written to, and is not finished.
712    pub fn is_writable(&self) -> bool {
713        !self.send.is_shutdown() &&
714            !self.send.is_fin() &&
715            (self.send.off_back() + self.send_lowat as u64) <
716                self.send.max_off()
717    }
718
719    /// Returns true if the stream has data to send and is allowed to send at
720    /// least some of it.
721    pub fn is_flushable(&self) -> bool {
722        let off_front = self.send.off_front();
723
724        !self.send.is_empty() &&
725            off_front < self.send.off_back() &&
726            off_front < self.send.max_off()
727    }
728
729    /// Returns true if the stream is complete.
730    ///
731    /// For bidirectional streams this happens when both the receive and send
732    /// sides are complete. That is when all incoming data has been read by the
733    /// application, and when all outgoing data has been acked by the peer.
734    ///
735    /// For unidirectional streams this happens when either the receive or send
736    /// side is complete, depending on whether the stream was created locally
737    /// or not.
738    pub fn is_complete(&self) -> bool {
739        match (self.bidi, self.local) {
740            // For bidirectional streams we need to check both receive and send
741            // sides for completion.
742            (true, _) => self.recv.is_fin() && self.send.is_complete(),
743
744            // For unidirectional streams generated locally, we only need to
745            // check the send side for completion.
746            (false, true) => self.send.is_complete(),
747
748            // For unidirectional streams generated by the peer, we only need
749            // to check the receive side for completion.
750            (false, false) => self.recv.is_fin(),
751        }
752    }
753}
754
/// Returns true if the stream was created locally.
///
/// The least significant bit of a stream ID is set for streams initiated by
/// the server, so the stream is local when that bit agrees with `is_server`.
pub fn is_local(stream_id: u64, is_server: bool) -> bool {
    let server_initiated = stream_id & 0x1 != 0;

    server_initiated == is_server
}
759
/// Returns true if the stream is bidirectional.
///
/// The second least significant bit of a stream ID is set for unidirectional
/// streams and clear for bidirectional ones.
pub fn is_bidi(stream_id: u64) -> bool {
    let unidirectional = stream_id & 0x2 != 0;

    !unidirectional
}
764
/// Scheduling key of a stream: its priority settings and ID, plus the
/// intrusive links that let it be part of the readable, writable and
/// flushable trees of a `StreamMap`.
#[derive(Clone, Debug)]
pub struct StreamPriorityKey {
    /// The stream's urgency (lower values sort first).
    pub urgency: u8,
    /// Whether the stream is incremental. Among keys of equal urgency,
    /// non-incremental ones sort before incremental ones.
    pub incremental: bool,
    /// ID of the stream this key belongs to. Equality of keys is decided by
    /// this field alone.
    pub id: u64,

    /// Link used by `StreamReadablePriorityAdapter`.
    pub readable: RBTreeAtomicLink,
    /// Link used by `StreamWritablePriorityAdapter`.
    pub writable: RBTreeAtomicLink,
    /// Link used by `StreamFlushablePriorityAdapter`.
    pub flushable: RBTreeAtomicLink,
}
775
776impl Default for StreamPriorityKey {
777    fn default() -> Self {
778        Self {
779            urgency: DEFAULT_URGENCY,
780            incremental: true,
781            id: Default::default(),
782            readable: Default::default(),
783            writable: Default::default(),
784            flushable: Default::default(),
785        }
786    }
787}
788
impl PartialEq for StreamPriorityKey {
    /// Two keys are equal when they refer to the same stream ID; urgency and
    /// the incremental flag are not taken into account.
    fn eq(&self, other: &Self) -> bool {
        self.id == other.id
    }
}

impl Eq for StreamPriorityKey {}
796
797impl PartialOrd for StreamPriorityKey {
798    // Priority ordering is complex, disable Clippy warning.
799    #[allow(clippy::non_canonical_partial_ord_impl)]
800    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
801        // Ignore priority if ID matches.
802        if self.id == other.id {
803            return Some(std::cmp::Ordering::Equal);
804        }
805
806        // First, order by urgency...
807        if self.urgency != other.urgency {
808            return self.urgency.partial_cmp(&other.urgency);
809        }
810
811        // ...when the urgency is the same, and both are not incremental, order
812        // by stream ID...
813        if !self.incremental && !other.incremental {
814            return self.id.partial_cmp(&other.id);
815        }
816
817        // ...non-incremental takes priority over incremental...
818        if self.incremental && !other.incremental {
819            return Some(std::cmp::Ordering::Greater);
820        }
821        if !self.incremental && other.incremental {
822            return Some(std::cmp::Ordering::Less);
823        }
824
825        // ...finally, when both are incremental, `other` takes precedence (so
826        // `self` is always sorted after other same-urgency incremental
827        // entries).
828        Some(std::cmp::Ordering::Greater)
829    }
830}
831
impl Ord for StreamPriorityKey {
    /// Total ordering used when keys are sorted; defers to `partial_cmp()`.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // `partial_cmp()` never returns `None`, so this should be safe.
        self.partial_cmp(other).unwrap()
    }
}
838
// Adapter that links `Arc<StreamPriorityKey>` values into a tree through
// their `writable` link.
intrusive_adapter!(pub StreamWritablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { writable: RBTreeAtomicLink });

impl KeyAdapter<'_> for StreamWritablePriorityAdapter {
    type Key = StreamPriorityKey;

    // The tree key is a clone of the whole priority key.
    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
        s.clone()
    }
}
848
// Adapter that links `Arc<StreamPriorityKey>` values into a tree through
// their `readable` link.
intrusive_adapter!(pub StreamReadablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { readable: RBTreeAtomicLink });

impl KeyAdapter<'_> for StreamReadablePriorityAdapter {
    type Key = StreamPriorityKey;

    // The tree key is a clone of the whole priority key.
    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
        s.clone()
    }
}
858
859intrusive_adapter!(pub StreamFlushablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { flushable: RBTreeAtomicLink });
860
861impl KeyAdapter<'_> for StreamFlushablePriorityAdapter {
862    type Key = StreamPriorityKey;
863
864    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
865        s.clone()
866    }
867}
868
/// An iterator over QUIC streams.
///
/// Yields stream IDs (`u64`) from an owned list, one per call to `next()`,
/// starting at the front of the list.
#[derive(Default)]
pub struct StreamIter {
    // Stream IDs to yield, stored inline for up to 8 entries.
    streams: SmallVec<[u64; 8]>,
    // Position in `streams` of the next ID to yield.
    index: usize,
}
875
876impl StreamIter {
877    #[inline]
878    fn from(streams: &StreamIdHashSet) -> Self {
879        StreamIter {
880            streams: streams.iter().copied().collect(),
881            index: 0,
882        }
883    }
884}
885
886impl Iterator for StreamIter {
887    type Item = u64;
888
889    #[inline]
890    fn next(&mut self) -> Option<Self::Item> {
891        let v = self.streams.get(self.index)?;
892        self.index += 1;
893        Some(*v)
894    }
895}
896
897impl ExactSizeIterator for StreamIter {
898    #[inline]
899    fn len(&self) -> usize {
900        self.streams.len() - self.index
901    }
902}
903
904#[cfg(test)]
905mod tests {
906    use crate::range_buf::RangeBuf;
907
908    use super::*;
909
910    #[test]
911    fn recv_flow_control() {
912        let mut stream =
913            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
914        assert!(!stream.recv.almost_full());
915
916        let mut buf = [0; 32];
917
918        let first = RangeBuf::from(b"hello", 0, false);
919        let second = RangeBuf::from(b"world", 5, false);
920        let third = RangeBuf::from(b"something", 10, false);
921
922        assert_eq!(stream.recv.write(second), Ok(()));
923        assert_eq!(stream.recv.write(first), Ok(()));
924        assert!(!stream.recv.almost_full());
925
926        assert_eq!(stream.recv.write(third), Err(Error::FlowControl));
927
928        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
929        assert_eq!(&buf[..len], b"helloworld");
930        assert!(!fin);
931
932        assert!(stream.recv.almost_full());
933
934        stream.recv.update_max_data(std::time::Instant::now());
935        assert_eq!(stream.recv.max_data_next(), 25);
936        assert!(!stream.recv.almost_full());
937
938        let third = RangeBuf::from(b"something", 10, false);
939        assert_eq!(stream.recv.write(third), Ok(()));
940    }
941
942    #[test]
943    fn recv_past_fin() {
944        let mut stream =
945            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
946        assert!(!stream.recv.almost_full());
947
948        let first = RangeBuf::from(b"hello", 0, true);
949        let second = RangeBuf::from(b"world", 5, false);
950
951        assert_eq!(stream.recv.write(first), Ok(()));
952        assert_eq!(stream.recv.write(second), Err(Error::FinalSize));
953    }
954
955    #[test]
956    fn recv_fin_dup() {
957        let mut stream =
958            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
959        assert!(!stream.recv.almost_full());
960
961        let first = RangeBuf::from(b"hello", 0, true);
962        let second = RangeBuf::from(b"hello", 0, true);
963
964        assert_eq!(stream.recv.write(first), Ok(()));
965        assert_eq!(stream.recv.write(second), Ok(()));
966
967        let mut buf = [0; 32];
968
969        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
970        assert_eq!(&buf[..len], b"hello");
971        assert!(fin);
972    }
973
974    #[test]
975    fn recv_fin_change() {
976        let mut stream =
977            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
978        assert!(!stream.recv.almost_full());
979
980        let first = RangeBuf::from(b"hello", 0, true);
981        let second = RangeBuf::from(b"world", 5, true);
982
983        assert_eq!(stream.recv.write(second), Ok(()));
984        assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
985    }
986
987    #[test]
988    fn recv_fin_lower_than_received() {
989        let mut stream =
990            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
991        assert!(!stream.recv.almost_full());
992
993        let first = RangeBuf::from(b"hello", 0, true);
994        let second = RangeBuf::from(b"world", 5, false);
995
996        assert_eq!(stream.recv.write(second), Ok(()));
997        assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
998    }
999
1000    #[test]
1001    fn recv_fin_flow_control() {
1002        let mut stream =
1003            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1004        assert!(!stream.recv.almost_full());
1005
1006        let mut buf = [0; 32];
1007
1008        let first = RangeBuf::from(b"hello", 0, false);
1009        let second = RangeBuf::from(b"world", 5, true);
1010
1011        assert_eq!(stream.recv.write(first), Ok(()));
1012        assert_eq!(stream.recv.write(second), Ok(()));
1013
1014        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1015        assert_eq!(&buf[..len], b"helloworld");
1016        assert!(fin);
1017
1018        assert!(!stream.recv.almost_full());
1019    }
1020
1021    #[test]
1022    fn recv_fin_reset_mismatch() {
1023        let mut stream =
1024            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1025        assert!(!stream.recv.almost_full());
1026
1027        let first = RangeBuf::from(b"hello", 0, true);
1028
1029        assert_eq!(stream.recv.write(first), Ok(()));
1030        assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
1031    }
1032
1033    #[test]
1034    fn recv_reset_dup() {
1035        let mut stream =
1036            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1037        assert!(!stream.recv.almost_full());
1038
1039        let first = RangeBuf::from(b"hello", 0, false);
1040
1041        assert_eq!(stream.recv.write(first), Ok(()));
1042        assert_eq!(stream.recv.reset(0, 5), Ok(0));
1043        assert_eq!(stream.recv.reset(0, 5), Ok(0));
1044    }
1045
1046    #[test]
1047    fn recv_reset_change() {
1048        let mut stream =
1049            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1050        assert!(!stream.recv.almost_full());
1051
1052        let first = RangeBuf::from(b"hello", 0, false);
1053
1054        assert_eq!(stream.recv.write(first), Ok(()));
1055        assert_eq!(stream.recv.reset(0, 5), Ok(0));
1056        assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
1057    }
1058
1059    #[test]
1060    fn recv_reset_lower_than_received() {
1061        let mut stream =
1062            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1063        assert!(!stream.recv.almost_full());
1064
1065        let first = RangeBuf::from(b"hello", 0, false);
1066
1067        assert_eq!(stream.recv.write(first), Ok(()));
1068        assert_eq!(stream.recv.reset(0, 4), Err(Error::FinalSize));
1069    }
1070
1071    #[test]
1072    fn send_flow_control() {
1073        let mut buf = [0; 25];
1074
1075        let mut stream =
1076            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1077
1078        let first = b"hello";
1079        let second = b"world";
1080        let third = b"something";
1081
1082        assert!(stream.send.write(first, false).is_ok());
1083        assert!(stream.send.write(second, false).is_ok());
1084        assert!(stream.send.write(third, false).is_ok());
1085
1086        assert_eq!(stream.send.off_front(), 0);
1087
1088        let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
1089        assert_eq!(written, 15);
1090        assert!(!fin);
1091        assert_eq!(&buf[..written], b"helloworldsomet");
1092
1093        assert_eq!(stream.send.off_front(), 15);
1094
1095        let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
1096        assert_eq!(written, 0);
1097        assert!(!fin);
1098        assert_eq!(&buf[..written], b"");
1099
1100        stream.send.retransmit(0, 15);
1101
1102        assert_eq!(stream.send.off_front(), 0);
1103
1104        let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
1105        assert_eq!(written, 10);
1106        assert!(!fin);
1107        assert_eq!(&buf[..written], b"helloworld");
1108
1109        assert_eq!(stream.send.off_front(), 10);
1110
1111        let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
1112        assert_eq!(written, 5);
1113        assert!(!fin);
1114        assert_eq!(&buf[..written], b"somet");
1115    }
1116
1117    #[test]
1118    fn send_past_fin() {
1119        let mut stream =
1120            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1121
1122        let first = b"hello";
1123        let second = b"world";
1124        let third = b"third";
1125
1126        assert_eq!(stream.send.write(first, false), Ok(5));
1127
1128        assert_eq!(stream.send.write(second, true), Ok(5));
1129        assert!(stream.send.is_fin());
1130
1131        assert_eq!(stream.send.write(third, false), Err(Error::FinalSize));
1132    }
1133
1134    #[test]
1135    fn send_fin_dup() {
1136        let mut stream =
1137            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1138
1139        assert_eq!(stream.send.write(b"hello", true), Ok(5));
1140        assert!(stream.send.is_fin());
1141
1142        assert_eq!(stream.send.write(b"", true), Ok(0));
1143        assert!(stream.send.is_fin());
1144    }
1145
1146    #[test]
1147    fn send_undo_fin() {
1148        let mut stream =
1149            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1150
1151        assert_eq!(stream.send.write(b"hello", true), Ok(5));
1152        assert!(stream.send.is_fin());
1153
1154        assert_eq!(
1155            stream.send.write(b"helloworld", true),
1156            Err(Error::FinalSize)
1157        );
1158    }
1159
1160    #[test]
1161    fn send_fin_max_data_match() {
1162        let mut buf = [0; 15];
1163
1164        let mut stream =
1165            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1166
1167        let slice = b"hellohellohello";
1168
1169        assert!(stream.send.write(slice, true).is_ok());
1170
1171        let (written, fin) = stream.send.emit(&mut buf[..15]).unwrap();
1172        assert_eq!(written, 15);
1173        assert!(fin);
1174        assert_eq!(&buf[..written], slice);
1175    }
1176
1177    #[test]
1178    fn send_fin_zero_length() {
1179        let mut buf = [0; 5];
1180
1181        let mut stream =
1182            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1183
1184        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1185        assert_eq!(stream.send.write(b"", true), Ok(0));
1186        assert!(stream.send.is_fin());
1187
1188        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1189        assert_eq!(written, 5);
1190        assert!(fin);
1191        assert_eq!(&buf[..written], b"hello");
1192    }
1193
1194    #[test]
1195    fn send_ack() {
1196        let mut buf = [0; 5];
1197
1198        let mut stream =
1199            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1200
1201        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1202        assert_eq!(stream.send.write(b"world", false), Ok(5));
1203        assert_eq!(stream.send.write(b"", true), Ok(0));
1204        assert!(stream.send.is_fin());
1205
1206        assert_eq!(stream.send.off_front(), 0);
1207
1208        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1209        assert_eq!(written, 5);
1210        assert!(!fin);
1211        assert_eq!(&buf[..written], b"hello");
1212
1213        stream.send.ack_and_drop(0, 5);
1214
1215        stream.send.retransmit(0, 5);
1216
1217        assert_eq!(stream.send.off_front(), 5);
1218
1219        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1220        assert_eq!(written, 5);
1221        assert!(fin);
1222        assert_eq!(&buf[..written], b"world");
1223    }
1224
1225    #[test]
1226    fn send_ack_reordering() {
1227        let mut buf = [0; 5];
1228
1229        let mut stream =
1230            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1231
1232        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1233        assert_eq!(stream.send.write(b"world", false), Ok(5));
1234        assert_eq!(stream.send.write(b"", true), Ok(0));
1235        assert!(stream.send.is_fin());
1236
1237        assert_eq!(stream.send.off_front(), 0);
1238
1239        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1240        assert_eq!(written, 5);
1241        assert!(!fin);
1242        assert_eq!(&buf[..written], b"hello");
1243
1244        assert_eq!(stream.send.off_front(), 5);
1245
1246        let (written, fin) = stream.send.emit(&mut buf[..1]).unwrap();
1247        assert_eq!(written, 1);
1248        assert!(!fin);
1249        assert_eq!(&buf[..written], b"w");
1250
1251        stream.send.ack_and_drop(5, 1);
1252        stream.send.ack_and_drop(0, 5);
1253
1254        stream.send.retransmit(0, 5);
1255        stream.send.retransmit(5, 1);
1256
1257        assert_eq!(stream.send.off_front(), 6);
1258
1259        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1260        assert_eq!(written, 4);
1261        assert!(fin);
1262        assert_eq!(&buf[..written], b"orld");
1263    }
1264
1265    #[test]
1266    fn recv_data_below_off() {
1267        let mut stream =
1268            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1269
1270        let first = RangeBuf::from(b"hello", 0, false);
1271
1272        assert_eq!(stream.recv.write(first), Ok(()));
1273
1274        let mut buf = [0; 10];
1275
1276        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1277        assert_eq!(&buf[..len], b"hello");
1278        assert!(!fin);
1279
1280        let first = RangeBuf::from(b"elloworld", 1, true);
1281        assert_eq!(stream.recv.write(first), Ok(()));
1282
1283        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1284        assert_eq!(&buf[..len], b"world");
1285        assert!(fin);
1286    }
1287
1288    #[test]
1289    fn stream_complete() {
1290        let mut stream =
1291            <Stream>::new(0, 30, 30, true, true, DEFAULT_STREAM_WINDOW);
1292
1293        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1294        assert_eq!(stream.send.write(b"world", false), Ok(5));
1295
1296        assert!(!stream.send.is_complete());
1297        assert!(!stream.send.is_fin());
1298
1299        assert_eq!(stream.send.write(b"", true), Ok(0));
1300
1301        assert!(!stream.send.is_complete());
1302        assert!(stream.send.is_fin());
1303
1304        let buf = RangeBuf::from(b"hello", 0, true);
1305        assert!(stream.recv.write(buf).is_ok());
1306        assert!(!stream.recv.is_fin());
1307
1308        stream.send.ack(6, 4);
1309        assert!(!stream.send.is_complete());
1310
1311        let mut buf = [0; 2];
1312        assert_eq!(stream.recv.emit(&mut buf), Ok((2, false)));
1313        assert!(!stream.recv.is_fin());
1314
1315        stream.send.ack(1, 5);
1316        assert!(!stream.send.is_complete());
1317
1318        stream.send.ack(0, 1);
1319        assert!(stream.send.is_complete());
1320
1321        assert!(!stream.is_complete());
1322
1323        let mut buf = [0; 3];
1324        assert_eq!(stream.recv.emit(&mut buf), Ok((3, true)));
1325        assert!(stream.recv.is_fin());
1326
1327        assert!(stream.is_complete());
1328    }
1329
1330    #[test]
1331    fn send_fin_zero_length_output() {
1332        let mut buf = [0; 5];
1333
1334        let mut stream =
1335            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1336
1337        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1338        assert_eq!(stream.send.off_front(), 0);
1339        assert!(!stream.send.is_fin());
1340
1341        let (written, fin) = stream.send.emit(&mut buf).unwrap();
1342        assert_eq!(written, 5);
1343        assert!(!fin);
1344        assert_eq!(&buf[..written], b"hello");
1345
1346        assert_eq!(stream.send.write(b"", true), Ok(0));
1347        assert!(stream.send.is_fin());
1348        assert_eq!(stream.send.off_front(), 5);
1349
1350        let (written, fin) = stream.send.emit(&mut buf).unwrap();
1351        assert_eq!(written, 0);
1352        assert!(fin);
1353        assert_eq!(&buf[..written], b"");
1354    }
1355
1356    fn stream_send_ready(stream: &Stream) -> bool {
1357        !stream.send.is_empty() &&
1358            stream.send.off_front() < stream.send.off_back()
1359    }
1360
1361    #[test]
1362    fn send_emit() {
1363        let mut buf = [0; 5];
1364
1365        let mut stream =
1366            <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1367
1368        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1369        assert_eq!(stream.send.write(b"world", false), Ok(5));
1370        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1371        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1372        assert_eq!(stream.send.off_front(), 0);
1373        assert_eq!(stream.send.bufs_count(), 4);
1374
1375        assert!(stream.is_flushable());
1376
1377        assert!(stream_send_ready(&stream));
1378        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1379        assert_eq!(stream.send.off_front(), 4);
1380        assert_eq!(&buf[..4], b"hell");
1381
1382        assert!(stream_send_ready(&stream));
1383        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1384        assert_eq!(stream.send.off_front(), 8);
1385        assert_eq!(&buf[..4], b"owor");
1386
1387        assert!(stream_send_ready(&stream));
1388        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1389        assert_eq!(stream.send.off_front(), 10);
1390        assert_eq!(&buf[..2], b"ld");
1391
1392        assert!(stream_send_ready(&stream));
1393        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1394        assert_eq!(stream.send.off_front(), 11);
1395        assert_eq!(&buf[..1], b"o");
1396
1397        assert!(stream_send_ready(&stream));
1398        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1399        assert_eq!(stream.send.off_front(), 16);
1400        assert_eq!(&buf[..5], b"llehd");
1401
1402        assert!(stream_send_ready(&stream));
1403        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1404        assert_eq!(stream.send.off_front(), 20);
1405        assert_eq!(&buf[..4], b"lrow");
1406
1407        assert!(!stream.is_flushable());
1408
1409        assert!(!stream_send_ready(&stream));
1410        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1411        assert_eq!(stream.send.off_front(), 20);
1412    }
1413
1414    #[test]
1415    fn send_emit_ack() {
1416        let mut buf = [0; 5];
1417
1418        let mut stream =
1419            <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1420
1421        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1422        assert_eq!(stream.send.write(b"world", false), Ok(5));
1423        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1424        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1425        assert_eq!(stream.send.off_front(), 0);
1426        assert_eq!(stream.send.bufs_count(), 4);
1427
1428        assert!(stream.is_flushable());
1429
1430        assert!(stream_send_ready(&stream));
1431        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1432        assert_eq!(stream.send.off_front(), 4);
1433        assert_eq!(&buf[..4], b"hell");
1434
1435        assert!(stream_send_ready(&stream));
1436        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1437        assert_eq!(stream.send.off_front(), 8);
1438        assert_eq!(&buf[..4], b"owor");
1439
1440        stream.send.ack_and_drop(0, 5);
1441        assert_eq!(stream.send.bufs_count(), 3);
1442
1443        assert!(stream_send_ready(&stream));
1444        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1445        assert_eq!(stream.send.off_front(), 10);
1446        assert_eq!(&buf[..2], b"ld");
1447
1448        stream.send.ack_and_drop(7, 5);
1449        assert_eq!(stream.send.bufs_count(), 3);
1450
1451        assert!(stream_send_ready(&stream));
1452        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1453        assert_eq!(stream.send.off_front(), 11);
1454        assert_eq!(&buf[..1], b"o");
1455
1456        assert!(stream_send_ready(&stream));
1457        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1458        assert_eq!(stream.send.off_front(), 16);
1459        assert_eq!(&buf[..5], b"llehd");
1460
1461        stream.send.ack_and_drop(5, 7);
1462        assert_eq!(stream.send.bufs_count(), 2);
1463
1464        assert!(stream_send_ready(&stream));
1465        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1466        assert_eq!(stream.send.off_front(), 20);
1467        assert_eq!(&buf[..4], b"lrow");
1468
1469        assert!(!stream.is_flushable());
1470
1471        assert!(!stream_send_ready(&stream));
1472        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1473        assert_eq!(stream.send.off_front(), 20);
1474
1475        stream.send.ack_and_drop(22, 4);
1476        assert_eq!(stream.send.bufs_count(), 2);
1477
1478        stream.send.ack_and_drop(20, 1);
1479        assert_eq!(stream.send.bufs_count(), 2);
1480    }
1481
1482    #[test]
1483    fn send_emit_retransmit() {
1484        let mut buf = [0; 5];
1485
1486        let mut stream =
1487            <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1488
1489        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1490        assert_eq!(stream.send.write(b"world", false), Ok(5));
1491        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1492        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1493        assert_eq!(stream.send.off_front(), 0);
1494        assert_eq!(stream.send.bufs_count(), 4);
1495
1496        assert!(stream.is_flushable());
1497
1498        assert!(stream_send_ready(&stream));
1499        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1500        assert_eq!(stream.send.off_front(), 4);
1501        assert_eq!(&buf[..4], b"hell");
1502
1503        assert!(stream_send_ready(&stream));
1504        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1505        assert_eq!(stream.send.off_front(), 8);
1506        assert_eq!(&buf[..4], b"owor");
1507
1508        stream.send.retransmit(3, 3);
1509        assert_eq!(stream.send.off_front(), 3);
1510
1511        assert!(stream_send_ready(&stream));
1512        assert_eq!(stream.send.emit(&mut buf[..3]), Ok((3, false)));
1513        assert_eq!(stream.send.off_front(), 8);
1514        assert_eq!(&buf[..3], b"low");
1515
1516        assert!(stream_send_ready(&stream));
1517        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1518        assert_eq!(stream.send.off_front(), 10);
1519        assert_eq!(&buf[..2], b"ld");
1520
1521        stream.send.ack_and_drop(7, 2);
1522
1523        stream.send.retransmit(8, 2);
1524
1525        assert!(stream_send_ready(&stream));
1526        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1527        assert_eq!(stream.send.off_front(), 10);
1528        assert_eq!(&buf[..2], b"ld");
1529
1530        assert!(stream_send_ready(&stream));
1531        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1532        assert_eq!(stream.send.off_front(), 11);
1533        assert_eq!(&buf[..1], b"o");
1534
1535        assert!(stream_send_ready(&stream));
1536        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1537        assert_eq!(stream.send.off_front(), 16);
1538        assert_eq!(&buf[..5], b"llehd");
1539
1540        stream.send.retransmit(12, 2);
1541
1542        assert!(stream_send_ready(&stream));
1543        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1544        assert_eq!(stream.send.off_front(), 16);
1545        assert_eq!(&buf[..2], b"le");
1546
1547        assert!(stream_send_ready(&stream));
1548        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1549        assert_eq!(stream.send.off_front(), 20);
1550        assert_eq!(&buf[..4], b"lrow");
1551
1552        assert!(!stream.is_flushable());
1553
1554        assert!(!stream_send_ready(&stream));
1555        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1556        assert_eq!(stream.send.off_front(), 20);
1557
1558        stream.send.retransmit(7, 12);
1559
1560        assert!(stream_send_ready(&stream));
1561        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1562        assert_eq!(stream.send.off_front(), 12);
1563        assert_eq!(&buf[..5], b"rldol");
1564
1565        assert!(stream_send_ready(&stream));
1566        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1567        assert_eq!(stream.send.off_front(), 17);
1568        assert_eq!(&buf[..5], b"lehdl");
1569
1570        assert!(stream_send_ready(&stream));
1571        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
1572        assert_eq!(stream.send.off_front(), 20);
1573        assert_eq!(&buf[..2], b"ro");
1574
1575        stream.send.ack_and_drop(12, 7);
1576
1577        stream.send.retransmit(7, 12);
1578
1579        assert!(stream_send_ready(&stream));
1580        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1581        assert_eq!(stream.send.off_front(), 12);
1582        assert_eq!(&buf[..5], b"rldol");
1583
1584        assert!(stream_send_ready(&stream));
1585        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1586        assert_eq!(stream.send.off_front(), 17);
1587        assert_eq!(&buf[..5], b"lehdl");
1588
1589        assert!(stream_send_ready(&stream));
1590        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
1591        assert_eq!(stream.send.off_front(), 20);
1592        assert_eq!(&buf[..2], b"ro");
1593    }
1594
1595    #[test]
1596    fn rangebuf_split_off() {
1597        let mut buf = <RangeBuf>::from(b"helloworld", 5, true);
1598        assert_eq!(buf.start, 0);
1599        assert_eq!(buf.pos, 0);
1600        assert_eq!(buf.len, 10);
1601        assert_eq!(buf.off, 5);
1602        assert!(buf.fin);
1603
1604        assert_eq!(buf.len(), 10);
1605        assert_eq!(buf.off(), 5);
1606        assert!(buf.fin());
1607
1608        assert_eq!(&buf[..], b"helloworld");
1609
1610        // Advance buffer.
1611        buf.consume(5);
1612
1613        assert_eq!(buf.start, 0);
1614        assert_eq!(buf.pos, 5);
1615        assert_eq!(buf.len, 10);
1616        assert_eq!(buf.off, 5);
1617        assert!(buf.fin);
1618
1619        assert_eq!(buf.len(), 5);
1620        assert_eq!(buf.off(), 10);
1621        assert!(buf.fin());
1622
1623        assert_eq!(&buf[..], b"world");
1624
1625        // Split buffer before position.
1626        let mut new_buf = buf.split_off(3);
1627
1628        assert_eq!(buf.start, 0);
1629        assert_eq!(buf.pos, 3);
1630        assert_eq!(buf.len, 3);
1631        assert_eq!(buf.off, 5);
1632        assert!(!buf.fin);
1633
1634        assert_eq!(buf.len(), 0);
1635        assert_eq!(buf.off(), 8);
1636        assert!(!buf.fin());
1637
1638        assert_eq!(&buf[..], b"");
1639
1640        assert_eq!(new_buf.start, 3);
1641        assert_eq!(new_buf.pos, 5);
1642        assert_eq!(new_buf.len, 7);
1643        assert_eq!(new_buf.off, 8);
1644        assert!(new_buf.fin);
1645
1646        assert_eq!(new_buf.len(), 5);
1647        assert_eq!(new_buf.off(), 10);
1648        assert!(new_buf.fin());
1649
1650        assert_eq!(&new_buf[..], b"world");
1651
1652        // Advance buffer.
1653        new_buf.consume(2);
1654
1655        assert_eq!(new_buf.start, 3);
1656        assert_eq!(new_buf.pos, 7);
1657        assert_eq!(new_buf.len, 7);
1658        assert_eq!(new_buf.off, 8);
1659        assert!(new_buf.fin);
1660
1661        assert_eq!(new_buf.len(), 3);
1662        assert_eq!(new_buf.off(), 12);
1663        assert!(new_buf.fin());
1664
1665        assert_eq!(&new_buf[..], b"rld");
1666
1667        // Split buffer after position.
1668        let mut new_new_buf = new_buf.split_off(5);
1669
1670        assert_eq!(new_buf.start, 3);
1671        assert_eq!(new_buf.pos, 7);
1672        assert_eq!(new_buf.len, 5);
1673        assert_eq!(new_buf.off, 8);
1674        assert!(!new_buf.fin);
1675
1676        assert_eq!(new_buf.len(), 1);
1677        assert_eq!(new_buf.off(), 12);
1678        assert!(!new_buf.fin());
1679
1680        assert_eq!(&new_buf[..], b"r");
1681
1682        assert_eq!(new_new_buf.start, 8);
1683        assert_eq!(new_new_buf.pos, 8);
1684        assert_eq!(new_new_buf.len, 2);
1685        assert_eq!(new_new_buf.off, 13);
1686        assert!(new_new_buf.fin);
1687
1688        assert_eq!(new_new_buf.len(), 2);
1689        assert_eq!(new_new_buf.off(), 13);
1690        assert!(new_new_buf.fin());
1691
1692        assert_eq!(&new_new_buf[..], b"ld");
1693
1694        // Advance buffer.
1695        new_new_buf.consume(2);
1696
1697        assert_eq!(new_new_buf.start, 8);
1698        assert_eq!(new_new_buf.pos, 10);
1699        assert_eq!(new_new_buf.len, 2);
1700        assert_eq!(new_new_buf.off, 13);
1701        assert!(new_new_buf.fin);
1702
1703        assert_eq!(new_new_buf.len(), 0);
1704        assert_eq!(new_new_buf.off(), 15);
1705        assert!(new_new_buf.fin());
1706
1707        assert_eq!(&new_new_buf[..], b"");
1708    }
1709
1710    /// RFC9000 2.1: A stream ID that is used out of order results in all
1711    /// streams of that type with lower-numbered stream IDs also being opened.
1712    #[test]
1713    fn stream_limit_auto_open() {
1714        let local_tp = crate::TransportParams::default();
1715        let peer_tp = crate::TransportParams::default();
1716
1717        let mut streams = <StreamMap>::new(5, 5, 5);
1718
1719        let stream_id = 500;
1720        assert!(!is_local(stream_id, true), "stream id is peer initiated");
1721        assert!(is_bidi(stream_id), "stream id is bidirectional");
1722        assert_eq!(
1723            streams
1724                .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1725                .err(),
1726            Some(Error::StreamLimit),
1727            "stream limit should be exceeded"
1728        );
1729    }
1730
1731    /// Stream limit should be satisfied regardless of what order we open
1732    /// streams
1733    #[test]
1734    fn stream_create_out_of_order() {
1735        let local_tp = crate::TransportParams::default();
1736        let peer_tp = crate::TransportParams::default();
1737
1738        let mut streams = <StreamMap>::new(5, 5, 5);
1739
1740        for stream_id in [8, 12, 4] {
1741            assert!(is_local(stream_id, false), "stream id is client initiated");
1742            assert!(is_bidi(stream_id), "stream id is bidirectional");
1743            assert!(streams
1744                .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1745                .is_ok());
1746        }
1747    }
1748
1749    /// Check stream limit boundary cases
1750    #[test]
1751    fn stream_limit_edge() {
1752        let local_tp = crate::TransportParams::default();
1753        let peer_tp = crate::TransportParams::default();
1754
1755        let mut streams = <StreamMap>::new(3, 3, 3);
1756
1757        // Highest permitted
1758        let stream_id = 8;
1759        assert!(streams
1760            .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1761            .is_ok());
1762
1763        // One more than highest permitted
1764        let stream_id = 12;
1765        assert_eq!(
1766            streams
1767                .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1768                .err(),
1769            Some(Error::StreamLimit)
1770        );
1771    }
1772
1773    fn cycle_stream_priority(stream_id: u64, streams: &mut StreamMap) {
1774        let key = streams.get(stream_id).unwrap().priority_key.clone();
1775        streams.update_priority(&key.clone(), &key);
1776    }
1777
1778    #[test]
1779    fn writable_prioritized_default_priority() {
1780        let local_tp = crate::TransportParams::default();
1781        let peer_tp = crate::TransportParams {
1782            initial_max_stream_data_bidi_local: 100,
1783            initial_max_stream_data_uni: 100,
1784            ..Default::default()
1785        };
1786
1787        let mut streams = StreamMap::new(100, 100, 100);
1788
1789        for id in [0, 4, 8, 12] {
1790            assert!(streams
1791                .get_or_create(id, &local_tp, &peer_tp, false, true)
1792                .is_ok());
1793        }
1794
1795        let walk_1: Vec<u64> = streams.writable().collect();
1796        cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1797        let walk_2: Vec<u64> = streams.writable().collect();
1798        cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1799        let walk_3: Vec<u64> = streams.writable().collect();
1800        cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1801        let walk_4: Vec<u64> = streams.writable().collect();
1802        cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1803        let walk_5: Vec<u64> = streams.writable().collect();
1804
1805        // All streams are non-incremental and same urgency by default. Multiple
1806        // visits shuffle their order.
1807        assert_eq!(walk_1, vec![0, 4, 8, 12]);
1808        assert_eq!(walk_2, vec![4, 8, 12, 0]);
1809        assert_eq!(walk_3, vec![8, 12, 0, 4]);
1810        assert_eq!(walk_4, vec![12, 0, 4, 8,]);
1811        assert_eq!(walk_5, vec![0, 4, 8, 12]);
1812    }
1813
1814    #[test]
1815    fn writable_prioritized_insert_order() {
1816        let local_tp = crate::TransportParams::default();
1817        let peer_tp = crate::TransportParams {
1818            initial_max_stream_data_bidi_local: 100,
1819            initial_max_stream_data_uni: 100,
1820            ..Default::default()
1821        };
1822
1823        let mut streams = StreamMap::new(100, 100, 100);
1824
1825        // Inserting same-urgency incremental streams in a "random" order yields
1826        // same order to start with.
1827        for id in [12, 4, 8, 0] {
1828            assert!(streams
1829                .get_or_create(id, &local_tp, &peer_tp, false, true)
1830                .is_ok());
1831        }
1832
1833        let walk_1: Vec<u64> = streams.writable().collect();
1834        cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1835        let walk_2: Vec<u64> = streams.writable().collect();
1836        cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1837        let walk_3: Vec<u64> = streams.writable().collect();
1838        cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1839        let walk_4: Vec<u64> = streams.writable().collect();
1840        cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1841        let walk_5: Vec<u64> = streams.writable().collect();
1842        assert_eq!(walk_1, vec![12, 4, 8, 0]);
1843        assert_eq!(walk_2, vec![4, 8, 0, 12]);
1844        assert_eq!(walk_3, vec![8, 0, 12, 4,]);
1845        assert_eq!(walk_4, vec![0, 12, 4, 8]);
1846        assert_eq!(walk_5, vec![12, 4, 8, 0]);
1847    }
1848
1849    #[test]
1850    fn writable_prioritized_mixed_urgency() {
1851        let local_tp = crate::TransportParams::default();
1852        let peer_tp = crate::TransportParams {
1853            initial_max_stream_data_bidi_local: 100,
1854            initial_max_stream_data_uni: 100,
1855            ..Default::default()
1856        };
1857
1858        let mut streams = <StreamMap>::new(100, 100, 100);
1859
1860        // Streams where the urgency descends (becomes more important). No stream
1861        // shares an urgency.
1862        let input = vec![
1863            (0, 100),
1864            (4, 90),
1865            (8, 80),
1866            (12, 70),
1867            (16, 60),
1868            (20, 50),
1869            (24, 40),
1870            (28, 30),
1871            (32, 20),
1872            (36, 10),
1873            (40, 0),
1874        ];
1875
1876        for (id, urgency) in input.clone() {
1877            // this duplicates some code from stream_priority in order to access
1878            // streams and the collection they're in
1879            let stream = streams
1880                .get_or_create(id, &local_tp, &peer_tp, false, true)
1881                .unwrap();
1882
1883            stream.urgency = urgency;
1884
1885            let new_priority_key = Arc::new(StreamPriorityKey {
1886                urgency: stream.urgency,
1887                incremental: stream.incremental,
1888                id,
1889                ..Default::default()
1890            });
1891
1892            let old_priority_key = std::mem::replace(
1893                &mut stream.priority_key,
1894                new_priority_key.clone(),
1895            );
1896
1897            streams.update_priority(&old_priority_key, &new_priority_key);
1898        }
1899
1900        let walk_1: Vec<u64> = streams.writable().collect();
1901        assert_eq!(walk_1, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
1902
1903        // Re-applying priority to a stream does not cause duplication.
1904        for (id, urgency) in input {
1905            // this duplicates some code from stream_priority in order to access
1906            // streams and the collection they're in
1907            let stream = streams
1908                .get_or_create(id, &local_tp, &peer_tp, false, true)
1909                .unwrap();
1910
1911            stream.urgency = urgency;
1912
1913            let new_priority_key = Arc::new(StreamPriorityKey {
1914                urgency: stream.urgency,
1915                incremental: stream.incremental,
1916                id,
1917                ..Default::default()
1918            });
1919
1920            let old_priority_key = std::mem::replace(
1921                &mut stream.priority_key,
1922                new_priority_key.clone(),
1923            );
1924
1925            streams.update_priority(&old_priority_key, &new_priority_key);
1926        }
1927
1928        let walk_2: Vec<u64> = streams.writable().collect();
1929        assert_eq!(walk_2, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
1930
1931        // Removing streams doesn't break expected ordering.
1932        streams.collect(24, true);
1933
1934        let walk_3: Vec<u64> = streams.writable().collect();
1935        assert_eq!(walk_3, vec![40, 36, 32, 28, 20, 16, 12, 8, 4, 0]);
1936
1937        streams.collect(40, true);
1938        streams.collect(0, true);
1939
1940        let walk_4: Vec<u64> = streams.writable().collect();
1941        assert_eq!(walk_4, vec![36, 32, 28, 20, 16, 12, 8, 4]);
1942
1943        // Adding streams doesn't break expected ordering.
1944        streams
1945            .get_or_create(44, &local_tp, &peer_tp, false, true)
1946            .unwrap();
1947
1948        let walk_5: Vec<u64> = streams.writable().collect();
1949        assert_eq!(walk_5, vec![36, 32, 28, 20, 16, 12, 8, 4, 44]);
1950    }
1951
1952    #[test]
1953    fn writable_prioritized_mixed_urgencies_incrementals() {
1954        let local_tp = crate::TransportParams::default();
1955        let peer_tp = crate::TransportParams {
1956            initial_max_stream_data_bidi_local: 100,
1957            initial_max_stream_data_uni: 100,
1958            ..Default::default()
1959        };
1960
1961        let mut streams = StreamMap::new(100, 100, 100);
1962
1963        // Streams that share some urgency level
1964        let input = vec![
1965            (0, 100),
1966            (4, 20),
1967            (8, 100),
1968            (12, 20),
1969            (16, 90),
1970            (20, 25),
1971            (24, 90),
1972            (28, 30),
1973            (32, 80),
1974            (36, 20),
1975            (40, 0),
1976        ];
1977
1978        for (id, urgency) in input.clone() {
1979            // this duplicates some code from stream_priority in order to access
1980            // streams and the collection they're in
1981            let stream = streams
1982                .get_or_create(id, &local_tp, &peer_tp, false, true)
1983                .unwrap();
1984
1985            stream.urgency = urgency;
1986
1987            let new_priority_key = Arc::new(StreamPriorityKey {
1988                urgency: stream.urgency,
1989                incremental: stream.incremental,
1990                id,
1991                ..Default::default()
1992            });
1993
1994            let old_priority_key = std::mem::replace(
1995                &mut stream.priority_key,
1996                new_priority_key.clone(),
1997            );
1998
1999            streams.update_priority(&old_priority_key, &new_priority_key);
2000        }
2001
2002        let walk_1: Vec<u64> = streams.writable().collect();
2003        cycle_stream_priority(4, &mut streams);
2004        cycle_stream_priority(16, &mut streams);
2005        cycle_stream_priority(0, &mut streams);
2006        let walk_2: Vec<u64> = streams.writable().collect();
2007        cycle_stream_priority(12, &mut streams);
2008        cycle_stream_priority(24, &mut streams);
2009        cycle_stream_priority(8, &mut streams);
2010        let walk_3: Vec<u64> = streams.writable().collect();
2011        cycle_stream_priority(36, &mut streams);
2012        cycle_stream_priority(16, &mut streams);
2013        cycle_stream_priority(0, &mut streams);
2014        let walk_4: Vec<u64> = streams.writable().collect();
2015        cycle_stream_priority(4, &mut streams);
2016        cycle_stream_priority(24, &mut streams);
2017        cycle_stream_priority(8, &mut streams);
2018        let walk_5: Vec<u64> = streams.writable().collect();
2019        cycle_stream_priority(12, &mut streams);
2020        cycle_stream_priority(16, &mut streams);
2021        cycle_stream_priority(0, &mut streams);
2022        let walk_6: Vec<u64> = streams.writable().collect();
2023        cycle_stream_priority(36, &mut streams);
2024        cycle_stream_priority(24, &mut streams);
2025        cycle_stream_priority(8, &mut streams);
2026        let walk_7: Vec<u64> = streams.writable().collect();
2027        cycle_stream_priority(4, &mut streams);
2028        cycle_stream_priority(16, &mut streams);
2029        cycle_stream_priority(0, &mut streams);
2030        let walk_8: Vec<u64> = streams.writable().collect();
2031        cycle_stream_priority(12, &mut streams);
2032        cycle_stream_priority(24, &mut streams);
2033        cycle_stream_priority(8, &mut streams);
2034        let walk_9: Vec<u64> = streams.writable().collect();
2035        cycle_stream_priority(36, &mut streams);
2036        cycle_stream_priority(16, &mut streams);
2037        cycle_stream_priority(0, &mut streams);
2038
2039        assert_eq!(walk_1, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2040        assert_eq!(walk_2, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2041        assert_eq!(walk_3, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2042        assert_eq!(walk_4, vec![40, 4, 12, 36, 20, 28, 32, 24, 16, 8, 0]);
2043        assert_eq!(walk_5, vec![40, 12, 36, 4, 20, 28, 32, 16, 24, 0, 8]);
2044        assert_eq!(walk_6, vec![40, 36, 4, 12, 20, 28, 32, 24, 16, 8, 0]);
2045        assert_eq!(walk_7, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2046        assert_eq!(walk_8, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2047        assert_eq!(walk_9, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2048
2049        // Removing streams doesn't break expected ordering.
2050        streams.collect(20, true);
2051
2052        let walk_10: Vec<u64> = streams.writable().collect();
2053        assert_eq!(walk_10, vec![40, 4, 12, 36, 28, 32, 24, 16, 8, 0]);
2054
2055        // Adding streams doesn't break expected ordering.
2056        let stream = streams
2057            .get_or_create(44, &local_tp, &peer_tp, false, true)
2058            .unwrap();
2059
2060        stream.urgency = 20;
2061        stream.incremental = true;
2062
2063        let new_priority_key = Arc::new(StreamPriorityKey {
2064            urgency: stream.urgency,
2065            incremental: stream.incremental,
2066            id: 44,
2067            ..Default::default()
2068        });
2069
2070        let old_priority_key =
2071            std::mem::replace(&mut stream.priority_key, new_priority_key.clone());
2072
2073        streams.update_priority(&old_priority_key, &new_priority_key);
2074
2075        let walk_11: Vec<u64> = streams.writable().collect();
2076        assert_eq!(walk_11, vec![40, 4, 12, 36, 44, 28, 32, 24, 16, 8, 0]);
2077    }
2078
2079    #[test]
2080    fn priority_tree_dupes() {
2081        let mut prioritized_writable: RBTree<StreamWritablePriorityAdapter> =
2082            Default::default();
2083
2084        for id in [0, 4, 8, 12] {
2085            let s = Arc::new(StreamPriorityKey {
2086                urgency: 0,
2087                incremental: false,
2088                id,
2089                ..Default::default()
2090            });
2091
2092            prioritized_writable.insert(s);
2093        }
2094
2095        let walk_1: Vec<u64> =
2096            prioritized_writable.iter().map(|s| s.id).collect();
2097        assert_eq!(walk_1, vec![0, 4, 8, 12]);
2098
2099        // Default keys could cause duplicate entries, this is normally protected
2100        // against via StreamMap.
2101        for id in [0, 4, 8, 12] {
2102            let s = Arc::new(StreamPriorityKey {
2103                urgency: 0,
2104                incremental: false,
2105                id,
2106                ..Default::default()
2107            });
2108
2109            prioritized_writable.insert(s);
2110        }
2111
2112        let walk_2: Vec<u64> =
2113            prioritized_writable.iter().map(|s| s.id).collect();
2114        assert_eq!(walk_2, vec![0, 0, 4, 4, 8, 8, 12, 12]);
2115    }
2116}
2117
2118mod recv_buf;
2119mod send_buf;