h3i/client/
connection_summary.rs

1// Copyright (C) 2024, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! Summarizes events that occurred during a connection.
28
29use quiche::Connection;
30use quiche::ConnectionError;
31use quiche::PathStats;
32use quiche::Stats;
33use serde::ser::SerializeStruct;
34use serde::ser::Serializer;
35use serde::Serialize;
36use std::cmp;
37use std::collections::HashMap;
38use std::iter::FromIterator;
39
40use crate::frame::CloseTriggerFrame;
41use crate::frame::EnrichedHeaders;
42use crate::frame::H3iFrame;
43use crate::quiche;
44
45/// Maximum length of any serialized element's unstructured data such as reason
46/// phrase.
47pub const MAX_SERIALIZED_BUFFER_LEN: usize = 16384;
48
49/// A summary of all frames received on a connection. There are some extra
50/// fields included to provide additional context into the connection's
51/// behavior.
52///
53/// ConnectionSummary implements [Serialize]. HTTP/3 frames that contain binary
54/// payload are serialized using the qlog
55/// [hexstring](https://www.ietf.org/archive/id/draft-ietf-quic-qlog-main-schema-10.html#section-1.2)
56/// format - "an even-length lowercase string of hexadecimally encoded bytes
57/// examples: 82dc, 027339, 4cdbfd9bf0"
58#[derive(Default, Debug)]
59pub struct ConnectionSummary {
60    pub stream_map: StreamMap,
61    /// L4 statistics received from the connection.
62    pub stats: Option<Stats>,
63    /// Statistics about all paths of the connection.
64    pub path_stats: Vec<PathStats>,
65    /// Details about why the connection closed.
66    pub conn_close_details: ConnectionCloseDetails,
67}
68
69impl Serialize for ConnectionSummary {
70    fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
71    where
72        S: Serializer,
73    {
74        let mut state = s.serialize_struct("path_stats", 4)?;
75        state.serialize_field("stream_map", &self.stream_map)?;
76        state.serialize_field(
77            "stats",
78            &self.stats.as_ref().map(SerializableStats),
79        )?;
80        let p: Vec<SerializablePathStats> =
81            self.path_stats.iter().map(SerializablePathStats).collect();
82        state.serialize_field("path_stats", &p)?;
83        state.serialize_field("error", &self.conn_close_details)?;
84        state.serialize_field(
85            "missed_close_trigger_frames",
86            &self.stream_map.missing_close_trigger_frames(),
87        )?;
88        state.end()
89    }
90}
91
92/// A read-only aggregation of frames received over a connection, mapped to the
93/// stream ID over which they were received.
94///
95/// [`StreamMap`] also contains the [`CloseTriggerFrames`] for the connection so
96/// that its state can be updated as new frames are received.
97#[derive(Clone, Debug, Default, Serialize)]
98pub struct StreamMap {
99    stream_frame_map: HashMap<u64, Vec<H3iFrame>>,
100    close_trigger_frames: Option<CloseTriggerFrames>,
101}
102
103impl<T> From<T> for StreamMap
104where
105    T: IntoIterator<Item = (u64, Vec<H3iFrame>)>,
106{
107    fn from(value: T) -> Self {
108        let stream_frame_map = HashMap::from_iter(value);
109
110        Self {
111            stream_frame_map,
112            close_trigger_frames: None,
113        }
114    }
115}
116
117impl StreamMap {
118    /// Flatten all received frames into a single vector. The ordering is
119    /// non-deterministic.
120    ///
121    /// # Example
122    ///
123    /// ```
124    /// use h3i::client::connection_summary::StreamMap;
125    /// use h3i::frame::EnrichedHeaders;
126    /// use h3i::frame::H3iFrame;
127    /// use quiche::h3::Header;
128    /// use std::iter::FromIterator;
129    ///
130    /// let h = Header::new(b"hello", b"world");
131    /// let headers = H3iFrame::Headers(EnrichedHeaders::from(vec![h]));
132    ///
133    /// let stream_map: StreamMap = [(0, vec![headers.clone()])].into();
134    /// assert_eq!(stream_map.all_frames(), vec![headers]);
135    /// ```
136    pub fn all_frames(&self) -> Vec<H3iFrame> {
137        self.stream_frame_map
138            .values()
139            .flatten()
140            .map(Clone::clone)
141            .collect::<Vec<H3iFrame>>()
142    }
143
144    /// Get all frames on a given `stream_id`.
145    ///
146    /// # Example
147    ///
148    /// ```
149    /// use h3i::client::connection_summary::StreamMap;
150    /// use h3i::frame::EnrichedHeaders;
151    /// use h3i::frame::H3iFrame;
152    /// use quiche::h3::Header;
153    /// use std::iter::FromIterator;
154    ///
155    /// let mut stream_map = StreamMap::default();
156    ///
157    /// let h = Header::new(b"hello", b"world");
158    /// let headers = H3iFrame::Headers(EnrichedHeaders::from(vec![h]));
159    ///
160    /// let stream_map: StreamMap = [(0, vec![headers.clone()])].into();
161    /// assert_eq!(stream_map.stream(0), vec![headers]);
162    /// ```
163    pub fn stream(&self, stream_id: u64) -> Vec<H3iFrame> {
164        self.stream_frame_map
165            .get(&stream_id)
166            .cloned()
167            .unwrap_or_default()
168    }
169
170    /// Check if a provided [`H3iFrame`] was received, regardless of what stream
171    /// it was received on.
172    ///
173    /// # Example
174    ///
175    /// ```
176    /// use h3i::client::connection_summary::StreamMap;
177    /// use h3i::frame::EnrichedHeaders;
178    /// use h3i::frame::H3iFrame;
179    /// use quiche::h3::Header;
180    /// use std::iter::FromIterator;
181    ///
182    /// let h = Header::new(b"hello", b"world");
183    /// let headers = H3iFrame::Headers(EnrichedHeaders::from(vec![h]));
184    ///
185    /// let stream_map: StreamMap = [(0, vec![headers.clone()])].into();
186    /// assert!(stream_map.received_frame(&headers));
187    /// ```
188    pub fn received_frame(&self, frame: &H3iFrame) -> bool {
189        self.all_frames().contains(frame)
190    }
191
192    /// Check if a provided [`H3iFrame`] was received over a specified stream.
193    ///
194    /// # Example
195    ///
196    /// ```
197    /// use h3i::client::connection_summary::StreamMap;
198    /// use h3i::frame::EnrichedHeaders;
199    /// use h3i::frame::H3iFrame;
200    /// use quiche::h3::Header;
201    /// use std::iter::FromIterator;
202    ///
203    /// let h = Header::new(b"hello", b"world");
204    /// let headers = H3iFrame::Headers(EnrichedHeaders::from(vec![h]));
205    ///
206    /// let stream_map: StreamMap = [(0, vec![headers.clone()])].into();
207    /// assert!(stream_map.received_frame_on_stream(0, &headers));
208    /// ```
209    pub fn received_frame_on_stream(
210        &self, stream: u64, frame: &H3iFrame,
211    ) -> bool {
212        self.stream_frame_map
213            .get(&stream)
214            .map(|v| v.contains(frame))
215            .is_some()
216    }
217
218    /// Check if the stream map is empty, e.g., no frames were received.
219    ///
220    /// # Example
221    ///
222    /// ```
223    /// use h3i::client::connection_summary::StreamMap;
224    /// use h3i::frame::EnrichedHeaders;
225    /// use h3i::frame::H3iFrame;
226    /// use quiche::h3::Header;
227    /// use std::iter::FromIterator;
228    ///
229    /// let mut stream_map = StreamMap::default();
230    /// assert!(stream_map.is_empty());
231    ///
232    /// let h = Header::new(b"hello", b"world");
233    /// let headers = H3iFrame::Headers(EnrichedHeaders::from(vec![h]));
234    ///
235    /// let stream_map: StreamMap = [(0, vec![headers.clone()])].into();
236    /// assert!(!stream_map.is_empty());
237    /// ```
238    pub fn is_empty(&self) -> bool {
239        self.stream_frame_map.is_empty()
240    }
241
242    /// See all HEADERS received on a given stream.
243    ///
244    /// # Example
245    ///
246    /// ```
247    /// use h3i::client::connection_summary::StreamMap;
248    /// use h3i::frame::EnrichedHeaders;
249    /// use h3i::frame::H3iFrame;
250    /// use quiche::h3::Header;
251    /// use std::iter::FromIterator;
252    ///
253    /// let h = Header::new(b"hello", b"world");
254    /// let enriched = EnrichedHeaders::from(vec![h]);
255    /// let headers = H3iFrame::Headers(enriched.clone());
256    /// let data = H3iFrame::QuicheH3(quiche::h3::frame::Frame::Data {
257    ///     payload: b"hello world".to_vec(),
258    /// });
259    ///
260    /// let stream_map: StreamMap = [(0, vec![headers.clone(), data.clone()])].into();
261    /// assert_eq!(stream_map.headers_on_stream(0), vec![enriched]);
262    /// ```
263    pub fn headers_on_stream(&self, stream_id: u64) -> Vec<EnrichedHeaders> {
264        self.stream(stream_id)
265            .into_iter()
266            .filter_map(|h3i_frame| h3i_frame.to_enriched_headers())
267            .collect()
268    }
269
270    /// If all [`CloseTriggerFrame`]s were seen. If no triggers were expected,
271    /// this will return `false`.
272    pub fn all_close_trigger_frames_seen(&self) -> bool {
273        if let Some(triggers) = self.close_trigger_frames.as_ref() {
274            triggers.saw_all_trigger_frames()
275        } else {
276            false
277        }
278    }
279
280    /// The set of all [`CloseTriggerFrame`]s that were _not_ seen on the
281    /// connection. Returns `None` if
282    pub fn missing_close_trigger_frames(&self) -> Option<Vec<CloseTriggerFrame>> {
283        self.close_trigger_frames
284            .as_ref()
285            .map(|e| e.missing_triggers())
286    }
287
288    ///  Not `pub` as users aren't expected to build their own [`StreamMap`]s.
289    pub(crate) fn new(close_trigger_frames: Option<CloseTriggerFrames>) -> Self {
290        Self {
291            close_trigger_frames,
292            ..Default::default()
293        }
294    }
295
296    pub(crate) fn insert(&mut self, stream_id: u64, frame: H3iFrame) {
297        if let Some(expected) = self.close_trigger_frames.as_mut() {
298            expected.receive_frame(stream_id, &frame);
299        }
300
301        self.stream_frame_map
302            .entry(stream_id)
303            .or_default()
304            .push(frame);
305    }
306
307    /// Close a [`quiche::Connection`] with the CONNECTION_CLOSE frame specified
308    /// by [`CloseTriggerFrames`]. If no [`CloseTriggerFrames`] exist, this is a
309    /// no-op.
310    pub(crate) fn close_due_to_trigger_frames(
311        &self, qconn: &mut quiche::Connection,
312    ) {
313        if let Some(ConnectionError {
314            is_app,
315            error_code,
316            reason,
317        }) = self.close_trigger_frames.as_ref().map(|tf| &tf.close_with)
318        {
319            let _ = qconn.close(*is_app, *error_code, reason);
320        }
321    }
322}
323
324/// A container for frames that h3i expects to see over a given connection. If
325/// h3i receives all the frames it expects, it will send a CONNECTION_CLOSE
326/// frame to the server. This bypasses the idle timeout and vastly quickens test
327/// suites which depend heavily on h3i.
328///
329/// The specific CONNECTION_CLOSE frame can be customized by passing a
330/// [`ConnectionError`] to [`Self::new_with_connection_close`]. h3i will send an
331/// application CONNECTION_CLOSE frame with error code 0x100 if this struct is
332/// constructed with the [`Self::new`] constructor.
333#[derive(Clone, Serialize, Debug)]
334pub struct CloseTriggerFrames {
335    missing: Vec<CloseTriggerFrame>,
336    #[serde(skip)]
337    close_with: ConnectionError,
338}
339
340impl CloseTriggerFrames {
341    /// Create a new [`CloseTriggerFrames`]. If all expected frames are
342    /// received, h3i will close the connection with an application-level
343    /// CONNECTION_CLOSE frame with error code 0x100.
344    pub fn new(frames: Vec<CloseTriggerFrame>) -> Self {
345        Self::new_with_connection_close(frames, ConnectionError {
346            is_app: true,
347            error_code: quiche::h3::WireErrorCode::NoError as u64,
348            reason: b"saw all close trigger frames".to_vec(),
349        })
350    }
351
352    /// Create a new [`CloseTriggerFrames`] with a custom close frame. When all
353    /// close trigger frames are received, h3i will close the connection with
354    /// the level, error code, and reason from `close_with`.
355    pub fn new_with_connection_close(
356        frames: Vec<CloseTriggerFrame>, close_with: ConnectionError,
357    ) -> Self {
358        Self {
359            missing: frames,
360            close_with,
361        }
362    }
363
364    fn receive_frame(&mut self, stream_id: u64, frame: &H3iFrame) {
365        for (i, trigger) in self.missing.iter_mut().enumerate() {
366            if trigger.is_equivalent(frame) && trigger.stream_id() == stream_id {
367                self.missing.remove(i);
368                break;
369            }
370        }
371    }
372
373    fn saw_all_trigger_frames(&self) -> bool {
374        self.missing.is_empty()
375    }
376
377    fn missing_triggers(&self) -> Vec<CloseTriggerFrame> {
378        self.missing.clone()
379    }
380}
381
382impl From<Vec<CloseTriggerFrame>> for CloseTriggerFrames {
383    fn from(value: Vec<CloseTriggerFrame>) -> Self {
384        Self::new(value)
385    }
386}
387
388/// Denotes why the connection was closed.
389#[derive(Default)]
390pub struct ConnectionCloseDetails {
391    peer_error: Option<ConnectionError>,
392    local_error: Option<ConnectionError>,
393    /// If the connection timed out.
394    pub timed_out: bool,
395    /// Return the session from the underlying connection.
396    pub session: Option<Vec<u8>>,
397}
398
399impl core::fmt::Debug for ConnectionCloseDetails {
400    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
401        // Avoid printing 'session' since it contains connection secrets.
402        f.debug_struct("ConnectionCloseDetails")
403            .field("peer_error", &self.peer_error)
404            .field("local_error", &self.local_error)
405            .field("timed_out", &self.timed_out)
406            .finish()
407    }
408}
409
410impl ConnectionCloseDetails {
411    pub fn new(qconn: &Connection) -> Self {
412        let session = qconn.session().map(|s| s.to_vec());
413        Self {
414            peer_error: qconn.peer_error().cloned(),
415            local_error: qconn.local_error().cloned(),
416            timed_out: qconn.is_timed_out(),
417            session,
418        }
419    }
420
421    /// The error sent from the peer, if any.
422    pub fn peer_error(&self) -> Option<&ConnectionError> {
423        self.peer_error.as_ref()
424    }
425
426    /// The error generated locally, if any.
427    pub fn local_error(&self) -> Option<&ConnectionError> {
428        self.local_error.as_ref()
429    }
430
431    /// If the connection didn't see an error, either one from the peer or
432    /// generated locally.
433    pub fn no_err(&self) -> bool {
434        self.peer_error.is_none() && self.local_error.is_none()
435    }
436}
437
438impl Serialize for ConnectionCloseDetails {
439    fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
440    where
441        S: Serializer,
442    {
443        let mut state: <S as Serializer>::SerializeStruct =
444            s.serialize_struct("enriched_connection_error", 3)?;
445        if let Some(pe) = &self.peer_error {
446            state.serialize_field(
447                "peer_error",
448                &SerializableConnectionError(pe),
449            )?;
450        }
451
452        if let Some(le) = &self.local_error {
453            state.serialize_field(
454                "local_error",
455                &SerializableConnectionError(le),
456            )?;
457        }
458
459        state.serialize_field("timed_out", &self.timed_out)?;
460        state.end()
461    }
462}
463
464/// A wrapper to help serialize [quiche::PathStats]
465pub struct SerializablePathStats<'a>(&'a quiche::PathStats);
466
467impl Serialize for SerializablePathStats<'_> {
468    fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
469    where
470        S: Serializer,
471    {
472        let mut state = s.serialize_struct("path_stats", 17)?;
473        state.serialize_field("local_addr", &self.0.local_addr)?;
474        state.serialize_field("peer_addr", &self.0.peer_addr)?;
475        state.serialize_field("active", &self.0.active)?;
476        state.serialize_field("recv", &self.0.recv)?;
477        state.serialize_field("sent", &self.0.sent)?;
478        state.serialize_field("lost", &self.0.lost)?;
479        state.serialize_field("retrans", &self.0.retrans)?;
480        state.serialize_field("rtt", &self.0.rtt.as_secs_f64())?;
481        state.serialize_field(
482            "min_rtt",
483            &self.0.min_rtt.map(|x| x.as_secs_f64()),
484        )?;
485        state.serialize_field("rttvar", &self.0.rttvar.as_secs_f64())?;
486        state.serialize_field("cwnd", &self.0.cwnd)?;
487        state.serialize_field("sent_bytes", &self.0.sent_bytes)?;
488        state.serialize_field("recv_bytes", &self.0.recv_bytes)?;
489        state.serialize_field("lost_bytes", &self.0.lost_bytes)?;
490        state.serialize_field(
491            "stream_retrans_bytes",
492            &self.0.stream_retrans_bytes,
493        )?;
494        state.serialize_field("pmtu", &self.0.pmtu)?;
495        state.serialize_field("delivery_rate", &self.0.delivery_rate)?;
496        state.end()
497    }
498}
499
500/// A wrapper to help serialize [quiche::Stats]
501pub struct SerializableStats<'a>(&'a quiche::Stats);
502
503impl Serialize for SerializableStats<'_> {
504    fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
505    where
506        S: Serializer,
507    {
508        let mut state = s.serialize_struct("path_stats", 14)?;
509        state.serialize_field("recv", &self.0.recv)?;
510        state.serialize_field("sent", &self.0.sent)?;
511        state.serialize_field("lost", &self.0.lost)?;
512        state.serialize_field("retrans", &self.0.retrans)?;
513        state.serialize_field("sent_bytes", &self.0.sent_bytes)?;
514        state.serialize_field("recv_bytes", &self.0.recv_bytes)?;
515        state.serialize_field("lost_bytes", &self.0.lost_bytes)?;
516        state.serialize_field(
517            "stream_retrans_bytes",
518            &self.0.stream_retrans_bytes,
519        )?;
520        state.serialize_field("paths_count", &self.0.paths_count)?;
521        state.serialize_field(
522            "reset_stream_count_local",
523            &self.0.reset_stream_count_local,
524        )?;
525        state.serialize_field(
526            "stopped_stream_count_local",
527            &self.0.stopped_stream_count_local,
528        )?;
529        state.serialize_field(
530            "reset_stream_count_remote",
531            &self.0.reset_stream_count_remote,
532        )?;
533        state.serialize_field(
534            "stopped_stream_count_remote",
535            &self.0.stopped_stream_count_remote,
536        )?;
537        state.serialize_field(
538            "path_challenge_rx_count",
539            &self.0.path_challenge_rx_count,
540        )?;
541        state.end()
542    }
543}
544
545/// A wrapper to help serialize a [quiche::ConnectionError]
546#[derive(Clone, Debug)]
547pub struct SerializableConnectionError<'a>(&'a quiche::ConnectionError);
548
549impl Serialize for SerializableConnectionError<'_> {
550    fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
551    where
552        S: Serializer,
553    {
554        let mut state = s.serialize_struct("path_stats", 3)?;
555        state.serialize_field("is_app", &self.0.is_app)?;
556        state.serialize_field("error_code", &self.0.error_code)?;
557        let max = cmp::min(self.0.reason.len(), MAX_SERIALIZED_BUFFER_LEN);
558        state.serialize_field(
559            "reason",
560            &String::from_utf8_lossy(&self.0.reason[..max]),
561        )?;
562        state.end()
563    }
564}
565
566#[cfg(test)]
567mod tests {
568    use super::*;
569    use crate::frame::EnrichedHeaders;
570    use quiche::h3::Header;
571
572    fn h3i_frame() -> H3iFrame {
573        vec![Header::new(b"hello", b"world")].into()
574    }
575
576    #[test]
577    fn close_trigger_frame() {
578        let frame = h3i_frame();
579        let mut triggers = CloseTriggerFrames::new(vec![CloseTriggerFrame::new(
580            0,
581            frame.clone(),
582        )]);
583
584        triggers.receive_frame(0, &frame);
585
586        assert!(triggers.saw_all_trigger_frames());
587    }
588
589    #[test]
590    fn trigger_frame_missing() {
591        let frame = h3i_frame();
592        let expected_frames = vec![
593            CloseTriggerFrame::new(0, frame.clone()),
594            CloseTriggerFrame::new(4, frame.clone()),
595            CloseTriggerFrame::new(8, vec![Header::new(b"go", b"jets")]),
596        ];
597        let mut expected = CloseTriggerFrames::new(expected_frames.clone());
598
599        expected.receive_frame(0, &frame);
600
601        assert!(!expected.saw_all_trigger_frames());
602        assert_eq!(expected.missing_triggers(), expected_frames[1..].to_vec());
603    }
604
605    fn stream_map_data() -> Vec<H3iFrame> {
606        let headers =
607            H3iFrame::Headers(EnrichedHeaders::from(vec![Header::new(
608                b"hello", b"world",
609            )]));
610        let data = H3iFrame::QuicheH3(quiche::h3::frame::Frame::Data {
611            payload: b"hello world".to_vec(),
612        });
613
614        vec![headers, data]
615    }
616
617    #[test]
618    fn test_stream_map_trigger_frames_with_none() {
619        let stream_map: StreamMap = vec![(0, stream_map_data())].into();
620        assert!(!stream_map.all_close_trigger_frames_seen());
621    }
622
623    #[test]
624    fn test_stream_map_trigger_frames() {
625        let data = stream_map_data();
626        let mut stream_map = StreamMap::new(Some(
627            vec![
628                CloseTriggerFrame::new(0, data[0].clone()),
629                CloseTriggerFrame::new(0, data[1].clone()),
630            ]
631            .into(),
632        ));
633
634        stream_map.insert(0, data[0].clone());
635        assert!(!stream_map.all_close_trigger_frames_seen());
636        assert_eq!(stream_map.missing_close_trigger_frames().unwrap(), vec![
637            CloseTriggerFrame::new(0, data[1].clone())
638        ]);
639    }
640}