1use std::slice::Iter;
30use std::time::Duration;
31use std::time::Instant;
32
33use ring::rand::*;
34
35use crate::client::QUIC_VERSION;
36use crate::frame::H3iFrame;
37use crate::quiche;
38
39use crate::actions::h3::Action;
40use crate::actions::h3::StreamEventType;
41use crate::actions::h3::WaitType;
42use crate::actions::h3::WaitingFor;
43use crate::client::execute_action;
44use crate::client::parse_streams;
45use crate::client::ClientError;
46use crate::client::ConnectionCloseDetails;
47use crate::client::MAX_DATAGRAM_SIZE;
48use crate::config::Config;
49
50use super::parse_args;
51use super::Client;
52use super::CloseTriggerFrames;
53use super::ConnectionSummary;
54use super::ParsedArgs;
55use super::StreamMap;
56use super::StreamParserMap;
57
58#[derive(Default)]
59struct SyncClient {
60 streams: StreamMap,
61 stream_parsers: StreamParserMap,
62}
63
64impl SyncClient {
65 fn new(close_trigger_frames: Option<CloseTriggerFrames>) -> Self {
66 Self {
67 streams: StreamMap::new(close_trigger_frames),
68 ..Default::default()
69 }
70 }
71}
72
73impl Client for SyncClient {
74 fn stream_parsers_mut(&mut self) -> &mut StreamParserMap {
75 &mut self.stream_parsers
76 }
77
78 fn handle_response_frame(&mut self, stream_id: u64, frame: H3iFrame) {
79 self.streams.insert(stream_id, frame);
80 }
81}
82
83fn create_config(args: &Config, should_log_keys: bool) -> quiche::Config {
84 let mut config = quiche::Config::new(QUIC_VERSION).unwrap();
86
87 config.verify_peer(args.verify_peer);
88 config.set_application_protos(&[b"h3"]).unwrap();
89 config.set_max_idle_timeout(args.idle_timeout);
90 config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE);
91 config.set_max_send_udp_payload_size(MAX_DATAGRAM_SIZE);
92 config.set_initial_max_data(10_000_000);
93 config
94 .set_initial_max_stream_data_bidi_local(args.max_stream_data_bidi_local);
95 config.set_initial_max_stream_data_bidi_remote(
96 args.max_stream_data_bidi_remote,
97 );
98 config.set_initial_max_stream_data_uni(args.max_stream_data_uni);
99 config.set_initial_max_streams_bidi(args.max_streams_bidi);
100 config.set_initial_max_streams_uni(args.max_streams_uni);
101 config.set_disable_active_migration(true);
102 config.set_active_connection_id_limit(0);
103
104 config.set_max_connection_window(args.max_window);
105 config.set_max_stream_window(args.max_stream_window);
106 config.grease(false);
107
108 if args.enable_dgram {
109 config.enable_dgram(
110 true,
111 args.dgram_recv_queue_len,
112 args.dgram_send_queue_len,
113 );
114 }
115 if should_log_keys {
116 config.log_keys()
117 }
118
119 config
120}
121
122pub fn connect(
133 args: Config, actions: Vec<Action>,
134 close_trigger_frames: Option<CloseTriggerFrames>,
135) -> std::result::Result<ConnectionSummary, ClientError> {
136 let mut buf = [0; 65535];
137 let mut out = [0; MAX_DATAGRAM_SIZE];
138
139 let ParsedArgs {
140 connect_url,
141 bind_addr,
142 peer_addr,
143 } = parse_args(&args);
144
145 let mut poll = mio::Poll::new().unwrap();
147 let mut events = mio::Events::with_capacity(1024);
148
149 let mut socket = mio::net::UdpSocket::bind(bind_addr).unwrap();
152 poll.registry()
153 .register(&mut socket, mio::Token(0), mio::Interest::READABLE)
154 .unwrap();
155
156 let mut keylog = None;
157 if let Some(keylog_path) = std::env::var_os("SSLKEYLOGFILE") {
158 let file = std::fs::OpenOptions::new()
159 .create(true)
160 .append(true)
161 .open(keylog_path)
162 .unwrap();
163
164 keylog = Some(file);
165 }
166
167 let mut config = create_config(&args, keylog.is_some());
168
169 let mut scid = [0; quiche::MAX_CONN_ID_LEN];
171
172 let rng = SystemRandom::new();
173 rng.fill(&mut scid[..]).unwrap();
174
175 let scid = quiche::ConnectionId::from_ref(&scid);
176
177 let Ok(local_addr) = socket.local_addr() else {
178 return Err(ClientError::Other("invalid socket".to_string()));
179 };
180
181 let mut conn =
183 quiche::connect(connect_url, &scid, local_addr, peer_addr, &mut config)
184 .map_err(|e| ClientError::Other(e.to_string()))?;
185
186 if let Some(keylog) = &mut keylog {
187 if let Ok(keylog) = keylog.try_clone() {
188 conn.set_keylog(Box::new(keylog));
189 }
190 }
191
192 log::info!(
193 "connecting to {peer_addr:} from {local_addr:} with scid {scid:?}",
194 );
195
196 let mut app_proto_selected = false;
197
198 let (write, send_info) = conn.send(&mut out).expect("initial send failed");
199
200 while let Err(e) = socket.send_to(&out[..write], send_info.to) {
201 if e.kind() == std::io::ErrorKind::WouldBlock {
202 log::debug!(
203 "{} -> {}: send() would block",
204 socket.local_addr().unwrap(),
205 send_info.to
206 );
207 continue;
208 }
209
210 return Err(ClientError::Other(format!("send() failed: {e:?}")));
211 }
212
213 let app_data_start = std::time::Instant::now();
214
215 let mut action_iter = actions.iter();
216 let mut wait_duration = None;
217 let mut wait_instant = None;
218
219 let mut client = SyncClient::new(close_trigger_frames);
220 let mut waiting_for = WaitingFor::default();
221
222 loop {
223 let actual_sleep = match (wait_duration, conn.timeout()) {
224 (Some(wait), Some(timeout)) => {
225 #[allow(clippy::comparison_chain)]
226 if timeout < wait {
227 let new = wait - timeout;
230 wait_duration = Some(new);
231 Some(timeout)
232 } else if wait < timeout {
233 Some(wait)
234 } else {
235 Some(timeout)
237 }
238 },
239 (None, Some(timeout)) => Some(timeout),
240 (Some(wait), None) => Some(wait),
241 _ => None,
242 };
243
244 log::debug!("actual sleep is {actual_sleep:?}");
245 poll.poll(&mut events, actual_sleep).unwrap();
246
247 if events.is_empty() {
250 log::debug!("timed out");
251
252 conn.on_timeout();
253 }
254
255 for event in &events {
258 let socket = match event.token() {
259 mio::Token(0) => &socket,
260
261 _ => unreachable!(),
262 };
263
264 let local_addr = socket.local_addr().unwrap();
265 'read: loop {
266 let (len, from) = match socket.recv_from(&mut buf) {
267 Ok(v) => v,
268
269 Err(e) => {
270 if e.kind() == std::io::ErrorKind::WouldBlock {
273 break 'read;
274 }
275
276 return Err(ClientError::Other(format!(
277 "{local_addr}: recv() failed: {e:?}"
278 )));
279 },
280 };
281
282 let recv_info = quiche::RecvInfo {
283 to: local_addr,
284 from,
285 };
286
287 let _read = match conn.recv(&mut buf[..len], recv_info) {
289 Ok(v) => v,
290
291 Err(e) => {
292 log::debug!("{local_addr}: recv failed: {e:?}");
293 continue 'read;
294 },
295 };
296 }
297 }
298
299 log::debug!("done reading");
300
301 if conn.is_closed() {
302 log::info!(
303 "connection closed with error={:?} did_idle_timeout={}, stats={:?} path_stats={:?}",
304 conn.peer_error(),
305 conn.is_timed_out(),
306 conn.stats(),
307 conn.path_stats().collect::<Vec<quiche::PathStats>>(),
308 );
309
310 if !conn.is_established() {
311 log::info!(
312 "connection timed out after {:?}",
313 app_data_start.elapsed(),
314 );
315
316 return Err(ClientError::HandshakeFail);
317 }
318
319 break;
320 }
321
322 if (conn.is_established() || conn.is_in_early_data()) &&
325 !app_proto_selected
326 {
327 app_proto_selected = true;
328 }
329
330 if app_proto_selected {
331 check_duration_and_do_actions(
332 &mut wait_duration,
333 &mut wait_instant,
334 &mut action_iter,
335 &mut conn,
336 &mut waiting_for,
337 client.stream_parsers_mut(),
338 );
339
340 let mut wait_cleared = false;
341 for response in parse_streams(&mut conn, &mut client) {
342 let stream_id = response.stream_id;
343
344 if let StreamEventType::Finished = response.event_type {
345 waiting_for.clear_waits_on_stream(stream_id);
346 } else {
347 waiting_for.remove_wait(response);
348 }
349
350 wait_cleared = true;
351 }
352
353 if client.streams.all_close_trigger_frames_seen() {
354 client.streams.close_due_to_trigger_frames(&mut conn);
355 }
356
357 if wait_cleared {
358 check_duration_and_do_actions(
359 &mut wait_duration,
360 &mut wait_instant,
361 &mut action_iter,
362 &mut conn,
363 &mut waiting_for,
364 client.stream_parsers_mut(),
365 );
366 }
367 }
368
369 while conn.scids_left() > 0 {
371 let (scid, reset_token) = generate_cid_and_reset_token();
372
373 if conn.new_scid(&scid, reset_token, false).is_err() {
374 break;
375 }
376 }
377
378 let sockets = vec![&socket];
381
382 for socket in sockets {
383 let local_addr = socket.local_addr().unwrap();
384
385 for peer_addr in conn.paths_iter(local_addr) {
386 loop {
387 let (write, send_info) = match conn.send_on_path(
388 &mut out,
389 Some(local_addr),
390 Some(peer_addr),
391 ) {
392 Ok(v) => v,
393
394 Err(quiche::Error::Done) => {
395 break;
396 },
397
398 Err(e) => {
399 log::error!(
400 "{local_addr} -> {peer_addr}: send failed: {e:?}"
401 );
402
403 conn.close(false, 0x1, b"fail").ok();
404 break;
405 },
406 };
407
408 if let Err(e) = socket.send_to(&out[..write], send_info.to) {
409 if e.kind() == std::io::ErrorKind::WouldBlock {
410 log::debug!(
411 "{} -> {}: send() would block",
412 local_addr,
413 send_info.to
414 );
415 break;
416 }
417
418 return Err(ClientError::Other(format!(
419 "{} -> {}: send() failed: {:?}",
420 local_addr, send_info.to, e
421 )));
422 }
423 }
424 }
425 }
426
427 if conn.is_closed() {
428 log::info!(
429 "connection closed, {:?} {:?}",
430 conn.stats(),
431 conn.path_stats().collect::<Vec<quiche::PathStats>>()
432 );
433
434 if !conn.is_established() {
435 log::info!(
436 "connection timed out after {:?}",
437 app_data_start.elapsed(),
438 );
439
440 return Err(ClientError::HandshakeFail);
441 }
442
443 break;
444 }
445 }
446
447 Ok(ConnectionSummary {
448 stream_map: client.streams,
449 stats: Some(conn.stats()),
450 path_stats: conn.path_stats().collect(),
451 conn_close_details: ConnectionCloseDetails::new(&conn),
452 })
453}
454
455fn check_duration_and_do_actions(
456 wait_duration: &mut Option<Duration>, wait_instant: &mut Option<Instant>,
457 action_iter: &mut Iter<Action>, conn: &mut quiche::Connection,
458 waiting_for: &mut WaitingFor, stream_parsers: &mut StreamParserMap,
459) {
460 match wait_duration.as_ref() {
461 None => {
462 if let Some(idle_wait) =
463 handle_actions(action_iter, conn, waiting_for, stream_parsers)
464 {
465 *wait_duration = Some(idle_wait);
466 *wait_instant = Some(Instant::now());
467
468 log::info!(
473 "waiting for {idle_wait:?} before executing more actions"
474 );
475 }
476 },
477
478 Some(period) => {
479 let now = Instant::now();
480 let then = wait_instant.unwrap();
481 log::debug!(
482 "checking if actions wait period elapsed {:?} > {:?}",
483 now.duration_since(then),
484 wait_duration
485 );
486 if now.duration_since(then) >= *period {
487 log::debug!("yup!");
488 *wait_duration = None;
489
490 if let Some(idle_wait) =
491 handle_actions(action_iter, conn, waiting_for, stream_parsers)
492 {
493 *wait_duration = Some(idle_wait);
494 }
495 }
496 },
497 }
498}
499
500pub fn generate_cid_and_reset_token() -> (quiche::ConnectionId<'static>, u128) {
502 let rng = SystemRandom::new();
503
504 let mut scid = [0; quiche::MAX_CONN_ID_LEN];
505 rng.fill(&mut scid[..]).unwrap();
506 let scid = scid.to_vec().into();
507
508 let mut reset_token = [0; 16];
509 rng.fill(&mut reset_token[..]).unwrap();
510
511 let reset_token = u128::from_be_bytes(reset_token);
512 (scid, reset_token)
513}
514
515fn handle_actions<'a, I>(
516 iter: &mut I, conn: &mut quiche::Connection, waiting_for: &mut WaitingFor,
517 stream_parsers: &mut StreamParserMap,
518) -> Option<Duration>
519where
520 I: Iterator<Item = &'a Action>,
521{
522 if !waiting_for.is_empty() {
523 log::debug!(
524 "won't fire an action due to waiting for responses: {waiting_for:?}"
525 );
526 return None;
527 }
528
529 for action in iter {
531 match action {
532 Action::FlushPackets => return None,
533 Action::Wait { wait_type } => match wait_type {
534 WaitType::WaitDuration(period) => return Some(*period),
535 WaitType::StreamEvent(response) => {
536 log::info!(
537 "waiting for {response:?} before executing more actions"
538 );
539 waiting_for.add_wait(response);
540 return None;
541 },
542 },
543 action => execute_action(action, conn, stream_parsers),
544 }
545 }
546
547 None
548}