quiche/h3/
mod.rs

1// Copyright (C) 2019, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! HTTP/3 wire protocol and QPACK implementation.
28//!
29//! This module provides a high level API for sending and receiving HTTP/3
30//! requests and responses on top of the QUIC transport protocol.
31//!
32//! ## Connection setup
33//!
34//! HTTP/3 connections require a QUIC transport-layer connection, see
35//! [Connection setup] for a full description of the setup process.
36//!
37//! To use HTTP/3, the QUIC connection must be configured with a suitable
38//! Application Layer Protocol Negotiation (ALPN) Protocol ID:
39//!
40//! ```
41//! let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
42//! config.set_application_protos(quiche::h3::APPLICATION_PROTOCOL)?;
43//! # Ok::<(), quiche::Error>(())
44//! ```
45//!
46//! The QUIC handshake is driven by [sending] and [receiving] QUIC packets.
47//!
48//! Once the handshake has completed, the first step in establishing an HTTP/3
49//! connection is creating its configuration object:
50//!
51//! ```
52//! let h3_config = quiche::h3::Config::new()?;
53//! # Ok::<(), quiche::h3::Error>(())
54//! ```
55//!
56//! HTTP/3 client and server connections are both created using the
57//! [`with_transport()`] function, the role is inferred from the type of QUIC
58//! connection:
59//!
60//! ```no_run
61//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
62//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
63//! # let peer = "127.0.0.1:1234".parse().unwrap();
64//! # let local = "127.0.0.1:4321".parse().unwrap();
65//! # let mut conn = quiche::accept(&scid, None, local, peer, &mut config).unwrap();
66//! # let h3_config = quiche::h3::Config::new()?;
67//! let h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
68//! # Ok::<(), quiche::h3::Error>(())
69//! ```
70//!
71//! ## Sending a request
72//!
73//! An HTTP/3 client can send a request by using the connection's
74//! [`send_request()`] method to queue request headers; [sending] QUIC packets
75//! causes the requests to get sent to the peer:
76//!
77//! ```no_run
78//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
79//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
80//! # let peer = "127.0.0.1:1234".parse().unwrap();
81//! # let local = "127.0.0.1:4321".parse().unwrap();
82//! # let mut conn = quiche::connect(None, &scid, local, peer, &mut config).unwrap();
83//! # let h3_config = quiche::h3::Config::new()?;
84//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
85//! let req = vec![
86//!     quiche::h3::Header::new(b":method", b"GET"),
87//!     quiche::h3::Header::new(b":scheme", b"https"),
88//!     quiche::h3::Header::new(b":authority", b"quic.tech"),
89//!     quiche::h3::Header::new(b":path", b"/"),
90//!     quiche::h3::Header::new(b"user-agent", b"quiche"),
91//! ];
92//!
93//! h3_conn.send_request(&mut conn, &req, true)?;
94//! # Ok::<(), quiche::h3::Error>(())
95//! ```
96//!
97//! An HTTP/3 client can send a request with additional body data by using
98//! the connection's [`send_body()`] method:
99//!
100//! ```no_run
101//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
102//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
103//! # let peer = "127.0.0.1:1234".parse().unwrap();
104//! # let local = "127.0.0.1:4321".parse().unwrap();
105//! # let mut conn = quiche::connect(None, &scid, local, peer, &mut config).unwrap();
106//! # let h3_config = quiche::h3::Config::new()?;
107//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
108//! let req = vec![
109//!     quiche::h3::Header::new(b":method", b"GET"),
110//!     quiche::h3::Header::new(b":scheme", b"https"),
111//!     quiche::h3::Header::new(b":authority", b"quic.tech"),
112//!     quiche::h3::Header::new(b":path", b"/"),
113//!     quiche::h3::Header::new(b"user-agent", b"quiche"),
114//! ];
115//!
116//! let stream_id = h3_conn.send_request(&mut conn, &req, false)?;
117//! h3_conn.send_body(&mut conn, stream_id, b"Hello World!", true)?;
118//! # Ok::<(), quiche::h3::Error>(())
119//! ```
120//!
121//! ## Handling requests and responses
122//!
123//! After [receiving] QUIC packets, HTTP/3 data is processed using the
124//! connection's [`poll()`] method. On success, this returns an [`Event`] object
125//! and an ID corresponding to the stream where the `Event` originated.
126//!
127//! An HTTP/3 server uses [`poll()`] to read requests and responds to them using
128//! [`send_response()`] and [`send_body()`]:
129//!
130//! ```no_run
131//! use quiche::h3::NameValue;
132//!
133//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
134//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
135//! # let peer = "127.0.0.1:1234".parse().unwrap();
136//! # let local = "127.0.0.1:1234".parse().unwrap();
137//! # let mut conn = quiche::accept(&scid, None, local, peer, &mut config).unwrap();
138//! # let h3_config = quiche::h3::Config::new()?;
139//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
140//! loop {
141//!     match h3_conn.poll(&mut conn) {
142//!         Ok((stream_id, quiche::h3::Event::Headers{list, more_frames})) => {
143//!             let mut headers = list.into_iter();
144//!
145//!             // Look for the request's method.
146//!             let method = headers.find(|h| h.name() == b":method").unwrap();
147//!
148//!             // Look for the request's path.
149//!             let path = headers.find(|h| h.name() == b":path").unwrap();
150//!
151//!             if method.value() == b"GET" && path.value() == b"/" {
152//!                 let resp = vec![
153//!                     quiche::h3::Header::new(b":status", 200.to_string().as_bytes()),
154//!                     quiche::h3::Header::new(b"server", b"quiche"),
155//!                 ];
156//!
157//!                 h3_conn.send_response(&mut conn, stream_id, &resp, false)?;
158//!                 h3_conn.send_body(&mut conn, stream_id, b"Hello World!", true)?;
159//!             }
160//!         },
161//!
162//!         Ok((stream_id, quiche::h3::Event::Data)) => {
163//!             // Request body data, handle it.
164//!             # return Ok(());
165//!         },
166//!
167//!         Ok((stream_id, quiche::h3::Event::Finished)) => {
168//!             // Peer terminated stream, handle it.
169//!         },
170//!
171//!         Ok((stream_id, quiche::h3::Event::Reset(err))) => {
172//!             // Peer reset the stream, handle it.
173//!         },
174//!
175//!         Ok((_flow_id, quiche::h3::Event::PriorityUpdate)) => (),
176//!
177//!         Ok((goaway_id, quiche::h3::Event::GoAway)) => {
178//!              // Peer signalled it is going away, handle it.
179//!         },
180//!
181//!         Err(quiche::h3::Error::Done) => {
182//!             // Done reading.
183//!             break;
184//!         },
185//!
186//!         Err(e) => {
187//!             // An error occurred, handle it.
188//!             break;
189//!         },
190//!     }
191//! }
192//! # Ok::<(), quiche::h3::Error>(())
193//! ```
194//!
195//! An HTTP/3 client uses [`poll()`] to read responses:
196//!
197//! ```no_run
198//! use quiche::h3::NameValue;
199//!
200//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
201//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
202//! # let peer = "127.0.0.1:1234".parse().unwrap();
203//! # let local = "127.0.0.1:1234".parse().unwrap();
204//! # let mut conn = quiche::connect(None, &scid, local, peer, &mut config).unwrap();
205//! # let h3_config = quiche::h3::Config::new()?;
206//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
207//! loop {
208//!     match h3_conn.poll(&mut conn) {
209//!         Ok((stream_id, quiche::h3::Event::Headers{list, more_frames})) => {
210//!             let status = list.iter().find(|h| h.name() == b":status").unwrap();
211//!             println!("Received {} response on stream {}",
212//!                      std::str::from_utf8(status.value()).unwrap(),
213//!                      stream_id);
214//!         },
215//!
216//!         Ok((stream_id, quiche::h3::Event::Data)) => {
217//!             let mut body = vec![0; 4096];
218//!
219//!             // Consume all body data received on the stream.
220//!             while let Ok(read) =
221//!                 h3_conn.recv_body(&mut conn, stream_id, &mut body)
222//!             {
223//!                 println!("Received {} bytes of payload on stream {}",
224//!                          read, stream_id);
225//!             }
226//!         },
227//!
228//!         Ok((stream_id, quiche::h3::Event::Finished)) => {
229//!             // Peer terminated stream, handle it.
230//!         },
231//!
232//!         Ok((stream_id, quiche::h3::Event::Reset(err))) => {
233//!             // Peer reset the stream, handle it.
234//!         },
235//!
236//!         Ok((_prioritized_element_id, quiche::h3::Event::PriorityUpdate)) => (),
237//!
238//!         Ok((goaway_id, quiche::h3::Event::GoAway)) => {
239//!              // Peer signalled it is going away, handle it.
240//!         },
241//!
242//!         Err(quiche::h3::Error::Done) => {
243//!             // Done reading.
244//!             break;
245//!         },
246//!
247//!         Err(e) => {
248//!             // An error occurred, handle it.
249//!             break;
250//!         },
251//!     }
252//! }
253//! # Ok::<(), quiche::h3::Error>(())
254//! ```
255//!
256//! ## Detecting end of request or response
257//!
258//! A single HTTP/3 request or response may consist of several HEADERS and DATA
259//! frames; it is finished when the QUIC stream is closed. Calling [`poll()`]
260//! repeatedly will generate an [`Event`] for each of these. The application may
//! use these events to do additional HTTP semantic validation.
262//!
263//! ## HTTP/3 protocol errors
264//!
265//! Quiche is responsible for managing the HTTP/3 connection, ensuring it is in
266//! a correct state and validating all messages received by a peer. This mainly
267//! takes place in the [`poll()`] method. If an HTTP/3 error occurs, quiche will
268//! close the connection and send an appropriate CONNECTION_CLOSE frame to the
269//! peer. An [`Error`] is returned to the application so that it can perform any
270//! required tidy up such as closing sockets.
271//!
272//! [`application_proto()`]: ../struct.Connection.html#method.application_proto
273//! [`stream_finished()`]: ../struct.Connection.html#method.stream_finished
274//! [Connection setup]: ../index.html#connection-setup
275//! [sending]: ../index.html#generating-outgoing-packets
276//! [receiving]: ../index.html#handling-incoming-packets
277//! [`with_transport()`]: struct.Connection.html#method.with_transport
278//! [`poll()`]: struct.Connection.html#method.poll
279//! [`Event`]: enum.Event.html
280//! [`Error`]: enum.Error.html
//! [`send_request()`]: struct.Connection.html#method.send_request
282//! [`send_response()`]: struct.Connection.html#method.send_response
283//! [`send_body()`]: struct.Connection.html#method.send_body
284
285use std::collections::HashSet;
286use std::collections::VecDeque;
287
288#[cfg(feature = "sfv")]
289use std::convert::TryFrom;
290use std::fmt;
291use std::fmt::Write;
292
293#[cfg(feature = "qlog")]
294use qlog::events::h3::H3FrameCreated;
295#[cfg(feature = "qlog")]
296use qlog::events::h3::H3FrameParsed;
297#[cfg(feature = "qlog")]
298use qlog::events::h3::H3Owner;
299#[cfg(feature = "qlog")]
300use qlog::events::h3::H3PriorityTargetStreamType;
301#[cfg(feature = "qlog")]
302use qlog::events::h3::H3StreamType;
303#[cfg(feature = "qlog")]
304use qlog::events::h3::H3StreamTypeSet;
305#[cfg(feature = "qlog")]
306use qlog::events::h3::Http3EventType;
307#[cfg(feature = "qlog")]
308use qlog::events::h3::Http3Frame;
309#[cfg(feature = "qlog")]
310use qlog::events::EventData;
311#[cfg(feature = "qlog")]
312use qlog::events::EventImportance;
313#[cfg(feature = "qlog")]
314use qlog::events::EventType;
315
316use crate::range_buf::BufFactory;
317use crate::BufSplit;
318
/// List of ALPN tokens of supported HTTP/3 versions.
///
/// This can be passed directly to the [`Config::set_application_protos()`]
/// method when implementing HTTP/3 applications. The list currently holds
/// the single token `h3`.
///
/// [`Config::set_application_protos()`]:
/// ../struct.Config.html#method.set_application_protos
pub const APPLICATION_PROTOCOL: &[&[u8]] = &[b"h3"];
327
// The offset used when converting HTTP/3 urgency to quiche urgency.
const PRIORITY_URGENCY_OFFSET: u8 = 124;

// Parameter values as specified in [Extensible Priorities].
//
// The bounds delimit the valid urgency range (0 through 7); the defaults are
// the values used by `Priority::default()`.
//
// [Extensible Priorities]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
const PRIORITY_URGENCY_LOWER_BOUND: u8 = 0;
const PRIORITY_URGENCY_UPPER_BOUND: u8 = 7;
const PRIORITY_URGENCY_DEFAULT: u8 = 3;
const PRIORITY_INCREMENTAL_DEFAULT: bool = false;
338
// qlog event types for HTTP/3 events; each wraps the matching
// `Http3EventType` variant. Only compiled when the `qlog` feature is enabled.
#[cfg(feature = "qlog")]
const QLOG_FRAME_CREATED: EventType =
    EventType::Http3EventType(Http3EventType::FrameCreated);
#[cfg(feature = "qlog")]
const QLOG_FRAME_PARSED: EventType =
    EventType::Http3EventType(Http3EventType::FrameParsed);
#[cfg(feature = "qlog")]
const QLOG_STREAM_TYPE_SET: EventType =
    EventType::Http3EventType(Http3EventType::StreamTypeSet);
348
/// A specialized [`Result`] type for quiche HTTP/3 operations.
///
/// This type is used throughout quiche's HTTP/3 public API for any operation
/// that can produce an error. The error type is always this module's
/// `Error`.
///
/// [`Result`]: https://doc.rust-lang.org/std/result/enum.Result.html
pub type Result<T> = std::result::Result<T, Error>;
356
/// An HTTP/3 error.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Error {
    /// There is no error or no work to do.
    Done,

    /// The provided buffer is too short.
    BufferTooShort,

    /// Internal error in the HTTP/3 stack.
    InternalError,

    /// Endpoint detected that the peer is exhibiting behavior that causes
    /// excessive load.
    ExcessiveLoad,

    /// A stream ID or push ID was used incorrectly, such as exceeding a
    /// limit, reducing a limit, or being reused.
    IdError,

    /// The endpoint detected that its peer created a stream that it will not
    /// accept.
    StreamCreationError,

    /// A required critical stream was closed.
    ClosedCriticalStream,

    /// No SETTINGS frame at beginning of control stream.
    MissingSettings,

    /// A frame was received which is not permitted in the current state.
    FrameUnexpected,

    /// Frame violated layout or size rules.
    FrameError,

    /// QPACK Header block decompression failure.
    QpackDecompressionFailed,

    /// Error originated from the transport layer.
    ///
    /// The wrapped value is the underlying QUIC error.
    TransportError(crate::Error),

    /// The underlying QUIC stream (or connection) doesn't have enough capacity
    /// for the operation to complete. The application should retry later on.
    StreamBlocked,

    /// Error in the payload of a SETTINGS frame.
    SettingsError,

    /// Server rejected request.
    RequestRejected,

    /// Request or its response cancelled.
    RequestCancelled,

    /// Client's request stream terminated without containing a fully formed
    /// request.
    RequestIncomplete,

    /// An HTTP message was malformed and cannot be processed.
    MessageError,

    /// The TCP connection established in response to a CONNECT request was
    /// reset or abnormally closed.
    ConnectError,

    /// The requested operation cannot be served over HTTP/3. Peer should retry
    /// over HTTP/1.1.
    VersionFallback,
}
428
/// HTTP/3 error codes sent on the wire.
///
/// As defined in [RFC9114](https://www.rfc-editor.org/rfc/rfc9114.html#http-error-codes).
///
/// Each variant's discriminant is the numeric value of the corresponding
/// error code, so a variant can be cast to an integer to obtain it.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum WireErrorCode {
    /// No error. This is used when the connection or stream needs to be closed,
    /// but there is no error to signal.
    NoError              = 0x100,
    /// Peer violated protocol requirements in a way that does not match a more
    /// specific error code or endpoint declines to use the more specific
    /// error code.
    GeneralProtocolError = 0x101,
    /// An internal error has occurred in the HTTP stack.
    InternalError        = 0x102,
    /// The endpoint detected that its peer created a stream that it will not
    /// accept.
    StreamCreationError  = 0x103,
    /// A stream required by the HTTP/3 connection was closed or reset.
    ClosedCriticalStream = 0x104,
    /// A frame was received that was not permitted in the current state or on
    /// the current stream.
    FrameUnexpected      = 0x105,
    /// A frame that fails to satisfy layout requirements or with an invalid
    /// size was received.
    FrameError           = 0x106,
    /// The endpoint detected that its peer is exhibiting a behavior that might
    /// be generating excessive load.
    ExcessiveLoad        = 0x107,
    /// A stream ID or push ID was used incorrectly, such as exceeding a limit,
    /// reducing a limit, or being reused.
    IdError              = 0x108,
    /// An endpoint detected an error in the payload of a SETTINGS frame.
    SettingsError        = 0x109,
    /// No SETTINGS frame was received at the beginning of the control stream.
    MissingSettings      = 0x10a,
    /// A server rejected a request without performing any application
    /// processing.
    RequestRejected      = 0x10b,
    /// The request or its response (including pushed response) is cancelled.
    RequestCancelled     = 0x10c,
    /// The client's stream terminated without containing a fully formed
    /// request.
    RequestIncomplete    = 0x10d,
    /// An HTTP message was malformed and cannot be processed.
    MessageError         = 0x10e,
    /// The TCP connection established in response to a CONNECT request was
    /// reset or abnormally closed.
    ConnectError         = 0x10f,
    /// The requested operation cannot be served over HTTP/3. The peer should
    /// retry over HTTP/1.1.
    VersionFallback      = 0x110,
}
481
482impl Error {
483    fn to_wire(self) -> u64 {
484        match self {
485            Error::Done => WireErrorCode::NoError as u64,
486            Error::InternalError => WireErrorCode::InternalError as u64,
487            Error::StreamCreationError =>
488                WireErrorCode::StreamCreationError as u64,
489            Error::ClosedCriticalStream =>
490                WireErrorCode::ClosedCriticalStream as u64,
491            Error::FrameUnexpected => WireErrorCode::FrameUnexpected as u64,
492            Error::FrameError => WireErrorCode::FrameError as u64,
493            Error::ExcessiveLoad => WireErrorCode::ExcessiveLoad as u64,
494            Error::IdError => WireErrorCode::IdError as u64,
495            Error::MissingSettings => WireErrorCode::MissingSettings as u64,
496            Error::QpackDecompressionFailed => 0x200,
497            Error::BufferTooShort => 0x999,
498            Error::TransportError { .. } | Error::StreamBlocked => 0xFF,
499            Error::SettingsError => WireErrorCode::SettingsError as u64,
500            Error::RequestRejected => WireErrorCode::RequestRejected as u64,
501            Error::RequestCancelled => WireErrorCode::RequestCancelled as u64,
502            Error::RequestIncomplete => WireErrorCode::RequestIncomplete as u64,
503            Error::MessageError => WireErrorCode::MessageError as u64,
504            Error::ConnectError => WireErrorCode::ConnectError as u64,
505            Error::VersionFallback => WireErrorCode::VersionFallback as u64,
506        }
507    }
508
509    #[cfg(feature = "ffi")]
510    fn to_c(self) -> libc::ssize_t {
511        match self {
512            Error::Done => -1,
513            Error::BufferTooShort => -2,
514            Error::InternalError => -3,
515            Error::ExcessiveLoad => -4,
516            Error::IdError => -5,
517            Error::StreamCreationError => -6,
518            Error::ClosedCriticalStream => -7,
519            Error::MissingSettings => -8,
520            Error::FrameUnexpected => -9,
521            Error::FrameError => -10,
522            Error::QpackDecompressionFailed => -11,
523            // -12 was previously used for TransportError, skip it
524            Error::StreamBlocked => -13,
525            Error::SettingsError => -14,
526            Error::RequestRejected => -15,
527            Error::RequestCancelled => -16,
528            Error::RequestIncomplete => -17,
529            Error::MessageError => -18,
530            Error::ConnectError => -19,
531            Error::VersionFallback => -20,
532
533            Error::TransportError(quic_error) => quic_error.to_c() - 1000,
534        }
535    }
536}
537
538impl std::fmt::Display for Error {
539    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
540        write!(f, "{self:?}")
541    }
542}
543
544impl std::error::Error for Error {
545    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
546        None
547    }
548}
549
550impl std::convert::From<super::Error> for Error {
551    fn from(err: super::Error) -> Self {
552        match err {
553            super::Error::Done => Error::Done,
554
555            _ => Error::TransportError(err),
556        }
557    }
558}
559
560impl std::convert::From<octets::BufferTooShortError> for Error {
561    fn from(_err: octets::BufferTooShortError) -> Self {
562        Error::BufferTooShort
563    }
564}
565
/// An HTTP/3 configuration.
///
/// Every field starts out as `None` and is only populated through the
/// corresponding setter method.
pub struct Config {
    /// Value for the `SETTINGS_MAX_FIELD_SECTION_SIZE` setting.
    max_field_section_size: Option<u64>,
    /// Value for the `SETTINGS_QPACK_MAX_TABLE_CAPACITY` setting.
    qpack_max_table_capacity: Option<u64>,
    /// Value for the `SETTINGS_QPACK_BLOCKED_STREAMS` setting.
    qpack_blocked_streams: Option<u64>,
    /// Value for the `SETTINGS_ENABLE_CONNECT_PROTOCOL` setting; `Some(1)`
    /// when extended CONNECT is enabled, `None` otherwise.
    connect_protocol_enabled: Option<u64>,
    /// additional settings are settings that are not part of the H3
    /// settings explicitly handled above
    additional_settings: Option<Vec<(u64, u64)>>,
}
576
577impl Config {
578    /// Creates a new configuration object with default settings.
579    pub const fn new() -> Result<Config> {
580        Ok(Config {
581            max_field_section_size: None,
582            qpack_max_table_capacity: None,
583            qpack_blocked_streams: None,
584            connect_protocol_enabled: None,
585            additional_settings: None,
586        })
587    }
588
589    /// Sets the `SETTINGS_MAX_FIELD_SECTION_SIZE` setting.
590    ///
591    /// By default no limit is enforced. When a request whose headers exceed
592    /// the limit set by the application is received, the call to the [`poll()`]
593    /// method will return the [`Error::ExcessiveLoad`] error, and the
594    /// connection will be closed.
595    ///
596    /// [`poll()`]: struct.Connection.html#method.poll
597    /// [`Error::ExcessiveLoad`]: enum.Error.html#variant.ExcessiveLoad
598    pub fn set_max_field_section_size(&mut self, v: u64) {
599        self.max_field_section_size = Some(v);
600    }
601
602    /// Sets the `SETTINGS_QPACK_MAX_TABLE_CAPACITY` setting.
603    ///
604    /// The default value is `0`.
605    pub fn set_qpack_max_table_capacity(&mut self, v: u64) {
606        self.qpack_max_table_capacity = Some(v);
607    }
608
609    /// Sets the `SETTINGS_QPACK_BLOCKED_STREAMS` setting.
610    ///
611    /// The default value is `0`.
612    pub fn set_qpack_blocked_streams(&mut self, v: u64) {
613        self.qpack_blocked_streams = Some(v);
614    }
615
616    /// Sets or omits the `SETTINGS_ENABLE_CONNECT_PROTOCOL` setting.
617    ///
618    /// The default value is `false`.
619    pub fn enable_extended_connect(&mut self, enabled: bool) {
620        if enabled {
621            self.connect_protocol_enabled = Some(1);
622        } else {
623            self.connect_protocol_enabled = None;
624        }
625    }
626
627    /// Sets additional HTTP/3 settings.
628    ///
629    /// The default value is no additional settings.
630    /// The `additional_settings` parameter must not the following
631    /// settings as they are already handled by this library:
632    ///
633    /// - SETTINGS_QPACK_MAX_TABLE_CAPACITY
634    /// - SETTINGS_MAX_FIELD_SECTION_SIZE
635    /// - SETTINGS_QPACK_BLOCKED_STREAMS
636    /// - SETTINGS_ENABLE_CONNECT_PROTOCOL
637    /// - SETTINGS_H3_DATAGRAM
638    ///
639    /// If such a setting is present in the `additional_settings`,
640    /// the method will return the [`Error::SettingsError`] error.
641    ///
642    /// If a setting identifier is present twice in `additional_settings`,
643    /// the method will return the [`Error::SettingsError`] error.
644    ///
645    /// [`Error::SettingsError`]: enum.Error.html#variant.SettingsError
646    pub fn set_additional_settings(
647        &mut self, additional_settings: Vec<(u64, u64)>,
648    ) -> Result<()> {
649        let explicit_quiche_settings = HashSet::from([
650            frame::SETTINGS_QPACK_MAX_TABLE_CAPACITY,
651            frame::SETTINGS_MAX_FIELD_SECTION_SIZE,
652            frame::SETTINGS_QPACK_BLOCKED_STREAMS,
653            frame::SETTINGS_ENABLE_CONNECT_PROTOCOL,
654            frame::SETTINGS_H3_DATAGRAM,
655            frame::SETTINGS_H3_DATAGRAM_00,
656        ]);
657
658        let dedup_settings: HashSet<u64> =
659            additional_settings.iter().map(|(key, _)| *key).collect();
660
661        if dedup_settings.len() != additional_settings.len() ||
662            !explicit_quiche_settings.is_disjoint(&dedup_settings)
663        {
664            return Err(Error::SettingsError);
665        }
666        self.additional_settings = Some(additional_settings);
667        Ok(())
668    }
669}
670
/// A trait for types with associated string name and value.
///
/// Both parts are exposed as raw bytes.
pub trait NameValue {
    /// Returns the object's name.
    fn name(&self) -> &[u8];

    /// Returns the object's value.
    fn value(&self) -> &[u8];
}
679
680impl<N, V> NameValue for (N, V)
681where
682    N: AsRef<[u8]>,
683    V: AsRef<[u8]>,
684{
685    fn name(&self) -> &[u8] {
686        self.0.as_ref()
687    }
688
689    fn value(&self) -> &[u8] {
690        self.1.as_ref()
691    }
692}
693
/// An owned name-value pair representing a raw HTTP header.
///
/// The first element holds the name bytes and the second the value bytes.
#[derive(Clone, PartialEq, Eq)]
pub struct Header(Vec<u8>, Vec<u8>);
697
/// Writes `hdr` to `f` in a human-readable form.
///
/// Valid UTF-8 input is written as text with `str::escape_default()`
/// escaping applied; anything else falls back to the `Debug` representation
/// of the raw byte slice.
fn try_print_as_readable(hdr: &[u8], f: &mut fmt::Formatter) -> fmt::Result {
    match std::str::from_utf8(hdr) {
        // Stream the escaped characters straight into the formatter instead
        // of collecting them into a temporary `String` first.
        Ok(s) => write!(f, "{}", s.escape_default()),
        Err(_) => write!(f, "{hdr:?}"),
    }
}
704
705impl fmt::Debug for Header {
706    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
707        f.write_char('"')?;
708        try_print_as_readable(&self.0, f)?;
709        f.write_str(": ")?;
710        try_print_as_readable(&self.1, f)?;
711        f.write_char('"')
712    }
713}
714
715impl Header {
716    /// Creates a new header.
717    ///
718    /// Both `name` and `value` will be cloned.
719    pub fn new(name: &[u8], value: &[u8]) -> Self {
720        Self(name.to_vec(), value.to_vec())
721    }
722}
723
724impl NameValue for Header {
725    fn name(&self) -> &[u8] {
726        &self.0
727    }
728
729    fn value(&self) -> &[u8] {
730        &self.1
731    }
732}
733
/// A non-owned name-value pair representing a raw HTTP header.
///
/// The first element borrows the name bytes and the second the value bytes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HeaderRef<'a>(&'a [u8], &'a [u8]);
737
738impl<'a> HeaderRef<'a> {
739    /// Creates a new header.
740    pub const fn new(name: &'a [u8], value: &'a [u8]) -> Self {
741        Self(name, value)
742    }
743}
744
745impl NameValue for HeaderRef<'_> {
746    fn name(&self) -> &[u8] {
747        self.0
748    }
749
750    fn value(&self) -> &[u8] {
751        self.1
752    }
753}
754
/// An HTTP/3 connection event.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// Request/response headers were received.
    Headers {
        /// The list of received header fields. The application should validate
        /// pseudo-headers and headers.
        list: Vec<Header>,

        /// Whether more frames will follow the headers on the stream.
        more_frames: bool,
    },

    /// Data was received.
    ///
    /// This indicates that the application can use the [`recv_body()`] method
    /// to retrieve the data from the stream.
    ///
    /// Note that [`recv_body()`] will need to be called repeatedly until the
    /// [`Done`] value is returned, as the event will not be re-armed until all
    /// buffered data is read.
    ///
    /// [`recv_body()`]: struct.Connection.html#method.recv_body
    /// [`Done`]: enum.Error.html#variant.Done
    Data,

    /// Stream was closed.
    Finished,

    /// Stream was reset.
    ///
    /// The associated data represents the error code sent by the peer.
    Reset(u64),

    /// PRIORITY_UPDATE was received.
    ///
    /// This indicates that the application can use the
    /// [`take_last_priority_update()`] method to take the last received
    /// PRIORITY_UPDATE for a specified stream.
    ///
    /// This event is triggered once per stream until the last PRIORITY_UPDATE
    /// is taken. It is recommended that applications defer taking the
    /// PRIORITY_UPDATE until after [`poll()`] returns [`Done`].
    ///
    /// [`take_last_priority_update()`]: struct.Connection.html#method.take_last_priority_update
    /// [`poll()`]: struct.Connection.html#method.poll
    /// [`Done`]: enum.Error.html#variant.Done
    PriorityUpdate,

    /// GOAWAY was received.
    GoAway,
}
807
/// Extensible Priorities parameters.
///
/// The `TryFrom` trait supports constructing this object from the serialized
/// Structured Fields Dictionary field value. I.e., use `TryFrom` to parse the
/// value of a Priority header field or a PRIORITY_UPDATE frame. Using this
/// trait requires the `sfv` feature to be enabled.
#[derive(Debug, PartialEq, Eq)]
#[repr(C)]
pub struct Priority {
    // The urgency parameter; defaults to `PRIORITY_URGENCY_DEFAULT`.
    urgency: u8,
    // The incremental parameter; defaults to `PRIORITY_INCREMENTAL_DEFAULT`.
    incremental: bool,
}
820
821impl Default for Priority {
822    fn default() -> Self {
823        Priority {
824            urgency: PRIORITY_URGENCY_DEFAULT,
825            incremental: PRIORITY_INCREMENTAL_DEFAULT,
826        }
827    }
828}
829
830impl Priority {
831    /// Creates a new Priority.
832    pub const fn new(urgency: u8, incremental: bool) -> Self {
833        Priority {
834            urgency,
835            incremental,
836        }
837    }
838}
839
#[cfg(feature = "sfv")]
#[cfg_attr(docsrs, doc(cfg(feature = "sfv")))]
impl TryFrom<&[u8]> for Priority {
    type Error = crate::h3::Error;

    /// Try to parse an Extensible Priority field value.
    ///
    /// The field value is expected to be a Structured Fields Dictionary; see
    /// [Extensible Priorities]. A value that cannot be parsed as a
    /// dictionary yields Error::Done.
    ///
    /// The `u` member must be an Integer item. Values outside of the valid
    /// range (0 through 7) are clamped to 7. Any other item type, or an
    /// inner list, yields Error::Done.
    ///
    /// The `i` member must be a Boolean item; an item of any other type
    /// yields Error::Done. An `i` member that is an inner list is treated
    /// the same as an omitted one.
    ///
    /// Omitted parameters will yield default values.
    ///
    /// [Extensible Priorities]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4
    fn try_from(value: &[u8]) -> std::result::Result<Self, Self::Error> {
        let dict =
            sfv::Parser::parse_dictionary(value).map_err(|_| Error::Done)?;

        let urgency = match dict.get("u") {
            // Omitted so use default value.
            None => PRIORITY_URGENCY_DEFAULT,

            // An inner list is never a valid urgency.
            Some(sfv::ListEntry::InnerList(_)) => return Err(Error::Done),

            // The item must be an Integer. A value outside of the spec's
            // allowed range (0 through 7) is an error, so it is mapped to
            // the upper bound (lowest priority) to avoid interference with
            // other streams.
            Some(sfv::ListEntry::Item(item)) => {
                let raw = item.bare_item.as_int().ok_or(Error::Done)?;

                let allowed = (PRIORITY_URGENCY_LOWER_BOUND as i64)..=
                    (PRIORITY_URGENCY_UPPER_BOUND as i64);

                if allowed.contains(&raw) {
                    // Range-checked above, so the narrowing cast is lossless.
                    raw as u8
                } else {
                    PRIORITY_URGENCY_UPPER_BOUND
                }
            },
        };

        let incremental =
            if let Some(sfv::ListEntry::Item(item)) = dict.get("i") {
                item.bare_item.as_bool().ok_or(Error::Done)?
            } else {
                // Omitted (or not an item) so use default value.
                false
            };

        Ok(Priority::new(urgency, incremental))
    }
}
905
/// HTTP/3 settings of one endpoint.
///
/// `Connection` keeps one instance for the locally configured values and
/// one for the peer. A field is `None` when no value is set.
struct ConnectionSettings {
    /// Maximum field section size setting.
    pub max_field_section_size: Option<u64>,
    /// QPACK maximum table capacity setting.
    pub qpack_max_table_capacity: Option<u64>,
    /// QPACK blocked streams setting.
    pub qpack_blocked_streams: Option<u64>,
    /// Extended CONNECT setting; `Some(1)` means it is enabled.
    pub connect_protocol_enabled: Option<u64>,
    /// HTTP/3 DATAGRAM setting; `Some(1)` means it is enabled.
    pub h3_datagram: Option<u64>,
    /// Additional settings as `(identifier, value)` pairs.
    pub additional_settings: Option<Vec<(u64, u64)>>,
    /// Presumably the settings as seen on the wire, as `(identifier, value)`
    /// pairs -- TODO confirm against the code that fills it.
    pub raw: Option<Vec<(u64, u64)>>,
}
915
/// IDs and byte counters of the QPACK encoder and decoder streams for one
/// direction (local or peer).
#[derive(Default)]
struct QpackStreams {
    /// ID of the QPACK encoder stream, if known.
    pub encoder_stream_id: Option<u64>,
    /// Byte counter for the QPACK encoder stream.
    pub encoder_stream_bytes: u64,
    /// ID of the QPACK decoder stream, if known.
    pub decoder_stream_id: Option<u64>,
    /// Byte counter for the QPACK decoder stream.
    pub decoder_stream_bytes: u64,
}
923
/// Statistics about the connection.
///
/// A connection's statistics can be collected using the [`stats()`] method.
///
/// The type implements `Debug` so that collected statistics can be logged
/// directly.
///
/// [`stats()`]: struct.Connection.html#method.stats
#[derive(Clone, Debug, Default)]
pub struct Stats {
    /// The number of bytes received on the QPACK encoder stream.
    pub qpack_encoder_stream_recv_bytes: u64,
    /// The number of bytes received on the QPACK decoder stream.
    pub qpack_decoder_stream_recv_bytes: u64,
}
936
937fn close_conn_critical_stream<F: BufFactory>(
938    conn: &mut super::Connection<F>,
939) -> Result<()> {
940    conn.close(
941        true,
942        Error::ClosedCriticalStream.to_wire(),
943        b"Critical stream closed.",
944    )?;
945
946    Err(Error::ClosedCriticalStream)
947}
948
949fn close_conn_if_critical_stream_finished<F: BufFactory>(
950    conn: &mut super::Connection<F>, stream_id: u64,
951) -> Result<()> {
952    if conn.stream_finished(stream_id) {
953        close_conn_critical_stream(conn)?;
954    }
955
956    Ok(())
957}
958
959/// An HTTP/3 connection.
960pub struct Connection {
961    is_server: bool,
962
963    next_request_stream_id: u64,
964    next_uni_stream_id: u64,
965
966    streams: crate::stream::StreamIdHashMap<stream::Stream>,
967
968    local_settings: ConnectionSettings,
969    peer_settings: ConnectionSettings,
970
971    control_stream_id: Option<u64>,
972    peer_control_stream_id: Option<u64>,
973
974    qpack_encoder: qpack::Encoder,
975    qpack_decoder: qpack::Decoder,
976
977    local_qpack_streams: QpackStreams,
978    peer_qpack_streams: QpackStreams,
979
980    max_push_id: u64,
981
982    finished_streams: VecDeque<u64>,
983
984    frames_greased: bool,
985
986    local_goaway_id: Option<u64>,
987    peer_goaway_id: Option<u64>,
988}
989
990impl Connection {
991    fn new(
992        config: &Config, is_server: bool, enable_dgram: bool,
993    ) -> Result<Connection> {
994        let initial_uni_stream_id = if is_server { 0x3 } else { 0x2 };
995        let h3_datagram = if enable_dgram { Some(1) } else { None };
996
997        Ok(Connection {
998            is_server,
999
1000            next_request_stream_id: 0,
1001
1002            next_uni_stream_id: initial_uni_stream_id,
1003
1004            streams: Default::default(),
1005
1006            local_settings: ConnectionSettings {
1007                max_field_section_size: config.max_field_section_size,
1008                qpack_max_table_capacity: config.qpack_max_table_capacity,
1009                qpack_blocked_streams: config.qpack_blocked_streams,
1010                connect_protocol_enabled: config.connect_protocol_enabled,
1011                h3_datagram,
1012                additional_settings: config.additional_settings.clone(),
1013                raw: Default::default(),
1014            },
1015
1016            peer_settings: ConnectionSettings {
1017                max_field_section_size: None,
1018                qpack_max_table_capacity: None,
1019                qpack_blocked_streams: None,
1020                h3_datagram: None,
1021                connect_protocol_enabled: None,
1022                additional_settings: Default::default(),
1023                raw: Default::default(),
1024            },
1025
1026            control_stream_id: None,
1027            peer_control_stream_id: None,
1028
1029            qpack_encoder: qpack::Encoder::new(),
1030            qpack_decoder: qpack::Decoder::new(),
1031
1032            local_qpack_streams: Default::default(),
1033            peer_qpack_streams: Default::default(),
1034
1035            max_push_id: 0,
1036
1037            finished_streams: VecDeque::new(),
1038
1039            frames_greased: false,
1040
1041            local_goaway_id: None,
1042            peer_goaway_id: None,
1043        })
1044    }
1045
1046    /// Creates a new HTTP/3 connection using the provided QUIC connection.
1047    ///
1048    /// This will also initiate the HTTP/3 handshake with the peer by opening
1049    /// all control streams (including QPACK) and sending the local settings.
1050    ///
1051    /// On success the new connection is returned.
1052    ///
1053    /// The [`StreamLimit`] error is returned when the HTTP/3 control stream
1054    /// cannot be created due to stream limits.
1055    ///
1056    /// The [`InternalError`] error is returned when either the underlying QUIC
1057    /// connection is not in a suitable state, or the HTTP/3 control stream
1058    /// cannot be created due to flow control limits.
1059    ///
1060    /// [`StreamLimit`]: ../enum.Error.html#variant.StreamLimit
1061    /// [`InternalError`]: ../enum.Error.html#variant.InternalError
1062    pub fn with_transport<F: BufFactory>(
1063        conn: &mut super::Connection<F>, config: &Config,
1064    ) -> Result<Connection> {
1065        let is_client = !conn.is_server;
1066        if is_client && !(conn.is_established() || conn.is_in_early_data()) {
1067            trace!("{} QUIC connection must be established or in early data before creating an HTTP/3 connection", conn.trace_id());
1068            return Err(Error::InternalError);
1069        }
1070
1071        let mut http3_conn =
1072            Connection::new(config, conn.is_server, conn.dgram_enabled())?;
1073
1074        match http3_conn.send_settings(conn) {
1075            Ok(_) => (),
1076
1077            Err(e) => {
1078                conn.close(true, e.to_wire(), b"Error opening control stream")?;
1079                return Err(e);
1080            },
1081        };
1082
1083        // Try opening QPACK streams, but ignore errors if it fails since we
1084        // don't need them right now.
1085        http3_conn.open_qpack_encoder_stream(conn).ok();
1086        http3_conn.open_qpack_decoder_stream(conn).ok();
1087
1088        if conn.grease {
1089            // Try opening a GREASE stream, but ignore errors since it's not
1090            // critical.
1091            http3_conn.open_grease_stream(conn).ok();
1092        }
1093
1094        Ok(http3_conn)
1095    }
1096
1097    /// Sends an HTTP/3 request.
1098    ///
1099    /// The request is encoded from the provided list of headers without a
1100    /// body, and sent on a newly allocated stream. To include a body,
1101    /// set `fin` as `false` and subsequently call [`send_body()`] with the
1102    /// same `conn` and the `stream_id` returned from this method.
1103    ///
1104    /// On success the newly allocated stream ID is returned.
1105    ///
1106    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1107    /// doesn't have enough capacity for the operation to complete. When this
1108    /// happens the application should retry the operation once the stream is
1109    /// reported as writable again.
1110    ///
1111    /// [`send_body()`]: struct.Connection.html#method.send_body
1112    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1113    pub fn send_request<T: NameValue, F: BufFactory>(
1114        &mut self, conn: &mut super::Connection<F>, headers: &[T], fin: bool,
1115    ) -> Result<u64> {
1116        // If we received a GOAWAY from the peer, MUST NOT initiate new
1117        // requests.
1118        if self.peer_goaway_id.is_some() {
1119            return Err(Error::FrameUnexpected);
1120        }
1121
1122        let stream_id = self.next_request_stream_id;
1123
1124        self.streams
1125            .insert(stream_id, <stream::Stream>::new(stream_id, true));
1126
1127        // The underlying QUIC stream does not exist yet, so calls to e.g.
1128        // stream_capacity() will fail. By writing a 0-length buffer, we force
1129        // the creation of the QUIC stream state, without actually writing
1130        // anything.
1131        if let Err(e) = conn.stream_send(stream_id, b"", false) {
1132            self.streams.remove(&stream_id);
1133
1134            if e == super::Error::Done {
1135                return Err(Error::StreamBlocked);
1136            }
1137
1138            return Err(e.into());
1139        };
1140
1141        self.send_headers(conn, stream_id, headers, fin)?;
1142
1143        // To avoid skipping stream IDs, we only calculate the next available
1144        // stream ID when a request has been successfully buffered.
1145        self.next_request_stream_id = self
1146            .next_request_stream_id
1147            .checked_add(4)
1148            .ok_or(Error::IdError)?;
1149
1150        Ok(stream_id)
1151    }
1152
1153    /// Sends an HTTP/3 response on the specified stream with default priority.
1154    ///
1155    /// This method sends the provided `headers` as a single initial response
1156    /// without a body.
1157    ///
1158    /// To send a non-final 1xx, then a final 200+ without body:
1159    ///   * send_response() with `fin` set to `false`.
1160    ///   * [`send_additional_headers()`] with fin set to `true` using the same
1161    ///     `stream_id` value.
1162    ///
1163    /// To send a non-final 1xx, then a final 200+ with body:
1164    ///   * send_response() with `fin` set to `false`.
1165    ///   * [`send_additional_headers()`] with fin set to `false` and same
1166    ///     `stream_id` value.
1167    ///   * [`send_body()`] with same `stream_id`.
1168    ///
1169    /// To send a final 200+ with body:
1170    ///   * send_response() with `fin` set to `false`.
1171    ///   * [`send_body()`] with same `stream_id`.
1172    ///
1173    /// Additional headers can only be sent during certain phases of an HTTP/3
1174    /// message exchange, see [Section 4.1 of RFC 9114]. The [`FrameUnexpected`]
1175    /// error is returned if this method, or [`send_response_with_priority()`],
1176    /// are called multiple times with the same `stream_id` value.
1177    ///
1178    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1179    /// doesn't have enough capacity for the operation to complete. When this
1180    /// happens the application should retry the operation once the stream is
1181    /// reported as writable again.
1182    ///
1183    /// [`send_body()`]: struct.Connection.html#method.send_body
1184    /// [`send_additional_headers()`]:
1185    ///     struct.Connection.html#method.send_additional_headers
1186    /// [`send_response_with_priority()`]:
1187    ///     struct.Connection.html#method.send_response_with_priority
1188    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
1189    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1190    pub fn send_response<T: NameValue, F: BufFactory>(
1191        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1192        headers: &[T], fin: bool,
1193    ) -> Result<()> {
1194        let priority = Default::default();
1195
1196        self.send_response_with_priority(
1197            conn, stream_id, headers, &priority, fin,
1198        )?;
1199
1200        Ok(())
1201    }
1202
1203    /// Sends an HTTP/3 response on the specified stream with specified
1204    /// priority.
1205    ///
1206    /// The `priority` parameter represents [Extensible Priority]
1207    /// parameters. If the urgency is outside the range 0-7, it will be clamped
1208    /// to 7.
1209    ///
1210    /// This method sends the provided `headers` as a single initial response
1211    /// without a body.
1212    ///
1213    /// To send a non-final 1xx, then a final 200+ without body:
1214    ///   * send_response_with_priority() with `fin` set to `false`.
1215    ///   * [`send_additional_headers()`] with fin set to `true` using the same
1216    ///     `stream_id` value.
1217    ///
1218    /// To send a non-final 1xx, then a final 200+ with body:
1219    ///   * send_response_with_priority() with `fin` set to `false`.
1220    ///   * [`send_additional_headers()`] with fin set to `false` and same
1221    ///     `stream_id` value.
1222    ///   * [`send_body()`] with same `stream_id`.
1223    ///
1224    /// To send a final 200+ with body:
1225    ///   * send_response_with_priority() with `fin` set to `false`.
1226    ///   * [`send_body()`] with same `stream_id`.
1227    ///
1228    /// Additional headers can only be sent during certain phases of an HTTP/3
1229    /// message exchange, see [Section 4.1 of RFC 9114]. The [`FrameUnexpected`]
1230    /// error is returned if this method, or [`send_response()`],
1231    /// are called multiple times with the same `stream_id` value.
1232    ///
1233    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1234    /// doesn't have enough capacity for the operation to complete. When this
1235    /// happens the application should retry the operation once the stream is
1236    /// reported as writable again.
1237    ///
1238    /// [`send_body()`]: struct.Connection.html#method.send_body
1239    /// [`send_additional_headers()`]:
1240    ///     struct.Connection.html#method.send_additional_headers
1241    /// [`send_response()`]:
1242    ///     struct.Connection.html#method.send_response
1243    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
1244    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1245    /// [Extensible Priority]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
1246    pub fn send_response_with_priority<T: NameValue, F: BufFactory>(
1247        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1248        headers: &[T], priority: &Priority, fin: bool,
1249    ) -> Result<()> {
1250        match self.streams.get(&stream_id) {
1251            Some(s) => {
1252                // Only one initial HEADERS allowed.
1253                if s.local_initialized() {
1254                    return Err(Error::FrameUnexpected);
1255                }
1256
1257                s
1258            },
1259
1260            None => return Err(Error::FrameUnexpected),
1261        };
1262
1263        self.send_headers(conn, stream_id, headers, fin)?;
1264
1265        // Clamp and shift urgency into quiche-priority space
1266        let urgency = priority
1267            .urgency
1268            .clamp(PRIORITY_URGENCY_LOWER_BOUND, PRIORITY_URGENCY_UPPER_BOUND) +
1269            PRIORITY_URGENCY_OFFSET;
1270
1271        conn.stream_priority(stream_id, urgency, priority.incremental)?;
1272
1273        Ok(())
1274    }
1275
1276    /// Sends additional HTTP/3 headers.
1277    ///
1278    /// After the initial request or response headers have been sent, using
1279    /// [`send_request()`] or [`send_response()`] respectively, this method can
1280    /// be used send an additional HEADERS frame. For example, to send a single
1281    /// instance of trailers after a request with a body, or to issue another
1282    /// non-final 1xx after a preceding 1xx, or to issue a final response after
1283    /// a preceding 1xx.
1284    ///
1285    /// Additional headers can only be sent during certain phases of an HTTP/3
1286    /// message exchange, see [Section 4.1 of RFC 9114]. The [`FrameUnexpected`]
1287    /// error is returned when this method is called during the wrong phase,
1288    /// such as before initial headers have been sent, or if trailers have
1289    /// already been sent.
1290    ///
1291    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1292    /// doesn't have enough capacity for the operation to complete. When this
1293    /// happens the application should retry the operation once the stream is
1294    /// reported as writable again.
1295    ///
1296    /// [`send_request()`]: struct.Connection.html#method.send_request
1297    /// [`send_response()`]: struct.Connection.html#method.send_response
1298    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1299    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
1300    /// [Section 4.1 of RFC 9114]:
1301    ///     https://www.rfc-editor.org/rfc/rfc9114.html#section-4.1.
1302    pub fn send_additional_headers<T: NameValue, F: BufFactory>(
1303        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1304        headers: &[T], is_trailer_section: bool, fin: bool,
1305    ) -> Result<()> {
1306        // Clients can only send trailer headers.
1307        if !self.is_server && !is_trailer_section {
1308            return Err(Error::FrameUnexpected);
1309        }
1310
1311        match self.streams.get(&stream_id) {
1312            Some(s) => {
1313                // Initial HEADERS must have been sent.
1314                if !s.local_initialized() {
1315                    return Err(Error::FrameUnexpected);
1316                }
1317
1318                // Only one trailing HEADERS allowed.
1319                if s.trailers_sent() {
1320                    return Err(Error::FrameUnexpected);
1321                }
1322
1323                s
1324            },
1325
1326            None => return Err(Error::FrameUnexpected),
1327        };
1328
1329        self.send_headers(conn, stream_id, headers, fin)?;
1330
1331        if is_trailer_section {
1332            // send_headers() might have tidied the stream away, so we need to
1333            // check again.
1334            if let Some(s) = self.streams.get_mut(&stream_id) {
1335                s.mark_trailers_sent();
1336            }
1337        }
1338
1339        Ok(())
1340    }
1341
1342    fn encode_header_block<T: NameValue>(
1343        &mut self, headers: &[T],
1344    ) -> Result<Vec<u8>> {
1345        let headers_len = headers
1346            .iter()
1347            .fold(0, |acc, h| acc + h.value().len() + h.name().len() + 32);
1348
1349        let mut header_block = vec![0; headers_len];
1350        let len = self
1351            .qpack_encoder
1352            .encode(headers, &mut header_block)
1353            .map_err(|_| Error::InternalError)?;
1354
1355        header_block.truncate(len);
1356
1357        Ok(header_block)
1358    }
1359
1360    fn send_headers<T: NameValue, F: BufFactory>(
1361        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1362        headers: &[T], fin: bool,
1363    ) -> Result<()> {
1364        let mut d = [42; 10];
1365        let mut b = octets::OctetsMut::with_slice(&mut d);
1366
1367        if !self.frames_greased && conn.grease {
1368            self.send_grease_frames(conn, stream_id)?;
1369            self.frames_greased = true;
1370        }
1371
1372        let header_block = self.encode_header_block(headers)?;
1373
1374        let overhead = octets::varint_len(frame::HEADERS_FRAME_TYPE_ID) +
1375            octets::varint_len(header_block.len() as u64);
1376
1377        // Headers need to be sent atomically, so make sure the stream has
1378        // enough capacity.
1379        match conn.stream_writable(stream_id, overhead + header_block.len()) {
1380            Ok(true) => (),
1381
1382            Ok(false) => return Err(Error::StreamBlocked),
1383
1384            Err(e) => {
1385                if conn.stream_finished(stream_id) {
1386                    self.streams.remove(&stream_id);
1387                }
1388
1389                return Err(e.into());
1390            },
1391        };
1392
1393        b.put_varint(frame::HEADERS_FRAME_TYPE_ID)?;
1394        b.put_varint(header_block.len() as u64)?;
1395        let off = b.off();
1396        conn.stream_send(stream_id, &d[..off], false)?;
1397
1398        // Sending header block separately avoids unnecessary copy.
1399        conn.stream_send(stream_id, &header_block, fin)?;
1400
1401        trace!(
1402            "{} tx frm HEADERS stream={} len={} fin={}",
1403            conn.trace_id(),
1404            stream_id,
1405            header_block.len(),
1406            fin
1407        );
1408
1409        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1410            let qlog_headers = headers
1411                .iter()
1412                .map(|h| qlog::events::h3::HttpHeader {
1413                    name: String::from_utf8_lossy(h.name()).into_owned(),
1414                    value: String::from_utf8_lossy(h.value()).into_owned(),
1415                })
1416                .collect();
1417
1418            let frame = Http3Frame::Headers {
1419                headers: qlog_headers,
1420            };
1421            let ev_data = EventData::H3FrameCreated(H3FrameCreated {
1422                stream_id,
1423                length: Some(header_block.len() as u64),
1424                frame,
1425                ..Default::default()
1426            });
1427
1428            q.add_event_data_now(ev_data).ok();
1429        });
1430
1431        if let Some(s) = self.streams.get_mut(&stream_id) {
1432            s.initialize_local();
1433        }
1434
1435        if fin && conn.stream_finished(stream_id) {
1436            self.streams.remove(&stream_id);
1437        }
1438
1439        Ok(())
1440    }
1441
1442    /// Sends an HTTP/3 body chunk on the given stream.
1443    ///
1444    /// On success the number of bytes written is returned, or [`Done`] if no
1445    /// bytes could be written (e.g. because the stream is blocked).
1446    ///
1447    /// Note that the number of written bytes returned can be lower than the
1448    /// length of the input buffer when the underlying QUIC stream doesn't have
1449    /// enough capacity for the operation to complete.
1450    ///
1451    /// When a partial write happens (including when [`Done`] is returned) the
1452    /// application should retry the operation once the stream is reported as
1453    /// writable again.
1454    ///
1455    /// [`Done`]: enum.Error.html#variant.Done
1456    pub fn send_body<F: BufFactory>(
1457        &mut self, conn: &mut super::Connection<F>, stream_id: u64, body: &[u8],
1458        fin: bool,
1459    ) -> Result<usize> {
1460        self.do_send_body(
1461            conn,
1462            stream_id,
1463            body,
1464            fin,
1465            |conn: &mut super::Connection<F>,
1466             header: &[u8],
1467             stream_id: u64,
1468             body: &[u8],
1469             body_len: usize,
1470             fin: bool| {
1471                conn.stream_send(stream_id, header, false)?;
1472                Ok(conn
1473                    .stream_send(stream_id, &body[..body_len], fin)
1474                    .map(|v| (v, v))?)
1475            },
1476        )
1477    }
1478
1479    /// Sends an HTTP/3 body chunk provided as a raw buffer on the given stream.
1480    ///
1481    /// If the capacity allows it the buffer will be appended to the stream's
1482    /// send queue with zero copying.
1483    ///
1484    /// On success the number of bytes written is returned, or [`Done`] if no
1485    /// bytes could be written (e.g. because the stream is blocked).
1486    ///
1487    /// Note that the number of written bytes returned can be lower than the
1488    /// length of the input buffer when the underlying QUIC stream doesn't have
1489    /// enough capacity for the operation to complete.
1490    ///
1491    /// When a partial write happens (including when [`Done`] is returned) the
1492    /// remaining (unwrittent) buffer will also be returned. The application
1493    /// should retry the operation once the stream is reported as writable
1494    /// again.
1495    ///
1496    /// [`Done`]: enum.Error.html#variant.Done
1497    pub fn send_body_zc<F>(
1498        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1499        body: &mut F::Buf, fin: bool,
1500    ) -> Result<usize>
1501    where
1502        F: BufFactory,
1503        F::Buf: BufSplit,
1504    {
1505        self.do_send_body(
1506            conn,
1507            stream_id,
1508            body,
1509            fin,
1510            |conn: &mut super::Connection<F>,
1511             header: &[u8],
1512             stream_id: u64,
1513             body: &mut F::Buf,
1514             mut body_len: usize,
1515             fin: bool| {
1516                let with_prefix = body.try_add_prefix(header);
1517                if !with_prefix {
1518                    conn.stream_send(stream_id, header, false)?;
1519                } else {
1520                    body_len += header.len();
1521                }
1522
1523                let (mut n, rem) = conn.stream_send_zc(
1524                    stream_id,
1525                    body.clone(),
1526                    Some(body_len),
1527                    fin,
1528                )?;
1529
1530                if with_prefix {
1531                    n -= header.len();
1532                }
1533
1534                if let Some(rem) = rem {
1535                    let _ = std::mem::replace(body, rem);
1536                }
1537
1538                Ok((n, n))
1539            },
1540        )
1541    }
1542
1543    fn do_send_body<F, B, R, SND>(
1544        &mut self, conn: &mut super::Connection<F>, stream_id: u64, body: B,
1545        fin: bool, write_fn: SND,
1546    ) -> Result<R>
1547    where
1548        F: BufFactory,
1549        B: AsRef<[u8]>,
1550        SND: FnOnce(
1551            &mut super::Connection<F>,
1552            &[u8],
1553            u64,
1554            B,
1555            usize,
1556            bool,
1557        ) -> Result<(usize, R)>,
1558    {
1559        let mut d = [42; 10];
1560        let mut b = octets::OctetsMut::with_slice(&mut d);
1561
1562        let len = body.as_ref().len();
1563
1564        // Validate that it is sane to send data on the stream.
1565        if stream_id % 4 != 0 {
1566            return Err(Error::FrameUnexpected);
1567        }
1568
1569        match self.streams.get_mut(&stream_id) {
1570            Some(s) => {
1571                if !s.local_initialized() {
1572                    return Err(Error::FrameUnexpected);
1573                }
1574
1575                if s.trailers_sent() {
1576                    return Err(Error::FrameUnexpected);
1577                }
1578            },
1579
1580            None => {
1581                return Err(Error::FrameUnexpected);
1582            },
1583        };
1584
1585        // Avoid sending 0-length DATA frames when the fin flag is false.
1586        if len == 0 && !fin {
1587            return Err(Error::Done);
1588        }
1589
1590        let overhead = octets::varint_len(frame::DATA_FRAME_TYPE_ID) +
1591            octets::varint_len(len as u64);
1592
1593        let stream_cap = match conn.stream_capacity(stream_id) {
1594            Ok(v) => v,
1595
1596            Err(e) => {
1597                if conn.stream_finished(stream_id) {
1598                    self.streams.remove(&stream_id);
1599                }
1600
1601                return Err(e.into());
1602            },
1603        };
1604
1605        // Make sure there is enough capacity to send the DATA frame header.
1606        if stream_cap < overhead {
1607            let _ = conn.stream_writable(stream_id, overhead + 1);
1608            return Err(Error::Done);
1609        }
1610
1611        // Cap the frame payload length to the stream's capacity.
1612        let body_len = std::cmp::min(len, stream_cap - overhead);
1613
1614        // If we can't send the entire body, set the fin flag to false so the
1615        // application can try again later.
1616        let fin = if body_len != len { false } else { fin };
1617
1618        // Again, avoid sending 0-length DATA frames when the fin flag is false.
1619        if body_len == 0 && !fin {
1620            let _ = conn.stream_writable(stream_id, overhead + 1);
1621            return Err(Error::Done);
1622        }
1623
1624        b.put_varint(frame::DATA_FRAME_TYPE_ID)?;
1625        b.put_varint(body_len as u64)?;
1626        let off = b.off();
1627
1628        // Return how many bytes were written, excluding the frame header.
1629        // Sending body separately avoids unnecessary copy.
1630        let (written, ret) =
1631            write_fn(conn, &d[..off], stream_id, body, body_len, fin)?;
1632
1633        trace!(
1634            "{} tx frm DATA stream={} len={} fin={}",
1635            conn.trace_id(),
1636            stream_id,
1637            written,
1638            fin
1639        );
1640
1641        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1642            let frame = Http3Frame::Data { raw: None };
1643            let ev_data = EventData::H3FrameCreated(H3FrameCreated {
1644                stream_id,
1645                length: Some(written as u64),
1646                frame,
1647                ..Default::default()
1648            });
1649
1650            q.add_event_data_now(ev_data).ok();
1651        });
1652
1653        if written < len {
1654            // Ensure the peer is notified that the connection or stream is
1655            // blocked when the stream's capacity is limited by flow control.
1656            //
1657            // We only need enough capacity to send a few bytes, to make sure
1658            // the stream doesn't hang due to congestion window not growing
1659            // enough.
1660            let _ = conn.stream_writable(stream_id, overhead + 1);
1661        }
1662
1663        if fin && written == len && conn.stream_finished(stream_id) {
1664            self.streams.remove(&stream_id);
1665        }
1666
1667        Ok(ret)
1668    }
1669
1670    /// Returns whether the peer enabled HTTP/3 DATAGRAM frame support.
1671    ///
1672    /// Support is signalled by the peer's SETTINGS, so this method always
1673    /// returns false until they have been processed using the [`poll()`]
1674    /// method.
1675    ///
1676    /// [`poll()`]: struct.Connection.html#method.poll
1677    pub fn dgram_enabled_by_peer<F: BufFactory>(
1678        &self, conn: &super::Connection<F>,
1679    ) -> bool {
1680        self.peer_settings.h3_datagram == Some(1) &&
1681            conn.dgram_max_writable_len().is_some()
1682    }
1683
1684    /// Returns whether the peer enabled extended CONNECT support.
1685    ///
1686    /// Support is signalled by the peer's SETTINGS, so this method always
1687    /// returns false until they have been processed using the [`poll()`]
1688    /// method.
1689    ///
1690    /// [`poll()`]: struct.Connection.html#method.poll
1691    pub fn extended_connect_enabled_by_peer(&self) -> bool {
1692        self.peer_settings.connect_protocol_enabled == Some(1)
1693    }
1694
1695    /// Reads request or response body data into the provided buffer.
1696    ///
1697    /// Applications should call this method whenever the [`poll()`] method
1698    /// returns a [`Data`] event.
1699    ///
1700    /// On success the amount of bytes read is returned, or [`Done`] if there
1701    /// is no data to read.
1702    ///
1703    /// [`poll()`]: struct.Connection.html#method.poll
1704    /// [`Data`]: enum.Event.html#variant.Data
1705    /// [`Done`]: enum.Error.html#variant.Done
1706    pub fn recv_body<F: BufFactory>(
1707        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1708        out: &mut [u8],
1709    ) -> Result<usize> {
1710        let mut total = 0;
1711
1712        // Try to consume all buffered data for the stream, even across multiple
1713        // DATA frames.
1714        while total < out.len() {
1715            let stream = self.streams.get_mut(&stream_id).ok_or(Error::Done)?;
1716
1717            if stream.state() != stream::State::Data {
1718                break;
1719            }
1720
1721            let (read, fin) =
1722                match stream.try_consume_data(conn, &mut out[total..]) {
1723                    Ok(v) => v,
1724
1725                    Err(Error::Done) => break,
1726
1727                    Err(e) => return Err(e),
1728                };
1729
1730            total += read;
1731
1732            // No more data to read, we are done.
1733            if read == 0 || fin {
1734                break;
1735            }
1736
1737            // Process incoming data from the stream. For example, if a whole
1738            // DATA frame was consumed, and another one is queued behind it,
1739            // this will ensure the additional data will also be returned to
1740            // the application.
1741            match self.process_readable_stream(conn, stream_id, false) {
1742                Ok(_) => unreachable!(),
1743
1744                Err(Error::Done) => (),
1745
1746                Err(e) => return Err(e),
1747            };
1748
1749            if conn.stream_finished(stream_id) {
1750                break;
1751            }
1752        }
1753
1754        // While body is being received, the stream is marked as finished only
1755        // when all data is read by the application.
1756        if conn.stream_finished(stream_id) {
1757            self.process_finished_stream(stream_id);
1758        }
1759
1760        if total == 0 {
1761            return Err(Error::Done);
1762        }
1763
1764        Ok(total)
1765    }
1766
1767    /// Sends a PRIORITY_UPDATE frame on the control stream with specified
1768    /// request stream ID and priority.
1769    ///
1770    /// The `priority` parameter represents [Extensible Priority]
1771    /// parameters. If the urgency is outside the range 0-7, it will be clamped
1772    /// to 7.
1773    ///
1774    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1775    /// doesn't have enough capacity for the operation to complete. When this
1776    /// happens the application should retry the operation once the stream is
1777    /// reported as writable again.
1778    ///
1779    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1780    /// [Extensible Priority]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
1781    pub fn send_priority_update_for_request<F: BufFactory>(
1782        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1783        priority: &Priority,
1784    ) -> Result<()> {
1785        let mut d = [42; 20];
1786        let mut b = octets::OctetsMut::with_slice(&mut d);
1787
1788        // Validate that it is sane to send PRIORITY_UPDATE.
1789        if self.is_server {
1790            return Err(Error::FrameUnexpected);
1791        }
1792
1793        if stream_id % 4 != 0 {
1794            return Err(Error::FrameUnexpected);
1795        }
1796
1797        let control_stream_id =
1798            self.control_stream_id.ok_or(Error::FrameUnexpected)?;
1799
1800        let urgency = priority
1801            .urgency
1802            .clamp(PRIORITY_URGENCY_LOWER_BOUND, PRIORITY_URGENCY_UPPER_BOUND);
1803
1804        let mut field_value = format!("u={urgency}");
1805
1806        if priority.incremental {
1807            field_value.push_str(",i");
1808        }
1809
1810        let priority_field_value = field_value.as_bytes();
1811        let frame_payload_len =
1812            octets::varint_len(stream_id) + priority_field_value.len();
1813
1814        let overhead =
1815            octets::varint_len(frame::PRIORITY_UPDATE_FRAME_REQUEST_TYPE_ID) +
1816                octets::varint_len(stream_id) +
1817                octets::varint_len(frame_payload_len as u64);
1818
1819        // Make sure the control stream has enough capacity.
1820        match conn.stream_writable(
1821            control_stream_id,
1822            overhead + priority_field_value.len(),
1823        ) {
1824            Ok(true) => (),
1825
1826            Ok(false) => return Err(Error::StreamBlocked),
1827
1828            Err(e) => {
1829                return Err(e.into());
1830            },
1831        }
1832
1833        b.put_varint(frame::PRIORITY_UPDATE_FRAME_REQUEST_TYPE_ID)?;
1834        b.put_varint(frame_payload_len as u64)?;
1835        b.put_varint(stream_id)?;
1836        let off = b.off();
1837        conn.stream_send(control_stream_id, &d[..off], false)?;
1838
1839        // Sending field value separately avoids unnecessary copy.
1840        conn.stream_send(control_stream_id, priority_field_value, false)?;
1841
1842        trace!(
1843            "{} tx frm PRIORITY_UPDATE request_stream={} priority_field_value={}",
1844            conn.trace_id(),
1845            stream_id,
1846            field_value,
1847        );
1848
1849        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1850            let frame = Http3Frame::PriorityUpdate {
1851                target_stream_type: H3PriorityTargetStreamType::Request,
1852                prioritized_element_id: stream_id,
1853                priority_field_value: field_value.clone(),
1854            };
1855
1856            let ev_data = EventData::H3FrameCreated(H3FrameCreated {
1857                stream_id,
1858                length: Some(priority_field_value.len() as u64),
1859                frame,
1860                ..Default::default()
1861            });
1862
1863            q.add_event_data_now(ev_data).ok();
1864        });
1865
1866        Ok(())
1867    }
1868
1869    /// Take the last PRIORITY_UPDATE for a prioritized element ID.
1870    ///
1871    /// When the [`poll()`] method returns a [`PriorityUpdate`] event for a
1872    /// prioritized element, the event has triggered and will not rearm until
1873    /// applications call this method. It is recommended that applications defer
1874    /// taking the PRIORITY_UPDATE until after [`poll()`] returns [`Done`].
1875    ///
1876    /// On success the Priority Field Value is returned, or [`Done`] if there is
1877    /// no PRIORITY_UPDATE to read (either because there is no value to take, or
1878    /// because the prioritized element does not exist).
1879    ///
1880    /// [`poll()`]: struct.Connection.html#method.poll
1881    /// [`PriorityUpdate`]: enum.Event.html#variant.PriorityUpdate
1882    /// [`Done`]: enum.Error.html#variant.Done
1883    pub fn take_last_priority_update(
1884        &mut self, prioritized_element_id: u64,
1885    ) -> Result<Vec<u8>> {
1886        if let Some(stream) = self.streams.get_mut(&prioritized_element_id) {
1887            return stream.take_last_priority_update().ok_or(Error::Done);
1888        }
1889
1890        Err(Error::Done)
1891    }
1892
1893    /// Processes HTTP/3 data received from the peer.
1894    ///
1895    /// On success it returns an [`Event`] and an ID, or [`Done`] when there are
1896    /// no events to report.
1897    ///
1898    /// Note that all events are edge-triggered, meaning that once reported they
1899    /// will not be reported again by calling this method again, until the event
1900    /// is re-armed.
1901    ///
1902    /// The events [`Headers`], [`Data`] and [`Finished`] return a stream ID,
1903    /// which is used in methods [`recv_body()`], [`send_response()`] or
1904    /// [`send_body()`].
1905    ///
1906    /// The event [`GoAway`] returns an ID that depends on the connection role.
1907    /// A client receives the largest processed stream ID. A server receives the
1908    /// the largest permitted push ID.
1909    ///
1910    /// The event [`PriorityUpdate`] only occurs at servers. It returns a
1911    /// prioritized element ID that is used in the method
1912    /// [`take_last_priority_update()`], which rearms the event for that ID.
1913    ///
1914    /// If an error occurs while processing data, the connection is closed with
1915    /// the appropriate error code, using the transport's [`close()`] method.
1916    ///
1917    /// [`Event`]: enum.Event.html
1918    /// [`Done`]: enum.Error.html#variant.Done
1919    /// [`Headers`]: enum.Event.html#variant.Headers
1920    /// [`Data`]: enum.Event.html#variant.Data
1921    /// [`Finished`]: enum.Event.html#variant.Finished
1922    /// [`GoAway`]: enum.Event.html#variant.GoAWay
1923    /// [`PriorityUpdate`]: enum.Event.html#variant.PriorityUpdate
1924    /// [`recv_body()`]: struct.Connection.html#method.recv_body
1925    /// [`send_response()`]: struct.Connection.html#method.send_response
1926    /// [`send_body()`]: struct.Connection.html#method.send_body
1927    /// [`recv_dgram()`]: struct.Connection.html#method.recv_dgram
1928    /// [`take_last_priority_update()`]: struct.Connection.html#method.take_last_priority_update
1929    /// [`close()`]: ../struct.Connection.html#method.close
1930    pub fn poll<F: BufFactory>(
1931        &mut self, conn: &mut super::Connection<F>,
1932    ) -> Result<(u64, Event)> {
1933        // When connection close is initiated by the local application (e.g. due
1934        // to a protocol error), the connection itself might be in a broken
1935        // state, so return early.
1936        if conn.local_error.is_some() {
1937            return Err(Error::Done);
1938        }
1939
1940        // Process control streams first.
1941        if let Some(stream_id) = self.peer_control_stream_id {
1942            match self.process_control_stream(conn, stream_id) {
1943                Ok(ev) => return Ok(ev),
1944
1945                Err(Error::Done) => (),
1946
1947                Err(e) => return Err(e),
1948            };
1949        }
1950
1951        if let Some(stream_id) = self.peer_qpack_streams.encoder_stream_id {
1952            match self.process_control_stream(conn, stream_id) {
1953                Ok(ev) => return Ok(ev),
1954
1955                Err(Error::Done) => (),
1956
1957                Err(e) => return Err(e),
1958            };
1959        }
1960
1961        if let Some(stream_id) = self.peer_qpack_streams.decoder_stream_id {
1962            match self.process_control_stream(conn, stream_id) {
1963                Ok(ev) => return Ok(ev),
1964
1965                Err(Error::Done) => (),
1966
1967                Err(e) => return Err(e),
1968            };
1969        }
1970
1971        // Process finished streams list.
1972        if let Some(finished) = self.finished_streams.pop_front() {
1973            return Ok((finished, Event::Finished));
1974        }
1975
1976        // Process HTTP/3 data from readable streams.
1977        for s in conn.readable() {
1978            trace!("{} stream id {} is readable", conn.trace_id(), s);
1979
1980            let ev = match self.process_readable_stream(conn, s, true) {
1981                Ok(v) => Some(v),
1982
1983                Err(Error::Done) => None,
1984
1985                // Return early if the stream was reset, to avoid returning
1986                // a Finished event later as well.
1987                Err(Error::TransportError(crate::Error::StreamReset(e))) =>
1988                    return Ok((s, Event::Reset(e))),
1989
1990                Err(e) => return Err(e),
1991            };
1992
1993            if conn.stream_finished(s) {
1994                self.process_finished_stream(s);
1995            }
1996
1997            // TODO: check if stream is completed so it can be freed
1998            if let Some(ev) = ev {
1999                return Ok(ev);
2000            }
2001        }
2002
2003        // Process finished streams list once again, to make sure `Finished`
2004        // events are returned when receiving empty stream frames with the fin
2005        // flag set.
2006        if let Some(finished) = self.finished_streams.pop_front() {
2007            if conn.stream_readable(finished) {
2008                // The stream is finished, but is still readable, it may
2009                // indicate that there is a pending error, such as reset.
2010                if let Err(crate::Error::StreamReset(e)) =
2011                    conn.stream_recv(finished, &mut [])
2012                {
2013                    return Ok((finished, Event::Reset(e)));
2014                }
2015            }
2016            return Ok((finished, Event::Finished));
2017        }
2018
2019        Err(Error::Done)
2020    }
2021
2022    /// Sends a GOAWAY frame to initiate graceful connection closure.
2023    ///
2024    /// When quiche is used in the server role, the `id` parameter is the stream
2025    /// ID of the highest processed request. This can be any valid ID between 0
2026    /// and 2^62-4. However, the ID cannot be increased. Failure to satisfy
2027    /// these conditions will return an error.
2028    ///
2029    /// This method does not close the QUIC connection. Applications are
2030    /// required to call [`close()`] themselves.
2031    ///
2032    /// [`close()`]: ../struct.Connection.html#method.close
2033    pub fn send_goaway<F: BufFactory>(
2034        &mut self, conn: &mut super::Connection<F>, id: u64,
2035    ) -> Result<()> {
2036        let mut id = id;
2037
2038        // TODO: server push
2039        //
2040        // In the meantime always send 0 from client.
2041        if !self.is_server {
2042            id = 0;
2043        }
2044
2045        if self.is_server && id % 4 != 0 {
2046            return Err(Error::IdError);
2047        }
2048
2049        if let Some(sent_id) = self.local_goaway_id {
2050            if id > sent_id {
2051                return Err(Error::IdError);
2052            }
2053        }
2054
2055        if let Some(stream_id) = self.control_stream_id {
2056            let mut d = [42; 10];
2057            let mut b = octets::OctetsMut::with_slice(&mut d);
2058
2059            let frame = frame::Frame::GoAway { id };
2060
2061            let wire_len = frame.to_bytes(&mut b)?;
2062            let stream_cap = conn.stream_capacity(stream_id)?;
2063
2064            if stream_cap < wire_len {
2065                return Err(Error::StreamBlocked);
2066            }
2067
2068            trace!("{} tx frm {:?}", conn.trace_id(), frame);
2069
2070            qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2071                let ev_data = EventData::H3FrameCreated(H3FrameCreated {
2072                    stream_id,
2073                    length: Some(octets::varint_len(id) as u64),
2074                    frame: frame.to_qlog(),
2075                    ..Default::default()
2076                });
2077
2078                q.add_event_data_now(ev_data).ok();
2079            });
2080
2081            let off = b.off();
2082            conn.stream_send(stream_id, &d[..off], false)?;
2083
2084            self.local_goaway_id = Some(id);
2085        }
2086
2087        Ok(())
2088    }
2089
2090    /// Gets the raw settings from peer including unknown and reserved types.
2091    ///
2092    /// The order of settings is the same as received in the SETTINGS frame.
2093    pub fn peer_settings_raw(&self) -> Option<&[(u64, u64)]> {
2094        self.peer_settings.raw.as_deref()
2095    }
2096
2097    fn open_uni_stream<F: BufFactory>(
2098        &mut self, conn: &mut super::Connection<F>, ty: u64,
2099    ) -> Result<u64> {
2100        let stream_id = self.next_uni_stream_id;
2101
2102        let mut d = [0; 8];
2103        let mut b = octets::OctetsMut::with_slice(&mut d);
2104
2105        match ty {
2106            // Control and QPACK streams are the most important to schedule.
2107            stream::HTTP3_CONTROL_STREAM_TYPE_ID |
2108            stream::QPACK_ENCODER_STREAM_TYPE_ID |
2109            stream::QPACK_DECODER_STREAM_TYPE_ID => {
2110                conn.stream_priority(stream_id, 0, false)?;
2111            },
2112
2113            // TODO: Server push
2114            stream::HTTP3_PUSH_STREAM_TYPE_ID => (),
2115
2116            // Anything else is a GREASE stream, so make it the least important.
2117            _ => {
2118                conn.stream_priority(stream_id, 255, false)?;
2119            },
2120        }
2121
2122        conn.stream_send(stream_id, b.put_varint(ty)?, false)?;
2123
2124        // To avoid skipping stream IDs, we only calculate the next available
2125        // stream ID when data has been successfully buffered.
2126        self.next_uni_stream_id = self
2127            .next_uni_stream_id
2128            .checked_add(4)
2129            .ok_or(Error::IdError)?;
2130
2131        Ok(stream_id)
2132    }
2133
2134    fn open_qpack_encoder_stream<F: BufFactory>(
2135        &mut self, conn: &mut super::Connection<F>,
2136    ) -> Result<()> {
2137        let stream_id =
2138            self.open_uni_stream(conn, stream::QPACK_ENCODER_STREAM_TYPE_ID)?;
2139
2140        self.local_qpack_streams.encoder_stream_id = Some(stream_id);
2141
2142        qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2143            let ev_data = EventData::H3StreamTypeSet(H3StreamTypeSet {
2144                stream_id,
2145                owner: Some(H3Owner::Local),
2146                stream_type: H3StreamType::QpackEncode,
2147                ..Default::default()
2148            });
2149
2150            q.add_event_data_now(ev_data).ok();
2151        });
2152
2153        Ok(())
2154    }
2155
2156    fn open_qpack_decoder_stream<F: BufFactory>(
2157        &mut self, conn: &mut super::Connection<F>,
2158    ) -> Result<()> {
2159        let stream_id =
2160            self.open_uni_stream(conn, stream::QPACK_DECODER_STREAM_TYPE_ID)?;
2161
2162        self.local_qpack_streams.decoder_stream_id = Some(stream_id);
2163
2164        qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2165            let ev_data = EventData::H3StreamTypeSet(H3StreamTypeSet {
2166                stream_id,
2167                owner: Some(H3Owner::Local),
2168                stream_type: H3StreamType::QpackDecode,
2169                ..Default::default()
2170            });
2171
2172            q.add_event_data_now(ev_data).ok();
2173        });
2174
2175        Ok(())
2176    }
2177
2178    /// Send GREASE frames on the provided stream ID.
2179    fn send_grease_frames<F: BufFactory>(
2180        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
2181    ) -> Result<()> {
2182        let mut d = [0; 8];
2183
2184        let stream_cap = match conn.stream_capacity(stream_id) {
2185            Ok(v) => v,
2186
2187            Err(e) => {
2188                if conn.stream_finished(stream_id) {
2189                    self.streams.remove(&stream_id);
2190                }
2191
2192                return Err(e.into());
2193            },
2194        };
2195
2196        let grease_frame1 = grease_value();
2197        let grease_frame2 = grease_value();
2198        let grease_payload = b"GREASE is the word";
2199
2200        let overhead = octets::varint_len(grease_frame1) + // frame type
2201            1 + // payload len
2202            octets::varint_len(grease_frame2) + // frame type
2203            1 + // payload len
2204            grease_payload.len(); // payload
2205
2206        // Don't send GREASE if there is not enough capacity for it. Greasing
2207        // will _not_ be attempted again later on.
2208        if stream_cap < overhead {
2209            return Ok(());
2210        }
2211
2212        // Empty GREASE frame.
2213        let mut b = octets::OctetsMut::with_slice(&mut d);
2214        conn.stream_send(stream_id, b.put_varint(grease_frame1)?, false)?;
2215
2216        let mut b = octets::OctetsMut::with_slice(&mut d);
2217        conn.stream_send(stream_id, b.put_varint(0)?, false)?;
2218
2219        trace!(
2220            "{} tx frm GREASE stream={} len=0",
2221            conn.trace_id(),
2222            stream_id
2223        );
2224
2225        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2226            let frame = Http3Frame::Reserved { length: Some(0) };
2227            let ev_data = EventData::H3FrameCreated(H3FrameCreated {
2228                stream_id,
2229                length: Some(0),
2230                frame,
2231                ..Default::default()
2232            });
2233
2234            q.add_event_data_now(ev_data).ok();
2235        });
2236
2237        // GREASE frame with payload.
2238        let mut b = octets::OctetsMut::with_slice(&mut d);
2239        conn.stream_send(stream_id, b.put_varint(grease_frame2)?, false)?;
2240
2241        let mut b = octets::OctetsMut::with_slice(&mut d);
2242        conn.stream_send(stream_id, b.put_varint(18)?, false)?;
2243
2244        conn.stream_send(stream_id, grease_payload, false)?;
2245
2246        trace!(
2247            "{} tx frm GREASE stream={} len={}",
2248            conn.trace_id(),
2249            stream_id,
2250            grease_payload.len()
2251        );
2252
2253        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2254            let frame = Http3Frame::Reserved {
2255                length: Some(grease_payload.len() as u64),
2256            };
2257            let ev_data = EventData::H3FrameCreated(H3FrameCreated {
2258                stream_id,
2259                length: Some(grease_payload.len() as u64),
2260                frame,
2261                ..Default::default()
2262            });
2263
2264            q.add_event_data_now(ev_data).ok();
2265        });
2266
2267        Ok(())
2268    }
2269
2270    /// Opens a new unidirectional stream with a GREASE type and sends some
2271    /// unframed payload.
2272    fn open_grease_stream<F: BufFactory>(
2273        &mut self, conn: &mut super::Connection<F>,
2274    ) -> Result<()> {
2275        let ty = grease_value();
2276        match self.open_uni_stream(conn, ty) {
2277            Ok(stream_id) => {
2278                conn.stream_send(stream_id, b"GREASE is the word", true)?;
2279
2280                trace!("{} open GREASE stream {}", conn.trace_id(), stream_id);
2281
2282                qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2283                    let ev_data = EventData::H3StreamTypeSet(H3StreamTypeSet {
2284                        stream_id,
2285                        owner: Some(H3Owner::Local),
2286                        stream_type: H3StreamType::Unknown,
2287                        stream_type_value: Some(ty),
2288                        ..Default::default()
2289                    });
2290
2291                    q.add_event_data_now(ev_data).ok();
2292                });
2293            },
2294
2295            Err(Error::IdError) => {
2296                trace!("{} GREASE stream blocked", conn.trace_id(),);
2297
2298                return Ok(());
2299            },
2300
2301            Err(e) => return Err(e),
2302        };
2303
2304        Ok(())
2305    }
2306
2307    /// Sends SETTINGS frame based on HTTP/3 configuration.
2308    fn send_settings<F: BufFactory>(
2309        &mut self, conn: &mut super::Connection<F>,
2310    ) -> Result<()> {
2311        let stream_id = match self
2312            .open_uni_stream(conn, stream::HTTP3_CONTROL_STREAM_TYPE_ID)
2313        {
2314            Ok(v) => v,
2315
2316            Err(e) => {
2317                trace!("{} Control stream blocked", conn.trace_id(),);
2318
2319                if e == Error::Done {
2320                    return Err(Error::InternalError);
2321                }
2322
2323                return Err(e);
2324            },
2325        };
2326
2327        self.control_stream_id = Some(stream_id);
2328
2329        qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2330            let ev_data = EventData::H3StreamTypeSet(H3StreamTypeSet {
2331                stream_id,
2332                owner: Some(H3Owner::Local),
2333                stream_type: H3StreamType::Control,
2334                ..Default::default()
2335            });
2336
2337            q.add_event_data_now(ev_data).ok();
2338        });
2339
2340        let grease = if conn.grease {
2341            Some((grease_value(), grease_value()))
2342        } else {
2343            None
2344        };
2345
2346        let frame = frame::Frame::Settings {
2347            max_field_section_size: self.local_settings.max_field_section_size,
2348            qpack_max_table_capacity: self
2349                .local_settings
2350                .qpack_max_table_capacity,
2351            qpack_blocked_streams: self.local_settings.qpack_blocked_streams,
2352            connect_protocol_enabled: self
2353                .local_settings
2354                .connect_protocol_enabled,
2355            h3_datagram: self.local_settings.h3_datagram,
2356            grease,
2357            additional_settings: self.local_settings.additional_settings.clone(),
2358            raw: Default::default(),
2359        };
2360
2361        let mut d = [42; 128];
2362        let mut b = octets::OctetsMut::with_slice(&mut d);
2363
2364        frame.to_bytes(&mut b)?;
2365
2366        let off = b.off();
2367
2368        if let Some(id) = self.control_stream_id {
2369            conn.stream_send(id, &d[..off], false)?;
2370
2371            trace!(
2372                "{} tx frm SETTINGS stream={} len={}",
2373                conn.trace_id(),
2374                id,
2375                off
2376            );
2377
2378            qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2379                let frame = frame.to_qlog();
2380                let ev_data = EventData::H3FrameCreated(H3FrameCreated {
2381                    stream_id: id,
2382                    length: Some(off as u64),
2383                    frame,
2384                    ..Default::default()
2385                });
2386
2387                q.add_event_data_now(ev_data).ok();
2388            });
2389        }
2390
2391        Ok(())
2392    }
2393
2394    fn process_control_stream<F: BufFactory>(
2395        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
2396    ) -> Result<(u64, Event)> {
2397        close_conn_if_critical_stream_finished(conn, stream_id)?;
2398
2399        if !conn.stream_readable(stream_id) {
2400            return Err(Error::Done);
2401        }
2402
2403        match self.process_readable_stream(conn, stream_id, true) {
2404            Ok(ev) => return Ok(ev),
2405
2406            Err(Error::Done) => (),
2407
2408            Err(e) => return Err(e),
2409        };
2410
2411        close_conn_if_critical_stream_finished(conn, stream_id)?;
2412
2413        Err(Error::Done)
2414    }
2415
2416    fn process_readable_stream<F: BufFactory>(
2417        &mut self, conn: &mut super::Connection<F>, stream_id: u64, polling: bool,
2418    ) -> Result<(u64, Event)> {
2419        self.streams
2420            .entry(stream_id)
2421            .or_insert_with(|| <stream::Stream>::new(stream_id, false));
2422
2423        // We need to get a fresh reference to the stream for each
2424        // iteration, to avoid borrowing `self` for the entire duration
2425        // of the loop, because we'll need to borrow it again in the
2426        // `State::FramePayload` case below.
2427        while let Some(stream) = self.streams.get_mut(&stream_id) {
2428            match stream.state() {
2429                stream::State::StreamType => {
2430                    stream.try_fill_buffer(conn)?;
2431
2432                    let varint = match stream.try_consume_varint() {
2433                        Ok(v) => v,
2434
2435                        Err(_) => continue,
2436                    };
2437
2438                    let ty = stream::Type::deserialize(varint)?;
2439
2440                    if let Err(e) = stream.set_ty(ty) {
2441                        conn.close(true, e.to_wire(), b"")?;
2442                        return Err(e);
2443                    }
2444
2445                    qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2446                        let ty_val = if matches!(ty, stream::Type::Unknown) {
2447                            Some(varint)
2448                        } else {
2449                            None
2450                        };
2451
2452                        let ev_data =
2453                            EventData::H3StreamTypeSet(H3StreamTypeSet {
2454                                stream_id,
2455                                owner: Some(H3Owner::Remote),
2456                                stream_type: ty.to_qlog(),
2457                                stream_type_value: ty_val,
2458                                ..Default::default()
2459                            });
2460
2461                        q.add_event_data_now(ev_data).ok();
2462                    });
2463
2464                    match &ty {
2465                        stream::Type::Control => {
2466                            // Only one control stream allowed.
2467                            if self.peer_control_stream_id.is_some() {
2468                                conn.close(
2469                                    true,
2470                                    Error::StreamCreationError.to_wire(),
2471                                    b"Received multiple control streams",
2472                                )?;
2473
2474                                return Err(Error::StreamCreationError);
2475                            }
2476
2477                            trace!(
2478                                "{} open peer's control stream {}",
2479                                conn.trace_id(),
2480                                stream_id
2481                            );
2482
2483                            close_conn_if_critical_stream_finished(
2484                                conn, stream_id,
2485                            )?;
2486
2487                            self.peer_control_stream_id = Some(stream_id);
2488                        },
2489
2490                        stream::Type::Push => {
2491                            // Only clients can receive push stream.
2492                            if self.is_server {
2493                                conn.close(
2494                                    true,
2495                                    Error::StreamCreationError.to_wire(),
2496                                    b"Server received push stream.",
2497                                )?;
2498
2499                                return Err(Error::StreamCreationError);
2500                            }
2501                        },
2502
2503                        stream::Type::QpackEncoder => {
2504                            // Only one qpack encoder stream allowed.
2505                            if self.peer_qpack_streams.encoder_stream_id.is_some()
2506                            {
2507                                conn.close(
2508                                    true,
2509                                    Error::StreamCreationError.to_wire(),
2510                                    b"Received multiple QPACK encoder streams",
2511                                )?;
2512
2513                                return Err(Error::StreamCreationError);
2514                            }
2515
2516                            close_conn_if_critical_stream_finished(
2517                                conn, stream_id,
2518                            )?;
2519
2520                            self.peer_qpack_streams.encoder_stream_id =
2521                                Some(stream_id);
2522                        },
2523
2524                        stream::Type::QpackDecoder => {
2525                            // Only one qpack decoder allowed.
2526                            if self.peer_qpack_streams.decoder_stream_id.is_some()
2527                            {
2528                                conn.close(
2529                                    true,
2530                                    Error::StreamCreationError.to_wire(),
2531                                    b"Received multiple QPACK decoder streams",
2532                                )?;
2533
2534                                return Err(Error::StreamCreationError);
2535                            }
2536
2537                            close_conn_if_critical_stream_finished(
2538                                conn, stream_id,
2539                            )?;
2540
2541                            self.peer_qpack_streams.decoder_stream_id =
2542                                Some(stream_id);
2543                        },
2544
2545                        stream::Type::Unknown => {
2546                            // Unknown stream types are ignored.
2547                            // TODO: we MAY send STOP_SENDING
2548                        },
2549
2550                        stream::Type::Request => unreachable!(),
2551                    }
2552                },
2553
2554                stream::State::PushId => {
2555                    stream.try_fill_buffer(conn)?;
2556
2557                    let varint = match stream.try_consume_varint() {
2558                        Ok(v) => v,
2559
2560                        Err(_) => continue,
2561                    };
2562
2563                    if let Err(e) = stream.set_push_id(varint) {
2564                        conn.close(true, e.to_wire(), b"")?;
2565                        return Err(e);
2566                    }
2567                },
2568
2569                stream::State::FrameType => {
2570                    stream.try_fill_buffer(conn)?;
2571
2572                    let varint = match stream.try_consume_varint() {
2573                        Ok(v) => v,
2574
2575                        Err(_) => continue,
2576                    };
2577
2578                    match stream.set_frame_type(varint) {
2579                        Err(Error::FrameUnexpected) => {
2580                            let msg = format!("Unexpected frame type {varint}");
2581
2582                            conn.close(
2583                                true,
2584                                Error::FrameUnexpected.to_wire(),
2585                                msg.as_bytes(),
2586                            )?;
2587
2588                            return Err(Error::FrameUnexpected);
2589                        },
2590
2591                        Err(e) => {
2592                            conn.close(
2593                                true,
2594                                e.to_wire(),
2595                                b"Error handling frame.",
2596                            )?;
2597
2598                            return Err(e);
2599                        },
2600
2601                        _ => (),
2602                    }
2603                },
2604
2605                stream::State::FramePayloadLen => {
2606                    stream.try_fill_buffer(conn)?;
2607
2608                    let payload_len = match stream.try_consume_varint() {
2609                        Ok(v) => v,
2610
2611                        Err(_) => continue,
2612                    };
2613
2614                    // DATA frames are handled uniquely. After this point we lose
2615                    // visibility of DATA framing, so just log here.
2616                    if Some(frame::DATA_FRAME_TYPE_ID) == stream.frame_type() {
2617                        trace!(
2618                            "{} rx frm DATA stream={} wire_payload_len={}",
2619                            conn.trace_id(),
2620                            stream_id,
2621                            payload_len
2622                        );
2623
2624                        qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
2625                            let frame = Http3Frame::Data { raw: None };
2626
2627                            let ev_data =
2628                                EventData::H3FrameParsed(H3FrameParsed {
2629                                    stream_id,
2630                                    length: Some(payload_len),
2631                                    frame,
2632                                    ..Default::default()
2633                                });
2634
2635                            q.add_event_data_now(ev_data).ok();
2636                        });
2637                    }
2638
2639                    if let Err(e) = stream.set_frame_payload_len(payload_len) {
2640                        conn.close(true, e.to_wire(), b"")?;
2641                        return Err(e);
2642                    }
2643                },
2644
2645                stream::State::FramePayload => {
2646                    // Do not emit events when not polling.
2647                    if !polling {
2648                        break;
2649                    }
2650
2651                    stream.try_fill_buffer(conn)?;
2652
2653                    let (frame, payload_len) = match stream.try_consume_frame() {
2654                        Ok(frame) => frame,
2655
2656                        Err(Error::Done) => return Err(Error::Done),
2657
2658                        Err(e) => {
2659                            conn.close(
2660                                true,
2661                                e.to_wire(),
2662                                b"Error handling frame.",
2663                            )?;
2664
2665                            return Err(e);
2666                        },
2667                    };
2668
2669                    match self.process_frame(conn, stream_id, frame, payload_len)
2670                    {
2671                        Ok(ev) => return Ok(ev),
2672
2673                        Err(Error::Done) => {
2674                            // This might be a frame that is processed internally
2675                            // without needing to bubble up to the user as an
2676                            // event. Check whether the frame has FIN'd by QUIC
2677                            // to prevent trying to read again on a closed stream.
2678                            if conn.stream_finished(stream_id) {
2679                                break;
2680                            }
2681                        },
2682
2683                        Err(e) => return Err(e),
2684                    };
2685                },
2686
2687                stream::State::Data => {
2688                    // Do not emit events when not polling.
2689                    if !polling {
2690                        break;
2691                    }
2692
2693                    if !stream.try_trigger_data_event() {
2694                        break;
2695                    }
2696
2697                    return Ok((stream_id, Event::Data));
2698                },
2699
2700                stream::State::QpackInstruction => {
2701                    let mut d = [0; 4096];
2702
2703                    // Read data from the stream and discard immediately.
2704                    loop {
2705                        let (recv, fin) = conn.stream_recv(stream_id, &mut d)?;
2706
2707                        match stream.ty() {
2708                            Some(stream::Type::QpackEncoder) =>
2709                                self.peer_qpack_streams.encoder_stream_bytes +=
2710                                    recv as u64,
2711                            Some(stream::Type::QpackDecoder) =>
2712                                self.peer_qpack_streams.decoder_stream_bytes +=
2713                                    recv as u64,
2714                            _ => unreachable!(),
2715                        };
2716
2717                        if fin {
2718                            close_conn_critical_stream(conn)?;
2719                        }
2720                    }
2721                },
2722
2723                stream::State::Drain => {
2724                    // Discard incoming data on the stream.
2725                    conn.stream_shutdown(
2726                        stream_id,
2727                        crate::Shutdown::Read,
2728                        0x100,
2729                    )?;
2730
2731                    break;
2732                },
2733
2734                stream::State::Finished => break,
2735            }
2736        }
2737
2738        Err(Error::Done)
2739    }
2740
2741    fn process_finished_stream(&mut self, stream_id: u64) {
2742        let stream = match self.streams.get_mut(&stream_id) {
2743            Some(v) => v,
2744
2745            None => return,
2746        };
2747
2748        if stream.state() == stream::State::Finished {
2749            return;
2750        }
2751
2752        match stream.ty() {
2753            Some(stream::Type::Request) | Some(stream::Type::Push) => {
2754                stream.finished();
2755
2756                self.finished_streams.push_back(stream_id);
2757            },
2758
2759            _ => (),
2760        };
2761    }
2762
2763    fn process_frame<F: BufFactory>(
2764        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
2765        frame: frame::Frame, payload_len: u64,
2766    ) -> Result<(u64, Event)> {
2767        trace!(
2768            "{} rx frm {:?} stream={} payload_len={}",
2769            conn.trace_id(),
2770            frame,
2771            stream_id,
2772            payload_len
2773        );
2774
2775        qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
2776            // HEADERS frames are special case and will be logged below.
2777            if !matches!(frame, frame::Frame::Headers { .. }) {
2778                let frame = frame.to_qlog();
2779                let ev_data = EventData::H3FrameParsed(H3FrameParsed {
2780                    stream_id,
2781                    length: Some(payload_len),
2782                    frame,
2783                    ..Default::default()
2784                });
2785
2786                q.add_event_data_now(ev_data).ok();
2787            }
2788        });
2789
2790        match frame {
2791            frame::Frame::Settings {
2792                max_field_section_size,
2793                qpack_max_table_capacity,
2794                qpack_blocked_streams,
2795                connect_protocol_enabled,
2796                h3_datagram,
2797                additional_settings,
2798                raw,
2799                ..
2800            } => {
2801                self.peer_settings = ConnectionSettings {
2802                    max_field_section_size,
2803                    qpack_max_table_capacity,
2804                    qpack_blocked_streams,
2805                    connect_protocol_enabled,
2806                    h3_datagram,
2807                    additional_settings,
2808                    raw,
2809                };
2810
2811                if let Some(1) = h3_datagram {
2812                    // The peer MUST have also enabled DATAGRAM with a TP
2813                    if conn.dgram_max_writable_len().is_none() {
2814                        conn.close(
2815                            true,
2816                            Error::SettingsError.to_wire(),
2817                            b"H3_DATAGRAM sent with value 1 but max_datagram_frame_size TP not set.",
2818                        )?;
2819
2820                        return Err(Error::SettingsError);
2821                    }
2822                }
2823            },
2824
2825            frame::Frame::Headers { header_block } => {
2826                if Some(stream_id) == self.peer_control_stream_id {
2827                    conn.close(
2828                        true,
2829                        Error::FrameUnexpected.to_wire(),
2830                        b"HEADERS received on control stream",
2831                    )?;
2832
2833                    return Err(Error::FrameUnexpected);
2834                }
2835
2836                // Servers reject too many HEADERS frames.
2837                if let Some(s) = self.streams.get_mut(&stream_id) {
2838                    if self.is_server && s.headers_received_count() == 2 {
2839                        conn.close(
2840                            true,
2841                            Error::FrameUnexpected.to_wire(),
2842                            b"Too many HEADERS frames",
2843                        )?;
2844                        return Err(Error::FrameUnexpected);
2845                    }
2846
2847                    s.increment_headers_received();
2848                }
2849
2850                // Use "infinite" as default value for max_field_section_size if
2851                // it is not configured by the application.
2852                let max_size = self
2853                    .local_settings
2854                    .max_field_section_size
2855                    .unwrap_or(u64::MAX);
2856
2857                let headers = match self
2858                    .qpack_decoder
2859                    .decode(&header_block[..], max_size)
2860                {
2861                    Ok(v) => v,
2862
2863                    Err(e) => {
2864                        let e = match e {
2865                            qpack::Error::HeaderListTooLarge =>
2866                                Error::ExcessiveLoad,
2867
2868                            _ => Error::QpackDecompressionFailed,
2869                        };
2870
2871                        conn.close(true, e.to_wire(), b"Error parsing headers.")?;
2872
2873                        return Err(e);
2874                    },
2875                };
2876
2877                qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
2878                    let qlog_headers = headers
2879                        .iter()
2880                        .map(|h| qlog::events::h3::HttpHeader {
2881                            name: String::from_utf8_lossy(h.name()).into_owned(),
2882                            value: String::from_utf8_lossy(h.value())
2883                                .into_owned(),
2884                        })
2885                        .collect();
2886
2887                    let frame = Http3Frame::Headers {
2888                        headers: qlog_headers,
2889                    };
2890
2891                    let ev_data = EventData::H3FrameParsed(H3FrameParsed {
2892                        stream_id,
2893                        length: Some(payload_len),
2894                        frame,
2895                        ..Default::default()
2896                    });
2897
2898                    q.add_event_data_now(ev_data).ok();
2899                });
2900
2901                let more_frames = !conn.stream_finished(stream_id);
2902
2903                return Ok((stream_id, Event::Headers {
2904                    list: headers,
2905                    more_frames,
2906                }));
2907            },
2908
2909            frame::Frame::Data { .. } => {
2910                if Some(stream_id) == self.peer_control_stream_id {
2911                    conn.close(
2912                        true,
2913                        Error::FrameUnexpected.to_wire(),
2914                        b"DATA received on control stream",
2915                    )?;
2916
2917                    return Err(Error::FrameUnexpected);
2918                }
2919
2920                // Do nothing. The Data event is returned separately.
2921            },
2922
2923            frame::Frame::GoAway { id } => {
2924                if Some(stream_id) != self.peer_control_stream_id {
2925                    conn.close(
2926                        true,
2927                        Error::FrameUnexpected.to_wire(),
2928                        b"GOAWAY received on non-control stream",
2929                    )?;
2930
2931                    return Err(Error::FrameUnexpected);
2932                }
2933
2934                if !self.is_server && id % 4 != 0 {
2935                    conn.close(
2936                        true,
2937                        Error::FrameUnexpected.to_wire(),
2938                        b"GOAWAY received with ID of non-request stream",
2939                    )?;
2940
2941                    return Err(Error::IdError);
2942                }
2943
2944                if let Some(received_id) = self.peer_goaway_id {
2945                    if id > received_id {
2946                        conn.close(
2947                            true,
2948                            Error::IdError.to_wire(),
2949                            b"GOAWAY received with ID larger than previously received",
2950                        )?;
2951
2952                        return Err(Error::IdError);
2953                    }
2954                }
2955
2956                self.peer_goaway_id = Some(id);
2957
2958                return Ok((id, Event::GoAway));
2959            },
2960
2961            frame::Frame::MaxPushId { push_id } => {
2962                if Some(stream_id) != self.peer_control_stream_id {
2963                    conn.close(
2964                        true,
2965                        Error::FrameUnexpected.to_wire(),
2966                        b"MAX_PUSH_ID received on non-control stream",
2967                    )?;
2968
2969                    return Err(Error::FrameUnexpected);
2970                }
2971
2972                if !self.is_server {
2973                    conn.close(
2974                        true,
2975                        Error::FrameUnexpected.to_wire(),
2976                        b"MAX_PUSH_ID received by client",
2977                    )?;
2978
2979                    return Err(Error::FrameUnexpected);
2980                }
2981
2982                if push_id < self.max_push_id {
2983                    conn.close(
2984                        true,
2985                        Error::IdError.to_wire(),
2986                        b"MAX_PUSH_ID reduced limit",
2987                    )?;
2988
2989                    return Err(Error::IdError);
2990                }
2991
2992                self.max_push_id = push_id;
2993            },
2994
2995            frame::Frame::PushPromise { .. } => {
2996                if self.is_server {
2997                    conn.close(
2998                        true,
2999                        Error::FrameUnexpected.to_wire(),
3000                        b"PUSH_PROMISE received by server",
3001                    )?;
3002
3003                    return Err(Error::FrameUnexpected);
3004                }
3005
3006                if stream_id % 4 != 0 {
3007                    conn.close(
3008                        true,
3009                        Error::FrameUnexpected.to_wire(),
3010                        b"PUSH_PROMISE received on non-request stream",
3011                    )?;
3012
3013                    return Err(Error::FrameUnexpected);
3014                }
3015
3016                // TODO: implement more checks and PUSH_PROMISE event
3017            },
3018
3019            frame::Frame::CancelPush { .. } => {
3020                if Some(stream_id) != self.peer_control_stream_id {
3021                    conn.close(
3022                        true,
3023                        Error::FrameUnexpected.to_wire(),
3024                        b"CANCEL_PUSH received on non-control stream",
3025                    )?;
3026
3027                    return Err(Error::FrameUnexpected);
3028                }
3029
3030                // TODO: implement CANCEL_PUSH frame
3031            },
3032
3033            frame::Frame::PriorityUpdateRequest {
3034                prioritized_element_id,
3035                priority_field_value,
3036            } => {
3037                if !self.is_server {
3038                    conn.close(
3039                        true,
3040                        Error::FrameUnexpected.to_wire(),
3041                        b"PRIORITY_UPDATE received by client",
3042                    )?;
3043
3044                    return Err(Error::FrameUnexpected);
3045                }
3046
3047                if Some(stream_id) != self.peer_control_stream_id {
3048                    conn.close(
3049                        true,
3050                        Error::FrameUnexpected.to_wire(),
3051                        b"PRIORITY_UPDATE received on non-control stream",
3052                    )?;
3053
3054                    return Err(Error::FrameUnexpected);
3055                }
3056
3057                if prioritized_element_id % 4 != 0 {
3058                    conn.close(
3059                        true,
3060                        Error::FrameUnexpected.to_wire(),
3061                        b"PRIORITY_UPDATE for request stream type with wrong ID",
3062                    )?;
3063
3064                    return Err(Error::FrameUnexpected);
3065                }
3066
3067                if prioritized_element_id > conn.streams.max_streams_bidi() * 4 {
3068                    conn.close(
3069                        true,
3070                        Error::IdError.to_wire(),
3071                        b"PRIORITY_UPDATE for request stream beyond max streams limit",
3072                    )?;
3073
3074                    return Err(Error::IdError);
3075                }
3076
3077                // If the PRIORITY_UPDATE is valid, consider storing the latest
3078                // contents. Due to reordering, it is possible that we might
3079                // receive frames that reference streams that have not yet to
3080                // been opened and that's OK because it's within our concurrency
3081                // limit. However, we discard PRIORITY_UPDATE that refers to
3082                // streams that we know have been collected.
3083                if conn.streams.is_collected(prioritized_element_id) {
3084                    return Err(Error::Done);
3085                }
3086
3087                // If the stream did not yet exist, create it and store.
3088                let stream =
3089                    self.streams.entry(prioritized_element_id).or_insert_with(
3090                        || <stream::Stream>::new(prioritized_element_id, false),
3091                    );
3092
3093                let had_priority_update = stream.has_last_priority_update();
3094                stream.set_last_priority_update(Some(priority_field_value));
3095
3096                // Only trigger the event when there wasn't already a stored
3097                // PRIORITY_UPDATE.
3098                if !had_priority_update {
3099                    return Ok((prioritized_element_id, Event::PriorityUpdate));
3100                } else {
3101                    return Err(Error::Done);
3102                }
3103            },
3104
3105            frame::Frame::PriorityUpdatePush {
3106                prioritized_element_id,
3107                ..
3108            } => {
3109                if !self.is_server {
3110                    conn.close(
3111                        true,
3112                        Error::FrameUnexpected.to_wire(),
3113                        b"PRIORITY_UPDATE received by client",
3114                    )?;
3115
3116                    return Err(Error::FrameUnexpected);
3117                }
3118
3119                if Some(stream_id) != self.peer_control_stream_id {
3120                    conn.close(
3121                        true,
3122                        Error::FrameUnexpected.to_wire(),
3123                        b"PRIORITY_UPDATE received on non-control stream",
3124                    )?;
3125
3126                    return Err(Error::FrameUnexpected);
3127                }
3128
3129                if prioritized_element_id % 3 != 0 {
3130                    conn.close(
3131                        true,
3132                        Error::FrameUnexpected.to_wire(),
3133                        b"PRIORITY_UPDATE for push stream type with wrong ID",
3134                    )?;
3135
3136                    return Err(Error::FrameUnexpected);
3137                }
3138
3139                // TODO: we only implement this if we implement server push
3140            },
3141
3142            frame::Frame::Unknown { .. } => (),
3143        }
3144
3145        Err(Error::Done)
3146    }
3147
3148    /// Collects and returns statistics about the connection.
3149    #[inline]
3150    pub fn stats(&self) -> Stats {
3151        Stats {
3152            qpack_encoder_stream_recv_bytes: self
3153                .peer_qpack_streams
3154                .encoder_stream_bytes,
3155            qpack_decoder_stream_recv_bytes: self
3156                .peer_qpack_streams
3157                .decoder_stream_bytes,
3158        }
3159    }
3160}
3161
3162/// Generates an HTTP/3 GREASE variable length integer.
3163pub fn grease_value() -> u64 {
3164    let n = super::rand::rand_u64_uniform(148_764_065_110_560_899);
3165    31 * n + 33
3166}
3167
3168#[doc(hidden)]
3169pub mod testing {
3170    use super::*;
3171
3172    use crate::testing;
3173
3174    /// Session is an HTTP/3 test helper structure. It holds a client, server
3175    /// and pipe that allows them to communicate.
3176    ///
3177    /// `default()` creates a session with some sensible default
3178    /// configuration. `with_configs()` allows for providing a specific
3179    /// configuration.
3180    ///
3181    /// `handshake()` performs all the steps needed to establish an HTTP/3
3182    /// connection.
3183    ///
3184    /// Some utility functions are provided that make it less verbose to send
3185    /// request, responses and individual headers. The full quiche API remains
3186    /// available for any test that need to do unconventional things (such as
3187    /// bad behaviour that triggers errors).
3188    pub struct Session {
3189        pub pipe: testing::Pipe,
3190        pub client: Connection,
3191        pub server: Connection,
3192    }
3193
3194    impl Session {
3195        pub fn new() -> Result<Session> {
3196            fn path_relative_to_manifest_dir(path: &str) -> String {
3197                std::fs::canonicalize(
3198                    std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join(path),
3199                )
3200                .unwrap()
3201                .to_string_lossy()
3202                .into_owned()
3203            }
3204
3205            let mut config = crate::Config::new(crate::PROTOCOL_VERSION)?;
3206            config.load_cert_chain_from_pem_file(
3207                &path_relative_to_manifest_dir("examples/cert.crt"),
3208            )?;
3209            config.load_priv_key_from_pem_file(
3210                &path_relative_to_manifest_dir("examples/cert.key"),
3211            )?;
3212            config.set_application_protos(&[b"h3"])?;
3213            config.set_initial_max_data(1500);
3214            config.set_initial_max_stream_data_bidi_local(150);
3215            config.set_initial_max_stream_data_bidi_remote(150);
3216            config.set_initial_max_stream_data_uni(150);
3217            config.set_initial_max_streams_bidi(5);
3218            config.set_initial_max_streams_uni(5);
3219            config.verify_peer(false);
3220            config.enable_dgram(true, 3, 3);
3221            config.set_ack_delay_exponent(8);
3222
3223            let h3_config = Config::new()?;
3224            Session::with_configs(&mut config, &h3_config)
3225        }
3226
3227        pub fn with_configs(
3228            config: &mut crate::Config, h3_config: &Config,
3229        ) -> Result<Session> {
3230            let pipe = testing::Pipe::with_config(config)?;
3231            let client_dgram = pipe.client.dgram_enabled();
3232            let server_dgram = pipe.server.dgram_enabled();
3233            Ok(Session {
3234                pipe,
3235                client: Connection::new(h3_config, false, client_dgram)?,
3236                server: Connection::new(h3_config, true, server_dgram)?,
3237            })
3238        }
3239
3240        /// Do the HTTP/3 handshake so both ends are in sane initial state.
3241        pub fn handshake(&mut self) -> Result<()> {
3242            self.pipe.handshake()?;
3243
3244            // Client streams.
3245            self.client.send_settings(&mut self.pipe.client)?;
3246            self.pipe.advance().ok();
3247
3248            self.client
3249                .open_qpack_encoder_stream(&mut self.pipe.client)?;
3250            self.pipe.advance().ok();
3251
3252            self.client
3253                .open_qpack_decoder_stream(&mut self.pipe.client)?;
3254            self.pipe.advance().ok();
3255
3256            if self.pipe.client.grease {
3257                self.client.open_grease_stream(&mut self.pipe.client)?;
3258            }
3259
3260            self.pipe.advance().ok();
3261
3262            // Server streams.
3263            self.server.send_settings(&mut self.pipe.server)?;
3264            self.pipe.advance().ok();
3265
3266            self.server
3267                .open_qpack_encoder_stream(&mut self.pipe.server)?;
3268            self.pipe.advance().ok();
3269
3270            self.server
3271                .open_qpack_decoder_stream(&mut self.pipe.server)?;
3272            self.pipe.advance().ok();
3273
3274            if self.pipe.server.grease {
3275                self.server.open_grease_stream(&mut self.pipe.server)?;
3276            }
3277
3278            self.advance().ok();
3279
3280            while self.client.poll(&mut self.pipe.client).is_ok() {
3281                // Do nothing.
3282            }
3283
3284            while self.server.poll(&mut self.pipe.server).is_ok() {
3285                // Do nothing.
3286            }
3287
3288            Ok(())
3289        }
3290
3291        /// Advances the session pipe over the buffer.
3292        pub fn advance(&mut self) -> crate::Result<()> {
3293            self.pipe.advance()
3294        }
3295
3296        /// Polls the client for events.
3297        pub fn poll_client(&mut self) -> Result<(u64, Event)> {
3298            self.client.poll(&mut self.pipe.client)
3299        }
3300
3301        /// Polls the server for events.
3302        pub fn poll_server(&mut self) -> Result<(u64, Event)> {
3303            self.server.poll(&mut self.pipe.server)
3304        }
3305
3306        /// Sends a request from client with default headers.
3307        ///
3308        /// On success it returns the newly allocated stream and the headers.
3309        pub fn send_request(&mut self, fin: bool) -> Result<(u64, Vec<Header>)> {
3310            let req = vec![
3311                Header::new(b":method", b"GET"),
3312                Header::new(b":scheme", b"https"),
3313                Header::new(b":authority", b"quic.tech"),
3314                Header::new(b":path", b"/test"),
3315                Header::new(b"user-agent", b"quiche-test"),
3316            ];
3317
3318            let stream =
3319                self.client.send_request(&mut self.pipe.client, &req, fin)?;
3320
3321            self.advance().ok();
3322
3323            Ok((stream, req))
3324        }
3325
3326        /// Sends a response from server with default headers.
3327        ///
3328        /// On success it returns the headers.
3329        pub fn send_response(
3330            &mut self, stream: u64, fin: bool,
3331        ) -> Result<Vec<Header>> {
3332            let resp = vec![
3333                Header::new(b":status", b"200"),
3334                Header::new(b"server", b"quiche-test"),
3335            ];
3336
3337            self.server.send_response(
3338                &mut self.pipe.server,
3339                stream,
3340                &resp,
3341                fin,
3342            )?;
3343
3344            self.advance().ok();
3345
3346            Ok(resp)
3347        }
3348
3349        /// Sends some default payload from client.
3350        ///
3351        /// On success it returns the payload.
3352        pub fn send_body_client(
3353            &mut self, stream: u64, fin: bool,
3354        ) -> Result<Vec<u8>> {
3355            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3356
3357            self.client
3358                .send_body(&mut self.pipe.client, stream, &bytes, fin)?;
3359
3360            self.advance().ok();
3361
3362            Ok(bytes)
3363        }
3364
3365        /// Fetches DATA payload from the server.
3366        ///
3367        /// On success it returns the number of bytes received.
3368        pub fn recv_body_client(
3369            &mut self, stream: u64, buf: &mut [u8],
3370        ) -> Result<usize> {
3371            self.client.recv_body(&mut self.pipe.client, stream, buf)
3372        }
3373
3374        /// Sends some default payload from server.
3375        ///
3376        /// On success it returns the payload.
3377        pub fn send_body_server(
3378            &mut self, stream: u64, fin: bool,
3379        ) -> Result<Vec<u8>> {
3380            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3381
3382            self.server
3383                .send_body(&mut self.pipe.server, stream, &bytes, fin)?;
3384
3385            self.advance().ok();
3386
3387            Ok(bytes)
3388        }
3389
3390        /// Fetches DATA payload from the client.
3391        ///
3392        /// On success it returns the number of bytes received.
3393        pub fn recv_body_server(
3394            &mut self, stream: u64, buf: &mut [u8],
3395        ) -> Result<usize> {
3396            self.server.recv_body(&mut self.pipe.server, stream, buf)
3397        }
3398
3399        /// Sends a single HTTP/3 frame from the client.
3400        pub fn send_frame_client(
3401            &mut self, frame: frame::Frame, stream_id: u64, fin: bool,
3402        ) -> Result<()> {
3403            let mut d = [42; 65535];
3404
3405            let mut b = octets::OctetsMut::with_slice(&mut d);
3406
3407            frame.to_bytes(&mut b)?;
3408
3409            let off = b.off();
3410            self.pipe.client.stream_send(stream_id, &d[..off], fin)?;
3411
3412            self.advance().ok();
3413
3414            Ok(())
3415        }
3416
3417        /// Send an HTTP/3 DATAGRAM with default data from the client.
3418        ///
3419        /// On success it returns the data.
3420        pub fn send_dgram_client(&mut self, flow_id: u64) -> Result<Vec<u8>> {
3421            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3422            let len = octets::varint_len(flow_id) + bytes.len();
3423            let mut d = vec![0; len];
3424            let mut b = octets::OctetsMut::with_slice(&mut d);
3425
3426            b.put_varint(flow_id)?;
3427            b.put_bytes(&bytes)?;
3428
3429            self.pipe.client.dgram_send(&d)?;
3430
3431            self.advance().ok();
3432
3433            Ok(bytes)
3434        }
3435
3436        /// Receives an HTTP/3 DATAGRAM from the server.
3437        ///
3438        /// On success it returns the DATAGRAM length, flow ID and flow ID
3439        /// length.
3440        pub fn recv_dgram_client(
3441            &mut self, buf: &mut [u8],
3442        ) -> Result<(usize, u64, usize)> {
3443            let len = self.pipe.client.dgram_recv(buf)?;
3444            let mut b = octets::Octets::with_slice(buf);
3445            let flow_id = b.get_varint()?;
3446
3447            Ok((len, flow_id, b.off()))
3448        }
3449
3450        /// Send an HTTP/3 DATAGRAM with default data from the server
3451        ///
3452        /// On success it returns the data.
3453        pub fn send_dgram_server(&mut self, flow_id: u64) -> Result<Vec<u8>> {
3454            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3455            let len = octets::varint_len(flow_id) + bytes.len();
3456            let mut d = vec![0; len];
3457            let mut b = octets::OctetsMut::with_slice(&mut d);
3458
3459            b.put_varint(flow_id)?;
3460            b.put_bytes(&bytes)?;
3461
3462            self.pipe.server.dgram_send(&d)?;
3463
3464            self.advance().ok();
3465
3466            Ok(bytes)
3467        }
3468
3469        /// Receives an HTTP/3 DATAGRAM from the client.
3470        ///
3471        /// On success it returns the DATAGRAM length, flow ID and flow ID
3472        /// length.
3473        pub fn recv_dgram_server(
3474            &mut self, buf: &mut [u8],
3475        ) -> Result<(usize, u64, usize)> {
3476            let len = self.pipe.server.dgram_recv(buf)?;
3477            let mut b = octets::Octets::with_slice(buf);
3478            let flow_id = b.get_varint()?;
3479
3480            Ok((len, flow_id, b.off()))
3481        }
3482
3483        /// Sends a single HTTP/3 frame from the server.
3484        pub fn send_frame_server(
3485            &mut self, frame: frame::Frame, stream_id: u64, fin: bool,
3486        ) -> Result<()> {
3487            let mut d = [42; 65535];
3488
3489            let mut b = octets::OctetsMut::with_slice(&mut d);
3490
3491            frame.to_bytes(&mut b)?;
3492
3493            let off = b.off();
3494            self.pipe.server.stream_send(stream_id, &d[..off], fin)?;
3495
3496            self.advance().ok();
3497
3498            Ok(())
3499        }
3500
3501        /// Sends an arbitrary buffer of HTTP/3 stream data from the client.
3502        pub fn send_arbitrary_stream_data_client(
3503            &mut self, data: &[u8], stream_id: u64, fin: bool,
3504        ) -> Result<()> {
3505            self.pipe.client.stream_send(stream_id, data, fin)?;
3506
3507            self.advance().ok();
3508
3509            Ok(())
3510        }
3511
3512        /// Sends an arbitrary buffer of HTTP/3 stream data from the server.
3513        pub fn send_arbitrary_stream_data_server(
3514            &mut self, data: &[u8], stream_id: u64, fin: bool,
3515        ) -> Result<()> {
3516            self.pipe.server.stream_send(stream_id, data, fin)?;
3517
3518            self.advance().ok();
3519
3520            Ok(())
3521        }
3522    }
3523}
3524
3525#[cfg(test)]
3526mod tests {
3527    use super::*;
3528
3529    use super::testing::*;
3530
3531    #[test]
3532    /// Make sure that random GREASE values is within the specified limit.
3533    fn grease_value_in_varint_limit() {
3534        assert!(grease_value() < 2u64.pow(62) - 1);
3535    }
3536
3537    #[cfg(not(feature = "openssl"))] // 0-RTT not supported when using openssl/quictls
3538    #[test]
3539    fn h3_handshake_0rtt() {
3540        let mut buf = [0; 65535];
3541
3542        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
3543        config
3544            .load_cert_chain_from_pem_file("examples/cert.crt")
3545            .unwrap();
3546        config
3547            .load_priv_key_from_pem_file("examples/cert.key")
3548            .unwrap();
3549        config
3550            .set_application_protos(&[b"proto1", b"proto2"])
3551            .unwrap();
3552        config.set_initial_max_data(30);
3553        config.set_initial_max_stream_data_bidi_local(15);
3554        config.set_initial_max_stream_data_bidi_remote(15);
3555        config.set_initial_max_stream_data_uni(15);
3556        config.set_initial_max_streams_bidi(3);
3557        config.set_initial_max_streams_uni(3);
3558        config.enable_early_data();
3559        config.verify_peer(false);
3560
3561        let h3_config = Config::new().unwrap();
3562
3563        // Perform initial handshake.
3564        let mut pipe = crate::testing::Pipe::with_config(&mut config).unwrap();
3565        assert_eq!(pipe.handshake(), Ok(()));
3566
3567        // Extract session,
3568        let session = pipe.client.session().unwrap();
3569
3570        // Configure session on new connection.
3571        let mut pipe = crate::testing::Pipe::with_config(&mut config).unwrap();
3572        assert_eq!(pipe.client.set_session(session), Ok(()));
3573
3574        // Can't create an H3 connection until the QUIC connection is determined
3575        // to have made sufficient early data progress.
3576        assert!(matches!(
3577            Connection::with_transport(&mut pipe.client, &h3_config),
3578            Err(Error::InternalError)
3579        ));
3580
3581        // Client sends initial flight.
3582        let (len, _) = pipe.client.send(&mut buf).unwrap();
3583
3584        // Now an H3 connection can be created.
3585        assert!(Connection::with_transport(&mut pipe.client, &h3_config).is_ok());
3586        assert_eq!(pipe.server_recv(&mut buf[..len]), Ok(len));
3587
3588        // Client sends 0-RTT packet.
3589        let pkt_type = crate::packet::Type::ZeroRTT;
3590
3591        let frames = [crate::frame::Frame::Stream {
3592            stream_id: 6,
3593            data: <crate::range_buf::RangeBuf>::from(b"aaaaa", 0, true),
3594        }];
3595
3596        assert_eq!(
3597            pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
3598            Ok(1200)
3599        );
3600
3601        assert_eq!(pipe.server.undecryptable_pkts.len(), 0);
3602
3603        // 0-RTT stream data is readable.
3604        let mut r = pipe.server.readable();
3605        assert_eq!(r.next(), Some(6));
3606        assert_eq!(r.next(), None);
3607
3608        let mut b = [0; 15];
3609        assert_eq!(pipe.server.stream_recv(6, &mut b), Ok((5, true)));
3610        assert_eq!(&b[..5], b"aaaaa");
3611    }
3612
3613    #[test]
3614    /// Send a request with no body, get a response with no body.
3615    fn request_no_body_response_no_body() {
3616        let mut s = Session::new().unwrap();
3617        s.handshake().unwrap();
3618
3619        let (stream, req) = s.send_request(true).unwrap();
3620
3621        assert_eq!(stream, 0);
3622
3623        let ev_headers = Event::Headers {
3624            list: req,
3625            more_frames: false,
3626        };
3627
3628        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3629        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3630
3631        let resp = s.send_response(stream, true).unwrap();
3632
3633        let ev_headers = Event::Headers {
3634            list: resp,
3635            more_frames: false,
3636        };
3637
3638        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3639        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3640        assert_eq!(s.poll_client(), Err(Error::Done));
3641    }
3642
3643    #[test]
3644    /// Send a request with no body, get a response with one DATA frame.
3645    fn request_no_body_response_one_chunk() {
3646        let mut s = Session::new().unwrap();
3647        s.handshake().unwrap();
3648
3649        let (stream, req) = s.send_request(true).unwrap();
3650        assert_eq!(stream, 0);
3651
3652        let ev_headers = Event::Headers {
3653            list: req,
3654            more_frames: false,
3655        };
3656
3657        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3658
3659        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3660
3661        let resp = s.send_response(stream, false).unwrap();
3662
3663        let body = s.send_body_server(stream, true).unwrap();
3664
3665        let mut recv_buf = vec![0; body.len()];
3666
3667        let ev_headers = Event::Headers {
3668            list: resp,
3669            more_frames: true,
3670        };
3671
3672        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3673
3674        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
3675        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
3676
3677        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3678        assert_eq!(s.poll_client(), Err(Error::Done));
3679    }
3680
3681    #[test]
3682    /// Send a request with no body, get a response with multiple DATA frames.
3683    fn request_no_body_response_many_chunks() {
3684        let mut s = Session::new().unwrap();
3685        s.handshake().unwrap();
3686
3687        let (stream, req) = s.send_request(true).unwrap();
3688
3689        let ev_headers = Event::Headers {
3690            list: req,
3691            more_frames: false,
3692        };
3693
3694        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3695        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3696
3697        let total_data_frames = 4;
3698
3699        let resp = s.send_response(stream, false).unwrap();
3700
3701        for _ in 0..total_data_frames - 1 {
3702            s.send_body_server(stream, false).unwrap();
3703        }
3704
3705        let body = s.send_body_server(stream, true).unwrap();
3706
3707        let mut recv_buf = vec![0; body.len()];
3708
3709        let ev_headers = Event::Headers {
3710            list: resp,
3711            more_frames: true,
3712        };
3713
3714        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3715        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
3716        assert_eq!(s.poll_client(), Err(Error::Done));
3717
3718        for _ in 0..total_data_frames {
3719            assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
3720        }
3721
3722        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3723        assert_eq!(s.poll_client(), Err(Error::Done));
3724    }
3725
3726    #[test]
3727    /// Send a request with one DATA frame, get a response with no body.
3728    fn request_one_chunk_response_no_body() {
3729        let mut s = Session::new().unwrap();
3730        s.handshake().unwrap();
3731
3732        let (stream, req) = s.send_request(false).unwrap();
3733
3734        let body = s.send_body_client(stream, true).unwrap();
3735
3736        let mut recv_buf = vec![0; body.len()];
3737
3738        let ev_headers = Event::Headers {
3739            list: req,
3740            more_frames: true,
3741        };
3742
3743        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3744
3745        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
3746        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
3747
3748        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3749
3750        let resp = s.send_response(stream, true).unwrap();
3751
3752        let ev_headers = Event::Headers {
3753            list: resp,
3754            more_frames: false,
3755        };
3756
3757        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3758        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3759    }
3760
3761    #[test]
3762    /// Send a request with multiple DATA frames, get a response with no body.
3763    fn request_many_chunks_response_no_body() {
3764        let mut s = Session::new().unwrap();
3765        s.handshake().unwrap();
3766
3767        let (stream, req) = s.send_request(false).unwrap();
3768
3769        let total_data_frames = 4;
3770
3771        for _ in 0..total_data_frames - 1 {
3772            s.send_body_client(stream, false).unwrap();
3773        }
3774
3775        let body = s.send_body_client(stream, true).unwrap();
3776
3777        let mut recv_buf = vec![0; body.len()];
3778
3779        let ev_headers = Event::Headers {
3780            list: req,
3781            more_frames: true,
3782        };
3783
3784        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3785        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
3786        assert_eq!(s.poll_server(), Err(Error::Done));
3787
3788        for _ in 0..total_data_frames {
3789            assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
3790        }
3791
3792        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3793
3794        let resp = s.send_response(stream, true).unwrap();
3795
3796        let ev_headers = Event::Headers {
3797            list: resp,
3798            more_frames: false,
3799        };
3800
3801        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3802        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3803    }
3804
3805    #[test]
3806    /// Send a request with multiple DATA frames, get a response with one DATA
3807    /// frame.
3808    fn many_requests_many_chunks_response_one_chunk() {
3809        let mut s = Session::new().unwrap();
3810        s.handshake().unwrap();
3811
3812        let mut reqs = Vec::new();
3813
3814        let (stream1, req1) = s.send_request(false).unwrap();
3815        assert_eq!(stream1, 0);
3816        reqs.push(req1);
3817
3818        let (stream2, req2) = s.send_request(false).unwrap();
3819        assert_eq!(stream2, 4);
3820        reqs.push(req2);
3821
3822        let (stream3, req3) = s.send_request(false).unwrap();
3823        assert_eq!(stream3, 8);
3824        reqs.push(req3);
3825
3826        let body = s.send_body_client(stream1, false).unwrap();
3827        s.send_body_client(stream2, false).unwrap();
3828        s.send_body_client(stream3, false).unwrap();
3829
3830        let mut recv_buf = vec![0; body.len()];
3831
3832        // Reverse order of writes.
3833
3834        s.send_body_client(stream3, true).unwrap();
3835        s.send_body_client(stream2, true).unwrap();
3836        s.send_body_client(stream1, true).unwrap();
3837
3838        let (_, ev) = s.poll_server().unwrap();
3839        let ev_headers = Event::Headers {
3840            list: reqs[0].clone(),
3841            more_frames: true,
3842        };
3843        assert_eq!(ev, ev_headers);
3844
3845        let (_, ev) = s.poll_server().unwrap();
3846        let ev_headers = Event::Headers {
3847            list: reqs[1].clone(),
3848            more_frames: true,
3849        };
3850        assert_eq!(ev, ev_headers);
3851
3852        let (_, ev) = s.poll_server().unwrap();
3853        let ev_headers = Event::Headers {
3854            list: reqs[2].clone(),
3855            more_frames: true,
3856        };
3857        assert_eq!(ev, ev_headers);
3858
3859        assert_eq!(s.poll_server(), Ok((0, Event::Data)));
3860        assert_eq!(s.recv_body_server(0, &mut recv_buf), Ok(body.len()));
3861        assert_eq!(s.poll_client(), Err(Error::Done));
3862        assert_eq!(s.recv_body_server(0, &mut recv_buf), Ok(body.len()));
3863        assert_eq!(s.poll_server(), Ok((0, Event::Finished)));
3864
3865        assert_eq!(s.poll_server(), Ok((4, Event::Data)));
3866        assert_eq!(s.recv_body_server(4, &mut recv_buf), Ok(body.len()));
3867        assert_eq!(s.poll_client(), Err(Error::Done));
3868        assert_eq!(s.recv_body_server(4, &mut recv_buf), Ok(body.len()));
3869        assert_eq!(s.poll_server(), Ok((4, Event::Finished)));
3870
3871        assert_eq!(s.poll_server(), Ok((8, Event::Data)));
3872        assert_eq!(s.recv_body_server(8, &mut recv_buf), Ok(body.len()));
3873        assert_eq!(s.poll_client(), Err(Error::Done));
3874        assert_eq!(s.recv_body_server(8, &mut recv_buf), Ok(body.len()));
3875        assert_eq!(s.poll_server(), Ok((8, Event::Finished)));
3876
3877        assert_eq!(s.poll_server(), Err(Error::Done));
3878
3879        let mut resps = Vec::new();
3880
3881        let resp1 = s.send_response(stream1, true).unwrap();
3882        resps.push(resp1);
3883
3884        let resp2 = s.send_response(stream2, true).unwrap();
3885        resps.push(resp2);
3886
3887        let resp3 = s.send_response(stream3, true).unwrap();
3888        resps.push(resp3);
3889
3890        for _ in 0..resps.len() {
3891            let (stream, ev) = s.poll_client().unwrap();
3892            let ev_headers = Event::Headers {
3893                list: resps[(stream / 4) as usize].clone(),
3894                more_frames: false,
3895            };
3896            assert_eq!(ev, ev_headers);
3897            assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3898        }
3899
3900        assert_eq!(s.poll_client(), Err(Error::Done));
3901    }
3902
3903    #[test]
3904    /// Send a request with no body, get a response with one DATA frame and an
3905    /// empty FIN after reception from the client.
3906    fn request_no_body_response_one_chunk_empty_fin() {
3907        let mut s = Session::new().unwrap();
3908        s.handshake().unwrap();
3909
3910        let (stream, req) = s.send_request(true).unwrap();
3911
3912        let ev_headers = Event::Headers {
3913            list: req,
3914            more_frames: false,
3915        };
3916
3917        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3918        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3919
3920        let resp = s.send_response(stream, false).unwrap();
3921
3922        let body = s.send_body_server(stream, false).unwrap();
3923
3924        let mut recv_buf = vec![0; body.len()];
3925
3926        let ev_headers = Event::Headers {
3927            list: resp,
3928            more_frames: true,
3929        };
3930
3931        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3932
3933        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
3934        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
3935
3936        assert_eq!(s.pipe.server.stream_send(stream, &[], true), Ok(0));
3937        s.advance().ok();
3938
3939        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3940        assert_eq!(s.poll_client(), Err(Error::Done));
3941    }
3942
3943    #[test]
3944    /// Send a request with no body, get a response with no body followed by
3945    /// GREASE that is STREAM frame with a FIN.
3946    fn request_no_body_response_no_body_with_grease() {
3947        let mut s = Session::new().unwrap();
3948        s.handshake().unwrap();
3949
3950        let (stream, req) = s.send_request(true).unwrap();
3951
3952        assert_eq!(stream, 0);
3953
3954        let ev_headers = Event::Headers {
3955            list: req,
3956            more_frames: false,
3957        };
3958
3959        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3960        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3961
3962        let resp = s.send_response(stream, false).unwrap();
3963
3964        let ev_headers = Event::Headers {
3965            list: resp,
3966            more_frames: true,
3967        };
3968
3969        // Inject a GREASE frame
3970        let mut d = [42; 10];
3971        let mut b = octets::OctetsMut::with_slice(&mut d);
3972
3973        let frame_type = b.put_varint(148_764_065_110_560_899).unwrap();
3974        s.pipe.server.stream_send(0, frame_type, false).unwrap();
3975
3976        let frame_len = b.put_varint(10).unwrap();
3977        s.pipe.server.stream_send(0, frame_len, false).unwrap();
3978
3979        s.pipe.server.stream_send(0, &d, true).unwrap();
3980
3981        s.advance().ok();
3982
3983        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3984        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3985        assert_eq!(s.poll_client(), Err(Error::Done));
3986    }
3987
3988    #[test]
3989    /// Try to send DATA frames before HEADERS.
3990    fn body_response_before_headers() {
3991        let mut s = Session::new().unwrap();
3992        s.handshake().unwrap();
3993
3994        let (stream, req) = s.send_request(true).unwrap();
3995        assert_eq!(stream, 0);
3996
3997        let ev_headers = Event::Headers {
3998            list: req,
3999            more_frames: false,
4000        };
4001
4002        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4003
4004        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4005
4006        assert_eq!(
4007            s.send_body_server(stream, true),
4008            Err(Error::FrameUnexpected)
4009        );
4010
4011        assert_eq!(s.poll_client(), Err(Error::Done));
4012    }
4013
4014    #[test]
4015    /// Try to send DATA frames on wrong streams, ensure the API returns an
4016    /// error before anything hits the transport layer.
4017    fn send_body_invalid_client_stream() {
4018        let mut s = Session::new().unwrap();
4019        s.handshake().unwrap();
4020
4021        assert_eq!(s.send_body_client(0, true), Err(Error::FrameUnexpected));
4022
4023        assert_eq!(
4024            s.send_body_client(s.client.control_stream_id.unwrap(), true),
4025            Err(Error::FrameUnexpected)
4026        );
4027
4028        assert_eq!(
4029            s.send_body_client(
4030                s.client.local_qpack_streams.encoder_stream_id.unwrap(),
4031                true
4032            ),
4033            Err(Error::FrameUnexpected)
4034        );
4035
4036        assert_eq!(
4037            s.send_body_client(
4038                s.client.local_qpack_streams.decoder_stream_id.unwrap(),
4039                true
4040            ),
4041            Err(Error::FrameUnexpected)
4042        );
4043
4044        assert_eq!(
4045            s.send_body_client(s.client.peer_control_stream_id.unwrap(), true),
4046            Err(Error::FrameUnexpected)
4047        );
4048
4049        assert_eq!(
4050            s.send_body_client(
4051                s.client.peer_qpack_streams.encoder_stream_id.unwrap(),
4052                true
4053            ),
4054            Err(Error::FrameUnexpected)
4055        );
4056
4057        assert_eq!(
4058            s.send_body_client(
4059                s.client.peer_qpack_streams.decoder_stream_id.unwrap(),
4060                true
4061            ),
4062            Err(Error::FrameUnexpected)
4063        );
4064    }
4065
4066    #[test]
4067    /// Try to send DATA frames on wrong streams, ensure the API returns an
4068    /// error before anything hits the transport layer.
4069    fn send_body_invalid_server_stream() {
4070        let mut s = Session::new().unwrap();
4071        s.handshake().unwrap();
4072
4073        assert_eq!(s.send_body_server(0, true), Err(Error::FrameUnexpected));
4074
4075        assert_eq!(
4076            s.send_body_server(s.server.control_stream_id.unwrap(), true),
4077            Err(Error::FrameUnexpected)
4078        );
4079
4080        assert_eq!(
4081            s.send_body_server(
4082                s.server.local_qpack_streams.encoder_stream_id.unwrap(),
4083                true
4084            ),
4085            Err(Error::FrameUnexpected)
4086        );
4087
4088        assert_eq!(
4089            s.send_body_server(
4090                s.server.local_qpack_streams.decoder_stream_id.unwrap(),
4091                true
4092            ),
4093            Err(Error::FrameUnexpected)
4094        );
4095
4096        assert_eq!(
4097            s.send_body_server(s.server.peer_control_stream_id.unwrap(), true),
4098            Err(Error::FrameUnexpected)
4099        );
4100
4101        assert_eq!(
4102            s.send_body_server(
4103                s.server.peer_qpack_streams.encoder_stream_id.unwrap(),
4104                true
4105            ),
4106            Err(Error::FrameUnexpected)
4107        );
4108
4109        assert_eq!(
4110            s.send_body_server(
4111                s.server.peer_qpack_streams.decoder_stream_id.unwrap(),
4112                true
4113            ),
4114            Err(Error::FrameUnexpected)
4115        );
4116    }
4117
4118    #[test]
4119    /// Client sends request with body and trailers.
4120    fn trailers() {
4121        let mut s = Session::new().unwrap();
4122        s.handshake().unwrap();
4123
4124        let (stream, req) = s.send_request(false).unwrap();
4125
4126        let body = s.send_body_client(stream, false).unwrap();
4127
4128        let mut recv_buf = vec![0; body.len()];
4129
4130        let req_trailers = vec![Header::new(b"foo", b"bar")];
4131
4132        s.client
4133            .send_additional_headers(
4134                &mut s.pipe.client,
4135                stream,
4136                &req_trailers,
4137                true,
4138                true,
4139            )
4140            .unwrap();
4141
4142        s.advance().ok();
4143
4144        let ev_headers = Event::Headers {
4145            list: req,
4146            more_frames: true,
4147        };
4148
4149        let ev_trailers = Event::Headers {
4150            list: req_trailers,
4151            more_frames: false,
4152        };
4153
4154        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4155
4156        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
4157        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
4158
4159        assert_eq!(s.poll_server(), Ok((stream, ev_trailers)));
4160        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4161    }
4162
4163    #[test]
4164    /// Server responds with a 103, then a 200 with no body.
4165    fn informational_response() {
4166        let mut s = Session::new().unwrap();
4167        s.handshake().unwrap();
4168
4169        let (stream, req) = s.send_request(true).unwrap();
4170
4171        assert_eq!(stream, 0);
4172
4173        let ev_headers = Event::Headers {
4174            list: req,
4175            more_frames: false,
4176        };
4177
4178        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4179        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4180
4181        let info_resp = vec![
4182            Header::new(b":status", b"103"),
4183            Header::new(b"link", b"<https://example.com>; rel=\"preconnect\""),
4184        ];
4185
4186        let resp = vec![
4187            Header::new(b":status", b"200"),
4188            Header::new(b"server", b"quiche-test"),
4189        ];
4190
4191        s.server
4192            .send_response(&mut s.pipe.server, stream, &info_resp, false)
4193            .unwrap();
4194
4195        s.server
4196            .send_additional_headers(
4197                &mut s.pipe.server,
4198                stream,
4199                &resp,
4200                false,
4201                true,
4202            )
4203            .unwrap();
4204
4205        s.advance().ok();
4206
4207        let ev_info_headers = Event::Headers {
4208            list: info_resp,
4209            more_frames: true,
4210        };
4211
4212        let ev_headers = Event::Headers {
4213            list: resp,
4214            more_frames: false,
4215        };
4216
4217        assert_eq!(s.poll_client(), Ok((stream, ev_info_headers)));
4218        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
4219        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
4220        assert_eq!(s.poll_client(), Err(Error::Done));
4221    }
4222
4223    #[test]
4224    /// Server responds with a 103, then attempts to send a 200 using
4225    /// send_response again, which should fail.
4226    fn no_multiple_response() {
4227        let mut s = Session::new().unwrap();
4228        s.handshake().unwrap();
4229
4230        let (stream, req) = s.send_request(true).unwrap();
4231
4232        assert_eq!(stream, 0);
4233
4234        let ev_headers = Event::Headers {
4235            list: req,
4236            more_frames: false,
4237        };
4238
4239        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4240        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4241
4242        let info_resp = vec![
4243            Header::new(b":status", b"103"),
4244            Header::new(b"link", b"<https://example.com>; rel=\"preconnect\""),
4245        ];
4246
4247        let resp = vec![
4248            Header::new(b":status", b"200"),
4249            Header::new(b"server", b"quiche-test"),
4250        ];
4251
4252        s.server
4253            .send_response(&mut s.pipe.server, stream, &info_resp, false)
4254            .unwrap();
4255
4256        assert_eq!(
4257            Err(Error::FrameUnexpected),
4258            s.server
4259                .send_response(&mut s.pipe.server, stream, &resp, true)
4260        );
4261
4262        s.advance().ok();
4263
4264        let ev_info_headers = Event::Headers {
4265            list: info_resp,
4266            more_frames: true,
4267        };
4268
4269        assert_eq!(s.poll_client(), Ok((stream, ev_info_headers)));
4270        assert_eq!(s.poll_client(), Err(Error::Done));
4271    }
4272
4273    #[test]
4274    /// Server attempts to use send_additional_headers before initial response.
4275    fn no_send_additional_before_initial_response() {
4276        let mut s = Session::new().unwrap();
4277        s.handshake().unwrap();
4278
4279        let (stream, req) = s.send_request(true).unwrap();
4280
4281        assert_eq!(stream, 0);
4282
4283        let ev_headers = Event::Headers {
4284            list: req,
4285            more_frames: false,
4286        };
4287
4288        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4289        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4290
4291        let info_resp = vec![
4292            Header::new(b":status", b"103"),
4293            Header::new(b"link", b"<https://example.com>; rel=\"preconnect\""),
4294        ];
4295
4296        assert_eq!(
4297            Err(Error::FrameUnexpected),
4298            s.server.send_additional_headers(
4299                &mut s.pipe.server,
4300                stream,
4301                &info_resp,
4302                false,
4303                false
4304            )
4305        );
4306
4307        s.advance().ok();
4308
4309        assert_eq!(s.poll_client(), Err(Error::Done));
4310    }
4311
4312    #[test]
4313    /// Client sends multiple HEADERS before data.
4314    fn additional_headers_before_data_client() {
4315        let mut s = Session::new().unwrap();
4316        s.handshake().unwrap();
4317
4318        let (stream, req) = s.send_request(false).unwrap();
4319
4320        let req_trailer = vec![Header::new(b"goodbye", b"world")];
4321
4322        assert_eq!(
4323            s.client.send_additional_headers(
4324                &mut s.pipe.client,
4325                stream,
4326                &req_trailer,
4327                true,
4328                false
4329            ),
4330            Ok(())
4331        );
4332
4333        s.advance().ok();
4334
4335        let ev_initial_headers = Event::Headers {
4336            list: req,
4337            more_frames: true,
4338        };
4339
4340        let ev_trailing_headers = Event::Headers {
4341            list: req_trailer,
4342            more_frames: true,
4343        };
4344
4345        assert_eq!(s.poll_server(), Ok((stream, ev_initial_headers)));
4346        assert_eq!(s.poll_server(), Ok((stream, ev_trailing_headers)));
4347        assert_eq!(s.poll_server(), Err(Error::Done));
4348    }
4349
4350    #[test]
4351    /// Client sends multiple HEADERS before data.
4352    fn data_after_trailers_client() {
4353        let mut s = Session::new().unwrap();
4354        s.handshake().unwrap();
4355
4356        let (stream, req) = s.send_request(false).unwrap();
4357
4358        let body = s.send_body_client(stream, false).unwrap();
4359
4360        let mut recv_buf = vec![0; body.len()];
4361
4362        let req_trailers = vec![Header::new(b"foo", b"bar")];
4363
4364        s.client
4365            .send_additional_headers(
4366                &mut s.pipe.client,
4367                stream,
4368                &req_trailers,
4369                true,
4370                false,
4371            )
4372            .unwrap();
4373
4374        s.advance().ok();
4375
4376        s.send_frame_client(
4377            frame::Frame::Data {
4378                payload: vec![1, 2, 3, 4],
4379            },
4380            stream,
4381            true,
4382        )
4383        .unwrap();
4384
4385        let ev_headers = Event::Headers {
4386            list: req,
4387            more_frames: true,
4388        };
4389
4390        let ev_trailers = Event::Headers {
4391            list: req_trailers,
4392            more_frames: true,
4393        };
4394
4395        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4396        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
4397        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
4398        assert_eq!(s.poll_server(), Ok((stream, ev_trailers)));
4399        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4400    }
4401
4402    #[test]
4403    /// Send a MAX_PUSH_ID frame from the client on a valid stream.
4404    fn max_push_id_from_client_good() {
4405        let mut s = Session::new().unwrap();
4406        s.handshake().unwrap();
4407
4408        s.send_frame_client(
4409            frame::Frame::MaxPushId { push_id: 1 },
4410            s.client.control_stream_id.unwrap(),
4411            false,
4412        )
4413        .unwrap();
4414
4415        assert_eq!(s.poll_server(), Err(Error::Done));
4416    }
4417
4418    #[test]
4419    /// Send a MAX_PUSH_ID frame from the client on an invalid stream.
4420    fn max_push_id_from_client_bad_stream() {
4421        let mut s = Session::new().unwrap();
4422        s.handshake().unwrap();
4423
4424        let (stream, req) = s.send_request(false).unwrap();
4425
4426        s.send_frame_client(
4427            frame::Frame::MaxPushId { push_id: 2 },
4428            stream,
4429            false,
4430        )
4431        .unwrap();
4432
4433        let ev_headers = Event::Headers {
4434            list: req,
4435            more_frames: true,
4436        };
4437
4438        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4439        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4440    }
4441
4442    #[test]
4443    /// Send a sequence of MAX_PUSH_ID frames from the client that attempt to
4444    /// reduce the limit.
4445    fn max_push_id_from_client_limit_reduction() {
4446        let mut s = Session::new().unwrap();
4447        s.handshake().unwrap();
4448
4449        s.send_frame_client(
4450            frame::Frame::MaxPushId { push_id: 2 },
4451            s.client.control_stream_id.unwrap(),
4452            false,
4453        )
4454        .unwrap();
4455
4456        s.send_frame_client(
4457            frame::Frame::MaxPushId { push_id: 1 },
4458            s.client.control_stream_id.unwrap(),
4459            false,
4460        )
4461        .unwrap();
4462
4463        assert_eq!(s.poll_server(), Err(Error::IdError));
4464    }
4465
4466    #[test]
4467    /// Send a MAX_PUSH_ID frame from the server, which is forbidden.
4468    fn max_push_id_from_server() {
4469        let mut s = Session::new().unwrap();
4470        s.handshake().unwrap();
4471
4472        s.send_frame_server(
4473            frame::Frame::MaxPushId { push_id: 1 },
4474            s.server.control_stream_id.unwrap(),
4475            false,
4476        )
4477        .unwrap();
4478
4479        assert_eq!(s.poll_client(), Err(Error::FrameUnexpected));
4480    }
4481
4482    #[test]
4483    /// Send a PUSH_PROMISE frame from the client, which is forbidden.
4484    fn push_promise_from_client() {
4485        let mut s = Session::new().unwrap();
4486        s.handshake().unwrap();
4487
4488        let (stream, req) = s.send_request(false).unwrap();
4489
4490        let header_block = s.client.encode_header_block(&req).unwrap();
4491
4492        s.send_frame_client(
4493            frame::Frame::PushPromise {
4494                push_id: 1,
4495                header_block,
4496            },
4497            stream,
4498            false,
4499        )
4500        .unwrap();
4501
4502        let ev_headers = Event::Headers {
4503            list: req,
4504            more_frames: true,
4505        };
4506
4507        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4508        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4509    }
4510
4511    #[test]
4512    /// Send a CANCEL_PUSH frame from the client.
4513    fn cancel_push_from_client() {
4514        let mut s = Session::new().unwrap();
4515        s.handshake().unwrap();
4516
4517        s.send_frame_client(
4518            frame::Frame::CancelPush { push_id: 1 },
4519            s.client.control_stream_id.unwrap(),
4520            false,
4521        )
4522        .unwrap();
4523
4524        assert_eq!(s.poll_server(), Err(Error::Done));
4525    }
4526
4527    #[test]
4528    /// Send a CANCEL_PUSH frame from the client on an invalid stream.
4529    fn cancel_push_from_client_bad_stream() {
4530        let mut s = Session::new().unwrap();
4531        s.handshake().unwrap();
4532
4533        let (stream, req) = s.send_request(false).unwrap();
4534
4535        s.send_frame_client(
4536            frame::Frame::CancelPush { push_id: 2 },
4537            stream,
4538            false,
4539        )
4540        .unwrap();
4541
4542        let ev_headers = Event::Headers {
4543            list: req,
4544            more_frames: true,
4545        };
4546
4547        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4548        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4549    }
4550
4551    #[test]
4552    /// Send a CANCEL_PUSH frame from the client.
4553    fn cancel_push_from_server() {
4554        let mut s = Session::new().unwrap();
4555        s.handshake().unwrap();
4556
4557        s.send_frame_server(
4558            frame::Frame::CancelPush { push_id: 1 },
4559            s.server.control_stream_id.unwrap(),
4560            false,
4561        )
4562        .unwrap();
4563
4564        assert_eq!(s.poll_client(), Err(Error::Done));
4565    }
4566
4567    #[test]
4568    /// Send a GOAWAY frame from the client.
4569    fn goaway_from_client_good() {
4570        let mut s = Session::new().unwrap();
4571        s.handshake().unwrap();
4572
4573        s.client.send_goaway(&mut s.pipe.client, 100).unwrap();
4574
4575        s.advance().ok();
4576
4577        // TODO: server push
4578        assert_eq!(s.poll_server(), Ok((0, Event::GoAway)));
4579    }
4580
4581    #[test]
4582    /// Send a GOAWAY frame from the server.
4583    fn goaway_from_server_good() {
4584        let mut s = Session::new().unwrap();
4585        s.handshake().unwrap();
4586
4587        s.server.send_goaway(&mut s.pipe.server, 4000).unwrap();
4588
4589        s.advance().ok();
4590
4591        assert_eq!(s.poll_client(), Ok((4000, Event::GoAway)));
4592    }
4593
4594    #[test]
4595    /// A client MUST NOT send a request after it receives GOAWAY.
4596    fn client_request_after_goaway() {
4597        let mut s = Session::new().unwrap();
4598        s.handshake().unwrap();
4599
4600        s.server.send_goaway(&mut s.pipe.server, 4000).unwrap();
4601
4602        s.advance().ok();
4603
4604        assert_eq!(s.poll_client(), Ok((4000, Event::GoAway)));
4605
4606        assert_eq!(s.send_request(true), Err(Error::FrameUnexpected));
4607    }
4608
4609    #[test]
4610    /// Send a GOAWAY frame from the server, using an invalid goaway ID.
4611    fn goaway_from_server_invalid_id() {
4612        let mut s = Session::new().unwrap();
4613        s.handshake().unwrap();
4614
4615        s.send_frame_server(
4616            frame::Frame::GoAway { id: 1 },
4617            s.server.control_stream_id.unwrap(),
4618            false,
4619        )
4620        .unwrap();
4621
4622        assert_eq!(s.poll_client(), Err(Error::IdError));
4623    }
4624
4625    #[test]
4626    /// Send multiple GOAWAY frames from the server, that increase the goaway
4627    /// ID.
4628    fn goaway_from_server_increase_id() {
4629        let mut s = Session::new().unwrap();
4630        s.handshake().unwrap();
4631
4632        s.send_frame_server(
4633            frame::Frame::GoAway { id: 0 },
4634            s.server.control_stream_id.unwrap(),
4635            false,
4636        )
4637        .unwrap();
4638
4639        s.send_frame_server(
4640            frame::Frame::GoAway { id: 4 },
4641            s.server.control_stream_id.unwrap(),
4642            false,
4643        )
4644        .unwrap();
4645
4646        assert_eq!(s.poll_client(), Ok((0, Event::GoAway)));
4647
4648        assert_eq!(s.poll_client(), Err(Error::IdError));
4649    }
4650
4651    #[test]
4652    #[cfg(feature = "sfv")]
4653    fn parse_priority_field_value() {
4654        // Legal dicts
4655        assert_eq!(
4656            Ok(Priority::new(0, false)),
4657            Priority::try_from(b"u=0".as_slice())
4658        );
4659        assert_eq!(
4660            Ok(Priority::new(3, false)),
4661            Priority::try_from(b"u=3".as_slice())
4662        );
4663        assert_eq!(
4664            Ok(Priority::new(7, false)),
4665            Priority::try_from(b"u=7".as_slice())
4666        );
4667
4668        assert_eq!(
4669            Ok(Priority::new(0, true)),
4670            Priority::try_from(b"u=0, i".as_slice())
4671        );
4672        assert_eq!(
4673            Ok(Priority::new(3, true)),
4674            Priority::try_from(b"u=3, i".as_slice())
4675        );
4676        assert_eq!(
4677            Ok(Priority::new(7, true)),
4678            Priority::try_from(b"u=7, i".as_slice())
4679        );
4680
4681        assert_eq!(
4682            Ok(Priority::new(0, true)),
4683            Priority::try_from(b"u=0, i=?1".as_slice())
4684        );
4685        assert_eq!(
4686            Ok(Priority::new(3, true)),
4687            Priority::try_from(b"u=3, i=?1".as_slice())
4688        );
4689        assert_eq!(
4690            Ok(Priority::new(7, true)),
4691            Priority::try_from(b"u=7, i=?1".as_slice())
4692        );
4693
4694        assert_eq!(
4695            Ok(Priority::new(3, false)),
4696            Priority::try_from(b"".as_slice())
4697        );
4698
4699        assert_eq!(
4700            Ok(Priority::new(0, true)),
4701            Priority::try_from(b"u=0;foo, i;bar".as_slice())
4702        );
4703        assert_eq!(
4704            Ok(Priority::new(3, true)),
4705            Priority::try_from(b"u=3;hello, i;world".as_slice())
4706        );
4707        assert_eq!(
4708            Ok(Priority::new(7, true)),
4709            Priority::try_from(b"u=7;croeso, i;gymru".as_slice())
4710        );
4711
4712        assert_eq!(
4713            Ok(Priority::new(0, true)),
4714            Priority::try_from(b"u=0, i, spinaltap=11".as_slice())
4715        );
4716
4717        // Illegal formats
4718        assert_eq!(Err(Error::Done), Priority::try_from(b"0".as_slice()));
4719        assert_eq!(
4720            Ok(Priority::new(7, false)),
4721            Priority::try_from(b"u=-1".as_slice())
4722        );
4723        assert_eq!(Err(Error::Done), Priority::try_from(b"u=0.2".as_slice()));
4724        assert_eq!(
4725            Ok(Priority::new(7, false)),
4726            Priority::try_from(b"u=100".as_slice())
4727        );
4728        assert_eq!(
4729            Err(Error::Done),
4730            Priority::try_from(b"u=3, i=true".as_slice())
4731        );
4732
4733        // Trailing comma in dict is malformed
4734        assert_eq!(Err(Error::Done), Priority::try_from(b"u=7, ".as_slice()));
4735    }
4736
4737    #[test]
4738    /// Send a PRIORITY_UPDATE for request stream from the client.
4739    fn priority_update_request() {
4740        let mut s = Session::new().unwrap();
4741        s.handshake().unwrap();
4742
4743        s.client
4744            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4745                urgency: 3,
4746                incremental: false,
4747            })
4748            .unwrap();
4749        s.advance().ok();
4750
4751        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4752        assert_eq!(s.poll_server(), Err(Error::Done));
4753    }
4754
4755    #[test]
4756    /// Send a PRIORITY_UPDATE for request stream from the client.
4757    fn priority_update_single_stream_rearm() {
4758        let mut s = Session::new().unwrap();
4759        s.handshake().unwrap();
4760
4761        s.client
4762            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4763                urgency: 3,
4764                incremental: false,
4765            })
4766            .unwrap();
4767        s.advance().ok();
4768
4769        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4770        assert_eq!(s.poll_server(), Err(Error::Done));
4771
4772        s.client
4773            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4774                urgency: 5,
4775                incremental: false,
4776            })
4777            .unwrap();
4778        s.advance().ok();
4779
4780        assert_eq!(s.poll_server(), Err(Error::Done));
4781
4782        // There is only one PRIORITY_UPDATE frame to read. Once read, the event
4783        // will rearm ready for more.
4784        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=5".to_vec()));
4785        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4786
4787        s.client
4788            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4789                urgency: 7,
4790                incremental: false,
4791            })
4792            .unwrap();
4793        s.advance().ok();
4794
4795        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4796        assert_eq!(s.poll_server(), Err(Error::Done));
4797
4798        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=7".to_vec()));
4799        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4800    }
4801
4802    #[test]
4803    /// Send multiple PRIORITY_UPDATE frames for different streams from the
4804    /// client across multiple flights of exchange.
4805    fn priority_update_request_multiple_stream_arm_multiple_flights() {
4806        let mut s = Session::new().unwrap();
4807        s.handshake().unwrap();
4808
4809        s.client
4810            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4811                urgency: 3,
4812                incremental: false,
4813            })
4814            .unwrap();
4815        s.advance().ok();
4816
4817        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4818        assert_eq!(s.poll_server(), Err(Error::Done));
4819
4820        s.client
4821            .send_priority_update_for_request(&mut s.pipe.client, 4, &Priority {
4822                urgency: 1,
4823                incremental: false,
4824            })
4825            .unwrap();
4826        s.advance().ok();
4827
4828        assert_eq!(s.poll_server(), Ok((4, Event::PriorityUpdate)));
4829        assert_eq!(s.poll_server(), Err(Error::Done));
4830
4831        s.client
4832            .send_priority_update_for_request(&mut s.pipe.client, 8, &Priority {
4833                urgency: 2,
4834                incremental: false,
4835            })
4836            .unwrap();
4837        s.advance().ok();
4838
4839        assert_eq!(s.poll_server(), Ok((8, Event::PriorityUpdate)));
4840        assert_eq!(s.poll_server(), Err(Error::Done));
4841
4842        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
4843        assert_eq!(s.server.take_last_priority_update(4), Ok(b"u=1".to_vec()));
4844        assert_eq!(s.server.take_last_priority_update(8), Ok(b"u=2".to_vec()));
4845        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4846    }
4847
4848    #[test]
4849    /// Send multiple PRIORITY_UPDATE frames for different streams from the
4850    /// client across a single flight.
4851    fn priority_update_request_multiple_stream_arm_single_flight() {
4852        let mut s = Session::new().unwrap();
4853        s.handshake().unwrap();
4854
4855        let mut d = [42; 65535];
4856
4857        let mut b = octets::OctetsMut::with_slice(&mut d);
4858
4859        let p1 = frame::Frame::PriorityUpdateRequest {
4860            prioritized_element_id: 0,
4861            priority_field_value: b"u=3".to_vec(),
4862        };
4863
4864        let p2 = frame::Frame::PriorityUpdateRequest {
4865            prioritized_element_id: 4,
4866            priority_field_value: b"u=3".to_vec(),
4867        };
4868
4869        let p3 = frame::Frame::PriorityUpdateRequest {
4870            prioritized_element_id: 8,
4871            priority_field_value: b"u=3".to_vec(),
4872        };
4873
4874        p1.to_bytes(&mut b).unwrap();
4875        p2.to_bytes(&mut b).unwrap();
4876        p3.to_bytes(&mut b).unwrap();
4877
4878        let off = b.off();
4879        s.pipe
4880            .client
4881            .stream_send(s.client.control_stream_id.unwrap(), &d[..off], false)
4882            .unwrap();
4883
4884        s.advance().ok();
4885
4886        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4887        assert_eq!(s.poll_server(), Ok((4, Event::PriorityUpdate)));
4888        assert_eq!(s.poll_server(), Ok((8, Event::PriorityUpdate)));
4889        assert_eq!(s.poll_server(), Err(Error::Done));
4890
4891        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
4892        assert_eq!(s.server.take_last_priority_update(4), Ok(b"u=3".to_vec()));
4893        assert_eq!(s.server.take_last_priority_update(8), Ok(b"u=3".to_vec()));
4894
4895        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4896    }
4897
4898    #[test]
4899    /// Send a PRIORITY_UPDATE for a request stream, before and after the stream
4900    /// has been completed.
4901    fn priority_update_request_collected_completed() {
4902        let mut s = Session::new().unwrap();
4903        s.handshake().unwrap();
4904
4905        s.client
4906            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4907                urgency: 3,
4908                incremental: false,
4909            })
4910            .unwrap();
4911        s.advance().ok();
4912
4913        let (stream, req) = s.send_request(true).unwrap();
4914        let ev_headers = Event::Headers {
4915            list: req,
4916            more_frames: false,
4917        };
4918
4919        // Priority event is generated before request headers.
4920        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4921        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4922        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4923        assert_eq!(s.poll_server(), Err(Error::Done));
4924
4925        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
4926        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4927
4928        let resp = s.send_response(stream, true).unwrap();
4929
4930        let ev_headers = Event::Headers {
4931            list: resp,
4932            more_frames: false,
4933        };
4934
4935        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
4936        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
4937        assert_eq!(s.poll_client(), Err(Error::Done));
4938
4939        // Now send a PRIORITY_UPDATE for the completed request stream.
4940        s.client
4941            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4942                urgency: 3,
4943                incremental: false,
4944            })
4945            .unwrap();
4946        s.advance().ok();
4947
4948        // No event generated at server
4949        assert_eq!(s.poll_server(), Err(Error::Done));
4950    }
4951
4952    #[test]
4953    /// Send a PRIORITY_UPDATE for a request stream, before and after the stream
4954    /// has been stopped.
4955    fn priority_update_request_collected_stopped() {
4956        let mut s = Session::new().unwrap();
4957        s.handshake().unwrap();
4958
4959        s.client
4960            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4961                urgency: 3,
4962                incremental: false,
4963            })
4964            .unwrap();
4965        s.advance().ok();
4966
4967        let (stream, req) = s.send_request(false).unwrap();
4968        let ev_headers = Event::Headers {
4969            list: req,
4970            more_frames: true,
4971        };
4972
4973        // Priority event is generated before request headers.
4974        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4975        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4976        assert_eq!(s.poll_server(), Err(Error::Done));
4977
4978        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
4979        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4980
4981        s.pipe
4982            .client
4983            .stream_shutdown(stream, crate::Shutdown::Write, 0x100)
4984            .unwrap();
4985        s.pipe
4986            .client
4987            .stream_shutdown(stream, crate::Shutdown::Read, 0x100)
4988            .unwrap();
4989
4990        s.advance().ok();
4991
4992        assert_eq!(s.poll_server(), Ok((0, Event::Reset(0x100))));
4993        assert_eq!(s.poll_server(), Err(Error::Done));
4994
4995        // Now send a PRIORITY_UPDATE for the closed request stream.
4996        s.client
4997            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4998                urgency: 3,
4999                incremental: false,
5000            })
5001            .unwrap();
5002        s.advance().ok();
5003
5004        // No event generated at server
5005        assert_eq!(s.poll_server(), Err(Error::Done));
5006    }
5007
5008    #[test]
5009    /// Send a PRIORITY_UPDATE for push stream from the client.
5010    fn priority_update_push() {
5011        let mut s = Session::new().unwrap();
5012        s.handshake().unwrap();
5013
5014        s.send_frame_client(
5015            frame::Frame::PriorityUpdatePush {
5016                prioritized_element_id: 3,
5017                priority_field_value: b"u=3".to_vec(),
5018            },
5019            s.client.control_stream_id.unwrap(),
5020            false,
5021        )
5022        .unwrap();
5023
5024        assert_eq!(s.poll_server(), Err(Error::Done));
5025    }
5026
5027    #[test]
5028    /// Send a PRIORITY_UPDATE for request stream from the client but for an
5029    /// incorrect stream type.
5030    fn priority_update_request_bad_stream() {
5031        let mut s = Session::new().unwrap();
5032        s.handshake().unwrap();
5033
5034        s.send_frame_client(
5035            frame::Frame::PriorityUpdateRequest {
5036                prioritized_element_id: 5,
5037                priority_field_value: b"u=3".to_vec(),
5038            },
5039            s.client.control_stream_id.unwrap(),
5040            false,
5041        )
5042        .unwrap();
5043
5044        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
5045    }
5046
5047    #[test]
5048    /// Send a PRIORITY_UPDATE for push stream from the client but for an
5049    /// incorrect stream type.
5050    fn priority_update_push_bad_stream() {
5051        let mut s = Session::new().unwrap();
5052        s.handshake().unwrap();
5053
5054        s.send_frame_client(
5055            frame::Frame::PriorityUpdatePush {
5056                prioritized_element_id: 5,
5057                priority_field_value: b"u=3".to_vec(),
5058            },
5059            s.client.control_stream_id.unwrap(),
5060            false,
5061        )
5062        .unwrap();
5063
5064        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
5065    }
5066
5067    #[test]
5068    /// Send a PRIORITY_UPDATE for request stream from the server.
5069    fn priority_update_request_from_server() {
5070        let mut s = Session::new().unwrap();
5071        s.handshake().unwrap();
5072
5073        s.send_frame_server(
5074            frame::Frame::PriorityUpdateRequest {
5075                prioritized_element_id: 0,
5076                priority_field_value: b"u=3".to_vec(),
5077            },
5078            s.server.control_stream_id.unwrap(),
5079            false,
5080        )
5081        .unwrap();
5082
5083        assert_eq!(s.poll_client(), Err(Error::FrameUnexpected));
5084    }
5085
5086    #[test]
5087    /// Send a PRIORITY_UPDATE for request stream from the server.
5088    fn priority_update_push_from_server() {
5089        let mut s = Session::new().unwrap();
5090        s.handshake().unwrap();
5091
5092        s.send_frame_server(
5093            frame::Frame::PriorityUpdatePush {
5094                prioritized_element_id: 0,
5095                priority_field_value: b"u=3".to_vec(),
5096            },
5097            s.server.control_stream_id.unwrap(),
5098            false,
5099        )
5100        .unwrap();
5101
5102        assert_eq!(s.poll_client(), Err(Error::FrameUnexpected));
5103    }
5104
5105    #[test]
5106    /// Ensure quiche allocates streams for client and server roles as expected.
5107    fn uni_stream_local_counting() {
5108        let config = Config::new().unwrap();
5109
5110        let h3_cln = Connection::new(&config, false, false).unwrap();
5111        assert_eq!(h3_cln.next_uni_stream_id, 2);
5112
5113        let h3_srv = Connection::new(&config, true, false).unwrap();
5114        assert_eq!(h3_srv.next_uni_stream_id, 3);
5115    }
5116
5117    #[test]
5118    /// Client opens multiple control streams, which is forbidden.
5119    fn open_multiple_control_streams() {
5120        let mut s = Session::new().unwrap();
5121        s.handshake().unwrap();
5122
5123        let stream_id = s.client.next_uni_stream_id;
5124
5125        let mut d = [42; 8];
5126        let mut b = octets::OctetsMut::with_slice(&mut d);
5127
5128        s.pipe
5129            .client
5130            .stream_send(
5131                stream_id,
5132                b.put_varint(stream::HTTP3_CONTROL_STREAM_TYPE_ID).unwrap(),
5133                false,
5134            )
5135            .unwrap();
5136
5137        s.advance().ok();
5138
5139        assert_eq!(s.poll_server(), Err(Error::StreamCreationError));
5140    }
5141
5142    #[test]
5143    /// Client closes the control stream, which is forbidden.
5144    fn close_control_stream_after_type() {
5145        let mut s = Session::new().unwrap();
5146        s.handshake().unwrap();
5147
5148        s.pipe
5149            .client
5150            .stream_send(s.client.control_stream_id.unwrap(), &[], true)
5151            .unwrap();
5152
5153        s.advance().ok();
5154
5155        assert_eq!(
5156            Err(Error::ClosedCriticalStream),
5157            s.server.poll(&mut s.pipe.server)
5158        );
5159        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5160    }
5161
5162    #[test]
5163    /// Client closes the control stream after a frame is sent, which is
5164    /// forbidden.
5165    fn close_control_stream_after_frame() {
5166        let mut s = Session::new().unwrap();
5167        s.handshake().unwrap();
5168
5169        s.send_frame_client(
5170            frame::Frame::MaxPushId { push_id: 1 },
5171            s.client.control_stream_id.unwrap(),
5172            true,
5173        )
5174        .unwrap();
5175
5176        assert_eq!(
5177            Err(Error::ClosedCriticalStream),
5178            s.server.poll(&mut s.pipe.server)
5179        );
5180        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5181    }
5182
5183    #[test]
5184    /// Client resets the control stream, which is forbidden.
5185    fn reset_control_stream_after_type() {
5186        let mut s = Session::new().unwrap();
5187        s.handshake().unwrap();
5188
5189        s.pipe
5190            .client
5191            .stream_shutdown(
5192                s.client.control_stream_id.unwrap(),
5193                crate::Shutdown::Write,
5194                0,
5195            )
5196            .unwrap();
5197
5198        s.advance().ok();
5199
5200        assert_eq!(
5201            Err(Error::ClosedCriticalStream),
5202            s.server.poll(&mut s.pipe.server)
5203        );
5204        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5205    }
5206
5207    #[test]
5208    /// Client resets the control stream after a frame is sent, which is
5209    /// forbidden.
5210    fn reset_control_stream_after_frame() {
5211        let mut s = Session::new().unwrap();
5212        s.handshake().unwrap();
5213
5214        s.send_frame_client(
5215            frame::Frame::MaxPushId { push_id: 1 },
5216            s.client.control_stream_id.unwrap(),
5217            false,
5218        )
5219        .unwrap();
5220
5221        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5222
5223        s.pipe
5224            .client
5225            .stream_shutdown(
5226                s.client.control_stream_id.unwrap(),
5227                crate::Shutdown::Write,
5228                0,
5229            )
5230            .unwrap();
5231
5232        s.advance().ok();
5233
5234        assert_eq!(
5235            Err(Error::ClosedCriticalStream),
5236            s.server.poll(&mut s.pipe.server)
5237        );
5238        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5239    }
5240
5241    #[test]
5242    /// Client closes QPACK stream, which is forbidden.
5243    fn close_qpack_stream_after_type() {
5244        let mut s = Session::new().unwrap();
5245        s.handshake().unwrap();
5246
5247        s.pipe
5248            .client
5249            .stream_send(
5250                s.client.local_qpack_streams.encoder_stream_id.unwrap(),
5251                &[],
5252                true,
5253            )
5254            .unwrap();
5255
5256        s.advance().ok();
5257
5258        assert_eq!(
5259            Err(Error::ClosedCriticalStream),
5260            s.server.poll(&mut s.pipe.server)
5261        );
5262        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5263    }
5264
5265    #[test]
5266    /// Client closes QPACK stream after sending some stuff, which is forbidden.
5267    fn close_qpack_stream_after_data() {
5268        let mut s = Session::new().unwrap();
5269        s.handshake().unwrap();
5270
5271        let stream_id = s.client.local_qpack_streams.encoder_stream_id.unwrap();
5272        let d = [0; 1];
5273
5274        s.pipe.client.stream_send(stream_id, &d, false).unwrap();
5275        s.pipe.client.stream_send(stream_id, &d, true).unwrap();
5276
5277        s.advance().ok();
5278
5279        assert_eq!(
5280            Err(Error::ClosedCriticalStream),
5281            s.server.poll(&mut s.pipe.server)
5282        );
5283        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5284    }
5285
5286    #[test]
5287    /// Client resets QPACK stream, which is forbidden.
5288    fn reset_qpack_stream_after_type() {
5289        let mut s = Session::new().unwrap();
5290        s.handshake().unwrap();
5291
5292        s.pipe
5293            .client
5294            .stream_shutdown(
5295                s.client.local_qpack_streams.encoder_stream_id.unwrap(),
5296                crate::Shutdown::Write,
5297                0,
5298            )
5299            .unwrap();
5300
5301        s.advance().ok();
5302
5303        assert_eq!(
5304            Err(Error::ClosedCriticalStream),
5305            s.server.poll(&mut s.pipe.server)
5306        );
5307        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5308    }
5309
5310    #[test]
5311    /// Client resets QPACK stream after sending some stuff, which is forbidden.
5312    fn reset_qpack_stream_after_data() {
5313        let mut s = Session::new().unwrap();
5314        s.handshake().unwrap();
5315
5316        let stream_id = s.client.local_qpack_streams.encoder_stream_id.unwrap();
5317        let d = [0; 1];
5318
5319        s.pipe.client.stream_send(stream_id, &d, false).unwrap();
5320        s.pipe.client.stream_send(stream_id, &d, false).unwrap();
5321
5322        s.advance().ok();
5323
5324        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5325
5326        s.pipe
5327            .client
5328            .stream_shutdown(stream_id, crate::Shutdown::Write, 0)
5329            .unwrap();
5330
5331        s.advance().ok();
5332
5333        assert_eq!(
5334            Err(Error::ClosedCriticalStream),
5335            s.server.poll(&mut s.pipe.server)
5336        );
5337        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5338    }
5339
5340    #[test]
5341    /// Client sends QPACK data.
5342    fn qpack_data() {
5343        // TODO: QPACK instructions are ignored until dynamic table support is
5344        // added so we just test that the data is safely ignored.
5345        let mut s = Session::new().unwrap();
5346        s.handshake().unwrap();
5347
5348        let e_stream_id = s.client.local_qpack_streams.encoder_stream_id.unwrap();
5349        let d_stream_id = s.client.local_qpack_streams.decoder_stream_id.unwrap();
5350        let d = [0; 20];
5351
5352        s.pipe.client.stream_send(e_stream_id, &d, false).unwrap();
5353        s.advance().ok();
5354
5355        s.pipe.client.stream_send(d_stream_id, &d, false).unwrap();
5356        s.advance().ok();
5357
5358        match s.server.poll(&mut s.pipe.server) {
5359            Ok(_) => panic!(),
5360
5361            Err(Error::Done) => {
5362                assert_eq!(s.server.peer_qpack_streams.encoder_stream_bytes, 20);
5363                assert_eq!(s.server.peer_qpack_streams.decoder_stream_bytes, 20);
5364            },
5365
5366            Err(_) => {
5367                panic!();
5368            },
5369        }
5370
5371        let stats = s.server.stats();
5372        assert_eq!(stats.qpack_encoder_stream_recv_bytes, 20);
5373        assert_eq!(stats.qpack_decoder_stream_recv_bytes, 20);
5374    }
5375
5376    #[test]
5377    /// Tests limits for the stream state buffer maximum size.
5378    fn max_state_buf_size() {
5379        let mut s = Session::new().unwrap();
5380        s.handshake().unwrap();
5381
5382        let req = vec![
5383            Header::new(b":method", b"GET"),
5384            Header::new(b":scheme", b"https"),
5385            Header::new(b":authority", b"quic.tech"),
5386            Header::new(b":path", b"/test"),
5387            Header::new(b"user-agent", b"quiche-test"),
5388        ];
5389
5390        assert_eq!(
5391            s.client.send_request(&mut s.pipe.client, &req, false),
5392            Ok(0)
5393        );
5394
5395        s.advance().ok();
5396
5397        let ev_headers = Event::Headers {
5398            list: req,
5399            more_frames: true,
5400        };
5401
5402        assert_eq!(s.server.poll(&mut s.pipe.server), Ok((0, ev_headers)));
5403
5404        // DATA frames don't consume the state buffer, so can be of any size.
5405        let mut d = [42; 128];
5406        let mut b = octets::OctetsMut::with_slice(&mut d);
5407
5408        let frame_type = b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
5409        s.pipe.client.stream_send(0, frame_type, false).unwrap();
5410
5411        let frame_len = b.put_varint(1 << 24).unwrap();
5412        s.pipe.client.stream_send(0, frame_len, false).unwrap();
5413
5414        s.pipe.client.stream_send(0, &d, false).unwrap();
5415
5416        s.advance().ok();
5417
5418        assert_eq!(s.server.poll(&mut s.pipe.server), Ok((0, Event::Data)));
5419
5420        // GREASE frames consume the state buffer, so need to be limited.
5421        let mut s = Session::new().unwrap();
5422        s.handshake().unwrap();
5423
5424        let mut d = [42; 128];
5425        let mut b = octets::OctetsMut::with_slice(&mut d);
5426
5427        let frame_type = b.put_varint(148_764_065_110_560_899).unwrap();
5428        s.pipe.client.stream_send(0, frame_type, false).unwrap();
5429
5430        let frame_len = b.put_varint(1 << 24).unwrap();
5431        s.pipe.client.stream_send(0, frame_len, false).unwrap();
5432
5433        s.pipe.client.stream_send(0, &d, false).unwrap();
5434
5435        s.advance().ok();
5436
5437        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::ExcessiveLoad));
5438    }
5439
5440    #[test]
5441    /// Tests that DATA frames are properly truncated depending on the request
5442    /// stream's outgoing flow control capacity.
5443    fn stream_backpressure() {
5444        let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
5445
5446        let mut s = Session::new().unwrap();
5447        s.handshake().unwrap();
5448
5449        let (stream, req) = s.send_request(false).unwrap();
5450
5451        let total_data_frames = 6;
5452
5453        for _ in 0..total_data_frames {
5454            assert_eq!(
5455                s.client
5456                    .send_body(&mut s.pipe.client, stream, &bytes, false),
5457                Ok(bytes.len())
5458            );
5459
5460            s.advance().ok();
5461        }
5462
5463        assert_eq!(
5464            s.client.send_body(&mut s.pipe.client, stream, &bytes, true),
5465            Ok(bytes.len() - 2)
5466        );
5467
5468        s.advance().ok();
5469
5470        let mut recv_buf = vec![0; bytes.len()];
5471
5472        let ev_headers = Event::Headers {
5473            list: req,
5474            more_frames: true,
5475        };
5476
5477        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5478        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
5479        assert_eq!(s.poll_server(), Err(Error::Done));
5480
5481        for _ in 0..total_data_frames {
5482            assert_eq!(
5483                s.recv_body_server(stream, &mut recv_buf),
5484                Ok(bytes.len())
5485            );
5486        }
5487
5488        assert_eq!(
5489            s.recv_body_server(stream, &mut recv_buf),
5490            Ok(bytes.len() - 2)
5491        );
5492
5493        // Fin flag from last send_body() call was not sent as the buffer was
5494        // only partially written.
5495        assert_eq!(s.poll_server(), Err(Error::Done));
5496    }
5497
5498    #[test]
5499    /// Tests that the max header list size setting is enforced.
5500    fn request_max_header_size_limit() {
5501        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5502        config
5503            .load_cert_chain_from_pem_file("examples/cert.crt")
5504            .unwrap();
5505        config
5506            .load_priv_key_from_pem_file("examples/cert.key")
5507            .unwrap();
5508        config.set_application_protos(&[b"h3"]).unwrap();
5509        config.set_initial_max_data(1500);
5510        config.set_initial_max_stream_data_bidi_local(150);
5511        config.set_initial_max_stream_data_bidi_remote(150);
5512        config.set_initial_max_stream_data_uni(150);
5513        config.set_initial_max_streams_bidi(5);
5514        config.set_initial_max_streams_uni(5);
5515        config.verify_peer(false);
5516
5517        let mut h3_config = Config::new().unwrap();
5518        h3_config.set_max_field_section_size(65);
5519
5520        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5521
5522        s.handshake().unwrap();
5523
5524        let req = vec![
5525            Header::new(b":method", b"GET"),
5526            Header::new(b":scheme", b"https"),
5527            Header::new(b":authority", b"quic.tech"),
5528            Header::new(b":path", b"/test"),
5529            Header::new(b"aaaaaaa", b"aaaaaaaa"),
5530        ];
5531
5532        let stream = s
5533            .client
5534            .send_request(&mut s.pipe.client, &req, true)
5535            .unwrap();
5536
5537        s.advance().ok();
5538
5539        assert_eq!(stream, 0);
5540
5541        assert_eq!(s.poll_server(), Err(Error::ExcessiveLoad));
5542
5543        assert_eq!(
5544            s.pipe.server.local_error.as_ref().unwrap().error_code,
5545            Error::to_wire(Error::ExcessiveLoad)
5546        );
5547    }
5548
5549    #[test]
5550    /// Tests that Error::TransportError contains a transport error.
5551    fn transport_error() {
5552        let mut s = Session::new().unwrap();
5553        s.handshake().unwrap();
5554
5555        let req = vec![
5556            Header::new(b":method", b"GET"),
5557            Header::new(b":scheme", b"https"),
5558            Header::new(b":authority", b"quic.tech"),
5559            Header::new(b":path", b"/test"),
5560            Header::new(b"user-agent", b"quiche-test"),
5561        ];
5562
5563        // We need to open all streams in the same flight, so we can't use the
5564        // Session::send_request() method because it also calls advance(),
5565        // otherwise the server would send a MAX_STREAMS frame and the client
5566        // wouldn't hit the streams limit.
5567        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(0));
5568        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(4));
5569        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(8));
5570        assert_eq!(
5571            s.client.send_request(&mut s.pipe.client, &req, true),
5572            Ok(12)
5573        );
5574        assert_eq!(
5575            s.client.send_request(&mut s.pipe.client, &req, true),
5576            Ok(16)
5577        );
5578
5579        assert_eq!(
5580            s.client.send_request(&mut s.pipe.client, &req, true),
5581            Err(Error::TransportError(crate::Error::StreamLimit))
5582        );
5583    }
5584
5585    #[test]
5586    /// Tests that sending DATA before HEADERS causes an error.
5587    fn data_before_headers() {
5588        let mut s = Session::new().unwrap();
5589        s.handshake().unwrap();
5590
5591        let mut d = [42; 128];
5592        let mut b = octets::OctetsMut::with_slice(&mut d);
5593
5594        let frame_type = b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
5595        s.pipe.client.stream_send(0, frame_type, false).unwrap();
5596
5597        let frame_len = b.put_varint(5).unwrap();
5598        s.pipe.client.stream_send(0, frame_len, false).unwrap();
5599
5600        s.pipe.client.stream_send(0, b"hello", false).unwrap();
5601
5602        s.advance().ok();
5603
5604        assert_eq!(
5605            s.server.poll(&mut s.pipe.server),
5606            Err(Error::FrameUnexpected)
5607        );
5608    }
5609
5610    #[test]
5611    /// Tests that calling poll() after an error occurred does nothing.
5612    fn poll_after_error() {
5613        let mut s = Session::new().unwrap();
5614        s.handshake().unwrap();
5615
5616        let mut d = [42; 128];
5617        let mut b = octets::OctetsMut::with_slice(&mut d);
5618
5619        let frame_type = b.put_varint(148_764_065_110_560_899).unwrap();
5620        s.pipe.client.stream_send(0, frame_type, false).unwrap();
5621
5622        let frame_len = b.put_varint(1 << 24).unwrap();
5623        s.pipe.client.stream_send(0, frame_len, false).unwrap();
5624
5625        s.pipe.client.stream_send(0, &d, false).unwrap();
5626
5627        s.advance().ok();
5628
5629        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::ExcessiveLoad));
5630
5631        // Try to call poll() again after an error occurred.
5632        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::Done));
5633    }
5634
5635    #[test]
5636    /// Tests that we limit sending HEADERS based on the stream capacity.
5637    fn headers_blocked() {
5638        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5639        config
5640            .load_cert_chain_from_pem_file("examples/cert.crt")
5641            .unwrap();
5642        config
5643            .load_priv_key_from_pem_file("examples/cert.key")
5644            .unwrap();
5645        config.set_application_protos(&[b"h3"]).unwrap();
5646        config.set_initial_max_data(70);
5647        config.set_initial_max_stream_data_bidi_local(150);
5648        config.set_initial_max_stream_data_bidi_remote(150);
5649        config.set_initial_max_stream_data_uni(150);
5650        config.set_initial_max_streams_bidi(100);
5651        config.set_initial_max_streams_uni(5);
5652        config.verify_peer(false);
5653
5654        let h3_config = Config::new().unwrap();
5655
5656        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5657
5658        s.handshake().unwrap();
5659
5660        let req = vec![
5661            Header::new(b":method", b"GET"),
5662            Header::new(b":scheme", b"https"),
5663            Header::new(b":authority", b"quic.tech"),
5664            Header::new(b":path", b"/test"),
5665        ];
5666
5667        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(0));
5668
5669        assert_eq!(
5670            s.client.send_request(&mut s.pipe.client, &req, true),
5671            Err(Error::StreamBlocked)
5672        );
5673
5674        // Clear the writable stream queue.
5675        assert_eq!(s.pipe.client.stream_writable_next(), Some(2));
5676        assert_eq!(s.pipe.client.stream_writable_next(), Some(6));
5677        assert_eq!(s.pipe.client.stream_writable_next(), Some(10));
5678        assert_eq!(s.pipe.client.stream_writable_next(), None);
5679
5680        s.advance().ok();
5681
5682        // Once the server gives flow control credits back, we can send the
5683        // request.
5684        assert_eq!(s.pipe.client.stream_writable_next(), Some(4));
5685        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(4));
5686    }
5687
5688    #[test]
5689    /// Ensure StreamBlocked when connection flow control prevents headers.
5690    fn headers_blocked_on_conn() {
5691        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5692        config
5693            .load_cert_chain_from_pem_file("examples/cert.crt")
5694            .unwrap();
5695        config
5696            .load_priv_key_from_pem_file("examples/cert.key")
5697            .unwrap();
5698        config.set_application_protos(&[b"h3"]).unwrap();
5699        config.set_initial_max_data(70);
5700        config.set_initial_max_stream_data_bidi_local(150);
5701        config.set_initial_max_stream_data_bidi_remote(150);
5702        config.set_initial_max_stream_data_uni(150);
5703        config.set_initial_max_streams_bidi(100);
5704        config.set_initial_max_streams_uni(5);
5705        config.verify_peer(false);
5706
5707        let h3_config = Config::new().unwrap();
5708
5709        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5710
5711        s.handshake().unwrap();
5712
5713        // After the HTTP handshake, some bytes of connection flow control have
5714        // been consumed. Fill the connection with more grease data on the control
5715        // stream.
5716        let d = [42; 28];
5717        assert_eq!(s.pipe.client.stream_send(2, &d, false), Ok(23));
5718
5719        let req = vec![
5720            Header::new(b":method", b"GET"),
5721            Header::new(b":scheme", b"https"),
5722            Header::new(b":authority", b"quic.tech"),
5723            Header::new(b":path", b"/test"),
5724        ];
5725
5726        // There is 0 connection-level flow control, so sending a request is
5727        // blocked.
5728        assert_eq!(
5729            s.client.send_request(&mut s.pipe.client, &req, true),
5730            Err(Error::StreamBlocked)
5731        );
5732        assert_eq!(s.pipe.client.stream_writable_next(), None);
5733
5734        // Emit the control stream data and drain it at the server via poll() to
5735        // consumes it via poll() and gives back flow control.
5736        s.advance().ok();
5737        assert_eq!(s.poll_server(), Err(Error::Done));
5738        s.advance().ok();
5739
5740        // Now we can send the request.
5741        assert_eq!(s.pipe.client.stream_writable_next(), Some(2));
5742        assert_eq!(s.pipe.client.stream_writable_next(), Some(6));
5743        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(0));
5744    }
5745
5746    #[test]
5747    /// Ensure STREAM_DATA_BLOCKED is not emitted multiple times with the same
5748    /// offset when trying to send large bodies.
5749    fn send_body_truncation_stream_blocked() {
5750        use crate::testing::decode_pkt;
5751
5752        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5753        config
5754            .load_cert_chain_from_pem_file("examples/cert.crt")
5755            .unwrap();
5756        config
5757            .load_priv_key_from_pem_file("examples/cert.key")
5758            .unwrap();
5759        config.set_application_protos(&[b"h3"]).unwrap();
5760        config.set_initial_max_data(10000); // large connection-level flow control
5761        config.set_initial_max_stream_data_bidi_local(80);
5762        config.set_initial_max_stream_data_bidi_remote(80);
5763        config.set_initial_max_stream_data_uni(150);
5764        config.set_initial_max_streams_bidi(100);
5765        config.set_initial_max_streams_uni(5);
5766        config.verify_peer(false);
5767
5768        let h3_config = Config::new().unwrap();
5769
5770        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5771
5772        s.handshake().unwrap();
5773
5774        let (stream, req) = s.send_request(true).unwrap();
5775
5776        let ev_headers = Event::Headers {
5777            list: req,
5778            more_frames: false,
5779        };
5780
5781        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5782        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
5783
5784        let _ = s.send_response(stream, false).unwrap();
5785
5786        assert_eq!(s.pipe.server.streams.blocked().len(), 0);
5787
5788        // The body must be larger than the stream window would allow
5789        let d = [42; 500];
5790        let mut off = 0;
5791
5792        let sent = s
5793            .server
5794            .send_body(&mut s.pipe.server, stream, &d, true)
5795            .unwrap();
5796        assert_eq!(sent, 25);
5797        off += sent;
5798
5799        // send_body wrote as much as it could (sent < size of buff).
5800        assert_eq!(s.pipe.server.streams.blocked().len(), 1);
5801        assert_eq!(
5802            s.server
5803                .send_body(&mut s.pipe.server, stream, &d[off..], true),
5804            Err(Error::Done)
5805        );
5806        assert_eq!(s.pipe.server.streams.blocked().len(), 1);
5807
5808        // Now read raw frames to see what the QUIC layer did
5809        let mut buf = [0; 65535];
5810        let (len, _) = s.pipe.server.send(&mut buf).unwrap();
5811
5812        let frames = decode_pkt(&mut s.pipe.client, &mut buf[..len]).unwrap();
5813
5814        let mut iter = frames.iter();
5815
5816        assert_eq!(
5817            iter.next(),
5818            Some(&crate::frame::Frame::StreamDataBlocked {
5819                stream_id: 0,
5820                limit: 80,
5821            })
5822        );
5823
5824        // At the server, after sending the STREAM_DATA_BLOCKED frame, we clear
5825        // the mark.
5826        assert_eq!(s.pipe.server.streams.blocked().len(), 0);
5827
5828        // Don't read any data from the client, so stream flow control is never
5829        // given back in the form of changing the stream's max offset.
5830        // Subsequent body send operations will still fail but no more
5831        // STREAM_DATA_BLOCKED frames should be submitted since the limit didn't
5832        // change. No frames means no packet to send.
5833        assert_eq!(
5834            s.server
5835                .send_body(&mut s.pipe.server, stream, &d[off..], true),
5836            Err(Error::Done)
5837        );
5838        assert_eq!(s.pipe.server.streams.blocked().len(), 0);
5839        assert_eq!(s.pipe.server.send(&mut buf), Err(crate::Error::Done));
5840
5841        // Now update the client's max offset manually.
5842        let frames = [crate::frame::Frame::MaxStreamData {
5843            stream_id: 0,
5844            max: 100,
5845        }];
5846
5847        let pkt_type = crate::packet::Type::Short;
5848        assert_eq!(
5849            s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
5850            Ok(39),
5851        );
5852
5853        let sent = s
5854            .server
5855            .send_body(&mut s.pipe.server, stream, &d[off..], true)
5856            .unwrap();
5857        assert_eq!(sent, 18);
5858
5859        // Same thing here...
5860        assert_eq!(s.pipe.server.streams.blocked().len(), 1);
5861        assert_eq!(
5862            s.server
5863                .send_body(&mut s.pipe.server, stream, &d[off..], true),
5864            Err(Error::Done)
5865        );
5866        assert_eq!(s.pipe.server.streams.blocked().len(), 1);
5867
5868        let (len, _) = s.pipe.server.send(&mut buf).unwrap();
5869
5870        let frames = decode_pkt(&mut s.pipe.client, &mut buf[..len]).unwrap();
5871
5872        let mut iter = frames.iter();
5873
5874        assert_eq!(
5875            iter.next(),
5876            Some(&crate::frame::Frame::StreamDataBlocked {
5877                stream_id: 0,
5878                limit: 100,
5879            })
5880        );
5881    }
5882
5883    #[test]
5884    /// Ensure stream doesn't hang due to small cwnd.
5885    fn send_body_stream_blocked_by_small_cwnd() {
5886        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5887        config
5888            .load_cert_chain_from_pem_file("examples/cert.crt")
5889            .unwrap();
5890        config
5891            .load_priv_key_from_pem_file("examples/cert.key")
5892            .unwrap();
5893        config.set_application_protos(&[b"h3"]).unwrap();
5894        config.set_initial_max_data(100000); // large connection-level flow control
5895        config.set_initial_max_stream_data_bidi_local(100000);
5896        config.set_initial_max_stream_data_bidi_remote(50000);
5897        config.set_initial_max_stream_data_uni(150);
5898        config.set_initial_max_streams_bidi(100);
5899        config.set_initial_max_streams_uni(5);
5900        config.verify_peer(false);
5901
5902        let h3_config = Config::new().unwrap();
5903
5904        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5905
5906        s.handshake().unwrap();
5907
5908        let (stream, req) = s.send_request(true).unwrap();
5909
5910        let ev_headers = Event::Headers {
5911            list: req,
5912            more_frames: false,
5913        };
5914
5915        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5916        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
5917
5918        let _ = s.send_response(stream, false).unwrap();
5919
5920        // Clear the writable stream queue.
5921        assert_eq!(s.pipe.server.stream_writable_next(), Some(3));
5922        assert_eq!(s.pipe.server.stream_writable_next(), Some(7));
5923        assert_eq!(s.pipe.server.stream_writable_next(), Some(11));
5924        assert_eq!(s.pipe.server.stream_writable_next(), Some(stream));
5925        assert_eq!(s.pipe.server.stream_writable_next(), None);
5926
5927        // The body must be larger than the cwnd would allow.
5928        let send_buf = [42; 80000];
5929
5930        let sent = s
5931            .server
5932            .send_body(&mut s.pipe.server, stream, &send_buf, true)
5933            .unwrap();
5934
5935        // send_body wrote as much as it could (sent < size of buff).
5936        assert_eq!(sent, 11995);
5937
5938        s.advance().ok();
5939
5940        // Client reads received headers and body.
5941        let mut recv_buf = [42; 80000];
5942        assert!(s.poll_client().is_ok());
5943        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
5944        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(11995));
5945
5946        s.advance().ok();
5947
5948        // Server send cap is smaller than remaining body buffer.
5949        assert!(s.pipe.server.tx_cap < send_buf.len() - sent);
5950
5951        // Once the server cwnd opens up, we can send more body.
5952        assert_eq!(s.pipe.server.stream_writable_next(), Some(0));
5953    }
5954
#[test]
/// Checks that a body write which leaves room for only a DATA frame header
/// is reported as `Done`, and that the stream is reported writable again
/// after the client has consumed the body.
fn send_body_stream_blocked_zero_length() {
    let mut quic_config =
        crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    // Generous connection-level flow control.
    quic_config.set_initial_max_data(100000);
    quic_config.set_initial_max_stream_data_bidi_local(100000);
    quic_config.set_initial_max_stream_data_bidi_remote(50000);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(100);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);

    let http3_config = Config::new().unwrap();
    let mut s =
        Session::with_configs(&mut quic_config, &http3_config).unwrap();
    s.handshake().unwrap();

    let (stream, req) = s.send_request(true).unwrap();

    // The server observes the request headers, then the end of the stream.
    let expected_headers = Event::Headers {
        list: req,
        more_frames: false,
    };
    assert_eq!(s.poll_server(), Ok((stream, expected_headers)));
    assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));

    let _ = s.send_response(stream, false).unwrap();

    // Drain the server's writable stream queue.
    for expected_id in [3, 7, 11, stream] {
        assert_eq!(s.pipe.server.stream_writable_next(), Some(expected_id));
    }
    assert_eq!(s.pipe.server.stream_writable_next(), None);

    // The body is sized so that, once written, only enough room for one
    // more DATA frame header (and no payload) is left.
    let body = [42; 11994];

    let written = s
        .server
        .send_body(&mut s.pipe.server, stream, &body, false)
        .unwrap();
    assert_eq!(written, 11994);

    // Just the DATA frame header would fit now, so a second write must
    // report `Done` instead of emitting an empty frame.
    assert_eq!(s.pipe.server.stream_capacity(stream).unwrap(), 3);
    let blocked =
        s.server.send_body(&mut s.pipe.server, stream, &body, false);
    assert_eq!(blocked, Err(Error::Done));

    s.advance().ok();

    // The client consumes the response headers and the body.
    let mut client_buf = [42; 80000];
    assert!(s.poll_client().is_ok());
    assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
    assert_eq!(s.recv_body_client(stream, &mut client_buf), Ok(11994));

    s.advance().ok();

    // With capacity available again, the stream is reported as writable.
    assert_eq!(s.pipe.server.stream_writable_next(), Some(0));
}
6032
#[test]
/// Exercises empty DATA writes in both directions: without fin they are
/// rejected with `Done`, with fin they succeed and the peer sees a Data
/// event with nothing to read, followed by Finished.
fn zero_length_data() {
    let mut s = Session::new().unwrap();
    s.handshake().unwrap();

    let (stream, req) = s.send_request(false).unwrap();

    // Client side: an empty write only goes through when it carries fin.
    let no_fin = s.client.send_body(&mut s.pipe.client, 0, b"", false);
    assert_eq!(no_fin, Err(Error::Done));
    let with_fin = s.client.send_body(&mut s.pipe.client, 0, b"", true);
    assert_eq!(with_fin, Ok(0));

    s.advance().ok();

    let mut scratch = vec![0; 100];

    let request_headers = Event::Headers {
        list: req,
        more_frames: true,
    };
    assert_eq!(s.poll_server(), Ok((stream, request_headers)));

    // The empty DATA frame is signalled, but carries no bytes.
    assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
    assert_eq!(s.recv_body_server(stream, &mut scratch), Err(Error::Done));

    assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
    assert_eq!(s.poll_server(), Err(Error::Done));

    let resp = s.send_response(stream, false).unwrap();

    // Server side: same expectations as for the client.
    let no_fin = s.server.send_body(&mut s.pipe.server, 0, b"", false);
    assert_eq!(no_fin, Err(Error::Done));
    let with_fin = s.server.send_body(&mut s.pipe.server, 0, b"", true);
    assert_eq!(with_fin, Ok(0));

    s.advance().ok();

    let response_headers = Event::Headers {
        list: resp,
        more_frames: true,
    };
    assert_eq!(s.poll_client(), Ok((stream, response_headers)));

    assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
    assert_eq!(s.recv_body_client(stream, &mut scratch), Err(Error::Done));

    assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
    assert_eq!(s.poll_client(), Err(Error::Done));
}
6087
#[test]
/// Checks that an empty DATA write with fin reports `Done` while the
/// stream is blocked, and succeeds once the stream is writable again.
fn zero_length_data_blocked() {
    let mut quic_config =
        crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    quic_config.set_initial_max_data(69);
    quic_config.set_initial_max_stream_data_bidi_local(150);
    quic_config.set_initial_max_stream_data_bidi_remote(150);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(100);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);

    let http3_config = Config::new().unwrap();
    let mut s =
        Session::with_configs(&mut quic_config, &http3_config).unwrap();
    s.handshake().unwrap();

    let request = vec![
        Header::new(b":method", b"GET"),
        Header::new(b":scheme", b"https"),
        Header::new(b":authority", b"quic.tech"),
        Header::new(b":path", b"/test"),
    ];

    let request_stream =
        s.client.send_request(&mut s.pipe.client, &request, false);
    assert_eq!(request_stream, Ok(0));

    // The empty body cannot be written yet.
    let blocked = s.client.send_body(&mut s.pipe.client, 0, b"", true);
    assert_eq!(blocked, Err(Error::Done));

    // Drain the client's writable stream queue.
    for expected_id in [2, 6, 10] {
        assert_eq!(s.pipe.client.stream_writable_next(), Some(expected_id));
    }
    assert_eq!(s.pipe.client.stream_writable_next(), None);

    s.advance().ok();

    // After the server hands flow control credit back, the request stream
    // is writable and the empty body goes through.
    assert_eq!(s.pipe.client.stream_writable_next(), Some(0));
    let unblocked = s.client.send_body(&mut s.pipe.client, 0, b"", true);
    assert_eq!(unblocked, Ok(0));
}
6142
#[test]
/// Checks that, with GREASE disabled and a default HTTP/3 config, both
/// endpoints still report raw peer settings after the handshake.
fn empty_settings() {
    let mut quic_config =
        crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    quic_config.set_initial_max_data(1500);
    quic_config.set_initial_max_stream_data_bidi_local(150);
    quic_config.set_initial_max_stream_data_bidi_remote(150);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(5);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);
    quic_config.set_ack_delay_exponent(8);
    quic_config.grease(false);

    let http3_config = Config::new().unwrap();
    let mut s =
        Session::with_configs(&mut quic_config, &http3_config).unwrap();

    s.handshake().unwrap();

    // Each side must have recorded the SETTINGS it received from its peer.
    let client_view = s.client.peer_settings_raw();
    assert!(client_view.is_some());
    let server_view = s.server.peer_settings_raw();
    assert!(server_view.is_some());
}
6172
#[test]
/// Checks that, with QUIC DATAGRAM enabled, each endpoint reports HTTP/3
/// DATAGRAM as enabled by the peer only after polling the peer's SETTINGS.
fn dgram_setting() {
    let mut quic_config =
        crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    quic_config.set_initial_max_data(70);
    quic_config.set_initial_max_stream_data_bidi_local(150);
    quic_config.set_initial_max_stream_data_bidi_remote(150);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(100);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.enable_dgram(true, 1000, 1000);
    quic_config.verify_peer(false);

    let http3_config = Config::new().unwrap();
    let mut s =
        Session::with_configs(&mut quic_config, &http3_config).unwrap();
    assert_eq!(s.pipe.handshake(), Ok(()));

    // Client -> server direction.
    s.client.send_settings(&mut s.pipe.client).unwrap();
    assert_eq!(s.pipe.advance(), Ok(()));

    // Until poll has processed the SETTINGS, HTTP/3 DATAGRAM stays off.
    assert!(!s.server.dgram_enabled_by_peer(&s.pipe.server));

    // A clean poll returns Done and flips DATAGRAM on.
    let server_poll = s.server.poll(&mut s.pipe.server);
    assert_eq!(server_poll, Err(Error::Done));
    assert!(s.server.dgram_enabled_by_peer(&s.pipe.server));

    // Server -> client direction, same expectations.
    s.server.send_settings(&mut s.pipe.server).unwrap();
    assert_eq!(s.pipe.advance(), Ok(()));
    assert!(!s.client.dgram_enabled_by_peer(&s.pipe.client));

    let client_poll = s.client.poll(&mut s.pipe.client);
    assert_eq!(client_poll, Err(Error::Done));
    assert!(s.client.dgram_enabled_by_peer(&s.pipe.client));
}
6216
#[test]
/// Checks that a SETTINGS frame advertising H3_DATAGRAM is rejected with
/// `SettingsError` when the QUIC config did not enable DATAGRAM support.
fn dgram_setting_no_tp() {
    let mut quic_config =
        crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    quic_config.set_initial_max_data(70);
    quic_config.set_initial_max_stream_data_bidi_local(150);
    quic_config.set_initial_max_stream_data_bidi_remote(150);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(100);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);

    let http3_config = Config::new().unwrap();
    let mut s =
        Session::with_configs(&mut quic_config, &http3_config).unwrap();
    assert_eq!(s.pipe.handshake(), Ok(()));

    // Open the client's control stream by hand so a custom SETTINGS frame
    // can be written to it.
    let control_stream = s
        .client
        .open_uni_stream(
            &mut s.pipe.client,
            stream::HTTP3_CONTROL_STREAM_TYPE_ID,
        )
        .unwrap();
    s.client.control_stream_id = Some(control_stream);

    // SETTINGS carrying nothing but H3_DATAGRAM = 1.
    let dgram_settings = frame::Frame::Settings {
        max_field_section_size: None,
        qpack_max_table_capacity: None,
        qpack_blocked_streams: None,
        connect_protocol_enabled: None,
        h3_datagram: Some(1),
        grease: None,
        additional_settings: Default::default(),
        raw: Default::default(),
    };

    s.send_frame_client(dgram_settings, control_stream, false)
        .unwrap();

    assert_eq!(s.pipe.advance(), Ok(()));

    let server_poll = s.server.poll(&mut s.pipe.server);
    assert_eq!(server_poll, Err(Error::SettingsError));
}
6269
#[test]
/// Checks that both endpoints fail with `SettingsError` when the peer's
/// SETTINGS frame contains the reserved setting identifier 0x2.
fn settings_h2_prohibited() {
    let mut quic_config =
        crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    quic_config.set_initial_max_data(70);
    quic_config.set_initial_max_stream_data_bidi_local(150);
    quic_config.set_initial_max_stream_data_bidi_remote(150);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(100);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);

    let http3_config = Config::new().unwrap();
    let mut s =
        Session::with_configs(&mut quic_config, &http3_config).unwrap();
    assert_eq!(s.pipe.handshake(), Ok(()));

    // Open a control stream on each side by hand.
    let client_control = s
        .client
        .open_uni_stream(
            &mut s.pipe.client,
            stream::HTTP3_CONTROL_STREAM_TYPE_ID,
        )
        .unwrap();
    s.client.control_stream_id = Some(client_control);

    let server_control = s
        .server
        .open_uni_stream(
            &mut s.pipe.server,
            stream::HTTP3_CONTROL_STREAM_TYPE_ID,
        )
        .unwrap();
    s.server.control_stream_id = Some(server_control);

    // Hand-encoded SETTINGS frame: type, payload length, then a single
    // (identifier, value) pair using the reserved identifier 0x2.
    let payload_len = 2u64;
    let raw_settings = [
        frame::SETTINGS_FRAME_TYPE_ID as u8,
        payload_len as u8,
        0x2,
        1,
    ];

    s.send_arbitrary_stream_data_client(&raw_settings, client_control, false)
        .unwrap();

    s.send_arbitrary_stream_data_server(&raw_settings, server_control, false)
        .unwrap();

    assert_eq!(s.pipe.advance(), Ok(()));

    let server_poll = s.server.poll(&mut s.pipe.server);
    assert_eq!(server_poll, Err(Error::SettingsError));

    let client_poll = s.client.poll(&mut s.pipe.client);
    assert_eq!(client_poll, Err(Error::SettingsError));
}
6340
#[test]
/// Checks that `set_additional_settings` returns `SettingsError` for each
/// of the setting identifiers listed below.
fn set_prohibited_additional_settings() {
    let mut h3_config = Config::new().unwrap();

    // Checked in this order, one identifier per call.
    let prohibited_ids = [
        frame::SETTINGS_QPACK_MAX_TABLE_CAPACITY,
        frame::SETTINGS_MAX_FIELD_SECTION_SIZE,
        frame::SETTINGS_QPACK_BLOCKED_STREAMS,
        frame::SETTINGS_ENABLE_CONNECT_PROTOCOL,
        frame::SETTINGS_H3_DATAGRAM,
    ];

    for id in prohibited_ids {
        let outcome = h3_config.set_additional_settings(vec![(id, 43)]);
        assert_eq!(outcome, Err(Error::SettingsError));
    }
}
6379
#[test]
/// Checks that additional settings configured on the HTTP/3 config show
/// up in the raw peer settings seen by both endpoints.
fn set_additional_settings() {
    let mut quic_config =
        crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    quic_config.set_initial_max_data(70);
    quic_config.set_initial_max_stream_data_bidi_local(150);
    quic_config.set_initial_max_stream_data_bidi_remote(150);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(100);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);
    quic_config.grease(false);

    let mut http3_config = Config::new().unwrap();
    http3_config
        .set_additional_settings(vec![(42, 43), (44, 45)])
        .unwrap();

    let mut s =
        Session::with_configs(&mut quic_config, &http3_config).unwrap();
    assert_eq!(s.pipe.handshake(), Ok(()));

    assert_eq!(s.pipe.advance(), Ok(()));

    // Client SETTINGS reach the server.
    s.client.send_settings(&mut s.pipe.client).unwrap();
    assert_eq!(s.pipe.advance(), Ok(()));
    let server_poll = s.server.poll(&mut s.pipe.server);
    assert_eq!(server_poll, Err(Error::Done));

    // Server SETTINGS reach the client.
    s.server.send_settings(&mut s.pipe.server).unwrap();
    assert_eq!(s.pipe.advance(), Ok(()));
    let client_poll = s.client.poll(&mut s.pipe.client);
    assert_eq!(client_poll, Err(Error::Done));

    // Both sides must report exactly the configured pairs.
    let expected = [(42, 43), (44, 45)];
    assert_eq!(s.server.peer_settings_raw(), Some(&expected[..]));
    assert_eq!(s.client.peer_settings_raw(), Some(&expected[..]));
}
6427
#[test]
/// Sends one DATAGRAM in each direction and checks it is received.
fn single_dgram() {
    let mut dgram_buf = [0; 65535];
    let mut s = Session::new().unwrap();
    s.handshake().unwrap();

    // Default payload of 10 bytes on flow ID 0.
    let expected = (11, 0, 1);

    // Client to server.
    s.send_dgram_client(0).unwrap();

    assert_eq!(s.poll_server(), Err(Error::Done));
    assert_eq!(s.recv_dgram_server(&mut dgram_buf), Ok(expected));

    // Server to client.
    s.send_dgram_server(0).unwrap();

    assert_eq!(s.poll_client(), Err(Error::Done));
    assert_eq!(s.recv_dgram_client(&mut dgram_buf), Ok(expected));
}
6447
#[test]
/// Sends three DATAGRAMs in each direction and checks that exactly three
/// are received on the other side.
fn multiple_dgram() {
    let mut dgram_buf = [0; 65535];
    let mut s = Session::new().unwrap();
    s.handshake().unwrap();

    // Default payload of 10 bytes on flow ID 0.
    let expected = (11, 0, 1);

    // Client to server.
    for _ in 0..3 {
        s.send_dgram_client(0).unwrap();
    }

    assert_eq!(s.poll_server(), Err(Error::Done));
    for _ in 0..3 {
        assert_eq!(s.recv_dgram_server(&mut dgram_buf), Ok(expected));
    }
    assert_eq!(s.recv_dgram_server(&mut dgram_buf), Err(Error::Done));

    // Server to client.
    for _ in 0..3 {
        s.send_dgram_server(0).unwrap();
    }

    assert_eq!(s.poll_client(), Err(Error::Done));
    for _ in 0..3 {
        assert_eq!(s.recv_dgram_client(&mut dgram_buf), Ok(expected));
    }
    assert_eq!(s.recv_dgram_client(&mut dgram_buf), Err(Error::Done));
}
6478
#[test]
/// Sends five DATAGRAMs from the client and checks that only three of
/// them can be read by the server.
fn multiple_dgram_overflow() {
    let mut dgram_buf = [0; 65535];
    let mut s = Session::new().unwrap();
    s.handshake().unwrap();

    // Default payload of 10 bytes on flow ID 0.
    let expected = (11, 0, 1);

    // Five DATAGRAMs are submitted.
    for _ in 0..5 {
        s.send_dgram_client(0).unwrap();
    }

    // Only three of them are readable on the server; after that the
    // receive call reports Done.
    assert_eq!(s.poll_server(), Err(Error::Done));
    for _ in 0..3 {
        assert_eq!(s.recv_dgram_server(&mut dgram_buf), Ok(expected));
    }
    assert_eq!(s.recv_dgram_server(&mut dgram_buf), Err(Error::Done));
}
6503
#[test]
/// Sends a request with body plus one DATAGRAM, never reads the DATAGRAM,
/// and checks that polling the server yields Headers, Data, then Done.
fn poll_datagram_cycling_no_read() {
    let mut quic_config =
        crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    quic_config.set_initial_max_data(1500);
    quic_config.set_initial_max_stream_data_bidi_local(150);
    quic_config.set_initial_max_stream_data_bidi_remote(150);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(100);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);
    quic_config.enable_dgram(true, 100, 100);

    let http3_config = Config::new().unwrap();
    let mut s =
        Session::with_configs(&mut quic_config, &http3_config).unwrap();
    s.handshake().unwrap();

    // Client: request, then body, then a DATAGRAM.
    let (stream, req) = s.send_request(false).unwrap();

    s.send_body_client(stream, true).unwrap();

    let request_headers = Event::Headers {
        list: req,
        more_frames: true,
    };

    s.send_dgram_client(0).unwrap();

    // Server: only the stream events are surfaced by poll.
    assert_eq!(s.poll_server(), Ok((stream, request_headers)));
    assert_eq!(s.poll_server(), Ok((stream, Event::Data)));

    assert_eq!(s.poll_server(), Err(Error::Done));
}
6545
#[test]
/// Sends a request/response with body plus one DATAGRAM in each direction
/// and checks the interleaving of poll events, DATAGRAM reads and body
/// reads on the receiving side.
fn poll_datagram_single_read() {
    let mut dgram_buf = [0; 65535];

    let mut quic_config =
        crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    quic_config.set_initial_max_data(1500);
    quic_config.set_initial_max_stream_data_bidi_local(150);
    quic_config.set_initial_max_stream_data_bidi_remote(150);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(100);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);
    quic_config.enable_dgram(true, 100, 100);

    let http3_config = Config::new().unwrap();
    let mut s =
        Session::with_configs(&mut quic_config, &http3_config).unwrap();
    s.handshake().unwrap();

    // Default payload of 10 bytes on flow ID 0.
    let expected_dgram = (11, 0, 1);

    // Client: request, then body, then a DATAGRAM.
    let (stream, req) = s.send_request(false).unwrap();

    let req_body = s.send_body_client(stream, true).unwrap();

    let mut req_body_buf = vec![0; req_body.len()];

    let request_headers = Event::Headers {
        list: req,
        more_frames: true,
    };

    s.send_dgram_client(0).unwrap();

    // Server: stream events first, then nothing more from poll.
    assert_eq!(s.poll_server(), Ok((stream, request_headers)));
    assert_eq!(s.poll_server(), Ok((stream, Event::Data)));

    assert_eq!(s.poll_server(), Err(Error::Done));

    assert_eq!(s.recv_dgram_server(&mut dgram_buf), Ok(expected_dgram));

    assert_eq!(s.poll_server(), Err(Error::Done));

    // Reading the body to the end releases the Finished event.
    assert_eq!(
        s.recv_body_server(stream, &mut req_body_buf),
        Ok(req_body.len())
    );
    assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
    assert_eq!(s.poll_server(), Err(Error::Done));

    // Server: response, then body, then a DATAGRAM.
    let resp = s.send_response(stream, false).unwrap();

    let resp_body = s.send_body_server(stream, true).unwrap();

    let mut resp_body_buf = vec![0; resp_body.len()];

    let response_headers = Event::Headers {
        list: resp,
        more_frames: true,
    };

    s.send_dgram_server(0).unwrap();

    // Client: mirror of the server-side expectations above.
    assert_eq!(s.poll_client(), Ok((stream, response_headers)));
    assert_eq!(s.poll_client(), Ok((stream, Event::Data)));

    assert_eq!(s.poll_client(), Err(Error::Done));

    assert_eq!(s.recv_dgram_client(&mut dgram_buf), Ok(expected_dgram));

    assert_eq!(s.poll_client(), Err(Error::Done));

    assert_eq!(
        s.recv_body_client(stream, &mut resp_body_buf),
        Ok(resp_body.len())
    );

    assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
    assert_eq!(s.poll_client(), Err(Error::Done));
}
6630
#[test]
/// Sends a request/response with body plus ten DATAGRAMs (five on flow 0,
/// five on flow 2) in each direction, and checks that DATAGRAM reads, body
/// reads and poll events interleave as expected on the receiving side.
fn poll_datagram_multi_read() {
    let mut dgram_buf = [0; 65535];

    let mut quic_config =
        crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    quic_config.set_initial_max_data(1500);
    quic_config.set_initial_max_stream_data_bidi_local(150);
    quic_config.set_initial_max_stream_data_bidi_remote(150);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(100);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);
    quic_config.enable_dgram(true, 100, 100);

    let http3_config = Config::new().unwrap();
    let mut s =
        Session::with_configs(&mut quic_config, &http3_config).unwrap();
    s.handshake().unwrap();

    // 10-byte payloads on flow IDs 0 and 2.
    let on_flow_0 = (11, 0, 1);
    let on_flow_2 = (11, 2, 1);

    // Client: request and body, followed by the DATAGRAMs.
    let (stream, req) = s.send_request(false).unwrap();

    let req_body = s.send_body_client(stream, true).unwrap();

    let mut req_body_buf = vec![0; req_body.len()];

    let request_headers = Event::Headers {
        list: req,
        more_frames: true,
    };

    for _ in 0..5 {
        s.send_dgram_client(0).unwrap();
    }
    for _ in 0..5 {
        s.send_dgram_client(2).unwrap();
    }

    assert_eq!(s.poll_server(), Ok((stream, request_headers)));
    assert_eq!(s.poll_server(), Ok((stream, Event::Data)));

    assert_eq!(s.poll_server(), Err(Error::Done));

    // Second cycle: start reading DATAGRAMs; poll stays quiet.
    for _ in 0..3 {
        assert_eq!(s.recv_dgram_server(&mut dgram_buf), Ok(on_flow_0));
        assert_eq!(s.poll_server(), Err(Error::Done));
    }

    assert_eq!(
        s.recv_body_server(stream, &mut req_body_buf),
        Ok(req_body.len())
    );
    assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));

    assert_eq!(s.poll_server(), Err(Error::Done));

    // Third cycle: the remaining flow 0 DATAGRAMs, then all of flow 2.
    for _ in 0..2 {
        assert_eq!(s.recv_dgram_server(&mut dgram_buf), Ok(on_flow_0));
        assert_eq!(s.poll_server(), Err(Error::Done));
    }
    for _ in 0..5 {
        assert_eq!(s.recv_dgram_server(&mut dgram_buf), Ok(on_flow_2));
        assert_eq!(s.poll_server(), Err(Error::Done));
    }

    // Server: response and body, followed by the DATAGRAMs.
    let resp = s.send_response(stream, false).unwrap();

    let resp_body = s.send_body_server(stream, true).unwrap();

    let mut resp_body_buf = vec![0; resp_body.len()];

    let response_headers = Event::Headers {
        list: resp,
        more_frames: true,
    };

    for _ in 0..5 {
        s.send_dgram_server(0).unwrap();
    }
    for _ in 0..5 {
        s.send_dgram_server(2).unwrap();
    }

    assert_eq!(s.poll_client(), Ok((stream, response_headers)));
    assert_eq!(s.poll_client(), Ok((stream, Event::Data)));

    assert_eq!(s.poll_client(), Err(Error::Done));

    // Second cycle: start reading DATAGRAMs; poll stays quiet.
    for _ in 0..3 {
        assert_eq!(s.recv_dgram_client(&mut dgram_buf), Ok(on_flow_0));
        assert_eq!(s.poll_client(), Err(Error::Done));
    }

    assert_eq!(
        s.recv_body_client(stream, &mut resp_body_buf),
        Ok(resp_body.len())
    );
    assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));

    assert_eq!(s.poll_client(), Err(Error::Done));

    // Third cycle: the remaining flow 0 DATAGRAMs, then all of flow 2.
    for _ in 0..2 {
        assert_eq!(s.recv_dgram_client(&mut dgram_buf), Ok(on_flow_0));
        assert_eq!(s.poll_client(), Err(Error::Done));
    }
    for _ in 0..5 {
        assert_eq!(s.recv_dgram_client(&mut dgram_buf), Ok(on_flow_2));
        assert_eq!(s.poll_client(), Err(Error::Done));
    }
}
6775
#[test]
/// Checks that opening a GREASE stream on the client produces no events
/// (in particular no Finished event) on either endpoint.
fn finished_is_for_requests() {
    let mut s = Session::new().unwrap();
    s.handshake().unwrap();

    // Nothing is pending right after the handshake.
    assert_eq!(s.poll_client(), Err(Error::Done));
    assert_eq!(s.poll_server(), Err(Error::Done));

    let opened = s.client.open_grease_stream(&mut s.pipe.client);
    assert_eq!(opened, Ok(()));
    assert_eq!(s.pipe.advance(), Ok(()));

    // The GREASE stream must not surface any event.
    assert_eq!(s.poll_client(), Err(Error::Done));
    assert_eq!(s.poll_server(), Err(Error::Done));
}
6792
#[test]
/// Checks that the Finished event for a request stream is reported a
/// single time, even if the application keeps reading and polling.
fn finished_once() {
    let mut s = Session::new().unwrap();
    s.handshake().unwrap();

    let (stream, req) = s.send_request(false).unwrap();
    let sent_body = s.send_body_client(stream, true).unwrap();

    let mut body_buf = vec![0; sent_body.len()];

    let request_headers = Event::Headers {
        list: req,
        more_frames: true,
    };

    assert_eq!(s.poll_server(), Ok((stream, request_headers)));
    assert_eq!(s.poll_server(), Ok((stream, Event::Data)));

    // Draining the body yields Finished exactly once.
    assert_eq!(
        s.recv_body_server(stream, &mut body_buf),
        Ok(sent_body.len())
    );
    assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));

    // Further reads and polls report Done.
    assert_eq!(s.recv_body_server(stream, &mut body_buf), Err(Error::Done));
    assert_eq!(s.poll_server(), Err(Error::Done));
}
6818
6819    #[test]
6820    /// Tests that the Data event is properly re-armed.
        ///
        /// Throughout the test a Data event is reported once per batch of newly
        /// arrived body data: polling again without new data reports
        /// `Error::Done`, and a fresh Data event shows up only after more data
        /// has been sent. The scenario is exercised with partially delivered
        /// DATA frames, with a second request interleaved, with trailing
        /// HEADERS, and with several DATA frames buffered at once.
6821    fn data_event_rearm() {
6822        let bytes = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
6823
6824        let mut s = Session::new().unwrap();
6825        s.handshake().unwrap();
6826
6827        let (r1_id, r1_hdrs) = s.send_request(false).unwrap();
6828
6829        let mut recv_buf = vec![0; bytes.len()];
6830
6831        let r1_ev_headers = Event::Headers {
6832            list: r1_hdrs,
6833            more_frames: true,
6834        };
6835
6836        // Manually send an incomplete DATA frame (i.e. the frame size is longer
6837        // than the actual data sent).
6838        {
6839            let mut d = [42; 10];
6840            let mut b = octets::OctetsMut::with_slice(&mut d);
6841
                // Frame header: DATA type followed by a length of `bytes.len()`.
6842            b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
6843            b.put_varint(bytes.len() as u64).unwrap();
6844            let off = b.off();
6845            s.pipe.client.stream_send(r1_id, &d[..off], false).unwrap();
6846
                // Only the first half of the announced payload is sent for now.
6847            assert_eq!(
6848                s.pipe.client.stream_send(r1_id, &bytes[..5], false),
6849                Ok(5)
6850            );
6851
6852            s.advance().ok();
6853        }
6854
            // Headers, then one Data event; polling again without reading reports
            // Done.
6855        assert_eq!(s.poll_server(), Ok((r1_id, r1_ev_headers)));
6856        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
6857        assert_eq!(s.poll_server(), Err(Error::Done));
6858
6859        // Read the available body data.
6860        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(5));
6861
6862        // Send the remaining DATA payload.
6863        assert_eq!(s.pipe.client.stream_send(r1_id, &bytes[5..], false), Ok(5));
6864        s.advance().ok();
6865
            // The new payload bytes re-arm the Data event.
6866        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
6867        assert_eq!(s.poll_server(), Err(Error::Done));
6868
6869        // Read the rest of the body data.
6870        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(5));
6871        assert_eq!(s.poll_server(), Err(Error::Done));
6872
6873        // Send more data.
6874        let r1_body = s.send_body_client(r1_id, false).unwrap();
6875
6876        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
6877        assert_eq!(s.poll_server(), Err(Error::Done));
6878
6879        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(r1_body.len()));
6880
6881        // Send a new request to ensure cross-stream events don't break rearming.
6882        let (r2_id, r2_hdrs) = s.send_request(false).unwrap();
6883        let r2_ev_headers = Event::Headers {
6884            list: r2_hdrs,
6885            more_frames: true,
6886        };
6887        let r2_body = s.send_body_client(r2_id, false).unwrap();
6888
6889        s.advance().ok();
6890
6891        assert_eq!(s.poll_server(), Ok((r2_id, r2_ev_headers)));
6892        assert_eq!(s.poll_server(), Ok((r2_id, Event::Data)));
6893        assert_eq!(s.recv_body_server(r2_id, &mut recv_buf), Ok(r2_body.len()));
6894        assert_eq!(s.poll_server(), Err(Error::Done));
6895
6896        // Send more data on request 1, then trailing HEADERS.
6897        let r1_body = s.send_body_client(r1_id, false).unwrap();
6898
6899        let trailers = vec![Header::new(b"hello", b"world")];
6900
            // NOTE(review): the final `true` is presumably the fin flag -- confirm
            // against `send_headers`. The server is expected to report these
            // trailers with `more_frames: false`, followed by Finished.
6901        s.client
6902            .send_headers(&mut s.pipe.client, r1_id, &trailers, true)
6903            .unwrap();
6904
6905        let r1_ev_trailers = Event::Headers {
6906            list: trailers.clone(),
6907            more_frames: false,
6908        };
6909
6910        s.advance().ok();
6911
6912        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
6913        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(r1_body.len()));
6914
6915        assert_eq!(s.poll_server(), Ok((r1_id, r1_ev_trailers)));
6916        assert_eq!(s.poll_server(), Ok((r1_id, Event::Finished)));
6917        assert_eq!(s.poll_server(), Err(Error::Done));
6918
6919        // Send more data on request 2, then trailing HEADERS.
6920        let r2_body = s.send_body_client(r2_id, false).unwrap();
6921
            // Here the final argument is `false`; the trailers are expected with
            // `more_frames: true` and no Finished event follows.
6922        s.client
6923            .send_headers(&mut s.pipe.client, r2_id, &trailers, false)
6924            .unwrap();
6925
6926        let r2_ev_trailers = Event::Headers {
6927            list: trailers,
6928            more_frames: true,
6929        };
6930
6931        s.advance().ok();
6932
6933        assert_eq!(s.poll_server(), Ok((r2_id, Event::Data)));
6934        assert_eq!(s.recv_body_server(r2_id, &mut recv_buf), Ok(r2_body.len()));
6935        assert_eq!(s.poll_server(), Ok((r2_id, r2_ev_trailers)));
6936        assert_eq!(s.poll_server(), Err(Error::Done));
6937
            // Open a third request.
6938        let (r3_id, r3_hdrs) = s.send_request(false).unwrap();
6939
6940        let r3_ev_headers = Event::Headers {
6941            list: r3_hdrs,
6942            more_frames: true,
6943        };
6944
6945        // Manually send an incomplete DATA frame (i.e. only the header is sent).
6946        {
6947            let mut d = [42; 10];
6948            let mut b = octets::OctetsMut::with_slice(&mut d);
6949
6950            b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
6951            b.put_varint(bytes.len() as u64).unwrap();
6952            let off = b.off();
6953            s.pipe.client.stream_send(r3_id, &d[..off], false).unwrap();
6954
6955            s.advance().ok();
6956        }
6957
            // A Data event is reported even though only the frame header arrived.
6958        assert_eq!(s.poll_server(), Ok((r3_id, r3_ev_headers)));
6959        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
6960        assert_eq!(s.poll_server(), Err(Error::Done));
6961
            // No payload bytes are available yet, so the read reports Done.
6962        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Err(Error::Done));
6963
            // Deliver the first half of the payload: Data is reported again.
6964        assert_eq!(s.pipe.client.stream_send(r3_id, &bytes[..5], false), Ok(5));
6965
6966        s.advance().ok();
6967
6968        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
6969        assert_eq!(s.poll_server(), Err(Error::Done));
6970
6971        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Ok(5));
6972
            // Deliver the second half of the payload: Data is reported once more.
6973        assert_eq!(s.pipe.client.stream_send(r3_id, &bytes[5..], false), Ok(5));
6974        s.advance().ok();
6975
6976        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
6977        assert_eq!(s.poll_server(), Err(Error::Done));
6978
6979        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Ok(5));
6980
6981        // Buffer multiple data frames.
6982        let body = s.send_body_client(r3_id, false).unwrap();
6983        s.send_body_client(r3_id, false).unwrap();
6984        s.send_body_client(r3_id, false).unwrap();
6985
            // The three buffered bodies are announced by a single Data event.
6986        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
6987        assert_eq!(s.poll_server(), Err(Error::Done));
6988
            // Manually send a zero-length DATA frame with the last `stream_send`
            // argument set to `true`.
            // NOTE(review): that flag is presumably fin -- confirm.
6989        {
6990            let mut d = [42; 10];
6991            let mut b = octets::OctetsMut::with_slice(&mut d);
6992
6993            b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
6994            b.put_varint(0).unwrap();
6995            let off = b.off();
6996            s.pipe.client.stream_send(r3_id, &d[..off], true).unwrap();
6997
6998            s.advance().ok();
6999        }
7000
            // A larger buffer is used so that all three buffered bodies can be
            // returned by a single read.
7001        let mut recv_buf = vec![0; bytes.len() * 3];
7002
7003        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Ok(body.len() * 3));
7004    }
7005
7006    #[test]
7007    /// Tests that the Datagram event is properly re-armed.
        ///
        /// The client sends a request with a body plus DATAGRAMs on flow IDs 0 and
        /// 2. On the server side the test polls the request's Headers and Data
        /// events, then alternates `poll_server` (expected to report
        /// `Error::Done`) with `recv_dgram_server` to drain the datagrams, and
        /// finally reads the body and expects the Finished event.
7008    fn dgram_event_rearm() {
7009        let mut buf = [0; 65535];
7010
            // Transport configuration with DATAGRAM support enabled.
7011        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
7012        config
7013            .load_cert_chain_from_pem_file("examples/cert.crt")
7014            .unwrap();
7015        config
7016            .load_priv_key_from_pem_file("examples/cert.key")
7017            .unwrap();
7018        config.set_application_protos(&[b"h3"]).unwrap();
7019        config.set_initial_max_data(1500);
7020        config.set_initial_max_stream_data_bidi_local(150);
7021        config.set_initial_max_stream_data_bidi_remote(150);
7022        config.set_initial_max_stream_data_uni(150);
7023        config.set_initial_max_streams_bidi(100);
7024        config.set_initial_max_streams_uni(5);
7025        config.verify_peer(false);
            // NOTE(review): the two numeric arguments are presumably the receive
            // and send queue lengths -- confirm against `Config::enable_dgram`.
7026        config.enable_dgram(true, 100, 100);
7027
7028        let h3_config = Config::new().unwrap();
7029        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
7030        s.handshake().unwrap();
7031
7032        // 10 bytes on flow ID 0 and 2.
            // NOTE(review): the tuples are presumably (bytes read, flow ID, flow ID
            // length) -- confirm against `recv_dgram_server`.
7033        let flow_0_result = (11, 0, 1);
7034        let flow_2_result = (11, 2, 1);
7035
7036        // Send requests followed by DATAGRAMs on client side.
7037        let (stream, req) = s.send_request(false).unwrap();
7038
7039        let body = s.send_body_client(stream, true).unwrap();
7040
7041        let mut recv_buf = vec![0; body.len()];
7042
7043        let ev_headers = Event::Headers {
7044            list: req,
7045            more_frames: true,
7046        };
7047
            // Two datagrams on flow 0, then two on flow 2.
7048        s.send_dgram_client(0).unwrap();
7049        s.send_dgram_client(0).unwrap();
7050        s.send_dgram_client(2).unwrap();
7051        s.send_dgram_client(2).unwrap();
7052
            // The request's stream events are reported first.
7053        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7054        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
7055
            // Each poll reports Done while the datagrams are drained one at a time,
            // in the order they were sent.
7056        assert_eq!(s.poll_server(), Err(Error::Done));
7057        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
7058
7059        assert_eq!(s.poll_server(), Err(Error::Done));
7060        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
7061
7062        assert_eq!(s.poll_server(), Err(Error::Done));
7063        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
7064
7065        assert_eq!(s.poll_server(), Err(Error::Done));
7066        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
7067
7068        assert_eq!(s.poll_server(), Err(Error::Done));
7069
            // Second round: one more datagram on each flow.
7070        s.send_dgram_client(0).unwrap();
7071        s.send_dgram_client(2).unwrap();
7072
7073        assert_eq!(s.poll_server(), Err(Error::Done));
7074
7075        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
7076        assert_eq!(s.poll_server(), Err(Error::Done));
7077
7078        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
7079        assert_eq!(s.poll_server(), Err(Error::Done));
7080
            // Only once the request body has been read is Finished reported.
7081        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
7082        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
7083    }
7084
7085    #[test]
        /// Tests that a RESET_STREAM frame received by the server produces a Reset
        /// event carrying the frame's error code, and that receiving the same
        /// frame a second time does not produce another event.
7086    fn reset_stream() {
7087        let mut buf = [0; 65535];
7088
7089        let mut s = Session::new().unwrap();
7090        s.handshake().unwrap();
7091
7092        // Client sends request.
7093        let (stream, req) = s.send_request(false).unwrap();
7094
7095        let ev_headers = Event::Headers {
7096            list: req,
7097            more_frames: true,
7098        };
7099
7100        // Server sends response and closes stream.
7101        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7102        assert_eq!(s.poll_server(), Err(Error::Done));
7103
7104        let resp = s.send_response(stream, true).unwrap();
7105
7106        let ev_headers = Event::Headers {
7107            list: resp,
7108            more_frames: false,
7109        };
7110
            // Client sees the response headers followed by Finished.
7111        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
7112        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
7113        assert_eq!(s.poll_client(), Err(Error::Done));
7114
7115        // Client sends RESET_STREAM, closing stream.
            // NOTE(review): `final_size: 68` presumably matches the number of bytes
            // the client already sent on this stream -- confirm.
7116        let frames = [crate::frame::Frame::ResetStream {
7117            stream_id: stream,
7118            error_code: 42,
7119            final_size: 68,
7120        }];
7121
            // NOTE(review): 39 is presumably the size of the packet that was
            // written -- confirm against `send_pkt_to_server`.
7122        let pkt_type = crate::packet::Type::Short;
7123        assert_eq!(
7124            s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
7125            Ok(39)
7126        );
7127
7128        // Server issues Reset event for the stream.
7129        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(42))));
7130        assert_eq!(s.poll_server(), Err(Error::Done));
7131
7132        // Sending RESET_STREAM again shouldn't trigger another Reset event.
7133        assert_eq!(
7134            s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
7135            Ok(39)
7136        );
7137
7138        assert_eq!(s.poll_server(), Err(Error::Done));
7139    }
7140
7141    #[test]
        /// Tests which events the server reports when the client shuts down the
        /// write side of a request stream.
        ///
        /// * Request sent without fin, then shut down: the server reports only a
        ///   Reset event.
        /// * Request sent with fin, then shut down: the server reports Headers
        ///   followed by Finished, and no Reset event.
7142    fn reset_finished_at_server() {
7143        let mut s = Session::new().unwrap();
7144        s.handshake().unwrap();
7145
7146        // Client sends HEADERS and doesn't fin
7147        let (stream, _req) = s.send_request(false).unwrap();
7148
7149        // ..then Client sends RESET_STREAM
            // The stream ID returned by `send_request` is used instead of a
            // hard-coded literal, so the shutdown targets the same stream the
            // assertions below check.
7150        assert_eq!(
7151            s.pipe
                    .client
                    .stream_shutdown(stream, crate::Shutdown::Write, 0),
7152            Ok(())
7153        );
7154
7155        assert_eq!(s.pipe.advance(), Ok(()));
7156
7157        // Server receives just a reset
7158        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(0))));
7159        assert_eq!(s.poll_server(), Err(Error::Done));
7160
7161        // Client sends HEADERS and fin
7162        let (stream, req) = s.send_request(true).unwrap();
7163
7164        // ..then Client sends RESET_STREAM
            // NOTE(review): unlike the first case, the pipe is not explicitly
            // advanced after this shutdown -- confirm that this is intended.
7165        assert_eq!(
7166            s.pipe
                    .client
                    .stream_shutdown(stream, crate::Shutdown::Write, 0),
7167            Ok(())
7168        );
7169
7170        let ev_headers = Event::Headers {
7171            list: req,
7172            more_frames: false,
7173        };
7174
7175        // Server receives headers and fin.
7176        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7177        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
7178        assert_eq!(s.poll_server(), Err(Error::Done));
7179    }
7180
7181    #[test]
        /// Tests that when the client resets a request stream whose body data the
        /// server has been notified of but has not read, the server reports a
        /// Reset event and is left with no readable streams.
7182    fn reset_finished_at_server_with_data_pending() {
7183        let mut s = Session::new().unwrap();
7184        s.handshake().unwrap();
7185
7186        // Client sends HEADERS and doesn't fin.
7187        let (stream, req) = s.send_request(false).unwrap();
7188
            // Client also sends a body; the server never reads it in this test.
7189        assert!(s.send_body_client(stream, false).is_ok());
7190
7191        assert_eq!(s.pipe.advance(), Ok(()));
7192
7193        let ev_headers = Event::Headers {
7194            list: req,
7195            more_frames: true,
7196        };
7197
7198        // Server receives headers and data...
7199        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7200        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
7201
7202        // ..then Client sends RESET_STREAM.
7203        assert_eq!(
7204            s.pipe
7205                .client
7206                .stream_shutdown(stream, crate::Shutdown::Write, 0),
7207            Ok(())
7208        );
7209
7210        assert_eq!(s.pipe.advance(), Ok(()));
7211
7212        // Server receives the reset and there are no more readable streams.
7213        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(0))));
7214        assert_eq!(s.poll_server(), Err(Error::Done));
7215        assert_eq!(s.pipe.server.readable().len(), 0);
7216    }
7217
7218    #[test]
        /// Tests which events the client reports for a response stream in two
        /// cases:
        ///
        /// * Response sent without fin, then the server shuts down its write
        ///   side: the client reports only a Reset event.
        /// * Response sent with fin, followed by a RESET_STREAM frame: the client
        ///   reports Headers followed by Finished, and no Reset event.
7219    fn reset_finished_at_client() {
7220        let mut buf = [0; 65535];
7221        let mut s = Session::new().unwrap();
7222        s.handshake().unwrap();
7223
7224        // Client sends HEADERS and doesn't fin
7225        let (stream, req) = s.send_request(false).unwrap();
7226
7227        let ev_headers = Event::Headers {
7228            list: req,
7229            more_frames: true,
7230        };
7231
7232        // Server receives headers.
7233        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7234        assert_eq!(s.poll_server(), Err(Error::Done));
7235
7236        // Server sends response and doesn't fin
7237        s.send_response(stream, false).unwrap();
7238
7239        assert_eq!(s.pipe.advance(), Ok(()));
7240
7241        // .. then Server sends RESET_STREAM
7242        assert_eq!(
7243            s.pipe
7244                .server
7245                .stream_shutdown(stream, crate::Shutdown::Write, 0),
7246            Ok(())
7247        );
7248
7249        assert_eq!(s.pipe.advance(), Ok(()));
7250
7251        // Client receives Reset only
            // NOTE(review): the second assertion polls the server, not the client.
            // Given the comment above, `poll_client` may have been intended --
            // confirm before changing.
7252        assert_eq!(s.poll_client(), Ok((stream, Event::Reset(0))));
7253        assert_eq!(s.poll_server(), Err(Error::Done));
7254
7255        // Client sends headers and fin.
7256        let (stream, req) = s.send_request(true).unwrap();
7257
7258        let ev_headers = Event::Headers {
7259            list: req,
7260            more_frames: false,
7261        };
7262
7263        // Server receives headers and fin.
7264        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7265        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
7266        assert_eq!(s.poll_server(), Err(Error::Done));
7267
7268        // Server sends response and fin
7269        let resp = s.send_response(stream, true).unwrap();
7270
7271        assert_eq!(s.pipe.advance(), Ok(()));
7272
7273        // ..then Server sends RESET_STREAM
            // NOTE(review): the frame is injected with `send_pkt_to_server`, i.e.
            // towards the server, although the comment above describes the server
            // as the sender -- confirm which direction is intended.
7274        let frames = [crate::frame::Frame::ResetStream {
7275            stream_id: stream,
7276            error_code: 42,
7277            final_size: 68,
7278        }];
7279
7280        let pkt_type = crate::packet::Type::Short;
7281        assert_eq!(
7282            s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
7283            Ok(39)
7284        );
7285
7286        assert_eq!(s.pipe.advance(), Ok(()));
7287
7288        let ev_headers = Event::Headers {
7289            list: resp,
7290            more_frames: false,
7291        };
7292
7293        // Client receives headers and fin.
7294        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
7295        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
7296        assert_eq!(s.poll_client(), Err(Error::Done));
7297    }
7298}
7299
7300#[cfg(feature = "ffi")]
    // Compiled only when the `ffi` feature is enabled.
7301mod ffi;
    // `frame` is declared twice under mutually exclusive `cfg`s: public (but
    // hidden from the docs) with the `internal` feature, private otherwise.
7302#[cfg(feature = "internal")]
7303#[doc(hidden)]
7304pub mod frame;
7305#[cfg(not(feature = "internal"))]
7306mod frame;
    // `qpack` is always public but hidden from the generated documentation.
7307#[doc(hidden)]
7308pub mod qpack;
7309mod stream;