1use buffer_pool::ConsumeBuffer;
31use buffer_pool::Pooled;
32use log;
33use quiche::PathStats;
34use quiche::Stats;
35use std::future::Future;
36use std::pin::Pin;
37use std::task::Context;
38use std::task::Poll;
39use std::time::Duration;
40use tokio::select;
41use tokio::sync::mpsc;
42use tokio::sync::oneshot;
43use tokio::time::sleep;
44use tokio::time::sleep_until;
45use tokio::time::Instant;
46use tokio_quiche::buf_factory::BufFactory;
47use tokio_quiche::metrics::Metrics;
48use tokio_quiche::quic::HandshakeInfo;
49use tokio_quiche::quic::QuicheConnection;
50use tokio_quiche::settings::Hooks;
51use tokio_quiche::settings::QuicSettings;
52use tokio_quiche::socket::Socket;
53use tokio_quiche::ApplicationOverQuic;
54use tokio_quiche::ConnectionParams;
55use tokio_quiche::QuicResult;
56
57use crate::actions::h3::Action;
58use crate::actions::h3::WaitType;
59use crate::actions::h3::WaitingFor;
60use crate::client::execute_action;
61use crate::client::parse_args;
62use crate::client::parse_streams;
63use crate::client::ClientError;
64use crate::client::CloseTriggerFrames;
65use crate::client::ConnectionSummary;
66use crate::client::ParsedArgs;
67use crate::client::StreamMap;
68use crate::client::MAX_DATAGRAM_SIZE;
69use crate::config::Config as H3iConfig;
70use crate::frame::H3iFrame;
71use crate::quiche;
72
73use super::Client;
74use super::ConnectionCloseDetails;
75use super::StreamParserMap;
76
77pub async fn connect(
79 args: &H3iConfig, frame_actions: Vec<Action>,
80 close_trigger_frames: Option<CloseTriggerFrames>,
81) -> std::result::Result<BuildingConnectionSummary, ClientError> {
82 let quic_settings = create_config(args);
83 let mut connection_params =
84 ConnectionParams::new_client(quic_settings, None, Hooks::default());
85
86 connection_params.session = args.session.clone();
87
88 let ParsedArgs {
89 connect_url,
90 bind_addr,
91 peer_addr,
92 } = parse_args(args);
93
94 let socket = tokio::net::UdpSocket::bind(bind_addr).await.unwrap();
95 socket.connect(peer_addr).await.unwrap();
96
97 log::info!(
98 "connecting to {:} from {:}",
99 peer_addr,
100 socket.local_addr().unwrap()
101 );
102
103 let (h3i, conn_summary_fut) =
104 H3iDriver::new(frame_actions, close_trigger_frames);
105 match tokio_quiche::quic::connect_with_config(
106 Socket::try_from(socket).unwrap(),
107 connect_url,
108 &connection_params,
109 h3i,
110 )
111 .await
112 {
113 Ok(_) => Ok(conn_summary_fut),
114 Err(_) => Err(ClientError::HandshakeFail),
115 }
116}
117
118fn create_config(args: &H3iConfig) -> QuicSettings {
119 let mut quic_settings = QuicSettings::default();
120
121 quic_settings.verify_peer = args.verify_peer;
122 quic_settings.max_idle_timeout =
123 Some(Duration::from_millis(args.idle_timeout));
124 quic_settings.max_recv_udp_payload_size = MAX_DATAGRAM_SIZE;
125 quic_settings.max_send_udp_payload_size = MAX_DATAGRAM_SIZE;
126 quic_settings.initial_max_data = 10_000_000;
127 quic_settings.initial_max_stream_data_bidi_local =
128 args.max_stream_data_bidi_local;
129 quic_settings.initial_max_stream_data_bidi_remote =
130 args.max_stream_data_bidi_remote;
131 quic_settings.initial_max_stream_data_uni = args.max_stream_data_uni;
132 quic_settings.initial_max_streams_bidi = args.max_streams_bidi;
133 quic_settings.initial_max_streams_uni = args.max_streams_uni;
134 quic_settings.disable_active_migration = true;
135 quic_settings.active_connection_id_limit = 0;
136 quic_settings.max_connection_window = args.max_window;
137 quic_settings.max_stream_window = args.max_stream_window;
138 quic_settings.grease = false;
139
140 quic_settings.capture_quiche_logs = true;
141 quic_settings.keylog_file = std::env::var_os("SSLKEYLOGFILE")
142 .and_then(|os_str| os_str.into_string().ok());
143
144 quic_settings.enable_dgram = args.enable_dgram;
145 quic_settings.dgram_recv_max_queue_len = args.dgram_recv_queue_len;
146 quic_settings.dgram_send_max_queue_len = args.dgram_send_queue_len;
147
148 quic_settings
149}
150
151#[must_use = "must await to get a ConnectionSummary"]
158pub struct BuildingConnectionSummary {
159 rx: mpsc::UnboundedReceiver<ConnectionRecord>,
160 summary: Option<ConnectionSummary>,
161 seen_all_close_trigger_frames: Option<oneshot::Sender<()>>,
162}
163
164impl BuildingConnectionSummary {
165 fn new(
166 rx: mpsc::UnboundedReceiver<ConnectionRecord>,
167 close_trigger_frames: Option<CloseTriggerFrames>,
168 trigger_frame_tx: oneshot::Sender<()>,
169 ) -> Self {
170 let summary = ConnectionSummary {
171 stream_map: StreamMap::new(close_trigger_frames),
172 ..Default::default()
173 };
174
175 Self {
176 rx,
177 summary: Some(summary),
178 seen_all_close_trigger_frames: Some(trigger_frame_tx),
179 }
180 }
181}
182
183impl Future for BuildingConnectionSummary {
184 type Output = ConnectionSummary;
185
186 fn poll(
187 mut self: Pin<&mut Self>, cx: &mut Context<'_>,
188 ) -> Poll<Self::Output> {
189 while let Poll::Ready(Some(record)) = self.rx.poll_recv(cx) {
190 let summary = self.summary.as_mut().expect("summary already taken");
193
194 match record {
195 ConnectionRecord::StreamedFrame { stream_id, frame } => {
196 let stream_map = &mut summary.stream_map;
197 stream_map.insert(stream_id, frame);
198
199 if stream_map.all_close_trigger_frames_seen() {
200 if let Some(expected_tx) =
202 self.seen_all_close_trigger_frames.take()
203 {
204 let _ = expected_tx.send(());
205 }
206 }
207 },
208 ConnectionRecord::ConnectionStats(s) => summary.stats = Some(s),
209 ConnectionRecord::PathStats(ps) => summary.path_stats = ps,
210 ConnectionRecord::Close(d) => summary.conn_close_details = d,
211 };
212 }
213
214 if self.rx.is_closed() {
215 let summary = self.summary.take().expect("summary already taken");
218 Poll::Ready(summary)
219 } else {
220 Poll::Pending
221 }
222 }
223}
224
225pub struct H3iDriver {
226 buffer: Pooled<ConsumeBuffer>,
227 actions: Vec<Action>,
228 actions_executed: usize,
229 next_fire_time: Instant,
230 waiting_for_responses: WaitingFor,
231 record_tx: mpsc::UnboundedSender<ConnectionRecord>,
232 stream_parsers: StreamParserMap,
233 close_trigger_seen_rx: oneshot::Receiver<()>,
234}
235
236impl H3iDriver {
237 fn new(
238 actions: Vec<Action>, close_trigger_frames: Option<CloseTriggerFrames>,
239 ) -> (Self, BuildingConnectionSummary) {
240 let (record_tx, record_rx) = mpsc::unbounded_channel();
241 let (close_trigger_seen_tx, close_trigger_seen_rx) = oneshot::channel();
242 let fut = BuildingConnectionSummary::new(
243 record_rx,
244 close_trigger_frames,
245 close_trigger_seen_tx,
246 );
247
248 (
249 Self {
250 buffer: BufFactory::get_max_buf(),
251 actions,
252 actions_executed: 0,
253 next_fire_time: Instant::now(),
254 waiting_for_responses: WaitingFor::default(),
255 record_tx,
256 stream_parsers: StreamParserMap::default(),
257 close_trigger_seen_rx,
258 },
259 fut,
260 )
261 }
262
263 fn should_fire(&self) -> bool {
265 Instant::now() >= self.next_fire_time
266 }
267
268 fn register_waits(&mut self) {
270 while self.actions_executed < self.actions.len() {
271 if let Action::Wait { wait_type } =
272 &self.actions[self.actions_executed]
273 {
274 self.actions_executed += 1;
275
276 match wait_type {
277 WaitType::WaitDuration(duration) => {
278 self.next_fire_time = Instant::now() + *duration;
279
280 log::debug!(
281 "h3i: waiting for responses: {:?}",
282 self.waiting_for_responses
283 );
284 },
285 WaitType::StreamEvent(event) => {
286 self.waiting_for_responses.add_wait(event);
287 },
288 }
289 } else {
290 break;
291 }
292 }
293 }
294}
295
296impl Client for H3iDriver {
297 fn stream_parsers_mut(&mut self) -> &mut StreamParserMap {
298 &mut self.stream_parsers
299 }
300
301 fn handle_response_frame(
302 &mut self, stream_id: u64, frame: crate::frame::H3iFrame,
303 ) {
304 self.record_tx
305 .send(ConnectionRecord::StreamedFrame { stream_id, frame })
306 .expect("H3iDriver task dropped")
307 }
308}
309
310impl ApplicationOverQuic for H3iDriver {
311 fn on_conn_established(
312 &mut self, _qconn: &mut QuicheConnection, _handshake_info: &HandshakeInfo,
313 ) -> QuicResult<()> {
314 log::info!("h3i: HTTP/3 connection established");
315 Ok(())
316 }
317
318 fn should_act(&self) -> bool {
319 true
322 }
323
324 fn process_reads(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
325 log::trace!("h3i: process_reads");
326
327 self.register_waits();
331
332 let stream_events = parse_streams(qconn, self);
333 for event in stream_events {
334 self.waiting_for_responses.remove_wait(event);
335 }
336
337 Ok(())
338 }
339
340 fn process_writes(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
341 log::trace!("h3i: process_writes");
342
343 if !self.waiting_for_responses.is_empty() {
344 log::debug!(
345 "awaiting responses on streams {:?}, skipping further action",
346 self.waiting_for_responses
347 );
348
349 return Ok(());
350 }
351
352 let iter = self.actions.clone().into_iter().skip(self.actions_executed);
354
355 for action in iter {
356 match action {
357 Action::SendFrame { .. } |
358 Action::StreamBytes { .. } |
359 Action::SendDatagram { .. } |
360 Action::ResetStream { .. } |
361 Action::StopSending { .. } |
362 Action::OpenUniStream { .. } |
363 Action::ConnectionClose { .. } |
364 Action::SendHeadersFrame { .. } => {
365 if self.should_fire() {
366 self.next_fire_time = Instant::now();
369
370 execute_action(&action, qconn, self.stream_parsers_mut());
371 self.actions_executed += 1;
372 } else {
373 break;
374 }
375 },
376 Action::Wait { .. } => {
377 break;
382 },
383 Action::FlushPackets => {
384 self.actions_executed += 1;
385 break;
386 },
387 }
388 }
389
390 Ok(())
391 }
392
393 async fn wait_for_data(
394 &mut self, qconn: &mut QuicheConnection,
395 ) -> QuicResult<()> {
396 log::trace!("h3i: wait_for_data");
397
398 let sleep_fut = if !self.should_fire() {
399 sleep_until(self.next_fire_time)
400 } else {
401 sleep(Duration::MAX)
404 };
405
406 select! {
407 rx = &mut self.close_trigger_seen_rx, if !self.close_trigger_seen_rx.is_terminated() => {
408 if rx.is_ok() {
412 let _ = qconn.close(true, quiche::h3::WireErrorCode::NoError as u64, b"saw all expected frames");
414 }
415 }
416 _ = sleep_fut => {}
417 }
418
419 Ok(())
420 }
421
422 fn buffer(&mut self) -> &mut [u8] {
423 &mut self.buffer
424 }
425
426 fn on_conn_close<M: Metrics>(
427 &mut self, qconn: &mut QuicheConnection, _metrics: &M,
428 _work_loop_result: &QuicResult<()>,
429 ) {
430 let _ = self
431 .record_tx
432 .send(ConnectionRecord::Close(ConnectionCloseDetails::new(qconn)));
433
434 let _ = self
435 .record_tx
436 .send(ConnectionRecord::ConnectionStats(qconn.stats()));
437
438 let conn_path_stats = qconn.path_stats().collect::<Vec<PathStats>>();
439 let _ = self
440 .record_tx
441 .send(ConnectionRecord::PathStats(conn_path_stats));
442 }
443}
444
445pub enum ConnectionRecord {
446 StreamedFrame { stream_id: u64, frame: H3iFrame },
447 Close(ConnectionCloseDetails),
448 PathStats(Vec<PathStats>),
449 ConnectionStats(Stats),
450}