tokio_quiche/http3/driver/
client.rs1use std::collections::BTreeMap;
28use std::sync::Arc;
29
30use foundations::telemetry::log;
31use quiche::h3;
32use tokio::sync::mpsc;
33use tokio::sync::oneshot;
34
35use super::datagram;
36use super::DriverHooks;
37use super::H3Command;
38use super::H3ConnectionError;
39use super::H3ConnectionResult;
40use super::H3Controller;
41use super::H3Driver;
42use super::H3Event;
43use super::InboundFrameStream;
44use super::InboundHeaders;
45use super::IncomingH3Headers;
46use super::OutboundFrameSender;
47use super::RequestSender;
48use super::StreamCtx;
49use super::STREAM_CAPACITY;
50use crate::http3::settings::Http3Settings;
51use crate::quic::HandshakeInfo;
52use crate::quic::QuicCommand;
53use crate::quic::QuicheConnection;
54
55pub type ClientH3Driver = H3Driver<ClientHooks>;
59pub type ClientH3Controller = H3Controller<ClientHooks>;
62pub type ClientEventStream = mpsc::UnboundedReceiver<ClientH3Event>;
66pub type ClientRequestSender = RequestSender<ClientH3Command, NewClientRequest>;
69
70#[derive(Debug)]
72pub struct NewClientRequest {
73 pub request_id: u64,
76 pub headers: Vec<h3::Header>,
78 pub body_writer: Option<oneshot::Sender<OutboundFrameSender>>,
81}
82
83#[derive(Debug)]
85pub enum ClientH3Event {
86 Core(H3Event),
87 NewOutboundRequest {
90 stream_id: u64,
91 request_id: u64,
92 },
93}
94
95impl From<H3Event> for ClientH3Event {
96 fn from(ev: H3Event) -> Self {
97 Self::Core(ev)
98 }
99}
100
101#[derive(Debug)]
103pub enum ClientH3Command {
104 Core(H3Command),
105 ClientRequest(NewClientRequest),
109}
110
111impl From<H3Command> for ClientH3Command {
112 fn from(cmd: H3Command) -> Self {
113 Self::Core(cmd)
114 }
115}
116
117impl From<QuicCommand> for ClientH3Command {
118 fn from(cmd: QuicCommand) -> Self {
119 Self::Core(H3Command::QuicCmd(cmd))
120 }
121}
122
123impl From<NewClientRequest> for ClientH3Command {
124 fn from(req: NewClientRequest) -> Self {
125 Self::ClientRequest(req)
126 }
127}
128
129struct PendingClientRequest {
135 send: OutboundFrameSender,
136 recv: InboundFrameStream,
137}
138
139pub struct ClientHooks {
140 pending_requests: BTreeMap<u64, PendingClientRequest>,
142}
143
144impl ClientHooks {
145 fn initiate_request(
149 driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
150 request: NewClientRequest,
151 ) -> H3ConnectionResult<()> {
152 let body_finished = request.body_writer.is_none();
153
154 let stream_id = driver.conn_mut()?.send_request(
156 qconn,
157 &request.headers,
158 body_finished,
159 )?;
160
161 let (mut stream_ctx, send, recv) =
163 StreamCtx::new(stream_id, STREAM_CAPACITY);
164
165 if let Some(flow_id) =
166 datagram::extract_flow_id(stream_id, &request.headers)
167 {
168 log::info!(
169 "creating new flow for MASQUE request";
170 "stream_id" => stream_id,
171 "flow_id" => flow_id,
172 );
173 let _ = driver.get_or_insert_flow(flow_id)?;
174 stream_ctx.associated_dgram_flow_id = Some(flow_id);
175 }
176
177 if let Some(body_writer) = request.body_writer {
178 let _ = body_writer.send(send.clone());
179 driver
180 .waiting_streams
181 .push(stream_ctx.wait_for_recv(stream_id));
182 }
183
184 driver.insert_stream(stream_id, stream_ctx);
185 driver
186 .hooks
187 .pending_requests
188 .insert(stream_id, PendingClientRequest { send, recv });
189
190 let _ = driver
193 .h3_event_sender
194 .send(ClientH3Event::NewOutboundRequest {
195 stream_id,
196 request_id: request.request_id,
197 });
198
199 Ok(())
200 }
201
202 fn handle_response(
205 driver: &mut H3Driver<Self>, headers: InboundHeaders,
206 pending_request: PendingClientRequest,
207 ) -> H3ConnectionResult<()> {
208 let InboundHeaders {
209 stream_id,
210 headers,
211 has_body,
212 } = headers;
213
214 let Some(stream_ctx) = driver.stream_map.get(&stream_id) else {
215 return Err(H3ConnectionError::NonexistentStream);
217 };
218
219 let headers = IncomingH3Headers {
220 stream_id,
221 headers,
222 send: pending_request.send,
223 recv: pending_request.recv,
224 read_fin: !has_body,
225 h3_audit_stats: Arc::clone(&stream_ctx.audit_stats),
226 };
227
228 driver
229 .h3_event_sender
230 .send(H3Event::IncomingHeaders(headers).into())
231 .map_err(|_| H3ConnectionError::ControllerWentAway)
232 }
233}
234
235#[allow(private_interfaces)]
236impl DriverHooks for ClientHooks {
237 type Command = ClientH3Command;
238 type Event = ClientH3Event;
239
240 fn new(_settings: &Http3Settings) -> Self {
241 Self {
242 pending_requests: BTreeMap::new(),
243 }
244 }
245
246 fn conn_established(
247 _driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
248 _handshake_info: &HandshakeInfo,
249 ) -> H3ConnectionResult<()> {
250 assert!(
251 !qconn.is_server(),
252 "ClientH3Driver requires a client-side QUIC connection"
253 );
254 Ok(())
255 }
256
257 fn headers_received(
258 driver: &mut H3Driver<Self>, _qconn: &mut QuicheConnection,
259 headers: InboundHeaders,
260 ) -> H3ConnectionResult<()> {
261 let Some(pending_request) =
262 driver.hooks.pending_requests.remove(&headers.stream_id)
263 else {
264 return Ok(());
267 };
268 Self::handle_response(driver, headers, pending_request)
269 }
270
271 fn conn_command(
272 driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
273 cmd: Self::Command,
274 ) -> H3ConnectionResult<()> {
275 match cmd {
276 ClientH3Command::Core(c) => driver.handle_core_command(qconn, c),
277 ClientH3Command::ClientRequest(req) =>
278 Self::initiate_request(driver, qconn, req),
279 }
280 }
281}
282
283impl ClientH3Controller {
284 pub fn request_sender(&self) -> ClientRequestSender {
286 RequestSender {
287 sender: self.cmd_sender.clone(),
288 _r: Default::default(),
289 }
290 }
291}