quiche/stream/
mod.rs

1// Copyright (C) 2018-2019, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::cmp;
28
29use std::sync::Arc;
30
31use std::collections::hash_map;
32use std::collections::HashMap;
33use std::collections::HashSet;
34
35use intrusive_collections::intrusive_adapter;
36use intrusive_collections::KeyAdapter;
37use intrusive_collections::RBTree;
38use intrusive_collections::RBTreeAtomicLink;
39
40use smallvec::SmallVec;
41
42use crate::range_buf::DefaultBufFactory;
43use crate::BufFactory;
44use crate::Error;
45use crate::Result;
46
47const DEFAULT_URGENCY: u8 = 127;
48
49// The default size of the receiver stream flow control window.
50const DEFAULT_STREAM_WINDOW: u64 = 32 * 1024;
51
52/// The maximum size of the receiver stream flow control window.
53pub const MAX_STREAM_WINDOW: u64 = 16 * 1024 * 1024;
54
55/// A simple no-op hasher for Stream IDs.
56///
57/// The QUIC protocol and quiche library guarantees stream ID uniqueness, so
58/// we can save effort by avoiding using a more complicated algorithm.
59#[derive(Default)]
60pub struct StreamIdHasher {
61    id: u64,
62}
63
64impl std::hash::Hasher for StreamIdHasher {
65    #[inline]
66    fn finish(&self) -> u64 {
67        self.id
68    }
69
70    #[inline]
71    fn write_u64(&mut self, id: u64) {
72        self.id = id;
73    }
74
75    #[inline]
76    fn write(&mut self, _: &[u8]) {
77        // We need a default write() for the trait but stream IDs will always
78        // be a u64 so we just delegate to write_u64.
79        unimplemented!()
80    }
81}
82
83type BuildStreamIdHasher = std::hash::BuildHasherDefault<StreamIdHasher>;
84
85pub type StreamIdHashMap<V> = HashMap<u64, V, BuildStreamIdHasher>;
86pub type StreamIdHashSet = HashSet<u64, BuildStreamIdHasher>;
87
88/// Keeps track of QUIC streams and enforces stream limits.
89#[derive(Default)]
90pub struct StreamMap<F: BufFactory = DefaultBufFactory> {
91    /// Map of streams indexed by stream ID.
92    streams: StreamIdHashMap<Stream<F>>,
93
94    /// Set of streams that were completed and garbage collected.
95    ///
96    /// Instead of keeping the full stream state forever, we collect completed
97    /// streams to save memory, but we still need to keep track of previously
98    /// created streams, to prevent peers from re-creating them.
99    collected: StreamIdHashSet,
100
101    /// Peer's maximum bidirectional stream count limit.
102    peer_max_streams_bidi: u64,
103
104    /// Peer's maximum unidirectional stream count limit.
105    peer_max_streams_uni: u64,
106
107    /// The total number of bidirectional streams opened by the peer.
108    peer_opened_streams_bidi: u64,
109
110    /// The total number of unidirectional streams opened by the peer.
111    peer_opened_streams_uni: u64,
112
113    /// Local maximum bidirectional stream count limit.
114    local_max_streams_bidi: u64,
115    local_max_streams_bidi_next: u64,
116
117    /// Local maximum unidirectional stream count limit.
118    local_max_streams_uni: u64,
119    local_max_streams_uni_next: u64,
120
121    /// The total number of bidirectional streams opened by the local endpoint.
122    local_opened_streams_bidi: u64,
123
124    /// The total number of unidirectional streams opened by the local endpoint.
125    local_opened_streams_uni: u64,
126
127    /// Queue of stream IDs corresponding to streams that have buffered data
128    /// ready to be sent to the peer. This also implies that the stream has
129    /// enough flow control credits to send at least some of that data.
130    flushable: RBTree<StreamFlushablePriorityAdapter>,
131
132    /// Set of stream IDs corresponding to streams that have outstanding data
133    /// to read. This is used to generate a `StreamIter` of streams without
134    /// having to iterate over the full list of streams.
135    pub readable: RBTree<StreamReadablePriorityAdapter>,
136
137    /// Set of stream IDs corresponding to streams that have enough flow control
138    /// capacity to be written to, and is not finished. This is used to generate
139    /// a `StreamIter` of streams without having to iterate over the full list
140    /// of streams.
141    pub writable: RBTree<StreamWritablePriorityAdapter>,
142
143    /// Set of stream IDs corresponding to streams that are almost out of flow
144    /// control credit and need to send MAX_STREAM_DATA. This is used to
145    /// generate a `StreamIter` of streams without having to iterate over the
146    /// full list of streams.
147    almost_full: StreamIdHashSet,
148
149    /// Set of stream IDs corresponding to streams that are blocked. The value
150    /// of the map elements represents the offset of the stream at which the
151    /// blocking occurred.
152    blocked: StreamIdHashMap<u64>,
153
154    /// Set of stream IDs corresponding to streams that are reset. The value
155    /// of the map elements is a tuple of the error code and final size values
156    /// to include in the RESET_STREAM frame.
157    reset: StreamIdHashMap<(u64, u64)>,
158
159    /// Set of stream IDs corresponding to streams that are shutdown on the
160    /// receive side, and need to send a STOP_SENDING frame. The value of the
161    /// map elements is the error code to include in the STOP_SENDING frame.
162    stopped: StreamIdHashMap<u64>,
163
164    /// The maximum size of a stream window.
165    max_stream_window: u64,
166}
167
168impl<F: BufFactory> StreamMap<F> {
169    pub fn new(
170        max_streams_bidi: u64, max_streams_uni: u64, max_stream_window: u64,
171    ) -> Self {
172        StreamMap {
173            local_max_streams_bidi: max_streams_bidi,
174            local_max_streams_bidi_next: max_streams_bidi,
175
176            local_max_streams_uni: max_streams_uni,
177            local_max_streams_uni_next: max_streams_uni,
178
179            max_stream_window,
180
181            ..StreamMap::default()
182        }
183    }
184
185    /// Returns the stream with the given ID if it exists.
186    pub fn get(&self, id: u64) -> Option<&Stream<F>> {
187        self.streams.get(&id)
188    }
189
190    /// Returns the mutable stream with the given ID if it exists.
191    pub fn get_mut(&mut self, id: u64) -> Option<&mut Stream<F>> {
192        self.streams.get_mut(&id)
193    }
194
195    /// Returns the mutable stream with the given ID if it exists, or creates
196    /// a new one otherwise.
197    ///
198    /// The `local` parameter indicates whether the stream's creation was
199    /// requested by the local application rather than the peer, and is
200    /// used to validate the requested stream ID, and to select the initial
201    /// flow control values from the local and remote transport parameters
202    /// (also passed as arguments).
203    ///
204    /// This also takes care of enforcing both local and the peer's stream
205    /// count limits. If one of these limits is violated, the `StreamLimit`
206    /// error is returned.
207    pub(crate) fn get_or_create(
208        &mut self, id: u64, local_params: &crate::TransportParams,
209        peer_params: &crate::TransportParams, local: bool, is_server: bool,
210    ) -> Result<&mut Stream<F>> {
211        let (stream, is_new_and_writable) = match self.streams.entry(id) {
212            hash_map::Entry::Vacant(v) => {
213                // Stream has already been closed and garbage collected.
214                if self.collected.contains(&id) {
215                    return Err(Error::Done);
216                }
217
218                if local != is_local(id, is_server) {
219                    return Err(Error::InvalidStreamState(id));
220                }
221
222                let (max_rx_data, max_tx_data) = match (local, is_bidi(id)) {
223                    // Locally-initiated bidirectional stream.
224                    (true, true) => (
225                        local_params.initial_max_stream_data_bidi_local,
226                        peer_params.initial_max_stream_data_bidi_remote,
227                    ),
228
229                    // Locally-initiated unidirectional stream.
230                    (true, false) => (0, peer_params.initial_max_stream_data_uni),
231
232                    // Remotely-initiated bidirectional stream.
233                    (false, true) => (
234                        local_params.initial_max_stream_data_bidi_remote,
235                        peer_params.initial_max_stream_data_bidi_local,
236                    ),
237
238                    // Remotely-initiated unidirectional stream.
239                    (false, false) =>
240                        (local_params.initial_max_stream_data_uni, 0),
241                };
242
243                // The two least significant bits from a stream id identify the
244                // type of stream. Truncate those bits to get the sequence for
245                // that stream type.
246                let stream_sequence = id >> 2;
247
248                // Enforce stream count limits.
249                match (is_local(id, is_server), is_bidi(id)) {
250                    (true, true) => {
251                        let n = std::cmp::max(
252                            self.local_opened_streams_bidi,
253                            stream_sequence + 1,
254                        );
255
256                        if n > self.peer_max_streams_bidi {
257                            return Err(Error::StreamLimit);
258                        }
259
260                        self.local_opened_streams_bidi = n;
261                    },
262
263                    (true, false) => {
264                        let n = std::cmp::max(
265                            self.local_opened_streams_uni,
266                            stream_sequence + 1,
267                        );
268
269                        if n > self.peer_max_streams_uni {
270                            return Err(Error::StreamLimit);
271                        }
272
273                        self.local_opened_streams_uni = n;
274                    },
275
276                    (false, true) => {
277                        let n = std::cmp::max(
278                            self.peer_opened_streams_bidi,
279                            stream_sequence + 1,
280                        );
281
282                        if n > self.local_max_streams_bidi {
283                            return Err(Error::StreamLimit);
284                        }
285
286                        self.peer_opened_streams_bidi = n;
287                    },
288
289                    (false, false) => {
290                        let n = std::cmp::max(
291                            self.peer_opened_streams_uni,
292                            stream_sequence + 1,
293                        );
294
295                        if n > self.local_max_streams_uni {
296                            return Err(Error::StreamLimit);
297                        }
298
299                        self.peer_opened_streams_uni = n;
300                    },
301                };
302
303                let s = Stream::new(
304                    id,
305                    max_rx_data,
306                    max_tx_data,
307                    is_bidi(id),
308                    local,
309                    self.max_stream_window,
310                );
311
312                let is_writable = s.is_writable();
313
314                (v.insert(s), is_writable)
315            },
316
317            hash_map::Entry::Occupied(v) => (v.into_mut(), false),
318        };
319
320        // Newly created stream might already be writable due to initial flow
321        // control limits.
322        if is_new_and_writable {
323            self.writable.insert(Arc::clone(&stream.priority_key));
324        }
325
326        Ok(stream)
327    }
328
329    /// Adds the stream ID to the readable streams set.
330    ///
331    /// If the stream was already in the list, this does nothing.
332    pub fn insert_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
333        if !priority_key.readable.is_linked() {
334            self.readable.insert(Arc::clone(priority_key));
335        }
336    }
337
338    /// Removes the stream ID from the readable streams set.
339    pub fn remove_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
340        if !priority_key.readable.is_linked() {
341            return;
342        }
343
344        let mut c = {
345            let ptr = Arc::as_ptr(priority_key);
346            unsafe { self.readable.cursor_mut_from_ptr(ptr) }
347        };
348
349        c.remove();
350    }
351
352    /// Adds the stream ID to the writable streams set.
353    ///
354    /// This should also be called anytime a new stream is created, in addition
355    /// to when an existing stream becomes writable.
356    ///
357    /// If the stream was already in the list, this does nothing.
358    pub fn insert_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
359        if !priority_key.writable.is_linked() {
360            self.writable.insert(Arc::clone(priority_key));
361        }
362    }
363
364    /// Removes the stream ID from the writable streams set.
365    ///
366    /// This should also be called anytime an existing stream stops being
367    /// writable.
368    pub fn remove_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
369        if !priority_key.writable.is_linked() {
370            return;
371        }
372
373        let mut c = {
374            let ptr = Arc::as_ptr(priority_key);
375            unsafe { self.writable.cursor_mut_from_ptr(ptr) }
376        };
377
378        c.remove();
379    }
380
381    /// Adds the stream ID to the flushable streams set.
382    ///
383    /// If the stream was already in the list, this does nothing.
384    pub fn insert_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
385        if !priority_key.flushable.is_linked() {
386            self.flushable.insert(Arc::clone(priority_key));
387        }
388    }
389
390    /// Removes the stream ID from the flushable streams set.
391    pub fn remove_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
392        if !priority_key.flushable.is_linked() {
393            return;
394        }
395
396        let mut c = {
397            let ptr = Arc::as_ptr(priority_key);
398            unsafe { self.flushable.cursor_mut_from_ptr(ptr) }
399        };
400
401        c.remove();
402    }
403
404    pub fn peek_flushable(&self) -> Option<Arc<StreamPriorityKey>> {
405        self.flushable.front().clone_pointer()
406    }
407
408    /// Updates the priorities of a stream.
409    pub fn update_priority(
410        &mut self, old: &Arc<StreamPriorityKey>, new: &Arc<StreamPriorityKey>,
411    ) {
412        if old.readable.is_linked() {
413            self.remove_readable(old);
414            self.readable.insert(Arc::clone(new));
415        }
416
417        if old.writable.is_linked() {
418            self.remove_writable(old);
419            self.writable.insert(Arc::clone(new));
420        }
421
422        if old.flushable.is_linked() {
423            self.remove_flushable(old);
424            self.flushable.insert(Arc::clone(new));
425        }
426    }
427
428    /// Adds the stream ID to the almost full streams set.
429    ///
430    /// If the stream was already in the list, this does nothing.
431    pub fn insert_almost_full(&mut self, stream_id: u64) {
432        self.almost_full.insert(stream_id);
433    }
434
435    /// Removes the stream ID from the almost full streams set.
436    pub fn remove_almost_full(&mut self, stream_id: u64) {
437        self.almost_full.remove(&stream_id);
438    }
439
440    /// Adds the stream ID to the blocked streams set with the
441    /// given offset value.
442    ///
443    /// If the stream was already in the list, this does nothing.
444    pub fn insert_blocked(&mut self, stream_id: u64, off: u64) {
445        self.blocked.insert(stream_id, off);
446    }
447
448    /// Removes the stream ID from the blocked streams set.
449    pub fn remove_blocked(&mut self, stream_id: u64) {
450        self.blocked.remove(&stream_id);
451    }
452
453    /// Adds the stream ID to the reset streams set with the
454    /// given error code and final size values.
455    ///
456    /// If the stream was already in the list, this does nothing.
457    pub fn insert_reset(
458        &mut self, stream_id: u64, error_code: u64, final_size: u64,
459    ) {
460        self.reset.insert(stream_id, (error_code, final_size));
461    }
462
463    /// Removes the stream ID from the reset streams set.
464    pub fn remove_reset(&mut self, stream_id: u64) {
465        self.reset.remove(&stream_id);
466    }
467
468    /// Adds the stream ID to the stopped streams set with the
469    /// given error code.
470    ///
471    /// If the stream was already in the list, this does nothing.
472    pub fn insert_stopped(&mut self, stream_id: u64, error_code: u64) {
473        self.stopped.insert(stream_id, error_code);
474    }
475
476    /// Removes the stream ID from the stopped streams set.
477    pub fn remove_stopped(&mut self, stream_id: u64) {
478        self.stopped.remove(&stream_id);
479    }
480
481    /// Updates the peer's maximum bidirectional stream count limit.
482    pub fn update_peer_max_streams_bidi(&mut self, v: u64) {
483        self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v);
484    }
485
486    /// Updates the peer's maximum unidirectional stream count limit.
487    pub fn update_peer_max_streams_uni(&mut self, v: u64) {
488        self.peer_max_streams_uni = cmp::max(self.peer_max_streams_uni, v);
489    }
490
491    /// Commits the new max_streams_bidi limit.
492    pub fn update_max_streams_bidi(&mut self) {
493        self.local_max_streams_bidi = self.local_max_streams_bidi_next;
494    }
495
496    /// Returns the current max_streams_bidi limit.
497    pub fn max_streams_bidi(&self) -> u64 {
498        self.local_max_streams_bidi
499    }
500
501    /// Returns the new max_streams_bidi limit.
502    pub fn max_streams_bidi_next(&mut self) -> u64 {
503        self.local_max_streams_bidi_next
504    }
505
506    /// Commits the new max_streams_uni limit.
507    pub fn update_max_streams_uni(&mut self) {
508        self.local_max_streams_uni = self.local_max_streams_uni_next;
509    }
510
511    /// Returns the new max_streams_uni limit.
512    pub fn max_streams_uni_next(&mut self) -> u64 {
513        self.local_max_streams_uni_next
514    }
515
516    /// Returns the number of bidirectional streams that can be created
517    /// before the peer's stream count limit is reached.
518    pub fn peer_streams_left_bidi(&self) -> u64 {
519        self.peer_max_streams_bidi - self.local_opened_streams_bidi
520    }
521
522    /// Returns the number of unidirectional streams that can be created
523    /// before the peer's stream count limit is reached.
524    pub fn peer_streams_left_uni(&self) -> u64 {
525        self.peer_max_streams_uni - self.local_opened_streams_uni
526    }
527
528    /// Drops completed stream.
529    ///
530    /// This should only be called when Stream::is_complete() returns true for
531    /// the given stream.
532    pub fn collect(&mut self, stream_id: u64, local: bool) {
533        if !local {
534            // If the stream was created by the peer, give back a max streams
535            // credit.
536            if is_bidi(stream_id) {
537                self.local_max_streams_bidi_next =
538                    self.local_max_streams_bidi_next.saturating_add(1);
539            } else {
540                self.local_max_streams_uni_next =
541                    self.local_max_streams_uni_next.saturating_add(1);
542            }
543        }
544
545        let s = self.streams.remove(&stream_id).unwrap();
546
547        self.remove_readable(&s.priority_key);
548
549        self.remove_writable(&s.priority_key);
550
551        self.remove_flushable(&s.priority_key);
552
553        self.collected.insert(stream_id);
554    }
555
556    /// Creates an iterator over streams that have outstanding data to read.
557    pub fn readable(&self) -> StreamIter {
558        StreamIter {
559            streams: self.readable.iter().map(|s| s.id).collect(),
560            index: 0,
561        }
562    }
563
564    /// Creates an iterator over streams that can be written to.
565    pub fn writable(&self) -> StreamIter {
566        StreamIter {
567            streams: self.writable.iter().map(|s| s.id).collect(),
568            index: 0,
569        }
570    }
571
572    /// Creates an iterator over streams that need to send MAX_STREAM_DATA.
573    pub fn almost_full(&self) -> StreamIter {
574        StreamIter::from(&self.almost_full)
575    }
576
577    /// Creates an iterator over streams that need to send STREAM_DATA_BLOCKED.
578    pub fn blocked(&self) -> hash_map::Iter<u64, u64> {
579        self.blocked.iter()
580    }
581
582    /// Creates an iterator over streams that need to send RESET_STREAM.
583    pub fn reset(&self) -> hash_map::Iter<u64, (u64, u64)> {
584        self.reset.iter()
585    }
586
587    /// Creates an iterator over streams that need to send STOP_SENDING.
588    pub fn stopped(&self) -> hash_map::Iter<u64, u64> {
589        self.stopped.iter()
590    }
591
592    /// Returns true if the stream has been collected.
593    pub fn is_collected(&self, stream_id: u64) -> bool {
594        self.collected.contains(&stream_id)
595    }
596
597    /// Returns true if there are any streams that have data to write.
598    pub fn has_flushable(&self) -> bool {
599        !self.flushable.is_empty()
600    }
601
602    /// Returns true if there are any streams that have data to read.
603    pub fn has_readable(&self) -> bool {
604        !self.readable.is_empty()
605    }
606
607    /// Returns true if there are any streams that need to update the local
608    /// flow control limit.
609    pub fn has_almost_full(&self) -> bool {
610        !self.almost_full.is_empty()
611    }
612
613    /// Returns true if there are any streams that are blocked.
614    pub fn has_blocked(&self) -> bool {
615        !self.blocked.is_empty()
616    }
617
618    /// Returns true if there are any streams that are reset.
619    pub fn has_reset(&self) -> bool {
620        !self.reset.is_empty()
621    }
622
623    /// Returns true if there are any streams that need to send STOP_SENDING.
624    pub fn has_stopped(&self) -> bool {
625        !self.stopped.is_empty()
626    }
627
628    /// Returns true if the max bidirectional streams count needs to be updated
629    /// by sending a MAX_STREAMS frame to the peer.
630    pub fn should_update_max_streams_bidi(&self) -> bool {
631        self.local_max_streams_bidi_next != self.local_max_streams_bidi &&
632            self.local_max_streams_bidi_next / 2 >
633                self.local_max_streams_bidi - self.peer_opened_streams_bidi
634    }
635
636    /// Returns true if the max unidirectional streams count needs to be updated
637    /// by sending a MAX_STREAMS frame to the peer.
638    pub fn should_update_max_streams_uni(&self) -> bool {
639        self.local_max_streams_uni_next != self.local_max_streams_uni &&
640            self.local_max_streams_uni_next / 2 >
641                self.local_max_streams_uni - self.peer_opened_streams_uni
642    }
643
644    /// Returns the number of active streams in the map.
645    #[cfg(test)]
646    pub fn len(&self) -> usize {
647        self.streams.len()
648    }
649}
650
651/// A QUIC stream.
652pub struct Stream<F: BufFactory = DefaultBufFactory> {
653    /// Receive-side stream buffer.
654    pub recv: recv_buf::RecvBuf,
655
656    /// Send-side stream buffer.
657    pub send: send_buf::SendBuf<F>,
658
659    pub send_lowat: usize,
660
661    /// Whether the stream is bidirectional.
662    pub bidi: bool,
663
664    /// Whether the stream was created by the local endpoint.
665    pub local: bool,
666
667    /// The stream's urgency (lower is better). Default is `DEFAULT_URGENCY`.
668    pub urgency: u8,
669
670    /// Whether the stream can be flushed incrementally. Default is `true`.
671    pub incremental: bool,
672
673    pub priority_key: Arc<StreamPriorityKey>,
674}
675
676impl<F: BufFactory> Stream<F> {
677    /// Creates a new stream with the given flow control limits.
678    pub fn new(
679        id: u64, max_rx_data: u64, max_tx_data: u64, bidi: bool, local: bool,
680        max_window: u64,
681    ) -> Self {
682        let priority_key = Arc::new(StreamPriorityKey {
683            id,
684            ..Default::default()
685        });
686
687        Stream {
688            recv: recv_buf::RecvBuf::new(max_rx_data, max_window),
689            send: send_buf::SendBuf::new(max_tx_data),
690            send_lowat: 1,
691            bidi,
692            local,
693            urgency: priority_key.urgency,
694            incremental: priority_key.incremental,
695            priority_key,
696        }
697    }
698
699    /// Returns true if the stream has data to read.
700    pub fn is_readable(&self) -> bool {
701        self.recv.ready()
702    }
703
704    /// Returns true if the stream has enough flow control capacity to be
705    /// written to, and is not finished.
706    pub fn is_writable(&self) -> bool {
707        !self.send.is_shutdown() &&
708            !self.send.is_fin() &&
709            (self.send.off_back() + self.send_lowat as u64) <
710                self.send.max_off()
711    }
712
713    /// Returns true if the stream has data to send and is allowed to send at
714    /// least some of it.
715    pub fn is_flushable(&self) -> bool {
716        let off_front = self.send.off_front();
717
718        !self.send.is_empty() &&
719            off_front < self.send.off_back() &&
720            off_front < self.send.max_off()
721    }
722
723    /// Returns true if the stream is complete.
724    ///
725    /// For bidirectional streams this happens when both the receive and send
726    /// sides are complete. That is when all incoming data has been read by the
727    /// application, and when all outgoing data has been acked by the peer.
728    ///
729    /// For unidirectional streams this happens when either the receive or send
730    /// side is complete, depending on whether the stream was created locally
731    /// or not.
732    pub fn is_complete(&self) -> bool {
733        match (self.bidi, self.local) {
734            // For bidirectional streams we need to check both receive and send
735            // sides for completion.
736            (true, _) => self.recv.is_fin() && self.send.is_complete(),
737
738            // For unidirectional streams generated locally, we only need to
739            // check the send side for completion.
740            (false, true) => self.send.is_complete(),
741
742            // For unidirectional streams generated by the peer, we only need
743            // to check the receive side for completion.
744            (false, false) => self.recv.is_fin(),
745        }
746    }
747}
748
749/// Returns true if the stream was created locally.
750pub fn is_local(stream_id: u64, is_server: bool) -> bool {
751    (stream_id & 0x1) == (is_server as u64)
752}
753
754/// Returns true if the stream is bidirectional.
755pub fn is_bidi(stream_id: u64) -> bool {
756    (stream_id & 0x2) == 0
757}
758
759#[derive(Clone, Debug)]
760pub struct StreamPriorityKey {
761    pub urgency: u8,
762    pub incremental: bool,
763    pub id: u64,
764
765    pub readable: RBTreeAtomicLink,
766    pub writable: RBTreeAtomicLink,
767    pub flushable: RBTreeAtomicLink,
768}
769
770impl Default for StreamPriorityKey {
771    fn default() -> Self {
772        Self {
773            urgency: DEFAULT_URGENCY,
774            incremental: true,
775            id: Default::default(),
776            readable: Default::default(),
777            writable: Default::default(),
778            flushable: Default::default(),
779        }
780    }
781}
782
783impl PartialEq for StreamPriorityKey {
784    fn eq(&self, other: &Self) -> bool {
785        self.id == other.id
786    }
787}
788
789impl Eq for StreamPriorityKey {}
790
791impl PartialOrd for StreamPriorityKey {
792    // Priority ordering is complex, disable Clippy warning.
793    #[allow(clippy::non_canonical_partial_ord_impl)]
794    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
795        // Ignore priority if ID matches.
796        if self.id == other.id {
797            return Some(std::cmp::Ordering::Equal);
798        }
799
800        // First, order by urgency...
801        if self.urgency != other.urgency {
802            return self.urgency.partial_cmp(&other.urgency);
803        }
804
805        // ...when the urgency is the same, and both are not incremental, order
806        // by stream ID...
807        if !self.incremental && !other.incremental {
808            return self.id.partial_cmp(&other.id);
809        }
810
811        // ...non-incremental takes priority over incremental...
812        if self.incremental && !other.incremental {
813            return Some(std::cmp::Ordering::Greater);
814        }
815        if !self.incremental && other.incremental {
816            return Some(std::cmp::Ordering::Less);
817        }
818
819        // ...finally, when both are incremental, `other` takes precedence (so
820        // `self` is always sorted after other same-urgency incremental
821        // entries).
822        Some(std::cmp::Ordering::Greater)
823    }
824}
825
826impl Ord for StreamPriorityKey {
827    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
828        // `partial_cmp()` never returns `None`, so this should be safe.
829        self.partial_cmp(other).unwrap()
830    }
831}
832
833intrusive_adapter!(pub StreamWritablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { writable: RBTreeAtomicLink });
834
835impl KeyAdapter<'_> for StreamWritablePriorityAdapter {
836    type Key = StreamPriorityKey;
837
838    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
839        s.clone()
840    }
841}
842
843intrusive_adapter!(pub StreamReadablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { readable: RBTreeAtomicLink });
844
845impl KeyAdapter<'_> for StreamReadablePriorityAdapter {
846    type Key = StreamPriorityKey;
847
848    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
849        s.clone()
850    }
851}
852
853intrusive_adapter!(pub StreamFlushablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { flushable: RBTreeAtomicLink });
854
855impl KeyAdapter<'_> for StreamFlushablePriorityAdapter {
856    type Key = StreamPriorityKey;
857
858    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
859        s.clone()
860    }
861}
862
863/// An iterator over QUIC streams.
864#[derive(Default)]
865pub struct StreamIter {
866    streams: SmallVec<[u64; 8]>,
867    index: usize,
868}
869
870impl StreamIter {
871    #[inline]
872    fn from(streams: &StreamIdHashSet) -> Self {
873        StreamIter {
874            streams: streams.iter().copied().collect(),
875            index: 0,
876        }
877    }
878}
879
880impl Iterator for StreamIter {
881    type Item = u64;
882
883    #[inline]
884    fn next(&mut self) -> Option<Self::Item> {
885        let v = self.streams.get(self.index)?;
886        self.index += 1;
887        Some(*v)
888    }
889}
890
891impl ExactSizeIterator for StreamIter {
892    #[inline]
893    fn len(&self) -> usize {
894        self.streams.len() - self.index
895    }
896}
897
898#[cfg(test)]
899mod tests {
900    use crate::range_buf::RangeBuf;
901
902    use super::*;
903
904    #[test]
905    fn recv_flow_control() {
906        let mut stream =
907            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
908        assert!(!stream.recv.almost_full());
909
910        let mut buf = [0; 32];
911
912        let first = RangeBuf::from(b"hello", 0, false);
913        let second = RangeBuf::from(b"world", 5, false);
914        let third = RangeBuf::from(b"something", 10, false);
915
916        assert_eq!(stream.recv.write(second), Ok(()));
917        assert_eq!(stream.recv.write(first), Ok(()));
918        assert!(!stream.recv.almost_full());
919
920        assert_eq!(stream.recv.write(third), Err(Error::FlowControl));
921
922        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
923        assert_eq!(&buf[..len], b"helloworld");
924        assert!(!fin);
925
926        assert!(stream.recv.almost_full());
927
928        stream.recv.update_max_data(std::time::Instant::now());
929        assert_eq!(stream.recv.max_data_next(), 25);
930        assert!(!stream.recv.almost_full());
931
932        let third = RangeBuf::from(b"something", 10, false);
933        assert_eq!(stream.recv.write(third), Ok(()));
934    }
935
936    #[test]
937    fn recv_past_fin() {
938        let mut stream =
939            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
940        assert!(!stream.recv.almost_full());
941
942        let first = RangeBuf::from(b"hello", 0, true);
943        let second = RangeBuf::from(b"world", 5, false);
944
945        assert_eq!(stream.recv.write(first), Ok(()));
946        assert_eq!(stream.recv.write(second), Err(Error::FinalSize));
947    }
948
949    #[test]
950    fn recv_fin_dup() {
951        let mut stream =
952            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
953        assert!(!stream.recv.almost_full());
954
955        let first = RangeBuf::from(b"hello", 0, true);
956        let second = RangeBuf::from(b"hello", 0, true);
957
958        assert_eq!(stream.recv.write(first), Ok(()));
959        assert_eq!(stream.recv.write(second), Ok(()));
960
961        let mut buf = [0; 32];
962
963        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
964        assert_eq!(&buf[..len], b"hello");
965        assert!(fin);
966    }
967
968    #[test]
969    fn recv_fin_change() {
970        let mut stream =
971            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
972        assert!(!stream.recv.almost_full());
973
974        let first = RangeBuf::from(b"hello", 0, true);
975        let second = RangeBuf::from(b"world", 5, true);
976
977        assert_eq!(stream.recv.write(second), Ok(()));
978        assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
979    }
980
981    #[test]
982    fn recv_fin_lower_than_received() {
983        let mut stream =
984            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
985        assert!(!stream.recv.almost_full());
986
987        let first = RangeBuf::from(b"hello", 0, true);
988        let second = RangeBuf::from(b"world", 5, false);
989
990        assert_eq!(stream.recv.write(second), Ok(()));
991        assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
992    }
993
994    #[test]
995    fn recv_fin_flow_control() {
996        let mut stream =
997            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
998        assert!(!stream.recv.almost_full());
999
1000        let mut buf = [0; 32];
1001
1002        let first = RangeBuf::from(b"hello", 0, false);
1003        let second = RangeBuf::from(b"world", 5, true);
1004
1005        assert_eq!(stream.recv.write(first), Ok(()));
1006        assert_eq!(stream.recv.write(second), Ok(()));
1007
1008        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1009        assert_eq!(&buf[..len], b"helloworld");
1010        assert!(fin);
1011
1012        assert!(!stream.recv.almost_full());
1013    }
1014
1015    #[test]
1016    fn recv_fin_reset_mismatch() {
1017        let mut stream =
1018            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1019        assert!(!stream.recv.almost_full());
1020
1021        let first = RangeBuf::from(b"hello", 0, true);
1022
1023        assert_eq!(stream.recv.write(first), Ok(()));
1024        assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
1025    }
1026
1027    #[test]
1028    fn recv_reset_dup() {
1029        let mut stream =
1030            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1031        assert!(!stream.recv.almost_full());
1032
1033        let first = RangeBuf::from(b"hello", 0, false);
1034
1035        assert_eq!(stream.recv.write(first), Ok(()));
1036        assert_eq!(stream.recv.reset(0, 5), Ok(0));
1037        assert_eq!(stream.recv.reset(0, 5), Ok(0));
1038    }
1039
1040    #[test]
1041    fn recv_reset_change() {
1042        let mut stream =
1043            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1044        assert!(!stream.recv.almost_full());
1045
1046        let first = RangeBuf::from(b"hello", 0, false);
1047
1048        assert_eq!(stream.recv.write(first), Ok(()));
1049        assert_eq!(stream.recv.reset(0, 5), Ok(0));
1050        assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
1051    }
1052
1053    #[test]
1054    fn recv_reset_lower_than_received() {
1055        let mut stream =
1056            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1057        assert!(!stream.recv.almost_full());
1058
1059        let first = RangeBuf::from(b"hello", 0, false);
1060
1061        assert_eq!(stream.recv.write(first), Ok(()));
1062        assert_eq!(stream.recv.reset(0, 4), Err(Error::FinalSize));
1063    }
1064
1065    #[test]
1066    fn send_flow_control() {
1067        let mut buf = [0; 25];
1068
1069        let mut stream =
1070            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1071
1072        let first = b"hello";
1073        let second = b"world";
1074        let third = b"something";
1075
1076        assert!(stream.send.write(first, false).is_ok());
1077        assert!(stream.send.write(second, false).is_ok());
1078        assert!(stream.send.write(third, false).is_ok());
1079
1080        assert_eq!(stream.send.off_front(), 0);
1081
1082        let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
1083        assert_eq!(written, 15);
1084        assert!(!fin);
1085        assert_eq!(&buf[..written], b"helloworldsomet");
1086
1087        assert_eq!(stream.send.off_front(), 15);
1088
1089        let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
1090        assert_eq!(written, 0);
1091        assert!(!fin);
1092        assert_eq!(&buf[..written], b"");
1093
1094        stream.send.retransmit(0, 15);
1095
1096        assert_eq!(stream.send.off_front(), 0);
1097
1098        let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
1099        assert_eq!(written, 10);
1100        assert!(!fin);
1101        assert_eq!(&buf[..written], b"helloworld");
1102
1103        assert_eq!(stream.send.off_front(), 10);
1104
1105        let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
1106        assert_eq!(written, 5);
1107        assert!(!fin);
1108        assert_eq!(&buf[..written], b"somet");
1109    }
1110
1111    #[test]
1112    fn send_past_fin() {
1113        let mut stream =
1114            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1115
1116        let first = b"hello";
1117        let second = b"world";
1118        let third = b"third";
1119
1120        assert_eq!(stream.send.write(first, false), Ok(5));
1121
1122        assert_eq!(stream.send.write(second, true), Ok(5));
1123        assert!(stream.send.is_fin());
1124
1125        assert_eq!(stream.send.write(third, false), Err(Error::FinalSize));
1126    }
1127
1128    #[test]
1129    fn send_fin_dup() {
1130        let mut stream =
1131            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1132
1133        assert_eq!(stream.send.write(b"hello", true), Ok(5));
1134        assert!(stream.send.is_fin());
1135
1136        assert_eq!(stream.send.write(b"", true), Ok(0));
1137        assert!(stream.send.is_fin());
1138    }
1139
1140    #[test]
1141    fn send_undo_fin() {
1142        let mut stream =
1143            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1144
1145        assert_eq!(stream.send.write(b"hello", true), Ok(5));
1146        assert!(stream.send.is_fin());
1147
1148        assert_eq!(
1149            stream.send.write(b"helloworld", true),
1150            Err(Error::FinalSize)
1151        );
1152    }
1153
1154    #[test]
1155    fn send_fin_max_data_match() {
1156        let mut buf = [0; 15];
1157
1158        let mut stream =
1159            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1160
1161        let slice = b"hellohellohello";
1162
1163        assert!(stream.send.write(slice, true).is_ok());
1164
1165        let (written, fin) = stream.send.emit(&mut buf[..15]).unwrap();
1166        assert_eq!(written, 15);
1167        assert!(fin);
1168        assert_eq!(&buf[..written], slice);
1169    }
1170
1171    #[test]
1172    fn send_fin_zero_length() {
1173        let mut buf = [0; 5];
1174
1175        let mut stream =
1176            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1177
1178        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1179        assert_eq!(stream.send.write(b"", true), Ok(0));
1180        assert!(stream.send.is_fin());
1181
1182        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1183        assert_eq!(written, 5);
1184        assert!(fin);
1185        assert_eq!(&buf[..written], b"hello");
1186    }
1187
1188    #[test]
1189    fn send_ack() {
1190        let mut buf = [0; 5];
1191
1192        let mut stream =
1193            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1194
1195        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1196        assert_eq!(stream.send.write(b"world", false), Ok(5));
1197        assert_eq!(stream.send.write(b"", true), Ok(0));
1198        assert!(stream.send.is_fin());
1199
1200        assert_eq!(stream.send.off_front(), 0);
1201
1202        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1203        assert_eq!(written, 5);
1204        assert!(!fin);
1205        assert_eq!(&buf[..written], b"hello");
1206
1207        stream.send.ack_and_drop(0, 5);
1208
1209        stream.send.retransmit(0, 5);
1210
1211        assert_eq!(stream.send.off_front(), 5);
1212
1213        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1214        assert_eq!(written, 5);
1215        assert!(fin);
1216        assert_eq!(&buf[..written], b"world");
1217    }
1218
1219    #[test]
1220    fn send_ack_reordering() {
1221        let mut buf = [0; 5];
1222
1223        let mut stream =
1224            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1225
1226        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1227        assert_eq!(stream.send.write(b"world", false), Ok(5));
1228        assert_eq!(stream.send.write(b"", true), Ok(0));
1229        assert!(stream.send.is_fin());
1230
1231        assert_eq!(stream.send.off_front(), 0);
1232
1233        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1234        assert_eq!(written, 5);
1235        assert!(!fin);
1236        assert_eq!(&buf[..written], b"hello");
1237
1238        assert_eq!(stream.send.off_front(), 5);
1239
1240        let (written, fin) = stream.send.emit(&mut buf[..1]).unwrap();
1241        assert_eq!(written, 1);
1242        assert!(!fin);
1243        assert_eq!(&buf[..written], b"w");
1244
1245        stream.send.ack_and_drop(5, 1);
1246        stream.send.ack_and_drop(0, 5);
1247
1248        stream.send.retransmit(0, 5);
1249        stream.send.retransmit(5, 1);
1250
1251        assert_eq!(stream.send.off_front(), 6);
1252
1253        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1254        assert_eq!(written, 4);
1255        assert!(fin);
1256        assert_eq!(&buf[..written], b"orld");
1257    }
1258
1259    #[test]
1260    fn recv_data_below_off() {
1261        let mut stream =
1262            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1263
1264        let first = RangeBuf::from(b"hello", 0, false);
1265
1266        assert_eq!(stream.recv.write(first), Ok(()));
1267
1268        let mut buf = [0; 10];
1269
1270        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1271        assert_eq!(&buf[..len], b"hello");
1272        assert!(!fin);
1273
1274        let first = RangeBuf::from(b"elloworld", 1, true);
1275        assert_eq!(stream.recv.write(first), Ok(()));
1276
1277        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1278        assert_eq!(&buf[..len], b"world");
1279        assert!(fin);
1280    }
1281
1282    #[test]
1283    fn stream_complete() {
1284        let mut stream =
1285            <Stream>::new(0, 30, 30, true, true, DEFAULT_STREAM_WINDOW);
1286
1287        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1288        assert_eq!(stream.send.write(b"world", false), Ok(5));
1289
1290        assert!(!stream.send.is_complete());
1291        assert!(!stream.send.is_fin());
1292
1293        assert_eq!(stream.send.write(b"", true), Ok(0));
1294
1295        assert!(!stream.send.is_complete());
1296        assert!(stream.send.is_fin());
1297
1298        let buf = RangeBuf::from(b"hello", 0, true);
1299        assert!(stream.recv.write(buf).is_ok());
1300        assert!(!stream.recv.is_fin());
1301
1302        stream.send.ack(6, 4);
1303        assert!(!stream.send.is_complete());
1304
1305        let mut buf = [0; 2];
1306        assert_eq!(stream.recv.emit(&mut buf), Ok((2, false)));
1307        assert!(!stream.recv.is_fin());
1308
1309        stream.send.ack(1, 5);
1310        assert!(!stream.send.is_complete());
1311
1312        stream.send.ack(0, 1);
1313        assert!(stream.send.is_complete());
1314
1315        assert!(!stream.is_complete());
1316
1317        let mut buf = [0; 3];
1318        assert_eq!(stream.recv.emit(&mut buf), Ok((3, true)));
1319        assert!(stream.recv.is_fin());
1320
1321        assert!(stream.is_complete());
1322    }
1323
1324    #[test]
1325    fn send_fin_zero_length_output() {
1326        let mut buf = [0; 5];
1327
1328        let mut stream =
1329            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1330
1331        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1332        assert_eq!(stream.send.off_front(), 0);
1333        assert!(!stream.send.is_fin());
1334
1335        let (written, fin) = stream.send.emit(&mut buf).unwrap();
1336        assert_eq!(written, 5);
1337        assert!(!fin);
1338        assert_eq!(&buf[..written], b"hello");
1339
1340        assert_eq!(stream.send.write(b"", true), Ok(0));
1341        assert!(stream.send.is_fin());
1342        assert_eq!(stream.send.off_front(), 5);
1343
1344        let (written, fin) = stream.send.emit(&mut buf).unwrap();
1345        assert_eq!(written, 0);
1346        assert!(fin);
1347        assert_eq!(&buf[..written], b"");
1348    }
1349
1350    fn stream_send_ready(stream: &Stream) -> bool {
1351        !stream.send.is_empty() &&
1352            stream.send.off_front() < stream.send.off_back()
1353    }
1354
1355    #[test]
1356    fn send_emit() {
1357        let mut buf = [0; 5];
1358
1359        let mut stream =
1360            <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1361
1362        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1363        assert_eq!(stream.send.write(b"world", false), Ok(5));
1364        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1365        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1366        assert_eq!(stream.send.off_front(), 0);
1367        assert_eq!(stream.send.bufs_count(), 4);
1368
1369        assert!(stream.is_flushable());
1370
1371        assert!(stream_send_ready(&stream));
1372        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1373        assert_eq!(stream.send.off_front(), 4);
1374        assert_eq!(&buf[..4], b"hell");
1375
1376        assert!(stream_send_ready(&stream));
1377        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1378        assert_eq!(stream.send.off_front(), 8);
1379        assert_eq!(&buf[..4], b"owor");
1380
1381        assert!(stream_send_ready(&stream));
1382        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1383        assert_eq!(stream.send.off_front(), 10);
1384        assert_eq!(&buf[..2], b"ld");
1385
1386        assert!(stream_send_ready(&stream));
1387        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1388        assert_eq!(stream.send.off_front(), 11);
1389        assert_eq!(&buf[..1], b"o");
1390
1391        assert!(stream_send_ready(&stream));
1392        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1393        assert_eq!(stream.send.off_front(), 16);
1394        assert_eq!(&buf[..5], b"llehd");
1395
1396        assert!(stream_send_ready(&stream));
1397        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1398        assert_eq!(stream.send.off_front(), 20);
1399        assert_eq!(&buf[..4], b"lrow");
1400
1401        assert!(!stream.is_flushable());
1402
1403        assert!(!stream_send_ready(&stream));
1404        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1405        assert_eq!(stream.send.off_front(), 20);
1406    }
1407
1408    #[test]
1409    fn send_emit_ack() {
1410        let mut buf = [0; 5];
1411
1412        let mut stream =
1413            <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1414
1415        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1416        assert_eq!(stream.send.write(b"world", false), Ok(5));
1417        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1418        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1419        assert_eq!(stream.send.off_front(), 0);
1420        assert_eq!(stream.send.bufs_count(), 4);
1421
1422        assert!(stream.is_flushable());
1423
1424        assert!(stream_send_ready(&stream));
1425        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1426        assert_eq!(stream.send.off_front(), 4);
1427        assert_eq!(&buf[..4], b"hell");
1428
1429        assert!(stream_send_ready(&stream));
1430        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1431        assert_eq!(stream.send.off_front(), 8);
1432        assert_eq!(&buf[..4], b"owor");
1433
1434        stream.send.ack_and_drop(0, 5);
1435        assert_eq!(stream.send.bufs_count(), 3);
1436
1437        assert!(stream_send_ready(&stream));
1438        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1439        assert_eq!(stream.send.off_front(), 10);
1440        assert_eq!(&buf[..2], b"ld");
1441
1442        stream.send.ack_and_drop(7, 5);
1443        assert_eq!(stream.send.bufs_count(), 3);
1444
1445        assert!(stream_send_ready(&stream));
1446        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1447        assert_eq!(stream.send.off_front(), 11);
1448        assert_eq!(&buf[..1], b"o");
1449
1450        assert!(stream_send_ready(&stream));
1451        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1452        assert_eq!(stream.send.off_front(), 16);
1453        assert_eq!(&buf[..5], b"llehd");
1454
1455        stream.send.ack_and_drop(5, 7);
1456        assert_eq!(stream.send.bufs_count(), 2);
1457
1458        assert!(stream_send_ready(&stream));
1459        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1460        assert_eq!(stream.send.off_front(), 20);
1461        assert_eq!(&buf[..4], b"lrow");
1462
1463        assert!(!stream.is_flushable());
1464
1465        assert!(!stream_send_ready(&stream));
1466        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1467        assert_eq!(stream.send.off_front(), 20);
1468
1469        stream.send.ack_and_drop(22, 4);
1470        assert_eq!(stream.send.bufs_count(), 2);
1471
1472        stream.send.ack_and_drop(20, 1);
1473        assert_eq!(stream.send.bufs_count(), 2);
1474    }
1475
1476    #[test]
1477    fn send_emit_retransmit() {
1478        let mut buf = [0; 5];
1479
1480        let mut stream =
1481            <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1482
1483        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1484        assert_eq!(stream.send.write(b"world", false), Ok(5));
1485        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1486        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1487        assert_eq!(stream.send.off_front(), 0);
1488        assert_eq!(stream.send.bufs_count(), 4);
1489
1490        assert!(stream.is_flushable());
1491
1492        assert!(stream_send_ready(&stream));
1493        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1494        assert_eq!(stream.send.off_front(), 4);
1495        assert_eq!(&buf[..4], b"hell");
1496
1497        assert!(stream_send_ready(&stream));
1498        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1499        assert_eq!(stream.send.off_front(), 8);
1500        assert_eq!(&buf[..4], b"owor");
1501
1502        stream.send.retransmit(3, 3);
1503        assert_eq!(stream.send.off_front(), 3);
1504
1505        assert!(stream_send_ready(&stream));
1506        assert_eq!(stream.send.emit(&mut buf[..3]), Ok((3, false)));
1507        assert_eq!(stream.send.off_front(), 8);
1508        assert_eq!(&buf[..3], b"low");
1509
1510        assert!(stream_send_ready(&stream));
1511        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1512        assert_eq!(stream.send.off_front(), 10);
1513        assert_eq!(&buf[..2], b"ld");
1514
1515        stream.send.ack_and_drop(7, 2);
1516
1517        stream.send.retransmit(8, 2);
1518
1519        assert!(stream_send_ready(&stream));
1520        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1521        assert_eq!(stream.send.off_front(), 10);
1522        assert_eq!(&buf[..2], b"ld");
1523
1524        assert!(stream_send_ready(&stream));
1525        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1526        assert_eq!(stream.send.off_front(), 11);
1527        assert_eq!(&buf[..1], b"o");
1528
1529        assert!(stream_send_ready(&stream));
1530        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1531        assert_eq!(stream.send.off_front(), 16);
1532        assert_eq!(&buf[..5], b"llehd");
1533
1534        stream.send.retransmit(12, 2);
1535
1536        assert!(stream_send_ready(&stream));
1537        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1538        assert_eq!(stream.send.off_front(), 16);
1539        assert_eq!(&buf[..2], b"le");
1540
1541        assert!(stream_send_ready(&stream));
1542        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1543        assert_eq!(stream.send.off_front(), 20);
1544        assert_eq!(&buf[..4], b"lrow");
1545
1546        assert!(!stream.is_flushable());
1547
1548        assert!(!stream_send_ready(&stream));
1549        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1550        assert_eq!(stream.send.off_front(), 20);
1551
1552        stream.send.retransmit(7, 12);
1553
1554        assert!(stream_send_ready(&stream));
1555        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1556        assert_eq!(stream.send.off_front(), 12);
1557        assert_eq!(&buf[..5], b"rldol");
1558
1559        assert!(stream_send_ready(&stream));
1560        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1561        assert_eq!(stream.send.off_front(), 17);
1562        assert_eq!(&buf[..5], b"lehdl");
1563
1564        assert!(stream_send_ready(&stream));
1565        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
1566        assert_eq!(stream.send.off_front(), 20);
1567        assert_eq!(&buf[..2], b"ro");
1568
1569        stream.send.ack_and_drop(12, 7);
1570
1571        stream.send.retransmit(7, 12);
1572
1573        assert!(stream_send_ready(&stream));
1574        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1575        assert_eq!(stream.send.off_front(), 12);
1576        assert_eq!(&buf[..5], b"rldol");
1577
1578        assert!(stream_send_ready(&stream));
1579        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1580        assert_eq!(stream.send.off_front(), 17);
1581        assert_eq!(&buf[..5], b"lehdl");
1582
1583        assert!(stream_send_ready(&stream));
1584        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
1585        assert_eq!(stream.send.off_front(), 20);
1586        assert_eq!(&buf[..2], b"ro");
1587    }
1588
1589    #[test]
1590    fn rangebuf_split_off() {
1591        let mut buf = <RangeBuf>::from(b"helloworld", 5, true);
1592        assert_eq!(buf.start, 0);
1593        assert_eq!(buf.pos, 0);
1594        assert_eq!(buf.len, 10);
1595        assert_eq!(buf.off, 5);
1596        assert!(buf.fin);
1597
1598        assert_eq!(buf.len(), 10);
1599        assert_eq!(buf.off(), 5);
1600        assert!(buf.fin());
1601
1602        assert_eq!(&buf[..], b"helloworld");
1603
1604        // Advance buffer.
1605        buf.consume(5);
1606
1607        assert_eq!(buf.start, 0);
1608        assert_eq!(buf.pos, 5);
1609        assert_eq!(buf.len, 10);
1610        assert_eq!(buf.off, 5);
1611        assert!(buf.fin);
1612
1613        assert_eq!(buf.len(), 5);
1614        assert_eq!(buf.off(), 10);
1615        assert!(buf.fin());
1616
1617        assert_eq!(&buf[..], b"world");
1618
1619        // Split buffer before position.
1620        let mut new_buf = buf.split_off(3);
1621
1622        assert_eq!(buf.start, 0);
1623        assert_eq!(buf.pos, 3);
1624        assert_eq!(buf.len, 3);
1625        assert_eq!(buf.off, 5);
1626        assert!(!buf.fin);
1627
1628        assert_eq!(buf.len(), 0);
1629        assert_eq!(buf.off(), 8);
1630        assert!(!buf.fin());
1631
1632        assert_eq!(&buf[..], b"");
1633
1634        assert_eq!(new_buf.start, 3);
1635        assert_eq!(new_buf.pos, 5);
1636        assert_eq!(new_buf.len, 7);
1637        assert_eq!(new_buf.off, 8);
1638        assert!(new_buf.fin);
1639
1640        assert_eq!(new_buf.len(), 5);
1641        assert_eq!(new_buf.off(), 10);
1642        assert!(new_buf.fin());
1643
1644        assert_eq!(&new_buf[..], b"world");
1645
1646        // Advance buffer.
1647        new_buf.consume(2);
1648
1649        assert_eq!(new_buf.start, 3);
1650        assert_eq!(new_buf.pos, 7);
1651        assert_eq!(new_buf.len, 7);
1652        assert_eq!(new_buf.off, 8);
1653        assert!(new_buf.fin);
1654
1655        assert_eq!(new_buf.len(), 3);
1656        assert_eq!(new_buf.off(), 12);
1657        assert!(new_buf.fin());
1658
1659        assert_eq!(&new_buf[..], b"rld");
1660
1661        // Split buffer after position.
1662        let mut new_new_buf = new_buf.split_off(5);
1663
1664        assert_eq!(new_buf.start, 3);
1665        assert_eq!(new_buf.pos, 7);
1666        assert_eq!(new_buf.len, 5);
1667        assert_eq!(new_buf.off, 8);
1668        assert!(!new_buf.fin);
1669
1670        assert_eq!(new_buf.len(), 1);
1671        assert_eq!(new_buf.off(), 12);
1672        assert!(!new_buf.fin());
1673
1674        assert_eq!(&new_buf[..], b"r");
1675
1676        assert_eq!(new_new_buf.start, 8);
1677        assert_eq!(new_new_buf.pos, 8);
1678        assert_eq!(new_new_buf.len, 2);
1679        assert_eq!(new_new_buf.off, 13);
1680        assert!(new_new_buf.fin);
1681
1682        assert_eq!(new_new_buf.len(), 2);
1683        assert_eq!(new_new_buf.off(), 13);
1684        assert!(new_new_buf.fin());
1685
1686        assert_eq!(&new_new_buf[..], b"ld");
1687
1688        // Advance buffer.
1689        new_new_buf.consume(2);
1690
1691        assert_eq!(new_new_buf.start, 8);
1692        assert_eq!(new_new_buf.pos, 10);
1693        assert_eq!(new_new_buf.len, 2);
1694        assert_eq!(new_new_buf.off, 13);
1695        assert!(new_new_buf.fin);
1696
1697        assert_eq!(new_new_buf.len(), 0);
1698        assert_eq!(new_new_buf.off(), 15);
1699        assert!(new_new_buf.fin());
1700
1701        assert_eq!(&new_new_buf[..], b"");
1702    }
1703
1704    /// RFC9000 2.1: A stream ID that is used out of order results in all
1705    /// streams of that type with lower-numbered stream IDs also being opened.
1706    #[test]
1707    fn stream_limit_auto_open() {
1708        let local_tp = crate::TransportParams::default();
1709        let peer_tp = crate::TransportParams::default();
1710
1711        let mut streams = <StreamMap>::new(5, 5, 5);
1712
1713        let stream_id = 500;
1714        assert!(!is_local(stream_id, true), "stream id is peer initiated");
1715        assert!(is_bidi(stream_id), "stream id is bidirectional");
1716        assert_eq!(
1717            streams
1718                .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1719                .err(),
1720            Some(Error::StreamLimit),
1721            "stream limit should be exceeded"
1722        );
1723    }
1724
1725    /// Stream limit should be satisfied regardless of what order we open
1726    /// streams
1727    #[test]
1728    fn stream_create_out_of_order() {
1729        let local_tp = crate::TransportParams::default();
1730        let peer_tp = crate::TransportParams::default();
1731
1732        let mut streams = <StreamMap>::new(5, 5, 5);
1733
1734        for stream_id in [8, 12, 4] {
1735            assert!(is_local(stream_id, false), "stream id is client initiated");
1736            assert!(is_bidi(stream_id), "stream id is bidirectional");
1737            assert!(streams
1738                .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1739                .is_ok());
1740        }
1741    }
1742
1743    /// Check stream limit boundary cases
1744    #[test]
1745    fn stream_limit_edge() {
1746        let local_tp = crate::TransportParams::default();
1747        let peer_tp = crate::TransportParams::default();
1748
1749        let mut streams = <StreamMap>::new(3, 3, 3);
1750
1751        // Highest permitted
1752        let stream_id = 8;
1753        assert!(streams
1754            .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1755            .is_ok());
1756
1757        // One more than highest permitted
1758        let stream_id = 12;
1759        assert_eq!(
1760            streams
1761                .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1762                .err(),
1763            Some(Error::StreamLimit)
1764        );
1765    }
1766
1767    fn cycle_stream_priority(stream_id: u64, streams: &mut StreamMap) {
1768        let key = streams.get(stream_id).unwrap().priority_key.clone();
1769        streams.update_priority(&key.clone(), &key);
1770    }
1771
1772    #[test]
1773    fn writable_prioritized_default_priority() {
1774        let local_tp = crate::TransportParams::default();
1775        let peer_tp = crate::TransportParams {
1776            initial_max_stream_data_bidi_local: 100,
1777            initial_max_stream_data_uni: 100,
1778            ..Default::default()
1779        };
1780
1781        let mut streams = StreamMap::new(100, 100, 100);
1782
1783        for id in [0, 4, 8, 12] {
1784            assert!(streams
1785                .get_or_create(id, &local_tp, &peer_tp, false, true)
1786                .is_ok());
1787        }
1788
1789        let walk_1: Vec<u64> = streams.writable().collect();
1790        cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1791        let walk_2: Vec<u64> = streams.writable().collect();
1792        cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1793        let walk_3: Vec<u64> = streams.writable().collect();
1794        cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1795        let walk_4: Vec<u64> = streams.writable().collect();
1796        cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1797        let walk_5: Vec<u64> = streams.writable().collect();
1798
1799        // All streams are non-incremental and same urgency by default. Multiple
1800        // visits shuffle their order.
1801        assert_eq!(walk_1, vec![0, 4, 8, 12]);
1802        assert_eq!(walk_2, vec![4, 8, 12, 0]);
1803        assert_eq!(walk_3, vec![8, 12, 0, 4]);
1804        assert_eq!(walk_4, vec![12, 0, 4, 8,]);
1805        assert_eq!(walk_5, vec![0, 4, 8, 12]);
1806    }
1807
1808    #[test]
1809    fn writable_prioritized_insert_order() {
1810        let local_tp = crate::TransportParams::default();
1811        let peer_tp = crate::TransportParams {
1812            initial_max_stream_data_bidi_local: 100,
1813            initial_max_stream_data_uni: 100,
1814            ..Default::default()
1815        };
1816
1817        let mut streams = StreamMap::new(100, 100, 100);
1818
1819        // Inserting same-urgency incremental streams in a "random" order yields
1820        // same order to start with.
1821        for id in [12, 4, 8, 0] {
1822            assert!(streams
1823                .get_or_create(id, &local_tp, &peer_tp, false, true)
1824                .is_ok());
1825        }
1826
1827        let walk_1: Vec<u64> = streams.writable().collect();
1828        cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1829        let walk_2: Vec<u64> = streams.writable().collect();
1830        cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1831        let walk_3: Vec<u64> = streams.writable().collect();
1832        cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1833        let walk_4: Vec<u64> = streams.writable().collect();
1834        cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1835        let walk_5: Vec<u64> = streams.writable().collect();
1836        assert_eq!(walk_1, vec![12, 4, 8, 0]);
1837        assert_eq!(walk_2, vec![4, 8, 0, 12]);
1838        assert_eq!(walk_3, vec![8, 0, 12, 4,]);
1839        assert_eq!(walk_4, vec![0, 12, 4, 8]);
1840        assert_eq!(walk_5, vec![12, 4, 8, 0]);
1841    }
1842
1843    #[test]
1844    fn writable_prioritized_mixed_urgency() {
1845        let local_tp = crate::TransportParams::default();
1846        let peer_tp = crate::TransportParams {
1847            initial_max_stream_data_bidi_local: 100,
1848            initial_max_stream_data_uni: 100,
1849            ..Default::default()
1850        };
1851
1852        let mut streams = <StreamMap>::new(100, 100, 100);
1853
1854        // Streams where the urgency descends (becomes more important). No stream
1855        // shares an urgency.
1856        let input = vec![
1857            (0, 100),
1858            (4, 90),
1859            (8, 80),
1860            (12, 70),
1861            (16, 60),
1862            (20, 50),
1863            (24, 40),
1864            (28, 30),
1865            (32, 20),
1866            (36, 10),
1867            (40, 0),
1868        ];
1869
1870        for (id, urgency) in input.clone() {
1871            // this duplicates some code from stream_priority in order to access
1872            // streams and the collection they're in
1873            let stream = streams
1874                .get_or_create(id, &local_tp, &peer_tp, false, true)
1875                .unwrap();
1876
1877            stream.urgency = urgency;
1878
1879            let new_priority_key = Arc::new(StreamPriorityKey {
1880                urgency: stream.urgency,
1881                incremental: stream.incremental,
1882                id,
1883                ..Default::default()
1884            });
1885
1886            let old_priority_key = std::mem::replace(
1887                &mut stream.priority_key,
1888                new_priority_key.clone(),
1889            );
1890
1891            streams.update_priority(&old_priority_key, &new_priority_key);
1892        }
1893
1894        let walk_1: Vec<u64> = streams.writable().collect();
1895        assert_eq!(walk_1, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
1896
1897        // Re-applying priority to a stream does not cause duplication.
1898        for (id, urgency) in input {
1899            // this duplicates some code from stream_priority in order to access
1900            // streams and the collection they're in
1901            let stream = streams
1902                .get_or_create(id, &local_tp, &peer_tp, false, true)
1903                .unwrap();
1904
1905            stream.urgency = urgency;
1906
1907            let new_priority_key = Arc::new(StreamPriorityKey {
1908                urgency: stream.urgency,
1909                incremental: stream.incremental,
1910                id,
1911                ..Default::default()
1912            });
1913
1914            let old_priority_key = std::mem::replace(
1915                &mut stream.priority_key,
1916                new_priority_key.clone(),
1917            );
1918
1919            streams.update_priority(&old_priority_key, &new_priority_key);
1920        }
1921
1922        let walk_2: Vec<u64> = streams.writable().collect();
1923        assert_eq!(walk_2, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
1924
1925        // Removing streams doesn't break expected ordering.
1926        streams.collect(24, true);
1927
1928        let walk_3: Vec<u64> = streams.writable().collect();
1929        assert_eq!(walk_3, vec![40, 36, 32, 28, 20, 16, 12, 8, 4, 0]);
1930
1931        streams.collect(40, true);
1932        streams.collect(0, true);
1933
1934        let walk_4: Vec<u64> = streams.writable().collect();
1935        assert_eq!(walk_4, vec![36, 32, 28, 20, 16, 12, 8, 4]);
1936
1937        // Adding streams doesn't break expected ordering.
1938        streams
1939            .get_or_create(44, &local_tp, &peer_tp, false, true)
1940            .unwrap();
1941
1942        let walk_5: Vec<u64> = streams.writable().collect();
1943        assert_eq!(walk_5, vec![36, 32, 28, 20, 16, 12, 8, 4, 44]);
1944    }
1945
1946    #[test]
1947    fn writable_prioritized_mixed_urgencies_incrementals() {
1948        let local_tp = crate::TransportParams::default();
1949        let peer_tp = crate::TransportParams {
1950            initial_max_stream_data_bidi_local: 100,
1951            initial_max_stream_data_uni: 100,
1952            ..Default::default()
1953        };
1954
1955        let mut streams = StreamMap::new(100, 100, 100);
1956
1957        // Streams that share some urgency level
1958        let input = vec![
1959            (0, 100),
1960            (4, 20),
1961            (8, 100),
1962            (12, 20),
1963            (16, 90),
1964            (20, 25),
1965            (24, 90),
1966            (28, 30),
1967            (32, 80),
1968            (36, 20),
1969            (40, 0),
1970        ];
1971
1972        for (id, urgency) in input.clone() {
1973            // this duplicates some code from stream_priority in order to access
1974            // streams and the collection they're in
1975            let stream = streams
1976                .get_or_create(id, &local_tp, &peer_tp, false, true)
1977                .unwrap();
1978
1979            stream.urgency = urgency;
1980
1981            let new_priority_key = Arc::new(StreamPriorityKey {
1982                urgency: stream.urgency,
1983                incremental: stream.incremental,
1984                id,
1985                ..Default::default()
1986            });
1987
1988            let old_priority_key = std::mem::replace(
1989                &mut stream.priority_key,
1990                new_priority_key.clone(),
1991            );
1992
1993            streams.update_priority(&old_priority_key, &new_priority_key);
1994        }
1995
1996        let walk_1: Vec<u64> = streams.writable().collect();
1997        cycle_stream_priority(4, &mut streams);
1998        cycle_stream_priority(16, &mut streams);
1999        cycle_stream_priority(0, &mut streams);
2000        let walk_2: Vec<u64> = streams.writable().collect();
2001        cycle_stream_priority(12, &mut streams);
2002        cycle_stream_priority(24, &mut streams);
2003        cycle_stream_priority(8, &mut streams);
2004        let walk_3: Vec<u64> = streams.writable().collect();
2005        cycle_stream_priority(36, &mut streams);
2006        cycle_stream_priority(16, &mut streams);
2007        cycle_stream_priority(0, &mut streams);
2008        let walk_4: Vec<u64> = streams.writable().collect();
2009        cycle_stream_priority(4, &mut streams);
2010        cycle_stream_priority(24, &mut streams);
2011        cycle_stream_priority(8, &mut streams);
2012        let walk_5: Vec<u64> = streams.writable().collect();
2013        cycle_stream_priority(12, &mut streams);
2014        cycle_stream_priority(16, &mut streams);
2015        cycle_stream_priority(0, &mut streams);
2016        let walk_6: Vec<u64> = streams.writable().collect();
2017        cycle_stream_priority(36, &mut streams);
2018        cycle_stream_priority(24, &mut streams);
2019        cycle_stream_priority(8, &mut streams);
2020        let walk_7: Vec<u64> = streams.writable().collect();
2021        cycle_stream_priority(4, &mut streams);
2022        cycle_stream_priority(16, &mut streams);
2023        cycle_stream_priority(0, &mut streams);
2024        let walk_8: Vec<u64> = streams.writable().collect();
2025        cycle_stream_priority(12, &mut streams);
2026        cycle_stream_priority(24, &mut streams);
2027        cycle_stream_priority(8, &mut streams);
2028        let walk_9: Vec<u64> = streams.writable().collect();
2029        cycle_stream_priority(36, &mut streams);
2030        cycle_stream_priority(16, &mut streams);
2031        cycle_stream_priority(0, &mut streams);
2032
2033        assert_eq!(walk_1, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2034        assert_eq!(walk_2, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2035        assert_eq!(walk_3, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2036        assert_eq!(walk_4, vec![40, 4, 12, 36, 20, 28, 32, 24, 16, 8, 0]);
2037        assert_eq!(walk_5, vec![40, 12, 36, 4, 20, 28, 32, 16, 24, 0, 8]);
2038        assert_eq!(walk_6, vec![40, 36, 4, 12, 20, 28, 32, 24, 16, 8, 0]);
2039        assert_eq!(walk_7, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2040        assert_eq!(walk_8, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2041        assert_eq!(walk_9, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2042
2043        // Removing streams doesn't break expected ordering.
2044        streams.collect(20, true);
2045
2046        let walk_10: Vec<u64> = streams.writable().collect();
2047        assert_eq!(walk_10, vec![40, 4, 12, 36, 28, 32, 24, 16, 8, 0]);
2048
2049        // Adding streams doesn't break expected ordering.
2050        let stream = streams
2051            .get_or_create(44, &local_tp, &peer_tp, false, true)
2052            .unwrap();
2053
2054        stream.urgency = 20;
2055        stream.incremental = true;
2056
2057        let new_priority_key = Arc::new(StreamPriorityKey {
2058            urgency: stream.urgency,
2059            incremental: stream.incremental,
2060            id: 44,
2061            ..Default::default()
2062        });
2063
2064        let old_priority_key =
2065            std::mem::replace(&mut stream.priority_key, new_priority_key.clone());
2066
2067        streams.update_priority(&old_priority_key, &new_priority_key);
2068
2069        let walk_11: Vec<u64> = streams.writable().collect();
2070        assert_eq!(walk_11, vec![40, 4, 12, 36, 44, 28, 32, 24, 16, 8, 0]);
2071    }
2072
2073    #[test]
2074    fn priority_tree_dupes() {
2075        let mut prioritized_writable: RBTree<StreamWritablePriorityAdapter> =
2076            Default::default();
2077
2078        for id in [0, 4, 8, 12] {
2079            let s = Arc::new(StreamPriorityKey {
2080                urgency: 0,
2081                incremental: false,
2082                id,
2083                ..Default::default()
2084            });
2085
2086            prioritized_writable.insert(s);
2087        }
2088
2089        let walk_1: Vec<u64> =
2090            prioritized_writable.iter().map(|s| s.id).collect();
2091        assert_eq!(walk_1, vec![0, 4, 8, 12]);
2092
2093        // Default keys could cause duplicate entries, this is normally protected
2094        // against via StreamMap.
2095        for id in [0, 4, 8, 12] {
2096            let s = Arc::new(StreamPriorityKey {
2097                urgency: 0,
2098                incremental: false,
2099                id,
2100                ..Default::default()
2101            });
2102
2103            prioritized_writable.insert(s);
2104        }
2105
2106        let walk_2: Vec<u64> =
2107            prioritized_writable.iter().map(|s| s.id).collect();
2108        assert_eq!(walk_2, vec![0, 0, 4, 4, 8, 8, 12, 12]);
2109    }
2110}
2111
2112mod recv_buf;
2113mod send_buf;