qlog/
streamer.rs

1// Copyright (C) 2021, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use crate::events::EventData;
28use crate::events::EventImportance;
29use crate::events::EventType;
30use crate::events::Eventable;
31use crate::events::ExData;
32
33/// A helper object specialized for streaming JSON-serialized qlog to a
34/// [`Write`] trait.
35///
36/// The object is responsible for the `Qlog` object that contains the
37/// provided `Trace`.
38///
39/// Serialization is progressively driven by method calls; once log streaming
40/// is started, `event::Events` can be written using `add_event()`.
41///
42/// [`Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
43use super::*;
44
45#[derive(PartialEq, Eq, Debug)]
46pub enum StreamerState {
47    Initial,
48    Ready,
49    Finished,
50}
51
52pub struct QlogStreamer {
53    start_time: std::time::Instant,
54    writer: Box<dyn std::io::Write + Send + Sync>,
55    qlog: QlogSeq,
56    state: StreamerState,
57    log_level: EventImportance,
58}
59
60impl QlogStreamer {
61    /// Creates a [QlogStreamer] object.
62    ///
63    /// It owns a [QlogSeq] object that contains the provided [TraceSeq]
64    /// containing [Event]s.
65    ///
66    /// All serialization will be written to the provided [`Write`] using the
67    /// JSON-SEQ format.
68    ///
69    /// [`Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
70    #[allow(clippy::too_many_arguments)]
71    pub fn new(
72        qlog_version: String, title: Option<String>, description: Option<String>,
73        summary: Option<String>, start_time: std::time::Instant, trace: TraceSeq,
74        log_level: EventImportance,
75        writer: Box<dyn std::io::Write + Send + Sync>,
76    ) -> Self {
77        let qlog = QlogSeq {
78            qlog_version,
79            qlog_format: "JSON-SEQ".to_string(),
80            title,
81            description,
82            summary,
83            trace,
84        };
85
86        QlogStreamer {
87            start_time,
88            writer,
89            qlog,
90            state: StreamerState::Initial,
91            log_level,
92        }
93    }
94
95    /// Starts qlog streaming serialization.
96    ///
97    /// This writes out the JSON-SEQ-serialized form of all initial qlog
98    /// information. [Event]s are separately appended using [add_event()],
99    /// [add_event_with_instant()], [add_event_now()],
100    /// [add_event_data_with_instant()], or [add_event_data_now()].
101    ///
102    /// [add_event()]: #method.add_event
103    /// [add_event_with_instant()]: #method.add_event_with_instant
104    /// [add_event_now()]: #method.add_event_now
105    /// [add_event_data_with_instant()]: #method.add_event_data_with_instant
106    /// [add_event_data_now()]: #method.add_event_data_now
107    pub fn start_log(&mut self) -> Result<()> {
108        if self.state != StreamerState::Initial {
109            return Err(Error::Done);
110        }
111
112        self.writer.as_mut().write_all(b"")?;
113        serde_json::to_writer(self.writer.as_mut(), &self.qlog)
114            .map_err(|_| Error::Done)?;
115        self.writer.as_mut().write_all(b"\n")?;
116
117        self.state = StreamerState::Ready;
118
119        Ok(())
120    }
121
122    /// Finishes qlog streaming serialization.
123    ///
124    /// After this is called, no more serialization will occur.
125    pub fn finish_log(&mut self) -> Result<()> {
126        if self.state == StreamerState::Initial ||
127            self.state == StreamerState::Finished
128        {
129            return Err(Error::InvalidState);
130        }
131
132        self.state = StreamerState::Finished;
133
134        self.writer.as_mut().flush()?;
135
136        Ok(())
137    }
138
139    /// Writes a serializable to a JSON-SEQ record using
140    /// [std::time::Instant::now()].
141    pub fn add_event_now<E: Serialize + Eventable>(
142        &mut self, event: E,
143    ) -> Result<()> {
144        let now = std::time::Instant::now();
145
146        self.add_event_with_instant(event, now)
147    }
148
149    /// Writes a serializable to a pretty-printed JSON-SEQ record using
150    /// [std::time::Instant::now()].
151    pub fn add_event_now_pretty<E: Serialize + Eventable>(
152        &mut self, event: E,
153    ) -> Result<()> {
154        let now = std::time::Instant::now();
155
156        self.add_event_with_instant_pretty(event, now)
157    }
158
159    /// Writes a serializable to a JSON-SEQ record using the provided
160    /// [std::time::Instant].
161    pub fn add_event_with_instant<E: Serialize + Eventable>(
162        &mut self, event: E, now: std::time::Instant,
163    ) -> Result<()> {
164        self.event_with_instant(event, now, false)
165    }
166
167    /// Writes a serializable to a pretty-printed JSON-SEQ record using the
168    /// provided [std::time::Instant].
169    pub fn add_event_with_instant_pretty<E: Serialize + Eventable>(
170        &mut self, event: E, now: std::time::Instant,
171    ) -> Result<()> {
172        self.event_with_instant(event, now, true)
173    }
174
175    fn event_with_instant<E: Serialize + Eventable>(
176        &mut self, mut event: E, now: std::time::Instant, pretty: bool,
177    ) -> Result<()> {
178        if self.state != StreamerState::Ready {
179            return Err(Error::InvalidState);
180        }
181
182        if !event.importance().is_contained_in(&self.log_level) {
183            return Err(Error::Done);
184        }
185
186        let dur = if cfg!(test) {
187            std::time::Duration::from_secs(0)
188        } else {
189            now.duration_since(self.start_time)
190        };
191
192        let rel_time = dur.as_secs_f32() * 1000.0;
193        event.set_time(rel_time);
194
195        if pretty {
196            self.add_event_pretty(event)
197        } else {
198            self.add_event(event)
199        }
200    }
201
202    /// Writes an [Event] based on the provided [EventData] to a JSON-SEQ record
203    /// at time [std::time::Instant::now()].
204    pub fn add_event_data_now(&mut self, event_data: EventData) -> Result<()> {
205        self.add_event_data_ex_now(event_data, Default::default())
206    }
207
208    /// Writes an [Event] based on the provided [EventData] to a pretty-printed
209    /// JSON-SEQ record at time [std::time::Instant::now()].
210    pub fn add_event_data_now_pretty(
211        &mut self, event_data: EventData,
212    ) -> Result<()> {
213        self.add_event_data_ex_now_pretty(event_data, Default::default())
214    }
215
216    /// Writes an [Event] based on the provided [EventData] and [ExData] to a
217    /// JSON-SEQ record at time [std::time::Instant::now()].
218    pub fn add_event_data_ex_now(
219        &mut self, event_data: EventData, ex_data: ExData,
220    ) -> Result<()> {
221        let now = std::time::Instant::now();
222
223        self.add_event_data_ex_with_instant(event_data, ex_data, now)
224    }
225
226    /// Writes an [Event] based on the provided [EventData] and [ExData] to a
227    /// pretty-printed JSON-SEQ record at time [std::time::Instant::now()].
228    pub fn add_event_data_ex_now_pretty(
229        &mut self, event_data: EventData, ex_data: ExData,
230    ) -> Result<()> {
231        let now = std::time::Instant::now();
232
233        self.add_event_data_ex_with_instant_pretty(event_data, ex_data, now)
234    }
235
236    /// Writes an [Event] based on the provided [EventData] and
237    /// [std::time::Instant] to a JSON-SEQ record.
238    pub fn add_event_data_with_instant(
239        &mut self, event_data: EventData, now: std::time::Instant,
240    ) -> Result<()> {
241        self.add_event_data_ex_with_instant(event_data, Default::default(), now)
242    }
243
244    /// Writes an [Event] based on the provided [EventData] and
245    /// [std::time::Instant] to a pretty-printed JSON-SEQ record.
246    pub fn add_event_data_with_instant_pretty(
247        &mut self, event_data: EventData, now: std::time::Instant,
248    ) -> Result<()> {
249        self.add_event_data_ex_with_instant_pretty(
250            event_data,
251            Default::default(),
252            now,
253        )
254    }
255
256    /// Writes an [Event] based on the provided [EventData], [ExData], and
257    /// [std::time::Instant] to a JSON-SEQ record.
258    pub fn add_event_data_ex_with_instant(
259        &mut self, event_data: EventData, ex_data: ExData,
260        now: std::time::Instant,
261    ) -> Result<()> {
262        self.event_data_ex_with_instant(event_data, ex_data, now, false)
263    }
264
265    // Writes an [Event] based on the provided [EventData], [ExData], and
266    /// [std::time::Instant] to a pretty-printed JSON-SEQ record.
267    pub fn add_event_data_ex_with_instant_pretty(
268        &mut self, event_data: EventData, ex_data: ExData,
269        now: std::time::Instant,
270    ) -> Result<()> {
271        self.event_data_ex_with_instant(event_data, ex_data, now, true)
272    }
273
274    fn event_data_ex_with_instant(
275        &mut self, event_data: EventData, ex_data: ExData,
276        now: std::time::Instant, pretty: bool,
277    ) -> Result<()> {
278        if self.state != StreamerState::Ready {
279            return Err(Error::InvalidState);
280        }
281
282        let ty = EventType::from(&event_data);
283        if !EventImportance::from(ty).is_contained_in(&self.log_level) {
284            return Err(Error::Done);
285        }
286
287        let dur = if cfg!(test) {
288            std::time::Duration::from_secs(0)
289        } else {
290            now.duration_since(self.start_time)
291        };
292
293        let rel_time = dur.as_secs_f32() * 1000.0;
294        let event = Event::with_time_ex(rel_time, event_data, ex_data);
295
296        if pretty {
297            self.add_event_pretty(event)
298        } else {
299            self.add_event(event)
300        }
301    }
302
303    /// Writes a JSON-SEQ-serialized [Event] using the provided [Event].
304    pub fn add_event<E: Serialize + Eventable>(
305        &mut self, event: E,
306    ) -> Result<()> {
307        self.write_event(event, false)
308    }
309
310    /// Writes a pretty-printed JSON-SEQ-serialized [Event] using the provided
311    /// [Event].
312    pub fn add_event_pretty<E: Serialize + Eventable>(
313        &mut self, event: E,
314    ) -> Result<()> {
315        self.write_event(event, true)
316    }
317
318    /// Writes a JSON-SEQ-serialized [Event] using the provided [Event].
319    fn write_event<E: Serialize + Eventable>(
320        &mut self, event: E, pretty: bool,
321    ) -> Result<()> {
322        if self.state != StreamerState::Ready {
323            return Err(Error::InvalidState);
324        }
325
326        if !event.importance().is_contained_in(&self.log_level) {
327            return Err(Error::Done);
328        }
329
330        self.writer.as_mut().write_all(b"")?;
331        if pretty {
332            serde_json::to_writer_pretty(self.writer.as_mut(), &event)
333                .map_err(|_| Error::Done)?;
334        } else {
335            serde_json::to_writer(self.writer.as_mut(), &event)
336                .map_err(|_| Error::Done)?;
337        }
338        self.writer.as_mut().write_all(b"\n")?;
339
340        Ok(())
341    }
342
343    /// Returns the writer.
344    #[allow(clippy::borrowed_box)]
345    pub fn writer(&self) -> &Box<dyn std::io::Write + Send + Sync> {
346        &self.writer
347    }
348
349    pub fn start_time(&self) -> std::time::Instant {
350        self.start_time
351    }
352}
353
354impl Drop for QlogStreamer {
355    fn drop(&mut self) {
356        let _ = self.finish_log();
357    }
358}
359
360#[cfg(test)]
361mod tests {
362    use std::collections::BTreeMap;
363
364    use super::*;
365    use crate::events::quic;
366    use crate::events::quic::QuicFrame;
367    use crate::events::RawInfo;
368    use smallvec::smallvec;
369    use testing::*;
370
371    use serde_json::json;
372
373    #[test]
374    fn serialization_states() {
375        let v: Vec<u8> = Vec::new();
376        let buff = std::io::Cursor::new(v);
377        let writer = Box::new(buff);
378
379        let trace = make_trace_seq();
380        let pkt_hdr = make_pkt_hdr(quic::PacketType::Handshake);
381        let raw = Some(RawInfo {
382            length: Some(1251),
383            payload_length: Some(1224),
384            data: None,
385        });
386
387        let frame1 = QuicFrame::Stream {
388            stream_id: 40,
389            offset: 40,
390            length: 400,
391            fin: Some(true),
392            raw: None,
393        };
394
395        let event_data1 = EventData::PacketSent(quic::PacketSent {
396            header: pkt_hdr.clone(),
397            frames: Some(smallvec![frame1]),
398            raw: raw.clone(),
399            ..Default::default()
400        });
401
402        let ev1 = Event::with_time(0.0, event_data1);
403
404        let frame2 = QuicFrame::Stream {
405            stream_id: 0,
406            offset: 0,
407            length: 100,
408            fin: Some(true),
409            raw: None,
410        };
411
412        let frame3 = QuicFrame::Stream {
413            stream_id: 0,
414            offset: 0,
415            length: 100,
416            fin: Some(true),
417            raw: None,
418        };
419
420        let event_data2 = EventData::PacketSent(quic::PacketSent {
421            header: pkt_hdr.clone(),
422            frames: Some(smallvec![frame2]),
423            raw: raw.clone(),
424            ..Default::default()
425        });
426
427        let ev2 = Event::with_time(0.0, event_data2);
428
429        let event_data3 = EventData::PacketSent(quic::PacketSent {
430            header: pkt_hdr,
431            frames: Some(smallvec![frame3]),
432            stateless_reset_token: Some("reset_token".to_string()),
433            raw,
434            ..Default::default()
435        });
436
437        let ev3 = Event::with_time(0.0, event_data3);
438
439        let mut s = streamer::QlogStreamer::new(
440            "version".to_string(),
441            Some("title".to_string()),
442            Some("description".to_string()),
443            None,
444            std::time::Instant::now(),
445            trace,
446            EventImportance::Base,
447            writer,
448        );
449
450        // Before the log is started all other operations should fail.
451        assert!(matches!(s.add_event(ev2.clone()), Err(Error::InvalidState)));
452        assert!(matches!(s.finish_log(), Err(Error::InvalidState)));
453
454        // Start log and add a simple event.
455        assert!(matches!(s.start_log(), Ok(())));
456        assert!(matches!(s.add_event(ev1), Ok(())));
457
458        // Add some more events.
459        assert!(matches!(s.add_event(ev2), Ok(())));
460        assert!(matches!(s.add_event(ev3.clone()), Ok(())));
461
462        // Adding an event with an external time should work too.
463        // For tests, it will resolve to 0 but we care about proving the API
464        // here, not timing specifics.
465        let now = std::time::Instant::now();
466
467        assert!(matches!(s.add_event_with_instant(ev3, now), Ok(())));
468
469        assert!(matches!(s.finish_log(), Ok(())));
470
471        let r = s.writer();
472        #[allow(clippy::borrowed_box)]
473        let w: &Box<std::io::Cursor<Vec<u8>>> = unsafe { std::mem::transmute(r) };
474
475        let log_string = r#"{"qlog_version":"version","qlog_format":"JSON-SEQ","title":"title","description":"description","trace":{"vantage_point":{"type":"server"},"title":"Quiche qlog trace","description":"Quiche qlog trace description","configuration":{"time_offset":0.0}}}
476{"time":0.0,"name":"transport:packet_sent","data":{"header":{"packet_type":"handshake","packet_number":0,"version":"1","scil":8,"dcil":8,"scid":"7e37e4dcc6682da8","dcid":"36ce104eee50101c"},"raw":{"length":1251,"payload_length":1224},"frames":[{"frame_type":"stream","stream_id":40,"offset":40,"length":400,"fin":true}]}}
477{"time":0.0,"name":"transport:packet_sent","data":{"header":{"packet_type":"handshake","packet_number":0,"version":"1","scil":8,"dcil":8,"scid":"7e37e4dcc6682da8","dcid":"36ce104eee50101c"},"raw":{"length":1251,"payload_length":1224},"frames":[{"frame_type":"stream","stream_id":0,"offset":0,"length":100,"fin":true}]}}
478{"time":0.0,"name":"transport:packet_sent","data":{"header":{"packet_type":"handshake","packet_number":0,"version":"1","scil":8,"dcil":8,"scid":"7e37e4dcc6682da8","dcid":"36ce104eee50101c"},"stateless_reset_token":"reset_token","raw":{"length":1251,"payload_length":1224},"frames":[{"frame_type":"stream","stream_id":0,"offset":0,"length":100,"fin":true}]}}
479{"time":0.0,"name":"transport:packet_sent","data":{"header":{"packet_type":"handshake","packet_number":0,"version":"1","scil":8,"dcil":8,"scid":"7e37e4dcc6682da8","dcid":"36ce104eee50101c"},"stateless_reset_token":"reset_token","raw":{"length":1251,"payload_length":1224},"frames":[{"frame_type":"stream","stream_id":0,"offset":0,"length":100,"fin":true}]}}
480"#;
481
482        let written_string = std::str::from_utf8(w.as_ref().get_ref()).unwrap();
483
484        assert_eq!(log_string, written_string);
485    }
486
487    #[test]
488    fn stream_json_event() {
489        let data = json!({"foo": "Bar", "hello": 123});
490        let ev = events::JsonEvent {
491            time: 0.0,
492            importance: events::EventImportance::Core,
493            name: "jsonevent:sample".into(),
494            data,
495        };
496
497        let v: Vec<u8> = Vec::new();
498        let buff = std::io::Cursor::new(v);
499        let writer = Box::new(buff);
500
501        let trace = make_trace_seq();
502
503        let mut s = streamer::QlogStreamer::new(
504            "version".to_string(),
505            Some("title".to_string()),
506            Some("description".to_string()),
507            None,
508            std::time::Instant::now(),
509            trace,
510            EventImportance::Base,
511            writer,
512        );
513
514        assert!(matches!(s.start_log(), Ok(())));
515        assert!(matches!(s.add_event(ev), Ok(())));
516        assert!(matches!(s.finish_log(), Ok(())));
517
518        let r = s.writer();
519        #[allow(clippy::borrowed_box)]
520        let w: &Box<std::io::Cursor<Vec<u8>>> = unsafe { std::mem::transmute(r) };
521
522        let log_string = r#"{"qlog_version":"version","qlog_format":"JSON-SEQ","title":"title","description":"description","trace":{"vantage_point":{"type":"server"},"title":"Quiche qlog trace","description":"Quiche qlog trace description","configuration":{"time_offset":0.0}}}
523{"time":0.0,"name":"jsonevent:sample","data":{"foo":"Bar","hello":123}}
524"#;
525
526        let written_string = std::str::from_utf8(w.as_ref().get_ref()).unwrap();
527
528        assert_eq!(log_string, written_string);
529    }
530
531    #[test]
532    fn stream_data_ex() {
533        let v: Vec<u8> = Vec::new();
534        let buff = std::io::Cursor::new(v);
535        let writer = Box::new(buff);
536
537        let trace = make_trace_seq();
538        let pkt_hdr = make_pkt_hdr(quic::PacketType::Handshake);
539        let raw = Some(RawInfo {
540            length: Some(1251),
541            payload_length: Some(1224),
542            data: None,
543        });
544
545        let frame1 = QuicFrame::Stream {
546            stream_id: 40,
547            offset: 40,
548            length: 400,
549            fin: Some(true),
550            raw: None,
551        };
552
553        let event_data1 = EventData::PacketSent(quic::PacketSent {
554            header: pkt_hdr.clone(),
555            frames: Some(smallvec![frame1]),
556            raw: raw.clone(),
557            ..Default::default()
558        });
559        let j1 = json!({"foo": "Bar", "hello": 123});
560        let j2 = json!({"baz": [1,2,3,4]});
561        let mut ex_data = BTreeMap::new();
562        ex_data.insert("first".to_string(), j1);
563        ex_data.insert("second".to_string(), j2);
564
565        let ev1 = Event::with_time_ex(0.0, event_data1, ex_data);
566
567        let frame2 = QuicFrame::Stream {
568            stream_id: 1,
569            offset: 0,
570            length: 100,
571            fin: Some(true),
572            raw: None,
573        };
574
575        let event_data2 = EventData::PacketSent(quic::PacketSent {
576            header: pkt_hdr.clone(),
577            frames: Some(smallvec![frame2]),
578            raw: raw.clone(),
579            ..Default::default()
580        });
581
582        let ev2 = Event::with_time(0.0, event_data2);
583
584        let mut s = streamer::QlogStreamer::new(
585            "version".to_string(),
586            Some("title".to_string()),
587            Some("description".to_string()),
588            None,
589            std::time::Instant::now(),
590            trace,
591            EventImportance::Base,
592            writer,
593        );
594
595        assert!(matches!(s.start_log(), Ok(())));
596        assert!(matches!(s.add_event(ev1), Ok(())));
597        assert!(matches!(s.add_event(ev2), Ok(())));
598        assert!(matches!(s.finish_log(), Ok(())));
599
600        let r = s.writer();
601        #[allow(clippy::borrowed_box)]
602        let w: &Box<std::io::Cursor<Vec<u8>>> = unsafe { std::mem::transmute(r) };
603
604        let log_string = r#"{"qlog_version":"version","qlog_format":"JSON-SEQ","title":"title","description":"description","trace":{"vantage_point":{"type":"server"},"title":"Quiche qlog trace","description":"Quiche qlog trace description","configuration":{"time_offset":0.0}}}
605{"time":0.0,"name":"transport:packet_sent","data":{"header":{"packet_type":"handshake","packet_number":0,"version":"1","scil":8,"dcil":8,"scid":"7e37e4dcc6682da8","dcid":"36ce104eee50101c"},"raw":{"length":1251,"payload_length":1224},"frames":[{"frame_type":"stream","stream_id":40,"offset":40,"length":400,"fin":true}]},"first":{"foo":"Bar","hello":123},"second":{"baz":[1,2,3,4]}}
606{"time":0.0,"name":"transport:packet_sent","data":{"header":{"packet_type":"handshake","packet_number":0,"version":"1","scil":8,"dcil":8,"scid":"7e37e4dcc6682da8","dcid":"36ce104eee50101c"},"raw":{"length":1251,"payload_length":1224},"frames":[{"frame_type":"stream","stream_id":1,"offset":0,"length":100,"fin":true}]}}
607"#;
608
609        let written_string = std::str::from_utf8(w.as_ref().get_ref()).unwrap();
610
611        assert_eq!(log_string, written_string);
612    }
613}