Skip to main content

qlog/
reader.rs

1// Copyright (C) 2023, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::fs::File;
28use std::io::BufReader;
29use std::path::Path;
30
31use crate::QlogSeq;
32use crate::SQLOG_EXT;
33use crate::SQLOG_GZ_EXT;
34use crate::SQLOG_ZST_EXT;
35
/// Represents the format of the read event.
///
/// [`QlogSeqReader`]'s iterator yields one of these per successfully parsed
/// record: it first tries to deserialize the record as a native qlog event
/// and only falls back to the extended JSON form when that fails.
// NOTE(review): the lint is silenced rather than boxing a variant,
// presumably because the two payload types differ noticeably in size --
// confirm this is still the intended trade-off.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug)]
pub enum Event {
    /// A native qlog event type.
    Qlog(crate::events::Event),

    /// An extended JSON event type.
    Json(crate::events::JsonEvent),
}
46
/// A helper object specialized for reading JSON-SEQ qlog from a [`BufRead`]
/// trait object.
///
/// Constructing the reader consumes and parses the stream header; the
/// remaining records are then obtained by iterating over the reader.
///
/// [`BufRead`]: https://doc.rust-lang.org/std/io/trait.BufRead.html
pub struct QlogSeqReader<'a> {
    /// The header record, deserialized when the reader is constructed.
    pub qlog: QlogSeq,
    /// The underlying stream, positioned after the header, from which the
    /// remaining records are read.
    reader: Box<dyn std::io::BufRead + Send + Sync + 'a>,
}
55
56impl<'a> QlogSeqReader<'a> {
57    pub fn new(
58        mut reader: Box<dyn std::io::BufRead + Send + Sync + 'a>,
59    ) -> Result<Self, Box<dyn std::error::Error>> {
60        // "null record" skip it
61        Self::read_record(reader.as_mut());
62
63        let header = Self::read_record(reader.as_mut()).ok_or_else(|| {
64            std::io::Error::other("error reading file header bytes")
65        })?;
66
67        let res: Result<QlogSeq, serde_json::Error> =
68            serde_json::from_slice(&header);
69        match res {
70            Ok(qlog) => Ok(Self { qlog, reader }),
71
72            Err(e) => Err(e.into()),
73        }
74    }
75
76    /// Convenience constructor that opens `path` and picks a streaming
77    /// decoder based on the file's compound extension:
78    ///
79    /// * `*.sqlog` -> raw JSON-SEQ (always available).
80    /// * `*.sqlog.gz` -> gzip via `flate2` (requires the `gzip` feature).
81    /// * `*.sqlog.zst` -> zstd via `zstd` (requires the `zstd` feature).
82    ///
83    /// The dispatch tests the *compound* suffix (`.sqlog.gz`,
84    /// `.sqlog.zst`, `.sqlog`) on the full filename, in that order.
85    /// This rejects bare `.gz` or `.zst` names that do not also carry
86    /// the `.sqlog` segment, and avoids the trap where
87    /// [`Path::extension`] strips only the last component (which
88    /// would silently accept `something.tar.gz`).
89    ///
90    /// Unknown extensions, or compressed extensions whose matching
91    /// feature is not enabled, return an [`std::io::ErrorKind::Unsupported`]
92    /// error with a message pointing at the feature that is needed.
93    ///
94    /// This is the intended single entry point for reading a qlog
95    /// file regardless of compression.
96    pub fn with_file(
97        path: impl AsRef<Path>,
98    ) -> Result<Self, Box<dyn std::error::Error>> {
99        let path = path.as_ref();
100        let file = File::open(path)?;
101        let name = path.file_name().and_then(|s| s.to_str()).unwrap_or("");
102
103        // Order matters: `.sqlog.gz` and `.sqlog.zst` must be tested
104        // before `.sqlog` (every compressed suffix also ends with
105        // `.sqlog` if you only look at the last extension).
106        //
107        // Each compound suffix is matched exactly once. The
108        // `#[cfg(feature = "...")]` lives inside the `if` body so the
109        // disabled-feature path can return a helpful error rather
110        // than falling through to the unknown-extension branch.
111        if name.ends_with(SQLOG_GZ_EXT) {
112            #[cfg(feature = "gzip")]
113            {
114                let reader: Box<dyn std::io::BufRead + Send + Sync> =
115                    Box::new(BufReader::new(flate2::read::GzDecoder::new(file)));
116                return Self::new(reader);
117            }
118            #[cfg(not(feature = "gzip"))]
119            return Err(std::io::Error::new(
120                std::io::ErrorKind::Unsupported,
121                format!(
122                    "qlog file {name:?} requires the `gzip` feature on \
123                     the qlog crate to decode"
124                ),
125            )
126            .into());
127        }
128        if name.ends_with(SQLOG_ZST_EXT) {
129            #[cfg(feature = "zstd")]
130            {
131                let decoder = zstd::Decoder::new(file)?;
132                let reader: Box<dyn std::io::BufRead + Send + Sync> =
133                    Box::new(BufReader::new(decoder));
134                return Self::new(reader);
135            }
136            #[cfg(not(feature = "zstd"))]
137            return Err(std::io::Error::new(
138                std::io::ErrorKind::Unsupported,
139                format!(
140                    "qlog file {name:?} requires the `zstd` feature on \
141                     the qlog crate to decode"
142                ),
143            )
144            .into());
145        }
146        if name.ends_with(SQLOG_EXT) {
147            let reader: Box<dyn std::io::BufRead + Send + Sync> =
148                Box::new(BufReader::new(file));
149            return Self::new(reader);
150        }
151
152        Err(std::io::Error::new(
153            std::io::ErrorKind::Unsupported,
154            format!(
155                "qlog file {name:?} does not match a known qlog \
156                 extension ({SQLOG_EXT}, {SQLOG_GZ_EXT}, {SQLOG_ZST_EXT})"
157            ),
158        )
159        .into())
160    }
161
162    fn read_record(
163        reader: &mut (dyn std::io::BufRead + Send + Sync),
164    ) -> Option<Vec<u8>> {
165        let mut buf = Vec::<u8>::new();
166        let size = reader.read_until(b'', &mut buf).unwrap();
167        if size <= 1 {
168            return None;
169        }
170
171        buf.truncate(buf.len() - 1);
172
173        Some(buf)
174    }
175}
176
177impl Iterator for QlogSeqReader<'_> {
178    type Item = Event;
179
180    #[inline]
181    fn next(&mut self) -> Option<Self::Item> {
182        // Attempt to deserialize events but skip them if that fails for any
183        // reason, ensuring we always read all bytes in the reader.
184        while let Some(bytes) = Self::read_record(&mut self.reader) {
185            let r: serde_json::Result<crate::events::Event> =
186                serde_json::from_slice(&bytes);
187
188            if let Ok(event) = r {
189                return Some(Event::Qlog(event));
190            }
191
192            let r: serde_json::Result<crate::events::JsonEvent> =
193                serde_json::from_slice(&bytes);
194
195            if let Ok(event) = r {
196                return Some(Event::Json(event));
197            }
198        }
199
200        None
201    }
202}