1use std::slice::Iter;
30use std::time::Duration;
31use std::time::Instant;
32
33use crate::frame::H3iFrame;
34use crate::quiche;
35
36use crate::actions::h3::Action;
37use crate::actions::h3::StreamEventType;
38use crate::actions::h3::WaitType;
39use crate::actions::h3::WaitingFor;
40use crate::client::build_quiche_connection;
41use crate::client::execute_action;
42use crate::client::parse_streams;
43use crate::client::ClientError;
44use crate::client::ConnectionCloseDetails;
45use crate::client::MAX_DATAGRAM_SIZE;
46use crate::config::Config;
47
48use super::Client;
49use super::CloseTriggerFrames;
50use super::ConnectionSummary;
51use super::StreamMap;
52use super::StreamParserMap;
53
54#[derive(Default)]
55struct SyncClient {
56 streams: StreamMap,
57 stream_parsers: StreamParserMap,
58}
59
60impl SyncClient {
61 fn new(close_trigger_frames: Option<CloseTriggerFrames>) -> Self {
62 Self {
63 streams: StreamMap::new(close_trigger_frames),
64 ..Default::default()
65 }
66 }
67}
68
69impl Client for SyncClient {
70 fn stream_parsers_mut(&mut self) -> &mut StreamParserMap {
71 &mut self.stream_parsers
72 }
73
74 fn handle_response_frame(&mut self, stream_id: u64, frame: H3iFrame) {
75 self.streams.insert(stream_id, frame);
76 }
77}
78
79pub fn connect(
90 args: Config, actions: &[Action],
91 close_trigger_frames: Option<CloseTriggerFrames>,
92) -> std::result::Result<ConnectionSummary, ClientError> {
93 let mut buf = [0; 65535];
94 let mut out = [0; MAX_DATAGRAM_SIZE];
95
96 let mut poll = mio::Poll::new().unwrap();
98 let mut events = mio::Events::with_capacity(1024);
99
100 let peer_addr = if let Some(addr) = &args.connect_to {
102 addr.parse().expect("--connect-to is expected to be a string containing an IPv4 or IPv6 address with a port. E.g. 192.0.2.0:443")
103 } else {
104 let x = format!("https://{}", args.host_port);
105 *url::Url::parse(&x)
106 .unwrap()
107 .socket_addrs(|| None)
108 .unwrap()
109 .first()
110 .unwrap()
111 };
112
113 let bind_addr = match peer_addr {
117 std::net::SocketAddr::V4(_) => format!("0.0.0.0:{}", args.source_port),
118 std::net::SocketAddr::V6(_) => format!("[::]:{}", args.source_port),
119 };
120
121 let mut socket =
124 mio::net::UdpSocket::bind(bind_addr.parse().unwrap()).unwrap();
125 poll.registry()
126 .register(&mut socket, mio::Token(0), mio::Interest::READABLE)
127 .unwrap();
128
129 let Ok(local_addr) = socket.local_addr() else {
130 return Err(ClientError::Other("invalid socket".to_string()));
131 };
132
133 let mut conn = build_quiche_connection(args, peer_addr, local_addr)
134 .map_err(|_| ClientError::HandshakeFail)?;
135
136 let mut app_proto_selected = false;
137
138 let (write, send_info) = conn.send(&mut out).expect("initial send failed");
139
140 while let Err(e) = socket.send_to(&out[..write], send_info.to) {
141 if e.kind() == std::io::ErrorKind::WouldBlock {
142 log::debug!(
143 "{} -> {}: send() would block",
144 socket.local_addr().unwrap(),
145 send_info.to
146 );
147 continue;
148 }
149
150 return Err(ClientError::Other(format!("send() failed: {e:?}")));
151 }
152
153 let app_data_start = std::time::Instant::now();
154
155 let mut action_iter = actions.iter();
156 let mut wait_duration = None;
157 let mut wait_instant = None;
158
159 let mut client = SyncClient::new(close_trigger_frames);
160 let mut waiting_for = WaitingFor::default();
161
162 loop {
163 let actual_sleep = match (wait_duration, conn.timeout()) {
164 (Some(wait), Some(timeout)) => {
165 #[allow(clippy::comparison_chain)]
166 if timeout < wait {
167 let new = wait - timeout;
170 wait_duration = Some(new);
171 Some(timeout)
172 } else if wait < timeout {
173 Some(wait)
174 } else {
175 Some(timeout)
177 }
178 },
179 (None, Some(timeout)) => Some(timeout),
180 (Some(wait), None) => Some(wait),
181 _ => None,
182 };
183
184 log::debug!("actual sleep is {:?}", actual_sleep);
185 poll.poll(&mut events, actual_sleep).unwrap();
186
187 if events.is_empty() {
190 log::debug!("timed out");
191
192 conn.on_timeout();
193 }
194
195 for event in &events {
198 let socket = match event.token() {
199 mio::Token(0) => &socket,
200
201 _ => unreachable!(),
202 };
203
204 let local_addr = socket.local_addr().unwrap();
205 'read: loop {
206 let (len, from) = match socket.recv_from(&mut buf) {
207 Ok(v) => v,
208
209 Err(e) => {
210 if e.kind() == std::io::ErrorKind::WouldBlock {
213 break 'read;
214 }
215
216 return Err(ClientError::Other(format!(
217 "{local_addr}: recv() failed: {e:?}"
218 )));
219 },
220 };
221
222 let recv_info = quiche::RecvInfo {
223 to: local_addr,
224 from,
225 };
226
227 let _read = match conn.recv(&mut buf[..len], recv_info) {
229 Ok(v) => v,
230
231 Err(e) => {
232 log::debug!("{}: recv failed: {:?}", local_addr, e);
233 continue 'read;
234 },
235 };
236 }
237 }
238
239 log::debug!("done reading");
240
241 if conn.is_closed() {
242 log::info!(
243 "connection closed with error={:?} did_idle_timeout={}, stats={:?} path_stats={:?}",
244 conn.peer_error(),
245 conn.is_timed_out(),
246 conn.stats(),
247 conn.path_stats().collect::<Vec<quiche::PathStats>>(),
248 );
249
250 if !conn.is_established() {
251 log::info!(
252 "connection timed out after {:?}",
253 app_data_start.elapsed(),
254 );
255
256 return Err(ClientError::HandshakeFail);
257 }
258
259 break;
260 }
261
262 if (conn.is_established() || conn.is_in_early_data()) &&
265 !app_proto_selected
266 {
267 app_proto_selected = true;
268 }
269
270 if app_proto_selected {
271 check_duration_and_do_actions(
272 &mut wait_duration,
273 &mut wait_instant,
274 &mut action_iter,
275 &mut conn,
276 &mut waiting_for,
277 client.stream_parsers_mut(),
278 );
279
280 let mut wait_cleared = false;
281 for response in parse_streams(&mut conn, &mut client) {
282 let stream_id = response.stream_id;
283
284 if let StreamEventType::Finished = response.event_type {
285 waiting_for.clear_waits_on_stream(stream_id);
286 } else {
287 waiting_for.remove_wait(response);
288 }
289
290 wait_cleared = true;
291 }
292
293 if client.streams.all_close_trigger_frames_seen() {
294 client.streams.close_due_to_trigger_frames(&mut conn);
295 }
296
297 if wait_cleared {
298 check_duration_and_do_actions(
299 &mut wait_duration,
300 &mut wait_instant,
301 &mut action_iter,
302 &mut conn,
303 &mut waiting_for,
304 client.stream_parsers_mut(),
305 );
306 }
307 }
308
309 while conn.scids_left() > 0 {
311 let (scid, reset_token) = generate_cid_and_reset_token();
312
313 if conn.new_scid(&scid, reset_token, false).is_err() {
314 break;
315 }
316 }
317
318 let sockets = vec![&socket];
321
322 for socket in sockets {
323 let local_addr = socket.local_addr().unwrap();
324
325 for peer_addr in conn.paths_iter(local_addr) {
326 loop {
327 let (write, send_info) = match conn.send_on_path(
328 &mut out,
329 Some(local_addr),
330 Some(peer_addr),
331 ) {
332 Ok(v) => v,
333
334 Err(quiche::Error::Done) => {
335 break;
336 },
337
338 Err(e) => {
339 log::error!(
340 "{} -> {}: send failed: {:?}",
341 local_addr,
342 peer_addr,
343 e
344 );
345
346 conn.close(false, 0x1, b"fail").ok();
347 break;
348 },
349 };
350
351 if let Err(e) = socket.send_to(&out[..write], send_info.to) {
352 if e.kind() == std::io::ErrorKind::WouldBlock {
353 log::debug!(
354 "{} -> {}: send() would block",
355 local_addr,
356 send_info.to
357 );
358 break;
359 }
360
361 return Err(ClientError::Other(format!(
362 "{} -> {}: send() failed: {:?}",
363 local_addr, send_info.to, e
364 )));
365 }
366 }
367 }
368 }
369
370 if conn.is_closed() {
371 log::info!(
372 "connection closed, {:?} {:?}",
373 conn.stats(),
374 conn.path_stats().collect::<Vec<quiche::PathStats>>()
375 );
376
377 if !conn.is_established() {
378 log::info!(
379 "connection timed out after {:?}",
380 app_data_start.elapsed(),
381 );
382
383 return Err(ClientError::HandshakeFail);
384 }
385
386 break;
387 }
388 }
389
390 Ok(ConnectionSummary {
391 stream_map: client.streams,
392 stats: Some(conn.stats()),
393 path_stats: conn.path_stats().collect(),
394 conn_close_details: ConnectionCloseDetails::new(&conn),
395 })
396}
397
398fn check_duration_and_do_actions(
399 wait_duration: &mut Option<Duration>, wait_instant: &mut Option<Instant>,
400 action_iter: &mut Iter<Action>, conn: &mut quiche::Connection,
401 waiting_for: &mut WaitingFor, stream_parsers: &mut StreamParserMap,
402) {
403 match wait_duration.as_ref() {
404 None => {
405 if let Some(idle_wait) =
406 handle_actions(action_iter, conn, waiting_for, stream_parsers)
407 {
408 *wait_duration = Some(idle_wait);
409 *wait_instant = Some(Instant::now());
410
411 log::info!(
416 "waiting for {:?} before executing more actions",
417 idle_wait
418 );
419 }
420 },
421
422 Some(period) => {
423 let now = Instant::now();
424 let then = wait_instant.unwrap();
425 log::debug!(
426 "checking if actions wait period elapsed {:?} > {:?}",
427 now.duration_since(then),
428 wait_duration
429 );
430 if now.duration_since(then) >= *period {
431 log::debug!("yup!");
432 *wait_duration = None;
433
434 if let Some(idle_wait) =
435 handle_actions(action_iter, conn, waiting_for, stream_parsers)
436 {
437 *wait_duration = Some(idle_wait);
438 }
439 }
440 },
441 }
442}
443
444pub fn generate_cid_and_reset_token() -> (quiche::ConnectionId<'static>, u128) {
446 let mut scid = [0; quiche::MAX_CONN_ID_LEN];
447 rand::RngCore::fill_bytes(&mut rand::thread_rng(), &mut scid);
448 let scid = scid.to_vec().into();
449 let mut reset_token = [0; 16];
450 rand::RngCore::fill_bytes(&mut rand::thread_rng(), &mut reset_token);
451 let reset_token = u128::from_be_bytes(reset_token);
452 (scid, reset_token)
453}
454
/// Creates (or truncates) the file `<dir>/<role>-<id>.sqlog` and returns a
/// buffered writer for it.
///
/// # Panics
///
/// Panics if the file cannot be created.
pub fn make_qlog_writer(
    dir: &std::ffi::OsStr, role: &str, id: &str,
) -> std::io::BufWriter<std::fs::File> {
    let target = std::path::Path::new(dir).join(format!("{role}-{id}.sqlog"));

    std::fs::File::create(&target)
        .map(std::io::BufWriter::new)
        .unwrap_or_else(|e| {
            panic!(
                "Error creating qlog file attempted path was {:?}: {}",
                target, e
            )
        })
}
472
473fn handle_actions<'a, I>(
474 iter: &mut I, conn: &mut quiche::Connection, waiting_for: &mut WaitingFor,
475 stream_parsers: &mut StreamParserMap,
476) -> Option<Duration>
477where
478 I: Iterator<Item = &'a Action>,
479{
480 if !waiting_for.is_empty() {
481 log::debug!(
482 "won't fire an action due to waiting for responses: {:?}",
483 waiting_for
484 );
485 return None;
486 }
487
488 for action in iter {
490 match action {
491 Action::FlushPackets => return None,
492 Action::Wait { wait_type } => match wait_type {
493 WaitType::WaitDuration(period) => return Some(*period),
494 WaitType::StreamEvent(response) => {
495 log::info!(
496 "waiting for {:?} before executing more actions",
497 response
498 );
499 waiting_for.add_wait(response);
500 return None;
501 },
502 },
503 action => execute_action(action, conn, stream_parsers),
504 }
505 }
506
507 None
508}