1use quiche::Connection;
30use quiche::ConnectionError;
31use quiche::PathStats;
32use quiche::Stats;
33use serde::ser::SerializeStruct;
34use serde::ser::Serializer;
35use serde::Serialize;
36use std::cmp;
37use std::collections::HashMap;
38use std::iter::FromIterator;
39
40use crate::frame::CloseTriggerFrame;
41use crate::frame::EnrichedHeaders;
42use crate::frame::H3iFrame;
43use crate::quiche;
44
/// Upper bound, in bytes, on how much of a connection error's `reason`
/// buffer is emitted when a [`SerializableConnectionError`] is serialized.
pub const MAX_SERIALIZED_BUFFER_LEN: usize = 16 * 1024;
48
49#[derive(Default, Debug)]
59pub struct ConnectionSummary {
60 pub stream_map: StreamMap,
61 pub stats: Option<Stats>,
63 pub path_stats: Vec<PathStats>,
65 pub conn_close_details: ConnectionCloseDetails,
67}
68
69impl Serialize for ConnectionSummary {
70 fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
71 where
72 S: Serializer,
73 {
74 let mut state = s.serialize_struct("path_stats", 4)?;
75 state.serialize_field("stream_map", &self.stream_map)?;
76 state.serialize_field(
77 "stats",
78 &self.stats.as_ref().map(SerializableStats),
79 )?;
80 let p: Vec<SerializablePathStats> =
81 self.path_stats.iter().map(SerializablePathStats).collect();
82 state.serialize_field("path_stats", &p)?;
83 state.serialize_field("error", &self.conn_close_details)?;
84 state.serialize_field(
85 "missed_close_trigger_frames",
86 &self.stream_map.missing_close_trigger_frames(),
87 )?;
88 state.end()
89 }
90}
91
92#[derive(Clone, Debug, Default, Serialize)]
98pub struct StreamMap {
99 stream_frame_map: HashMap<u64, Vec<H3iFrame>>,
100 close_trigger_frames: Option<CloseTriggerFrames>,
101}
102
103impl<T> From<T> for StreamMap
104where
105 T: IntoIterator<Item = (u64, Vec<H3iFrame>)>,
106{
107 fn from(value: T) -> Self {
108 let stream_frame_map = HashMap::from_iter(value);
109
110 Self {
111 stream_frame_map,
112 close_trigger_frames: None,
113 }
114 }
115}
116
117impl StreamMap {
118 pub fn all_frames(&self) -> Vec<H3iFrame> {
137 self.stream_frame_map
138 .values()
139 .flatten()
140 .map(Clone::clone)
141 .collect::<Vec<H3iFrame>>()
142 }
143
144 pub fn stream(&self, stream_id: u64) -> Vec<H3iFrame> {
164 self.stream_frame_map
165 .get(&stream_id)
166 .cloned()
167 .unwrap_or_default()
168 }
169
170 pub fn received_frame(&self, frame: &H3iFrame) -> bool {
189 self.all_frames().contains(frame)
190 }
191
192 pub fn received_frame_on_stream(
210 &self, stream: u64, frame: &H3iFrame,
211 ) -> bool {
212 self.stream_frame_map
213 .get(&stream)
214 .map(|v| v.contains(frame))
215 .is_some()
216 }
217
218 pub fn is_empty(&self) -> bool {
239 self.stream_frame_map.is_empty()
240 }
241
242 pub fn headers_on_stream(&self, stream_id: u64) -> Vec<EnrichedHeaders> {
264 self.stream(stream_id)
265 .into_iter()
266 .filter_map(|h3i_frame| h3i_frame.to_enriched_headers())
267 .collect()
268 }
269
270 pub fn all_close_trigger_frames_seen(&self) -> bool {
273 if let Some(triggers) = self.close_trigger_frames.as_ref() {
274 triggers.saw_all_trigger_frames()
275 } else {
276 false
277 }
278 }
279
280 pub fn missing_close_trigger_frames(&self) -> Option<Vec<CloseTriggerFrame>> {
283 self.close_trigger_frames
284 .as_ref()
285 .map(|e| e.missing_triggers())
286 }
287
288 pub(crate) fn new(close_trigger_frames: Option<CloseTriggerFrames>) -> Self {
290 Self {
291 close_trigger_frames,
292 ..Default::default()
293 }
294 }
295
296 pub(crate) fn insert(&mut self, stream_id: u64, frame: H3iFrame) {
297 if let Some(expected) = self.close_trigger_frames.as_mut() {
298 expected.receive_frame(stream_id, &frame);
299 }
300
301 self.stream_frame_map
302 .entry(stream_id)
303 .or_default()
304 .push(frame);
305 }
306
307 pub(crate) fn close_due_to_trigger_frames(
311 &self, qconn: &mut quiche::Connection,
312 ) {
313 if let Some(ConnectionError {
314 is_app,
315 error_code,
316 reason,
317 }) = self.close_trigger_frames.as_ref().map(|tf| &tf.close_with)
318 {
319 let _ = qconn.close(*is_app, *error_code, reason);
320 }
321 }
322}
323
324#[derive(Clone, Serialize, Debug)]
334pub struct CloseTriggerFrames {
335 missing: Vec<CloseTriggerFrame>,
336 #[serde(skip)]
337 close_with: ConnectionError,
338}
339
340impl CloseTriggerFrames {
341 pub fn new(frames: Vec<CloseTriggerFrame>) -> Self {
345 Self::new_with_connection_close(frames, ConnectionError {
346 is_app: true,
347 error_code: quiche::h3::WireErrorCode::NoError as u64,
348 reason: b"saw all close trigger frames".to_vec(),
349 })
350 }
351
352 pub fn new_with_connection_close(
356 frames: Vec<CloseTriggerFrame>, close_with: ConnectionError,
357 ) -> Self {
358 Self {
359 missing: frames,
360 close_with,
361 }
362 }
363
364 fn receive_frame(&mut self, stream_id: u64, frame: &H3iFrame) {
365 for (i, trigger) in self.missing.iter_mut().enumerate() {
366 if trigger.is_equivalent(frame) && trigger.stream_id() == stream_id {
367 self.missing.remove(i);
368 break;
369 }
370 }
371 }
372
373 fn saw_all_trigger_frames(&self) -> bool {
374 self.missing.is_empty()
375 }
376
377 fn missing_triggers(&self) -> Vec<CloseTriggerFrame> {
378 self.missing.clone()
379 }
380}
381
382impl From<Vec<CloseTriggerFrame>> for CloseTriggerFrames {
383 fn from(value: Vec<CloseTriggerFrame>) -> Self {
384 Self::new(value)
385 }
386}
387
388#[derive(Debug, Default)]
390pub struct ConnectionCloseDetails {
391 peer_error: Option<ConnectionError>,
392 local_error: Option<ConnectionError>,
393 pub timed_out: bool,
395}
396
397impl ConnectionCloseDetails {
398 pub fn new(qconn: &Connection) -> Self {
399 Self {
400 peer_error: qconn.peer_error().cloned(),
401 local_error: qconn.local_error().cloned(),
402 timed_out: qconn.is_timed_out(),
403 }
404 }
405
406 pub fn peer_error(&self) -> Option<&ConnectionError> {
408 self.peer_error.as_ref()
409 }
410
411 pub fn local_error(&self) -> Option<&ConnectionError> {
413 self.local_error.as_ref()
414 }
415
416 pub fn no_err(&self) -> bool {
419 self.peer_error.is_none() && self.local_error.is_none()
420 }
421}
422
423impl Serialize for ConnectionCloseDetails {
424 fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
425 where
426 S: Serializer,
427 {
428 let mut state: <S as Serializer>::SerializeStruct =
429 s.serialize_struct("enriched_connection_error", 3)?;
430 if let Some(pe) = &self.peer_error {
431 state.serialize_field(
432 "peer_error",
433 &SerializableConnectionError(pe),
434 )?;
435 }
436
437 if let Some(le) = &self.local_error {
438 state.serialize_field(
439 "local_error",
440 &SerializableConnectionError(le),
441 )?;
442 }
443
444 state.serialize_field("timed_out", &self.timed_out)?;
445 state.end()
446 }
447}
448
449pub struct SerializablePathStats<'a>(&'a quiche::PathStats);
451
452impl Serialize for SerializablePathStats<'_> {
453 fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
454 where
455 S: Serializer,
456 {
457 let mut state = s.serialize_struct("path_stats", 17)?;
458 state.serialize_field("local_addr", &self.0.local_addr)?;
459 state.serialize_field("peer_addr", &self.0.peer_addr)?;
460 state.serialize_field("active", &self.0.active)?;
461 state.serialize_field("recv", &self.0.recv)?;
462 state.serialize_field("sent", &self.0.sent)?;
463 state.serialize_field("lost", &self.0.lost)?;
464 state.serialize_field("retrans", &self.0.retrans)?;
465 state.serialize_field("rtt", &self.0.rtt.as_secs_f64())?;
466 state.serialize_field(
467 "min_rtt",
468 &self.0.min_rtt.map(|x| x.as_secs_f64()),
469 )?;
470 state.serialize_field("rttvar", &self.0.rttvar.as_secs_f64())?;
471 state.serialize_field("cwnd", &self.0.cwnd)?;
472 state.serialize_field("sent_bytes", &self.0.sent_bytes)?;
473 state.serialize_field("recv_bytes", &self.0.recv_bytes)?;
474 state.serialize_field("lost_bytes", &self.0.lost_bytes)?;
475 state.serialize_field(
476 "stream_retrans_bytes",
477 &self.0.stream_retrans_bytes,
478 )?;
479 state.serialize_field("pmtu", &self.0.pmtu)?;
480 state.serialize_field("delivery_rate", &self.0.delivery_rate)?;
481 state.end()
482 }
483}
484
485pub struct SerializableStats<'a>(&'a quiche::Stats);
487
488impl Serialize for SerializableStats<'_> {
489 fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
490 where
491 S: Serializer,
492 {
493 let mut state = s.serialize_struct("path_stats", 14)?;
494 state.serialize_field("recv", &self.0.recv)?;
495 state.serialize_field("sent", &self.0.sent)?;
496 state.serialize_field("lost", &self.0.lost)?;
497 state.serialize_field("retrans", &self.0.retrans)?;
498 state.serialize_field("sent_bytes", &self.0.sent_bytes)?;
499 state.serialize_field("recv_bytes", &self.0.recv_bytes)?;
500 state.serialize_field("lost_bytes", &self.0.lost_bytes)?;
501 state.serialize_field(
502 "stream_retrans_bytes",
503 &self.0.stream_retrans_bytes,
504 )?;
505 state.serialize_field("paths_count", &self.0.paths_count)?;
506 state.serialize_field(
507 "reset_stream_count_local",
508 &self.0.reset_stream_count_local,
509 )?;
510 state.serialize_field(
511 "stopped_stream_count_local",
512 &self.0.stopped_stream_count_local,
513 )?;
514 state.serialize_field(
515 "reset_stream_count_remote",
516 &self.0.reset_stream_count_remote,
517 )?;
518 state.serialize_field(
519 "stopped_stream_count_remote",
520 &self.0.stopped_stream_count_remote,
521 )?;
522 state.serialize_field(
523 "path_challenge_rx_count",
524 &self.0.path_challenge_rx_count,
525 )?;
526 state.end()
527 }
528}
529
530#[derive(Clone, Debug)]
532pub struct SerializableConnectionError<'a>(&'a quiche::ConnectionError);
533
534impl Serialize for SerializableConnectionError<'_> {
535 fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
536 where
537 S: Serializer,
538 {
539 let mut state = s.serialize_struct("path_stats", 3)?;
540 state.serialize_field("is_app", &self.0.is_app)?;
541 state.serialize_field("error_code", &self.0.error_code)?;
542 let max = cmp::min(self.0.reason.len(), MAX_SERIALIZED_BUFFER_LEN);
543 state.serialize_field(
544 "reason",
545 &String::from_utf8_lossy(&self.0.reason[..max]),
546 )?;
547 state.end()
548 }
549}
550
#[cfg(test)]
mod tests {
    use super::*;
    use crate::frame::EnrichedHeaders;
    use quiche::h3::Header;

    /// Builds an `H3iFrame` from a single `hello: world` header.
    fn h3i_frame() -> H3iFrame {
        let headers = vec![Header::new(b"hello", b"world")];
        headers.into()
    }

    /// Receiving the only expected trigger leaves nothing missing.
    #[test]
    fn close_trigger_frame() {
        let frame = h3i_frame();
        let trigger = CloseTriggerFrame::new(0, frame.clone());
        let mut triggers = CloseTriggerFrames::new(vec![trigger]);

        triggers.receive_frame(0, &frame);

        assert!(triggers.saw_all_trigger_frames());
    }

    /// Receiving one of three triggers leaves the other two missing.
    #[test]
    fn trigger_frame_missing() {
        let frame = h3i_frame();
        let all_triggers = vec![
            CloseTriggerFrame::new(0, frame.clone()),
            CloseTriggerFrame::new(4, frame.clone()),
            CloseTriggerFrame::new(8, vec![Header::new(b"go", b"jets")]),
        ];
        let mut tracker = CloseTriggerFrames::new(all_triggers.clone());

        tracker.receive_frame(0, &frame);

        assert!(!tracker.saw_all_trigger_frames());
        assert_eq!(tracker.missing_triggers(), all_triggers[1..].to_vec());
    }

    /// Returns a headers frame followed by a data frame.
    fn stream_map_data() -> Vec<H3iFrame> {
        let header = Header::new(b"hello", b"world");
        let headers = H3iFrame::Headers(EnrichedHeaders::from(vec![header]));
        let data = H3iFrame::QuicheH3(quiche::h3::frame::Frame::Data {
            payload: b"hello world".to_vec(),
        });

        vec![headers, data]
    }

    /// A map built without triggers never reports all triggers as seen.
    #[test]
    fn test_stream_map_trigger_frames_with_none() {
        let stream_map: StreamMap = vec![(0, stream_map_data())].into();

        assert!(!stream_map.all_close_trigger_frames_seen());
    }

    /// Inserting one of two trigger frames leaves the other one missing.
    #[test]
    fn test_stream_map_trigger_frames() {
        let data = stream_map_data();
        let triggers: CloseTriggerFrames = vec![
            CloseTriggerFrame::new(0, data[0].clone()),
            CloseTriggerFrame::new(0, data[1].clone()),
        ]
        .into();
        let mut stream_map = StreamMap::new(Some(triggers));

        stream_map.insert(0, data[0].clone());

        assert!(!stream_map.all_close_trigger_frames_seen());

        let missing = stream_map.missing_close_trigger_frames().unwrap();
        assert_eq!(missing, vec![CloseTriggerFrame::new(0, data[1].clone())]);
    }
}