h3i/actions/h3.rs
1// Copyright (C) 2024, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8// * Redistributions of source code must retain the above copyright notice,
9// this list of conditions and the following disclaimer.
10//
11// * Redistributions in binary form must reproduce the above copyright
12// notice, this list of conditions and the following disclaimer in the
13// documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! Actions specific to HTTP/3 and QUIC
28//!
29//! Actions are small operations such as sending HTTP/3 frames or managing QUIC
30//! streams. Each independent use case for h3i requires its own collection of
31//! Actions, that h3i iterates over in sequence and executes.
32
33use std::collections::HashMap;
34use std::time::Duration;
35
36use crate::quiche;
37use quiche::h3::frame::Frame;
38use quiche::h3::Header;
39use quiche::ConnectionError;
40use serde::Deserialize;
41use serde::Serialize;
42use serde_with::serde_as;
43
44use crate::encode_header_block;
45use crate::encode_header_block_literal;
46
47/// Expected result from a stream_send operation.
48#[derive(Clone, Debug, Default, PartialEq, Eq)]
49pub enum ExpectedStreamSendResult {
50 /// Expect success with any number of bytes written.
51 #[default]
52 Ok,
53 /// Expect success with exactly the specified number of bytes written.
54 OkExact(usize),
55 /// Expect the operation to fail with the specified error.
56 Error(quiche::Error),
57}
58
59/// An action which the HTTP/3 client should take.
60///
61/// The client iterates over a vector of said actions, executing each one
62/// sequentially. Note that packets will be flushed when said iteration has
63/// completed, regardless of if an [`Action::FlushPackets`] was the terminal
64/// action.
65#[derive(Clone, Debug, PartialEq, Eq)]
66pub enum Action {
67 /// Send a [quiche::h3::frame::Frame] over a stream.
68 SendFrame {
69 stream_id: u64,
70 fin_stream: bool,
71 frame: Frame,
72 expected_result: ExpectedStreamSendResult,
73 },
74
75 /// Send a HEADERS frame over a stream.
76 SendHeadersFrame {
77 stream_id: u64,
78 fin_stream: bool,
79 literal_headers: bool,
80 headers: Vec<Header>,
81 frame: Frame,
82 expected_result: ExpectedStreamSendResult,
83 },
84
85 /// Send arbitrary bytes over a stream.
86 StreamBytes {
87 stream_id: u64,
88 fin_stream: bool,
89 bytes: Vec<u8>,
90 expected_result: ExpectedStreamSendResult,
91 },
92
93 /// Send a DATAGRAM frame.
94 SendDatagram {
95 payload: Vec<u8>,
96 },
97
98 /// Open a new unidirectional stream.
99 OpenUniStream {
100 stream_id: u64,
101 fin_stream: bool,
102 stream_type: u64,
103 expected_result: ExpectedStreamSendResult,
104 },
105
106 /// Send a RESET_STREAM frame with the given error code.
107 ResetStream {
108 stream_id: u64,
109 error_code: u64,
110 },
111
112 /// Send a STOP_SENDING frame with the given error code.
113 StopSending {
114 stream_id: u64,
115 error_code: u64,
116 },
117
118 /// Send a CONNECTION_CLOSE frame with the given [`ConnectionError`].
119 ConnectionClose {
120 error: ConnectionError,
121 },
122
123 FlushPackets,
124
125 /// Wait for an event. See [WaitType] for the events.
126 Wait {
127 wait_type: WaitType,
128 },
129}
130
131/// Configure the wait behavior for a connection.
132#[serde_as]
133#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
134pub enum WaitType {
135 /// Wait for a time before firing the next action
136 #[serde(rename = "duration")]
137 WaitDuration(
138 #[serde_as(as = "serde_with::DurationMilliSecondsWithFrac<f64>")]
139 Duration,
140 ),
141 /// Wait for some form of a response before firing the next action. This can
142 /// be superseded in several cases:
143 /// 1. The peer resets the specified stream.
144 /// 2. The peer sends a `fin` over the specified stream
145 StreamEvent(StreamEvent),
146
147 /// Wait until the peer has updated MAX_STREAMS to allow creation
148 /// of the required additional streams.
149 ///
150 /// Typically requires processing of a MAX_STREAMS frame sent by
151 /// the peer. The peer may decide to send MAX_STREAMS updates as
152 /// the number of active streams changes due to stream termination
153 /// or stream creation. Typical goals being:
154 /// - Limit the number of active streams to the configured limit.
155 /// - Ensure that available quota is non-zero when number of active streams
156 /// is below the configured limit.
157 /// - Minimize the number of MAX_STREAM updates sent.
158 CanOpenNumStreams(RequiredStreamsQuota),
159}
160
161impl From<WaitType> for Action {
162 fn from(value: WaitType) -> Self {
163 Self::Wait { wait_type: value }
164 }
165}
166
167/// A response event, received over a stream, which will terminate the wait
168/// period.
169///
170/// See [StreamEventType] for the types of events.
171#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
172#[serde(rename = "snake_case")]
173pub struct StreamEvent {
174 pub stream_id: u64,
175 #[serde(rename = "type")]
176 pub event_type: StreamEventType,
177}
178
179/// A check for stream creation quota which will terminate the wait period.
180#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
181#[serde(rename = "snake_case")]
182pub struct RequiredStreamsQuota {
183 /// Required stream quota
184 pub num: u64,
185
186 /// True for bidirectional streams, false for unidirectional streams
187 pub bidi: bool,
188}
189
190/// Response that can terminate a wait period.
191#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
192#[serde(rename_all = "lowercase")]
193pub enum StreamEventType {
194 /// A HEADERS frame was received.
195 Headers,
196 /// A DATA frame was received.
197 Data,
198 /// The stream was somehow finished, either by a RESET_STREAM frame or via
199 /// the `fin` bit being set.
200 Finished,
201}
202
203#[derive(Debug, Default)]
204pub(crate) struct WaitingFor {
205 stream_events: HashMap<u64, Vec<StreamEvent>>,
206 required_stream_quota: Option<RequiredStreamsQuota>,
207}
208
209impl WaitingFor {
210 pub(crate) fn is_empty(&self) -> bool {
211 self.stream_events.values().all(|v| v.is_empty()) &&
212 self.required_stream_quota.is_none()
213 }
214
215 pub(crate) fn add_wait(&mut self, stream_event: &StreamEvent) {
216 self.stream_events
217 .entry(stream_event.stream_id)
218 .or_default()
219 .push(*stream_event);
220 }
221
222 pub(crate) fn set_required_stream_quota(
223 &mut self, required: RequiredStreamsQuota,
224 ) {
225 self.required_stream_quota = Some(required);
226 }
227
228 /// Check and clear the [`WaitType::CanOpenNumStreams`] condition if
229 /// `conn` now reports enough available streams.
230 pub(crate) fn check_can_open_num_streams<F: quiche::BufFactory>(
231 &mut self, conn: &quiche::Connection<F>,
232 ) {
233 if let Some(streams_required) = self.required_stream_quota {
234 let available_streams = if streams_required.bidi {
235 conn.peer_streams_left_bidi()
236 } else {
237 conn.peer_streams_left_uni()
238 };
239 if available_streams >= streams_required.num {
240 log::info!(
241 "required_stream_quota condition met \
242 (needed={}, available={}, bidi={})",
243 streams_required.num,
244 available_streams,
245 streams_required.bidi,
246 );
247 self.required_stream_quota = None;
248 }
249 }
250 }
251
252 pub(crate) fn remove_wait(&mut self, stream_event: StreamEvent) {
253 if let Some(waits) = self.stream_events.get_mut(&stream_event.stream_id) {
254 let old_len = waits.len();
255 waits.retain(|wait| wait != &stream_event);
256 let new_len = waits.len();
257
258 if old_len != new_len {
259 log::info!("No longer waiting for {stream_event:?}");
260 }
261 }
262 }
263
264 pub(crate) fn clear_waits_on_stream(&mut self, stream_id: u64) {
265 if let Some(waits) = self.stream_events.get_mut(&stream_id) {
266 if !waits.is_empty() {
267 log::info!("Clearing all waits for stream {stream_id}");
268 waits.clear();
269 }
270 }
271 }
272}
273
274/// Convenience to convert between header-related data and a
275/// [Action::SendHeadersFrame].
276pub fn send_headers_frame(
277 stream_id: u64, fin_stream: bool, headers: Vec<Header>,
278) -> Action {
279 let header_block = encode_header_block(&headers).unwrap();
280
281 Action::SendHeadersFrame {
282 stream_id,
283 fin_stream,
284 headers,
285 literal_headers: false,
286 frame: Frame::Headers { header_block },
287 expected_result: ExpectedStreamSendResult::Ok,
288 }
289}
290
291/// Convenience to convert between header-related data and a
292/// [Action::SendHeadersFrame].
293pub fn send_headers_frame_with_expected_result(
294 stream_id: u64, fin_stream: bool, headers: Vec<Header>,
295 expected_result: ExpectedStreamSendResult,
296) -> Action {
297 let header_block = encode_header_block(&headers).unwrap();
298
299 Action::SendHeadersFrame {
300 stream_id,
301 fin_stream,
302 headers,
303 literal_headers: false,
304 frame: Frame::Headers { header_block },
305 expected_result,
306 }
307}
308
309/// Convenience to convert between header-related data and a
310/// [Action::SendHeadersFrame]. Unlike [`send_headers_frame`],
311/// this version encodes the headers literally as they are provided,
312/// not converting the header names to lower-case.
313pub fn send_headers_frame_literal(
314 stream_id: u64, fin_stream: bool, headers: Vec<Header>,
315) -> Action {
316 let header_block = encode_header_block_literal(&headers).unwrap();
317
318 Action::SendHeadersFrame {
319 stream_id,
320 fin_stream,
321 headers,
322 literal_headers: true,
323 frame: Frame::Headers { header_block },
324 expected_result: ExpectedStreamSendResult::Ok,
325 }
326}
327
328/// Convenience to convert between header-related data and a
329/// [Action::SendHeadersFrame]. Unlike [`send_headers_frame`],
330/// this version encodes the headers literally as they are provided,
331/// not converting the header names to lower-case.
332pub fn send_headers_frame_literal_with_expected_result(
333 stream_id: u64, fin_stream: bool, headers: Vec<Header>,
334 expected_result: ExpectedStreamSendResult,
335) -> Action {
336 let header_block = encode_header_block_literal(&headers).unwrap();
337
338 Action::SendHeadersFrame {
339 stream_id,
340 fin_stream,
341 headers,
342 literal_headers: true,
343 frame: Frame::Headers { header_block },
344 expected_result,
345 }
346}