quiche/h3/
mod.rs

1// Copyright (C) 2019, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! HTTP/3 wire protocol and QPACK implementation.
28//!
29//! This module provides a high level API for sending and receiving HTTP/3
30//! requests and responses on top of the QUIC transport protocol.
31//!
32//! ## Connection setup
33//!
34//! HTTP/3 connections require a QUIC transport-layer connection, see
35//! [Connection setup] for a full description of the setup process.
36//!
37//! To use HTTP/3, the QUIC connection must be configured with a suitable
38//! Application Layer Protocol Negotiation (ALPN) Protocol ID:
39//!
40//! ```
41//! let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
42//! config.set_application_protos(quiche::h3::APPLICATION_PROTOCOL)?;
43//! # Ok::<(), quiche::Error>(())
44//! ```
45//!
46//! The QUIC handshake is driven by [sending] and [receiving] QUIC packets.
47//!
48//! Once the handshake has completed, the first step in establishing an HTTP/3
49//! connection is creating its configuration object:
50//!
51//! ```
52//! let h3_config = quiche::h3::Config::new()?;
53//! # Ok::<(), quiche::h3::Error>(())
54//! ```
55//!
56//! HTTP/3 client and server connections are both created using the
57//! [`with_transport()`] function, the role is inferred from the type of QUIC
58//! connection:
59//!
60//! ```no_run
61//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
62//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
63//! # let peer = "127.0.0.1:1234".parse().unwrap();
64//! # let local = "127.0.0.1:4321".parse().unwrap();
65//! # let mut conn = quiche::accept(&scid, None, local, peer, &mut config).unwrap();
66//! # let h3_config = quiche::h3::Config::new()?;
67//! let h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
68//! # Ok::<(), quiche::h3::Error>(())
69//! ```
70//!
71//! ## Sending a request
72//!
73//! An HTTP/3 client can send a request by using the connection's
74//! [`send_request()`] method to queue request headers; [sending] QUIC packets
75//! causes the requests to get sent to the peer:
76//!
77//! ```no_run
78//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
79//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
80//! # let peer = "127.0.0.1:1234".parse().unwrap();
81//! # let local = "127.0.0.1:4321".parse().unwrap();
82//! # let mut conn = quiche::connect(None, &scid, local, peer, &mut config).unwrap();
83//! # let h3_config = quiche::h3::Config::new()?;
84//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
85//! let req = vec![
86//!     quiche::h3::Header::new(b":method", b"GET"),
87//!     quiche::h3::Header::new(b":scheme", b"https"),
88//!     quiche::h3::Header::new(b":authority", b"quic.tech"),
89//!     quiche::h3::Header::new(b":path", b"/"),
90//!     quiche::h3::Header::new(b"user-agent", b"quiche"),
91//! ];
92//!
93//! h3_conn.send_request(&mut conn, &req, true)?;
94//! # Ok::<(), quiche::h3::Error>(())
95//! ```
96//!
97//! An HTTP/3 client can send a request with additional body data by using
98//! the connection's [`send_body()`] method:
99//!
100//! ```no_run
101//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
102//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
103//! # let peer = "127.0.0.1:1234".parse().unwrap();
104//! # let local = "127.0.0.1:4321".parse().unwrap();
105//! # let mut conn = quiche::connect(None, &scid, local, peer, &mut config).unwrap();
106//! # let h3_config = quiche::h3::Config::new()?;
107//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
108//! let req = vec![
109//!     quiche::h3::Header::new(b":method", b"GET"),
110//!     quiche::h3::Header::new(b":scheme", b"https"),
111//!     quiche::h3::Header::new(b":authority", b"quic.tech"),
112//!     quiche::h3::Header::new(b":path", b"/"),
113//!     quiche::h3::Header::new(b"user-agent", b"quiche"),
114//! ];
115//!
116//! let stream_id = h3_conn.send_request(&mut conn, &req, false)?;
117//! h3_conn.send_body(&mut conn, stream_id, b"Hello World!", true)?;
118//! # Ok::<(), quiche::h3::Error>(())
119//! ```
120//!
121//! ## Handling requests and responses
122//!
123//! After [receiving] QUIC packets, HTTP/3 data is processed using the
124//! connection's [`poll()`] method. On success, this returns an [`Event`] object
125//! and an ID corresponding to the stream where the `Event` originated.
126//!
127//! An HTTP/3 server uses [`poll()`] to read requests and responds to them using
128//! [`send_response()`] and [`send_body()`]:
129//!
130//! ```no_run
131//! use quiche::h3::NameValue;
132//!
133//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
134//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
135//! # let peer = "127.0.0.1:1234".parse().unwrap();
136//! # let local = "127.0.0.1:1234".parse().unwrap();
137//! # let mut conn = quiche::accept(&scid, None, local, peer, &mut config).unwrap();
138//! # let h3_config = quiche::h3::Config::new()?;
139//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
140//! loop {
141//!     match h3_conn.poll(&mut conn) {
142//!         Ok((stream_id, quiche::h3::Event::Headers{list, more_frames})) => {
143//!             let mut headers = list.into_iter();
144//!
145//!             // Look for the request's method.
146//!             let method = headers.find(|h| h.name() == b":method").unwrap();
147//!
148//!             // Look for the request's path.
149//!             let path = headers.find(|h| h.name() == b":path").unwrap();
150//!
151//!             if method.value() == b"GET" && path.value() == b"/" {
152//!                 let resp = vec![
153//!                     quiche::h3::Header::new(b":status", 200.to_string().as_bytes()),
154//!                     quiche::h3::Header::new(b"server", b"quiche"),
155//!                 ];
156//!
157//!                 h3_conn.send_response(&mut conn, stream_id, &resp, false)?;
158//!                 h3_conn.send_body(&mut conn, stream_id, b"Hello World!", true)?;
159//!             }
160//!         },
161//!
162//!         Ok((stream_id, quiche::h3::Event::Data)) => {
163//!             // Request body data, handle it.
164//!             # return Ok(());
165//!         },
166//!
167//!         Ok((stream_id, quiche::h3::Event::Finished)) => {
168//!             // Peer terminated stream, handle it.
169//!         },
170//!
171//!         Ok((stream_id, quiche::h3::Event::Reset(err))) => {
172//!             // Peer reset the stream, handle it.
173//!         },
174//!
175//!         Ok((_flow_id, quiche::h3::Event::PriorityUpdate)) => (),
176//!
177//!         Ok((goaway_id, quiche::h3::Event::GoAway)) => {
178//!              // Peer signalled it is going away, handle it.
179//!         },
180//!
181//!         Err(quiche::h3::Error::Done) => {
182//!             // Done reading.
183//!             break;
184//!         },
185//!
186//!         Err(e) => {
187//!             // An error occurred, handle it.
188//!             break;
189//!         },
190//!     }
191//! }
192//! # Ok::<(), quiche::h3::Error>(())
193//! ```
194//!
195//! An HTTP/3 client uses [`poll()`] to read responses:
196//!
197//! ```no_run
198//! use quiche::h3::NameValue;
199//!
200//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
201//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
202//! # let peer = "127.0.0.1:1234".parse().unwrap();
203//! # let local = "127.0.0.1:1234".parse().unwrap();
204//! # let mut conn = quiche::connect(None, &scid, local, peer, &mut config).unwrap();
205//! # let h3_config = quiche::h3::Config::new()?;
206//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
207//! loop {
208//!     match h3_conn.poll(&mut conn) {
209//!         Ok((stream_id, quiche::h3::Event::Headers{list, more_frames})) => {
210//!             let status = list.iter().find(|h| h.name() == b":status").unwrap();
211//!             println!("Received {} response on stream {}",
212//!                      std::str::from_utf8(status.value()).unwrap(),
213//!                      stream_id);
214//!         },
215//!
216//!         Ok((stream_id, quiche::h3::Event::Data)) => {
217//!             let mut body = vec![0; 4096];
218//!
219//!             // Consume all body data received on the stream.
220//!             while let Ok(read) =
221//!                 h3_conn.recv_body(&mut conn, stream_id, &mut body)
222//!             {
223//!                 println!("Received {} bytes of payload on stream {}",
224//!                          read, stream_id);
225//!             }
226//!         },
227//!
228//!         Ok((stream_id, quiche::h3::Event::Finished)) => {
229//!             // Peer terminated stream, handle it.
230//!         },
231//!
232//!         Ok((stream_id, quiche::h3::Event::Reset(err))) => {
233//!             // Peer reset the stream, handle it.
234//!         },
235//!
236//!         Ok((_prioritized_element_id, quiche::h3::Event::PriorityUpdate)) => (),
237//!
238//!         Ok((goaway_id, quiche::h3::Event::GoAway)) => {
239//!              // Peer signalled it is going away, handle it.
240//!         },
241//!
242//!         Err(quiche::h3::Error::Done) => {
243//!             // Done reading.
244//!             break;
245//!         },
246//!
247//!         Err(e) => {
248//!             // An error occurred, handle it.
249//!             break;
250//!         },
251//!     }
252//! }
253//! # Ok::<(), quiche::h3::Error>(())
254//! ```
255//!
256//! ## Detecting end of request or response
257//!
258//! A single HTTP/3 request or response may consist of several HEADERS and DATA
259//! frames; it is finished when the QUIC stream is closed. Calling [`poll()`]
260//! repeatedly will generate an [`Event`] for each of these. The application may
261//! use these event to do additional HTTP semantic validation.
262//!
263//! ## HTTP/3 protocol errors
264//!
265//! Quiche is responsible for managing the HTTP/3 connection, ensuring it is in
266//! a correct state and validating all messages received by a peer. This mainly
267//! takes place in the [`poll()`] method. If an HTTP/3 error occurs, quiche will
268//! close the connection and send an appropriate CONNECTION_CLOSE frame to the
269//! peer. An [`Error`] is returned to the application so that it can perform any
270//! required tidy up such as closing sockets.
271//!
272//! [`application_proto()`]: ../struct.Connection.html#method.application_proto
273//! [`stream_finished()`]: ../struct.Connection.html#method.stream_finished
274//! [Connection setup]: ../index.html#connection-setup
275//! [sending]: ../index.html#generating-outgoing-packets
276//! [receiving]: ../index.html#handling-incoming-packets
277//! [`with_transport()`]: struct.Connection.html#method.with_transport
278//! [`poll()`]: struct.Connection.html#method.poll
279//! [`Event`]: enum.Event.html
280//! [`Error`]: enum.Error.html
281//! [`send_request()`]: struct.Connection.html#method.send_response
282//! [`send_response()`]: struct.Connection.html#method.send_response
283//! [`send_body()`]: struct.Connection.html#method.send_body
284
285use std::collections::HashSet;
286use std::collections::VecDeque;
287
288#[cfg(feature = "sfv")]
289use std::convert::TryFrom;
290use std::fmt;
291use std::fmt::Write;
292
293#[cfg(feature = "qlog")]
294use qlog::events::h3::H3FrameCreated;
295#[cfg(feature = "qlog")]
296use qlog::events::h3::H3FrameParsed;
297#[cfg(feature = "qlog")]
298use qlog::events::h3::H3Owner;
299#[cfg(feature = "qlog")]
300use qlog::events::h3::H3PriorityTargetStreamType;
301#[cfg(feature = "qlog")]
302use qlog::events::h3::H3StreamType;
303#[cfg(feature = "qlog")]
304use qlog::events::h3::H3StreamTypeSet;
305#[cfg(feature = "qlog")]
306use qlog::events::h3::Http3EventType;
307#[cfg(feature = "qlog")]
308use qlog::events::h3::Http3Frame;
309#[cfg(feature = "qlog")]
310use qlog::events::EventData;
311#[cfg(feature = "qlog")]
312use qlog::events::EventImportance;
313#[cfg(feature = "qlog")]
314use qlog::events::EventType;
315
316use crate::range_buf::BufFactory;
317use crate::BufSplit;
318
319/// List of ALPN tokens of supported HTTP/3 versions.
320///
321/// This can be passed directly to the [`Config::set_application_protos()`]
322/// method when implementing HTTP/3 applications.
323///
324/// [`Config::set_application_protos()`]:
325/// ../struct.Config.html#method.set_application_protos
326pub const APPLICATION_PROTOCOL: &[&[u8]] = &[b"h3"];
327
328// The offset used when converting HTTP/3 urgency to quiche urgency.
329const PRIORITY_URGENCY_OFFSET: u8 = 124;
330
331// Parameter values as specified in [Extensible Priorities].
332//
333// [Extensible Priorities]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
334const PRIORITY_URGENCY_LOWER_BOUND: u8 = 0;
335const PRIORITY_URGENCY_UPPER_BOUND: u8 = 7;
336const PRIORITY_URGENCY_DEFAULT: u8 = 3;
337const PRIORITY_INCREMENTAL_DEFAULT: bool = false;
338
339#[cfg(feature = "qlog")]
340const QLOG_FRAME_CREATED: EventType =
341    EventType::Http3EventType(Http3EventType::FrameCreated);
342#[cfg(feature = "qlog")]
343const QLOG_FRAME_PARSED: EventType =
344    EventType::Http3EventType(Http3EventType::FrameParsed);
345#[cfg(feature = "qlog")]
346const QLOG_STREAM_TYPE_SET: EventType =
347    EventType::Http3EventType(Http3EventType::StreamTypeSet);
348
349/// A specialized [`Result`] type for quiche HTTP/3 operations.
350///
351/// This type is used throughout quiche's HTTP/3 public API for any operation
352/// that can produce an error.
353///
354/// [`Result`]: https://doc.rust-lang.org/std/result/enum.Result.html
355pub type Result<T> = std::result::Result<T, Error>;
356
357/// An HTTP/3 error.
358#[derive(Clone, Copy, Debug, PartialEq, Eq)]
359pub enum Error {
360    /// There is no error or no work to do
361    Done,
362
363    /// The provided buffer is too short.
364    BufferTooShort,
365
366    /// Internal error in the HTTP/3 stack.
367    InternalError,
368
369    /// Endpoint detected that the peer is exhibiting behavior that causes.
370    /// excessive load.
371    ExcessiveLoad,
372
373    /// Stream ID or Push ID greater that current maximum was
374    /// used incorrectly, such as exceeding a limit, reducing a limit,
375    /// or being reused.
376    IdError,
377
378    /// The endpoint detected that its peer created a stream that it will not
379    /// accept.
380    StreamCreationError,
381
382    /// A required critical stream was closed.
383    ClosedCriticalStream,
384
385    /// No SETTINGS frame at beginning of control stream.
386    MissingSettings,
387
388    /// A frame was received which is not permitted in the current state.
389    FrameUnexpected,
390
391    /// Frame violated layout or size rules.
392    FrameError,
393
394    /// QPACK Header block decompression failure.
395    QpackDecompressionFailed,
396
397    /// Error originated from the transport layer.
398    TransportError(crate::Error),
399
400    /// The underlying QUIC stream (or connection) doesn't have enough capacity
401    /// for the operation to complete. The application should retry later on.
402    StreamBlocked,
403
404    /// Error in the payload of a SETTINGS frame.
405    SettingsError,
406
407    /// Server rejected request.
408    RequestRejected,
409
410    /// Request or its response cancelled.
411    RequestCancelled,
412
413    /// Client's request stream terminated without containing a full-formed
414    /// request.
415    RequestIncomplete,
416
417    /// An HTTP message was malformed and cannot be processed.
418    MessageError,
419
420    /// The TCP connection established in response to a CONNECT request was
421    /// reset or abnormally closed.
422    ConnectError,
423
424    /// The requested operation cannot be served over HTTP/3. Peer should retry
425    /// over HTTP/1.1.
426    VersionFallback,
427}
428
429/// HTTP/3 error codes sent on the wire.
430///
431/// As defined in [RFC9114](https://www.rfc-editor.org/rfc/rfc9114.html#http-error-codes).
432#[derive(Copy, Clone, Debug, Eq, PartialEq)]
433pub enum WireErrorCode {
434    /// No error. This is used when the connection or stream needs to be closed,
435    /// but there is no error to signal.
436    NoError              = 0x100,
437    /// Peer violated protocol requirements in a way that does not match a more
438    /// specific error code or endpoint declines to use the more specific
439    /// error code.
440    GeneralProtocolError = 0x101,
441    /// An internal error has occurred in the HTTP stack.
442    InternalError        = 0x102,
443    /// The endpoint detected that its peer created a stream that it will not
444    /// accept.
445    StreamCreationError  = 0x103,
446    /// A stream required by the HTTP/3 connection was closed or reset.
447    ClosedCriticalStream = 0x104,
448    /// A frame was received that was not permitted in the current state or on
449    /// the current stream.
450    FrameUnexpected      = 0x105,
451    /// A frame that fails to satisfy layout requirements or with an invalid
452    /// size was received.
453    FrameError           = 0x106,
454    /// The endpoint detected that its peer is exhibiting a behavior that might
455    /// be generating excessive load.
456    ExcessiveLoad        = 0x107,
457    /// A stream ID or push ID was used incorrectly, such as exceeding a limit,
458    /// reducing a limit, or being reused.
459    IdError              = 0x108,
460    /// An endpoint detected an error in the payload of a SETTINGS frame.
461    SettingsError        = 0x109,
462    /// No SETTINGS frame was received at the beginning of the control stream.
463    MissingSettings      = 0x10a,
464    /// A server rejected a request without performing any application
465    /// processing.
466    RequestRejected      = 0x10b,
467    /// The request or its response (including pushed response) is cancelled.
468    RequestCancelled     = 0x10c,
469    /// The client's stream terminated without containing a fully formed
470    /// request.
471    RequestIncomplete    = 0x10d,
472    /// An HTTP message was malformed and cannot be processed.
473    MessageError         = 0x10e,
474    /// The TCP connection established in response to a CONNECT request was
475    /// reset or abnormally closed.
476    ConnectError         = 0x10f,
477    /// The requested operation cannot be served over HTTP/3. The peer should
478    /// retry over HTTP/1.1.
479    VersionFallback      = 0x110,
480}
481
482impl Error {
483    fn to_wire(self) -> u64 {
484        match self {
485            Error::Done => WireErrorCode::NoError as u64,
486            Error::InternalError => WireErrorCode::InternalError as u64,
487            Error::StreamCreationError =>
488                WireErrorCode::StreamCreationError as u64,
489            Error::ClosedCriticalStream =>
490                WireErrorCode::ClosedCriticalStream as u64,
491            Error::FrameUnexpected => WireErrorCode::FrameUnexpected as u64,
492            Error::FrameError => WireErrorCode::FrameError as u64,
493            Error::ExcessiveLoad => WireErrorCode::ExcessiveLoad as u64,
494            Error::IdError => WireErrorCode::IdError as u64,
495            Error::MissingSettings => WireErrorCode::MissingSettings as u64,
496            Error::QpackDecompressionFailed => 0x200,
497            Error::BufferTooShort => 0x999,
498            Error::TransportError { .. } | Error::StreamBlocked => 0xFF,
499            Error::SettingsError => WireErrorCode::SettingsError as u64,
500            Error::RequestRejected => WireErrorCode::RequestRejected as u64,
501            Error::RequestCancelled => WireErrorCode::RequestCancelled as u64,
502            Error::RequestIncomplete => WireErrorCode::RequestIncomplete as u64,
503            Error::MessageError => WireErrorCode::MessageError as u64,
504            Error::ConnectError => WireErrorCode::ConnectError as u64,
505            Error::VersionFallback => WireErrorCode::VersionFallback as u64,
506        }
507    }
508
509    #[cfg(feature = "ffi")]
510    fn to_c(self) -> libc::ssize_t {
511        match self {
512            Error::Done => -1,
513            Error::BufferTooShort => -2,
514            Error::InternalError => -3,
515            Error::ExcessiveLoad => -4,
516            Error::IdError => -5,
517            Error::StreamCreationError => -6,
518            Error::ClosedCriticalStream => -7,
519            Error::MissingSettings => -8,
520            Error::FrameUnexpected => -9,
521            Error::FrameError => -10,
522            Error::QpackDecompressionFailed => -11,
523            // -12 was previously used for TransportError, skip it
524            Error::StreamBlocked => -13,
525            Error::SettingsError => -14,
526            Error::RequestRejected => -15,
527            Error::RequestCancelled => -16,
528            Error::RequestIncomplete => -17,
529            Error::MessageError => -18,
530            Error::ConnectError => -19,
531            Error::VersionFallback => -20,
532
533            Error::TransportError(quic_error) => quic_error.to_c() - 1000,
534        }
535    }
536}
537
538impl std::fmt::Display for Error {
539    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
540        write!(f, "{self:?}")
541    }
542}
543
544impl std::error::Error for Error {
545    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
546        None
547    }
548}
549
550impl std::convert::From<super::Error> for Error {
551    fn from(err: super::Error) -> Self {
552        match err {
553            super::Error::Done => Error::Done,
554
555            _ => Error::TransportError(err),
556        }
557    }
558}
559
560impl std::convert::From<octets::BufferTooShortError> for Error {
561    fn from(_err: octets::BufferTooShortError) -> Self {
562        Error::BufferTooShort
563    }
564}
565
566/// An HTTP/3 configuration.
567pub struct Config {
568    max_field_section_size: Option<u64>,
569    qpack_max_table_capacity: Option<u64>,
570    qpack_blocked_streams: Option<u64>,
571    connect_protocol_enabled: Option<u64>,
572    /// additional settings are settings that are not part of the H3
573    /// settings explicitly handled above
574    additional_settings: Option<Vec<(u64, u64)>>,
575}
576
577impl Config {
578    /// Creates a new configuration object with default settings.
579    pub const fn new() -> Result<Config> {
580        Ok(Config {
581            max_field_section_size: None,
582            qpack_max_table_capacity: None,
583            qpack_blocked_streams: None,
584            connect_protocol_enabled: None,
585            additional_settings: None,
586        })
587    }
588
589    /// Sets the `SETTINGS_MAX_FIELD_SECTION_SIZE` setting.
590    ///
591    /// By default no limit is enforced. When a request whose headers exceed
592    /// the limit set by the application is received, the call to the [`poll()`]
593    /// method will return the [`Error::ExcessiveLoad`] error, and the
594    /// connection will be closed.
595    ///
596    /// [`poll()`]: struct.Connection.html#method.poll
597    /// [`Error::ExcessiveLoad`]: enum.Error.html#variant.ExcessiveLoad
598    pub fn set_max_field_section_size(&mut self, v: u64) {
599        self.max_field_section_size = Some(v);
600    }
601
602    /// Sets the `SETTINGS_QPACK_MAX_TABLE_CAPACITY` setting.
603    ///
604    /// The default value is `0`.
605    pub fn set_qpack_max_table_capacity(&mut self, v: u64) {
606        self.qpack_max_table_capacity = Some(v);
607    }
608
609    /// Sets the `SETTINGS_QPACK_BLOCKED_STREAMS` setting.
610    ///
611    /// The default value is `0`.
612    pub fn set_qpack_blocked_streams(&mut self, v: u64) {
613        self.qpack_blocked_streams = Some(v);
614    }
615
616    /// Sets or omits the `SETTINGS_ENABLE_CONNECT_PROTOCOL` setting.
617    ///
618    /// The default value is `false`.
619    pub fn enable_extended_connect(&mut self, enabled: bool) {
620        if enabled {
621            self.connect_protocol_enabled = Some(1);
622        } else {
623            self.connect_protocol_enabled = None;
624        }
625    }
626
627    /// Sets additional HTTP/3 settings.
628    ///
629    /// The default value is no additional settings.
630    /// The `additional_settings` parameter must not the following
631    /// settings as they are already handled by this library:
632    ///
633    /// - SETTINGS_QPACK_MAX_TABLE_CAPACITY
634    /// - SETTINGS_MAX_FIELD_SECTION_SIZE
635    /// - SETTINGS_QPACK_BLOCKED_STREAMS
636    /// - SETTINGS_ENABLE_CONNECT_PROTOCOL
637    /// - SETTINGS_H3_DATAGRAM
638    ///
639    /// If such a setting is present in the `additional_settings`,
640    /// the method will return the [`Error::SettingsError`] error.
641    ///
642    /// If a setting identifier is present twice in `additional_settings`,
643    /// the method will return the [`Error::SettingsError`] error.
644    ///
645    /// [`Error::SettingsError`]: enum.Error.html#variant.SettingsError
646    pub fn set_additional_settings(
647        &mut self, additional_settings: Vec<(u64, u64)>,
648    ) -> Result<()> {
649        let explicit_quiche_settings = HashSet::from([
650            frame::SETTINGS_QPACK_MAX_TABLE_CAPACITY,
651            frame::SETTINGS_MAX_FIELD_SECTION_SIZE,
652            frame::SETTINGS_QPACK_BLOCKED_STREAMS,
653            frame::SETTINGS_ENABLE_CONNECT_PROTOCOL,
654            frame::SETTINGS_H3_DATAGRAM,
655            frame::SETTINGS_H3_DATAGRAM_00,
656        ]);
657
658        let dedup_settings: HashSet<u64> =
659            additional_settings.iter().map(|(key, _)| *key).collect();
660
661        if dedup_settings.len() != additional_settings.len() ||
662            !explicit_quiche_settings.is_disjoint(&dedup_settings)
663        {
664            return Err(Error::SettingsError);
665        }
666        self.additional_settings = Some(additional_settings);
667        Ok(())
668    }
669}
670
671/// A trait for types with associated string name and value.
672pub trait NameValue {
673    /// Returns the object's name.
674    fn name(&self) -> &[u8];
675
676    /// Returns the object's value.
677    fn value(&self) -> &[u8];
678}
679
680impl<N, V> NameValue for (N, V)
681where
682    N: AsRef<[u8]>,
683    V: AsRef<[u8]>,
684{
685    fn name(&self) -> &[u8] {
686        self.0.as_ref()
687    }
688
689    fn value(&self) -> &[u8] {
690        self.1.as_ref()
691    }
692}
693
694/// An owned name-value pair representing a raw HTTP header.
695#[derive(Clone, PartialEq, Eq)]
696pub struct Header(Vec<u8>, Vec<u8>);
697
698fn try_print_as_readable(hdr: &[u8], f: &mut fmt::Formatter) -> fmt::Result {
699    match std::str::from_utf8(hdr) {
700        Ok(s) => f.write_str(&s.escape_default().to_string()),
701        Err(_) => write!(f, "{hdr:?}"),
702    }
703}
704
705impl fmt::Debug for Header {
706    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
707        f.write_char('"')?;
708        try_print_as_readable(&self.0, f)?;
709        f.write_str(": ")?;
710        try_print_as_readable(&self.1, f)?;
711        f.write_char('"')
712    }
713}
714
715impl Header {
716    /// Creates a new header.
717    ///
718    /// Both `name` and `value` will be cloned.
719    pub fn new(name: &[u8], value: &[u8]) -> Self {
720        Self(name.to_vec(), value.to_vec())
721    }
722}
723
724impl NameValue for Header {
725    fn name(&self) -> &[u8] {
726        &self.0
727    }
728
729    fn value(&self) -> &[u8] {
730        &self.1
731    }
732}
733
734/// A non-owned name-value pair representing a raw HTTP header.
735#[derive(Clone, Debug, PartialEq, Eq)]
736pub struct HeaderRef<'a>(&'a [u8], &'a [u8]);
737
738impl<'a> HeaderRef<'a> {
739    /// Creates a new header.
740    pub const fn new(name: &'a [u8], value: &'a [u8]) -> Self {
741        Self(name, value)
742    }
743}
744
745impl NameValue for HeaderRef<'_> {
746    fn name(&self) -> &[u8] {
747        self.0
748    }
749
750    fn value(&self) -> &[u8] {
751        self.1
752    }
753}
754
755/// An HTTP/3 connection event.
756#[derive(Clone, Debug, PartialEq, Eq)]
757pub enum Event {
758    /// Request/response headers were received.
759    Headers {
760        /// The list of received header fields. The application should validate
761        /// pseudo-headers and headers.
762        list: Vec<Header>,
763
764        /// Whether more frames will follow the headers on the stream.
765        more_frames: bool,
766    },
767
768    /// Data was received.
769    ///
770    /// This indicates that the application can use the [`recv_body()`] method
771    /// to retrieve the data from the stream.
772    ///
773    /// Note that [`recv_body()`] will need to be called repeatedly until the
774    /// [`Done`] value is returned, as the event will not be re-armed until all
775    /// buffered data is read.
776    ///
777    /// [`recv_body()`]: struct.Connection.html#method.recv_body
778    /// [`Done`]: enum.Error.html#variant.Done
779    Data,
780
781    /// Stream was closed,
782    Finished,
783
784    /// Stream was reset.
785    ///
786    /// The associated data represents the error code sent by the peer.
787    Reset(u64),
788
789    /// PRIORITY_UPDATE was received.
790    ///
791    /// This indicates that the application can use the
792    /// [`take_last_priority_update()`] method to take the last received
793    /// PRIORITY_UPDATE for a specified stream.
794    ///
795    /// This event is triggered once per stream until the last PRIORITY_UPDATE
796    /// is taken. It is recommended that applications defer taking the
797    /// PRIORITY_UPDATE until after [`poll()`] returns [`Done`].
798    ///
799    /// [`take_last_priority_update()`]: struct.Connection.html#method.take_last_priority_update
800    /// [`poll()`]: struct.Connection.html#method.poll
801    /// [`Done`]: enum.Error.html#variant.Done
802    PriorityUpdate,
803
804    /// GOAWAY was received.
805    GoAway,
806}
807
808/// Extensible Priorities parameters.
809///
810/// The `TryFrom` trait supports constructing this object from the serialized
811/// Structured Fields Dictionary field value. I.e, use `TryFrom` to parse the
812/// value of a Priority header field or a PRIORITY_UPDATE frame. Using this
813/// trait requires the `sfv` feature to be enabled.
814#[derive(Clone, Copy, Debug, PartialEq, Eq)]
815#[repr(C)]
816pub struct Priority {
817    urgency: u8,
818    incremental: bool,
819}
820
821impl Default for Priority {
822    fn default() -> Self {
823        Priority {
824            urgency: PRIORITY_URGENCY_DEFAULT,
825            incremental: PRIORITY_INCREMENTAL_DEFAULT,
826        }
827    }
828}
829
830impl Priority {
831    /// Creates a new Priority.
832    pub const fn new(urgency: u8, incremental: bool) -> Self {
833        Priority {
834            urgency,
835            incremental,
836        }
837    }
838}
839
840#[cfg(feature = "sfv")]
841#[cfg_attr(docsrs, doc(cfg(feature = "sfv")))]
842impl TryFrom<&[u8]> for Priority {
843    type Error = crate::h3::Error;
844
845    /// Try to parse an Extensible Priority field value.
846    ///
847    /// The field value is expected to be a Structured Fields Dictionary; see
848    /// [Extensible Priorities].
849    ///
850    /// If the `u` or `i` fields are contained with correct types, a constructed
851    /// Priority object is returned. Note that urgency values outside of valid
852    /// range (0 through 7) are clamped to 7.
853    ///
854    /// If the `u` or `i` fields are contained with the wrong types,
855    /// Error::Done is returned.
856    ///
857    /// Omitted parameters will yield default values.
858    ///
859    /// [Extensible Priorities]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
860    fn try_from(value: &[u8]) -> std::result::Result<Self, Self::Error> {
861        let dict = match sfv::Parser::parse_dictionary(value) {
862            Ok(v) => v,
863
864            Err(_) => return Err(Error::Done),
865        };
866
867        let urgency = match dict.get("u") {
868            // If there is a u parameter, try to read it as an Item of type
869            // Integer. If the value out of the spec's allowed range
870            // (0 through 7), that's an error so set it to the upper
871            // bound (lowest priority) to avoid interference with
872            // other streams.
873            Some(sfv::ListEntry::Item(item)) => match item.bare_item.as_int() {
874                Some(v) => {
875                    if !(PRIORITY_URGENCY_LOWER_BOUND as i64..=
876                        PRIORITY_URGENCY_UPPER_BOUND as i64)
877                        .contains(&v)
878                    {
879                        PRIORITY_URGENCY_UPPER_BOUND
880                    } else {
881                        v as u8
882                    }
883                },
884
885                None => return Err(Error::Done),
886            },
887
888            Some(sfv::ListEntry::InnerList(_)) => return Err(Error::Done),
889
890            // Omitted so use default value.
891            None => PRIORITY_URGENCY_DEFAULT,
892        };
893
894        let incremental = match dict.get("i") {
895            Some(sfv::ListEntry::Item(item)) =>
896                item.bare_item.as_bool().ok_or(Error::Done)?,
897
898            // Omitted so use default value.
899            _ => false,
900        };
901
902        Ok(Priority::new(urgency, incremental))
903    }
904}
905
906struct ConnectionSettings {
907    pub max_field_section_size: Option<u64>,
908    pub qpack_max_table_capacity: Option<u64>,
909    pub qpack_blocked_streams: Option<u64>,
910    pub connect_protocol_enabled: Option<u64>,
911    pub h3_datagram: Option<u64>,
912    pub additional_settings: Option<Vec<(u64, u64)>>,
913    pub raw: Option<Vec<(u64, u64)>>,
914}
915
916#[derive(Default)]
917struct QpackStreams {
918    pub encoder_stream_id: Option<u64>,
919    pub encoder_stream_bytes: u64,
920    pub decoder_stream_id: Option<u64>,
921    pub decoder_stream_bytes: u64,
922}
923
924/// Statistics about the connection.
925///
926/// A connection's statistics can be collected using the [`stats()`] method.
927///
928/// [`stats()`]: struct.Connection.html#method.stats
929#[derive(Clone, Default)]
930pub struct Stats {
931    /// The number of bytes received on the QPACK encoder stream.
932    pub qpack_encoder_stream_recv_bytes: u64,
933    /// The number of bytes received on the QPACK decoder stream.
934    pub qpack_decoder_stream_recv_bytes: u64,
935}
936
937fn close_conn_critical_stream<F: BufFactory>(
938    conn: &mut super::Connection<F>,
939) -> Result<()> {
940    conn.close(
941        true,
942        Error::ClosedCriticalStream.to_wire(),
943        b"Critical stream closed.",
944    )?;
945
946    Err(Error::ClosedCriticalStream)
947}
948
949fn close_conn_if_critical_stream_finished<F: BufFactory>(
950    conn: &mut super::Connection<F>, stream_id: u64,
951) -> Result<()> {
952    if conn.stream_finished(stream_id) {
953        close_conn_critical_stream(conn)?;
954    }
955
956    Ok(())
957}
958
959/// An HTTP/3 connection.
960pub struct Connection {
961    is_server: bool,
962
963    next_request_stream_id: u64,
964    next_uni_stream_id: u64,
965
966    streams: crate::stream::StreamIdHashMap<stream::Stream>,
967
968    local_settings: ConnectionSettings,
969    peer_settings: ConnectionSettings,
970
971    control_stream_id: Option<u64>,
972    peer_control_stream_id: Option<u64>,
973
974    qpack_encoder: qpack::Encoder,
975    qpack_decoder: qpack::Decoder,
976
977    local_qpack_streams: QpackStreams,
978    peer_qpack_streams: QpackStreams,
979
980    max_push_id: u64,
981
982    finished_streams: VecDeque<u64>,
983
984    frames_greased: bool,
985
986    local_goaway_id: Option<u64>,
987    peer_goaway_id: Option<u64>,
988}
989
990impl Connection {
991    fn new(
992        config: &Config, is_server: bool, enable_dgram: bool,
993    ) -> Result<Connection> {
994        let initial_uni_stream_id = if is_server { 0x3 } else { 0x2 };
995        let h3_datagram = if enable_dgram { Some(1) } else { None };
996
997        Ok(Connection {
998            is_server,
999
1000            next_request_stream_id: 0,
1001
1002            next_uni_stream_id: initial_uni_stream_id,
1003
1004            streams: Default::default(),
1005
1006            local_settings: ConnectionSettings {
1007                max_field_section_size: config.max_field_section_size,
1008                qpack_max_table_capacity: config.qpack_max_table_capacity,
1009                qpack_blocked_streams: config.qpack_blocked_streams,
1010                connect_protocol_enabled: config.connect_protocol_enabled,
1011                h3_datagram,
1012                additional_settings: config.additional_settings.clone(),
1013                raw: Default::default(),
1014            },
1015
1016            peer_settings: ConnectionSettings {
1017                max_field_section_size: None,
1018                qpack_max_table_capacity: None,
1019                qpack_blocked_streams: None,
1020                h3_datagram: None,
1021                connect_protocol_enabled: None,
1022                additional_settings: Default::default(),
1023                raw: Default::default(),
1024            },
1025
1026            control_stream_id: None,
1027            peer_control_stream_id: None,
1028
1029            qpack_encoder: qpack::Encoder::new(),
1030            qpack_decoder: qpack::Decoder::new(),
1031
1032            local_qpack_streams: Default::default(),
1033            peer_qpack_streams: Default::default(),
1034
1035            max_push_id: 0,
1036
1037            finished_streams: VecDeque::new(),
1038
1039            frames_greased: false,
1040
1041            local_goaway_id: None,
1042            peer_goaway_id: None,
1043        })
1044    }
1045
1046    /// Creates a new HTTP/3 connection using the provided QUIC connection.
1047    ///
1048    /// This will also initiate the HTTP/3 handshake with the peer by opening
1049    /// all control streams (including QPACK) and sending the local settings.
1050    ///
1051    /// On success the new connection is returned.
1052    ///
1053    /// The [`StreamLimit`] error is returned when the HTTP/3 control stream
1054    /// cannot be created due to stream limits.
1055    ///
1056    /// The [`InternalError`] error is returned when either the underlying QUIC
1057    /// connection is not in a suitable state, or the HTTP/3 control stream
1058    /// cannot be created due to flow control limits.
1059    ///
1060    /// [`StreamLimit`]: ../enum.Error.html#variant.StreamLimit
1061    /// [`InternalError`]: ../enum.Error.html#variant.InternalError
1062    pub fn with_transport<F: BufFactory>(
1063        conn: &mut super::Connection<F>, config: &Config,
1064    ) -> Result<Connection> {
1065        let is_client = !conn.is_server;
1066        if is_client && !(conn.is_established() || conn.is_in_early_data()) {
1067            trace!("{} QUIC connection must be established or in early data before creating an HTTP/3 connection", conn.trace_id());
1068            return Err(Error::InternalError);
1069        }
1070
1071        let mut http3_conn =
1072            Connection::new(config, conn.is_server, conn.dgram_enabled())?;
1073
1074        match http3_conn.send_settings(conn) {
1075            Ok(_) => (),
1076
1077            Err(e) => {
1078                conn.close(true, e.to_wire(), b"Error opening control stream")?;
1079                return Err(e);
1080            },
1081        };
1082
1083        // Try opening QPACK streams, but ignore errors if it fails since we
1084        // don't need them right now.
1085        http3_conn.open_qpack_encoder_stream(conn).ok();
1086        http3_conn.open_qpack_decoder_stream(conn).ok();
1087
1088        if conn.grease {
1089            // Try opening a GREASE stream, but ignore errors since it's not
1090            // critical.
1091            http3_conn.open_grease_stream(conn).ok();
1092        }
1093
1094        Ok(http3_conn)
1095    }
1096
1097    /// Sends an HTTP/3 request.
1098    ///
1099    /// The request is encoded from the provided list of headers without a
1100    /// body, and sent on a newly allocated stream. To include a body,
1101    /// set `fin` as `false` and subsequently call [`send_body()`] with the
1102    /// same `conn` and the `stream_id` returned from this method.
1103    ///
1104    /// On success the newly allocated stream ID is returned.
1105    ///
1106    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1107    /// doesn't have enough capacity for the operation to complete. When this
1108    /// happens the application should retry the operation once the stream is
1109    /// reported as writable again.
1110    ///
1111    /// [`send_body()`]: struct.Connection.html#method.send_body
1112    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1113    pub fn send_request<T: NameValue, F: BufFactory>(
1114        &mut self, conn: &mut super::Connection<F>, headers: &[T], fin: bool,
1115    ) -> Result<u64> {
1116        // If we received a GOAWAY from the peer, MUST NOT initiate new
1117        // requests.
1118        if self.peer_goaway_id.is_some() {
1119            return Err(Error::FrameUnexpected);
1120        }
1121
1122        let stream_id = self.next_request_stream_id;
1123
1124        self.streams
1125            .insert(stream_id, <stream::Stream>::new(stream_id, true));
1126
1127        // The underlying QUIC stream does not exist yet, so calls to e.g.
1128        // stream_capacity() will fail. By writing a 0-length buffer, we force
1129        // the creation of the QUIC stream state, without actually writing
1130        // anything.
1131        if let Err(e) = conn.stream_send(stream_id, b"", false) {
1132            self.streams.remove(&stream_id);
1133
1134            if e == super::Error::Done {
1135                return Err(Error::StreamBlocked);
1136            }
1137
1138            return Err(e.into());
1139        };
1140
1141        self.send_headers(conn, stream_id, headers, fin)?;
1142
1143        // To avoid skipping stream IDs, we only calculate the next available
1144        // stream ID when a request has been successfully buffered.
1145        self.next_request_stream_id = self
1146            .next_request_stream_id
1147            .checked_add(4)
1148            .ok_or(Error::IdError)?;
1149
1150        Ok(stream_id)
1151    }
1152
1153    /// Sends an HTTP/3 response on the specified stream with default priority.
1154    ///
1155    /// This method sends the provided `headers` as a single initial response
1156    /// without a body.
1157    ///
1158    /// To send a non-final 1xx, then a final 200+ without body:
1159    ///   * send_response() with `fin` set to `false`.
1160    ///   * [`send_additional_headers()`] with fin set to `true` using the same
1161    ///     `stream_id` value.
1162    ///
1163    /// To send a non-final 1xx, then a final 200+ with body:
1164    ///   * send_response() with `fin` set to `false`.
1165    ///   * [`send_additional_headers()`] with fin set to `false` and same
1166    ///     `stream_id` value.
1167    ///   * [`send_body()`] with same `stream_id`.
1168    ///
1169    /// To send a final 200+ with body:
1170    ///   * send_response() with `fin` set to `false`.
1171    ///   * [`send_body()`] with same `stream_id`.
1172    ///
1173    /// Additional headers can only be sent during certain phases of an HTTP/3
1174    /// message exchange, see [Section 4.1 of RFC 9114]. The [`FrameUnexpected`]
1175    /// error is returned if this method, or [`send_response_with_priority()`],
1176    /// are called multiple times with the same `stream_id` value.
1177    ///
1178    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1179    /// doesn't have enough capacity for the operation to complete. When this
1180    /// happens the application should retry the operation once the stream is
1181    /// reported as writable again.
1182    ///
1183    /// [`send_body()`]: struct.Connection.html#method.send_body
1184    /// [`send_additional_headers()`]:
1185    ///     struct.Connection.html#method.send_additional_headers
1186    /// [`send_response_with_priority()`]:
1187    ///     struct.Connection.html#method.send_response_with_priority
1188    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
1189    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1190    pub fn send_response<T: NameValue, F: BufFactory>(
1191        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1192        headers: &[T], fin: bool,
1193    ) -> Result<()> {
1194        let priority = Default::default();
1195
1196        self.send_response_with_priority(
1197            conn, stream_id, headers, &priority, fin,
1198        )?;
1199
1200        Ok(())
1201    }
1202
1203    /// Sends an HTTP/3 response on the specified stream with specified
1204    /// priority.
1205    ///
1206    /// This method sends the provided `headers` as a single initial response
1207    /// without a body.
1208    ///
1209    /// To send a non-final 1xx, then a final 200+ without body:
1210    ///   * send_response_with_priority() with `fin` set to `false`.
1211    ///   * [`send_additional_headers()`] with fin set to `true` using the same
1212    ///     `stream_id` value.
1213    ///
1214    /// To send a non-final 1xx, then a final 200+ with body:
1215    ///   * send_response_with_priority() with `fin` set to `false`.
1216    ///   * [`send_additional_headers()`] with fin set to `false` and same
1217    ///     `stream_id` value.
1218    ///   * [`send_body()`] with same `stream_id`.
1219    ///
1220    /// To send a final 200+ with body:
1221    ///   * send_response_with_priority() with `fin` set to `false`.
1222    ///   * [`send_body()`] with same `stream_id`.
1223    ///
1224    /// The `priority` parameter represents [Extensible Priority]
1225    /// parameters. If the urgency is outside the range 0-7, it will be clamped
1226    /// to 7.
1227    ///
1228    /// Additional headers can only be sent during certain phases of an HTTP/3
1229    /// message exchange, see [Section 4.1 of RFC 9114]. The [`FrameUnexpected`]
1230    /// error is returned if this method, or [`send_response()`],
1231    /// are called multiple times with the same `stream_id` value.
1232    ///
1233    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1234    /// doesn't have enough capacity for the operation to complete. When this
1235    /// happens the application should retry the operation once the stream is
1236    /// reported as writable again.
1237    ///
1238    /// [`send_body()`]: struct.Connection.html#method.send_body
1239    /// [`send_additional_headers()`]:
1240    ///     struct.Connection.html#method.send_additional_headers
1241    /// [`send_response()`]:
1242    ///     struct.Connection.html#method.send_response
1243    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
1244    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1245    /// [Extensible Priority]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
1246    pub fn send_response_with_priority<T: NameValue, F: BufFactory>(
1247        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1248        headers: &[T], priority: &Priority, fin: bool,
1249    ) -> Result<()> {
1250        match self.streams.get(&stream_id) {
1251            Some(s) => {
1252                // Only one initial HEADERS allowed.
1253                if s.local_initialized() {
1254                    return Err(Error::FrameUnexpected);
1255                }
1256
1257                s
1258            },
1259
1260            None => return Err(Error::FrameUnexpected),
1261        };
1262
1263        self.send_headers(conn, stream_id, headers, fin)?;
1264
1265        // Clamp and shift urgency into quiche-priority space
1266        let urgency = priority
1267            .urgency
1268            .clamp(PRIORITY_URGENCY_LOWER_BOUND, PRIORITY_URGENCY_UPPER_BOUND) +
1269            PRIORITY_URGENCY_OFFSET;
1270
1271        conn.stream_priority(stream_id, urgency, priority.incremental)?;
1272
1273        Ok(())
1274    }
1275
1276    /// Sends additional HTTP/3 headers.
1277    ///
1278    /// After the initial request or response headers have been sent, using
1279    /// [`send_request()`] or [`send_response()`] respectively, this method can
1280    /// be used send an additional HEADERS frame. For example, to send a single
1281    /// instance of trailers after a request with a body, or to issue another
1282    /// non-final 1xx after a preceding 1xx, or to issue a final response after
1283    /// a preceding 1xx.
1284    ///
1285    /// Additional headers can only be sent during certain phases of an HTTP/3
1286    /// message exchange, see [Section 4.1 of RFC 9114]. The [`FrameUnexpected`]
1287    /// error is returned when this method is called during the wrong phase,
1288    /// such as before initial headers have been sent, or if trailers have
1289    /// already been sent.
1290    ///
1291    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1292    /// doesn't have enough capacity for the operation to complete. When this
1293    /// happens the application should retry the operation once the stream is
1294    /// reported as writable again.
1295    ///
1296    /// [`send_request()`]: struct.Connection.html#method.send_request
1297    /// [`send_response()`]: struct.Connection.html#method.send_response
1298    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1299    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
1300    /// [Section 4.1 of RFC 9114]:
1301    ///     https://www.rfc-editor.org/rfc/rfc9114.html#section-4.1.
1302    pub fn send_additional_headers<T: NameValue, F: BufFactory>(
1303        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1304        headers: &[T], is_trailer_section: bool, fin: bool,
1305    ) -> Result<()> {
1306        // Clients can only send trailer headers.
1307        if !self.is_server && !is_trailer_section {
1308            return Err(Error::FrameUnexpected);
1309        }
1310
1311        match self.streams.get(&stream_id) {
1312            Some(s) => {
1313                // Initial HEADERS must have been sent.
1314                if !s.local_initialized() {
1315                    return Err(Error::FrameUnexpected);
1316                }
1317
1318                // Only one trailing HEADERS allowed.
1319                if s.trailers_sent() {
1320                    return Err(Error::FrameUnexpected);
1321                }
1322
1323                s
1324            },
1325
1326            None => return Err(Error::FrameUnexpected),
1327        };
1328
1329        self.send_headers(conn, stream_id, headers, fin)?;
1330
1331        if is_trailer_section {
1332            // send_headers() might have tidied the stream away, so we need to
1333            // check again.
1334            if let Some(s) = self.streams.get_mut(&stream_id) {
1335                s.mark_trailers_sent();
1336            }
1337        }
1338
1339        Ok(())
1340    }
1341
1342    /// Sends additional HTTP/3 headers with specified priority.
1343    ///
1344    /// After the initial request or response headers have been sent, using
1345    /// [`send_request()`] or [`send_response()`] respectively, this method can
1346    /// be used send an additional HEADERS frame. For example, to send a single
1347    /// instance of trailers after a request with a body, or to issue another
1348    /// non-final 1xx after a preceding 1xx, or to issue a final response after
1349    /// a preceding 1xx.
1350    ///
1351    /// The `priority` parameter represents [Extensible Priority]
1352    /// parameters. If the urgency is outside the range 0-7, it will be clamped
1353    /// to 7.
1354    ///
1355    /// Additional headers can only be sent during certain phases of an HTTP/3
1356    /// message exchange, see [Section 4.1 of RFC 9114]. The [`FrameUnexpected`]
1357    /// error is returned when this method is called during the wrong phase,
1358    /// such as before initial headers have been sent, or if trailers have
1359    /// already been sent.
1360    ///
1361    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1362    /// doesn't have enough capacity for the operation to complete. When this
1363    /// happens the application should retry the operation once the stream is
1364    /// reported as writable again.
1365    ///
1366    /// [`send_request()`]: struct.Connection.html#method.send_request
1367    /// [`send_response()`]: struct.Connection.html#method.send_response
1368    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1369    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
1370    /// [Section 4.1 of RFC 9114]:
1371    ///     https://www.rfc-editor.org/rfc/rfc9114.html#section-4.1.
1372    /// [Extensible Priority]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
1373    pub fn send_additional_headers_with_priority<T: NameValue, F: BufFactory>(
1374        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1375        headers: &[T], priority: &Priority, is_trailer_section: bool, fin: bool,
1376    ) -> Result<()> {
1377        self.send_additional_headers(
1378            conn,
1379            stream_id,
1380            headers,
1381            is_trailer_section,
1382            fin,
1383        )?;
1384
1385        // Clamp and shift urgency into quiche-priority space
1386        let urgency = priority
1387            .urgency
1388            .clamp(PRIORITY_URGENCY_LOWER_BOUND, PRIORITY_URGENCY_UPPER_BOUND) +
1389            PRIORITY_URGENCY_OFFSET;
1390
1391        conn.stream_priority(stream_id, urgency, priority.incremental)?;
1392
1393        Ok(())
1394    }
1395
1396    fn encode_header_block<T: NameValue>(
1397        &mut self, headers: &[T],
1398    ) -> Result<Vec<u8>> {
1399        let headers_len = headers
1400            .iter()
1401            .fold(0, |acc, h| acc + h.value().len() + h.name().len() + 32);
1402
1403        let mut header_block = vec![0; headers_len];
1404        let len = self
1405            .qpack_encoder
1406            .encode(headers, &mut header_block)
1407            .map_err(|_| Error::InternalError)?;
1408
1409        header_block.truncate(len);
1410
1411        Ok(header_block)
1412    }
1413
1414    fn send_headers<T: NameValue, F: BufFactory>(
1415        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1416        headers: &[T], fin: bool,
1417    ) -> Result<()> {
1418        let mut d = [42; 10];
1419        let mut b = octets::OctetsMut::with_slice(&mut d);
1420
1421        if !self.frames_greased && conn.grease {
1422            self.send_grease_frames(conn, stream_id)?;
1423            self.frames_greased = true;
1424        }
1425
1426        let header_block = self.encode_header_block(headers)?;
1427
1428        let overhead = octets::varint_len(frame::HEADERS_FRAME_TYPE_ID) +
1429            octets::varint_len(header_block.len() as u64);
1430
1431        // Headers need to be sent atomically, so make sure the stream has
1432        // enough capacity.
1433        match conn.stream_writable(stream_id, overhead + header_block.len()) {
1434            Ok(true) => (),
1435
1436            Ok(false) => return Err(Error::StreamBlocked),
1437
1438            Err(e) => {
1439                if conn.stream_finished(stream_id) {
1440                    self.streams.remove(&stream_id);
1441                }
1442
1443                return Err(e.into());
1444            },
1445        };
1446
1447        b.put_varint(frame::HEADERS_FRAME_TYPE_ID)?;
1448        b.put_varint(header_block.len() as u64)?;
1449        let off = b.off();
1450        conn.stream_send(stream_id, &d[..off], false)?;
1451
1452        // Sending header block separately avoids unnecessary copy.
1453        conn.stream_send(stream_id, &header_block, fin)?;
1454
1455        trace!(
1456            "{} tx frm HEADERS stream={} len={} fin={}",
1457            conn.trace_id(),
1458            stream_id,
1459            header_block.len(),
1460            fin
1461        );
1462
1463        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1464            let qlog_headers = headers
1465                .iter()
1466                .map(|h| qlog::events::h3::HttpHeader {
1467                    name: String::from_utf8_lossy(h.name()).into_owned(),
1468                    value: String::from_utf8_lossy(h.value()).into_owned(),
1469                })
1470                .collect();
1471
1472            let frame = Http3Frame::Headers {
1473                headers: qlog_headers,
1474            };
1475            let ev_data = EventData::H3FrameCreated(H3FrameCreated {
1476                stream_id,
1477                length: Some(header_block.len() as u64),
1478                frame,
1479                ..Default::default()
1480            });
1481
1482            q.add_event_data_now(ev_data).ok();
1483        });
1484
1485        if let Some(s) = self.streams.get_mut(&stream_id) {
1486            s.initialize_local();
1487        }
1488
1489        if fin && conn.stream_finished(stream_id) {
1490            self.streams.remove(&stream_id);
1491        }
1492
1493        Ok(())
1494    }
1495
1496    /// Sends an HTTP/3 body chunk on the given stream.
1497    ///
1498    /// On success the number of bytes written is returned, or [`Done`] if no
1499    /// bytes could be written (e.g. because the stream is blocked).
1500    ///
1501    /// Note that the number of written bytes returned can be lower than the
1502    /// length of the input buffer when the underlying QUIC stream doesn't have
1503    /// enough capacity for the operation to complete.
1504    ///
1505    /// When a partial write happens (including when [`Done`] is returned) the
1506    /// application should retry the operation once the stream is reported as
1507    /// writable again.
1508    ///
1509    /// [`Done`]: enum.Error.html#variant.Done
1510    pub fn send_body<F: BufFactory>(
1511        &mut self, conn: &mut super::Connection<F>, stream_id: u64, body: &[u8],
1512        fin: bool,
1513    ) -> Result<usize> {
1514        self.do_send_body(
1515            conn,
1516            stream_id,
1517            body,
1518            fin,
1519            |conn: &mut super::Connection<F>,
1520             header: &[u8],
1521             stream_id: u64,
1522             body: &[u8],
1523             body_len: usize,
1524             fin: bool| {
1525                conn.stream_send(stream_id, header, false)?;
1526                Ok(conn
1527                    .stream_send(stream_id, &body[..body_len], fin)
1528                    .map(|v| (v, v))?)
1529            },
1530        )
1531    }
1532
1533    /// Sends an HTTP/3 body chunk provided as a raw buffer on the given stream.
1534    ///
1535    /// If the capacity allows it the buffer will be appended to the stream's
1536    /// send queue with zero copying.
1537    ///
1538    /// On success the number of bytes written is returned, or [`Done`] if no
1539    /// bytes could be written (e.g. because the stream is blocked).
1540    ///
1541    /// Note that the number of written bytes returned can be lower than the
1542    /// length of the input buffer when the underlying QUIC stream doesn't have
1543    /// enough capacity for the operation to complete.
1544    ///
1545    /// When a partial write happens (including when [`Done`] is returned) the
1546    /// remaining (unwrittent) buffer will also be returned. The application
1547    /// should retry the operation once the stream is reported as writable
1548    /// again.
1549    ///
1550    /// [`Done`]: enum.Error.html#variant.Done
1551    pub fn send_body_zc<F>(
1552        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1553        body: &mut F::Buf, fin: bool,
1554    ) -> Result<usize>
1555    where
1556        F: BufFactory,
1557        F::Buf: BufSplit,
1558    {
1559        self.do_send_body(
1560            conn,
1561            stream_id,
1562            body,
1563            fin,
1564            |conn: &mut super::Connection<F>,
1565             header: &[u8],
1566             stream_id: u64,
1567             body: &mut F::Buf,
1568             mut body_len: usize,
1569             fin: bool| {
1570                let with_prefix = body.try_add_prefix(header);
1571                if !with_prefix {
1572                    conn.stream_send(stream_id, header, false)?;
1573                } else {
1574                    body_len += header.len();
1575                }
1576
1577                let (mut n, rem) = conn.stream_send_zc(
1578                    stream_id,
1579                    body.clone(),
1580                    Some(body_len),
1581                    fin,
1582                )?;
1583
1584                if with_prefix {
1585                    n -= header.len();
1586                }
1587
1588                if let Some(rem) = rem {
1589                    let _ = std::mem::replace(body, rem);
1590                }
1591
1592                Ok((n, n))
1593            },
1594        )
1595    }
1596
1597    fn do_send_body<F, B, R, SND>(
1598        &mut self, conn: &mut super::Connection<F>, stream_id: u64, body: B,
1599        fin: bool, write_fn: SND,
1600    ) -> Result<R>
1601    where
1602        F: BufFactory,
1603        B: AsRef<[u8]>,
1604        SND: FnOnce(
1605            &mut super::Connection<F>,
1606            &[u8],
1607            u64,
1608            B,
1609            usize,
1610            bool,
1611        ) -> Result<(usize, R)>,
1612    {
1613        let mut d = [42; 10];
1614        let mut b = octets::OctetsMut::with_slice(&mut d);
1615
1616        let len = body.as_ref().len();
1617
1618        // Validate that it is sane to send data on the stream.
1619        if stream_id % 4 != 0 {
1620            return Err(Error::FrameUnexpected);
1621        }
1622
1623        match self.streams.get_mut(&stream_id) {
1624            Some(s) => {
1625                if !s.local_initialized() {
1626                    return Err(Error::FrameUnexpected);
1627                }
1628
1629                if s.trailers_sent() {
1630                    return Err(Error::FrameUnexpected);
1631                }
1632            },
1633
1634            None => {
1635                return Err(Error::FrameUnexpected);
1636            },
1637        };
1638
1639        // Avoid sending 0-length DATA frames when the fin flag is false.
1640        if len == 0 && !fin {
1641            return Err(Error::Done);
1642        }
1643
1644        let overhead = octets::varint_len(frame::DATA_FRAME_TYPE_ID) +
1645            octets::varint_len(len as u64);
1646
1647        let stream_cap = match conn.stream_capacity(stream_id) {
1648            Ok(v) => v,
1649
1650            Err(e) => {
1651                if conn.stream_finished(stream_id) {
1652                    self.streams.remove(&stream_id);
1653                }
1654
1655                return Err(e.into());
1656            },
1657        };
1658
1659        // Make sure there is enough capacity to send the DATA frame header.
1660        if stream_cap < overhead {
1661            let _ = conn.stream_writable(stream_id, overhead + 1);
1662            return Err(Error::Done);
1663        }
1664
1665        // Cap the frame payload length to the stream's capacity.
1666        let body_len = std::cmp::min(len, stream_cap - overhead);
1667
1668        // If we can't send the entire body, set the fin flag to false so the
1669        // application can try again later.
1670        let fin = if body_len != len { false } else { fin };
1671
1672        // Again, avoid sending 0-length DATA frames when the fin flag is false.
1673        if body_len == 0 && !fin {
1674            let _ = conn.stream_writable(stream_id, overhead + 1);
1675            return Err(Error::Done);
1676        }
1677
1678        b.put_varint(frame::DATA_FRAME_TYPE_ID)?;
1679        b.put_varint(body_len as u64)?;
1680        let off = b.off();
1681
1682        // Return how many bytes were written, excluding the frame header.
1683        // Sending body separately avoids unnecessary copy.
1684        let (written, ret) =
1685            write_fn(conn, &d[..off], stream_id, body, body_len, fin)?;
1686
1687        trace!(
1688            "{} tx frm DATA stream={} len={} fin={}",
1689            conn.trace_id(),
1690            stream_id,
1691            written,
1692            fin
1693        );
1694
1695        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1696            let frame = Http3Frame::Data { raw: None };
1697            let ev_data = EventData::H3FrameCreated(H3FrameCreated {
1698                stream_id,
1699                length: Some(written as u64),
1700                frame,
1701                ..Default::default()
1702            });
1703
1704            q.add_event_data_now(ev_data).ok();
1705        });
1706
1707        if written < len {
1708            // Ensure the peer is notified that the connection or stream is
1709            // blocked when the stream's capacity is limited by flow control.
1710            //
1711            // We only need enough capacity to send a few bytes, to make sure
1712            // the stream doesn't hang due to congestion window not growing
1713            // enough.
1714            let _ = conn.stream_writable(stream_id, overhead + 1);
1715        }
1716
1717        if fin && written == len && conn.stream_finished(stream_id) {
1718            self.streams.remove(&stream_id);
1719        }
1720
1721        Ok(ret)
1722    }
1723
1724    /// Returns whether the peer enabled HTTP/3 DATAGRAM frame support.
1725    ///
1726    /// Support is signalled by the peer's SETTINGS, so this method always
1727    /// returns false until they have been processed using the [`poll()`]
1728    /// method.
1729    ///
1730    /// [`poll()`]: struct.Connection.html#method.poll
1731    pub fn dgram_enabled_by_peer<F: BufFactory>(
1732        &self, conn: &super::Connection<F>,
1733    ) -> bool {
1734        self.peer_settings.h3_datagram == Some(1) &&
1735            conn.dgram_max_writable_len().is_some()
1736    }
1737
1738    /// Returns whether the peer enabled extended CONNECT support.
1739    ///
1740    /// Support is signalled by the peer's SETTINGS, so this method always
1741    /// returns false until they have been processed using the [`poll()`]
1742    /// method.
1743    ///
1744    /// [`poll()`]: struct.Connection.html#method.poll
1745    pub fn extended_connect_enabled_by_peer(&self) -> bool {
1746        self.peer_settings.connect_protocol_enabled == Some(1)
1747    }
1748
1749    /// Reads request or response body data into the provided buffer.
1750    ///
1751    /// Applications should call this method whenever the [`poll()`] method
1752    /// returns a [`Data`] event.
1753    ///
1754    /// On success the amount of bytes read is returned, or [`Done`] if there
1755    /// is no data to read.
1756    ///
1757    /// [`poll()`]: struct.Connection.html#method.poll
1758    /// [`Data`]: enum.Event.html#variant.Data
1759    /// [`Done`]: enum.Error.html#variant.Done
1760    pub fn recv_body<F: BufFactory>(
1761        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1762        out: &mut [u8],
1763    ) -> Result<usize> {
1764        let mut total = 0;
1765
1766        // Try to consume all buffered data for the stream, even across multiple
1767        // DATA frames.
1768        while total < out.len() {
1769            let stream = self.streams.get_mut(&stream_id).ok_or(Error::Done)?;
1770
1771            if stream.state() != stream::State::Data {
1772                break;
1773            }
1774
1775            let (read, fin) =
1776                match stream.try_consume_data(conn, &mut out[total..]) {
1777                    Ok(v) => v,
1778
1779                    Err(Error::Done) => break,
1780
1781                    Err(e) => return Err(e),
1782                };
1783
1784            total += read;
1785
1786            // No more data to read, we are done.
1787            if read == 0 || fin {
1788                break;
1789            }
1790
1791            // Process incoming data from the stream. For example, if a whole
1792            // DATA frame was consumed, and another one is queued behind it,
1793            // this will ensure the additional data will also be returned to
1794            // the application.
1795            match self.process_readable_stream(conn, stream_id, false) {
1796                Ok(_) => unreachable!(),
1797
1798                Err(Error::Done) => (),
1799
1800                Err(e) => return Err(e),
1801            };
1802
1803            if conn.stream_finished(stream_id) {
1804                break;
1805            }
1806        }
1807
1808        // While body is being received, the stream is marked as finished only
1809        // when all data is read by the application.
1810        if conn.stream_finished(stream_id) {
1811            self.process_finished_stream(stream_id);
1812        }
1813
1814        if total == 0 {
1815            return Err(Error::Done);
1816        }
1817
1818        Ok(total)
1819    }
1820
1821    /// Sends a PRIORITY_UPDATE frame on the control stream with specified
1822    /// request stream ID and priority.
1823    ///
1824    /// The `priority` parameter represents [Extensible Priority]
1825    /// parameters. If the urgency is outside the range 0-7, it will be clamped
1826    /// to 7.
1827    ///
1828    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1829    /// doesn't have enough capacity for the operation to complete. When this
1830    /// happens the application should retry the operation once the stream is
1831    /// reported as writable again.
1832    ///
1833    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1834    /// [Extensible Priority]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
1835    pub fn send_priority_update_for_request<F: BufFactory>(
1836        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1837        priority: &Priority,
1838    ) -> Result<()> {
1839        let mut d = [42; 20];
1840        let mut b = octets::OctetsMut::with_slice(&mut d);
1841
1842        // Validate that it is sane to send PRIORITY_UPDATE.
1843        if self.is_server {
1844            return Err(Error::FrameUnexpected);
1845        }
1846
1847        if stream_id % 4 != 0 {
1848            return Err(Error::FrameUnexpected);
1849        }
1850
1851        let control_stream_id =
1852            self.control_stream_id.ok_or(Error::FrameUnexpected)?;
1853
1854        let urgency = priority
1855            .urgency
1856            .clamp(PRIORITY_URGENCY_LOWER_BOUND, PRIORITY_URGENCY_UPPER_BOUND);
1857
1858        let mut field_value = format!("u={urgency}");
1859
1860        if priority.incremental {
1861            field_value.push_str(",i");
1862        }
1863
1864        let priority_field_value = field_value.as_bytes();
1865        let frame_payload_len =
1866            octets::varint_len(stream_id) + priority_field_value.len();
1867
1868        let overhead =
1869            octets::varint_len(frame::PRIORITY_UPDATE_FRAME_REQUEST_TYPE_ID) +
1870                octets::varint_len(stream_id) +
1871                octets::varint_len(frame_payload_len as u64);
1872
1873        // Make sure the control stream has enough capacity.
1874        match conn.stream_writable(
1875            control_stream_id,
1876            overhead + priority_field_value.len(),
1877        ) {
1878            Ok(true) => (),
1879
1880            Ok(false) => return Err(Error::StreamBlocked),
1881
1882            Err(e) => {
1883                return Err(e.into());
1884            },
1885        }
1886
1887        b.put_varint(frame::PRIORITY_UPDATE_FRAME_REQUEST_TYPE_ID)?;
1888        b.put_varint(frame_payload_len as u64)?;
1889        b.put_varint(stream_id)?;
1890        let off = b.off();
1891        conn.stream_send(control_stream_id, &d[..off], false)?;
1892
1893        // Sending field value separately avoids unnecessary copy.
1894        conn.stream_send(control_stream_id, priority_field_value, false)?;
1895
1896        trace!(
1897            "{} tx frm PRIORITY_UPDATE request_stream={} priority_field_value={}",
1898            conn.trace_id(),
1899            stream_id,
1900            field_value,
1901        );
1902
1903        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1904            let frame = Http3Frame::PriorityUpdate {
1905                target_stream_type: H3PriorityTargetStreamType::Request,
1906                prioritized_element_id: stream_id,
1907                priority_field_value: field_value.clone(),
1908            };
1909
1910            let ev_data = EventData::H3FrameCreated(H3FrameCreated {
1911                stream_id,
1912                length: Some(priority_field_value.len() as u64),
1913                frame,
1914                ..Default::default()
1915            });
1916
1917            q.add_event_data_now(ev_data).ok();
1918        });
1919
1920        Ok(())
1921    }
1922
1923    /// Take the last PRIORITY_UPDATE for a prioritized element ID.
1924    ///
1925    /// When the [`poll()`] method returns a [`PriorityUpdate`] event for a
1926    /// prioritized element, the event has triggered and will not rearm until
1927    /// applications call this method. It is recommended that applications defer
1928    /// taking the PRIORITY_UPDATE until after [`poll()`] returns [`Done`].
1929    ///
1930    /// On success the Priority Field Value is returned, or [`Done`] if there is
1931    /// no PRIORITY_UPDATE to read (either because there is no value to take, or
1932    /// because the prioritized element does not exist).
1933    ///
1934    /// [`poll()`]: struct.Connection.html#method.poll
1935    /// [`PriorityUpdate`]: enum.Event.html#variant.PriorityUpdate
1936    /// [`Done`]: enum.Error.html#variant.Done
1937    pub fn take_last_priority_update(
1938        &mut self, prioritized_element_id: u64,
1939    ) -> Result<Vec<u8>> {
1940        if let Some(stream) = self.streams.get_mut(&prioritized_element_id) {
1941            return stream.take_last_priority_update().ok_or(Error::Done);
1942        }
1943
1944        Err(Error::Done)
1945    }
1946
1947    /// Processes HTTP/3 data received from the peer.
1948    ///
1949    /// On success it returns an [`Event`] and an ID, or [`Done`] when there are
1950    /// no events to report.
1951    ///
1952    /// Note that all events are edge-triggered, meaning that once reported they
1953    /// will not be reported again by calling this method again, until the event
1954    /// is re-armed.
1955    ///
1956    /// The events [`Headers`], [`Data`] and [`Finished`] return a stream ID,
1957    /// which is used in methods [`recv_body()`], [`send_response()`] or
1958    /// [`send_body()`].
1959    ///
1960    /// The event [`GoAway`] returns an ID that depends on the connection role.
1961    /// A client receives the largest processed stream ID. A server receives the
1962    /// the largest permitted push ID.
1963    ///
1964    /// The event [`PriorityUpdate`] only occurs at servers. It returns a
1965    /// prioritized element ID that is used in the method
1966    /// [`take_last_priority_update()`], which rearms the event for that ID.
1967    ///
1968    /// If an error occurs while processing data, the connection is closed with
1969    /// the appropriate error code, using the transport's [`close()`] method.
1970    ///
1971    /// [`Event`]: enum.Event.html
1972    /// [`Done`]: enum.Error.html#variant.Done
1973    /// [`Headers`]: enum.Event.html#variant.Headers
1974    /// [`Data`]: enum.Event.html#variant.Data
1975    /// [`Finished`]: enum.Event.html#variant.Finished
1976    /// [`GoAway`]: enum.Event.html#variant.GoAWay
1977    /// [`PriorityUpdate`]: enum.Event.html#variant.PriorityUpdate
1978    /// [`recv_body()`]: struct.Connection.html#method.recv_body
1979    /// [`send_response()`]: struct.Connection.html#method.send_response
1980    /// [`send_body()`]: struct.Connection.html#method.send_body
1981    /// [`recv_dgram()`]: struct.Connection.html#method.recv_dgram
1982    /// [`take_last_priority_update()`]: struct.Connection.html#method.take_last_priority_update
1983    /// [`close()`]: ../struct.Connection.html#method.close
1984    pub fn poll<F: BufFactory>(
1985        &mut self, conn: &mut super::Connection<F>,
1986    ) -> Result<(u64, Event)> {
1987        // When connection close is initiated by the local application (e.g. due
1988        // to a protocol error), the connection itself might be in a broken
1989        // state, so return early.
1990        if conn.local_error.is_some() {
1991            return Err(Error::Done);
1992        }
1993
1994        // Process control streams first.
1995        if let Some(stream_id) = self.peer_control_stream_id {
1996            match self.process_control_stream(conn, stream_id) {
1997                Ok(ev) => return Ok(ev),
1998
1999                Err(Error::Done) => (),
2000
2001                Err(e) => return Err(e),
2002            };
2003        }
2004
2005        if let Some(stream_id) = self.peer_qpack_streams.encoder_stream_id {
2006            match self.process_control_stream(conn, stream_id) {
2007                Ok(ev) => return Ok(ev),
2008
2009                Err(Error::Done) => (),
2010
2011                Err(e) => return Err(e),
2012            };
2013        }
2014
2015        if let Some(stream_id) = self.peer_qpack_streams.decoder_stream_id {
2016            match self.process_control_stream(conn, stream_id) {
2017                Ok(ev) => return Ok(ev),
2018
2019                Err(Error::Done) => (),
2020
2021                Err(e) => return Err(e),
2022            };
2023        }
2024
2025        // Process finished streams list.
2026        if let Some(finished) = self.finished_streams.pop_front() {
2027            return Ok((finished, Event::Finished));
2028        }
2029
2030        // Process HTTP/3 data from readable streams.
2031        for s in conn.readable() {
2032            trace!("{} stream id {} is readable", conn.trace_id(), s);
2033
2034            let ev = match self.process_readable_stream(conn, s, true) {
2035                Ok(v) => Some(v),
2036
2037                Err(Error::Done) => None,
2038
2039                // Return early if the stream was reset, to avoid returning
2040                // a Finished event later as well.
2041                Err(Error::TransportError(crate::Error::StreamReset(e))) =>
2042                    return Ok((s, Event::Reset(e))),
2043
2044                Err(e) => return Err(e),
2045            };
2046
2047            if conn.stream_finished(s) {
2048                self.process_finished_stream(s);
2049            }
2050
2051            // TODO: check if stream is completed so it can be freed
2052            if let Some(ev) = ev {
2053                return Ok(ev);
2054            }
2055        }
2056
2057        // Process finished streams list once again, to make sure `Finished`
2058        // events are returned when receiving empty stream frames with the fin
2059        // flag set.
2060        if let Some(finished) = self.finished_streams.pop_front() {
2061            if conn.stream_readable(finished) {
2062                // The stream is finished, but is still readable, it may
2063                // indicate that there is a pending error, such as reset.
2064                if let Err(crate::Error::StreamReset(e)) =
2065                    conn.stream_recv(finished, &mut [])
2066                {
2067                    return Ok((finished, Event::Reset(e)));
2068                }
2069            }
2070            return Ok((finished, Event::Finished));
2071        }
2072
2073        Err(Error::Done)
2074    }
2075
2076    /// Sends a GOAWAY frame to initiate graceful connection closure.
2077    ///
2078    /// When quiche is used in the server role, the `id` parameter is the stream
2079    /// ID of the highest processed request. This can be any valid ID between 0
2080    /// and 2^62-4. However, the ID cannot be increased. Failure to satisfy
2081    /// these conditions will return an error.
2082    ///
2083    /// This method does not close the QUIC connection. Applications are
2084    /// required to call [`close()`] themselves.
2085    ///
2086    /// [`close()`]: ../struct.Connection.html#method.close
2087    pub fn send_goaway<F: BufFactory>(
2088        &mut self, conn: &mut super::Connection<F>, id: u64,
2089    ) -> Result<()> {
2090        let mut id = id;
2091
2092        // TODO: server push
2093        //
2094        // In the meantime always send 0 from client.
2095        if !self.is_server {
2096            id = 0;
2097        }
2098
2099        if self.is_server && id % 4 != 0 {
2100            return Err(Error::IdError);
2101        }
2102
2103        if let Some(sent_id) = self.local_goaway_id {
2104            if id > sent_id {
2105                return Err(Error::IdError);
2106            }
2107        }
2108
2109        if let Some(stream_id) = self.control_stream_id {
2110            let mut d = [42; 10];
2111            let mut b = octets::OctetsMut::with_slice(&mut d);
2112
2113            let frame = frame::Frame::GoAway { id };
2114
2115            let wire_len = frame.to_bytes(&mut b)?;
2116            let stream_cap = conn.stream_capacity(stream_id)?;
2117
2118            if stream_cap < wire_len {
2119                return Err(Error::StreamBlocked);
2120            }
2121
2122            trace!("{} tx frm {:?}", conn.trace_id(), frame);
2123
2124            qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2125                let ev_data = EventData::H3FrameCreated(H3FrameCreated {
2126                    stream_id,
2127                    length: Some(octets::varint_len(id) as u64),
2128                    frame: frame.to_qlog(),
2129                    ..Default::default()
2130                });
2131
2132                q.add_event_data_now(ev_data).ok();
2133            });
2134
2135            let off = b.off();
2136            conn.stream_send(stream_id, &d[..off], false)?;
2137
2138            self.local_goaway_id = Some(id);
2139        }
2140
2141        Ok(())
2142    }
2143
2144    /// Gets the raw settings from peer including unknown and reserved types.
2145    ///
2146    /// The order of settings is the same as received in the SETTINGS frame.
2147    pub fn peer_settings_raw(&self) -> Option<&[(u64, u64)]> {
2148        self.peer_settings.raw.as_deref()
2149    }
2150
2151    fn open_uni_stream<F: BufFactory>(
2152        &mut self, conn: &mut super::Connection<F>, ty: u64,
2153    ) -> Result<u64> {
2154        let stream_id = self.next_uni_stream_id;
2155
2156        let mut d = [0; 8];
2157        let mut b = octets::OctetsMut::with_slice(&mut d);
2158
2159        match ty {
2160            // Control and QPACK streams are the most important to schedule.
2161            stream::HTTP3_CONTROL_STREAM_TYPE_ID |
2162            stream::QPACK_ENCODER_STREAM_TYPE_ID |
2163            stream::QPACK_DECODER_STREAM_TYPE_ID => {
2164                conn.stream_priority(stream_id, 0, false)?;
2165            },
2166
2167            // TODO: Server push
2168            stream::HTTP3_PUSH_STREAM_TYPE_ID => (),
2169
2170            // Anything else is a GREASE stream, so make it the least important.
2171            _ => {
2172                conn.stream_priority(stream_id, 255, false)?;
2173            },
2174        }
2175
2176        conn.stream_send(stream_id, b.put_varint(ty)?, false)?;
2177
2178        // To avoid skipping stream IDs, we only calculate the next available
2179        // stream ID when data has been successfully buffered.
2180        self.next_uni_stream_id = self
2181            .next_uni_stream_id
2182            .checked_add(4)
2183            .ok_or(Error::IdError)?;
2184
2185        Ok(stream_id)
2186    }
2187
2188    fn open_qpack_encoder_stream<F: BufFactory>(
2189        &mut self, conn: &mut super::Connection<F>,
2190    ) -> Result<()> {
2191        let stream_id =
2192            self.open_uni_stream(conn, stream::QPACK_ENCODER_STREAM_TYPE_ID)?;
2193
2194        self.local_qpack_streams.encoder_stream_id = Some(stream_id);
2195
2196        qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2197            let ev_data = EventData::H3StreamTypeSet(H3StreamTypeSet {
2198                stream_id,
2199                owner: Some(H3Owner::Local),
2200                stream_type: H3StreamType::QpackEncode,
2201                ..Default::default()
2202            });
2203
2204            q.add_event_data_now(ev_data).ok();
2205        });
2206
2207        Ok(())
2208    }
2209
2210    fn open_qpack_decoder_stream<F: BufFactory>(
2211        &mut self, conn: &mut super::Connection<F>,
2212    ) -> Result<()> {
2213        let stream_id =
2214            self.open_uni_stream(conn, stream::QPACK_DECODER_STREAM_TYPE_ID)?;
2215
2216        self.local_qpack_streams.decoder_stream_id = Some(stream_id);
2217
2218        qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2219            let ev_data = EventData::H3StreamTypeSet(H3StreamTypeSet {
2220                stream_id,
2221                owner: Some(H3Owner::Local),
2222                stream_type: H3StreamType::QpackDecode,
2223                ..Default::default()
2224            });
2225
2226            q.add_event_data_now(ev_data).ok();
2227        });
2228
2229        Ok(())
2230    }
2231
2232    /// Send GREASE frames on the provided stream ID.
2233    fn send_grease_frames<F: BufFactory>(
2234        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
2235    ) -> Result<()> {
2236        let mut d = [0; 8];
2237
2238        let stream_cap = match conn.stream_capacity(stream_id) {
2239            Ok(v) => v,
2240
2241            Err(e) => {
2242                if conn.stream_finished(stream_id) {
2243                    self.streams.remove(&stream_id);
2244                }
2245
2246                return Err(e.into());
2247            },
2248        };
2249
2250        let grease_frame1 = grease_value();
2251        let grease_frame2 = grease_value();
2252        let grease_payload = b"GREASE is the word";
2253
2254        let overhead = octets::varint_len(grease_frame1) + // frame type
2255            1 + // payload len
2256            octets::varint_len(grease_frame2) + // frame type
2257            1 + // payload len
2258            grease_payload.len(); // payload
2259
2260        // Don't send GREASE if there is not enough capacity for it. Greasing
2261        // will _not_ be attempted again later on.
2262        if stream_cap < overhead {
2263            return Ok(());
2264        }
2265
2266        // Empty GREASE frame.
2267        let mut b = octets::OctetsMut::with_slice(&mut d);
2268        conn.stream_send(stream_id, b.put_varint(grease_frame1)?, false)?;
2269
2270        let mut b = octets::OctetsMut::with_slice(&mut d);
2271        conn.stream_send(stream_id, b.put_varint(0)?, false)?;
2272
2273        trace!(
2274            "{} tx frm GREASE stream={} len=0",
2275            conn.trace_id(),
2276            stream_id
2277        );
2278
2279        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2280            let frame = Http3Frame::Reserved { length: Some(0) };
2281            let ev_data = EventData::H3FrameCreated(H3FrameCreated {
2282                stream_id,
2283                length: Some(0),
2284                frame,
2285                ..Default::default()
2286            });
2287
2288            q.add_event_data_now(ev_data).ok();
2289        });
2290
2291        // GREASE frame with payload.
2292        let mut b = octets::OctetsMut::with_slice(&mut d);
2293        conn.stream_send(stream_id, b.put_varint(grease_frame2)?, false)?;
2294
2295        let mut b = octets::OctetsMut::with_slice(&mut d);
2296        conn.stream_send(stream_id, b.put_varint(18)?, false)?;
2297
2298        conn.stream_send(stream_id, grease_payload, false)?;
2299
2300        trace!(
2301            "{} tx frm GREASE stream={} len={}",
2302            conn.trace_id(),
2303            stream_id,
2304            grease_payload.len()
2305        );
2306
2307        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2308            let frame = Http3Frame::Reserved {
2309                length: Some(grease_payload.len() as u64),
2310            };
2311            let ev_data = EventData::H3FrameCreated(H3FrameCreated {
2312                stream_id,
2313                length: Some(grease_payload.len() as u64),
2314                frame,
2315                ..Default::default()
2316            });
2317
2318            q.add_event_data_now(ev_data).ok();
2319        });
2320
2321        Ok(())
2322    }
2323
2324    /// Opens a new unidirectional stream with a GREASE type and sends some
2325    /// unframed payload.
2326    fn open_grease_stream<F: BufFactory>(
2327        &mut self, conn: &mut super::Connection<F>,
2328    ) -> Result<()> {
2329        let ty = grease_value();
2330        match self.open_uni_stream(conn, ty) {
2331            Ok(stream_id) => {
2332                conn.stream_send(stream_id, b"GREASE is the word", true)?;
2333
2334                trace!("{} open GREASE stream {}", conn.trace_id(), stream_id);
2335
2336                qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2337                    let ev_data = EventData::H3StreamTypeSet(H3StreamTypeSet {
2338                        stream_id,
2339                        owner: Some(H3Owner::Local),
2340                        stream_type: H3StreamType::Unknown,
2341                        stream_type_value: Some(ty),
2342                        ..Default::default()
2343                    });
2344
2345                    q.add_event_data_now(ev_data).ok();
2346                });
2347            },
2348
2349            Err(Error::IdError) => {
2350                trace!("{} GREASE stream blocked", conn.trace_id(),);
2351
2352                return Ok(());
2353            },
2354
2355            Err(e) => return Err(e),
2356        };
2357
2358        Ok(())
2359    }
2360
2361    /// Sends SETTINGS frame based on HTTP/3 configuration.
2362    fn send_settings<F: BufFactory>(
2363        &mut self, conn: &mut super::Connection<F>,
2364    ) -> Result<()> {
2365        let stream_id = match self
2366            .open_uni_stream(conn, stream::HTTP3_CONTROL_STREAM_TYPE_ID)
2367        {
2368            Ok(v) => v,
2369
2370            Err(e) => {
2371                trace!("{} Control stream blocked", conn.trace_id(),);
2372
2373                if e == Error::Done {
2374                    return Err(Error::InternalError);
2375                }
2376
2377                return Err(e);
2378            },
2379        };
2380
2381        self.control_stream_id = Some(stream_id);
2382
2383        qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2384            let ev_data = EventData::H3StreamTypeSet(H3StreamTypeSet {
2385                stream_id,
2386                owner: Some(H3Owner::Local),
2387                stream_type: H3StreamType::Control,
2388                ..Default::default()
2389            });
2390
2391            q.add_event_data_now(ev_data).ok();
2392        });
2393
2394        let grease = if conn.grease {
2395            Some((grease_value(), grease_value()))
2396        } else {
2397            None
2398        };
2399
2400        let frame = frame::Frame::Settings {
2401            max_field_section_size: self.local_settings.max_field_section_size,
2402            qpack_max_table_capacity: self
2403                .local_settings
2404                .qpack_max_table_capacity,
2405            qpack_blocked_streams: self.local_settings.qpack_blocked_streams,
2406            connect_protocol_enabled: self
2407                .local_settings
2408                .connect_protocol_enabled,
2409            h3_datagram: self.local_settings.h3_datagram,
2410            grease,
2411            additional_settings: self.local_settings.additional_settings.clone(),
2412            raw: Default::default(),
2413        };
2414
2415        let mut d = [42; 128];
2416        let mut b = octets::OctetsMut::with_slice(&mut d);
2417
2418        frame.to_bytes(&mut b)?;
2419
2420        let off = b.off();
2421
2422        if let Some(id) = self.control_stream_id {
2423            conn.stream_send(id, &d[..off], false)?;
2424
2425            trace!(
2426                "{} tx frm SETTINGS stream={} len={}",
2427                conn.trace_id(),
2428                id,
2429                off
2430            );
2431
2432            qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2433                let frame = frame.to_qlog();
2434                let ev_data = EventData::H3FrameCreated(H3FrameCreated {
2435                    stream_id: id,
2436                    length: Some(off as u64),
2437                    frame,
2438                    ..Default::default()
2439                });
2440
2441                q.add_event_data_now(ev_data).ok();
2442            });
2443        }
2444
2445        Ok(())
2446    }
2447
2448    fn process_control_stream<F: BufFactory>(
2449        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
2450    ) -> Result<(u64, Event)> {
2451        close_conn_if_critical_stream_finished(conn, stream_id)?;
2452
2453        if !conn.stream_readable(stream_id) {
2454            return Err(Error::Done);
2455        }
2456
2457        match self.process_readable_stream(conn, stream_id, true) {
2458            Ok(ev) => return Ok(ev),
2459
2460            Err(Error::Done) => (),
2461
2462            Err(e) => return Err(e),
2463        };
2464
2465        close_conn_if_critical_stream_finished(conn, stream_id)?;
2466
2467        Err(Error::Done)
2468    }
2469
2470    fn process_readable_stream<F: BufFactory>(
2471        &mut self, conn: &mut super::Connection<F>, stream_id: u64, polling: bool,
2472    ) -> Result<(u64, Event)> {
2473        self.streams
2474            .entry(stream_id)
2475            .or_insert_with(|| <stream::Stream>::new(stream_id, false));
2476
2477        // We need to get a fresh reference to the stream for each
2478        // iteration, to avoid borrowing `self` for the entire duration
2479        // of the loop, because we'll need to borrow it again in the
2480        // `State::FramePayload` case below.
2481        while let Some(stream) = self.streams.get_mut(&stream_id) {
2482            match stream.state() {
2483                stream::State::StreamType => {
2484                    stream.try_fill_buffer(conn)?;
2485
2486                    let varint = match stream.try_consume_varint() {
2487                        Ok(v) => v,
2488
2489                        Err(_) => continue,
2490                    };
2491
2492                    let ty = stream::Type::deserialize(varint)?;
2493
2494                    if let Err(e) = stream.set_ty(ty) {
2495                        conn.close(true, e.to_wire(), b"")?;
2496                        return Err(e);
2497                    }
2498
2499                    qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2500                        let ty_val = if matches!(ty, stream::Type::Unknown) {
2501                            Some(varint)
2502                        } else {
2503                            None
2504                        };
2505
2506                        let ev_data =
2507                            EventData::H3StreamTypeSet(H3StreamTypeSet {
2508                                stream_id,
2509                                owner: Some(H3Owner::Remote),
2510                                stream_type: ty.to_qlog(),
2511                                stream_type_value: ty_val,
2512                                ..Default::default()
2513                            });
2514
2515                        q.add_event_data_now(ev_data).ok();
2516                    });
2517
2518                    match &ty {
2519                        stream::Type::Control => {
2520                            // Only one control stream allowed.
2521                            if self.peer_control_stream_id.is_some() {
2522                                conn.close(
2523                                    true,
2524                                    Error::StreamCreationError.to_wire(),
2525                                    b"Received multiple control streams",
2526                                )?;
2527
2528                                return Err(Error::StreamCreationError);
2529                            }
2530
2531                            trace!(
2532                                "{} open peer's control stream {}",
2533                                conn.trace_id(),
2534                                stream_id
2535                            );
2536
2537                            close_conn_if_critical_stream_finished(
2538                                conn, stream_id,
2539                            )?;
2540
2541                            self.peer_control_stream_id = Some(stream_id);
2542                        },
2543
2544                        stream::Type::Push => {
2545                            // Only clients can receive push stream.
2546                            if self.is_server {
2547                                conn.close(
2548                                    true,
2549                                    Error::StreamCreationError.to_wire(),
2550                                    b"Server received push stream.",
2551                                )?;
2552
2553                                return Err(Error::StreamCreationError);
2554                            }
2555                        },
2556
2557                        stream::Type::QpackEncoder => {
2558                            // Only one qpack encoder stream allowed.
2559                            if self.peer_qpack_streams.encoder_stream_id.is_some()
2560                            {
2561                                conn.close(
2562                                    true,
2563                                    Error::StreamCreationError.to_wire(),
2564                                    b"Received multiple QPACK encoder streams",
2565                                )?;
2566
2567                                return Err(Error::StreamCreationError);
2568                            }
2569
2570                            close_conn_if_critical_stream_finished(
2571                                conn, stream_id,
2572                            )?;
2573
2574                            self.peer_qpack_streams.encoder_stream_id =
2575                                Some(stream_id);
2576                        },
2577
2578                        stream::Type::QpackDecoder => {
2579                            // Only one qpack decoder allowed.
2580                            if self.peer_qpack_streams.decoder_stream_id.is_some()
2581                            {
2582                                conn.close(
2583                                    true,
2584                                    Error::StreamCreationError.to_wire(),
2585                                    b"Received multiple QPACK decoder streams",
2586                                )?;
2587
2588                                return Err(Error::StreamCreationError);
2589                            }
2590
2591                            close_conn_if_critical_stream_finished(
2592                                conn, stream_id,
2593                            )?;
2594
2595                            self.peer_qpack_streams.decoder_stream_id =
2596                                Some(stream_id);
2597                        },
2598
2599                        stream::Type::Unknown => {
2600                            // Unknown stream types are ignored.
2601                            // TODO: we MAY send STOP_SENDING
2602                        },
2603
2604                        stream::Type::Request => unreachable!(),
2605                    }
2606                },
2607
2608                stream::State::PushId => {
2609                    stream.try_fill_buffer(conn)?;
2610
2611                    let varint = match stream.try_consume_varint() {
2612                        Ok(v) => v,
2613
2614                        Err(_) => continue,
2615                    };
2616
2617                    if let Err(e) = stream.set_push_id(varint) {
2618                        conn.close(true, e.to_wire(), b"")?;
2619                        return Err(e);
2620                    }
2621                },
2622
2623                stream::State::FrameType => {
2624                    stream.try_fill_buffer(conn)?;
2625
2626                    let varint = match stream.try_consume_varint() {
2627                        Ok(v) => v,
2628
2629                        Err(_) => continue,
2630                    };
2631
2632                    match stream.set_frame_type(varint) {
2633                        Err(Error::FrameUnexpected) => {
2634                            let msg = format!("Unexpected frame type {varint}");
2635
2636                            conn.close(
2637                                true,
2638                                Error::FrameUnexpected.to_wire(),
2639                                msg.as_bytes(),
2640                            )?;
2641
2642                            return Err(Error::FrameUnexpected);
2643                        },
2644
2645                        Err(e) => {
2646                            conn.close(
2647                                true,
2648                                e.to_wire(),
2649                                b"Error handling frame.",
2650                            )?;
2651
2652                            return Err(e);
2653                        },
2654
2655                        _ => (),
2656                    }
2657                },
2658
2659                stream::State::FramePayloadLen => {
2660                    stream.try_fill_buffer(conn)?;
2661
2662                    let payload_len = match stream.try_consume_varint() {
2663                        Ok(v) => v,
2664
2665                        Err(_) => continue,
2666                    };
2667
2668                    // DATA frames are handled uniquely. After this point we lose
2669                    // visibility of DATA framing, so just log here.
2670                    if Some(frame::DATA_FRAME_TYPE_ID) == stream.frame_type() {
2671                        trace!(
2672                            "{} rx frm DATA stream={} wire_payload_len={}",
2673                            conn.trace_id(),
2674                            stream_id,
2675                            payload_len
2676                        );
2677
2678                        qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
2679                            let frame = Http3Frame::Data { raw: None };
2680
2681                            let ev_data =
2682                                EventData::H3FrameParsed(H3FrameParsed {
2683                                    stream_id,
2684                                    length: Some(payload_len),
2685                                    frame,
2686                                    ..Default::default()
2687                                });
2688
2689                            q.add_event_data_now(ev_data).ok();
2690                        });
2691                    }
2692
2693                    if let Err(e) = stream.set_frame_payload_len(payload_len) {
2694                        conn.close(true, e.to_wire(), b"")?;
2695                        return Err(e);
2696                    }
2697                },
2698
2699                stream::State::FramePayload => {
2700                    // Do not emit events when not polling.
2701                    if !polling {
2702                        break;
2703                    }
2704
2705                    stream.try_fill_buffer(conn)?;
2706
2707                    let (frame, payload_len) = match stream.try_consume_frame() {
2708                        Ok(frame) => frame,
2709
2710                        Err(Error::Done) => return Err(Error::Done),
2711
2712                        Err(e) => {
2713                            conn.close(
2714                                true,
2715                                e.to_wire(),
2716                                b"Error handling frame.",
2717                            )?;
2718
2719                            return Err(e);
2720                        },
2721                    };
2722
2723                    match self.process_frame(conn, stream_id, frame, payload_len)
2724                    {
2725                        Ok(ev) => return Ok(ev),
2726
2727                        Err(Error::Done) => {
2728                            // This might be a frame that is processed internally
2729                            // without needing to bubble up to the user as an
2730                            // event. Check whether the frame has FIN'd by QUIC
2731                            // to prevent trying to read again on a closed stream.
2732                            if conn.stream_finished(stream_id) {
2733                                break;
2734                            }
2735                        },
2736
2737                        Err(e) => return Err(e),
2738                    };
2739                },
2740
2741                stream::State::Data => {
2742                    // Do not emit events when not polling.
2743                    if !polling {
2744                        break;
2745                    }
2746
2747                    if !stream.try_trigger_data_event() {
2748                        break;
2749                    }
2750
2751                    return Ok((stream_id, Event::Data));
2752                },
2753
2754                stream::State::QpackInstruction => {
2755                    let mut d = [0; 4096];
2756
2757                    // Read data from the stream and discard immediately.
2758                    loop {
2759                        let (recv, fin) = conn.stream_recv(stream_id, &mut d)?;
2760
2761                        match stream.ty() {
2762                            Some(stream::Type::QpackEncoder) =>
2763                                self.peer_qpack_streams.encoder_stream_bytes +=
2764                                    recv as u64,
2765                            Some(stream::Type::QpackDecoder) =>
2766                                self.peer_qpack_streams.decoder_stream_bytes +=
2767                                    recv as u64,
2768                            _ => unreachable!(),
2769                        };
2770
2771                        if fin {
2772                            close_conn_critical_stream(conn)?;
2773                        }
2774                    }
2775                },
2776
2777                stream::State::Drain => {
2778                    // Discard incoming data on the stream.
2779                    conn.stream_shutdown(
2780                        stream_id,
2781                        crate::Shutdown::Read,
2782                        0x100,
2783                    )?;
2784
2785                    break;
2786                },
2787
2788                stream::State::Finished => break,
2789            }
2790        }
2791
2792        Err(Error::Done)
2793    }
2794
2795    fn process_finished_stream(&mut self, stream_id: u64) {
2796        let stream = match self.streams.get_mut(&stream_id) {
2797            Some(v) => v,
2798
2799            None => return,
2800        };
2801
2802        if stream.state() == stream::State::Finished {
2803            return;
2804        }
2805
2806        match stream.ty() {
2807            Some(stream::Type::Request) | Some(stream::Type::Push) => {
2808                stream.finished();
2809
2810                self.finished_streams.push_back(stream_id);
2811            },
2812
2813            _ => (),
2814        };
2815    }
2816
2817    fn process_frame<F: BufFactory>(
2818        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
2819        frame: frame::Frame, payload_len: u64,
2820    ) -> Result<(u64, Event)> {
2821        trace!(
2822            "{} rx frm {:?} stream={} payload_len={}",
2823            conn.trace_id(),
2824            frame,
2825            stream_id,
2826            payload_len
2827        );
2828
2829        qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
2830            // HEADERS frames are special case and will be logged below.
2831            if !matches!(frame, frame::Frame::Headers { .. }) {
2832                let frame = frame.to_qlog();
2833                let ev_data = EventData::H3FrameParsed(H3FrameParsed {
2834                    stream_id,
2835                    length: Some(payload_len),
2836                    frame,
2837                    ..Default::default()
2838                });
2839
2840                q.add_event_data_now(ev_data).ok();
2841            }
2842        });
2843
2844        match frame {
2845            frame::Frame::Settings {
2846                max_field_section_size,
2847                qpack_max_table_capacity,
2848                qpack_blocked_streams,
2849                connect_protocol_enabled,
2850                h3_datagram,
2851                additional_settings,
2852                raw,
2853                ..
2854            } => {
2855                self.peer_settings = ConnectionSettings {
2856                    max_field_section_size,
2857                    qpack_max_table_capacity,
2858                    qpack_blocked_streams,
2859                    connect_protocol_enabled,
2860                    h3_datagram,
2861                    additional_settings,
2862                    raw,
2863                };
2864
2865                if let Some(1) = h3_datagram {
2866                    // The peer MUST have also enabled DATAGRAM with a TP
2867                    if conn.dgram_max_writable_len().is_none() {
2868                        conn.close(
2869                            true,
2870                            Error::SettingsError.to_wire(),
2871                            b"H3_DATAGRAM sent with value 1 but max_datagram_frame_size TP not set.",
2872                        )?;
2873
2874                        return Err(Error::SettingsError);
2875                    }
2876                }
2877            },
2878
2879            frame::Frame::Headers { header_block } => {
2880                if Some(stream_id) == self.peer_control_stream_id {
2881                    conn.close(
2882                        true,
2883                        Error::FrameUnexpected.to_wire(),
2884                        b"HEADERS received on control stream",
2885                    )?;
2886
2887                    return Err(Error::FrameUnexpected);
2888                }
2889
2890                // Servers reject too many HEADERS frames.
2891                if let Some(s) = self.streams.get_mut(&stream_id) {
2892                    if self.is_server && s.headers_received_count() == 2 {
2893                        conn.close(
2894                            true,
2895                            Error::FrameUnexpected.to_wire(),
2896                            b"Too many HEADERS frames",
2897                        )?;
2898                        return Err(Error::FrameUnexpected);
2899                    }
2900
2901                    s.increment_headers_received();
2902                }
2903
2904                // Use "infinite" as default value for max_field_section_size if
2905                // it is not configured by the application.
2906                let max_size = self
2907                    .local_settings
2908                    .max_field_section_size
2909                    .unwrap_or(u64::MAX);
2910
2911                let headers = match self
2912                    .qpack_decoder
2913                    .decode(&header_block[..], max_size)
2914                {
2915                    Ok(v) => v,
2916
2917                    Err(e) => {
2918                        let e = match e {
2919                            qpack::Error::HeaderListTooLarge =>
2920                                Error::ExcessiveLoad,
2921
2922                            _ => Error::QpackDecompressionFailed,
2923                        };
2924
2925                        conn.close(true, e.to_wire(), b"Error parsing headers.")?;
2926
2927                        return Err(e);
2928                    },
2929                };
2930
2931                qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
2932                    let qlog_headers = headers
2933                        .iter()
2934                        .map(|h| qlog::events::h3::HttpHeader {
2935                            name: String::from_utf8_lossy(h.name()).into_owned(),
2936                            value: String::from_utf8_lossy(h.value())
2937                                .into_owned(),
2938                        })
2939                        .collect();
2940
2941                    let frame = Http3Frame::Headers {
2942                        headers: qlog_headers,
2943                    };
2944
2945                    let ev_data = EventData::H3FrameParsed(H3FrameParsed {
2946                        stream_id,
2947                        length: Some(payload_len),
2948                        frame,
2949                        ..Default::default()
2950                    });
2951
2952                    q.add_event_data_now(ev_data).ok();
2953                });
2954
2955                let more_frames = !conn.stream_finished(stream_id);
2956
2957                return Ok((stream_id, Event::Headers {
2958                    list: headers,
2959                    more_frames,
2960                }));
2961            },
2962
2963            frame::Frame::Data { .. } => {
2964                if Some(stream_id) == self.peer_control_stream_id {
2965                    conn.close(
2966                        true,
2967                        Error::FrameUnexpected.to_wire(),
2968                        b"DATA received on control stream",
2969                    )?;
2970
2971                    return Err(Error::FrameUnexpected);
2972                }
2973
2974                // Do nothing. The Data event is returned separately.
2975            },
2976
2977            frame::Frame::GoAway { id } => {
2978                if Some(stream_id) != self.peer_control_stream_id {
2979                    conn.close(
2980                        true,
2981                        Error::FrameUnexpected.to_wire(),
2982                        b"GOAWAY received on non-control stream",
2983                    )?;
2984
2985                    return Err(Error::FrameUnexpected);
2986                }
2987
2988                if !self.is_server && id % 4 != 0 {
2989                    conn.close(
2990                        true,
2991                        Error::FrameUnexpected.to_wire(),
2992                        b"GOAWAY received with ID of non-request stream",
2993                    )?;
2994
2995                    return Err(Error::IdError);
2996                }
2997
2998                if let Some(received_id) = self.peer_goaway_id {
2999                    if id > received_id {
3000                        conn.close(
3001                            true,
3002                            Error::IdError.to_wire(),
3003                            b"GOAWAY received with ID larger than previously received",
3004                        )?;
3005
3006                        return Err(Error::IdError);
3007                    }
3008                }
3009
3010                self.peer_goaway_id = Some(id);
3011
3012                return Ok((id, Event::GoAway));
3013            },
3014
3015            frame::Frame::MaxPushId { push_id } => {
3016                if Some(stream_id) != self.peer_control_stream_id {
3017                    conn.close(
3018                        true,
3019                        Error::FrameUnexpected.to_wire(),
3020                        b"MAX_PUSH_ID received on non-control stream",
3021                    )?;
3022
3023                    return Err(Error::FrameUnexpected);
3024                }
3025
3026                if !self.is_server {
3027                    conn.close(
3028                        true,
3029                        Error::FrameUnexpected.to_wire(),
3030                        b"MAX_PUSH_ID received by client",
3031                    )?;
3032
3033                    return Err(Error::FrameUnexpected);
3034                }
3035
3036                if push_id < self.max_push_id {
3037                    conn.close(
3038                        true,
3039                        Error::IdError.to_wire(),
3040                        b"MAX_PUSH_ID reduced limit",
3041                    )?;
3042
3043                    return Err(Error::IdError);
3044                }
3045
3046                self.max_push_id = push_id;
3047            },
3048
3049            frame::Frame::PushPromise { .. } => {
3050                if self.is_server {
3051                    conn.close(
3052                        true,
3053                        Error::FrameUnexpected.to_wire(),
3054                        b"PUSH_PROMISE received by server",
3055                    )?;
3056
3057                    return Err(Error::FrameUnexpected);
3058                }
3059
3060                if stream_id % 4 != 0 {
3061                    conn.close(
3062                        true,
3063                        Error::FrameUnexpected.to_wire(),
3064                        b"PUSH_PROMISE received on non-request stream",
3065                    )?;
3066
3067                    return Err(Error::FrameUnexpected);
3068                }
3069
3070                // TODO: implement more checks and PUSH_PROMISE event
3071            },
3072
3073            frame::Frame::CancelPush { .. } => {
3074                if Some(stream_id) != self.peer_control_stream_id {
3075                    conn.close(
3076                        true,
3077                        Error::FrameUnexpected.to_wire(),
3078                        b"CANCEL_PUSH received on non-control stream",
3079                    )?;
3080
3081                    return Err(Error::FrameUnexpected);
3082                }
3083
3084                // TODO: implement CANCEL_PUSH frame
3085            },
3086
3087            frame::Frame::PriorityUpdateRequest {
3088                prioritized_element_id,
3089                priority_field_value,
3090            } => {
3091                if !self.is_server {
3092                    conn.close(
3093                        true,
3094                        Error::FrameUnexpected.to_wire(),
3095                        b"PRIORITY_UPDATE received by client",
3096                    )?;
3097
3098                    return Err(Error::FrameUnexpected);
3099                }
3100
3101                if Some(stream_id) != self.peer_control_stream_id {
3102                    conn.close(
3103                        true,
3104                        Error::FrameUnexpected.to_wire(),
3105                        b"PRIORITY_UPDATE received on non-control stream",
3106                    )?;
3107
3108                    return Err(Error::FrameUnexpected);
3109                }
3110
3111                if prioritized_element_id % 4 != 0 {
3112                    conn.close(
3113                        true,
3114                        Error::FrameUnexpected.to_wire(),
3115                        b"PRIORITY_UPDATE for request stream type with wrong ID",
3116                    )?;
3117
3118                    return Err(Error::FrameUnexpected);
3119                }
3120
3121                if prioritized_element_id > conn.streams.max_streams_bidi() * 4 {
3122                    conn.close(
3123                        true,
3124                        Error::IdError.to_wire(),
3125                        b"PRIORITY_UPDATE for request stream beyond max streams limit",
3126                    )?;
3127
3128                    return Err(Error::IdError);
3129                }
3130
3131                // If the PRIORITY_UPDATE is valid, consider storing the latest
3132                // contents. Due to reordering, it is possible that we might
3133                // receive frames that reference streams that have not yet to
3134                // been opened and that's OK because it's within our concurrency
3135                // limit. However, we discard PRIORITY_UPDATE that refers to
3136                // streams that we know have been collected.
3137                if conn.streams.is_collected(prioritized_element_id) {
3138                    return Err(Error::Done);
3139                }
3140
3141                // If the stream did not yet exist, create it and store.
3142                let stream =
3143                    self.streams.entry(prioritized_element_id).or_insert_with(
3144                        || <stream::Stream>::new(prioritized_element_id, false),
3145                    );
3146
3147                let had_priority_update = stream.has_last_priority_update();
3148                stream.set_last_priority_update(Some(priority_field_value));
3149
3150                // Only trigger the event when there wasn't already a stored
3151                // PRIORITY_UPDATE.
3152                if !had_priority_update {
3153                    return Ok((prioritized_element_id, Event::PriorityUpdate));
3154                } else {
3155                    return Err(Error::Done);
3156                }
3157            },
3158
3159            frame::Frame::PriorityUpdatePush {
3160                prioritized_element_id,
3161                ..
3162            } => {
3163                if !self.is_server {
3164                    conn.close(
3165                        true,
3166                        Error::FrameUnexpected.to_wire(),
3167                        b"PRIORITY_UPDATE received by client",
3168                    )?;
3169
3170                    return Err(Error::FrameUnexpected);
3171                }
3172
3173                if Some(stream_id) != self.peer_control_stream_id {
3174                    conn.close(
3175                        true,
3176                        Error::FrameUnexpected.to_wire(),
3177                        b"PRIORITY_UPDATE received on non-control stream",
3178                    )?;
3179
3180                    return Err(Error::FrameUnexpected);
3181                }
3182
3183                if prioritized_element_id % 3 != 0 {
3184                    conn.close(
3185                        true,
3186                        Error::FrameUnexpected.to_wire(),
3187                        b"PRIORITY_UPDATE for push stream type with wrong ID",
3188                    )?;
3189
3190                    return Err(Error::FrameUnexpected);
3191                }
3192
3193                // TODO: we only implement this if we implement server push
3194            },
3195
3196            frame::Frame::Unknown { .. } => (),
3197        }
3198
3199        Err(Error::Done)
3200    }
3201
3202    /// Collects and returns statistics about the connection.
3203    #[inline]
3204    pub fn stats(&self) -> Stats {
3205        Stats {
3206            qpack_encoder_stream_recv_bytes: self
3207                .peer_qpack_streams
3208                .encoder_stream_bytes,
3209            qpack_decoder_stream_recv_bytes: self
3210                .peer_qpack_streams
3211                .decoder_stream_bytes,
3212        }
3213    }
3214}
3215
3216/// Generates an HTTP/3 GREASE variable length integer.
3217pub fn grease_value() -> u64 {
3218    let n = super::rand::rand_u64_uniform(148_764_065_110_560_899);
3219    31 * n + 33
3220}
3221
3222#[doc(hidden)]
3223pub mod testing {
3224    use super::*;
3225
3226    use crate::testing;
3227
3228    /// Session is an HTTP/3 test helper structure. It holds a client, server
3229    /// and pipe that allows them to communicate.
3230    ///
3231    /// `default()` creates a session with some sensible default
3232    /// configuration. `with_configs()` allows for providing a specific
3233    /// configuration.
3234    ///
3235    /// `handshake()` performs all the steps needed to establish an HTTP/3
3236    /// connection.
3237    ///
3238    /// Some utility functions are provided that make it less verbose to send
3239    /// request, responses and individual headers. The full quiche API remains
3240    /// available for any test that need to do unconventional things (such as
3241    /// bad behaviour that triggers errors).
3242    pub struct Session {
3243        pub pipe: testing::Pipe,
3244        pub client: Connection,
3245        pub server: Connection,
3246    }
3247
3248    impl Session {
3249        pub fn new() -> Result<Session> {
3250            fn path_relative_to_manifest_dir(path: &str) -> String {
3251                std::fs::canonicalize(
3252                    std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join(path),
3253                )
3254                .unwrap()
3255                .to_string_lossy()
3256                .into_owned()
3257            }
3258
3259            let mut config = crate::Config::new(crate::PROTOCOL_VERSION)?;
3260            config.load_cert_chain_from_pem_file(
3261                &path_relative_to_manifest_dir("examples/cert.crt"),
3262            )?;
3263            config.load_priv_key_from_pem_file(
3264                &path_relative_to_manifest_dir("examples/cert.key"),
3265            )?;
3266            config.set_application_protos(&[b"h3"])?;
3267            config.set_initial_max_data(1500);
3268            config.set_initial_max_stream_data_bidi_local(150);
3269            config.set_initial_max_stream_data_bidi_remote(150);
3270            config.set_initial_max_stream_data_uni(150);
3271            config.set_initial_max_streams_bidi(5);
3272            config.set_initial_max_streams_uni(5);
3273            config.verify_peer(false);
3274            config.enable_dgram(true, 3, 3);
3275            config.set_ack_delay_exponent(8);
3276
3277            let h3_config = Config::new()?;
3278            Session::with_configs(&mut config, &h3_config)
3279        }
3280
3281        pub fn with_configs(
3282            config: &mut crate::Config, h3_config: &Config,
3283        ) -> Result<Session> {
3284            let pipe = testing::Pipe::with_config(config)?;
3285            let client_dgram = pipe.client.dgram_enabled();
3286            let server_dgram = pipe.server.dgram_enabled();
3287            Ok(Session {
3288                pipe,
3289                client: Connection::new(h3_config, false, client_dgram)?,
3290                server: Connection::new(h3_config, true, server_dgram)?,
3291            })
3292        }
3293
3294        /// Do the HTTP/3 handshake so both ends are in sane initial state.
3295        pub fn handshake(&mut self) -> Result<()> {
3296            self.pipe.handshake()?;
3297
3298            // Client streams.
3299            self.client.send_settings(&mut self.pipe.client)?;
3300            self.pipe.advance().ok();
3301
3302            self.client
3303                .open_qpack_encoder_stream(&mut self.pipe.client)?;
3304            self.pipe.advance().ok();
3305
3306            self.client
3307                .open_qpack_decoder_stream(&mut self.pipe.client)?;
3308            self.pipe.advance().ok();
3309
3310            if self.pipe.client.grease {
3311                self.client.open_grease_stream(&mut self.pipe.client)?;
3312            }
3313
3314            self.pipe.advance().ok();
3315
3316            // Server streams.
3317            self.server.send_settings(&mut self.pipe.server)?;
3318            self.pipe.advance().ok();
3319
3320            self.server
3321                .open_qpack_encoder_stream(&mut self.pipe.server)?;
3322            self.pipe.advance().ok();
3323
3324            self.server
3325                .open_qpack_decoder_stream(&mut self.pipe.server)?;
3326            self.pipe.advance().ok();
3327
3328            if self.pipe.server.grease {
3329                self.server.open_grease_stream(&mut self.pipe.server)?;
3330            }
3331
3332            self.advance().ok();
3333
3334            while self.client.poll(&mut self.pipe.client).is_ok() {
3335                // Do nothing.
3336            }
3337
3338            while self.server.poll(&mut self.pipe.server).is_ok() {
3339                // Do nothing.
3340            }
3341
3342            Ok(())
3343        }
3344
3345        /// Advances the session pipe over the buffer.
3346        pub fn advance(&mut self) -> crate::Result<()> {
3347            self.pipe.advance()
3348        }
3349
3350        /// Polls the client for events.
3351        pub fn poll_client(&mut self) -> Result<(u64, Event)> {
3352            self.client.poll(&mut self.pipe.client)
3353        }
3354
3355        /// Polls the server for events.
3356        pub fn poll_server(&mut self) -> Result<(u64, Event)> {
3357            self.server.poll(&mut self.pipe.server)
3358        }
3359
3360        /// Sends a request from client with default headers.
3361        ///
3362        /// On success it returns the newly allocated stream and the headers.
3363        pub fn send_request(&mut self, fin: bool) -> Result<(u64, Vec<Header>)> {
3364            let req = vec![
3365                Header::new(b":method", b"GET"),
3366                Header::new(b":scheme", b"https"),
3367                Header::new(b":authority", b"quic.tech"),
3368                Header::new(b":path", b"/test"),
3369                Header::new(b"user-agent", b"quiche-test"),
3370            ];
3371
3372            let stream =
3373                self.client.send_request(&mut self.pipe.client, &req, fin)?;
3374
3375            self.advance().ok();
3376
3377            Ok((stream, req))
3378        }
3379
3380        /// Sends a response from server with default headers.
3381        ///
3382        /// On success it returns the headers.
3383        pub fn send_response(
3384            &mut self, stream: u64, fin: bool,
3385        ) -> Result<Vec<Header>> {
3386            let resp = vec![
3387                Header::new(b":status", b"200"),
3388                Header::new(b"server", b"quiche-test"),
3389            ];
3390
3391            self.server.send_response(
3392                &mut self.pipe.server,
3393                stream,
3394                &resp,
3395                fin,
3396            )?;
3397
3398            self.advance().ok();
3399
3400            Ok(resp)
3401        }
3402
3403        /// Sends some default payload from client.
3404        ///
3405        /// On success it returns the payload.
3406        pub fn send_body_client(
3407            &mut self, stream: u64, fin: bool,
3408        ) -> Result<Vec<u8>> {
3409            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3410
3411            self.client
3412                .send_body(&mut self.pipe.client, stream, &bytes, fin)?;
3413
3414            self.advance().ok();
3415
3416            Ok(bytes)
3417        }
3418
3419        /// Fetches DATA payload from the server.
3420        ///
3421        /// On success it returns the number of bytes received.
3422        pub fn recv_body_client(
3423            &mut self, stream: u64, buf: &mut [u8],
3424        ) -> Result<usize> {
3425            self.client.recv_body(&mut self.pipe.client, stream, buf)
3426        }
3427
3428        /// Sends some default payload from server.
3429        ///
3430        /// On success it returns the payload.
3431        pub fn send_body_server(
3432            &mut self, stream: u64, fin: bool,
3433        ) -> Result<Vec<u8>> {
3434            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3435
3436            self.server
3437                .send_body(&mut self.pipe.server, stream, &bytes, fin)?;
3438
3439            self.advance().ok();
3440
3441            Ok(bytes)
3442        }
3443
3444        /// Fetches DATA payload from the client.
3445        ///
3446        /// On success it returns the number of bytes received.
3447        pub fn recv_body_server(
3448            &mut self, stream: u64, buf: &mut [u8],
3449        ) -> Result<usize> {
3450            self.server.recv_body(&mut self.pipe.server, stream, buf)
3451        }
3452
3453        /// Sends a single HTTP/3 frame from the client.
3454        pub fn send_frame_client(
3455            &mut self, frame: frame::Frame, stream_id: u64, fin: bool,
3456        ) -> Result<()> {
3457            let mut d = [42; 65535];
3458
3459            let mut b = octets::OctetsMut::with_slice(&mut d);
3460
3461            frame.to_bytes(&mut b)?;
3462
3463            let off = b.off();
3464            self.pipe.client.stream_send(stream_id, &d[..off], fin)?;
3465
3466            self.advance().ok();
3467
3468            Ok(())
3469        }
3470
3471        /// Send an HTTP/3 DATAGRAM with default data from the client.
3472        ///
3473        /// On success it returns the data.
3474        pub fn send_dgram_client(&mut self, flow_id: u64) -> Result<Vec<u8>> {
3475            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3476            let len = octets::varint_len(flow_id) + bytes.len();
3477            let mut d = vec![0; len];
3478            let mut b = octets::OctetsMut::with_slice(&mut d);
3479
3480            b.put_varint(flow_id)?;
3481            b.put_bytes(&bytes)?;
3482
3483            self.pipe.client.dgram_send(&d)?;
3484
3485            self.advance().ok();
3486
3487            Ok(bytes)
3488        }
3489
3490        /// Receives an HTTP/3 DATAGRAM from the server.
3491        ///
3492        /// On success it returns the DATAGRAM length, flow ID and flow ID
3493        /// length.
3494        pub fn recv_dgram_client(
3495            &mut self, buf: &mut [u8],
3496        ) -> Result<(usize, u64, usize)> {
3497            let len = self.pipe.client.dgram_recv(buf)?;
3498            let mut b = octets::Octets::with_slice(buf);
3499            let flow_id = b.get_varint()?;
3500
3501            Ok((len, flow_id, b.off()))
3502        }
3503
3504        /// Send an HTTP/3 DATAGRAM with default data from the server
3505        ///
3506        /// On success it returns the data.
3507        pub fn send_dgram_server(&mut self, flow_id: u64) -> Result<Vec<u8>> {
3508            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3509            let len = octets::varint_len(flow_id) + bytes.len();
3510            let mut d = vec![0; len];
3511            let mut b = octets::OctetsMut::with_slice(&mut d);
3512
3513            b.put_varint(flow_id)?;
3514            b.put_bytes(&bytes)?;
3515
3516            self.pipe.server.dgram_send(&d)?;
3517
3518            self.advance().ok();
3519
3520            Ok(bytes)
3521        }
3522
3523        /// Receives an HTTP/3 DATAGRAM from the client.
3524        ///
3525        /// On success it returns the DATAGRAM length, flow ID and flow ID
3526        /// length.
3527        pub fn recv_dgram_server(
3528            &mut self, buf: &mut [u8],
3529        ) -> Result<(usize, u64, usize)> {
3530            let len = self.pipe.server.dgram_recv(buf)?;
3531            let mut b = octets::Octets::with_slice(buf);
3532            let flow_id = b.get_varint()?;
3533
3534            Ok((len, flow_id, b.off()))
3535        }
3536
3537        /// Sends a single HTTP/3 frame from the server.
3538        pub fn send_frame_server(
3539            &mut self, frame: frame::Frame, stream_id: u64, fin: bool,
3540        ) -> Result<()> {
3541            let mut d = [42; 65535];
3542
3543            let mut b = octets::OctetsMut::with_slice(&mut d);
3544
3545            frame.to_bytes(&mut b)?;
3546
3547            let off = b.off();
3548            self.pipe.server.stream_send(stream_id, &d[..off], fin)?;
3549
3550            self.advance().ok();
3551
3552            Ok(())
3553        }
3554
3555        /// Sends an arbitrary buffer of HTTP/3 stream data from the client.
3556        pub fn send_arbitrary_stream_data_client(
3557            &mut self, data: &[u8], stream_id: u64, fin: bool,
3558        ) -> Result<()> {
3559            self.pipe.client.stream_send(stream_id, data, fin)?;
3560
3561            self.advance().ok();
3562
3563            Ok(())
3564        }
3565
3566        /// Sends an arbitrary buffer of HTTP/3 stream data from the server.
3567        pub fn send_arbitrary_stream_data_server(
3568            &mut self, data: &[u8], stream_id: u64, fin: bool,
3569        ) -> Result<()> {
3570            self.pipe.server.stream_send(stream_id, data, fin)?;
3571
3572            self.advance().ok();
3573
3574            Ok(())
3575        }
3576    }
3577}
3578
3579#[cfg(test)]
3580mod tests {
3581    use super::*;
3582
3583    use super::testing::*;
3584
3585    #[test]
3586    /// Make sure that random GREASE values is within the specified limit.
3587    fn grease_value_in_varint_limit() {
3588        assert!(grease_value() < 2u64.pow(62) - 1);
3589    }
3590
3591    #[cfg(not(feature = "openssl"))] // 0-RTT not supported when using openssl/quictls
3592    #[test]
3593    fn h3_handshake_0rtt() {
3594        let mut buf = [0; 65535];
3595
3596        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
3597        config
3598            .load_cert_chain_from_pem_file("examples/cert.crt")
3599            .unwrap();
3600        config
3601            .load_priv_key_from_pem_file("examples/cert.key")
3602            .unwrap();
3603        config
3604            .set_application_protos(&[b"proto1", b"proto2"])
3605            .unwrap();
3606        config.set_initial_max_data(30);
3607        config.set_initial_max_stream_data_bidi_local(15);
3608        config.set_initial_max_stream_data_bidi_remote(15);
3609        config.set_initial_max_stream_data_uni(15);
3610        config.set_initial_max_streams_bidi(3);
3611        config.set_initial_max_streams_uni(3);
3612        config.enable_early_data();
3613        config.verify_peer(false);
3614
3615        let h3_config = Config::new().unwrap();
3616
3617        // Perform initial handshake.
3618        let mut pipe = crate::testing::Pipe::with_config(&mut config).unwrap();
3619        assert_eq!(pipe.handshake(), Ok(()));
3620
3621        // Extract session,
3622        let session = pipe.client.session().unwrap();
3623
3624        // Configure session on new connection.
3625        let mut pipe = crate::testing::Pipe::with_config(&mut config).unwrap();
3626        assert_eq!(pipe.client.set_session(session), Ok(()));
3627
3628        // Can't create an H3 connection until the QUIC connection is determined
3629        // to have made sufficient early data progress.
3630        assert!(matches!(
3631            Connection::with_transport(&mut pipe.client, &h3_config),
3632            Err(Error::InternalError)
3633        ));
3634
3635        // Client sends initial flight.
3636        let (len, _) = pipe.client.send(&mut buf).unwrap();
3637
3638        // Now an H3 connection can be created.
3639        assert!(Connection::with_transport(&mut pipe.client, &h3_config).is_ok());
3640        assert_eq!(pipe.server_recv(&mut buf[..len]), Ok(len));
3641
3642        // Client sends 0-RTT packet.
3643        let pkt_type = crate::packet::Type::ZeroRTT;
3644
3645        let frames = [crate::frame::Frame::Stream {
3646            stream_id: 6,
3647            data: <crate::range_buf::RangeBuf>::from(b"aaaaa", 0, true),
3648        }];
3649
3650        assert_eq!(
3651            pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
3652            Ok(1200)
3653        );
3654
3655        assert_eq!(pipe.server.undecryptable_pkts.len(), 0);
3656
3657        // 0-RTT stream data is readable.
3658        let mut r = pipe.server.readable();
3659        assert_eq!(r.next(), Some(6));
3660        assert_eq!(r.next(), None);
3661
3662        let mut b = [0; 15];
3663        assert_eq!(pipe.server.stream_recv(6, &mut b), Ok((5, true)));
3664        assert_eq!(&b[..5], b"aaaaa");
3665    }
3666
3667    #[test]
3668    /// Send a request with no body, get a response with no body.
3669    fn request_no_body_response_no_body() {
3670        let mut s = Session::new().unwrap();
3671        s.handshake().unwrap();
3672
3673        let (stream, req) = s.send_request(true).unwrap();
3674
3675        assert_eq!(stream, 0);
3676
3677        let ev_headers = Event::Headers {
3678            list: req,
3679            more_frames: false,
3680        };
3681
3682        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3683        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3684
3685        let resp = s.send_response(stream, true).unwrap();
3686
3687        let ev_headers = Event::Headers {
3688            list: resp,
3689            more_frames: false,
3690        };
3691
3692        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3693        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3694        assert_eq!(s.poll_client(), Err(Error::Done));
3695    }
3696
3697    #[test]
3698    /// Send a request with no body, get a response with one DATA frame.
3699    fn request_no_body_response_one_chunk() {
3700        let mut s = Session::new().unwrap();
3701        s.handshake().unwrap();
3702
3703        let (stream, req) = s.send_request(true).unwrap();
3704        assert_eq!(stream, 0);
3705
3706        let ev_headers = Event::Headers {
3707            list: req,
3708            more_frames: false,
3709        };
3710
3711        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3712
3713        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3714
3715        let resp = s.send_response(stream, false).unwrap();
3716
3717        let body = s.send_body_server(stream, true).unwrap();
3718
3719        let mut recv_buf = vec![0; body.len()];
3720
3721        let ev_headers = Event::Headers {
3722            list: resp,
3723            more_frames: true,
3724        };
3725
3726        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3727
3728        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
3729        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
3730
3731        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3732        assert_eq!(s.poll_client(), Err(Error::Done));
3733    }
3734
3735    #[test]
3736    /// Send a request with no body, get a response with multiple DATA frames.
3737    fn request_no_body_response_many_chunks() {
3738        let mut s = Session::new().unwrap();
3739        s.handshake().unwrap();
3740
3741        let (stream, req) = s.send_request(true).unwrap();
3742
3743        let ev_headers = Event::Headers {
3744            list: req,
3745            more_frames: false,
3746        };
3747
3748        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3749        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3750
3751        let total_data_frames = 4;
3752
3753        let resp = s.send_response(stream, false).unwrap();
3754
3755        for _ in 0..total_data_frames - 1 {
3756            s.send_body_server(stream, false).unwrap();
3757        }
3758
3759        let body = s.send_body_server(stream, true).unwrap();
3760
3761        let mut recv_buf = vec![0; body.len()];
3762
3763        let ev_headers = Event::Headers {
3764            list: resp,
3765            more_frames: true,
3766        };
3767
3768        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3769        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
3770        assert_eq!(s.poll_client(), Err(Error::Done));
3771
3772        for _ in 0..total_data_frames {
3773            assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
3774        }
3775
3776        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3777        assert_eq!(s.poll_client(), Err(Error::Done));
3778    }
3779
3780    #[test]
3781    /// Send a request with one DATA frame, get a response with no body.
3782    fn request_one_chunk_response_no_body() {
3783        let mut s = Session::new().unwrap();
3784        s.handshake().unwrap();
3785
3786        let (stream, req) = s.send_request(false).unwrap();
3787
3788        let body = s.send_body_client(stream, true).unwrap();
3789
3790        let mut recv_buf = vec![0; body.len()];
3791
3792        let ev_headers = Event::Headers {
3793            list: req,
3794            more_frames: true,
3795        };
3796
3797        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3798
3799        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
3800        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
3801
3802        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3803
3804        let resp = s.send_response(stream, true).unwrap();
3805
3806        let ev_headers = Event::Headers {
3807            list: resp,
3808            more_frames: false,
3809        };
3810
3811        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3812        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3813    }
3814
3815    #[test]
3816    /// Send a request with multiple DATA frames, get a response with no body.
3817    fn request_many_chunks_response_no_body() {
3818        let mut s = Session::new().unwrap();
3819        s.handshake().unwrap();
3820
3821        let (stream, req) = s.send_request(false).unwrap();
3822
3823        let total_data_frames = 4;
3824
3825        for _ in 0..total_data_frames - 1 {
3826            s.send_body_client(stream, false).unwrap();
3827        }
3828
3829        let body = s.send_body_client(stream, true).unwrap();
3830
3831        let mut recv_buf = vec![0; body.len()];
3832
3833        let ev_headers = Event::Headers {
3834            list: req,
3835            more_frames: true,
3836        };
3837
3838        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3839        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
3840        assert_eq!(s.poll_server(), Err(Error::Done));
3841
3842        for _ in 0..total_data_frames {
3843            assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
3844        }
3845
3846        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3847
3848        let resp = s.send_response(stream, true).unwrap();
3849
3850        let ev_headers = Event::Headers {
3851            list: resp,
3852            more_frames: false,
3853        };
3854
3855        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3856        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3857    }
3858
3859    #[test]
3860    /// Send a request with multiple DATA frames, get a response with one DATA
3861    /// frame.
3862    fn many_requests_many_chunks_response_one_chunk() {
3863        let mut s = Session::new().unwrap();
3864        s.handshake().unwrap();
3865
3866        let mut reqs = Vec::new();
3867
3868        let (stream1, req1) = s.send_request(false).unwrap();
3869        assert_eq!(stream1, 0);
3870        reqs.push(req1);
3871
3872        let (stream2, req2) = s.send_request(false).unwrap();
3873        assert_eq!(stream2, 4);
3874        reqs.push(req2);
3875
3876        let (stream3, req3) = s.send_request(false).unwrap();
3877        assert_eq!(stream3, 8);
3878        reqs.push(req3);
3879
3880        let body = s.send_body_client(stream1, false).unwrap();
3881        s.send_body_client(stream2, false).unwrap();
3882        s.send_body_client(stream3, false).unwrap();
3883
3884        let mut recv_buf = vec![0; body.len()];
3885
3886        // Reverse order of writes.
3887
3888        s.send_body_client(stream3, true).unwrap();
3889        s.send_body_client(stream2, true).unwrap();
3890        s.send_body_client(stream1, true).unwrap();
3891
3892        let (_, ev) = s.poll_server().unwrap();
3893        let ev_headers = Event::Headers {
3894            list: reqs[0].clone(),
3895            more_frames: true,
3896        };
3897        assert_eq!(ev, ev_headers);
3898
3899        let (_, ev) = s.poll_server().unwrap();
3900        let ev_headers = Event::Headers {
3901            list: reqs[1].clone(),
3902            more_frames: true,
3903        };
3904        assert_eq!(ev, ev_headers);
3905
3906        let (_, ev) = s.poll_server().unwrap();
3907        let ev_headers = Event::Headers {
3908            list: reqs[2].clone(),
3909            more_frames: true,
3910        };
3911        assert_eq!(ev, ev_headers);
3912
3913        assert_eq!(s.poll_server(), Ok((0, Event::Data)));
3914        assert_eq!(s.recv_body_server(0, &mut recv_buf), Ok(body.len()));
3915        assert_eq!(s.poll_client(), Err(Error::Done));
3916        assert_eq!(s.recv_body_server(0, &mut recv_buf), Ok(body.len()));
3917        assert_eq!(s.poll_server(), Ok((0, Event::Finished)));
3918
3919        assert_eq!(s.poll_server(), Ok((4, Event::Data)));
3920        assert_eq!(s.recv_body_server(4, &mut recv_buf), Ok(body.len()));
3921        assert_eq!(s.poll_client(), Err(Error::Done));
3922        assert_eq!(s.recv_body_server(4, &mut recv_buf), Ok(body.len()));
3923        assert_eq!(s.poll_server(), Ok((4, Event::Finished)));
3924
3925        assert_eq!(s.poll_server(), Ok((8, Event::Data)));
3926        assert_eq!(s.recv_body_server(8, &mut recv_buf), Ok(body.len()));
3927        assert_eq!(s.poll_client(), Err(Error::Done));
3928        assert_eq!(s.recv_body_server(8, &mut recv_buf), Ok(body.len()));
3929        assert_eq!(s.poll_server(), Ok((8, Event::Finished)));
3930
3931        assert_eq!(s.poll_server(), Err(Error::Done));
3932
3933        let mut resps = Vec::new();
3934
3935        let resp1 = s.send_response(stream1, true).unwrap();
3936        resps.push(resp1);
3937
3938        let resp2 = s.send_response(stream2, true).unwrap();
3939        resps.push(resp2);
3940
3941        let resp3 = s.send_response(stream3, true).unwrap();
3942        resps.push(resp3);
3943
3944        for _ in 0..resps.len() {
3945            let (stream, ev) = s.poll_client().unwrap();
3946            let ev_headers = Event::Headers {
3947                list: resps[(stream / 4) as usize].clone(),
3948                more_frames: false,
3949            };
3950            assert_eq!(ev, ev_headers);
3951            assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3952        }
3953
3954        assert_eq!(s.poll_client(), Err(Error::Done));
3955    }
3956
3957    #[test]
3958    /// Send a request with no body, get a response with one DATA frame and an
3959    /// empty FIN after reception from the client.
3960    fn request_no_body_response_one_chunk_empty_fin() {
3961        let mut s = Session::new().unwrap();
3962        s.handshake().unwrap();
3963
3964        let (stream, req) = s.send_request(true).unwrap();
3965
3966        let ev_headers = Event::Headers {
3967            list: req,
3968            more_frames: false,
3969        };
3970
3971        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3972        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3973
3974        let resp = s.send_response(stream, false).unwrap();
3975
3976        let body = s.send_body_server(stream, false).unwrap();
3977
3978        let mut recv_buf = vec![0; body.len()];
3979
3980        let ev_headers = Event::Headers {
3981            list: resp,
3982            more_frames: true,
3983        };
3984
3985        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3986
3987        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
3988        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
3989
3990        assert_eq!(s.pipe.server.stream_send(stream, &[], true), Ok(0));
3991        s.advance().ok();
3992
3993        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3994        assert_eq!(s.poll_client(), Err(Error::Done));
3995    }
3996
3997    #[test]
3998    /// Send a request with no body, get a response with no body followed by
3999    /// GREASE that is STREAM frame with a FIN.
4000    fn request_no_body_response_no_body_with_grease() {
4001        let mut s = Session::new().unwrap();
4002        s.handshake().unwrap();
4003
4004        let (stream, req) = s.send_request(true).unwrap();
4005
4006        assert_eq!(stream, 0);
4007
4008        let ev_headers = Event::Headers {
4009            list: req,
4010            more_frames: false,
4011        };
4012
4013        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4014        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4015
4016        let resp = s.send_response(stream, false).unwrap();
4017
4018        let ev_headers = Event::Headers {
4019            list: resp,
4020            more_frames: true,
4021        };
4022
4023        // Inject a GREASE frame
4024        let mut d = [42; 10];
4025        let mut b = octets::OctetsMut::with_slice(&mut d);
4026
4027        let frame_type = b.put_varint(148_764_065_110_560_899).unwrap();
4028        s.pipe.server.stream_send(0, frame_type, false).unwrap();
4029
4030        let frame_len = b.put_varint(10).unwrap();
4031        s.pipe.server.stream_send(0, frame_len, false).unwrap();
4032
4033        s.pipe.server.stream_send(0, &d, true).unwrap();
4034
4035        s.advance().ok();
4036
4037        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
4038        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
4039        assert_eq!(s.poll_client(), Err(Error::Done));
4040    }
4041
4042    #[test]
4043    /// Try to send DATA frames before HEADERS.
4044    fn body_response_before_headers() {
4045        let mut s = Session::new().unwrap();
4046        s.handshake().unwrap();
4047
4048        let (stream, req) = s.send_request(true).unwrap();
4049        assert_eq!(stream, 0);
4050
4051        let ev_headers = Event::Headers {
4052            list: req,
4053            more_frames: false,
4054        };
4055
4056        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4057
4058        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4059
4060        assert_eq!(
4061            s.send_body_server(stream, true),
4062            Err(Error::FrameUnexpected)
4063        );
4064
4065        assert_eq!(s.poll_client(), Err(Error::Done));
4066    }
4067
4068    #[test]
4069    /// Try to send DATA frames on wrong streams, ensure the API returns an
4070    /// error before anything hits the transport layer.
4071    fn send_body_invalid_client_stream() {
4072        let mut s = Session::new().unwrap();
4073        s.handshake().unwrap();
4074
4075        assert_eq!(s.send_body_client(0, true), Err(Error::FrameUnexpected));
4076
4077        assert_eq!(
4078            s.send_body_client(s.client.control_stream_id.unwrap(), true),
4079            Err(Error::FrameUnexpected)
4080        );
4081
4082        assert_eq!(
4083            s.send_body_client(
4084                s.client.local_qpack_streams.encoder_stream_id.unwrap(),
4085                true
4086            ),
4087            Err(Error::FrameUnexpected)
4088        );
4089
4090        assert_eq!(
4091            s.send_body_client(
4092                s.client.local_qpack_streams.decoder_stream_id.unwrap(),
4093                true
4094            ),
4095            Err(Error::FrameUnexpected)
4096        );
4097
4098        assert_eq!(
4099            s.send_body_client(s.client.peer_control_stream_id.unwrap(), true),
4100            Err(Error::FrameUnexpected)
4101        );
4102
4103        assert_eq!(
4104            s.send_body_client(
4105                s.client.peer_qpack_streams.encoder_stream_id.unwrap(),
4106                true
4107            ),
4108            Err(Error::FrameUnexpected)
4109        );
4110
4111        assert_eq!(
4112            s.send_body_client(
4113                s.client.peer_qpack_streams.decoder_stream_id.unwrap(),
4114                true
4115            ),
4116            Err(Error::FrameUnexpected)
4117        );
4118    }
4119
4120    #[test]
4121    /// Try to send DATA frames on wrong streams, ensure the API returns an
4122    /// error before anything hits the transport layer.
4123    fn send_body_invalid_server_stream() {
4124        let mut s = Session::new().unwrap();
4125        s.handshake().unwrap();
4126
4127        assert_eq!(s.send_body_server(0, true), Err(Error::FrameUnexpected));
4128
4129        assert_eq!(
4130            s.send_body_server(s.server.control_stream_id.unwrap(), true),
4131            Err(Error::FrameUnexpected)
4132        );
4133
4134        assert_eq!(
4135            s.send_body_server(
4136                s.server.local_qpack_streams.encoder_stream_id.unwrap(),
4137                true
4138            ),
4139            Err(Error::FrameUnexpected)
4140        );
4141
4142        assert_eq!(
4143            s.send_body_server(
4144                s.server.local_qpack_streams.decoder_stream_id.unwrap(),
4145                true
4146            ),
4147            Err(Error::FrameUnexpected)
4148        );
4149
4150        assert_eq!(
4151            s.send_body_server(s.server.peer_control_stream_id.unwrap(), true),
4152            Err(Error::FrameUnexpected)
4153        );
4154
4155        assert_eq!(
4156            s.send_body_server(
4157                s.server.peer_qpack_streams.encoder_stream_id.unwrap(),
4158                true
4159            ),
4160            Err(Error::FrameUnexpected)
4161        );
4162
4163        assert_eq!(
4164            s.send_body_server(
4165                s.server.peer_qpack_streams.decoder_stream_id.unwrap(),
4166                true
4167            ),
4168            Err(Error::FrameUnexpected)
4169        );
4170    }
4171
4172    #[test]
4173    /// Client sends request with body and trailers.
4174    fn trailers() {
4175        let mut s = Session::new().unwrap();
4176        s.handshake().unwrap();
4177
4178        let (stream, req) = s.send_request(false).unwrap();
4179
4180        let body = s.send_body_client(stream, false).unwrap();
4181
4182        let mut recv_buf = vec![0; body.len()];
4183
4184        let req_trailers = vec![Header::new(b"foo", b"bar")];
4185
4186        s.client
4187            .send_additional_headers(
4188                &mut s.pipe.client,
4189                stream,
4190                &req_trailers,
4191                true,
4192                true,
4193            )
4194            .unwrap();
4195
4196        s.advance().ok();
4197
4198        let ev_headers = Event::Headers {
4199            list: req,
4200            more_frames: true,
4201        };
4202
4203        let ev_trailers = Event::Headers {
4204            list: req_trailers,
4205            more_frames: false,
4206        };
4207
4208        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4209
4210        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
4211        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
4212
4213        assert_eq!(s.poll_server(), Ok((stream, ev_trailers)));
4214        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4215    }
4216
4217    #[test]
4218    /// Server responds with a 103, then a 200 with no body.
4219    fn informational_response() {
4220        let mut s = Session::new().unwrap();
4221        s.handshake().unwrap();
4222
4223        let (stream, req) = s.send_request(true).unwrap();
4224
4225        assert_eq!(stream, 0);
4226
4227        let ev_headers = Event::Headers {
4228            list: req,
4229            more_frames: false,
4230        };
4231
4232        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4233        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4234
4235        let info_resp = vec![
4236            Header::new(b":status", b"103"),
4237            Header::new(b"link", b"<https://example.com>; rel=\"preconnect\""),
4238        ];
4239
4240        let resp = vec![
4241            Header::new(b":status", b"200"),
4242            Header::new(b"server", b"quiche-test"),
4243        ];
4244
4245        s.server
4246            .send_response(&mut s.pipe.server, stream, &info_resp, false)
4247            .unwrap();
4248
4249        s.server
4250            .send_additional_headers(
4251                &mut s.pipe.server,
4252                stream,
4253                &resp,
4254                false,
4255                true,
4256            )
4257            .unwrap();
4258
4259        s.advance().ok();
4260
4261        let ev_info_headers = Event::Headers {
4262            list: info_resp,
4263            more_frames: true,
4264        };
4265
4266        let ev_headers = Event::Headers {
4267            list: resp,
4268            more_frames: false,
4269        };
4270
4271        assert_eq!(s.poll_client(), Ok((stream, ev_info_headers)));
4272        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
4273        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
4274        assert_eq!(s.poll_client(), Err(Error::Done));
4275    }
4276
4277    #[test]
4278    /// Server responds with a 103, then attempts to send a 200 using
4279    /// send_response again, which should fail.
4280    fn no_multiple_response() {
4281        let mut s = Session::new().unwrap();
4282        s.handshake().unwrap();
4283
4284        let (stream, req) = s.send_request(true).unwrap();
4285
4286        assert_eq!(stream, 0);
4287
4288        let ev_headers = Event::Headers {
4289            list: req,
4290            more_frames: false,
4291        };
4292
4293        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4294        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4295
4296        let info_resp = vec![
4297            Header::new(b":status", b"103"),
4298            Header::new(b"link", b"<https://example.com>; rel=\"preconnect\""),
4299        ];
4300
4301        let resp = vec![
4302            Header::new(b":status", b"200"),
4303            Header::new(b"server", b"quiche-test"),
4304        ];
4305
4306        s.server
4307            .send_response(&mut s.pipe.server, stream, &info_resp, false)
4308            .unwrap();
4309
4310        assert_eq!(
4311            Err(Error::FrameUnexpected),
4312            s.server
4313                .send_response(&mut s.pipe.server, stream, &resp, true)
4314        );
4315
4316        s.advance().ok();
4317
4318        let ev_info_headers = Event::Headers {
4319            list: info_resp,
4320            more_frames: true,
4321        };
4322
4323        assert_eq!(s.poll_client(), Ok((stream, ev_info_headers)));
4324        assert_eq!(s.poll_client(), Err(Error::Done));
4325    }
4326
4327    #[test]
4328    /// Server attempts to use send_additional_headers before initial response.
4329    fn no_send_additional_before_initial_response() {
4330        let mut s = Session::new().unwrap();
4331        s.handshake().unwrap();
4332
4333        let (stream, req) = s.send_request(true).unwrap();
4334
4335        assert_eq!(stream, 0);
4336
4337        let ev_headers = Event::Headers {
4338            list: req,
4339            more_frames: false,
4340        };
4341
4342        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4343        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4344
4345        let info_resp = vec![
4346            Header::new(b":status", b"103"),
4347            Header::new(b"link", b"<https://example.com>; rel=\"preconnect\""),
4348        ];
4349
4350        assert_eq!(
4351            Err(Error::FrameUnexpected),
4352            s.server.send_additional_headers(
4353                &mut s.pipe.server,
4354                stream,
4355                &info_resp,
4356                false,
4357                false
4358            )
4359        );
4360
4361        s.advance().ok();
4362
4363        assert_eq!(s.poll_client(), Err(Error::Done));
4364    }
4365
4366    #[test]
4367    /// Client sends multiple HEADERS before data.
4368    fn additional_headers_before_data_client() {
4369        let mut s = Session::new().unwrap();
4370        s.handshake().unwrap();
4371
4372        let (stream, req) = s.send_request(false).unwrap();
4373
4374        let req_trailer = vec![Header::new(b"goodbye", b"world")];
4375
4376        assert_eq!(
4377            s.client.send_additional_headers(
4378                &mut s.pipe.client,
4379                stream,
4380                &req_trailer,
4381                true,
4382                false
4383            ),
4384            Ok(())
4385        );
4386
4387        s.advance().ok();
4388
4389        let ev_initial_headers = Event::Headers {
4390            list: req,
4391            more_frames: true,
4392        };
4393
4394        let ev_trailing_headers = Event::Headers {
4395            list: req_trailer,
4396            more_frames: true,
4397        };
4398
4399        assert_eq!(s.poll_server(), Ok((stream, ev_initial_headers)));
4400        assert_eq!(s.poll_server(), Ok((stream, ev_trailing_headers)));
4401        assert_eq!(s.poll_server(), Err(Error::Done));
4402    }
4403
4404    #[test]
4405    /// Client sends multiple HEADERS before data.
4406    fn data_after_trailers_client() {
4407        let mut s = Session::new().unwrap();
4408        s.handshake().unwrap();
4409
4410        let (stream, req) = s.send_request(false).unwrap();
4411
4412        let body = s.send_body_client(stream, false).unwrap();
4413
4414        let mut recv_buf = vec![0; body.len()];
4415
4416        let req_trailers = vec![Header::new(b"foo", b"bar")];
4417
4418        s.client
4419            .send_additional_headers(
4420                &mut s.pipe.client,
4421                stream,
4422                &req_trailers,
4423                true,
4424                false,
4425            )
4426            .unwrap();
4427
4428        s.advance().ok();
4429
4430        s.send_frame_client(
4431            frame::Frame::Data {
4432                payload: vec![1, 2, 3, 4],
4433            },
4434            stream,
4435            true,
4436        )
4437        .unwrap();
4438
4439        let ev_headers = Event::Headers {
4440            list: req,
4441            more_frames: true,
4442        };
4443
4444        let ev_trailers = Event::Headers {
4445            list: req_trailers,
4446            more_frames: true,
4447        };
4448
4449        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4450        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
4451        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
4452        assert_eq!(s.poll_server(), Ok((stream, ev_trailers)));
4453        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4454    }
4455
4456    #[test]
4457    /// Send a MAX_PUSH_ID frame from the client on a valid stream.
4458    fn max_push_id_from_client_good() {
4459        let mut s = Session::new().unwrap();
4460        s.handshake().unwrap();
4461
4462        s.send_frame_client(
4463            frame::Frame::MaxPushId { push_id: 1 },
4464            s.client.control_stream_id.unwrap(),
4465            false,
4466        )
4467        .unwrap();
4468
4469        assert_eq!(s.poll_server(), Err(Error::Done));
4470    }
4471
4472    #[test]
4473    /// Send a MAX_PUSH_ID frame from the client on an invalid stream.
4474    fn max_push_id_from_client_bad_stream() {
4475        let mut s = Session::new().unwrap();
4476        s.handshake().unwrap();
4477
4478        let (stream, req) = s.send_request(false).unwrap();
4479
4480        s.send_frame_client(
4481            frame::Frame::MaxPushId { push_id: 2 },
4482            stream,
4483            false,
4484        )
4485        .unwrap();
4486
4487        let ev_headers = Event::Headers {
4488            list: req,
4489            more_frames: true,
4490        };
4491
4492        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4493        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4494    }
4495
4496    #[test]
4497    /// Send a sequence of MAX_PUSH_ID frames from the client that attempt to
4498    /// reduce the limit.
4499    fn max_push_id_from_client_limit_reduction() {
4500        let mut s = Session::new().unwrap();
4501        s.handshake().unwrap();
4502
4503        s.send_frame_client(
4504            frame::Frame::MaxPushId { push_id: 2 },
4505            s.client.control_stream_id.unwrap(),
4506            false,
4507        )
4508        .unwrap();
4509
4510        s.send_frame_client(
4511            frame::Frame::MaxPushId { push_id: 1 },
4512            s.client.control_stream_id.unwrap(),
4513            false,
4514        )
4515        .unwrap();
4516
4517        assert_eq!(s.poll_server(), Err(Error::IdError));
4518    }
4519
4520    #[test]
4521    /// Send a MAX_PUSH_ID frame from the server, which is forbidden.
4522    fn max_push_id_from_server() {
4523        let mut s = Session::new().unwrap();
4524        s.handshake().unwrap();
4525
4526        s.send_frame_server(
4527            frame::Frame::MaxPushId { push_id: 1 },
4528            s.server.control_stream_id.unwrap(),
4529            false,
4530        )
4531        .unwrap();
4532
4533        assert_eq!(s.poll_client(), Err(Error::FrameUnexpected));
4534    }
4535
4536    #[test]
4537    /// Send a PUSH_PROMISE frame from the client, which is forbidden.
4538    fn push_promise_from_client() {
4539        let mut s = Session::new().unwrap();
4540        s.handshake().unwrap();
4541
4542        let (stream, req) = s.send_request(false).unwrap();
4543
4544        let header_block = s.client.encode_header_block(&req).unwrap();
4545
4546        s.send_frame_client(
4547            frame::Frame::PushPromise {
4548                push_id: 1,
4549                header_block,
4550            },
4551            stream,
4552            false,
4553        )
4554        .unwrap();
4555
4556        let ev_headers = Event::Headers {
4557            list: req,
4558            more_frames: true,
4559        };
4560
4561        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4562        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4563    }
4564
4565    #[test]
4566    /// Send a CANCEL_PUSH frame from the client.
4567    fn cancel_push_from_client() {
4568        let mut s = Session::new().unwrap();
4569        s.handshake().unwrap();
4570
4571        s.send_frame_client(
4572            frame::Frame::CancelPush { push_id: 1 },
4573            s.client.control_stream_id.unwrap(),
4574            false,
4575        )
4576        .unwrap();
4577
4578        assert_eq!(s.poll_server(), Err(Error::Done));
4579    }
4580
4581    #[test]
4582    /// Send a CANCEL_PUSH frame from the client on an invalid stream.
4583    fn cancel_push_from_client_bad_stream() {
4584        let mut s = Session::new().unwrap();
4585        s.handshake().unwrap();
4586
4587        let (stream, req) = s.send_request(false).unwrap();
4588
4589        s.send_frame_client(
4590            frame::Frame::CancelPush { push_id: 2 },
4591            stream,
4592            false,
4593        )
4594        .unwrap();
4595
4596        let ev_headers = Event::Headers {
4597            list: req,
4598            more_frames: true,
4599        };
4600
4601        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4602        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4603    }
4604
4605    #[test]
4606    /// Send a CANCEL_PUSH frame from the client.
4607    fn cancel_push_from_server() {
4608        let mut s = Session::new().unwrap();
4609        s.handshake().unwrap();
4610
4611        s.send_frame_server(
4612            frame::Frame::CancelPush { push_id: 1 },
4613            s.server.control_stream_id.unwrap(),
4614            false,
4615        )
4616        .unwrap();
4617
4618        assert_eq!(s.poll_client(), Err(Error::Done));
4619    }
4620
4621    #[test]
4622    /// Send a GOAWAY frame from the client.
4623    fn goaway_from_client_good() {
4624        let mut s = Session::new().unwrap();
4625        s.handshake().unwrap();
4626
4627        s.client.send_goaway(&mut s.pipe.client, 100).unwrap();
4628
4629        s.advance().ok();
4630
4631        // TODO: server push
4632        assert_eq!(s.poll_server(), Ok((0, Event::GoAway)));
4633    }
4634
4635    #[test]
4636    /// Send a GOAWAY frame from the server.
4637    fn goaway_from_server_good() {
4638        let mut s = Session::new().unwrap();
4639        s.handshake().unwrap();
4640
4641        s.server.send_goaway(&mut s.pipe.server, 4000).unwrap();
4642
4643        s.advance().ok();
4644
4645        assert_eq!(s.poll_client(), Ok((4000, Event::GoAway)));
4646    }
4647
4648    #[test]
4649    /// A client MUST NOT send a request after it receives GOAWAY.
4650    fn client_request_after_goaway() {
4651        let mut s = Session::new().unwrap();
4652        s.handshake().unwrap();
4653
4654        s.server.send_goaway(&mut s.pipe.server, 4000).unwrap();
4655
4656        s.advance().ok();
4657
4658        assert_eq!(s.poll_client(), Ok((4000, Event::GoAway)));
4659
4660        assert_eq!(s.send_request(true), Err(Error::FrameUnexpected));
4661    }
4662
4663    #[test]
4664    /// Send a GOAWAY frame from the server, using an invalid goaway ID.
4665    fn goaway_from_server_invalid_id() {
4666        let mut s = Session::new().unwrap();
4667        s.handshake().unwrap();
4668
4669        s.send_frame_server(
4670            frame::Frame::GoAway { id: 1 },
4671            s.server.control_stream_id.unwrap(),
4672            false,
4673        )
4674        .unwrap();
4675
4676        assert_eq!(s.poll_client(), Err(Error::IdError));
4677    }
4678
4679    #[test]
4680    /// Send multiple GOAWAY frames from the server, that increase the goaway
4681    /// ID.
4682    fn goaway_from_server_increase_id() {
4683        let mut s = Session::new().unwrap();
4684        s.handshake().unwrap();
4685
4686        s.send_frame_server(
4687            frame::Frame::GoAway { id: 0 },
4688            s.server.control_stream_id.unwrap(),
4689            false,
4690        )
4691        .unwrap();
4692
4693        s.send_frame_server(
4694            frame::Frame::GoAway { id: 4 },
4695            s.server.control_stream_id.unwrap(),
4696            false,
4697        )
4698        .unwrap();
4699
4700        assert_eq!(s.poll_client(), Ok((0, Event::GoAway)));
4701
4702        assert_eq!(s.poll_client(), Err(Error::IdError));
4703    }
4704
4705    #[test]
4706    #[cfg(feature = "sfv")]
4707    fn parse_priority_field_value() {
4708        // Legal dicts
4709        assert_eq!(
4710            Ok(Priority::new(0, false)),
4711            Priority::try_from(b"u=0".as_slice())
4712        );
4713        assert_eq!(
4714            Ok(Priority::new(3, false)),
4715            Priority::try_from(b"u=3".as_slice())
4716        );
4717        assert_eq!(
4718            Ok(Priority::new(7, false)),
4719            Priority::try_from(b"u=7".as_slice())
4720        );
4721
4722        assert_eq!(
4723            Ok(Priority::new(0, true)),
4724            Priority::try_from(b"u=0, i".as_slice())
4725        );
4726        assert_eq!(
4727            Ok(Priority::new(3, true)),
4728            Priority::try_from(b"u=3, i".as_slice())
4729        );
4730        assert_eq!(
4731            Ok(Priority::new(7, true)),
4732            Priority::try_from(b"u=7, i".as_slice())
4733        );
4734
4735        assert_eq!(
4736            Ok(Priority::new(0, true)),
4737            Priority::try_from(b"u=0, i=?1".as_slice())
4738        );
4739        assert_eq!(
4740            Ok(Priority::new(3, true)),
4741            Priority::try_from(b"u=3, i=?1".as_slice())
4742        );
4743        assert_eq!(
4744            Ok(Priority::new(7, true)),
4745            Priority::try_from(b"u=7, i=?1".as_slice())
4746        );
4747
4748        assert_eq!(
4749            Ok(Priority::new(3, false)),
4750            Priority::try_from(b"".as_slice())
4751        );
4752
4753        assert_eq!(
4754            Ok(Priority::new(0, true)),
4755            Priority::try_from(b"u=0;foo, i;bar".as_slice())
4756        );
4757        assert_eq!(
4758            Ok(Priority::new(3, true)),
4759            Priority::try_from(b"u=3;hello, i;world".as_slice())
4760        );
4761        assert_eq!(
4762            Ok(Priority::new(7, true)),
4763            Priority::try_from(b"u=7;croeso, i;gymru".as_slice())
4764        );
4765
4766        assert_eq!(
4767            Ok(Priority::new(0, true)),
4768            Priority::try_from(b"u=0, i, spinaltap=11".as_slice())
4769        );
4770
4771        // Illegal formats
4772        assert_eq!(Err(Error::Done), Priority::try_from(b"0".as_slice()));
4773        assert_eq!(
4774            Ok(Priority::new(7, false)),
4775            Priority::try_from(b"u=-1".as_slice())
4776        );
4777        assert_eq!(Err(Error::Done), Priority::try_from(b"u=0.2".as_slice()));
4778        assert_eq!(
4779            Ok(Priority::new(7, false)),
4780            Priority::try_from(b"u=100".as_slice())
4781        );
4782        assert_eq!(
4783            Err(Error::Done),
4784            Priority::try_from(b"u=3, i=true".as_slice())
4785        );
4786
4787        // Trailing comma in dict is malformed
4788        assert_eq!(Err(Error::Done), Priority::try_from(b"u=7, ".as_slice()));
4789    }
4790
4791    #[test]
4792    /// Send a PRIORITY_UPDATE for request stream from the client.
4793    fn priority_update_request() {
4794        let mut s = Session::new().unwrap();
4795        s.handshake().unwrap();
4796
4797        s.client
4798            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4799                urgency: 3,
4800                incremental: false,
4801            })
4802            .unwrap();
4803        s.advance().ok();
4804
4805        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4806        assert_eq!(s.poll_server(), Err(Error::Done));
4807    }
4808
4809    #[test]
4810    /// Send a PRIORITY_UPDATE for request stream from the client.
4811    fn priority_update_single_stream_rearm() {
4812        let mut s = Session::new().unwrap();
4813        s.handshake().unwrap();
4814
4815        s.client
4816            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4817                urgency: 3,
4818                incremental: false,
4819            })
4820            .unwrap();
4821        s.advance().ok();
4822
4823        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4824        assert_eq!(s.poll_server(), Err(Error::Done));
4825
4826        s.client
4827            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4828                urgency: 5,
4829                incremental: false,
4830            })
4831            .unwrap();
4832        s.advance().ok();
4833
4834        assert_eq!(s.poll_server(), Err(Error::Done));
4835
4836        // There is only one PRIORITY_UPDATE frame to read. Once read, the event
4837        // will rearm ready for more.
4838        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=5".to_vec()));
4839        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4840
4841        s.client
4842            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4843                urgency: 7,
4844                incremental: false,
4845            })
4846            .unwrap();
4847        s.advance().ok();
4848
4849        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4850        assert_eq!(s.poll_server(), Err(Error::Done));
4851
4852        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=7".to_vec()));
4853        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4854    }
4855
4856    #[test]
4857    /// Send multiple PRIORITY_UPDATE frames for different streams from the
4858    /// client across multiple flights of exchange.
4859    fn priority_update_request_multiple_stream_arm_multiple_flights() {
4860        let mut s = Session::new().unwrap();
4861        s.handshake().unwrap();
4862
4863        s.client
4864            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4865                urgency: 3,
4866                incremental: false,
4867            })
4868            .unwrap();
4869        s.advance().ok();
4870
4871        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4872        assert_eq!(s.poll_server(), Err(Error::Done));
4873
4874        s.client
4875            .send_priority_update_for_request(&mut s.pipe.client, 4, &Priority {
4876                urgency: 1,
4877                incremental: false,
4878            })
4879            .unwrap();
4880        s.advance().ok();
4881
4882        assert_eq!(s.poll_server(), Ok((4, Event::PriorityUpdate)));
4883        assert_eq!(s.poll_server(), Err(Error::Done));
4884
4885        s.client
4886            .send_priority_update_for_request(&mut s.pipe.client, 8, &Priority {
4887                urgency: 2,
4888                incremental: false,
4889            })
4890            .unwrap();
4891        s.advance().ok();
4892
4893        assert_eq!(s.poll_server(), Ok((8, Event::PriorityUpdate)));
4894        assert_eq!(s.poll_server(), Err(Error::Done));
4895
4896        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
4897        assert_eq!(s.server.take_last_priority_update(4), Ok(b"u=1".to_vec()));
4898        assert_eq!(s.server.take_last_priority_update(8), Ok(b"u=2".to_vec()));
4899        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4900    }
4901
4902    #[test]
4903    /// Send multiple PRIORITY_UPDATE frames for different streams from the
4904    /// client across a single flight.
4905    fn priority_update_request_multiple_stream_arm_single_flight() {
4906        let mut s = Session::new().unwrap();
4907        s.handshake().unwrap();
4908
4909        let mut d = [42; 65535];
4910
4911        let mut b = octets::OctetsMut::with_slice(&mut d);
4912
4913        let p1 = frame::Frame::PriorityUpdateRequest {
4914            prioritized_element_id: 0,
4915            priority_field_value: b"u=3".to_vec(),
4916        };
4917
4918        let p2 = frame::Frame::PriorityUpdateRequest {
4919            prioritized_element_id: 4,
4920            priority_field_value: b"u=3".to_vec(),
4921        };
4922
4923        let p3 = frame::Frame::PriorityUpdateRequest {
4924            prioritized_element_id: 8,
4925            priority_field_value: b"u=3".to_vec(),
4926        };
4927
4928        p1.to_bytes(&mut b).unwrap();
4929        p2.to_bytes(&mut b).unwrap();
4930        p3.to_bytes(&mut b).unwrap();
4931
4932        let off = b.off();
4933        s.pipe
4934            .client
4935            .stream_send(s.client.control_stream_id.unwrap(), &d[..off], false)
4936            .unwrap();
4937
4938        s.advance().ok();
4939
4940        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4941        assert_eq!(s.poll_server(), Ok((4, Event::PriorityUpdate)));
4942        assert_eq!(s.poll_server(), Ok((8, Event::PriorityUpdate)));
4943        assert_eq!(s.poll_server(), Err(Error::Done));
4944
4945        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
4946        assert_eq!(s.server.take_last_priority_update(4), Ok(b"u=3".to_vec()));
4947        assert_eq!(s.server.take_last_priority_update(8), Ok(b"u=3".to_vec()));
4948
4949        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4950    }
4951
4952    #[test]
4953    /// Send a PRIORITY_UPDATE for a request stream, before and after the stream
4954    /// has been completed.
4955    fn priority_update_request_collected_completed() {
4956        let mut s = Session::new().unwrap();
4957        s.handshake().unwrap();
4958
4959        s.client
4960            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4961                urgency: 3,
4962                incremental: false,
4963            })
4964            .unwrap();
4965        s.advance().ok();
4966
4967        let (stream, req) = s.send_request(true).unwrap();
4968        let ev_headers = Event::Headers {
4969            list: req,
4970            more_frames: false,
4971        };
4972
4973        // Priority event is generated before request headers.
4974        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4975        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4976        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4977        assert_eq!(s.poll_server(), Err(Error::Done));
4978
4979        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
4980        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4981
4982        let resp = s.send_response(stream, true).unwrap();
4983
4984        let ev_headers = Event::Headers {
4985            list: resp,
4986            more_frames: false,
4987        };
4988
4989        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
4990        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
4991        assert_eq!(s.poll_client(), Err(Error::Done));
4992
4993        // Now send a PRIORITY_UPDATE for the completed request stream.
4994        s.client
4995            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4996                urgency: 3,
4997                incremental: false,
4998            })
4999            .unwrap();
5000        s.advance().ok();
5001
5002        // No event generated at server
5003        assert_eq!(s.poll_server(), Err(Error::Done));
5004    }
5005
5006    #[test]
5007    /// Send a PRIORITY_UPDATE for a request stream, before and after the stream
5008    /// has been stopped.
5009    fn priority_update_request_collected_stopped() {
5010        let mut s = Session::new().unwrap();
5011        s.handshake().unwrap();
5012
5013        s.client
5014            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
5015                urgency: 3,
5016                incremental: false,
5017            })
5018            .unwrap();
5019        s.advance().ok();
5020
5021        let (stream, req) = s.send_request(false).unwrap();
5022        let ev_headers = Event::Headers {
5023            list: req,
5024            more_frames: true,
5025        };
5026
5027        // Priority event is generated before request headers.
5028        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
5029        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5030        assert_eq!(s.poll_server(), Err(Error::Done));
5031
5032        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
5033        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
5034
5035        s.pipe
5036            .client
5037            .stream_shutdown(stream, crate::Shutdown::Write, 0x100)
5038            .unwrap();
5039        s.pipe
5040            .client
5041            .stream_shutdown(stream, crate::Shutdown::Read, 0x100)
5042            .unwrap();
5043
5044        s.advance().ok();
5045
5046        assert_eq!(s.poll_server(), Ok((0, Event::Reset(0x100))));
5047        assert_eq!(s.poll_server(), Err(Error::Done));
5048
5049        // Now send a PRIORITY_UPDATE for the closed request stream.
5050        s.client
5051            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
5052                urgency: 3,
5053                incremental: false,
5054            })
5055            .unwrap();
5056        s.advance().ok();
5057
5058        // No event generated at server
5059        assert_eq!(s.poll_server(), Err(Error::Done));
5060    }
5061
5062    #[test]
5063    /// Send a PRIORITY_UPDATE for push stream from the client.
5064    fn priority_update_push() {
5065        let mut s = Session::new().unwrap();
5066        s.handshake().unwrap();
5067
5068        s.send_frame_client(
5069            frame::Frame::PriorityUpdatePush {
5070                prioritized_element_id: 3,
5071                priority_field_value: b"u=3".to_vec(),
5072            },
5073            s.client.control_stream_id.unwrap(),
5074            false,
5075        )
5076        .unwrap();
5077
5078        assert_eq!(s.poll_server(), Err(Error::Done));
5079    }
5080
5081    #[test]
5082    /// Send a PRIORITY_UPDATE for request stream from the client but for an
5083    /// incorrect stream type.
5084    fn priority_update_request_bad_stream() {
5085        let mut s = Session::new().unwrap();
5086        s.handshake().unwrap();
5087
5088        s.send_frame_client(
5089            frame::Frame::PriorityUpdateRequest {
5090                prioritized_element_id: 5,
5091                priority_field_value: b"u=3".to_vec(),
5092            },
5093            s.client.control_stream_id.unwrap(),
5094            false,
5095        )
5096        .unwrap();
5097
5098        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
5099    }
5100
5101    #[test]
5102    /// Send a PRIORITY_UPDATE for push stream from the client but for an
5103    /// incorrect stream type.
5104    fn priority_update_push_bad_stream() {
5105        let mut s = Session::new().unwrap();
5106        s.handshake().unwrap();
5107
5108        s.send_frame_client(
5109            frame::Frame::PriorityUpdatePush {
5110                prioritized_element_id: 5,
5111                priority_field_value: b"u=3".to_vec(),
5112            },
5113            s.client.control_stream_id.unwrap(),
5114            false,
5115        )
5116        .unwrap();
5117
5118        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
5119    }
5120
5121    #[test]
5122    /// Send a PRIORITY_UPDATE for request stream from the server.
5123    fn priority_update_request_from_server() {
5124        let mut s = Session::new().unwrap();
5125        s.handshake().unwrap();
5126
5127        s.send_frame_server(
5128            frame::Frame::PriorityUpdateRequest {
5129                prioritized_element_id: 0,
5130                priority_field_value: b"u=3".to_vec(),
5131            },
5132            s.server.control_stream_id.unwrap(),
5133            false,
5134        )
5135        .unwrap();
5136
5137        assert_eq!(s.poll_client(), Err(Error::FrameUnexpected));
5138    }
5139
5140    #[test]
5141    /// Send a PRIORITY_UPDATE for request stream from the server.
5142    fn priority_update_push_from_server() {
5143        let mut s = Session::new().unwrap();
5144        s.handshake().unwrap();
5145
5146        s.send_frame_server(
5147            frame::Frame::PriorityUpdatePush {
5148                prioritized_element_id: 0,
5149                priority_field_value: b"u=3".to_vec(),
5150            },
5151            s.server.control_stream_id.unwrap(),
5152            false,
5153        )
5154        .unwrap();
5155
5156        assert_eq!(s.poll_client(), Err(Error::FrameUnexpected));
5157    }
5158
5159    #[test]
5160    /// Ensure quiche allocates streams for client and server roles as expected.
5161    fn uni_stream_local_counting() {
5162        let config = Config::new().unwrap();
5163
5164        let h3_cln = Connection::new(&config, false, false).unwrap();
5165        assert_eq!(h3_cln.next_uni_stream_id, 2);
5166
5167        let h3_srv = Connection::new(&config, true, false).unwrap();
5168        assert_eq!(h3_srv.next_uni_stream_id, 3);
5169    }
5170
5171    #[test]
5172    /// Client opens multiple control streams, which is forbidden.
5173    fn open_multiple_control_streams() {
5174        let mut s = Session::new().unwrap();
5175        s.handshake().unwrap();
5176
5177        let stream_id = s.client.next_uni_stream_id;
5178
5179        let mut d = [42; 8];
5180        let mut b = octets::OctetsMut::with_slice(&mut d);
5181
5182        s.pipe
5183            .client
5184            .stream_send(
5185                stream_id,
5186                b.put_varint(stream::HTTP3_CONTROL_STREAM_TYPE_ID).unwrap(),
5187                false,
5188            )
5189            .unwrap();
5190
5191        s.advance().ok();
5192
5193        assert_eq!(s.poll_server(), Err(Error::StreamCreationError));
5194    }
5195
5196    #[test]
5197    /// Client closes the control stream, which is forbidden.
5198    fn close_control_stream_after_type() {
5199        let mut s = Session::new().unwrap();
5200        s.handshake().unwrap();
5201
5202        s.pipe
5203            .client
5204            .stream_send(s.client.control_stream_id.unwrap(), &[], true)
5205            .unwrap();
5206
5207        s.advance().ok();
5208
5209        assert_eq!(
5210            Err(Error::ClosedCriticalStream),
5211            s.server.poll(&mut s.pipe.server)
5212        );
5213        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5214    }
5215
5216    #[test]
5217    /// Client closes the control stream after a frame is sent, which is
5218    /// forbidden.
5219    fn close_control_stream_after_frame() {
5220        let mut s = Session::new().unwrap();
5221        s.handshake().unwrap();
5222
5223        s.send_frame_client(
5224            frame::Frame::MaxPushId { push_id: 1 },
5225            s.client.control_stream_id.unwrap(),
5226            true,
5227        )
5228        .unwrap();
5229
5230        assert_eq!(
5231            Err(Error::ClosedCriticalStream),
5232            s.server.poll(&mut s.pipe.server)
5233        );
5234        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5235    }
5236
5237    #[test]
5238    /// Client resets the control stream, which is forbidden.
5239    fn reset_control_stream_after_type() {
5240        let mut s = Session::new().unwrap();
5241        s.handshake().unwrap();
5242
5243        s.pipe
5244            .client
5245            .stream_shutdown(
5246                s.client.control_stream_id.unwrap(),
5247                crate::Shutdown::Write,
5248                0,
5249            )
5250            .unwrap();
5251
5252        s.advance().ok();
5253
5254        assert_eq!(
5255            Err(Error::ClosedCriticalStream),
5256            s.server.poll(&mut s.pipe.server)
5257        );
5258        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5259    }
5260
5261    #[test]
5262    /// Client resets the control stream after a frame is sent, which is
5263    /// forbidden.
5264    fn reset_control_stream_after_frame() {
5265        let mut s = Session::new().unwrap();
5266        s.handshake().unwrap();
5267
5268        s.send_frame_client(
5269            frame::Frame::MaxPushId { push_id: 1 },
5270            s.client.control_stream_id.unwrap(),
5271            false,
5272        )
5273        .unwrap();
5274
5275        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5276
5277        s.pipe
5278            .client
5279            .stream_shutdown(
5280                s.client.control_stream_id.unwrap(),
5281                crate::Shutdown::Write,
5282                0,
5283            )
5284            .unwrap();
5285
5286        s.advance().ok();
5287
5288        assert_eq!(
5289            Err(Error::ClosedCriticalStream),
5290            s.server.poll(&mut s.pipe.server)
5291        );
5292        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5293    }
5294
5295    #[test]
5296    /// Client closes QPACK stream, which is forbidden.
5297    fn close_qpack_stream_after_type() {
5298        let mut s = Session::new().unwrap();
5299        s.handshake().unwrap();
5300
5301        s.pipe
5302            .client
5303            .stream_send(
5304                s.client.local_qpack_streams.encoder_stream_id.unwrap(),
5305                &[],
5306                true,
5307            )
5308            .unwrap();
5309
5310        s.advance().ok();
5311
5312        assert_eq!(
5313            Err(Error::ClosedCriticalStream),
5314            s.server.poll(&mut s.pipe.server)
5315        );
5316        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5317    }
5318
5319    #[test]
5320    /// Client closes QPACK stream after sending some stuff, which is forbidden.
5321    fn close_qpack_stream_after_data() {
5322        let mut s = Session::new().unwrap();
5323        s.handshake().unwrap();
5324
5325        let stream_id = s.client.local_qpack_streams.encoder_stream_id.unwrap();
5326        let d = [0; 1];
5327
5328        s.pipe.client.stream_send(stream_id, &d, false).unwrap();
5329        s.pipe.client.stream_send(stream_id, &d, true).unwrap();
5330
5331        s.advance().ok();
5332
5333        assert_eq!(
5334            Err(Error::ClosedCriticalStream),
5335            s.server.poll(&mut s.pipe.server)
5336        );
5337        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5338    }
5339
5340    #[test]
5341    /// Client resets QPACK stream, which is forbidden.
5342    fn reset_qpack_stream_after_type() {
5343        let mut s = Session::new().unwrap();
5344        s.handshake().unwrap();
5345
5346        s.pipe
5347            .client
5348            .stream_shutdown(
5349                s.client.local_qpack_streams.encoder_stream_id.unwrap(),
5350                crate::Shutdown::Write,
5351                0,
5352            )
5353            .unwrap();
5354
5355        s.advance().ok();
5356
5357        assert_eq!(
5358            Err(Error::ClosedCriticalStream),
5359            s.server.poll(&mut s.pipe.server)
5360        );
5361        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5362    }
5363
5364    #[test]
5365    /// Client resets QPACK stream after sending some stuff, which is forbidden.
5366    fn reset_qpack_stream_after_data() {
5367        let mut s = Session::new().unwrap();
5368        s.handshake().unwrap();
5369
5370        let stream_id = s.client.local_qpack_streams.encoder_stream_id.unwrap();
5371        let d = [0; 1];
5372
5373        s.pipe.client.stream_send(stream_id, &d, false).unwrap();
5374        s.pipe.client.stream_send(stream_id, &d, false).unwrap();
5375
5376        s.advance().ok();
5377
5378        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5379
5380        s.pipe
5381            .client
5382            .stream_shutdown(stream_id, crate::Shutdown::Write, 0)
5383            .unwrap();
5384
5385        s.advance().ok();
5386
5387        assert_eq!(
5388            Err(Error::ClosedCriticalStream),
5389            s.server.poll(&mut s.pipe.server)
5390        );
5391        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5392    }
5393
5394    #[test]
5395    /// Client sends QPACK data.
5396    fn qpack_data() {
5397        // TODO: QPACK instructions are ignored until dynamic table support is
5398        // added so we just test that the data is safely ignored.
5399        let mut s = Session::new().unwrap();
5400        s.handshake().unwrap();
5401
5402        let e_stream_id = s.client.local_qpack_streams.encoder_stream_id.unwrap();
5403        let d_stream_id = s.client.local_qpack_streams.decoder_stream_id.unwrap();
5404        let d = [0; 20];
5405
5406        s.pipe.client.stream_send(e_stream_id, &d, false).unwrap();
5407        s.advance().ok();
5408
5409        s.pipe.client.stream_send(d_stream_id, &d, false).unwrap();
5410        s.advance().ok();
5411
5412        match s.server.poll(&mut s.pipe.server) {
5413            Ok(_) => panic!(),
5414
5415            Err(Error::Done) => {
5416                assert_eq!(s.server.peer_qpack_streams.encoder_stream_bytes, 20);
5417                assert_eq!(s.server.peer_qpack_streams.decoder_stream_bytes, 20);
5418            },
5419
5420            Err(_) => {
5421                panic!();
5422            },
5423        }
5424
5425        let stats = s.server.stats();
5426        assert_eq!(stats.qpack_encoder_stream_recv_bytes, 20);
5427        assert_eq!(stats.qpack_decoder_stream_recv_bytes, 20);
5428    }
5429
5430    #[test]
5431    /// Tests limits for the stream state buffer maximum size.
5432    fn max_state_buf_size() {
5433        let mut s = Session::new().unwrap();
5434        s.handshake().unwrap();
5435
5436        let req = vec![
5437            Header::new(b":method", b"GET"),
5438            Header::new(b":scheme", b"https"),
5439            Header::new(b":authority", b"quic.tech"),
5440            Header::new(b":path", b"/test"),
5441            Header::new(b"user-agent", b"quiche-test"),
5442        ];
5443
5444        assert_eq!(
5445            s.client.send_request(&mut s.pipe.client, &req, false),
5446            Ok(0)
5447        );
5448
5449        s.advance().ok();
5450
5451        let ev_headers = Event::Headers {
5452            list: req,
5453            more_frames: true,
5454        };
5455
5456        assert_eq!(s.server.poll(&mut s.pipe.server), Ok((0, ev_headers)));
5457
5458        // DATA frames don't consume the state buffer, so can be of any size.
5459        let mut d = [42; 128];
5460        let mut b = octets::OctetsMut::with_slice(&mut d);
5461
5462        let frame_type = b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
5463        s.pipe.client.stream_send(0, frame_type, false).unwrap();
5464
5465        let frame_len = b.put_varint(1 << 24).unwrap();
5466        s.pipe.client.stream_send(0, frame_len, false).unwrap();
5467
5468        s.pipe.client.stream_send(0, &d, false).unwrap();
5469
5470        s.advance().ok();
5471
5472        assert_eq!(s.server.poll(&mut s.pipe.server), Ok((0, Event::Data)));
5473
5474        // GREASE frames consume the state buffer, so need to be limited.
5475        let mut s = Session::new().unwrap();
5476        s.handshake().unwrap();
5477
5478        let mut d = [42; 128];
5479        let mut b = octets::OctetsMut::with_slice(&mut d);
5480
5481        let frame_type = b.put_varint(148_764_065_110_560_899).unwrap();
5482        s.pipe.client.stream_send(0, frame_type, false).unwrap();
5483
5484        let frame_len = b.put_varint(1 << 24).unwrap();
5485        s.pipe.client.stream_send(0, frame_len, false).unwrap();
5486
5487        s.pipe.client.stream_send(0, &d, false).unwrap();
5488
5489        s.advance().ok();
5490
5491        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::ExcessiveLoad));
5492    }
5493
5494    #[test]
5495    /// Tests that DATA frames are properly truncated depending on the request
5496    /// stream's outgoing flow control capacity.
5497    fn stream_backpressure() {
5498        let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
5499
5500        let mut s = Session::new().unwrap();
5501        s.handshake().unwrap();
5502
5503        let (stream, req) = s.send_request(false).unwrap();
5504
5505        let total_data_frames = 6;
5506
5507        for _ in 0..total_data_frames {
5508            assert_eq!(
5509                s.client
5510                    .send_body(&mut s.pipe.client, stream, &bytes, false),
5511                Ok(bytes.len())
5512            );
5513
5514            s.advance().ok();
5515        }
5516
5517        assert_eq!(
5518            s.client.send_body(&mut s.pipe.client, stream, &bytes, true),
5519            Ok(bytes.len() - 2)
5520        );
5521
5522        s.advance().ok();
5523
5524        let mut recv_buf = vec![0; bytes.len()];
5525
5526        let ev_headers = Event::Headers {
5527            list: req,
5528            more_frames: true,
5529        };
5530
5531        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5532        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
5533        assert_eq!(s.poll_server(), Err(Error::Done));
5534
5535        for _ in 0..total_data_frames {
5536            assert_eq!(
5537                s.recv_body_server(stream, &mut recv_buf),
5538                Ok(bytes.len())
5539            );
5540        }
5541
5542        assert_eq!(
5543            s.recv_body_server(stream, &mut recv_buf),
5544            Ok(bytes.len() - 2)
5545        );
5546
5547        // Fin flag from last send_body() call was not sent as the buffer was
5548        // only partially written.
5549        assert_eq!(s.poll_server(), Err(Error::Done));
5550    }
5551
5552    #[test]
5553    /// Tests that the max header list size setting is enforced.
5554    fn request_max_header_size_limit() {
5555        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5556        config
5557            .load_cert_chain_from_pem_file("examples/cert.crt")
5558            .unwrap();
5559        config
5560            .load_priv_key_from_pem_file("examples/cert.key")
5561            .unwrap();
5562        config.set_application_protos(&[b"h3"]).unwrap();
5563        config.set_initial_max_data(1500);
5564        config.set_initial_max_stream_data_bidi_local(150);
5565        config.set_initial_max_stream_data_bidi_remote(150);
5566        config.set_initial_max_stream_data_uni(150);
5567        config.set_initial_max_streams_bidi(5);
5568        config.set_initial_max_streams_uni(5);
5569        config.verify_peer(false);
5570
5571        let mut h3_config = Config::new().unwrap();
5572        h3_config.set_max_field_section_size(65);
5573
5574        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5575
5576        s.handshake().unwrap();
5577
5578        let req = vec![
5579            Header::new(b":method", b"GET"),
5580            Header::new(b":scheme", b"https"),
5581            Header::new(b":authority", b"quic.tech"),
5582            Header::new(b":path", b"/test"),
5583            Header::new(b"aaaaaaa", b"aaaaaaaa"),
5584        ];
5585
5586        let stream = s
5587            .client
5588            .send_request(&mut s.pipe.client, &req, true)
5589            .unwrap();
5590
5591        s.advance().ok();
5592
5593        assert_eq!(stream, 0);
5594
5595        assert_eq!(s.poll_server(), Err(Error::ExcessiveLoad));
5596
5597        assert_eq!(
5598            s.pipe.server.local_error.as_ref().unwrap().error_code,
5599            Error::to_wire(Error::ExcessiveLoad)
5600        );
5601    }
5602
5603    #[test]
5604    /// Tests that Error::TransportError contains a transport error.
5605    fn transport_error() {
5606        let mut s = Session::new().unwrap();
5607        s.handshake().unwrap();
5608
5609        let req = vec![
5610            Header::new(b":method", b"GET"),
5611            Header::new(b":scheme", b"https"),
5612            Header::new(b":authority", b"quic.tech"),
5613            Header::new(b":path", b"/test"),
5614            Header::new(b"user-agent", b"quiche-test"),
5615        ];
5616
5617        // We need to open all streams in the same flight, so we can't use the
5618        // Session::send_request() method because it also calls advance(),
5619        // otherwise the server would send a MAX_STREAMS frame and the client
5620        // wouldn't hit the streams limit.
5621        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(0));
5622        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(4));
5623        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(8));
5624        assert_eq!(
5625            s.client.send_request(&mut s.pipe.client, &req, true),
5626            Ok(12)
5627        );
5628        assert_eq!(
5629            s.client.send_request(&mut s.pipe.client, &req, true),
5630            Ok(16)
5631        );
5632
5633        assert_eq!(
5634            s.client.send_request(&mut s.pipe.client, &req, true),
5635            Err(Error::TransportError(crate::Error::StreamLimit))
5636        );
5637    }
5638
5639    #[test]
5640    /// Tests that sending DATA before HEADERS causes an error.
5641    fn data_before_headers() {
5642        let mut s = Session::new().unwrap();
5643        s.handshake().unwrap();
5644
5645        let mut d = [42; 128];
5646        let mut b = octets::OctetsMut::with_slice(&mut d);
5647
5648        let frame_type = b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
5649        s.pipe.client.stream_send(0, frame_type, false).unwrap();
5650
5651        let frame_len = b.put_varint(5).unwrap();
5652        s.pipe.client.stream_send(0, frame_len, false).unwrap();
5653
5654        s.pipe.client.stream_send(0, b"hello", false).unwrap();
5655
5656        s.advance().ok();
5657
5658        assert_eq!(
5659            s.server.poll(&mut s.pipe.server),
5660            Err(Error::FrameUnexpected)
5661        );
5662    }
5663
5664    #[test]
5665    /// Tests that calling poll() after an error occurred does nothing.
5666    fn poll_after_error() {
5667        let mut s = Session::new().unwrap();
5668        s.handshake().unwrap();
5669
5670        let mut d = [42; 128];
5671        let mut b = octets::OctetsMut::with_slice(&mut d);
5672
5673        let frame_type = b.put_varint(148_764_065_110_560_899).unwrap();
5674        s.pipe.client.stream_send(0, frame_type, false).unwrap();
5675
5676        let frame_len = b.put_varint(1 << 24).unwrap();
5677        s.pipe.client.stream_send(0, frame_len, false).unwrap();
5678
5679        s.pipe.client.stream_send(0, &d, false).unwrap();
5680
5681        s.advance().ok();
5682
5683        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::ExcessiveLoad));
5684
5685        // Try to call poll() again after an error occurred.
5686        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::Done));
5687    }
5688
5689    #[test]
5690    /// Tests that we limit sending HEADERS based on the stream capacity.
5691    fn headers_blocked() {
5692        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5693        config
5694            .load_cert_chain_from_pem_file("examples/cert.crt")
5695            .unwrap();
5696        config
5697            .load_priv_key_from_pem_file("examples/cert.key")
5698            .unwrap();
5699        config.set_application_protos(&[b"h3"]).unwrap();
5700        config.set_initial_max_data(70);
5701        config.set_initial_max_stream_data_bidi_local(150);
5702        config.set_initial_max_stream_data_bidi_remote(150);
5703        config.set_initial_max_stream_data_uni(150);
5704        config.set_initial_max_streams_bidi(100);
5705        config.set_initial_max_streams_uni(5);
5706        config.verify_peer(false);
5707
5708        let h3_config = Config::new().unwrap();
5709
5710        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5711
5712        s.handshake().unwrap();
5713
5714        let req = vec![
5715            Header::new(b":method", b"GET"),
5716            Header::new(b":scheme", b"https"),
5717            Header::new(b":authority", b"quic.tech"),
5718            Header::new(b":path", b"/test"),
5719        ];
5720
5721        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(0));
5722
5723        assert_eq!(
5724            s.client.send_request(&mut s.pipe.client, &req, true),
5725            Err(Error::StreamBlocked)
5726        );
5727
5728        // Clear the writable stream queue.
5729        assert_eq!(s.pipe.client.stream_writable_next(), Some(2));
5730        assert_eq!(s.pipe.client.stream_writable_next(), Some(6));
5731        assert_eq!(s.pipe.client.stream_writable_next(), Some(10));
5732        assert_eq!(s.pipe.client.stream_writable_next(), None);
5733
5734        s.advance().ok();
5735
5736        // Once the server gives flow control credits back, we can send the
5737        // request.
5738        assert_eq!(s.pipe.client.stream_writable_next(), Some(4));
5739        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(4));
5740    }
5741
5742    #[test]
5743    /// Ensure StreamBlocked when connection flow control prevents headers.
5744    fn headers_blocked_on_conn() {
5745        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5746        config
5747            .load_cert_chain_from_pem_file("examples/cert.crt")
5748            .unwrap();
5749        config
5750            .load_priv_key_from_pem_file("examples/cert.key")
5751            .unwrap();
5752        config.set_application_protos(&[b"h3"]).unwrap();
5753        config.set_initial_max_data(70);
5754        config.set_initial_max_stream_data_bidi_local(150);
5755        config.set_initial_max_stream_data_bidi_remote(150);
5756        config.set_initial_max_stream_data_uni(150);
5757        config.set_initial_max_streams_bidi(100);
5758        config.set_initial_max_streams_uni(5);
5759        config.verify_peer(false);
5760
5761        let h3_config = Config::new().unwrap();
5762
5763        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5764
5765        s.handshake().unwrap();
5766
5767        // After the HTTP handshake, some bytes of connection flow control have
5768        // been consumed. Fill the connection with more grease data on the control
5769        // stream.
5770        let d = [42; 28];
5771        assert_eq!(s.pipe.client.stream_send(2, &d, false), Ok(23));
5772
5773        let req = vec![
5774            Header::new(b":method", b"GET"),
5775            Header::new(b":scheme", b"https"),
5776            Header::new(b":authority", b"quic.tech"),
5777            Header::new(b":path", b"/test"),
5778        ];
5779
5780        // There is 0 connection-level flow control, so sending a request is
5781        // blocked.
5782        assert_eq!(
5783            s.client.send_request(&mut s.pipe.client, &req, true),
5784            Err(Error::StreamBlocked)
5785        );
5786        assert_eq!(s.pipe.client.stream_writable_next(), None);
5787
5788        // Emit the control stream data and drain it at the server via poll() to
5789        // consumes it via poll() and gives back flow control.
5790        s.advance().ok();
5791        assert_eq!(s.poll_server(), Err(Error::Done));
5792        s.advance().ok();
5793
5794        // Now we can send the request.
5795        assert_eq!(s.pipe.client.stream_writable_next(), Some(2));
5796        assert_eq!(s.pipe.client.stream_writable_next(), Some(6));
5797        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(0));
5798    }
5799
5800    #[test]
5801    /// Ensure STREAM_DATA_BLOCKED is not emitted multiple times with the same
5802    /// offset when trying to send large bodies.
5803    fn send_body_truncation_stream_blocked() {
5804        use crate::testing::decode_pkt;
5805
5806        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5807        config
5808            .load_cert_chain_from_pem_file("examples/cert.crt")
5809            .unwrap();
5810        config
5811            .load_priv_key_from_pem_file("examples/cert.key")
5812            .unwrap();
5813        config.set_application_protos(&[b"h3"]).unwrap();
5814        config.set_initial_max_data(10000); // large connection-level flow control
5815        config.set_initial_max_stream_data_bidi_local(80);
5816        config.set_initial_max_stream_data_bidi_remote(80);
5817        config.set_initial_max_stream_data_uni(150);
5818        config.set_initial_max_streams_bidi(100);
5819        config.set_initial_max_streams_uni(5);
5820        config.verify_peer(false);
5821
5822        let h3_config = Config::new().unwrap();
5823
5824        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5825
5826        s.handshake().unwrap();
5827
5828        let (stream, req) = s.send_request(true).unwrap();
5829
5830        let ev_headers = Event::Headers {
5831            list: req,
5832            more_frames: false,
5833        };
5834
5835        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5836        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
5837
5838        let _ = s.send_response(stream, false).unwrap();
5839
5840        assert_eq!(s.pipe.server.streams.blocked().len(), 0);
5841
5842        // The body must be larger than the stream window would allow
5843        let d = [42; 500];
5844        let mut off = 0;
5845
5846        let sent = s
5847            .server
5848            .send_body(&mut s.pipe.server, stream, &d, true)
5849            .unwrap();
5850        assert_eq!(sent, 25);
5851        off += sent;
5852
5853        // send_body wrote as much as it could (sent < size of buff).
5854        assert_eq!(s.pipe.server.streams.blocked().len(), 1);
5855        assert_eq!(
5856            s.server
5857                .send_body(&mut s.pipe.server, stream, &d[off..], true),
5858            Err(Error::Done)
5859        );
5860        assert_eq!(s.pipe.server.streams.blocked().len(), 1);
5861
5862        // Now read raw frames to see what the QUIC layer did
5863        let mut buf = [0; 65535];
5864        let (len, _) = s.pipe.server.send(&mut buf).unwrap();
5865
5866        let frames = decode_pkt(&mut s.pipe.client, &mut buf[..len]).unwrap();
5867
5868        let mut iter = frames.iter();
5869
5870        assert_eq!(
5871            iter.next(),
5872            Some(&crate::frame::Frame::StreamDataBlocked {
5873                stream_id: 0,
5874                limit: 80,
5875            })
5876        );
5877
5878        // At the server, after sending the STREAM_DATA_BLOCKED frame, we clear
5879        // the mark.
5880        assert_eq!(s.pipe.server.streams.blocked().len(), 0);
5881
5882        // Don't read any data from the client, so stream flow control is never
5883        // given back in the form of changing the stream's max offset.
5884        // Subsequent body send operations will still fail but no more
5885        // STREAM_DATA_BLOCKED frames should be submitted since the limit didn't
5886        // change. No frames means no packet to send.
5887        assert_eq!(
5888            s.server
5889                .send_body(&mut s.pipe.server, stream, &d[off..], true),
5890            Err(Error::Done)
5891        );
5892        assert_eq!(s.pipe.server.streams.blocked().len(), 0);
5893        assert_eq!(s.pipe.server.send(&mut buf), Err(crate::Error::Done));
5894
5895        // Now update the client's max offset manually.
5896        let frames = [crate::frame::Frame::MaxStreamData {
5897            stream_id: 0,
5898            max: 100,
5899        }];
5900
5901        let pkt_type = crate::packet::Type::Short;
5902        assert_eq!(
5903            s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
5904            Ok(39),
5905        );
5906
5907        let sent = s
5908            .server
5909            .send_body(&mut s.pipe.server, stream, &d[off..], true)
5910            .unwrap();
5911        assert_eq!(sent, 18);
5912
5913        // Same thing here...
5914        assert_eq!(s.pipe.server.streams.blocked().len(), 1);
5915        assert_eq!(
5916            s.server
5917                .send_body(&mut s.pipe.server, stream, &d[off..], true),
5918            Err(Error::Done)
5919        );
5920        assert_eq!(s.pipe.server.streams.blocked().len(), 1);
5921
5922        let (len, _) = s.pipe.server.send(&mut buf).unwrap();
5923
5924        let frames = decode_pkt(&mut s.pipe.client, &mut buf[..len]).unwrap();
5925
5926        let mut iter = frames.iter();
5927
5928        assert_eq!(
5929            iter.next(),
5930            Some(&crate::frame::Frame::StreamDataBlocked {
5931                stream_id: 0,
5932                limit: 100,
5933            })
5934        );
5935    }
5936
5937    #[test]
5938    /// Ensure stream doesn't hang due to small cwnd.
5939    fn send_body_stream_blocked_by_small_cwnd() {
5940        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5941        config
5942            .load_cert_chain_from_pem_file("examples/cert.crt")
5943            .unwrap();
5944        config
5945            .load_priv_key_from_pem_file("examples/cert.key")
5946            .unwrap();
5947        config.set_application_protos(&[b"h3"]).unwrap();
5948        config.set_initial_max_data(100000); // large connection-level flow control
5949        config.set_initial_max_stream_data_bidi_local(100000);
5950        config.set_initial_max_stream_data_bidi_remote(50000);
5951        config.set_initial_max_stream_data_uni(150);
5952        config.set_initial_max_streams_bidi(100);
5953        config.set_initial_max_streams_uni(5);
5954        config.verify_peer(false);
5955
5956        let h3_config = Config::new().unwrap();
5957
5958        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5959
5960        s.handshake().unwrap();
5961
5962        let (stream, req) = s.send_request(true).unwrap();
5963
5964        let ev_headers = Event::Headers {
5965            list: req,
5966            more_frames: false,
5967        };
5968
5969        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5970        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
5971
5972        let _ = s.send_response(stream, false).unwrap();
5973
5974        // Clear the writable stream queue.
5975        assert_eq!(s.pipe.server.stream_writable_next(), Some(3));
5976        assert_eq!(s.pipe.server.stream_writable_next(), Some(7));
5977        assert_eq!(s.pipe.server.stream_writable_next(), Some(11));
5978        assert_eq!(s.pipe.server.stream_writable_next(), Some(stream));
5979        assert_eq!(s.pipe.server.stream_writable_next(), None);
5980
5981        // The body must be larger than the cwnd would allow.
5982        let send_buf = [42; 80000];
5983
5984        let sent = s
5985            .server
5986            .send_body(&mut s.pipe.server, stream, &send_buf, true)
5987            .unwrap();
5988
5989        // send_body wrote as much as it could (sent < size of buff).
5990        assert_eq!(sent, 11995);
5991
5992        s.advance().ok();
5993
5994        // Client reads received headers and body.
5995        let mut recv_buf = [42; 80000];
5996        assert!(s.poll_client().is_ok());
5997        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
5998        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(11995));
5999
6000        s.advance().ok();
6001
6002        // Server send cap is smaller than remaining body buffer.
6003        assert!(s.pipe.server.tx_cap < send_buf.len() - sent);
6004
6005        // Once the server cwnd opens up, we can send more body.
6006        assert_eq!(s.pipe.server.stream_writable_next(), Some(0));
6007    }
6008
6009    #[test]
6010    /// Ensure stream doesn't hang due to small cwnd.
6011    fn send_body_stream_blocked_zero_length() {
6012        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6013        config
6014            .load_cert_chain_from_pem_file("examples/cert.crt")
6015            .unwrap();
6016        config
6017            .load_priv_key_from_pem_file("examples/cert.key")
6018            .unwrap();
6019        config.set_application_protos(&[b"h3"]).unwrap();
6020        config.set_initial_max_data(100000); // large connection-level flow control
6021        config.set_initial_max_stream_data_bidi_local(100000);
6022        config.set_initial_max_stream_data_bidi_remote(50000);
6023        config.set_initial_max_stream_data_uni(150);
6024        config.set_initial_max_streams_bidi(100);
6025        config.set_initial_max_streams_uni(5);
6026        config.verify_peer(false);
6027
6028        let h3_config = Config::new().unwrap();
6029
6030        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6031
6032        s.handshake().unwrap();
6033
6034        let (stream, req) = s.send_request(true).unwrap();
6035
6036        let ev_headers = Event::Headers {
6037            list: req,
6038            more_frames: false,
6039        };
6040
6041        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6042        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6043
6044        let _ = s.send_response(stream, false).unwrap();
6045
6046        // Clear the writable stream queue.
6047        assert_eq!(s.pipe.server.stream_writable_next(), Some(3));
6048        assert_eq!(s.pipe.server.stream_writable_next(), Some(7));
6049        assert_eq!(s.pipe.server.stream_writable_next(), Some(11));
6050        assert_eq!(s.pipe.server.stream_writable_next(), Some(stream));
6051        assert_eq!(s.pipe.server.stream_writable_next(), None);
6052
6053        // The body is large enough to fill the cwnd, except for enough bytes
6054        // for another DATA frame header (but no payload).
6055        let send_buf = [42; 11994];
6056
6057        let sent = s
6058            .server
6059            .send_body(&mut s.pipe.server, stream, &send_buf, false)
6060            .unwrap();
6061
6062        assert_eq!(sent, 11994);
6063
6064        // There is only enough capacity left for the DATA frame header, but
6065        // no payload.
6066        assert_eq!(s.pipe.server.stream_capacity(stream).unwrap(), 3);
6067        assert_eq!(
6068            s.server
6069                .send_body(&mut s.pipe.server, stream, &send_buf, false),
6070            Err(Error::Done)
6071        );
6072
6073        s.advance().ok();
6074
6075        // Client reads received headers and body.
6076        let mut recv_buf = [42; 80000];
6077        assert!(s.poll_client().is_ok());
6078        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
6079        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(11994));
6080
6081        s.advance().ok();
6082
6083        // Once the server cwnd opens up, we can send more body.
6084        assert_eq!(s.pipe.server.stream_writable_next(), Some(0));
6085    }
6086
6087    #[test]
6088    /// Test handling of 0-length DATA writes with and without fin.
6089    fn zero_length_data() {
6090        let mut s = Session::new().unwrap();
6091        s.handshake().unwrap();
6092
6093        let (stream, req) = s.send_request(false).unwrap();
6094
6095        assert_eq!(
6096            s.client.send_body(&mut s.pipe.client, 0, b"", false),
6097            Err(Error::Done)
6098        );
6099        assert_eq!(s.client.send_body(&mut s.pipe.client, 0, b"", true), Ok(0));
6100
6101        s.advance().ok();
6102
6103        let mut recv_buf = vec![0; 100];
6104
6105        let ev_headers = Event::Headers {
6106            list: req,
6107            more_frames: true,
6108        };
6109
6110        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6111
6112        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6113        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Err(Error::Done));
6114
6115        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6116        assert_eq!(s.poll_server(), Err(Error::Done));
6117
6118        let resp = s.send_response(stream, false).unwrap();
6119
6120        assert_eq!(
6121            s.server.send_body(&mut s.pipe.server, 0, b"", false),
6122            Err(Error::Done)
6123        );
6124        assert_eq!(s.server.send_body(&mut s.pipe.server, 0, b"", true), Ok(0));
6125
6126        s.advance().ok();
6127
6128        let ev_headers = Event::Headers {
6129            list: resp,
6130            more_frames: true,
6131        };
6132
6133        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
6134
6135        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
6136        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Err(Error::Done));
6137
6138        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
6139        assert_eq!(s.poll_client(), Err(Error::Done));
6140    }
6141
6142    #[test]
6143    /// Tests that blocked 0-length DATA writes are reported correctly.
6144    fn zero_length_data_blocked() {
6145        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6146        config
6147            .load_cert_chain_from_pem_file("examples/cert.crt")
6148            .unwrap();
6149        config
6150            .load_priv_key_from_pem_file("examples/cert.key")
6151            .unwrap();
6152        config.set_application_protos(&[b"h3"]).unwrap();
6153        config.set_initial_max_data(69);
6154        config.set_initial_max_stream_data_bidi_local(150);
6155        config.set_initial_max_stream_data_bidi_remote(150);
6156        config.set_initial_max_stream_data_uni(150);
6157        config.set_initial_max_streams_bidi(100);
6158        config.set_initial_max_streams_uni(5);
6159        config.verify_peer(false);
6160
6161        let h3_config = Config::new().unwrap();
6162
6163        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6164
6165        s.handshake().unwrap();
6166
6167        let req = vec![
6168            Header::new(b":method", b"GET"),
6169            Header::new(b":scheme", b"https"),
6170            Header::new(b":authority", b"quic.tech"),
6171            Header::new(b":path", b"/test"),
6172        ];
6173
6174        assert_eq!(
6175            s.client.send_request(&mut s.pipe.client, &req, false),
6176            Ok(0)
6177        );
6178
6179        assert_eq!(
6180            s.client.send_body(&mut s.pipe.client, 0, b"", true),
6181            Err(Error::Done)
6182        );
6183
6184        // Clear the writable stream queue.
6185        assert_eq!(s.pipe.client.stream_writable_next(), Some(2));
6186        assert_eq!(s.pipe.client.stream_writable_next(), Some(6));
6187        assert_eq!(s.pipe.client.stream_writable_next(), Some(10));
6188        assert_eq!(s.pipe.client.stream_writable_next(), None);
6189
6190        s.advance().ok();
6191
6192        // Once the server gives flow control credits back, we can send the body.
6193        assert_eq!(s.pipe.client.stream_writable_next(), Some(0));
6194        assert_eq!(s.client.send_body(&mut s.pipe.client, 0, b"", true), Ok(0));
6195    }
6196
6197    #[test]
6198    /// Tests that receiving an empty SETTINGS frame is handled and reported.
6199    fn empty_settings() {
6200        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6201        config
6202            .load_cert_chain_from_pem_file("examples/cert.crt")
6203            .unwrap();
6204        config
6205            .load_priv_key_from_pem_file("examples/cert.key")
6206            .unwrap();
6207        config.set_application_protos(&[b"h3"]).unwrap();
6208        config.set_initial_max_data(1500);
6209        config.set_initial_max_stream_data_bidi_local(150);
6210        config.set_initial_max_stream_data_bidi_remote(150);
6211        config.set_initial_max_stream_data_uni(150);
6212        config.set_initial_max_streams_bidi(5);
6213        config.set_initial_max_streams_uni(5);
6214        config.verify_peer(false);
6215        config.set_ack_delay_exponent(8);
6216        config.grease(false);
6217
6218        let h3_config = Config::new().unwrap();
6219        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6220
6221        s.handshake().unwrap();
6222
6223        assert!(s.client.peer_settings_raw().is_some());
6224        assert!(s.server.peer_settings_raw().is_some());
6225    }
6226
6227    #[test]
6228    /// Tests that receiving a H3_DATAGRAM setting is ok.
6229    fn dgram_setting() {
6230        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6231        config
6232            .load_cert_chain_from_pem_file("examples/cert.crt")
6233            .unwrap();
6234        config
6235            .load_priv_key_from_pem_file("examples/cert.key")
6236            .unwrap();
6237        config.set_application_protos(&[b"h3"]).unwrap();
6238        config.set_initial_max_data(70);
6239        config.set_initial_max_stream_data_bidi_local(150);
6240        config.set_initial_max_stream_data_bidi_remote(150);
6241        config.set_initial_max_stream_data_uni(150);
6242        config.set_initial_max_streams_bidi(100);
6243        config.set_initial_max_streams_uni(5);
6244        config.enable_dgram(true, 1000, 1000);
6245        config.verify_peer(false);
6246
6247        let h3_config = Config::new().unwrap();
6248
6249        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6250        assert_eq!(s.pipe.handshake(), Ok(()));
6251
6252        s.client.send_settings(&mut s.pipe.client).unwrap();
6253        assert_eq!(s.pipe.advance(), Ok(()));
6254
6255        // Before processing SETTINGS (via poll), HTTP/3 DATAGRAMS are not
6256        // enabled.
6257        assert!(!s.server.dgram_enabled_by_peer(&s.pipe.server));
6258
6259        // When everything is ok, poll returns Done and DATAGRAM is enabled.
6260        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::Done));
6261        assert!(s.server.dgram_enabled_by_peer(&s.pipe.server));
6262
6263        // Now detect things on the client
6264        s.server.send_settings(&mut s.pipe.server).unwrap();
6265        assert_eq!(s.pipe.advance(), Ok(()));
6266        assert!(!s.client.dgram_enabled_by_peer(&s.pipe.client));
6267        assert_eq!(s.client.poll(&mut s.pipe.client), Err(Error::Done));
6268        assert!(s.client.dgram_enabled_by_peer(&s.pipe.client));
6269    }
6270
6271    #[test]
6272    /// Tests that receiving a H3_DATAGRAM setting when no TP is set generates
6273    /// an error.
6274    fn dgram_setting_no_tp() {
6275        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6276        config
6277            .load_cert_chain_from_pem_file("examples/cert.crt")
6278            .unwrap();
6279        config
6280            .load_priv_key_from_pem_file("examples/cert.key")
6281            .unwrap();
6282        config.set_application_protos(&[b"h3"]).unwrap();
6283        config.set_initial_max_data(70);
6284        config.set_initial_max_stream_data_bidi_local(150);
6285        config.set_initial_max_stream_data_bidi_remote(150);
6286        config.set_initial_max_stream_data_uni(150);
6287        config.set_initial_max_streams_bidi(100);
6288        config.set_initial_max_streams_uni(5);
6289        config.verify_peer(false);
6290
6291        let h3_config = Config::new().unwrap();
6292
6293        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6294        assert_eq!(s.pipe.handshake(), Ok(()));
6295
6296        s.client.control_stream_id = Some(
6297            s.client
6298                .open_uni_stream(
6299                    &mut s.pipe.client,
6300                    stream::HTTP3_CONTROL_STREAM_TYPE_ID,
6301                )
6302                .unwrap(),
6303        );
6304
6305        let settings = frame::Frame::Settings {
6306            max_field_section_size: None,
6307            qpack_max_table_capacity: None,
6308            qpack_blocked_streams: None,
6309            connect_protocol_enabled: None,
6310            h3_datagram: Some(1),
6311            grease: None,
6312            additional_settings: Default::default(),
6313            raw: Default::default(),
6314        };
6315
6316        s.send_frame_client(settings, s.client.control_stream_id.unwrap(), false)
6317            .unwrap();
6318
6319        assert_eq!(s.pipe.advance(), Ok(()));
6320
6321        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::SettingsError));
6322    }
6323
6324    #[test]
6325    /// Tests that receiving SETTINGS with prohibited values generates an error.
6326    fn settings_h2_prohibited() {
6327        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6328        config
6329            .load_cert_chain_from_pem_file("examples/cert.crt")
6330            .unwrap();
6331        config
6332            .load_priv_key_from_pem_file("examples/cert.key")
6333            .unwrap();
6334        config.set_application_protos(&[b"h3"]).unwrap();
6335        config.set_initial_max_data(70);
6336        config.set_initial_max_stream_data_bidi_local(150);
6337        config.set_initial_max_stream_data_bidi_remote(150);
6338        config.set_initial_max_stream_data_uni(150);
6339        config.set_initial_max_streams_bidi(100);
6340        config.set_initial_max_streams_uni(5);
6341        config.verify_peer(false);
6342
6343        let h3_config = Config::new().unwrap();
6344
6345        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6346        assert_eq!(s.pipe.handshake(), Ok(()));
6347
6348        s.client.control_stream_id = Some(
6349            s.client
6350                .open_uni_stream(
6351                    &mut s.pipe.client,
6352                    stream::HTTP3_CONTROL_STREAM_TYPE_ID,
6353                )
6354                .unwrap(),
6355        );
6356
6357        s.server.control_stream_id = Some(
6358            s.server
6359                .open_uni_stream(
6360                    &mut s.pipe.server,
6361                    stream::HTTP3_CONTROL_STREAM_TYPE_ID,
6362                )
6363                .unwrap(),
6364        );
6365
6366        let frame_payload_len = 2u64;
6367        let settings = [
6368            frame::SETTINGS_FRAME_TYPE_ID as u8,
6369            frame_payload_len as u8,
6370            0x2, // 0x2 is a reserved setting type
6371            1,
6372        ];
6373
6374        s.send_arbitrary_stream_data_client(
6375            &settings,
6376            s.client.control_stream_id.unwrap(),
6377            false,
6378        )
6379        .unwrap();
6380
6381        s.send_arbitrary_stream_data_server(
6382            &settings,
6383            s.server.control_stream_id.unwrap(),
6384            false,
6385        )
6386        .unwrap();
6387
6388        assert_eq!(s.pipe.advance(), Ok(()));
6389
6390        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::SettingsError));
6391
6392        assert_eq!(s.client.poll(&mut s.pipe.client), Err(Error::SettingsError));
6393    }
6394
6395    #[test]
6396    /// Tests that setting SETTINGS with prohibited values generates an error.
6397    fn set_prohibited_additional_settings() {
6398        let mut h3_config = Config::new().unwrap();
6399        assert_eq!(
6400            h3_config.set_additional_settings(vec![(
6401                frame::SETTINGS_QPACK_MAX_TABLE_CAPACITY,
6402                43
6403            )]),
6404            Err(Error::SettingsError)
6405        );
6406        assert_eq!(
6407            h3_config.set_additional_settings(vec![(
6408                frame::SETTINGS_MAX_FIELD_SECTION_SIZE,
6409                43
6410            )]),
6411            Err(Error::SettingsError)
6412        );
6413        assert_eq!(
6414            h3_config.set_additional_settings(vec![(
6415                frame::SETTINGS_QPACK_BLOCKED_STREAMS,
6416                43
6417            )]),
6418            Err(Error::SettingsError)
6419        );
6420        assert_eq!(
6421            h3_config.set_additional_settings(vec![(
6422                frame::SETTINGS_ENABLE_CONNECT_PROTOCOL,
6423                43
6424            )]),
6425            Err(Error::SettingsError)
6426        );
6427        assert_eq!(
6428            h3_config
6429                .set_additional_settings(vec![(frame::SETTINGS_H3_DATAGRAM, 43)]),
6430            Err(Error::SettingsError)
6431        );
6432    }
6433
6434    #[test]
6435    /// Tests additional settings are actually exchanged by the peers.
6436    fn set_additional_settings() {
6437        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6438        config
6439            .load_cert_chain_from_pem_file("examples/cert.crt")
6440            .unwrap();
6441        config
6442            .load_priv_key_from_pem_file("examples/cert.key")
6443            .unwrap();
6444        config.set_application_protos(&[b"h3"]).unwrap();
6445        config.set_initial_max_data(70);
6446        config.set_initial_max_stream_data_bidi_local(150);
6447        config.set_initial_max_stream_data_bidi_remote(150);
6448        config.set_initial_max_stream_data_uni(150);
6449        config.set_initial_max_streams_bidi(100);
6450        config.set_initial_max_streams_uni(5);
6451        config.verify_peer(false);
6452        config.grease(false);
6453
6454        let mut h3_config = Config::new().unwrap();
6455        h3_config
6456            .set_additional_settings(vec![(42, 43), (44, 45)])
6457            .unwrap();
6458
6459        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6460        assert_eq!(s.pipe.handshake(), Ok(()));
6461
6462        assert_eq!(s.pipe.advance(), Ok(()));
6463
6464        s.client.send_settings(&mut s.pipe.client).unwrap();
6465        assert_eq!(s.pipe.advance(), Ok(()));
6466        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::Done));
6467
6468        s.server.send_settings(&mut s.pipe.server).unwrap();
6469        assert_eq!(s.pipe.advance(), Ok(()));
6470        assert_eq!(s.client.poll(&mut s.pipe.client), Err(Error::Done));
6471
6472        assert_eq!(
6473            s.server.peer_settings_raw(),
6474            Some(&[(42, 43), (44, 45)][..])
6475        );
6476        assert_eq!(
6477            s.client.peer_settings_raw(),
6478            Some(&[(42, 43), (44, 45)][..])
6479        );
6480    }
6481
6482    #[test]
6483    /// Send a single DATAGRAM.
6484    fn single_dgram() {
6485        let mut buf = [0; 65535];
6486        let mut s = Session::new().unwrap();
6487        s.handshake().unwrap();
6488
6489        // We'll send default data of 10 bytes on flow ID 0.
6490        let result = (11, 0, 1);
6491
6492        s.send_dgram_client(0).unwrap();
6493
6494        assert_eq!(s.poll_server(), Err(Error::Done));
6495        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6496
6497        s.send_dgram_server(0).unwrap();
6498        assert_eq!(s.poll_client(), Err(Error::Done));
6499        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6500    }
6501
6502    #[test]
6503    /// Send multiple DATAGRAMs.
6504    fn multiple_dgram() {
6505        let mut buf = [0; 65535];
6506        let mut s = Session::new().unwrap();
6507        s.handshake().unwrap();
6508
6509        // We'll send default data of 10 bytes on flow ID 0.
6510        let result = (11, 0, 1);
6511
6512        s.send_dgram_client(0).unwrap();
6513        s.send_dgram_client(0).unwrap();
6514        s.send_dgram_client(0).unwrap();
6515
6516        assert_eq!(s.poll_server(), Err(Error::Done));
6517        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6518        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6519        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6520        assert_eq!(s.recv_dgram_server(&mut buf), Err(Error::Done));
6521
6522        s.send_dgram_server(0).unwrap();
6523        s.send_dgram_server(0).unwrap();
6524        s.send_dgram_server(0).unwrap();
6525
6526        assert_eq!(s.poll_client(), Err(Error::Done));
6527        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6528        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6529        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6530        assert_eq!(s.recv_dgram_client(&mut buf), Err(Error::Done));
6531    }
6532
6533    #[test]
6534    /// Send more DATAGRAMs than the send queue allows.
6535    fn multiple_dgram_overflow() {
6536        let mut buf = [0; 65535];
6537        let mut s = Session::new().unwrap();
6538        s.handshake().unwrap();
6539
6540        // We'll send default data of 10 bytes on flow ID 0.
6541        let result = (11, 0, 1);
6542
6543        // Five DATAGRAMs
6544        s.send_dgram_client(0).unwrap();
6545        s.send_dgram_client(0).unwrap();
6546        s.send_dgram_client(0).unwrap();
6547        s.send_dgram_client(0).unwrap();
6548        s.send_dgram_client(0).unwrap();
6549
6550        // Only 3 independent DATAGRAMs to read events will fire.
6551        assert_eq!(s.poll_server(), Err(Error::Done));
6552        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6553        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6554        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6555        assert_eq!(s.recv_dgram_server(&mut buf), Err(Error::Done));
6556    }
6557
6558    #[test]
6559    /// Send a single DATAGRAM and request.
6560    fn poll_datagram_cycling_no_read() {
6561        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6562        config
6563            .load_cert_chain_from_pem_file("examples/cert.crt")
6564            .unwrap();
6565        config
6566            .load_priv_key_from_pem_file("examples/cert.key")
6567            .unwrap();
6568        config.set_application_protos(&[b"h3"]).unwrap();
6569        config.set_initial_max_data(1500);
6570        config.set_initial_max_stream_data_bidi_local(150);
6571        config.set_initial_max_stream_data_bidi_remote(150);
6572        config.set_initial_max_stream_data_uni(150);
6573        config.set_initial_max_streams_bidi(100);
6574        config.set_initial_max_streams_uni(5);
6575        config.verify_peer(false);
6576        config.enable_dgram(true, 100, 100);
6577
6578        let h3_config = Config::new().unwrap();
6579        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6580        s.handshake().unwrap();
6581
6582        // Send request followed by DATAGRAM on client side.
6583        let (stream, req) = s.send_request(false).unwrap();
6584
6585        s.send_body_client(stream, true).unwrap();
6586
6587        let ev_headers = Event::Headers {
6588            list: req,
6589            more_frames: true,
6590        };
6591
6592        s.send_dgram_client(0).unwrap();
6593
6594        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6595        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6596
6597        assert_eq!(s.poll_server(), Err(Error::Done));
6598    }
6599
6600    #[test]
6601    /// Send a single DATAGRAM and request.
6602    fn poll_datagram_single_read() {
6603        let mut buf = [0; 65535];
6604
6605        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6606        config
6607            .load_cert_chain_from_pem_file("examples/cert.crt")
6608            .unwrap();
6609        config
6610            .load_priv_key_from_pem_file("examples/cert.key")
6611            .unwrap();
6612        config.set_application_protos(&[b"h3"]).unwrap();
6613        config.set_initial_max_data(1500);
6614        config.set_initial_max_stream_data_bidi_local(150);
6615        config.set_initial_max_stream_data_bidi_remote(150);
6616        config.set_initial_max_stream_data_uni(150);
6617        config.set_initial_max_streams_bidi(100);
6618        config.set_initial_max_streams_uni(5);
6619        config.verify_peer(false);
6620        config.enable_dgram(true, 100, 100);
6621
6622        let h3_config = Config::new().unwrap();
6623        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6624        s.handshake().unwrap();
6625
6626        // We'll send default data of 10 bytes on flow ID 0.
6627        let result = (11, 0, 1);
6628
6629        // Send request followed by DATAGRAM on client side.
6630        let (stream, req) = s.send_request(false).unwrap();
6631
6632        let body = s.send_body_client(stream, true).unwrap();
6633
6634        let mut recv_buf = vec![0; body.len()];
6635
6636        let ev_headers = Event::Headers {
6637            list: req,
6638            more_frames: true,
6639        };
6640
6641        s.send_dgram_client(0).unwrap();
6642
6643        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6644        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6645
6646        assert_eq!(s.poll_server(), Err(Error::Done));
6647
6648        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6649
6650        assert_eq!(s.poll_server(), Err(Error::Done));
6651
6652        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
6653        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6654        assert_eq!(s.poll_server(), Err(Error::Done));
6655
6656        // Send response followed by DATAGRAM on server side
6657        let resp = s.send_response(stream, false).unwrap();
6658
6659        let body = s.send_body_server(stream, true).unwrap();
6660
6661        let mut recv_buf = vec![0; body.len()];
6662
6663        let ev_headers = Event::Headers {
6664            list: resp,
6665            more_frames: true,
6666        };
6667
6668        s.send_dgram_server(0).unwrap();
6669
6670        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
6671        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
6672
6673        assert_eq!(s.poll_client(), Err(Error::Done));
6674
6675        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6676
6677        assert_eq!(s.poll_client(), Err(Error::Done));
6678
6679        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
6680
6681        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
6682        assert_eq!(s.poll_client(), Err(Error::Done));
6683    }
6684
6685    #[test]
6686    /// Send multiple DATAGRAMs and requests.
6687    fn poll_datagram_multi_read() {
6688        let mut buf = [0; 65535];
6689
6690        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6691        config
6692            .load_cert_chain_from_pem_file("examples/cert.crt")
6693            .unwrap();
6694        config
6695            .load_priv_key_from_pem_file("examples/cert.key")
6696            .unwrap();
6697        config.set_application_protos(&[b"h3"]).unwrap();
6698        config.set_initial_max_data(1500);
6699        config.set_initial_max_stream_data_bidi_local(150);
6700        config.set_initial_max_stream_data_bidi_remote(150);
6701        config.set_initial_max_stream_data_uni(150);
6702        config.set_initial_max_streams_bidi(100);
6703        config.set_initial_max_streams_uni(5);
6704        config.verify_peer(false);
6705        config.enable_dgram(true, 100, 100);
6706
6707        let h3_config = Config::new().unwrap();
6708        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6709        s.handshake().unwrap();
6710
6711        // 10 bytes on flow ID 0 and 2.
6712        let flow_0_result = (11, 0, 1);
6713        let flow_2_result = (11, 2, 1);
6714
6715        // Send requests followed by DATAGRAMs on client side.
6716        let (stream, req) = s.send_request(false).unwrap();
6717
6718        let body = s.send_body_client(stream, true).unwrap();
6719
6720        let mut recv_buf = vec![0; body.len()];
6721
6722        let ev_headers = Event::Headers {
6723            list: req,
6724            more_frames: true,
6725        };
6726
6727        s.send_dgram_client(0).unwrap();
6728        s.send_dgram_client(0).unwrap();
6729        s.send_dgram_client(0).unwrap();
6730        s.send_dgram_client(0).unwrap();
6731        s.send_dgram_client(0).unwrap();
6732        s.send_dgram_client(2).unwrap();
6733        s.send_dgram_client(2).unwrap();
6734        s.send_dgram_client(2).unwrap();
6735        s.send_dgram_client(2).unwrap();
6736        s.send_dgram_client(2).unwrap();
6737
6738        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6739        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6740
6741        assert_eq!(s.poll_server(), Err(Error::Done));
6742
6743        // Second cycle, start to read
6744        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6745        assert_eq!(s.poll_server(), Err(Error::Done));
6746        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6747        assert_eq!(s.poll_server(), Err(Error::Done));
6748        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6749        assert_eq!(s.poll_server(), Err(Error::Done));
6750
6751        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
6752        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6753
6754        assert_eq!(s.poll_server(), Err(Error::Done));
6755
6756        // Third cycle.
6757        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6758        assert_eq!(s.poll_server(), Err(Error::Done));
6759        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6760        assert_eq!(s.poll_server(), Err(Error::Done));
6761        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6762        assert_eq!(s.poll_server(), Err(Error::Done));
6763        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6764        assert_eq!(s.poll_server(), Err(Error::Done));
6765        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6766        assert_eq!(s.poll_server(), Err(Error::Done));
6767        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6768        assert_eq!(s.poll_server(), Err(Error::Done));
6769        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6770        assert_eq!(s.poll_server(), Err(Error::Done));
6771
6772        // Send response followed by DATAGRAM on server side
6773        let resp = s.send_response(stream, false).unwrap();
6774
6775        let body = s.send_body_server(stream, true).unwrap();
6776
6777        let mut recv_buf = vec![0; body.len()];
6778
6779        let ev_headers = Event::Headers {
6780            list: resp,
6781            more_frames: true,
6782        };
6783
6784        s.send_dgram_server(0).unwrap();
6785        s.send_dgram_server(0).unwrap();
6786        s.send_dgram_server(0).unwrap();
6787        s.send_dgram_server(0).unwrap();
6788        s.send_dgram_server(0).unwrap();
6789        s.send_dgram_server(2).unwrap();
6790        s.send_dgram_server(2).unwrap();
6791        s.send_dgram_server(2).unwrap();
6792        s.send_dgram_server(2).unwrap();
6793        s.send_dgram_server(2).unwrap();
6794
6795        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
6796        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
6797
6798        assert_eq!(s.poll_client(), Err(Error::Done));
6799
6800        // Second cycle, start to read
6801        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
6802        assert_eq!(s.poll_client(), Err(Error::Done));
6803        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
6804        assert_eq!(s.poll_client(), Err(Error::Done));
6805        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
6806        assert_eq!(s.poll_client(), Err(Error::Done));
6807
6808        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
6809        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
6810
6811        assert_eq!(s.poll_client(), Err(Error::Done));
6812
6813        // Third cycle.
6814        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
6815        assert_eq!(s.poll_client(), Err(Error::Done));
6816        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
6817        assert_eq!(s.poll_client(), Err(Error::Done));
6818        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
6819        assert_eq!(s.poll_client(), Err(Error::Done));
6820        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
6821        assert_eq!(s.poll_client(), Err(Error::Done));
6822        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
6823        assert_eq!(s.poll_client(), Err(Error::Done));
6824        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
6825        assert_eq!(s.poll_client(), Err(Error::Done));
6826        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
6827        assert_eq!(s.poll_client(), Err(Error::Done));
6828    }
6829
6830    #[test]
6831    /// Tests that the Finished event is not issued for streams of unknown type
6832    /// (e.g. GREASE).
6833    fn finished_is_for_requests() {
6834        let mut s = Session::new().unwrap();
6835        s.handshake().unwrap();
6836
6837        assert_eq!(s.poll_client(), Err(Error::Done));
6838        assert_eq!(s.poll_server(), Err(Error::Done));
6839
6840        assert_eq!(s.client.open_grease_stream(&mut s.pipe.client), Ok(()));
6841        assert_eq!(s.pipe.advance(), Ok(()));
6842
6843        assert_eq!(s.poll_client(), Err(Error::Done));
6844        assert_eq!(s.poll_server(), Err(Error::Done));
6845    }
6846
6847    #[test]
6848    /// Tests that streams are marked as finished only once.
6849    fn finished_once() {
6850        let mut s = Session::new().unwrap();
6851        s.handshake().unwrap();
6852
6853        let (stream, req) = s.send_request(false).unwrap();
6854        let body = s.send_body_client(stream, true).unwrap();
6855
6856        let mut recv_buf = vec![0; body.len()];
6857
6858        let ev_headers = Event::Headers {
6859            list: req,
6860            more_frames: true,
6861        };
6862
6863        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6864        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6865
6866        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
6867        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6868
6869        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Err(Error::Done));
6870        assert_eq!(s.poll_server(), Err(Error::Done));
6871    }
6872
6873    #[test]
6874    /// Tests that the Data event is properly re-armed.
6875    fn data_event_rearm() {
6876        let bytes = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
6877
6878        let mut s = Session::new().unwrap();
6879        s.handshake().unwrap();
6880
6881        let (r1_id, r1_hdrs) = s.send_request(false).unwrap();
6882
6883        let mut recv_buf = vec![0; bytes.len()];
6884
6885        let r1_ev_headers = Event::Headers {
6886            list: r1_hdrs,
6887            more_frames: true,
6888        };
6889
6890        // Manually send an incomplete DATA frame (i.e. the frame size is longer
6891        // than the actual data sent).
6892        {
6893            let mut d = [42; 10];
6894            let mut b = octets::OctetsMut::with_slice(&mut d);
6895
6896            b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
6897            b.put_varint(bytes.len() as u64).unwrap();
6898            let off = b.off();
6899            s.pipe.client.stream_send(r1_id, &d[..off], false).unwrap();
6900
6901            assert_eq!(
6902                s.pipe.client.stream_send(r1_id, &bytes[..5], false),
6903                Ok(5)
6904            );
6905
6906            s.advance().ok();
6907        }
6908
6909        assert_eq!(s.poll_server(), Ok((r1_id, r1_ev_headers)));
6910        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
6911        assert_eq!(s.poll_server(), Err(Error::Done));
6912
6913        // Read the available body data.
6914        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(5));
6915
6916        // Send the remaining DATA payload.
6917        assert_eq!(s.pipe.client.stream_send(r1_id, &bytes[5..], false), Ok(5));
6918        s.advance().ok();
6919
6920        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
6921        assert_eq!(s.poll_server(), Err(Error::Done));
6922
6923        // Read the rest of the body data.
6924        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(5));
6925        assert_eq!(s.poll_server(), Err(Error::Done));
6926
6927        // Send more data.
6928        let r1_body = s.send_body_client(r1_id, false).unwrap();
6929
6930        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
6931        assert_eq!(s.poll_server(), Err(Error::Done));
6932
6933        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(r1_body.len()));
6934
6935        // Send a new request to ensure cross-stream events don't break rearming.
6936        let (r2_id, r2_hdrs) = s.send_request(false).unwrap();
6937        let r2_ev_headers = Event::Headers {
6938            list: r2_hdrs,
6939            more_frames: true,
6940        };
6941        let r2_body = s.send_body_client(r2_id, false).unwrap();
6942
6943        s.advance().ok();
6944
6945        assert_eq!(s.poll_server(), Ok((r2_id, r2_ev_headers)));
6946        assert_eq!(s.poll_server(), Ok((r2_id, Event::Data)));
6947        assert_eq!(s.recv_body_server(r2_id, &mut recv_buf), Ok(r2_body.len()));
6948        assert_eq!(s.poll_server(), Err(Error::Done));
6949
6950        // Send more data on request 1, then trailing HEADERS.
6951        let r1_body = s.send_body_client(r1_id, false).unwrap();
6952
6953        let trailers = vec![Header::new(b"hello", b"world")];
6954
6955        s.client
6956            .send_headers(&mut s.pipe.client, r1_id, &trailers, true)
6957            .unwrap();
6958
6959        let r1_ev_trailers = Event::Headers {
6960            list: trailers.clone(),
6961            more_frames: false,
6962        };
6963
6964        s.advance().ok();
6965
6966        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
6967        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(r1_body.len()));
6968
6969        assert_eq!(s.poll_server(), Ok((r1_id, r1_ev_trailers)));
6970        assert_eq!(s.poll_server(), Ok((r1_id, Event::Finished)));
6971        assert_eq!(s.poll_server(), Err(Error::Done));
6972
6973        // Send more data on request 2, then trailing HEADERS.
6974        let r2_body = s.send_body_client(r2_id, false).unwrap();
6975
6976        s.client
6977            .send_headers(&mut s.pipe.client, r2_id, &trailers, false)
6978            .unwrap();
6979
6980        let r2_ev_trailers = Event::Headers {
6981            list: trailers,
6982            more_frames: true,
6983        };
6984
6985        s.advance().ok();
6986
6987        assert_eq!(s.poll_server(), Ok((r2_id, Event::Data)));
6988        assert_eq!(s.recv_body_server(r2_id, &mut recv_buf), Ok(r2_body.len()));
6989        assert_eq!(s.poll_server(), Ok((r2_id, r2_ev_trailers)));
6990        assert_eq!(s.poll_server(), Err(Error::Done));
6991
6992        let (r3_id, r3_hdrs) = s.send_request(false).unwrap();
6993
6994        let r3_ev_headers = Event::Headers {
6995            list: r3_hdrs,
6996            more_frames: true,
6997        };
6998
6999        // Manually send an incomplete DATA frame (i.e. only the header is sent).
7000        {
7001            let mut d = [42; 10];
7002            let mut b = octets::OctetsMut::with_slice(&mut d);
7003
7004            b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
7005            b.put_varint(bytes.len() as u64).unwrap();
7006            let off = b.off();
7007            s.pipe.client.stream_send(r3_id, &d[..off], false).unwrap();
7008
7009            s.advance().ok();
7010        }
7011
7012        assert_eq!(s.poll_server(), Ok((r3_id, r3_ev_headers)));
7013        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
7014        assert_eq!(s.poll_server(), Err(Error::Done));
7015
7016        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Err(Error::Done));
7017
7018        assert_eq!(s.pipe.client.stream_send(r3_id, &bytes[..5], false), Ok(5));
7019
7020        s.advance().ok();
7021
7022        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
7023        assert_eq!(s.poll_server(), Err(Error::Done));
7024
7025        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Ok(5));
7026
7027        assert_eq!(s.pipe.client.stream_send(r3_id, &bytes[5..], false), Ok(5));
7028        s.advance().ok();
7029
7030        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
7031        assert_eq!(s.poll_server(), Err(Error::Done));
7032
7033        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Ok(5));
7034
7035        // Buffer multiple data frames.
7036        let body = s.send_body_client(r3_id, false).unwrap();
7037        s.send_body_client(r3_id, false).unwrap();
7038        s.send_body_client(r3_id, false).unwrap();
7039
7040        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
7041        assert_eq!(s.poll_server(), Err(Error::Done));
7042
7043        {
7044            let mut d = [42; 10];
7045            let mut b = octets::OctetsMut::with_slice(&mut d);
7046
7047            b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
7048            b.put_varint(0).unwrap();
7049            let off = b.off();
7050            s.pipe.client.stream_send(r3_id, &d[..off], true).unwrap();
7051
7052            s.advance().ok();
7053        }
7054
7055        let mut recv_buf = vec![0; bytes.len() * 3];
7056
7057        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Ok(body.len() * 3));
7058    }
7059
7060    #[test]
7061    /// Tests that the Datagram event is properly re-armed.
7062    fn dgram_event_rearm() {
7063        let mut buf = [0; 65535];
7064
7065        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
7066        config
7067            .load_cert_chain_from_pem_file("examples/cert.crt")
7068            .unwrap();
7069        config
7070            .load_priv_key_from_pem_file("examples/cert.key")
7071            .unwrap();
7072        config.set_application_protos(&[b"h3"]).unwrap();
7073        config.set_initial_max_data(1500);
7074        config.set_initial_max_stream_data_bidi_local(150);
7075        config.set_initial_max_stream_data_bidi_remote(150);
7076        config.set_initial_max_stream_data_uni(150);
7077        config.set_initial_max_streams_bidi(100);
7078        config.set_initial_max_streams_uni(5);
7079        config.verify_peer(false);
7080        config.enable_dgram(true, 100, 100);
7081
7082        let h3_config = Config::new().unwrap();
7083        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
7084        s.handshake().unwrap();
7085
7086        // 10 bytes on flow ID 0 and 2.
7087        let flow_0_result = (11, 0, 1);
7088        let flow_2_result = (11, 2, 1);
7089
7090        // Send requests followed by DATAGRAMs on client side.
7091        let (stream, req) = s.send_request(false).unwrap();
7092
7093        let body = s.send_body_client(stream, true).unwrap();
7094
7095        let mut recv_buf = vec![0; body.len()];
7096
7097        let ev_headers = Event::Headers {
7098            list: req,
7099            more_frames: true,
7100        };
7101
7102        s.send_dgram_client(0).unwrap();
7103        s.send_dgram_client(0).unwrap();
7104        s.send_dgram_client(2).unwrap();
7105        s.send_dgram_client(2).unwrap();
7106
7107        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7108        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
7109
7110        assert_eq!(s.poll_server(), Err(Error::Done));
7111        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
7112
7113        assert_eq!(s.poll_server(), Err(Error::Done));
7114        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
7115
7116        assert_eq!(s.poll_server(), Err(Error::Done));
7117        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
7118
7119        assert_eq!(s.poll_server(), Err(Error::Done));
7120        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
7121
7122        assert_eq!(s.poll_server(), Err(Error::Done));
7123
7124        s.send_dgram_client(0).unwrap();
7125        s.send_dgram_client(2).unwrap();
7126
7127        assert_eq!(s.poll_server(), Err(Error::Done));
7128
7129        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
7130        assert_eq!(s.poll_server(), Err(Error::Done));
7131
7132        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
7133        assert_eq!(s.poll_server(), Err(Error::Done));
7134
7135        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
7136        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
7137    }
7138
7139    #[test]
7140    fn reset_stream() {
7141        let mut buf = [0; 65535];
7142
7143        let mut s = Session::new().unwrap();
7144        s.handshake().unwrap();
7145
7146        // Client sends request.
7147        let (stream, req) = s.send_request(false).unwrap();
7148
7149        let ev_headers = Event::Headers {
7150            list: req,
7151            more_frames: true,
7152        };
7153
7154        // Server sends response and closes stream.
7155        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7156        assert_eq!(s.poll_server(), Err(Error::Done));
7157
7158        let resp = s.send_response(stream, true).unwrap();
7159
7160        let ev_headers = Event::Headers {
7161            list: resp,
7162            more_frames: false,
7163        };
7164
7165        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
7166        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
7167        assert_eq!(s.poll_client(), Err(Error::Done));
7168
7169        // Client sends RESET_STREAM, closing stream.
7170        let frames = [crate::frame::Frame::ResetStream {
7171            stream_id: stream,
7172            error_code: 42,
7173            final_size: 68,
7174        }];
7175
7176        let pkt_type = crate::packet::Type::Short;
7177        assert_eq!(
7178            s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
7179            Ok(39)
7180        );
7181
7182        // Server issues Reset event for the stream.
7183        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(42))));
7184        assert_eq!(s.poll_server(), Err(Error::Done));
7185
7186        // Sending RESET_STREAM again shouldn't trigger another Reset event.
7187        assert_eq!(
7188            s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
7189            Ok(39)
7190        );
7191
7192        assert_eq!(s.poll_server(), Err(Error::Done));
7193    }
7194
7195    #[test]
7196    fn reset_finished_at_server() {
7197        let mut s = Session::new().unwrap();
7198        s.handshake().unwrap();
7199
7200        // Client sends HEADERS and doesn't fin
7201        let (stream, _req) = s.send_request(false).unwrap();
7202
7203        // ..then Client sends RESET_STREAM
7204        assert_eq!(
7205            s.pipe.client.stream_shutdown(0, crate::Shutdown::Write, 0),
7206            Ok(())
7207        );
7208
7209        assert_eq!(s.pipe.advance(), Ok(()));
7210
7211        // Server receives just a reset
7212        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(0))));
7213        assert_eq!(s.poll_server(), Err(Error::Done));
7214
7215        // Client sends HEADERS and fin
7216        let (stream, req) = s.send_request(true).unwrap();
7217
7218        // ..then Client sends RESET_STREAM
7219        assert_eq!(
7220            s.pipe.client.stream_shutdown(4, crate::Shutdown::Write, 0),
7221            Ok(())
7222        );
7223
7224        let ev_headers = Event::Headers {
7225            list: req,
7226            more_frames: false,
7227        };
7228
7229        // Server receives headers and fin.
7230        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7231        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
7232        assert_eq!(s.poll_server(), Err(Error::Done));
7233    }
7234
7235    #[test]
7236    fn reset_finished_at_server_with_data_pending() {
7237        let mut s = Session::new().unwrap();
7238        s.handshake().unwrap();
7239
7240        // Client sends HEADERS and doesn't fin.
7241        let (stream, req) = s.send_request(false).unwrap();
7242
7243        assert!(s.send_body_client(stream, false).is_ok());
7244
7245        assert_eq!(s.pipe.advance(), Ok(()));
7246
7247        let ev_headers = Event::Headers {
7248            list: req,
7249            more_frames: true,
7250        };
7251
7252        // Server receives headers and data...
7253        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7254        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
7255
7256        // ..then Client sends RESET_STREAM.
7257        assert_eq!(
7258            s.pipe
7259                .client
7260                .stream_shutdown(stream, crate::Shutdown::Write, 0),
7261            Ok(())
7262        );
7263
7264        assert_eq!(s.pipe.advance(), Ok(()));
7265
7266        // Server receives the reset and there are no more readable streams.
7267        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(0))));
7268        assert_eq!(s.poll_server(), Err(Error::Done));
7269        assert_eq!(s.pipe.server.readable().len(), 0);
7270    }
7271
7272    #[test]
7273    fn reset_finished_at_client() {
7274        let mut buf = [0; 65535];
7275        let mut s = Session::new().unwrap();
7276        s.handshake().unwrap();
7277
7278        // Client sends HEADERS and doesn't fin
7279        let (stream, req) = s.send_request(false).unwrap();
7280
7281        let ev_headers = Event::Headers {
7282            list: req,
7283            more_frames: true,
7284        };
7285
7286        // Server receives headers.
7287        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7288        assert_eq!(s.poll_server(), Err(Error::Done));
7289
7290        // Server sends response and doesn't fin
7291        s.send_response(stream, false).unwrap();
7292
7293        assert_eq!(s.pipe.advance(), Ok(()));
7294
7295        // .. then Server sends RESET_STREAM
7296        assert_eq!(
7297            s.pipe
7298                .server
7299                .stream_shutdown(stream, crate::Shutdown::Write, 0),
7300            Ok(())
7301        );
7302
7303        assert_eq!(s.pipe.advance(), Ok(()));
7304
7305        // Client receives Reset only
7306        assert_eq!(s.poll_client(), Ok((stream, Event::Reset(0))));
7307        assert_eq!(s.poll_server(), Err(Error::Done));
7308
7309        // Client sends headers and fin.
7310        let (stream, req) = s.send_request(true).unwrap();
7311
7312        let ev_headers = Event::Headers {
7313            list: req,
7314            more_frames: false,
7315        };
7316
7317        // Server receives headers and fin.
7318        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7319        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
7320        assert_eq!(s.poll_server(), Err(Error::Done));
7321
7322        // Server sends response and fin
7323        let resp = s.send_response(stream, true).unwrap();
7324
7325        assert_eq!(s.pipe.advance(), Ok(()));
7326
7327        // ..then Server sends RESET_STREAM
7328        let frames = [crate::frame::Frame::ResetStream {
7329            stream_id: stream,
7330            error_code: 42,
7331            final_size: 68,
7332        }];
7333
7334        let pkt_type = crate::packet::Type::Short;
7335        assert_eq!(
7336            s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
7337            Ok(39)
7338        );
7339
7340        assert_eq!(s.pipe.advance(), Ok(()));
7341
7342        let ev_headers = Event::Headers {
7343            list: resp,
7344            more_frames: false,
7345        };
7346
7347        // Client receives headers and fin.
7348        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
7349        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
7350        assert_eq!(s.poll_client(), Err(Error::Done));
7351    }
7352}
7353
7354#[cfg(feature = "ffi")]
7355mod ffi;
7356#[cfg(feature = "internal")]
7357#[doc(hidden)]
7358pub mod frame;
7359#[cfg(not(feature = "internal"))]
7360mod frame;
7361#[doc(hidden)]
7362pub mod qpack;
7363mod stream;