Skip to main content

h3i/client/
connection_summary.rs

1// Copyright (C) 2024, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! Summarizes events that occurred during a connection.
28
29use quiche::ConnectionError;
30use quiche::PathStats;
31use quiche::Stats;
32use serde::ser::SerializeStruct;
33use serde::ser::Serializer;
34use serde::Serialize;
35use std::cmp;
36use std::collections::HashMap;
37use std::iter::FromIterator;
38
39use crate::frame::CloseTriggerFrame;
40use crate::frame::EnrichedHeaders;
41use crate::frame::H3iFrame;
42use crate::quiche;
43
44/// Maximum length of any serialized element's unstructured data such as reason
45/// phrase.
46pub const MAX_SERIALIZED_BUFFER_LEN: usize = 16384;
47
48/// A summary of all frames received on a connection. There are some extra
49/// fields included to provide additional context into the connection's
50/// behavior.
51///
52/// ConnectionSummary implements [Serialize]. HTTP/3 frames that contain binary
53/// payload are serialized using the qlog
54/// [hexstring](https://www.ietf.org/archive/id/draft-ietf-quic-qlog-main-schema-10.html#section-1.2)
55/// format - "an even-length lowercase string of hexadecimally encoded bytes
56/// examples: 82dc, 027339, 4cdbfd9bf0"
57#[derive(Default, Debug)]
58pub struct ConnectionSummary {
59    pub stream_map: StreamMap,
60    /// L4 statistics received from the connection.
61    pub stats: Option<Stats>,
62    /// Statistics about all paths of the connection.
63    pub path_stats: Vec<PathStats>,
64    /// Details about why the connection closed.
65    pub conn_close_details: ConnectionCloseDetails,
66}
67
68impl Serialize for ConnectionSummary {
69    fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
70    where
71        S: Serializer,
72    {
73        let mut state = s.serialize_struct("path_stats", 4)?;
74        state.serialize_field("stream_map", &self.stream_map)?;
75        state.serialize_field(
76            "stats",
77            &self.stats.as_ref().map(SerializableStats),
78        )?;
79        let p: Vec<SerializablePathStats> =
80            self.path_stats.iter().map(SerializablePathStats).collect();
81        state.serialize_field("path_stats", &p)?;
82        state.serialize_field("error", &self.conn_close_details)?;
83        state.serialize_field(
84            "missed_close_trigger_frames",
85            &self.stream_map.missing_close_trigger_frames(),
86        )?;
87        state.end()
88    }
89}
90
91/// A read-only aggregation of frames received over a connection, mapped to the
92/// stream ID over which they were received.
93///
94/// [`StreamMap`] also contains the [`CloseTriggerFrames`] for the connection so
95/// that its state can be updated as new frames are received.
96#[derive(Clone, Debug, Default, Serialize)]
97pub struct StreamMap {
98    stream_frame_map: HashMap<u64, Vec<H3iFrame>>,
99    close_trigger_frames: Option<CloseTriggerFrames>,
100}
101
102impl<T> From<T> for StreamMap
103where
104    T: IntoIterator<Item = (u64, Vec<H3iFrame>)>,
105{
106    fn from(value: T) -> Self {
107        let stream_frame_map = HashMap::from_iter(value);
108
109        Self {
110            stream_frame_map,
111            close_trigger_frames: None,
112        }
113    }
114}
115
116impl StreamMap {
117    /// Flatten all received frames into a single vector. The ordering is
118    /// non-deterministic.
119    ///
120    /// # Example
121    ///
122    /// ```
123    /// use h3i::client::connection_summary::StreamMap;
124    /// use h3i::frame::EnrichedHeaders;
125    /// use h3i::frame::H3iFrame;
126    /// use quiche::h3::Header;
127    /// use std::iter::FromIterator;
128    ///
129    /// let h = Header::new(b"hello", b"world");
130    /// let headers = H3iFrame::Headers(EnrichedHeaders::from(vec![h]));
131    ///
132    /// let stream_map: StreamMap = [(0, vec![headers.clone()])].into();
133    /// assert_eq!(stream_map.all_frames(), vec![headers]);
134    /// ```
135    pub fn all_frames(&self) -> Vec<H3iFrame> {
136        self.stream_frame_map
137            .values()
138            .flatten()
139            .map(Clone::clone)
140            .collect::<Vec<H3iFrame>>()
141    }
142
143    /// Get all frames on a given `stream_id`.
144    ///
145    /// # Example
146    ///
147    /// ```
148    /// use h3i::client::connection_summary::StreamMap;
149    /// use h3i::frame::EnrichedHeaders;
150    /// use h3i::frame::H3iFrame;
151    /// use quiche::h3::Header;
152    /// use std::iter::FromIterator;
153    ///
154    /// let mut stream_map = StreamMap::default();
155    ///
156    /// let h = Header::new(b"hello", b"world");
157    /// let headers = H3iFrame::Headers(EnrichedHeaders::from(vec![h]));
158    ///
159    /// let stream_map: StreamMap = [(0, vec![headers.clone()])].into();
160    /// assert_eq!(stream_map.stream(0), vec![headers]);
161    /// ```
162    pub fn stream(&self, stream_id: u64) -> Vec<H3iFrame> {
163        self.stream_frame_map
164            .get(&stream_id)
165            .cloned()
166            .unwrap_or_default()
167    }
168
169    /// Check if a provided [`H3iFrame`] was received, regardless of what stream
170    /// it was received on.
171    ///
172    /// # Example
173    ///
174    /// ```
175    /// use h3i::client::connection_summary::StreamMap;
176    /// use h3i::frame::EnrichedHeaders;
177    /// use h3i::frame::H3iFrame;
178    /// use quiche::h3::Header;
179    /// use std::iter::FromIterator;
180    ///
181    /// let h = Header::new(b"hello", b"world");
182    /// let headers = H3iFrame::Headers(EnrichedHeaders::from(vec![h]));
183    ///
184    /// let stream_map: StreamMap = [(0, vec![headers.clone()])].into();
185    /// assert!(stream_map.received_frame(&headers));
186    /// ```
187    pub fn received_frame(&self, frame: &H3iFrame) -> bool {
188        self.all_frames().contains(frame)
189    }
190
191    /// Check if a provided [`H3iFrame`] was received over a specified stream.
192    ///
193    /// # Example
194    ///
195    /// ```
196    /// use h3i::client::connection_summary::StreamMap;
197    /// use h3i::frame::EnrichedHeaders;
198    /// use h3i::frame::H3iFrame;
199    /// use quiche::h3::Header;
200    /// use std::iter::FromIterator;
201    ///
202    /// let h = Header::new(b"hello", b"world");
203    /// let headers = H3iFrame::Headers(EnrichedHeaders::from(vec![h]));
204    ///
205    /// let stream_map: StreamMap = [(0, vec![headers.clone()])].into();
206    /// assert!(stream_map.received_frame_on_stream(0, &headers));
207    /// ```
208    pub fn received_frame_on_stream(
209        &self, stream: u64, frame: &H3iFrame,
210    ) -> bool {
211        self.stream_frame_map
212            .get(&stream)
213            .map(|v| v.contains(frame))
214            .is_some()
215    }
216
217    /// Check if the stream map is empty, e.g., no frames were received.
218    ///
219    /// # Example
220    ///
221    /// ```
222    /// use h3i::client::connection_summary::StreamMap;
223    /// use h3i::frame::EnrichedHeaders;
224    /// use h3i::frame::H3iFrame;
225    /// use quiche::h3::Header;
226    /// use std::iter::FromIterator;
227    ///
228    /// let mut stream_map = StreamMap::default();
229    /// assert!(stream_map.is_empty());
230    ///
231    /// let h = Header::new(b"hello", b"world");
232    /// let headers = H3iFrame::Headers(EnrichedHeaders::from(vec![h]));
233    ///
234    /// let stream_map: StreamMap = [(0, vec![headers.clone()])].into();
235    /// assert!(!stream_map.is_empty());
236    /// ```
237    pub fn is_empty(&self) -> bool {
238        self.stream_frame_map.is_empty()
239    }
240
241    /// See all HEADERS received on a given stream.
242    ///
243    /// # Example
244    ///
245    /// ```
246    /// use h3i::client::connection_summary::StreamMap;
247    /// use h3i::frame::EnrichedHeaders;
248    /// use h3i::frame::H3iFrame;
249    /// use quiche::h3::Header;
250    /// use std::iter::FromIterator;
251    ///
252    /// let h = Header::new(b"hello", b"world");
253    /// let enriched = EnrichedHeaders::from(vec![h]);
254    /// let headers = H3iFrame::Headers(enriched.clone());
255    /// let data = H3iFrame::QuicheH3(quiche::h3::frame::Frame::Data {
256    ///     payload: b"hello world".to_vec(),
257    /// });
258    ///
259    /// let stream_map: StreamMap = [(0, vec![headers.clone(), data.clone()])].into();
260    /// assert_eq!(stream_map.headers_on_stream(0), vec![enriched]);
261    /// ```
262    pub fn headers_on_stream(&self, stream_id: u64) -> Vec<EnrichedHeaders> {
263        self.stream(stream_id)
264            .into_iter()
265            .filter_map(|h3i_frame| h3i_frame.to_enriched_headers())
266            .collect()
267    }
268
269    /// If all [`CloseTriggerFrame`]s were seen. If no triggers were expected,
270    /// this will return `false`.
271    pub fn all_close_trigger_frames_seen(&self) -> bool {
272        if let Some(triggers) = self.close_trigger_frames.as_ref() {
273            triggers.saw_all_trigger_frames()
274        } else {
275            false
276        }
277    }
278
279    /// The set of all [`CloseTriggerFrame`]s that were _not_ seen on the
280    /// connection. Returns `None` if
281    pub fn missing_close_trigger_frames(&self) -> Option<Vec<CloseTriggerFrame>> {
282        self.close_trigger_frames
283            .as_ref()
284            .map(|e| e.missing_triggers())
285    }
286
287    ///  Not `pub` as users aren't expected to build their own [`StreamMap`]s.
288    pub(crate) fn new(close_trigger_frames: Option<CloseTriggerFrames>) -> Self {
289        Self {
290            close_trigger_frames,
291            ..Default::default()
292        }
293    }
294
295    pub(crate) fn insert(&mut self, stream_id: u64, frame: H3iFrame) {
296        if let Some(expected) = self.close_trigger_frames.as_mut() {
297            expected.receive_frame(stream_id, &frame);
298        }
299
300        self.stream_frame_map
301            .entry(stream_id)
302            .or_default()
303            .push(frame);
304    }
305
306    /// Close a [`quiche::Connection`] with the CONNECTION_CLOSE frame specified
307    /// by [`CloseTriggerFrames`]. If no [`CloseTriggerFrames`] exist, this is a
308    /// no-op.
309    pub(crate) fn close_due_to_trigger_frames(
310        &self, qconn: &mut quiche::Connection,
311    ) {
312        if let Some(ConnectionError {
313            is_app,
314            error_code,
315            reason,
316        }) = self.close_trigger_frames.as_ref().map(|tf| &tf.close_with)
317        {
318            let _ = qconn.close(*is_app, *error_code, reason);
319        }
320    }
321}
322
323/// A container for frames that h3i expects to see over a given connection. If
324/// h3i receives all the frames it expects, it will send a CONNECTION_CLOSE
325/// frame to the server. This bypasses the idle timeout and vastly quickens test
326/// suites which depend heavily on h3i.
327///
328/// The specific CONNECTION_CLOSE frame can be customized by passing a
329/// [`ConnectionError`] to [`Self::new_with_connection_close`]. h3i will send an
330/// application CONNECTION_CLOSE frame with error code 0x100 if this struct is
331/// constructed with the [`Self::new`] constructor.
332#[derive(Clone, Serialize, Debug)]
333pub struct CloseTriggerFrames {
334    missing: Vec<CloseTriggerFrame>,
335    #[serde(skip)]
336    close_with: ConnectionError,
337}
338
339impl CloseTriggerFrames {
340    /// Create a new [`CloseTriggerFrames`]. If all expected frames are
341    /// received, h3i will close the connection with an application-level
342    /// CONNECTION_CLOSE frame with error code 0x100.
343    pub fn new(frames: Vec<CloseTriggerFrame>) -> Self {
344        Self::new_with_connection_close(frames, ConnectionError {
345            is_app: true,
346            error_code: quiche::h3::WireErrorCode::NoError as u64,
347            reason: b"saw all close trigger frames".to_vec(),
348        })
349    }
350
351    /// Create a new [`CloseTriggerFrames`] with a custom close frame. When all
352    /// close trigger frames are received, h3i will close the connection with
353    /// the level, error code, and reason from `close_with`.
354    pub fn new_with_connection_close(
355        frames: Vec<CloseTriggerFrame>, close_with: ConnectionError,
356    ) -> Self {
357        Self {
358            missing: frames,
359            close_with,
360        }
361    }
362
363    fn receive_frame(&mut self, stream_id: u64, frame: &H3iFrame) {
364        for (i, trigger) in self.missing.iter_mut().enumerate() {
365            if trigger.is_equivalent(frame) && trigger.stream_id() == stream_id {
366                self.missing.remove(i);
367                break;
368            }
369        }
370    }
371
372    fn saw_all_trigger_frames(&self) -> bool {
373        self.missing.is_empty()
374    }
375
376    fn missing_triggers(&self) -> Vec<CloseTriggerFrame> {
377        self.missing.clone()
378    }
379}
380
381impl From<Vec<CloseTriggerFrame>> for CloseTriggerFrames {
382    fn from(value: Vec<CloseTriggerFrame>) -> Self {
383        Self::new(value)
384    }
385}
386
387/// Denotes why the connection was closed.
388#[derive(Default)]
389pub struct ConnectionCloseDetails {
390    peer_error: Option<ConnectionError>,
391    local_error: Option<ConnectionError>,
392    /// If the connection timed out.
393    pub timed_out: bool,
394    /// Return the session from the underlying connection.
395    pub session: Option<Vec<u8>>,
396}
397
398impl core::fmt::Debug for ConnectionCloseDetails {
399    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
400        // Avoid printing 'session' since it contains connection secrets.
401        f.debug_struct("ConnectionCloseDetails")
402            .field("peer_error", &self.peer_error)
403            .field("local_error", &self.local_error)
404            .field("timed_out", &self.timed_out)
405            .finish()
406    }
407}
408
409impl ConnectionCloseDetails {
410    pub fn new<F: quiche::BufFactory>(qconn: &quiche::Connection<F>) -> Self {
411        let session = qconn.session().map(|s| s.to_vec());
412        Self {
413            peer_error: qconn.peer_error().cloned(),
414            local_error: qconn.local_error().cloned(),
415            timed_out: qconn.is_timed_out(),
416            session,
417        }
418    }
419
420    /// The error sent from the peer, if any.
421    pub fn peer_error(&self) -> Option<&ConnectionError> {
422        self.peer_error.as_ref()
423    }
424
425    /// The error generated locally, if any.
426    pub fn local_error(&self) -> Option<&ConnectionError> {
427        self.local_error.as_ref()
428    }
429
430    /// If the connection didn't see an error, either one from the peer or
431    /// generated locally.
432    pub fn no_err(&self) -> bool {
433        self.peer_error.is_none() && self.local_error.is_none()
434    }
435}
436
437impl Serialize for ConnectionCloseDetails {
438    fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
439    where
440        S: Serializer,
441    {
442        let mut state: <S as Serializer>::SerializeStruct =
443            s.serialize_struct("enriched_connection_error", 3)?;
444        if let Some(pe) = &self.peer_error {
445            state.serialize_field(
446                "peer_error",
447                &SerializableConnectionError(pe),
448            )?;
449        }
450
451        if let Some(le) = &self.local_error {
452            state.serialize_field(
453                "local_error",
454                &SerializableConnectionError(le),
455            )?;
456        }
457
458        state.serialize_field("timed_out", &self.timed_out)?;
459        state.end()
460    }
461}
462
463/// A wrapper to help serialize [quiche::PathStats]
464pub struct SerializablePathStats<'a>(&'a quiche::PathStats);
465
466impl Serialize for SerializablePathStats<'_> {
467    fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
468    where
469        S: Serializer,
470    {
471        let mut state = s.serialize_struct("path_stats", 17)?;
472        state.serialize_field("local_addr", &self.0.local_addr)?;
473        state.serialize_field("peer_addr", &self.0.peer_addr)?;
474        state.serialize_field("active", &self.0.active)?;
475        state.serialize_field("recv", &self.0.recv)?;
476        state.serialize_field("sent", &self.0.sent)?;
477        state.serialize_field("lost", &self.0.lost)?;
478        state.serialize_field("retrans", &self.0.retrans)?;
479        state.serialize_field("rtt", &self.0.rtt.as_secs_f64())?;
480        state.serialize_field(
481            "min_rtt",
482            &self.0.min_rtt.map(|x| x.as_secs_f64()),
483        )?;
484        state.serialize_field("rttvar", &self.0.rttvar.as_secs_f64())?;
485        state.serialize_field("cwnd", &self.0.cwnd)?;
486        state.serialize_field("sent_bytes", &self.0.sent_bytes)?;
487        state.serialize_field("recv_bytes", &self.0.recv_bytes)?;
488        state.serialize_field("lost_bytes", &self.0.lost_bytes)?;
489        state.serialize_field(
490            "stream_retrans_bytes",
491            &self.0.stream_retrans_bytes,
492        )?;
493        state.serialize_field("pmtu", &self.0.pmtu)?;
494        state.serialize_field("delivery_rate", &self.0.delivery_rate)?;
495        state.end()
496    }
497}
498
499/// A wrapper to help serialize [quiche::Stats]
500pub struct SerializableStats<'a>(&'a quiche::Stats);
501
502impl Serialize for SerializableStats<'_> {
503    fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
504    where
505        S: Serializer,
506    {
507        let mut state = s.serialize_struct("path_stats", 14)?;
508        state.serialize_field("recv", &self.0.recv)?;
509        state.serialize_field("sent", &self.0.sent)?;
510        state.serialize_field("lost", &self.0.lost)?;
511        state.serialize_field("retrans", &self.0.retrans)?;
512        state.serialize_field("sent_bytes", &self.0.sent_bytes)?;
513        state.serialize_field("recv_bytes", &self.0.recv_bytes)?;
514        state.serialize_field("lost_bytes", &self.0.lost_bytes)?;
515        state.serialize_field(
516            "stream_retrans_bytes",
517            &self.0.stream_retrans_bytes,
518        )?;
519        state.serialize_field("paths_count", &self.0.paths_count)?;
520        state.serialize_field(
521            "reset_stream_count_local",
522            &self.0.reset_stream_count_local,
523        )?;
524        state.serialize_field(
525            "stopped_stream_count_local",
526            &self.0.stopped_stream_count_local,
527        )?;
528        state.serialize_field(
529            "reset_stream_count_remote",
530            &self.0.reset_stream_count_remote,
531        )?;
532        state.serialize_field(
533            "stopped_stream_count_remote",
534            &self.0.stopped_stream_count_remote,
535        )?;
536        state.serialize_field(
537            "path_challenge_rx_count",
538            &self.0.path_challenge_rx_count,
539        )?;
540        state.end()
541    }
542}
543
544/// A wrapper to help serialize a [quiche::ConnectionError]
545#[derive(Clone, Debug)]
546pub struct SerializableConnectionError<'a>(&'a quiche::ConnectionError);
547
548impl Serialize for SerializableConnectionError<'_> {
549    fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
550    where
551        S: Serializer,
552    {
553        let mut state = s.serialize_struct("path_stats", 3)?;
554        state.serialize_field("is_app", &self.0.is_app)?;
555        state.serialize_field("error_code", &self.0.error_code)?;
556        let max = cmp::min(self.0.reason.len(), MAX_SERIALIZED_BUFFER_LEN);
557        state.serialize_field(
558            "reason",
559            &String::from_utf8_lossy(&self.0.reason[..max]),
560        )?;
561        state.end()
562    }
563}
564
565#[cfg(test)]
566mod tests {
567    use super::*;
568    use crate::frame::EnrichedHeaders;
569    use quiche::h3::Header;
570
571    fn h3i_frame() -> H3iFrame {
572        vec![Header::new(b"hello", b"world")].into()
573    }
574
575    #[test]
576    fn close_trigger_frame() {
577        let frame = h3i_frame();
578        let mut triggers = CloseTriggerFrames::new(vec![CloseTriggerFrame::new(
579            0,
580            frame.clone(),
581        )]);
582
583        triggers.receive_frame(0, &frame);
584
585        assert!(triggers.saw_all_trigger_frames());
586    }
587
588    #[test]
589    fn trigger_frame_missing() {
590        let frame = h3i_frame();
591        let expected_frames = vec![
592            CloseTriggerFrame::new(0, frame.clone()),
593            CloseTriggerFrame::new(4, frame.clone()),
594            CloseTriggerFrame::new(8, vec![Header::new(b"go", b"jets")]),
595        ];
596        let mut expected = CloseTriggerFrames::new(expected_frames.clone());
597
598        expected.receive_frame(0, &frame);
599
600        assert!(!expected.saw_all_trigger_frames());
601        assert_eq!(expected.missing_triggers(), expected_frames[1..].to_vec());
602    }
603
604    fn stream_map_data() -> Vec<H3iFrame> {
605        let headers =
606            H3iFrame::Headers(EnrichedHeaders::from(vec![Header::new(
607                b"hello", b"world",
608            )]));
609        let data = H3iFrame::QuicheH3(quiche::h3::frame::Frame::Data {
610            payload: b"hello world".to_vec(),
611        });
612
613        vec![headers, data]
614    }
615
616    #[test]
617    fn test_stream_map_trigger_frames_with_none() {
618        let stream_map: StreamMap = vec![(0, stream_map_data())].into();
619        assert!(!stream_map.all_close_trigger_frames_seen());
620    }
621
622    #[test]
623    fn test_stream_map_trigger_frames() {
624        let data = stream_map_data();
625        let mut stream_map = StreamMap::new(Some(
626            vec![
627                CloseTriggerFrame::new(0, data[0].clone()),
628                CloseTriggerFrame::new(0, data[1].clone()),
629            ]
630            .into(),
631        ));
632
633        stream_map.insert(0, data[0].clone());
634        assert!(!stream_map.all_close_trigger_frames_seen());
635        assert_eq!(stream_map.missing_close_trigger_frames().unwrap(), vec![
636            CloseTriggerFrame::new(0, data[1].clone())
637        ]);
638    }
639}