h3i/client/
connection_summary.rs

1// Copyright (C) 2024, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! Summarizes events that occurred during a connection.
28
29use quiche;
30use quiche::Connection;
31use quiche::ConnectionError;
32use quiche::PathStats;
33use quiche::Stats;
34use serde::ser::SerializeStruct;
35use serde::ser::Serializer;
36use serde::Serialize;
37use std::cmp;
38use std::collections::HashMap;
39use std::iter::FromIterator;
40
41use crate::frame::CloseTriggerFrame;
42use crate::frame::EnrichedHeaders;
43use crate::frame::H3iFrame;
44
/// Maximum length, in bytes, of any serialized element's unstructured data
/// such as a reason phrase. Longer buffers are truncated to this length before
/// being serialized.
pub const MAX_SERIALIZED_BUFFER_LEN: usize = 16384;
48
/// A summary of all frames received on a connection. There are some extra
/// fields included to provide additional context into the connection's
/// behavior.
///
/// ConnectionSummary implements [Serialize]. HTTP/3 frames that contain binary
/// payload are serialized using the qlog
/// [hexstring](https://www.ietf.org/archive/id/draft-ietf-quic-qlog-main-schema-10.html#section-1.2)
/// format - "an even-length lowercase string of hexadecimally encoded bytes
/// examples: 82dc, 027339, 4cdbfd9bf0"
///
/// In the serialized form, `conn_close_details` is emitted under the key
/// `error`, and an additional `missed_close_trigger_frames` entry lists the
/// close trigger frames that were never seen.
#[derive(Default, Debug)]
pub struct ConnectionSummary {
    /// Frames received over the connection, keyed by the ID of the stream
    /// they were received on.
    pub stream_map: StreamMap,
    /// L4 statistics received from the connection.
    pub stats: Option<Stats>,
    /// Statistics about all paths of the connection.
    pub path_stats: Vec<PathStats>,
    /// Details about why the connection closed.
    pub conn_close_details: ConnectionCloseDetails,
}
68
69impl Serialize for ConnectionSummary {
70    fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
71    where
72        S: Serializer,
73    {
74        let mut state = s.serialize_struct("path_stats", 4)?;
75        state.serialize_field("stream_map", &self.stream_map)?;
76        state.serialize_field(
77            "stats",
78            &self.stats.as_ref().map(SerializableStats),
79        )?;
80        let p: Vec<SerializablePathStats> =
81            self.path_stats.iter().map(SerializablePathStats).collect();
82        state.serialize_field("path_stats", &p)?;
83        state.serialize_field("error", &self.conn_close_details)?;
84        state.serialize_field(
85            "missed_close_trigger_frames",
86            &self.stream_map.missing_close_trigger_frames(),
87        )?;
88        state.end()
89    }
90}
91
/// A read-only aggregation of frames received over a connection, mapped to the
/// stream ID over which they were received.
///
/// [`StreamMap`] also contains the [`CloseTriggerFrames`] for the connection so
/// that its state can be updated as new frames are received.
#[derive(Clone, Debug, Default, Serialize)]
pub struct StreamMap {
    /// Received frames keyed by stream ID; each stream's frames are kept in
    /// the order they were inserted.
    stream_frame_map: HashMap<u64, Vec<H3iFrame>>,
    /// Close trigger frames tracked for this connection, or `None` if no
    /// triggers were configured.
    close_trigger_frames: Option<CloseTriggerFrames>,
}
102
103impl<T> From<T> for StreamMap
104where
105    T: IntoIterator<Item = (u64, Vec<H3iFrame>)>,
106{
107    fn from(value: T) -> Self {
108        let stream_frame_map = HashMap::from_iter(value);
109
110        Self {
111            stream_frame_map,
112            close_trigger_frames: None,
113        }
114    }
115}
116
117impl StreamMap {
118    /// Flatten all received frames into a single vector. The ordering is
119    /// non-deterministic.
120    ///
121    /// # Example
122    ///
123    /// ```
124    /// use h3i::client::connection_summary::StreamMap;
125    /// use h3i::frame::EnrichedHeaders;
126    /// use h3i::frame::H3iFrame;
127    /// use quiche::h3::Header;
128    /// use std::iter::FromIterator;
129    ///
130    /// let h = Header::new(b"hello", b"world");
131    /// let headers = H3iFrame::Headers(EnrichedHeaders::from(vec![h]));
132    ///
133    /// let stream_map: StreamMap = [(0, vec![headers.clone()])].into();
134    /// assert_eq!(stream_map.all_frames(), vec![headers]);
135    /// ```
136    pub fn all_frames(&self) -> Vec<H3iFrame> {
137        self.stream_frame_map
138            .values()
139            .flatten()
140            .map(Clone::clone)
141            .collect::<Vec<H3iFrame>>()
142    }
143
144    /// Get all frames on a given `stream_id`.
145    ///
146    /// # Example
147    ///
148    /// ```
149    /// use h3i::client::connection_summary::StreamMap;
150    /// use h3i::frame::EnrichedHeaders;
151    /// use h3i::frame::H3iFrame;
152    /// use quiche::h3::Header;
153    /// use std::iter::FromIterator;
154    ///
155    /// let mut stream_map = StreamMap::default();
156    ///
157    /// let h = Header::new(b"hello", b"world");
158    /// let headers = H3iFrame::Headers(EnrichedHeaders::from(vec![h]));
159    ///
160    /// let stream_map: StreamMap = [(0, vec![headers.clone()])].into();
161    /// assert_eq!(stream_map.stream(0), vec![headers]);
162    /// ```
163    pub fn stream(&self, stream_id: u64) -> Vec<H3iFrame> {
164        self.stream_frame_map
165            .get(&stream_id)
166            .cloned()
167            .unwrap_or_default()
168    }
169
170    /// Check if a provided [`H3iFrame`] was received, regardless of what stream
171    /// it was received on.
172    ///
173    /// # Example
174    ///
175    /// ```
176    /// use h3i::client::connection_summary::StreamMap;
177    /// use h3i::frame::EnrichedHeaders;
178    /// use h3i::frame::H3iFrame;
179    /// use quiche::h3::Header;
180    /// use std::iter::FromIterator;
181    ///
182    /// let h = Header::new(b"hello", b"world");
183    /// let headers = H3iFrame::Headers(EnrichedHeaders::from(vec![h]));
184    ///
185    /// let stream_map: StreamMap = [(0, vec![headers.clone()])].into();
186    /// assert!(stream_map.received_frame(&headers));
187    /// ```
188    pub fn received_frame(&self, frame: &H3iFrame) -> bool {
189        self.all_frames().contains(frame)
190    }
191
192    /// Check if a provided [`H3iFrame`] was received over a specified stream.
193    ///
194    /// # Example
195    ///
196    /// ```
197    /// use h3i::client::connection_summary::StreamMap;
198    /// use h3i::frame::EnrichedHeaders;
199    /// use h3i::frame::H3iFrame;
200    /// use quiche::h3::Header;
201    /// use std::iter::FromIterator;
202    ///
203    /// let h = Header::new(b"hello", b"world");
204    /// let headers = H3iFrame::Headers(EnrichedHeaders::from(vec![h]));
205    ///
206    /// let stream_map: StreamMap = [(0, vec![headers.clone()])].into();
207    /// assert!(stream_map.received_frame_on_stream(0, &headers));
208    /// ```
209    pub fn received_frame_on_stream(
210        &self, stream: u64, frame: &H3iFrame,
211    ) -> bool {
212        self.stream_frame_map
213            .get(&stream)
214            .map(|v| v.contains(frame))
215            .is_some()
216    }
217
218    /// Check if the stream map is empty, e.g., no frames were received.
219    ///
220    /// # Example
221    ///
222    /// ```
223    /// use h3i::client::connection_summary::StreamMap;
224    /// use h3i::frame::EnrichedHeaders;
225    /// use h3i::frame::H3iFrame;
226    /// use quiche::h3::Header;
227    /// use std::iter::FromIterator;
228    ///
229    /// let mut stream_map = StreamMap::default();
230    /// assert!(stream_map.is_empty());
231    ///
232    /// let h = Header::new(b"hello", b"world");
233    /// let headers = H3iFrame::Headers(EnrichedHeaders::from(vec![h]));
234    ///
235    /// let stream_map: StreamMap = [(0, vec![headers.clone()])].into();
236    /// assert!(!stream_map.is_empty());
237    /// ```
238    pub fn is_empty(&self) -> bool {
239        self.stream_frame_map.is_empty()
240    }
241
242    /// See all HEADERS received on a given stream.
243    ///
244    /// # Example
245    ///
246    /// ```
247    /// use h3i::client::connection_summary::StreamMap;
248    /// use h3i::frame::EnrichedHeaders;
249    /// use h3i::frame::H3iFrame;
250    /// use quiche::h3::Header;
251    /// use std::iter::FromIterator;
252    ///
253    /// let h = Header::new(b"hello", b"world");
254    /// let enriched = EnrichedHeaders::from(vec![h]);
255    /// let headers = H3iFrame::Headers(enriched.clone());
256    /// let data = H3iFrame::QuicheH3(quiche::h3::frame::Frame::Data {
257    ///     payload: b"hello world".to_vec(),
258    /// });
259    ///
260    /// let stream_map: StreamMap = [(0, vec![headers.clone(), data.clone()])].into();
261    /// assert_eq!(stream_map.headers_on_stream(0), vec![enriched]);
262    /// ```
263    pub fn headers_on_stream(&self, stream_id: u64) -> Vec<EnrichedHeaders> {
264        self.stream(stream_id)
265            .into_iter()
266            .filter_map(|h3i_frame| h3i_frame.to_enriched_headers())
267            .collect()
268    }
269
270    /// If all [`CloseTriggerFrame`]s were seen. If no triggers were expected,
271    /// this will return `false`.
272    pub fn all_close_trigger_frames_seen(&self) -> bool {
273        if let Some(triggers) = self.close_trigger_frames.as_ref() {
274            triggers.saw_all_trigger_frames()
275        } else {
276            false
277        }
278    }
279
280    /// The set of all [`CloseTriggerFrame`]s that were _not_ seen on the
281    /// connection. Returns `None` if
282    pub fn missing_close_trigger_frames(&self) -> Option<Vec<CloseTriggerFrame>> {
283        self.close_trigger_frames
284            .as_ref()
285            .map(|e| e.missing_triggers())
286    }
287
288    ///  Not `pub` as users aren't expected to build their own [`StreamMap`]s.
289    pub(crate) fn new(close_trigger_frames: Option<CloseTriggerFrames>) -> Self {
290        Self {
291            close_trigger_frames,
292            ..Default::default()
293        }
294    }
295
296    pub(crate) fn insert(&mut self, stream_id: u64, frame: H3iFrame) {
297        if let Some(expected) = self.close_trigger_frames.as_mut() {
298            expected.receive_frame(stream_id, &frame);
299        }
300
301        self.stream_frame_map
302            .entry(stream_id)
303            .or_default()
304            .push(frame);
305    }
306
307    /// Close a [`quiche::Connection`] with the CONNECTION_CLOSE frame specified
308    /// by [`CloseTriggerFrames`]. If no [`CloseTriggerFrames`] exist, this is a
309    /// no-op.
310    pub(crate) fn close_due_to_trigger_frames(
311        &self, qconn: &mut quiche::Connection,
312    ) {
313        if let Some(ConnectionError {
314            is_app,
315            error_code,
316            reason,
317        }) = self.close_trigger_frames.as_ref().map(|tf| &tf.close_with)
318        {
319            let _ = qconn.close(*is_app, *error_code, reason);
320        }
321    }
322}
323
/// A container for frames that h3i expects to see over a given connection. If
/// h3i receives all the frames it expects, it will send a CONNECTION_CLOSE
/// frame to the server. This bypasses the idle timeout and vastly quickens test
/// suites which depend heavily on h3i.
///
/// The specific CONNECTION_CLOSE frame can be customized by passing a
/// [`ConnectionError`] to [`Self::new_with_connection_close`]. h3i will send an
/// application CONNECTION_CLOSE frame with error code 0x100 if this struct is
/// constructed with the [`Self::new`] constructor.
#[derive(Clone, Serialize, Debug)]
pub struct CloseTriggerFrames {
    /// Trigger frames that have not been seen yet. Entries are removed as
    /// matching frames are received.
    missing: Vec<CloseTriggerFrame>,
    /// The error used to close the connection. Excluded from the serialized
    /// output.
    #[serde(skip)]
    close_with: ConnectionError,
}
339
340impl CloseTriggerFrames {
341    /// Create a new [`CloseTriggerFrames`]. If all expected frames are
342    /// received, h3i will close the connection with an application-level
343    /// CONNECTION_CLOSE frame with error code 0x100.
344    pub fn new(frames: Vec<CloseTriggerFrame>) -> Self {
345        Self::new_with_connection_close(frames, ConnectionError {
346            is_app: true,
347            error_code: quiche::h3::WireErrorCode::NoError as u64,
348            reason: b"saw all close trigger frames".to_vec(),
349        })
350    }
351
352    /// Create a new [`CloseTriggerFrames`] with a custom close frame. When all
353    /// close trigger frames are received, h3i will close the connection with
354    /// the level, error code, and reason from `close_with`.
355    pub fn new_with_connection_close(
356        frames: Vec<CloseTriggerFrame>, close_with: ConnectionError,
357    ) -> Self {
358        Self {
359            missing: frames,
360            close_with,
361        }
362    }
363
364    fn receive_frame(&mut self, stream_id: u64, frame: &H3iFrame) {
365        for (i, trigger) in self.missing.iter_mut().enumerate() {
366            if trigger.is_equivalent(frame) && trigger.stream_id() == stream_id {
367                self.missing.remove(i);
368                break;
369            }
370        }
371    }
372
373    fn saw_all_trigger_frames(&self) -> bool {
374        self.missing.is_empty()
375    }
376
377    fn missing_triggers(&self) -> Vec<CloseTriggerFrame> {
378        self.missing.clone()
379    }
380}
381
382impl From<Vec<CloseTriggerFrame>> for CloseTriggerFrames {
383    fn from(value: Vec<CloseTriggerFrame>) -> Self {
384        Self::new(value)
385    }
386}
387
/// Denotes why the connection was closed.
#[derive(Debug, Default)]
pub struct ConnectionCloseDetails {
    /// The error sent from the peer, if any.
    peer_error: Option<ConnectionError>,
    /// The error generated locally, if any.
    local_error: Option<ConnectionError>,
    /// If the connection timed out.
    pub timed_out: bool,
}
396
397impl ConnectionCloseDetails {
398    pub fn new(qconn: &Connection) -> Self {
399        Self {
400            peer_error: qconn.peer_error().cloned(),
401            local_error: qconn.local_error().cloned(),
402            timed_out: qconn.is_timed_out(),
403        }
404    }
405
406    /// The error sent from the peer, if any.
407    pub fn peer_error(&self) -> Option<&ConnectionError> {
408        self.peer_error.as_ref()
409    }
410
411    /// The error generated locally, if any.
412    pub fn local_error(&self) -> Option<&ConnectionError> {
413        self.local_error.as_ref()
414    }
415
416    /// If the connection didn't see an error, either one from the peer or
417    /// generated locally.
418    pub fn no_err(&self) -> bool {
419        self.peer_error.is_none() && self.local_error.is_none()
420    }
421}
422
423impl Serialize for ConnectionCloseDetails {
424    fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
425    where
426        S: Serializer,
427    {
428        let mut state: <S as Serializer>::SerializeStruct =
429            s.serialize_struct("enriched_connection_error", 3)?;
430        if let Some(pe) = &self.peer_error {
431            state.serialize_field(
432                "peer_error",
433                &SerializableConnectionError(pe),
434            )?;
435        }
436
437        if let Some(le) = &self.local_error {
438            state.serialize_field(
439                "local_error",
440                &SerializableConnectionError(le),
441            )?;
442        }
443
444        state.serialize_field("timed_out", &self.timed_out)?;
445        state.end()
446    }
447}
448
// Only applicable to async client
#[doc(hidden)]
/// A record that will be inserted into the [ConnectionSummary].
pub enum ConnectionRecord {
    /// A frame together with the ID of the stream it belongs to.
    StreamedFrame { stream_id: u64, frame: H3iFrame },
    /// Connection-level statistics.
    ConnectionStats(Stats),
    /// Statistics for the paths of the connection.
    PathStats(Vec<PathStats>),
    /// Details about why the connection closed.
    Close(ConnectionCloseDetails),
}
458
459/// A wrapper to help serialize [quiche::PathStats]
460pub struct SerializablePathStats<'a>(&'a quiche::PathStats);
461
462impl Serialize for SerializablePathStats<'_> {
463    fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
464    where
465        S: Serializer,
466    {
467        let mut state = s.serialize_struct("path_stats", 17)?;
468        state.serialize_field("local_addr", &self.0.local_addr)?;
469        state.serialize_field("peer_addr", &self.0.peer_addr)?;
470        state.serialize_field("active", &self.0.active)?;
471        state.serialize_field("recv", &self.0.recv)?;
472        state.serialize_field("sent", &self.0.sent)?;
473        state.serialize_field("lost", &self.0.lost)?;
474        state.serialize_field("retrans", &self.0.retrans)?;
475        state.serialize_field("rtt", &self.0.rtt.as_secs_f64())?;
476        state.serialize_field(
477            "min_rtt",
478            &self.0.min_rtt.map(|x| x.as_secs_f64()),
479        )?;
480        state.serialize_field("rttvar", &self.0.rttvar.as_secs_f64())?;
481        state.serialize_field("cwnd", &self.0.cwnd)?;
482        state.serialize_field("sent_bytes", &self.0.sent_bytes)?;
483        state.serialize_field("recv_bytes", &self.0.recv_bytes)?;
484        state.serialize_field("lost_bytes", &self.0.lost_bytes)?;
485        state.serialize_field(
486            "stream_retrans_bytes",
487            &self.0.stream_retrans_bytes,
488        )?;
489        state.serialize_field("pmtu", &self.0.pmtu)?;
490        state.serialize_field("delivery_rate", &self.0.delivery_rate)?;
491        state.end()
492    }
493}
494
495/// A wrapper to help serialize [quiche::Stats]
496pub struct SerializableStats<'a>(&'a quiche::Stats);
497
498impl Serialize for SerializableStats<'_> {
499    fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
500    where
501        S: Serializer,
502    {
503        let mut state = s.serialize_struct("path_stats", 14)?;
504        state.serialize_field("recv", &self.0.recv)?;
505        state.serialize_field("sent", &self.0.sent)?;
506        state.serialize_field("lost", &self.0.lost)?;
507        state.serialize_field("retrans", &self.0.retrans)?;
508        state.serialize_field("sent_bytes", &self.0.sent_bytes)?;
509        state.serialize_field("recv_bytes", &self.0.recv_bytes)?;
510        state.serialize_field("lost_bytes", &self.0.lost_bytes)?;
511        state.serialize_field(
512            "stream_retrans_bytes",
513            &self.0.stream_retrans_bytes,
514        )?;
515        state.serialize_field("paths_count", &self.0.paths_count)?;
516        state.serialize_field(
517            "reset_stream_count_local",
518            &self.0.reset_stream_count_local,
519        )?;
520        state.serialize_field(
521            "stopped_stream_count_local",
522            &self.0.stopped_stream_count_local,
523        )?;
524        state.serialize_field(
525            "reset_stream_count_remote",
526            &self.0.reset_stream_count_remote,
527        )?;
528        state.serialize_field(
529            "stopped_stream_count_remote",
530            &self.0.stopped_stream_count_remote,
531        )?;
532        state.serialize_field(
533            "path_challenge_rx_count",
534            &self.0.path_challenge_rx_count,
535        )?;
536        state.end()
537    }
538}
539
540/// A wrapper to help serialize a [quiche::ConnectionError]
541#[derive(Clone, Debug)]
542pub struct SerializableConnectionError<'a>(&'a quiche::ConnectionError);
543
544impl Serialize for SerializableConnectionError<'_> {
545    fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
546    where
547        S: Serializer,
548    {
549        let mut state = s.serialize_struct("path_stats", 3)?;
550        state.serialize_field("is_app", &self.0.is_app)?;
551        state.serialize_field("error_code", &self.0.error_code)?;
552        let max = cmp::min(self.0.reason.len(), MAX_SERIALIZED_BUFFER_LEN);
553        state.serialize_field(
554            "reason",
555            &String::from_utf8_lossy(&self.0.reason[..max]),
556        )?;
557        state.end()
558    }
559}
560
#[cfg(test)]
mod tests {
    use super::*;
    use crate::frame::EnrichedHeaders;
    use quiche::h3::Header;

    /// A HEADERS frame carrying the single field `hello: world`.
    fn h3i_frame() -> H3iFrame {
        let fields = vec![Header::new(b"hello", b"world")];
        fields.into()
    }

    /// Receiving the only expected trigger frame leaves nothing missing.
    #[test]
    fn close_trigger_frame() {
        let frame = h3i_frame();
        let only_trigger = CloseTriggerFrame::new(0, frame.clone());
        let mut triggers = CloseTriggerFrames::new(vec![only_trigger]);

        triggers.receive_frame(0, &frame);

        assert!(triggers.saw_all_trigger_frames());
    }

    /// Receiving one of three expected triggers leaves the other two
    /// reported as missing, in their original order.
    #[test]
    fn trigger_frame_missing() {
        let frame = h3i_frame();
        let on_stream_0 = CloseTriggerFrame::new(0, frame.clone());
        let on_stream_4 = CloseTriggerFrame::new(4, frame.clone());
        let on_stream_8 =
            CloseTriggerFrame::new(8, vec![Header::new(b"go", b"jets")]);
        let expected_frames = vec![on_stream_0, on_stream_4, on_stream_8];

        let mut expected = CloseTriggerFrames::new(expected_frames.clone());
        expected.receive_frame(0, &frame);

        assert!(!expected.saw_all_trigger_frames());

        let still_missing = expected_frames[1..].to_vec();
        assert_eq!(expected.missing_triggers(), still_missing);
    }

    /// One HEADERS frame followed by one DATA frame.
    fn stream_map_data() -> Vec<H3iFrame> {
        let fields = vec![Header::new(b"hello", b"world")];
        let headers = H3iFrame::Headers(EnrichedHeaders::from(fields));

        let payload = b"hello world".to_vec();
        let data = H3iFrame::QuicheH3(quiche::h3::frame::Frame::Data { payload });

        vec![headers, data]
    }

    /// A map built without triggers never reports them as all seen.
    #[test]
    fn test_stream_map_trigger_frames_with_none() {
        let stream_map = StreamMap::from(vec![(0, stream_map_data())]);

        assert!(!stream_map.all_close_trigger_frames_seen());
    }

    /// Inserting one of two expected frames leaves the other one missing.
    #[test]
    fn test_stream_map_trigger_frames() {
        let data = stream_map_data();
        let triggers: CloseTriggerFrames = vec![
            CloseTriggerFrame::new(0, data[0].clone()),
            CloseTriggerFrame::new(0, data[1].clone()),
        ]
        .into();
        let mut stream_map = StreamMap::new(Some(triggers));

        stream_map.insert(0, data[0].clone());

        assert!(!stream_map.all_close_trigger_frames_seen());

        let missing = stream_map.missing_close_trigger_frames().unwrap();
        assert_eq!(missing, vec![CloseTriggerFrame::new(0, data[1].clone())]);
    }
}
633        ]);
634    }
635}