Skip to main content

tokio_quiche/http3/driver/
client.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::collections::BTreeMap;
28use std::collections::VecDeque;
29use std::sync::Arc;
30use std::time::Duration;
31
32use datagram_socket::StreamClosureKind;
33use foundations::telemetry::log;
34use quiche::h3;
35use tokio::sync::mpsc;
36use tokio::sync::oneshot;
37
38use super::datagram;
39use super::DriverHooks;
40use super::H3Command;
41use super::H3ConnectionError;
42use super::H3ConnectionResult;
43use super::H3Controller;
44use super::H3Driver;
45use super::H3Event;
46use super::InboundFrameStream;
47use super::InboundHeaders;
48use super::IncomingH3Headers;
49use super::OutboundFrameSender;
50use super::RequestSender;
51use super::StreamCtx;
52use super::STREAM_CAPACITY;
53use crate::http3::settings::Http3Settings;
54use crate::quic::HandshakeInfo;
55use crate::quic::QuicCommand;
56use crate::quic::QuicheConnection;
57
58/// An [H3Driver] for a client-side HTTP/3 connection. See [H3Driver] for
59/// details. Emits [`ClientH3Event`]s and expects [`ClientH3Command`]s for
60/// control.
61pub type ClientH3Driver = H3Driver<ClientHooks>;
62/// The [H3Controller] type paired with [ClientH3Driver]. See [H3Controller] for
63/// details.
64pub type ClientH3Controller = H3Controller<ClientHooks>;
65/// Receives [`ClientH3Event`]s from a [ClientH3Driver]. This is the control
66/// stream which describes what is happening on the connection, but does not
67/// transfer data.
68pub type ClientEventStream = mpsc::UnboundedReceiver<ClientH3Event>;
69/// A [RequestSender] to send HTTP requests over a [ClientH3Driver]'s
70/// connection.
71pub type ClientRequestSender = RequestSender<ClientH3Command, NewClientRequest>;
72
73/// An HTTP request sent using a [ClientRequestSender] to the [ClientH3Driver].
74#[derive(Debug)]
75pub struct NewClientRequest {
76    /// A user-defined identifier to match [`ClientH3Event::NewOutboundRequest`]
77    /// to its original [`NewClientRequest`]. This ID is not used anywhere else.
78    pub request_id: u64,
79    /// The [`h3::Header`]s that make up this request.
80    pub headers: Vec<h3::Header>,
81    /// A sender to pass the request's [`OutboundFrameSender`] to the request
82    /// body.
83    pub body_writer: Option<oneshot::Sender<OutboundFrameSender>>,
84}
85
86/// Events produced by [ClientH3Driver].
87#[derive(Debug)]
88pub enum ClientH3Event {
89    Core(H3Event),
90    /// Headers for the request with the given `request_id` were sent on
91    /// `stream_id`. The body, if there is one, could still be sending.
92    NewOutboundRequest {
93        stream_id: u64,
94        request_id: u64,
95    },
96}
97
98impl From<H3Event> for ClientH3Event {
99    fn from(ev: H3Event) -> Self {
100        Self::Core(ev)
101    }
102}
103
104/// Commands accepted by [ClientH3Driver].
105#[derive(Debug)]
106pub enum ClientH3Command {
107    Core(H3Command),
108    /// Send a new HTTP request over the [`quiche::h3::Connection`]. The driver
109    /// will allocate a stream ID and report it back to the controller via
110    /// [`ClientH3Event::NewOutboundRequest`].
111    ClientRequest(NewClientRequest),
112}
113
114impl From<H3Command> for ClientH3Command {
115    fn from(cmd: H3Command) -> Self {
116        Self::Core(cmd)
117    }
118}
119
120impl From<QuicCommand> for ClientH3Command {
121    fn from(cmd: QuicCommand) -> Self {
122        Self::Core(H3Command::QuicCmd(cmd))
123    }
124}
125
126impl From<NewClientRequest> for ClientH3Command {
127    fn from(req: NewClientRequest) -> Self {
128        Self::ClientRequest(req)
129    }
130}
131
132/// A [`PendingClientRequest`] is a request which has not yet received a
133/// response.
134///
135/// The `send` and `recv` halves are passed to the [ClientH3Controller] in an
136/// [`H3Event::IncomingHeaders`] once the server's response has been received.
137struct PendingClientRequest {
138    send: OutboundFrameSender,
139    recv: InboundFrameStream,
140}
141
142/// Retry delay when `StreamBlocked` or `StreamLimit` is returned by
143/// `send_request`. Both errors are transient: the peer will open more
144/// stream credit or flow-control window within a few milliseconds.
145const BLOCKED_RETRY_DELAY: Duration = Duration::from_millis(5);
146
147pub struct ClientHooks {
148    /// Mapping from stream IDs to the associated [`PendingClientRequest`].
149    pending_requests: BTreeMap<u64, PendingClientRequest>,
150    /// Requests that could not be sent yet due to `StreamBlocked` or
151    /// `StreamLimit`. They will be retried after [`BLOCKED_RETRY_DELAY`].
152    queued_requests: VecDeque<NewClientRequest>,
153    /// A sender back into the driver's own command channel, used to
154    /// re-enqueue blocked requests after the retry delay.
155    ///
156    /// Initialised in `conn_established`; `None` before the connection
157    /// is established.
158    self_cmd_sender: Option<mpsc::UnboundedSender<ClientH3Command>>,
159}
160
161impl ClientHooks {
162    /// Returns `true` when `err` from `h3::Connection::send_request` means the
163    /// request should be retried after a short delay rather than treated as a
164    /// fatal connection error.
165    ///
166    /// `send_request` rolls back any partial stream state before returning
167    /// these errors, so retrying the call with the same arguments is safe.
168    ///
169    /// * `StreamBlocked` — the QUIC stream's flow-control window is temporarily
170    ///   exhausted; the stream entry is removed by `send_request` before
171    ///   returning, so the stream ID is not consumed.
172    /// * `TransportError(StreamLimit)` — the peer's concurrent-stream limit has
173    ///   been reached; QUIC will deliver a MAX_STREAMS frame when credit opens
174    ///   up.
175    fn is_retriable_send_error(err: &h3::Error) -> bool {
176        matches!(
177            err,
178            h3::Error::StreamBlocked |
179                h3::Error::TransportError(quiche::Error::StreamLimit)
180        )
181    }
182
183    /// Initiates a client-side request. This sends the request, stores the
184    /// [`PendingClientRequest`] and allocates a new stream plus potential
185    /// DATAGRAM flow (CONNECT-{UDP,IP}).
186    ///
187    /// If the connection temporarily cannot open a new stream (`StreamBlocked`
188    /// or `StreamLimit`), the request is pushed onto `queued_requests` and
189    /// will be retried after [`BLOCKED_RETRY_DELAY`].
190    fn initiate_request(
191        driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
192        request: NewClientRequest,
193    ) -> H3ConnectionResult<()> {
194        let body_finished = request.body_writer.is_none();
195
196        let stream_id = match driver.conn_mut()?.send_request(
197            qconn,
198            &request.headers,
199            body_finished,
200        ) {
201            Ok(id) => id,
202            Err(ref err) if Self::is_retriable_send_error(err) => {
203                log::debug!(
204                    "send_request blocked, queuing for retry";
205                    "request_id" => request.request_id,
206                    "error" => %err,
207                );
208                driver.hooks.queued_requests.push_back(request);
209                return Ok(());
210            },
211            Err(err) => return Err(H3ConnectionError::from(err)),
212        };
213
214        // log::info!("sent h3 request"; "stream_id" => stream_id);
215        let (mut stream_ctx, send, recv) =
216            StreamCtx::new(stream_id, STREAM_CAPACITY);
217
218        if body_finished {
219            // `send_request()` already sent FIN for bodyless requests, so
220            // mark the send side complete and drop the outbound-frame receiver
221            // since no body will be sent.
222            stream_ctx.fin_or_reset_sent = true;
223            stream_ctx.recv = None;
224            stream_ctx
225                .audit_stats
226                .set_sent_stream_fin(StreamClosureKind::Explicit);
227        }
228
229        if let Some(quarter_stream_id) =
230            datagram::extract_quarter_stream_id(stream_id, &request.headers)
231        {
232            log::info!(
233                "creating new flow for MASQUE request";
234                "stream_id" => stream_id,
235                "quarter_stream_id" => quarter_stream_id,
236            );
237            let _ = driver.get_or_insert_flow(quarter_stream_id)?;
238            stream_ctx.associated_dgram_flow_id = Some(quarter_stream_id);
239        }
240
241        if let Some(body_writer) = request.body_writer {
242            let _ = body_writer.send(send.clone());
243            driver
244                .waiting_streams
245                .push(stream_ctx.wait_for_recv(stream_id));
246        }
247
248        driver.insert_stream(stream_id, stream_ctx);
249        driver
250            .hooks
251            .pending_requests
252            .insert(stream_id, PendingClientRequest { send, recv });
253
254        // Notify the H3Controller that we've allocated a stream_id for a
255        // given request_id.
256        let _ = driver
257            .h3_event_sender
258            .send(ClientH3Event::NewOutboundRequest {
259                stream_id,
260                request_id: request.request_id,
261            });
262
263        Ok(())
264    }
265
266    /// Handles a response from the peer by sending a relevant [`H3Event`] to
267    /// the [ClientH3Controller] for application-level processing.
268    fn handle_response(
269        driver: &mut H3Driver<Self>, headers: InboundHeaders,
270        pending_request: PendingClientRequest,
271    ) -> H3ConnectionResult<()> {
272        let InboundHeaders {
273            stream_id,
274            headers,
275            has_body,
276        } = headers;
277
278        let Some(stream_ctx) = driver.stream_map.get(&stream_id) else {
279            // todo(fisher): send better error to client
280            return Err(H3ConnectionError::NonexistentStream);
281        };
282
283        let headers = IncomingH3Headers {
284            stream_id,
285            headers,
286            send: pending_request.send,
287            recv: pending_request.recv,
288            read_fin: !has_body,
289            h3_audit_stats: Arc::clone(&stream_ctx.audit_stats),
290        };
291
292        driver
293            .h3_event_sender
294            .send(H3Event::IncomingHeaders(headers).into())
295            .map_err(|_| H3ConnectionError::ControllerWentAway)
296    }
297}
298
299#[allow(private_interfaces)]
300impl DriverHooks for ClientHooks {
301    type Command = ClientH3Command;
302    type Event = ClientH3Event;
303
304    fn new(_settings: &Http3Settings) -> Self {
305        Self {
306            pending_requests: BTreeMap::new(),
307            queued_requests: VecDeque::new(),
308            self_cmd_sender: None,
309        }
310    }
311
312    fn conn_established(
313        driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
314        _handshake_info: &HandshakeInfo,
315    ) -> H3ConnectionResult<()> {
316        assert!(
317            !qconn.is_server(),
318            "ClientH3Driver requires a client-side QUIC connection"
319        );
320        driver.hooks.self_cmd_sender = Some(driver.self_cmd_sender().clone());
321        Ok(())
322    }
323
324    fn headers_received(
325        driver: &mut H3Driver<Self>, _qconn: &mut QuicheConnection,
326        headers: InboundHeaders,
327    ) -> H3ConnectionResult<()> {
328        let Some(pending_request) =
329            driver.hooks.pending_requests.remove(&headers.stream_id)
330        else {
331            // todo(fisher): better handling when an unknown stream_id is
332            // encountered.
333            return Ok(());
334        };
335        Self::handle_response(driver, headers, pending_request)
336    }
337
338    fn conn_command(
339        driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
340        cmd: Self::Command,
341    ) -> H3ConnectionResult<()> {
342        match cmd {
343            ClientH3Command::Core(c) => driver.handle_core_command(qconn, c),
344            ClientH3Command::ClientRequest(req) =>
345                Self::initiate_request(driver, qconn, req),
346        }
347    }
348
349    fn has_wait_action(driver: &mut H3Driver<Self>) -> bool {
350        !driver.hooks.queued_requests.is_empty()
351    }
352
353    async fn wait_for_action(
354        &mut self, qconn: &mut QuicheConnection,
355    ) -> H3ConnectionResult<()> {
356        // Sleep briefly to let the peer open stream credit or raise the
357        // MAX_STREAMS limit, then re-enqueue waiting requests back into
358        // the driver's command channel so that `conn_command` can retry them
359        // with a full `&mut H3Driver` on the next loop iteration.
360        //
361        // Only drain up to the number of available bidi streams to avoid
362        // re-queueing requests that would immediately fail.
363        tokio::time::sleep(BLOCKED_RETRY_DELAY).await;
364
365        let Some(sender) = &self.self_cmd_sender else {
366            // Should not happen: `conn_established` always sets this.
367            return Ok(());
368        };
369
370        let streams_left = qconn.peer_streams_left_bidi() as usize;
371        let to_drain = streams_left.min(self.queued_requests.len());
372
373        for request in self.queued_requests.drain(..to_drain) {
374            log::debug!(
375                "retrying queued request after stream-blocked delay";
376                "request_id" => request.request_id,
377            );
378            let _ = sender.send(ClientH3Command::ClientRequest(request));
379        }
380
381        Ok(())
382    }
383}
384
385impl ClientH3Controller {
386    /// Creates a [`NewClientRequest`] sender for the paired [ClientH3Driver].
387    pub fn request_sender(&self) -> ClientRequestSender {
388        RequestSender {
389            sender: self.cmd_sender.clone(),
390            _r: Default::default(),
391        }
392    }
393}