quiche/h3/
mod.rs

1// Copyright (C) 2019, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! HTTP/3 wire protocol and QPACK implementation.
28//!
29//! This module provides a high level API for sending and receiving HTTP/3
30//! requests and responses on top of the QUIC transport protocol.
31//!
32//! ## Connection setup
33//!
34//! HTTP/3 connections require a QUIC transport-layer connection, see
35//! [Connection setup] for a full description of the setup process.
36//!
37//! To use HTTP/3, the QUIC connection must be configured with a suitable
38//! Application Layer Protocol Negotiation (ALPN) Protocol ID:
39//!
40//! ```
41//! let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
42//! config.set_application_protos(quiche::h3::APPLICATION_PROTOCOL)?;
43//! # Ok::<(), quiche::Error>(())
44//! ```
45//!
46//! The QUIC handshake is driven by [sending] and [receiving] QUIC packets.
47//!
48//! Once the handshake has completed, the first step in establishing an HTTP/3
49//! connection is creating its configuration object:
50//!
51//! ```
52//! let h3_config = quiche::h3::Config::new()?;
53//! # Ok::<(), quiche::h3::Error>(())
54//! ```
55//!
56//! HTTP/3 client and server connections are both created using the
57//! [`with_transport()`] function, the role is inferred from the type of QUIC
58//! connection:
59//!
60//! ```no_run
61//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
62//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
63//! # let peer = "127.0.0.1:1234".parse().unwrap();
64//! # let local = "127.0.0.1:4321".parse().unwrap();
65//! # let mut conn = quiche::accept(&scid, None, local, peer, &mut config).unwrap();
66//! # let h3_config = quiche::h3::Config::new()?;
67//! let h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
68//! # Ok::<(), quiche::h3::Error>(())
69//! ```
70//!
71//! ## Sending a request
72//!
73//! An HTTP/3 client can send a request by using the connection's
74//! [`send_request()`] method to queue request headers; [sending] QUIC packets
75//! causes the requests to get sent to the peer:
76//!
77//! ```no_run
78//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
79//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
80//! # let peer = "127.0.0.1:1234".parse().unwrap();
81//! # let local = "127.0.0.1:4321".parse().unwrap();
82//! # let mut conn = quiche::connect(None, &scid, local, peer, &mut config).unwrap();
83//! # let h3_config = quiche::h3::Config::new()?;
84//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
85//! let req = vec![
86//!     quiche::h3::Header::new(b":method", b"GET"),
87//!     quiche::h3::Header::new(b":scheme", b"https"),
88//!     quiche::h3::Header::new(b":authority", b"quic.tech"),
89//!     quiche::h3::Header::new(b":path", b"/"),
90//!     quiche::h3::Header::new(b"user-agent", b"quiche"),
91//! ];
92//!
93//! h3_conn.send_request(&mut conn, &req, true)?;
94//! # Ok::<(), quiche::h3::Error>(())
95//! ```
96//!
97//! An HTTP/3 client can send a request with additional body data by using
98//! the connection's [`send_body()`] method:
99//!
100//! ```no_run
101//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
102//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
103//! # let peer = "127.0.0.1:1234".parse().unwrap();
104//! # let local = "127.0.0.1:4321".parse().unwrap();
105//! # let mut conn = quiche::connect(None, &scid, local, peer, &mut config).unwrap();
106//! # let h3_config = quiche::h3::Config::new()?;
107//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
108//! let req = vec![
109//!     quiche::h3::Header::new(b":method", b"GET"),
110//!     quiche::h3::Header::new(b":scheme", b"https"),
111//!     quiche::h3::Header::new(b":authority", b"quic.tech"),
112//!     quiche::h3::Header::new(b":path", b"/"),
113//!     quiche::h3::Header::new(b"user-agent", b"quiche"),
114//! ];
115//!
116//! let stream_id = h3_conn.send_request(&mut conn, &req, false)?;
117//! h3_conn.send_body(&mut conn, stream_id, b"Hello World!", true)?;
118//! # Ok::<(), quiche::h3::Error>(())
119//! ```
120//!
121//! ## Handling requests and responses
122//!
123//! After [receiving] QUIC packets, HTTP/3 data is processed using the
124//! connection's [`poll()`] method. On success, this returns an [`Event`] object
125//! and an ID corresponding to the stream where the `Event` originated.
126//!
127//! An HTTP/3 server uses [`poll()`] to read requests and responds to them using
128//! [`send_response()`] and [`send_body()`]:
129//!
130//! ```no_run
131//! use quiche::h3::NameValue;
132//!
133//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
134//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
135//! # let peer = "127.0.0.1:1234".parse().unwrap();
136//! # let local = "127.0.0.1:1234".parse().unwrap();
137//! # let mut conn = quiche::accept(&scid, None, local, peer, &mut config).unwrap();
138//! # let h3_config = quiche::h3::Config::new()?;
139//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
140//! loop {
141//!     match h3_conn.poll(&mut conn) {
142//!         Ok((stream_id, quiche::h3::Event::Headers{list, more_frames})) => {
143//!             let mut headers = list.into_iter();
144//!
145//!             // Look for the request's method.
146//!             let method = headers.find(|h| h.name() == b":method").unwrap();
147//!
148//!             // Look for the request's path.
149//!             let path = headers.find(|h| h.name() == b":path").unwrap();
150//!
151//!             if method.value() == b"GET" && path.value() == b"/" {
152//!                 let resp = vec![
153//!                     quiche::h3::Header::new(b":status", 200.to_string().as_bytes()),
154//!                     quiche::h3::Header::new(b"server", b"quiche"),
155//!                 ];
156//!
157//!                 h3_conn.send_response(&mut conn, stream_id, &resp, false)?;
158//!                 h3_conn.send_body(&mut conn, stream_id, b"Hello World!", true)?;
159//!             }
160//!         },
161//!
162//!         Ok((stream_id, quiche::h3::Event::Data)) => {
163//!             // Request body data, handle it.
164//!             # return Ok(());
165//!         },
166//!
167//!         Ok((stream_id, quiche::h3::Event::Finished)) => {
168//!             // Peer terminated stream, handle it.
169//!         },
170//!
171//!         Ok((stream_id, quiche::h3::Event::Reset(err))) => {
172//!             // Peer reset the stream, handle it.
173//!         },
174//!
175//!         Ok((_flow_id, quiche::h3::Event::PriorityUpdate)) => (),
176//!
177//!         Ok((goaway_id, quiche::h3::Event::GoAway)) => {
178//!              // Peer signalled it is going away, handle it.
179//!         },
180//!
181//!         Err(quiche::h3::Error::Done) => {
182//!             // Done reading.
183//!             break;
184//!         },
185//!
186//!         Err(e) => {
187//!             // An error occurred, handle it.
188//!             break;
189//!         },
190//!     }
191//! }
192//! # Ok::<(), quiche::h3::Error>(())
193//! ```
194//!
195//! An HTTP/3 client uses [`poll()`] to read responses:
196//!
197//! ```no_run
198//! use quiche::h3::NameValue;
199//!
200//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
201//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
202//! # let peer = "127.0.0.1:1234".parse().unwrap();
203//! # let local = "127.0.0.1:1234".parse().unwrap();
204//! # let mut conn = quiche::connect(None, &scid, local, peer, &mut config).unwrap();
205//! # let h3_config = quiche::h3::Config::new()?;
206//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
207//! loop {
208//!     match h3_conn.poll(&mut conn) {
209//!         Ok((stream_id, quiche::h3::Event::Headers{list, more_frames})) => {
210//!             let status = list.iter().find(|h| h.name() == b":status").unwrap();
211//!             println!("Received {} response on stream {}",
212//!                      std::str::from_utf8(status.value()).unwrap(),
213//!                      stream_id);
214//!         },
215//!
216//!         Ok((stream_id, quiche::h3::Event::Data)) => {
217//!             let mut body = vec![0; 4096];
218//!
219//!             // Consume all body data received on the stream.
220//!             while let Ok(read) =
221//!                 h3_conn.recv_body(&mut conn, stream_id, &mut body)
222//!             {
223//!                 println!("Received {} bytes of payload on stream {}",
224//!                          read, stream_id);
225//!             }
226//!         },
227//!
228//!         Ok((stream_id, quiche::h3::Event::Finished)) => {
229//!             // Peer terminated stream, handle it.
230//!         },
231//!
232//!         Ok((stream_id, quiche::h3::Event::Reset(err))) => {
233//!             // Peer reset the stream, handle it.
234//!         },
235//!
236//!         Ok((_prioritized_element_id, quiche::h3::Event::PriorityUpdate)) => (),
237//!
238//!         Ok((goaway_id, quiche::h3::Event::GoAway)) => {
239//!              // Peer signalled it is going away, handle it.
240//!         },
241//!
242//!         Err(quiche::h3::Error::Done) => {
243//!             // Done reading.
244//!             break;
245//!         },
246//!
247//!         Err(e) => {
248//!             // An error occurred, handle it.
249//!             break;
250//!         },
251//!     }
252//! }
253//! # Ok::<(), quiche::h3::Error>(())
254//! ```
255//!
256//! ## Detecting end of request or response
257//!
258//! A single HTTP/3 request or response may consist of several HEADERS and DATA
259//! frames; it is finished when the QUIC stream is closed. Calling [`poll()`]
260//! repeatedly will generate an [`Event`] for each of these. The application may
261//! use these event to do additional HTTP semantic validation.
262//!
263//! ## HTTP/3 protocol errors
264//!
265//! Quiche is responsible for managing the HTTP/3 connection, ensuring it is in
266//! a correct state and validating all messages received by a peer. This mainly
267//! takes place in the [`poll()`] method. If an HTTP/3 error occurs, quiche will
268//! close the connection and send an appropriate CONNECTION_CLOSE frame to the
269//! peer. An [`Error`] is returned to the application so that it can perform any
270//! required tidy up such as closing sockets.
271//!
272//! [`application_proto()`]: ../struct.Connection.html#method.application_proto
273//! [`stream_finished()`]: ../struct.Connection.html#method.stream_finished
274//! [Connection setup]: ../index.html#connection-setup
275//! [sending]: ../index.html#generating-outgoing-packets
276//! [receiving]: ../index.html#handling-incoming-packets
277//! [`with_transport()`]: struct.Connection.html#method.with_transport
278//! [`poll()`]: struct.Connection.html#method.poll
279//! [`Event`]: enum.Event.html
280//! [`Error`]: enum.Error.html
281//! [`send_request()`]: struct.Connection.html#method.send_response
282//! [`send_response()`]: struct.Connection.html#method.send_response
283//! [`send_body()`]: struct.Connection.html#method.send_body
284
285use std::collections::HashSet;
286use std::collections::VecDeque;
287
288#[cfg(feature = "sfv")]
289use std::convert::TryFrom;
290use std::fmt;
291use std::fmt::Write;
292
293#[cfg(feature = "qlog")]
294use qlog::events::h3::H3FrameCreated;
295#[cfg(feature = "qlog")]
296use qlog::events::h3::H3FrameParsed;
297#[cfg(feature = "qlog")]
298use qlog::events::h3::H3Owner;
299#[cfg(feature = "qlog")]
300use qlog::events::h3::H3PriorityTargetStreamType;
301#[cfg(feature = "qlog")]
302use qlog::events::h3::H3StreamType;
303#[cfg(feature = "qlog")]
304use qlog::events::h3::H3StreamTypeSet;
305#[cfg(feature = "qlog")]
306use qlog::events::h3::Http3EventType;
307#[cfg(feature = "qlog")]
308use qlog::events::h3::Http3Frame;
309#[cfg(feature = "qlog")]
310use qlog::events::EventData;
311#[cfg(feature = "qlog")]
312use qlog::events::EventImportance;
313#[cfg(feature = "qlog")]
314use qlog::events::EventType;
315
316/// List of ALPN tokens of supported HTTP/3 versions.
317///
318/// This can be passed directly to the [`Config::set_application_protos()`]
319/// method when implementing HTTP/3 applications.
320///
321/// [`Config::set_application_protos()`]:
322/// ../struct.Config.html#method.set_application_protos
323pub const APPLICATION_PROTOCOL: &[&[u8]] = &[b"h3"];
324
325// The offset used when converting HTTP/3 urgency to quiche urgency.
326const PRIORITY_URGENCY_OFFSET: u8 = 124;
327
328// Parameter values as specified in [Extensible Priorities].
329//
330// [Extensible Priorities]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
331const PRIORITY_URGENCY_LOWER_BOUND: u8 = 0;
332const PRIORITY_URGENCY_UPPER_BOUND: u8 = 7;
333const PRIORITY_URGENCY_DEFAULT: u8 = 3;
334const PRIORITY_INCREMENTAL_DEFAULT: bool = false;
335
336#[cfg(feature = "qlog")]
337const QLOG_FRAME_CREATED: EventType =
338    EventType::Http3EventType(Http3EventType::FrameCreated);
339#[cfg(feature = "qlog")]
340const QLOG_FRAME_PARSED: EventType =
341    EventType::Http3EventType(Http3EventType::FrameParsed);
342#[cfg(feature = "qlog")]
343const QLOG_STREAM_TYPE_SET: EventType =
344    EventType::Http3EventType(Http3EventType::StreamTypeSet);
345
346/// A specialized [`Result`] type for quiche HTTP/3 operations.
347///
348/// This type is used throughout quiche's HTTP/3 public API for any operation
349/// that can produce an error.
350///
351/// [`Result`]: https://doc.rust-lang.org/std/result/enum.Result.html
352pub type Result<T> = std::result::Result<T, Error>;
353
354/// An HTTP/3 error.
355#[derive(Clone, Copy, Debug, PartialEq, Eq)]
356pub enum Error {
357    /// There is no error or no work to do
358    Done,
359
360    /// The provided buffer is too short.
361    BufferTooShort,
362
363    /// Internal error in the HTTP/3 stack.
364    InternalError,
365
366    /// Endpoint detected that the peer is exhibiting behavior that causes.
367    /// excessive load.
368    ExcessiveLoad,
369
370    /// Stream ID or Push ID greater that current maximum was
371    /// used incorrectly, such as exceeding a limit, reducing a limit,
372    /// or being reused.
373    IdError,
374
375    /// The endpoint detected that its peer created a stream that it will not
376    /// accept.
377    StreamCreationError,
378
379    /// A required critical stream was closed.
380    ClosedCriticalStream,
381
382    /// No SETTINGS frame at beginning of control stream.
383    MissingSettings,
384
385    /// A frame was received which is not permitted in the current state.
386    FrameUnexpected,
387
388    /// Frame violated layout or size rules.
389    FrameError,
390
391    /// QPACK Header block decompression failure.
392    QpackDecompressionFailed,
393
394    /// Error originated from the transport layer.
395    TransportError(crate::Error),
396
397    /// The underlying QUIC stream (or connection) doesn't have enough capacity
398    /// for the operation to complete. The application should retry later on.
399    StreamBlocked,
400
401    /// Error in the payload of a SETTINGS frame.
402    SettingsError,
403
404    /// Server rejected request.
405    RequestRejected,
406
407    /// Request or its response cancelled.
408    RequestCancelled,
409
410    /// Client's request stream terminated without containing a full-formed
411    /// request.
412    RequestIncomplete,
413
414    /// An HTTP message was malformed and cannot be processed.
415    MessageError,
416
417    /// The TCP connection established in response to a CONNECT request was
418    /// reset or abnormally closed.
419    ConnectError,
420
421    /// The requested operation cannot be served over HTTP/3. Peer should retry
422    /// over HTTP/1.1.
423    VersionFallback,
424}
425
426/// HTTP/3 error codes sent on the wire.
427///
428/// As defined in [RFC9114](https://www.rfc-editor.org/rfc/rfc9114.html#http-error-codes).
429#[derive(Copy, Clone, Debug, Eq, PartialEq)]
430pub enum WireErrorCode {
431    /// No error. This is used when the connection or stream needs to be closed,
432    /// but there is no error to signal.
433    NoError              = 0x100,
434    /// Peer violated protocol requirements in a way that does not match a more
435    /// specific error code or endpoint declines to use the more specific
436    /// error code.
437    GeneralProtocolError = 0x101,
438    /// An internal error has occurred in the HTTP stack.
439    InternalError        = 0x102,
440    /// The endpoint detected that its peer created a stream that it will not
441    /// accept.
442    StreamCreationError  = 0x103,
443    /// A stream required by the HTTP/3 connection was closed or reset.
444    ClosedCriticalStream = 0x104,
445    /// A frame was received that was not permitted in the current state or on
446    /// the current stream.
447    FrameUnexpected      = 0x105,
448    /// A frame that fails to satisfy layout requirements or with an invalid
449    /// size was received.
450    FrameError           = 0x106,
451    /// The endpoint detected that its peer is exhibiting a behavior that might
452    /// be generating excessive load.
453    ExcessiveLoad        = 0x107,
454    /// A stream ID or push ID was used incorrectly, such as exceeding a limit,
455    /// reducing a limit, or being reused.
456    IdError              = 0x108,
457    /// An endpoint detected an error in the payload of a SETTINGS frame.
458    SettingsError        = 0x109,
459    /// No SETTINGS frame was received at the beginning of the control stream.
460    MissingSettings      = 0x10a,
461    /// A server rejected a request without performing any application
462    /// processing.
463    RequestRejected      = 0x10b,
464    /// The request or its response (including pushed response) is cancelled.
465    RequestCancelled     = 0x10c,
466    /// The client's stream terminated without containing a fully formed
467    /// request.
468    RequestIncomplete    = 0x10d,
469    /// An HTTP message was malformed and cannot be processed.
470    MessageError         = 0x10e,
471    /// The TCP connection established in response to a CONNECT request was
472    /// reset or abnormally closed.
473    ConnectError         = 0x10f,
474    /// The requested operation cannot be served over HTTP/3. The peer should
475    /// retry over HTTP/1.1.
476    VersionFallback      = 0x110,
477}
478
479impl Error {
480    fn to_wire(self) -> u64 {
481        match self {
482            Error::Done => WireErrorCode::NoError as u64,
483            Error::InternalError => WireErrorCode::InternalError as u64,
484            Error::StreamCreationError =>
485                WireErrorCode::StreamCreationError as u64,
486            Error::ClosedCriticalStream =>
487                WireErrorCode::ClosedCriticalStream as u64,
488            Error::FrameUnexpected => WireErrorCode::FrameUnexpected as u64,
489            Error::FrameError => WireErrorCode::FrameError as u64,
490            Error::ExcessiveLoad => WireErrorCode::ExcessiveLoad as u64,
491            Error::IdError => WireErrorCode::IdError as u64,
492            Error::MissingSettings => WireErrorCode::MissingSettings as u64,
493            Error::QpackDecompressionFailed => 0x200,
494            Error::BufferTooShort => 0x999,
495            Error::TransportError { .. } | Error::StreamBlocked => 0xFF,
496            Error::SettingsError => WireErrorCode::SettingsError as u64,
497            Error::RequestRejected => WireErrorCode::RequestRejected as u64,
498            Error::RequestCancelled => WireErrorCode::RequestCancelled as u64,
499            Error::RequestIncomplete => WireErrorCode::RequestIncomplete as u64,
500            Error::MessageError => WireErrorCode::MessageError as u64,
501            Error::ConnectError => WireErrorCode::ConnectError as u64,
502            Error::VersionFallback => WireErrorCode::VersionFallback as u64,
503        }
504    }
505
506    #[cfg(feature = "ffi")]
507    fn to_c(self) -> libc::ssize_t {
508        match self {
509            Error::Done => -1,
510            Error::BufferTooShort => -2,
511            Error::InternalError => -3,
512            Error::ExcessiveLoad => -4,
513            Error::IdError => -5,
514            Error::StreamCreationError => -6,
515            Error::ClosedCriticalStream => -7,
516            Error::MissingSettings => -8,
517            Error::FrameUnexpected => -9,
518            Error::FrameError => -10,
519            Error::QpackDecompressionFailed => -11,
520            // -12 was previously used for TransportError, skip it
521            Error::StreamBlocked => -13,
522            Error::SettingsError => -14,
523            Error::RequestRejected => -15,
524            Error::RequestCancelled => -16,
525            Error::RequestIncomplete => -17,
526            Error::MessageError => -18,
527            Error::ConnectError => -19,
528            Error::VersionFallback => -20,
529
530            Error::TransportError(quic_error) => quic_error.to_c() - 1000,
531        }
532    }
533}
534
535impl std::fmt::Display for Error {
536    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
537        write!(f, "{self:?}")
538    }
539}
540
541impl std::error::Error for Error {
542    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
543        None
544    }
545}
546
547impl std::convert::From<super::Error> for Error {
548    fn from(err: super::Error) -> Self {
549        match err {
550            super::Error::Done => Error::Done,
551
552            _ => Error::TransportError(err),
553        }
554    }
555}
556
557impl std::convert::From<octets::BufferTooShortError> for Error {
558    fn from(_err: octets::BufferTooShortError) -> Self {
559        Error::BufferTooShort
560    }
561}
562
563/// An HTTP/3 configuration.
564pub struct Config {
565    max_field_section_size: Option<u64>,
566    qpack_max_table_capacity: Option<u64>,
567    qpack_blocked_streams: Option<u64>,
568    connect_protocol_enabled: Option<u64>,
569    /// additional settings are settings that are not part of the H3
570    /// settings explicitly handled above
571    additional_settings: Option<Vec<(u64, u64)>>,
572}
573
574impl Config {
575    /// Creates a new configuration object with default settings.
576    pub const fn new() -> Result<Config> {
577        Ok(Config {
578            max_field_section_size: None,
579            qpack_max_table_capacity: None,
580            qpack_blocked_streams: None,
581            connect_protocol_enabled: None,
582            additional_settings: None,
583        })
584    }
585
586    /// Sets the `SETTINGS_MAX_FIELD_SECTION_SIZE` setting.
587    ///
588    /// By default no limit is enforced. When a request whose headers exceed
589    /// the limit set by the application is received, the call to the [`poll()`]
590    /// method will return the [`Error::ExcessiveLoad`] error, and the
591    /// connection will be closed.
592    ///
593    /// [`poll()`]: struct.Connection.html#method.poll
594    /// [`Error::ExcessiveLoad`]: enum.Error.html#variant.ExcessiveLoad
595    pub fn set_max_field_section_size(&mut self, v: u64) {
596        self.max_field_section_size = Some(v);
597    }
598
599    /// Sets the `SETTINGS_QPACK_MAX_TABLE_CAPACITY` setting.
600    ///
601    /// The default value is `0`.
602    pub fn set_qpack_max_table_capacity(&mut self, v: u64) {
603        self.qpack_max_table_capacity = Some(v);
604    }
605
606    /// Sets the `SETTINGS_QPACK_BLOCKED_STREAMS` setting.
607    ///
608    /// The default value is `0`.
609    pub fn set_qpack_blocked_streams(&mut self, v: u64) {
610        self.qpack_blocked_streams = Some(v);
611    }
612
613    /// Sets or omits the `SETTINGS_ENABLE_CONNECT_PROTOCOL` setting.
614    ///
615    /// The default value is `false`.
616    pub fn enable_extended_connect(&mut self, enabled: bool) {
617        if enabled {
618            self.connect_protocol_enabled = Some(1);
619        } else {
620            self.connect_protocol_enabled = None;
621        }
622    }
623
624    /// Sets additional HTTP/3 settings.
625    ///
626    /// The default value is no additional settings.
627    /// The `additional_settings` parameter must not the following
628    /// settings as they are already handled by this library:
629    ///
630    /// - SETTINGS_QPACK_MAX_TABLE_CAPACITY
631    /// - SETTINGS_MAX_FIELD_SECTION_SIZE
632    /// - SETTINGS_QPACK_BLOCKED_STREAMS
633    /// - SETTINGS_ENABLE_CONNECT_PROTOCOL
634    /// - SETTINGS_H3_DATAGRAM
635    ///
636    /// If such a setting is present in the `additional_settings`,
637    /// the method will return the [`Error::SettingsError`] error.
638    ///
639    /// If a setting identifier is present twice in `additional_settings`,
640    /// the method will return the [`Error::SettingsError`] error.
641    ///
642    /// [`Error::SettingsError`]: enum.Error.html#variant.SettingsError
643    pub fn set_additional_settings(
644        &mut self, additional_settings: Vec<(u64, u64)>,
645    ) -> Result<()> {
646        let explicit_quiche_settings = HashSet::from([
647            frame::SETTINGS_QPACK_MAX_TABLE_CAPACITY,
648            frame::SETTINGS_MAX_FIELD_SECTION_SIZE,
649            frame::SETTINGS_QPACK_BLOCKED_STREAMS,
650            frame::SETTINGS_ENABLE_CONNECT_PROTOCOL,
651            frame::SETTINGS_H3_DATAGRAM,
652            frame::SETTINGS_H3_DATAGRAM_00,
653        ]);
654
655        let dedup_settings: HashSet<u64> =
656            additional_settings.iter().map(|(key, _)| *key).collect();
657
658        if dedup_settings.len() != additional_settings.len() ||
659            !explicit_quiche_settings.is_disjoint(&dedup_settings)
660        {
661            return Err(Error::SettingsError);
662        }
663        self.additional_settings = Some(additional_settings);
664        Ok(())
665    }
666}
667
668/// A trait for types with associated string name and value.
669pub trait NameValue {
670    /// Returns the object's name.
671    fn name(&self) -> &[u8];
672
673    /// Returns the object's value.
674    fn value(&self) -> &[u8];
675}
676
677impl<N, V> NameValue for (N, V)
678where
679    N: AsRef<[u8]>,
680    V: AsRef<[u8]>,
681{
682    fn name(&self) -> &[u8] {
683        self.0.as_ref()
684    }
685
686    fn value(&self) -> &[u8] {
687        self.1.as_ref()
688    }
689}
690
691/// An owned name-value pair representing a raw HTTP header.
692#[derive(Clone, PartialEq, Eq)]
693pub struct Header(Vec<u8>, Vec<u8>);
694
695fn try_print_as_readable(hdr: &[u8], f: &mut fmt::Formatter) -> fmt::Result {
696    match std::str::from_utf8(hdr) {
697        Ok(s) => f.write_str(&s.escape_default().to_string()),
698        Err(_) => write!(f, "{hdr:?}"),
699    }
700}
701
702impl fmt::Debug for Header {
703    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
704        f.write_char('"')?;
705        try_print_as_readable(&self.0, f)?;
706        f.write_str(": ")?;
707        try_print_as_readable(&self.1, f)?;
708        f.write_char('"')
709    }
710}
711
712impl Header {
713    /// Creates a new header.
714    ///
715    /// Both `name` and `value` will be cloned.
716    pub fn new(name: &[u8], value: &[u8]) -> Self {
717        Self(name.to_vec(), value.to_vec())
718    }
719}
720
721impl NameValue for Header {
722    fn name(&self) -> &[u8] {
723        &self.0
724    }
725
726    fn value(&self) -> &[u8] {
727        &self.1
728    }
729}
730
731/// A non-owned name-value pair representing a raw HTTP header.
732#[derive(Clone, Debug, PartialEq, Eq)]
733pub struct HeaderRef<'a>(&'a [u8], &'a [u8]);
734
735impl<'a> HeaderRef<'a> {
736    /// Creates a new header.
737    pub const fn new(name: &'a [u8], value: &'a [u8]) -> Self {
738        Self(name, value)
739    }
740}
741
742impl NameValue for HeaderRef<'_> {
743    fn name(&self) -> &[u8] {
744        self.0
745    }
746
747    fn value(&self) -> &[u8] {
748        self.1
749    }
750}
751
752/// An HTTP/3 connection event.
753#[derive(Clone, Debug, PartialEq, Eq)]
754pub enum Event {
755    /// Request/response headers were received.
756    Headers {
757        /// The list of received header fields. The application should validate
758        /// pseudo-headers and headers.
759        list: Vec<Header>,
760
761        /// Whether more frames will follow the headers on the stream.
762        more_frames: bool,
763    },
764
765    /// Data was received.
766    ///
767    /// This indicates that the application can use the [`recv_body()`] method
768    /// to retrieve the data from the stream.
769    ///
770    /// Note that [`recv_body()`] will need to be called repeatedly until the
771    /// [`Done`] value is returned, as the event will not be re-armed until all
772    /// buffered data is read.
773    ///
774    /// [`recv_body()`]: struct.Connection.html#method.recv_body
775    /// [`Done`]: enum.Error.html#variant.Done
776    Data,
777
778    /// Stream was closed,
779    Finished,
780
781    /// Stream was reset.
782    ///
783    /// The associated data represents the error code sent by the peer.
784    Reset(u64),
785
786    /// PRIORITY_UPDATE was received.
787    ///
788    /// This indicates that the application can use the
789    /// [`take_last_priority_update()`] method to take the last received
790    /// PRIORITY_UPDATE for a specified stream.
791    ///
792    /// This event is triggered once per stream until the last PRIORITY_UPDATE
793    /// is taken. It is recommended that applications defer taking the
794    /// PRIORITY_UPDATE until after [`poll()`] returns [`Done`].
795    ///
796    /// [`take_last_priority_update()`]: struct.Connection.html#method.take_last_priority_update
797    /// [`poll()`]: struct.Connection.html#method.poll
798    /// [`Done`]: enum.Error.html#variant.Done
799    PriorityUpdate,
800
801    /// GOAWAY was received.
802    GoAway,
803}
804
805/// Extensible Priorities parameters.
806///
807/// The `TryFrom` trait supports constructing this object from the serialized
808/// Structured Fields Dictionary field value. I.e, use `TryFrom` to parse the
809/// value of a Priority header field or a PRIORITY_UPDATE frame. Using this
810/// trait requires the `sfv` feature to be enabled.
811#[derive(Debug, PartialEq, Eq)]
812#[repr(C)]
813pub struct Priority {
814    urgency: u8,
815    incremental: bool,
816}
817
818impl Default for Priority {
819    fn default() -> Self {
820        Priority {
821            urgency: PRIORITY_URGENCY_DEFAULT,
822            incremental: PRIORITY_INCREMENTAL_DEFAULT,
823        }
824    }
825}
826
827impl Priority {
828    /// Creates a new Priority.
829    pub const fn new(urgency: u8, incremental: bool) -> Self {
830        Priority {
831            urgency,
832            incremental,
833        }
834    }
835}
836
837#[cfg(feature = "sfv")]
838#[cfg_attr(docsrs, doc(cfg(feature = "sfv")))]
839impl TryFrom<&[u8]> for Priority {
840    type Error = crate::h3::Error;
841
842    /// Try to parse an Extensible Priority field value.
843    ///
844    /// The field value is expected to be a Structured Fields Dictionary; see
845    /// [Extensible Priorities].
846    ///
847    /// If the `u` or `i` fields are contained with correct types, a constructed
848    /// Priority object is returned. Note that urgency values outside of valid
849    /// range (0 through 7) are clamped to 7.
850    ///
851    /// If the `u` or `i` fields are contained with the wrong types,
852    /// Error::Done is returned.
853    ///
854    /// Omitted parameters will yield default values.
855    ///
856    /// [Extensible Priorities]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
857    fn try_from(value: &[u8]) -> std::result::Result<Self, Self::Error> {
858        let dict = match sfv::Parser::parse_dictionary(value) {
859            Ok(v) => v,
860
861            Err(_) => return Err(Error::Done),
862        };
863
864        let urgency = match dict.get("u") {
865            // If there is a u parameter, try to read it as an Item of type
866            // Integer. If the value out of the spec's allowed range
867            // (0 through 7), that's an error so set it to the upper
868            // bound (lowest priority) to avoid interference with
869            // other streams.
870            Some(sfv::ListEntry::Item(item)) => match item.bare_item.as_int() {
871                Some(v) => {
872                    if !(PRIORITY_URGENCY_LOWER_BOUND as i64..=
873                        PRIORITY_URGENCY_UPPER_BOUND as i64)
874                        .contains(&v)
875                    {
876                        PRIORITY_URGENCY_UPPER_BOUND
877                    } else {
878                        v as u8
879                    }
880                },
881
882                None => return Err(Error::Done),
883            },
884
885            Some(sfv::ListEntry::InnerList(_)) => return Err(Error::Done),
886
887            // Omitted so use default value.
888            None => PRIORITY_URGENCY_DEFAULT,
889        };
890
891        let incremental = match dict.get("i") {
892            Some(sfv::ListEntry::Item(item)) =>
893                item.bare_item.as_bool().ok_or(Error::Done)?,
894
895            // Omitted so use default value.
896            _ => false,
897        };
898
899        Ok(Priority::new(urgency, incremental))
900    }
901}
902
903struct ConnectionSettings {
904    pub max_field_section_size: Option<u64>,
905    pub qpack_max_table_capacity: Option<u64>,
906    pub qpack_blocked_streams: Option<u64>,
907    pub connect_protocol_enabled: Option<u64>,
908    pub h3_datagram: Option<u64>,
909    pub additional_settings: Option<Vec<(u64, u64)>>,
910    pub raw: Option<Vec<(u64, u64)>>,
911}
912
913#[derive(Default)]
914struct QpackStreams {
915    pub encoder_stream_id: Option<u64>,
916    pub encoder_stream_bytes: u64,
917    pub decoder_stream_id: Option<u64>,
918    pub decoder_stream_bytes: u64,
919}
920
921/// Statistics about the connection.
922///
923/// A connection's statistics can be collected using the [`stats()`] method.
924///
925/// [`stats()`]: struct.Connection.html#method.stats
926#[derive(Clone, Default)]
927pub struct Stats {
928    /// The number of bytes received on the QPACK encoder stream.
929    pub qpack_encoder_stream_recv_bytes: u64,
930    /// The number of bytes received on the QPACK decoder stream.
931    pub qpack_decoder_stream_recv_bytes: u64,
932}
933
934fn close_conn_critical_stream(conn: &mut super::Connection) -> Result<()> {
935    conn.close(
936        true,
937        Error::ClosedCriticalStream.to_wire(),
938        b"Critical stream closed.",
939    )?;
940
941    Err(Error::ClosedCriticalStream)
942}
943
944fn close_conn_if_critical_stream_finished(
945    conn: &mut super::Connection, stream_id: u64,
946) -> Result<()> {
947    if conn.stream_finished(stream_id) {
948        close_conn_critical_stream(conn)?;
949    }
950
951    Ok(())
952}
953
954/// An HTTP/3 connection.
955pub struct Connection {
956    is_server: bool,
957
958    next_request_stream_id: u64,
959    next_uni_stream_id: u64,
960
961    streams: crate::stream::StreamIdHashMap<stream::Stream>,
962
963    local_settings: ConnectionSettings,
964    peer_settings: ConnectionSettings,
965
966    control_stream_id: Option<u64>,
967    peer_control_stream_id: Option<u64>,
968
969    qpack_encoder: qpack::Encoder,
970    qpack_decoder: qpack::Decoder,
971
972    local_qpack_streams: QpackStreams,
973    peer_qpack_streams: QpackStreams,
974
975    max_push_id: u64,
976
977    finished_streams: VecDeque<u64>,
978
979    frames_greased: bool,
980
981    local_goaway_id: Option<u64>,
982    peer_goaway_id: Option<u64>,
983}
984
985impl Connection {
986    fn new(
987        config: &Config, is_server: bool, enable_dgram: bool,
988    ) -> Result<Connection> {
989        let initial_uni_stream_id = if is_server { 0x3 } else { 0x2 };
990        let h3_datagram = if enable_dgram { Some(1) } else { None };
991
992        Ok(Connection {
993            is_server,
994
995            next_request_stream_id: 0,
996
997            next_uni_stream_id: initial_uni_stream_id,
998
999            streams: Default::default(),
1000
1001            local_settings: ConnectionSettings {
1002                max_field_section_size: config.max_field_section_size,
1003                qpack_max_table_capacity: config.qpack_max_table_capacity,
1004                qpack_blocked_streams: config.qpack_blocked_streams,
1005                connect_protocol_enabled: config.connect_protocol_enabled,
1006                h3_datagram,
1007                additional_settings: config.additional_settings.clone(),
1008                raw: Default::default(),
1009            },
1010
1011            peer_settings: ConnectionSettings {
1012                max_field_section_size: None,
1013                qpack_max_table_capacity: None,
1014                qpack_blocked_streams: None,
1015                h3_datagram: None,
1016                connect_protocol_enabled: None,
1017                additional_settings: Default::default(),
1018                raw: Default::default(),
1019            },
1020
1021            control_stream_id: None,
1022            peer_control_stream_id: None,
1023
1024            qpack_encoder: qpack::Encoder::new(),
1025            qpack_decoder: qpack::Decoder::new(),
1026
1027            local_qpack_streams: Default::default(),
1028            peer_qpack_streams: Default::default(),
1029
1030            max_push_id: 0,
1031
1032            finished_streams: VecDeque::new(),
1033
1034            frames_greased: false,
1035
1036            local_goaway_id: None,
1037            peer_goaway_id: None,
1038        })
1039    }
1040
1041    /// Creates a new HTTP/3 connection using the provided QUIC connection.
1042    ///
1043    /// This will also initiate the HTTP/3 handshake with the peer by opening
1044    /// all control streams (including QPACK) and sending the local settings.
1045    ///
1046    /// On success the new connection is returned.
1047    ///
1048    /// The [`StreamLimit`] error is returned when the HTTP/3 control stream
1049    /// cannot be created due to stream limits.
1050    ///
1051    /// The [`InternalError`] error is returned when either the underlying QUIC
1052    /// connection is not in a suitable state, or the HTTP/3 control stream
1053    /// cannot be created due to flow control limits.
1054    ///
1055    /// [`StreamLimit`]: ../enum.Error.html#variant.StreamLimit
1056    /// [`InternalError`]: ../enum.Error.html#variant.InternalError
1057    pub fn with_transport(
1058        conn: &mut super::Connection, config: &Config,
1059    ) -> Result<Connection> {
1060        let is_client = !conn.is_server;
1061        if is_client && !(conn.is_established() || conn.is_in_early_data()) {
1062            trace!("{} QUIC connection must be established or in early data before creating an HTTP/3 connection", conn.trace_id());
1063            return Err(Error::InternalError);
1064        }
1065
1066        let mut http3_conn =
1067            Connection::new(config, conn.is_server, conn.dgram_enabled())?;
1068
1069        match http3_conn.send_settings(conn) {
1070            Ok(_) => (),
1071
1072            Err(e) => {
1073                conn.close(true, e.to_wire(), b"Error opening control stream")?;
1074                return Err(e);
1075            },
1076        };
1077
1078        // Try opening QPACK streams, but ignore errors if it fails since we
1079        // don't need them right now.
1080        http3_conn.open_qpack_encoder_stream(conn).ok();
1081        http3_conn.open_qpack_decoder_stream(conn).ok();
1082
1083        if conn.grease {
1084            // Try opening a GREASE stream, but ignore errors since it's not
1085            // critical.
1086            http3_conn.open_grease_stream(conn).ok();
1087        }
1088
1089        Ok(http3_conn)
1090    }
1091
1092    /// Sends an HTTP/3 request.
1093    ///
1094    /// The request is encoded from the provided list of headers without a
1095    /// body, and sent on a newly allocated stream. To include a body,
1096    /// set `fin` as `false` and subsequently call [`send_body()`] with the
1097    /// same `conn` and the `stream_id` returned from this method.
1098    ///
1099    /// On success the newly allocated stream ID is returned.
1100    ///
1101    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1102    /// doesn't have enough capacity for the operation to complete. When this
1103    /// happens the application should retry the operation once the stream is
1104    /// reported as writable again.
1105    ///
1106    /// [`send_body()`]: struct.Connection.html#method.send_body
1107    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1108    pub fn send_request<T: NameValue>(
1109        &mut self, conn: &mut super::Connection, headers: &[T], fin: bool,
1110    ) -> Result<u64> {
1111        // If we received a GOAWAY from the peer, MUST NOT initiate new
1112        // requests.
1113        if self.peer_goaway_id.is_some() {
1114            return Err(Error::FrameUnexpected);
1115        }
1116
1117        let stream_id = self.next_request_stream_id;
1118
1119        self.streams
1120            .insert(stream_id, stream::Stream::new(stream_id, true));
1121
1122        // The underlying QUIC stream does not exist yet, so calls to e.g.
1123        // stream_capacity() will fail. By writing a 0-length buffer, we force
1124        // the creation of the QUIC stream state, without actually writing
1125        // anything.
1126        if let Err(e) = conn.stream_send(stream_id, b"", false) {
1127            self.streams.remove(&stream_id);
1128
1129            if e == super::Error::Done {
1130                return Err(Error::StreamBlocked);
1131            }
1132
1133            return Err(e.into());
1134        };
1135
1136        self.send_headers(conn, stream_id, headers, fin)?;
1137
1138        // To avoid skipping stream IDs, we only calculate the next available
1139        // stream ID when a request has been successfully buffered.
1140        self.next_request_stream_id = self
1141            .next_request_stream_id
1142            .checked_add(4)
1143            .ok_or(Error::IdError)?;
1144
1145        Ok(stream_id)
1146    }
1147
1148    /// Sends an HTTP/3 response on the specified stream with default priority.
1149    ///
1150    /// This method sends the provided `headers` as a single initial response
1151    /// without a body.
1152    ///
1153    /// To send a non-final 1xx, then a final 200+ without body:
1154    ///   * send_response() with `fin` set to `false`.
1155    ///   * [`send_additional_headers()`] with fin set to `true` using the same
1156    ///     `stream_id` value.
1157    ///
1158    /// To send a non-final 1xx, then a final 200+ with body:
1159    ///   * send_response() with `fin` set to `false`.
1160    ///   * [`send_additional_headers()`] with fin set to `false` and same
1161    ///     `stream_id` value.
1162    ///   * [`send_body()`] with same `stream_id`.
1163    ///
1164    /// To send a final 200+ with body:
1165    ///   * send_response() with `fin` set to `false`.
1166    ///   * [`send_body()`] with same `stream_id`.
1167    ///
1168    /// Additional headers can only be sent during certain phases of an HTTP/3
1169    /// message exchange, see [Section 4.1 of RFC 9114]. The [`FrameUnexpected`]
1170    /// error is returned if this method, or [`send_response_with_priority()`],
1171    /// are called multiple times with the same `stream_id` value.
1172    ///
1173    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1174    /// doesn't have enough capacity for the operation to complete. When this
1175    /// happens the application should retry the operation once the stream is
1176    /// reported as writable again.
1177    ///
1178    /// [`send_body()`]: struct.Connection.html#method.send_body
1179    /// [`send_additional_headers()`]:
1180    ///     struct.Connection.html#method.send_additional_headers
1181    /// [`send_response_with_priority()`]:
1182    ///     struct.Connection.html#method.send_response_with_priority
1183    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
1184    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1185    pub fn send_response<T: NameValue>(
1186        &mut self, conn: &mut super::Connection, stream_id: u64, headers: &[T],
1187        fin: bool,
1188    ) -> Result<()> {
1189        let priority = Default::default();
1190
1191        self.send_response_with_priority(
1192            conn, stream_id, headers, &priority, fin,
1193        )?;
1194
1195        Ok(())
1196    }
1197
1198    /// Sends an HTTP/3 response on the specified stream with specified
1199    /// priority.
1200    ///
1201    /// The `priority` parameter represents [Extensible Priority]
1202    /// parameters. If the urgency is outside the range 0-7, it will be clamped
1203    /// to 7.
1204    ///
1205    /// This method sends the provided `headers` as a single initial response
1206    /// without a body.
1207    ///
1208    /// To send a non-final 1xx, then a final 200+ without body:
1209    ///   * send_response_with_priority() with `fin` set to `false`.
1210    ///   * [`send_additional_headers()`] with fin set to `true` using the same
1211    ///     `stream_id` value.
1212    ///
1213    /// To send a non-final 1xx, then a final 200+ with body:
1214    ///   * send_response_with_priority() with `fin` set to `false`.
1215    ///   * [`send_additional_headers()`] with fin set to `false` and same
1216    ///     `stream_id` value.
1217    ///   * [`send_body()`] with same `stream_id`.
1218    ///
1219    /// To send a final 200+ with body:
1220    ///   * send_response_with_priority() with `fin` set to `false`.
1221    ///   * [`send_body()`] with same `stream_id`.
1222    ///
1223    /// Additional headers can only be sent during certain phases of an HTTP/3
1224    /// message exchange, see [Section 4.1 of RFC 9114]. The [`FrameUnexpected`]
1225    /// error is returned if this method, or [`send_response()`],
1226    /// are called multiple times with the same `stream_id` value.
1227    ///
1228    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1229    /// doesn't have enough capacity for the operation to complete. When this
1230    /// happens the application should retry the operation once the stream is
1231    /// reported as writable again.
1232    ///
1233    /// [`send_body()`]: struct.Connection.html#method.send_body
1234    /// [`send_additional_headers()`]:
1235    ///     struct.Connection.html#method.send_additional_headers
1236    /// [`send_response()`]:
1237    ///     struct.Connection.html#method.send_response
1238    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
1239    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1240    /// [Extensible Priority]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
1241    pub fn send_response_with_priority<T: NameValue>(
1242        &mut self, conn: &mut super::Connection, stream_id: u64, headers: &[T],
1243        priority: &Priority, fin: bool,
1244    ) -> Result<()> {
1245        match self.streams.get(&stream_id) {
1246            Some(s) => {
1247                // Only one initial HEADERS allowed.
1248                if s.local_initialized() {
1249                    return Err(Error::FrameUnexpected);
1250                }
1251
1252                s
1253            },
1254
1255            None => return Err(Error::FrameUnexpected),
1256        };
1257
1258        self.send_headers(conn, stream_id, headers, fin)?;
1259
1260        // Clamp and shift urgency into quiche-priority space
1261        let urgency = priority
1262            .urgency
1263            .clamp(PRIORITY_URGENCY_LOWER_BOUND, PRIORITY_URGENCY_UPPER_BOUND) +
1264            PRIORITY_URGENCY_OFFSET;
1265
1266        conn.stream_priority(stream_id, urgency, priority.incremental)?;
1267
1268        Ok(())
1269    }
1270
1271    /// Sends additional HTTP/3 headers.
1272    ///
1273    /// After the initial request or response headers have been sent, using
1274    /// [`send_request()`] or [`send_response()`] respectively, this method can
1275    /// be used send an additional HEADERS frame. For example, to send a single
1276    /// instance of trailers after a request with a body, or to issue another
1277    /// non-final 1xx after a preceding 1xx, or to issue a final response after
1278    /// a preceding 1xx.
1279    ///
1280    /// Additional headers can only be sent during certain phases of an HTTP/3
1281    /// message exchange, see [Section 4.1 of RFC 9114]. The [`FrameUnexpected`]
1282    /// error is returned when this method is called during the wrong phase,
1283    /// such as before initial headers have been sent, or if trailers have
1284    /// already been sent.
1285    ///
1286    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1287    /// doesn't have enough capacity for the operation to complete. When this
1288    /// happens the application should retry the operation once the stream is
1289    /// reported as writable again.
1290    ///
1291    /// [`send_request()`]: struct.Connection.html#method.send_request
1292    /// [`send_response()`]: struct.Connection.html#method.send_response
1293    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1294    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
1295    /// [Section 4.1 of RFC 9114]:
1296    ///     https://www.rfc-editor.org/rfc/rfc9114.html#section-4.1.
1297    pub fn send_additional_headers<T: NameValue>(
1298        &mut self, conn: &mut super::Connection, stream_id: u64, headers: &[T],
1299        is_trailer_section: bool, fin: bool,
1300    ) -> Result<()> {
1301        // Clients can only send trailer headers.
1302        if !self.is_server && !is_trailer_section {
1303            return Err(Error::FrameUnexpected);
1304        }
1305
1306        match self.streams.get(&stream_id) {
1307            Some(s) => {
1308                // Initial HEADERS must have been sent.
1309                if !s.local_initialized() {
1310                    return Err(Error::FrameUnexpected);
1311                }
1312
1313                // Only one trailing HEADERS allowed.
1314                if s.trailers_sent() {
1315                    return Err(Error::FrameUnexpected);
1316                }
1317
1318                s
1319            },
1320
1321            None => return Err(Error::FrameUnexpected),
1322        };
1323
1324        self.send_headers(conn, stream_id, headers, fin)?;
1325
1326        if is_trailer_section {
1327            // send_headers() might have tidied the stream away, so we need to
1328            // check again.
1329            if let Some(s) = self.streams.get_mut(&stream_id) {
1330                s.mark_trailers_sent();
1331            }
1332        }
1333
1334        Ok(())
1335    }
1336
1337    fn encode_header_block<T: NameValue>(
1338        &mut self, headers: &[T],
1339    ) -> Result<Vec<u8>> {
1340        let headers_len = headers
1341            .iter()
1342            .fold(0, |acc, h| acc + h.value().len() + h.name().len() + 32);
1343
1344        let mut header_block = vec![0; headers_len];
1345        let len = self
1346            .qpack_encoder
1347            .encode(headers, &mut header_block)
1348            .map_err(|_| Error::InternalError)?;
1349
1350        header_block.truncate(len);
1351
1352        Ok(header_block)
1353    }
1354
1355    fn send_headers<T: NameValue>(
1356        &mut self, conn: &mut super::Connection, stream_id: u64, headers: &[T],
1357        fin: bool,
1358    ) -> Result<()> {
1359        let mut d = [42; 10];
1360        let mut b = octets::OctetsMut::with_slice(&mut d);
1361
1362        if !self.frames_greased && conn.grease {
1363            self.send_grease_frames(conn, stream_id)?;
1364            self.frames_greased = true;
1365        }
1366
1367        let header_block = self.encode_header_block(headers)?;
1368
1369        let overhead = octets::varint_len(frame::HEADERS_FRAME_TYPE_ID) +
1370            octets::varint_len(header_block.len() as u64);
1371
1372        // Headers need to be sent atomically, so make sure the stream has
1373        // enough capacity.
1374        match conn.stream_writable(stream_id, overhead + header_block.len()) {
1375            Ok(true) => (),
1376
1377            Ok(false) => return Err(Error::StreamBlocked),
1378
1379            Err(e) => {
1380                if conn.stream_finished(stream_id) {
1381                    self.streams.remove(&stream_id);
1382                }
1383
1384                return Err(e.into());
1385            },
1386        };
1387
1388        b.put_varint(frame::HEADERS_FRAME_TYPE_ID)?;
1389        b.put_varint(header_block.len() as u64)?;
1390        let off = b.off();
1391        conn.stream_send(stream_id, &d[..off], false)?;
1392
1393        // Sending header block separately avoids unnecessary copy.
1394        conn.stream_send(stream_id, &header_block, fin)?;
1395
1396        trace!(
1397            "{} tx frm HEADERS stream={} len={} fin={}",
1398            conn.trace_id(),
1399            stream_id,
1400            header_block.len(),
1401            fin
1402        );
1403
1404        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1405            let qlog_headers = headers
1406                .iter()
1407                .map(|h| qlog::events::h3::HttpHeader {
1408                    name: String::from_utf8_lossy(h.name()).into_owned(),
1409                    value: String::from_utf8_lossy(h.value()).into_owned(),
1410                })
1411                .collect();
1412
1413            let frame = Http3Frame::Headers {
1414                headers: qlog_headers,
1415            };
1416            let ev_data = EventData::H3FrameCreated(H3FrameCreated {
1417                stream_id,
1418                length: Some(header_block.len() as u64),
1419                frame,
1420                ..Default::default()
1421            });
1422
1423            q.add_event_data_now(ev_data).ok();
1424        });
1425
1426        if let Some(s) = self.streams.get_mut(&stream_id) {
1427            s.initialize_local();
1428        }
1429
1430        if fin && conn.stream_finished(stream_id) {
1431            self.streams.remove(&stream_id);
1432        }
1433
1434        Ok(())
1435    }
1436
1437    /// Sends an HTTP/3 body chunk on the given stream.
1438    ///
1439    /// On success the number of bytes written is returned, or [`Done`] if no
1440    /// bytes could be written (e.g. because the stream is blocked).
1441    ///
1442    /// Note that the number of written bytes returned can be lower than the
1443    /// length of the input buffer when the underlying QUIC stream doesn't have
1444    /// enough capacity for the operation to complete.
1445    ///
1446    /// When a partial write happens (including when [`Done`] is returned) the
1447    /// application should retry the operation once the stream is reported as
1448    /// writable again.
1449    ///
1450    /// [`Done`]: enum.Error.html#variant.Done
1451    pub fn send_body(
1452        &mut self, conn: &mut super::Connection, stream_id: u64, body: &[u8],
1453        fin: bool,
1454    ) -> Result<usize> {
1455        let mut d = [42; 10];
1456        let mut b = octets::OctetsMut::with_slice(&mut d);
1457
1458        // Validate that it is sane to send data on the stream.
1459        if stream_id % 4 != 0 {
1460            return Err(Error::FrameUnexpected);
1461        }
1462
1463        match self.streams.get_mut(&stream_id) {
1464            Some(s) => {
1465                if !s.local_initialized() {
1466                    return Err(Error::FrameUnexpected);
1467                }
1468
1469                if s.trailers_sent() {
1470                    return Err(Error::FrameUnexpected);
1471                }
1472            },
1473
1474            None => {
1475                return Err(Error::FrameUnexpected);
1476            },
1477        };
1478
1479        // Avoid sending 0-length DATA frames when the fin flag is false.
1480        if body.is_empty() && !fin {
1481            return Err(Error::Done);
1482        }
1483
1484        let overhead = octets::varint_len(frame::DATA_FRAME_TYPE_ID) +
1485            octets::varint_len(body.len() as u64);
1486
1487        let stream_cap = match conn.stream_capacity(stream_id) {
1488            Ok(v) => v,
1489
1490            Err(e) => {
1491                if conn.stream_finished(stream_id) {
1492                    self.streams.remove(&stream_id);
1493                }
1494
1495                return Err(e.into());
1496            },
1497        };
1498
1499        // Make sure there is enough capacity to send the DATA frame header.
1500        if stream_cap < overhead {
1501            let _ = conn.stream_writable(stream_id, overhead + 1);
1502            return Err(Error::Done);
1503        }
1504
1505        // Cap the frame payload length to the stream's capacity.
1506        let body_len = std::cmp::min(body.len(), stream_cap - overhead);
1507
1508        // If we can't send the entire body, set the fin flag to false so the
1509        // application can try again later.
1510        let fin = if body_len != body.len() { false } else { fin };
1511
1512        // Again, avoid sending 0-length DATA frames when the fin flag is false.
1513        if body_len == 0 && !fin {
1514            let _ = conn.stream_writable(stream_id, overhead + 1);
1515            return Err(Error::Done);
1516        }
1517
1518        b.put_varint(frame::DATA_FRAME_TYPE_ID)?;
1519        b.put_varint(body_len as u64)?;
1520        let off = b.off();
1521        conn.stream_send(stream_id, &d[..off], false)?;
1522
1523        // Return how many bytes were written, excluding the frame header.
1524        // Sending body separately avoids unnecessary copy.
1525        let written = conn.stream_send(stream_id, &body[..body_len], fin)?;
1526
1527        trace!(
1528            "{} tx frm DATA stream={} len={} fin={}",
1529            conn.trace_id(),
1530            stream_id,
1531            written,
1532            fin
1533        );
1534
1535        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1536            let frame = Http3Frame::Data { raw: None };
1537            let ev_data = EventData::H3FrameCreated(H3FrameCreated {
1538                stream_id,
1539                length: Some(written as u64),
1540                frame,
1541                ..Default::default()
1542            });
1543
1544            q.add_event_data_now(ev_data).ok();
1545        });
1546
1547        if written < body.len() {
1548            // Ensure the peer is notified that the connection or stream is
1549            // blocked when the stream's capacity is limited by flow control.
1550            //
1551            // We only need enough capacity to send a few bytes, to make sure
1552            // the stream doesn't hang due to congestion window not growing
1553            // enough.
1554            let _ = conn.stream_writable(stream_id, overhead + 1);
1555        }
1556
1557        if fin && written == body.len() && conn.stream_finished(stream_id) {
1558            self.streams.remove(&stream_id);
1559        }
1560
1561        Ok(written)
1562    }
1563
1564    /// Returns whether the peer enabled HTTP/3 DATAGRAM frame support.
1565    ///
1566    /// Support is signalled by the peer's SETTINGS, so this method always
1567    /// returns false until they have been processed using the [`poll()`]
1568    /// method.
1569    ///
1570    /// [`poll()`]: struct.Connection.html#method.poll
1571    pub fn dgram_enabled_by_peer(&self, conn: &super::Connection) -> bool {
1572        self.peer_settings.h3_datagram == Some(1) &&
1573            conn.dgram_max_writable_len().is_some()
1574    }
1575
1576    /// Returns whether the peer enabled extended CONNECT support.
1577    ///
1578    /// Support is signalled by the peer's SETTINGS, so this method always
1579    /// returns false until they have been processed using the [`poll()`]
1580    /// method.
1581    ///
1582    /// [`poll()`]: struct.Connection.html#method.poll
1583    pub fn extended_connect_enabled_by_peer(&self) -> bool {
1584        self.peer_settings.connect_protocol_enabled == Some(1)
1585    }
1586
1587    /// Reads request or response body data into the provided buffer.
1588    ///
1589    /// Applications should call this method whenever the [`poll()`] method
1590    /// returns a [`Data`] event.
1591    ///
1592    /// On success the amount of bytes read is returned, or [`Done`] if there
1593    /// is no data to read.
1594    ///
1595    /// [`poll()`]: struct.Connection.html#method.poll
1596    /// [`Data`]: enum.Event.html#variant.Data
1597    /// [`Done`]: enum.Error.html#variant.Done
1598    pub fn recv_body(
1599        &mut self, conn: &mut super::Connection, stream_id: u64, out: &mut [u8],
1600    ) -> Result<usize> {
1601        let mut total = 0;
1602
1603        // Try to consume all buffered data for the stream, even across multiple
1604        // DATA frames.
1605        while total < out.len() {
1606            let stream = self.streams.get_mut(&stream_id).ok_or(Error::Done)?;
1607
1608            if stream.state() != stream::State::Data {
1609                break;
1610            }
1611
1612            let (read, fin) =
1613                match stream.try_consume_data(conn, &mut out[total..]) {
1614                    Ok(v) => v,
1615
1616                    Err(Error::Done) => break,
1617
1618                    Err(e) => return Err(e),
1619                };
1620
1621            total += read;
1622
1623            // No more data to read, we are done.
1624            if read == 0 || fin {
1625                break;
1626            }
1627
1628            // Process incoming data from the stream. For example, if a whole
1629            // DATA frame was consumed, and another one is queued behind it,
1630            // this will ensure the additional data will also be returned to
1631            // the application.
1632            match self.process_readable_stream(conn, stream_id, false) {
1633                Ok(_) => unreachable!(),
1634
1635                Err(Error::Done) => (),
1636
1637                Err(e) => return Err(e),
1638            };
1639
1640            if conn.stream_finished(stream_id) {
1641                break;
1642            }
1643        }
1644
1645        // While body is being received, the stream is marked as finished only
1646        // when all data is read by the application.
1647        if conn.stream_finished(stream_id) {
1648            self.process_finished_stream(stream_id);
1649        }
1650
1651        if total == 0 {
1652            return Err(Error::Done);
1653        }
1654
1655        Ok(total)
1656    }
1657
1658    /// Sends a PRIORITY_UPDATE frame on the control stream with specified
1659    /// request stream ID and priority.
1660    ///
1661    /// The `priority` parameter represents [Extensible Priority]
1662    /// parameters. If the urgency is outside the range 0-7, it will be clamped
1663    /// to 7.
1664    ///
1665    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1666    /// doesn't have enough capacity for the operation to complete. When this
1667    /// happens the application should retry the operation once the stream is
1668    /// reported as writable again.
1669    ///
1670    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1671    /// [Extensible Priority]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
1672    pub fn send_priority_update_for_request(
1673        &mut self, conn: &mut super::Connection, stream_id: u64,
1674        priority: &Priority,
1675    ) -> Result<()> {
1676        let mut d = [42; 20];
1677        let mut b = octets::OctetsMut::with_slice(&mut d);
1678
1679        // Validate that it is sane to send PRIORITY_UPDATE.
1680        if self.is_server {
1681            return Err(Error::FrameUnexpected);
1682        }
1683
1684        if stream_id % 4 != 0 {
1685            return Err(Error::FrameUnexpected);
1686        }
1687
1688        let control_stream_id =
1689            self.control_stream_id.ok_or(Error::FrameUnexpected)?;
1690
1691        let urgency = priority
1692            .urgency
1693            .clamp(PRIORITY_URGENCY_LOWER_BOUND, PRIORITY_URGENCY_UPPER_BOUND);
1694
1695        let mut field_value = format!("u={urgency}");
1696
1697        if priority.incremental {
1698            field_value.push_str(",i");
1699        }
1700
1701        let priority_field_value = field_value.as_bytes();
1702        let frame_payload_len =
1703            octets::varint_len(stream_id) + priority_field_value.len();
1704
1705        let overhead =
1706            octets::varint_len(frame::PRIORITY_UPDATE_FRAME_REQUEST_TYPE_ID) +
1707                octets::varint_len(stream_id) +
1708                octets::varint_len(frame_payload_len as u64);
1709
1710        // Make sure the control stream has enough capacity.
1711        match conn.stream_writable(
1712            control_stream_id,
1713            overhead + priority_field_value.len(),
1714        ) {
1715            Ok(true) => (),
1716
1717            Ok(false) => return Err(Error::StreamBlocked),
1718
1719            Err(e) => {
1720                return Err(e.into());
1721            },
1722        }
1723
1724        b.put_varint(frame::PRIORITY_UPDATE_FRAME_REQUEST_TYPE_ID)?;
1725        b.put_varint(frame_payload_len as u64)?;
1726        b.put_varint(stream_id)?;
1727        let off = b.off();
1728        conn.stream_send(control_stream_id, &d[..off], false)?;
1729
1730        // Sending field value separately avoids unnecessary copy.
1731        conn.stream_send(control_stream_id, priority_field_value, false)?;
1732
1733        trace!(
1734            "{} tx frm PRIORITY_UPDATE request_stream={} priority_field_value={}",
1735            conn.trace_id(),
1736            stream_id,
1737            field_value,
1738        );
1739
1740        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1741            let frame = Http3Frame::PriorityUpdate {
1742                target_stream_type: H3PriorityTargetStreamType::Request,
1743                prioritized_element_id: stream_id,
1744                priority_field_value: field_value.clone(),
1745            };
1746
1747            let ev_data = EventData::H3FrameCreated(H3FrameCreated {
1748                stream_id,
1749                length: Some(priority_field_value.len() as u64),
1750                frame,
1751                ..Default::default()
1752            });
1753
1754            q.add_event_data_now(ev_data).ok();
1755        });
1756
1757        Ok(())
1758    }
1759
1760    /// Take the last PRIORITY_UPDATE for a prioritized element ID.
1761    ///
1762    /// When the [`poll()`] method returns a [`PriorityUpdate`] event for a
1763    /// prioritized element, the event has triggered and will not rearm until
1764    /// applications call this method. It is recommended that applications defer
1765    /// taking the PRIORITY_UPDATE until after [`poll()`] returns [`Done`].
1766    ///
1767    /// On success the Priority Field Value is returned, or [`Done`] if there is
1768    /// no PRIORITY_UPDATE to read (either because there is no value to take, or
1769    /// because the prioritized element does not exist).
1770    ///
1771    /// [`poll()`]: struct.Connection.html#method.poll
1772    /// [`PriorityUpdate`]: enum.Event.html#variant.PriorityUpdate
1773    /// [`Done`]: enum.Error.html#variant.Done
1774    pub fn take_last_priority_update(
1775        &mut self, prioritized_element_id: u64,
1776    ) -> Result<Vec<u8>> {
1777        if let Some(stream) = self.streams.get_mut(&prioritized_element_id) {
1778            return stream.take_last_priority_update().ok_or(Error::Done);
1779        }
1780
1781        Err(Error::Done)
1782    }
1783
1784    /// Processes HTTP/3 data received from the peer.
1785    ///
1786    /// On success it returns an [`Event`] and an ID, or [`Done`] when there are
1787    /// no events to report.
1788    ///
1789    /// Note that all events are edge-triggered, meaning that once reported they
1790    /// will not be reported again by calling this method again, until the event
1791    /// is re-armed.
1792    ///
1793    /// The events [`Headers`], [`Data`] and [`Finished`] return a stream ID,
1794    /// which is used in methods [`recv_body()`], [`send_response()`] or
1795    /// [`send_body()`].
1796    ///
1797    /// The event [`GoAway`] returns an ID that depends on the connection role.
1798    /// A client receives the largest processed stream ID. A server receives the
1799    /// the largest permitted push ID.
1800    ///
1801    /// The event [`PriorityUpdate`] only occurs at servers. It returns a
1802    /// prioritized element ID that is used in the method
1803    /// [`take_last_priority_update()`], which rearms the event for that ID.
1804    ///
1805    /// If an error occurs while processing data, the connection is closed with
1806    /// the appropriate error code, using the transport's [`close()`] method.
1807    ///
1808    /// [`Event`]: enum.Event.html
1809    /// [`Done`]: enum.Error.html#variant.Done
1810    /// [`Headers`]: enum.Event.html#variant.Headers
1811    /// [`Data`]: enum.Event.html#variant.Data
1812    /// [`Finished`]: enum.Event.html#variant.Finished
1813    /// [`GoAway`]: enum.Event.html#variant.GoAWay
1814    /// [`PriorityUpdate`]: enum.Event.html#variant.PriorityUpdate
1815    /// [`recv_body()`]: struct.Connection.html#method.recv_body
1816    /// [`send_response()`]: struct.Connection.html#method.send_response
1817    /// [`send_body()`]: struct.Connection.html#method.send_body
1818    /// [`recv_dgram()`]: struct.Connection.html#method.recv_dgram
1819    /// [`take_last_priority_update()`]: struct.Connection.html#method.take_last_priority_update
1820    /// [`close()`]: ../struct.Connection.html#method.close
1821    pub fn poll(&mut self, conn: &mut super::Connection) -> Result<(u64, Event)> {
1822        // When connection close is initiated by the local application (e.g. due
1823        // to a protocol error), the connection itself might be in a broken
1824        // state, so return early.
1825        if conn.local_error.is_some() {
1826            return Err(Error::Done);
1827        }
1828
1829        // Process control streams first.
1830        if let Some(stream_id) = self.peer_control_stream_id {
1831            match self.process_control_stream(conn, stream_id) {
1832                Ok(ev) => return Ok(ev),
1833
1834                Err(Error::Done) => (),
1835
1836                Err(e) => return Err(e),
1837            };
1838        }
1839
1840        if let Some(stream_id) = self.peer_qpack_streams.encoder_stream_id {
1841            match self.process_control_stream(conn, stream_id) {
1842                Ok(ev) => return Ok(ev),
1843
1844                Err(Error::Done) => (),
1845
1846                Err(e) => return Err(e),
1847            };
1848        }
1849
1850        if let Some(stream_id) = self.peer_qpack_streams.decoder_stream_id {
1851            match self.process_control_stream(conn, stream_id) {
1852                Ok(ev) => return Ok(ev),
1853
1854                Err(Error::Done) => (),
1855
1856                Err(e) => return Err(e),
1857            };
1858        }
1859
1860        // Process finished streams list.
1861        if let Some(finished) = self.finished_streams.pop_front() {
1862            return Ok((finished, Event::Finished));
1863        }
1864
1865        // Process HTTP/3 data from readable streams.
1866        for s in conn.readable() {
1867            trace!("{} stream id {} is readable", conn.trace_id(), s);
1868
1869            let ev = match self.process_readable_stream(conn, s, true) {
1870                Ok(v) => Some(v),
1871
1872                Err(Error::Done) => None,
1873
1874                // Return early if the stream was reset, to avoid returning
1875                // a Finished event later as well.
1876                Err(Error::TransportError(crate::Error::StreamReset(e))) =>
1877                    return Ok((s, Event::Reset(e))),
1878
1879                Err(e) => return Err(e),
1880            };
1881
1882            if conn.stream_finished(s) {
1883                self.process_finished_stream(s);
1884            }
1885
1886            // TODO: check if stream is completed so it can be freed
1887            if let Some(ev) = ev {
1888                return Ok(ev);
1889            }
1890        }
1891
1892        // Process finished streams list once again, to make sure `Finished`
1893        // events are returned when receiving empty stream frames with the fin
1894        // flag set.
1895        if let Some(finished) = self.finished_streams.pop_front() {
1896            if conn.stream_readable(finished) {
1897                // The stream is finished, but is still readable, it may
1898                // indicate that there is a pending error, such as reset.
1899                if let Err(crate::Error::StreamReset(e)) =
1900                    conn.stream_recv(finished, &mut [])
1901                {
1902                    return Ok((finished, Event::Reset(e)));
1903                }
1904            }
1905            return Ok((finished, Event::Finished));
1906        }
1907
1908        Err(Error::Done)
1909    }
1910
1911    /// Sends a GOAWAY frame to initiate graceful connection closure.
1912    ///
1913    /// When quiche is used in the server role, the `id` parameter is the stream
1914    /// ID of the highest processed request. This can be any valid ID between 0
1915    /// and 2^62-4. However, the ID cannot be increased. Failure to satisfy
1916    /// these conditions will return an error.
1917    ///
1918    /// This method does not close the QUIC connection. Applications are
1919    /// required to call [`close()`] themselves.
1920    ///
1921    /// [`close()`]: ../struct.Connection.html#method.close
1922    pub fn send_goaway(
1923        &mut self, conn: &mut super::Connection, id: u64,
1924    ) -> Result<()> {
1925        let mut id = id;
1926
1927        // TODO: server push
1928        //
1929        // In the meantime always send 0 from client.
1930        if !self.is_server {
1931            id = 0;
1932        }
1933
1934        if self.is_server && id % 4 != 0 {
1935            return Err(Error::IdError);
1936        }
1937
1938        if let Some(sent_id) = self.local_goaway_id {
1939            if id > sent_id {
1940                return Err(Error::IdError);
1941            }
1942        }
1943
1944        if let Some(stream_id) = self.control_stream_id {
1945            let mut d = [42; 10];
1946            let mut b = octets::OctetsMut::with_slice(&mut d);
1947
1948            let frame = frame::Frame::GoAway { id };
1949
1950            let wire_len = frame.to_bytes(&mut b)?;
1951            let stream_cap = conn.stream_capacity(stream_id)?;
1952
1953            if stream_cap < wire_len {
1954                return Err(Error::StreamBlocked);
1955            }
1956
1957            trace!("{} tx frm {:?}", conn.trace_id(), frame);
1958
1959            qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1960                let ev_data = EventData::H3FrameCreated(H3FrameCreated {
1961                    stream_id,
1962                    length: Some(octets::varint_len(id) as u64),
1963                    frame: frame.to_qlog(),
1964                    ..Default::default()
1965                });
1966
1967                q.add_event_data_now(ev_data).ok();
1968            });
1969
1970            let off = b.off();
1971            conn.stream_send(stream_id, &d[..off], false)?;
1972
1973            self.local_goaway_id = Some(id);
1974        }
1975
1976        Ok(())
1977    }
1978
1979    /// Gets the raw settings from peer including unknown and reserved types.
1980    ///
1981    /// The order of settings is the same as received in the SETTINGS frame.
1982    pub fn peer_settings_raw(&self) -> Option<&[(u64, u64)]> {
1983        self.peer_settings.raw.as_deref()
1984    }
1985
1986    fn open_uni_stream(
1987        &mut self, conn: &mut super::Connection, ty: u64,
1988    ) -> Result<u64> {
1989        let stream_id = self.next_uni_stream_id;
1990
1991        let mut d = [0; 8];
1992        let mut b = octets::OctetsMut::with_slice(&mut d);
1993
1994        match ty {
1995            // Control and QPACK streams are the most important to schedule.
1996            stream::HTTP3_CONTROL_STREAM_TYPE_ID |
1997            stream::QPACK_ENCODER_STREAM_TYPE_ID |
1998            stream::QPACK_DECODER_STREAM_TYPE_ID => {
1999                conn.stream_priority(stream_id, 0, false)?;
2000            },
2001
2002            // TODO: Server push
2003            stream::HTTP3_PUSH_STREAM_TYPE_ID => (),
2004
2005            // Anything else is a GREASE stream, so make it the least important.
2006            _ => {
2007                conn.stream_priority(stream_id, 255, false)?;
2008            },
2009        }
2010
2011        conn.stream_send(stream_id, b.put_varint(ty)?, false)?;
2012
2013        // To avoid skipping stream IDs, we only calculate the next available
2014        // stream ID when data has been successfully buffered.
2015        self.next_uni_stream_id = self
2016            .next_uni_stream_id
2017            .checked_add(4)
2018            .ok_or(Error::IdError)?;
2019
2020        Ok(stream_id)
2021    }
2022
2023    fn open_qpack_encoder_stream(
2024        &mut self, conn: &mut super::Connection,
2025    ) -> Result<()> {
2026        let stream_id =
2027            self.open_uni_stream(conn, stream::QPACK_ENCODER_STREAM_TYPE_ID)?;
2028
2029        self.local_qpack_streams.encoder_stream_id = Some(stream_id);
2030
2031        qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2032            let ev_data = EventData::H3StreamTypeSet(H3StreamTypeSet {
2033                stream_id,
2034                owner: Some(H3Owner::Local),
2035                stream_type: H3StreamType::QpackEncode,
2036                ..Default::default()
2037            });
2038
2039            q.add_event_data_now(ev_data).ok();
2040        });
2041
2042        Ok(())
2043    }
2044
2045    fn open_qpack_decoder_stream(
2046        &mut self, conn: &mut super::Connection,
2047    ) -> Result<()> {
2048        let stream_id =
2049            self.open_uni_stream(conn, stream::QPACK_DECODER_STREAM_TYPE_ID)?;
2050
2051        self.local_qpack_streams.decoder_stream_id = Some(stream_id);
2052
2053        qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2054            let ev_data = EventData::H3StreamTypeSet(H3StreamTypeSet {
2055                stream_id,
2056                owner: Some(H3Owner::Local),
2057                stream_type: H3StreamType::QpackDecode,
2058                ..Default::default()
2059            });
2060
2061            q.add_event_data_now(ev_data).ok();
2062        });
2063
2064        Ok(())
2065    }
2066
2067    /// Send GREASE frames on the provided stream ID.
2068    fn send_grease_frames(
2069        &mut self, conn: &mut super::Connection, stream_id: u64,
2070    ) -> Result<()> {
2071        let mut d = [0; 8];
2072
2073        let stream_cap = match conn.stream_capacity(stream_id) {
2074            Ok(v) => v,
2075
2076            Err(e) => {
2077                if conn.stream_finished(stream_id) {
2078                    self.streams.remove(&stream_id);
2079                }
2080
2081                return Err(e.into());
2082            },
2083        };
2084
2085        let grease_frame1 = grease_value();
2086        let grease_frame2 = grease_value();
2087        let grease_payload = b"GREASE is the word";
2088
2089        let overhead = octets::varint_len(grease_frame1) + // frame type
2090            1 + // payload len
2091            octets::varint_len(grease_frame2) + // frame type
2092            1 + // payload len
2093            grease_payload.len(); // payload
2094
2095        // Don't send GREASE if there is not enough capacity for it. Greasing
2096        // will _not_ be attempted again later on.
2097        if stream_cap < overhead {
2098            return Ok(());
2099        }
2100
2101        // Empty GREASE frame.
2102        let mut b = octets::OctetsMut::with_slice(&mut d);
2103        conn.stream_send(stream_id, b.put_varint(grease_frame1)?, false)?;
2104
2105        let mut b = octets::OctetsMut::with_slice(&mut d);
2106        conn.stream_send(stream_id, b.put_varint(0)?, false)?;
2107
2108        trace!(
2109            "{} tx frm GREASE stream={} len=0",
2110            conn.trace_id(),
2111            stream_id
2112        );
2113
2114        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2115            let frame = Http3Frame::Reserved { length: Some(0) };
2116            let ev_data = EventData::H3FrameCreated(H3FrameCreated {
2117                stream_id,
2118                length: Some(0),
2119                frame,
2120                ..Default::default()
2121            });
2122
2123            q.add_event_data_now(ev_data).ok();
2124        });
2125
2126        // GREASE frame with payload.
2127        let mut b = octets::OctetsMut::with_slice(&mut d);
2128        conn.stream_send(stream_id, b.put_varint(grease_frame2)?, false)?;
2129
2130        let mut b = octets::OctetsMut::with_slice(&mut d);
2131        conn.stream_send(stream_id, b.put_varint(18)?, false)?;
2132
2133        conn.stream_send(stream_id, grease_payload, false)?;
2134
2135        trace!(
2136            "{} tx frm GREASE stream={} len={}",
2137            conn.trace_id(),
2138            stream_id,
2139            grease_payload.len()
2140        );
2141
2142        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2143            let frame = Http3Frame::Reserved {
2144                length: Some(grease_payload.len() as u64),
2145            };
2146            let ev_data = EventData::H3FrameCreated(H3FrameCreated {
2147                stream_id,
2148                length: Some(grease_payload.len() as u64),
2149                frame,
2150                ..Default::default()
2151            });
2152
2153            q.add_event_data_now(ev_data).ok();
2154        });
2155
2156        Ok(())
2157    }
2158
2159    /// Opens a new unidirectional stream with a GREASE type and sends some
2160    /// unframed payload.
2161    fn open_grease_stream(&mut self, conn: &mut super::Connection) -> Result<()> {
2162        let ty = grease_value();
2163        match self.open_uni_stream(conn, ty) {
2164            Ok(stream_id) => {
2165                conn.stream_send(stream_id, b"GREASE is the word", true)?;
2166
2167                trace!("{} open GREASE stream {}", conn.trace_id(), stream_id);
2168
2169                qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2170                    let ev_data = EventData::H3StreamTypeSet(H3StreamTypeSet {
2171                        stream_id,
2172                        owner: Some(H3Owner::Local),
2173                        stream_type: H3StreamType::Unknown,
2174                        stream_type_value: Some(ty),
2175                        ..Default::default()
2176                    });
2177
2178                    q.add_event_data_now(ev_data).ok();
2179                });
2180            },
2181
2182            Err(Error::IdError) => {
2183                trace!("{} GREASE stream blocked", conn.trace_id(),);
2184
2185                return Ok(());
2186            },
2187
2188            Err(e) => return Err(e),
2189        };
2190
2191        Ok(())
2192    }
2193
2194    /// Sends SETTINGS frame based on HTTP/3 configuration.
2195    fn send_settings(&mut self, conn: &mut super::Connection) -> Result<()> {
2196        let stream_id = match self
2197            .open_uni_stream(conn, stream::HTTP3_CONTROL_STREAM_TYPE_ID)
2198        {
2199            Ok(v) => v,
2200
2201            Err(e) => {
2202                trace!("{} Control stream blocked", conn.trace_id(),);
2203
2204                if e == Error::Done {
2205                    return Err(Error::InternalError);
2206                }
2207
2208                return Err(e);
2209            },
2210        };
2211
2212        self.control_stream_id = Some(stream_id);
2213
2214        qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2215            let ev_data = EventData::H3StreamTypeSet(H3StreamTypeSet {
2216                stream_id,
2217                owner: Some(H3Owner::Local),
2218                stream_type: H3StreamType::Control,
2219                ..Default::default()
2220            });
2221
2222            q.add_event_data_now(ev_data).ok();
2223        });
2224
2225        let grease = if conn.grease {
2226            Some((grease_value(), grease_value()))
2227        } else {
2228            None
2229        };
2230
2231        let frame = frame::Frame::Settings {
2232            max_field_section_size: self.local_settings.max_field_section_size,
2233            qpack_max_table_capacity: self
2234                .local_settings
2235                .qpack_max_table_capacity,
2236            qpack_blocked_streams: self.local_settings.qpack_blocked_streams,
2237            connect_protocol_enabled: self
2238                .local_settings
2239                .connect_protocol_enabled,
2240            h3_datagram: self.local_settings.h3_datagram,
2241            grease,
2242            additional_settings: self.local_settings.additional_settings.clone(),
2243            raw: Default::default(),
2244        };
2245
2246        let mut d = [42; 128];
2247        let mut b = octets::OctetsMut::with_slice(&mut d);
2248
2249        frame.to_bytes(&mut b)?;
2250
2251        let off = b.off();
2252
2253        if let Some(id) = self.control_stream_id {
2254            conn.stream_send(id, &d[..off], false)?;
2255
2256            trace!(
2257                "{} tx frm SETTINGS stream={} len={}",
2258                conn.trace_id(),
2259                id,
2260                off
2261            );
2262
2263            qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2264                let frame = frame.to_qlog();
2265                let ev_data = EventData::H3FrameCreated(H3FrameCreated {
2266                    stream_id: id,
2267                    length: Some(off as u64),
2268                    frame,
2269                    ..Default::default()
2270                });
2271
2272                q.add_event_data_now(ev_data).ok();
2273            });
2274        }
2275
2276        Ok(())
2277    }
2278
2279    fn process_control_stream(
2280        &mut self, conn: &mut super::Connection, stream_id: u64,
2281    ) -> Result<(u64, Event)> {
2282        close_conn_if_critical_stream_finished(conn, stream_id)?;
2283
2284        if !conn.stream_readable(stream_id) {
2285            return Err(Error::Done);
2286        }
2287
2288        match self.process_readable_stream(conn, stream_id, true) {
2289            Ok(ev) => return Ok(ev),
2290
2291            Err(Error::Done) => (),
2292
2293            Err(e) => return Err(e),
2294        };
2295
2296        close_conn_if_critical_stream_finished(conn, stream_id)?;
2297
2298        Err(Error::Done)
2299    }
2300
2301    fn process_readable_stream(
2302        &mut self, conn: &mut super::Connection, stream_id: u64, polling: bool,
2303    ) -> Result<(u64, Event)> {
2304        self.streams
2305            .entry(stream_id)
2306            .or_insert_with(|| stream::Stream::new(stream_id, false));
2307
2308        // We need to get a fresh reference to the stream for each
2309        // iteration, to avoid borrowing `self` for the entire duration
2310        // of the loop, because we'll need to borrow it again in the
2311        // `State::FramePayload` case below.
2312        while let Some(stream) = self.streams.get_mut(&stream_id) {
2313            match stream.state() {
2314                stream::State::StreamType => {
2315                    stream.try_fill_buffer(conn)?;
2316
2317                    let varint = match stream.try_consume_varint() {
2318                        Ok(v) => v,
2319
2320                        Err(_) => continue,
2321                    };
2322
2323                    let ty = stream::Type::deserialize(varint)?;
2324
2325                    if let Err(e) = stream.set_ty(ty) {
2326                        conn.close(true, e.to_wire(), b"")?;
2327                        return Err(e);
2328                    }
2329
2330                    qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2331                        let ty_val = if matches!(ty, stream::Type::Unknown) {
2332                            Some(varint)
2333                        } else {
2334                            None
2335                        };
2336
2337                        let ev_data =
2338                            EventData::H3StreamTypeSet(H3StreamTypeSet {
2339                                stream_id,
2340                                owner: Some(H3Owner::Remote),
2341                                stream_type: ty.to_qlog(),
2342                                stream_type_value: ty_val,
2343                                ..Default::default()
2344                            });
2345
2346                        q.add_event_data_now(ev_data).ok();
2347                    });
2348
2349                    match &ty {
2350                        stream::Type::Control => {
2351                            // Only one control stream allowed.
2352                            if self.peer_control_stream_id.is_some() {
2353                                conn.close(
2354                                    true,
2355                                    Error::StreamCreationError.to_wire(),
2356                                    b"Received multiple control streams",
2357                                )?;
2358
2359                                return Err(Error::StreamCreationError);
2360                            }
2361
2362                            trace!(
2363                                "{} open peer's control stream {}",
2364                                conn.trace_id(),
2365                                stream_id
2366                            );
2367
2368                            close_conn_if_critical_stream_finished(
2369                                conn, stream_id,
2370                            )?;
2371
2372                            self.peer_control_stream_id = Some(stream_id);
2373                        },
2374
2375                        stream::Type::Push => {
2376                            // Only clients can receive push stream.
2377                            if self.is_server {
2378                                conn.close(
2379                                    true,
2380                                    Error::StreamCreationError.to_wire(),
2381                                    b"Server received push stream.",
2382                                )?;
2383
2384                                return Err(Error::StreamCreationError);
2385                            }
2386                        },
2387
2388                        stream::Type::QpackEncoder => {
2389                            // Only one qpack encoder stream allowed.
2390                            if self.peer_qpack_streams.encoder_stream_id.is_some()
2391                            {
2392                                conn.close(
2393                                    true,
2394                                    Error::StreamCreationError.to_wire(),
2395                                    b"Received multiple QPACK encoder streams",
2396                                )?;
2397
2398                                return Err(Error::StreamCreationError);
2399                            }
2400
2401                            close_conn_if_critical_stream_finished(
2402                                conn, stream_id,
2403                            )?;
2404
2405                            self.peer_qpack_streams.encoder_stream_id =
2406                                Some(stream_id);
2407                        },
2408
2409                        stream::Type::QpackDecoder => {
2410                            // Only one qpack decoder allowed.
2411                            if self.peer_qpack_streams.decoder_stream_id.is_some()
2412                            {
2413                                conn.close(
2414                                    true,
2415                                    Error::StreamCreationError.to_wire(),
2416                                    b"Received multiple QPACK decoder streams",
2417                                )?;
2418
2419                                return Err(Error::StreamCreationError);
2420                            }
2421
2422                            close_conn_if_critical_stream_finished(
2423                                conn, stream_id,
2424                            )?;
2425
2426                            self.peer_qpack_streams.decoder_stream_id =
2427                                Some(stream_id);
2428                        },
2429
2430                        stream::Type::Unknown => {
2431                            // Unknown stream types are ignored.
2432                            // TODO: we MAY send STOP_SENDING
2433                        },
2434
2435                        stream::Type::Request => unreachable!(),
2436                    }
2437                },
2438
2439                stream::State::PushId => {
2440                    stream.try_fill_buffer(conn)?;
2441
2442                    let varint = match stream.try_consume_varint() {
2443                        Ok(v) => v,
2444
2445                        Err(_) => continue,
2446                    };
2447
2448                    if let Err(e) = stream.set_push_id(varint) {
2449                        conn.close(true, e.to_wire(), b"")?;
2450                        return Err(e);
2451                    }
2452                },
2453
2454                stream::State::FrameType => {
2455                    stream.try_fill_buffer(conn)?;
2456
2457                    let varint = match stream.try_consume_varint() {
2458                        Ok(v) => v,
2459
2460                        Err(_) => continue,
2461                    };
2462
2463                    match stream.set_frame_type(varint) {
2464                        Err(Error::FrameUnexpected) => {
2465                            let msg = format!("Unexpected frame type {varint}");
2466
2467                            conn.close(
2468                                true,
2469                                Error::FrameUnexpected.to_wire(),
2470                                msg.as_bytes(),
2471                            )?;
2472
2473                            return Err(Error::FrameUnexpected);
2474                        },
2475
2476                        Err(e) => {
2477                            conn.close(
2478                                true,
2479                                e.to_wire(),
2480                                b"Error handling frame.",
2481                            )?;
2482
2483                            return Err(e);
2484                        },
2485
2486                        _ => (),
2487                    }
2488                },
2489
2490                stream::State::FramePayloadLen => {
2491                    stream.try_fill_buffer(conn)?;
2492
2493                    let payload_len = match stream.try_consume_varint() {
2494                        Ok(v) => v,
2495
2496                        Err(_) => continue,
2497                    };
2498
2499                    // DATA frames are handled uniquely. After this point we lose
2500                    // visibility of DATA framing, so just log here.
2501                    if Some(frame::DATA_FRAME_TYPE_ID) == stream.frame_type() {
2502                        trace!(
2503                            "{} rx frm DATA stream={} wire_payload_len={}",
2504                            conn.trace_id(),
2505                            stream_id,
2506                            payload_len
2507                        );
2508
2509                        qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
2510                            let frame = Http3Frame::Data { raw: None };
2511
2512                            let ev_data =
2513                                EventData::H3FrameParsed(H3FrameParsed {
2514                                    stream_id,
2515                                    length: Some(payload_len),
2516                                    frame,
2517                                    ..Default::default()
2518                                });
2519
2520                            q.add_event_data_now(ev_data).ok();
2521                        });
2522                    }
2523
2524                    if let Err(e) = stream.set_frame_payload_len(payload_len) {
2525                        conn.close(true, e.to_wire(), b"")?;
2526                        return Err(e);
2527                    }
2528                },
2529
2530                stream::State::FramePayload => {
2531                    // Do not emit events when not polling.
2532                    if !polling {
2533                        break;
2534                    }
2535
2536                    stream.try_fill_buffer(conn)?;
2537
2538                    let (frame, payload_len) = match stream.try_consume_frame() {
2539                        Ok(frame) => frame,
2540
2541                        Err(Error::Done) => return Err(Error::Done),
2542
2543                        Err(e) => {
2544                            conn.close(
2545                                true,
2546                                e.to_wire(),
2547                                b"Error handling frame.",
2548                            )?;
2549
2550                            return Err(e);
2551                        },
2552                    };
2553
2554                    match self.process_frame(conn, stream_id, frame, payload_len)
2555                    {
2556                        Ok(ev) => return Ok(ev),
2557
2558                        Err(Error::Done) => {
2559                            // This might be a frame that is processed internally
2560                            // without needing to bubble up to the user as an
2561                            // event. Check whether the frame has FIN'd by QUIC
2562                            // to prevent trying to read again on a closed stream.
2563                            if conn.stream_finished(stream_id) {
2564                                break;
2565                            }
2566                        },
2567
2568                        Err(e) => return Err(e),
2569                    };
2570                },
2571
2572                stream::State::Data => {
2573                    // Do not emit events when not polling.
2574                    if !polling {
2575                        break;
2576                    }
2577
2578                    if !stream.try_trigger_data_event() {
2579                        break;
2580                    }
2581
2582                    return Ok((stream_id, Event::Data));
2583                },
2584
2585                stream::State::QpackInstruction => {
2586                    let mut d = [0; 4096];
2587
2588                    // Read data from the stream and discard immediately.
2589                    loop {
2590                        let (recv, fin) = conn.stream_recv(stream_id, &mut d)?;
2591
2592                        match stream.ty() {
2593                            Some(stream::Type::QpackEncoder) =>
2594                                self.peer_qpack_streams.encoder_stream_bytes +=
2595                                    recv as u64,
2596                            Some(stream::Type::QpackDecoder) =>
2597                                self.peer_qpack_streams.decoder_stream_bytes +=
2598                                    recv as u64,
2599                            _ => unreachable!(),
2600                        };
2601
2602                        if fin {
2603                            close_conn_critical_stream(conn)?;
2604                        }
2605                    }
2606                },
2607
2608                stream::State::Drain => {
2609                    // Discard incoming data on the stream.
2610                    conn.stream_shutdown(
2611                        stream_id,
2612                        crate::Shutdown::Read,
2613                        0x100,
2614                    )?;
2615
2616                    break;
2617                },
2618
2619                stream::State::Finished => break,
2620            }
2621        }
2622
2623        Err(Error::Done)
2624    }
2625
2626    fn process_finished_stream(&mut self, stream_id: u64) {
2627        let stream = match self.streams.get_mut(&stream_id) {
2628            Some(v) => v,
2629
2630            None => return,
2631        };
2632
2633        if stream.state() == stream::State::Finished {
2634            return;
2635        }
2636
2637        match stream.ty() {
2638            Some(stream::Type::Request) | Some(stream::Type::Push) => {
2639                stream.finished();
2640
2641                self.finished_streams.push_back(stream_id);
2642            },
2643
2644            _ => (),
2645        };
2646    }
2647
2648    fn process_frame(
2649        &mut self, conn: &mut super::Connection, stream_id: u64,
2650        frame: frame::Frame, payload_len: u64,
2651    ) -> Result<(u64, Event)> {
2652        trace!(
2653            "{} rx frm {:?} stream={} payload_len={}",
2654            conn.trace_id(),
2655            frame,
2656            stream_id,
2657            payload_len
2658        );
2659
2660        qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
2661            // HEADERS frames are special case and will be logged below.
2662            if !matches!(frame, frame::Frame::Headers { .. }) {
2663                let frame = frame.to_qlog();
2664                let ev_data = EventData::H3FrameParsed(H3FrameParsed {
2665                    stream_id,
2666                    length: Some(payload_len),
2667                    frame,
2668                    ..Default::default()
2669                });
2670
2671                q.add_event_data_now(ev_data).ok();
2672            }
2673        });
2674
2675        match frame {
2676            frame::Frame::Settings {
2677                max_field_section_size,
2678                qpack_max_table_capacity,
2679                qpack_blocked_streams,
2680                connect_protocol_enabled,
2681                h3_datagram,
2682                additional_settings,
2683                raw,
2684                ..
2685            } => {
2686                self.peer_settings = ConnectionSettings {
2687                    max_field_section_size,
2688                    qpack_max_table_capacity,
2689                    qpack_blocked_streams,
2690                    connect_protocol_enabled,
2691                    h3_datagram,
2692                    additional_settings,
2693                    raw,
2694                };
2695
2696                if let Some(1) = h3_datagram {
2697                    // The peer MUST have also enabled DATAGRAM with a TP
2698                    if conn.dgram_max_writable_len().is_none() {
2699                        conn.close(
2700                            true,
2701                            Error::SettingsError.to_wire(),
2702                            b"H3_DATAGRAM sent with value 1 but max_datagram_frame_size TP not set.",
2703                        )?;
2704
2705                        return Err(Error::SettingsError);
2706                    }
2707                }
2708            },
2709
2710            frame::Frame::Headers { header_block } => {
2711                if Some(stream_id) == self.peer_control_stream_id {
2712                    conn.close(
2713                        true,
2714                        Error::FrameUnexpected.to_wire(),
2715                        b"HEADERS received on control stream",
2716                    )?;
2717
2718                    return Err(Error::FrameUnexpected);
2719                }
2720
2721                // Servers reject too many HEADERS frames.
2722                if let Some(s) = self.streams.get_mut(&stream_id) {
2723                    if self.is_server && s.headers_received_count() == 2 {
2724                        conn.close(
2725                            true,
2726                            Error::FrameUnexpected.to_wire(),
2727                            b"Too many HEADERS frames",
2728                        )?;
2729                        return Err(Error::FrameUnexpected);
2730                    }
2731
2732                    s.increment_headers_received();
2733                }
2734
2735                // Use "infinite" as default value for max_field_section_size if
2736                // it is not configured by the application.
2737                let max_size = self
2738                    .local_settings
2739                    .max_field_section_size
2740                    .unwrap_or(u64::MAX);
2741
2742                let headers = match self
2743                    .qpack_decoder
2744                    .decode(&header_block[..], max_size)
2745                {
2746                    Ok(v) => v,
2747
2748                    Err(e) => {
2749                        let e = match e {
2750                            qpack::Error::HeaderListTooLarge =>
2751                                Error::ExcessiveLoad,
2752
2753                            _ => Error::QpackDecompressionFailed,
2754                        };
2755
2756                        conn.close(true, e.to_wire(), b"Error parsing headers.")?;
2757
2758                        return Err(e);
2759                    },
2760                };
2761
2762                qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
2763                    let qlog_headers = headers
2764                        .iter()
2765                        .map(|h| qlog::events::h3::HttpHeader {
2766                            name: String::from_utf8_lossy(h.name()).into_owned(),
2767                            value: String::from_utf8_lossy(h.value())
2768                                .into_owned(),
2769                        })
2770                        .collect();
2771
2772                    let frame = Http3Frame::Headers {
2773                        headers: qlog_headers,
2774                    };
2775
2776                    let ev_data = EventData::H3FrameParsed(H3FrameParsed {
2777                        stream_id,
2778                        length: Some(payload_len),
2779                        frame,
2780                        ..Default::default()
2781                    });
2782
2783                    q.add_event_data_now(ev_data).ok();
2784                });
2785
2786                let more_frames = !conn.stream_finished(stream_id);
2787
2788                return Ok((stream_id, Event::Headers {
2789                    list: headers,
2790                    more_frames,
2791                }));
2792            },
2793
2794            frame::Frame::Data { .. } => {
2795                if Some(stream_id) == self.peer_control_stream_id {
2796                    conn.close(
2797                        true,
2798                        Error::FrameUnexpected.to_wire(),
2799                        b"DATA received on control stream",
2800                    )?;
2801
2802                    return Err(Error::FrameUnexpected);
2803                }
2804
2805                // Do nothing. The Data event is returned separately.
2806            },
2807
2808            frame::Frame::GoAway { id } => {
2809                if Some(stream_id) != self.peer_control_stream_id {
2810                    conn.close(
2811                        true,
2812                        Error::FrameUnexpected.to_wire(),
2813                        b"GOAWAY received on non-control stream",
2814                    )?;
2815
2816                    return Err(Error::FrameUnexpected);
2817                }
2818
2819                if !self.is_server && id % 4 != 0 {
2820                    conn.close(
2821                        true,
2822                        Error::FrameUnexpected.to_wire(),
2823                        b"GOAWAY received with ID of non-request stream",
2824                    )?;
2825
2826                    return Err(Error::IdError);
2827                }
2828
2829                if let Some(received_id) = self.peer_goaway_id {
2830                    if id > received_id {
2831                        conn.close(
2832                            true,
2833                            Error::IdError.to_wire(),
2834                            b"GOAWAY received with ID larger than previously received",
2835                        )?;
2836
2837                        return Err(Error::IdError);
2838                    }
2839                }
2840
2841                self.peer_goaway_id = Some(id);
2842
2843                return Ok((id, Event::GoAway));
2844            },
2845
2846            frame::Frame::MaxPushId { push_id } => {
2847                if Some(stream_id) != self.peer_control_stream_id {
2848                    conn.close(
2849                        true,
2850                        Error::FrameUnexpected.to_wire(),
2851                        b"MAX_PUSH_ID received on non-control stream",
2852                    )?;
2853
2854                    return Err(Error::FrameUnexpected);
2855                }
2856
2857                if !self.is_server {
2858                    conn.close(
2859                        true,
2860                        Error::FrameUnexpected.to_wire(),
2861                        b"MAX_PUSH_ID received by client",
2862                    )?;
2863
2864                    return Err(Error::FrameUnexpected);
2865                }
2866
2867                if push_id < self.max_push_id {
2868                    conn.close(
2869                        true,
2870                        Error::IdError.to_wire(),
2871                        b"MAX_PUSH_ID reduced limit",
2872                    )?;
2873
2874                    return Err(Error::IdError);
2875                }
2876
2877                self.max_push_id = push_id;
2878            },
2879
2880            frame::Frame::PushPromise { .. } => {
2881                if self.is_server {
2882                    conn.close(
2883                        true,
2884                        Error::FrameUnexpected.to_wire(),
2885                        b"PUSH_PROMISE received by server",
2886                    )?;
2887
2888                    return Err(Error::FrameUnexpected);
2889                }
2890
2891                if stream_id % 4 != 0 {
2892                    conn.close(
2893                        true,
2894                        Error::FrameUnexpected.to_wire(),
2895                        b"PUSH_PROMISE received on non-request stream",
2896                    )?;
2897
2898                    return Err(Error::FrameUnexpected);
2899                }
2900
2901                // TODO: implement more checks and PUSH_PROMISE event
2902            },
2903
2904            frame::Frame::CancelPush { .. } => {
2905                if Some(stream_id) != self.peer_control_stream_id {
2906                    conn.close(
2907                        true,
2908                        Error::FrameUnexpected.to_wire(),
2909                        b"CANCEL_PUSH received on non-control stream",
2910                    )?;
2911
2912                    return Err(Error::FrameUnexpected);
2913                }
2914
2915                // TODO: implement CANCEL_PUSH frame
2916            },
2917
2918            frame::Frame::PriorityUpdateRequest {
2919                prioritized_element_id,
2920                priority_field_value,
2921            } => {
2922                if !self.is_server {
2923                    conn.close(
2924                        true,
2925                        Error::FrameUnexpected.to_wire(),
2926                        b"PRIORITY_UPDATE received by client",
2927                    )?;
2928
2929                    return Err(Error::FrameUnexpected);
2930                }
2931
2932                if Some(stream_id) != self.peer_control_stream_id {
2933                    conn.close(
2934                        true,
2935                        Error::FrameUnexpected.to_wire(),
2936                        b"PRIORITY_UPDATE received on non-control stream",
2937                    )?;
2938
2939                    return Err(Error::FrameUnexpected);
2940                }
2941
2942                if prioritized_element_id % 4 != 0 {
2943                    conn.close(
2944                        true,
2945                        Error::FrameUnexpected.to_wire(),
2946                        b"PRIORITY_UPDATE for request stream type with wrong ID",
2947                    )?;
2948
2949                    return Err(Error::FrameUnexpected);
2950                }
2951
2952                if prioritized_element_id > conn.streams.max_streams_bidi() * 4 {
2953                    conn.close(
2954                        true,
2955                        Error::IdError.to_wire(),
2956                        b"PRIORITY_UPDATE for request stream beyond max streams limit",
2957                    )?;
2958
2959                    return Err(Error::IdError);
2960                }
2961
2962                // If the PRIORITY_UPDATE is valid, consider storing the latest
2963                // contents. Due to reordering, it is possible that we might
2964                // receive frames that reference streams that have not yet to
2965                // been opened and that's OK because it's within our concurrency
2966                // limit. However, we discard PRIORITY_UPDATE that refers to
2967                // streams that we know have been collected.
2968                if conn.streams.is_collected(prioritized_element_id) {
2969                    return Err(Error::Done);
2970                }
2971
2972                // If the stream did not yet exist, create it and store.
2973                let stream =
2974                    self.streams.entry(prioritized_element_id).or_insert_with(
2975                        || stream::Stream::new(prioritized_element_id, false),
2976                    );
2977
2978                let had_priority_update = stream.has_last_priority_update();
2979                stream.set_last_priority_update(Some(priority_field_value));
2980
2981                // Only trigger the event when there wasn't already a stored
2982                // PRIORITY_UPDATE.
2983                if !had_priority_update {
2984                    return Ok((prioritized_element_id, Event::PriorityUpdate));
2985                } else {
2986                    return Err(Error::Done);
2987                }
2988            },
2989
2990            frame::Frame::PriorityUpdatePush {
2991                prioritized_element_id,
2992                ..
2993            } => {
2994                if !self.is_server {
2995                    conn.close(
2996                        true,
2997                        Error::FrameUnexpected.to_wire(),
2998                        b"PRIORITY_UPDATE received by client",
2999                    )?;
3000
3001                    return Err(Error::FrameUnexpected);
3002                }
3003
3004                if Some(stream_id) != self.peer_control_stream_id {
3005                    conn.close(
3006                        true,
3007                        Error::FrameUnexpected.to_wire(),
3008                        b"PRIORITY_UPDATE received on non-control stream",
3009                    )?;
3010
3011                    return Err(Error::FrameUnexpected);
3012                }
3013
3014                if prioritized_element_id % 3 != 0 {
3015                    conn.close(
3016                        true,
3017                        Error::FrameUnexpected.to_wire(),
3018                        b"PRIORITY_UPDATE for push stream type with wrong ID",
3019                    )?;
3020
3021                    return Err(Error::FrameUnexpected);
3022                }
3023
3024                // TODO: we only implement this if we implement server push
3025            },
3026
3027            frame::Frame::Unknown { .. } => (),
3028        }
3029
3030        Err(Error::Done)
3031    }
3032
3033    /// Collects and returns statistics about the connection.
3034    #[inline]
3035    pub fn stats(&self) -> Stats {
3036        Stats {
3037            qpack_encoder_stream_recv_bytes: self
3038                .peer_qpack_streams
3039                .encoder_stream_bytes,
3040            qpack_decoder_stream_recv_bytes: self
3041                .peer_qpack_streams
3042                .decoder_stream_bytes,
3043        }
3044    }
3045}
3046
3047/// Generates an HTTP/3 GREASE variable length integer.
3048pub fn grease_value() -> u64 {
3049    let n = super::rand::rand_u64_uniform(148_764_065_110_560_899);
3050    31 * n + 33
3051}
3052
3053#[doc(hidden)]
3054pub mod testing {
3055    use super::*;
3056
3057    use crate::testing;
3058
3059    /// Session is an HTTP/3 test helper structure. It holds a client, server
3060    /// and pipe that allows them to communicate.
3061    ///
3062    /// `default()` creates a session with some sensible default
3063    /// configuration. `with_configs()` allows for providing a specific
3064    /// configuration.
3065    ///
3066    /// `handshake()` performs all the steps needed to establish an HTTP/3
3067    /// connection.
3068    ///
3069    /// Some utility functions are provided that make it less verbose to send
3070    /// request, responses and individual headers. The full quiche API remains
3071    /// available for any test that need to do unconventional things (such as
3072    /// bad behaviour that triggers errors).
3073    pub struct Session {
3074        pub pipe: testing::Pipe,
3075        pub client: Connection,
3076        pub server: Connection,
3077    }
3078
3079    impl Session {
3080        pub fn new() -> Result<Session> {
3081            fn path_relative_to_manifest_dir(path: &str) -> String {
3082                std::fs::canonicalize(
3083                    std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join(path),
3084                )
3085                .unwrap()
3086                .to_string_lossy()
3087                .into_owned()
3088            }
3089
3090            let mut config = crate::Config::new(crate::PROTOCOL_VERSION)?;
3091            config.load_cert_chain_from_pem_file(
3092                &path_relative_to_manifest_dir("examples/cert.crt"),
3093            )?;
3094            config.load_priv_key_from_pem_file(
3095                &path_relative_to_manifest_dir("examples/cert.key"),
3096            )?;
3097            config.set_application_protos(&[b"h3"])?;
3098            config.set_initial_max_data(1500);
3099            config.set_initial_max_stream_data_bidi_local(150);
3100            config.set_initial_max_stream_data_bidi_remote(150);
3101            config.set_initial_max_stream_data_uni(150);
3102            config.set_initial_max_streams_bidi(5);
3103            config.set_initial_max_streams_uni(5);
3104            config.verify_peer(false);
3105            config.enable_dgram(true, 3, 3);
3106            config.set_ack_delay_exponent(8);
3107
3108            let h3_config = Config::new()?;
3109            Session::with_configs(&mut config, &h3_config)
3110        }
3111
3112        pub fn with_configs(
3113            config: &mut crate::Config, h3_config: &Config,
3114        ) -> Result<Session> {
3115            let pipe = testing::Pipe::with_config(config)?;
3116            let client_dgram = pipe.client.dgram_enabled();
3117            let server_dgram = pipe.server.dgram_enabled();
3118            Ok(Session {
3119                pipe,
3120                client: Connection::new(h3_config, false, client_dgram)?,
3121                server: Connection::new(h3_config, true, server_dgram)?,
3122            })
3123        }
3124
3125        /// Do the HTTP/3 handshake so both ends are in sane initial state.
3126        pub fn handshake(&mut self) -> Result<()> {
3127            self.pipe.handshake()?;
3128
3129            // Client streams.
3130            self.client.send_settings(&mut self.pipe.client)?;
3131            self.pipe.advance().ok();
3132
3133            self.client
3134                .open_qpack_encoder_stream(&mut self.pipe.client)?;
3135            self.pipe.advance().ok();
3136
3137            self.client
3138                .open_qpack_decoder_stream(&mut self.pipe.client)?;
3139            self.pipe.advance().ok();
3140
3141            if self.pipe.client.grease {
3142                self.client.open_grease_stream(&mut self.pipe.client)?;
3143            }
3144
3145            self.pipe.advance().ok();
3146
3147            // Server streams.
3148            self.server.send_settings(&mut self.pipe.server)?;
3149            self.pipe.advance().ok();
3150
3151            self.server
3152                .open_qpack_encoder_stream(&mut self.pipe.server)?;
3153            self.pipe.advance().ok();
3154
3155            self.server
3156                .open_qpack_decoder_stream(&mut self.pipe.server)?;
3157            self.pipe.advance().ok();
3158
3159            if self.pipe.server.grease {
3160                self.server.open_grease_stream(&mut self.pipe.server)?;
3161            }
3162
3163            self.advance().ok();
3164
3165            while self.client.poll(&mut self.pipe.client).is_ok() {
3166                // Do nothing.
3167            }
3168
3169            while self.server.poll(&mut self.pipe.server).is_ok() {
3170                // Do nothing.
3171            }
3172
3173            Ok(())
3174        }
3175
3176        /// Advances the session pipe over the buffer.
3177        pub fn advance(&mut self) -> crate::Result<()> {
3178            self.pipe.advance()
3179        }
3180
3181        /// Polls the client for events.
3182        pub fn poll_client(&mut self) -> Result<(u64, Event)> {
3183            self.client.poll(&mut self.pipe.client)
3184        }
3185
3186        /// Polls the server for events.
3187        pub fn poll_server(&mut self) -> Result<(u64, Event)> {
3188            self.server.poll(&mut self.pipe.server)
3189        }
3190
3191        /// Sends a request from client with default headers.
3192        ///
3193        /// On success it returns the newly allocated stream and the headers.
3194        pub fn send_request(&mut self, fin: bool) -> Result<(u64, Vec<Header>)> {
3195            let req = vec![
3196                Header::new(b":method", b"GET"),
3197                Header::new(b":scheme", b"https"),
3198                Header::new(b":authority", b"quic.tech"),
3199                Header::new(b":path", b"/test"),
3200                Header::new(b"user-agent", b"quiche-test"),
3201            ];
3202
3203            let stream =
3204                self.client.send_request(&mut self.pipe.client, &req, fin)?;
3205
3206            self.advance().ok();
3207
3208            Ok((stream, req))
3209        }
3210
3211        /// Sends a response from server with default headers.
3212        ///
3213        /// On success it returns the headers.
3214        pub fn send_response(
3215            &mut self, stream: u64, fin: bool,
3216        ) -> Result<Vec<Header>> {
3217            let resp = vec![
3218                Header::new(b":status", b"200"),
3219                Header::new(b"server", b"quiche-test"),
3220            ];
3221
3222            self.server.send_response(
3223                &mut self.pipe.server,
3224                stream,
3225                &resp,
3226                fin,
3227            )?;
3228
3229            self.advance().ok();
3230
3231            Ok(resp)
3232        }
3233
3234        /// Sends some default payload from client.
3235        ///
3236        /// On success it returns the payload.
3237        pub fn send_body_client(
3238            &mut self, stream: u64, fin: bool,
3239        ) -> Result<Vec<u8>> {
3240            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3241
3242            self.client
3243                .send_body(&mut self.pipe.client, stream, &bytes, fin)?;
3244
3245            self.advance().ok();
3246
3247            Ok(bytes)
3248        }
3249
3250        /// Fetches DATA payload from the server.
3251        ///
3252        /// On success it returns the number of bytes received.
3253        pub fn recv_body_client(
3254            &mut self, stream: u64, buf: &mut [u8],
3255        ) -> Result<usize> {
3256            self.client.recv_body(&mut self.pipe.client, stream, buf)
3257        }
3258
3259        /// Sends some default payload from server.
3260        ///
3261        /// On success it returns the payload.
3262        pub fn send_body_server(
3263            &mut self, stream: u64, fin: bool,
3264        ) -> Result<Vec<u8>> {
3265            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3266
3267            self.server
3268                .send_body(&mut self.pipe.server, stream, &bytes, fin)?;
3269
3270            self.advance().ok();
3271
3272            Ok(bytes)
3273        }
3274
3275        /// Fetches DATA payload from the client.
3276        ///
3277        /// On success it returns the number of bytes received.
3278        pub fn recv_body_server(
3279            &mut self, stream: u64, buf: &mut [u8],
3280        ) -> Result<usize> {
3281            self.server.recv_body(&mut self.pipe.server, stream, buf)
3282        }
3283
3284        /// Sends a single HTTP/3 frame from the client.
3285        pub fn send_frame_client(
3286            &mut self, frame: frame::Frame, stream_id: u64, fin: bool,
3287        ) -> Result<()> {
3288            let mut d = [42; 65535];
3289
3290            let mut b = octets::OctetsMut::with_slice(&mut d);
3291
3292            frame.to_bytes(&mut b)?;
3293
3294            let off = b.off();
3295            self.pipe.client.stream_send(stream_id, &d[..off], fin)?;
3296
3297            self.advance().ok();
3298
3299            Ok(())
3300        }
3301
3302        /// Send an HTTP/3 DATAGRAM with default data from the client.
3303        ///
3304        /// On success it returns the data.
3305        pub fn send_dgram_client(&mut self, flow_id: u64) -> Result<Vec<u8>> {
3306            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3307            let len = octets::varint_len(flow_id) + bytes.len();
3308            let mut d = vec![0; len];
3309            let mut b = octets::OctetsMut::with_slice(&mut d);
3310
3311            b.put_varint(flow_id)?;
3312            b.put_bytes(&bytes)?;
3313
3314            self.pipe.client.dgram_send(&d)?;
3315
3316            self.advance().ok();
3317
3318            Ok(bytes)
3319        }
3320
3321        /// Receives an HTTP/3 DATAGRAM from the server.
3322        ///
3323        /// On success it returns the DATAGRAM length, flow ID and flow ID
3324        /// length.
3325        pub fn recv_dgram_client(
3326            &mut self, buf: &mut [u8],
3327        ) -> Result<(usize, u64, usize)> {
3328            let len = self.pipe.client.dgram_recv(buf)?;
3329            let mut b = octets::Octets::with_slice(buf);
3330            let flow_id = b.get_varint()?;
3331
3332            Ok((len, flow_id, b.off()))
3333        }
3334
3335        /// Send an HTTP/3 DATAGRAM with default data from the server
3336        ///
3337        /// On success it returns the data.
3338        pub fn send_dgram_server(&mut self, flow_id: u64) -> Result<Vec<u8>> {
3339            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3340            let len = octets::varint_len(flow_id) + bytes.len();
3341            let mut d = vec![0; len];
3342            let mut b = octets::OctetsMut::with_slice(&mut d);
3343
3344            b.put_varint(flow_id)?;
3345            b.put_bytes(&bytes)?;
3346
3347            self.pipe.server.dgram_send(&d)?;
3348
3349            self.advance().ok();
3350
3351            Ok(bytes)
3352        }
3353
3354        /// Receives an HTTP/3 DATAGRAM from the client.
3355        ///
3356        /// On success it returns the DATAGRAM length, flow ID and flow ID
3357        /// length.
3358        pub fn recv_dgram_server(
3359            &mut self, buf: &mut [u8],
3360        ) -> Result<(usize, u64, usize)> {
3361            let len = self.pipe.server.dgram_recv(buf)?;
3362            let mut b = octets::Octets::with_slice(buf);
3363            let flow_id = b.get_varint()?;
3364
3365            Ok((len, flow_id, b.off()))
3366        }
3367
3368        /// Sends a single HTTP/3 frame from the server.
3369        pub fn send_frame_server(
3370            &mut self, frame: frame::Frame, stream_id: u64, fin: bool,
3371        ) -> Result<()> {
3372            let mut d = [42; 65535];
3373
3374            let mut b = octets::OctetsMut::with_slice(&mut d);
3375
3376            frame.to_bytes(&mut b)?;
3377
3378            let off = b.off();
3379            self.pipe.server.stream_send(stream_id, &d[..off], fin)?;
3380
3381            self.advance().ok();
3382
3383            Ok(())
3384        }
3385
3386        /// Sends an arbitrary buffer of HTTP/3 stream data from the client.
3387        pub fn send_arbitrary_stream_data_client(
3388            &mut self, data: &[u8], stream_id: u64, fin: bool,
3389        ) -> Result<()> {
3390            self.pipe.client.stream_send(stream_id, data, fin)?;
3391
3392            self.advance().ok();
3393
3394            Ok(())
3395        }
3396
3397        /// Sends an arbitrary buffer of HTTP/3 stream data from the server.
3398        pub fn send_arbitrary_stream_data_server(
3399            &mut self, data: &[u8], stream_id: u64, fin: bool,
3400        ) -> Result<()> {
3401            self.pipe.server.stream_send(stream_id, data, fin)?;
3402
3403            self.advance().ok();
3404
3405            Ok(())
3406        }
3407    }
3408}
3409
3410#[cfg(test)]
3411mod tests {
3412    use super::*;
3413
3414    use super::testing::*;
3415
3416    #[test]
3417    /// Make sure that random GREASE values is within the specified limit.
3418    fn grease_value_in_varint_limit() {
3419        assert!(grease_value() < 2u64.pow(62) - 1);
3420    }
3421
3422    #[cfg(not(feature = "openssl"))] // 0-RTT not supported when using openssl/quictls
3423    #[test]
3424    fn h3_handshake_0rtt() {
3425        let mut buf = [0; 65535];
3426
3427        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
3428        config
3429            .load_cert_chain_from_pem_file("examples/cert.crt")
3430            .unwrap();
3431        config
3432            .load_priv_key_from_pem_file("examples/cert.key")
3433            .unwrap();
3434        config
3435            .set_application_protos(&[b"proto1", b"proto2"])
3436            .unwrap();
3437        config.set_initial_max_data(30);
3438        config.set_initial_max_stream_data_bidi_local(15);
3439        config.set_initial_max_stream_data_bidi_remote(15);
3440        config.set_initial_max_stream_data_uni(15);
3441        config.set_initial_max_streams_bidi(3);
3442        config.set_initial_max_streams_uni(3);
3443        config.enable_early_data();
3444        config.verify_peer(false);
3445
3446        let h3_config = Config::new().unwrap();
3447
3448        // Perform initial handshake.
3449        let mut pipe = crate::testing::Pipe::with_config(&mut config).unwrap();
3450        assert_eq!(pipe.handshake(), Ok(()));
3451
3452        // Extract session,
3453        let session = pipe.client.session().unwrap();
3454
3455        // Configure session on new connection.
3456        let mut pipe = crate::testing::Pipe::with_config(&mut config).unwrap();
3457        assert_eq!(pipe.client.set_session(session), Ok(()));
3458
3459        // Can't create an H3 connection until the QUIC connection is determined
3460        // to have made sufficient early data progress.
3461        assert!(matches!(
3462            Connection::with_transport(&mut pipe.client, &h3_config),
3463            Err(Error::InternalError)
3464        ));
3465
3466        // Client sends initial flight.
3467        let (len, _) = pipe.client.send(&mut buf).unwrap();
3468
3469        // Now an H3 connection can be created.
3470        assert!(Connection::with_transport(&mut pipe.client, &h3_config).is_ok());
3471        assert_eq!(pipe.server_recv(&mut buf[..len]), Ok(len));
3472
3473        // Client sends 0-RTT packet.
3474        let pkt_type = crate::packet::Type::ZeroRTT;
3475
3476        let frames = [crate::frame::Frame::Stream {
3477            stream_id: 6,
3478            data: crate::stream::RangeBuf::from(b"aaaaa", 0, true),
3479        }];
3480
3481        assert_eq!(
3482            pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
3483            Ok(1200)
3484        );
3485
3486        assert_eq!(pipe.server.undecryptable_pkts.len(), 0);
3487
3488        // 0-RTT stream data is readable.
3489        let mut r = pipe.server.readable();
3490        assert_eq!(r.next(), Some(6));
3491        assert_eq!(r.next(), None);
3492
3493        let mut b = [0; 15];
3494        assert_eq!(pipe.server.stream_recv(6, &mut b), Ok((5, true)));
3495        assert_eq!(&b[..5], b"aaaaa");
3496    }
3497
3498    #[test]
3499    /// Send a request with no body, get a response with no body.
3500    fn request_no_body_response_no_body() {
3501        let mut s = Session::new().unwrap();
3502        s.handshake().unwrap();
3503
3504        let (stream, req) = s.send_request(true).unwrap();
3505
3506        assert_eq!(stream, 0);
3507
3508        let ev_headers = Event::Headers {
3509            list: req,
3510            more_frames: false,
3511        };
3512
3513        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3514        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3515
3516        let resp = s.send_response(stream, true).unwrap();
3517
3518        let ev_headers = Event::Headers {
3519            list: resp,
3520            more_frames: false,
3521        };
3522
3523        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3524        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3525        assert_eq!(s.poll_client(), Err(Error::Done));
3526    }
3527
3528    #[test]
3529    /// Send a request with no body, get a response with one DATA frame.
3530    fn request_no_body_response_one_chunk() {
3531        let mut s = Session::new().unwrap();
3532        s.handshake().unwrap();
3533
3534        let (stream, req) = s.send_request(true).unwrap();
3535        assert_eq!(stream, 0);
3536
3537        let ev_headers = Event::Headers {
3538            list: req,
3539            more_frames: false,
3540        };
3541
3542        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3543
3544        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3545
3546        let resp = s.send_response(stream, false).unwrap();
3547
3548        let body = s.send_body_server(stream, true).unwrap();
3549
3550        let mut recv_buf = vec![0; body.len()];
3551
3552        let ev_headers = Event::Headers {
3553            list: resp,
3554            more_frames: true,
3555        };
3556
3557        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3558
3559        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
3560        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
3561
3562        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3563        assert_eq!(s.poll_client(), Err(Error::Done));
3564    }
3565
3566    #[test]
3567    /// Send a request with no body, get a response with multiple DATA frames.
3568    fn request_no_body_response_many_chunks() {
3569        let mut s = Session::new().unwrap();
3570        s.handshake().unwrap();
3571
3572        let (stream, req) = s.send_request(true).unwrap();
3573
3574        let ev_headers = Event::Headers {
3575            list: req,
3576            more_frames: false,
3577        };
3578
3579        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3580        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3581
3582        let total_data_frames = 4;
3583
3584        let resp = s.send_response(stream, false).unwrap();
3585
3586        for _ in 0..total_data_frames - 1 {
3587            s.send_body_server(stream, false).unwrap();
3588        }
3589
3590        let body = s.send_body_server(stream, true).unwrap();
3591
3592        let mut recv_buf = vec![0; body.len()];
3593
3594        let ev_headers = Event::Headers {
3595            list: resp,
3596            more_frames: true,
3597        };
3598
3599        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3600        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
3601        assert_eq!(s.poll_client(), Err(Error::Done));
3602
3603        for _ in 0..total_data_frames {
3604            assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
3605        }
3606
3607        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3608        assert_eq!(s.poll_client(), Err(Error::Done));
3609    }
3610
3611    #[test]
3612    /// Send a request with one DATA frame, get a response with no body.
3613    fn request_one_chunk_response_no_body() {
3614        let mut s = Session::new().unwrap();
3615        s.handshake().unwrap();
3616
3617        let (stream, req) = s.send_request(false).unwrap();
3618
3619        let body = s.send_body_client(stream, true).unwrap();
3620
3621        let mut recv_buf = vec![0; body.len()];
3622
3623        let ev_headers = Event::Headers {
3624            list: req,
3625            more_frames: true,
3626        };
3627
3628        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3629
3630        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
3631        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
3632
3633        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3634
3635        let resp = s.send_response(stream, true).unwrap();
3636
3637        let ev_headers = Event::Headers {
3638            list: resp,
3639            more_frames: false,
3640        };
3641
3642        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3643        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3644    }
3645
3646    #[test]
3647    /// Send a request with multiple DATA frames, get a response with no body.
3648    fn request_many_chunks_response_no_body() {
3649        let mut s = Session::new().unwrap();
3650        s.handshake().unwrap();
3651
3652        let (stream, req) = s.send_request(false).unwrap();
3653
3654        let total_data_frames = 4;
3655
3656        for _ in 0..total_data_frames - 1 {
3657            s.send_body_client(stream, false).unwrap();
3658        }
3659
3660        let body = s.send_body_client(stream, true).unwrap();
3661
3662        let mut recv_buf = vec![0; body.len()];
3663
3664        let ev_headers = Event::Headers {
3665            list: req,
3666            more_frames: true,
3667        };
3668
3669        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3670        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
3671        assert_eq!(s.poll_server(), Err(Error::Done));
3672
3673        for _ in 0..total_data_frames {
3674            assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
3675        }
3676
3677        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3678
3679        let resp = s.send_response(stream, true).unwrap();
3680
3681        let ev_headers = Event::Headers {
3682            list: resp,
3683            more_frames: false,
3684        };
3685
3686        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3687        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3688    }
3689
3690    #[test]
3691    /// Send a request with multiple DATA frames, get a response with one DATA
3692    /// frame.
3693    fn many_requests_many_chunks_response_one_chunk() {
3694        let mut s = Session::new().unwrap();
3695        s.handshake().unwrap();
3696
3697        let mut reqs = Vec::new();
3698
3699        let (stream1, req1) = s.send_request(false).unwrap();
3700        assert_eq!(stream1, 0);
3701        reqs.push(req1);
3702
3703        let (stream2, req2) = s.send_request(false).unwrap();
3704        assert_eq!(stream2, 4);
3705        reqs.push(req2);
3706
3707        let (stream3, req3) = s.send_request(false).unwrap();
3708        assert_eq!(stream3, 8);
3709        reqs.push(req3);
3710
3711        let body = s.send_body_client(stream1, false).unwrap();
3712        s.send_body_client(stream2, false).unwrap();
3713        s.send_body_client(stream3, false).unwrap();
3714
3715        let mut recv_buf = vec![0; body.len()];
3716
3717        // Reverse order of writes.
3718
3719        s.send_body_client(stream3, true).unwrap();
3720        s.send_body_client(stream2, true).unwrap();
3721        s.send_body_client(stream1, true).unwrap();
3722
3723        let (_, ev) = s.poll_server().unwrap();
3724        let ev_headers = Event::Headers {
3725            list: reqs[0].clone(),
3726            more_frames: true,
3727        };
3728        assert_eq!(ev, ev_headers);
3729
3730        let (_, ev) = s.poll_server().unwrap();
3731        let ev_headers = Event::Headers {
3732            list: reqs[1].clone(),
3733            more_frames: true,
3734        };
3735        assert_eq!(ev, ev_headers);
3736
3737        let (_, ev) = s.poll_server().unwrap();
3738        let ev_headers = Event::Headers {
3739            list: reqs[2].clone(),
3740            more_frames: true,
3741        };
3742        assert_eq!(ev, ev_headers);
3743
3744        assert_eq!(s.poll_server(), Ok((0, Event::Data)));
3745        assert_eq!(s.recv_body_server(0, &mut recv_buf), Ok(body.len()));
3746        assert_eq!(s.poll_client(), Err(Error::Done));
3747        assert_eq!(s.recv_body_server(0, &mut recv_buf), Ok(body.len()));
3748        assert_eq!(s.poll_server(), Ok((0, Event::Finished)));
3749
3750        assert_eq!(s.poll_server(), Ok((4, Event::Data)));
3751        assert_eq!(s.recv_body_server(4, &mut recv_buf), Ok(body.len()));
3752        assert_eq!(s.poll_client(), Err(Error::Done));
3753        assert_eq!(s.recv_body_server(4, &mut recv_buf), Ok(body.len()));
3754        assert_eq!(s.poll_server(), Ok((4, Event::Finished)));
3755
3756        assert_eq!(s.poll_server(), Ok((8, Event::Data)));
3757        assert_eq!(s.recv_body_server(8, &mut recv_buf), Ok(body.len()));
3758        assert_eq!(s.poll_client(), Err(Error::Done));
3759        assert_eq!(s.recv_body_server(8, &mut recv_buf), Ok(body.len()));
3760        assert_eq!(s.poll_server(), Ok((8, Event::Finished)));
3761
3762        assert_eq!(s.poll_server(), Err(Error::Done));
3763
3764        let mut resps = Vec::new();
3765
3766        let resp1 = s.send_response(stream1, true).unwrap();
3767        resps.push(resp1);
3768
3769        let resp2 = s.send_response(stream2, true).unwrap();
3770        resps.push(resp2);
3771
3772        let resp3 = s.send_response(stream3, true).unwrap();
3773        resps.push(resp3);
3774
3775        for _ in 0..resps.len() {
3776            let (stream, ev) = s.poll_client().unwrap();
3777            let ev_headers = Event::Headers {
3778                list: resps[(stream / 4) as usize].clone(),
3779                more_frames: false,
3780            };
3781            assert_eq!(ev, ev_headers);
3782            assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3783        }
3784
3785        assert_eq!(s.poll_client(), Err(Error::Done));
3786    }
3787
3788    #[test]
3789    /// Send a request with no body, get a response with one DATA frame and an
3790    /// empty FIN after reception from the client.
3791    fn request_no_body_response_one_chunk_empty_fin() {
3792        let mut s = Session::new().unwrap();
3793        s.handshake().unwrap();
3794
3795        let (stream, req) = s.send_request(true).unwrap();
3796
3797        let ev_headers = Event::Headers {
3798            list: req,
3799            more_frames: false,
3800        };
3801
3802        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3803        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3804
3805        let resp = s.send_response(stream, false).unwrap();
3806
3807        let body = s.send_body_server(stream, false).unwrap();
3808
3809        let mut recv_buf = vec![0; body.len()];
3810
3811        let ev_headers = Event::Headers {
3812            list: resp,
3813            more_frames: true,
3814        };
3815
3816        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3817
3818        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
3819        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
3820
3821        assert_eq!(s.pipe.server.stream_send(stream, &[], true), Ok(0));
3822        s.advance().ok();
3823
3824        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3825        assert_eq!(s.poll_client(), Err(Error::Done));
3826    }
3827
3828    #[test]
3829    /// Send a request with no body, get a response with no body followed by
3830    /// GREASE that is STREAM frame with a FIN.
3831    fn request_no_body_response_no_body_with_grease() {
3832        let mut s = Session::new().unwrap();
3833        s.handshake().unwrap();
3834
3835        let (stream, req) = s.send_request(true).unwrap();
3836
3837        assert_eq!(stream, 0);
3838
3839        let ev_headers = Event::Headers {
3840            list: req,
3841            more_frames: false,
3842        };
3843
3844        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3845        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3846
3847        let resp = s.send_response(stream, false).unwrap();
3848
3849        let ev_headers = Event::Headers {
3850            list: resp,
3851            more_frames: true,
3852        };
3853
3854        // Inject a GREASE frame
3855        let mut d = [42; 10];
3856        let mut b = octets::OctetsMut::with_slice(&mut d);
3857
3858        let frame_type = b.put_varint(148_764_065_110_560_899).unwrap();
3859        s.pipe.server.stream_send(0, frame_type, false).unwrap();
3860
3861        let frame_len = b.put_varint(10).unwrap();
3862        s.pipe.server.stream_send(0, frame_len, false).unwrap();
3863
3864        s.pipe.server.stream_send(0, &d, true).unwrap();
3865
3866        s.advance().ok();
3867
3868        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3869        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3870        assert_eq!(s.poll_client(), Err(Error::Done));
3871    }
3872
3873    #[test]
3874    /// Try to send DATA frames before HEADERS.
3875    fn body_response_before_headers() {
3876        let mut s = Session::new().unwrap();
3877        s.handshake().unwrap();
3878
3879        let (stream, req) = s.send_request(true).unwrap();
3880        assert_eq!(stream, 0);
3881
3882        let ev_headers = Event::Headers {
3883            list: req,
3884            more_frames: false,
3885        };
3886
3887        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3888
3889        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3890
3891        assert_eq!(
3892            s.send_body_server(stream, true),
3893            Err(Error::FrameUnexpected)
3894        );
3895
3896        assert_eq!(s.poll_client(), Err(Error::Done));
3897    }
3898
3899    #[test]
3900    /// Try to send DATA frames on wrong streams, ensure the API returns an
3901    /// error before anything hits the transport layer.
3902    fn send_body_invalid_client_stream() {
3903        let mut s = Session::new().unwrap();
3904        s.handshake().unwrap();
3905
3906        assert_eq!(s.send_body_client(0, true), Err(Error::FrameUnexpected));
3907
3908        assert_eq!(
3909            s.send_body_client(s.client.control_stream_id.unwrap(), true),
3910            Err(Error::FrameUnexpected)
3911        );
3912
3913        assert_eq!(
3914            s.send_body_client(
3915                s.client.local_qpack_streams.encoder_stream_id.unwrap(),
3916                true
3917            ),
3918            Err(Error::FrameUnexpected)
3919        );
3920
3921        assert_eq!(
3922            s.send_body_client(
3923                s.client.local_qpack_streams.decoder_stream_id.unwrap(),
3924                true
3925            ),
3926            Err(Error::FrameUnexpected)
3927        );
3928
3929        assert_eq!(
3930            s.send_body_client(s.client.peer_control_stream_id.unwrap(), true),
3931            Err(Error::FrameUnexpected)
3932        );
3933
3934        assert_eq!(
3935            s.send_body_client(
3936                s.client.peer_qpack_streams.encoder_stream_id.unwrap(),
3937                true
3938            ),
3939            Err(Error::FrameUnexpected)
3940        );
3941
3942        assert_eq!(
3943            s.send_body_client(
3944                s.client.peer_qpack_streams.decoder_stream_id.unwrap(),
3945                true
3946            ),
3947            Err(Error::FrameUnexpected)
3948        );
3949    }
3950
3951    #[test]
3952    /// Try to send DATA frames on wrong streams, ensure the API returns an
3953    /// error before anything hits the transport layer.
3954    fn send_body_invalid_server_stream() {
3955        let mut s = Session::new().unwrap();
3956        s.handshake().unwrap();
3957
3958        assert_eq!(s.send_body_server(0, true), Err(Error::FrameUnexpected));
3959
3960        assert_eq!(
3961            s.send_body_server(s.server.control_stream_id.unwrap(), true),
3962            Err(Error::FrameUnexpected)
3963        );
3964
3965        assert_eq!(
3966            s.send_body_server(
3967                s.server.local_qpack_streams.encoder_stream_id.unwrap(),
3968                true
3969            ),
3970            Err(Error::FrameUnexpected)
3971        );
3972
3973        assert_eq!(
3974            s.send_body_server(
3975                s.server.local_qpack_streams.decoder_stream_id.unwrap(),
3976                true
3977            ),
3978            Err(Error::FrameUnexpected)
3979        );
3980
3981        assert_eq!(
3982            s.send_body_server(s.server.peer_control_stream_id.unwrap(), true),
3983            Err(Error::FrameUnexpected)
3984        );
3985
3986        assert_eq!(
3987            s.send_body_server(
3988                s.server.peer_qpack_streams.encoder_stream_id.unwrap(),
3989                true
3990            ),
3991            Err(Error::FrameUnexpected)
3992        );
3993
3994        assert_eq!(
3995            s.send_body_server(
3996                s.server.peer_qpack_streams.decoder_stream_id.unwrap(),
3997                true
3998            ),
3999            Err(Error::FrameUnexpected)
4000        );
4001    }
4002
4003    #[test]
4004    /// Client sends request with body and trailers.
4005    fn trailers() {
4006        let mut s = Session::new().unwrap();
4007        s.handshake().unwrap();
4008
4009        let (stream, req) = s.send_request(false).unwrap();
4010
4011        let body = s.send_body_client(stream, false).unwrap();
4012
4013        let mut recv_buf = vec![0; body.len()];
4014
4015        let req_trailers = vec![Header::new(b"foo", b"bar")];
4016
4017        s.client
4018            .send_additional_headers(
4019                &mut s.pipe.client,
4020                stream,
4021                &req_trailers,
4022                true,
4023                true,
4024            )
4025            .unwrap();
4026
4027        s.advance().ok();
4028
4029        let ev_headers = Event::Headers {
4030            list: req,
4031            more_frames: true,
4032        };
4033
4034        let ev_trailers = Event::Headers {
4035            list: req_trailers,
4036            more_frames: false,
4037        };
4038
4039        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4040
4041        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
4042        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
4043
4044        assert_eq!(s.poll_server(), Ok((stream, ev_trailers)));
4045        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4046    }
4047
4048    #[test]
4049    /// Server responds with a 103, then a 200 with no body.
4050    fn informational_response() {
4051        let mut s = Session::new().unwrap();
4052        s.handshake().unwrap();
4053
4054        let (stream, req) = s.send_request(true).unwrap();
4055
4056        assert_eq!(stream, 0);
4057
4058        let ev_headers = Event::Headers {
4059            list: req,
4060            more_frames: false,
4061        };
4062
4063        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4064        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4065
4066        let info_resp = vec![
4067            Header::new(b":status", b"103"),
4068            Header::new(b"link", b"<https://example.com>; rel=\"preconnect\""),
4069        ];
4070
4071        let resp = vec![
4072            Header::new(b":status", b"200"),
4073            Header::new(b"server", b"quiche-test"),
4074        ];
4075
4076        s.server
4077            .send_response(&mut s.pipe.server, stream, &info_resp, false)
4078            .unwrap();
4079
4080        s.server
4081            .send_additional_headers(
4082                &mut s.pipe.server,
4083                stream,
4084                &resp,
4085                false,
4086                true,
4087            )
4088            .unwrap();
4089
4090        s.advance().ok();
4091
4092        let ev_info_headers = Event::Headers {
4093            list: info_resp,
4094            more_frames: true,
4095        };
4096
4097        let ev_headers = Event::Headers {
4098            list: resp,
4099            more_frames: false,
4100        };
4101
4102        assert_eq!(s.poll_client(), Ok((stream, ev_info_headers)));
4103        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
4104        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
4105        assert_eq!(s.poll_client(), Err(Error::Done));
4106    }
4107
4108    #[test]
4109    /// Server responds with a 103, then attempts to send a 200 using
4110    /// send_response again, which should fail.
4111    fn no_multiple_response() {
4112        let mut s = Session::new().unwrap();
4113        s.handshake().unwrap();
4114
4115        let (stream, req) = s.send_request(true).unwrap();
4116
4117        assert_eq!(stream, 0);
4118
4119        let ev_headers = Event::Headers {
4120            list: req,
4121            more_frames: false,
4122        };
4123
4124        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4125        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4126
4127        let info_resp = vec![
4128            Header::new(b":status", b"103"),
4129            Header::new(b"link", b"<https://example.com>; rel=\"preconnect\""),
4130        ];
4131
4132        let resp = vec![
4133            Header::new(b":status", b"200"),
4134            Header::new(b"server", b"quiche-test"),
4135        ];
4136
4137        s.server
4138            .send_response(&mut s.pipe.server, stream, &info_resp, false)
4139            .unwrap();
4140
4141        assert_eq!(
4142            Err(Error::FrameUnexpected),
4143            s.server
4144                .send_response(&mut s.pipe.server, stream, &resp, true)
4145        );
4146
4147        s.advance().ok();
4148
4149        let ev_info_headers = Event::Headers {
4150            list: info_resp,
4151            more_frames: true,
4152        };
4153
4154        assert_eq!(s.poll_client(), Ok((stream, ev_info_headers)));
4155        assert_eq!(s.poll_client(), Err(Error::Done));
4156    }
4157
4158    #[test]
4159    /// Server attempts to use send_additional_headers before initial response.
4160    fn no_send_additional_before_initial_response() {
4161        let mut s = Session::new().unwrap();
4162        s.handshake().unwrap();
4163
4164        let (stream, req) = s.send_request(true).unwrap();
4165
4166        assert_eq!(stream, 0);
4167
4168        let ev_headers = Event::Headers {
4169            list: req,
4170            more_frames: false,
4171        };
4172
4173        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4174        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4175
4176        let info_resp = vec![
4177            Header::new(b":status", b"103"),
4178            Header::new(b"link", b"<https://example.com>; rel=\"preconnect\""),
4179        ];
4180
4181        assert_eq!(
4182            Err(Error::FrameUnexpected),
4183            s.server.send_additional_headers(
4184                &mut s.pipe.server,
4185                stream,
4186                &info_resp,
4187                false,
4188                false
4189            )
4190        );
4191
4192        s.advance().ok();
4193
4194        assert_eq!(s.poll_client(), Err(Error::Done));
4195    }
4196
4197    #[test]
4198    /// Client sends multiple HEADERS before data.
4199    fn additional_headers_before_data_client() {
4200        let mut s = Session::new().unwrap();
4201        s.handshake().unwrap();
4202
4203        let (stream, req) = s.send_request(false).unwrap();
4204
4205        let req_trailer = vec![Header::new(b"goodbye", b"world")];
4206
4207        assert_eq!(
4208            s.client.send_additional_headers(
4209                &mut s.pipe.client,
4210                stream,
4211                &req_trailer,
4212                true,
4213                false
4214            ),
4215            Ok(())
4216        );
4217
4218        s.advance().ok();
4219
4220        let ev_initial_headers = Event::Headers {
4221            list: req,
4222            more_frames: true,
4223        };
4224
4225        let ev_trailing_headers = Event::Headers {
4226            list: req_trailer,
4227            more_frames: true,
4228        };
4229
4230        assert_eq!(s.poll_server(), Ok((stream, ev_initial_headers)));
4231        assert_eq!(s.poll_server(), Ok((stream, ev_trailing_headers)));
4232        assert_eq!(s.poll_server(), Err(Error::Done));
4233    }
4234
4235    #[test]
4236    /// Client sends multiple HEADERS before data.
4237    fn data_after_trailers_client() {
4238        let mut s = Session::new().unwrap();
4239        s.handshake().unwrap();
4240
4241        let (stream, req) = s.send_request(false).unwrap();
4242
4243        let body = s.send_body_client(stream, false).unwrap();
4244
4245        let mut recv_buf = vec![0; body.len()];
4246
4247        let req_trailers = vec![Header::new(b"foo", b"bar")];
4248
4249        s.client
4250            .send_additional_headers(
4251                &mut s.pipe.client,
4252                stream,
4253                &req_trailers,
4254                true,
4255                false,
4256            )
4257            .unwrap();
4258
4259        s.advance().ok();
4260
4261        s.send_frame_client(
4262            frame::Frame::Data {
4263                payload: vec![1, 2, 3, 4],
4264            },
4265            stream,
4266            true,
4267        )
4268        .unwrap();
4269
4270        let ev_headers = Event::Headers {
4271            list: req,
4272            more_frames: true,
4273        };
4274
4275        let ev_trailers = Event::Headers {
4276            list: req_trailers,
4277            more_frames: true,
4278        };
4279
4280        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4281        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
4282        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
4283        assert_eq!(s.poll_server(), Ok((stream, ev_trailers)));
4284        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4285    }
4286
4287    #[test]
4288    /// Send a MAX_PUSH_ID frame from the client on a valid stream.
4289    fn max_push_id_from_client_good() {
4290        let mut s = Session::new().unwrap();
4291        s.handshake().unwrap();
4292
4293        s.send_frame_client(
4294            frame::Frame::MaxPushId { push_id: 1 },
4295            s.client.control_stream_id.unwrap(),
4296            false,
4297        )
4298        .unwrap();
4299
4300        assert_eq!(s.poll_server(), Err(Error::Done));
4301    }
4302
4303    #[test]
4304    /// Send a MAX_PUSH_ID frame from the client on an invalid stream.
4305    fn max_push_id_from_client_bad_stream() {
4306        let mut s = Session::new().unwrap();
4307        s.handshake().unwrap();
4308
4309        let (stream, req) = s.send_request(false).unwrap();
4310
4311        s.send_frame_client(
4312            frame::Frame::MaxPushId { push_id: 2 },
4313            stream,
4314            false,
4315        )
4316        .unwrap();
4317
4318        let ev_headers = Event::Headers {
4319            list: req,
4320            more_frames: true,
4321        };
4322
4323        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4324        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4325    }
4326
4327    #[test]
4328    /// Send a sequence of MAX_PUSH_ID frames from the client that attempt to
4329    /// reduce the limit.
4330    fn max_push_id_from_client_limit_reduction() {
4331        let mut s = Session::new().unwrap();
4332        s.handshake().unwrap();
4333
4334        s.send_frame_client(
4335            frame::Frame::MaxPushId { push_id: 2 },
4336            s.client.control_stream_id.unwrap(),
4337            false,
4338        )
4339        .unwrap();
4340
4341        s.send_frame_client(
4342            frame::Frame::MaxPushId { push_id: 1 },
4343            s.client.control_stream_id.unwrap(),
4344            false,
4345        )
4346        .unwrap();
4347
4348        assert_eq!(s.poll_server(), Err(Error::IdError));
4349    }
4350
4351    #[test]
4352    /// Send a MAX_PUSH_ID frame from the server, which is forbidden.
4353    fn max_push_id_from_server() {
4354        let mut s = Session::new().unwrap();
4355        s.handshake().unwrap();
4356
4357        s.send_frame_server(
4358            frame::Frame::MaxPushId { push_id: 1 },
4359            s.server.control_stream_id.unwrap(),
4360            false,
4361        )
4362        .unwrap();
4363
4364        assert_eq!(s.poll_client(), Err(Error::FrameUnexpected));
4365    }
4366
4367    #[test]
4368    /// Send a PUSH_PROMISE frame from the client, which is forbidden.
4369    fn push_promise_from_client() {
4370        let mut s = Session::new().unwrap();
4371        s.handshake().unwrap();
4372
4373        let (stream, req) = s.send_request(false).unwrap();
4374
4375        let header_block = s.client.encode_header_block(&req).unwrap();
4376
4377        s.send_frame_client(
4378            frame::Frame::PushPromise {
4379                push_id: 1,
4380                header_block,
4381            },
4382            stream,
4383            false,
4384        )
4385        .unwrap();
4386
4387        let ev_headers = Event::Headers {
4388            list: req,
4389            more_frames: true,
4390        };
4391
4392        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4393        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4394    }
4395
4396    #[test]
4397    /// Send a CANCEL_PUSH frame from the client.
4398    fn cancel_push_from_client() {
4399        let mut s = Session::new().unwrap();
4400        s.handshake().unwrap();
4401
4402        s.send_frame_client(
4403            frame::Frame::CancelPush { push_id: 1 },
4404            s.client.control_stream_id.unwrap(),
4405            false,
4406        )
4407        .unwrap();
4408
4409        assert_eq!(s.poll_server(), Err(Error::Done));
4410    }
4411
4412    #[test]
4413    /// Send a CANCEL_PUSH frame from the client on an invalid stream.
4414    fn cancel_push_from_client_bad_stream() {
4415        let mut s = Session::new().unwrap();
4416        s.handshake().unwrap();
4417
4418        let (stream, req) = s.send_request(false).unwrap();
4419
4420        s.send_frame_client(
4421            frame::Frame::CancelPush { push_id: 2 },
4422            stream,
4423            false,
4424        )
4425        .unwrap();
4426
4427        let ev_headers = Event::Headers {
4428            list: req,
4429            more_frames: true,
4430        };
4431
4432        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4433        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4434    }
4435
4436    #[test]
4437    /// Send a CANCEL_PUSH frame from the client.
4438    fn cancel_push_from_server() {
4439        let mut s = Session::new().unwrap();
4440        s.handshake().unwrap();
4441
4442        s.send_frame_server(
4443            frame::Frame::CancelPush { push_id: 1 },
4444            s.server.control_stream_id.unwrap(),
4445            false,
4446        )
4447        .unwrap();
4448
4449        assert_eq!(s.poll_client(), Err(Error::Done));
4450    }
4451
4452    #[test]
4453    /// Send a GOAWAY frame from the client.
4454    fn goaway_from_client_good() {
4455        let mut s = Session::new().unwrap();
4456        s.handshake().unwrap();
4457
4458        s.client.send_goaway(&mut s.pipe.client, 100).unwrap();
4459
4460        s.advance().ok();
4461
4462        // TODO: server push
4463        assert_eq!(s.poll_server(), Ok((0, Event::GoAway)));
4464    }
4465
4466    #[test]
4467    /// Send a GOAWAY frame from the server.
4468    fn goaway_from_server_good() {
4469        let mut s = Session::new().unwrap();
4470        s.handshake().unwrap();
4471
4472        s.server.send_goaway(&mut s.pipe.server, 4000).unwrap();
4473
4474        s.advance().ok();
4475
4476        assert_eq!(s.poll_client(), Ok((4000, Event::GoAway)));
4477    }
4478
4479    #[test]
4480    /// A client MUST NOT send a request after it receives GOAWAY.
4481    fn client_request_after_goaway() {
4482        let mut s = Session::new().unwrap();
4483        s.handshake().unwrap();
4484
4485        s.server.send_goaway(&mut s.pipe.server, 4000).unwrap();
4486
4487        s.advance().ok();
4488
4489        assert_eq!(s.poll_client(), Ok((4000, Event::GoAway)));
4490
4491        assert_eq!(s.send_request(true), Err(Error::FrameUnexpected));
4492    }
4493
4494    #[test]
4495    /// Send a GOAWAY frame from the server, using an invalid goaway ID.
4496    fn goaway_from_server_invalid_id() {
4497        let mut s = Session::new().unwrap();
4498        s.handshake().unwrap();
4499
4500        s.send_frame_server(
4501            frame::Frame::GoAway { id: 1 },
4502            s.server.control_stream_id.unwrap(),
4503            false,
4504        )
4505        .unwrap();
4506
4507        assert_eq!(s.poll_client(), Err(Error::IdError));
4508    }
4509
4510    #[test]
4511    /// Send multiple GOAWAY frames from the server, that increase the goaway
4512    /// ID.
4513    fn goaway_from_server_increase_id() {
4514        let mut s = Session::new().unwrap();
4515        s.handshake().unwrap();
4516
4517        s.send_frame_server(
4518            frame::Frame::GoAway { id: 0 },
4519            s.server.control_stream_id.unwrap(),
4520            false,
4521        )
4522        .unwrap();
4523
4524        s.send_frame_server(
4525            frame::Frame::GoAway { id: 4 },
4526            s.server.control_stream_id.unwrap(),
4527            false,
4528        )
4529        .unwrap();
4530
4531        assert_eq!(s.poll_client(), Ok((0, Event::GoAway)));
4532
4533        assert_eq!(s.poll_client(), Err(Error::IdError));
4534    }
4535
4536    #[test]
4537    #[cfg(feature = "sfv")]
4538    fn parse_priority_field_value() {
4539        // Legal dicts
4540        assert_eq!(
4541            Ok(Priority::new(0, false)),
4542            Priority::try_from(b"u=0".as_slice())
4543        );
4544        assert_eq!(
4545            Ok(Priority::new(3, false)),
4546            Priority::try_from(b"u=3".as_slice())
4547        );
4548        assert_eq!(
4549            Ok(Priority::new(7, false)),
4550            Priority::try_from(b"u=7".as_slice())
4551        );
4552
4553        assert_eq!(
4554            Ok(Priority::new(0, true)),
4555            Priority::try_from(b"u=0, i".as_slice())
4556        );
4557        assert_eq!(
4558            Ok(Priority::new(3, true)),
4559            Priority::try_from(b"u=3, i".as_slice())
4560        );
4561        assert_eq!(
4562            Ok(Priority::new(7, true)),
4563            Priority::try_from(b"u=7, i".as_slice())
4564        );
4565
4566        assert_eq!(
4567            Ok(Priority::new(0, true)),
4568            Priority::try_from(b"u=0, i=?1".as_slice())
4569        );
4570        assert_eq!(
4571            Ok(Priority::new(3, true)),
4572            Priority::try_from(b"u=3, i=?1".as_slice())
4573        );
4574        assert_eq!(
4575            Ok(Priority::new(7, true)),
4576            Priority::try_from(b"u=7, i=?1".as_slice())
4577        );
4578
4579        assert_eq!(
4580            Ok(Priority::new(3, false)),
4581            Priority::try_from(b"".as_slice())
4582        );
4583
4584        assert_eq!(
4585            Ok(Priority::new(0, true)),
4586            Priority::try_from(b"u=0;foo, i;bar".as_slice())
4587        );
4588        assert_eq!(
4589            Ok(Priority::new(3, true)),
4590            Priority::try_from(b"u=3;hello, i;world".as_slice())
4591        );
4592        assert_eq!(
4593            Ok(Priority::new(7, true)),
4594            Priority::try_from(b"u=7;croeso, i;gymru".as_slice())
4595        );
4596
4597        assert_eq!(
4598            Ok(Priority::new(0, true)),
4599            Priority::try_from(b"u=0, i, spinaltap=11".as_slice())
4600        );
4601
4602        // Illegal formats
4603        assert_eq!(Err(Error::Done), Priority::try_from(b"0".as_slice()));
4604        assert_eq!(
4605            Ok(Priority::new(7, false)),
4606            Priority::try_from(b"u=-1".as_slice())
4607        );
4608        assert_eq!(Err(Error::Done), Priority::try_from(b"u=0.2".as_slice()));
4609        assert_eq!(
4610            Ok(Priority::new(7, false)),
4611            Priority::try_from(b"u=100".as_slice())
4612        );
4613        assert_eq!(
4614            Err(Error::Done),
4615            Priority::try_from(b"u=3, i=true".as_slice())
4616        );
4617
4618        // Trailing comma in dict is malformed
4619        assert_eq!(Err(Error::Done), Priority::try_from(b"u=7, ".as_slice()));
4620    }
4621
4622    #[test]
4623    /// Send a PRIORITY_UPDATE for request stream from the client.
4624    fn priority_update_request() {
4625        let mut s = Session::new().unwrap();
4626        s.handshake().unwrap();
4627
4628        s.client
4629            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4630                urgency: 3,
4631                incremental: false,
4632            })
4633            .unwrap();
4634        s.advance().ok();
4635
4636        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4637        assert_eq!(s.poll_server(), Err(Error::Done));
4638    }
4639
4640    #[test]
4641    /// Send a PRIORITY_UPDATE for request stream from the client.
4642    fn priority_update_single_stream_rearm() {
4643        let mut s = Session::new().unwrap();
4644        s.handshake().unwrap();
4645
4646        s.client
4647            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4648                urgency: 3,
4649                incremental: false,
4650            })
4651            .unwrap();
4652        s.advance().ok();
4653
4654        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4655        assert_eq!(s.poll_server(), Err(Error::Done));
4656
4657        s.client
4658            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4659                urgency: 5,
4660                incremental: false,
4661            })
4662            .unwrap();
4663        s.advance().ok();
4664
4665        assert_eq!(s.poll_server(), Err(Error::Done));
4666
4667        // There is only one PRIORITY_UPDATE frame to read. Once read, the event
4668        // will rearm ready for more.
4669        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=5".to_vec()));
4670        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4671
4672        s.client
4673            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4674                urgency: 7,
4675                incremental: false,
4676            })
4677            .unwrap();
4678        s.advance().ok();
4679
4680        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4681        assert_eq!(s.poll_server(), Err(Error::Done));
4682
4683        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=7".to_vec()));
4684        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4685    }
4686
4687    #[test]
4688    /// Send multiple PRIORITY_UPDATE frames for different streams from the
4689    /// client across multiple flights of exchange.
4690    fn priority_update_request_multiple_stream_arm_multiple_flights() {
4691        let mut s = Session::new().unwrap();
4692        s.handshake().unwrap();
4693
4694        s.client
4695            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4696                urgency: 3,
4697                incremental: false,
4698            })
4699            .unwrap();
4700        s.advance().ok();
4701
4702        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4703        assert_eq!(s.poll_server(), Err(Error::Done));
4704
4705        s.client
4706            .send_priority_update_for_request(&mut s.pipe.client, 4, &Priority {
4707                urgency: 1,
4708                incremental: false,
4709            })
4710            .unwrap();
4711        s.advance().ok();
4712
4713        assert_eq!(s.poll_server(), Ok((4, Event::PriorityUpdate)));
4714        assert_eq!(s.poll_server(), Err(Error::Done));
4715
4716        s.client
4717            .send_priority_update_for_request(&mut s.pipe.client, 8, &Priority {
4718                urgency: 2,
4719                incremental: false,
4720            })
4721            .unwrap();
4722        s.advance().ok();
4723
4724        assert_eq!(s.poll_server(), Ok((8, Event::PriorityUpdate)));
4725        assert_eq!(s.poll_server(), Err(Error::Done));
4726
4727        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
4728        assert_eq!(s.server.take_last_priority_update(4), Ok(b"u=1".to_vec()));
4729        assert_eq!(s.server.take_last_priority_update(8), Ok(b"u=2".to_vec()));
4730        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4731    }
4732
4733    #[test]
4734    /// Send multiple PRIORITY_UPDATE frames for different streams from the
4735    /// client across a single flight.
4736    fn priority_update_request_multiple_stream_arm_single_flight() {
4737        let mut s = Session::new().unwrap();
4738        s.handshake().unwrap();
4739
4740        let mut d = [42; 65535];
4741
4742        let mut b = octets::OctetsMut::with_slice(&mut d);
4743
4744        let p1 = frame::Frame::PriorityUpdateRequest {
4745            prioritized_element_id: 0,
4746            priority_field_value: b"u=3".to_vec(),
4747        };
4748
4749        let p2 = frame::Frame::PriorityUpdateRequest {
4750            prioritized_element_id: 4,
4751            priority_field_value: b"u=3".to_vec(),
4752        };
4753
4754        let p3 = frame::Frame::PriorityUpdateRequest {
4755            prioritized_element_id: 8,
4756            priority_field_value: b"u=3".to_vec(),
4757        };
4758
4759        p1.to_bytes(&mut b).unwrap();
4760        p2.to_bytes(&mut b).unwrap();
4761        p3.to_bytes(&mut b).unwrap();
4762
4763        let off = b.off();
4764        s.pipe
4765            .client
4766            .stream_send(s.client.control_stream_id.unwrap(), &d[..off], false)
4767            .unwrap();
4768
4769        s.advance().ok();
4770
4771        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4772        assert_eq!(s.poll_server(), Ok((4, Event::PriorityUpdate)));
4773        assert_eq!(s.poll_server(), Ok((8, Event::PriorityUpdate)));
4774        assert_eq!(s.poll_server(), Err(Error::Done));
4775
4776        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
4777        assert_eq!(s.server.take_last_priority_update(4), Ok(b"u=3".to_vec()));
4778        assert_eq!(s.server.take_last_priority_update(8), Ok(b"u=3".to_vec()));
4779
4780        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4781    }
4782
4783    #[test]
4784    /// Send a PRIORITY_UPDATE for a request stream, before and after the stream
4785    /// has been completed.
4786    fn priority_update_request_collected_completed() {
4787        let mut s = Session::new().unwrap();
4788        s.handshake().unwrap();
4789
4790        s.client
4791            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4792                urgency: 3,
4793                incremental: false,
4794            })
4795            .unwrap();
4796        s.advance().ok();
4797
4798        let (stream, req) = s.send_request(true).unwrap();
4799        let ev_headers = Event::Headers {
4800            list: req,
4801            more_frames: false,
4802        };
4803
4804        // Priority event is generated before request headers.
4805        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4806        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4807        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4808        assert_eq!(s.poll_server(), Err(Error::Done));
4809
4810        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
4811        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4812
4813        let resp = s.send_response(stream, true).unwrap();
4814
4815        let ev_headers = Event::Headers {
4816            list: resp,
4817            more_frames: false,
4818        };
4819
4820        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
4821        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
4822        assert_eq!(s.poll_client(), Err(Error::Done));
4823
4824        // Now send a PRIORITY_UPDATE for the completed request stream.
4825        s.client
4826            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4827                urgency: 3,
4828                incremental: false,
4829            })
4830            .unwrap();
4831        s.advance().ok();
4832
4833        // No event generated at server
4834        assert_eq!(s.poll_server(), Err(Error::Done));
4835    }
4836
4837    #[test]
4838    /// Send a PRIORITY_UPDATE for a request stream, before and after the stream
4839    /// has been stopped.
4840    fn priority_update_request_collected_stopped() {
4841        let mut s = Session::new().unwrap();
4842        s.handshake().unwrap();
4843
4844        s.client
4845            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4846                urgency: 3,
4847                incremental: false,
4848            })
4849            .unwrap();
4850        s.advance().ok();
4851
4852        let (stream, req) = s.send_request(false).unwrap();
4853        let ev_headers = Event::Headers {
4854            list: req,
4855            more_frames: true,
4856        };
4857
4858        // Priority event is generated before request headers.
4859        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4860        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4861        assert_eq!(s.poll_server(), Err(Error::Done));
4862
4863        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
4864        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4865
4866        s.pipe
4867            .client
4868            .stream_shutdown(stream, crate::Shutdown::Write, 0x100)
4869            .unwrap();
4870        s.pipe
4871            .client
4872            .stream_shutdown(stream, crate::Shutdown::Read, 0x100)
4873            .unwrap();
4874
4875        s.advance().ok();
4876
4877        assert_eq!(s.poll_server(), Ok((0, Event::Reset(0x100))));
4878        assert_eq!(s.poll_server(), Err(Error::Done));
4879
4880        // Now send a PRIORITY_UPDATE for the closed request stream.
4881        s.client
4882            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4883                urgency: 3,
4884                incremental: false,
4885            })
4886            .unwrap();
4887        s.advance().ok();
4888
4889        // No event generated at server
4890        assert_eq!(s.poll_server(), Err(Error::Done));
4891    }
4892
4893    #[test]
4894    /// Send a PRIORITY_UPDATE for push stream from the client.
4895    fn priority_update_push() {
4896        let mut s = Session::new().unwrap();
4897        s.handshake().unwrap();
4898
4899        s.send_frame_client(
4900            frame::Frame::PriorityUpdatePush {
4901                prioritized_element_id: 3,
4902                priority_field_value: b"u=3".to_vec(),
4903            },
4904            s.client.control_stream_id.unwrap(),
4905            false,
4906        )
4907        .unwrap();
4908
4909        assert_eq!(s.poll_server(), Err(Error::Done));
4910    }
4911
4912    #[test]
4913    /// Send a PRIORITY_UPDATE for request stream from the client but for an
4914    /// incorrect stream type.
4915    fn priority_update_request_bad_stream() {
4916        let mut s = Session::new().unwrap();
4917        s.handshake().unwrap();
4918
4919        s.send_frame_client(
4920            frame::Frame::PriorityUpdateRequest {
4921                prioritized_element_id: 5,
4922                priority_field_value: b"u=3".to_vec(),
4923            },
4924            s.client.control_stream_id.unwrap(),
4925            false,
4926        )
4927        .unwrap();
4928
4929        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4930    }
4931
4932    #[test]
4933    /// Send a PRIORITY_UPDATE for push stream from the client but for an
4934    /// incorrect stream type.
4935    fn priority_update_push_bad_stream() {
4936        let mut s = Session::new().unwrap();
4937        s.handshake().unwrap();
4938
4939        s.send_frame_client(
4940            frame::Frame::PriorityUpdatePush {
4941                prioritized_element_id: 5,
4942                priority_field_value: b"u=3".to_vec(),
4943            },
4944            s.client.control_stream_id.unwrap(),
4945            false,
4946        )
4947        .unwrap();
4948
4949        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4950    }
4951
4952    #[test]
4953    /// Send a PRIORITY_UPDATE for request stream from the server.
4954    fn priority_update_request_from_server() {
4955        let mut s = Session::new().unwrap();
4956        s.handshake().unwrap();
4957
4958        s.send_frame_server(
4959            frame::Frame::PriorityUpdateRequest {
4960                prioritized_element_id: 0,
4961                priority_field_value: b"u=3".to_vec(),
4962            },
4963            s.server.control_stream_id.unwrap(),
4964            false,
4965        )
4966        .unwrap();
4967
4968        assert_eq!(s.poll_client(), Err(Error::FrameUnexpected));
4969    }
4970
4971    #[test]
4972    /// Send a PRIORITY_UPDATE for request stream from the server.
4973    fn priority_update_push_from_server() {
4974        let mut s = Session::new().unwrap();
4975        s.handshake().unwrap();
4976
4977        s.send_frame_server(
4978            frame::Frame::PriorityUpdatePush {
4979                prioritized_element_id: 0,
4980                priority_field_value: b"u=3".to_vec(),
4981            },
4982            s.server.control_stream_id.unwrap(),
4983            false,
4984        )
4985        .unwrap();
4986
4987        assert_eq!(s.poll_client(), Err(Error::FrameUnexpected));
4988    }
4989
4990    #[test]
4991    /// Ensure quiche allocates streams for client and server roles as expected.
4992    fn uni_stream_local_counting() {
4993        let config = Config::new().unwrap();
4994
4995        let h3_cln = Connection::new(&config, false, false).unwrap();
4996        assert_eq!(h3_cln.next_uni_stream_id, 2);
4997
4998        let h3_srv = Connection::new(&config, true, false).unwrap();
4999        assert_eq!(h3_srv.next_uni_stream_id, 3);
5000    }
5001
5002    #[test]
5003    /// Client opens multiple control streams, which is forbidden.
5004    fn open_multiple_control_streams() {
5005        let mut s = Session::new().unwrap();
5006        s.handshake().unwrap();
5007
5008        let stream_id = s.client.next_uni_stream_id;
5009
5010        let mut d = [42; 8];
5011        let mut b = octets::OctetsMut::with_slice(&mut d);
5012
5013        s.pipe
5014            .client
5015            .stream_send(
5016                stream_id,
5017                b.put_varint(stream::HTTP3_CONTROL_STREAM_TYPE_ID).unwrap(),
5018                false,
5019            )
5020            .unwrap();
5021
5022        s.advance().ok();
5023
5024        assert_eq!(s.poll_server(), Err(Error::StreamCreationError));
5025    }
5026
5027    #[test]
5028    /// Client closes the control stream, which is forbidden.
5029    fn close_control_stream_after_type() {
5030        let mut s = Session::new().unwrap();
5031        s.handshake().unwrap();
5032
5033        s.pipe
5034            .client
5035            .stream_send(s.client.control_stream_id.unwrap(), &[], true)
5036            .unwrap();
5037
5038        s.advance().ok();
5039
5040        assert_eq!(
5041            Err(Error::ClosedCriticalStream),
5042            s.server.poll(&mut s.pipe.server)
5043        );
5044        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5045    }
5046
5047    #[test]
5048    /// Client closes the control stream after a frame is sent, which is
5049    /// forbidden.
5050    fn close_control_stream_after_frame() {
5051        let mut s = Session::new().unwrap();
5052        s.handshake().unwrap();
5053
5054        s.send_frame_client(
5055            frame::Frame::MaxPushId { push_id: 1 },
5056            s.client.control_stream_id.unwrap(),
5057            true,
5058        )
5059        .unwrap();
5060
5061        assert_eq!(
5062            Err(Error::ClosedCriticalStream),
5063            s.server.poll(&mut s.pipe.server)
5064        );
5065        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5066    }
5067
5068    #[test]
5069    /// Client resets the control stream, which is forbidden.
5070    fn reset_control_stream_after_type() {
5071        let mut s = Session::new().unwrap();
5072        s.handshake().unwrap();
5073
5074        s.pipe
5075            .client
5076            .stream_shutdown(
5077                s.client.control_stream_id.unwrap(),
5078                crate::Shutdown::Write,
5079                0,
5080            )
5081            .unwrap();
5082
5083        s.advance().ok();
5084
5085        assert_eq!(
5086            Err(Error::ClosedCriticalStream),
5087            s.server.poll(&mut s.pipe.server)
5088        );
5089        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5090    }
5091
5092    #[test]
5093    /// Client resets the control stream after a frame is sent, which is
5094    /// forbidden.
5095    fn reset_control_stream_after_frame() {
5096        let mut s = Session::new().unwrap();
5097        s.handshake().unwrap();
5098
5099        s.send_frame_client(
5100            frame::Frame::MaxPushId { push_id: 1 },
5101            s.client.control_stream_id.unwrap(),
5102            false,
5103        )
5104        .unwrap();
5105
5106        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5107
5108        s.pipe
5109            .client
5110            .stream_shutdown(
5111                s.client.control_stream_id.unwrap(),
5112                crate::Shutdown::Write,
5113                0,
5114            )
5115            .unwrap();
5116
5117        s.advance().ok();
5118
5119        assert_eq!(
5120            Err(Error::ClosedCriticalStream),
5121            s.server.poll(&mut s.pipe.server)
5122        );
5123        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5124    }
5125
5126    #[test]
5127    /// Client closes QPACK stream, which is forbidden.
5128    fn close_qpack_stream_after_type() {
5129        let mut s = Session::new().unwrap();
5130        s.handshake().unwrap();
5131
5132        s.pipe
5133            .client
5134            .stream_send(
5135                s.client.local_qpack_streams.encoder_stream_id.unwrap(),
5136                &[],
5137                true,
5138            )
5139            .unwrap();
5140
5141        s.advance().ok();
5142
5143        assert_eq!(
5144            Err(Error::ClosedCriticalStream),
5145            s.server.poll(&mut s.pipe.server)
5146        );
5147        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5148    }
5149
5150    #[test]
5151    /// Client closes QPACK stream after sending some stuff, which is forbidden.
5152    fn close_qpack_stream_after_data() {
5153        let mut s = Session::new().unwrap();
5154        s.handshake().unwrap();
5155
5156        let stream_id = s.client.local_qpack_streams.encoder_stream_id.unwrap();
5157        let d = [0; 1];
5158
5159        s.pipe.client.stream_send(stream_id, &d, false).unwrap();
5160        s.pipe.client.stream_send(stream_id, &d, true).unwrap();
5161
5162        s.advance().ok();
5163
5164        assert_eq!(
5165            Err(Error::ClosedCriticalStream),
5166            s.server.poll(&mut s.pipe.server)
5167        );
5168        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5169    }
5170
5171    #[test]
5172    /// Client resets QPACK stream, which is forbidden.
5173    fn reset_qpack_stream_after_type() {
5174        let mut s = Session::new().unwrap();
5175        s.handshake().unwrap();
5176
5177        s.pipe
5178            .client
5179            .stream_shutdown(
5180                s.client.local_qpack_streams.encoder_stream_id.unwrap(),
5181                crate::Shutdown::Write,
5182                0,
5183            )
5184            .unwrap();
5185
5186        s.advance().ok();
5187
5188        assert_eq!(
5189            Err(Error::ClosedCriticalStream),
5190            s.server.poll(&mut s.pipe.server)
5191        );
5192        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5193    }
5194
5195    #[test]
5196    /// Client resets QPACK stream after sending some stuff, which is forbidden.
5197    fn reset_qpack_stream_after_data() {
5198        let mut s = Session::new().unwrap();
5199        s.handshake().unwrap();
5200
5201        let stream_id = s.client.local_qpack_streams.encoder_stream_id.unwrap();
5202        let d = [0; 1];
5203
5204        s.pipe.client.stream_send(stream_id, &d, false).unwrap();
5205        s.pipe.client.stream_send(stream_id, &d, false).unwrap();
5206
5207        s.advance().ok();
5208
5209        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5210
5211        s.pipe
5212            .client
5213            .stream_shutdown(stream_id, crate::Shutdown::Write, 0)
5214            .unwrap();
5215
5216        s.advance().ok();
5217
5218        assert_eq!(
5219            Err(Error::ClosedCriticalStream),
5220            s.server.poll(&mut s.pipe.server)
5221        );
5222        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5223    }
5224
5225    #[test]
5226    /// Client sends QPACK data.
5227    fn qpack_data() {
5228        // TODO: QPACK instructions are ignored until dynamic table support is
5229        // added so we just test that the data is safely ignored.
5230        let mut s = Session::new().unwrap();
5231        s.handshake().unwrap();
5232
5233        let e_stream_id = s.client.local_qpack_streams.encoder_stream_id.unwrap();
5234        let d_stream_id = s.client.local_qpack_streams.decoder_stream_id.unwrap();
5235        let d = [0; 20];
5236
5237        s.pipe.client.stream_send(e_stream_id, &d, false).unwrap();
5238        s.advance().ok();
5239
5240        s.pipe.client.stream_send(d_stream_id, &d, false).unwrap();
5241        s.advance().ok();
5242
5243        match s.server.poll(&mut s.pipe.server) {
5244            Ok(_) => panic!(),
5245
5246            Err(Error::Done) => {
5247                assert_eq!(s.server.peer_qpack_streams.encoder_stream_bytes, 20);
5248                assert_eq!(s.server.peer_qpack_streams.decoder_stream_bytes, 20);
5249            },
5250
5251            Err(_) => {
5252                panic!();
5253            },
5254        }
5255
5256        let stats = s.server.stats();
5257        assert_eq!(stats.qpack_encoder_stream_recv_bytes, 20);
5258        assert_eq!(stats.qpack_decoder_stream_recv_bytes, 20);
5259    }
5260
5261    #[test]
5262    /// Tests limits for the stream state buffer maximum size.
5263    fn max_state_buf_size() {
5264        let mut s = Session::new().unwrap();
5265        s.handshake().unwrap();
5266
5267        let req = vec![
5268            Header::new(b":method", b"GET"),
5269            Header::new(b":scheme", b"https"),
5270            Header::new(b":authority", b"quic.tech"),
5271            Header::new(b":path", b"/test"),
5272            Header::new(b"user-agent", b"quiche-test"),
5273        ];
5274
5275        assert_eq!(
5276            s.client.send_request(&mut s.pipe.client, &req, false),
5277            Ok(0)
5278        );
5279
5280        s.advance().ok();
5281
5282        let ev_headers = Event::Headers {
5283            list: req,
5284            more_frames: true,
5285        };
5286
5287        assert_eq!(s.server.poll(&mut s.pipe.server), Ok((0, ev_headers)));
5288
5289        // DATA frames don't consume the state buffer, so can be of any size.
5290        let mut d = [42; 128];
5291        let mut b = octets::OctetsMut::with_slice(&mut d);
5292
5293        let frame_type = b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
5294        s.pipe.client.stream_send(0, frame_type, false).unwrap();
5295
5296        let frame_len = b.put_varint(1 << 24).unwrap();
5297        s.pipe.client.stream_send(0, frame_len, false).unwrap();
5298
5299        s.pipe.client.stream_send(0, &d, false).unwrap();
5300
5301        s.advance().ok();
5302
5303        assert_eq!(s.server.poll(&mut s.pipe.server), Ok((0, Event::Data)));
5304
5305        // GREASE frames consume the state buffer, so need to be limited.
5306        let mut s = Session::new().unwrap();
5307        s.handshake().unwrap();
5308
5309        let mut d = [42; 128];
5310        let mut b = octets::OctetsMut::with_slice(&mut d);
5311
5312        let frame_type = b.put_varint(148_764_065_110_560_899).unwrap();
5313        s.pipe.client.stream_send(0, frame_type, false).unwrap();
5314
5315        let frame_len = b.put_varint(1 << 24).unwrap();
5316        s.pipe.client.stream_send(0, frame_len, false).unwrap();
5317
5318        s.pipe.client.stream_send(0, &d, false).unwrap();
5319
5320        s.advance().ok();
5321
5322        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::ExcessiveLoad));
5323    }
5324
5325    #[test]
5326    /// Tests that DATA frames are properly truncated depending on the request
5327    /// stream's outgoing flow control capacity.
5328    fn stream_backpressure() {
5329        let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
5330
5331        let mut s = Session::new().unwrap();
5332        s.handshake().unwrap();
5333
5334        let (stream, req) = s.send_request(false).unwrap();
5335
5336        let total_data_frames = 6;
5337
5338        for _ in 0..total_data_frames {
5339            assert_eq!(
5340                s.client
5341                    .send_body(&mut s.pipe.client, stream, &bytes, false),
5342                Ok(bytes.len())
5343            );
5344
5345            s.advance().ok();
5346        }
5347
5348        assert_eq!(
5349            s.client.send_body(&mut s.pipe.client, stream, &bytes, true),
5350            Ok(bytes.len() - 2)
5351        );
5352
5353        s.advance().ok();
5354
5355        let mut recv_buf = vec![0; bytes.len()];
5356
5357        let ev_headers = Event::Headers {
5358            list: req,
5359            more_frames: true,
5360        };
5361
5362        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5363        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
5364        assert_eq!(s.poll_server(), Err(Error::Done));
5365
5366        for _ in 0..total_data_frames {
5367            assert_eq!(
5368                s.recv_body_server(stream, &mut recv_buf),
5369                Ok(bytes.len())
5370            );
5371        }
5372
5373        assert_eq!(
5374            s.recv_body_server(stream, &mut recv_buf),
5375            Ok(bytes.len() - 2)
5376        );
5377
5378        // Fin flag from last send_body() call was not sent as the buffer was
5379        // only partially written.
5380        assert_eq!(s.poll_server(), Err(Error::Done));
5381    }
5382
5383    #[test]
5384    /// Tests that the max header list size setting is enforced.
5385    fn request_max_header_size_limit() {
5386        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5387        config
5388            .load_cert_chain_from_pem_file("examples/cert.crt")
5389            .unwrap();
5390        config
5391            .load_priv_key_from_pem_file("examples/cert.key")
5392            .unwrap();
5393        config.set_application_protos(&[b"h3"]).unwrap();
5394        config.set_initial_max_data(1500);
5395        config.set_initial_max_stream_data_bidi_local(150);
5396        config.set_initial_max_stream_data_bidi_remote(150);
5397        config.set_initial_max_stream_data_uni(150);
5398        config.set_initial_max_streams_bidi(5);
5399        config.set_initial_max_streams_uni(5);
5400        config.verify_peer(false);
5401
5402        let mut h3_config = Config::new().unwrap();
5403        h3_config.set_max_field_section_size(65);
5404
5405        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5406
5407        s.handshake().unwrap();
5408
5409        let req = vec![
5410            Header::new(b":method", b"GET"),
5411            Header::new(b":scheme", b"https"),
5412            Header::new(b":authority", b"quic.tech"),
5413            Header::new(b":path", b"/test"),
5414            Header::new(b"aaaaaaa", b"aaaaaaaa"),
5415        ];
5416
5417        let stream = s
5418            .client
5419            .send_request(&mut s.pipe.client, &req, true)
5420            .unwrap();
5421
5422        s.advance().ok();
5423
5424        assert_eq!(stream, 0);
5425
5426        assert_eq!(s.poll_server(), Err(Error::ExcessiveLoad));
5427
5428        assert_eq!(
5429            s.pipe.server.local_error.as_ref().unwrap().error_code,
5430            Error::to_wire(Error::ExcessiveLoad)
5431        );
5432    }
5433
5434    #[test]
5435    /// Tests that Error::TransportError contains a transport error.
5436    fn transport_error() {
5437        let mut s = Session::new().unwrap();
5438        s.handshake().unwrap();
5439
5440        let req = vec![
5441            Header::new(b":method", b"GET"),
5442            Header::new(b":scheme", b"https"),
5443            Header::new(b":authority", b"quic.tech"),
5444            Header::new(b":path", b"/test"),
5445            Header::new(b"user-agent", b"quiche-test"),
5446        ];
5447
5448        // We need to open all streams in the same flight, so we can't use the
5449        // Session::send_request() method because it also calls advance(),
5450        // otherwise the server would send a MAX_STREAMS frame and the client
5451        // wouldn't hit the streams limit.
5452        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(0));
5453        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(4));
5454        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(8));
5455        assert_eq!(
5456            s.client.send_request(&mut s.pipe.client, &req, true),
5457            Ok(12)
5458        );
5459        assert_eq!(
5460            s.client.send_request(&mut s.pipe.client, &req, true),
5461            Ok(16)
5462        );
5463
5464        assert_eq!(
5465            s.client.send_request(&mut s.pipe.client, &req, true),
5466            Err(Error::TransportError(crate::Error::StreamLimit))
5467        );
5468    }
5469
5470    #[test]
5471    /// Tests that sending DATA before HEADERS causes an error.
5472    fn data_before_headers() {
5473        let mut s = Session::new().unwrap();
5474        s.handshake().unwrap();
5475
5476        let mut d = [42; 128];
5477        let mut b = octets::OctetsMut::with_slice(&mut d);
5478
5479        let frame_type = b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
5480        s.pipe.client.stream_send(0, frame_type, false).unwrap();
5481
5482        let frame_len = b.put_varint(5).unwrap();
5483        s.pipe.client.stream_send(0, frame_len, false).unwrap();
5484
5485        s.pipe.client.stream_send(0, b"hello", false).unwrap();
5486
5487        s.advance().ok();
5488
5489        assert_eq!(
5490            s.server.poll(&mut s.pipe.server),
5491            Err(Error::FrameUnexpected)
5492        );
5493    }
5494
5495    #[test]
5496    /// Tests that calling poll() after an error occurred does nothing.
5497    fn poll_after_error() {
5498        let mut s = Session::new().unwrap();
5499        s.handshake().unwrap();
5500
5501        let mut d = [42; 128];
5502        let mut b = octets::OctetsMut::with_slice(&mut d);
5503
5504        let frame_type = b.put_varint(148_764_065_110_560_899).unwrap();
5505        s.pipe.client.stream_send(0, frame_type, false).unwrap();
5506
5507        let frame_len = b.put_varint(1 << 24).unwrap();
5508        s.pipe.client.stream_send(0, frame_len, false).unwrap();
5509
5510        s.pipe.client.stream_send(0, &d, false).unwrap();
5511
5512        s.advance().ok();
5513
5514        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::ExcessiveLoad));
5515
5516        // Try to call poll() again after an error occurred.
5517        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::Done));
5518    }
5519
5520    #[test]
5521    /// Tests that we limit sending HEADERS based on the stream capacity.
5522    fn headers_blocked() {
5523        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5524        config
5525            .load_cert_chain_from_pem_file("examples/cert.crt")
5526            .unwrap();
5527        config
5528            .load_priv_key_from_pem_file("examples/cert.key")
5529            .unwrap();
5530        config.set_application_protos(&[b"h3"]).unwrap();
5531        config.set_initial_max_data(70);
5532        config.set_initial_max_stream_data_bidi_local(150);
5533        config.set_initial_max_stream_data_bidi_remote(150);
5534        config.set_initial_max_stream_data_uni(150);
5535        config.set_initial_max_streams_bidi(100);
5536        config.set_initial_max_streams_uni(5);
5537        config.verify_peer(false);
5538
5539        let h3_config = Config::new().unwrap();
5540
5541        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5542
5543        s.handshake().unwrap();
5544
5545        let req = vec![
5546            Header::new(b":method", b"GET"),
5547            Header::new(b":scheme", b"https"),
5548            Header::new(b":authority", b"quic.tech"),
5549            Header::new(b":path", b"/test"),
5550        ];
5551
5552        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(0));
5553
5554        assert_eq!(
5555            s.client.send_request(&mut s.pipe.client, &req, true),
5556            Err(Error::StreamBlocked)
5557        );
5558
5559        // Clear the writable stream queue.
5560        assert_eq!(s.pipe.client.stream_writable_next(), Some(2));
5561        assert_eq!(s.pipe.client.stream_writable_next(), Some(6));
5562        assert_eq!(s.pipe.client.stream_writable_next(), Some(10));
5563        assert_eq!(s.pipe.client.stream_writable_next(), None);
5564
5565        s.advance().ok();
5566
5567        // Once the server gives flow control credits back, we can send the
5568        // request.
5569        assert_eq!(s.pipe.client.stream_writable_next(), Some(4));
5570        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(4));
5571    }
5572
5573    #[test]
5574    /// Ensure StreamBlocked when connection flow control prevents headers.
5575    fn headers_blocked_on_conn() {
5576        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5577        config
5578            .load_cert_chain_from_pem_file("examples/cert.crt")
5579            .unwrap();
5580        config
5581            .load_priv_key_from_pem_file("examples/cert.key")
5582            .unwrap();
5583        config.set_application_protos(&[b"h3"]).unwrap();
5584        config.set_initial_max_data(70);
5585        config.set_initial_max_stream_data_bidi_local(150);
5586        config.set_initial_max_stream_data_bidi_remote(150);
5587        config.set_initial_max_stream_data_uni(150);
5588        config.set_initial_max_streams_bidi(100);
5589        config.set_initial_max_streams_uni(5);
5590        config.verify_peer(false);
5591
5592        let h3_config = Config::new().unwrap();
5593
5594        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5595
5596        s.handshake().unwrap();
5597
5598        // After the HTTP handshake, some bytes of connection flow control have
5599        // been consumed. Fill the connection with more grease data on the control
5600        // stream.
5601        let d = [42; 28];
5602        assert_eq!(s.pipe.client.stream_send(2, &d, false), Ok(23));
5603
5604        let req = vec![
5605            Header::new(b":method", b"GET"),
5606            Header::new(b":scheme", b"https"),
5607            Header::new(b":authority", b"quic.tech"),
5608            Header::new(b":path", b"/test"),
5609        ];
5610
5611        // There is 0 connection-level flow control, so sending a request is
5612        // blocked.
5613        assert_eq!(
5614            s.client.send_request(&mut s.pipe.client, &req, true),
5615            Err(Error::StreamBlocked)
5616        );
5617        assert_eq!(s.pipe.client.stream_writable_next(), None);
5618
5619        // Emit the control stream data and drain it at the server via poll() to
5620        // consumes it via poll() and gives back flow control.
5621        s.advance().ok();
5622        assert_eq!(s.poll_server(), Err(Error::Done));
5623        s.advance().ok();
5624
5625        // Now we can send the request.
5626        assert_eq!(s.pipe.client.stream_writable_next(), Some(2));
5627        assert_eq!(s.pipe.client.stream_writable_next(), Some(6));
5628        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(0));
5629    }
5630
5631    #[test]
5632    /// Ensure STREAM_DATA_BLOCKED is not emitted multiple times with the same
5633    /// offset when trying to send large bodies.
5634    fn send_body_truncation_stream_blocked() {
5635        use crate::testing::decode_pkt;
5636
5637        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5638        config
5639            .load_cert_chain_from_pem_file("examples/cert.crt")
5640            .unwrap();
5641        config
5642            .load_priv_key_from_pem_file("examples/cert.key")
5643            .unwrap();
5644        config.set_application_protos(&[b"h3"]).unwrap();
5645        config.set_initial_max_data(10000); // large connection-level flow control
5646        config.set_initial_max_stream_data_bidi_local(80);
5647        config.set_initial_max_stream_data_bidi_remote(80);
5648        config.set_initial_max_stream_data_uni(150);
5649        config.set_initial_max_streams_bidi(100);
5650        config.set_initial_max_streams_uni(5);
5651        config.verify_peer(false);
5652
5653        let h3_config = Config::new().unwrap();
5654
5655        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5656
5657        s.handshake().unwrap();
5658
5659        let (stream, req) = s.send_request(true).unwrap();
5660
5661        let ev_headers = Event::Headers {
5662            list: req,
5663            more_frames: false,
5664        };
5665
5666        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5667        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
5668
5669        let _ = s.send_response(stream, false).unwrap();
5670
5671        assert_eq!(s.pipe.server.streams.blocked().len(), 0);
5672
5673        // The body must be larger than the stream window would allow
5674        let d = [42; 500];
5675        let mut off = 0;
5676
5677        let sent = s
5678            .server
5679            .send_body(&mut s.pipe.server, stream, &d, true)
5680            .unwrap();
5681        assert_eq!(sent, 25);
5682        off += sent;
5683
5684        // send_body wrote as much as it could (sent < size of buff).
5685        assert_eq!(s.pipe.server.streams.blocked().len(), 1);
5686        assert_eq!(
5687            s.server
5688                .send_body(&mut s.pipe.server, stream, &d[off..], true),
5689            Err(Error::Done)
5690        );
5691        assert_eq!(s.pipe.server.streams.blocked().len(), 1);
5692
5693        // Now read raw frames to see what the QUIC layer did
5694        let mut buf = [0; 65535];
5695        let (len, _) = s.pipe.server.send(&mut buf).unwrap();
5696
5697        let frames = decode_pkt(&mut s.pipe.client, &mut buf[..len]).unwrap();
5698
5699        let mut iter = frames.iter();
5700
5701        assert_eq!(
5702            iter.next(),
5703            Some(&crate::frame::Frame::StreamDataBlocked {
5704                stream_id: 0,
5705                limit: 80,
5706            })
5707        );
5708
5709        // At the server, after sending the STREAM_DATA_BLOCKED frame, we clear
5710        // the mark.
5711        assert_eq!(s.pipe.server.streams.blocked().len(), 0);
5712
5713        // Don't read any data from the client, so stream flow control is never
5714        // given back in the form of changing the stream's max offset.
5715        // Subsequent body send operations will still fail but no more
5716        // STREAM_DATA_BLOCKED frames should be submitted since the limit didn't
5717        // change. No frames means no packet to send.
5718        assert_eq!(
5719            s.server
5720                .send_body(&mut s.pipe.server, stream, &d[off..], true),
5721            Err(Error::Done)
5722        );
5723        assert_eq!(s.pipe.server.streams.blocked().len(), 0);
5724        assert_eq!(s.pipe.server.send(&mut buf), Err(crate::Error::Done));
5725
5726        // Now update the client's max offset manually.
5727        let frames = [crate::frame::Frame::MaxStreamData {
5728            stream_id: 0,
5729            max: 100,
5730        }];
5731
5732        let pkt_type = crate::packet::Type::Short;
5733        assert_eq!(
5734            s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
5735            Ok(39),
5736        );
5737
5738        let sent = s
5739            .server
5740            .send_body(&mut s.pipe.server, stream, &d[off..], true)
5741            .unwrap();
5742        assert_eq!(sent, 18);
5743
5744        // Same thing here...
5745        assert_eq!(s.pipe.server.streams.blocked().len(), 1);
5746        assert_eq!(
5747            s.server
5748                .send_body(&mut s.pipe.server, stream, &d[off..], true),
5749            Err(Error::Done)
5750        );
5751        assert_eq!(s.pipe.server.streams.blocked().len(), 1);
5752
5753        let (len, _) = s.pipe.server.send(&mut buf).unwrap();
5754
5755        let frames = decode_pkt(&mut s.pipe.client, &mut buf[..len]).unwrap();
5756
5757        let mut iter = frames.iter();
5758
5759        assert_eq!(
5760            iter.next(),
5761            Some(&crate::frame::Frame::StreamDataBlocked {
5762                stream_id: 0,
5763                limit: 100,
5764            })
5765        );
5766    }
5767
5768    #[test]
5769    /// Ensure stream doesn't hang due to small cwnd.
5770    fn send_body_stream_blocked_by_small_cwnd() {
5771        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5772        config
5773            .load_cert_chain_from_pem_file("examples/cert.crt")
5774            .unwrap();
5775        config
5776            .load_priv_key_from_pem_file("examples/cert.key")
5777            .unwrap();
5778        config.set_application_protos(&[b"h3"]).unwrap();
5779        config.set_initial_max_data(100000); // large connection-level flow control
5780        config.set_initial_max_stream_data_bidi_local(100000);
5781        config.set_initial_max_stream_data_bidi_remote(50000);
5782        config.set_initial_max_stream_data_uni(150);
5783        config.set_initial_max_streams_bidi(100);
5784        config.set_initial_max_streams_uni(5);
5785        config.verify_peer(false);
5786
5787        let h3_config = Config::new().unwrap();
5788
5789        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5790
5791        s.handshake().unwrap();
5792
5793        let (stream, req) = s.send_request(true).unwrap();
5794
5795        let ev_headers = Event::Headers {
5796            list: req,
5797            more_frames: false,
5798        };
5799
5800        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5801        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
5802
5803        let _ = s.send_response(stream, false).unwrap();
5804
5805        // Clear the writable stream queue.
5806        assert_eq!(s.pipe.server.stream_writable_next(), Some(3));
5807        assert_eq!(s.pipe.server.stream_writable_next(), Some(7));
5808        assert_eq!(s.pipe.server.stream_writable_next(), Some(11));
5809        assert_eq!(s.pipe.server.stream_writable_next(), Some(stream));
5810        assert_eq!(s.pipe.server.stream_writable_next(), None);
5811
5812        // The body must be larger than the cwnd would allow.
5813        let send_buf = [42; 80000];
5814
5815        let sent = s
5816            .server
5817            .send_body(&mut s.pipe.server, stream, &send_buf, true)
5818            .unwrap();
5819
5820        // send_body wrote as much as it could (sent < size of buff).
5821        assert_eq!(sent, 11995);
5822
5823        s.advance().ok();
5824
5825        // Client reads received headers and body.
5826        let mut recv_buf = [42; 80000];
5827        assert!(s.poll_client().is_ok());
5828        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
5829        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(11995));
5830
5831        s.advance().ok();
5832
5833        // Server send cap is smaller than remaining body buffer.
5834        assert!(s.pipe.server.tx_cap < send_buf.len() - sent);
5835
5836        // Once the server cwnd opens up, we can send more body.
5837        assert_eq!(s.pipe.server.stream_writable_next(), Some(0));
5838    }
5839
5840    #[test]
5841    /// Ensure stream doesn't hang due to small cwnd.
5842    fn send_body_stream_blocked_zero_length() {
5843        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5844        config
5845            .load_cert_chain_from_pem_file("examples/cert.crt")
5846            .unwrap();
5847        config
5848            .load_priv_key_from_pem_file("examples/cert.key")
5849            .unwrap();
5850        config.set_application_protos(&[b"h3"]).unwrap();
5851        config.set_initial_max_data(100000); // large connection-level flow control
5852        config.set_initial_max_stream_data_bidi_local(100000);
5853        config.set_initial_max_stream_data_bidi_remote(50000);
5854        config.set_initial_max_stream_data_uni(150);
5855        config.set_initial_max_streams_bidi(100);
5856        config.set_initial_max_streams_uni(5);
5857        config.verify_peer(false);
5858
5859        let h3_config = Config::new().unwrap();
5860
5861        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5862
5863        s.handshake().unwrap();
5864
5865        let (stream, req) = s.send_request(true).unwrap();
5866
5867        let ev_headers = Event::Headers {
5868            list: req,
5869            more_frames: false,
5870        };
5871
5872        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5873        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
5874
5875        let _ = s.send_response(stream, false).unwrap();
5876
5877        // Clear the writable stream queue.
5878        assert_eq!(s.pipe.server.stream_writable_next(), Some(3));
5879        assert_eq!(s.pipe.server.stream_writable_next(), Some(7));
5880        assert_eq!(s.pipe.server.stream_writable_next(), Some(11));
5881        assert_eq!(s.pipe.server.stream_writable_next(), Some(stream));
5882        assert_eq!(s.pipe.server.stream_writable_next(), None);
5883
5884        // The body is large enough to fill the cwnd, except for enough bytes
5885        // for another DATA frame header (but no payload).
5886        let send_buf = [42; 11994];
5887
5888        let sent = s
5889            .server
5890            .send_body(&mut s.pipe.server, stream, &send_buf, false)
5891            .unwrap();
5892
5893        assert_eq!(sent, 11994);
5894
5895        // There is only enough capacity left for the DATA frame header, but
5896        // no payload.
5897        assert_eq!(s.pipe.server.stream_capacity(stream).unwrap(), 3);
5898        assert_eq!(
5899            s.server
5900                .send_body(&mut s.pipe.server, stream, &send_buf, false),
5901            Err(Error::Done)
5902        );
5903
5904        s.advance().ok();
5905
5906        // Client reads received headers and body.
5907        let mut recv_buf = [42; 80000];
5908        assert!(s.poll_client().is_ok());
5909        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
5910        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(11994));
5911
5912        s.advance().ok();
5913
5914        // Once the server cwnd opens up, we can send more body.
5915        assert_eq!(s.pipe.server.stream_writable_next(), Some(0));
5916    }
5917
5918    #[test]
5919    /// Test handling of 0-length DATA writes with and without fin.
5920    fn zero_length_data() {
5921        let mut s = Session::new().unwrap();
5922        s.handshake().unwrap();
5923
5924        let (stream, req) = s.send_request(false).unwrap();
5925
5926        assert_eq!(
5927            s.client.send_body(&mut s.pipe.client, 0, b"", false),
5928            Err(Error::Done)
5929        );
5930        assert_eq!(s.client.send_body(&mut s.pipe.client, 0, b"", true), Ok(0));
5931
5932        s.advance().ok();
5933
5934        let mut recv_buf = vec![0; 100];
5935
5936        let ev_headers = Event::Headers {
5937            list: req,
5938            more_frames: true,
5939        };
5940
5941        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5942
5943        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
5944        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Err(Error::Done));
5945
5946        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
5947        assert_eq!(s.poll_server(), Err(Error::Done));
5948
5949        let resp = s.send_response(stream, false).unwrap();
5950
5951        assert_eq!(
5952            s.server.send_body(&mut s.pipe.server, 0, b"", false),
5953            Err(Error::Done)
5954        );
5955        assert_eq!(s.server.send_body(&mut s.pipe.server, 0, b"", true), Ok(0));
5956
5957        s.advance().ok();
5958
5959        let ev_headers = Event::Headers {
5960            list: resp,
5961            more_frames: true,
5962        };
5963
5964        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
5965
5966        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
5967        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Err(Error::Done));
5968
5969        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
5970        assert_eq!(s.poll_client(), Err(Error::Done));
5971    }
5972
5973    #[test]
5974    /// Tests that blocked 0-length DATA writes are reported correctly.
5975    fn zero_length_data_blocked() {
5976        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5977        config
5978            .load_cert_chain_from_pem_file("examples/cert.crt")
5979            .unwrap();
5980        config
5981            .load_priv_key_from_pem_file("examples/cert.key")
5982            .unwrap();
5983        config.set_application_protos(&[b"h3"]).unwrap();
5984        config.set_initial_max_data(69);
5985        config.set_initial_max_stream_data_bidi_local(150);
5986        config.set_initial_max_stream_data_bidi_remote(150);
5987        config.set_initial_max_stream_data_uni(150);
5988        config.set_initial_max_streams_bidi(100);
5989        config.set_initial_max_streams_uni(5);
5990        config.verify_peer(false);
5991
5992        let h3_config = Config::new().unwrap();
5993
5994        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5995
5996        s.handshake().unwrap();
5997
5998        let req = vec![
5999            Header::new(b":method", b"GET"),
6000            Header::new(b":scheme", b"https"),
6001            Header::new(b":authority", b"quic.tech"),
6002            Header::new(b":path", b"/test"),
6003        ];
6004
6005        assert_eq!(
6006            s.client.send_request(&mut s.pipe.client, &req, false),
6007            Ok(0)
6008        );
6009
6010        assert_eq!(
6011            s.client.send_body(&mut s.pipe.client, 0, b"", true),
6012            Err(Error::Done)
6013        );
6014
6015        // Clear the writable stream queue.
6016        assert_eq!(s.pipe.client.stream_writable_next(), Some(2));
6017        assert_eq!(s.pipe.client.stream_writable_next(), Some(6));
6018        assert_eq!(s.pipe.client.stream_writable_next(), Some(10));
6019        assert_eq!(s.pipe.client.stream_writable_next(), None);
6020
6021        s.advance().ok();
6022
6023        // Once the server gives flow control credits back, we can send the body.
6024        assert_eq!(s.pipe.client.stream_writable_next(), Some(0));
6025        assert_eq!(s.client.send_body(&mut s.pipe.client, 0, b"", true), Ok(0));
6026    }
6027
6028    #[test]
6029    /// Tests that receiving an empty SETTINGS frame is handled and reported.
6030    fn empty_settings() {
6031        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6032        config
6033            .load_cert_chain_from_pem_file("examples/cert.crt")
6034            .unwrap();
6035        config
6036            .load_priv_key_from_pem_file("examples/cert.key")
6037            .unwrap();
6038        config.set_application_protos(&[b"h3"]).unwrap();
6039        config.set_initial_max_data(1500);
6040        config.set_initial_max_stream_data_bidi_local(150);
6041        config.set_initial_max_stream_data_bidi_remote(150);
6042        config.set_initial_max_stream_data_uni(150);
6043        config.set_initial_max_streams_bidi(5);
6044        config.set_initial_max_streams_uni(5);
6045        config.verify_peer(false);
6046        config.set_ack_delay_exponent(8);
6047        config.grease(false);
6048
6049        let h3_config = Config::new().unwrap();
6050        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6051
6052        s.handshake().unwrap();
6053
6054        assert!(s.client.peer_settings_raw().is_some());
6055        assert!(s.server.peer_settings_raw().is_some());
6056    }
6057
6058    #[test]
6059    /// Tests that receiving a H3_DATAGRAM setting is ok.
6060    fn dgram_setting() {
6061        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6062        config
6063            .load_cert_chain_from_pem_file("examples/cert.crt")
6064            .unwrap();
6065        config
6066            .load_priv_key_from_pem_file("examples/cert.key")
6067            .unwrap();
6068        config.set_application_protos(&[b"h3"]).unwrap();
6069        config.set_initial_max_data(70);
6070        config.set_initial_max_stream_data_bidi_local(150);
6071        config.set_initial_max_stream_data_bidi_remote(150);
6072        config.set_initial_max_stream_data_uni(150);
6073        config.set_initial_max_streams_bidi(100);
6074        config.set_initial_max_streams_uni(5);
6075        config.enable_dgram(true, 1000, 1000);
6076        config.verify_peer(false);
6077
6078        let h3_config = Config::new().unwrap();
6079
6080        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6081        assert_eq!(s.pipe.handshake(), Ok(()));
6082
6083        s.client.send_settings(&mut s.pipe.client).unwrap();
6084        assert_eq!(s.pipe.advance(), Ok(()));
6085
6086        // Before processing SETTINGS (via poll), HTTP/3 DATAGRAMS are not
6087        // enabled.
6088        assert!(!s.server.dgram_enabled_by_peer(&s.pipe.server));
6089
6090        // When everything is ok, poll returns Done and DATAGRAM is enabled.
6091        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::Done));
6092        assert!(s.server.dgram_enabled_by_peer(&s.pipe.server));
6093
6094        // Now detect things on the client
6095        s.server.send_settings(&mut s.pipe.server).unwrap();
6096        assert_eq!(s.pipe.advance(), Ok(()));
6097        assert!(!s.client.dgram_enabled_by_peer(&s.pipe.client));
6098        assert_eq!(s.client.poll(&mut s.pipe.client), Err(Error::Done));
6099        assert!(s.client.dgram_enabled_by_peer(&s.pipe.client));
6100    }
6101
6102    #[test]
6103    /// Tests that receiving a H3_DATAGRAM setting when no TP is set generates
6104    /// an error.
6105    fn dgram_setting_no_tp() {
6106        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6107        config
6108            .load_cert_chain_from_pem_file("examples/cert.crt")
6109            .unwrap();
6110        config
6111            .load_priv_key_from_pem_file("examples/cert.key")
6112            .unwrap();
6113        config.set_application_protos(&[b"h3"]).unwrap();
6114        config.set_initial_max_data(70);
6115        config.set_initial_max_stream_data_bidi_local(150);
6116        config.set_initial_max_stream_data_bidi_remote(150);
6117        config.set_initial_max_stream_data_uni(150);
6118        config.set_initial_max_streams_bidi(100);
6119        config.set_initial_max_streams_uni(5);
6120        config.verify_peer(false);
6121
6122        let h3_config = Config::new().unwrap();
6123
6124        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6125        assert_eq!(s.pipe.handshake(), Ok(()));
6126
6127        s.client.control_stream_id = Some(
6128            s.client
6129                .open_uni_stream(
6130                    &mut s.pipe.client,
6131                    stream::HTTP3_CONTROL_STREAM_TYPE_ID,
6132                )
6133                .unwrap(),
6134        );
6135
6136        let settings = frame::Frame::Settings {
6137            max_field_section_size: None,
6138            qpack_max_table_capacity: None,
6139            qpack_blocked_streams: None,
6140            connect_protocol_enabled: None,
6141            h3_datagram: Some(1),
6142            grease: None,
6143            additional_settings: Default::default(),
6144            raw: Default::default(),
6145        };
6146
6147        s.send_frame_client(settings, s.client.control_stream_id.unwrap(), false)
6148            .unwrap();
6149
6150        assert_eq!(s.pipe.advance(), Ok(()));
6151
6152        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::SettingsError));
6153    }
6154
6155    #[test]
6156    /// Tests that receiving SETTINGS with prohibited values generates an error.
6157    fn settings_h2_prohibited() {
6158        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6159        config
6160            .load_cert_chain_from_pem_file("examples/cert.crt")
6161            .unwrap();
6162        config
6163            .load_priv_key_from_pem_file("examples/cert.key")
6164            .unwrap();
6165        config.set_application_protos(&[b"h3"]).unwrap();
6166        config.set_initial_max_data(70);
6167        config.set_initial_max_stream_data_bidi_local(150);
6168        config.set_initial_max_stream_data_bidi_remote(150);
6169        config.set_initial_max_stream_data_uni(150);
6170        config.set_initial_max_streams_bidi(100);
6171        config.set_initial_max_streams_uni(5);
6172        config.verify_peer(false);
6173
6174        let h3_config = Config::new().unwrap();
6175
6176        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6177        assert_eq!(s.pipe.handshake(), Ok(()));
6178
6179        s.client.control_stream_id = Some(
6180            s.client
6181                .open_uni_stream(
6182                    &mut s.pipe.client,
6183                    stream::HTTP3_CONTROL_STREAM_TYPE_ID,
6184                )
6185                .unwrap(),
6186        );
6187
6188        s.server.control_stream_id = Some(
6189            s.server
6190                .open_uni_stream(
6191                    &mut s.pipe.server,
6192                    stream::HTTP3_CONTROL_STREAM_TYPE_ID,
6193                )
6194                .unwrap(),
6195        );
6196
6197        let frame_payload_len = 2u64;
6198        let settings = [
6199            frame::SETTINGS_FRAME_TYPE_ID as u8,
6200            frame_payload_len as u8,
6201            0x2, // 0x2 is a reserved setting type
6202            1,
6203        ];
6204
6205        s.send_arbitrary_stream_data_client(
6206            &settings,
6207            s.client.control_stream_id.unwrap(),
6208            false,
6209        )
6210        .unwrap();
6211
6212        s.send_arbitrary_stream_data_server(
6213            &settings,
6214            s.server.control_stream_id.unwrap(),
6215            false,
6216        )
6217        .unwrap();
6218
6219        assert_eq!(s.pipe.advance(), Ok(()));
6220
6221        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::SettingsError));
6222
6223        assert_eq!(s.client.poll(&mut s.pipe.client), Err(Error::SettingsError));
6224    }
6225
6226    #[test]
6227    /// Tests that setting SETTINGS with prohibited values generates an error.
6228    fn set_prohibited_additional_settings() {
6229        let mut h3_config = Config::new().unwrap();
6230        assert_eq!(
6231            h3_config.set_additional_settings(vec![(
6232                frame::SETTINGS_QPACK_MAX_TABLE_CAPACITY,
6233                43
6234            )]),
6235            Err(Error::SettingsError)
6236        );
6237        assert_eq!(
6238            h3_config.set_additional_settings(vec![(
6239                frame::SETTINGS_MAX_FIELD_SECTION_SIZE,
6240                43
6241            )]),
6242            Err(Error::SettingsError)
6243        );
6244        assert_eq!(
6245            h3_config.set_additional_settings(vec![(
6246                frame::SETTINGS_QPACK_BLOCKED_STREAMS,
6247                43
6248            )]),
6249            Err(Error::SettingsError)
6250        );
6251        assert_eq!(
6252            h3_config.set_additional_settings(vec![(
6253                frame::SETTINGS_ENABLE_CONNECT_PROTOCOL,
6254                43
6255            )]),
6256            Err(Error::SettingsError)
6257        );
6258        assert_eq!(
6259            h3_config
6260                .set_additional_settings(vec![(frame::SETTINGS_H3_DATAGRAM, 43)]),
6261            Err(Error::SettingsError)
6262        );
6263    }
6264
6265    #[test]
6266    /// Tests additional settings are actually exchanged by the peers.
6267    fn set_additional_settings() {
6268        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6269        config
6270            .load_cert_chain_from_pem_file("examples/cert.crt")
6271            .unwrap();
6272        config
6273            .load_priv_key_from_pem_file("examples/cert.key")
6274            .unwrap();
6275        config.set_application_protos(&[b"h3"]).unwrap();
6276        config.set_initial_max_data(70);
6277        config.set_initial_max_stream_data_bidi_local(150);
6278        config.set_initial_max_stream_data_bidi_remote(150);
6279        config.set_initial_max_stream_data_uni(150);
6280        config.set_initial_max_streams_bidi(100);
6281        config.set_initial_max_streams_uni(5);
6282        config.verify_peer(false);
6283        config.grease(false);
6284
6285        let mut h3_config = Config::new().unwrap();
6286        h3_config
6287            .set_additional_settings(vec![(42, 43), (44, 45)])
6288            .unwrap();
6289
6290        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6291        assert_eq!(s.pipe.handshake(), Ok(()));
6292
6293        assert_eq!(s.pipe.advance(), Ok(()));
6294
6295        s.client.send_settings(&mut s.pipe.client).unwrap();
6296        assert_eq!(s.pipe.advance(), Ok(()));
6297        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::Done));
6298
6299        s.server.send_settings(&mut s.pipe.server).unwrap();
6300        assert_eq!(s.pipe.advance(), Ok(()));
6301        assert_eq!(s.client.poll(&mut s.pipe.client), Err(Error::Done));
6302
6303        assert_eq!(
6304            s.server.peer_settings_raw(),
6305            Some(&[(42, 43), (44, 45)][..])
6306        );
6307        assert_eq!(
6308            s.client.peer_settings_raw(),
6309            Some(&[(42, 43), (44, 45)][..])
6310        );
6311    }
6312
6313    #[test]
6314    /// Send a single DATAGRAM.
6315    fn single_dgram() {
6316        let mut buf = [0; 65535];
6317        let mut s = Session::new().unwrap();
6318        s.handshake().unwrap();
6319
6320        // We'll send default data of 10 bytes on flow ID 0.
6321        let result = (11, 0, 1);
6322
6323        s.send_dgram_client(0).unwrap();
6324
6325        assert_eq!(s.poll_server(), Err(Error::Done));
6326        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6327
6328        s.send_dgram_server(0).unwrap();
6329        assert_eq!(s.poll_client(), Err(Error::Done));
6330        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6331    }
6332
6333    #[test]
6334    /// Send multiple DATAGRAMs.
6335    fn multiple_dgram() {
6336        let mut buf = [0; 65535];
6337        let mut s = Session::new().unwrap();
6338        s.handshake().unwrap();
6339
6340        // We'll send default data of 10 bytes on flow ID 0.
6341        let result = (11, 0, 1);
6342
6343        s.send_dgram_client(0).unwrap();
6344        s.send_dgram_client(0).unwrap();
6345        s.send_dgram_client(0).unwrap();
6346
6347        assert_eq!(s.poll_server(), Err(Error::Done));
6348        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6349        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6350        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6351        assert_eq!(s.recv_dgram_server(&mut buf), Err(Error::Done));
6352
6353        s.send_dgram_server(0).unwrap();
6354        s.send_dgram_server(0).unwrap();
6355        s.send_dgram_server(0).unwrap();
6356
6357        assert_eq!(s.poll_client(), Err(Error::Done));
6358        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6359        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6360        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6361        assert_eq!(s.recv_dgram_client(&mut buf), Err(Error::Done));
6362    }
6363
6364    #[test]
6365    /// Send more DATAGRAMs than the send queue allows.
6366    fn multiple_dgram_overflow() {
6367        let mut buf = [0; 65535];
6368        let mut s = Session::new().unwrap();
6369        s.handshake().unwrap();
6370
6371        // We'll send default data of 10 bytes on flow ID 0.
6372        let result = (11, 0, 1);
6373
6374        // Five DATAGRAMs
6375        s.send_dgram_client(0).unwrap();
6376        s.send_dgram_client(0).unwrap();
6377        s.send_dgram_client(0).unwrap();
6378        s.send_dgram_client(0).unwrap();
6379        s.send_dgram_client(0).unwrap();
6380
6381        // Only 3 independent DATAGRAMs to read events will fire.
6382        assert_eq!(s.poll_server(), Err(Error::Done));
6383        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6384        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6385        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6386        assert_eq!(s.recv_dgram_server(&mut buf), Err(Error::Done));
6387    }
6388
6389    #[test]
6390    /// Send a single DATAGRAM and request.
6391    fn poll_datagram_cycling_no_read() {
6392        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6393        config
6394            .load_cert_chain_from_pem_file("examples/cert.crt")
6395            .unwrap();
6396        config
6397            .load_priv_key_from_pem_file("examples/cert.key")
6398            .unwrap();
6399        config.set_application_protos(&[b"h3"]).unwrap();
6400        config.set_initial_max_data(1500);
6401        config.set_initial_max_stream_data_bidi_local(150);
6402        config.set_initial_max_stream_data_bidi_remote(150);
6403        config.set_initial_max_stream_data_uni(150);
6404        config.set_initial_max_streams_bidi(100);
6405        config.set_initial_max_streams_uni(5);
6406        config.verify_peer(false);
6407        config.enable_dgram(true, 100, 100);
6408
6409        let h3_config = Config::new().unwrap();
6410        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6411        s.handshake().unwrap();
6412
6413        // Send request followed by DATAGRAM on client side.
6414        let (stream, req) = s.send_request(false).unwrap();
6415
6416        s.send_body_client(stream, true).unwrap();
6417
6418        let ev_headers = Event::Headers {
6419            list: req,
6420            more_frames: true,
6421        };
6422
6423        s.send_dgram_client(0).unwrap();
6424
6425        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6426        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6427
6428        assert_eq!(s.poll_server(), Err(Error::Done));
6429    }
6430
6431    #[test]
6432    /// Send a single DATAGRAM and request.
6433    fn poll_datagram_single_read() {
6434        let mut buf = [0; 65535];
6435
6436        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6437        config
6438            .load_cert_chain_from_pem_file("examples/cert.crt")
6439            .unwrap();
6440        config
6441            .load_priv_key_from_pem_file("examples/cert.key")
6442            .unwrap();
6443        config.set_application_protos(&[b"h3"]).unwrap();
6444        config.set_initial_max_data(1500);
6445        config.set_initial_max_stream_data_bidi_local(150);
6446        config.set_initial_max_stream_data_bidi_remote(150);
6447        config.set_initial_max_stream_data_uni(150);
6448        config.set_initial_max_streams_bidi(100);
6449        config.set_initial_max_streams_uni(5);
6450        config.verify_peer(false);
6451        config.enable_dgram(true, 100, 100);
6452
6453        let h3_config = Config::new().unwrap();
6454        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6455        s.handshake().unwrap();
6456
6457        // We'll send default data of 10 bytes on flow ID 0.
6458        let result = (11, 0, 1);
6459
6460        // Send request followed by DATAGRAM on client side.
6461        let (stream, req) = s.send_request(false).unwrap();
6462
6463        let body = s.send_body_client(stream, true).unwrap();
6464
6465        let mut recv_buf = vec![0; body.len()];
6466
6467        let ev_headers = Event::Headers {
6468            list: req,
6469            more_frames: true,
6470        };
6471
6472        s.send_dgram_client(0).unwrap();
6473
6474        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6475        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6476
6477        assert_eq!(s.poll_server(), Err(Error::Done));
6478
6479        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6480
6481        assert_eq!(s.poll_server(), Err(Error::Done));
6482
6483        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
6484        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6485        assert_eq!(s.poll_server(), Err(Error::Done));
6486
6487        // Send response followed by DATAGRAM on server side
6488        let resp = s.send_response(stream, false).unwrap();
6489
6490        let body = s.send_body_server(stream, true).unwrap();
6491
6492        let mut recv_buf = vec![0; body.len()];
6493
6494        let ev_headers = Event::Headers {
6495            list: resp,
6496            more_frames: true,
6497        };
6498
6499        s.send_dgram_server(0).unwrap();
6500
6501        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
6502        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
6503
6504        assert_eq!(s.poll_client(), Err(Error::Done));
6505
6506        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6507
6508        assert_eq!(s.poll_client(), Err(Error::Done));
6509
6510        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
6511
6512        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
6513        assert_eq!(s.poll_client(), Err(Error::Done));
6514    }
6515
6516    #[test]
6517    /// Send multiple DATAGRAMs and requests.
6518    fn poll_datagram_multi_read() {
6519        let mut buf = [0; 65535];
6520
6521        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6522        config
6523            .load_cert_chain_from_pem_file("examples/cert.crt")
6524            .unwrap();
6525        config
6526            .load_priv_key_from_pem_file("examples/cert.key")
6527            .unwrap();
6528        config.set_application_protos(&[b"h3"]).unwrap();
6529        config.set_initial_max_data(1500);
6530        config.set_initial_max_stream_data_bidi_local(150);
6531        config.set_initial_max_stream_data_bidi_remote(150);
6532        config.set_initial_max_stream_data_uni(150);
6533        config.set_initial_max_streams_bidi(100);
6534        config.set_initial_max_streams_uni(5);
6535        config.verify_peer(false);
6536        config.enable_dgram(true, 100, 100);
6537
6538        let h3_config = Config::new().unwrap();
6539        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6540        s.handshake().unwrap();
6541
6542        // 10 bytes on flow ID 0 and 2.
6543        let flow_0_result = (11, 0, 1);
6544        let flow_2_result = (11, 2, 1);
6545
6546        // Send requests followed by DATAGRAMs on client side.
6547        let (stream, req) = s.send_request(false).unwrap();
6548
6549        let body = s.send_body_client(stream, true).unwrap();
6550
6551        let mut recv_buf = vec![0; body.len()];
6552
6553        let ev_headers = Event::Headers {
6554            list: req,
6555            more_frames: true,
6556        };
6557
6558        s.send_dgram_client(0).unwrap();
6559        s.send_dgram_client(0).unwrap();
6560        s.send_dgram_client(0).unwrap();
6561        s.send_dgram_client(0).unwrap();
6562        s.send_dgram_client(0).unwrap();
6563        s.send_dgram_client(2).unwrap();
6564        s.send_dgram_client(2).unwrap();
6565        s.send_dgram_client(2).unwrap();
6566        s.send_dgram_client(2).unwrap();
6567        s.send_dgram_client(2).unwrap();
6568
6569        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6570        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6571
6572        assert_eq!(s.poll_server(), Err(Error::Done));
6573
6574        // Second cycle, start to read
6575        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6576        assert_eq!(s.poll_server(), Err(Error::Done));
6577        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6578        assert_eq!(s.poll_server(), Err(Error::Done));
6579        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6580        assert_eq!(s.poll_server(), Err(Error::Done));
6581
6582        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
6583        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6584
6585        assert_eq!(s.poll_server(), Err(Error::Done));
6586
6587        // Third cycle.
6588        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6589        assert_eq!(s.poll_server(), Err(Error::Done));
6590        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6591        assert_eq!(s.poll_server(), Err(Error::Done));
6592        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6593        assert_eq!(s.poll_server(), Err(Error::Done));
6594        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6595        assert_eq!(s.poll_server(), Err(Error::Done));
6596        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6597        assert_eq!(s.poll_server(), Err(Error::Done));
6598        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6599        assert_eq!(s.poll_server(), Err(Error::Done));
6600        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6601        assert_eq!(s.poll_server(), Err(Error::Done));
6602
6603        // Send response followed by DATAGRAM on server side
6604        let resp = s.send_response(stream, false).unwrap();
6605
6606        let body = s.send_body_server(stream, true).unwrap();
6607
6608        let mut recv_buf = vec![0; body.len()];
6609
6610        let ev_headers = Event::Headers {
6611            list: resp,
6612            more_frames: true,
6613        };
6614
6615        s.send_dgram_server(0).unwrap();
6616        s.send_dgram_server(0).unwrap();
6617        s.send_dgram_server(0).unwrap();
6618        s.send_dgram_server(0).unwrap();
6619        s.send_dgram_server(0).unwrap();
6620        s.send_dgram_server(2).unwrap();
6621        s.send_dgram_server(2).unwrap();
6622        s.send_dgram_server(2).unwrap();
6623        s.send_dgram_server(2).unwrap();
6624        s.send_dgram_server(2).unwrap();
6625
6626        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
6627        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
6628
6629        assert_eq!(s.poll_client(), Err(Error::Done));
6630
6631        // Second cycle, start to read
6632        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
6633        assert_eq!(s.poll_client(), Err(Error::Done));
6634        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
6635        assert_eq!(s.poll_client(), Err(Error::Done));
6636        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
6637        assert_eq!(s.poll_client(), Err(Error::Done));
6638
6639        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
6640        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
6641
6642        assert_eq!(s.poll_client(), Err(Error::Done));
6643
6644        // Third cycle.
6645        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
6646        assert_eq!(s.poll_client(), Err(Error::Done));
6647        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
6648        assert_eq!(s.poll_client(), Err(Error::Done));
6649        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
6650        assert_eq!(s.poll_client(), Err(Error::Done));
6651        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
6652        assert_eq!(s.poll_client(), Err(Error::Done));
6653        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
6654        assert_eq!(s.poll_client(), Err(Error::Done));
6655        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
6656        assert_eq!(s.poll_client(), Err(Error::Done));
6657        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
6658        assert_eq!(s.poll_client(), Err(Error::Done));
6659    }
6660
6661    #[test]
6662    /// Tests that the Finished event is not issued for streams of unknown type
6663    /// (e.g. GREASE).
6664    fn finished_is_for_requests() {
6665        let mut s = Session::new().unwrap();
6666        s.handshake().unwrap();
6667
6668        assert_eq!(s.poll_client(), Err(Error::Done));
6669        assert_eq!(s.poll_server(), Err(Error::Done));
6670
6671        assert_eq!(s.client.open_grease_stream(&mut s.pipe.client), Ok(()));
6672        assert_eq!(s.pipe.advance(), Ok(()));
6673
6674        assert_eq!(s.poll_client(), Err(Error::Done));
6675        assert_eq!(s.poll_server(), Err(Error::Done));
6676    }
6677
6678    #[test]
6679    /// Tests that streams are marked as finished only once.
6680    fn finished_once() {
6681        let mut s = Session::new().unwrap();
6682        s.handshake().unwrap();
6683
6684        let (stream, req) = s.send_request(false).unwrap();
6685        let body = s.send_body_client(stream, true).unwrap();
6686
6687        let mut recv_buf = vec![0; body.len()];
6688
6689        let ev_headers = Event::Headers {
6690            list: req,
6691            more_frames: true,
6692        };
6693
6694        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6695        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6696
6697        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
6698        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6699
6700        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Err(Error::Done));
6701        assert_eq!(s.poll_server(), Err(Error::Done));
6702    }
6703
6704    #[test]
6705    /// Tests that the Data event is properly re-armed.
6706    fn data_event_rearm() {
6707        let bytes = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
6708
6709        let mut s = Session::new().unwrap();
6710        s.handshake().unwrap();
6711
6712        let (r1_id, r1_hdrs) = s.send_request(false).unwrap();
6713
6714        let mut recv_buf = vec![0; bytes.len()];
6715
6716        let r1_ev_headers = Event::Headers {
6717            list: r1_hdrs,
6718            more_frames: true,
6719        };
6720
6721        // Manually send an incomplete DATA frame (i.e. the frame size is longer
6722        // than the actual data sent).
6723        {
6724            let mut d = [42; 10];
6725            let mut b = octets::OctetsMut::with_slice(&mut d);
6726
6727            b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
6728            b.put_varint(bytes.len() as u64).unwrap();
6729            let off = b.off();
6730            s.pipe.client.stream_send(r1_id, &d[..off], false).unwrap();
6731
6732            assert_eq!(
6733                s.pipe.client.stream_send(r1_id, &bytes[..5], false),
6734                Ok(5)
6735            );
6736
6737            s.advance().ok();
6738        }
6739
6740        assert_eq!(s.poll_server(), Ok((r1_id, r1_ev_headers)));
6741        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
6742        assert_eq!(s.poll_server(), Err(Error::Done));
6743
6744        // Read the available body data.
6745        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(5));
6746
6747        // Send the remaining DATA payload.
6748        assert_eq!(s.pipe.client.stream_send(r1_id, &bytes[5..], false), Ok(5));
6749        s.advance().ok();
6750
6751        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
6752        assert_eq!(s.poll_server(), Err(Error::Done));
6753
6754        // Read the rest of the body data.
6755        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(5));
6756        assert_eq!(s.poll_server(), Err(Error::Done));
6757
6758        // Send more data.
6759        let r1_body = s.send_body_client(r1_id, false).unwrap();
6760
6761        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
6762        assert_eq!(s.poll_server(), Err(Error::Done));
6763
6764        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(r1_body.len()));
6765
6766        // Send a new request to ensure cross-stream events don't break rearming.
6767        let (r2_id, r2_hdrs) = s.send_request(false).unwrap();
6768        let r2_ev_headers = Event::Headers {
6769            list: r2_hdrs,
6770            more_frames: true,
6771        };
6772        let r2_body = s.send_body_client(r2_id, false).unwrap();
6773
6774        s.advance().ok();
6775
6776        assert_eq!(s.poll_server(), Ok((r2_id, r2_ev_headers)));
6777        assert_eq!(s.poll_server(), Ok((r2_id, Event::Data)));
6778        assert_eq!(s.recv_body_server(r2_id, &mut recv_buf), Ok(r2_body.len()));
6779        assert_eq!(s.poll_server(), Err(Error::Done));
6780
6781        // Send more data on request 1, then trailing HEADERS.
6782        let r1_body = s.send_body_client(r1_id, false).unwrap();
6783
6784        let trailers = vec![Header::new(b"hello", b"world")];
6785
6786        s.client
6787            .send_headers(&mut s.pipe.client, r1_id, &trailers, true)
6788            .unwrap();
6789
6790        let r1_ev_trailers = Event::Headers {
6791            list: trailers.clone(),
6792            more_frames: false,
6793        };
6794
6795        s.advance().ok();
6796
6797        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
6798        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(r1_body.len()));
6799
6800        assert_eq!(s.poll_server(), Ok((r1_id, r1_ev_trailers)));
6801        assert_eq!(s.poll_server(), Ok((r1_id, Event::Finished)));
6802        assert_eq!(s.poll_server(), Err(Error::Done));
6803
6804        // Send more data on request 2, then trailing HEADERS.
6805        let r2_body = s.send_body_client(r2_id, false).unwrap();
6806
6807        s.client
6808            .send_headers(&mut s.pipe.client, r2_id, &trailers, false)
6809            .unwrap();
6810
6811        let r2_ev_trailers = Event::Headers {
6812            list: trailers,
6813            more_frames: true,
6814        };
6815
6816        s.advance().ok();
6817
6818        assert_eq!(s.poll_server(), Ok((r2_id, Event::Data)));
6819        assert_eq!(s.recv_body_server(r2_id, &mut recv_buf), Ok(r2_body.len()));
6820        assert_eq!(s.poll_server(), Ok((r2_id, r2_ev_trailers)));
6821        assert_eq!(s.poll_server(), Err(Error::Done));
6822
6823        let (r3_id, r3_hdrs) = s.send_request(false).unwrap();
6824
6825        let r3_ev_headers = Event::Headers {
6826            list: r3_hdrs,
6827            more_frames: true,
6828        };
6829
6830        // Manually send an incomplete DATA frame (i.e. only the header is sent).
6831        {
6832            let mut d = [42; 10];
6833            let mut b = octets::OctetsMut::with_slice(&mut d);
6834
6835            b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
6836            b.put_varint(bytes.len() as u64).unwrap();
6837            let off = b.off();
6838            s.pipe.client.stream_send(r3_id, &d[..off], false).unwrap();
6839
6840            s.advance().ok();
6841        }
6842
6843        assert_eq!(s.poll_server(), Ok((r3_id, r3_ev_headers)));
6844        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
6845        assert_eq!(s.poll_server(), Err(Error::Done));
6846
6847        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Err(Error::Done));
6848
6849        assert_eq!(s.pipe.client.stream_send(r3_id, &bytes[..5], false), Ok(5));
6850
6851        s.advance().ok();
6852
6853        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
6854        assert_eq!(s.poll_server(), Err(Error::Done));
6855
6856        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Ok(5));
6857
6858        assert_eq!(s.pipe.client.stream_send(r3_id, &bytes[5..], false), Ok(5));
6859        s.advance().ok();
6860
6861        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
6862        assert_eq!(s.poll_server(), Err(Error::Done));
6863
6864        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Ok(5));
6865
6866        // Buffer multiple data frames.
6867        let body = s.send_body_client(r3_id, false).unwrap();
6868        s.send_body_client(r3_id, false).unwrap();
6869        s.send_body_client(r3_id, false).unwrap();
6870
6871        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
6872        assert_eq!(s.poll_server(), Err(Error::Done));
6873
6874        {
6875            let mut d = [42; 10];
6876            let mut b = octets::OctetsMut::with_slice(&mut d);
6877
6878            b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
6879            b.put_varint(0).unwrap();
6880            let off = b.off();
6881            s.pipe.client.stream_send(r3_id, &d[..off], true).unwrap();
6882
6883            s.advance().ok();
6884        }
6885
6886        let mut recv_buf = vec![0; bytes.len() * 3];
6887
6888        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Ok(body.len() * 3));
6889    }
6890
6891    #[test]
6892    /// Tests that the Datagram event is properly re-armed.
6893    fn dgram_event_rearm() {
6894        let mut buf = [0; 65535];
6895
6896        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6897        config
6898            .load_cert_chain_from_pem_file("examples/cert.crt")
6899            .unwrap();
6900        config
6901            .load_priv_key_from_pem_file("examples/cert.key")
6902            .unwrap();
6903        config.set_application_protos(&[b"h3"]).unwrap();
6904        config.set_initial_max_data(1500);
6905        config.set_initial_max_stream_data_bidi_local(150);
6906        config.set_initial_max_stream_data_bidi_remote(150);
6907        config.set_initial_max_stream_data_uni(150);
6908        config.set_initial_max_streams_bidi(100);
6909        config.set_initial_max_streams_uni(5);
6910        config.verify_peer(false);
6911        config.enable_dgram(true, 100, 100);
6912
6913        let h3_config = Config::new().unwrap();
6914        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6915        s.handshake().unwrap();
6916
6917        // 10 bytes on flow ID 0 and 2.
6918        let flow_0_result = (11, 0, 1);
6919        let flow_2_result = (11, 2, 1);
6920
6921        // Send requests followed by DATAGRAMs on client side.
6922        let (stream, req) = s.send_request(false).unwrap();
6923
6924        let body = s.send_body_client(stream, true).unwrap();
6925
6926        let mut recv_buf = vec![0; body.len()];
6927
6928        let ev_headers = Event::Headers {
6929            list: req,
6930            more_frames: true,
6931        };
6932
6933        s.send_dgram_client(0).unwrap();
6934        s.send_dgram_client(0).unwrap();
6935        s.send_dgram_client(2).unwrap();
6936        s.send_dgram_client(2).unwrap();
6937
6938        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6939        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6940
6941        assert_eq!(s.poll_server(), Err(Error::Done));
6942        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6943
6944        assert_eq!(s.poll_server(), Err(Error::Done));
6945        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6946
6947        assert_eq!(s.poll_server(), Err(Error::Done));
6948        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6949
6950        assert_eq!(s.poll_server(), Err(Error::Done));
6951        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6952
6953        assert_eq!(s.poll_server(), Err(Error::Done));
6954
6955        s.send_dgram_client(0).unwrap();
6956        s.send_dgram_client(2).unwrap();
6957
6958        assert_eq!(s.poll_server(), Err(Error::Done));
6959
6960        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6961        assert_eq!(s.poll_server(), Err(Error::Done));
6962
6963        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6964        assert_eq!(s.poll_server(), Err(Error::Done));
6965
6966        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
6967        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6968    }
6969
6970    #[test]
6971    fn reset_stream() {
6972        let mut buf = [0; 65535];
6973
6974        let mut s = Session::new().unwrap();
6975        s.handshake().unwrap();
6976
6977        // Client sends request.
6978        let (stream, req) = s.send_request(false).unwrap();
6979
6980        let ev_headers = Event::Headers {
6981            list: req,
6982            more_frames: true,
6983        };
6984
6985        // Server sends response and closes stream.
6986        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6987        assert_eq!(s.poll_server(), Err(Error::Done));
6988
6989        let resp = s.send_response(stream, true).unwrap();
6990
6991        let ev_headers = Event::Headers {
6992            list: resp,
6993            more_frames: false,
6994        };
6995
6996        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
6997        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
6998        assert_eq!(s.poll_client(), Err(Error::Done));
6999
7000        // Client sends RESET_STREAM, closing stream.
7001        let frames = [crate::frame::Frame::ResetStream {
7002            stream_id: stream,
7003            error_code: 42,
7004            final_size: 68,
7005        }];
7006
7007        let pkt_type = crate::packet::Type::Short;
7008        assert_eq!(
7009            s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
7010            Ok(39)
7011        );
7012
7013        // Server issues Reset event for the stream.
7014        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(42))));
7015        assert_eq!(s.poll_server(), Err(Error::Done));
7016
7017        // Sending RESET_STREAM again shouldn't trigger another Reset event.
7018        assert_eq!(
7019            s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
7020            Ok(39)
7021        );
7022
7023        assert_eq!(s.poll_server(), Err(Error::Done));
7024    }
7025
7026    #[test]
7027    fn reset_finished_at_server() {
7028        let mut s = Session::new().unwrap();
7029        s.handshake().unwrap();
7030
7031        // Client sends HEADERS and doesn't fin
7032        let (stream, _req) = s.send_request(false).unwrap();
7033
7034        // ..then Client sends RESET_STREAM
7035        assert_eq!(
7036            s.pipe.client.stream_shutdown(0, crate::Shutdown::Write, 0),
7037            Ok(())
7038        );
7039
7040        assert_eq!(s.pipe.advance(), Ok(()));
7041
7042        // Server receives just a reset
7043        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(0))));
7044        assert_eq!(s.poll_server(), Err(Error::Done));
7045
7046        // Client sends HEADERS and fin
7047        let (stream, req) = s.send_request(true).unwrap();
7048
7049        // ..then Client sends RESET_STREAM
7050        assert_eq!(
7051            s.pipe.client.stream_shutdown(4, crate::Shutdown::Write, 0),
7052            Ok(())
7053        );
7054
7055        let ev_headers = Event::Headers {
7056            list: req,
7057            more_frames: false,
7058        };
7059
7060        // Server receives headers and fin.
7061        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7062        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
7063        assert_eq!(s.poll_server(), Err(Error::Done));
7064    }
7065
7066    #[test]
7067    fn reset_finished_at_server_with_data_pending() {
7068        let mut s = Session::new().unwrap();
7069        s.handshake().unwrap();
7070
7071        // Client sends HEADERS and doesn't fin.
7072        let (stream, req) = s.send_request(false).unwrap();
7073
7074        assert!(s.send_body_client(stream, false).is_ok());
7075
7076        assert_eq!(s.pipe.advance(), Ok(()));
7077
7078        let ev_headers = Event::Headers {
7079            list: req,
7080            more_frames: true,
7081        };
7082
7083        // Server receives headers and data...
7084        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7085        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
7086
7087        // ..then Client sends RESET_STREAM.
7088        assert_eq!(
7089            s.pipe
7090                .client
7091                .stream_shutdown(stream, crate::Shutdown::Write, 0),
7092            Ok(())
7093        );
7094
7095        assert_eq!(s.pipe.advance(), Ok(()));
7096
7097        // Server receives the reset and there are no more readable streams.
7098        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(0))));
7099        assert_eq!(s.poll_server(), Err(Error::Done));
7100        assert_eq!(s.pipe.server.readable().len(), 0);
7101    }
7102
7103    #[test]
7104    fn reset_finished_at_client() {
7105        let mut buf = [0; 65535];
7106        let mut s = Session::new().unwrap();
7107        s.handshake().unwrap();
7108
7109        // Client sends HEADERS and doesn't fin
7110        let (stream, req) = s.send_request(false).unwrap();
7111
7112        let ev_headers = Event::Headers {
7113            list: req,
7114            more_frames: true,
7115        };
7116
7117        // Server receives headers.
7118        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7119        assert_eq!(s.poll_server(), Err(Error::Done));
7120
7121        // Server sends response and doesn't fin
7122        s.send_response(stream, false).unwrap();
7123
7124        assert_eq!(s.pipe.advance(), Ok(()));
7125
7126        // .. then Server sends RESET_STREAM
7127        assert_eq!(
7128            s.pipe
7129                .server
7130                .stream_shutdown(stream, crate::Shutdown::Write, 0),
7131            Ok(())
7132        );
7133
7134        assert_eq!(s.pipe.advance(), Ok(()));
7135
7136        // Client receives Reset only
7137        assert_eq!(s.poll_client(), Ok((stream, Event::Reset(0))));
7138        assert_eq!(s.poll_server(), Err(Error::Done));
7139
7140        // Client sends headers and fin.
7141        let (stream, req) = s.send_request(true).unwrap();
7142
7143        let ev_headers = Event::Headers {
7144            list: req,
7145            more_frames: false,
7146        };
7147
7148        // Server receives headers and fin.
7149        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7150        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
7151        assert_eq!(s.poll_server(), Err(Error::Done));
7152
7153        // Server sends response and fin
7154        let resp = s.send_response(stream, true).unwrap();
7155
7156        assert_eq!(s.pipe.advance(), Ok(()));
7157
7158        // ..then Server sends RESET_STREAM
7159        let frames = [crate::frame::Frame::ResetStream {
7160            stream_id: stream,
7161            error_code: 42,
7162            final_size: 68,
7163        }];
7164
7165        let pkt_type = crate::packet::Type::Short;
7166        assert_eq!(
7167            s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
7168            Ok(39)
7169        );
7170
7171        assert_eq!(s.pipe.advance(), Ok(()));
7172
7173        let ev_headers = Event::Headers {
7174            list: resp,
7175            more_frames: false,
7176        };
7177
7178        // Client receives headers and fin.
7179        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
7180        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
7181        assert_eq!(s.poll_client(), Err(Error::Done));
7182    }
7183}
7184
7185#[cfg(feature = "ffi")]
7186mod ffi;
7187#[cfg(feature = "internal")]
7188#[doc(hidden)]
7189pub mod frame;
7190#[cfg(not(feature = "internal"))]
7191mod frame;
7192#[doc(hidden)]
7193pub mod qpack;
7194mod stream;