Skip to main content

h3i/actions/
h3.rs

1// Copyright (C) 2024, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! Actions specific to HTTP/3 and QUIC
28//!
29//! Actions are small operations such as sending HTTP/3 frames or managing QUIC
30//! streams. Each independent use case for h3i requires its own collection of
//! Actions, which h3i iterates over in sequence and executes.
32
33use std::collections::HashMap;
34use std::time::Duration;
35
36use crate::quiche;
37use quiche::h3::frame::Frame;
38use quiche::h3::Header;
39use quiche::ConnectionError;
40use serde::Deserialize;
41use serde::Serialize;
42use serde_with::serde_as;
43
44use crate::encode_header_block;
45use crate::encode_header_block_literal;
46
47/// Expected result from a stream_send operation.
48#[derive(Clone, Debug, Default, PartialEq, Eq)]
49pub enum ExpectedStreamSendResult {
50    /// Expect success with any number of bytes written.
51    #[default]
52    Ok,
53    /// Expect success with exactly the specified number of bytes written.
54    OkExact(usize),
55    /// Expect the operation to fail with the specified error.
56    Error(quiche::Error),
57}
58
59/// An action which the HTTP/3 client should take.
60///
61/// The client iterates over a vector of said actions, executing each one
62/// sequentially. Note that packets will be flushed when said iteration has
63/// completed, regardless of if an [`Action::FlushPackets`] was the terminal
64/// action.
65#[derive(Clone, Debug, PartialEq, Eq)]
66pub enum Action {
67    /// Send a [quiche::h3::frame::Frame] over a stream.
68    SendFrame {
69        stream_id: u64,
70        fin_stream: bool,
71        frame: Frame,
72        expected_result: ExpectedStreamSendResult,
73    },
74
75    /// Send a HEADERS frame over a stream.
76    SendHeadersFrame {
77        stream_id: u64,
78        fin_stream: bool,
79        literal_headers: bool,
80        headers: Vec<Header>,
81        frame: Frame,
82        expected_result: ExpectedStreamSendResult,
83    },
84
85    /// Send arbitrary bytes over a stream.
86    StreamBytes {
87        stream_id: u64,
88        fin_stream: bool,
89        bytes: Vec<u8>,
90        expected_result: ExpectedStreamSendResult,
91    },
92
93    /// Send a DATAGRAM frame.
94    SendDatagram {
95        payload: Vec<u8>,
96    },
97
98    /// Open a new unidirectional stream.
99    OpenUniStream {
100        stream_id: u64,
101        fin_stream: bool,
102        stream_type: u64,
103        expected_result: ExpectedStreamSendResult,
104    },
105
106    /// Send a RESET_STREAM frame with the given error code.
107    ResetStream {
108        stream_id: u64,
109        error_code: u64,
110    },
111
112    /// Send a STOP_SENDING frame with the given error code.
113    StopSending {
114        stream_id: u64,
115        error_code: u64,
116    },
117
118    /// Send a CONNECTION_CLOSE frame with the given [`ConnectionError`].
119    ConnectionClose {
120        error: ConnectionError,
121    },
122
123    FlushPackets,
124
125    /// Wait for an event. See [WaitType] for the events.
126    Wait {
127        wait_type: WaitType,
128    },
129}
130
131/// Configure the wait behavior for a connection.
132#[serde_as]
133#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
134pub enum WaitType {
135    /// Wait for a time before firing the next action
136    #[serde(rename = "duration")]
137    WaitDuration(
138        #[serde_as(as = "serde_with::DurationMilliSecondsWithFrac<f64>")]
139        Duration,
140    ),
141    /// Wait for some form of a response before firing the next action. This can
142    /// be superseded in several cases:
143    /// 1. The peer resets the specified stream.
144    /// 2. The peer sends a `fin` over the specified stream
145    StreamEvent(StreamEvent),
146
147    /// Wait until the peer has updated MAX_STREAMS to allow creation
148    /// of the required additional streams.
149    ///
150    /// Typically requires processing of a MAX_STREAMS frame sent by
151    /// the peer.  The peer may decide to send MAX_STREAMS updates as
152    /// the number of active streams changes due to stream termination
153    /// or stream creation. Typical goals being:
154    /// - Limit the number of active streams to the configured limit.
155    /// - Ensure that available quota is non-zero when number of active streams
156    ///   is below the configured limit.
157    /// - Minimize the number of MAX_STREAM updates sent.
158    CanOpenNumStreams(RequiredStreamsQuota),
159}
160
161impl From<WaitType> for Action {
162    fn from(value: WaitType) -> Self {
163        Self::Wait { wait_type: value }
164    }
165}
166
167/// A response event, received over a stream, which will terminate the wait
168/// period.
169///
170/// See [StreamEventType] for the types of events.
171#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
172#[serde(rename = "snake_case")]
173pub struct StreamEvent {
174    pub stream_id: u64,
175    #[serde(rename = "type")]
176    pub event_type: StreamEventType,
177}
178
179/// A check for stream creation quota which will terminate the wait period.
180#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
181#[serde(rename = "snake_case")]
182pub struct RequiredStreamsQuota {
183    /// Required stream quota
184    pub num: u64,
185
186    /// True for bidirectional streams, false for unidirectional streams
187    pub bidi: bool,
188}
189
190/// Response that can terminate a wait period.
191#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
192#[serde(rename_all = "lowercase")]
193pub enum StreamEventType {
194    /// A HEADERS frame was received.
195    Headers,
196    /// A DATA frame was received.
197    Data,
198    /// The stream was somehow finished, either by a RESET_STREAM frame or via
199    /// the `fin` bit being set.
200    Finished,
201}
202
203#[derive(Debug, Default)]
204pub(crate) struct WaitingFor {
205    stream_events: HashMap<u64, Vec<StreamEvent>>,
206    required_stream_quota: Option<RequiredStreamsQuota>,
207}
208
209impl WaitingFor {
210    pub(crate) fn is_empty(&self) -> bool {
211        self.stream_events.values().all(|v| v.is_empty()) &&
212            self.required_stream_quota.is_none()
213    }
214
215    pub(crate) fn add_wait(&mut self, stream_event: &StreamEvent) {
216        self.stream_events
217            .entry(stream_event.stream_id)
218            .or_default()
219            .push(*stream_event);
220    }
221
222    pub(crate) fn set_required_stream_quota(
223        &mut self, required: RequiredStreamsQuota,
224    ) {
225        self.required_stream_quota = Some(required);
226    }
227
228    /// Check and clear the [`WaitType::CanOpenNumStreams`] condition if
229    /// `conn` now reports enough available streams.
230    pub(crate) fn check_can_open_num_streams<F: quiche::BufFactory>(
231        &mut self, conn: &quiche::Connection<F>,
232    ) {
233        if let Some(streams_required) = self.required_stream_quota {
234            let available_streams = if streams_required.bidi {
235                conn.peer_streams_left_bidi()
236            } else {
237                conn.peer_streams_left_uni()
238            };
239            if available_streams >= streams_required.num {
240                log::info!(
241                    "required_stream_quota condition met \
242                     (needed={}, available={}, bidi={})",
243                    streams_required.num,
244                    available_streams,
245                    streams_required.bidi,
246                );
247                self.required_stream_quota = None;
248            }
249        }
250    }
251
252    pub(crate) fn remove_wait(&mut self, stream_event: StreamEvent) {
253        if let Some(waits) = self.stream_events.get_mut(&stream_event.stream_id) {
254            let old_len = waits.len();
255            waits.retain(|wait| wait != &stream_event);
256            let new_len = waits.len();
257
258            if old_len != new_len {
259                log::info!("No longer waiting for {stream_event:?}");
260            }
261        }
262    }
263
264    pub(crate) fn clear_waits_on_stream(&mut self, stream_id: u64) {
265        if let Some(waits) = self.stream_events.get_mut(&stream_id) {
266            if !waits.is_empty() {
267                log::info!("Clearing all waits for stream {stream_id}");
268                waits.clear();
269            }
270        }
271    }
272}
273
274/// Convenience to convert between header-related data and a
275/// [Action::SendHeadersFrame].
276pub fn send_headers_frame(
277    stream_id: u64, fin_stream: bool, headers: Vec<Header>,
278) -> Action {
279    let header_block = encode_header_block(&headers).unwrap();
280
281    Action::SendHeadersFrame {
282        stream_id,
283        fin_stream,
284        headers,
285        literal_headers: false,
286        frame: Frame::Headers { header_block },
287        expected_result: ExpectedStreamSendResult::Ok,
288    }
289}
290
291/// Convenience to convert between header-related data and a
292/// [Action::SendHeadersFrame].
293pub fn send_headers_frame_with_expected_result(
294    stream_id: u64, fin_stream: bool, headers: Vec<Header>,
295    expected_result: ExpectedStreamSendResult,
296) -> Action {
297    let header_block = encode_header_block(&headers).unwrap();
298
299    Action::SendHeadersFrame {
300        stream_id,
301        fin_stream,
302        headers,
303        literal_headers: false,
304        frame: Frame::Headers { header_block },
305        expected_result,
306    }
307}
308
309/// Convenience to convert between header-related data and a
310/// [Action::SendHeadersFrame]. Unlike [`send_headers_frame`],
311/// this version encodes the headers literally as they are provided,
312/// not converting the header names to lower-case.
313pub fn send_headers_frame_literal(
314    stream_id: u64, fin_stream: bool, headers: Vec<Header>,
315) -> Action {
316    let header_block = encode_header_block_literal(&headers).unwrap();
317
318    Action::SendHeadersFrame {
319        stream_id,
320        fin_stream,
321        headers,
322        literal_headers: true,
323        frame: Frame::Headers { header_block },
324        expected_result: ExpectedStreamSendResult::Ok,
325    }
326}
327
328/// Convenience to convert between header-related data and a
329/// [Action::SendHeadersFrame]. Unlike [`send_headers_frame`],
330/// this version encodes the headers literally as they are provided,
331/// not converting the header names to lower-case.
332pub fn send_headers_frame_literal_with_expected_result(
333    stream_id: u64, fin_stream: bool, headers: Vec<Header>,
334    expected_result: ExpectedStreamSendResult,
335) -> Action {
336    let header_block = encode_header_block_literal(&headers).unwrap();
337
338    Action::SendHeadersFrame {
339        stream_id,
340        fin_stream,
341        headers,
342        literal_headers: true,
343        frame: Frame::Headers { header_block },
344        expected_result,
345    }
346}