qlog/reader.rs
1// Copyright (C) 2023, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8// * Redistributions of source code must retain the above copyright notice,
9// this list of conditions and the following disclaimer.
10//
11// * Redistributions in binary form must reproduce the above copyright
12// notice, this list of conditions and the following disclaimer in the
13// documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::fs::File;
28use std::io::BufReader;
29use std::path::Path;
30
31use crate::QlogSeq;
32use crate::SQLOG_EXT;
33use crate::SQLOG_GZ_EXT;
34use crate::SQLOG_ZST_EXT;
35
36/// Represents the format of the read event.
37#[allow(clippy::large_enum_variant)]
38#[derive(Clone, Debug)]
39pub enum Event {
40 /// A native qlog event type.
41 Qlog(crate::events::Event),
42
43 // An extended JSON event type.
44 Json(crate::events::JsonEvent),
45}
46
/// A helper object specialized for reading JSON-SEQ qlog from a [`BufRead`]
/// trait.
///
/// Construction parses the qlog header record and stores it in
/// [`QlogSeqReader::qlog`]; the remaining records are obtained by iterating
/// over the reader, which yields [`Event`] values.
///
/// [`BufRead`]: https://doc.rust-lang.org/std/io/trait.BufRead.html
pub struct QlogSeqReader<'a> {
    /// The qlog header, deserialized from the first non-empty record of the
    /// stream when the reader is constructed.
    pub qlog: QlogSeq,
    // Underlying buffered stream, positioned just past the last record that
    // has been consumed so far.
    reader: Box<dyn std::io::BufRead + Send + Sync + 'a>,
}
55
56impl<'a> QlogSeqReader<'a> {
57 pub fn new(
58 mut reader: Box<dyn std::io::BufRead + Send + Sync + 'a>,
59 ) -> Result<Self, Box<dyn std::error::Error>> {
60 // "null record" skip it
61 Self::read_record(reader.as_mut());
62
63 let header = Self::read_record(reader.as_mut()).ok_or_else(|| {
64 std::io::Error::other("error reading file header bytes")
65 })?;
66
67 let res: Result<QlogSeq, serde_json::Error> =
68 serde_json::from_slice(&header);
69 match res {
70 Ok(qlog) => Ok(Self { qlog, reader }),
71
72 Err(e) => Err(e.into()),
73 }
74 }
75
76 /// Convenience constructor that opens `path` and picks a streaming
77 /// decoder based on the file's compound extension:
78 ///
79 /// * `*.sqlog` -> raw JSON-SEQ (always available).
80 /// * `*.sqlog.gz` -> gzip via `flate2` (requires the `gzip` feature).
81 /// * `*.sqlog.zst` -> zstd via `zstd` (requires the `zstd` feature).
82 ///
83 /// The dispatch tests the *compound* suffix (`.sqlog.gz`,
84 /// `.sqlog.zst`, `.sqlog`) on the full filename, in that order.
85 /// This rejects bare `.gz` or `.zst` names that do not also carry
86 /// the `.sqlog` segment, and avoids the trap where
87 /// [`Path::extension`] strips only the last component (which
88 /// would silently accept `something.tar.gz`).
89 ///
90 /// Unknown extensions, or compressed extensions whose matching
91 /// feature is not enabled, return an [`std::io::ErrorKind::Unsupported`]
92 /// error with a message pointing at the feature that is needed.
93 ///
94 /// This is the intended single entry point for reading a qlog
95 /// file regardless of compression.
96 pub fn with_file(
97 path: impl AsRef<Path>,
98 ) -> Result<Self, Box<dyn std::error::Error>> {
99 let path = path.as_ref();
100 let file = File::open(path)?;
101 let name = path.file_name().and_then(|s| s.to_str()).unwrap_or("");
102
103 // Order matters: `.sqlog.gz` and `.sqlog.zst` must be tested
104 // before `.sqlog` (every compressed suffix also ends with
105 // `.sqlog` if you only look at the last extension).
106 //
107 // Each compound suffix is matched exactly once. The
108 // `#[cfg(feature = "...")]` lives inside the `if` body so the
109 // disabled-feature path can return a helpful error rather
110 // than falling through to the unknown-extension branch.
111 if name.ends_with(SQLOG_GZ_EXT) {
112 #[cfg(feature = "gzip")]
113 {
114 let reader: Box<dyn std::io::BufRead + Send + Sync> =
115 Box::new(BufReader::new(flate2::read::GzDecoder::new(file)));
116 return Self::new(reader);
117 }
118 #[cfg(not(feature = "gzip"))]
119 return Err(std::io::Error::new(
120 std::io::ErrorKind::Unsupported,
121 format!(
122 "qlog file {name:?} requires the `gzip` feature on \
123 the qlog crate to decode"
124 ),
125 )
126 .into());
127 }
128 if name.ends_with(SQLOG_ZST_EXT) {
129 #[cfg(feature = "zstd")]
130 {
131 let decoder = zstd::Decoder::new(file)?;
132 let reader: Box<dyn std::io::BufRead + Send + Sync> =
133 Box::new(BufReader::new(decoder));
134 return Self::new(reader);
135 }
136 #[cfg(not(feature = "zstd"))]
137 return Err(std::io::Error::new(
138 std::io::ErrorKind::Unsupported,
139 format!(
140 "qlog file {name:?} requires the `zstd` feature on \
141 the qlog crate to decode"
142 ),
143 )
144 .into());
145 }
146 if name.ends_with(SQLOG_EXT) {
147 let reader: Box<dyn std::io::BufRead + Send + Sync> =
148 Box::new(BufReader::new(file));
149 return Self::new(reader);
150 }
151
152 Err(std::io::Error::new(
153 std::io::ErrorKind::Unsupported,
154 format!(
155 "qlog file {name:?} does not match a known qlog \
156 extension ({SQLOG_EXT}, {SQLOG_GZ_EXT}, {SQLOG_ZST_EXT})"
157 ),
158 )
159 .into())
160 }
161
162 fn read_record(
163 reader: &mut (dyn std::io::BufRead + Send + Sync),
164 ) -> Option<Vec<u8>> {
165 let mut buf = Vec::<u8>::new();
166 let size = reader.read_until(b'', &mut buf).unwrap();
167 if size <= 1 {
168 return None;
169 }
170
171 buf.truncate(buf.len() - 1);
172
173 Some(buf)
174 }
175}
176
177impl Iterator for QlogSeqReader<'_> {
178 type Item = Event;
179
180 #[inline]
181 fn next(&mut self) -> Option<Self::Item> {
182 // Attempt to deserialize events but skip them if that fails for any
183 // reason, ensuring we always read all bytes in the reader.
184 while let Some(bytes) = Self::read_record(&mut self.reader) {
185 let r: serde_json::Result<crate::events::Event> =
186 serde_json::from_slice(&bytes);
187
188 if let Ok(event) = r {
189 return Some(Event::Qlog(event));
190 }
191
192 let r: serde_json::Result<crate::events::JsonEvent> =
193 serde_json::from_slice(&bytes);
194
195 if let Ok(event) = r {
196 return Some(Event::Json(event));
197 }
198 }
199
200 None
201 }
202}