tokio_quiche/http3/driver/
client.rs1use std::collections::BTreeMap;
28use std::sync::Arc;
29
30use foundations::telemetry::log;
31use quiche::h3;
32use tokio::sync::mpsc;
33use tokio::sync::oneshot;
34
35use super::datagram;
36use super::DriverHooks;
37use super::H3Command;
38use super::H3ConnectionError;
39use super::H3ConnectionResult;
40use super::H3Controller;
41use super::H3Driver;
42use super::H3Event;
43use super::InboundFrameStream;
44use super::InboundHeaders;
45use super::IncomingH3Headers;
46use super::OutboundFrameSender;
47use super::RequestSender;
48use super::StreamCtx;
49use super::STREAM_CAPACITY;
50use crate::http3::settings::Http3Settings;
51use crate::quic::HandshakeInfo;
52use crate::quic::QuicCommand;
53use crate::quic::QuicheConnection;
54
55pub type ClientH3Driver = H3Driver<ClientHooks>;
59pub type ClientH3Controller = H3Controller<ClientHooks>;
62pub type ClientEventStream = mpsc::UnboundedReceiver<ClientH3Event>;
66pub type ClientRequestSender = RequestSender<ClientH3Command, NewClientRequest>;
69
70#[derive(Debug)]
72pub struct NewClientRequest {
73    pub request_id: u64,
76    pub headers: Vec<h3::Header>,
78    pub body_writer: Option<oneshot::Sender<OutboundFrameSender>>,
81}
82
83#[derive(Debug)]
85pub enum ClientH3Event {
86    Core(H3Event),
87    NewOutboundRequest {
90        stream_id: u64,
91        request_id: u64,
92    },
93}
94
95impl From<H3Event> for ClientH3Event {
96    fn from(ev: H3Event) -> Self {
97        Self::Core(ev)
98    }
99}
100
101#[derive(Debug)]
103pub enum ClientH3Command {
104    Core(H3Command),
105    ClientRequest(NewClientRequest),
109}
110
111impl From<H3Command> for ClientH3Command {
112    fn from(cmd: H3Command) -> Self {
113        Self::Core(cmd)
114    }
115}
116
117impl From<QuicCommand> for ClientH3Command {
118    fn from(cmd: QuicCommand) -> Self {
119        Self::Core(H3Command::QuicCmd(cmd))
120    }
121}
122
123impl From<NewClientRequest> for ClientH3Command {
124    fn from(req: NewClientRequest) -> Self {
125        Self::ClientRequest(req)
126    }
127}
128
129struct PendingClientRequest {
135    send: OutboundFrameSender,
136    recv: InboundFrameStream,
137}
138
139pub struct ClientHooks {
140    pending_requests: BTreeMap<u64, PendingClientRequest>,
142}
143
144impl ClientHooks {
145    fn initiate_request(
149        driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
150        request: NewClientRequest,
151    ) -> H3ConnectionResult<()> {
152        let body_finished = request.body_writer.is_none();
153
154        let stream_id = driver.conn_mut()?.send_request(
156            qconn,
157            &request.headers,
158            body_finished,
159        )?;
160
161        let (mut stream_ctx, send, recv) =
163            StreamCtx::new(stream_id, STREAM_CAPACITY);
164
165        if let Some(flow_id) =
166            datagram::extract_flow_id(stream_id, &request.headers)
167        {
168            log::info!(
169                "creating new flow for MASQUE request";
170                "stream_id" => stream_id,
171                "flow_id" => flow_id,
172            );
173            let _ = driver.get_or_insert_flow(flow_id)?;
174            stream_ctx.associated_dgram_flow_id = Some(flow_id);
175        }
176
177        if let Some(body_writer) = request.body_writer {
178            let _ = body_writer.send(send.clone());
179            driver
180                .waiting_streams
181                .push(stream_ctx.wait_for_recv(stream_id));
182        }
183
184        driver.insert_stream(stream_id, stream_ctx);
185        driver
186            .hooks
187            .pending_requests
188            .insert(stream_id, PendingClientRequest { send, recv });
189
190        let _ = driver
193            .h3_event_sender
194            .send(ClientH3Event::NewOutboundRequest {
195                stream_id,
196                request_id: request.request_id,
197            });
198
199        Ok(())
200    }
201
202    fn handle_response(
205        driver: &mut H3Driver<Self>, headers: InboundHeaders,
206        pending_request: PendingClientRequest,
207    ) -> H3ConnectionResult<()> {
208        let InboundHeaders {
209            stream_id,
210            headers,
211            has_body,
212        } = headers;
213
214        let Some(stream_ctx) = driver.stream_map.get(&stream_id) else {
215            return Err(H3ConnectionError::NonexistentStream);
217        };
218
219        let headers = IncomingH3Headers {
220            stream_id,
221            headers,
222            send: pending_request.send,
223            recv: pending_request.recv,
224            read_fin: !has_body,
225            h3_audit_stats: Arc::clone(&stream_ctx.audit_stats),
226        };
227
228        driver
229            .h3_event_sender
230            .send(H3Event::IncomingHeaders(headers).into())
231            .map_err(|_| H3ConnectionError::ControllerWentAway)
232    }
233}
234
235#[allow(private_interfaces)]
236impl DriverHooks for ClientHooks {
237    type Command = ClientH3Command;
238    type Event = ClientH3Event;
239
240    fn new(_settings: &Http3Settings) -> Self {
241        Self {
242            pending_requests: BTreeMap::new(),
243        }
244    }
245
246    fn conn_established(
247        _driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
248        _handshake_info: &HandshakeInfo,
249    ) -> H3ConnectionResult<()> {
250        assert!(
251            !qconn.is_server(),
252            "ClientH3Driver requires a client-side QUIC connection"
253        );
254        Ok(())
255    }
256
257    fn headers_received(
258        driver: &mut H3Driver<Self>, _qconn: &mut QuicheConnection,
259        headers: InboundHeaders,
260    ) -> H3ConnectionResult<()> {
261        let Some(pending_request) =
262            driver.hooks.pending_requests.remove(&headers.stream_id)
263        else {
264            return Ok(());
267        };
268        Self::handle_response(driver, headers, pending_request)
269    }
270
271    fn conn_command(
272        driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
273        cmd: Self::Command,
274    ) -> H3ConnectionResult<()> {
275        match cmd {
276            ClientH3Command::Core(c) => driver.handle_core_command(qconn, c),
277            ClientH3Command::ClientRequest(req) =>
278                Self::initiate_request(driver, qconn, req),
279        }
280    }
281}
282
283impl ClientH3Controller {
284    pub fn request_sender(&self) -> ClientRequestSender {
286        RequestSender {
287            sender: self.cmd_sender.clone(),
288            _r: Default::default(),
289        }
290    }
291}