use std::cmp;
use std::sync::Arc;
use std::collections::hash_map;
use std::collections::HashMap;
use std::collections::HashSet;
use intrusive_collections::intrusive_adapter;
use intrusive_collections::KeyAdapter;
use intrusive_collections::RBTree;
use intrusive_collections::RBTreeAtomicLink;
use smallvec::SmallVec;
use crate::Error;
use crate::Result;
const DEFAULT_URGENCY: u8 = 127;
const DEFAULT_STREAM_WINDOW: u64 = 32 * 1024;
pub const MAX_STREAM_WINDOW: u64 = 16 * 1024 * 1024;
#[derive(Default)]
pub struct StreamIdHasher {
id: u64,
}
impl std::hash::Hasher for StreamIdHasher {
#[inline]
fn finish(&self) -> u64 {
self.id
}
#[inline]
fn write_u64(&mut self, id: u64) {
self.id = id;
}
#[inline]
fn write(&mut self, _: &[u8]) {
unimplemented!()
}
}
type BuildStreamIdHasher = std::hash::BuildHasherDefault<StreamIdHasher>;
pub type StreamIdHashMap<V> = HashMap<u64, V, BuildStreamIdHasher>;
pub type StreamIdHashSet = HashSet<u64, BuildStreamIdHasher>;
#[derive(Default)]
pub struct StreamMap {
streams: StreamIdHashMap<Stream>,
collected: StreamIdHashSet,
peer_max_streams_bidi: u64,
peer_max_streams_uni: u64,
peer_opened_streams_bidi: u64,
peer_opened_streams_uni: u64,
local_max_streams_bidi: u64,
local_max_streams_bidi_next: u64,
local_max_streams_uni: u64,
local_max_streams_uni_next: u64,
local_opened_streams_bidi: u64,
local_opened_streams_uni: u64,
flushable: RBTree<StreamFlushablePriorityAdapter>,
pub readable: RBTree<StreamReadablePriorityAdapter>,
pub writable: RBTree<StreamWritablePriorityAdapter>,
almost_full: StreamIdHashSet,
blocked: StreamIdHashMap<u64>,
reset: StreamIdHashMap<(u64, u64)>,
stopped: StreamIdHashMap<u64>,
max_stream_window: u64,
}
impl StreamMap {
pub fn new(
max_streams_bidi: u64, max_streams_uni: u64, max_stream_window: u64,
) -> StreamMap {
StreamMap {
local_max_streams_bidi: max_streams_bidi,
local_max_streams_bidi_next: max_streams_bidi,
local_max_streams_uni: max_streams_uni,
local_max_streams_uni_next: max_streams_uni,
max_stream_window,
..StreamMap::default()
}
}
pub fn get(&self, id: u64) -> Option<&Stream> {
self.streams.get(&id)
}
pub fn get_mut(&mut self, id: u64) -> Option<&mut Stream> {
self.streams.get_mut(&id)
}
pub(crate) fn get_or_create(
&mut self, id: u64, local_params: &crate::TransportParams,
peer_params: &crate::TransportParams, local: bool, is_server: bool,
) -> Result<&mut Stream> {
let (stream, is_new_and_writable) = match self.streams.entry(id) {
hash_map::Entry::Vacant(v) => {
if self.collected.contains(&id) {
return Err(Error::Done);
}
if local != is_local(id, is_server) {
return Err(Error::InvalidStreamState(id));
}
let (max_rx_data, max_tx_data) = match (local, is_bidi(id)) {
(true, true) => (
local_params.initial_max_stream_data_bidi_local,
peer_params.initial_max_stream_data_bidi_remote,
),
(true, false) => (0, peer_params.initial_max_stream_data_uni),
(false, true) => (
local_params.initial_max_stream_data_bidi_remote,
peer_params.initial_max_stream_data_bidi_local,
),
(false, false) =>
(local_params.initial_max_stream_data_uni, 0),
};
let stream_sequence = id >> 2;
match (is_local(id, is_server), is_bidi(id)) {
(true, true) => {
let n = std::cmp::max(
self.local_opened_streams_bidi,
stream_sequence + 1,
);
if n > self.peer_max_streams_bidi {
return Err(Error::StreamLimit);
}
self.local_opened_streams_bidi = n;
},
(true, false) => {
let n = std::cmp::max(
self.local_opened_streams_uni,
stream_sequence + 1,
);
if n > self.peer_max_streams_uni {
return Err(Error::StreamLimit);
}
self.local_opened_streams_uni = n;
},
(false, true) => {
let n = std::cmp::max(
self.peer_opened_streams_bidi,
stream_sequence + 1,
);
if n > self.local_max_streams_bidi {
return Err(Error::StreamLimit);
}
self.peer_opened_streams_bidi = n;
},
(false, false) => {
let n = std::cmp::max(
self.peer_opened_streams_uni,
stream_sequence + 1,
);
if n > self.local_max_streams_uni {
return Err(Error::StreamLimit);
}
self.peer_opened_streams_uni = n;
},
};
let s = Stream::new(
id,
max_rx_data,
max_tx_data,
is_bidi(id),
local,
self.max_stream_window,
);
let is_writable = s.is_writable();
(v.insert(s), is_writable)
},
hash_map::Entry::Occupied(v) => (v.into_mut(), false),
};
if is_new_and_writable {
self.writable.insert(Arc::clone(&stream.priority_key));
}
Ok(stream)
}
pub fn insert_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
if !priority_key.readable.is_linked() {
self.readable.insert(Arc::clone(priority_key));
}
}
pub fn remove_readable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
if !priority_key.readable.is_linked() {
return;
}
let mut c = {
let ptr = Arc::as_ptr(priority_key);
unsafe { self.readable.cursor_mut_from_ptr(ptr) }
};
c.remove();
}
pub fn insert_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
if !priority_key.writable.is_linked() {
self.writable.insert(Arc::clone(priority_key));
}
}
pub fn remove_writable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
if !priority_key.writable.is_linked() {
return;
}
let mut c = {
let ptr = Arc::as_ptr(priority_key);
unsafe { self.writable.cursor_mut_from_ptr(ptr) }
};
c.remove();
}
pub fn insert_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
if !priority_key.flushable.is_linked() {
self.flushable.insert(Arc::clone(priority_key));
}
}
pub fn remove_flushable(&mut self, priority_key: &Arc<StreamPriorityKey>) {
if !priority_key.flushable.is_linked() {
return;
}
let mut c = {
let ptr = Arc::as_ptr(priority_key);
unsafe { self.flushable.cursor_mut_from_ptr(ptr) }
};
c.remove();
}
pub fn peek_flushable(&self) -> Option<Arc<StreamPriorityKey>> {
self.flushable.front().clone_pointer()
}
pub fn update_priority(
&mut self, old: &Arc<StreamPriorityKey>, new: &Arc<StreamPriorityKey>,
) {
if old.readable.is_linked() {
self.remove_readable(old);
self.readable.insert(Arc::clone(new));
}
if old.writable.is_linked() {
self.remove_writable(old);
self.writable.insert(Arc::clone(new));
}
if old.flushable.is_linked() {
self.remove_flushable(old);
self.flushable.insert(Arc::clone(new));
}
}
pub fn insert_almost_full(&mut self, stream_id: u64) {
self.almost_full.insert(stream_id);
}
pub fn remove_almost_full(&mut self, stream_id: u64) {
self.almost_full.remove(&stream_id);
}
pub fn insert_blocked(&mut self, stream_id: u64, off: u64) {
self.blocked.insert(stream_id, off);
}
pub fn remove_blocked(&mut self, stream_id: u64) {
self.blocked.remove(&stream_id);
}
pub fn insert_reset(
&mut self, stream_id: u64, error_code: u64, final_size: u64,
) {
self.reset.insert(stream_id, (error_code, final_size));
}
pub fn remove_reset(&mut self, stream_id: u64) {
self.reset.remove(&stream_id);
}
pub fn insert_stopped(&mut self, stream_id: u64, error_code: u64) {
self.stopped.insert(stream_id, error_code);
}
pub fn remove_stopped(&mut self, stream_id: u64) {
self.stopped.remove(&stream_id);
}
pub fn update_peer_max_streams_bidi(&mut self, v: u64) {
self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v);
}
pub fn update_peer_max_streams_uni(&mut self, v: u64) {
self.peer_max_streams_uni = cmp::max(self.peer_max_streams_uni, v);
}
pub fn update_max_streams_bidi(&mut self) {
self.local_max_streams_bidi = self.local_max_streams_bidi_next;
}
pub fn max_streams_bidi(&self) -> u64 {
self.local_max_streams_bidi
}
pub fn max_streams_bidi_next(&mut self) -> u64 {
self.local_max_streams_bidi_next
}
pub fn update_max_streams_uni(&mut self) {
self.local_max_streams_uni = self.local_max_streams_uni_next;
}
pub fn max_streams_uni_next(&mut self) -> u64 {
self.local_max_streams_uni_next
}
pub fn peer_streams_left_bidi(&self) -> u64 {
self.peer_max_streams_bidi - self.local_opened_streams_bidi
}
pub fn peer_streams_left_uni(&self) -> u64 {
self.peer_max_streams_uni - self.local_opened_streams_uni
}
pub fn collect(&mut self, stream_id: u64, local: bool) {
if !local {
if is_bidi(stream_id) {
self.local_max_streams_bidi_next =
self.local_max_streams_bidi_next.saturating_add(1);
} else {
self.local_max_streams_uni_next =
self.local_max_streams_uni_next.saturating_add(1);
}
}
let s = self.streams.remove(&stream_id).unwrap();
self.remove_readable(&s.priority_key);
self.remove_writable(&s.priority_key);
self.remove_flushable(&s.priority_key);
self.collected.insert(stream_id);
}
pub fn readable(&self) -> StreamIter {
StreamIter {
streams: self.readable.iter().map(|s| s.id).collect(),
index: 0,
}
}
pub fn writable(&self) -> StreamIter {
StreamIter {
streams: self.writable.iter().map(|s| s.id).collect(),
index: 0,
}
}
pub fn almost_full(&self) -> StreamIter {
StreamIter::from(&self.almost_full)
}
pub fn blocked(&self) -> hash_map::Iter<u64, u64> {
self.blocked.iter()
}
pub fn reset(&self) -> hash_map::Iter<u64, (u64, u64)> {
self.reset.iter()
}
pub fn stopped(&self) -> hash_map::Iter<u64, u64> {
self.stopped.iter()
}
pub fn is_collected(&self, stream_id: u64) -> bool {
self.collected.contains(&stream_id)
}
pub fn has_flushable(&self) -> bool {
!self.flushable.is_empty()
}
pub fn has_readable(&self) -> bool {
!self.readable.is_empty()
}
pub fn has_almost_full(&self) -> bool {
!self.almost_full.is_empty()
}
pub fn has_blocked(&self) -> bool {
!self.blocked.is_empty()
}
pub fn has_reset(&self) -> bool {
!self.reset.is_empty()
}
pub fn has_stopped(&self) -> bool {
!self.stopped.is_empty()
}
pub fn should_update_max_streams_bidi(&self) -> bool {
self.local_max_streams_bidi_next != self.local_max_streams_bidi &&
self.local_max_streams_bidi_next / 2 >
self.local_max_streams_bidi - self.peer_opened_streams_bidi
}
pub fn should_update_max_streams_uni(&self) -> bool {
self.local_max_streams_uni_next != self.local_max_streams_uni &&
self.local_max_streams_uni_next / 2 >
self.local_max_streams_uni - self.peer_opened_streams_uni
}
#[cfg(test)]
pub fn len(&self) -> usize {
self.streams.len()
}
}
pub struct Stream {
pub recv: recv_buf::RecvBuf,
pub send: send_buf::SendBuf,
pub send_lowat: usize,
pub bidi: bool,
pub local: bool,
pub urgency: u8,
pub incremental: bool,
pub priority_key: Arc<StreamPriorityKey>,
}
impl Stream {
pub fn new(
id: u64, max_rx_data: u64, max_tx_data: u64, bidi: bool, local: bool,
max_window: u64,
) -> Stream {
let priority_key = Arc::new(StreamPriorityKey {
id,
..Default::default()
});
Stream {
recv: recv_buf::RecvBuf::new(max_rx_data, max_window),
send: send_buf::SendBuf::new(max_tx_data),
send_lowat: 1,
bidi,
local,
urgency: priority_key.urgency,
incremental: priority_key.incremental,
priority_key,
}
}
pub fn is_readable(&self) -> bool {
self.recv.ready()
}
pub fn is_writable(&self) -> bool {
!self.send.is_shutdown() &&
!self.send.is_fin() &&
(self.send.off_back() + self.send_lowat as u64) <
self.send.max_off()
}
pub fn is_flushable(&self) -> bool {
let off_front = self.send.off_front();
!self.send.is_empty() &&
off_front < self.send.off_back() &&
off_front < self.send.max_off()
}
pub fn is_complete(&self) -> bool {
match (self.bidi, self.local) {
(true, _) => self.recv.is_fin() && self.send.is_complete(),
(false, true) => self.send.is_complete(),
(false, false) => self.recv.is_fin(),
}
}
}
pub fn is_local(stream_id: u64, is_server: bool) -> bool {
(stream_id & 0x1) == (is_server as u64)
}
pub fn is_bidi(stream_id: u64) -> bool {
(stream_id & 0x2) == 0
}
#[derive(Clone, Debug)]
pub struct StreamPriorityKey {
pub urgency: u8,
pub incremental: bool,
pub id: u64,
pub readable: RBTreeAtomicLink,
pub writable: RBTreeAtomicLink,
pub flushable: RBTreeAtomicLink,
}
impl Default for StreamPriorityKey {
fn default() -> Self {
Self {
urgency: DEFAULT_URGENCY,
incremental: true,
id: Default::default(),
readable: Default::default(),
writable: Default::default(),
flushable: Default::default(),
}
}
}
impl PartialEq for StreamPriorityKey {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
}
impl Eq for StreamPriorityKey {}
impl PartialOrd for StreamPriorityKey {
#[allow(clippy::non_canonical_partial_ord_impl)]
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
if self.id == other.id {
return Some(std::cmp::Ordering::Equal);
}
if self.urgency != other.urgency {
return self.urgency.partial_cmp(&other.urgency);
}
if !self.incremental && !other.incremental {
return self.id.partial_cmp(&other.id);
}
if self.incremental && !other.incremental {
return Some(std::cmp::Ordering::Greater);
}
if !self.incremental && other.incremental {
return Some(std::cmp::Ordering::Less);
}
Some(std::cmp::Ordering::Greater)
}
}
impl Ord for StreamPriorityKey {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.partial_cmp(other).unwrap()
}
}
intrusive_adapter!(pub StreamWritablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { writable: RBTreeAtomicLink });
impl<'a> KeyAdapter<'a> for StreamWritablePriorityAdapter {
type Key = StreamPriorityKey;
fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
s.clone()
}
}
intrusive_adapter!(pub StreamReadablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { readable: RBTreeAtomicLink });
impl<'a> KeyAdapter<'a> for StreamReadablePriorityAdapter {
type Key = StreamPriorityKey;
fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
s.clone()
}
}
intrusive_adapter!(pub StreamFlushablePriorityAdapter = Arc<StreamPriorityKey>: StreamPriorityKey { flushable: RBTreeAtomicLink });
impl<'a> KeyAdapter<'a> for StreamFlushablePriorityAdapter {
type Key = StreamPriorityKey;
fn get_key(&self, s: &StreamPriorityKey) -> Self::Key {
s.clone()
}
}
#[derive(Default)]
pub struct StreamIter {
streams: SmallVec<[u64; 8]>,
index: usize,
}
impl StreamIter {
#[inline]
fn from(streams: &StreamIdHashSet) -> Self {
StreamIter {
streams: streams.iter().copied().collect(),
index: 0,
}
}
}
impl Iterator for StreamIter {
type Item = u64;
#[inline]
fn next(&mut self) -> Option<Self::Item> {
let v = self.streams.get(self.index)?;
self.index += 1;
Some(*v)
}
}
impl ExactSizeIterator for StreamIter {
#[inline]
fn len(&self) -> usize {
self.streams.len() - self.index
}
}
#[derive(Clone, Debug, Default, Eq)]
pub struct RangeBuf {
data: Arc<Vec<u8>>,
start: usize,
pos: usize,
len: usize,
off: u64,
fin: bool,
}
impl RangeBuf {
pub fn from(buf: &[u8], off: u64, fin: bool) -> RangeBuf {
RangeBuf {
data: Arc::new(Vec::from(buf)),
start: 0,
pos: 0,
len: buf.len(),
off,
fin,
}
}
pub fn fin(&self) -> bool {
self.fin
}
pub fn off(&self) -> u64 {
(self.off - self.start as u64) + self.pos as u64
}
pub fn max_off(&self) -> u64 {
self.off() + self.len() as u64
}
pub fn len(&self) -> usize {
self.len - (self.pos - self.start)
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn consume(&mut self, count: usize) {
self.pos += count;
}
pub fn split_off(&mut self, at: usize) -> RangeBuf {
assert!(
at <= self.len,
"`at` split index (is {}) should be <= len (is {})",
at,
self.len
);
let buf = RangeBuf {
data: self.data.clone(),
start: self.start + at,
pos: cmp::max(self.pos, self.start + at),
len: self.len - at,
off: self.off + at as u64,
fin: self.fin,
};
self.pos = cmp::min(self.pos, self.start + at);
self.len = at;
self.fin = false;
buf
}
}
impl std::ops::Deref for RangeBuf {
type Target = [u8];
fn deref(&self) -> &[u8] {
&self.data[self.pos..self.start + self.len]
}
}
impl Ord for RangeBuf {
fn cmp(&self, other: &RangeBuf) -> cmp::Ordering {
self.off.cmp(&other.off).reverse()
}
}
impl PartialOrd for RangeBuf {
fn partial_cmp(&self, other: &RangeBuf) -> Option<cmp::Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for RangeBuf {
fn eq(&self, other: &RangeBuf) -> bool {
self.off == other.off
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn recv_flow_control() {
let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
assert!(!stream.recv.almost_full());
let mut buf = [0; 32];
let first = RangeBuf::from(b"hello", 0, false);
let second = RangeBuf::from(b"world", 5, false);
let third = RangeBuf::from(b"something", 10, false);
assert_eq!(stream.recv.write(second), Ok(()));
assert_eq!(stream.recv.write(first), Ok(()));
assert!(!stream.recv.almost_full());
assert_eq!(stream.recv.write(third), Err(Error::FlowControl));
let (len, fin) = stream.recv.emit(&mut buf).unwrap();
assert_eq!(&buf[..len], b"helloworld");
assert!(!fin);
assert!(stream.recv.almost_full());
stream.recv.update_max_data(std::time::Instant::now());
assert_eq!(stream.recv.max_data_next(), 25);
assert!(!stream.recv.almost_full());
let third = RangeBuf::from(b"something", 10, false);
assert_eq!(stream.recv.write(third), Ok(()));
}
#[test]
fn recv_past_fin() {
let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
assert!(!stream.recv.almost_full());
let first = RangeBuf::from(b"hello", 0, true);
let second = RangeBuf::from(b"world", 5, false);
assert_eq!(stream.recv.write(first), Ok(()));
assert_eq!(stream.recv.write(second), Err(Error::FinalSize));
}
#[test]
fn recv_fin_dup() {
let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
assert!(!stream.recv.almost_full());
let first = RangeBuf::from(b"hello", 0, true);
let second = RangeBuf::from(b"hello", 0, true);
assert_eq!(stream.recv.write(first), Ok(()));
assert_eq!(stream.recv.write(second), Ok(()));
let mut buf = [0; 32];
let (len, fin) = stream.recv.emit(&mut buf).unwrap();
assert_eq!(&buf[..len], b"hello");
assert!(fin);
}
#[test]
fn recv_fin_change() {
let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
assert!(!stream.recv.almost_full());
let first = RangeBuf::from(b"hello", 0, true);
let second = RangeBuf::from(b"world", 5, true);
assert_eq!(stream.recv.write(second), Ok(()));
assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
}
#[test]
fn recv_fin_lower_than_received() {
let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
assert!(!stream.recv.almost_full());
let first = RangeBuf::from(b"hello", 0, true);
let second = RangeBuf::from(b"world", 5, false);
assert_eq!(stream.recv.write(second), Ok(()));
assert_eq!(stream.recv.write(first), Err(Error::FinalSize));
}
#[test]
fn recv_fin_flow_control() {
let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
assert!(!stream.recv.almost_full());
let mut buf = [0; 32];
let first = RangeBuf::from(b"hello", 0, false);
let second = RangeBuf::from(b"world", 5, true);
assert_eq!(stream.recv.write(first), Ok(()));
assert_eq!(stream.recv.write(second), Ok(()));
let (len, fin) = stream.recv.emit(&mut buf).unwrap();
assert_eq!(&buf[..len], b"helloworld");
assert!(fin);
assert!(!stream.recv.almost_full());
}
#[test]
fn recv_fin_reset_mismatch() {
let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
assert!(!stream.recv.almost_full());
let first = RangeBuf::from(b"hello", 0, true);
assert_eq!(stream.recv.write(first), Ok(()));
assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
}
#[test]
fn recv_reset_dup() {
let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
assert!(!stream.recv.almost_full());
let first = RangeBuf::from(b"hello", 0, false);
assert_eq!(stream.recv.write(first), Ok(()));
assert_eq!(stream.recv.reset(0, 5), Ok(0));
assert_eq!(stream.recv.reset(0, 5), Ok(0));
}
#[test]
fn recv_reset_change() {
let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
assert!(!stream.recv.almost_full());
let first = RangeBuf::from(b"hello", 0, false);
assert_eq!(stream.recv.write(first), Ok(()));
assert_eq!(stream.recv.reset(0, 5), Ok(0));
assert_eq!(stream.recv.reset(0, 10), Err(Error::FinalSize));
}
#[test]
fn recv_reset_lower_than_received() {
let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
assert!(!stream.recv.almost_full());
let first = RangeBuf::from(b"hello", 0, false);
assert_eq!(stream.recv.write(first), Ok(()));
assert_eq!(stream.recv.reset(0, 4), Err(Error::FinalSize));
}
#[test]
fn send_flow_control() {
let mut buf = [0; 25];
let mut stream = Stream::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
let first = b"hello";
let second = b"world";
let third = b"something";
assert!(stream.send.write(first, false).is_ok());
assert!(stream.send.write(second, false).is_ok());
assert!(stream.send.write(third, false).is_ok());
assert_eq!(stream.send.off_front(), 0);
let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
assert_eq!(written, 15);
assert!(!fin);
assert_eq!(&buf[..written], b"helloworldsomet");
assert_eq!(stream.send.off_front(), 15);
let (written, fin) = stream.send.emit(&mut buf[..25]).unwrap();
assert_eq!(written, 0);
assert!(!fin);
assert_eq!(&buf[..written], b"");
stream.send.retransmit(0, 15);
assert_eq!(stream.send.off_front(), 0);
let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
assert_eq!(written, 10);
assert!(!fin);
assert_eq!(&buf[..written], b"helloworld");
assert_eq!(stream.send.off_front(), 10);
let (written, fin) = stream.send.emit(&mut buf[..10]).unwrap();
assert_eq!(written, 5);
assert!(!fin);
assert_eq!(&buf[..written], b"somet");
}
#[test]
fn send_past_fin() {
let mut stream = Stream::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
let first = b"hello";
let second = b"world";
let third = b"third";
assert_eq!(stream.send.write(first, false), Ok(5));
assert_eq!(stream.send.write(second, true), Ok(5));
assert!(stream.send.is_fin());
assert_eq!(stream.send.write(third, false), Err(Error::FinalSize));
}
#[test]
fn send_fin_dup() {
let mut stream = Stream::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
assert_eq!(stream.send.write(b"hello", true), Ok(5));
assert!(stream.send.is_fin());
assert_eq!(stream.send.write(b"", true), Ok(0));
assert!(stream.send.is_fin());
}
#[test]
fn send_undo_fin() {
let mut stream = Stream::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
assert_eq!(stream.send.write(b"hello", true), Ok(5));
assert!(stream.send.is_fin());
assert_eq!(
stream.send.write(b"helloworld", true),
Err(Error::FinalSize)
);
}
#[test]
fn send_fin_max_data_match() {
let mut buf = [0; 15];
let mut stream = Stream::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
let slice = b"hellohellohello";
assert!(stream.send.write(slice, true).is_ok());
let (written, fin) = stream.send.emit(&mut buf[..15]).unwrap();
assert_eq!(written, 15);
assert!(fin);
assert_eq!(&buf[..written], slice);
}
#[test]
fn send_fin_zero_length() {
let mut buf = [0; 5];
let mut stream = Stream::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
assert_eq!(stream.send.write(b"hello", false), Ok(5));
assert_eq!(stream.send.write(b"", true), Ok(0));
assert!(stream.send.is_fin());
let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
assert_eq!(written, 5);
assert!(fin);
assert_eq!(&buf[..written], b"hello");
}
#[test]
fn send_ack() {
let mut buf = [0; 5];
let mut stream = Stream::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
assert_eq!(stream.send.write(b"hello", false), Ok(5));
assert_eq!(stream.send.write(b"world", false), Ok(5));
assert_eq!(stream.send.write(b"", true), Ok(0));
assert!(stream.send.is_fin());
assert_eq!(stream.send.off_front(), 0);
let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
assert_eq!(written, 5);
assert!(!fin);
assert_eq!(&buf[..written], b"hello");
stream.send.ack_and_drop(0, 5);
stream.send.retransmit(0, 5);
assert_eq!(stream.send.off_front(), 5);
let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
assert_eq!(written, 5);
assert!(fin);
assert_eq!(&buf[..written], b"world");
}
#[test]
fn send_ack_reordering() {
let mut buf = [0; 5];
let mut stream = Stream::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
assert_eq!(stream.send.write(b"hello", false), Ok(5));
assert_eq!(stream.send.write(b"world", false), Ok(5));
assert_eq!(stream.send.write(b"", true), Ok(0));
assert!(stream.send.is_fin());
assert_eq!(stream.send.off_front(), 0);
let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
assert_eq!(written, 5);
assert!(!fin);
assert_eq!(&buf[..written], b"hello");
assert_eq!(stream.send.off_front(), 5);
let (written, fin) = stream.send.emit(&mut buf[..1]).unwrap();
assert_eq!(written, 1);
assert!(!fin);
assert_eq!(&buf[..written], b"w");
stream.send.ack_and_drop(5, 1);
stream.send.ack_and_drop(0, 5);
stream.send.retransmit(0, 5);
stream.send.retransmit(5, 1);
assert_eq!(stream.send.off_front(), 6);
let (written, fin) = stream.send.emit(&mut buf[..5]).unwrap();
assert_eq!(written, 4);
assert!(fin);
assert_eq!(&buf[..written], b"orld");
}
#[test]
fn recv_data_below_off() {
let mut stream = Stream::new(0, 15, 0, true, true, DEFAULT_STREAM_WINDOW);
let first = RangeBuf::from(b"hello", 0, false);
assert_eq!(stream.recv.write(first), Ok(()));
let mut buf = [0; 10];
let (len, fin) = stream.recv.emit(&mut buf).unwrap();
assert_eq!(&buf[..len], b"hello");
assert!(!fin);
let first = RangeBuf::from(b"elloworld", 1, true);
assert_eq!(stream.recv.write(first), Ok(()));
let (len, fin) = stream.recv.emit(&mut buf).unwrap();
assert_eq!(&buf[..len], b"world");
assert!(fin);
}
#[test]
fn stream_complete() {
let mut stream =
Stream::new(0, 30, 30, true, true, DEFAULT_STREAM_WINDOW);
assert_eq!(stream.send.write(b"hello", false), Ok(5));
assert_eq!(stream.send.write(b"world", false), Ok(5));
assert!(!stream.send.is_complete());
assert!(!stream.send.is_fin());
assert_eq!(stream.send.write(b"", true), Ok(0));
assert!(!stream.send.is_complete());
assert!(stream.send.is_fin());
let buf = RangeBuf::from(b"hello", 0, true);
assert!(stream.recv.write(buf).is_ok());
assert!(!stream.recv.is_fin());
stream.send.ack(6, 4);
assert!(!stream.send.is_complete());
let mut buf = [0; 2];
assert_eq!(stream.recv.emit(&mut buf), Ok((2, false)));
assert!(!stream.recv.is_fin());
stream.send.ack(1, 5);
assert!(!stream.send.is_complete());
stream.send.ack(0, 1);
assert!(stream.send.is_complete());
assert!(!stream.is_complete());
let mut buf = [0; 3];
assert_eq!(stream.recv.emit(&mut buf), Ok((3, true)));
assert!(stream.recv.is_fin());
assert!(stream.is_complete());
}
#[test]
fn send_fin_zero_length_output() {
let mut buf = [0; 5];
let mut stream = Stream::new(0, 0, 15, true, true, DEFAULT_STREAM_WINDOW);
assert_eq!(stream.send.write(b"hello", false), Ok(5));
assert_eq!(stream.send.off_front(), 0);
assert!(!stream.send.is_fin());
let (written, fin) = stream.send.emit(&mut buf).unwrap();
assert_eq!(written, 5);
assert!(!fin);
assert_eq!(&buf[..written], b"hello");
assert_eq!(stream.send.write(b"", true), Ok(0));
assert!(stream.send.is_fin());
assert_eq!(stream.send.off_front(), 5);
let (written, fin) = stream.send.emit(&mut buf).unwrap();
assert_eq!(written, 0);
assert!(fin);
assert_eq!(&buf[..written], b"");
}
fn stream_send_ready(stream: &Stream) -> bool {
!stream.send.is_empty() &&
stream.send.off_front() < stream.send.off_back()
}
#[test]
fn send_emit() {
let mut buf = [0; 5];
let mut stream = Stream::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
assert_eq!(stream.send.write(b"hello", false), Ok(5));
assert_eq!(stream.send.write(b"world", false), Ok(5));
assert_eq!(stream.send.write(b"olleh", false), Ok(5));
assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
assert_eq!(stream.send.off_front(), 0);
assert_eq!(stream.send.bufs_count(), 4);
assert!(stream.is_flushable());
assert!(stream_send_ready(&stream));
assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
assert_eq!(stream.send.off_front(), 4);
assert_eq!(&buf[..4], b"hell");
assert!(stream_send_ready(&stream));
assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
assert_eq!(stream.send.off_front(), 8);
assert_eq!(&buf[..4], b"owor");
assert!(stream_send_ready(&stream));
assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
assert_eq!(stream.send.off_front(), 10);
assert_eq!(&buf[..2], b"ld");
assert!(stream_send_ready(&stream));
assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
assert_eq!(stream.send.off_front(), 11);
assert_eq!(&buf[..1], b"o");
assert!(stream_send_ready(&stream));
assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
assert_eq!(stream.send.off_front(), 16);
assert_eq!(&buf[..5], b"llehd");
assert!(stream_send_ready(&stream));
assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
assert_eq!(stream.send.off_front(), 20);
assert_eq!(&buf[..4], b"lrow");
assert!(!stream.is_flushable());
assert!(!stream_send_ready(&stream));
assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
assert_eq!(stream.send.off_front(), 20);
}
#[test]
fn send_emit_ack() {
let mut buf = [0; 5];
let mut stream = Stream::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
assert_eq!(stream.send.write(b"hello", false), Ok(5));
assert_eq!(stream.send.write(b"world", false), Ok(5));
assert_eq!(stream.send.write(b"olleh", false), Ok(5));
assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
assert_eq!(stream.send.off_front(), 0);
assert_eq!(stream.send.bufs_count(), 4);
assert!(stream.is_flushable());
assert!(stream_send_ready(&stream));
assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
assert_eq!(stream.send.off_front(), 4);
assert_eq!(&buf[..4], b"hell");
assert!(stream_send_ready(&stream));
assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
assert_eq!(stream.send.off_front(), 8);
assert_eq!(&buf[..4], b"owor");
stream.send.ack_and_drop(0, 5);
assert_eq!(stream.send.bufs_count(), 3);
assert!(stream_send_ready(&stream));
assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
assert_eq!(stream.send.off_front(), 10);
assert_eq!(&buf[..2], b"ld");
stream.send.ack_and_drop(7, 5);
assert_eq!(stream.send.bufs_count(), 3);
assert!(stream_send_ready(&stream));
assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
assert_eq!(stream.send.off_front(), 11);
assert_eq!(&buf[..1], b"o");
assert!(stream_send_ready(&stream));
assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
assert_eq!(stream.send.off_front(), 16);
assert_eq!(&buf[..5], b"llehd");
stream.send.ack_and_drop(5, 7);
assert_eq!(stream.send.bufs_count(), 2);
assert!(stream_send_ready(&stream));
assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
assert_eq!(stream.send.off_front(), 20);
assert_eq!(&buf[..4], b"lrow");
assert!(!stream.is_flushable());
assert!(!stream_send_ready(&stream));
assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
assert_eq!(stream.send.off_front(), 20);
stream.send.ack_and_drop(22, 4);
assert_eq!(stream.send.bufs_count(), 2);
stream.send.ack_and_drop(20, 1);
assert_eq!(stream.send.bufs_count(), 2);
}
#[test]
fn send_emit_retransmit() {
let mut buf = [0; 5];
let mut stream = Stream::new(0, 0, 20, true, true, DEFAULT_STREAM_WINDOW);
assert_eq!(stream.send.write(b"hello", false), Ok(5));
assert_eq!(stream.send.write(b"world", false), Ok(5));
assert_eq!(stream.send.write(b"olleh", false), Ok(5));
assert_eq!(stream.send.write(b"dlrow", true), Ok(5));
assert_eq!(stream.send.off_front(), 0);
assert_eq!(stream.send.bufs_count(), 4);
assert!(stream.is_flushable());
assert!(stream_send_ready(&stream));
assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
assert_eq!(stream.send.off_front(), 4);
assert_eq!(&buf[..4], b"hell");
assert!(stream_send_ready(&stream));
assert_eq!(stream.send.emit(&mut buf[..4]), Ok((4, false)));
assert_eq!(stream.send.off_front(), 8);
assert_eq!(&buf[..4], b"owor");
stream.send.retransmit(3, 3);
assert_eq!(stream.send.off_front(), 3);
assert!(stream_send_ready(&stream));
assert_eq!(stream.send.emit(&mut buf[..3]), Ok((3, false)));
assert_eq!(stream.send.off_front(), 8);
assert_eq!(&buf[..3], b"low");
assert!(stream_send_ready(&stream));
assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
assert_eq!(stream.send.off_front(), 10);
assert_eq!(&buf[..2], b"ld");
stream.send.ack_and_drop(7, 2);
stream.send.retransmit(8, 2);
assert!(stream_send_ready(&stream));
assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
assert_eq!(stream.send.off_front(), 10);
assert_eq!(&buf[..2], b"ld");
assert!(stream_send_ready(&stream));
assert_eq!(stream.send.emit(&mut buf[..1]), Ok((1, false)));
assert_eq!(stream.send.off_front(), 11);
assert_eq!(&buf[..1], b"o");
assert!(stream_send_ready(&stream));
assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
assert_eq!(stream.send.off_front(), 16);
assert_eq!(&buf[..5], b"llehd");
stream.send.retransmit(12, 2);
assert!(stream_send_ready(&stream));
assert_eq!(stream.send.emit(&mut buf[..2]), Ok((2, false)));
assert_eq!(stream.send.off_front(), 16);
assert_eq!(&buf[..2], b"le");
assert!(stream_send_ready(&stream));
assert_eq!(stream.send.emit(&mut buf[..5]), Ok((4, true)));
assert_eq!(stream.send.off_front(), 20);
assert_eq!(&buf[..4], b"lrow");
assert!(!stream.is_flushable());
assert!(!stream_send_ready(&stream));
assert_eq!(stream.send.emit(&mut buf[..5]), Ok((0, true)));
assert_eq!(stream.send.off_front(), 20);
stream.send.retransmit(7, 12);
assert!(stream_send_ready(&stream));
assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
assert_eq!(stream.send.off_front(), 12);
assert_eq!(&buf[..5], b"rldol");
assert!(stream_send_ready(&stream));
assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
assert_eq!(stream.send.off_front(), 17);
assert_eq!(&buf[..5], b"lehdl");
assert!(stream_send_ready(&stream));
assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
assert_eq!(stream.send.off_front(), 20);
assert_eq!(&buf[..2], b"ro");
stream.send.ack_and_drop(12, 7);
stream.send.retransmit(7, 12);
assert!(stream_send_ready(&stream));
assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
assert_eq!(stream.send.off_front(), 12);
assert_eq!(&buf[..5], b"rldol");
assert!(stream_send_ready(&stream));
assert_eq!(stream.send.emit(&mut buf[..5]), Ok((5, false)));
assert_eq!(stream.send.off_front(), 17);
assert_eq!(&buf[..5], b"lehdl");
assert!(stream_send_ready(&stream));
assert_eq!(stream.send.emit(&mut buf[..5]), Ok((2, false)));
assert_eq!(stream.send.off_front(), 20);
assert_eq!(&buf[..2], b"ro");
}
#[test]
fn rangebuf_split_off() {
let mut buf = RangeBuf::from(b"helloworld", 5, true);
assert_eq!(buf.start, 0);
assert_eq!(buf.pos, 0);
assert_eq!(buf.len, 10);
assert_eq!(buf.off, 5);
assert!(buf.fin);
assert_eq!(buf.len(), 10);
assert_eq!(buf.off(), 5);
assert!(buf.fin());
assert_eq!(&buf[..], b"helloworld");
buf.consume(5);
assert_eq!(buf.start, 0);
assert_eq!(buf.pos, 5);
assert_eq!(buf.len, 10);
assert_eq!(buf.off, 5);
assert!(buf.fin);
assert_eq!(buf.len(), 5);
assert_eq!(buf.off(), 10);
assert!(buf.fin());
assert_eq!(&buf[..], b"world");
let mut new_buf = buf.split_off(3);
assert_eq!(buf.start, 0);
assert_eq!(buf.pos, 3);
assert_eq!(buf.len, 3);
assert_eq!(buf.off, 5);
assert!(!buf.fin);
assert_eq!(buf.len(), 0);
assert_eq!(buf.off(), 8);
assert!(!buf.fin());
assert_eq!(&buf[..], b"");
assert_eq!(new_buf.start, 3);
assert_eq!(new_buf.pos, 5);
assert_eq!(new_buf.len, 7);
assert_eq!(new_buf.off, 8);
assert!(new_buf.fin);
assert_eq!(new_buf.len(), 5);
assert_eq!(new_buf.off(), 10);
assert!(new_buf.fin());
assert_eq!(&new_buf[..], b"world");
new_buf.consume(2);
assert_eq!(new_buf.start, 3);
assert_eq!(new_buf.pos, 7);
assert_eq!(new_buf.len, 7);
assert_eq!(new_buf.off, 8);
assert!(new_buf.fin);
assert_eq!(new_buf.len(), 3);
assert_eq!(new_buf.off(), 12);
assert!(new_buf.fin());
assert_eq!(&new_buf[..], b"rld");
let mut new_new_buf = new_buf.split_off(5);
assert_eq!(new_buf.start, 3);
assert_eq!(new_buf.pos, 7);
assert_eq!(new_buf.len, 5);
assert_eq!(new_buf.off, 8);
assert!(!new_buf.fin);
assert_eq!(new_buf.len(), 1);
assert_eq!(new_buf.off(), 12);
assert!(!new_buf.fin());
assert_eq!(&new_buf[..], b"r");
assert_eq!(new_new_buf.start, 8);
assert_eq!(new_new_buf.pos, 8);
assert_eq!(new_new_buf.len, 2);
assert_eq!(new_new_buf.off, 13);
assert!(new_new_buf.fin);
assert_eq!(new_new_buf.len(), 2);
assert_eq!(new_new_buf.off(), 13);
assert!(new_new_buf.fin());
assert_eq!(&new_new_buf[..], b"ld");
new_new_buf.consume(2);
assert_eq!(new_new_buf.start, 8);
assert_eq!(new_new_buf.pos, 10);
assert_eq!(new_new_buf.len, 2);
assert_eq!(new_new_buf.off, 13);
assert!(new_new_buf.fin);
assert_eq!(new_new_buf.len(), 0);
assert_eq!(new_new_buf.off(), 15);
assert!(new_new_buf.fin());
assert_eq!(&new_new_buf[..], b"");
}
#[test]
fn stream_limit_auto_open() {
let local_tp = crate::TransportParams::default();
let peer_tp = crate::TransportParams::default();
let mut streams = StreamMap::new(5, 5, 5);
let stream_id = 500;
assert!(!is_local(stream_id, true), "stream id is peer initiated");
assert!(is_bidi(stream_id), "stream id is bidirectional");
assert_eq!(
streams
.get_or_create(stream_id, &local_tp, &peer_tp, false, true)
.err(),
Some(Error::StreamLimit),
"stream limit should be exceeded"
);
}
#[test]
fn stream_create_out_of_order() {
let local_tp = crate::TransportParams::default();
let peer_tp = crate::TransportParams::default();
let mut streams = StreamMap::new(5, 5, 5);
for stream_id in [8, 12, 4] {
assert!(is_local(stream_id, false), "stream id is client initiated");
assert!(is_bidi(stream_id), "stream id is bidirectional");
assert!(streams
.get_or_create(stream_id, &local_tp, &peer_tp, false, true)
.is_ok());
}
}
#[test]
fn stream_limit_edge() {
let local_tp = crate::TransportParams::default();
let peer_tp = crate::TransportParams::default();
let mut streams = StreamMap::new(3, 3, 3);
let stream_id = 8;
assert!(streams
.get_or_create(stream_id, &local_tp, &peer_tp, false, true)
.is_ok());
let stream_id = 12;
assert_eq!(
streams
.get_or_create(stream_id, &local_tp, &peer_tp, false, true)
.err(),
Some(Error::StreamLimit)
);
}
fn cycle_stream_priority(stream_id: u64, streams: &mut StreamMap) {
let key = streams.get(stream_id).unwrap().priority_key.clone();
streams.update_priority(&key.clone(), &key);
}
#[test]
fn writable_prioritized_default_priority() {
let local_tp = crate::TransportParams::default();
let peer_tp = crate::TransportParams {
initial_max_stream_data_bidi_local: 100,
initial_max_stream_data_uni: 100,
..Default::default()
};
let mut streams = StreamMap::new(100, 100, 100);
for id in [0, 4, 8, 12] {
assert!(streams
.get_or_create(id, &local_tp, &peer_tp, false, true)
.is_ok());
}
let walk_1: Vec<u64> = streams.writable().collect();
cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
let walk_2: Vec<u64> = streams.writable().collect();
cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
let walk_3: Vec<u64> = streams.writable().collect();
cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
let walk_4: Vec<u64> = streams.writable().collect();
cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
let walk_5: Vec<u64> = streams.writable().collect();
assert_eq!(walk_1, vec![0, 4, 8, 12]);
assert_eq!(walk_2, vec![4, 8, 12, 0]);
assert_eq!(walk_3, vec![8, 12, 0, 4]);
assert_eq!(walk_4, vec![12, 0, 4, 8,]);
assert_eq!(walk_5, vec![0, 4, 8, 12]);
}
#[test]
fn writable_prioritized_insert_order() {
let local_tp = crate::TransportParams::default();
let peer_tp = crate::TransportParams {
initial_max_stream_data_bidi_local: 100,
initial_max_stream_data_uni: 100,
..Default::default()
};
let mut streams = StreamMap::new(100, 100, 100);
for id in [12, 4, 8, 0] {
assert!(streams
.get_or_create(id, &local_tp, &peer_tp, false, true)
.is_ok());
}
let walk_1: Vec<u64> = streams.writable().collect();
cycle_stream_priority(*walk_1.first().unwrap(), &mut streams);
let walk_2: Vec<u64> = streams.writable().collect();
cycle_stream_priority(*walk_2.first().unwrap(), &mut streams);
let walk_3: Vec<u64> = streams.writable().collect();
cycle_stream_priority(*walk_3.first().unwrap(), &mut streams);
let walk_4: Vec<u64> = streams.writable().collect();
cycle_stream_priority(*walk_4.first().unwrap(), &mut streams);
let walk_5: Vec<u64> = streams.writable().collect();
assert_eq!(walk_1, vec![12, 4, 8, 0]);
assert_eq!(walk_2, vec![4, 8, 0, 12]);
assert_eq!(walk_3, vec![8, 0, 12, 4,]);
assert_eq!(walk_4, vec![0, 12, 4, 8]);
assert_eq!(walk_5, vec![12, 4, 8, 0]);
}
#[test]
fn writable_prioritized_mixed_urgency() {
let local_tp = crate::TransportParams::default();
let peer_tp = crate::TransportParams {
initial_max_stream_data_bidi_local: 100,
initial_max_stream_data_uni: 100,
..Default::default()
};
let mut streams = StreamMap::new(100, 100, 100);
let input = vec![
(0, 100),
(4, 90),
(8, 80),
(12, 70),
(16, 60),
(20, 50),
(24, 40),
(28, 30),
(32, 20),
(36, 10),
(40, 0),
];
for (id, urgency) in input.clone() {
let stream = streams
.get_or_create(id, &local_tp, &peer_tp, false, true)
.unwrap();
stream.urgency = urgency;
let new_priority_key = Arc::new(StreamPriorityKey {
urgency: stream.urgency,
incremental: stream.incremental,
id,
..Default::default()
});
let old_priority_key = std::mem::replace(
&mut stream.priority_key,
new_priority_key.clone(),
);
streams.update_priority(&old_priority_key, &new_priority_key);
}
let walk_1: Vec<u64> = streams.writable().collect();
assert_eq!(walk_1, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
for (id, urgency) in input {
let stream = streams
.get_or_create(id, &local_tp, &peer_tp, false, true)
.unwrap();
stream.urgency = urgency;
let new_priority_key = Arc::new(StreamPriorityKey {
urgency: stream.urgency,
incremental: stream.incremental,
id,
..Default::default()
});
let old_priority_key = std::mem::replace(
&mut stream.priority_key,
new_priority_key.clone(),
);
streams.update_priority(&old_priority_key, &new_priority_key);
}
let walk_2: Vec<u64> = streams.writable().collect();
assert_eq!(walk_2, vec![40, 36, 32, 28, 24, 20, 16, 12, 8, 4, 0]);
streams.collect(24, true);
let walk_3: Vec<u64> = streams.writable().collect();
assert_eq!(walk_3, vec![40, 36, 32, 28, 20, 16, 12, 8, 4, 0]);
streams.collect(40, true);
streams.collect(0, true);
let walk_4: Vec<u64> = streams.writable().collect();
assert_eq!(walk_4, vec![36, 32, 28, 20, 16, 12, 8, 4]);
streams
.get_or_create(44, &local_tp, &peer_tp, false, true)
.unwrap();
let walk_5: Vec<u64> = streams.writable().collect();
assert_eq!(walk_5, vec![36, 32, 28, 20, 16, 12, 8, 4, 44]);
}
#[test]
fn writable_prioritized_mixed_urgencies_incrementals() {
let local_tp = crate::TransportParams::default();
let peer_tp = crate::TransportParams {
initial_max_stream_data_bidi_local: 100,
initial_max_stream_data_uni: 100,
..Default::default()
};
let mut streams = StreamMap::new(100, 100, 100);
let input = vec![
(0, 100),
(4, 20),
(8, 100),
(12, 20),
(16, 90),
(20, 25),
(24, 90),
(28, 30),
(32, 80),
(36, 20),
(40, 0),
];
for (id, urgency) in input.clone() {
let stream = streams
.get_or_create(id, &local_tp, &peer_tp, false, true)
.unwrap();
stream.urgency = urgency;
let new_priority_key = Arc::new(StreamPriorityKey {
urgency: stream.urgency,
incremental: stream.incremental,
id,
..Default::default()
});
let old_priority_key = std::mem::replace(
&mut stream.priority_key,
new_priority_key.clone(),
);
streams.update_priority(&old_priority_key, &new_priority_key);
}
let walk_1: Vec<u64> = streams.writable().collect();
cycle_stream_priority(4, &mut streams);
cycle_stream_priority(16, &mut streams);
cycle_stream_priority(0, &mut streams);
let walk_2: Vec<u64> = streams.writable().collect();
cycle_stream_priority(12, &mut streams);
cycle_stream_priority(24, &mut streams);
cycle_stream_priority(8, &mut streams);
let walk_3: Vec<u64> = streams.writable().collect();
cycle_stream_priority(36, &mut streams);
cycle_stream_priority(16, &mut streams);
cycle_stream_priority(0, &mut streams);
let walk_4: Vec<u64> = streams.writable().collect();
cycle_stream_priority(4, &mut streams);
cycle_stream_priority(24, &mut streams);
cycle_stream_priority(8, &mut streams);
let walk_5: Vec<u64> = streams.writable().collect();
cycle_stream_priority(12, &mut streams);
cycle_stream_priority(16, &mut streams);
cycle_stream_priority(0, &mut streams);
let walk_6: Vec<u64> = streams.writable().collect();
cycle_stream_priority(36, &mut streams);
cycle_stream_priority(24, &mut streams);
cycle_stream_priority(8, &mut streams);
let walk_7: Vec<u64> = streams.writable().collect();
cycle_stream_priority(4, &mut streams);
cycle_stream_priority(16, &mut streams);
cycle_stream_priority(0, &mut streams);
let walk_8: Vec<u64> = streams.writable().collect();
cycle_stream_priority(12, &mut streams);
cycle_stream_priority(24, &mut streams);
cycle_stream_priority(8, &mut streams);
let walk_9: Vec<u64> = streams.writable().collect();
cycle_stream_priority(36, &mut streams);
cycle_stream_priority(16, &mut streams);
cycle_stream_priority(0, &mut streams);
assert_eq!(walk_1, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
assert_eq!(walk_2, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
assert_eq!(walk_3, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
assert_eq!(walk_4, vec![40, 4, 12, 36, 20, 28, 32, 24, 16, 8, 0]);
assert_eq!(walk_5, vec![40, 12, 36, 4, 20, 28, 32, 16, 24, 0, 8]);
assert_eq!(walk_6, vec![40, 36, 4, 12, 20, 28, 32, 24, 16, 8, 0]);
assert_eq!(walk_7, vec![40, 4, 12, 36, 20, 28, 32, 16, 24, 0, 8]);
assert_eq!(walk_8, vec![40, 12, 36, 4, 20, 28, 32, 24, 16, 8, 0]);
assert_eq!(walk_9, vec![40, 36, 4, 12, 20, 28, 32, 16, 24, 0, 8]);
streams.collect(20, true);
let walk_10: Vec<u64> = streams.writable().collect();
assert_eq!(walk_10, vec![40, 4, 12, 36, 28, 32, 24, 16, 8, 0]);
let stream = streams
.get_or_create(44, &local_tp, &peer_tp, false, true)
.unwrap();
stream.urgency = 20;
stream.incremental = true;
let new_priority_key = Arc::new(StreamPriorityKey {
urgency: stream.urgency,
incremental: stream.incremental,
id: 44,
..Default::default()
});
let old_priority_key =
std::mem::replace(&mut stream.priority_key, new_priority_key.clone());
streams.update_priority(&old_priority_key, &new_priority_key);
let walk_11: Vec<u64> = streams.writable().collect();
assert_eq!(walk_11, vec![40, 4, 12, 36, 44, 28, 32, 24, 16, 8, 0]);
}
#[test]
fn priority_tree_dupes() {
let mut prioritized_writable: RBTree<StreamWritablePriorityAdapter> =
Default::default();
for id in [0, 4, 8, 12] {
let s = Arc::new(StreamPriorityKey {
urgency: 0,
incremental: false,
id,
..Default::default()
});
prioritized_writable.insert(s);
}
let walk_1: Vec<u64> =
prioritized_writable.iter().map(|s| s.id).collect();
assert_eq!(walk_1, vec![0, 4, 8, 12]);
for id in [0, 4, 8, 12] {
let s = Arc::new(StreamPriorityKey {
urgency: 0,
incremental: false,
id,
..Default::default()
});
prioritized_writable.insert(s);
}
let walk_2: Vec<u64> =
prioritized_writable.iter().map(|s| s.id).collect();
assert_eq!(walk_2, vec![0, 0, 4, 4, 8, 8, 12, 12]);
}
}
mod recv_buf;
mod send_buf;