1pub mod connection_summary;
34pub mod sync_client;
35
36use connection_summary::*;
37use qlog::events::h3::HttpHeader;
38use quiche::ConnectionError;
39
40use std::collections::HashMap;
41use std::net::SocketAddr;
42use std::time::Instant;
43
44use crate::actions::h3::Action;
45use crate::actions::h3::StreamEvent;
46use crate::actions::h3::StreamEventType;
47use crate::config::Config;
48use crate::frame::H3iFrame;
49use crate::frame::ResetStream;
50use crate::frame_parser::FrameParseResult;
51use crate::frame_parser::FrameParser;
52use crate::frame_parser::InterruptCause;
53use crate::recordreplay::qlog::QlogEvent;
54use crate::recordreplay::qlog::*;
55use qlog::events::h3::H3FrameParsed;
56use qlog::events::h3::Http3Frame;
57use qlog::events::EventData;
58use qlog::streamer::QlogStreamer;
59use serde::Serialize;
60
61use quiche::h3::frame::Frame as QFrame;
62use quiche::h3::Error;
63use quiche::h3::NameValue;
64use quiche::Connection;
65use quiche::Result;
66use quiche::{
67 self,
68};
69
70const MAX_DATAGRAM_SIZE: usize = 1350;
71const QUIC_VERSION: u32 = 1;
72
73pub fn build_quiche_connection(
74 args: Config, peer_addr: SocketAddr, local_addr: SocketAddr,
75) -> Result<Connection> {
76 let connect_url = if !args.omit_sni {
78 args.host_port.split(':').next()
79 } else {
80 None
81 };
82
83 let mut config = quiche::Config::new(QUIC_VERSION).unwrap();
85
86 config.verify_peer(args.verify_peer);
87 config.set_application_protos(&[b"h3"]).unwrap();
88 config.set_max_idle_timeout(args.idle_timeout);
89 config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE);
90 config.set_max_send_udp_payload_size(MAX_DATAGRAM_SIZE);
91 config.set_initial_max_data(10_000_000);
92 config
93 .set_initial_max_stream_data_bidi_local(args.max_stream_data_bidi_local);
94 config.set_initial_max_stream_data_bidi_remote(
95 args.max_stream_data_bidi_remote,
96 );
97 config.set_initial_max_stream_data_uni(args.max_stream_data_uni);
98 config.set_initial_max_streams_bidi(args.max_streams_bidi);
99 config.set_initial_max_streams_uni(args.max_streams_uni);
100 config.set_disable_active_migration(true);
101 config.set_active_connection_id_limit(0);
102
103 config.set_max_connection_window(args.max_window);
104 config.set_max_stream_window(args.max_stream_window);
105
106 let mut keylog = None;
107
108 if let Some(keylog_path) = std::env::var_os("SSLKEYLOGFILE") {
109 let file = std::fs::OpenOptions::new()
110 .create(true)
111 .append(true)
112 .open(keylog_path)
113 .unwrap();
114
115 keylog = Some(file);
116
117 config.log_keys();
118 }
119
120 config.grease(false);
121
122 let mut scid = [0; quiche::MAX_CONN_ID_LEN];
124 rand::RngCore::fill_bytes(&mut rand::thread_rng(), &mut scid);
125
126 let scid = quiche::ConnectionId::from_ref(&scid);
127
128 let mut conn =
130 quiche::connect(connect_url, &scid, local_addr, peer_addr, &mut config)?;
131
132 if let Some(keylog) = &mut keylog {
133 if let Ok(keylog) = keylog.try_clone() {
134 conn.set_keylog(Box::new(keylog));
135 }
136 }
137
138 log::info!(
139 "connecting to {:} from {:} with scid {:?}",
140 peer_addr,
141 local_addr,
142 scid,
143 );
144
145 Ok(conn)
146}
147
148fn handle_qlog(
149 qlog_streamer: Option<&mut QlogStreamer>, qlog_frame: Http3Frame,
150 stream_id: u64,
151) {
152 if let Some(s) = qlog_streamer {
153 let ev_data = EventData::H3FrameParsed(H3FrameParsed {
154 stream_id,
155 frame: qlog_frame,
156 ..Default::default()
157 });
158
159 s.add_event_data_now(ev_data).ok();
160 }
161}
162
163#[derive(Debug, Serialize)]
164pub enum ClientError {
166 HandshakeFail,
168 HttpFail,
170 Other(String),
172}
173
174pub(crate) trait Client {
175 fn stream_parsers_mut(&mut self) -> &mut StreamParserMap;
177
178 fn handle_response_frame(&mut self, stream_id: u64, frame: H3iFrame);
181}
182
183pub(crate) type StreamParserMap = HashMap<u64, FrameParser>;
184
185pub(crate) fn execute_action(
186 action: &Action, conn: &mut quiche::Connection,
187 stream_parsers: &mut StreamParserMap,
188) {
189 match action {
190 Action::SendFrame {
191 stream_id,
192 fin_stream,
193 frame,
194 } => {
195 log::info!("frame tx id={} frame={:?}", stream_id, frame);
196
197 let mut d = [42; 9999];
199 let mut b = octets::OctetsMut::with_slice(&mut d);
200
201 if let Some(s) = conn.qlog_streamer() {
202 let events: QlogEvents = action.into();
203 for event in events {
204 match event {
205 QlogEvent::Event { data, ex_data } => {
206 if matches!(data.as_ref(), EventData::PacketSent(..))
208 {
209 continue;
210 }
211
212 s.add_event_data_ex_now(*data, ex_data).ok();
213 },
214
215 QlogEvent::JsonEvent(mut ev) => {
216 ev.time = Instant::now()
218 .duration_since(s.start_time())
219 .as_secs_f32() *
220 1000.0;
221 s.add_event(ev).ok();
222 },
223 }
224 }
225 }
226 let len = frame.to_bytes(&mut b).unwrap();
227
228 conn.stream_send(*stream_id, &d[..len], *fin_stream)
232 .unwrap();
233
234 stream_parsers
235 .entry(*stream_id)
236 .or_insert_with(|| FrameParser::new(*stream_id));
237 },
238
239 Action::SendHeadersFrame {
240 stream_id,
241 fin_stream,
242 headers,
243 frame,
244 ..
245 } => {
246 log::info!(
247 "headers frame tx stream={} hdrs={:?}",
248 stream_id,
249 headers
250 );
251
252 let mut d = [42; 9999];
254 let mut b = octets::OctetsMut::with_slice(&mut d);
255
256 if let Some(s) = conn.qlog_streamer() {
257 let events: QlogEvents = action.into();
258 for event in events {
259 match event {
260 QlogEvent::Event { data, ex_data } => {
261 if matches!(data.as_ref(), EventData::PacketSent(..))
263 {
264 continue;
265 }
266
267 s.add_event_data_ex_now(*data, ex_data).ok();
268 },
269
270 QlogEvent::JsonEvent(mut ev) => {
271 ev.time = Instant::now()
273 .duration_since(s.start_time())
274 .as_secs_f32() *
275 1000.0;
276 s.add_event(ev).ok();
277 },
278 }
279 }
280 }
281 let len = frame.to_bytes(&mut b).unwrap();
282 conn.stream_send(*stream_id, &d[..len], *fin_stream)
283 .unwrap();
284
285 stream_parsers
286 .entry(*stream_id)
287 .or_insert_with(|| FrameParser::new(*stream_id));
288 },
289
290 Action::OpenUniStream {
291 stream_id,
292 fin_stream,
293 stream_type,
294 } => {
295 log::info!(
296 "open uni stream_id={} ty={} fin={}",
297 stream_id,
298 stream_type,
299 fin_stream
300 );
301
302 let mut d = [42; 8];
303 let mut b = octets::OctetsMut::with_slice(&mut d);
304 b.put_varint(*stream_type).unwrap();
305 let off = b.off();
306
307 conn.stream_send(*stream_id, &d[..off], *fin_stream)
308 .unwrap();
309
310 stream_parsers
311 .entry(*stream_id)
312 .or_insert_with(|| FrameParser::new(*stream_id));
313 },
314
315 Action::StreamBytes {
316 stream_id,
317 bytes,
318 fin_stream,
319 } => {
320 log::info!(
321 "stream bytes tx id={} len={} fin={}",
322 stream_id,
323 bytes.len(),
324 fin_stream
325 );
326 conn.stream_send(*stream_id, bytes, *fin_stream).unwrap();
327
328 stream_parsers
329 .entry(*stream_id)
330 .or_insert_with(|| FrameParser::new(*stream_id));
331 },
332
333 Action::ResetStream {
334 stream_id,
335 error_code,
336 } => {
337 log::info!(
338 "reset_stream stream_id={} error_code={}",
339 stream_id,
340 error_code
341 );
342 if let Err(e) = conn.stream_shutdown(
343 *stream_id,
344 quiche::Shutdown::Write,
345 *error_code,
346 ) {
347 log::error!("can't send reset_stream: {}", e);
348 return;
352 }
353
354 stream_parsers
355 .entry(*stream_id)
356 .or_insert_with(|| FrameParser::new(*stream_id));
357 },
358
359 Action::StopSending {
360 stream_id,
361 error_code,
362 } => {
363 log::info!(
364 "stop_sending stream id={} error_code={}",
365 stream_id,
366 error_code
367 );
368
369 if let Err(e) = conn.stream_shutdown(
370 *stream_id,
371 quiche::Shutdown::Read,
372 *error_code,
373 ) {
374 log::error!("can't send stop_sending: {}", e);
375 }
376
377 stream_parsers
380 .entry(*stream_id)
381 .or_insert_with(|| FrameParser::new(*stream_id));
382 },
383
384 Action::ConnectionClose { error } => {
385 let ConnectionError {
386 is_app,
387 error_code,
388 reason,
389 } = error;
390
391 log::info!("connection_close={error:?}");
392 let _ = conn.close(*is_app, *error_code, reason);
393 },
394
395 Action::FlushPackets | Action::Wait { .. } => unreachable!(),
397 }
398}
399
400pub(crate) fn parse_streams<C: Client>(
401 conn: &mut quiche::Connection, client: &mut C,
402) -> Vec<StreamEvent> {
403 let mut responded_streams: Vec<StreamEvent> =
404 Vec::with_capacity(conn.readable().len());
405
406 for stream in conn.readable() {
407 if stream % 4 != 0 {
409 continue;
410 }
411
412 loop {
413 let stream_parse_result = client
414 .stream_parsers_mut()
415 .get_mut(&stream)
416 .expect("stream readable with no parser")
417 .try_parse_frame(conn);
418
419 match stream_parse_result {
420 Ok(FrameParseResult::FrameParsed { h3i_frame, fin }) => {
421 if let H3iFrame::Headers(ref headers) = h3i_frame {
422 log::info!("hdrs={:?}", headers);
423 }
424
425 handle_response_frame(
426 client,
427 conn.qlog_streamer(),
428 &mut responded_streams,
429 stream,
430 h3i_frame,
431 );
432
433 if fin {
434 handle_fin(
435 &mut responded_streams,
436 client.stream_parsers_mut(),
437 stream,
438 );
439 break;
440 }
441 },
442 Ok(FrameParseResult::Retry) => {},
443 Ok(FrameParseResult::Interrupted(cause)) => {
444 if let InterruptCause::ResetStream(error_code) = cause {
445 let frame = H3iFrame::ResetStream(ResetStream {
446 stream_id: stream,
447 error_code,
448 });
449
450 log::info!("received reset stream: {:?}", frame);
451 handle_response_frame(
452 client,
453 None,
454 &mut responded_streams,
455 stream,
456 frame,
457 );
458 }
459
460 handle_fin(
461 &mut responded_streams,
462 client.stream_parsers_mut(),
463 stream,
464 );
465 break;
466 },
467 Err(e) => {
468 match e {
469 Error::TransportError(quiche::Error::Done) => {
470 log::debug!("stream {stream} exhausted");
471 },
472 Error::TransportError(quiche::Error::StreamReset(
473 error_code,
474 )) => {
475 let frame = H3iFrame::ResetStream(ResetStream {
476 stream_id: stream,
477 error_code,
478 });
479
480 log::info!("received reset stream: {:?}", frame);
481
482 handle_response_frame(
483 client,
484 None,
485 &mut responded_streams,
486 stream,
487 frame,
488 );
489
490 client.stream_parsers_mut().remove(&stream);
491 },
492 _ => {
493 log::warn!("stream read error: {e}");
494 },
495 };
496
497 break;
498 },
499 }
500 }
501 }
502
503 responded_streams
504}
505
506fn handle_fin(
507 responded_streams: &mut Vec<StreamEvent>,
508 stream_parsers: &mut StreamParserMap, stream_id: u64,
509) {
510 responded_streams.push(StreamEvent {
511 stream_id,
512 event_type: StreamEventType::Finished,
513 });
514
515 stream_parsers.remove(&stream_id);
516}
517
518fn handle_response_frame<C: Client>(
521 client: &mut C, qlog_streamer: Option<&mut QlogStreamer>,
522 responded_streams: &mut Vec<StreamEvent>, stream_id: u64, frame: H3iFrame,
523) {
524 let cloned = frame.clone();
525 client.handle_response_frame(stream_id, cloned);
526
527 let mut to_qlog: Option<Http3Frame> = None;
528 let mut push_to_responses: Option<StreamEvent> = None;
529
530 match frame {
531 H3iFrame::Headers(enriched_headers) => {
532 push_to_responses = Some(StreamEvent {
533 stream_id,
534 event_type: StreamEventType::Headers,
535 });
536
537 let qlog_headers: Vec<HttpHeader> = enriched_headers
538 .headers()
539 .iter()
540 .map(|h| qlog::events::h3::HttpHeader {
541 name: String::from_utf8_lossy(h.name()).into_owned(),
542 value: String::from_utf8_lossy(h.value()).into_owned(),
543 })
544 .collect();
545
546 to_qlog = Some(Http3Frame::Headers {
547 headers: qlog_headers,
548 });
549 },
550 H3iFrame::QuicheH3(quiche_frame) => {
551 if let QFrame::Data { .. } = quiche_frame {
552 push_to_responses = Some(StreamEvent {
553 stream_id,
554 event_type: StreamEventType::Data,
555 });
556 }
557
558 to_qlog = Some(quiche_frame.to_qlog());
559 },
560 H3iFrame::ResetStream(_) => {
561 push_to_responses = Some(StreamEvent {
562 stream_id,
563 event_type: StreamEventType::Finished,
564 });
565 },
566 }
567
568 if let Some(to_qlog) = to_qlog {
569 handle_qlog(qlog_streamer, to_qlog, stream_id);
570 }
571
572 if let Some(to_push) = push_to_responses {
573 responded_streams.push(to_push);
574 }
575}