quiche/stream/
mod.rs

1// Copyright (C) 2018-2019, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::cmp;
28
29use std::sync::Arc;
30
31use std::collections::hash_map;
32use std::collections::HashMap;
33use std::collections::HashSet;
34
35use intrusive_collections::intrusive_adapter;
36use intrusive_collections::KeyAdapter;
37use intrusive_collections::RBTree;
38use intrusive_collections::RBTreeAtomicLink;
39
40use smallvec::SmallVec;
41
42use crate::range_buf::DefaultBufFactory;
43use crate::BufFactory;
44use crate::Error;
45use crate::Result;
46
47const DEFAULT_URGENCY: u8 = 127;
48
49// The default size of the receiver stream flow control window.
50const DEFAULT_STREAM_WINDOW: u64 = 32 * 1024;
51
52/// The maximum size of the receiver stream flow control window.
53pub const MAX_STREAM_WINDOW: u64 = 16 * 1024 * 1024;
54
55/// A simple no-op hasher for Stream IDs.
56///
57/// The QUIC protocol and quiche library guarantees stream ID uniqueness, so
58/// we can save effort by avoiding using a more complicated algorithm.
59#[derive(Default)]
60pub struct StreamIdHasher {
61    id: u64,
62}
63
64/// Return value type of `RecvBuf::reset()`
65#[derive(Debug, PartialEq, Clone, Copy)]
66pub struct RecvBufResetReturn {
67    /// Returns the difference between the previous max_data offset
68    /// received and the final size reported by the reset
69    pub max_data_delta: u64,
70
71    /// The amount of flow control credit that should be returned to the
72    /// connection level flow control.
73    pub consumed_flowcontrol: u64,
74}
75
76impl RecvBufResetReturn {
77    pub fn zero() -> Self {
78        Self {
79            max_data_delta: 0,
80            consumed_flowcontrol: 0,
81        }
82    }
83}
84
85impl std::hash::Hasher for StreamIdHasher {
86    #[inline]
87    fn finish(&self) -> u64 {
88        self.id
89    }
90
91    #[inline]
92    fn write_u64(&mut self, id: u64) {
93        self.id = id;
94    }
95
96    #[inline]
97    fn write(&mut self, _: &[u8]) {
98        // We need a default write() for the trait but stream IDs will always
99        // be a u64 so we just delegate to write_u64.
100        unimplemented!()
101    }
102}
103
104type BuildStreamIdHasher = std::hash::BuildHasherDefault<StreamIdHasher>;
105
106pub type StreamIdHashMap<V> = HashMap<u64, V, BuildStreamIdHasher>;
107pub type StreamIdHashSet = HashSet<u64, BuildStreamIdHasher>;
108
109/// Keeps track of QUIC streams and enforces stream limits.
110#[derive(Default)]
111pub struct StreamMap<F: BufFactory = DefaultBufFactory> {
112    /// Map of streams indexed by stream ID.
113    streams: StreamIdHashMap<Stream<F>>,
114
115    /// Set of streams that were completed and garbage collected.
116    ///
117    /// Instead of keeping the full stream state forever, we collect completed
118    /// streams to save memory, but we still need to keep track of previously
119    /// created streams, to prevent peers from re-creating them.
120    collected: StreamIdHashSet,
121
122    /// Peer's maximum bidirectional stream count limit.
123    peer_max_streams_bidi: u64,
124
125    /// Peer's maximum unidirectional stream count limit.
126    peer_max_streams_uni: u64,
127
128    /// The total number of bidirectional streams opened by the peer.
129    peer_opened_streams_bidi: u64,
130
131    /// The total number of unidirectional streams opened by the peer.
132    peer_opened_streams_uni: u64,
133
134    /// Local maximum bidirectional stream count limit.
135    local_max_streams_bidi: u64,
136    local_max_streams_bidi_next: u64,
137
138    /// Local maximum unidirectional stream count limit.
139    local_max_streams_uni: u64,
140    local_max_streams_uni_next: u64,
141
142    /// The total number of bidirectional streams opened by the local endpoint.
143    local_opened_streams_bidi: u64,
144
145    /// The total number of unidirectional streams opened by the local endpoint.
146    local_opened_streams_uni: u64,
147
148    /// Queue of stream IDs corresponding to streams that have buffered data
149    /// ready to be sent to the peer. This also implies that the stream has
150    /// enough flow control credits to send at least some of that data.
151    flushable: RBTree<StreamFlushablePriorityAdapter>,
152
153    /// Set of stream IDs corresponding to streams that have outstanding data
154    /// to read. This is used to generate a `StreamIter` of streams without
155    /// having to iterate over the full list of streams.
156    pub readable: RBTree<StreamReadablePriorityAdapter>,
157
158    /// Set of stream IDs corresponding to streams that have enough flow control
159    /// capacity to be written to, and is not finished. This is used to generate
160    /// a `StreamIter` of streams without having to iterate over the full list
161    /// of streams.
162    pub writable: RBTree<StreamWritablePriorityAdapter>,
163
164    /// Set of stream IDs corresponding to streams that are almost out of flow
165    /// control credit and need to send MAX_STREAM_DATA. This is used to
166    /// generate a `StreamIter` of streams without having to iterate over the
167    /// full list of streams.
168    almost_full: StreamIdHashSet,
169
170    /// Set of stream IDs corresponding to streams that are blocked. The value
171    /// of the map elements represents the offset of the stream at which the
172    /// blocking occurred.
173    blocked: StreamIdHashMap<u64>,
174
175    /// Set of stream IDs corresponding to streams that are reset. The value
176    /// of the map elements is a tuple of the error code and final size values
177    /// to include in the RESET_STREAM frame.
178    reset: StreamIdHashMap<(u64, u64)>,
179
180    /// Set of stream IDs corresponding to streams that are shutdown on the
181    /// receive side, and need to send a STOP_SENDING frame. The value of the
182    /// map elements is the error code to include in the STOP_SENDING frame.
183    stopped: StreamIdHashMap<u64>,
184
185    /// The maximum size of a stream window.
186    max_stream_window: u64,
187}
188
189impl<F: BufFactory> StreamMap<F> {
190    pub fn new(
191        max_streams_bidi: u64, max_streams_uni: u64, max_stream_window: u64,
192    ) -> Self {
193        StreamMap {
194            local_max_streams_bidi: max_streams_bidi,
195            local_max_streams_bidi_next: max_streams_bidi,
196
197            local_max_streams_uni: max_streams_uni,
198            local_max_streams_uni_next: max_streams_uni,
199
200            max_stream_window,
201
202            ..StreamMap::default()
203        }
204    }
205
206    /// Returns the stream with the given ID if it exists.
207    pub fn get(&self, id: u64) -> Option<&Stream<F>> {
208        self.streams.get(&id)
209    }
210
211    /// Returns the mutable stream with the given ID if it exists.
212    pub fn get_mut(&mut self, id: u64) -> Option<&mut Stream<F>> {
213        self.streams.get_mut(&id)
214    }
215
216    /// Returns the mutable stream with the given ID if it exists, or creates
217    /// a new one otherwise.
218    ///
219    /// The `local` parameter indicates whether the stream's creation was
220    /// requested by the local application rather than the peer, and is
221    /// used to validate the requested stream ID, and to select the initial
222    /// flow control values from the local and remote transport parameters
223    /// (also passed as arguments).
224    ///
225    /// This also takes care of enforcing both local and the peer's stream
226    /// count limits. If one of these limits is violated, the `StreamLimit`
227    /// error is returned.
228    pub(crate) fn get_or_create(
229        &mut self, id: u64, local_params: &crate::TransportParams,
230        peer_params: &crate::TransportParams, local: bool, is_server: bool,
231    ) -> Result<&mut Stream<F>> {
232        let (stream, is_new_and_writable) = match self.streams.entry(id) {
233            hash_map::Entry::Vacant(v) => {
234                // Stream has already been closed and garbage collected.
235                if self.collected.contains(&id) {
236                    return Err(Error::Done);
237                }
238
239                if local != is_local(id, is_server) {
240                    return Err(Error::InvalidStreamState(id));
241                }
242
243                let (max_rx_data, max_tx_data) = match (local, is_bidi(id)) {
244                    // Locally-initiated bidirectional stream.
245                    (true, true) => (
246                        local_params.initial_max_stream_data_bidi_local,
247                        peer_params.initial_max_stream_data_bidi_remote,
248                    ),
249
250                    // Locally-initiated unidirectional stream.
251                    (true, false) => (0, peer_params.initial_max_stream_data_uni),
252
253                    // Remotely-initiated bidirectional stream.
254                    (false, true) => (
255                        local_params.initial_max_stream_data_bidi_remote,
256                        peer_params.initial_max_stream_data_bidi_local,
257                    ),
258
259                    // Remotely-initiated unidirectional stream.
260                    (false, false) =>
261                        (local_params.initial_max_stream_data_uni, 0),
262                };
263
264                // The two least significant bits from a stream id identify the
265                // type of stream. Truncate those bits to get the sequence for
266                // that stream type.
267                let stream_sequence = id >> 2;
268
269                // Enforce stream count limits.
270                match (is_local(id, is_server), is_bidi(id)) {
271                    (true, true) => {
272                        let n = cmp::max(
273                            self.local_opened_streams_bidi,
274                            stream_sequence + 1,
275                        );
276
277                        if n > self.peer_max_streams_bidi {
278                            return Err(Error::StreamLimit);
279                        }
280
281                        self.local_opened_streams_bidi = n;
282                    },
283
284                    (true, false) => {
285                        let n = cmp::max(
286                            self.local_opened_streams_uni,
287                            stream_sequence + 1,
288                        );
289
290                        if n > self.peer_max_streams_uni {
291                            return Err(Error::StreamLimit);
292                        }
293
294                        self.local_opened_streams_uni = n;
295                    },
296
297                    (false, true) => {
298                        let n = cmp::max(
299                            self.peer_opened_streams_bidi,
300                            stream_sequence + 1,
301                        );
302
303                        if n > self.local_max_streams_bidi {
304                            return Err(Error::StreamLimit);
305                        }
306
307                        self.peer_opened_streams_bidi = n;
308                    },
309
310                    (false, false) => {
311                        let n = cmp::max(
312                            self.peer_opened_streams_uni,
313                            stream_sequence + 1,
314                        );
315
316                        if n > self.local_max_streams_uni {
317                            return Err(Error::StreamLimit);
318                        }
319
320                        self.peer_opened_streams_uni = n;
321                    },
322                };
323
324                let s = Stream::new(
325                    id,
326                    max_rx_data,
327                    max_tx_data,
328                    is_bidi(id),
329                    local,
330                    self.max_stream_window,
331                );
332
333                let is_writable = s.is_writable();
334
335                (v.insert(s), is_writable)
336            },
337
338            hash_map::Entry::Occupied(v) => (v.into_mut(), false),
339        };
340
341        // Newly created stream might already be writable due to initial flow
342        // control limits.
343        if is_new_and_writable {
344            self.writable.insert(Arc::clone(&stream.priority_key));
345        }
346
347        Ok(stream)
348    }
349
350    /// Adds the stream ID to the readable streams set.
351    ///
352    /// If the stream was already in the list, this does nothing.
353    pub fn insert_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
354        if !priority_key.readable.is_linked() {
355            self.readable.insert(Arc::clone(priority_key));
356        }
357    }
358
359    /// Removes the stream ID from the readable streams set.
360    pub fn remove_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
361        if !priority_key.readable.is_linked() {
362            return;
363        }
364
365        let mut c = {
366            let ptr = Arc::as_ptr(priority_key);
367            unsafe { self.readable.cursor_mut_from_ptr(ptr) }
368        };
369
370        c.remove();
371    }
372
373    /// Adds the stream ID to the writable streams set.
374    ///
375    /// This should also be called anytime a new stream is created, in addition
376    /// to when an existing stream becomes writable.
377    ///
378    /// If the stream was already in the list, this does nothing.
379    pub fn insert_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
380        if !priority_key.writable.is_linked() {
381            self.writable.insert(Arc::clone(priority_key));
382        }
383    }
384
385    /// Removes the stream ID from the writable streams set.
386    ///
387    /// This should also be called anytime an existing stream stops being
388    /// writable.
389    pub fn remove_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
390        if !priority_key.writable.is_linked() {
391            return;
392        }
393
394        let mut c = {
395            let ptr = Arc::as_ptr(priority_key);
396            unsafe { self.writable.cursor_mut_from_ptr(ptr) }
397        };
398
399        c.remove();
400    }
401
402    /// Adds the stream ID to the flushable streams set.
403    ///
404    /// If the stream was already in the list, this does nothing.
405    pub fn insert_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
406        if !priority_key.flushable.is_linked() {
407            self.flushable.insert(Arc::clone(priority_key));
408        }
409    }
410
411    /// Removes the stream ID from the flushable streams set.
412    pub fn remove_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
413        if !priority_key.flushable.is_linked() {
414            return;
415        }
416
417        let mut c = {
418            let ptr = Arc::as_ptr(priority_key);
419            unsafe { self.flushable.cursor_mut_from_ptr(ptr) }
420        };
421
422        c.remove();
423    }
424
425    pub fn peek_flushable(&self) -> Option<Arc<StreamPriorityKey>> {
426        self.flushable.front().clone_pointer()
427    }
428
429    /// Updates the priorities of a stream.
430    pub fn update_priority(
431        &mut self, old: &Arc<StreamPriorityKey>, new: &Arc<StreamPriorityKey>,
432    ) {
433        if old.readable.is_linked() {
434            self.remove_readable(old);
435            self.readable.insert(Arc::clone(new));
436        }
437
438        if old.writable.is_linked() {
439            self.remove_writable(old);
440            self.writable.insert(Arc::clone(new));
441        }
442
443        if old.flushable.is_linked() {
444            self.remove_flushable(old);
445            self.flushable.insert(Arc::clone(new));
446        }
447    }
448
449    /// Adds the stream ID to the almost full streams set.
450    ///
451    /// If the stream was already in the list, this does nothing.
452    pub fn insert_almost_full(&mut self, stream_id: u64) {
453        self.almost_full.insert(stream_id);
454    }
455
456    /// Removes the stream ID from the almost full streams set.
457    pub fn remove_almost_full(&mut self, stream_id: u64) {
458        self.almost_full.remove(&stream_id);
459    }
460
461    /// Adds the stream ID to the blocked streams set with the
462    /// given offset value.
463    ///
464    /// If the stream was already in the list, this does nothing.
465    pub fn insert_blocked(&mut self, stream_id: u64, off: u64) {
466        self.blocked.insert(stream_id, off);
467    }
468
469    /// Removes the stream ID from the blocked streams set.
470    pub fn remove_blocked(&mut self, stream_id: u64) {
471        self.blocked.remove(&stream_id);
472    }
473
474    /// Adds the stream ID to the reset streams set with the
475    /// given error code and final size values.
476    ///
477    /// If the stream was already in the list, this does nothing.
478    pub fn insert_reset(
479        &mut self, stream_id: u64, error_code: u64, final_size: u64,
480    ) {
481        self.reset.insert(stream_id, (error_code, final_size));
482    }
483
484    /// Removes the stream ID from the reset streams set.
485    pub fn remove_reset(&mut self, stream_id: u64) {
486        self.reset.remove(&stream_id);
487    }
488
489    /// Adds the stream ID to the stopped streams set with the
490    /// given error code.
491    ///
492    /// If the stream was already in the list, this does nothing.
493    pub fn insert_stopped(&mut self, stream_id: u64, error_code: u64) {
494        self.stopped.insert(stream_id, error_code);
495    }
496
497    /// Removes the stream ID from the stopped streams set.
498    pub fn remove_stopped(&mut self, stream_id: u64) {
499        self.stopped.remove(&stream_id);
500    }
501
502    /// Updates the peer's maximum bidirectional stream count limit.
503    pub fn update_peer_max_streams_bidi(&mut self, v: u64) {
504        self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v);
505    }
506
507    /// Updates the peer's maximum unidirectional stream count limit.
508    pub fn update_peer_max_streams_uni(&mut self, v: u64) {
509        self.peer_max_streams_uni = cmp::max(self.peer_max_streams_uni, v);
510    }
511
512    /// Commits the new max_streams_bidi limit.
513    pub fn update_max_streams_bidi(&mut self) {
514        self.local_max_streams_bidi = self.local_max_streams_bidi_next;
515    }
516
517    /// Sets the max_streams_bidi limit to the given value.
518    pub fn set_max_streams_bidi(&mut self, max: u64) {
519        self.local_max_streams_bidi = max;
520        self.local_max_streams_bidi_next = max;
521    }
522
523    /// Returns the current max_streams_bidi limit.
524    pub fn max_streams_bidi(&self) -> u64 {
525        self.local_max_streams_bidi
526    }
527
528    /// Returns the new max_streams_bidi limit.
529    pub fn max_streams_bidi_next(&mut self) -> u64 {
530        self.local_max_streams_bidi_next
531    }
532
533    /// Commits the new max_streams_uni limit.
534    pub fn update_max_streams_uni(&mut self) {
535        self.local_max_streams_uni = self.local_max_streams_uni_next;
536    }
537
538    /// Returns the new max_streams_uni limit.
539    pub fn max_streams_uni_next(&mut self) -> u64 {
540        self.local_max_streams_uni_next
541    }
542
543    /// Returns the number of bidirectional streams that can be created
544    /// before the peer's stream count limit is reached.
545    pub fn peer_streams_left_bidi(&self) -> u64 {
546        self.peer_max_streams_bidi - self.local_opened_streams_bidi
547    }
548
549    /// Returns the number of unidirectional streams that can be created
550    /// before the peer's stream count limit is reached.
551    pub fn peer_streams_left_uni(&self) -> u64 {
552        self.peer_max_streams_uni - self.local_opened_streams_uni
553    }
554
555    /// Drops completed stream.
556    ///
557    /// This should only be called when Stream::is_complete() returns true for
558    /// the given stream.
559    pub fn collect(&mut self, stream_id: u64, local: bool) {
560        if !local {
561            // If the stream was created by the peer, give back a max streams
562            // credit.
563            if is_bidi(stream_id) {
564                self.local_max_streams_bidi_next =
565                    self.local_max_streams_bidi_next.saturating_add(1);
566            } else {
567                self.local_max_streams_uni_next =
568                    self.local_max_streams_uni_next.saturating_add(1);
569            }
570        }
571
572        let s = self.streams.remove(&stream_id).unwrap();
573
574        self.remove_readable(&s.priority_key);
575
576        self.remove_writable(&s.priority_key);
577
578        self.remove_flushable(&s.priority_key);
579
580        self.collected.insert(stream_id);
581    }
582
583    /// Creates an iterator over streams that have outstanding data to read.
584    pub fn readable(&self) -> StreamIter {
585        StreamIter {
586            streams: self.readable.iter().map(|s| s.id).collect(),
587            index: 0,
588        }
589    }
590
591    /// Creates an iterator over streams that can be written to.
592    pub fn writable(&self) -> StreamIter {
593        StreamIter {
594            streams: self.writable.iter().map(|s| s.id).collect(),
595            index: 0,
596        }
597    }
598
599    /// Creates an iterator over streams that need to send MAX_STREAM_DATA.
600    pub fn almost_full(&self) -> StreamIter {
601        StreamIter::from(&self.almost_full)
602    }
603
604    /// Creates an iterator over streams that need to send STREAM_DATA_BLOCKED.
605    pub fn blocked(&self) -> hash_map::Iter<'_, u64, u64> {
606        self.blocked.iter()
607    }
608
609    /// Creates an iterator over streams that need to send RESET_STREAM.
610    pub fn reset(&self) -> hash_map::Iter<'_, u64, (u64, u64)> {
611        self.reset.iter()
612    }
613
614    /// Creates an iterator over streams that need to send STOP_SENDING.
615    pub fn stopped(&self) -> hash_map::Iter<'_, u64, u64> {
616        self.stopped.iter()
617    }
618
619    /// Returns true if the stream has been collected.
620    pub fn is_collected(&self, stream_id: u64) -> bool {
621        self.collected.contains(&stream_id)
622    }
623
624    /// Returns true if there are any streams that have data to write.
625    pub fn has_flushable(&self) -> bool {
626        !self.flushable.is_empty()
627    }
628
629    /// Returns true if there are any streams that have data to read.
630    pub fn has_readable(&self) -> bool {
631        !self.readable.is_empty()
632    }
633
634    /// Returns true if there are any streams that need to update the local
635    /// flow control limit.
636    pub fn has_almost_full(&self) -> bool {
637        !self.almost_full.is_empty()
638    }
639
640    /// Returns true if there are any streams that are blocked.
641    pub fn has_blocked(&self) -> bool {
642        !self.blocked.is_empty()
643    }
644
645    /// Returns true if there are any streams that are reset.
646    pub fn has_reset(&self) -> bool {
647        !self.reset.is_empty()
648    }
649
650    /// Returns true if there are any streams that need to send STOP_SENDING.
651    pub fn has_stopped(&self) -> bool {
652        !self.stopped.is_empty()
653    }
654
655    /// Returns true if the max bidirectional streams count needs to be updated
656    /// by sending a MAX_STREAMS frame to the peer.
657    pub fn should_update_max_streams_bidi(&self) -> bool {
658        self.local_max_streams_bidi_next != self.local_max_streams_bidi &&
659            self.local_max_streams_bidi_next / 2 >
660                self.local_max_streams_bidi - self.peer_opened_streams_bidi
661    }
662
663    /// Returns true if the max unidirectional streams count needs to be updated
664    /// by sending a MAX_STREAMS frame to the peer.
665    pub fn should_update_max_streams_uni(&self) -> bool {
666        self.local_max_streams_uni_next != self.local_max_streams_uni &&
667            self.local_max_streams_uni_next / 2 >
668                self.local_max_streams_uni - self.peer_opened_streams_uni
669    }
670
671    /// Returns the number of active streams in the map.
672    #[cfg(test)]
673    pub fn len(&self) -> usize {
674        self.streams.len()
675    }
676}
677
678/// A QUIC stream.
679pub struct Stream<F: BufFactory = DefaultBufFactory> {
680    /// Receive-side stream buffer.
681    pub recv: recv_buf::RecvBuf,
682
683    /// Send-side stream buffer.
684    pub send: send_buf::SendBuf<F>,
685
686    pub send_lowat: usize,
687
688    /// Whether the stream is bidirectional.
689    pub bidi: bool,
690
691    /// Whether the stream was created by the local endpoint.
692    pub local: bool,
693
694    /// The stream's urgency (lower is better). Default is `DEFAULT_URGENCY`.
695    pub urgency: u8,
696
697    /// Whether the stream can be flushed incrementally. Default is `true`.
698    pub incremental: bool,
699
700    pub priority_key: Arc<StreamPriorityKey>,
701}
702
703impl<F: BufFactory> Stream<F> {
704    /// Creates a new stream with the given flow control limits.
705    pub fn new(
706        id: u64, max_rx_data: u64, max_tx_data: u64, bidi: bool, local: bool,
707        max_window: u64,
708    ) -> Self {
709        let priority_key = Arc::new(StreamPriorityKey {
710            id,
711            ..Default::default()
712        });
713
714        Stream {
715            recv: recv_buf::RecvBuf::new(max_rx_data, max_window),
716            send: send_buf::SendBuf::new(max_tx_data),
717            send_lowat: 1,
718            bidi,
719            local,
720            urgency: priority_key.urgency,
721            incremental: priority_key.incremental,
722            priority_key,
723        }
724    }
725
726    /// Returns true if the stream has data to read.
727    pub fn is_readable(&self) -> bool {
728        self.recv.ready()
729    }
730
731    /// Returns true if the stream has enough flow control capacity to be
732    /// written to, and is not finished.
733    pub fn is_writable(&self) -> bool {
734        !self.send.is_shutdown() &&
735            !self.send.is_fin() &&
736            (self.send.off_back() + self.send_lowat as u64) <
737                self.send.max_off()
738    }
739
740    /// Returns true if the stream has data to send and is allowed to send at
741    /// least some of it.
742    pub fn is_flushable(&self) -> bool {
743        let off_front = self.send.off_front();
744
745        !self.send.is_empty() &&
746            off_front < self.send.off_back() &&
747            off_front < self.send.max_off()
748    }
749
750    /// Returns true if the stream is complete.
751    ///
752    /// For bidirectional streams this happens when both the receive and send
753    /// sides are complete. That is when all incoming data has been read by the
754    /// application, and when all outgoing data has been acked by the peer.
755    ///
756    /// For unidirectional streams this happens when either the receive or send
757    /// side is complete, depending on whether the stream was created locally
758    /// or not.
759    pub fn is_complete(&self) -> bool {
760        match (self.bidi, self.local) {
761            // For bidirectional streams we need to check both receive and send
762            // sides for completion.
763            (true, _) => self.recv.is_fin() && self.send.is_complete(),
764
765            // For unidirectional streams generated locally, we only need to
766            // check the send side for completion.
767            (false, true) => self.send.is_complete(),
768
769            // For unidirectional streams generated by the peer, we only need
770            // to check the receive side for completion.
771            (false, false) => self.recv.is_fin(),
772        }
773    }
774}
775
776/// Returns true if the stream was created locally.
777pub fn is_local(stream_id: u64, is_server: bool) -> bool {
778    (stream_id & 0x1) == (is_server as u64)
779}
780
781/// Returns true if the stream is bidirectional.
782pub fn is_bidi(stream_id: u64) -> bool {
783    (stream_id & 0x2) == 0
784}
785
786#[derive(Clone, Debug)]
787pub struct StreamPriorityKey {
788    pub urgency: u8,
789    pub incremental: bool,
790    pub id: u64,
791
792    pub readable: RBTreeAtomicLink,
793    pub writable: RBTreeAtomicLink,
794    pub flushable: RBTreeAtomicLink,
795}
796
797impl Default for StreamPriorityKey {
798    fn default() -> Self {
799        Self {
800            urgency: DEFAULT_URGENCY,
801            incremental: true,
802            id: Default::default(),
803            readable: Default::default(),
804            writable: Default::default(),
805            flushable: Default::default(),
806        }
807    }
808}
809
810impl PartialEq for StreamPriorityKey {
811    fn eq(&self, other: &Self) -> bool {
812        self.id == other.id
813    }
814}
815
816impl Eq for StreamPriorityKey {}
817
818impl PartialOrd for StreamPriorityKey {
819    // Priority ordering is complex, disable Clippy warning.
820    #[allow(clippy::non_canonical_partial_ord_impl)]
821    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
822        // Ignore priority if ID matches.
823        if self.id == other.id {
824            return Some(cmp::Ordering::Equal);
825        }
826
827        // First, order by urgency...
828        if self.urgency != other.urgency {
829            return self.urgency.partial_cmp(&other.urgency);
830        }
831
832        // ...when the urgency is the same, and both are not incremental, order
833        // by stream ID...
834        if !self.incremental && !other.incremental {
835            return self.id.partial_cmp(&other.id);
836        }
837
838        // ...non-incremental takes priority over incremental...
839        if self.incremental && !other.incremental {
840            return Some(cmp::Ordering::Greater);
841        }
842        if !self.incremental && other.incremental {
843            return Some(cmp::Ordering::Less);
844        }
845
846        // ...finally, when both are incremental, `other` takes precedence (so
847        // `self` is always sorted after other same-urgency incremental
848        // entries).
849        Some(cmp::Ordering::Greater)
850    }
851}
852
853impl Ord for StreamPriorityKey {
854    fn cmp(&self, other: &Self) -> cmp::Ordering {
855        // `partial_cmp()` never returns `None`, so this should be safe.
856        self.partial_cmp(other).unwrap()
857    }
858}
859
860intrusive_adapter!(pub StreamWritablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { writable: RBTreeAtomicLink });
861
862impl KeyAdapter<'_> for StreamWritablePriorityAdapter {
863    type Key = StreamPriorityKey;
864
865    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
866        s.clone()
867    }
868}
869
870intrusive_adapter!(pub StreamReadablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { readable: RBTreeAtomicLink });
871
872impl KeyAdapter<'_> for StreamReadablePriorityAdapter {
873    type Key = StreamPriorityKey;
874
875    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
876        s.clone()
877    }
878}
879
880intrusive_adapter!(pub StreamFlushablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { flushable: RBTreeAtomicLink });
881
882impl KeyAdapter<'_> for StreamFlushablePriorityAdapter {
883    type Key = StreamPriorityKey;
884
885    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
886        s.clone()
887    }
888}
889
890/// An iterator over QUIC streams.
891#[derive(Default)]
892pub struct StreamIter {
893    streams: SmallVec<[u64; 8]>,
894    index: usize,
895}
896
897impl StreamIter {
898    #[inline]
899    fn from(streams: &StreamIdHashSet) -> Self {
900        StreamIter {
901            streams: streams.iter().copied().collect(),
902            index: 0,
903        }
904    }
905}
906
907impl Iterator for StreamIter {
908    type Item = u64;
909
910    #[inline]
911    fn next(&mut self) -> Option<Self::Item> {
912        let v = self.streams.get(self.index)?;
913        self.index += 1;
914        Some(*v)
915    }
916}
917
918impl ExactSizeIterator for StreamIter {
919    #[inline]
920    fn len(&self) -> usize {
921        self.streams.len() - self.index
922    }
923}
924
925#[cfg(test)]
926mod tests {
927    use crate::range_buf::RangeBuf;
928
929    use super::*;
930
931    #[test]
932    fn recv_flow_control() {
933        let mut stream =
934            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
935        assert!(!stream.recv.almost_full());
936
937        let mut buf = [0; 32];
938
939        let first = RangeBuf::from(b"hello", 0, false);
940        let second = RangeBuf::from(b"world", 5, false);
941        let third = RangeBuf::from(b"something", 10, false);
942
943        assert_eq!(stream.recv.write(second), Ok(()));
944        assert_eq!(stream.recv.write(first), Ok(()));
945        assert!(!stream.recv.almost_full());
946
947        assert_eq!(stream.recv.write(third), Err(Error::FlowControl));
948
949        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
950        assert_eq!(&buf[..len], b"helloworld");
951        assert!(!fin);
952
953        assert!(stream.recv.almost_full());
954
955        stream.recv.update_max_data(std::time::Instant::now());
956        assert_eq!(stream.recv.max_data_next(), 25);
957        assert!(!stream.recv.almost_full());
958
959        let third = RangeBuf::from(b"something", 10, false);
960        assert_eq!(stream.recv.write(third), Ok(()));
961    }
962
963    #[test]
964    fn recv_past_fin() {
965        let mut stream =
966            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
967        assert!(!stream.recv.almost_full());
968
969        let first = RangeBuf::from(b"hello", 0, true);
970        let second = RangeBuf::from(b"world", 5, false);
971
972        assert_eq!(stream.recv.write(first), Ok(()));
973        assert_eq!(stream.recv.write(second), Err(Error::FinalSize));
974    }
975
976    #[test]
977    fn recv_fin_dup() {
978        let mut stream =
979            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
980        assert!(!stream.recv.almost_full());
981
982        let first = RangeBuf::from(b"hello", 0, true);
983        let second = RangeBuf::from(b"hello", 0, true);
984
985        assert_eq!(stream.recv.write(first), Ok(()));
986        assert_eq!(stream.recv.write(second), Ok(()));
987
988        let mut buf = [0; 32];
989
990        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
991        assert_eq!(&buf[..len], b"hello");
992        assert!(fin);
993    }
994
995    #[test]
996    fn recv_fin_change() {
997        let mut stream =
998            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
999        assert!(!stream.recv.almost_full());
1000
1001        let first = RangeBuf::from(b"hello", 0, true);
1002        let second = RangeBuf::from(b"world", 5, true);
1003
1004        assert_eq!(stream.recv.write(second), Ok(()));
1005        assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
1006    }
1007
1008    #[test]
1009    fn recv_fin_lower_than_received() {
1010        let mut stream =
1011            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1012        assert!(!stream.recv.almost_full());
1013
1014        let first = RangeBuf::from(b"hello", 0, true);
1015        let second = RangeBuf::from(b"world", 5, false);
1016
1017        assert_eq!(stream.recv.write(second), Ok(()));
1018        assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
1019    }
1020
1021    #[test]
1022    fn recv_fin_flow_control() {
1023        let mut stream =
1024            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1025        assert!(!stream.recv.almost_full());
1026
1027        let mut buf = [0; 32];
1028
1029        let first = RangeBuf::from(b"hello", 0, false);
1030        let second = RangeBuf::from(b"world", 5, true);
1031
1032        assert_eq!(stream.recv.write(first), Ok(()));
1033        assert_eq!(stream.recv.write(second), Ok(()));
1034
1035        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1036        assert_eq!(&buf[..len], b"helloworld");
1037        assert!(fin);
1038
1039        assert!(!stream.recv.almost_full());
1040    }
1041
1042    #[test]
1043    fn recv_fin_reset_mismatch() {
1044        let mut stream =
1045            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1046        assert!(!stream.recv.almost_full());
1047
1048        let first = RangeBuf::from(b"hello", 0, true);
1049
1050        assert_eq!(stream.recv.write(first), Ok(()));
1051        assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
1052    }
1053
1054    #[test]
1055    fn recv_reset_with_gap() {
1056        let mut stream =
1057            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1058        assert!(!stream.recv.almost_full());
1059
1060        let first = RangeBuf::from(b"hello", 0, false);
1061
1062        assert_eq!(stream.recv.write(first), Ok(()));
1063        // Read one byte.
1064        assert_eq!(stream.recv.emit(&mut [0; 1]), Ok((1, false)));
1065        // Reset with a final size > than max previously received
1066        assert_eq!(
1067            stream.recv.reset(0, 10),
1068            Ok(RecvBufResetReturn {
1069                max_data_delta: 5,
1070                // consumed_flowcontrol is 9, since we already read 1 byte
1071                consumed_flowcontrol: 9
1072            })
1073        );
1074        assert_eq!(stream.recv.reset(0, 10), Ok(RecvBufResetReturn::zero()));
1075    }
1076
1077    #[test]
1078    fn recv_reset_dup() {
1079        let mut stream =
1080            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1081        assert!(!stream.recv.almost_full());
1082
1083        let first = RangeBuf::from(b"hello", 0, false);
1084
1085        assert_eq!(stream.recv.write(first), Ok(()));
1086        assert_eq!(
1087            stream.recv.reset(0, 5),
1088            Ok(RecvBufResetReturn {
1089                max_data_delta: 0,
1090                consumed_flowcontrol: 5
1091            })
1092        );
1093        assert_eq!(stream.recv.reset(0, 5), Ok(RecvBufResetReturn::zero()));
1094    }
1095
1096    #[test]
1097    fn recv_reset_change() {
1098        let mut stream =
1099            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1100        assert!(!stream.recv.almost_full());
1101
1102        let first = RangeBuf::from(b"hello", 0, false);
1103
1104        assert_eq!(stream.recv.write(first), Ok(()));
1105        assert_eq!(
1106            stream.recv.reset(0, 5),
1107            Ok(RecvBufResetReturn {
1108                max_data_delta: 0,
1109                consumed_flowcontrol: 5
1110            })
1111        );
1112        assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
1113    }
1114
1115    #[test]
1116    fn recv_reset_lower_than_received() {
1117        let mut stream =
1118            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1119        assert!(!stream.recv.almost_full());
1120
1121        let first = RangeBuf::from(b"hello", 0, false);
1122
1123        assert_eq!(stream.recv.write(first), Ok(()));
1124        assert_eq!(stream.recv.reset(0, 4), Err(Error::FinalSize));
1125    }
1126
1127    #[test]
1128    fn send_flow_control() {
1129        let mut buf = [0; 25];
1130
1131        let mut stream =
1132            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1133
1134        let first = b"hello";
1135        let second = b"world";
1136        let third = b"something";
1137
1138        assert!(stream.send.write(first, false).is_ok());
1139        assert!(stream.send.write(second, false).is_ok());
1140        assert!(stream.send.write(third, false).is_ok());
1141
1142        assert_eq!(stream.send.off_front(), 0);
1143
1144        let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
1145        assert_eq!(written, 15);
1146        assert!(!fin);
1147        assert_eq!(&buf[..written], b"helloworldsomet");
1148
1149        assert_eq!(stream.send.off_front(), 15);
1150
1151        let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
1152        assert_eq!(written, 0);
1153        assert!(!fin);
1154        assert_eq!(&buf[..written], b"");
1155
1156        stream.send.retransmit(0, 15);
1157
1158        assert_eq!(stream.send.off_front(), 0);
1159
1160        let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
1161        assert_eq!(written, 10);
1162        assert!(!fin);
1163        assert_eq!(&buf[..written], b"helloworld");
1164
1165        assert_eq!(stream.send.off_front(), 10);
1166
1167        let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
1168        assert_eq!(written, 5);
1169        assert!(!fin);
1170        assert_eq!(&buf[..written], b"somet");
1171    }
1172
1173    #[test]
1174    fn send_past_fin() {
1175        let mut stream =
1176            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1177
1178        let first = b"hello";
1179        let second = b"world";
1180        let third = b"third";
1181
1182        assert_eq!(stream.send.write(first, false), Ok(5));
1183
1184        assert_eq!(stream.send.write(second, true), Ok(5));
1185        assert!(stream.send.is_fin());
1186
1187        assert_eq!(stream.send.write(third, false), Err(Error::FinalSize));
1188    }
1189
1190    #[test]
1191    fn send_fin_dup() {
1192        let mut stream =
1193            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1194
1195        assert_eq!(stream.send.write(b"hello", true), Ok(5));
1196        assert!(stream.send.is_fin());
1197
1198        assert_eq!(stream.send.write(b"", true), Ok(0));
1199        assert!(stream.send.is_fin());
1200    }
1201
1202    #[test]
1203    fn send_undo_fin() {
1204        let mut stream =
1205            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1206
1207        assert_eq!(stream.send.write(b"hello", true), Ok(5));
1208        assert!(stream.send.is_fin());
1209
1210        assert_eq!(
1211            stream.send.write(b"helloworld", true),
1212            Err(Error::FinalSize)
1213        );
1214    }
1215
1216    #[test]
1217    fn send_fin_max_data_match() {
1218        let mut buf = [0; 15];
1219
1220        let mut stream =
1221            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1222
1223        let slice = b"hellohellohello";
1224
1225        assert!(stream.send.write(slice, true).is_ok());
1226
1227        let (written, fin) = stream.send.emit(&mut buf[..15]).unwrap();
1228        assert_eq!(written, 15);
1229        assert!(fin);
1230        assert_eq!(&buf[..written], slice);
1231    }
1232
1233    #[test]
1234    fn send_fin_zero_length() {
1235        let mut buf = [0; 5];
1236
1237        let mut stream =
1238            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1239
1240        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1241        assert_eq!(stream.send.write(b"", true), Ok(0));
1242        assert!(stream.send.is_fin());
1243
1244        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1245        assert_eq!(written, 5);
1246        assert!(fin);
1247        assert_eq!(&buf[..written], b"hello");
1248    }
1249
1250    #[test]
1251    fn send_ack() {
1252        let mut buf = [0; 5];
1253
1254        let mut stream =
1255            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1256
1257        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1258        assert_eq!(stream.send.write(b"world", false), Ok(5));
1259        assert_eq!(stream.send.write(b"", true), Ok(0));
1260        assert!(stream.send.is_fin());
1261
1262        assert_eq!(stream.send.off_front(), 0);
1263
1264        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1265        assert_eq!(written, 5);
1266        assert!(!fin);
1267        assert_eq!(&buf[..written], b"hello");
1268
1269        stream.send.ack_and_drop(0, 5);
1270
1271        stream.send.retransmit(0, 5);
1272
1273        assert_eq!(stream.send.off_front(), 5);
1274
1275        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1276        assert_eq!(written, 5);
1277        assert!(fin);
1278        assert_eq!(&buf[..written], b"world");
1279    }
1280
1281    #[test]
1282    fn send_ack_reordering() {
1283        let mut buf = [0; 5];
1284
1285        let mut stream =
1286            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1287
1288        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1289        assert_eq!(stream.send.write(b"world", false), Ok(5));
1290        assert_eq!(stream.send.write(b"", true), Ok(0));
1291        assert!(stream.send.is_fin());
1292
1293        assert_eq!(stream.send.off_front(), 0);
1294
1295        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1296        assert_eq!(written, 5);
1297        assert!(!fin);
1298        assert_eq!(&buf[..written], b"hello");
1299
1300        assert_eq!(stream.send.off_front(), 5);
1301
1302        let (written, fin) = stream.send.emit(&mut buf[..1]).unwrap();
1303        assert_eq!(written, 1);
1304        assert!(!fin);
1305        assert_eq!(&buf[..written], b"w");
1306
1307        stream.send.ack_and_drop(5, 1);
1308        stream.send.ack_and_drop(0, 5);
1309
1310        stream.send.retransmit(0, 5);
1311        stream.send.retransmit(5, 1);
1312
1313        assert_eq!(stream.send.off_front(), 6);
1314
1315        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1316        assert_eq!(written, 4);
1317        assert!(fin);
1318        assert_eq!(&buf[..written], b"orld");
1319    }
1320
1321    #[test]
1322    fn recv_data_below_off() {
1323        let mut stream =
1324            <Stream>::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1325
1326        let first = RangeBuf::from(b"hello", 0, false);
1327
1328        assert_eq!(stream.recv.write(first), Ok(()));
1329
1330        let mut buf = [0; 10];
1331
1332        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1333        assert_eq!(&buf[..len], b"hello");
1334        assert!(!fin);
1335
1336        let first = RangeBuf::from(b"elloworld", 1, true);
1337        assert_eq!(stream.recv.write(first), Ok(()));
1338
1339        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1340        assert_eq!(&buf[..len], b"world");
1341        assert!(fin);
1342    }
1343
1344    #[test]
1345    fn stream_complete() {
1346        let mut stream =
1347            <Stream>::new(0, 30, 30, true, true, DEFAULT_STREAM_WINDOW);
1348
1349        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1350        assert_eq!(stream.send.write(b"world", false), Ok(5));
1351
1352        assert!(!stream.send.is_complete());
1353        assert!(!stream.send.is_fin());
1354
1355        assert_eq!(stream.send.write(b"", true), Ok(0));
1356
1357        assert!(!stream.send.is_complete());
1358        assert!(stream.send.is_fin());
1359
1360        let buf = RangeBuf::from(b"hello", 0, true);
1361        assert!(stream.recv.write(buf).is_ok());
1362        assert!(!stream.recv.is_fin());
1363
1364        stream.send.ack(6, 4);
1365        assert!(!stream.send.is_complete());
1366
1367        let mut buf = [0; 2];
1368        assert_eq!(stream.recv.emit(&mut buf), Ok((2, false)));
1369        assert!(!stream.recv.is_fin());
1370
1371        stream.send.ack(1, 5);
1372        assert!(!stream.send.is_complete());
1373
1374        stream.send.ack(0, 1);
1375        assert!(stream.send.is_complete());
1376
1377        assert!(!stream.is_complete());
1378
1379        let mut buf = [0; 3];
1380        assert_eq!(stream.recv.emit(&mut buf), Ok((3, true)));
1381        assert!(stream.recv.is_fin());
1382
1383        assert!(stream.is_complete());
1384    }
1385
1386    #[test]
1387    fn send_fin_zero_length_output() {
1388        let mut buf = [0; 5];
1389
1390        let mut stream =
1391            <Stream>::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1392
1393        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1394        assert_eq!(stream.send.off_front(), 0);
1395        assert!(!stream.send.is_fin());
1396
1397        let (written, fin) = stream.send.emit(&mut buf).unwrap();
1398        assert_eq!(written, 5);
1399        assert!(!fin);
1400        assert_eq!(&buf[..written], b"hello");
1401
1402        assert_eq!(stream.send.write(b"", true), Ok(0));
1403        assert!(stream.send.is_fin());
1404        assert_eq!(stream.send.off_front(), 5);
1405
1406        let (written, fin) = stream.send.emit(&mut buf).unwrap();
1407        assert_eq!(written, 0);
1408        assert!(fin);
1409        assert_eq!(&buf[..written], b"");
1410    }
1411
1412    fn stream_send_ready(stream: &Stream) -> bool {
1413        !stream.send.is_empty() &&
1414            stream.send.off_front() < stream.send.off_back()
1415    }
1416
1417    #[test]
1418    fn send_emit() {
1419        let mut buf = [0; 5];
1420
1421        let mut stream =
1422            <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1423
1424        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1425        assert_eq!(stream.send.write(b"world", false), Ok(5));
1426        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1427        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1428        assert_eq!(stream.send.off_front(), 0);
1429        assert_eq!(stream.send.bufs_count(), 4);
1430
1431        assert!(stream.is_flushable());
1432
1433        assert!(stream_send_ready(&stream));
1434        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1435        assert_eq!(stream.send.off_front(), 4);
1436        assert_eq!(&buf[..4], b"hell");
1437
1438        assert!(stream_send_ready(&stream));
1439        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1440        assert_eq!(stream.send.off_front(), 8);
1441        assert_eq!(&buf[..4], b"owor");
1442
1443        assert!(stream_send_ready(&stream));
1444        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1445        assert_eq!(stream.send.off_front(), 10);
1446        assert_eq!(&buf[..2], b"ld");
1447
1448        assert!(stream_send_ready(&stream));
1449        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1450        assert_eq!(stream.send.off_front(), 11);
1451        assert_eq!(&buf[..1], b"o");
1452
1453        assert!(stream_send_ready(&stream));
1454        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1455        assert_eq!(stream.send.off_front(), 16);
1456        assert_eq!(&buf[..5], b"llehd");
1457
1458        assert!(stream_send_ready(&stream));
1459        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1460        assert_eq!(stream.send.off_front(), 20);
1461        assert_eq!(&buf[..4], b"lrow");
1462
1463        assert!(!stream.is_flushable());
1464
1465        assert!(!stream_send_ready(&stream));
1466        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1467        assert_eq!(stream.send.off_front(), 20);
1468    }
1469
1470    #[test]
1471    fn send_emit_ack() {
1472        let mut buf = [0; 5];
1473
1474        let mut stream =
1475            <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1476
1477        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1478        assert_eq!(stream.send.write(b"world", false), Ok(5));
1479        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1480        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1481        assert_eq!(stream.send.off_front(), 0);
1482        assert_eq!(stream.send.bufs_count(), 4);
1483
1484        assert!(stream.is_flushable());
1485
1486        assert!(stream_send_ready(&stream));
1487        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1488        assert_eq!(stream.send.off_front(), 4);
1489        assert_eq!(&buf[..4], b"hell");
1490
1491        assert!(stream_send_ready(&stream));
1492        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1493        assert_eq!(stream.send.off_front(), 8);
1494        assert_eq!(&buf[..4], b"owor");
1495
1496        stream.send.ack_and_drop(0, 5);
1497        assert_eq!(stream.send.bufs_count(), 3);
1498
1499        assert!(stream_send_ready(&stream));
1500        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1501        assert_eq!(stream.send.off_front(), 10);
1502        assert_eq!(&buf[..2], b"ld");
1503
1504        stream.send.ack_and_drop(7, 5);
1505        assert_eq!(stream.send.bufs_count(), 3);
1506
1507        assert!(stream_send_ready(&stream));
1508        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1509        assert_eq!(stream.send.off_front(), 11);
1510        assert_eq!(&buf[..1], b"o");
1511
1512        assert!(stream_send_ready(&stream));
1513        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1514        assert_eq!(stream.send.off_front(), 16);
1515        assert_eq!(&buf[..5], b"llehd");
1516
1517        stream.send.ack_and_drop(5, 7);
1518        assert_eq!(stream.send.bufs_count(), 2);
1519
1520        assert!(stream_send_ready(&stream));
1521        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1522        assert_eq!(stream.send.off_front(), 20);
1523        assert_eq!(&buf[..4], b"lrow");
1524
1525        assert!(!stream.is_flushable());
1526
1527        assert!(!stream_send_ready(&stream));
1528        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1529        assert_eq!(stream.send.off_front(), 20);
1530
1531        stream.send.ack_and_drop(22, 4);
1532        assert_eq!(stream.send.bufs_count(), 2);
1533
1534        stream.send.ack_and_drop(20, 1);
1535        assert_eq!(stream.send.bufs_count(), 2);
1536    }
1537
1538    #[test]
1539    fn send_emit_retransmit() {
1540        let mut buf = [0; 5];
1541
1542        let mut stream =
1543            <Stream>::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1544
1545        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1546        assert_eq!(stream.send.write(b"world", false), Ok(5));
1547        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1548        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1549        assert_eq!(stream.send.off_front(), 0);
1550        assert_eq!(stream.send.bufs_count(), 4);
1551
1552        assert!(stream.is_flushable());
1553
1554        assert!(stream_send_ready(&stream));
1555        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1556        assert_eq!(stream.send.off_front(), 4);
1557        assert_eq!(&buf[..4], b"hell");
1558
1559        assert!(stream_send_ready(&stream));
1560        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1561        assert_eq!(stream.send.off_front(), 8);
1562        assert_eq!(&buf[..4], b"owor");
1563
1564        stream.send.retransmit(3, 3);
1565        assert_eq!(stream.send.off_front(), 3);
1566
1567        assert!(stream_send_ready(&stream));
1568        assert_eq!(stream.send.emit(&mut buf[..3]), Ok((3, false)));
1569        assert_eq!(stream.send.off_front(), 8);
1570        assert_eq!(&buf[..3], b"low");
1571
1572        assert!(stream_send_ready(&stream));
1573        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1574        assert_eq!(stream.send.off_front(), 10);
1575        assert_eq!(&buf[..2], b"ld");
1576
1577        stream.send.ack_and_drop(7, 2);
1578
1579        stream.send.retransmit(8, 2);
1580
1581        assert!(stream_send_ready(&stream));
1582        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1583        assert_eq!(stream.send.off_front(), 10);
1584        assert_eq!(&buf[..2], b"ld");
1585
1586        assert!(stream_send_ready(&stream));
1587        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1588        assert_eq!(stream.send.off_front(), 11);
1589        assert_eq!(&buf[..1], b"o");
1590
1591        assert!(stream_send_ready(&stream));
1592        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1593        assert_eq!(stream.send.off_front(), 16);
1594        assert_eq!(&buf[..5], b"llehd");
1595
1596        stream.send.retransmit(12, 2);
1597
1598        assert!(stream_send_ready(&stream));
1599        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1600        assert_eq!(stream.send.off_front(), 16);
1601        assert_eq!(&buf[..2], b"le");
1602
1603        assert!(stream_send_ready(&stream));
1604        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1605        assert_eq!(stream.send.off_front(), 20);
1606        assert_eq!(&buf[..4], b"lrow");
1607
1608        assert!(!stream.is_flushable());
1609
1610        assert!(!stream_send_ready(&stream));
1611        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1612        assert_eq!(stream.send.off_front(), 20);
1613
1614        stream.send.retransmit(7, 12);
1615
1616        assert!(stream_send_ready(&stream));
1617        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1618        assert_eq!(stream.send.off_front(), 12);
1619        assert_eq!(&buf[..5], b"rldol");
1620
1621        assert!(stream_send_ready(&stream));
1622        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1623        assert_eq!(stream.send.off_front(), 17);
1624        assert_eq!(&buf[..5], b"lehdl");
1625
1626        assert!(stream_send_ready(&stream));
1627        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
1628        assert_eq!(stream.send.off_front(), 20);
1629        assert_eq!(&buf[..2], b"ro");
1630
1631        stream.send.ack_and_drop(12, 7);
1632
1633        stream.send.retransmit(7, 12);
1634
1635        assert!(stream_send_ready(&stream));
1636        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1637        assert_eq!(stream.send.off_front(), 12);
1638        assert_eq!(&buf[..5], b"rldol");
1639
1640        assert!(stream_send_ready(&stream));
1641        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1642        assert_eq!(stream.send.off_front(), 17);
1643        assert_eq!(&buf[..5], b"lehdl");
1644
1645        assert!(stream_send_ready(&stream));
1646        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
1647        assert_eq!(stream.send.off_front(), 20);
1648        assert_eq!(&buf[..2], b"ro");
1649    }
1650
1651    #[test]
1652    fn rangebuf_split_off() {
1653        let mut buf = <RangeBuf>::from(b"helloworld", 5, true);
1654        assert_eq!(buf.start, 0);
1655        assert_eq!(buf.pos, 0);
1656        assert_eq!(buf.len, 10);
1657        assert_eq!(buf.off, 5);
1658        assert!(buf.fin);
1659
1660        assert_eq!(buf.len(), 10);
1661        assert_eq!(buf.off(), 5);
1662        assert!(buf.fin());
1663
1664        assert_eq!(&buf[..], b"helloworld");
1665
1666        // Advance buffer.
1667        buf.consume(5);
1668
1669        assert_eq!(buf.start, 0);
1670        assert_eq!(buf.pos, 5);
1671        assert_eq!(buf.len, 10);
1672        assert_eq!(buf.off, 5);
1673        assert!(buf.fin);
1674
1675        assert_eq!(buf.len(), 5);
1676        assert_eq!(buf.off(), 10);
1677        assert!(buf.fin());
1678
1679        assert_eq!(&buf[..], b"world");
1680
1681        // Split buffer before position.
1682        let mut new_buf = buf.split_off(3);
1683
1684        assert_eq!(buf.start, 0);
1685        assert_eq!(buf.pos, 3);
1686        assert_eq!(buf.len, 3);
1687        assert_eq!(buf.off, 5);
1688        assert!(!buf.fin);
1689
1690        assert_eq!(buf.len(), 0);
1691        assert_eq!(buf.off(), 8);
1692        assert!(!buf.fin());
1693
1694        assert_eq!(&buf[..], b"");
1695
1696        assert_eq!(new_buf.start, 3);
1697        assert_eq!(new_buf.pos, 5);
1698        assert_eq!(new_buf.len, 7);
1699        assert_eq!(new_buf.off, 8);
1700        assert!(new_buf.fin);
1701
1702        assert_eq!(new_buf.len(), 5);
1703        assert_eq!(new_buf.off(), 10);
1704        assert!(new_buf.fin());
1705
1706        assert_eq!(&new_buf[..], b"world");
1707
1708        // Advance buffer.
1709        new_buf.consume(2);
1710
1711        assert_eq!(new_buf.start, 3);
1712        assert_eq!(new_buf.pos, 7);
1713        assert_eq!(new_buf.len, 7);
1714        assert_eq!(new_buf.off, 8);
1715        assert!(new_buf.fin);
1716
1717        assert_eq!(new_buf.len(), 3);
1718        assert_eq!(new_buf.off(), 12);
1719        assert!(new_buf.fin());
1720
1721        assert_eq!(&new_buf[..], b"rld");
1722
1723        // Split buffer after position.
1724        let mut new_new_buf = new_buf.split_off(5);
1725
1726        assert_eq!(new_buf.start, 3);
1727        assert_eq!(new_buf.pos, 7);
1728        assert_eq!(new_buf.len, 5);
1729        assert_eq!(new_buf.off, 8);
1730        assert!(!new_buf.fin);
1731
1732        assert_eq!(new_buf.len(), 1);
1733        assert_eq!(new_buf.off(), 12);
1734        assert!(!new_buf.fin());
1735
1736        assert_eq!(&new_buf[..], b"r");
1737
1738        assert_eq!(new_new_buf.start, 8);
1739        assert_eq!(new_new_buf.pos, 8);
1740        assert_eq!(new_new_buf.len, 2);
1741        assert_eq!(new_new_buf.off, 13);
1742        assert!(new_new_buf.fin);
1743
1744        assert_eq!(new_new_buf.len(), 2);
1745        assert_eq!(new_new_buf.off(), 13);
1746        assert!(new_new_buf.fin());
1747
1748        assert_eq!(&new_new_buf[..], b"ld");
1749
1750        // Advance buffer.
1751        new_new_buf.consume(2);
1752
1753        assert_eq!(new_new_buf.start, 8);
1754        assert_eq!(new_new_buf.pos, 10);
1755        assert_eq!(new_new_buf.len, 2);
1756        assert_eq!(new_new_buf.off, 13);
1757        assert!(new_new_buf.fin);
1758
1759        assert_eq!(new_new_buf.len(), 0);
1760        assert_eq!(new_new_buf.off(), 15);
1761        assert!(new_new_buf.fin());
1762
1763        assert_eq!(&new_new_buf[..], b"");
1764    }
1765
1766    /// RFC9000 2.1: A stream ID that is used out of order results in all
1767    /// streams of that type with lower-numbered stream IDs also being opened.
1768    #[test]
1769    fn stream_limit_auto_open() {
1770        let local_tp = crate::TransportParams::default();
1771        let peer_tp = crate::TransportParams::default();
1772
1773        let mut streams = <StreamMap>::new(5, 5, 5);
1774
1775        let stream_id = 500;
1776        assert!(!is_local(stream_id, true), "stream id is peer initiated");
1777        assert!(is_bidi(stream_id), "stream id is bidirectional");
1778        assert_eq!(
1779            streams
1780                .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1781                .err(),
1782            Some(Error::StreamLimit),
1783            "stream limit should be exceeded"
1784        );
1785    }
1786
1787    /// Stream limit should be satisfied regardless of what order we open
1788    /// streams
1789    #[test]
1790    fn stream_create_out_of_order() {
1791        let local_tp = crate::TransportParams::default();
1792        let peer_tp = crate::TransportParams::default();
1793
1794        let mut streams = <StreamMap>::new(5, 5, 5);
1795
1796        for stream_id in [8, 12, 4] {
1797            assert!(is_local(stream_id, false), "stream id is client initiated");
1798            assert!(is_bidi(stream_id), "stream id is bidirectional");
1799            assert!(streams
1800                .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1801                .is_ok());
1802        }
1803    }
1804
1805    /// Check stream limit boundary cases
1806    #[test]
1807    fn stream_limit_edge() {
1808        let local_tp = crate::TransportParams::default();
1809        let peer_tp = crate::TransportParams::default();
1810
1811        let mut streams = <StreamMap>::new(3, 3, 3);
1812
1813        // Highest permitted
1814        let stream_id = 8;
1815        assert!(streams
1816            .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1817            .is_ok());
1818
1819        // One more than highest permitted
1820        let stream_id = 12;
1821        assert_eq!(
1822            streams
1823                .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1824                .err(),
1825            Some(Error::StreamLimit)
1826        );
1827    }
1828
1829    fn cycle_stream_priority(stream_id: u64, streams: &mut StreamMap) {
1830        let key = streams.get(stream_id).unwrap().priority_key.clone();
1831        streams.update_priority(&key.clone(), &key);
1832    }
1833
1834    #[test]
1835    fn writable_prioritized_default_priority() {
1836        let local_tp = crate::TransportParams::default();
1837        let peer_tp = crate::TransportParams {
1838            initial_max_stream_data_bidi_local: 100,
1839            initial_max_stream_data_uni: 100,
1840            ..Default::default()
1841        };
1842
1843        let mut streams = StreamMap::new(100, 100, 100);
1844
1845        for id in [0, 4, 8, 12] {
1846            assert!(streams
1847                .get_or_create(id, &local_tp, &peer_tp, false, true)
1848                .is_ok());
1849        }
1850
1851        let walk_1: Vec<u64> = streams.writable().collect();
1852        cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1853        let walk_2: Vec<u64> = streams.writable().collect();
1854        cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1855        let walk_3: Vec<u64> = streams.writable().collect();
1856        cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1857        let walk_4: Vec<u64> = streams.writable().collect();
1858        cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1859        let walk_5: Vec<u64> = streams.writable().collect();
1860
1861        // All streams are non-incremental and same urgency by default. Multiple
1862        // visits shuffle their order.
1863        assert_eq!(walk_1, vec![0, 4, 8, 12]);
1864        assert_eq!(walk_2, vec![4, 8, 12, 0]);
1865        assert_eq!(walk_3, vec![8, 12, 0, 4]);
1866        assert_eq!(walk_4, vec![12, 0, 4, 8,]);
1867        assert_eq!(walk_5, vec![0, 4, 8, 12]);
1868    }
1869
1870    #[test]
1871    fn writable_prioritized_insert_order() {
1872        let local_tp = crate::TransportParams::default();
1873        let peer_tp = crate::TransportParams {
1874            initial_max_stream_data_bidi_local: 100,
1875            initial_max_stream_data_uni: 100,
1876            ..Default::default()
1877        };
1878
1879        let mut streams = StreamMap::new(100, 100, 100);
1880
1881        // Inserting same-urgency incremental streams in a "random" order yields
1882        // same order to start with.
1883        for id in [12, 4, 8, 0] {
1884            assert!(streams
1885                .get_or_create(id, &local_tp, &peer_tp, false, true)
1886                .is_ok());
1887        }
1888
1889        let walk_1: Vec<u64> = streams.writable().collect();
1890        cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1891        let walk_2: Vec<u64> = streams.writable().collect();
1892        cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1893        let walk_3: Vec<u64> = streams.writable().collect();
1894        cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1895        let walk_4: Vec<u64> = streams.writable().collect();
1896        cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1897        let walk_5: Vec<u64> = streams.writable().collect();
1898        assert_eq!(walk_1, vec![12, 4, 8, 0]);
1899        assert_eq!(walk_2, vec![4, 8, 0, 12]);
1900        assert_eq!(walk_3, vec![8, 0, 12, 4,]);
1901        assert_eq!(walk_4, vec![0, 12, 4, 8]);
1902        assert_eq!(walk_5, vec![12, 4, 8, 0]);
1903    }
1904
1905    #[test]
1906    fn writable_prioritized_mixed_urgency() {
1907        let local_tp = crate::TransportParams::default();
1908        let peer_tp = crate::TransportParams {
1909            initial_max_stream_data_bidi_local: 100,
1910            initial_max_stream_data_uni: 100,
1911            ..Default::default()
1912        };
1913
1914        let mut streams = <StreamMap>::new(100, 100, 100);
1915
1916        // Streams where the urgency descends (becomes more important). No stream
1917        // shares an urgency.
1918        let input = vec![
1919            (0, 100),
1920            (4, 90),
1921            (8, 80),
1922            (12, 70),
1923            (16, 60),
1924            (20, 50),
1925            (24, 40),
1926            (28, 30),
1927            (32, 20),
1928            (36, 10),
1929            (40, 0),
1930        ];
1931
1932        for (id, urgency) in input.clone() {
1933            // this duplicates some code from stream_priority in order to access
1934            // streams and the collection they're in
1935            let stream = streams
1936                .get_or_create(id, &local_tp, &peer_tp, false, true)
1937                .unwrap();
1938
1939            stream.urgency = urgency;
1940
1941            let new_priority_key = Arc::new(StreamPriorityKey {
1942                urgency: stream.urgency,
1943                incremental: stream.incremental,
1944                id,
1945                ..Default::default()
1946            });
1947
1948            let old_priority_key = std::mem::replace(
1949                &mut stream.priority_key,
1950                new_priority_key.clone(),
1951            );
1952
1953            streams.update_priority(&old_priority_key, &new_priority_key);
1954        }
1955
1956        let walk_1: Vec<u64> = streams.writable().collect();
1957        assert_eq!(walk_1, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
1958
1959        // Re-applying priority to a stream does not cause duplication.
1960        for (id, urgency) in input {
1961            // this duplicates some code from stream_priority in order to access
1962            // streams and the collection they're in
1963            let stream = streams
1964                .get_or_create(id, &local_tp, &peer_tp, false, true)
1965                .unwrap();
1966
1967            stream.urgency = urgency;
1968
1969            let new_priority_key = Arc::new(StreamPriorityKey {
1970                urgency: stream.urgency,
1971                incremental: stream.incremental,
1972                id,
1973                ..Default::default()
1974            });
1975
1976            let old_priority_key = std::mem::replace(
1977                &mut stream.priority_key,
1978                new_priority_key.clone(),
1979            );
1980
1981            streams.update_priority(&old_priority_key, &new_priority_key);
1982        }
1983
1984        let walk_2: Vec<u64> = streams.writable().collect();
1985        assert_eq!(walk_2, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
1986
1987        // Removing streams doesn't break expected ordering.
1988        streams.collect(24, true);
1989
1990        let walk_3: Vec<u64> = streams.writable().collect();
1991        assert_eq!(walk_3, vec![40, 36, 32, 28, 20, 16, 12, 8, 4, 0]);
1992
1993        streams.collect(40, true);
1994        streams.collect(0, true);
1995
1996        let walk_4: Vec<u64> = streams.writable().collect();
1997        assert_eq!(walk_4, vec![36, 32, 28, 20, 16, 12, 8, 4]);
1998
1999        // Adding streams doesn't break expected ordering.
2000        streams
2001            .get_or_create(44, &local_tp, &peer_tp, false, true)
2002            .unwrap();
2003
2004        let walk_5: Vec<u64> = streams.writable().collect();
2005        assert_eq!(walk_5, vec![36, 32, 28, 20, 16, 12, 8, 4, 44]);
2006    }
2007
2008    #[test]
2009    fn writable_prioritized_mixed_urgencies_incrementals() {
2010        let local_tp = crate::TransportParams::default();
2011        let peer_tp = crate::TransportParams {
2012            initial_max_stream_data_bidi_local: 100,
2013            initial_max_stream_data_uni: 100,
2014            ..Default::default()
2015        };
2016
2017        let mut streams = StreamMap::new(100, 100, 100);
2018
2019        // Streams that share some urgency level
2020        let input = vec![
2021            (0, 100),
2022            (4, 20),
2023            (8, 100),
2024            (12, 20),
2025            (16, 90),
2026            (20, 25),
2027            (24, 90),
2028            (28, 30),
2029            (32, 80),
2030            (36, 20),
2031            (40, 0),
2032        ];
2033
2034        for (id, urgency) in input.clone() {
2035            // this duplicates some code from stream_priority in order to access
2036            // streams and the collection they're in
2037            let stream = streams
2038                .get_or_create(id, &local_tp, &peer_tp, false, true)
2039                .unwrap();
2040
2041            stream.urgency = urgency;
2042
2043            let new_priority_key = Arc::new(StreamPriorityKey {
2044                urgency: stream.urgency,
2045                incremental: stream.incremental,
2046                id,
2047                ..Default::default()
2048            });
2049
2050            let old_priority_key = std::mem::replace(
2051                &mut stream.priority_key,
2052                new_priority_key.clone(),
2053            );
2054
2055            streams.update_priority(&old_priority_key, &new_priority_key);
2056        }
2057
2058        let walk_1: Vec<u64> = streams.writable().collect();
2059        cycle_stream_priority(4, &mut streams);
2060        cycle_stream_priority(16, &mut streams);
2061        cycle_stream_priority(0, &mut streams);
2062        let walk_2: Vec<u64> = streams.writable().collect();
2063        cycle_stream_priority(12, &mut streams);
2064        cycle_stream_priority(24, &mut streams);
2065        cycle_stream_priority(8, &mut streams);
2066        let walk_3: Vec<u64> = streams.writable().collect();
2067        cycle_stream_priority(36, &mut streams);
2068        cycle_stream_priority(16, &mut streams);
2069        cycle_stream_priority(0, &mut streams);
2070        let walk_4: Vec<u64> = streams.writable().collect();
2071        cycle_stream_priority(4, &mut streams);
2072        cycle_stream_priority(24, &mut streams);
2073        cycle_stream_priority(8, &mut streams);
2074        let walk_5: Vec<u64> = streams.writable().collect();
2075        cycle_stream_priority(12, &mut streams);
2076        cycle_stream_priority(16, &mut streams);
2077        cycle_stream_priority(0, &mut streams);
2078        let walk_6: Vec<u64> = streams.writable().collect();
2079        cycle_stream_priority(36, &mut streams);
2080        cycle_stream_priority(24, &mut streams);
2081        cycle_stream_priority(8, &mut streams);
2082        let walk_7: Vec<u64> = streams.writable().collect();
2083        cycle_stream_priority(4, &mut streams);
2084        cycle_stream_priority(16, &mut streams);
2085        cycle_stream_priority(0, &mut streams);
2086        let walk_8: Vec<u64> = streams.writable().collect();
2087        cycle_stream_priority(12, &mut streams);
2088        cycle_stream_priority(24, &mut streams);
2089        cycle_stream_priority(8, &mut streams);
2090        let walk_9: Vec<u64> = streams.writable().collect();
2091        cycle_stream_priority(36, &mut streams);
2092        cycle_stream_priority(16, &mut streams);
2093        cycle_stream_priority(0, &mut streams);
2094
2095        assert_eq!(walk_1, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2096        assert_eq!(walk_2, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2097        assert_eq!(walk_3, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2098        assert_eq!(walk_4, vec![40, 4, 12, 36, 20, 28, 32, 24, 16, 8, 0]);
2099        assert_eq!(walk_5, vec![40, 12, 36, 4, 20, 28, 32, 16, 24, 0, 8]);
2100        assert_eq!(walk_6, vec![40, 36, 4, 12, 20, 28, 32, 24, 16, 8, 0]);
2101        assert_eq!(walk_7, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2102        assert_eq!(walk_8, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2103        assert_eq!(walk_9, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2104
2105        // Removing streams doesn't break expected ordering.
2106        streams.collect(20, true);
2107
2108        let walk_10: Vec<u64> = streams.writable().collect();
2109        assert_eq!(walk_10, vec![40, 4, 12, 36, 28, 32, 24, 16, 8, 0]);
2110
2111        // Adding streams doesn't break expected ordering.
2112        let stream = streams
2113            .get_or_create(44, &local_tp, &peer_tp, false, true)
2114            .unwrap();
2115
2116        stream.urgency = 20;
2117        stream.incremental = true;
2118
2119        let new_priority_key = Arc::new(StreamPriorityKey {
2120            urgency: stream.urgency,
2121            incremental: stream.incremental,
2122            id: 44,
2123            ..Default::default()
2124        });
2125
2126        let old_priority_key =
2127            std::mem::replace(&mut stream.priority_key, new_priority_key.clone());
2128
2129        streams.update_priority(&old_priority_key, &new_priority_key);
2130
2131        let walk_11: Vec<u64> = streams.writable().collect();
2132        assert_eq!(walk_11, vec![40, 4, 12, 36, 44, 28, 32, 24, 16, 8, 0]);
2133    }
2134
2135    #[test]
2136    fn priority_tree_dupes() {
2137        let mut prioritized_writable: RBTree<StreamWritablePriorityAdapter> =
2138            Default::default();
2139
2140        for id in [0, 4, 8, 12] {
2141            let s = Arc::new(StreamPriorityKey {
2142                urgency: 0,
2143                incremental: false,
2144                id,
2145                ..Default::default()
2146            });
2147
2148            prioritized_writable.insert(s);
2149        }
2150
2151        let walk_1: Vec<u64> =
2152            prioritized_writable.iter().map(|s| s.id).collect();
2153        assert_eq!(walk_1, vec![0, 4, 8, 12]);
2154
2155        // Default keys could cause duplicate entries, this is normally protected
2156        // against via StreamMap.
2157        for id in [0, 4, 8, 12] {
2158            let s = Arc::new(StreamPriorityKey {
2159                urgency: 0,
2160                incremental: false,
2161                id,
2162                ..Default::default()
2163            });
2164
2165            prioritized_writable.insert(s);
2166        }
2167
2168        let walk_2: Vec<u64> =
2169            prioritized_writable.iter().map(|s| s.id).collect();
2170        assert_eq!(walk_2, vec![0, 0, 4, 4, 8, 8, 12, 12]);
2171    }
2172}
2173
2174mod recv_buf;
2175mod send_buf;