1use std::slice::Iter;
30use std::time::Duration;
31use std::time::Instant;
32
33use crate::client::QUIC_VERSION;
34use crate::frame::H3iFrame;
35use crate::quiche;
36
37use crate::actions::h3::Action;
38use crate::actions::h3::StreamEventType;
39use crate::actions::h3::WaitType;
40use crate::actions::h3::WaitingFor;
41use crate::client::execute_action;
42use crate::client::parse_streams;
43use crate::client::ClientError;
44use crate::client::ConnectionCloseDetails;
45use crate::client::MAX_DATAGRAM_SIZE;
46use crate::config::Config;
47
48use super::parse_args;
49use super::Client;
50use super::CloseTriggerFrames;
51use super::ConnectionSummary;
52use super::ParsedArgs;
53use super::StreamMap;
54use super::StreamParserMap;
55
56#[derive(Default)]
57struct SyncClient {
58 streams: StreamMap,
59 stream_parsers: StreamParserMap,
60}
61
62impl SyncClient {
63 fn new(close_trigger_frames: Option<CloseTriggerFrames>) -> Self {
64 Self {
65 streams: StreamMap::new(close_trigger_frames),
66 ..Default::default()
67 }
68 }
69}
70
71impl Client for SyncClient {
72 fn stream_parsers_mut(&mut self) -> &mut StreamParserMap {
73 &mut self.stream_parsers
74 }
75
76 fn handle_response_frame(&mut self, stream_id: u64, frame: H3iFrame) {
77 self.streams.insert(stream_id, frame);
78 }
79}
80
81fn create_config(args: &Config, should_log_keys: bool) -> quiche::Config {
82 let mut config = quiche::Config::new(QUIC_VERSION).unwrap();
84
85 config.verify_peer(args.verify_peer);
86 config.set_application_protos(&[b"h3"]).unwrap();
87 config.set_max_idle_timeout(args.idle_timeout);
88 config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE);
89 config.set_max_send_udp_payload_size(MAX_DATAGRAM_SIZE);
90 config.set_initial_max_data(10_000_000);
91 config
92 .set_initial_max_stream_data_bidi_local(args.max_stream_data_bidi_local);
93 config.set_initial_max_stream_data_bidi_remote(
94 args.max_stream_data_bidi_remote,
95 );
96 config.set_initial_max_stream_data_uni(args.max_stream_data_uni);
97 config.set_initial_max_streams_bidi(args.max_streams_bidi);
98 config.set_initial_max_streams_uni(args.max_streams_uni);
99 config.set_disable_active_migration(true);
100 config.set_active_connection_id_limit(0);
101
102 config.set_max_connection_window(args.max_window);
103 config.set_max_stream_window(args.max_stream_window);
104 config.grease(false);
105
106 if should_log_keys {
107 config.log_keys()
108 }
109
110 config
111}
112
113pub fn connect(
124 args: Config, actions: Vec<Action>,
125 close_trigger_frames: Option<CloseTriggerFrames>,
126) -> std::result::Result<ConnectionSummary, ClientError> {
127 let mut buf = [0; 65535];
128 let mut out = [0; MAX_DATAGRAM_SIZE];
129
130 let ParsedArgs {
131 connect_url,
132 bind_addr,
133 peer_addr,
134 } = parse_args(&args);
135
136 let mut poll = mio::Poll::new().unwrap();
138 let mut events = mio::Events::with_capacity(1024);
139
140 let mut socket = mio::net::UdpSocket::bind(bind_addr).unwrap();
143 poll.registry()
144 .register(&mut socket, mio::Token(0), mio::Interest::READABLE)
145 .unwrap();
146
147 let mut keylog = None;
148 if let Some(keylog_path) = std::env::var_os("SSLKEYLOGFILE") {
149 let file = std::fs::OpenOptions::new()
150 .create(true)
151 .append(true)
152 .open(keylog_path)
153 .unwrap();
154
155 keylog = Some(file);
156 }
157
158 let mut config = create_config(&args, keylog.is_some());
159
160 let mut scid = [0; quiche::MAX_CONN_ID_LEN];
162 rand::RngCore::fill_bytes(&mut rand::thread_rng(), &mut scid);
163
164 let scid = quiche::ConnectionId::from_ref(&scid);
165
166 let Ok(local_addr) = socket.local_addr() else {
167 return Err(ClientError::Other("invalid socket".to_string()));
168 };
169
170 let mut conn =
172 quiche::connect(connect_url, &scid, local_addr, peer_addr, &mut config)
173 .map_err(|e| ClientError::Other(e.to_string()))?;
174
175 if let Some(keylog) = &mut keylog {
176 if let Ok(keylog) = keylog.try_clone() {
177 conn.set_keylog(Box::new(keylog));
178 }
179 }
180
181 log::info!(
182 "connecting to {peer_addr:} from {local_addr:} with scid {scid:?}",
183 );
184
185 let mut app_proto_selected = false;
186
187 let (write, send_info) = conn.send(&mut out).expect("initial send failed");
188
189 while let Err(e) = socket.send_to(&out[..write], send_info.to) {
190 if e.kind() == std::io::ErrorKind::WouldBlock {
191 log::debug!(
192 "{} -> {}: send() would block",
193 socket.local_addr().unwrap(),
194 send_info.to
195 );
196 continue;
197 }
198
199 return Err(ClientError::Other(format!("send() failed: {e:?}")));
200 }
201
202 let app_data_start = std::time::Instant::now();
203
204 let mut action_iter = actions.iter();
205 let mut wait_duration = None;
206 let mut wait_instant = None;
207
208 let mut client = SyncClient::new(close_trigger_frames);
209 let mut waiting_for = WaitingFor::default();
210
211 loop {
212 let actual_sleep = match (wait_duration, conn.timeout()) {
213 (Some(wait), Some(timeout)) => {
214 #[allow(clippy::comparison_chain)]
215 if timeout < wait {
216 let new = wait - timeout;
219 wait_duration = Some(new);
220 Some(timeout)
221 } else if wait < timeout {
222 Some(wait)
223 } else {
224 Some(timeout)
226 }
227 },
228 (None, Some(timeout)) => Some(timeout),
229 (Some(wait), None) => Some(wait),
230 _ => None,
231 };
232
233 log::debug!("actual sleep is {actual_sleep:?}");
234 poll.poll(&mut events, actual_sleep).unwrap();
235
236 if events.is_empty() {
239 log::debug!("timed out");
240
241 conn.on_timeout();
242 }
243
244 for event in &events {
247 let socket = match event.token() {
248 mio::Token(0) => &socket,
249
250 _ => unreachable!(),
251 };
252
253 let local_addr = socket.local_addr().unwrap();
254 'read: loop {
255 let (len, from) = match socket.recv_from(&mut buf) {
256 Ok(v) => v,
257
258 Err(e) => {
259 if e.kind() == std::io::ErrorKind::WouldBlock {
262 break 'read;
263 }
264
265 return Err(ClientError::Other(format!(
266 "{local_addr}: recv() failed: {e:?}"
267 )));
268 },
269 };
270
271 let recv_info = quiche::RecvInfo {
272 to: local_addr,
273 from,
274 };
275
276 let _read = match conn.recv(&mut buf[..len], recv_info) {
278 Ok(v) => v,
279
280 Err(e) => {
281 log::debug!("{local_addr}: recv failed: {e:?}");
282 continue 'read;
283 },
284 };
285 }
286 }
287
288 log::debug!("done reading");
289
290 if conn.is_closed() {
291 log::info!(
292 "connection closed with error={:?} did_idle_timeout={}, stats={:?} path_stats={:?}",
293 conn.peer_error(),
294 conn.is_timed_out(),
295 conn.stats(),
296 conn.path_stats().collect::<Vec<quiche::PathStats>>(),
297 );
298
299 if !conn.is_established() {
300 log::info!(
301 "connection timed out after {:?}",
302 app_data_start.elapsed(),
303 );
304
305 return Err(ClientError::HandshakeFail);
306 }
307
308 break;
309 }
310
311 if (conn.is_established() || conn.is_in_early_data()) &&
314 !app_proto_selected
315 {
316 app_proto_selected = true;
317 }
318
319 if app_proto_selected {
320 check_duration_and_do_actions(
321 &mut wait_duration,
322 &mut wait_instant,
323 &mut action_iter,
324 &mut conn,
325 &mut waiting_for,
326 client.stream_parsers_mut(),
327 );
328
329 let mut wait_cleared = false;
330 for response in parse_streams(&mut conn, &mut client) {
331 let stream_id = response.stream_id;
332
333 if let StreamEventType::Finished = response.event_type {
334 waiting_for.clear_waits_on_stream(stream_id);
335 } else {
336 waiting_for.remove_wait(response);
337 }
338
339 wait_cleared = true;
340 }
341
342 if client.streams.all_close_trigger_frames_seen() {
343 client.streams.close_due_to_trigger_frames(&mut conn);
344 }
345
346 if wait_cleared {
347 check_duration_and_do_actions(
348 &mut wait_duration,
349 &mut wait_instant,
350 &mut action_iter,
351 &mut conn,
352 &mut waiting_for,
353 client.stream_parsers_mut(),
354 );
355 }
356 }
357
358 while conn.scids_left() > 0 {
360 let (scid, reset_token) = generate_cid_and_reset_token();
361
362 if conn.new_scid(&scid, reset_token, false).is_err() {
363 break;
364 }
365 }
366
367 let sockets = vec![&socket];
370
371 for socket in sockets {
372 let local_addr = socket.local_addr().unwrap();
373
374 for peer_addr in conn.paths_iter(local_addr) {
375 loop {
376 let (write, send_info) = match conn.send_on_path(
377 &mut out,
378 Some(local_addr),
379 Some(peer_addr),
380 ) {
381 Ok(v) => v,
382
383 Err(quiche::Error::Done) => {
384 break;
385 },
386
387 Err(e) => {
388 log::error!(
389 "{local_addr} -> {peer_addr}: send failed: {e:?}"
390 );
391
392 conn.close(false, 0x1, b"fail").ok();
393 break;
394 },
395 };
396
397 if let Err(e) = socket.send_to(&out[..write], send_info.to) {
398 if e.kind() == std::io::ErrorKind::WouldBlock {
399 log::debug!(
400 "{} -> {}: send() would block",
401 local_addr,
402 send_info.to
403 );
404 break;
405 }
406
407 return Err(ClientError::Other(format!(
408 "{} -> {}: send() failed: {:?}",
409 local_addr, send_info.to, e
410 )));
411 }
412 }
413 }
414 }
415
416 if conn.is_closed() {
417 log::info!(
418 "connection closed, {:?} {:?}",
419 conn.stats(),
420 conn.path_stats().collect::<Vec<quiche::PathStats>>()
421 );
422
423 if !conn.is_established() {
424 log::info!(
425 "connection timed out after {:?}",
426 app_data_start.elapsed(),
427 );
428
429 return Err(ClientError::HandshakeFail);
430 }
431
432 break;
433 }
434 }
435
436 Ok(ConnectionSummary {
437 stream_map: client.streams,
438 stats: Some(conn.stats()),
439 path_stats: conn.path_stats().collect(),
440 conn_close_details: ConnectionCloseDetails::new(&conn),
441 })
442}
443
444fn check_duration_and_do_actions(
445 wait_duration: &mut Option<Duration>, wait_instant: &mut Option<Instant>,
446 action_iter: &mut Iter<Action>, conn: &mut quiche::Connection,
447 waiting_for: &mut WaitingFor, stream_parsers: &mut StreamParserMap,
448) {
449 match wait_duration.as_ref() {
450 None => {
451 if let Some(idle_wait) =
452 handle_actions(action_iter, conn, waiting_for, stream_parsers)
453 {
454 *wait_duration = Some(idle_wait);
455 *wait_instant = Some(Instant::now());
456
457 log::info!(
462 "waiting for {idle_wait:?} before executing more actions"
463 );
464 }
465 },
466
467 Some(period) => {
468 let now = Instant::now();
469 let then = wait_instant.unwrap();
470 log::debug!(
471 "checking if actions wait period elapsed {:?} > {:?}",
472 now.duration_since(then),
473 wait_duration
474 );
475 if now.duration_since(then) >= *period {
476 log::debug!("yup!");
477 *wait_duration = None;
478
479 if let Some(idle_wait) =
480 handle_actions(action_iter, conn, waiting_for, stream_parsers)
481 {
482 *wait_duration = Some(idle_wait);
483 }
484 }
485 },
486 }
487}
488
489pub fn generate_cid_and_reset_token() -> (quiche::ConnectionId<'static>, u128) {
491 let mut scid = [0; quiche::MAX_CONN_ID_LEN];
492 rand::RngCore::fill_bytes(&mut rand::thread_rng(), &mut scid);
493 let scid = scid.to_vec().into();
494 let mut reset_token = [0; 16];
495 rand::RngCore::fill_bytes(&mut rand::thread_rng(), &mut reset_token);
496 let reset_token = u128::from_be_bytes(reset_token);
497 (scid, reset_token)
498}
499
500fn handle_actions<'a, I>(
501 iter: &mut I, conn: &mut quiche::Connection, waiting_for: &mut WaitingFor,
502 stream_parsers: &mut StreamParserMap,
503) -> Option<Duration>
504where
505 I: Iterator<Item = &'a Action>,
506{
507 if !waiting_for.is_empty() {
508 log::debug!(
509 "won't fire an action due to waiting for responses: {waiting_for:?}"
510 );
511 return None;
512 }
513
514 for action in iter {
516 match action {
517 Action::FlushPackets => return None,
518 Action::Wait { wait_type } => match wait_type {
519 WaitType::WaitDuration(period) => return Some(*period),
520 WaitType::StreamEvent(response) => {
521 log::info!(
522 "waiting for {response:?} before executing more actions"
523 );
524 waiting_for.add_wait(response);
525 return None;
526 },
527 },
528 action => execute_action(action, conn, stream_parsers),
529 }
530 }
531
532 None
533}