Skip to main content

quiche/h3/
mod.rs

1// Copyright (C) 2019, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! HTTP/3 wire protocol and QPACK implementation.
28//!
29//! This module provides a high level API for sending and receiving HTTP/3
30//! requests and responses on top of the QUIC transport protocol.
31//!
32//! ## Connection setup
33//!
34//! HTTP/3 connections require a QUIC transport-layer connection, see
35//! [Connection setup] for a full description of the setup process.
36//!
37//! To use HTTP/3, the QUIC connection must be configured with a suitable
38//! Application Layer Protocol Negotiation (ALPN) Protocol ID:
39//!
40//! ```
41//! let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
42//! config.set_application_protos(quiche::h3::APPLICATION_PROTOCOL)?;
43//! # Ok::<(), quiche::Error>(())
44//! ```
45//!
46//! The QUIC handshake is driven by [sending] and [receiving] QUIC packets.
47//!
48//! Once the handshake has completed, the first step in establishing an HTTP/3
49//! connection is creating its configuration object:
50//!
51//! ```
52//! let h3_config = quiche::h3::Config::new()?;
53//! # Ok::<(), quiche::h3::Error>(())
54//! ```
55//!
56//! HTTP/3 client and server connections are both created using the
57//! [`with_transport()`] function, the role is inferred from the type of QUIC
58//! connection:
59//!
60//! ```no_run
61//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
62//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
63//! # let peer = "127.0.0.1:1234".parse().unwrap();
64//! # let local = "127.0.0.1:4321".parse().unwrap();
65//! # let mut conn = quiche::accept(&scid, None, local, peer, &mut config).unwrap();
66//! # let h3_config = quiche::h3::Config::new()?;
67//! let h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
68//! # Ok::<(), quiche::h3::Error>(())
69//! ```
70//!
71//! ## Sending a request
72//!
73//! An HTTP/3 client can send a request by using the connection's
74//! [`send_request()`] method to queue request headers; [sending] QUIC packets
75//! causes the requests to get sent to the peer:
76//!
77//! ```no_run
78//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
79//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
80//! # let peer = "127.0.0.1:1234".parse().unwrap();
81//! # let local = "127.0.0.1:4321".parse().unwrap();
82//! # let mut conn = quiche::connect(None, &scid, local, peer, &mut config).unwrap();
83//! # let h3_config = quiche::h3::Config::new()?;
84//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
85//! let req = vec![
86//!     quiche::h3::Header::new(b":method", b"GET"),
87//!     quiche::h3::Header::new(b":scheme", b"https"),
88//!     quiche::h3::Header::new(b":authority", b"quic.tech"),
89//!     quiche::h3::Header::new(b":path", b"/"),
90//!     quiche::h3::Header::new(b"user-agent", b"quiche"),
91//! ];
92//!
93//! h3_conn.send_request(&mut conn, &req, true)?;
94//! # Ok::<(), quiche::h3::Error>(())
95//! ```
96//!
97//! An HTTP/3 client can send a request with additional body data by using
98//! the connection's [`send_body()`] method:
99//!
100//! ```no_run
101//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
102//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
103//! # let peer = "127.0.0.1:1234".parse().unwrap();
104//! # let local = "127.0.0.1:4321".parse().unwrap();
105//! # let mut conn = quiche::connect(None, &scid, local, peer, &mut config).unwrap();
106//! # let h3_config = quiche::h3::Config::new()?;
107//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
108//! let req = vec![
109//!     quiche::h3::Header::new(b":method", b"GET"),
110//!     quiche::h3::Header::new(b":scheme", b"https"),
111//!     quiche::h3::Header::new(b":authority", b"quic.tech"),
112//!     quiche::h3::Header::new(b":path", b"/"),
113//!     quiche::h3::Header::new(b"user-agent", b"quiche"),
114//! ];
115//!
116//! let stream_id = h3_conn.send_request(&mut conn, &req, false)?;
117//! h3_conn.send_body(&mut conn, stream_id, b"Hello World!", true)?;
118//! # Ok::<(), quiche::h3::Error>(())
119//! ```
120//!
121//! ## Handling requests and responses
122//!
123//! After [receiving] QUIC packets, HTTP/3 data is processed using the
124//! connection's [`poll()`] method. On success, this returns an [`Event`] object
125//! and an ID corresponding to the stream where the `Event` originated.
126//!
127//! An HTTP/3 server uses [`poll()`] to read requests and responds to them using
128//! [`send_response()`] and [`send_body()`]:
129//!
130//! ```no_run
131//! use quiche::h3::NameValue;
132//!
133//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
134//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
135//! # let peer = "127.0.0.1:1234".parse().unwrap();
136//! # let local = "127.0.0.1:1234".parse().unwrap();
137//! # let mut conn = quiche::accept(&scid, None, local, peer, &mut config).unwrap();
138//! # let h3_config = quiche::h3::Config::new()?;
139//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
140//! loop {
141//!     match h3_conn.poll(&mut conn) {
142//!         Ok((stream_id, quiche::h3::Event::Headers{list, more_frames})) => {
143//!             let mut headers = list.into_iter();
144//!
145//!             // Look for the request's method.
146//!             let method = headers.find(|h| h.name() == b":method").unwrap();
147//!
148//!             // Look for the request's path.
149//!             let path = headers.find(|h| h.name() == b":path").unwrap();
150//!
151//!             if method.value() == b"GET" && path.value() == b"/" {
152//!                 let resp = vec![
153//!                     quiche::h3::Header::new(b":status", 200.to_string().as_bytes()),
154//!                     quiche::h3::Header::new(b"server", b"quiche"),
155//!                 ];
156//!
157//!                 h3_conn.send_response(&mut conn, stream_id, &resp, false)?;
158//!                 h3_conn.send_body(&mut conn, stream_id, b"Hello World!", true)?;
159//!             }
160//!         },
161//!
162//!         Ok((stream_id, quiche::h3::Event::Data)) => {
163//!             // Request body data, handle it.
164//!             # return Ok(());
165//!         },
166//!
167//!         Ok((stream_id, quiche::h3::Event::Finished)) => {
168//!             // Peer terminated stream, handle it.
169//!         },
170//!
171//!         Ok((stream_id, quiche::h3::Event::Reset(err))) => {
172//!             // Peer reset the stream, handle it.
173//!         },
174//!
175//!         Ok((_flow_id, quiche::h3::Event::PriorityUpdate)) => (),
176//!
177//!         Ok((goaway_id, quiche::h3::Event::GoAway)) => {
178//!              // Peer signalled it is going away, handle it.
179//!         },
180//!
181//!         Err(quiche::h3::Error::Done) => {
182//!             // Done reading.
183//!             break;
184//!         },
185//!
186//!         Err(e) => {
187//!             // An error occurred, handle it.
188//!             break;
189//!         },
190//!     }
191//! }
192//! # Ok::<(), quiche::h3::Error>(())
193//! ```
194//!
195//! An HTTP/3 client uses [`poll()`] to read responses:
196//!
197//! ```no_run
198//! use quiche::h3::NameValue;
199//!
200//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
201//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
202//! # let peer = "127.0.0.1:1234".parse().unwrap();
203//! # let local = "127.0.0.1:1234".parse().unwrap();
204//! # let mut conn = quiche::connect(None, &scid, local, peer, &mut config).unwrap();
205//! # let h3_config = quiche::h3::Config::new()?;
206//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
207//! loop {
208//!     match h3_conn.poll(&mut conn) {
209//!         Ok((stream_id, quiche::h3::Event::Headers{list, more_frames})) => {
210//!             let status = list.iter().find(|h| h.name() == b":status").unwrap();
211//!             println!("Received {} response on stream {}",
212//!                      std::str::from_utf8(status.value()).unwrap(),
213//!                      stream_id);
214//!         },
215//!
216//!         Ok((stream_id, quiche::h3::Event::Data)) => {
217//!             let mut body = vec![0; 4096];
218//!
219//!             // Consume all body data received on the stream.
220//!             while let Ok(read) =
221//!                 h3_conn.recv_body(&mut conn, stream_id, &mut body)
222//!             {
223//!                 println!("Received {} bytes of payload on stream {}",
224//!                          read, stream_id);
225//!             }
226//!         },
227//!
228//!         Ok((stream_id, quiche::h3::Event::Finished)) => {
229//!             // Peer terminated stream, handle it.
230//!         },
231//!
232//!         Ok((stream_id, quiche::h3::Event::Reset(err))) => {
233//!             // Peer reset the stream, handle it.
234//!         },
235//!
236//!         Ok((_prioritized_element_id, quiche::h3::Event::PriorityUpdate)) => (),
237//!
238//!         Ok((goaway_id, quiche::h3::Event::GoAway)) => {
239//!              // Peer signalled it is going away, handle it.
240//!         },
241//!
242//!         Err(quiche::h3::Error::Done) => {
243//!             // Done reading.
244//!             break;
245//!         },
246//!
247//!         Err(e) => {
248//!             // An error occurred, handle it.
249//!             break;
250//!         },
251//!     }
252//! }
253//! # Ok::<(), quiche::h3::Error>(())
254//! ```
255//!
256//! ## Detecting end of request or response
257//!
258//! A single HTTP/3 request or response may consist of several HEADERS and DATA
259//! frames; it is finished when the QUIC stream is closed. Calling [`poll()`]
260//! repeatedly will generate an [`Event`] for each of these. The application may
//! use these events to do additional HTTP semantic validation.
262//!
263//! ## HTTP/3 protocol errors
264//!
265//! Quiche is responsible for managing the HTTP/3 connection, ensuring it is in
//! a correct state and validating all messages received from the peer. This mainly
267//! takes place in the [`poll()`] method. If an HTTP/3 error occurs, quiche will
268//! close the connection and send an appropriate CONNECTION_CLOSE frame to the
269//! peer. An [`Error`] is returned to the application so that it can perform any
270//! required tidy up such as closing sockets.
271//!
272//! [`application_proto()`]: ../struct.Connection.html#method.application_proto
273//! [`stream_finished()`]: ../struct.Connection.html#method.stream_finished
274//! [Connection setup]: ../index.html#connection-setup
275//! [sending]: ../index.html#generating-outgoing-packets
276//! [receiving]: ../index.html#handling-incoming-packets
277//! [`with_transport()`]: struct.Connection.html#method.with_transport
278//! [`poll()`]: struct.Connection.html#method.poll
279//! [`Event`]: enum.Event.html
280//! [`Error`]: enum.Error.html
//! [`send_request()`]: struct.Connection.html#method.send_request
282//! [`send_response()`]: struct.Connection.html#method.send_response
283//! [`send_body()`]: struct.Connection.html#method.send_body
284
285use std::collections::HashSet;
286use std::collections::VecDeque;
287
288#[cfg(feature = "sfv")]
289use std::convert::TryFrom;
290use std::fmt;
291use std::fmt::Write;
292
293#[cfg(feature = "qlog")]
294use qlog::events::h3::H3FrameCreated;
295#[cfg(feature = "qlog")]
296use qlog::events::h3::H3FrameParsed;
297#[cfg(feature = "qlog")]
298use qlog::events::h3::H3Owner;
299#[cfg(feature = "qlog")]
300use qlog::events::h3::H3PriorityTargetStreamType;
301#[cfg(feature = "qlog")]
302use qlog::events::h3::H3StreamType;
303#[cfg(feature = "qlog")]
304use qlog::events::h3::H3StreamTypeSet;
305#[cfg(feature = "qlog")]
306use qlog::events::h3::Http3EventType;
307#[cfg(feature = "qlog")]
308use qlog::events::h3::Http3Frame;
309#[cfg(feature = "qlog")]
310use qlog::events::EventData;
311#[cfg(feature = "qlog")]
312use qlog::events::EventImportance;
313#[cfg(feature = "qlog")]
314use qlog::events::EventType;
315
316use crate::buffers::BufFactory;
317use crate::BufSplit;
318
/// List of ALPN tokens of supported HTTP/3 versions.
///
/// This can be passed directly to the [`Config::set_application_protos()`]
/// method when implementing HTTP/3 applications. The list currently holds a
/// single token, `h3`.
///
/// [`Config::set_application_protos()`]:
/// ../struct.Config.html#method.set_application_protos
pub const APPLICATION_PROTOCOL: &[&[u8]] = &[b"h3"];
327
// The offset used when converting HTTP/3 urgency to quiche urgency.
const PRIORITY_URGENCY_OFFSET: u8 = 124;

// Parameter values as specified in [Extensible Priorities].
//
// [Extensible Priorities]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4
//
// The bounds delimit the valid urgency range (0 through 7); the defaults are
// the values used for parameters that are omitted.
const PRIORITY_URGENCY_LOWER_BOUND: u8 = 0;
const PRIORITY_URGENCY_UPPER_BOUND: u8 = 7;
const PRIORITY_URGENCY_DEFAULT: u8 = 3;
const PRIORITY_INCREMENTAL_DEFAULT: bool = false;
338
// qlog event type identifiers for the HTTP/3 "frame created", "frame parsed"
// and "stream type set" events. They only exist when the `qlog` feature is
// enabled.
#[cfg(feature = "qlog")]
const QLOG_FRAME_CREATED: EventType =
    EventType::Http3EventType(Http3EventType::FrameCreated);
#[cfg(feature = "qlog")]
const QLOG_FRAME_PARSED: EventType =
    EventType::Http3EventType(Http3EventType::FrameParsed);
#[cfg(feature = "qlog")]
const QLOG_STREAM_TYPE_SET: EventType =
    EventType::Http3EventType(Http3EventType::StreamTypeSet);
348
/// A specialized [`Result`] type for quiche HTTP/3 operations.
///
/// This type is used throughout quiche's HTTP/3 public API for any operation
/// that can produce an error. The error type is always this module's
/// [`Error`].
///
/// [`Result`]: https://doc.rust-lang.org/std/result/enum.Result.html
/// [`Error`]: enum.Error.html
pub type Result<T> = std::result::Result<T, Error>;
356
/// An HTTP/3 error.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Error {
    /// There is no error or no work to do.
    Done,

    /// The provided buffer is too short.
    BufferTooShort,

    /// Internal error in the HTTP/3 stack.
    InternalError,

    /// Endpoint detected that the peer is exhibiting behavior that causes
    /// excessive load.
    ExcessiveLoad,

    /// A stream ID or push ID was used incorrectly, such as exceeding a
    /// limit, reducing a limit, or being reused.
    IdError,

    /// The endpoint detected that its peer created a stream that it will not
    /// accept.
    StreamCreationError,

    /// A required critical stream was closed.
    ClosedCriticalStream,

    /// No SETTINGS frame at beginning of control stream.
    MissingSettings,

    /// A frame was received which is not permitted in the current state.
    FrameUnexpected,

    /// Frame violated layout or size rules.
    FrameError,

    /// QPACK Header block decompression failure.
    QpackDecompressionFailed,

    /// Error originated from the transport layer.
    ///
    /// The associated data is the underlying QUIC error.
    TransportError(crate::Error),

    /// The underlying QUIC stream (or connection) doesn't have enough capacity
    /// for the operation to complete. The application should retry later on.
    StreamBlocked,

    /// Error in the payload of a SETTINGS frame.
    SettingsError,

    /// Server rejected request.
    RequestRejected,

    /// Request or its response cancelled.
    RequestCancelled,

    /// Client's request stream terminated without containing a full-formed
    /// request.
    RequestIncomplete,

    /// An HTTP message was malformed and cannot be processed.
    MessageError,

    /// The TCP connection established in response to a CONNECT request was
    /// reset or abnormally closed.
    ConnectError,

    /// The requested operation cannot be served over HTTP/3. Peer should retry
    /// over HTTP/1.1.
    VersionFallback,
}
428
/// HTTP/3 error codes sent on the wire.
///
/// As defined in [RFC9114](https://www.rfc-editor.org/rfc/rfc9114.html#http-error-codes).
///
/// Each variant's discriminant is the numeric value of the error code.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum WireErrorCode {
    /// No error. This is used when the connection or stream needs to be closed,
    /// but there is no error to signal.
    NoError              = 0x100,
    /// Peer violated protocol requirements in a way that does not match a more
    /// specific error code or endpoint declines to use the more specific
    /// error code.
    GeneralProtocolError = 0x101,
    /// An internal error has occurred in the HTTP stack.
    InternalError        = 0x102,
    /// The endpoint detected that its peer created a stream that it will not
    /// accept.
    StreamCreationError  = 0x103,
    /// A stream required by the HTTP/3 connection was closed or reset.
    ClosedCriticalStream = 0x104,
    /// A frame was received that was not permitted in the current state or on
    /// the current stream.
    FrameUnexpected      = 0x105,
    /// A frame that fails to satisfy layout requirements or with an invalid
    /// size was received.
    FrameError           = 0x106,
    /// The endpoint detected that its peer is exhibiting a behavior that might
    /// be generating excessive load.
    ExcessiveLoad        = 0x107,
    /// A stream ID or push ID was used incorrectly, such as exceeding a limit,
    /// reducing a limit, or being reused.
    IdError              = 0x108,
    /// An endpoint detected an error in the payload of a SETTINGS frame.
    SettingsError        = 0x109,
    /// No SETTINGS frame was received at the beginning of the control stream.
    MissingSettings      = 0x10a,
    /// A server rejected a request without performing any application
    /// processing.
    RequestRejected      = 0x10b,
    /// The request or its response (including pushed response) is cancelled.
    RequestCancelled     = 0x10c,
    /// The client's stream terminated without containing a fully formed
    /// request.
    RequestIncomplete    = 0x10d,
    /// An HTTP message was malformed and cannot be processed.
    MessageError         = 0x10e,
    /// The TCP connection established in response to a CONNECT request was
    /// reset or abnormally closed.
    ConnectError         = 0x10f,
    /// The requested operation cannot be served over HTTP/3. The peer should
    /// retry over HTTP/1.1.
    VersionFallback      = 0x110,
}
481
482impl Error {
483    fn to_wire(self) -> u64 {
484        match self {
485            Error::Done => WireErrorCode::NoError as u64,
486            Error::InternalError => WireErrorCode::InternalError as u64,
487            Error::StreamCreationError =>
488                WireErrorCode::StreamCreationError as u64,
489            Error::ClosedCriticalStream =>
490                WireErrorCode::ClosedCriticalStream as u64,
491            Error::FrameUnexpected => WireErrorCode::FrameUnexpected as u64,
492            Error::FrameError => WireErrorCode::FrameError as u64,
493            Error::ExcessiveLoad => WireErrorCode::ExcessiveLoad as u64,
494            Error::IdError => WireErrorCode::IdError as u64,
495            Error::MissingSettings => WireErrorCode::MissingSettings as u64,
496            Error::QpackDecompressionFailed => 0x200,
497            Error::BufferTooShort => 0x999,
498            Error::TransportError { .. } | Error::StreamBlocked => 0xFF,
499            Error::SettingsError => WireErrorCode::SettingsError as u64,
500            Error::RequestRejected => WireErrorCode::RequestRejected as u64,
501            Error::RequestCancelled => WireErrorCode::RequestCancelled as u64,
502            Error::RequestIncomplete => WireErrorCode::RequestIncomplete as u64,
503            Error::MessageError => WireErrorCode::MessageError as u64,
504            Error::ConnectError => WireErrorCode::ConnectError as u64,
505            Error::VersionFallback => WireErrorCode::VersionFallback as u64,
506        }
507    }
508
509    #[cfg(feature = "ffi")]
510    fn to_c(self) -> libc::ssize_t {
511        match self {
512            Error::Done => -1,
513            Error::BufferTooShort => -2,
514            Error::InternalError => -3,
515            Error::ExcessiveLoad => -4,
516            Error::IdError => -5,
517            Error::StreamCreationError => -6,
518            Error::ClosedCriticalStream => -7,
519            Error::MissingSettings => -8,
520            Error::FrameUnexpected => -9,
521            Error::FrameError => -10,
522            Error::QpackDecompressionFailed => -11,
523            // -12 was previously used for TransportError, skip it
524            Error::StreamBlocked => -13,
525            Error::SettingsError => -14,
526            Error::RequestRejected => -15,
527            Error::RequestCancelled => -16,
528            Error::RequestIncomplete => -17,
529            Error::MessageError => -18,
530            Error::ConnectError => -19,
531            Error::VersionFallback => -20,
532
533            Error::TransportError(quic_error) => quic_error.to_c() - 1000,
534        }
535    }
536}
537
538impl fmt::Display for Error {
539    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
540        write!(f, "{self:?}")
541    }
542}
543
544impl std::error::Error for Error {
545    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
546        None
547    }
548}
549
550impl From<super::Error> for Error {
551    fn from(err: super::Error) -> Self {
552        match err {
553            super::Error::Done => Error::Done,
554
555            _ => Error::TransportError(err),
556        }
557    }
558}
559
560impl From<octets::BufferTooShortError> for Error {
561    fn from(_err: octets::BufferTooShortError) -> Self {
562        Error::BufferTooShort
563    }
564}
565
/// An HTTP/3 configuration.
///
/// Every field is `None` until the corresponding setter is called.
pub struct Config {
    // Value for the `SETTINGS_MAX_FIELD_SECTION_SIZE` setting.
    max_field_section_size: Option<u64>,
    // Value for the `SETTINGS_QPACK_MAX_TABLE_CAPACITY` setting.
    qpack_max_table_capacity: Option<u64>,
    // Value for the `SETTINGS_QPACK_BLOCKED_STREAMS` setting.
    qpack_blocked_streams: Option<u64>,
    // `Some(1)` when extended CONNECT has been enabled, `None` otherwise.
    connect_protocol_enabled: Option<u64>,
    /// additional settings are settings that are not part of the H3
    /// settings explicitly handled above
    additional_settings: Option<Vec<(u64, u64)>>,
}
576
577impl Config {
578    /// Creates a new configuration object with default settings.
579    pub const fn new() -> Result<Config> {
580        Ok(Config {
581            max_field_section_size: None,
582            qpack_max_table_capacity: None,
583            qpack_blocked_streams: None,
584            connect_protocol_enabled: None,
585            additional_settings: None,
586        })
587    }
588
589    /// Sets the `SETTINGS_MAX_FIELD_SECTION_SIZE` setting.
590    ///
591    /// By default no limit is enforced. When a request whose headers exceed
592    /// the limit set by the application is received, the call to the [`poll()`]
593    /// method will return the [`Error::ExcessiveLoad`] error, and the
594    /// connection will be closed.
595    ///
596    /// [`poll()`]: struct.Connection.html#method.poll
597    /// [`Error::ExcessiveLoad`]: enum.Error.html#variant.ExcessiveLoad
598    pub fn set_max_field_section_size(&mut self, v: u64) {
599        self.max_field_section_size = Some(v);
600    }
601
602    /// Sets the `SETTINGS_QPACK_MAX_TABLE_CAPACITY` setting.
603    ///
604    /// The default value is `0`.
605    pub fn set_qpack_max_table_capacity(&mut self, v: u64) {
606        self.qpack_max_table_capacity = Some(v);
607    }
608
609    /// Sets the `SETTINGS_QPACK_BLOCKED_STREAMS` setting.
610    ///
611    /// The default value is `0`.
612    pub fn set_qpack_blocked_streams(&mut self, v: u64) {
613        self.qpack_blocked_streams = Some(v);
614    }
615
616    /// Sets or omits the `SETTINGS_ENABLE_CONNECT_PROTOCOL` setting.
617    ///
618    /// The default value is `false`.
619    pub fn enable_extended_connect(&mut self, enabled: bool) {
620        if enabled {
621            self.connect_protocol_enabled = Some(1);
622        } else {
623            self.connect_protocol_enabled = None;
624        }
625    }
626
627    /// Sets additional HTTP/3 settings.
628    ///
629    /// The default value is no additional settings.
630    /// The `additional_settings` parameter must not the following
631    /// settings as they are already handled by this library:
632    ///
633    /// - SETTINGS_QPACK_MAX_TABLE_CAPACITY
634    /// - SETTINGS_MAX_FIELD_SECTION_SIZE
635    /// - SETTINGS_QPACK_BLOCKED_STREAMS
636    /// - SETTINGS_ENABLE_CONNECT_PROTOCOL
637    /// - SETTINGS_H3_DATAGRAM
638    ///
639    /// If such a setting is present in the `additional_settings`,
640    /// the method will return the [`Error::SettingsError`] error.
641    ///
642    /// If a setting identifier is present twice in `additional_settings`,
643    /// the method will return the [`Error::SettingsError`] error.
644    ///
645    /// [`Error::SettingsError`]: enum.Error.html#variant.SettingsError
646    pub fn set_additional_settings(
647        &mut self, additional_settings: Vec<(u64, u64)>,
648    ) -> Result<()> {
649        let explicit_quiche_settings = HashSet::from([
650            frame::SETTINGS_QPACK_MAX_TABLE_CAPACITY,
651            frame::SETTINGS_MAX_FIELD_SECTION_SIZE,
652            frame::SETTINGS_QPACK_BLOCKED_STREAMS,
653            frame::SETTINGS_ENABLE_CONNECT_PROTOCOL,
654            frame::SETTINGS_H3_DATAGRAM,
655            frame::SETTINGS_H3_DATAGRAM_00,
656        ]);
657
658        let dedup_settings: HashSet<u64> =
659            additional_settings.iter().map(|(key, _)| *key).collect();
660
661        if dedup_settings.len() != additional_settings.len() ||
662            !explicit_quiche_settings.is_disjoint(&dedup_settings)
663        {
664            return Err(Error::SettingsError);
665        }
666        self.additional_settings = Some(additional_settings);
667        Ok(())
668    }
669}
670
/// A trait for types with associated string name and value.
///
/// Both accessors expose the underlying data as raw bytes.
pub trait NameValue {
    /// Returns the object's name.
    fn name(&self) -> &[u8];

    /// Returns the object's value.
    fn value(&self) -> &[u8];
}

/// Any pair of byte-like items is a name-value pair: the first element is the
/// name and the second element is the value.
impl<N, V> NameValue for (N, V)
where
    N: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    fn name(&self) -> &[u8] {
        let (name, _) = self;
        name.as_ref()
    }

    fn value(&self) -> &[u8] {
        let (_, value) = self;
        value.as_ref()
    }
}
693
/// An owned name-value pair representing a raw HTTP header.
///
/// The first element holds the name bytes and the second element holds the
/// value bytes.
#[derive(Clone, PartialEq, Eq)]
pub struct Header(Vec<u8>, Vec<u8>);

/// Writes `hdr` to `f` in a human-friendly form.
///
/// Valid UTF-8 input is written as text with `str::escape_default()` escaping
/// applied; anything else falls back to the `Debug` representation of the
/// byte slice.
fn try_print_as_readable(hdr: &[u8], f: &mut fmt::Formatter) -> fmt::Result {
    match std::str::from_utf8(hdr) {
        // The escaping iterator implements `Display`, so it is streamed
        // straight into the formatter instead of first being collected into
        // a temporary `String`.
        Ok(s) => write!(f, "{}", s.escape_default()),
        Err(_) => write!(f, "{hdr:?}"),
    }
}

impl fmt::Debug for Header {
    /// Formats the header as `"name: value"`, surrounding quotes included.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Header(name, value) = self;

        f.write_char('"')?;
        try_print_as_readable(name, f)?;
        f.write_str(": ")?;
        try_print_as_readable(value, f)?;
        f.write_char('"')
    }
}

impl Header {
    /// Creates a new header.
    ///
    /// Both `name` and `value` will be cloned.
    pub fn new(name: &[u8], value: &[u8]) -> Self {
        Header(name.to_owned(), value.to_owned())
    }
}
723
724impl NameValue for Header {
725    fn name(&self) -> &[u8] {
726        &self.0
727    }
728
729    fn value(&self) -> &[u8] {
730        &self.1
731    }
732}
733
/// A non-owned name-value pair representing a raw HTTP header.
///
/// The first element borrows the name bytes and the second element borrows
/// the value bytes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HeaderRef<'a>(&'a [u8], &'a [u8]);

impl<'a> HeaderRef<'a> {
    /// Creates a new header.
    ///
    /// Neither `name` nor `value` is copied; the header borrows both.
    pub const fn new(name: &'a [u8], value: &'a [u8]) -> Self {
        HeaderRef(name, value)
    }
}
744
745impl NameValue for HeaderRef<'_> {
746    fn name(&self) -> &[u8] {
747        self.0
748    }
749
750    fn value(&self) -> &[u8] {
751        self.1
752    }
753}
754
/// An HTTP/3 connection event.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// Request/response headers were received.
    Headers {
        /// The list of received header fields. The application should validate
        /// pseudo-headers and headers.
        list: Vec<Header>,

        /// Whether more frames will follow the headers on the stream.
        more_frames: bool,
    },

    /// Data was received.
    ///
    /// This indicates that the application can use the [`recv_body()`] method
    /// to retrieve the data from the stream.
    ///
    /// Note that [`recv_body()`] will need to be called repeatedly until the
    /// [`Done`] value is returned, as the event will not be re-armed until all
    /// buffered data is read.
    ///
    /// [`recv_body()`]: struct.Connection.html#method.recv_body
    /// [`Done`]: enum.Error.html#variant.Done
    Data,

    /// Stream was closed.
    Finished,

    /// Stream was reset.
    ///
    /// The associated data represents the error code sent by the peer.
    Reset(u64),

    /// PRIORITY_UPDATE was received.
    ///
    /// This indicates that the application can use the
    /// [`take_last_priority_update()`] method to take the last received
    /// PRIORITY_UPDATE for a specified stream.
    ///
    /// This event is triggered once per stream until the last PRIORITY_UPDATE
    /// is taken. It is recommended that applications defer taking the
    /// PRIORITY_UPDATE until after [`poll()`] returns [`Done`].
    ///
    /// [`take_last_priority_update()`]: struct.Connection.html#method.take_last_priority_update
    /// [`poll()`]: struct.Connection.html#method.poll
    /// [`Done`]: enum.Error.html#variant.Done
    PriorityUpdate,

    /// GOAWAY was received.
    GoAway,
}
807
/// Extensible Priorities parameters.
///
/// The `TryFrom` trait supports constructing this object from the serialized
/// Structured Fields Dictionary field value. I.e, use `TryFrom` to parse the
/// value of a Priority header field or a PRIORITY_UPDATE frame. Using this
/// trait requires the `sfv` feature to be enabled.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(C)]
pub struct Priority {
    // Urgency parameter (`u`); the default is `PRIORITY_URGENCY_DEFAULT`.
    urgency: u8,
    // Incremental parameter (`i`); the default is
    // `PRIORITY_INCREMENTAL_DEFAULT`.
    incremental: bool,
}
820
821impl Default for Priority {
822    fn default() -> Self {
823        Priority {
824            urgency: PRIORITY_URGENCY_DEFAULT,
825            incremental: PRIORITY_INCREMENTAL_DEFAULT,
826        }
827    }
828}
829
830impl Priority {
831    /// Creates a new Priority.
832    pub const fn new(urgency: u8, incremental: bool) -> Self {
833        Priority {
834            urgency,
835            incremental,
836        }
837    }
838}
839
#[cfg(feature = "sfv")]
#[cfg_attr(docsrs, doc(cfg(feature = "sfv")))]
impl TryFrom<&[u8]> for Priority {
    type Error = Error;

    /// Try to parse an Extensible Priority field value.
    ///
    /// The field value is expected to be a Structured Fields Dictionary; see
    /// [Extensible Priorities]. [`Error::Done`] is returned if it cannot be
    /// parsed as one.
    ///
    /// If the `u` or `i` fields are contained with correct types (Integer
    /// and Boolean respectively), a constructed Priority object is returned.
    /// Note that urgency values outside of the valid range (0 through 7) are
    /// clamped to 7.
    ///
    /// If the `u` or `i` fields are contained with the wrong types (any
    /// other item type, or an inner list), [`Error::Done`] is returned.
    ///
    /// Omitted parameters will yield default values.
    ///
    /// [`Error::Done`]: enum.Error.html#variant.Done
    /// [Extensible Priorities]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4
    fn try_from(value: &[u8]) -> std::result::Result<Self, Self::Error> {
        let dict =
            sfv::Parser::parse_dictionary(value).map_err(|_| Error::Done)?;

        let urgency = match dict.get("u") {
            // If there is a u parameter, try to read it as an Item of type
            // Integer. A value outside the spec's allowed range (0 through
            // 7) is an error, so set it to the upper bound (lowest
            // priority) to avoid interference with other streams.
            Some(sfv::ListEntry::Item(item)) => {
                let v = item.bare_item.as_int().ok_or(Error::Done)?;

                // Checked narrowing: anything that does not fit in a u8 is
                // necessarily out of range too.
                u8::try_from(v)
                    .ok()
                    .filter(|u| {
                        (PRIORITY_URGENCY_LOWER_BOUND..=
                            PRIORITY_URGENCY_UPPER_BOUND)
                            .contains(u)
                    })
                    .unwrap_or(PRIORITY_URGENCY_UPPER_BOUND)
            },

            Some(sfv::ListEntry::InnerList(_)) => return Err(Error::Done),

            // Omitted so use default value.
            None => PRIORITY_URGENCY_DEFAULT,
        };

        let incremental = match dict.get("i") {
            Some(sfv::ListEntry::Item(item)) =>
                item.bare_item.as_bool().ok_or(Error::Done)?,

            // An inner list is a wrong type, exactly like for `u`; it must
            // not be mistaken for an omitted parameter.
            Some(sfv::ListEntry::InnerList(_)) => return Err(Error::Done),

            // Omitted so use default value.
            None => PRIORITY_INCREMENTAL_DEFAULT,
        };

        Ok(Priority::new(urgency, incremental))
    }
}
905
/// HTTP/3 settings tracked for one endpoint of a connection.
///
/// `Connection` holds one instance for the local endpoint (filled from
/// `Config` in `Connection::new()`) and one for the peer (initially all
/// `None`).
struct ConnectionSettings {
    /// Locally copied from `Config::max_field_section_size`.
    pub max_field_section_size: Option<u64>,
    /// Locally copied from `Config::qpack_max_table_capacity`.
    pub qpack_max_table_capacity: Option<u64>,
    /// Locally copied from `Config::qpack_blocked_streams`.
    pub qpack_blocked_streams: Option<u64>,
    /// Locally copied from `Config::connect_protocol_enabled`.
    pub connect_protocol_enabled: Option<u64>,
    /// Locally set to `Some(1)` when datagrams are enabled on the transport,
    /// `None` otherwise.
    pub h3_datagram: Option<u64>,
    /// Locally cloned from `Config::additional_settings`.
    /// NOTE(review): presumably (identifier, value) pairs -- confirm.
    pub additional_settings: Option<Vec<(u64, u64)>>,
    /// Starts out as `None` for both endpoints.
    /// NOTE(review): presumably the settings as seen on the wire, as
    /// (identifier, value) pairs -- confirm against the SETTINGS handling.
    pub raw: Option<Vec<(u64, u64)>>,
}
915
/// Bookkeeping for one endpoint's pair of QPACK streams.
///
/// `Connection` keeps one instance for the local endpoint and one for the
/// peer; the derived `Default` starts with no stream IDs and zero byte
/// counts.
#[derive(Default)]
struct QpackStreams {
    /// ID of the QPACK encoder stream, once known.
    pub encoder_stream_id: Option<u64>,
    /// Byte counter for the encoder stream.
    /// NOTE(review): direction (sent vs. received) is not visible here --
    /// confirm at the update sites.
    pub encoder_stream_bytes: u64,
    /// ID of the QPACK decoder stream, once known.
    pub decoder_stream_id: Option<u64>,
    /// Byte counter for the decoder stream (same caveat as above).
    pub decoder_stream_bytes: u64,
}
923
/// Statistics about the connection.
///
/// A connection's statistics can be collected using the [`stats()`] method.
/// The derived `Default` yields all counters set to zero.
///
/// [`stats()`]: struct.Connection.html#method.stats
#[derive(Clone, Default)]
pub struct Stats {
    /// The number of bytes received on the QPACK encoder stream.
    pub qpack_encoder_stream_recv_bytes: u64,
    /// The number of bytes received on the QPACK decoder stream.
    pub qpack_decoder_stream_recv_bytes: u64,
}
936
937fn close_conn_critical_stream<F: BufFactory>(
938    conn: &mut super::Connection<F>,
939) -> Result<()> {
940    conn.close(
941        true,
942        Error::ClosedCriticalStream.to_wire(),
943        b"Critical stream closed.",
944    )?;
945
946    Err(Error::ClosedCriticalStream)
947}
948
949fn close_conn_if_critical_stream_finished<F: BufFactory>(
950    conn: &mut super::Connection<F>, stream_id: u64,
951) -> Result<()> {
952    if conn.stream_finished(stream_id) {
953        close_conn_critical_stream(conn)?;
954    }
955
956    Ok(())
957}
958
/// An HTTP/3 connection.
pub struct Connection {
    /// Whether this endpoint is the server; copied from the QUIC connection
    /// in `with_transport()`.
    is_server: bool,

    /// Stream ID the next `send_request()` call will use; starts at 0 and
    /// advances by 4 once a request has been buffered.
    next_request_stream_id: u64,
    /// Next locally-initiated unidirectional stream ID; starts at 0x3 for
    /// servers and 0x2 for clients.
    next_uni_stream_id: u64,

    /// HTTP/3 state of the known streams, keyed by stream ID.
    streams: crate::stream::StreamIdHashMap<stream::Stream>,

    /// Settings of the local endpoint, taken from `Config`.
    local_settings: ConnectionSettings,
    /// Settings of the peer; all `None` until learned.
    peer_settings: ConnectionSettings,

    /// ID of the local control stream, once opened.
    control_stream_id: Option<u64>,
    /// ID of the peer's control stream, once known.
    peer_control_stream_id: Option<u64>,

    /// QPACK encoder used to build header blocks.
    qpack_encoder: qpack::Encoder,
    /// QPACK decoder.
    /// NOTE(review): presumably used for received header blocks -- usage is
    /// outside this chunk.
    qpack_decoder: qpack::Decoder,

    /// Bookkeeping for the locally-opened QPACK streams.
    local_qpack_streams: QpackStreams,
    /// Bookkeeping for the peer's QPACK streams.
    peer_qpack_streams: QpackStreams,

    /// Starts at 0.
    /// NOTE(review): presumably the push ID limit from MAX_PUSH_ID --
    /// confirm.
    max_push_id: u64,

    /// Queue of stream IDs.
    /// NOTE(review): presumably streams whose `Finished` event is still to
    /// be reported -- confirm against `poll()`.
    finished_streams: VecDeque<u64>,

    /// Set once GREASE frames have been sent by `send_headers()`, so they
    /// are only sent one time per connection.
    frames_greased: bool,

    /// NOTE(review): presumably the ID carried by the GOAWAY sent locally,
    /// if any -- confirm.
    local_goaway_id: Option<u64>,
    /// When `Some`, `send_request()` refuses to initiate new requests.
    /// NOTE(review): presumably the ID carried by the peer's GOAWAY --
    /// confirm.
    peer_goaway_id: Option<u64>,
}
989
990impl Connection {
991    fn new(
992        config: &Config, is_server: bool, enable_dgram: bool,
993    ) -> Result<Connection> {
994        let initial_uni_stream_id = if is_server { 0x3 } else { 0x2 };
995        let h3_datagram = if enable_dgram { Some(1) } else { None };
996
997        Ok(Connection {
998            is_server,
999
1000            next_request_stream_id: 0,
1001
1002            next_uni_stream_id: initial_uni_stream_id,
1003
1004            streams: Default::default(),
1005
1006            local_settings: ConnectionSettings {
1007                max_field_section_size: config.max_field_section_size,
1008                qpack_max_table_capacity: config.qpack_max_table_capacity,
1009                qpack_blocked_streams: config.qpack_blocked_streams,
1010                connect_protocol_enabled: config.connect_protocol_enabled,
1011                h3_datagram,
1012                additional_settings: config.additional_settings.clone(),
1013                raw: Default::default(),
1014            },
1015
1016            peer_settings: ConnectionSettings {
1017                max_field_section_size: None,
1018                qpack_max_table_capacity: None,
1019                qpack_blocked_streams: None,
1020                h3_datagram: None,
1021                connect_protocol_enabled: None,
1022                additional_settings: Default::default(),
1023                raw: Default::default(),
1024            },
1025
1026            control_stream_id: None,
1027            peer_control_stream_id: None,
1028
1029            qpack_encoder: qpack::Encoder::new(),
1030            qpack_decoder: qpack::Decoder::new(),
1031
1032            local_qpack_streams: Default::default(),
1033            peer_qpack_streams: Default::default(),
1034
1035            max_push_id: 0,
1036
1037            finished_streams: VecDeque::new(),
1038
1039            frames_greased: false,
1040
1041            local_goaway_id: None,
1042            peer_goaway_id: None,
1043        })
1044    }
1045
1046    /// Creates a new HTTP/3 connection using the provided QUIC connection.
1047    ///
1048    /// This will also initiate the HTTP/3 handshake with the peer by opening
1049    /// all control streams (including QPACK) and sending the local settings.
1050    ///
1051    /// On success the new connection is returned.
1052    ///
1053    /// The [`StreamLimit`] error is returned when the HTTP/3 control stream
1054    /// cannot be created due to stream limits.
1055    ///
1056    /// The [`InternalError`] error is returned when either the underlying QUIC
1057    /// connection is not in a suitable state, or the HTTP/3 control stream
1058    /// cannot be created due to flow control limits.
1059    ///
1060    /// [`StreamLimit`]: ../enum.Error.html#variant.StreamLimit
1061    /// [`InternalError`]: ../enum.Error.html#variant.InternalError
1062    pub fn with_transport<F: BufFactory>(
1063        conn: &mut super::Connection<F>, config: &Config,
1064    ) -> Result<Connection> {
1065        let is_client = !conn.is_server;
1066        if is_client && !(conn.is_established() || conn.is_in_early_data()) {
1067            trace!("{} QUIC connection must be established or in early data before creating an HTTP/3 connection", conn.trace_id());
1068            return Err(Error::InternalError);
1069        }
1070
1071        let mut http3_conn =
1072            Connection::new(config, conn.is_server, conn.dgram_enabled())?;
1073
1074        match http3_conn.send_settings(conn) {
1075            Ok(_) => (),
1076
1077            Err(e) => {
1078                conn.close(true, e.to_wire(), b"Error opening control stream")?;
1079                return Err(e);
1080            },
1081        };
1082
1083        // Try opening QPACK streams, but ignore errors if it fails since we
1084        // don't need them right now.
1085        http3_conn.open_qpack_encoder_stream(conn).ok();
1086        http3_conn.open_qpack_decoder_stream(conn).ok();
1087
1088        if conn.grease {
1089            // Try opening a GREASE stream, but ignore errors since it's not
1090            // critical.
1091            http3_conn.open_grease_stream(conn).ok();
1092        }
1093
1094        Ok(http3_conn)
1095    }
1096
1097    /// Sends an HTTP/3 request.
1098    ///
1099    /// The request is encoded from the provided list of headers without a
1100    /// body, and sent on a newly allocated stream. To include a body,
1101    /// set `fin` as `false` and subsequently call [`send_body()`] with the
1102    /// same `conn` and the `stream_id` returned from this method.
1103    ///
1104    /// On success the newly allocated stream ID is returned.
1105    ///
1106    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1107    /// doesn't have enough capacity for the operation to complete. When this
1108    /// happens the application should retry the operation once the stream is
1109    /// reported as writable again.
1110    ///
1111    /// [`send_body()`]: struct.Connection.html#method.send_body
1112    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1113    pub fn send_request<T: NameValue, F: BufFactory>(
1114        &mut self, conn: &mut super::Connection<F>, headers: &[T], fin: bool,
1115    ) -> Result<u64> {
1116        // If we received a GOAWAY from the peer, MUST NOT initiate new
1117        // requests.
1118        if self.peer_goaway_id.is_some() {
1119            return Err(Error::FrameUnexpected);
1120        }
1121
1122        let stream_id = self.next_request_stream_id;
1123
1124        self.streams
1125            .insert(stream_id, <stream::Stream>::new(stream_id, true));
1126
1127        // The underlying QUIC stream does not exist yet, so calls to e.g.
1128        // stream_capacity() will fail. By writing a 0-length buffer, we force
1129        // the creation of the QUIC stream state, without actually writing
1130        // anything.
1131        if let Err(e) = conn.stream_send(stream_id, b"", false) {
1132            self.streams.remove(&stream_id);
1133
1134            if e == super::Error::Done {
1135                return Err(Error::StreamBlocked);
1136            }
1137
1138            return Err(e.into());
1139        };
1140
1141        self.send_headers(conn, stream_id, headers, fin)?;
1142
1143        // To avoid skipping stream IDs, we only calculate the next available
1144        // stream ID when a request has been successfully buffered.
1145        self.next_request_stream_id = self
1146            .next_request_stream_id
1147            .checked_add(4)
1148            .ok_or(Error::IdError)?;
1149
1150        Ok(stream_id)
1151    }
1152
1153    /// Sends an HTTP/3 response on the specified stream with default priority.
1154    ///
1155    /// This method sends the provided `headers` as a single initial response
1156    /// without a body.
1157    ///
1158    /// To send a non-final 1xx, then a final 200+ without body:
1159    ///   * send_response() with `fin` set to `false`.
1160    ///   * [`send_additional_headers()`] with fin set to `true` using the same
1161    ///     `stream_id` value.
1162    ///
1163    /// To send a non-final 1xx, then a final 200+ with body:
1164    ///   * send_response() with `fin` set to `false`.
1165    ///   * [`send_additional_headers()`] with fin set to `false` and same
1166    ///     `stream_id` value.
1167    ///   * [`send_body()`] with same `stream_id`.
1168    ///
1169    /// To send a final 200+ with body:
1170    ///   * send_response() with `fin` set to `false`.
1171    ///   * [`send_body()`] with same `stream_id`.
1172    ///
1173    /// Additional headers can only be sent during certain phases of an HTTP/3
1174    /// message exchange, see [Section 4.1 of RFC 9114]. The [`FrameUnexpected`]
1175    /// error is returned if this method, or [`send_response_with_priority()`],
1176    /// are called multiple times with the same `stream_id` value.
1177    ///
1178    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1179    /// doesn't have enough capacity for the operation to complete. When this
1180    /// happens the application should retry the operation once the stream is
1181    /// reported as writable again.
1182    ///
1183    /// [`send_body()`]: struct.Connection.html#method.send_body
1184    /// [`send_additional_headers()`]:
1185    ///     struct.Connection.html#method.send_additional_headers
1186    /// [`send_response_with_priority()`]:
1187    ///     struct.Connection.html#method.send_response_with_priority
1188    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
1189    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1190    pub fn send_response<T: NameValue, F: BufFactory>(
1191        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1192        headers: &[T], fin: bool,
1193    ) -> Result<()> {
1194        let priority = Default::default();
1195
1196        self.send_response_with_priority(
1197            conn, stream_id, headers, &priority, fin,
1198        )?;
1199
1200        Ok(())
1201    }
1202
1203    /// Sends an HTTP/3 response on the specified stream with specified
1204    /// priority.
1205    ///
1206    /// This method sends the provided `headers` as a single initial response
1207    /// without a body.
1208    ///
1209    /// To send a non-final 1xx, then a final 200+ without body:
1210    ///   * send_response_with_priority() with `fin` set to `false`.
1211    ///   * [`send_additional_headers()`] with fin set to `true` using the same
1212    ///     `stream_id` value.
1213    ///
1214    /// To send a non-final 1xx, then a final 200+ with body:
1215    ///   * send_response_with_priority() with `fin` set to `false`.
1216    ///   * [`send_additional_headers()`] with fin set to `false` and same
1217    ///     `stream_id` value.
1218    ///   * [`send_body()`] with same `stream_id`.
1219    ///
1220    /// To send a final 200+ with body:
1221    ///   * send_response_with_priority() with `fin` set to `false`.
1222    ///   * [`send_body()`] with same `stream_id`.
1223    ///
1224    /// The `priority` parameter represents [Extensible Priority]
1225    /// parameters. If the urgency is outside the range 0-7, it will be clamped
1226    /// to 7.
1227    ///
1228    /// Additional headers can only be sent during certain phases of an HTTP/3
1229    /// message exchange, see [Section 4.1 of RFC 9114]. The [`FrameUnexpected`]
1230    /// error is returned if this method, or [`send_response()`],
1231    /// are called multiple times with the same `stream_id` value.
1232    ///
1233    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1234    /// doesn't have enough capacity for the operation to complete. When this
1235    /// happens the application should retry the operation once the stream is
1236    /// reported as writable again.
1237    ///
1238    /// [`send_body()`]: struct.Connection.html#method.send_body
1239    /// [`send_additional_headers()`]:
1240    ///     struct.Connection.html#method.send_additional_headers
1241    /// [`send_response()`]:
1242    ///     struct.Connection.html#method.send_response
1243    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
1244    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1245    /// [Extensible Priority]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
1246    pub fn send_response_with_priority<T: NameValue, F: BufFactory>(
1247        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1248        headers: &[T], priority: &Priority, fin: bool,
1249    ) -> Result<()> {
1250        match self.streams.get(&stream_id) {
1251            Some(s) => {
1252                // Only one initial HEADERS allowed.
1253                if s.local_initialized() {
1254                    return Err(Error::FrameUnexpected);
1255                }
1256
1257                s
1258            },
1259
1260            None => return Err(Error::FrameUnexpected),
1261        };
1262
1263        self.send_headers(conn, stream_id, headers, fin)?;
1264
1265        // Clamp and shift urgency into quiche-priority space
1266        let urgency = priority
1267            .urgency
1268            .clamp(PRIORITY_URGENCY_LOWER_BOUND, PRIORITY_URGENCY_UPPER_BOUND) +
1269            PRIORITY_URGENCY_OFFSET;
1270
1271        conn.stream_priority(stream_id, urgency, priority.incremental)?;
1272
1273        Ok(())
1274    }
1275
1276    /// Sends additional HTTP/3 headers.
1277    ///
1278    /// After the initial request or response headers have been sent, using
1279    /// [`send_request()`] or [`send_response()`] respectively, this method can
1280    /// be used send an additional HEADERS frame. For example, to send a single
1281    /// instance of trailers after a request with a body, or to issue another
1282    /// non-final 1xx after a preceding 1xx, or to issue a final response after
1283    /// a preceding 1xx.
1284    ///
1285    /// Additional headers can only be sent during certain phases of an HTTP/3
1286    /// message exchange, see [Section 4.1 of RFC 9114]. The [`FrameUnexpected`]
1287    /// error is returned when this method is called during the wrong phase,
1288    /// such as before initial headers have been sent, or if trailers have
1289    /// already been sent.
1290    ///
1291    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1292    /// doesn't have enough capacity for the operation to complete. When this
1293    /// happens the application should retry the operation once the stream is
1294    /// reported as writable again.
1295    ///
1296    /// [`send_request()`]: struct.Connection.html#method.send_request
1297    /// [`send_response()`]: struct.Connection.html#method.send_response
1298    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1299    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
1300    /// [Section 4.1 of RFC 9114]:
1301    ///     https://www.rfc-editor.org/rfc/rfc9114.html#section-4.1.
1302    pub fn send_additional_headers<T: NameValue, F: BufFactory>(
1303        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1304        headers: &[T], is_trailer_section: bool, fin: bool,
1305    ) -> Result<()> {
1306        // Clients can only send trailer headers.
1307        if !self.is_server && !is_trailer_section {
1308            return Err(Error::FrameUnexpected);
1309        }
1310
1311        match self.streams.get(&stream_id) {
1312            Some(s) => {
1313                // Initial HEADERS must have been sent.
1314                if !s.local_initialized() {
1315                    return Err(Error::FrameUnexpected);
1316                }
1317
1318                // Only one trailing HEADERS allowed.
1319                if s.trailers_sent() {
1320                    return Err(Error::FrameUnexpected);
1321                }
1322
1323                s
1324            },
1325
1326            None => return Err(Error::FrameUnexpected),
1327        };
1328
1329        self.send_headers(conn, stream_id, headers, fin)?;
1330
1331        if is_trailer_section {
1332            // send_headers() might have tidied the stream away, so we need to
1333            // check again.
1334            if let Some(s) = self.streams.get_mut(&stream_id) {
1335                s.mark_trailers_sent();
1336            }
1337        }
1338
1339        Ok(())
1340    }
1341
1342    /// Sends additional HTTP/3 headers with specified priority.
1343    ///
1344    /// After the initial request or response headers have been sent, using
1345    /// [`send_request()`] or [`send_response()`] respectively, this method can
1346    /// be used send an additional HEADERS frame. For example, to send a single
1347    /// instance of trailers after a request with a body, or to issue another
1348    /// non-final 1xx after a preceding 1xx, or to issue a final response after
1349    /// a preceding 1xx.
1350    ///
1351    /// The `priority` parameter represents [Extensible Priority]
1352    /// parameters. If the urgency is outside the range 0-7, it will be clamped
1353    /// to 7.
1354    ///
1355    /// Additional headers can only be sent during certain phases of an HTTP/3
1356    /// message exchange, see [Section 4.1 of RFC 9114]. The [`FrameUnexpected`]
1357    /// error is returned when this method is called during the wrong phase,
1358    /// such as before initial headers have been sent, or if trailers have
1359    /// already been sent.
1360    ///
1361    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1362    /// doesn't have enough capacity for the operation to complete. When this
1363    /// happens the application should retry the operation once the stream is
1364    /// reported as writable again.
1365    ///
1366    /// [`send_request()`]: struct.Connection.html#method.send_request
1367    /// [`send_response()`]: struct.Connection.html#method.send_response
1368    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1369    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
1370    /// [Section 4.1 of RFC 9114]:
1371    ///     https://www.rfc-editor.org/rfc/rfc9114.html#section-4.1.
1372    /// [Extensible Priority]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
1373    pub fn send_additional_headers_with_priority<T: NameValue, F: BufFactory>(
1374        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1375        headers: &[T], priority: &Priority, is_trailer_section: bool, fin: bool,
1376    ) -> Result<()> {
1377        self.send_additional_headers(
1378            conn,
1379            stream_id,
1380            headers,
1381            is_trailer_section,
1382            fin,
1383        )?;
1384
1385        // Clamp and shift urgency into quiche-priority space
1386        let urgency = priority
1387            .urgency
1388            .clamp(PRIORITY_URGENCY_LOWER_BOUND, PRIORITY_URGENCY_UPPER_BOUND) +
1389            PRIORITY_URGENCY_OFFSET;
1390
1391        conn.stream_priority(stream_id, urgency, priority.incremental)?;
1392
1393        Ok(())
1394    }
1395
1396    fn encode_header_block<T: NameValue>(
1397        &mut self, headers: &[T],
1398    ) -> Result<Vec<u8>> {
1399        let headers_len = headers
1400            .iter()
1401            .fold(0, |acc, h| acc + h.value().len() + h.name().len() + 32);
1402
1403        let mut header_block = vec![0; headers_len];
1404        let len = self
1405            .qpack_encoder
1406            .encode(headers, &mut header_block)
1407            .map_err(|_| Error::InternalError)?;
1408
1409        header_block.truncate(len);
1410
1411        Ok(header_block)
1412    }
1413
1414    fn send_headers<T: NameValue, F: BufFactory>(
1415        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1416        headers: &[T], fin: bool,
1417    ) -> Result<()> {
1418        let mut d = [42; 10];
1419        let mut b = octets::OctetsMut::with_slice(&mut d);
1420
1421        if !self.frames_greased && conn.grease {
1422            self.send_grease_frames(conn, stream_id)?;
1423            self.frames_greased = true;
1424        }
1425
1426        let header_block = self.encode_header_block(headers)?;
1427
1428        let overhead = octets::varint_len(frame::HEADERS_FRAME_TYPE_ID) +
1429            octets::varint_len(header_block.len() as u64);
1430
1431        // Headers need to be sent atomically, so make sure the stream has
1432        // enough capacity.
1433        match conn.stream_writable(stream_id, overhead + header_block.len()) {
1434            Ok(true) => (),
1435
1436            Ok(false) => return Err(Error::StreamBlocked),
1437
1438            Err(e) => {
1439                if conn.stream_finished(stream_id) {
1440                    self.streams.remove(&stream_id);
1441                }
1442
1443                return Err(e.into());
1444            },
1445        };
1446
1447        b.put_varint(frame::HEADERS_FRAME_TYPE_ID)?;
1448        b.put_varint(header_block.len() as u64)?;
1449        let off = b.off();
1450        conn.stream_send(stream_id, &d[..off], false)?;
1451
1452        // Sending header block separately avoids unnecessary copy.
1453        conn.stream_send(stream_id, &header_block, fin)?;
1454
1455        trace!(
1456            "{} tx frm HEADERS stream={} len={} fin={}",
1457            conn.trace_id(),
1458            stream_id,
1459            header_block.len(),
1460            fin
1461        );
1462
1463        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1464            let qlog_headers = headers
1465                .iter()
1466                .map(|h| qlog::events::h3::HttpHeader {
1467                    name: String::from_utf8_lossy(h.name()).into_owned(),
1468                    value: String::from_utf8_lossy(h.value()).into_owned(),
1469                })
1470                .collect();
1471
1472            let frame = Http3Frame::Headers {
1473                headers: qlog_headers,
1474            };
1475            let ev_data = EventData::H3FrameCreated(H3FrameCreated {
1476                stream_id,
1477                length: Some(header_block.len() as u64),
1478                frame,
1479                ..Default::default()
1480            });
1481
1482            q.add_event_data_now(ev_data).ok();
1483        });
1484
1485        if let Some(s) = self.streams.get_mut(&stream_id) {
1486            s.initialize_local();
1487        }
1488
1489        if fin && conn.stream_finished(stream_id) {
1490            self.streams.remove(&stream_id);
1491        }
1492
1493        Ok(())
1494    }
1495
1496    /// Sends an HTTP/3 body chunk on the given stream.
1497    ///
1498    /// On success the number of bytes written is returned, or [`Done`] if no
1499    /// bytes could be written (e.g. because the stream is blocked).
1500    ///
1501    /// Note that the number of written bytes returned can be lower than the
1502    /// length of the input buffer when the underlying QUIC stream doesn't have
1503    /// enough capacity for the operation to complete.
1504    ///
1505    /// When a partial write happens (including when [`Done`] is returned) the
1506    /// application should retry the operation once the stream is reported as
1507    /// writable again.
1508    ///
1509    /// [`Done`]: enum.Error.html#variant.Done
1510    pub fn send_body<F: BufFactory>(
1511        &mut self, conn: &mut super::Connection<F>, stream_id: u64, body: &[u8],
1512        fin: bool,
1513    ) -> Result<usize> {
1514        self.do_send_body(
1515            conn,
1516            stream_id,
1517            body,
1518            fin,
1519            |conn: &mut super::Connection<F>,
1520             header: &[u8],
1521             stream_id: u64,
1522             body: &[u8],
1523             body_len: usize,
1524             fin: bool| {
1525                conn.stream_send(stream_id, header, false)?;
1526                Ok(conn
1527                    .stream_send(stream_id, &body[..body_len], fin)
1528                    .map(|v| (v, v))?)
1529            },
1530        )
1531    }
1532
1533    /// Sends an HTTP/3 body chunk provided as a raw buffer on the given stream.
1534    ///
1535    /// If the capacity allows it the buffer will be appended to the stream's
1536    /// send queue with zero copying.
1537    ///
1538    /// On success the number of bytes written is returned, or [`Done`] if no
1539    /// bytes could be written (e.g. because the stream is blocked).
1540    ///
1541    /// Note that the number of written bytes returned can be lower than the
1542    /// length of the input buffer when the underlying QUIC stream doesn't have
1543    /// enough capacity for the operation to complete.
1544    ///
1545    /// When a partial write happens (including when [`Done`] is returned) the
1546    /// remaining (unwrittent) buffer will also be returned. The application
1547    /// should retry the operation once the stream is reported as writable
1548    /// again.
1549    ///
1550    /// [`Done`]: enum.Error.html#variant.Done
1551    pub fn send_body_zc<F>(
1552        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1553        body: &mut F::Buf, fin: bool,
1554    ) -> Result<usize>
1555    where
1556        F: BufFactory,
1557        F::Buf: BufSplit,
1558    {
1559        self.do_send_body(
1560            conn,
1561            stream_id,
1562            body,
1563            fin,
1564            |conn: &mut super::Connection<F>,
1565             header: &[u8],
1566             stream_id: u64,
1567             body: &mut F::Buf,
1568             mut body_len: usize,
1569             fin: bool| {
1570                let with_prefix = body.try_add_prefix(header);
1571                if !with_prefix {
1572                    conn.stream_send(stream_id, header, false)?;
1573                } else {
1574                    body_len += header.len();
1575                }
1576
1577                let (mut n, rem) = conn.stream_send_zc(
1578                    stream_id,
1579                    body.clone(),
1580                    Some(body_len),
1581                    fin,
1582                )?;
1583
1584                if with_prefix {
1585                    n -= header.len();
1586                }
1587
1588                if let Some(rem) = rem {
1589                    let _ = std::mem::replace(body, rem);
1590                }
1591
1592                Ok((n, n))
1593            },
1594        )
1595    }
1596
1597    fn do_send_body<F, B, R, SND>(
1598        &mut self, conn: &mut super::Connection<F>, stream_id: u64, body: B,
1599        fin: bool, write_fn: SND,
1600    ) -> Result<R>
1601    where
1602        F: BufFactory,
1603        B: AsRef<[u8]>,
1604        SND: FnOnce(
1605            &mut super::Connection<F>,
1606            &[u8],
1607            u64,
1608            B,
1609            usize,
1610            bool,
1611        ) -> Result<(usize, R)>,
1612    {
1613        let mut d = [42; 10];
1614        let mut b = octets::OctetsMut::with_slice(&mut d);
1615
1616        let len = body.as_ref().len();
1617
1618        // Validate that it is sane to send data on the stream.
1619        if stream_id % 4 != 0 {
1620            return Err(Error::FrameUnexpected);
1621        }
1622
1623        match self.streams.get_mut(&stream_id) {
1624            Some(s) => {
1625                if !s.local_initialized() {
1626                    return Err(Error::FrameUnexpected);
1627                }
1628
1629                if s.trailers_sent() {
1630                    return Err(Error::FrameUnexpected);
1631                }
1632            },
1633
1634            None => {
1635                return Err(Error::FrameUnexpected);
1636            },
1637        };
1638
1639        // Avoid sending 0-length DATA frames when the fin flag is false.
1640        if len == 0 && !fin {
1641            return Err(Error::Done);
1642        }
1643
1644        let overhead = octets::varint_len(frame::DATA_FRAME_TYPE_ID) +
1645            octets::varint_len(len as u64);
1646
1647        let stream_cap = match conn.stream_capacity(stream_id) {
1648            Ok(v) => v,
1649
1650            Err(e) => {
1651                if conn.stream_finished(stream_id) {
1652                    self.streams.remove(&stream_id);
1653                }
1654
1655                return Err(e.into());
1656            },
1657        };
1658
1659        // Make sure there is enough capacity to send the DATA frame header.
1660        if stream_cap < overhead {
1661            let _ = conn.stream_writable(stream_id, overhead + 1);
1662            return Err(Error::Done);
1663        }
1664
1665        // Cap the frame payload length to the stream's capacity.
1666        let body_len = std::cmp::min(len, stream_cap - overhead);
1667
1668        // If we can't send the entire body, set the fin flag to false so the
1669        // application can try again later.
1670        let fin = if body_len != len { false } else { fin };
1671
1672        // Again, avoid sending 0-length DATA frames when the fin flag is false.
1673        if body_len == 0 && !fin {
1674            let _ = conn.stream_writable(stream_id, overhead + 1);
1675            return Err(Error::Done);
1676        }
1677
1678        b.put_varint(frame::DATA_FRAME_TYPE_ID)?;
1679        b.put_varint(body_len as u64)?;
1680        let off = b.off();
1681
1682        // Return how many bytes were written, excluding the frame header.
1683        // Sending body separately avoids unnecessary copy.
1684        let (written, ret) =
1685            write_fn(conn, &d[..off], stream_id, body, body_len, fin)?;
1686
1687        trace!(
1688            "{} tx frm DATA stream={} len={} fin={}",
1689            conn.trace_id(),
1690            stream_id,
1691            written,
1692            fin
1693        );
1694
1695        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1696            let frame = Http3Frame::Data { raw: None };
1697            let ev_data = EventData::H3FrameCreated(H3FrameCreated {
1698                stream_id,
1699                length: Some(written as u64),
1700                frame,
1701                ..Default::default()
1702            });
1703
1704            q.add_event_data_now(ev_data).ok();
1705        });
1706
1707        if written < len {
1708            // Ensure the peer is notified that the connection or stream is
1709            // blocked when the stream's capacity is limited by flow control.
1710            //
1711            // We only need enough capacity to send a few bytes, to make sure
1712            // the stream doesn't hang due to congestion window not growing
1713            // enough.
1714            let _ = conn.stream_writable(stream_id, overhead + 1);
1715        }
1716
1717        if fin && written == len && conn.stream_finished(stream_id) {
1718            self.streams.remove(&stream_id);
1719        }
1720
1721        Ok(ret)
1722    }
1723
1724    /// Returns whether the peer enabled HTTP/3 DATAGRAM frame support.
1725    ///
1726    /// Support is signalled by the peer's SETTINGS, so this method always
1727    /// returns false until they have been processed using the [`poll()`]
1728    /// method.
1729    ///
1730    /// [`poll()`]: struct.Connection.html#method.poll
1731    pub fn dgram_enabled_by_peer<F: BufFactory>(
1732        &self, conn: &super::Connection<F>,
1733    ) -> bool {
1734        self.peer_settings.h3_datagram == Some(1) &&
1735            conn.dgram_max_writable_len().is_some()
1736    }
1737
1738    /// Returns whether the peer enabled extended CONNECT support.
1739    ///
1740    /// Support is signalled by the peer's SETTINGS, so this method always
1741    /// returns false until they have been processed using the [`poll()`]
1742    /// method.
1743    ///
1744    /// [`poll()`]: struct.Connection.html#method.poll
1745    pub fn extended_connect_enabled_by_peer(&self) -> bool {
1746        self.peer_settings.connect_protocol_enabled == Some(1)
1747    }
1748
1749    /// Reads request or response body data into the provided buffer.
1750    ///
1751    /// Applications should call this method whenever the [`poll()`] method
1752    /// returns a [`Data`] event.
1753    ///
1754    /// On success the amount of bytes read is returned, or [`Done`] if there
1755    /// is no data to read.
1756    ///
1757    /// [`poll()`]: struct.Connection.html#method.poll
1758    /// [`Data`]: enum.Event.html#variant.Data
1759    /// [`Done`]: enum.Error.html#variant.Done
1760    pub fn recv_body<F: BufFactory>(
1761        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1762        out: &mut [u8],
1763    ) -> Result<usize> {
1764        let mut total = 0;
1765
1766        // Try to consume all buffered data for the stream, even across multiple
1767        // DATA frames.
1768        while total < out.len() {
1769            let stream = self.streams.get_mut(&stream_id).ok_or(Error::Done)?;
1770
1771            if stream.state() != stream::State::Data {
1772                break;
1773            }
1774
1775            let (read, fin) =
1776                match stream.try_consume_data(conn, &mut out[total..]) {
1777                    Ok(v) => v,
1778
1779                    Err(Error::Done) => break,
1780
1781                    Err(e) => return Err(e),
1782                };
1783
1784            total += read;
1785
1786            // No more data to read, we are done.
1787            if read == 0 || fin {
1788                break;
1789            }
1790
1791            // Process incoming data from the stream. For example, if a whole
1792            // DATA frame was consumed, and another one is queued behind it,
1793            // this will ensure the additional data will also be returned to
1794            // the application.
1795            match self.process_readable_stream(conn, stream_id, false) {
1796                Ok(_) => unreachable!(),
1797
1798                Err(Error::Done) => (),
1799
1800                Err(e) => return Err(e),
1801            };
1802
1803            if conn.stream_finished(stream_id) {
1804                break;
1805            }
1806        }
1807
1808        // While body is being received, the stream is marked as finished only
1809        // when all data is read by the application.
1810        if conn.stream_finished(stream_id) {
1811            self.process_finished_stream(stream_id);
1812        }
1813
1814        if total == 0 {
1815            return Err(Error::Done);
1816        }
1817
1818        Ok(total)
1819    }
1820
1821    /// Sends a PRIORITY_UPDATE frame on the control stream with specified
1822    /// request stream ID and priority.
1823    ///
1824    /// The `priority` parameter represents [Extensible Priority]
1825    /// parameters. If the urgency is outside the range 0-7, it will be clamped
1826    /// to 7.
1827    ///
1828    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1829    /// doesn't have enough capacity for the operation to complete. When this
1830    /// happens the application should retry the operation once the stream is
1831    /// reported as writable again.
1832    ///
1833    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1834    /// [Extensible Priority]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
1835    pub fn send_priority_update_for_request<F: BufFactory>(
1836        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1837        priority: &Priority,
1838    ) -> Result<()> {
1839        let mut d = [42; 20];
1840        let mut b = octets::OctetsMut::with_slice(&mut d);
1841
1842        // Validate that it is sane to send PRIORITY_UPDATE.
1843        if self.is_server {
1844            return Err(Error::FrameUnexpected);
1845        }
1846
1847        if stream_id % 4 != 0 {
1848            return Err(Error::FrameUnexpected);
1849        }
1850
1851        let control_stream_id =
1852            self.control_stream_id.ok_or(Error::FrameUnexpected)?;
1853
1854        let urgency = priority
1855            .urgency
1856            .clamp(PRIORITY_URGENCY_LOWER_BOUND, PRIORITY_URGENCY_UPPER_BOUND);
1857
1858        let mut field_value = format!("u={urgency}");
1859
1860        if priority.incremental {
1861            field_value.push_str(",i");
1862        }
1863
1864        let priority_field_value = field_value.as_bytes();
1865        let frame_payload_len =
1866            octets::varint_len(stream_id) + priority_field_value.len();
1867
1868        let overhead =
1869            octets::varint_len(frame::PRIORITY_UPDATE_FRAME_REQUEST_TYPE_ID) +
1870                octets::varint_len(stream_id) +
1871                octets::varint_len(frame_payload_len as u64);
1872
1873        // Make sure the control stream has enough capacity.
1874        match conn.stream_writable(
1875            control_stream_id,
1876            overhead + priority_field_value.len(),
1877        ) {
1878            Ok(true) => (),
1879
1880            Ok(false) => return Err(Error::StreamBlocked),
1881
1882            Err(e) => {
1883                return Err(e.into());
1884            },
1885        }
1886
1887        b.put_varint(frame::PRIORITY_UPDATE_FRAME_REQUEST_TYPE_ID)?;
1888        b.put_varint(frame_payload_len as u64)?;
1889        b.put_varint(stream_id)?;
1890        let off = b.off();
1891        conn.stream_send(control_stream_id, &d[..off], false)?;
1892
1893        // Sending field value separately avoids unnecessary copy.
1894        conn.stream_send(control_stream_id, priority_field_value, false)?;
1895
1896        trace!(
1897            "{} tx frm PRIORITY_UPDATE request_stream={} priority_field_value={}",
1898            conn.trace_id(),
1899            stream_id,
1900            field_value,
1901        );
1902
1903        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1904            let frame = Http3Frame::PriorityUpdate {
1905                target_stream_type: H3PriorityTargetStreamType::Request,
1906                prioritized_element_id: stream_id,
1907                priority_field_value: field_value.clone(),
1908            };
1909
1910            let ev_data = EventData::H3FrameCreated(H3FrameCreated {
1911                stream_id,
1912                length: Some(priority_field_value.len() as u64),
1913                frame,
1914                ..Default::default()
1915            });
1916
1917            q.add_event_data_now(ev_data).ok();
1918        });
1919
1920        Ok(())
1921    }
1922
1923    /// Take the last PRIORITY_UPDATE for a prioritized element ID.
1924    ///
1925    /// When the [`poll()`] method returns a [`PriorityUpdate`] event for a
1926    /// prioritized element, the event has triggered and will not rearm until
1927    /// applications call this method. It is recommended that applications defer
1928    /// taking the PRIORITY_UPDATE until after [`poll()`] returns [`Done`].
1929    ///
1930    /// On success the Priority Field Value is returned, or [`Done`] if there is
1931    /// no PRIORITY_UPDATE to read (either because there is no value to take, or
1932    /// because the prioritized element does not exist).
1933    ///
1934    /// [`poll()`]: struct.Connection.html#method.poll
1935    /// [`PriorityUpdate`]: enum.Event.html#variant.PriorityUpdate
1936    /// [`Done`]: enum.Error.html#variant.Done
1937    pub fn take_last_priority_update(
1938        &mut self, prioritized_element_id: u64,
1939    ) -> Result<Vec<u8>> {
1940        if let Some(stream) = self.streams.get_mut(&prioritized_element_id) {
1941            return stream.take_last_priority_update().ok_or(Error::Done);
1942        }
1943
1944        Err(Error::Done)
1945    }
1946
1947    /// Processes HTTP/3 data received from the peer.
1948    ///
1949    /// On success it returns an [`Event`] and an ID, or [`Done`] when there are
1950    /// no events to report.
1951    ///
1952    /// Note that all events are edge-triggered, meaning that once reported they
1953    /// will not be reported again by calling this method again, until the event
1954    /// is re-armed.
1955    ///
1956    /// The events [`Headers`], [`Data`] and [`Finished`] return a stream ID,
1957    /// which is used in methods [`recv_body()`], [`send_response()`] or
1958    /// [`send_body()`].
1959    ///
1960    /// The event [`GoAway`] returns an ID that depends on the connection role.
1961    /// A client receives the largest processed stream ID. A server receives the
1962    /// the largest permitted push ID.
1963    ///
1964    /// The event [`PriorityUpdate`] only occurs at servers. It returns a
1965    /// prioritized element ID that is used in the method
1966    /// [`take_last_priority_update()`], which rearms the event for that ID.
1967    ///
1968    /// If an error occurs while processing data, the connection is closed with
1969    /// the appropriate error code, using the transport's [`close()`] method.
1970    ///
1971    /// [`Event`]: enum.Event.html
1972    /// [`Done`]: enum.Error.html#variant.Done
1973    /// [`Headers`]: enum.Event.html#variant.Headers
1974    /// [`Data`]: enum.Event.html#variant.Data
1975    /// [`Finished`]: enum.Event.html#variant.Finished
1976    /// [`GoAway`]: enum.Event.html#variant.GoAWay
1977    /// [`PriorityUpdate`]: enum.Event.html#variant.PriorityUpdate
1978    /// [`recv_body()`]: struct.Connection.html#method.recv_body
1979    /// [`send_response()`]: struct.Connection.html#method.send_response
1980    /// [`send_body()`]: struct.Connection.html#method.send_body
1981    /// [`recv_dgram()`]: struct.Connection.html#method.recv_dgram
1982    /// [`take_last_priority_update()`]: struct.Connection.html#method.take_last_priority_update
1983    /// [`close()`]: ../struct.Connection.html#method.close
1984    pub fn poll<F: BufFactory>(
1985        &mut self, conn: &mut super::Connection<F>,
1986    ) -> Result<(u64, Event)> {
1987        // When connection close is initiated by the local application (e.g. due
1988        // to a protocol error), the connection itself might be in a broken
1989        // state, so return early.
1990        if conn.local_error.is_some() {
1991            return Err(Error::Done);
1992        }
1993
1994        // Process control streams first.
1995        if let Some(stream_id) = self.peer_control_stream_id {
1996            match self.process_control_stream(conn, stream_id) {
1997                Ok(ev) => return Ok(ev),
1998
1999                Err(Error::Done) => (),
2000
2001                Err(e) => return Err(e),
2002            };
2003        }
2004
2005        if let Some(stream_id) = self.peer_qpack_streams.encoder_stream_id {
2006            match self.process_control_stream(conn, stream_id) {
2007                Ok(ev) => return Ok(ev),
2008
2009                Err(Error::Done) => (),
2010
2011                Err(e) => return Err(e),
2012            };
2013        }
2014
2015        if let Some(stream_id) = self.peer_qpack_streams.decoder_stream_id {
2016            match self.process_control_stream(conn, stream_id) {
2017                Ok(ev) => return Ok(ev),
2018
2019                Err(Error::Done) => (),
2020
2021                Err(e) => return Err(e),
2022            };
2023        }
2024
2025        // Process finished streams list.
2026        if let Some(finished) = self.finished_streams.pop_front() {
2027            return Ok((finished, Event::Finished));
2028        }
2029
2030        // Process HTTP/3 data from readable streams.
2031        for s in conn.readable() {
2032            trace!("{} stream id {} is readable", conn.trace_id(), s);
2033
2034            let ev = match self.process_readable_stream(conn, s, true) {
2035                Ok(v) => Some(v),
2036
2037                Err(Error::Done) => None,
2038
2039                // Return early if the stream was reset, to avoid returning
2040                // a Finished event later as well.
2041                Err(Error::TransportError(crate::Error::StreamReset(e))) =>
2042                    return Ok((s, Event::Reset(e))),
2043
2044                Err(e) => return Err(e),
2045            };
2046
2047            if conn.stream_finished(s) {
2048                self.process_finished_stream(s);
2049            }
2050
2051            // TODO: check if stream is completed so it can be freed
2052            if let Some(ev) = ev {
2053                return Ok(ev);
2054            }
2055        }
2056
2057        // Process finished streams list once again, to make sure `Finished`
2058        // events are returned when receiving empty stream frames with the fin
2059        // flag set.
2060        if let Some(finished) = self.finished_streams.pop_front() {
2061            if conn.stream_readable(finished) {
2062                // The stream is finished, but is still readable, it may
2063                // indicate that there is a pending error, such as reset.
2064                if let Err(crate::Error::StreamReset(e)) =
2065                    conn.stream_recv(finished, &mut [])
2066                {
2067                    return Ok((finished, Event::Reset(e)));
2068                }
2069            }
2070            return Ok((finished, Event::Finished));
2071        }
2072
2073        Err(Error::Done)
2074    }
2075
2076    /// Sends a GOAWAY frame to initiate graceful connection closure.
2077    ///
2078    /// When quiche is used in the server role, the `id` parameter is the stream
2079    /// ID of the highest processed request. This can be any valid ID between 0
2080    /// and 2^62-4. However, the ID cannot be increased. Failure to satisfy
2081    /// these conditions will return an error.
2082    ///
2083    /// This method does not close the QUIC connection. Applications are
2084    /// required to call [`close()`] themselves.
2085    ///
2086    /// [`close()`]: ../struct.Connection.html#method.close
2087    pub fn send_goaway<F: BufFactory>(
2088        &mut self, conn: &mut super::Connection<F>, id: u64,
2089    ) -> Result<()> {
2090        let mut id = id;
2091
2092        // TODO: server push
2093        //
2094        // In the meantime always send 0 from client.
2095        if !self.is_server {
2096            id = 0;
2097        }
2098
2099        if self.is_server && id % 4 != 0 {
2100            return Err(Error::IdError);
2101        }
2102
2103        if let Some(sent_id) = self.local_goaway_id {
2104            if id > sent_id {
2105                return Err(Error::IdError);
2106            }
2107        }
2108
2109        if let Some(stream_id) = self.control_stream_id {
2110            let mut d = [42; 10];
2111            let mut b = octets::OctetsMut::with_slice(&mut d);
2112
2113            let frame = frame::Frame::GoAway { id };
2114
2115            let wire_len = frame.to_bytes(&mut b)?;
2116            let stream_cap = conn.stream_capacity(stream_id)?;
2117
2118            if stream_cap < wire_len {
2119                return Err(Error::StreamBlocked);
2120            }
2121
2122            trace!("{} tx frm {:?}", conn.trace_id(), frame);
2123
2124            qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2125                let ev_data = EventData::H3FrameCreated(H3FrameCreated {
2126                    stream_id,
2127                    length: Some(octets::varint_len(id) as u64),
2128                    frame: frame.to_qlog(),
2129                    ..Default::default()
2130                });
2131
2132                q.add_event_data_now(ev_data).ok();
2133            });
2134
2135            let off = b.off();
2136            conn.stream_send(stream_id, &d[..off], false)?;
2137
2138            self.local_goaway_id = Some(id);
2139        }
2140
2141        Ok(())
2142    }
2143
2144    /// Gets the raw settings from peer including unknown and reserved types.
2145    ///
2146    /// The order of settings is the same as received in the SETTINGS frame.
2147    pub fn peer_settings_raw(&self) -> Option<&[(u64, u64)]> {
2148        self.peer_settings.raw.as_deref()
2149    }
2150
2151    fn open_uni_stream<F: BufFactory>(
2152        &mut self, conn: &mut super::Connection<F>, ty: u64,
2153    ) -> Result<u64> {
2154        let stream_id = self.next_uni_stream_id;
2155
2156        let mut d = [0; 8];
2157        let mut b = octets::OctetsMut::with_slice(&mut d);
2158
2159        match ty {
2160            // Control and QPACK streams are the most important to schedule.
2161            stream::HTTP3_CONTROL_STREAM_TYPE_ID |
2162            stream::QPACK_ENCODER_STREAM_TYPE_ID |
2163            stream::QPACK_DECODER_STREAM_TYPE_ID => {
2164                conn.stream_priority(stream_id, 0, false)?;
2165            },
2166
2167            // TODO: Server push
2168            stream::HTTP3_PUSH_STREAM_TYPE_ID => (),
2169
2170            // Anything else is a GREASE stream, so make it the least important.
2171            _ => {
2172                conn.stream_priority(stream_id, 255, false)?;
2173            },
2174        }
2175
2176        conn.stream_send(stream_id, b.put_varint(ty)?, false)?;
2177
2178        // To avoid skipping stream IDs, we only calculate the next available
2179        // stream ID when data has been successfully buffered.
2180        self.next_uni_stream_id = self
2181            .next_uni_stream_id
2182            .checked_add(4)
2183            .ok_or(Error::IdError)?;
2184
2185        Ok(stream_id)
2186    }
2187
2188    fn open_qpack_encoder_stream<F: BufFactory>(
2189        &mut self, conn: &mut super::Connection<F>,
2190    ) -> Result<()> {
2191        let stream_id =
2192            self.open_uni_stream(conn, stream::QPACK_ENCODER_STREAM_TYPE_ID)?;
2193
2194        self.local_qpack_streams.encoder_stream_id = Some(stream_id);
2195
2196        qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2197            let ev_data = EventData::H3StreamTypeSet(H3StreamTypeSet {
2198                stream_id,
2199                owner: Some(H3Owner::Local),
2200                stream_type: H3StreamType::QpackEncode,
2201                ..Default::default()
2202            });
2203
2204            q.add_event_data_now(ev_data).ok();
2205        });
2206
2207        Ok(())
2208    }
2209
2210    fn open_qpack_decoder_stream<F: BufFactory>(
2211        &mut self, conn: &mut super::Connection<F>,
2212    ) -> Result<()> {
2213        let stream_id =
2214            self.open_uni_stream(conn, stream::QPACK_DECODER_STREAM_TYPE_ID)?;
2215
2216        self.local_qpack_streams.decoder_stream_id = Some(stream_id);
2217
2218        qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2219            let ev_data = EventData::H3StreamTypeSet(H3StreamTypeSet {
2220                stream_id,
2221                owner: Some(H3Owner::Local),
2222                stream_type: H3StreamType::QpackDecode,
2223                ..Default::default()
2224            });
2225
2226            q.add_event_data_now(ev_data).ok();
2227        });
2228
2229        Ok(())
2230    }
2231
2232    /// Send GREASE frames on the provided stream ID.
2233    fn send_grease_frames<F: BufFactory>(
2234        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
2235    ) -> Result<()> {
2236        let mut d = [0; 8];
2237
2238        let stream_cap = match conn.stream_capacity(stream_id) {
2239            Ok(v) => v,
2240
2241            Err(e) => {
2242                if conn.stream_finished(stream_id) {
2243                    self.streams.remove(&stream_id);
2244                }
2245
2246                return Err(e.into());
2247            },
2248        };
2249
2250        let grease_frame1 = grease_value();
2251        let grease_frame2 = grease_value();
2252        let grease_payload = b"GREASE is the word";
2253
2254        let overhead = octets::varint_len(grease_frame1) + // frame type
2255            1 + // payload len
2256            octets::varint_len(grease_frame2) + // frame type
2257            1 + // payload len
2258            grease_payload.len(); // payload
2259
2260        // Don't send GREASE if there is not enough capacity for it. Greasing
2261        // will _not_ be attempted again later on.
2262        if stream_cap < overhead {
2263            return Ok(());
2264        }
2265
2266        // Empty GREASE frame.
2267        let mut b = octets::OctetsMut::with_slice(&mut d);
2268        conn.stream_send(stream_id, b.put_varint(grease_frame1)?, false)?;
2269
2270        let mut b = octets::OctetsMut::with_slice(&mut d);
2271        conn.stream_send(stream_id, b.put_varint(0)?, false)?;
2272
2273        trace!(
2274            "{} tx frm GREASE stream={} len=0",
2275            conn.trace_id(),
2276            stream_id
2277        );
2278
2279        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2280            let frame = Http3Frame::Reserved { length: Some(0) };
2281            let ev_data = EventData::H3FrameCreated(H3FrameCreated {
2282                stream_id,
2283                length: Some(0),
2284                frame,
2285                ..Default::default()
2286            });
2287
2288            q.add_event_data_now(ev_data).ok();
2289        });
2290
2291        // GREASE frame with payload.
2292        let mut b = octets::OctetsMut::with_slice(&mut d);
2293        conn.stream_send(stream_id, b.put_varint(grease_frame2)?, false)?;
2294
2295        let mut b = octets::OctetsMut::with_slice(&mut d);
2296        conn.stream_send(stream_id, b.put_varint(18)?, false)?;
2297
2298        conn.stream_send(stream_id, grease_payload, false)?;
2299
2300        trace!(
2301            "{} tx frm GREASE stream={} len={}",
2302            conn.trace_id(),
2303            stream_id,
2304            grease_payload.len()
2305        );
2306
2307        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2308            let frame = Http3Frame::Reserved {
2309                length: Some(grease_payload.len() as u64),
2310            };
2311            let ev_data = EventData::H3FrameCreated(H3FrameCreated {
2312                stream_id,
2313                length: Some(grease_payload.len() as u64),
2314                frame,
2315                ..Default::default()
2316            });
2317
2318            q.add_event_data_now(ev_data).ok();
2319        });
2320
2321        Ok(())
2322    }
2323
2324    /// Opens a new unidirectional stream with a GREASE type and sends some
2325    /// unframed payload.
2326    fn open_grease_stream<F: BufFactory>(
2327        &mut self, conn: &mut super::Connection<F>,
2328    ) -> Result<()> {
2329        let ty = grease_value();
2330        match self.open_uni_stream(conn, ty) {
2331            Ok(stream_id) => {
2332                conn.stream_send(stream_id, b"GREASE is the word", true)?;
2333
2334                trace!("{} open GREASE stream {}", conn.trace_id(), stream_id);
2335
2336                qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2337                    let ev_data = EventData::H3StreamTypeSet(H3StreamTypeSet {
2338                        stream_id,
2339                        owner: Some(H3Owner::Local),
2340                        stream_type: H3StreamType::Unknown,
2341                        stream_type_value: Some(ty),
2342                        ..Default::default()
2343                    });
2344
2345                    q.add_event_data_now(ev_data).ok();
2346                });
2347            },
2348
2349            Err(Error::IdError) => {
2350                trace!("{} GREASE stream blocked", conn.trace_id(),);
2351
2352                return Ok(());
2353            },
2354
2355            Err(e) => return Err(e),
2356        };
2357
2358        Ok(())
2359    }
2360
2361    /// Sends SETTINGS frame based on HTTP/3 configuration.
2362    fn send_settings<F: BufFactory>(
2363        &mut self, conn: &mut super::Connection<F>,
2364    ) -> Result<()> {
2365        let stream_id = match self
2366            .open_uni_stream(conn, stream::HTTP3_CONTROL_STREAM_TYPE_ID)
2367        {
2368            Ok(v) => v,
2369
2370            Err(e) => {
2371                trace!("{} Control stream blocked", conn.trace_id(),);
2372
2373                if e == Error::Done {
2374                    return Err(Error::InternalError);
2375                }
2376
2377                return Err(e);
2378            },
2379        };
2380
2381        self.control_stream_id = Some(stream_id);
2382
2383        qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2384            let ev_data = EventData::H3StreamTypeSet(H3StreamTypeSet {
2385                stream_id,
2386                owner: Some(H3Owner::Local),
2387                stream_type: H3StreamType::Control,
2388                ..Default::default()
2389            });
2390
2391            q.add_event_data_now(ev_data).ok();
2392        });
2393
2394        let grease = if conn.grease {
2395            Some((grease_value(), grease_value()))
2396        } else {
2397            None
2398        };
2399
2400        let frame = frame::Frame::Settings {
2401            max_field_section_size: self.local_settings.max_field_section_size,
2402            qpack_max_table_capacity: self
2403                .local_settings
2404                .qpack_max_table_capacity,
2405            qpack_blocked_streams: self.local_settings.qpack_blocked_streams,
2406            connect_protocol_enabled: self
2407                .local_settings
2408                .connect_protocol_enabled,
2409            h3_datagram: self.local_settings.h3_datagram,
2410            grease,
2411            additional_settings: self.local_settings.additional_settings.clone(),
2412            raw: Default::default(),
2413        };
2414
2415        let mut d = [42; 128];
2416        let mut b = octets::OctetsMut::with_slice(&mut d);
2417
2418        frame.to_bytes(&mut b)?;
2419
2420        let off = b.off();
2421
2422        if let Some(id) = self.control_stream_id {
2423            conn.stream_send(id, &d[..off], false)?;
2424
2425            trace!(
2426                "{} tx frm SETTINGS stream={} len={}",
2427                conn.trace_id(),
2428                id,
2429                off
2430            );
2431
2432            qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2433                let frame = frame.to_qlog();
2434                let ev_data = EventData::H3FrameCreated(H3FrameCreated {
2435                    stream_id: id,
2436                    length: Some(off as u64),
2437                    frame,
2438                    ..Default::default()
2439                });
2440
2441                q.add_event_data_now(ev_data).ok();
2442            });
2443        }
2444
2445        Ok(())
2446    }
2447
2448    fn process_control_stream<F: BufFactory>(
2449        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
2450    ) -> Result<(u64, Event)> {
2451        close_conn_if_critical_stream_finished(conn, stream_id)?;
2452
2453        if !conn.stream_readable(stream_id) {
2454            return Err(Error::Done);
2455        }
2456
2457        match self.process_readable_stream(conn, stream_id, true) {
2458            Ok(ev) => return Ok(ev),
2459
2460            Err(Error::Done) => (),
2461
2462            Err(e) => return Err(e),
2463        };
2464
2465        close_conn_if_critical_stream_finished(conn, stream_id)?;
2466
2467        Err(Error::Done)
2468    }
2469
    /// Drives the HTTP/3 stream state machine for `stream_id`.
    ///
    /// A stream entry is created on first use. The loop then repeatedly
    /// inspects the stream's current state and reads from `conn` as needed:
    /// stream type, push ID, frame type, frame payload length, frame payload,
    /// body data, QPACK instructions, or draining.
    ///
    /// When `polling` is `false`, the `FramePayload` and `Data` states leave
    /// the loop without consuming anything, so no events are emitted.
    ///
    /// Returns the `(stream ID, event)` pair as soon as one is available.
    /// Returns `Error::Done` when the loop ends without an event, and other
    /// errors when reading or validation fails; on most validation failures
    /// the connection is closed with the matching wire error code before
    /// returning.
    fn process_readable_stream<F: BufFactory>(
        &mut self, conn: &mut super::Connection<F>, stream_id: u64, polling: bool,
    ) -> Result<(u64, Event)> {
        // Make sure there is per-stream state to work with.
        self.streams
            .entry(stream_id)
            .or_insert_with(|| <stream::Stream>::new(stream_id, false));

        // We need to get a fresh reference to the stream for each
        // iteration, to avoid borrowing `self` for the entire duration
        // of the loop, because we'll need to borrow it again in the
        // `State::FramePayload` case below.
        while let Some(stream) = self.streams.get_mut(&stream_id) {
            match stream.state() {
                stream::State::StreamType => {
                    stream.try_fill_buffer(conn)?;

                    let varint = match stream.try_consume_varint() {
                        Ok(v) => v,

                        // Could not decode a varint yet: start the next
                        // iteration, which retries filling the buffer first.
                        Err(_) => continue,
                    };

                    let ty = stream::Type::deserialize(varint)?;

                    if let Err(e) = stream.set_ty(ty) {
                        conn.close(true, e.to_wire(), b"")?;
                        return Err(e);
                    }

                    qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
                        // The raw type value is only logged for unknown
                        // stream types.
                        let ty_val = if matches!(ty, stream::Type::Unknown) {
                            Some(varint)
                        } else {
                            None
                        };

                        let ev_data =
                            EventData::H3StreamTypeSet(H3StreamTypeSet {
                                stream_id,
                                owner: Some(H3Owner::Remote),
                                stream_type: ty.to_qlog(),
                                stream_type_value: ty_val,
                                ..Default::default()
                            });

                        q.add_event_data_now(ev_data).ok();
                    });

                    match &ty {
                        stream::Type::Control => {
                            // Only one control stream allowed.
                            if self.peer_control_stream_id.is_some() {
                                conn.close(
                                    true,
                                    Error::StreamCreationError.to_wire(),
                                    b"Received multiple control streams",
                                )?;

                                return Err(Error::StreamCreationError);
                            }

                            trace!(
                                "{} open peer's control stream {}",
                                conn.trace_id(),
                                stream_id
                            );

                            close_conn_if_critical_stream_finished(
                                conn, stream_id,
                            )?;

                            // Remember the stream so a second control
                            // stream can be rejected above.
                            self.peer_control_stream_id = Some(stream_id);
                        },

                        stream::Type::Push => {
                            // Only clients can receive push stream.
                            if self.is_server {
                                conn.close(
                                    true,
                                    Error::StreamCreationError.to_wire(),
                                    b"Server received push stream.",
                                )?;

                                return Err(Error::StreamCreationError);
                            }
                        },

                        stream::Type::QpackEncoder => {
                            // Only one qpack encoder stream allowed.
                            if self.peer_qpack_streams.encoder_stream_id.is_some()
                            {
                                conn.close(
                                    true,
                                    Error::StreamCreationError.to_wire(),
                                    b"Received multiple QPACK encoder streams",
                                )?;

                                return Err(Error::StreamCreationError);
                            }

                            close_conn_if_critical_stream_finished(
                                conn, stream_id,
                            )?;

                            self.peer_qpack_streams.encoder_stream_id =
                                Some(stream_id);
                        },

                        stream::Type::QpackDecoder => {
                            // Only one qpack decoder allowed.
                            if self.peer_qpack_streams.decoder_stream_id.is_some()
                            {
                                conn.close(
                                    true,
                                    Error::StreamCreationError.to_wire(),
                                    b"Received multiple QPACK decoder streams",
                                )?;

                                return Err(Error::StreamCreationError);
                            }

                            close_conn_if_critical_stream_finished(
                                conn, stream_id,
                            )?;

                            self.peer_qpack_streams.decoder_stream_id =
                                Some(stream_id);
                        },

                        stream::Type::Unknown => {
                            // Unknown stream types are ignored.
                            // TODO: we MAY send STOP_SENDING
                        },

                        // NOTE(review): presumably a request type can never
                        // be produced from a stream type varint — confirm
                        // against `stream::Type::deserialize()`.
                        stream::Type::Request => unreachable!(),
                    }
                },

                stream::State::PushId => {
                    stream.try_fill_buffer(conn)?;

                    let varint = match stream.try_consume_varint() {
                        Ok(v) => v,

                        Err(_) => continue,
                    };

                    if let Err(e) = stream.set_push_id(varint) {
                        conn.close(true, e.to_wire(), b"")?;
                        return Err(e);
                    }
                },

                stream::State::FrameType => {
                    stream.try_fill_buffer(conn)?;

                    let varint = match stream.try_consume_varint() {
                        Ok(v) => v,

                        Err(_) => continue,
                    };

                    match stream.set_frame_type(varint) {
                        // Unexpected frames get a close reason that names
                        // the offending frame type.
                        Err(Error::FrameUnexpected) => {
                            let msg = format!("Unexpected frame type {varint}");

                            conn.close(
                                true,
                                Error::FrameUnexpected.to_wire(),
                                msg.as_bytes(),
                            )?;

                            return Err(Error::FrameUnexpected);
                        },

                        Err(e) => {
                            conn.close(
                                true,
                                e.to_wire(),
                                b"Error handling frame.",
                            )?;

                            return Err(e);
                        },

                        _ => (),
                    }
                },

                stream::State::FramePayloadLen => {
                    stream.try_fill_buffer(conn)?;

                    let payload_len = match stream.try_consume_varint() {
                        Ok(v) => v,

                        Err(_) => continue,
                    };

                    // DATA frames are handled uniquely. After this point we lose
                    // visibility of DATA framing, so just log here.
                    if Some(frame::DATA_FRAME_TYPE_ID) == stream.frame_type() {
                        trace!(
                            "{} rx frm DATA stream={} wire_payload_len={}",
                            conn.trace_id(),
                            stream_id,
                            payload_len
                        );

                        qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
                            let frame = Http3Frame::Data { raw: None };

                            let ev_data =
                                EventData::H3FrameParsed(H3FrameParsed {
                                    stream_id,
                                    length: Some(payload_len),
                                    frame,
                                    ..Default::default()
                                });

                            q.add_event_data_now(ev_data).ok();
                        });
                    }

                    if let Err(e) = stream.set_frame_payload_len(payload_len) {
                        conn.close(true, e.to_wire(), b"")?;
                        return Err(e);
                    }
                },

                stream::State::FramePayload => {
                    // Do not emit events when not polling.
                    if !polling {
                        break;
                    }

                    stream.try_fill_buffer(conn)?;

                    let (frame, payload_len) = match stream.try_consume_frame() {
                        Ok(frame) => frame,

                        // The frame is not complete yet; report that there
                        // is nothing to do for now.
                        Err(Error::Done) => return Err(Error::Done),

                        Err(e) => {
                            conn.close(
                                true,
                                e.to_wire(),
                                b"Error handling frame.",
                            )?;

                            return Err(e);
                        },
                    };

                    // `stream` is no longer used past this point, which
                    // allows `self` to be borrowed again here.
                    match self.process_frame(conn, stream_id, frame, payload_len)
                    {
                        Ok(ev) => return Ok(ev),

                        Err(Error::Done) => {
                            // This might be a frame that is processed internally
                            // without needing to bubble up to the user as an
                            // event. Check whether the frame has FIN'd by QUIC
                            // to prevent trying to read again on a closed stream.
                            if conn.stream_finished(stream_id) {
                                break;
                            }
                        },

                        Err(e) => return Err(e),
                    };
                },

                stream::State::Data => {
                    // Do not emit events when not polling.
                    if !polling {
                        break;
                    }

                    // Leave the loop when the stream declines to trigger a
                    // Data event.
                    if !stream.try_trigger_data_event() {
                        break;
                    }

                    return Ok((stream_id, Event::Data));
                },

                stream::State::QpackInstruction => {
                    let mut d = [0; 4096];

                    // Read data from the stream and discard immediately.
                    //
                    // This loop has no `break`: it only ends when one of
                    // the `?` operators below propagates an error.
                    loop {
                        let (recv, fin) = conn.stream_recv(stream_id, &mut d)?;

                        // Account the discarded bytes so they can be
                        // reported through `stats()`.
                        match stream.ty() {
                            Some(stream::Type::QpackEncoder) =>
                                self.peer_qpack_streams.encoder_stream_bytes +=
                                    recv as u64,
                            Some(stream::Type::QpackDecoder) =>
                                self.peer_qpack_streams.decoder_stream_bytes +=
                                    recv as u64,
                            _ => unreachable!(),
                        };

                        if fin {
                            close_conn_critical_stream(conn)?;
                        }
                    }
                },

                stream::State::Drain => {
                    // Discard incoming data on the stream.
                    //
                    // NOTE(review): 0x100 matches the H3_NO_ERROR code of
                    // RFC 9114 — presumably intentional, confirm.
                    conn.stream_shutdown(
                        stream_id,
                        crate::Shutdown::Read,
                        0x100,
                    )?;

                    break;
                },

                stream::State::Finished => break,
            }
        }

        Err(Error::Done)
    }
2794
2795    fn process_finished_stream(&mut self, stream_id: u64) {
2796        let stream = match self.streams.get_mut(&stream_id) {
2797            Some(v) => v,
2798
2799            None => return,
2800        };
2801
2802        if stream.state() == stream::State::Finished {
2803            return;
2804        }
2805
2806        match stream.ty() {
2807            Some(stream::Type::Request) | Some(stream::Type::Push) => {
2808                stream.finished();
2809
2810                self.finished_streams.push_back(stream_id);
2811            },
2812
2813            _ => (),
2814        };
2815    }
2816
2817    fn process_frame<F: BufFactory>(
2818        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
2819        frame: frame::Frame, payload_len: u64,
2820    ) -> Result<(u64, Event)> {
2821        trace!(
2822            "{} rx frm {:?} stream={} payload_len={}",
2823            conn.trace_id(),
2824            frame,
2825            stream_id,
2826            payload_len
2827        );
2828
2829        qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
2830            // HEADERS frames are special case and will be logged below.
2831            if !matches!(frame, frame::Frame::Headers { .. }) {
2832                let frame = frame.to_qlog();
2833                let ev_data = EventData::H3FrameParsed(H3FrameParsed {
2834                    stream_id,
2835                    length: Some(payload_len),
2836                    frame,
2837                    ..Default::default()
2838                });
2839
2840                q.add_event_data_now(ev_data).ok();
2841            }
2842        });
2843
2844        match frame {
2845            frame::Frame::Settings {
2846                max_field_section_size,
2847                qpack_max_table_capacity,
2848                qpack_blocked_streams,
2849                connect_protocol_enabled,
2850                h3_datagram,
2851                additional_settings,
2852                raw,
2853                ..
2854            } => {
2855                self.peer_settings = ConnectionSettings {
2856                    max_field_section_size,
2857                    qpack_max_table_capacity,
2858                    qpack_blocked_streams,
2859                    connect_protocol_enabled,
2860                    h3_datagram,
2861                    additional_settings,
2862                    raw,
2863                };
2864
2865                if let Some(1) = h3_datagram {
2866                    // The peer MUST have also enabled DATAGRAM with a TP
2867                    if conn.dgram_max_writable_len().is_none() {
2868                        conn.close(
2869                            true,
2870                            Error::SettingsError.to_wire(),
2871                            b"H3_DATAGRAM sent with value 1 but max_datagram_frame_size TP not set.",
2872                        )?;
2873
2874                        return Err(Error::SettingsError);
2875                    }
2876                }
2877            },
2878
2879            frame::Frame::Headers { header_block } => {
2880                // Servers reject too many HEADERS frames.
2881                if let Some(s) = self.streams.get_mut(&stream_id) {
2882                    if self.is_server && s.headers_received_count() == 2 {
2883                        conn.close(
2884                            true,
2885                            Error::FrameUnexpected.to_wire(),
2886                            b"Too many HEADERS frames",
2887                        )?;
2888                        return Err(Error::FrameUnexpected);
2889                    }
2890
2891                    s.increment_headers_received();
2892                }
2893
2894                // Use "infinite" as default value for max_field_section_size if
2895                // it is not configured by the application.
2896                let max_size = self
2897                    .local_settings
2898                    .max_field_section_size
2899                    .unwrap_or(u64::MAX);
2900
2901                let headers = match self
2902                    .qpack_decoder
2903                    .decode(&header_block[..], max_size)
2904                {
2905                    Ok(v) => v,
2906
2907                    Err(e) => {
2908                        let e = match e {
2909                            qpack::Error::HeaderListTooLarge =>
2910                                Error::ExcessiveLoad,
2911
2912                            _ => Error::QpackDecompressionFailed,
2913                        };
2914
2915                        conn.close(true, e.to_wire(), b"Error parsing headers.")?;
2916
2917                        return Err(e);
2918                    },
2919                };
2920
2921                qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
2922                    let qlog_headers = headers
2923                        .iter()
2924                        .map(|h| qlog::events::h3::HttpHeader {
2925                            name: String::from_utf8_lossy(h.name()).into_owned(),
2926                            value: String::from_utf8_lossy(h.value())
2927                                .into_owned(),
2928                        })
2929                        .collect();
2930
2931                    let frame = Http3Frame::Headers {
2932                        headers: qlog_headers,
2933                    };
2934
2935                    let ev_data = EventData::H3FrameParsed(H3FrameParsed {
2936                        stream_id,
2937                        length: Some(payload_len),
2938                        frame,
2939                        ..Default::default()
2940                    });
2941
2942                    q.add_event_data_now(ev_data).ok();
2943                });
2944
2945                let more_frames = !conn.stream_finished(stream_id);
2946
2947                return Ok((stream_id, Event::Headers {
2948                    list: headers,
2949                    more_frames,
2950                }));
2951            },
2952
2953            frame::Frame::Data { .. } => {
2954                // Do nothing. The Data event is returned separately.
2955            },
2956
2957            frame::Frame::GoAway { id } => {
2958                if !self.is_server && id % 4 != 0 {
2959                    conn.close(
2960                        true,
2961                        Error::FrameUnexpected.to_wire(),
2962                        b"GOAWAY received with ID of non-request stream",
2963                    )?;
2964
2965                    return Err(Error::IdError);
2966                }
2967
2968                if let Some(received_id) = self.peer_goaway_id {
2969                    if id > received_id {
2970                        conn.close(
2971                            true,
2972                            Error::IdError.to_wire(),
2973                            b"GOAWAY received with ID larger than previously received",
2974                        )?;
2975
2976                        return Err(Error::IdError);
2977                    }
2978                }
2979
2980                self.peer_goaway_id = Some(id);
2981
2982                return Ok((id, Event::GoAway));
2983            },
2984
2985            frame::Frame::MaxPushId { push_id } => {
2986                if !self.is_server {
2987                    conn.close(
2988                        true,
2989                        Error::FrameUnexpected.to_wire(),
2990                        b"MAX_PUSH_ID received by client",
2991                    )?;
2992
2993                    return Err(Error::FrameUnexpected);
2994                }
2995
2996                if push_id < self.max_push_id {
2997                    conn.close(
2998                        true,
2999                        Error::IdError.to_wire(),
3000                        b"MAX_PUSH_ID reduced limit",
3001                    )?;
3002
3003                    return Err(Error::IdError);
3004                }
3005
3006                self.max_push_id = push_id;
3007            },
3008
3009            frame::Frame::PushPromise { .. } => {
3010                if self.is_server {
3011                    conn.close(
3012                        true,
3013                        Error::FrameUnexpected.to_wire(),
3014                        b"PUSH_PROMISE received by server",
3015                    )?;
3016
3017                    return Err(Error::FrameUnexpected);
3018                }
3019
3020                if stream_id % 4 != 0 {
3021                    conn.close(
3022                        true,
3023                        Error::FrameUnexpected.to_wire(),
3024                        b"PUSH_PROMISE received on non-request stream",
3025                    )?;
3026
3027                    return Err(Error::FrameUnexpected);
3028                }
3029
3030                // TODO: implement more checks and PUSH_PROMISE event
3031            },
3032
3033            frame::Frame::CancelPush { .. } => {
3034                // TODO: implement CANCEL_PUSH frame
3035            },
3036
3037            frame::Frame::PriorityUpdateRequest {
3038                prioritized_element_id,
3039                priority_field_value,
3040            } => {
3041                if !self.is_server {
3042                    conn.close(
3043                        true,
3044                        Error::FrameUnexpected.to_wire(),
3045                        b"PRIORITY_UPDATE received by client",
3046                    )?;
3047
3048                    return Err(Error::FrameUnexpected);
3049                }
3050
3051                if prioritized_element_id % 4 != 0 {
3052                    conn.close(
3053                        true,
3054                        Error::FrameUnexpected.to_wire(),
3055                        b"PRIORITY_UPDATE for request stream type with wrong ID",
3056                    )?;
3057
3058                    return Err(Error::FrameUnexpected);
3059                }
3060
3061                if prioritized_element_id > conn.streams.max_streams_bidi() * 4 {
3062                    conn.close(
3063                        true,
3064                        Error::IdError.to_wire(),
3065                        b"PRIORITY_UPDATE for request stream beyond max streams limit",
3066                    )?;
3067
3068                    return Err(Error::IdError);
3069                }
3070
3071                // If the PRIORITY_UPDATE is valid, consider storing the latest
3072                // contents. Due to reordering, it is possible that we might
3073                // receive frames that reference streams that have not yet to
3074                // been opened and that's OK because it's within our concurrency
3075                // limit. However, we discard PRIORITY_UPDATE that refers to
3076                // streams that we know have been collected.
3077                if conn.streams.is_collected(prioritized_element_id) {
3078                    return Err(Error::Done);
3079                }
3080
3081                // If the stream did not yet exist, create it and store.
3082                let stream =
3083                    self.streams.entry(prioritized_element_id).or_insert_with(
3084                        || <stream::Stream>::new(prioritized_element_id, false),
3085                    );
3086
3087                let had_priority_update = stream.has_last_priority_update();
3088                stream.set_last_priority_update(Some(priority_field_value));
3089
3090                // Only trigger the event when there wasn't already a stored
3091                // PRIORITY_UPDATE.
3092                if !had_priority_update {
3093                    return Ok((prioritized_element_id, Event::PriorityUpdate));
3094                } else {
3095                    return Err(Error::Done);
3096                }
3097            },
3098
3099            frame::Frame::PriorityUpdatePush {
3100                prioritized_element_id,
3101                ..
3102            } => {
3103                if !self.is_server {
3104                    conn.close(
3105                        true,
3106                        Error::FrameUnexpected.to_wire(),
3107                        b"PRIORITY_UPDATE received by client",
3108                    )?;
3109
3110                    return Err(Error::FrameUnexpected);
3111                }
3112
3113                if prioritized_element_id % 3 != 0 {
3114                    conn.close(
3115                        true,
3116                        Error::FrameUnexpected.to_wire(),
3117                        b"PRIORITY_UPDATE for push stream type with wrong ID",
3118                    )?;
3119
3120                    return Err(Error::FrameUnexpected);
3121                }
3122
3123                // TODO: we only implement this if we implement server push
3124            },
3125
3126            frame::Frame::Unknown { .. } => (),
3127        }
3128
3129        Err(Error::Done)
3130    }
3131
3132    /// Collects and returns statistics about the connection.
3133    #[inline]
3134    pub fn stats(&self) -> Stats {
3135        Stats {
3136            qpack_encoder_stream_recv_bytes: self
3137                .peer_qpack_streams
3138                .encoder_stream_bytes,
3139            qpack_decoder_stream_recv_bytes: self
3140                .peer_qpack_streams
3141                .decoder_stream_bytes,
3142        }
3143    }
3144}
3145
3146/// Generates an HTTP/3 GREASE variable length integer.
3147pub fn grease_value() -> u64 {
3148    let n = super::rand::rand_u64_uniform(148_764_065_110_560_899);
3149    31 * n + 33
3150}
3151
3152#[doc(hidden)]
3153#[cfg(any(test, feature = "internal"))]
3154pub mod testing {
3155    use super::*;
3156
3157    use crate::test_utils;
3158    use crate::DefaultBufFactory;
3159
3160    /// Session is an HTTP/3 test helper structure. It holds a client, server
3161    /// and pipe that allows them to communicate.
3162    ///
3163    /// `default()` creates a session with some sensible default
3164    /// configuration. `with_configs()` allows for providing a specific
3165    /// configuration.
3166    ///
3167    /// `handshake()` performs all the steps needed to establish an HTTP/3
3168    /// connection.
3169    ///
3170    /// Some utility functions are provided that make it less verbose to send
3171    /// request, responses and individual headers. The full quiche API remains
3172    /// available for any test that need to do unconventional things (such as
3173    /// bad behaviour that triggers errors).
3174    pub struct Session<F = DefaultBufFactory>
3175    where
3176        F: BufFactory,
3177    {
3178        pub pipe: test_utils::Pipe<F>,
3179        pub client: Connection,
3180        pub server: Connection,
3181    }
3182
3183    impl Session {
3184        pub fn new() -> Result<Session> {
3185            Session::<DefaultBufFactory>::new_with_buf()
3186        }
3187
3188        pub fn with_configs(
3189            config: &mut crate::Config, h3_config: &Config,
3190        ) -> Result<Session> {
3191            Session::<DefaultBufFactory>::with_configs_and_buf(config, h3_config)
3192        }
3193    }
3194
3195    impl<F: BufFactory> Session<F> {
3196        pub fn new_with_buf() -> Result<Session<F>> {
3197            fn path_relative_to_manifest_dir(path: &str) -> String {
3198                std::fs::canonicalize(
3199                    std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join(path),
3200                )
3201                .unwrap()
3202                .to_string_lossy()
3203                .into_owned()
3204            }
3205
3206            let mut config = crate::Config::new(crate::PROTOCOL_VERSION)?;
3207            config.load_cert_chain_from_pem_file(
3208                &path_relative_to_manifest_dir("examples/cert.crt"),
3209            )?;
3210            config.load_priv_key_from_pem_file(
3211                &path_relative_to_manifest_dir("examples/cert.key"),
3212            )?;
3213            config.set_application_protos(&[b"h3"])?;
3214            config.set_initial_max_data(1500);
3215            config.set_initial_max_stream_data_bidi_local(150);
3216            config.set_initial_max_stream_data_bidi_remote(150);
3217            config.set_initial_max_stream_data_uni(150);
3218            config.set_initial_max_streams_bidi(5);
3219            config.set_initial_max_streams_uni(5);
3220            config.verify_peer(false);
3221            config.enable_dgram(true, 3, 3);
3222            config.set_ack_delay_exponent(8);
3223
3224            let h3_config = Config::new()?;
3225            Session::with_configs_and_buf(&mut config, &h3_config)
3226        }
3227
3228        pub fn with_configs_and_buf(
3229            config: &mut crate::Config, h3_config: &Config,
3230        ) -> Result<Session<F>> {
3231            let pipe = test_utils::Pipe::with_config_and_buf(config)?;
3232            let client_dgram = pipe.client.dgram_enabled();
3233            let server_dgram = pipe.server.dgram_enabled();
3234            Ok(Session {
3235                pipe,
3236                client: Connection::new(h3_config, false, client_dgram)?,
3237                server: Connection::new(h3_config, true, server_dgram)?,
3238            })
3239        }
3240
3241        /// Do the HTTP/3 handshake so both ends are in sane initial state.
3242        pub fn handshake(&mut self) -> Result<()> {
3243            self.pipe.handshake()?;
3244
3245            // Client streams.
3246            self.client.send_settings(&mut self.pipe.client)?;
3247            self.pipe.advance().ok();
3248
3249            self.client
3250                .open_qpack_encoder_stream(&mut self.pipe.client)?;
3251            self.pipe.advance().ok();
3252
3253            self.client
3254                .open_qpack_decoder_stream(&mut self.pipe.client)?;
3255            self.pipe.advance().ok();
3256
3257            if self.pipe.client.grease {
3258                self.client.open_grease_stream(&mut self.pipe.client)?;
3259            }
3260
3261            self.pipe.advance().ok();
3262
3263            // Server streams.
3264            self.server.send_settings(&mut self.pipe.server)?;
3265            self.pipe.advance().ok();
3266
3267            self.server
3268                .open_qpack_encoder_stream(&mut self.pipe.server)?;
3269            self.pipe.advance().ok();
3270
3271            self.server
3272                .open_qpack_decoder_stream(&mut self.pipe.server)?;
3273            self.pipe.advance().ok();
3274
3275            if self.pipe.server.grease {
3276                self.server.open_grease_stream(&mut self.pipe.server)?;
3277            }
3278
3279            self.advance().ok();
3280
3281            while self.client.poll(&mut self.pipe.client).is_ok() {
3282                // Do nothing.
3283            }
3284
3285            while self.server.poll(&mut self.pipe.server).is_ok() {
3286                // Do nothing.
3287            }
3288
3289            Ok(())
3290        }
3291
3292        /// Advances the session pipe over the buffer.
3293        pub fn advance(&mut self) -> crate::Result<()> {
3294            self.pipe.advance()
3295        }
3296
3297        /// Polls the client for events.
3298        pub fn poll_client(&mut self) -> Result<(u64, Event)> {
3299            self.client.poll(&mut self.pipe.client)
3300        }
3301
3302        /// Polls the server for events.
3303        pub fn poll_server(&mut self) -> Result<(u64, Event)> {
3304            self.server.poll(&mut self.pipe.server)
3305        }
3306
3307        /// Sends a request from client with default headers.
3308        ///
3309        /// On success it returns the newly allocated stream and the headers.
3310        pub fn send_request(&mut self, fin: bool) -> Result<(u64, Vec<Header>)> {
3311            let req = vec![
3312                Header::new(b":method", b"GET"),
3313                Header::new(b":scheme", b"https"),
3314                Header::new(b":authority", b"quic.tech"),
3315                Header::new(b":path", b"/test"),
3316                Header::new(b"user-agent", b"quiche-test"),
3317            ];
3318
3319            let stream =
3320                self.client.send_request(&mut self.pipe.client, &req, fin)?;
3321
3322            self.advance().ok();
3323
3324            Ok((stream, req))
3325        }
3326
3327        /// Sends a response from server with default headers.
3328        ///
3329        /// On success it returns the headers.
3330        pub fn send_response(
3331            &mut self, stream: u64, fin: bool,
3332        ) -> Result<Vec<Header>> {
3333            let resp = vec![
3334                Header::new(b":status", b"200"),
3335                Header::new(b"server", b"quiche-test"),
3336            ];
3337
3338            self.server.send_response(
3339                &mut self.pipe.server,
3340                stream,
3341                &resp,
3342                fin,
3343            )?;
3344
3345            self.advance().ok();
3346
3347            Ok(resp)
3348        }
3349
3350        /// Sends some default payload from client.
3351        ///
3352        /// On success it returns the payload.
3353        pub fn send_body_client(
3354            &mut self, stream: u64, fin: bool,
3355        ) -> Result<Vec<u8>> {
3356            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3357
3358            self.client
3359                .send_body(&mut self.pipe.client, stream, &bytes, fin)?;
3360
3361            self.advance().ok();
3362
3363            Ok(bytes)
3364        }
3365
3366        /// Fetches DATA payload from the server.
3367        ///
3368        /// On success it returns the number of bytes received.
3369        pub fn recv_body_client(
3370            &mut self, stream: u64, buf: &mut [u8],
3371        ) -> Result<usize> {
3372            self.client.recv_body(&mut self.pipe.client, stream, buf)
3373        }
3374
3375        /// Sends some default payload from server.
3376        ///
3377        /// On success it returns the payload.
3378        pub fn send_body_server(
3379            &mut self, stream: u64, fin: bool,
3380        ) -> Result<Vec<u8>> {
3381            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3382
3383            self.server
3384                .send_body(&mut self.pipe.server, stream, &bytes, fin)?;
3385
3386            self.advance().ok();
3387
3388            Ok(bytes)
3389        }
3390
3391        /// Fetches DATA payload from the client.
3392        ///
3393        /// On success it returns the number of bytes received.
3394        pub fn recv_body_server(
3395            &mut self, stream: u64, buf: &mut [u8],
3396        ) -> Result<usize> {
3397            self.server.recv_body(&mut self.pipe.server, stream, buf)
3398        }
3399
3400        /// Sends a single HTTP/3 frame from the client.
3401        pub fn send_frame_client(
3402            &mut self, frame: frame::Frame, stream_id: u64, fin: bool,
3403        ) -> Result<()> {
3404            let mut d = [42; 65535];
3405
3406            let mut b = octets::OctetsMut::with_slice(&mut d);
3407
3408            frame.to_bytes(&mut b)?;
3409
3410            let off = b.off();
3411            self.pipe.client.stream_send(stream_id, &d[..off], fin)?;
3412
3413            self.advance().ok();
3414
3415            Ok(())
3416        }
3417
3418        /// Send an HTTP/3 DATAGRAM with default data from the client.
3419        ///
3420        /// On success it returns the data.
3421        pub fn send_dgram_client(&mut self, flow_id: u64) -> Result<Vec<u8>> {
3422            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3423            let len = octets::varint_len(flow_id) + bytes.len();
3424            let mut d = vec![0; len];
3425            let mut b = octets::OctetsMut::with_slice(&mut d);
3426
3427            b.put_varint(flow_id)?;
3428            b.put_bytes(&bytes)?;
3429
3430            self.pipe.client.dgram_send(&d)?;
3431
3432            self.advance().ok();
3433
3434            Ok(bytes)
3435        }
3436
3437        /// Receives an HTTP/3 DATAGRAM from the server.
3438        ///
3439        /// On success it returns the DATAGRAM length, flow ID and flow ID
3440        /// length.
3441        pub fn recv_dgram_client(
3442            &mut self, buf: &mut [u8],
3443        ) -> Result<(usize, u64, usize)> {
3444            let len = self.pipe.client.dgram_recv(buf)?;
3445            let mut b = octets::Octets::with_slice(buf);
3446            let flow_id = b.get_varint()?;
3447
3448            Ok((len, flow_id, b.off()))
3449        }
3450
3451        /// Send an HTTP/3 DATAGRAM with default data from the server
3452        ///
3453        /// On success it returns the data.
3454        pub fn send_dgram_server(&mut self, flow_id: u64) -> Result<Vec<u8>> {
3455            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3456            let len = octets::varint_len(flow_id) + bytes.len();
3457            let mut d = vec![0; len];
3458            let mut b = octets::OctetsMut::with_slice(&mut d);
3459
3460            b.put_varint(flow_id)?;
3461            b.put_bytes(&bytes)?;
3462
3463            self.pipe.server.dgram_send(&d)?;
3464
3465            self.advance().ok();
3466
3467            Ok(bytes)
3468        }
3469
3470        /// Receives an HTTP/3 DATAGRAM from the client.
3471        ///
3472        /// On success it returns the DATAGRAM length, flow ID and flow ID
3473        /// length.
3474        pub fn recv_dgram_server(
3475            &mut self, buf: &mut [u8],
3476        ) -> Result<(usize, u64, usize)> {
3477            let len = self.pipe.server.dgram_recv(buf)?;
3478            let mut b = octets::Octets::with_slice(buf);
3479            let flow_id = b.get_varint()?;
3480
3481            Ok((len, flow_id, b.off()))
3482        }
3483
3484        /// Sends a single HTTP/3 frame from the server.
3485        pub fn send_frame_server(
3486            &mut self, frame: frame::Frame, stream_id: u64, fin: bool,
3487        ) -> Result<()> {
3488            let mut d = [42; 65535];
3489
3490            let mut b = octets::OctetsMut::with_slice(&mut d);
3491
3492            frame.to_bytes(&mut b)?;
3493
3494            let off = b.off();
3495            self.pipe.server.stream_send(stream_id, &d[..off], fin)?;
3496
3497            self.advance().ok();
3498
3499            Ok(())
3500        }
3501
3502        /// Sends an arbitrary buffer of HTTP/3 stream data from the client.
3503        pub fn send_arbitrary_stream_data_client(
3504            &mut self, data: &[u8], stream_id: u64, fin: bool,
3505        ) -> Result<()> {
3506            self.pipe.client.stream_send(stream_id, data, fin)?;
3507
3508            self.advance().ok();
3509
3510            Ok(())
3511        }
3512
3513        /// Sends an arbitrary buffer of HTTP/3 stream data from the server.
3514        pub fn send_arbitrary_stream_data_server(
3515            &mut self, data: &[u8], stream_id: u64, fin: bool,
3516        ) -> Result<()> {
3517            self.pipe.server.stream_send(stream_id, data, fin)?;
3518
3519            self.advance().ok();
3520
3521            Ok(())
3522        }
3523    }
3524}
3525
3526#[cfg(test)]
3527mod tests {
3528    use super::*;
3529
3530    use super::testing::*;
3531
3532    #[test]
3533    /// Make sure that random GREASE values is within the specified limit.
3534    fn grease_value_in_varint_limit() {
3535        assert!(grease_value() < 2u64.pow(62) - 1);
3536    }
3537
3538    #[cfg(not(feature = "openssl"))] // 0-RTT not supported when using openssl/quictls
3539    #[test]
3540    fn h3_handshake_0rtt() {
3541        let mut buf = [0; 65535];
3542
3543        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
3544        config
3545            .load_cert_chain_from_pem_file("examples/cert.crt")
3546            .unwrap();
3547        config
3548            .load_priv_key_from_pem_file("examples/cert.key")
3549            .unwrap();
3550        config
3551            .set_application_protos(&[b"proto1", b"proto2"])
3552            .unwrap();
3553        config.set_initial_max_data(30);
3554        config.set_initial_max_stream_data_bidi_local(15);
3555        config.set_initial_max_stream_data_bidi_remote(15);
3556        config.set_initial_max_stream_data_uni(15);
3557        config.set_initial_max_streams_bidi(3);
3558        config.set_initial_max_streams_uni(3);
3559        config.enable_early_data();
3560        config.verify_peer(false);
3561
3562        let h3_config = Config::new().unwrap();
3563
3564        // Perform initial handshake.
3565        let mut pipe = crate::test_utils::Pipe::with_config(&mut config).unwrap();
3566        assert_eq!(pipe.handshake(), Ok(()));
3567
3568        // Extract session,
3569        let session = pipe.client.session().unwrap();
3570
3571        // Configure session on new connection.
3572        let mut pipe = crate::test_utils::Pipe::with_config(&mut config).unwrap();
3573        assert_eq!(pipe.client.set_session(session), Ok(()));
3574
3575        // Can't create an H3 connection until the QUIC connection is determined
3576        // to have made sufficient early data progress.
3577        assert!(matches!(
3578            Connection::with_transport(&mut pipe.client, &h3_config),
3579            Err(Error::InternalError)
3580        ));
3581
3582        // Client sends initial flight.
3583        let (len, _) = pipe.client.send(&mut buf).unwrap();
3584
3585        // Now an H3 connection can be created.
3586        assert!(Connection::with_transport(&mut pipe.client, &h3_config).is_ok());
3587        assert_eq!(pipe.server_recv(&mut buf[..len]), Ok(len));
3588
3589        // Client sends 0-RTT packet.
3590        let pkt_type = crate::packet::Type::ZeroRTT;
3591
3592        let frames = [crate::frame::Frame::Stream {
3593            stream_id: 6,
3594            data: <crate::range_buf::RangeBuf>::from(b"aaaaa", 0, true),
3595        }];
3596
3597        assert_eq!(
3598            pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
3599            Ok(1200)
3600        );
3601
3602        assert_eq!(pipe.server.undecryptable_pkts.len(), 0);
3603
3604        // 0-RTT stream data is readable.
3605        let mut r = pipe.server.readable();
3606        assert_eq!(r.next(), Some(6));
3607        assert_eq!(r.next(), None);
3608
3609        let mut b = [0; 15];
3610        assert_eq!(pipe.server.stream_recv(6, &mut b), Ok((5, true)));
3611        assert_eq!(&b[..5], b"aaaaa");
3612    }
3613
3614    #[test]
3615    /// Send a request with no body, get a response with no body.
3616    fn request_no_body_response_no_body() {
3617        let mut s = Session::new().unwrap();
3618        s.handshake().unwrap();
3619
3620        let (stream, req) = s.send_request(true).unwrap();
3621
3622        assert_eq!(stream, 0);
3623
3624        let ev_headers = Event::Headers {
3625            list: req,
3626            more_frames: false,
3627        };
3628
3629        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3630        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3631
3632        let resp = s.send_response(stream, true).unwrap();
3633
3634        let ev_headers = Event::Headers {
3635            list: resp,
3636            more_frames: false,
3637        };
3638
3639        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3640        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3641        assert_eq!(s.poll_client(), Err(Error::Done));
3642    }
3643
3644    #[test]
3645    /// Send a request with no body, get a response with one DATA frame.
3646    fn request_no_body_response_one_chunk() {
3647        let mut s = Session::new().unwrap();
3648        s.handshake().unwrap();
3649
3650        let (stream, req) = s.send_request(true).unwrap();
3651        assert_eq!(stream, 0);
3652
3653        let ev_headers = Event::Headers {
3654            list: req,
3655            more_frames: false,
3656        };
3657
3658        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3659
3660        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3661
3662        let resp = s.send_response(stream, false).unwrap();
3663
3664        let body = s.send_body_server(stream, true).unwrap();
3665
3666        let mut recv_buf = vec![0; body.len()];
3667
3668        let ev_headers = Event::Headers {
3669            list: resp,
3670            more_frames: true,
3671        };
3672
3673        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3674
3675        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
3676        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
3677
3678        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3679        assert_eq!(s.poll_client(), Err(Error::Done));
3680    }
3681
3682    #[test]
3683    /// Send a request with no body, get a response with multiple DATA frames.
3684    fn request_no_body_response_many_chunks() {
3685        let mut s = Session::new().unwrap();
3686        s.handshake().unwrap();
3687
3688        let (stream, req) = s.send_request(true).unwrap();
3689
3690        let ev_headers = Event::Headers {
3691            list: req,
3692            more_frames: false,
3693        };
3694
3695        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3696        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3697
3698        let total_data_frames = 4;
3699
3700        let resp = s.send_response(stream, false).unwrap();
3701
3702        for _ in 0..total_data_frames - 1 {
3703            s.send_body_server(stream, false).unwrap();
3704        }
3705
3706        let body = s.send_body_server(stream, true).unwrap();
3707
3708        let mut recv_buf = vec![0; body.len()];
3709
3710        let ev_headers = Event::Headers {
3711            list: resp,
3712            more_frames: true,
3713        };
3714
3715        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3716        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
3717        assert_eq!(s.poll_client(), Err(Error::Done));
3718
3719        for _ in 0..total_data_frames {
3720            assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
3721        }
3722
3723        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3724        assert_eq!(s.poll_client(), Err(Error::Done));
3725    }
3726
3727    #[test]
3728    /// Send a request with one DATA frame, get a response with no body.
3729    fn request_one_chunk_response_no_body() {
3730        let mut s = Session::new().unwrap();
3731        s.handshake().unwrap();
3732
3733        let (stream, req) = s.send_request(false).unwrap();
3734
3735        let body = s.send_body_client(stream, true).unwrap();
3736
3737        let mut recv_buf = vec![0; body.len()];
3738
3739        let ev_headers = Event::Headers {
3740            list: req,
3741            more_frames: true,
3742        };
3743
3744        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3745
3746        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
3747        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
3748
3749        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3750
3751        let resp = s.send_response(stream, true).unwrap();
3752
3753        let ev_headers = Event::Headers {
3754            list: resp,
3755            more_frames: false,
3756        };
3757
3758        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3759        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3760    }
3761
3762    #[test]
3763    /// Send a request with multiple DATA frames, get a response with no body.
3764    fn request_many_chunks_response_no_body() {
3765        let mut s = Session::new().unwrap();
3766        s.handshake().unwrap();
3767
3768        let (stream, req) = s.send_request(false).unwrap();
3769
3770        let total_data_frames = 4;
3771
3772        for _ in 0..total_data_frames - 1 {
3773            s.send_body_client(stream, false).unwrap();
3774        }
3775
3776        let body = s.send_body_client(stream, true).unwrap();
3777
3778        let mut recv_buf = vec![0; body.len()];
3779
3780        let ev_headers = Event::Headers {
3781            list: req,
3782            more_frames: true,
3783        };
3784
3785        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3786        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
3787        assert_eq!(s.poll_server(), Err(Error::Done));
3788
3789        for _ in 0..total_data_frames {
3790            assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
3791        }
3792
3793        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3794
3795        let resp = s.send_response(stream, true).unwrap();
3796
3797        let ev_headers = Event::Headers {
3798            list: resp,
3799            more_frames: false,
3800        };
3801
3802        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3803        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3804    }
3805
3806    #[test]
3807    /// Send a request with multiple DATA frames, get a response with one DATA
3808    /// frame.
3809    fn many_requests_many_chunks_response_one_chunk() {
3810        let mut s = Session::new().unwrap();
3811        s.handshake().unwrap();
3812
3813        let mut reqs = Vec::new();
3814
3815        let (stream1, req1) = s.send_request(false).unwrap();
3816        assert_eq!(stream1, 0);
3817        reqs.push(req1);
3818
3819        let (stream2, req2) = s.send_request(false).unwrap();
3820        assert_eq!(stream2, 4);
3821        reqs.push(req2);
3822
3823        let (stream3, req3) = s.send_request(false).unwrap();
3824        assert_eq!(stream3, 8);
3825        reqs.push(req3);
3826
3827        let body = s.send_body_client(stream1, false).unwrap();
3828        s.send_body_client(stream2, false).unwrap();
3829        s.send_body_client(stream3, false).unwrap();
3830
3831        let mut recv_buf = vec![0; body.len()];
3832
3833        // Reverse order of writes.
3834
3835        s.send_body_client(stream3, true).unwrap();
3836        s.send_body_client(stream2, true).unwrap();
3837        s.send_body_client(stream1, true).unwrap();
3838
3839        let (_, ev) = s.poll_server().unwrap();
3840        let ev_headers = Event::Headers {
3841            list: reqs[0].clone(),
3842            more_frames: true,
3843        };
3844        assert_eq!(ev, ev_headers);
3845
3846        let (_, ev) = s.poll_server().unwrap();
3847        let ev_headers = Event::Headers {
3848            list: reqs[1].clone(),
3849            more_frames: true,
3850        };
3851        assert_eq!(ev, ev_headers);
3852
3853        let (_, ev) = s.poll_server().unwrap();
3854        let ev_headers = Event::Headers {
3855            list: reqs[2].clone(),
3856            more_frames: true,
3857        };
3858        assert_eq!(ev, ev_headers);
3859
3860        assert_eq!(s.poll_server(), Ok((0, Event::Data)));
3861        assert_eq!(s.recv_body_server(0, &mut recv_buf), Ok(body.len()));
3862        assert_eq!(s.poll_client(), Err(Error::Done));
3863        assert_eq!(s.recv_body_server(0, &mut recv_buf), Ok(body.len()));
3864        assert_eq!(s.poll_server(), Ok((0, Event::Finished)));
3865
3866        assert_eq!(s.poll_server(), Ok((4, Event::Data)));
3867        assert_eq!(s.recv_body_server(4, &mut recv_buf), Ok(body.len()));
3868        assert_eq!(s.poll_client(), Err(Error::Done));
3869        assert_eq!(s.recv_body_server(4, &mut recv_buf), Ok(body.len()));
3870        assert_eq!(s.poll_server(), Ok((4, Event::Finished)));
3871
3872        assert_eq!(s.poll_server(), Ok((8, Event::Data)));
3873        assert_eq!(s.recv_body_server(8, &mut recv_buf), Ok(body.len()));
3874        assert_eq!(s.poll_client(), Err(Error::Done));
3875        assert_eq!(s.recv_body_server(8, &mut recv_buf), Ok(body.len()));
3876        assert_eq!(s.poll_server(), Ok((8, Event::Finished)));
3877
3878        assert_eq!(s.poll_server(), Err(Error::Done));
3879
3880        let mut resps = Vec::new();
3881
3882        let resp1 = s.send_response(stream1, true).unwrap();
3883        resps.push(resp1);
3884
3885        let resp2 = s.send_response(stream2, true).unwrap();
3886        resps.push(resp2);
3887
3888        let resp3 = s.send_response(stream3, true).unwrap();
3889        resps.push(resp3);
3890
3891        for _ in 0..resps.len() {
3892            let (stream, ev) = s.poll_client().unwrap();
3893            let ev_headers = Event::Headers {
3894                list: resps[(stream / 4) as usize].clone(),
3895                more_frames: false,
3896            };
3897            assert_eq!(ev, ev_headers);
3898            assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3899        }
3900
3901        assert_eq!(s.poll_client(), Err(Error::Done));
3902    }
3903
3904    #[test]
3905    /// Send a request with no body, get a response with one DATA frame and an
3906    /// empty FIN after reception from the client.
3907    fn request_no_body_response_one_chunk_empty_fin() {
3908        let mut s = Session::new().unwrap();
3909        s.handshake().unwrap();
3910
3911        let (stream, req) = s.send_request(true).unwrap();
3912
3913        let ev_headers = Event::Headers {
3914            list: req,
3915            more_frames: false,
3916        };
3917
3918        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3919        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3920
3921        let resp = s.send_response(stream, false).unwrap();
3922
3923        let body = s.send_body_server(stream, false).unwrap();
3924
3925        let mut recv_buf = vec![0; body.len()];
3926
3927        let ev_headers = Event::Headers {
3928            list: resp,
3929            more_frames: true,
3930        };
3931
3932        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3933
3934        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
3935        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
3936
3937        assert_eq!(s.pipe.server.stream_send(stream, &[], true), Ok(0));
3938        s.advance().ok();
3939
3940        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3941        assert_eq!(s.poll_client(), Err(Error::Done));
3942    }
3943
3944    #[test]
3945    /// Send a request with no body, get a response with no body followed by
3946    /// GREASE that is STREAM frame with a FIN.
3947    fn request_no_body_response_no_body_with_grease() {
3948        let mut s = Session::new().unwrap();
3949        s.handshake().unwrap();
3950
3951        let (stream, req) = s.send_request(true).unwrap();
3952
3953        assert_eq!(stream, 0);
3954
3955        let ev_headers = Event::Headers {
3956            list: req,
3957            more_frames: false,
3958        };
3959
3960        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3961        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3962
3963        let resp = s.send_response(stream, false).unwrap();
3964
3965        let ev_headers = Event::Headers {
3966            list: resp,
3967            more_frames: true,
3968        };
3969
3970        // Inject a GREASE frame
3971        let mut d = [42; 10];
3972        let mut b = octets::OctetsMut::with_slice(&mut d);
3973
3974        let frame_type = b.put_varint(148_764_065_110_560_899).unwrap();
3975        s.pipe.server.stream_send(0, frame_type, false).unwrap();
3976
3977        let frame_len = b.put_varint(10).unwrap();
3978        s.pipe.server.stream_send(0, frame_len, false).unwrap();
3979
3980        s.pipe.server.stream_send(0, &d, true).unwrap();
3981
3982        s.advance().ok();
3983
3984        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3985        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3986        assert_eq!(s.poll_client(), Err(Error::Done));
3987    }
3988
3989    #[test]
3990    /// Try to send DATA frames before HEADERS.
3991    fn body_response_before_headers() {
3992        let mut s = Session::new().unwrap();
3993        s.handshake().unwrap();
3994
3995        let (stream, req) = s.send_request(true).unwrap();
3996        assert_eq!(stream, 0);
3997
3998        let ev_headers = Event::Headers {
3999            list: req,
4000            more_frames: false,
4001        };
4002
4003        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4004
4005        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4006
4007        assert_eq!(
4008            s.send_body_server(stream, true),
4009            Err(Error::FrameUnexpected)
4010        );
4011
4012        assert_eq!(s.poll_client(), Err(Error::Done));
4013    }
4014
4015    #[test]
4016    /// Try to send DATA frames on wrong streams, ensure the API returns an
4017    /// error before anything hits the transport layer.
4018    fn send_body_invalid_client_stream() {
4019        let mut s = Session::new().unwrap();
4020        s.handshake().unwrap();
4021
4022        assert_eq!(s.send_body_client(0, true), Err(Error::FrameUnexpected));
4023
4024        assert_eq!(
4025            s.send_body_client(s.client.control_stream_id.unwrap(), true),
4026            Err(Error::FrameUnexpected)
4027        );
4028
4029        assert_eq!(
4030            s.send_body_client(
4031                s.client.local_qpack_streams.encoder_stream_id.unwrap(),
4032                true
4033            ),
4034            Err(Error::FrameUnexpected)
4035        );
4036
4037        assert_eq!(
4038            s.send_body_client(
4039                s.client.local_qpack_streams.decoder_stream_id.unwrap(),
4040                true
4041            ),
4042            Err(Error::FrameUnexpected)
4043        );
4044
4045        assert_eq!(
4046            s.send_body_client(s.client.peer_control_stream_id.unwrap(), true),
4047            Err(Error::FrameUnexpected)
4048        );
4049
4050        assert_eq!(
4051            s.send_body_client(
4052                s.client.peer_qpack_streams.encoder_stream_id.unwrap(),
4053                true
4054            ),
4055            Err(Error::FrameUnexpected)
4056        );
4057
4058        assert_eq!(
4059            s.send_body_client(
4060                s.client.peer_qpack_streams.decoder_stream_id.unwrap(),
4061                true
4062            ),
4063            Err(Error::FrameUnexpected)
4064        );
4065    }
4066
4067    #[test]
4068    /// Try to send DATA frames on wrong streams, ensure the API returns an
4069    /// error before anything hits the transport layer.
4070    fn send_body_invalid_server_stream() {
4071        let mut s = Session::new().unwrap();
4072        s.handshake().unwrap();
4073
4074        assert_eq!(s.send_body_server(0, true), Err(Error::FrameUnexpected));
4075
4076        assert_eq!(
4077            s.send_body_server(s.server.control_stream_id.unwrap(), true),
4078            Err(Error::FrameUnexpected)
4079        );
4080
4081        assert_eq!(
4082            s.send_body_server(
4083                s.server.local_qpack_streams.encoder_stream_id.unwrap(),
4084                true
4085            ),
4086            Err(Error::FrameUnexpected)
4087        );
4088
4089        assert_eq!(
4090            s.send_body_server(
4091                s.server.local_qpack_streams.decoder_stream_id.unwrap(),
4092                true
4093            ),
4094            Err(Error::FrameUnexpected)
4095        );
4096
4097        assert_eq!(
4098            s.send_body_server(s.server.peer_control_stream_id.unwrap(), true),
4099            Err(Error::FrameUnexpected)
4100        );
4101
4102        assert_eq!(
4103            s.send_body_server(
4104                s.server.peer_qpack_streams.encoder_stream_id.unwrap(),
4105                true
4106            ),
4107            Err(Error::FrameUnexpected)
4108        );
4109
4110        assert_eq!(
4111            s.send_body_server(
4112                s.server.peer_qpack_streams.decoder_stream_id.unwrap(),
4113                true
4114            ),
4115            Err(Error::FrameUnexpected)
4116        );
4117    }
4118
4119    #[test]
4120    /// Client sends request with body and trailers.
4121    fn trailers() {
4122        let mut s = Session::new().unwrap();
4123        s.handshake().unwrap();
4124
4125        let (stream, req) = s.send_request(false).unwrap();
4126
4127        let body = s.send_body_client(stream, false).unwrap();
4128
4129        let mut recv_buf = vec![0; body.len()];
4130
4131        let req_trailers = vec![Header::new(b"foo", b"bar")];
4132
4133        s.client
4134            .send_additional_headers(
4135                &mut s.pipe.client,
4136                stream,
4137                &req_trailers,
4138                true,
4139                true,
4140            )
4141            .unwrap();
4142
4143        s.advance().ok();
4144
4145        let ev_headers = Event::Headers {
4146            list: req,
4147            more_frames: true,
4148        };
4149
4150        let ev_trailers = Event::Headers {
4151            list: req_trailers,
4152            more_frames: false,
4153        };
4154
4155        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4156
4157        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
4158        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
4159
4160        assert_eq!(s.poll_server(), Ok((stream, ev_trailers)));
4161        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4162    }
4163
4164    #[test]
4165    /// Server responds with a 103, then a 200 with no body.
4166    fn informational_response() {
4167        let mut s = Session::new().unwrap();
4168        s.handshake().unwrap();
4169
4170        let (stream, req) = s.send_request(true).unwrap();
4171
4172        assert_eq!(stream, 0);
4173
4174        let ev_headers = Event::Headers {
4175            list: req,
4176            more_frames: false,
4177        };
4178
4179        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4180        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4181
4182        let info_resp = vec![
4183            Header::new(b":status", b"103"),
4184            Header::new(b"link", b"<https://example.com>; rel=\"preconnect\""),
4185        ];
4186
4187        let resp = vec![
4188            Header::new(b":status", b"200"),
4189            Header::new(b"server", b"quiche-test"),
4190        ];
4191
4192        s.server
4193            .send_response(&mut s.pipe.server, stream, &info_resp, false)
4194            .unwrap();
4195
4196        s.server
4197            .send_additional_headers(
4198                &mut s.pipe.server,
4199                stream,
4200                &resp,
4201                false,
4202                true,
4203            )
4204            .unwrap();
4205
4206        s.advance().ok();
4207
4208        let ev_info_headers = Event::Headers {
4209            list: info_resp,
4210            more_frames: true,
4211        };
4212
4213        let ev_headers = Event::Headers {
4214            list: resp,
4215            more_frames: false,
4216        };
4217
4218        assert_eq!(s.poll_client(), Ok((stream, ev_info_headers)));
4219        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
4220        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
4221        assert_eq!(s.poll_client(), Err(Error::Done));
4222    }
4223
4224    #[test]
4225    /// Server responds with a 103, then attempts to send a 200 using
4226    /// send_response again, which should fail.
4227    fn no_multiple_response() {
4228        let mut s = Session::new().unwrap();
4229        s.handshake().unwrap();
4230
4231        let (stream, req) = s.send_request(true).unwrap();
4232
4233        assert_eq!(stream, 0);
4234
4235        let ev_headers = Event::Headers {
4236            list: req,
4237            more_frames: false,
4238        };
4239
4240        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4241        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4242
4243        let info_resp = vec![
4244            Header::new(b":status", b"103"),
4245            Header::new(b"link", b"<https://example.com>; rel=\"preconnect\""),
4246        ];
4247
4248        let resp = vec![
4249            Header::new(b":status", b"200"),
4250            Header::new(b"server", b"quiche-test"),
4251        ];
4252
4253        s.server
4254            .send_response(&mut s.pipe.server, stream, &info_resp, false)
4255            .unwrap();
4256
4257        assert_eq!(
4258            Err(Error::FrameUnexpected),
4259            s.server
4260                .send_response(&mut s.pipe.server, stream, &resp, true)
4261        );
4262
4263        s.advance().ok();
4264
4265        let ev_info_headers = Event::Headers {
4266            list: info_resp,
4267            more_frames: true,
4268        };
4269
4270        assert_eq!(s.poll_client(), Ok((stream, ev_info_headers)));
4271        assert_eq!(s.poll_client(), Err(Error::Done));
4272    }
4273
4274    #[test]
4275    /// Server attempts to use send_additional_headers before initial response.
4276    fn no_send_additional_before_initial_response() {
4277        let mut s = Session::new().unwrap();
4278        s.handshake().unwrap();
4279
4280        let (stream, req) = s.send_request(true).unwrap();
4281
4282        assert_eq!(stream, 0);
4283
4284        let ev_headers = Event::Headers {
4285            list: req,
4286            more_frames: false,
4287        };
4288
4289        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4290        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4291
4292        let info_resp = vec![
4293            Header::new(b":status", b"103"),
4294            Header::new(b"link", b"<https://example.com>; rel=\"preconnect\""),
4295        ];
4296
4297        assert_eq!(
4298            Err(Error::FrameUnexpected),
4299            s.server.send_additional_headers(
4300                &mut s.pipe.server,
4301                stream,
4302                &info_resp,
4303                false,
4304                false
4305            )
4306        );
4307
4308        s.advance().ok();
4309
4310        assert_eq!(s.poll_client(), Err(Error::Done));
4311    }
4312
4313    #[test]
4314    /// Client sends multiple HEADERS before data.
4315    fn additional_headers_before_data_client() {
4316        let mut s = Session::new().unwrap();
4317        s.handshake().unwrap();
4318
4319        let (stream, req) = s.send_request(false).unwrap();
4320
4321        let req_trailer = vec![Header::new(b"goodbye", b"world")];
4322
4323        assert_eq!(
4324            s.client.send_additional_headers(
4325                &mut s.pipe.client,
4326                stream,
4327                &req_trailer,
4328                true,
4329                false
4330            ),
4331            Ok(())
4332        );
4333
4334        s.advance().ok();
4335
4336        let ev_initial_headers = Event::Headers {
4337            list: req,
4338            more_frames: true,
4339        };
4340
4341        let ev_trailing_headers = Event::Headers {
4342            list: req_trailer,
4343            more_frames: true,
4344        };
4345
4346        assert_eq!(s.poll_server(), Ok((stream, ev_initial_headers)));
4347        assert_eq!(s.poll_server(), Ok((stream, ev_trailing_headers)));
4348        assert_eq!(s.poll_server(), Err(Error::Done));
4349    }
4350
4351    #[test]
4352    /// Client sends multiple HEADERS before data.
4353    fn data_after_trailers_client() {
4354        let mut s = Session::new().unwrap();
4355        s.handshake().unwrap();
4356
4357        let (stream, req) = s.send_request(false).unwrap();
4358
4359        let body = s.send_body_client(stream, false).unwrap();
4360
4361        let mut recv_buf = vec![0; body.len()];
4362
4363        let req_trailers = vec![Header::new(b"foo", b"bar")];
4364
4365        s.client
4366            .send_additional_headers(
4367                &mut s.pipe.client,
4368                stream,
4369                &req_trailers,
4370                true,
4371                false,
4372            )
4373            .unwrap();
4374
4375        s.advance().ok();
4376
4377        s.send_frame_client(
4378            frame::Frame::Data {
4379                payload: vec![1, 2, 3, 4],
4380            },
4381            stream,
4382            true,
4383        )
4384        .unwrap();
4385
4386        let ev_headers = Event::Headers {
4387            list: req,
4388            more_frames: true,
4389        };
4390
4391        let ev_trailers = Event::Headers {
4392            list: req_trailers,
4393            more_frames: true,
4394        };
4395
4396        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4397        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
4398        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
4399        assert_eq!(s.poll_server(), Ok((stream, ev_trailers)));
4400        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4401    }
4402
4403    #[test]
4404    /// Send a MAX_PUSH_ID frame from the client on a valid stream.
4405    fn max_push_id_from_client_good() {
4406        let mut s = Session::new().unwrap();
4407        s.handshake().unwrap();
4408
4409        s.send_frame_client(
4410            frame::Frame::MaxPushId { push_id: 1 },
4411            s.client.control_stream_id.unwrap(),
4412            false,
4413        )
4414        .unwrap();
4415
4416        assert_eq!(s.poll_server(), Err(Error::Done));
4417    }
4418
4419    #[test]
4420    /// Send a MAX_PUSH_ID frame from the client on an invalid stream.
4421    fn max_push_id_from_client_bad_stream() {
4422        let mut s = Session::new().unwrap();
4423        s.handshake().unwrap();
4424
4425        let (stream, req) = s.send_request(false).unwrap();
4426
4427        s.send_frame_client(
4428            frame::Frame::MaxPushId { push_id: 2 },
4429            stream,
4430            false,
4431        )
4432        .unwrap();
4433
4434        let ev_headers = Event::Headers {
4435            list: req,
4436            more_frames: true,
4437        };
4438
4439        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4440        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4441    }
4442
4443    #[test]
4444    /// Send a sequence of MAX_PUSH_ID frames from the client that attempt to
4445    /// reduce the limit.
4446    fn max_push_id_from_client_limit_reduction() {
4447        let mut s = Session::new().unwrap();
4448        s.handshake().unwrap();
4449
4450        s.send_frame_client(
4451            frame::Frame::MaxPushId { push_id: 2 },
4452            s.client.control_stream_id.unwrap(),
4453            false,
4454        )
4455        .unwrap();
4456
4457        s.send_frame_client(
4458            frame::Frame::MaxPushId { push_id: 1 },
4459            s.client.control_stream_id.unwrap(),
4460            false,
4461        )
4462        .unwrap();
4463
4464        assert_eq!(s.poll_server(), Err(Error::IdError));
4465    }
4466
4467    #[test]
4468    /// Send a MAX_PUSH_ID frame from the server, which is forbidden.
4469    fn max_push_id_from_server() {
4470        let mut s = Session::new().unwrap();
4471        s.handshake().unwrap();
4472
4473        s.send_frame_server(
4474            frame::Frame::MaxPushId { push_id: 1 },
4475            s.server.control_stream_id.unwrap(),
4476            false,
4477        )
4478        .unwrap();
4479
4480        assert_eq!(s.poll_client(), Err(Error::FrameUnexpected));
4481    }
4482
4483    #[test]
4484    /// Send a PUSH_PROMISE frame from the client, which is forbidden.
4485    fn push_promise_from_client() {
4486        let mut s = Session::new().unwrap();
4487        s.handshake().unwrap();
4488
4489        let (stream, req) = s.send_request(false).unwrap();
4490
4491        let header_block = s.client.encode_header_block(&req).unwrap();
4492
4493        s.send_frame_client(
4494            frame::Frame::PushPromise {
4495                push_id: 1,
4496                header_block,
4497            },
4498            stream,
4499            false,
4500        )
4501        .unwrap();
4502
4503        let ev_headers = Event::Headers {
4504            list: req,
4505            more_frames: true,
4506        };
4507
4508        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4509        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4510    }
4511
4512    #[test]
4513    /// Send a CANCEL_PUSH frame from the client.
4514    fn cancel_push_from_client() {
4515        let mut s = Session::new().unwrap();
4516        s.handshake().unwrap();
4517
4518        s.send_frame_client(
4519            frame::Frame::CancelPush { push_id: 1 },
4520            s.client.control_stream_id.unwrap(),
4521            false,
4522        )
4523        .unwrap();
4524
4525        assert_eq!(s.poll_server(), Err(Error::Done));
4526    }
4527
4528    #[test]
4529    /// Send a CANCEL_PUSH frame from the client on an invalid stream.
4530    fn cancel_push_from_client_bad_stream() {
4531        let mut s = Session::new().unwrap();
4532        s.handshake().unwrap();
4533
4534        let (stream, req) = s.send_request(false).unwrap();
4535
4536        s.send_frame_client(
4537            frame::Frame::CancelPush { push_id: 2 },
4538            stream,
4539            false,
4540        )
4541        .unwrap();
4542
4543        let ev_headers = Event::Headers {
4544            list: req,
4545            more_frames: true,
4546        };
4547
4548        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4549        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4550    }
4551
4552    #[test]
4553    /// Send a CANCEL_PUSH frame from the client.
4554    fn cancel_push_from_server() {
4555        let mut s = Session::new().unwrap();
4556        s.handshake().unwrap();
4557
4558        s.send_frame_server(
4559            frame::Frame::CancelPush { push_id: 1 },
4560            s.server.control_stream_id.unwrap(),
4561            false,
4562        )
4563        .unwrap();
4564
4565        assert_eq!(s.poll_client(), Err(Error::Done));
4566    }
4567
4568    #[test]
4569    /// Send a GOAWAY frame from the client.
4570    fn goaway_from_client_good() {
4571        let mut s = Session::new().unwrap();
4572        s.handshake().unwrap();
4573
4574        s.client.send_goaway(&mut s.pipe.client, 100).unwrap();
4575
4576        s.advance().ok();
4577
4578        // TODO: server push
4579        assert_eq!(s.poll_server(), Ok((0, Event::GoAway)));
4580    }
4581
4582    #[test]
4583    /// Send a GOAWAY frame from the server.
4584    fn goaway_from_server_good() {
4585        let mut s = Session::new().unwrap();
4586        s.handshake().unwrap();
4587
4588        s.server.send_goaway(&mut s.pipe.server, 4000).unwrap();
4589
4590        s.advance().ok();
4591
4592        assert_eq!(s.poll_client(), Ok((4000, Event::GoAway)));
4593    }
4594
4595    #[test]
4596    /// A client MUST NOT send a request after it receives GOAWAY.
4597    fn client_request_after_goaway() {
4598        let mut s = Session::new().unwrap();
4599        s.handshake().unwrap();
4600
4601        s.server.send_goaway(&mut s.pipe.server, 4000).unwrap();
4602
4603        s.advance().ok();
4604
4605        assert_eq!(s.poll_client(), Ok((4000, Event::GoAway)));
4606
4607        assert_eq!(s.send_request(true), Err(Error::FrameUnexpected));
4608    }
4609
4610    #[test]
4611    /// Send a GOAWAY frame from the server, using an invalid goaway ID.
4612    fn goaway_from_server_invalid_id() {
4613        let mut s = Session::new().unwrap();
4614        s.handshake().unwrap();
4615
4616        s.send_frame_server(
4617            frame::Frame::GoAway { id: 1 },
4618            s.server.control_stream_id.unwrap(),
4619            false,
4620        )
4621        .unwrap();
4622
4623        assert_eq!(s.poll_client(), Err(Error::IdError));
4624    }
4625
4626    #[test]
4627    /// Send multiple GOAWAY frames from the server, that increase the goaway
4628    /// ID.
4629    fn goaway_from_server_increase_id() {
4630        let mut s = Session::new().unwrap();
4631        s.handshake().unwrap();
4632
4633        s.send_frame_server(
4634            frame::Frame::GoAway { id: 0 },
4635            s.server.control_stream_id.unwrap(),
4636            false,
4637        )
4638        .unwrap();
4639
4640        s.send_frame_server(
4641            frame::Frame::GoAway { id: 4 },
4642            s.server.control_stream_id.unwrap(),
4643            false,
4644        )
4645        .unwrap();
4646
4647        assert_eq!(s.poll_client(), Ok((0, Event::GoAway)));
4648
4649        assert_eq!(s.poll_client(), Err(Error::IdError));
4650    }
4651
4652    #[test]
4653    #[cfg(feature = "sfv")]
4654    fn parse_priority_field_value() {
4655        // Legal dicts
4656        assert_eq!(
4657            Ok(Priority::new(0, false)),
4658            Priority::try_from(b"u=0".as_slice())
4659        );
4660        assert_eq!(
4661            Ok(Priority::new(3, false)),
4662            Priority::try_from(b"u=3".as_slice())
4663        );
4664        assert_eq!(
4665            Ok(Priority::new(7, false)),
4666            Priority::try_from(b"u=7".as_slice())
4667        );
4668
4669        assert_eq!(
4670            Ok(Priority::new(0, true)),
4671            Priority::try_from(b"u=0, i".as_slice())
4672        );
4673        assert_eq!(
4674            Ok(Priority::new(3, true)),
4675            Priority::try_from(b"u=3, i".as_slice())
4676        );
4677        assert_eq!(
4678            Ok(Priority::new(7, true)),
4679            Priority::try_from(b"u=7, i".as_slice())
4680        );
4681
4682        assert_eq!(
4683            Ok(Priority::new(0, true)),
4684            Priority::try_from(b"u=0, i=?1".as_slice())
4685        );
4686        assert_eq!(
4687            Ok(Priority::new(3, true)),
4688            Priority::try_from(b"u=3, i=?1".as_slice())
4689        );
4690        assert_eq!(
4691            Ok(Priority::new(7, true)),
4692            Priority::try_from(b"u=7, i=?1".as_slice())
4693        );
4694
4695        assert_eq!(
4696            Ok(Priority::new(3, false)),
4697            Priority::try_from(b"".as_slice())
4698        );
4699
4700        assert_eq!(
4701            Ok(Priority::new(0, true)),
4702            Priority::try_from(b"u=0;foo, i;bar".as_slice())
4703        );
4704        assert_eq!(
4705            Ok(Priority::new(3, true)),
4706            Priority::try_from(b"u=3;hello, i;world".as_slice())
4707        );
4708        assert_eq!(
4709            Ok(Priority::new(7, true)),
4710            Priority::try_from(b"u=7;croeso, i;gymru".as_slice())
4711        );
4712
4713        assert_eq!(
4714            Ok(Priority::new(0, true)),
4715            Priority::try_from(b"u=0, i, spinaltap=11".as_slice())
4716        );
4717
4718        // Illegal formats
4719        assert_eq!(Err(Error::Done), Priority::try_from(b"0".as_slice()));
4720        assert_eq!(
4721            Ok(Priority::new(7, false)),
4722            Priority::try_from(b"u=-1".as_slice())
4723        );
4724        assert_eq!(Err(Error::Done), Priority::try_from(b"u=0.2".as_slice()));
4725        assert_eq!(
4726            Ok(Priority::new(7, false)),
4727            Priority::try_from(b"u=100".as_slice())
4728        );
4729        assert_eq!(
4730            Err(Error::Done),
4731            Priority::try_from(b"u=3, i=true".as_slice())
4732        );
4733
4734        // Trailing comma in dict is malformed
4735        assert_eq!(Err(Error::Done), Priority::try_from(b"u=7, ".as_slice()));
4736    }
4737
4738    #[test]
4739    /// Send a PRIORITY_UPDATE for request stream from the client.
4740    fn priority_update_request() {
4741        let mut s = Session::new().unwrap();
4742        s.handshake().unwrap();
4743
4744        s.client
4745            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4746                urgency: 3,
4747                incremental: false,
4748            })
4749            .unwrap();
4750        s.advance().ok();
4751
4752        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4753        assert_eq!(s.poll_server(), Err(Error::Done));
4754    }
4755
4756    #[test]
4757    /// Send a PRIORITY_UPDATE for request stream from the client.
4758    fn priority_update_single_stream_rearm() {
4759        let mut s = Session::new().unwrap();
4760        s.handshake().unwrap();
4761
4762        s.client
4763            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4764                urgency: 3,
4765                incremental: false,
4766            })
4767            .unwrap();
4768        s.advance().ok();
4769
4770        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4771        assert_eq!(s.poll_server(), Err(Error::Done));
4772
4773        s.client
4774            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4775                urgency: 5,
4776                incremental: false,
4777            })
4778            .unwrap();
4779        s.advance().ok();
4780
4781        assert_eq!(s.poll_server(), Err(Error::Done));
4782
4783        // There is only one PRIORITY_UPDATE frame to read. Once read, the event
4784        // will rearm ready for more.
4785        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=5".to_vec()));
4786        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4787
4788        s.client
4789            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4790                urgency: 7,
4791                incremental: false,
4792            })
4793            .unwrap();
4794        s.advance().ok();
4795
4796        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4797        assert_eq!(s.poll_server(), Err(Error::Done));
4798
4799        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=7".to_vec()));
4800        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4801    }
4802
4803    #[test]
4804    /// Send multiple PRIORITY_UPDATE frames for different streams from the
4805    /// client across multiple flights of exchange.
4806    fn priority_update_request_multiple_stream_arm_multiple_flights() {
4807        let mut s = Session::new().unwrap();
4808        s.handshake().unwrap();
4809
4810        s.client
4811            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4812                urgency: 3,
4813                incremental: false,
4814            })
4815            .unwrap();
4816        s.advance().ok();
4817
4818        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4819        assert_eq!(s.poll_server(), Err(Error::Done));
4820
4821        s.client
4822            .send_priority_update_for_request(&mut s.pipe.client, 4, &Priority {
4823                urgency: 1,
4824                incremental: false,
4825            })
4826            .unwrap();
4827        s.advance().ok();
4828
4829        assert_eq!(s.poll_server(), Ok((4, Event::PriorityUpdate)));
4830        assert_eq!(s.poll_server(), Err(Error::Done));
4831
4832        s.client
4833            .send_priority_update_for_request(&mut s.pipe.client, 8, &Priority {
4834                urgency: 2,
4835                incremental: false,
4836            })
4837            .unwrap();
4838        s.advance().ok();
4839
4840        assert_eq!(s.poll_server(), Ok((8, Event::PriorityUpdate)));
4841        assert_eq!(s.poll_server(), Err(Error::Done));
4842
4843        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
4844        assert_eq!(s.server.take_last_priority_update(4), Ok(b"u=1".to_vec()));
4845        assert_eq!(s.server.take_last_priority_update(8), Ok(b"u=2".to_vec()));
4846        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4847    }
4848
4849    #[test]
4850    /// Send multiple PRIORITY_UPDATE frames for different streams from the
4851    /// client across a single flight.
4852    fn priority_update_request_multiple_stream_arm_single_flight() {
4853        let mut s = Session::new().unwrap();
4854        s.handshake().unwrap();
4855
4856        let mut d = [42; 65535];
4857
4858        let mut b = octets::OctetsMut::with_slice(&mut d);
4859
4860        let p1 = frame::Frame::PriorityUpdateRequest {
4861            prioritized_element_id: 0,
4862            priority_field_value: b"u=3".to_vec(),
4863        };
4864
4865        let p2 = frame::Frame::PriorityUpdateRequest {
4866            prioritized_element_id: 4,
4867            priority_field_value: b"u=3".to_vec(),
4868        };
4869
4870        let p3 = frame::Frame::PriorityUpdateRequest {
4871            prioritized_element_id: 8,
4872            priority_field_value: b"u=3".to_vec(),
4873        };
4874
4875        p1.to_bytes(&mut b).unwrap();
4876        p2.to_bytes(&mut b).unwrap();
4877        p3.to_bytes(&mut b).unwrap();
4878
4879        let off = b.off();
4880        s.pipe
4881            .client
4882            .stream_send(s.client.control_stream_id.unwrap(), &d[..off], false)
4883            .unwrap();
4884
4885        s.advance().ok();
4886
4887        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4888        assert_eq!(s.poll_server(), Ok((4, Event::PriorityUpdate)));
4889        assert_eq!(s.poll_server(), Ok((8, Event::PriorityUpdate)));
4890        assert_eq!(s.poll_server(), Err(Error::Done));
4891
4892        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
4893        assert_eq!(s.server.take_last_priority_update(4), Ok(b"u=3".to_vec()));
4894        assert_eq!(s.server.take_last_priority_update(8), Ok(b"u=3".to_vec()));
4895
4896        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4897    }
4898
4899    #[test]
4900    /// Send a PRIORITY_UPDATE for a request stream, before and after the stream
4901    /// has been completed.
4902    fn priority_update_request_collected_completed() {
4903        let mut s = Session::new().unwrap();
4904        s.handshake().unwrap();
4905
4906        s.client
4907            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4908                urgency: 3,
4909                incremental: false,
4910            })
4911            .unwrap();
4912        s.advance().ok();
4913
4914        let (stream, req) = s.send_request(true).unwrap();
4915        let ev_headers = Event::Headers {
4916            list: req,
4917            more_frames: false,
4918        };
4919
4920        // Priority event is generated before request headers.
4921        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4922        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4923        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4924        assert_eq!(s.poll_server(), Err(Error::Done));
4925
4926        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
4927        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4928
4929        let resp = s.send_response(stream, true).unwrap();
4930
4931        let ev_headers = Event::Headers {
4932            list: resp,
4933            more_frames: false,
4934        };
4935
4936        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
4937        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
4938        assert_eq!(s.poll_client(), Err(Error::Done));
4939
4940        // Now send a PRIORITY_UPDATE for the completed request stream.
4941        s.client
4942            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4943                urgency: 3,
4944                incremental: false,
4945            })
4946            .unwrap();
4947        s.advance().ok();
4948
4949        // No event generated at server
4950        assert_eq!(s.poll_server(), Err(Error::Done));
4951    }
4952
4953    #[test]
4954    /// Send a PRIORITY_UPDATE for a request stream, before and after the stream
4955    /// has been stopped.
4956    fn priority_update_request_collected_stopped() {
4957        let mut s = Session::new().unwrap();
4958        s.handshake().unwrap();
4959
4960        s.client
4961            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4962                urgency: 3,
4963                incremental: false,
4964            })
4965            .unwrap();
4966        s.advance().ok();
4967
4968        let (stream, req) = s.send_request(false).unwrap();
4969        let ev_headers = Event::Headers {
4970            list: req,
4971            more_frames: true,
4972        };
4973
4974        // Priority event is generated before request headers.
4975        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4976        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4977        assert_eq!(s.poll_server(), Err(Error::Done));
4978
4979        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
4980        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4981
4982        s.pipe
4983            .client
4984            .stream_shutdown(stream, crate::Shutdown::Write, 0x100)
4985            .unwrap();
4986        s.pipe
4987            .client
4988            .stream_shutdown(stream, crate::Shutdown::Read, 0x100)
4989            .unwrap();
4990
4991        s.advance().ok();
4992
4993        assert_eq!(s.poll_server(), Ok((0, Event::Reset(0x100))));
4994        assert_eq!(s.poll_server(), Err(Error::Done));
4995
4996        // Now send a PRIORITY_UPDATE for the closed request stream.
4997        s.client
4998            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4999                urgency: 3,
5000                incremental: false,
5001            })
5002            .unwrap();
5003        s.advance().ok();
5004
5005        // No event generated at server
5006        assert_eq!(s.poll_server(), Err(Error::Done));
5007
5008        assert!(s.pipe.server.streams.is_collected(0));
5009        assert!(s.pipe.client.streams.is_collected(0));
5010    }
5011
5012    #[test]
5013    /// Send a PRIORITY_UPDATE for push stream from the client.
5014    fn priority_update_push() {
5015        let mut s = Session::new().unwrap();
5016        s.handshake().unwrap();
5017
5018        s.send_frame_client(
5019            frame::Frame::PriorityUpdatePush {
5020                prioritized_element_id: 3,
5021                priority_field_value: b"u=3".to_vec(),
5022            },
5023            s.client.control_stream_id.unwrap(),
5024            false,
5025        )
5026        .unwrap();
5027
5028        assert_eq!(s.poll_server(), Err(Error::Done));
5029    }
5030
5031    #[test]
5032    /// Send a PRIORITY_UPDATE for request stream from the client but for an
5033    /// incorrect stream type.
5034    fn priority_update_request_bad_stream() {
5035        let mut s = Session::new().unwrap();
5036        s.handshake().unwrap();
5037
5038        s.send_frame_client(
5039            frame::Frame::PriorityUpdateRequest {
5040                prioritized_element_id: 5,
5041                priority_field_value: b"u=3".to_vec(),
5042            },
5043            s.client.control_stream_id.unwrap(),
5044            false,
5045        )
5046        .unwrap();
5047
5048        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
5049    }
5050
5051    #[test]
5052    /// Send a PRIORITY_UPDATE for push stream from the client but for an
5053    /// incorrect stream type.
5054    fn priority_update_push_bad_stream() {
5055        let mut s = Session::new().unwrap();
5056        s.handshake().unwrap();
5057
5058        s.send_frame_client(
5059            frame::Frame::PriorityUpdatePush {
5060                prioritized_element_id: 5,
5061                priority_field_value: b"u=3".to_vec(),
5062            },
5063            s.client.control_stream_id.unwrap(),
5064            false,
5065        )
5066        .unwrap();
5067
5068        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
5069    }
5070
5071    #[test]
5072    /// Send a PRIORITY_UPDATE for request stream from the server.
5073    fn priority_update_request_from_server() {
5074        let mut s = Session::new().unwrap();
5075        s.handshake().unwrap();
5076
5077        s.send_frame_server(
5078            frame::Frame::PriorityUpdateRequest {
5079                prioritized_element_id: 0,
5080                priority_field_value: b"u=3".to_vec(),
5081            },
5082            s.server.control_stream_id.unwrap(),
5083            false,
5084        )
5085        .unwrap();
5086
5087        assert_eq!(s.poll_client(), Err(Error::FrameUnexpected));
5088    }
5089
5090    #[test]
5091    /// Send a PRIORITY_UPDATE for request stream from the server.
5092    fn priority_update_push_from_server() {
5093        let mut s = Session::new().unwrap();
5094        s.handshake().unwrap();
5095
5096        s.send_frame_server(
5097            frame::Frame::PriorityUpdatePush {
5098                prioritized_element_id: 0,
5099                priority_field_value: b"u=3".to_vec(),
5100            },
5101            s.server.control_stream_id.unwrap(),
5102            false,
5103        )
5104        .unwrap();
5105
5106        assert_eq!(s.poll_client(), Err(Error::FrameUnexpected));
5107    }
5108
5109    #[test]
5110    /// Ensure quiche allocates streams for client and server roles as expected.
5111    fn uni_stream_local_counting() {
5112        let config = Config::new().unwrap();
5113
5114        let h3_cln = Connection::new(&config, false, false).unwrap();
5115        assert_eq!(h3_cln.next_uni_stream_id, 2);
5116
5117        let h3_srv = Connection::new(&config, true, false).unwrap();
5118        assert_eq!(h3_srv.next_uni_stream_id, 3);
5119    }
5120
5121    #[test]
5122    /// Client opens multiple control streams, which is forbidden.
5123    fn open_multiple_control_streams() {
5124        let mut s = Session::new().unwrap();
5125        s.handshake().unwrap();
5126
5127        let stream_id = s.client.next_uni_stream_id;
5128
5129        let mut d = [42; 8];
5130        let mut b = octets::OctetsMut::with_slice(&mut d);
5131
5132        s.pipe
5133            .client
5134            .stream_send(
5135                stream_id,
5136                b.put_varint(stream::HTTP3_CONTROL_STREAM_TYPE_ID).unwrap(),
5137                false,
5138            )
5139            .unwrap();
5140
5141        s.advance().ok();
5142
5143        assert_eq!(s.poll_server(), Err(Error::StreamCreationError));
5144    }
5145
5146    #[test]
5147    /// Client closes the control stream, which is forbidden.
5148    fn close_control_stream_after_type() {
5149        let mut s = Session::new().unwrap();
5150        s.handshake().unwrap();
5151
5152        s.pipe
5153            .client
5154            .stream_send(s.client.control_stream_id.unwrap(), &[], true)
5155            .unwrap();
5156
5157        s.advance().ok();
5158
5159        assert_eq!(
5160            Err(Error::ClosedCriticalStream),
5161            s.server.poll(&mut s.pipe.server)
5162        );
5163        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5164    }
5165
5166    #[test]
5167    /// Client closes the control stream after a frame is sent, which is
5168    /// forbidden.
5169    fn close_control_stream_after_frame() {
5170        let mut s = Session::new().unwrap();
5171        s.handshake().unwrap();
5172
5173        s.send_frame_client(
5174            frame::Frame::MaxPushId { push_id: 1 },
5175            s.client.control_stream_id.unwrap(),
5176            true,
5177        )
5178        .unwrap();
5179
5180        assert_eq!(
5181            Err(Error::ClosedCriticalStream),
5182            s.server.poll(&mut s.pipe.server)
5183        );
5184        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5185    }
5186
5187    #[test]
5188    /// Client resets the control stream, which is forbidden.
5189    fn reset_control_stream_after_type() {
5190        let mut s = Session::new().unwrap();
5191        s.handshake().unwrap();
5192
5193        s.pipe
5194            .client
5195            .stream_shutdown(
5196                s.client.control_stream_id.unwrap(),
5197                crate::Shutdown::Write,
5198                0,
5199            )
5200            .unwrap();
5201
5202        s.advance().ok();
5203
5204        assert_eq!(
5205            Err(Error::ClosedCriticalStream),
5206            s.server.poll(&mut s.pipe.server)
5207        );
5208        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5209    }
5210
5211    #[test]
5212    /// Client resets the control stream after a frame is sent, which is
5213    /// forbidden.
5214    fn reset_control_stream_after_frame() {
5215        let mut s = Session::new().unwrap();
5216        s.handshake().unwrap();
5217
5218        s.send_frame_client(
5219            frame::Frame::MaxPushId { push_id: 1 },
5220            s.client.control_stream_id.unwrap(),
5221            false,
5222        )
5223        .unwrap();
5224
5225        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5226
5227        s.pipe
5228            .client
5229            .stream_shutdown(
5230                s.client.control_stream_id.unwrap(),
5231                crate::Shutdown::Write,
5232                0,
5233            )
5234            .unwrap();
5235
5236        s.advance().ok();
5237
5238        assert_eq!(
5239            Err(Error::ClosedCriticalStream),
5240            s.server.poll(&mut s.pipe.server)
5241        );
5242        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5243    }
5244
5245    #[test]
5246    /// Client closes QPACK stream, which is forbidden.
5247    fn close_qpack_stream_after_type() {
5248        let mut s = Session::new().unwrap();
5249        s.handshake().unwrap();
5250
5251        s.pipe
5252            .client
5253            .stream_send(
5254                s.client.local_qpack_streams.encoder_stream_id.unwrap(),
5255                &[],
5256                true,
5257            )
5258            .unwrap();
5259
5260        s.advance().ok();
5261
5262        assert_eq!(
5263            Err(Error::ClosedCriticalStream),
5264            s.server.poll(&mut s.pipe.server)
5265        );
5266        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5267    }
5268
5269    #[test]
5270    /// Client closes QPACK stream after sending some stuff, which is forbidden.
5271    fn close_qpack_stream_after_data() {
5272        let mut s = Session::new().unwrap();
5273        s.handshake().unwrap();
5274
5275        let stream_id = s.client.local_qpack_streams.encoder_stream_id.unwrap();
5276        let d = [0; 1];
5277
5278        s.pipe.client.stream_send(stream_id, &d, false).unwrap();
5279        s.pipe.client.stream_send(stream_id, &d, true).unwrap();
5280
5281        s.advance().ok();
5282
5283        assert_eq!(
5284            Err(Error::ClosedCriticalStream),
5285            s.server.poll(&mut s.pipe.server)
5286        );
5287        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5288    }
5289
5290    #[test]
5291    /// Client resets QPACK stream, which is forbidden.
5292    fn reset_qpack_stream_after_type() {
5293        let mut s = Session::new().unwrap();
5294        s.handshake().unwrap();
5295
5296        s.pipe
5297            .client
5298            .stream_shutdown(
5299                s.client.local_qpack_streams.encoder_stream_id.unwrap(),
5300                crate::Shutdown::Write,
5301                0,
5302            )
5303            .unwrap();
5304
5305        s.advance().ok();
5306
5307        assert_eq!(
5308            Err(Error::ClosedCriticalStream),
5309            s.server.poll(&mut s.pipe.server)
5310        );
5311        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5312    }
5313
5314    #[test]
5315    /// Client resets QPACK stream after sending some stuff, which is forbidden.
5316    fn reset_qpack_stream_after_data() {
5317        let mut s = Session::new().unwrap();
5318        s.handshake().unwrap();
5319
5320        let stream_id = s.client.local_qpack_streams.encoder_stream_id.unwrap();
5321        let d = [0; 1];
5322
5323        s.pipe.client.stream_send(stream_id, &d, false).unwrap();
5324        s.pipe.client.stream_send(stream_id, &d, false).unwrap();
5325
5326        s.advance().ok();
5327
5328        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5329
5330        s.pipe
5331            .client
5332            .stream_shutdown(stream_id, crate::Shutdown::Write, 0)
5333            .unwrap();
5334
5335        s.advance().ok();
5336
5337        assert_eq!(
5338            Err(Error::ClosedCriticalStream),
5339            s.server.poll(&mut s.pipe.server)
5340        );
5341        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5342    }
5343
5344    #[test]
5345    /// Client sends QPACK data.
5346    fn qpack_data() {
5347        // TODO: QPACK instructions are ignored until dynamic table support is
5348        // added so we just test that the data is safely ignored.
5349        let mut s = Session::new().unwrap();
5350        s.handshake().unwrap();
5351
5352        let e_stream_id = s.client.local_qpack_streams.encoder_stream_id.unwrap();
5353        let d_stream_id = s.client.local_qpack_streams.decoder_stream_id.unwrap();
5354        let d = [0; 20];
5355
5356        s.pipe.client.stream_send(e_stream_id, &d, false).unwrap();
5357        s.advance().ok();
5358
5359        s.pipe.client.stream_send(d_stream_id, &d, false).unwrap();
5360        s.advance().ok();
5361
5362        match s.server.poll(&mut s.pipe.server) {
5363            Ok(_) => panic!(),
5364
5365            Err(Error::Done) => {
5366                assert_eq!(s.server.peer_qpack_streams.encoder_stream_bytes, 20);
5367                assert_eq!(s.server.peer_qpack_streams.decoder_stream_bytes, 20);
5368            },
5369
5370            Err(_) => {
5371                panic!();
5372            },
5373        }
5374
5375        let stats = s.server.stats();
5376        assert_eq!(stats.qpack_encoder_stream_recv_bytes, 20);
5377        assert_eq!(stats.qpack_decoder_stream_recv_bytes, 20);
5378    }
5379
5380    #[test]
5381    /// Tests limits for the stream state buffer maximum size.
5382    fn max_state_buf_size() {
5383        let mut s = Session::new().unwrap();
5384        s.handshake().unwrap();
5385
5386        let req = vec![
5387            Header::new(b":method", b"GET"),
5388            Header::new(b":scheme", b"https"),
5389            Header::new(b":authority", b"quic.tech"),
5390            Header::new(b":path", b"/test"),
5391            Header::new(b"user-agent", b"quiche-test"),
5392        ];
5393
5394        assert_eq!(
5395            s.client.send_request(&mut s.pipe.client, &req, false),
5396            Ok(0)
5397        );
5398
5399        s.advance().ok();
5400
5401        let ev_headers = Event::Headers {
5402            list: req,
5403            more_frames: true,
5404        };
5405
5406        assert_eq!(s.server.poll(&mut s.pipe.server), Ok((0, ev_headers)));
5407
5408        // DATA frames don't consume the state buffer, so can be of any size.
5409        let mut d = [42; 128];
5410        let mut b = octets::OctetsMut::with_slice(&mut d);
5411
5412        let frame_type = b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
5413        s.pipe.client.stream_send(0, frame_type, false).unwrap();
5414
5415        let frame_len = b.put_varint(1 << 24).unwrap();
5416        s.pipe.client.stream_send(0, frame_len, false).unwrap();
5417
5418        s.pipe.client.stream_send(0, &d, false).unwrap();
5419
5420        s.advance().ok();
5421
5422        assert_eq!(s.server.poll(&mut s.pipe.server), Ok((0, Event::Data)));
5423
5424        // GREASE frames consume the state buffer, so need to be limited.
5425        let mut s = Session::new().unwrap();
5426        s.handshake().unwrap();
5427
5428        let mut d = [42; 128];
5429        let mut b = octets::OctetsMut::with_slice(&mut d);
5430
5431        let frame_type = b.put_varint(148_764_065_110_560_899).unwrap();
5432        s.pipe.client.stream_send(0, frame_type, false).unwrap();
5433
5434        let frame_len = b.put_varint(1 << 24).unwrap();
5435        s.pipe.client.stream_send(0, frame_len, false).unwrap();
5436
5437        s.pipe.client.stream_send(0, &d, false).unwrap();
5438
5439        s.advance().ok();
5440
5441        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::ExcessiveLoad));
5442    }
5443
5444    #[test]
5445    /// Tests that DATA frames are properly truncated depending on the request
5446    /// stream's outgoing flow control capacity.
5447    fn stream_backpressure() {
5448        let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
5449
5450        let mut s = Session::new().unwrap();
5451        s.handshake().unwrap();
5452
5453        let (stream, req) = s.send_request(false).unwrap();
5454
5455        let total_data_frames = 6;
5456
5457        for _ in 0..total_data_frames {
5458            assert_eq!(
5459                s.client
5460                    .send_body(&mut s.pipe.client, stream, &bytes, false),
5461                Ok(bytes.len())
5462            );
5463
5464            s.advance().ok();
5465        }
5466
5467        assert_eq!(
5468            s.client.send_body(&mut s.pipe.client, stream, &bytes, true),
5469            Ok(bytes.len() - 2)
5470        );
5471
5472        s.advance().ok();
5473
5474        let mut recv_buf = vec![0; bytes.len()];
5475
5476        let ev_headers = Event::Headers {
5477            list: req,
5478            more_frames: true,
5479        };
5480
5481        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5482        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
5483        assert_eq!(s.poll_server(), Err(Error::Done));
5484
5485        for _ in 0..total_data_frames {
5486            assert_eq!(
5487                s.recv_body_server(stream, &mut recv_buf),
5488                Ok(bytes.len())
5489            );
5490        }
5491
5492        assert_eq!(
5493            s.recv_body_server(stream, &mut recv_buf),
5494            Ok(bytes.len() - 2)
5495        );
5496
5497        // Fin flag from last send_body() call was not sent as the buffer was
5498        // only partially written.
5499        assert_eq!(s.poll_server(), Err(Error::Done));
5500
5501        assert_eq!(s.pipe.server.data_blocked_sent_count, 0);
5502        assert_eq!(s.pipe.server.stream_data_blocked_sent_count, 0);
5503        assert_eq!(s.pipe.server.data_blocked_recv_count, 0);
5504        assert_eq!(s.pipe.server.stream_data_blocked_recv_count, 1);
5505
5506        assert_eq!(s.pipe.client.data_blocked_sent_count, 0);
5507        assert_eq!(s.pipe.client.stream_data_blocked_sent_count, 1);
5508        assert_eq!(s.pipe.client.data_blocked_recv_count, 0);
5509        assert_eq!(s.pipe.client.stream_data_blocked_recv_count, 0);
5510    }
5511
5512    #[test]
5513    /// Tests that the max header list size setting is enforced.
5514    fn request_max_header_size_limit() {
5515        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5516        config
5517            .load_cert_chain_from_pem_file("examples/cert.crt")
5518            .unwrap();
5519        config
5520            .load_priv_key_from_pem_file("examples/cert.key")
5521            .unwrap();
5522        config.set_application_protos(&[b"h3"]).unwrap();
5523        config.set_initial_max_data(1500);
5524        config.set_initial_max_stream_data_bidi_local(150);
5525        config.set_initial_max_stream_data_bidi_remote(150);
5526        config.set_initial_max_stream_data_uni(150);
5527        config.set_initial_max_streams_bidi(5);
5528        config.set_initial_max_streams_uni(5);
5529        config.verify_peer(false);
5530
5531        let mut h3_config = Config::new().unwrap();
5532        h3_config.set_max_field_section_size(65);
5533
5534        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5535
5536        s.handshake().unwrap();
5537
5538        let req = vec![
5539            Header::new(b":method", b"GET"),
5540            Header::new(b":scheme", b"https"),
5541            Header::new(b":authority", b"quic.tech"),
5542            Header::new(b":path", b"/test"),
5543            Header::new(b"aaaaaaa", b"aaaaaaaa"),
5544        ];
5545
5546        let stream = s
5547            .client
5548            .send_request(&mut s.pipe.client, &req, true)
5549            .unwrap();
5550
5551        s.advance().ok();
5552
5553        assert_eq!(stream, 0);
5554
5555        assert_eq!(s.poll_server(), Err(Error::ExcessiveLoad));
5556
5557        assert_eq!(
5558            s.pipe.server.local_error.as_ref().unwrap().error_code,
5559            Error::to_wire(Error::ExcessiveLoad)
5560        );
5561    }
5562
5563    #[test]
5564    /// Tests that Error::TransportError contains a transport error.
5565    fn transport_error() {
5566        let mut s = Session::new().unwrap();
5567        s.handshake().unwrap();
5568
5569        let req = vec![
5570            Header::new(b":method", b"GET"),
5571            Header::new(b":scheme", b"https"),
5572            Header::new(b":authority", b"quic.tech"),
5573            Header::new(b":path", b"/test"),
5574            Header::new(b"user-agent", b"quiche-test"),
5575        ];
5576
5577        // We need to open all streams in the same flight, so we can't use the
5578        // Session::send_request() method because it also calls advance(),
5579        // otherwise the server would send a MAX_STREAMS frame and the client
5580        // wouldn't hit the streams limit.
5581        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(0));
5582        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(4));
5583        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(8));
5584        assert_eq!(
5585            s.client.send_request(&mut s.pipe.client, &req, true),
5586            Ok(12)
5587        );
5588        assert_eq!(
5589            s.client.send_request(&mut s.pipe.client, &req, true),
5590            Ok(16)
5591        );
5592
5593        assert_eq!(
5594            s.client.send_request(&mut s.pipe.client, &req, true),
5595            Err(Error::TransportError(crate::Error::StreamLimit))
5596        );
5597    }
5598
5599    #[test]
5600    /// Tests that sending DATA before HEADERS causes an error.
5601    fn data_before_headers() {
5602        let mut s = Session::new().unwrap();
5603        s.handshake().unwrap();
5604
5605        let mut d = [42; 128];
5606        let mut b = octets::OctetsMut::with_slice(&mut d);
5607
5608        let frame_type = b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
5609        s.pipe.client.stream_send(0, frame_type, false).unwrap();
5610
5611        let frame_len = b.put_varint(5).unwrap();
5612        s.pipe.client.stream_send(0, frame_len, false).unwrap();
5613
5614        s.pipe.client.stream_send(0, b"hello", false).unwrap();
5615
5616        s.advance().ok();
5617
5618        assert_eq!(
5619            s.server.poll(&mut s.pipe.server),
5620            Err(Error::FrameUnexpected)
5621        );
5622    }
5623
5624    #[test]
5625    /// Tests that calling poll() after an error occurred does nothing.
5626    fn poll_after_error() {
5627        let mut s = Session::new().unwrap();
5628        s.handshake().unwrap();
5629
5630        let mut d = [42; 128];
5631        let mut b = octets::OctetsMut::with_slice(&mut d);
5632
5633        let frame_type = b.put_varint(148_764_065_110_560_899).unwrap();
5634        s.pipe.client.stream_send(0, frame_type, false).unwrap();
5635
5636        let frame_len = b.put_varint(1 << 24).unwrap();
5637        s.pipe.client.stream_send(0, frame_len, false).unwrap();
5638
5639        s.pipe.client.stream_send(0, &d, false).unwrap();
5640
5641        s.advance().ok();
5642
5643        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::ExcessiveLoad));
5644
5645        // Try to call poll() again after an error occurred.
5646        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::Done));
5647    }
5648
5649    #[test]
5650    /// Tests that we limit sending HEADERS based on the stream capacity.
5651    fn headers_blocked() {
5652        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5653        config
5654            .load_cert_chain_from_pem_file("examples/cert.crt")
5655            .unwrap();
5656        config
5657            .load_priv_key_from_pem_file("examples/cert.key")
5658            .unwrap();
5659        config.set_application_protos(&[b"h3"]).unwrap();
5660        config.set_initial_max_data(70);
5661        config.set_initial_max_stream_data_bidi_local(150);
5662        config.set_initial_max_stream_data_bidi_remote(150);
5663        config.set_initial_max_stream_data_uni(150);
5664        config.set_initial_max_streams_bidi(100);
5665        config.set_initial_max_streams_uni(5);
5666        config.verify_peer(false);
5667
5668        let h3_config = Config::new().unwrap();
5669
5670        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5671
5672        s.handshake().unwrap();
5673
5674        let req = vec![
5675            Header::new(b":method", b"GET"),
5676            Header::new(b":scheme", b"https"),
5677            Header::new(b":authority", b"quic.tech"),
5678            Header::new(b":path", b"/test"),
5679        ];
5680
5681        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(0));
5682
5683        assert_eq!(
5684            s.client.send_request(&mut s.pipe.client, &req, true),
5685            Err(Error::StreamBlocked)
5686        );
5687
5688        // Clear the writable stream queue.
5689        assert_eq!(s.pipe.client.stream_writable_next(), Some(2));
5690        assert_eq!(s.pipe.client.stream_writable_next(), Some(6));
5691        assert_eq!(s.pipe.client.stream_writable_next(), Some(10));
5692        assert_eq!(s.pipe.client.stream_writable_next(), None);
5693
5694        s.advance().ok();
5695
5696        // Once the server gives flow control credits back, we can send the
5697        // request.
5698        assert_eq!(s.pipe.client.stream_writable_next(), Some(4));
5699        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(4));
5700
5701        assert_eq!(s.pipe.server.data_blocked_sent_count, 0);
5702        assert_eq!(s.pipe.server.stream_data_blocked_sent_count, 0);
5703        assert_eq!(s.pipe.server.data_blocked_recv_count, 1);
5704        assert_eq!(s.pipe.server.stream_data_blocked_recv_count, 0);
5705
5706        assert_eq!(s.pipe.client.data_blocked_sent_count, 1);
5707        assert_eq!(s.pipe.client.stream_data_blocked_sent_count, 0);
5708        assert_eq!(s.pipe.client.data_blocked_recv_count, 0);
5709        assert_eq!(s.pipe.client.stream_data_blocked_recv_count, 0);
5710    }
5711
5712    #[test]
5713    /// Ensure StreamBlocked when connection flow control prevents headers.
5714    fn headers_blocked_on_conn() {
5715        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5716        config
5717            .load_cert_chain_from_pem_file("examples/cert.crt")
5718            .unwrap();
5719        config
5720            .load_priv_key_from_pem_file("examples/cert.key")
5721            .unwrap();
5722        config.set_application_protos(&[b"h3"]).unwrap();
5723        config.set_initial_max_data(70);
5724        config.set_initial_max_stream_data_bidi_local(150);
5725        config.set_initial_max_stream_data_bidi_remote(150);
5726        config.set_initial_max_stream_data_uni(150);
5727        config.set_initial_max_streams_bidi(100);
5728        config.set_initial_max_streams_uni(5);
5729        config.verify_peer(false);
5730
5731        let h3_config = Config::new().unwrap();
5732
5733        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5734
5735        s.handshake().unwrap();
5736
5737        // After the HTTP handshake, some bytes of connection flow control have
5738        // been consumed. Fill the connection with more grease data on the control
5739        // stream.
5740        let d = [42; 28];
5741        assert_eq!(s.pipe.client.stream_send(2, &d, false), Ok(23));
5742
5743        let req = vec![
5744            Header::new(b":method", b"GET"),
5745            Header::new(b":scheme", b"https"),
5746            Header::new(b":authority", b"quic.tech"),
5747            Header::new(b":path", b"/test"),
5748        ];
5749
5750        // There is 0 connection-level flow control, so sending a request is
5751        // blocked.
5752        assert_eq!(
5753            s.client.send_request(&mut s.pipe.client, &req, true),
5754            Err(Error::StreamBlocked)
5755        );
5756        assert_eq!(s.pipe.client.stream_writable_next(), None);
5757
5758        // Emit the control stream data and drain it at the server via poll() to
5759        // consumes it via poll() and gives back flow control.
5760        s.advance().ok();
5761        assert_eq!(s.poll_server(), Err(Error::Done));
5762        s.advance().ok();
5763
5764        // Now we can send the request.
5765        assert_eq!(s.pipe.client.stream_writable_next(), Some(2));
5766        assert_eq!(s.pipe.client.stream_writable_next(), Some(6));
5767        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(0));
5768
5769        assert_eq!(s.pipe.server.data_blocked_sent_count, 0);
5770        assert_eq!(s.pipe.server.stream_data_blocked_sent_count, 0);
5771        assert_eq!(s.pipe.server.data_blocked_recv_count, 1);
5772        assert_eq!(s.pipe.server.stream_data_blocked_recv_count, 0);
5773
5774        assert_eq!(s.pipe.client.data_blocked_sent_count, 1);
5775        assert_eq!(s.pipe.client.stream_data_blocked_sent_count, 0);
5776        assert_eq!(s.pipe.client.data_blocked_recv_count, 0);
5777        assert_eq!(s.pipe.client.stream_data_blocked_recv_count, 0);
5778    }
5779
5780    #[test]
5781    /// Ensure STREAM_DATA_BLOCKED is not emitted multiple times with the same
5782    /// offset when trying to send large bodies.
5783    fn send_body_truncation_stream_blocked() {
5784        use crate::test_utils::decode_pkt;
5785
5786        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5787        config
5788            .load_cert_chain_from_pem_file("examples/cert.crt")
5789            .unwrap();
5790        config
5791            .load_priv_key_from_pem_file("examples/cert.key")
5792            .unwrap();
5793        config.set_application_protos(&[b"h3"]).unwrap();
5794        config.set_initial_max_data(10000); // large connection-level flow control
5795        config.set_initial_max_stream_data_bidi_local(80);
5796        config.set_initial_max_stream_data_bidi_remote(80);
5797        config.set_initial_max_stream_data_uni(150);
5798        config.set_initial_max_streams_bidi(100);
5799        config.set_initial_max_streams_uni(5);
5800        config.verify_peer(false);
5801
5802        let h3_config = Config::new().unwrap();
5803
5804        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5805
5806        s.handshake().unwrap();
5807
5808        let (stream, req) = s.send_request(true).unwrap();
5809
5810        let ev_headers = Event::Headers {
5811            list: req,
5812            more_frames: false,
5813        };
5814
5815        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5816        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
5817
5818        let _ = s.send_response(stream, false).unwrap();
5819
5820        assert_eq!(s.pipe.server.streams.blocked().len(), 0);
5821
5822        // The body must be larger than the stream window would allow
5823        let d = [42; 500];
5824        let mut off = 0;
5825
5826        let sent = s
5827            .server
5828            .send_body(&mut s.pipe.server, stream, &d, true)
5829            .unwrap();
5830        assert_eq!(sent, 25);
5831        off += sent;
5832
5833        // send_body wrote as much as it could (sent < size of buff).
5834        assert_eq!(s.pipe.server.streams.blocked().len(), 1);
5835        assert_eq!(
5836            s.server
5837                .send_body(&mut s.pipe.server, stream, &d[off..], true),
5838            Err(Error::Done)
5839        );
5840        assert_eq!(s.pipe.server.streams.blocked().len(), 1);
5841
5842        // Now read raw frames to see what the QUIC layer did
5843        let mut buf = [0; 65535];
5844        let (len, _) = s.pipe.server.send(&mut buf).unwrap();
5845
5846        let frames = decode_pkt(&mut s.pipe.client, &mut buf[..len]).unwrap();
5847
5848        let mut iter = frames.iter();
5849
5850        assert_eq!(
5851            iter.next(),
5852            Some(&crate::frame::Frame::StreamDataBlocked {
5853                stream_id: 0,
5854                limit: 80,
5855            })
5856        );
5857
5858        // At the server, after sending the STREAM_DATA_BLOCKED frame, we clear
5859        // the mark.
5860        assert_eq!(s.pipe.server.streams.blocked().len(), 0);
5861
5862        // Don't read any data from the client, so stream flow control is never
5863        // given back in the form of changing the stream's max offset.
5864        // Subsequent body send operations will still fail but no more
5865        // STREAM_DATA_BLOCKED frames should be submitted since the limit didn't
5866        // change. No frames means no packet to send.
5867        assert_eq!(
5868            s.server
5869                .send_body(&mut s.pipe.server, stream, &d[off..], true),
5870            Err(Error::Done)
5871        );
5872        assert_eq!(s.pipe.server.streams.blocked().len(), 0);
5873        assert_eq!(s.pipe.server.send(&mut buf), Err(crate::Error::Done));
5874
5875        // Now update the client's max offset manually.
5876        let frames = [crate::frame::Frame::MaxStreamData {
5877            stream_id: 0,
5878            max: 100,
5879        }];
5880
5881        let pkt_type = crate::packet::Type::Short;
5882        assert_eq!(
5883            s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
5884            Ok(39),
5885        );
5886
5887        let sent = s
5888            .server
5889            .send_body(&mut s.pipe.server, stream, &d[off..], true)
5890            .unwrap();
5891        assert_eq!(sent, 18);
5892
5893        // Same thing here...
5894        assert_eq!(s.pipe.server.streams.blocked().len(), 1);
5895        assert_eq!(
5896            s.server
5897                .send_body(&mut s.pipe.server, stream, &d[off..], true),
5898            Err(Error::Done)
5899        );
5900        assert_eq!(s.pipe.server.streams.blocked().len(), 1);
5901
5902        let (len, _) = s.pipe.server.send(&mut buf).unwrap();
5903
5904        let frames = decode_pkt(&mut s.pipe.client, &mut buf[..len]).unwrap();
5905
5906        let mut iter = frames.iter();
5907
5908        assert_eq!(
5909            iter.next(),
5910            Some(&crate::frame::Frame::StreamDataBlocked {
5911                stream_id: 0,
5912                limit: 100,
5913            })
5914        );
5915    }
5916
5917    #[test]
5918    /// Ensure stream doesn't hang due to small cwnd.
5919    fn send_body_stream_blocked_by_small_cwnd() {
5920        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5921        config
5922            .load_cert_chain_from_pem_file("examples/cert.crt")
5923            .unwrap();
5924        config
5925            .load_priv_key_from_pem_file("examples/cert.key")
5926            .unwrap();
5927        config.set_application_protos(&[b"h3"]).unwrap();
5928        config.set_initial_max_data(100000); // large connection-level flow control
5929        config.set_initial_max_stream_data_bidi_local(100000);
5930        config.set_initial_max_stream_data_bidi_remote(50000);
5931        config.set_initial_max_stream_data_uni(150);
5932        config.set_initial_max_streams_bidi(100);
5933        config.set_initial_max_streams_uni(5);
5934        config.verify_peer(false);
5935
5936        let h3_config = Config::new().unwrap();
5937
5938        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5939
5940        s.handshake().unwrap();
5941
5942        let (stream, req) = s.send_request(true).unwrap();
5943
5944        let ev_headers = Event::Headers {
5945            list: req,
5946            more_frames: false,
5947        };
5948
5949        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5950        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
5951
5952        let _ = s.send_response(stream, false).unwrap();
5953
5954        // Clear the writable stream queue.
5955        assert_eq!(s.pipe.server.stream_writable_next(), Some(3));
5956        assert_eq!(s.pipe.server.stream_writable_next(), Some(7));
5957        assert_eq!(s.pipe.server.stream_writable_next(), Some(11));
5958        assert_eq!(s.pipe.server.stream_writable_next(), Some(stream));
5959        assert_eq!(s.pipe.server.stream_writable_next(), None);
5960
5961        // The body must be larger than the cwnd would allow.
5962        let send_buf = [42; 80000];
5963
5964        let sent = s
5965            .server
5966            .send_body(&mut s.pipe.server, stream, &send_buf, true)
5967            .unwrap();
5968
5969        // send_body wrote as much as it could (sent < size of buff).
5970        assert_eq!(sent, 11995);
5971
5972        s.advance().ok();
5973
5974        // Client reads received headers and body.
5975        let mut recv_buf = [42; 80000];
5976        assert!(s.poll_client().is_ok());
5977        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
5978        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(11995));
5979
5980        s.advance().ok();
5981
5982        // Server send cap is smaller than remaining body buffer.
5983        assert!(s.pipe.server.tx_cap < send_buf.len() - sent);
5984
5985        // Once the server cwnd opens up, we can send more body.
5986        assert_eq!(s.pipe.server.stream_writable_next(), Some(0));
5987    }
5988
5989    #[test]
5990    /// Ensure stream doesn't hang due to small cwnd.
5991    fn send_body_stream_blocked_zero_length() {
5992        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5993        config
5994            .load_cert_chain_from_pem_file("examples/cert.crt")
5995            .unwrap();
5996        config
5997            .load_priv_key_from_pem_file("examples/cert.key")
5998            .unwrap();
5999        config.set_application_protos(&[b"h3"]).unwrap();
6000        config.set_initial_max_data(100000); // large connection-level flow control
6001        config.set_initial_max_stream_data_bidi_local(100000);
6002        config.set_initial_max_stream_data_bidi_remote(50000);
6003        config.set_initial_max_stream_data_uni(150);
6004        config.set_initial_max_streams_bidi(100);
6005        config.set_initial_max_streams_uni(5);
6006        config.verify_peer(false);
6007
6008        let h3_config = Config::new().unwrap();
6009
6010        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6011
6012        s.handshake().unwrap();
6013
6014        let (stream, req) = s.send_request(true).unwrap();
6015
6016        let ev_headers = Event::Headers {
6017            list: req,
6018            more_frames: false,
6019        };
6020
6021        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6022        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6023
6024        let _ = s.send_response(stream, false).unwrap();
6025
6026        // Clear the writable stream queue.
6027        assert_eq!(s.pipe.server.stream_writable_next(), Some(3));
6028        assert_eq!(s.pipe.server.stream_writable_next(), Some(7));
6029        assert_eq!(s.pipe.server.stream_writable_next(), Some(11));
6030        assert_eq!(s.pipe.server.stream_writable_next(), Some(stream));
6031        assert_eq!(s.pipe.server.stream_writable_next(), None);
6032
6033        // The body is large enough to fill the cwnd, except for enough bytes
6034        // for another DATA frame header (but no payload).
6035        let send_buf = [42; 11994];
6036
6037        let sent = s
6038            .server
6039            .send_body(&mut s.pipe.server, stream, &send_buf, false)
6040            .unwrap();
6041
6042        assert_eq!(sent, 11994);
6043
6044        // There is only enough capacity left for the DATA frame header, but
6045        // no payload.
6046        assert_eq!(s.pipe.server.stream_capacity(stream).unwrap(), 3);
6047        assert_eq!(
6048            s.server
6049                .send_body(&mut s.pipe.server, stream, &send_buf, false),
6050            Err(Error::Done)
6051        );
6052
6053        s.advance().ok();
6054
6055        // Client reads received headers and body.
6056        let mut recv_buf = [42; 80000];
6057        assert!(s.poll_client().is_ok());
6058        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
6059        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(11994));
6060
6061        s.advance().ok();
6062
6063        // Once the server cwnd opens up, we can send more body.
6064        assert_eq!(s.pipe.server.stream_writable_next(), Some(0));
6065    }
6066
6067    #[test]
6068    /// Test handling of 0-length DATA writes with and without fin.
6069    fn zero_length_data() {
6070        let mut s = Session::new().unwrap();
6071        s.handshake().unwrap();
6072
6073        let (stream, req) = s.send_request(false).unwrap();
6074
6075        assert_eq!(
6076            s.client.send_body(&mut s.pipe.client, 0, b"", false),
6077            Err(Error::Done)
6078        );
6079        assert_eq!(s.client.send_body(&mut s.pipe.client, 0, b"", true), Ok(0));
6080
6081        s.advance().ok();
6082
6083        let mut recv_buf = vec![0; 100];
6084
6085        let ev_headers = Event::Headers {
6086            list: req,
6087            more_frames: true,
6088        };
6089
6090        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6091
6092        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6093        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Err(Error::Done));
6094
6095        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6096        assert_eq!(s.poll_server(), Err(Error::Done));
6097
6098        let resp = s.send_response(stream, false).unwrap();
6099
6100        assert_eq!(
6101            s.server.send_body(&mut s.pipe.server, 0, b"", false),
6102            Err(Error::Done)
6103        );
6104        assert_eq!(s.server.send_body(&mut s.pipe.server, 0, b"", true), Ok(0));
6105
6106        s.advance().ok();
6107
6108        let ev_headers = Event::Headers {
6109            list: resp,
6110            more_frames: true,
6111        };
6112
6113        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
6114
6115        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
6116        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Err(Error::Done));
6117
6118        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
6119        assert_eq!(s.poll_client(), Err(Error::Done));
6120    }
6121
6122    #[test]
6123    /// Tests that blocked 0-length DATA writes are reported correctly.
6124    fn zero_length_data_blocked() {
6125        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6126        config
6127            .load_cert_chain_from_pem_file("examples/cert.crt")
6128            .unwrap();
6129        config
6130            .load_priv_key_from_pem_file("examples/cert.key")
6131            .unwrap();
6132        config.set_application_protos(&[b"h3"]).unwrap();
6133        config.set_initial_max_data(69);
6134        config.set_initial_max_stream_data_bidi_local(150);
6135        config.set_initial_max_stream_data_bidi_remote(150);
6136        config.set_initial_max_stream_data_uni(150);
6137        config.set_initial_max_streams_bidi(100);
6138        config.set_initial_max_streams_uni(5);
6139        config.verify_peer(false);
6140
6141        let h3_config = Config::new().unwrap();
6142
6143        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6144
6145        s.handshake().unwrap();
6146
6147        let req = vec![
6148            Header::new(b":method", b"GET"),
6149            Header::new(b":scheme", b"https"),
6150            Header::new(b":authority", b"quic.tech"),
6151            Header::new(b":path", b"/test"),
6152        ];
6153
6154        assert_eq!(
6155            s.client.send_request(&mut s.pipe.client, &req, false),
6156            Ok(0)
6157        );
6158
6159        assert_eq!(
6160            s.client.send_body(&mut s.pipe.client, 0, b"", true),
6161            Err(Error::Done)
6162        );
6163
6164        // Clear the writable stream queue.
6165        assert_eq!(s.pipe.client.stream_writable_next(), Some(2));
6166        assert_eq!(s.pipe.client.stream_writable_next(), Some(6));
6167        assert_eq!(s.pipe.client.stream_writable_next(), Some(10));
6168        assert_eq!(s.pipe.client.stream_writable_next(), None);
6169
6170        s.advance().ok();
6171
6172        // Once the server gives flow control credits back, we can send the body.
6173        assert_eq!(s.pipe.client.stream_writable_next(), Some(0));
6174        assert_eq!(s.client.send_body(&mut s.pipe.client, 0, b"", true), Ok(0));
6175    }
6176
6177    #[test]
6178    /// Tests that receiving an empty SETTINGS frame is handled and reported.
6179    fn empty_settings() {
6180        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6181        config
6182            .load_cert_chain_from_pem_file("examples/cert.crt")
6183            .unwrap();
6184        config
6185            .load_priv_key_from_pem_file("examples/cert.key")
6186            .unwrap();
6187        config.set_application_protos(&[b"h3"]).unwrap();
6188        config.set_initial_max_data(1500);
6189        config.set_initial_max_stream_data_bidi_local(150);
6190        config.set_initial_max_stream_data_bidi_remote(150);
6191        config.set_initial_max_stream_data_uni(150);
6192        config.set_initial_max_streams_bidi(5);
6193        config.set_initial_max_streams_uni(5);
6194        config.verify_peer(false);
6195        config.set_ack_delay_exponent(8);
6196        config.grease(false);
6197
6198        let h3_config = Config::new().unwrap();
6199        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6200
6201        s.handshake().unwrap();
6202
6203        assert!(s.client.peer_settings_raw().is_some());
6204        assert!(s.server.peer_settings_raw().is_some());
6205    }
6206
6207    #[test]
6208    /// Tests that receiving a H3_DATAGRAM setting is ok.
6209    fn dgram_setting() {
6210        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6211        config
6212            .load_cert_chain_from_pem_file("examples/cert.crt")
6213            .unwrap();
6214        config
6215            .load_priv_key_from_pem_file("examples/cert.key")
6216            .unwrap();
6217        config.set_application_protos(&[b"h3"]).unwrap();
6218        config.set_initial_max_data(70);
6219        config.set_initial_max_stream_data_bidi_local(150);
6220        config.set_initial_max_stream_data_bidi_remote(150);
6221        config.set_initial_max_stream_data_uni(150);
6222        config.set_initial_max_streams_bidi(100);
6223        config.set_initial_max_streams_uni(5);
6224        config.enable_dgram(true, 1000, 1000);
6225        config.verify_peer(false);
6226
6227        let h3_config = Config::new().unwrap();
6228
6229        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6230        assert_eq!(s.pipe.handshake(), Ok(()));
6231
6232        s.client.send_settings(&mut s.pipe.client).unwrap();
6233        assert_eq!(s.pipe.advance(), Ok(()));
6234
6235        // Before processing SETTINGS (via poll), HTTP/3 DATAGRAMS are not
6236        // enabled.
6237        assert!(!s.server.dgram_enabled_by_peer(&s.pipe.server));
6238
6239        // When everything is ok, poll returns Done and DATAGRAM is enabled.
6240        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::Done));
6241        assert!(s.server.dgram_enabled_by_peer(&s.pipe.server));
6242
6243        // Now detect things on the client
6244        s.server.send_settings(&mut s.pipe.server).unwrap();
6245        assert_eq!(s.pipe.advance(), Ok(()));
6246        assert!(!s.client.dgram_enabled_by_peer(&s.pipe.client));
6247        assert_eq!(s.client.poll(&mut s.pipe.client), Err(Error::Done));
6248        assert!(s.client.dgram_enabled_by_peer(&s.pipe.client));
6249    }
6250
6251    #[test]
6252    /// Tests that receiving a H3_DATAGRAM setting when no TP is set generates
6253    /// an error.
6254    fn dgram_setting_no_tp() {
6255        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6256        config
6257            .load_cert_chain_from_pem_file("examples/cert.crt")
6258            .unwrap();
6259        config
6260            .load_priv_key_from_pem_file("examples/cert.key")
6261            .unwrap();
6262        config.set_application_protos(&[b"h3"]).unwrap();
6263        config.set_initial_max_data(70);
6264        config.set_initial_max_stream_data_bidi_local(150);
6265        config.set_initial_max_stream_data_bidi_remote(150);
6266        config.set_initial_max_stream_data_uni(150);
6267        config.set_initial_max_streams_bidi(100);
6268        config.set_initial_max_streams_uni(5);
6269        config.verify_peer(false);
6270
6271        let h3_config = Config::new().unwrap();
6272
6273        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6274        assert_eq!(s.pipe.handshake(), Ok(()));
6275
6276        s.client.control_stream_id = Some(
6277            s.client
6278                .open_uni_stream(
6279                    &mut s.pipe.client,
6280                    stream::HTTP3_CONTROL_STREAM_TYPE_ID,
6281                )
6282                .unwrap(),
6283        );
6284
6285        let settings = frame::Frame::Settings {
6286            max_field_section_size: None,
6287            qpack_max_table_capacity: None,
6288            qpack_blocked_streams: None,
6289            connect_protocol_enabled: None,
6290            h3_datagram: Some(1),
6291            grease: None,
6292            additional_settings: Default::default(),
6293            raw: Default::default(),
6294        };
6295
6296        s.send_frame_client(settings, s.client.control_stream_id.unwrap(), false)
6297            .unwrap();
6298
6299        assert_eq!(s.pipe.advance(), Ok(()));
6300
6301        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::SettingsError));
6302    }
6303
6304    #[test]
6305    /// Tests that receiving SETTINGS with prohibited values generates an error.
6306    fn settings_h2_prohibited() {
6307        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6308        config
6309            .load_cert_chain_from_pem_file("examples/cert.crt")
6310            .unwrap();
6311        config
6312            .load_priv_key_from_pem_file("examples/cert.key")
6313            .unwrap();
6314        config.set_application_protos(&[b"h3"]).unwrap();
6315        config.set_initial_max_data(70);
6316        config.set_initial_max_stream_data_bidi_local(150);
6317        config.set_initial_max_stream_data_bidi_remote(150);
6318        config.set_initial_max_stream_data_uni(150);
6319        config.set_initial_max_streams_bidi(100);
6320        config.set_initial_max_streams_uni(5);
6321        config.verify_peer(false);
6322
6323        let h3_config = Config::new().unwrap();
6324
6325        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6326        assert_eq!(s.pipe.handshake(), Ok(()));
6327
6328        s.client.control_stream_id = Some(
6329            s.client
6330                .open_uni_stream(
6331                    &mut s.pipe.client,
6332                    stream::HTTP3_CONTROL_STREAM_TYPE_ID,
6333                )
6334                .unwrap(),
6335        );
6336
6337        s.server.control_stream_id = Some(
6338            s.server
6339                .open_uni_stream(
6340                    &mut s.pipe.server,
6341                    stream::HTTP3_CONTROL_STREAM_TYPE_ID,
6342                )
6343                .unwrap(),
6344        );
6345
6346        let frame_payload_len = 2u64;
6347        let settings = [
6348            frame::SETTINGS_FRAME_TYPE_ID as u8,
6349            frame_payload_len as u8,
6350            0x2, // 0x2 is a reserved setting type
6351            1,
6352        ];
6353
6354        s.send_arbitrary_stream_data_client(
6355            &settings,
6356            s.client.control_stream_id.unwrap(),
6357            false,
6358        )
6359        .unwrap();
6360
6361        s.send_arbitrary_stream_data_server(
6362            &settings,
6363            s.server.control_stream_id.unwrap(),
6364            false,
6365        )
6366        .unwrap();
6367
6368        assert_eq!(s.pipe.advance(), Ok(()));
6369
6370        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::SettingsError));
6371
6372        assert_eq!(s.client.poll(&mut s.pipe.client), Err(Error::SettingsError));
6373    }
6374
6375    #[test]
6376    /// Tests that setting SETTINGS with prohibited values generates an error.
6377    fn set_prohibited_additional_settings() {
6378        let mut h3_config = Config::new().unwrap();
6379        assert_eq!(
6380            h3_config.set_additional_settings(vec![(
6381                frame::SETTINGS_QPACK_MAX_TABLE_CAPACITY,
6382                43
6383            )]),
6384            Err(Error::SettingsError)
6385        );
6386        assert_eq!(
6387            h3_config.set_additional_settings(vec![(
6388                frame::SETTINGS_MAX_FIELD_SECTION_SIZE,
6389                43
6390            )]),
6391            Err(Error::SettingsError)
6392        );
6393        assert_eq!(
6394            h3_config.set_additional_settings(vec![(
6395                frame::SETTINGS_QPACK_BLOCKED_STREAMS,
6396                43
6397            )]),
6398            Err(Error::SettingsError)
6399        );
6400        assert_eq!(
6401            h3_config.set_additional_settings(vec![(
6402                frame::SETTINGS_ENABLE_CONNECT_PROTOCOL,
6403                43
6404            )]),
6405            Err(Error::SettingsError)
6406        );
6407        assert_eq!(
6408            h3_config
6409                .set_additional_settings(vec![(frame::SETTINGS_H3_DATAGRAM, 43)]),
6410            Err(Error::SettingsError)
6411        );
6412    }
6413
6414    #[test]
6415    /// Tests additional settings are actually exchanged by the peers.
6416    fn set_additional_settings() {
6417        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6418        config
6419            .load_cert_chain_from_pem_file("examples/cert.crt")
6420            .unwrap();
6421        config
6422            .load_priv_key_from_pem_file("examples/cert.key")
6423            .unwrap();
6424        config.set_application_protos(&[b"h3"]).unwrap();
6425        config.set_initial_max_data(70);
6426        config.set_initial_max_stream_data_bidi_local(150);
6427        config.set_initial_max_stream_data_bidi_remote(150);
6428        config.set_initial_max_stream_data_uni(150);
6429        config.set_initial_max_streams_bidi(100);
6430        config.set_initial_max_streams_uni(5);
6431        config.verify_peer(false);
6432        config.grease(false);
6433
6434        let mut h3_config = Config::new().unwrap();
6435        h3_config
6436            .set_additional_settings(vec![(42, 43), (44, 45)])
6437            .unwrap();
6438
6439        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6440        assert_eq!(s.pipe.handshake(), Ok(()));
6441
6442        assert_eq!(s.pipe.advance(), Ok(()));
6443
6444        s.client.send_settings(&mut s.pipe.client).unwrap();
6445        assert_eq!(s.pipe.advance(), Ok(()));
6446        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::Done));
6447
6448        s.server.send_settings(&mut s.pipe.server).unwrap();
6449        assert_eq!(s.pipe.advance(), Ok(()));
6450        assert_eq!(s.client.poll(&mut s.pipe.client), Err(Error::Done));
6451
6452        assert_eq!(
6453            s.server.peer_settings_raw(),
6454            Some(&[(42, 43), (44, 45)][..])
6455        );
6456        assert_eq!(
6457            s.client.peer_settings_raw(),
6458            Some(&[(42, 43), (44, 45)][..])
6459        );
6460    }
6461
6462    #[test]
6463    /// Send a single DATAGRAM.
6464    fn single_dgram() {
6465        let mut buf = [0; 65535];
6466        let mut s = Session::new().unwrap();
6467        s.handshake().unwrap();
6468
6469        // We'll send default data of 10 bytes on flow ID 0.
6470        let result = (11, 0, 1);
6471
6472        s.send_dgram_client(0).unwrap();
6473
6474        assert_eq!(s.poll_server(), Err(Error::Done));
6475        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6476
6477        s.send_dgram_server(0).unwrap();
6478        assert_eq!(s.poll_client(), Err(Error::Done));
6479        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6480    }
6481
6482    #[test]
6483    /// Send multiple DATAGRAMs.
6484    fn multiple_dgram() {
6485        let mut buf = [0; 65535];
6486        let mut s = Session::new().unwrap();
6487        s.handshake().unwrap();
6488
6489        // We'll send default data of 10 bytes on flow ID 0.
6490        let result = (11, 0, 1);
6491
6492        s.send_dgram_client(0).unwrap();
6493        s.send_dgram_client(0).unwrap();
6494        s.send_dgram_client(0).unwrap();
6495
6496        assert_eq!(s.poll_server(), Err(Error::Done));
6497        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6498        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6499        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6500        assert_eq!(s.recv_dgram_server(&mut buf), Err(Error::Done));
6501
6502        s.send_dgram_server(0).unwrap();
6503        s.send_dgram_server(0).unwrap();
6504        s.send_dgram_server(0).unwrap();
6505
6506        assert_eq!(s.poll_client(), Err(Error::Done));
6507        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6508        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6509        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6510        assert_eq!(s.recv_dgram_client(&mut buf), Err(Error::Done));
6511    }
6512
6513    #[test]
6514    /// Send more DATAGRAMs than the send queue allows.
6515    fn multiple_dgram_overflow() {
6516        let mut buf = [0; 65535];
6517        let mut s = Session::new().unwrap();
6518        s.handshake().unwrap();
6519
6520        // We'll send default data of 10 bytes on flow ID 0.
6521        let result = (11, 0, 1);
6522
6523        // Five DATAGRAMs
6524        s.send_dgram_client(0).unwrap();
6525        s.send_dgram_client(0).unwrap();
6526        s.send_dgram_client(0).unwrap();
6527        s.send_dgram_client(0).unwrap();
6528        s.send_dgram_client(0).unwrap();
6529
6530        // Only 3 independent DATAGRAMs to read events will fire.
6531        assert_eq!(s.poll_server(), Err(Error::Done));
6532        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6533        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6534        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6535        assert_eq!(s.recv_dgram_server(&mut buf), Err(Error::Done));
6536    }
6537
6538    #[test]
6539    /// Send a single DATAGRAM and request.
6540    fn poll_datagram_cycling_no_read() {
6541        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6542        config
6543            .load_cert_chain_from_pem_file("examples/cert.crt")
6544            .unwrap();
6545        config
6546            .load_priv_key_from_pem_file("examples/cert.key")
6547            .unwrap();
6548        config.set_application_protos(&[b"h3"]).unwrap();
6549        config.set_initial_max_data(1500);
6550        config.set_initial_max_stream_data_bidi_local(150);
6551        config.set_initial_max_stream_data_bidi_remote(150);
6552        config.set_initial_max_stream_data_uni(150);
6553        config.set_initial_max_streams_bidi(100);
6554        config.set_initial_max_streams_uni(5);
6555        config.verify_peer(false);
6556        config.enable_dgram(true, 100, 100);
6557
6558        let h3_config = Config::new().unwrap();
6559        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6560        s.handshake().unwrap();
6561
6562        // Send request followed by DATAGRAM on client side.
6563        let (stream, req) = s.send_request(false).unwrap();
6564
6565        s.send_body_client(stream, true).unwrap();
6566
6567        let ev_headers = Event::Headers {
6568            list: req,
6569            more_frames: true,
6570        };
6571
6572        s.send_dgram_client(0).unwrap();
6573
6574        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6575        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6576
6577        assert_eq!(s.poll_server(), Err(Error::Done));
6578    }
6579
6580    #[test]
6581    /// Send a single DATAGRAM and request.
6582    fn poll_datagram_single_read() {
6583        let mut buf = [0; 65535];
6584
6585        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6586        config
6587            .load_cert_chain_from_pem_file("examples/cert.crt")
6588            .unwrap();
6589        config
6590            .load_priv_key_from_pem_file("examples/cert.key")
6591            .unwrap();
6592        config.set_application_protos(&[b"h3"]).unwrap();
6593        config.set_initial_max_data(1500);
6594        config.set_initial_max_stream_data_bidi_local(150);
6595        config.set_initial_max_stream_data_bidi_remote(150);
6596        config.set_initial_max_stream_data_uni(150);
6597        config.set_initial_max_streams_bidi(100);
6598        config.set_initial_max_streams_uni(5);
6599        config.verify_peer(false);
6600        config.enable_dgram(true, 100, 100);
6601
6602        let h3_config = Config::new().unwrap();
6603        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6604        s.handshake().unwrap();
6605
6606        // We'll send default data of 10 bytes on flow ID 0.
6607        let result = (11, 0, 1);
6608
6609        // Send request followed by DATAGRAM on client side.
6610        let (stream, req) = s.send_request(false).unwrap();
6611
6612        let body = s.send_body_client(stream, true).unwrap();
6613
6614        let mut recv_buf = vec![0; body.len()];
6615
6616        let ev_headers = Event::Headers {
6617            list: req,
6618            more_frames: true,
6619        };
6620
6621        s.send_dgram_client(0).unwrap();
6622
6623        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6624        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6625
6626        assert_eq!(s.poll_server(), Err(Error::Done));
6627
6628        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6629
6630        assert_eq!(s.poll_server(), Err(Error::Done));
6631
6632        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
6633        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6634        assert_eq!(s.poll_server(), Err(Error::Done));
6635
6636        // Send response followed by DATAGRAM on server side
6637        let resp = s.send_response(stream, false).unwrap();
6638
6639        let body = s.send_body_server(stream, true).unwrap();
6640
6641        let mut recv_buf = vec![0; body.len()];
6642
6643        let ev_headers = Event::Headers {
6644            list: resp,
6645            more_frames: true,
6646        };
6647
6648        s.send_dgram_server(0).unwrap();
6649
6650        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
6651        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
6652
6653        assert_eq!(s.poll_client(), Err(Error::Done));
6654
6655        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6656
6657        assert_eq!(s.poll_client(), Err(Error::Done));
6658
6659        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
6660
6661        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
6662        assert_eq!(s.poll_client(), Err(Error::Done));
6663    }
6664
6665    #[test]
6666    /// Send multiple DATAGRAMs and requests.
6667    fn poll_datagram_multi_read() {
6668        let mut buf = [0; 65535];
6669
6670        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6671        config
6672            .load_cert_chain_from_pem_file("examples/cert.crt")
6673            .unwrap();
6674        config
6675            .load_priv_key_from_pem_file("examples/cert.key")
6676            .unwrap();
6677        config.set_application_protos(&[b"h3"]).unwrap();
6678        config.set_initial_max_data(1500);
6679        config.set_initial_max_stream_data_bidi_local(150);
6680        config.set_initial_max_stream_data_bidi_remote(150);
6681        config.set_initial_max_stream_data_uni(150);
6682        config.set_initial_max_streams_bidi(100);
6683        config.set_initial_max_streams_uni(5);
6684        config.verify_peer(false);
6685        config.enable_dgram(true, 100, 100);
6686
6687        let h3_config = Config::new().unwrap();
6688        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6689        s.handshake().unwrap();
6690
6691        // 10 bytes on flow ID 0 and 2.
6692        let flow_0_result = (11, 0, 1);
6693        let flow_2_result = (11, 2, 1);
6694
6695        // Send requests followed by DATAGRAMs on client side.
6696        let (stream, req) = s.send_request(false).unwrap();
6697
6698        let body = s.send_body_client(stream, true).unwrap();
6699
6700        let mut recv_buf = vec![0; body.len()];
6701
6702        let ev_headers = Event::Headers {
6703            list: req,
6704            more_frames: true,
6705        };
6706
6707        s.send_dgram_client(0).unwrap();
6708        s.send_dgram_client(0).unwrap();
6709        s.send_dgram_client(0).unwrap();
6710        s.send_dgram_client(0).unwrap();
6711        s.send_dgram_client(0).unwrap();
6712        s.send_dgram_client(2).unwrap();
6713        s.send_dgram_client(2).unwrap();
6714        s.send_dgram_client(2).unwrap();
6715        s.send_dgram_client(2).unwrap();
6716        s.send_dgram_client(2).unwrap();
6717
6718        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6719        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6720
6721        assert_eq!(s.poll_server(), Err(Error::Done));
6722
6723        // Second cycle, start to read
6724        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6725        assert_eq!(s.poll_server(), Err(Error::Done));
6726        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6727        assert_eq!(s.poll_server(), Err(Error::Done));
6728        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6729        assert_eq!(s.poll_server(), Err(Error::Done));
6730
6731        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
6732        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6733
6734        assert_eq!(s.poll_server(), Err(Error::Done));
6735
6736        // Third cycle.
6737        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6738        assert_eq!(s.poll_server(), Err(Error::Done));
6739        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6740        assert_eq!(s.poll_server(), Err(Error::Done));
6741        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6742        assert_eq!(s.poll_server(), Err(Error::Done));
6743        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6744        assert_eq!(s.poll_server(), Err(Error::Done));
6745        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6746        assert_eq!(s.poll_server(), Err(Error::Done));
6747        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6748        assert_eq!(s.poll_server(), Err(Error::Done));
6749        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6750        assert_eq!(s.poll_server(), Err(Error::Done));
6751
6752        // Send response followed by DATAGRAM on server side
6753        let resp = s.send_response(stream, false).unwrap();
6754
6755        let body = s.send_body_server(stream, true).unwrap();
6756
6757        let mut recv_buf = vec![0; body.len()];
6758
6759        let ev_headers = Event::Headers {
6760            list: resp,
6761            more_frames: true,
6762        };
6763
6764        s.send_dgram_server(0).unwrap();
6765        s.send_dgram_server(0).unwrap();
6766        s.send_dgram_server(0).unwrap();
6767        s.send_dgram_server(0).unwrap();
6768        s.send_dgram_server(0).unwrap();
6769        s.send_dgram_server(2).unwrap();
6770        s.send_dgram_server(2).unwrap();
6771        s.send_dgram_server(2).unwrap();
6772        s.send_dgram_server(2).unwrap();
6773        s.send_dgram_server(2).unwrap();
6774
6775        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
6776        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
6777
6778        assert_eq!(s.poll_client(), Err(Error::Done));
6779
6780        // Second cycle, start to read
6781        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
6782        assert_eq!(s.poll_client(), Err(Error::Done));
6783        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
6784        assert_eq!(s.poll_client(), Err(Error::Done));
6785        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
6786        assert_eq!(s.poll_client(), Err(Error::Done));
6787
6788        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
6789        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
6790
6791        assert_eq!(s.poll_client(), Err(Error::Done));
6792
6793        // Third cycle.
6794        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
6795        assert_eq!(s.poll_client(), Err(Error::Done));
6796        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
6797        assert_eq!(s.poll_client(), Err(Error::Done));
6798        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
6799        assert_eq!(s.poll_client(), Err(Error::Done));
6800        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
6801        assert_eq!(s.poll_client(), Err(Error::Done));
6802        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
6803        assert_eq!(s.poll_client(), Err(Error::Done));
6804        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
6805        assert_eq!(s.poll_client(), Err(Error::Done));
6806        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
6807        assert_eq!(s.poll_client(), Err(Error::Done));
6808    }
6809
6810    #[test]
6811    /// Tests that the Finished event is not issued for streams of unknown type
6812    /// (e.g. GREASE).
6813    fn finished_is_for_requests() {
6814        let mut s = Session::new().unwrap();
6815        s.handshake().unwrap();
6816
6817        assert_eq!(s.poll_client(), Err(Error::Done));
6818        assert_eq!(s.poll_server(), Err(Error::Done));
6819
6820        assert_eq!(s.client.open_grease_stream(&mut s.pipe.client), Ok(()));
6821        assert_eq!(s.pipe.advance(), Ok(()));
6822
6823        assert_eq!(s.poll_client(), Err(Error::Done));
6824        assert_eq!(s.poll_server(), Err(Error::Done));
6825    }
6826
6827    #[test]
6828    /// Tests that streams are marked as finished only once.
6829    fn finished_once() {
6830        let mut s = Session::new().unwrap();
6831        s.handshake().unwrap();
6832
6833        let (stream, req) = s.send_request(false).unwrap();
6834        let body = s.send_body_client(stream, true).unwrap();
6835
6836        let mut recv_buf = vec![0; body.len()];
6837
6838        let ev_headers = Event::Headers {
6839            list: req,
6840            more_frames: true,
6841        };
6842
6843        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6844        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6845
6846        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
6847        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6848
6849        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Err(Error::Done));
6850        assert_eq!(s.poll_server(), Err(Error::Done));
6851    }
6852
6853    #[test]
6854    /// Tests that the Data event is properly re-armed.
6855    fn data_event_rearm() {
6856        let bytes = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
6857
6858        let mut s = Session::new().unwrap();
6859        s.handshake().unwrap();
6860
6861        let (r1_id, r1_hdrs) = s.send_request(false).unwrap();
6862
6863        let mut recv_buf = vec![0; bytes.len()];
6864
6865        let r1_ev_headers = Event::Headers {
6866            list: r1_hdrs,
6867            more_frames: true,
6868        };
6869
6870        // Manually send an incomplete DATA frame (i.e. the frame size is longer
6871        // than the actual data sent).
6872        {
6873            let mut d = [42; 10];
6874            let mut b = octets::OctetsMut::with_slice(&mut d);
6875
6876            b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
6877            b.put_varint(bytes.len() as u64).unwrap();
6878            let off = b.off();
6879            s.pipe.client.stream_send(r1_id, &d[..off], false).unwrap();
6880
6881            assert_eq!(
6882                s.pipe.client.stream_send(r1_id, &bytes[..5], false),
6883                Ok(5)
6884            );
6885
6886            s.advance().ok();
6887        }
6888
6889        assert_eq!(s.poll_server(), Ok((r1_id, r1_ev_headers)));
6890        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
6891        assert_eq!(s.poll_server(), Err(Error::Done));
6892
6893        // Read the available body data.
6894        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(5));
6895
6896        // Send the remaining DATA payload.
6897        assert_eq!(s.pipe.client.stream_send(r1_id, &bytes[5..], false), Ok(5));
6898        s.advance().ok();
6899
6900        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
6901        assert_eq!(s.poll_server(), Err(Error::Done));
6902
6903        // Read the rest of the body data.
6904        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(5));
6905        assert_eq!(s.poll_server(), Err(Error::Done));
6906
6907        // Send more data.
6908        let r1_body = s.send_body_client(r1_id, false).unwrap();
6909
6910        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
6911        assert_eq!(s.poll_server(), Err(Error::Done));
6912
6913        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(r1_body.len()));
6914
6915        // Send a new request to ensure cross-stream events don't break rearming.
6916        let (r2_id, r2_hdrs) = s.send_request(false).unwrap();
6917        let r2_ev_headers = Event::Headers {
6918            list: r2_hdrs,
6919            more_frames: true,
6920        };
6921        let r2_body = s.send_body_client(r2_id, false).unwrap();
6922
6923        s.advance().ok();
6924
6925        assert_eq!(s.poll_server(), Ok((r2_id, r2_ev_headers)));
6926        assert_eq!(s.poll_server(), Ok((r2_id, Event::Data)));
6927        assert_eq!(s.recv_body_server(r2_id, &mut recv_buf), Ok(r2_body.len()));
6928        assert_eq!(s.poll_server(), Err(Error::Done));
6929
6930        // Send more data on request 1, then trailing HEADERS.
6931        let r1_body = s.send_body_client(r1_id, false).unwrap();
6932
6933        let trailers = vec![Header::new(b"hello", b"world")];
6934
6935        s.client
6936            .send_headers(&mut s.pipe.client, r1_id, &trailers, true)
6937            .unwrap();
6938
6939        let r1_ev_trailers = Event::Headers {
6940            list: trailers.clone(),
6941            more_frames: false,
6942        };
6943
6944        s.advance().ok();
6945
6946        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
6947        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(r1_body.len()));
6948
6949        assert_eq!(s.poll_server(), Ok((r1_id, r1_ev_trailers)));
6950        assert_eq!(s.poll_server(), Ok((r1_id, Event::Finished)));
6951        assert_eq!(s.poll_server(), Err(Error::Done));
6952
6953        // Send more data on request 2, then trailing HEADERS.
6954        let r2_body = s.send_body_client(r2_id, false).unwrap();
6955
6956        s.client
6957            .send_headers(&mut s.pipe.client, r2_id, &trailers, false)
6958            .unwrap();
6959
6960        let r2_ev_trailers = Event::Headers {
6961            list: trailers,
6962            more_frames: true,
6963        };
6964
6965        s.advance().ok();
6966
6967        assert_eq!(s.poll_server(), Ok((r2_id, Event::Data)));
6968        assert_eq!(s.recv_body_server(r2_id, &mut recv_buf), Ok(r2_body.len()));
6969        assert_eq!(s.poll_server(), Ok((r2_id, r2_ev_trailers)));
6970        assert_eq!(s.poll_server(), Err(Error::Done));
6971
6972        let (r3_id, r3_hdrs) = s.send_request(false).unwrap();
6973
6974        let r3_ev_headers = Event::Headers {
6975            list: r3_hdrs,
6976            more_frames: true,
6977        };
6978
6979        // Manually send an incomplete DATA frame (i.e. only the header is sent).
6980        {
6981            let mut d = [42; 10];
6982            let mut b = octets::OctetsMut::with_slice(&mut d);
6983
6984            b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
6985            b.put_varint(bytes.len() as u64).unwrap();
6986            let off = b.off();
6987            s.pipe.client.stream_send(r3_id, &d[..off], false).unwrap();
6988
6989            s.advance().ok();
6990        }
6991
6992        assert_eq!(s.poll_server(), Ok((r3_id, r3_ev_headers)));
6993        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
6994        assert_eq!(s.poll_server(), Err(Error::Done));
6995
6996        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Err(Error::Done));
6997
6998        assert_eq!(s.pipe.client.stream_send(r3_id, &bytes[..5], false), Ok(5));
6999
7000        s.advance().ok();
7001
7002        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
7003        assert_eq!(s.poll_server(), Err(Error::Done));
7004
7005        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Ok(5));
7006
7007        assert_eq!(s.pipe.client.stream_send(r3_id, &bytes[5..], false), Ok(5));
7008        s.advance().ok();
7009
7010        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
7011        assert_eq!(s.poll_server(), Err(Error::Done));
7012
7013        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Ok(5));
7014
7015        // Buffer multiple data frames.
7016        let body = s.send_body_client(r3_id, false).unwrap();
7017        s.send_body_client(r3_id, false).unwrap();
7018        s.send_body_client(r3_id, false).unwrap();
7019
7020        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
7021        assert_eq!(s.poll_server(), Err(Error::Done));
7022
7023        {
7024            let mut d = [42; 10];
7025            let mut b = octets::OctetsMut::with_slice(&mut d);
7026
7027            b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
7028            b.put_varint(0).unwrap();
7029            let off = b.off();
7030            s.pipe.client.stream_send(r3_id, &d[..off], true).unwrap();
7031
7032            s.advance().ok();
7033        }
7034
7035        let mut recv_buf = vec![0; bytes.len() * 3];
7036
7037        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Ok(body.len() * 3));
7038    }
7039
7040    #[test]
7041    /// Tests that the Datagram event is properly re-armed.
7042    fn dgram_event_rearm() {
7043        let mut buf = [0; 65535];
7044
7045        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
7046        config
7047            .load_cert_chain_from_pem_file("examples/cert.crt")
7048            .unwrap();
7049        config
7050            .load_priv_key_from_pem_file("examples/cert.key")
7051            .unwrap();
7052        config.set_application_protos(&[b"h3"]).unwrap();
7053        config.set_initial_max_data(1500);
7054        config.set_initial_max_stream_data_bidi_local(150);
7055        config.set_initial_max_stream_data_bidi_remote(150);
7056        config.set_initial_max_stream_data_uni(150);
7057        config.set_initial_max_streams_bidi(100);
7058        config.set_initial_max_streams_uni(5);
7059        config.verify_peer(false);
7060        config.enable_dgram(true, 100, 100);
7061
7062        let h3_config = Config::new().unwrap();
7063        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
7064        s.handshake().unwrap();
7065
7066        // 10 bytes on flow ID 0 and 2.
7067        let flow_0_result = (11, 0, 1);
7068        let flow_2_result = (11, 2, 1);
7069
7070        // Send requests followed by DATAGRAMs on client side.
7071        let (stream, req) = s.send_request(false).unwrap();
7072
7073        let body = s.send_body_client(stream, true).unwrap();
7074
7075        let mut recv_buf = vec![0; body.len()];
7076
7077        let ev_headers = Event::Headers {
7078            list: req,
7079            more_frames: true,
7080        };
7081
7082        s.send_dgram_client(0).unwrap();
7083        s.send_dgram_client(0).unwrap();
7084        s.send_dgram_client(2).unwrap();
7085        s.send_dgram_client(2).unwrap();
7086
7087        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7088        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
7089
7090        assert_eq!(s.poll_server(), Err(Error::Done));
7091        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
7092
7093        assert_eq!(s.poll_server(), Err(Error::Done));
7094        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
7095
7096        assert_eq!(s.poll_server(), Err(Error::Done));
7097        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
7098
7099        assert_eq!(s.poll_server(), Err(Error::Done));
7100        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
7101
7102        assert_eq!(s.poll_server(), Err(Error::Done));
7103
7104        s.send_dgram_client(0).unwrap();
7105        s.send_dgram_client(2).unwrap();
7106
7107        assert_eq!(s.poll_server(), Err(Error::Done));
7108
7109        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
7110        assert_eq!(s.poll_server(), Err(Error::Done));
7111
7112        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
7113        assert_eq!(s.poll_server(), Err(Error::Done));
7114
7115        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
7116        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
7117
7118        // Verify that dgram counts are incremented.
7119        assert_eq!(s.pipe.client.dgram_sent_count, 6);
7120        assert_eq!(s.pipe.client.dgram_recv_count, 0);
7121        assert_eq!(s.pipe.server.dgram_sent_count, 0);
7122        assert_eq!(s.pipe.server.dgram_recv_count, 6);
7123
7124        let server_path = s.pipe.server.paths.get_active().expect("no active");
7125        let client_path = s.pipe.client.paths.get_active().expect("no active");
7126        assert_eq!(client_path.dgram_sent_count, 6);
7127        assert_eq!(client_path.dgram_recv_count, 0);
7128        assert_eq!(server_path.dgram_sent_count, 0);
7129        assert_eq!(server_path.dgram_recv_count, 6);
7130    }
7131
7132    #[test]
7133    fn reset_stream() {
7134        let mut buf = [0; 65535];
7135
7136        let mut s = Session::new().unwrap();
7137        s.handshake().unwrap();
7138
7139        // Client sends request.
7140        let (stream, req) = s.send_request(false).unwrap();
7141
7142        let ev_headers = Event::Headers {
7143            list: req,
7144            more_frames: true,
7145        };
7146
7147        // Server sends response and closes stream.
7148        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7149        assert_eq!(s.poll_server(), Err(Error::Done));
7150
7151        let resp = s.send_response(stream, true).unwrap();
7152
7153        let ev_headers = Event::Headers {
7154            list: resp,
7155            more_frames: false,
7156        };
7157
7158        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
7159        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
7160        assert_eq!(s.poll_client(), Err(Error::Done));
7161
7162        // Client sends RESET_STREAM, closing stream.
7163        let frames = [crate::frame::Frame::ResetStream {
7164            stream_id: stream,
7165            error_code: 42,
7166            final_size: 68,
7167        }];
7168
7169        let pkt_type = crate::packet::Type::Short;
7170        assert_eq!(
7171            s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
7172            Ok(39)
7173        );
7174
7175        // Server issues Reset event for the stream.
7176        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(42))));
7177        assert_eq!(s.poll_server(), Err(Error::Done));
7178
7179        // Sending RESET_STREAM again shouldn't trigger another Reset event.
7180        assert_eq!(
7181            s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
7182            Ok(39)
7183        );
7184
7185        assert_eq!(s.poll_server(), Err(Error::Done));
7186    }
7187
7188    /// The client shuts down the stream's write direction, the server
7189    /// shuts down its side with fin
7190    #[test]
7191    fn client_shutdown_write_server_fin() {
7192        let mut buf = [0; 65535];
7193        let mut s = Session::new().unwrap();
7194        s.handshake().unwrap();
7195
7196        // Client sends request.
7197        let (stream, req) = s.send_request(false).unwrap();
7198
7199        let ev_headers = Event::Headers {
7200            list: req,
7201            more_frames: true,
7202        };
7203
7204        // Server sends response and closes stream.
7205        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7206        assert_eq!(s.poll_server(), Err(Error::Done));
7207
7208        let resp = s.send_response(stream, true).unwrap();
7209
7210        let ev_headers = Event::Headers {
7211            list: resp,
7212            more_frames: false,
7213        };
7214
7215        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
7216        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
7217        assert_eq!(s.poll_client(), Err(Error::Done));
7218
7219        // Client shuts down stream ==> sends RESET_STREAM
7220        assert_eq!(
7221            s.pipe
7222                .client
7223                .stream_shutdown(stream, crate::Shutdown::Write, 42),
7224            Ok(())
7225        );
7226        assert_eq!(s.advance(), Ok(()));
7227
7228        // Server sees the Reset event for the stream.
7229        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(42))));
7230        assert_eq!(s.poll_server(), Err(Error::Done));
7231
7232        // Streams have been collected by quiche
7233        assert!(s.pipe.server.streams.is_collected(stream));
7234        assert!(s.pipe.client.streams.is_collected(stream));
7235
7236        // Client sends another request, server sends response without fin
7237        //
7238        let (stream, req) = s.send_request(false).unwrap();
7239
7240        let ev_headers = Event::Headers {
7241            list: req,
7242            more_frames: true,
7243        };
7244
7245        // Check that server has received the request.
7246        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7247        assert_eq!(s.poll_server(), Err(Error::Done));
7248
7249        // Server sends reponse without closing the stream.
7250        let resp = s.send_response(stream, false).unwrap();
7251
7252        let ev_headers = Event::Headers {
7253            list: resp,
7254            more_frames: true,
7255        };
7256
7257        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
7258        assert_eq!(s.poll_client(), Err(Error::Done));
7259
7260        // Client shuts down stream ==> sends RESET_STREAM
7261        assert_eq!(
7262            s.pipe
7263                .client
7264                .stream_shutdown(stream, crate::Shutdown::Write, 42),
7265            Ok(())
7266        );
7267        assert_eq!(s.advance(), Ok(()));
7268
7269        // Server sees the Reset event for the stream.
7270        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(42))));
7271        assert_eq!(s.poll_server(), Err(Error::Done));
7272
7273        // Server sends body and closes the stream.
7274        s.send_body_server(stream, true).unwrap();
7275
7276        // Stream has been collected on server by quiche
7277        assert!(s.pipe.server.streams.is_collected(stream));
7278        // Client stream has not been collected, the client needs to
7279        // read the fin from the stream first.
7280        assert!(!s.pipe.client.streams.is_collected(stream));
7281        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
7282        s.recv_body_client(stream, &mut buf).unwrap();
7283        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
7284        assert_eq!(s.poll_client(), Err(Error::Done));
7285        assert!(s.pipe.client.streams.is_collected(stream));
7286    }
7287
7288    #[test]
7289    fn client_shutdown_read() {
7290        let mut buf = [0; 65535];
7291        let mut s = Session::new().unwrap();
7292        s.handshake().unwrap();
7293
7294        // Client sends request and leaves stream open.
7295        let (stream, req) = s.send_request(false).unwrap();
7296
7297        let ev_headers = Event::Headers {
7298            list: req,
7299            more_frames: true,
7300        };
7301
7302        // Server sends response and leaves stream open.
7303        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7304        assert_eq!(s.poll_server(), Err(Error::Done));
7305
7306        let resp = s.send_response(stream, false).unwrap();
7307
7308        let ev_headers = Event::Headers {
7309            list: resp,
7310            more_frames: true,
7311        };
7312
7313        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
7314        assert_eq!(s.poll_client(), Err(Error::Done));
7315        // Client shuts down read
7316        assert_eq!(
7317            s.pipe
7318                .client
7319                .stream_shutdown(stream, crate::Shutdown::Read, 42),
7320            Ok(())
7321        );
7322        assert_eq!(s.advance(), Ok(()));
7323
7324        // Stream is writable on server side, but returns StreamStopped
7325        assert_eq!(s.poll_server(), Err(Error::Done));
7326        let writables: Vec<u64> = s.pipe.server.writable().collect();
7327        assert!(writables.contains(&stream));
7328        assert_eq!(
7329            s.send_body_server(stream, false),
7330            Err(Error::TransportError(crate::Error::StreamStopped(42)))
7331        );
7332
7333        // Client needs to finish its side by sending a fin
7334        assert_eq!(
7335            s.client.send_body(&mut s.pipe.client, stream, &[], true),
7336            Ok(0)
7337        );
7338        assert_eq!(s.advance(), Ok(()));
7339        // Note, we get an Event::Data for an empty buffer today. But it
7340        // would also be fine to not get it.
7341        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
7342        assert_eq!(s.recv_body_server(stream, &mut buf), Err(Error::Done));
7343        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
7344        assert_eq!(s.poll_server(), Err(Error::Done));
7345
7346        // Since the client has already send a fin, the stream is collected
7347        // on both client and server
7348        assert!(s.pipe.client.streams.is_collected(stream));
7349        assert!(s.pipe.server.streams.is_collected(stream));
7350    }
7351
7352    #[test]
7353    fn reset_finished_at_server() {
7354        let mut s = Session::new().unwrap();
7355        s.handshake().unwrap();
7356
7357        // Client sends HEADERS and doesn't fin
7358        let (stream, _req) = s.send_request(false).unwrap();
7359
7360        // ..then Client sends RESET_STREAM
7361        assert_eq!(
7362            s.pipe.client.stream_shutdown(0, crate::Shutdown::Write, 0),
7363            Ok(())
7364        );
7365
7366        assert_eq!(s.pipe.advance(), Ok(()));
7367
7368        // Server receives just a reset
7369        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(0))));
7370        assert_eq!(s.poll_server(), Err(Error::Done));
7371
7372        // Client sends HEADERS and fin
7373        let (stream, req) = s.send_request(true).unwrap();
7374
7375        // ..then Client sends RESET_STREAM
7376        assert_eq!(
7377            s.pipe.client.stream_shutdown(4, crate::Shutdown::Write, 0),
7378            Ok(())
7379        );
7380
7381        let ev_headers = Event::Headers {
7382            list: req,
7383            more_frames: false,
7384        };
7385
7386        // Server receives headers and fin.
7387        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7388        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
7389        assert_eq!(s.poll_server(), Err(Error::Done));
7390    }
7391
7392    #[test]
7393    fn reset_finished_at_server_with_data_pending() {
7394        let mut s = Session::new().unwrap();
7395        s.handshake().unwrap();
7396
7397        // Client sends HEADERS and doesn't fin.
7398        let (stream, req) = s.send_request(false).unwrap();
7399
7400        assert!(s.send_body_client(stream, false).is_ok());
7401
7402        assert_eq!(s.pipe.advance(), Ok(()));
7403
7404        let ev_headers = Event::Headers {
7405            list: req,
7406            more_frames: true,
7407        };
7408
7409        // Server receives headers and data...
7410        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7411        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
7412
7413        // ..then Client sends RESET_STREAM.
7414        assert_eq!(
7415            s.pipe
7416                .client
7417                .stream_shutdown(stream, crate::Shutdown::Write, 0),
7418            Ok(())
7419        );
7420
7421        assert_eq!(s.pipe.advance(), Ok(()));
7422
7423        // The server does *not* attempt to read from the stream,
7424        // but polls and receives the reset and there are no more
7425        // readable streams.
7426        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(0))));
7427        assert_eq!(s.poll_server(), Err(Error::Done));
7428        assert_eq!(s.pipe.server.readable().len(), 0);
7429    }
7430
7431    #[test]
7432    fn reset_finished_at_server_with_data_pending_2() {
7433        let mut s = Session::new().unwrap();
7434        s.handshake().unwrap();
7435
7436        // Client sends HEADERS and doesn't fin.
7437        let (stream, req) = s.send_request(false).unwrap();
7438
7439        assert!(s.send_body_client(stream, false).is_ok());
7440
7441        assert_eq!(s.pipe.advance(), Ok(()));
7442
7443        let ev_headers = Event::Headers {
7444            list: req,
7445            more_frames: true,
7446        };
7447
7448        // Server receives headers and data...
7449        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7450        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
7451
7452        // ..then Client sends RESET_STREAM.
7453        assert_eq!(
7454            s.pipe
7455                .client
7456                .stream_shutdown(stream, crate::Shutdown::Write, 0),
7457            Ok(())
7458        );
7459
7460        assert_eq!(s.pipe.advance(), Ok(()));
7461
7462        // Server reads from the stream and receives the reset while
7463        // attempting to read.
7464        assert_eq!(
7465            s.recv_body_server(stream, &mut [0; 100]),
7466            Err(Error::TransportError(crate::Error::StreamReset(0)))
7467        );
7468
7469        // No more events and there are no more readable streams.
7470        assert_eq!(s.poll_server(), Err(Error::Done));
7471        assert_eq!(s.pipe.server.readable().len(), 0);
7472    }
7473
7474    #[test]
7475    fn reset_finished_at_client() {
7476        let mut buf = [0; 65535];
7477        let mut s = Session::new().unwrap();
7478        s.handshake().unwrap();
7479
7480        // Client sends HEADERS and doesn't fin
7481        let (stream, req) = s.send_request(false).unwrap();
7482
7483        let ev_headers = Event::Headers {
7484            list: req,
7485            more_frames: true,
7486        };
7487
7488        // Server receives headers.
7489        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7490        assert_eq!(s.poll_server(), Err(Error::Done));
7491
7492        // Server sends response and doesn't fin
7493        s.send_response(stream, false).unwrap();
7494
7495        assert_eq!(s.pipe.advance(), Ok(()));
7496
7497        // .. then Server sends RESET_STREAM
7498        assert_eq!(
7499            s.pipe
7500                .server
7501                .stream_shutdown(stream, crate::Shutdown::Write, 0),
7502            Ok(())
7503        );
7504
7505        assert_eq!(s.pipe.advance(), Ok(()));
7506
7507        // Client receives Reset only
7508        assert_eq!(s.poll_client(), Ok((stream, Event::Reset(0))));
7509        assert_eq!(s.poll_server(), Err(Error::Done));
7510
7511        // Client sends headers and fin.
7512        let (stream, req) = s.send_request(true).unwrap();
7513
7514        let ev_headers = Event::Headers {
7515            list: req,
7516            more_frames: false,
7517        };
7518
7519        // Server receives headers and fin.
7520        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7521        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
7522        assert_eq!(s.poll_server(), Err(Error::Done));
7523
7524        // Server sends response and fin
7525        let resp = s.send_response(stream, true).unwrap();
7526
7527        assert_eq!(s.pipe.advance(), Ok(()));
7528
7529        // ..then Server sends RESET_STREAM
7530        let frames = [crate::frame::Frame::ResetStream {
7531            stream_id: stream,
7532            error_code: 42,
7533            final_size: 68,
7534        }];
7535
7536        let pkt_type = crate::packet::Type::Short;
7537        assert_eq!(
7538            s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
7539            Ok(39)
7540        );
7541
7542        assert_eq!(s.pipe.advance(), Ok(()));
7543
7544        let ev_headers = Event::Headers {
7545            list: resp,
7546            more_frames: false,
7547        };
7548
7549        // Client receives headers and fin.
7550        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
7551        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
7552        assert_eq!(s.poll_client(), Err(Error::Done));
7553    }
7554}
7555
7556#[cfg(feature = "ffi")]
7557mod ffi;
7558#[cfg(feature = "internal")]
7559#[doc(hidden)]
7560pub mod frame;
7561#[cfg(not(feature = "internal"))]
7562mod frame;
7563#[doc(hidden)]
7564pub mod qpack;
7565mod stream;