tokio_quiche/http3/driver/client.rs
1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8// * Redistributions of source code must retain the above copyright notice,
9// this list of conditions and the following disclaimer.
10//
11// * Redistributions in binary form must reproduce the above copyright
12// notice, this list of conditions and the following disclaimer in the
13// documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::collections::BTreeMap;
28use std::collections::VecDeque;
29use std::sync::Arc;
30use std::time::Duration;
31
32use foundations::telemetry::log;
33use quiche::h3;
34use tokio::sync::mpsc;
35use tokio::sync::oneshot;
36
37use super::datagram;
38use super::DriverHooks;
39use super::H3Command;
40use super::H3ConnectionError;
41use super::H3ConnectionResult;
42use super::H3Controller;
43use super::H3Driver;
44use super::H3Event;
45use super::InboundFrameStream;
46use super::InboundHeaders;
47use super::IncomingH3Headers;
48use super::OutboundFrameSender;
49use super::RequestSender;
50use super::StreamCtx;
51use super::STREAM_CAPACITY;
52use crate::http3::settings::Http3Settings;
53use crate::quic::HandshakeInfo;
54use crate::quic::QuicCommand;
55use crate::quic::QuicheConnection;
56
57/// An [H3Driver] for a client-side HTTP/3 connection. See [H3Driver] for
58/// details. Emits [`ClientH3Event`]s and expects [`ClientH3Command`]s for
59/// control.
60pub type ClientH3Driver = H3Driver<ClientHooks>;
61/// The [H3Controller] type paired with [ClientH3Driver]. See [H3Controller] for
62/// details.
63pub type ClientH3Controller = H3Controller<ClientHooks>;
64/// Receives [`ClientH3Event`]s from a [ClientH3Driver]. This is the control
65/// stream which describes what is happening on the connection, but does not
66/// transfer data.
67pub type ClientEventStream = mpsc::UnboundedReceiver<ClientH3Event>;
68/// A [RequestSender] to send HTTP requests over a [ClientH3Driver]'s
69/// connection.
70pub type ClientRequestSender = RequestSender<ClientH3Command, NewClientRequest>;
71
72/// An HTTP request sent using a [ClientRequestSender] to the [ClientH3Driver].
73#[derive(Debug)]
74pub struct NewClientRequest {
75 /// A user-defined identifier to match [`ClientH3Event::NewOutboundRequest`]
76 /// to its original [`NewClientRequest`]. This ID is not used anywhere else.
77 pub request_id: u64,
78 /// The [`h3::Header`]s that make up this request.
79 pub headers: Vec<h3::Header>,
80 /// A sender to pass the request's [`OutboundFrameSender`] to the request
81 /// body.
82 pub body_writer: Option<oneshot::Sender<OutboundFrameSender>>,
83}
84
85/// Events produced by [ClientH3Driver].
86#[derive(Debug)]
87pub enum ClientH3Event {
88 Core(H3Event),
89 /// Headers for the request with the given `request_id` were sent on
90 /// `stream_id`. The body, if there is one, could still be sending.
91 NewOutboundRequest {
92 stream_id: u64,
93 request_id: u64,
94 },
95}
96
97impl From<H3Event> for ClientH3Event {
98 fn from(ev: H3Event) -> Self {
99 Self::Core(ev)
100 }
101}
102
103/// Commands accepted by [ClientH3Driver].
104#[derive(Debug)]
105pub enum ClientH3Command {
106 Core(H3Command),
107 /// Send a new HTTP request over the [`quiche::h3::Connection`]. The driver
108 /// will allocate a stream ID and report it back to the controller via
109 /// [`ClientH3Event::NewOutboundRequest`].
110 ClientRequest(NewClientRequest),
111}
112
113impl From<H3Command> for ClientH3Command {
114 fn from(cmd: H3Command) -> Self {
115 Self::Core(cmd)
116 }
117}
118
119impl From<QuicCommand> for ClientH3Command {
120 fn from(cmd: QuicCommand) -> Self {
121 Self::Core(H3Command::QuicCmd(cmd))
122 }
123}
124
125impl From<NewClientRequest> for ClientH3Command {
126 fn from(req: NewClientRequest) -> Self {
127 Self::ClientRequest(req)
128 }
129}
130
131/// A [`PendingClientRequest`] is a request which has not yet received a
132/// response.
133///
134/// The `send` and `recv` halves are passed to the [ClientH3Controller] in an
135/// [`H3Event::IncomingHeaders`] once the server's response has been received.
136struct PendingClientRequest {
137 send: OutboundFrameSender,
138 recv: InboundFrameStream,
139}
140
/// How long to wait before retrying a request for which `send_request`
/// returned `StreamBlocked` or `StreamLimit`. Both conditions are treated as
/// transient: they are expected to clear once the peer grants more stream
/// credit or flow-control window.
const BLOCKED_RETRY_DELAY: Duration = Duration::from_millis(5);
145
146pub struct ClientHooks {
147 /// Mapping from stream IDs to the associated [`PendingClientRequest`].
148 pending_requests: BTreeMap<u64, PendingClientRequest>,
149 /// Requests that could not be sent yet due to `StreamBlocked` or
150 /// `StreamLimit`. They will be retried after [`BLOCKED_RETRY_DELAY`].
151 queued_requests: VecDeque<NewClientRequest>,
152 /// A sender back into the driver's own command channel, used to
153 /// re-enqueue blocked requests after the retry delay.
154 ///
155 /// Initialised in `conn_established`; `None` before the connection
156 /// is established.
157 self_cmd_sender: Option<mpsc::UnboundedSender<ClientH3Command>>,
158}
159
160impl ClientHooks {
161 /// Returns `true` when `err` from `h3::Connection::send_request` means the
162 /// request should be retried after a short delay rather than treated as a
163 /// fatal connection error.
164 ///
165 /// `send_request` rolls back any partial stream state before returning
166 /// these errors, so retrying the call with the same arguments is safe.
167 ///
168 /// * `StreamBlocked` — the QUIC stream's flow-control window is temporarily
169 /// exhausted; the stream entry is removed by `send_request` before
170 /// returning, so the stream ID is not consumed.
171 /// * `TransportError(StreamLimit)` — the peer's concurrent-stream limit has
172 /// been reached; QUIC will deliver a MAX_STREAMS frame when credit opens
173 /// up.
174 fn is_retriable_send_error(err: &h3::Error) -> bool {
175 matches!(
176 err,
177 h3::Error::StreamBlocked |
178 h3::Error::TransportError(quiche::Error::StreamLimit)
179 )
180 }
181
182 /// Initiates a client-side request. This sends the request, stores the
183 /// [`PendingClientRequest`] and allocates a new stream plus potential
184 /// DATAGRAM flow (CONNECT-{UDP,IP}).
185 ///
186 /// If the connection temporarily cannot open a new stream (`StreamBlocked`
187 /// or `StreamLimit`), the request is pushed onto `queued_requests` and
188 /// will be retried after [`BLOCKED_RETRY_DELAY`].
189 fn initiate_request(
190 driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
191 request: NewClientRequest,
192 ) -> H3ConnectionResult<()> {
193 let body_finished = request.body_writer.is_none();
194
195 let stream_id = match driver.conn_mut()?.send_request(
196 qconn,
197 &request.headers,
198 body_finished,
199 ) {
200 Ok(id) => id,
201 Err(ref err) if Self::is_retriable_send_error(err) => {
202 log::debug!(
203 "send_request blocked, queuing for retry";
204 "request_id" => request.request_id,
205 "error" => %err,
206 );
207 driver.hooks.queued_requests.push_back(request);
208 return Ok(());
209 },
210 Err(err) => return Err(H3ConnectionError::from(err)),
211 };
212
213 // log::info!("sent h3 request"; "stream_id" => stream_id);
214 let (mut stream_ctx, send, recv) =
215 StreamCtx::new(stream_id, STREAM_CAPACITY);
216
217 if let Some(quarter_stream_id) =
218 datagram::extract_quarter_stream_id(stream_id, &request.headers)
219 {
220 log::info!(
221 "creating new flow for MASQUE request";
222 "stream_id" => stream_id,
223 "quarter_stream_id" => quarter_stream_id,
224 );
225 let _ = driver.get_or_insert_flow(quarter_stream_id)?;
226 stream_ctx.associated_dgram_flow_id = Some(quarter_stream_id);
227 }
228
229 if let Some(body_writer) = request.body_writer {
230 let _ = body_writer.send(send.clone());
231 driver
232 .waiting_streams
233 .push(stream_ctx.wait_for_recv(stream_id));
234 }
235
236 driver.insert_stream(stream_id, stream_ctx);
237 driver
238 .hooks
239 .pending_requests
240 .insert(stream_id, PendingClientRequest { send, recv });
241
242 // Notify the H3Controller that we've allocated a stream_id for a
243 // given request_id.
244 let _ = driver
245 .h3_event_sender
246 .send(ClientH3Event::NewOutboundRequest {
247 stream_id,
248 request_id: request.request_id,
249 });
250
251 Ok(())
252 }
253
254 /// Handles a response from the peer by sending a relevant [`H3Event`] to
255 /// the [ClientH3Controller] for application-level processing.
256 fn handle_response(
257 driver: &mut H3Driver<Self>, headers: InboundHeaders,
258 pending_request: PendingClientRequest,
259 ) -> H3ConnectionResult<()> {
260 let InboundHeaders {
261 stream_id,
262 headers,
263 has_body,
264 } = headers;
265
266 let Some(stream_ctx) = driver.stream_map.get(&stream_id) else {
267 // todo(fisher): send better error to client
268 return Err(H3ConnectionError::NonexistentStream);
269 };
270
271 let headers = IncomingH3Headers {
272 stream_id,
273 headers,
274 send: pending_request.send,
275 recv: pending_request.recv,
276 read_fin: !has_body,
277 h3_audit_stats: Arc::clone(&stream_ctx.audit_stats),
278 };
279
280 driver
281 .h3_event_sender
282 .send(H3Event::IncomingHeaders(headers).into())
283 .map_err(|_| H3ConnectionError::ControllerWentAway)
284 }
285}
286
287#[allow(private_interfaces)]
288impl DriverHooks for ClientHooks {
289 type Command = ClientH3Command;
290 type Event = ClientH3Event;
291
292 fn new(_settings: &Http3Settings) -> Self {
293 Self {
294 pending_requests: BTreeMap::new(),
295 queued_requests: VecDeque::new(),
296 self_cmd_sender: None,
297 }
298 }
299
300 fn conn_established(
301 driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
302 _handshake_info: &HandshakeInfo,
303 ) -> H3ConnectionResult<()> {
304 assert!(
305 !qconn.is_server(),
306 "ClientH3Driver requires a client-side QUIC connection"
307 );
308 driver.hooks.self_cmd_sender = Some(driver.self_cmd_sender().clone());
309 Ok(())
310 }
311
312 fn headers_received(
313 driver: &mut H3Driver<Self>, _qconn: &mut QuicheConnection,
314 headers: InboundHeaders,
315 ) -> H3ConnectionResult<()> {
316 let Some(pending_request) =
317 driver.hooks.pending_requests.remove(&headers.stream_id)
318 else {
319 // todo(fisher): better handling when an unknown stream_id is
320 // encountered.
321 return Ok(());
322 };
323 Self::handle_response(driver, headers, pending_request)
324 }
325
326 fn conn_command(
327 driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
328 cmd: Self::Command,
329 ) -> H3ConnectionResult<()> {
330 match cmd {
331 ClientH3Command::Core(c) => driver.handle_core_command(qconn, c),
332 ClientH3Command::ClientRequest(req) =>
333 Self::initiate_request(driver, qconn, req),
334 }
335 }
336
337 fn has_wait_action(driver: &mut H3Driver<Self>) -> bool {
338 !driver.hooks.queued_requests.is_empty()
339 }
340
341 async fn wait_for_action(
342 &mut self, qconn: &mut QuicheConnection,
343 ) -> H3ConnectionResult<()> {
344 // Sleep briefly to let the peer open stream credit or raise the
345 // MAX_STREAMS limit, then re-enqueue waiting requests back into
346 // the driver's command channel so that `conn_command` can retry them
347 // with a full `&mut H3Driver` on the next loop iteration.
348 //
349 // Only drain up to the number of available bidi streams to avoid
350 // re-queueing requests that would immediately fail.
351 tokio::time::sleep(BLOCKED_RETRY_DELAY).await;
352
353 let Some(sender) = &self.self_cmd_sender else {
354 // Should not happen: `conn_established` always sets this.
355 return Ok(());
356 };
357
358 let streams_left = qconn.peer_streams_left_bidi() as usize;
359 let to_drain = streams_left.min(self.queued_requests.len());
360
361 for request in self.queued_requests.drain(..to_drain) {
362 log::debug!(
363 "retrying queued request after stream-blocked delay";
364 "request_id" => request.request_id,
365 );
366 let _ = sender.send(ClientH3Command::ClientRequest(request));
367 }
368
369 Ok(())
370 }
371}
372
373impl ClientH3Controller {
374 /// Creates a [`NewClientRequest`] sender for the paired [ClientH3Driver].
375 pub fn request_sender(&self) -> ClientRequestSender {
376 RequestSender {
377 sender: self.cmd_sender.clone(),
378 _r: Default::default(),
379 }
380 }
381}