1use std::slice::Iter;
30use std::time::Duration;
31use std::time::Instant;
32
33use ring::rand::*;
34
35use crate::client::QUIC_VERSION;
36use crate::frame::H3iFrame;
37use crate::quiche;
38
39use crate::actions::h3::Action;
40use crate::actions::h3::StreamEventType;
41use crate::actions::h3::WaitType;
42use crate::actions::h3::WaitingFor;
43use crate::client::execute_action;
44use crate::client::parse_streams;
45use crate::client::ClientError;
46use crate::client::ConnectionCloseDetails;
47use crate::client::MAX_DATAGRAM_SIZE;
48use crate::config::Config;
49
50use super::parse_args;
51use super::Client;
52use super::CloseTriggerFrames;
53use super::ConnectionSummary;
54use super::ParsedArgs;
55use super::StreamMap;
56use super::StreamParserMap;
57
58#[derive(Default)]
59struct SyncClient {
60 streams: StreamMap,
61 stream_parsers: StreamParserMap,
62}
63
64impl SyncClient {
65 fn new(close_trigger_frames: Option<CloseTriggerFrames>) -> Self {
66 Self {
67 streams: StreamMap::new(close_trigger_frames),
68 ..Default::default()
69 }
70 }
71}
72
73impl Client for SyncClient {
74 fn stream_parsers_mut(&mut self) -> &mut StreamParserMap {
75 &mut self.stream_parsers
76 }
77
78 fn handle_response_frame(&mut self, stream_id: u64, frame: H3iFrame) {
79 self.streams.insert(stream_id, frame);
80 }
81}
82
83fn create_config(args: &Config, should_log_keys: bool) -> quiche::Config {
84 let mut config = quiche::Config::new(QUIC_VERSION).unwrap();
86
87 config.verify_peer(args.verify_peer);
88 config.set_application_protos(&[b"h3"]).unwrap();
89 config.set_max_idle_timeout(args.idle_timeout);
90 config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE);
91 config.set_max_send_udp_payload_size(MAX_DATAGRAM_SIZE);
92 config.set_initial_max_data(10_000_000);
93 config
94 .set_initial_max_stream_data_bidi_local(args.max_stream_data_bidi_local);
95 config.set_initial_max_stream_data_bidi_remote(
96 args.max_stream_data_bidi_remote,
97 );
98 config.set_initial_max_stream_data_uni(args.max_stream_data_uni);
99 config.set_initial_max_streams_bidi(args.max_streams_bidi);
100 config.set_initial_max_streams_uni(args.max_streams_uni);
101 config.set_disable_active_migration(true);
102 config.set_active_connection_id_limit(0);
103
104 config.set_max_connection_window(args.max_window);
105 config.set_max_stream_window(args.max_stream_window);
106 config.grease(false);
107
108 if args.enable_early_data {
109 config.enable_early_data();
110 }
111
112 if args.enable_dgram {
113 config.enable_dgram(
114 true,
115 args.dgram_recv_queue_len,
116 args.dgram_send_queue_len,
117 );
118 }
119 if should_log_keys {
120 config.log_keys()
121 }
122
123 config
124}
125
126pub fn connect(
137 args: Config, actions: Vec<Action>,
138 close_trigger_frames: Option<CloseTriggerFrames>,
139) -> std::result::Result<ConnectionSummary, ClientError> {
140 connect_with_early_data(args, None, actions, close_trigger_frames)
141}
142
143pub fn connect_with_early_data(
147 args: Config, early_actions: Option<Vec<Action>>, actions: Vec<Action>,
148 close_trigger_frames: Option<CloseTriggerFrames>,
149) -> std::result::Result<ConnectionSummary, ClientError> {
150 let mut buf = [0; 65535];
151 let mut out = [0; MAX_DATAGRAM_SIZE];
152
153 let ParsedArgs {
154 connect_url,
155 bind_addr,
156 peer_addr,
157 } = parse_args(&args);
158
159 let mut poll = mio::Poll::new().unwrap();
161 let mut events = mio::Events::with_capacity(1024);
162
163 let mut socket = mio::net::UdpSocket::bind(bind_addr).unwrap();
166 poll.registry()
167 .register(&mut socket, mio::Token(0), mio::Interest::READABLE)
168 .unwrap();
169
170 let mut keylog = None;
171 if let Some(keylog_path) = std::env::var_os("SSLKEYLOGFILE") {
172 let file = std::fs::OpenOptions::new()
173 .create(true)
174 .append(true)
175 .open(keylog_path)
176 .unwrap();
177
178 keylog = Some(file);
179 }
180
181 let mut config = create_config(&args, keylog.is_some());
182
183 let mut scid = [0; quiche::MAX_CONN_ID_LEN];
185
186 let rng = SystemRandom::new();
187 rng.fill(&mut scid[..]).unwrap();
188
189 let scid = quiche::ConnectionId::from_ref(&scid);
190
191 let Ok(local_addr) = socket.local_addr() else {
192 return Err(ClientError::Other("invalid socket".to_string()));
193 };
194
195 let mut conn =
197 quiche::connect(connect_url, &scid, local_addr, peer_addr, &mut config)
198 .map_err(|e| ClientError::Other(e.to_string()))?;
199
200 if let Some(session) = &args.session {
201 conn.set_session(session)
202 .map_err(|error| ClientError::Other(error.to_string()))?;
203 }
204
205 if let Some(keylog) = &mut keylog {
206 if let Ok(keylog) = keylog.try_clone() {
207 conn.set_keylog(Box::new(keylog));
208 }
209 }
210
211 log::info!(
212 "connecting to {peer_addr:} from {local_addr:} with scid {scid:?}",
213 );
214
215 let mut app_proto_selected = false;
216
217 let (write, send_info) = conn.send(&mut out).expect("initial send failed");
219
220 let mut client = SyncClient::new(close_trigger_frames);
221 if conn.is_in_early_data() {
224 if let Some(early_actions) = early_actions {
225 let mut early_action_iter = early_actions.iter();
226 let mut wait_duration = None;
227 let mut wait_instant = None;
228 let mut waiting_for = WaitingFor::default();
229
230 check_duration_and_do_actions(
231 &mut wait_duration,
232 &mut wait_instant,
233 &mut early_action_iter,
234 &mut conn,
235 &mut waiting_for,
236 client.stream_parsers_mut(),
237 );
238 }
239 }
240
241 while let Err(e) = socket.send_to(&out[..write], send_info.to) {
242 if e.kind() == std::io::ErrorKind::WouldBlock {
243 log::debug!(
244 "{} -> {}: send() would block",
245 socket.local_addr().unwrap(),
246 send_info.to
247 );
248 continue;
249 }
250
251 return Err(ClientError::Other(format!("send() failed: {e:?}")));
252 }
253
254 let app_data_start = std::time::Instant::now();
255
256 let mut action_iter = actions.iter();
257 let mut wait_duration = None;
258 let mut wait_instant = None;
259
260 let mut waiting_for = WaitingFor::default();
261
262 loop {
263 let actual_sleep = match (wait_duration, conn.timeout()) {
264 (Some(wait), Some(timeout)) => {
265 #[allow(clippy::comparison_chain)]
266 if timeout < wait {
267 let new = wait - timeout;
270 wait_duration = Some(new);
271 Some(timeout)
272 } else if wait < timeout {
273 Some(wait)
274 } else {
275 Some(timeout)
277 }
278 },
279 (None, Some(timeout)) => Some(timeout),
280 (Some(wait), None) => Some(wait),
281 _ => None,
282 };
283
284 log::debug!("actual sleep is {actual_sleep:?}");
285 poll.poll(&mut events, actual_sleep).unwrap();
286
287 if events.is_empty() {
290 log::debug!("timed out");
291
292 conn.on_timeout();
293 }
294
295 for event in &events {
298 let socket = match event.token() {
299 mio::Token(0) => &socket,
300
301 _ => unreachable!(),
302 };
303
304 let local_addr = socket.local_addr().unwrap();
305 'read: loop {
306 let (len, from) = match socket.recv_from(&mut buf) {
307 Ok(v) => v,
308
309 Err(e) => {
310 if e.kind() == std::io::ErrorKind::WouldBlock {
313 break 'read;
314 }
315
316 return Err(ClientError::Other(format!(
317 "{local_addr}: recv() failed: {e:?}"
318 )));
319 },
320 };
321
322 let recv_info = quiche::RecvInfo {
323 to: local_addr,
324 from,
325 };
326
327 let _read = match conn.recv(&mut buf[..len], recv_info) {
329 Ok(v) => v,
330
331 Err(e) => {
332 log::debug!("{local_addr}: recv failed: {e:?}");
333 continue 'read;
334 },
335 };
336 }
337 }
338
339 log::debug!("done reading");
340
341 if conn.is_closed() {
342 log::info!(
343 "connection closed with error={:?} did_idle_timeout={}, stats={:?} path_stats={:?}",
344 conn.peer_error(),
345 conn.is_timed_out(),
346 conn.stats(),
347 conn.path_stats().collect::<Vec<quiche::PathStats>>(),
348 );
349
350 if !conn.is_established() {
351 log::info!(
352 "connection timed out after {:?}",
353 app_data_start.elapsed(),
354 );
355
356 return Err(ClientError::HandshakeFail);
357 }
358
359 break;
360 }
361
362 if (conn.is_established() || conn.is_in_early_data()) &&
365 !app_proto_selected
366 {
367 app_proto_selected = true;
368 }
369
370 if app_proto_selected {
371 check_duration_and_do_actions(
372 &mut wait_duration,
373 &mut wait_instant,
374 &mut action_iter,
375 &mut conn,
376 &mut waiting_for,
377 client.stream_parsers_mut(),
378 );
379
380 let mut wait_cleared = false;
381 for response in parse_streams(&mut conn, &mut client) {
382 let stream_id = response.stream_id;
383
384 if let StreamEventType::Finished = response.event_type {
385 waiting_for.clear_waits_on_stream(stream_id);
386 } else {
387 waiting_for.remove_wait(response);
388 }
389
390 wait_cleared = true;
391 }
392
393 if client.streams.all_close_trigger_frames_seen() {
394 client.streams.close_due_to_trigger_frames(&mut conn);
395 }
396
397 if wait_cleared {
398 check_duration_and_do_actions(
399 &mut wait_duration,
400 &mut wait_instant,
401 &mut action_iter,
402 &mut conn,
403 &mut waiting_for,
404 client.stream_parsers_mut(),
405 );
406 }
407 }
408
409 while conn.scids_left() > 0 {
411 let (scid, reset_token) = generate_cid_and_reset_token();
412
413 if conn.new_scid(&scid, reset_token, false).is_err() {
414 break;
415 }
416 }
417
418 let sockets = vec![&socket];
421
422 for socket in sockets {
423 let local_addr = socket.local_addr().unwrap();
424
425 for peer_addr in conn.paths_iter(local_addr) {
426 loop {
427 let (write, send_info) = match conn.send_on_path(
428 &mut out,
429 Some(local_addr),
430 Some(peer_addr),
431 ) {
432 Ok(v) => v,
433
434 Err(quiche::Error::Done) => {
435 break;
436 },
437
438 Err(e) => {
439 log::error!(
440 "{local_addr} -> {peer_addr}: send failed: {e:?}"
441 );
442
443 conn.close(false, 0x1, b"fail").ok();
444 break;
445 },
446 };
447
448 if let Err(e) = socket.send_to(&out[..write], send_info.to) {
449 if e.kind() == std::io::ErrorKind::WouldBlock {
450 log::debug!(
451 "{} -> {}: send() would block",
452 local_addr,
453 send_info.to
454 );
455 break;
456 }
457
458 return Err(ClientError::Other(format!(
459 "{} -> {}: send() failed: {:?}",
460 local_addr, send_info.to, e
461 )));
462 }
463 }
464 }
465 }
466
467 if conn.is_closed() {
468 log::info!(
469 "connection closed, {:?} {:?}",
470 conn.stats(),
471 conn.path_stats().collect::<Vec<quiche::PathStats>>()
472 );
473
474 if !conn.is_established() {
475 log::info!(
476 "connection timed out after {:?}",
477 app_data_start.elapsed(),
478 );
479
480 return Err(ClientError::HandshakeFail);
481 }
482
483 break;
484 }
485 }
486
487 Ok(ConnectionSummary {
488 stream_map: client.streams,
489 stats: Some(conn.stats()),
490 path_stats: conn.path_stats().collect(),
491 conn_close_details: ConnectionCloseDetails::new(&conn),
492 })
493}
494
495fn check_duration_and_do_actions(
496 wait_duration: &mut Option<Duration>, wait_instant: &mut Option<Instant>,
497 action_iter: &mut Iter<Action>, conn: &mut quiche::Connection,
498 waiting_for: &mut WaitingFor, stream_parsers: &mut StreamParserMap,
499) {
500 match wait_duration.as_ref() {
501 None => {
502 if let Some(idle_wait) =
503 handle_actions(action_iter, conn, waiting_for, stream_parsers)
504 {
505 *wait_duration = Some(idle_wait);
506 *wait_instant = Some(Instant::now());
507
508 log::info!(
513 "waiting for {idle_wait:?} before executing more actions"
514 );
515 }
516 },
517
518 Some(period) => {
519 let now = Instant::now();
520 let then = wait_instant.unwrap();
521 log::debug!(
522 "checking if actions wait period elapsed {:?} > {:?}",
523 now.duration_since(then),
524 wait_duration
525 );
526 if now.duration_since(then) >= *period {
527 log::debug!("yup!");
528 *wait_duration = None;
529
530 if let Some(idle_wait) =
531 handle_actions(action_iter, conn, waiting_for, stream_parsers)
532 {
533 *wait_duration = Some(idle_wait);
534 }
535 }
536 },
537 }
538}
539
540pub fn generate_cid_and_reset_token() -> (quiche::ConnectionId<'static>, u128) {
542 let rng = SystemRandom::new();
543
544 let mut scid = [0; quiche::MAX_CONN_ID_LEN];
545 rng.fill(&mut scid[..]).unwrap();
546 let scid = scid.to_vec().into();
547
548 let mut reset_token = [0; 16];
549 rng.fill(&mut reset_token[..]).unwrap();
550
551 let reset_token = u128::from_be_bytes(reset_token);
552 (scid, reset_token)
553}
554
555fn handle_actions<'a, I>(
556 iter: &mut I, conn: &mut quiche::Connection, waiting_for: &mut WaitingFor,
557 stream_parsers: &mut StreamParserMap,
558) -> Option<Duration>
559where
560 I: Iterator<Item = &'a Action>,
561{
562 if !waiting_for.is_empty() {
563 log::debug!(
564 "won't fire an action due to waiting for responses: {waiting_for:?}"
565 );
566 return None;
567 }
568
569 for action in iter {
571 match action {
572 Action::FlushPackets => return None,
573 Action::Wait { wait_type } => match wait_type {
574 WaitType::WaitDuration(period) => return Some(*period),
575 WaitType::StreamEvent(response) => {
576 log::info!(
577 "waiting for {response:?} before executing more actions"
578 );
579 waiting_for.add_wait(response);
580 return None;
581 },
582 },
583 action => execute_action(action, conn, stream_parsers),
584 }
585 }
586
587 None
588}