1use log;
31use quiche::PathStats;
32use quiche::Stats;
33use std::future::Future;
34use std::pin::Pin;
35use std::task::Context;
36use std::task::Poll;
37use std::time::Duration;
38use tokio::select;
39use tokio::sync::mpsc;
40use tokio::sync::oneshot;
41use tokio::time::sleep;
42use tokio::time::sleep_until;
43use tokio::time::Instant;
44use tokio_quiche::buf_factory::BufFactory;
45use tokio_quiche::metrics::Metrics;
46use tokio_quiche::quic::HandshakeInfo;
47use tokio_quiche::quic::QuicheConnection;
48use tokio_quiche::settings::Hooks;
49use tokio_quiche::settings::QuicSettings;
50use tokio_quiche::socket::Socket;
51use tokio_quiche::ApplicationOverQuic;
52use tokio_quiche::ConnectionParams;
53use tokio_quiche::QuicResult;
54
55use crate::actions::h3::Action;
56use crate::actions::h3::WaitType;
57use crate::actions::h3::WaitingFor;
58use crate::client::execute_action;
59use crate::client::parse_args;
60use crate::client::parse_streams;
61use crate::client::ClientError;
62use crate::client::CloseTriggerFrames;
63use crate::client::ConnectionSummary;
64use crate::client::ParsedArgs;
65use crate::client::StreamMap;
66use crate::client::MAX_DATAGRAM_SIZE;
67use crate::config::Config as H3iConfig;
68use crate::frame::H3iFrame;
69use crate::quiche;
70
71use super::Client;
72use super::ConnectionCloseDetails;
73use super::StreamParserMap;
74
75pub async fn connect(
77 args: &H3iConfig, frame_actions: Vec<Action>,
78 close_trigger_frames: Option<CloseTriggerFrames>,
79) -> std::result::Result<BuildingConnectionSummary, ClientError> {
80 let quic_settings = create_config(args);
81 let mut connection_params =
82 ConnectionParams::new_client(quic_settings, None, Hooks::default());
83
84 connection_params.session = args.session.clone();
85
86 let ParsedArgs {
87 connect_url,
88 bind_addr,
89 peer_addr,
90 } = parse_args(args);
91
92 let socket = tokio::net::UdpSocket::bind(bind_addr).await.unwrap();
93 socket.connect(peer_addr).await.unwrap();
94
95 log::info!(
96 "connecting to {:} from {:}",
97 peer_addr,
98 socket.local_addr().unwrap()
99 );
100
101 let (h3i, conn_summary_fut) =
102 H3iDriver::new(frame_actions, close_trigger_frames);
103 match tokio_quiche::quic::connect_with_config(
104 Socket::try_from(socket).unwrap(),
105 connect_url,
106 &connection_params,
107 h3i,
108 )
109 .await
110 {
111 Ok(_) => Ok(conn_summary_fut),
112 Err(_) => Err(ClientError::HandshakeFail),
113 }
114}
115
116fn create_config(args: &H3iConfig) -> QuicSettings {
117 let mut quic_settings = QuicSettings::default();
118
119 quic_settings.verify_peer = args.verify_peer;
120 quic_settings.max_idle_timeout =
121 Some(Duration::from_millis(args.idle_timeout));
122 quic_settings.max_recv_udp_payload_size = MAX_DATAGRAM_SIZE;
123 quic_settings.max_send_udp_payload_size = MAX_DATAGRAM_SIZE;
124 quic_settings.initial_max_data = 10_000_000;
125 quic_settings.initial_max_stream_data_bidi_local =
126 args.max_stream_data_bidi_local;
127 quic_settings.initial_max_stream_data_bidi_remote =
128 args.max_stream_data_bidi_remote;
129 quic_settings.initial_max_stream_data_uni = args.max_stream_data_uni;
130 quic_settings.initial_max_streams_bidi = args.max_streams_bidi;
131 quic_settings.initial_max_streams_uni = args.max_streams_uni;
132 quic_settings.disable_active_migration = true;
133 quic_settings.active_connection_id_limit = 0;
134 quic_settings.max_connection_window = args.max_window;
135 quic_settings.max_stream_window = args.max_stream_window;
136 quic_settings.enable_send_streams_blocked = true;
137 quic_settings.grease = false;
138
139 quic_settings.capture_quiche_logs = true;
140 quic_settings.keylog_file = std::env::var_os("SSLKEYLOGFILE")
141 .and_then(|os_str| os_str.into_string().ok());
142
143 quic_settings.enable_dgram = args.enable_dgram;
144 quic_settings.dgram_recv_max_queue_len = args.dgram_recv_queue_len;
145 quic_settings.dgram_send_max_queue_len = args.dgram_send_queue_len;
146
147 quic_settings
148}
149
150#[must_use = "must await to get a ConnectionSummary"]
157pub struct BuildingConnectionSummary {
158 rx: mpsc::UnboundedReceiver<ConnectionRecord>,
159 summary: Option<ConnectionSummary>,
160 seen_all_close_trigger_frames: Option<oneshot::Sender<()>>,
161}
162
163impl BuildingConnectionSummary {
164 fn new(
165 rx: mpsc::UnboundedReceiver<ConnectionRecord>,
166 close_trigger_frames: Option<CloseTriggerFrames>,
167 trigger_frame_tx: oneshot::Sender<()>,
168 ) -> Self {
169 let summary = ConnectionSummary {
170 stream_map: StreamMap::new(close_trigger_frames),
171 ..Default::default()
172 };
173
174 Self {
175 rx,
176 summary: Some(summary),
177 seen_all_close_trigger_frames: Some(trigger_frame_tx),
178 }
179 }
180}
181
182impl Future for BuildingConnectionSummary {
183 type Output = ConnectionSummary;
184
185 fn poll(
186 mut self: Pin<&mut Self>, cx: &mut Context<'_>,
187 ) -> Poll<Self::Output> {
188 while let Poll::Ready(Some(record)) = self.rx.poll_recv(cx) {
189 let summary = self.summary.as_mut().expect("summary already taken");
192
193 match record {
194 ConnectionRecord::StreamedFrame { stream_id, frame } => {
195 let stream_map = &mut summary.stream_map;
196 stream_map.insert(stream_id, frame);
197
198 if stream_map.all_close_trigger_frames_seen() {
199 if let Some(expected_tx) =
201 self.seen_all_close_trigger_frames.take()
202 {
203 let _ = expected_tx.send(());
204 }
205 }
206 },
207 ConnectionRecord::ConnectionStats(s) => summary.stats = Some(s),
208 ConnectionRecord::PathStats(ps) => summary.path_stats = ps,
209 ConnectionRecord::Close(d) => summary.conn_close_details = d,
210 };
211 }
212
213 if self.rx.is_closed() {
214 let summary = self.summary.take().expect("summary already taken");
217 Poll::Ready(summary)
218 } else {
219 Poll::Pending
220 }
221 }
222}
223
224pub struct H3iDriver {
225 buffer: Vec<u8>,
226 actions: Vec<Action>,
227 actions_executed: usize,
228 next_fire_time: Instant,
229 waiting_for_responses: WaitingFor,
230 record_tx: mpsc::UnboundedSender<ConnectionRecord>,
231 stream_parsers: StreamParserMap,
232 close_trigger_seen_rx: oneshot::Receiver<()>,
233}
234
235impl H3iDriver {
236 fn new(
237 actions: Vec<Action>, close_trigger_frames: Option<CloseTriggerFrames>,
238 ) -> (Self, BuildingConnectionSummary) {
239 let (record_tx, record_rx) = mpsc::unbounded_channel();
240 let (close_trigger_seen_tx, close_trigger_seen_rx) = oneshot::channel();
241 let fut = BuildingConnectionSummary::new(
242 record_rx,
243 close_trigger_frames,
244 close_trigger_seen_tx,
245 );
246
247 (
248 Self {
249 buffer: vec![0u8; BufFactory::MAX_BUF_SIZE],
250 actions,
251 actions_executed: 0,
252 next_fire_time: Instant::now(),
253 waiting_for_responses: WaitingFor::default(),
254 record_tx,
255 stream_parsers: StreamParserMap::default(),
256 close_trigger_seen_rx,
257 },
258 fut,
259 )
260 }
261
262 fn should_fire(&self) -> bool {
264 Instant::now() >= self.next_fire_time
265 }
266
267 fn register_waits(&mut self) {
269 while self.actions_executed < self.actions.len() {
270 if let Action::Wait { wait_type } =
271 &self.actions[self.actions_executed]
272 {
273 self.actions_executed += 1;
274
275 match wait_type {
276 WaitType::WaitDuration(duration) => {
277 self.next_fire_time = Instant::now() + *duration;
278
279 log::debug!(
280 "h3i: waiting for responses: {:?}",
281 self.waiting_for_responses
282 );
283 },
284 WaitType::StreamEvent(event) => {
285 self.waiting_for_responses.add_wait(event);
286 },
287 WaitType::CanOpenNumStreams(required_streams) => {
288 log::info!(
289 "h3i: waiting for peer_streams_left_bidi >= {required_streams:?}"
290 );
291 self.waiting_for_responses
292 .set_required_stream_quota(*required_streams);
293 },
294 }
295 } else {
296 break;
297 }
298 }
299 }
300}
301
302impl Client for H3iDriver {
303 fn stream_parsers_mut(&mut self) -> &mut StreamParserMap {
304 &mut self.stream_parsers
305 }
306
307 fn handle_response_frame(
308 &mut self, stream_id: u64, frame: crate::frame::H3iFrame,
309 ) {
310 self.record_tx
311 .send(ConnectionRecord::StreamedFrame { stream_id, frame })
312 .expect("H3iDriver task dropped")
313 }
314}
315
316impl ApplicationOverQuic for H3iDriver {
317 fn on_conn_established(
318 &mut self, _qconn: &mut QuicheConnection, _handshake_info: &HandshakeInfo,
319 ) -> QuicResult<()> {
320 log::info!("h3i: HTTP/3 connection established");
321 Ok(())
322 }
323
324 fn should_act(&self) -> bool {
325 true
328 }
329
330 fn process_reads(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
331 log::trace!("h3i: process_reads");
332
333 self.register_waits();
337
338 let stream_events = parse_streams(qconn, self);
339 for event in stream_events {
340 self.waiting_for_responses.remove_wait(event);
341 }
342
343 self.waiting_for_responses.check_can_open_num_streams(qconn);
344
345 Ok(())
346 }
347
348 fn process_writes(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
349 log::trace!("h3i: process_writes");
350
351 if !self.waiting_for_responses.is_empty() {
352 log::debug!(
353 "awaiting responses on streams {:?}, skipping further action",
354 self.waiting_for_responses
355 );
356
357 return Ok(());
358 }
359
360 let iter = self.actions.clone().into_iter().skip(self.actions_executed);
362
363 for action in iter {
364 match action {
365 Action::SendFrame { .. } |
366 Action::StreamBytes { .. } |
367 Action::SendDatagram { .. } |
368 Action::ResetStream { .. } |
369 Action::StopSending { .. } |
370 Action::OpenUniStream { .. } |
371 Action::ConnectionClose { .. } |
372 Action::SendHeadersFrame { .. } => {
373 if self.should_fire() {
374 self.next_fire_time = Instant::now();
377
378 execute_action(&action, qconn, self.stream_parsers_mut());
379 self.actions_executed += 1;
380 } else {
381 break;
382 }
383 },
384 Action::Wait { .. } => {
385 break;
390 },
391 Action::FlushPackets => {
392 self.actions_executed += 1;
393 break;
394 },
395 }
396 }
397
398 Ok(())
399 }
400
401 async fn wait_for_data(
402 &mut self, qconn: &mut QuicheConnection,
403 ) -> QuicResult<()> {
404 log::trace!("h3i: wait_for_data");
405
406 let sleep_fut = if !self.should_fire() {
407 sleep_until(self.next_fire_time)
408 } else {
409 sleep(Duration::MAX)
412 };
413
414 select! {
415 rx = &mut self.close_trigger_seen_rx, if !self.close_trigger_seen_rx.is_terminated() => {
416 if rx.is_ok() {
420 let _ = qconn.close(true, quiche::h3::WireErrorCode::NoError as u64, b"saw all expected frames");
422 }
423 }
424 _ = sleep_fut => {}
425 }
426
427 Ok(())
428 }
429
430 fn buffer(&mut self) -> &mut [u8] {
431 &mut self.buffer
432 }
433
434 fn on_conn_close<M: Metrics>(
435 &mut self, qconn: &mut QuicheConnection, _metrics: &M,
436 _work_loop_result: &QuicResult<()>,
437 ) {
438 let _ = self
439 .record_tx
440 .send(ConnectionRecord::Close(ConnectionCloseDetails::new(qconn)));
441
442 let _ = self
443 .record_tx
444 .send(ConnectionRecord::ConnectionStats(qconn.stats()));
445
446 let conn_path_stats = qconn.path_stats().collect::<Vec<PathStats>>();
447 let _ = self
448 .record_tx
449 .send(ConnectionRecord::PathStats(conn_path_stats));
450 }
451}
452
453pub enum ConnectionRecord {
454 StreamedFrame { stream_id: u64, frame: H3iFrame },
455 Close(ConnectionCloseDetails),
456 PathStats(Vec<PathStats>),
457 ConnectionStats(Stats),
458}