Skip to main content

quiche/h3/
mod.rs

1// Copyright (C) 2019, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! HTTP/3 wire protocol and QPACK implementation.
28//!
29//! This module provides a high level API for sending and receiving HTTP/3
30//! requests and responses on top of the QUIC transport protocol.
31//!
32//! ## Connection setup
33//!
34//! HTTP/3 connections require a QUIC transport-layer connection, see
35//! [Connection setup] for a full description of the setup process.
36//!
37//! To use HTTP/3, the QUIC connection must be configured with a suitable
38//! Application Layer Protocol Negotiation (ALPN) Protocol ID:
39//!
40//! ```
41//! let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
42//! config.set_application_protos(quiche::h3::APPLICATION_PROTOCOL)?;
43//! # Ok::<(), quiche::Error>(())
44//! ```
45//!
46//! The QUIC handshake is driven by [sending] and [receiving] QUIC packets.
47//!
48//! Once the handshake has completed, the first step in establishing an HTTP/3
49//! connection is creating its configuration object:
50//!
51//! ```
52//! let h3_config = quiche::h3::Config::new()?;
53//! # Ok::<(), quiche::h3::Error>(())
54//! ```
55//!
56//! HTTP/3 client and server connections are both created using the
57//! [`with_transport()`] function, the role is inferred from the type of QUIC
58//! connection:
59//!
60//! ```no_run
61//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
62//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
63//! # let peer = "127.0.0.1:1234".parse().unwrap();
64//! # let local = "127.0.0.1:4321".parse().unwrap();
65//! # let mut conn = quiche::accept(&scid, None, local, peer, &mut config).unwrap();
66//! # let h3_config = quiche::h3::Config::new()?;
67//! let h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
68//! # Ok::<(), quiche::h3::Error>(())
69//! ```
70//!
71//! ## Sending a request
72//!
73//! An HTTP/3 client can send a request by using the connection's
74//! [`send_request()`] method to queue request headers; [sending] QUIC packets
75//! causes the requests to get sent to the peer:
76//!
77//! ```no_run
78//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
79//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
80//! # let peer = "127.0.0.1:1234".parse().unwrap();
81//! # let local = "127.0.0.1:4321".parse().unwrap();
82//! # let mut conn = quiche::connect(None, &scid, local, peer, &mut config).unwrap();
83//! # let h3_config = quiche::h3::Config::new()?;
84//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
85//! let req = vec![
86//!     quiche::h3::Header::new(b":method", b"GET"),
87//!     quiche::h3::Header::new(b":scheme", b"https"),
88//!     quiche::h3::Header::new(b":authority", b"quic.tech"),
89//!     quiche::h3::Header::new(b":path", b"/"),
90//!     quiche::h3::Header::new(b"user-agent", b"quiche"),
91//! ];
92//!
93//! h3_conn.send_request(&mut conn, &req, true)?;
94//! # Ok::<(), quiche::h3::Error>(())
95//! ```
96//!
97//! An HTTP/3 client can send a request with additional body data by using
98//! the connection's [`send_body()`] method:
99//!
100//! ```no_run
101//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
102//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
103//! # let peer = "127.0.0.1:1234".parse().unwrap();
104//! # let local = "127.0.0.1:4321".parse().unwrap();
105//! # let mut conn = quiche::connect(None, &scid, local, peer, &mut config).unwrap();
106//! # let h3_config = quiche::h3::Config::new()?;
107//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
108//! let req = vec![
109//!     quiche::h3::Header::new(b":method", b"GET"),
110//!     quiche::h3::Header::new(b":scheme", b"https"),
111//!     quiche::h3::Header::new(b":authority", b"quic.tech"),
112//!     quiche::h3::Header::new(b":path", b"/"),
113//!     quiche::h3::Header::new(b"user-agent", b"quiche"),
114//! ];
115//!
116//! let stream_id = h3_conn.send_request(&mut conn, &req, false)?;
117//! h3_conn.send_body(&mut conn, stream_id, b"Hello World!", true)?;
118//! # Ok::<(), quiche::h3::Error>(())
119//! ```
120//!
121//! ## Handling requests and responses
122//!
123//! After [receiving] QUIC packets, HTTP/3 data is processed using the
124//! connection's [`poll()`] method. On success, this returns an [`Event`] object
125//! and an ID corresponding to the stream where the `Event` originated.
126//!
127//! An HTTP/3 server uses [`poll()`] to read requests and responds to them using
128//! [`send_response()`] and [`send_body()`]:
129//!
130//! ```no_run
131//! use quiche::h3::NameValue;
132//!
133//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
134//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
135//! # let peer = "127.0.0.1:1234".parse().unwrap();
136//! # let local = "127.0.0.1:1234".parse().unwrap();
137//! # let mut conn = quiche::accept(&scid, None, local, peer, &mut config).unwrap();
138//! # let h3_config = quiche::h3::Config::new()?;
139//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
140//! loop {
141//!     match h3_conn.poll(&mut conn) {
142//!         Ok((stream_id, quiche::h3::Event::Headers{list, more_frames})) => {
143//!             let mut headers = list.into_iter();
144//!
145//!             // Look for the request's method.
146//!             let method = headers.find(|h| h.name() == b":method").unwrap();
147//!
148//!             // Look for the request's path.
149//!             let path = headers.find(|h| h.name() == b":path").unwrap();
150//!
151//!             if method.value() == b"GET" && path.value() == b"/" {
152//!                 let resp = vec![
153//!                     quiche::h3::Header::new(b":status", 200.to_string().as_bytes()),
154//!                     quiche::h3::Header::new(b"server", b"quiche"),
155//!                 ];
156//!
157//!                 h3_conn.send_response(&mut conn, stream_id, &resp, false)?;
158//!                 h3_conn.send_body(&mut conn, stream_id, b"Hello World!", true)?;
159//!             }
160//!         },
161//!
162//!         Ok((stream_id, quiche::h3::Event::Data)) => {
163//!             // Request body data, handle it.
164//!             # return Ok(());
165//!         },
166//!
167//!         Ok((stream_id, quiche::h3::Event::Finished)) => {
168//!             // Peer terminated stream, handle it.
169//!         },
170//!
171//!         Ok((stream_id, quiche::h3::Event::Reset(err))) => {
172//!             // Peer reset the stream, handle it.
173//!         },
174//!
175//!         Ok((_flow_id, quiche::h3::Event::PriorityUpdate)) => (),
176//!
177//!         Ok((goaway_id, quiche::h3::Event::GoAway)) => {
178//!              // Peer signalled it is going away, handle it.
179//!         },
180//!
181//!         Err(quiche::h3::Error::Done) => {
182//!             // Done reading.
183//!             break;
184//!         },
185//!
186//!         Err(e) => {
187//!             // An error occurred, handle it.
188//!             break;
189//!         },
190//!     }
191//! }
192//! # Ok::<(), quiche::h3::Error>(())
193//! ```
194//!
195//! An HTTP/3 client uses [`poll()`] to read responses:
196//!
197//! ```no_run
198//! use quiche::h3::NameValue;
199//!
200//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
201//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
202//! # let peer = "127.0.0.1:1234".parse().unwrap();
203//! # let local = "127.0.0.1:1234".parse().unwrap();
204//! # let mut conn = quiche::connect(None, &scid, local, peer, &mut config).unwrap();
205//! # let h3_config = quiche::h3::Config::new()?;
206//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
207//! loop {
208//!     match h3_conn.poll(&mut conn) {
209//!         Ok((stream_id, quiche::h3::Event::Headers{list, more_frames})) => {
210//!             let status = list.iter().find(|h| h.name() == b":status").unwrap();
211//!             println!("Received {} response on stream {}",
212//!                      std::str::from_utf8(status.value()).unwrap(),
213//!                      stream_id);
214//!         },
215//!
216//!         Ok((stream_id, quiche::h3::Event::Data)) => {
217//!             let mut body = vec![0; 4096];
218//!
219//!             // Consume all body data received on the stream.
220//!             while let Ok(read) =
221//!                 h3_conn.recv_body(&mut conn, stream_id, &mut body)
222//!             {
223//!                 println!("Received {} bytes of payload on stream {}",
224//!                          read, stream_id);
225//!             }
226//!         },
227//!
228//!         Ok((stream_id, quiche::h3::Event::Finished)) => {
229//!             // Peer terminated stream, handle it.
230//!         },
231//!
232//!         Ok((stream_id, quiche::h3::Event::Reset(err))) => {
233//!             // Peer reset the stream, handle it.
234//!         },
235//!
236//!         Ok((_prioritized_element_id, quiche::h3::Event::PriorityUpdate)) => (),
237//!
238//!         Ok((goaway_id, quiche::h3::Event::GoAway)) => {
239//!              // Peer signalled it is going away, handle it.
240//!         },
241//!
242//!         Err(quiche::h3::Error::Done) => {
243//!             // Done reading.
244//!             break;
245//!         },
246//!
247//!         Err(e) => {
248//!             // An error occurred, handle it.
249//!             break;
250//!         },
251//!     }
252//! }
253//! # Ok::<(), quiche::h3::Error>(())
254//! ```
255//!
256//! ## Detecting end of request or response
257//!
258//! A single HTTP/3 request or response may consist of several HEADERS and DATA
259//! frames; it is finished when the QUIC stream is closed. Calling [`poll()`]
260//! repeatedly will generate an [`Event`] for each of these. The application may
//! use these events to do additional HTTP semantic validation.
262//!
263//! ## HTTP/3 protocol errors
264//!
265//! Quiche is responsible for managing the HTTP/3 connection, ensuring it is in
266//! a correct state and validating all messages received by a peer. This mainly
267//! takes place in the [`poll()`] method. If an HTTP/3 error occurs, quiche will
268//! close the connection and send an appropriate CONNECTION_CLOSE frame to the
269//! peer. An [`Error`] is returned to the application so that it can perform any
270//! required tidy up such as closing sockets.
271//!
272//! [`application_proto()`]: ../struct.Connection.html#method.application_proto
273//! [`stream_finished()`]: ../struct.Connection.html#method.stream_finished
274//! [Connection setup]: ../index.html#connection-setup
275//! [sending]: ../index.html#generating-outgoing-packets
276//! [receiving]: ../index.html#handling-incoming-packets
277//! [`with_transport()`]: struct.Connection.html#method.with_transport
278//! [`poll()`]: struct.Connection.html#method.poll
279//! [`Event`]: enum.Event.html
280//! [`Error`]: enum.Error.html
//! [`send_request()`]: struct.Connection.html#method.send_request
282//! [`send_response()`]: struct.Connection.html#method.send_response
283//! [`send_body()`]: struct.Connection.html#method.send_body
284
285use std::collections::HashSet;
286use std::collections::VecDeque;
287
288#[cfg(feature = "sfv")]
289use std::convert::TryFrom;
290use std::fmt;
291use std::fmt::Write;
292
293#[cfg(feature = "qlog")]
294use qlog::events::http3::FrameCreated;
295#[cfg(feature = "qlog")]
296use qlog::events::http3::FrameParsed;
297#[cfg(feature = "qlog")]
298use qlog::events::http3::Http3EventType;
299#[cfg(feature = "qlog")]
300use qlog::events::http3::Http3Frame;
301#[cfg(feature = "qlog")]
302use qlog::events::http3::Initiator;
303#[cfg(feature = "qlog")]
304use qlog::events::http3::PriorityTargetStreamType;
305#[cfg(feature = "qlog")]
306use qlog::events::http3::StreamType;
307#[cfg(feature = "qlog")]
308use qlog::events::http3::StreamTypeSet;
309#[cfg(feature = "qlog")]
310use qlog::events::EventData;
311#[cfg(feature = "qlog")]
312use qlog::events::EventImportance;
313#[cfg(feature = "qlog")]
314use qlog::events::EventType;
315
316use crate::buffers::BufFactory;
317use crate::BufSplit;
318
/// ALPN protocol identifiers of the HTTP/3 versions supported by this module.
///
/// HTTP/3 applications can hand this slice straight to the
/// [`Config::set_application_protos()`] method.
///
/// [`Config::set_application_protos()`]:
/// ../struct.Config.html#method.set_application_protos
pub const APPLICATION_PROTOCOL: &[&[u8]] = &[b"h3"];

// Offset applied when an HTTP/3 urgency is converted to a quiche urgency.
const PRIORITY_URGENCY_OFFSET: u8 = 124;

// Priority parameter values taken from [Extensible Priorities]: the allowed
// urgency range plus the defaults used when a parameter is omitted.
//
// [Extensible Priorities]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
const PRIORITY_URGENCY_LOWER_BOUND: u8 = 0;
const PRIORITY_URGENCY_UPPER_BOUND: u8 = 7;
const PRIORITY_URGENCY_DEFAULT: u8 = 3;
const PRIORITY_INCREMENTAL_DEFAULT: bool = false;

// qlog event types for the HTTP/3 events named by each constant; only built
// when the `qlog` feature is enabled.
#[cfg(feature = "qlog")]
const QLOG_FRAME_CREATED: EventType =
    EventType::Http3EventType(Http3EventType::FrameCreated);
#[cfg(feature = "qlog")]
const QLOG_FRAME_PARSED: EventType =
    EventType::Http3EventType(Http3EventType::FrameParsed);
#[cfg(feature = "qlog")]
const QLOG_STREAM_TYPE_SET: EventType =
    EventType::Http3EventType(Http3EventType::StreamTypeSet);
348
349/// A specialized [`Result`] type for quiche HTTP/3 operations.
350///
351/// This type is used throughout quiche's HTTP/3 public API for any operation
352/// that can produce an error.
353///
354/// [`Result`]: https://doc.rust-lang.org/std/result/enum.Result.html
355pub type Result<T> = std::result::Result<T, Error>;
356
357/// An HTTP/3 error.
358#[derive(Clone, Copy, Debug, PartialEq, Eq)]
359pub enum Error {
360    /// There is no error or no work to do
361    Done,
362
363    /// The provided buffer is too short.
364    BufferTooShort,
365
366    /// Internal error in the HTTP/3 stack.
367    InternalError,
368
369    /// Endpoint detected that the peer is exhibiting behavior that causes.
370    /// excessive load.
371    ExcessiveLoad,
372
373    /// Stream ID or Push ID greater that current maximum was
374    /// used incorrectly, such as exceeding a limit, reducing a limit,
375    /// or being reused.
376    IdError,
377
378    /// The endpoint detected that its peer created a stream that it will not
379    /// accept.
380    StreamCreationError,
381
382    /// A required critical stream was closed.
383    ClosedCriticalStream,
384
385    /// No SETTINGS frame at beginning of control stream.
386    MissingSettings,
387
388    /// A frame was received which is not permitted in the current state.
389    FrameUnexpected,
390
391    /// Frame violated layout or size rules.
392    FrameError,
393
394    /// QPACK Header block decompression failure.
395    QpackDecompressionFailed,
396
397    /// Error originated from the transport layer.
398    TransportError(crate::Error),
399
400    /// The underlying QUIC stream (or connection) doesn't have enough capacity
401    /// for the operation to complete. The application should retry later on.
402    StreamBlocked,
403
404    /// Error in the payload of a SETTINGS frame.
405    SettingsError,
406
407    /// Server rejected request.
408    RequestRejected,
409
410    /// Request or its response cancelled.
411    RequestCancelled,
412
413    /// Client's request stream terminated without containing a full-formed
414    /// request.
415    RequestIncomplete,
416
417    /// An HTTP message was malformed and cannot be processed.
418    MessageError,
419
420    /// The TCP connection established in response to a CONNECT request was
421    /// reset or abnormally closed.
422    ConnectError,
423
424    /// The requested operation cannot be served over HTTP/3. Peer should retry
425    /// over HTTP/1.1.
426    VersionFallback,
427}
428
/// Error codes that HTTP/3 puts on the wire.
///
/// The values are those defined in [RFC9114](https://www.rfc-editor.org/rfc/rfc9114.html#http-error-codes).
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum WireErrorCode {
    /// Nothing went wrong. Used when a connection or stream has to be closed
    /// but there is no error to report.
    NoError              = 0x100,
    /// The peer violated protocol requirements in a way that no more
    /// specific code describes, or the endpoint chose not to use the more
    /// specific code.
    GeneralProtocolError = 0x101,
    /// The HTTP stack hit an internal error.
    InternalError        = 0x102,
    /// The peer created a stream that this endpoint will not accept.
    StreamCreationError  = 0x103,
    /// A stream the HTTP/3 connection depends on was closed or reset.
    ClosedCriticalStream = 0x104,
    /// A frame arrived that is not allowed in the current state or on the
    /// current stream.
    FrameUnexpected      = 0x105,
    /// A frame was received that breaks the layout requirements or has an
    /// invalid size.
    FrameError           = 0x106,
    /// The peer is behaving in a way that might generate excessive load.
    ExcessiveLoad        = 0x107,
    /// A stream ID or push ID was used incorrectly, such as exceeding a
    /// limit, reducing a limit, or being reused.
    IdError              = 0x108,
    /// The payload of a SETTINGS frame contained an error.
    SettingsError        = 0x109,
    /// The control stream did not begin with a SETTINGS frame.
    MissingSettings      = 0x10a,
    /// The server rejected a request without doing any application
    /// processing.
    RequestRejected      = 0x10b,
    /// The request or its response (including a pushed response) was
    /// cancelled.
    RequestCancelled     = 0x10c,
    /// The client's stream ended without carrying a fully formed request.
    RequestIncomplete    = 0x10d,
    /// An HTTP message was malformed and cannot be processed.
    MessageError         = 0x10e,
    /// The TCP connection established in response to a CONNECT request was
    /// reset or abnormally closed.
    ConnectError         = 0x10f,
    /// The requested operation cannot be served over HTTP/3; the peer should
    /// retry over HTTP/1.1.
    VersionFallback      = 0x110,
}
481
482impl Error {
483    fn to_wire(self) -> u64 {
484        match self {
485            Error::Done => WireErrorCode::NoError as u64,
486            Error::InternalError => WireErrorCode::InternalError as u64,
487            Error::StreamCreationError =>
488                WireErrorCode::StreamCreationError as u64,
489            Error::ClosedCriticalStream =>
490                WireErrorCode::ClosedCriticalStream as u64,
491            Error::FrameUnexpected => WireErrorCode::FrameUnexpected as u64,
492            Error::FrameError => WireErrorCode::FrameError as u64,
493            Error::ExcessiveLoad => WireErrorCode::ExcessiveLoad as u64,
494            Error::IdError => WireErrorCode::IdError as u64,
495            Error::MissingSettings => WireErrorCode::MissingSettings as u64,
496            Error::QpackDecompressionFailed => 0x200,
497            Error::BufferTooShort => 0x999,
498            Error::TransportError { .. } | Error::StreamBlocked => 0xFF,
499            Error::SettingsError => WireErrorCode::SettingsError as u64,
500            Error::RequestRejected => WireErrorCode::RequestRejected as u64,
501            Error::RequestCancelled => WireErrorCode::RequestCancelled as u64,
502            Error::RequestIncomplete => WireErrorCode::RequestIncomplete as u64,
503            Error::MessageError => WireErrorCode::MessageError as u64,
504            Error::ConnectError => WireErrorCode::ConnectError as u64,
505            Error::VersionFallback => WireErrorCode::VersionFallback as u64,
506        }
507    }
508
509    #[cfg(feature = "ffi")]
510    fn to_c(self) -> libc::ssize_t {
511        match self {
512            Error::Done => -1,
513            Error::BufferTooShort => -2,
514            Error::InternalError => -3,
515            Error::ExcessiveLoad => -4,
516            Error::IdError => -5,
517            Error::StreamCreationError => -6,
518            Error::ClosedCriticalStream => -7,
519            Error::MissingSettings => -8,
520            Error::FrameUnexpected => -9,
521            Error::FrameError => -10,
522            Error::QpackDecompressionFailed => -11,
523            // -12 was previously used for TransportError, skip it
524            Error::StreamBlocked => -13,
525            Error::SettingsError => -14,
526            Error::RequestRejected => -15,
527            Error::RequestCancelled => -16,
528            Error::RequestIncomplete => -17,
529            Error::MessageError => -18,
530            Error::ConnectError => -19,
531            Error::VersionFallback => -20,
532
533            Error::TransportError(quic_error) => quic_error.to_c() - 1000,
534        }
535    }
536}
537
538impl fmt::Display for Error {
539    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
540        write!(f, "{self:?}")
541    }
542}
543
544impl std::error::Error for Error {
545    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
546        None
547    }
548}
549
550impl From<super::Error> for Error {
551    fn from(err: super::Error) -> Self {
552        match err {
553            super::Error::Done => Error::Done,
554
555            _ => Error::TransportError(err),
556        }
557    }
558}
559
560impl From<octets::BufferTooShortError> for Error {
561    fn from(_err: octets::BufferTooShortError) -> Self {
562        Error::BufferTooShort
563    }
564}
565
/// Configuration for an HTTP/3 connection.
///
/// Each field is `None` until the corresponding setter is called.
pub struct Config {
    /// Value for the `SETTINGS_MAX_FIELD_SECTION_SIZE` setting.
    max_field_section_size: Option<u64>,
    /// Value for the `SETTINGS_QPACK_MAX_TABLE_CAPACITY` setting.
    qpack_max_table_capacity: Option<u64>,
    /// Value for the `SETTINGS_QPACK_BLOCKED_STREAMS` setting.
    qpack_blocked_streams: Option<u64>,
    /// Value for the `SETTINGS_ENABLE_CONNECT_PROTOCOL` setting; `Some(1)`
    /// when extended CONNECT is enabled.
    connect_protocol_enabled: Option<u64>,
    /// Settings, as (identifier, value) pairs, that are not among the H3
    /// settings handled explicitly by the fields above.
    additional_settings: Option<Vec<(u64, u64)>>,
}
576
577impl Config {
578    /// Creates a new configuration object with default settings.
579    pub const fn new() -> Result<Config> {
580        Ok(Config {
581            max_field_section_size: None,
582            qpack_max_table_capacity: None,
583            qpack_blocked_streams: None,
584            connect_protocol_enabled: None,
585            additional_settings: None,
586        })
587    }
588
589    /// Sets the `SETTINGS_MAX_FIELD_SECTION_SIZE` setting.
590    ///
591    /// By default no limit is enforced. When a request whose headers exceed
592    /// the limit set by the application is received, the call to the [`poll()`]
593    /// method will return the [`Error::ExcessiveLoad`] error, and the
594    /// connection will be closed.
595    ///
596    /// [`poll()`]: struct.Connection.html#method.poll
597    /// [`Error::ExcessiveLoad`]: enum.Error.html#variant.ExcessiveLoad
598    pub fn set_max_field_section_size(&mut self, v: u64) {
599        self.max_field_section_size = Some(v);
600    }
601
602    /// Sets the `SETTINGS_QPACK_MAX_TABLE_CAPACITY` setting.
603    ///
604    /// The default value is `0`.
605    pub fn set_qpack_max_table_capacity(&mut self, v: u64) {
606        self.qpack_max_table_capacity = Some(v);
607    }
608
609    /// Sets the `SETTINGS_QPACK_BLOCKED_STREAMS` setting.
610    ///
611    /// The default value is `0`.
612    pub fn set_qpack_blocked_streams(&mut self, v: u64) {
613        self.qpack_blocked_streams = Some(v);
614    }
615
616    /// Sets or omits the `SETTINGS_ENABLE_CONNECT_PROTOCOL` setting.
617    ///
618    /// The default value is `false`.
619    pub fn enable_extended_connect(&mut self, enabled: bool) {
620        if enabled {
621            self.connect_protocol_enabled = Some(1);
622        } else {
623            self.connect_protocol_enabled = None;
624        }
625    }
626
627    /// Sets additional HTTP/3 settings.
628    ///
629    /// The default value is no additional settings.
630    /// The `additional_settings` parameter must not the following
631    /// settings as they are already handled by this library:
632    ///
633    /// - SETTINGS_QPACK_MAX_TABLE_CAPACITY
634    /// - SETTINGS_MAX_FIELD_SECTION_SIZE
635    /// - SETTINGS_QPACK_BLOCKED_STREAMS
636    /// - SETTINGS_ENABLE_CONNECT_PROTOCOL
637    /// - SETTINGS_H3_DATAGRAM
638    ///
639    /// If such a setting is present in the `additional_settings`,
640    /// the method will return the [`Error::SettingsError`] error.
641    ///
642    /// If a setting identifier is present twice in `additional_settings`,
643    /// the method will return the [`Error::SettingsError`] error.
644    ///
645    /// [`Error::SettingsError`]: enum.Error.html#variant.SettingsError
646    pub fn set_additional_settings(
647        &mut self, additional_settings: Vec<(u64, u64)>,
648    ) -> Result<()> {
649        let explicit_quiche_settings = HashSet::from([
650            frame::SETTINGS_QPACK_MAX_TABLE_CAPACITY,
651            frame::SETTINGS_MAX_FIELD_SECTION_SIZE,
652            frame::SETTINGS_QPACK_BLOCKED_STREAMS,
653            frame::SETTINGS_ENABLE_CONNECT_PROTOCOL,
654            frame::SETTINGS_H3_DATAGRAM,
655            frame::SETTINGS_H3_DATAGRAM_00,
656        ]);
657
658        let dedup_settings: HashSet<u64> =
659            additional_settings.iter().map(|(key, _)| *key).collect();
660
661        if dedup_settings.len() != additional_settings.len() ||
662            !explicit_quiche_settings.is_disjoint(&dedup_settings)
663        {
664            return Err(Error::SettingsError);
665        }
666        self.additional_settings = Some(additional_settings);
667        Ok(())
668    }
669}
670
/// A trait for types that carry a name and a value, both exposed as bytes.
pub trait NameValue {
    /// Returns the bytes of the object's name.
    fn name(&self) -> &[u8];

    /// Returns the bytes of the object's value.
    fn value(&self) -> &[u8];
}

/// Any pair of byte-like items is a name-value pair: the first element is
/// the name and the second element is the value.
impl<N, V> NameValue for (N, V)
where
    N: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    fn name(&self) -> &[u8] {
        let (name, _) = self;
        name.as_ref()
    }

    fn value(&self) -> &[u8] {
        let (_, value) = self;
        value.as_ref()
    }
}
693
/// A raw HTTP header stored as an owned name-value pair.
#[derive(Clone, PartialEq, Eq)]
pub struct Header(Vec<u8>, Vec<u8>);

// Writes `hdr` as text, with escapes applied, when it is valid UTF-8;
// otherwise falls back to the `Debug` rendering of the byte slice.
fn try_print_as_readable(hdr: &[u8], f: &mut fmt::Formatter) -> fmt::Result {
    if let Ok(text) = std::str::from_utf8(hdr) {
        return write!(f, "{}", text.escape_default());
    }

    write!(f, "{hdr:?}")
}

impl fmt::Debug for Header {
    /// Formats the header as `"name: value"`, double quotes included.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Header(name, value) = self;

        f.write_char('"')?;
        try_print_as_readable(name, f)?;
        f.write_str(": ")?;
        try_print_as_readable(value, f)?;
        f.write_char('"')
    }
}

impl Header {
    /// Creates a new header.
    ///
    /// The bytes of both `name` and `value` are copied into the new header.
    pub fn new(name: &[u8], value: &[u8]) -> Self {
        Header(name.to_vec(), value.to_vec())
    }
}
723
724impl NameValue for Header {
725    fn name(&self) -> &[u8] {
726        &self.0
727    }
728
729    fn value(&self) -> &[u8] {
730        &self.1
731    }
732}
733
/// A raw HTTP header stored as a borrowed (non-owned) name-value pair.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HeaderRef<'a>(&'a [u8], &'a [u8]);

impl<'a> HeaderRef<'a> {
    /// Creates a new header that borrows `name` and `value`.
    pub const fn new(name: &'a [u8], value: &'a [u8]) -> Self {
        HeaderRef(name, value)
    }
}
744
745impl NameValue for HeaderRef<'_> {
746    fn name(&self) -> &[u8] {
747        self.0
748    }
749
750    fn value(&self) -> &[u8] {
751        self.1
752    }
753}
754
755/// An HTTP/3 connection event.
756#[derive(Clone, Debug, PartialEq, Eq)]
757pub enum Event {
758    /// Request/response headers were received.
759    Headers {
760        /// The list of received header fields. The application should validate
761        /// pseudo-headers and headers.
762        list: Vec<Header>,
763
764        /// Whether more frames will follow the headers on the stream.
765        more_frames: bool,
766    },
767
768    /// Data was received.
769    ///
770    /// This indicates that the application can use the [`recv_body()`] method
771    /// to retrieve the data from the stream.
772    ///
773    /// Note that [`recv_body()`] will need to be called repeatedly until the
774    /// [`Done`] value is returned, as the event will not be re-armed until all
775    /// buffered data is read.
776    ///
777    /// [`recv_body()`]: struct.Connection.html#method.recv_body
778    /// [`Done`]: enum.Error.html#variant.Done
779    Data,
780
781    /// Stream was closed,
782    Finished,
783
784    /// Stream was reset.
785    ///
786    /// The associated data represents the error code sent by the peer.
787    Reset(u64),
788
789    /// PRIORITY_UPDATE was received.
790    ///
791    /// This indicates that the application can use the
792    /// [`take_last_priority_update()`] method to take the last received
793    /// PRIORITY_UPDATE for a specified stream.
794    ///
795    /// This event is triggered once per stream until the last PRIORITY_UPDATE
796    /// is taken. It is recommended that applications defer taking the
797    /// PRIORITY_UPDATE until after [`poll()`] returns [`Done`].
798    ///
799    /// [`take_last_priority_update()`]: struct.Connection.html#method.take_last_priority_update
800    /// [`poll()`]: struct.Connection.html#method.poll
801    /// [`Done`]: enum.Error.html#variant.Done
802    PriorityUpdate,
803
804    /// GOAWAY was received.
805    GoAway,
806}
807
/// Extensible Priorities parameters.
///
/// With the `sfv` feature enabled, this type implements `TryFrom` for the
/// serialized Structured Fields Dictionary field value. In other words,
/// `TryFrom` can parse the value of a Priority header field or of a
/// PRIORITY_UPDATE frame.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(C)]
pub struct Priority {
    /// The urgency parameter.
    urgency: u8,
    /// The incremental parameter.
    incremental: bool,
}
820
821impl Default for Priority {
822    fn default() -> Self {
823        Priority {
824            urgency: PRIORITY_URGENCY_DEFAULT,
825            incremental: PRIORITY_INCREMENTAL_DEFAULT,
826        }
827    }
828}
829
830impl Priority {
831    /// Creates a new Priority.
832    pub const fn new(urgency: u8, incremental: bool) -> Self {
833        Priority {
834            urgency,
835            incremental,
836        }
837    }
838}
839
#[cfg(feature = "sfv")]
#[cfg_attr(docsrs, doc(cfg(feature = "sfv")))]
impl TryFrom<&[u8]> for Priority {
    type Error = Error;

    /// Try to parse an Extensible Priority field value.
    ///
    /// `value` is expected to be a serialized Structured Fields Dictionary;
    /// see [Extensible Priorities].
    ///
    /// Parsing rules:
    ///
    /// * An omitted `u` member yields the default urgency, and an omitted `i`
    ///   member yields `false`.
    /// * An integer `u` outside of the valid range (0 through 7) is clamped
    ///   to 7.
    /// * Error::Done is returned when the dictionary cannot be parsed, when
    ///   `u` is present but is not an integer item, or when `i` is an item
    ///   that is not a boolean.
    /// * An `i` member that is an inner list is treated as if it was
    ///   omitted.
    ///
    /// [Extensible Priorities]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4
    fn try_from(value: &[u8]) -> std::result::Result<Self, Self::Error> {
        let dict =
            sfv::Parser::parse_dictionary(value).map_err(|_| Error::Done)?;

        let urgency = match dict.get("u") {
            // Omitted so use default value.
            None => PRIORITY_URGENCY_DEFAULT,

            // An inner list is the wrong type for urgency.
            Some(sfv::ListEntry::InnerList(_)) => return Err(Error::Done),

            // The parameter must be an Item of type Integer.
            Some(sfv::ListEntry::Item(item)) => {
                let requested = item.bare_item.as_int().ok_or(Error::Done)?;

                let allowed = (PRIORITY_URGENCY_LOWER_BOUND as i64)..=
                    (PRIORITY_URGENCY_UPPER_BOUND as i64);

                if allowed.contains(&requested) {
                    requested as u8
                } else {
                    // Out-of-range values are an error per the spec, so fall
                    // back to the upper bound (lowest priority) to avoid
                    // interference with other streams.
                    PRIORITY_URGENCY_UPPER_BOUND
                }
            },
        };

        let incremental = match dict.get("i") {
            Some(sfv::ListEntry::Item(item)) => match item.bare_item.as_bool() {
                Some(flag) => flag,

                None => return Err(Error::Done),
            },

            // Omitted (or not an item) so use default value.
            _ => false,
        };

        Ok(Priority::new(urgency, incremental))
    }
}
905
/// HTTP/3 settings for one side of a connection.
///
/// A connection keeps one instance for the local endpoint and one for the
/// peer. Every field is optional; the peer's instance starts out with no
/// values set.
struct ConnectionSettings {
    /// Maximum field section size setting, if any.
    pub max_field_section_size: Option<u64>,
    /// QPACK maximum dynamic table capacity setting, if any.
    pub qpack_max_table_capacity: Option<u64>,
    /// QPACK blocked streams setting, if any.
    pub qpack_blocked_streams: Option<u64>,
    /// Extended CONNECT protocol setting, if any.
    pub connect_protocol_enabled: Option<u64>,
    /// HTTP/3 datagram setting. Locally this is `Some(1)` when datagrams are
    /// enabled on the transport and `None` otherwise.
    pub h3_datagram: Option<u64>,
    /// Extra (identifier, value) setting pairs. Locally these are copied from
    /// the configuration.
    pub additional_settings: Option<Vec<(u64, u64)>>,
    /// NOTE(review): presumably the complete list of (identifier, value)
    /// pairs exactly as exchanged on the wire -- confirm against the code
    /// that populates it.
    pub raw: Option<Vec<(u64, u64)>>,
}
915
/// Bookkeeping for one endpoint's QPACK encoder and decoder streams.
///
/// A connection keeps one instance for the local streams and one for the
/// peer's streams. All fields start out unset / zero.
#[derive(Default)]
struct QpackStreams {
    /// ID of the QPACK encoder stream, once known.
    pub encoder_stream_id: Option<u64>,
    /// Byte counter for the QPACK encoder stream.
    /// NOTE(review): presumably the number of bytes processed on that stream
    /// -- confirm against the code that updates it.
    pub encoder_stream_bytes: u64,
    /// ID of the QPACK decoder stream, once known.
    pub decoder_stream_id: Option<u64>,
    /// Byte counter for the QPACK decoder stream.
    /// NOTE(review): presumably the number of bytes processed on that stream
    /// -- confirm against the code that updates it.
    pub decoder_stream_bytes: u64,
}
923
/// Statistics about the connection.
///
/// A connection's statistics can be collected using the [`stats()`] method.
///
/// The type is a plain value: it can be cloned, compared for equality and
/// printed with `{:?}` for diagnostics. The default value has all counters
/// set to zero.
///
/// [`stats()`]: struct.Connection.html#method.stats
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct Stats {
    /// The number of bytes received on the QPACK encoder stream.
    pub qpack_encoder_stream_recv_bytes: u64,
    /// The number of bytes received on the QPACK decoder stream.
    pub qpack_decoder_stream_recv_bytes: u64,
}
936
937fn close_conn_critical_stream<F: BufFactory>(
938    conn: &mut super::Connection<F>,
939) -> Result<()> {
940    conn.close(
941        true,
942        Error::ClosedCriticalStream.to_wire(),
943        b"Critical stream closed.",
944    )?;
945
946    Err(Error::ClosedCriticalStream)
947}
948
949fn close_conn_if_critical_stream_finished<F: BufFactory>(
950    conn: &mut super::Connection<F>, stream_id: u64,
951) -> Result<()> {
952    if conn.stream_finished(stream_id) {
953        close_conn_critical_stream(conn)?;
954    }
955
956    Ok(())
957}
958
/// An HTTP/3 connection.
///
/// Holds the HTTP/3 state layered on top of a QUIC transport connection. The
/// transport connection itself is not stored here; it is passed to each
/// method that needs it.
pub struct Connection {
    /// Whether this endpoint is the server, as reported by the transport.
    is_server: bool,

    /// ID to use for the next request stream. Starts at 0 and advances by 4
    /// each time a request has been successfully buffered.
    next_request_stream_id: u64,
    /// ID to use for the next locally-opened unidirectional stream. Starts
    /// at 0x3 for servers and 0x2 for clients.
    next_uni_stream_id: u64,

    /// HTTP/3 state of the streams, keyed by stream ID.
    streams: crate::stream::StreamIdHashMap<stream::Stream>,

    /// Settings of the local endpoint, taken from the configuration.
    local_settings: ConnectionSettings,
    /// Settings of the peer; all unset when the connection is created.
    peer_settings: ConnectionSettings,

    /// ID of the local control stream, `None` when the connection is created.
    control_stream_id: Option<u64>,
    /// ID of the peer's control stream, `None` when the connection is
    /// created.
    peer_control_stream_id: Option<u64>,

    /// QPACK encoder used to encode outgoing header blocks.
    qpack_encoder: qpack::Encoder,
    /// QPACK decoder.
    qpack_decoder: qpack::Decoder,

    /// Bookkeeping for the locally-opened QPACK streams.
    local_qpack_streams: QpackStreams,
    /// Bookkeeping for the peer's QPACK streams.
    peer_qpack_streams: QpackStreams,

    /// Maximum push ID; 0 when the connection is created.
    max_push_id: u64,

    /// Queue of stream IDs; empty when the connection is created.
    /// NOTE(review): presumably streams whose completion still has to be
    /// reported to the application -- confirm against the polling code.
    finished_streams: VecDeque<u64>,

    /// Set once GREASE frames have been sent, so they are only sent once.
    frames_greased: bool,

    /// ID carried by a GOAWAY related to the local endpoint, if any.
    /// NOTE(review): presumably the ID sent in a local GOAWAY -- confirm.
    local_goaway_id: Option<u64>,
    /// Set once the peer's GOAWAY is known; while set, no new requests can
    /// be initiated.
    peer_goaway_id: Option<u64>,
}
989
990impl Connection {
991    fn new(
992        config: &Config, is_server: bool, enable_dgram: bool,
993    ) -> Result<Connection> {
994        let initial_uni_stream_id = if is_server { 0x3 } else { 0x2 };
995        let h3_datagram = if enable_dgram { Some(1) } else { None };
996
997        Ok(Connection {
998            is_server,
999
1000            next_request_stream_id: 0,
1001
1002            next_uni_stream_id: initial_uni_stream_id,
1003
1004            streams: Default::default(),
1005
1006            local_settings: ConnectionSettings {
1007                max_field_section_size: config.max_field_section_size,
1008                qpack_max_table_capacity: config.qpack_max_table_capacity,
1009                qpack_blocked_streams: config.qpack_blocked_streams,
1010                connect_protocol_enabled: config.connect_protocol_enabled,
1011                h3_datagram,
1012                additional_settings: config.additional_settings.clone(),
1013                raw: Default::default(),
1014            },
1015
1016            peer_settings: ConnectionSettings {
1017                max_field_section_size: None,
1018                qpack_max_table_capacity: None,
1019                qpack_blocked_streams: None,
1020                h3_datagram: None,
1021                connect_protocol_enabled: None,
1022                additional_settings: Default::default(),
1023                raw: Default::default(),
1024            },
1025
1026            control_stream_id: None,
1027            peer_control_stream_id: None,
1028
1029            qpack_encoder: qpack::Encoder::new(),
1030            qpack_decoder: qpack::Decoder::new(),
1031
1032            local_qpack_streams: Default::default(),
1033            peer_qpack_streams: Default::default(),
1034
1035            max_push_id: 0,
1036
1037            finished_streams: VecDeque::new(),
1038
1039            frames_greased: false,
1040
1041            local_goaway_id: None,
1042            peer_goaway_id: None,
1043        })
1044    }
1045
1046    /// Creates a new HTTP/3 connection using the provided QUIC connection.
1047    ///
1048    /// This will also initiate the HTTP/3 handshake with the peer by opening
1049    /// all control streams (including QPACK) and sending the local settings.
1050    ///
1051    /// On success the new connection is returned.
1052    ///
1053    /// The [`StreamLimit`] error is returned when the HTTP/3 control stream
1054    /// cannot be created due to stream limits.
1055    ///
1056    /// The [`InternalError`] error is returned when either the underlying QUIC
1057    /// connection is not in a suitable state, or the HTTP/3 control stream
1058    /// cannot be created due to flow control limits.
1059    ///
1060    /// [`StreamLimit`]: ../enum.Error.html#variant.StreamLimit
1061    /// [`InternalError`]: ../enum.Error.html#variant.InternalError
1062    pub fn with_transport<F: BufFactory>(
1063        conn: &mut super::Connection<F>, config: &Config,
1064    ) -> Result<Connection> {
1065        let is_client = !conn.is_server;
1066        if is_client && !(conn.is_established() || conn.is_in_early_data()) {
1067            trace!("{} QUIC connection must be established or in early data before creating an HTTP/3 connection", conn.trace_id());
1068            return Err(Error::InternalError);
1069        }
1070
1071        let mut http3_conn =
1072            Connection::new(config, conn.is_server, conn.dgram_enabled())?;
1073
1074        match http3_conn.send_settings(conn) {
1075            Ok(_) => (),
1076
1077            Err(e) => {
1078                conn.close(true, e.to_wire(), b"Error opening control stream")?;
1079                return Err(e);
1080            },
1081        };
1082
1083        // Try opening QPACK streams, but ignore errors if it fails since we
1084        // don't need them right now.
1085        http3_conn.open_qpack_encoder_stream(conn).ok();
1086        http3_conn.open_qpack_decoder_stream(conn).ok();
1087
1088        if conn.grease {
1089            // Try opening a GREASE stream, but ignore errors since it's not
1090            // critical.
1091            http3_conn.open_grease_stream(conn).ok();
1092        }
1093
1094        Ok(http3_conn)
1095    }
1096
1097    /// Sends an HTTP/3 request.
1098    ///
1099    /// The request is encoded from the provided list of headers without a
1100    /// body, and sent on a newly allocated stream. To include a body,
1101    /// set `fin` as `false` and subsequently call [`send_body()`] with the
1102    /// same `conn` and the `stream_id` returned from this method.
1103    ///
1104    /// On success the newly allocated stream ID is returned.
1105    ///
1106    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1107    /// doesn't have enough capacity for the operation to complete. When this
1108    /// happens the application should retry the operation once the stream is
1109    /// reported as writable again.
1110    ///
1111    /// [`send_body()`]: struct.Connection.html#method.send_body
1112    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1113    pub fn send_request<T: NameValue, F: BufFactory>(
1114        &mut self, conn: &mut super::Connection<F>, headers: &[T], fin: bool,
1115    ) -> Result<u64> {
1116        // If we received a GOAWAY from the peer, MUST NOT initiate new
1117        // requests.
1118        if self.peer_goaway_id.is_some() {
1119            return Err(Error::FrameUnexpected);
1120        }
1121
1122        let stream_id = self.next_request_stream_id;
1123
1124        self.streams
1125            .insert(stream_id, <stream::Stream>::new(stream_id, true));
1126
1127        // The underlying QUIC stream does not exist yet, so calls to e.g.
1128        // stream_capacity() will fail. By writing a 0-length buffer, we force
1129        // the creation of the QUIC stream state, without actually writing
1130        // anything.
1131        if let Err(e) = conn.stream_send(stream_id, b"", false) {
1132            self.streams.remove(&stream_id);
1133
1134            if e == super::Error::Done {
1135                return Err(Error::StreamBlocked);
1136            }
1137
1138            return Err(e.into());
1139        };
1140
1141        self.send_headers(conn, stream_id, headers, fin)?;
1142
1143        // To avoid skipping stream IDs, we only calculate the next available
1144        // stream ID when a request has been successfully buffered.
1145        self.next_request_stream_id = self
1146            .next_request_stream_id
1147            .checked_add(4)
1148            .ok_or(Error::IdError)?;
1149
1150        Ok(stream_id)
1151    }
1152
1153    /// Sends an HTTP/3 response on the specified stream with default priority.
1154    ///
1155    /// This method sends the provided `headers` as a single initial response
1156    /// without a body.
1157    ///
1158    /// To send a non-final 1xx, then a final 200+ without body:
1159    ///   * send_response() with `fin` set to `false`.
1160    ///   * [`send_additional_headers()`] with fin set to `true` using the same
1161    ///     `stream_id` value.
1162    ///
1163    /// To send a non-final 1xx, then a final 200+ with body:
1164    ///   * send_response() with `fin` set to `false`.
1165    ///   * [`send_additional_headers()`] with fin set to `false` and same
1166    ///     `stream_id` value.
1167    ///   * [`send_body()`] with same `stream_id`.
1168    ///
1169    /// To send a final 200+ with body:
1170    ///   * send_response() with `fin` set to `false`.
1171    ///   * [`send_body()`] with same `stream_id`.
1172    ///
1173    /// Additional headers can only be sent during certain phases of an HTTP/3
1174    /// message exchange, see [Section 4.1 of RFC 9114]. The [`FrameUnexpected`]
1175    /// error is returned if this method, or [`send_response_with_priority()`],
1176    /// are called multiple times with the same `stream_id` value.
1177    ///
1178    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1179    /// doesn't have enough capacity for the operation to complete. When this
1180    /// happens the application should retry the operation once the stream is
1181    /// reported as writable again.
1182    ///
1183    /// [`send_body()`]: struct.Connection.html#method.send_body
1184    /// [`send_additional_headers()`]:
1185    ///     struct.Connection.html#method.send_additional_headers
1186    /// [`send_response_with_priority()`]:
1187    ///     struct.Connection.html#method.send_response_with_priority
1188    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
1189    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1190    pub fn send_response<T: NameValue, F: BufFactory>(
1191        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1192        headers: &[T], fin: bool,
1193    ) -> Result<()> {
1194        let priority = Default::default();
1195
1196        self.send_response_with_priority(
1197            conn, stream_id, headers, &priority, fin,
1198        )?;
1199
1200        Ok(())
1201    }
1202
1203    /// Sends an HTTP/3 response on the specified stream with specified
1204    /// priority.
1205    ///
1206    /// This method sends the provided `headers` as a single initial response
1207    /// without a body.
1208    ///
1209    /// To send a non-final 1xx, then a final 200+ without body:
1210    ///   * send_response_with_priority() with `fin` set to `false`.
1211    ///   * [`send_additional_headers()`] with fin set to `true` using the same
1212    ///     `stream_id` value.
1213    ///
1214    /// To send a non-final 1xx, then a final 200+ with body:
1215    ///   * send_response_with_priority() with `fin` set to `false`.
1216    ///   * [`send_additional_headers()`] with fin set to `false` and same
1217    ///     `stream_id` value.
1218    ///   * [`send_body()`] with same `stream_id`.
1219    ///
1220    /// To send a final 200+ with body:
1221    ///   * send_response_with_priority() with `fin` set to `false`.
1222    ///   * [`send_body()`] with same `stream_id`.
1223    ///
1224    /// The `priority` parameter represents [Extensible Priority]
1225    /// parameters. If the urgency is outside the range 0-7, it will be clamped
1226    /// to 7.
1227    ///
1228    /// Additional headers can only be sent during certain phases of an HTTP/3
1229    /// message exchange, see [Section 4.1 of RFC 9114]. The [`FrameUnexpected`]
1230    /// error is returned if this method, or [`send_response()`],
1231    /// are called multiple times with the same `stream_id` value.
1232    ///
1233    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1234    /// doesn't have enough capacity for the operation to complete. When this
1235    /// happens the application should retry the operation once the stream is
1236    /// reported as writable again.
1237    ///
1238    /// [`send_body()`]: struct.Connection.html#method.send_body
1239    /// [`send_additional_headers()`]:
1240    ///     struct.Connection.html#method.send_additional_headers
1241    /// [`send_response()`]:
1242    ///     struct.Connection.html#method.send_response
1243    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
1244    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1245    /// [Extensible Priority]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
1246    pub fn send_response_with_priority<T: NameValue, F: BufFactory>(
1247        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1248        headers: &[T], priority: &Priority, fin: bool,
1249    ) -> Result<()> {
1250        match self.streams.get(&stream_id) {
1251            Some(s) => {
1252                // Only one initial HEADERS allowed.
1253                if s.local_initialized() {
1254                    return Err(Error::FrameUnexpected);
1255                }
1256
1257                s
1258            },
1259
1260            None => return Err(Error::FrameUnexpected),
1261        };
1262
1263        self.send_headers(conn, stream_id, headers, fin)?;
1264
1265        // Clamp and shift urgency into quiche-priority space
1266        let urgency = priority
1267            .urgency
1268            .clamp(PRIORITY_URGENCY_LOWER_BOUND, PRIORITY_URGENCY_UPPER_BOUND) +
1269            PRIORITY_URGENCY_OFFSET;
1270
1271        conn.stream_priority(stream_id, urgency, priority.incremental)?;
1272
1273        Ok(())
1274    }
1275
1276    /// Sends additional HTTP/3 headers.
1277    ///
1278    /// After the initial request or response headers have been sent, using
1279    /// [`send_request()`] or [`send_response()`] respectively, this method can
1280    /// be used send an additional HEADERS frame. For example, to send a single
1281    /// instance of trailers after a request with a body, or to issue another
1282    /// non-final 1xx after a preceding 1xx, or to issue a final response after
1283    /// a preceding 1xx.
1284    ///
1285    /// Additional headers can only be sent during certain phases of an HTTP/3
1286    /// message exchange, see [Section 4.1 of RFC 9114]. The [`FrameUnexpected`]
1287    /// error is returned when this method is called during the wrong phase,
1288    /// such as before initial headers have been sent, or if trailers have
1289    /// already been sent.
1290    ///
1291    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1292    /// doesn't have enough capacity for the operation to complete. When this
1293    /// happens the application should retry the operation once the stream is
1294    /// reported as writable again.
1295    ///
1296    /// [`send_request()`]: struct.Connection.html#method.send_request
1297    /// [`send_response()`]: struct.Connection.html#method.send_response
1298    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1299    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
1300    /// [Section 4.1 of RFC 9114]:
1301    ///     https://www.rfc-editor.org/rfc/rfc9114.html#section-4.1.
1302    pub fn send_additional_headers<T: NameValue, F: BufFactory>(
1303        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1304        headers: &[T], is_trailer_section: bool, fin: bool,
1305    ) -> Result<()> {
1306        // Clients can only send trailer headers.
1307        if !self.is_server && !is_trailer_section {
1308            return Err(Error::FrameUnexpected);
1309        }
1310
1311        match self.streams.get(&stream_id) {
1312            Some(s) => {
1313                // Initial HEADERS must have been sent.
1314                if !s.local_initialized() {
1315                    return Err(Error::FrameUnexpected);
1316                }
1317
1318                // Only one trailing HEADERS allowed.
1319                if s.trailers_sent() {
1320                    return Err(Error::FrameUnexpected);
1321                }
1322
1323                s
1324            },
1325
1326            None => return Err(Error::FrameUnexpected),
1327        };
1328
1329        self.send_headers(conn, stream_id, headers, fin)?;
1330
1331        if is_trailer_section {
1332            // send_headers() might have tidied the stream away, so we need to
1333            // check again.
1334            if let Some(s) = self.streams.get_mut(&stream_id) {
1335                s.mark_trailers_sent();
1336            }
1337        }
1338
1339        Ok(())
1340    }
1341
1342    /// Sends additional HTTP/3 headers with specified priority.
1343    ///
1344    /// After the initial request or response headers have been sent, using
1345    /// [`send_request()`] or [`send_response()`] respectively, this method can
1346    /// be used send an additional HEADERS frame. For example, to send a single
1347    /// instance of trailers after a request with a body, or to issue another
1348    /// non-final 1xx after a preceding 1xx, or to issue a final response after
1349    /// a preceding 1xx.
1350    ///
1351    /// The `priority` parameter represents [Extensible Priority]
1352    /// parameters. If the urgency is outside the range 0-7, it will be clamped
1353    /// to 7.
1354    ///
1355    /// Additional headers can only be sent during certain phases of an HTTP/3
1356    /// message exchange, see [Section 4.1 of RFC 9114]. The [`FrameUnexpected`]
1357    /// error is returned when this method is called during the wrong phase,
1358    /// such as before initial headers have been sent, or if trailers have
1359    /// already been sent.
1360    ///
1361    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1362    /// doesn't have enough capacity for the operation to complete. When this
1363    /// happens the application should retry the operation once the stream is
1364    /// reported as writable again.
1365    ///
1366    /// [`send_request()`]: struct.Connection.html#method.send_request
1367    /// [`send_response()`]: struct.Connection.html#method.send_response
1368    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1369    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
1370    /// [Section 4.1 of RFC 9114]:
1371    ///     https://www.rfc-editor.org/rfc/rfc9114.html#section-4.1.
1372    /// [Extensible Priority]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
1373    pub fn send_additional_headers_with_priority<T: NameValue, F: BufFactory>(
1374        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1375        headers: &[T], priority: &Priority, is_trailer_section: bool, fin: bool,
1376    ) -> Result<()> {
1377        self.send_additional_headers(
1378            conn,
1379            stream_id,
1380            headers,
1381            is_trailer_section,
1382            fin,
1383        )?;
1384
1385        // Clamp and shift urgency into quiche-priority space
1386        let urgency = priority
1387            .urgency
1388            .clamp(PRIORITY_URGENCY_LOWER_BOUND, PRIORITY_URGENCY_UPPER_BOUND) +
1389            PRIORITY_URGENCY_OFFSET;
1390
1391        conn.stream_priority(stream_id, urgency, priority.incremental)?;
1392
1393        Ok(())
1394    }
1395
1396    fn encode_header_block<T: NameValue>(
1397        &mut self, headers: &[T],
1398    ) -> Result<Vec<u8>> {
1399        let headers_len = headers
1400            .iter()
1401            .fold(0, |acc, h| acc + h.value().len() + h.name().len() + 32);
1402
1403        let mut header_block = vec![0; headers_len];
1404        let len = self
1405            .qpack_encoder
1406            .encode(headers, &mut header_block)
1407            .map_err(|_| Error::InternalError)?;
1408
1409        header_block.truncate(len);
1410
1411        Ok(header_block)
1412    }
1413
1414    fn send_headers<T: NameValue, F: BufFactory>(
1415        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1416        headers: &[T], fin: bool,
1417    ) -> Result<()> {
1418        let mut d = [42; 10];
1419        let mut b = octets::OctetsMut::with_slice(&mut d);
1420
1421        if !self.frames_greased && conn.grease {
1422            self.send_grease_frames(conn, stream_id)?;
1423            self.frames_greased = true;
1424        }
1425
1426        let header_block = self.encode_header_block(headers)?;
1427
1428        let overhead = octets::varint_len(frame::HEADERS_FRAME_TYPE_ID) +
1429            octets::varint_len(header_block.len() as u64);
1430
1431        // Headers need to be sent atomically, so make sure the stream has
1432        // enough capacity.
1433        match conn.stream_writable(stream_id, overhead + header_block.len()) {
1434            Ok(true) => (),
1435
1436            Ok(false) => return Err(Error::StreamBlocked),
1437
1438            Err(e) => {
1439                if conn.stream_finished(stream_id) {
1440                    self.streams.remove(&stream_id);
1441                }
1442
1443                return Err(e.into());
1444            },
1445        };
1446
1447        b.put_varint(frame::HEADERS_FRAME_TYPE_ID)?;
1448        b.put_varint(header_block.len() as u64)?;
1449        let off = b.off();
1450        conn.stream_send(stream_id, &d[..off], false)?;
1451
1452        // Sending header block separately avoids unnecessary copy.
1453        conn.stream_send(stream_id, &header_block, fin)?;
1454
1455        trace!(
1456            "{} tx frm HEADERS stream={} len={} fin={}",
1457            conn.trace_id(),
1458            stream_id,
1459            header_block.len(),
1460            fin
1461        );
1462
1463        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1464            let qlog_headers = headers
1465                .iter()
1466                .map(|h| qlog::events::http3::HttpHeader {
1467                    name: Some(String::from_utf8_lossy(h.name()).into_owned()),
1468                    name_bytes: None,
1469                    value: Some(String::from_utf8_lossy(h.value()).into_owned()),
1470                    value_bytes: None,
1471                })
1472                .collect();
1473
1474            let frame = Http3Frame::Headers {
1475                headers: qlog_headers,
1476            };
1477            let ev_data = EventData::Http3FrameCreated(FrameCreated {
1478                stream_id,
1479                length: Some(header_block.len() as u64),
1480                frame,
1481                ..Default::default()
1482            });
1483
1484            q.add_event_data_now(ev_data).ok();
1485        });
1486
1487        if let Some(s) = self.streams.get_mut(&stream_id) {
1488            s.initialize_local();
1489        }
1490
1491        if fin && conn.stream_finished(stream_id) {
1492            self.streams.remove(&stream_id);
1493        }
1494
1495        Ok(())
1496    }
1497
1498    /// Sends an HTTP/3 body chunk on the given stream.
1499    ///
1500    /// On success the number of bytes written is returned, or [`Done`] if no
1501    /// bytes could be written (e.g. because the stream is blocked).
1502    ///
1503    /// Note that the number of written bytes returned can be lower than the
1504    /// length of the input buffer when the underlying QUIC stream doesn't have
1505    /// enough capacity for the operation to complete.
1506    ///
1507    /// When a partial write happens (including when [`Done`] is returned) the
1508    /// application should retry the operation once the stream is reported as
1509    /// writable again.
1510    ///
1511    /// [`Done`]: enum.Error.html#variant.Done
1512    pub fn send_body<F: BufFactory>(
1513        &mut self, conn: &mut super::Connection<F>, stream_id: u64, body: &[u8],
1514        fin: bool,
1515    ) -> Result<usize> {
1516        self.do_send_body(
1517            conn,
1518            stream_id,
1519            body,
1520            fin,
1521            |conn: &mut super::Connection<F>,
1522             header: &[u8],
1523             stream_id: u64,
1524             body: &[u8],
1525             body_len: usize,
1526             fin: bool| {
1527                conn.stream_send(stream_id, header, false)?;
1528                Ok(conn
1529                    .stream_send(stream_id, &body[..body_len], fin)
1530                    .map(|v| (v, v))?)
1531            },
1532        )
1533    }
1534
1535    /// Sends an HTTP/3 body chunk provided as a raw buffer on the given stream.
1536    ///
1537    /// If the capacity allows it the buffer will be appended to the stream's
1538    /// send queue with zero copying.
1539    ///
1540    /// On success the number of bytes written is returned, or [`Done`] if no
1541    /// bytes could be written (e.g. because the stream is blocked).
1542    ///
1543    /// Note that the number of written bytes returned can be lower than the
1544    /// length of the input buffer when the underlying QUIC stream doesn't have
1545    /// enough capacity for the operation to complete.
1546    ///
1547    /// When a partial write happens (including when [`Done`] is returned) the
1548    /// remaining (unwrittent) buffer will also be returned. The application
1549    /// should retry the operation once the stream is reported as writable
1550    /// again.
1551    ///
1552    /// [`Done`]: enum.Error.html#variant.Done
1553    pub fn send_body_zc<F>(
1554        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1555        body: &mut F::Buf, fin: bool,
1556    ) -> Result<usize>
1557    where
1558        F: BufFactory,
1559        F::Buf: BufSplit,
1560    {
1561        self.do_send_body(
1562            conn,
1563            stream_id,
1564            body,
1565            fin,
1566            |conn: &mut super::Connection<F>,
1567             header: &[u8],
1568             stream_id: u64,
1569             body: &mut F::Buf,
1570             mut body_len: usize,
1571             fin: bool| {
1572                let with_prefix = body.try_add_prefix(header);
1573                if !with_prefix {
1574                    conn.stream_send(stream_id, header, false)?;
1575                } else {
1576                    body_len += header.len();
1577                }
1578
1579                let (mut n, rem) = conn.stream_send_zc(
1580                    stream_id,
1581                    body.clone(),
1582                    Some(body_len),
1583                    fin,
1584                )?;
1585
1586                if with_prefix {
1587                    n -= header.len();
1588                }
1589
1590                if let Some(rem) = rem {
1591                    let _ = std::mem::replace(body, rem);
1592                }
1593
1594                Ok((n, n))
1595            },
1596        )
1597    }
1598
1599    fn do_send_body<F, B, R, SND>(
1600        &mut self, conn: &mut super::Connection<F>, stream_id: u64, body: B,
1601        fin: bool, write_fn: SND,
1602    ) -> Result<R>
1603    where
1604        F: BufFactory,
1605        B: AsRef<[u8]>,
1606        SND: FnOnce(
1607            &mut super::Connection<F>,
1608            &[u8],
1609            u64,
1610            B,
1611            usize,
1612            bool,
1613        ) -> Result<(usize, R)>,
1614    {
1615        let mut d = [42; 10];
1616        let mut b = octets::OctetsMut::with_slice(&mut d);
1617
1618        let len = body.as_ref().len();
1619
1620        // Validate that it is sane to send data on the stream.
1621        if stream_id % 4 != 0 {
1622            return Err(Error::FrameUnexpected);
1623        }
1624
1625        match self.streams.get_mut(&stream_id) {
1626            Some(s) => {
1627                if !s.local_initialized() {
1628                    return Err(Error::FrameUnexpected);
1629                }
1630
1631                if s.trailers_sent() {
1632                    return Err(Error::FrameUnexpected);
1633                }
1634            },
1635
1636            None => {
1637                return Err(Error::FrameUnexpected);
1638            },
1639        };
1640
1641        // Avoid sending 0-length DATA frames when the fin flag is false.
1642        if len == 0 && !fin {
1643            return Err(Error::Done);
1644        }
1645
1646        let overhead = octets::varint_len(frame::DATA_FRAME_TYPE_ID) +
1647            octets::varint_len(len as u64);
1648
1649        let stream_cap = match conn.stream_capacity(stream_id) {
1650            Ok(v) => v,
1651
1652            Err(e) => {
1653                if conn.stream_finished(stream_id) {
1654                    self.streams.remove(&stream_id);
1655                }
1656
1657                return Err(e.into());
1658            },
1659        };
1660
1661        // Make sure there is enough capacity to send the DATA frame header.
1662        if stream_cap < overhead {
1663            let _ = conn.stream_writable(stream_id, overhead + 1);
1664            return Err(Error::Done);
1665        }
1666
1667        // Cap the frame payload length to the stream's capacity.
1668        let body_len = std::cmp::min(len, stream_cap - overhead);
1669
1670        // If we can't send the entire body, set the fin flag to false so the
1671        // application can try again later.
1672        let fin = if body_len != len { false } else { fin };
1673
1674        // Again, avoid sending 0-length DATA frames when the fin flag is false.
1675        if body_len == 0 && !fin {
1676            let _ = conn.stream_writable(stream_id, overhead + 1);
1677            return Err(Error::Done);
1678        }
1679
1680        b.put_varint(frame::DATA_FRAME_TYPE_ID)?;
1681        b.put_varint(body_len as u64)?;
1682        let off = b.off();
1683
1684        // Return how many bytes were written, excluding the frame header.
1685        // Sending body separately avoids unnecessary copy.
1686        let (written, ret) =
1687            write_fn(conn, &d[..off], stream_id, body, body_len, fin)?;
1688
1689        trace!(
1690            "{} tx frm DATA stream={} len={} fin={}",
1691            conn.trace_id(),
1692            stream_id,
1693            written,
1694            fin
1695        );
1696
1697        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1698            let frame = Http3Frame::Data { raw: None };
1699            let ev_data = EventData::Http3FrameCreated(FrameCreated {
1700                stream_id,
1701                length: Some(written as u64),
1702                frame,
1703                ..Default::default()
1704            });
1705
1706            q.add_event_data_now(ev_data).ok();
1707        });
1708
1709        if written < len {
1710            // Ensure the peer is notified that the connection or stream is
1711            // blocked when the stream's capacity is limited by flow control.
1712            //
1713            // We only need enough capacity to send a few bytes, to make sure
1714            // the stream doesn't hang due to congestion window not growing
1715            // enough.
1716            let _ = conn.stream_writable(stream_id, overhead + 1);
1717        }
1718
1719        if fin && written == len && conn.stream_finished(stream_id) {
1720            self.streams.remove(&stream_id);
1721        }
1722
1723        Ok(ret)
1724    }
1725
1726    /// Returns whether the peer enabled HTTP/3 DATAGRAM frame support.
1727    ///
1728    /// Support is signalled by the peer's SETTINGS, so this method always
1729    /// returns false until they have been processed using the [`poll()`]
1730    /// method.
1731    ///
1732    /// [`poll()`]: struct.Connection.html#method.poll
1733    pub fn dgram_enabled_by_peer<F: BufFactory>(
1734        &self, conn: &super::Connection<F>,
1735    ) -> bool {
1736        self.peer_settings.h3_datagram == Some(1) &&
1737            conn.dgram_max_writable_len().is_some()
1738    }
1739
1740    /// Returns whether the peer enabled extended CONNECT support.
1741    ///
1742    /// Support is signalled by the peer's SETTINGS, so this method always
1743    /// returns false until they have been processed using the [`poll()`]
1744    /// method.
1745    ///
1746    /// [`poll()`]: struct.Connection.html#method.poll
1747    pub fn extended_connect_enabled_by_peer(&self) -> bool {
1748        self.peer_settings.connect_protocol_enabled == Some(1)
1749    }
1750
1751    /// Reads request or response body data into the provided buffer.
1752    ///
1753    /// Applications should call this method whenever the [`poll()`] method
1754    /// returns a [`Data`] event.
1755    ///
1756    /// On success the amount of bytes read is returned, or [`Done`] if there
1757    /// is no data to read.
1758    ///
1759    /// [`poll()`]: struct.Connection.html#method.poll
1760    /// [`Data`]: enum.Event.html#variant.Data
1761    /// [`Done`]: enum.Error.html#variant.Done
1762    pub fn recv_body<F: BufFactory>(
1763        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1764        out: &mut [u8],
1765    ) -> Result<usize> {
1766        let mut total = 0;
1767
1768        // Try to consume all buffered data for the stream, even across multiple
1769        // DATA frames.
1770        while total < out.len() {
1771            let stream = self.streams.get_mut(&stream_id).ok_or(Error::Done)?;
1772
1773            if stream.state() != stream::State::Data {
1774                break;
1775            }
1776
1777            let (read, fin) =
1778                match stream.try_consume_data(conn, &mut out[total..]) {
1779                    Ok(v) => v,
1780
1781                    Err(Error::Done) => break,
1782
1783                    Err(e) => return Err(e),
1784                };
1785
1786            total += read;
1787
1788            // No more data to read, we are done.
1789            if read == 0 || fin {
1790                break;
1791            }
1792
1793            // Process incoming data from the stream. For example, if a whole
1794            // DATA frame was consumed, and another one is queued behind it,
1795            // this will ensure the additional data will also be returned to
1796            // the application.
1797            match self.process_readable_stream(conn, stream_id, false) {
1798                Ok(_) => unreachable!(),
1799
1800                Err(Error::Done) => (),
1801
1802                Err(e) => return Err(e),
1803            };
1804
1805            if conn.stream_finished(stream_id) {
1806                break;
1807            }
1808        }
1809
1810        // While body is being received, the stream is marked as finished only
1811        // when all data is read by the application.
1812        if conn.stream_finished(stream_id) {
1813            self.process_finished_stream(stream_id);
1814        }
1815
1816        if total == 0 {
1817            return Err(Error::Done);
1818        }
1819
1820        Ok(total)
1821    }
1822
1823    /// Sends a PRIORITY_UPDATE frame on the control stream with specified
1824    /// request stream ID and priority.
1825    ///
1826    /// The `priority` parameter represents [Extensible Priority]
1827    /// parameters. If the urgency is outside the range 0-7, it will be clamped
1828    /// to 7.
1829    ///
1830    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1831    /// doesn't have enough capacity for the operation to complete. When this
1832    /// happens the application should retry the operation once the stream is
1833    /// reported as writable again.
1834    ///
1835    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1836    /// [Extensible Priority]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
1837    pub fn send_priority_update_for_request<F: BufFactory>(
1838        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1839        priority: &Priority,
1840    ) -> Result<()> {
1841        let mut d = [42; 20];
1842        let mut b = octets::OctetsMut::with_slice(&mut d);
1843
1844        // Validate that it is sane to send PRIORITY_UPDATE.
1845        if self.is_server {
1846            return Err(Error::FrameUnexpected);
1847        }
1848
1849        if stream_id % 4 != 0 {
1850            return Err(Error::FrameUnexpected);
1851        }
1852
1853        let control_stream_id =
1854            self.control_stream_id.ok_or(Error::FrameUnexpected)?;
1855
1856        let urgency = priority
1857            .urgency
1858            .clamp(PRIORITY_URGENCY_LOWER_BOUND, PRIORITY_URGENCY_UPPER_BOUND);
1859
1860        let mut field_value = format!("u={urgency}");
1861
1862        if priority.incremental {
1863            field_value.push_str(",i");
1864        }
1865
1866        let priority_field_value = field_value.as_bytes();
1867        let frame_payload_len =
1868            octets::varint_len(stream_id) + priority_field_value.len();
1869
1870        let overhead =
1871            octets::varint_len(frame::PRIORITY_UPDATE_FRAME_REQUEST_TYPE_ID) +
1872                octets::varint_len(stream_id) +
1873                octets::varint_len(frame_payload_len as u64);
1874
1875        // Make sure the control stream has enough capacity.
1876        match conn.stream_writable(
1877            control_stream_id,
1878            overhead + priority_field_value.len(),
1879        ) {
1880            Ok(true) => (),
1881
1882            Ok(false) => return Err(Error::StreamBlocked),
1883
1884            Err(e) => {
1885                return Err(e.into());
1886            },
1887        }
1888
1889        b.put_varint(frame::PRIORITY_UPDATE_FRAME_REQUEST_TYPE_ID)?;
1890        b.put_varint(frame_payload_len as u64)?;
1891        b.put_varint(stream_id)?;
1892        let off = b.off();
1893        conn.stream_send(control_stream_id, &d[..off], false)?;
1894
1895        // Sending field value separately avoids unnecessary copy.
1896        conn.stream_send(control_stream_id, priority_field_value, false)?;
1897
1898        trace!(
1899            "{} tx frm PRIORITY_UPDATE request_stream={} priority_field_value={}",
1900            conn.trace_id(),
1901            stream_id,
1902            field_value,
1903        );
1904
1905        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1906            let frame = Http3Frame::PriorityUpdate {
1907                target_stream_type: PriorityTargetStreamType::Request,
1908                prioritized_element_id: stream_id,
1909                priority_field_value: field_value.clone(),
1910            };
1911
1912            let ev_data = EventData::Http3FrameCreated(FrameCreated {
1913                stream_id,
1914                length: Some(priority_field_value.len() as u64),
1915                frame,
1916                ..Default::default()
1917            });
1918
1919            q.add_event_data_now(ev_data).ok();
1920        });
1921
1922        Ok(())
1923    }
1924
1925    /// Take the last PRIORITY_UPDATE for a prioritized element ID.
1926    ///
1927    /// When the [`poll()`] method returns a [`PriorityUpdate`] event for a
1928    /// prioritized element, the event has triggered and will not rearm until
1929    /// applications call this method. It is recommended that applications defer
1930    /// taking the PRIORITY_UPDATE until after [`poll()`] returns [`Done`].
1931    ///
1932    /// On success the Priority Field Value is returned, or [`Done`] if there is
1933    /// no PRIORITY_UPDATE to read (either because there is no value to take, or
1934    /// because the prioritized element does not exist).
1935    ///
1936    /// [`poll()`]: struct.Connection.html#method.poll
1937    /// [`PriorityUpdate`]: enum.Event.html#variant.PriorityUpdate
1938    /// [`Done`]: enum.Error.html#variant.Done
1939    pub fn take_last_priority_update(
1940        &mut self, prioritized_element_id: u64,
1941    ) -> Result<Vec<u8>> {
1942        if let Some(stream) = self.streams.get_mut(&prioritized_element_id) {
1943            return stream.take_last_priority_update().ok_or(Error::Done);
1944        }
1945
1946        Err(Error::Done)
1947    }
1948
1949    /// Processes HTTP/3 data received from the peer.
1950    ///
1951    /// On success it returns an [`Event`] and an ID, or [`Done`] when there are
1952    /// no events to report.
1953    ///
1954    /// Note that all events are edge-triggered, meaning that once reported they
1955    /// will not be reported again by calling this method again, until the event
1956    /// is re-armed.
1957    ///
1958    /// The events [`Headers`], [`Data`] and [`Finished`] return a stream ID,
1959    /// which is used in methods [`recv_body()`], [`send_response()`] or
1960    /// [`send_body()`].
1961    ///
1962    /// The event [`GoAway`] returns an ID that depends on the connection role.
1963    /// A client receives the largest processed stream ID. A server receives the
1964    /// the largest permitted push ID.
1965    ///
1966    /// The event [`PriorityUpdate`] only occurs at servers. It returns a
1967    /// prioritized element ID that is used in the method
1968    /// [`take_last_priority_update()`], which rearms the event for that ID.
1969    ///
1970    /// If an error occurs while processing data, the connection is closed with
1971    /// the appropriate error code, using the transport's [`close()`] method.
1972    ///
1973    /// [`Event`]: enum.Event.html
1974    /// [`Done`]: enum.Error.html#variant.Done
1975    /// [`Headers`]: enum.Event.html#variant.Headers
1976    /// [`Data`]: enum.Event.html#variant.Data
1977    /// [`Finished`]: enum.Event.html#variant.Finished
1978    /// [`GoAway`]: enum.Event.html#variant.GoAWay
1979    /// [`PriorityUpdate`]: enum.Event.html#variant.PriorityUpdate
1980    /// [`recv_body()`]: struct.Connection.html#method.recv_body
1981    /// [`send_response()`]: struct.Connection.html#method.send_response
1982    /// [`send_body()`]: struct.Connection.html#method.send_body
1983    /// [`recv_dgram()`]: struct.Connection.html#method.recv_dgram
1984    /// [`take_last_priority_update()`]: struct.Connection.html#method.take_last_priority_update
1985    /// [`close()`]: ../struct.Connection.html#method.close
1986    pub fn poll<F: BufFactory>(
1987        &mut self, conn: &mut super::Connection<F>,
1988    ) -> Result<(u64, Event)> {
1989        // When connection close is initiated by the local application (e.g. due
1990        // to a protocol error), the connection itself might be in a broken
1991        // state, so return early.
1992        if conn.local_error.is_some() {
1993            return Err(Error::Done);
1994        }
1995
1996        // Process control streams first.
1997        if let Some(stream_id) = self.peer_control_stream_id {
1998            match self.process_control_stream(conn, stream_id) {
1999                Ok(ev) => return Ok(ev),
2000
2001                Err(Error::Done) => (),
2002
2003                Err(e) => return Err(e),
2004            };
2005        }
2006
2007        if let Some(stream_id) = self.peer_qpack_streams.encoder_stream_id {
2008            match self.process_control_stream(conn, stream_id) {
2009                Ok(ev) => return Ok(ev),
2010
2011                Err(Error::Done) => (),
2012
2013                Err(e) => return Err(e),
2014            };
2015        }
2016
2017        if let Some(stream_id) = self.peer_qpack_streams.decoder_stream_id {
2018            match self.process_control_stream(conn, stream_id) {
2019                Ok(ev) => return Ok(ev),
2020
2021                Err(Error::Done) => (),
2022
2023                Err(e) => return Err(e),
2024            };
2025        }
2026
2027        // Process finished streams list.
2028        if let Some(finished) = self.finished_streams.pop_front() {
2029            return Ok((finished, Event::Finished));
2030        }
2031
2032        // Process HTTP/3 data from readable streams.
2033        for s in conn.readable() {
2034            trace!("{} stream id {} is readable", conn.trace_id(), s);
2035
2036            let ev = match self.process_readable_stream(conn, s, true) {
2037                Ok(v) => Some(v),
2038
2039                Err(Error::Done) => None,
2040
2041                // Return early if the stream was reset, to avoid returning
2042                // a Finished event later as well.
2043                Err(Error::TransportError(crate::Error::StreamReset(e))) =>
2044                    return Ok((s, Event::Reset(e))),
2045
2046                Err(e) => return Err(e),
2047            };
2048
2049            if conn.stream_finished(s) {
2050                self.process_finished_stream(s);
2051            }
2052
2053            // TODO: check if stream is completed so it can be freed
2054            if let Some(ev) = ev {
2055                return Ok(ev);
2056            }
2057        }
2058
2059        // Process finished streams list once again, to make sure `Finished`
2060        // events are returned when receiving empty stream frames with the fin
2061        // flag set.
2062        if let Some(finished) = self.finished_streams.pop_front() {
2063            if conn.stream_readable(finished) {
2064                // The stream is finished, but is still readable, it may
2065                // indicate that there is a pending error, such as reset.
2066                if let Err(crate::Error::StreamReset(e)) =
2067                    conn.stream_recv(finished, &mut [])
2068                {
2069                    return Ok((finished, Event::Reset(e)));
2070                }
2071            }
2072            return Ok((finished, Event::Finished));
2073        }
2074
2075        Err(Error::Done)
2076    }
2077
2078    /// Sends a GOAWAY frame to initiate graceful connection closure.
2079    ///
2080    /// When quiche is used in the server role, the `id` parameter is the stream
2081    /// ID of the highest processed request. This can be any valid ID between 0
2082    /// and 2^62-4. However, the ID cannot be increased. Failure to satisfy
2083    /// these conditions will return an error.
2084    ///
2085    /// This method does not close the QUIC connection. Applications are
2086    /// required to call [`close()`] themselves.
2087    ///
2088    /// [`close()`]: ../struct.Connection.html#method.close
2089    pub fn send_goaway<F: BufFactory>(
2090        &mut self, conn: &mut super::Connection<F>, id: u64,
2091    ) -> Result<()> {
2092        let mut id = id;
2093
2094        // TODO: server push
2095        //
2096        // In the meantime always send 0 from client.
2097        if !self.is_server {
2098            id = 0;
2099        }
2100
2101        if self.is_server && id % 4 != 0 {
2102            return Err(Error::IdError);
2103        }
2104
2105        if let Some(sent_id) = self.local_goaway_id {
2106            if id > sent_id {
2107                return Err(Error::IdError);
2108            }
2109        }
2110
2111        if let Some(stream_id) = self.control_stream_id {
2112            let mut d = [42; 10];
2113            let mut b = octets::OctetsMut::with_slice(&mut d);
2114
2115            let frame = frame::Frame::GoAway { id };
2116
2117            let wire_len = frame.to_bytes(&mut b)?;
2118            let stream_cap = conn.stream_capacity(stream_id)?;
2119
2120            if stream_cap < wire_len {
2121                return Err(Error::StreamBlocked);
2122            }
2123
2124            trace!("{} tx frm {:?}", conn.trace_id(), frame);
2125
2126            qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2127                let ev_data = EventData::Http3FrameCreated(FrameCreated {
2128                    stream_id,
2129                    length: Some(octets::varint_len(id) as u64),
2130                    frame: frame.to_qlog(),
2131                    ..Default::default()
2132                });
2133
2134                q.add_event_data_now(ev_data).ok();
2135            });
2136
2137            let off = b.off();
2138            conn.stream_send(stream_id, &d[..off], false)?;
2139
2140            self.local_goaway_id = Some(id);
2141        }
2142
2143        Ok(())
2144    }
2145
2146    /// Gets the raw settings from peer including unknown and reserved types.
2147    ///
2148    /// The order of settings is the same as received in the SETTINGS frame.
2149    pub fn peer_settings_raw(&self) -> Option<&[(u64, u64)]> {
2150        self.peer_settings.raw.as_deref()
2151    }
2152
2153    fn open_uni_stream<F: BufFactory>(
2154        &mut self, conn: &mut super::Connection<F>, ty: u64,
2155    ) -> Result<u64> {
2156        let stream_id = self.next_uni_stream_id;
2157
2158        let mut d = [0; 8];
2159        let mut b = octets::OctetsMut::with_slice(&mut d);
2160
2161        match ty {
2162            // Control and QPACK streams are the most important to schedule.
2163            stream::HTTP3_CONTROL_STREAM_TYPE_ID |
2164            stream::QPACK_ENCODER_STREAM_TYPE_ID |
2165            stream::QPACK_DECODER_STREAM_TYPE_ID => {
2166                conn.stream_priority(stream_id, 0, false)?;
2167            },
2168
2169            // TODO: Server push
2170            stream::HTTP3_PUSH_STREAM_TYPE_ID => (),
2171
2172            // Anything else is a GREASE stream, so make it the least important.
2173            _ => {
2174                conn.stream_priority(stream_id, 255, false)?;
2175            },
2176        }
2177
2178        conn.stream_send(stream_id, b.put_varint(ty)?, false)?;
2179
2180        // To avoid skipping stream IDs, we only calculate the next available
2181        // stream ID when data has been successfully buffered.
2182        self.next_uni_stream_id = self
2183            .next_uni_stream_id
2184            .checked_add(4)
2185            .ok_or(Error::IdError)?;
2186
2187        Ok(stream_id)
2188    }
2189
2190    fn open_qpack_encoder_stream<F: BufFactory>(
2191        &mut self, conn: &mut super::Connection<F>,
2192    ) -> Result<()> {
2193        let stream_id =
2194            self.open_uni_stream(conn, stream::QPACK_ENCODER_STREAM_TYPE_ID)?;
2195
2196        self.local_qpack_streams.encoder_stream_id = Some(stream_id);
2197
2198        qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2199            let ev_data = EventData::Http3StreamTypeSet(StreamTypeSet {
2200                stream_id,
2201                initiator: Some(Initiator::Local),
2202                stream_type: StreamType::QpackEncode,
2203                ..Default::default()
2204            });
2205
2206            q.add_event_data_now(ev_data).ok();
2207        });
2208
2209        Ok(())
2210    }
2211
2212    fn open_qpack_decoder_stream<F: BufFactory>(
2213        &mut self, conn: &mut super::Connection<F>,
2214    ) -> Result<()> {
2215        let stream_id =
2216            self.open_uni_stream(conn, stream::QPACK_DECODER_STREAM_TYPE_ID)?;
2217
2218        self.local_qpack_streams.decoder_stream_id = Some(stream_id);
2219
2220        qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2221            let ev_data = EventData::Http3StreamTypeSet(StreamTypeSet {
2222                stream_id,
2223                initiator: Some(Initiator::Local),
2224                stream_type: StreamType::QpackDecode,
2225                ..Default::default()
2226            });
2227
2228            q.add_event_data_now(ev_data).ok();
2229        });
2230
2231        Ok(())
2232    }
2233
2234    /// Send GREASE frames on the provided stream ID.
2235    fn send_grease_frames<F: BufFactory>(
2236        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
2237    ) -> Result<()> {
2238        let mut d = [0; 8];
2239
2240        let stream_cap = match conn.stream_capacity(stream_id) {
2241            Ok(v) => v,
2242
2243            Err(e) => {
2244                if conn.stream_finished(stream_id) {
2245                    self.streams.remove(&stream_id);
2246                }
2247
2248                return Err(e.into());
2249            },
2250        };
2251
2252        let grease_frame1 = grease_value();
2253        let grease_frame2 = grease_value();
2254        let grease_payload = b"GREASE is the word";
2255
2256        let overhead = octets::varint_len(grease_frame1) + // frame type
2257            1 + // payload len
2258            octets::varint_len(grease_frame2) + // frame type
2259            1 + // payload len
2260            grease_payload.len(); // payload
2261
2262        // Don't send GREASE if there is not enough capacity for it. Greasing
2263        // will _not_ be attempted again later on.
2264        if stream_cap < overhead {
2265            return Ok(());
2266        }
2267
2268        // Empty GREASE frame.
2269        let mut b = octets::OctetsMut::with_slice(&mut d);
2270        conn.stream_send(stream_id, b.put_varint(grease_frame1)?, false)?;
2271
2272        let mut b = octets::OctetsMut::with_slice(&mut d);
2273        conn.stream_send(stream_id, b.put_varint(0)?, false)?;
2274
2275        trace!(
2276            "{} tx frm GREASE stream={} len=0",
2277            conn.trace_id(),
2278            stream_id
2279        );
2280
2281        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2282            let frame = Http3Frame::Reserved { length: Some(0) };
2283            let ev_data = EventData::Http3FrameCreated(FrameCreated {
2284                stream_id,
2285                length: Some(0),
2286                frame,
2287                ..Default::default()
2288            });
2289
2290            q.add_event_data_now(ev_data).ok();
2291        });
2292
2293        // GREASE frame with payload.
2294        let mut b = octets::OctetsMut::with_slice(&mut d);
2295        conn.stream_send(stream_id, b.put_varint(grease_frame2)?, false)?;
2296
2297        let mut b = octets::OctetsMut::with_slice(&mut d);
2298        conn.stream_send(stream_id, b.put_varint(18)?, false)?;
2299
2300        conn.stream_send(stream_id, grease_payload, false)?;
2301
2302        trace!(
2303            "{} tx frm GREASE stream={} len={}",
2304            conn.trace_id(),
2305            stream_id,
2306            grease_payload.len()
2307        );
2308
2309        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2310            let frame = Http3Frame::Reserved {
2311                length: Some(grease_payload.len() as u64),
2312            };
2313            let ev_data = EventData::Http3FrameCreated(FrameCreated {
2314                stream_id,
2315                length: Some(grease_payload.len() as u64),
2316                frame,
2317                ..Default::default()
2318            });
2319
2320            q.add_event_data_now(ev_data).ok();
2321        });
2322
2323        Ok(())
2324    }
2325
2326    /// Opens a new unidirectional stream with a GREASE type and sends some
2327    /// unframed payload.
2328    fn open_grease_stream<F: BufFactory>(
2329        &mut self, conn: &mut super::Connection<F>,
2330    ) -> Result<()> {
2331        let ty = grease_value();
2332        match self.open_uni_stream(conn, ty) {
2333            Ok(stream_id) => {
2334                conn.stream_send(stream_id, b"GREASE is the word", true)?;
2335
2336                trace!("{} open GREASE stream {}", conn.trace_id(), stream_id);
2337
2338                qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2339                    let ev_data = EventData::Http3StreamTypeSet(StreamTypeSet {
2340                        stream_id,
2341                        initiator: Some(Initiator::Local),
2342                        stream_type: StreamType::Unknown,
2343                        stream_type_bytes: Some(ty),
2344                        ..Default::default()
2345                    });
2346
2347                    q.add_event_data_now(ev_data).ok();
2348                });
2349            },
2350
2351            Err(Error::IdError) => {
2352                trace!("{} GREASE stream blocked", conn.trace_id(),);
2353
2354                return Ok(());
2355            },
2356
2357            Err(e) => return Err(e),
2358        };
2359
2360        Ok(())
2361    }
2362
2363    /// Sends SETTINGS frame based on HTTP/3 configuration.
2364    fn send_settings<F: BufFactory>(
2365        &mut self, conn: &mut super::Connection<F>,
2366    ) -> Result<()> {
2367        let stream_id = match self
2368            .open_uni_stream(conn, stream::HTTP3_CONTROL_STREAM_TYPE_ID)
2369        {
2370            Ok(v) => v,
2371
2372            Err(e) => {
2373                trace!("{} Control stream blocked", conn.trace_id(),);
2374
2375                if e == Error::Done {
2376                    return Err(Error::InternalError);
2377                }
2378
2379                return Err(e);
2380            },
2381        };
2382
2383        self.control_stream_id = Some(stream_id);
2384
2385        qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2386            let ev_data = EventData::Http3StreamTypeSet(StreamTypeSet {
2387                stream_id,
2388                initiator: Some(Initiator::Local),
2389                stream_type: StreamType::Control,
2390                ..Default::default()
2391            });
2392
2393            q.add_event_data_now(ev_data).ok();
2394        });
2395
2396        let grease = if conn.grease {
2397            Some((grease_value(), grease_value()))
2398        } else {
2399            None
2400        };
2401
2402        let frame = frame::Frame::Settings {
2403            max_field_section_size: self.local_settings.max_field_section_size,
2404            qpack_max_table_capacity: self
2405                .local_settings
2406                .qpack_max_table_capacity,
2407            qpack_blocked_streams: self.local_settings.qpack_blocked_streams,
2408            connect_protocol_enabled: self
2409                .local_settings
2410                .connect_protocol_enabled,
2411            h3_datagram: self.local_settings.h3_datagram,
2412            grease,
2413            additional_settings: self.local_settings.additional_settings.clone(),
2414            raw: Default::default(),
2415        };
2416
2417        let mut d = [42; 128];
2418        let mut b = octets::OctetsMut::with_slice(&mut d);
2419
2420        frame.to_bytes(&mut b)?;
2421
2422        let off = b.off();
2423
2424        if let Some(id) = self.control_stream_id {
2425            conn.stream_send(id, &d[..off], false)?;
2426
2427            trace!(
2428                "{} tx frm SETTINGS stream={} len={}",
2429                conn.trace_id(),
2430                id,
2431                off
2432            );
2433
2434            qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2435                let frame = frame.to_qlog();
2436                let ev_data = EventData::Http3FrameCreated(FrameCreated {
2437                    stream_id: id,
2438                    length: Some(off as u64),
2439                    frame,
2440                    ..Default::default()
2441                });
2442
2443                q.add_event_data_now(ev_data).ok();
2444            });
2445        }
2446
2447        Ok(())
2448    }
2449
2450    fn process_control_stream<F: BufFactory>(
2451        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
2452    ) -> Result<(u64, Event)> {
2453        close_conn_if_critical_stream_finished(conn, stream_id)?;
2454
2455        if !conn.stream_readable(stream_id) {
2456            return Err(Error::Done);
2457        }
2458
2459        match self.process_readable_stream(conn, stream_id, true) {
2460            Ok(ev) => return Ok(ev),
2461
2462            Err(Error::Done) => (),
2463
2464            Err(e) => return Err(e),
2465        };
2466
2467        close_conn_if_critical_stream_finished(conn, stream_id)?;
2468
2469        Err(Error::Done)
2470    }
2471
    /// Drives the receive-side state machine of stream `stream_id`.
    ///
    /// A stream entry is created on first use. The loop then dispatches on
    /// the stream's current state until an event is produced, an error
    /// occurs, or no further progress can be made.
    ///
    /// When `polling` is `false`, processing stops as soon as the stream
    /// reaches the `FramePayload` or `Data` state, so no events are emitted.
    ///
    /// Returns `Ok((stream_id, event))` when a frame or pending data yields
    /// an event, and `Err(Error::Done)` when the loop ends without one.
    /// Protocol violations close the QUIC connection and return the
    /// corresponding error.
    fn process_readable_stream<F: BufFactory>(
        &mut self, conn: &mut super::Connection<F>, stream_id: u64, polling: bool,
    ) -> Result<(u64, Event)> {
        // Make sure there is a stream entry to work on.
        self.streams
            .entry(stream_id)
            .or_insert_with(|| <stream::Stream>::new(stream_id, false));

        // We need to get a fresh reference to the stream for each
        // iteration, to avoid borrowing `self` for the entire duration
        // of the loop, because we'll need to borrow it again in the
        // `State::FramePayload` case below.
        while let Some(stream) = self.streams.get_mut(&stream_id) {
            match stream.state() {
                // Read the varint that identifies the stream type.
                stream::State::StreamType => {
                    stream.try_fill_buffer(conn)?;

                    let varint = match stream.try_consume_varint() {
                        Ok(v) => v,

                        // Varint could not be consumed yet: loop again. The
                        // loop ends through the `?` above once filling the
                        // buffer fails (presumably with `Error::Done` when
                        // no more data is available -- confirm).
                        Err(_) => continue,
                    };

                    let ty = stream::Type::deserialize(varint)?;

                    if let Err(e) = stream.set_ty(ty) {
                        conn.close(true, e.to_wire(), b"")?;
                        return Err(e);
                    }

                    qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
                        // The raw type value is only logged for unknown
                        // stream types.
                        let ty_val = if matches!(ty, stream::Type::Unknown) {
                            Some(varint)
                        } else {
                            None
                        };

                        let ev_data =
                            EventData::Http3StreamTypeSet(StreamTypeSet {
                                stream_id,
                                initiator: Some(Initiator::Remote),
                                stream_type: ty.to_qlog(),
                                stream_type_bytes: ty_val,
                                ..Default::default()
                            });

                        q.add_event_data_now(ev_data).ok();
                    });

                    match &ty {
                        stream::Type::Control => {
                            // Only one control stream allowed.
                            if self.peer_control_stream_id.is_some() {
                                conn.close(
                                    true,
                                    Error::StreamCreationError.to_wire(),
                                    b"Received multiple control streams",
                                )?;

                                return Err(Error::StreamCreationError);
                            }

                            trace!(
                                "{} open peer's control stream {}",
                                conn.trace_id(),
                                stream_id
                            );

                            close_conn_if_critical_stream_finished(
                                conn, stream_id,
                            )?;

                            self.peer_control_stream_id = Some(stream_id);
                        },

                        stream::Type::Push => {
                            // Only clients can receive push stream.
                            if self.is_server {
                                conn.close(
                                    true,
                                    Error::StreamCreationError.to_wire(),
                                    b"Server received push stream.",
                                )?;

                                return Err(Error::StreamCreationError);
                            }
                        },

                        stream::Type::QpackEncoder => {
                            // Only one qpack encoder stream allowed.
                            if self.peer_qpack_streams.encoder_stream_id.is_some()
                            {
                                conn.close(
                                    true,
                                    Error::StreamCreationError.to_wire(),
                                    b"Received multiple QPACK encoder streams",
                                )?;

                                return Err(Error::StreamCreationError);
                            }

                            close_conn_if_critical_stream_finished(
                                conn, stream_id,
                            )?;

                            self.peer_qpack_streams.encoder_stream_id =
                                Some(stream_id);
                        },

                        stream::Type::QpackDecoder => {
                            // Only one qpack decoder allowed.
                            if self.peer_qpack_streams.decoder_stream_id.is_some()
                            {
                                conn.close(
                                    true,
                                    Error::StreamCreationError.to_wire(),
                                    b"Received multiple QPACK decoder streams",
                                )?;

                                return Err(Error::StreamCreationError);
                            }

                            close_conn_if_critical_stream_finished(
                                conn, stream_id,
                            )?;

                            self.peer_qpack_streams.decoder_stream_id =
                                Some(stream_id);
                        },

                        stream::Type::Unknown => {
                            // Unknown stream types are ignored.
                            // TODO: we MAY send STOP_SENDING
                        },

                        // NOTE(review): relies on the type read from the
                        // wire never being `Request` -- confirm against
                        // `stream::Type::deserialize`.
                        stream::Type::Request => unreachable!(),
                    }
                },

                // Read the varint carrying the push ID.
                stream::State::PushId => {
                    stream.try_fill_buffer(conn)?;

                    let varint = match stream.try_consume_varint() {
                        Ok(v) => v,

                        Err(_) => continue,
                    };

                    if let Err(e) = stream.set_push_id(varint) {
                        conn.close(true, e.to_wire(), b"")?;
                        return Err(e);
                    }
                },

                // Read the varint carrying the frame type.
                stream::State::FrameType => {
                    stream.try_fill_buffer(conn)?;

                    let varint = match stream.try_consume_varint() {
                        Ok(v) => v,

                        Err(_) => continue,
                    };

                    match stream.set_frame_type(varint) {
                        // Unexpected frames get a close reason that names
                        // the offending frame type.
                        Err(Error::FrameUnexpected) => {
                            let msg = format!("Unexpected frame type {varint}");

                            conn.close(
                                true,
                                Error::FrameUnexpected.to_wire(),
                                msg.as_bytes(),
                            )?;

                            return Err(Error::FrameUnexpected);
                        },

                        Err(e) => {
                            conn.close(
                                true,
                                e.to_wire(),
                                b"Error handling frame.",
                            )?;

                            return Err(e);
                        },

                        _ => (),
                    }
                },

                // Read the varint carrying the frame payload length.
                stream::State::FramePayloadLen => {
                    stream.try_fill_buffer(conn)?;

                    let payload_len = match stream.try_consume_varint() {
                        Ok(v) => v,

                        Err(_) => continue,
                    };

                    // DATA frames are handled uniquely. After this point we lose
                    // visibility of DATA framing, so just log here.
                    if Some(frame::DATA_FRAME_TYPE_ID) == stream.frame_type() {
                        trace!(
                            "{} rx frm DATA stream={} wire_payload_len={}",
                            conn.trace_id(),
                            stream_id,
                            payload_len
                        );

                        qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
                            let frame = Http3Frame::Data { raw: None };

                            let ev_data =
                                EventData::Http3FrameParsed(FrameParsed {
                                    stream_id,
                                    length: Some(payload_len),
                                    frame,
                                    ..Default::default()
                                });

                            q.add_event_data_now(ev_data).ok();
                        });
                    }

                    if let Err(e) = stream.set_frame_payload_len(payload_len) {
                        conn.close(true, e.to_wire(), b"")?;
                        return Err(e);
                    }
                },

                // Read and parse the frame payload, then act on the frame.
                stream::State::FramePayload => {
                    // Do not emit events when not polling.
                    if !polling {
                        break;
                    }

                    stream.try_fill_buffer(conn)?;

                    let (frame, payload_len) = match stream.try_consume_frame() {
                        Ok(frame) => frame,

                        // The frame cannot be consumed yet; report that
                        // there is nothing to do for now.
                        Err(Error::Done) => return Err(Error::Done),

                        Err(e) => {
                            conn.close(
                                true,
                                e.to_wire(),
                                b"Error handling frame.",
                            )?;

                            return Err(e);
                        },
                    };

                    match self.process_frame(conn, stream_id, frame, payload_len)
                    {
                        Ok(ev) => return Ok(ev),

                        Err(Error::Done) => {
                            // This might be a frame that is processed internally
                            // without needing to bubble up to the user as an
                            // event. Check whether the frame has FIN'd by QUIC
                            // to prevent trying to read again on a closed stream.
                            if conn.stream_finished(stream_id) {
                                break;
                            }
                        },

                        Err(e) => return Err(e),
                    };
                },

                // Report the Data event, if the stream allows triggering it.
                stream::State::Data => {
                    // Do not emit events when not polling.
                    if !polling {
                        break;
                    }

                    if !stream.try_trigger_data_event() {
                        break;
                    }

                    return Ok((stream_id, Event::Data));
                },

                stream::State::QpackInstruction => {
                    let mut d = [0; 4096];

                    // Read data from the stream and discard immediately.
                    //
                    // Only the number of received bytes is recorded. The
                    // loop has no `break`: it is left through `?`, either
                    // when `stream_recv()` returns an error or when the
                    // critical-stream handling below returns one.
                    loop {
                        let (recv, fin) = conn.stream_recv(stream_id, &mut d)?;

                        match stream.ty() {
                            Some(stream::Type::QpackEncoder) =>
                                self.peer_qpack_streams.encoder_stream_bytes +=
                                    recv as u64,
                            Some(stream::Type::QpackDecoder) =>
                                self.peer_qpack_streams.decoder_stream_bytes +=
                                    recv as u64,
                            _ => unreachable!(),
                        };

                        if fin {
                            close_conn_critical_stream(conn)?;
                        }
                    }
                },

                stream::State::Drain => {
                    // Discard incoming data on the stream.
                    //
                    // NOTE(review): 0x100 is presumably the H3_NO_ERROR
                    // code -- confirm against the error code definitions.
                    conn.stream_shutdown(
                        stream_id,
                        crate::Shutdown::Read,
                        0x100,
                    )?;

                    break;
                },

                stream::State::Finished => break,
            }
        }

        Err(Error::Done)
    }
2796
2797    fn process_finished_stream(&mut self, stream_id: u64) {
2798        let stream = match self.streams.get_mut(&stream_id) {
2799            Some(v) => v,
2800
2801            None => return,
2802        };
2803
2804        if stream.state() == stream::State::Finished {
2805            return;
2806        }
2807
2808        match stream.ty() {
2809            Some(stream::Type::Request) | Some(stream::Type::Push) => {
2810                stream.finished();
2811
2812                self.finished_streams.push_back(stream_id);
2813            },
2814
2815            _ => (),
2816        };
2817    }
2818
2819    fn process_frame<F: BufFactory>(
2820        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
2821        frame: frame::Frame, payload_len: u64,
2822    ) -> Result<(u64, Event)> {
2823        trace!(
2824            "{} rx frm {:?} stream={} payload_len={}",
2825            conn.trace_id(),
2826            frame,
2827            stream_id,
2828            payload_len
2829        );
2830
2831        qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
2832            // HEADERS frames are special case and will be logged below.
2833            if !matches!(frame, frame::Frame::Headers { .. }) {
2834                let frame = frame.to_qlog();
2835                let ev_data = EventData::Http3FrameParsed(FrameParsed {
2836                    stream_id,
2837                    length: Some(payload_len),
2838                    frame,
2839                    ..Default::default()
2840                });
2841
2842                q.add_event_data_now(ev_data).ok();
2843            }
2844        });
2845
2846        match frame {
2847            frame::Frame::Settings {
2848                max_field_section_size,
2849                qpack_max_table_capacity,
2850                qpack_blocked_streams,
2851                connect_protocol_enabled,
2852                h3_datagram,
2853                additional_settings,
2854                raw,
2855                ..
2856            } => {
2857                self.peer_settings = ConnectionSettings {
2858                    max_field_section_size,
2859                    qpack_max_table_capacity,
2860                    qpack_blocked_streams,
2861                    connect_protocol_enabled,
2862                    h3_datagram,
2863                    additional_settings,
2864                    raw,
2865                };
2866
2867                if let Some(1) = h3_datagram {
2868                    // The peer MUST have also enabled DATAGRAM with a TP
2869                    if conn.dgram_max_writable_len().is_none() {
2870                        conn.close(
2871                            true,
2872                            Error::SettingsError.to_wire(),
2873                            b"H3_DATAGRAM sent with value 1 but max_datagram_frame_size TP not set.",
2874                        )?;
2875
2876                        return Err(Error::SettingsError);
2877                    }
2878                }
2879            },
2880
2881            frame::Frame::Headers { header_block } => {
2882                // Servers reject too many HEADERS frames.
2883                if let Some(s) = self.streams.get_mut(&stream_id) {
2884                    if self.is_server && s.headers_received_count() == 2 {
2885                        conn.close(
2886                            true,
2887                            Error::FrameUnexpected.to_wire(),
2888                            b"Too many HEADERS frames",
2889                        )?;
2890                        return Err(Error::FrameUnexpected);
2891                    }
2892
2893                    s.increment_headers_received();
2894                }
2895
2896                // Use "infinite" as default value for max_field_section_size if
2897                // it is not configured by the application.
2898                let max_size = self
2899                    .local_settings
2900                    .max_field_section_size
2901                    .unwrap_or(u64::MAX);
2902
2903                let headers = match self
2904                    .qpack_decoder
2905                    .decode(&header_block[..], max_size)
2906                {
2907                    Ok(v) => v,
2908
2909                    Err(e) => {
2910                        let e = match e {
2911                            qpack::Error::HeaderListTooLarge =>
2912                                Error::ExcessiveLoad,
2913
2914                            _ => Error::QpackDecompressionFailed,
2915                        };
2916
2917                        conn.close(true, e.to_wire(), b"Error parsing headers.")?;
2918
2919                        return Err(e);
2920                    },
2921                };
2922
2923                qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
2924                    let qlog_headers = headers
2925                        .iter()
2926                        .map(|h| qlog::events::http3::HttpHeader {
2927                            name: Some(
2928                                String::from_utf8_lossy(h.name()).into_owned(),
2929                            ),
2930                            name_bytes: None,
2931                            value: Some(
2932                                String::from_utf8_lossy(h.value()).into_owned(),
2933                            ),
2934                            value_bytes: None,
2935                        })
2936                        .collect();
2937
2938                    let frame = Http3Frame::Headers {
2939                        headers: qlog_headers,
2940                    };
2941
2942                    let ev_data = EventData::Http3FrameParsed(FrameParsed {
2943                        stream_id,
2944                        length: Some(payload_len),
2945                        frame,
2946                        ..Default::default()
2947                    });
2948
2949                    q.add_event_data_now(ev_data).ok();
2950                });
2951
2952                let more_frames = !conn.stream_finished(stream_id);
2953
2954                return Ok((stream_id, Event::Headers {
2955                    list: headers,
2956                    more_frames,
2957                }));
2958            },
2959
2960            frame::Frame::Data { .. } => {
2961                // Do nothing. The Data event is returned separately.
2962            },
2963
2964            frame::Frame::GoAway { id } => {
2965                if !self.is_server && id % 4 != 0 {
2966                    conn.close(
2967                        true,
2968                        Error::FrameUnexpected.to_wire(),
2969                        b"GOAWAY received with ID of non-request stream",
2970                    )?;
2971
2972                    return Err(Error::IdError);
2973                }
2974
2975                if let Some(received_id) = self.peer_goaway_id {
2976                    if id > received_id {
2977                        conn.close(
2978                            true,
2979                            Error::IdError.to_wire(),
2980                            b"GOAWAY received with ID larger than previously received",
2981                        )?;
2982
2983                        return Err(Error::IdError);
2984                    }
2985                }
2986
2987                self.peer_goaway_id = Some(id);
2988
2989                return Ok((id, Event::GoAway));
2990            },
2991
2992            frame::Frame::MaxPushId { push_id } => {
2993                if !self.is_server {
2994                    conn.close(
2995                        true,
2996                        Error::FrameUnexpected.to_wire(),
2997                        b"MAX_PUSH_ID received by client",
2998                    )?;
2999
3000                    return Err(Error::FrameUnexpected);
3001                }
3002
3003                if push_id < self.max_push_id {
3004                    conn.close(
3005                        true,
3006                        Error::IdError.to_wire(),
3007                        b"MAX_PUSH_ID reduced limit",
3008                    )?;
3009
3010                    return Err(Error::IdError);
3011                }
3012
3013                self.max_push_id = push_id;
3014            },
3015
3016            frame::Frame::PushPromise { .. } => {
3017                if self.is_server {
3018                    conn.close(
3019                        true,
3020                        Error::FrameUnexpected.to_wire(),
3021                        b"PUSH_PROMISE received by server",
3022                    )?;
3023
3024                    return Err(Error::FrameUnexpected);
3025                }
3026
3027                if stream_id % 4 != 0 {
3028                    conn.close(
3029                        true,
3030                        Error::FrameUnexpected.to_wire(),
3031                        b"PUSH_PROMISE received on non-request stream",
3032                    )?;
3033
3034                    return Err(Error::FrameUnexpected);
3035                }
3036
3037                // TODO: implement more checks and PUSH_PROMISE event
3038            },
3039
3040            frame::Frame::CancelPush { .. } => {
3041                // TODO: implement CANCEL_PUSH frame
3042            },
3043
3044            frame::Frame::PriorityUpdateRequest {
3045                prioritized_element_id,
3046                priority_field_value,
3047            } => {
3048                if !self.is_server {
3049                    conn.close(
3050                        true,
3051                        Error::FrameUnexpected.to_wire(),
3052                        b"PRIORITY_UPDATE received by client",
3053                    )?;
3054
3055                    return Err(Error::FrameUnexpected);
3056                }
3057
3058                if prioritized_element_id % 4 != 0 {
3059                    conn.close(
3060                        true,
3061                        Error::FrameUnexpected.to_wire(),
3062                        b"PRIORITY_UPDATE for request stream type with wrong ID",
3063                    )?;
3064
3065                    return Err(Error::FrameUnexpected);
3066                }
3067
3068                if prioritized_element_id > conn.streams.max_streams_bidi() * 4 {
3069                    conn.close(
3070                        true,
3071                        Error::IdError.to_wire(),
3072                        b"PRIORITY_UPDATE for request stream beyond max streams limit",
3073                    )?;
3074
3075                    return Err(Error::IdError);
3076                }
3077
3078                // If the PRIORITY_UPDATE is valid, consider storing the latest
3079                // contents. Due to reordering, it is possible that we might
3080                // receive frames that reference streams that have not yet to
3081                // been opened and that's OK because it's within our concurrency
3082                // limit. However, we discard PRIORITY_UPDATE that refers to
3083                // streams that we know have been collected.
3084                if conn.streams.is_collected(prioritized_element_id) {
3085                    return Err(Error::Done);
3086                }
3087
3088                // If the stream did not yet exist, create it and store.
3089                let stream =
3090                    self.streams.entry(prioritized_element_id).or_insert_with(
3091                        || <stream::Stream>::new(prioritized_element_id, false),
3092                    );
3093
3094                let had_priority_update = stream.has_last_priority_update();
3095                stream.set_last_priority_update(Some(priority_field_value));
3096
3097                // Only trigger the event when there wasn't already a stored
3098                // PRIORITY_UPDATE.
3099                if !had_priority_update {
3100                    return Ok((prioritized_element_id, Event::PriorityUpdate));
3101                } else {
3102                    return Err(Error::Done);
3103                }
3104            },
3105
3106            frame::Frame::PriorityUpdatePush {
3107                prioritized_element_id,
3108                ..
3109            } => {
3110                if !self.is_server {
3111                    conn.close(
3112                        true,
3113                        Error::FrameUnexpected.to_wire(),
3114                        b"PRIORITY_UPDATE received by client",
3115                    )?;
3116
3117                    return Err(Error::FrameUnexpected);
3118                }
3119
3120                if prioritized_element_id % 3 != 0 {
3121                    conn.close(
3122                        true,
3123                        Error::FrameUnexpected.to_wire(),
3124                        b"PRIORITY_UPDATE for push stream type with wrong ID",
3125                    )?;
3126
3127                    return Err(Error::FrameUnexpected);
3128                }
3129
3130                // TODO: we only implement this if we implement server push
3131            },
3132
3133            frame::Frame::Unknown { .. } => (),
3134        }
3135
3136        Err(Error::Done)
3137    }
3138
3139    /// Collects and returns statistics about the connection.
3140    #[inline]
3141    pub fn stats(&self) -> Stats {
3142        Stats {
3143            qpack_encoder_stream_recv_bytes: self
3144                .peer_qpack_streams
3145                .encoder_stream_bytes,
3146            qpack_decoder_stream_recv_bytes: self
3147                .peer_qpack_streams
3148                .decoder_stream_bytes,
3149        }
3150    }
3151}
3152
3153/// Generates an HTTP/3 GREASE variable length integer.
3154pub fn grease_value() -> u64 {
3155    let n = super::rand::rand_u64_uniform(148_764_065_110_560_899);
3156    31 * n + 33
3157}
3158
3159#[doc(hidden)]
3160#[cfg(any(test, feature = "internal"))]
3161pub mod testing {
3162    use super::*;
3163
3164    use crate::test_utils;
3165    use crate::DefaultBufFactory;
3166
    /// Session is an HTTP/3 test helper structure. It holds a client, server
    /// and pipe that allows them to communicate.
    ///
    /// `new()` creates a session with some sensible default
    /// configuration. `with_configs()` allows for providing a specific
    /// configuration.
    ///
    /// `handshake()` performs all the steps needed to establish an HTTP/3
    /// connection.
    ///
    /// Some utility functions are provided that make it less verbose to send
    /// requests, responses and individual headers. The full quiche API
    /// remains available for any test that needs to do unconventional things
    /// (such as bad behaviour that triggers errors).
    pub struct Session<F = DefaultBufFactory>
    where
        F: BufFactory,
    {
        /// Transport-level pipe holding the client and server QUIC
        /// connections.
        pub pipe: test_utils::Pipe<F>,
        /// HTTP/3 connection driven on top of `pipe.client`.
        pub client: Connection,
        /// HTTP/3 connection driven on top of `pipe.server`.
        pub server: Connection,
    }
3189
3190    impl Session {
3191        pub fn new() -> Result<Session> {
3192            Session::<DefaultBufFactory>::new_with_buf()
3193        }
3194
3195        pub fn with_configs(
3196            config: &mut crate::Config, h3_config: &Config,
3197        ) -> Result<Session> {
3198            Session::<DefaultBufFactory>::with_configs_and_buf(config, h3_config)
3199        }
3200    }
3201
3202    impl<F: BufFactory> Session<F> {
3203        pub fn new_with_buf() -> Result<Session<F>> {
3204            fn path_relative_to_manifest_dir(path: &str) -> String {
3205                std::fs::canonicalize(
3206                    std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join(path),
3207                )
3208                .unwrap()
3209                .to_string_lossy()
3210                .into_owned()
3211            }
3212
3213            let mut config = crate::Config::new(crate::PROTOCOL_VERSION)?;
3214            config.load_cert_chain_from_pem_file(
3215                &path_relative_to_manifest_dir("examples/cert.crt"),
3216            )?;
3217            config.load_priv_key_from_pem_file(
3218                &path_relative_to_manifest_dir("examples/cert.key"),
3219            )?;
3220            config.set_application_protos(&[b"h3"])?;
3221            config.set_initial_max_data(1500);
3222            config.set_initial_max_stream_data_bidi_local(150);
3223            config.set_initial_max_stream_data_bidi_remote(150);
3224            config.set_initial_max_stream_data_uni(150);
3225            config.set_initial_max_streams_bidi(5);
3226            config.set_initial_max_streams_uni(5);
3227            config.verify_peer(false);
3228            config.enable_dgram(true, 3, 3);
3229            config.set_ack_delay_exponent(8);
3230
3231            let h3_config = Config::new()?;
3232            Session::with_configs_and_buf(&mut config, &h3_config)
3233        }
3234
3235        pub fn with_configs_and_buf(
3236            config: &mut crate::Config, h3_config: &Config,
3237        ) -> Result<Session<F>> {
3238            let pipe = test_utils::Pipe::with_config_and_buf(config)?;
3239            let client_dgram = pipe.client.dgram_enabled();
3240            let server_dgram = pipe.server.dgram_enabled();
3241            Ok(Session {
3242                pipe,
3243                client: Connection::new(h3_config, false, client_dgram)?,
3244                server: Connection::new(h3_config, true, server_dgram)?,
3245            })
3246        }
3247
        /// Do the HTTP/3 handshake so both ends are in sane initial state.
        ///
        /// After the pipe's own handshake, each side sends its SETTINGS and
        /// opens its QPACK encoder and decoder streams, plus a GREASE stream
        /// when GREASE is enabled on its QUIC connection. Both sides are
        /// then polled until no more events are reported.
        ///
        /// Errors from sending settings or opening streams are returned;
        /// errors from advancing the pipe are ignored.
        pub fn handshake(&mut self) -> Result<()> {
            self.pipe.handshake()?;

            // Client streams.
            self.client.send_settings(&mut self.pipe.client)?;
            self.pipe.advance().ok();

            self.client
                .open_qpack_encoder_stream(&mut self.pipe.client)?;
            self.pipe.advance().ok();

            self.client
                .open_qpack_decoder_stream(&mut self.pipe.client)?;
            self.pipe.advance().ok();

            // The GREASE stream is only opened when GREASE is enabled.
            if self.pipe.client.grease {
                self.client.open_grease_stream(&mut self.pipe.client)?;
            }

            self.pipe.advance().ok();

            // Server streams.
            self.server.send_settings(&mut self.pipe.server)?;
            self.pipe.advance().ok();

            self.server
                .open_qpack_encoder_stream(&mut self.pipe.server)?;
            self.pipe.advance().ok();

            self.server
                .open_qpack_decoder_stream(&mut self.pipe.server)?;
            self.pipe.advance().ok();

            if self.pipe.server.grease {
                self.server.open_grease_stream(&mut self.pipe.server)?;
            }

            self.advance().ok();

            // Drain every pending event on both sides; polling stops at the
            // first error.
            while self.client.poll(&mut self.pipe.client).is_ok() {
                // Do nothing.
            }

            while self.server.poll(&mut self.pipe.server).is_ok() {
                // Do nothing.
            }

            Ok(())
        }
3298
3299        /// Advances the session pipe over the buffer.
3300        pub fn advance(&mut self) -> crate::Result<()> {
3301            self.pipe.advance()
3302        }
3303
3304        /// Polls the client for events.
3305        pub fn poll_client(&mut self) -> Result<(u64, Event)> {
3306            self.client.poll(&mut self.pipe.client)
3307        }
3308
3309        /// Polls the server for events.
3310        pub fn poll_server(&mut self) -> Result<(u64, Event)> {
3311            self.server.poll(&mut self.pipe.server)
3312        }
3313
3314        /// Sends a request from client with default headers.
3315        ///
3316        /// On success it returns the newly allocated stream and the headers.
3317        pub fn send_request(&mut self, fin: bool) -> Result<(u64, Vec<Header>)> {
3318            let req = vec![
3319                Header::new(b":method", b"GET"),
3320                Header::new(b":scheme", b"https"),
3321                Header::new(b":authority", b"quic.tech"),
3322                Header::new(b":path", b"/test"),
3323                Header::new(b"user-agent", b"quiche-test"),
3324            ];
3325
3326            let stream =
3327                self.client.send_request(&mut self.pipe.client, &req, fin)?;
3328
3329            self.advance().ok();
3330
3331            Ok((stream, req))
3332        }
3333
3334        /// Sends a response from server with default headers.
3335        ///
3336        /// On success it returns the headers.
3337        pub fn send_response(
3338            &mut self, stream: u64, fin: bool,
3339        ) -> Result<Vec<Header>> {
3340            let resp = vec![
3341                Header::new(b":status", b"200"),
3342                Header::new(b"server", b"quiche-test"),
3343            ];
3344
3345            self.server.send_response(
3346                &mut self.pipe.server,
3347                stream,
3348                &resp,
3349                fin,
3350            )?;
3351
3352            self.advance().ok();
3353
3354            Ok(resp)
3355        }
3356
3357        /// Sends some default payload from client.
3358        ///
3359        /// On success it returns the payload.
3360        pub fn send_body_client(
3361            &mut self, stream: u64, fin: bool,
3362        ) -> Result<Vec<u8>> {
3363            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3364
3365            self.client
3366                .send_body(&mut self.pipe.client, stream, &bytes, fin)?;
3367
3368            self.advance().ok();
3369
3370            Ok(bytes)
3371        }
3372
3373        /// Fetches DATA payload from the server.
3374        ///
3375        /// On success it returns the number of bytes received.
3376        pub fn recv_body_client(
3377            &mut self, stream: u64, buf: &mut [u8],
3378        ) -> Result<usize> {
3379            self.client.recv_body(&mut self.pipe.client, stream, buf)
3380        }
3381
3382        /// Sends some default payload from server.
3383        ///
3384        /// On success it returns the payload.
3385        pub fn send_body_server(
3386            &mut self, stream: u64, fin: bool,
3387        ) -> Result<Vec<u8>> {
3388            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3389
3390            self.server
3391                .send_body(&mut self.pipe.server, stream, &bytes, fin)?;
3392
3393            self.advance().ok();
3394
3395            Ok(bytes)
3396        }
3397
3398        /// Fetches DATA payload from the client.
3399        ///
3400        /// On success it returns the number of bytes received.
3401        pub fn recv_body_server(
3402            &mut self, stream: u64, buf: &mut [u8],
3403        ) -> Result<usize> {
3404            self.server.recv_body(&mut self.pipe.server, stream, buf)
3405        }
3406
3407        /// Sends a single HTTP/3 frame from the client.
3408        pub fn send_frame_client(
3409            &mut self, frame: frame::Frame, stream_id: u64, fin: bool,
3410        ) -> Result<()> {
3411            let mut d = [42; 65535];
3412
3413            let mut b = octets::OctetsMut::with_slice(&mut d);
3414
3415            frame.to_bytes(&mut b)?;
3416
3417            let off = b.off();
3418            self.pipe.client.stream_send(stream_id, &d[..off], fin)?;
3419
3420            self.advance().ok();
3421
3422            Ok(())
3423        }
3424
3425        /// Send an HTTP/3 DATAGRAM with default data from the client.
3426        ///
3427        /// On success it returns the data.
3428        pub fn send_dgram_client(&mut self, flow_id: u64) -> Result<Vec<u8>> {
3429            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3430            let len = octets::varint_len(flow_id) + bytes.len();
3431            let mut d = vec![0; len];
3432            let mut b = octets::OctetsMut::with_slice(&mut d);
3433
3434            b.put_varint(flow_id)?;
3435            b.put_bytes(&bytes)?;
3436
3437            self.pipe.client.dgram_send(&d)?;
3438
3439            self.advance().ok();
3440
3441            Ok(bytes)
3442        }
3443
3444        /// Receives an HTTP/3 DATAGRAM from the server.
3445        ///
3446        /// On success it returns the DATAGRAM length, flow ID and flow ID
3447        /// length.
3448        pub fn recv_dgram_client(
3449            &mut self, buf: &mut [u8],
3450        ) -> Result<(usize, u64, usize)> {
3451            let len = self.pipe.client.dgram_recv(buf)?;
3452            let mut b = octets::Octets::with_slice(buf);
3453            let flow_id = b.get_varint()?;
3454
3455            Ok((len, flow_id, b.off()))
3456        }
3457
3458        /// Send an HTTP/3 DATAGRAM with default data from the server
3459        ///
3460        /// On success it returns the data.
3461        pub fn send_dgram_server(&mut self, flow_id: u64) -> Result<Vec<u8>> {
3462            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3463            let len = octets::varint_len(flow_id) + bytes.len();
3464            let mut d = vec![0; len];
3465            let mut b = octets::OctetsMut::with_slice(&mut d);
3466
3467            b.put_varint(flow_id)?;
3468            b.put_bytes(&bytes)?;
3469
3470            self.pipe.server.dgram_send(&d)?;
3471
3472            self.advance().ok();
3473
3474            Ok(bytes)
3475        }
3476
3477        /// Receives an HTTP/3 DATAGRAM from the client.
3478        ///
3479        /// On success it returns the DATAGRAM length, flow ID and flow ID
3480        /// length.
3481        pub fn recv_dgram_server(
3482            &mut self, buf: &mut [u8],
3483        ) -> Result<(usize, u64, usize)> {
3484            let len = self.pipe.server.dgram_recv(buf)?;
3485            let mut b = octets::Octets::with_slice(buf);
3486            let flow_id = b.get_varint()?;
3487
3488            Ok((len, flow_id, b.off()))
3489        }
3490
3491        /// Sends a single HTTP/3 frame from the server.
3492        pub fn send_frame_server(
3493            &mut self, frame: frame::Frame, stream_id: u64, fin: bool,
3494        ) -> Result<()> {
3495            let mut d = [42; 65535];
3496
3497            let mut b = octets::OctetsMut::with_slice(&mut d);
3498
3499            frame.to_bytes(&mut b)?;
3500
3501            let off = b.off();
3502            self.pipe.server.stream_send(stream_id, &d[..off], fin)?;
3503
3504            self.advance().ok();
3505
3506            Ok(())
3507        }
3508
3509        /// Sends an arbitrary buffer of HTTP/3 stream data from the client.
3510        pub fn send_arbitrary_stream_data_client(
3511            &mut self, data: &[u8], stream_id: u64, fin: bool,
3512        ) -> Result<()> {
3513            self.pipe.client.stream_send(stream_id, data, fin)?;
3514
3515            self.advance().ok();
3516
3517            Ok(())
3518        }
3519
3520        /// Sends an arbitrary buffer of HTTP/3 stream data from the server.
3521        pub fn send_arbitrary_stream_data_server(
3522            &mut self, data: &[u8], stream_id: u64, fin: bool,
3523        ) -> Result<()> {
3524            self.pipe.server.stream_send(stream_id, data, fin)?;
3525
3526            self.advance().ok();
3527
3528            Ok(())
3529        }
3530    }
3531}
3532
3533#[cfg(test)]
3534mod tests {
3535    use super::*;
3536
3537    use super::testing::*;
3538
3539    #[test]
3540    /// Make sure that random GREASE values is within the specified limit.
3541    fn grease_value_in_varint_limit() {
3542        assert!(grease_value() < 2u64.pow(62) - 1);
3543    }
3544
3545    #[cfg(not(feature = "openssl"))] // 0-RTT not supported when using openssl/quictls
3546    #[test]
3547    fn h3_handshake_0rtt() {
3548        let mut buf = [0; 65535];
3549
3550        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
3551        config
3552            .load_cert_chain_from_pem_file("examples/cert.crt")
3553            .unwrap();
3554        config
3555            .load_priv_key_from_pem_file("examples/cert.key")
3556            .unwrap();
3557        config
3558            .set_application_protos(&[b"proto1", b"proto2"])
3559            .unwrap();
3560        config.set_initial_max_data(30);
3561        config.set_initial_max_stream_data_bidi_local(15);
3562        config.set_initial_max_stream_data_bidi_remote(15);
3563        config.set_initial_max_stream_data_uni(15);
3564        config.set_initial_max_streams_bidi(3);
3565        config.set_initial_max_streams_uni(3);
3566        config.enable_early_data();
3567        config.verify_peer(false);
3568
3569        let h3_config = Config::new().unwrap();
3570
3571        // Perform initial handshake.
3572        let mut pipe = crate::test_utils::Pipe::with_config(&mut config).unwrap();
3573        assert_eq!(pipe.handshake(), Ok(()));
3574
3575        // Extract session,
3576        let session = pipe.client.session().unwrap();
3577
3578        // Configure session on new connection.
3579        let mut pipe = crate::test_utils::Pipe::with_config(&mut config).unwrap();
3580        assert_eq!(pipe.client.set_session(session), Ok(()));
3581
3582        // Can't create an H3 connection until the QUIC connection is determined
3583        // to have made sufficient early data progress.
3584        assert!(matches!(
3585            Connection::with_transport(&mut pipe.client, &h3_config),
3586            Err(Error::InternalError)
3587        ));
3588
3589        // Client sends initial flight.
3590        let (len, _) = pipe.client.send(&mut buf).unwrap();
3591
3592        // Now an H3 connection can be created.
3593        assert!(Connection::with_transport(&mut pipe.client, &h3_config).is_ok());
3594        assert_eq!(pipe.server_recv(&mut buf[..len]), Ok(len));
3595
3596        // Client sends 0-RTT packet.
3597        let pkt_type = crate::packet::Type::ZeroRTT;
3598
3599        let frames = [crate::frame::Frame::Stream {
3600            stream_id: 6,
3601            data: <crate::range_buf::RangeBuf>::from(b"aaaaa", 0, true),
3602        }];
3603
3604        assert_eq!(
3605            pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
3606            Ok(1200)
3607        );
3608
3609        assert_eq!(pipe.server.undecryptable_pkts.len(), 0);
3610
3611        // 0-RTT stream data is readable.
3612        let mut r = pipe.server.readable();
3613        assert_eq!(r.next(), Some(6));
3614        assert_eq!(r.next(), None);
3615
3616        let mut b = [0; 15];
3617        assert_eq!(pipe.server.stream_recv(6, &mut b), Ok((5, true)));
3618        assert_eq!(&b[..5], b"aaaaa");
3619    }
3620
3621    #[test]
3622    /// Send a request with no body, get a response with no body.
3623    fn request_no_body_response_no_body() {
3624        let mut s = Session::new().unwrap();
3625        s.handshake().unwrap();
3626
3627        let (stream, req) = s.send_request(true).unwrap();
3628
3629        assert_eq!(stream, 0);
3630
3631        let ev_headers = Event::Headers {
3632            list: req,
3633            more_frames: false,
3634        };
3635
3636        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3637        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3638
3639        let resp = s.send_response(stream, true).unwrap();
3640
3641        let ev_headers = Event::Headers {
3642            list: resp,
3643            more_frames: false,
3644        };
3645
3646        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3647        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3648        assert_eq!(s.poll_client(), Err(Error::Done));
3649    }
3650
3651    #[test]
3652    /// Send a request with no body, get a response with one DATA frame.
3653    fn request_no_body_response_one_chunk() {
3654        let mut s = Session::new().unwrap();
3655        s.handshake().unwrap();
3656
3657        let (stream, req) = s.send_request(true).unwrap();
3658        assert_eq!(stream, 0);
3659
3660        let ev_headers = Event::Headers {
3661            list: req,
3662            more_frames: false,
3663        };
3664
3665        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3666
3667        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3668
3669        let resp = s.send_response(stream, false).unwrap();
3670
3671        let body = s.send_body_server(stream, true).unwrap();
3672
3673        let mut recv_buf = vec![0; body.len()];
3674
3675        let ev_headers = Event::Headers {
3676            list: resp,
3677            more_frames: true,
3678        };
3679
3680        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3681
3682        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
3683        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
3684
3685        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3686        assert_eq!(s.poll_client(), Err(Error::Done));
3687    }
3688
3689    #[test]
3690    /// Send a request with no body, get a response with multiple DATA frames.
3691    fn request_no_body_response_many_chunks() {
3692        let mut s = Session::new().unwrap();
3693        s.handshake().unwrap();
3694
3695        let (stream, req) = s.send_request(true).unwrap();
3696
3697        let ev_headers = Event::Headers {
3698            list: req,
3699            more_frames: false,
3700        };
3701
3702        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3703        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3704
3705        let total_data_frames = 4;
3706
3707        let resp = s.send_response(stream, false).unwrap();
3708
3709        for _ in 0..total_data_frames - 1 {
3710            s.send_body_server(stream, false).unwrap();
3711        }
3712
3713        let body = s.send_body_server(stream, true).unwrap();
3714
3715        let mut recv_buf = vec![0; body.len()];
3716
3717        let ev_headers = Event::Headers {
3718            list: resp,
3719            more_frames: true,
3720        };
3721
3722        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3723        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
3724        assert_eq!(s.poll_client(), Err(Error::Done));
3725
3726        for _ in 0..total_data_frames {
3727            assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
3728        }
3729
3730        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3731        assert_eq!(s.poll_client(), Err(Error::Done));
3732    }
3733
3734    #[test]
3735    /// Send a request with one DATA frame, get a response with no body.
3736    fn request_one_chunk_response_no_body() {
3737        let mut s = Session::new().unwrap();
3738        s.handshake().unwrap();
3739
3740        let (stream, req) = s.send_request(false).unwrap();
3741
3742        let body = s.send_body_client(stream, true).unwrap();
3743
3744        let mut recv_buf = vec![0; body.len()];
3745
3746        let ev_headers = Event::Headers {
3747            list: req,
3748            more_frames: true,
3749        };
3750
3751        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3752
3753        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
3754        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
3755
3756        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3757
3758        let resp = s.send_response(stream, true).unwrap();
3759
3760        let ev_headers = Event::Headers {
3761            list: resp,
3762            more_frames: false,
3763        };
3764
3765        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3766        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3767    }
3768
3769    #[test]
3770    /// Send a request with multiple DATA frames, get a response with no body.
3771    fn request_many_chunks_response_no_body() {
3772        let mut s = Session::new().unwrap();
3773        s.handshake().unwrap();
3774
3775        let (stream, req) = s.send_request(false).unwrap();
3776
3777        let total_data_frames = 4;
3778
3779        for _ in 0..total_data_frames - 1 {
3780            s.send_body_client(stream, false).unwrap();
3781        }
3782
3783        let body = s.send_body_client(stream, true).unwrap();
3784
3785        let mut recv_buf = vec![0; body.len()];
3786
3787        let ev_headers = Event::Headers {
3788            list: req,
3789            more_frames: true,
3790        };
3791
3792        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3793        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
3794        assert_eq!(s.poll_server(), Err(Error::Done));
3795
3796        for _ in 0..total_data_frames {
3797            assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
3798        }
3799
3800        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3801
3802        let resp = s.send_response(stream, true).unwrap();
3803
3804        let ev_headers = Event::Headers {
3805            list: resp,
3806            more_frames: false,
3807        };
3808
3809        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3810        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3811    }
3812
3813    #[test]
3814    /// Send a request with multiple DATA frames, get a response with one DATA
3815    /// frame.
3816    fn many_requests_many_chunks_response_one_chunk() {
3817        let mut s = Session::new().unwrap();
3818        s.handshake().unwrap();
3819
3820        let mut reqs = Vec::new();
3821
3822        let (stream1, req1) = s.send_request(false).unwrap();
3823        assert_eq!(stream1, 0);
3824        reqs.push(req1);
3825
3826        let (stream2, req2) = s.send_request(false).unwrap();
3827        assert_eq!(stream2, 4);
3828        reqs.push(req2);
3829
3830        let (stream3, req3) = s.send_request(false).unwrap();
3831        assert_eq!(stream3, 8);
3832        reqs.push(req3);
3833
3834        let body = s.send_body_client(stream1, false).unwrap();
3835        s.send_body_client(stream2, false).unwrap();
3836        s.send_body_client(stream3, false).unwrap();
3837
3838        let mut recv_buf = vec![0; body.len()];
3839
3840        // Reverse order of writes.
3841
3842        s.send_body_client(stream3, true).unwrap();
3843        s.send_body_client(stream2, true).unwrap();
3844        s.send_body_client(stream1, true).unwrap();
3845
3846        let (_, ev) = s.poll_server().unwrap();
3847        let ev_headers = Event::Headers {
3848            list: reqs[0].clone(),
3849            more_frames: true,
3850        };
3851        assert_eq!(ev, ev_headers);
3852
3853        let (_, ev) = s.poll_server().unwrap();
3854        let ev_headers = Event::Headers {
3855            list: reqs[1].clone(),
3856            more_frames: true,
3857        };
3858        assert_eq!(ev, ev_headers);
3859
3860        let (_, ev) = s.poll_server().unwrap();
3861        let ev_headers = Event::Headers {
3862            list: reqs[2].clone(),
3863            more_frames: true,
3864        };
3865        assert_eq!(ev, ev_headers);
3866
3867        assert_eq!(s.poll_server(), Ok((0, Event::Data)));
3868        assert_eq!(s.recv_body_server(0, &mut recv_buf), Ok(body.len()));
3869        assert_eq!(s.poll_client(), Err(Error::Done));
3870        assert_eq!(s.recv_body_server(0, &mut recv_buf), Ok(body.len()));
3871        assert_eq!(s.poll_server(), Ok((0, Event::Finished)));
3872
3873        assert_eq!(s.poll_server(), Ok((4, Event::Data)));
3874        assert_eq!(s.recv_body_server(4, &mut recv_buf), Ok(body.len()));
3875        assert_eq!(s.poll_client(), Err(Error::Done));
3876        assert_eq!(s.recv_body_server(4, &mut recv_buf), Ok(body.len()));
3877        assert_eq!(s.poll_server(), Ok((4, Event::Finished)));
3878
3879        assert_eq!(s.poll_server(), Ok((8, Event::Data)));
3880        assert_eq!(s.recv_body_server(8, &mut recv_buf), Ok(body.len()));
3881        assert_eq!(s.poll_client(), Err(Error::Done));
3882        assert_eq!(s.recv_body_server(8, &mut recv_buf), Ok(body.len()));
3883        assert_eq!(s.poll_server(), Ok((8, Event::Finished)));
3884
3885        assert_eq!(s.poll_server(), Err(Error::Done));
3886
3887        let mut resps = Vec::new();
3888
3889        let resp1 = s.send_response(stream1, true).unwrap();
3890        resps.push(resp1);
3891
3892        let resp2 = s.send_response(stream2, true).unwrap();
3893        resps.push(resp2);
3894
3895        let resp3 = s.send_response(stream3, true).unwrap();
3896        resps.push(resp3);
3897
3898        for _ in 0..resps.len() {
3899            let (stream, ev) = s.poll_client().unwrap();
3900            let ev_headers = Event::Headers {
3901                list: resps[(stream / 4) as usize].clone(),
3902                more_frames: false,
3903            };
3904            assert_eq!(ev, ev_headers);
3905            assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3906        }
3907
3908        assert_eq!(s.poll_client(), Err(Error::Done));
3909    }
3910
3911    #[test]
3912    /// Send a request with no body, get a response with one DATA frame and an
3913    /// empty FIN after reception from the client.
3914    fn request_no_body_response_one_chunk_empty_fin() {
3915        let mut s = Session::new().unwrap();
3916        s.handshake().unwrap();
3917
3918        let (stream, req) = s.send_request(true).unwrap();
3919
3920        let ev_headers = Event::Headers {
3921            list: req,
3922            more_frames: false,
3923        };
3924
3925        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3926        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3927
3928        let resp = s.send_response(stream, false).unwrap();
3929
3930        let body = s.send_body_server(stream, false).unwrap();
3931
3932        let mut recv_buf = vec![0; body.len()];
3933
3934        let ev_headers = Event::Headers {
3935            list: resp,
3936            more_frames: true,
3937        };
3938
3939        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3940
3941        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
3942        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
3943
3944        assert_eq!(s.pipe.server.stream_send(stream, &[], true), Ok(0));
3945        s.advance().ok();
3946
3947        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3948        assert_eq!(s.poll_client(), Err(Error::Done));
3949    }
3950
3951    #[test]
3952    /// Send a request with no body, get a response with no body followed by
3953    /// GREASE that is STREAM frame with a FIN.
3954    fn request_no_body_response_no_body_with_grease() {
3955        let mut s = Session::new().unwrap();
3956        s.handshake().unwrap();
3957
3958        let (stream, req) = s.send_request(true).unwrap();
3959
3960        assert_eq!(stream, 0);
3961
3962        let ev_headers = Event::Headers {
3963            list: req,
3964            more_frames: false,
3965        };
3966
3967        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3968        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3969
3970        let resp = s.send_response(stream, false).unwrap();
3971
3972        let ev_headers = Event::Headers {
3973            list: resp,
3974            more_frames: true,
3975        };
3976
3977        // Inject a GREASE frame
3978        let mut d = [42; 10];
3979        let mut b = octets::OctetsMut::with_slice(&mut d);
3980
3981        let frame_type = b.put_varint(148_764_065_110_560_899).unwrap();
3982        s.pipe.server.stream_send(0, frame_type, false).unwrap();
3983
3984        let frame_len = b.put_varint(10).unwrap();
3985        s.pipe.server.stream_send(0, frame_len, false).unwrap();
3986
3987        s.pipe.server.stream_send(0, &d, true).unwrap();
3988
3989        s.advance().ok();
3990
3991        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3992        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3993        assert_eq!(s.poll_client(), Err(Error::Done));
3994    }
3995
3996    #[test]
3997    /// Try to send DATA frames before HEADERS.
3998    fn body_response_before_headers() {
3999        let mut s = Session::new().unwrap();
4000        s.handshake().unwrap();
4001
4002        let (stream, req) = s.send_request(true).unwrap();
4003        assert_eq!(stream, 0);
4004
4005        let ev_headers = Event::Headers {
4006            list: req,
4007            more_frames: false,
4008        };
4009
4010        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4011
4012        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4013
4014        assert_eq!(
4015            s.send_body_server(stream, true),
4016            Err(Error::FrameUnexpected)
4017        );
4018
4019        assert_eq!(s.poll_client(), Err(Error::Done));
4020    }
4021
4022    #[test]
4023    /// Try to send DATA frames on wrong streams, ensure the API returns an
4024    /// error before anything hits the transport layer.
4025    fn send_body_invalid_client_stream() {
4026        let mut s = Session::new().unwrap();
4027        s.handshake().unwrap();
4028
4029        assert_eq!(s.send_body_client(0, true), Err(Error::FrameUnexpected));
4030
4031        assert_eq!(
4032            s.send_body_client(s.client.control_stream_id.unwrap(), true),
4033            Err(Error::FrameUnexpected)
4034        );
4035
4036        assert_eq!(
4037            s.send_body_client(
4038                s.client.local_qpack_streams.encoder_stream_id.unwrap(),
4039                true
4040            ),
4041            Err(Error::FrameUnexpected)
4042        );
4043
4044        assert_eq!(
4045            s.send_body_client(
4046                s.client.local_qpack_streams.decoder_stream_id.unwrap(),
4047                true
4048            ),
4049            Err(Error::FrameUnexpected)
4050        );
4051
4052        assert_eq!(
4053            s.send_body_client(s.client.peer_control_stream_id.unwrap(), true),
4054            Err(Error::FrameUnexpected)
4055        );
4056
4057        assert_eq!(
4058            s.send_body_client(
4059                s.client.peer_qpack_streams.encoder_stream_id.unwrap(),
4060                true
4061            ),
4062            Err(Error::FrameUnexpected)
4063        );
4064
4065        assert_eq!(
4066            s.send_body_client(
4067                s.client.peer_qpack_streams.decoder_stream_id.unwrap(),
4068                true
4069            ),
4070            Err(Error::FrameUnexpected)
4071        );
4072    }
4073
4074    #[test]
4075    /// Try to send DATA frames on wrong streams, ensure the API returns an
4076    /// error before anything hits the transport layer.
4077    fn send_body_invalid_server_stream() {
4078        let mut s = Session::new().unwrap();
4079        s.handshake().unwrap();
4080
4081        assert_eq!(s.send_body_server(0, true), Err(Error::FrameUnexpected));
4082
4083        assert_eq!(
4084            s.send_body_server(s.server.control_stream_id.unwrap(), true),
4085            Err(Error::FrameUnexpected)
4086        );
4087
4088        assert_eq!(
4089            s.send_body_server(
4090                s.server.local_qpack_streams.encoder_stream_id.unwrap(),
4091                true
4092            ),
4093            Err(Error::FrameUnexpected)
4094        );
4095
4096        assert_eq!(
4097            s.send_body_server(
4098                s.server.local_qpack_streams.decoder_stream_id.unwrap(),
4099                true
4100            ),
4101            Err(Error::FrameUnexpected)
4102        );
4103
4104        assert_eq!(
4105            s.send_body_server(s.server.peer_control_stream_id.unwrap(), true),
4106            Err(Error::FrameUnexpected)
4107        );
4108
4109        assert_eq!(
4110            s.send_body_server(
4111                s.server.peer_qpack_streams.encoder_stream_id.unwrap(),
4112                true
4113            ),
4114            Err(Error::FrameUnexpected)
4115        );
4116
4117        assert_eq!(
4118            s.send_body_server(
4119                s.server.peer_qpack_streams.decoder_stream_id.unwrap(),
4120                true
4121            ),
4122            Err(Error::FrameUnexpected)
4123        );
4124    }
4125
4126    #[test]
4127    /// Client sends request with body and trailers.
4128    fn trailers() {
4129        let mut s = Session::new().unwrap();
4130        s.handshake().unwrap();
4131
4132        let (stream, req) = s.send_request(false).unwrap();
4133
4134        let body = s.send_body_client(stream, false).unwrap();
4135
4136        let mut recv_buf = vec![0; body.len()];
4137
4138        let req_trailers = vec![Header::new(b"foo", b"bar")];
4139
4140        s.client
4141            .send_additional_headers(
4142                &mut s.pipe.client,
4143                stream,
4144                &req_trailers,
4145                true,
4146                true,
4147            )
4148            .unwrap();
4149
4150        s.advance().ok();
4151
4152        let ev_headers = Event::Headers {
4153            list: req,
4154            more_frames: true,
4155        };
4156
4157        let ev_trailers = Event::Headers {
4158            list: req_trailers,
4159            more_frames: false,
4160        };
4161
4162        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4163
4164        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
4165        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
4166
4167        assert_eq!(s.poll_server(), Ok((stream, ev_trailers)));
4168        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4169    }
4170
4171    #[test]
4172    /// Server responds with a 103, then a 200 with no body.
        ///
        /// The 103 is sent with `send_response` and the 200 with
        /// `send_additional_headers`; the client is expected to see two
        /// `Headers` events, then `Finished`, then `Done`.
4173    fn informational_response() {
4174        let mut s = Session::new().unwrap();
4175        s.handshake().unwrap();
4176
4177        let (stream, req) = s.send_request(true).unwrap();
4178
4179        assert_eq!(stream, 0);
4180
4181        let ev_headers = Event::Headers {
4182            list: req,
4183            more_frames: false,
4184        };
4185
            // The server sees the complete request before responding.
4186        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4187        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4188
4189        let info_resp = vec![
4190            Header::new(b":status", b"103"),
4191            Header::new(b"link", b"<https://example.com>; rel=\"preconnect\""),
4192        ];
4193
4194        let resp = vec![
4195            Header::new(b":status", b"200"),
4196            Header::new(b"server", b"quiche-test"),
4197        ];
4198
            // Informational (103) response goes out first.
4199        s.server
4200            .send_response(&mut s.pipe.server, stream, &info_resp, false)
4201            .unwrap();
4202
            // The final (200) response follows as additional headers.
            // NOTE(review): the boolean arguments are presumably
            // (is_trailer_section = false, fin = true) — confirm.
4203        s.server
4204            .send_additional_headers(
4205                &mut s.pipe.server,
4206                stream,
4207                &resp,
4208                false,
4209                true,
4210            )
4211            .unwrap();
4212
4213        s.advance().ok();
4214
4215        let ev_info_headers = Event::Headers {
4216            list: info_resp,
4217            more_frames: true,
4218        };
4219
4220        let ev_headers = Event::Headers {
4221            list: resp,
4222            more_frames: false,
4223        };
4224
4225        assert_eq!(s.poll_client(), Ok((stream, ev_info_headers)));
4226        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
4227        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
4228        assert_eq!(s.poll_client(), Err(Error::Done));
4229    }
4230
4231    #[test]
4232    /// Server responds with a 103, then attempts to send a 200 using
4233    /// send_response again, which should fail.
        ///
        /// The second `send_response` call is expected to return
        /// `Error::FrameUnexpected`, and the client should only ever see the
        /// 103 headers.
4234    fn no_multiple_response() {
4235        let mut s = Session::new().unwrap();
4236        s.handshake().unwrap();
4237
4238        let (stream, req) = s.send_request(true).unwrap();
4239
4240        assert_eq!(stream, 0);
4241
4242        let ev_headers = Event::Headers {
4243            list: req,
4244            more_frames: false,
4245        };
4246
4247        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4248        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4249
4250        let info_resp = vec![
4251            Header::new(b":status", b"103"),
4252            Header::new(b"link", b"<https://example.com>; rel=\"preconnect\""),
4253        ];
4254
4255        let resp = vec![
4256            Header::new(b":status", b"200"),
4257            Header::new(b"server", b"quiche-test"),
4258        ];
4259
            // First call on this stream succeeds.
4260        s.server
4261            .send_response(&mut s.pipe.server, stream, &info_resp, false)
4262            .unwrap();
4263
            // Second call on the same stream must be rejected.
4264        assert_eq!(
4265            Err(Error::FrameUnexpected),
4266            s.server
4267                .send_response(&mut s.pipe.server, stream, &resp, true)
4268        );
4269
4270        s.advance().ok();
4271
4272        let ev_info_headers = Event::Headers {
4273            list: info_resp,
4274            more_frames: true,
4275        };
4276
            // Only the 103 reaches the client; nothing else is pending.
4277        assert_eq!(s.poll_client(), Ok((stream, ev_info_headers)));
4278        assert_eq!(s.poll_client(), Err(Error::Done));
4279    }
4280
4281    #[test]
4282    /// Server attempts to use send_additional_headers before initial response.
        ///
        /// The call is expected to fail with `Error::FrameUnexpected` and the
        /// client should receive no event.
4283    fn no_send_additional_before_initial_response() {
4284        let mut s = Session::new().unwrap();
4285        s.handshake().unwrap();
4286
4287        let (stream, req) = s.send_request(true).unwrap();
4288
4289        assert_eq!(stream, 0);
4290
4291        let ev_headers = Event::Headers {
4292            list: req,
4293            more_frames: false,
4294        };
4295
4296        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4297        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4298
4299        let info_resp = vec![
4300            Header::new(b":status", b"103"),
4301            Header::new(b"link", b"<https://example.com>; rel=\"preconnect\""),
4302        ];
4303
            // No `send_response` has been issued on this stream yet, so the
            // additional headers must be refused.
4304        assert_eq!(
4305            Err(Error::FrameUnexpected),
4306            s.server.send_additional_headers(
4307                &mut s.pipe.server,
4308                stream,
4309                &info_resp,
4310                false,
4311                false
4312            )
4313        );
4314
4315        s.advance().ok();
4316
4317        assert_eq!(s.poll_client(), Err(Error::Done));
4318    }
4319
4320    #[test]
4321    /// Client sends multiple HEADERS before data.
        ///
        /// A second header list is sent right after the request, with no body
        /// in between. The server is expected to surface both lists as
        /// `Headers` events with `more_frames: true`, then report `Done`.
4322    fn additional_headers_before_data_client() {
4323        let mut s = Session::new().unwrap();
4324        s.handshake().unwrap();
4325
4326        let (stream, req) = s.send_request(false).unwrap();
4327
4328        let req_trailer = vec![Header::new(b"goodbye", b"world")];
4329
            // NOTE(review): the boolean arguments are presumably
            // (is_trailer_section = true, fin = false) — confirm.
4330        assert_eq!(
4331            s.client.send_additional_headers(
4332                &mut s.pipe.client,
4333                stream,
4334                &req_trailer,
4335                true,
4336                false
4337            ),
4338            Ok(())
4339        );
4340
4341        s.advance().ok();
4342
4343        let ev_initial_headers = Event::Headers {
4344            list: req,
4345            more_frames: true,
4346        };
4347
4348        let ev_trailing_headers = Event::Headers {
4349            list: req_trailer,
4350            more_frames: true,
4351        };
4352
4353        assert_eq!(s.poll_server(), Ok((stream, ev_initial_headers)));
4354        assert_eq!(s.poll_server(), Ok((stream, ev_trailing_headers)));
4355        assert_eq!(s.poll_server(), Err(Error::Done));
4356    }
4357
4358    #[test]
4359    /// Client sends a DATA frame after the trailers, which is rejected.
        ///
        /// The server is expected to process the headers, body and trailers
        /// normally, and then fail with `Error::FrameUnexpected` when it
        /// reaches the late DATA frame.
4360    fn data_after_trailers_client() {
4361        let mut s = Session::new().unwrap();
4362        s.handshake().unwrap();
4363
4364        let (stream, req) = s.send_request(false).unwrap();
4365
4366        let body = s.send_body_client(stream, false).unwrap();
4367
4368        let mut recv_buf = vec![0; body.len()];
4369
4370        let req_trailers = vec![Header::new(b"foo", b"bar")];
4371
            // Trailers are sent while leaving the stream open, so that another
            // frame can follow.
            // NOTE(review): the last argument is presumably `fin` — confirm.
4372        s.client
4373            .send_additional_headers(
4374                &mut s.pipe.client,
4375                stream,
4376                &req_trailers,
4377                true,
4378                false,
4379            )
4380            .unwrap();
4381
4382        s.advance().ok();
4383
            // The offending frame: DATA written after the trailers.
4384        s.send_frame_client(
4385            frame::Frame::Data {
4386                payload: vec![1, 2, 3, 4],
4387            },
4388            stream,
4389            true,
4390        )
4391        .unwrap();
4392
4393        let ev_headers = Event::Headers {
4394            list: req,
4395            more_frames: true,
4396        };
4397
4398        let ev_trailers = Event::Headers {
4399            list: req_trailers,
4400            more_frames: true,
4401        };
4402
4403        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4404        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
4405        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
4406        assert_eq!(s.poll_server(), Ok((stream, ev_trailers)));
4407        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4408    }
4409
4410    #[test]
4411    /// Send a MAX_PUSH_ID frame from the client on a valid stream.
        ///
        /// The frame is written to the client's control stream; the server is
        /// expected to accept it without raising an event (`Error::Done`).
4412    fn max_push_id_from_client_good() {
4413        let mut s = Session::new().unwrap();
4414        s.handshake().unwrap();
4415
4416        s.send_frame_client(
4417            frame::Frame::MaxPushId { push_id: 1 },
4418            s.client.control_stream_id.unwrap(),
4419            false,
4420        )
4421        .unwrap();
4422
4423        assert_eq!(s.poll_server(), Err(Error::Done));
4424    }
4425
4426    #[test]
4427    /// Send a MAX_PUSH_ID frame from the client on an invalid stream.
        ///
        /// The frame is written to a request stream instead of the control
        /// stream; the server is expected to deliver the request headers and
        /// then fail with `Error::FrameUnexpected`.
4428    fn max_push_id_from_client_bad_stream() {
4429        let mut s = Session::new().unwrap();
4430        s.handshake().unwrap();
4431
4432        let (stream, req) = s.send_request(false).unwrap();
4433
            // MAX_PUSH_ID goes onto the request stream opened above.
4434        s.send_frame_client(
4435            frame::Frame::MaxPushId { push_id: 2 },
4436            stream,
4437            false,
4438        )
4439        .unwrap();
4440
4441        let ev_headers = Event::Headers {
4442            list: req,
4443            more_frames: true,
4444        };
4445
4446        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4447        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4448    }
4449
4450    #[test]
4451    /// Send a sequence of MAX_PUSH_ID frames from the client that attempt to
4452    /// reduce the limit.
        ///
        /// A push ID of 2 is followed by a push ID of 1 on the control stream;
        /// the server is expected to fail with `Error::IdError`.
4453    fn max_push_id_from_client_limit_reduction() {
4454        let mut s = Session::new().unwrap();
4455        s.handshake().unwrap();
4456
4457        s.send_frame_client(
4458            frame::Frame::MaxPushId { push_id: 2 },
4459            s.client.control_stream_id.unwrap(),
4460            false,
4461        )
4462        .unwrap();
4463
            // Lower value than the one sent just above.
4464        s.send_frame_client(
4465            frame::Frame::MaxPushId { push_id: 1 },
4466            s.client.control_stream_id.unwrap(),
4467            false,
4468        )
4469        .unwrap();
4470
4471        assert_eq!(s.poll_server(), Err(Error::IdError));
4472    }
4473
4474    #[test]
4475    /// Send a MAX_PUSH_ID frame from the server, which is forbidden.
        ///
        /// The frame is written to the server's control stream; the client is
        /// expected to fail with `Error::FrameUnexpected`.
4476    fn max_push_id_from_server() {
4477        let mut s = Session::new().unwrap();
4478        s.handshake().unwrap();
4479
4480        s.send_frame_server(
4481            frame::Frame::MaxPushId { push_id: 1 },
4482            s.server.control_stream_id.unwrap(),
4483            false,
4484        )
4485        .unwrap();
4486
4487        assert_eq!(s.poll_client(), Err(Error::FrameUnexpected));
4488    }
4489
4490    #[test]
4491    /// Send a PUSH_PROMISE frame from the client, which is forbidden.
        ///
        /// The frame is written to a request stream after the request headers;
        /// the server is expected to deliver those headers and then fail with
        /// `Error::FrameUnexpected`.
4492    fn push_promise_from_client() {
4493        let mut s = Session::new().unwrap();
4494        s.handshake().unwrap();
4495
4496        let (stream, req) = s.send_request(false).unwrap();
4497
            // Reuse the request headers as the promised header block.
4498        let header_block = s.client.encode_header_block(&req).unwrap();
4499
4500        s.send_frame_client(
4501            frame::Frame::PushPromise {
4502                push_id: 1,
4503                header_block,
4504            },
4505            stream,
4506            false,
4507        )
4508        .unwrap();
4509
4510        let ev_headers = Event::Headers {
4511            list: req,
4512            more_frames: true,
4513        };
4514
4515        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4516        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4517    }
4518
4519    #[test]
4520    /// Send a CANCEL_PUSH frame from the client.
        ///
        /// The frame is written to the client's control stream; the server is
        /// expected to raise no event and no error (`Error::Done`).
4521    fn cancel_push_from_client() {
4522        let mut s = Session::new().unwrap();
4523        s.handshake().unwrap();
4524
4525        s.send_frame_client(
4526            frame::Frame::CancelPush { push_id: 1 },
4527            s.client.control_stream_id.unwrap(),
4528            false,
4529        )
4530        .unwrap();
4531
4532        assert_eq!(s.poll_server(), Err(Error::Done));
4533    }
4534
4535    #[test]
4536    /// Send a CANCEL_PUSH frame from the client on an invalid stream.
        ///
        /// The frame is written to a request stream instead of the control
        /// stream; the server is expected to deliver the request headers and
        /// then fail with `Error::FrameUnexpected`.
4537    fn cancel_push_from_client_bad_stream() {
4538        let mut s = Session::new().unwrap();
4539        s.handshake().unwrap();
4540
4541        let (stream, req) = s.send_request(false).unwrap();
4542
            // CANCEL_PUSH goes onto the request stream opened above.
4543        s.send_frame_client(
4544            frame::Frame::CancelPush { push_id: 2 },
4545            stream,
4546            false,
4547        )
4548        .unwrap();
4549
4550        let ev_headers = Event::Headers {
4551            list: req,
4552            more_frames: true,
4553        };
4554
4555        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4556        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4557    }
4558
4559    #[test]
4560    /// Send a CANCEL_PUSH frame from the server.
        ///
        /// The frame is written to the server's control stream; the client is
        /// expected to raise no event and no error (`Error::Done`).
4561    fn cancel_push_from_server() {
4562        let mut s = Session::new().unwrap();
4563        s.handshake().unwrap();
4564
4565        s.send_frame_server(
4566            frame::Frame::CancelPush { push_id: 1 },
4567            s.server.control_stream_id.unwrap(),
4568            false,
4569        )
4570        .unwrap();
4571
4572        assert_eq!(s.poll_client(), Err(Error::Done));
4573    }
4574
4575    #[test]
4576    /// Send a GOAWAY frame from the client.
        ///
        /// The server is expected to surface an `Event::GoAway`.
4577    fn goaway_from_client_good() {
4578        let mut s = Session::new().unwrap();
4579        s.handshake().unwrap();
4580
4581        s.client.send_goaway(&mut s.pipe.client, 100).unwrap();
4582
4583        s.advance().ok();
4584
            // NOTE(review): the client passes 100 above but the expected event
            // carries 0 — presumably tied to the server-push TODO; confirm.
4585        // TODO: server push
4586        assert_eq!(s.poll_server(), Ok((0, Event::GoAway)));
4587    }
4588
4589    #[test]
4590    /// Send a GOAWAY frame from the server.
        ///
        /// The client is expected to surface an `Event::GoAway` carrying the
        /// same ID (4000) that the server sent.
4591    fn goaway_from_server_good() {
4592        let mut s = Session::new().unwrap();
4593        s.handshake().unwrap();
4594
4595        s.server.send_goaway(&mut s.pipe.server, 4000).unwrap();
4596
4597        s.advance().ok();
4598
4599        assert_eq!(s.poll_client(), Ok((4000, Event::GoAway)));
4600    }
4601
4602    #[test]
4603    /// A client MUST NOT send a request after it receives GOAWAY.
        ///
        /// Once the client has polled the `GoAway` event, `send_request` is
        /// expected to fail with `Error::FrameUnexpected`.
4604    fn client_request_after_goaway() {
4605        let mut s = Session::new().unwrap();
4606        s.handshake().unwrap();
4607
4608        s.server.send_goaway(&mut s.pipe.server, 4000).unwrap();
4609
4610        s.advance().ok();
4611
4612        assert_eq!(s.poll_client(), Ok((4000, Event::GoAway)));
4613
            // Any further request attempt is refused locally.
4614        assert_eq!(s.send_request(true), Err(Error::FrameUnexpected));
4615    }
4616
4617    #[test]
4618    /// Send a GOAWAY frame from the server, using an invalid goaway ID.
        ///
        /// A raw GOAWAY frame with ID 1 is written to the server's control
        /// stream; the client is expected to fail with `Error::IdError`.
4619    fn goaway_from_server_invalid_id() {
4620        let mut s = Session::new().unwrap();
4621        s.handshake().unwrap();
4622
            // NOTE(review): 1 is presumably invalid because it is not a
            // client-initiated bidirectional stream ID — confirm.
4623        s.send_frame_server(
4624            frame::Frame::GoAway { id: 1 },
4625            s.server.control_stream_id.unwrap(),
4626            false,
4627        )
4628        .unwrap();
4629
4630        assert_eq!(s.poll_client(), Err(Error::IdError));
4631    }
4632
4633    #[test]
4634    /// Send multiple GOAWAY frames from the server, that increase the goaway
4635    /// ID.
        ///
        /// The first frame (ID 0) is expected to produce a `GoAway` event; the
        /// second frame (ID 4) is expected to fail with `Error::IdError`.
4636    fn goaway_from_server_increase_id() {
4637        let mut s = Session::new().unwrap();
4638        s.handshake().unwrap();
4639
4640        s.send_frame_server(
4641            frame::Frame::GoAway { id: 0 },
4642            s.server.control_stream_id.unwrap(),
4643            false,
4644        )
4645        .unwrap();
4646
            // Larger ID than the one sent just above.
4647        s.send_frame_server(
4648            frame::Frame::GoAway { id: 4 },
4649            s.server.control_stream_id.unwrap(),
4650            false,
4651        )
4652        .unwrap();
4653
4654        assert_eq!(s.poll_client(), Ok((0, Event::GoAway)));
4655
4656        assert_eq!(s.poll_client(), Err(Error::IdError));
4657    }
4658
4659    #[test]
4660    #[cfg(feature = "sfv")]
        /// Parse priority field values into `Priority` via `try_from`.
        ///
        /// Covers well-formed dictionaries (including the empty value, member
        /// parameters and unknown keys) as well as malformed or out-of-range
        /// input. Only compiled when the `sfv` feature is enabled.
4661    fn parse_priority_field_value() {
4662        // Legal dicts
            // Urgency only: the incremental flag is expected to be false.
4663        assert_eq!(
4664            Ok(Priority::new(0, false)),
4665            Priority::try_from(b"u=0".as_slice())
4666        );
4667        assert_eq!(
4668            Ok(Priority::new(3, false)),
4669            Priority::try_from(b"u=3".as_slice())
4670        );
4671        assert_eq!(
4672            Ok(Priority::new(7, false)),
4673            Priority::try_from(b"u=7".as_slice())
4674        );
4675
            // A bare `i` member sets the incremental flag.
4676        assert_eq!(
4677            Ok(Priority::new(0, true)),
4678            Priority::try_from(b"u=0, i".as_slice())
4679        );
4680        assert_eq!(
4681            Ok(Priority::new(3, true)),
4682            Priority::try_from(b"u=3, i".as_slice())
4683        );
4684        assert_eq!(
4685            Ok(Priority::new(7, true)),
4686            Priority::try_from(b"u=7, i".as_slice())
4687        );
4688
            // The explicit boolean form `i=?1` gives the same result.
4689        assert_eq!(
4690            Ok(Priority::new(0, true)),
4691            Priority::try_from(b"u=0, i=?1".as_slice())
4692        );
4693        assert_eq!(
4694            Ok(Priority::new(3, true)),
4695            Priority::try_from(b"u=3, i=?1".as_slice())
4696        );
4697        assert_eq!(
4698            Ok(Priority::new(7, true)),
4699            Priority::try_from(b"u=7, i=?1".as_slice())
4700        );
4701
            // An empty value is expected to parse as urgency 3,
            // non-incremental.
4702        assert_eq!(
4703            Ok(Priority::new(3, false)),
4704            Priority::try_from(b"".as_slice())
4705        );
4706
            // Parameters attached to members do not change the result.
4707        assert_eq!(
4708            Ok(Priority::new(0, true)),
4709            Priority::try_from(b"u=0;foo, i;bar".as_slice())
4710        );
4711        assert_eq!(
4712            Ok(Priority::new(3, true)),
4713            Priority::try_from(b"u=3;hello, i;world".as_slice())
4714        );
4715        assert_eq!(
4716            Ok(Priority::new(7, true)),
4717            Priority::try_from(b"u=7;croeso, i;gymru".as_slice())
4718        );
4719
            // Unknown dictionary keys do not change the result.
4720        assert_eq!(
4721            Ok(Priority::new(0, true)),
4722            Priority::try_from(b"u=0, i, spinaltap=11".as_slice())
4723        );
4724
4725        // Illegal formats. Note that out-of-range integer urgencies ("u=-1",
            // "u=100") are not errors: they are expected to parse as urgency 7.
4726        assert_eq!(Err(Error::Done), Priority::try_from(b"0".as_slice()));
4727        assert_eq!(
4728            Ok(Priority::new(7, false)),
4729            Priority::try_from(b"u=-1".as_slice())
4730        );
4731        assert_eq!(Err(Error::Done), Priority::try_from(b"u=0.2".as_slice()));
4732        assert_eq!(
4733            Ok(Priority::new(7, false)),
4734            Priority::try_from(b"u=100".as_slice())
4735        );
4736        assert_eq!(
4737            Err(Error::Done),
4738            Priority::try_from(b"u=3, i=true".as_slice())
4739        );
4740
4741        // Trailing comma in dict is malformed
4742        assert_eq!(Err(Error::Done), Priority::try_from(b"u=7, ".as_slice()));
4743    }
4744
4745    #[test]
4746    /// Send a PRIORITY_UPDATE for request stream from the client.
        ///
        /// The server is expected to raise a single `PriorityUpdate` event for
        /// stream 0 and then report `Done`.
4747    fn priority_update_request() {
4748        let mut s = Session::new().unwrap();
4749        s.handshake().unwrap();
4750
4751        s.client
4752            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4753                urgency: 3,
4754                incremental: false,
4755            })
4756            .unwrap();
4757        s.advance().ok();
4758
4759        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4760        assert_eq!(s.poll_server(), Err(Error::Done));
4761    }
4762
4763    #[test]
4764    /// Send several PRIORITY_UPDATE frames for the same request stream.
        ///
        /// A second update that arrives before the first one has been taken
        /// is expected to raise no new event; taking the pending update
        /// returns the most recent value and re-arms the event.
4765    fn priority_update_single_stream_rearm() {
4766        let mut s = Session::new().unwrap();
4767        s.handshake().unwrap();
4768
4769        s.client
4770            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4771                urgency: 3,
4772                incremental: false,
4773            })
4774            .unwrap();
4775        s.advance().ok();
4776
4777        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4778        assert_eq!(s.poll_server(), Err(Error::Done));
4779
            // Second update (u=5), sent while the first is still untaken.
4780        s.client
4781            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4782                urgency: 5,
4783                incremental: false,
4784            })
4785            .unwrap();
4786        s.advance().ok();
4787
            // No additional event is expected for it.
4788        assert_eq!(s.poll_server(), Err(Error::Done));
4789
4790        // There is only one PRIORITY_UPDATE frame to read. Once read, the event
4791        // will rearm ready for more.
4792        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=5".to_vec()));
4793        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4794
            // Third update (u=7): the event fires again now that the previous
            // value has been taken.
4795        s.client
4796            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4797                urgency: 7,
4798                incremental: false,
4799            })
4800            .unwrap();
4801        s.advance().ok();
4802
4803        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4804        assert_eq!(s.poll_server(), Err(Error::Done));
4805
4806        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=7".to_vec()));
4807        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4808    }
4809
4810    #[test]
4811    /// Send multiple PRIORITY_UPDATE frames for different streams from the
4812    /// client across multiple flights of exchange.
        ///
        /// Updates for streams 0, 4 and 8 are sent one at a time, each
        /// followed by `advance` and a poll. Each is expected to raise its own
        /// event, and all three values should still be retrievable afterwards.
4813    fn priority_update_request_multiple_stream_arm_multiple_flights() {
4814        let mut s = Session::new().unwrap();
4815        s.handshake().unwrap();
4816
4817        s.client
4818            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4819                urgency: 3,
4820                incremental: false,
4821            })
4822            .unwrap();
4823        s.advance().ok();
4824
4825        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4826        assert_eq!(s.poll_server(), Err(Error::Done));
4827
4828        s.client
4829            .send_priority_update_for_request(&mut s.pipe.client, 4, &Priority {
4830                urgency: 1,
4831                incremental: false,
4832            })
4833            .unwrap();
4834        s.advance().ok();
4835
4836        assert_eq!(s.poll_server(), Ok((4, Event::PriorityUpdate)));
4837        assert_eq!(s.poll_server(), Err(Error::Done));
4838
4839        s.client
4840            .send_priority_update_for_request(&mut s.pipe.client, 8, &Priority {
4841                urgency: 2,
4842                incremental: false,
4843            })
4844            .unwrap();
4845        s.advance().ok();
4846
4847        assert_eq!(s.poll_server(), Ok((8, Event::PriorityUpdate)));
4848        assert_eq!(s.poll_server(), Err(Error::Done));
4849
            // Each stream keeps its own pending value; a second take on stream
            // 0 finds nothing left.
4850        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
4851        assert_eq!(s.server.take_last_priority_update(4), Ok(b"u=1".to_vec()));
4852        assert_eq!(s.server.take_last_priority_update(8), Ok(b"u=2".to_vec()));
4853        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4854    }
4855
4856    #[test]
4857    /// Send multiple PRIORITY_UPDATE frames for different streams from the
4858    /// client across a single flight.
        ///
        /// Three frames (for streams 0, 4 and 8) are serialized back to back
        /// into one buffer and written to the control stream with a single
        /// `stream_send`. The server is expected to raise one event per stream.
4859    fn priority_update_request_multiple_stream_arm_single_flight() {
4860        let mut s = Session::new().unwrap();
4861        s.handshake().unwrap();
4862
            // Scratch buffer that the three frames are encoded into.
4863        let mut d = [42; 65535];
4864
4865        let mut b = octets::OctetsMut::with_slice(&mut d);
4866
4867        let p1 = frame::Frame::PriorityUpdateRequest {
4868            prioritized_element_id: 0,
4869            priority_field_value: b"u=3".to_vec(),
4870        };
4871
4872        let p2 = frame::Frame::PriorityUpdateRequest {
4873            prioritized_element_id: 4,
4874            priority_field_value: b"u=3".to_vec(),
4875        };
4876
4877        let p3 = frame::Frame::PriorityUpdateRequest {
4878            prioritized_element_id: 8,
4879            priority_field_value: b"u=3".to_vec(),
4880        };
4881
4882        p1.to_bytes(&mut b).unwrap();
4883        p2.to_bytes(&mut b).unwrap();
4884        p3.to_bytes(&mut b).unwrap();
4885
            // Only the encoded prefix of the buffer is sent.
4886        let off = b.off();
4887        s.pipe
4888            .client
4889            .stream_send(s.client.control_stream_id.unwrap(), &d[..off], false)
4890            .unwrap();
4891
4892        s.advance().ok();
4893
4894        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4895        assert_eq!(s.poll_server(), Ok((4, Event::PriorityUpdate)));
4896        assert_eq!(s.poll_server(), Ok((8, Event::PriorityUpdate)));
4897        assert_eq!(s.poll_server(), Err(Error::Done));
4898
4899        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
4900        assert_eq!(s.server.take_last_priority_update(4), Ok(b"u=3".to_vec()));
4901        assert_eq!(s.server.take_last_priority_update(8), Ok(b"u=3".to_vec()));
4902
4903        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4904    }
4905
4906    #[test]
4907    /// Send a PRIORITY_UPDATE for a request stream, before and after the stream
4908    /// has been completed.
        ///
        /// The update sent before the request is expected to raise an event;
        /// the update sent after the request/response exchange has finished is
        /// expected to raise none.
4909    fn priority_update_request_collected_completed() {
4910        let mut s = Session::new().unwrap();
4911        s.handshake().unwrap();
4912
            // The update for stream 0 is sent before the request itself.
4913        s.client
4914            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4915                urgency: 3,
4916                incremental: false,
4917            })
4918            .unwrap();
4919        s.advance().ok();
4920
4921        let (stream, req) = s.send_request(true).unwrap();
4922        let ev_headers = Event::Headers {
4923            list: req,
4924            more_frames: false,
4925        };
4926
4927        // Priority event is generated before request headers.
4928        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4929        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4930        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4931        assert_eq!(s.poll_server(), Err(Error::Done));
4932
4933        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
4934        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4935
            // Complete the exchange with a response.
4936        let resp = s.send_response(stream, true).unwrap();
4937
4938        let ev_headers = Event::Headers {
4939            list: resp,
4940            more_frames: false,
4941        };
4942
4943        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
4944        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
4945        assert_eq!(s.poll_client(), Err(Error::Done));
4946
4947        // Now send a PRIORITY_UPDATE for the completed request stream.
4948        s.client
4949            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4950                urgency: 3,
4951                incremental: false,
4952            })
4953            .unwrap();
4954        s.advance().ok();
4955
4956        // No event generated at server
4957        assert_eq!(s.poll_server(), Err(Error::Done));
4958    }
4959
4960    #[test]
4961    /// Send a PRIORITY_UPDATE for a request stream, before and after the stream
4962    /// has been stopped.
        ///
        /// The client shuts down both directions of the request stream with
        /// error code 0x100. The server is expected to see `Reset(0x100)`, to
        /// raise no event for the later update, and both endpoints should
        /// report stream 0 as collected.
4963    fn priority_update_request_collected_stopped() {
4964        let mut s = Session::new().unwrap();
4965        s.handshake().unwrap();
4966
4967        s.client
4968            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4969                urgency: 3,
4970                incremental: false,
4971            })
4972            .unwrap();
4973        s.advance().ok();
4974
4975        let (stream, req) = s.send_request(false).unwrap();
4976        let ev_headers = Event::Headers {
4977            list: req,
4978            more_frames: true,
4979        };
4980
4981        // Priority event is generated before request headers.
4982        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4983        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4984        assert_eq!(s.poll_server(), Err(Error::Done));
4985
4986        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
4987        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4988
            // Abort the stream from the client in both directions.
4989        s.pipe
4990            .client
4991            .stream_shutdown(stream, crate::Shutdown::Write, 0x100)
4992            .unwrap();
4993        s.pipe
4994            .client
4995            .stream_shutdown(stream, crate::Shutdown::Read, 0x100)
4996            .unwrap();
4997
4998        s.advance().ok();
4999
5000        assert_eq!(s.poll_server(), Ok((0, Event::Reset(0x100))));
5001        assert_eq!(s.poll_server(), Err(Error::Done));
5002
5003        // Now send a PRIORITY_UPDATE for the closed request stream.
5004        s.client
5005            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
5006                urgency: 3,
5007                incremental: false,
5008            })
5009            .unwrap();
5010        s.advance().ok();
5011
5012        // No event generated at server
5013        assert_eq!(s.poll_server(), Err(Error::Done));
5014
5015        assert!(s.pipe.server.streams.is_collected(0));
5016        assert!(s.pipe.client.streams.is_collected(0));
5017    }
5018
5019    #[test]
5020    /// Send a PRIORITY_UPDATE for push stream from the client.
        ///
        /// The frame (element ID 3) is written to the client's control stream;
        /// the server is expected to raise no event and no error
        /// (`Error::Done`).
5021    fn priority_update_push() {
5022        let mut s = Session::new().unwrap();
5023        s.handshake().unwrap();
5024
5025        s.send_frame_client(
5026            frame::Frame::PriorityUpdatePush {
5027                prioritized_element_id: 3,
5028                priority_field_value: b"u=3".to_vec(),
5029            },
5030            s.client.control_stream_id.unwrap(),
5031            false,
5032        )
5033        .unwrap();
5034
5035        assert_eq!(s.poll_server(), Err(Error::Done));
5036    }
5037
5038    #[test]
5039    /// Send a PRIORITY_UPDATE for request stream from the client but for an
5040    /// incorrect stream type.
        ///
        /// The frame names element ID 5; the server is expected to fail with
        /// `Error::FrameUnexpected`.
5041    fn priority_update_request_bad_stream() {
5042        let mut s = Session::new().unwrap();
5043        s.handshake().unwrap();
5044
            // NOTE(review): 5 is presumably not a valid request (client
            // bidirectional) stream ID — confirm.
5045        s.send_frame_client(
5046            frame::Frame::PriorityUpdateRequest {
5047                prioritized_element_id: 5,
5048                priority_field_value: b"u=3".to_vec(),
5049            },
5050            s.client.control_stream_id.unwrap(),
5051            false,
5052        )
5053        .unwrap();
5054
5055        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
5056    }
5057
5058    #[test]
5059    /// Send a PRIORITY_UPDATE for push stream from the client but for an
5060    /// incorrect stream type.
        ///
        /// The frame names element ID 5; the server is expected to fail with
        /// `Error::FrameUnexpected`.
5061    fn priority_update_push_bad_stream() {
5062        let mut s = Session::new().unwrap();
5063        s.handshake().unwrap();
5064
            // NOTE(review): why element ID 5 is rejected for a push update is
            // not visible here — confirm against the frame handling code.
5065        s.send_frame_client(
5066            frame::Frame::PriorityUpdatePush {
5067                prioritized_element_id: 5,
5068                priority_field_value: b"u=3".to_vec(),
5069            },
5070            s.client.control_stream_id.unwrap(),
5071            false,
5072        )
5073        .unwrap();
5074
5075        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
5076    }
5077
5078    #[test]
5079    /// Send a PRIORITY_UPDATE for request stream from the server.
        ///
        /// The frame is written to the server's control stream; the client is
        /// expected to fail with `Error::FrameUnexpected`.
5080    fn priority_update_request_from_server() {
5081        let mut s = Session::new().unwrap();
5082        s.handshake().unwrap();
5083
5084        s.send_frame_server(
5085            frame::Frame::PriorityUpdateRequest {
5086                prioritized_element_id: 0,
5087                priority_field_value: b"u=3".to_vec(),
5088            },
5089            s.server.control_stream_id.unwrap(),
5090            false,
5091        )
5092        .unwrap();
5093
5094        assert_eq!(s.poll_client(), Err(Error::FrameUnexpected));
5095    }
5096
5097    #[test]
5098    /// Send a PRIORITY_UPDATE for push stream from the server.
        ///
        /// The frame is written to the server's control stream; the client is
        /// expected to fail with `Error::FrameUnexpected`.
5099    fn priority_update_push_from_server() {
5100        let mut s = Session::new().unwrap();
5101        s.handshake().unwrap();
5102
5103        s.send_frame_server(
5104            frame::Frame::PriorityUpdatePush {
5105                prioritized_element_id: 0,
5106                priority_field_value: b"u=3".to_vec(),
5107            },
5108            s.server.control_stream_id.unwrap(),
5109            false,
5110        )
5111        .unwrap();
5112
5113        assert_eq!(s.poll_client(), Err(Error::FrameUnexpected));
5114    }
5115
5116    #[test]
5117    /// Ensure quiche allocates streams for client and server roles as expected.
        ///
        /// A freshly created connection is expected to start its
        /// unidirectional stream counter at 2 for the client and 3 for the
        /// server.
5118    fn uni_stream_local_counting() {
5119        let config = Config::new().unwrap();
5120
            // NOTE(review): the second argument is presumably `is_server`
            // (false = client) — confirm against `Connection::new`.
5121        let h3_cln = Connection::new(&config, false, false).unwrap();
5122        assert_eq!(h3_cln.next_uni_stream_id, 2);
5123
5124        let h3_srv = Connection::new(&config, true, false).unwrap();
5125        assert_eq!(h3_srv.next_uni_stream_id, 3);
5126    }
5127
5128    #[test]
5129    /// Client opens multiple control streams, which is forbidden.
        ///
        /// After the handshake, the control stream type is written on the
        /// client's next unidirectional stream, and the server is expected to
        /// fail with `Error::StreamCreationError`.
5130    fn open_multiple_control_streams() {
5131        let mut s = Session::new().unwrap();
5132        s.handshake().unwrap();
5133
5134        let stream_id = s.client.next_uni_stream_id;
5135
5136        let mut d = [42; 8];
5137        let mut b = octets::OctetsMut::with_slice(&mut d);
5138
            // Send the bytes produced by put_varint() for the control stream type
            // straight through the transport connection.
5139        s.pipe
5140            .client
5141            .stream_send(
5142                stream_id,
5143                b.put_varint(stream::HTTP3_CONTROL_STREAM_TYPE_ID).unwrap(),
5144                false,
5145            )
5146            .unwrap();
5147
            // The result of advance() is deliberately discarded.
5148        s.advance().ok();
5149
5150        assert_eq!(s.poll_server(), Err(Error::StreamCreationError));
5151    }
5152
5153    #[test]
5154    /// Client closes the control stream, which is forbidden.
        ///
        /// The control stream is finished with an empty write right after the
        /// handshake. The server's first poll() must report
        /// `Error::ClosedCriticalStream` and the following poll() must return
        /// `Error::Done`.
5155    fn close_control_stream_after_type() {
5156        let mut s = Session::new().unwrap();
5157        s.handshake().unwrap();
5158
            // Empty buffer with the last argument (fin) set to true.
5159        s.pipe
5160            .client
5161            .stream_send(s.client.control_stream_id.unwrap(), &[], true)
5162            .unwrap();
5163
5164        s.advance().ok();
5165
5166        assert_eq!(
5167            Err(Error::ClosedCriticalStream),
5168            s.server.poll(&mut s.pipe.server)
5169        );
            // The error is reported only once.
5170        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5171    }
5172
5173    #[test]
5174    /// Client closes the control stream after a frame is sent, which is
5175    /// forbidden.
        ///
        /// A MAX_PUSH_ID frame is sent on the client's control stream with the
        /// last argument of `send_frame_client()` set to true. The server's first
        /// poll() must report `Error::ClosedCriticalStream` and the following
        /// poll() must return `Error::Done`.
5176    fn close_control_stream_after_frame() {
5177        let mut s = Session::new().unwrap();
5178        s.handshake().unwrap();
5179
5180        s.send_frame_client(
5181            frame::Frame::MaxPushId { push_id: 1 },
5182            s.client.control_stream_id.unwrap(),
5183            true,
5184        )
5185        .unwrap();
5186
5187        assert_eq!(
5188            Err(Error::ClosedCriticalStream),
5189            s.server.poll(&mut s.pipe.server)
5190        );
            // The error is reported only once.
5191        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5192    }
5193
5194    #[test]
5195    /// Client resets the control stream, which is forbidden.
        ///
        /// The write side of the client's control stream is shut down (with code
        /// 0) right after the handshake. The server's first poll() must report
        /// `Error::ClosedCriticalStream` and the following poll() must return
        /// `Error::Done`.
5196    fn reset_control_stream_after_type() {
5197        let mut s = Session::new().unwrap();
5198        s.handshake().unwrap();
5199
5200        s.pipe
5201            .client
5202            .stream_shutdown(
5203                s.client.control_stream_id.unwrap(),
5204                crate::Shutdown::Write,
5205                0,
5206            )
5207            .unwrap();
5208
5209        s.advance().ok();
5210
5211        assert_eq!(
5212            Err(Error::ClosedCriticalStream),
5213            s.server.poll(&mut s.pipe.server)
5214        );
            // The error is reported only once.
5215        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5216    }
5217
5218    #[test]
5219    /// Client resets the control stream after a frame is sent, which is
5220    /// forbidden.
        ///
        /// A MAX_PUSH_ID frame is sent first and the server is polled once
        /// (expecting `Error::Done`). Then the write side of the client's control
        /// stream is shut down, after which the server must report
        /// `Error::ClosedCriticalStream` once, followed by `Error::Done`.
5221    fn reset_control_stream_after_frame() {
5222        let mut s = Session::new().unwrap();
5223        s.handshake().unwrap();
5224
5225        s.send_frame_client(
5226            frame::Frame::MaxPushId { push_id: 1 },
5227            s.client.control_stream_id.unwrap(),
5228            false,
5229        )
5230        .unwrap();
5231
            // Polling after the MAX_PUSH_ID frame yields no event.
5232        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5233
5234        s.pipe
5235            .client
5236            .stream_shutdown(
5237                s.client.control_stream_id.unwrap(),
5238                crate::Shutdown::Write,
5239                0,
5240            )
5241            .unwrap();
5242
5243        s.advance().ok();
5244
5245        assert_eq!(
5246            Err(Error::ClosedCriticalStream),
5247            s.server.poll(&mut s.pipe.server)
5248        );
5249        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5250    }
5251
5252    #[test]
5253    /// Client closes QPACK stream, which is forbidden.
        ///
        /// The client's local QPACK encoder stream is finished with an empty
        /// write right after the handshake. The server's first poll() must report
        /// `Error::ClosedCriticalStream` and the following poll() must return
        /// `Error::Done`.
5254    fn close_qpack_stream_after_type() {
5255        let mut s = Session::new().unwrap();
5256        s.handshake().unwrap();
5257
            // Empty buffer with the last argument (fin) set to true.
5258        s.pipe
5259            .client
5260            .stream_send(
5261                s.client.local_qpack_streams.encoder_stream_id.unwrap(),
5262                &[],
5263                true,
5264            )
5265            .unwrap();
5266
5267        s.advance().ok();
5268
5269        assert_eq!(
5270            Err(Error::ClosedCriticalStream),
5271            s.server.poll(&mut s.pipe.server)
5272        );
5273        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5274    }
5275
5276    #[test]
5277    /// Client closes QPACK stream after sending some stuff, which is forbidden.
        ///
        /// Two single zero bytes are written on the client's local QPACK encoder
        /// stream, the second one with fin set. The server's first poll() must
        /// report `Error::ClosedCriticalStream` and the following poll() must
        /// return `Error::Done`.
5278    fn close_qpack_stream_after_data() {
5279        let mut s = Session::new().unwrap();
5280        s.handshake().unwrap();
5281
5282        let stream_id = s.client.local_qpack_streams.encoder_stream_id.unwrap();
5283        let d = [0; 1];
5284
            // Both writes are queued before anything is flushed by advance().
5285        s.pipe.client.stream_send(stream_id, &d, false).unwrap();
5286        s.pipe.client.stream_send(stream_id, &d, true).unwrap();
5287
5288        s.advance().ok();
5289
5290        assert_eq!(
5291            Err(Error::ClosedCriticalStream),
5292            s.server.poll(&mut s.pipe.server)
5293        );
5294        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5295    }
5296
5297    #[test]
5298    /// Client resets QPACK stream, which is forbidden.
        ///
        /// The write side of the client's local QPACK encoder stream is shut down
        /// (with code 0) right after the handshake. The server's first poll()
        /// must report `Error::ClosedCriticalStream` and the following poll()
        /// must return `Error::Done`.
5299    fn reset_qpack_stream_after_type() {
5300        let mut s = Session::new().unwrap();
5301        s.handshake().unwrap();
5302
5303        s.pipe
5304            .client
5305            .stream_shutdown(
5306                s.client.local_qpack_streams.encoder_stream_id.unwrap(),
5307                crate::Shutdown::Write,
5308                0,
5309            )
5310            .unwrap();
5311
5312        s.advance().ok();
5313
5314        assert_eq!(
5315            Err(Error::ClosedCriticalStream),
5316            s.server.poll(&mut s.pipe.server)
5317        );
5318        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5319    }
5320
5321    #[test]
5322    /// Client resets QPACK stream after sending some stuff, which is forbidden.
        ///
        /// Two single zero bytes are written on the client's local QPACK encoder
        /// stream and the server is polled once (expecting `Error::Done`). Then
        /// the write side of that stream is shut down, after which the server
        /// must report `Error::ClosedCriticalStream` once, followed by
        /// `Error::Done`.
5323    fn reset_qpack_stream_after_data() {
5324        let mut s = Session::new().unwrap();
5325        s.handshake().unwrap();
5326
5327        let stream_id = s.client.local_qpack_streams.encoder_stream_id.unwrap();
5328        let d = [0; 1];
5329
5330        s.pipe.client.stream_send(stream_id, &d, false).unwrap();
5331        s.pipe.client.stream_send(stream_id, &d, false).unwrap();
5332
5333        s.advance().ok();
5334
            // The data alone produces no event and no error.
5335        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5336
5337        s.pipe
5338            .client
5339            .stream_shutdown(stream_id, crate::Shutdown::Write, 0)
5340            .unwrap();
5341
5342        s.advance().ok();
5343
5344        assert_eq!(
5345            Err(Error::ClosedCriticalStream),
5346            s.server.poll(&mut s.pipe.server)
5347        );
5348        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5349    }
5350
5351    #[test]
5352    /// Client sends QPACK data.
        ///
        /// 20 zero bytes are written on each of the client's local QPACK encoder
        /// and decoder streams. The server's poll() must return `Error::Done`,
        /// and both the per-stream byte counters and the connection stats must
        /// report 20 bytes received for each stream.
5353    fn qpack_data() {
5354        // TODO: QPACK instructions are ignored until dynamic table support is
5355        // added so we just test that the data is safely ignored.
5356        let mut s = Session::new().unwrap();
5357        s.handshake().unwrap();
5358
5359        let e_stream_id = s.client.local_qpack_streams.encoder_stream_id.unwrap();
5360        let d_stream_id = s.client.local_qpack_streams.decoder_stream_id.unwrap();
5361        let d = [0; 20];
5362
5363        s.pipe.client.stream_send(e_stream_id, &d, false).unwrap();
5364        s.advance().ok();
5365
5366        s.pipe.client.stream_send(d_stream_id, &d, false).unwrap();
5367        s.advance().ok();
5368
            // Any outcome other than Error::Done fails the test.
5369        match s.server.poll(&mut s.pipe.server) {
5370            Ok(_) => panic!(),
5371
5372            Err(Error::Done) => {
5373                assert_eq!(s.server.peer_qpack_streams.encoder_stream_bytes, 20);
5374                assert_eq!(s.server.peer_qpack_streams.decoder_stream_bytes, 20);
5375            },
5376
5377            Err(_) => {
5378                panic!();
5379            },
5380        }
5381
            // The same totals must be visible through the public stats.
5382        let stats = s.server.stats();
5383        assert_eq!(stats.qpack_encoder_stream_recv_bytes, 20);
5384        assert_eq!(stats.qpack_decoder_stream_recv_bytes, 20);
5385    }
5386
5387    #[test]
5388    /// Tests limits for the stream state buffer maximum size.
        ///
        /// Runs two scenarios, each declaring a frame length of `1 << 24` while
        /// only writing 128 bytes of payload: a DATA frame sent after a valid
        /// request, which must be delivered as `Event::Data`, and a frame with a
        /// GREASE type sent on a fresh session, which must be rejected with
        /// `Error::ExcessiveLoad`.
5389    fn max_state_buf_size() {
5390        let mut s = Session::new().unwrap();
5391        s.handshake().unwrap();
5392
5393        let req = vec![
5394            Header::new(b":method", b"GET"),
5395            Header::new(b":scheme", b"https"),
5396            Header::new(b":authority", b"quic.tech"),
5397            Header::new(b":path", b"/test"),
5398            Header::new(b"user-agent", b"quiche-test"),
5399        ];
5400
5401        assert_eq!(
5402            s.client.send_request(&mut s.pipe.client, &req, false),
5403            Ok(0)
5404        );
5405
5406        s.advance().ok();
5407
5408        let ev_headers = Event::Headers {
5409            list: req,
5410            more_frames: true,
5411        };
5412
5413        assert_eq!(s.server.poll(&mut s.pipe.server), Ok((0, ev_headers)));
5414
5415        // DATA frames don't consume the state buffer, so can be of any size.
5416        let mut d = [42; 128];
5417        let mut b = octets::OctetsMut::with_slice(&mut d);
5418
            // The frame type, length and payload are written by hand, directly on
            // the transport stream.
5419        let frame_type = b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
5420        s.pipe.client.stream_send(0, frame_type, false).unwrap();
5421
5422        let frame_len = b.put_varint(1 << 24).unwrap();
5423        s.pipe.client.stream_send(0, frame_len, false).unwrap();
5424
5425        s.pipe.client.stream_send(0, &d, false).unwrap();
5426
5427        s.advance().ok();
5428
5429        assert_eq!(s.server.poll(&mut s.pipe.server), Ok((0, Event::Data)));
5430
5431        // GREASE frames consume the state buffer, so need to be limited.
            // A fresh session is used; `s` is shadowed.
5432        let mut s = Session::new().unwrap();
5433        s.handshake().unwrap();
5434
5435        let mut d = [42; 128];
5436        let mut b = octets::OctetsMut::with_slice(&mut d);
5437
5438        let frame_type = b.put_varint(148_764_065_110_560_899).unwrap();
5439        s.pipe.client.stream_send(0, frame_type, false).unwrap();
5440
5441        let frame_len = b.put_varint(1 << 24).unwrap();
5442        s.pipe.client.stream_send(0, frame_len, false).unwrap();
5443
5444        s.pipe.client.stream_send(0, &d, false).unwrap();
5445
5446        s.advance().ok();
5447
5448        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::ExcessiveLoad));
5449    }
5450
5451    #[test]
5452    /// Tests that DATA frames are properly truncated depending on the request
5453    /// stream's outgoing flow control capacity.
        ///
        /// Six 10-byte bodies are sent in full, then a seventh send_body() call
        /// is expected to accept only 8 of its 10 bytes. The server must read
        /// back exactly the same chunk sizes, must not see the stream finish, and
        /// the blocked-frame counters on both endpoints must match the expected
        /// values.
5454    fn stream_backpressure() {
5455        let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
5456
5457        let mut s = Session::new().unwrap();
5458        s.handshake().unwrap();
5459
5460        let (stream, req) = s.send_request(false).unwrap();
5461
5462        let total_data_frames = 6;
5463
5464        for _ in 0..total_data_frames {
5465            assert_eq!(
5466                s.client
5467                    .send_body(&mut s.pipe.client, stream, &bytes, false),
5468                Ok(bytes.len())
5469            );
5470
5471            s.advance().ok();
5472        }
5473
            // This write is only partially accepted: 2 bytes short of the buffer.
5474        assert_eq!(
5475            s.client.send_body(&mut s.pipe.client, stream, &bytes, true),
5476            Ok(bytes.len() - 2)
5477        );
5478
5479        s.advance().ok();
5480
5481        let mut recv_buf = vec![0; bytes.len()];
5482
5483        let ev_headers = Event::Headers {
5484            list: req,
5485            more_frames: true,
5486        };
5487
            // A single Data event is reported before poll() returns Done.
5488        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5489        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
5490        assert_eq!(s.poll_server(), Err(Error::Done));
5491
5492        for _ in 0..total_data_frames {
5493            assert_eq!(
5494                s.recv_body_server(stream, &mut recv_buf),
5495                Ok(bytes.len())
5496            );
5497        }
5498
5499        assert_eq!(
5500            s.recv_body_server(stream, &mut recv_buf),
5501            Ok(bytes.len() - 2)
5502        );
5503
5504        // Fin flag from last send_body() call was not sent as the buffer was
5505        // only partially written.
5506        assert_eq!(s.poll_server(), Err(Error::Done));
5507
            // Exactly one stream-level blocked frame: sent by the client and
            // received by the server.
5508        assert_eq!(s.pipe.server.data_blocked_sent_count, 0);
5509        assert_eq!(s.pipe.server.stream_data_blocked_sent_count, 0);
5510        assert_eq!(s.pipe.server.data_blocked_recv_count, 0);
5511        assert_eq!(s.pipe.server.stream_data_blocked_recv_count, 1);
5512
5513        assert_eq!(s.pipe.client.data_blocked_sent_count, 0);
5514        assert_eq!(s.pipe.client.stream_data_blocked_sent_count, 1);
5515        assert_eq!(s.pipe.client.data_blocked_recv_count, 0);
5516        assert_eq!(s.pipe.client.stream_data_blocked_recv_count, 0);
5517    }
5518
5519    #[test]
5520    /// Tests that the max header list size setting is enforced.
        ///
        /// The HTTP/3 config limits the field section size to 65. After the
        /// client sends a five-header request, the server's poll must fail with
        /// `Error::ExcessiveLoad`, and the server's transport connection must
        /// record a local error carrying the matching wire code.
5521    fn request_max_header_size_limit() {
5522        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5523        config
5524            .load_cert_chain_from_pem_file("examples/cert.crt")
5525            .unwrap();
5526        config
5527            .load_priv_key_from_pem_file("examples/cert.key")
5528            .unwrap();
5529        config.set_application_protos(&[b"h3"]).unwrap();
5530        config.set_initial_max_data(1500);
5531        config.set_initial_max_stream_data_bidi_local(150);
5532        config.set_initial_max_stream_data_bidi_remote(150);
5533        config.set_initial_max_stream_data_uni(150);
5534        config.set_initial_max_streams_bidi(5);
5535        config.set_initial_max_streams_uni(5);
5536        config.verify_peer(false);
5537
5538        let mut h3_config = Config::new().unwrap();
5539        h3_config.set_max_field_section_size(65);
5540
5541        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5542
5543        s.handshake().unwrap();
5544
5545        let req = vec![
5546            Header::new(b":method", b"GET"),
5547            Header::new(b":scheme", b"https"),
5548            Header::new(b":authority", b"quic.tech"),
5549            Header::new(b":path", b"/test"),
5550            Header::new(b"aaaaaaa", b"aaaaaaaa"),
5551        ];
5552
            // Sending succeeds on the client; the limit is enforced by the server.
5553        let stream = s
5554            .client
5555            .send_request(&mut s.pipe.client, &req, true)
5556            .unwrap();
5557
5558        s.advance().ok();
5559
5560        assert_eq!(stream, 0);
5561
5562        assert_eq!(s.poll_server(), Err(Error::ExcessiveLoad));
5563
5564        assert_eq!(
5565            s.pipe.server.local_error.as_ref().unwrap().error_code,
5566            Error::to_wire(Error::ExcessiveLoad)
5567        );
5568    }
5569
5570    #[test]
5571    /// Tests that Error::TransportError contains a transport error.
        ///
        /// Five requests are opened back to back (streams 0, 4, 8, 12 and 16)
        /// without flushing any packets; the sixth send_request() must fail with
        /// `Error::TransportError(crate::Error::StreamLimit)`.
5572    fn transport_error() {
5573        let mut s = Session::new().unwrap();
5574        s.handshake().unwrap();
5575
5576        let req = vec![
5577            Header::new(b":method", b"GET"),
5578            Header::new(b":scheme", b"https"),
5579            Header::new(b":authority", b"quic.tech"),
5580            Header::new(b":path", b"/test"),
5581            Header::new(b"user-agent", b"quiche-test"),
5582        ];
5583
5584        // We need to open all streams in the same flight, so we can't use the
5585        // Session::send_request() method because it also calls advance(),
5586        // otherwise the server would send a MAX_STREAMS frame and the client
5587        // wouldn't hit the streams limit.
5588        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(0));
5589        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(4));
5590        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(8));
5591        assert_eq!(
5592            s.client.send_request(&mut s.pipe.client, &req, true),
5593            Ok(12)
5594        );
5595        assert_eq!(
5596            s.client.send_request(&mut s.pipe.client, &req, true),
5597            Ok(16)
5598        );
5599
            // The transport-level error is wrapped by the HTTP/3 error type.
5600        assert_eq!(
5601            s.client.send_request(&mut s.pipe.client, &req, true),
5602            Err(Error::TransportError(crate::Error::StreamLimit))
5603        );
5604    }
5605
5606    #[test]
5607    /// Tests that sending DATA before HEADERS causes an error.
        ///
        /// A DATA frame (type, length 5, payload "hello") is written by hand on
        /// stream 0 without any preceding HEADERS frame; the server's poll() must
        /// fail with `Error::FrameUnexpected`.
5608    fn data_before_headers() {
5609        let mut s = Session::new().unwrap();
5610        s.handshake().unwrap();
5611
5612        let mut d = [42; 128];
5613        let mut b = octets::OctetsMut::with_slice(&mut d);
5614
            // Each part of the frame is sent straight through the transport
            // connection.
5615        let frame_type = b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
5616        s.pipe.client.stream_send(0, frame_type, false).unwrap();
5617
5618        let frame_len = b.put_varint(5).unwrap();
5619        s.pipe.client.stream_send(0, frame_len, false).unwrap();
5620
5621        s.pipe.client.stream_send(0, b"hello", false).unwrap();
5622
5623        s.advance().ok();
5624
5625        assert_eq!(
5626            s.server.poll(&mut s.pipe.server),
5627            Err(Error::FrameUnexpected)
5628        );
5629    }
5630
5631    #[test]
5632    /// Tests that calling poll() after an error occurred does nothing.
        ///
        /// An oversized frame (declared length `1 << 24`) with an unrecognized
        /// type value is written on stream 0 so that the server's first poll()
        /// fails with `Error::ExcessiveLoad`; the next poll() must simply return
        /// `Error::Done`.
5633    fn poll_after_error() {
5634        let mut s = Session::new().unwrap();
5635        s.handshake().unwrap();
5636
5637        let mut d = [42; 128];
5638        let mut b = octets::OctetsMut::with_slice(&mut d);
5639
5640        let frame_type = b.put_varint(148_764_065_110_560_899).unwrap();
5641        s.pipe.client.stream_send(0, frame_type, false).unwrap();
5642
5643        let frame_len = b.put_varint(1 << 24).unwrap();
5644        s.pipe.client.stream_send(0, frame_len, false).unwrap();
5645
5646        s.pipe.client.stream_send(0, &d, false).unwrap();
5647
5648        s.advance().ok();
5649
5650        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::ExcessiveLoad));
5651
5652        // Try to call poll() again after an error occurred.
5653        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::Done));
5654    }
5655
5656    #[test]
5657    /// Tests that we limit sending HEADERS based on the stream capacity.
        ///
        /// With `initial_max_data` set to 70, the first request is sent but the
        /// second send_request() must fail with `Error::StreamBlocked`. After the
        /// endpoints exchange packets, stream 4 must become writable and the
        /// request must succeed.
        ///
        /// NOTE(review): the counters asserted at the end are the `data_blocked_*`
        /// ones, not `stream_data_blocked_*`, so the limit hit here looks like the
        /// connection-level one rather than a per-stream one -- confirm.
5658    fn headers_blocked() {
5659        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5660        config
5661            .load_cert_chain_from_pem_file("examples/cert.crt")
5662            .unwrap();
5663        config
5664            .load_priv_key_from_pem_file("examples/cert.key")
5665            .unwrap();
5666        config.set_application_protos(&[b"h3"]).unwrap();
5667        config.set_initial_max_data(70);
5668        config.set_initial_max_stream_data_bidi_local(150);
5669        config.set_initial_max_stream_data_bidi_remote(150);
5670        config.set_initial_max_stream_data_uni(150);
5671        config.set_initial_max_streams_bidi(100);
5672        config.set_initial_max_streams_uni(5);
5673        config.verify_peer(false);
5674
5675        let h3_config = Config::new().unwrap();
5676
5677        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5678
5679        s.handshake().unwrap();
5680
5681        let req = vec![
5682            Header::new(b":method", b"GET"),
5683            Header::new(b":scheme", b"https"),
5684            Header::new(b":authority", b"quic.tech"),
5685            Header::new(b":path", b"/test"),
5686        ];
5687
5688        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(0));
5689
5690        assert_eq!(
5691            s.client.send_request(&mut s.pipe.client, &req, true),
5692            Err(Error::StreamBlocked)
5693        );
5694
5695        // Clear the writable stream queue.
5696        assert_eq!(s.pipe.client.stream_writable_next(), Some(2));
5697        assert_eq!(s.pipe.client.stream_writable_next(), Some(6));
5698        assert_eq!(s.pipe.client.stream_writable_next(), Some(10));
5699        assert_eq!(s.pipe.client.stream_writable_next(), None);
5700
5701        s.advance().ok();
5702
5703        // Once the server gives flow control credits back, we can send the
5704        // request.
5705        assert_eq!(s.pipe.client.stream_writable_next(), Some(4));
5706        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(4));
5707
5708        assert_eq!(s.pipe.server.data_blocked_sent_count, 0);
5709        assert_eq!(s.pipe.server.stream_data_blocked_sent_count, 0);
5710        assert_eq!(s.pipe.server.data_blocked_recv_count, 1);
5711        assert_eq!(s.pipe.server.stream_data_blocked_recv_count, 0);
5712
5713        assert_eq!(s.pipe.client.data_blocked_sent_count, 1);
5714        assert_eq!(s.pipe.client.stream_data_blocked_sent_count, 0);
5715        assert_eq!(s.pipe.client.data_blocked_recv_count, 0);
5716        assert_eq!(s.pipe.client.stream_data_blocked_recv_count, 0);
5717    }
5718
5719    #[test]
5720    /// Ensure StreamBlocked when connection flow control prevents headers.
        ///
        /// The remaining connection-level credit is used up with filler bytes on
        /// stream 2, so send_request() must fail with `Error::StreamBlocked`.
        /// Once the server has polled and packets have been exchanged again, the
        /// request must go through on stream 0.
5721    fn headers_blocked_on_conn() {
5722        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5723        config
5724            .load_cert_chain_from_pem_file("examples/cert.crt")
5725            .unwrap();
5726        config
5727            .load_priv_key_from_pem_file("examples/cert.key")
5728            .unwrap();
5729        config.set_application_protos(&[b"h3"]).unwrap();
5730        config.set_initial_max_data(70);
5731        config.set_initial_max_stream_data_bidi_local(150);
5732        config.set_initial_max_stream_data_bidi_remote(150);
5733        config.set_initial_max_stream_data_uni(150);
5734        config.set_initial_max_streams_bidi(100);
5735        config.set_initial_max_streams_uni(5);
5736        config.verify_peer(false);
5737
5738        let h3_config = Config::new().unwrap();
5739
5740        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5741
5742        s.handshake().unwrap();
5743
5744        // After the HTTP handshake, some bytes of connection flow control have
5745        // been consumed. Fill the connection with more grease data on the control
5746        // stream.
            // Only 23 of the 28 offered bytes are accepted.
5747        let d = [42; 28];
5748        assert_eq!(s.pipe.client.stream_send(2, &d, false), Ok(23));
5749
5750        let req = vec![
5751            Header::new(b":method", b"GET"),
5752            Header::new(b":scheme", b"https"),
5753            Header::new(b":authority", b"quic.tech"),
5754            Header::new(b":path", b"/test"),
5755        ];
5756
5757        // There is 0 connection-level flow control, so sending a request is
5758        // blocked.
5759        assert_eq!(
5760            s.client.send_request(&mut s.pipe.client, &req, true),
5761            Err(Error::StreamBlocked)
5762        );
5763        assert_eq!(s.pipe.client.stream_writable_next(), None);
5764
5765        // Emit the control stream data and drain it at the server via poll(),
5766        // which consumes it and gives back flow control.
5767        s.advance().ok();
5768        assert_eq!(s.poll_server(), Err(Error::Done));
5769        s.advance().ok();
5770
5771        // Now we can send the request.
5772        assert_eq!(s.pipe.client.stream_writable_next(), Some(2));
5773        assert_eq!(s.pipe.client.stream_writable_next(), Some(6));
5774        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(0));
5775
5776        assert_eq!(s.pipe.server.data_blocked_sent_count, 0);
5777        assert_eq!(s.pipe.server.stream_data_blocked_sent_count, 0);
5778        assert_eq!(s.pipe.server.data_blocked_recv_count, 1);
5779        assert_eq!(s.pipe.server.stream_data_blocked_recv_count, 0);
5780
5781        assert_eq!(s.pipe.client.data_blocked_sent_count, 1);
5782        assert_eq!(s.pipe.client.stream_data_blocked_sent_count, 0);
5783        assert_eq!(s.pipe.client.data_blocked_recv_count, 0);
5784        assert_eq!(s.pipe.client.stream_data_blocked_recv_count, 0);
5785    }
5786
5787    #[test]
5788    /// Ensure STREAM_DATA_BLOCKED is not emitted multiple times with the same
5789    /// offset when trying to send large bodies.
        ///
        /// The server answers a request with a 500-byte body while the stream
        /// window is only 80 bytes. The test inspects the raw packets produced by
        /// the server to check that a STREAM_DATA_BLOCKED frame is emitted once
        /// for limit 80, not repeated while the limit is unchanged, and emitted
        /// again (with limit 100) after a MAX_STREAM_DATA frame raises the limit.
5790    fn send_body_truncation_stream_blocked() {
5791        use crate::test_utils::decode_pkt;
5792
5793        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5794        config
5795            .load_cert_chain_from_pem_file("examples/cert.crt")
5796            .unwrap();
5797        config
5798            .load_priv_key_from_pem_file("examples/cert.key")
5799            .unwrap();
5800        config.set_application_protos(&[b"h3"]).unwrap();
5801        config.set_initial_max_data(10000); // large connection-level flow control
5802        config.set_initial_max_stream_data_bidi_local(80);
5803        config.set_initial_max_stream_data_bidi_remote(80);
5804        config.set_initial_max_stream_data_uni(150);
5805        config.set_initial_max_streams_bidi(100);
5806        config.set_initial_max_streams_uni(5);
5807        config.verify_peer(false);
5808
5809        let h3_config = Config::new().unwrap();
5810
5811        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5812
5813        s.handshake().unwrap();
5814
5815        let (stream, req) = s.send_request(true).unwrap();
5816
5817        let ev_headers = Event::Headers {
5818            list: req,
5819            more_frames: false,
5820        };
5821
5822        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5823        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
5824
5825        let _ = s.send_response(stream, false).unwrap();
5826
            // Sending the response headers alone must not mark any stream blocked.
5827        assert_eq!(s.pipe.server.streams.blocked().len(), 0);
5828
5829        // The body must be larger than the stream window would allow
5830        let d = [42; 500];
5831        let mut off = 0;
5832
5833        let sent = s
5834            .server
5835            .send_body(&mut s.pipe.server, stream, &d, true)
5836            .unwrap();
5837        assert_eq!(sent, 25);
5838        off += sent;
5839
5840        // send_body wrote as much as it could (sent < size of buff).
5841        assert_eq!(s.pipe.server.streams.blocked().len(), 1);
5842        assert_eq!(
5843            s.server
5844                .send_body(&mut s.pipe.server, stream, &d[off..], true),
5845            Err(Error::Done)
5846        );
5847        assert_eq!(s.pipe.server.streams.blocked().len(), 1);
5848
5849        // Now read raw frames to see what the QUIC layer did
5850        let mut buf = [0; 65535];
5851        let (len, _) = s.pipe.server.send(&mut buf).unwrap();
5852
5853        let frames = decode_pkt(&mut s.pipe.client, &mut buf[..len]).unwrap();
5854
5855        let mut iter = frames.iter();
5856
            // Only the first frame of the packet is checked.
5857        assert_eq!(
5858            iter.next(),
5859            Some(&crate::frame::Frame::StreamDataBlocked {
5860                stream_id: 0,
5861                limit: 80,
5862            })
5863        );
5864
5865        // At the server, after sending the STREAM_DATA_BLOCKED frame, we clear
5866        // the mark.
5867        assert_eq!(s.pipe.server.streams.blocked().len(), 0);
5868
5869        // Don't read any data from the client, so stream flow control is never
5870        // given back in the form of changing the stream's max offset.
5871        // Subsequent body send operations will still fail but no more
5872        // STREAM_DATA_BLOCKED frames should be submitted since the limit didn't
5873        // change. No frames means no packet to send.
5874        assert_eq!(
5875            s.server
5876                .send_body(&mut s.pipe.server, stream, &d[off..], true),
5877            Err(Error::Done)
5878        );
5879        assert_eq!(s.pipe.server.streams.blocked().len(), 0);
5880        assert_eq!(s.pipe.server.send(&mut buf), Err(crate::Error::Done));
5881
5882        // Now update the client's max offset manually.
            // This is done by injecting a MAX_STREAM_DATA frame (max 100) into the
            // server.
5883        let frames = [crate::frame::Frame::MaxStreamData {
5884            stream_id: 0,
5885            max: 100,
5886        }];
5887
5888        let pkt_type = crate::packet::Type::Short;
5889        assert_eq!(
5890            s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
5891            Ok(39),
5892        );
5893
            // NOTE(review): `off` is not advanced after this second successful
            // write; the later send_body() call is expected to return Done
            // regardless of the slice contents.
5894        let sent = s
5895            .server
5896            .send_body(&mut s.pipe.server, stream, &d[off..], true)
5897            .unwrap();
5898        assert_eq!(sent, 18);
5899
5900        // Same thing here...
5901        assert_eq!(s.pipe.server.streams.blocked().len(), 1);
5902        assert_eq!(
5903            s.server
5904                .send_body(&mut s.pipe.server, stream, &d[off..], true),
5905            Err(Error::Done)
5906        );
5907        assert_eq!(s.pipe.server.streams.blocked().len(), 1);
5908
5909        let (len, _) = s.pipe.server.send(&mut buf).unwrap();
5910
5911        let frames = decode_pkt(&mut s.pipe.client, &mut buf[..len]).unwrap();
5912
5913        let mut iter = frames.iter();
5914
            // The new STREAM_DATA_BLOCKED frame carries the updated limit.
5915        assert_eq!(
5916            iter.next(),
5917            Some(&crate::frame::Frame::StreamDataBlocked {
5918                stream_id: 0,
5919                limit: 100,
5920            })
5921        );
5922    }
5923
5924    #[test]
5925    /// Ensure stream doesn't hang due to small cwnd.
        ///
        /// The server tries to send an 80000-byte body; only 11995 bytes are
        /// accepted by the first send_body() call. After the client has read
        /// them and packets have been exchanged, the request stream must be
        /// reported as writable again on the server, even though the server's
        /// `tx_cap` is still smaller than the remaining body.
5926    fn send_body_stream_blocked_by_small_cwnd() {
5927        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5928        config
5929            .load_cert_chain_from_pem_file("examples/cert.crt")
5930            .unwrap();
5931        config
5932            .load_priv_key_from_pem_file("examples/cert.key")
5933            .unwrap();
5934        config.set_application_protos(&[b"h3"]).unwrap();
5935        config.set_initial_max_data(100000); // large connection-level flow control
5936        config.set_initial_max_stream_data_bidi_local(100000);
5937        config.set_initial_max_stream_data_bidi_remote(50000);
5938        config.set_initial_max_stream_data_uni(150);
5939        config.set_initial_max_streams_bidi(100);
5940        config.set_initial_max_streams_uni(5);
5941        config.verify_peer(false);
5942
5943        let h3_config = Config::new().unwrap();
5944
5945        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5946
5947        s.handshake().unwrap();
5948
5949        let (stream, req) = s.send_request(true).unwrap();
5950
5951        let ev_headers = Event::Headers {
5952            list: req,
5953            more_frames: false,
5954        };
5955
5956        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5957        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
5958
5959        let _ = s.send_response(stream, false).unwrap();
5960
5961        // Clear the writable stream queue.
5962        assert_eq!(s.pipe.server.stream_writable_next(), Some(3));
5963        assert_eq!(s.pipe.server.stream_writable_next(), Some(7));
5964        assert_eq!(s.pipe.server.stream_writable_next(), Some(11));
5965        assert_eq!(s.pipe.server.stream_writable_next(), Some(stream));
5966        assert_eq!(s.pipe.server.stream_writable_next(), None);
5967
5968        // The body must be larger than the cwnd would allow.
5969        let send_buf = [42; 80000];
5970
5971        let sent = s
5972            .server
5973            .send_body(&mut s.pipe.server, stream, &send_buf, true)
5974            .unwrap();
5975
5976        // send_body wrote as much as it could (sent < size of buff).
5977        assert_eq!(sent, 11995);
5978
5979        s.advance().ok();
5980
5981        // Client reads received headers and body.
5982        let mut recv_buf = [42; 80000];
5983        assert!(s.poll_client().is_ok());
5984        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
5985        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(11995));
5986
5987        s.advance().ok();
5988
5989        // Server send cap is smaller than remaining body buffer.
5990        assert!(s.pipe.server.tx_cap < send_buf.len() - sent);
5991
5992        // Once the server cwnd opens up, we can send more body.
            // NOTE(review): the literal 0 is presumably the same id as `stream`
            // (used a few lines above) -- confirm.
5993        assert_eq!(s.pipe.server.stream_writable_next(), Some(0));
5994    }
5995
5996    #[test]
5997    /// Ensure stream doesn't hang due to small cwnd.
5998    fn send_body_stream_blocked_zero_length() {
5999        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6000        config
6001            .load_cert_chain_from_pem_file("examples/cert.crt")
6002            .unwrap();
6003        config
6004            .load_priv_key_from_pem_file("examples/cert.key")
6005            .unwrap();
6006        config.set_application_protos(&[b"h3"]).unwrap();
6007        config.set_initial_max_data(100000); // large connection-level flow control
6008        config.set_initial_max_stream_data_bidi_local(100000);
6009        config.set_initial_max_stream_data_bidi_remote(50000);
6010        config.set_initial_max_stream_data_uni(150);
6011        config.set_initial_max_streams_bidi(100);
6012        config.set_initial_max_streams_uni(5);
6013        config.verify_peer(false);
6014
6015        let h3_config = Config::new().unwrap();
6016
6017        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6018
6019        s.handshake().unwrap();
6020
6021        let (stream, req) = s.send_request(true).unwrap();
6022
6023        let ev_headers = Event::Headers {
6024            list: req,
6025            more_frames: false,
6026        };
6027
6028        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6029        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6030
6031        let _ = s.send_response(stream, false).unwrap();
6032
6033        // Clear the writable stream queue.
6034        assert_eq!(s.pipe.server.stream_writable_next(), Some(3));
6035        assert_eq!(s.pipe.server.stream_writable_next(), Some(7));
6036        assert_eq!(s.pipe.server.stream_writable_next(), Some(11));
6037        assert_eq!(s.pipe.server.stream_writable_next(), Some(stream));
6038        assert_eq!(s.pipe.server.stream_writable_next(), None);
6039
6040        // The body is large enough to fill the cwnd, except for enough bytes
6041        // for another DATA frame header (but no payload).
6042        let send_buf = [42; 11994];
6043
6044        let sent = s
6045            .server
6046            .send_body(&mut s.pipe.server, stream, &send_buf, false)
6047            .unwrap();
6048
6049        assert_eq!(sent, 11994);
6050
6051        // There is only enough capacity left for the DATA frame header, but
6052        // no payload.
6053        assert_eq!(s.pipe.server.stream_capacity(stream).unwrap(), 3);
6054        assert_eq!(
6055            s.server
6056                .send_body(&mut s.pipe.server, stream, &send_buf, false),
6057            Err(Error::Done)
6058        );
6059
6060        s.advance().ok();
6061
6062        // Client reads received headers and body.
6063        let mut recv_buf = [42; 80000];
6064        assert!(s.poll_client().is_ok());
6065        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
6066        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(11994));
6067
6068        s.advance().ok();
6069
6070        // Once the server cwnd opens up, we can send more body.
6071        assert_eq!(s.pipe.server.stream_writable_next(), Some(0));
6072    }
6073
6074    #[test]
6075    /// Test handling of 0-length DATA writes with and without fin.
6076    fn zero_length_data() {
6077        let mut s = Session::new().unwrap();
6078        s.handshake().unwrap();
6079
6080        let (stream, req) = s.send_request(false).unwrap();
6081
6082        assert_eq!(
6083            s.client.send_body(&mut s.pipe.client, 0, b"", false),
6084            Err(Error::Done)
6085        );
6086        assert_eq!(s.client.send_body(&mut s.pipe.client, 0, b"", true), Ok(0));
6087
6088        s.advance().ok();
6089
6090        let mut recv_buf = vec![0; 100];
6091
6092        let ev_headers = Event::Headers {
6093            list: req,
6094            more_frames: true,
6095        };
6096
6097        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6098
6099        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6100        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Err(Error::Done));
6101
6102        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6103        assert_eq!(s.poll_server(), Err(Error::Done));
6104
6105        let resp = s.send_response(stream, false).unwrap();
6106
6107        assert_eq!(
6108            s.server.send_body(&mut s.pipe.server, 0, b"", false),
6109            Err(Error::Done)
6110        );
6111        assert_eq!(s.server.send_body(&mut s.pipe.server, 0, b"", true), Ok(0));
6112
6113        s.advance().ok();
6114
6115        let ev_headers = Event::Headers {
6116            list: resp,
6117            more_frames: true,
6118        };
6119
6120        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
6121
6122        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
6123        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Err(Error::Done));
6124
6125        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
6126        assert_eq!(s.poll_client(), Err(Error::Done));
6127    }
6128
6129    #[test]
6130    /// Tests that blocked 0-length DATA writes are reported correctly.
6131    fn zero_length_data_blocked() {
6132        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6133        config
6134            .load_cert_chain_from_pem_file("examples/cert.crt")
6135            .unwrap();
6136        config
6137            .load_priv_key_from_pem_file("examples/cert.key")
6138            .unwrap();
6139        config.set_application_protos(&[b"h3"]).unwrap();
6140        config.set_initial_max_data(69);
6141        config.set_initial_max_stream_data_bidi_local(150);
6142        config.set_initial_max_stream_data_bidi_remote(150);
6143        config.set_initial_max_stream_data_uni(150);
6144        config.set_initial_max_streams_bidi(100);
6145        config.set_initial_max_streams_uni(5);
6146        config.verify_peer(false);
6147
6148        let h3_config = Config::new().unwrap();
6149
6150        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6151
6152        s.handshake().unwrap();
6153
6154        let req = vec![
6155            Header::new(b":method", b"GET"),
6156            Header::new(b":scheme", b"https"),
6157            Header::new(b":authority", b"quic.tech"),
6158            Header::new(b":path", b"/test"),
6159        ];
6160
6161        assert_eq!(
6162            s.client.send_request(&mut s.pipe.client, &req, false),
6163            Ok(0)
6164        );
6165
6166        assert_eq!(
6167            s.client.send_body(&mut s.pipe.client, 0, b"", true),
6168            Err(Error::Done)
6169        );
6170
6171        // Clear the writable stream queue.
6172        assert_eq!(s.pipe.client.stream_writable_next(), Some(2));
6173        assert_eq!(s.pipe.client.stream_writable_next(), Some(6));
6174        assert_eq!(s.pipe.client.stream_writable_next(), Some(10));
6175        assert_eq!(s.pipe.client.stream_writable_next(), None);
6176
6177        s.advance().ok();
6178
6179        // Once the server gives flow control credits back, we can send the body.
6180        assert_eq!(s.pipe.client.stream_writable_next(), Some(0));
6181        assert_eq!(s.client.send_body(&mut s.pipe.client, 0, b"", true), Ok(0));
6182    }
6183
6184    #[test]
6185    /// Tests that receiving an empty SETTINGS frame is handled and reported.
6186    fn empty_settings() {
6187        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6188        config
6189            .load_cert_chain_from_pem_file("examples/cert.crt")
6190            .unwrap();
6191        config
6192            .load_priv_key_from_pem_file("examples/cert.key")
6193            .unwrap();
6194        config.set_application_protos(&[b"h3"]).unwrap();
6195        config.set_initial_max_data(1500);
6196        config.set_initial_max_stream_data_bidi_local(150);
6197        config.set_initial_max_stream_data_bidi_remote(150);
6198        config.set_initial_max_stream_data_uni(150);
6199        config.set_initial_max_streams_bidi(5);
6200        config.set_initial_max_streams_uni(5);
6201        config.verify_peer(false);
6202        config.set_ack_delay_exponent(8);
6203        config.grease(false);
6204
6205        let h3_config = Config::new().unwrap();
6206        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6207
6208        s.handshake().unwrap();
6209
6210        assert!(s.client.peer_settings_raw().is_some());
6211        assert!(s.server.peer_settings_raw().is_some());
6212    }
6213
6214    #[test]
6215    /// Tests that receiving a H3_DATAGRAM setting is ok.
6216    fn dgram_setting() {
6217        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6218        config
6219            .load_cert_chain_from_pem_file("examples/cert.crt")
6220            .unwrap();
6221        config
6222            .load_priv_key_from_pem_file("examples/cert.key")
6223            .unwrap();
6224        config.set_application_protos(&[b"h3"]).unwrap();
6225        config.set_initial_max_data(70);
6226        config.set_initial_max_stream_data_bidi_local(150);
6227        config.set_initial_max_stream_data_bidi_remote(150);
6228        config.set_initial_max_stream_data_uni(150);
6229        config.set_initial_max_streams_bidi(100);
6230        config.set_initial_max_streams_uni(5);
6231        config.enable_dgram(true, 1000, 1000);
6232        config.verify_peer(false);
6233
6234        let h3_config = Config::new().unwrap();
6235
6236        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6237        assert_eq!(s.pipe.handshake(), Ok(()));
6238
6239        s.client.send_settings(&mut s.pipe.client).unwrap();
6240        assert_eq!(s.pipe.advance(), Ok(()));
6241
6242        // Before processing SETTINGS (via poll), HTTP/3 DATAGRAMS are not
6243        // enabled.
6244        assert!(!s.server.dgram_enabled_by_peer(&s.pipe.server));
6245
6246        // When everything is ok, poll returns Done and DATAGRAM is enabled.
6247        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::Done));
6248        assert!(s.server.dgram_enabled_by_peer(&s.pipe.server));
6249
6250        // Now detect things on the client
6251        s.server.send_settings(&mut s.pipe.server).unwrap();
6252        assert_eq!(s.pipe.advance(), Ok(()));
6253        assert!(!s.client.dgram_enabled_by_peer(&s.pipe.client));
6254        assert_eq!(s.client.poll(&mut s.pipe.client), Err(Error::Done));
6255        assert!(s.client.dgram_enabled_by_peer(&s.pipe.client));
6256    }
6257
6258    #[test]
6259    /// Tests that receiving a H3_DATAGRAM setting when no TP is set generates
6260    /// an error.
6261    fn dgram_setting_no_tp() {
6262        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6263        config
6264            .load_cert_chain_from_pem_file("examples/cert.crt")
6265            .unwrap();
6266        config
6267            .load_priv_key_from_pem_file("examples/cert.key")
6268            .unwrap();
6269        config.set_application_protos(&[b"h3"]).unwrap();
6270        config.set_initial_max_data(70);
6271        config.set_initial_max_stream_data_bidi_local(150);
6272        config.set_initial_max_stream_data_bidi_remote(150);
6273        config.set_initial_max_stream_data_uni(150);
6274        config.set_initial_max_streams_bidi(100);
6275        config.set_initial_max_streams_uni(5);
6276        config.verify_peer(false);
6277
6278        let h3_config = Config::new().unwrap();
6279
6280        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6281        assert_eq!(s.pipe.handshake(), Ok(()));
6282
6283        s.client.control_stream_id = Some(
6284            s.client
6285                .open_uni_stream(
6286                    &mut s.pipe.client,
6287                    stream::HTTP3_CONTROL_STREAM_TYPE_ID,
6288                )
6289                .unwrap(),
6290        );
6291
6292        let settings = frame::Frame::Settings {
6293            max_field_section_size: None,
6294            qpack_max_table_capacity: None,
6295            qpack_blocked_streams: None,
6296            connect_protocol_enabled: None,
6297            h3_datagram: Some(1),
6298            grease: None,
6299            additional_settings: Default::default(),
6300            raw: Default::default(),
6301        };
6302
6303        s.send_frame_client(settings, s.client.control_stream_id.unwrap(), false)
6304            .unwrap();
6305
6306        assert_eq!(s.pipe.advance(), Ok(()));
6307
6308        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::SettingsError));
6309    }
6310
6311    #[test]
6312    /// Tests that receiving SETTINGS with prohibited values generates an error.
6313    fn settings_h2_prohibited() {
6314        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6315        config
6316            .load_cert_chain_from_pem_file("examples/cert.crt")
6317            .unwrap();
6318        config
6319            .load_priv_key_from_pem_file("examples/cert.key")
6320            .unwrap();
6321        config.set_application_protos(&[b"h3"]).unwrap();
6322        config.set_initial_max_data(70);
6323        config.set_initial_max_stream_data_bidi_local(150);
6324        config.set_initial_max_stream_data_bidi_remote(150);
6325        config.set_initial_max_stream_data_uni(150);
6326        config.set_initial_max_streams_bidi(100);
6327        config.set_initial_max_streams_uni(5);
6328        config.verify_peer(false);
6329
6330        let h3_config = Config::new().unwrap();
6331
6332        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6333        assert_eq!(s.pipe.handshake(), Ok(()));
6334
6335        s.client.control_stream_id = Some(
6336            s.client
6337                .open_uni_stream(
6338                    &mut s.pipe.client,
6339                    stream::HTTP3_CONTROL_STREAM_TYPE_ID,
6340                )
6341                .unwrap(),
6342        );
6343
6344        s.server.control_stream_id = Some(
6345            s.server
6346                .open_uni_stream(
6347                    &mut s.pipe.server,
6348                    stream::HTTP3_CONTROL_STREAM_TYPE_ID,
6349                )
6350                .unwrap(),
6351        );
6352
6353        let frame_payload_len = 2u64;
6354        let settings = [
6355            frame::SETTINGS_FRAME_TYPE_ID as u8,
6356            frame_payload_len as u8,
6357            0x2, // 0x2 is a reserved setting type
6358            1,
6359        ];
6360
6361        s.send_arbitrary_stream_data_client(
6362            &settings,
6363            s.client.control_stream_id.unwrap(),
6364            false,
6365        )
6366        .unwrap();
6367
6368        s.send_arbitrary_stream_data_server(
6369            &settings,
6370            s.server.control_stream_id.unwrap(),
6371            false,
6372        )
6373        .unwrap();
6374
6375        assert_eq!(s.pipe.advance(), Ok(()));
6376
6377        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::SettingsError));
6378
6379        assert_eq!(s.client.poll(&mut s.pipe.client), Err(Error::SettingsError));
6380    }
6381
6382    #[test]
6383    /// Tests that setting SETTINGS with prohibited values generates an error.
6384    fn set_prohibited_additional_settings() {
6385        let mut h3_config = Config::new().unwrap();
6386        assert_eq!(
6387            h3_config.set_additional_settings(vec![(
6388                frame::SETTINGS_QPACK_MAX_TABLE_CAPACITY,
6389                43
6390            )]),
6391            Err(Error::SettingsError)
6392        );
6393        assert_eq!(
6394            h3_config.set_additional_settings(vec![(
6395                frame::SETTINGS_MAX_FIELD_SECTION_SIZE,
6396                43
6397            )]),
6398            Err(Error::SettingsError)
6399        );
6400        assert_eq!(
6401            h3_config.set_additional_settings(vec![(
6402                frame::SETTINGS_QPACK_BLOCKED_STREAMS,
6403                43
6404            )]),
6405            Err(Error::SettingsError)
6406        );
6407        assert_eq!(
6408            h3_config.set_additional_settings(vec![(
6409                frame::SETTINGS_ENABLE_CONNECT_PROTOCOL,
6410                43
6411            )]),
6412            Err(Error::SettingsError)
6413        );
6414        assert_eq!(
6415            h3_config
6416                .set_additional_settings(vec![(frame::SETTINGS_H3_DATAGRAM, 43)]),
6417            Err(Error::SettingsError)
6418        );
6419    }
6420
6421    #[test]
6422    /// Tests additional settings are actually exchanged by the peers.
6423    fn set_additional_settings() {
6424        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6425        config
6426            .load_cert_chain_from_pem_file("examples/cert.crt")
6427            .unwrap();
6428        config
6429            .load_priv_key_from_pem_file("examples/cert.key")
6430            .unwrap();
6431        config.set_application_protos(&[b"h3"]).unwrap();
6432        config.set_initial_max_data(70);
6433        config.set_initial_max_stream_data_bidi_local(150);
6434        config.set_initial_max_stream_data_bidi_remote(150);
6435        config.set_initial_max_stream_data_uni(150);
6436        config.set_initial_max_streams_bidi(100);
6437        config.set_initial_max_streams_uni(5);
6438        config.verify_peer(false);
6439        config.grease(false);
6440
6441        let mut h3_config = Config::new().unwrap();
6442        h3_config
6443            .set_additional_settings(vec![(42, 43), (44, 45)])
6444            .unwrap();
6445
6446        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6447        assert_eq!(s.pipe.handshake(), Ok(()));
6448
6449        assert_eq!(s.pipe.advance(), Ok(()));
6450
6451        s.client.send_settings(&mut s.pipe.client).unwrap();
6452        assert_eq!(s.pipe.advance(), Ok(()));
6453        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::Done));
6454
6455        s.server.send_settings(&mut s.pipe.server).unwrap();
6456        assert_eq!(s.pipe.advance(), Ok(()));
6457        assert_eq!(s.client.poll(&mut s.pipe.client), Err(Error::Done));
6458
6459        assert_eq!(
6460            s.server.peer_settings_raw(),
6461            Some(&[(42, 43), (44, 45)][..])
6462        );
6463        assert_eq!(
6464            s.client.peer_settings_raw(),
6465            Some(&[(42, 43), (44, 45)][..])
6466        );
6467    }
6468
6469    #[test]
6470    /// Send a single DATAGRAM.
6471    fn single_dgram() {
6472        let mut buf = [0; 65535];
6473        let mut s = Session::new().unwrap();
6474        s.handshake().unwrap();
6475
6476        // We'll send default data of 10 bytes on flow ID 0.
6477        let result = (11, 0, 1);
6478
6479        s.send_dgram_client(0).unwrap();
6480
6481        assert_eq!(s.poll_server(), Err(Error::Done));
6482        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6483
6484        s.send_dgram_server(0).unwrap();
6485        assert_eq!(s.poll_client(), Err(Error::Done));
6486        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6487    }
6488
6489    #[test]
6490    /// Send multiple DATAGRAMs.
6491    fn multiple_dgram() {
6492        let mut buf = [0; 65535];
6493        let mut s = Session::new().unwrap();
6494        s.handshake().unwrap();
6495
6496        // We'll send default data of 10 bytes on flow ID 0.
6497        let result = (11, 0, 1);
6498
6499        s.send_dgram_client(0).unwrap();
6500        s.send_dgram_client(0).unwrap();
6501        s.send_dgram_client(0).unwrap();
6502
6503        assert_eq!(s.poll_server(), Err(Error::Done));
6504        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6505        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6506        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6507        assert_eq!(s.recv_dgram_server(&mut buf), Err(Error::Done));
6508
6509        s.send_dgram_server(0).unwrap();
6510        s.send_dgram_server(0).unwrap();
6511        s.send_dgram_server(0).unwrap();
6512
6513        assert_eq!(s.poll_client(), Err(Error::Done));
6514        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6515        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6516        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6517        assert_eq!(s.recv_dgram_client(&mut buf), Err(Error::Done));
6518    }
6519
6520    #[test]
6521    /// Send more DATAGRAMs than the send queue allows.
6522    fn multiple_dgram_overflow() {
6523        let mut buf = [0; 65535];
6524        let mut s = Session::new().unwrap();
6525        s.handshake().unwrap();
6526
6527        // We'll send default data of 10 bytes on flow ID 0.
6528        let result = (11, 0, 1);
6529
6530        // Five DATAGRAMs
6531        s.send_dgram_client(0).unwrap();
6532        s.send_dgram_client(0).unwrap();
6533        s.send_dgram_client(0).unwrap();
6534        s.send_dgram_client(0).unwrap();
6535        s.send_dgram_client(0).unwrap();
6536
6537        // Only 3 independent DATAGRAMs to read events will fire.
6538        assert_eq!(s.poll_server(), Err(Error::Done));
6539        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6540        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6541        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6542        assert_eq!(s.recv_dgram_server(&mut buf), Err(Error::Done));
6543    }
6544
6545    #[test]
6546    /// Send a single DATAGRAM and request.
6547    fn poll_datagram_cycling_no_read() {
6548        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6549        config
6550            .load_cert_chain_from_pem_file("examples/cert.crt")
6551            .unwrap();
6552        config
6553            .load_priv_key_from_pem_file("examples/cert.key")
6554            .unwrap();
6555        config.set_application_protos(&[b"h3"]).unwrap();
6556        config.set_initial_max_data(1500);
6557        config.set_initial_max_stream_data_bidi_local(150);
6558        config.set_initial_max_stream_data_bidi_remote(150);
6559        config.set_initial_max_stream_data_uni(150);
6560        config.set_initial_max_streams_bidi(100);
6561        config.set_initial_max_streams_uni(5);
6562        config.verify_peer(false);
6563        config.enable_dgram(true, 100, 100);
6564
6565        let h3_config = Config::new().unwrap();
6566        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6567        s.handshake().unwrap();
6568
6569        // Send request followed by DATAGRAM on client side.
6570        let (stream, req) = s.send_request(false).unwrap();
6571
6572        s.send_body_client(stream, true).unwrap();
6573
6574        let ev_headers = Event::Headers {
6575            list: req,
6576            more_frames: true,
6577        };
6578
6579        s.send_dgram_client(0).unwrap();
6580
6581        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6582        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6583
6584        assert_eq!(s.poll_server(), Err(Error::Done));
6585    }
6586
6587    #[test]
6588    /// Send a single DATAGRAM and request.
6589    fn poll_datagram_single_read() {
6590        let mut buf = [0; 65535];
6591
6592        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6593        config
6594            .load_cert_chain_from_pem_file("examples/cert.crt")
6595            .unwrap();
6596        config
6597            .load_priv_key_from_pem_file("examples/cert.key")
6598            .unwrap();
6599        config.set_application_protos(&[b"h3"]).unwrap();
6600        config.set_initial_max_data(1500);
6601        config.set_initial_max_stream_data_bidi_local(150);
6602        config.set_initial_max_stream_data_bidi_remote(150);
6603        config.set_initial_max_stream_data_uni(150);
6604        config.set_initial_max_streams_bidi(100);
6605        config.set_initial_max_streams_uni(5);
6606        config.verify_peer(false);
6607        config.enable_dgram(true, 100, 100);
6608
6609        let h3_config = Config::new().unwrap();
6610        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6611        s.handshake().unwrap();
6612
6613        // We'll send default data of 10 bytes on flow ID 0.
6614        let result = (11, 0, 1);
6615
6616        // Send request followed by DATAGRAM on client side.
6617        let (stream, req) = s.send_request(false).unwrap();
6618
6619        let body = s.send_body_client(stream, true).unwrap();
6620
6621        let mut recv_buf = vec![0; body.len()];
6622
6623        let ev_headers = Event::Headers {
6624            list: req,
6625            more_frames: true,
6626        };
6627
6628        s.send_dgram_client(0).unwrap();
6629
6630        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6631        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6632
6633        assert_eq!(s.poll_server(), Err(Error::Done));
6634
6635        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6636
6637        assert_eq!(s.poll_server(), Err(Error::Done));
6638
6639        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
6640        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6641        assert_eq!(s.poll_server(), Err(Error::Done));
6642
6643        // Send response followed by DATAGRAM on server side
6644        let resp = s.send_response(stream, false).unwrap();
6645
6646        let body = s.send_body_server(stream, true).unwrap();
6647
6648        let mut recv_buf = vec![0; body.len()];
6649
6650        let ev_headers = Event::Headers {
6651            list: resp,
6652            more_frames: true,
6653        };
6654
6655        s.send_dgram_server(0).unwrap();
6656
6657        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
6658        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
6659
6660        assert_eq!(s.poll_client(), Err(Error::Done));
6661
6662        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6663
6664        assert_eq!(s.poll_client(), Err(Error::Done));
6665
6666        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
6667
6668        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
6669        assert_eq!(s.poll_client(), Err(Error::Done));
6670    }
6671
6672    #[test]
6673    /// Send multiple DATAGRAMs and requests.
6674    fn poll_datagram_multi_read() {
6675        let mut buf = [0; 65535];
6676
6677        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6678        config
6679            .load_cert_chain_from_pem_file("examples/cert.crt")
6680            .unwrap();
6681        config
6682            .load_priv_key_from_pem_file("examples/cert.key")
6683            .unwrap();
6684        config.set_application_protos(&[b"h3"]).unwrap();
6685        config.set_initial_max_data(1500);
6686        config.set_initial_max_stream_data_bidi_local(150);
6687        config.set_initial_max_stream_data_bidi_remote(150);
6688        config.set_initial_max_stream_data_uni(150);
6689        config.set_initial_max_streams_bidi(100);
6690        config.set_initial_max_streams_uni(5);
6691        config.verify_peer(false);
6692        config.enable_dgram(true, 100, 100);
6693
6694        let h3_config = Config::new().unwrap();
6695        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6696        s.handshake().unwrap();
6697
6698        // 10 bytes on flow ID 0 and 2.
6699        let flow_0_result = (11, 0, 1);
6700        let flow_2_result = (11, 2, 1);
6701
6702        // Send requests followed by DATAGRAMs on client side.
6703        let (stream, req) = s.send_request(false).unwrap();
6704
6705        let body = s.send_body_client(stream, true).unwrap();
6706
6707        let mut recv_buf = vec![0; body.len()];
6708
6709        let ev_headers = Event::Headers {
6710            list: req,
6711            more_frames: true,
6712        };
6713
6714        s.send_dgram_client(0).unwrap();
6715        s.send_dgram_client(0).unwrap();
6716        s.send_dgram_client(0).unwrap();
6717        s.send_dgram_client(0).unwrap();
6718        s.send_dgram_client(0).unwrap();
6719        s.send_dgram_client(2).unwrap();
6720        s.send_dgram_client(2).unwrap();
6721        s.send_dgram_client(2).unwrap();
6722        s.send_dgram_client(2).unwrap();
6723        s.send_dgram_client(2).unwrap();
6724
6725        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6726        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6727
6728        assert_eq!(s.poll_server(), Err(Error::Done));
6729
6730        // Second cycle, start to read
6731        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6732        assert_eq!(s.poll_server(), Err(Error::Done));
6733        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6734        assert_eq!(s.poll_server(), Err(Error::Done));
6735        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6736        assert_eq!(s.poll_server(), Err(Error::Done));
6737
6738        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
6739        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6740
6741        assert_eq!(s.poll_server(), Err(Error::Done));
6742
6743        // Third cycle.
6744        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6745        assert_eq!(s.poll_server(), Err(Error::Done));
6746        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6747        assert_eq!(s.poll_server(), Err(Error::Done));
6748        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6749        assert_eq!(s.poll_server(), Err(Error::Done));
6750        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6751        assert_eq!(s.poll_server(), Err(Error::Done));
6752        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6753        assert_eq!(s.poll_server(), Err(Error::Done));
6754        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6755        assert_eq!(s.poll_server(), Err(Error::Done));
6756        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6757        assert_eq!(s.poll_server(), Err(Error::Done));
6758
6759        // Send response followed by DATAGRAM on server side
6760        let resp = s.send_response(stream, false).unwrap();
6761
6762        let body = s.send_body_server(stream, true).unwrap();
6763
6764        let mut recv_buf = vec![0; body.len()];
6765
6766        let ev_headers = Event::Headers {
6767            list: resp,
6768            more_frames: true,
6769        };
6770
6771        s.send_dgram_server(0).unwrap();
6772        s.send_dgram_server(0).unwrap();
6773        s.send_dgram_server(0).unwrap();
6774        s.send_dgram_server(0).unwrap();
6775        s.send_dgram_server(0).unwrap();
6776        s.send_dgram_server(2).unwrap();
6777        s.send_dgram_server(2).unwrap();
6778        s.send_dgram_server(2).unwrap();
6779        s.send_dgram_server(2).unwrap();
6780        s.send_dgram_server(2).unwrap();
6781
6782        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
6783        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
6784
6785        assert_eq!(s.poll_client(), Err(Error::Done));
6786
6787        // Second cycle, start to read
6788        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
6789        assert_eq!(s.poll_client(), Err(Error::Done));
6790        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
6791        assert_eq!(s.poll_client(), Err(Error::Done));
6792        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
6793        assert_eq!(s.poll_client(), Err(Error::Done));
6794
6795        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
6796        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
6797
6798        assert_eq!(s.poll_client(), Err(Error::Done));
6799
6800        // Third cycle.
6801        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
6802        assert_eq!(s.poll_client(), Err(Error::Done));
6803        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
6804        assert_eq!(s.poll_client(), Err(Error::Done));
6805        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
6806        assert_eq!(s.poll_client(), Err(Error::Done));
6807        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
6808        assert_eq!(s.poll_client(), Err(Error::Done));
6809        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
6810        assert_eq!(s.poll_client(), Err(Error::Done));
6811        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
6812        assert_eq!(s.poll_client(), Err(Error::Done));
6813        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
6814        assert_eq!(s.poll_client(), Err(Error::Done));
6815    }
6816
6817    #[test]
6818    /// Tests that the Finished event is not issued for streams of unknown type
6819    /// (e.g. GREASE).
6820    fn finished_is_for_requests() {
6821        let mut s = Session::new().unwrap();
6822        s.handshake().unwrap();
6823
6824        assert_eq!(s.poll_client(), Err(Error::Done));
6825        assert_eq!(s.poll_server(), Err(Error::Done));
6826
6827        assert_eq!(s.client.open_grease_stream(&mut s.pipe.client), Ok(()));
6828        assert_eq!(s.pipe.advance(), Ok(()));
6829
6830        assert_eq!(s.poll_client(), Err(Error::Done));
6831        assert_eq!(s.poll_server(), Err(Error::Done));
6832    }
6833
6834    #[test]
6835    /// Tests that streams are marked as finished only once.
6836    fn finished_once() {
6837        let mut s = Session::new().unwrap();
6838        s.handshake().unwrap();
6839
6840        let (stream, req) = s.send_request(false).unwrap();
6841        let body = s.send_body_client(stream, true).unwrap();
6842
6843        let mut recv_buf = vec![0; body.len()];
6844
6845        let ev_headers = Event::Headers {
6846            list: req,
6847            more_frames: true,
6848        };
6849
6850        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6851        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6852
6853        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
6854        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6855
6856        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Err(Error::Done));
6857        assert_eq!(s.poll_server(), Err(Error::Done));
6858    }
6859
6860    #[test]
6861    /// Tests that the Data event is properly re-armed.
6862    fn data_event_rearm() {
6863        let bytes = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
6864
6865        let mut s = Session::new().unwrap();
6866        s.handshake().unwrap();
6867
6868        let (r1_id, r1_hdrs) = s.send_request(false).unwrap();
6869
6870        let mut recv_buf = vec![0; bytes.len()];
6871
6872        let r1_ev_headers = Event::Headers {
6873            list: r1_hdrs,
6874            more_frames: true,
6875        };
6876
6877        // Manually send an incomplete DATA frame (i.e. the frame size is longer
6878        // than the actual data sent).
6879        {
6880            let mut d = [42; 10];
6881            let mut b = octets::OctetsMut::with_slice(&mut d);
6882
6883            b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
6884            b.put_varint(bytes.len() as u64).unwrap();
6885            let off = b.off();
6886            s.pipe.client.stream_send(r1_id, &d[..off], false).unwrap();
6887
6888            assert_eq!(
6889                s.pipe.client.stream_send(r1_id, &bytes[..5], false),
6890                Ok(5)
6891            );
6892
6893            s.advance().ok();
6894        }
6895
6896        assert_eq!(s.poll_server(), Ok((r1_id, r1_ev_headers)));
6897        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
6898        assert_eq!(s.poll_server(), Err(Error::Done));
6899
6900        // Read the available body data.
6901        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(5));
6902
6903        // Send the remaining DATA payload.
6904        assert_eq!(s.pipe.client.stream_send(r1_id, &bytes[5..], false), Ok(5));
6905        s.advance().ok();
6906
6907        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
6908        assert_eq!(s.poll_server(), Err(Error::Done));
6909
6910        // Read the rest of the body data.
6911        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(5));
6912        assert_eq!(s.poll_server(), Err(Error::Done));
6913
6914        // Send more data.
6915        let r1_body = s.send_body_client(r1_id, false).unwrap();
6916
6917        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
6918        assert_eq!(s.poll_server(), Err(Error::Done));
6919
6920        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(r1_body.len()));
6921
6922        // Send a new request to ensure cross-stream events don't break rearming.
6923        let (r2_id, r2_hdrs) = s.send_request(false).unwrap();
6924        let r2_ev_headers = Event::Headers {
6925            list: r2_hdrs,
6926            more_frames: true,
6927        };
6928        let r2_body = s.send_body_client(r2_id, false).unwrap();
6929
6930        s.advance().ok();
6931
6932        assert_eq!(s.poll_server(), Ok((r2_id, r2_ev_headers)));
6933        assert_eq!(s.poll_server(), Ok((r2_id, Event::Data)));
6934        assert_eq!(s.recv_body_server(r2_id, &mut recv_buf), Ok(r2_body.len()));
6935        assert_eq!(s.poll_server(), Err(Error::Done));
6936
6937        // Send more data on request 1, then trailing HEADERS.
6938        let r1_body = s.send_body_client(r1_id, false).unwrap();
6939
6940        let trailers = vec![Header::new(b"hello", b"world")];
6941
6942        s.client
6943            .send_headers(&mut s.pipe.client, r1_id, &trailers, true)
6944            .unwrap();
6945
6946        let r1_ev_trailers = Event::Headers {
6947            list: trailers.clone(),
6948            more_frames: false,
6949        };
6950
6951        s.advance().ok();
6952
6953        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
6954        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(r1_body.len()));
6955
6956        assert_eq!(s.poll_server(), Ok((r1_id, r1_ev_trailers)));
6957        assert_eq!(s.poll_server(), Ok((r1_id, Event::Finished)));
6958        assert_eq!(s.poll_server(), Err(Error::Done));
6959
6960        // Send more data on request 2, then trailing HEADERS.
6961        let r2_body = s.send_body_client(r2_id, false).unwrap();
6962
6963        s.client
6964            .send_headers(&mut s.pipe.client, r2_id, &trailers, false)
6965            .unwrap();
6966
6967        let r2_ev_trailers = Event::Headers {
6968            list: trailers,
6969            more_frames: true,
6970        };
6971
6972        s.advance().ok();
6973
6974        assert_eq!(s.poll_server(), Ok((r2_id, Event::Data)));
6975        assert_eq!(s.recv_body_server(r2_id, &mut recv_buf), Ok(r2_body.len()));
6976        assert_eq!(s.poll_server(), Ok((r2_id, r2_ev_trailers)));
6977        assert_eq!(s.poll_server(), Err(Error::Done));
6978
6979        let (r3_id, r3_hdrs) = s.send_request(false).unwrap();
6980
6981        let r3_ev_headers = Event::Headers {
6982            list: r3_hdrs,
6983            more_frames: true,
6984        };
6985
6986        // Manually send an incomplete DATA frame (i.e. only the header is sent).
6987        {
6988            let mut d = [42; 10];
6989            let mut b = octets::OctetsMut::with_slice(&mut d);
6990
6991            b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
6992            b.put_varint(bytes.len() as u64).unwrap();
6993            let off = b.off();
6994            s.pipe.client.stream_send(r3_id, &d[..off], false).unwrap();
6995
6996            s.advance().ok();
6997        }
6998
6999        assert_eq!(s.poll_server(), Ok((r3_id, r3_ev_headers)));
7000        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
7001        assert_eq!(s.poll_server(), Err(Error::Done));
7002
7003        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Err(Error::Done));
7004
7005        assert_eq!(s.pipe.client.stream_send(r3_id, &bytes[..5], false), Ok(5));
7006
7007        s.advance().ok();
7008
7009        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
7010        assert_eq!(s.poll_server(), Err(Error::Done));
7011
7012        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Ok(5));
7013
7014        assert_eq!(s.pipe.client.stream_send(r3_id, &bytes[5..], false), Ok(5));
7015        s.advance().ok();
7016
7017        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
7018        assert_eq!(s.poll_server(), Err(Error::Done));
7019
7020        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Ok(5));
7021
7022        // Buffer multiple data frames.
7023        let body = s.send_body_client(r3_id, false).unwrap();
7024        s.send_body_client(r3_id, false).unwrap();
7025        s.send_body_client(r3_id, false).unwrap();
7026
7027        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
7028        assert_eq!(s.poll_server(), Err(Error::Done));
7029
7030        {
7031            let mut d = [42; 10];
7032            let mut b = octets::OctetsMut::with_slice(&mut d);
7033
7034            b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
7035            b.put_varint(0).unwrap();
7036            let off = b.off();
7037            s.pipe.client.stream_send(r3_id, &d[..off], true).unwrap();
7038
7039            s.advance().ok();
7040        }
7041
7042        let mut recv_buf = vec![0; bytes.len() * 3];
7043
7044        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Ok(body.len() * 3));
7045    }
7046
7047    #[test]
7048    /// Tests that the Datagram event is properly re-armed.
7049    fn dgram_event_rearm() {
7050        let mut buf = [0; 65535];
7051
7052        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
7053        config
7054            .load_cert_chain_from_pem_file("examples/cert.crt")
7055            .unwrap();
7056        config
7057            .load_priv_key_from_pem_file("examples/cert.key")
7058            .unwrap();
7059        config.set_application_protos(&[b"h3"]).unwrap();
7060        config.set_initial_max_data(1500);
7061        config.set_initial_max_stream_data_bidi_local(150);
7062        config.set_initial_max_stream_data_bidi_remote(150);
7063        config.set_initial_max_stream_data_uni(150);
7064        config.set_initial_max_streams_bidi(100);
7065        config.set_initial_max_streams_uni(5);
7066        config.verify_peer(false);
7067        config.enable_dgram(true, 100, 100);
7068
7069        let h3_config = Config::new().unwrap();
7070        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
7071        s.handshake().unwrap();
7072
7073        // 10 bytes on flow ID 0 and 2.
7074        let flow_0_result = (11, 0, 1);
7075        let flow_2_result = (11, 2, 1);
7076
7077        // Send requests followed by DATAGRAMs on client side.
7078        let (stream, req) = s.send_request(false).unwrap();
7079
7080        let body = s.send_body_client(stream, true).unwrap();
7081
7082        let mut recv_buf = vec![0; body.len()];
7083
7084        let ev_headers = Event::Headers {
7085            list: req,
7086            more_frames: true,
7087        };
7088
7089        s.send_dgram_client(0).unwrap();
7090        s.send_dgram_client(0).unwrap();
7091        s.send_dgram_client(2).unwrap();
7092        s.send_dgram_client(2).unwrap();
7093
7094        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7095        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
7096
7097        assert_eq!(s.poll_server(), Err(Error::Done));
7098        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
7099
7100        assert_eq!(s.poll_server(), Err(Error::Done));
7101        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
7102
7103        assert_eq!(s.poll_server(), Err(Error::Done));
7104        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
7105
7106        assert_eq!(s.poll_server(), Err(Error::Done));
7107        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
7108
7109        assert_eq!(s.poll_server(), Err(Error::Done));
7110
7111        s.send_dgram_client(0).unwrap();
7112        s.send_dgram_client(2).unwrap();
7113
7114        assert_eq!(s.poll_server(), Err(Error::Done));
7115
7116        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
7117        assert_eq!(s.poll_server(), Err(Error::Done));
7118
7119        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
7120        assert_eq!(s.poll_server(), Err(Error::Done));
7121
7122        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
7123        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
7124
7125        // Verify that dgram counts are incremented.
7126        assert_eq!(s.pipe.client.dgram_sent_count, 6);
7127        assert_eq!(s.pipe.client.dgram_recv_count, 0);
7128        assert_eq!(s.pipe.server.dgram_sent_count, 0);
7129        assert_eq!(s.pipe.server.dgram_recv_count, 6);
7130
7131        let server_path = s.pipe.server.paths.get_active().expect("no active");
7132        let client_path = s.pipe.client.paths.get_active().expect("no active");
7133        assert_eq!(client_path.dgram_sent_count, 6);
7134        assert_eq!(client_path.dgram_recv_count, 0);
7135        assert_eq!(server_path.dgram_sent_count, 0);
7136        assert_eq!(server_path.dgram_recv_count, 6);
7137    }
7138
7139    #[test]
7140    fn reset_stream() {
7141        let mut buf = [0; 65535];
7142
7143        let mut s = Session::new().unwrap();
7144        s.handshake().unwrap();
7145
7146        // Client sends request.
7147        let (stream, req) = s.send_request(false).unwrap();
7148
7149        let ev_headers = Event::Headers {
7150            list: req,
7151            more_frames: true,
7152        };
7153
7154        // Server sends response and closes stream.
7155        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7156        assert_eq!(s.poll_server(), Err(Error::Done));
7157
7158        let resp = s.send_response(stream, true).unwrap();
7159
7160        let ev_headers = Event::Headers {
7161            list: resp,
7162            more_frames: false,
7163        };
7164
7165        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
7166        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
7167        assert_eq!(s.poll_client(), Err(Error::Done));
7168
7169        // Client sends RESET_STREAM, closing stream.
7170        let frames = [crate::frame::Frame::ResetStream {
7171            stream_id: stream,
7172            error_code: 42,
7173            final_size: 68,
7174        }];
7175
7176        let pkt_type = crate::packet::Type::Short;
7177        assert_eq!(
7178            s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
7179            Ok(39)
7180        );
7181
7182        // Server issues Reset event for the stream.
7183        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(42))));
7184        assert_eq!(s.poll_server(), Err(Error::Done));
7185
7186        // Sending RESET_STREAM again shouldn't trigger another Reset event.
7187        assert_eq!(
7188            s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
7189            Ok(39)
7190        );
7191
7192        assert_eq!(s.poll_server(), Err(Error::Done));
7193    }
7194
7195    /// The client shuts down the stream's write direction, the server
7196    /// shuts down its side with fin
7197    #[test]
7198    fn client_shutdown_write_server_fin() {
7199        let mut buf = [0; 65535];
7200        let mut s = Session::new().unwrap();
7201        s.handshake().unwrap();
7202
7203        // Client sends request.
7204        let (stream, req) = s.send_request(false).unwrap();
7205
7206        let ev_headers = Event::Headers {
7207            list: req,
7208            more_frames: true,
7209        };
7210
7211        // Server sends response and closes stream.
7212        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7213        assert_eq!(s.poll_server(), Err(Error::Done));
7214
7215        let resp = s.send_response(stream, true).unwrap();
7216
7217        let ev_headers = Event::Headers {
7218            list: resp,
7219            more_frames: false,
7220        };
7221
7222        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
7223        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
7224        assert_eq!(s.poll_client(), Err(Error::Done));
7225
7226        // Client shuts down stream ==> sends RESET_STREAM
7227        assert_eq!(
7228            s.pipe
7229                .client
7230                .stream_shutdown(stream, crate::Shutdown::Write, 42),
7231            Ok(())
7232        );
7233        assert_eq!(s.advance(), Ok(()));
7234
7235        // Server sees the Reset event for the stream.
7236        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(42))));
7237        assert_eq!(s.poll_server(), Err(Error::Done));
7238
7239        // Streams have been collected by quiche
7240        assert!(s.pipe.server.streams.is_collected(stream));
7241        assert!(s.pipe.client.streams.is_collected(stream));
7242
7243        // Client sends another request, server sends response without fin
7244        //
7245        let (stream, req) = s.send_request(false).unwrap();
7246
7247        let ev_headers = Event::Headers {
7248            list: req,
7249            more_frames: true,
7250        };
7251
7252        // Check that server has received the request.
7253        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7254        assert_eq!(s.poll_server(), Err(Error::Done));
7255
7256        // Server sends reponse without closing the stream.
7257        let resp = s.send_response(stream, false).unwrap();
7258
7259        let ev_headers = Event::Headers {
7260            list: resp,
7261            more_frames: true,
7262        };
7263
7264        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
7265        assert_eq!(s.poll_client(), Err(Error::Done));
7266
7267        // Client shuts down stream ==> sends RESET_STREAM
7268        assert_eq!(
7269            s.pipe
7270                .client
7271                .stream_shutdown(stream, crate::Shutdown::Write, 42),
7272            Ok(())
7273        );
7274        assert_eq!(s.advance(), Ok(()));
7275
7276        // Server sees the Reset event for the stream.
7277        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(42))));
7278        assert_eq!(s.poll_server(), Err(Error::Done));
7279
7280        // Server sends body and closes the stream.
7281        s.send_body_server(stream, true).unwrap();
7282
7283        // Stream has been collected on server by quiche
7284        assert!(s.pipe.server.streams.is_collected(stream));
7285        // Client stream has not been collected, the client needs to
7286        // read the fin from the stream first.
7287        assert!(!s.pipe.client.streams.is_collected(stream));
7288        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
7289        s.recv_body_client(stream, &mut buf).unwrap();
7290        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
7291        assert_eq!(s.poll_client(), Err(Error::Done));
7292        assert!(s.pipe.client.streams.is_collected(stream));
7293    }
7294
7295    #[test]
7296    fn client_shutdown_read() {
7297        let mut buf = [0; 65535];
7298        let mut s = Session::new().unwrap();
7299        s.handshake().unwrap();
7300
7301        // Client sends request and leaves stream open.
7302        let (stream, req) = s.send_request(false).unwrap();
7303
7304        let ev_headers = Event::Headers {
7305            list: req,
7306            more_frames: true,
7307        };
7308
7309        // Server sends response and leaves stream open.
7310        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7311        assert_eq!(s.poll_server(), Err(Error::Done));
7312
7313        let resp = s.send_response(stream, false).unwrap();
7314
7315        let ev_headers = Event::Headers {
7316            list: resp,
7317            more_frames: true,
7318        };
7319
7320        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
7321        assert_eq!(s.poll_client(), Err(Error::Done));
7322        // Client shuts down read
7323        assert_eq!(
7324            s.pipe
7325                .client
7326                .stream_shutdown(stream, crate::Shutdown::Read, 42),
7327            Ok(())
7328        );
7329        assert_eq!(s.advance(), Ok(()));
7330
7331        // Stream is writable on server side, but returns StreamStopped
7332        assert_eq!(s.poll_server(), Err(Error::Done));
7333        let writables: Vec<u64> = s.pipe.server.writable().collect();
7334        assert!(writables.contains(&stream));
7335        assert_eq!(
7336            s.send_body_server(stream, false),
7337            Err(Error::TransportError(crate::Error::StreamStopped(42)))
7338        );
7339
7340        // Client needs to finish its side by sending a fin
7341        assert_eq!(
7342            s.client.send_body(&mut s.pipe.client, stream, &[], true),
7343            Ok(0)
7344        );
7345        assert_eq!(s.advance(), Ok(()));
7346        // Note, we get an Event::Data for an empty buffer today. But it
7347        // would also be fine to not get it.
7348        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
7349        assert_eq!(s.recv_body_server(stream, &mut buf), Err(Error::Done));
7350        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
7351        assert_eq!(s.poll_server(), Err(Error::Done));
7352
7353        // Since the client has already send a fin, the stream is collected
7354        // on both client and server
7355        assert!(s.pipe.client.streams.is_collected(stream));
7356        assert!(s.pipe.server.streams.is_collected(stream));
7357    }
7358
7359    #[test]
7360    fn reset_finished_at_server() {
7361        let mut s = Session::new().unwrap();
7362        s.handshake().unwrap();
7363
7364        // Client sends HEADERS and doesn't fin
7365        let (stream, _req) = s.send_request(false).unwrap();
7366
7367        // ..then Client sends RESET_STREAM
7368        assert_eq!(
7369            s.pipe.client.stream_shutdown(0, crate::Shutdown::Write, 0),
7370            Ok(())
7371        );
7372
7373        assert_eq!(s.pipe.advance(), Ok(()));
7374
7375        // Server receives just a reset
7376        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(0))));
7377        assert_eq!(s.poll_server(), Err(Error::Done));
7378
7379        // Client sends HEADERS and fin
7380        let (stream, req) = s.send_request(true).unwrap();
7381
7382        // ..then Client sends RESET_STREAM
7383        assert_eq!(
7384            s.pipe.client.stream_shutdown(4, crate::Shutdown::Write, 0),
7385            Ok(())
7386        );
7387
7388        let ev_headers = Event::Headers {
7389            list: req,
7390            more_frames: false,
7391        };
7392
7393        // Server receives headers and fin.
7394        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7395        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
7396        assert_eq!(s.poll_server(), Err(Error::Done));
7397    }
7398
7399    #[test]
7400    fn reset_finished_at_server_with_data_pending() {
7401        let mut s = Session::new().unwrap();
7402        s.handshake().unwrap();
7403
7404        // Client sends HEADERS and doesn't fin.
7405        let (stream, req) = s.send_request(false).unwrap();
7406
7407        assert!(s.send_body_client(stream, false).is_ok());
7408
7409        assert_eq!(s.pipe.advance(), Ok(()));
7410
7411        let ev_headers = Event::Headers {
7412            list: req,
7413            more_frames: true,
7414        };
7415
7416        // Server receives headers and data...
7417        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7418        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
7419
7420        // ..then Client sends RESET_STREAM.
7421        assert_eq!(
7422            s.pipe
7423                .client
7424                .stream_shutdown(stream, crate::Shutdown::Write, 0),
7425            Ok(())
7426        );
7427
7428        assert_eq!(s.pipe.advance(), Ok(()));
7429
7430        // The server does *not* attempt to read from the stream,
7431        // but polls and receives the reset and there are no more
7432        // readable streams.
7433        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(0))));
7434        assert_eq!(s.poll_server(), Err(Error::Done));
7435        assert_eq!(s.pipe.server.readable().len(), 0);
7436    }
7437
7438    #[test]
7439    fn reset_finished_at_server_with_data_pending_2() {
7440        let mut s = Session::new().unwrap();
7441        s.handshake().unwrap();
7442
7443        // Client sends HEADERS and doesn't fin.
7444        let (stream, req) = s.send_request(false).unwrap();
7445
7446        assert!(s.send_body_client(stream, false).is_ok());
7447
7448        assert_eq!(s.pipe.advance(), Ok(()));
7449
7450        let ev_headers = Event::Headers {
7451            list: req,
7452            more_frames: true,
7453        };
7454
7455        // Server receives headers and data...
7456        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7457        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
7458
7459        // ..then Client sends RESET_STREAM.
7460        assert_eq!(
7461            s.pipe
7462                .client
7463                .stream_shutdown(stream, crate::Shutdown::Write, 0),
7464            Ok(())
7465        );
7466
7467        assert_eq!(s.pipe.advance(), Ok(()));
7468
7469        // Server reads from the stream and receives the reset while
7470        // attempting to read.
7471        assert_eq!(
7472            s.recv_body_server(stream, &mut [0; 100]),
7473            Err(Error::TransportError(crate::Error::StreamReset(0)))
7474        );
7475
7476        // No more events and there are no more readable streams.
7477        assert_eq!(s.poll_server(), Err(Error::Done));
7478        assert_eq!(s.pipe.server.readable().len(), 0);
7479    }
7480
7481    #[test]
7482    fn reset_finished_at_client() {
7483        let mut buf = [0; 65535];
7484        let mut s = Session::new().unwrap();
7485        s.handshake().unwrap();
7486
7487        // Client sends HEADERS and doesn't fin
7488        let (stream, req) = s.send_request(false).unwrap();
7489
7490        let ev_headers = Event::Headers {
7491            list: req,
7492            more_frames: true,
7493        };
7494
7495        // Server receives headers.
7496        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7497        assert_eq!(s.poll_server(), Err(Error::Done));
7498
7499        // Server sends response and doesn't fin
7500        s.send_response(stream, false).unwrap();
7501
7502        assert_eq!(s.pipe.advance(), Ok(()));
7503
7504        // .. then Server sends RESET_STREAM
7505        assert_eq!(
7506            s.pipe
7507                .server
7508                .stream_shutdown(stream, crate::Shutdown::Write, 0),
7509            Ok(())
7510        );
7511
7512        assert_eq!(s.pipe.advance(), Ok(()));
7513
7514        // Client receives Reset only
7515        assert_eq!(s.poll_client(), Ok((stream, Event::Reset(0))));
7516        assert_eq!(s.poll_server(), Err(Error::Done));
7517
7518        // Client sends headers and fin.
7519        let (stream, req) = s.send_request(true).unwrap();
7520
7521        let ev_headers = Event::Headers {
7522            list: req,
7523            more_frames: false,
7524        };
7525
7526        // Server receives headers and fin.
7527        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7528        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
7529        assert_eq!(s.poll_server(), Err(Error::Done));
7530
7531        // Server sends response and fin
7532        let resp = s.send_response(stream, true).unwrap();
7533
7534        assert_eq!(s.pipe.advance(), Ok(()));
7535
7536        // ..then Server sends RESET_STREAM
7537        let frames = [crate::frame::Frame::ResetStream {
7538            stream_id: stream,
7539            error_code: 42,
7540            final_size: 68,
7541        }];
7542
7543        let pkt_type = crate::packet::Type::Short;
7544        assert_eq!(
7545            s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
7546            Ok(39)
7547        );
7548
7549        assert_eq!(s.pipe.advance(), Ok(()));
7550
7551        let ev_headers = Event::Headers {
7552            list: resp,
7553            more_frames: false,
7554        };
7555
7556        // Client receives headers and fin.
7557        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
7558        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
7559        assert_eq!(s.poll_client(), Err(Error::Done));
7560    }
7561}
7562
7563#[cfg(feature = "ffi")]
7564mod ffi;
7565#[cfg(feature = "internal")]
7566#[doc(hidden)]
7567pub mod frame;
7568#[cfg(not(feature = "internal"))]
7569mod frame;
7570#[doc(hidden)]
7571pub mod qpack;
7572mod stream;