h3i/actions/
h3.rs

1// Copyright (C) 2024, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! Actions specific to HTTP/3 and QUIC
28//!
//! Actions are small operations such as sending HTTP/3 frames or managing QUIC
//! streams. Each independent use case for h3i requires its own collection of
//! Actions, which h3i iterates over in sequence and executes.
32
33use std::collections::HashMap;
34use std::time::Duration;
35
36use crate::quiche;
37use quiche::h3::frame::Frame;
38use quiche::h3::Header;
39use quiche::ConnectionError;
40use serde::Deserialize;
41use serde::Serialize;
42use serde_with::serde_as;
43
44use crate::encode_header_block;
45use crate::encode_header_block_literal;
46
/// An action which the HTTP/3 client should take.
///
/// The client iterates over a vector of said actions, executing each one
/// sequentially. Note that packets will be flushed when said iteration has
/// completed, regardless of whether an [`Action::FlushPackets`] was the
/// terminal action.
// NOTE(review): the `fin_stream` fields below presumably ask for the stream to
// be finished as part of the action -- confirm against the code that executes
// actions, which is not part of this file.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Action {
    /// Send a [quiche::h3::frame::Frame] over a stream.
    SendFrame {
        /// ID of the stream the frame is sent over.
        stream_id: u64,
        fin_stream: bool,
        /// The frame to send.
        frame: Frame,
    },

    /// Send a HEADERS frame over a stream.
    ///
    /// Use [`send_headers_frame`] or [`send_headers_frame_literal`] to build
    /// this variant with `frame` already encoded from `headers`.
    SendHeadersFrame {
        /// ID of the stream the frame is sent over.
        stream_id: u64,
        fin_stream: bool,
        /// `true` when the header block was produced by the literal encoder
        /// (see [`send_headers_frame_literal`]), `false` otherwise.
        literal_headers: bool,
        /// The headers the frame's header block was encoded from.
        headers: Vec<Header>,
        /// The HEADERS frame to send.
        frame: Frame,
    },

    /// Send arbitrary bytes over a stream.
    StreamBytes {
        /// ID of the stream the bytes are sent over.
        stream_id: u64,
        fin_stream: bool,
        /// The raw bytes to send.
        bytes: Vec<u8>,
    },

    /// Send a DATAGRAM frame.
    SendDatagram {
        /// The datagram payload.
        payload: Vec<u8>,
    },

    /// Open a new unidirectional stream.
    OpenUniStream {
        /// ID of the stream to open.
        stream_id: u64,
        fin_stream: bool,
        // NOTE(review): presumably the HTTP/3 unidirectional stream type
        // value sent on the new stream -- confirm against the executor.
        stream_type: u64,
    },

    /// Send a RESET_STREAM frame with the given error code.
    ResetStream {
        /// ID of the stream to reset.
        stream_id: u64,
        /// Error code carried in the frame.
        error_code: u64,
    },

    /// Send a STOP_SENDING frame with the given error code.
    StopSending {
        /// ID of the stream the frame refers to.
        stream_id: u64,
        /// Error code carried in the frame.
        error_code: u64,
    },

    /// Send a CONNECTION_CLOSE frame with the given [`ConnectionError`].
    ConnectionClose {
        /// The error to report in the frame.
        error: ConnectionError,
    },

    /// Flush packets at this point in the sequence.
    ///
    /// A flush always happens after the final action has been executed,
    /// whether or not that action is a `FlushPackets`.
    FlushPackets,

    /// Wait for an event. See [WaitType] for the events.
    Wait {
        /// The event to wait for.
        wait_type: WaitType,
    },
}
114
/// Configure the wait behavior for a connection.
///
/// A `WaitType` can be turned into an [`Action::Wait`] via `From`/`Into`.
#[serde_as]
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum WaitType {
    /// Wait for a fixed amount of time before firing the next action.
    ///
    /// (De)serialized under the name `duration`, with the value handled by
    /// `serde_with::DurationMilliSecondsWithFrac<f64>`.
    #[serde(rename = "duration")]
    WaitDuration(
        #[serde_as(as = "serde_with::DurationMilliSecondsWithFrac<f64>")]
        Duration,
    ),
    /// Wait for some form of a response before firing the next action. This
    /// can be superseded in several cases:
    /// 1. The peer resets the specified stream.
    /// 2. The peer sends a `fin` over the specified stream.
    StreamEvent(StreamEvent),
}
131
132impl From<WaitType> for Action {
133    fn from(value: WaitType) -> Self {
134        Self::Wait { wait_type: value }
135    }
136}
137
138/// A response event, received over a stream, which will terminate the wait
139/// period.
140///
141/// See [StreamEventType] for the types of events.
142#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
143#[serde(rename = "snake_case")]
144pub struct StreamEvent {
145    pub stream_id: u64,
146    #[serde(rename = "type")]
147    pub event_type: StreamEventType,
148}
149
/// Response that can terminate a wait period.
///
/// Variants are (de)serialized using their lowercase names (`headers`,
/// `data`, `finished`).
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum StreamEventType {
    /// A HEADERS frame was received.
    Headers,
    /// A DATA frame was received.
    Data,
    /// The stream was somehow finished, either by a RESET_STREAM frame or via
    /// the `fin` bit being set.
    Finished,
}
162
/// Bookkeeping for the [`StreamEvent`]s that are still being waited on.
///
/// Maps a stream ID to the list of events outstanding on that stream.
#[derive(Debug, Default)]
pub(crate) struct WaitingFor(HashMap<u64, Vec<StreamEvent>>);
165
166impl WaitingFor {
167    pub(crate) fn is_empty(&self) -> bool {
168        self.0.values().all(|v| v.is_empty())
169    }
170
171    pub(crate) fn add_wait(&mut self, stream_event: &StreamEvent) {
172        self.0
173            .entry(stream_event.stream_id)
174            .or_default()
175            .push(*stream_event);
176    }
177
178    pub(crate) fn remove_wait(&mut self, stream_event: StreamEvent) {
179        if let Some(waits) = self.0.get_mut(&stream_event.stream_id) {
180            let old_len = waits.len();
181            waits.retain(|wait| wait != &stream_event);
182            let new_len = waits.len();
183
184            if old_len != new_len {
185                log::info!("No longer waiting for {stream_event:?}");
186            }
187        }
188    }
189
190    pub(crate) fn clear_waits_on_stream(&mut self, stream_id: u64) {
191        if let Some(waits) = self.0.get_mut(&stream_id) {
192            if !waits.is_empty() {
193                log::info!("Clearing all waits for stream {stream_id}");
194                waits.clear();
195            }
196        }
197    }
198}
199
200/// Convenience to convert between header-related data and a
201/// [Action::SendHeadersFrame].
202pub fn send_headers_frame(
203    stream_id: u64, fin_stream: bool, headers: Vec<Header>,
204) -> Action {
205    let header_block = encode_header_block(&headers).unwrap();
206
207    Action::SendHeadersFrame {
208        stream_id,
209        fin_stream,
210        headers,
211        literal_headers: false,
212        frame: Frame::Headers { header_block },
213    }
214}
215
216/// Convenience to convert between header-related data and a
217/// [Action::SendHeadersFrame]. Unlike [`send_headers_frame`],
218/// this version encodes the headers literally as they are provided,
219/// not converting the header names to lower-case.
220pub fn send_headers_frame_literal(
221    stream_id: u64, fin_stream: bool, headers: Vec<Header>,
222) -> Action {
223    let header_block = encode_header_block_literal(&headers).unwrap();
224
225    Action::SendHeadersFrame {
226        stream_id,
227        fin_stream,
228        headers,
229        literal_headers: true,
230        frame: Frame::Headers { header_block },
231    }
232}