quiche/stream/
mod.rs

1// Copyright (C) 2018-2019, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::cmp;
28
29use std::sync::Arc;
30
31use std::collections::hash_map;
32use std::collections::HashMap;
33use std::collections::HashSet;
34
35use intrusive_collections::intrusive_adapter;
36use intrusive_collections::KeyAdapter;
37use intrusive_collections::RBTree;
38use intrusive_collections::RBTreeAtomicLink;
39
40use smallvec::SmallVec;
41
42use crate::range_buf::DefaultBufFactory;
43use crate::BufFactory;
44use crate::Error;
45use crate::Result;
46
47const DEFAULT_URGENCY: u8 = 127;
48
49// The default size of the receiver stream flow control window.
50const DEFAULT_STREAM_WINDOW: u64 = 32 * 1024;
51
52/// The maximum size of the receiver stream flow control window.
53pub const MAX_STREAM_WINDOW: u64 = 16 * 1024 * 1024;
54
55/// A simple no-op hasher for Stream IDs.
56///
57/// The QUIC protocol and quiche library guarantees stream ID uniqueness, so
58/// we can save effort by avoiding using a more complicated algorithm.
59#[derive(Default)]
60pub struct StreamIdHasher {
61    id: u64,
62}
63
64/// Return value type of `RecvBuf::reset()`
65#[derive(Debug, PartialEq, Clone, Copy)]
66pub struct RecvBufResetReturn {
67    /// Returns the difference between the previous max_data offset
68    /// received and the final size reported by the reset
69    pub max_data_delta: u64,
70
71    /// The amount of flow control credit that should be returned to the
72    /// connection level flow control.
73    pub consumed_flowcontrol: u64,
74}
75
76impl RecvBufResetReturn {
77    pub fn zero() -> Self {
78        Self {
79            max_data_delta: 0,
80            consumed_flowcontrol: 0,
81        }
82    }
83}
84
85impl std::hash::Hasher for StreamIdHasher {
86    #[inline]
87    fn finish(&self) -> u64 {
88        self.id
89    }
90
91    #[inline]
92    fn write_u64(&mut self, id: u64) {
93        self.id = id;
94    }
95
96    #[inline]
97    fn write(&mut self, _: &[u8]) {
98        // We need a default write() for the trait but stream IDs will always
99        // be a u64 so we just delegate to write_u64.
100        unimplemented!()
101    }
102}
103
104type BuildStreamIdHasher = std::hash::BuildHasherDefault<StreamIdHasher>;
105
106pub type StreamIdHashMap<V> = HashMap<u64, V, BuildStreamIdHasher>;
107pub type StreamIdHashSet = HashSet<u64, BuildStreamIdHasher>;
108
109/// Keeps track of QUIC streams and enforces stream limits.
110#[derive(Default)]
111pub struct StreamMap<F: BufFactory = DefaultBufFactory> {
112    /// Map of streams indexed by stream ID.
113    streams: StreamIdHashMap<Stream<F>>,
114
115    /// Set of streams that were completed and garbage collected.
116    ///
117    /// Instead of keeping the full stream state forever, we collect completed
118    /// streams to save memory, but we still need to keep track of previously
119    /// created streams, to prevent peers from re-creating them.
120    collected: StreamIdHashSet,
121
122    /// Peer's maximum bidirectional stream count limit.
123    peer_max_streams_bidi: u64,
124
125    /// Peer's maximum unidirectional stream count limit.
126    peer_max_streams_uni: u64,
127
128    /// The total number of bidirectional streams opened by the peer.
129    peer_opened_streams_bidi: u64,
130
131    /// The total number of unidirectional streams opened by the peer.
132    peer_opened_streams_uni: u64,
133
134    /// Local maximum bidirectional stream count limit.
135    local_max_streams_bidi: u64,
136    local_max_streams_bidi_next: u64,
137
138    /// Local maximum unidirectional stream count limit.
139    local_max_streams_uni: u64,
140    local_max_streams_uni_next: u64,
141
142    /// The total number of bidirectional streams opened by the local endpoint.
143    local_opened_streams_bidi: u64,
144
145    /// The total number of unidirectional streams opened by the local endpoint.
146    local_opened_streams_uni: u64,
147
148    /// Queue of stream IDs corresponding to streams that have buffered data
149    /// ready to be sent to the peer. This also implies that the stream has
150    /// enough flow control credits to send at least some of that data.
151    flushable: RBTree<StreamFlushablePriorityAdapter>,
152
153    /// Set of stream IDs corresponding to streams that have outstanding data
154    /// to read. This is used to generate a `StreamIter` of streams without
155    /// having to iterate over the full list of streams.
156    pub readable: RBTree<StreamReadablePriorityAdapter>,
157
158    /// Set of stream IDs corresponding to streams that have enough flow control
159    /// capacity to be written to, and is not finished. This is used to generate
160    /// a `StreamIter` of streams without having to iterate over the full list
161    /// of streams.
162    pub writable: RBTree<StreamWritablePriorityAdapter>,
163
164    /// Set of stream IDs corresponding to streams that are almost out of flow
165    /// control credit and need to send MAX_STREAM_DATA. This is used to
166    /// generate a `StreamIter` of streams without having to iterate over the
167    /// full list of streams.
168    almost_full: StreamIdHashSet,
169
170    /// Set of stream IDs corresponding to streams that are blocked. The value
171    /// of the map elements represents the offset of the stream at which the
172    /// blocking occurred.
173    blocked: StreamIdHashMap<u64>,
174
175    /// Set of stream IDs corresponding to streams that are reset. The value
176    /// of the map elements is a tuple of the error code and final size values
177    /// to include in the RESET_STREAM frame.
178    reset: StreamIdHashMap<(u64, u64)>,
179
180    /// Set of stream IDs corresponding to streams that are shutdown on the
181    /// receive side, and need to send a STOP_SENDING frame. The value of the
182    /// map elements is the error code to include in the STOP_SENDING frame.
183    stopped: StreamIdHashMap<u64>,
184
185    /// The maximum size of a stream window.
186    max_stream_window: u64,
187}
188
189impl<F: BufFactory> StreamMap<F> {
190    pub fn new(
191        max_streams_bidi: u64, max_streams_uni: u64, max_stream_window: u64,
192    ) -> Self {
193        StreamMap {
194            local_max_streams_bidi: max_streams_bidi,
195            local_max_streams_bidi_next: max_streams_bidi,
196
197            local_max_streams_uni: max_streams_uni,
198            local_max_streams_uni_next: max_streams_uni,
199
200            max_stream_window,
201
202            ..StreamMap::default()
203        }
204    }
205
206    /// Returns the stream with the given ID if it exists.
207    pub fn get(&self, id: u64) -> Option<&Stream<F>> {
208        self.streams.get(&id)
209    }
210
211    /// Returns the mutable stream with the given ID if it exists.
212    pub fn get_mut(&mut self, id: u64) -> Option<&mut Stream<F>> {
213        self.streams.get_mut(&id)
214    }
215
216    /// Returns the mutable stream with the given ID if it exists, or creates
217    /// a new one otherwise.
218    ///
219    /// The `local` parameter indicates whether the stream's creation was
220    /// requested by the local application rather than the peer, and is
221    /// used to validate the requested stream ID, and to select the initial
222    /// flow control values from the local and remote transport parameters
223    /// (also passed as arguments).
224    ///
225    /// This also takes care of enforcing both local and the peer's stream
226    /// count limits. If one of these limits is violated, the `StreamLimit`
227    /// error is returned.
228    pub(crate) fn get_or_create(
229        &mut self, id: u64, local_params: &crate::TransportParams,
230        peer_params: &crate::TransportParams, local: bool, is_server: bool,
231    ) -> Result<&mut Stream<F>> {
232        let (stream, is_new_and_writable) = match self.streams.entry(id) {
233            hash_map::Entry::Vacant(v) => {
234                // Stream has already been closed and garbage collected.
235                if self.collected.contains(&id) {
236                    return Err(Error::Done);
237                }
238
239                if local != is_local(id, is_server) {
240                    return Err(Error::InvalidStreamState(id));
241                }
242
243                let (max_rx_data, max_tx_data) = match (local, is_bidi(id)) {
244                    // Locally-initiated bidirectional stream.
245                    (true, true) => (
246                        local_params.initial_max_stream_data_bidi_local,
247                        peer_params.initial_max_stream_data_bidi_remote,
248                    ),
249
250                    // Locally-initiated unidirectional stream.
251                    (true, false) => (0, peer_params.initial_max_stream_data_uni),
252
253                    // Remotely-initiated bidirectional stream.
254                    (false, true) => (
255                        local_params.initial_max_stream_data_bidi_remote,
256                        peer_params.initial_max_stream_data_bidi_local,
257                    ),
258
259                    // Remotely-initiated unidirectional stream.
260                    (false, false) =>
261                        (local_params.initial_max_stream_data_uni, 0),
262                };
263
264                // The two least significant bits from a stream id identify the
265                // type of stream. Truncate those bits to get the sequence for
266                // that stream type.
267                let stream_sequence = id >> 2;
268
269                // Enforce stream count limits.
270                match (is_local(id, is_server), is_bidi(id)) {
271                    (true, true) => {
272                        let n = cmp::max(
273                            self.local_opened_streams_bidi,
274                            stream_sequence + 1,
275                        );
276
277                        if n > self.peer_max_streams_bidi {
278                            return Err(Error::StreamLimit);
279                        }
280
281                        self.local_opened_streams_bidi = n;
282                    },
283
284                    (true, false) => {
285                        let n = cmp::max(
286                            self.local_opened_streams_uni,
287                            stream_sequence + 1,
288                        );
289
290                        if n > self.peer_max_streams_uni {
291                            return Err(Error::StreamLimit);
292                        }
293
294                        self.local_opened_streams_uni = n;
295                    },
296
297                    (false, true) => {
298                        let n = cmp::max(
299                            self.peer_opened_streams_bidi,
300                            stream_sequence + 1,
301                        );
302
303                        if n > self.local_max_streams_bidi {
304                            return Err(Error::StreamLimit);
305                        }
306
307                        self.peer_opened_streams_bidi = n;
308                    },
309
310                    (false, false) => {
311                        let n = cmp::max(
312                            self.peer_opened_streams_uni,
313                            stream_sequence + 1,
314                        );
315
316                        if n > self.local_max_streams_uni {
317                            return Err(Error::StreamLimit);
318                        }
319
320                        self.peer_opened_streams_uni = n;
321                    },
322                };
323
324                let s = Stream::new(
325                    id,
326                    max_rx_data,
327                    max_tx_data,
328                    is_bidi(id),
329                    local,
330                    self.max_stream_window,
331                );
332
333                let is_writable = s.is_writable();
334
335                (v.insert(s), is_writable)
336            },
337
338            hash_map::Entry::Occupied(v) => (v.into_mut(), false),
339        };
340
341        // Newly created stream might already be writable due to initial flow
342        // control limits.
343        if is_new_and_writable {
344            self.writable.insert(Arc::clone(&stream.priority_key));
345        }
346
347        Ok(stream)
348    }
349
350    /// Adds the stream ID to the readable streams set.
351    ///
352    /// If the stream was already in the list, this does nothing.
353    pub fn insert_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
354        if !priority_key.readable.is_linked() {
355            self.readable.insert(Arc::clone(priority_key));
356        }
357    }
358
359    /// Removes the stream ID from the readable streams set.
360    pub fn remove_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
361        if !priority_key.readable.is_linked() {
362            return;
363        }
364
365        let mut c = {
366            let ptr = Arc::as_ptr(priority_key);
367            unsafe { self.readable.cursor_mut_from_ptr(ptr) }
368        };
369
370        c.remove();
371    }
372
373    /// Adds the stream ID to the writable streams set.
374    ///
375    /// This should also be called anytime a new stream is created, in addition
376    /// to when an existing stream becomes writable.
377    ///
378    /// If the stream was already in the list, this does nothing.
379    pub fn insert_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
380        if !priority_key.writable.is_linked() {
381            self.writable.insert(Arc::clone(priority_key));
382        }
383    }
384
385    /// Removes the stream ID from the writable streams set.
386    ///
387    /// This should also be called anytime an existing stream stops being
388    /// writable.
389    pub fn remove_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
390        if !priority_key.writable.is_linked() {
391            return;
392        }
393
394        let mut c = {
395            let ptr = Arc::as_ptr(priority_key);
396            unsafe { self.writable.cursor_mut_from_ptr(ptr) }
397        };
398
399        c.remove();
400    }
401
402    /// Adds the stream ID to the flushable streams set.
403    ///
404    /// If the stream was already in the list, this does nothing.
405    pub fn insert_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
406        if !priority_key.flushable.is_linked() {
407            self.flushable.insert(Arc::clone(priority_key));
408        }
409    }
410
411    /// Removes the stream ID from the flushable streams set.
412    pub fn remove_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
413        if !priority_key.flushable.is_linked() {
414            return;
415        }
416
417        let mut c = {
418            let ptr = Arc::as_ptr(priority_key);
419            unsafe { self.flushable.cursor_mut_from_ptr(ptr) }
420        };
421
422        c.remove();
423    }
424
425    pub fn peek_flushable(&self) -> Option<Arc<StreamPriorityKey>> {
426        self.flushable.front().clone_pointer()
427    }
428
429    /// Updates the priorities of a stream.
430    pub fn update_priority(
431        &mut self, old: &Arc<StreamPriorityKey>, new: &Arc<StreamPriorityKey>,
432    ) {
433        if old.readable.is_linked() {
434            self.remove_readable(old);
435            self.readable.insert(Arc::clone(new));
436        }
437
438        if old.writable.is_linked() {
439            self.remove_writable(old);
440            self.writable.insert(Arc::clone(new));
441        }
442
443        if old.flushable.is_linked() {
444            self.remove_flushable(old);
445            self.flushable.insert(Arc::clone(new));
446        }
447    }
448
449    /// Adds the stream ID to the almost full streams set.
450    ///
451    /// If the stream was already in the list, this does nothing.
452    pub fn insert_almost_full(&mut self, stream_id: u64) {
453        self.almost_full.insert(stream_id);
454    }
455
456    /// Removes the stream ID from the almost full streams set.
457    pub fn remove_almost_full(&mut self, stream_id: u64) {
458        self.almost_full.remove(&stream_id);
459    }
460
461    /// Adds the stream ID to the blocked streams set with the
462    /// given offset value.
463    ///
464    /// If the stream was already in the list, this does nothing.
465    pub fn insert_blocked(&mut self, stream_id: u64, off: u64) {
466        self.blocked.insert(stream_id, off);
467    }
468
469    /// Removes the stream ID from the blocked streams set.
470    pub fn remove_blocked(&mut self, stream_id: u64) {
471        self.blocked.remove(&stream_id);
472    }
473
474    /// Adds the stream ID to the reset streams set with the
475    /// given error code and final size values.
476    ///
477    /// If the stream was already in the list, this does nothing.
478    pub fn insert_reset(
479        &mut self, stream_id: u64, error_code: u64, final_size: u64,
480    ) {
481        self.reset.insert(stream_id, (error_code, final_size));
482    }
483
484    /// Removes the stream ID from the reset streams set.
485    pub fn remove_reset(&mut self, stream_id: u64) {
486        self.reset.remove(&stream_id);
487    }
488
489    /// Adds the stream ID to the stopped streams set with the
490    /// given error code.
491    ///
492    /// If the stream was already in the list, this does nothing.
493    pub fn insert_stopped(&mut self, stream_id: u64, error_code: u64) {
494        self.stopped.insert(stream_id, error_code);
495    }
496
497    /// Removes the stream ID from the stopped streams set.
498    pub fn remove_stopped(&mut self, stream_id: u64) {
499        self.stopped.remove(&stream_id);
500    }
501
502    /// Updates the peer's maximum bidirectional stream count limit.
503    pub fn update_peer_max_streams_bidi(&mut self, v: u64) {
504        self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v);
505    }
506
507    /// Updates the peer's maximum unidirectional stream count limit.
508    pub fn update_peer_max_streams_uni(&mut self, v: u64) {
509        self.peer_max_streams_uni = cmp::max(self.peer_max_streams_uni, v);
510    }
511
512    /// Commits the new max_streams_bidi limit.
513    pub fn update_max_streams_bidi(&mut self) {
514        self.local_max_streams_bidi = self.local_max_streams_bidi_next;
515    }
516
517    /// Sets the max_streams_bidi limit to the given value.
518    pub fn set_max_streams_bidi(&mut self, max: u64) {
519        self.local_max_streams_bidi = max;
520        self.local_max_streams_bidi_next = max;
521    }
522
523    /// Returns the current max_streams_bidi limit.
524    pub fn max_streams_bidi(&self) -> u64 {
525        self.local_max_streams_bidi
526    }
527
528    /// Returns the new max_streams_bidi limit.
529    pub fn max_streams_bidi_next(&mut self) -> u64 {
530        self.local_max_streams_bidi_next
531    }
532
533    /// Commits the new max_streams_uni limit.
534    pub fn update_max_streams_uni(&mut self) {
535        self.local_max_streams_uni = self.local_max_streams_uni_next;
536    }
537
538    /// Returns the new max_streams_uni limit.
539    pub fn max_streams_uni_next(&mut self) -> u64 {
540        self.local_max_streams_uni_next
541    }
542
543    /// Returns the number of bidirectional streams that can be created
544    /// before the peer's stream count limit is reached.
545    pub fn peer_streams_left_bidi(&self) -> u64 {
546        self.peer_max_streams_bidi - self.local_opened_streams_bidi
547    }
548
549    /// Returns the number of unidirectional streams that can be created
550    /// before the peer's stream count limit is reached.
551    pub fn peer_streams_left_uni(&self) -> u64 {
552        self.peer_max_streams_uni - self.local_opened_streams_uni
553    }
554
555    /// Drops completed stream.
556    ///
557    /// This should only be called when Stream::is_complete() returns true for
558    /// the given stream.
559    pub fn collect(&mut self, stream_id: u64, local: bool) {
560        if !local {
561            // If the stream was created by the peer, give back a max streams
562            // credit.
563            if is_bidi(stream_id) {
564                self.local_max_streams_bidi_next =
565                    self.local_max_streams_bidi_next.saturating_add(1);
566            } else {
567                self.local_max_streams_uni_next =
568                    self.local_max_streams_uni_next.saturating_add(1);
569            }
570        }
571
572        let s = self.streams.remove(&stream_id).unwrap();
573
574        self.remove_readable(&s.priority_key);
575
576        self.remove_writable(&s.priority_key);
577
578        self.remove_flushable(&s.priority_key);
579
580        self.collected.insert(stream_id);
581    }
582
583    /// Creates an iterator over streams that have outstanding data to read.
584    pub fn readable(&self) -> StreamIter {
585        StreamIter {
586            streams: self.readable.iter().map(|s| s.id).collect(),
587            index: 0,
588        }
589    }
590
591    /// Creates an iterator over streams that can be written to.
592    pub fn writable(&self) -> StreamIter {
593        StreamIter {
594            streams: self.writable.iter().map(|s| s.id).collect(),
595            index: 0,
596        }
597    }
598
599    /// Creates an iterator over streams that need to send MAX_STREAM_DATA.
600    pub fn almost_full(&self) -> StreamIter {
601        StreamIter::from(&self.almost_full)
602    }
603
604    /// Creates an iterator over streams that need to send STREAM_DATA_BLOCKED.
605    pub fn blocked(&self) -> hash_map::Iter<'_, u64, u64> {
606        self.blocked.iter()
607    }
608
609    /// Creates an iterator over streams that need to send RESET_STREAM.
610    pub fn reset(&self) -> hash_map::Iter<'_, u64, (u64, u64)> {
611        self.reset.iter()
612    }
613
614    /// Creates an iterator over streams that need to send STOP_SENDING.
615    pub fn stopped(&self) -> hash_map::Iter<'_, u64, u64> {
616        self.stopped.iter()
617    }
618
619    /// Returns true if the stream has been collected.
620    pub fn is_collected(&self, stream_id: u64) -> bool {
621        self.collected.contains(&stream_id)
622    }
623
624    /// Returns true if there are any streams that have data to write.
625    pub fn has_flushable(&self) -> bool {
626        !self.flushable.is_empty()
627    }
628
629    /// Returns true if there are any streams that have data to read.
630    pub fn has_readable(&self) -> bool {
631        !self.readable.is_empty()
632    }
633
634    /// Returns true if there are any streams that need to update the local
635    /// flow control limit.
636    pub fn has_almost_full(&self) -> bool {
637        !self.almost_full.is_empty()
638    }
639
640    /// Returns true if there are any streams that are blocked.
641    pub fn has_blocked(&self) -> bool {
642        !self.blocked.is_empty()
643    }
644
645    /// Returns true if there are any streams that are reset.
646    pub fn has_reset(&self) -> bool {
647        !self.reset.is_empty()
648    }
649
650    /// Returns true if there are any streams that need to send STOP_SENDING.
651    pub fn has_stopped(&self) -> bool {
652        !self.stopped.is_empty()
653    }
654
655    /// Returns true if the max bidirectional streams count needs to be updated
656    /// by sending a MAX_STREAMS frame to the peer.
657    pub fn should_update_max_streams_bidi(&self) -> bool {
658        self.local_max_streams_bidi_next != self.local_max_streams_bidi &&
659            self.local_max_streams_bidi_next / 2 >
660                self.local_max_streams_bidi - self.peer_opened_streams_bidi
661    }
662
663    /// Returns true if the max unidirectional streams count needs to be updated
664    /// by sending a MAX_STREAMS frame to the peer.
665    pub fn should_update_max_streams_uni(&self) -> bool {
666        self.local_max_streams_uni_next != self.local_max_streams_uni &&
667            self.local_max_streams_uni_next / 2 >
668                self.local_max_streams_uni - self.peer_opened_streams_uni
669    }
670
671    /// Returns the number of active streams in the map.
672    #[cfg(test)]
673    pub fn len(&self) -> usize {
674        self.streams.len()
675    }
676}
677
678/// A QUIC stream.
679pub struct Stream<F: BufFactory = DefaultBufFactory> {
680    /// Receive-side stream buffer.
681    pub recv: recv_buf::RecvBuf,
682
683    /// Send-side stream buffer.
684    pub send: send_buf::SendBuf<F>,
685
686    pub send_lowat: usize,
687
688    /// Whether the stream is bidirectional.
689    pub bidi: bool,
690
691    /// Whether the stream was created by the local endpoint.
692    pub local: bool,
693
694    /// The stream's urgency (lower is better). Default is `DEFAULT_URGENCY`.
695    pub urgency: u8,
696
697    /// Whether the stream can be flushed incrementally. Default is `true`.
698    pub incremental: bool,
699
700    pub priority_key: Arc<StreamPriorityKey>,
701}
702
703impl<F: BufFactory> Stream<F> {
704    /// Creates a new stream with the given flow control limits.
705    pub fn new(
706        id: u64, max_rx_data: u64, max_tx_data: u64, bidi: bool, local: bool,
707        max_window: u64,
708    ) -> Self {
709        let priority_key = Arc::new(StreamPriorityKey {
710            id,
711            ..Default::default()
712        });
713
714        Stream {
715            recv: recv_buf::RecvBuf::new(max_rx_data, max_window),
716            send: send_buf::SendBuf::new(max_tx_data),
717            send_lowat: 1,
718            bidi,
719            local,
720            urgency: priority_key.urgency,
721            incremental: priority_key.incremental,
722            priority_key,
723        }
724    }
725
726    /// Returns true if the stream has data to read.
727    pub fn is_readable(&self) -> bool {
728        self.recv.ready()
729    }
730
731    /// Returns true if the stream has enough flow control capacity to be
732    /// written to, and is not finished.
733    pub fn is_writable(&self) -> bool {
734        !self.send.is_shutdown() &&
735            !self.send.is_fin() &&
736            (self.send.off_back() + self.send_lowat as u64) <
737                self.send.max_off()
738    }
739
740    /// Returns true if the stream has data to send and is allowed to send at
741    /// least some of it.
742    pub fn is_flushable(&self) -> bool {
743        let off_front = self.send.off_front();
744
745        !self.send.is_empty() &&
746            off_front < self.send.off_back() &&
747            off_front < self.send.max_off()
748    }
749
750    /// Returns true if the stream is complete.
751    ///
752    /// For bidirectional streams this happens when both the receive and send
753    /// sides are complete. That is when all incoming data has been read by the
754    /// application, and when all outgoing data has been acked by the peer.
755    ///
756    /// For unidirectional streams this happens when either the receive or send
757    /// side is complete, depending on whether the stream was created locally
758    /// or not.
759    pub fn is_complete(&self) -> bool {
760        match (self.bidi, self.local) {
761            // For bidirectional streams we need to check both receive and send
762            // sides for completion.
763            (true, _) => self.recv.is_fin() && self.send.is_complete(),
764
765            // For unidirectional streams generated locally, we only need to
766            // check the send side for completion.
767            (false, true) => self.send.is_complete(),
768
769            // For unidirectional streams generated by the peer, we only need
770            // to check the receive side for completion.
771            (false, false) => self.recv.is_fin(),
772        }
773    }
774}
775
776/// Returns true if the stream was created locally.
777pub fn is_local(stream_id: u64, is_server: bool) -> bool {
778    (stream_id & 0x1) == (is_server as u64)
779}
780
781/// Returns true if the stream is bidirectional.
782pub fn is_bidi(stream_id: u64) -> bool {
783    (stream_id & 0x2) == 0
784}
785
786#[derive(Clone, Debug)]
787pub struct StreamPriorityKey {
788    pub urgency: u8,
789    pub incremental: bool,
790    pub id: u64,
791
792    pub readable: RBTreeAtomicLink,
793    pub writable: RBTreeAtomicLink,
794    pub flushable: RBTreeAtomicLink,
795}
796
797impl Default for StreamPriorityKey {
798    fn default() -> Self {
799        Self {
800            urgency: DEFAULT_URGENCY,
801            incremental: true,
802            id: Default::default(),
803            readable: Default::default(),
804            writable: Default::default(),
805            flushable: Default::default(),
806        }
807    }
808}
809
810impl PartialEq for StreamPriorityKey {
811    fn eq(&self, other: &Self) -> bool {
812        self.id == other.id
813    }
814}
815
816impl Eq for StreamPriorityKey {}
817
818impl PartialOrd for StreamPriorityKey {
819    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
820        Some(self.cmp(other))
821    }
822}
823
824impl Ord for StreamPriorityKey {
825    fn cmp(&self, other: &Self) -> cmp::Ordering {
826        // Ignore priority if ID matches.
827        if self.id == other.id {
828            return cmp::Ordering::Equal;
829        }
830
831        // First, order by urgency...
832        if self.urgency != other.urgency {
833            return self.urgency.cmp(&other.urgency);
834        }
835
836        // ...when the urgency is the same, and both are not incremental, order
837        // by stream ID...
838        if !self.incremental && !other.incremental {
839            return self.id.cmp(&other.id);
840        }
841
842        // ...non-incremental takes priority over incremental...
843        if self.incremental && !other.incremental {
844            return cmp::Ordering::Greater;
845        }
846        if !self.incremental && other.incremental {
847            return cmp::Ordering::Less;
848        }
849
850        // ...finally, when both are incremental, `other` takes precedence (so
851        // `self` is always sorted after other same-urgency incremental
852        // entries).
853        cmp::Ordering::Greater
854    }
855}
856
857intrusive_adapter!(pub StreamWritablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { writable: RBTreeAtomicLink });
858
859impl KeyAdapter<'_> for StreamWritablePriorityAdapter {
860    type Key = StreamPriorityKey;
861
862    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
863        s.clone()
864    }
865}
866
867intrusive_adapter!(pub StreamReadablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { readable: RBTreeAtomicLink });
868
869impl KeyAdapter<'_> for StreamReadablePriorityAdapter {
870    type Key = StreamPriorityKey;
871
872    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
873        s.clone()
874    }
875}
876
877intrusive_adapter!(pub StreamFlushablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { flushable: RBTreeAtomicLink });
878
879impl KeyAdapter<'_> for StreamFlushablePriorityAdapter {
880    type Key = StreamPriorityKey;
881
882    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
883        s.clone()
884    }
885}
886
887/// An iterator over QUIC streams.
888#[derive(Default)]
889pub struct StreamIter {
890    streams: SmallVec<[u64; 8]>,
891    index: usize,
892}
893
894impl StreamIter {
895    #[inline]
896    fn from(streams: &StreamIdHashSet) -> Self {
897        StreamIter {
898            streams: streams.iter().copied().collect(),
899            index: 0,
900        }
901    }
902}
903
904impl Iterator for StreamIter {
905    type Item = u64;
906
907    #[inline]
908    fn next(&mut self) -> Option<Self::Item> {
909        let v = self.streams.get(self.index)?;
910        self.index += 1;
911        Some(*v)
912    }
913}
914
915impl ExactSizeIterator for StreamIter {
916    #[inline]
917    fn len(&self) -> usize {
918        self.streams.len() - self.index
919    }
920}
921
922#[cfg(test)]
923mod tests {
924    use crate::range_buf::RangeBuf;
925
926    use super::*;
927
928    #[test]
929    fn recv_flow_control() {
930        let mut stream =
931            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
932        assert!(!stream.recv.almost_full());
933
934        let mut buf = [0; 32];
935
936        let first = RangeBuf::from(b"hello", 0, false);
937        let second = RangeBuf::from(b"world", 5, false);
938        let third = RangeBuf::from(b"something", 10, false);
939
940        assert_eq!(stream.recv.write(second), Ok(()));
941        assert_eq!(stream.recv.write(first), Ok(()));
942        assert!(!stream.recv.almost_full());
943
944        assert_eq!(stream.recv.write(third), Err(Error::FlowControl));
945
946        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
947        assert_eq!(&buf[..len], b"helloworld");
948        assert!(!fin);
949
950        assert!(stream.recv.almost_full());
951
952        stream.recv.update_max_data(std::time::Instant::now());
953        assert_eq!(stream.recv.max_data_next(), 25);
954        assert!(!stream.recv.almost_full());
955
956        let third = RangeBuf::from(b"something", 10, false);
957        assert_eq!(stream.recv.write(third), Ok(()));
958    }
959
960    #[test]
961    fn recv_past_fin() {
962        let mut stream =
963            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
964        assert!(!stream.recv.almost_full());
965
966        let first = RangeBuf::from(b"hello", 0, true);
967        let second = RangeBuf::from(b"world", 5, false);
968
969        assert_eq!(stream.recv.write(first), Ok(()));
970        assert_eq!(stream.recv.write(second), Err(Error::FinalSize));
971    }
972
973    #[test]
974    fn recv_fin_dup() {
975        let mut stream =
976            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
977        assert!(!stream.recv.almost_full());
978
979        let first = RangeBuf::from(b"hello", 0, true);
980        let second = RangeBuf::from(b"hello", 0, true);
981
982        assert_eq!(stream.recv.write(first), Ok(()));
983        assert_eq!(stream.recv.write(second), Ok(()));
984
985        let mut buf = [0; 32];
986
987        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
988        assert_eq!(&buf[..len], b"hello");
989        assert!(fin);
990    }
991
992    #[test]
993    fn recv_fin_change() {
994        let mut stream =
995            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
996        assert!(!stream.recv.almost_full());
997
998        let first = RangeBuf::from(b"hello", 0, true);
999        let second = RangeBuf::from(b"world", 5, true);
1000
1001        assert_eq!(stream.recv.write(second), Ok(()));
1002        assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
1003    }
1004
1005    #[test]
1006    fn recv_fin_lower_than_received() {
1007        let mut stream =
1008            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1009        assert!(!stream.recv.almost_full());
1010
1011        let first = RangeBuf::from(b"hello", 0, true);
1012        let second = RangeBuf::from(b"world", 5, false);
1013
1014        assert_eq!(stream.recv.write(second), Ok(()));
1015        assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
1016    }
1017
1018    #[test]
1019    fn recv_fin_flow_control() {
1020        let mut stream =
1021            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1022        assert!(!stream.recv.almost_full());
1023
1024        let mut buf = [0; 32];
1025
1026        let first = RangeBuf::from(b"hello", 0, false);
1027        let second = RangeBuf::from(b"world", 5, true);
1028
1029        assert_eq!(stream.recv.write(first), Ok(()));
1030        assert_eq!(stream.recv.write(second), Ok(()));
1031
1032        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1033        assert_eq!(&buf[..len], b"helloworld");
1034        assert!(fin);
1035
1036        assert!(!stream.recv.almost_full());
1037    }
1038
1039    #[test]
1040    fn recv_fin_reset_mismatch() {
1041        let mut stream =
1042            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1043        assert!(!stream.recv.almost_full());
1044
1045        let first = RangeBuf::from(b"hello", 0, true);
1046
1047        assert_eq!(stream.recv.write(first), Ok(()));
1048        assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
1049    }
1050
1051    #[test]
1052    fn recv_reset_with_gap() {
1053        let mut stream =
1054            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1055        assert!(!stream.recv.almost_full());
1056
1057        let first = RangeBuf::from(b"hello", 0, false);
1058
1059        assert_eq!(stream.recv.write(first), Ok(()));
1060        // Read one byte.
1061        assert_eq!(stream.recv.emit(&mut [0; 1]), Ok((1, false)));
1062        // Reset with a final size > than max previously received
1063        assert_eq!(
1064            stream.recv.reset(0, 10),
1065            Ok(RecvBufResetReturn {
1066                max_data_delta: 5,
1067                // consumed_flowcontrol is 9, since we already read 1 byte
1068                consumed_flowcontrol: 9
1069            })
1070        );
1071        assert_eq!(stream.recv.reset(0, 10), Ok(RecvBufResetReturn::zero()));
1072    }
1073
1074    #[test]
1075    fn recv_reset_dup() {
1076        let mut stream =
1077            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1078        assert!(!stream.recv.almost_full());
1079
1080        let first = RangeBuf::from(b"hello", 0, false);
1081
1082        assert_eq!(stream.recv.write(first), Ok(()));
1083        assert_eq!(
1084            stream.recv.reset(0, 5),
1085            Ok(RecvBufResetReturn {
1086                max_data_delta: 0,
1087                consumed_flowcontrol: 5
1088            })
1089        );
1090        assert_eq!(stream.recv.reset(0, 5), Ok(RecvBufResetReturn::zero()));
1091    }
1092
1093    #[test]
1094    fn recv_reset_change() {
1095        let mut stream =
1096            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1097        assert!(!stream.recv.almost_full());
1098
1099        let first = RangeBuf::from(b"hello", 0, false);
1100
1101        assert_eq!(stream.recv.write(first), Ok(()));
1102        assert_eq!(
1103            stream.recv.reset(0, 5),
1104            Ok(RecvBufResetReturn {
1105                max_data_delta: 0,
1106                consumed_flowcontrol: 5
1107            })
1108        );
1109        assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
1110    }
1111
1112    #[test]
1113    fn recv_reset_lower_than_received() {
1114        let mut stream =
1115            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1116        assert!(!stream.recv.almost_full());
1117
1118        let first = RangeBuf::from(b"hello", 0, false);
1119
1120        assert_eq!(stream.recv.write(first), Ok(()));
1121        assert_eq!(stream.recv.reset(0, 4), Err(Error::FinalSize));
1122    }
1123
1124    #[test]
1125    fn send_flow_control() {
1126        let mut buf = [0; 25];
1127
1128        let mut stream =
1129            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1130
1131        let first = b"hello";
1132        let second = b"world";
1133        let third = b"something";
1134
1135        assert!(stream.send.write(first, false).is_ok());
1136        assert!(stream.send.write(second, false).is_ok());
1137        assert!(stream.send.write(third, false).is_ok());
1138
1139        assert_eq!(stream.send.off_front(), 0);
1140
1141        let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
1142        assert_eq!(written, 15);
1143        assert!(!fin);
1144        assert_eq!(&buf[..written], b"helloworldsomet");
1145
1146        assert_eq!(stream.send.off_front(), 15);
1147
1148        let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
1149        assert_eq!(written, 0);
1150        assert!(!fin);
1151        assert_eq!(&buf[..written], b"");
1152
1153        stream.send.retransmit(0, 15);
1154
1155        assert_eq!(stream.send.off_front(), 0);
1156
1157        let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
1158        assert_eq!(written, 10);
1159        assert!(!fin);
1160        assert_eq!(&buf[..written], b"helloworld");
1161
1162        assert_eq!(stream.send.off_front(), 10);
1163
1164        let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
1165        assert_eq!(written, 5);
1166        assert!(!fin);
1167        assert_eq!(&buf[..written], b"somet");
1168    }
1169
1170    #[test]
1171    fn send_past_fin() {
1172        let mut stream =
1173            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1174
1175        let first = b"hello";
1176        let second = b"world";
1177        let third = b"third";
1178
1179        assert_eq!(stream.send.write(first, false), Ok(5));
1180
1181        assert_eq!(stream.send.write(second, true), Ok(5));
1182        assert!(stream.send.is_fin());
1183
1184        assert_eq!(stream.send.write(third, false), Err(Error::FinalSize));
1185    }
1186
1187    #[test]
1188    fn send_fin_dup() {
1189        let mut stream =
1190            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1191
1192        assert_eq!(stream.send.write(b"hello", true), Ok(5));
1193        assert!(stream.send.is_fin());
1194
1195        assert_eq!(stream.send.write(b"", true), Ok(0));
1196        assert!(stream.send.is_fin());
1197    }
1198
1199    #[test]
1200    fn send_undo_fin() {
1201        let mut stream =
1202            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1203
1204        assert_eq!(stream.send.write(b"hello", true), Ok(5));
1205        assert!(stream.send.is_fin());
1206
1207        assert_eq!(
1208            stream.send.write(b"helloworld", true),
1209            Err(Error::FinalSize)
1210        );
1211    }
1212
1213    #[test]
1214    fn send_fin_max_data_match() {
1215        let mut buf = [0; 15];
1216
1217        let mut stream =
1218            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1219
1220        let slice = b"hellohellohello";
1221
1222        assert!(stream.send.write(slice, true).is_ok());
1223
1224        let (written, fin) = stream.send.emit(&mut buf[..15]).unwrap();
1225        assert_eq!(written, 15);
1226        assert!(fin);
1227        assert_eq!(&buf[..written], slice);
1228    }
1229
1230    #[test]
1231    fn send_fin_zero_length() {
1232        let mut buf = [0; 5];
1233
1234        let mut stream =
1235            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1236
1237        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1238        assert_eq!(stream.send.write(b"", true), Ok(0));
1239        assert!(stream.send.is_fin());
1240
1241        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1242        assert_eq!(written, 5);
1243        assert!(fin);
1244        assert_eq!(&buf[..written], b"hello");
1245    }
1246
1247    #[test]
1248    fn send_ack() {
1249        let mut buf = [0; 5];
1250
1251        let mut stream =
1252            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1253
1254        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1255        assert_eq!(stream.send.write(b"world", false), Ok(5));
1256        assert_eq!(stream.send.write(b"", true), Ok(0));
1257        assert!(stream.send.is_fin());
1258
1259        assert_eq!(stream.send.off_front(), 0);
1260
1261        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1262        assert_eq!(written, 5);
1263        assert!(!fin);
1264        assert_eq!(&buf[..written], b"hello");
1265
1266        stream.send.ack_and_drop(0, 5);
1267
1268        stream.send.retransmit(0, 5);
1269
1270        assert_eq!(stream.send.off_front(), 5);
1271
1272        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1273        assert_eq!(written, 5);
1274        assert!(fin);
1275        assert_eq!(&buf[..written], b"world");
1276    }
1277
1278    #[test]
1279    fn send_ack_reordering() {
1280        let mut buf = [0; 5];
1281
1282        let mut stream =
1283            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1284
1285        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1286        assert_eq!(stream.send.write(b"world", false), Ok(5));
1287        assert_eq!(stream.send.write(b"", true), Ok(0));
1288        assert!(stream.send.is_fin());
1289
1290        assert_eq!(stream.send.off_front(), 0);
1291
1292        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1293        assert_eq!(written, 5);
1294        assert!(!fin);
1295        assert_eq!(&buf[..written], b"hello");
1296
1297        assert_eq!(stream.send.off_front(), 5);
1298
1299        let (written, fin) = stream.send.emit(&mut buf[..1]).unwrap();
1300        assert_eq!(written, 1);
1301        assert!(!fin);
1302        assert_eq!(&buf[..written], b"w");
1303
1304        stream.send.ack_and_drop(5, 1);
1305        stream.send.ack_and_drop(0, 5);
1306
1307        stream.send.retransmit(0, 5);
1308        stream.send.retransmit(5, 1);
1309
1310        assert_eq!(stream.send.off_front(), 6);
1311
1312        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1313        assert_eq!(written, 4);
1314        assert!(fin);
1315        assert_eq!(&buf[..written], b"orld");
1316    }
1317
1318    #[test]
1319    fn recv_data_below_off() {
1320        let mut stream =
1321            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1322
1323        let first = RangeBuf::from(b"hello", 0, false);
1324
1325        assert_eq!(stream.recv.write(first), Ok(()));
1326
1327        let mut buf = [0; 10];
1328
1329        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1330        assert_eq!(&buf[..len], b"hello");
1331        assert!(!fin);
1332
1333        let first = RangeBuf::from(b"elloworld", 1, true);
1334        assert_eq!(stream.recv.write(first), Ok(()));
1335
1336        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1337        assert_eq!(&buf[..len], b"world");
1338        assert!(fin);
1339    }
1340
1341    #[test]
1342    fn stream_complete() {
1343        let mut stream =
1344            <Stream>::new(0, 30, 30, true, true, DEFAULT_STREAM_WINDOW);
1345
1346        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1347        assert_eq!(stream.send.write(b"world", false), Ok(5));
1348
1349        assert!(!stream.send.is_complete());
1350        assert!(!stream.send.is_fin());
1351
1352        assert_eq!(stream.send.write(b"", true), Ok(0));
1353
1354        assert!(!stream.send.is_complete());
1355        assert!(stream.send.is_fin());
1356
1357        let buf = RangeBuf::from(b"hello", 0, true);
1358        assert!(stream.recv.write(buf).is_ok());
1359        assert!(!stream.recv.is_fin());
1360
1361        stream.send.ack(6, 4);
1362        assert!(!stream.send.is_complete());
1363
1364        let mut buf = [0; 2];
1365        assert_eq!(stream.recv.emit(&mut buf), Ok((2, false)));
1366        assert!(!stream.recv.is_fin());
1367
1368        stream.send.ack(1, 5);
1369        assert!(!stream.send.is_complete());
1370
1371        stream.send.ack(0, 1);
1372        assert!(stream.send.is_complete());
1373
1374        assert!(!stream.is_complete());
1375
1376        let mut buf = [0; 3];
1377        assert_eq!(stream.recv.emit(&mut buf), Ok((3, true)));
1378        assert!(stream.recv.is_fin());
1379
1380        assert!(stream.is_complete());
1381    }
1382
1383    #[test]
1384    fn send_fin_zero_length_output() {
1385        let mut buf = [0; 5];
1386
1387        let mut stream =
1388            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1389
1390        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1391        assert_eq!(stream.send.off_front(), 0);
1392        assert!(!stream.send.is_fin());
1393
1394        let (written, fin) = stream.send.emit(&mut buf).unwrap();
1395        assert_eq!(written, 5);
1396        assert!(!fin);
1397        assert_eq!(&buf[..written], b"hello");
1398
1399        assert_eq!(stream.send.write(b"", true), Ok(0));
1400        assert!(stream.send.is_fin());
1401        assert_eq!(stream.send.off_front(), 5);
1402
1403        let (written, fin) = stream.send.emit(&mut buf).unwrap();
1404        assert_eq!(written, 0);
1405        assert!(fin);
1406        assert_eq!(&buf[..written], b"");
1407    }
1408
1409    fn stream_send_ready(stream: &Stream) -> bool {
1410        !stream.send.is_empty() &&
1411            stream.send.off_front() < stream.send.off_back()
1412    }
1413
1414    #[test]
1415    fn send_emit() {
1416        let mut buf = [0; 5];
1417
1418        let mut stream =
1419            <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1420
1421        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1422        assert_eq!(stream.send.write(b"world", false), Ok(5));
1423        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1424        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1425        assert_eq!(stream.send.off_front(), 0);
1426        assert_eq!(stream.send.bufs_count(), 4);
1427
1428        assert!(stream.is_flushable());
1429
1430        assert!(stream_send_ready(&stream));
1431        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1432        assert_eq!(stream.send.off_front(), 4);
1433        assert_eq!(&buf[..4], b"hell");
1434
1435        assert!(stream_send_ready(&stream));
1436        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1437        assert_eq!(stream.send.off_front(), 8);
1438        assert_eq!(&buf[..4], b"owor");
1439
1440        assert!(stream_send_ready(&stream));
1441        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1442        assert_eq!(stream.send.off_front(), 10);
1443        assert_eq!(&buf[..2], b"ld");
1444
1445        assert!(stream_send_ready(&stream));
1446        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1447        assert_eq!(stream.send.off_front(), 11);
1448        assert_eq!(&buf[..1], b"o");
1449
1450        assert!(stream_send_ready(&stream));
1451        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1452        assert_eq!(stream.send.off_front(), 16);
1453        assert_eq!(&buf[..5], b"llehd");
1454
1455        assert!(stream_send_ready(&stream));
1456        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1457        assert_eq!(stream.send.off_front(), 20);
1458        assert_eq!(&buf[..4], b"lrow");
1459
1460        assert!(!stream.is_flushable());
1461
1462        assert!(!stream_send_ready(&stream));
1463        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1464        assert_eq!(stream.send.off_front(), 20);
1465    }
1466
1467    #[test]
1468    fn send_emit_ack() {
1469        let mut buf = [0; 5];
1470
1471        let mut stream =
1472            <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1473
1474        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1475        assert_eq!(stream.send.write(b"world", false), Ok(5));
1476        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1477        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1478        assert_eq!(stream.send.off_front(), 0);
1479        assert_eq!(stream.send.bufs_count(), 4);
1480
1481        assert!(stream.is_flushable());
1482
1483        assert!(stream_send_ready(&stream));
1484        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1485        assert_eq!(stream.send.off_front(), 4);
1486        assert_eq!(&buf[..4], b"hell");
1487
1488        assert!(stream_send_ready(&stream));
1489        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1490        assert_eq!(stream.send.off_front(), 8);
1491        assert_eq!(&buf[..4], b"owor");
1492
1493        stream.send.ack_and_drop(0, 5);
1494        assert_eq!(stream.send.bufs_count(), 3);
1495
1496        assert!(stream_send_ready(&stream));
1497        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1498        assert_eq!(stream.send.off_front(), 10);
1499        assert_eq!(&buf[..2], b"ld");
1500
1501        stream.send.ack_and_drop(7, 5);
1502        assert_eq!(stream.send.bufs_count(), 3);
1503
1504        assert!(stream_send_ready(&stream));
1505        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1506        assert_eq!(stream.send.off_front(), 11);
1507        assert_eq!(&buf[..1], b"o");
1508
1509        assert!(stream_send_ready(&stream));
1510        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1511        assert_eq!(stream.send.off_front(), 16);
1512        assert_eq!(&buf[..5], b"llehd");
1513
1514        stream.send.ack_and_drop(5, 7);
1515        assert_eq!(stream.send.bufs_count(), 2);
1516
1517        assert!(stream_send_ready(&stream));
1518        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1519        assert_eq!(stream.send.off_front(), 20);
1520        assert_eq!(&buf[..4], b"lrow");
1521
1522        assert!(!stream.is_flushable());
1523
1524        assert!(!stream_send_ready(&stream));
1525        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1526        assert_eq!(stream.send.off_front(), 20);
1527
1528        stream.send.ack_and_drop(22, 4);
1529        assert_eq!(stream.send.bufs_count(), 2);
1530
1531        stream.send.ack_and_drop(20, 1);
1532        assert_eq!(stream.send.bufs_count(), 2);
1533    }
1534
1535    #[test]
1536    fn send_emit_retransmit() {
1537        let mut buf = [0; 5];
1538
1539        let mut stream =
1540            <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1541
1542        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1543        assert_eq!(stream.send.write(b"world", false), Ok(5));
1544        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1545        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1546        assert_eq!(stream.send.off_front(), 0);
1547        assert_eq!(stream.send.bufs_count(), 4);
1548
1549        assert!(stream.is_flushable());
1550
1551        assert!(stream_send_ready(&stream));
1552        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1553        assert_eq!(stream.send.off_front(), 4);
1554        assert_eq!(&buf[..4], b"hell");
1555
1556        assert!(stream_send_ready(&stream));
1557        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1558        assert_eq!(stream.send.off_front(), 8);
1559        assert_eq!(&buf[..4], b"owor");
1560
1561        stream.send.retransmit(3, 3);
1562        assert_eq!(stream.send.off_front(), 3);
1563
1564        assert!(stream_send_ready(&stream));
1565        assert_eq!(stream.send.emit(&mut buf[..3]), Ok((3, false)));
1566        assert_eq!(stream.send.off_front(), 8);
1567        assert_eq!(&buf[..3], b"low");
1568
1569        assert!(stream_send_ready(&stream));
1570        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1571        assert_eq!(stream.send.off_front(), 10);
1572        assert_eq!(&buf[..2], b"ld");
1573
1574        stream.send.ack_and_drop(7, 2);
1575
1576        stream.send.retransmit(8, 2);
1577
1578        assert!(stream_send_ready(&stream));
1579        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1580        assert_eq!(stream.send.off_front(), 10);
1581        assert_eq!(&buf[..2], b"ld");
1582
1583        assert!(stream_send_ready(&stream));
1584        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1585        assert_eq!(stream.send.off_front(), 11);
1586        assert_eq!(&buf[..1], b"o");
1587
1588        assert!(stream_send_ready(&stream));
1589        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1590        assert_eq!(stream.send.off_front(), 16);
1591        assert_eq!(&buf[..5], b"llehd");
1592
1593        stream.send.retransmit(12, 2);
1594
1595        assert!(stream_send_ready(&stream));
1596        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1597        assert_eq!(stream.send.off_front(), 16);
1598        assert_eq!(&buf[..2], b"le");
1599
1600        assert!(stream_send_ready(&stream));
1601        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1602        assert_eq!(stream.send.off_front(), 20);
1603        assert_eq!(&buf[..4], b"lrow");
1604
1605        assert!(!stream.is_flushable());
1606
1607        assert!(!stream_send_ready(&stream));
1608        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1609        assert_eq!(stream.send.off_front(), 20);
1610
1611        stream.send.retransmit(7, 12);
1612
1613        assert!(stream_send_ready(&stream));
1614        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1615        assert_eq!(stream.send.off_front(), 12);
1616        assert_eq!(&buf[..5], b"rldol");
1617
1618        assert!(stream_send_ready(&stream));
1619        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1620        assert_eq!(stream.send.off_front(), 17);
1621        assert_eq!(&buf[..5], b"lehdl");
1622
1623        assert!(stream_send_ready(&stream));
1624        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
1625        assert_eq!(stream.send.off_front(), 20);
1626        assert_eq!(&buf[..2], b"ro");
1627
1628        stream.send.ack_and_drop(12, 7);
1629
1630        stream.send.retransmit(7, 12);
1631
1632        assert!(stream_send_ready(&stream));
1633        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1634        assert_eq!(stream.send.off_front(), 12);
1635        assert_eq!(&buf[..5], b"rldol");
1636
1637        assert!(stream_send_ready(&stream));
1638        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1639        assert_eq!(stream.send.off_front(), 17);
1640        assert_eq!(&buf[..5], b"lehdl");
1641
1642        assert!(stream_send_ready(&stream));
1643        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
1644        assert_eq!(stream.send.off_front(), 20);
1645        assert_eq!(&buf[..2], b"ro");
1646    }
1647
1648    #[test]
1649    fn rangebuf_split_off() {
1650        let mut buf = <RangeBuf>::from(b"helloworld", 5, true);
1651        assert_eq!(buf.start, 0);
1652        assert_eq!(buf.pos, 0);
1653        assert_eq!(buf.len, 10);
1654        assert_eq!(buf.off, 5);
1655        assert!(buf.fin);
1656
1657        assert_eq!(buf.len(), 10);
1658        assert_eq!(buf.off(), 5);
1659        assert!(buf.fin());
1660
1661        assert_eq!(&buf[..], b"helloworld");
1662
1663        // Advance buffer.
1664        buf.consume(5);
1665
1666        assert_eq!(buf.start, 0);
1667        assert_eq!(buf.pos, 5);
1668        assert_eq!(buf.len, 10);
1669        assert_eq!(buf.off, 5);
1670        assert!(buf.fin);
1671
1672        assert_eq!(buf.len(), 5);
1673        assert_eq!(buf.off(), 10);
1674        assert!(buf.fin());
1675
1676        assert_eq!(&buf[..], b"world");
1677
1678        // Split buffer before position.
1679        let mut new_buf = buf.split_off(3);
1680
1681        assert_eq!(buf.start, 0);
1682        assert_eq!(buf.pos, 3);
1683        assert_eq!(buf.len, 3);
1684        assert_eq!(buf.off, 5);
1685        assert!(!buf.fin);
1686
1687        assert_eq!(buf.len(), 0);
1688        assert_eq!(buf.off(), 8);
1689        assert!(!buf.fin());
1690
1691        assert_eq!(&buf[..], b"");
1692
1693        assert_eq!(new_buf.start, 3);
1694        assert_eq!(new_buf.pos, 5);
1695        assert_eq!(new_buf.len, 7);
1696        assert_eq!(new_buf.off, 8);
1697        assert!(new_buf.fin);
1698
1699        assert_eq!(new_buf.len(), 5);
1700        assert_eq!(new_buf.off(), 10);
1701        assert!(new_buf.fin());
1702
1703        assert_eq!(&new_buf[..], b"world");
1704
1705        // Advance buffer.
1706        new_buf.consume(2);
1707
1708        assert_eq!(new_buf.start, 3);
1709        assert_eq!(new_buf.pos, 7);
1710        assert_eq!(new_buf.len, 7);
1711        assert_eq!(new_buf.off, 8);
1712        assert!(new_buf.fin);
1713
1714        assert_eq!(new_buf.len(), 3);
1715        assert_eq!(new_buf.off(), 12);
1716        assert!(new_buf.fin());
1717
1718        assert_eq!(&new_buf[..], b"rld");
1719
1720        // Split buffer after position.
1721        let mut new_new_buf = new_buf.split_off(5);
1722
1723        assert_eq!(new_buf.start, 3);
1724        assert_eq!(new_buf.pos, 7);
1725        assert_eq!(new_buf.len, 5);
1726        assert_eq!(new_buf.off, 8);
1727        assert!(!new_buf.fin);
1728
1729        assert_eq!(new_buf.len(), 1);
1730        assert_eq!(new_buf.off(), 12);
1731        assert!(!new_buf.fin());
1732
1733        assert_eq!(&new_buf[..], b"r");
1734
1735        assert_eq!(new_new_buf.start, 8);
1736        assert_eq!(new_new_buf.pos, 8);
1737        assert_eq!(new_new_buf.len, 2);
1738        assert_eq!(new_new_buf.off, 13);
1739        assert!(new_new_buf.fin);
1740
1741        assert_eq!(new_new_buf.len(), 2);
1742        assert_eq!(new_new_buf.off(), 13);
1743        assert!(new_new_buf.fin());
1744
1745        assert_eq!(&new_new_buf[..], b"ld");
1746
1747        // Advance buffer.
1748        new_new_buf.consume(2);
1749
1750        assert_eq!(new_new_buf.start, 8);
1751        assert_eq!(new_new_buf.pos, 10);
1752        assert_eq!(new_new_buf.len, 2);
1753        assert_eq!(new_new_buf.off, 13);
1754        assert!(new_new_buf.fin);
1755
1756        assert_eq!(new_new_buf.len(), 0);
1757        assert_eq!(new_new_buf.off(), 15);
1758        assert!(new_new_buf.fin());
1759
1760        assert_eq!(&new_new_buf[..], b"");
1761    }
1762
1763    /// RFC9000 2.1: A stream ID that is used out of order results in all
1764    /// streams of that type with lower-numbered stream IDs also being opened.
1765    #[test]
1766    fn stream_limit_auto_open() {
1767        let local_tp = crate::TransportParams::default();
1768        let peer_tp = crate::TransportParams::default();
1769
1770        let mut streams = <StreamMap>::new(5, 5, 5);
1771
1772        let stream_id = 500;
1773        assert!(!is_local(stream_id, true), "stream id is peer initiated");
1774        assert!(is_bidi(stream_id), "stream id is bidirectional");
1775        assert_eq!(
1776            streams
1777                .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1778                .err(),
1779            Some(Error::StreamLimit),
1780            "stream limit should be exceeded"
1781        );
1782    }
1783
1784    /// Stream limit should be satisfied regardless of what order we open
1785    /// streams
1786    #[test]
1787    fn stream_create_out_of_order() {
1788        let local_tp = crate::TransportParams::default();
1789        let peer_tp = crate::TransportParams::default();
1790
1791        let mut streams = <StreamMap>::new(5, 5, 5);
1792
1793        for stream_id in [8, 12, 4] {
1794            assert!(is_local(stream_id, false), "stream id is client initiated");
1795            assert!(is_bidi(stream_id), "stream id is bidirectional");
1796            assert!(streams
1797                .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1798                .is_ok());
1799        }
1800    }
1801
1802    /// Check stream limit boundary cases
1803    #[test]
1804    fn stream_limit_edge() {
1805        let local_tp = crate::TransportParams::default();
1806        let peer_tp = crate::TransportParams::default();
1807
1808        let mut streams = <StreamMap>::new(3, 3, 3);
1809
1810        // Highest permitted
1811        let stream_id = 8;
1812        assert!(streams
1813            .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1814            .is_ok());
1815
1816        // One more than highest permitted
1817        let stream_id = 12;
1818        assert_eq!(
1819            streams
1820                .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1821                .err(),
1822            Some(Error::StreamLimit)
1823        );
1824    }
1825
1826    fn cycle_stream_priority(stream_id: u64, streams: &mut StreamMap) {
1827        let key = streams.get(stream_id).unwrap().priority_key.clone();
1828        streams.update_priority(&key.clone(), &key);
1829    }
1830
1831    #[test]
1832    fn writable_prioritized_default_priority() {
1833        let local_tp = crate::TransportParams::default();
1834        let peer_tp = crate::TransportParams {
1835            initial_max_stream_data_bidi_local: 100,
1836            initial_max_stream_data_uni: 100,
1837            ..Default::default()
1838        };
1839
1840        let mut streams = StreamMap::new(100, 100, 100);
1841
1842        for id in [0, 4, 8, 12] {
1843            assert!(streams
1844                .get_or_create(id, &local_tp, &peer_tp, false, true)
1845                .is_ok());
1846        }
1847
1848        let walk_1: Vec<u64> = streams.writable().collect();
1849        cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1850        let walk_2: Vec<u64> = streams.writable().collect();
1851        cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1852        let walk_3: Vec<u64> = streams.writable().collect();
1853        cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1854        let walk_4: Vec<u64> = streams.writable().collect();
1855        cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1856        let walk_5: Vec<u64> = streams.writable().collect();
1857
1858        // All streams are non-incremental and same urgency by default. Multiple
1859        // visits shuffle their order.
1860        assert_eq!(walk_1, vec![0, 4, 8, 12]);
1861        assert_eq!(walk_2, vec![4, 8, 12, 0]);
1862        assert_eq!(walk_3, vec![8, 12, 0, 4]);
1863        assert_eq!(walk_4, vec![12, 0, 4, 8,]);
1864        assert_eq!(walk_5, vec![0, 4, 8, 12]);
1865    }
1866
1867    #[test]
1868    fn writable_prioritized_insert_order() {
1869        let local_tp = crate::TransportParams::default();
1870        let peer_tp = crate::TransportParams {
1871            initial_max_stream_data_bidi_local: 100,
1872            initial_max_stream_data_uni: 100,
1873            ..Default::default()
1874        };
1875
1876        let mut streams = StreamMap::new(100, 100, 100);
1877
1878        // Inserting same-urgency incremental streams in a "random" order yields
1879        // same order to start with.
1880        for id in [12, 4, 8, 0] {
1881            assert!(streams
1882                .get_or_create(id, &local_tp, &peer_tp, false, true)
1883                .is_ok());
1884        }
1885
1886        let walk_1: Vec<u64> = streams.writable().collect();
1887        cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1888        let walk_2: Vec<u64> = streams.writable().collect();
1889        cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1890        let walk_3: Vec<u64> = streams.writable().collect();
1891        cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1892        let walk_4: Vec<u64> = streams.writable().collect();
1893        cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1894        let walk_5: Vec<u64> = streams.writable().collect();
1895        assert_eq!(walk_1, vec![12, 4, 8, 0]);
1896        assert_eq!(walk_2, vec![4, 8, 0, 12]);
1897        assert_eq!(walk_3, vec![8, 0, 12, 4,]);
1898        assert_eq!(walk_4, vec![0, 12, 4, 8]);
1899        assert_eq!(walk_5, vec![12, 4, 8, 0]);
1900    }
1901
1902    #[test]
1903    fn writable_prioritized_mixed_urgency() {
1904        let local_tp = crate::TransportParams::default();
1905        let peer_tp = crate::TransportParams {
1906            initial_max_stream_data_bidi_local: 100,
1907            initial_max_stream_data_uni: 100,
1908            ..Default::default()
1909        };
1910
1911        let mut streams = <StreamMap>::new(100, 100, 100);
1912
1913        // Streams where the urgency descends (becomes more important). No stream
1914        // shares an urgency.
1915        let input = vec![
1916            (0, 100),
1917            (4, 90),
1918            (8, 80),
1919            (12, 70),
1920            (16, 60),
1921            (20, 50),
1922            (24, 40),
1923            (28, 30),
1924            (32, 20),
1925            (36, 10),
1926            (40, 0),
1927        ];
1928
1929        for (id, urgency) in input.clone() {
1930            // this duplicates some code from stream_priority in order to access
1931            // streams and the collection they're in
1932            let stream = streams
1933                .get_or_create(id, &local_tp, &peer_tp, false, true)
1934                .unwrap();
1935
1936            stream.urgency = urgency;
1937
1938            let new_priority_key = Arc::new(StreamPriorityKey {
1939                urgency: stream.urgency,
1940                incremental: stream.incremental,
1941                id,
1942                ..Default::default()
1943            });
1944
1945            let old_priority_key = std::mem::replace(
1946                &mut stream.priority_key,
1947                new_priority_key.clone(),
1948            );
1949
1950            streams.update_priority(&old_priority_key, &new_priority_key);
1951        }
1952
1953        let walk_1: Vec<u64> = streams.writable().collect();
1954        assert_eq!(walk_1, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
1955
1956        // Re-applying priority to a stream does not cause duplication.
1957        for (id, urgency) in input {
1958            // this duplicates some code from stream_priority in order to access
1959            // streams and the collection they're in
1960            let stream = streams
1961                .get_or_create(id, &local_tp, &peer_tp, false, true)
1962                .unwrap();
1963
1964            stream.urgency = urgency;
1965
1966            let new_priority_key = Arc::new(StreamPriorityKey {
1967                urgency: stream.urgency,
1968                incremental: stream.incremental,
1969                id,
1970                ..Default::default()
1971            });
1972
1973            let old_priority_key = std::mem::replace(
1974                &mut stream.priority_key,
1975                new_priority_key.clone(),
1976            );
1977
1978            streams.update_priority(&old_priority_key, &new_priority_key);
1979        }
1980
1981        let walk_2: Vec<u64> = streams.writable().collect();
1982        assert_eq!(walk_2, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
1983
1984        // Removing streams doesn't break expected ordering.
1985        streams.collect(24, true);
1986
1987        let walk_3: Vec<u64> = streams.writable().collect();
1988        assert_eq!(walk_3, vec![40, 36, 32, 28, 20, 16, 12, 8, 4, 0]);
1989
1990        streams.collect(40, true);
1991        streams.collect(0, true);
1992
1993        let walk_4: Vec<u64> = streams.writable().collect();
1994        assert_eq!(walk_4, vec![36, 32, 28, 20, 16, 12, 8, 4]);
1995
1996        // Adding streams doesn't break expected ordering.
1997        streams
1998            .get_or_create(44, &local_tp, &peer_tp, false, true)
1999            .unwrap();
2000
2001        let walk_5: Vec<u64> = streams.writable().collect();
2002        assert_eq!(walk_5, vec![36, 32, 28, 20, 16, 12, 8, 4, 44]);
2003    }
2004
2005    #[test]
2006    fn writable_prioritized_mixed_urgencies_incrementals() {
2007        let local_tp = crate::TransportParams::default();
2008        let peer_tp = crate::TransportParams {
2009            initial_max_stream_data_bidi_local: 100,
2010            initial_max_stream_data_uni: 100,
2011            ..Default::default()
2012        };
2013
2014        let mut streams = StreamMap::new(100, 100, 100);
2015
2016        // Streams that share some urgency level
2017        let input = vec![
2018            (0, 100),
2019            (4, 20),
2020            (8, 100),
2021            (12, 20),
2022            (16, 90),
2023            (20, 25),
2024            (24, 90),
2025            (28, 30),
2026            (32, 80),
2027            (36, 20),
2028            (40, 0),
2029        ];
2030
2031        for (id, urgency) in input.clone() {
2032            // this duplicates some code from stream_priority in order to access
2033            // streams and the collection they're in
2034            let stream = streams
2035                .get_or_create(id, &local_tp, &peer_tp, false, true)
2036                .unwrap();
2037
2038            stream.urgency = urgency;
2039
2040            let new_priority_key = Arc::new(StreamPriorityKey {
2041                urgency: stream.urgency,
2042                incremental: stream.incremental,
2043                id,
2044                ..Default::default()
2045            });
2046
2047            let old_priority_key = std::mem::replace(
2048                &mut stream.priority_key,
2049                new_priority_key.clone(),
2050            );
2051
2052            streams.update_priority(&old_priority_key, &new_priority_key);
2053        }
2054
2055        let walk_1: Vec<u64> = streams.writable().collect();
2056        cycle_stream_priority(4, &mut streams);
2057        cycle_stream_priority(16, &mut streams);
2058        cycle_stream_priority(0, &mut streams);
2059        let walk_2: Vec<u64> = streams.writable().collect();
2060        cycle_stream_priority(12, &mut streams);
2061        cycle_stream_priority(24, &mut streams);
2062        cycle_stream_priority(8, &mut streams);
2063        let walk_3: Vec<u64> = streams.writable().collect();
2064        cycle_stream_priority(36, &mut streams);
2065        cycle_stream_priority(16, &mut streams);
2066        cycle_stream_priority(0, &mut streams);
2067        let walk_4: Vec<u64> = streams.writable().collect();
2068        cycle_stream_priority(4, &mut streams);
2069        cycle_stream_priority(24, &mut streams);
2070        cycle_stream_priority(8, &mut streams);
2071        let walk_5: Vec<u64> = streams.writable().collect();
2072        cycle_stream_priority(12, &mut streams);
2073        cycle_stream_priority(16, &mut streams);
2074        cycle_stream_priority(0, &mut streams);
2075        let walk_6: Vec<u64> = streams.writable().collect();
2076        cycle_stream_priority(36, &mut streams);
2077        cycle_stream_priority(24, &mut streams);
2078        cycle_stream_priority(8, &mut streams);
2079        let walk_7: Vec<u64> = streams.writable().collect();
2080        cycle_stream_priority(4, &mut streams);
2081        cycle_stream_priority(16, &mut streams);
2082        cycle_stream_priority(0, &mut streams);
2083        let walk_8: Vec<u64> = streams.writable().collect();
2084        cycle_stream_priority(12, &mut streams);
2085        cycle_stream_priority(24, &mut streams);
2086        cycle_stream_priority(8, &mut streams);
2087        let walk_9: Vec<u64> = streams.writable().collect();
2088        cycle_stream_priority(36, &mut streams);
2089        cycle_stream_priority(16, &mut streams);
2090        cycle_stream_priority(0, &mut streams);
2091
2092        assert_eq!(walk_1, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2093        assert_eq!(walk_2, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2094        assert_eq!(walk_3, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2095        assert_eq!(walk_4, vec![40, 4, 12, 36, 20, 28, 32, 24, 16, 8, 0]);
2096        assert_eq!(walk_5, vec![40, 12, 36, 4, 20, 28, 32, 16, 24, 0, 8]);
2097        assert_eq!(walk_6, vec![40, 36, 4, 12, 20, 28, 32, 24, 16, 8, 0]);
2098        assert_eq!(walk_7, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2099        assert_eq!(walk_8, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2100        assert_eq!(walk_9, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2101
2102        // Removing streams doesn't break expected ordering.
2103        streams.collect(20, true);
2104
2105        let walk_10: Vec<u64> = streams.writable().collect();
2106        assert_eq!(walk_10, vec![40, 4, 12, 36, 28, 32, 24, 16, 8, 0]);
2107
2108        // Adding streams doesn't break expected ordering.
2109        let stream = streams
2110            .get_or_create(44, &local_tp, &peer_tp, false, true)
2111            .unwrap();
2112
2113        stream.urgency = 20;
2114        stream.incremental = true;
2115
2116        let new_priority_key = Arc::new(StreamPriorityKey {
2117            urgency: stream.urgency,
2118            incremental: stream.incremental,
2119            id: 44,
2120            ..Default::default()
2121        });
2122
2123        let old_priority_key =
2124            std::mem::replace(&mut stream.priority_key, new_priority_key.clone());
2125
2126        streams.update_priority(&old_priority_key, &new_priority_key);
2127
2128        let walk_11: Vec<u64> = streams.writable().collect();
2129        assert_eq!(walk_11, vec![40, 4, 12, 36, 44, 28, 32, 24, 16, 8, 0]);
2130    }
2131
2132    #[test]
2133    fn priority_tree_dupes() {
2134        let mut prioritized_writable: RBTree<StreamWritablePriorityAdapter> =
2135            Default::default();
2136
2137        for id in [0, 4, 8, 12] {
2138            let s = Arc::new(StreamPriorityKey {
2139                urgency: 0,
2140                incremental: false,
2141                id,
2142                ..Default::default()
2143            });
2144
2145            prioritized_writable.insert(s);
2146        }
2147
2148        let walk_1: Vec<u64> =
2149            prioritized_writable.iter().map(|s| s.id).collect();
2150        assert_eq!(walk_1, vec![0, 4, 8, 12]);
2151
2152        // Default keys could cause duplicate entries, this is normally protected
2153        // against via StreamMap.
2154        for id in [0, 4, 8, 12] {
2155            let s = Arc::new(StreamPriorityKey {
2156                urgency: 0,
2157                incremental: false,
2158                id,
2159                ..Default::default()
2160            });
2161
2162            prioritized_writable.insert(s);
2163        }
2164
2165        let walk_2: Vec<u64> =
2166            prioritized_writable.iter().map(|s| s.id).collect();
2167        assert_eq!(walk_2, vec![0, 0, 4, 4, 8, 8, 12, 12]);
2168    }
2169}
2170
2171mod recv_buf;
2172mod send_buf;