Skip to main content

quiche/h3/
mod.rs

1// Copyright (C) 2019, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! HTTP/3 wire protocol and QPACK implementation.
28//!
29//! This module provides a high level API for sending and receiving HTTP/3
30//! requests and responses on top of the QUIC transport protocol.
31//!
32//! ## Connection setup
33//!
34//! HTTP/3 connections require a QUIC transport-layer connection, see
35//! [Connection setup] for a full description of the setup process.
36//!
37//! To use HTTP/3, the QUIC connection must be configured with a suitable
38//! Application Layer Protocol Negotiation (ALPN) Protocol ID:
39//!
40//! ```
41//! let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
42//! config.set_application_protos(quiche::h3::APPLICATION_PROTOCOL)?;
43//! # Ok::<(), quiche::Error>(())
44//! ```
45//!
46//! The QUIC handshake is driven by [sending] and [receiving] QUIC packets.
47//!
48//! Once the handshake has completed, the first step in establishing an HTTP/3
49//! connection is creating its configuration object:
50//!
51//! ```
52//! let h3_config = quiche::h3::Config::new()?;
53//! # Ok::<(), quiche::h3::Error>(())
54//! ```
55//!
56//! HTTP/3 client and server connections are both created using the
57//! [`with_transport()`] function, the role is inferred from the type of QUIC
58//! connection:
59//!
60//! ```no_run
61//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
62//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
63//! # let peer = "127.0.0.1:1234".parse().unwrap();
64//! # let local = "127.0.0.1:4321".parse().unwrap();
65//! # let mut conn = quiche::accept(&scid, None, local, peer, &mut config).unwrap();
66//! # let h3_config = quiche::h3::Config::new()?;
67//! let h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
68//! # Ok::<(), quiche::h3::Error>(())
69//! ```
70//!
71//! ## Sending a request
72//!
73//! An HTTP/3 client can send a request by using the connection's
74//! [`send_request()`] method to queue request headers; [sending] QUIC packets
75//! causes the requests to get sent to the peer:
76//!
77//! ```no_run
78//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
79//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
80//! # let peer = "127.0.0.1:1234".parse().unwrap();
81//! # let local = "127.0.0.1:4321".parse().unwrap();
82//! # let mut conn = quiche::connect(None, &scid, local, peer, &mut config).unwrap();
83//! # let h3_config = quiche::h3::Config::new()?;
84//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
85//! let req = vec![
86//!     quiche::h3::Header::new(b":method", b"GET"),
87//!     quiche::h3::Header::new(b":scheme", b"https"),
88//!     quiche::h3::Header::new(b":authority", b"quic.tech"),
89//!     quiche::h3::Header::new(b":path", b"/"),
90//!     quiche::h3::Header::new(b"user-agent", b"quiche"),
91//! ];
92//!
93//! h3_conn.send_request(&mut conn, &req, true)?;
94//! # Ok::<(), quiche::h3::Error>(())
95//! ```
96//!
97//! An HTTP/3 client can send a request with additional body data by using
98//! the connection's [`send_body()`] method:
99//!
100//! ```no_run
101//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
102//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
103//! # let peer = "127.0.0.1:1234".parse().unwrap();
104//! # let local = "127.0.0.1:4321".parse().unwrap();
105//! # let mut conn = quiche::connect(None, &scid, local, peer, &mut config).unwrap();
106//! # let h3_config = quiche::h3::Config::new()?;
107//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
108//! let req = vec![
109//!     quiche::h3::Header::new(b":method", b"GET"),
110//!     quiche::h3::Header::new(b":scheme", b"https"),
111//!     quiche::h3::Header::new(b":authority", b"quic.tech"),
112//!     quiche::h3::Header::new(b":path", b"/"),
113//!     quiche::h3::Header::new(b"user-agent", b"quiche"),
114//! ];
115//!
116//! let stream_id = h3_conn.send_request(&mut conn, &req, false)?;
117//! h3_conn.send_body(&mut conn, stream_id, b"Hello World!", true)?;
118//! # Ok::<(), quiche::h3::Error>(())
119//! ```
120//!
121//! ## Handling requests and responses
122//!
123//! After [receiving] QUIC packets, HTTP/3 data is processed using the
124//! connection's [`poll()`] method. On success, this returns an [`Event`] object
125//! and an ID corresponding to the stream where the `Event` originated.
126//!
127//! An HTTP/3 server uses [`poll()`] to read requests and responds to them using
128//! [`send_response()`] and [`send_body()`]:
129//!
130//! ```no_run
131//! use quiche::h3::NameValue;
132//!
133//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
134//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
135//! # let peer = "127.0.0.1:1234".parse().unwrap();
136//! # let local = "127.0.0.1:1234".parse().unwrap();
137//! # let mut conn = quiche::accept(&scid, None, local, peer, &mut config).unwrap();
138//! # let h3_config = quiche::h3::Config::new()?;
139//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
140//! loop {
141//!     match h3_conn.poll(&mut conn) {
142//!         Ok((stream_id, quiche::h3::Event::Headers{list, more_frames})) => {
143//!             let mut headers = list.into_iter();
144//!
145//!             // Look for the request's method.
146//!             let method = headers.find(|h| h.name() == b":method").unwrap();
147//!
148//!             // Look for the request's path.
149//!             let path = headers.find(|h| h.name() == b":path").unwrap();
150//!
151//!             if method.value() == b"GET" && path.value() == b"/" {
152//!                 let resp = vec![
153//!                     quiche::h3::Header::new(b":status", 200.to_string().as_bytes()),
154//!                     quiche::h3::Header::new(b"server", b"quiche"),
155//!                 ];
156//!
157//!                 h3_conn.send_response(&mut conn, stream_id, &resp, false)?;
158//!                 h3_conn.send_body(&mut conn, stream_id, b"Hello World!", true)?;
159//!             }
160//!         },
161//!
162//!         Ok((stream_id, quiche::h3::Event::Data)) => {
163//!             // Request body data, handle it.
164//!             # return Ok(());
165//!         },
166//!
167//!         Ok((stream_id, quiche::h3::Event::Finished)) => {
168//!             // Peer terminated stream, handle it.
169//!         },
170//!
171//!         Ok((stream_id, quiche::h3::Event::Reset(err))) => {
172//!             // Peer reset the stream, handle it.
173//!         },
174//!
175//!         Ok((_flow_id, quiche::h3::Event::PriorityUpdate)) => (),
176//!
177//!         Ok((goaway_id, quiche::h3::Event::GoAway)) => {
178//!              // Peer signalled it is going away, handle it.
179//!         },
180//!
181//!         Err(quiche::h3::Error::Done) => {
182//!             // Done reading.
183//!             break;
184//!         },
185//!
186//!         Err(e) => {
187//!             // An error occurred, handle it.
188//!             break;
189//!         },
190//!     }
191//! }
192//! # Ok::<(), quiche::h3::Error>(())
193//! ```
194//!
195//! An HTTP/3 client uses [`poll()`] to read responses:
196//!
197//! ```no_run
198//! use quiche::h3::NameValue;
199//!
200//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
201//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
202//! # let peer = "127.0.0.1:1234".parse().unwrap();
203//! # let local = "127.0.0.1:1234".parse().unwrap();
204//! # let mut conn = quiche::connect(None, &scid, local, peer, &mut config).unwrap();
205//! # let h3_config = quiche::h3::Config::new()?;
206//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
207//! loop {
208//!     match h3_conn.poll(&mut conn) {
209//!         Ok((stream_id, quiche::h3::Event::Headers{list, more_frames})) => {
210//!             let status = list.iter().find(|h| h.name() == b":status").unwrap();
211//!             println!("Received {} response on stream {}",
212//!                      std::str::from_utf8(status.value()).unwrap(),
213//!                      stream_id);
214//!         },
215//!
216//!         Ok((stream_id, quiche::h3::Event::Data)) => {
217//!             let mut body = vec![0; 4096];
218//!
219//!             // Consume all body data received on the stream.
220//!             while let Ok(read) =
221//!                 h3_conn.recv_body(&mut conn, stream_id, &mut body)
222//!             {
223//!                 println!("Received {} bytes of payload on stream {}",
224//!                          read, stream_id);
225//!             }
226//!         },
227//!
228//!         Ok((stream_id, quiche::h3::Event::Finished)) => {
229//!             // Peer terminated stream, handle it.
230//!         },
231//!
232//!         Ok((stream_id, quiche::h3::Event::Reset(err))) => {
233//!             // Peer reset the stream, handle it.
234//!         },
235//!
236//!         Ok((_prioritized_element_id, quiche::h3::Event::PriorityUpdate)) => (),
237//!
238//!         Ok((goaway_id, quiche::h3::Event::GoAway)) => {
239//!              // Peer signalled it is going away, handle it.
240//!         },
241//!
242//!         Err(quiche::h3::Error::Done) => {
243//!             // Done reading.
244//!             break;
245//!         },
246//!
247//!         Err(e) => {
248//!             // An error occurred, handle it.
249//!             break;
250//!         },
251//!     }
252//! }
253//! # Ok::<(), quiche::h3::Error>(())
254//! ```
255//!
256//! ## Detecting end of request or response
257//!
258//! A single HTTP/3 request or response may consist of several HEADERS and DATA
259//! frames; it is finished when the QUIC stream is closed. Calling [`poll()`]
//! repeatedly will generate an [`Event`] for each of these. The application may
//! use these events to do additional HTTP semantic validation.
262//!
263//! ## HTTP/3 protocol errors
264//!
265//! Quiche is responsible for managing the HTTP/3 connection, ensuring it is in
266//! a correct state and validating all messages received by a peer. This mainly
267//! takes place in the [`poll()`] method. If an HTTP/3 error occurs, quiche will
268//! close the connection and send an appropriate CONNECTION_CLOSE frame to the
269//! peer. An [`Error`] is returned to the application so that it can perform any
270//! required tidy up such as closing sockets.
271//!
272//! [`application_proto()`]: ../struct.Connection.html#method.application_proto
273//! [`stream_finished()`]: ../struct.Connection.html#method.stream_finished
274//! [Connection setup]: ../index.html#connection-setup
275//! [sending]: ../index.html#generating-outgoing-packets
276//! [receiving]: ../index.html#handling-incoming-packets
277//! [`with_transport()`]: struct.Connection.html#method.with_transport
278//! [`poll()`]: struct.Connection.html#method.poll
279//! [`Event`]: enum.Event.html
280//! [`Error`]: enum.Error.html
//! [`send_request()`]: struct.Connection.html#method.send_request
282//! [`send_response()`]: struct.Connection.html#method.send_response
283//! [`send_body()`]: struct.Connection.html#method.send_body
284
285use std::collections::HashSet;
286use std::collections::VecDeque;
287
288#[cfg(feature = "sfv")]
289use std::convert::TryFrom;
290use std::fmt;
291use std::fmt::Write;
292
293#[cfg(feature = "qlog")]
294use qlog::events::http3::FrameCreated;
295#[cfg(feature = "qlog")]
296use qlog::events::http3::FrameParsed;
297#[cfg(feature = "qlog")]
298use qlog::events::http3::Http3EventType;
299#[cfg(feature = "qlog")]
300use qlog::events::http3::Http3Frame;
301#[cfg(feature = "qlog")]
302use qlog::events::http3::Initiator;
303#[cfg(feature = "qlog")]
304use qlog::events::http3::StreamType;
305#[cfg(feature = "qlog")]
306use qlog::events::http3::StreamTypeSet;
307#[cfg(feature = "qlog")]
308use qlog::events::EventData;
309#[cfg(feature = "qlog")]
310use qlog::events::EventImportance;
311#[cfg(feature = "qlog")]
312use qlog::events::EventType;
313
314use crate::buffers::BufFactory;
315use crate::BufSplit;
316
/// List of ALPN tokens of supported HTTP/3 versions.
///
/// This can be passed directly to the [`Config::set_application_protos()`]
/// method when implementing HTTP/3 applications.
///
/// [`Config::set_application_protos()`]:
/// ../struct.Config.html#method.set_application_protos
pub const APPLICATION_PROTOCOL: &[&[u8]] = &[b"h3"];
325
// The offset used when converting HTTP/3 urgency to quiche urgency.
const PRIORITY_URGENCY_OFFSET: u8 = 124;

// Parameter values as specified in [Extensible Priorities].
//
// [Extensible Priorities]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4

// Inclusive bounds of the valid urgency range (0 through 7).
const PRIORITY_URGENCY_LOWER_BOUND: u8 = 0;
const PRIORITY_URGENCY_UPPER_BOUND: u8 = 7;

// Values used when a parameter is omitted.
const PRIORITY_URGENCY_DEFAULT: u8 = 3;
const PRIORITY_INCREMENTAL_DEFAULT: bool = false;
336
// qlog event types for the HTTP/3 events named by each constant. They are
// only compiled in when the `qlog` feature is enabled.
#[cfg(feature = "qlog")]
const QLOG_FRAME_CREATED: EventType =
    EventType::Http3EventType(Http3EventType::FrameCreated);
#[cfg(feature = "qlog")]
const QLOG_FRAME_PARSED: EventType =
    EventType::Http3EventType(Http3EventType::FrameParsed);
#[cfg(feature = "qlog")]
const QLOG_STREAM_TYPE_SET: EventType =
    EventType::Http3EventType(Http3EventType::StreamTypeSet);
346
347/// A specialized [`Result`] type for quiche HTTP/3 operations.
348///
349/// This type is used throughout quiche's HTTP/3 public API for any operation
350/// that can produce an error.
351///
352/// [`Result`]: https://doc.rust-lang.org/std/result/enum.Result.html
353pub type Result<T> = std::result::Result<T, Error>;
354
355/// An HTTP/3 error.
356#[derive(Clone, Copy, Debug, PartialEq, Eq)]
357pub enum Error {
358    /// There is no error or no work to do
359    Done,
360
361    /// The provided buffer is too short.
362    BufferTooShort,
363
364    /// Internal error in the HTTP/3 stack.
365    InternalError,
366
367    /// Endpoint detected that the peer is exhibiting behavior that causes.
368    /// excessive load.
369    ExcessiveLoad,
370
371    /// Stream ID or Push ID greater that current maximum was
372    /// used incorrectly, such as exceeding a limit, reducing a limit,
373    /// or being reused.
374    IdError,
375
376    /// The endpoint detected that its peer created a stream that it will not
377    /// accept.
378    StreamCreationError,
379
380    /// A required critical stream was closed.
381    ClosedCriticalStream,
382
383    /// No SETTINGS frame at beginning of control stream.
384    MissingSettings,
385
386    /// A frame was received which is not permitted in the current state.
387    FrameUnexpected,
388
389    /// Frame violated layout or size rules.
390    FrameError,
391
392    /// QPACK Header block decompression failure.
393    QpackDecompressionFailed,
394
395    /// Error originated from the transport layer.
396    TransportError(crate::Error),
397
398    /// The underlying QUIC stream (or connection) doesn't have enough capacity
399    /// for the operation to complete. The application should retry later on.
400    StreamBlocked,
401
402    /// Error in the payload of a SETTINGS frame.
403    SettingsError,
404
405    /// Server rejected request.
406    RequestRejected,
407
408    /// Request or its response cancelled.
409    RequestCancelled,
410
411    /// Client's request stream terminated without containing a full-formed
412    /// request.
413    RequestIncomplete,
414
415    /// An HTTP message was malformed and cannot be processed.
416    MessageError,
417
418    /// The TCP connection established in response to a CONNECT request was
419    /// reset or abnormally closed.
420    ConnectError,
421
422    /// The requested operation cannot be served over HTTP/3. Peer should retry
423    /// over HTTP/1.1.
424    VersionFallback,
425}
426
/// HTTP/3 error codes sent on the wire.
///
/// As defined in [RFC9114](https://www.rfc-editor.org/rfc/rfc9114.html#http-error-codes).
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum WireErrorCode {
    /// No error. This is used when the connection or stream needs to be closed,
    /// but there is no error to signal.
    NoError              = 0x100,
    /// Peer violated protocol requirements in a way that does not match a more
    /// specific error code or endpoint declines to use the more specific
    /// error code.
    GeneralProtocolError = 0x101,
    /// An internal error has occurred in the HTTP stack.
    InternalError        = 0x102,
    /// The endpoint detected that its peer created a stream that it will not
    /// accept.
    StreamCreationError  = 0x103,
    /// A stream required by the HTTP/3 connection was closed or reset.
    ClosedCriticalStream = 0x104,
    /// A frame was received that was not permitted in the current state or on
    /// the current stream.
    FrameUnexpected      = 0x105,
    /// A frame that fails to satisfy layout requirements or with an invalid
    /// size was received.
    FrameError           = 0x106,
    /// The endpoint detected that its peer is exhibiting a behavior that might
    /// be generating excessive load.
    ExcessiveLoad        = 0x107,
    /// A stream ID or push ID was used incorrectly, such as exceeding a limit,
    /// reducing a limit, or being reused.
    IdError              = 0x108,
    /// An endpoint detected an error in the payload of a SETTINGS frame.
    SettingsError        = 0x109,
    /// No SETTINGS frame was received at the beginning of the control stream.
    MissingSettings      = 0x10a,
    /// A server rejected a request without performing any application
    /// processing.
    RequestRejected      = 0x10b,
    /// The request or its response (including pushed response) is cancelled.
    RequestCancelled     = 0x10c,
    /// The client's stream terminated without containing a fully formed
    /// request.
    RequestIncomplete    = 0x10d,
    /// An HTTP message was malformed and cannot be processed.
    MessageError         = 0x10e,
    /// The TCP connection established in response to a CONNECT request was
    /// reset or abnormally closed.
    ConnectError         = 0x10f,
    /// The requested operation cannot be served over HTTP/3. The peer should
    /// retry over HTTP/1.1.
    VersionFallback      = 0x110,
}
479
480impl Error {
481    fn to_wire(self) -> u64 {
482        match self {
483            Error::Done => WireErrorCode::NoError as u64,
484            Error::InternalError => WireErrorCode::InternalError as u64,
485            Error::StreamCreationError =>
486                WireErrorCode::StreamCreationError as u64,
487            Error::ClosedCriticalStream =>
488                WireErrorCode::ClosedCriticalStream as u64,
489            Error::FrameUnexpected => WireErrorCode::FrameUnexpected as u64,
490            Error::FrameError => WireErrorCode::FrameError as u64,
491            Error::ExcessiveLoad => WireErrorCode::ExcessiveLoad as u64,
492            Error::IdError => WireErrorCode::IdError as u64,
493            Error::MissingSettings => WireErrorCode::MissingSettings as u64,
494            Error::QpackDecompressionFailed => 0x200,
495            Error::BufferTooShort => 0x999,
496            Error::TransportError { .. } | Error::StreamBlocked => 0xFF,
497            Error::SettingsError => WireErrorCode::SettingsError as u64,
498            Error::RequestRejected => WireErrorCode::RequestRejected as u64,
499            Error::RequestCancelled => WireErrorCode::RequestCancelled as u64,
500            Error::RequestIncomplete => WireErrorCode::RequestIncomplete as u64,
501            Error::MessageError => WireErrorCode::MessageError as u64,
502            Error::ConnectError => WireErrorCode::ConnectError as u64,
503            Error::VersionFallback => WireErrorCode::VersionFallback as u64,
504        }
505    }
506
507    #[cfg(feature = "ffi")]
508    fn to_c(self) -> libc::ssize_t {
509        match self {
510            Error::Done => -1,
511            Error::BufferTooShort => -2,
512            Error::InternalError => -3,
513            Error::ExcessiveLoad => -4,
514            Error::IdError => -5,
515            Error::StreamCreationError => -6,
516            Error::ClosedCriticalStream => -7,
517            Error::MissingSettings => -8,
518            Error::FrameUnexpected => -9,
519            Error::FrameError => -10,
520            Error::QpackDecompressionFailed => -11,
521            // -12 was previously used for TransportError, skip it
522            Error::StreamBlocked => -13,
523            Error::SettingsError => -14,
524            Error::RequestRejected => -15,
525            Error::RequestCancelled => -16,
526            Error::RequestIncomplete => -17,
527            Error::MessageError => -18,
528            Error::ConnectError => -19,
529            Error::VersionFallback => -20,
530
531            Error::TransportError(quic_error) => quic_error.to_c() - 1000,
532        }
533    }
534}
535
536impl fmt::Display for Error {
537    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
538        write!(f, "{self:?}")
539    }
540}
541
542impl std::error::Error for Error {
543    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
544        None
545    }
546}
547
548impl From<super::Error> for Error {
549    fn from(err: super::Error) -> Self {
550        match err {
551            super::Error::Done => Error::Done,
552
553            _ => Error::TransportError(err),
554        }
555    }
556}
557
558impl From<octets::BufferTooShortError> for Error {
559    fn from(_err: octets::BufferTooShortError) -> Self {
560        Error::BufferTooShort
561    }
562}
563
/// An HTTP/3 configuration.
///
/// Every field starts out as `None` and is only populated through the
/// corresponding setter method.
pub struct Config {
    /// Value for `SETTINGS_MAX_FIELD_SECTION_SIZE`, if set.
    max_field_section_size: Option<u64>,
    /// Value for `SETTINGS_QPACK_MAX_TABLE_CAPACITY`, if set.
    qpack_max_table_capacity: Option<u64>,
    /// Value for `SETTINGS_QPACK_BLOCKED_STREAMS`, if set.
    qpack_blocked_streams: Option<u64>,
    /// `Some(1)` when extended CONNECT has been enabled, `None` otherwise.
    connect_protocol_enabled: Option<u64>,
    /// additional settings are settings that are not part of the H3
    /// settings explicitly handled above
    additional_settings: Option<Vec<(u64, u64)>>,
}
574
575impl Config {
576    /// Creates a new configuration object with default settings.
577    pub const fn new() -> Result<Config> {
578        Ok(Config {
579            max_field_section_size: None,
580            qpack_max_table_capacity: None,
581            qpack_blocked_streams: None,
582            connect_protocol_enabled: None,
583            additional_settings: None,
584        })
585    }
586
587    /// Sets the `SETTINGS_MAX_FIELD_SECTION_SIZE` setting.
588    ///
589    /// By default no limit is enforced. When a request whose headers exceed
590    /// the limit set by the application is received, the call to the [`poll()`]
591    /// method will return the [`Error::ExcessiveLoad`] error, and the
592    /// connection will be closed.
593    ///
594    /// [`poll()`]: struct.Connection.html#method.poll
595    /// [`Error::ExcessiveLoad`]: enum.Error.html#variant.ExcessiveLoad
596    pub fn set_max_field_section_size(&mut self, v: u64) {
597        self.max_field_section_size = Some(v);
598    }
599
600    /// Sets the `SETTINGS_QPACK_MAX_TABLE_CAPACITY` setting.
601    ///
602    /// The default value is `0`.
603    pub fn set_qpack_max_table_capacity(&mut self, v: u64) {
604        self.qpack_max_table_capacity = Some(v);
605    }
606
607    /// Sets the `SETTINGS_QPACK_BLOCKED_STREAMS` setting.
608    ///
609    /// The default value is `0`.
610    pub fn set_qpack_blocked_streams(&mut self, v: u64) {
611        self.qpack_blocked_streams = Some(v);
612    }
613
614    /// Sets or omits the `SETTINGS_ENABLE_CONNECT_PROTOCOL` setting.
615    ///
616    /// The default value is `false`.
617    pub fn enable_extended_connect(&mut self, enabled: bool) {
618        if enabled {
619            self.connect_protocol_enabled = Some(1);
620        } else {
621            self.connect_protocol_enabled = None;
622        }
623    }
624
625    /// Sets additional HTTP/3 settings.
626    ///
627    /// The default value is no additional settings.
628    /// The `additional_settings` parameter must not the following
629    /// settings as they are already handled by this library:
630    ///
631    /// - SETTINGS_QPACK_MAX_TABLE_CAPACITY
632    /// - SETTINGS_MAX_FIELD_SECTION_SIZE
633    /// - SETTINGS_QPACK_BLOCKED_STREAMS
634    /// - SETTINGS_ENABLE_CONNECT_PROTOCOL
635    /// - SETTINGS_H3_DATAGRAM
636    ///
637    /// If such a setting is present in the `additional_settings`,
638    /// the method will return the [`Error::SettingsError`] error.
639    ///
640    /// If a setting identifier is present twice in `additional_settings`,
641    /// the method will return the [`Error::SettingsError`] error.
642    ///
643    /// [`Error::SettingsError`]: enum.Error.html#variant.SettingsError
644    pub fn set_additional_settings(
645        &mut self, additional_settings: Vec<(u64, u64)>,
646    ) -> Result<()> {
647        let explicit_quiche_settings = HashSet::from([
648            frame::SETTINGS_QPACK_MAX_TABLE_CAPACITY,
649            frame::SETTINGS_MAX_FIELD_SECTION_SIZE,
650            frame::SETTINGS_QPACK_BLOCKED_STREAMS,
651            frame::SETTINGS_ENABLE_CONNECT_PROTOCOL,
652            frame::SETTINGS_H3_DATAGRAM,
653            frame::SETTINGS_H3_DATAGRAM_00,
654        ]);
655
656        let dedup_settings: HashSet<u64> =
657            additional_settings.iter().map(|(key, _)| *key).collect();
658
659        if dedup_settings.len() != additional_settings.len() ||
660            !explicit_quiche_settings.is_disjoint(&dedup_settings)
661        {
662            return Err(Error::SettingsError);
663        }
664        self.additional_settings = Some(additional_settings);
665        Ok(())
666    }
667}
668
/// A trait for types with associated string name and value.
///
/// Both parts are exposed as raw bytes.
pub trait NameValue {
    /// Returns the object's name.
    fn name(&self) -> &[u8];

    /// Returns the object's value.
    fn value(&self) -> &[u8];
}
677
678impl<N, V> NameValue for (N, V)
679where
680    N: AsRef<[u8]>,
681    V: AsRef<[u8]>,
682{
683    fn name(&self) -> &[u8] {
684        self.0.as_ref()
685    }
686
687    fn value(&self) -> &[u8] {
688        self.1.as_ref()
689    }
690}
691
/// An owned name-value pair representing a raw HTTP header.
///
/// The first field holds the name bytes and the second the value bytes.
#[derive(Clone, PartialEq, Eq)]
pub struct Header(Vec<u8>, Vec<u8>);
695
/// Writes `hdr` to `f` in a human-readable form.
///
/// Valid UTF-8 is written as text with special characters escaped (see
/// `str::escape_default`). Anything else is written as the `Debug` output of
/// the byte slice.
fn try_print_as_readable(hdr: &[u8], f: &mut fmt::Formatter) -> fmt::Result {
    match std::str::from_utf8(hdr) {
        // The escape iterator implements `Display`, so it can be written
        // straight to the formatter without building a temporary `String`.
        Ok(s) => write!(f, "{}", s.escape_default()),
        Err(_) => write!(f, "{hdr:?}"),
    }
}
702
703impl fmt::Debug for Header {
704    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
705        f.write_char('"')?;
706        try_print_as_readable(&self.0, f)?;
707        f.write_str(": ")?;
708        try_print_as_readable(&self.1, f)?;
709        f.write_char('"')
710    }
711}
712
713impl Header {
714    /// Creates a new header.
715    ///
716    /// Both `name` and `value` will be cloned.
717    pub fn new(name: &[u8], value: &[u8]) -> Self {
718        Self(name.to_vec(), value.to_vec())
719    }
720}
721
722impl NameValue for Header {
723    fn name(&self) -> &[u8] {
724        &self.0
725    }
726
727    fn value(&self) -> &[u8] {
728        &self.1
729    }
730}
731
/// A non-owned name-value pair representing a raw HTTP header.
///
/// The first field borrows the name bytes and the second the value bytes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HeaderRef<'a>(&'a [u8], &'a [u8]);
735
736impl<'a> HeaderRef<'a> {
737    /// Creates a new header.
738    pub const fn new(name: &'a [u8], value: &'a [u8]) -> Self {
739        Self(name, value)
740    }
741}
742
743impl NameValue for HeaderRef<'_> {
744    fn name(&self) -> &[u8] {
745        self.0
746    }
747
748    fn value(&self) -> &[u8] {
749        self.1
750    }
751}
752
753/// An HTTP/3 connection event.
754#[derive(Clone, Debug, PartialEq, Eq)]
755pub enum Event {
756    /// Request/response headers were received.
757    Headers {
758        /// The list of received header fields. The application should validate
759        /// pseudo-headers and headers.
760        list: Vec<Header>,
761
762        /// Whether more frames will follow the headers on the stream.
763        more_frames: bool,
764    },
765
766    /// Data was received.
767    ///
768    /// This indicates that the application can use the [`recv_body()`] method
769    /// to retrieve the data from the stream.
770    ///
771    /// Note that [`recv_body()`] will need to be called repeatedly until the
772    /// [`Done`] value is returned, as the event will not be re-armed until all
773    /// buffered data is read.
774    ///
775    /// [`recv_body()`]: struct.Connection.html#method.recv_body
776    /// [`Done`]: enum.Error.html#variant.Done
777    Data,
778
779    /// Stream was closed,
780    Finished,
781
782    /// Stream was reset.
783    ///
784    /// The associated data represents the error code sent by the peer.
785    Reset(u64),
786
787    /// PRIORITY_UPDATE was received.
788    ///
789    /// This indicates that the application can use the
790    /// [`take_last_priority_update()`] method to take the last received
791    /// PRIORITY_UPDATE for a specified stream.
792    ///
793    /// This event is triggered once per stream until the last PRIORITY_UPDATE
794    /// is taken. It is recommended that applications defer taking the
795    /// PRIORITY_UPDATE until after [`poll()`] returns [`Done`].
796    ///
797    /// [`take_last_priority_update()`]: struct.Connection.html#method.take_last_priority_update
798    /// [`poll()`]: struct.Connection.html#method.poll
799    /// [`Done`]: enum.Error.html#variant.Done
800    PriorityUpdate,
801
802    /// GOAWAY was received.
803    GoAway,
804}
805
/// Extensible Priorities parameters.
///
/// The `TryFrom` trait supports constructing this object from the serialized
/// Structured Fields Dictionary field value. I.e, use `TryFrom` to parse the
/// value of a Priority header field or a PRIORITY_UPDATE frame. Using this
/// trait requires the `sfv` feature to be enabled.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(C)]
pub struct Priority {
    /// The urgency (`u`) parameter. The valid range is 0 through 7.
    urgency: u8,
    /// The incremental (`i`) parameter.
    incremental: bool,
}
818
819impl Default for Priority {
820    fn default() -> Self {
821        Priority {
822            urgency: PRIORITY_URGENCY_DEFAULT,
823            incremental: PRIORITY_INCREMENTAL_DEFAULT,
824        }
825    }
826}
827
828impl Priority {
829    /// Creates a new Priority.
830    pub const fn new(urgency: u8, incremental: bool) -> Self {
831        Priority {
832            urgency,
833            incremental,
834        }
835    }
836}
837
#[cfg(feature = "sfv")]
#[cfg_attr(docsrs, doc(cfg(feature = "sfv")))]
impl TryFrom<&[u8]> for Priority {
    type Error = Error;

    /// Parses an Extensible Priority field value.
    ///
    /// The input must be a serialized Structured Fields Dictionary; see
    /// [Extensible Priorities]. Input that fails to parse as a dictionary
    /// yields `Error::Done`.
    ///
    /// The `u` member, when present, must be an integer item, otherwise
    /// `Error::Done` is returned. Integers outside of the valid range
    /// (0 through 7) are clamped to 7.
    ///
    /// The `i` member, when present as an item, must be a boolean, otherwise
    /// `Error::Done` is returned. Any other form of `i` is treated as if the
    /// member was omitted.
    ///
    /// Omitted members take their default values.
    ///
    /// [Extensible Priorities]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4
    fn try_from(value: &[u8]) -> std::result::Result<Self, Self::Error> {
        let dict =
            sfv::Parser::parse_dictionary(value).map_err(|_| Error::Done)?;

        let urgency = match dict.get("u") {
            // Not present, fall back to the default.
            None => PRIORITY_URGENCY_DEFAULT,

            // An inner list is never a valid urgency.
            Some(sfv::ListEntry::InnerList(_)) => return Err(Error::Done),

            Some(sfv::ListEntry::Item(item)) => {
                let requested = item.bare_item.as_int().ok_or(Error::Done)?;

                let lowest_allowed = PRIORITY_URGENCY_LOWER_BOUND as i64;
                let highest_allowed = PRIORITY_URGENCY_UPPER_BOUND as i64;

                // An out-of-range value is an error on the sender's side.
                // Map it to the upper bound (lowest priority) so that it
                // cannot interfere with other streams.
                if requested < lowest_allowed || requested > highest_allowed {
                    PRIORITY_URGENCY_UPPER_BOUND
                } else {
                    requested as u8
                }
            },
        };

        let incremental =
            if let Some(sfv::ListEntry::Item(item)) = dict.get("i") {
                item.bare_item.as_bool().ok_or(Error::Done)?
            } else {
                // Not present as an item, use the default value.
                false
            };

        Ok(Priority::new(urgency, incremental))
    }
}
903
/// HTTP/3 settings values tracked for one endpoint.
///
/// A `Connection` holds one instance for the local settings and one for the
/// peer's. Every field is optional; `None` means no value is recorded.
struct ConnectionSettings {
    /// Value recorded for the maximum field section size setting.
    pub max_field_section_size: Option<u64>,
    /// Value recorded for the QPACK maximum table capacity setting.
    pub qpack_max_table_capacity: Option<u64>,
    /// Value recorded for the QPACK blocked streams setting.
    pub qpack_blocked_streams: Option<u64>,
    /// Value recorded for the extended CONNECT protocol setting.
    pub connect_protocol_enabled: Option<u64>,
    /// Value recorded for the HTTP/3 datagram setting. On the local side
    /// this is `Some(1)` when datagrams are enabled, `None` otherwise.
    pub h3_datagram: Option<u64>,
    /// Extra settings as pairs of `u64`; on the local side these are copied
    /// from the configuration. NOTE(review): presumably (identifier, value)
    /// pairs -- confirm against the code that serializes them.
    pub additional_settings: Option<Vec<(u64, u64)>>,
    /// NOTE(review): presumably the settings exactly as they appeared on
    /// the wire; it starts out as `None` for both sides -- confirm against
    /// the code that populates it.
    pub raw: Option<Vec<(u64, u64)>>,
}
913
/// Bookkeeping for a pair of QPACK streams (encoder and decoder).
///
/// A `Connection` holds one instance for the local streams and one for the
/// peer's. The default value has no stream IDs and zeroed byte counters.
#[derive(Default)]
struct QpackStreams {
    /// ID of the QPACK encoder stream, once known.
    pub encoder_stream_id: Option<u64>,
    /// Byte counter for the encoder stream.
    /// NOTE(review): direction (sent vs. received) is not visible here --
    /// confirm against the code that updates it.
    pub encoder_stream_bytes: u64,
    /// ID of the QPACK decoder stream, once known.
    pub decoder_stream_id: Option<u64>,
    /// Byte counter for the decoder stream.
    /// NOTE(review): direction (sent vs. received) is not visible here --
    /// confirm against the code that updates it.
    pub decoder_stream_bytes: u64,
}
921
/// Statistics about the connection.
///
/// A connection's statistics can be collected using the [`stats()`] method.
///
/// Both counters are zero in a default-constructed value.
///
/// [`stats()`]: struct.Connection.html#method.stats
#[derive(Clone, Default)]
pub struct Stats {
    /// The number of bytes received on the QPACK encoder stream.
    pub qpack_encoder_stream_recv_bytes: u64,
    /// The number of bytes received on the QPACK decoder stream.
    pub qpack_decoder_stream_recv_bytes: u64,
}
934
935fn close_conn_critical_stream<F: BufFactory>(
936    conn: &mut super::Connection<F>,
937) -> Result<()> {
938    conn.close(
939        true,
940        Error::ClosedCriticalStream.to_wire(),
941        b"Critical stream closed.",
942    )?;
943
944    Err(Error::ClosedCriticalStream)
945}
946
947fn close_conn_if_critical_stream_finished<F: BufFactory>(
948    conn: &mut super::Connection<F>, stream_id: u64,
949) -> Result<()> {
950    if conn.stream_finished(stream_id) {
951        close_conn_critical_stream(conn)?;
952    }
953
954    Ok(())
955}
956
/// An HTTP/3 connection.
pub struct Connection {
    /// Whether this endpoint is the server side of the connection.
    is_server: bool,

    /// Stream ID to use for the next request. Starts at 0 and advances by 4
    /// each time a request is successfully sent.
    next_request_stream_id: u64,
    /// Next unidirectional stream ID; starts at 0x3 for servers and at 0x2
    /// for clients.
    next_uni_stream_id: u64,

    /// HTTP/3 state for streams, keyed by stream ID.
    streams: crate::stream::StreamIdHashMap<stream::Stream>,

    /// Settings of the local endpoint, seeded from the configuration.
    local_settings: ConnectionSettings,
    /// Settings recorded for the peer; all unset on creation.
    peer_settings: ConnectionSettings,

    /// ID of the local control stream, once known.
    control_stream_id: Option<u64>,
    /// ID of the peer's control stream, once known.
    peer_control_stream_id: Option<u64>,

    /// QPACK encoder used to encode outgoing header blocks.
    qpack_encoder: qpack::Encoder,
    /// QPACK decoder.
    qpack_decoder: qpack::Decoder,

    /// Bookkeeping for the locally-opened QPACK streams.
    local_qpack_streams: QpackStreams,
    /// Bookkeeping for the peer's QPACK streams.
    peer_qpack_streams: QpackStreams,

    /// Maximum push ID; starts at 0.
    max_push_id: u64,

    /// Queue of stream IDs.
    /// NOTE(review): presumably streams whose completion still has to be
    /// reported to the application -- confirm against `poll()`.
    finished_streams: VecDeque<u64>,

    /// Set once GREASE frames have been sent, so they are sent only once.
    frames_greased: bool,

    /// ID associated with the locally-sent GOAWAY, if any.
    local_goaway_id: Option<u64>,
    /// ID associated with the GOAWAY received from the peer, if any. Once
    /// set, no new requests can be sent.
    peer_goaway_id: Option<u64>,
}
987
988impl Connection {
989    fn new(
990        config: &Config, is_server: bool, enable_dgram: bool,
991    ) -> Result<Connection> {
992        let initial_uni_stream_id = if is_server { 0x3 } else { 0x2 };
993        let h3_datagram = if enable_dgram { Some(1) } else { None };
994
995        Ok(Connection {
996            is_server,
997
998            next_request_stream_id: 0,
999
1000            next_uni_stream_id: initial_uni_stream_id,
1001
1002            streams: Default::default(),
1003
1004            local_settings: ConnectionSettings {
1005                max_field_section_size: config.max_field_section_size,
1006                qpack_max_table_capacity: config.qpack_max_table_capacity,
1007                qpack_blocked_streams: config.qpack_blocked_streams,
1008                connect_protocol_enabled: config.connect_protocol_enabled,
1009                h3_datagram,
1010                additional_settings: config.additional_settings.clone(),
1011                raw: Default::default(),
1012            },
1013
1014            peer_settings: ConnectionSettings {
1015                max_field_section_size: None,
1016                qpack_max_table_capacity: None,
1017                qpack_blocked_streams: None,
1018                h3_datagram: None,
1019                connect_protocol_enabled: None,
1020                additional_settings: Default::default(),
1021                raw: Default::default(),
1022            },
1023
1024            control_stream_id: None,
1025            peer_control_stream_id: None,
1026
1027            qpack_encoder: qpack::Encoder::new(),
1028            qpack_decoder: qpack::Decoder::new(),
1029
1030            local_qpack_streams: Default::default(),
1031            peer_qpack_streams: Default::default(),
1032
1033            max_push_id: 0,
1034
1035            finished_streams: VecDeque::new(),
1036
1037            frames_greased: false,
1038
1039            local_goaway_id: None,
1040            peer_goaway_id: None,
1041        })
1042    }
1043
1044    /// Creates a new HTTP/3 connection using the provided QUIC connection.
1045    ///
1046    /// This will also initiate the HTTP/3 handshake with the peer by opening
1047    /// all control streams (including QPACK) and sending the local settings.
1048    ///
1049    /// On success the new connection is returned.
1050    ///
1051    /// The [`StreamLimit`] error is returned when the HTTP/3 control stream
1052    /// cannot be created due to stream limits.
1053    ///
1054    /// The [`InternalError`] error is returned when either the underlying QUIC
1055    /// connection is not in a suitable state, or the HTTP/3 control stream
1056    /// cannot be created due to flow control limits.
1057    ///
1058    /// [`StreamLimit`]: ../enum.Error.html#variant.StreamLimit
1059    /// [`InternalError`]: ../enum.Error.html#variant.InternalError
1060    pub fn with_transport<F: BufFactory>(
1061        conn: &mut super::Connection<F>, config: &Config,
1062    ) -> Result<Connection> {
1063        let is_client = !conn.is_server;
1064        if is_client && !(conn.is_established() || conn.is_in_early_data()) {
1065            trace!("{} QUIC connection must be established or in early data before creating an HTTP/3 connection", conn.trace_id());
1066            return Err(Error::InternalError);
1067        }
1068
1069        let mut http3_conn =
1070            Connection::new(config, conn.is_server, conn.dgram_enabled())?;
1071
1072        match http3_conn.send_settings(conn) {
1073            Ok(_) => (),
1074
1075            Err(e) => {
1076                conn.close(true, e.to_wire(), b"Error opening control stream")?;
1077                return Err(e);
1078            },
1079        };
1080
1081        // Try opening QPACK streams, but ignore errors if it fails since we
1082        // don't need them right now.
1083        http3_conn.open_qpack_encoder_stream(conn).ok();
1084        http3_conn.open_qpack_decoder_stream(conn).ok();
1085
1086        if conn.grease {
1087            // Try opening a GREASE stream, but ignore errors since it's not
1088            // critical.
1089            http3_conn.open_grease_stream(conn).ok();
1090        }
1091
1092        Ok(http3_conn)
1093    }
1094
1095    /// Sends an HTTP/3 request.
1096    ///
1097    /// The request is encoded from the provided list of headers without a
1098    /// body, and sent on a newly allocated stream. To include a body,
1099    /// set `fin` as `false` and subsequently call [`send_body()`] with the
1100    /// same `conn` and the `stream_id` returned from this method.
1101    ///
1102    /// On success the newly allocated stream ID is returned.
1103    ///
1104    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1105    /// doesn't have enough capacity for the operation to complete. When this
1106    /// happens the application should retry the **entire** `send_request` call
1107    /// once the stream is reported as writable again. Any partial state created
1108    /// by the failed call is rolled back, so repeating the call with the same
1109    /// arguments is safe.
1110    ///
1111    /// [`send_body()`]: struct.Connection.html#method.send_body
1112    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1113    pub fn send_request<T: NameValue, F: BufFactory>(
1114        &mut self, conn: &mut super::Connection<F>, headers: &[T], fin: bool,
1115    ) -> Result<u64> {
1116        // If we received a GOAWAY from the peer, MUST NOT initiate new
1117        // requests.
1118        if self.peer_goaway_id.is_some() {
1119            return Err(Error::FrameUnexpected);
1120        }
1121
1122        let stream_id = self.next_request_stream_id;
1123
1124        self.streams
1125            .insert(stream_id, <stream::Stream>::new(stream_id, true));
1126
1127        // The underlying QUIC stream does not exist yet, so calls to e.g.
1128        // stream_capacity() will fail. By writing a 0-length buffer, we force
1129        // the creation of the QUIC stream state, without actually writing
1130        // anything.
1131        if let Err(e) = conn.stream_send(stream_id, b"", false) {
1132            self.streams.remove(&stream_id);
1133
1134            if e == super::Error::Done {
1135                return Err(Error::StreamBlocked);
1136            }
1137
1138            return Err(e.into());
1139        };
1140
1141        if let Err(e) = self.send_headers(conn, stream_id, headers, fin) {
1142            // If the stream was blocked before any header bytes were written,
1143            // the QUIC stream exists but carries no H3 data yet. Roll back the
1144            // H3-layer stream entry so the stream ID is not consumed and a
1145            // subsequent retry of `send_request` starts from a clean state,
1146            // consistent with the `StreamBlocked` path above.
1147            if e == Error::StreamBlocked {
1148                self.streams.remove(&stream_id);
1149            }
1150
1151            return Err(e);
1152        }
1153
1154        // To avoid skipping stream IDs, we only calculate the next available
1155        // stream ID when a request has been successfully buffered.
1156        self.next_request_stream_id = self
1157            .next_request_stream_id
1158            .checked_add(4)
1159            .ok_or(Error::IdError)?;
1160
1161        Ok(stream_id)
1162    }
1163
1164    /// Sends an HTTP/3 response on the specified stream with default priority.
1165    ///
1166    /// This method sends the provided `headers` as a single initial response
1167    /// without a body.
1168    ///
1169    /// To send a non-final 1xx, then a final 200+ without body:
1170    ///   * send_response() with `fin` set to `false`.
1171    ///   * [`send_additional_headers()`] with fin set to `true` using the same
1172    ///     `stream_id` value.
1173    ///
1174    /// To send a non-final 1xx, then a final 200+ with body:
1175    ///   * send_response() with `fin` set to `false`.
1176    ///   * [`send_additional_headers()`] with fin set to `false` and same
1177    ///     `stream_id` value.
1178    ///   * [`send_body()`] with same `stream_id`.
1179    ///
1180    /// To send a final 200+ with body:
1181    ///   * send_response() with `fin` set to `false`.
1182    ///   * [`send_body()`] with same `stream_id`.
1183    ///
1184    /// Additional headers can only be sent during certain phases of an HTTP/3
1185    /// message exchange, see [Section 4.1 of RFC 9114]. The [`FrameUnexpected`]
1186    /// error is returned if this method, or [`send_response_with_priority()`],
1187    /// are called multiple times with the same `stream_id` value.
1188    ///
1189    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1190    /// doesn't have enough capacity for the operation to complete. When this
1191    /// happens the application should retry the operation once the stream is
1192    /// reported as writable again.
1193    ///
1194    /// [`send_body()`]: struct.Connection.html#method.send_body
1195    /// [`send_additional_headers()`]:
1196    ///     struct.Connection.html#method.send_additional_headers
1197    /// [`send_response_with_priority()`]:
1198    ///     struct.Connection.html#method.send_response_with_priority
1199    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
1200    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1201    pub fn send_response<T: NameValue, F: BufFactory>(
1202        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1203        headers: &[T], fin: bool,
1204    ) -> Result<()> {
1205        let priority = Default::default();
1206
1207        self.send_response_with_priority(
1208            conn, stream_id, headers, &priority, fin,
1209        )?;
1210
1211        Ok(())
1212    }
1213
1214    /// Sends an HTTP/3 response on the specified stream with specified
1215    /// priority.
1216    ///
1217    /// This method sends the provided `headers` as a single initial response
1218    /// without a body.
1219    ///
1220    /// To send a non-final 1xx, then a final 200+ without body:
1221    ///   * send_response_with_priority() with `fin` set to `false`.
1222    ///   * [`send_additional_headers()`] with fin set to `true` using the same
1223    ///     `stream_id` value.
1224    ///
1225    /// To send a non-final 1xx, then a final 200+ with body:
1226    ///   * send_response_with_priority() with `fin` set to `false`.
1227    ///   * [`send_additional_headers()`] with fin set to `false` and same
1228    ///     `stream_id` value.
1229    ///   * [`send_body()`] with same `stream_id`.
1230    ///
1231    /// To send a final 200+ with body:
1232    ///   * send_response_with_priority() with `fin` set to `false`.
1233    ///   * [`send_body()`] with same `stream_id`.
1234    ///
1235    /// The `priority` parameter represents [Extensible Priority]
1236    /// parameters. If the urgency is outside the range 0-7, it will be clamped
1237    /// to 7.
1238    ///
1239    /// Additional headers can only be sent during certain phases of an HTTP/3
1240    /// message exchange, see [Section 4.1 of RFC 9114]. The [`FrameUnexpected`]
1241    /// error is returned if this method, or [`send_response()`],
1242    /// are called multiple times with the same `stream_id` value.
1243    ///
1244    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1245    /// doesn't have enough capacity for the operation to complete. When this
1246    /// happens the application should retry the operation once the stream is
1247    /// reported as writable again.
1248    ///
1249    /// [`send_body()`]: struct.Connection.html#method.send_body
1250    /// [`send_additional_headers()`]:
1251    ///     struct.Connection.html#method.send_additional_headers
1252    /// [`send_response()`]:
1253    ///     struct.Connection.html#method.send_response
1254    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
1255    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1256    /// [Extensible Priority]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
1257    pub fn send_response_with_priority<T: NameValue, F: BufFactory>(
1258        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1259        headers: &[T], priority: &Priority, fin: bool,
1260    ) -> Result<()> {
1261        match self.streams.get(&stream_id) {
1262            Some(s) => {
1263                // Only one initial HEADERS allowed.
1264                if s.local_initialized() {
1265                    return Err(Error::FrameUnexpected);
1266                }
1267
1268                s
1269            },
1270
1271            None => return Err(Error::FrameUnexpected),
1272        };
1273
1274        self.send_headers(conn, stream_id, headers, fin)?;
1275
1276        // Clamp and shift urgency into quiche-priority space
1277        let urgency = priority
1278            .urgency
1279            .clamp(PRIORITY_URGENCY_LOWER_BOUND, PRIORITY_URGENCY_UPPER_BOUND) +
1280            PRIORITY_URGENCY_OFFSET;
1281
1282        conn.stream_priority(stream_id, urgency, priority.incremental)?;
1283
1284        Ok(())
1285    }
1286
1287    /// Sends additional HTTP/3 headers.
1288    ///
1289    /// After the initial request or response headers have been sent, using
1290    /// [`send_request()`] or [`send_response()`] respectively, this method can
1291    /// be used send an additional HEADERS frame. For example, to send a single
1292    /// instance of trailers after a request with a body, or to issue another
1293    /// non-final 1xx after a preceding 1xx, or to issue a final response after
1294    /// a preceding 1xx.
1295    ///
1296    /// Additional headers can only be sent during certain phases of an HTTP/3
1297    /// message exchange, see [Section 4.1 of RFC 9114]. The [`FrameUnexpected`]
1298    /// error is returned when this method is called during the wrong phase,
1299    /// such as before initial headers have been sent, or if trailers have
1300    /// already been sent.
1301    ///
1302    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1303    /// doesn't have enough capacity for the operation to complete. When this
1304    /// happens the application should retry the operation once the stream is
1305    /// reported as writable again.
1306    ///
1307    /// [`send_request()`]: struct.Connection.html#method.send_request
1308    /// [`send_response()`]: struct.Connection.html#method.send_response
1309    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1310    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
1311    /// [Section 4.1 of RFC 9114]:
1312    ///     https://www.rfc-editor.org/rfc/rfc9114.html#section-4.1.
1313    pub fn send_additional_headers<T: NameValue, F: BufFactory>(
1314        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1315        headers: &[T], is_trailer_section: bool, fin: bool,
1316    ) -> Result<()> {
1317        // Clients can only send trailer headers.
1318        if !self.is_server && !is_trailer_section {
1319            return Err(Error::FrameUnexpected);
1320        }
1321
1322        match self.streams.get(&stream_id) {
1323            Some(s) => {
1324                // Initial HEADERS must have been sent.
1325                if !s.local_initialized() {
1326                    return Err(Error::FrameUnexpected);
1327                }
1328
1329                // Only one trailing HEADERS allowed.
1330                if s.trailers_sent() {
1331                    return Err(Error::FrameUnexpected);
1332                }
1333
1334                s
1335            },
1336
1337            None => return Err(Error::FrameUnexpected),
1338        };
1339
1340        self.send_headers(conn, stream_id, headers, fin)?;
1341
1342        if is_trailer_section {
1343            // send_headers() might have tidied the stream away, so we need to
1344            // check again.
1345            if let Some(s) = self.streams.get_mut(&stream_id) {
1346                s.mark_trailers_sent();
1347            }
1348        }
1349
1350        Ok(())
1351    }
1352
1353    /// Sends additional HTTP/3 headers with specified priority.
1354    ///
1355    /// After the initial request or response headers have been sent, using
1356    /// [`send_request()`] or [`send_response()`] respectively, this method can
1357    /// be used send an additional HEADERS frame. For example, to send a single
1358    /// instance of trailers after a request with a body, or to issue another
1359    /// non-final 1xx after a preceding 1xx, or to issue a final response after
1360    /// a preceding 1xx.
1361    ///
1362    /// The `priority` parameter represents [Extensible Priority]
1363    /// parameters. If the urgency is outside the range 0-7, it will be clamped
1364    /// to 7.
1365    ///
1366    /// Additional headers can only be sent during certain phases of an HTTP/3
1367    /// message exchange, see [Section 4.1 of RFC 9114]. The [`FrameUnexpected`]
1368    /// error is returned when this method is called during the wrong phase,
1369    /// such as before initial headers have been sent, or if trailers have
1370    /// already been sent.
1371    ///
1372    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1373    /// doesn't have enough capacity for the operation to complete. When this
1374    /// happens the application should retry the operation once the stream is
1375    /// reported as writable again.
1376    ///
1377    /// [`send_request()`]: struct.Connection.html#method.send_request
1378    /// [`send_response()`]: struct.Connection.html#method.send_response
1379    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1380    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
1381    /// [Section 4.1 of RFC 9114]:
1382    ///     https://www.rfc-editor.org/rfc/rfc9114.html#section-4.1.
1383    /// [Extensible Priority]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
1384    pub fn send_additional_headers_with_priority<T: NameValue, F: BufFactory>(
1385        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1386        headers: &[T], priority: &Priority, is_trailer_section: bool, fin: bool,
1387    ) -> Result<()> {
1388        self.send_additional_headers(
1389            conn,
1390            stream_id,
1391            headers,
1392            is_trailer_section,
1393            fin,
1394        )?;
1395
1396        // Clamp and shift urgency into quiche-priority space
1397        let urgency = priority
1398            .urgency
1399            .clamp(PRIORITY_URGENCY_LOWER_BOUND, PRIORITY_URGENCY_UPPER_BOUND) +
1400            PRIORITY_URGENCY_OFFSET;
1401
1402        conn.stream_priority(stream_id, urgency, priority.incremental)?;
1403
1404        Ok(())
1405    }
1406
1407    fn encode_header_block<T: NameValue>(
1408        &mut self, headers: &[T],
1409    ) -> Result<Vec<u8>> {
1410        let headers_len = headers
1411            .iter()
1412            .fold(0, |acc, h| acc + h.value().len() + h.name().len() + 32);
1413
1414        let mut header_block = vec![0; headers_len];
1415        let len = self
1416            .qpack_encoder
1417            .encode(headers, &mut header_block)
1418            .map_err(|_| Error::InternalError)?;
1419
1420        header_block.truncate(len);
1421
1422        Ok(header_block)
1423    }
1424
    /// Encodes `headers` and sends them as a HEADERS frame on `stream_id`.
    ///
    /// GREASE frames are sent first, once per connection, when GREASE is
    /// enabled on the QUIC connection.
    ///
    /// The frame is only written when the stream can take it in full;
    /// otherwise `Error::StreamBlocked` is returned. Note that GREASE frames
    /// may already have been sent by then.
    ///
    /// On success the HTTP/3 stream state, if present, is marked as locally
    /// initialized. It is removed when `fin` is set and the QUIC stream is
    /// reported as finished.
    fn send_headers<T: NameValue, F: BufFactory>(
        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
        headers: &[T], fin: bool,
    ) -> Result<()> {
        // Scratch buffer for the frame type and length varints.
        let mut d = [42; 10];
        let mut b = octets::OctetsMut::with_slice(&mut d);

        if !self.frames_greased && conn.grease {
            self.send_grease_frames(conn, stream_id)?;
            self.frames_greased = true;
        }

        let header_block = self.encode_header_block(headers)?;

        // Bytes taken by the frame type and length fields.
        let overhead = octets::varint_len(frame::HEADERS_FRAME_TYPE_ID) +
            octets::varint_len(header_block.len() as u64);

        // Headers need to be sent atomically, so make sure the stream has
        // enough capacity.
        match conn.stream_writable(stream_id, overhead + header_block.len()) {
            Ok(true) => (),

            Ok(false) => return Err(Error::StreamBlocked),

            Err(e) => {
                // Don't keep HTTP/3 state around for a stream that is
                // already finished.
                if conn.stream_finished(stream_id) {
                    self.streams.remove(&stream_id);
                }

                return Err(e.into());
            },
        };

        // Write the frame header (type and payload length) first.
        b.put_varint(frame::HEADERS_FRAME_TYPE_ID)?;
        b.put_varint(header_block.len() as u64)?;
        let off = b.off();
        conn.stream_send(stream_id, &d[..off], false)?;

        // Sending header block separately avoids unnecessary copy.
        conn.stream_send(stream_id, &header_block, fin)?;

        trace!(
            "{} tx frm HEADERS stream={} len={} fin={}",
            conn.trace_id(),
            stream_id,
            header_block.len(),
            fin
        );

        // Record the frame in the qlog; failures to log are ignored.
        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
            let qlog_headers = headers
                .iter()
                .map(|h| qlog::events::http3::HttpHeader {
                    name: Some(String::from_utf8_lossy(h.name()).into_owned()),
                    name_bytes: None,
                    value: Some(String::from_utf8_lossy(h.value()).into_owned()),
                    value_bytes: None,
                })
                .collect();

            let frame = Http3Frame::Headers {
                headers: qlog_headers,
                raw: None,
            };
            let ev_data = EventData::Http3FrameCreated(FrameCreated {
                stream_id,
                length: Some(header_block.len() as u64),
                frame,
                ..Default::default()
            });

            q.add_event_data_now(ev_data).ok();
        });

        // HEADERS have now been sent on this stream.
        if let Some(s) = self.streams.get_mut(&stream_id) {
            s.initialize_local();
        }

        // Nothing more will happen on a finished stream, so drop its state.
        if fin && conn.stream_finished(stream_id) {
            self.streams.remove(&stream_id);
        }

        Ok(())
    }
1509
1510    /// Sends an HTTP/3 body chunk on the given stream.
1511    ///
1512    /// On success the number of bytes written is returned, or [`Done`] if no
1513    /// bytes could be written (e.g. because the stream is blocked).
1514    ///
1515    /// Note that the number of written bytes returned can be lower than the
1516    /// length of the input buffer when the underlying QUIC stream doesn't have
1517    /// enough capacity for the operation to complete.
1518    ///
1519    /// When a partial write happens (including when [`Done`] is returned) the
1520    /// application should retry the operation once the stream is reported as
1521    /// writable again.
1522    ///
1523    /// [`Done`]: enum.Error.html#variant.Done
1524    pub fn send_body<F: BufFactory>(
1525        &mut self, conn: &mut super::Connection<F>, stream_id: u64, body: &[u8],
1526        fin: bool,
1527    ) -> Result<usize> {
1528        self.do_send_body(
1529            conn,
1530            stream_id,
1531            body,
1532            fin,
1533            |conn: &mut super::Connection<F>,
1534             header: &[u8],
1535             stream_id: u64,
1536             body: &[u8],
1537             body_len: usize,
1538             fin: bool| {
1539                conn.stream_send(stream_id, header, false)?;
1540                Ok(conn
1541                    .stream_send(stream_id, &body[..body_len], fin)
1542                    .map(|v| (v, v))?)
1543            },
1544        )
1545    }
1546
    /// Sends an HTTP/3 body chunk provided as a raw buffer on the given stream.
    ///
    /// If the capacity allows it the buffer will be appended to the stream's
    /// send queue with zero copying.
    ///
    /// On success the number of bytes written is returned, or [`Done`] if no
    /// bytes could be written (e.g. because the stream is blocked).
    ///
    /// Note that the number of written bytes returned can be lower than the
    /// length of the input buffer when the underlying QUIC stream doesn't have
    /// enough capacity for the operation to complete.
    ///
    /// When a partial write happens (including when [`Done`] is returned) the
    /// remaining (unwritten) buffer will also be returned, by replacing the
    /// contents of `body` with it. The application should retry the operation
    /// once the stream is reported as writable again.
    ///
    /// [`Done`]: enum.Error.html#variant.Done
    pub fn send_body_zc<F>(
        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
        body: &mut F::Buf, fin: bool,
    ) -> Result<usize>
    where
        F: BufFactory,
        F::Buf: BufSplit,
    {
        self.do_send_body(
            conn,
            stream_id,
            body,
            fin,
            |conn: &mut super::Connection<F>,
             header: &[u8],
             stream_id: u64,
             body: &mut F::Buf,
             mut body_len: usize,
             fin: bool| {
                // Try to place the frame header in front of the body inside
                // the buffer itself. If that is not possible, send the header
                // as a regular (copied) write instead.
                let with_prefix = body.try_add_prefix(header);
                if !with_prefix {
                    conn.stream_send(stream_id, header, false)?;
                } else {
                    // The header is now part of the buffer to send.
                    body_len += header.len();
                }

                let remainder = body.split_at(body_len);
                // body now contains the first `body_len` bytes of the original
                // buffer
                debug_assert_eq!(body.as_ref().len(), body_len);

                let (mut n, rem) =
                    conn.stream_send_zc(stream_id, body.clone(), fin)?;
                if rem.as_ref().is_some_and(|v| !v.as_ref().is_empty()) {
                    // `rem` should always be None or empty.
                    // `do_send_body()` should have checked the capacity and
                    // ensured that there is enough capacity to write the header +
                    // fully body.
                    debug_assert!(false);
                    return Err(Error::InternalError);
                }

                // Only body bytes are reported back, so leave out the header
                // bytes that were sent as part of the buffer.
                if with_prefix {
                    n -= header.len();
                }

                // Hand the unsent part back to the caller through `body`.
                if !remainder.as_ref().is_empty() {
                    let _ = std::mem::replace(body, remainder);
                }

                Ok((n, n))
            },
        )
    }
1619
1620    fn do_send_body<F, B, R, SND>(
1621        &mut self, conn: &mut super::Connection<F>, stream_id: u64, body: B,
1622        fin: bool, write_fn: SND,
1623    ) -> Result<R>
1624    where
1625        F: BufFactory,
1626        B: AsRef<[u8]>,
1627        SND: FnOnce(
1628            &mut super::Connection<F>,
1629            &[u8],
1630            u64,
1631            B,
1632            usize,
1633            bool,
1634        ) -> Result<(usize, R)>,
1635    {
1636        let mut d = [42; 10];
1637        let mut b = octets::OctetsMut::with_slice(&mut d);
1638
1639        let len = body.as_ref().len();
1640
1641        // Validate that it is sane to send data on the stream.
1642        if stream_id % 4 != 0 {
1643            return Err(Error::FrameUnexpected);
1644        }
1645
1646        match self.streams.get_mut(&stream_id) {
1647            Some(s) => {
1648                if !s.local_initialized() {
1649                    return Err(Error::FrameUnexpected);
1650                }
1651
1652                if s.trailers_sent() {
1653                    return Err(Error::FrameUnexpected);
1654                }
1655            },
1656
1657            None => {
1658                return Err(Error::FrameUnexpected);
1659            },
1660        };
1661
1662        // Avoid sending 0-length DATA frames when the fin flag is false.
1663        if len == 0 && !fin {
1664            return Err(Error::Done);
1665        }
1666
1667        let overhead = octets::varint_len(frame::DATA_FRAME_TYPE_ID) +
1668            octets::varint_len(len as u64);
1669
1670        let stream_cap = match conn.stream_capacity(stream_id) {
1671            Ok(v) => v,
1672
1673            Err(e) => {
1674                if conn.stream_finished(stream_id) {
1675                    self.streams.remove(&stream_id);
1676                }
1677
1678                return Err(e.into());
1679            },
1680        };
1681
1682        // Make sure there is enough capacity to send the DATA frame header.
1683        if stream_cap < overhead {
1684            let _ = conn.stream_writable(stream_id, overhead + 1);
1685            return Err(Error::Done);
1686        }
1687
1688        // Cap the frame payload length to the stream's capacity.
1689        let body_len = std::cmp::min(len, stream_cap - overhead);
1690
1691        // If we can't send the entire body, set the fin flag to false so the
1692        // application can try again later.
1693        let fin = if body_len != len { false } else { fin };
1694
1695        // Again, avoid sending 0-length DATA frames when the fin flag is false.
1696        if body_len == 0 && !fin {
1697            let _ = conn.stream_writable(stream_id, overhead + 1);
1698            return Err(Error::Done);
1699        }
1700
1701        b.put_varint(frame::DATA_FRAME_TYPE_ID)?;
1702        b.put_varint(body_len as u64)?;
1703        let off = b.off();
1704
1705        // Return how many bytes were written, excluding the frame header.
1706        // Sending body separately avoids unnecessary copy.
1707        let (written, ret) =
1708            write_fn(conn, &d[..off], stream_id, body, body_len, fin)?;
1709        if written != body_len {
1710            // This should never happen. If it does, it means we wrote an
1711            // incorrect frame length and thus we can't really continue.
1712            debug_assert!(false);
1713            return Err(Error::InternalError);
1714        }
1715
1716        trace!(
1717            "{} tx frm DATA stream={} len={} fin={}",
1718            conn.trace_id(),
1719            stream_id,
1720            written,
1721            fin
1722        );
1723
1724        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1725            let frame = Http3Frame::Data { raw: None };
1726            let ev_data = EventData::Http3FrameCreated(FrameCreated {
1727                stream_id,
1728                length: Some(written as u64),
1729                frame,
1730                ..Default::default()
1731            });
1732
1733            q.add_event_data_now(ev_data).ok();
1734        });
1735
1736        if written < len {
1737            // Ensure the peer is notified that the connection or stream is
1738            // blocked when the stream's capacity is limited by flow control.
1739            //
1740            // We only need enough capacity to send a few bytes, to make sure
1741            // the stream doesn't hang due to congestion window not growing
1742            // enough.
1743            let _ = conn.stream_writable(stream_id, overhead + 1);
1744        }
1745
1746        if fin && written == len && conn.stream_finished(stream_id) {
1747            self.streams.remove(&stream_id);
1748        }
1749
1750        Ok(ret)
1751    }
1752
1753    /// Returns whether the peer enabled HTTP/3 DATAGRAM frame support.
1754    ///
1755    /// Support is signalled by the peer's SETTINGS, so this method always
1756    /// returns false until they have been processed using the [`poll()`]
1757    /// method.
1758    ///
1759    /// [`poll()`]: struct.Connection.html#method.poll
1760    pub fn dgram_enabled_by_peer<F: BufFactory>(
1761        &self, conn: &super::Connection<F>,
1762    ) -> bool {
1763        self.peer_settings.h3_datagram == Some(1) &&
1764            conn.dgram_max_writable_len().is_some()
1765    }
1766
1767    /// Returns whether the peer enabled extended CONNECT support.
1768    ///
1769    /// Support is signalled by the peer's SETTINGS, so this method always
1770    /// returns false until they have been processed using the [`poll()`]
1771    /// method.
1772    ///
1773    /// [`poll()`]: struct.Connection.html#method.poll
1774    pub fn extended_connect_enabled_by_peer(&self) -> bool {
1775        self.peer_settings.connect_protocol_enabled == Some(1)
1776    }
1777
1778    /// Reads request or response body data into the provided buffer.
1779    ///
1780    /// Applications should call this method whenever the [`poll()`] method
1781    /// returns a [`Data`] event.
1782    ///
1783    /// On success the amount of bytes read is returned, or [`Done`] if there
1784    /// is no data to read.
1785    ///
1786    /// [`poll()`]: struct.Connection.html#method.poll
1787    /// [`Data`]: enum.Event.html#variant.Data
1788    /// [`Done`]: enum.Error.html#variant.Done
1789    pub fn recv_body<F: BufFactory>(
1790        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1791        out: &mut [u8],
1792    ) -> Result<usize> {
1793        self.recv_body_buf(conn, stream_id, out)
1794    }
1795
1796    /// Reads request or response body data into the provided BufMut buffer.
1797    ///
1798    /// **NOTE**:
1799    /// The BufMut will be populated with all available data up to its capacity.
1800    /// Since some BufMut implementations, e.g., [`Vec<u8>`], dynamically
1801    /// allocate additional memory, the caller may use
1802    /// [`BufMut::limit()`] to limit the maximum amount of data that
1803    /// can be written.
1804    ///
1805    /// Applications should call this method (or [`recv_body()`]) whenever
1806    /// the [`poll()`] method returns a [`Data`] event.
1807    ///
1808    /// On success the amount of bytes read is returned, or [`Done`] if there
1809    /// is no data to read.
1810    ///
1811    /// [`BufMut::limit()`]: bytes::BufMut::limit()
1812    /// [`recv_body()`]: Self::recv_body()
1813    /// [`poll()`]: struct.Connection.html#method.poll
1814    /// [`Data`]: enum.Event.html#variant.Data
1815    /// [`Done`]: enum.Error.html#variant.Done
1816    ///
1817    /// ## Example:
1818    /// ```no_run
1819    /// # use quiche::h3;
1820    /// fn receive(
1821    ///     qconn: &mut quiche::Connection, h3conn: &mut h3::Connection,
1822    /// ) -> Result<Vec<u8>, h3::Error> {
1823    ///     use bytes::BufMut as _;
1824    ///     let mut buffer = Vec::with_capacity(2048).limit(2048);
1825    ///     let bytes = h3conn.recv_body_buf(qconn, 0, &mut buffer)?;
1826    ///     let buffer = buffer.into_inner();
1827    ///     // The vec has been filled with exactly `bytes` number of bytes
1828    ///     assert_eq!(buffer.len(), bytes);
1829    ///     Ok(buffer)
1830    /// }
1831    /// ```
1832    pub fn recv_body_buf<F: BufFactory, OUT: bytes::BufMut>(
1833        &mut self, conn: &mut super::Connection<F>, stream_id: u64, mut out: OUT,
1834    ) -> Result<usize> {
1835        let mut total = 0;
1836
1837        // Try to consume all buffered data for the stream, even across multiple
1838        // DATA frames.
1839        // Note, that even if the BufMut does not have a limit defined, we are
1840        // inherently limited by how much data is in quiche's receive buffer for
1841        // that stream, so the BufMut cannot grow unbounded.
1842        while out.has_remaining_mut() {
1843            let stream = self.streams.get_mut(&stream_id).ok_or(Error::Done)?;
1844
1845            if stream.state() != stream::State::Data {
1846                break;
1847            }
1848
1849            let (read, fin) = match stream.try_consume_data(conn, &mut out) {
1850                Ok(v) => v,
1851
1852                Err(Error::Done) => break,
1853
1854                Err(e) => return Err(e),
1855            };
1856
1857            total += read;
1858
1859            // No more data to read, we are done.
1860            if read == 0 || fin {
1861                break;
1862            }
1863
1864            // Process incoming data from the stream. For example, if a whole
1865            // DATA frame was consumed, and another one is queued behind it,
1866            // this will ensure the additional data will also be returned to
1867            // the application.
1868            match self.process_readable_stream(conn, stream_id, false) {
1869                Ok(_) => unreachable!(),
1870
1871                Err(Error::Done) => (),
1872
1873                Err(e) => return Err(e),
1874            };
1875
1876            if conn.stream_finished(stream_id) {
1877                break;
1878            }
1879        }
1880
1881        // While body is being received, the stream is marked as finished only
1882        // when all data is read by the application.
1883        if conn.stream_finished(stream_id) {
1884            self.process_finished_stream(stream_id);
1885        }
1886
1887        if total == 0 {
1888            return Err(Error::Done);
1889        }
1890
1891        Ok(total)
1892    }
1893
1894    /// Sends a PRIORITY_UPDATE frame on the control stream with specified
1895    /// request stream ID and priority.
1896    ///
1897    /// The `priority` parameter represents [Extensible Priority]
1898    /// parameters. If the urgency is outside the range 0-7, it will be clamped
1899    /// to 7.
1900    ///
1901    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1902    /// doesn't have enough capacity for the operation to complete. When this
1903    /// happens the application should retry the operation once the stream is
1904    /// reported as writable again.
1905    ///
1906    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1907    /// [Extensible Priority]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
1908    pub fn send_priority_update_for_request<F: BufFactory>(
1909        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1910        priority: &Priority,
1911    ) -> Result<()> {
1912        let mut d = [42; 20];
1913        let mut b = octets::OctetsMut::with_slice(&mut d);
1914
1915        // Validate that it is sane to send PRIORITY_UPDATE.
1916        if self.is_server {
1917            return Err(Error::FrameUnexpected);
1918        }
1919
1920        if stream_id % 4 != 0 {
1921            return Err(Error::FrameUnexpected);
1922        }
1923
1924        let control_stream_id =
1925            self.control_stream_id.ok_or(Error::FrameUnexpected)?;
1926
1927        let urgency = priority
1928            .urgency
1929            .clamp(PRIORITY_URGENCY_LOWER_BOUND, PRIORITY_URGENCY_UPPER_BOUND);
1930
1931        let mut field_value = format!("u={urgency}");
1932
1933        if priority.incremental {
1934            field_value.push_str(",i");
1935        }
1936
1937        let priority_field_value = field_value.as_bytes();
1938        let frame_payload_len =
1939            octets::varint_len(stream_id) + priority_field_value.len();
1940
1941        let overhead =
1942            octets::varint_len(frame::PRIORITY_UPDATE_FRAME_REQUEST_TYPE_ID) +
1943                octets::varint_len(stream_id) +
1944                octets::varint_len(frame_payload_len as u64);
1945
1946        // Make sure the control stream has enough capacity.
1947        match conn.stream_writable(
1948            control_stream_id,
1949            overhead + priority_field_value.len(),
1950        ) {
1951            Ok(true) => (),
1952
1953            Ok(false) => return Err(Error::StreamBlocked),
1954
1955            Err(e) => {
1956                return Err(e.into());
1957            },
1958        }
1959
1960        b.put_varint(frame::PRIORITY_UPDATE_FRAME_REQUEST_TYPE_ID)?;
1961        b.put_varint(frame_payload_len as u64)?;
1962        b.put_varint(stream_id)?;
1963        let off = b.off();
1964        conn.stream_send(control_stream_id, &d[..off], false)?;
1965
1966        // Sending field value separately avoids unnecessary copy.
1967        conn.stream_send(control_stream_id, priority_field_value, false)?;
1968
1969        trace!(
1970            "{} tx frm PRIORITY_UPDATE request_stream={} priority_field_value={}",
1971            conn.trace_id(),
1972            stream_id,
1973            field_value,
1974        );
1975
1976        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1977            let frame = Http3Frame::PriorityUpdate {
1978                stream_id: Some(stream_id),
1979                push_id: None,
1980                priority_field_value: field_value.clone(),
1981                raw: None,
1982            };
1983
1984            let ev_data = EventData::Http3FrameCreated(FrameCreated {
1985                stream_id,
1986                length: Some(priority_field_value.len() as u64),
1987                frame,
1988                ..Default::default()
1989            });
1990
1991            q.add_event_data_now(ev_data).ok();
1992        });
1993
1994        Ok(())
1995    }
1996
1997    /// Take the last PRIORITY_UPDATE for a prioritized element ID.
1998    ///
1999    /// When the [`poll()`] method returns a [`PriorityUpdate`] event for a
2000    /// prioritized element, the event has triggered and will not rearm until
2001    /// applications call this method. It is recommended that applications defer
2002    /// taking the PRIORITY_UPDATE until after [`poll()`] returns [`Done`].
2003    ///
2004    /// On success the Priority Field Value is returned, or [`Done`] if there is
2005    /// no PRIORITY_UPDATE to read (either because there is no value to take, or
2006    /// because the prioritized element does not exist).
2007    ///
2008    /// [`poll()`]: struct.Connection.html#method.poll
2009    /// [`PriorityUpdate`]: enum.Event.html#variant.PriorityUpdate
2010    /// [`Done`]: enum.Error.html#variant.Done
2011    pub fn take_last_priority_update(
2012        &mut self, prioritized_element_id: u64,
2013    ) -> Result<Vec<u8>> {
2014        if let Some(stream) = self.streams.get_mut(&prioritized_element_id) {
2015            return stream.take_last_priority_update().ok_or(Error::Done);
2016        }
2017
2018        Err(Error::Done)
2019    }
2020
    /// Processes HTTP/3 data received from the peer.
    ///
    /// On success it returns an [`Event`] and an ID, or [`Done`] when there are
    /// no events to report.
    ///
    /// Note that all events are edge-triggered, meaning that once reported they
    /// will not be reported again by calling this method again, until the event
    /// is re-armed.
    ///
    /// The events [`Headers`], [`Data`] and [`Finished`] return a stream ID,
    /// which is used in methods [`recv_body()`], [`send_response()`] or
    /// [`send_body()`].
    ///
    /// The event [`GoAway`] returns an ID that depends on the connection role.
    /// A client receives the largest processed stream ID. A server receives
    /// the largest permitted push ID.
    ///
    /// The event [`PriorityUpdate`] only occurs at servers. It returns a
    /// prioritized element ID that is used in the method
    /// [`take_last_priority_update()`], which rearms the event for that ID.
    ///
    /// If an error occurs while processing data, the connection is closed with
    /// the appropriate error code, using the transport's [`close()`] method.
    ///
    /// [`Event`]: enum.Event.html
    /// [`Done`]: enum.Error.html#variant.Done
    /// [`Headers`]: enum.Event.html#variant.Headers
    /// [`Data`]: enum.Event.html#variant.Data
    /// [`Finished`]: enum.Event.html#variant.Finished
    /// [`GoAway`]: enum.Event.html#variant.GoAway
    /// [`PriorityUpdate`]: enum.Event.html#variant.PriorityUpdate
    /// [`recv_body()`]: struct.Connection.html#method.recv_body
    /// [`send_response()`]: struct.Connection.html#method.send_response
    /// [`send_body()`]: struct.Connection.html#method.send_body
    /// [`recv_dgram()`]: struct.Connection.html#method.recv_dgram
    /// [`take_last_priority_update()`]: struct.Connection.html#method.take_last_priority_update
    /// [`close()`]: ../struct.Connection.html#method.close
    pub fn poll<F: BufFactory>(
        &mut self, conn: &mut super::Connection<F>,
    ) -> Result<(u64, Event)> {
        // When connection close is initiated by the local application (e.g. due
        // to a protocol error), the connection itself might be in a broken
        // state, so return early.
        if conn.local_error.is_some() {
            return Err(Error::Done);
        }

        // Process control streams first.
        //
        // The peer's control stream and its QPACK encoder and decoder streams
        // are each processed in turn, if they are known. The first event or
        // hard error ends the call; `Done` moves on to the next stream.
        if let Some(stream_id) = self.peer_control_stream_id {
            match self.process_control_stream(conn, stream_id) {
                Ok(ev) => return Ok(ev),

                Err(Error::Done) => (),

                Err(e) => return Err(e),
            };
        }

        if let Some(stream_id) = self.peer_qpack_streams.encoder_stream_id {
            match self.process_control_stream(conn, stream_id) {
                Ok(ev) => return Ok(ev),

                Err(Error::Done) => (),

                Err(e) => return Err(e),
            };
        }

        if let Some(stream_id) = self.peer_qpack_streams.decoder_stream_id {
            match self.process_control_stream(conn, stream_id) {
                Ok(ev) => return Ok(ev),

                Err(Error::Done) => (),

                Err(e) => return Err(e),
            };
        }

        // Process finished streams list.
        //
        // At most one queued `Finished` event is reported per call.
        if let Some(finished) = self.finished_streams.pop_front() {
            return Ok((finished, Event::Finished));
        }

        // Process HTTP/3 data from readable streams.
        for s in conn.readable() {
            trace!("{} stream id {} is readable", conn.trace_id(), s);

            let ev = match self.process_readable_stream(conn, s, true) {
                Ok(v) => Some(v),

                Err(Error::Done) => None,

                // Return early if the stream was reset, to avoid returning
                // a Finished event later as well.
                Err(Error::TransportError(crate::Error::StreamReset(e))) =>
                    return Ok((s, Event::Reset(e))),

                Err(e) => return Err(e),
            };

            // Record the stream as finished before reporting the event that
            // was just produced for it (if any).
            if conn.stream_finished(s) {
                self.process_finished_stream(s);
            }

            // TODO: check if stream is completed so it can be freed
            if let Some(ev) = ev {
                return Ok(ev);
            }
        }

        // Process finished streams list once again, to make sure `Finished`
        // events are returned when receiving empty stream frames with the fin
        // flag set.
        if let Some(finished) = self.finished_streams.pop_front() {
            if conn.stream_readable(finished) {
                // The stream is finished, but is still readable, it may
                // indicate that there is a pending error, such as reset.
                //
                // Reading into an empty buffer surfaces that error; a reset
                // is reported as `Reset` instead of `Finished`.
                if let Err(crate::Error::StreamReset(e)) =
                    conn.stream_recv(finished, &mut [])
                {
                    return Ok((finished, Event::Reset(e)));
                }
            }
            return Ok((finished, Event::Finished));
        }

        Err(Error::Done)
    }
2149
2150    /// Sends a GOAWAY frame to initiate graceful connection closure.
2151    ///
2152    /// When quiche is used in the server role, the `id` parameter is the stream
2153    /// ID of the highest processed request. This can be any valid ID between 0
2154    /// and 2^62-4. However, the ID cannot be increased. Failure to satisfy
2155    /// these conditions will return an error.
2156    ///
2157    /// This method does not close the QUIC connection. Applications are
2158    /// required to call [`close()`] themselves.
2159    ///
2160    /// [`close()`]: ../struct.Connection.html#method.close
2161    pub fn send_goaway<F: BufFactory>(
2162        &mut self, conn: &mut super::Connection<F>, id: u64,
2163    ) -> Result<()> {
2164        let mut id = id;
2165
2166        // TODO: server push
2167        //
2168        // In the meantime always send 0 from client.
2169        if !self.is_server {
2170            id = 0;
2171        }
2172
2173        if self.is_server && id % 4 != 0 {
2174            return Err(Error::IdError);
2175        }
2176
2177        if let Some(sent_id) = self.local_goaway_id {
2178            if id > sent_id {
2179                return Err(Error::IdError);
2180            }
2181        }
2182
2183        if let Some(stream_id) = self.control_stream_id {
2184            let mut d = [42; 10];
2185            let mut b = octets::OctetsMut::with_slice(&mut d);
2186
2187            let frame = frame::Frame::GoAway { id };
2188
2189            let wire_len = frame.to_bytes(&mut b)?;
2190            let stream_cap = conn.stream_capacity(stream_id)?;
2191
2192            if stream_cap < wire_len {
2193                return Err(Error::StreamBlocked);
2194            }
2195
2196            trace!("{} tx frm {:?}", conn.trace_id(), frame);
2197
2198            qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2199                let ev_data = EventData::Http3FrameCreated(FrameCreated {
2200                    stream_id,
2201                    length: Some(octets::varint_len(id) as u64),
2202                    frame: frame.to_qlog(),
2203                    ..Default::default()
2204                });
2205
2206                q.add_event_data_now(ev_data).ok();
2207            });
2208
2209            let off = b.off();
2210            conn.stream_send(stream_id, &d[..off], false)?;
2211
2212            self.local_goaway_id = Some(id);
2213        }
2214
2215        Ok(())
2216    }
2217
2218    /// Gets the raw settings from peer including unknown and reserved types.
2219    ///
2220    /// The order of settings is the same as received in the SETTINGS frame.
2221    pub fn peer_settings_raw(&self) -> Option<&[(u64, u64)]> {
2222        self.peer_settings.raw.as_deref()
2223    }
2224
2225    fn open_uni_stream<F: BufFactory>(
2226        &mut self, conn: &mut super::Connection<F>, ty: u64,
2227    ) -> Result<u64> {
2228        let stream_id = self.next_uni_stream_id;
2229
2230        let mut d = [0; 8];
2231        let mut b = octets::OctetsMut::with_slice(&mut d);
2232
2233        match ty {
2234            // Control and QPACK streams are the most important to schedule.
2235            stream::HTTP3_CONTROL_STREAM_TYPE_ID |
2236            stream::QPACK_ENCODER_STREAM_TYPE_ID |
2237            stream::QPACK_DECODER_STREAM_TYPE_ID => {
2238                conn.stream_priority(stream_id, 0, false)?;
2239            },
2240
2241            // TODO: Server push
2242            stream::HTTP3_PUSH_STREAM_TYPE_ID => (),
2243
2244            // Anything else is a GREASE stream, so make it the least important.
2245            _ => {
2246                conn.stream_priority(stream_id, 255, false)?;
2247            },
2248        }
2249
2250        conn.stream_send(stream_id, b.put_varint(ty)?, false)?;
2251
2252        // To avoid skipping stream IDs, we only calculate the next available
2253        // stream ID when data has been successfully buffered.
2254        self.next_uni_stream_id = self
2255            .next_uni_stream_id
2256            .checked_add(4)
2257            .ok_or(Error::IdError)?;
2258
2259        Ok(stream_id)
2260    }
2261
2262    fn open_qpack_encoder_stream<F: BufFactory>(
2263        &mut self, conn: &mut super::Connection<F>,
2264    ) -> Result<()> {
2265        let stream_id =
2266            self.open_uni_stream(conn, stream::QPACK_ENCODER_STREAM_TYPE_ID)?;
2267
2268        self.local_qpack_streams.encoder_stream_id = Some(stream_id);
2269
2270        qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2271            let ev_data = EventData::Http3StreamTypeSet(StreamTypeSet {
2272                stream_id,
2273                initiator: Some(Initiator::Local),
2274                stream_type: StreamType::QpackEncode,
2275                ..Default::default()
2276            });
2277
2278            q.add_event_data_now(ev_data).ok();
2279        });
2280
2281        Ok(())
2282    }
2283
2284    fn open_qpack_decoder_stream<F: BufFactory>(
2285        &mut self, conn: &mut super::Connection<F>,
2286    ) -> Result<()> {
2287        let stream_id =
2288            self.open_uni_stream(conn, stream::QPACK_DECODER_STREAM_TYPE_ID)?;
2289
2290        self.local_qpack_streams.decoder_stream_id = Some(stream_id);
2291
2292        qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2293            let ev_data = EventData::Http3StreamTypeSet(StreamTypeSet {
2294                stream_id,
2295                initiator: Some(Initiator::Local),
2296                stream_type: StreamType::QpackDecode,
2297                ..Default::default()
2298            });
2299
2300            q.add_event_data_now(ev_data).ok();
2301        });
2302
2303        Ok(())
2304    }
2305
2306    /// Send GREASE frames on the provided stream ID.
2307    fn send_grease_frames<F: BufFactory>(
2308        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
2309    ) -> Result<()> {
2310        let mut d = [0; 8];
2311
2312        let stream_cap = match conn.stream_capacity(stream_id) {
2313            Ok(v) => v,
2314
2315            Err(e) => {
2316                if conn.stream_finished(stream_id) {
2317                    self.streams.remove(&stream_id);
2318                }
2319
2320                return Err(e.into());
2321            },
2322        };
2323
2324        let grease_frame1 = grease_value();
2325        let grease_frame2 = grease_value();
2326        let grease_payload = b"GREASE is the word";
2327
2328        let overhead = octets::varint_len(grease_frame1) + // frame type
2329            1 + // payload len
2330            octets::varint_len(grease_frame2) + // frame type
2331            1 + // payload len
2332            grease_payload.len(); // payload
2333
2334        // Don't send GREASE if there is not enough capacity for it. Greasing
2335        // will _not_ be attempted again later on.
2336        if stream_cap < overhead {
2337            return Ok(());
2338        }
2339
2340        // Empty GREASE frame.
2341        let mut b = octets::OctetsMut::with_slice(&mut d);
2342        conn.stream_send(stream_id, b.put_varint(grease_frame1)?, false)?;
2343
2344        let mut b = octets::OctetsMut::with_slice(&mut d);
2345        conn.stream_send(stream_id, b.put_varint(0)?, false)?;
2346
2347        trace!(
2348            "{} tx frm GREASE stream={} len=0",
2349            conn.trace_id(),
2350            stream_id
2351        );
2352
2353        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2354            let frame = Http3Frame::Reserved {
2355                frame_type_bytes: grease_frame1,
2356                raw: None,
2357            };
2358            let ev_data = EventData::Http3FrameCreated(FrameCreated {
2359                stream_id,
2360                length: Some(0),
2361                frame,
2362                ..Default::default()
2363            });
2364
2365            q.add_event_data_now(ev_data).ok();
2366        });
2367
2368        // GREASE frame with payload.
2369        let mut b = octets::OctetsMut::with_slice(&mut d);
2370        conn.stream_send(stream_id, b.put_varint(grease_frame2)?, false)?;
2371
2372        let mut b = octets::OctetsMut::with_slice(&mut d);
2373        conn.stream_send(stream_id, b.put_varint(18)?, false)?;
2374
2375        conn.stream_send(stream_id, grease_payload, false)?;
2376
2377        trace!(
2378            "{} tx frm GREASE stream={} len={}",
2379            conn.trace_id(),
2380            stream_id,
2381            grease_payload.len()
2382        );
2383
2384        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2385            let frame = Http3Frame::Reserved {
2386                frame_type_bytes: grease_frame2,
2387                raw: None,
2388            };
2389            let ev_data = EventData::Http3FrameCreated(FrameCreated {
2390                stream_id,
2391                length: Some(grease_payload.len() as u64),
2392                frame,
2393                ..Default::default()
2394            });
2395
2396            q.add_event_data_now(ev_data).ok();
2397        });
2398
2399        Ok(())
2400    }
2401
2402    /// Opens a new unidirectional stream with a GREASE type and sends some
2403    /// unframed payload.
2404    fn open_grease_stream<F: BufFactory>(
2405        &mut self, conn: &mut super::Connection<F>,
2406    ) -> Result<()> {
2407        let ty = grease_value();
2408        match self.open_uni_stream(conn, ty) {
2409            Ok(stream_id) => {
2410                conn.stream_send(stream_id, b"GREASE is the word", true)?;
2411
2412                trace!("{} open GREASE stream {}", conn.trace_id(), stream_id);
2413
2414                qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2415                    let ev_data = EventData::Http3StreamTypeSet(StreamTypeSet {
2416                        stream_id,
2417                        initiator: Some(Initiator::Local),
2418                        stream_type: StreamType::Unknown,
2419                        stream_type_bytes: Some(ty),
2420                        ..Default::default()
2421                    });
2422
2423                    q.add_event_data_now(ev_data).ok();
2424                });
2425            },
2426
2427            Err(Error::IdError) => {
2428                trace!("{} GREASE stream blocked", conn.trace_id(),);
2429
2430                return Ok(());
2431            },
2432
2433            Err(e) => return Err(e),
2434        };
2435
2436        Ok(())
2437    }
2438
2439    /// Sends SETTINGS frame based on HTTP/3 configuration.
2440    fn send_settings<F: BufFactory>(
2441        &mut self, conn: &mut super::Connection<F>,
2442    ) -> Result<()> {
2443        let stream_id = match self
2444            .open_uni_stream(conn, stream::HTTP3_CONTROL_STREAM_TYPE_ID)
2445        {
2446            Ok(v) => v,
2447
2448            Err(e) => {
2449                trace!("{} Control stream blocked", conn.trace_id(),);
2450
2451                if e == Error::Done {
2452                    return Err(Error::InternalError);
2453                }
2454
2455                return Err(e);
2456            },
2457        };
2458
2459        self.control_stream_id = Some(stream_id);
2460
2461        qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2462            let ev_data = EventData::Http3StreamTypeSet(StreamTypeSet {
2463                stream_id,
2464                initiator: Some(Initiator::Local),
2465                stream_type: StreamType::Control,
2466                ..Default::default()
2467            });
2468
2469            q.add_event_data_now(ev_data).ok();
2470        });
2471
2472        let grease = if conn.grease {
2473            Some((grease_value(), grease_value()))
2474        } else {
2475            None
2476        };
2477
2478        let frame = frame::Frame::Settings {
2479            max_field_section_size: self.local_settings.max_field_section_size,
2480            qpack_max_table_capacity: self
2481                .local_settings
2482                .qpack_max_table_capacity,
2483            qpack_blocked_streams: self.local_settings.qpack_blocked_streams,
2484            connect_protocol_enabled: self
2485                .local_settings
2486                .connect_protocol_enabled,
2487            h3_datagram: self.local_settings.h3_datagram,
2488            grease,
2489            additional_settings: self.local_settings.additional_settings.clone(),
2490            raw: Default::default(),
2491        };
2492
2493        let mut d = [42; 128];
2494        let mut b = octets::OctetsMut::with_slice(&mut d);
2495
2496        frame.to_bytes(&mut b)?;
2497
2498        let off = b.off();
2499
2500        if let Some(id) = self.control_stream_id {
2501            conn.stream_send(id, &d[..off], false)?;
2502
2503            trace!(
2504                "{} tx frm SETTINGS stream={} len={}",
2505                conn.trace_id(),
2506                id,
2507                off
2508            );
2509
2510            qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2511                let frame = frame.to_qlog();
2512                let ev_data = EventData::Http3FrameCreated(FrameCreated {
2513                    stream_id: id,
2514                    length: Some(off as u64),
2515                    frame,
2516                    ..Default::default()
2517                });
2518
2519                q.add_event_data_now(ev_data).ok();
2520            });
2521        }
2522
2523        Ok(())
2524    }
2525
2526    fn process_control_stream<F: BufFactory>(
2527        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
2528    ) -> Result<(u64, Event)> {
2529        close_conn_if_critical_stream_finished(conn, stream_id)?;
2530
2531        if !conn.stream_readable(stream_id) {
2532            return Err(Error::Done);
2533        }
2534
2535        match self.process_readable_stream(conn, stream_id, true) {
2536            Ok(ev) => return Ok(ev),
2537
2538            Err(Error::Done) => (),
2539
2540            Err(e) => return Err(e),
2541        };
2542
2543        close_conn_if_critical_stream_finished(conn, stream_id)?;
2544
2545        Err(Error::Done)
2546    }
2547
    /// Drives the HTTP/3 stream state machine for `stream_id`.
    ///
    /// An entry for the stream is created in `self.streams` if none exists
    /// yet. The method then loops, looking at the stream's current state on
    /// each iteration and handling it: stream type, push ID, frame type,
    /// frame payload length, frame payload, data, QPACK instructions, drain
    /// and finished.
    ///
    /// When `polling` is `false`, the `FramePayload` and `Data` states stop
    /// the loop instead of producing events.
    ///
    /// Returns `Ok((id, event))` as soon as an event is available. Returns
    /// `Err(Error::Done)` when the loop ends without one. Other errors are
    /// returned to the caller; on most protocol violations the QUIC
    /// connection is closed with the matching wire error code first.
    fn process_readable_stream<F: BufFactory>(
        &mut self, conn: &mut super::Connection<F>, stream_id: u64, polling: bool,
    ) -> Result<(u64, Event)> {
        self.streams
            .entry(stream_id)
            .or_insert_with(|| <stream::Stream>::new(stream_id, false));

        // We need to get a fresh reference to the stream for each
        // iteration, to avoid borrowing `self` for the entire duration
        // of the loop, because we'll need to borrow it again in the
        // `State::FramePayload` case below.
        while let Some(stream) = self.streams.get_mut(&stream_id) {
            match stream.state() {
                stream::State::StreamType => {
                    stream.try_fill_buffer(conn)?;

                    let varint = match stream.try_consume_varint() {
                        Ok(v) => v,

                        // Could not consume a varint; go around the loop
                        // again.
                        Err(_) => continue,
                    };

                    let ty = stream::Type::deserialize(varint)?;

                    if let Err(e) = stream.set_ty(ty) {
                        conn.close(true, e.to_wire(), b"")?;
                        return Err(e);
                    }

                    qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
                        // The raw type value is only logged for unknown
                        // stream types.
                        let ty_val = if matches!(ty, stream::Type::Unknown) {
                            Some(varint)
                        } else {
                            None
                        };

                        let ev_data =
                            EventData::Http3StreamTypeSet(StreamTypeSet {
                                stream_id,
                                initiator: Some(Initiator::Remote),
                                stream_type: ty.to_qlog(),
                                stream_type_bytes: ty_val,
                                ..Default::default()
                            });

                        q.add_event_data_now(ev_data).ok();
                    });

                    match &ty {
                        stream::Type::Control => {
                            // Only one control stream allowed.
                            if self.peer_control_stream_id.is_some() {
                                conn.close(
                                    true,
                                    Error::StreamCreationError.to_wire(),
                                    b"Received multiple control streams",
                                )?;

                                return Err(Error::StreamCreationError);
                            }

                            trace!(
                                "{} open peer's control stream {}",
                                conn.trace_id(),
                                stream_id
                            );

                            close_conn_if_critical_stream_finished(
                                conn, stream_id,
                            )?;

                            self.peer_control_stream_id = Some(stream_id);
                        },

                        stream::Type::Push => {
                            // Only clients can receive push stream.
                            if self.is_server {
                                conn.close(
                                    true,
                                    Error::StreamCreationError.to_wire(),
                                    b"Server received push stream.",
                                )?;

                                return Err(Error::StreamCreationError);
                            }
                        },

                        stream::Type::QpackEncoder => {
                            // Only one qpack encoder stream allowed.
                            if self.peer_qpack_streams.encoder_stream_id.is_some()
                            {
                                conn.close(
                                    true,
                                    Error::StreamCreationError.to_wire(),
                                    b"Received multiple QPACK encoder streams",
                                )?;

                                return Err(Error::StreamCreationError);
                            }

                            close_conn_if_critical_stream_finished(
                                conn, stream_id,
                            )?;

                            self.peer_qpack_streams.encoder_stream_id =
                                Some(stream_id);
                        },

                        stream::Type::QpackDecoder => {
                            // Only one qpack decoder allowed.
                            if self.peer_qpack_streams.decoder_stream_id.is_some()
                            {
                                conn.close(
                                    true,
                                    Error::StreamCreationError.to_wire(),
                                    b"Received multiple QPACK decoder streams",
                                )?;

                                return Err(Error::StreamCreationError);
                            }

                            close_conn_if_critical_stream_finished(
                                conn, stream_id,
                            )?;

                            self.peer_qpack_streams.decoder_stream_id =
                                Some(stream_id);
                        },

                        stream::Type::Unknown => {
                            // Unknown stream types are ignored.
                            // TODO: we MAY send STOP_SENDING
                        },

                        // NOTE(review): relies on `Type::deserialize()` never
                        // producing `Request` — confirm against its
                        // implementation.
                        stream::Type::Request => unreachable!(),
                    }
                },

                stream::State::PushId => {
                    stream.try_fill_buffer(conn)?;

                    let varint = match stream.try_consume_varint() {
                        Ok(v) => v,

                        Err(_) => continue,
                    };

                    if let Err(e) = stream.set_push_id(varint) {
                        conn.close(true, e.to_wire(), b"")?;
                        return Err(e);
                    }
                },

                stream::State::FrameType => {
                    stream.try_fill_buffer(conn)?;

                    let varint = match stream.try_consume_varint() {
                        Ok(v) => v,

                        Err(_) => continue,
                    };

                    match stream.set_frame_type(varint) {
                        // Unexpected frames get a close reason that names the
                        // offending frame type.
                        Err(Error::FrameUnexpected) => {
                            let msg = format!("Unexpected frame type {varint}");

                            conn.close(
                                true,
                                Error::FrameUnexpected.to_wire(),
                                msg.as_bytes(),
                            )?;

                            return Err(Error::FrameUnexpected);
                        },

                        Err(e) => {
                            conn.close(
                                true,
                                e.to_wire(),
                                b"Error handling frame.",
                            )?;

                            return Err(e);
                        },

                        _ => (),
                    }
                },

                stream::State::FramePayloadLen => {
                    stream.try_fill_buffer(conn)?;

                    let payload_len = match stream.try_consume_varint() {
                        Ok(v) => v,

                        Err(_) => continue,
                    };

                    // DATA frames are handled uniquely. After this point we lose
                    // visibility of DATA framing, so just log here.
                    if Some(frame::DATA_FRAME_TYPE_ID) == stream.frame_type() {
                        trace!(
                            "{} rx frm DATA stream={} wire_payload_len={}",
                            conn.trace_id(),
                            stream_id,
                            payload_len
                        );

                        qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
                            let frame = Http3Frame::Data { raw: None };

                            let ev_data =
                                EventData::Http3FrameParsed(FrameParsed {
                                    stream_id,
                                    length: Some(payload_len),
                                    frame,
                                    ..Default::default()
                                });

                            q.add_event_data_now(ev_data).ok();
                        });
                    }

                    if let Err(e) = stream.set_frame_payload_len(payload_len) {
                        conn.close(true, e.to_wire(), b"")?;
                        return Err(e);
                    }
                },

                stream::State::FramePayload => {
                    // Do not emit events when not polling.
                    if !polling {
                        break;
                    }

                    stream.try_fill_buffer(conn)?;

                    let (frame, payload_len) = match stream.try_consume_frame() {
                        Ok(frame) => frame,

                        // `Done` is returned as is, without closing the
                        // connection.
                        Err(Error::Done) => return Err(Error::Done),

                        Err(e) => {
                            conn.close(
                                true,
                                e.to_wire(),
                                b"Error handling frame.",
                            )?;

                            return Err(e);
                        },
                    };

                    match self.process_frame(conn, stream_id, frame, payload_len)
                    {
                        Ok(ev) => return Ok(ev),

                        Err(Error::Done) => {
                            // This might be a frame that is processed internally
                            // without needing to bubble up to the user as an
                            // event. Check whether the frame has FIN'd by QUIC
                            // to prevent trying to read again on a closed stream.
                            if conn.stream_finished(stream_id) {
                                break;
                            }
                        },

                        Err(e) => return Err(e),
                    };
                },

                stream::State::Data => {
                    // Do not emit events when not polling.
                    if !polling {
                        break;
                    }

                    // A `Data` event is only returned when the stream agrees
                    // to trigger one.
                    if !stream.try_trigger_data_event() {
                        break;
                    }

                    return Ok((stream_id, Event::Data));
                },

                stream::State::QpackInstruction => {
                    let mut d = [0; 4096];

                    // Read data from the stream and discard immediately.
                    //
                    // This loop has no `break`: it is left only through `?`,
                    // when `stream_recv()` or `close_conn_critical_stream()`
                    // returns an error. Only the byte counters are kept.
                    loop {
                        let (recv, fin) = conn.stream_recv(stream_id, &mut d)?;

                        match stream.ty() {
                            Some(stream::Type::QpackEncoder) =>
                                self.peer_qpack_streams.encoder_stream_bytes +=
                                    recv as u64,
                            Some(stream::Type::QpackDecoder) =>
                                self.peer_qpack_streams.decoder_stream_bytes +=
                                    recv as u64,
                            _ => unreachable!(),
                        };

                        if fin {
                            close_conn_critical_stream(conn)?;
                        }
                    }
                },

                stream::State::Drain => {
                    // Discard incoming data on the stream.
                    // NOTE(review): 0x100 is presumably the H3_NO_ERROR code —
                    // confirm.
                    conn.stream_shutdown(
                        stream_id,
                        crate::Shutdown::Read,
                        0x100,
                    )?;

                    break;
                },

                stream::State::Finished => break,
            }
        }

        Err(Error::Done)
    }
2872
2873    fn process_finished_stream(&mut self, stream_id: u64) {
2874        let stream = match self.streams.get_mut(&stream_id) {
2875            Some(v) => v,
2876
2877            None => return,
2878        };
2879
2880        if stream.state() == stream::State::Finished {
2881            return;
2882        }
2883
2884        match stream.ty() {
2885            Some(stream::Type::Request) | Some(stream::Type::Push) => {
2886                stream.finished();
2887
2888                self.finished_streams.push_back(stream_id);
2889            },
2890
2891            _ => (),
2892        };
2893    }
2894
2895    fn process_frame<F: BufFactory>(
2896        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
2897        frame: frame::Frame, payload_len: u64,
2898    ) -> Result<(u64, Event)> {
2899        trace!(
2900            "{} rx frm {:?} stream={} payload_len={}",
2901            conn.trace_id(),
2902            frame,
2903            stream_id,
2904            payload_len
2905        );
2906
2907        qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
2908            // HEADERS frames are special case and will be logged below.
2909            if !matches!(frame, frame::Frame::Headers { .. }) {
2910                let frame = frame.to_qlog();
2911                let ev_data = EventData::Http3FrameParsed(FrameParsed {
2912                    stream_id,
2913                    length: Some(payload_len),
2914                    frame,
2915                    ..Default::default()
2916                });
2917
2918                q.add_event_data_now(ev_data).ok();
2919            }
2920        });
2921
2922        match frame {
2923            frame::Frame::Settings {
2924                max_field_section_size,
2925                qpack_max_table_capacity,
2926                qpack_blocked_streams,
2927                connect_protocol_enabled,
2928                h3_datagram,
2929                additional_settings,
2930                raw,
2931                ..
2932            } => {
2933                self.peer_settings = ConnectionSettings {
2934                    max_field_section_size,
2935                    qpack_max_table_capacity,
2936                    qpack_blocked_streams,
2937                    connect_protocol_enabled,
2938                    h3_datagram,
2939                    additional_settings,
2940                    raw,
2941                };
2942
2943                if let Some(1) = h3_datagram {
2944                    // The peer MUST have also enabled DATAGRAM with a TP
2945                    if conn.dgram_max_writable_len().is_none() {
2946                        conn.close(
2947                            true,
2948                            Error::SettingsError.to_wire(),
2949                            b"H3_DATAGRAM sent with value 1 but max_datagram_frame_size TP not set.",
2950                        )?;
2951
2952                        return Err(Error::SettingsError);
2953                    }
2954                }
2955            },
2956
2957            frame::Frame::Headers { header_block } => {
2958                // Servers reject too many HEADERS frames.
2959                if let Some(s) = self.streams.get_mut(&stream_id) {
2960                    if self.is_server && s.headers_received_count() == 2 {
2961                        conn.close(
2962                            true,
2963                            Error::FrameUnexpected.to_wire(),
2964                            b"Too many HEADERS frames",
2965                        )?;
2966                        return Err(Error::FrameUnexpected);
2967                    }
2968
2969                    s.increment_headers_received();
2970                }
2971
2972                // Use "infinite" as default value for max_field_section_size if
2973                // it is not configured by the application.
2974                let max_size = self
2975                    .local_settings
2976                    .max_field_section_size
2977                    .unwrap_or(u64::MAX);
2978
2979                let headers = match self
2980                    .qpack_decoder
2981                    .decode(&header_block[..], max_size)
2982                {
2983                    Ok(v) => v,
2984
2985                    Err(e) => {
2986                        let e = match e {
2987                            qpack::Error::HeaderListTooLarge =>
2988                                Error::ExcessiveLoad,
2989
2990                            _ => Error::QpackDecompressionFailed,
2991                        };
2992
2993                        conn.close(true, e.to_wire(), b"Error parsing headers.")?;
2994
2995                        return Err(e);
2996                    },
2997                };
2998
2999                qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
3000                    let qlog_headers = headers
3001                        .iter()
3002                        .map(|h| qlog::events::http3::HttpHeader {
3003                            name: Some(
3004                                String::from_utf8_lossy(h.name()).into_owned(),
3005                            ),
3006                            name_bytes: None,
3007                            value: Some(
3008                                String::from_utf8_lossy(h.value()).into_owned(),
3009                            ),
3010                            value_bytes: None,
3011                        })
3012                        .collect();
3013
3014                    let frame = Http3Frame::Headers {
3015                        headers: qlog_headers,
3016                        raw: None,
3017                    };
3018
3019                    let ev_data = EventData::Http3FrameParsed(FrameParsed {
3020                        stream_id,
3021                        length: Some(payload_len),
3022                        frame,
3023                        ..Default::default()
3024                    });
3025
3026                    q.add_event_data_now(ev_data).ok();
3027                });
3028
3029                let more_frames = !conn.stream_finished(stream_id);
3030
3031                return Ok((stream_id, Event::Headers {
3032                    list: headers,
3033                    more_frames,
3034                }));
3035            },
3036
3037            frame::Frame::Data { .. } => {
3038                // Do nothing. The Data event is returned separately.
3039            },
3040
3041            frame::Frame::GoAway { id } => {
3042                if !self.is_server && id % 4 != 0 {
3043                    conn.close(
3044                        true,
3045                        Error::FrameUnexpected.to_wire(),
3046                        b"GOAWAY received with ID of non-request stream",
3047                    )?;
3048
3049                    return Err(Error::IdError);
3050                }
3051
3052                if let Some(received_id) = self.peer_goaway_id {
3053                    if id > received_id {
3054                        conn.close(
3055                            true,
3056                            Error::IdError.to_wire(),
3057                            b"GOAWAY received with ID larger than previously received",
3058                        )?;
3059
3060                        return Err(Error::IdError);
3061                    }
3062                }
3063
3064                self.peer_goaway_id = Some(id);
3065
3066                return Ok((id, Event::GoAway));
3067            },
3068
3069            frame::Frame::MaxPushId { push_id } => {
3070                if !self.is_server {
3071                    conn.close(
3072                        true,
3073                        Error::FrameUnexpected.to_wire(),
3074                        b"MAX_PUSH_ID received by client",
3075                    )?;
3076
3077                    return Err(Error::FrameUnexpected);
3078                }
3079
3080                if push_id < self.max_push_id {
3081                    conn.close(
3082                        true,
3083                        Error::IdError.to_wire(),
3084                        b"MAX_PUSH_ID reduced limit",
3085                    )?;
3086
3087                    return Err(Error::IdError);
3088                }
3089
3090                self.max_push_id = push_id;
3091            },
3092
3093            frame::Frame::PushPromise { .. } => {
3094                if self.is_server {
3095                    conn.close(
3096                        true,
3097                        Error::FrameUnexpected.to_wire(),
3098                        b"PUSH_PROMISE received by server",
3099                    )?;
3100
3101                    return Err(Error::FrameUnexpected);
3102                }
3103
3104                if stream_id % 4 != 0 {
3105                    conn.close(
3106                        true,
3107                        Error::FrameUnexpected.to_wire(),
3108                        b"PUSH_PROMISE received on non-request stream",
3109                    )?;
3110
3111                    return Err(Error::FrameUnexpected);
3112                }
3113
3114                // TODO: implement more checks and PUSH_PROMISE event
3115            },
3116
3117            frame::Frame::CancelPush { .. } => {
3118                // TODO: implement CANCEL_PUSH frame
3119            },
3120
3121            frame::Frame::PriorityUpdateRequest {
3122                prioritized_element_id,
3123                priority_field_value,
3124            } => {
3125                if !self.is_server {
3126                    conn.close(
3127                        true,
3128                        Error::FrameUnexpected.to_wire(),
3129                        b"PRIORITY_UPDATE received by client",
3130                    )?;
3131
3132                    return Err(Error::FrameUnexpected);
3133                }
3134
3135                if prioritized_element_id % 4 != 0 {
3136                    conn.close(
3137                        true,
3138                        Error::FrameUnexpected.to_wire(),
3139                        b"PRIORITY_UPDATE for request stream type with wrong ID",
3140                    )?;
3141
3142                    return Err(Error::FrameUnexpected);
3143                }
3144
3145                if prioritized_element_id > conn.streams.max_streams_bidi() * 4 {
3146                    conn.close(
3147                        true,
3148                        Error::IdError.to_wire(),
3149                        b"PRIORITY_UPDATE for request stream beyond max streams limit",
3150                    )?;
3151
3152                    return Err(Error::IdError);
3153                }
3154
3155                // If the PRIORITY_UPDATE is valid, consider storing the latest
3156                // contents. Due to reordering, it is possible that we might
3157                // receive frames that reference streams that have not yet to
3158                // been opened and that's OK because it's within our concurrency
3159                // limit. However, we discard PRIORITY_UPDATE that refers to
3160                // streams that we know have been collected.
3161                if conn.streams.is_collected(prioritized_element_id) {
3162                    return Err(Error::Done);
3163                }
3164
3165                // If the stream did not yet exist, create it and store.
3166                let stream =
3167                    self.streams.entry(prioritized_element_id).or_insert_with(
3168                        || <stream::Stream>::new(prioritized_element_id, false),
3169                    );
3170
3171                let had_priority_update = stream.has_last_priority_update();
3172                stream.set_last_priority_update(Some(priority_field_value));
3173
3174                // Only trigger the event when there wasn't already a stored
3175                // PRIORITY_UPDATE.
3176                if !had_priority_update {
3177                    return Ok((prioritized_element_id, Event::PriorityUpdate));
3178                } else {
3179                    return Err(Error::Done);
3180                }
3181            },
3182
3183            frame::Frame::PriorityUpdatePush {
3184                prioritized_element_id,
3185                ..
3186            } => {
3187                if !self.is_server {
3188                    conn.close(
3189                        true,
3190                        Error::FrameUnexpected.to_wire(),
3191                        b"PRIORITY_UPDATE received by client",
3192                    )?;
3193
3194                    return Err(Error::FrameUnexpected);
3195                }
3196
3197                if prioritized_element_id % 3 != 0 {
3198                    conn.close(
3199                        true,
3200                        Error::FrameUnexpected.to_wire(),
3201                        b"PRIORITY_UPDATE for push stream type with wrong ID",
3202                    )?;
3203
3204                    return Err(Error::FrameUnexpected);
3205                }
3206
3207                // TODO: we only implement this if we implement server push
3208            },
3209
3210            frame::Frame::Unknown { .. } => (),
3211        }
3212
3213        Err(Error::Done)
3214    }
3215
3216    /// Collects and returns statistics about the connection.
3217    #[inline]
3218    pub fn stats(&self) -> Stats {
3219        Stats {
3220            qpack_encoder_stream_recv_bytes: self
3221                .peer_qpack_streams
3222                .encoder_stream_bytes,
3223            qpack_decoder_stream_recv_bytes: self
3224                .peer_qpack_streams
3225                .decoder_stream_bytes,
3226        }
3227    }
3228}
3229
3230/// Generates an HTTP/3 GREASE variable length integer.
3231pub fn grease_value() -> u64 {
3232    let n = super::rand::rand_u64_uniform(148_764_065_110_560_899);
3233    31 * n + 33
3234}
3235
3236#[doc(hidden)]
3237#[cfg(any(test, feature = "internal"))]
3238pub mod testing {
3239    use super::*;
3240
3241    use crate::test_utils;
3242    use crate::DefaultBufFactory;
3243
    /// Session is an HTTP/3 test helper structure. It holds a client, server
    /// and pipe that allows them to communicate.
    ///
    /// `default()` creates a session with some sensible default
    /// configuration. `with_configs()` allows for providing a specific
    /// configuration.
    ///
    /// `handshake()` performs all the steps needed to establish an HTTP/3
    /// connection.
    ///
    /// Some utility functions are provided that make it less verbose to send
    /// requests, responses and individual headers. The full quiche API remains
    /// available for any test that needs to do unconventional things (such as
    /// bad behaviour that triggers errors).
    pub struct Session<F = DefaultBufFactory>
    where
        F: BufFactory,
    {
        /// Pipe that lets the client and server communicate.
        pub pipe: test_utils::Pipe<F>,
        /// HTTP/3 connection for the client side.
        pub client: Connection,
        /// HTTP/3 connection for the server side.
        pub server: Connection,
    }
3266
3267    impl Session {
3268        pub fn new() -> Result<Session> {
3269            Session::<DefaultBufFactory>::new_with_buf()
3270        }
3271
3272        pub fn with_configs(
3273            config: &mut crate::Config, h3_config: &Config,
3274        ) -> Result<Session> {
3275            Session::<DefaultBufFactory>::with_configs_and_buf(config, h3_config)
3276        }
3277
3278        pub fn default_configs() -> Result<(crate::Config, Config)> {
3279            fn path_relative_to_manifest_dir(path: &str) -> String {
3280                std::fs::canonicalize(
3281                    std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join(path),
3282                )
3283                .unwrap()
3284                .to_string_lossy()
3285                .into_owned()
3286            }
3287
3288            let mut config = crate::Config::new(crate::PROTOCOL_VERSION)?;
3289            config.load_cert_chain_from_pem_file(
3290                &path_relative_to_manifest_dir("examples/cert.crt"),
3291            )?;
3292            config.load_priv_key_from_pem_file(
3293                &path_relative_to_manifest_dir("examples/cert.key"),
3294            )?;
3295            config.set_application_protos(&[b"h3"])?;
3296            config.set_initial_max_data(1500);
3297            config.set_initial_max_stream_data_bidi_local(150);
3298            config.set_initial_max_stream_data_bidi_remote(150);
3299            config.set_initial_max_stream_data_uni(150);
3300            config.set_initial_max_streams_bidi(5);
3301            config.set_initial_max_streams_uni(5);
3302            config.verify_peer(false);
3303            config.enable_dgram(true, 3, 3);
3304            config.set_ack_delay_exponent(8);
3305
3306            let h3_config = Config::new()?;
3307            Ok((config, h3_config))
3308        }
3309    }
3310
3311    impl<F: BufFactory> Session<F> {
3312        pub fn new_with_buf() -> Result<Session<F>> {
3313            let (mut config, h3_config) = Session::default_configs()?;
3314            Session::with_configs_and_buf(&mut config, &h3_config)
3315        }
3316
3317        pub fn with_configs_and_buf(
3318            config: &mut crate::Config, h3_config: &Config,
3319        ) -> Result<Session<F>> {
3320            let pipe = test_utils::Pipe::with_config_and_buf(config)?;
3321            let client_dgram = pipe.client.dgram_enabled();
3322            let server_dgram = pipe.server.dgram_enabled();
3323            Ok(Session {
3324                pipe,
3325                client: Connection::new(h3_config, false, client_dgram)?,
3326                server: Connection::new(h3_config, true, server_dgram)?,
3327            })
3328        }
3329
3330        /// Do the HTTP/3 handshake so both ends are in sane initial state.
3331        pub fn handshake(&mut self) -> Result<()> {
3332            self.pipe.handshake()?;
3333
3334            // Client streams.
3335            self.client.send_settings(&mut self.pipe.client)?;
3336            self.pipe.advance().ok();
3337
3338            self.client
3339                .open_qpack_encoder_stream(&mut self.pipe.client)?;
3340            self.pipe.advance().ok();
3341
3342            self.client
3343                .open_qpack_decoder_stream(&mut self.pipe.client)?;
3344            self.pipe.advance().ok();
3345
3346            if self.pipe.client.grease {
3347                self.client.open_grease_stream(&mut self.pipe.client)?;
3348            }
3349
3350            self.pipe.advance().ok();
3351
3352            // Server streams.
3353            self.server.send_settings(&mut self.pipe.server)?;
3354            self.pipe.advance().ok();
3355
3356            self.server
3357                .open_qpack_encoder_stream(&mut self.pipe.server)?;
3358            self.pipe.advance().ok();
3359
3360            self.server
3361                .open_qpack_decoder_stream(&mut self.pipe.server)?;
3362            self.pipe.advance().ok();
3363
3364            if self.pipe.server.grease {
3365                self.server.open_grease_stream(&mut self.pipe.server)?;
3366            }
3367
3368            self.advance().ok();
3369
3370            while self.client.poll(&mut self.pipe.client).is_ok() {
3371                // Do nothing.
3372            }
3373
3374            while self.server.poll(&mut self.pipe.server).is_ok() {
3375                // Do nothing.
3376            }
3377
3378            Ok(())
3379        }
3380
3381        /// Advances the session pipe over the buffer.
3382        pub fn advance(&mut self) -> crate::Result<()> {
3383            self.pipe.advance()
3384        }
3385
3386        /// Polls the client for events.
3387        pub fn poll_client(&mut self) -> Result<(u64, Event)> {
3388            self.client.poll(&mut self.pipe.client)
3389        }
3390
3391        /// Polls the server for events.
3392        pub fn poll_server(&mut self) -> Result<(u64, Event)> {
3393            self.server.poll(&mut self.pipe.server)
3394        }
3395
3396        /// Sends a request from client with default headers.
3397        ///
3398        /// On success it returns the newly allocated stream and the headers.
3399        pub fn send_request(&mut self, fin: bool) -> Result<(u64, Vec<Header>)> {
3400            let req = vec![
3401                Header::new(b":method", b"GET"),
3402                Header::new(b":scheme", b"https"),
3403                Header::new(b":authority", b"quic.tech"),
3404                Header::new(b":path", b"/test"),
3405                Header::new(b"user-agent", b"quiche-test"),
3406            ];
3407
3408            let stream =
3409                self.client.send_request(&mut self.pipe.client, &req, fin)?;
3410
3411            self.advance().ok();
3412
3413            Ok((stream, req))
3414        }
3415
3416        /// Sends a response from server with default headers.
3417        ///
3418        /// On success it returns the headers.
3419        pub fn send_response(
3420            &mut self, stream: u64, fin: bool,
3421        ) -> Result<Vec<Header>> {
3422            let resp = vec![
3423                Header::new(b":status", b"200"),
3424                Header::new(b"server", b"quiche-test"),
3425            ];
3426
3427            self.server.send_response(
3428                &mut self.pipe.server,
3429                stream,
3430                &resp,
3431                fin,
3432            )?;
3433
3434            self.advance().ok();
3435
3436            Ok(resp)
3437        }
3438
3439        /// Sends some default payload from client.
3440        ///
3441        /// On success it returns the payload.
3442        pub fn send_body_client(
3443            &mut self, stream: u64, fin: bool,
3444        ) -> Result<Vec<u8>> {
3445            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3446
3447            self.client
3448                .send_body(&mut self.pipe.client, stream, &bytes, fin)?;
3449
3450            self.advance().ok();
3451
3452            Ok(bytes)
3453        }
3454
3455        /// Fetches DATA payload from the server.
3456        ///
3457        /// On success it returns the number of bytes received.
3458        pub fn recv_body_client(
3459            &mut self, stream: u64, buf: &mut [u8],
3460        ) -> Result<usize> {
3461            self.client.recv_body(&mut self.pipe.client, stream, buf)
3462        }
3463
3464        /// Fetches DATA payload from the server.
3465        ///
3466        /// On success it returns the number of bytes received.
3467        pub fn recv_body_buf_client<B: bytes::BufMut>(
3468            &mut self, stream: u64, buf: B,
3469        ) -> Result<usize> {
3470            self.client
3471                .recv_body_buf(&mut self.pipe.client, stream, buf)
3472        }
3473
3474        /// Sends some default payload from server.
3475        ///
3476        /// On success it returns the payload.
3477        pub fn send_body_server(
3478            &mut self, stream: u64, fin: bool,
3479        ) -> Result<Vec<u8>> {
3480            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3481
3482            self.server
3483                .send_body(&mut self.pipe.server, stream, &bytes, fin)?;
3484
3485            self.advance().ok();
3486
3487            Ok(bytes)
3488        }
3489
3490        /// Fetches DATA payload from the client.
3491        ///
3492        /// On success it returns the number of bytes received.
3493        pub fn recv_body_server(
3494            &mut self, stream: u64, buf: &mut [u8],
3495        ) -> Result<usize> {
3496            self.server.recv_body(&mut self.pipe.server, stream, buf)
3497        }
3498
3499        /// Fetches DATA payload from the client.
3500        ///
3501        /// On success it returns the number of bytes received.
3502        pub fn recv_body_buf_server<B: bytes::BufMut>(
3503            &mut self, stream: u64, buf: B,
3504        ) -> Result<usize> {
3505            self.server
3506                .recv_body_buf(&mut self.pipe.server, stream, buf)
3507        }
3508
3509        /// Sends a single HTTP/3 frame from the client.
3510        pub fn send_frame_client(
3511            &mut self, frame: frame::Frame, stream_id: u64, fin: bool,
3512        ) -> Result<()> {
3513            let mut d = [42; 65535];
3514
3515            let mut b = octets::OctetsMut::with_slice(&mut d);
3516
3517            frame.to_bytes(&mut b)?;
3518
3519            let off = b.off();
3520            self.pipe.client.stream_send(stream_id, &d[..off], fin)?;
3521
3522            self.advance().ok();
3523
3524            Ok(())
3525        }
3526
3527        /// Send an HTTP/3 DATAGRAM with default data from the client.
3528        ///
3529        /// On success it returns the data.
3530        pub fn send_dgram_client(&mut self, flow_id: u64) -> Result<Vec<u8>> {
3531            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3532            let len = octets::varint_len(flow_id) + bytes.len();
3533            let mut d = vec![0; len];
3534            let mut b = octets::OctetsMut::with_slice(&mut d);
3535
3536            b.put_varint(flow_id)?;
3537            b.put_bytes(&bytes)?;
3538
3539            self.pipe.client.dgram_send(&d)?;
3540
3541            self.advance().ok();
3542
3543            Ok(bytes)
3544        }
3545
3546        /// Receives an HTTP/3 DATAGRAM from the server.
3547        ///
3548        /// On success it returns the DATAGRAM length, flow ID and flow ID
3549        /// length.
3550        pub fn recv_dgram_client(
3551            &mut self, buf: &mut [u8],
3552        ) -> Result<(usize, u64, usize)> {
3553            let len = self.pipe.client.dgram_recv(buf)?;
3554            let mut b = octets::Octets::with_slice(buf);
3555            let flow_id = b.get_varint()?;
3556
3557            Ok((len, flow_id, b.off()))
3558        }
3559
3560        /// Send an HTTP/3 DATAGRAM with default data from the server
3561        ///
3562        /// On success it returns the data.
3563        pub fn send_dgram_server(&mut self, flow_id: u64) -> Result<Vec<u8>> {
3564            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3565            let len = octets::varint_len(flow_id) + bytes.len();
3566            let mut d = vec![0; len];
3567            let mut b = octets::OctetsMut::with_slice(&mut d);
3568
3569            b.put_varint(flow_id)?;
3570            b.put_bytes(&bytes)?;
3571
3572            self.pipe.server.dgram_send(&d)?;
3573
3574            self.advance().ok();
3575
3576            Ok(bytes)
3577        }
3578
3579        /// Receives an HTTP/3 DATAGRAM from the client.
3580        ///
3581        /// On success it returns the DATAGRAM length, flow ID and flow ID
3582        /// length.
3583        pub fn recv_dgram_server(
3584            &mut self, buf: &mut [u8],
3585        ) -> Result<(usize, u64, usize)> {
3586            let len = self.pipe.server.dgram_recv(buf)?;
3587            let mut b = octets::Octets::with_slice(buf);
3588            let flow_id = b.get_varint()?;
3589
3590            Ok((len, flow_id, b.off()))
3591        }
3592
3593        /// Sends a single HTTP/3 frame from the server.
3594        pub fn send_frame_server(
3595            &mut self, frame: frame::Frame, stream_id: u64, fin: bool,
3596        ) -> Result<()> {
3597            let mut d = [42; 65535];
3598
3599            let mut b = octets::OctetsMut::with_slice(&mut d);
3600
3601            frame.to_bytes(&mut b)?;
3602
3603            let off = b.off();
3604            self.pipe.server.stream_send(stream_id, &d[..off], fin)?;
3605
3606            self.advance().ok();
3607
3608            Ok(())
3609        }
3610
3611        /// Sends an arbitrary buffer of HTTP/3 stream data from the client.
3612        pub fn send_arbitrary_stream_data_client(
3613            &mut self, data: &[u8], stream_id: u64, fin: bool,
3614        ) -> Result<()> {
3615            self.pipe.client.stream_send(stream_id, data, fin)?;
3616
3617            self.advance().ok();
3618
3619            Ok(())
3620        }
3621
3622        /// Sends an arbitrary buffer of HTTP/3 stream data from the server.
3623        pub fn send_arbitrary_stream_data_server(
3624            &mut self, data: &[u8], stream_id: u64, fin: bool,
3625        ) -> Result<()> {
3626            self.pipe.server.stream_send(stream_id, data, fin)?;
3627
3628            self.advance().ok();
3629
3630            Ok(())
3631        }
3632    }
3633}
3634
3635#[cfg(test)]
3636mod tests {
3637    use bytes::BufMut as _;
3638
3639    use super::*;
3640
3641    use super::testing::*;
3642
3643    #[test]
3644    /// Make sure that random GREASE values is within the specified limit.
3645    fn grease_value_in_varint_limit() {
3646        assert!(grease_value() < 2u64.pow(62) - 1);
3647    }
3648
3649    #[test]
3650    fn h3_handshake_0rtt() {
3651        let mut buf = [0; 65535];
3652
3653        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
3654        config
3655            .load_cert_chain_from_pem_file("examples/cert.crt")
3656            .unwrap();
3657        config
3658            .load_priv_key_from_pem_file("examples/cert.key")
3659            .unwrap();
3660        config
3661            .set_application_protos(&[b"proto1", b"proto2"])
3662            .unwrap();
3663        config.set_initial_max_data(30);
3664        config.set_initial_max_stream_data_bidi_local(15);
3665        config.set_initial_max_stream_data_bidi_remote(15);
3666        config.set_initial_max_stream_data_uni(15);
3667        config.set_initial_max_streams_bidi(3);
3668        config.set_initial_max_streams_uni(3);
3669        config.enable_early_data();
3670        config.verify_peer(false);
3671
3672        let h3_config = Config::new().unwrap();
3673
3674        // Perform initial handshake.
3675        let mut pipe = crate::test_utils::Pipe::with_config(&mut config).unwrap();
3676        assert_eq!(pipe.handshake(), Ok(()));
3677
3678        // Extract session,
3679        let session = pipe.client.session().unwrap();
3680
3681        // Configure session on new connection.
3682        let mut pipe = crate::test_utils::Pipe::with_config(&mut config).unwrap();
3683        assert_eq!(pipe.client.set_session(session), Ok(()));
3684
3685        // Can't create an H3 connection until the QUIC connection is determined
3686        // to have made sufficient early data progress.
3687        assert!(matches!(
3688            Connection::with_transport(&mut pipe.client, &h3_config),
3689            Err(Error::InternalError)
3690        ));
3691
3692        // Client sends initial flight.
3693        let (len, _) = pipe.client.send(&mut buf).unwrap();
3694
3695        // Now an H3 connection can be created.
3696        assert!(Connection::with_transport(&mut pipe.client, &h3_config).is_ok());
3697        assert_eq!(pipe.server_recv(&mut buf[..len]), Ok(len));
3698
3699        // Client sends 0-RTT packet.
3700        let pkt_type = crate::packet::Type::ZeroRTT;
3701
3702        let frames = [crate::frame::Frame::Stream {
3703            stream_id: 6,
3704            data: <crate::range_buf::RangeBuf>::from(b"aaaaa", 0, true),
3705        }];
3706
3707        assert_eq!(
3708            pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
3709            Ok(1200)
3710        );
3711
3712        assert_eq!(pipe.server.undecryptable_pkts.len(), 0);
3713
3714        // 0-RTT stream data is readable.
3715        let mut r = pipe.server.readable();
3716        assert_eq!(r.next(), Some(6));
3717        assert_eq!(r.next(), None);
3718
3719        let mut b = [0; 15];
3720        assert_eq!(pipe.server.stream_recv(6, &mut b), Ok((5, true)));
3721        assert_eq!(&b[..5], b"aaaaa");
3722    }
3723
3724    #[test]
3725    /// Send a request with no body, get a response with no body.
3726    fn request_no_body_response_no_body() {
3727        let mut s = Session::new().unwrap();
3728        s.handshake().unwrap();
3729
3730        let (stream, req) = s.send_request(true).unwrap();
3731
3732        assert_eq!(stream, 0);
3733
3734        let ev_headers = Event::Headers {
3735            list: req,
3736            more_frames: false,
3737        };
3738
3739        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3740        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3741
3742        let resp = s.send_response(stream, true).unwrap();
3743
3744        let ev_headers = Event::Headers {
3745            list: resp,
3746            more_frames: false,
3747        };
3748
3749        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3750        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3751        assert_eq!(s.poll_client(), Err(Error::Done));
3752    }
3753
3754    #[test]
3755    /// Send a request with no body, get a response with one DATA frame.
3756    fn request_no_body_response_one_chunk() {
3757        let mut s = Session::new().unwrap();
3758        s.handshake().unwrap();
3759
3760        let (stream, req) = s.send_request(true).unwrap();
3761        assert_eq!(stream, 0);
3762
3763        let ev_headers = Event::Headers {
3764            list: req,
3765            more_frames: false,
3766        };
3767
3768        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3769
3770        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3771
3772        let resp = s.send_response(stream, false).unwrap();
3773
3774        let body = s.send_body_server(stream, true).unwrap();
3775
3776        let mut recv_buf = vec![0; body.len()];
3777
3778        let ev_headers = Event::Headers {
3779            list: resp,
3780            more_frames: true,
3781        };
3782
3783        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3784
3785        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
3786        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
3787
3788        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3789        assert_eq!(s.poll_client(), Err(Error::Done));
3790    }
3791
3792    #[test]
3793    /// Send a request with no body, get a response with multiple DATA frames.
3794    fn request_no_body_response_many_chunks() {
3795        let mut s = Session::new().unwrap();
3796        s.handshake().unwrap();
3797
3798        let (stream, req) = s.send_request(true).unwrap();
3799
3800        let ev_headers = Event::Headers {
3801            list: req,
3802            more_frames: false,
3803        };
3804
3805        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3806        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3807
3808        let total_data_frames = 4;
3809
3810        let resp = s.send_response(stream, false).unwrap();
3811
3812        for _ in 0..total_data_frames - 1 {
3813            s.send_body_server(stream, false).unwrap();
3814        }
3815
3816        let body = s.send_body_server(stream, true).unwrap();
3817
3818        let mut recv_buf = vec![0; body.len()];
3819
3820        let ev_headers = Event::Headers {
3821            list: resp,
3822            more_frames: true,
3823        };
3824
3825        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3826        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
3827        assert_eq!(s.poll_client(), Err(Error::Done));
3828
3829        for _ in 0..total_data_frames {
3830            assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
3831        }
3832
3833        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3834        assert_eq!(s.poll_client(), Err(Error::Done));
3835    }
3836
3837    #[test]
3838    /// Send a request with no body, get a response with multiple DATA frames.
3839    fn request_no_body_response_many_chunks_with_buf() {
3840        let (mut config, h3_config) = Session::default_configs().unwrap();
3841        // we don't want to be limited by flow or cong. control
3842        config.set_initial_congestion_window_packets(100);
3843        config.set_initial_max_data(200_000);
3844        config.set_initial_max_stream_data_bidi_local(200_000);
3845        config.set_initial_max_stream_data_bidi_remote(200_000);
3846        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
3847        s.handshake().unwrap();
3848
3849        let (stream, req) = s.send_request(true).unwrap();
3850
3851        let ev_headers = Event::Headers {
3852            list: req,
3853            more_frames: false,
3854        };
3855
3856        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3857        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3858
3859        let total_data_frames = 4;
3860
3861        // Use a large body
3862        let data = vec![0xab_u8; 16 * 1024];
3863
3864        let resp = s.send_response(stream, false).unwrap();
3865
3866        for _ in 0..total_data_frames - 1 {
3867            assert_eq!(
3868                s.server.send_body(&mut s.pipe.server, stream, &data, false),
3869                Ok(data.len())
3870            );
3871            s.advance().ok();
3872        }
3873
3874        s.server
3875            .send_body(&mut s.pipe.server, stream, &data, true)
3876            .unwrap();
3877        s.advance().ok();
3878
3879        let ev_headers = Event::Headers {
3880            list: resp,
3881            more_frames: true,
3882        };
3883
3884        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3885        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
3886        assert_eq!(s.poll_client(), Err(Error::Done));
3887
3888        // We expect to be able to read multiple data frames in a single call and
3889        // reads don't have to end on frame boundaries. So let's try to read
3890        // 1.5 times the amount we sent in one frame.
3891        let how_much_to_read_per_call = data.len() * 2 / 3;
3892        let mut remaining_to_read = total_data_frames * data.len();
3893        let mut recv_buf = Vec::new().limit(how_much_to_read_per_call);
3894        assert_eq!(
3895            s.recv_body_buf_client(stream, &mut recv_buf),
3896            Ok(how_much_to_read_per_call)
3897        );
3898        remaining_to_read -= how_much_to_read_per_call;
3899        assert_eq!(recv_buf.get_ref().len(), how_much_to_read_per_call);
3900
3901        while remaining_to_read > 0 {
3902            // Set a different limit for the following reads.
3903            recv_buf.set_limit(data.len());
3904            // We should either read up to the limit we set above, or to
3905            // the end of buffered data.
3906            let expected = std::cmp::min(data.len(), remaining_to_read);
3907            assert_eq!(
3908                s.recv_body_buf_client(stream, &mut recv_buf),
3909                Ok(expected)
3910            );
3911            remaining_to_read -= expected;
3912        }
3913        // We've read everything now. Ensure the Vec reflects that
3914        assert_eq!(recv_buf.get_ref().len(), total_data_frames * data.len());
3915
3916        // No more data to read.
3917        assert_eq!(
3918            s.recv_body_buf_client(stream, &mut recv_buf),
3919            Err(Error::Done)
3920        );
3921
3922        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3923        assert_eq!(s.poll_client(), Err(Error::Done));
3924    }
3925
3926    #[test]
3927    /// Send a request with one DATA frame, get a response with no body.
3928    fn request_one_chunk_response_no_body() {
3929        let mut s = Session::new().unwrap();
3930        s.handshake().unwrap();
3931
3932        let (stream, req) = s.send_request(false).unwrap();
3933
3934        let body = s.send_body_client(stream, true).unwrap();
3935
3936        let mut recv_buf = vec![0; body.len()];
3937
3938        let ev_headers = Event::Headers {
3939            list: req,
3940            more_frames: true,
3941        };
3942
3943        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3944
3945        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
3946        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
3947
3948        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3949
3950        let resp = s.send_response(stream, true).unwrap();
3951
3952        let ev_headers = Event::Headers {
3953            list: resp,
3954            more_frames: false,
3955        };
3956
3957        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3958        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3959    }
3960
3961    #[test]
3962    /// Send a request with multiple DATA frames, get a response with no body.
3963    fn request_many_chunks_response_no_body() {
3964        let mut s = Session::new().unwrap();
3965        s.handshake().unwrap();
3966
3967        let (stream, req) = s.send_request(false).unwrap();
3968
3969        let total_data_frames = 4;
3970
3971        for _ in 0..total_data_frames - 1 {
3972            s.send_body_client(stream, false).unwrap();
3973        }
3974
3975        let body = s.send_body_client(stream, true).unwrap();
3976
3977        let mut recv_buf = vec![0; body.len()];
3978
3979        let ev_headers = Event::Headers {
3980            list: req,
3981            more_frames: true,
3982        };
3983
3984        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3985        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
3986        assert_eq!(s.poll_server(), Err(Error::Done));
3987
3988        for _ in 0..total_data_frames {
3989            assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
3990        }
3991
3992        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3993
3994        let resp = s.send_response(stream, true).unwrap();
3995
3996        let ev_headers = Event::Headers {
3997            list: resp,
3998            more_frames: false,
3999        };
4000
4001        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
4002        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
4003    }
4004
4005    #[test]
4006    /// Send a request with multiple DATA frames, get a response with one DATA
4007    /// frame.
4008    fn many_requests_many_chunks_response_one_chunk() {
4009        let mut s = Session::new().unwrap();
4010        s.handshake().unwrap();
4011
4012        let mut reqs = Vec::new();
4013
4014        let (stream1, req1) = s.send_request(false).unwrap();
4015        assert_eq!(stream1, 0);
4016        reqs.push(req1);
4017
4018        let (stream2, req2) = s.send_request(false).unwrap();
4019        assert_eq!(stream2, 4);
4020        reqs.push(req2);
4021
4022        let (stream3, req3) = s.send_request(false).unwrap();
4023        assert_eq!(stream3, 8);
4024        reqs.push(req3);
4025
4026        let body = s.send_body_client(stream1, false).unwrap();
4027        s.send_body_client(stream2, false).unwrap();
4028        s.send_body_client(stream3, false).unwrap();
4029
4030        let mut recv_buf = vec![0; body.len()];
4031
4032        // Reverse order of writes.
4033
4034        s.send_body_client(stream3, true).unwrap();
4035        s.send_body_client(stream2, true).unwrap();
4036        s.send_body_client(stream1, true).unwrap();
4037
4038        let (_, ev) = s.poll_server().unwrap();
4039        let ev_headers = Event::Headers {
4040            list: reqs[0].clone(),
4041            more_frames: true,
4042        };
4043        assert_eq!(ev, ev_headers);
4044
4045        let (_, ev) = s.poll_server().unwrap();
4046        let ev_headers = Event::Headers {
4047            list: reqs[1].clone(),
4048            more_frames: true,
4049        };
4050        assert_eq!(ev, ev_headers);
4051
4052        let (_, ev) = s.poll_server().unwrap();
4053        let ev_headers = Event::Headers {
4054            list: reqs[2].clone(),
4055            more_frames: true,
4056        };
4057        assert_eq!(ev, ev_headers);
4058
4059        assert_eq!(s.poll_server(), Ok((0, Event::Data)));
4060        assert_eq!(s.recv_body_server(0, &mut recv_buf), Ok(body.len()));
4061        assert_eq!(s.poll_client(), Err(Error::Done));
4062        assert_eq!(s.recv_body_server(0, &mut recv_buf), Ok(body.len()));
4063        assert_eq!(s.poll_server(), Ok((0, Event::Finished)));
4064
4065        assert_eq!(s.poll_server(), Ok((4, Event::Data)));
4066        assert_eq!(s.recv_body_server(4, &mut recv_buf), Ok(body.len()));
4067        assert_eq!(s.poll_client(), Err(Error::Done));
4068        assert_eq!(s.recv_body_server(4, &mut recv_buf), Ok(body.len()));
4069        assert_eq!(s.poll_server(), Ok((4, Event::Finished)));
4070
4071        assert_eq!(s.poll_server(), Ok((8, Event::Data)));
4072        assert_eq!(s.recv_body_server(8, &mut recv_buf), Ok(body.len()));
4073        assert_eq!(s.poll_client(), Err(Error::Done));
4074        assert_eq!(s.recv_body_server(8, &mut recv_buf), Ok(body.len()));
4075        assert_eq!(s.poll_server(), Ok((8, Event::Finished)));
4076
4077        assert_eq!(s.poll_server(), Err(Error::Done));
4078
4079        let mut resps = Vec::new();
4080
4081        let resp1 = s.send_response(stream1, true).unwrap();
4082        resps.push(resp1);
4083
4084        let resp2 = s.send_response(stream2, true).unwrap();
4085        resps.push(resp2);
4086
4087        let resp3 = s.send_response(stream3, true).unwrap();
4088        resps.push(resp3);
4089
4090        for _ in 0..resps.len() {
4091            let (stream, ev) = s.poll_client().unwrap();
4092            let ev_headers = Event::Headers {
4093                list: resps[(stream / 4) as usize].clone(),
4094                more_frames: false,
4095            };
4096            assert_eq!(ev, ev_headers);
4097            assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
4098        }
4099
4100        assert_eq!(s.poll_client(), Err(Error::Done));
4101    }
4102
4103    #[test]
4104    /// Send a request with no body, get a response with one DATA frame and an
4105    /// empty FIN after reception from the client.
4106    fn request_no_body_response_one_chunk_empty_fin() {
4107        let mut s = Session::new().unwrap();
4108        s.handshake().unwrap();
4109
4110        let (stream, req) = s.send_request(true).unwrap();
4111
4112        let ev_headers = Event::Headers {
4113            list: req,
4114            more_frames: false,
4115        };
4116
4117        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4118        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4119
4120        let resp = s.send_response(stream, false).unwrap();
4121
4122        let body = s.send_body_server(stream, false).unwrap();
4123
4124        let mut recv_buf = vec![0; body.len()];
4125
4126        let ev_headers = Event::Headers {
4127            list: resp,
4128            more_frames: true,
4129        };
4130
4131        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
4132
4133        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
4134        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
4135
4136        assert_eq!(s.pipe.server.stream_send(stream, &[], true), Ok(0));
4137        s.advance().ok();
4138
4139        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
4140        assert_eq!(s.poll_client(), Err(Error::Done));
4141    }
4142
4143    #[test]
4144    /// Send a request with no body, get a response with no body followed by
4145    /// GREASE that is STREAM frame with a FIN.
4146    fn request_no_body_response_no_body_with_grease() {
4147        let mut s = Session::new().unwrap();
4148        s.handshake().unwrap();
4149
4150        let (stream, req) = s.send_request(true).unwrap();
4151
4152        assert_eq!(stream, 0);
4153
4154        let ev_headers = Event::Headers {
4155            list: req,
4156            more_frames: false,
4157        };
4158
4159        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4160        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4161
4162        let resp = s.send_response(stream, false).unwrap();
4163
4164        let ev_headers = Event::Headers {
4165            list: resp,
4166            more_frames: true,
4167        };
4168
4169        // Inject a GREASE frame
4170        let mut d = [42; 10];
4171        let mut b = octets::OctetsMut::with_slice(&mut d);
4172
4173        let frame_type = b.put_varint(148_764_065_110_560_899).unwrap();
4174        s.pipe.server.stream_send(0, frame_type, false).unwrap();
4175
4176        let frame_len = b.put_varint(10).unwrap();
4177        s.pipe.server.stream_send(0, frame_len, false).unwrap();
4178
4179        s.pipe.server.stream_send(0, &d, true).unwrap();
4180
4181        s.advance().ok();
4182
4183        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
4184        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
4185        assert_eq!(s.poll_client(), Err(Error::Done));
4186    }
4187
4188    #[test]
4189    /// Try to send DATA frames before HEADERS.
4190    fn body_response_before_headers() {
4191        let mut s = Session::new().unwrap();
4192        s.handshake().unwrap();
4193
4194        let (stream, req) = s.send_request(true).unwrap();
4195        assert_eq!(stream, 0);
4196
4197        let ev_headers = Event::Headers {
4198            list: req,
4199            more_frames: false,
4200        };
4201
4202        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4203
4204        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4205
4206        assert_eq!(
4207            s.send_body_server(stream, true),
4208            Err(Error::FrameUnexpected)
4209        );
4210
4211        assert_eq!(s.poll_client(), Err(Error::Done));
4212    }
4213
4214    #[test]
4215    /// Try to send DATA frames on wrong streams, ensure the API returns an
4216    /// error before anything hits the transport layer.
4217    fn send_body_invalid_client_stream() {
4218        let mut s = Session::new().unwrap();
4219        s.handshake().unwrap();
4220
4221        assert_eq!(s.send_body_client(0, true), Err(Error::FrameUnexpected));
4222
4223        assert_eq!(
4224            s.send_body_client(s.client.control_stream_id.unwrap(), true),
4225            Err(Error::FrameUnexpected)
4226        );
4227
4228        assert_eq!(
4229            s.send_body_client(
4230                s.client.local_qpack_streams.encoder_stream_id.unwrap(),
4231                true
4232            ),
4233            Err(Error::FrameUnexpected)
4234        );
4235
4236        assert_eq!(
4237            s.send_body_client(
4238                s.client.local_qpack_streams.decoder_stream_id.unwrap(),
4239                true
4240            ),
4241            Err(Error::FrameUnexpected)
4242        );
4243
4244        assert_eq!(
4245            s.send_body_client(s.client.peer_control_stream_id.unwrap(), true),
4246            Err(Error::FrameUnexpected)
4247        );
4248
4249        assert_eq!(
4250            s.send_body_client(
4251                s.client.peer_qpack_streams.encoder_stream_id.unwrap(),
4252                true
4253            ),
4254            Err(Error::FrameUnexpected)
4255        );
4256
4257        assert_eq!(
4258            s.send_body_client(
4259                s.client.peer_qpack_streams.decoder_stream_id.unwrap(),
4260                true
4261            ),
4262            Err(Error::FrameUnexpected)
4263        );
4264    }
4265
4266    #[test]
4267    /// Try to send DATA frames on wrong streams, ensure the API returns an
4268    /// error before anything hits the transport layer.
4269    fn send_body_invalid_server_stream() {
4270        let mut s = Session::new().unwrap();
4271        s.handshake().unwrap();
4272
4273        assert_eq!(s.send_body_server(0, true), Err(Error::FrameUnexpected));
4274
4275        assert_eq!(
4276            s.send_body_server(s.server.control_stream_id.unwrap(), true),
4277            Err(Error::FrameUnexpected)
4278        );
4279
4280        assert_eq!(
4281            s.send_body_server(
4282                s.server.local_qpack_streams.encoder_stream_id.unwrap(),
4283                true
4284            ),
4285            Err(Error::FrameUnexpected)
4286        );
4287
4288        assert_eq!(
4289            s.send_body_server(
4290                s.server.local_qpack_streams.decoder_stream_id.unwrap(),
4291                true
4292            ),
4293            Err(Error::FrameUnexpected)
4294        );
4295
4296        assert_eq!(
4297            s.send_body_server(s.server.peer_control_stream_id.unwrap(), true),
4298            Err(Error::FrameUnexpected)
4299        );
4300
4301        assert_eq!(
4302            s.send_body_server(
4303                s.server.peer_qpack_streams.encoder_stream_id.unwrap(),
4304                true
4305            ),
4306            Err(Error::FrameUnexpected)
4307        );
4308
4309        assert_eq!(
4310            s.send_body_server(
4311                s.server.peer_qpack_streams.decoder_stream_id.unwrap(),
4312                true
4313            ),
4314            Err(Error::FrameUnexpected)
4315        );
4316    }
4317
4318    #[test]
4319    /// Client sends request with body and trailers.
4320    fn trailers() {
4321        let mut s = Session::new().unwrap();
4322        s.handshake().unwrap();
4323
4324        let (stream, req) = s.send_request(false).unwrap();
4325
4326        let body = s.send_body_client(stream, false).unwrap();
4327
4328        let mut recv_buf = vec![0; body.len()];
4329
4330        let req_trailers = vec![Header::new(b"foo", b"bar")];
4331
4332        s.client
4333            .send_additional_headers(
4334                &mut s.pipe.client,
4335                stream,
4336                &req_trailers,
4337                true,
4338                true,
4339            )
4340            .unwrap();
4341
4342        s.advance().ok();
4343
4344        let ev_headers = Event::Headers {
4345            list: req,
4346            more_frames: true,
4347        };
4348
4349        let ev_trailers = Event::Headers {
4350            list: req_trailers,
4351            more_frames: false,
4352        };
4353
4354        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4355
4356        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
4357        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
4358
4359        assert_eq!(s.poll_server(), Ok((stream, ev_trailers)));
4360        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4361    }
4362
4363    #[test]
4364    /// Server responds with a 103, then a 200 with no body.
4365    fn informational_response() {
4366        let mut s = Session::new().unwrap();
4367        s.handshake().unwrap();
4368
4369        let (stream, req) = s.send_request(true).unwrap();
4370
4371        assert_eq!(stream, 0);
4372
4373        let ev_headers = Event::Headers {
4374            list: req,
4375            more_frames: false,
4376        };
4377
4378        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4379        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4380
4381        let info_resp = vec![
4382            Header::new(b":status", b"103"),
4383            Header::new(b"link", b"<https://example.com>; rel=\"preconnect\""),
4384        ];
4385
4386        let resp = vec![
4387            Header::new(b":status", b"200"),
4388            Header::new(b"server", b"quiche-test"),
4389        ];
4390
4391        s.server
4392            .send_response(&mut s.pipe.server, stream, &info_resp, false)
4393            .unwrap();
4394
4395        s.server
4396            .send_additional_headers(
4397                &mut s.pipe.server,
4398                stream,
4399                &resp,
4400                false,
4401                true,
4402            )
4403            .unwrap();
4404
4405        s.advance().ok();
4406
4407        let ev_info_headers = Event::Headers {
4408            list: info_resp,
4409            more_frames: true,
4410        };
4411
4412        let ev_headers = Event::Headers {
4413            list: resp,
4414            more_frames: false,
4415        };
4416
4417        assert_eq!(s.poll_client(), Ok((stream, ev_info_headers)));
4418        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
4419        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
4420        assert_eq!(s.poll_client(), Err(Error::Done));
4421    }
4422
4423    #[test]
4424    /// Server responds with a 103, then attempts to send a 200 using
4425    /// send_response again, which should fail.
4426    fn no_multiple_response() {
4427        let mut s = Session::new().unwrap();
4428        s.handshake().unwrap();
4429
4430        let (stream, req) = s.send_request(true).unwrap();
4431
4432        assert_eq!(stream, 0);
4433
4434        let ev_headers = Event::Headers {
4435            list: req,
4436            more_frames: false,
4437        };
4438
4439        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4440        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4441
4442        let info_resp = vec![
4443            Header::new(b":status", b"103"),
4444            Header::new(b"link", b"<https://example.com>; rel=\"preconnect\""),
4445        ];
4446
4447        let resp = vec![
4448            Header::new(b":status", b"200"),
4449            Header::new(b"server", b"quiche-test"),
4450        ];
4451
4452        s.server
4453            .send_response(&mut s.pipe.server, stream, &info_resp, false)
4454            .unwrap();
4455
4456        assert_eq!(
4457            Err(Error::FrameUnexpected),
4458            s.server
4459                .send_response(&mut s.pipe.server, stream, &resp, true)
4460        );
4461
4462        s.advance().ok();
4463
4464        let ev_info_headers = Event::Headers {
4465            list: info_resp,
4466            more_frames: true,
4467        };
4468
4469        assert_eq!(s.poll_client(), Ok((stream, ev_info_headers)));
4470        assert_eq!(s.poll_client(), Err(Error::Done));
4471    }
4472
4473    #[test]
4474    /// Server attempts to use send_additional_headers before initial response.
4475    fn no_send_additional_before_initial_response() {
4476        let mut s = Session::new().unwrap();
4477        s.handshake().unwrap();
4478
4479        let (stream, req) = s.send_request(true).unwrap();
4480
4481        assert_eq!(stream, 0);
4482
4483        let ev_headers = Event::Headers {
4484            list: req,
4485            more_frames: false,
4486        };
4487
4488        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4489        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4490
4491        let info_resp = vec![
4492            Header::new(b":status", b"103"),
4493            Header::new(b"link", b"<https://example.com>; rel=\"preconnect\""),
4494        ];
4495
4496        assert_eq!(
4497            Err(Error::FrameUnexpected),
4498            s.server.send_additional_headers(
4499                &mut s.pipe.server,
4500                stream,
4501                &info_resp,
4502                false,
4503                false
4504            )
4505        );
4506
4507        s.advance().ok();
4508
4509        assert_eq!(s.poll_client(), Err(Error::Done));
4510    }
4511
4512    #[test]
4513    /// Client sends multiple HEADERS before data.
4514    fn additional_headers_before_data_client() {
4515        let mut s = Session::new().unwrap();
4516        s.handshake().unwrap();
4517
4518        let (stream, req) = s.send_request(false).unwrap();
4519
4520        let req_trailer = vec![Header::new(b"goodbye", b"world")];
4521
4522        assert_eq!(
4523            s.client.send_additional_headers(
4524                &mut s.pipe.client,
4525                stream,
4526                &req_trailer,
4527                true,
4528                false
4529            ),
4530            Ok(())
4531        );
4532
4533        s.advance().ok();
4534
4535        let ev_initial_headers = Event::Headers {
4536            list: req,
4537            more_frames: true,
4538        };
4539
4540        let ev_trailing_headers = Event::Headers {
4541            list: req_trailer,
4542            more_frames: true,
4543        };
4544
4545        assert_eq!(s.poll_server(), Ok((stream, ev_initial_headers)));
4546        assert_eq!(s.poll_server(), Ok((stream, ev_trailing_headers)));
4547        assert_eq!(s.poll_server(), Err(Error::Done));
4548    }
4549
4550    #[test]
4551    /// Client sends multiple HEADERS before data.
4552    fn data_after_trailers_client() {
4553        let mut s = Session::new().unwrap();
4554        s.handshake().unwrap();
4555
4556        let (stream, req) = s.send_request(false).unwrap();
4557
4558        let body = s.send_body_client(stream, false).unwrap();
4559
4560        let mut recv_buf = vec![0; body.len()];
4561
4562        let req_trailers = vec![Header::new(b"foo", b"bar")];
4563
4564        s.client
4565            .send_additional_headers(
4566                &mut s.pipe.client,
4567                stream,
4568                &req_trailers,
4569                true,
4570                false,
4571            )
4572            .unwrap();
4573
4574        s.advance().ok();
4575
4576        s.send_frame_client(
4577            frame::Frame::Data {
4578                payload: vec![1, 2, 3, 4],
4579            },
4580            stream,
4581            true,
4582        )
4583        .unwrap();
4584
4585        let ev_headers = Event::Headers {
4586            list: req,
4587            more_frames: true,
4588        };
4589
4590        let ev_trailers = Event::Headers {
4591            list: req_trailers,
4592            more_frames: true,
4593        };
4594
4595        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4596        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
4597        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
4598        assert_eq!(s.poll_server(), Ok((stream, ev_trailers)));
4599        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4600    }
4601
4602    #[test]
4603    /// Send a MAX_PUSH_ID frame from the client on a valid stream.
4604    fn max_push_id_from_client_good() {
4605        let mut s = Session::new().unwrap();
4606        s.handshake().unwrap();
4607
4608        s.send_frame_client(
4609            frame::Frame::MaxPushId { push_id: 1 },
4610            s.client.control_stream_id.unwrap(),
4611            false,
4612        )
4613        .unwrap();
4614
4615        assert_eq!(s.poll_server(), Err(Error::Done));
4616    }
4617
4618    #[test]
4619    /// Send a MAX_PUSH_ID frame from the client on an invalid stream.
4620    fn max_push_id_from_client_bad_stream() {
4621        let mut s = Session::new().unwrap();
4622        s.handshake().unwrap();
4623
4624        let (stream, req) = s.send_request(false).unwrap();
4625
4626        s.send_frame_client(
4627            frame::Frame::MaxPushId { push_id: 2 },
4628            stream,
4629            false,
4630        )
4631        .unwrap();
4632
4633        let ev_headers = Event::Headers {
4634            list: req,
4635            more_frames: true,
4636        };
4637
4638        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4639        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4640    }
4641
4642    #[test]
4643    /// Send a sequence of MAX_PUSH_ID frames from the client that attempt to
4644    /// reduce the limit.
4645    fn max_push_id_from_client_limit_reduction() {
4646        let mut s = Session::new().unwrap();
4647        s.handshake().unwrap();
4648
4649        s.send_frame_client(
4650            frame::Frame::MaxPushId { push_id: 2 },
4651            s.client.control_stream_id.unwrap(),
4652            false,
4653        )
4654        .unwrap();
4655
4656        s.send_frame_client(
4657            frame::Frame::MaxPushId { push_id: 1 },
4658            s.client.control_stream_id.unwrap(),
4659            false,
4660        )
4661        .unwrap();
4662
4663        assert_eq!(s.poll_server(), Err(Error::IdError));
4664    }
4665
4666    #[test]
4667    /// Send a MAX_PUSH_ID frame from the server, which is forbidden.
4668    fn max_push_id_from_server() {
4669        let mut s = Session::new().unwrap();
4670        s.handshake().unwrap();
4671
4672        s.send_frame_server(
4673            frame::Frame::MaxPushId { push_id: 1 },
4674            s.server.control_stream_id.unwrap(),
4675            false,
4676        )
4677        .unwrap();
4678
4679        assert_eq!(s.poll_client(), Err(Error::FrameUnexpected));
4680    }
4681
4682    #[test]
4683    /// Send a PUSH_PROMISE frame from the client, which is forbidden.
4684    fn push_promise_from_client() {
4685        let mut s = Session::new().unwrap();
4686        s.handshake().unwrap();
4687
4688        let (stream, req) = s.send_request(false).unwrap();
4689
4690        let header_block = s.client.encode_header_block(&req).unwrap();
4691
4692        s.send_frame_client(
4693            frame::Frame::PushPromise {
4694                push_id: 1,
4695                header_block,
4696            },
4697            stream,
4698            false,
4699        )
4700        .unwrap();
4701
4702        let ev_headers = Event::Headers {
4703            list: req,
4704            more_frames: true,
4705        };
4706
4707        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4708        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4709    }
4710
4711    #[test]
4712    /// Send a CANCEL_PUSH frame from the client.
4713    fn cancel_push_from_client() {
4714        let mut s = Session::new().unwrap();
4715        s.handshake().unwrap();
4716
4717        s.send_frame_client(
4718            frame::Frame::CancelPush { push_id: 1 },
4719            s.client.control_stream_id.unwrap(),
4720            false,
4721        )
4722        .unwrap();
4723
4724        assert_eq!(s.poll_server(), Err(Error::Done));
4725    }
4726
4727    #[test]
4728    /// Send a CANCEL_PUSH frame from the client on an invalid stream.
4729    fn cancel_push_from_client_bad_stream() {
4730        let mut s = Session::new().unwrap();
4731        s.handshake().unwrap();
4732
4733        let (stream, req) = s.send_request(false).unwrap();
4734
4735        s.send_frame_client(
4736            frame::Frame::CancelPush { push_id: 2 },
4737            stream,
4738            false,
4739        )
4740        .unwrap();
4741
4742        let ev_headers = Event::Headers {
4743            list: req,
4744            more_frames: true,
4745        };
4746
4747        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4748        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4749    }
4750
4751    #[test]
4752    /// Send a CANCEL_PUSH frame from the client.
4753    fn cancel_push_from_server() {
4754        let mut s = Session::new().unwrap();
4755        s.handshake().unwrap();
4756
4757        s.send_frame_server(
4758            frame::Frame::CancelPush { push_id: 1 },
4759            s.server.control_stream_id.unwrap(),
4760            false,
4761        )
4762        .unwrap();
4763
4764        assert_eq!(s.poll_client(), Err(Error::Done));
4765    }
4766
4767    #[test]
4768    /// Send a GOAWAY frame from the client.
4769    fn goaway_from_client_good() {
4770        let mut s = Session::new().unwrap();
4771        s.handshake().unwrap();
4772
4773        s.client.send_goaway(&mut s.pipe.client, 100).unwrap();
4774
4775        s.advance().ok();
4776
4777        // TODO: server push
4778        assert_eq!(s.poll_server(), Ok((0, Event::GoAway)));
4779    }
4780
4781    #[test]
4782    /// Send a GOAWAY frame from the server.
4783    fn goaway_from_server_good() {
4784        let mut s = Session::new().unwrap();
4785        s.handshake().unwrap();
4786
4787        s.server.send_goaway(&mut s.pipe.server, 4000).unwrap();
4788
4789        s.advance().ok();
4790
4791        assert_eq!(s.poll_client(), Ok((4000, Event::GoAway)));
4792    }
4793
4794    #[test]
4795    /// A client MUST NOT send a request after it receives GOAWAY.
4796    fn client_request_after_goaway() {
4797        let mut s = Session::new().unwrap();
4798        s.handshake().unwrap();
4799
4800        s.server.send_goaway(&mut s.pipe.server, 4000).unwrap();
4801
4802        s.advance().ok();
4803
4804        assert_eq!(s.poll_client(), Ok((4000, Event::GoAway)));
4805
4806        assert_eq!(s.send_request(true), Err(Error::FrameUnexpected));
4807    }
4808
4809    #[test]
4810    /// Send a GOAWAY frame from the server, using an invalid goaway ID.
4811    fn goaway_from_server_invalid_id() {
4812        let mut s = Session::new().unwrap();
4813        s.handshake().unwrap();
4814
4815        s.send_frame_server(
4816            frame::Frame::GoAway { id: 1 },
4817            s.server.control_stream_id.unwrap(),
4818            false,
4819        )
4820        .unwrap();
4821
4822        assert_eq!(s.poll_client(), Err(Error::IdError));
4823    }
4824
4825    #[test]
4826    /// Send multiple GOAWAY frames from the server, that increase the goaway
4827    /// ID.
4828    fn goaway_from_server_increase_id() {
4829        let mut s = Session::new().unwrap();
4830        s.handshake().unwrap();
4831
4832        s.send_frame_server(
4833            frame::Frame::GoAway { id: 0 },
4834            s.server.control_stream_id.unwrap(),
4835            false,
4836        )
4837        .unwrap();
4838
4839        s.send_frame_server(
4840            frame::Frame::GoAway { id: 4 },
4841            s.server.control_stream_id.unwrap(),
4842            false,
4843        )
4844        .unwrap();
4845
4846        assert_eq!(s.poll_client(), Ok((0, Event::GoAway)));
4847
4848        assert_eq!(s.poll_client(), Err(Error::IdError));
4849    }
4850
4851    #[test]
4852    #[cfg(feature = "sfv")]
4853    fn parse_priority_field_value() {
4854        // Legal dicts
4855        assert_eq!(
4856            Ok(Priority::new(0, false)),
4857            Priority::try_from(b"u=0".as_slice())
4858        );
4859        assert_eq!(
4860            Ok(Priority::new(3, false)),
4861            Priority::try_from(b"u=3".as_slice())
4862        );
4863        assert_eq!(
4864            Ok(Priority::new(7, false)),
4865            Priority::try_from(b"u=7".as_slice())
4866        );
4867
4868        assert_eq!(
4869            Ok(Priority::new(0, true)),
4870            Priority::try_from(b"u=0, i".as_slice())
4871        );
4872        assert_eq!(
4873            Ok(Priority::new(3, true)),
4874            Priority::try_from(b"u=3, i".as_slice())
4875        );
4876        assert_eq!(
4877            Ok(Priority::new(7, true)),
4878            Priority::try_from(b"u=7, i".as_slice())
4879        );
4880
4881        assert_eq!(
4882            Ok(Priority::new(0, true)),
4883            Priority::try_from(b"u=0, i=?1".as_slice())
4884        );
4885        assert_eq!(
4886            Ok(Priority::new(3, true)),
4887            Priority::try_from(b"u=3, i=?1".as_slice())
4888        );
4889        assert_eq!(
4890            Ok(Priority::new(7, true)),
4891            Priority::try_from(b"u=7, i=?1".as_slice())
4892        );
4893
4894        assert_eq!(
4895            Ok(Priority::new(3, false)),
4896            Priority::try_from(b"".as_slice())
4897        );
4898
4899        assert_eq!(
4900            Ok(Priority::new(0, true)),
4901            Priority::try_from(b"u=0;foo, i;bar".as_slice())
4902        );
4903        assert_eq!(
4904            Ok(Priority::new(3, true)),
4905            Priority::try_from(b"u=3;hello, i;world".as_slice())
4906        );
4907        assert_eq!(
4908            Ok(Priority::new(7, true)),
4909            Priority::try_from(b"u=7;croeso, i;gymru".as_slice())
4910        );
4911
4912        assert_eq!(
4913            Ok(Priority::new(0, true)),
4914            Priority::try_from(b"u=0, i, spinaltap=11".as_slice())
4915        );
4916
4917        // Illegal formats
4918        assert_eq!(Err(Error::Done), Priority::try_from(b"0".as_slice()));
4919        assert_eq!(
4920            Ok(Priority::new(7, false)),
4921            Priority::try_from(b"u=-1".as_slice())
4922        );
4923        assert_eq!(Err(Error::Done), Priority::try_from(b"u=0.2".as_slice()));
4924        assert_eq!(
4925            Ok(Priority::new(7, false)),
4926            Priority::try_from(b"u=100".as_slice())
4927        );
4928        assert_eq!(
4929            Err(Error::Done),
4930            Priority::try_from(b"u=3, i=true".as_slice())
4931        );
4932
4933        // Trailing comma in dict is malformed
4934        assert_eq!(Err(Error::Done), Priority::try_from(b"u=7, ".as_slice()));
4935    }
4936
4937    #[test]
4938    /// Send a PRIORITY_UPDATE for request stream from the client.
4939    fn priority_update_request() {
4940        let mut s = Session::new().unwrap();
4941        s.handshake().unwrap();
4942
4943        s.client
4944            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4945                urgency: 3,
4946                incremental: false,
4947            })
4948            .unwrap();
4949        s.advance().ok();
4950
4951        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4952        assert_eq!(s.poll_server(), Err(Error::Done));
4953    }
4954
4955    #[test]
4956    /// Send a PRIORITY_UPDATE for request stream from the client.
4957    fn priority_update_single_stream_rearm() {
4958        let mut s = Session::new().unwrap();
4959        s.handshake().unwrap();
4960
4961        s.client
4962            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4963                urgency: 3,
4964                incremental: false,
4965            })
4966            .unwrap();
4967        s.advance().ok();
4968
4969        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4970        assert_eq!(s.poll_server(), Err(Error::Done));
4971
4972        s.client
4973            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4974                urgency: 5,
4975                incremental: false,
4976            })
4977            .unwrap();
4978        s.advance().ok();
4979
4980        assert_eq!(s.poll_server(), Err(Error::Done));
4981
4982        // There is only one PRIORITY_UPDATE frame to read. Once read, the event
4983        // will rearm ready for more.
4984        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=5".to_vec()));
4985        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4986
4987        s.client
4988            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4989                urgency: 7,
4990                incremental: false,
4991            })
4992            .unwrap();
4993        s.advance().ok();
4994
4995        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4996        assert_eq!(s.poll_server(), Err(Error::Done));
4997
4998        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=7".to_vec()));
4999        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
5000    }
5001
5002    #[test]
5003    /// Send multiple PRIORITY_UPDATE frames for different streams from the
5004    /// client across multiple flights of exchange.
5005    fn priority_update_request_multiple_stream_arm_multiple_flights() {
5006        let mut s = Session::new().unwrap();
5007        s.handshake().unwrap();
5008
5009        s.client
5010            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
5011                urgency: 3,
5012                incremental: false,
5013            })
5014            .unwrap();
5015        s.advance().ok();
5016
5017        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
5018        assert_eq!(s.poll_server(), Err(Error::Done));
5019
5020        s.client
5021            .send_priority_update_for_request(&mut s.pipe.client, 4, &Priority {
5022                urgency: 1,
5023                incremental: false,
5024            })
5025            .unwrap();
5026        s.advance().ok();
5027
5028        assert_eq!(s.poll_server(), Ok((4, Event::PriorityUpdate)));
5029        assert_eq!(s.poll_server(), Err(Error::Done));
5030
5031        s.client
5032            .send_priority_update_for_request(&mut s.pipe.client, 8, &Priority {
5033                urgency: 2,
5034                incremental: false,
5035            })
5036            .unwrap();
5037        s.advance().ok();
5038
5039        assert_eq!(s.poll_server(), Ok((8, Event::PriorityUpdate)));
5040        assert_eq!(s.poll_server(), Err(Error::Done));
5041
5042        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
5043        assert_eq!(s.server.take_last_priority_update(4), Ok(b"u=1".to_vec()));
5044        assert_eq!(s.server.take_last_priority_update(8), Ok(b"u=2".to_vec()));
5045        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
5046    }
5047
5048    #[test]
5049    /// Send multiple PRIORITY_UPDATE frames for different streams from the
5050    /// client across a single flight.
5051    fn priority_update_request_multiple_stream_arm_single_flight() {
5052        let mut s = Session::new().unwrap();
5053        s.handshake().unwrap();
5054
5055        let mut d = [42; 65535];
5056
5057        let mut b = octets::OctetsMut::with_slice(&mut d);
5058
5059        let p1 = frame::Frame::PriorityUpdateRequest {
5060            prioritized_element_id: 0,
5061            priority_field_value: b"u=3".to_vec(),
5062        };
5063
5064        let p2 = frame::Frame::PriorityUpdateRequest {
5065            prioritized_element_id: 4,
5066            priority_field_value: b"u=3".to_vec(),
5067        };
5068
5069        let p3 = frame::Frame::PriorityUpdateRequest {
5070            prioritized_element_id: 8,
5071            priority_field_value: b"u=3".to_vec(),
5072        };
5073
5074        p1.to_bytes(&mut b).unwrap();
5075        p2.to_bytes(&mut b).unwrap();
5076        p3.to_bytes(&mut b).unwrap();
5077
5078        let off = b.off();
5079        s.pipe
5080            .client
5081            .stream_send(s.client.control_stream_id.unwrap(), &d[..off], false)
5082            .unwrap();
5083
5084        s.advance().ok();
5085
5086        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
5087        assert_eq!(s.poll_server(), Ok((4, Event::PriorityUpdate)));
5088        assert_eq!(s.poll_server(), Ok((8, Event::PriorityUpdate)));
5089        assert_eq!(s.poll_server(), Err(Error::Done));
5090
5091        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
5092        assert_eq!(s.server.take_last_priority_update(4), Ok(b"u=3".to_vec()));
5093        assert_eq!(s.server.take_last_priority_update(8), Ok(b"u=3".to_vec()));
5094
5095        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
5096    }
5097
5098    #[test]
5099    /// Send a PRIORITY_UPDATE for a request stream, before and after the stream
5100    /// has been completed.
5101    fn priority_update_request_collected_completed() {
5102        let mut s = Session::new().unwrap();
5103        s.handshake().unwrap();
5104
5105        s.client
5106            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
5107                urgency: 3,
5108                incremental: false,
5109            })
5110            .unwrap();
5111        s.advance().ok();
5112
5113        let (stream, req) = s.send_request(true).unwrap();
5114        let ev_headers = Event::Headers {
5115            list: req,
5116            more_frames: false,
5117        };
5118
5119        // Priority event is generated before request headers.
5120        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
5121        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5122        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
5123        assert_eq!(s.poll_server(), Err(Error::Done));
5124
5125        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
5126        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
5127
5128        let resp = s.send_response(stream, true).unwrap();
5129
5130        let ev_headers = Event::Headers {
5131            list: resp,
5132            more_frames: false,
5133        };
5134
5135        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
5136        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
5137        assert_eq!(s.poll_client(), Err(Error::Done));
5138
5139        // Now send a PRIORITY_UPDATE for the completed request stream.
5140        s.client
5141            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
5142                urgency: 3,
5143                incremental: false,
5144            })
5145            .unwrap();
5146        s.advance().ok();
5147
5148        // No event generated at server
5149        assert_eq!(s.poll_server(), Err(Error::Done));
5150    }
5151
5152    #[test]
5153    /// Send a PRIORITY_UPDATE for a request stream, before and after the stream
5154    /// has been stopped.
5155    fn priority_update_request_collected_stopped() {
5156        let mut s = Session::new().unwrap();
5157        s.handshake().unwrap();
5158
5159        s.client
5160            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
5161                urgency: 3,
5162                incremental: false,
5163            })
5164            .unwrap();
5165        s.advance().ok();
5166
5167        let (stream, req) = s.send_request(false).unwrap();
5168        let ev_headers = Event::Headers {
5169            list: req,
5170            more_frames: true,
5171        };
5172
5173        // Priority event is generated before request headers.
5174        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
5175        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5176        assert_eq!(s.poll_server(), Err(Error::Done));
5177
5178        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
5179        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
5180
5181        s.pipe
5182            .client
5183            .stream_shutdown(stream, crate::Shutdown::Write, 0x100)
5184            .unwrap();
5185        s.pipe
5186            .client
5187            .stream_shutdown(stream, crate::Shutdown::Read, 0x100)
5188            .unwrap();
5189
5190        s.advance().ok();
5191
5192        assert_eq!(s.poll_server(), Ok((0, Event::Reset(0x100))));
5193        assert_eq!(s.poll_server(), Err(Error::Done));
5194
5195        // Now send a PRIORITY_UPDATE for the closed request stream.
5196        s.client
5197            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
5198                urgency: 3,
5199                incremental: false,
5200            })
5201            .unwrap();
5202        s.advance().ok();
5203
5204        // No event generated at server
5205        assert_eq!(s.poll_server(), Err(Error::Done));
5206
5207        assert!(s.pipe.server.streams.is_collected(0));
5208        assert!(s.pipe.client.streams.is_collected(0));
5209    }
5210
5211    #[test]
5212    /// Send a PRIORITY_UPDATE for push stream from the client.
5213    fn priority_update_push() {
5214        let mut s = Session::new().unwrap();
5215        s.handshake().unwrap();
5216
5217        s.send_frame_client(
5218            frame::Frame::PriorityUpdatePush {
5219                prioritized_element_id: 3,
5220                priority_field_value: b"u=3".to_vec(),
5221            },
5222            s.client.control_stream_id.unwrap(),
5223            false,
5224        )
5225        .unwrap();
5226
5227        assert_eq!(s.poll_server(), Err(Error::Done));
5228    }
5229
5230    #[test]
5231    /// Send a PRIORITY_UPDATE for request stream from the client but for an
5232    /// incorrect stream type.
5233    fn priority_update_request_bad_stream() {
5234        let mut s = Session::new().unwrap();
5235        s.handshake().unwrap();
5236
5237        s.send_frame_client(
5238            frame::Frame::PriorityUpdateRequest {
5239                prioritized_element_id: 5,
5240                priority_field_value: b"u=3".to_vec(),
5241            },
5242            s.client.control_stream_id.unwrap(),
5243            false,
5244        )
5245        .unwrap();
5246
5247        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
5248    }
5249
5250    #[test]
5251    /// Send a PRIORITY_UPDATE for push stream from the client but for an
5252    /// incorrect stream type.
5253    fn priority_update_push_bad_stream() {
5254        let mut s = Session::new().unwrap();
5255        s.handshake().unwrap();
5256
5257        s.send_frame_client(
5258            frame::Frame::PriorityUpdatePush {
5259                prioritized_element_id: 5,
5260                priority_field_value: b"u=3".to_vec(),
5261            },
5262            s.client.control_stream_id.unwrap(),
5263            false,
5264        )
5265        .unwrap();
5266
5267        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
5268    }
5269
5270    #[test]
5271    /// Send a PRIORITY_UPDATE for request stream from the server.
5272    fn priority_update_request_from_server() {
5273        let mut s = Session::new().unwrap();
5274        s.handshake().unwrap();
5275
5276        s.send_frame_server(
5277            frame::Frame::PriorityUpdateRequest {
5278                prioritized_element_id: 0,
5279                priority_field_value: b"u=3".to_vec(),
5280            },
5281            s.server.control_stream_id.unwrap(),
5282            false,
5283        )
5284        .unwrap();
5285
5286        assert_eq!(s.poll_client(), Err(Error::FrameUnexpected));
5287    }
5288
5289    #[test]
5290    /// Send a PRIORITY_UPDATE for request stream from the server.
5291    fn priority_update_push_from_server() {
5292        let mut s = Session::new().unwrap();
5293        s.handshake().unwrap();
5294
5295        s.send_frame_server(
5296            frame::Frame::PriorityUpdatePush {
5297                prioritized_element_id: 0,
5298                priority_field_value: b"u=3".to_vec(),
5299            },
5300            s.server.control_stream_id.unwrap(),
5301            false,
5302        )
5303        .unwrap();
5304
5305        assert_eq!(s.poll_client(), Err(Error::FrameUnexpected));
5306    }
5307
5308    #[test]
5309    /// Ensure quiche allocates streams for client and server roles as expected.
5310    fn uni_stream_local_counting() {
5311        let config = Config::new().unwrap();
5312
5313        let h3_cln = Connection::new(&config, false, false).unwrap();
5314        assert_eq!(h3_cln.next_uni_stream_id, 2);
5315
5316        let h3_srv = Connection::new(&config, true, false).unwrap();
5317        assert_eq!(h3_srv.next_uni_stream_id, 3);
5318    }
5319
5320    #[test]
5321    /// Client opens multiple control streams, which is forbidden.
5322    fn open_multiple_control_streams() {
5323        let mut s = Session::new().unwrap();
5324        s.handshake().unwrap();
5325
5326        let stream_id = s.client.next_uni_stream_id;
5327
5328        let mut d = [42; 8];
5329        let mut b = octets::OctetsMut::with_slice(&mut d);
5330
5331        s.pipe
5332            .client
5333            .stream_send(
5334                stream_id,
5335                b.put_varint(stream::HTTP3_CONTROL_STREAM_TYPE_ID).unwrap(),
5336                false,
5337            )
5338            .unwrap();
5339
5340        s.advance().ok();
5341
5342        assert_eq!(s.poll_server(), Err(Error::StreamCreationError));
5343    }
5344
5345    #[test]
5346    /// Client closes the control stream, which is forbidden.
5347    fn close_control_stream_after_type() {
5348        let mut s = Session::new().unwrap();
5349        s.handshake().unwrap();
5350
5351        s.pipe
5352            .client
5353            .stream_send(s.client.control_stream_id.unwrap(), &[], true)
5354            .unwrap();
5355
5356        s.advance().ok();
5357
5358        assert_eq!(
5359            Err(Error::ClosedCriticalStream),
5360            s.server.poll(&mut s.pipe.server)
5361        );
5362        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5363    }
5364
5365    #[test]
5366    /// Client closes the control stream after a frame is sent, which is
5367    /// forbidden.
5368    fn close_control_stream_after_frame() {
5369        let mut s = Session::new().unwrap();
5370        s.handshake().unwrap();
5371
5372        s.send_frame_client(
5373            frame::Frame::MaxPushId { push_id: 1 },
5374            s.client.control_stream_id.unwrap(),
5375            true,
5376        )
5377        .unwrap();
5378
5379        assert_eq!(
5380            Err(Error::ClosedCriticalStream),
5381            s.server.poll(&mut s.pipe.server)
5382        );
5383        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5384    }
5385
5386    #[test]
5387    /// Client resets the control stream, which is forbidden.
5388    fn reset_control_stream_after_type() {
5389        let mut s = Session::new().unwrap();
5390        s.handshake().unwrap();
5391
5392        s.pipe
5393            .client
5394            .stream_shutdown(
5395                s.client.control_stream_id.unwrap(),
5396                crate::Shutdown::Write,
5397                0,
5398            )
5399            .unwrap();
5400
5401        s.advance().ok();
5402
5403        assert_eq!(
5404            Err(Error::ClosedCriticalStream),
5405            s.server.poll(&mut s.pipe.server)
5406        );
5407        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5408    }
5409
5410    #[test]
5411    /// Client resets the control stream after a frame is sent, which is
5412    /// forbidden.
5413    fn reset_control_stream_after_frame() {
5414        let mut s = Session::new().unwrap();
5415        s.handshake().unwrap();
5416
5417        s.send_frame_client(
5418            frame::Frame::MaxPushId { push_id: 1 },
5419            s.client.control_stream_id.unwrap(),
5420            false,
5421        )
5422        .unwrap();
5423
5424        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5425
5426        s.pipe
5427            .client
5428            .stream_shutdown(
5429                s.client.control_stream_id.unwrap(),
5430                crate::Shutdown::Write,
5431                0,
5432            )
5433            .unwrap();
5434
5435        s.advance().ok();
5436
5437        assert_eq!(
5438            Err(Error::ClosedCriticalStream),
5439            s.server.poll(&mut s.pipe.server)
5440        );
5441        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5442    }
5443
5444    #[test]
5445    /// Client closes QPACK stream, which is forbidden.
5446    fn close_qpack_stream_after_type() {
5447        let mut s = Session::new().unwrap();
5448        s.handshake().unwrap();
5449
5450        s.pipe
5451            .client
5452            .stream_send(
5453                s.client.local_qpack_streams.encoder_stream_id.unwrap(),
5454                &[],
5455                true,
5456            )
5457            .unwrap();
5458
5459        s.advance().ok();
5460
5461        assert_eq!(
5462            Err(Error::ClosedCriticalStream),
5463            s.server.poll(&mut s.pipe.server)
5464        );
5465        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5466    }
5467
5468    #[test]
5469    /// Client closes QPACK stream after sending some stuff, which is forbidden.
5470    fn close_qpack_stream_after_data() {
5471        let mut s = Session::new().unwrap();
5472        s.handshake().unwrap();
5473
5474        let stream_id = s.client.local_qpack_streams.encoder_stream_id.unwrap();
5475        let d = [0; 1];
5476
5477        s.pipe.client.stream_send(stream_id, &d, false).unwrap();
5478        s.pipe.client.stream_send(stream_id, &d, true).unwrap();
5479
5480        s.advance().ok();
5481
5482        assert_eq!(
5483            Err(Error::ClosedCriticalStream),
5484            s.server.poll(&mut s.pipe.server)
5485        );
5486        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5487    }
5488
5489    #[test]
5490    /// Client resets QPACK stream, which is forbidden.
5491    fn reset_qpack_stream_after_type() {
5492        let mut s = Session::new().unwrap();
5493        s.handshake().unwrap();
5494
5495        s.pipe
5496            .client
5497            .stream_shutdown(
5498                s.client.local_qpack_streams.encoder_stream_id.unwrap(),
5499                crate::Shutdown::Write,
5500                0,
5501            )
5502            .unwrap();
5503
5504        s.advance().ok();
5505
5506        assert_eq!(
5507            Err(Error::ClosedCriticalStream),
5508            s.server.poll(&mut s.pipe.server)
5509        );
5510        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5511    }
5512
5513    #[test]
5514    /// Client resets QPACK stream after sending some stuff, which is forbidden.
5515    fn reset_qpack_stream_after_data() {
5516        let mut s = Session::new().unwrap();
5517        s.handshake().unwrap();
5518
5519        let stream_id = s.client.local_qpack_streams.encoder_stream_id.unwrap();
5520        let d = [0; 1];
5521
5522        s.pipe.client.stream_send(stream_id, &d, false).unwrap();
5523        s.pipe.client.stream_send(stream_id, &d, false).unwrap();
5524
5525        s.advance().ok();
5526
5527        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5528
5529        s.pipe
5530            .client
5531            .stream_shutdown(stream_id, crate::Shutdown::Write, 0)
5532            .unwrap();
5533
5534        s.advance().ok();
5535
5536        assert_eq!(
5537            Err(Error::ClosedCriticalStream),
5538            s.server.poll(&mut s.pipe.server)
5539        );
5540        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5541    }
5542
5543    #[test]
5544    /// Client sends QPACK data.
5545    fn qpack_data() {
5546        // TODO: QPACK instructions are ignored until dynamic table support is
5547        // added so we just test that the data is safely ignored.
5548        let mut s = Session::new().unwrap();
5549        s.handshake().unwrap();
5550
5551        let e_stream_id = s.client.local_qpack_streams.encoder_stream_id.unwrap();
5552        let d_stream_id = s.client.local_qpack_streams.decoder_stream_id.unwrap();
5553        let d = [0; 20];
5554
5555        s.pipe.client.stream_send(e_stream_id, &d, false).unwrap();
5556        s.advance().ok();
5557
5558        s.pipe.client.stream_send(d_stream_id, &d, false).unwrap();
5559        s.advance().ok();
5560
5561        match s.server.poll(&mut s.pipe.server) {
5562            Ok(_) => panic!(),
5563
5564            Err(Error::Done) => {
5565                assert_eq!(s.server.peer_qpack_streams.encoder_stream_bytes, 20);
5566                assert_eq!(s.server.peer_qpack_streams.decoder_stream_bytes, 20);
5567            },
5568
5569            Err(_) => {
5570                panic!();
5571            },
5572        }
5573
5574        let stats = s.server.stats();
5575        assert_eq!(stats.qpack_encoder_stream_recv_bytes, 20);
5576        assert_eq!(stats.qpack_decoder_stream_recv_bytes, 20);
5577    }
5578
5579    #[test]
5580    /// Tests limits for the stream state buffer maximum size.
5581    fn max_state_buf_size() {
5582        let mut s = Session::new().unwrap();
5583        s.handshake().unwrap();
5584
5585        let req = vec![
5586            Header::new(b":method", b"GET"),
5587            Header::new(b":scheme", b"https"),
5588            Header::new(b":authority", b"quic.tech"),
5589            Header::new(b":path", b"/test"),
5590            Header::new(b"user-agent", b"quiche-test"),
5591        ];
5592
5593        assert_eq!(
5594            s.client.send_request(&mut s.pipe.client, &req, false),
5595            Ok(0)
5596        );
5597
5598        s.advance().ok();
5599
5600        let ev_headers = Event::Headers {
5601            list: req,
5602            more_frames: true,
5603        };
5604
5605        assert_eq!(s.server.poll(&mut s.pipe.server), Ok((0, ev_headers)));
5606
5607        // DATA frames don't consume the state buffer, so can be of any size.
5608        let mut d = [42; 128];
5609        let mut b = octets::OctetsMut::with_slice(&mut d);
5610
5611        let frame_type = b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
5612        s.pipe.client.stream_send(0, frame_type, false).unwrap();
5613
5614        let frame_len = b.put_varint(1 << 24).unwrap();
5615        s.pipe.client.stream_send(0, frame_len, false).unwrap();
5616
5617        s.pipe.client.stream_send(0, &d, false).unwrap();
5618
5619        s.advance().ok();
5620
5621        assert_eq!(s.server.poll(&mut s.pipe.server), Ok((0, Event::Data)));
5622
5623        // GREASE frames consume the state buffer, so need to be limited.
5624        let mut s = Session::new().unwrap();
5625        s.handshake().unwrap();
5626
5627        let mut d = [42; 128];
5628        let mut b = octets::OctetsMut::with_slice(&mut d);
5629
5630        let frame_type = b.put_varint(148_764_065_110_560_899).unwrap();
5631        s.pipe.client.stream_send(0, frame_type, false).unwrap();
5632
5633        let frame_len = b.put_varint(1 << 24).unwrap();
5634        s.pipe.client.stream_send(0, frame_len, false).unwrap();
5635
5636        s.pipe.client.stream_send(0, &d, false).unwrap();
5637
5638        s.advance().ok();
5639
5640        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::ExcessiveLoad));
5641    }
5642
5643    #[test]
5644    /// Tests that DATA frames are properly truncated depending on the request
5645    /// stream's outgoing flow control capacity.
5646    fn stream_backpressure() {
5647        let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
5648
5649        let mut s = Session::new().unwrap();
5650        s.handshake().unwrap();
5651
5652        let (stream, req) = s.send_request(false).unwrap();
5653
5654        let total_data_frames = 6;
5655
5656        for _ in 0..total_data_frames {
5657            assert_eq!(
5658                s.client
5659                    .send_body(&mut s.pipe.client, stream, &bytes, false),
5660                Ok(bytes.len())
5661            );
5662
5663            s.advance().ok();
5664        }
5665
5666        assert_eq!(
5667            s.client.send_body(&mut s.pipe.client, stream, &bytes, true),
5668            Ok(bytes.len() - 2)
5669        );
5670
5671        s.advance().ok();
5672
5673        let mut recv_buf = vec![0; bytes.len()];
5674
5675        let ev_headers = Event::Headers {
5676            list: req,
5677            more_frames: true,
5678        };
5679
5680        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5681        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
5682        assert_eq!(s.poll_server(), Err(Error::Done));
5683
5684        for _ in 0..total_data_frames {
5685            assert_eq!(
5686                s.recv_body_server(stream, &mut recv_buf),
5687                Ok(bytes.len())
5688            );
5689        }
5690
5691        assert_eq!(
5692            s.recv_body_server(stream, &mut recv_buf),
5693            Ok(bytes.len() - 2)
5694        );
5695
5696        // Fin flag from last send_body() call was not sent as the buffer was
5697        // only partially written.
5698        assert_eq!(s.poll_server(), Err(Error::Done));
5699
5700        assert_eq!(s.pipe.server.data_blocked_sent_count, 0);
5701        assert_eq!(s.pipe.server.stream_data_blocked_sent_count, 0);
5702        assert_eq!(s.pipe.server.data_blocked_recv_count, 0);
5703        assert_eq!(s.pipe.server.stream_data_blocked_recv_count, 1);
5704
5705        assert_eq!(s.pipe.client.data_blocked_sent_count, 0);
5706        assert_eq!(s.pipe.client.stream_data_blocked_sent_count, 1);
5707        assert_eq!(s.pipe.client.data_blocked_recv_count, 0);
5708        assert_eq!(s.pipe.client.stream_data_blocked_recv_count, 0);
5709    }
5710
5711    #[test]
5712    /// Tests that the max header list size setting is enforced.
5713    fn request_max_header_size_limit() {
5714        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5715        config
5716            .load_cert_chain_from_pem_file("examples/cert.crt")
5717            .unwrap();
5718        config
5719            .load_priv_key_from_pem_file("examples/cert.key")
5720            .unwrap();
5721        config.set_application_protos(&[b"h3"]).unwrap();
5722        config.set_initial_max_data(1500);
5723        config.set_initial_max_stream_data_bidi_local(150);
5724        config.set_initial_max_stream_data_bidi_remote(150);
5725        config.set_initial_max_stream_data_uni(150);
5726        config.set_initial_max_streams_bidi(5);
5727        config.set_initial_max_streams_uni(5);
5728        config.verify_peer(false);
5729
5730        let mut h3_config = Config::new().unwrap();
5731        h3_config.set_max_field_section_size(65);
5732
5733        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5734
5735        s.handshake().unwrap();
5736
5737        let req = vec![
5738            Header::new(b":method", b"GET"),
5739            Header::new(b":scheme", b"https"),
5740            Header::new(b":authority", b"quic.tech"),
5741            Header::new(b":path", b"/test"),
5742            Header::new(b"aaaaaaa", b"aaaaaaaa"),
5743        ];
5744
5745        let stream = s
5746            .client
5747            .send_request(&mut s.pipe.client, &req, true)
5748            .unwrap();
5749
5750        s.advance().ok();
5751
5752        assert_eq!(stream, 0);
5753
5754        assert_eq!(s.poll_server(), Err(Error::ExcessiveLoad));
5755
5756        assert_eq!(
5757            s.pipe.server.local_error.as_ref().unwrap().error_code,
5758            Error::to_wire(Error::ExcessiveLoad)
5759        );
5760    }
5761
5762    #[test]
5763    /// Tests that Error::TransportError contains a transport error.
5764    fn transport_error() {
5765        let mut s = Session::new().unwrap();
5766        s.handshake().unwrap();
5767
5768        let req = vec![
5769            Header::new(b":method", b"GET"),
5770            Header::new(b":scheme", b"https"),
5771            Header::new(b":authority", b"quic.tech"),
5772            Header::new(b":path", b"/test"),
5773            Header::new(b"user-agent", b"quiche-test"),
5774        ];
5775
5776        // We need to open all streams in the same flight, so we can't use the
5777        // Session::send_request() method because it also calls advance(),
5778        // otherwise the server would send a MAX_STREAMS frame and the client
5779        // wouldn't hit the streams limit.
5780        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(0));
5781        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(4));
5782        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(8));
5783        assert_eq!(
5784            s.client.send_request(&mut s.pipe.client, &req, true),
5785            Ok(12)
5786        );
5787        assert_eq!(
5788            s.client.send_request(&mut s.pipe.client, &req, true),
5789            Ok(16)
5790        );
5791
5792        assert_eq!(
5793            s.client.send_request(&mut s.pipe.client, &req, true),
5794            Err(Error::TransportError(crate::Error::StreamLimit))
5795        );
5796    }
5797
5798    #[test]
5799    /// Tests that sending DATA before HEADERS causes an error.
5800    fn data_before_headers() {
5801        let mut s = Session::new().unwrap();
5802        s.handshake().unwrap();
5803
5804        let mut d = [42; 128];
5805        let mut b = octets::OctetsMut::with_slice(&mut d);
5806
5807        let frame_type = b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
5808        s.pipe.client.stream_send(0, frame_type, false).unwrap();
5809
5810        let frame_len = b.put_varint(5).unwrap();
5811        s.pipe.client.stream_send(0, frame_len, false).unwrap();
5812
5813        s.pipe.client.stream_send(0, b"hello", false).unwrap();
5814
5815        s.advance().ok();
5816
5817        assert_eq!(
5818            s.server.poll(&mut s.pipe.server),
5819            Err(Error::FrameUnexpected)
5820        );
5821    }
5822
5823    #[test]
5824    /// Tests that calling poll() after an error occurred does nothing.
5825    fn poll_after_error() {
5826        let mut s = Session::new().unwrap();
5827        s.handshake().unwrap();
5828
5829        let mut d = [42; 128];
5830        let mut b = octets::OctetsMut::with_slice(&mut d);
5831
5832        let frame_type = b.put_varint(148_764_065_110_560_899).unwrap();
5833        s.pipe.client.stream_send(0, frame_type, false).unwrap();
5834
5835        let frame_len = b.put_varint(1 << 24).unwrap();
5836        s.pipe.client.stream_send(0, frame_len, false).unwrap();
5837
5838        s.pipe.client.stream_send(0, &d, false).unwrap();
5839
5840        s.advance().ok();
5841
5842        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::ExcessiveLoad));
5843
5844        // Try to call poll() again after an error occurred.
5845        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::Done));
5846    }
5847
5848    #[test]
5849    /// Tests that we limit sending HEADERS based on the stream capacity.
5850    fn headers_blocked() {
5851        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5852        config
5853            .load_cert_chain_from_pem_file("examples/cert.crt")
5854            .unwrap();
5855        config
5856            .load_priv_key_from_pem_file("examples/cert.key")
5857            .unwrap();
5858        config.set_application_protos(&[b"h3"]).unwrap();
5859        config.set_initial_max_data(70);
5860        config.set_initial_max_stream_data_bidi_local(150);
5861        config.set_initial_max_stream_data_bidi_remote(150);
5862        config.set_initial_max_stream_data_uni(150);
5863        config.set_initial_max_streams_bidi(100);
5864        config.set_initial_max_streams_uni(5);
5865        config.verify_peer(false);
5866
5867        let h3_config = Config::new().unwrap();
5868
5869        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5870
5871        s.handshake().unwrap();
5872
5873        let req = vec![
5874            Header::new(b":method", b"GET"),
5875            Header::new(b":scheme", b"https"),
5876            Header::new(b":authority", b"quic.tech"),
5877            Header::new(b":path", b"/test"),
5878        ];
5879
5880        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(0));
5881
5882        assert_eq!(
5883            s.client.send_request(&mut s.pipe.client, &req, true),
5884            Err(Error::StreamBlocked)
5885        );
5886
5887        // Clear the writable stream queue.
5888        assert_eq!(s.pipe.client.stream_writable_next(), Some(2));
5889        assert_eq!(s.pipe.client.stream_writable_next(), Some(6));
5890        assert_eq!(s.pipe.client.stream_writable_next(), Some(10));
5891        assert_eq!(s.pipe.client.stream_writable_next(), None);
5892
5893        s.advance().ok();
5894
5895        // Once the server gives flow control credits back, we can send the
5896        // request.
5897        assert_eq!(s.pipe.client.stream_writable_next(), Some(4));
5898        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(4));
5899
5900        assert_eq!(s.pipe.server.data_blocked_sent_count, 0);
5901        assert_eq!(s.pipe.server.stream_data_blocked_sent_count, 0);
5902        assert_eq!(s.pipe.server.data_blocked_recv_count, 1);
5903        assert_eq!(s.pipe.server.stream_data_blocked_recv_count, 0);
5904
5905        assert_eq!(s.pipe.client.data_blocked_sent_count, 1);
5906        assert_eq!(s.pipe.client.stream_data_blocked_sent_count, 0);
5907        assert_eq!(s.pipe.client.data_blocked_recv_count, 0);
5908        assert_eq!(s.pipe.client.stream_data_blocked_recv_count, 0);
5909    }
5910
5911    #[test]
5912    /// Ensure StreamBlocked when connection flow control prevents headers.
5913    fn headers_blocked_on_conn() {
5914        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5915        config
5916            .load_cert_chain_from_pem_file("examples/cert.crt")
5917            .unwrap();
5918        config
5919            .load_priv_key_from_pem_file("examples/cert.key")
5920            .unwrap();
5921        config.set_application_protos(&[b"h3"]).unwrap();
5922        config.set_initial_max_data(70);
5923        config.set_initial_max_stream_data_bidi_local(150);
5924        config.set_initial_max_stream_data_bidi_remote(150);
5925        config.set_initial_max_stream_data_uni(150);
5926        config.set_initial_max_streams_bidi(100);
5927        config.set_initial_max_streams_uni(5);
5928        config.verify_peer(false);
5929
5930        let h3_config = Config::new().unwrap();
5931
5932        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5933
5934        s.handshake().unwrap();
5935
5936        // After the HTTP handshake, some bytes of connection flow control have
5937        // been consumed. Fill the connection with more grease data on the control
5938        // stream.
5939        let d = [42; 28];
5940        assert_eq!(s.pipe.client.stream_send(2, &d, false), Ok(23));
5941
5942        let req = vec![
5943            Header::new(b":method", b"GET"),
5944            Header::new(b":scheme", b"https"),
5945            Header::new(b":authority", b"quic.tech"),
5946            Header::new(b":path", b"/test"),
5947        ];
5948
5949        // There is 0 connection-level flow control, so sending a request is
5950        // blocked.
5951        assert_eq!(
5952            s.client.send_request(&mut s.pipe.client, &req, true),
5953            Err(Error::StreamBlocked)
5954        );
5955        assert_eq!(s.pipe.client.stream_writable_next(), None);
5956
5957        // Emit the control stream data and drain it at the server via poll() to
5958        // consumes it via poll() and gives back flow control.
5959        s.advance().ok();
5960        assert_eq!(s.poll_server(), Err(Error::Done));
5961        s.advance().ok();
5962
5963        // Now we can send the request.
5964        assert_eq!(s.pipe.client.stream_writable_next(), Some(2));
5965        assert_eq!(s.pipe.client.stream_writable_next(), Some(6));
5966        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(0));
5967
5968        assert_eq!(s.pipe.server.data_blocked_sent_count, 0);
5969        assert_eq!(s.pipe.server.stream_data_blocked_sent_count, 0);
5970        assert_eq!(s.pipe.server.data_blocked_recv_count, 1);
5971        assert_eq!(s.pipe.server.stream_data_blocked_recv_count, 0);
5972
5973        assert_eq!(s.pipe.client.data_blocked_sent_count, 1);
5974        assert_eq!(s.pipe.client.stream_data_blocked_sent_count, 0);
5975        assert_eq!(s.pipe.client.data_blocked_recv_count, 0);
5976        assert_eq!(s.pipe.client.stream_data_blocked_recv_count, 0);
5977    }
5978
5979    #[test]
5980    /// Ensure that the connection does not consume a stream ID when
5981    /// send_request fails with StreamBlocked due to hitting the
5982    /// MAX_DATA flow control limit when attempting to send headers.
5983    /// The headers are sent successfully after a MAX_DATA update.
5984    fn headers_blocked_by_max_data_success_on_retry() {
5985        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5986        config
5987            .load_cert_chain_from_pem_file("examples/cert.crt")
5988            .unwrap();
5989        config
5990            .load_priv_key_from_pem_file("examples/cert.key")
5991            .unwrap();
5992        config.set_application_protos(&[b"h3"]).unwrap();
5993        config.set_initial_max_data(70);
5994        config.set_initial_max_stream_data_bidi_local(150);
5995        config.set_initial_max_stream_data_bidi_remote(150);
5996        config.set_initial_max_stream_data_uni(150);
5997        config.set_initial_max_streams_bidi(100);
5998        config.set_initial_max_streams_uni(5);
5999        config.verify_peer(false);
6000
6001        let h3_config = Config::new().unwrap();
6002
6003        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6004
6005        s.handshake().unwrap();
6006
6007        let req = vec![
6008            Header::new(b":method", b"GET"),
6009            Header::new(b":scheme", b"https"),
6010            Header::new(b":authority", b"quic.tech"),
6011            Header::new(b":path", b"/test/with/long/url"),
6012        ];
6013
6014        // After the HTTP handshake, some bytes of connection flow
6015        // control have been consumed.  The serialized request does
6016        // not fit in the remaining connection-level flow control
6017        // limit.  send_request fails without creating a stream.
6018        assert_eq!(
6019            s.client.send_request(&mut s.pipe.client, &req, true),
6020            Err(Error::StreamBlocked)
6021        );
6022
6023        // Verify that stream 0 does not exist in the H3 stream map after the
6024        // failed attempt.
6025        assert!(!s.client.streams.contains_key(&0));
6026
6027        // Emit the control stream data and drain it at the server to give back
6028        // flow control.
6029        s.advance().ok();
6030        assert_eq!(s.poll_server(), Err(Error::Done));
6031        s.advance().ok();
6032
6033        // Now we can send the request. The stream ID should be 0 (not 4),
6034        // confirming that the blocked attempt did not consume the stream ID.
6035        let stream_id = s.client.send_request(&mut s.pipe.client, &req, true);
6036        assert_eq!(stream_id, Ok(0));
6037        assert!(s.client.streams.contains_key(&0));
6038        assert!(!s.client.streams.contains_key(&4));
6039
6040        // Subsequent request should use stream ID 4.
6041        let stream_id2 = s.client.send_request(&mut s.pipe.client, &req, true);
6042        assert_eq!(stream_id2, Ok(4));
6043        assert!(s.client.streams.contains_key(&0));
6044        assert!(s.client.streams.contains_key(&4));
6045
6046        s.advance().ok();
6047    }
6048
6049    #[test]
6050    /// Ensure STREAM_DATA_BLOCKED is not emitted multiple times with the same
6051    /// offset when trying to send large bodies.
6052    fn send_body_truncation_stream_blocked() {
6053        use crate::test_utils::decode_pkt;
6054
6055        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6056        config
6057            .load_cert_chain_from_pem_file("examples/cert.crt")
6058            .unwrap();
6059        config
6060            .load_priv_key_from_pem_file("examples/cert.key")
6061            .unwrap();
6062        config.set_application_protos(&[b"h3"]).unwrap();
6063        config.set_initial_max_data(10000); // large connection-level flow control
6064        config.set_initial_max_stream_data_bidi_local(80);
6065        config.set_initial_max_stream_data_bidi_remote(80);
6066        config.set_initial_max_stream_data_uni(150);
6067        config.set_initial_max_streams_bidi(100);
6068        config.set_initial_max_streams_uni(5);
6069        config.verify_peer(false);
6070
6071        let h3_config = Config::new().unwrap();
6072
6073        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6074
6075        s.handshake().unwrap();
6076
6077        let (stream, req) = s.send_request(true).unwrap();
6078
6079        let ev_headers = Event::Headers {
6080            list: req,
6081            more_frames: false,
6082        };
6083
6084        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6085        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6086
6087        let _ = s.send_response(stream, false).unwrap();
6088
6089        assert_eq!(s.pipe.server.streams.blocked().len(), 0);
6090
6091        // The body must be larger than the stream window would allow
6092        let d = [42; 500];
6093        let mut off = 0;
6094
6095        let sent = s
6096            .server
6097            .send_body(&mut s.pipe.server, stream, &d, true)
6098            .unwrap();
6099        assert_eq!(sent, 25);
6100        off += sent;
6101
6102        // send_body wrote as much as it could (sent < size of buff).
6103        assert_eq!(s.pipe.server.streams.blocked().len(), 1);
6104        assert_eq!(
6105            s.server
6106                .send_body(&mut s.pipe.server, stream, &d[off..], true),
6107            Err(Error::Done)
6108        );
6109        assert_eq!(s.pipe.server.streams.blocked().len(), 1);
6110
6111        // Now read raw frames to see what the QUIC layer did
6112        let mut buf = [0; 65535];
6113        let (len, _) = s.pipe.server.send(&mut buf).unwrap();
6114
6115        let frames = decode_pkt(&mut s.pipe.client, &mut buf[..len]).unwrap();
6116
6117        let mut iter = frames.iter();
6118
6119        assert_eq!(
6120            iter.next(),
6121            Some(&crate::frame::Frame::StreamDataBlocked {
6122                stream_id: 0,
6123                limit: 80,
6124            })
6125        );
6126
6127        // At the server, after sending the STREAM_DATA_BLOCKED frame, we clear
6128        // the mark.
6129        assert_eq!(s.pipe.server.streams.blocked().len(), 0);
6130
6131        // Don't read any data from the client, so stream flow control is never
6132        // given back in the form of changing the stream's max offset.
6133        // Subsequent body send operations will still fail but no more
6134        // STREAM_DATA_BLOCKED frames should be submitted since the limit didn't
6135        // change. No frames means no packet to send.
6136        assert_eq!(
6137            s.server
6138                .send_body(&mut s.pipe.server, stream, &d[off..], true),
6139            Err(Error::Done)
6140        );
6141        assert_eq!(s.pipe.server.streams.blocked().len(), 0);
6142        assert_eq!(s.pipe.server.send(&mut buf), Err(crate::Error::Done));
6143
6144        // Now update the client's max offset manually.
6145        let frames = [crate::frame::Frame::MaxStreamData {
6146            stream_id: 0,
6147            max: 100,
6148        }];
6149
6150        let pkt_type = crate::packet::Type::Short;
6151        assert_eq!(
6152            s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
6153            Ok(39),
6154        );
6155
6156        let sent = s
6157            .server
6158            .send_body(&mut s.pipe.server, stream, &d[off..], true)
6159            .unwrap();
6160        assert_eq!(sent, 18);
6161
6162        // Same thing here...
6163        assert_eq!(s.pipe.server.streams.blocked().len(), 1);
6164        assert_eq!(
6165            s.server
6166                .send_body(&mut s.pipe.server, stream, &d[off..], true),
6167            Err(Error::Done)
6168        );
6169        assert_eq!(s.pipe.server.streams.blocked().len(), 1);
6170
6171        let (len, _) = s.pipe.server.send(&mut buf).unwrap();
6172
6173        let frames = decode_pkt(&mut s.pipe.client, &mut buf[..len]).unwrap();
6174
6175        let mut iter = frames.iter();
6176
6177        assert_eq!(
6178            iter.next(),
6179            Some(&crate::frame::Frame::StreamDataBlocked {
6180                stream_id: 0,
6181                limit: 100,
6182            })
6183        );
6184    }
6185
6186    #[test]
6187    /// Ensure stream doesn't hang due to small cwnd.
6188    fn send_body_stream_blocked_by_small_cwnd() {
6189        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6190        config
6191            .load_cert_chain_from_pem_file("examples/cert.crt")
6192            .unwrap();
6193        config
6194            .load_priv_key_from_pem_file("examples/cert.key")
6195            .unwrap();
6196        config.set_application_protos(&[b"h3"]).unwrap();
6197        config.set_initial_max_data(100000); // large connection-level flow control
6198        config.set_initial_max_stream_data_bidi_local(100000);
6199        config.set_initial_max_stream_data_bidi_remote(50000);
6200        config.set_initial_max_stream_data_uni(150);
6201        config.set_initial_max_streams_bidi(100);
6202        config.set_initial_max_streams_uni(5);
6203        config.verify_peer(false);
6204
6205        let h3_config = Config::new().unwrap();
6206
6207        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6208
6209        s.handshake().unwrap();
6210
6211        let (stream, req) = s.send_request(true).unwrap();
6212
6213        let ev_headers = Event::Headers {
6214            list: req,
6215            more_frames: false,
6216        };
6217
6218        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6219        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6220
6221        let _ = s.send_response(stream, false).unwrap();
6222
6223        // Clear the writable stream queue.
6224        assert_eq!(s.pipe.server.stream_writable_next(), Some(3));
6225        assert_eq!(s.pipe.server.stream_writable_next(), Some(7));
6226        assert_eq!(s.pipe.server.stream_writable_next(), Some(11));
6227        assert_eq!(s.pipe.server.stream_writable_next(), Some(stream));
6228        assert_eq!(s.pipe.server.stream_writable_next(), None);
6229
6230        // The body must be larger than the cwnd would allow.
6231        let send_buf = [42; 80000];
6232
6233        let sent = s
6234            .server
6235            .send_body(&mut s.pipe.server, stream, &send_buf, true)
6236            .unwrap();
6237
6238        // send_body wrote as much as it could (sent < size of buff).
6239        assert_eq!(sent, 11995);
6240
6241        s.advance().ok();
6242
6243        // Client reads received headers and body.
6244        let mut recv_buf = [42; 80000];
6245        assert!(s.poll_client().is_ok());
6246        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
6247        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(11995));
6248
6249        s.advance().ok();
6250
6251        // Server send cap is smaller than remaining body buffer.
6252        assert!(s.pipe.server.tx_cap < send_buf.len() - sent);
6253
6254        // Once the server cwnd opens up, we can send more body.
6255        assert_eq!(s.pipe.server.stream_writable_next(), Some(0));
6256    }
6257
6258    #[test]
6259    /// Ensure stream doesn't hang due to small cwnd.
6260    fn send_body_stream_blocked_zero_length() {
6261        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6262        config
6263            .load_cert_chain_from_pem_file("examples/cert.crt")
6264            .unwrap();
6265        config
6266            .load_priv_key_from_pem_file("examples/cert.key")
6267            .unwrap();
6268        config.set_application_protos(&[b"h3"]).unwrap();
6269        config.set_initial_max_data(100000); // large connection-level flow control
6270        config.set_initial_max_stream_data_bidi_local(100000);
6271        config.set_initial_max_stream_data_bidi_remote(50000);
6272        config.set_initial_max_stream_data_uni(150);
6273        config.set_initial_max_streams_bidi(100);
6274        config.set_initial_max_streams_uni(5);
6275        config.verify_peer(false);
6276
6277        let h3_config = Config::new().unwrap();
6278
6279        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6280
6281        s.handshake().unwrap();
6282
6283        let (stream, req) = s.send_request(true).unwrap();
6284
6285        let ev_headers = Event::Headers {
6286            list: req,
6287            more_frames: false,
6288        };
6289
6290        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6291        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6292
6293        let _ = s.send_response(stream, false).unwrap();
6294
6295        // Clear the writable stream queue.
6296        assert_eq!(s.pipe.server.stream_writable_next(), Some(3));
6297        assert_eq!(s.pipe.server.stream_writable_next(), Some(7));
6298        assert_eq!(s.pipe.server.stream_writable_next(), Some(11));
6299        assert_eq!(s.pipe.server.stream_writable_next(), Some(stream));
6300        assert_eq!(s.pipe.server.stream_writable_next(), None);
6301
6302        // The body is large enough to fill the cwnd, except for enough bytes
6303        // for another DATA frame header (but no payload).
6304        let send_buf = [42; 11994];
6305
6306        let sent = s
6307            .server
6308            .send_body(&mut s.pipe.server, stream, &send_buf, false)
6309            .unwrap();
6310
6311        assert_eq!(sent, 11994);
6312
6313        // There is only enough capacity left for the DATA frame header, but
6314        // no payload.
6315        assert_eq!(s.pipe.server.stream_capacity(stream).unwrap(), 3);
6316        assert_eq!(
6317            s.server
6318                .send_body(&mut s.pipe.server, stream, &send_buf, false),
6319            Err(Error::Done)
6320        );
6321
6322        s.advance().ok();
6323
6324        // Client reads received headers and body.
6325        let mut recv_buf = [42; 80000];
6326        assert!(s.poll_client().is_ok());
6327        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
6328        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(11994));
6329
6330        s.advance().ok();
6331
6332        // Once the server cwnd opens up, we can send more body.
6333        assert_eq!(s.pipe.server.stream_writable_next(), Some(0));
6334    }
6335
6336    #[test]
6337    /// Test handling of 0-length DATA writes with and without fin.
6338    fn zero_length_data() {
6339        let mut s = Session::new().unwrap();
6340        s.handshake().unwrap();
6341
6342        let (stream, req) = s.send_request(false).unwrap();
6343
6344        assert_eq!(
6345            s.client.send_body(&mut s.pipe.client, 0, b"", false),
6346            Err(Error::Done)
6347        );
6348        assert_eq!(s.client.send_body(&mut s.pipe.client, 0, b"", true), Ok(0));
6349
6350        s.advance().ok();
6351
6352        let mut recv_buf = vec![0; 100];
6353
6354        let ev_headers = Event::Headers {
6355            list: req,
6356            more_frames: true,
6357        };
6358
6359        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6360
6361        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6362        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Err(Error::Done));
6363
6364        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6365        assert_eq!(s.poll_server(), Err(Error::Done));
6366
6367        let resp = s.send_response(stream, false).unwrap();
6368
6369        assert_eq!(
6370            s.server.send_body(&mut s.pipe.server, 0, b"", false),
6371            Err(Error::Done)
6372        );
6373        assert_eq!(s.server.send_body(&mut s.pipe.server, 0, b"", true), Ok(0));
6374
6375        s.advance().ok();
6376
6377        let ev_headers = Event::Headers {
6378            list: resp,
6379            more_frames: true,
6380        };
6381
6382        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
6383
6384        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
6385        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Err(Error::Done));
6386
6387        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
6388        assert_eq!(s.poll_client(), Err(Error::Done));
6389    }
6390
6391    #[test]
6392    /// Tests that blocked 0-length DATA writes are reported correctly.
6393    fn zero_length_data_blocked() {
6394        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6395        config
6396            .load_cert_chain_from_pem_file("examples/cert.crt")
6397            .unwrap();
6398        config
6399            .load_priv_key_from_pem_file("examples/cert.key")
6400            .unwrap();
6401        config.set_application_protos(&[b"h3"]).unwrap();
6402        config.set_initial_max_data(69);
6403        config.set_initial_max_stream_data_bidi_local(150);
6404        config.set_initial_max_stream_data_bidi_remote(150);
6405        config.set_initial_max_stream_data_uni(150);
6406        config.set_initial_max_streams_bidi(100);
6407        config.set_initial_max_streams_uni(5);
6408        config.verify_peer(false);
6409
6410        let h3_config = Config::new().unwrap();
6411
6412        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6413
6414        s.handshake().unwrap();
6415
6416        let req = vec![
6417            Header::new(b":method", b"GET"),
6418            Header::new(b":scheme", b"https"),
6419            Header::new(b":authority", b"quic.tech"),
6420            Header::new(b":path", b"/test"),
6421        ];
6422
6423        assert_eq!(
6424            s.client.send_request(&mut s.pipe.client, &req, false),
6425            Ok(0)
6426        );
6427
6428        assert_eq!(
6429            s.client.send_body(&mut s.pipe.client, 0, b"", true),
6430            Err(Error::Done)
6431        );
6432
6433        // Clear the writable stream queue.
6434        assert_eq!(s.pipe.client.stream_writable_next(), Some(2));
6435        assert_eq!(s.pipe.client.stream_writable_next(), Some(6));
6436        assert_eq!(s.pipe.client.stream_writable_next(), Some(10));
6437        assert_eq!(s.pipe.client.stream_writable_next(), None);
6438
6439        s.advance().ok();
6440
6441        // Once the server gives flow control credits back, we can send the body.
6442        assert_eq!(s.pipe.client.stream_writable_next(), Some(0));
6443        assert_eq!(s.client.send_body(&mut s.pipe.client, 0, b"", true), Ok(0));
6444    }
6445
6446    #[test]
6447    /// Tests that receiving an empty SETTINGS frame is handled and reported.
6448    fn empty_settings() {
6449        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6450        config
6451            .load_cert_chain_from_pem_file("examples/cert.crt")
6452            .unwrap();
6453        config
6454            .load_priv_key_from_pem_file("examples/cert.key")
6455            .unwrap();
6456        config.set_application_protos(&[b"h3"]).unwrap();
6457        config.set_initial_max_data(1500);
6458        config.set_initial_max_stream_data_bidi_local(150);
6459        config.set_initial_max_stream_data_bidi_remote(150);
6460        config.set_initial_max_stream_data_uni(150);
6461        config.set_initial_max_streams_bidi(5);
6462        config.set_initial_max_streams_uni(5);
6463        config.verify_peer(false);
6464        config.set_ack_delay_exponent(8);
6465        config.grease(false);
6466
6467        let h3_config = Config::new().unwrap();
6468        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6469
6470        s.handshake().unwrap();
6471
6472        assert!(s.client.peer_settings_raw().is_some());
6473        assert!(s.server.peer_settings_raw().is_some());
6474    }
6475
6476    #[test]
6477    /// Tests that receiving a H3_DATAGRAM setting is ok.
6478    fn dgram_setting() {
6479        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6480        config
6481            .load_cert_chain_from_pem_file("examples/cert.crt")
6482            .unwrap();
6483        config
6484            .load_priv_key_from_pem_file("examples/cert.key")
6485            .unwrap();
6486        config.set_application_protos(&[b"h3"]).unwrap();
6487        config.set_initial_max_data(70);
6488        config.set_initial_max_stream_data_bidi_local(150);
6489        config.set_initial_max_stream_data_bidi_remote(150);
6490        config.set_initial_max_stream_data_uni(150);
6491        config.set_initial_max_streams_bidi(100);
6492        config.set_initial_max_streams_uni(5);
6493        config.enable_dgram(true, 1000, 1000);
6494        config.verify_peer(false);
6495
6496        let h3_config = Config::new().unwrap();
6497
6498        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6499        assert_eq!(s.pipe.handshake(), Ok(()));
6500
6501        s.client.send_settings(&mut s.pipe.client).unwrap();
6502        assert_eq!(s.pipe.advance(), Ok(()));
6503
6504        // Before processing SETTINGS (via poll), HTTP/3 DATAGRAMS are not
6505        // enabled.
6506        assert!(!s.server.dgram_enabled_by_peer(&s.pipe.server));
6507
6508        // When everything is ok, poll returns Done and DATAGRAM is enabled.
6509        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::Done));
6510        assert!(s.server.dgram_enabled_by_peer(&s.pipe.server));
6511
6512        // Now detect things on the client
6513        s.server.send_settings(&mut s.pipe.server).unwrap();
6514        assert_eq!(s.pipe.advance(), Ok(()));
6515        assert!(!s.client.dgram_enabled_by_peer(&s.pipe.client));
6516        assert_eq!(s.client.poll(&mut s.pipe.client), Err(Error::Done));
6517        assert!(s.client.dgram_enabled_by_peer(&s.pipe.client));
6518    }
6519
6520    #[test]
6521    /// Tests that receiving a H3_DATAGRAM setting when no TP is set generates
6522    /// an error.
6523    fn dgram_setting_no_tp() {
6524        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6525        config
6526            .load_cert_chain_from_pem_file("examples/cert.crt")
6527            .unwrap();
6528        config
6529            .load_priv_key_from_pem_file("examples/cert.key")
6530            .unwrap();
6531        config.set_application_protos(&[b"h3"]).unwrap();
6532        config.set_initial_max_data(70);
6533        config.set_initial_max_stream_data_bidi_local(150);
6534        config.set_initial_max_stream_data_bidi_remote(150);
6535        config.set_initial_max_stream_data_uni(150);
6536        config.set_initial_max_streams_bidi(100);
6537        config.set_initial_max_streams_uni(5);
6538        config.verify_peer(false);
6539
6540        let h3_config = Config::new().unwrap();
6541
6542        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6543        assert_eq!(s.pipe.handshake(), Ok(()));
6544
6545        s.client.control_stream_id = Some(
6546            s.client
6547                .open_uni_stream(
6548                    &mut s.pipe.client,
6549                    stream::HTTP3_CONTROL_STREAM_TYPE_ID,
6550                )
6551                .unwrap(),
6552        );
6553
6554        let settings = frame::Frame::Settings {
6555            max_field_section_size: None,
6556            qpack_max_table_capacity: None,
6557            qpack_blocked_streams: None,
6558            connect_protocol_enabled: None,
6559            h3_datagram: Some(1),
6560            grease: None,
6561            additional_settings: Default::default(),
6562            raw: Default::default(),
6563        };
6564
6565        s.send_frame_client(settings, s.client.control_stream_id.unwrap(), false)
6566            .unwrap();
6567
6568        assert_eq!(s.pipe.advance(), Ok(()));
6569
6570        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::SettingsError));
6571    }
6572
6573    #[test]
6574    /// Tests that receiving SETTINGS with prohibited values generates an error.
6575    fn settings_h2_prohibited() {
6576        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6577        config
6578            .load_cert_chain_from_pem_file("examples/cert.crt")
6579            .unwrap();
6580        config
6581            .load_priv_key_from_pem_file("examples/cert.key")
6582            .unwrap();
6583        config.set_application_protos(&[b"h3"]).unwrap();
6584        config.set_initial_max_data(70);
6585        config.set_initial_max_stream_data_bidi_local(150);
6586        config.set_initial_max_stream_data_bidi_remote(150);
6587        config.set_initial_max_stream_data_uni(150);
6588        config.set_initial_max_streams_bidi(100);
6589        config.set_initial_max_streams_uni(5);
6590        config.verify_peer(false);
6591
6592        let h3_config = Config::new().unwrap();
6593
6594        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6595        assert_eq!(s.pipe.handshake(), Ok(()));
6596
6597        s.client.control_stream_id = Some(
6598            s.client
6599                .open_uni_stream(
6600                    &mut s.pipe.client,
6601                    stream::HTTP3_CONTROL_STREAM_TYPE_ID,
6602                )
6603                .unwrap(),
6604        );
6605
6606        s.server.control_stream_id = Some(
6607            s.server
6608                .open_uni_stream(
6609                    &mut s.pipe.server,
6610                    stream::HTTP3_CONTROL_STREAM_TYPE_ID,
6611                )
6612                .unwrap(),
6613        );
6614
6615        let frame_payload_len = 2u64;
6616        let settings = [
6617            frame::SETTINGS_FRAME_TYPE_ID as u8,
6618            frame_payload_len as u8,
6619            0x2, // 0x2 is a reserved setting type
6620            1,
6621        ];
6622
6623        s.send_arbitrary_stream_data_client(
6624            &settings,
6625            s.client.control_stream_id.unwrap(),
6626            false,
6627        )
6628        .unwrap();
6629
6630        s.send_arbitrary_stream_data_server(
6631            &settings,
6632            s.server.control_stream_id.unwrap(),
6633            false,
6634        )
6635        .unwrap();
6636
6637        assert_eq!(s.pipe.advance(), Ok(()));
6638
6639        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::SettingsError));
6640
6641        assert_eq!(s.client.poll(&mut s.pipe.client), Err(Error::SettingsError));
6642    }
6643
6644    #[test]
6645    /// Tests that setting SETTINGS with prohibited values generates an error.
6646    fn set_prohibited_additional_settings() {
6647        let mut h3_config = Config::new().unwrap();
6648        assert_eq!(
6649            h3_config.set_additional_settings(vec![(
6650                frame::SETTINGS_QPACK_MAX_TABLE_CAPACITY,
6651                43
6652            )]),
6653            Err(Error::SettingsError)
6654        );
6655        assert_eq!(
6656            h3_config.set_additional_settings(vec![(
6657                frame::SETTINGS_MAX_FIELD_SECTION_SIZE,
6658                43
6659            )]),
6660            Err(Error::SettingsError)
6661        );
6662        assert_eq!(
6663            h3_config.set_additional_settings(vec![(
6664                frame::SETTINGS_QPACK_BLOCKED_STREAMS,
6665                43
6666            )]),
6667            Err(Error::SettingsError)
6668        );
6669        assert_eq!(
6670            h3_config.set_additional_settings(vec![(
6671                frame::SETTINGS_ENABLE_CONNECT_PROTOCOL,
6672                43
6673            )]),
6674            Err(Error::SettingsError)
6675        );
6676        assert_eq!(
6677            h3_config
6678                .set_additional_settings(vec![(frame::SETTINGS_H3_DATAGRAM, 43)]),
6679            Err(Error::SettingsError)
6680        );
6681    }
6682
6683    #[test]
6684    /// Tests additional settings are actually exchanged by the peers.
6685    fn set_additional_settings() {
6686        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6687        config
6688            .load_cert_chain_from_pem_file("examples/cert.crt")
6689            .unwrap();
6690        config
6691            .load_priv_key_from_pem_file("examples/cert.key")
6692            .unwrap();
6693        config.set_application_protos(&[b"h3"]).unwrap();
6694        config.set_initial_max_data(70);
6695        config.set_initial_max_stream_data_bidi_local(150);
6696        config.set_initial_max_stream_data_bidi_remote(150);
6697        config.set_initial_max_stream_data_uni(150);
6698        config.set_initial_max_streams_bidi(100);
6699        config.set_initial_max_streams_uni(5);
6700        config.verify_peer(false);
6701        config.grease(false);
6702
6703        let mut h3_config = Config::new().unwrap();
6704        h3_config
6705            .set_additional_settings(vec![(42, 43), (44, 45)])
6706            .unwrap();
6707
6708        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6709        assert_eq!(s.pipe.handshake(), Ok(()));
6710
6711        assert_eq!(s.pipe.advance(), Ok(()));
6712
6713        s.client.send_settings(&mut s.pipe.client).unwrap();
6714        assert_eq!(s.pipe.advance(), Ok(()));
6715        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::Done));
6716
6717        s.server.send_settings(&mut s.pipe.server).unwrap();
6718        assert_eq!(s.pipe.advance(), Ok(()));
6719        assert_eq!(s.client.poll(&mut s.pipe.client), Err(Error::Done));
6720
6721        assert_eq!(
6722            s.server.peer_settings_raw(),
6723            Some(&[(42, 43), (44, 45)][..])
6724        );
6725        assert_eq!(
6726            s.client.peer_settings_raw(),
6727            Some(&[(42, 43), (44, 45)][..])
6728        );
6729    }
6730
6731    #[test]
6732    /// Send a single DATAGRAM.
6733    fn single_dgram() {
6734        let mut buf = [0; 65535];
6735        let mut s = Session::new().unwrap();
6736        s.handshake().unwrap();
6737
6738        // We'll send default data of 10 bytes on flow ID 0.
6739        let result = (11, 0, 1);
6740
6741        s.send_dgram_client(0).unwrap();
6742
6743        assert_eq!(s.poll_server(), Err(Error::Done));
6744        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6745
6746        s.send_dgram_server(0).unwrap();
6747        assert_eq!(s.poll_client(), Err(Error::Done));
6748        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6749    }
6750
6751    #[test]
6752    /// Send multiple DATAGRAMs.
6753    fn multiple_dgram() {
6754        let mut buf = [0; 65535];
6755        let mut s = Session::new().unwrap();
6756        s.handshake().unwrap();
6757
6758        // We'll send default data of 10 bytes on flow ID 0.
6759        let result = (11, 0, 1);
6760
6761        s.send_dgram_client(0).unwrap();
6762        s.send_dgram_client(0).unwrap();
6763        s.send_dgram_client(0).unwrap();
6764
6765        assert_eq!(s.poll_server(), Err(Error::Done));
6766        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6767        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6768        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6769        assert_eq!(s.recv_dgram_server(&mut buf), Err(Error::Done));
6770
6771        s.send_dgram_server(0).unwrap();
6772        s.send_dgram_server(0).unwrap();
6773        s.send_dgram_server(0).unwrap();
6774
6775        assert_eq!(s.poll_client(), Err(Error::Done));
6776        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6777        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6778        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6779        assert_eq!(s.recv_dgram_client(&mut buf), Err(Error::Done));
6780    }
6781
6782    #[test]
6783    /// Send more DATAGRAMs than the send queue allows.
6784    fn multiple_dgram_overflow() {
6785        let mut buf = [0; 65535];
6786        let mut s = Session::new().unwrap();
6787        s.handshake().unwrap();
6788
6789        // We'll send default data of 10 bytes on flow ID 0.
6790        let result = (11, 0, 1);
6791
6792        // Five DATAGRAMs
6793        s.send_dgram_client(0).unwrap();
6794        s.send_dgram_client(0).unwrap();
6795        s.send_dgram_client(0).unwrap();
6796        s.send_dgram_client(0).unwrap();
6797        s.send_dgram_client(0).unwrap();
6798
6799        // Only 3 independent DATAGRAMs to read events will fire.
6800        assert_eq!(s.poll_server(), Err(Error::Done));
6801        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6802        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6803        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6804        assert_eq!(s.recv_dgram_server(&mut buf), Err(Error::Done));
6805    }
6806
6807    #[test]
6808    /// Send a single DATAGRAM and request.
6809    fn poll_datagram_cycling_no_read() {
6810        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6811        config
6812            .load_cert_chain_from_pem_file("examples/cert.crt")
6813            .unwrap();
6814        config
6815            .load_priv_key_from_pem_file("examples/cert.key")
6816            .unwrap();
6817        config.set_application_protos(&[b"h3"]).unwrap();
6818        config.set_initial_max_data(1500);
6819        config.set_initial_max_stream_data_bidi_local(150);
6820        config.set_initial_max_stream_data_bidi_remote(150);
6821        config.set_initial_max_stream_data_uni(150);
6822        config.set_initial_max_streams_bidi(100);
6823        config.set_initial_max_streams_uni(5);
6824        config.verify_peer(false);
6825        config.enable_dgram(true, 100, 100);
6826
6827        let h3_config = Config::new().unwrap();
6828        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6829        s.handshake().unwrap();
6830
6831        // Send request followed by DATAGRAM on client side.
6832        let (stream, req) = s.send_request(false).unwrap();
6833
6834        s.send_body_client(stream, true).unwrap();
6835
6836        let ev_headers = Event::Headers {
6837            list: req,
6838            more_frames: true,
6839        };
6840
6841        s.send_dgram_client(0).unwrap();
6842
6843        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6844        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6845
6846        assert_eq!(s.poll_server(), Err(Error::Done));
6847    }
6848
6849    #[test]
6850    /// Send a single DATAGRAM and request.
6851    fn poll_datagram_single_read() {
6852        let mut buf = [0; 65535];
6853
6854        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6855        config
6856            .load_cert_chain_from_pem_file("examples/cert.crt")
6857            .unwrap();
6858        config
6859            .load_priv_key_from_pem_file("examples/cert.key")
6860            .unwrap();
6861        config.set_application_protos(&[b"h3"]).unwrap();
6862        config.set_initial_max_data(1500);
6863        config.set_initial_max_stream_data_bidi_local(150);
6864        config.set_initial_max_stream_data_bidi_remote(150);
6865        config.set_initial_max_stream_data_uni(150);
6866        config.set_initial_max_streams_bidi(100);
6867        config.set_initial_max_streams_uni(5);
6868        config.verify_peer(false);
6869        config.enable_dgram(true, 100, 100);
6870
6871        let h3_config = Config::new().unwrap();
6872        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6873        s.handshake().unwrap();
6874
6875        // We'll send default data of 10 bytes on flow ID 0.
6876        let result = (11, 0, 1);
6877
6878        // Send request followed by DATAGRAM on client side.
6879        let (stream, req) = s.send_request(false).unwrap();
6880
6881        let body = s.send_body_client(stream, true).unwrap();
6882
6883        let mut recv_buf = vec![0; body.len()];
6884
6885        let ev_headers = Event::Headers {
6886            list: req,
6887            more_frames: true,
6888        };
6889
6890        s.send_dgram_client(0).unwrap();
6891
6892        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6893        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6894
6895        assert_eq!(s.poll_server(), Err(Error::Done));
6896
6897        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6898
6899        assert_eq!(s.poll_server(), Err(Error::Done));
6900
6901        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
6902        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6903        assert_eq!(s.poll_server(), Err(Error::Done));
6904
6905        // Send response followed by DATAGRAM on server side
6906        let resp = s.send_response(stream, false).unwrap();
6907
6908        let body = s.send_body_server(stream, true).unwrap();
6909
6910        let mut recv_buf = vec![0; body.len()];
6911
6912        let ev_headers = Event::Headers {
6913            list: resp,
6914            more_frames: true,
6915        };
6916
6917        s.send_dgram_server(0).unwrap();
6918
6919        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
6920        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
6921
6922        assert_eq!(s.poll_client(), Err(Error::Done));
6923
6924        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6925
6926        assert_eq!(s.poll_client(), Err(Error::Done));
6927
6928        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
6929
6930        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
6931        assert_eq!(s.poll_client(), Err(Error::Done));
6932    }
6933
6934    #[test]
6935    /// Send multiple DATAGRAMs and requests.
6936    fn poll_datagram_multi_read() {
6937        let mut buf = [0; 65535];
6938
6939        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6940        config
6941            .load_cert_chain_from_pem_file("examples/cert.crt")
6942            .unwrap();
6943        config
6944            .load_priv_key_from_pem_file("examples/cert.key")
6945            .unwrap();
6946        config.set_application_protos(&[b"h3"]).unwrap();
6947        config.set_initial_max_data(1500);
6948        config.set_initial_max_stream_data_bidi_local(150);
6949        config.set_initial_max_stream_data_bidi_remote(150);
6950        config.set_initial_max_stream_data_uni(150);
6951        config.set_initial_max_streams_bidi(100);
6952        config.set_initial_max_streams_uni(5);
6953        config.verify_peer(false);
6954        config.enable_dgram(true, 100, 100);
6955
6956        let h3_config = Config::new().unwrap();
6957        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6958        s.handshake().unwrap();
6959
6960        // 10 bytes on flow ID 0 and 2.
6961        let flow_0_result = (11, 0, 1);
6962        let flow_2_result = (11, 2, 1);
6963
6964        // Send requests followed by DATAGRAMs on client side.
6965        let (stream, req) = s.send_request(false).unwrap();
6966
6967        let body = s.send_body_client(stream, true).unwrap();
6968
6969        let mut recv_buf = vec![0; body.len()];
6970
6971        let ev_headers = Event::Headers {
6972            list: req,
6973            more_frames: true,
6974        };
6975
6976        s.send_dgram_client(0).unwrap();
6977        s.send_dgram_client(0).unwrap();
6978        s.send_dgram_client(0).unwrap();
6979        s.send_dgram_client(0).unwrap();
6980        s.send_dgram_client(0).unwrap();
6981        s.send_dgram_client(2).unwrap();
6982        s.send_dgram_client(2).unwrap();
6983        s.send_dgram_client(2).unwrap();
6984        s.send_dgram_client(2).unwrap();
6985        s.send_dgram_client(2).unwrap();
6986
6987        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6988        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6989
6990        assert_eq!(s.poll_server(), Err(Error::Done));
6991
6992        // Second cycle, start to read
6993        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6994        assert_eq!(s.poll_server(), Err(Error::Done));
6995        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6996        assert_eq!(s.poll_server(), Err(Error::Done));
6997        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6998        assert_eq!(s.poll_server(), Err(Error::Done));
6999
7000        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
7001        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
7002
7003        assert_eq!(s.poll_server(), Err(Error::Done));
7004
7005        // Third cycle.
7006        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
7007        assert_eq!(s.poll_server(), Err(Error::Done));
7008        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
7009        assert_eq!(s.poll_server(), Err(Error::Done));
7010        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
7011        assert_eq!(s.poll_server(), Err(Error::Done));
7012        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
7013        assert_eq!(s.poll_server(), Err(Error::Done));
7014        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
7015        assert_eq!(s.poll_server(), Err(Error::Done));
7016        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
7017        assert_eq!(s.poll_server(), Err(Error::Done));
7018        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
7019        assert_eq!(s.poll_server(), Err(Error::Done));
7020
7021        // Send response followed by DATAGRAM on server side
7022        let resp = s.send_response(stream, false).unwrap();
7023
7024        let body = s.send_body_server(stream, true).unwrap();
7025
7026        let mut recv_buf = vec![0; body.len()];
7027
7028        let ev_headers = Event::Headers {
7029            list: resp,
7030            more_frames: true,
7031        };
7032
7033        s.send_dgram_server(0).unwrap();
7034        s.send_dgram_server(0).unwrap();
7035        s.send_dgram_server(0).unwrap();
7036        s.send_dgram_server(0).unwrap();
7037        s.send_dgram_server(0).unwrap();
7038        s.send_dgram_server(2).unwrap();
7039        s.send_dgram_server(2).unwrap();
7040        s.send_dgram_server(2).unwrap();
7041        s.send_dgram_server(2).unwrap();
7042        s.send_dgram_server(2).unwrap();
7043
7044        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
7045        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
7046
7047        assert_eq!(s.poll_client(), Err(Error::Done));
7048
7049        // Second cycle, start to read
7050        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
7051        assert_eq!(s.poll_client(), Err(Error::Done));
7052        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
7053        assert_eq!(s.poll_client(), Err(Error::Done));
7054        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
7055        assert_eq!(s.poll_client(), Err(Error::Done));
7056
7057        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
7058        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
7059
7060        assert_eq!(s.poll_client(), Err(Error::Done));
7061
7062        // Third cycle.
7063        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
7064        assert_eq!(s.poll_client(), Err(Error::Done));
7065        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
7066        assert_eq!(s.poll_client(), Err(Error::Done));
7067        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
7068        assert_eq!(s.poll_client(), Err(Error::Done));
7069        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
7070        assert_eq!(s.poll_client(), Err(Error::Done));
7071        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
7072        assert_eq!(s.poll_client(), Err(Error::Done));
7073        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
7074        assert_eq!(s.poll_client(), Err(Error::Done));
7075        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
7076        assert_eq!(s.poll_client(), Err(Error::Done));
7077    }
7078
7079    #[test]
7080    /// Tests that the Finished event is not issued for streams of unknown type
7081    /// (e.g. GREASE).
7082    fn finished_is_for_requests() {
7083        let mut s = Session::new().unwrap();
7084        s.handshake().unwrap();
7085
7086        assert_eq!(s.poll_client(), Err(Error::Done));
7087        assert_eq!(s.poll_server(), Err(Error::Done));
7088
7089        assert_eq!(s.client.open_grease_stream(&mut s.pipe.client), Ok(()));
7090        assert_eq!(s.pipe.advance(), Ok(()));
7091
7092        assert_eq!(s.poll_client(), Err(Error::Done));
7093        assert_eq!(s.poll_server(), Err(Error::Done));
7094    }
7095
7096    #[test]
7097    /// Tests that streams are marked as finished only once.
7098    fn finished_once() {
7099        let mut s = Session::new().unwrap();
7100        s.handshake().unwrap();
7101
7102        let (stream, req) = s.send_request(false).unwrap();
7103        let body = s.send_body_client(stream, true).unwrap();
7104
7105        let mut recv_buf = vec![0; body.len()];
7106
7107        let ev_headers = Event::Headers {
7108            list: req,
7109            more_frames: true,
7110        };
7111
7112        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7113        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
7114
7115        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
7116        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
7117
7118        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Err(Error::Done));
7119        assert_eq!(s.poll_server(), Err(Error::Done));
7120    }
7121
7122    #[test]
7123    /// Tests that the Data event is properly re-armed.
7124    fn data_event_rearm() {
7125        let bytes = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
7126
7127        let mut s = Session::new().unwrap();
7128        s.handshake().unwrap();
7129
7130        let (r1_id, r1_hdrs) = s.send_request(false).unwrap();
7131
7132        let mut recv_buf = vec![0; bytes.len()];
7133
7134        let r1_ev_headers = Event::Headers {
7135            list: r1_hdrs,
7136            more_frames: true,
7137        };
7138
7139        // Manually send an incomplete DATA frame (i.e. the frame size is longer
7140        // than the actual data sent).
7141        {
7142            let mut d = [42; 10];
7143            let mut b = octets::OctetsMut::with_slice(&mut d);
7144
7145            b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
7146            b.put_varint(bytes.len() as u64).unwrap();
7147            let off = b.off();
7148            s.pipe.client.stream_send(r1_id, &d[..off], false).unwrap();
7149
7150            assert_eq!(
7151                s.pipe.client.stream_send(r1_id, &bytes[..5], false),
7152                Ok(5)
7153            );
7154
7155            s.advance().ok();
7156        }
7157
7158        assert_eq!(s.poll_server(), Ok((r1_id, r1_ev_headers)));
7159        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
7160        assert_eq!(s.poll_server(), Err(Error::Done));
7161
7162        // Read the available body data.
7163        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(5));
7164
7165        // Send the remaining DATA payload.
7166        assert_eq!(s.pipe.client.stream_send(r1_id, &bytes[5..], false), Ok(5));
7167        s.advance().ok();
7168
7169        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
7170        assert_eq!(s.poll_server(), Err(Error::Done));
7171
7172        // Read the rest of the body data.
7173        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(5));
7174        assert_eq!(s.poll_server(), Err(Error::Done));
7175
7176        // Send more data.
7177        let r1_body = s.send_body_client(r1_id, false).unwrap();
7178
7179        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
7180        assert_eq!(s.poll_server(), Err(Error::Done));
7181
7182        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(r1_body.len()));
7183
7184        // Send a new request to ensure cross-stream events don't break rearming.
7185        let (r2_id, r2_hdrs) = s.send_request(false).unwrap();
7186        let r2_ev_headers = Event::Headers {
7187            list: r2_hdrs,
7188            more_frames: true,
7189        };
7190        let r2_body = s.send_body_client(r2_id, false).unwrap();
7191
7192        s.advance().ok();
7193
7194        assert_eq!(s.poll_server(), Ok((r2_id, r2_ev_headers)));
7195        assert_eq!(s.poll_server(), Ok((r2_id, Event::Data)));
7196        assert_eq!(s.recv_body_server(r2_id, &mut recv_buf), Ok(r2_body.len()));
7197        assert_eq!(s.poll_server(), Err(Error::Done));
7198
7199        // Send more data on request 1, then trailing HEADERS.
7200        let r1_body = s.send_body_client(r1_id, false).unwrap();
7201
7202        let trailers = vec![Header::new(b"hello", b"world")];
7203
7204        s.client
7205            .send_headers(&mut s.pipe.client, r1_id, &trailers, true)
7206            .unwrap();
7207
7208        let r1_ev_trailers = Event::Headers {
7209            list: trailers.clone(),
7210            more_frames: false,
7211        };
7212
7213        s.advance().ok();
7214
7215        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
7216        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(r1_body.len()));
7217
7218        assert_eq!(s.poll_server(), Ok((r1_id, r1_ev_trailers)));
7219        assert_eq!(s.poll_server(), Ok((r1_id, Event::Finished)));
7220        assert_eq!(s.poll_server(), Err(Error::Done));
7221
7222        // Send more data on request 2, then trailing HEADERS.
7223        let r2_body = s.send_body_client(r2_id, false).unwrap();
7224
7225        s.client
7226            .send_headers(&mut s.pipe.client, r2_id, &trailers, false)
7227            .unwrap();
7228
7229        let r2_ev_trailers = Event::Headers {
7230            list: trailers,
7231            more_frames: true,
7232        };
7233
7234        s.advance().ok();
7235
7236        assert_eq!(s.poll_server(), Ok((r2_id, Event::Data)));
7237        assert_eq!(s.recv_body_server(r2_id, &mut recv_buf), Ok(r2_body.len()));
7238        assert_eq!(s.poll_server(), Ok((r2_id, r2_ev_trailers)));
7239        assert_eq!(s.poll_server(), Err(Error::Done));
7240
7241        let (r3_id, r3_hdrs) = s.send_request(false).unwrap();
7242
7243        let r3_ev_headers = Event::Headers {
7244            list: r3_hdrs,
7245            more_frames: true,
7246        };
7247
7248        // Manually send an incomplete DATA frame (i.e. only the header is sent).
7249        {
7250            let mut d = [42; 10];
7251            let mut b = octets::OctetsMut::with_slice(&mut d);
7252
7253            b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
7254            b.put_varint(bytes.len() as u64).unwrap();
7255            let off = b.off();
7256            s.pipe.client.stream_send(r3_id, &d[..off], false).unwrap();
7257
7258            s.advance().ok();
7259        }
7260
7261        assert_eq!(s.poll_server(), Ok((r3_id, r3_ev_headers)));
7262        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
7263        assert_eq!(s.poll_server(), Err(Error::Done));
7264
7265        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Err(Error::Done));
7266
7267        assert_eq!(s.pipe.client.stream_send(r3_id, &bytes[..5], false), Ok(5));
7268
7269        s.advance().ok();
7270
7271        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
7272        assert_eq!(s.poll_server(), Err(Error::Done));
7273
7274        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Ok(5));
7275
7276        assert_eq!(s.pipe.client.stream_send(r3_id, &bytes[5..], false), Ok(5));
7277        s.advance().ok();
7278
7279        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
7280        assert_eq!(s.poll_server(), Err(Error::Done));
7281
7282        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Ok(5));
7283
7284        // Buffer multiple data frames.
7285        let body = s.send_body_client(r3_id, false).unwrap();
7286        s.send_body_client(r3_id, false).unwrap();
7287        s.send_body_client(r3_id, false).unwrap();
7288
7289        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
7290        assert_eq!(s.poll_server(), Err(Error::Done));
7291
7292        {
7293            let mut d = [42; 10];
7294            let mut b = octets::OctetsMut::with_slice(&mut d);
7295
7296            b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
7297            b.put_varint(0).unwrap();
7298            let off = b.off();
7299            s.pipe.client.stream_send(r3_id, &d[..off], true).unwrap();
7300
7301            s.advance().ok();
7302        }
7303
7304        let mut recv_buf = vec![0; bytes.len() * 3];
7305
7306        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Ok(body.len() * 3));
7307    }
7308
7309    #[test]
7310    /// Tests that the Datagram event is properly re-armed.
7311    fn dgram_event_rearm() {
7312        let mut buf = [0; 65535];
7313
7314        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
7315        config
7316            .load_cert_chain_from_pem_file("examples/cert.crt")
7317            .unwrap();
7318        config
7319            .load_priv_key_from_pem_file("examples/cert.key")
7320            .unwrap();
7321        config.set_application_protos(&[b"h3"]).unwrap();
7322        config.set_initial_max_data(1500);
7323        config.set_initial_max_stream_data_bidi_local(150);
7324        config.set_initial_max_stream_data_bidi_remote(150);
7325        config.set_initial_max_stream_data_uni(150);
7326        config.set_initial_max_streams_bidi(100);
7327        config.set_initial_max_streams_uni(5);
7328        config.verify_peer(false);
7329        config.enable_dgram(true, 100, 100);
7330
7331        let h3_config = Config::new().unwrap();
7332        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
7333        s.handshake().unwrap();
7334
7335        // 10 bytes on flow ID 0 and 2.
7336        let flow_0_result = (11, 0, 1);
7337        let flow_2_result = (11, 2, 1);
7338
7339        // Send requests followed by DATAGRAMs on client side.
7340        let (stream, req) = s.send_request(false).unwrap();
7341
7342        let body = s.send_body_client(stream, true).unwrap();
7343
7344        let mut recv_buf = vec![0; body.len()];
7345
7346        let ev_headers = Event::Headers {
7347            list: req,
7348            more_frames: true,
7349        };
7350
7351        s.send_dgram_client(0).unwrap();
7352        s.send_dgram_client(0).unwrap();
7353        s.send_dgram_client(2).unwrap();
7354        s.send_dgram_client(2).unwrap();
7355
7356        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7357        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
7358
7359        assert_eq!(s.poll_server(), Err(Error::Done));
7360        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
7361
7362        assert_eq!(s.poll_server(), Err(Error::Done));
7363        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
7364
7365        assert_eq!(s.poll_server(), Err(Error::Done));
7366        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
7367
7368        assert_eq!(s.poll_server(), Err(Error::Done));
7369        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
7370
7371        assert_eq!(s.poll_server(), Err(Error::Done));
7372
7373        s.send_dgram_client(0).unwrap();
7374        s.send_dgram_client(2).unwrap();
7375
7376        assert_eq!(s.poll_server(), Err(Error::Done));
7377
7378        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
7379        assert_eq!(s.poll_server(), Err(Error::Done));
7380
7381        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
7382        assert_eq!(s.poll_server(), Err(Error::Done));
7383
7384        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
7385        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
7386
7387        // Verify that dgram counts are incremented.
7388        assert_eq!(s.pipe.client.dgram_sent_count, 6);
7389        assert_eq!(s.pipe.client.dgram_recv_count, 0);
7390        assert_eq!(s.pipe.server.dgram_sent_count, 0);
7391        assert_eq!(s.pipe.server.dgram_recv_count, 6);
7392
7393        let server_path = s.pipe.server.paths.get_active().expect("no active");
7394        let client_path = s.pipe.client.paths.get_active().expect("no active");
7395        assert_eq!(client_path.dgram_sent_count, 6);
7396        assert_eq!(client_path.dgram_recv_count, 0);
7397        assert_eq!(server_path.dgram_sent_count, 0);
7398        assert_eq!(server_path.dgram_recv_count, 6);
7399    }
7400
7401    #[test]
7402    fn reset_stream() {
7403        let mut buf = [0; 65535];
7404
7405        let mut s = Session::new().unwrap();
7406        s.handshake().unwrap();
7407
7408        // Client sends request.
7409        let (stream, req) = s.send_request(false).unwrap();
7410
7411        let ev_headers = Event::Headers {
7412            list: req,
7413            more_frames: true,
7414        };
7415
7416        // Server sends response and closes stream.
7417        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7418        assert_eq!(s.poll_server(), Err(Error::Done));
7419
7420        let resp = s.send_response(stream, true).unwrap();
7421
7422        let ev_headers = Event::Headers {
7423            list: resp,
7424            more_frames: false,
7425        };
7426
7427        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
7428        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
7429        assert_eq!(s.poll_client(), Err(Error::Done));
7430
7431        // Client sends RESET_STREAM, closing stream.
7432        let frames = [crate::frame::Frame::ResetStream {
7433            stream_id: stream,
7434            error_code: 42,
7435            final_size: 68,
7436        }];
7437
7438        let pkt_type = crate::packet::Type::Short;
7439        assert_eq!(
7440            s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
7441            Ok(39)
7442        );
7443
7444        // Server issues Reset event for the stream.
7445        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(42))));
7446        assert_eq!(s.poll_server(), Err(Error::Done));
7447
7448        // Sending RESET_STREAM again shouldn't trigger another Reset event.
7449        assert_eq!(
7450            s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
7451            Ok(39)
7452        );
7453
7454        assert_eq!(s.poll_server(), Err(Error::Done));
7455    }
7456
7457    /// The client shuts down the stream's write direction, the server
7458    /// shuts down its side with fin
7459    #[test]
7460    fn client_shutdown_write_server_fin() {
7461        let mut buf = [0; 65535];
7462        let mut s = Session::new().unwrap();
7463        s.handshake().unwrap();
7464
7465        // Client sends request.
7466        let (stream, req) = s.send_request(false).unwrap();
7467
7468        let ev_headers = Event::Headers {
7469            list: req,
7470            more_frames: true,
7471        };
7472
7473        // Server sends response and closes stream.
7474        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7475        assert_eq!(s.poll_server(), Err(Error::Done));
7476
7477        let resp = s.send_response(stream, true).unwrap();
7478
7479        let ev_headers = Event::Headers {
7480            list: resp,
7481            more_frames: false,
7482        };
7483
7484        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
7485        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
7486        assert_eq!(s.poll_client(), Err(Error::Done));
7487
7488        // Client shuts down stream ==> sends RESET_STREAM
7489        assert_eq!(
7490            s.pipe
7491                .client
7492                .stream_shutdown(stream, crate::Shutdown::Write, 42),
7493            Ok(())
7494        );
7495        assert_eq!(s.advance(), Ok(()));
7496
7497        // Server sees the Reset event for the stream.
7498        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(42))));
7499        assert_eq!(s.poll_server(), Err(Error::Done));
7500
7501        // Streams have been collected by quiche
7502        assert!(s.pipe.server.streams.is_collected(stream));
7503        assert!(s.pipe.client.streams.is_collected(stream));
7504
7505        // Client sends another request, server sends response without fin
7506        //
7507        let (stream, req) = s.send_request(false).unwrap();
7508
7509        let ev_headers = Event::Headers {
7510            list: req,
7511            more_frames: true,
7512        };
7513
7514        // Check that server has received the request.
7515        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7516        assert_eq!(s.poll_server(), Err(Error::Done));
7517
7518        // Server sends reponse without closing the stream.
7519        let resp = s.send_response(stream, false).unwrap();
7520
7521        let ev_headers = Event::Headers {
7522            list: resp,
7523            more_frames: true,
7524        };
7525
7526        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
7527        assert_eq!(s.poll_client(), Err(Error::Done));
7528
7529        // Client shuts down stream ==> sends RESET_STREAM
7530        assert_eq!(
7531            s.pipe
7532                .client
7533                .stream_shutdown(stream, crate::Shutdown::Write, 42),
7534            Ok(())
7535        );
7536        assert_eq!(s.advance(), Ok(()));
7537
7538        // Server sees the Reset event for the stream.
7539        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(42))));
7540        assert_eq!(s.poll_server(), Err(Error::Done));
7541
7542        // Server sends body and closes the stream.
7543        s.send_body_server(stream, true).unwrap();
7544
7545        // Stream has been collected on server by quiche
7546        assert!(s.pipe.server.streams.is_collected(stream));
7547        // Client stream has not been collected, the client needs to
7548        // read the fin from the stream first.
7549        assert!(!s.pipe.client.streams.is_collected(stream));
7550        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
7551        s.recv_body_client(stream, &mut buf).unwrap();
7552        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
7553        assert_eq!(s.poll_client(), Err(Error::Done));
7554        assert!(s.pipe.client.streams.is_collected(stream));
7555    }
7556
7557    #[test]
7558    fn client_shutdown_read() {
7559        let mut buf = [0; 65535];
7560        let mut s = Session::new().unwrap();
7561        s.handshake().unwrap();
7562
7563        // Client sends request and leaves stream open.
7564        let (stream, req) = s.send_request(false).unwrap();
7565
7566        let ev_headers = Event::Headers {
7567            list: req,
7568            more_frames: true,
7569        };
7570
7571        // Server sends response and leaves stream open.
7572        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7573        assert_eq!(s.poll_server(), Err(Error::Done));
7574
7575        let resp = s.send_response(stream, false).unwrap();
7576
7577        let ev_headers = Event::Headers {
7578            list: resp,
7579            more_frames: true,
7580        };
7581
7582        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
7583        assert_eq!(s.poll_client(), Err(Error::Done));
7584        // Client shuts down read
7585        assert_eq!(
7586            s.pipe
7587                .client
7588                .stream_shutdown(stream, crate::Shutdown::Read, 42),
7589            Ok(())
7590        );
7591        assert_eq!(s.advance(), Ok(()));
7592
7593        // Stream is writable on server side, but returns StreamStopped
7594        assert_eq!(s.poll_server(), Err(Error::Done));
7595        let writables: Vec<u64> = s.pipe.server.writable().collect();
7596        assert!(writables.contains(&stream));
7597        assert_eq!(
7598            s.send_body_server(stream, false),
7599            Err(Error::TransportError(crate::Error::StreamStopped(42)))
7600        );
7601
7602        // Client needs to finish its side by sending a fin
7603        assert_eq!(
7604            s.client.send_body(&mut s.pipe.client, stream, &[], true),
7605            Ok(0)
7606        );
7607        assert_eq!(s.advance(), Ok(()));
7608        // Note, we get an Event::Data for an empty buffer today. But it
7609        // would also be fine to not get it.
7610        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
7611        assert_eq!(s.recv_body_server(stream, &mut buf), Err(Error::Done));
7612        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
7613        assert_eq!(s.poll_server(), Err(Error::Done));
7614
7615        // Since the client has already send a fin, the stream is collected
7616        // on both client and server
7617        assert!(s.pipe.client.streams.is_collected(stream));
7618        assert!(s.pipe.server.streams.is_collected(stream));
7619    }
7620
7621    #[test]
7622    fn reset_finished_at_server() {
7623        let mut s = Session::new().unwrap();
7624        s.handshake().unwrap();
7625
7626        // Client sends HEADERS and doesn't fin
7627        let (stream, _req) = s.send_request(false).unwrap();
7628
7629        // ..then Client sends RESET_STREAM
7630        assert_eq!(
7631            s.pipe.client.stream_shutdown(0, crate::Shutdown::Write, 0),
7632            Ok(())
7633        );
7634
7635        assert_eq!(s.pipe.advance(), Ok(()));
7636
7637        // Server receives just a reset
7638        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(0))));
7639        assert_eq!(s.poll_server(), Err(Error::Done));
7640
7641        // Client sends HEADERS and fin
7642        let (stream, req) = s.send_request(true).unwrap();
7643
7644        // ..then Client sends RESET_STREAM
7645        assert_eq!(
7646            s.pipe.client.stream_shutdown(4, crate::Shutdown::Write, 0),
7647            Ok(())
7648        );
7649
7650        let ev_headers = Event::Headers {
7651            list: req,
7652            more_frames: false,
7653        };
7654
7655        // Server receives headers and fin.
7656        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7657        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
7658        assert_eq!(s.poll_server(), Err(Error::Done));
7659    }
7660
7661    #[test]
7662    fn reset_finished_at_server_with_data_pending() {
7663        let mut s = Session::new().unwrap();
7664        s.handshake().unwrap();
7665
7666        // Client sends HEADERS and doesn't fin.
7667        let (stream, req) = s.send_request(false).unwrap();
7668
7669        assert!(s.send_body_client(stream, false).is_ok());
7670
7671        assert_eq!(s.pipe.advance(), Ok(()));
7672
7673        let ev_headers = Event::Headers {
7674            list: req,
7675            more_frames: true,
7676        };
7677
7678        // Server receives headers and data...
7679        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7680        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
7681
7682        // ..then Client sends RESET_STREAM.
7683        assert_eq!(
7684            s.pipe
7685                .client
7686                .stream_shutdown(stream, crate::Shutdown::Write, 0),
7687            Ok(())
7688        );
7689
7690        assert_eq!(s.pipe.advance(), Ok(()));
7691
7692        // The server does *not* attempt to read from the stream,
7693        // but polls and receives the reset and there are no more
7694        // readable streams.
7695        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(0))));
7696        assert_eq!(s.poll_server(), Err(Error::Done));
7697        assert_eq!(s.pipe.server.readable().len(), 0);
7698    }
7699
7700    #[test]
7701    fn reset_finished_at_server_with_data_pending_2() {
7702        let mut s = Session::new().unwrap();
7703        s.handshake().unwrap();
7704
7705        // Client sends HEADERS and doesn't fin.
7706        let (stream, req) = s.send_request(false).unwrap();
7707
7708        assert!(s.send_body_client(stream, false).is_ok());
7709
7710        assert_eq!(s.pipe.advance(), Ok(()));
7711
7712        let ev_headers = Event::Headers {
7713            list: req,
7714            more_frames: true,
7715        };
7716
7717        // Server receives headers and data...
7718        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7719        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
7720
7721        // ..then Client sends RESET_STREAM.
7722        assert_eq!(
7723            s.pipe
7724                .client
7725                .stream_shutdown(stream, crate::Shutdown::Write, 0),
7726            Ok(())
7727        );
7728
7729        assert_eq!(s.pipe.advance(), Ok(()));
7730
7731        // Server reads from the stream and receives the reset while
7732        // attempting to read.
7733        assert_eq!(
7734            s.recv_body_server(stream, &mut [0; 100]),
7735            Err(Error::TransportError(crate::Error::StreamReset(0)))
7736        );
7737
7738        // No more events and there are no more readable streams.
7739        assert_eq!(s.poll_server(), Err(Error::Done));
7740        assert_eq!(s.pipe.server.readable().len(), 0);
7741    }
7742
7743    #[test]
7744    fn reset_finished_at_client() {
7745        let mut buf = [0; 65535];
7746        let mut s = Session::new().unwrap();
7747        s.handshake().unwrap();
7748
7749        // Client sends HEADERS and doesn't fin
7750        let (stream, req) = s.send_request(false).unwrap();
7751
7752        let ev_headers = Event::Headers {
7753            list: req,
7754            more_frames: true,
7755        };
7756
7757        // Server receives headers.
7758        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7759        assert_eq!(s.poll_server(), Err(Error::Done));
7760
7761        // Server sends response and doesn't fin
7762        s.send_response(stream, false).unwrap();
7763
7764        assert_eq!(s.pipe.advance(), Ok(()));
7765
7766        // .. then Server sends RESET_STREAM
7767        assert_eq!(
7768            s.pipe
7769                .server
7770                .stream_shutdown(stream, crate::Shutdown::Write, 0),
7771            Ok(())
7772        );
7773
7774        assert_eq!(s.pipe.advance(), Ok(()));
7775
7776        // Client receives Reset only
7777        assert_eq!(s.poll_client(), Ok((stream, Event::Reset(0))));
7778        assert_eq!(s.poll_server(), Err(Error::Done));
7779
7780        // Client sends headers and fin.
7781        let (stream, req) = s.send_request(true).unwrap();
7782
7783        let ev_headers = Event::Headers {
7784            list: req,
7785            more_frames: false,
7786        };
7787
7788        // Server receives headers and fin.
7789        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7790        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
7791        assert_eq!(s.poll_server(), Err(Error::Done));
7792
7793        // Server sends response and fin
7794        let resp = s.send_response(stream, true).unwrap();
7795
7796        assert_eq!(s.pipe.advance(), Ok(()));
7797
7798        // ..then Server sends RESET_STREAM
7799        let frames = [crate::frame::Frame::ResetStream {
7800            stream_id: stream,
7801            error_code: 42,
7802            final_size: 68,
7803        }];
7804
7805        let pkt_type = crate::packet::Type::Short;
7806        assert_eq!(
7807            s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
7808            Ok(39)
7809        );
7810
7811        assert_eq!(s.pipe.advance(), Ok(()));
7812
7813        let ev_headers = Event::Headers {
7814            list: resp,
7815            more_frames: false,
7816        };
7817
7818        // Client receives headers and fin.
7819        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
7820        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
7821        assert_eq!(s.poll_client(), Err(Error::Done));
7822    }
7823}
7824
7825#[cfg(feature = "ffi")]
7826mod ffi;
7827#[cfg(feature = "internal")]
7828#[doc(hidden)]
7829pub mod frame;
7830#[cfg(not(feature = "internal"))]
7831mod frame;
7832#[doc(hidden)]
7833pub mod qpack;
7834mod stream;