1use std::slice::Iter;
30use std::time::Duration;
31use std::time::Instant;
32
33use ring::rand::*;
34
35use crate::client::QUIC_VERSION;
36use crate::frame::H3iFrame;
37use crate::quiche;
38
39use crate::actions::h3::Action;
40use crate::actions::h3::StreamEventType;
41use crate::actions::h3::WaitType;
42use crate::actions::h3::WaitingFor;
43use crate::client::execute_action;
44use crate::client::parse_streams;
45use crate::client::ClientError;
46use crate::client::ConnectionCloseDetails;
47use crate::client::MAX_DATAGRAM_SIZE;
48use crate::config::Config;
49
50use super::parse_args;
51use super::Client;
52use super::CloseTriggerFrames;
53use super::ConnectionSummary;
54use super::ParsedArgs;
55use super::StreamMap;
56use super::StreamParserMap;
57
58#[derive(Default)]
59struct SyncClient {
60 streams: StreamMap,
61 stream_parsers: StreamParserMap,
62}
63
64impl SyncClient {
65 fn new(close_trigger_frames: Option<CloseTriggerFrames>) -> Self {
66 Self {
67 streams: StreamMap::new(close_trigger_frames),
68 ..Default::default()
69 }
70 }
71}
72
73impl Client for SyncClient {
74 fn stream_parsers_mut(&mut self) -> &mut StreamParserMap {
75 &mut self.stream_parsers
76 }
77
78 fn handle_response_frame(&mut self, stream_id: u64, frame: H3iFrame) {
79 self.streams.insert(stream_id, frame);
80 }
81}
82
83fn create_config(args: &Config, should_log_keys: bool) -> quiche::Config {
84 let mut config = quiche::Config::new(QUIC_VERSION).unwrap();
86
87 config.verify_peer(args.verify_peer);
88 config.set_application_protos(&[b"h3"]).unwrap();
89 config.set_max_idle_timeout(args.idle_timeout);
90 config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE);
91 config.set_max_send_udp_payload_size(MAX_DATAGRAM_SIZE);
92 config.set_initial_max_data(10_000_000);
93 config
94 .set_initial_max_stream_data_bidi_local(args.max_stream_data_bidi_local);
95 config.set_initial_max_stream_data_bidi_remote(
96 args.max_stream_data_bidi_remote,
97 );
98 config.set_initial_max_stream_data_uni(args.max_stream_data_uni);
99 config.set_initial_max_streams_bidi(args.max_streams_bidi);
100 config.set_initial_max_streams_uni(args.max_streams_uni);
101 config.set_disable_active_migration(true);
102 config.set_active_connection_id_limit(0);
103
104 config.set_max_connection_window(args.max_window);
105 config.set_max_stream_window(args.max_stream_window);
106 config.set_enable_send_streams_blocked(true);
107 config.grease(false);
108
109 if args.enable_early_data {
110 config.enable_early_data();
111 }
112
113 if args.enable_dgram {
114 config.enable_dgram(
115 true,
116 args.dgram_recv_queue_len,
117 args.dgram_send_queue_len,
118 );
119 }
120 if should_log_keys {
121 config.log_keys()
122 }
123
124 config
125}
126
127pub fn connect(
138 args: Config, actions: Vec<Action>,
139 close_trigger_frames: Option<CloseTriggerFrames>,
140) -> std::result::Result<ConnectionSummary, ClientError> {
141 connect_with_early_data(args, None, actions, close_trigger_frames)
142}
143
144pub fn connect_with_early_data(
148 args: Config, early_actions: Option<Vec<Action>>, actions: Vec<Action>,
149 close_trigger_frames: Option<CloseTriggerFrames>,
150) -> std::result::Result<ConnectionSummary, ClientError> {
151 let mut buf = [0; 65535];
152 let mut out = [0; MAX_DATAGRAM_SIZE];
153
154 let ParsedArgs {
155 connect_url,
156 bind_addr,
157 peer_addr,
158 } = parse_args(&args);
159
160 let mut poll = mio::Poll::new().unwrap();
162 let mut events = mio::Events::with_capacity(1024);
163
164 let mut socket = mio::net::UdpSocket::bind(bind_addr).unwrap();
167 poll.registry()
168 .register(&mut socket, mio::Token(0), mio::Interest::READABLE)
169 .unwrap();
170
171 let mut keylog = None;
172 if let Some(keylog_path) = std::env::var_os("SSLKEYLOGFILE") {
173 let file = std::fs::OpenOptions::new()
174 .create(true)
175 .append(true)
176 .open(keylog_path)
177 .unwrap();
178
179 keylog = Some(file);
180 }
181
182 let mut config = create_config(&args, keylog.is_some());
183
184 let mut scid = [0; quiche::MAX_CONN_ID_LEN];
186
187 let rng = SystemRandom::new();
188 rng.fill(&mut scid[..]).unwrap();
189
190 let scid = quiche::ConnectionId::from_ref(&scid);
191
192 let Ok(local_addr) = socket.local_addr() else {
193 return Err(ClientError::Other("invalid socket".to_string()));
194 };
195
196 let mut conn =
198 quiche::connect(connect_url, &scid, local_addr, peer_addr, &mut config)
199 .map_err(|e| ClientError::Other(e.to_string()))?;
200
201 if let Some(session) = &args.session {
202 conn.set_session(session)
203 .map_err(|error| ClientError::Other(error.to_string()))?;
204 }
205
206 if let Some(keylog) = &mut keylog {
207 if let Ok(keylog) = keylog.try_clone() {
208 conn.set_keylog(Box::new(keylog));
209 }
210 }
211
212 log::info!(
213 "connecting to {peer_addr:} from {local_addr:} with scid {scid:?}",
214 );
215
216 let mut app_proto_selected = false;
217
218 let (write, send_info) = conn.send(&mut out).expect("initial send failed");
220
221 let mut client = SyncClient::new(close_trigger_frames);
222 if conn.is_in_early_data() {
225 if let Some(early_actions) = early_actions {
226 let mut early_action_iter = early_actions.iter();
227 let mut wait_duration = None;
228 let mut wait_instant = None;
229 let mut waiting_for = WaitingFor::default();
230
231 check_duration_and_do_actions(
232 &mut wait_duration,
233 &mut wait_instant,
234 &mut early_action_iter,
235 &mut conn,
236 &mut waiting_for,
237 client.stream_parsers_mut(),
238 );
239 }
240 }
241
242 while let Err(e) = socket.send_to(&out[..write], send_info.to) {
243 if e.kind() == std::io::ErrorKind::WouldBlock {
244 log::debug!(
245 "{} -> {}: send() would block",
246 socket.local_addr().unwrap(),
247 send_info.to
248 );
249 continue;
250 }
251
252 return Err(ClientError::Other(format!("send() failed: {e:?}")));
253 }
254
255 let app_data_start = std::time::Instant::now();
256
257 let mut action_iter = actions.iter();
258 let mut wait_duration = None;
259 let mut wait_instant = None;
260
261 let mut waiting_for = WaitingFor::default();
262
263 loop {
264 let actual_sleep = match (wait_duration, conn.timeout()) {
265 (Some(wait), Some(timeout)) => {
266 #[allow(clippy::comparison_chain)]
267 if timeout < wait {
268 let new = wait - timeout;
271 wait_duration = Some(new);
272 Some(timeout)
273 } else if wait < timeout {
274 Some(wait)
275 } else {
276 Some(timeout)
278 }
279 },
280 (None, Some(timeout)) => Some(timeout),
281 (Some(wait), None) => Some(wait),
282 _ => None,
283 };
284
285 log::debug!("actual sleep is {actual_sleep:?}");
286 poll.poll(&mut events, actual_sleep).unwrap();
287
288 if events.is_empty() {
291 log::debug!("timed out");
292
293 conn.on_timeout();
294 }
295
296 for event in &events {
299 let socket = match event.token() {
300 mio::Token(0) => &socket,
301
302 _ => unreachable!(),
303 };
304
305 let local_addr = socket.local_addr().unwrap();
306 'read: loop {
307 let (len, from) = match socket.recv_from(&mut buf) {
308 Ok(v) => v,
309
310 Err(e) => {
311 if e.kind() == std::io::ErrorKind::WouldBlock {
314 break 'read;
315 }
316
317 return Err(ClientError::Other(format!(
318 "{local_addr}: recv() failed: {e:?}"
319 )));
320 },
321 };
322
323 let recv_info = quiche::RecvInfo {
324 to: local_addr,
325 from,
326 };
327
328 let _read = match conn.recv(&mut buf[..len], recv_info) {
330 Ok(v) => v,
331
332 Err(e) => {
333 log::debug!("{local_addr}: recv failed: {e:?}");
334 continue 'read;
335 },
336 };
337 }
338 }
339
340 log::debug!("done reading");
341
342 if conn.is_closed() {
343 log::info!(
344 "connection closed with error={:?} did_idle_timeout={}, stats={:?} path_stats={:?}",
345 conn.peer_error(),
346 conn.is_timed_out(),
347 conn.stats(),
348 conn.path_stats().collect::<Vec<quiche::PathStats>>(),
349 );
350
351 if !conn.is_established() {
352 log::info!(
353 "connection timed out after {:?}",
354 app_data_start.elapsed(),
355 );
356
357 return Err(ClientError::HandshakeFail);
358 }
359
360 break;
361 }
362
363 if (conn.is_established() || conn.is_in_early_data()) &&
366 !app_proto_selected
367 {
368 app_proto_selected = true;
369 }
370
371 if app_proto_selected {
372 check_duration_and_do_actions(
373 &mut wait_duration,
374 &mut wait_instant,
375 &mut action_iter,
376 &mut conn,
377 &mut waiting_for,
378 client.stream_parsers_mut(),
379 );
380
381 let mut wait_cleared = false;
382 for response in parse_streams(&mut conn, &mut client) {
383 let stream_id = response.stream_id;
384
385 if let StreamEventType::Finished = response.event_type {
386 waiting_for.clear_waits_on_stream(stream_id);
387 } else {
388 waiting_for.remove_wait(response);
389 }
390
391 wait_cleared = true;
392 }
393
394 let before = waiting_for.is_empty();
396 waiting_for.check_can_open_num_streams(&conn);
397 if !before && waiting_for.is_empty() {
398 wait_cleared = true;
399 }
400
401 if client.streams.all_close_trigger_frames_seen() {
402 client.streams.close_due_to_trigger_frames(&mut conn);
403 }
404
405 if wait_cleared {
406 check_duration_and_do_actions(
407 &mut wait_duration,
408 &mut wait_instant,
409 &mut action_iter,
410 &mut conn,
411 &mut waiting_for,
412 client.stream_parsers_mut(),
413 );
414 }
415 }
416
417 while conn.scids_left() > 0 {
419 let (scid, reset_token) = generate_cid_and_reset_token();
420
421 if conn.new_scid(&scid, reset_token, false).is_err() {
422 break;
423 }
424 }
425
426 let sockets = vec![&socket];
429
430 for socket in sockets {
431 let local_addr = socket.local_addr().unwrap();
432
433 for peer_addr in conn.paths_iter(local_addr) {
434 loop {
435 let (write, send_info) = match conn.send_on_path(
436 &mut out,
437 Some(local_addr),
438 Some(peer_addr),
439 ) {
440 Ok(v) => v,
441
442 Err(quiche::Error::Done) => {
443 break;
444 },
445
446 Err(e) => {
447 log::error!(
448 "{local_addr} -> {peer_addr}: send failed: {e:?}"
449 );
450
451 conn.close(false, 0x1, b"fail").ok();
452 break;
453 },
454 };
455
456 if let Err(e) = socket.send_to(&out[..write], send_info.to) {
457 if e.kind() == std::io::ErrorKind::WouldBlock {
458 log::debug!(
459 "{} -> {}: send() would block",
460 local_addr,
461 send_info.to
462 );
463 break;
464 }
465
466 return Err(ClientError::Other(format!(
467 "{} -> {}: send() failed: {:?}",
468 local_addr, send_info.to, e
469 )));
470 }
471 }
472 }
473 }
474
475 if conn.is_closed() {
476 log::info!(
477 "connection closed, {:?} {:?}",
478 conn.stats(),
479 conn.path_stats().collect::<Vec<quiche::PathStats>>()
480 );
481
482 if !conn.is_established() {
483 log::info!(
484 "connection timed out after {:?}",
485 app_data_start.elapsed(),
486 );
487
488 return Err(ClientError::HandshakeFail);
489 }
490
491 break;
492 }
493 }
494
495 Ok(ConnectionSummary {
496 stream_map: client.streams,
497 stats: Some(conn.stats()),
498 path_stats: conn.path_stats().collect(),
499 conn_close_details: ConnectionCloseDetails::new(&conn),
500 })
501}
502
503fn check_duration_and_do_actions(
504 wait_duration: &mut Option<Duration>, wait_instant: &mut Option<Instant>,
505 action_iter: &mut Iter<Action>, conn: &mut quiche::Connection,
506 waiting_for: &mut WaitingFor, stream_parsers: &mut StreamParserMap,
507) {
508 match wait_duration.as_ref() {
509 None => {
510 if let Some(idle_wait) =
511 handle_actions(action_iter, conn, waiting_for, stream_parsers)
512 {
513 *wait_duration = Some(idle_wait);
514 *wait_instant = Some(Instant::now());
515
516 log::info!(
521 "waiting for {idle_wait:?} before executing more actions"
522 );
523 }
524 },
525
526 Some(period) => {
527 let now = Instant::now();
528 let then = wait_instant.unwrap();
529 log::debug!(
530 "checking if actions wait period elapsed {:?} > {:?}",
531 now.duration_since(then),
532 wait_duration
533 );
534 if now.duration_since(then) >= *period {
535 log::debug!("yup!");
536 *wait_duration = None;
537
538 if let Some(idle_wait) =
539 handle_actions(action_iter, conn, waiting_for, stream_parsers)
540 {
541 *wait_duration = Some(idle_wait);
542 }
543 }
544 },
545 }
546}
547
548pub fn generate_cid_and_reset_token() -> (quiche::ConnectionId<'static>, u128) {
550 let rng = SystemRandom::new();
551
552 let mut scid = [0; quiche::MAX_CONN_ID_LEN];
553 rng.fill(&mut scid[..]).unwrap();
554 let scid = scid.to_vec().into();
555
556 let mut reset_token = [0; 16];
557 rng.fill(&mut reset_token[..]).unwrap();
558
559 let reset_token = u128::from_be_bytes(reset_token);
560 (scid, reset_token)
561}
562
563fn handle_actions<'a, I>(
564 iter: &mut I, conn: &mut quiche::Connection, waiting_for: &mut WaitingFor,
565 stream_parsers: &mut StreamParserMap,
566) -> Option<Duration>
567where
568 I: Iterator<Item = &'a Action>,
569{
570 if !waiting_for.is_empty() {
571 log::debug!(
572 "won't fire an action due to waiting for responses: {waiting_for:?}"
573 );
574 return None;
575 }
576
577 for action in iter {
579 match action {
580 Action::FlushPackets => return None,
581 Action::Wait { wait_type } => match wait_type {
582 WaitType::WaitDuration(period) => return Some(*period),
583 WaitType::StreamEvent(response) => {
584 log::info!(
585 "waiting for {response:?} before executing more actions"
586 );
587 waiting_for.add_wait(response);
588 return None;
589 },
590 WaitType::CanOpenNumStreams(required_streams) => {
591 log::info!(
592 "h3i: waiting for peer_streams_left_bidi >= {required_streams:?}"
593 );
594 waiting_for.set_required_stream_quota(*required_streams);
595 return None;
596 },
597 },
598 action => execute_action(action, conn, stream_parsers),
599 }
600 }
601
602 None
603}