1#[cfg(feature = "async")]
34pub mod async_client;
35pub mod connection_summary;
36pub mod sync_client;
37
38use connection_summary::*;
39use qlog::events::h3::HttpHeader;
40
41use std::collections::HashMap;
42use std::net::SocketAddr;
43use std::time::Instant;
44
45use crate::actions::h3::Action;
46use crate::actions::h3::StreamEvent;
47use crate::actions::h3::StreamEventType;
48use crate::config::Config;
49use crate::frame::H3iFrame;
50use crate::frame::ResetStream;
51use crate::frame_parser::FrameParseResult;
52use crate::frame_parser::FrameParser;
53use crate::frame_parser::InterruptCause;
54use crate::recordreplay::qlog::QlogEvent;
55use crate::recordreplay::qlog::*;
56use qlog::events::h3::H3FrameParsed;
57use qlog::events::h3::Http3Frame;
58use qlog::events::EventData;
59use qlog::streamer::QlogStreamer;
60use serde::Serialize;
61
62use crate::quiche;
63use quiche::h3::frame::Frame as QFrame;
64use quiche::h3::Error;
65use quiche::h3::NameValue;
66use quiche::ConnectionError;
67
// NOTE(review): not referenced anywhere in this file; presumably the maximum
// UDP datagram payload size, in bytes, used by the client submodules when
// sending packets — confirm against `sync_client` / `async_client`.
const MAX_DATAGRAM_SIZE: usize = 1350;
// NOTE(review): not referenced anywhere in this file; presumably the QUIC
// wire version the client submodules use when creating a connection — confirm.
const QUIC_VERSION: u32 = 1;
70
71fn handle_qlog(
72 qlog_streamer: Option<&mut QlogStreamer>, qlog_frame: Http3Frame,
73 stream_id: u64,
74) {
75 if let Some(s) = qlog_streamer {
76 let ev_data = EventData::H3FrameParsed(H3FrameParsed {
77 stream_id,
78 frame: qlog_frame,
79 ..Default::default()
80 });
81
82 s.add_event_data_now(ev_data).ok();
83 }
84}
85
/// Errors reported by the client.
///
/// NOTE(review): none of these variants is constructed in this file; the
/// per-variant descriptions below are inferred from the names — confirm
/// against the client submodules that produce them.
#[derive(Debug, Serialize)]
pub enum ClientError {
    /// Presumably: the QUIC/TLS handshake did not complete.
    HandshakeFail,
    /// Presumably: a failure occurred during the HTTP/3 exchange.
    HttpFail,
    /// Any other failure, carrying a free-form description.
    Other(String),
}
96
/// Behaviour that `parse_streams` requires from a client implementation.
pub(crate) trait Client {
    /// Returns mutable access to the per-stream frame parsers.
    ///
    /// `parse_streams` uses this to look up the parser for each readable
    /// stream and to remove parsers for streams that have finished or been
    /// reset.
    fn stream_parsers_mut(&mut self) -> &mut StreamParserMap;

    /// Called once for every frame received on `stream_id`, including the
    /// synthetic `H3iFrame::ResetStream` produced when the peer resets the
    /// stream. What the implementation does with the frame is up to it.
    fn handle_response_frame(&mut self, stream_id: u64, frame: H3iFrame);
}
105
/// Maps a stream ID to the [`FrameParser`] responsible for that stream.
pub(crate) type StreamParserMap = HashMap<u64, FrameParser>;
107
108pub(crate) fn execute_action(
109 action: &Action, conn: &mut quiche::Connection,
110 stream_parsers: &mut StreamParserMap,
111) {
112 match action {
113 Action::SendFrame {
114 stream_id,
115 fin_stream,
116 frame,
117 } => {
118 log::info!("frame tx id={stream_id} frame={frame:?}");
119
120 let mut d = [42; 9999];
122 let mut b = octets::OctetsMut::with_slice(&mut d);
123
124 if let Some(s) = conn.qlog_streamer() {
125 let events: QlogEvents = action.into();
126 for event in events {
127 match event {
128 QlogEvent::Event { data, ex_data } => {
129 if matches!(data.as_ref(), EventData::PacketSent(..))
131 {
132 continue;
133 }
134
135 s.add_event_data_ex_now(*data, ex_data).ok();
136 },
137
138 QlogEvent::JsonEvent(mut ev) => {
139 ev.time = Instant::now()
141 .duration_since(s.start_time())
142 .as_secs_f32() *
143 1000.0;
144 s.add_event(ev).ok();
145 },
146 }
147 }
148 }
149 let len = frame.to_bytes(&mut b).unwrap();
150
151 conn.stream_send(*stream_id, &d[..len], *fin_stream)
155 .unwrap();
156
157 stream_parsers
158 .entry(*stream_id)
159 .or_insert_with(|| FrameParser::new(*stream_id));
160 },
161
162 Action::SendHeadersFrame {
163 stream_id,
164 fin_stream,
165 headers,
166 frame,
167 ..
168 } => {
169 log::info!("headers frame tx stream={stream_id} hdrs={headers:?}");
170
171 let mut d = [42; 9999];
173 let mut b = octets::OctetsMut::with_slice(&mut d);
174
175 if let Some(s) = conn.qlog_streamer() {
176 let events: QlogEvents = action.into();
177 for event in events {
178 match event {
179 QlogEvent::Event { data, ex_data } => {
180 if matches!(data.as_ref(), EventData::PacketSent(..))
182 {
183 continue;
184 }
185
186 s.add_event_data_ex_now(*data, ex_data).ok();
187 },
188
189 QlogEvent::JsonEvent(mut ev) => {
190 ev.time = Instant::now()
192 .duration_since(s.start_time())
193 .as_secs_f32() *
194 1000.0;
195 s.add_event(ev).ok();
196 },
197 }
198 }
199 }
200 let len = frame.to_bytes(&mut b).unwrap();
201 conn.stream_send(*stream_id, &d[..len], *fin_stream)
202 .unwrap();
203
204 stream_parsers
205 .entry(*stream_id)
206 .or_insert_with(|| FrameParser::new(*stream_id));
207 },
208
209 Action::OpenUniStream {
210 stream_id,
211 fin_stream,
212 stream_type,
213 } => {
214 log::info!(
215 "open uni stream_id={stream_id} ty={stream_type} fin={fin_stream}"
216 );
217
218 let mut d = [42; 8];
219 let mut b = octets::OctetsMut::with_slice(&mut d);
220 b.put_varint(*stream_type).unwrap();
221 let off = b.off();
222
223 conn.stream_send(*stream_id, &d[..off], *fin_stream)
224 .unwrap();
225
226 stream_parsers
227 .entry(*stream_id)
228 .or_insert_with(|| FrameParser::new(*stream_id));
229 },
230
231 Action::StreamBytes {
232 stream_id,
233 bytes,
234 fin_stream,
235 } => {
236 log::info!(
237 "stream bytes tx id={} len={} fin={}",
238 stream_id,
239 bytes.len(),
240 fin_stream
241 );
242 conn.stream_send(*stream_id, bytes, *fin_stream).unwrap();
243
244 stream_parsers
245 .entry(*stream_id)
246 .or_insert_with(|| FrameParser::new(*stream_id));
247 },
248
249 Action::ResetStream {
250 stream_id,
251 error_code,
252 } => {
253 log::info!(
254 "reset_stream stream_id={stream_id} error_code={error_code}"
255 );
256 if let Err(e) = conn.stream_shutdown(
257 *stream_id,
258 quiche::Shutdown::Write,
259 *error_code,
260 ) {
261 log::error!("can't send reset_stream: {e}");
262 return;
266 }
267
268 stream_parsers
269 .entry(*stream_id)
270 .or_insert_with(|| FrameParser::new(*stream_id));
271 },
272
273 Action::StopSending {
274 stream_id,
275 error_code,
276 } => {
277 log::info!(
278 "stop_sending stream id={stream_id} error_code={error_code}"
279 );
280
281 if let Err(e) = conn.stream_shutdown(
282 *stream_id,
283 quiche::Shutdown::Read,
284 *error_code,
285 ) {
286 log::error!("can't send stop_sending: {e}");
287 }
288
289 stream_parsers
292 .entry(*stream_id)
293 .or_insert_with(|| FrameParser::new(*stream_id));
294 },
295
296 Action::ConnectionClose { error } => {
297 let ConnectionError {
298 is_app,
299 error_code,
300 reason,
301 } = error;
302
303 log::info!("connection_close={error:?}");
304 let _ = conn.close(*is_app, *error_code, reason);
305 },
306
307 Action::FlushPackets | Action::Wait { .. } => unreachable!(),
309 }
310}
311
312pub(crate) fn parse_streams<C: Client>(
313 conn: &mut quiche::Connection, client: &mut C,
314) -> Vec<StreamEvent> {
315 let mut responded_streams: Vec<StreamEvent> =
316 Vec::with_capacity(conn.readable().len());
317
318 for stream in conn.readable() {
319 if stream % 4 != 0 {
321 continue;
322 }
323
324 loop {
325 let stream_parse_result = client
326 .stream_parsers_mut()
327 .get_mut(&stream)
328 .expect("stream readable with no parser")
329 .try_parse_frame(conn);
330
331 match stream_parse_result {
332 Ok(FrameParseResult::FrameParsed { h3i_frame, fin }) => {
333 if let H3iFrame::Headers(ref headers) = h3i_frame {
334 log::info!("hdrs={headers:?}");
335 }
336
337 handle_response_frame(
338 client,
339 conn.qlog_streamer(),
340 &mut responded_streams,
341 stream,
342 h3i_frame,
343 );
344
345 if fin {
346 handle_fin(
347 &mut responded_streams,
348 client.stream_parsers_mut(),
349 stream,
350 );
351 break;
352 }
353 },
354 Ok(FrameParseResult::Retry) => {},
355 Ok(FrameParseResult::Interrupted(cause)) => {
356 if let InterruptCause::ResetStream(error_code) = cause {
357 let frame = H3iFrame::ResetStream(ResetStream {
358 stream_id: stream,
359 error_code,
360 });
361
362 log::info!("received reset stream: {frame:?}");
363 handle_response_frame(
364 client,
365 None,
366 &mut responded_streams,
367 stream,
368 frame,
369 );
370 }
371
372 handle_fin(
373 &mut responded_streams,
374 client.stream_parsers_mut(),
375 stream,
376 );
377 break;
378 },
379 Err(e) => {
380 match e {
381 Error::TransportError(quiche::Error::Done) => {
382 log::debug!("stream {stream} exhausted");
383 },
384 Error::TransportError(quiche::Error::StreamReset(
385 error_code,
386 )) => {
387 let frame = H3iFrame::ResetStream(ResetStream {
388 stream_id: stream,
389 error_code,
390 });
391
392 log::info!("received reset stream: {frame:?}");
393
394 handle_response_frame(
395 client,
396 None,
397 &mut responded_streams,
398 stream,
399 frame,
400 );
401
402 client.stream_parsers_mut().remove(&stream);
403 },
404 _ => {
405 log::warn!("stream read error: {e}");
406 },
407 };
408
409 break;
410 },
411 }
412 }
413 }
414
415 responded_streams
416}
417
418fn handle_fin(
419 responded_streams: &mut Vec<StreamEvent>,
420 stream_parsers: &mut StreamParserMap, stream_id: u64,
421) {
422 responded_streams.push(StreamEvent {
423 stream_id,
424 event_type: StreamEventType::Finished,
425 });
426
427 stream_parsers.remove(&stream_id);
428}
429
430fn handle_response_frame<C: Client>(
433 client: &mut C, qlog_streamer: Option<&mut QlogStreamer>,
434 responded_streams: &mut Vec<StreamEvent>, stream_id: u64, frame: H3iFrame,
435) {
436 let cloned = frame.clone();
437 client.handle_response_frame(stream_id, cloned);
438
439 let mut to_qlog: Option<Http3Frame> = None;
440 let mut push_to_responses: Option<StreamEvent> = None;
441
442 match frame {
443 H3iFrame::Headers(enriched_headers) => {
444 push_to_responses = Some(StreamEvent {
445 stream_id,
446 event_type: StreamEventType::Headers,
447 });
448
449 let qlog_headers: Vec<HttpHeader> = enriched_headers
450 .headers()
451 .iter()
452 .map(|h| qlog::events::h3::HttpHeader {
453 name: String::from_utf8_lossy(h.name()).into_owned(),
454 value: String::from_utf8_lossy(h.value()).into_owned(),
455 })
456 .collect();
457
458 to_qlog = Some(Http3Frame::Headers {
459 headers: qlog_headers,
460 });
461 },
462 H3iFrame::QuicheH3(quiche_frame) => {
463 if let QFrame::Data { .. } = quiche_frame {
464 push_to_responses = Some(StreamEvent {
465 stream_id,
466 event_type: StreamEventType::Data,
467 });
468 }
469
470 to_qlog = Some(quiche_frame.to_qlog());
471 },
472 H3iFrame::ResetStream(_) => {
473 push_to_responses = Some(StreamEvent {
474 stream_id,
475 event_type: StreamEventType::Finished,
476 });
477 },
478 }
479
480 if let Some(to_qlog) = to_qlog {
481 handle_qlog(qlog_streamer, to_qlog, stream_id);
482 }
483
484 if let Some(to_push) = push_to_responses {
485 responded_streams.push(to_push);
486 }
487}
488
/// Connection parameters derived from a [`Config`] by `parse_args`.
pub(crate) struct ParsedArgs<'a> {
    /// Local address to bind to: the unspecified address of the same IP
    /// family as `peer_addr`, with the configured source port.
    pub(crate) bind_addr: SocketAddr,
    /// Address of the peer to connect to.
    pub(crate) peer_addr: SocketAddr,
    /// Host portion of the configured `host_port`, borrowed from the
    /// [`Config`]; `None` when the config asks to omit SNI.
    pub(crate) connect_url: Option<&'a str>,
}
494
495pub(crate) fn parse_args(args: &Config) -> ParsedArgs<'_> {
496 let connect_url = if !args.omit_sni {
498 args.host_port.split(':').next()
499 } else {
500 None
501 };
502
503 let (peer_addr, bind_addr) = resolve_socket_addrs(args);
504
505 ParsedArgs {
506 peer_addr,
507 bind_addr,
508 connect_url,
509 }
510}
511
512fn resolve_socket_addrs(args: &Config) -> (SocketAddr, SocketAddr) {
513 let peer_addr = if let Some(addr) = &args.connect_to {
515 addr.parse().expect("--connect-to is expected to be a string containing an IPv4 or IPv6 address with a port. E.g. 192.0.2.0:443")
516 } else {
517 let x = format!("https://{}", args.host_port);
518 *url::Url::parse(&x)
519 .unwrap()
520 .socket_addrs(|| None)
521 .unwrap()
522 .first()
523 .unwrap()
524 };
525
526 let bind_addr = match peer_addr {
530 std::net::SocketAddr::V4(_) => format!("0.0.0.0:{}", args.source_port),
531 std::net::SocketAddr::V6(_) => format!("[::]:{}", args.source_port),
532 };
533
534 (
535 peer_addr,
536 bind_addr.parse().expect("unable to parse bind address"),
537 )
538}