Skip to main content

quiche/stream/
mod.rs

1// Copyright (C) 2018-2019, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::cmp;
28
29use std::sync::Arc;
30
31use std::collections::hash_map;
32use std::collections::HashMap;
33use std::collections::HashSet;
34
35use intrusive_collections::intrusive_adapter;
36use intrusive_collections::KeyAdapter;
37use intrusive_collections::RBTree;
38use intrusive_collections::RBTreeAtomicLink;
39
40use smallvec::SmallVec;
41
42use crate::buffers::DefaultBufFactory;
43use crate::BufFactory;
44use crate::Error;
45use crate::Result;
46
/// Default stream urgency (lower is better); see `Stream::urgency`.
const DEFAULT_URGENCY: u8 = 127;

/// The default size of the receiver stream flow control window.
pub(crate) const DEFAULT_STREAM_WINDOW: u64 = 32 * 1024;

/// The maximum size of the receiver stream flow control window.
pub const MAX_STREAM_WINDOW: u64 = 16 * 1024 * 1024;
54
/// A simple no-op hasher for Stream IDs.
///
/// The QUIC protocol and quiche library guarantee stream ID uniqueness, so
/// we can save effort by avoiding using a more complicated algorithm: the
/// "hash" of a stream ID is the ID itself.
#[derive(Default)]
pub struct StreamIdHasher {
    /// The last `u64` written to the hasher; returned verbatim by
    /// `finish()`.
    id: u64,
}
63
/// Return value type of `RecvBuf::reset()`.
#[derive(Debug, PartialEq, Clone, Copy)]
pub struct RecvBufResetReturn {
    /// The difference between the previous max_data offset received and
    /// the final size reported by the reset.
    pub max_data_delta: u64,

    /// The amount of flow control credit that should be returned to the
    /// connection level flow control.
    pub consumed_flowcontrol: u64,
}
75
impl RecvBufResetReturn {
    /// Returns a value with both `max_data_delta` and
    /// `consumed_flowcontrol` set to zero.
    pub fn zero() -> Self {
        Self {
            max_data_delta: 0,
            consumed_flowcontrol: 0,
        }
    }
}
84
/// Action to perform when reading from a stream's receive buffer.
///
/// `T` is the destination buffer type used by `Emit`; it can be any type
/// implementing `bytes::BufMut`.
pub enum RecvAction<T: bytes::BufMut> {
    /// Emit data by copying it into the provided buffer.
    Emit { out: T },
    /// Discard up to the specified number of bytes without copying.
    Discard { len: usize },
}
92
/// Identity hasher: the value passed to `write_u64()` becomes the hash.
impl std::hash::Hasher for StreamIdHasher {
    /// Returns the stored stream ID as the hash value.
    #[inline]
    fn finish(&self) -> u64 {
        self.id
    }

    /// Stores `id` so that `finish()` returns it unchanged.
    #[inline]
    fn write_u64(&mut self, id: u64) {
        self.id = id;
    }

    /// Not supported; calling this panics.
    #[inline]
    fn write(&mut self, _: &[u8]) {
        // The trait requires a write() method, but stream IDs are always
        // hashed as a u64 through write_u64(), so hashing arbitrary bytes
        // is deliberately left unimplemented.
        unimplemented!()
    }
}
111
/// `BuildHasher` that produces default-constructed `StreamIdHasher`s.
type BuildStreamIdHasher = std::hash::BuildHasherDefault<StreamIdHasher>;

/// A `HashMap` keyed by stream ID that uses the no-op `StreamIdHasher`.
pub type StreamIdHashMap<V> = HashMap<u64, V, BuildStreamIdHasher>;
/// A `HashSet` of stream IDs that uses the no-op `StreamIdHasher`.
pub type StreamIdHashSet = HashSet<u64, BuildStreamIdHasher>;
116
/// Keeps track of QUIC streams and enforces stream limits.
#[derive(Default)]
pub struct StreamMap<F: BufFactory = DefaultBufFactory> {
    /// Map of streams indexed by stream ID.
    streams: StreamIdHashMap<Stream<F>>,

    /// Set of streams that were completed and garbage collected.
    ///
    /// Instead of keeping the full stream state forever, we collect completed
    /// streams to save memory, but we still need to keep track of previously
    /// created streams, to prevent peers from re-creating them.
    collected: StreamIdHashSet,

    /// Peer's maximum bidirectional stream count limit.
    peer_max_streams_bidi: u64,

    /// Peer's maximum unidirectional stream count limit.
    peer_max_streams_uni: u64,

    /// The total number of bidirectional streams opened by the peer.
    peer_opened_streams_bidi: u64,

    /// The total number of unidirectional streams opened by the peer.
    peer_opened_streams_uni: u64,

    /// Local maximum bidirectional stream count limit.
    local_max_streams_bidi: u64,

    /// Pending value of the local bidirectional stream count limit. It is
    /// incremented when a peer-created bidirectional stream is collected,
    /// and copied into `local_max_streams_bidi` by
    /// `update_max_streams_bidi()`.
    local_max_streams_bidi_next: u64,

    /// Initial maximum bidirectional stream count.
    initial_max_streams_bidi: u64,

    /// Local maximum unidirectional stream count limit.
    local_max_streams_uni: u64,

    /// Pending value of the local unidirectional stream count limit. It is
    /// incremented when a peer-created unidirectional stream is collected,
    /// and copied into `local_max_streams_uni` by
    /// `update_max_streams_uni()`.
    local_max_streams_uni_next: u64,

    /// Initial maximum unidirectional stream count.
    initial_max_streams_uni: u64,

    /// The total number of bidirectional streams opened by the local endpoint.
    local_opened_streams_bidi: u64,

    /// The total number of unidirectional streams opened by the local endpoint.
    local_opened_streams_uni: u64,

    /// Queue of stream IDs corresponding to streams that have buffered data
    /// ready to be sent to the peer. This also implies that the stream has
    /// enough flow control credits to send at least some of that data.
    flushable: RBTree<StreamFlushablePriorityAdapter>,

    /// Set of stream IDs corresponding to streams that have outstanding data
    /// to read. This is used to generate a `StreamIter` of streams without
    /// having to iterate over the full list of streams.
    pub readable: RBTree<StreamReadablePriorityAdapter>,

    /// Set of stream IDs corresponding to streams that have enough flow control
    /// capacity to be written to, and are not finished. This is used to
    /// generate a `StreamIter` of streams without having to iterate over the
    /// full list of streams.
    pub writable: RBTree<StreamWritablePriorityAdapter>,

    /// Set of stream IDs corresponding to streams that are almost out of flow
    /// control credit and need to send MAX_STREAM_DATA. This is used to
    /// generate a `StreamIter` of streams without having to iterate over the
    /// full list of streams.
    almost_full: StreamIdHashSet,

    /// Map of stream IDs corresponding to streams that are blocked. The value
    /// of the map elements represents the offset of the stream at which the
    /// blocking occurred.
    blocked: StreamIdHashMap<u64>,

    /// Map of stream IDs corresponding to streams that are reset. The value
    /// of the map elements is a tuple of the error code and final size values
    /// to include in the RESET_STREAM frame.
    reset: StreamIdHashMap<(u64, u64)>,

    /// Map of stream IDs corresponding to streams that are shutdown on the
    /// receive side, and need to send a STOP_SENDING frame. The value of the
    /// map elements is the error code to include in the STOP_SENDING frame.
    stopped: StreamIdHashMap<u64>,

    /// The maximum size of a stream window.
    max_stream_window: u64,

    /// When `true`, the initial flow control window of a new stream is set
    /// to its `max_rx_data`; when `false`, it is capped at
    /// `DEFAULT_STREAM_WINDOW`.
    use_initial_max_data_as_flow_control_win: bool,

    /// Total number of bytes in send buffers across all streams.
    tx_buffered: usize,
}
209
210impl<F: BufFactory> StreamMap<F> {
211    pub fn new(
212        max_streams_bidi: u64, max_streams_uni: u64, max_stream_window: u64,
213    ) -> Self {
214        StreamMap {
215            local_max_streams_bidi: max_streams_bidi,
216            local_max_streams_bidi_next: max_streams_bidi,
217            initial_max_streams_bidi: max_streams_bidi,
218
219            local_max_streams_uni: max_streams_uni,
220            local_max_streams_uni_next: max_streams_uni,
221            initial_max_streams_uni: max_streams_uni,
222
223            max_stream_window,
224
225            ..StreamMap::default()
226        }
227    }
228
    /// Returns a shared reference to the stream with the given ID, or
    /// `None` if no such stream is currently stored in the map.
    pub fn get(&self, id: u64) -> Option<&Stream<F>> {
        self.streams.get(&id)
    }
233
    /// Returns a mutable reference to the stream with the given ID, or
    /// `None` if no such stream is currently stored in the map.
    pub fn get_mut(&mut self, id: u64) -> Option<&mut Stream<F>> {
        self.streams.get_mut(&id)
    }
238
239    /// Returns the mutable stream with the given ID if it exists, or creates
240    /// a new one otherwise.
241    ///
242    /// The `local` parameter indicates whether the stream's creation was
243    /// requested by the local application rather than the peer, and is
244    /// used to validate the requested stream ID, and to select the initial
245    /// flow control values from the local and remote transport parameters
246    /// (also passed as arguments).
247    ///
248    /// This also takes care of enforcing both local and the peer's stream
249    /// count limits. If one of these limits is violated, the `StreamLimit`
250    /// error is returned.
251    pub(crate) fn get_or_create(
252        &mut self, id: u64, local_params: &crate::TransportParams,
253        peer_params: &crate::TransportParams, local: bool, is_server: bool,
254    ) -> Result<&mut Stream<F>> {
255        let (stream, is_new_and_writable) = match self.streams.entry(id) {
256            hash_map::Entry::Vacant(v) => {
257                // Stream has already been closed and garbage collected.
258                if self.collected.contains(&id) {
259                    return Err(Error::Done);
260                }
261
262                if local != is_local(id, is_server) {
263                    return Err(Error::InvalidStreamState(id));
264                }
265
266                let (max_rx_data, max_tx_data) = match (local, is_bidi(id)) {
267                    // Locally-initiated bidirectional stream.
268                    (true, true) => (
269                        local_params.initial_max_stream_data_bidi_local,
270                        peer_params.initial_max_stream_data_bidi_remote,
271                    ),
272
273                    // Locally-initiated unidirectional stream.
274                    (true, false) => (0, peer_params.initial_max_stream_data_uni),
275
276                    // Remotely-initiated bidirectional stream.
277                    (false, true) => (
278                        local_params.initial_max_stream_data_bidi_remote,
279                        peer_params.initial_max_stream_data_bidi_local,
280                    ),
281
282                    // Remotely-initiated unidirectional stream.
283                    (false, false) =>
284                        (local_params.initial_max_stream_data_uni, 0),
285                };
286
287                // The two least significant bits from a stream id identify the
288                // type of stream. Truncate those bits to get the sequence for
289                // that stream type.
290                let stream_sequence = id >> 2;
291
292                // Enforce stream count limits.
293                match (is_local(id, is_server), is_bidi(id)) {
294                    (true, true) => {
295                        let n = cmp::max(
296                            self.local_opened_streams_bidi,
297                            stream_sequence + 1,
298                        );
299
300                        if n > self.peer_max_streams_bidi {
301                            return Err(Error::StreamLimit);
302                        }
303
304                        self.local_opened_streams_bidi = n;
305                    },
306
307                    (true, false) => {
308                        let n = cmp::max(
309                            self.local_opened_streams_uni,
310                            stream_sequence + 1,
311                        );
312
313                        if n > self.peer_max_streams_uni {
314                            return Err(Error::StreamLimit);
315                        }
316
317                        self.local_opened_streams_uni = n;
318                    },
319
320                    (false, true) => {
321                        let n = cmp::max(
322                            self.peer_opened_streams_bidi,
323                            stream_sequence + 1,
324                        );
325
326                        if n > self.local_max_streams_bidi {
327                            return Err(Error::StreamLimit);
328                        }
329
330                        self.peer_opened_streams_bidi = n;
331                    },
332
333                    (false, false) => {
334                        let n = cmp::max(
335                            self.peer_opened_streams_uni,
336                            stream_sequence + 1,
337                        );
338
339                        if n > self.local_max_streams_uni {
340                            return Err(Error::StreamLimit);
341                        }
342
343                        self.peer_opened_streams_uni = n;
344                    },
345                };
346
347                let initial_window =
348                    if self.use_initial_max_data_as_flow_control_win {
349                        max_rx_data
350                    } else {
351                        cmp::min(max_rx_data, DEFAULT_STREAM_WINDOW)
352                    };
353                let s = Stream::new(
354                    id,
355                    max_rx_data,
356                    max_tx_data,
357                    local,
358                    initial_window,
359                    self.max_stream_window,
360                );
361
362                let is_writable = s.is_writable();
363
364                (v.insert(s), is_writable)
365            },
366
367            hash_map::Entry::Occupied(v) => (v.into_mut(), false),
368        };
369
370        // Newly created stream might already be writable due to initial flow
371        // control limits.
372        if is_new_and_writable {
373            self.writable.insert(Arc::clone(&stream.priority_key));
374        }
375
376        Ok(stream)
377    }
378
379    /// Adds the stream ID to the readable streams set.
380    ///
381    /// If the stream was already in the list, this does nothing.
382    pub fn insert_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
383        if !priority_key.readable.is_linked() {
384            self.readable.insert(Arc::clone(priority_key));
385        }
386    }
387
388    /// Removes the stream ID from the readable streams set.
389    pub fn remove_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
390        if !priority_key.readable.is_linked() {
391            return;
392        }
393
394        let mut c = {
395            let ptr = Arc::as_ptr(priority_key);
396            unsafe { self.readable.cursor_mut_from_ptr(ptr) }
397        };
398
399        c.remove();
400    }
401
402    /// Adds the stream ID to the writable streams set.
403    ///
404    /// This should also be called anytime a new stream is created, in addition
405    /// to when an existing stream becomes writable.
406    ///
407    /// If the stream was already in the list, this does nothing.
408    pub fn insert_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
409        if !priority_key.writable.is_linked() {
410            self.writable.insert(Arc::clone(priority_key));
411        }
412    }
413
414    /// Removes the stream ID from the writable streams set.
415    ///
416    /// This should also be called anytime an existing stream stops being
417    /// writable.
418    pub fn remove_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
419        if !priority_key.writable.is_linked() {
420            return;
421        }
422
423        let mut c = {
424            let ptr = Arc::as_ptr(priority_key);
425            unsafe { self.writable.cursor_mut_from_ptr(ptr) }
426        };
427
428        c.remove();
429    }
430
431    /// Adds the stream ID to the flushable streams set.
432    ///
433    /// If the stream was already in the list, this does nothing.
434    pub fn insert_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
435        if !priority_key.flushable.is_linked() {
436            self.flushable.insert(Arc::clone(priority_key));
437        }
438    }
439
440    /// Removes the stream ID from the flushable streams set.
441    pub fn remove_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
442        if !priority_key.flushable.is_linked() {
443            return;
444        }
445
446        let mut c = {
447            let ptr = Arc::as_ptr(priority_key);
448            unsafe { self.flushable.cursor_mut_from_ptr(ptr) }
449        };
450
451        c.remove();
452    }
453
454    pub fn peek_flushable(&self) -> Option<Arc<StreamPriorityKey>> {
455        self.flushable.front().clone_pointer()
456    }
457
458    /// Updates the priorities of a stream.
459    pub fn update_priority(
460        &mut self, old: &Arc<StreamPriorityKey>, new: &Arc<StreamPriorityKey>,
461    ) {
462        if old.readable.is_linked() {
463            self.remove_readable(old);
464            self.readable.insert(Arc::clone(new));
465        }
466
467        if old.writable.is_linked() {
468            self.remove_writable(old);
469            self.writable.insert(Arc::clone(new));
470        }
471
472        if old.flushable.is_linked() {
473            self.remove_flushable(old);
474            self.flushable.insert(Arc::clone(new));
475        }
476    }
477
    /// Adds the stream ID to the almost full streams set.
    ///
    /// If the stream ID was already in the set, this does nothing.
    pub fn insert_almost_full(&mut self, stream_id: u64) {
        self.almost_full.insert(stream_id);
    }
484
    /// Removes the stream ID from the almost full streams set.
    ///
    /// If the stream ID was not in the set, this does nothing.
    pub fn remove_almost_full(&mut self, stream_id: u64) {
        self.almost_full.remove(&stream_id);
    }
489
    /// Adds the stream ID to the blocked streams set with the
    /// given offset value.
    ///
    /// If the stream was already in the set, its recorded offset is
    /// replaced by `off`.
    pub fn insert_blocked(&mut self, stream_id: u64, off: u64) {
        self.blocked.insert(stream_id, off);
    }
497
    /// Removes the stream ID from the blocked streams set.
    ///
    /// If the stream ID was not in the set, this does nothing.
    pub fn remove_blocked(&mut self, stream_id: u64) {
        self.blocked.remove(&stream_id);
    }
502
    /// Adds the stream ID to the reset streams set with the
    /// given error code and final size values.
    ///
    /// If the stream was already in the set, its recorded error code and
    /// final size are replaced by the new values.
    pub fn insert_reset(
        &mut self, stream_id: u64, error_code: u64, final_size: u64,
    ) {
        self.reset.insert(stream_id, (error_code, final_size));
    }
512
    /// Removes the stream ID from the reset streams set.
    ///
    /// If the stream ID was not in the set, this does nothing.
    pub fn remove_reset(&mut self, stream_id: u64) {
        self.reset.remove(&stream_id);
    }
517
    /// Adds the stream ID to the stopped streams set with the
    /// given error code.
    ///
    /// If the stream was already in the set, its recorded error code is
    /// replaced by `error_code`.
    pub fn insert_stopped(&mut self, stream_id: u64, error_code: u64) {
        self.stopped.insert(stream_id, error_code);
    }
525
    /// Removes the stream ID from the stopped streams set.
    ///
    /// If the stream ID was not in the set, this does nothing.
    pub fn remove_stopped(&mut self, stream_id: u64) {
        self.stopped.remove(&stream_id);
    }
530
531    /// Updates the peer's maximum bidirectional stream count limit.
532    pub fn update_peer_max_streams_bidi(&mut self, v: u64) {
533        self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v);
534    }
535
536    /// Updates the peer's maximum unidirectional stream count limit.
537    pub fn update_peer_max_streams_uni(&mut self, v: u64) {
538        self.peer_max_streams_uni = cmp::max(self.peer_max_streams_uni, v);
539    }
540
    /// Commits the new max_streams_bidi limit, i.e. copies the pending
    /// ("next") value into the current local limit.
    pub fn update_max_streams_bidi(&mut self) {
        self.local_max_streams_bidi = self.local_max_streams_bidi_next;
    }
545
    /// Sets the max_streams_bidi limit to the given value.
    ///
    /// The current, pending ("next") and initial bidirectional limits are
    /// all overwritten with `max`.
    pub fn set_max_streams_bidi(&mut self, max: u64) {
        self.local_max_streams_bidi = max;
        self.local_max_streams_bidi_next = max;
        self.initial_max_streams_bidi = max;
    }
552
    /// Returns the current (committed) local max_streams_bidi limit.
    pub fn max_streams_bidi(&self) -> u64 {
        self.local_max_streams_bidi
    }
557
558    /// Returns the new max_streams_bidi limit.
559    pub fn max_streams_bidi_next(&mut self) -> u64 {
560        self.local_max_streams_bidi_next
561    }
562
    /// Commits the new max_streams_uni limit, i.e. copies the pending
    /// ("next") value into the current local limit.
    pub fn update_max_streams_uni(&mut self) {
        self.local_max_streams_uni = self.local_max_streams_uni_next;
    }
567
568    /// Returns the new max_streams_uni limit.
569    pub fn max_streams_uni_next(&mut self) -> u64 {
570        self.local_max_streams_uni_next
571    }
572
    /// Returns the peer's current maximum bidirectional stream count limit,
    /// as last raised through `update_peer_max_streams_bidi()`.
    pub fn peer_max_streams_bidi(&self) -> u64 {
        self.peer_max_streams_bidi
    }
577
578    /// Returns the number of bidirectional streams that can be created
579    /// before the peer's stream count limit is reached.
580    pub fn peer_streams_left_bidi(&self) -> u64 {
581        self.peer_max_streams_bidi - self.local_opened_streams_bidi
582    }
583
    /// Returns the peer's current maximum unidirectional stream count limit,
    /// as last raised through `update_peer_max_streams_uni()`.
    pub fn peer_max_streams_uni(&self) -> u64 {
        self.peer_max_streams_uni
    }
588
589    /// Returns the number of unidirectional streams that can be created
590    /// before the peer's stream count limit is reached.
591    pub fn peer_streams_left_uni(&self) -> u64 {
592        self.peer_max_streams_uni - self.local_opened_streams_uni
593    }
594
595    /// Drops completed stream.
596    ///
597    /// This should only be called when Stream::is_complete() returns true for
598    /// the given stream.
599    pub fn collect(&mut self, stream_id: u64, local: bool) {
600        if !local {
601            // If the stream was created by the peer, give back a max streams
602            // credit.
603            if is_bidi(stream_id) {
604                self.local_max_streams_bidi_next =
605                    self.local_max_streams_bidi_next.saturating_add(1);
606            } else {
607                self.local_max_streams_uni_next =
608                    self.local_max_streams_uni_next.saturating_add(1);
609            }
610        }
611
612        let s = self.streams.remove(&stream_id).unwrap();
613
614        self.remove_readable(&s.priority_key);
615
616        self.remove_writable(&s.priority_key);
617
618        self.remove_flushable(&s.priority_key);
619
620        self.collected.insert(stream_id);
621    }
622
623    /// Creates an iterator over streams that have outstanding data to read.
624    pub fn readable(&self) -> StreamIter {
625        StreamIter {
626            streams: self.readable.iter().map(|s| s.id).collect(),
627            index: 0,
628        }
629    }
630
631    /// Creates an iterator over streams that can be written to.
632    pub fn writable(&self) -> StreamIter {
633        StreamIter {
634            streams: self.writable.iter().map(|s| s.id).collect(),
635            index: 0,
636        }
637    }
638
    /// Creates an iterator over streams that need to send MAX_STREAM_DATA,
    /// built from the almost full streams set.
    pub fn almost_full(&self) -> StreamIter {
        StreamIter::from(&self.almost_full)
    }
643
    /// Creates an iterator over streams that need to send STREAM_DATA_BLOCKED.
    ///
    /// Each item is a `(stream ID, offset)` pair as recorded by
    /// `insert_blocked()`.
    pub fn blocked(&self) -> hash_map::Iter<'_, u64, u64> {
        self.blocked.iter()
    }
648
    /// Creates an iterator over streams that need to send RESET_STREAM.
    ///
    /// Each item is a `(stream ID, (error code, final size))` pair as
    /// recorded by `insert_reset()`.
    pub fn reset(&self) -> hash_map::Iter<'_, u64, (u64, u64)> {
        self.reset.iter()
    }
653
    /// Creates an iterator over streams that need to send STOP_SENDING.
    ///
    /// Each item is a `(stream ID, error code)` pair as recorded by
    /// `insert_stopped()`.
    pub fn stopped(&self) -> hash_map::Iter<'_, u64, u64> {
        self.stopped.iter()
    }
658
    /// Returns true if the stream has been collected, i.e. its ID was
    /// recorded by `collect()`.
    pub fn is_collected(&self, stream_id: u64) -> bool {
        self.collected.contains(&stream_id)
    }
663
    /// Returns true if there are any streams that have data to write, i.e.
    /// the flushable set is not empty.
    pub fn has_flushable(&self) -> bool {
        !self.flushable.is_empty()
    }
668
    /// Returns true if there are any streams that have data to read, i.e.
    /// the readable set is not empty.
    pub fn has_readable(&self) -> bool {
        !self.readable.is_empty()
    }
673
    /// Returns true if there are any streams that need to update the local
    /// flow control limit, i.e. the almost full set is not empty.
    pub fn has_almost_full(&self) -> bool {
        !self.almost_full.is_empty()
    }
679
    /// Returns true if there are any streams that are blocked, i.e. the
    /// blocked set is not empty.
    pub fn has_blocked(&self) -> bool {
        !self.blocked.is_empty()
    }
684
    /// Returns true if there are any streams that are reset, i.e. the reset
    /// set is not empty.
    pub fn has_reset(&self) -> bool {
        !self.reset.is_empty()
    }
689
    /// Returns true if there are any streams that need to send STOP_SENDING,
    /// i.e. the stopped set is not empty.
    pub fn has_stopped(&self) -> bool {
        !self.stopped.is_empty()
    }
694
695    /// Returns true if the max bidirectional streams count needs to be updated
696    /// by sending a MAX_STREAMS frame to the peer.
697    ///
698    /// This only sends MAX_STREAMS when available capacity is at or below 50%
699    /// of the initial maximum streams target.
700    pub fn should_update_max_streams_bidi(&self) -> bool {
701        let available = self
702            .local_max_streams_bidi
703            .saturating_sub(self.peer_opened_streams_bidi);
704        self.local_max_streams_bidi_next != self.local_max_streams_bidi &&
705            available <= self.initial_max_streams_bidi / 2
706    }
707
708    /// Returns true if the max unidirectional streams count needs to be updated
709    /// by sending a MAX_STREAMS frame to the peer.
710    ///
711    /// This only send MAX_STREAMS when available capacity is at or below 50% of
712    /// the initial maximum streams target.
713    pub fn should_update_max_streams_uni(&self) -> bool {
714        let available = self
715            .local_max_streams_uni
716            .saturating_sub(self.peer_opened_streams_uni);
717        self.local_max_streams_uni_next != self.local_max_streams_uni &&
718            available <= self.initial_max_streams_uni / 2
719    }
720
    /// Returns the number of active streams in the map. Collected streams
    /// are not counted, since they are removed from the map.
    #[cfg(test)]
    pub fn len(&self) -> usize {
        self.streams.len()
    }
726
    /// Returns the stored total number of bytes buffered across all streams,
    /// as maintained by `add_tx_buffered()` and `sub_tx_buffered()`.
    pub(crate) fn tx_buffered(&self) -> usize {
        self.tx_buffered
    }
731
732    /// Computes the actual number of bytes in send buffers by summing across
733    /// all streams. This is used for debugging to verify that tx_buffered
734    /// is accurate.
735    fn tx_buffered_actual(&self) -> usize {
736        self.streams
737            .values()
738            .map(|s| s.send.buffered_bytes() as usize)
739            .sum()
740    }
741
742    /// Checks if the stored tx_buffered matches the actual value.
743    /// Returns true if they match, false otherwise.
744    pub(crate) fn tx_buffered_is_consistent(&self) -> bool {
745        self.tx_buffered == self.tx_buffered_actual()
746    }
747
    /// Updates the tx_buffered value by adding the delta.
    ///
    /// In builds with debug assertions, the updated value is then checked
    /// against the streams' send buffers, panicking on a mismatch.
    pub(crate) fn add_tx_buffered(&mut self, delta: usize) {
        self.tx_buffered += delta;

        #[cfg(debug_assertions)]
        self.debug_check_tx_buffered_consistency();
    }
755
    /// Updates the tx_buffered value by subtracting the delta.
    ///
    /// Subtracting more than is stored trips a debug assertion; without
    /// debug assertions the value saturates at zero instead. In builds with
    /// debug assertions, the updated value is then checked against the
    /// streams' send buffers, panicking on a mismatch.
    pub(crate) fn sub_tx_buffered(&mut self, delta: usize) {
        debug_assert!(self.tx_buffered >= delta);
        self.tx_buffered = self.tx_buffered.saturating_sub(delta);

        #[cfg(debug_assertions)]
        self.debug_check_tx_buffered_consistency();
    }
764
765    /// Verifies that the stored tx_buffered value matches the actual bytes in
766    /// send buffers across all streams. Enabled in debug builds to catch
767    /// inconsistencies early.
768    #[cfg(debug_assertions)]
769    pub(crate) fn debug_check_tx_buffered_consistency(&self) {
770        if !self.tx_buffered_is_consistent() {
771            let buffered_per_stream = self
772                .streams
773                .iter()
774                .map(|(id, s)| (*id, s.send.buffered_bytes()))
775                .collect::<Vec<_>>();
776
777            let actual = self.tx_buffered_actual();
778            let stored = self.tx_buffered;
779            panic!(
780                "tx_buffered mismatch: stored={}, actual={}, diff={}, buffered_per_stream={:?}",
781                stored,
782                actual,
783                stored as i64 - actual as i64,
784                buffered_per_stream
785            );
786        }
787    }
788
    /// Selects how the initial flow control window of newly created streams
    /// is chosen.
    ///
    /// When `true`, the initial window is set to the stream's `max_rx_data`;
    /// when `false`, it is capped at `DEFAULT_STREAM_WINDOW`.
    pub(crate) fn set_use_initial_max_data_as_flow_control_win(
        &mut self, v: bool,
    ) {
        self.use_initial_max_data_as_flow_control_win = v;
    }
796}
797
/// A QUIC stream.
pub struct Stream<F: BufFactory = DefaultBufFactory> {
    /// Receive-side stream buffer.
    pub recv: recv_buf::RecvBuf,

    /// Send-side stream buffer.
    pub send: send_buf::SendBuf<F>,

    /// Low-water mark used by `is_writable()`: the stream only counts as
    /// writable while the end of the send buffer plus this value is still
    /// below the send-side maximum offset. Initialized to 1.
    pub send_lowat: usize,

    /// Whether the stream is bidirectional.
    pub bidi: bool,

    /// Whether the stream was created by the local endpoint.
    pub local: bool,

    /// The stream's urgency (lower is better). Default is `DEFAULT_URGENCY`.
    pub urgency: u8,

    /// Whether the stream can be flushed incrementally. Default is `true`.
    pub incremental: bool,

    /// Shared key carrying the stream's ID and priority. It is the object
    /// linked into the `StreamMap` readable, writable and flushable sets.
    pub priority_key: Arc<StreamPriorityKey>,
}
822
823impl<F: BufFactory> Stream<F> {
824    /// Creates a new stream with the given flow control limits.
825    pub fn new(
826        id: u64, max_rx_data: u64, max_tx_data: u64, local: bool,
827        initial_window: u64, max_window: u64,
828    ) -> Self {
829        let priority_key = Arc::new(StreamPriorityKey {
830            id,
831            ..Default::default()
832        });
833
834        Stream {
835            recv: recv_buf::RecvBuf::new(max_rx_data, initial_window, max_window),
836            send: send_buf::SendBuf::new(max_tx_data),
837            send_lowat: 1,
838            bidi: is_bidi(id),
839            local,
840            urgency: priority_key.urgency,
841            incremental: priority_key.incremental,
842            priority_key,
843        }
844    }
845
846    /// Returns true if the stream has data to read.
847    pub fn is_readable(&self) -> bool {
848        self.recv.ready()
849    }
850
851    /// Returns true if the stream has enough flow control capacity to be
852    /// written to, and is not finished.
853    pub fn is_writable(&self) -> bool {
854        !self.send.is_shutdown() &&
855            !self.send.is_fin() &&
856            (self.send.off_back() + self.send_lowat as u64) <
857                self.send.max_off()
858    }
859
860    /// Returns true if the stream has data to send and is allowed to send at
861    /// least some of it.
862    pub fn is_flushable(&self) -> bool {
863        let off_front = self.send.off_front();
864
865        !self.send.is_empty() &&
866            off_front < self.send.off_back() &&
867            off_front < self.send.max_off()
868    }
869
870    /// Returns true if the stream is complete.
871    ///
872    /// For bidirectional streams this happens when both the receive and send
873    /// sides are complete. That is when all incoming data has been read by the
874    /// application, and when all outgoing data has been acked by the peer.
875    ///
876    /// For unidirectional streams this happens when either the receive or send
877    /// side is complete, depending on whether the stream was created locally
878    /// or not.
879    pub fn is_complete(&self) -> bool {
880        match (self.bidi, self.local) {
881            // For bidirectional streams we need to check both receive and send
882            // sides for completion.
883            (true, _) => self.recv.is_fin() && self.send.is_complete(),
884
885            // For unidirectional streams generated locally, we only need to
886            // check the send side for completion.
887            (false, true) => self.send.is_complete(),
888
889            // For unidirectional streams generated by the peer, we only need
890            // to check the receive side for completion.
891            (false, false) => self.recv.is_fin(),
892        }
893    }
894}
895
/// Returns true if the stream was created locally.
///
/// The least significant bit of a stream ID identifies the initiator: it is
/// compared against `is_server` converted to `0` or `1`.
pub fn is_local(stream_id: u64, is_server: bool) -> bool {
    let initiator_bit = stream_id & 1;
    let local_bit = u64::from(is_server);

    initiator_bit == local_bit
}
900
/// Returns true if the stream is bidirectional.
///
/// The second least significant bit of a stream ID is clear for
/// bidirectional streams and set for unidirectional ones.
pub fn is_bidi(stream_id: u64) -> bool {
    const UNI_BIT: u64 = 0b10;

    stream_id & UNI_BIT == 0
}
905
/// Scheduling key of a stream.
///
/// Carries the data the `Ord` implementation sorts by (urgency, incremental
/// flag and stream ID) plus one intrusive red-black tree link per priority
/// adapter (readable, writable and flushable).
#[derive(Clone, Debug)]
pub struct StreamPriorityKey {
    /// The stream's urgency; lower values sort first.
    pub urgency: u8,
    /// Whether the stream is incremental. At equal urgency, non-incremental
    /// keys sort before incremental ones.
    pub incremental: bool,
    /// The stream ID. Two keys with the same ID always compare as equal.
    pub id: u64,

    /// Tree link used by `StreamReadablePriorityAdapter`.
    pub readable: RBTreeAtomicLink,
    /// Tree link used by `StreamWritablePriorityAdapter`.
    pub writable: RBTreeAtomicLink,
    /// Tree link used by `StreamFlushablePriorityAdapter`.
    pub flushable: RBTreeAtomicLink,
}
916
917impl Default for StreamPriorityKey {
918    fn default() -> Self {
919        Self {
920            urgency: DEFAULT_URGENCY,
921            incremental: true,
922            id: Default::default(),
923            readable: Default::default(),
924            writable: Default::default(),
925            flushable: Default::default(),
926        }
927    }
928}
929
930impl PartialEq for StreamPriorityKey {
931    fn eq(&self, other: &Self) -> bool {
932        self.id == other.id
933    }
934}
935
// Marker impl: equality is decided by stream ID alone (see `PartialEq`).
impl Eq for StreamPriorityKey {}
937
938impl PartialOrd for StreamPriorityKey {
939    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
940        Some(self.cmp(other))
941    }
942}
943
944impl Ord for StreamPriorityKey {
945    fn cmp(&self, other: &Self) -> cmp::Ordering {
946        // Ignore priority if ID matches.
947        if self.id == other.id {
948            return cmp::Ordering::Equal;
949        }
950
951        // First, order by urgency...
952        if self.urgency != other.urgency {
953            return self.urgency.cmp(&other.urgency);
954        }
955
956        // ...when the urgency is the same, and both are not incremental, order
957        // by stream ID...
958        if !self.incremental && !other.incremental {
959            return self.id.cmp(&other.id);
960        }
961
962        // ...non-incremental takes priority over incremental...
963        if self.incremental && !other.incremental {
964            return cmp::Ordering::Greater;
965        }
966        if !self.incremental && other.incremental {
967            return cmp::Ordering::Less;
968        }
969
970        // ...finally, when both are incremental, `other` takes precedence (so
971        // `self` is always sorted after other same-urgency incremental
972        // entries).
973        cmp::Ordering::Greater
974    }
975}
976
977intrusive_adapter!(pub StreamWritablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { writable: RBTreeAtomicLink });
978
979impl KeyAdapter<'_> for StreamWritablePriorityAdapter {
980    type Key = StreamPriorityKey;
981
982    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
983        s.clone()
984    }
985}
986
987intrusive_adapter!(pub StreamReadablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { readable: RBTreeAtomicLink });
988
989impl KeyAdapter<'_> for StreamReadablePriorityAdapter {
990    type Key = StreamPriorityKey;
991
992    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
993        s.clone()
994    }
995}
996
997intrusive_adapter!(pub StreamFlushablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { flushable: RBTreeAtomicLink });
998
999impl KeyAdapter<'_> for StreamFlushablePriorityAdapter {
1000    type Key = StreamPriorityKey;
1001
1002    fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
1003        s.clone()
1004    }
1005}
1006
/// An iterator over QUIC streams.
///
/// Yields stream IDs from a list that is copied when the iterator is built,
/// so it does not borrow the collection it was created from.
#[derive(Default)]
pub struct StreamIter {
    /// The stream IDs to yield; up to 8 are stored inline.
    streams: SmallVec<[u64; 8]>,
    /// Position in `streams` of the next ID to yield.
    index: usize,
}
1013
1014impl StreamIter {
1015    #[inline]
1016    fn from(streams: &StreamIdHashSet) -> Self {
1017        StreamIter {
1018            streams: streams.iter().copied().collect(),
1019            index: 0,
1020        }
1021    }
1022}
1023
1024impl Iterator for StreamIter {
1025    type Item = u64;
1026
1027    #[inline]
1028    fn next(&mut self) -> Option<Self::Item> {
1029        let v = self.streams.get(self.index)?;
1030        self.index += 1;
1031        Some(*v)
1032    }
1033}
1034
1035impl ExactSizeIterator for StreamIter {
1036    #[inline]
1037    fn len(&self) -> usize {
1038        self.streams.len() - self.index
1039    }
1040}
1041
1042#[cfg(test)]
1043mod tests {
1044    use crate::range_buf::RangeBuf;
1045
1046    use super::*;
1047
1048    #[test]
1049    fn recv_flow_control() {
1050        let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1051        assert!(!stream.recv.almost_full());
1052
1053        let mut buf = [0; 32];
1054
1055        let first = RangeBuf::from(b"hello", 0, false);
1056        let second = RangeBuf::from(b"world", 5, false);
1057        let third = RangeBuf::from(b"something", 10, false);
1058
1059        assert_eq!(stream.recv.write(second), Ok(()));
1060        assert_eq!(stream.recv.write(first), Ok(()));
1061        assert!(!stream.recv.almost_full());
1062
1063        assert_eq!(stream.recv.write(third), Err(Error::FlowControl));
1064
1065        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1066        assert_eq!(&buf[..len], b"helloworld");
1067        assert!(!fin);
1068
1069        assert!(stream.recv.almost_full());
1070
1071        stream.recv.update_max_data(std::time::Instant::now());
1072        assert_eq!(stream.recv.max_data_next(), 25);
1073        assert!(!stream.recv.almost_full());
1074
1075        let third = RangeBuf::from(b"something", 10, false);
1076        assert_eq!(stream.recv.write(third), Ok(()));
1077    }
1078
1079    #[test]
1080    fn recv_past_fin() {
1081        let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1082        assert!(!stream.recv.almost_full());
1083
1084        let first = RangeBuf::from(b"hello", 0, true);
1085        let second = RangeBuf::from(b"world", 5, false);
1086
1087        assert_eq!(stream.recv.write(first), Ok(()));
1088        assert_eq!(stream.recv.write(second), Err(Error::FinalSize));
1089    }
1090
1091    #[test]
1092    fn recv_fin_dup() {
1093        let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1094        assert!(!stream.recv.almost_full());
1095
1096        let first = RangeBuf::from(b"hello", 0, true);
1097        let second = RangeBuf::from(b"hello", 0, true);
1098
1099        assert_eq!(stream.recv.write(first), Ok(()));
1100        assert_eq!(stream.recv.write(second), Ok(()));
1101
1102        let mut buf = [0; 32];
1103
1104        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1105        assert_eq!(&buf[..len], b"hello");
1106        assert!(fin);
1107    }
1108
1109    #[test]
1110    fn recv_fin_change() {
1111        let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1112        assert!(!stream.recv.almost_full());
1113
1114        let first = RangeBuf::from(b"hello", 0, true);
1115        let second = RangeBuf::from(b"world", 5, true);
1116
1117        assert_eq!(stream.recv.write(second), Ok(()));
1118        assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
1119    }
1120
1121    #[test]
1122    fn recv_fin_lower_than_received() {
1123        let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1124        assert!(!stream.recv.almost_full());
1125
1126        let first = RangeBuf::from(b"hello", 0, true);
1127        let second = RangeBuf::from(b"world", 5, false);
1128
1129        assert_eq!(stream.recv.write(second), Ok(()));
1130        assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
1131    }
1132
1133    #[test]
1134    fn recv_fin_flow_control() {
1135        let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1136        assert!(!stream.recv.almost_full());
1137
1138        let mut buf = [0; 32];
1139
1140        let first = RangeBuf::from(b"hello", 0, false);
1141        let second = RangeBuf::from(b"world", 5, true);
1142
1143        assert_eq!(stream.recv.write(first), Ok(()));
1144        assert_eq!(stream.recv.write(second), Ok(()));
1145
1146        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1147        assert_eq!(&buf[..len], b"helloworld");
1148        assert!(fin);
1149
1150        assert!(!stream.recv.almost_full());
1151    }
1152
1153    #[test]
1154    fn recv_fin_reset_mismatch() {
1155        let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1156        assert!(!stream.recv.almost_full());
1157
1158        let first = RangeBuf::from(b"hello", 0, true);
1159
1160        assert_eq!(stream.recv.write(first), Ok(()));
1161        assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
1162    }
1163
1164    #[test]
1165    fn recv_reset_with_gap() {
1166        let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1167        assert!(!stream.recv.almost_full());
1168
1169        let first = RangeBuf::from(b"hello", 0, false);
1170
1171        assert_eq!(stream.recv.write(first), Ok(()));
1172        // Read one byte.
1173        assert_eq!(stream.recv.emit(&mut [0; 1]), Ok((1, false)));
1174        // Reset with a final size > than max previously received
1175        assert_eq!(
1176            stream.recv.reset(0, 10),
1177            Ok(RecvBufResetReturn {
1178                max_data_delta: 5,
1179                // consumed_flowcontrol is 9, since we already read 1 byte
1180                consumed_flowcontrol: 9
1181            })
1182        );
1183        assert_eq!(stream.recv.reset(0, 10), Ok(RecvBufResetReturn::zero()));
1184    }
1185
1186    #[test]
1187    fn recv_reset_dup() {
1188        let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1189        assert!(!stream.recv.almost_full());
1190
1191        let first = RangeBuf::from(b"hello", 0, false);
1192
1193        assert_eq!(stream.recv.write(first), Ok(()));
1194        assert_eq!(
1195            stream.recv.reset(0, 5),
1196            Ok(RecvBufResetReturn {
1197                max_data_delta: 0,
1198                consumed_flowcontrol: 5
1199            })
1200        );
1201        assert_eq!(stream.recv.reset(0, 5), Ok(RecvBufResetReturn::zero()));
1202    }
1203
1204    #[test]
1205    fn recv_reset_change() {
1206        let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1207        assert!(!stream.recv.almost_full());
1208
1209        let first = RangeBuf::from(b"hello", 0, false);
1210
1211        assert_eq!(stream.recv.write(first), Ok(()));
1212        assert_eq!(
1213            stream.recv.reset(0, 5),
1214            Ok(RecvBufResetReturn {
1215                max_data_delta: 0,
1216                consumed_flowcontrol: 5
1217            })
1218        );
1219        assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
1220    }
1221
1222    #[test]
1223    fn recv_reset_lower_than_received() {
1224        let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1225        assert!(!stream.recv.almost_full());
1226
1227        let first = RangeBuf::from(b"hello", 0, false);
1228
1229        assert_eq!(stream.recv.write(first), Ok(()));
1230        assert_eq!(stream.recv.reset(0, 4), Err(Error::FinalSize));
1231    }
1232
    // Checks that the send side never emits past the send limit (15 bytes
    // here) and that retransmitted data is emitted again from the start.
    #[test]
    fn send_flow_control() {
        let mut buf = [0; 25];

        let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);

        let first = b"hello";
        let second = b"world";
        let third = b"something";

        // 19 bytes are offered in total; only the results being `Ok` is
        // checked here, not how many bytes each write accepted.
        assert!(stream.send.write(first, false).is_ok());
        assert!(stream.send.write(second, false).is_ok());
        assert!(stream.send.write(third, false).is_ok());

        assert_eq!(stream.send.off_front(), 0);

        // Even with room for 25 bytes, only 15 are emitted.
        let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
        assert_eq!(written, 15);
        assert!(!fin);
        assert_eq!(&buf[..written], b"helloworldsomet");

        assert_eq!(stream.send.off_front(), 15);

        // Nothing more can be emitted.
        let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
        assert_eq!(written, 0);
        assert!(!fin);
        assert_eq!(&buf[..written], b"");

        // Marking everything for retransmission rewinds the send cursor.
        stream.send.retransmit(0, 15);

        assert_eq!(stream.send.off_front(), 0);

        let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
        assert_eq!(written, 10);
        assert!(!fin);
        assert_eq!(&buf[..written], b"helloworld");

        assert_eq!(stream.send.off_front(), 10);

        // Only the remaining 5 bytes are emitted into the 10-byte output.
        let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
        assert_eq!(written, 5);
        assert!(!fin);
        assert_eq!(&buf[..written], b"somet");
    }
1277
1278    #[test]
1279    fn send_past_fin() {
1280        let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);
1281
1282        let first = b"hello";
1283        let second = b"world";
1284        let third = b"third";
1285
1286        assert_eq!(stream.send.write(first, false), Ok(5));
1287
1288        assert_eq!(stream.send.write(second, true), Ok(5));
1289        assert!(stream.send.is_fin());
1290
1291        assert_eq!(stream.send.write(third, false), Err(Error::FinalSize));
1292    }
1293
1294    #[test]
1295    fn send_fin_dup() {
1296        let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);
1297
1298        assert_eq!(stream.send.write(b"hello", true), Ok(5));
1299        assert!(stream.send.is_fin());
1300
1301        assert_eq!(stream.send.write(b"", true), Ok(0));
1302        assert!(stream.send.is_fin());
1303    }
1304
1305    #[test]
1306    fn send_undo_fin() {
1307        let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);
1308
1309        assert_eq!(stream.send.write(b"hello", true), Ok(5));
1310        assert!(stream.send.is_fin());
1311
1312        assert_eq!(
1313            stream.send.write(b"helloworld", true),
1314            Err(Error::FinalSize)
1315        );
1316    }
1317
1318    #[test]
1319    fn send_fin_max_data_match() {
1320        let mut buf = [0; 15];
1321
1322        let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);
1323
1324        let slice = b"hellohellohello";
1325
1326        assert!(stream.send.write(slice, true).is_ok());
1327
1328        let (written, fin) = stream.send.emit(&mut buf[..15]).unwrap();
1329        assert_eq!(written, 15);
1330        assert!(fin);
1331        assert_eq!(&buf[..written], slice);
1332    }
1333
1334    #[test]
1335    fn send_fin_zero_length() {
1336        let mut buf = [0; 5];
1337
1338        let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);
1339
1340        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1341        assert_eq!(stream.send.write(b"", true), Ok(0));
1342        assert!(stream.send.is_fin());
1343
1344        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
1345        assert_eq!(written, 5);
1346        assert!(fin);
1347        assert_eq!(&buf[..written], b"hello");
1348    }
1349
    // Checks that a range which was already acked and dropped is not sent
    // again when a retransmission of it is requested.
    #[test]
    fn send_ack() {
        let mut buf = [0; 5];

        let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"world", false), Ok(5));
        assert_eq!(stream.send.write(b"", true), Ok(0));
        assert!(stream.send.is_fin());

        assert_eq!(stream.send.off_front(), 0);

        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
        assert_eq!(written, 5);
        assert!(!fin);
        assert_eq!(&buf[..written], b"hello");

        // Ack the first five bytes, then ask for them to be retransmitted.
        stream.send.ack_and_drop(0, 5);

        stream.send.retransmit(0, 5);

        // The send cursor did not move back to the acked range.
        assert_eq!(stream.send.off_front(), 5);

        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
        assert_eq!(written, 5);
        assert!(fin);
        assert_eq!(&buf[..written], b"world");
    }
1379
    // Same as `send_ack`, but the acks arrive out of order: the later range
    // (5..6) is acked before the earlier one (0..5).
    #[test]
    fn send_ack_reordering() {
        let mut buf = [0; 5];

        let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"world", false), Ok(5));
        assert_eq!(stream.send.write(b"", true), Ok(0));
        assert!(stream.send.is_fin());

        assert_eq!(stream.send.off_front(), 0);

        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
        assert_eq!(written, 5);
        assert!(!fin);
        assert_eq!(&buf[..written], b"hello");

        assert_eq!(stream.send.off_front(), 5);

        // Emit a single extra byte so that six bytes are in flight.
        let (written, fin) = stream.send.emit(&mut buf[..1]).unwrap();
        assert_eq!(written, 1);
        assert!(!fin);
        assert_eq!(&buf[..written], b"w");

        // Acks arrive in reverse order.
        stream.send.ack_and_drop(5, 1);
        stream.send.ack_and_drop(0, 5);

        // Retransmission requests for the acked ranges have no effect on
        // the send cursor.
        stream.send.retransmit(0, 5);
        stream.send.retransmit(5, 1);

        assert_eq!(stream.send.off_front(), 6);

        // Only the four bytes that were never sent remain.
        let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
        assert_eq!(written, 4);
        assert!(fin);
        assert_eq!(&buf[..written], b"orld");
    }
1418
1419    #[test]
1420    fn recv_data_below_off() {
1421        let mut stream = <Stream>::new(0, 15, 0, true, 15, DEFAULT_STREAM_WINDOW);
1422
1423        let first = RangeBuf::from(b"hello", 0, false);
1424
1425        assert_eq!(stream.recv.write(first), Ok(()));
1426
1427        let mut buf = [0; 10];
1428
1429        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1430        assert_eq!(&buf[..len], b"hello");
1431        assert!(!fin);
1432
1433        let first = RangeBuf::from(b"elloworld", 1, true);
1434        assert_eq!(stream.recv.write(first), Ok(()));
1435
1436        let (len, fin) = stream.recv.emit(&mut buf).unwrap();
1437        assert_eq!(&buf[..len], b"world");
1438        assert!(fin);
1439    }
1440
    // Walks a bidirectional stream to completion: the send side completes
    // once every written byte has been acked, the receive side once the fin
    // has been read, and the stream only when both have happened.
    #[test]
    fn stream_complete() {
        let mut stream =
            <Stream>::new(0, 30, 30, true, 30, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"world", false), Ok(5));

        assert!(!stream.send.is_complete());
        assert!(!stream.send.is_fin());

        assert_eq!(stream.send.write(b"", true), Ok(0));

        // Writing the fin is not enough to complete the send side.
        assert!(!stream.send.is_complete());
        assert!(stream.send.is_fin());

        // Receiving the fin is not enough either: it has to be read first.
        let buf = RangeBuf::from(b"hello", 0, true);
        assert!(stream.recv.write(buf).is_ok());
        assert!(!stream.recv.is_fin());

        // Ack the sent data in three out-of-order pieces.
        stream.send.ack(6, 4);
        assert!(!stream.send.is_complete());

        let mut buf = [0; 2];
        assert_eq!(stream.recv.emit(&mut buf), Ok((2, false)));
        assert!(!stream.recv.is_fin());

        stream.send.ack(1, 5);
        assert!(!stream.send.is_complete());

        stream.send.ack(0, 1);
        assert!(stream.send.is_complete());

        // The send side is done, but the receive side still has data.
        assert!(!stream.is_complete());

        let mut buf = [0; 3];
        assert_eq!(stream.recv.emit(&mut buf), Ok((3, true)));
        assert!(stream.recv.is_fin());

        assert!(stream.is_complete());
    }
1482
1483    #[test]
1484    fn send_fin_zero_length_output() {
1485        let mut buf = [0; 5];
1486
1487        let mut stream = <Stream>::new(0, 0, 15, true, 0, DEFAULT_STREAM_WINDOW);
1488
1489        assert_eq!(stream.send.write(b"hello", false), Ok(5));
1490        assert_eq!(stream.send.off_front(), 0);
1491        assert!(!stream.send.is_fin());
1492
1493        let (written, fin) = stream.send.emit(&mut buf).unwrap();
1494        assert_eq!(written, 5);
1495        assert!(!fin);
1496        assert_eq!(&buf[..written], b"hello");
1497
1498        assert_eq!(stream.send.write(b"", true), Ok(0));
1499        assert!(stream.send.is_fin());
1500        assert_eq!(stream.send.off_front(), 5);
1501
1502        let (written, fin) = stream.send.emit(&mut buf).unwrap();
1503        assert_eq!(written, 0);
1504        assert!(fin);
1505        assert_eq!(&buf[..written], b"");
1506    }
1507
1508    fn stream_send_ready(stream: &Stream) -> bool {
1509        !stream.send.is_empty() &&
1510            stream.send.off_front() < stream.send.off_back()
1511    }
1512
    // Emits 20 bytes written as four 5-byte buffers using output slices of
    // varying sizes, checking the send cursor and the emitted bytes after
    // every step, including emits that cross buffer boundaries.
    #[test]
    fn send_emit() {
        let mut buf = [0; 5];

        let mut stream = <Stream>::new(0, 0, 20, true, 0, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"world", false), Ok(5));
        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
        assert_eq!(stream.send.off_front(), 0);
        assert_eq!(stream.send.bufs_count(), 4);

        assert!(stream.is_flushable());

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
        assert_eq!(stream.send.off_front(), 4);
        assert_eq!(&buf[..4], b"hell");

        // This emit spans the end of "hello" and the start of "world".
        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
        assert_eq!(stream.send.off_front(), 8);
        assert_eq!(&buf[..4], b"owor");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
        assert_eq!(stream.send.off_front(), 10);
        assert_eq!(&buf[..2], b"ld");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
        assert_eq!(stream.send.off_front(), 11);
        assert_eq!(&buf[..1], b"o");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
        assert_eq!(stream.send.off_front(), 16);
        assert_eq!(&buf[..5], b"llehd");

        // The last four bytes are emitted together with the fin flag.
        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
        assert_eq!(stream.send.off_front(), 20);
        assert_eq!(&buf[..4], b"lrow");

        assert!(!stream.is_flushable());

        // With nothing left, emit yields zero bytes and still reports fin.
        assert!(!stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
        assert_eq!(stream.send.off_front(), 20);
    }
1564
    // Same emit sequence as `send_emit`, interleaved with acks, checking
    // how many buffers the send side still holds after each ack.
    #[test]
    fn send_emit_ack() {
        let mut buf = [0; 5];

        let mut stream = <Stream>::new(0, 0, 20, true, 0, DEFAULT_STREAM_WINDOW);

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"world", false), Ok(5));
        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
        assert_eq!(stream.send.off_front(), 0);
        assert_eq!(stream.send.bufs_count(), 4);

        assert!(stream.is_flushable());

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
        assert_eq!(stream.send.off_front(), 4);
        assert_eq!(&buf[..4], b"hell");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
        assert_eq!(stream.send.off_front(), 8);
        assert_eq!(&buf[..4], b"owor");

        // Acking the whole first buffer (0..5) releases it.
        stream.send.ack_and_drop(0, 5);
        assert_eq!(stream.send.bufs_count(), 3);

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
        assert_eq!(stream.send.off_front(), 10);
        assert_eq!(&buf[..2], b"ld");

        // Acking 7..12 leaves the buffer count unchanged.
        stream.send.ack_and_drop(7, 5);
        assert_eq!(stream.send.bufs_count(), 3);

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
        assert_eq!(stream.send.off_front(), 11);
        assert_eq!(&buf[..1], b"o");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
        assert_eq!(stream.send.off_front(), 16);
        assert_eq!(&buf[..5], b"llehd");

        // Acking 5..12 releases one more buffer.
        stream.send.ack_and_drop(5, 7);
        assert_eq!(stream.send.bufs_count(), 2);

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
        assert_eq!(stream.send.off_front(), 20);
        assert_eq!(&buf[..4], b"lrow");

        assert!(!stream.is_flushable());

        assert!(!stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
        assert_eq!(stream.send.off_front(), 20);

        // Acks at or beyond offset 20, the end of the written data, leave
        // the buffer count unchanged.
        stream.send.ack_and_drop(22, 4);
        assert_eq!(stream.send.bufs_count(), 2);

        stream.send.ack_and_drop(20, 1);
        assert_eq!(stream.send.bufs_count(), 2);
    }
1631
    // Interleaves emits with retransmission requests and acks, checking
    // that retransmitted ranges are emitted again before new data and that
    // acked ranges are skipped.
    #[test]
    fn send_emit_retransmit() {
        let mut buf = [0; 5];

        let mut stream = <Stream>::new(
            0,
            0,
            20,
            true,
            DEFAULT_STREAM_WINDOW,
            DEFAULT_STREAM_WINDOW,
        );

        assert_eq!(stream.send.write(b"hello", false), Ok(5));
        assert_eq!(stream.send.write(b"world", false), Ok(5));
        assert_eq!(stream.send.write(b"olleh", false), Ok(5));
        assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
        assert_eq!(stream.send.off_front(), 0);
        assert_eq!(stream.send.bufs_count(), 4);

        assert!(stream.is_flushable());

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
        assert_eq!(stream.send.off_front(), 4);
        assert_eq!(&buf[..4], b"hell");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
        assert_eq!(stream.send.off_front(), 8);
        assert_eq!(&buf[..4], b"owor");

        // Retransmit 3..6: the cursor moves back to the start of the range.
        stream.send.retransmit(3, 3);
        assert_eq!(stream.send.off_front(), 3);

        // After re-emitting the range, the cursor returns to offset 8.
        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..3]), Ok((3, false)));
        assert_eq!(stream.send.off_front(), 8);
        assert_eq!(&buf[..3], b"low");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
        assert_eq!(stream.send.off_front(), 10);
        assert_eq!(&buf[..2], b"ld");

        // Ack 7..9, then request a retransmission of 8..10.
        stream.send.ack_and_drop(7, 2);

        stream.send.retransmit(8, 2);

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
        assert_eq!(stream.send.off_front(), 10);
        assert_eq!(&buf[..2], b"ld");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
        assert_eq!(stream.send.off_front(), 11);
        assert_eq!(&buf[..1], b"o");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
        assert_eq!(stream.send.off_front(), 16);
        assert_eq!(&buf[..5], b"llehd");

        // Retransmit 12..14 from the middle of the third buffer.
        stream.send.retransmit(12, 2);

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
        assert_eq!(stream.send.off_front(), 16);
        assert_eq!(&buf[..2], b"le");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
        assert_eq!(stream.send.off_front(), 20);
        assert_eq!(&buf[..4], b"lrow");

        assert!(!stream.is_flushable());

        assert!(!stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
        assert_eq!(stream.send.off_front(), 20);

        // Retransmit 7..19, a range that spans several buffers.
        stream.send.retransmit(7, 12);

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
        assert_eq!(stream.send.off_front(), 12);
        assert_eq!(&buf[..5], b"rldol");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
        assert_eq!(stream.send.off_front(), 17);
        assert_eq!(&buf[..5], b"lehdl");

        // The retransmitted range ends at 19, so no fin is reported here.
        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
        assert_eq!(stream.send.off_front(), 20);
        assert_eq!(&buf[..2], b"ro");

        // Ack 12..19 and then request the same retransmission again.
        stream.send.ack_and_drop(12, 7);

        stream.send.retransmit(7, 12);

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
        assert_eq!(stream.send.off_front(), 12);
        assert_eq!(&buf[..5], b"rldol");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
        assert_eq!(stream.send.off_front(), 17);
        assert_eq!(&buf[..5], b"lehdl");

        assert!(stream_send_ready(&stream));
        assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
        assert_eq!(stream.send.off_front(), 20);
        assert_eq!(&buf[..2], b"ro");
    }
1750
1751    #[test]
1752    fn rangebuf_split_off() {
1753        let mut buf = <RangeBuf>::from(b"helloworld", 5, true);
1754        assert_eq!(buf.start, 0);
1755        assert_eq!(buf.pos, 0);
1756        assert_eq!(buf.len, 10);
1757        assert_eq!(buf.off, 5);
1758        assert!(buf.fin);
1759
1760        assert_eq!(buf.len(), 10);
1761        assert_eq!(buf.off(), 5);
1762        assert!(buf.fin());
1763
1764        assert_eq!(&buf[..], b"helloworld");
1765
1766        // Advance buffer.
1767        buf.consume(5);
1768
1769        assert_eq!(buf.start, 0);
1770        assert_eq!(buf.pos, 5);
1771        assert_eq!(buf.len, 10);
1772        assert_eq!(buf.off, 5);
1773        assert!(buf.fin);
1774
1775        assert_eq!(buf.len(), 5);
1776        assert_eq!(buf.off(), 10);
1777        assert!(buf.fin());
1778
1779        assert_eq!(&buf[..], b"world");
1780
1781        // Split buffer before position.
1782        let mut new_buf = buf.split_off(3);
1783
1784        assert_eq!(buf.start, 0);
1785        assert_eq!(buf.pos, 3);
1786        assert_eq!(buf.len, 3);
1787        assert_eq!(buf.off, 5);
1788        assert!(!buf.fin);
1789
1790        assert_eq!(buf.len(), 0);
1791        assert_eq!(buf.off(), 8);
1792        assert!(!buf.fin());
1793
1794        assert_eq!(&buf[..], b"");
1795
1796        assert_eq!(new_buf.start, 3);
1797        assert_eq!(new_buf.pos, 5);
1798        assert_eq!(new_buf.len, 7);
1799        assert_eq!(new_buf.off, 8);
1800        assert!(new_buf.fin);
1801
1802        assert_eq!(new_buf.len(), 5);
1803        assert_eq!(new_buf.off(), 10);
1804        assert!(new_buf.fin());
1805
1806        assert_eq!(&new_buf[..], b"world");
1807
1808        // Advance buffer.
1809        new_buf.consume(2);
1810
1811        assert_eq!(new_buf.start, 3);
1812        assert_eq!(new_buf.pos, 7);
1813        assert_eq!(new_buf.len, 7);
1814        assert_eq!(new_buf.off, 8);
1815        assert!(new_buf.fin);
1816
1817        assert_eq!(new_buf.len(), 3);
1818        assert_eq!(new_buf.off(), 12);
1819        assert!(new_buf.fin());
1820
1821        assert_eq!(&new_buf[..], b"rld");
1822
1823        // Split buffer after position.
1824        let mut new_new_buf = new_buf.split_off(5);
1825
1826        assert_eq!(new_buf.start, 3);
1827        assert_eq!(new_buf.pos, 7);
1828        assert_eq!(new_buf.len, 5);
1829        assert_eq!(new_buf.off, 8);
1830        assert!(!new_buf.fin);
1831
1832        assert_eq!(new_buf.len(), 1);
1833        assert_eq!(new_buf.off(), 12);
1834        assert!(!new_buf.fin());
1835
1836        assert_eq!(&new_buf[..], b"r");
1837
1838        assert_eq!(new_new_buf.start, 8);
1839        assert_eq!(new_new_buf.pos, 8);
1840        assert_eq!(new_new_buf.len, 2);
1841        assert_eq!(new_new_buf.off, 13);
1842        assert!(new_new_buf.fin);
1843
1844        assert_eq!(new_new_buf.len(), 2);
1845        assert_eq!(new_new_buf.off(), 13);
1846        assert!(new_new_buf.fin());
1847
1848        assert_eq!(&new_new_buf[..], b"ld");
1849
1850        // Advance buffer.
1851        new_new_buf.consume(2);
1852
1853        assert_eq!(new_new_buf.start, 8);
1854        assert_eq!(new_new_buf.pos, 10);
1855        assert_eq!(new_new_buf.len, 2);
1856        assert_eq!(new_new_buf.off, 13);
1857        assert!(new_new_buf.fin);
1858
1859        assert_eq!(new_new_buf.len(), 0);
1860        assert_eq!(new_new_buf.off(), 15);
1861        assert!(new_new_buf.fin());
1862
1863        assert_eq!(&new_new_buf[..], b"");
1864    }
1865
1866    /// RFC9000 2.1: A stream ID that is used out of order results in all
1867    /// streams of that type with lower-numbered stream IDs also being opened.
1868    #[test]
1869    fn stream_limit_auto_open() {
1870        let local_tp = crate::TransportParams::default();
1871        let peer_tp = crate::TransportParams::default();
1872
1873        let mut streams = <StreamMap>::new(5, 5, 5);
1874
1875        let stream_id = 500;
1876        assert!(!is_local(stream_id, true), "stream id is peer initiated");
1877        assert!(is_bidi(stream_id), "stream id is bidirectional");
1878        assert_eq!(
1879            streams
1880                .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1881                .err(),
1882            Some(Error::StreamLimit),
1883            "stream limit should be exceeded"
1884        );
1885    }
1886
1887    /// Stream limit should be satisfied regardless of what order we open
1888    /// streams
1889    #[test]
1890    fn stream_create_out_of_order() {
1891        let local_tp = crate::TransportParams::default();
1892        let peer_tp = crate::TransportParams::default();
1893
1894        let mut streams = <StreamMap>::new(5, 5, 5);
1895
1896        for stream_id in [8, 12, 4] {
1897            assert!(is_local(stream_id, false), "stream id is client initiated");
1898            assert!(is_bidi(stream_id), "stream id is bidirectional");
1899            assert!(streams
1900                .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1901                .is_ok());
1902        }
1903    }
1904
1905    /// Check stream limit boundary cases
1906    #[test]
1907    fn stream_limit_edge() {
1908        let local_tp = crate::TransportParams::default();
1909        let peer_tp = crate::TransportParams::default();
1910
1911        let mut streams = <StreamMap>::new(3, 3, 3);
1912
1913        // Highest permitted
1914        let stream_id = 8;
1915        assert!(streams
1916            .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1917            .is_ok());
1918
1919        // One more than highest permitted
1920        let stream_id = 12;
1921        assert_eq!(
1922            streams
1923                .get_or_create(stream_id, &local_tp, &peer_tp, false, true)
1924                .err(),
1925            Some(Error::StreamLimit)
1926        );
1927    }
1928
1929    fn cycle_stream_priority(stream_id: u64, streams: &mut StreamMap) {
1930        let key = streams.get(stream_id).unwrap().priority_key.clone();
1931        streams.update_priority(&key.clone(), &key);
1932    }
1933
1934    #[test]
1935    fn writable_prioritized_default_priority() {
1936        let local_tp = crate::TransportParams::default();
1937        let peer_tp = crate::TransportParams {
1938            initial_max_stream_data_bidi_local: 100,
1939            initial_max_stream_data_uni: 100,
1940            ..Default::default()
1941        };
1942
1943        let mut streams = StreamMap::new(100, 100, 100);
1944
1945        for id in [0, 4, 8, 12] {
1946            assert!(streams
1947                .get_or_create(id, &local_tp, &peer_tp, false, true)
1948                .is_ok());
1949        }
1950
1951        let walk_1: Vec<u64> = streams.writable().collect();
1952        cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1953        let walk_2: Vec<u64> = streams.writable().collect();
1954        cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1955        let walk_3: Vec<u64> = streams.writable().collect();
1956        cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1957        let walk_4: Vec<u64> = streams.writable().collect();
1958        cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1959        let walk_5: Vec<u64> = streams.writable().collect();
1960
1961        // All streams are non-incremental and same urgency by default. Multiple
1962        // visits shuffle their order.
1963        assert_eq!(walk_1, vec![0, 4, 8, 12]);
1964        assert_eq!(walk_2, vec![4, 8, 12, 0]);
1965        assert_eq!(walk_3, vec![8, 12, 0, 4]);
1966        assert_eq!(walk_4, vec![12, 0, 4, 8,]);
1967        assert_eq!(walk_5, vec![0, 4, 8, 12]);
1968    }
1969
1970    #[test]
1971    fn writable_prioritized_insert_order() {
1972        let local_tp = crate::TransportParams::default();
1973        let peer_tp = crate::TransportParams {
1974            initial_max_stream_data_bidi_local: 100,
1975            initial_max_stream_data_uni: 100,
1976            ..Default::default()
1977        };
1978
1979        let mut streams = StreamMap::new(100, 100, 100);
1980
1981        // Inserting same-urgency incremental streams in a "random" order yields
1982        // same order to start with.
1983        for id in [12, 4, 8, 0] {
1984            assert!(streams
1985                .get_or_create(id, &local_tp, &peer_tp, false, true)
1986                .is_ok());
1987        }
1988
1989        let walk_1: Vec<u64> = streams.writable().collect();
1990        cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
1991        let walk_2: Vec<u64> = streams.writable().collect();
1992        cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
1993        let walk_3: Vec<u64> = streams.writable().collect();
1994        cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
1995        let walk_4: Vec<u64> = streams.writable().collect();
1996        cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
1997        let walk_5: Vec<u64> = streams.writable().collect();
1998        assert_eq!(walk_1, vec![12, 4, 8, 0]);
1999        assert_eq!(walk_2, vec![4, 8, 0, 12]);
2000        assert_eq!(walk_3, vec![8, 0, 12, 4,]);
2001        assert_eq!(walk_4, vec![0, 12, 4, 8]);
2002        assert_eq!(walk_5, vec![12, 4, 8, 0]);
2003    }
2004
2005    #[test]
2006    fn writable_prioritized_mixed_urgency() {
2007        let local_tp = crate::TransportParams::default();
2008        let peer_tp = crate::TransportParams {
2009            initial_max_stream_data_bidi_local: 100,
2010            initial_max_stream_data_uni: 100,
2011            ..Default::default()
2012        };
2013
2014        let mut streams = <StreamMap>::new(100, 100, 100);
2015
2016        // Streams where the urgency descends (becomes more important). No stream
2017        // shares an urgency.
2018        let input = vec![
2019            (0, 100),
2020            (4, 90),
2021            (8, 80),
2022            (12, 70),
2023            (16, 60),
2024            (20, 50),
2025            (24, 40),
2026            (28, 30),
2027            (32, 20),
2028            (36, 10),
2029            (40, 0),
2030        ];
2031
2032        for (id, urgency) in input.clone() {
2033            // this duplicates some code from stream_priority in order to access
2034            // streams and the collection they're in
2035            let stream = streams
2036                .get_or_create(id, &local_tp, &peer_tp, false, true)
2037                .unwrap();
2038
2039            stream.urgency = urgency;
2040
2041            let new_priority_key = Arc::new(StreamPriorityKey {
2042                urgency: stream.urgency,
2043                incremental: stream.incremental,
2044                id,
2045                ..Default::default()
2046            });
2047
2048            let old_priority_key = std::mem::replace(
2049                &mut stream.priority_key,
2050                new_priority_key.clone(),
2051            );
2052
2053            streams.update_priority(&old_priority_key, &new_priority_key);
2054        }
2055
2056        let walk_1: Vec<u64> = streams.writable().collect();
2057        assert_eq!(walk_1, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
2058
2059        // Re-applying priority to a stream does not cause duplication.
2060        for (id, urgency) in input {
2061            // this duplicates some code from stream_priority in order to access
2062            // streams and the collection they're in
2063            let stream = streams
2064                .get_or_create(id, &local_tp, &peer_tp, false, true)
2065                .unwrap();
2066
2067            stream.urgency = urgency;
2068
2069            let new_priority_key = Arc::new(StreamPriorityKey {
2070                urgency: stream.urgency,
2071                incremental: stream.incremental,
2072                id,
2073                ..Default::default()
2074            });
2075
2076            let old_priority_key = std::mem::replace(
2077                &mut stream.priority_key,
2078                new_priority_key.clone(),
2079            );
2080
2081            streams.update_priority(&old_priority_key, &new_priority_key);
2082        }
2083
2084        let walk_2: Vec<u64> = streams.writable().collect();
2085        assert_eq!(walk_2, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
2086
2087        // Removing streams doesn't break expected ordering.
2088        streams.collect(24, true);
2089
2090        let walk_3: Vec<u64> = streams.writable().collect();
2091        assert_eq!(walk_3, vec![40, 36, 32, 28, 20, 16, 12, 8, 4, 0]);
2092
2093        streams.collect(40, true);
2094        streams.collect(0, true);
2095
2096        let walk_4: Vec<u64> = streams.writable().collect();
2097        assert_eq!(walk_4, vec![36, 32, 28, 20, 16, 12, 8, 4]);
2098
2099        // Adding streams doesn't break expected ordering.
2100        streams
2101            .get_or_create(44, &local_tp, &peer_tp, false, true)
2102            .unwrap();
2103
2104        let walk_5: Vec<u64> = streams.writable().collect();
2105        assert_eq!(walk_5, vec![36, 32, 28, 20, 16, 12, 8, 4, 44]);
2106    }
2107
2108    #[test]
2109    fn writable_prioritized_mixed_urgencies_incrementals() {
2110        let local_tp = crate::TransportParams::default();
2111        let peer_tp = crate::TransportParams {
2112            initial_max_stream_data_bidi_local: 100,
2113            initial_max_stream_data_uni: 100,
2114            ..Default::default()
2115        };
2116
2117        let mut streams = StreamMap::new(100, 100, 100);
2118
2119        // Streams that share some urgency level
2120        let input = vec![
2121            (0, 100),
2122            (4, 20),
2123            (8, 100),
2124            (12, 20),
2125            (16, 90),
2126            (20, 25),
2127            (24, 90),
2128            (28, 30),
2129            (32, 80),
2130            (36, 20),
2131            (40, 0),
2132        ];
2133
2134        for (id, urgency) in input.clone() {
2135            // this duplicates some code from stream_priority in order to access
2136            // streams and the collection they're in
2137            let stream = streams
2138                .get_or_create(id, &local_tp, &peer_tp, false, true)
2139                .unwrap();
2140
2141            stream.urgency = urgency;
2142
2143            let new_priority_key = Arc::new(StreamPriorityKey {
2144                urgency: stream.urgency,
2145                incremental: stream.incremental,
2146                id,
2147                ..Default::default()
2148            });
2149
2150            let old_priority_key = std::mem::replace(
2151                &mut stream.priority_key,
2152                new_priority_key.clone(),
2153            );
2154
2155            streams.update_priority(&old_priority_key, &new_priority_key);
2156        }
2157
2158        let walk_1: Vec<u64> = streams.writable().collect();
2159        cycle_stream_priority(4, &mut streams);
2160        cycle_stream_priority(16, &mut streams);
2161        cycle_stream_priority(0, &mut streams);
2162        let walk_2: Vec<u64> = streams.writable().collect();
2163        cycle_stream_priority(12, &mut streams);
2164        cycle_stream_priority(24, &mut streams);
2165        cycle_stream_priority(8, &mut streams);
2166        let walk_3: Vec<u64> = streams.writable().collect();
2167        cycle_stream_priority(36, &mut streams);
2168        cycle_stream_priority(16, &mut streams);
2169        cycle_stream_priority(0, &mut streams);
2170        let walk_4: Vec<u64> = streams.writable().collect();
2171        cycle_stream_priority(4, &mut streams);
2172        cycle_stream_priority(24, &mut streams);
2173        cycle_stream_priority(8, &mut streams);
2174        let walk_5: Vec<u64> = streams.writable().collect();
2175        cycle_stream_priority(12, &mut streams);
2176        cycle_stream_priority(16, &mut streams);
2177        cycle_stream_priority(0, &mut streams);
2178        let walk_6: Vec<u64> = streams.writable().collect();
2179        cycle_stream_priority(36, &mut streams);
2180        cycle_stream_priority(24, &mut streams);
2181        cycle_stream_priority(8, &mut streams);
2182        let walk_7: Vec<u64> = streams.writable().collect();
2183        cycle_stream_priority(4, &mut streams);
2184        cycle_stream_priority(16, &mut streams);
2185        cycle_stream_priority(0, &mut streams);
2186        let walk_8: Vec<u64> = streams.writable().collect();
2187        cycle_stream_priority(12, &mut streams);
2188        cycle_stream_priority(24, &mut streams);
2189        cycle_stream_priority(8, &mut streams);
2190        let walk_9: Vec<u64> = streams.writable().collect();
2191        cycle_stream_priority(36, &mut streams);
2192        cycle_stream_priority(16, &mut streams);
2193        cycle_stream_priority(0, &mut streams);
2194
2195        assert_eq!(walk_1, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2196        assert_eq!(walk_2, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2197        assert_eq!(walk_3, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2198        assert_eq!(walk_4, vec![40, 4, 12, 36, 20, 28, 32, 24, 16, 8, 0]);
2199        assert_eq!(walk_5, vec![40, 12, 36, 4, 20, 28, 32, 16, 24, 0, 8]);
2200        assert_eq!(walk_6, vec![40, 36, 4, 12, 20, 28, 32, 24, 16, 8, 0]);
2201        assert_eq!(walk_7, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
2202        assert_eq!(walk_8, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
2203        assert_eq!(walk_9, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
2204
2205        // Removing streams doesn't break expected ordering.
2206        streams.collect(20, true);
2207
2208        let walk_10: Vec<u64> = streams.writable().collect();
2209        assert_eq!(walk_10, vec![40, 4, 12, 36, 28, 32, 24, 16, 8, 0]);
2210
2211        // Adding streams doesn't break expected ordering.
2212        let stream = streams
2213            .get_or_create(44, &local_tp, &peer_tp, false, true)
2214            .unwrap();
2215
2216        stream.urgency = 20;
2217        stream.incremental = true;
2218
2219        let new_priority_key = Arc::new(StreamPriorityKey {
2220            urgency: stream.urgency,
2221            incremental: stream.incremental,
2222            id: 44,
2223            ..Default::default()
2224        });
2225
2226        let old_priority_key =
2227            std::mem::replace(&mut stream.priority_key, new_priority_key.clone());
2228
2229        streams.update_priority(&old_priority_key, &new_priority_key);
2230
2231        let walk_11: Vec<u64> = streams.writable().collect();
2232        assert_eq!(walk_11, vec![40, 4, 12, 36, 44, 28, 32, 24, 16, 8, 0]);
2233    }
2234
2235    #[test]
2236    fn priority_tree_dupes() {
2237        let mut prioritized_writable: RBTree<StreamWritablePriorityAdapter> =
2238            Default::default();
2239
2240        for id in [0, 4, 8, 12] {
2241            let s = Arc::new(StreamPriorityKey {
2242                urgency: 0,
2243                incremental: false,
2244                id,
2245                ..Default::default()
2246            });
2247
2248            prioritized_writable.insert(s);
2249        }
2250
2251        let walk_1: Vec<u64> =
2252            prioritized_writable.iter().map(|s| s.id).collect();
2253        assert_eq!(walk_1, vec![0, 4, 8, 12]);
2254
2255        // Default keys could cause duplicate entries, this is normally protected
2256        // against via StreamMap.
2257        for id in [0, 4, 8, 12] {
2258            let s = Arc::new(StreamPriorityKey {
2259                urgency: 0,
2260                incremental: false,
2261                id,
2262                ..Default::default()
2263            });
2264
2265            prioritized_writable.insert(s);
2266        }
2267
2268        let walk_2: Vec<u64> =
2269            prioritized_writable.iter().map(|s| s.id).collect();
2270        assert_eq!(walk_2, vec![0, 0, 4, 4, 8, 8, 12, 12]);
2271    }
2272
2273    #[test]
2274    fn retransmit_returns_zero_when_already_acked() {
2275        let mut stream = <Stream>::new(0, 15, 15, true, 0, 15);
2276
2277        // Write and emit some data.
2278        assert_eq!(stream.send.write(b"hello", false), Ok(5));
2279        assert_eq!(stream.send.buffered_bytes(), 5);
2280
2281        let mut buf = [0; 10];
2282        let (written, _) = stream.send.emit(&mut buf).unwrap();
2283        assert_eq!(written, 5);
2284        assert_eq!(stream.send.buffered_bytes(), 0);
2285
2286        // Mark data for retransmission.
2287        let retransmitted = stream.send.retransmit(0, 5);
2288        assert_eq!(retransmitted, 5);
2289        assert_eq!(stream.send.buffered_bytes(), 5);
2290
2291        // Ack the data.
2292        stream.send.ack_and_drop(0, 5);
2293        assert_eq!(stream.send.buffered_bytes(), 0);
2294
2295        // Try to retransmit again - should return 0 since data is acked.
2296        let retransmitted = stream.send.retransmit(0, 5);
2297        assert_eq!(retransmitted, 0);
2298        assert_eq!(stream.send.buffered_bytes(), 0);
2299    }
2300
2301    #[test]
2302    fn retransmit_returns_partial_when_some_acked() {
2303        let mut stream = <Stream>::new(0, 15, 15, true, 0, 15);
2304
2305        // Write and emit 10 bytes.
2306        assert_eq!(stream.send.write(b"helloworld", false), Ok(10));
2307        assert_eq!(stream.send.buffered_bytes(), 10);
2308
2309        let mut buf = [0; 10];
2310        let (written, _) = stream.send.emit(&mut buf).unwrap();
2311        assert_eq!(written, 10);
2312        assert_eq!(stream.send.buffered_bytes(), 0);
2313
2314        // Mark all data for retransmission.
2315        let retransmitted = stream.send.retransmit(0, 10);
2316        assert_eq!(retransmitted, 10);
2317        assert_eq!(stream.send.buffered_bytes(), 10);
2318
2319        // Ack first 5 bytes and drop them.
2320        let dropped = stream.send.ack_and_drop(0, 5);
2321        assert_eq!(dropped, 5);
2322        assert_eq!(stream.send.buffered_bytes(), 5);
2323
2324        // Try to retransmit all 10 bytes - should return 5 since first 5 are
2325        // acked.
2326        let retransmitted = stream.send.retransmit(0, 10);
2327        assert_eq!(retransmitted, 0); // Already marked, so no change
2328        assert_eq!(stream.send.buffered_bytes(), 5);
2329    }
2330
2331    #[test]
2332    fn ack_and_drop_decrements_len_and_returns_dropped() {
2333        let mut stream = <Stream>::new(0, 15, 15, true, 0, 15);
2334
2335        // Write some data.
2336        assert_eq!(stream.send.write(b"hello", false), Ok(5));
2337        assert_eq!(stream.send.buffered_bytes(), 5);
2338
2339        // Emit it.
2340        let mut buf = [0; 10];
2341        let (written, _) = stream.send.emit(&mut buf).unwrap();
2342        assert_eq!(written, 5);
2343        assert_eq!(stream.send.buffered_bytes(), 0);
2344
2345        // Mark for retransmission.
2346        let retransmitted = stream.send.retransmit(0, 5);
2347        assert_eq!(retransmitted, 5);
2348        assert_eq!(stream.send.buffered_bytes(), 5);
2349
2350        // Ack and drop - should decrement len and return dropped amount.
2351        let dropped = stream.send.ack_and_drop(0, 5);
2352        assert_eq!(dropped, 5);
2353        assert_eq!(stream.send.buffered_bytes(), 0);
2354    }
2355
2356    #[test]
2357    fn ack_and_drop_partial_buffer() {
2358        let mut stream = <Stream>::new(0, 30, 30, true, 0, 30);
2359
2360        // Write and emit two chunks.
2361        assert_eq!(stream.send.write(b"hello", false), Ok(5));
2362        assert_eq!(stream.send.write(b"world", false), Ok(5));
2363        assert_eq!(stream.send.buffered_bytes(), 10);
2364
2365        let mut buf = [0; 10];
2366        let (written, _) = stream.send.emit(&mut buf).unwrap();
2367        assert_eq!(written, 10);
2368        assert_eq!(stream.send.buffered_bytes(), 0);
2369
2370        // Mark both chunks for retransmission.
2371        let retransmitted = stream.send.retransmit(0, 10);
2372        assert_eq!(retransmitted, 10);
2373        assert_eq!(stream.send.buffered_bytes(), 10);
2374
2375        // Ack and drop only first chunk.
2376        let dropped = stream.send.ack_and_drop(0, 5);
2377        assert_eq!(dropped, 5);
2378        assert_eq!(stream.send.buffered_bytes(), 5);
2379
2380        // Ack and drop second chunk.
2381        let dropped = stream.send.ack_and_drop(5, 5);
2382        assert_eq!(dropped, 5);
2383        assert_eq!(stream.send.buffered_bytes(), 0);
2384    }
2385
2386    #[test]
2387    fn ack_and_drop_returns_zero_when_nothing_dropped() {
2388        let mut stream = <Stream>::new(0, 15, 15, true, 0, 15);
2389
2390        // Write and emit data.
2391        assert_eq!(stream.send.write(b"hello", false), Ok(5));
2392        let mut buf = [0; 10];
2393        let (written, _) = stream.send.emit(&mut buf).unwrap();
2394        assert_eq!(written, 5);
2395
2396        // Ack data that's already been fully emitted and not retransmitted.
2397        // Nothing should be dropped since there's no buffered data.
2398        let dropped = stream.send.ack_and_drop(0, 5);
2399        assert_eq!(dropped, 0);
2400        assert_eq!(stream.send.buffered_bytes(), 0);
2401    }
2402
2403    #[test]
2404    fn cache_consistency_through_full_lifecycle() {
2405        // This test verifies that StreamMap.tx_buffered stays in sync with
2406        // actual buffered data through a full lifecycle: write → emit →
2407        // retransmit → ack.
2408        let mut streams = <StreamMap>::new(5, 5, 15);
2409
2410        // Create a stream using low-level StreamMap interface.
2411        let local_params = crate::TransportParams {
2412            initial_max_data: 30,
2413            initial_max_stream_data_bidi_local: 15,
2414            initial_max_stream_data_bidi_remote: 15,
2415            initial_max_stream_data_uni: 10,
2416            initial_max_streams_bidi: 5,
2417            initial_max_streams_uni: 5,
2418            ..Default::default()
2419        };
2420        let peer_params = local_params.clone();
2421
2422        // Update peer stream limits to allow locally-initiated streams.
2423        streams.update_peer_max_streams_bidi(5);
2424        streams.update_peer_max_streams_uni(5);
2425
2426        let stream_id = 0u64;
2427
2428        // Write data: both stream.send.buffered_bytes() and tx_buffered increase.
2429        {
2430            let stream = streams
2431                .get_or_create(
2432                    stream_id,
2433                    &local_params,
2434                    &peer_params,
2435                    true,
2436                    false,
2437                )
2438                .unwrap();
2439            assert_eq!(stream.send.write(b"hello", false), Ok(5));
2440        }
2441        streams.add_tx_buffered(5);
2442        assert_eq!(streams.get(stream_id).unwrap().send.buffered_bytes(), 5);
2443        assert_eq!(streams.tx_buffered(), 5);
2444        assert!(streams.tx_buffered_is_consistent());
2445
2446        // Emit data: both stream.send.buffered_bytes() and tx_buffered decrease.
2447        let mut buf = [0; 10];
2448        let written = {
2449            let stream = streams.get_mut(stream_id).unwrap();
2450            let (written, _) = stream.send.emit(&mut buf).unwrap();
2451            written
2452        };
2453        assert_eq!(written, 5);
2454        streams.sub_tx_buffered(5);
2455        assert_eq!(streams.get(stream_id).unwrap().send.buffered_bytes(), 0);
2456        assert_eq!(streams.tx_buffered(), 0);
2457        assert!(streams.tx_buffered_is_consistent());
2458
2459        // Retransmit: both stream.send.buffered_bytes() and tx_buffered increase
2460        // by actual amount retransmitted.
2461        let retransmitted = {
2462            let stream = streams.get_mut(stream_id).unwrap();
2463            stream.send.retransmit(0, 5)
2464        };
2465        assert_eq!(retransmitted, 5);
2466        streams.add_tx_buffered(retransmitted);
2467        assert_eq!(streams.get(stream_id).unwrap().send.buffered_bytes(), 5);
2468        assert_eq!(streams.tx_buffered(), 5);
2469        assert!(streams.tx_buffered_is_consistent());
2470
2471        // Ack and drop: both stream.send.buffered_bytes() and tx_buffered
2472        // decrease by actual amount dropped.
2473        let dropped = {
2474            let stream = streams.get_mut(stream_id).unwrap();
2475            stream.send.ack_and_drop(0, 5)
2476        };
2477        assert_eq!(dropped, 5);
2478        streams.sub_tx_buffered(dropped);
2479        assert_eq!(streams.get(stream_id).unwrap().send.buffered_bytes(), 0);
2480        assert_eq!(streams.tx_buffered(), 0);
2481        assert!(streams.tx_buffered_is_consistent());
2482    }
2483
2484    #[test]
2485    fn send_buf_len_reflects_buffered_data() {
2486        let mut stream = <Stream>::new(0, 15, 15, true, 0, 15);
2487
2488        // Initially empty.
2489        assert_eq!(stream.send.buffered_bytes(), 0);
2490
2491        // After write.
2492        assert_eq!(stream.send.write(b"hello", false), Ok(5));
2493        assert_eq!(stream.send.buffered_bytes(), 5);
2494
2495        // After emit.
2496        let mut buf = [0; 10];
2497        let (written, _) = stream.send.emit(&mut buf).unwrap();
2498        assert_eq!(written, 5);
2499        assert_eq!(stream.send.buffered_bytes(), 0);
2500
2501        // After retransmit.
2502        let retransmitted = stream.send.retransmit(0, 5);
2503        assert_eq!(retransmitted, 5);
2504        assert_eq!(stream.send.buffered_bytes(), 5);
2505
2506        // After ack_and_drop.
2507        let dropped = stream.send.ack_and_drop(0, 5);
2508        assert_eq!(dropped, 5);
2509        assert_eq!(stream.send.buffered_bytes(), 0);
2510    }
2511}
2512
2513mod recv_buf;
2514mod send_buf;