use std::cmp;
use std::time;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use crate::Error;
use crate::Result;
use crate::flowcontrol;
use super::RangeBuf;
use super::DEFAULT_STREAM_WINDOW;
/// Receive-side stream buffer.
///
/// Incoming chunks are validated and stored by `write()` and handed out in
/// offset order by `emit()`.
#[derive(Debug, Default)]
pub struct RecvBuf {
/// Buffered chunks that have not been read yet, keyed by each chunk's end
/// offset (`RangeBuf::max_off()`).
data: BTreeMap<u64, RangeBuf>,
/// Offset of the next byte `emit()` will hand out. Also moved forward by
/// `reset()` (to the final size) and `shutdown()` (to `len`).
off: u64,
/// Highest end offset of any chunk accepted by `write()` so far.
len: u64,
/// Flow-control state: supplies the limit checked by `write()` and is told
/// how many bytes `emit()` consumed.
flow_control: flowcontrol::FlowControl,
/// Final offset of the stream, recorded once a chunk carrying the fin flag
/// has been accepted.
fin_off: Option<u64>,
/// Error code stored by `reset()`; once set, `emit()` reports
/// `Error::StreamReset` instead of data.
error: Option<u64>,
/// Set by `shutdown()`. While true, `write()` still validates chunks and
/// tracks offsets but no longer stores their data.
drain: bool,
}
impl RecvBuf {
/// Creates an empty receive buffer.
///
/// `max_data` and `max_window` are handed to the flow controller as its
/// first and third constructor arguments. The second argument is `max_data`
/// capped at `DEFAULT_STREAM_WINDOW`.
///
/// NOTE(review): the second argument is presumably the initial window size
/// — confirm against `flowcontrol::FlowControl::new`.
pub fn new(max_data: u64, max_window: u64) -> RecvBuf {
    let capped = cmp::min(max_data, DEFAULT_STREAM_WINDOW);
    let flow_control =
        flowcontrol::FlowControl::new(max_data, capped, max_window);

    // Every other field starts from its `Default` value.
    RecvBuf {
        flow_control,
        ..RecvBuf::default()
    }
}
/// Validates an incoming chunk and stores whatever part of it is new.
///
/// The chunk is first checked against the flow-control limit and the
/// stream's final size. Bytes below the read offset (`self.off`) or already
/// covered by a stored chunk are dropped, so existing data wins on overlap.
/// The remainder is inserted into `self.data`, unless the buffer is
/// draining, in which case only `self.len` and `self.fin_off` are updated.
///
/// # Errors
///
/// * `Error::FlowControl` if the chunk ends beyond `self.max_data()`.
/// * `Error::FinalSize` if the chunk extends past an already known final
///   offset, carries a fin flag that disagrees with it, or carries a fin
///   flag at an offset below data already received.
pub fn write(&mut self, buf: RangeBuf) -> Result<()> {
// Data past the flow-control limit is rejected outright.
if buf.max_off() > self.max_data() {
return Err(Error::FlowControl);
}
if let Some(fin_off) = self.fin_off {
// The final size is known: nothing may extend past it...
if buf.max_off() > fin_off {
return Err(Error::FinalSize);
}
// ...and a later fin flag may not move it.
if buf.fin() && fin_off != buf.max_off() {
return Err(Error::FinalSize);
}
}
// A fin flag cannot declare a size smaller than what was already received.
if buf.fin() && buf.max_off() < self.len {
return Err(Error::FinalSize);
}
// The final offset is already recorded, so an empty chunk adds nothing.
if self.fin_off.is_some() && buf.is_empty() {
return Ok(());
}
if buf.fin() {
self.fin_off = Some(buf.max_off());
}
// An empty chunk without the fin flag carries no information.
if !buf.fin() && buf.is_empty() {
return Ok(());
}
// Non-empty data that ends at or before the read offset was already read.
// Empty chunks are let through: given the early returns above, the only
// empty chunk that can reach this point is one carrying the fin flag.
if self.off >= buf.max_off() {
if !buf.is_empty() {
return Ok(());
}
}
// Work queue: handling one overlap can split a chunk, and the split-off
// piece is queued here to be processed by a later iteration.
let mut tmp_bufs = VecDeque::with_capacity(2);
tmp_bufs.push_back(buf);
'tmp: while let Some(mut buf) = tmp_bufs.pop_front() {
// Drop the part of the chunk that lies below the read offset.
// NOTE(review): `split_off(n)` is used throughout as "return the part
// from position n onward, leave the first n bytes in the receiver" —
// confirm against `RangeBuf`.
if self.off_front() > buf.off() {
buf = buf.split_off((self.off_front() - buf.off()) as usize);
}
// Overlap is only possible when the chunk starts below the highest
// offset received so far. Empty chunks are checked as well.
if buf.off() < self.max_off() || buf.is_empty() {
// Stored chunks are keyed by end offset, so this walks the chunks that
// end at or after the new chunk's start.
for (_, b) in self.data.range(buf.off()..) {
// Re-read on every iteration: `buf` may have been trimmed below.
let off = buf.off();
// The stored chunk starts beyond the new one: no further overlap.
if b.off() > buf.max_off() {
break;
}
// The new chunk is entirely covered by a stored one: drop it and move
// on to the next queued piece.
if off >= b.off() && buf.max_off() <= b.max_off() {
continue 'tmp;
}
// The start of the new chunk overlaps a stored one: keep only the part
// after the stored chunk's end.
if off >= b.off() && off < b.max_off() {
buf = buf.split_off((b.max_off() - off) as usize);
}
// The end of the new chunk overlaps a stored one: keep the part before
// the stored chunk in `buf` and queue the rest for another pass.
if off < b.off() && buf.max_off() > b.off() {
tmp_bufs
.push_back(buf.split_off((b.off() - off) as usize));
}
}
}
// Track the highest offset received, even when the data itself is dropped.
self.len = cmp::max(self.len, buf.max_off());
// While draining, data is validated and accounted for but not stored.
if !self.drain {
self.data.insert(buf.max_off(), buf);
}
}
Ok(())
}
pub fn emit(&mut self, out: &mut [u8]) -> Result<(usize, bool)> {
let mut len = 0;
let mut cap = out.len();
if !self.ready() {
return Err(Error::Done);
}
if let Some(e) = self.error {
self.data.clear();
return Err(Error::StreamReset(e));
}
while cap > 0 && self.ready() {
let mut entry = match self.data.first_entry() {
Some(entry) => entry,
None => break,
};
let buf = entry.get_mut();
let buf_len = cmp::min(buf.len(), cap);
out[len..len + buf_len].copy_from_slice(&buf[..buf_len]);
self.off += buf_len as u64;
len += buf_len;
cap -= buf_len;
if buf_len < buf.len() {
buf.consume(buf_len);
break;
}
entry.remove();
}
self.flow_control.add_consumed(len as u64);
Ok((len, self.is_fin()))
}
/// Records a stream reset carrying `error_code` and `final_size`.
///
/// Returns the difference between `final_size` and the highest offset
/// received before the call. On the first reset the buffered data is
/// discarded, the read offset jumps to `final_size`, and an empty fin chunk
/// is written at that offset so that the next `emit()` finds something
/// ready and reports the reset. Later resets only return the difference.
///
/// # Errors
///
/// `Error::FinalSize` if `final_size` disagrees with an already known final
/// offset or is lower than the highest offset received.
pub fn reset(&mut self, error_code: u64, final_size: u64) -> Result<usize> {
    let contradicts_fin =
        matches!(self.fin_off, Some(known) if known != final_size);

    if contradicts_fin || final_size < self.len {
        return Err(Error::FinalSize);
    }

    // Computed before `write()` below, which raises `self.len`.
    let max_data_delta = (final_size - self.len) as usize;

    if self.error.is_none() {
        self.error = Some(error_code);
        self.off = final_size;
        self.data.clear();

        self.write(RangeBuf::from(b"", final_size, true))?;
    }

    Ok(max_data_delta)
}
/// Forwards `now` to the flow controller's `update_max_data()`.
///
/// NOTE(review): presumably this commits the next flow-control limit so it
/// can be advertised to the peer — confirm against `flowcontrol::FlowControl`.
pub fn update_max_data(&mut self, now: time::Instant) {
self.flow_control.update_max_data(now);
}
/// Returns the flow controller's `max_data_next()` value.
///
/// NOTE(review): presumably the limit that would be advertised next —
/// confirm against `flowcontrol::FlowControl`. Takes `&mut self` because the
/// underlying call is made through a mutable receiver.
pub fn max_data_next(&mut self) -> u64 {
self.flow_control.max_data_next()
}
/// Returns the flow controller's current `max_data()` value, which is the
/// limit `write()` checks incoming chunks against.
pub fn max_data(&self) -> u64 {
self.flow_control.max_data()
}
/// Returns the flow controller's `window()` value.
pub fn window(&self) -> u64 {
self.flow_control.window()
}
/// Forwards `now` and `rtt` to the flow controller's `autotune_window()`.
///
/// NOTE(review): presumably adjusts the receive window based on the
/// round-trip time — confirm against `flowcontrol::FlowControl`.
pub fn autotune_window(&mut self, now: time::Instant, rtt: time::Duration) {
self.flow_control.autotune_window(now, rtt);
}
/// Switches the buffer into draining mode.
///
/// Buffered data is discarded and the read offset is moved up to the
/// highest offset received so far. Afterwards `write()` keeps validating
/// incoming chunks but no longer stores them.
///
/// # Errors
///
/// `Error::Done` if the buffer is already draining.
pub fn shutdown(&mut self) -> Result<()> {
    if self.drain {
        return Err(Error::Done);
    }

    // Everything received so far counts as read.
    self.off = self.len;
    self.data.clear();
    self.drain = true;

    Ok(())
}
/// Returns the current read offset, i.e. the offset of the next byte
/// `emit()` will hand out.
pub fn off_front(&self) -> u64 {
self.off
}
/// Returns `true` when the final size is not yet known and the flow
/// controller reports that its limit should be updated.
pub fn almost_full(&self) -> bool {
    // Once the final size is known the flow controller is not consulted.
    if self.fin_off.is_some() {
        return false;
    }

    self.flow_control.should_update_max_data()
}
/// Returns the highest end offset of any chunk accepted so far.
pub fn max_off(&self) -> u64 {
self.len
}
/// Returns `true` when the final offset is known and the read offset has
/// reached it.
pub fn is_fin(&self) -> bool {
    self.fin_off == Some(self.off)
}
/// Returns `true` once `shutdown()` has put the buffer into draining mode.
pub fn is_draining(&self) -> bool {
self.drain
}
/// Returns `true` when the first buffered chunk starts exactly at the read
/// offset, i.e. when `emit()` has something to hand out.
pub fn ready(&self) -> bool {
    matches!(
        self.data.first_key_value(),
        Some((_, head)) if head.off() == self.off
    )
}
}
#[cfg(test)]
mod tests {
use super::*;
// Reading from a freshly created buffer reports `Error::Done`.
#[test]
fn empty_read() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    let mut out = [0; 32];

    assert_eq!(rx.len, 0);
    assert_eq!(rx.emit(&mut out), Err(Error::Done));
}
// Empty chunks: ignored without fin, stored once with fin, and checked
// against both the flow-control limit and the final size.
#[test]
fn empty_stream_frame() {
    let mut rx = RecvBuf::new(15, DEFAULT_STREAM_WINDOW);
    assert_eq!(rx.len, 0);

    assert_eq!(rx.write(RangeBuf::from(b"hello", 0, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (5, 0, 1));

    let mut out = [0; 32];
    assert_eq!(rx.emit(&mut out), Ok((5, false)));

    // Empty, no fin: accepted but not stored.
    assert_eq!(rx.write(RangeBuf::from(b"", 10, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (5, 5, 0));

    // Empty, no fin, but beyond the flow-control limit of 15.
    assert_eq!(
        rx.write(RangeBuf::from(b"", 16, false)),
        Err(Error::FlowControl)
    );

    // Empty with fin at the current length: stored.
    assert_eq!(rx.write(RangeBuf::from(b"", 5, true)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (5, 5, 1));

    // The same fin again changes nothing.
    assert_eq!(rx.write(RangeBuf::from(b"", 5, true)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (5, 5, 1));

    // Already-read data ending at the final size changes nothing either.
    assert_eq!(rx.write(RangeBuf::from(b"aa", 3, true)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (5, 5, 1));

    // A fin at any other offset conflicts with the known final size.
    assert_eq!(
        rx.write(RangeBuf::from(b"", 6, true)),
        Err(Error::FinalSize)
    );
    assert_eq!(
        rx.write(RangeBuf::from(b"", 4, true)),
        Err(Error::FinalSize)
    );

    let mut out = [0; 32];
    assert_eq!(rx.emit(&mut out), Ok((0, true)));
}
// Chunks written out of order become readable once the gap at the read
// offset is filled, and are emitted in offset order.
#[test]
fn ordered_read() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    let mut out = [0; 32];
    assert_eq!(rx.len, 0);

    assert_eq!(rx.write(RangeBuf::from(b"world", 5, false)), Ok(()));
    assert_eq!((rx.len, rx.off), (10, 0));
    assert_eq!(rx.emit(&mut out), Err(Error::Done));

    assert_eq!(rx.write(RangeBuf::from(b"something", 10, true)), Ok(()));
    assert_eq!((rx.len, rx.off), (19, 0));
    assert_eq!(rx.emit(&mut out), Err(Error::Done));

    assert_eq!(rx.write(RangeBuf::from(b"hello", 0, false)), Ok(()));
    assert_eq!((rx.len, rx.off), (19, 0));

    assert_eq!(rx.emit(&mut out), Ok((19, true)));
    assert_eq!(&out[..19], b"helloworldsomething");
    assert_eq!((rx.len, rx.off), (19, 19));

    assert_eq!(rx.emit(&mut out), Err(Error::Done));
}
// Reads into output slices smaller than the buffered data consume it
// piecewise, and fin is reported only with the last piece.
#[test]
fn split_read() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    let mut out = [0; 32];
    assert_eq!(rx.len, 0);

    assert_eq!(rx.write(RangeBuf::from(b"something", 0, false)), Ok(()));
    assert_eq!((rx.len, rx.off), (9, 0));

    assert_eq!(rx.write(RangeBuf::from(b"helloworld", 9, true)), Ok(()));
    assert_eq!((rx.len, rx.off), (19, 0));

    assert_eq!(rx.emit(&mut out[..10]), Ok((10, false)));
    assert_eq!(&out[..10], b"somethingh");
    assert_eq!((rx.len, rx.off), (19, 10));

    assert_eq!(rx.emit(&mut out[..5]), Ok((5, false)));
    assert_eq!(&out[..5], b"ellow");
    assert_eq!((rx.len, rx.off), (19, 15));

    assert_eq!(rx.emit(&mut out[..10]), Ok((4, true)));
    assert_eq!(&out[..4], b"orld");
    assert_eq!((rx.len, rx.off), (19, 19));
}
// Data that does not start at the read offset cannot be read until the
// missing prefix arrives.
#[test]
fn incomplete_read() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    let mut out = [0; 32];
    assert_eq!(rx.len, 0);

    assert_eq!(rx.write(RangeBuf::from(b"helloworld", 9, true)), Ok(()));
    assert_eq!((rx.len, rx.off), (19, 0));
    assert_eq!(rx.emit(&mut out), Err(Error::Done));

    assert_eq!(rx.write(RangeBuf::from(b"something", 0, false)), Ok(()));
    assert_eq!((rx.len, rx.off), (19, 0));

    assert_eq!(rx.emit(&mut out), Ok((19, true)));
    assert_eq!(&out[..19], b"somethinghelloworld");
    assert_eq!((rx.len, rx.off), (19, 19));
}
// An empty fin chunk at the end of buffered data marks the stream as
// finished without adding a stored chunk.
#[test]
fn zero_len_read() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    let mut out = [0; 32];
    assert_eq!(rx.len, 0);

    assert_eq!(rx.write(RangeBuf::from(b"something", 0, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 1));

    assert_eq!(rx.write(RangeBuf::from(b"", 9, true)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 1));

    assert_eq!(rx.emit(&mut out), Ok((9, true)));
    assert_eq!(&out[..9], b"something");
    assert_eq!((rx.len, rx.off), (9, 9));
}
// Chunks lying entirely below the read offset are dropped. A fin below the
// received length is rejected.
#[test]
fn past_read() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    let mut out = [0; 32];
    assert_eq!(rx.len, 0);

    assert_eq!(rx.write(RangeBuf::from(b"something", 0, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 1));

    assert_eq!(rx.emit(&mut out), Ok((9, false)));
    assert_eq!(&out[..9], b"something");
    assert_eq!((rx.len, rx.off), (9, 9));

    // Entirely below the read offset: accepted, nothing stored.
    assert_eq!(rx.write(RangeBuf::from(b"hello", 3, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 9, 0));

    // Fin at offset 8, below the 9 bytes already received.
    assert_eq!(
        rx.write(RangeBuf::from(b"ello", 4, true)),
        Err(Error::FinalSize)
    );

    // Fin at offset 9 matches the received length; the data is a duplicate.
    assert_eq!(rx.write(RangeBuf::from(b"ello", 5, true)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 9, 0));

    assert_eq!(rx.emit(&mut out), Err(Error::Done));
}
// A chunk fully covered by an already stored chunk is dropped.
#[test]
fn fully_overlapping_read() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    let mut out = [0; 32];
    assert_eq!(rx.len, 0);

    assert_eq!(rx.write(RangeBuf::from(b"something", 0, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 1));

    assert_eq!(rx.write(RangeBuf::from(b"hello", 4, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 1));

    assert_eq!(rx.emit(&mut out), Ok((9, false)));
    assert_eq!(&out[..9], b"something");
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 9, 0));

    assert_eq!(rx.emit(&mut out), Err(Error::Done));
}
// A larger chunk written over a stored one keeps the stored bytes and only
// contributes the part in front of them.
#[test]
fn fully_overlapping_read2() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    let mut out = [0; 32];
    assert_eq!(rx.len, 0);

    assert_eq!(rx.write(RangeBuf::from(b"hello", 4, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 1));

    assert_eq!(rx.write(RangeBuf::from(b"something", 0, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 2));

    assert_eq!(rx.emit(&mut out), Ok((9, false)));
    assert_eq!(&out[..9], b"somehello");
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 9, 0));

    assert_eq!(rx.emit(&mut out), Err(Error::Done));
}
// A chunk that surrounds a stored one is split into the pieces before and
// after it.
#[test]
fn fully_overlapping_read3() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    let mut out = [0; 32];
    assert_eq!(rx.len, 0);

    assert_eq!(rx.write(RangeBuf::from(b"hello", 3, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (8, 0, 1));

    assert_eq!(rx.write(RangeBuf::from(b"something", 0, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 3));

    assert_eq!(rx.emit(&mut out), Ok((9, false)));
    assert_eq!(&out[..9], b"somhellog");
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 9, 0));

    assert_eq!(rx.emit(&mut out), Err(Error::Done));
}
// A chunk that surrounds two stored chunks is split around both of them.
#[test]
fn fully_overlapping_read_multi() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    let mut out = [0; 32];
    assert_eq!(rx.len, 0);

    assert_eq!(rx.write(RangeBuf::from(b"hello", 3, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (8, 0, 1));

    assert_eq!(rx.write(RangeBuf::from(b"hello", 12, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (17, 0, 2));

    assert_eq!(
        rx.write(RangeBuf::from(b"somethingsomething", 0, false)),
        Ok(())
    );
    assert_eq!((rx.len, rx.off, rx.data.len()), (18, 0, 5));

    assert_eq!(rx.emit(&mut out), Ok((18, false)));
    assert_eq!(&out[..18], b"somhellogsomhellog");
    assert_eq!((rx.len, rx.off, rx.data.len()), (18, 18, 0));

    assert_eq!(rx.emit(&mut out), Err(Error::Done));
}
// A chunk whose start overlaps stored data only contributes the bytes past
// the stored chunk's end.
#[test]
fn overlapping_start_read() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    let mut out = [0; 32];
    assert_eq!(rx.len, 0);

    assert_eq!(rx.write(RangeBuf::from(b"something", 0, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 1));

    assert_eq!(rx.write(RangeBuf::from(b"hello", 8, true)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (13, 0, 2));

    assert_eq!(rx.emit(&mut out), Ok((13, true)));
    assert_eq!(&out[..13], b"somethingello");
    assert_eq!((rx.len, rx.off), (13, 13));

    assert_eq!(rx.emit(&mut out), Err(Error::Done));
}
// A chunk whose end overlaps stored data only contributes the bytes in
// front of the stored chunk.
#[test]
fn overlapping_end_read() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    let mut out = [0; 32];
    assert_eq!(rx.len, 0);

    assert_eq!(rx.write(RangeBuf::from(b"something", 3, true)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (12, 0, 1));

    assert_eq!(rx.write(RangeBuf::from(b"hello", 0, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (12, 0, 2));

    assert_eq!(rx.emit(&mut out), Ok((12, true)));
    assert_eq!(&out[..12], b"helsomething");
    assert_eq!((rx.len, rx.off), (12, 12));

    assert_eq!(rx.emit(&mut out), Err(Error::Done));
}
// One large chunk written over three stored fragments fills every gap
// between and after them.
#[test]
fn overlapping_end_twice_read() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    let mut out = [0; 32];
    assert_eq!(rx.len, 0);

    assert_eq!(rx.write(RangeBuf::from(b"rl", 7, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 1));

    assert_eq!(rx.write(RangeBuf::from(b"ow", 4, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 2));

    assert_eq!(rx.write(RangeBuf::from(b"he", 0, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 3));

    assert_eq!(rx.write(RangeBuf::from(b"helloworld", 0, true)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (10, 0, 6));

    assert_eq!(rx.emit(&mut out), Ok((10, true)));
    assert_eq!(&out[..10], b"helloworld");
    assert_eq!((rx.len, rx.off), (10, 10));

    assert_eq!(rx.emit(&mut out), Err(Error::Done));
}
// A large chunk that overlaps several stored chunks, and whose tail is
// fully covered by one of them, only fills the remaining gaps.
#[test]
fn overlapping_end_twice_and_contained_read() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    let mut out = [0; 32];
    assert_eq!(rx.len, 0);

    assert_eq!(rx.write(RangeBuf::from(b"rl", 7, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 1));

    assert_eq!(rx.write(RangeBuf::from(b"barfoo", 10, true)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (16, 0, 2));

    assert_eq!(rx.write(RangeBuf::from(b"hellow", 0, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (16, 0, 3));

    assert_eq!(
        rx.write(RangeBuf::from(b"elloworldbarfoo", 1, true)),
        Ok(())
    );
    assert_eq!((rx.len, rx.off, rx.data.len()), (16, 0, 5));

    assert_eq!(rx.emit(&mut out), Ok((16, true)));
    assert_eq!(&out[..16], b"helloworldbarfoo");
    assert_eq!((rx.len, rx.off), (16, 16));

    assert_eq!(rx.emit(&mut out), Err(Error::Done));
}
// Reordered chunks that partially overlap each other: bytes stored first
// win, and later chunks only fill what is still missing.
#[test]
fn partially_multi_overlapping_reordered_read() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    let mut out = [0; 32];
    assert_eq!(rx.len, 0);

    assert_eq!(rx.write(RangeBuf::from(b"hello", 8, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (13, 0, 1));

    assert_eq!(rx.write(RangeBuf::from(b"something", 0, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (13, 0, 2));

    assert_eq!(rx.write(RangeBuf::from(b"moar", 11, true)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (15, 0, 3));

    assert_eq!(rx.emit(&mut out), Ok((15, true)));
    assert_eq!(&out[..15], b"somethinhelloar");
    assert_eq!((rx.len, rx.off, rx.data.len()), (15, 15, 0));

    assert_eq!(rx.emit(&mut out), Err(Error::Done));
}
// Six small chunks written out of order, several overlapping a neighbour
// by one byte: every write stores exactly one more chunk and the emitted
// stream keeps the bytes that were stored first.
#[test]
fn partially_multi_overlapping_reordered_read2() {
    let mut rx = RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW);
    let mut out = [0; 32];
    assert_eq!(rx.len, 0);

    assert_eq!(rx.write(RangeBuf::from(b"bbb", 2, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (5, 0, 1));

    assert_eq!(rx.write(RangeBuf::from(b"ddd", 6, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 2));

    assert_eq!(rx.write(RangeBuf::from(b"ccc", 4, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 3));

    assert_eq!(rx.write(RangeBuf::from(b"aaa", 0, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (9, 0, 4));

    assert_eq!(rx.write(RangeBuf::from(b"fff", 11, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (14, 0, 5));

    assert_eq!(rx.write(RangeBuf::from(b"eee", 9, false)), Ok(()));
    assert_eq!((rx.len, rx.off, rx.data.len()), (14, 0, 6));

    assert_eq!(rx.emit(&mut out), Ok((14, false)));
    assert_eq!(&out[..14], b"aabbbcdddeefff");
    assert_eq!((rx.len, rx.off, rx.data.len()), (14, 14, 0));

    assert_eq!(rx.emit(&mut out), Err(Error::Done));
}
}