1use quiche::ConnectionError;
30use quiche::PathStats;
31use quiche::Stats;
32use serde::ser::SerializeStruct;
33use serde::ser::Serializer;
34use serde::Serialize;
35use std::cmp;
36use std::collections::HashMap;
37use std::iter::FromIterator;
38
39use crate::frame::CloseTriggerFrame;
40use crate::frame::EnrichedHeaders;
41use crate::frame::H3iFrame;
42use crate::quiche;
43
/// Upper bound, in bytes, on how much of a connection error's `reason` is
/// written out by [`SerializableConnectionError`]; longer reasons are
/// truncated to this many bytes before being serialized.
pub const MAX_SERIALIZED_BUFFER_LEN: usize = 16384;
47
/// Summary of a connection: the frames recorded per stream, optional
/// connection-level statistics, per-path statistics, and details about how
/// the connection was closed.
#[derive(Default, Debug)]
pub struct ConnectionSummary {
    /// Frames recorded on the connection, grouped by stream ID.
    pub stream_map: StreamMap,
    /// Connection-level statistics, if any were captured.
    pub stats: Option<Stats>,
    /// Path statistics; serialized as a list under `path_stats`.
    pub path_stats: Vec<PathStats>,
    /// Peer/local errors and timeout state; serialized under the `error` key.
    pub conn_close_details: ConnectionCloseDetails,
}
67
68impl Serialize for ConnectionSummary {
69 fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
70 where
71 S: Serializer,
72 {
73 let mut state = s.serialize_struct("path_stats", 4)?;
74 state.serialize_field("stream_map", &self.stream_map)?;
75 state.serialize_field(
76 "stats",
77 &self.stats.as_ref().map(SerializableStats),
78 )?;
79 let p: Vec<SerializablePathStats> =
80 self.path_stats.iter().map(SerializablePathStats).collect();
81 state.serialize_field("path_stats", &p)?;
82 state.serialize_field("error", &self.conn_close_details)?;
83 state.serialize_field(
84 "missed_close_trigger_frames",
85 &self.stream_map.missing_close_trigger_frames(),
86 )?;
87 state.end()
88 }
89}
90
/// Frames recorded on a connection, grouped by stream ID, together with the
/// optional set of close trigger frames being tracked.
#[derive(Clone, Debug, Default, Serialize)]
pub struct StreamMap {
    /// Frames recorded per stream, in insertion order, keyed by stream ID.
    stream_frame_map: HashMap<u64, Vec<H3iFrame>>,
    /// Tracker for close trigger frames, if any were configured.
    close_trigger_frames: Option<CloseTriggerFrames>,
}
101
102impl<T> From<T> for StreamMap
103where
104 T: IntoIterator<Item = (u64, Vec<H3iFrame>)>,
105{
106 fn from(value: T) -> Self {
107 let stream_frame_map = HashMap::from_iter(value);
108
109 Self {
110 stream_frame_map,
111 close_trigger_frames: None,
112 }
113 }
114}
115
116impl StreamMap {
117 pub fn all_frames(&self) -> Vec<H3iFrame> {
136 self.stream_frame_map
137 .values()
138 .flatten()
139 .map(Clone::clone)
140 .collect::<Vec<H3iFrame>>()
141 }
142
143 pub fn stream(&self, stream_id: u64) -> Vec<H3iFrame> {
163 self.stream_frame_map
164 .get(&stream_id)
165 .cloned()
166 .unwrap_or_default()
167 }
168
169 pub fn received_frame(&self, frame: &H3iFrame) -> bool {
188 self.all_frames().contains(frame)
189 }
190
191 pub fn received_frame_on_stream(
209 &self, stream: u64, frame: &H3iFrame,
210 ) -> bool {
211 self.stream_frame_map
212 .get(&stream)
213 .map(|v| v.contains(frame))
214 .is_some()
215 }
216
217 pub fn is_empty(&self) -> bool {
238 self.stream_frame_map.is_empty()
239 }
240
241 pub fn headers_on_stream(&self, stream_id: u64) -> Vec<EnrichedHeaders> {
263 self.stream(stream_id)
264 .into_iter()
265 .filter_map(|h3i_frame| h3i_frame.to_enriched_headers())
266 .collect()
267 }
268
269 pub fn all_close_trigger_frames_seen(&self) -> bool {
272 if let Some(triggers) = self.close_trigger_frames.as_ref() {
273 triggers.saw_all_trigger_frames()
274 } else {
275 false
276 }
277 }
278
279 pub fn missing_close_trigger_frames(&self) -> Option<Vec<CloseTriggerFrame>> {
282 self.close_trigger_frames
283 .as_ref()
284 .map(|e| e.missing_triggers())
285 }
286
287 pub(crate) fn new(close_trigger_frames: Option<CloseTriggerFrames>) -> Self {
289 Self {
290 close_trigger_frames,
291 ..Default::default()
292 }
293 }
294
295 pub(crate) fn insert(&mut self, stream_id: u64, frame: H3iFrame) {
296 if let Some(expected) = self.close_trigger_frames.as_mut() {
297 expected.receive_frame(stream_id, &frame);
298 }
299
300 self.stream_frame_map
301 .entry(stream_id)
302 .or_default()
303 .push(frame);
304 }
305
306 pub(crate) fn close_due_to_trigger_frames(
310 &self, qconn: &mut quiche::Connection,
311 ) {
312 if let Some(ConnectionError {
313 is_app,
314 error_code,
315 reason,
316 }) = self.close_trigger_frames.as_ref().map(|tf| &tf.close_with)
317 {
318 let _ = qconn.close(*is_app, *error_code, reason);
319 }
320 }
321}
322
/// Tracks a set of close trigger frames and the connection error to close
/// with once they are dealt with.
#[derive(Clone, Serialize, Debug)]
pub struct CloseTriggerFrames {
    /// Trigger frames that have not yet been matched by a received frame.
    missing: Vec<CloseTriggerFrame>,
    /// Error passed to `quiche::Connection::close` by
    /// `StreamMap::close_due_to_trigger_frames`. Not serialized.
    #[serde(skip)]
    close_with: ConnectionError,
}
338
339impl CloseTriggerFrames {
340 pub fn new(frames: Vec<CloseTriggerFrame>) -> Self {
344 Self::new_with_connection_close(frames, ConnectionError {
345 is_app: true,
346 error_code: quiche::h3::WireErrorCode::NoError as u64,
347 reason: b"saw all close trigger frames".to_vec(),
348 })
349 }
350
351 pub fn new_with_connection_close(
355 frames: Vec<CloseTriggerFrame>, close_with: ConnectionError,
356 ) -> Self {
357 Self {
358 missing: frames,
359 close_with,
360 }
361 }
362
363 fn receive_frame(&mut self, stream_id: u64, frame: &H3iFrame) {
364 for (i, trigger) in self.missing.iter_mut().enumerate() {
365 if trigger.is_equivalent(frame) && trigger.stream_id() == stream_id {
366 self.missing.remove(i);
367 break;
368 }
369 }
370 }
371
372 fn saw_all_trigger_frames(&self) -> bool {
373 self.missing.is_empty()
374 }
375
376 fn missing_triggers(&self) -> Vec<CloseTriggerFrame> {
377 self.missing.clone()
378 }
379}
380
381impl From<Vec<CloseTriggerFrame>> for CloseTriggerFrames {
382 fn from(value: Vec<CloseTriggerFrame>) -> Self {
383 Self::new(value)
384 }
385}
386
/// Details about how a connection ended, captured from a
/// `quiche::Connection` by [`ConnectionCloseDetails::new`].
#[derive(Default)]
pub struct ConnectionCloseDetails {
    /// Copy of the connection's `peer_error()`, if any.
    peer_error: Option<ConnectionError>,
    /// Copy of the connection's `local_error()`, if any.
    local_error: Option<ConnectionError>,
    /// Value of the connection's `is_timed_out()` at capture time.
    pub timed_out: bool,
    /// Copy of the bytes returned by the connection's `session()`, if any.
    /// Left out of both the `Debug` and `Serialize` output.
    pub session: Option<Vec<u8>>,
}
397
398impl core::fmt::Debug for ConnectionCloseDetails {
399 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
400 f.debug_struct("ConnectionCloseDetails")
402 .field("peer_error", &self.peer_error)
403 .field("local_error", &self.local_error)
404 .field("timed_out", &self.timed_out)
405 .finish()
406 }
407}
408
409impl ConnectionCloseDetails {
410 pub fn new<F: quiche::BufFactory>(qconn: &quiche::Connection<F>) -> Self {
411 let session = qconn.session().map(|s| s.to_vec());
412 Self {
413 peer_error: qconn.peer_error().cloned(),
414 local_error: qconn.local_error().cloned(),
415 timed_out: qconn.is_timed_out(),
416 session,
417 }
418 }
419
420 pub fn peer_error(&self) -> Option<&ConnectionError> {
422 self.peer_error.as_ref()
423 }
424
425 pub fn local_error(&self) -> Option<&ConnectionError> {
427 self.local_error.as_ref()
428 }
429
430 pub fn no_err(&self) -> bool {
433 self.peer_error.is_none() && self.local_error.is_none()
434 }
435}
436
437impl Serialize for ConnectionCloseDetails {
438 fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
439 where
440 S: Serializer,
441 {
442 let mut state: <S as Serializer>::SerializeStruct =
443 s.serialize_struct("enriched_connection_error", 3)?;
444 if let Some(pe) = &self.peer_error {
445 state.serialize_field(
446 "peer_error",
447 &SerializableConnectionError(pe),
448 )?;
449 }
450
451 if let Some(le) = &self.local_error {
452 state.serialize_field(
453 "local_error",
454 &SerializableConnectionError(le),
455 )?;
456 }
457
458 state.serialize_field("timed_out", &self.timed_out)?;
459 state.end()
460 }
461}
462
/// Borrowing wrapper that provides a [`Serialize`] implementation for
/// [`quiche::PathStats`].
pub struct SerializablePathStats<'a>(&'a quiche::PathStats);
465
466impl Serialize for SerializablePathStats<'_> {
467 fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
468 where
469 S: Serializer,
470 {
471 let mut state = s.serialize_struct("path_stats", 17)?;
472 state.serialize_field("local_addr", &self.0.local_addr)?;
473 state.serialize_field("peer_addr", &self.0.peer_addr)?;
474 state.serialize_field("active", &self.0.active)?;
475 state.serialize_field("recv", &self.0.recv)?;
476 state.serialize_field("sent", &self.0.sent)?;
477 state.serialize_field("lost", &self.0.lost)?;
478 state.serialize_field("retrans", &self.0.retrans)?;
479 state.serialize_field("rtt", &self.0.rtt.as_secs_f64())?;
480 state.serialize_field(
481 "min_rtt",
482 &self.0.min_rtt.map(|x| x.as_secs_f64()),
483 )?;
484 state.serialize_field("rttvar", &self.0.rttvar.as_secs_f64())?;
485 state.serialize_field("cwnd", &self.0.cwnd)?;
486 state.serialize_field("sent_bytes", &self.0.sent_bytes)?;
487 state.serialize_field("recv_bytes", &self.0.recv_bytes)?;
488 state.serialize_field("lost_bytes", &self.0.lost_bytes)?;
489 state.serialize_field(
490 "stream_retrans_bytes",
491 &self.0.stream_retrans_bytes,
492 )?;
493 state.serialize_field("pmtu", &self.0.pmtu)?;
494 state.serialize_field("delivery_rate", &self.0.delivery_rate)?;
495 state.end()
496 }
497}
498
/// Borrowing wrapper that provides a [`Serialize`] implementation for
/// [`quiche::Stats`].
pub struct SerializableStats<'a>(&'a quiche::Stats);
501
502impl Serialize for SerializableStats<'_> {
503 fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
504 where
505 S: Serializer,
506 {
507 let mut state = s.serialize_struct("path_stats", 14)?;
508 state.serialize_field("recv", &self.0.recv)?;
509 state.serialize_field("sent", &self.0.sent)?;
510 state.serialize_field("lost", &self.0.lost)?;
511 state.serialize_field("retrans", &self.0.retrans)?;
512 state.serialize_field("sent_bytes", &self.0.sent_bytes)?;
513 state.serialize_field("recv_bytes", &self.0.recv_bytes)?;
514 state.serialize_field("lost_bytes", &self.0.lost_bytes)?;
515 state.serialize_field(
516 "stream_retrans_bytes",
517 &self.0.stream_retrans_bytes,
518 )?;
519 state.serialize_field("paths_count", &self.0.paths_count)?;
520 state.serialize_field(
521 "reset_stream_count_local",
522 &self.0.reset_stream_count_local,
523 )?;
524 state.serialize_field(
525 "stopped_stream_count_local",
526 &self.0.stopped_stream_count_local,
527 )?;
528 state.serialize_field(
529 "reset_stream_count_remote",
530 &self.0.reset_stream_count_remote,
531 )?;
532 state.serialize_field(
533 "stopped_stream_count_remote",
534 &self.0.stopped_stream_count_remote,
535 )?;
536 state.serialize_field(
537 "path_challenge_rx_count",
538 &self.0.path_challenge_rx_count,
539 )?;
540 state.end()
541 }
542}
543
/// Borrowing wrapper that provides a [`Serialize`] implementation for
/// [`quiche::ConnectionError`].
#[derive(Clone, Debug)]
pub struct SerializableConnectionError<'a>(&'a quiche::ConnectionError);
547
548impl Serialize for SerializableConnectionError<'_> {
549 fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
550 where
551 S: Serializer,
552 {
553 let mut state = s.serialize_struct("path_stats", 3)?;
554 state.serialize_field("is_app", &self.0.is_app)?;
555 state.serialize_field("error_code", &self.0.error_code)?;
556 let max = cmp::min(self.0.reason.len(), MAX_SERIALIZED_BUFFER_LEN);
557 state.serialize_field(
558 "reason",
559 &String::from_utf8_lossy(&self.0.reason[..max]),
560 )?;
561 state.end()
562 }
563}
564
#[cfg(test)]
mod tests {
    use super::*;
    use crate::frame::EnrichedHeaders;
    use quiche::h3::Header;

    /// Fixture: a frame built from a single `hello: world` header.
    fn h3i_frame() -> H3iFrame {
        let headers = vec![Header::new(b"hello", b"world")];
        headers.into()
    }

    /// A single trigger is satisfied once its frame arrives on its stream.
    #[test]
    fn close_trigger_frame() {
        let frame = h3i_frame();
        let trigger = CloseTriggerFrame::new(0, frame.clone());
        let mut triggers = CloseTriggerFrames::new(vec![trigger]);

        triggers.receive_frame(0, &frame);

        assert!(triggers.saw_all_trigger_frames());
    }

    /// Receiving one of three triggers leaves the other two missing.
    #[test]
    fn trigger_frame_missing() {
        let frame = h3i_frame();
        let all_triggers = vec![
            CloseTriggerFrame::new(0, frame.clone()),
            CloseTriggerFrame::new(4, frame.clone()),
            CloseTriggerFrame::new(8, vec![Header::new(b"go", b"jets")]),
        ];
        let mut tracker = CloseTriggerFrames::new(all_triggers.clone());

        tracker.receive_frame(0, &frame);

        assert!(!tracker.saw_all_trigger_frames());
        assert_eq!(tracker.missing_triggers(), all_triggers[1..].to_vec());
    }

    /// Fixture: a headers frame followed by a data frame.
    fn stream_map_data() -> Vec<H3iFrame> {
        vec![
            H3iFrame::Headers(EnrichedHeaders::from(vec![Header::new(
                b"hello", b"world",
            )])),
            H3iFrame::QuicheH3(quiche::h3::frame::Frame::Data {
                payload: b"hello world".to_vec(),
            }),
        ]
    }

    /// Without configured triggers, "all seen" is reported as false.
    #[test]
    fn test_stream_map_trigger_frames_with_none() {
        let stream_map = StreamMap::from(vec![(0, stream_map_data())]);
        assert!(!stream_map.all_close_trigger_frames_seen());
    }

    /// Inserting one of two trigger frames leaves the other one missing.
    #[test]
    fn test_stream_map_trigger_frames() {
        let data = stream_map_data();
        let triggers = CloseTriggerFrames::from(vec![
            CloseTriggerFrame::new(0, data[0].clone()),
            CloseTriggerFrame::new(0, data[1].clone()),
        ]);
        let mut stream_map = StreamMap::new(Some(triggers));

        stream_map.insert(0, data[0].clone());
        assert!(!stream_map.all_close_trigger_frames_seen());

        let still_missing = stream_map.missing_close_trigger_frames().unwrap();
        assert_eq!(still_missing, vec![CloseTriggerFrame::new(
            0,
            data[1].clone()
        )]);
    }
}
639}