1use quiche::Connection;
30use quiche::ConnectionError;
31use quiche::PathStats;
32use quiche::Stats;
33use serde::ser::SerializeStruct;
34use serde::ser::Serializer;
35use serde::Serialize;
36use std::cmp;
37use std::collections::HashMap;
38use std::iter::FromIterator;
39
40use crate::frame::CloseTriggerFrame;
41use crate::frame::EnrichedHeaders;
42use crate::frame::H3iFrame;
43use crate::quiche;
44
/// Upper bound, in bytes, on how much of a byte buffer is written out when
/// serializing. In this file it caps the `reason` of a connection error.
pub const MAX_SERIALIZED_BUFFER_LEN: usize = 16384;
48
/// A summary of a connection: the frames received per stream, connection
/// and path statistics, and details about how the connection closed.
#[derive(Default, Debug)]
pub struct ConnectionSummary {
    /// Frames received, grouped by stream ID.
    pub stream_map: StreamMap,
    /// Connection-wide statistics, when available.
    pub stats: Option<Stats>,
    /// Per-path statistics.
    pub path_stats: Vec<PathStats>,
    /// Peer/local errors and timeout state recorded for the connection.
    pub conn_close_details: ConnectionCloseDetails,
}
68
69impl Serialize for ConnectionSummary {
70 fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
71 where
72 S: Serializer,
73 {
74 let mut state = s.serialize_struct("path_stats", 4)?;
75 state.serialize_field("stream_map", &self.stream_map)?;
76 state.serialize_field(
77 "stats",
78 &self.stats.as_ref().map(SerializableStats),
79 )?;
80 let p: Vec<SerializablePathStats> =
81 self.path_stats.iter().map(SerializablePathStats).collect();
82 state.serialize_field("path_stats", &p)?;
83 state.serialize_field("error", &self.conn_close_details)?;
84 state.serialize_field(
85 "missed_close_trigger_frames",
86 &self.stream_map.missing_close_trigger_frames(),
87 )?;
88 state.end()
89 }
90}
91
/// The frames received on a connection, grouped by the stream they arrived
/// on, plus the optional set of close trigger frames being tracked.
#[derive(Clone, Debug, Default, Serialize)]
pub struct StreamMap {
    /// Received frames keyed by stream ID, in insertion order per stream.
    stream_frame_map: HashMap<u64, Vec<H3iFrame>>,
    /// Close trigger frames still being waited for, if any were configured.
    close_trigger_frames: Option<CloseTriggerFrames>,
}
102
103impl<T> From<T> for StreamMap
104where
105 T: IntoIterator<Item = (u64, Vec<H3iFrame>)>,
106{
107 fn from(value: T) -> Self {
108 let stream_frame_map = HashMap::from_iter(value);
109
110 Self {
111 stream_frame_map,
112 close_trigger_frames: None,
113 }
114 }
115}
116
117impl StreamMap {
118 pub fn all_frames(&self) -> Vec<H3iFrame> {
137 self.stream_frame_map
138 .values()
139 .flatten()
140 .map(Clone::clone)
141 .collect::<Vec<H3iFrame>>()
142 }
143
144 pub fn stream(&self, stream_id: u64) -> Vec<H3iFrame> {
164 self.stream_frame_map
165 .get(&stream_id)
166 .cloned()
167 .unwrap_or_default()
168 }
169
170 pub fn received_frame(&self, frame: &H3iFrame) -> bool {
189 self.all_frames().contains(frame)
190 }
191
192 pub fn received_frame_on_stream(
210 &self, stream: u64, frame: &H3iFrame,
211 ) -> bool {
212 self.stream_frame_map
213 .get(&stream)
214 .map(|v| v.contains(frame))
215 .is_some()
216 }
217
218 pub fn is_empty(&self) -> bool {
239 self.stream_frame_map.is_empty()
240 }
241
242 pub fn headers_on_stream(&self, stream_id: u64) -> Vec<EnrichedHeaders> {
264 self.stream(stream_id)
265 .into_iter()
266 .filter_map(|h3i_frame| h3i_frame.to_enriched_headers())
267 .collect()
268 }
269
270 pub fn all_close_trigger_frames_seen(&self) -> bool {
273 if let Some(triggers) = self.close_trigger_frames.as_ref() {
274 triggers.saw_all_trigger_frames()
275 } else {
276 false
277 }
278 }
279
280 pub fn missing_close_trigger_frames(&self) -> Option<Vec<CloseTriggerFrame>> {
283 self.close_trigger_frames
284 .as_ref()
285 .map(|e| e.missing_triggers())
286 }
287
288 pub(crate) fn new(close_trigger_frames: Option<CloseTriggerFrames>) -> Self {
290 Self {
291 close_trigger_frames,
292 ..Default::default()
293 }
294 }
295
296 pub(crate) fn insert(&mut self, stream_id: u64, frame: H3iFrame) {
297 if let Some(expected) = self.close_trigger_frames.as_mut() {
298 expected.receive_frame(stream_id, &frame);
299 }
300
301 self.stream_frame_map
302 .entry(stream_id)
303 .or_default()
304 .push(frame);
305 }
306
307 pub(crate) fn close_due_to_trigger_frames(
311 &self, qconn: &mut quiche::Connection,
312 ) {
313 if let Some(ConnectionError {
314 is_app,
315 error_code,
316 reason,
317 }) = self.close_trigger_frames.as_ref().map(|tf| &tf.close_with)
318 {
319 let _ = qconn.close(*is_app, *error_code, reason);
320 }
321 }
322}
323
/// The set of close trigger frames still being waited for, together with the
/// connection error to close with once they are handled.
#[derive(Clone, Serialize, Debug)]
pub struct CloseTriggerFrames {
    /// Trigger frames that have not been matched by a received frame yet.
    missing: Vec<CloseTriggerFrame>,
    /// Error passed to `Connection::close`; excluded from serialization.
    #[serde(skip)]
    close_with: ConnectionError,
}
339
340impl CloseTriggerFrames {
341 pub fn new(frames: Vec<CloseTriggerFrame>) -> Self {
345 Self::new_with_connection_close(frames, ConnectionError {
346 is_app: true,
347 error_code: quiche::h3::WireErrorCode::NoError as u64,
348 reason: b"saw all close trigger frames".to_vec(),
349 })
350 }
351
352 pub fn new_with_connection_close(
356 frames: Vec<CloseTriggerFrame>, close_with: ConnectionError,
357 ) -> Self {
358 Self {
359 missing: frames,
360 close_with,
361 }
362 }
363
364 fn receive_frame(&mut self, stream_id: u64, frame: &H3iFrame) {
365 for (i, trigger) in self.missing.iter_mut().enumerate() {
366 if trigger.is_equivalent(frame) && trigger.stream_id() == stream_id {
367 self.missing.remove(i);
368 break;
369 }
370 }
371 }
372
373 fn saw_all_trigger_frames(&self) -> bool {
374 self.missing.is_empty()
375 }
376
377 fn missing_triggers(&self) -> Vec<CloseTriggerFrame> {
378 self.missing.clone()
379 }
380}
381
382impl From<Vec<CloseTriggerFrame>> for CloseTriggerFrames {
383 fn from(value: Vec<CloseTriggerFrame>) -> Self {
384 Self::new(value)
385 }
386}
387
/// Details captured from a connection about how it ended: the errors
/// reported by each side, whether it timed out, and its session data.
#[derive(Default)]
pub struct ConnectionCloseDetails {
    /// Error reported by the peer, if any.
    peer_error: Option<ConnectionError>,
    /// Error raised locally, if any.
    local_error: Option<ConnectionError>,
    /// Whether the connection timed out.
    pub timed_out: bool,
    /// Session bytes as returned by `Connection::session()`, if any.
    pub session: Option<Vec<u8>>,
}
398
399impl core::fmt::Debug for ConnectionCloseDetails {
400 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
401 f.debug_struct("ConnectionCloseDetails")
403 .field("peer_error", &self.peer_error)
404 .field("local_error", &self.local_error)
405 .field("timed_out", &self.timed_out)
406 .finish()
407 }
408}
409
410impl ConnectionCloseDetails {
411 pub fn new(qconn: &Connection) -> Self {
412 let session = qconn.session().map(|s| s.to_vec());
413 Self {
414 peer_error: qconn.peer_error().cloned(),
415 local_error: qconn.local_error().cloned(),
416 timed_out: qconn.is_timed_out(),
417 session,
418 }
419 }
420
421 pub fn peer_error(&self) -> Option<&ConnectionError> {
423 self.peer_error.as_ref()
424 }
425
426 pub fn local_error(&self) -> Option<&ConnectionError> {
428 self.local_error.as_ref()
429 }
430
431 pub fn no_err(&self) -> bool {
434 self.peer_error.is_none() && self.local_error.is_none()
435 }
436}
437
438impl Serialize for ConnectionCloseDetails {
439 fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
440 where
441 S: Serializer,
442 {
443 let mut state: <S as Serializer>::SerializeStruct =
444 s.serialize_struct("enriched_connection_error", 3)?;
445 if let Some(pe) = &self.peer_error {
446 state.serialize_field(
447 "peer_error",
448 &SerializableConnectionError(pe),
449 )?;
450 }
451
452 if let Some(le) = &self.local_error {
453 state.serialize_field(
454 "local_error",
455 &SerializableConnectionError(le),
456 )?;
457 }
458
459 state.serialize_field("timed_out", &self.timed_out)?;
460 state.end()
461 }
462}
463
/// Borrowing wrapper that provides a `Serialize` implementation for
/// `quiche::PathStats`.
pub struct SerializablePathStats<'a>(&'a quiche::PathStats);
466
467impl Serialize for SerializablePathStats<'_> {
468 fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
469 where
470 S: Serializer,
471 {
472 let mut state = s.serialize_struct("path_stats", 17)?;
473 state.serialize_field("local_addr", &self.0.local_addr)?;
474 state.serialize_field("peer_addr", &self.0.peer_addr)?;
475 state.serialize_field("active", &self.0.active)?;
476 state.serialize_field("recv", &self.0.recv)?;
477 state.serialize_field("sent", &self.0.sent)?;
478 state.serialize_field("lost", &self.0.lost)?;
479 state.serialize_field("retrans", &self.0.retrans)?;
480 state.serialize_field("rtt", &self.0.rtt.as_secs_f64())?;
481 state.serialize_field(
482 "min_rtt",
483 &self.0.min_rtt.map(|x| x.as_secs_f64()),
484 )?;
485 state.serialize_field("rttvar", &self.0.rttvar.as_secs_f64())?;
486 state.serialize_field("cwnd", &self.0.cwnd)?;
487 state.serialize_field("sent_bytes", &self.0.sent_bytes)?;
488 state.serialize_field("recv_bytes", &self.0.recv_bytes)?;
489 state.serialize_field("lost_bytes", &self.0.lost_bytes)?;
490 state.serialize_field(
491 "stream_retrans_bytes",
492 &self.0.stream_retrans_bytes,
493 )?;
494 state.serialize_field("pmtu", &self.0.pmtu)?;
495 state.serialize_field("delivery_rate", &self.0.delivery_rate)?;
496 state.end()
497 }
498}
499
/// Borrowing wrapper that provides a `Serialize` implementation for
/// `quiche::Stats`.
pub struct SerializableStats<'a>(&'a quiche::Stats);
502
503impl Serialize for SerializableStats<'_> {
504 fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
505 where
506 S: Serializer,
507 {
508 let mut state = s.serialize_struct("path_stats", 14)?;
509 state.serialize_field("recv", &self.0.recv)?;
510 state.serialize_field("sent", &self.0.sent)?;
511 state.serialize_field("lost", &self.0.lost)?;
512 state.serialize_field("retrans", &self.0.retrans)?;
513 state.serialize_field("sent_bytes", &self.0.sent_bytes)?;
514 state.serialize_field("recv_bytes", &self.0.recv_bytes)?;
515 state.serialize_field("lost_bytes", &self.0.lost_bytes)?;
516 state.serialize_field(
517 "stream_retrans_bytes",
518 &self.0.stream_retrans_bytes,
519 )?;
520 state.serialize_field("paths_count", &self.0.paths_count)?;
521 state.serialize_field(
522 "reset_stream_count_local",
523 &self.0.reset_stream_count_local,
524 )?;
525 state.serialize_field(
526 "stopped_stream_count_local",
527 &self.0.stopped_stream_count_local,
528 )?;
529 state.serialize_field(
530 "reset_stream_count_remote",
531 &self.0.reset_stream_count_remote,
532 )?;
533 state.serialize_field(
534 "stopped_stream_count_remote",
535 &self.0.stopped_stream_count_remote,
536 )?;
537 state.serialize_field(
538 "path_challenge_rx_count",
539 &self.0.path_challenge_rx_count,
540 )?;
541 state.end()
542 }
543}
544
/// Borrowing wrapper that provides a `Serialize` implementation for
/// `quiche::ConnectionError`.
#[derive(Clone, Debug)]
pub struct SerializableConnectionError<'a>(&'a quiche::ConnectionError);
548
549impl Serialize for SerializableConnectionError<'_> {
550 fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
551 where
552 S: Serializer,
553 {
554 let mut state = s.serialize_struct("path_stats", 3)?;
555 state.serialize_field("is_app", &self.0.is_app)?;
556 state.serialize_field("error_code", &self.0.error_code)?;
557 let max = cmp::min(self.0.reason.len(), MAX_SERIALIZED_BUFFER_LEN);
558 state.serialize_field(
559 "reason",
560 &String::from_utf8_lossy(&self.0.reason[..max]),
561 )?;
562 state.end()
563 }
564}
565
#[cfg(test)]
mod tests {
    use super::*;
    use crate::frame::EnrichedHeaders;
    use quiche::h3::Header;

    /// Builds a frame from a single `hello: world` header.
    fn h3i_frame() -> H3iFrame {
        vec![Header::new(b"hello", b"world")].into()
    }

    /// A single trigger is satisfied once the matching frame arrives on the
    /// matching stream.
    #[test]
    fn close_trigger_frame() {
        let frame = h3i_frame();
        let mut triggers = CloseTriggerFrames::new(vec![CloseTriggerFrame::new(
            0,
            frame.clone(),
        )]);

        triggers.receive_frame(0, &frame);

        assert!(triggers.saw_all_trigger_frames());
    }

    /// Receiving only the first of three triggers leaves the other two
    /// reported as missing.
    #[test]
    fn trigger_frame_missing() {
        let frame = h3i_frame();
        let expected_frames = vec![
            CloseTriggerFrame::new(0, frame.clone()),
            CloseTriggerFrame::new(4, frame.clone()),
            CloseTriggerFrame::new(8, vec![Header::new(b"go", b"jets")]),
        ];
        let mut expected = CloseTriggerFrames::new(expected_frames.clone());

        // Only the trigger on stream 0 is matched; the same frame on stream
        // 4 stays outstanding because the stream ID differs.
        expected.receive_frame(0, &frame);

        assert!(!expected.saw_all_trigger_frames());
        assert_eq!(expected.missing_triggers(), expected_frames[1..].to_vec());
    }

    /// Returns a headers frame followed by a data frame.
    fn stream_map_data() -> Vec<H3iFrame> {
        let headers =
            H3iFrame::Headers(EnrichedHeaders::from(vec![Header::new(
                b"hello", b"world",
            )]));
        let data = H3iFrame::QuicheH3(quiche::h3::frame::Frame::Data {
            payload: b"hello world".to_vec(),
        });

        vec![headers, data]
    }

    /// A map built without close trigger frames never reports them as seen.
    #[test]
    fn test_stream_map_trigger_frames_with_none() {
        let stream_map: StreamMap = vec![(0, stream_map_data())].into();
        assert!(!stream_map.all_close_trigger_frames_seen());
    }

    /// Inserting one of two expected frames leaves the other one missing.
    #[test]
    fn test_stream_map_trigger_frames() {
        let data = stream_map_data();
        let mut stream_map = StreamMap::new(Some(
            vec![
                CloseTriggerFrame::new(0, data[0].clone()),
                CloseTriggerFrame::new(0, data[1].clone()),
            ]
            .into(),
        ));

        stream_map.insert(0, data[0].clone());
        assert!(!stream_map.all_close_trigger_frames_seen());
        assert_eq!(stream_map.missing_close_trigger_frames().unwrap(), vec![
            CloseTriggerFrame::new(0, data[1].clone())
        ]);
    }
}