h3i/client/
mod.rs

1// Copyright (C) 2024, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! The main h3i client runner.
28//!
29//! The client is responsible for connecting to an indicated server, executing
30//! a series of [Action]s, and capturing the results in a
31//! [ConnectionSummary].
32
33#[cfg(feature = "async")]
34pub mod async_client;
35pub mod connection_summary;
36pub mod sync_client;
37
38use connection_summary::*;
39use qlog::events::h3::HttpHeader;
40
41use std::collections::HashMap;
42use std::net::SocketAddr;
43use std::time::Instant;
44
45use crate::actions::h3::Action;
46use crate::actions::h3::StreamEvent;
47use crate::actions::h3::StreamEventType;
48use crate::config::Config;
49use crate::frame::H3iFrame;
50use crate::frame::ResetStream;
51use crate::frame_parser::FrameParseResult;
52use crate::frame_parser::FrameParser;
53use crate::frame_parser::InterruptCause;
54use crate::recordreplay::qlog::QlogEvent;
55use crate::recordreplay::qlog::*;
56use qlog::events::h3::H3FrameParsed;
57use qlog::events::h3::Http3Frame;
58use qlog::events::EventData;
59use qlog::streamer::QlogStreamer;
60use serde::Serialize;
61
62use crate::quiche;
63use quiche::h3::frame::Frame as QFrame;
64use quiche::h3::Error;
65use quiche::h3::NameValue;
66use quiche::ConnectionError;
67
/// Datagram size limit, in bytes.
// NOTE(review): not referenced in this module; presumably consumed by the
// client submodules when sizing packet buffers -- confirm.
const MAX_DATAGRAM_SIZE: usize = 1350;

/// QUIC version number.
// NOTE(review): not referenced in this module; presumably consumed by the
// client submodules (e.g. for qlog output) -- confirm.
const QUIC_VERSION: u32 = 1;
70
71fn handle_qlog(
72    qlog_streamer: Option<&mut QlogStreamer>, qlog_frame: Http3Frame,
73    stream_id: u64,
74) {
75    if let Some(s) = qlog_streamer {
76        let ev_data = EventData::H3FrameParsed(H3FrameParsed {
77            stream_id,
78            frame: qlog_frame,
79            ..Default::default()
80        });
81
82        s.add_event_data_now(ev_data).ok();
83    }
84}
85
86#[derive(Debug, Serialize)]
87/// Represents different errors that can occur when the h3i client runs.
88pub enum ClientError {
89    /// An error during the QUIC handshake.
90    HandshakeFail,
91    /// An error during HTTP/3 exchanges.
92    HttpFail,
93    /// Some other type of error.
94    Other(String),
95}
96
97pub(crate) trait Client {
98    /// Gives mutable access to the stream parsers to update their state.
99    fn stream_parsers_mut(&mut self) -> &mut StreamParserMap;
100
101    /// Handles a response frame. This allows [`Client`]s to customize how they
102    /// construct a [`StreamMap`] from a list of frames.
103    fn handle_response_frame(&mut self, stream_id: u64, frame: H3iFrame);
104}
105
106pub(crate) type StreamParserMap = HashMap<u64, FrameParser>;
107
108pub(crate) fn execute_action(
109    action: &Action, conn: &mut quiche::Connection,
110    stream_parsers: &mut StreamParserMap,
111) {
112    match action {
113        Action::SendFrame {
114            stream_id,
115            fin_stream,
116            frame,
117        } => {
118            log::info!("frame tx id={stream_id} frame={frame:?}");
119
120            // TODO: make serialization smarter
121            let mut d = [42; 9999];
122            let mut b = octets::OctetsMut::with_slice(&mut d);
123
124            if let Some(s) = conn.qlog_streamer() {
125                let events: QlogEvents = action.into();
126                for event in events {
127                    match event {
128                        QlogEvent::Event { data, ex_data } => {
129                            // skip dummy packet
130                            if matches!(data.as_ref(), EventData::PacketSent(..))
131                            {
132                                continue;
133                            }
134
135                            s.add_event_data_ex_now(*data, ex_data).ok();
136                        },
137
138                        QlogEvent::JsonEvent(mut ev) => {
139                            // need to rewrite the event time
140                            ev.time = Instant::now()
141                                .duration_since(s.start_time())
142                                .as_secs_f32() *
143                                1000.0;
144                            s.add_event(ev).ok();
145                        },
146                    }
147                }
148            }
149            let len = frame.to_bytes(&mut b).unwrap();
150
151            // TODO - pass errors here to the connectionsummary, which means we
152            // can't initialize it when the connection's been shut
153            // down
154            conn.stream_send(*stream_id, &d[..len], *fin_stream)
155                .unwrap();
156
157            stream_parsers
158                .entry(*stream_id)
159                .or_insert_with(|| FrameParser::new(*stream_id));
160        },
161
162        Action::SendHeadersFrame {
163            stream_id,
164            fin_stream,
165            headers,
166            frame,
167            ..
168        } => {
169            log::info!("headers frame tx stream={stream_id} hdrs={headers:?}");
170
171            // TODO: make serialization smarter
172            let mut d = [42; 9999];
173            let mut b = octets::OctetsMut::with_slice(&mut d);
174
175            if let Some(s) = conn.qlog_streamer() {
176                let events: QlogEvents = action.into();
177                for event in events {
178                    match event {
179                        QlogEvent::Event { data, ex_data } => {
180                            // skip dummy packet
181                            if matches!(data.as_ref(), EventData::PacketSent(..))
182                            {
183                                continue;
184                            }
185
186                            s.add_event_data_ex_now(*data, ex_data).ok();
187                        },
188
189                        QlogEvent::JsonEvent(mut ev) => {
190                            // need to rewrite the event time
191                            ev.time = Instant::now()
192                                .duration_since(s.start_time())
193                                .as_secs_f32() *
194                                1000.0;
195                            s.add_event(ev).ok();
196                        },
197                    }
198                }
199            }
200            let len = frame.to_bytes(&mut b).unwrap();
201            conn.stream_send(*stream_id, &d[..len], *fin_stream)
202                .unwrap();
203
204            stream_parsers
205                .entry(*stream_id)
206                .or_insert_with(|| FrameParser::new(*stream_id));
207        },
208
209        Action::OpenUniStream {
210            stream_id,
211            fin_stream,
212            stream_type,
213        } => {
214            log::info!(
215                "open uni stream_id={stream_id} ty={stream_type} fin={fin_stream}"
216            );
217
218            let mut d = [42; 8];
219            let mut b = octets::OctetsMut::with_slice(&mut d);
220            b.put_varint(*stream_type).unwrap();
221            let off = b.off();
222
223            conn.stream_send(*stream_id, &d[..off], *fin_stream)
224                .unwrap();
225
226            stream_parsers
227                .entry(*stream_id)
228                .or_insert_with(|| FrameParser::new(*stream_id));
229        },
230
231        Action::StreamBytes {
232            stream_id,
233            bytes,
234            fin_stream,
235        } => {
236            log::info!(
237                "stream bytes tx id={} len={} fin={}",
238                stream_id,
239                bytes.len(),
240                fin_stream
241            );
242            conn.stream_send(*stream_id, bytes, *fin_stream).unwrap();
243
244            stream_parsers
245                .entry(*stream_id)
246                .or_insert_with(|| FrameParser::new(*stream_id));
247        },
248
249        Action::ResetStream {
250            stream_id,
251            error_code,
252        } => {
253            log::info!(
254                "reset_stream stream_id={stream_id} error_code={error_code}"
255            );
256            if let Err(e) = conn.stream_shutdown(
257                *stream_id,
258                quiche::Shutdown::Write,
259                *error_code,
260            ) {
261                log::error!("can't send reset_stream: {e}");
262                // Clients can't reset streams they don't own. If we attempt to do
263                // this, stream_shutdown would fail, and we
264                // shouldn't create a parser.
265                return;
266            }
267
268            stream_parsers
269                .entry(*stream_id)
270                .or_insert_with(|| FrameParser::new(*stream_id));
271        },
272
273        Action::StopSending {
274            stream_id,
275            error_code,
276        } => {
277            log::info!(
278                "stop_sending stream id={stream_id} error_code={error_code}"
279            );
280
281            if let Err(e) = conn.stream_shutdown(
282                *stream_id,
283                quiche::Shutdown::Read,
284                *error_code,
285            ) {
286                log::error!("can't send stop_sending: {e}");
287            }
288
289            // A `STOP_SENDING` should elicit a `RESET_STREAM` in response, which
290            // the frame parser can automatically handle.
291            stream_parsers
292                .entry(*stream_id)
293                .or_insert_with(|| FrameParser::new(*stream_id));
294        },
295
296        Action::ConnectionClose { error } => {
297            let ConnectionError {
298                is_app,
299                error_code,
300                reason,
301            } = error;
302
303            log::info!("connection_close={error:?}");
304            let _ = conn.close(*is_app, *error_code, reason);
305        },
306
307        // Neither of these actions will manipulate the Quiche connection
308        Action::FlushPackets | Action::Wait { .. } => unreachable!(),
309    }
310}
311
312pub(crate) fn parse_streams<C: Client>(
313    conn: &mut quiche::Connection, client: &mut C,
314) -> Vec<StreamEvent> {
315    let mut responded_streams: Vec<StreamEvent> =
316        Vec::with_capacity(conn.readable().len());
317
318    for stream in conn.readable() {
319        // TODO: ignoring control streams
320        if stream % 4 != 0 {
321            continue;
322        }
323
324        loop {
325            let stream_parse_result = client
326                .stream_parsers_mut()
327                .get_mut(&stream)
328                .expect("stream readable with no parser")
329                .try_parse_frame(conn);
330
331            match stream_parse_result {
332                Ok(FrameParseResult::FrameParsed { h3i_frame, fin }) => {
333                    if let H3iFrame::Headers(ref headers) = h3i_frame {
334                        log::info!("hdrs={headers:?}");
335                    }
336
337                    handle_response_frame(
338                        client,
339                        conn.qlog_streamer(),
340                        &mut responded_streams,
341                        stream,
342                        h3i_frame,
343                    );
344
345                    if fin {
346                        handle_fin(
347                            &mut responded_streams,
348                            client.stream_parsers_mut(),
349                            stream,
350                        );
351                        break;
352                    }
353                },
354                Ok(FrameParseResult::Retry) => {},
355                Ok(FrameParseResult::Interrupted(cause)) => {
356                    if let InterruptCause::ResetStream(error_code) = cause {
357                        let frame = H3iFrame::ResetStream(ResetStream {
358                            stream_id: stream,
359                            error_code,
360                        });
361
362                        log::info!("received reset stream: {frame:?}");
363                        handle_response_frame(
364                            client,
365                            None,
366                            &mut responded_streams,
367                            stream,
368                            frame,
369                        );
370                    }
371
372                    handle_fin(
373                        &mut responded_streams,
374                        client.stream_parsers_mut(),
375                        stream,
376                    );
377                    break;
378                },
379                Err(e) => {
380                    match e {
381                        Error::TransportError(quiche::Error::Done) => {
382                            log::debug!("stream {stream} exhausted");
383                        },
384                        Error::TransportError(quiche::Error::StreamReset(
385                            error_code,
386                        )) => {
387                            let frame = H3iFrame::ResetStream(ResetStream {
388                                stream_id: stream,
389                                error_code,
390                            });
391
392                            log::info!("received reset stream: {frame:?}");
393
394                            handle_response_frame(
395                                client,
396                                None,
397                                &mut responded_streams,
398                                stream,
399                                frame,
400                            );
401
402                            client.stream_parsers_mut().remove(&stream);
403                        },
404                        _ => {
405                            log::warn!("stream read error: {e}");
406                        },
407                    };
408
409                    break;
410                },
411            }
412        }
413    }
414
415    responded_streams
416}
417
418fn handle_fin(
419    responded_streams: &mut Vec<StreamEvent>,
420    stream_parsers: &mut StreamParserMap, stream_id: u64,
421) {
422    responded_streams.push(StreamEvent {
423        stream_id,
424        event_type: StreamEventType::Finished,
425    });
426
427    stream_parsers.remove(&stream_id);
428}
429
430/// Push any responses to the [StreamMap] as well as store them in the
431/// `responded` vector
432fn handle_response_frame<C: Client>(
433    client: &mut C, qlog_streamer: Option<&mut QlogStreamer>,
434    responded_streams: &mut Vec<StreamEvent>, stream_id: u64, frame: H3iFrame,
435) {
436    let cloned = frame.clone();
437    client.handle_response_frame(stream_id, cloned);
438
439    let mut to_qlog: Option<Http3Frame> = None;
440    let mut push_to_responses: Option<StreamEvent> = None;
441
442    match frame {
443        H3iFrame::Headers(enriched_headers) => {
444            push_to_responses = Some(StreamEvent {
445                stream_id,
446                event_type: StreamEventType::Headers,
447            });
448
449            let qlog_headers: Vec<HttpHeader> = enriched_headers
450                .headers()
451                .iter()
452                .map(|h| qlog::events::h3::HttpHeader {
453                    name: String::from_utf8_lossy(h.name()).into_owned(),
454                    value: String::from_utf8_lossy(h.value()).into_owned(),
455                })
456                .collect();
457
458            to_qlog = Some(Http3Frame::Headers {
459                headers: qlog_headers,
460            });
461        },
462        H3iFrame::QuicheH3(quiche_frame) => {
463            if let QFrame::Data { .. } = quiche_frame {
464                push_to_responses = Some(StreamEvent {
465                    stream_id,
466                    event_type: StreamEventType::Data,
467                });
468            }
469
470            to_qlog = Some(quiche_frame.to_qlog());
471        },
472        H3iFrame::ResetStream(_) => {
473            push_to_responses = Some(StreamEvent {
474                stream_id,
475                event_type: StreamEventType::Finished,
476            });
477        },
478    }
479
480    if let Some(to_qlog) = to_qlog {
481        handle_qlog(qlog_streamer, to_qlog, stream_id);
482    }
483
484    if let Some(to_push) = push_to_responses {
485        responded_streams.push(to_push);
486    }
487}
488
/// Connection parameters derived from a client configuration.
pub(crate) struct ParsedArgs<'a> {
    /// Address of the server to connect to.
    pub(crate) peer_addr: SocketAddr,
    /// Local address to bind to.
    pub(crate) bind_addr: SocketAddr,
    /// Host name borrowed from the configuration, or `None` when no name
    /// should be used.
    pub(crate) connect_url: Option<&'a str>,
}
494
495pub(crate) fn parse_args(args: &Config) -> ParsedArgs<'_> {
496    // We'll only connect to one server.
497    let connect_url = if !args.omit_sni {
498        args.host_port.split(':').next()
499    } else {
500        None
501    };
502
503    let (peer_addr, bind_addr) = resolve_socket_addrs(args);
504
505    ParsedArgs {
506        peer_addr,
507        bind_addr,
508        connect_url,
509    }
510}
511
512fn resolve_socket_addrs(args: &Config) -> (SocketAddr, SocketAddr) {
513    // Resolve server address.
514    let peer_addr = if let Some(addr) = &args.connect_to {
515        addr.parse().expect("--connect-to is expected to be a string containing an IPv4 or IPv6 address with a port. E.g. 192.0.2.0:443")
516    } else {
517        let x = format!("https://{}", args.host_port);
518        *url::Url::parse(&x)
519            .unwrap()
520            .socket_addrs(|| None)
521            .unwrap()
522            .first()
523            .unwrap()
524    };
525
526    // Bind to INADDR_ANY or IN6ADDR_ANY depending on the IP family of the
527    // server address. This is needed on macOS and BSD variants that don't
528    // support binding to IN6ADDR_ANY for both v4 and v6.
529    let bind_addr = match peer_addr {
530        std::net::SocketAddr::V4(_) => format!("0.0.0.0:{}", args.source_port),
531        std::net::SocketAddr::V6(_) => format!("[::]:{}", args.source_port),
532    };
533
534    (
535        peer_addr,
536        bind_addr.parse().expect("unable to parse bind address"),
537    )
538}