tokio_quiche/http3/driver/
client.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::collections::BTreeMap;
28use std::sync::Arc;
29
30use foundations::telemetry::log;
31use quiche::h3;
32use tokio::sync::mpsc;
33use tokio::sync::oneshot;
34
35use super::datagram;
36use super::DriverHooks;
37use super::H3Command;
38use super::H3ConnectionError;
39use super::H3ConnectionResult;
40use super::H3Controller;
41use super::H3Driver;
42use super::H3Event;
43use super::InboundFrameStream;
44use super::InboundHeaders;
45use super::IncomingH3Headers;
46use super::OutboundFrameSender;
47use super::RequestSender;
48use super::StreamCtx;
49use super::STREAM_CAPACITY;
50use crate::http3::settings::Http3Settings;
51use crate::quic::HandshakeInfo;
52use crate::quic::QuicCommand;
53use crate::quic::QuicheConnection;
54
55/// An [H3Driver] for a client-side HTTP/3 connection. See [H3Driver] for
56/// details. Emits [`ClientH3Event`]s and expects [`ClientH3Command`]s for
57/// control.
58pub type ClientH3Driver = H3Driver<ClientHooks>;
59/// The [H3Controller] type paired with [ClientH3Driver]. See [H3Controller] for
60/// details.
61pub type ClientH3Controller = H3Controller<ClientHooks>;
62/// Receives [`ClientH3Event`]s from a [ClientH3Driver]. This is the control
63/// stream which describes what is happening on the connection, but does not
64/// transfer data.
65pub type ClientEventStream = mpsc::UnboundedReceiver<ClientH3Event>;
66/// A [RequestSender] to send HTTP requests over a [ClientH3Driver]'s
67/// connection.
68pub type ClientRequestSender = RequestSender<ClientH3Command, NewClientRequest>;
69
70/// An HTTP request sent using a [ClientRequestSender] to the [ClientH3Driver].
71#[derive(Debug)]
72pub struct NewClientRequest {
73    /// A user-defined identifier to match [`ClientH3Event::NewOutboundRequest`]
74    /// to its original [`NewClientRequest`]. This ID is not used anywhere else.
75    pub request_id: u64,
76    /// The [`h3::Header`]s that make up this request.
77    pub headers: Vec<h3::Header>,
78    /// A sender to pass the request's [`OutboundFrameSender`] to the request
79    /// body.
80    pub body_writer: Option<oneshot::Sender<OutboundFrameSender>>,
81}
82
83/// Events produced by [ClientH3Driver].
84#[derive(Debug)]
85pub enum ClientH3Event {
86    Core(H3Event),
87    /// Headers for the request with the given `request_id` were sent on
88    /// `stream_id`. The body, if there is one, could still be sending.
89    NewOutboundRequest {
90        stream_id: u64,
91        request_id: u64,
92    },
93}
94
95impl From<H3Event> for ClientH3Event {
96    fn from(ev: H3Event) -> Self {
97        Self::Core(ev)
98    }
99}
100
101/// Commands accepted by [ClientH3Driver].
102#[derive(Debug)]
103pub enum ClientH3Command {
104    Core(H3Command),
105    /// Send a new HTTP request over the [`quiche::h3::Connection`]. The driver
106    /// will allocate a stream ID and report it back to the controller via
107    /// [`ClientH3Event::NewOutboundRequest`].
108    ClientRequest(NewClientRequest),
109}
110
111impl From<H3Command> for ClientH3Command {
112    fn from(cmd: H3Command) -> Self {
113        Self::Core(cmd)
114    }
115}
116
117impl From<QuicCommand> for ClientH3Command {
118    fn from(cmd: QuicCommand) -> Self {
119        Self::Core(H3Command::QuicCmd(cmd))
120    }
121}
122
123impl From<NewClientRequest> for ClientH3Command {
124    fn from(req: NewClientRequest) -> Self {
125        Self::ClientRequest(req)
126    }
127}
128
129/// A [`PendingClientRequest`] is a request which has not yet received a
130/// response.
131///
132/// The `send` and `recv` halves are passed to the [ClientH3Controller] in an
133/// [`H3Event::IncomingHeaders`] once the server's response has been received.
134struct PendingClientRequest {
135    send: OutboundFrameSender,
136    recv: InboundFrameStream,
137}
138
139pub struct ClientHooks {
140    /// Mapping from stream IDs to the associated [`PendingClientRequest`].
141    pending_requests: BTreeMap<u64, PendingClientRequest>,
142}
143
144impl ClientHooks {
145    /// Initiates a client-side request. This sends the request, stores the
146    /// [`PendingClientRequest`] and allocates a new stream plus potential
147    /// DATAGRAM flow (CONNECT-{UDP,IP}).
148    fn initiate_request(
149        driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
150        request: NewClientRequest,
151    ) -> H3ConnectionResult<()> {
152        let body_finished = request.body_writer.is_none();
153
154        // TODO: retry the request if the error is not fatal
155        let stream_id = driver.conn_mut()?.send_request(
156            qconn,
157            &request.headers,
158            body_finished,
159        )?;
160
161        // log::info!("sent h3 request"; "stream_id" => stream_id);
162        let (mut stream_ctx, send, recv) =
163            StreamCtx::new(stream_id, STREAM_CAPACITY);
164
165        if let Some(flow_id) =
166            datagram::extract_flow_id(stream_id, &request.headers)
167        {
168            log::info!(
169                "creating new flow for MASQUE request";
170                "stream_id" => stream_id,
171                "flow_id" => flow_id,
172            );
173            let _ = driver.get_or_insert_flow(flow_id)?;
174            stream_ctx.associated_dgram_flow_id = Some(flow_id);
175        }
176
177        if let Some(body_writer) = request.body_writer {
178            let _ = body_writer.send(send.clone());
179            driver
180                .waiting_streams
181                .push(stream_ctx.wait_for_recv(stream_id));
182        }
183
184        driver.insert_stream(stream_id, stream_ctx);
185        driver
186            .hooks
187            .pending_requests
188            .insert(stream_id, PendingClientRequest { send, recv });
189
190        // Notify the H3Controller that we've allocated a stream_id for a
191        // given request_id.
192        let _ = driver
193            .h3_event_sender
194            .send(ClientH3Event::NewOutboundRequest {
195                stream_id,
196                request_id: request.request_id,
197            });
198
199        Ok(())
200    }
201
202    /// Handles a response from the peer by sending a relevant [`H3Event`] to
203    /// the [ClientH3Controller] for application-level processing.
204    fn handle_response(
205        driver: &mut H3Driver<Self>, headers: InboundHeaders,
206        pending_request: PendingClientRequest,
207    ) -> H3ConnectionResult<()> {
208        let InboundHeaders {
209            stream_id,
210            headers,
211            has_body,
212        } = headers;
213
214        let Some(stream_ctx) = driver.stream_map.get(&stream_id) else {
215            // todo(fisher): send better error to client
216            return Err(H3ConnectionError::NonexistentStream);
217        };
218
219        let headers = IncomingH3Headers {
220            stream_id,
221            headers,
222            send: pending_request.send,
223            recv: pending_request.recv,
224            read_fin: !has_body,
225            h3_audit_stats: Arc::clone(&stream_ctx.audit_stats),
226        };
227
228        driver
229            .h3_event_sender
230            .send(H3Event::IncomingHeaders(headers).into())
231            .map_err(|_| H3ConnectionError::ControllerWentAway)
232    }
233}
234
235#[allow(private_interfaces)]
236impl DriverHooks for ClientHooks {
237    type Command = ClientH3Command;
238    type Event = ClientH3Event;
239
240    fn new(_settings: &Http3Settings) -> Self {
241        Self {
242            pending_requests: BTreeMap::new(),
243        }
244    }
245
246    fn conn_established(
247        _driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
248        _handshake_info: &HandshakeInfo,
249    ) -> H3ConnectionResult<()> {
250        assert!(
251            !qconn.is_server(),
252            "ClientH3Driver requires a client-side QUIC connection"
253        );
254        Ok(())
255    }
256
257    fn headers_received(
258        driver: &mut H3Driver<Self>, _qconn: &mut QuicheConnection,
259        headers: InboundHeaders,
260    ) -> H3ConnectionResult<()> {
261        let Some(pending_request) =
262            driver.hooks.pending_requests.remove(&headers.stream_id)
263        else {
264            // todo(fisher): better handling when an unknown stream_id is
265            // encountered.
266            return Ok(());
267        };
268        Self::handle_response(driver, headers, pending_request)
269    }
270
271    fn conn_command(
272        driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
273        cmd: Self::Command,
274    ) -> H3ConnectionResult<()> {
275        match cmd {
276            ClientH3Command::Core(c) => driver.handle_core_command(qconn, c),
277            ClientH3Command::ClientRequest(req) =>
278                Self::initiate_request(driver, qconn, req),
279        }
280    }
281}
282
283impl ClientH3Controller {
284    /// Creates a [`NewClientRequest`] sender for the paired [ClientH3Driver].
285    pub fn request_sender(&self) -> ClientRequestSender {
286        RequestSender {
287            sender: self.cmd_sender.clone(),
288            _r: Default::default(),
289        }
290    }
291}