1use buffer_pool::ConsumeBuffer;
31use buffer_pool::Pooled;
32use log;
33use quiche::PathStats;
34use quiche::Stats;
35use std::future::Future;
36use std::pin::Pin;
37use std::task::Context;
38use std::task::Poll;
39use std::time::Duration;
40use tokio::select;
41use tokio::sync::mpsc;
42use tokio::sync::oneshot;
43use tokio::time::sleep_until;
44use tokio::time::Instant;
45use tokio_quiche::buf_factory::BufFactory;
46use tokio_quiche::metrics::Metrics;
47use tokio_quiche::quic::HandshakeInfo;
48use tokio_quiche::quic::QuicheConnection;
49use tokio_quiche::settings::Hooks;
50use tokio_quiche::settings::QuicSettings;
51use tokio_quiche::socket::Socket;
52use tokio_quiche::ApplicationOverQuic;
53use tokio_quiche::ConnectionParams;
54use tokio_quiche::QuicResult;
55
56use crate::actions::h3::Action;
57use crate::actions::h3::WaitType;
58use crate::actions::h3::WaitingFor;
59use crate::client::execute_action;
60use crate::client::parse_args;
61use crate::client::parse_streams;
62use crate::client::ClientError;
63use crate::client::CloseTriggerFrames;
64use crate::client::ConnectionSummary;
65use crate::client::ParsedArgs;
66use crate::client::StreamMap;
67use crate::client::MAX_DATAGRAM_SIZE;
68use crate::config::Config as H3iConfig;
69use crate::frame::H3iFrame;
70use crate::quiche;
71
72use super::Client;
73use super::ConnectionCloseDetails;
74use super::StreamParserMap;
75
76pub async fn connect(
78 args: &H3iConfig, frame_actions: Vec<Action>,
79 close_trigger_frames: Option<CloseTriggerFrames>,
80) -> std::result::Result<BuildingConnectionSummary, ClientError> {
81 let quic_settings = create_config(args);
82 let connection_params =
83 ConnectionParams::new_client(quic_settings, None, Hooks::default());
84
85 let ParsedArgs {
86 connect_url,
87 bind_addr,
88 peer_addr,
89 } = parse_args(args);
90
91 let socket = tokio::net::UdpSocket::bind(bind_addr).await.unwrap();
92 socket.connect(peer_addr).await.unwrap();
93
94 log::info!(
95 "connecting to {:} from {:}",
96 peer_addr,
97 socket.local_addr().unwrap()
98 );
99
100 let (h3i, conn_summary_fut) =
101 H3iDriver::new(frame_actions, close_trigger_frames);
102 match tokio_quiche::quic::connect_with_config(
103 Socket::try_from(socket).unwrap(),
104 connect_url,
105 &connection_params,
106 h3i,
107 )
108 .await
109 {
110 Ok(_) => Ok(conn_summary_fut),
111 Err(_) => Err(ClientError::HandshakeFail),
112 }
113}
114
115fn create_config(args: &H3iConfig) -> QuicSettings {
116 let mut quic_settings = QuicSettings::default();
117
118 quic_settings.verify_peer = args.verify_peer;
119 quic_settings.max_idle_timeout =
120 Some(Duration::from_millis(args.idle_timeout));
121 quic_settings.max_recv_udp_payload_size = MAX_DATAGRAM_SIZE;
122 quic_settings.max_send_udp_payload_size = MAX_DATAGRAM_SIZE;
123 quic_settings.initial_max_data = 10_000_000;
124 quic_settings.initial_max_stream_data_bidi_local =
125 args.max_stream_data_bidi_local;
126 quic_settings.initial_max_stream_data_bidi_remote =
127 args.max_stream_data_bidi_remote;
128 quic_settings.initial_max_stream_data_uni = args.max_stream_data_uni;
129 quic_settings.initial_max_streams_bidi = args.max_streams_bidi;
130 quic_settings.initial_max_streams_uni = args.max_streams_uni;
131 quic_settings.disable_active_migration = true;
132 quic_settings.active_connection_id_limit = 0;
133 quic_settings.max_connection_window = args.max_window;
134 quic_settings.max_stream_window = args.max_stream_window;
135 quic_settings.grease = false;
136
137 quic_settings.capture_quiche_logs = true;
138 quic_settings.keylog_file = std::env::var_os("SSLKEYLOGFILE")
139 .and_then(|os_str| os_str.into_string().ok());
140
141 quic_settings
142}
143
144#[must_use = "must await to get a ConnectionSummary"]
151pub struct BuildingConnectionSummary {
152 rx: mpsc::UnboundedReceiver<ConnectionRecord>,
153 summary: Option<ConnectionSummary>,
154 seen_all_close_trigger_frames: Option<oneshot::Sender<()>>,
155}
156
157impl BuildingConnectionSummary {
158 fn new(
159 rx: mpsc::UnboundedReceiver<ConnectionRecord>,
160 close_trigger_frames: Option<CloseTriggerFrames>,
161 trigger_frame_tx: oneshot::Sender<()>,
162 ) -> Self {
163 let summary = ConnectionSummary {
164 stream_map: StreamMap::new(close_trigger_frames),
165 ..Default::default()
166 };
167
168 Self {
169 rx,
170 summary: Some(summary),
171 seen_all_close_trigger_frames: Some(trigger_frame_tx),
172 }
173 }
174}
175
176impl Future for BuildingConnectionSummary {
177 type Output = ConnectionSummary;
178
179 fn poll(
180 mut self: Pin<&mut Self>, cx: &mut Context<'_>,
181 ) -> Poll<Self::Output> {
182 while let Poll::Ready(Some(record)) = self.rx.poll_recv(cx) {
183 let summary = self.summary.as_mut().expect("summary already taken");
186
187 match record {
188 ConnectionRecord::StreamedFrame { stream_id, frame } => {
189 let stream_map = &mut summary.stream_map;
190 stream_map.insert(stream_id, frame);
191
192 if stream_map.all_close_trigger_frames_seen() {
193 if let Some(expected_tx) =
195 self.seen_all_close_trigger_frames.take()
196 {
197 let _ = expected_tx.send(());
198 }
199 }
200 },
201 ConnectionRecord::ConnectionStats(s) => summary.stats = Some(s),
202 ConnectionRecord::PathStats(ps) => summary.path_stats = ps,
203 ConnectionRecord::Close(d) => summary.conn_close_details = d,
204 };
205 }
206
207 if self.rx.is_closed() {
208 let summary = self.summary.take().expect("summary already taken");
211 Poll::Ready(summary)
212 } else {
213 Poll::Pending
214 }
215 }
216}
217
218pub struct H3iDriver {
219 buffer: Pooled<ConsumeBuffer>,
220 actions: Vec<Action>,
221 actions_executed: usize,
222 next_fire_time: Instant,
223 waiting_for_responses: WaitingFor,
224 record_tx: mpsc::UnboundedSender<ConnectionRecord>,
225 stream_parsers: StreamParserMap,
226 close_trigger_seen_rx: oneshot::Receiver<()>,
227}
228
229impl H3iDriver {
230 fn new(
231 actions: Vec<Action>, close_trigger_frames: Option<CloseTriggerFrames>,
232 ) -> (Self, BuildingConnectionSummary) {
233 let (record_tx, record_rx) = mpsc::unbounded_channel();
234 let (close_trigger_seen_tx, close_trigger_seen_rx) = oneshot::channel();
235 let fut = BuildingConnectionSummary::new(
236 record_rx,
237 close_trigger_frames,
238 close_trigger_seen_tx,
239 );
240
241 (
242 Self {
243 buffer: BufFactory::get_max_buf(),
244 actions,
245 actions_executed: 0,
246 next_fire_time: Instant::now(),
247 waiting_for_responses: WaitingFor::default(),
248 record_tx,
249 stream_parsers: StreamParserMap::default(),
250 close_trigger_seen_rx,
251 },
252 fut,
253 )
254 }
255
256 fn should_fire(&self) -> bool {
258 Instant::now() >= self.next_fire_time
259 }
260
261 fn register_waits(&mut self) {
263 while self.actions_executed < self.actions.len() {
264 if let Action::Wait { wait_type } =
265 &self.actions[self.actions_executed]
266 {
267 self.actions_executed += 1;
268
269 match wait_type {
270 WaitType::WaitDuration(duration) => {
271 self.next_fire_time = Instant::now() + *duration;
272
273 log::debug!(
274 "h3i: waiting for responses: {:?}",
275 self.waiting_for_responses
276 );
277 },
278 WaitType::StreamEvent(event) => {
279 self.waiting_for_responses.add_wait(event);
280 },
281 }
282 } else {
283 break;
284 }
285 }
286 }
287}
288
289impl Client for H3iDriver {
290 fn stream_parsers_mut(&mut self) -> &mut StreamParserMap {
291 &mut self.stream_parsers
292 }
293
294 fn handle_response_frame(
295 &mut self, stream_id: u64, frame: crate::frame::H3iFrame,
296 ) {
297 self.record_tx
298 .send(ConnectionRecord::StreamedFrame { stream_id, frame })
299 .expect("H3iDriver task dropped")
300 }
301}
302
303impl ApplicationOverQuic for H3iDriver {
304 fn on_conn_established(
305 &mut self, _qconn: &mut QuicheConnection, _handshake_info: &HandshakeInfo,
306 ) -> QuicResult<()> {
307 log::info!("h3i: HTTP/3 connection established");
308 Ok(())
309 }
310
311 fn should_act(&self) -> bool {
312 true
315 }
316
317 fn process_reads(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
318 log::trace!("h3i: process_reads");
319
320 self.register_waits();
324
325 let stream_events = parse_streams(qconn, self);
326 for event in stream_events {
327 self.waiting_for_responses.remove_wait(event);
328 }
329
330 Ok(())
331 }
332
333 fn process_writes(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
334 log::trace!("h3i: process_writes");
335
336 if !self.waiting_for_responses.is_empty() {
337 log::debug!(
338 "awaiting responses on streams {:?}, skipping further action",
339 self.waiting_for_responses
340 );
341
342 return Ok(());
343 }
344
345 let mut iter =
347 self.actions.clone().into_iter().skip(self.actions_executed);
348 while let Some(action) = iter.next() {
349 match action {
350 Action::SendFrame { .. } |
351 Action::StreamBytes { .. } |
352 Action::ResetStream { .. } |
353 Action::StopSending { .. } |
354 Action::OpenUniStream { .. } |
355 Action::ConnectionClose { .. } |
356 Action::SendHeadersFrame { .. } => {
357 if self.should_fire() {
358 self.next_fire_time = Instant::now();
361
362 execute_action(&action, qconn, self.stream_parsers_mut());
363 self.actions_executed += 1;
364 } else {
365 break;
366 }
367 },
368 Action::Wait { .. } => {
369 break;
374 },
375 Action::FlushPackets => {
376 self.actions_executed += 1;
377 break;
378 },
379 }
380 }
381
382 Ok(())
383 }
384
385 async fn wait_for_data(
386 &mut self, qconn: &mut QuicheConnection,
387 ) -> QuicResult<()> {
388 log::trace!("h3i: wait_for_data");
389
390 let waiting = !self.should_fire();
391 select! {
392 rx = &mut self.close_trigger_seen_rx, if !self.close_trigger_seen_rx.is_terminated() => {
393 if rx.is_ok() {
397 let _ = qconn.close(true, quiche::h3::WireErrorCode::NoError as u64, b"saw all expected frames");
399 }
400 }
401 _ = sleep_until(self.next_fire_time), if waiting => {
402 log::debug!("h3i: releasing wait timer");
403 }
404 }
405
406 Ok(())
407 }
408
409 fn buffer(&mut self) -> &mut [u8] {
410 &mut self.buffer
411 }
412
413 fn on_conn_close<M: Metrics>(
414 &mut self, qconn: &mut QuicheConnection, _metrics: &M,
415 _work_loop_result: &QuicResult<()>,
416 ) {
417 let _ = self
418 .record_tx
419 .send(ConnectionRecord::Close(ConnectionCloseDetails::new(&qconn)));
420
421 let _ = self
422 .record_tx
423 .send(ConnectionRecord::ConnectionStats(qconn.stats()));
424
425 let conn_path_stats = qconn.path_stats().collect::<Vec<PathStats>>();
426 let _ = self
427 .record_tx
428 .send(ConnectionRecord::PathStats(conn_path_stats));
429 }
430}
431
432pub enum ConnectionRecord {
433 StreamedFrame { stream_id: u64, frame: H3iFrame },
434 Close(ConnectionCloseDetails),
435 PathStats(Vec<PathStats>),
436 ConnectionStats(Stats),
437}