use super::Error;
use super::Result;
use super::frame;
pub const HTTP3_CONTROL_STREAM_TYPE_ID: u64 = 0x0;
pub const HTTP3_PUSH_STREAM_TYPE_ID: u64 = 0x1;
pub const QPACK_ENCODER_STREAM_TYPE_ID: u64 = 0x2;
pub const QPACK_DECODER_STREAM_TYPE_ID: u64 = 0x3;
const MAX_STATE_BUF_SIZE: usize = (1 << 24) - 1;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Type {
Control,
Request,
Push,
QpackEncoder,
QpackDecoder,
Unknown,
}
impl Type {
#[cfg(feature = "qlog")]
pub fn to_qlog(self) -> qlog::events::h3::H3StreamType {
match self {
Type::Control => qlog::events::h3::H3StreamType::Control,
Type::Request => qlog::events::h3::H3StreamType::Request,
Type::Push => qlog::events::h3::H3StreamType::Push,
Type::QpackEncoder => qlog::events::h3::H3StreamType::QpackEncode,
Type::QpackDecoder => qlog::events::h3::H3StreamType::QpackDecode,
Type::Unknown => qlog::events::h3::H3StreamType::Unknown,
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum State {
StreamType,
FrameType,
FramePayloadLen,
FramePayload,
Data,
PushId,
QpackInstruction,
Drain,
Finished,
}
impl Type {
pub fn deserialize(v: u64) -> Result<Type> {
match v {
HTTP3_CONTROL_STREAM_TYPE_ID => Ok(Type::Control),
HTTP3_PUSH_STREAM_TYPE_ID => Ok(Type::Push),
QPACK_ENCODER_STREAM_TYPE_ID => Ok(Type::QpackEncoder),
QPACK_DECODER_STREAM_TYPE_ID => Ok(Type::QpackDecoder),
_ => Ok(Type::Unknown),
}
}
}
#[derive(Debug)]
pub struct Stream {
id: u64,
ty: Option<Type>,
state: State,
state_buf: Vec<u8>,
state_len: usize,
state_off: usize,
frame_type: Option<u64>,
is_local: bool,
remote_initialized: bool,
local_initialized: bool,
data_event_triggered: bool,
last_priority_update: Option<Vec<u8>>,
headers_received_count: usize,
data_received: bool,
trailers_sent: bool,
trailers_received: bool,
}
impl Stream {
pub fn new(id: u64, is_local: bool) -> Stream {
let (ty, state) = if crate::stream::is_bidi(id) {
(Some(Type::Request), State::FrameType)
} else {
(None, State::StreamType)
};
Stream {
id,
ty,
state,
state_buf: vec![0; 16],
state_len: 1,
state_off: 0,
frame_type: None,
is_local,
remote_initialized: false,
local_initialized: false,
data_event_triggered: false,
last_priority_update: None,
headers_received_count: 0,
data_received: false,
trailers_sent: false,
trailers_received: false,
}
}
pub fn ty(&self) -> Option<Type> {
self.ty
}
pub fn state(&self) -> State {
self.state
}
pub fn set_ty(&mut self, ty: Type) -> Result<()> {
assert_eq!(self.state, State::StreamType);
self.ty = Some(ty);
let state = match ty {
Type::Control | Type::Request => State::FrameType,
Type::Push => State::PushId,
Type::QpackEncoder | Type::QpackDecoder => {
self.remote_initialized = true;
State::QpackInstruction
},
Type::Unknown => State::Drain,
};
self.state_transition(state, 1, true)?;
Ok(())
}
pub fn set_push_id(&mut self, _id: u64) -> Result<()> {
assert_eq!(self.state, State::PushId);
self.state_transition(State::FrameType, 1, true)?;
Ok(())
}
pub fn set_frame_type(&mut self, ty: u64) -> Result<()> {
assert_eq!(self.state, State::FrameType);
match self.ty {
Some(Type::Control) => {
match (ty, self.remote_initialized) {
(frame::SETTINGS_FRAME_TYPE_ID, false) =>
self.remote_initialized = true,
(_, false) => return Err(Error::MissingSettings),
(frame::SETTINGS_FRAME_TYPE_ID, true) =>
return Err(Error::FrameUnexpected),
(frame::DATA_FRAME_TYPE_ID, true) =>
return Err(Error::FrameUnexpected),
(frame::HEADERS_FRAME_TYPE_ID, true) =>
return Err(Error::FrameUnexpected),
(frame::PUSH_PROMISE_FRAME_TYPE_ID, true) =>
return Err(Error::FrameUnexpected),
(_, true) => (),
}
},
Some(Type::Request) => {
if !self.is_local {
match (ty, self.remote_initialized) {
(frame::HEADERS_FRAME_TYPE_ID, false) => {
self.remote_initialized = true;
},
(frame::DATA_FRAME_TYPE_ID, false) =>
return Err(Error::FrameUnexpected),
(frame::HEADERS_FRAME_TYPE_ID, true) => {
if self.trailers_received {
return Err(Error::FrameUnexpected);
}
if self.data_received {
self.trailers_received = true;
}
},
(frame::DATA_FRAME_TYPE_ID, true) => {
if self.trailers_received {
return Err(Error::FrameUnexpected);
}
self.data_received = true;
},
(frame::CANCEL_PUSH_FRAME_TYPE_ID, _) =>
return Err(Error::FrameUnexpected),
(frame::SETTINGS_FRAME_TYPE_ID, _) =>
return Err(Error::FrameUnexpected),
(frame::GOAWAY_FRAME_TYPE_ID, _) =>
return Err(Error::FrameUnexpected),
(frame::MAX_PUSH_FRAME_TYPE_ID, _) =>
return Err(Error::FrameUnexpected),
_ => (),
}
}
},
Some(Type::Push) => {
match ty {
frame::CANCEL_PUSH_FRAME_TYPE_ID =>
return Err(Error::FrameUnexpected),
frame::SETTINGS_FRAME_TYPE_ID =>
return Err(Error::FrameUnexpected),
frame::PUSH_PROMISE_FRAME_TYPE_ID =>
return Err(Error::FrameUnexpected),
frame::GOAWAY_FRAME_TYPE_ID =>
return Err(Error::FrameUnexpected),
frame::MAX_PUSH_FRAME_TYPE_ID =>
return Err(Error::FrameUnexpected),
_ => (),
}
},
_ => return Err(Error::FrameUnexpected),
}
self.frame_type = Some(ty);
self.state_transition(State::FramePayloadLen, 1, true)?;
Ok(())
}
pub fn frame_type(&self) -> Option<u64> {
self.frame_type
}
pub fn set_frame_payload_len(&mut self, len: u64) -> Result<()> {
assert_eq!(self.state, State::FramePayloadLen);
if matches!(self.ty, Some(Type::Control | Type::Request | Type::Push)) {
let (state, resize) = match self.frame_type {
Some(frame::DATA_FRAME_TYPE_ID) => (State::Data, false),
Some(
frame::GOAWAY_FRAME_TYPE_ID |
frame::PUSH_PROMISE_FRAME_TYPE_ID |
frame::CANCEL_PUSH_FRAME_TYPE_ID |
frame::MAX_PUSH_FRAME_TYPE_ID,
) => {
if len == 0 {
return Err(Error::FrameError);
}
(State::FramePayload, true)
},
_ => (State::FramePayload, true),
};
self.state_transition(state, len as usize, resize)?;
return Ok(());
}
Err(Error::InternalError)
}
pub fn try_fill_buffer(
&mut self, conn: &mut crate::Connection,
) -> Result<()> {
if self.state_buffer_complete() {
return Ok(());
}
let buf = &mut self.state_buf[self.state_off..self.state_len];
let read = match conn.stream_recv(self.id, buf) {
Ok((len, fin)) => {
if fin &&
matches!(
self.ty,
Some(Type::Control) |
Some(Type::QpackEncoder) |
Some(Type::QpackDecoder)
)
{
super::close_conn_critical_stream(conn)?;
}
len
},
Err(e @ crate::Error::StreamReset(_)) => {
if matches!(
self.ty,
Some(Type::Control) |
Some(Type::QpackEncoder) |
Some(Type::QpackDecoder)
) {
super::close_conn_critical_stream(conn)?;
}
return Err(e.into());
},
Err(e) => {
if e == crate::Error::Done {
self.reset_data_event();
}
return Err(e.into());
},
};
trace!(
"{} read {} bytes on stream {}",
conn.trace_id(),
read,
self.id,
);
self.state_off += read;
if !self.state_buffer_complete() {
self.reset_data_event();
return Err(Error::Done);
}
Ok(())
}
pub fn initialize_local(&mut self) {
self.local_initialized = true
}
pub fn local_initialized(&self) -> bool {
self.local_initialized
}
pub fn increment_headers_received(&mut self) {
self.headers_received_count =
self.headers_received_count.saturating_add(1);
}
pub fn headers_received_count(&self) -> usize {
self.headers_received_count
}
pub fn mark_trailers_sent(&mut self) {
self.trailers_sent = true;
}
pub fn trailers_sent(&self) -> bool {
self.trailers_sent
}
#[cfg(test)]
fn try_fill_buffer_for_tests(
&mut self, stream: &mut std::io::Cursor<Vec<u8>>,
) -> Result<()> {
if self.state_buffer_complete() {
return Ok(());
}
let buf = &mut self.state_buf[self.state_off..self.state_len];
let read = std::io::Read::read(stream, buf).unwrap();
self.state_off += read;
if !self.state_buffer_complete() {
return Err(Error::Done);
}
Ok(())
}
pub fn try_consume_varint(&mut self) -> Result<u64> {
if self.state_off == 1 {
self.state_len = octets::varint_parse_len(self.state_buf[0]);
self.state_buf.resize(self.state_len, 0);
}
if !self.state_buffer_complete() {
return Err(Error::Done);
}
let varint = octets::Octets::with_slice(&self.state_buf).get_varint()?;
Ok(varint)
}
pub fn try_consume_frame(&mut self) -> Result<(frame::Frame, u64)> {
self.reset_data_event();
let payload_len = self.state_len as u64;
let frame = frame::Frame::from_bytes(
self.frame_type.unwrap(),
payload_len,
&self.state_buf,
)?;
self.state_transition(State::FrameType, 1, true)?;
Ok((frame, payload_len))
}
pub fn try_consume_data(
&mut self, conn: &mut crate::Connection, out: &mut [u8],
) -> Result<(usize, bool)> {
let left = std::cmp::min(out.len(), self.state_len - self.state_off);
let (len, fin) = match conn.stream_recv(self.id, &mut out[..left]) {
Ok(v) => v,
Err(e) => {
if e == crate::Error::Done {
self.reset_data_event();
}
return Err(e.into());
},
};
self.state_off += len;
if !conn.stream_readable(self.id) {
self.reset_data_event();
}
if self.state_buffer_complete() {
self.state_transition(State::FrameType, 1, true)?;
}
Ok((len, fin))
}
pub fn finished(&mut self) {
let _ = self.state_transition(State::Finished, 0, false);
}
#[cfg(test)]
fn try_consume_data_for_tests(
&mut self, stream: &mut std::io::Cursor<Vec<u8>>, out: &mut [u8],
) -> Result<usize> {
let left = std::cmp::min(out.len(), self.state_len - self.state_off);
let len = std::io::Read::read(stream, &mut out[..left]).unwrap();
self.state_off += len;
if self.state_buffer_complete() {
self.state_transition(State::FrameType, 1, true)?;
}
Ok(len)
}
pub fn try_trigger_data_event(&mut self) -> bool {
if self.data_event_triggered {
return false;
}
self.data_event_triggered = true;
true
}
fn reset_data_event(&mut self) {
self.data_event_triggered = false;
}
pub fn set_last_priority_update(&mut self, priority_update: Option<Vec<u8>>) {
self.last_priority_update = priority_update;
}
pub fn take_last_priority_update(&mut self) -> Option<Vec<u8>> {
self.last_priority_update.take()
}
pub fn has_last_priority_update(&self) -> bool {
self.last_priority_update.is_some()
}
fn state_buffer_complete(&self) -> bool {
self.state_off == self.state_len
}
fn state_transition(
&mut self, new_state: State, expected_len: usize, resize: bool,
) -> Result<()> {
if resize {
if expected_len > MAX_STATE_BUF_SIZE {
return Err(Error::ExcessiveLoad);
}
self.state_buf.resize(expected_len, 0);
}
self.state = new_state;
self.state_off = 0;
self.state_len = expected_len;
Ok(())
}
}
#[cfg(test)]
mod tests {
use crate::h3::frame::*;
use super::*;
fn open_uni(b: &mut octets::OctetsMut, ty: u64) -> Result<Stream> {
let stream = Stream::new(2, false);
assert_eq!(stream.state, State::StreamType);
b.put_varint(ty)?;
Ok(stream)
}
fn parse_uni(
stream: &mut Stream, ty: u64, cursor: &mut std::io::Cursor<Vec<u8>>,
) -> Result<()> {
stream.try_fill_buffer_for_tests(cursor)?;
let stream_ty = stream.try_consume_varint()?;
assert_eq!(stream_ty, ty);
stream.set_ty(Type::deserialize(stream_ty).unwrap())?;
Ok(())
}
fn parse_skip_frame(
stream: &mut Stream, cursor: &mut std::io::Cursor<Vec<u8>>,
) -> Result<()> {
stream.try_fill_buffer_for_tests(cursor)?;
let frame_ty = stream.try_consume_varint()?;
stream.set_frame_type(frame_ty)?;
assert_eq!(stream.state, State::FramePayloadLen);
stream.try_fill_buffer_for_tests(cursor)?;
let frame_payload_len = stream.try_consume_varint()?;
stream.set_frame_payload_len(frame_payload_len)?;
assert_eq!(stream.state, State::FramePayload);
stream.try_fill_buffer_for_tests(cursor)?;
stream.try_consume_frame()?;
assert_eq!(stream.state, State::FrameType);
Ok(())
}
#[test]
fn control_good() {
let mut d = vec![42; 40];
let mut b = octets::OctetsMut::with_slice(&mut d);
let raw_settings = vec![
(SETTINGS_MAX_FIELD_SECTION_SIZE, 0),
(SETTINGS_QPACK_MAX_TABLE_CAPACITY, 0),
(SETTINGS_QPACK_BLOCKED_STREAMS, 0),
];
let frame = Frame::Settings {
max_field_section_size: Some(0),
qpack_max_table_capacity: Some(0),
qpack_blocked_streams: Some(0),
connect_protocol_enabled: None,
h3_datagram: None,
grease: None,
additional_settings: None,
raw: Some(raw_settings),
};
let mut stream = open_uni(&mut b, HTTP3_CONTROL_STREAM_TYPE_ID).unwrap();
frame.to_bytes(&mut b).unwrap();
let mut cursor = std::io::Cursor::new(d);
parse_uni(&mut stream, HTTP3_CONTROL_STREAM_TYPE_ID, &mut cursor)
.unwrap();
assert_eq!(stream.state, State::FrameType);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_ty = stream.try_consume_varint().unwrap();
assert_eq!(frame_ty, frame::SETTINGS_FRAME_TYPE_ID);
stream.set_frame_type(frame_ty).unwrap();
assert_eq!(stream.state, State::FramePayloadLen);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_payload_len = stream.try_consume_varint().unwrap();
assert_eq!(frame_payload_len, 6);
stream.set_frame_payload_len(frame_payload_len).unwrap();
assert_eq!(stream.state, State::FramePayload);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
assert_eq!(stream.try_consume_frame(), Ok((frame, 6)));
assert_eq!(stream.state, State::FrameType);
}
#[test]
fn control_empty_settings() {
let mut d = vec![42; 40];
let mut b = octets::OctetsMut::with_slice(&mut d);
let frame = Frame::Settings {
max_field_section_size: None,
qpack_max_table_capacity: None,
qpack_blocked_streams: None,
connect_protocol_enabled: None,
h3_datagram: None,
grease: None,
additional_settings: None,
raw: Some(vec![]),
};
let mut stream = open_uni(&mut b, HTTP3_CONTROL_STREAM_TYPE_ID).unwrap();
frame.to_bytes(&mut b).unwrap();
let mut cursor = std::io::Cursor::new(d);
parse_uni(&mut stream, HTTP3_CONTROL_STREAM_TYPE_ID, &mut cursor)
.unwrap();
assert_eq!(stream.state, State::FrameType);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_ty = stream.try_consume_varint().unwrap();
assert_eq!(frame_ty, frame::SETTINGS_FRAME_TYPE_ID);
stream.set_frame_type(frame_ty).unwrap();
assert_eq!(stream.state, State::FramePayloadLen);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_payload_len = stream.try_consume_varint().unwrap();
assert_eq!(frame_payload_len, 0);
stream.set_frame_payload_len(frame_payload_len).unwrap();
assert_eq!(stream.state, State::FramePayload);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
assert_eq!(stream.try_consume_frame(), Ok((frame, 0)));
assert_eq!(stream.state, State::FrameType);
}
#[test]
fn control_bad_multiple_settings() {
let mut d = vec![42; 40];
let mut b = octets::OctetsMut::with_slice(&mut d);
let raw_settings = vec![
(SETTINGS_MAX_FIELD_SECTION_SIZE, 0),
(SETTINGS_QPACK_MAX_TABLE_CAPACITY, 0),
(SETTINGS_QPACK_BLOCKED_STREAMS, 0),
];
let frame = frame::Frame::Settings {
max_field_section_size: Some(0),
qpack_max_table_capacity: Some(0),
qpack_blocked_streams: Some(0),
connect_protocol_enabled: None,
h3_datagram: None,
grease: None,
additional_settings: None,
raw: Some(raw_settings),
};
let mut stream = open_uni(&mut b, HTTP3_CONTROL_STREAM_TYPE_ID).unwrap();
frame.to_bytes(&mut b).unwrap();
frame.to_bytes(&mut b).unwrap();
let mut cursor = std::io::Cursor::new(d);
parse_uni(&mut stream, HTTP3_CONTROL_STREAM_TYPE_ID, &mut cursor)
.unwrap();
assert_eq!(stream.state, State::FrameType);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_ty = stream.try_consume_varint().unwrap();
assert_eq!(frame_ty, frame::SETTINGS_FRAME_TYPE_ID);
stream.set_frame_type(frame_ty).unwrap();
assert_eq!(stream.state, State::FramePayloadLen);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_payload_len = stream.try_consume_varint().unwrap();
assert_eq!(frame_payload_len, 6);
stream.set_frame_payload_len(frame_payload_len).unwrap();
assert_eq!(stream.state, State::FramePayload);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
assert_eq!(stream.try_consume_frame(), Ok((frame, 6)));
assert_eq!(stream.state, State::FrameType);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_ty = stream.try_consume_varint().unwrap();
assert_eq!(stream.set_frame_type(frame_ty), Err(Error::FrameUnexpected));
}
#[test]
fn control_bad_late_settings() {
let mut d = vec![42; 40];
let mut b = octets::OctetsMut::with_slice(&mut d);
let goaway = frame::Frame::GoAway { id: 0 };
let raw_settings = vec![
(SETTINGS_MAX_FIELD_SECTION_SIZE, 0),
(SETTINGS_QPACK_MAX_TABLE_CAPACITY, 0),
(SETTINGS_QPACK_BLOCKED_STREAMS, 0),
];
let settings = frame::Frame::Settings {
max_field_section_size: Some(0),
qpack_max_table_capacity: Some(0),
qpack_blocked_streams: Some(0),
connect_protocol_enabled: None,
h3_datagram: None,
grease: None,
additional_settings: None,
raw: Some(raw_settings),
};
let mut stream = open_uni(&mut b, HTTP3_CONTROL_STREAM_TYPE_ID).unwrap();
goaway.to_bytes(&mut b).unwrap();
settings.to_bytes(&mut b).unwrap();
let mut cursor = std::io::Cursor::new(d);
parse_uni(&mut stream, HTTP3_CONTROL_STREAM_TYPE_ID, &mut cursor)
.unwrap();
assert_eq!(stream.state, State::FrameType);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_ty = stream.try_consume_varint().unwrap();
assert_eq!(stream.set_frame_type(frame_ty), Err(Error::MissingSettings));
}
#[test]
fn control_bad_frame() {
let mut d = vec![42; 40];
let mut b = octets::OctetsMut::with_slice(&mut d);
let header_block = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
let hdrs = frame::Frame::Headers { header_block };
let raw_settings = vec![
(SETTINGS_MAX_FIELD_SECTION_SIZE, 0),
(SETTINGS_QPACK_MAX_TABLE_CAPACITY, 0),
(SETTINGS_QPACK_BLOCKED_STREAMS, 0),
(33, 33),
];
let settings = frame::Frame::Settings {
max_field_section_size: Some(0),
qpack_max_table_capacity: Some(0),
qpack_blocked_streams: Some(0),
connect_protocol_enabled: None,
h3_datagram: None,
grease: None,
additional_settings: None,
raw: Some(raw_settings),
};
let mut stream = open_uni(&mut b, HTTP3_CONTROL_STREAM_TYPE_ID).unwrap();
settings.to_bytes(&mut b).unwrap();
hdrs.to_bytes(&mut b).unwrap();
let mut cursor = std::io::Cursor::new(d);
parse_uni(&mut stream, HTTP3_CONTROL_STREAM_TYPE_ID, &mut cursor)
.unwrap();
assert_eq!(stream.state, State::FrameType);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_ty = stream.try_consume_varint().unwrap();
stream.set_frame_type(frame_ty).unwrap();
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_payload_len = stream.try_consume_varint().unwrap();
stream.set_frame_payload_len(frame_payload_len).unwrap();
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
assert!(stream.try_consume_frame().is_ok());
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_ty = stream.try_consume_varint().unwrap();
assert_eq!(stream.set_frame_type(frame_ty), Err(Error::FrameUnexpected));
}
#[test]
fn request_no_data() {
let mut stream = Stream::new(0, false);
assert_eq!(stream.ty, Some(Type::Request));
assert_eq!(stream.state, State::FrameType);
assert_eq!(stream.try_consume_varint(), Err(Error::Done));
}
#[test]
fn request_good() {
let mut stream = Stream::new(0, false);
let mut d = vec![42; 128];
let mut b = octets::OctetsMut::with_slice(&mut d);
let header_block = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
let payload = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
let hdrs = frame::Frame::Headers { header_block };
let data = frame::Frame::Data {
payload: payload.clone(),
};
hdrs.to_bytes(&mut b).unwrap();
data.to_bytes(&mut b).unwrap();
let mut cursor = std::io::Cursor::new(d);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_ty = stream.try_consume_varint().unwrap();
assert_eq!(frame_ty, frame::HEADERS_FRAME_TYPE_ID);
stream.set_frame_type(frame_ty).unwrap();
assert_eq!(stream.state, State::FramePayloadLen);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_payload_len = stream.try_consume_varint().unwrap();
assert_eq!(frame_payload_len, 12);
stream.set_frame_payload_len(frame_payload_len).unwrap();
assert_eq!(stream.state, State::FramePayload);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
assert_eq!(stream.try_consume_frame(), Ok((hdrs, 12)));
assert_eq!(stream.state, State::FrameType);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_ty = stream.try_consume_varint().unwrap();
assert_eq!(frame_ty, frame::DATA_FRAME_TYPE_ID);
stream.set_frame_type(frame_ty).unwrap();
assert_eq!(stream.state, State::FramePayloadLen);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_payload_len = stream.try_consume_varint().unwrap();
assert_eq!(frame_payload_len, 12);
stream.set_frame_payload_len(frame_payload_len).unwrap();
assert_eq!(stream.state, State::Data);
let mut recv_buf = vec![0; payload.len()];
assert_eq!(
stream.try_consume_data_for_tests(&mut cursor, &mut recv_buf),
Ok(payload.len())
);
assert_eq!(payload, recv_buf);
assert_eq!(stream.state, State::FrameType);
}
#[test]
fn push_good() {
let mut d = vec![42; 128];
let mut b = octets::OctetsMut::with_slice(&mut d);
let header_block = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
let payload = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
let hdrs = frame::Frame::Headers { header_block };
let data = frame::Frame::Data {
payload: payload.clone(),
};
let mut stream = open_uni(&mut b, HTTP3_PUSH_STREAM_TYPE_ID).unwrap();
b.put_varint(1).unwrap();
hdrs.to_bytes(&mut b).unwrap();
data.to_bytes(&mut b).unwrap();
let mut cursor = std::io::Cursor::new(d);
parse_uni(&mut stream, HTTP3_PUSH_STREAM_TYPE_ID, &mut cursor).unwrap();
assert_eq!(stream.state, State::PushId);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let push_id = stream.try_consume_varint().unwrap();
assert_eq!(push_id, 1);
stream.set_push_id(push_id).unwrap();
assert_eq!(stream.state, State::FrameType);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_ty = stream.try_consume_varint().unwrap();
assert_eq!(frame_ty, frame::HEADERS_FRAME_TYPE_ID);
stream.set_frame_type(frame_ty).unwrap();
assert_eq!(stream.state, State::FramePayloadLen);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_payload_len = stream.try_consume_varint().unwrap();
assert_eq!(frame_payload_len, 12);
stream.set_frame_payload_len(frame_payload_len).unwrap();
assert_eq!(stream.state, State::FramePayload);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
assert_eq!(stream.try_consume_frame(), Ok((hdrs, 12)));
assert_eq!(stream.state, State::FrameType);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_ty = stream.try_consume_varint().unwrap();
assert_eq!(frame_ty, frame::DATA_FRAME_TYPE_ID);
stream.set_frame_type(frame_ty).unwrap();
assert_eq!(stream.state, State::FramePayloadLen);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_payload_len = stream.try_consume_varint().unwrap();
assert_eq!(frame_payload_len, 12);
stream.set_frame_payload_len(frame_payload_len).unwrap();
assert_eq!(stream.state, State::Data);
let mut recv_buf = vec![0; payload.len()];
assert_eq!(
stream.try_consume_data_for_tests(&mut cursor, &mut recv_buf),
Ok(payload.len())
);
assert_eq!(payload, recv_buf);
assert_eq!(stream.state, State::FrameType);
}
#[test]
fn grease() {
let mut d = vec![42; 20];
let mut b = octets::OctetsMut::with_slice(&mut d);
let mut stream = open_uni(&mut b, 33).unwrap();
let mut cursor = std::io::Cursor::new(d);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let stream_ty = stream.try_consume_varint().unwrap();
assert_eq!(stream_ty, 33);
stream
.set_ty(Type::deserialize(stream_ty).unwrap())
.unwrap();
assert_eq!(stream.state, State::Drain);
}
#[test]
fn data_before_headers() {
let mut stream = Stream::new(0, false);
let mut d = vec![42; 128];
let mut b = octets::OctetsMut::with_slice(&mut d);
let data = frame::Frame::Data {
payload: vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12],
};
data.to_bytes(&mut b).unwrap();
let mut cursor = std::io::Cursor::new(d);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_ty = stream.try_consume_varint().unwrap();
assert_eq!(frame_ty, frame::DATA_FRAME_TYPE_ID);
assert_eq!(stream.set_frame_type(frame_ty), Err(Error::FrameUnexpected));
}
#[test]
fn additional_headers() {
let mut stream = Stream::new(0, false);
let mut d = vec![42; 128];
let mut b = octets::OctetsMut::with_slice(&mut d);
let header_block = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
let payload = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
let info_hdrs = frame::Frame::Headers {
header_block: header_block.clone(),
};
let non_info_hdrs = frame::Frame::Headers {
header_block: header_block.clone(),
};
let trailers = frame::Frame::Headers { header_block };
let data = frame::Frame::Data {
payload: payload.clone(),
};
info_hdrs.to_bytes(&mut b).unwrap();
non_info_hdrs.to_bytes(&mut b).unwrap();
data.to_bytes(&mut b).unwrap();
trailers.to_bytes(&mut b).unwrap();
let mut cursor = std::io::Cursor::new(d);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_ty = stream.try_consume_varint().unwrap();
assert_eq!(frame_ty, frame::HEADERS_FRAME_TYPE_ID);
stream.set_frame_type(frame_ty).unwrap();
assert_eq!(stream.state, State::FramePayloadLen);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_payload_len = stream.try_consume_varint().unwrap();
assert_eq!(frame_payload_len, 12);
stream.set_frame_payload_len(frame_payload_len).unwrap();
assert_eq!(stream.state, State::FramePayload);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
assert_eq!(stream.try_consume_frame(), Ok((info_hdrs, 12)));
assert_eq!(stream.state, State::FrameType);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_ty = stream.try_consume_varint().unwrap();
assert_eq!(frame_ty, frame::HEADERS_FRAME_TYPE_ID);
stream.set_frame_type(frame_ty).unwrap();
assert_eq!(stream.state, State::FramePayloadLen);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_payload_len = stream.try_consume_varint().unwrap();
assert_eq!(frame_payload_len, 12);
stream.set_frame_payload_len(frame_payload_len).unwrap();
assert_eq!(stream.state, State::FramePayload);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
assert_eq!(stream.try_consume_frame(), Ok((non_info_hdrs, 12)));
assert_eq!(stream.state, State::FrameType);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_ty = stream.try_consume_varint().unwrap();
assert_eq!(frame_ty, frame::DATA_FRAME_TYPE_ID);
stream.set_frame_type(frame_ty).unwrap();
assert_eq!(stream.state, State::FramePayloadLen);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_payload_len = stream.try_consume_varint().unwrap();
assert_eq!(frame_payload_len, 12);
stream.set_frame_payload_len(frame_payload_len).unwrap();
assert_eq!(stream.state, State::Data);
let mut recv_buf = vec![0; payload.len()];
assert_eq!(
stream.try_consume_data_for_tests(&mut cursor, &mut recv_buf),
Ok(payload.len())
);
assert_eq!(payload, recv_buf);
assert_eq!(stream.state, State::FrameType);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_ty = stream.try_consume_varint().unwrap();
assert_eq!(frame_ty, frame::HEADERS_FRAME_TYPE_ID);
stream.set_frame_type(frame_ty).unwrap();
assert_eq!(stream.state, State::FramePayloadLen);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_payload_len = stream.try_consume_varint().unwrap();
assert_eq!(frame_payload_len, 12);
stream.set_frame_payload_len(frame_payload_len).unwrap();
assert_eq!(stream.state, State::FramePayload);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
assert_eq!(stream.try_consume_frame(), Ok((trailers, 12)));
assert_eq!(stream.state, State::FrameType);
}
#[test]
fn zero_length_goaway() {
let mut d = vec![42; 128];
let mut b = octets::OctetsMut::with_slice(&mut d);
let frame = Frame::Settings {
max_field_section_size: None,
qpack_max_table_capacity: None,
qpack_blocked_streams: None,
connect_protocol_enabled: None,
h3_datagram: None,
grease: None,
additional_settings: None,
raw: Some(vec![]),
};
let mut stream = open_uni(&mut b, HTTP3_CONTROL_STREAM_TYPE_ID).unwrap();
frame.to_bytes(&mut b).unwrap();
b.put_varint(frame::GOAWAY_FRAME_TYPE_ID).unwrap();
b.put_varint(0).unwrap();
let mut cursor = std::io::Cursor::new(d);
parse_uni(&mut stream, HTTP3_CONTROL_STREAM_TYPE_ID, &mut cursor)
.unwrap();
parse_skip_frame(&mut stream, &mut cursor).unwrap();
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_ty = stream.try_consume_varint().unwrap();
assert_eq!(frame_ty, frame::GOAWAY_FRAME_TYPE_ID);
stream.set_frame_type(frame_ty).unwrap();
assert_eq!(stream.state, State::FramePayloadLen);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_payload_len = stream.try_consume_varint().unwrap();
assert_eq!(
Err(Error::FrameError),
stream.set_frame_payload_len(frame_payload_len)
);
}
#[test]
fn zero_length_push_promise() {
let mut d = vec![42; 128];
let mut b = octets::OctetsMut::with_slice(&mut d);
let mut stream = Stream::new(0, false);
assert_eq!(stream.ty, Some(Type::Request));
assert_eq!(stream.state, State::FrameType);
b.put_varint(frame::PUSH_PROMISE_FRAME_TYPE_ID).unwrap();
b.put_varint(0).unwrap();
let mut cursor = std::io::Cursor::new(d);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_ty = stream.try_consume_varint().unwrap();
assert_eq!(frame_ty, frame::PUSH_PROMISE_FRAME_TYPE_ID);
stream.set_frame_type(frame_ty).unwrap();
assert_eq!(stream.state, State::FramePayloadLen);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_payload_len = stream.try_consume_varint().unwrap();
assert_eq!(
Err(Error::FrameError),
stream.set_frame_payload_len(frame_payload_len)
);
}
#[test]
fn zero_length_cancel_push() {
let mut d = vec![42; 128];
let mut b = octets::OctetsMut::with_slice(&mut d);
let frame = Frame::Settings {
max_field_section_size: None,
qpack_max_table_capacity: None,
qpack_blocked_streams: None,
connect_protocol_enabled: None,
h3_datagram: None,
grease: None,
additional_settings: None,
raw: Some(vec![]),
};
let mut stream = open_uni(&mut b, HTTP3_CONTROL_STREAM_TYPE_ID).unwrap();
frame.to_bytes(&mut b).unwrap();
b.put_varint(frame::CANCEL_PUSH_FRAME_TYPE_ID).unwrap();
b.put_varint(0).unwrap();
let mut cursor = std::io::Cursor::new(d);
parse_uni(&mut stream, HTTP3_CONTROL_STREAM_TYPE_ID, &mut cursor)
.unwrap();
parse_skip_frame(&mut stream, &mut cursor).unwrap();
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_ty = stream.try_consume_varint().unwrap();
assert_eq!(frame_ty, frame::CANCEL_PUSH_FRAME_TYPE_ID);
stream.set_frame_type(frame_ty).unwrap();
assert_eq!(stream.state, State::FramePayloadLen);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_payload_len = stream.try_consume_varint().unwrap();
assert_eq!(
Err(Error::FrameError),
stream.set_frame_payload_len(frame_payload_len)
);
}
#[test]
fn zero_length_max_push_id() {
let mut d = vec![42; 128];
let mut b = octets::OctetsMut::with_slice(&mut d);
let frame = Frame::Settings {
max_field_section_size: None,
qpack_max_table_capacity: None,
qpack_blocked_streams: None,
connect_protocol_enabled: None,
h3_datagram: None,
grease: None,
additional_settings: None,
raw: Some(vec![]),
};
let mut stream = open_uni(&mut b, HTTP3_CONTROL_STREAM_TYPE_ID).unwrap();
frame.to_bytes(&mut b).unwrap();
b.put_varint(frame::MAX_PUSH_FRAME_TYPE_ID).unwrap();
b.put_varint(0).unwrap();
let mut cursor = std::io::Cursor::new(d);
parse_uni(&mut stream, HTTP3_CONTROL_STREAM_TYPE_ID, &mut cursor)
.unwrap();
parse_skip_frame(&mut stream, &mut cursor).unwrap();
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_ty = stream.try_consume_varint().unwrap();
assert_eq!(frame_ty, frame::MAX_PUSH_FRAME_TYPE_ID);
stream.set_frame_type(frame_ty).unwrap();
assert_eq!(stream.state, State::FramePayloadLen);
stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
let frame_payload_len = stream.try_consume_varint().unwrap();
assert_eq!(
Err(Error::FrameError),
stream.set_frame_payload_len(frame_payload_len)
);
}
}