1#[cfg(feature = "async")]
34pub mod async_client;
35pub mod connection_summary;
36pub mod sync_client;
37
38use connection_summary::*;
39use qlog::events::http3::FrameParsed;
40use qlog::events::http3::HttpHeader;
41use quiche::ConnectionError;
42
43use std::collections::HashMap;
44use std::net::SocketAddr;
45use std::time::Instant;
46
47use crate::actions::h3::Action;
48use crate::actions::h3::ExpectedStreamSendResult;
49use crate::actions::h3::StreamEvent;
50use crate::actions::h3::StreamEventType;
51use crate::config::Config;
52use crate::frame::H3iFrame;
53use crate::frame::ResetStream;
54use crate::frame_parser::FrameParseResult;
55use crate::frame_parser::FrameParser;
56use crate::frame_parser::InterruptCause;
57use crate::recordreplay::qlog::QlogEvent;
58use crate::recordreplay::qlog::*;
59use qlog::events::http3::Http3Frame;
60use qlog::events::EventData;
61use qlog::streamer::QlogStreamer;
62use serde::Serialize;
63
64use crate::quiche;
65use quiche::h3::frame::Frame as QFrame;
66use quiche::h3::Error;
67use quiche::h3::NameValue;
68
/// Datagram size limit, in bytes.
///
/// NOTE(review): not referenced in this part of the file; presumably used by
/// the client submodules when sizing packet buffers -- confirm.
const MAX_DATAGRAM_SIZE: usize = 1350;

/// QUIC version number.
///
/// NOTE(review): not referenced in this part of the file -- confirm where it
/// is consumed.
const QUIC_VERSION: u32 = 1;
71
72fn handle_qlog(
73 qlog_streamer: Option<&mut QlogStreamer>, qlog_frame: Http3Frame,
74 stream_id: u64,
75) {
76 if let Some(s) = qlog_streamer {
77 let ev_data = EventData::Http3FrameParsed(FrameParsed {
78 stream_id,
79 frame: qlog_frame,
80 ..Default::default()
81 });
82
83 s.add_event_data_now(ev_data).ok();
84 }
85}
86
87#[derive(Debug, Serialize)]
88pub enum ClientError {
90 HandshakeFail,
92 HttpFail,
94 Other(String),
96}
97
98pub(crate) trait Client {
99 fn stream_parsers_mut(&mut self) -> &mut StreamParserMap;
101
102 fn handle_response_frame(&mut self, stream_id: u64, frame: H3iFrame);
105}
106
107pub(crate) type StreamParserMap = HashMap<u64, FrameParser>;
108
109fn validate_stream_send_result(
110 result: quiche::Result<usize>, expected: &ExpectedStreamSendResult,
111 stream_id: u64,
112) {
113 match expected {
114 ExpectedStreamSendResult::Ok => {
115 result.unwrap_or_else(|err| {
116 panic!(
117 "Expected stream_send on stream {} to succeed, but got: {:?}",
118 stream_id, err
119 )
120 });
121 },
122 ExpectedStreamSendResult::OkExact(expected_bytes) => {
123 match result {
124 Ok(actual_bytes) if actual_bytes == *expected_bytes => {},
125 Ok(actual_bytes) => panic!(
126 "Expected stream_send on stream {} to write {} bytes, got {}",
127 stream_id, expected_bytes, actual_bytes
128 ),
129 Err(err) => panic!(
130 "Expected stream_send on stream {} to write {} bytes, got error: {:?}",
131 stream_id, expected_bytes, err
132 ),
133 }
134 },
135 ExpectedStreamSendResult::Error(expected_err) => {
136 match result {
137 Ok(written) => panic!(
138 "Expected stream_send on stream {} to fail with {:?}, but wrote {} bytes",
139 stream_id, expected_err, written
140 ),
141 Err(actual_err) if &actual_err == expected_err => {},
142 Err(actual_err) => panic!(
143 "Expected stream_send on stream {} to fail with {:?}, got {:?}",
144 stream_id, expected_err, actual_err
145 ),
146 }
147 },
148 }
149}
150
151pub(crate) fn execute_action<F: quiche::BufFactory>(
152 action: &Action, conn: &mut quiche::Connection<F>,
153 stream_parsers: &mut StreamParserMap,
154) {
155 match action {
156 Action::SendFrame {
157 stream_id,
158 fin_stream,
159 frame,
160 expected_result,
161 } => {
162 log::info!("frame tx id={stream_id} frame={frame:?}");
163
164 let mut d = [42; 9999];
166 let mut b = octets::OctetsMut::with_slice(&mut d);
167
168 if let Some(s) = conn.qlog_streamer() {
169 let events: QlogEvents = action.into();
170 for event in events {
171 match event {
172 QlogEvent::Event { data, ex_data } => {
173 if matches!(
175 data.as_ref(),
176 EventData::QuicPacketSent(..)
177 ) {
178 continue;
179 }
180
181 s.add_event_data_ex_now(*data, ex_data).ok();
182 },
183
184 QlogEvent::JsonEvent(mut ev) => {
185 ev.time = Instant::now()
187 .duration_since(s.start_time())
188 .as_secs_f64() *
189 1000.0;
190 s.add_event(ev).ok();
191 },
192 }
193 }
194 }
195 let len = frame.to_bytes(&mut b).unwrap();
196
197 let result = conn.stream_send(*stream_id, &d[..len], *fin_stream);
199 validate_stream_send_result(result, expected_result, *stream_id);
200
201 stream_parsers
202 .entry(*stream_id)
203 .or_insert_with(|| FrameParser::new(*stream_id));
204 },
205
206 Action::SendHeadersFrame {
207 stream_id,
208 fin_stream,
209 headers,
210 frame,
211 expected_result,
212 ..
213 } => {
214 log::info!("headers frame tx stream={stream_id} hdrs={headers:?}");
215
216 let mut d = [42; 9999];
218 let mut b = octets::OctetsMut::with_slice(&mut d);
219
220 if let Some(s) = conn.qlog_streamer() {
221 let events: QlogEvents = action.into();
222 for event in events {
223 match event {
224 QlogEvent::Event { data, ex_data } => {
225 if matches!(
227 data.as_ref(),
228 EventData::QuicPacketSent(..)
229 ) {
230 continue;
231 }
232
233 s.add_event_data_ex_now(*data, ex_data).ok();
234 },
235
236 QlogEvent::JsonEvent(mut ev) => {
237 ev.time = Instant::now()
239 .duration_since(s.start_time())
240 .as_secs_f64() *
241 1000.0;
242 s.add_event(ev).ok();
243 },
244 }
245 }
246 }
247 let len = frame.to_bytes(&mut b).unwrap();
248 let result = conn.stream_send(*stream_id, &d[..len], *fin_stream);
249 validate_stream_send_result(result, expected_result, *stream_id);
250
251 stream_parsers
252 .entry(*stream_id)
253 .or_insert_with(|| FrameParser::new(*stream_id));
254 },
255
256 Action::OpenUniStream {
257 stream_id,
258 fin_stream,
259 stream_type,
260 expected_result,
261 } => {
262 log::info!(
263 "open uni stream_id={stream_id} ty={stream_type} fin={fin_stream}"
264 );
265
266 let mut d = [42; 8];
267 let mut b = octets::OctetsMut::with_slice(&mut d);
268 b.put_varint(*stream_type).unwrap();
269 let off = b.off();
270
271 let result = conn.stream_send(*stream_id, &d[..off], *fin_stream);
272 validate_stream_send_result(result, expected_result, *stream_id);
273
274 stream_parsers
275 .entry(*stream_id)
276 .or_insert_with(|| FrameParser::new(*stream_id));
277 },
278
279 Action::StreamBytes {
280 stream_id,
281 bytes,
282 fin_stream,
283 expected_result,
284 } => {
285 log::info!(
286 "stream bytes tx id={} len={} fin={}",
287 stream_id,
288 bytes.len(),
289 fin_stream
290 );
291 let result = conn.stream_send(*stream_id, bytes, *fin_stream);
292 validate_stream_send_result(result, expected_result, *stream_id);
293
294 stream_parsers
295 .entry(*stream_id)
296 .or_insert_with(|| FrameParser::new(*stream_id));
297 },
298
299 Action::SendDatagram { payload } => {
300 log::info!("dgram tx len={}", payload.len(),);
301
302 conn.dgram_send(payload)
303 .expect("datagram extension not enabled by peer");
304 },
305
306 Action::ResetStream {
307 stream_id,
308 error_code,
309 } => {
310 log::info!(
311 "reset_stream stream_id={stream_id} error_code={error_code}"
312 );
313 if let Err(e) = conn.stream_shutdown(
314 *stream_id,
315 quiche::Shutdown::Write,
316 *error_code,
317 ) {
318 log::error!("can't send reset_stream: {e}");
319 return;
323 }
324
325 stream_parsers
326 .entry(*stream_id)
327 .or_insert_with(|| FrameParser::new(*stream_id));
328 },
329
330 Action::StopSending {
331 stream_id,
332 error_code,
333 } => {
334 log::info!(
335 "stop_sending stream id={stream_id} error_code={error_code}"
336 );
337
338 if let Err(e) = conn.stream_shutdown(
339 *stream_id,
340 quiche::Shutdown::Read,
341 *error_code,
342 ) {
343 log::error!("can't send stop_sending: {e}");
344 }
345
346 stream_parsers
349 .entry(*stream_id)
350 .or_insert_with(|| FrameParser::new(*stream_id));
351 },
352
353 Action::ConnectionClose { error } => {
354 let ConnectionError {
355 is_app,
356 error_code,
357 reason,
358 } = error;
359
360 log::info!("connection_close={error:?}");
361 let _ = conn.close(*is_app, *error_code, reason);
362 },
363
364 Action::FlushPackets | Action::Wait { .. } => unreachable!(),
366 }
367}
368
369pub(crate) fn parse_streams<F: quiche::BufFactory, C: Client>(
370 conn: &mut quiche::Connection<F>, client: &mut C,
371) -> Vec<StreamEvent> {
372 let mut responded_streams: Vec<StreamEvent> =
373 Vec::with_capacity(conn.readable().len());
374
375 for stream in conn.readable() {
376 if stream % 4 != 0 {
378 continue;
379 }
380
381 loop {
382 let stream_parse_result = client
383 .stream_parsers_mut()
384 .get_mut(&stream)
385 .expect("stream readable with no parser")
386 .try_parse_frame(conn);
387
388 match stream_parse_result {
389 Ok(FrameParseResult::FrameParsed { h3i_frame, fin }) => {
390 if let H3iFrame::Headers(ref headers) = h3i_frame {
391 log::info!("hdrs={headers:?}");
392 }
393
394 handle_response_frame(
395 client,
396 conn.qlog_streamer(),
397 &mut responded_streams,
398 stream,
399 h3i_frame,
400 );
401
402 if fin {
403 handle_fin(
404 &mut responded_streams,
405 client.stream_parsers_mut(),
406 stream,
407 );
408 break;
409 }
410 },
411 Ok(FrameParseResult::Retry) => {},
412 Ok(FrameParseResult::Interrupted(cause)) => {
413 if let InterruptCause::ResetStream(error_code) = cause {
414 let frame = H3iFrame::ResetStream(ResetStream {
415 stream_id: stream,
416 error_code,
417 });
418
419 log::info!("received reset stream: {frame:?}");
420 handle_response_frame(
421 client,
422 None,
423 &mut responded_streams,
424 stream,
425 frame,
426 );
427 }
428
429 handle_fin(
430 &mut responded_streams,
431 client.stream_parsers_mut(),
432 stream,
433 );
434 break;
435 },
436 Err(e) => {
437 match e {
438 Error::TransportError(quiche::Error::Done) => {
439 log::debug!("stream {stream} exhausted");
440 },
441 Error::TransportError(quiche::Error::StreamReset(
442 error_code,
443 )) => {
444 let frame = H3iFrame::ResetStream(ResetStream {
445 stream_id: stream,
446 error_code,
447 });
448
449 log::info!("received reset stream: {frame:?}");
450
451 handle_response_frame(
452 client,
453 None,
454 &mut responded_streams,
455 stream,
456 frame,
457 );
458
459 client.stream_parsers_mut().remove(&stream);
460 },
461 _ => {
462 log::warn!("stream read error: {e}");
463 },
464 };
465
466 break;
467 },
468 }
469 }
470 }
471
472 responded_streams
473}
474
475fn handle_fin(
476 responded_streams: &mut Vec<StreamEvent>,
477 stream_parsers: &mut StreamParserMap, stream_id: u64,
478) {
479 responded_streams.push(StreamEvent {
480 stream_id,
481 event_type: StreamEventType::Finished,
482 });
483
484 stream_parsers.remove(&stream_id);
485}
486
487fn handle_response_frame<C: Client>(
490 client: &mut C, qlog_streamer: Option<&mut QlogStreamer>,
491 responded_streams: &mut Vec<StreamEvent>, stream_id: u64, frame: H3iFrame,
492) {
493 let cloned = frame.clone();
494 client.handle_response_frame(stream_id, cloned);
495
496 let mut to_qlog: Option<Http3Frame> = None;
497 let mut push_to_responses: Option<StreamEvent> = None;
498
499 match frame {
500 H3iFrame::Headers(enriched_headers) => {
501 push_to_responses = Some(StreamEvent {
502 stream_id,
503 event_type: StreamEventType::Headers,
504 });
505
506 let qlog_headers: Vec<HttpHeader> = enriched_headers
507 .headers()
508 .iter()
509 .map(|h| qlog::events::http3::HttpHeader {
510 name: Some(String::from_utf8_lossy(h.name()).into_owned()),
511 name_bytes: None,
512 value: Some(String::from_utf8_lossy(h.value()).into_owned()),
513 value_bytes: None,
514 })
515 .collect();
516
517 to_qlog = Some(Http3Frame::Headers {
518 headers: qlog_headers,
519 raw: None,
520 });
521 },
522 H3iFrame::QuicheH3(quiche_frame) => {
523 if let QFrame::Data { .. } = quiche_frame {
524 push_to_responses = Some(StreamEvent {
525 stream_id,
526 event_type: StreamEventType::Data,
527 });
528 }
529
530 to_qlog = Some(quiche_frame.to_qlog());
531 },
532 H3iFrame::ResetStream(_) => {
533 push_to_responses = Some(StreamEvent {
534 stream_id,
535 event_type: StreamEventType::Finished,
536 });
537 },
538 }
539
540 if let Some(to_qlog) = to_qlog {
541 handle_qlog(qlog_streamer, to_qlog, stream_id);
542 }
543
544 if let Some(to_push) = push_to_responses {
545 responded_streams.push(to_push);
546 }
547}
548
/// Connection parameters derived from a [`Config`] by `parse_args`.
pub(crate) struct ParsedArgs<'a> {
    /// Local address to bind to: the wildcard address of the same IP family
    /// as `peer_addr`, with the configured source port.
    pub(crate) bind_addr: SocketAddr,

    /// Address of the peer to connect to.
    pub(crate) peer_addr: SocketAddr,

    /// The part of the configured `host_port` before the first `:`, borrowed
    /// from the config; `None` when the config asks to omit SNI.
    pub(crate) connect_url: Option<&'a str>,
}
554
555pub(crate) fn parse_args(args: &Config) -> ParsedArgs<'_> {
556 let connect_url = if !args.omit_sni {
558 args.host_port.split(':').next()
559 } else {
560 None
561 };
562
563 let (peer_addr, bind_addr) = resolve_socket_addrs(args);
564
565 ParsedArgs {
566 peer_addr,
567 bind_addr,
568 connect_url,
569 }
570}
571
572fn resolve_socket_addrs(args: &Config) -> (SocketAddr, SocketAddr) {
573 let peer_addr = if let Some(addr) = &args.connect_to {
575 addr.parse().expect("--connect-to is expected to be a string containing an IPv4 or IPv6 address with a port. E.g. 192.0.2.0:443")
576 } else {
577 let x = format!("https://{}", args.host_port);
578 *url::Url::parse(&x)
579 .unwrap()
580 .socket_addrs(|| None)
581 .unwrap()
582 .first()
583 .unwrap()
584 };
585
586 let bind_addr = match peer_addr {
590 std::net::SocketAddr::V4(_) => format!("0.0.0.0:{}", args.source_port),
591 std::net::SocketAddr::V6(_) => format!("[::]:{}", args.source_port),
592 };
593
594 (
595 peer_addr,
596 bind_addr.parse().expect("unable to parse bind address"),
597 )
598}