Skip to main content

tokio_quiche/http3/driver/
client.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::collections::BTreeMap;
28use std::collections::VecDeque;
29use std::sync::Arc;
30use std::time::Duration;
31
32use foundations::telemetry::log;
33use quiche::h3;
34use tokio::sync::mpsc;
35use tokio::sync::oneshot;
36
37use super::datagram;
38use super::DriverHooks;
39use super::H3Command;
40use super::H3ConnectionError;
41use super::H3ConnectionResult;
42use super::H3Controller;
43use super::H3Driver;
44use super::H3Event;
45use super::InboundFrameStream;
46use super::InboundHeaders;
47use super::IncomingH3Headers;
48use super::OutboundFrameSender;
49use super::RequestSender;
50use super::StreamCtx;
51use super::STREAM_CAPACITY;
52use crate::http3::settings::Http3Settings;
53use crate::quic::HandshakeInfo;
54use crate::quic::QuicCommand;
55use crate::quic::QuicheConnection;
56
57/// An [H3Driver] for a client-side HTTP/3 connection. See [H3Driver] for
58/// details. Emits [`ClientH3Event`]s and expects [`ClientH3Command`]s for
59/// control.
60pub type ClientH3Driver = H3Driver<ClientHooks>;
61/// The [H3Controller] type paired with [ClientH3Driver]. See [H3Controller] for
62/// details.
63pub type ClientH3Controller = H3Controller<ClientHooks>;
64/// Receives [`ClientH3Event`]s from a [ClientH3Driver]. This is the control
65/// stream which describes what is happening on the connection, but does not
66/// transfer data.
67pub type ClientEventStream = mpsc::UnboundedReceiver<ClientH3Event>;
68/// A [RequestSender] to send HTTP requests over a [ClientH3Driver]'s
69/// connection.
70pub type ClientRequestSender = RequestSender<ClientH3Command, NewClientRequest>;
71
72/// An HTTP request sent using a [ClientRequestSender] to the [ClientH3Driver].
73#[derive(Debug)]
74pub struct NewClientRequest {
75    /// A user-defined identifier to match [`ClientH3Event::NewOutboundRequest`]
76    /// to its original [`NewClientRequest`]. This ID is not used anywhere else.
77    pub request_id: u64,
78    /// The [`h3::Header`]s that make up this request.
79    pub headers: Vec<h3::Header>,
80    /// A sender to pass the request's [`OutboundFrameSender`] to the request
81    /// body.
82    pub body_writer: Option<oneshot::Sender<OutboundFrameSender>>,
83}
84
85/// Events produced by [ClientH3Driver].
86#[derive(Debug)]
87pub enum ClientH3Event {
88    Core(H3Event),
89    /// Headers for the request with the given `request_id` were sent on
90    /// `stream_id`. The body, if there is one, could still be sending.
91    NewOutboundRequest {
92        stream_id: u64,
93        request_id: u64,
94    },
95}
96
97impl From<H3Event> for ClientH3Event {
98    fn from(ev: H3Event) -> Self {
99        Self::Core(ev)
100    }
101}
102
103/// Commands accepted by [ClientH3Driver].
104#[derive(Debug)]
105pub enum ClientH3Command {
106    Core(H3Command),
107    /// Send a new HTTP request over the [`quiche::h3::Connection`]. The driver
108    /// will allocate a stream ID and report it back to the controller via
109    /// [`ClientH3Event::NewOutboundRequest`].
110    ClientRequest(NewClientRequest),
111}
112
113impl From<H3Command> for ClientH3Command {
114    fn from(cmd: H3Command) -> Self {
115        Self::Core(cmd)
116    }
117}
118
119impl From<QuicCommand> for ClientH3Command {
120    fn from(cmd: QuicCommand) -> Self {
121        Self::Core(H3Command::QuicCmd(cmd))
122    }
123}
124
125impl From<NewClientRequest> for ClientH3Command {
126    fn from(req: NewClientRequest) -> Self {
127        Self::ClientRequest(req)
128    }
129}
130
131/// A [`PendingClientRequest`] is a request which has not yet received a
132/// response.
133///
134/// The `send` and `recv` halves are passed to the [ClientH3Controller] in an
135/// [`H3Event::IncomingHeaders`] once the server's response has been received.
136struct PendingClientRequest {
137    send: OutboundFrameSender,
138    recv: InboundFrameStream,
139}
140
141/// Retry delay when `StreamBlocked` or `StreamLimit` is returned by
142/// `send_request`. Both errors are transient: the peer will open more
143/// stream credit or flow-control window within a few milliseconds.
144const BLOCKED_RETRY_DELAY: Duration = Duration::from_millis(5);
145
146pub struct ClientHooks {
147    /// Mapping from stream IDs to the associated [`PendingClientRequest`].
148    pending_requests: BTreeMap<u64, PendingClientRequest>,
149    /// Requests that could not be sent yet due to `StreamBlocked` or
150    /// `StreamLimit`. They will be retried after [`BLOCKED_RETRY_DELAY`].
151    queued_requests: VecDeque<NewClientRequest>,
152    /// A sender back into the driver's own command channel, used to
153    /// re-enqueue blocked requests after the retry delay.
154    ///
155    /// Initialised in `conn_established`; `None` before the connection
156    /// is established.
157    self_cmd_sender: Option<mpsc::UnboundedSender<ClientH3Command>>,
158}
159
160impl ClientHooks {
161    /// Returns `true` when `err` from `h3::Connection::send_request` means the
162    /// request should be retried after a short delay rather than treated as a
163    /// fatal connection error.
164    ///
165    /// `send_request` rolls back any partial stream state before returning
166    /// these errors, so retrying the call with the same arguments is safe.
167    ///
168    /// * `StreamBlocked` — the QUIC stream's flow-control window is temporarily
169    ///   exhausted; the stream entry is removed by `send_request` before
170    ///   returning, so the stream ID is not consumed.
171    /// * `TransportError(StreamLimit)` — the peer's concurrent-stream limit has
172    ///   been reached; QUIC will deliver a MAX_STREAMS frame when credit opens
173    ///   up.
174    fn is_retriable_send_error(err: &h3::Error) -> bool {
175        matches!(
176            err,
177            h3::Error::StreamBlocked |
178                h3::Error::TransportError(quiche::Error::StreamLimit)
179        )
180    }
181
182    /// Initiates a client-side request. This sends the request, stores the
183    /// [`PendingClientRequest`] and allocates a new stream plus potential
184    /// DATAGRAM flow (CONNECT-{UDP,IP}).
185    ///
186    /// If the connection temporarily cannot open a new stream (`StreamBlocked`
187    /// or `StreamLimit`), the request is pushed onto `queued_requests` and
188    /// will be retried after [`BLOCKED_RETRY_DELAY`].
189    fn initiate_request(
190        driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
191        request: NewClientRequest,
192    ) -> H3ConnectionResult<()> {
193        let body_finished = request.body_writer.is_none();
194
195        let stream_id = match driver.conn_mut()?.send_request(
196            qconn,
197            &request.headers,
198            body_finished,
199        ) {
200            Ok(id) => id,
201            Err(ref err) if Self::is_retriable_send_error(err) => {
202                log::debug!(
203                    "send_request blocked, queuing for retry";
204                    "request_id" => request.request_id,
205                    "error" => %err,
206                );
207                driver.hooks.queued_requests.push_back(request);
208                return Ok(());
209            },
210            Err(err) => return Err(H3ConnectionError::from(err)),
211        };
212
213        // log::info!("sent h3 request"; "stream_id" => stream_id);
214        let (mut stream_ctx, send, recv) =
215            StreamCtx::new(stream_id, STREAM_CAPACITY);
216
217        if let Some(quarter_stream_id) =
218            datagram::extract_quarter_stream_id(stream_id, &request.headers)
219        {
220            log::info!(
221                "creating new flow for MASQUE request";
222                "stream_id" => stream_id,
223                "quarter_stream_id" => quarter_stream_id,
224            );
225            let _ = driver.get_or_insert_flow(quarter_stream_id)?;
226            stream_ctx.associated_dgram_flow_id = Some(quarter_stream_id);
227        }
228
229        if let Some(body_writer) = request.body_writer {
230            let _ = body_writer.send(send.clone());
231            driver
232                .waiting_streams
233                .push(stream_ctx.wait_for_recv(stream_id));
234        }
235
236        driver.insert_stream(stream_id, stream_ctx);
237        driver
238            .hooks
239            .pending_requests
240            .insert(stream_id, PendingClientRequest { send, recv });
241
242        // Notify the H3Controller that we've allocated a stream_id for a
243        // given request_id.
244        let _ = driver
245            .h3_event_sender
246            .send(ClientH3Event::NewOutboundRequest {
247                stream_id,
248                request_id: request.request_id,
249            });
250
251        Ok(())
252    }
253
254    /// Handles a response from the peer by sending a relevant [`H3Event`] to
255    /// the [ClientH3Controller] for application-level processing.
256    fn handle_response(
257        driver: &mut H3Driver<Self>, headers: InboundHeaders,
258        pending_request: PendingClientRequest,
259    ) -> H3ConnectionResult<()> {
260        let InboundHeaders {
261            stream_id,
262            headers,
263            has_body,
264        } = headers;
265
266        let Some(stream_ctx) = driver.stream_map.get(&stream_id) else {
267            // todo(fisher): send better error to client
268            return Err(H3ConnectionError::NonexistentStream);
269        };
270
271        let headers = IncomingH3Headers {
272            stream_id,
273            headers,
274            send: pending_request.send,
275            recv: pending_request.recv,
276            read_fin: !has_body,
277            h3_audit_stats: Arc::clone(&stream_ctx.audit_stats),
278        };
279
280        driver
281            .h3_event_sender
282            .send(H3Event::IncomingHeaders(headers).into())
283            .map_err(|_| H3ConnectionError::ControllerWentAway)
284    }
285}
286
287#[allow(private_interfaces)]
288impl DriverHooks for ClientHooks {
289    type Command = ClientH3Command;
290    type Event = ClientH3Event;
291
292    fn new(_settings: &Http3Settings) -> Self {
293        Self {
294            pending_requests: BTreeMap::new(),
295            queued_requests: VecDeque::new(),
296            self_cmd_sender: None,
297        }
298    }
299
300    fn conn_established(
301        driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
302        _handshake_info: &HandshakeInfo,
303    ) -> H3ConnectionResult<()> {
304        assert!(
305            !qconn.is_server(),
306            "ClientH3Driver requires a client-side QUIC connection"
307        );
308        driver.hooks.self_cmd_sender = Some(driver.self_cmd_sender().clone());
309        Ok(())
310    }
311
312    fn headers_received(
313        driver: &mut H3Driver<Self>, _qconn: &mut QuicheConnection,
314        headers: InboundHeaders,
315    ) -> H3ConnectionResult<()> {
316        let Some(pending_request) =
317            driver.hooks.pending_requests.remove(&headers.stream_id)
318        else {
319            // todo(fisher): better handling when an unknown stream_id is
320            // encountered.
321            return Ok(());
322        };
323        Self::handle_response(driver, headers, pending_request)
324    }
325
326    fn conn_command(
327        driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
328        cmd: Self::Command,
329    ) -> H3ConnectionResult<()> {
330        match cmd {
331            ClientH3Command::Core(c) => driver.handle_core_command(qconn, c),
332            ClientH3Command::ClientRequest(req) =>
333                Self::initiate_request(driver, qconn, req),
334        }
335    }
336
337    fn has_wait_action(driver: &mut H3Driver<Self>) -> bool {
338        !driver.hooks.queued_requests.is_empty()
339    }
340
341    async fn wait_for_action(
342        &mut self, qconn: &mut QuicheConnection,
343    ) -> H3ConnectionResult<()> {
344        // Sleep briefly to let the peer open stream credit or raise the
345        // MAX_STREAMS limit, then re-enqueue waiting requests back into
346        // the driver's command channel so that `conn_command` can retry them
347        // with a full `&mut H3Driver` on the next loop iteration.
348        //
349        // Only drain up to the number of available bidi streams to avoid
350        // re-queueing requests that would immediately fail.
351        tokio::time::sleep(BLOCKED_RETRY_DELAY).await;
352
353        let Some(sender) = &self.self_cmd_sender else {
354            // Should not happen: `conn_established` always sets this.
355            return Ok(());
356        };
357
358        let streams_left = qconn.peer_streams_left_bidi() as usize;
359        let to_drain = streams_left.min(self.queued_requests.len());
360
361        for request in self.queued_requests.drain(..to_drain) {
362            log::debug!(
363                "retrying queued request after stream-blocked delay";
364                "request_id" => request.request_id,
365            );
366            let _ = sender.send(ClientH3Command::ClientRequest(request));
367        }
368
369        Ok(())
370    }
371}
372
373impl ClientH3Controller {
374    /// Creates a [`NewClientRequest`] sender for the paired [ClientH3Driver].
375    pub fn request_sender(&self) -> ClientRequestSender {
376        RequestSender {
377            sender: self.cmd_sender.clone(),
378            _r: Default::default(),
379        }
380    }
381}