1use std::slice::Iter;
30use std::time::Duration;
31use std::time::Instant;
32
33use ring::rand::*;
34
35use crate::client::QUIC_VERSION;
36use crate::frame::H3iFrame;
37use crate::quiche;
38
39use crate::actions::h3::Action;
40use crate::actions::h3::StreamEventType;
41use crate::actions::h3::WaitType;
42use crate::actions::h3::WaitingFor;
43use crate::client::execute_action;
44use crate::client::parse_streams;
45use crate::client::ClientError;
46use crate::client::ConnectionCloseDetails;
47use crate::client::MAX_DATAGRAM_SIZE;
48use crate::config::Config;
49
50use super::parse_args;
51use super::Client;
52use super::CloseTriggerFrames;
53use super::ConnectionSummary;
54use super::ParsedArgs;
55use super::StreamMap;
56use super::StreamParserMap;
57
58#[derive(Default)]
59struct SyncClient {
60 streams: StreamMap,
61 stream_parsers: StreamParserMap,
62}
63
64impl SyncClient {
65 fn new(close_trigger_frames: Option<CloseTriggerFrames>) -> Self {
66 Self {
67 streams: StreamMap::new(close_trigger_frames),
68 ..Default::default()
69 }
70 }
71}
72
73impl Client for SyncClient {
74 fn stream_parsers_mut(&mut self) -> &mut StreamParserMap {
75 &mut self.stream_parsers
76 }
77
78 fn handle_response_frame(&mut self, stream_id: u64, frame: H3iFrame) {
79 self.streams.insert(stream_id, frame);
80 }
81}
82
83fn create_config(args: &Config, should_log_keys: bool) -> quiche::Config {
84 let mut config = quiche::Config::new(QUIC_VERSION).unwrap();
86
87 config.verify_peer(args.verify_peer);
88 config.set_application_protos(&[b"h3"]).unwrap();
89 config.set_max_idle_timeout(args.idle_timeout);
90 config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE);
91 config.set_max_send_udp_payload_size(MAX_DATAGRAM_SIZE);
92 config.set_initial_max_data(10_000_000);
93 config
94 .set_initial_max_stream_data_bidi_local(args.max_stream_data_bidi_local);
95 config.set_initial_max_stream_data_bidi_remote(
96 args.max_stream_data_bidi_remote,
97 );
98 config.set_initial_max_stream_data_uni(args.max_stream_data_uni);
99 config.set_initial_max_streams_bidi(args.max_streams_bidi);
100 config.set_initial_max_streams_uni(args.max_streams_uni);
101 config.set_disable_active_migration(true);
102 config.set_active_connection_id_limit(0);
103
104 config.set_max_connection_window(args.max_window);
105 config.set_max_stream_window(args.max_stream_window);
106 config.grease(false);
107
108 if should_log_keys {
109 config.log_keys()
110 }
111
112 config
113}
114
115pub fn connect(
126 args: Config, actions: Vec<Action>,
127 close_trigger_frames: Option<CloseTriggerFrames>,
128) -> std::result::Result<ConnectionSummary, ClientError> {
129 let mut buf = [0; 65535];
130 let mut out = [0; MAX_DATAGRAM_SIZE];
131
132 let ParsedArgs {
133 connect_url,
134 bind_addr,
135 peer_addr,
136 } = parse_args(&args);
137
138 let mut poll = mio::Poll::new().unwrap();
140 let mut events = mio::Events::with_capacity(1024);
141
142 let mut socket = mio::net::UdpSocket::bind(bind_addr).unwrap();
145 poll.registry()
146 .register(&mut socket, mio::Token(0), mio::Interest::READABLE)
147 .unwrap();
148
149 let mut keylog = None;
150 if let Some(keylog_path) = std::env::var_os("SSLKEYLOGFILE") {
151 let file = std::fs::OpenOptions::new()
152 .create(true)
153 .append(true)
154 .open(keylog_path)
155 .unwrap();
156
157 keylog = Some(file);
158 }
159
160 let mut config = create_config(&args, keylog.is_some());
161
162 let mut scid = [0; quiche::MAX_CONN_ID_LEN];
164
165 let rng = SystemRandom::new();
166 rng.fill(&mut scid[..]).unwrap();
167
168 let scid = quiche::ConnectionId::from_ref(&scid);
169
170 let Ok(local_addr) = socket.local_addr() else {
171 return Err(ClientError::Other("invalid socket".to_string()));
172 };
173
174 let mut conn =
176 quiche::connect(connect_url, &scid, local_addr, peer_addr, &mut config)
177 .map_err(|e| ClientError::Other(e.to_string()))?;
178
179 if let Some(keylog) = &mut keylog {
180 if let Ok(keylog) = keylog.try_clone() {
181 conn.set_keylog(Box::new(keylog));
182 }
183 }
184
185 log::info!(
186 "connecting to {peer_addr:} from {local_addr:} with scid {scid:?}",
187 );
188
189 let mut app_proto_selected = false;
190
191 let (write, send_info) = conn.send(&mut out).expect("initial send failed");
192
193 while let Err(e) = socket.send_to(&out[..write], send_info.to) {
194 if e.kind() == std::io::ErrorKind::WouldBlock {
195 log::debug!(
196 "{} -> {}: send() would block",
197 socket.local_addr().unwrap(),
198 send_info.to
199 );
200 continue;
201 }
202
203 return Err(ClientError::Other(format!("send() failed: {e:?}")));
204 }
205
206 let app_data_start = std::time::Instant::now();
207
208 let mut action_iter = actions.iter();
209 let mut wait_duration = None;
210 let mut wait_instant = None;
211
212 let mut client = SyncClient::new(close_trigger_frames);
213 let mut waiting_for = WaitingFor::default();
214
215 loop {
216 let actual_sleep = match (wait_duration, conn.timeout()) {
217 (Some(wait), Some(timeout)) => {
218 #[allow(clippy::comparison_chain)]
219 if timeout < wait {
220 let new = wait - timeout;
223 wait_duration = Some(new);
224 Some(timeout)
225 } else if wait < timeout {
226 Some(wait)
227 } else {
228 Some(timeout)
230 }
231 },
232 (None, Some(timeout)) => Some(timeout),
233 (Some(wait), None) => Some(wait),
234 _ => None,
235 };
236
237 log::debug!("actual sleep is {actual_sleep:?}");
238 poll.poll(&mut events, actual_sleep).unwrap();
239
240 if events.is_empty() {
243 log::debug!("timed out");
244
245 conn.on_timeout();
246 }
247
248 for event in &events {
251 let socket = match event.token() {
252 mio::Token(0) => &socket,
253
254 _ => unreachable!(),
255 };
256
257 let local_addr = socket.local_addr().unwrap();
258 'read: loop {
259 let (len, from) = match socket.recv_from(&mut buf) {
260 Ok(v) => v,
261
262 Err(e) => {
263 if e.kind() == std::io::ErrorKind::WouldBlock {
266 break 'read;
267 }
268
269 return Err(ClientError::Other(format!(
270 "{local_addr}: recv() failed: {e:?}"
271 )));
272 },
273 };
274
275 let recv_info = quiche::RecvInfo {
276 to: local_addr,
277 from,
278 };
279
280 let _read = match conn.recv(&mut buf[..len], recv_info) {
282 Ok(v) => v,
283
284 Err(e) => {
285 log::debug!("{local_addr}: recv failed: {e:?}");
286 continue 'read;
287 },
288 };
289 }
290 }
291
292 log::debug!("done reading");
293
294 if conn.is_closed() {
295 log::info!(
296 "connection closed with error={:?} did_idle_timeout={}, stats={:?} path_stats={:?}",
297 conn.peer_error(),
298 conn.is_timed_out(),
299 conn.stats(),
300 conn.path_stats().collect::<Vec<quiche::PathStats>>(),
301 );
302
303 if !conn.is_established() {
304 log::info!(
305 "connection timed out after {:?}",
306 app_data_start.elapsed(),
307 );
308
309 return Err(ClientError::HandshakeFail);
310 }
311
312 break;
313 }
314
315 if (conn.is_established() || conn.is_in_early_data()) &&
318 !app_proto_selected
319 {
320 app_proto_selected = true;
321 }
322
323 if app_proto_selected {
324 check_duration_and_do_actions(
325 &mut wait_duration,
326 &mut wait_instant,
327 &mut action_iter,
328 &mut conn,
329 &mut waiting_for,
330 client.stream_parsers_mut(),
331 );
332
333 let mut wait_cleared = false;
334 for response in parse_streams(&mut conn, &mut client) {
335 let stream_id = response.stream_id;
336
337 if let StreamEventType::Finished = response.event_type {
338 waiting_for.clear_waits_on_stream(stream_id);
339 } else {
340 waiting_for.remove_wait(response);
341 }
342
343 wait_cleared = true;
344 }
345
346 if client.streams.all_close_trigger_frames_seen() {
347 client.streams.close_due_to_trigger_frames(&mut conn);
348 }
349
350 if wait_cleared {
351 check_duration_and_do_actions(
352 &mut wait_duration,
353 &mut wait_instant,
354 &mut action_iter,
355 &mut conn,
356 &mut waiting_for,
357 client.stream_parsers_mut(),
358 );
359 }
360 }
361
362 while conn.scids_left() > 0 {
364 let (scid, reset_token) = generate_cid_and_reset_token();
365
366 if conn.new_scid(&scid, reset_token, false).is_err() {
367 break;
368 }
369 }
370
371 let sockets = vec![&socket];
374
375 for socket in sockets {
376 let local_addr = socket.local_addr().unwrap();
377
378 for peer_addr in conn.paths_iter(local_addr) {
379 loop {
380 let (write, send_info) = match conn.send_on_path(
381 &mut out,
382 Some(local_addr),
383 Some(peer_addr),
384 ) {
385 Ok(v) => v,
386
387 Err(quiche::Error::Done) => {
388 break;
389 },
390
391 Err(e) => {
392 log::error!(
393 "{local_addr} -> {peer_addr}: send failed: {e:?}"
394 );
395
396 conn.close(false, 0x1, b"fail").ok();
397 break;
398 },
399 };
400
401 if let Err(e) = socket.send_to(&out[..write], send_info.to) {
402 if e.kind() == std::io::ErrorKind::WouldBlock {
403 log::debug!(
404 "{} -> {}: send() would block",
405 local_addr,
406 send_info.to
407 );
408 break;
409 }
410
411 return Err(ClientError::Other(format!(
412 "{} -> {}: send() failed: {:?}",
413 local_addr, send_info.to, e
414 )));
415 }
416 }
417 }
418 }
419
420 if conn.is_closed() {
421 log::info!(
422 "connection closed, {:?} {:?}",
423 conn.stats(),
424 conn.path_stats().collect::<Vec<quiche::PathStats>>()
425 );
426
427 if !conn.is_established() {
428 log::info!(
429 "connection timed out after {:?}",
430 app_data_start.elapsed(),
431 );
432
433 return Err(ClientError::HandshakeFail);
434 }
435
436 break;
437 }
438 }
439
440 Ok(ConnectionSummary {
441 stream_map: client.streams,
442 stats: Some(conn.stats()),
443 path_stats: conn.path_stats().collect(),
444 conn_close_details: ConnectionCloseDetails::new(&conn),
445 })
446}
447
448fn check_duration_and_do_actions(
449 wait_duration: &mut Option<Duration>, wait_instant: &mut Option<Instant>,
450 action_iter: &mut Iter<Action>, conn: &mut quiche::Connection,
451 waiting_for: &mut WaitingFor, stream_parsers: &mut StreamParserMap,
452) {
453 match wait_duration.as_ref() {
454 None => {
455 if let Some(idle_wait) =
456 handle_actions(action_iter, conn, waiting_for, stream_parsers)
457 {
458 *wait_duration = Some(idle_wait);
459 *wait_instant = Some(Instant::now());
460
461 log::info!(
466 "waiting for {idle_wait:?} before executing more actions"
467 );
468 }
469 },
470
471 Some(period) => {
472 let now = Instant::now();
473 let then = wait_instant.unwrap();
474 log::debug!(
475 "checking if actions wait period elapsed {:?} > {:?}",
476 now.duration_since(then),
477 wait_duration
478 );
479 if now.duration_since(then) >= *period {
480 log::debug!("yup!");
481 *wait_duration = None;
482
483 if let Some(idle_wait) =
484 handle_actions(action_iter, conn, waiting_for, stream_parsers)
485 {
486 *wait_duration = Some(idle_wait);
487 }
488 }
489 },
490 }
491}
492
493pub fn generate_cid_and_reset_token() -> (quiche::ConnectionId<'static>, u128) {
495 let rng = SystemRandom::new();
496
497 let mut scid = [0; quiche::MAX_CONN_ID_LEN];
498 rng.fill(&mut scid[..]).unwrap();
499 let scid = scid.to_vec().into();
500
501 let mut reset_token = [0; 16];
502 rng.fill(&mut reset_token[..]).unwrap();
503
504 let reset_token = u128::from_be_bytes(reset_token);
505 (scid, reset_token)
506}
507
508fn handle_actions<'a, I>(
509 iter: &mut I, conn: &mut quiche::Connection, waiting_for: &mut WaitingFor,
510 stream_parsers: &mut StreamParserMap,
511) -> Option<Duration>
512where
513 I: Iterator<Item = &'a Action>,
514{
515 if !waiting_for.is_empty() {
516 log::debug!(
517 "won't fire an action due to waiting for responses: {waiting_for:?}"
518 );
519 return None;
520 }
521
522 for action in iter {
524 match action {
525 Action::FlushPackets => return None,
526 Action::Wait { wait_type } => match wait_type {
527 WaitType::WaitDuration(period) => return Some(*period),
528 WaitType::StreamEvent(response) => {
529 log::info!(
530 "waiting for {response:?} before executing more actions"
531 );
532 waiting_for.add_wait(response);
533 return None;
534 },
535 },
536 action => execute_action(action, conn, stream_parsers),
537 }
538 }
539
540 None
541}