quiche/stream/
mod.rs

1// Copyright (C) 2018-2019, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::cmp;
28
29use std::sync::Arc;
30
31use std::collections::hash_map;
32use std::collections::HashMap;
33use std::collections::HashSet;
34
35use intrusive_collections::intrusive_adapter;
36use intrusive_collections::KeyAdapter;
37use intrusive_collections::RBTree;
38use intrusive_collections::RBTreeAtomicLink;
39
40use smallvec::SmallVec;
41
42use crate::Error;
43use crate::Result;
44
// The urgency given to a `StreamPriorityKey` by its `Default` impl (lower
// urgency values are scheduled first).
const DEFAULT_URGENCY: u8 = 127;

// The default size of the receiver stream flow control window (32 KiB).
const DEFAULT_STREAM_WINDOW: u64 = 32 * 1024;

/// The maximum size of the receiver stream flow control window (16 MiB).
pub const MAX_STREAM_WINDOW: u64 = 16 * 1024 * 1024;
52
/// A simple pass-through hasher for Stream IDs.
///
/// The QUIC protocol and quiche library guarantee stream ID uniqueness, so
/// the ID itself is used as the hash value instead of running a more
/// complicated algorithm.
///
/// Only `u64` keys are supported: the value most recently passed to
/// `write_u64()` is returned verbatim by `finish()`.
#[derive(Default)]
pub struct StreamIdHasher {
    id: u64,
}

impl std::hash::Hasher for StreamIdHasher {
    /// Returns the last stream ID written (0 if nothing was written).
    #[inline]
    fn finish(&self) -> u64 {
        self.id
    }

    /// Records the stream ID, replacing any previously written value.
    #[inline]
    fn write_u64(&mut self, id: u64) {
        self.id = id;
    }

    /// Unsupported byte-oriented entry point.
    ///
    /// # Panics
    ///
    /// Always panics: this hasher must only be used with `u64` keys.
    #[inline]
    fn write(&mut self, _: &[u8]) {
        // The trait requires a write() implementation, but stream IDs are
        // always hashed through write_u64(), so getting here means the hasher
        // is being used with a key type it was not designed for.
        unimplemented!("StreamIdHasher only supports hashing u64 stream IDs")
    }
}
80
/// `BuildHasher` that creates default-initialized `StreamIdHasher`s.
type BuildStreamIdHasher = std::hash::BuildHasherDefault<StreamIdHasher>;

/// A `HashMap` keyed by stream ID that hashes keys with `StreamIdHasher`.
pub type StreamIdHashMap<V> = HashMap<u64, V, BuildStreamIdHasher>;
/// A `HashSet` of stream IDs that hashes entries with `StreamIdHasher`.
pub type StreamIdHashSet = HashSet<u64, BuildStreamIdHasher>;
85
/// Keeps track of QUIC streams and enforces stream limits.
#[derive(Default)]
pub struct StreamMap {
    /// Map of streams indexed by stream ID.
    streams: StreamIdHashMap<Stream>,

    /// Set of streams that were completed and garbage collected.
    ///
    /// Instead of keeping the full stream state forever, we collect completed
    /// streams to save memory, but we still need to keep track of previously
    /// created streams, to prevent peers from re-creating them.
    collected: StreamIdHashSet,

    /// Peer's maximum bidirectional stream count limit. This bounds the
    /// number of bidirectional streams the local endpoint may open.
    peer_max_streams_bidi: u64,

    /// Peer's maximum unidirectional stream count limit. This bounds the
    /// number of unidirectional streams the local endpoint may open.
    peer_max_streams_uni: u64,

    /// The total number of bidirectional streams opened by the peer.
    peer_opened_streams_bidi: u64,

    /// The total number of unidirectional streams opened by the peer.
    peer_opened_streams_uni: u64,

    /// Local maximum bidirectional stream count limit. This bounds the
    /// number of bidirectional streams the peer may open.
    local_max_streams_bidi: u64,
    /// Pending value for `local_max_streams_bidi`. It grows when a
    /// peer-created bidirectional stream is collected, and is committed by
    /// `update_max_streams_bidi()`.
    local_max_streams_bidi_next: u64,

    /// Local maximum unidirectional stream count limit. This bounds the
    /// number of unidirectional streams the peer may open.
    local_max_streams_uni: u64,
    /// Pending value for `local_max_streams_uni`. It grows when a
    /// peer-created unidirectional stream is collected, and is committed by
    /// `update_max_streams_uni()`.
    local_max_streams_uni_next: u64,

    /// The total number of bidirectional streams opened by the local endpoint.
    local_opened_streams_bidi: u64,

    /// The total number of unidirectional streams opened by the local endpoint.
    local_opened_streams_uni: u64,

    /// Priority-ordered tree of streams that have buffered data ready to be
    /// sent to the peer. This also implies that the stream has enough flow
    /// control credits to send at least some of that data.
    flushable: RBTree<StreamFlushablePriorityAdapter>,

    /// Priority-ordered tree of streams that have outstanding data to read.
    /// This is used to generate a `StreamIter` of streams without having to
    /// iterate over the full list of streams.
    pub readable: RBTree<StreamReadablePriorityAdapter>,

    /// Priority-ordered tree of streams that have enough flow control
    /// capacity to be written to, and are not finished. This is used to
    /// generate a `StreamIter` of streams without having to iterate over the
    /// full list of streams.
    pub writable: RBTree<StreamWritablePriorityAdapter>,

    /// Set of stream IDs corresponding to streams that are almost out of flow
    /// control credit and need to send MAX_STREAM_DATA. This is used to
    /// generate a `StreamIter` of streams without having to iterate over the
    /// full list of streams.
    almost_full: StreamIdHashSet,

    /// Map of stream IDs corresponding to streams that are blocked. The value
    /// of the map elements represents the offset of the stream at which the
    /// blocking occurred.
    blocked: StreamIdHashMap<u64>,

    /// Map of stream IDs corresponding to streams that are reset. The value
    /// of the map elements is a tuple of the error code and final size values
    /// to include in the RESET_STREAM frame.
    reset: StreamIdHashMap<(u64, u64)>,

    /// Map of stream IDs corresponding to streams that are shutdown on the
    /// receive side, and need to send a STOP_SENDING frame. The value of the
    /// map elements is the error code to include in the STOP_SENDING frame.
    stopped: StreamIdHashMap<u64>,

    /// The maximum size of a stream window, passed to every stream created
    /// by this map.
    max_stream_window: u64,
}
165
166impl StreamMap {
167    pub fn new(
168        max_streams_bidi: u64, max_streams_uni: u64, max_stream_window: u64,
169    ) -> StreamMap {
170        StreamMap {
171            local_max_streams_bidi: max_streams_bidi,
172            local_max_streams_bidi_next: max_streams_bidi,
173
174            local_max_streams_uni: max_streams_uni,
175            local_max_streams_uni_next: max_streams_uni,
176
177            max_stream_window,
178
179            ..StreamMap::default()
180        }
181    }
182
183    /// Returns the stream with the given ID if it exists.
184    pub fn get(&self, id: u64) -> Option<&Stream> {
185        self.streams.get(&id)
186    }
187
188    /// Returns the mutable stream with the given ID if it exists.
189    pub fn get_mut(&mut self, id: u64) -> Option<&mut Stream> {
190        self.streams.get_mut(&id)
191    }
192
193    /// Returns the mutable stream with the given ID if it exists, or creates
194    /// a new one otherwise.
195    ///
196    /// The `local` parameter indicates whether the stream's creation was
197    /// requested by the local application rather than the peer, and is
198    /// used to validate the requested stream ID, and to select the initial
199    /// flow control values from the local and remote transport parameters
200    /// (also passed as arguments).
201    ///
202    /// This also takes care of enforcing both local and the peer's stream
203    /// count limits. If one of these limits is violated, the `StreamLimit`
204    /// error is returned.
205    pub(crate) fn get_or_create(
206        &mut self, id: u64, local_params: &crate::TransportParams,
207        peer_params: &crate::TransportParams, local: bool, is_server: bool,
208    ) -> Result<&mut Stream> {
209        let (stream, is_new_and_writable) = match self.streams.entry(id) {
210            hash_map::Entry::Vacant(v) => {
211                // Stream has already been closed and garbage collected.
212                if self.collected.contains(&id) {
213                    return Err(Error::Done);
214                }
215
216                if local != is_local(id, is_server) {
217                    return Err(Error::InvalidStreamState(id));
218                }
219
220                let (max_rx_data, max_tx_data) = match (local, is_bidi(id)) {
221                    // Locally-initiated bidirectional stream.
222                    (true, true) => (
223                        local_params.initial_max_stream_data_bidi_local,
224                        peer_params.initial_max_stream_data_bidi_remote,
225                    ),
226
227                    // Locally-initiated unidirectional stream.
228                    (true, false) => (0, peer_params.initial_max_stream_data_uni),
229
230                    // Remotely-initiated bidirectional stream.
231                    (false, true) => (
232                        local_params.initial_max_stream_data_bidi_remote,
233                        peer_params.initial_max_stream_data_bidi_local,
234                    ),
235
236                    // Remotely-initiated unidirectional stream.
237                    (false, false) =>
238                        (local_params.initial_max_stream_data_uni, 0),
239                };
240
241                // The two least significant bits from a stream id identify the
242                // type of stream. Truncate those bits to get the sequence for
243                // that stream type.
244                let stream_sequence = id >> 2;
245
246                // Enforce stream count limits.
247                match (is_local(id, is_server), is_bidi(id)) {
248                    (true, true) => {
249                        let n = std::cmp::max(
250                            self.local_opened_streams_bidi,
251                            stream_sequence + 1,
252                        );
253
254                        if n > self.peer_max_streams_bidi {
255                            return Err(Error::StreamLimit);
256                        }
257
258                        self.local_opened_streams_bidi = n;
259                    },
260
261                    (true, false) => {
262                        let n = std::cmp::max(
263                            self.local_opened_streams_uni,
264                            stream_sequence + 1,
265                        );
266
267                        if n > self.peer_max_streams_uni {
268                            return Err(Error::StreamLimit);
269                        }
270
271                        self.local_opened_streams_uni = n;
272                    },
273
274                    (false, true) => {
275                        let n = std::cmp::max(
276                            self.peer_opened_streams_bidi,
277                            stream_sequence + 1,
278                        );
279
280                        if n > self.local_max_streams_bidi {
281                            return Err(Error::StreamLimit);
282                        }
283
284                        self.peer_opened_streams_bidi = n;
285                    },
286
287                    (false, false) => {
288                        let n = std::cmp::max(
289                            self.peer_opened_streams_uni,
290                            stream_sequence + 1,
291                        );
292
293                        if n > self.local_max_streams_uni {
294                            return Err(Error::StreamLimit);
295                        }
296
297                        self.peer_opened_streams_uni = n;
298                    },
299                };
300
301                let s = Stream::new(
302                    id,
303                    max_rx_data,
304                    max_tx_data,
305                    is_bidi(id),
306                    local,
307                    self.max_stream_window,
308                );
309
310                let is_writable = s.is_writable();
311
312                (v.insert(s), is_writable)
313            },
314
315            hash_map::Entry::Occupied(v) => (v.into_mut(), false),
316        };
317
318        // Newly created stream might already be writable due to initial flow
319        // control limits.
320        if is_new_and_writable {
321            self.writable.insert(Arc::clone(&stream.priority_key));
322        }
323
324        Ok(stream)
325    }
326
327    /// Adds the stream ID to the readable streams set.
328    ///
329    /// If the stream was already in the list, this does nothing.
330    pub fn insert_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
331        if !priority_key.readable.is_linked() {
332            self.readable.insert(Arc::clone(priority_key));
333        }
334    }
335
336    /// Removes the stream ID from the readable streams set.
337    pub fn remove_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
338        if !priority_key.readable.is_linked() {
339            return;
340        }
341
342        let mut c = {
343            let ptr = Arc::as_ptr(priority_key);
344            unsafe { self.readable.cursor_mut_from_ptr(ptr) }
345        };
346
347        c.remove();
348    }
349
350    /// Adds the stream ID to the writable streams set.
351    ///
352    /// This should also be called anytime a new stream is created, in addition
353    /// to when an existing stream becomes writable.
354    ///
355    /// If the stream was already in the list, this does nothing.
356    pub fn insert_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
357        if !priority_key.writable.is_linked() {
358            self.writable.insert(Arc::clone(priority_key));
359        }
360    }
361
362    /// Removes the stream ID from the writable streams set.
363    ///
364    /// This should also be called anytime an existing stream stops being
365    /// writable.
366    pub fn remove_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
367        if !priority_key.writable.is_linked() {
368            return;
369        }
370
371        let mut c = {
372            let ptr = Arc::as_ptr(priority_key);
373            unsafe { self.writable.cursor_mut_from_ptr(ptr) }
374        };
375
376        c.remove();
377    }
378
379    /// Adds the stream ID to the flushable streams set.
380    ///
381    /// If the stream was already in the list, this does nothing.
382    pub fn insert_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
383        if !priority_key.flushable.is_linked() {
384            self.flushable.insert(Arc::clone(priority_key));
385        }
386    }
387
388    /// Removes the stream ID from the flushable streams set.
389    pub fn remove_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
390        if !priority_key.flushable.is_linked() {
391            return;
392        }
393
394        let mut c = {
395            let ptr = Arc::as_ptr(priority_key);
396            unsafe { self.flushable.cursor_mut_from_ptr(ptr) }
397        };
398
399        c.remove();
400    }
401
402    pub fn peek_flushable(&self) -> Option<Arc<StreamPriorityKey>> {
403        self.flushable.front().clone_pointer()
404    }
405
406    /// Updates the priorities of a stream.
407    pub fn update_priority(
408        &mut self, old: &Arc<StreamPriorityKey>, new: &Arc<StreamPriorityKey>,
409    ) {
410        if old.readable.is_linked() {
411            self.remove_readable(old);
412            self.readable.insert(Arc::clone(new));
413        }
414
415        if old.writable.is_linked() {
416            self.remove_writable(old);
417            self.writable.insert(Arc::clone(new));
418        }
419
420        if old.flushable.is_linked() {
421            self.remove_flushable(old);
422            self.flushable.insert(Arc::clone(new));
423        }
424    }
425
426    /// Adds the stream ID to the almost full streams set.
427    ///
428    /// If the stream was already in the list, this does nothing.
429    pub fn insert_almost_full(&mut self, stream_id: u64) {
430        self.almost_full.insert(stream_id);
431    }
432
433    /// Removes the stream ID from the almost full streams set.
434    pub fn remove_almost_full(&mut self, stream_id: u64) {
435        self.almost_full.remove(&stream_id);
436    }
437
    /// Adds the stream ID to the blocked streams set with the
    /// given offset value.
    ///
    /// If the stream was already in the set, its recorded offset is replaced
    /// by `off`.
    pub fn insert_blocked(&mut self, stream_id: u64, off: u64) {
        self.blocked.insert(stream_id, off);
    }
445
446    /// Removes the stream ID from the blocked streams set.
447    pub fn remove_blocked(&mut self, stream_id: u64) {
448        self.blocked.remove(&stream_id);
449    }
450
    /// Adds the stream ID to the reset streams set with the
    /// given error code and final size values.
    ///
    /// If the stream was already in the set, its recorded error code and
    /// final size are replaced by the new values.
    pub fn insert_reset(
        &mut self, stream_id: u64, error_code: u64, final_size: u64,
    ) {
        self.reset.insert(stream_id, (error_code, final_size));
    }
460
461    /// Removes the stream ID from the reset streams set.
462    pub fn remove_reset(&mut self, stream_id: u64) {
463        self.reset.remove(&stream_id);
464    }
465
    /// Adds the stream ID to the stopped streams set with the
    /// given error code.
    ///
    /// If the stream was already in the set, its recorded error code is
    /// replaced by `error_code`.
    pub fn insert_stopped(&mut self, stream_id: u64, error_code: u64) {
        self.stopped.insert(stream_id, error_code);
    }
473
474    /// Removes the stream ID from the stopped streams set.
475    pub fn remove_stopped(&mut self, stream_id: u64) {
476        self.stopped.remove(&stream_id);
477    }
478
479    /// Updates the peer's maximum bidirectional stream count limit.
480    pub fn update_peer_max_streams_bidi(&mut self, v: u64) {
481        self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v);
482    }
483
484    /// Updates the peer's maximum unidirectional stream count limit.
485    pub fn update_peer_max_streams_uni(&mut self, v: u64) {
486        self.peer_max_streams_uni = cmp::max(self.peer_max_streams_uni, v);
487    }
488
489    /// Commits the new max_streams_bidi limit.
490    pub fn update_max_streams_bidi(&mut self) {
491        self.local_max_streams_bidi = self.local_max_streams_bidi_next;
492    }
493
494    /// Returns the current max_streams_bidi limit.
495    pub fn max_streams_bidi(&self) -> u64 {
496        self.local_max_streams_bidi
497    }
498
499    /// Returns the new max_streams_bidi limit.
500    pub fn max_streams_bidi_next(&mut self) -> u64 {
501        self.local_max_streams_bidi_next
502    }
503
504    /// Commits the new max_streams_uni limit.
505    pub fn update_max_streams_uni(&mut self) {
506        self.local_max_streams_uni = self.local_max_streams_uni_next;
507    }
508
509    /// Returns the new max_streams_uni limit.
510    pub fn max_streams_uni_next(&mut self) -> u64 {
511        self.local_max_streams_uni_next
512    }
513
514    /// Returns the number of bidirectional streams that can be created
515    /// before the peer's stream count limit is reached.
516    pub fn peer_streams_left_bidi(&self) -> u64 {
517        self.peer_max_streams_bidi - self.local_opened_streams_bidi
518    }
519
520    /// Returns the number of unidirectional streams that can be created
521    /// before the peer's stream count limit is reached.
522    pub fn peer_streams_left_uni(&self) -> u64 {
523        self.peer_max_streams_uni - self.local_opened_streams_uni
524    }
525
526    /// Drops completed stream.
527    ///
528    /// This should only be called when Stream::is_complete() returns true for
529    /// the given stream.
530    pub fn collect(&mut self, stream_id: u64, local: bool) {
531        if !local {
532            // If the stream was created by the peer, give back a max streams
533            // credit.
534            if is_bidi(stream_id) {
535                self.local_max_streams_bidi_next =
536                    self.local_max_streams_bidi_next.saturating_add(1);
537            } else {
538                self.local_max_streams_uni_next =
539                    self.local_max_streams_uni_next.saturating_add(1);
540            }
541        }
542
543        let s = self.streams.remove(&stream_id).unwrap();
544
545        self.remove_readable(&s.priority_key);
546
547        self.remove_writable(&s.priority_key);
548
549        self.remove_flushable(&s.priority_key);
550
551        self.collected.insert(stream_id);
552    }
553
554    /// Creates an iterator over streams that have outstanding data to read.
555    pub fn readable(&self) -> StreamIter {
556        StreamIter {
557            streams: self.readable.iter().map(|s| s.id).collect(),
558            index: 0,
559        }
560    }
561
562    /// Creates an iterator over streams that can be written to.
563    pub fn writable(&self) -> StreamIter {
564        StreamIter {
565            streams: self.writable.iter().map(|s| s.id).collect(),
566            index: 0,
567        }
568    }
569
570    /// Creates an iterator over streams that need to send MAX_STREAM_DATA.
571    pub fn almost_full(&self) -> StreamIter {
572        StreamIter::from(&self.almost_full)
573    }
574
575    /// Creates an iterator over streams that need to send STREAM_DATA_BLOCKED.
576    pub fn blocked(&self) -> hash_map::Iter<u64, u64> {
577        self.blocked.iter()
578    }
579
580    /// Creates an iterator over streams that need to send RESET_STREAM.
581    pub fn reset(&self) -> hash_map::Iter<u64, (u64, u64)> {
582        self.reset.iter()
583    }
584
585    /// Creates an iterator over streams that need to send STOP_SENDING.
586    pub fn stopped(&self) -> hash_map::Iter<u64, u64> {
587        self.stopped.iter()
588    }
589
590    /// Returns true if the stream has been collected.
591    pub fn is_collected(&self, stream_id: u64) -> bool {
592        self.collected.contains(&stream_id)
593    }
594
595    /// Returns true if there are any streams that have data to write.
596    pub fn has_flushable(&self) -> bool {
597        !self.flushable.is_empty()
598    }
599
600    /// Returns true if there are any streams that have data to read.
601    pub fn has_readable(&self) -> bool {
602        !self.readable.is_empty()
603    }
604
605    /// Returns true if there are any streams that need to update the local
606    /// flow control limit.
607    pub fn has_almost_full(&self) -> bool {
608        !self.almost_full.is_empty()
609    }
610
611    /// Returns true if there are any streams that are blocked.
612    pub fn has_blocked(&self) -> bool {
613        !self.blocked.is_empty()
614    }
615
616    /// Returns true if there are any streams that are reset.
617    pub fn has_reset(&self) -> bool {
618        !self.reset.is_empty()
619    }
620
621    /// Returns true if there are any streams that need to send STOP_SENDING.
622    pub fn has_stopped(&self) -> bool {
623        !self.stopped.is_empty()
624    }
625
626    /// Returns true if the max bidirectional streams count needs to be updated
627    /// by sending a MAX_STREAMS frame to the peer.
628    pub fn should_update_max_streams_bidi(&self) -> bool {
629        self.local_max_streams_bidi_next != self.local_max_streams_bidi &&
630            self.local_max_streams_bidi_next / 2 >
631                self.local_max_streams_bidi - self.peer_opened_streams_bidi
632    }
633
634    /// Returns true if the max unidirectional streams count needs to be updated
635    /// by sending a MAX_STREAMS frame to the peer.
636    pub fn should_update_max_streams_uni(&self) -> bool {
637        self.local_max_streams_uni_next != self.local_max_streams_uni &&
638            self.local_max_streams_uni_next / 2 >
639                self.local_max_streams_uni - self.peer_opened_streams_uni
640    }
641
642    /// Returns the number of active streams in the map.
643    #[cfg(test)]
644    pub fn len(&self) -> usize {
645        self.streams.len()
646    }
647}
648
/// A QUIC stream.
pub struct Stream {
    /// Receive-side stream buffer.
    pub recv: recv_buf::RecvBuf,

    /// Send-side stream buffer.
    pub send: send_buf::SendBuf,

    /// Low-water mark, in bytes, used by `is_writable()`: the stream only
    /// counts as writable while the end of the buffered send data plus this
    /// value is below the send-side maximum offset. Starts at 1.
    pub send_lowat: usize,

    /// Whether the stream is bidirectional.
    pub bidi: bool,

    /// Whether the stream was created by the local endpoint.
    pub local: bool,

    /// The stream's urgency (lower is better). Default is `DEFAULT_URGENCY`.
    pub urgency: u8,

    /// Whether the stream can be flushed incrementally. Default is `true`.
    pub incremental: bool,

    /// Shared key holding the stream's ID and priority, together with the
    /// links used to place the stream in the readable, writable and
    /// flushable trees of a `StreamMap`.
    pub priority_key: Arc<StreamPriorityKey>,
}
673
674impl Stream {
675    /// Creates a new stream with the given flow control limits.
676    pub fn new(
677        id: u64, max_rx_data: u64, max_tx_data: u64, bidi: bool, local: bool,
678        max_window: u64,
679    ) -> Stream {
680        let priority_key = Arc::new(StreamPriorityKey {
681            id,
682            ..Default::default()
683        });
684
685        Stream {
686            recv: recv_buf::RecvBuf::new(max_rx_data, max_window),
687            send: send_buf::SendBuf::new(max_tx_data),
688            send_lowat: 1,
689            bidi,
690            local,
691            urgency: priority_key.urgency,
692            incremental: priority_key.incremental,
693            priority_key,
694        }
695    }
696
697    /// Returns true if the stream has data to read.
698    pub fn is_readable(&self) -> bool {
699        self.recv.ready()
700    }
701
702    /// Returns true if the stream has enough flow control capacity to be
703    /// written to, and is not finished.
704    pub fn is_writable(&self) -> bool {
705        !self.send.is_shutdown() &&
706            !self.send.is_fin() &&
707            (self.send.off_back() + self.send_lowat as u64) <
708                self.send.max_off()
709    }
710
711    /// Returns true if the stream has data to send and is allowed to send at
712    /// least some of it.
713    pub fn is_flushable(&self) -> bool {
714        let off_front = self.send.off_front();
715
716        !self.send.is_empty() &&
717            off_front < self.send.off_back() &&
718            off_front < self.send.max_off()
719    }
720
721    /// Returns true if the stream is complete.
722    ///
723    /// For bidirectional streams this happens when both the receive and send
724    /// sides are complete. That is when all incoming data has been read by the
725    /// application, and when all outgoing data has been acked by the peer.
726    ///
727    /// For unidirectional streams this happens when either the receive or send
728    /// side is complete, depending on whether the stream was created locally
729    /// or not.
730    pub fn is_complete(&self) -> bool {
731        match (self.bidi, self.local) {
732            // For bidirectional streams we need to check both receive and send
733            // sides for completion.
734            (true, _) => self.recv.is_fin() && self.send.is_complete(),
735
736            // For unidirectional streams generated locally, we only need to
737            // check the send side for completion.
738            (false, true) => self.send.is_complete(),
739
740            // For unidirectional streams generated by the peer, we only need
741            // to check the receive side for completion.
742            (false, false) => self.recv.is_fin(),
743        }
744    }
745}
746
/// Tells whether the stream was created by the local endpoint.
///
/// The least significant bit of a stream ID is set for server-initiated
/// streams, so the stream is local when that bit agrees with `is_server`.
pub fn is_local(stream_id: u64, is_server: bool) -> bool {
    let server_initiated = stream_id & 0x1 == 1;

    server_initiated == is_server
}
751
/// Tells whether the stream is bidirectional.
///
/// The second least significant bit of a stream ID is clear for
/// bidirectional streams and set for unidirectional ones.
pub fn is_bidi(stream_id: u64) -> bool {
    let uni_bit = stream_id & 0b10;

    uni_bit == 0
}
756
/// Key used to order a stream within the readable, writable and flushable
/// trees of a `StreamMap`.
///
/// Besides the stream's ID and priority, it embeds one intrusive link per
/// tree, so the same key can be part of all three trees at once.
#[derive(Clone, Debug)]
pub struct StreamPriorityKey {
    /// The stream's urgency (lower values sort first).
    pub urgency: u8,
    /// Whether the stream is incremental. Among streams with the same
    /// urgency, non-incremental ones sort before incremental ones.
    pub incremental: bool,
    /// The ID of the stream this key belongs to.
    pub id: u64,

    /// Link for membership in the readable tree.
    pub readable: RBTreeAtomicLink,
    /// Link for membership in the writable tree.
    pub writable: RBTreeAtomicLink,
    /// Link for membership in the flushable tree.
    pub flushable: RBTreeAtomicLink,
}
767
768impl Default for StreamPriorityKey {
769    fn default() -> Self {
770        Self {
771            urgency: DEFAULT_URGENCY,
772            incremental: true,
773            id: Default::default(),
774            readable: Default::default(),
775            writable: Default::default(),
776            flushable: Default::default(),
777        }
778    }
779}
780
781impl PartialEq for StreamPriorityKey {
782    fn eq(&self, other: &Self) -> bool {
783        self.id == other.id
784    }
785}
786
787impl Eq for StreamPriorityKey {}
788
789impl PartialOrd for StreamPriorityKey {
790    // Priority ordering is complex, disable Clippy warning.
791    #[allow(clippy::non_canonical_partial_ord_impl)]
792    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
793        // Ignore priority if ID matches.
794        if self.id == other.id {
795            return Some(std::cmp::Ordering::Equal);
796        }
797
798        // First, order by urgency...
799        if self.urgency != other.urgency {
800            return self.urgency.partial_cmp(&other.urgency);
801        }
802
803        // ...when the urgency is the same, and both are not incremental, order
804        // by stream ID...
805        if !self.incremental && !other.incremental {
806            return self.id.partial_cmp(&other.id);
807        }
808
809        // ...non-incremental takes priority over incremental...
810        if self.incremental && !other.incremental {
811            return Some(std::cmp::Ordering::Greater);
812        }
813        if !self.incremental && other.incremental {
814            return Some(std::cmp::Ordering::Less);
815        }
816
817        // ...finally, when both are incremental, `other` takes precedence (so
818        // `self` is always sorted after other same-urgency incremental
819        // entries).
820        Some(std::cmp::Ordering::Greater)
821    }
822}
823
824impl Ord for StreamPriorityKey {
825    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
826        // `partial_cmp()` never returns `None`, so this should be safe.
827        self.partial_cmp(other).unwrap()
828    }
829}
830
831intrusive_adapter!(pub StreamWritablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { writable: RBTreeAtomicLink });
832
833impl KeyAdapter<'_> for StreamWritablePriorityAdapter {
834    type Key = StreamPriorityKey;
835
836    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
837        s.clone()
838    }
839}
840
841intrusive_adapter!(pub StreamReadablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { readable: RBTreeAtomicLink });
842
843impl KeyAdapter<'_> for StreamReadablePriorityAdapter {
844    type Key = StreamPriorityKey;
845
846    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
847        s.clone()
848    }
849}
850
851intrusive_adapter!(pub StreamFlushablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { flushable: RBTreeAtomicLink });
852
853impl KeyAdapter<'_> for StreamFlushablePriorityAdapter {
854    type Key = StreamPriorityKey;
855
856    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
857        s.clone()
858    }
859}
860
861/// An iterator over QUIC streams.
862#[derive(Default)]
863pub struct StreamIter {
864    streams: SmallVec<[u64; 8]>,
865    index: usize,
866}
867
868impl StreamIter {
869    #[inline]
870    fn from(streams: &StreamIdHashSet) -> Self {
871        StreamIter {
872            streams: streams.iter().copied().collect(),
873            index: 0,
874        }
875    }
876}
877
878impl Iterator for StreamIter {
879    type Item = u64;
880
881    #[inline]
882    fn next(&mut self) -> Option<Self::Item> {
883        let v = self.streams.get(self.index)?;
884        self.index += 1;
885        Some(*v)
886    }
887}
888
889impl ExactSizeIterator for StreamIter {
890    #[inline]
891    fn len(&self) -> usize {
892        self.streams.len() - self.index
893    }
894}
895
/// Buffer holding data at a specific offset.
///
/// The data is stored in a `Vec<u8>` in such a way that it can be shared
/// between multiple `RangeBuf` objects.
///
/// Each `RangeBuf` will have its own view of that buffer, where the `start`
/// value indicates the initial offset within the `Vec`, and `len` indicates the
/// number of bytes, starting from `start` that are included.
///
/// In addition, `pos` indicates the current offset within the `Vec`, starting
/// from the very beginning of the `Vec`.
///
/// Finally, `off` is the starting offset for the specific `RangeBuf` within the
/// stream the buffer belongs to.
///
/// Equality and ordering only consider `off`; the contents and the other
/// fields are ignored by those comparisons.
#[derive(Clone, Debug, Default, Eq)]
pub struct RangeBuf {
    /// The internal buffer holding the data.
    ///
    /// To avoid needless allocations when a RangeBuf is split, this field is
    /// reference-counted and can be shared between multiple RangeBuf objects,
    /// and sliced using the `start` and `len` values.
    data: Arc<Vec<u8>>,

    /// The initial offset within the internal buffer.
    start: usize,

    /// The current offset within the internal buffer.
    pos: usize,

    /// The number of bytes in the buffer, from the initial offset.
    len: usize,

    /// The offset of the buffer within a stream.
    off: u64,

    /// Whether this contains the final byte in the stream.
    fin: bool,
}

impl RangeBuf {
    /// Creates a new `RangeBuf` from the given slice.
    ///
    /// The bytes of `buf` are copied into a newly allocated buffer. `off` is
    /// the stream offset of the first byte, and `fin` tells whether the slice
    /// holds the final byte of the stream.
    pub fn from(buf: &[u8], off: u64, fin: bool) -> RangeBuf {
        RangeBuf {
            data: Arc::new(Vec::from(buf)),
            start: 0,
            pos: 0,
            len: buf.len(),
            off,
            fin,
        }
    }

    /// Returns whether `self` holds the final offset in the stream.
    pub fn fin(&self) -> bool {
        self.fin
    }

    /// Returns the stream offset of the first byte of `self` that has not
    /// been consumed yet.
    ///
    /// This advances as bytes are consumed through [`consume()`].
    ///
    /// [`consume()`]: RangeBuf::consume
    pub fn off(&self) -> u64 {
        (self.off - self.start as u64) + self.pos as u64
    }

    /// Returns the final offset of `self`, that is the stream offset just
    /// past its last byte.
    pub fn max_off(&self) -> u64 {
        self.off() + self.len() as u64
    }

    /// Returns the number of bytes of `self` that have not been consumed
    /// yet.
    pub fn len(&self) -> usize {
        self.len - (self.pos - self.start)
    }

    /// Returns true if `self` has a length of zero bytes.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Consumes the starting `count` bytes of `self`.
    ///
    /// `count` must not exceed [`len()`]; this is checked in builds with
    /// debug assertions enabled.
    ///
    /// [`len()`]: RangeBuf::len
    pub fn consume(&mut self, count: usize) {
        // Moving `pos` past the end of the view would make `len()` underflow
        // later on, so flag the misuse where it happens.
        debug_assert!(
            count <= self.len(),
            "cannot consume {} bytes, only {} remaining",
            count,
            self.len()
        );

        self.pos += count;
    }

    /// Splits the buffer into two at the given index.
    ///
    /// `at` is counted from the initial offset of `self`, so bytes that were
    /// already consumed are included in the count. After the call `self`
    /// covers the first `at` bytes and never carries the `fin` flag, while
    /// the returned buffer covers the rest and inherits the `fin` flag. Both
    /// halves share the same underlying allocation.
    ///
    /// # Panics
    ///
    /// Panics if `at` is larger than the number of bytes held from the
    /// initial offset.
    pub fn split_off(&mut self, at: usize) -> RangeBuf {
        assert!(
            at <= self.len,
            "`at` split index (is {}) should be <= len (is {})",
            at,
            self.len
        );

        let buf = RangeBuf {
            // Only bumps the reference count, the bytes are not copied.
            data: Arc::clone(&self.data),
            start: self.start + at,
            // The tail's position can't be before its own start.
            pos: cmp::max(self.pos, self.start + at),
            len: self.len - at,
            off: self.off + at as u64,
            fin: self.fin,
        };

        // The head's position can't be past its new end.
        self.pos = cmp::min(self.pos, self.start + at);
        self.len = at;
        self.fin = false;

        buf
    }
}

impl std::ops::Deref for RangeBuf {
    type Target = [u8];

    /// Returns the bytes of `self` that have not been consumed yet.
    fn deref(&self) -> &[u8] {
        &self.data[self.pos..self.start + self.len]
    }
}

impl Ord for RangeBuf {
    fn cmp(&self, other: &RangeBuf) -> cmp::Ordering {
        // Invert ordering to implement min-heap.
        self.off.cmp(&other.off).reverse()
    }
}

impl PartialOrd for RangeBuf {
    fn partial_cmp(&self, other: &RangeBuf) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for RangeBuf {
    /// Two buffers are considered equal when their `off` fields match.
    fn eq(&self, other: &RangeBuf) -> bool {
        self.off == other.off
    }
}
1030
1031#[cfg(test)]
1032mod tests {
1033    use super::*;
1034
1035    #[test]
1036    fn recv_flow_control() {
1037        let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1038        assert!(!stream.recv.almost_full());
1039
1040        let mut buf = [0; 32];
1041
1042        let first = RangeBuf::from(b"hello", 0, false);
1043        let second = RangeBuf::from(b"world", 5, false);
1044        let third = RangeBuf::from(b"something", 10, false);
1045
1046        assert_eq!(stream.recv.write(second), Ok(()));
1047        assert_eq!(stream.recv.write(first), Ok(()));
1048        assert!(!stream.recv.almost_full());
1049
1050        assert_eq!(stream.recv.write(third), Err(Error::FlowControl));
1051
1052        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1053        assert_eq!(&buf[..len], b"helloworld");
1054        assert!(!fin);
1055
1056        assert!(stream.recv.almost_full());
1057
1058        stream.recv.update_max_data(std::time::Instant::now());
1059        assert_eq!(stream.recv.max_data_next(), 25);
1060        assert!(!stream.recv.almost_full());
1061
1062        let third = RangeBuf::from(b"something", 10, false);
1063        assert_eq!(stream.recv.write(third), Ok(()));
1064    }
1065
1066    #[test]
1067    fn recv_past_fin() {
1068        let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1069        assert!(!stream.recv.almost_full());
1070
1071        let first = RangeBuf::from(b"hello", 0, true);
1072        let second = RangeBuf::from(b"world", 5, false);
1073
1074        assert_eq!(stream.recv.write(first), Ok(()));
1075        assert_eq!(stream.recv.write(second), Err(Error::FinalSize));
1076    }
1077
1078    #[test]
1079    fn recv_fin_dup() {
1080        let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1081        assert!(!stream.recv.almost_full());
1082
1083        let first = RangeBuf::from(b"hello", 0, true);
1084        let second = RangeBuf::from(b"hello", 0, true);
1085
1086        assert_eq!(stream.recv.write(first), Ok(()));
1087        assert_eq!(stream.recv.write(second), Ok(()));
1088
1089        let mut buf = [0; 32];
1090
1091        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1092        assert_eq!(&buf[..len], b"hello");
1093        assert!(fin);
1094    }
1095
1096    #[test]
1097    fn recv_fin_change() {
1098        let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1099        assert!(!stream.recv.almost_full());
1100
1101        let first = RangeBuf::from(b"hello", 0, true);
1102        let second = RangeBuf::from(b"world", 5, true);
1103
1104        assert_eq!(stream.recv.write(second), Ok(()));
1105        assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
1106    }
1107
1108    #[test]
1109    fn recv_fin_lower_than_received() {
1110        let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1111        assert!(!stream.recv.almost_full());
1112
1113        let first = RangeBuf::from(b"hello", 0, true);
1114        let second = RangeBuf::from(b"world", 5, false);
1115
1116        assert_eq!(stream.recv.write(second), Ok(()));
1117        assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
1118    }
1119
1120    #[test]
1121    fn recv_fin_flow_control() {
1122        let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1123        assert!(!stream.recv.almost_full());
1124
1125        let mut buf = [0; 32];
1126
1127        let first = RangeBuf::from(b"hello", 0, false);
1128        let second = RangeBuf::from(b"world", 5, true);
1129
1130        assert_eq!(stream.recv.write(first), Ok(()));
1131        assert_eq!(stream.recv.write(second), Ok(()));
1132
1133        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1134        assert_eq!(&buf[..len], b"helloworld");
1135        assert!(fin);
1136
1137        assert!(!stream.recv.almost_full());
1138    }
1139
1140    #[test]
1141    fn recv_fin_reset_mismatch() {
1142        let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1143        assert!(!stream.recv.almost_full());
1144
1145        let first = RangeBuf::from(b"hello", 0, true);
1146
1147        assert_eq!(stream.recv.write(first), Ok(()));
1148        assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
1149    }
1150
1151    #[test]
1152    fn recv_reset_dup() {
1153        let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1154        assert!(!stream.recv.almost_full());
1155
1156        let first = RangeBuf::from(b"hello", 0, false);
1157
1158        assert_eq!(stream.recv.write(first), Ok(()));
1159        assert_eq!(stream.recv.reset(0, 5), Ok(0));
1160        assert_eq!(stream.recv.reset(0, 5), Ok(0));
1161    }
1162
1163    #[test]
1164    fn recv_reset_change() {
1165        let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1166        assert!(!stream.recv.almost_full());
1167
1168        let first = RangeBuf::from(b"hello", 0, false);
1169
1170        assert_eq!(stream.recv.write(first), Ok(()));
1171        assert_eq!(stream.recv.reset(0, 5), Ok(0));
1172        assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
1173    }
1174
1175    #[test]
1176    fn recv_reset_lower_than_received() {
1177        let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1178        assert!(!stream.recv.almost_full());
1179
1180        let first = RangeBuf::from(b"hello", 0, false);
1181
1182        assert_eq!(stream.recv.write(first), Ok(()));
1183        assert_eq!(stream.recv.reset(0, 4), Err(Error::FinalSize));
1184    }
1185
1186    #[test]
1187    fn send_flow_control() {
1188        let mut buf = [0; 25];
1189
1190        let mut stream = Stream::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1191
1192        let first = b"hello";
1193        let second = b"world";
1194        let third = b"something";
1195
1196        assert!(stream.send.write(first, false).is_ok());
1197        assert!(stream.send.write(second, false).is_ok());
1198        assert!(stream.send.write(third, false).is_ok());
1199
1200        assert_eq!(stream.send.off_front(), 0);
1201
1202        let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
1203        assert_eq!(written, 15);
1204        assert!(!fin);
1205        assert_eq!(&buf[..written], b"helloworldsomet");
1206
1207        assert_eq!(stream.send.off_front(), 15);
1208
1209        let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
1210        assert_eq!(written, 0);
1211        assert!(!fin);
1212        assert_eq!(&buf[..written], b"");
1213
1214        stream.send.retransmit(0, 15);
1215
1216        assert_eq!(stream.send.off_front(), 0);
1217
1218        let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
1219        assert_eq!(written, 10);
1220        assert!(!fin);
1221        assert_eq!(&buf[..written], b"helloworld");
1222
1223        assert_eq!(stream.send.off_front(), 10);
1224
1225        let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
1226        assert_eq!(written, 5);
1227        assert!(!fin);
1228        assert_eq!(&buf[..written], b"somet");
1229    }
1230
1231    #[test]
1232    fn send_past_fin() {
1233        let mut stream = Stream::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1234
1235        let first = b"hello";
1236        let second = b"world";
1237        let third = b"third";
1238
1239        assert_eq!(stream.send.write(first, false), Ok(5));
1240
1241        assert_eq!(stream.send.write(second, true), Ok(5));
1242        assert!(stream.send.is_fin());
1243
1244        assert_eq!(stream.send.write(third, false), Err(Error::FinalSize));
1245    }
1246
1247    #[test]
1248    fn send_fin_dup() {
1249        let mut stream = Stream::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1250
1251        assert_eq!(stream.send.write(b"hello", true), Ok(5));
1252        assert!(stream.send.is_fin());
1253
1254        assert_eq!(stream.send.write(b"", true), Ok(0));
1255        assert!(stream.send.is_fin());
1256    }
1257
1258    #[test]
1259    fn send_undo_fin() {
1260        let mut stream = Stream::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1261
1262        assert_eq!(stream.send.write(b"hello", true), Ok(5));
1263        assert!(stream.send.is_fin());
1264
1265        assert_eq!(
1266            stream.send.write(b"helloworld", true),
1267            Err(Error::FinalSize)
1268        );
1269    }
1270
1271    #[test]
1272    fn send_fin_max_data_match() {
1273        let mut buf = [0; 15];
1274
1275        let mut stream = Stream::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1276
1277        let slice = b"hellohellohello";
1278
1279        assert!(stream.send.write(slice, true).is_ok());
1280
1281        let (written, fin) = stream.send.emit(&mut buf[..15]).unwrap();
1282        assert_eq!(written, 15);
1283        assert!(fin);
1284        assert_eq!(&buf[..written], slice);
1285    }
1286
1287    #[test]
1288    fn send_fin_zero_length() {
1289        let mut buf = [0; 5];
1290
1291        let mut stream = Stream::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1292
1293        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1294        assert_eq!(stream.send.write(b"", true), Ok(0));
1295        assert!(stream.send.is_fin());
1296
1297        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1298        assert_eq!(written, 5);
1299        assert!(fin);
1300        assert_eq!(&buf[..written], b"hello");
1301    }
1302
1303    #[test]
1304    fn send_ack() {
1305        let mut buf = [0; 5];
1306
1307        let mut stream = Stream::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1308
1309        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1310        assert_eq!(stream.send.write(b"world", false), Ok(5));
1311        assert_eq!(stream.send.write(b"", true), Ok(0));
1312        assert!(stream.send.is_fin());
1313
1314        assert_eq!(stream.send.off_front(), 0);
1315
1316        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1317        assert_eq!(written, 5);
1318        assert!(!fin);
1319        assert_eq!(&buf[..written], b"hello");
1320
1321        stream.send.ack_and_drop(0, 5);
1322
1323        stream.send.retransmit(0, 5);
1324
1325        assert_eq!(stream.send.off_front(), 5);
1326
1327        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1328        assert_eq!(written, 5);
1329        assert!(fin);
1330        assert_eq!(&buf[..written], b"world");
1331    }
1332
1333    #[test]
1334    fn send_ack_reordering() {
1335        let mut buf = [0; 5];
1336
1337        let mut stream = Stream::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1338
1339        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1340        assert_eq!(stream.send.write(b"world", false), Ok(5));
1341        assert_eq!(stream.send.write(b"", true), Ok(0));
1342        assert!(stream.send.is_fin());
1343
1344        assert_eq!(stream.send.off_front(), 0);
1345
1346        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1347        assert_eq!(written, 5);
1348        assert!(!fin);
1349        assert_eq!(&buf[..written], b"hello");
1350
1351        assert_eq!(stream.send.off_front(), 5);
1352
1353        let (written, fin) = stream.send.emit(&mut buf[..1]).unwrap();
1354        assert_eq!(written, 1);
1355        assert!(!fin);
1356        assert_eq!(&buf[..written], b"w");
1357
1358        stream.send.ack_and_drop(5, 1);
1359        stream.send.ack_and_drop(0, 5);
1360
1361        stream.send.retransmit(0, 5);
1362        stream.send.retransmit(5, 1);
1363
1364        assert_eq!(stream.send.off_front(), 6);
1365
1366        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1367        assert_eq!(written, 4);
1368        assert!(fin);
1369        assert_eq!(&buf[..written], b"orld");
1370    }
1371
1372    #[test]
1373    fn recv_data_below_off() {
1374        let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
1375
1376        let first = RangeBuf::from(b"hello", 0, false);
1377
1378        assert_eq!(stream.recv.write(first), Ok(()));
1379
1380        let mut buf = [0; 10];
1381
1382        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1383        assert_eq!(&buf[..len], b"hello");
1384        assert!(!fin);
1385
1386        let first = RangeBuf::from(b"elloworld", 1, true);
1387        assert_eq!(stream.recv.write(first), Ok(()));
1388
1389        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1390        assert_eq!(&buf[..len], b"world");
1391        assert!(fin);
1392    }
1393
1394    #[test]
1395    fn stream_complete() {
1396        let mut stream =
1397            Stream::new(0, 30, 30, true, true, DEFAULT_STREAM_WINDOW);
1398
1399        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1400        assert_eq!(stream.send.write(b"world", false), Ok(5));
1401
1402        assert!(!stream.send.is_complete());
1403        assert!(!stream.send.is_fin());
1404
1405        assert_eq!(stream.send.write(b"", true), Ok(0));
1406
1407        assert!(!stream.send.is_complete());
1408        assert!(stream.send.is_fin());
1409
1410        let buf = RangeBuf::from(b"hello", 0, true);
1411        assert!(stream.recv.write(buf).is_ok());
1412        assert!(!stream.recv.is_fin());
1413
1414        stream.send.ack(6, 4);
1415        assert!(!stream.send.is_complete());
1416
1417        let mut buf = [0; 2];
1418        assert_eq!(stream.recv.emit(&mut buf), Ok((2, false)));
1419        assert!(!stream.recv.is_fin());
1420
1421        stream.send.ack(1, 5);
1422        assert!(!stream.send.is_complete());
1423
1424        stream.send.ack(0, 1);
1425        assert!(stream.send.is_complete());
1426
1427        assert!(!stream.is_complete());
1428
1429        let mut buf = [0; 3];
1430        assert_eq!(stream.recv.emit(&mut buf), Ok((3, true)));
1431        assert!(stream.recv.is_fin());
1432
1433        assert!(stream.is_complete());
1434    }
1435
1436    #[test]
1437    fn send_fin_zero_length_output() {
1438        let mut buf = [0; 5];
1439
1440        let mut stream = Stream::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
1441
1442        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1443        assert_eq!(stream.send.off_front(), 0);
1444        assert!(!stream.send.is_fin());
1445
1446        let (written, fin) = stream.send.emit(&mut buf).unwrap();
1447        assert_eq!(written, 5);
1448        assert!(!fin);
1449        assert_eq!(&buf[..written], b"hello");
1450
1451        assert_eq!(stream.send.write(b"", true), Ok(0));
1452        assert!(stream.send.is_fin());
1453        assert_eq!(stream.send.off_front(), 5);
1454
1455        let (written, fin) = stream.send.emit(&mut buf).unwrap();
1456        assert_eq!(written, 0);
1457        assert!(fin);
1458        assert_eq!(&buf[..written], b"");
1459    }
1460
1461    fn stream_send_ready(stream: &Stream) -> bool {
1462        !stream.send.is_empty() &&
1463            stream.send.off_front() < stream.send.off_back()
1464    }
1465
1466    #[test]
1467    fn send_emit() {
1468        let mut buf = [0; 5];
1469
1470        let mut stream = Stream::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1471
1472        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1473        assert_eq!(stream.send.write(b"world", false), Ok(5));
1474        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1475        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1476        assert_eq!(stream.send.off_front(), 0);
1477        assert_eq!(stream.send.bufs_count(), 4);
1478
1479        assert!(stream.is_flushable());
1480
1481        assert!(stream_send_ready(&stream));
1482        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1483        assert_eq!(stream.send.off_front(), 4);
1484        assert_eq!(&buf[..4], b"hell");
1485
1486        assert!(stream_send_ready(&stream));
1487        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1488        assert_eq!(stream.send.off_front(), 8);
1489        assert_eq!(&buf[..4], b"owor");
1490
1491        assert!(stream_send_ready(&stream));
1492        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1493        assert_eq!(stream.send.off_front(), 10);
1494        assert_eq!(&buf[..2], b"ld");
1495
1496        assert!(stream_send_ready(&stream));
1497        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1498        assert_eq!(stream.send.off_front(), 11);
1499        assert_eq!(&buf[..1], b"o");
1500
1501        assert!(stream_send_ready(&stream));
1502        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1503        assert_eq!(stream.send.off_front(), 16);
1504        assert_eq!(&buf[..5], b"llehd");
1505
1506        assert!(stream_send_ready(&stream));
1507        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1508        assert_eq!(stream.send.off_front(), 20);
1509        assert_eq!(&buf[..4], b"lrow");
1510
1511        assert!(!stream.is_flushable());
1512
1513        assert!(!stream_send_ready(&stream));
1514        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1515        assert_eq!(stream.send.off_front(), 20);
1516    }
1517
1518    #[test]
1519    fn send_emit_ack() {
1520        let mut buf = [0; 5];
1521
1522        let mut stream = Stream::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1523
1524        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1525        assert_eq!(stream.send.write(b"world", false), Ok(5));
1526        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1527        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1528        assert_eq!(stream.send.off_front(), 0);
1529        assert_eq!(stream.send.bufs_count(), 4);
1530
1531        assert!(stream.is_flushable());
1532
1533        assert!(stream_send_ready(&stream));
1534        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1535        assert_eq!(stream.send.off_front(), 4);
1536        assert_eq!(&buf[..4], b"hell");
1537
1538        assert!(stream_send_ready(&stream));
1539        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1540        assert_eq!(stream.send.off_front(), 8);
1541        assert_eq!(&buf[..4], b"owor");
1542
1543        stream.send.ack_and_drop(0, 5);
1544        assert_eq!(stream.send.bufs_count(), 3);
1545
1546        assert!(stream_send_ready(&stream));
1547        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1548        assert_eq!(stream.send.off_front(), 10);
1549        assert_eq!(&buf[..2], b"ld");
1550
1551        stream.send.ack_and_drop(7, 5);
1552        assert_eq!(stream.send.bufs_count(), 3);
1553
1554        assert!(stream_send_ready(&stream));
1555        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1556        assert_eq!(stream.send.off_front(), 11);
1557        assert_eq!(&buf[..1], b"o");
1558
1559        assert!(stream_send_ready(&stream));
1560        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1561        assert_eq!(stream.send.off_front(), 16);
1562        assert_eq!(&buf[..5], b"llehd");
1563
1564        stream.send.ack_and_drop(5, 7);
1565        assert_eq!(stream.send.bufs_count(), 2);
1566
1567        assert!(stream_send_ready(&stream));
1568        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1569        assert_eq!(stream.send.off_front(), 20);
1570        assert_eq!(&buf[..4], b"lrow");
1571
1572        assert!(!stream.is_flushable());
1573
1574        assert!(!stream_send_ready(&stream));
1575        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1576        assert_eq!(stream.send.off_front(), 20);
1577
1578        stream.send.ack_and_drop(22, 4);
1579        assert_eq!(stream.send.bufs_count(), 2);
1580
1581        stream.send.ack_and_drop(20, 1);
1582        assert_eq!(stream.send.bufs_count(), 2);
1583    }
1584
1585    #[test]
1586    fn send_emit_retransmit() {
1587        let mut buf = [0; 5];
1588
1589        let mut stream = Stream::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
1590
1591        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1592        assert_eq!(stream.send.write(b"world", false), Ok(5));
1593        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
1594        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
1595        assert_eq!(stream.send.off_front(), 0);
1596        assert_eq!(stream.send.bufs_count(), 4);
1597
1598        assert!(stream.is_flushable());
1599
1600        assert!(stream_send_ready(&stream));
1601        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1602        assert_eq!(stream.send.off_front(), 4);
1603        assert_eq!(&buf[..4], b"hell");
1604
1605        assert!(stream_send_ready(&stream));
1606        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
1607        assert_eq!(stream.send.off_front(), 8);
1608        assert_eq!(&buf[..4], b"owor");
1609
1610        stream.send.retransmit(3, 3);
1611        assert_eq!(stream.send.off_front(), 3);
1612
1613        assert!(stream_send_ready(&stream));
1614        assert_eq!(stream.send.emit(&mut buf[..3]), Ok((3, false)));
1615        assert_eq!(stream.send.off_front(), 8);
1616        assert_eq!(&buf[..3], b"low");
1617
1618        assert!(stream_send_ready(&stream));
1619        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1620        assert_eq!(stream.send.off_front(), 10);
1621        assert_eq!(&buf[..2], b"ld");
1622
1623        stream.send.ack_and_drop(7, 2);
1624
1625        stream.send.retransmit(8, 2);
1626
1627        assert!(stream_send_ready(&stream));
1628        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1629        assert_eq!(stream.send.off_front(), 10);
1630        assert_eq!(&buf[..2], b"ld");
1631
1632        assert!(stream_send_ready(&stream));
1633        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
1634        assert_eq!(stream.send.off_front(), 11);
1635        assert_eq!(&buf[..1], b"o");
1636
1637        assert!(stream_send_ready(&stream));
1638        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1639        assert_eq!(stream.send.off_front(), 16);
1640        assert_eq!(&buf[..5], b"llehd");
1641
1642        stream.send.retransmit(12, 2);
1643
1644        assert!(stream_send_ready(&stream));
1645        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
1646        assert_eq!(stream.send.off_front(), 16);
1647        assert_eq!(&buf[..2], b"le");
1648
1649        assert!(stream_send_ready(&stream));
1650        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
1651        assert_eq!(stream.send.off_front(), 20);
1652        assert_eq!(&buf[..4], b"lrow");
1653
1654        assert!(!stream.is_flushable());
1655
1656        assert!(!stream_send_ready(&stream));
1657        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
1658        assert_eq!(stream.send.off_front(), 20);
1659
1660        stream.send.retransmit(7, 12);
1661
1662        assert!(stream_send_ready(&stream));
1663        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1664        assert_eq!(stream.send.off_front(), 12);
1665        assert_eq!(&buf[..5], b"rldol");
1666
1667        assert!(stream_send_ready(&stream));
1668        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1669        assert_eq!(stream.send.off_front(), 17);
1670        assert_eq!(&buf[..5], b"lehdl");
1671
1672        assert!(stream_send_ready(&stream));
1673        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
1674        assert_eq!(stream.send.off_front(), 20);
1675        assert_eq!(&buf[..2], b"ro");
1676
1677        stream.send.ack_and_drop(12, 7);
1678
1679        stream.send.retransmit(7, 12);
1680
1681        assert!(stream_send_ready(&stream));
1682        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1683        assert_eq!(stream.send.off_front(), 12);
1684        assert_eq!(&buf[..5], b"rldol");
1685
1686        assert!(stream_send_ready(&stream));
1687        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
1688        assert_eq!(stream.send.off_front(), 17);
1689        assert_eq!(&buf[..5], b"lehdl");
1690
1691        assert!(stream_send_ready(&stream));
1692        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
1693        assert_eq!(stream.send.off_front(), 20);
1694        assert_eq!(&buf[..2], b"ro");
1695    }
1696
1697    #[test]
1698    fn rangebuf_split_off() {
1699        let mut buf = RangeBuf::from(b"helloworld", 5, true);
1700        assert_eq!(buf.start, 0);
1701        assert_eq!(buf.pos, 0);
1702        assert_eq!(buf.len, 10);
1703        assert_eq!(buf.off, 5);
1704        assert!(buf.fin);
1705
1706        assert_eq!(buf.len(), 10);
1707        assert_eq!(buf.off(), 5);
1708        assert!(buf.fin());
1709
1710        assert_eq!(&buf[..], b"helloworld");
1711
1712        // Advance buffer.
1713        buf.consume(5);
1714
1715        assert_eq!(buf.start, 0);
1716        assert_eq!(buf.pos, 5);
1717        assert_eq!(buf.len, 10);
1718        assert_eq!(buf.off, 5);
1719        assert!(buf.fin);
1720
1721        assert_eq!(buf.len(), 5);
1722        assert_eq!(buf.off(), 10);
1723        assert!(buf.fin());
1724
1725        assert_eq!(&buf[..], b"world");
1726
1727        // Split buffer before position.
1728        let mut new_buf = buf.split_off(3);
1729
1730        assert_eq!(buf.start, 0);
1731        assert_eq!(buf.pos, 3);
1732        assert_eq!(buf.len, 3);
1733        assert_eq!(buf.off, 5);
1734        assert!(!buf.fin);
1735
1736        assert_eq!(buf.len(), 0);
1737        assert_eq!(buf.off(), 8);
1738        assert!(!buf.fin());
1739
1740        assert_eq!(&buf[..], b"");
1741
1742        assert_eq!(new_buf.start, 3);
1743        assert_eq!(new_buf.pos, 5);
1744        assert_eq!(new_buf.len, 7);
1745        assert_eq!(new_buf.off, 8);
1746        assert!(new_buf.fin);
1747
1748        assert_eq!(new_buf.len(), 5);
1749        assert_eq!(new_buf.off(), 10);
1750        assert!(new_buf.fin());
1751
1752        assert_eq!(&new_buf[..], b"world");
1753
1754        // Advance buffer.
1755        new_buf.consume(2);
1756
1757        assert_eq!(new_buf.start, 3);
1758        assert_eq!(new_buf.pos, 7);
1759        assert_eq!(new_buf.len, 7);
1760        assert_eq!(new_buf.off, 8);
1761        assert!(new_buf.fin);
1762
1763        assert_eq!(new_buf.len(), 3);
1764        assert_eq!(new_buf.off(), 12);
1765        assert!(new_buf.fin());
1766
1767        assert_eq!(&new_buf[..], b"rld");
1768
1769        // Split buffer after position.
1770        let mut new_new_buf = new_buf.split_off(5);
1771
1772        assert_eq!(new_buf.start, 3);
1773        assert_eq!(new_buf.pos, 7);
1774        assert_eq!(new_buf.len, 5);
1775        assert_eq!(new_buf.off, 8);
1776        assert!(!new_buf.fin);
1777
1778        assert_eq!(new_buf.len(), 1);
1779        assert_eq!(new_buf.off(), 12);
1780        assert!(!new_buf.fin());
1781
1782        assert_eq!(&new_buf[..], b"r");
1783
1784        assert_eq!(new_new_buf.start, 8);
1785        assert_eq!(new_new_buf.pos, 8);
1786        assert_eq!(new_new_buf.len, 2);
1787        assert_eq!(new_new_buf.off, 13);
1788        assert!(new_new_buf.fin);
1789
1790        assert_eq!(new_new_buf.len(), 2);
1791        assert_eq!(new_new_buf.off(), 13);
1792        assert!(new_new_buf.fin());
1793
1794        assert_eq!(&new_new_buf[..], b"ld");
1795
1796        // Advance buffer.
1797        new_new_buf.consume(2);
1798
1799        assert_eq!(new_new_buf.start, 8);
1800        assert_eq!(new_new_buf.pos, 10);
1801        assert_eq!(new_new_buf.len, 2);
1802        assert_eq!(new_new_buf.off, 13);
1803        assert!(new_new_buf.fin);
1804
1805        assert_eq!(new_new_buf.len(), 0);
1806        assert_eq!(new_new_buf.off(), 15);
1807        assert!(new_new_buf.fin());
1808
1809        assert_eq!(&new_new_buf[..], b"");
1810    }
1811
1812    /// RFC9000 2.1: A stream ID that is used out of order results in all
1813    /// streams of that type with lower-numbered stream IDs also being opened.
1814    #[test]
1815    fn stream_limit_auto_open() {
1816        let local_tp = crate::TransportParams::default();
1817        let peer_tp = crate::TransportParams::default();
1818
1819        let mut streams = StreamMap::new(5, 5, 5);
1820
1821        let stream_id = 500;
1822        assert!(!is_local(stream_id, true), "stream id is peer initiated");
1823        assert!(is_bidi(stream_id), "stream id is bidirectional");
1824        assert_eq!(
1825            streams
1826                .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1827                .err(),
1828            Some(Error::StreamLimit),
1829            "stream limit should be exceeded"
1830        );
1831    }
1832
1833    /// Stream limit should be satisfied regardless of what order we open
1834    /// streams
1835    #[test]
1836    fn stream_create_out_of_order() {
1837        let local_tp = crate::TransportParams::default();
1838        let peer_tp = crate::TransportParams::default();
1839
1840        let mut streams = StreamMap::new(5, 5, 5);
1841
1842        for stream_id in [8, 12, 4] {
1843            assert!(is_local(stream_id, false), "stream id is client initiated");
1844            assert!(is_bidi(stream_id), "stream id is bidirectional");
1845            assert!(streams
1846                .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1847                .is_ok());
1848        }
1849    }
1850
1851    /// Check stream limit boundary cases
1852    #[test]
1853    fn stream_limit_edge() {
1854        let local_tp = crate::TransportParams::default();
1855        let peer_tp = crate::TransportParams::default();
1856
1857        let mut streams = StreamMap::new(3, 3, 3);
1858
1859        // Highest permitted
1860        let stream_id = 8;
1861        assert!(streams
1862            .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1863            .is_ok());
1864
1865        // One more than highest permitted
1866        let stream_id = 12;
1867        assert_eq!(
1868            streams
1869                .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1870                .err(),
1871            Some(Error::StreamLimit)
1872        );
1873    }
1874
1875    fn cycle_stream_priority(stream_id: u64, streams: &mut StreamMap) {
1876        let key = streams.get(stream_id).unwrap().priority_key.clone();
1877        streams.update_priority(&key.clone(), &key);
1878    }
1879
1880    #[test]
1881    fn writable_prioritized_default_priority() {
1882        let local_tp = crate::TransportParams::default();
1883        let peer_tp = crate::TransportParams {
1884            initial_max_stream_data_bidi_local: 100,
1885            initial_max_stream_data_uni: 100,
1886            ..Default::default()
1887        };
1888
1889        let mut streams = StreamMap::new(100, 100, 100);
1890
1891        for id in [0, 4, 8, 12] {
1892            assert!(streams
1893                .get_or_create(id, &local_tp, &peer_tp, false, true)
1894                .is_ok());
1895        }
1896
1897        let walk_1: Vec<u64> = streams.writable().collect();
1898        cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1899        let walk_2: Vec<u64> = streams.writable().collect();
1900        cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1901        let walk_3: Vec<u64> = streams.writable().collect();
1902        cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1903        let walk_4: Vec<u64> = streams.writable().collect();
1904        cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1905        let walk_5: Vec<u64> = streams.writable().collect();
1906
1907        // All streams are non-incremental and same urgency by default. Multiple
1908        // visits shuffle their order.
1909        assert_eq!(walk_1, vec![0, 4, 8, 12]);
1910        assert_eq!(walk_2, vec![4, 8, 12, 0]);
1911        assert_eq!(walk_3, vec![8, 12, 0, 4]);
1912        assert_eq!(walk_4, vec![12, 0, 4, 8,]);
1913        assert_eq!(walk_5, vec![0, 4, 8, 12]);
1914    }
1915
1916    #[test]
1917    fn writable_prioritized_insert_order() {
1918        let local_tp = crate::TransportParams::default();
1919        let peer_tp = crate::TransportParams {
1920            initial_max_stream_data_bidi_local: 100,
1921            initial_max_stream_data_uni: 100,
1922            ..Default::default()
1923        };
1924
1925        let mut streams = StreamMap::new(100, 100, 100);
1926
1927        // Inserting same-urgency incremental streams in a "random" order yields
1928        // same order to start with.
1929        for id in [12, 4, 8, 0] {
1930            assert!(streams
1931                .get_or_create(id, &local_tp, &peer_tp, false, true)
1932                .is_ok());
1933        }
1934
1935        let walk_1: Vec<u64> = streams.writable().collect();
1936        cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1937        let walk_2: Vec<u64> = streams.writable().collect();
1938        cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1939        let walk_3: Vec<u64> = streams.writable().collect();
1940        cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1941        let walk_4: Vec<u64> = streams.writable().collect();
1942        cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1943        let walk_5: Vec<u64> = streams.writable().collect();
1944        assert_eq!(walk_1, vec![12, 4, 8, 0]);
1945        assert_eq!(walk_2, vec![4, 8, 0, 12]);
1946        assert_eq!(walk_3, vec![8, 0, 12, 4,]);
1947        assert_eq!(walk_4, vec![0, 12, 4, 8]);
1948        assert_eq!(walk_5, vec![12, 4, 8, 0]);
1949    }
1950
1951    #[test]
1952    fn writable_prioritized_mixed_urgency() {
1953        let local_tp = crate::TransportParams::default();
1954        let peer_tp = crate::TransportParams {
1955            initial_max_stream_data_bidi_local: 100,
1956            initial_max_stream_data_uni: 100,
1957            ..Default::default()
1958        };
1959
1960        let mut streams = StreamMap::new(100, 100, 100);
1961
1962        // Streams where the urgency descends (becomes more important). No stream
1963        // shares an urgency.
1964        let input = vec![
1965            (0, 100),
1966            (4, 90),
1967            (8, 80),
1968            (12, 70),
1969            (16, 60),
1970            (20, 50),
1971            (24, 40),
1972            (28, 30),
1973            (32, 20),
1974            (36, 10),
1975            (40, 0),
1976        ];
1977
1978        for (id, urgency) in input.clone() {
1979            // this duplicates some code from stream_priority in order to access
1980            // streams and the collection they're in
1981            let stream = streams
1982                .get_or_create(id, &local_tp, &peer_tp, false, true)
1983                .unwrap();
1984
1985            stream.urgency = urgency;
1986
1987            let new_priority_key = Arc::new(StreamPriorityKey {
1988                urgency: stream.urgency,
1989                incremental: stream.incremental,
1990                id,
1991                ..Default::default()
1992            });
1993
1994            let old_priority_key = std::mem::replace(
1995                &mut stream.priority_key,
1996                new_priority_key.clone(),
1997            );
1998
1999            streams.update_priority(&old_priority_key, &new_priority_key);
2000        }
2001
2002        let walk_1: Vec<u64> = streams.writable().collect();
2003        assert_eq!(walk_1, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
2004
2005        // Re-applying priority to a stream does not cause duplication.
2006        for (id, urgency) in input {
2007            // this duplicates some code from stream_priority in order to access
2008            // streams and the collection they're in
2009            let stream = streams
2010                .get_or_create(id, &local_tp, &peer_tp, false, true)
2011                .unwrap();
2012
2013            stream.urgency = urgency;
2014
2015            let new_priority_key = Arc::new(StreamPriorityKey {
2016                urgency: stream.urgency,
2017                incremental: stream.incremental,
2018                id,
2019                ..Default::default()
2020            });
2021
2022            let old_priority_key = std::mem::replace(
2023                &mut stream.priority_key,
2024                new_priority_key.clone(),
2025            );
2026
2027            streams.update_priority(&old_priority_key, &new_priority_key);
2028        }
2029
2030        let walk_2: Vec<u64> = streams.writable().collect();
2031        assert_eq!(walk_2, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
2032
2033        // Removing streams doesn't break expected ordering.
2034        streams.collect(24, true);
2035
2036        let walk_3: Vec<u64> = streams.writable().collect();
2037        assert_eq!(walk_3, vec![40, 36, 32, 28, 20, 16, 12, 8, 4, 0]);
2038
2039        streams.collect(40, true);
2040        streams.collect(0, true);
2041
2042        let walk_4: Vec<u64> = streams.writable().collect();
2043        assert_eq!(walk_4, vec![36, 32, 28, 20, 16, 12, 8, 4]);
2044
2045        // Adding streams doesn't break expected ordering.
2046        streams
2047            .get_or_create(44, &local_tp, &peer_tp, false, true)
2048            .unwrap();
2049
2050        let walk_5: Vec<u64> = streams.writable().collect();
2051        assert_eq!(walk_5, vec![36, 32, 28, 20, 16, 12, 8, 4, 44]);
2052    }
2053
2054    #[test]
2055    fn writable_prioritized_mixed_urgencies_incrementals() {
2056        let local_tp = crate::TransportParams::default();
2057        let peer_tp = crate::TransportParams {
2058            initial_max_stream_data_bidi_local: 100,
2059            initial_max_stream_data_uni: 100,
2060            ..Default::default()
2061        };
2062
2063        let mut streams = StreamMap::new(100, 100, 100);
2064
2065        // Streams that share some urgency level
2066        let input = vec![
2067            (0, 100),
2068            (4, 20),
2069            (8, 100),
2070            (12, 20),
2071            (16, 90),
2072            (20, 25),
2073            (24, 90),
2074            (28, 30),
2075            (32, 80),
2076            (36, 20),
2077            (40, 0),
2078        ];
2079
2080        for (id, urgency) in input.clone() {
2081            // this duplicates some code from stream_priority in order to access
2082            // streams and the collection they're in
2083            let stream = streams
2084                .get_or_create(id, &local_tp, &peer_tp, false, true)
2085                .unwrap();
2086
2087            stream.urgency = urgency;
2088
2089            let new_priority_key = Arc::new(StreamPriorityKey {
2090                urgency: stream.urgency,
2091                incremental: stream.incremental,
2092                id,
2093                ..Default::default()
2094            });
2095
2096            let old_priority_key = std::mem::replace(
2097                &mut stream.priority_key,
2098                new_priority_key.clone(),
2099            );
2100
2101            streams.update_priority(&old_priority_key, &new_priority_key);
2102        }
2103
2104        let walk_1: Vec<u64> = streams.writable().collect();
2105        cycle_stream_priority(4, &mut streams);
2106        cycle_stream_priority(16, &mut streams);
2107        cycle_stream_priority(0, &mut streams);
2108        let walk_2: Vec<u64> = streams.writable().collect();
2109        cycle_stream_priority(12, &mut streams);
2110        cycle_stream_priority(24, &mut streams);
2111        cycle_stream_priority(8, &mut streams);
2112        let walk_3: Vec<u64> = streams.writable().collect();
2113        cycle_stream_priority(36, &mut streams);
2114        cycle_stream_priority(16, &mut streams);
2115        cycle_stream_priority(0, &mut streams);
2116        let walk_4: Vec<u64> = streams.writable().collect();
2117        cycle_stream_priority(4, &mut streams);
2118        cycle_stream_priority(24, &mut streams);
2119        cycle_stream_priority(8, &mut streams);
2120        let walk_5: Vec<u64> = streams.writable().collect();
2121        cycle_stream_priority(12, &mut streams);
2122        cycle_stream_priority(16, &mut streams);
2123        cycle_stream_priority(0, &mut streams);
2124        let walk_6: Vec<u64> = streams.writable().collect();
2125        cycle_stream_priority(36, &mut streams);
2126        cycle_stream_priority(24, &mut streams);
2127        cycle_stream_priority(8, &mut streams);
2128        let walk_7: Vec<u64> = streams.writable().collect();
2129        cycle_stream_priority(4, &mut streams);
2130        cycle_stream_priority(16, &mut streams);
2131        cycle_stream_priority(0, &mut streams);
2132        let walk_8: Vec<u64> = streams.writable().collect();
2133        cycle_stream_priority(12, &mut streams);
2134        cycle_stream_priority(24, &mut streams);
2135        cycle_stream_priority(8, &mut streams);
2136        let walk_9: Vec<u64> = streams.writable().collect();
2137        cycle_stream_priority(36, &mut streams);
2138        cycle_stream_priority(16, &mut streams);
2139        cycle_stream_priority(0, &mut streams);
2140
2141        assert_eq!(walk_1, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2142        assert_eq!(walk_2, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2143        assert_eq!(walk_3, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2144        assert_eq!(walk_4, vec![40, 4, 12, 36, 20, 28, 32, 24, 16, 8, 0]);
2145        assert_eq!(walk_5, vec![40, 12, 36, 4, 20, 28, 32, 16, 24, 0, 8]);
2146        assert_eq!(walk_6, vec![40, 36, 4, 12, 20, 28, 32, 24, 16, 8, 0]);
2147        assert_eq!(walk_7, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2148        assert_eq!(walk_8, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2149        assert_eq!(walk_9, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2150
2151        // Removing streams doesn't break expected ordering.
2152        streams.collect(20, true);
2153
2154        let walk_10: Vec<u64> = streams.writable().collect();
2155        assert_eq!(walk_10, vec![40, 4, 12, 36, 28, 32, 24, 16, 8, 0]);
2156
2157        // Adding streams doesn't break expected ordering.
2158        let stream = streams
2159            .get_or_create(44, &local_tp, &peer_tp, false, true)
2160            .unwrap();
2161
2162        stream.urgency = 20;
2163        stream.incremental = true;
2164
2165        let new_priority_key = Arc::new(StreamPriorityKey {
2166            urgency: stream.urgency,
2167            incremental: stream.incremental,
2168            id: 44,
2169            ..Default::default()
2170        });
2171
2172        let old_priority_key =
2173            std::mem::replace(&mut stream.priority_key, new_priority_key.clone());
2174
2175        streams.update_priority(&old_priority_key, &new_priority_key);
2176
2177        let walk_11: Vec<u64> = streams.writable().collect();
2178        assert_eq!(walk_11, vec![40, 4, 12, 36, 44, 28, 32, 24, 16, 8, 0]);
2179    }
2180
2181    #[test]
2182    fn priority_tree_dupes() {
2183        let mut prioritized_writable: RBTree<StreamWritablePriorityAdapter> =
2184            Default::default();
2185
2186        for id in [0, 4, 8, 12] {
2187            let s = Arc::new(StreamPriorityKey {
2188                urgency: 0,
2189                incremental: false,
2190                id,
2191                ..Default::default()
2192            });
2193
2194            prioritized_writable.insert(s);
2195        }
2196
2197        let walk_1: Vec<u64> =
2198            prioritized_writable.iter().map(|s| s.id).collect();
2199        assert_eq!(walk_1, vec![0, 4, 8, 12]);
2200
2201        // Default keys could cause duplicate entries, this is normally protected
2202        // against via StreamMap.
2203        for id in [0, 4, 8, 12] {
2204            let s = Arc::new(StreamPriorityKey {
2205                urgency: 0,
2206                incremental: false,
2207                id,
2208                ..Default::default()
2209            });
2210
2211            prioritized_writable.insert(s);
2212        }
2213
2214        let walk_2: Vec<u64> =
2215            prioritized_writable.iter().map(|s| s.id).collect();
2216        assert_eq!(walk_2, vec![0, 0, 4, 4, 8, 8, 12, 12]);
2217    }
2218}
2219
2220mod recv_buf;
2221mod send_buf;