1#[cfg(feature = "async")]
34pub mod async_client;
35pub mod connection_summary;
36pub mod sync_client;
37
38use connection_summary::*;
39use qlog::events::h3::HttpHeader;
40
41use std::collections::HashMap;
42use std::net::SocketAddr;
43use std::time::Instant;
44
45use crate::actions::h3::Action;
46use crate::actions::h3::StreamEvent;
47use crate::actions::h3::StreamEventType;
48use crate::config::Config;
49use crate::frame::H3iFrame;
50use crate::frame::ResetStream;
51use crate::frame_parser::FrameParseResult;
52use crate::frame_parser::FrameParser;
53use crate::frame_parser::InterruptCause;
54use crate::recordreplay::qlog::QlogEvent;
55use crate::recordreplay::qlog::*;
56use qlog::events::h3::H3FrameParsed;
57use qlog::events::h3::Http3Frame;
58use qlog::events::EventData;
59use qlog::streamer::QlogStreamer;
60use serde::Serialize;
61
62use crate::quiche;
63use quiche::h3::frame::Frame as QFrame;
64use quiche::h3::Error;
65use quiche::h3::NameValue;
66use quiche::ConnectionError;
67
68const MAX_DATAGRAM_SIZE: usize = 1350;
69const QUIC_VERSION: u32 = 1;
70
71fn handle_qlog(
72 qlog_streamer: Option<&mut QlogStreamer>, qlog_frame: Http3Frame,
73 stream_id: u64,
74) {
75 if let Some(s) = qlog_streamer {
76 let ev_data = EventData::H3FrameParsed(H3FrameParsed {
77 stream_id,
78 frame: qlog_frame,
79 ..Default::default()
80 });
81
82 s.add_event_data_now(ev_data).ok();
83 }
84}
85
86#[derive(Debug, Serialize)]
87pub enum ClientError {
89 HandshakeFail,
91 HttpFail,
93 Other(String),
95}
96
97pub(crate) trait Client {
98 fn stream_parsers_mut(&mut self) -> &mut StreamParserMap;
100
101 fn handle_response_frame(&mut self, stream_id: u64, frame: H3iFrame);
104}
105
106pub(crate) type StreamParserMap = HashMap<u64, FrameParser>;
107
108pub(crate) fn execute_action(
109 action: &Action, conn: &mut quiche::Connection,
110 stream_parsers: &mut StreamParserMap,
111) {
112 match action {
113 Action::SendFrame {
114 stream_id,
115 fin_stream,
116 frame,
117 } => {
118 log::info!("frame tx id={stream_id} frame={frame:?}");
119
120 let mut d = [42; 9999];
122 let mut b = octets::OctetsMut::with_slice(&mut d);
123
124 if let Some(s) = conn.qlog_streamer() {
125 let events: QlogEvents = action.into();
126 for event in events {
127 match event {
128 QlogEvent::Event { data, ex_data } => {
129 if matches!(data.as_ref(), EventData::PacketSent(..))
131 {
132 continue;
133 }
134
135 s.add_event_data_ex_now(*data, ex_data).ok();
136 },
137
138 QlogEvent::JsonEvent(mut ev) => {
139 ev.time = Instant::now()
141 .duration_since(s.start_time())
142 .as_secs_f32() *
143 1000.0;
144 s.add_event(ev).ok();
145 },
146 }
147 }
148 }
149 let len = frame.to_bytes(&mut b).unwrap();
150
151 conn.stream_send(*stream_id, &d[..len], *fin_stream)
155 .unwrap();
156
157 stream_parsers
158 .entry(*stream_id)
159 .or_insert_with(|| FrameParser::new(*stream_id));
160 },
161
162 Action::SendHeadersFrame {
163 stream_id,
164 fin_stream,
165 headers,
166 frame,
167 ..
168 } => {
169 log::info!("headers frame tx stream={stream_id} hdrs={headers:?}");
170
171 let mut d = [42; 9999];
173 let mut b = octets::OctetsMut::with_slice(&mut d);
174
175 if let Some(s) = conn.qlog_streamer() {
176 let events: QlogEvents = action.into();
177 for event in events {
178 match event {
179 QlogEvent::Event { data, ex_data } => {
180 if matches!(data.as_ref(), EventData::PacketSent(..))
182 {
183 continue;
184 }
185
186 s.add_event_data_ex_now(*data, ex_data).ok();
187 },
188
189 QlogEvent::JsonEvent(mut ev) => {
190 ev.time = Instant::now()
192 .duration_since(s.start_time())
193 .as_secs_f32() *
194 1000.0;
195 s.add_event(ev).ok();
196 },
197 }
198 }
199 }
200 let len = frame.to_bytes(&mut b).unwrap();
201 conn.stream_send(*stream_id, &d[..len], *fin_stream)
202 .unwrap();
203
204 stream_parsers
205 .entry(*stream_id)
206 .or_insert_with(|| FrameParser::new(*stream_id));
207 },
208
209 Action::OpenUniStream {
210 stream_id,
211 fin_stream,
212 stream_type,
213 } => {
214 log::info!(
215 "open uni stream_id={stream_id} ty={stream_type} fin={fin_stream}"
216 );
217
218 let mut d = [42; 8];
219 let mut b = octets::OctetsMut::with_slice(&mut d);
220 b.put_varint(*stream_type).unwrap();
221 let off = b.off();
222
223 conn.stream_send(*stream_id, &d[..off], *fin_stream)
224 .unwrap();
225
226 stream_parsers
227 .entry(*stream_id)
228 .or_insert_with(|| FrameParser::new(*stream_id));
229 },
230
231 Action::StreamBytes {
232 stream_id,
233 bytes,
234 fin_stream,
235 } => {
236 log::info!(
237 "stream bytes tx id={} len={} fin={}",
238 stream_id,
239 bytes.len(),
240 fin_stream
241 );
242 conn.stream_send(*stream_id, bytes, *fin_stream).unwrap();
243
244 stream_parsers
245 .entry(*stream_id)
246 .or_insert_with(|| FrameParser::new(*stream_id));
247 },
248
249 Action::SendDatagram { payload } => {
250 log::info!("dgram tx len={}", payload.len(),);
251
252 conn.dgram_send(payload)
253 .expect("datagram extension not enabled by peer");
254 },
255
256 Action::ResetStream {
257 stream_id,
258 error_code,
259 } => {
260 log::info!(
261 "reset_stream stream_id={stream_id} error_code={error_code}"
262 );
263 if let Err(e) = conn.stream_shutdown(
264 *stream_id,
265 quiche::Shutdown::Write,
266 *error_code,
267 ) {
268 log::error!("can't send reset_stream: {e}");
269 return;
273 }
274
275 stream_parsers
276 .entry(*stream_id)
277 .or_insert_with(|| FrameParser::new(*stream_id));
278 },
279
280 Action::StopSending {
281 stream_id,
282 error_code,
283 } => {
284 log::info!(
285 "stop_sending stream id={stream_id} error_code={error_code}"
286 );
287
288 if let Err(e) = conn.stream_shutdown(
289 *stream_id,
290 quiche::Shutdown::Read,
291 *error_code,
292 ) {
293 log::error!("can't send stop_sending: {e}");
294 }
295
296 stream_parsers
299 .entry(*stream_id)
300 .or_insert_with(|| FrameParser::new(*stream_id));
301 },
302
303 Action::ConnectionClose { error } => {
304 let ConnectionError {
305 is_app,
306 error_code,
307 reason,
308 } = error;
309
310 log::info!("connection_close={error:?}");
311 let _ = conn.close(*is_app, *error_code, reason);
312 },
313
314 Action::FlushPackets | Action::Wait { .. } => unreachable!(),
316 }
317}
318
319pub(crate) fn parse_streams<C: Client>(
320 conn: &mut quiche::Connection, client: &mut C,
321) -> Vec<StreamEvent> {
322 let mut responded_streams: Vec<StreamEvent> =
323 Vec::with_capacity(conn.readable().len());
324
325 for stream in conn.readable() {
326 if stream % 4 != 0 {
328 continue;
329 }
330
331 loop {
332 let stream_parse_result = client
333 .stream_parsers_mut()
334 .get_mut(&stream)
335 .expect("stream readable with no parser")
336 .try_parse_frame(conn);
337
338 match stream_parse_result {
339 Ok(FrameParseResult::FrameParsed { h3i_frame, fin }) => {
340 if let H3iFrame::Headers(ref headers) = h3i_frame {
341 log::info!("hdrs={headers:?}");
342 }
343
344 handle_response_frame(
345 client,
346 conn.qlog_streamer(),
347 &mut responded_streams,
348 stream,
349 h3i_frame,
350 );
351
352 if fin {
353 handle_fin(
354 &mut responded_streams,
355 client.stream_parsers_mut(),
356 stream,
357 );
358 break;
359 }
360 },
361 Ok(FrameParseResult::Retry) => {},
362 Ok(FrameParseResult::Interrupted(cause)) => {
363 if let InterruptCause::ResetStream(error_code) = cause {
364 let frame = H3iFrame::ResetStream(ResetStream {
365 stream_id: stream,
366 error_code,
367 });
368
369 log::info!("received reset stream: {frame:?}");
370 handle_response_frame(
371 client,
372 None,
373 &mut responded_streams,
374 stream,
375 frame,
376 );
377 }
378
379 handle_fin(
380 &mut responded_streams,
381 client.stream_parsers_mut(),
382 stream,
383 );
384 break;
385 },
386 Err(e) => {
387 match e {
388 Error::TransportError(quiche::Error::Done) => {
389 log::debug!("stream {stream} exhausted");
390 },
391 Error::TransportError(quiche::Error::StreamReset(
392 error_code,
393 )) => {
394 let frame = H3iFrame::ResetStream(ResetStream {
395 stream_id: stream,
396 error_code,
397 });
398
399 log::info!("received reset stream: {frame:?}");
400
401 handle_response_frame(
402 client,
403 None,
404 &mut responded_streams,
405 stream,
406 frame,
407 );
408
409 client.stream_parsers_mut().remove(&stream);
410 },
411 _ => {
412 log::warn!("stream read error: {e}");
413 },
414 };
415
416 break;
417 },
418 }
419 }
420 }
421
422 responded_streams
423}
424
425fn handle_fin(
426 responded_streams: &mut Vec<StreamEvent>,
427 stream_parsers: &mut StreamParserMap, stream_id: u64,
428) {
429 responded_streams.push(StreamEvent {
430 stream_id,
431 event_type: StreamEventType::Finished,
432 });
433
434 stream_parsers.remove(&stream_id);
435}
436
437fn handle_response_frame<C: Client>(
440 client: &mut C, qlog_streamer: Option<&mut QlogStreamer>,
441 responded_streams: &mut Vec<StreamEvent>, stream_id: u64, frame: H3iFrame,
442) {
443 let cloned = frame.clone();
444 client.handle_response_frame(stream_id, cloned);
445
446 let mut to_qlog: Option<Http3Frame> = None;
447 let mut push_to_responses: Option<StreamEvent> = None;
448
449 match frame {
450 H3iFrame::Headers(enriched_headers) => {
451 push_to_responses = Some(StreamEvent {
452 stream_id,
453 event_type: StreamEventType::Headers,
454 });
455
456 let qlog_headers: Vec<HttpHeader> = enriched_headers
457 .headers()
458 .iter()
459 .map(|h| qlog::events::h3::HttpHeader {
460 name: String::from_utf8_lossy(h.name()).into_owned(),
461 value: String::from_utf8_lossy(h.value()).into_owned(),
462 })
463 .collect();
464
465 to_qlog = Some(Http3Frame::Headers {
466 headers: qlog_headers,
467 });
468 },
469 H3iFrame::QuicheH3(quiche_frame) => {
470 if let QFrame::Data { .. } = quiche_frame {
471 push_to_responses = Some(StreamEvent {
472 stream_id,
473 event_type: StreamEventType::Data,
474 });
475 }
476
477 to_qlog = Some(quiche_frame.to_qlog());
478 },
479 H3iFrame::ResetStream(_) => {
480 push_to_responses = Some(StreamEvent {
481 stream_id,
482 event_type: StreamEventType::Finished,
483 });
484 },
485 }
486
487 if let Some(to_qlog) = to_qlog {
488 handle_qlog(qlog_streamer, to_qlog, stream_id);
489 }
490
491 if let Some(to_push) = push_to_responses {
492 responded_streams.push(to_push);
493 }
494}
495
496pub(crate) struct ParsedArgs<'a> {
497 pub(crate) bind_addr: SocketAddr,
498 pub(crate) peer_addr: SocketAddr,
499 pub(crate) connect_url: Option<&'a str>,
500}
501
502pub(crate) fn parse_args(args: &Config) -> ParsedArgs<'_> {
503 let connect_url = if !args.omit_sni {
505 args.host_port.split(':').next()
506 } else {
507 None
508 };
509
510 let (peer_addr, bind_addr) = resolve_socket_addrs(args);
511
512 ParsedArgs {
513 peer_addr,
514 bind_addr,
515 connect_url,
516 }
517}
518
519fn resolve_socket_addrs(args: &Config) -> (SocketAddr, SocketAddr) {
520 let peer_addr = if let Some(addr) = &args.connect_to {
522 addr.parse().expect("--connect-to is expected to be a string containing an IPv4 or IPv6 address with a port. E.g. 192.0.2.0:443")
523 } else {
524 let x = format!("https://{}", args.host_port);
525 *url::Url::parse(&x)
526 .unwrap()
527 .socket_addrs(|| None)
528 .unwrap()
529 .first()
530 .unwrap()
531 };
532
533 let bind_addr = match peer_addr {
537 std::net::SocketAddr::V4(_) => format!("0.0.0.0:{}", args.source_port),
538 std::net::SocketAddr::V6(_) => format!("[::]:{}", args.source_port),
539 };
540
541 (
542 peer_addr,
543 bind_addr.parse().expect("unable to parse bind address"),
544 )
545}