1use quiche;
30use quiche::Connection;
31use quiche::ConnectionError;
32use quiche::PathStats;
33use quiche::Stats;
34use serde::ser::SerializeStruct;
35use serde::ser::Serializer;
36use serde::Serialize;
37use std::cmp;
38use std::collections::HashMap;
39use std::iter::FromIterator;
40
41use crate::frame::CloseTriggerFrame;
42use crate::frame::EnrichedHeaders;
43use crate::frame::H3iFrame;
44
45pub const MAX_SERIALIZED_BUFFER_LEN: usize = 16384;
48
49#[derive(Default, Debug)]
59pub struct ConnectionSummary {
60 pub stream_map: StreamMap,
61 pub stats: Option<Stats>,
63 pub path_stats: Vec<PathStats>,
65 pub conn_close_details: ConnectionCloseDetails,
67}
68
69impl Serialize for ConnectionSummary {
70 fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
71 where
72 S: Serializer,
73 {
74 let mut state = s.serialize_struct("path_stats", 4)?;
75 state.serialize_field("stream_map", &self.stream_map)?;
76 state.serialize_field(
77 "stats",
78 &self.stats.as_ref().map(SerializableStats),
79 )?;
80 let p: Vec<SerializablePathStats> =
81 self.path_stats.iter().map(SerializablePathStats).collect();
82 state.serialize_field("path_stats", &p)?;
83 state.serialize_field("error", &self.conn_close_details)?;
84 state.serialize_field(
85 "missed_close_trigger_frames",
86 &self.stream_map.missing_close_trigger_frames(),
87 )?;
88 state.end()
89 }
90}
91
92#[derive(Clone, Debug, Default, Serialize)]
98pub struct StreamMap {
99 stream_frame_map: HashMap<u64, Vec<H3iFrame>>,
100 close_trigger_frames: Option<CloseTriggerFrames>,
101}
102
103impl<T> From<T> for StreamMap
104where
105 T: IntoIterator<Item = (u64, Vec<H3iFrame>)>,
106{
107 fn from(value: T) -> Self {
108 let stream_frame_map = HashMap::from_iter(value);
109
110 Self {
111 stream_frame_map,
112 close_trigger_frames: None,
113 }
114 }
115}
116
117impl StreamMap {
118 pub fn all_frames(&self) -> Vec<H3iFrame> {
137 self.stream_frame_map
138 .values()
139 .flatten()
140 .map(Clone::clone)
141 .collect::<Vec<H3iFrame>>()
142 }
143
144 pub fn stream(&self, stream_id: u64) -> Vec<H3iFrame> {
164 self.stream_frame_map
165 .get(&stream_id)
166 .cloned()
167 .unwrap_or_default()
168 }
169
170 pub fn received_frame(&self, frame: &H3iFrame) -> bool {
189 self.all_frames().contains(frame)
190 }
191
192 pub fn received_frame_on_stream(
210 &self, stream: u64, frame: &H3iFrame,
211 ) -> bool {
212 self.stream_frame_map
213 .get(&stream)
214 .map(|v| v.contains(frame))
215 .is_some()
216 }
217
218 pub fn is_empty(&self) -> bool {
239 self.stream_frame_map.is_empty()
240 }
241
242 pub fn headers_on_stream(&self, stream_id: u64) -> Vec<EnrichedHeaders> {
264 self.stream(stream_id)
265 .into_iter()
266 .filter_map(|h3i_frame| h3i_frame.to_enriched_headers())
267 .collect()
268 }
269
270 pub fn all_close_trigger_frames_seen(&self) -> bool {
273 if let Some(triggers) = self.close_trigger_frames.as_ref() {
274 triggers.saw_all_trigger_frames()
275 } else {
276 false
277 }
278 }
279
280 pub fn missing_close_trigger_frames(&self) -> Option<Vec<CloseTriggerFrame>> {
283 self.close_trigger_frames
284 .as_ref()
285 .map(|e| e.missing_triggers())
286 }
287
288 pub(crate) fn new(close_trigger_frames: Option<CloseTriggerFrames>) -> Self {
290 Self {
291 close_trigger_frames,
292 ..Default::default()
293 }
294 }
295
296 pub(crate) fn insert(&mut self, stream_id: u64, frame: H3iFrame) {
297 if let Some(expected) = self.close_trigger_frames.as_mut() {
298 expected.receive_frame(stream_id, &frame);
299 }
300
301 self.stream_frame_map
302 .entry(stream_id)
303 .or_default()
304 .push(frame);
305 }
306
307 pub(crate) fn close_due_to_trigger_frames(
311 &self, qconn: &mut quiche::Connection,
312 ) {
313 if let Some(ConnectionError {
314 is_app,
315 error_code,
316 reason,
317 }) = self.close_trigger_frames.as_ref().map(|tf| &tf.close_with)
318 {
319 let _ = qconn.close(*is_app, *error_code, reason);
320 }
321 }
322}
323
324#[derive(Clone, Serialize, Debug)]
334pub struct CloseTriggerFrames {
335 missing: Vec<CloseTriggerFrame>,
336 #[serde(skip)]
337 close_with: ConnectionError,
338}
339
340impl CloseTriggerFrames {
341 pub fn new(frames: Vec<CloseTriggerFrame>) -> Self {
345 Self::new_with_connection_close(frames, ConnectionError {
346 is_app: true,
347 error_code: quiche::h3::WireErrorCode::NoError as u64,
348 reason: b"saw all close trigger frames".to_vec(),
349 })
350 }
351
352 pub fn new_with_connection_close(
356 frames: Vec<CloseTriggerFrame>, close_with: ConnectionError,
357 ) -> Self {
358 Self {
359 missing: frames,
360 close_with,
361 }
362 }
363
364 fn receive_frame(&mut self, stream_id: u64, frame: &H3iFrame) {
365 for (i, trigger) in self.missing.iter_mut().enumerate() {
366 if trigger.is_equivalent(frame) && trigger.stream_id() == stream_id {
367 self.missing.remove(i);
368 break;
369 }
370 }
371 }
372
373 fn saw_all_trigger_frames(&self) -> bool {
374 self.missing.is_empty()
375 }
376
377 fn missing_triggers(&self) -> Vec<CloseTriggerFrame> {
378 self.missing.clone()
379 }
380}
381
382impl From<Vec<CloseTriggerFrame>> for CloseTriggerFrames {
383 fn from(value: Vec<CloseTriggerFrame>) -> Self {
384 Self::new(value)
385 }
386}
387
388#[derive(Debug, Default)]
390pub struct ConnectionCloseDetails {
391 peer_error: Option<ConnectionError>,
392 local_error: Option<ConnectionError>,
393 pub timed_out: bool,
395}
396
397impl ConnectionCloseDetails {
398 pub fn new(qconn: &Connection) -> Self {
399 Self {
400 peer_error: qconn.peer_error().cloned(),
401 local_error: qconn.local_error().cloned(),
402 timed_out: qconn.is_timed_out(),
403 }
404 }
405
406 pub fn peer_error(&self) -> Option<&ConnectionError> {
408 self.peer_error.as_ref()
409 }
410
411 pub fn local_error(&self) -> Option<&ConnectionError> {
413 self.local_error.as_ref()
414 }
415
416 pub fn no_err(&self) -> bool {
419 self.peer_error.is_none() && self.local_error.is_none()
420 }
421}
422
423impl Serialize for ConnectionCloseDetails {
424 fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
425 where
426 S: Serializer,
427 {
428 let mut state: <S as Serializer>::SerializeStruct =
429 s.serialize_struct("enriched_connection_error", 3)?;
430 if let Some(pe) = &self.peer_error {
431 state.serialize_field(
432 "peer_error",
433 &SerializableConnectionError(pe),
434 )?;
435 }
436
437 if let Some(le) = &self.local_error {
438 state.serialize_field(
439 "local_error",
440 &SerializableConnectionError(le),
441 )?;
442 }
443
444 state.serialize_field("timed_out", &self.timed_out)?;
445 state.end()
446 }
447}
448
449#[doc(hidden)]
451pub enum ConnectionRecord {
453 StreamedFrame { stream_id: u64, frame: H3iFrame },
454 ConnectionStats(Stats),
455 PathStats(Vec<PathStats>),
456 Close(ConnectionCloseDetails),
457}
458
459pub struct SerializablePathStats<'a>(&'a quiche::PathStats);
461
462impl Serialize for SerializablePathStats<'_> {
463 fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
464 where
465 S: Serializer,
466 {
467 let mut state = s.serialize_struct("path_stats", 17)?;
468 state.serialize_field("local_addr", &self.0.local_addr)?;
469 state.serialize_field("peer_addr", &self.0.peer_addr)?;
470 state.serialize_field("active", &self.0.active)?;
471 state.serialize_field("recv", &self.0.recv)?;
472 state.serialize_field("sent", &self.0.sent)?;
473 state.serialize_field("lost", &self.0.lost)?;
474 state.serialize_field("retrans", &self.0.retrans)?;
475 state.serialize_field("rtt", &self.0.rtt.as_secs_f64())?;
476 state.serialize_field(
477 "min_rtt",
478 &self.0.min_rtt.map(|x| x.as_secs_f64()),
479 )?;
480 state.serialize_field("rttvar", &self.0.rttvar.as_secs_f64())?;
481 state.serialize_field("cwnd", &self.0.cwnd)?;
482 state.serialize_field("sent_bytes", &self.0.sent_bytes)?;
483 state.serialize_field("recv_bytes", &self.0.recv_bytes)?;
484 state.serialize_field("lost_bytes", &self.0.lost_bytes)?;
485 state.serialize_field(
486 "stream_retrans_bytes",
487 &self.0.stream_retrans_bytes,
488 )?;
489 state.serialize_field("pmtu", &self.0.pmtu)?;
490 state.serialize_field("delivery_rate", &self.0.delivery_rate)?;
491 state.end()
492 }
493}
494
495pub struct SerializableStats<'a>(&'a quiche::Stats);
497
498impl Serialize for SerializableStats<'_> {
499 fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
500 where
501 S: Serializer,
502 {
503 let mut state = s.serialize_struct("path_stats", 14)?;
504 state.serialize_field("recv", &self.0.recv)?;
505 state.serialize_field("sent", &self.0.sent)?;
506 state.serialize_field("lost", &self.0.lost)?;
507 state.serialize_field("retrans", &self.0.retrans)?;
508 state.serialize_field("sent_bytes", &self.0.sent_bytes)?;
509 state.serialize_field("recv_bytes", &self.0.recv_bytes)?;
510 state.serialize_field("lost_bytes", &self.0.lost_bytes)?;
511 state.serialize_field(
512 "stream_retrans_bytes",
513 &self.0.stream_retrans_bytes,
514 )?;
515 state.serialize_field("paths_count", &self.0.paths_count)?;
516 state.serialize_field(
517 "reset_stream_count_local",
518 &self.0.reset_stream_count_local,
519 )?;
520 state.serialize_field(
521 "stopped_stream_count_local",
522 &self.0.stopped_stream_count_local,
523 )?;
524 state.serialize_field(
525 "reset_stream_count_remote",
526 &self.0.reset_stream_count_remote,
527 )?;
528 state.serialize_field(
529 "stopped_stream_count_remote",
530 &self.0.stopped_stream_count_remote,
531 )?;
532 state.serialize_field(
533 "path_challenge_rx_count",
534 &self.0.path_challenge_rx_count,
535 )?;
536 state.end()
537 }
538}
539
540#[derive(Clone, Debug)]
542pub struct SerializableConnectionError<'a>(&'a quiche::ConnectionError);
543
544impl Serialize for SerializableConnectionError<'_> {
545 fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
546 where
547 S: Serializer,
548 {
549 let mut state = s.serialize_struct("path_stats", 3)?;
550 state.serialize_field("is_app", &self.0.is_app)?;
551 state.serialize_field("error_code", &self.0.error_code)?;
552 let max = cmp::min(self.0.reason.len(), MAX_SERIALIZED_BUFFER_LEN);
553 state.serialize_field(
554 "reason",
555 &String::from_utf8_lossy(&self.0.reason[..max]),
556 )?;
557 state.end()
558 }
559}
560
561#[cfg(test)]
562mod tests {
563 use super::*;
564 use crate::frame::EnrichedHeaders;
565 use quiche::h3::Header;
566
567 fn h3i_frame() -> H3iFrame {
568 vec![Header::new(b"hello", b"world")].into()
569 }
570
571 #[test]
572 fn close_trigger_frame() {
573 let frame = h3i_frame();
574 let mut triggers = CloseTriggerFrames::new(vec![CloseTriggerFrame::new(
575 0,
576 frame.clone(),
577 )]);
578
579 triggers.receive_frame(0, &frame);
580
581 assert!(triggers.saw_all_trigger_frames());
582 }
583
584 #[test]
585 fn trigger_frame_missing() {
586 let frame = h3i_frame();
587 let expected_frames = vec![
588 CloseTriggerFrame::new(0, frame.clone()),
589 CloseTriggerFrame::new(4, frame.clone()),
590 CloseTriggerFrame::new(8, vec![Header::new(b"go", b"jets")]),
591 ];
592 let mut expected = CloseTriggerFrames::new(expected_frames.clone());
593
594 expected.receive_frame(0, &frame);
595
596 assert!(!expected.saw_all_trigger_frames());
597 assert_eq!(expected.missing_triggers(), expected_frames[1..].to_vec());
598 }
599
600 fn stream_map_data() -> Vec<H3iFrame> {
601 let headers =
602 H3iFrame::Headers(EnrichedHeaders::from(vec![Header::new(
603 b"hello", b"world",
604 )]));
605 let data = H3iFrame::QuicheH3(quiche::h3::frame::Frame::Data {
606 payload: b"hello world".to_vec(),
607 });
608
609 vec![headers, data]
610 }
611
612 #[test]
613 fn test_stream_map_trigger_frames_with_none() {
614 let stream_map: StreamMap = vec![(0, stream_map_data())].into();
615 assert!(!stream_map.all_close_trigger_frames_seen());
616 }
617
618 #[test]
619 fn test_stream_map_trigger_frames() {
620 let data = stream_map_data();
621 let mut stream_map = StreamMap::new(Some(
622 vec![
623 CloseTriggerFrame::new(0, data[0].clone()),
624 CloseTriggerFrame::new(0, data[1].clone()),
625 ]
626 .into(),
627 ));
628
629 stream_map.insert(0, data[0].clone());
630 assert!(!stream_map.all_close_trigger_frames_seen());
631 assert_eq!(stream_map.missing_close_trigger_frames().unwrap(), vec![
632 CloseTriggerFrame::new(0, data[1].clone())
633 ]);
634 }
635}