quiche/h3/
mod.rs

1// Copyright (C) 2019, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! HTTP/3 wire protocol and QPACK implementation.
28//!
29//! This module provides a high level API for sending and receiving HTTP/3
30//! requests and responses on top of the QUIC transport protocol.
31//!
32//! ## Connection setup
33//!
34//! HTTP/3 connections require a QUIC transport-layer connection, see
35//! [Connection setup] for a full description of the setup process.
36//!
37//! To use HTTP/3, the QUIC connection must be configured with a suitable
38//! Application Layer Protocol Negotiation (ALPN) Protocol ID:
39//!
40//! ```
41//! let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
42//! config.set_application_protos(quiche::h3::APPLICATION_PROTOCOL)?;
43//! # Ok::<(), quiche::Error>(())
44//! ```
45//!
46//! The QUIC handshake is driven by [sending] and [receiving] QUIC packets.
47//!
48//! Once the handshake has completed, the first step in establishing an HTTP/3
49//! connection is creating its configuration object:
50//!
51//! ```
52//! let h3_config = quiche::h3::Config::new()?;
53//! # Ok::<(), quiche::h3::Error>(())
54//! ```
55//!
56//! HTTP/3 client and server connections are both created using the
57//! [`with_transport()`] function, the role is inferred from the type of QUIC
58//! connection:
59//!
60//! ```no_run
61//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
62//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
63//! # let peer = "127.0.0.1:1234".parse().unwrap();
64//! # let local = "127.0.0.1:4321".parse().unwrap();
65//! # let mut conn = quiche::accept(&scid, None, local, peer, &mut config).unwrap();
66//! # let h3_config = quiche::h3::Config::new()?;
67//! let h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
68//! # Ok::<(), quiche::h3::Error>(())
69//! ```
70//!
71//! ## Sending a request
72//!
73//! An HTTP/3 client can send a request by using the connection's
74//! [`send_request()`] method to queue request headers; [sending] QUIC packets
75//! causes the requests to get sent to the peer:
76//!
77//! ```no_run
78//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
79//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
80//! # let peer = "127.0.0.1:1234".parse().unwrap();
81//! # let local = "127.0.0.1:4321".parse().unwrap();
82//! # let mut conn = quiche::connect(None, &scid, local, peer, &mut config).unwrap();
83//! # let h3_config = quiche::h3::Config::new()?;
84//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
85//! let req = vec![
86//!     quiche::h3::Header::new(b":method", b"GET"),
87//!     quiche::h3::Header::new(b":scheme", b"https"),
88//!     quiche::h3::Header::new(b":authority", b"quic.tech"),
89//!     quiche::h3::Header::new(b":path", b"/"),
90//!     quiche::h3::Header::new(b"user-agent", b"quiche"),
91//! ];
92//!
93//! h3_conn.send_request(&mut conn, &req, true)?;
94//! # Ok::<(), quiche::h3::Error>(())
95//! ```
96//!
97//! An HTTP/3 client can send a request with additional body data by using
98//! the connection's [`send_body()`] method:
99//!
100//! ```no_run
101//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
102//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
103//! # let peer = "127.0.0.1:1234".parse().unwrap();
104//! # let local = "127.0.0.1:4321".parse().unwrap();
105//! # let mut conn = quiche::connect(None, &scid, local, peer, &mut config).unwrap();
106//! # let h3_config = quiche::h3::Config::new()?;
107//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
108//! let req = vec![
109//!     quiche::h3::Header::new(b":method", b"GET"),
110//!     quiche::h3::Header::new(b":scheme", b"https"),
111//!     quiche::h3::Header::new(b":authority", b"quic.tech"),
112//!     quiche::h3::Header::new(b":path", b"/"),
113//!     quiche::h3::Header::new(b"user-agent", b"quiche"),
114//! ];
115//!
116//! let stream_id = h3_conn.send_request(&mut conn, &req, false)?;
117//! h3_conn.send_body(&mut conn, stream_id, b"Hello World!", true)?;
118//! # Ok::<(), quiche::h3::Error>(())
119//! ```
120//!
121//! ## Handling requests and responses
122//!
123//! After [receiving] QUIC packets, HTTP/3 data is processed using the
124//! connection's [`poll()`] method. On success, this returns an [`Event`] object
125//! and an ID corresponding to the stream where the `Event` originated.
126//!
127//! An HTTP/3 server uses [`poll()`] to read requests and responds to them using
128//! [`send_response()`] and [`send_body()`]:
129//!
130//! ```no_run
131//! use quiche::h3::NameValue;
132//!
133//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
134//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
135//! # let peer = "127.0.0.1:1234".parse().unwrap();
136//! # let local = "127.0.0.1:1234".parse().unwrap();
137//! # let mut conn = quiche::accept(&scid, None, local, peer, &mut config).unwrap();
138//! # let h3_config = quiche::h3::Config::new()?;
139//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
140//! loop {
141//!     match h3_conn.poll(&mut conn) {
142//!         Ok((stream_id, quiche::h3::Event::Headers{list, more_frames})) => {
143//!             let mut headers = list.into_iter();
144//!
145//!             // Look for the request's method.
146//!             let method = headers.find(|h| h.name() == b":method").unwrap();
147//!
148//!             // Look for the request's path.
149//!             let path = headers.find(|h| h.name() == b":path").unwrap();
150//!
151//!             if method.value() == b"GET" && path.value() == b"/" {
152//!                 let resp = vec![
153//!                     quiche::h3::Header::new(b":status", 200.to_string().as_bytes()),
154//!                     quiche::h3::Header::new(b"server", b"quiche"),
155//!                 ];
156//!
157//!                 h3_conn.send_response(&mut conn, stream_id, &resp, false)?;
158//!                 h3_conn.send_body(&mut conn, stream_id, b"Hello World!", true)?;
159//!             }
160//!         },
161//!
162//!         Ok((stream_id, quiche::h3::Event::Data)) => {
163//!             // Request body data, handle it.
164//!             # return Ok(());
165//!         },
166//!
167//!         Ok((stream_id, quiche::h3::Event::Finished)) => {
168//!             // Peer terminated stream, handle it.
169//!         },
170//!
171//!         Ok((stream_id, quiche::h3::Event::Reset(err))) => {
172//!             // Peer reset the stream, handle it.
173//!         },
174//!
175//!         Ok((_flow_id, quiche::h3::Event::PriorityUpdate)) => (),
176//!
177//!         Ok((goaway_id, quiche::h3::Event::GoAway)) => {
178//!              // Peer signalled it is going away, handle it.
179//!         },
180//!
181//!         Err(quiche::h3::Error::Done) => {
182//!             // Done reading.
183//!             break;
184//!         },
185//!
186//!         Err(e) => {
187//!             // An error occurred, handle it.
188//!             break;
189//!         },
190//!     }
191//! }
192//! # Ok::<(), quiche::h3::Error>(())
193//! ```
194//!
195//! An HTTP/3 client uses [`poll()`] to read responses:
196//!
197//! ```no_run
198//! use quiche::h3::NameValue;
199//!
200//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
201//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
202//! # let peer = "127.0.0.1:1234".parse().unwrap();
203//! # let local = "127.0.0.1:1234".parse().unwrap();
204//! # let mut conn = quiche::connect(None, &scid, local, peer, &mut config).unwrap();
205//! # let h3_config = quiche::h3::Config::new()?;
206//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
207//! loop {
208//!     match h3_conn.poll(&mut conn) {
209//!         Ok((stream_id, quiche::h3::Event::Headers{list, more_frames})) => {
210//!             let status = list.iter().find(|h| h.name() == b":status").unwrap();
211//!             println!("Received {} response on stream {}",
212//!                      std::str::from_utf8(status.value()).unwrap(),
213//!                      stream_id);
214//!         },
215//!
216//!         Ok((stream_id, quiche::h3::Event::Data)) => {
217//!             let mut body = vec![0; 4096];
218//!
219//!             // Consume all body data received on the stream.
220//!             while let Ok(read) =
221//!                 h3_conn.recv_body(&mut conn, stream_id, &mut body)
222//!             {
223//!                 println!("Received {} bytes of payload on stream {}",
224//!                          read, stream_id);
225//!             }
226//!         },
227//!
228//!         Ok((stream_id, quiche::h3::Event::Finished)) => {
229//!             // Peer terminated stream, handle it.
230//!         },
231//!
232//!         Ok((stream_id, quiche::h3::Event::Reset(err))) => {
233//!             // Peer reset the stream, handle it.
234//!         },
235//!
236//!         Ok((_prioritized_element_id, quiche::h3::Event::PriorityUpdate)) => (),
237//!
238//!         Ok((goaway_id, quiche::h3::Event::GoAway)) => {
239//!              // Peer signalled it is going away, handle it.
240//!         },
241//!
242//!         Err(quiche::h3::Error::Done) => {
243//!             // Done reading.
244//!             break;
245//!         },
246//!
247//!         Err(e) => {
248//!             // An error occurred, handle it.
249//!             break;
250//!         },
251//!     }
252//! }
253//! # Ok::<(), quiche::h3::Error>(())
254//! ```
255//!
256//! ## Detecting end of request or response
257//!
258//! A single HTTP/3 request or response may consist of several HEADERS and DATA
259//! frames; it is finished when the QUIC stream is closed. Calling [`poll()`]
260//! repeatedly will generate an [`Event`] for each of these. The application may
//! use these events to do additional HTTP semantic validation.
262//!
263//! ## HTTP/3 protocol errors
264//!
265//! Quiche is responsible for managing the HTTP/3 connection, ensuring it is in
266//! a correct state and validating all messages received by a peer. This mainly
267//! takes place in the [`poll()`] method. If an HTTP/3 error occurs, quiche will
268//! close the connection and send an appropriate CONNECTION_CLOSE frame to the
269//! peer. An [`Error`] is returned to the application so that it can perform any
270//! required tidy up such as closing sockets.
271//!
272//! [`application_proto()`]: ../struct.Connection.html#method.application_proto
273//! [`stream_finished()`]: ../struct.Connection.html#method.stream_finished
274//! [Connection setup]: ../index.html#connection-setup
275//! [sending]: ../index.html#generating-outgoing-packets
276//! [receiving]: ../index.html#handling-incoming-packets
277//! [`with_transport()`]: struct.Connection.html#method.with_transport
278//! [`poll()`]: struct.Connection.html#method.poll
279//! [`Event`]: enum.Event.html
280//! [`Error`]: enum.Error.html
//! [`send_request()`]: struct.Connection.html#method.send_request
282//! [`send_response()`]: struct.Connection.html#method.send_response
283//! [`send_body()`]: struct.Connection.html#method.send_body
284
285use std::collections::HashSet;
286use std::collections::VecDeque;
287
288#[cfg(feature = "sfv")]
289use std::convert::TryFrom;
290use std::fmt;
291use std::fmt::Write;
292
293#[cfg(feature = "qlog")]
294use qlog::events::h3::H3FrameCreated;
295#[cfg(feature = "qlog")]
296use qlog::events::h3::H3FrameParsed;
297#[cfg(feature = "qlog")]
298use qlog::events::h3::H3Owner;
299#[cfg(feature = "qlog")]
300use qlog::events::h3::H3PriorityTargetStreamType;
301#[cfg(feature = "qlog")]
302use qlog::events::h3::H3StreamType;
303#[cfg(feature = "qlog")]
304use qlog::events::h3::H3StreamTypeSet;
305#[cfg(feature = "qlog")]
306use qlog::events::h3::Http3EventType;
307#[cfg(feature = "qlog")]
308use qlog::events::h3::Http3Frame;
309#[cfg(feature = "qlog")]
310use qlog::events::EventData;
311#[cfg(feature = "qlog")]
312use qlog::events::EventImportance;
313#[cfg(feature = "qlog")]
314use qlog::events::EventType;
315
316use crate::range_buf::BufFactory;
317use crate::BufSplit;
318
/// List of ALPN tokens of supported HTTP/3 versions.
///
/// This can be passed directly to the [`Config::set_application_protos()`]
/// method when implementing HTTP/3 applications. The list holds a single
/// token, `h3`.
///
/// [`Config::set_application_protos()`]:
/// ../struct.Config.html#method.set_application_protos
pub const APPLICATION_PROTOCOL: &[&[u8]] = &[b"h3"];
327
// The offset used when converting HTTP/3 urgency to quiche urgency.
const PRIORITY_URGENCY_OFFSET: u8 = 124;

// Parameter values as specified in [Extensible Priorities].
//
// Urgency is bounded to the inclusive range 0 through 7, where the upper
// bound is the lowest priority. The two `*_DEFAULT` values are the ones used
// by `Priority::default()`.
//
// [Extensible Priorities]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4
const PRIORITY_URGENCY_LOWER_BOUND: u8 = 0;
const PRIORITY_URGENCY_UPPER_BOUND: u8 = 7;
const PRIORITY_URGENCY_DEFAULT: u8 = 3;
const PRIORITY_INCREMENTAL_DEFAULT: bool = false;
338
// Shorthand constants for the qlog HTTP/3 event types (frame created, frame
// parsed and stream type set). They are only compiled when the `qlog`
// feature is enabled.
#[cfg(feature = "qlog")]
const QLOG_FRAME_CREATED: EventType =
    EventType::Http3EventType(Http3EventType::FrameCreated);
#[cfg(feature = "qlog")]
const QLOG_FRAME_PARSED: EventType =
    EventType::Http3EventType(Http3EventType::FrameParsed);
#[cfg(feature = "qlog")]
const QLOG_STREAM_TYPE_SET: EventType =
    EventType::Http3EventType(Http3EventType::StreamTypeSet);
348
/// A specialized [`Result`] type for quiche HTTP/3 operations.
///
/// This type is used throughout quiche's HTTP/3 public API for any operation
/// that can produce an error. The error type is always this module's
/// [`Error`].
///
/// [`Result`]: https://doc.rust-lang.org/std/result/enum.Result.html
/// [`Error`]: enum.Error.html
pub type Result<T> = std::result::Result<T, Error>;
356
/// An HTTP/3 error.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Error {
    /// There is no error or no work to do.
    Done,

    /// The provided buffer is too short.
    BufferTooShort,

    /// Internal error in the HTTP/3 stack.
    InternalError,

    /// Endpoint detected that the peer is exhibiting behavior that causes
    /// excessive load.
    ExcessiveLoad,

    /// A stream ID or push ID was used incorrectly, such as exceeding a
    /// limit, reducing a limit, or being reused.
    IdError,

    /// The endpoint detected that its peer created a stream that it will not
    /// accept.
    StreamCreationError,

    /// A required critical stream was closed.
    ClosedCriticalStream,

    /// No SETTINGS frame at beginning of control stream.
    MissingSettings,

    /// A frame was received which is not permitted in the current state.
    FrameUnexpected,

    /// Frame violated layout or size rules.
    FrameError,

    /// QPACK Header block decompression failure.
    QpackDecompressionFailed,

    /// Error originated from the transport layer.
    TransportError(crate::Error),

    /// The underlying QUIC stream (or connection) doesn't have enough capacity
    /// for the operation to complete. The application should retry later on.
    StreamBlocked,

    /// Error in the payload of a SETTINGS frame.
    SettingsError,

    /// Server rejected request.
    RequestRejected,

    /// Request or its response cancelled.
    RequestCancelled,

    /// Client's request stream terminated without containing a fully formed
    /// request.
    RequestIncomplete,

    /// An HTTP message was malformed and cannot be processed.
    MessageError,

    /// The TCP connection established in response to a CONNECT request was
    /// reset or abnormally closed.
    ConnectError,

    /// The requested operation cannot be served over HTTP/3. Peer should retry
    /// over HTTP/1.1.
    VersionFallback,
}
428
/// HTTP/3 error codes sent on the wire.
///
/// As defined in [RFC9114](https://www.rfc-editor.org/rfc/rfc9114.html#http-error-codes).
///
/// Each variant's discriminant is the numeric value of the corresponding
/// error code.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum WireErrorCode {
    /// No error. This is used when the connection or stream needs to be closed,
    /// but there is no error to signal.
    NoError              = 0x100,
    /// Peer violated protocol requirements in a way that does not match a more
    /// specific error code or endpoint declines to use the more specific
    /// error code.
    GeneralProtocolError = 0x101,
    /// An internal error has occurred in the HTTP stack.
    InternalError        = 0x102,
    /// The endpoint detected that its peer created a stream that it will not
    /// accept.
    StreamCreationError  = 0x103,
    /// A stream required by the HTTP/3 connection was closed or reset.
    ClosedCriticalStream = 0x104,
    /// A frame was received that was not permitted in the current state or on
    /// the current stream.
    FrameUnexpected      = 0x105,
    /// A frame that fails to satisfy layout requirements or with an invalid
    /// size was received.
    FrameError           = 0x106,
    /// The endpoint detected that its peer is exhibiting a behavior that might
    /// be generating excessive load.
    ExcessiveLoad        = 0x107,
    /// A stream ID or push ID was used incorrectly, such as exceeding a limit,
    /// reducing a limit, or being reused.
    IdError              = 0x108,
    /// An endpoint detected an error in the payload of a SETTINGS frame.
    SettingsError        = 0x109,
    /// No SETTINGS frame was received at the beginning of the control stream.
    MissingSettings      = 0x10a,
    /// A server rejected a request without performing any application
    /// processing.
    RequestRejected      = 0x10b,
    /// The request or its response (including pushed response) is cancelled.
    RequestCancelled     = 0x10c,
    /// The client's stream terminated without containing a fully formed
    /// request.
    RequestIncomplete    = 0x10d,
    /// An HTTP message was malformed and cannot be processed.
    MessageError         = 0x10e,
    /// The TCP connection established in response to a CONNECT request was
    /// reset or abnormally closed.
    ConnectError         = 0x10f,
    /// The requested operation cannot be served over HTTP/3. The peer should
    /// retry over HTTP/1.1.
    VersionFallback      = 0x110,
}
481
482impl Error {
483    fn to_wire(self) -> u64 {
484        match self {
485            Error::Done => WireErrorCode::NoError as u64,
486            Error::InternalError => WireErrorCode::InternalError as u64,
487            Error::StreamCreationError =>
488                WireErrorCode::StreamCreationError as u64,
489            Error::ClosedCriticalStream =>
490                WireErrorCode::ClosedCriticalStream as u64,
491            Error::FrameUnexpected => WireErrorCode::FrameUnexpected as u64,
492            Error::FrameError => WireErrorCode::FrameError as u64,
493            Error::ExcessiveLoad => WireErrorCode::ExcessiveLoad as u64,
494            Error::IdError => WireErrorCode::IdError as u64,
495            Error::MissingSettings => WireErrorCode::MissingSettings as u64,
496            Error::QpackDecompressionFailed => 0x200,
497            Error::BufferTooShort => 0x999,
498            Error::TransportError { .. } | Error::StreamBlocked => 0xFF,
499            Error::SettingsError => WireErrorCode::SettingsError as u64,
500            Error::RequestRejected => WireErrorCode::RequestRejected as u64,
501            Error::RequestCancelled => WireErrorCode::RequestCancelled as u64,
502            Error::RequestIncomplete => WireErrorCode::RequestIncomplete as u64,
503            Error::MessageError => WireErrorCode::MessageError as u64,
504            Error::ConnectError => WireErrorCode::ConnectError as u64,
505            Error::VersionFallback => WireErrorCode::VersionFallback as u64,
506        }
507    }
508
509    #[cfg(feature = "ffi")]
510    fn to_c(self) -> libc::ssize_t {
511        match self {
512            Error::Done => -1,
513            Error::BufferTooShort => -2,
514            Error::InternalError => -3,
515            Error::ExcessiveLoad => -4,
516            Error::IdError => -5,
517            Error::StreamCreationError => -6,
518            Error::ClosedCriticalStream => -7,
519            Error::MissingSettings => -8,
520            Error::FrameUnexpected => -9,
521            Error::FrameError => -10,
522            Error::QpackDecompressionFailed => -11,
523            // -12 was previously used for TransportError, skip it
524            Error::StreamBlocked => -13,
525            Error::SettingsError => -14,
526            Error::RequestRejected => -15,
527            Error::RequestCancelled => -16,
528            Error::RequestIncomplete => -17,
529            Error::MessageError => -18,
530            Error::ConnectError => -19,
531            Error::VersionFallback => -20,
532
533            Error::TransportError(quic_error) => quic_error.to_c() - 1000,
534        }
535    }
536}
537
538impl fmt::Display for Error {
539    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
540        write!(f, "{self:?}")
541    }
542}
543
544impl std::error::Error for Error {
545    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
546        None
547    }
548}
549
550impl From<super::Error> for Error {
551    fn from(err: super::Error) -> Self {
552        match err {
553            super::Error::Done => Error::Done,
554
555            _ => Error::TransportError(err),
556        }
557    }
558}
559
560impl From<octets::BufferTooShortError> for Error {
561    fn from(_err: octets::BufferTooShortError) -> Self {
562        Error::BufferTooShort
563    }
564}
565
/// An HTTP/3 configuration.
///
/// Every field is optional; `Config::new()` leaves all of them unset and the
/// setter methods on `Config` fill them in.
pub struct Config {
    // Set by `set_max_field_section_size()`.
    max_field_section_size: Option<u64>,
    // Set by `set_qpack_max_table_capacity()`.
    qpack_max_table_capacity: Option<u64>,
    // Set by `set_qpack_blocked_streams()`.
    qpack_blocked_streams: Option<u64>,
    // `Some(1)` when extended CONNECT was enabled through
    // `enable_extended_connect()`, `None` otherwise.
    connect_protocol_enabled: Option<u64>,
    /// additional settings are settings that are not part of the H3
    /// settings explicitly handled above
    additional_settings: Option<Vec<(u64, u64)>>,
}
576
577impl Config {
578    /// Creates a new configuration object with default settings.
579    pub const fn new() -> Result<Config> {
580        Ok(Config {
581            max_field_section_size: None,
582            qpack_max_table_capacity: None,
583            qpack_blocked_streams: None,
584            connect_protocol_enabled: None,
585            additional_settings: None,
586        })
587    }
588
589    /// Sets the `SETTINGS_MAX_FIELD_SECTION_SIZE` setting.
590    ///
591    /// By default no limit is enforced. When a request whose headers exceed
592    /// the limit set by the application is received, the call to the [`poll()`]
593    /// method will return the [`Error::ExcessiveLoad`] error, and the
594    /// connection will be closed.
595    ///
596    /// [`poll()`]: struct.Connection.html#method.poll
597    /// [`Error::ExcessiveLoad`]: enum.Error.html#variant.ExcessiveLoad
598    pub fn set_max_field_section_size(&mut self, v: u64) {
599        self.max_field_section_size = Some(v);
600    }
601
602    /// Sets the `SETTINGS_QPACK_MAX_TABLE_CAPACITY` setting.
603    ///
604    /// The default value is `0`.
605    pub fn set_qpack_max_table_capacity(&mut self, v: u64) {
606        self.qpack_max_table_capacity = Some(v);
607    }
608
609    /// Sets the `SETTINGS_QPACK_BLOCKED_STREAMS` setting.
610    ///
611    /// The default value is `0`.
612    pub fn set_qpack_blocked_streams(&mut self, v: u64) {
613        self.qpack_blocked_streams = Some(v);
614    }
615
616    /// Sets or omits the `SETTINGS_ENABLE_CONNECT_PROTOCOL` setting.
617    ///
618    /// The default value is `false`.
619    pub fn enable_extended_connect(&mut self, enabled: bool) {
620        if enabled {
621            self.connect_protocol_enabled = Some(1);
622        } else {
623            self.connect_protocol_enabled = None;
624        }
625    }
626
627    /// Sets additional HTTP/3 settings.
628    ///
629    /// The default value is no additional settings.
630    /// The `additional_settings` parameter must not the following
631    /// settings as they are already handled by this library:
632    ///
633    /// - SETTINGS_QPACK_MAX_TABLE_CAPACITY
634    /// - SETTINGS_MAX_FIELD_SECTION_SIZE
635    /// - SETTINGS_QPACK_BLOCKED_STREAMS
636    /// - SETTINGS_ENABLE_CONNECT_PROTOCOL
637    /// - SETTINGS_H3_DATAGRAM
638    ///
639    /// If such a setting is present in the `additional_settings`,
640    /// the method will return the [`Error::SettingsError`] error.
641    ///
642    /// If a setting identifier is present twice in `additional_settings`,
643    /// the method will return the [`Error::SettingsError`] error.
644    ///
645    /// [`Error::SettingsError`]: enum.Error.html#variant.SettingsError
646    pub fn set_additional_settings(
647        &mut self, additional_settings: Vec<(u64, u64)>,
648    ) -> Result<()> {
649        let explicit_quiche_settings = HashSet::from([
650            frame::SETTINGS_QPACK_MAX_TABLE_CAPACITY,
651            frame::SETTINGS_MAX_FIELD_SECTION_SIZE,
652            frame::SETTINGS_QPACK_BLOCKED_STREAMS,
653            frame::SETTINGS_ENABLE_CONNECT_PROTOCOL,
654            frame::SETTINGS_H3_DATAGRAM,
655            frame::SETTINGS_H3_DATAGRAM_00,
656        ]);
657
658        let dedup_settings: HashSet<u64> =
659            additional_settings.iter().map(|(key, _)| *key).collect();
660
661        if dedup_settings.len() != additional_settings.len() ||
662            !explicit_quiche_settings.is_disjoint(&dedup_settings)
663        {
664            return Err(Error::SettingsError);
665        }
666        self.additional_settings = Some(additional_settings);
667        Ok(())
668    }
669}
670
/// A trait for types with associated string name and value.
///
/// Both the name and the value are exposed as raw byte slices.
pub trait NameValue {
    /// Returns the object's name.
    fn name(&self) -> &[u8];

    /// Returns the object's value.
    fn value(&self) -> &[u8];
}
679
680impl<N, V> NameValue for (N, V)
681where
682    N: AsRef<[u8]>,
683    V: AsRef<[u8]>,
684{
685    fn name(&self) -> &[u8] {
686        self.0.as_ref()
687    }
688
689    fn value(&self) -> &[u8] {
690        self.1.as_ref()
691    }
692}
693
/// An owned name-value pair representing a raw HTTP header.
///
/// The first element holds the name bytes and the second one the value bytes.
#[derive(Clone, PartialEq, Eq)]
pub struct Header(Vec<u8>, Vec<u8>);
697
/// Writes `hdr` to `f` in a human-readable form.
///
/// If `hdr` is valid UTF-8 it is written as text with
/// [`str::escape_default()`] escaping applied. Otherwise the `Debug`
/// representation of the byte slice is written instead.
///
/// Returns any error reported by the formatter.
fn try_print_as_readable(hdr: &[u8], f: &mut fmt::Formatter) -> fmt::Result {
    match std::str::from_utf8(hdr) {
        // `EscapeDefault` implements `Display`, so the escaped characters can
        // be streamed straight into the formatter without first collecting
        // them into a temporary `String`.
        Ok(s) => write!(f, "{}", s.escape_default()),
        Err(_) => write!(f, "{hdr:?}"),
    }
}
704
705impl fmt::Debug for Header {
706    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
707        f.write_char('"')?;
708        try_print_as_readable(&self.0, f)?;
709        f.write_str(": ")?;
710        try_print_as_readable(&self.1, f)?;
711        f.write_char('"')
712    }
713}
714
715impl Header {
716    /// Creates a new header.
717    ///
718    /// Both `name` and `value` will be cloned.
719    pub fn new(name: &[u8], value: &[u8]) -> Self {
720        Self(name.to_vec(), value.to_vec())
721    }
722}
723
724impl NameValue for Header {
725    fn name(&self) -> &[u8] {
726        &self.0
727    }
728
729    fn value(&self) -> &[u8] {
730        &self.1
731    }
732}
733
/// A non-owned name-value pair representing a raw HTTP header.
///
/// The first element borrows the name bytes and the second one the value
/// bytes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HeaderRef<'a>(&'a [u8], &'a [u8]);
737
738impl<'a> HeaderRef<'a> {
739    /// Creates a new header.
740    pub const fn new(name: &'a [u8], value: &'a [u8]) -> Self {
741        Self(name, value)
742    }
743}
744
745impl NameValue for HeaderRef<'_> {
746    fn name(&self) -> &[u8] {
747        self.0
748    }
749
750    fn value(&self) -> &[u8] {
751        self.1
752    }
753}
754
/// An HTTP/3 connection event.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// Request/response headers were received.
    Headers {
        /// The list of received header fields. The application should validate
        /// pseudo-headers and headers.
        list: Vec<Header>,

        /// Whether more frames will follow the headers on the stream.
        more_frames: bool,
    },

    /// Data was received.
    ///
    /// This indicates that the application can use the [`recv_body()`] method
    /// to retrieve the data from the stream.
    ///
    /// Note that [`recv_body()`] will need to be called repeatedly until the
    /// [`Done`] value is returned, as the event will not be re-armed until all
    /// buffered data is read.
    ///
    /// [`recv_body()`]: struct.Connection.html#method.recv_body
    /// [`Done`]: enum.Error.html#variant.Done
    Data,

    /// Stream was closed.
    Finished,

    /// Stream was reset.
    ///
    /// The associated data represents the error code sent by the peer.
    Reset(u64),

    /// PRIORITY_UPDATE was received.
    ///
    /// This indicates that the application can use the
    /// [`take_last_priority_update()`] method to take the last received
    /// PRIORITY_UPDATE for a specified stream.
    ///
    /// This event is triggered once per stream until the last PRIORITY_UPDATE
    /// is taken. It is recommended that applications defer taking the
    /// PRIORITY_UPDATE until after [`poll()`] returns [`Done`].
    ///
    /// [`take_last_priority_update()`]: struct.Connection.html#method.take_last_priority_update
    /// [`poll()`]: struct.Connection.html#method.poll
    /// [`Done`]: enum.Error.html#variant.Done
    PriorityUpdate,

    /// GOAWAY was received.
    GoAway,
}
807
/// Extensible Priorities parameters.
///
/// The `TryFrom` trait supports constructing this object from the serialized
/// Structured Fields Dictionary field value. I.e, use `TryFrom` to parse the
/// value of a Priority header field or a PRIORITY_UPDATE frame. Using this
/// trait requires the `sfv` feature to be enabled.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(C)]
pub struct Priority {
    // The urgency parameter. NOTE(review): `Priority::new()` stores whatever
    // value it is given, so this is not guaranteed to be within the 0 through
    // 7 range defined by Extensible Priorities.
    urgency: u8,
    // The incremental parameter.
    incremental: bool,
}
820
821impl Default for Priority {
822    fn default() -> Self {
823        Priority {
824            urgency: PRIORITY_URGENCY_DEFAULT,
825            incremental: PRIORITY_INCREMENTAL_DEFAULT,
826        }
827    }
828}
829
830impl Priority {
831    /// Creates a new Priority.
832    pub const fn new(urgency: u8, incremental: bool) -> Self {
833        Priority {
834            urgency,
835            incremental,
836        }
837    }
838}
839
#[cfg(feature = "sfv")]
#[cfg_attr(docsrs, doc(cfg(feature = "sfv")))]
impl TryFrom<&[u8]> for Priority {
    type Error = Error;

    /// Try to parse an Extensible Priority field value.
    ///
    /// The field value is expected to be a Structured Fields Dictionary; see
    /// [Extensible Priorities].
    ///
    /// Parsing rules:
    ///
    /// * If the value is not a valid Dictionary, `Error::Done` is returned.
    /// * `u`: an Integer item is used as the urgency; values outside of the
    ///   valid range (0 through 7) are clamped to 7. Any other type yields
    ///   `Error::Done`. When omitted, the default urgency is used.
    /// * `i`: a Boolean item is used as the incremental flag; an item of any
    ///   other type yields `Error::Done`. When `i` is omitted, or is not a
    ///   plain item (i.e. an inner list), `false` is used.
    ///
    /// [Extensible Priorities]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4
    fn try_from(value: &[u8]) -> std::result::Result<Self, Self::Error> {
        let dict =
            sfv::Parser::parse_dictionary(value).map_err(|_| Error::Done)?;

        let urgency = match dict.get("u") {
            // Omitted so use default value.
            None => PRIORITY_URGENCY_DEFAULT,

            // An inner list is never a valid urgency.
            Some(sfv::ListEntry::InnerList(_)) => return Err(Error::Done),

            // The parameter must be an Item of type Integer.
            Some(sfv::ListEntry::Item(item)) => {
                let raw = item.bare_item.as_int().ok_or(Error::Done)?;

                let valid_range = PRIORITY_URGENCY_LOWER_BOUND as i64..=
                    PRIORITY_URGENCY_UPPER_BOUND as i64;

                if valid_range.contains(&raw) {
                    raw as u8
                } else {
                    // Out of the spec's allowed range: fall back to the
                    // upper bound (lowest priority) to avoid interference
                    // with other streams.
                    PRIORITY_URGENCY_UPPER_BOUND
                }
            },
        };

        let incremental = match dict.get("i") {
            Some(sfv::ListEntry::Item(item)) => match item.bare_item.as_bool() {
                Some(flag) => flag,

                None => return Err(Error::Done),
            },

            // Omitted (or not a plain item) so use default value.
            _ => false,
        };

        Ok(Priority::new(urgency, incremental))
    }
}
905
/// HTTP/3 settings tracked for one side of the connection.
///
/// One instance holds the local values (populated from the `Config` when the
/// connection is created) and another holds the peer's values (all unset when
/// the connection is created). Every field is `None` while no value is known.
struct ConnectionSettings {
    /// Maximum field section size setting.
    pub max_field_section_size: Option<u64>,

    /// QPACK maximum dynamic table capacity setting.
    pub qpack_max_table_capacity: Option<u64>,

    /// QPACK blocked streams setting.
    pub qpack_blocked_streams: Option<u64>,

    /// Extended CONNECT protocol setting.
    pub connect_protocol_enabled: Option<u64>,

    /// HTTP/3 datagram setting; locally this is `Some(1)` when datagrams are
    /// enabled on the transport.
    pub h3_datagram: Option<u64>,

    /// Extra (identifier, value) pairs that have no dedicated field above.
    pub additional_settings: Option<Vec<(u64, u64)>>,

    /// Raw list of (identifier, value) pairs.
    // NOTE(review): presumably the settings exactly as they appeared on the
    // wire -- confirm against the code that fills this in.
    pub raw: Option<Vec<(u64, u64)>>,
}
915
/// Bookkeeping for the QPACK encoder and decoder streams of one endpoint.
///
/// The default value has no stream IDs assigned and zeroed byte counters.
#[derive(Default)]
struct QpackStreams {
    /// ID of the QPACK encoder stream, once known.
    pub encoder_stream_id: Option<u64>,

    /// Byte counter associated with the QPACK encoder stream.
    pub encoder_stream_bytes: u64,

    /// ID of the QPACK decoder stream, once known.
    pub decoder_stream_id: Option<u64>,

    /// Byte counter associated with the QPACK decoder stream.
    pub decoder_stream_bytes: u64,
}
923
/// Statistics about the connection.
///
/// A connection's statistics can be collected using the [`stats()`] method.
///
/// The type is a plain value made of counters, so it can be cloned, printed
/// with `{:?}` for diagnostics, and default-constructed (all counters at
/// zero).
///
/// [`stats()`]: struct.Connection.html#method.stats
#[derive(Clone, Debug, Default)]
pub struct Stats {
    /// The number of bytes received on the QPACK encoder stream.
    pub qpack_encoder_stream_recv_bytes: u64,
    /// The number of bytes received on the QPACK decoder stream.
    pub qpack_decoder_stream_recv_bytes: u64,
}
936
937fn close_conn_critical_stream<F: BufFactory>(
938    conn: &mut super::Connection<F>,
939) -> Result<()> {
940    conn.close(
941        true,
942        Error::ClosedCriticalStream.to_wire(),
943        b"Critical stream closed.",
944    )?;
945
946    Err(Error::ClosedCriticalStream)
947}
948
949fn close_conn_if_critical_stream_finished<F: BufFactory>(
950    conn: &mut super::Connection<F>, stream_id: u64,
951) -> Result<()> {
952    if conn.stream_finished(stream_id) {
953        close_conn_critical_stream(conn)?;
954    }
955
956    Ok(())
957}
958
959/// An HTTP/3 connection.
960pub struct Connection {
961    is_server: bool,
962
963    next_request_stream_id: u64,
964    next_uni_stream_id: u64,
965
966    streams: crate::stream::StreamIdHashMap<stream::Stream>,
967
968    local_settings: ConnectionSettings,
969    peer_settings: ConnectionSettings,
970
971    control_stream_id: Option<u64>,
972    peer_control_stream_id: Option<u64>,
973
974    qpack_encoder: qpack::Encoder,
975    qpack_decoder: qpack::Decoder,
976
977    local_qpack_streams: QpackStreams,
978    peer_qpack_streams: QpackStreams,
979
980    max_push_id: u64,
981
982    finished_streams: VecDeque<u64>,
983
984    frames_greased: bool,
985
986    local_goaway_id: Option<u64>,
987    peer_goaway_id: Option<u64>,
988}
989
990impl Connection {
991    fn new(
992        config: &Config, is_server: bool, enable_dgram: bool,
993    ) -> Result<Connection> {
994        let initial_uni_stream_id = if is_server { 0x3 } else { 0x2 };
995        let h3_datagram = if enable_dgram { Some(1) } else { None };
996
997        Ok(Connection {
998            is_server,
999
1000            next_request_stream_id: 0,
1001
1002            next_uni_stream_id: initial_uni_stream_id,
1003
1004            streams: Default::default(),
1005
1006            local_settings: ConnectionSettings {
1007                max_field_section_size: config.max_field_section_size,
1008                qpack_max_table_capacity: config.qpack_max_table_capacity,
1009                qpack_blocked_streams: config.qpack_blocked_streams,
1010                connect_protocol_enabled: config.connect_protocol_enabled,
1011                h3_datagram,
1012                additional_settings: config.additional_settings.clone(),
1013                raw: Default::default(),
1014            },
1015
1016            peer_settings: ConnectionSettings {
1017                max_field_section_size: None,
1018                qpack_max_table_capacity: None,
1019                qpack_blocked_streams: None,
1020                h3_datagram: None,
1021                connect_protocol_enabled: None,
1022                additional_settings: Default::default(),
1023                raw: Default::default(),
1024            },
1025
1026            control_stream_id: None,
1027            peer_control_stream_id: None,
1028
1029            qpack_encoder: qpack::Encoder::new(),
1030            qpack_decoder: qpack::Decoder::new(),
1031
1032            local_qpack_streams: Default::default(),
1033            peer_qpack_streams: Default::default(),
1034
1035            max_push_id: 0,
1036
1037            finished_streams: VecDeque::new(),
1038
1039            frames_greased: false,
1040
1041            local_goaway_id: None,
1042            peer_goaway_id: None,
1043        })
1044    }
1045
1046    /// Creates a new HTTP/3 connection using the provided QUIC connection.
1047    ///
1048    /// This will also initiate the HTTP/3 handshake with the peer by opening
1049    /// all control streams (including QPACK) and sending the local settings.
1050    ///
1051    /// On success the new connection is returned.
1052    ///
1053    /// The [`StreamLimit`] error is returned when the HTTP/3 control stream
1054    /// cannot be created due to stream limits.
1055    ///
1056    /// The [`InternalError`] error is returned when either the underlying QUIC
1057    /// connection is not in a suitable state, or the HTTP/3 control stream
1058    /// cannot be created due to flow control limits.
1059    ///
1060    /// [`StreamLimit`]: ../enum.Error.html#variant.StreamLimit
1061    /// [`InternalError`]: ../enum.Error.html#variant.InternalError
1062    pub fn with_transport<F: BufFactory>(
1063        conn: &mut super::Connection<F>, config: &Config,
1064    ) -> Result<Connection> {
1065        let is_client = !conn.is_server;
1066        if is_client && !(conn.is_established() || conn.is_in_early_data()) {
1067            trace!("{} QUIC connection must be established or in early data before creating an HTTP/3 connection", conn.trace_id());
1068            return Err(Error::InternalError);
1069        }
1070
1071        let mut http3_conn =
1072            Connection::new(config, conn.is_server, conn.dgram_enabled())?;
1073
1074        match http3_conn.send_settings(conn) {
1075            Ok(_) => (),
1076
1077            Err(e) => {
1078                conn.close(true, e.to_wire(), b"Error opening control stream")?;
1079                return Err(e);
1080            },
1081        };
1082
1083        // Try opening QPACK streams, but ignore errors if it fails since we
1084        // don't need them right now.
1085        http3_conn.open_qpack_encoder_stream(conn).ok();
1086        http3_conn.open_qpack_decoder_stream(conn).ok();
1087
1088        if conn.grease {
1089            // Try opening a GREASE stream, but ignore errors since it's not
1090            // critical.
1091            http3_conn.open_grease_stream(conn).ok();
1092        }
1093
1094        Ok(http3_conn)
1095    }
1096
1097    /// Sends an HTTP/3 request.
1098    ///
1099    /// The request is encoded from the provided list of headers without a
1100    /// body, and sent on a newly allocated stream. To include a body,
1101    /// set `fin` as `false` and subsequently call [`send_body()`] with the
1102    /// same `conn` and the `stream_id` returned from this method.
1103    ///
1104    /// On success the newly allocated stream ID is returned.
1105    ///
1106    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1107    /// doesn't have enough capacity for the operation to complete. When this
1108    /// happens the application should retry the operation once the stream is
1109    /// reported as writable again.
1110    ///
1111    /// [`send_body()`]: struct.Connection.html#method.send_body
1112    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1113    pub fn send_request<T: NameValue, F: BufFactory>(
1114        &mut self, conn: &mut super::Connection<F>, headers: &[T], fin: bool,
1115    ) -> Result<u64> {
1116        // If we received a GOAWAY from the peer, MUST NOT initiate new
1117        // requests.
1118        if self.peer_goaway_id.is_some() {
1119            return Err(Error::FrameUnexpected);
1120        }
1121
1122        let stream_id = self.next_request_stream_id;
1123
1124        self.streams
1125            .insert(stream_id, <stream::Stream>::new(stream_id, true));
1126
1127        // The underlying QUIC stream does not exist yet, so calls to e.g.
1128        // stream_capacity() will fail. By writing a 0-length buffer, we force
1129        // the creation of the QUIC stream state, without actually writing
1130        // anything.
1131        if let Err(e) = conn.stream_send(stream_id, b"", false) {
1132            self.streams.remove(&stream_id);
1133
1134            if e == super::Error::Done {
1135                return Err(Error::StreamBlocked);
1136            }
1137
1138            return Err(e.into());
1139        };
1140
1141        self.send_headers(conn, stream_id, headers, fin)?;
1142
1143        // To avoid skipping stream IDs, we only calculate the next available
1144        // stream ID when a request has been successfully buffered.
1145        self.next_request_stream_id = self
1146            .next_request_stream_id
1147            .checked_add(4)
1148            .ok_or(Error::IdError)?;
1149
1150        Ok(stream_id)
1151    }
1152
1153    /// Sends an HTTP/3 response on the specified stream with default priority.
1154    ///
1155    /// This method sends the provided `headers` as a single initial response
1156    /// without a body.
1157    ///
1158    /// To send a non-final 1xx, then a final 200+ without body:
1159    ///   * send_response() with `fin` set to `false`.
1160    ///   * [`send_additional_headers()`] with fin set to `true` using the same
1161    ///     `stream_id` value.
1162    ///
1163    /// To send a non-final 1xx, then a final 200+ with body:
1164    ///   * send_response() with `fin` set to `false`.
1165    ///   * [`send_additional_headers()`] with fin set to `false` and same
1166    ///     `stream_id` value.
1167    ///   * [`send_body()`] with same `stream_id`.
1168    ///
1169    /// To send a final 200+ with body:
1170    ///   * send_response() with `fin` set to `false`.
1171    ///   * [`send_body()`] with same `stream_id`.
1172    ///
1173    /// Additional headers can only be sent during certain phases of an HTTP/3
1174    /// message exchange, see [Section 4.1 of RFC 9114]. The [`FrameUnexpected`]
1175    /// error is returned if this method, or [`send_response_with_priority()`],
1176    /// are called multiple times with the same `stream_id` value.
1177    ///
1178    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1179    /// doesn't have enough capacity for the operation to complete. When this
1180    /// happens the application should retry the operation once the stream is
1181    /// reported as writable again.
1182    ///
1183    /// [`send_body()`]: struct.Connection.html#method.send_body
1184    /// [`send_additional_headers()`]:
1185    ///     struct.Connection.html#method.send_additional_headers
1186    /// [`send_response_with_priority()`]:
1187    ///     struct.Connection.html#method.send_response_with_priority
1188    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
1189    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1190    pub fn send_response<T: NameValue, F: BufFactory>(
1191        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1192        headers: &[T], fin: bool,
1193    ) -> Result<()> {
1194        let priority = Default::default();
1195
1196        self.send_response_with_priority(
1197            conn, stream_id, headers, &priority, fin,
1198        )?;
1199
1200        Ok(())
1201    }
1202
1203    /// Sends an HTTP/3 response on the specified stream with specified
1204    /// priority.
1205    ///
1206    /// This method sends the provided `headers` as a single initial response
1207    /// without a body.
1208    ///
1209    /// To send a non-final 1xx, then a final 200+ without body:
1210    ///   * send_response_with_priority() with `fin` set to `false`.
1211    ///   * [`send_additional_headers()`] with fin set to `true` using the same
1212    ///     `stream_id` value.
1213    ///
1214    /// To send a non-final 1xx, then a final 200+ with body:
1215    ///   * send_response_with_priority() with `fin` set to `false`.
1216    ///   * [`send_additional_headers()`] with fin set to `false` and same
1217    ///     `stream_id` value.
1218    ///   * [`send_body()`] with same `stream_id`.
1219    ///
1220    /// To send a final 200+ with body:
1221    ///   * send_response_with_priority() with `fin` set to `false`.
1222    ///   * [`send_body()`] with same `stream_id`.
1223    ///
1224    /// The `priority` parameter represents [Extensible Priority]
1225    /// parameters. If the urgency is outside the range 0-7, it will be clamped
1226    /// to 7.
1227    ///
1228    /// Additional headers can only be sent during certain phases of an HTTP/3
1229    /// message exchange, see [Section 4.1 of RFC 9114]. The [`FrameUnexpected`]
1230    /// error is returned if this method, or [`send_response()`],
1231    /// are called multiple times with the same `stream_id` value.
1232    ///
1233    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1234    /// doesn't have enough capacity for the operation to complete. When this
1235    /// happens the application should retry the operation once the stream is
1236    /// reported as writable again.
1237    ///
1238    /// [`send_body()`]: struct.Connection.html#method.send_body
1239    /// [`send_additional_headers()`]:
1240    ///     struct.Connection.html#method.send_additional_headers
1241    /// [`send_response()`]:
1242    ///     struct.Connection.html#method.send_response
1243    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
1244    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1245    /// [Extensible Priority]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
1246    pub fn send_response_with_priority<T: NameValue, F: BufFactory>(
1247        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1248        headers: &[T], priority: &Priority, fin: bool,
1249    ) -> Result<()> {
1250        match self.streams.get(&stream_id) {
1251            Some(s) => {
1252                // Only one initial HEADERS allowed.
1253                if s.local_initialized() {
1254                    return Err(Error::FrameUnexpected);
1255                }
1256
1257                s
1258            },
1259
1260            None => return Err(Error::FrameUnexpected),
1261        };
1262
1263        self.send_headers(conn, stream_id, headers, fin)?;
1264
1265        // Clamp and shift urgency into quiche-priority space
1266        let urgency = priority
1267            .urgency
1268            .clamp(PRIORITY_URGENCY_LOWER_BOUND, PRIORITY_URGENCY_UPPER_BOUND) +
1269            PRIORITY_URGENCY_OFFSET;
1270
1271        conn.stream_priority(stream_id, urgency, priority.incremental)?;
1272
1273        Ok(())
1274    }
1275
1276    /// Sends additional HTTP/3 headers.
1277    ///
1278    /// After the initial request or response headers have been sent, using
1279    /// [`send_request()`] or [`send_response()`] respectively, this method can
1280    /// be used send an additional HEADERS frame. For example, to send a single
1281    /// instance of trailers after a request with a body, or to issue another
1282    /// non-final 1xx after a preceding 1xx, or to issue a final response after
1283    /// a preceding 1xx.
1284    ///
1285    /// Additional headers can only be sent during certain phases of an HTTP/3
1286    /// message exchange, see [Section 4.1 of RFC 9114]. The [`FrameUnexpected`]
1287    /// error is returned when this method is called during the wrong phase,
1288    /// such as before initial headers have been sent, or if trailers have
1289    /// already been sent.
1290    ///
1291    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1292    /// doesn't have enough capacity for the operation to complete. When this
1293    /// happens the application should retry the operation once the stream is
1294    /// reported as writable again.
1295    ///
1296    /// [`send_request()`]: struct.Connection.html#method.send_request
1297    /// [`send_response()`]: struct.Connection.html#method.send_response
1298    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1299    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
1300    /// [Section 4.1 of RFC 9114]:
1301    ///     https://www.rfc-editor.org/rfc/rfc9114.html#section-4.1.
1302    pub fn send_additional_headers<T: NameValue, F: BufFactory>(
1303        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1304        headers: &[T], is_trailer_section: bool, fin: bool,
1305    ) -> Result<()> {
1306        // Clients can only send trailer headers.
1307        if !self.is_server && !is_trailer_section {
1308            return Err(Error::FrameUnexpected);
1309        }
1310
1311        match self.streams.get(&stream_id) {
1312            Some(s) => {
1313                // Initial HEADERS must have been sent.
1314                if !s.local_initialized() {
1315                    return Err(Error::FrameUnexpected);
1316                }
1317
1318                // Only one trailing HEADERS allowed.
1319                if s.trailers_sent() {
1320                    return Err(Error::FrameUnexpected);
1321                }
1322
1323                s
1324            },
1325
1326            None => return Err(Error::FrameUnexpected),
1327        };
1328
1329        self.send_headers(conn, stream_id, headers, fin)?;
1330
1331        if is_trailer_section {
1332            // send_headers() might have tidied the stream away, so we need to
1333            // check again.
1334            if let Some(s) = self.streams.get_mut(&stream_id) {
1335                s.mark_trailers_sent();
1336            }
1337        }
1338
1339        Ok(())
1340    }
1341
1342    /// Sends additional HTTP/3 headers with specified priority.
1343    ///
1344    /// After the initial request or response headers have been sent, using
1345    /// [`send_request()`] or [`send_response()`] respectively, this method can
1346    /// be used send an additional HEADERS frame. For example, to send a single
1347    /// instance of trailers after a request with a body, or to issue another
1348    /// non-final 1xx after a preceding 1xx, or to issue a final response after
1349    /// a preceding 1xx.
1350    ///
1351    /// The `priority` parameter represents [Extensible Priority]
1352    /// parameters. If the urgency is outside the range 0-7, it will be clamped
1353    /// to 7.
1354    ///
1355    /// Additional headers can only be sent during certain phases of an HTTP/3
1356    /// message exchange, see [Section 4.1 of RFC 9114]. The [`FrameUnexpected`]
1357    /// error is returned when this method is called during the wrong phase,
1358    /// such as before initial headers have been sent, or if trailers have
1359    /// already been sent.
1360    ///
1361    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1362    /// doesn't have enough capacity for the operation to complete. When this
1363    /// happens the application should retry the operation once the stream is
1364    /// reported as writable again.
1365    ///
1366    /// [`send_request()`]: struct.Connection.html#method.send_request
1367    /// [`send_response()`]: struct.Connection.html#method.send_response
1368    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1369    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
1370    /// [Section 4.1 of RFC 9114]:
1371    ///     https://www.rfc-editor.org/rfc/rfc9114.html#section-4.1.
1372    /// [Extensible Priority]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
1373    pub fn send_additional_headers_with_priority<T: NameValue, F: BufFactory>(
1374        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1375        headers: &[T], priority: &Priority, is_trailer_section: bool, fin: bool,
1376    ) -> Result<()> {
1377        self.send_additional_headers(
1378            conn,
1379            stream_id,
1380            headers,
1381            is_trailer_section,
1382            fin,
1383        )?;
1384
1385        // Clamp and shift urgency into quiche-priority space
1386        let urgency = priority
1387            .urgency
1388            .clamp(PRIORITY_URGENCY_LOWER_BOUND, PRIORITY_URGENCY_UPPER_BOUND) +
1389            PRIORITY_URGENCY_OFFSET;
1390
1391        conn.stream_priority(stream_id, urgency, priority.incremental)?;
1392
1393        Ok(())
1394    }
1395
1396    fn encode_header_block<T: NameValue>(
1397        &mut self, headers: &[T],
1398    ) -> Result<Vec<u8>> {
1399        let headers_len = headers
1400            .iter()
1401            .fold(0, |acc, h| acc + h.value().len() + h.name().len() + 32);
1402
1403        let mut header_block = vec![0; headers_len];
1404        let len = self
1405            .qpack_encoder
1406            .encode(headers, &mut header_block)
1407            .map_err(|_| Error::InternalError)?;
1408
1409        header_block.truncate(len);
1410
1411        Ok(header_block)
1412    }
1413
1414    fn send_headers<T: NameValue, F: BufFactory>(
1415        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1416        headers: &[T], fin: bool,
1417    ) -> Result<()> {
1418        let mut d = [42; 10];
1419        let mut b = octets::OctetsMut::with_slice(&mut d);
1420
1421        if !self.frames_greased && conn.grease {
1422            self.send_grease_frames(conn, stream_id)?;
1423            self.frames_greased = true;
1424        }
1425
1426        let header_block = self.encode_header_block(headers)?;
1427
1428        let overhead = octets::varint_len(frame::HEADERS_FRAME_TYPE_ID) +
1429            octets::varint_len(header_block.len() as u64);
1430
1431        // Headers need to be sent atomically, so make sure the stream has
1432        // enough capacity.
1433        match conn.stream_writable(stream_id, overhead + header_block.len()) {
1434            Ok(true) => (),
1435
1436            Ok(false) => return Err(Error::StreamBlocked),
1437
1438            Err(e) => {
1439                if conn.stream_finished(stream_id) {
1440                    self.streams.remove(&stream_id);
1441                }
1442
1443                return Err(e.into());
1444            },
1445        };
1446
1447        b.put_varint(frame::HEADERS_FRAME_TYPE_ID)?;
1448        b.put_varint(header_block.len() as u64)?;
1449        let off = b.off();
1450        conn.stream_send(stream_id, &d[..off], false)?;
1451
1452        // Sending header block separately avoids unnecessary copy.
1453        conn.stream_send(stream_id, &header_block, fin)?;
1454
1455        trace!(
1456            "{} tx frm HEADERS stream={} len={} fin={}",
1457            conn.trace_id(),
1458            stream_id,
1459            header_block.len(),
1460            fin
1461        );
1462
1463        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1464            let qlog_headers = headers
1465                .iter()
1466                .map(|h| qlog::events::h3::HttpHeader {
1467                    name: String::from_utf8_lossy(h.name()).into_owned(),
1468                    value: String::from_utf8_lossy(h.value()).into_owned(),
1469                })
1470                .collect();
1471
1472            let frame = Http3Frame::Headers {
1473                headers: qlog_headers,
1474            };
1475            let ev_data = EventData::H3FrameCreated(H3FrameCreated {
1476                stream_id,
1477                length: Some(header_block.len() as u64),
1478                frame,
1479                ..Default::default()
1480            });
1481
1482            q.add_event_data_now(ev_data).ok();
1483        });
1484
1485        if let Some(s) = self.streams.get_mut(&stream_id) {
1486            s.initialize_local();
1487        }
1488
1489        if fin && conn.stream_finished(stream_id) {
1490            self.streams.remove(&stream_id);
1491        }
1492
1493        Ok(())
1494    }
1495
1496    /// Sends an HTTP/3 body chunk on the given stream.
1497    ///
1498    /// On success the number of bytes written is returned, or [`Done`] if no
1499    /// bytes could be written (e.g. because the stream is blocked).
1500    ///
1501    /// Note that the number of written bytes returned can be lower than the
1502    /// length of the input buffer when the underlying QUIC stream doesn't have
1503    /// enough capacity for the operation to complete.
1504    ///
1505    /// When a partial write happens (including when [`Done`] is returned) the
1506    /// application should retry the operation once the stream is reported as
1507    /// writable again.
1508    ///
1509    /// [`Done`]: enum.Error.html#variant.Done
1510    pub fn send_body<F: BufFactory>(
1511        &mut self, conn: &mut super::Connection<F>, stream_id: u64, body: &[u8],
1512        fin: bool,
1513    ) -> Result<usize> {
1514        self.do_send_body(
1515            conn,
1516            stream_id,
1517            body,
1518            fin,
1519            |conn: &mut super::Connection<F>,
1520             header: &[u8],
1521             stream_id: u64,
1522             body: &[u8],
1523             body_len: usize,
1524             fin: bool| {
1525                conn.stream_send(stream_id, header, false)?;
1526                Ok(conn
1527                    .stream_send(stream_id, &body[..body_len], fin)
1528                    .map(|v| (v, v))?)
1529            },
1530        )
1531    }
1532
1533    /// Sends an HTTP/3 body chunk provided as a raw buffer on the given stream.
1534    ///
1535    /// If the capacity allows it the buffer will be appended to the stream's
1536    /// send queue with zero copying.
1537    ///
1538    /// On success the number of bytes written is returned, or [`Done`] if no
1539    /// bytes could be written (e.g. because the stream is blocked).
1540    ///
1541    /// Note that the number of written bytes returned can be lower than the
1542    /// length of the input buffer when the underlying QUIC stream doesn't have
1543    /// enough capacity for the operation to complete.
1544    ///
1545    /// When a partial write happens (including when [`Done`] is returned) the
1546    /// remaining (unwrittent) buffer will also be returned. The application
1547    /// should retry the operation once the stream is reported as writable
1548    /// again.
1549    ///
1550    /// [`Done`]: enum.Error.html#variant.Done
1551    pub fn send_body_zc<F>(
1552        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1553        body: &mut F::Buf, fin: bool,
1554    ) -> Result<usize>
1555    where
1556        F: BufFactory,
1557        F::Buf: BufSplit,
1558    {
1559        self.do_send_body(
1560            conn,
1561            stream_id,
1562            body,
1563            fin,
1564            |conn: &mut super::Connection<F>,
1565             header: &[u8],
1566             stream_id: u64,
1567             body: &mut F::Buf,
1568             mut body_len: usize,
1569             fin: bool| {
1570                let with_prefix = body.try_add_prefix(header);
1571                if !with_prefix {
1572                    conn.stream_send(stream_id, header, false)?;
1573                } else {
1574                    body_len += header.len();
1575                }
1576
1577                let (mut n, rem) = conn.stream_send_zc(
1578                    stream_id,
1579                    body.clone(),
1580                    Some(body_len),
1581                    fin,
1582                )?;
1583
1584                if with_prefix {
1585                    n -= header.len();
1586                }
1587
1588                if let Some(rem) = rem {
1589                    let _ = std::mem::replace(body, rem);
1590                }
1591
1592                Ok((n, n))
1593            },
1594        )
1595    }
1596
1597    fn do_send_body<F, B, R, SND>(
1598        &mut self, conn: &mut super::Connection<F>, stream_id: u64, body: B,
1599        fin: bool, write_fn: SND,
1600    ) -> Result<R>
1601    where
1602        F: BufFactory,
1603        B: AsRef<[u8]>,
1604        SND: FnOnce(
1605            &mut super::Connection<F>,
1606            &[u8],
1607            u64,
1608            B,
1609            usize,
1610            bool,
1611        ) -> Result<(usize, R)>,
1612    {
1613        let mut d = [42; 10];
1614        let mut b = octets::OctetsMut::with_slice(&mut d);
1615
1616        let len = body.as_ref().len();
1617
1618        // Validate that it is sane to send data on the stream.
1619        if stream_id % 4 != 0 {
1620            return Err(Error::FrameUnexpected);
1621        }
1622
1623        match self.streams.get_mut(&stream_id) {
1624            Some(s) => {
1625                if !s.local_initialized() {
1626                    return Err(Error::FrameUnexpected);
1627                }
1628
1629                if s.trailers_sent() {
1630                    return Err(Error::FrameUnexpected);
1631                }
1632            },
1633
1634            None => {
1635                return Err(Error::FrameUnexpected);
1636            },
1637        };
1638
1639        // Avoid sending 0-length DATA frames when the fin flag is false.
1640        if len == 0 && !fin {
1641            return Err(Error::Done);
1642        }
1643
1644        let overhead = octets::varint_len(frame::DATA_FRAME_TYPE_ID) +
1645            octets::varint_len(len as u64);
1646
1647        let stream_cap = match conn.stream_capacity(stream_id) {
1648            Ok(v) => v,
1649
1650            Err(e) => {
1651                if conn.stream_finished(stream_id) {
1652                    self.streams.remove(&stream_id);
1653                }
1654
1655                return Err(e.into());
1656            },
1657        };
1658
1659        // Make sure there is enough capacity to send the DATA frame header.
1660        if stream_cap < overhead {
1661            let _ = conn.stream_writable(stream_id, overhead + 1);
1662            return Err(Error::Done);
1663        }
1664
1665        // Cap the frame payload length to the stream's capacity.
1666        let body_len = std::cmp::min(len, stream_cap - overhead);
1667
1668        // If we can't send the entire body, set the fin flag to false so the
1669        // application can try again later.
1670        let fin = if body_len != len { false } else { fin };
1671
1672        // Again, avoid sending 0-length DATA frames when the fin flag is false.
1673        if body_len == 0 && !fin {
1674            let _ = conn.stream_writable(stream_id, overhead + 1);
1675            return Err(Error::Done);
1676        }
1677
1678        b.put_varint(frame::DATA_FRAME_TYPE_ID)?;
1679        b.put_varint(body_len as u64)?;
1680        let off = b.off();
1681
1682        // Return how many bytes were written, excluding the frame header.
1683        // Sending body separately avoids unnecessary copy.
1684        let (written, ret) =
1685            write_fn(conn, &d[..off], stream_id, body, body_len, fin)?;
1686
1687        trace!(
1688            "{} tx frm DATA stream={} len={} fin={}",
1689            conn.trace_id(),
1690            stream_id,
1691            written,
1692            fin
1693        );
1694
1695        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1696            let frame = Http3Frame::Data { raw: None };
1697            let ev_data = EventData::H3FrameCreated(H3FrameCreated {
1698                stream_id,
1699                length: Some(written as u64),
1700                frame,
1701                ..Default::default()
1702            });
1703
1704            q.add_event_data_now(ev_data).ok();
1705        });
1706
1707        if written < len {
1708            // Ensure the peer is notified that the connection or stream is
1709            // blocked when the stream's capacity is limited by flow control.
1710            //
1711            // We only need enough capacity to send a few bytes, to make sure
1712            // the stream doesn't hang due to congestion window not growing
1713            // enough.
1714            let _ = conn.stream_writable(stream_id, overhead + 1);
1715        }
1716
1717        if fin && written == len && conn.stream_finished(stream_id) {
1718            self.streams.remove(&stream_id);
1719        }
1720
1721        Ok(ret)
1722    }
1723
1724    /// Returns whether the peer enabled HTTP/3 DATAGRAM frame support.
1725    ///
1726    /// Support is signalled by the peer's SETTINGS, so this method always
1727    /// returns false until they have been processed using the [`poll()`]
1728    /// method.
1729    ///
1730    /// [`poll()`]: struct.Connection.html#method.poll
1731    pub fn dgram_enabled_by_peer<F: BufFactory>(
1732        &self, conn: &super::Connection<F>,
1733    ) -> bool {
1734        self.peer_settings.h3_datagram == Some(1) &&
1735            conn.dgram_max_writable_len().is_some()
1736    }
1737
1738    /// Returns whether the peer enabled extended CONNECT support.
1739    ///
1740    /// Support is signalled by the peer's SETTINGS, so this method always
1741    /// returns false until they have been processed using the [`poll()`]
1742    /// method.
1743    ///
1744    /// [`poll()`]: struct.Connection.html#method.poll
1745    pub fn extended_connect_enabled_by_peer(&self) -> bool {
1746        self.peer_settings.connect_protocol_enabled == Some(1)
1747    }
1748
1749    /// Reads request or response body data into the provided buffer.
1750    ///
1751    /// Applications should call this method whenever the [`poll()`] method
1752    /// returns a [`Data`] event.
1753    ///
1754    /// On success the amount of bytes read is returned, or [`Done`] if there
1755    /// is no data to read.
1756    ///
1757    /// [`poll()`]: struct.Connection.html#method.poll
1758    /// [`Data`]: enum.Event.html#variant.Data
1759    /// [`Done`]: enum.Error.html#variant.Done
1760    pub fn recv_body<F: BufFactory>(
1761        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1762        out: &mut [u8],
1763    ) -> Result<usize> {
1764        let mut total = 0;
1765
1766        // Try to consume all buffered data for the stream, even across multiple
1767        // DATA frames.
1768        while total < out.len() {
1769            let stream = self.streams.get_mut(&stream_id).ok_or(Error::Done)?;
1770
1771            if stream.state() != stream::State::Data {
1772                break;
1773            }
1774
1775            let (read, fin) =
1776                match stream.try_consume_data(conn, &mut out[total..]) {
1777                    Ok(v) => v,
1778
1779                    Err(Error::Done) => break,
1780
1781                    Err(e) => return Err(e),
1782                };
1783
1784            total += read;
1785
1786            // No more data to read, we are done.
1787            if read == 0 || fin {
1788                break;
1789            }
1790
1791            // Process incoming data from the stream. For example, if a whole
1792            // DATA frame was consumed, and another one is queued behind it,
1793            // this will ensure the additional data will also be returned to
1794            // the application.
1795            match self.process_readable_stream(conn, stream_id, false) {
1796                Ok(_) => unreachable!(),
1797
1798                Err(Error::Done) => (),
1799
1800                Err(e) => return Err(e),
1801            };
1802
1803            if conn.stream_finished(stream_id) {
1804                break;
1805            }
1806        }
1807
1808        // While body is being received, the stream is marked as finished only
1809        // when all data is read by the application.
1810        if conn.stream_finished(stream_id) {
1811            self.process_finished_stream(stream_id);
1812        }
1813
1814        if total == 0 {
1815            return Err(Error::Done);
1816        }
1817
1818        Ok(total)
1819    }
1820
1821    /// Sends a PRIORITY_UPDATE frame on the control stream with specified
1822    /// request stream ID and priority.
1823    ///
1824    /// The `priority` parameter represents [Extensible Priority]
1825    /// parameters. If the urgency is outside the range 0-7, it will be clamped
1826    /// to 7.
1827    ///
1828    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1829    /// doesn't have enough capacity for the operation to complete. When this
1830    /// happens the application should retry the operation once the stream is
1831    /// reported as writable again.
1832    ///
1833    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1834    /// [Extensible Priority]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
1835    pub fn send_priority_update_for_request<F: BufFactory>(
1836        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1837        priority: &Priority,
1838    ) -> Result<()> {
1839        let mut d = [42; 20];
1840        let mut b = octets::OctetsMut::with_slice(&mut d);
1841
1842        // Validate that it is sane to send PRIORITY_UPDATE.
1843        if self.is_server {
1844            return Err(Error::FrameUnexpected);
1845        }
1846
1847        if stream_id % 4 != 0 {
1848            return Err(Error::FrameUnexpected);
1849        }
1850
1851        let control_stream_id =
1852            self.control_stream_id.ok_or(Error::FrameUnexpected)?;
1853
1854        let urgency = priority
1855            .urgency
1856            .clamp(PRIORITY_URGENCY_LOWER_BOUND, PRIORITY_URGENCY_UPPER_BOUND);
1857
1858        let mut field_value = format!("u={urgency}");
1859
1860        if priority.incremental {
1861            field_value.push_str(",i");
1862        }
1863
1864        let priority_field_value = field_value.as_bytes();
1865        let frame_payload_len =
1866            octets::varint_len(stream_id) + priority_field_value.len();
1867
1868        let overhead =
1869            octets::varint_len(frame::PRIORITY_UPDATE_FRAME_REQUEST_TYPE_ID) +
1870                octets::varint_len(stream_id) +
1871                octets::varint_len(frame_payload_len as u64);
1872
1873        // Make sure the control stream has enough capacity.
1874        match conn.stream_writable(
1875            control_stream_id,
1876            overhead + priority_field_value.len(),
1877        ) {
1878            Ok(true) => (),
1879
1880            Ok(false) => return Err(Error::StreamBlocked),
1881
1882            Err(e) => {
1883                return Err(e.into());
1884            },
1885        }
1886
1887        b.put_varint(frame::PRIORITY_UPDATE_FRAME_REQUEST_TYPE_ID)?;
1888        b.put_varint(frame_payload_len as u64)?;
1889        b.put_varint(stream_id)?;
1890        let off = b.off();
1891        conn.stream_send(control_stream_id, &d[..off], false)?;
1892
1893        // Sending field value separately avoids unnecessary copy.
1894        conn.stream_send(control_stream_id, priority_field_value, false)?;
1895
1896        trace!(
1897            "{} tx frm PRIORITY_UPDATE request_stream={} priority_field_value={}",
1898            conn.trace_id(),
1899            stream_id,
1900            field_value,
1901        );
1902
1903        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1904            let frame = Http3Frame::PriorityUpdate {
1905                target_stream_type: H3PriorityTargetStreamType::Request,
1906                prioritized_element_id: stream_id,
1907                priority_field_value: field_value.clone(),
1908            };
1909
1910            let ev_data = EventData::H3FrameCreated(H3FrameCreated {
1911                stream_id,
1912                length: Some(priority_field_value.len() as u64),
1913                frame,
1914                ..Default::default()
1915            });
1916
1917            q.add_event_data_now(ev_data).ok();
1918        });
1919
1920        Ok(())
1921    }
1922
1923    /// Take the last PRIORITY_UPDATE for a prioritized element ID.
1924    ///
1925    /// When the [`poll()`] method returns a [`PriorityUpdate`] event for a
1926    /// prioritized element, the event has triggered and will not rearm until
1927    /// applications call this method. It is recommended that applications defer
1928    /// taking the PRIORITY_UPDATE until after [`poll()`] returns [`Done`].
1929    ///
1930    /// On success the Priority Field Value is returned, or [`Done`] if there is
1931    /// no PRIORITY_UPDATE to read (either because there is no value to take, or
1932    /// because the prioritized element does not exist).
1933    ///
1934    /// [`poll()`]: struct.Connection.html#method.poll
1935    /// [`PriorityUpdate`]: enum.Event.html#variant.PriorityUpdate
1936    /// [`Done`]: enum.Error.html#variant.Done
1937    pub fn take_last_priority_update(
1938        &mut self, prioritized_element_id: u64,
1939    ) -> Result<Vec<u8>> {
1940        if let Some(stream) = self.streams.get_mut(&prioritized_element_id) {
1941            return stream.take_last_priority_update().ok_or(Error::Done);
1942        }
1943
1944        Err(Error::Done)
1945    }
1946
1947    /// Processes HTTP/3 data received from the peer.
1948    ///
1949    /// On success it returns an [`Event`] and an ID, or [`Done`] when there are
1950    /// no events to report.
1951    ///
1952    /// Note that all events are edge-triggered, meaning that once reported they
1953    /// will not be reported again by calling this method again, until the event
1954    /// is re-armed.
1955    ///
1956    /// The events [`Headers`], [`Data`] and [`Finished`] return a stream ID,
1957    /// which is used in methods [`recv_body()`], [`send_response()`] or
1958    /// [`send_body()`].
1959    ///
1960    /// The event [`GoAway`] returns an ID that depends on the connection role.
1961    /// A client receives the largest processed stream ID. A server receives the
1962    /// the largest permitted push ID.
1963    ///
1964    /// The event [`PriorityUpdate`] only occurs at servers. It returns a
1965    /// prioritized element ID that is used in the method
1966    /// [`take_last_priority_update()`], which rearms the event for that ID.
1967    ///
1968    /// If an error occurs while processing data, the connection is closed with
1969    /// the appropriate error code, using the transport's [`close()`] method.
1970    ///
1971    /// [`Event`]: enum.Event.html
1972    /// [`Done`]: enum.Error.html#variant.Done
1973    /// [`Headers`]: enum.Event.html#variant.Headers
1974    /// [`Data`]: enum.Event.html#variant.Data
1975    /// [`Finished`]: enum.Event.html#variant.Finished
1976    /// [`GoAway`]: enum.Event.html#variant.GoAWay
1977    /// [`PriorityUpdate`]: enum.Event.html#variant.PriorityUpdate
1978    /// [`recv_body()`]: struct.Connection.html#method.recv_body
1979    /// [`send_response()`]: struct.Connection.html#method.send_response
1980    /// [`send_body()`]: struct.Connection.html#method.send_body
1981    /// [`recv_dgram()`]: struct.Connection.html#method.recv_dgram
1982    /// [`take_last_priority_update()`]: struct.Connection.html#method.take_last_priority_update
1983    /// [`close()`]: ../struct.Connection.html#method.close
1984    pub fn poll<F: BufFactory>(
1985        &mut self, conn: &mut super::Connection<F>,
1986    ) -> Result<(u64, Event)> {
1987        // When connection close is initiated by the local application (e.g. due
1988        // to a protocol error), the connection itself might be in a broken
1989        // state, so return early.
1990        if conn.local_error.is_some() {
1991            return Err(Error::Done);
1992        }
1993
1994        // Process control streams first.
1995        if let Some(stream_id) = self.peer_control_stream_id {
1996            match self.process_control_stream(conn, stream_id) {
1997                Ok(ev) => return Ok(ev),
1998
1999                Err(Error::Done) => (),
2000
2001                Err(e) => return Err(e),
2002            };
2003        }
2004
2005        if let Some(stream_id) = self.peer_qpack_streams.encoder_stream_id {
2006            match self.process_control_stream(conn, stream_id) {
2007                Ok(ev) => return Ok(ev),
2008
2009                Err(Error::Done) => (),
2010
2011                Err(e) => return Err(e),
2012            };
2013        }
2014
2015        if let Some(stream_id) = self.peer_qpack_streams.decoder_stream_id {
2016            match self.process_control_stream(conn, stream_id) {
2017                Ok(ev) => return Ok(ev),
2018
2019                Err(Error::Done) => (),
2020
2021                Err(e) => return Err(e),
2022            };
2023        }
2024
2025        // Process finished streams list.
2026        if let Some(finished) = self.finished_streams.pop_front() {
2027            return Ok((finished, Event::Finished));
2028        }
2029
2030        // Process HTTP/3 data from readable streams.
2031        for s in conn.readable() {
2032            trace!("{} stream id {} is readable", conn.trace_id(), s);
2033
2034            let ev = match self.process_readable_stream(conn, s, true) {
2035                Ok(v) => Some(v),
2036
2037                Err(Error::Done) => None,
2038
2039                // Return early if the stream was reset, to avoid returning
2040                // a Finished event later as well.
2041                Err(Error::TransportError(crate::Error::StreamReset(e))) =>
2042                    return Ok((s, Event::Reset(e))),
2043
2044                Err(e) => return Err(e),
2045            };
2046
2047            if conn.stream_finished(s) {
2048                self.process_finished_stream(s);
2049            }
2050
2051            // TODO: check if stream is completed so it can be freed
2052            if let Some(ev) = ev {
2053                return Ok(ev);
2054            }
2055        }
2056
2057        // Process finished streams list once again, to make sure `Finished`
2058        // events are returned when receiving empty stream frames with the fin
2059        // flag set.
2060        if let Some(finished) = self.finished_streams.pop_front() {
2061            if conn.stream_readable(finished) {
2062                // The stream is finished, but is still readable, it may
2063                // indicate that there is a pending error, such as reset.
2064                if let Err(crate::Error::StreamReset(e)) =
2065                    conn.stream_recv(finished, &mut [])
2066                {
2067                    return Ok((finished, Event::Reset(e)));
2068                }
2069            }
2070            return Ok((finished, Event::Finished));
2071        }
2072
2073        Err(Error::Done)
2074    }
2075
2076    /// Sends a GOAWAY frame to initiate graceful connection closure.
2077    ///
2078    /// When quiche is used in the server role, the `id` parameter is the stream
2079    /// ID of the highest processed request. This can be any valid ID between 0
2080    /// and 2^62-4. However, the ID cannot be increased. Failure to satisfy
2081    /// these conditions will return an error.
2082    ///
2083    /// This method does not close the QUIC connection. Applications are
2084    /// required to call [`close()`] themselves.
2085    ///
2086    /// [`close()`]: ../struct.Connection.html#method.close
2087    pub fn send_goaway<F: BufFactory>(
2088        &mut self, conn: &mut super::Connection<F>, id: u64,
2089    ) -> Result<()> {
2090        let mut id = id;
2091
2092        // TODO: server push
2093        //
2094        // In the meantime always send 0 from client.
2095        if !self.is_server {
2096            id = 0;
2097        }
2098
2099        if self.is_server && id % 4 != 0 {
2100            return Err(Error::IdError);
2101        }
2102
2103        if let Some(sent_id) = self.local_goaway_id {
2104            if id > sent_id {
2105                return Err(Error::IdError);
2106            }
2107        }
2108
2109        if let Some(stream_id) = self.control_stream_id {
2110            let mut d = [42; 10];
2111            let mut b = octets::OctetsMut::with_slice(&mut d);
2112
2113            let frame = frame::Frame::GoAway { id };
2114
2115            let wire_len = frame.to_bytes(&mut b)?;
2116            let stream_cap = conn.stream_capacity(stream_id)?;
2117
2118            if stream_cap < wire_len {
2119                return Err(Error::StreamBlocked);
2120            }
2121
2122            trace!("{} tx frm {:?}", conn.trace_id(), frame);
2123
2124            qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2125                let ev_data = EventData::H3FrameCreated(H3FrameCreated {
2126                    stream_id,
2127                    length: Some(octets::varint_len(id) as u64),
2128                    frame: frame.to_qlog(),
2129                    ..Default::default()
2130                });
2131
2132                q.add_event_data_now(ev_data).ok();
2133            });
2134
2135            let off = b.off();
2136            conn.stream_send(stream_id, &d[..off], false)?;
2137
2138            self.local_goaway_id = Some(id);
2139        }
2140
2141        Ok(())
2142    }
2143
2144    /// Gets the raw settings from peer including unknown and reserved types.
2145    ///
2146    /// The order of settings is the same as received in the SETTINGS frame.
2147    pub fn peer_settings_raw(&self) -> Option<&[(u64, u64)]> {
2148        self.peer_settings.raw.as_deref()
2149    }
2150
2151    fn open_uni_stream<F: BufFactory>(
2152        &mut self, conn: &mut super::Connection<F>, ty: u64,
2153    ) -> Result<u64> {
2154        let stream_id = self.next_uni_stream_id;
2155
2156        let mut d = [0; 8];
2157        let mut b = octets::OctetsMut::with_slice(&mut d);
2158
2159        match ty {
2160            // Control and QPACK streams are the most important to schedule.
2161            stream::HTTP3_CONTROL_STREAM_TYPE_ID |
2162            stream::QPACK_ENCODER_STREAM_TYPE_ID |
2163            stream::QPACK_DECODER_STREAM_TYPE_ID => {
2164                conn.stream_priority(stream_id, 0, false)?;
2165            },
2166
2167            // TODO: Server push
2168            stream::HTTP3_PUSH_STREAM_TYPE_ID => (),
2169
2170            // Anything else is a GREASE stream, so make it the least important.
2171            _ => {
2172                conn.stream_priority(stream_id, 255, false)?;
2173            },
2174        }
2175
2176        conn.stream_send(stream_id, b.put_varint(ty)?, false)?;
2177
2178        // To avoid skipping stream IDs, we only calculate the next available
2179        // stream ID when data has been successfully buffered.
2180        self.next_uni_stream_id = self
2181            .next_uni_stream_id
2182            .checked_add(4)
2183            .ok_or(Error::IdError)?;
2184
2185        Ok(stream_id)
2186    }
2187
2188    fn open_qpack_encoder_stream<F: BufFactory>(
2189        &mut self, conn: &mut super::Connection<F>,
2190    ) -> Result<()> {
2191        let stream_id =
2192            self.open_uni_stream(conn, stream::QPACK_ENCODER_STREAM_TYPE_ID)?;
2193
2194        self.local_qpack_streams.encoder_stream_id = Some(stream_id);
2195
2196        qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2197            let ev_data = EventData::H3StreamTypeSet(H3StreamTypeSet {
2198                stream_id,
2199                owner: Some(H3Owner::Local),
2200                stream_type: H3StreamType::QpackEncode,
2201                ..Default::default()
2202            });
2203
2204            q.add_event_data_now(ev_data).ok();
2205        });
2206
2207        Ok(())
2208    }
2209
2210    fn open_qpack_decoder_stream<F: BufFactory>(
2211        &mut self, conn: &mut super::Connection<F>,
2212    ) -> Result<()> {
2213        let stream_id =
2214            self.open_uni_stream(conn, stream::QPACK_DECODER_STREAM_TYPE_ID)?;
2215
2216        self.local_qpack_streams.decoder_stream_id = Some(stream_id);
2217
2218        qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2219            let ev_data = EventData::H3StreamTypeSet(H3StreamTypeSet {
2220                stream_id,
2221                owner: Some(H3Owner::Local),
2222                stream_type: H3StreamType::QpackDecode,
2223                ..Default::default()
2224            });
2225
2226            q.add_event_data_now(ev_data).ok();
2227        });
2228
2229        Ok(())
2230    }
2231
2232    /// Send GREASE frames on the provided stream ID.
2233    fn send_grease_frames<F: BufFactory>(
2234        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
2235    ) -> Result<()> {
2236        let mut d = [0; 8];
2237
2238        let stream_cap = match conn.stream_capacity(stream_id) {
2239            Ok(v) => v,
2240
2241            Err(e) => {
2242                if conn.stream_finished(stream_id) {
2243                    self.streams.remove(&stream_id);
2244                }
2245
2246                return Err(e.into());
2247            },
2248        };
2249
2250        let grease_frame1 = grease_value();
2251        let grease_frame2 = grease_value();
2252        let grease_payload = b"GREASE is the word";
2253
2254        let overhead = octets::varint_len(grease_frame1) + // frame type
2255            1 + // payload len
2256            octets::varint_len(grease_frame2) + // frame type
2257            1 + // payload len
2258            grease_payload.len(); // payload
2259
2260        // Don't send GREASE if there is not enough capacity for it. Greasing
2261        // will _not_ be attempted again later on.
2262        if stream_cap < overhead {
2263            return Ok(());
2264        }
2265
2266        // Empty GREASE frame.
2267        let mut b = octets::OctetsMut::with_slice(&mut d);
2268        conn.stream_send(stream_id, b.put_varint(grease_frame1)?, false)?;
2269
2270        let mut b = octets::OctetsMut::with_slice(&mut d);
2271        conn.stream_send(stream_id, b.put_varint(0)?, false)?;
2272
2273        trace!(
2274            "{} tx frm GREASE stream={} len=0",
2275            conn.trace_id(),
2276            stream_id
2277        );
2278
2279        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2280            let frame = Http3Frame::Reserved { length: Some(0) };
2281            let ev_data = EventData::H3FrameCreated(H3FrameCreated {
2282                stream_id,
2283                length: Some(0),
2284                frame,
2285                ..Default::default()
2286            });
2287
2288            q.add_event_data_now(ev_data).ok();
2289        });
2290
2291        // GREASE frame with payload.
2292        let mut b = octets::OctetsMut::with_slice(&mut d);
2293        conn.stream_send(stream_id, b.put_varint(grease_frame2)?, false)?;
2294
2295        let mut b = octets::OctetsMut::with_slice(&mut d);
2296        conn.stream_send(stream_id, b.put_varint(18)?, false)?;
2297
2298        conn.stream_send(stream_id, grease_payload, false)?;
2299
2300        trace!(
2301            "{} tx frm GREASE stream={} len={}",
2302            conn.trace_id(),
2303            stream_id,
2304            grease_payload.len()
2305        );
2306
2307        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2308            let frame = Http3Frame::Reserved {
2309                length: Some(grease_payload.len() as u64),
2310            };
2311            let ev_data = EventData::H3FrameCreated(H3FrameCreated {
2312                stream_id,
2313                length: Some(grease_payload.len() as u64),
2314                frame,
2315                ..Default::default()
2316            });
2317
2318            q.add_event_data_now(ev_data).ok();
2319        });
2320
2321        Ok(())
2322    }
2323
2324    /// Opens a new unidirectional stream with a GREASE type and sends some
2325    /// unframed payload.
2326    fn open_grease_stream<F: BufFactory>(
2327        &mut self, conn: &mut super::Connection<F>,
2328    ) -> Result<()> {
2329        let ty = grease_value();
2330        match self.open_uni_stream(conn, ty) {
2331            Ok(stream_id) => {
2332                conn.stream_send(stream_id, b"GREASE is the word", true)?;
2333
2334                trace!("{} open GREASE stream {}", conn.trace_id(), stream_id);
2335
2336                qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2337                    let ev_data = EventData::H3StreamTypeSet(H3StreamTypeSet {
2338                        stream_id,
2339                        owner: Some(H3Owner::Local),
2340                        stream_type: H3StreamType::Unknown,
2341                        stream_type_value: Some(ty),
2342                        ..Default::default()
2343                    });
2344
2345                    q.add_event_data_now(ev_data).ok();
2346                });
2347            },
2348
2349            Err(Error::IdError) => {
2350                trace!("{} GREASE stream blocked", conn.trace_id(),);
2351
2352                return Ok(());
2353            },
2354
2355            Err(e) => return Err(e),
2356        };
2357
2358        Ok(())
2359    }
2360
2361    /// Sends SETTINGS frame based on HTTP/3 configuration.
2362    fn send_settings<F: BufFactory>(
2363        &mut self, conn: &mut super::Connection<F>,
2364    ) -> Result<()> {
2365        let stream_id = match self
2366            .open_uni_stream(conn, stream::HTTP3_CONTROL_STREAM_TYPE_ID)
2367        {
2368            Ok(v) => v,
2369
2370            Err(e) => {
2371                trace!("{} Control stream blocked", conn.trace_id(),);
2372
2373                if e == Error::Done {
2374                    return Err(Error::InternalError);
2375                }
2376
2377                return Err(e);
2378            },
2379        };
2380
2381        self.control_stream_id = Some(stream_id);
2382
2383        qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2384            let ev_data = EventData::H3StreamTypeSet(H3StreamTypeSet {
2385                stream_id,
2386                owner: Some(H3Owner::Local),
2387                stream_type: H3StreamType::Control,
2388                ..Default::default()
2389            });
2390
2391            q.add_event_data_now(ev_data).ok();
2392        });
2393
2394        let grease = if conn.grease {
2395            Some((grease_value(), grease_value()))
2396        } else {
2397            None
2398        };
2399
2400        let frame = frame::Frame::Settings {
2401            max_field_section_size: self.local_settings.max_field_section_size,
2402            qpack_max_table_capacity: self
2403                .local_settings
2404                .qpack_max_table_capacity,
2405            qpack_blocked_streams: self.local_settings.qpack_blocked_streams,
2406            connect_protocol_enabled: self
2407                .local_settings
2408                .connect_protocol_enabled,
2409            h3_datagram: self.local_settings.h3_datagram,
2410            grease,
2411            additional_settings: self.local_settings.additional_settings.clone(),
2412            raw: Default::default(),
2413        };
2414
2415        let mut d = [42; 128];
2416        let mut b = octets::OctetsMut::with_slice(&mut d);
2417
2418        frame.to_bytes(&mut b)?;
2419
2420        let off = b.off();
2421
2422        if let Some(id) = self.control_stream_id {
2423            conn.stream_send(id, &d[..off], false)?;
2424
2425            trace!(
2426                "{} tx frm SETTINGS stream={} len={}",
2427                conn.trace_id(),
2428                id,
2429                off
2430            );
2431
2432            qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2433                let frame = frame.to_qlog();
2434                let ev_data = EventData::H3FrameCreated(H3FrameCreated {
2435                    stream_id: id,
2436                    length: Some(off as u64),
2437                    frame,
2438                    ..Default::default()
2439                });
2440
2441                q.add_event_data_now(ev_data).ok();
2442            });
2443        }
2444
2445        Ok(())
2446    }
2447
2448    fn process_control_stream<F: BufFactory>(
2449        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
2450    ) -> Result<(u64, Event)> {
2451        close_conn_if_critical_stream_finished(conn, stream_id)?;
2452
2453        if !conn.stream_readable(stream_id) {
2454            return Err(Error::Done);
2455        }
2456
2457        match self.process_readable_stream(conn, stream_id, true) {
2458            Ok(ev) => return Ok(ev),
2459
2460            Err(Error::Done) => (),
2461
2462            Err(e) => return Err(e),
2463        };
2464
2465        close_conn_if_critical_stream_finished(conn, stream_id)?;
2466
2467        Err(Error::Done)
2468    }
2469
    /// Drives the HTTP/3 state machine of stream `stream_id`, reading data
    /// from `conn` as needed.
    ///
    /// A local stream object is created on demand if none exists yet. The
    /// stream is then advanced state by state (stream type, push ID, frame
    /// type, frame payload length, frame payload, ...) until either an event
    /// is produced or no further progress can be made.
    ///
    /// When `polling` is `false` the loop stops as soon as it reaches a state
    /// that could emit an event (`FramePayload` or `Data`), so no events are
    /// generated.
    ///
    /// Returns `Ok((id, event))` when an event is produced. Returns
    /// `Err(Error::Done)` when the loop ends without an event. Any other
    /// error is returned to the caller; in most of the protocol-violation
    /// paths the QUIC connection is closed with the matching wire error code
    /// first.
    fn process_readable_stream<F: BufFactory>(
        &mut self, conn: &mut super::Connection<F>, stream_id: u64, polling: bool,
    ) -> Result<(u64, Event)> {
        // Make sure there is a stream object to track the parsing state.
        self.streams
            .entry(stream_id)
            .or_insert_with(|| <stream::Stream>::new(stream_id, false));

        // We need to get a fresh reference to the stream for each
        // iteration, to avoid borrowing `self` for the entire duration
        // of the loop, because we'll need to borrow it again in the
        // `State::FramePayload` case below.
        while let Some(stream) = self.streams.get_mut(&stream_id) {
            match stream.state() {
                stream::State::StreamType => {
                    stream.try_fill_buffer(conn)?;

                    let varint = match stream.try_consume_varint() {
                        Ok(v) => v,

                        // The varint could not be consumed yet, go around
                        // the loop again to try reading more data.
                        Err(_) => continue,
                    };

                    let ty = stream::Type::deserialize(varint)?;

                    if let Err(e) = stream.set_ty(ty) {
                        conn.close(true, e.to_wire(), b"")?;
                        return Err(e);
                    }

                    qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
                        // The raw type value is only logged for stream types
                        // we don't know about.
                        let ty_val = if matches!(ty, stream::Type::Unknown) {
                            Some(varint)
                        } else {
                            None
                        };

                        let ev_data =
                            EventData::H3StreamTypeSet(H3StreamTypeSet {
                                stream_id,
                                owner: Some(H3Owner::Remote),
                                stream_type: ty.to_qlog(),
                                stream_type_value: ty_val,
                                ..Default::default()
                            });

                        q.add_event_data_now(ev_data).ok();
                    });

                    match &ty {
                        stream::Type::Control => {
                            // Only one control stream allowed.
                            if self.peer_control_stream_id.is_some() {
                                conn.close(
                                    true,
                                    Error::StreamCreationError.to_wire(),
                                    b"Received multiple control streams",
                                )?;

                                return Err(Error::StreamCreationError);
                            }

                            trace!(
                                "{} open peer's control stream {}",
                                conn.trace_id(),
                                stream_id
                            );

                            close_conn_if_critical_stream_finished(
                                conn, stream_id,
                            )?;

                            self.peer_control_stream_id = Some(stream_id);
                        },

                        stream::Type::Push => {
                            // Only clients can receive push stream.
                            if self.is_server {
                                conn.close(
                                    true,
                                    Error::StreamCreationError.to_wire(),
                                    b"Server received push stream.",
                                )?;

                                return Err(Error::StreamCreationError);
                            }
                        },

                        stream::Type::QpackEncoder => {
                            // Only one qpack encoder stream allowed.
                            if self.peer_qpack_streams.encoder_stream_id.is_some()
                            {
                                conn.close(
                                    true,
                                    Error::StreamCreationError.to_wire(),
                                    b"Received multiple QPACK encoder streams",
                                )?;

                                return Err(Error::StreamCreationError);
                            }

                            close_conn_if_critical_stream_finished(
                                conn, stream_id,
                            )?;

                            self.peer_qpack_streams.encoder_stream_id =
                                Some(stream_id);
                        },

                        stream::Type::QpackDecoder => {
                            // Only one qpack decoder allowed.
                            if self.peer_qpack_streams.decoder_stream_id.is_some()
                            {
                                conn.close(
                                    true,
                                    Error::StreamCreationError.to_wire(),
                                    b"Received multiple QPACK decoder streams",
                                )?;

                                return Err(Error::StreamCreationError);
                            }

                            close_conn_if_critical_stream_finished(
                                conn, stream_id,
                            )?;

                            self.peer_qpack_streams.decoder_stream_id =
                                Some(stream_id);
                        },

                        stream::Type::Unknown => {
                            // Unknown stream types are ignored.
                            // TODO: we MAY send STOP_SENDING
                        },

                        // NOTE(review): presumably the request type is never
                        // produced by `Type::deserialize()` since request
                        // streams carry no type prefix -- confirm.
                        stream::Type::Request => unreachable!(),
                    }
                },

                stream::State::PushId => {
                    stream.try_fill_buffer(conn)?;

                    let varint = match stream.try_consume_varint() {
                        Ok(v) => v,

                        Err(_) => continue,
                    };

                    if let Err(e) = stream.set_push_id(varint) {
                        conn.close(true, e.to_wire(), b"")?;
                        return Err(e);
                    }
                },

                stream::State::FrameType => {
                    stream.try_fill_buffer(conn)?;

                    let varint = match stream.try_consume_varint() {
                        Ok(v) => v,

                        Err(_) => continue,
                    };

                    match stream.set_frame_type(varint) {
                        // Unexpected frames get a dedicated reason phrase
                        // that includes the offending frame type.
                        Err(Error::FrameUnexpected) => {
                            let msg = format!("Unexpected frame type {varint}");

                            conn.close(
                                true,
                                Error::FrameUnexpected.to_wire(),
                                msg.as_bytes(),
                            )?;

                            return Err(Error::FrameUnexpected);
                        },

                        Err(e) => {
                            conn.close(
                                true,
                                e.to_wire(),
                                b"Error handling frame.",
                            )?;

                            return Err(e);
                        },

                        _ => (),
                    }
                },

                stream::State::FramePayloadLen => {
                    stream.try_fill_buffer(conn)?;

                    let payload_len = match stream.try_consume_varint() {
                        Ok(v) => v,

                        Err(_) => continue,
                    };

                    // DATA frames are handled uniquely. After this point we lose
                    // visibility of DATA framing, so just log here.
                    if Some(frame::DATA_FRAME_TYPE_ID) == stream.frame_type() {
                        trace!(
                            "{} rx frm DATA stream={} wire_payload_len={}",
                            conn.trace_id(),
                            stream_id,
                            payload_len
                        );

                        qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
                            let frame = Http3Frame::Data { raw: None };

                            let ev_data =
                                EventData::H3FrameParsed(H3FrameParsed {
                                    stream_id,
                                    length: Some(payload_len),
                                    frame,
                                    ..Default::default()
                                });

                            q.add_event_data_now(ev_data).ok();
                        });
                    }

                    if let Err(e) = stream.set_frame_payload_len(payload_len) {
                        conn.close(true, e.to_wire(), b"")?;
                        return Err(e);
                    }
                },

                stream::State::FramePayload => {
                    // Do not emit events when not polling.
                    if !polling {
                        break;
                    }

                    stream.try_fill_buffer(conn)?;

                    let (frame, payload_len) = match stream.try_consume_frame() {
                        Ok(frame) => frame,

                        Err(Error::Done) => return Err(Error::Done),

                        Err(e) => {
                            conn.close(
                                true,
                                e.to_wire(),
                                b"Error handling frame.",
                            )?;

                            return Err(e);
                        },
                    };

                    match self.process_frame(conn, stream_id, frame, payload_len)
                    {
                        Ok(ev) => return Ok(ev),

                        Err(Error::Done) => {
                            // This might be a frame that is processed internally
                            // without needing to bubble up to the user as an
                            // event. Check whether the frame has FIN'd by QUIC
                            // to prevent trying to read again on a closed stream.
                            if conn.stream_finished(stream_id) {
                                break;
                            }
                        },

                        Err(e) => return Err(e),
                    };
                },

                stream::State::Data => {
                    // Do not emit events when not polling.
                    if !polling {
                        break;
                    }

                    // No Data event is reported when the stream declines to
                    // trigger one.
                    if !stream.try_trigger_data_event() {
                        break;
                    }

                    return Ok((stream_id, Event::Data));
                },

                stream::State::QpackInstruction => {
                    let mut d = [0; 4096];

                    // Read data from the stream and discard immediately.
                    //
                    // There is no `break` in this loop: it is only left
                    // through the `?` operators below, i.e. once
                    // `stream_recv()` or `close_conn_critical_stream()`
                    // returns an error.
                    loop {
                        let (recv, fin) = conn.stream_recv(stream_id, &mut d)?;

                        // Account the discarded bytes against the matching
                        // QPACK stream, these counters are exposed by
                        // `stats()`.
                        match stream.ty() {
                            Some(stream::Type::QpackEncoder) =>
                                self.peer_qpack_streams.encoder_stream_bytes +=
                                    recv as u64,
                            Some(stream::Type::QpackDecoder) =>
                                self.peer_qpack_streams.decoder_stream_bytes +=
                                    recv as u64,
                            _ => unreachable!(),
                        };

                        if fin {
                            close_conn_critical_stream(conn)?;
                        }
                    }
                },

                stream::State::Drain => {
                    // Discard incoming data on the stream.
                    //
                    // NOTE(review): 0x100 looks like the HTTP/3 H3_NO_ERROR
                    // code -- confirm.
                    conn.stream_shutdown(
                        stream_id,
                        crate::Shutdown::Read,
                        0x100,
                    )?;

                    break;
                },

                stream::State::Finished => break,
            }
        }

        Err(Error::Done)
    }
2794
2795    fn process_finished_stream(&mut self, stream_id: u64) {
2796        let stream = match self.streams.get_mut(&stream_id) {
2797            Some(v) => v,
2798
2799            None => return,
2800        };
2801
2802        if stream.state() == stream::State::Finished {
2803            return;
2804        }
2805
2806        match stream.ty() {
2807            Some(stream::Type::Request) | Some(stream::Type::Push) => {
2808                stream.finished();
2809
2810                self.finished_streams.push_back(stream_id);
2811            },
2812
2813            _ => (),
2814        };
2815    }
2816
2817    fn process_frame<F: BufFactory>(
2818        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
2819        frame: frame::Frame, payload_len: u64,
2820    ) -> Result<(u64, Event)> {
2821        trace!(
2822            "{} rx frm {:?} stream={} payload_len={}",
2823            conn.trace_id(),
2824            frame,
2825            stream_id,
2826            payload_len
2827        );
2828
2829        qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
2830            // HEADERS frames are special case and will be logged below.
2831            if !matches!(frame, frame::Frame::Headers { .. }) {
2832                let frame = frame.to_qlog();
2833                let ev_data = EventData::H3FrameParsed(H3FrameParsed {
2834                    stream_id,
2835                    length: Some(payload_len),
2836                    frame,
2837                    ..Default::default()
2838                });
2839
2840                q.add_event_data_now(ev_data).ok();
2841            }
2842        });
2843
2844        match frame {
2845            frame::Frame::Settings {
2846                max_field_section_size,
2847                qpack_max_table_capacity,
2848                qpack_blocked_streams,
2849                connect_protocol_enabled,
2850                h3_datagram,
2851                additional_settings,
2852                raw,
2853                ..
2854            } => {
2855                self.peer_settings = ConnectionSettings {
2856                    max_field_section_size,
2857                    qpack_max_table_capacity,
2858                    qpack_blocked_streams,
2859                    connect_protocol_enabled,
2860                    h3_datagram,
2861                    additional_settings,
2862                    raw,
2863                };
2864
2865                if let Some(1) = h3_datagram {
2866                    // The peer MUST have also enabled DATAGRAM with a TP
2867                    if conn.dgram_max_writable_len().is_none() {
2868                        conn.close(
2869                            true,
2870                            Error::SettingsError.to_wire(),
2871                            b"H3_DATAGRAM sent with value 1 but max_datagram_frame_size TP not set.",
2872                        )?;
2873
2874                        return Err(Error::SettingsError);
2875                    }
2876                }
2877            },
2878
2879            frame::Frame::Headers { header_block } => {
2880                // Servers reject too many HEADERS frames.
2881                if let Some(s) = self.streams.get_mut(&stream_id) {
2882                    if self.is_server && s.headers_received_count() == 2 {
2883                        conn.close(
2884                            true,
2885                            Error::FrameUnexpected.to_wire(),
2886                            b"Too many HEADERS frames",
2887                        )?;
2888                        return Err(Error::FrameUnexpected);
2889                    }
2890
2891                    s.increment_headers_received();
2892                }
2893
2894                // Use "infinite" as default value for max_field_section_size if
2895                // it is not configured by the application.
2896                let max_size = self
2897                    .local_settings
2898                    .max_field_section_size
2899                    .unwrap_or(u64::MAX);
2900
2901                let headers = match self
2902                    .qpack_decoder
2903                    .decode(&header_block[..], max_size)
2904                {
2905                    Ok(v) => v,
2906
2907                    Err(e) => {
2908                        let e = match e {
2909                            qpack::Error::HeaderListTooLarge =>
2910                                Error::ExcessiveLoad,
2911
2912                            _ => Error::QpackDecompressionFailed,
2913                        };
2914
2915                        conn.close(true, e.to_wire(), b"Error parsing headers.")?;
2916
2917                        return Err(e);
2918                    },
2919                };
2920
2921                qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
2922                    let qlog_headers = headers
2923                        .iter()
2924                        .map(|h| qlog::events::h3::HttpHeader {
2925                            name: String::from_utf8_lossy(h.name()).into_owned(),
2926                            value: String::from_utf8_lossy(h.value())
2927                                .into_owned(),
2928                        })
2929                        .collect();
2930
2931                    let frame = Http3Frame::Headers {
2932                        headers: qlog_headers,
2933                    };
2934
2935                    let ev_data = EventData::H3FrameParsed(H3FrameParsed {
2936                        stream_id,
2937                        length: Some(payload_len),
2938                        frame,
2939                        ..Default::default()
2940                    });
2941
2942                    q.add_event_data_now(ev_data).ok();
2943                });
2944
2945                let more_frames = !conn.stream_finished(stream_id);
2946
2947                return Ok((stream_id, Event::Headers {
2948                    list: headers,
2949                    more_frames,
2950                }));
2951            },
2952
2953            frame::Frame::Data { .. } => {
2954                // Do nothing. The Data event is returned separately.
2955            },
2956
2957            frame::Frame::GoAway { id } => {
2958                if !self.is_server && id % 4 != 0 {
2959                    conn.close(
2960                        true,
2961                        Error::FrameUnexpected.to_wire(),
2962                        b"GOAWAY received with ID of non-request stream",
2963                    )?;
2964
2965                    return Err(Error::IdError);
2966                }
2967
2968                if let Some(received_id) = self.peer_goaway_id {
2969                    if id > received_id {
2970                        conn.close(
2971                            true,
2972                            Error::IdError.to_wire(),
2973                            b"GOAWAY received with ID larger than previously received",
2974                        )?;
2975
2976                        return Err(Error::IdError);
2977                    }
2978                }
2979
2980                self.peer_goaway_id = Some(id);
2981
2982                return Ok((id, Event::GoAway));
2983            },
2984
2985            frame::Frame::MaxPushId { push_id } => {
2986                if !self.is_server {
2987                    conn.close(
2988                        true,
2989                        Error::FrameUnexpected.to_wire(),
2990                        b"MAX_PUSH_ID received by client",
2991                    )?;
2992
2993                    return Err(Error::FrameUnexpected);
2994                }
2995
2996                if push_id < self.max_push_id {
2997                    conn.close(
2998                        true,
2999                        Error::IdError.to_wire(),
3000                        b"MAX_PUSH_ID reduced limit",
3001                    )?;
3002
3003                    return Err(Error::IdError);
3004                }
3005
3006                self.max_push_id = push_id;
3007            },
3008
3009            frame::Frame::PushPromise { .. } => {
3010                if self.is_server {
3011                    conn.close(
3012                        true,
3013                        Error::FrameUnexpected.to_wire(),
3014                        b"PUSH_PROMISE received by server",
3015                    )?;
3016
3017                    return Err(Error::FrameUnexpected);
3018                }
3019
3020                if stream_id % 4 != 0 {
3021                    conn.close(
3022                        true,
3023                        Error::FrameUnexpected.to_wire(),
3024                        b"PUSH_PROMISE received on non-request stream",
3025                    )?;
3026
3027                    return Err(Error::FrameUnexpected);
3028                }
3029
3030                // TODO: implement more checks and PUSH_PROMISE event
3031            },
3032
3033            frame::Frame::CancelPush { .. } => {
3034                // TODO: implement CANCEL_PUSH frame
3035            },
3036
3037            frame::Frame::PriorityUpdateRequest {
3038                prioritized_element_id,
3039                priority_field_value,
3040            } => {
3041                if !self.is_server {
3042                    conn.close(
3043                        true,
3044                        Error::FrameUnexpected.to_wire(),
3045                        b"PRIORITY_UPDATE received by client",
3046                    )?;
3047
3048                    return Err(Error::FrameUnexpected);
3049                }
3050
3051                if prioritized_element_id % 4 != 0 {
3052                    conn.close(
3053                        true,
3054                        Error::FrameUnexpected.to_wire(),
3055                        b"PRIORITY_UPDATE for request stream type with wrong ID",
3056                    )?;
3057
3058                    return Err(Error::FrameUnexpected);
3059                }
3060
3061                if prioritized_element_id > conn.streams.max_streams_bidi() * 4 {
3062                    conn.close(
3063                        true,
3064                        Error::IdError.to_wire(),
3065                        b"PRIORITY_UPDATE for request stream beyond max streams limit",
3066                    )?;
3067
3068                    return Err(Error::IdError);
3069                }
3070
3071                // If the PRIORITY_UPDATE is valid, consider storing the latest
3072                // contents. Due to reordering, it is possible that we might
3073                // receive frames that reference streams that have not yet to
3074                // been opened and that's OK because it's within our concurrency
3075                // limit. However, we discard PRIORITY_UPDATE that refers to
3076                // streams that we know have been collected.
3077                if conn.streams.is_collected(prioritized_element_id) {
3078                    return Err(Error::Done);
3079                }
3080
3081                // If the stream did not yet exist, create it and store.
3082                let stream =
3083                    self.streams.entry(prioritized_element_id).or_insert_with(
3084                        || <stream::Stream>::new(prioritized_element_id, false),
3085                    );
3086
3087                let had_priority_update = stream.has_last_priority_update();
3088                stream.set_last_priority_update(Some(priority_field_value));
3089
3090                // Only trigger the event when there wasn't already a stored
3091                // PRIORITY_UPDATE.
3092                if !had_priority_update {
3093                    return Ok((prioritized_element_id, Event::PriorityUpdate));
3094                } else {
3095                    return Err(Error::Done);
3096                }
3097            },
3098
3099            frame::Frame::PriorityUpdatePush {
3100                prioritized_element_id,
3101                ..
3102            } => {
3103                if !self.is_server {
3104                    conn.close(
3105                        true,
3106                        Error::FrameUnexpected.to_wire(),
3107                        b"PRIORITY_UPDATE received by client",
3108                    )?;
3109
3110                    return Err(Error::FrameUnexpected);
3111                }
3112
3113                if prioritized_element_id % 3 != 0 {
3114                    conn.close(
3115                        true,
3116                        Error::FrameUnexpected.to_wire(),
3117                        b"PRIORITY_UPDATE for push stream type with wrong ID",
3118                    )?;
3119
3120                    return Err(Error::FrameUnexpected);
3121                }
3122
3123                // TODO: we only implement this if we implement server push
3124            },
3125
3126            frame::Frame::Unknown { .. } => (),
3127        }
3128
3129        Err(Error::Done)
3130    }
3131
3132    /// Collects and returns statistics about the connection.
3133    #[inline]
3134    pub fn stats(&self) -> Stats {
3135        Stats {
3136            qpack_encoder_stream_recv_bytes: self
3137                .peer_qpack_streams
3138                .encoder_stream_bytes,
3139            qpack_decoder_stream_recv_bytes: self
3140                .peer_qpack_streams
3141                .decoder_stream_bytes,
3142        }
3143    }
3144}
3145
3146/// Generates an HTTP/3 GREASE variable length integer.
3147pub fn grease_value() -> u64 {
3148    let n = super::rand::rand_u64_uniform(148_764_065_110_560_899);
3149    31 * n + 33
3150}
3151
3152#[doc(hidden)]
3153pub mod testing {
3154    use super::*;
3155
3156    use crate::test_utils;
3157
    /// Session is an HTTP/3 test helper structure. It holds a client, server
    /// and pipe that allows them to communicate.
    ///
    /// `new()` creates a session with some sensible default
    /// configuration. `with_configs()` allows for providing a specific
    /// configuration.
    ///
    /// `handshake()` performs all the steps needed to establish an HTTP/3
    /// connection.
    ///
    /// Some utility functions are provided that make it less verbose to send
    /// requests, responses and individual headers. The full quiche API remains
    /// available for any test that needs to do unconventional things (such as
    /// bad behaviour that triggers errors).
    pub struct Session {
        /// The pair of QUIC connections (`pipe.client` and `pipe.server`)
        /// that the HTTP/3 endpoints below operate on.
        pub pipe: test_utils::Pipe,
        /// The client-side HTTP/3 connection, used with `pipe.client`.
        pub client: Connection,
        /// The server-side HTTP/3 connection, used with `pipe.server`.
        pub server: Connection,
    }
3177
3178    impl Session {
3179        pub fn new() -> Result<Session> {
3180            fn path_relative_to_manifest_dir(path: &str) -> String {
3181                std::fs::canonicalize(
3182                    std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join(path),
3183                )
3184                .unwrap()
3185                .to_string_lossy()
3186                .into_owned()
3187            }
3188
3189            let mut config = crate::Config::new(crate::PROTOCOL_VERSION)?;
3190            config.load_cert_chain_from_pem_file(
3191                &path_relative_to_manifest_dir("examples/cert.crt"),
3192            )?;
3193            config.load_priv_key_from_pem_file(
3194                &path_relative_to_manifest_dir("examples/cert.key"),
3195            )?;
3196            config.set_application_protos(&[b"h3"])?;
3197            config.set_initial_max_data(1500);
3198            config.set_initial_max_stream_data_bidi_local(150);
3199            config.set_initial_max_stream_data_bidi_remote(150);
3200            config.set_initial_max_stream_data_uni(150);
3201            config.set_initial_max_streams_bidi(5);
3202            config.set_initial_max_streams_uni(5);
3203            config.verify_peer(false);
3204            config.enable_dgram(true, 3, 3);
3205            config.set_ack_delay_exponent(8);
3206
3207            let h3_config = Config::new()?;
3208            Session::with_configs(&mut config, &h3_config)
3209        }
3210
3211        pub fn with_configs(
3212            config: &mut crate::Config, h3_config: &Config,
3213        ) -> Result<Session> {
3214            let pipe = test_utils::Pipe::with_config(config)?;
3215            let client_dgram = pipe.client.dgram_enabled();
3216            let server_dgram = pipe.server.dgram_enabled();
3217            Ok(Session {
3218                pipe,
3219                client: Connection::new(h3_config, false, client_dgram)?,
3220                server: Connection::new(h3_config, true, server_dgram)?,
3221            })
3222        }
3223
3224        /// Do the HTTP/3 handshake so both ends are in sane initial state.
3225        pub fn handshake(&mut self) -> Result<()> {
3226            self.pipe.handshake()?;
3227
3228            // Client streams.
3229            self.client.send_settings(&mut self.pipe.client)?;
3230            self.pipe.advance().ok();
3231
3232            self.client
3233                .open_qpack_encoder_stream(&mut self.pipe.client)?;
3234            self.pipe.advance().ok();
3235
3236            self.client
3237                .open_qpack_decoder_stream(&mut self.pipe.client)?;
3238            self.pipe.advance().ok();
3239
3240            if self.pipe.client.grease {
3241                self.client.open_grease_stream(&mut self.pipe.client)?;
3242            }
3243
3244            self.pipe.advance().ok();
3245
3246            // Server streams.
3247            self.server.send_settings(&mut self.pipe.server)?;
3248            self.pipe.advance().ok();
3249
3250            self.server
3251                .open_qpack_encoder_stream(&mut self.pipe.server)?;
3252            self.pipe.advance().ok();
3253
3254            self.server
3255                .open_qpack_decoder_stream(&mut self.pipe.server)?;
3256            self.pipe.advance().ok();
3257
3258            if self.pipe.server.grease {
3259                self.server.open_grease_stream(&mut self.pipe.server)?;
3260            }
3261
3262            self.advance().ok();
3263
3264            while self.client.poll(&mut self.pipe.client).is_ok() {
3265                // Do nothing.
3266            }
3267
3268            while self.server.poll(&mut self.pipe.server).is_ok() {
3269                // Do nothing.
3270            }
3271
3272            Ok(())
3273        }
3274
3275        /// Advances the session pipe over the buffer.
3276        pub fn advance(&mut self) -> crate::Result<()> {
3277            self.pipe.advance()
3278        }
3279
3280        /// Polls the client for events.
3281        pub fn poll_client(&mut self) -> Result<(u64, Event)> {
3282            self.client.poll(&mut self.pipe.client)
3283        }
3284
3285        /// Polls the server for events.
3286        pub fn poll_server(&mut self) -> Result<(u64, Event)> {
3287            self.server.poll(&mut self.pipe.server)
3288        }
3289
3290        /// Sends a request from client with default headers.
3291        ///
3292        /// On success it returns the newly allocated stream and the headers.
3293        pub fn send_request(&mut self, fin: bool) -> Result<(u64, Vec<Header>)> {
3294            let req = vec![
3295                Header::new(b":method", b"GET"),
3296                Header::new(b":scheme", b"https"),
3297                Header::new(b":authority", b"quic.tech"),
3298                Header::new(b":path", b"/test"),
3299                Header::new(b"user-agent", b"quiche-test"),
3300            ];
3301
3302            let stream =
3303                self.client.send_request(&mut self.pipe.client, &req, fin)?;
3304
3305            self.advance().ok();
3306
3307            Ok((stream, req))
3308        }
3309
3310        /// Sends a response from server with default headers.
3311        ///
3312        /// On success it returns the headers.
3313        pub fn send_response(
3314            &mut self, stream: u64, fin: bool,
3315        ) -> Result<Vec<Header>> {
3316            let resp = vec![
3317                Header::new(b":status", b"200"),
3318                Header::new(b"server", b"quiche-test"),
3319            ];
3320
3321            self.server.send_response(
3322                &mut self.pipe.server,
3323                stream,
3324                &resp,
3325                fin,
3326            )?;
3327
3328            self.advance().ok();
3329
3330            Ok(resp)
3331        }
3332
3333        /// Sends some default payload from client.
3334        ///
3335        /// On success it returns the payload.
3336        pub fn send_body_client(
3337            &mut self, stream: u64, fin: bool,
3338        ) -> Result<Vec<u8>> {
3339            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3340
3341            self.client
3342                .send_body(&mut self.pipe.client, stream, &bytes, fin)?;
3343
3344            self.advance().ok();
3345
3346            Ok(bytes)
3347        }
3348
3349        /// Fetches DATA payload from the server.
3350        ///
3351        /// On success it returns the number of bytes received.
3352        pub fn recv_body_client(
3353            &mut self, stream: u64, buf: &mut [u8],
3354        ) -> Result<usize> {
3355            self.client.recv_body(&mut self.pipe.client, stream, buf)
3356        }
3357
3358        /// Sends some default payload from server.
3359        ///
3360        /// On success it returns the payload.
3361        pub fn send_body_server(
3362            &mut self, stream: u64, fin: bool,
3363        ) -> Result<Vec<u8>> {
3364            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3365
3366            self.server
3367                .send_body(&mut self.pipe.server, stream, &bytes, fin)?;
3368
3369            self.advance().ok();
3370
3371            Ok(bytes)
3372        }
3373
3374        /// Fetches DATA payload from the client.
3375        ///
3376        /// On success it returns the number of bytes received.
3377        pub fn recv_body_server(
3378            &mut self, stream: u64, buf: &mut [u8],
3379        ) -> Result<usize> {
3380            self.server.recv_body(&mut self.pipe.server, stream, buf)
3381        }
3382
3383        /// Sends a single HTTP/3 frame from the client.
3384        pub fn send_frame_client(
3385            &mut self, frame: frame::Frame, stream_id: u64, fin: bool,
3386        ) -> Result<()> {
3387            let mut d = [42; 65535];
3388
3389            let mut b = octets::OctetsMut::with_slice(&mut d);
3390
3391            frame.to_bytes(&mut b)?;
3392
3393            let off = b.off();
3394            self.pipe.client.stream_send(stream_id, &d[..off], fin)?;
3395
3396            self.advance().ok();
3397
3398            Ok(())
3399        }
3400
3401        /// Send an HTTP/3 DATAGRAM with default data from the client.
3402        ///
3403        /// On success it returns the data.
3404        pub fn send_dgram_client(&mut self, flow_id: u64) -> Result<Vec<u8>> {
3405            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3406            let len = octets::varint_len(flow_id) + bytes.len();
3407            let mut d = vec![0; len];
3408            let mut b = octets::OctetsMut::with_slice(&mut d);
3409
3410            b.put_varint(flow_id)?;
3411            b.put_bytes(&bytes)?;
3412
3413            self.pipe.client.dgram_send(&d)?;
3414
3415            self.advance().ok();
3416
3417            Ok(bytes)
3418        }
3419
3420        /// Receives an HTTP/3 DATAGRAM from the server.
3421        ///
3422        /// On success it returns the DATAGRAM length, flow ID and flow ID
3423        /// length.
3424        pub fn recv_dgram_client(
3425            &mut self, buf: &mut [u8],
3426        ) -> Result<(usize, u64, usize)> {
3427            let len = self.pipe.client.dgram_recv(buf)?;
3428            let mut b = octets::Octets::with_slice(buf);
3429            let flow_id = b.get_varint()?;
3430
3431            Ok((len, flow_id, b.off()))
3432        }
3433
3434        /// Send an HTTP/3 DATAGRAM with default data from the server
3435        ///
3436        /// On success it returns the data.
3437        pub fn send_dgram_server(&mut self, flow_id: u64) -> Result<Vec<u8>> {
3438            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3439            let len = octets::varint_len(flow_id) + bytes.len();
3440            let mut d = vec![0; len];
3441            let mut b = octets::OctetsMut::with_slice(&mut d);
3442
3443            b.put_varint(flow_id)?;
3444            b.put_bytes(&bytes)?;
3445
3446            self.pipe.server.dgram_send(&d)?;
3447
3448            self.advance().ok();
3449
3450            Ok(bytes)
3451        }
3452
3453        /// Receives an HTTP/3 DATAGRAM from the client.
3454        ///
3455        /// On success it returns the DATAGRAM length, flow ID and flow ID
3456        /// length.
3457        pub fn recv_dgram_server(
3458            &mut self, buf: &mut [u8],
3459        ) -> Result<(usize, u64, usize)> {
3460            let len = self.pipe.server.dgram_recv(buf)?;
3461            let mut b = octets::Octets::with_slice(buf);
3462            let flow_id = b.get_varint()?;
3463
3464            Ok((len, flow_id, b.off()))
3465        }
3466
3467        /// Sends a single HTTP/3 frame from the server.
3468        pub fn send_frame_server(
3469            &mut self, frame: frame::Frame, stream_id: u64, fin: bool,
3470        ) -> Result<()> {
3471            let mut d = [42; 65535];
3472
3473            let mut b = octets::OctetsMut::with_slice(&mut d);
3474
3475            frame.to_bytes(&mut b)?;
3476
3477            let off = b.off();
3478            self.pipe.server.stream_send(stream_id, &d[..off], fin)?;
3479
3480            self.advance().ok();
3481
3482            Ok(())
3483        }
3484
3485        /// Sends an arbitrary buffer of HTTP/3 stream data from the client.
3486        pub fn send_arbitrary_stream_data_client(
3487            &mut self, data: &[u8], stream_id: u64, fin: bool,
3488        ) -> Result<()> {
3489            self.pipe.client.stream_send(stream_id, data, fin)?;
3490
3491            self.advance().ok();
3492
3493            Ok(())
3494        }
3495
3496        /// Sends an arbitrary buffer of HTTP/3 stream data from the server.
3497        pub fn send_arbitrary_stream_data_server(
3498            &mut self, data: &[u8], stream_id: u64, fin: bool,
3499        ) -> Result<()> {
3500            self.pipe.server.stream_send(stream_id, data, fin)?;
3501
3502            self.advance().ok();
3503
3504            Ok(())
3505        }
3506    }
3507}
3508
3509#[cfg(test)]
3510mod tests {
3511    use super::*;
3512
3513    use super::testing::*;
3514
3515    #[test]
3516    /// Make sure that random GREASE values is within the specified limit.
3517    fn grease_value_in_varint_limit() {
3518        assert!(grease_value() < 2u64.pow(62) - 1);
3519    }
3520
3521    #[cfg(not(feature = "openssl"))] // 0-RTT not supported when using openssl/quictls
3522    #[test]
3523    fn h3_handshake_0rtt() {
3524        let mut buf = [0; 65535];
3525
3526        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
3527        config
3528            .load_cert_chain_from_pem_file("examples/cert.crt")
3529            .unwrap();
3530        config
3531            .load_priv_key_from_pem_file("examples/cert.key")
3532            .unwrap();
3533        config
3534            .set_application_protos(&[b"proto1", b"proto2"])
3535            .unwrap();
3536        config.set_initial_max_data(30);
3537        config.set_initial_max_stream_data_bidi_local(15);
3538        config.set_initial_max_stream_data_bidi_remote(15);
3539        config.set_initial_max_stream_data_uni(15);
3540        config.set_initial_max_streams_bidi(3);
3541        config.set_initial_max_streams_uni(3);
3542        config.enable_early_data();
3543        config.verify_peer(false);
3544
3545        let h3_config = Config::new().unwrap();
3546
3547        // Perform initial handshake.
3548        let mut pipe = crate::test_utils::Pipe::with_config(&mut config).unwrap();
3549        assert_eq!(pipe.handshake(), Ok(()));
3550
3551        // Extract session,
3552        let session = pipe.client.session().unwrap();
3553
3554        // Configure session on new connection.
3555        let mut pipe = crate::test_utils::Pipe::with_config(&mut config).unwrap();
3556        assert_eq!(pipe.client.set_session(session), Ok(()));
3557
3558        // Can't create an H3 connection until the QUIC connection is determined
3559        // to have made sufficient early data progress.
3560        assert!(matches!(
3561            Connection::with_transport(&mut pipe.client, &h3_config),
3562            Err(Error::InternalError)
3563        ));
3564
3565        // Client sends initial flight.
3566        let (len, _) = pipe.client.send(&mut buf).unwrap();
3567
3568        // Now an H3 connection can be created.
3569        assert!(Connection::with_transport(&mut pipe.client, &h3_config).is_ok());
3570        assert_eq!(pipe.server_recv(&mut buf[..len]), Ok(len));
3571
3572        // Client sends 0-RTT packet.
3573        let pkt_type = crate::packet::Type::ZeroRTT;
3574
3575        let frames = [crate::frame::Frame::Stream {
3576            stream_id: 6,
3577            data: <crate::range_buf::RangeBuf>::from(b"aaaaa", 0, true),
3578        }];
3579
3580        assert_eq!(
3581            pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
3582            Ok(1200)
3583        );
3584
3585        assert_eq!(pipe.server.undecryptable_pkts.len(), 0);
3586
3587        // 0-RTT stream data is readable.
3588        let mut r = pipe.server.readable();
3589        assert_eq!(r.next(), Some(6));
3590        assert_eq!(r.next(), None);
3591
3592        let mut b = [0; 15];
3593        assert_eq!(pipe.server.stream_recv(6, &mut b), Ok((5, true)));
3594        assert_eq!(&b[..5], b"aaaaa");
3595    }
3596
3597    #[test]
3598    /// Send a request with no body, get a response with no body.
3599    fn request_no_body_response_no_body() {
3600        let mut s = Session::new().unwrap();
3601        s.handshake().unwrap();
3602
3603        let (stream, req) = s.send_request(true).unwrap();
3604
3605        assert_eq!(stream, 0);
3606
3607        let ev_headers = Event::Headers {
3608            list: req,
3609            more_frames: false,
3610        };
3611
3612        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3613        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3614
3615        let resp = s.send_response(stream, true).unwrap();
3616
3617        let ev_headers = Event::Headers {
3618            list: resp,
3619            more_frames: false,
3620        };
3621
3622        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3623        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3624        assert_eq!(s.poll_client(), Err(Error::Done));
3625    }
3626
3627    #[test]
3628    /// Send a request with no body, get a response with one DATA frame.
3629    fn request_no_body_response_one_chunk() {
3630        let mut s = Session::new().unwrap();
3631        s.handshake().unwrap();
3632
3633        let (stream, req) = s.send_request(true).unwrap();
3634        assert_eq!(stream, 0);
3635
3636        let ev_headers = Event::Headers {
3637            list: req,
3638            more_frames: false,
3639        };
3640
3641        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3642
3643        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3644
3645        let resp = s.send_response(stream, false).unwrap();
3646
3647        let body = s.send_body_server(stream, true).unwrap();
3648
3649        let mut recv_buf = vec![0; body.len()];
3650
3651        let ev_headers = Event::Headers {
3652            list: resp,
3653            more_frames: true,
3654        };
3655
3656        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3657
3658        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
3659        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
3660
3661        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3662        assert_eq!(s.poll_client(), Err(Error::Done));
3663    }
3664
3665    #[test]
3666    /// Send a request with no body, get a response with multiple DATA frames.
3667    fn request_no_body_response_many_chunks() {
3668        let mut s = Session::new().unwrap();
3669        s.handshake().unwrap();
3670
3671        let (stream, req) = s.send_request(true).unwrap();
3672
3673        let ev_headers = Event::Headers {
3674            list: req,
3675            more_frames: false,
3676        };
3677
3678        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3679        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3680
3681        let total_data_frames = 4;
3682
3683        let resp = s.send_response(stream, false).unwrap();
3684
3685        for _ in 0..total_data_frames - 1 {
3686            s.send_body_server(stream, false).unwrap();
3687        }
3688
3689        let body = s.send_body_server(stream, true).unwrap();
3690
3691        let mut recv_buf = vec![0; body.len()];
3692
3693        let ev_headers = Event::Headers {
3694            list: resp,
3695            more_frames: true,
3696        };
3697
3698        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3699        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
3700        assert_eq!(s.poll_client(), Err(Error::Done));
3701
3702        for _ in 0..total_data_frames {
3703            assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
3704        }
3705
3706        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3707        assert_eq!(s.poll_client(), Err(Error::Done));
3708    }
3709
3710    #[test]
3711    /// Send a request with one DATA frame, get a response with no body.
3712    fn request_one_chunk_response_no_body() {
3713        let mut s = Session::new().unwrap();
3714        s.handshake().unwrap();
3715
3716        let (stream, req) = s.send_request(false).unwrap();
3717
3718        let body = s.send_body_client(stream, true).unwrap();
3719
3720        let mut recv_buf = vec![0; body.len()];
3721
3722        let ev_headers = Event::Headers {
3723            list: req,
3724            more_frames: true,
3725        };
3726
3727        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3728
3729        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
3730        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
3731
3732        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3733
3734        let resp = s.send_response(stream, true).unwrap();
3735
3736        let ev_headers = Event::Headers {
3737            list: resp,
3738            more_frames: false,
3739        };
3740
3741        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3742        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3743    }
3744
3745    #[test]
3746    /// Send a request with multiple DATA frames, get a response with no body.
3747    fn request_many_chunks_response_no_body() {
3748        let mut s = Session::new().unwrap();
3749        s.handshake().unwrap();
3750
3751        let (stream, req) = s.send_request(false).unwrap();
3752
3753        let total_data_frames = 4;
3754
3755        for _ in 0..total_data_frames - 1 {
3756            s.send_body_client(stream, false).unwrap();
3757        }
3758
3759        let body = s.send_body_client(stream, true).unwrap();
3760
3761        let mut recv_buf = vec![0; body.len()];
3762
3763        let ev_headers = Event::Headers {
3764            list: req,
3765            more_frames: true,
3766        };
3767
3768        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3769        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
3770        assert_eq!(s.poll_server(), Err(Error::Done));
3771
3772        for _ in 0..total_data_frames {
3773            assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
3774        }
3775
3776        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3777
3778        let resp = s.send_response(stream, true).unwrap();
3779
3780        let ev_headers = Event::Headers {
3781            list: resp,
3782            more_frames: false,
3783        };
3784
3785        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3786        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3787    }
3788
3789    #[test]
3790    /// Send a request with multiple DATA frames, get a response with one DATA
3791    /// frame.
3792    fn many_requests_many_chunks_response_one_chunk() {
3793        let mut s = Session::new().unwrap();
3794        s.handshake().unwrap();
3795
3796        let mut reqs = Vec::new();
3797
3798        let (stream1, req1) = s.send_request(false).unwrap();
3799        assert_eq!(stream1, 0);
3800        reqs.push(req1);
3801
3802        let (stream2, req2) = s.send_request(false).unwrap();
3803        assert_eq!(stream2, 4);
3804        reqs.push(req2);
3805
3806        let (stream3, req3) = s.send_request(false).unwrap();
3807        assert_eq!(stream3, 8);
3808        reqs.push(req3);
3809
3810        let body = s.send_body_client(stream1, false).unwrap();
3811        s.send_body_client(stream2, false).unwrap();
3812        s.send_body_client(stream3, false).unwrap();
3813
3814        let mut recv_buf = vec![0; body.len()];
3815
3816        // Reverse order of writes.
3817
3818        s.send_body_client(stream3, true).unwrap();
3819        s.send_body_client(stream2, true).unwrap();
3820        s.send_body_client(stream1, true).unwrap();
3821
3822        let (_, ev) = s.poll_server().unwrap();
3823        let ev_headers = Event::Headers {
3824            list: reqs[0].clone(),
3825            more_frames: true,
3826        };
3827        assert_eq!(ev, ev_headers);
3828
3829        let (_, ev) = s.poll_server().unwrap();
3830        let ev_headers = Event::Headers {
3831            list: reqs[1].clone(),
3832            more_frames: true,
3833        };
3834        assert_eq!(ev, ev_headers);
3835
3836        let (_, ev) = s.poll_server().unwrap();
3837        let ev_headers = Event::Headers {
3838            list: reqs[2].clone(),
3839            more_frames: true,
3840        };
3841        assert_eq!(ev, ev_headers);
3842
3843        assert_eq!(s.poll_server(), Ok((0, Event::Data)));
3844        assert_eq!(s.recv_body_server(0, &mut recv_buf), Ok(body.len()));
3845        assert_eq!(s.poll_client(), Err(Error::Done));
3846        assert_eq!(s.recv_body_server(0, &mut recv_buf), Ok(body.len()));
3847        assert_eq!(s.poll_server(), Ok((0, Event::Finished)));
3848
3849        assert_eq!(s.poll_server(), Ok((4, Event::Data)));
3850        assert_eq!(s.recv_body_server(4, &mut recv_buf), Ok(body.len()));
3851        assert_eq!(s.poll_client(), Err(Error::Done));
3852        assert_eq!(s.recv_body_server(4, &mut recv_buf), Ok(body.len()));
3853        assert_eq!(s.poll_server(), Ok((4, Event::Finished)));
3854
3855        assert_eq!(s.poll_server(), Ok((8, Event::Data)));
3856        assert_eq!(s.recv_body_server(8, &mut recv_buf), Ok(body.len()));
3857        assert_eq!(s.poll_client(), Err(Error::Done));
3858        assert_eq!(s.recv_body_server(8, &mut recv_buf), Ok(body.len()));
3859        assert_eq!(s.poll_server(), Ok((8, Event::Finished)));
3860
3861        assert_eq!(s.poll_server(), Err(Error::Done));
3862
3863        let mut resps = Vec::new();
3864
3865        let resp1 = s.send_response(stream1, true).unwrap();
3866        resps.push(resp1);
3867
3868        let resp2 = s.send_response(stream2, true).unwrap();
3869        resps.push(resp2);
3870
3871        let resp3 = s.send_response(stream3, true).unwrap();
3872        resps.push(resp3);
3873
3874        for _ in 0..resps.len() {
3875            let (stream, ev) = s.poll_client().unwrap();
3876            let ev_headers = Event::Headers {
3877                list: resps[(stream / 4) as usize].clone(),
3878                more_frames: false,
3879            };
3880            assert_eq!(ev, ev_headers);
3881            assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3882        }
3883
3884        assert_eq!(s.poll_client(), Err(Error::Done));
3885    }
3886
3887    #[test]
3888    /// Send a request with no body, get a response with one DATA frame and an
3889    /// empty FIN after reception from the client.
3890    fn request_no_body_response_one_chunk_empty_fin() {
3891        let mut s = Session::new().unwrap();
3892        s.handshake().unwrap();
3893
3894        let (stream, req) = s.send_request(true).unwrap();
3895
3896        let ev_headers = Event::Headers {
3897            list: req,
3898            more_frames: false,
3899        };
3900
3901        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3902        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3903
3904        let resp = s.send_response(stream, false).unwrap();
3905
3906        let body = s.send_body_server(stream, false).unwrap();
3907
3908        let mut recv_buf = vec![0; body.len()];
3909
3910        let ev_headers = Event::Headers {
3911            list: resp,
3912            more_frames: true,
3913        };
3914
3915        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3916
3917        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
3918        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
3919
3920        assert_eq!(s.pipe.server.stream_send(stream, &[], true), Ok(0));
3921        s.advance().ok();
3922
3923        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3924        assert_eq!(s.poll_client(), Err(Error::Done));
3925    }
3926
3927    #[test]
3928    /// Send a request with no body, get a response with no body followed by
3929    /// GREASE that is STREAM frame with a FIN.
3930    fn request_no_body_response_no_body_with_grease() {
3931        let mut s = Session::new().unwrap();
3932        s.handshake().unwrap();
3933
3934        let (stream, req) = s.send_request(true).unwrap();
3935
3936        assert_eq!(stream, 0);
3937
3938        let ev_headers = Event::Headers {
3939            list: req,
3940            more_frames: false,
3941        };
3942
3943        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3944        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3945
3946        let resp = s.send_response(stream, false).unwrap();
3947
3948        let ev_headers = Event::Headers {
3949            list: resp,
3950            more_frames: true,
3951        };
3952
3953        // Inject a GREASE frame
3954        let mut d = [42; 10];
3955        let mut b = octets::OctetsMut::with_slice(&mut d);
3956
3957        let frame_type = b.put_varint(148_764_065_110_560_899).unwrap();
3958        s.pipe.server.stream_send(0, frame_type, false).unwrap();
3959
3960        let frame_len = b.put_varint(10).unwrap();
3961        s.pipe.server.stream_send(0, frame_len, false).unwrap();
3962
3963        s.pipe.server.stream_send(0, &d, true).unwrap();
3964
3965        s.advance().ok();
3966
3967        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3968        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3969        assert_eq!(s.poll_client(), Err(Error::Done));
3970    }
3971
3972    #[test]
3973    /// Try to send DATA frames before HEADERS.
3974    fn body_response_before_headers() {
3975        let mut s = Session::new().unwrap();
3976        s.handshake().unwrap();
3977
3978        let (stream, req) = s.send_request(true).unwrap();
3979        assert_eq!(stream, 0);
3980
3981        let ev_headers = Event::Headers {
3982            list: req,
3983            more_frames: false,
3984        };
3985
3986        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3987
3988        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3989
3990        assert_eq!(
3991            s.send_body_server(stream, true),
3992            Err(Error::FrameUnexpected)
3993        );
3994
3995        assert_eq!(s.poll_client(), Err(Error::Done));
3996    }
3997
3998    #[test]
3999    /// Try to send DATA frames on wrong streams, ensure the API returns an
4000    /// error before anything hits the transport layer.
4001    fn send_body_invalid_client_stream() {
4002        let mut s = Session::new().unwrap();
4003        s.handshake().unwrap();
4004
4005        assert_eq!(s.send_body_client(0, true), Err(Error::FrameUnexpected));
4006
4007        assert_eq!(
4008            s.send_body_client(s.client.control_stream_id.unwrap(), true),
4009            Err(Error::FrameUnexpected)
4010        );
4011
4012        assert_eq!(
4013            s.send_body_client(
4014                s.client.local_qpack_streams.encoder_stream_id.unwrap(),
4015                true
4016            ),
4017            Err(Error::FrameUnexpected)
4018        );
4019
4020        assert_eq!(
4021            s.send_body_client(
4022                s.client.local_qpack_streams.decoder_stream_id.unwrap(),
4023                true
4024            ),
4025            Err(Error::FrameUnexpected)
4026        );
4027
4028        assert_eq!(
4029            s.send_body_client(s.client.peer_control_stream_id.unwrap(), true),
4030            Err(Error::FrameUnexpected)
4031        );
4032
4033        assert_eq!(
4034            s.send_body_client(
4035                s.client.peer_qpack_streams.encoder_stream_id.unwrap(),
4036                true
4037            ),
4038            Err(Error::FrameUnexpected)
4039        );
4040
4041        assert_eq!(
4042            s.send_body_client(
4043                s.client.peer_qpack_streams.decoder_stream_id.unwrap(),
4044                true
4045            ),
4046            Err(Error::FrameUnexpected)
4047        );
4048    }
4049
4050    #[test]
4051    /// Try to send DATA frames on wrong streams, ensure the API returns an
4052    /// error before anything hits the transport layer.
4053    fn send_body_invalid_server_stream() {
4054        let mut s = Session::new().unwrap();
4055        s.handshake().unwrap();
4056
4057        assert_eq!(s.send_body_server(0, true), Err(Error::FrameUnexpected));
4058
4059        assert_eq!(
4060            s.send_body_server(s.server.control_stream_id.unwrap(), true),
4061            Err(Error::FrameUnexpected)
4062        );
4063
4064        assert_eq!(
4065            s.send_body_server(
4066                s.server.local_qpack_streams.encoder_stream_id.unwrap(),
4067                true
4068            ),
4069            Err(Error::FrameUnexpected)
4070        );
4071
4072        assert_eq!(
4073            s.send_body_server(
4074                s.server.local_qpack_streams.decoder_stream_id.unwrap(),
4075                true
4076            ),
4077            Err(Error::FrameUnexpected)
4078        );
4079
4080        assert_eq!(
4081            s.send_body_server(s.server.peer_control_stream_id.unwrap(), true),
4082            Err(Error::FrameUnexpected)
4083        );
4084
4085        assert_eq!(
4086            s.send_body_server(
4087                s.server.peer_qpack_streams.encoder_stream_id.unwrap(),
4088                true
4089            ),
4090            Err(Error::FrameUnexpected)
4091        );
4092
4093        assert_eq!(
4094            s.send_body_server(
4095                s.server.peer_qpack_streams.decoder_stream_id.unwrap(),
4096                true
4097            ),
4098            Err(Error::FrameUnexpected)
4099        );
4100    }
4101
4102    #[test]
4103    /// Client sends request with body and trailers.
4104    fn trailers() {
4105        let mut s = Session::new().unwrap();
4106        s.handshake().unwrap();
4107
4108        let (stream, req) = s.send_request(false).unwrap();
4109
4110        let body = s.send_body_client(stream, false).unwrap();
4111
4112        let mut recv_buf = vec![0; body.len()];
4113
4114        let req_trailers = vec![Header::new(b"foo", b"bar")];
4115
4116        s.client
4117            .send_additional_headers(
4118                &mut s.pipe.client,
4119                stream,
4120                &req_trailers,
4121                true,
4122                true,
4123            )
4124            .unwrap();
4125
4126        s.advance().ok();
4127
4128        let ev_headers = Event::Headers {
4129            list: req,
4130            more_frames: true,
4131        };
4132
4133        let ev_trailers = Event::Headers {
4134            list: req_trailers,
4135            more_frames: false,
4136        };
4137
4138        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4139
4140        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
4141        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
4142
4143        assert_eq!(s.poll_server(), Ok((stream, ev_trailers)));
4144        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4145    }
4146
4147    #[test]
4148    /// Server responds with a 103, then a 200 with no body.
4149    fn informational_response() {
4150        let mut s = Session::new().unwrap();
4151        s.handshake().unwrap();
4152
4153        let (stream, req) = s.send_request(true).unwrap();
4154
4155        assert_eq!(stream, 0);
4156
4157        let ev_headers = Event::Headers {
4158            list: req,
4159            more_frames: false,
4160        };
4161
4162        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4163        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4164
4165        let info_resp = vec![
4166            Header::new(b":status", b"103"),
4167            Header::new(b"link", b"<https://example.com>; rel=\"preconnect\""),
4168        ];
4169
4170        let resp = vec![
4171            Header::new(b":status", b"200"),
4172            Header::new(b"server", b"quiche-test"),
4173        ];
4174
4175        s.server
4176            .send_response(&mut s.pipe.server, stream, &info_resp, false)
4177            .unwrap();
4178
4179        s.server
4180            .send_additional_headers(
4181                &mut s.pipe.server,
4182                stream,
4183                &resp,
4184                false,
4185                true,
4186            )
4187            .unwrap();
4188
4189        s.advance().ok();
4190
4191        let ev_info_headers = Event::Headers {
4192            list: info_resp,
4193            more_frames: true,
4194        };
4195
4196        let ev_headers = Event::Headers {
4197            list: resp,
4198            more_frames: false,
4199        };
4200
4201        assert_eq!(s.poll_client(), Ok((stream, ev_info_headers)));
4202        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
4203        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
4204        assert_eq!(s.poll_client(), Err(Error::Done));
4205    }
4206
4207    #[test]
4208    /// Server responds with a 103, then attempts to send a 200 using
4209    /// send_response again, which should fail.
4210    fn no_multiple_response() {
4211        let mut s = Session::new().unwrap();
4212        s.handshake().unwrap();
4213
4214        let (stream, req) = s.send_request(true).unwrap();
4215
4216        assert_eq!(stream, 0);
4217
4218        let ev_headers = Event::Headers {
4219            list: req,
4220            more_frames: false,
4221        };
4222
4223        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4224        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4225
4226        let info_resp = vec![
4227            Header::new(b":status", b"103"),
4228            Header::new(b"link", b"<https://example.com>; rel=\"preconnect\""),
4229        ];
4230
4231        let resp = vec![
4232            Header::new(b":status", b"200"),
4233            Header::new(b"server", b"quiche-test"),
4234        ];
4235
4236        s.server
4237            .send_response(&mut s.pipe.server, stream, &info_resp, false)
4238            .unwrap();
4239
4240        assert_eq!(
4241            Err(Error::FrameUnexpected),
4242            s.server
4243                .send_response(&mut s.pipe.server, stream, &resp, true)
4244        );
4245
4246        s.advance().ok();
4247
4248        let ev_info_headers = Event::Headers {
4249            list: info_resp,
4250            more_frames: true,
4251        };
4252
4253        assert_eq!(s.poll_client(), Ok((stream, ev_info_headers)));
4254        assert_eq!(s.poll_client(), Err(Error::Done));
4255    }
4256
4257    #[test]
4258    /// Server attempts to use send_additional_headers before initial response.
4259    fn no_send_additional_before_initial_response() {
4260        let mut s = Session::new().unwrap();
4261        s.handshake().unwrap();
4262
4263        let (stream, req) = s.send_request(true).unwrap();
4264
4265        assert_eq!(stream, 0);
4266
4267        let ev_headers = Event::Headers {
4268            list: req,
4269            more_frames: false,
4270        };
4271
4272        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4273        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4274
4275        let info_resp = vec![
4276            Header::new(b":status", b"103"),
4277            Header::new(b"link", b"<https://example.com>; rel=\"preconnect\""),
4278        ];
4279
4280        assert_eq!(
4281            Err(Error::FrameUnexpected),
4282            s.server.send_additional_headers(
4283                &mut s.pipe.server,
4284                stream,
4285                &info_resp,
4286                false,
4287                false
4288            )
4289        );
4290
4291        s.advance().ok();
4292
4293        assert_eq!(s.poll_client(), Err(Error::Done));
4294    }
4295
4296    #[test]
4297    /// Client sends multiple HEADERS before data.
4298    fn additional_headers_before_data_client() {
4299        let mut s = Session::new().unwrap();
4300        s.handshake().unwrap();
4301
4302        let (stream, req) = s.send_request(false).unwrap();
4303
4304        let req_trailer = vec![Header::new(b"goodbye", b"world")];
4305
4306        assert_eq!(
4307            s.client.send_additional_headers(
4308                &mut s.pipe.client,
4309                stream,
4310                &req_trailer,
4311                true,
4312                false
4313            ),
4314            Ok(())
4315        );
4316
4317        s.advance().ok();
4318
4319        let ev_initial_headers = Event::Headers {
4320            list: req,
4321            more_frames: true,
4322        };
4323
4324        let ev_trailing_headers = Event::Headers {
4325            list: req_trailer,
4326            more_frames: true,
4327        };
4328
4329        assert_eq!(s.poll_server(), Ok((stream, ev_initial_headers)));
4330        assert_eq!(s.poll_server(), Ok((stream, ev_trailing_headers)));
4331        assert_eq!(s.poll_server(), Err(Error::Done));
4332    }
4333
4334    #[test]
4335    /// Client sends multiple HEADERS before data.
4336    fn data_after_trailers_client() {
4337        let mut s = Session::new().unwrap();
4338        s.handshake().unwrap();
4339
4340        let (stream, req) = s.send_request(false).unwrap();
4341
4342        let body = s.send_body_client(stream, false).unwrap();
4343
4344        let mut recv_buf = vec![0; body.len()];
4345
4346        let req_trailers = vec![Header::new(b"foo", b"bar")];
4347
4348        s.client
4349            .send_additional_headers(
4350                &mut s.pipe.client,
4351                stream,
4352                &req_trailers,
4353                true,
4354                false,
4355            )
4356            .unwrap();
4357
4358        s.advance().ok();
4359
4360        s.send_frame_client(
4361            frame::Frame::Data {
4362                payload: vec![1, 2, 3, 4],
4363            },
4364            stream,
4365            true,
4366        )
4367        .unwrap();
4368
4369        let ev_headers = Event::Headers {
4370            list: req,
4371            more_frames: true,
4372        };
4373
4374        let ev_trailers = Event::Headers {
4375            list: req_trailers,
4376            more_frames: true,
4377        };
4378
4379        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4380        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
4381        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
4382        assert_eq!(s.poll_server(), Ok((stream, ev_trailers)));
4383        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4384    }
4385
4386    #[test]
4387    /// Send a MAX_PUSH_ID frame from the client on a valid stream.
4388    fn max_push_id_from_client_good() {
4389        let mut s = Session::new().unwrap();
4390        s.handshake().unwrap();
4391
4392        s.send_frame_client(
4393            frame::Frame::MaxPushId { push_id: 1 },
4394            s.client.control_stream_id.unwrap(),
4395            false,
4396        )
4397        .unwrap();
4398
4399        assert_eq!(s.poll_server(), Err(Error::Done));
4400    }
4401
4402    #[test]
4403    /// Send a MAX_PUSH_ID frame from the client on an invalid stream.
4404    fn max_push_id_from_client_bad_stream() {
4405        let mut s = Session::new().unwrap();
4406        s.handshake().unwrap();
4407
4408        let (stream, req) = s.send_request(false).unwrap();
4409
4410        s.send_frame_client(
4411            frame::Frame::MaxPushId { push_id: 2 },
4412            stream,
4413            false,
4414        )
4415        .unwrap();
4416
4417        let ev_headers = Event::Headers {
4418            list: req,
4419            more_frames: true,
4420        };
4421
4422        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4423        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4424    }
4425
4426    #[test]
4427    /// Send a sequence of MAX_PUSH_ID frames from the client that attempt to
4428    /// reduce the limit.
4429    fn max_push_id_from_client_limit_reduction() {
4430        let mut s = Session::new().unwrap();
4431        s.handshake().unwrap();
4432
4433        s.send_frame_client(
4434            frame::Frame::MaxPushId { push_id: 2 },
4435            s.client.control_stream_id.unwrap(),
4436            false,
4437        )
4438        .unwrap();
4439
4440        s.send_frame_client(
4441            frame::Frame::MaxPushId { push_id: 1 },
4442            s.client.control_stream_id.unwrap(),
4443            false,
4444        )
4445        .unwrap();
4446
4447        assert_eq!(s.poll_server(), Err(Error::IdError));
4448    }
4449
4450    #[test]
4451    /// Send a MAX_PUSH_ID frame from the server, which is forbidden.
4452    fn max_push_id_from_server() {
4453        let mut s = Session::new().unwrap();
4454        s.handshake().unwrap();
4455
4456        s.send_frame_server(
4457            frame::Frame::MaxPushId { push_id: 1 },
4458            s.server.control_stream_id.unwrap(),
4459            false,
4460        )
4461        .unwrap();
4462
4463        assert_eq!(s.poll_client(), Err(Error::FrameUnexpected));
4464    }
4465
4466    #[test]
4467    /// Send a PUSH_PROMISE frame from the client, which is forbidden.
4468    fn push_promise_from_client() {
4469        let mut s = Session::new().unwrap();
4470        s.handshake().unwrap();
4471
4472        let (stream, req) = s.send_request(false).unwrap();
4473
4474        let header_block = s.client.encode_header_block(&req).unwrap();
4475
4476        s.send_frame_client(
4477            frame::Frame::PushPromise {
4478                push_id: 1,
4479                header_block,
4480            },
4481            stream,
4482            false,
4483        )
4484        .unwrap();
4485
4486        let ev_headers = Event::Headers {
4487            list: req,
4488            more_frames: true,
4489        };
4490
4491        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4492        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4493    }
4494
4495    #[test]
4496    /// Send a CANCEL_PUSH frame from the client.
4497    fn cancel_push_from_client() {
4498        let mut s = Session::new().unwrap();
4499        s.handshake().unwrap();
4500
4501        s.send_frame_client(
4502            frame::Frame::CancelPush { push_id: 1 },
4503            s.client.control_stream_id.unwrap(),
4504            false,
4505        )
4506        .unwrap();
4507
4508        assert_eq!(s.poll_server(), Err(Error::Done));
4509    }
4510
4511    #[test]
4512    /// Send a CANCEL_PUSH frame from the client on an invalid stream.
4513    fn cancel_push_from_client_bad_stream() {
4514        let mut s = Session::new().unwrap();
4515        s.handshake().unwrap();
4516
4517        let (stream, req) = s.send_request(false).unwrap();
4518
4519        s.send_frame_client(
4520            frame::Frame::CancelPush { push_id: 2 },
4521            stream,
4522            false,
4523        )
4524        .unwrap();
4525
4526        let ev_headers = Event::Headers {
4527            list: req,
4528            more_frames: true,
4529        };
4530
4531        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4532        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4533    }
4534
4535    #[test]
4536    /// Send a CANCEL_PUSH frame from the client.
4537    fn cancel_push_from_server() {
4538        let mut s = Session::new().unwrap();
4539        s.handshake().unwrap();
4540
4541        s.send_frame_server(
4542            frame::Frame::CancelPush { push_id: 1 },
4543            s.server.control_stream_id.unwrap(),
4544            false,
4545        )
4546        .unwrap();
4547
4548        assert_eq!(s.poll_client(), Err(Error::Done));
4549    }
4550
4551    #[test]
4552    /// Send a GOAWAY frame from the client.
4553    fn goaway_from_client_good() {
4554        let mut s = Session::new().unwrap();
4555        s.handshake().unwrap();
4556
4557        s.client.send_goaway(&mut s.pipe.client, 100).unwrap();
4558
4559        s.advance().ok();
4560
4561        // TODO: server push
4562        assert_eq!(s.poll_server(), Ok((0, Event::GoAway)));
4563    }
4564
4565    #[test]
4566    /// Send a GOAWAY frame from the server.
4567    fn goaway_from_server_good() {
4568        let mut s = Session::new().unwrap();
4569        s.handshake().unwrap();
4570
4571        s.server.send_goaway(&mut s.pipe.server, 4000).unwrap();
4572
4573        s.advance().ok();
4574
4575        assert_eq!(s.poll_client(), Ok((4000, Event::GoAway)));
4576    }
4577
4578    #[test]
4579    /// A client MUST NOT send a request after it receives GOAWAY.
4580    fn client_request_after_goaway() {
4581        let mut s = Session::new().unwrap();
4582        s.handshake().unwrap();
4583
4584        s.server.send_goaway(&mut s.pipe.server, 4000).unwrap();
4585
4586        s.advance().ok();
4587
4588        assert_eq!(s.poll_client(), Ok((4000, Event::GoAway)));
4589
4590        assert_eq!(s.send_request(true), Err(Error::FrameUnexpected));
4591    }
4592
4593    #[test]
4594    /// Send a GOAWAY frame from the server, using an invalid goaway ID.
4595    fn goaway_from_server_invalid_id() {
4596        let mut s = Session::new().unwrap();
4597        s.handshake().unwrap();
4598
4599        s.send_frame_server(
4600            frame::Frame::GoAway { id: 1 },
4601            s.server.control_stream_id.unwrap(),
4602            false,
4603        )
4604        .unwrap();
4605
4606        assert_eq!(s.poll_client(), Err(Error::IdError));
4607    }
4608
4609    #[test]
4610    /// Send multiple GOAWAY frames from the server, that increase the goaway
4611    /// ID.
4612    fn goaway_from_server_increase_id() {
4613        let mut s = Session::new().unwrap();
4614        s.handshake().unwrap();
4615
4616        s.send_frame_server(
4617            frame::Frame::GoAway { id: 0 },
4618            s.server.control_stream_id.unwrap(),
4619            false,
4620        )
4621        .unwrap();
4622
4623        s.send_frame_server(
4624            frame::Frame::GoAway { id: 4 },
4625            s.server.control_stream_id.unwrap(),
4626            false,
4627        )
4628        .unwrap();
4629
4630        assert_eq!(s.poll_client(), Ok((0, Event::GoAway)));
4631
4632        assert_eq!(s.poll_client(), Err(Error::IdError));
4633    }
4634
4635    #[test]
4636    #[cfg(feature = "sfv")]
4637    fn parse_priority_field_value() {
4638        // Legal dicts
4639        assert_eq!(
4640            Ok(Priority::new(0, false)),
4641            Priority::try_from(b"u=0".as_slice())
4642        );
4643        assert_eq!(
4644            Ok(Priority::new(3, false)),
4645            Priority::try_from(b"u=3".as_slice())
4646        );
4647        assert_eq!(
4648            Ok(Priority::new(7, false)),
4649            Priority::try_from(b"u=7".as_slice())
4650        );
4651
4652        assert_eq!(
4653            Ok(Priority::new(0, true)),
4654            Priority::try_from(b"u=0, i".as_slice())
4655        );
4656        assert_eq!(
4657            Ok(Priority::new(3, true)),
4658            Priority::try_from(b"u=3, i".as_slice())
4659        );
4660        assert_eq!(
4661            Ok(Priority::new(7, true)),
4662            Priority::try_from(b"u=7, i".as_slice())
4663        );
4664
4665        assert_eq!(
4666            Ok(Priority::new(0, true)),
4667            Priority::try_from(b"u=0, i=?1".as_slice())
4668        );
4669        assert_eq!(
4670            Ok(Priority::new(3, true)),
4671            Priority::try_from(b"u=3, i=?1".as_slice())
4672        );
4673        assert_eq!(
4674            Ok(Priority::new(7, true)),
4675            Priority::try_from(b"u=7, i=?1".as_slice())
4676        );
4677
4678        assert_eq!(
4679            Ok(Priority::new(3, false)),
4680            Priority::try_from(b"".as_slice())
4681        );
4682
4683        assert_eq!(
4684            Ok(Priority::new(0, true)),
4685            Priority::try_from(b"u=0;foo, i;bar".as_slice())
4686        );
4687        assert_eq!(
4688            Ok(Priority::new(3, true)),
4689            Priority::try_from(b"u=3;hello, i;world".as_slice())
4690        );
4691        assert_eq!(
4692            Ok(Priority::new(7, true)),
4693            Priority::try_from(b"u=7;croeso, i;gymru".as_slice())
4694        );
4695
4696        assert_eq!(
4697            Ok(Priority::new(0, true)),
4698            Priority::try_from(b"u=0, i, spinaltap=11".as_slice())
4699        );
4700
4701        // Illegal formats
4702        assert_eq!(Err(Error::Done), Priority::try_from(b"0".as_slice()));
4703        assert_eq!(
4704            Ok(Priority::new(7, false)),
4705            Priority::try_from(b"u=-1".as_slice())
4706        );
4707        assert_eq!(Err(Error::Done), Priority::try_from(b"u=0.2".as_slice()));
4708        assert_eq!(
4709            Ok(Priority::new(7, false)),
4710            Priority::try_from(b"u=100".as_slice())
4711        );
4712        assert_eq!(
4713            Err(Error::Done),
4714            Priority::try_from(b"u=3, i=true".as_slice())
4715        );
4716
4717        // Trailing comma in dict is malformed
4718        assert_eq!(Err(Error::Done), Priority::try_from(b"u=7, ".as_slice()));
4719    }
4720
4721    #[test]
4722    /// Send a PRIORITY_UPDATE for request stream from the client.
4723    fn priority_update_request() {
4724        let mut s = Session::new().unwrap();
4725        s.handshake().unwrap();
4726
4727        s.client
4728            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4729                urgency: 3,
4730                incremental: false,
4731            })
4732            .unwrap();
4733        s.advance().ok();
4734
4735        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4736        assert_eq!(s.poll_server(), Err(Error::Done));
4737    }
4738
4739    #[test]
4740    /// Send a PRIORITY_UPDATE for request stream from the client.
4741    fn priority_update_single_stream_rearm() {
4742        let mut s = Session::new().unwrap();
4743        s.handshake().unwrap();
4744
4745        s.client
4746            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4747                urgency: 3,
4748                incremental: false,
4749            })
4750            .unwrap();
4751        s.advance().ok();
4752
4753        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4754        assert_eq!(s.poll_server(), Err(Error::Done));
4755
4756        s.client
4757            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4758                urgency: 5,
4759                incremental: false,
4760            })
4761            .unwrap();
4762        s.advance().ok();
4763
4764        assert_eq!(s.poll_server(), Err(Error::Done));
4765
4766        // There is only one PRIORITY_UPDATE frame to read. Once read, the event
4767        // will rearm ready for more.
4768        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=5".to_vec()));
4769        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4770
4771        s.client
4772            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4773                urgency: 7,
4774                incremental: false,
4775            })
4776            .unwrap();
4777        s.advance().ok();
4778
4779        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4780        assert_eq!(s.poll_server(), Err(Error::Done));
4781
4782        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=7".to_vec()));
4783        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4784    }
4785
4786    #[test]
4787    /// Send multiple PRIORITY_UPDATE frames for different streams from the
4788    /// client across multiple flights of exchange.
4789    fn priority_update_request_multiple_stream_arm_multiple_flights() {
4790        let mut s = Session::new().unwrap();
4791        s.handshake().unwrap();
4792
4793        s.client
4794            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4795                urgency: 3,
4796                incremental: false,
4797            })
4798            .unwrap();
4799        s.advance().ok();
4800
4801        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4802        assert_eq!(s.poll_server(), Err(Error::Done));
4803
4804        s.client
4805            .send_priority_update_for_request(&mut s.pipe.client, 4, &Priority {
4806                urgency: 1,
4807                incremental: false,
4808            })
4809            .unwrap();
4810        s.advance().ok();
4811
4812        assert_eq!(s.poll_server(), Ok((4, Event::PriorityUpdate)));
4813        assert_eq!(s.poll_server(), Err(Error::Done));
4814
4815        s.client
4816            .send_priority_update_for_request(&mut s.pipe.client, 8, &Priority {
4817                urgency: 2,
4818                incremental: false,
4819            })
4820            .unwrap();
4821        s.advance().ok();
4822
4823        assert_eq!(s.poll_server(), Ok((8, Event::PriorityUpdate)));
4824        assert_eq!(s.poll_server(), Err(Error::Done));
4825
4826        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
4827        assert_eq!(s.server.take_last_priority_update(4), Ok(b"u=1".to_vec()));
4828        assert_eq!(s.server.take_last_priority_update(8), Ok(b"u=2".to_vec()));
4829        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4830    }
4831
4832    #[test]
4833    /// Send multiple PRIORITY_UPDATE frames for different streams from the
4834    /// client across a single flight.
4835    fn priority_update_request_multiple_stream_arm_single_flight() {
4836        let mut s = Session::new().unwrap();
4837        s.handshake().unwrap();
4838
4839        let mut d = [42; 65535];
4840
4841        let mut b = octets::OctetsMut::with_slice(&mut d);
4842
4843        let p1 = frame::Frame::PriorityUpdateRequest {
4844            prioritized_element_id: 0,
4845            priority_field_value: b"u=3".to_vec(),
4846        };
4847
4848        let p2 = frame::Frame::PriorityUpdateRequest {
4849            prioritized_element_id: 4,
4850            priority_field_value: b"u=3".to_vec(),
4851        };
4852
4853        let p3 = frame::Frame::PriorityUpdateRequest {
4854            prioritized_element_id: 8,
4855            priority_field_value: b"u=3".to_vec(),
4856        };
4857
4858        p1.to_bytes(&mut b).unwrap();
4859        p2.to_bytes(&mut b).unwrap();
4860        p3.to_bytes(&mut b).unwrap();
4861
4862        let off = b.off();
4863        s.pipe
4864            .client
4865            .stream_send(s.client.control_stream_id.unwrap(), &d[..off], false)
4866            .unwrap();
4867
4868        s.advance().ok();
4869
4870        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4871        assert_eq!(s.poll_server(), Ok((4, Event::PriorityUpdate)));
4872        assert_eq!(s.poll_server(), Ok((8, Event::PriorityUpdate)));
4873        assert_eq!(s.poll_server(), Err(Error::Done));
4874
4875        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
4876        assert_eq!(s.server.take_last_priority_update(4), Ok(b"u=3".to_vec()));
4877        assert_eq!(s.server.take_last_priority_update(8), Ok(b"u=3".to_vec()));
4878
4879        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4880    }
4881
4882    #[test]
4883    /// Send a PRIORITY_UPDATE for a request stream, before and after the stream
4884    /// has been completed.
4885    fn priority_update_request_collected_completed() {
4886        let mut s = Session::new().unwrap();
4887        s.handshake().unwrap();
4888
4889        s.client
4890            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4891                urgency: 3,
4892                incremental: false,
4893            })
4894            .unwrap();
4895        s.advance().ok();
4896
4897        let (stream, req) = s.send_request(true).unwrap();
4898        let ev_headers = Event::Headers {
4899            list: req,
4900            more_frames: false,
4901        };
4902
4903        // Priority event is generated before request headers.
4904        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4905        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4906        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4907        assert_eq!(s.poll_server(), Err(Error::Done));
4908
4909        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
4910        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4911
4912        let resp = s.send_response(stream, true).unwrap();
4913
4914        let ev_headers = Event::Headers {
4915            list: resp,
4916            more_frames: false,
4917        };
4918
4919        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
4920        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
4921        assert_eq!(s.poll_client(), Err(Error::Done));
4922
4923        // Now send a PRIORITY_UPDATE for the completed request stream.
4924        s.client
4925            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4926                urgency: 3,
4927                incremental: false,
4928            })
4929            .unwrap();
4930        s.advance().ok();
4931
4932        // No event generated at server
4933        assert_eq!(s.poll_server(), Err(Error::Done));
4934    }
4935
4936    #[test]
4937    /// Send a PRIORITY_UPDATE for a request stream, before and after the stream
4938    /// has been stopped.
4939    fn priority_update_request_collected_stopped() {
4940        let mut s = Session::new().unwrap();
4941        s.handshake().unwrap();
4942
4943        s.client
4944            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4945                urgency: 3,
4946                incremental: false,
4947            })
4948            .unwrap();
4949        s.advance().ok();
4950
4951        let (stream, req) = s.send_request(false).unwrap();
4952        let ev_headers = Event::Headers {
4953            list: req,
4954            more_frames: true,
4955        };
4956
4957        // Priority event is generated before request headers.
4958        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4959        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4960        assert_eq!(s.poll_server(), Err(Error::Done));
4961
4962        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
4963        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4964
4965        s.pipe
4966            .client
4967            .stream_shutdown(stream, crate::Shutdown::Write, 0x100)
4968            .unwrap();
4969        s.pipe
4970            .client
4971            .stream_shutdown(stream, crate::Shutdown::Read, 0x100)
4972            .unwrap();
4973
4974        s.advance().ok();
4975
4976        assert_eq!(s.poll_server(), Ok((0, Event::Reset(0x100))));
4977        assert_eq!(s.poll_server(), Err(Error::Done));
4978
4979        // Now send a PRIORITY_UPDATE for the closed request stream.
4980        s.client
4981            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4982                urgency: 3,
4983                incremental: false,
4984            })
4985            .unwrap();
4986        s.advance().ok();
4987
4988        // No event generated at server
4989        assert_eq!(s.poll_server(), Err(Error::Done));
4990    }
4991
4992    #[test]
4993    /// Send a PRIORITY_UPDATE for push stream from the client.
4994    fn priority_update_push() {
4995        let mut s = Session::new().unwrap();
4996        s.handshake().unwrap();
4997
4998        s.send_frame_client(
4999            frame::Frame::PriorityUpdatePush {
5000                prioritized_element_id: 3,
5001                priority_field_value: b"u=3".to_vec(),
5002            },
5003            s.client.control_stream_id.unwrap(),
5004            false,
5005        )
5006        .unwrap();
5007
5008        assert_eq!(s.poll_server(), Err(Error::Done));
5009    }
5010
5011    #[test]
5012    /// Send a PRIORITY_UPDATE for request stream from the client but for an
5013    /// incorrect stream type.
5014    fn priority_update_request_bad_stream() {
5015        let mut s = Session::new().unwrap();
5016        s.handshake().unwrap();
5017
5018        s.send_frame_client(
5019            frame::Frame::PriorityUpdateRequest {
5020                prioritized_element_id: 5,
5021                priority_field_value: b"u=3".to_vec(),
5022            },
5023            s.client.control_stream_id.unwrap(),
5024            false,
5025        )
5026        .unwrap();
5027
5028        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
5029    }
5030
5031    #[test]
5032    /// Send a PRIORITY_UPDATE for push stream from the client but for an
5033    /// incorrect stream type.
5034    fn priority_update_push_bad_stream() {
5035        let mut s = Session::new().unwrap();
5036        s.handshake().unwrap();
5037
5038        s.send_frame_client(
5039            frame::Frame::PriorityUpdatePush {
5040                prioritized_element_id: 5,
5041                priority_field_value: b"u=3".to_vec(),
5042            },
5043            s.client.control_stream_id.unwrap(),
5044            false,
5045        )
5046        .unwrap();
5047
5048        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
5049    }
5050
5051    #[test]
5052    /// Send a PRIORITY_UPDATE for request stream from the server.
5053    fn priority_update_request_from_server() {
5054        let mut s = Session::new().unwrap();
5055        s.handshake().unwrap();
5056
5057        s.send_frame_server(
5058            frame::Frame::PriorityUpdateRequest {
5059                prioritized_element_id: 0,
5060                priority_field_value: b"u=3".to_vec(),
5061            },
5062            s.server.control_stream_id.unwrap(),
5063            false,
5064        )
5065        .unwrap();
5066
5067        assert_eq!(s.poll_client(), Err(Error::FrameUnexpected));
5068    }
5069
5070    #[test]
5071    /// Send a PRIORITY_UPDATE for request stream from the server.
5072    fn priority_update_push_from_server() {
5073        let mut s = Session::new().unwrap();
5074        s.handshake().unwrap();
5075
5076        s.send_frame_server(
5077            frame::Frame::PriorityUpdatePush {
5078                prioritized_element_id: 0,
5079                priority_field_value: b"u=3".to_vec(),
5080            },
5081            s.server.control_stream_id.unwrap(),
5082            false,
5083        )
5084        .unwrap();
5085
5086        assert_eq!(s.poll_client(), Err(Error::FrameUnexpected));
5087    }
5088
5089    #[test]
5090    /// Ensure quiche allocates streams for client and server roles as expected.
5091    fn uni_stream_local_counting() {
5092        let config = Config::new().unwrap();
5093
5094        let h3_cln = Connection::new(&config, false, false).unwrap();
5095        assert_eq!(h3_cln.next_uni_stream_id, 2);
5096
5097        let h3_srv = Connection::new(&config, true, false).unwrap();
5098        assert_eq!(h3_srv.next_uni_stream_id, 3);
5099    }
5100
5101    #[test]
5102    /// Client opens multiple control streams, which is forbidden.
5103    fn open_multiple_control_streams() {
5104        let mut s = Session::new().unwrap();
5105        s.handshake().unwrap();
5106
5107        let stream_id = s.client.next_uni_stream_id;
5108
5109        let mut d = [42; 8];
5110        let mut b = octets::OctetsMut::with_slice(&mut d);
5111
5112        s.pipe
5113            .client
5114            .stream_send(
5115                stream_id,
5116                b.put_varint(stream::HTTP3_CONTROL_STREAM_TYPE_ID).unwrap(),
5117                false,
5118            )
5119            .unwrap();
5120
5121        s.advance().ok();
5122
5123        assert_eq!(s.poll_server(), Err(Error::StreamCreationError));
5124    }
5125
#[test]
/// Closing the control stream is forbidden: the server must flag it when the
/// client does so before sending any frame.
fn close_control_stream_after_type() {
    let mut sess = Session::new().unwrap();
    sess.handshake().unwrap();

    // Empty write with the fin flag set on the client's control stream.
    let ctrl_stream_id = sess.client.control_stream_id.unwrap();
    sess.pipe
        .client
        .stream_send(ctrl_stream_id, &[], true)
        .unwrap();

    sess.advance().ok();

    // The error is reported once; the following poll has nothing to return.
    assert_eq!(
        sess.server.poll(&mut sess.pipe.server),
        Err(Error::ClosedCriticalStream)
    );
    assert_eq!(sess.server.poll(&mut sess.pipe.server), Err(Error::Done));
}
5145
#[test]
/// Closing the control stream is forbidden: the server must flag it when the
/// client does so together with a frame.
fn close_control_stream_after_frame() {
    let mut sess = Session::new().unwrap();
    sess.handshake().unwrap();

    // MAX_PUSH_ID goes out on the control stream with the fin flag set.
    let ctrl_stream_id = sess.client.control_stream_id.unwrap();
    let max_push_id = frame::Frame::MaxPushId { push_id: 1 };

    sess.send_frame_client(max_push_id, ctrl_stream_id, true)
        .unwrap();

    // The error is reported once; the following poll has nothing to return.
    assert_eq!(
        sess.server.poll(&mut sess.pipe.server),
        Err(Error::ClosedCriticalStream)
    );
    assert_eq!(sess.server.poll(&mut sess.pipe.server), Err(Error::Done));
}
5166
#[test]
/// Resetting the control stream is forbidden: the server must flag it when
/// the client does so before sending any frame.
fn reset_control_stream_after_type() {
    let mut sess = Session::new().unwrap();
    sess.handshake().unwrap();

    // Shut down the write side of the client's control stream.
    let ctrl_stream_id = sess.client.control_stream_id.unwrap();
    sess.pipe
        .client
        .stream_shutdown(ctrl_stream_id, crate::Shutdown::Write, 0)
        .unwrap();

    sess.advance().ok();

    // The error is reported once; the following poll has nothing to return.
    assert_eq!(
        sess.server.poll(&mut sess.pipe.server),
        Err(Error::ClosedCriticalStream)
    );
    assert_eq!(sess.server.poll(&mut sess.pipe.server), Err(Error::Done));
}
5190
#[test]
/// Resetting the control stream is forbidden: the server must flag it when
/// the client does so after a frame was already sent.
fn reset_control_stream_after_frame() {
    let mut sess = Session::new().unwrap();
    sess.handshake().unwrap();

    let ctrl_stream_id = sess.client.control_stream_id.unwrap();

    // First send a frame and leave the stream open.
    let max_push_id = frame::Frame::MaxPushId { push_id: 1 };
    sess.send_frame_client(max_push_id, ctrl_stream_id, false)
        .unwrap();

    // The frame on its own produces no event at the server.
    assert_eq!(sess.server.poll(&mut sess.pipe.server), Err(Error::Done));

    // Then shut down the write side of the control stream.
    sess.pipe
        .client
        .stream_shutdown(ctrl_stream_id, crate::Shutdown::Write, 0)
        .unwrap();

    sess.advance().ok();

    // The error is reported once; the following poll has nothing to return.
    assert_eq!(
        sess.server.poll(&mut sess.pipe.server),
        Err(Error::ClosedCriticalStream)
    );
    assert_eq!(sess.server.poll(&mut sess.pipe.server), Err(Error::Done));
}
5224
#[test]
/// Closing a QPACK stream is forbidden: the server must flag it when the
/// client closes its encoder stream before sending any data.
fn close_qpack_stream_after_type() {
    let mut sess = Session::new().unwrap();
    sess.handshake().unwrap();

    // Empty write with the fin flag set on the client's QPACK encoder
    // stream.
    let encoder_stream_id =
        sess.client.local_qpack_streams.encoder_stream_id.unwrap();
    sess.pipe
        .client
        .stream_send(encoder_stream_id, &[], true)
        .unwrap();

    sess.advance().ok();

    // The error is reported once; the following poll has nothing to return.
    assert_eq!(
        sess.server.poll(&mut sess.pipe.server),
        Err(Error::ClosedCriticalStream)
    );
    assert_eq!(sess.server.poll(&mut sess.pipe.server), Err(Error::Done));
}
5248
#[test]
/// Closing a QPACK stream is forbidden: the server must flag it when the
/// client closes its encoder stream after sending some bytes.
fn close_qpack_stream_after_data() {
    let mut sess = Session::new().unwrap();
    sess.handshake().unwrap();

    let encoder_stream_id =
        sess.client.local_qpack_streams.encoder_stream_id.unwrap();
    let one_byte = [0; 1];

    // One byte without fin, followed by one byte with fin.
    sess.pipe
        .client
        .stream_send(encoder_stream_id, &one_byte, false)
        .unwrap();
    sess.pipe
        .client
        .stream_send(encoder_stream_id, &one_byte, true)
        .unwrap();

    sess.advance().ok();

    // The error is reported once; the following poll has nothing to return.
    assert_eq!(
        sess.server.poll(&mut sess.pipe.server),
        Err(Error::ClosedCriticalStream)
    );
    assert_eq!(sess.server.poll(&mut sess.pipe.server), Err(Error::Done));
}
5269
#[test]
/// Resetting a QPACK stream is forbidden: the server must flag it when the
/// client resets its encoder stream before sending any data.
fn reset_qpack_stream_after_type() {
    let mut sess = Session::new().unwrap();
    sess.handshake().unwrap();

    // Shut down the write side of the client's QPACK encoder stream.
    let encoder_stream_id =
        sess.client.local_qpack_streams.encoder_stream_id.unwrap();
    sess.pipe
        .client
        .stream_shutdown(encoder_stream_id, crate::Shutdown::Write, 0)
        .unwrap();

    sess.advance().ok();

    // The error is reported once; the following poll has nothing to return.
    assert_eq!(
        sess.server.poll(&mut sess.pipe.server),
        Err(Error::ClosedCriticalStream)
    );
    assert_eq!(sess.server.poll(&mut sess.pipe.server), Err(Error::Done));
}
5293
#[test]
/// Resetting a QPACK stream is forbidden: the server must flag it when the
/// client resets its encoder stream after sending some bytes.
fn reset_qpack_stream_after_data() {
    let mut sess = Session::new().unwrap();
    sess.handshake().unwrap();

    let encoder_stream_id =
        sess.client.local_qpack_streams.encoder_stream_id.unwrap();
    let one_byte = [0; 1];

    // Two single-byte writes, neither of which finishes the stream.
    sess.pipe
        .client
        .stream_send(encoder_stream_id, &one_byte, false)
        .unwrap();
    sess.pipe
        .client
        .stream_send(encoder_stream_id, &one_byte, false)
        .unwrap();

    sess.advance().ok();

    // The data on its own produces no event at the server.
    assert_eq!(sess.server.poll(&mut sess.pipe.server), Err(Error::Done));

    // Now shut down the write side of the stream.
    sess.pipe
        .client
        .stream_shutdown(encoder_stream_id, crate::Shutdown::Write, 0)
        .unwrap();

    sess.advance().ok();

    // The error is reported once; the following poll has nothing to return.
    assert_eq!(
        sess.server.poll(&mut sess.pipe.server),
        Err(Error::ClosedCriticalStream)
    );
    assert_eq!(sess.server.poll(&mut sess.pipe.server), Err(Error::Done));
}
5323
#[test]
/// Client sends QPACK data on both its encoder and decoder streams; the
/// server must swallow it without raising an event while still accounting
/// for the received bytes.
fn qpack_data() {
    // TODO: QPACK instructions are ignored until dynamic table support is
    // added so we just test that the data is safely ignored.
    let mut s = Session::new().unwrap();
    s.handshake().unwrap();

    let e_stream_id = s.client.local_qpack_streams.encoder_stream_id.unwrap();
    let d_stream_id = s.client.local_qpack_streams.decoder_stream_id.unwrap();
    let d = [0; 20];

    s.pipe.client.stream_send(e_stream_id, &d, false).unwrap();
    s.advance().ok();

    s.pipe.client.stream_send(d_stream_id, &d, false).unwrap();
    s.advance().ok();

    // Polling must yield neither an event nor a real error. Using
    // assert_eq!() rather than a match with bare panic!() arms makes a
    // failure print the unexpected result.
    assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::Done));

    // Per-stream byte counters reflect the 20 bytes sent on each stream.
    assert_eq!(s.server.peer_qpack_streams.encoder_stream_bytes, 20);
    assert_eq!(s.server.peer_qpack_streams.decoder_stream_bytes, 20);

    // The same totals are exposed through the connection stats.
    let stats = s.server.stats();
    assert_eq!(stats.qpack_encoder_stream_recv_bytes, 20);
    assert_eq!(stats.qpack_decoder_stream_recv_bytes, 20);
}
5359
#[test]
/// Exercises the size limit of the per-stream state buffer: DATA frames are
/// exempt from it, while a huge unknown (GREASE) frame is rejected.
fn max_state_buf_size() {
    let mut sess = Session::new().unwrap();
    sess.handshake().unwrap();

    let req = vec![
        Header::new(b":method", b"GET"),
        Header::new(b":scheme", b"https"),
        Header::new(b":authority", b"quic.tech"),
        Header::new(b":path", b"/test"),
        Header::new(b"user-agent", b"quiche-test"),
    ];

    let request_stream =
        sess.client.send_request(&mut sess.pipe.client, &req, false);
    assert_eq!(request_stream, Ok(0));

    sess.advance().ok();

    let ev_headers = Event::Headers {
        list: req,
        more_frames: true,
    };

    assert_eq!(
        sess.server.poll(&mut sess.pipe.server),
        Ok((0, ev_headers))
    );

    // DATA frames don't consume the state buffer, so can be of any size.
    let mut payload = [42; 128];
    let mut enc = octets::OctetsMut::with_slice(&mut payload);

    let frame_type = enc.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
    sess.pipe.client.stream_send(0, frame_type, false).unwrap();

    let frame_len = enc.put_varint(1 << 24).unwrap();
    sess.pipe.client.stream_send(0, frame_len, false).unwrap();

    sess.pipe.client.stream_send(0, &payload, false).unwrap();

    sess.advance().ok();

    assert_eq!(
        sess.server.poll(&mut sess.pipe.server),
        Ok((0, Event::Data))
    );

    // GREASE frames consume the state buffer, so need to be limited. This
    // part runs against a brand new session.
    let mut sess = Session::new().unwrap();
    sess.handshake().unwrap();

    let mut payload = [42; 128];
    let mut enc = octets::OctetsMut::with_slice(&mut payload);

    let frame_type = enc.put_varint(148_764_065_110_560_899).unwrap();
    sess.pipe.client.stream_send(0, frame_type, false).unwrap();

    let frame_len = enc.put_varint(1 << 24).unwrap();
    sess.pipe.client.stream_send(0, frame_len, false).unwrap();

    sess.pipe.client.stream_send(0, &payload, false).unwrap();

    sess.advance().ok();

    assert_eq!(
        sess.server.poll(&mut sess.pipe.server),
        Err(Error::ExcessiveLoad)
    );
}
5423
#[test]
/// Verifies that a body write is cut short once the request stream runs out
/// of outgoing flow control capacity.
fn stream_backpressure() {
    let chunk: Vec<u8> = (1..=10).collect();

    let mut sess = Session::new().unwrap();
    sess.handshake().unwrap();

    let (stream, req) = sess.send_request(false).unwrap();

    // Number of body writes that are expected to go through untruncated.
    let total_data_frames = 6;

    for _ in 0..total_data_frames {
        let written =
            sess.client
                .send_body(&mut sess.pipe.client, stream, &chunk, false);
        assert_eq!(written, Ok(chunk.len()));

        sess.advance().ok();
    }

    // The final write (with fin) only partially fits.
    let written =
        sess.client
            .send_body(&mut sess.pipe.client, stream, &chunk, true);
    assert_eq!(written, Ok(chunk.len() - 2));

    sess.advance().ok();

    let ev_headers = Event::Headers {
        list: req,
        more_frames: true,
    };

    assert_eq!(sess.poll_server(), Ok((stream, ev_headers)));
    assert_eq!(sess.poll_server(), Ok((stream, Event::Data)));
    assert_eq!(sess.poll_server(), Err(Error::Done));

    let mut recv_buf = vec![0; chunk.len()];

    for _ in 0..total_data_frames {
        let read = sess.recv_body_server(stream, &mut recv_buf);
        assert_eq!(read, Ok(chunk.len()));
    }

    let read = sess.recv_body_server(stream, &mut recv_buf);
    assert_eq!(read, Ok(chunk.len() - 2));

    // Fin flag from last send_body() call was not sent as the buffer was
    // only partially written.
    assert_eq!(sess.poll_server(), Err(Error::Done));
}
5481
#[test]
/// Verifies that a request whose header list exceeds the configured maximum
/// field section size is rejected and the connection is closed accordingly.
fn request_max_header_size_limit() {
    let mut quic_config =
        crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    quic_config.set_initial_max_data(1500);
    quic_config.set_initial_max_stream_data_bidi_local(150);
    quic_config.set_initial_max_stream_data_bidi_remote(150);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(5);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);

    // Limit under test.
    let mut h3_config = Config::new().unwrap();
    h3_config.set_max_field_section_size(65);

    let mut sess =
        Session::with_configs(&mut quic_config, &h3_config).unwrap();

    sess.handshake().unwrap();

    let req = vec![
        Header::new(b":method", b"GET"),
        Header::new(b":scheme", b"https"),
        Header::new(b":authority", b"quic.tech"),
        Header::new(b":path", b"/test"),
        Header::new(b"aaaaaaa", b"aaaaaaaa"),
    ];

    let stream = sess
        .client
        .send_request(&mut sess.pipe.client, &req, true)
        .unwrap();

    sess.advance().ok();

    assert_eq!(stream, 0);

    assert_eq!(sess.poll_server(), Err(Error::ExcessiveLoad));

    // The server's locally recorded error carries the matching wire code.
    let local_error = sess.pipe.server.local_error.as_ref().unwrap();
    assert_eq!(
        local_error.error_code,
        Error::to_wire(Error::ExcessiveLoad)
    );
}
5532
#[test]
/// Verifies that a failure in the QUIC layer is surfaced wrapped inside
/// Error::TransportError.
fn transport_error() {
    let mut sess = Session::new().unwrap();
    sess.handshake().unwrap();

    let req = vec![
        Header::new(b":method", b"GET"),
        Header::new(b":scheme", b"https"),
        Header::new(b":authority", b"quic.tech"),
        Header::new(b":path", b"/test"),
        Header::new(b"user-agent", b"quiche-test"),
    ];

    // We need to open all streams in the same flight, so we can't use the
    // Session::send_request() method because it also calls advance(),
    // otherwise the server would send a MAX_STREAMS frame and the client
    // wouldn't hit the streams limit.
    //
    // The first five requests land on streams 0, 4, 8, 12 and 16.
    for n in 0..5u64 {
        assert_eq!(
            sess.client.send_request(&mut sess.pipe.client, &req, true),
            Ok(n * 4)
        );
    }

    // The sixth one trips the stream limit.
    assert_eq!(
        sess.client.send_request(&mut sess.pipe.client, &req, true),
        Err(Error::TransportError(crate::Error::StreamLimit))
    );
}
5568
#[test]
/// Verifies that a DATA frame arriving on a request stream before any
/// HEADERS frame is reported as an error.
fn data_before_headers() {
    let mut sess = Session::new().unwrap();
    sess.handshake().unwrap();

    // Hand-craft a 5-byte DATA frame directly on stream 0.
    let mut scratch = [42; 128];
    let mut enc = octets::OctetsMut::with_slice(&mut scratch);

    let frame_type = enc.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
    sess.pipe.client.stream_send(0, frame_type, false).unwrap();

    let frame_len = enc.put_varint(5).unwrap();
    sess.pipe.client.stream_send(0, frame_len, false).unwrap();

    sess.pipe.client.stream_send(0, b"hello", false).unwrap();

    sess.advance().ok();

    assert_eq!(
        Err(Error::FrameUnexpected),
        sess.server.poll(&mut sess.pipe.server)
    );
}
5593
#[test]
/// Verifies that poll() becomes a no-op once it has reported an error.
fn poll_after_error() {
    let mut sess = Session::new().unwrap();
    sess.handshake().unwrap();

    // Send an unknown frame type announcing a very large payload on
    // stream 0.
    let mut payload = [42; 128];
    let mut enc = octets::OctetsMut::with_slice(&mut payload);

    let frame_type = enc.put_varint(148_764_065_110_560_899).unwrap();
    sess.pipe.client.stream_send(0, frame_type, false).unwrap();

    let frame_len = enc.put_varint(1 << 24).unwrap();
    sess.pipe.client.stream_send(0, frame_len, false).unwrap();

    sess.pipe.client.stream_send(0, &payload, false).unwrap();

    sess.advance().ok();

    assert_eq!(
        sess.server.poll(&mut sess.pipe.server),
        Err(Error::ExcessiveLoad)
    );

    // Try to call poll() again after an error occurred.
    assert_eq!(sess.server.poll(&mut sess.pipe.server), Err(Error::Done));
}
5618
#[test]
/// Verifies that sending HEADERS is held back when there is not enough
/// capacity, and succeeds once credits are returned.
fn headers_blocked() {
    let mut quic_config =
        crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    quic_config.set_initial_max_data(70);
    quic_config.set_initial_max_stream_data_bidi_local(150);
    quic_config.set_initial_max_stream_data_bidi_remote(150);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(100);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);

    let h3_config = Config::new().unwrap();

    let mut sess =
        Session::with_configs(&mut quic_config, &h3_config).unwrap();

    sess.handshake().unwrap();

    let req = vec![
        Header::new(b":method", b"GET"),
        Header::new(b":scheme", b"https"),
        Header::new(b":authority", b"quic.tech"),
        Header::new(b":path", b"/test"),
    ];

    // The first request fits, the second one does not.
    assert_eq!(
        sess.client.send_request(&mut sess.pipe.client, &req, true),
        Ok(0)
    );

    assert_eq!(
        sess.client.send_request(&mut sess.pipe.client, &req, true),
        Err(Error::StreamBlocked)
    );

    // Clear the writable stream queue.
    for &id in [2u64, 6, 10].iter() {
        assert_eq!(sess.pipe.client.stream_writable_next(), Some(id));
    }
    assert_eq!(sess.pipe.client.stream_writable_next(), None);

    sess.advance().ok();

    // Once the server gives flow control credits back, we can send the
    // request.
    assert_eq!(sess.pipe.client.stream_writable_next(), Some(4));
    assert_eq!(
        sess.client.send_request(&mut sess.pipe.client, &req, true),
        Ok(4)
    );
}
5671
#[test]
/// Verifies that send_request() reports StreamBlocked when connection-level
/// flow control leaves no room for the headers.
fn headers_blocked_on_conn() {
    let mut quic_config =
        crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    quic_config.set_initial_max_data(70);
    quic_config.set_initial_max_stream_data_bidi_local(150);
    quic_config.set_initial_max_stream_data_bidi_remote(150);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(100);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);

    let h3_config = Config::new().unwrap();

    let mut sess =
        Session::with_configs(&mut quic_config, &h3_config).unwrap();

    sess.handshake().unwrap();

    // After the HTTP handshake, some bytes of connection flow control have
    // been consumed. Fill the connection with more grease data on the
    // control stream; only part of the buffer is accepted.
    let grease = [42; 28];
    assert_eq!(sess.pipe.client.stream_send(2, &grease, false), Ok(23));

    let req = vec![
        Header::new(b":method", b"GET"),
        Header::new(b":scheme", b"https"),
        Header::new(b":authority", b"quic.tech"),
        Header::new(b":path", b"/test"),
    ];

    // There is 0 connection-level flow control, so sending a request is
    // blocked.
    assert_eq!(
        sess.client.send_request(&mut sess.pipe.client, &req, true),
        Err(Error::StreamBlocked)
    );
    assert_eq!(sess.pipe.client.stream_writable_next(), None);

    // Emit the control stream data and let the server consume it via
    // poll(), which gives flow control back to the client.
    sess.advance().ok();
    assert_eq!(sess.poll_server(), Err(Error::Done));
    sess.advance().ok();

    // Now we can send the request.
    for &id in [2u64, 6].iter() {
        assert_eq!(sess.pipe.client.stream_writable_next(), Some(id));
    }
    assert_eq!(
        sess.client.send_request(&mut sess.pipe.client, &req, true),
        Ok(0)
    );
}
5729
#[test]
/// Verifies that, while sending a large body, a STREAM_DATA_BLOCKED frame is
/// emitted only once per flow control limit rather than on every blocked
/// write.
fn send_body_truncation_stream_blocked() {
    use crate::test_utils::decode_pkt;

    let mut quic_config =
        crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    // Large connection-level flow control, small stream-level one.
    quic_config.set_initial_max_data(10000);
    quic_config.set_initial_max_stream_data_bidi_local(80);
    quic_config.set_initial_max_stream_data_bidi_remote(80);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(100);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);

    let h3_config = Config::new().unwrap();

    let mut sess =
        Session::with_configs(&mut quic_config, &h3_config).unwrap();

    sess.handshake().unwrap();

    let (stream, req) = sess.send_request(true).unwrap();

    let ev_headers = Event::Headers {
        list: req,
        more_frames: false,
    };

    assert_eq!(sess.poll_server(), Ok((stream, ev_headers)));
    assert_eq!(sess.poll_server(), Ok((stream, Event::Finished)));

    let _ = sess.send_response(stream, false).unwrap();

    assert_eq!(sess.pipe.server.streams.blocked().len(), 0);

    // The body must be larger than the stream window would allow.
    let body = [42; 500];

    let sent = sess
        .server
        .send_body(&mut sess.pipe.server, stream, &body, true)
        .unwrap();
    assert_eq!(sent, 25);

    // Offset of the first body byte that has not been written yet.
    let off = sent;

    // send_body wrote as much as it could (sent < size of buff), so the
    // stream is now marked as blocked, and retrying changes nothing.
    assert_eq!(sess.pipe.server.streams.blocked().len(), 1);
    assert_eq!(
        sess.server
            .send_body(&mut sess.pipe.server, stream, &body[off..], true),
        Err(Error::Done)
    );
    assert_eq!(sess.pipe.server.streams.blocked().len(), 1);

    // Now read raw frames to see what the QUIC layer did.
    let mut buf = [0; 65535];
    let (len, _) = sess.pipe.server.send(&mut buf).unwrap();

    let frames =
        decode_pkt(&mut sess.pipe.client, &mut buf[..len]).unwrap();

    assert_eq!(
        frames.iter().next(),
        Some(&crate::frame::Frame::StreamDataBlocked {
            stream_id: 0,
            limit: 80,
        })
    );

    // At the server, after sending the STREAM_DATA_BLOCKED frame, we clear
    // the mark.
    assert_eq!(sess.pipe.server.streams.blocked().len(), 0);

    // Don't read any data from the client, so stream flow control is never
    // given back in the form of changing the stream's max offset.
    // Subsequent body send operations will still fail but no more
    // STREAM_DATA_BLOCKED frames should be submitted since the limit didn't
    // change. No frames means no packet to send.
    assert_eq!(
        sess.server
            .send_body(&mut sess.pipe.server, stream, &body[off..], true),
        Err(Error::Done)
    );
    assert_eq!(sess.pipe.server.streams.blocked().len(), 0);
    assert_eq!(sess.pipe.server.send(&mut buf), Err(crate::Error::Done));

    // Now raise the stream's max offset manually by injecting a
    // MAX_STREAM_DATA frame towards the server.
    let max_stream_data = [crate::frame::Frame::MaxStreamData {
        stream_id: 0,
        max: 100,
    }];

    let pkt_type = crate::packet::Type::Short;
    assert_eq!(
        sess.pipe
            .send_pkt_to_server(pkt_type, &max_stream_data, &mut buf),
        Ok(39),
    );

    let sent_more = sess
        .server
        .send_body(&mut sess.pipe.server, stream, &body[off..], true)
        .unwrap();
    assert_eq!(sent_more, 18);

    // Same thing here: blocked again, and a retry does not change that.
    assert_eq!(sess.pipe.server.streams.blocked().len(), 1);
    assert_eq!(
        sess.server
            .send_body(&mut sess.pipe.server, stream, &body[off..], true),
        Err(Error::Done)
    );
    assert_eq!(sess.pipe.server.streams.blocked().len(), 1);

    let (len, _) = sess.pipe.server.send(&mut buf).unwrap();

    let frames =
        decode_pkt(&mut sess.pipe.client, &mut buf[..len]).unwrap();

    // A new STREAM_DATA_BLOCKED is sent, this time for the new limit.
    assert_eq!(
        frames.iter().next(),
        Some(&crate::frame::Frame::StreamDataBlocked {
            stream_id: 0,
            limit: 100,
        })
    );
}
5866
#[test]
/// Verifies that a response stream whose body write was truncated by a
/// small cwnd is reported as writable again later, so it doesn't hang.
fn send_body_stream_blocked_by_small_cwnd() {
    let mut quic_config =
        crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    // Large connection-level flow control.
    quic_config.set_initial_max_data(100000);
    quic_config.set_initial_max_stream_data_bidi_local(100000);
    quic_config.set_initial_max_stream_data_bidi_remote(50000);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(100);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);

    let h3_config = Config::new().unwrap();

    let mut sess =
        Session::with_configs(&mut quic_config, &h3_config).unwrap();

    sess.handshake().unwrap();

    let (stream, req) = sess.send_request(true).unwrap();

    let ev_headers = Event::Headers {
        list: req,
        more_frames: false,
    };

    assert_eq!(sess.poll_server(), Ok((stream, ev_headers)));
    assert_eq!(sess.poll_server(), Ok((stream, Event::Finished)));

    let _ = sess.send_response(stream, false).unwrap();

    // Clear the writable stream queue.
    for &id in [3, 7, 11, stream].iter() {
        assert_eq!(sess.pipe.server.stream_writable_next(), Some(id));
    }
    assert_eq!(sess.pipe.server.stream_writable_next(), None);

    // The body must be larger than the cwnd would allow.
    let send_buf = [42; 80000];

    let sent = sess
        .server
        .send_body(&mut sess.pipe.server, stream, &send_buf, true)
        .unwrap();

    // send_body wrote as much as it could (sent < size of buff).
    assert_eq!(sent, 11995);

    sess.advance().ok();

    // Client reads received headers and body.
    let mut recv_buf = [42; 80000];
    assert!(sess.poll_client().is_ok());
    assert_eq!(sess.poll_client(), Ok((stream, Event::Data)));
    assert_eq!(sess.recv_body_client(stream, &mut recv_buf), Ok(11995));

    sess.advance().ok();

    // Server send cap is smaller than remaining body buffer.
    let remaining = send_buf.len() - sent;
    assert!(sess.pipe.server.tx_cap < remaining);

    // Once the server cwnd opens up, we can send more body.
    assert_eq!(sess.pipe.server.stream_writable_next(), Some(0));
}
5938
#[test]
/// Verifies that a response stream doesn't hang when the remaining stream
/// capacity only fits a DATA frame header with no payload: the write is
/// refused with Done and the stream is reported as writable again later.
fn send_body_stream_blocked_zero_length() {
    let mut quic_config =
        crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    // Large connection-level flow control.
    quic_config.set_initial_max_data(100000);
    quic_config.set_initial_max_stream_data_bidi_local(100000);
    quic_config.set_initial_max_stream_data_bidi_remote(50000);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(100);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);

    let h3_config = Config::new().unwrap();

    let mut sess =
        Session::with_configs(&mut quic_config, &h3_config).unwrap();

    sess.handshake().unwrap();

    let (stream, req) = sess.send_request(true).unwrap();

    let ev_headers = Event::Headers {
        list: req,
        more_frames: false,
    };

    assert_eq!(sess.poll_server(), Ok((stream, ev_headers)));
    assert_eq!(sess.poll_server(), Ok((stream, Event::Finished)));

    let _ = sess.send_response(stream, false).unwrap();

    // Clear the writable stream queue.
    for &id in [3, 7, 11, stream].iter() {
        assert_eq!(sess.pipe.server.stream_writable_next(), Some(id));
    }
    assert_eq!(sess.pipe.server.stream_writable_next(), None);

    // The body is large enough to fill the cwnd, except for enough bytes
    // for another DATA frame header (but no payload).
    let send_buf = [42; 11994];

    let sent = sess
        .server
        .send_body(&mut sess.pipe.server, stream, &send_buf, false)
        .unwrap();

    assert_eq!(sent, 11994);

    // There is only enough capacity left for the DATA frame header, but
    // no payload.
    assert_eq!(sess.pipe.server.stream_capacity(stream).unwrap(), 3);
    assert_eq!(
        sess.server
            .send_body(&mut sess.pipe.server, stream, &send_buf, false),
        Err(Error::Done)
    );

    sess.advance().ok();

    // Client reads received headers and body.
    let mut recv_buf = [42; 80000];
    assert!(sess.poll_client().is_ok());
    assert_eq!(sess.poll_client(), Ok((stream, Event::Data)));
    assert_eq!(sess.recv_body_client(stream, &mut recv_buf), Ok(11994));

    sess.advance().ok();

    // Once the server cwnd opens up, we can send more body.
    assert_eq!(sess.pipe.server.stream_writable_next(), Some(0));
}
6016
6017    #[test]
6018    /// Test handling of 0-length DATA writes with and without fin.
6019    fn zero_length_data() {
6020        let mut s = Session::new().unwrap();
6021        s.handshake().unwrap();
6022
6023        let (stream, req) = s.send_request(false).unwrap();
6024
6025        assert_eq!(
6026            s.client.send_body(&mut s.pipe.client, 0, b"", false),
6027            Err(Error::Done)
6028        );
6029        assert_eq!(s.client.send_body(&mut s.pipe.client, 0, b"", true), Ok(0));
6030
6031        s.advance().ok();
6032
6033        let mut recv_buf = vec![0; 100];
6034
6035        let ev_headers = Event::Headers {
6036            list: req,
6037            more_frames: true,
6038        };
6039
6040        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6041
6042        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6043        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Err(Error::Done));
6044
6045        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6046        assert_eq!(s.poll_server(), Err(Error::Done));
6047
6048        let resp = s.send_response(stream, false).unwrap();
6049
6050        assert_eq!(
6051            s.server.send_body(&mut s.pipe.server, 0, b"", false),
6052            Err(Error::Done)
6053        );
6054        assert_eq!(s.server.send_body(&mut s.pipe.server, 0, b"", true), Ok(0));
6055
6056        s.advance().ok();
6057
6058        let ev_headers = Event::Headers {
6059            list: resp,
6060            more_frames: true,
6061        };
6062
6063        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
6064
6065        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
6066        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Err(Error::Done));
6067
6068        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
6069        assert_eq!(s.poll_client(), Err(Error::Done));
6070    }
6071
6072    #[test]
6073    /// Tests that blocked 0-length DATA writes are reported correctly.
6074    fn zero_length_data_blocked() {
6075        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6076        config
6077            .load_cert_chain_from_pem_file("examples/cert.crt")
6078            .unwrap();
6079        config
6080            .load_priv_key_from_pem_file("examples/cert.key")
6081            .unwrap();
6082        config.set_application_protos(&[b"h3"]).unwrap();
6083        config.set_initial_max_data(69);
6084        config.set_initial_max_stream_data_bidi_local(150);
6085        config.set_initial_max_stream_data_bidi_remote(150);
6086        config.set_initial_max_stream_data_uni(150);
6087        config.set_initial_max_streams_bidi(100);
6088        config.set_initial_max_streams_uni(5);
6089        config.verify_peer(false);
6090
6091        let h3_config = Config::new().unwrap();
6092
6093        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6094
6095        s.handshake().unwrap();
6096
6097        let req = vec![
6098            Header::new(b":method", b"GET"),
6099            Header::new(b":scheme", b"https"),
6100            Header::new(b":authority", b"quic.tech"),
6101            Header::new(b":path", b"/test"),
6102        ];
6103
6104        assert_eq!(
6105            s.client.send_request(&mut s.pipe.client, &req, false),
6106            Ok(0)
6107        );
6108
6109        assert_eq!(
6110            s.client.send_body(&mut s.pipe.client, 0, b"", true),
6111            Err(Error::Done)
6112        );
6113
6114        // Clear the writable stream queue.
6115        assert_eq!(s.pipe.client.stream_writable_next(), Some(2));
6116        assert_eq!(s.pipe.client.stream_writable_next(), Some(6));
6117        assert_eq!(s.pipe.client.stream_writable_next(), Some(10));
6118        assert_eq!(s.pipe.client.stream_writable_next(), None);
6119
6120        s.advance().ok();
6121
6122        // Once the server gives flow control credits back, we can send the body.
6123        assert_eq!(s.pipe.client.stream_writable_next(), Some(0));
6124        assert_eq!(s.client.send_body(&mut s.pipe.client, 0, b"", true), Ok(0));
6125    }
6126
6127    #[test]
6128    /// Tests that receiving an empty SETTINGS frame is handled and reported.
6129    fn empty_settings() {
6130        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6131        config
6132            .load_cert_chain_from_pem_file("examples/cert.crt")
6133            .unwrap();
6134        config
6135            .load_priv_key_from_pem_file("examples/cert.key")
6136            .unwrap();
6137        config.set_application_protos(&[b"h3"]).unwrap();
6138        config.set_initial_max_data(1500);
6139        config.set_initial_max_stream_data_bidi_local(150);
6140        config.set_initial_max_stream_data_bidi_remote(150);
6141        config.set_initial_max_stream_data_uni(150);
6142        config.set_initial_max_streams_bidi(5);
6143        config.set_initial_max_streams_uni(5);
6144        config.verify_peer(false);
6145        config.set_ack_delay_exponent(8);
6146        config.grease(false);
6147
6148        let h3_config = Config::new().unwrap();
6149        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6150
6151        s.handshake().unwrap();
6152
6153        assert!(s.client.peer_settings_raw().is_some());
6154        assert!(s.server.peer_settings_raw().is_some());
6155    }
6156
6157    #[test]
6158    /// Tests that receiving a H3_DATAGRAM setting is ok.
6159    fn dgram_setting() {
6160        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6161        config
6162            .load_cert_chain_from_pem_file("examples/cert.crt")
6163            .unwrap();
6164        config
6165            .load_priv_key_from_pem_file("examples/cert.key")
6166            .unwrap();
6167        config.set_application_protos(&[b"h3"]).unwrap();
6168        config.set_initial_max_data(70);
6169        config.set_initial_max_stream_data_bidi_local(150);
6170        config.set_initial_max_stream_data_bidi_remote(150);
6171        config.set_initial_max_stream_data_uni(150);
6172        config.set_initial_max_streams_bidi(100);
6173        config.set_initial_max_streams_uni(5);
6174        config.enable_dgram(true, 1000, 1000);
6175        config.verify_peer(false);
6176
6177        let h3_config = Config::new().unwrap();
6178
6179        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6180        assert_eq!(s.pipe.handshake(), Ok(()));
6181
6182        s.client.send_settings(&mut s.pipe.client).unwrap();
6183        assert_eq!(s.pipe.advance(), Ok(()));
6184
6185        // Before processing SETTINGS (via poll), HTTP/3 DATAGRAMS are not
6186        // enabled.
6187        assert!(!s.server.dgram_enabled_by_peer(&s.pipe.server));
6188
6189        // When everything is ok, poll returns Done and DATAGRAM is enabled.
6190        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::Done));
6191        assert!(s.server.dgram_enabled_by_peer(&s.pipe.server));
6192
6193        // Now detect things on the client
6194        s.server.send_settings(&mut s.pipe.server).unwrap();
6195        assert_eq!(s.pipe.advance(), Ok(()));
6196        assert!(!s.client.dgram_enabled_by_peer(&s.pipe.client));
6197        assert_eq!(s.client.poll(&mut s.pipe.client), Err(Error::Done));
6198        assert!(s.client.dgram_enabled_by_peer(&s.pipe.client));
6199    }
6200
6201    #[test]
6202    /// Tests that receiving a H3_DATAGRAM setting when no TP is set generates
6203    /// an error.
6204    fn dgram_setting_no_tp() {
6205        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6206        config
6207            .load_cert_chain_from_pem_file("examples/cert.crt")
6208            .unwrap();
6209        config
6210            .load_priv_key_from_pem_file("examples/cert.key")
6211            .unwrap();
6212        config.set_application_protos(&[b"h3"]).unwrap();
6213        config.set_initial_max_data(70);
6214        config.set_initial_max_stream_data_bidi_local(150);
6215        config.set_initial_max_stream_data_bidi_remote(150);
6216        config.set_initial_max_stream_data_uni(150);
6217        config.set_initial_max_streams_bidi(100);
6218        config.set_initial_max_streams_uni(5);
6219        config.verify_peer(false);
6220
6221        let h3_config = Config::new().unwrap();
6222
6223        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6224        assert_eq!(s.pipe.handshake(), Ok(()));
6225
6226        s.client.control_stream_id = Some(
6227            s.client
6228                .open_uni_stream(
6229                    &mut s.pipe.client,
6230                    stream::HTTP3_CONTROL_STREAM_TYPE_ID,
6231                )
6232                .unwrap(),
6233        );
6234
6235        let settings = frame::Frame::Settings {
6236            max_field_section_size: None,
6237            qpack_max_table_capacity: None,
6238            qpack_blocked_streams: None,
6239            connect_protocol_enabled: None,
6240            h3_datagram: Some(1),
6241            grease: None,
6242            additional_settings: Default::default(),
6243            raw: Default::default(),
6244        };
6245
6246        s.send_frame_client(settings, s.client.control_stream_id.unwrap(), false)
6247            .unwrap();
6248
6249        assert_eq!(s.pipe.advance(), Ok(()));
6250
6251        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::SettingsError));
6252    }
6253
6254    #[test]
6255    /// Tests that receiving SETTINGS with prohibited values generates an error.
6256    fn settings_h2_prohibited() {
6257        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6258        config
6259            .load_cert_chain_from_pem_file("examples/cert.crt")
6260            .unwrap();
6261        config
6262            .load_priv_key_from_pem_file("examples/cert.key")
6263            .unwrap();
6264        config.set_application_protos(&[b"h3"]).unwrap();
6265        config.set_initial_max_data(70);
6266        config.set_initial_max_stream_data_bidi_local(150);
6267        config.set_initial_max_stream_data_bidi_remote(150);
6268        config.set_initial_max_stream_data_uni(150);
6269        config.set_initial_max_streams_bidi(100);
6270        config.set_initial_max_streams_uni(5);
6271        config.verify_peer(false);
6272
6273        let h3_config = Config::new().unwrap();
6274
6275        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6276        assert_eq!(s.pipe.handshake(), Ok(()));
6277
6278        s.client.control_stream_id = Some(
6279            s.client
6280                .open_uni_stream(
6281                    &mut s.pipe.client,
6282                    stream::HTTP3_CONTROL_STREAM_TYPE_ID,
6283                )
6284                .unwrap(),
6285        );
6286
6287        s.server.control_stream_id = Some(
6288            s.server
6289                .open_uni_stream(
6290                    &mut s.pipe.server,
6291                    stream::HTTP3_CONTROL_STREAM_TYPE_ID,
6292                )
6293                .unwrap(),
6294        );
6295
6296        let frame_payload_len = 2u64;
6297        let settings = [
6298            frame::SETTINGS_FRAME_TYPE_ID as u8,
6299            frame_payload_len as u8,
6300            0x2, // 0x2 is a reserved setting type
6301            1,
6302        ];
6303
6304        s.send_arbitrary_stream_data_client(
6305            &settings,
6306            s.client.control_stream_id.unwrap(),
6307            false,
6308        )
6309        .unwrap();
6310
6311        s.send_arbitrary_stream_data_server(
6312            &settings,
6313            s.server.control_stream_id.unwrap(),
6314            false,
6315        )
6316        .unwrap();
6317
6318        assert_eq!(s.pipe.advance(), Ok(()));
6319
6320        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::SettingsError));
6321
6322        assert_eq!(s.client.poll(&mut s.pipe.client), Err(Error::SettingsError));
6323    }
6324
6325    #[test]
6326    /// Tests that setting SETTINGS with prohibited values generates an error.
6327    fn set_prohibited_additional_settings() {
6328        let mut h3_config = Config::new().unwrap();
6329        assert_eq!(
6330            h3_config.set_additional_settings(vec![(
6331                frame::SETTINGS_QPACK_MAX_TABLE_CAPACITY,
6332                43
6333            )]),
6334            Err(Error::SettingsError)
6335        );
6336        assert_eq!(
6337            h3_config.set_additional_settings(vec![(
6338                frame::SETTINGS_MAX_FIELD_SECTION_SIZE,
6339                43
6340            )]),
6341            Err(Error::SettingsError)
6342        );
6343        assert_eq!(
6344            h3_config.set_additional_settings(vec![(
6345                frame::SETTINGS_QPACK_BLOCKED_STREAMS,
6346                43
6347            )]),
6348            Err(Error::SettingsError)
6349        );
6350        assert_eq!(
6351            h3_config.set_additional_settings(vec![(
6352                frame::SETTINGS_ENABLE_CONNECT_PROTOCOL,
6353                43
6354            )]),
6355            Err(Error::SettingsError)
6356        );
6357        assert_eq!(
6358            h3_config
6359                .set_additional_settings(vec![(frame::SETTINGS_H3_DATAGRAM, 43)]),
6360            Err(Error::SettingsError)
6361        );
6362    }
6363
6364    #[test]
6365    /// Tests additional settings are actually exchanged by the peers.
6366    fn set_additional_settings() {
6367        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6368        config
6369            .load_cert_chain_from_pem_file("examples/cert.crt")
6370            .unwrap();
6371        config
6372            .load_priv_key_from_pem_file("examples/cert.key")
6373            .unwrap();
6374        config.set_application_protos(&[b"h3"]).unwrap();
6375        config.set_initial_max_data(70);
6376        config.set_initial_max_stream_data_bidi_local(150);
6377        config.set_initial_max_stream_data_bidi_remote(150);
6378        config.set_initial_max_stream_data_uni(150);
6379        config.set_initial_max_streams_bidi(100);
6380        config.set_initial_max_streams_uni(5);
6381        config.verify_peer(false);
6382        config.grease(false);
6383
6384        let mut h3_config = Config::new().unwrap();
6385        h3_config
6386            .set_additional_settings(vec![(42, 43), (44, 45)])
6387            .unwrap();
6388
6389        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6390        assert_eq!(s.pipe.handshake(), Ok(()));
6391
6392        assert_eq!(s.pipe.advance(), Ok(()));
6393
6394        s.client.send_settings(&mut s.pipe.client).unwrap();
6395        assert_eq!(s.pipe.advance(), Ok(()));
6396        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::Done));
6397
6398        s.server.send_settings(&mut s.pipe.server).unwrap();
6399        assert_eq!(s.pipe.advance(), Ok(()));
6400        assert_eq!(s.client.poll(&mut s.pipe.client), Err(Error::Done));
6401
6402        assert_eq!(
6403            s.server.peer_settings_raw(),
6404            Some(&[(42, 43), (44, 45)][..])
6405        );
6406        assert_eq!(
6407            s.client.peer_settings_raw(),
6408            Some(&[(42, 43), (44, 45)][..])
6409        );
6410    }
6411
6412    #[test]
6413    /// Send a single DATAGRAM.
6414    fn single_dgram() {
6415        let mut buf = [0; 65535];
6416        let mut s = Session::new().unwrap();
6417        s.handshake().unwrap();
6418
6419        // We'll send default data of 10 bytes on flow ID 0.
6420        let result = (11, 0, 1);
6421
6422        s.send_dgram_client(0).unwrap();
6423
6424        assert_eq!(s.poll_server(), Err(Error::Done));
6425        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6426
6427        s.send_dgram_server(0).unwrap();
6428        assert_eq!(s.poll_client(), Err(Error::Done));
6429        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6430    }
6431
6432    #[test]
6433    /// Send multiple DATAGRAMs.
6434    fn multiple_dgram() {
6435        let mut buf = [0; 65535];
6436        let mut s = Session::new().unwrap();
6437        s.handshake().unwrap();
6438
6439        // We'll send default data of 10 bytes on flow ID 0.
6440        let result = (11, 0, 1);
6441
6442        s.send_dgram_client(0).unwrap();
6443        s.send_dgram_client(0).unwrap();
6444        s.send_dgram_client(0).unwrap();
6445
6446        assert_eq!(s.poll_server(), Err(Error::Done));
6447        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6448        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6449        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6450        assert_eq!(s.recv_dgram_server(&mut buf), Err(Error::Done));
6451
6452        s.send_dgram_server(0).unwrap();
6453        s.send_dgram_server(0).unwrap();
6454        s.send_dgram_server(0).unwrap();
6455
6456        assert_eq!(s.poll_client(), Err(Error::Done));
6457        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6458        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6459        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6460        assert_eq!(s.recv_dgram_client(&mut buf), Err(Error::Done));
6461    }
6462
6463    #[test]
6464    /// Send more DATAGRAMs than the send queue allows.
6465    fn multiple_dgram_overflow() {
6466        let mut buf = [0; 65535];
6467        let mut s = Session::new().unwrap();
6468        s.handshake().unwrap();
6469
6470        // We'll send default data of 10 bytes on flow ID 0.
6471        let result = (11, 0, 1);
6472
6473        // Five DATAGRAMs
6474        s.send_dgram_client(0).unwrap();
6475        s.send_dgram_client(0).unwrap();
6476        s.send_dgram_client(0).unwrap();
6477        s.send_dgram_client(0).unwrap();
6478        s.send_dgram_client(0).unwrap();
6479
6480        // Only 3 independent DATAGRAMs to read events will fire.
6481        assert_eq!(s.poll_server(), Err(Error::Done));
6482        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6483        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6484        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6485        assert_eq!(s.recv_dgram_server(&mut buf), Err(Error::Done));
6486    }
6487
6488    #[test]
6489    /// Send a single DATAGRAM and request.
6490    fn poll_datagram_cycling_no_read() {
6491        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6492        config
6493            .load_cert_chain_from_pem_file("examples/cert.crt")
6494            .unwrap();
6495        config
6496            .load_priv_key_from_pem_file("examples/cert.key")
6497            .unwrap();
6498        config.set_application_protos(&[b"h3"]).unwrap();
6499        config.set_initial_max_data(1500);
6500        config.set_initial_max_stream_data_bidi_local(150);
6501        config.set_initial_max_stream_data_bidi_remote(150);
6502        config.set_initial_max_stream_data_uni(150);
6503        config.set_initial_max_streams_bidi(100);
6504        config.set_initial_max_streams_uni(5);
6505        config.verify_peer(false);
6506        config.enable_dgram(true, 100, 100);
6507
6508        let h3_config = Config::new().unwrap();
6509        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6510        s.handshake().unwrap();
6511
6512        // Send request followed by DATAGRAM on client side.
6513        let (stream, req) = s.send_request(false).unwrap();
6514
6515        s.send_body_client(stream, true).unwrap();
6516
6517        let ev_headers = Event::Headers {
6518            list: req,
6519            more_frames: true,
6520        };
6521
6522        s.send_dgram_client(0).unwrap();
6523
6524        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6525        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6526
6527        assert_eq!(s.poll_server(), Err(Error::Done));
6528    }
6529
6530    #[test]
6531    /// Send a single DATAGRAM and request.
6532    fn poll_datagram_single_read() {
6533        let mut buf = [0; 65535];
6534
6535        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6536        config
6537            .load_cert_chain_from_pem_file("examples/cert.crt")
6538            .unwrap();
6539        config
6540            .load_priv_key_from_pem_file("examples/cert.key")
6541            .unwrap();
6542        config.set_application_protos(&[b"h3"]).unwrap();
6543        config.set_initial_max_data(1500);
6544        config.set_initial_max_stream_data_bidi_local(150);
6545        config.set_initial_max_stream_data_bidi_remote(150);
6546        config.set_initial_max_stream_data_uni(150);
6547        config.set_initial_max_streams_bidi(100);
6548        config.set_initial_max_streams_uni(5);
6549        config.verify_peer(false);
6550        config.enable_dgram(true, 100, 100);
6551
6552        let h3_config = Config::new().unwrap();
6553        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6554        s.handshake().unwrap();
6555
6556        // We'll send default data of 10 bytes on flow ID 0.
6557        let result = (11, 0, 1);
6558
6559        // Send request followed by DATAGRAM on client side.
6560        let (stream, req) = s.send_request(false).unwrap();
6561
6562        let body = s.send_body_client(stream, true).unwrap();
6563
6564        let mut recv_buf = vec![0; body.len()];
6565
6566        let ev_headers = Event::Headers {
6567            list: req,
6568            more_frames: true,
6569        };
6570
6571        s.send_dgram_client(0).unwrap();
6572
6573        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6574        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6575
6576        assert_eq!(s.poll_server(), Err(Error::Done));
6577
6578        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6579
6580        assert_eq!(s.poll_server(), Err(Error::Done));
6581
6582        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
6583        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6584        assert_eq!(s.poll_server(), Err(Error::Done));
6585
6586        // Send response followed by DATAGRAM on server side
6587        let resp = s.send_response(stream, false).unwrap();
6588
6589        let body = s.send_body_server(stream, true).unwrap();
6590
6591        let mut recv_buf = vec![0; body.len()];
6592
6593        let ev_headers = Event::Headers {
6594            list: resp,
6595            more_frames: true,
6596        };
6597
6598        s.send_dgram_server(0).unwrap();
6599
6600        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
6601        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
6602
6603        assert_eq!(s.poll_client(), Err(Error::Done));
6604
6605        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6606
6607        assert_eq!(s.poll_client(), Err(Error::Done));
6608
6609        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
6610
6611        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
6612        assert_eq!(s.poll_client(), Err(Error::Done));
6613    }
6614
6615    #[test]
6616    /// Send multiple DATAGRAMs and requests.
6617    fn poll_datagram_multi_read() {
6618        let mut buf = [0; 65535];
6619
6620        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6621        config
6622            .load_cert_chain_from_pem_file("examples/cert.crt")
6623            .unwrap();
6624        config
6625            .load_priv_key_from_pem_file("examples/cert.key")
6626            .unwrap();
6627        config.set_application_protos(&[b"h3"]).unwrap();
6628        config.set_initial_max_data(1500);
6629        config.set_initial_max_stream_data_bidi_local(150);
6630        config.set_initial_max_stream_data_bidi_remote(150);
6631        config.set_initial_max_stream_data_uni(150);
6632        config.set_initial_max_streams_bidi(100);
6633        config.set_initial_max_streams_uni(5);
6634        config.verify_peer(false);
6635        config.enable_dgram(true, 100, 100);
6636
6637        let h3_config = Config::new().unwrap();
6638        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6639        s.handshake().unwrap();
6640
6641        // 10 bytes on flow ID 0 and 2.
6642        let flow_0_result = (11, 0, 1);
6643        let flow_2_result = (11, 2, 1);
6644
6645        // Send requests followed by DATAGRAMs on client side.
6646        let (stream, req) = s.send_request(false).unwrap();
6647
6648        let body = s.send_body_client(stream, true).unwrap();
6649
6650        let mut recv_buf = vec![0; body.len()];
6651
6652        let ev_headers = Event::Headers {
6653            list: req,
6654            more_frames: true,
6655        };
6656
6657        s.send_dgram_client(0).unwrap();
6658        s.send_dgram_client(0).unwrap();
6659        s.send_dgram_client(0).unwrap();
6660        s.send_dgram_client(0).unwrap();
6661        s.send_dgram_client(0).unwrap();
6662        s.send_dgram_client(2).unwrap();
6663        s.send_dgram_client(2).unwrap();
6664        s.send_dgram_client(2).unwrap();
6665        s.send_dgram_client(2).unwrap();
6666        s.send_dgram_client(2).unwrap();
6667
6668        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6669        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6670
6671        assert_eq!(s.poll_server(), Err(Error::Done));
6672
6673        // Second cycle, start to read
6674        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6675        assert_eq!(s.poll_server(), Err(Error::Done));
6676        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6677        assert_eq!(s.poll_server(), Err(Error::Done));
6678        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6679        assert_eq!(s.poll_server(), Err(Error::Done));
6680
6681        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
6682        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6683
6684        assert_eq!(s.poll_server(), Err(Error::Done));
6685
6686        // Third cycle.
6687        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6688        assert_eq!(s.poll_server(), Err(Error::Done));
6689        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6690        assert_eq!(s.poll_server(), Err(Error::Done));
6691        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6692        assert_eq!(s.poll_server(), Err(Error::Done));
6693        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6694        assert_eq!(s.poll_server(), Err(Error::Done));
6695        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6696        assert_eq!(s.poll_server(), Err(Error::Done));
6697        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6698        assert_eq!(s.poll_server(), Err(Error::Done));
6699        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6700        assert_eq!(s.poll_server(), Err(Error::Done));
6701
6702        // Send response followed by DATAGRAM on server side
6703        let resp = s.send_response(stream, false).unwrap();
6704
6705        let body = s.send_body_server(stream, true).unwrap();
6706
6707        let mut recv_buf = vec![0; body.len()];
6708
6709        let ev_headers = Event::Headers {
6710            list: resp,
6711            more_frames: true,
6712        };
6713
6714        s.send_dgram_server(0).unwrap();
6715        s.send_dgram_server(0).unwrap();
6716        s.send_dgram_server(0).unwrap();
6717        s.send_dgram_server(0).unwrap();
6718        s.send_dgram_server(0).unwrap();
6719        s.send_dgram_server(2).unwrap();
6720        s.send_dgram_server(2).unwrap();
6721        s.send_dgram_server(2).unwrap();
6722        s.send_dgram_server(2).unwrap();
6723        s.send_dgram_server(2).unwrap();
6724
6725        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
6726        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
6727
6728        assert_eq!(s.poll_client(), Err(Error::Done));
6729
6730        // Second cycle, start to read
6731        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
6732        assert_eq!(s.poll_client(), Err(Error::Done));
6733        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
6734        assert_eq!(s.poll_client(), Err(Error::Done));
6735        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
6736        assert_eq!(s.poll_client(), Err(Error::Done));
6737
6738        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
6739        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
6740
6741        assert_eq!(s.poll_client(), Err(Error::Done));
6742
6743        // Third cycle.
6744        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
6745        assert_eq!(s.poll_client(), Err(Error::Done));
6746        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
6747        assert_eq!(s.poll_client(), Err(Error::Done));
6748        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
6749        assert_eq!(s.poll_client(), Err(Error::Done));
6750        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
6751        assert_eq!(s.poll_client(), Err(Error::Done));
6752        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
6753        assert_eq!(s.poll_client(), Err(Error::Done));
6754        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
6755        assert_eq!(s.poll_client(), Err(Error::Done));
6756        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
6757        assert_eq!(s.poll_client(), Err(Error::Done));
6758    }
6759
6760    #[test]
6761    /// Tests that the Finished event is not issued for streams of unknown type
6762    /// (e.g. GREASE).
6763    fn finished_is_for_requests() {
6764        let mut s = Session::new().unwrap();
6765        s.handshake().unwrap();
6766
6767        assert_eq!(s.poll_client(), Err(Error::Done));
6768        assert_eq!(s.poll_server(), Err(Error::Done));
6769
6770        assert_eq!(s.client.open_grease_stream(&mut s.pipe.client), Ok(()));
6771        assert_eq!(s.pipe.advance(), Ok(()));
6772
6773        assert_eq!(s.poll_client(), Err(Error::Done));
6774        assert_eq!(s.poll_server(), Err(Error::Done));
6775    }
6776
6777    #[test]
6778    /// Tests that streams are marked as finished only once.
6779    fn finished_once() {
6780        let mut s = Session::new().unwrap();
6781        s.handshake().unwrap();
6782
6783        let (stream, req) = s.send_request(false).unwrap();
6784        let body = s.send_body_client(stream, true).unwrap();
6785
6786        let mut recv_buf = vec![0; body.len()];
6787
6788        let ev_headers = Event::Headers {
6789            list: req,
6790            more_frames: true,
6791        };
6792
6793        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6794        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6795
6796        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
6797        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6798
6799        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Err(Error::Done));
6800        assert_eq!(s.poll_server(), Err(Error::Done));
6801    }
6802
    #[test]
    /// Tests that the Data event is properly re-armed.
    ///
    /// The test drives three requests through a mix of partial DATA frames,
    /// complete bodies and trailing HEADERS, and checks after every step that
    /// `poll_server()` reports a Data event once when new body data arrives
    /// and `Error::Done` until more arrives.
    fn data_event_rearm() {
        // Payload for the hand-crafted DATA frames; sent in two halves of 5.
        let bytes = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

        let mut s = Session::new().unwrap();
        s.handshake().unwrap();

        let (r1_id, r1_hdrs) = s.send_request(false).unwrap();

        // Receive buffer reused for the body reads on requests 1, 2 and 3.
        let mut recv_buf = vec![0; bytes.len()];

        let r1_ev_headers = Event::Headers {
            list: r1_hdrs,
            more_frames: true,
        };

        // Manually send an incomplete DATA frame (i.e. the frame size is longer
        // than the actual data sent).
        {
            // Scratch space for the frame header (type + length varints).
            let mut d = [42; 10];
            let mut b = octets::OctetsMut::with_slice(&mut d);

            b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
            b.put_varint(bytes.len() as u64).unwrap();
            // Only the bytes written so far make up the frame header.
            let off = b.off();
            s.pipe.client.stream_send(r1_id, &d[..off], false).unwrap();

            // Send just the first half of the announced payload.
            assert_eq!(
                s.pipe.client.stream_send(r1_id, &bytes[..5], false),
                Ok(5)
            );

            // The result of advancing the pipe is deliberately ignored.
            s.advance().ok();
        }

        // Headers first, then a single Data event for the partial payload.
        assert_eq!(s.poll_server(), Ok((r1_id, r1_ev_headers)));
        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
        assert_eq!(s.poll_server(), Err(Error::Done));

        // Read the available body data.
        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(5));

        // Send the remaining DATA payload.
        assert_eq!(s.pipe.client.stream_send(r1_id, &bytes[5..], false), Ok(5));
        s.advance().ok();

        // The Data event must fire again for the newly arrived bytes.
        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
        assert_eq!(s.poll_server(), Err(Error::Done));

        // Read the rest of the body data.
        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(5));
        assert_eq!(s.poll_server(), Err(Error::Done));

        // Send more data.
        let r1_body = s.send_body_client(r1_id, false).unwrap();

        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
        assert_eq!(s.poll_server(), Err(Error::Done));

        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(r1_body.len()));

        // Send a new request to ensure cross-stream events don't break rearming.
        let (r2_id, r2_hdrs) = s.send_request(false).unwrap();
        let r2_ev_headers = Event::Headers {
            list: r2_hdrs,
            more_frames: true,
        };
        let r2_body = s.send_body_client(r2_id, false).unwrap();

        s.advance().ok();

        assert_eq!(s.poll_server(), Ok((r2_id, r2_ev_headers)));
        assert_eq!(s.poll_server(), Ok((r2_id, Event::Data)));
        assert_eq!(s.recv_body_server(r2_id, &mut recv_buf), Ok(r2_body.len()));
        assert_eq!(s.poll_server(), Err(Error::Done));

        // Send more data on request 1, then trailing HEADERS.
        let r1_body = s.send_body_client(r1_id, false).unwrap();

        let trailers = vec![Header::new(b"hello", b"world")];

        // The last argument is `true` here, and the stream is expected to
        // report Finished below.
        s.client
            .send_headers(&mut s.pipe.client, r1_id, &trailers, true)
            .unwrap();

        let r1_ev_trailers = Event::Headers {
            list: trailers.clone(),
            more_frames: false,
        };

        s.advance().ok();

        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(r1_body.len()));

        // Trailers are delivered after the body, followed by Finished.
        assert_eq!(s.poll_server(), Ok((r1_id, r1_ev_trailers)));
        assert_eq!(s.poll_server(), Ok((r1_id, Event::Finished)));
        assert_eq!(s.poll_server(), Err(Error::Done));

        // Send more data on request 2, then trailing HEADERS.
        let r2_body = s.send_body_client(r2_id, false).unwrap();

        // Unlike request 1, the last argument is `false`, and no Finished
        // event is expected for request 2 below.
        s.client
            .send_headers(&mut s.pipe.client, r2_id, &trailers, false)
            .unwrap();

        let r2_ev_trailers = Event::Headers {
            list: trailers,
            more_frames: true,
        };

        s.advance().ok();

        assert_eq!(s.poll_server(), Ok((r2_id, Event::Data)));
        assert_eq!(s.recv_body_server(r2_id, &mut recv_buf), Ok(r2_body.len()));
        assert_eq!(s.poll_server(), Ok((r2_id, r2_ev_trailers)));
        assert_eq!(s.poll_server(), Err(Error::Done));

        let (r3_id, r3_hdrs) = s.send_request(false).unwrap();

        let r3_ev_headers = Event::Headers {
            list: r3_hdrs,
            more_frames: true,
        };

        // Manually send an incomplete DATA frame (i.e. only the header is sent).
        {
            let mut d = [42; 10];
            let mut b = octets::OctetsMut::with_slice(&mut d);

            b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
            b.put_varint(bytes.len() as u64).unwrap();
            let off = b.off();
            s.pipe.client.stream_send(r3_id, &d[..off], false).unwrap();

            s.advance().ok();
        }

        // A Data event is reported even though no payload has arrived yet...
        assert_eq!(s.poll_server(), Ok((r3_id, r3_ev_headers)));
        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
        assert_eq!(s.poll_server(), Err(Error::Done));

        // ...so the read finds nothing to return.
        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Err(Error::Done));

        // First half of the announced payload.
        assert_eq!(s.pipe.client.stream_send(r3_id, &bytes[..5], false), Ok(5));

        s.advance().ok();

        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
        assert_eq!(s.poll_server(), Err(Error::Done));

        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Ok(5));

        // Second half of the announced payload.
        assert_eq!(s.pipe.client.stream_send(r3_id, &bytes[5..], false), Ok(5));
        s.advance().ok();

        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
        assert_eq!(s.poll_server(), Err(Error::Done));

        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Ok(5));

        // Buffer multiple data frames.
        let body = s.send_body_client(r3_id, false).unwrap();
        s.send_body_client(r3_id, false).unwrap();
        s.send_body_client(r3_id, false).unwrap();

        // Only one Data event is reported for the three buffered bodies.
        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
        assert_eq!(s.poll_server(), Err(Error::Done));

        // Send a zero-length DATA frame; the final `true` passed to
        // `stream_send()` is presumably the fin flag -- confirm.
        {
            let mut d = [42; 10];
            let mut b = octets::OctetsMut::with_slice(&mut d);

            b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
            b.put_varint(0).unwrap();
            let off = b.off();
            s.pipe.client.stream_send(r3_id, &d[..off], true).unwrap();

            s.advance().ok();
        }

        // Larger buffer so that all three bodies can be read in one call
        // (assumes each body is at most `bytes.len()` long -- confirm).
        let mut recv_buf = vec![0; bytes.len() * 3];

        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Ok(body.len() * 3));
    }
6989
6990    #[test]
6991    /// Tests that the Datagram event is properly re-armed.
6992    fn dgram_event_rearm() {
6993        let mut buf = [0; 65535];
6994
6995        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6996        config
6997            .load_cert_chain_from_pem_file("examples/cert.crt")
6998            .unwrap();
6999        config
7000            .load_priv_key_from_pem_file("examples/cert.key")
7001            .unwrap();
7002        config.set_application_protos(&[b"h3"]).unwrap();
7003        config.set_initial_max_data(1500);
7004        config.set_initial_max_stream_data_bidi_local(150);
7005        config.set_initial_max_stream_data_bidi_remote(150);
7006        config.set_initial_max_stream_data_uni(150);
7007        config.set_initial_max_streams_bidi(100);
7008        config.set_initial_max_streams_uni(5);
7009        config.verify_peer(false);
7010        config.enable_dgram(true, 100, 100);
7011
7012        let h3_config = Config::new().unwrap();
7013        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
7014        s.handshake().unwrap();
7015
7016        // 10 bytes on flow ID 0 and 2.
7017        let flow_0_result = (11, 0, 1);
7018        let flow_2_result = (11, 2, 1);
7019
7020        // Send requests followed by DATAGRAMs on client side.
7021        let (stream, req) = s.send_request(false).unwrap();
7022
7023        let body = s.send_body_client(stream, true).unwrap();
7024
7025        let mut recv_buf = vec![0; body.len()];
7026
7027        let ev_headers = Event::Headers {
7028            list: req,
7029            more_frames: true,
7030        };
7031
7032        s.send_dgram_client(0).unwrap();
7033        s.send_dgram_client(0).unwrap();
7034        s.send_dgram_client(2).unwrap();
7035        s.send_dgram_client(2).unwrap();
7036
7037        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7038        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
7039
7040        assert_eq!(s.poll_server(), Err(Error::Done));
7041        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
7042
7043        assert_eq!(s.poll_server(), Err(Error::Done));
7044        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
7045
7046        assert_eq!(s.poll_server(), Err(Error::Done));
7047        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
7048
7049        assert_eq!(s.poll_server(), Err(Error::Done));
7050        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
7051
7052        assert_eq!(s.poll_server(), Err(Error::Done));
7053
7054        s.send_dgram_client(0).unwrap();
7055        s.send_dgram_client(2).unwrap();
7056
7057        assert_eq!(s.poll_server(), Err(Error::Done));
7058
7059        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
7060        assert_eq!(s.poll_server(), Err(Error::Done));
7061
7062        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
7063        assert_eq!(s.poll_server(), Err(Error::Done));
7064
7065        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
7066        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
7067    }
7068
7069    #[test]
7070    fn reset_stream() {
7071        let mut buf = [0; 65535];
7072
7073        let mut s = Session::new().unwrap();
7074        s.handshake().unwrap();
7075
7076        // Client sends request.
7077        let (stream, req) = s.send_request(false).unwrap();
7078
7079        let ev_headers = Event::Headers {
7080            list: req,
7081            more_frames: true,
7082        };
7083
7084        // Server sends response and closes stream.
7085        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7086        assert_eq!(s.poll_server(), Err(Error::Done));
7087
7088        let resp = s.send_response(stream, true).unwrap();
7089
7090        let ev_headers = Event::Headers {
7091            list: resp,
7092            more_frames: false,
7093        };
7094
7095        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
7096        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
7097        assert_eq!(s.poll_client(), Err(Error::Done));
7098
7099        // Client sends RESET_STREAM, closing stream.
7100        let frames = [crate::frame::Frame::ResetStream {
7101            stream_id: stream,
7102            error_code: 42,
7103            final_size: 68,
7104        }];
7105
7106        let pkt_type = crate::packet::Type::Short;
7107        assert_eq!(
7108            s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
7109            Ok(39)
7110        );
7111
7112        // Server issues Reset event for the stream.
7113        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(42))));
7114        assert_eq!(s.poll_server(), Err(Error::Done));
7115
7116        // Sending RESET_STREAM again shouldn't trigger another Reset event.
7117        assert_eq!(
7118            s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
7119            Ok(39)
7120        );
7121
7122        assert_eq!(s.poll_server(), Err(Error::Done));
7123    }
7124
7125    #[test]
7126    fn reset_finished_at_server() {
7127        let mut s = Session::new().unwrap();
7128        s.handshake().unwrap();
7129
7130        // Client sends HEADERS and doesn't fin
7131        let (stream, _req) = s.send_request(false).unwrap();
7132
7133        // ..then Client sends RESET_STREAM
7134        assert_eq!(
7135            s.pipe.client.stream_shutdown(0, crate::Shutdown::Write, 0),
7136            Ok(())
7137        );
7138
7139        assert_eq!(s.pipe.advance(), Ok(()));
7140
7141        // Server receives just a reset
7142        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(0))));
7143        assert_eq!(s.poll_server(), Err(Error::Done));
7144
7145        // Client sends HEADERS and fin
7146        let (stream, req) = s.send_request(true).unwrap();
7147
7148        // ..then Client sends RESET_STREAM
7149        assert_eq!(
7150            s.pipe.client.stream_shutdown(4, crate::Shutdown::Write, 0),
7151            Ok(())
7152        );
7153
7154        let ev_headers = Event::Headers {
7155            list: req,
7156            more_frames: false,
7157        };
7158
7159        // Server receives headers and fin.
7160        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7161        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
7162        assert_eq!(s.poll_server(), Err(Error::Done));
7163    }
7164
7165    #[test]
7166    fn reset_finished_at_server_with_data_pending() {
7167        let mut s = Session::new().unwrap();
7168        s.handshake().unwrap();
7169
7170        // Client sends HEADERS and doesn't fin.
7171        let (stream, req) = s.send_request(false).unwrap();
7172
7173        assert!(s.send_body_client(stream, false).is_ok());
7174
7175        assert_eq!(s.pipe.advance(), Ok(()));
7176
7177        let ev_headers = Event::Headers {
7178            list: req,
7179            more_frames: true,
7180        };
7181
7182        // Server receives headers and data...
7183        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7184        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
7185
7186        // ..then Client sends RESET_STREAM.
7187        assert_eq!(
7188            s.pipe
7189                .client
7190                .stream_shutdown(stream, crate::Shutdown::Write, 0),
7191            Ok(())
7192        );
7193
7194        assert_eq!(s.pipe.advance(), Ok(()));
7195
7196        // The server does *not* attempt to read from the stream,
7197        // but polls and receives the reset and there are no more
7198        // readable streams.
7199        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(0))));
7200        assert_eq!(s.poll_server(), Err(Error::Done));
7201        assert_eq!(s.pipe.server.readable().len(), 0);
7202    }
7203
7204    #[test]
7205    fn reset_finished_at_server_with_data_pending_2() {
7206        let mut s = Session::new().unwrap();
7207        s.handshake().unwrap();
7208
7209        // Client sends HEADERS and doesn't fin.
7210        let (stream, req) = s.send_request(false).unwrap();
7211
7212        assert!(s.send_body_client(stream, false).is_ok());
7213
7214        assert_eq!(s.pipe.advance(), Ok(()));
7215
7216        let ev_headers = Event::Headers {
7217            list: req,
7218            more_frames: true,
7219        };
7220
7221        // Server receives headers and data...
7222        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7223        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
7224
7225        // ..then Client sends RESET_STREAM.
7226        assert_eq!(
7227            s.pipe
7228                .client
7229                .stream_shutdown(stream, crate::Shutdown::Write, 0),
7230            Ok(())
7231        );
7232
7233        assert_eq!(s.pipe.advance(), Ok(()));
7234
7235        // Server reads from the stream and receives the reset while
7236        // attempting to read.
7237        assert_eq!(
7238            s.recv_body_server(stream, &mut [0; 100]),
7239            Err(Error::TransportError(crate::Error::StreamReset(0)))
7240        );
7241
7242        // No more events and there are no more readable streams.
7243        assert_eq!(s.poll_server(), Err(Error::Done));
7244        assert_eq!(s.pipe.server.readable().len(), 0);
7245    }
7246
    #[test]
    /// Tests what the client observes when a response stream is reset:
    ///
    /// * for a response sent without fin, the client gets only a Reset event;
    /// * for a response sent with fin, the client gets the headers followed
    ///   by Finished, even though a RESET_STREAM frame is injected afterwards.
    fn reset_finished_at_client() {
        // Scratch buffer used when injecting the hand-built packet below.
        let mut buf = [0; 65535];
        let mut s = Session::new().unwrap();
        s.handshake().unwrap();

        // Client sends HEADERS and doesn't fin
        let (stream, req) = s.send_request(false).unwrap();

        let ev_headers = Event::Headers {
            list: req,
            more_frames: true,
        };

        // Server receives headers.
        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
        assert_eq!(s.poll_server(), Err(Error::Done));

        // Server sends response and doesn't fin
        s.send_response(stream, false).unwrap();

        assert_eq!(s.pipe.advance(), Ok(()));

        // .. then Server sends RESET_STREAM
        assert_eq!(
            s.pipe
                .server
                .stream_shutdown(stream, crate::Shutdown::Write, 0),
            Ok(())
        );

        assert_eq!(s.pipe.advance(), Ok(()));

        // Client receives Reset only
        assert_eq!(s.poll_client(), Ok((stream, Event::Reset(0))));
        // NOTE(review): this polls the *server*, although the comment above
        // is about the client; possibly `poll_client()` was intended --
        // confirm before changing.
        assert_eq!(s.poll_server(), Err(Error::Done));

        // Client sends headers and fin.
        let (stream, req) = s.send_request(true).unwrap();

        let ev_headers = Event::Headers {
            list: req,
            more_frames: false,
        };

        // Server receives headers and fin.
        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
        assert_eq!(s.poll_server(), Err(Error::Done));

        // Server sends response and fin
        let resp = s.send_response(stream, true).unwrap();

        assert_eq!(s.pipe.advance(), Ok(()));

        // ..then Server sends RESET_STREAM
        // NOTE(review): the frame is injected with `send_pkt_to_server()`,
        // whose name suggests the packet is delivered *to* the server rather
        // than sent by it -- confirm that this matches the intent above.
        let frames = [crate::frame::Frame::ResetStream {
            stream_id: stream,
            error_code: 42,
            final_size: 68,
        }];

        let pkt_type = crate::packet::Type::Short;
        assert_eq!(
            s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
            Ok(39)
        );

        assert_eq!(s.pipe.advance(), Ok(()));

        let ev_headers = Event::Headers {
            list: resp,
            more_frames: false,
        };

        // Client receives headers and fin.
        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
        assert_eq!(s.poll_client(), Err(Error::Done));
    }
7327}
7328
// Submodule declarations.

// Only compiled when the `ffi` feature is enabled.
#[cfg(feature = "ffi")]
mod ffi;
// With the `internal` feature enabled, `frame` is exported publicly but
// kept out of the generated documentation...
#[cfg(feature = "internal")]
#[doc(hidden)]
pub mod frame;
// ...otherwise it stays private to this module.
#[cfg(not(feature = "internal"))]
mod frame;
// Always public, but kept out of the generated documentation.
#[doc(hidden)]
pub mod qpack;
// Private to this module.
mod stream;