tokio_quiche/http3/driver/client.rs
1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8// * Redistributions of source code must retain the above copyright notice,
9// this list of conditions and the following disclaimer.
10//
11// * Redistributions in binary form must reproduce the above copyright
12// notice, this list of conditions and the following disclaimer in the
13// documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::collections::BTreeMap;
28use std::collections::VecDeque;
29use std::sync::Arc;
30use std::time::Duration;
31
32use datagram_socket::StreamClosureKind;
33use foundations::telemetry::log;
34use quiche::h3;
35use tokio::sync::mpsc;
36use tokio::sync::oneshot;
37
38use super::datagram;
39use super::DriverHooks;
40use super::H3Command;
41use super::H3ConnectionError;
42use super::H3ConnectionResult;
43use super::H3Controller;
44use super::H3Driver;
45use super::H3Event;
46use super::InboundFrameStream;
47use super::InboundHeaders;
48use super::IncomingH3Headers;
49use super::OutboundFrameSender;
50use super::RequestSender;
51use super::StreamCtx;
52use super::STREAM_CAPACITY;
53use crate::http3::settings::Http3Settings;
54use crate::quic::HandshakeInfo;
55use crate::quic::QuicCommand;
56use crate::quic::QuicheConnection;
57
58/// An [H3Driver] for a client-side HTTP/3 connection. See [H3Driver] for
59/// details. Emits [`ClientH3Event`]s and expects [`ClientH3Command`]s for
60/// control.
61pub type ClientH3Driver = H3Driver<ClientHooks>;
62/// The [H3Controller] type paired with [ClientH3Driver]. See [H3Controller] for
63/// details.
64pub type ClientH3Controller = H3Controller<ClientHooks>;
65/// Receives [`ClientH3Event`]s from a [ClientH3Driver]. This is the control
66/// stream which describes what is happening on the connection, but does not
67/// transfer data.
68pub type ClientEventStream = mpsc::UnboundedReceiver<ClientH3Event>;
69/// A [RequestSender] to send HTTP requests over a [ClientH3Driver]'s
70/// connection.
71pub type ClientRequestSender = RequestSender<ClientH3Command, NewClientRequest>;
72
73/// An HTTP request sent using a [ClientRequestSender] to the [ClientH3Driver].
74#[derive(Debug)]
75pub struct NewClientRequest {
76 /// A user-defined identifier to match [`ClientH3Event::NewOutboundRequest`]
77 /// to its original [`NewClientRequest`]. This ID is not used anywhere else.
78 pub request_id: u64,
79 /// The [`h3::Header`]s that make up this request.
80 pub headers: Vec<h3::Header>,
81 /// A sender to pass the request's [`OutboundFrameSender`] to the request
82 /// body.
83 pub body_writer: Option<oneshot::Sender<OutboundFrameSender>>,
84}
85
86/// Events produced by [ClientH3Driver].
87#[derive(Debug)]
88pub enum ClientH3Event {
89 Core(H3Event),
90 /// Headers for the request with the given `request_id` were sent on
91 /// `stream_id`. The body, if there is one, could still be sending.
92 NewOutboundRequest {
93 stream_id: u64,
94 request_id: u64,
95 },
96}
97
98impl From<H3Event> for ClientH3Event {
99 fn from(ev: H3Event) -> Self {
100 Self::Core(ev)
101 }
102}
103
104/// Commands accepted by [ClientH3Driver].
105#[derive(Debug)]
106pub enum ClientH3Command {
107 Core(H3Command),
108 /// Send a new HTTP request over the [`quiche::h3::Connection`]. The driver
109 /// will allocate a stream ID and report it back to the controller via
110 /// [`ClientH3Event::NewOutboundRequest`].
111 ClientRequest(NewClientRequest),
112}
113
114impl From<H3Command> for ClientH3Command {
115 fn from(cmd: H3Command) -> Self {
116 Self::Core(cmd)
117 }
118}
119
120impl From<QuicCommand> for ClientH3Command {
121 fn from(cmd: QuicCommand) -> Self {
122 Self::Core(H3Command::QuicCmd(cmd))
123 }
124}
125
126impl From<NewClientRequest> for ClientH3Command {
127 fn from(req: NewClientRequest) -> Self {
128 Self::ClientRequest(req)
129 }
130}
131
132/// A [`PendingClientRequest`] is a request which has not yet received a
133/// response.
134///
135/// The `send` and `recv` halves are passed to the [ClientH3Controller] in an
136/// [`H3Event::IncomingHeaders`] once the server's response has been received.
137struct PendingClientRequest {
138 send: OutboundFrameSender,
139 recv: InboundFrameStream,
140}
141
142/// Retry delay when `StreamBlocked` or `StreamLimit` is returned by
143/// `send_request`. Both errors are transient: the peer will open more
144/// stream credit or flow-control window within a few milliseconds.
145const BLOCKED_RETRY_DELAY: Duration = Duration::from_millis(5);
146
147pub struct ClientHooks {
148 /// Mapping from stream IDs to the associated [`PendingClientRequest`].
149 pending_requests: BTreeMap<u64, PendingClientRequest>,
150 /// Requests that could not be sent yet due to `StreamBlocked` or
151 /// `StreamLimit`. They will be retried after [`BLOCKED_RETRY_DELAY`].
152 queued_requests: VecDeque<NewClientRequest>,
153 /// A sender back into the driver's own command channel, used to
154 /// re-enqueue blocked requests after the retry delay.
155 ///
156 /// Initialised in `conn_established`; `None` before the connection
157 /// is established.
158 self_cmd_sender: Option<mpsc::UnboundedSender<ClientH3Command>>,
159}
160
161impl ClientHooks {
162 /// Returns `true` when `err` from `h3::Connection::send_request` means the
163 /// request should be retried after a short delay rather than treated as a
164 /// fatal connection error.
165 ///
166 /// `send_request` rolls back any partial stream state before returning
167 /// these errors, so retrying the call with the same arguments is safe.
168 ///
169 /// * `StreamBlocked` — the QUIC stream's flow-control window is temporarily
170 /// exhausted; the stream entry is removed by `send_request` before
171 /// returning, so the stream ID is not consumed.
172 /// * `TransportError(StreamLimit)` — the peer's concurrent-stream limit has
173 /// been reached; QUIC will deliver a MAX_STREAMS frame when credit opens
174 /// up.
175 fn is_retriable_send_error(err: &h3::Error) -> bool {
176 matches!(
177 err,
178 h3::Error::StreamBlocked |
179 h3::Error::TransportError(quiche::Error::StreamLimit)
180 )
181 }
182
183 /// Initiates a client-side request. This sends the request, stores the
184 /// [`PendingClientRequest`] and allocates a new stream plus potential
185 /// DATAGRAM flow (CONNECT-{UDP,IP}).
186 ///
187 /// If the connection temporarily cannot open a new stream (`StreamBlocked`
188 /// or `StreamLimit`), the request is pushed onto `queued_requests` and
189 /// will be retried after [`BLOCKED_RETRY_DELAY`].
190 fn initiate_request(
191 driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
192 request: NewClientRequest,
193 ) -> H3ConnectionResult<()> {
194 let body_finished = request.body_writer.is_none();
195
196 let stream_id = match driver.conn_mut()?.send_request(
197 qconn,
198 &request.headers,
199 body_finished,
200 ) {
201 Ok(id) => id,
202 Err(ref err) if Self::is_retriable_send_error(err) => {
203 log::debug!(
204 "send_request blocked, queuing for retry";
205 "request_id" => request.request_id,
206 "error" => %err,
207 );
208 driver.hooks.queued_requests.push_back(request);
209 return Ok(());
210 },
211 Err(err) => return Err(H3ConnectionError::from(err)),
212 };
213
214 // log::info!("sent h3 request"; "stream_id" => stream_id);
215 let (mut stream_ctx, send, recv) =
216 StreamCtx::new(stream_id, STREAM_CAPACITY);
217
218 if body_finished {
219 // `send_request()` already sent FIN for bodyless requests, so
220 // mark the send side complete and drop the outbound-frame receiver
221 // since no body will be sent.
222 stream_ctx.fin_or_reset_sent = true;
223 stream_ctx.recv = None;
224 stream_ctx
225 .audit_stats
226 .set_sent_stream_fin(StreamClosureKind::Explicit);
227 }
228
229 if let Some(quarter_stream_id) =
230 datagram::extract_quarter_stream_id(stream_id, &request.headers)
231 {
232 log::info!(
233 "creating new flow for MASQUE request";
234 "stream_id" => stream_id,
235 "quarter_stream_id" => quarter_stream_id,
236 );
237 let _ = driver.get_or_insert_flow(quarter_stream_id)?;
238 stream_ctx.associated_dgram_flow_id = Some(quarter_stream_id);
239 }
240
241 if let Some(body_writer) = request.body_writer {
242 let _ = body_writer.send(send.clone());
243 driver
244 .waiting_streams
245 .push(stream_ctx.wait_for_recv(stream_id));
246 }
247
248 driver.insert_stream(stream_id, stream_ctx);
249 driver
250 .hooks
251 .pending_requests
252 .insert(stream_id, PendingClientRequest { send, recv });
253
254 // Notify the H3Controller that we've allocated a stream_id for a
255 // given request_id.
256 let _ = driver
257 .h3_event_sender
258 .send(ClientH3Event::NewOutboundRequest {
259 stream_id,
260 request_id: request.request_id,
261 });
262
263 Ok(())
264 }
265
266 /// Handles a response from the peer by sending a relevant [`H3Event`] to
267 /// the [ClientH3Controller] for application-level processing.
268 fn handle_response(
269 driver: &mut H3Driver<Self>, headers: InboundHeaders,
270 pending_request: PendingClientRequest,
271 ) -> H3ConnectionResult<()> {
272 let InboundHeaders {
273 stream_id,
274 headers,
275 has_body,
276 } = headers;
277
278 let Some(stream_ctx) = driver.stream_map.get(&stream_id) else {
279 // todo(fisher): send better error to client
280 return Err(H3ConnectionError::NonexistentStream);
281 };
282
283 let headers = IncomingH3Headers {
284 stream_id,
285 headers,
286 send: pending_request.send,
287 recv: pending_request.recv,
288 read_fin: !has_body,
289 h3_audit_stats: Arc::clone(&stream_ctx.audit_stats),
290 };
291
292 driver
293 .h3_event_sender
294 .send(H3Event::IncomingHeaders(headers).into())
295 .map_err(|_| H3ConnectionError::ControllerWentAway)
296 }
297}
298
299#[allow(private_interfaces)]
300impl DriverHooks for ClientHooks {
301 type Command = ClientH3Command;
302 type Event = ClientH3Event;
303
304 fn new(_settings: &Http3Settings) -> Self {
305 Self {
306 pending_requests: BTreeMap::new(),
307 queued_requests: VecDeque::new(),
308 self_cmd_sender: None,
309 }
310 }
311
312 fn conn_established(
313 driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
314 _handshake_info: &HandshakeInfo,
315 ) -> H3ConnectionResult<()> {
316 assert!(
317 !qconn.is_server(),
318 "ClientH3Driver requires a client-side QUIC connection"
319 );
320 driver.hooks.self_cmd_sender = Some(driver.self_cmd_sender().clone());
321 Ok(())
322 }
323
324 fn headers_received(
325 driver: &mut H3Driver<Self>, _qconn: &mut QuicheConnection,
326 headers: InboundHeaders,
327 ) -> H3ConnectionResult<()> {
328 let Some(pending_request) =
329 driver.hooks.pending_requests.remove(&headers.stream_id)
330 else {
331 // todo(fisher): better handling when an unknown stream_id is
332 // encountered.
333 return Ok(());
334 };
335 Self::handle_response(driver, headers, pending_request)
336 }
337
338 fn conn_command(
339 driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
340 cmd: Self::Command,
341 ) -> H3ConnectionResult<()> {
342 match cmd {
343 ClientH3Command::Core(c) => driver.handle_core_command(qconn, c),
344 ClientH3Command::ClientRequest(req) =>
345 Self::initiate_request(driver, qconn, req),
346 }
347 }
348
349 fn has_wait_action(driver: &mut H3Driver<Self>) -> bool {
350 !driver.hooks.queued_requests.is_empty()
351 }
352
353 async fn wait_for_action(
354 &mut self, qconn: &mut QuicheConnection,
355 ) -> H3ConnectionResult<()> {
356 // Sleep briefly to let the peer open stream credit or raise the
357 // MAX_STREAMS limit, then re-enqueue waiting requests back into
358 // the driver's command channel so that `conn_command` can retry them
359 // with a full `&mut H3Driver` on the next loop iteration.
360 //
361 // Only drain up to the number of available bidi streams to avoid
362 // re-queueing requests that would immediately fail.
363 tokio::time::sleep(BLOCKED_RETRY_DELAY).await;
364
365 let Some(sender) = &self.self_cmd_sender else {
366 // Should not happen: `conn_established` always sets this.
367 return Ok(());
368 };
369
370 let streams_left = qconn.peer_streams_left_bidi() as usize;
371 let to_drain = streams_left.min(self.queued_requests.len());
372
373 for request in self.queued_requests.drain(..to_drain) {
374 log::debug!(
375 "retrying queued request after stream-blocked delay";
376 "request_id" => request.request_id,
377 );
378 let _ = sender.send(ClientH3Command::ClientRequest(request));
379 }
380
381 Ok(())
382 }
383}
384
385impl ClientH3Controller {
386 /// Creates a [`NewClientRequest`] sender for the paired [ClientH3Driver].
387 pub fn request_sender(&self) -> ClientRequestSender {
388 RequestSender {
389 sender: self.cmd_sender.clone(),
390 _r: Default::default(),
391 }
392 }
393}