h3i/client/
mod.rs

1// Copyright (C) 2024, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! The main h3i client runner.
28//!
29//! The client is responsible for connecting to an indicated server, executing
30//! as series of [Action]s, and capturing the results in a
31//! [ConnectionSummary].
32
33#[cfg(feature = "async")]
34pub mod async_client;
35pub mod connection_summary;
36pub mod sync_client;
37
38use connection_summary::*;
39use qlog::events::h3::HttpHeader;
40
41use std::collections::HashMap;
42use std::net::SocketAddr;
43use std::time::Instant;
44
45use crate::actions::h3::Action;
46use crate::actions::h3::StreamEvent;
47use crate::actions::h3::StreamEventType;
48use crate::config::Config;
49use crate::frame::H3iFrame;
50use crate::frame::ResetStream;
51use crate::frame_parser::FrameParseResult;
52use crate::frame_parser::FrameParser;
53use crate::frame_parser::InterruptCause;
54use crate::recordreplay::qlog::QlogEvent;
55use crate::recordreplay::qlog::*;
56use qlog::events::h3::H3FrameParsed;
57use qlog::events::h3::Http3Frame;
58use qlog::events::EventData;
59use qlog::streamer::QlogStreamer;
60use serde::Serialize;
61
62use crate::quiche;
63use quiche::h3::frame::Frame as QFrame;
64use quiche::h3::Error;
65use quiche::h3::NameValue;
66use quiche::ConnectionError;
67
68const MAX_DATAGRAM_SIZE: usize = 1350;
69const QUIC_VERSION: u32 = 1;
70
71fn handle_qlog(
72    qlog_streamer: Option<&mut QlogStreamer>, qlog_frame: Http3Frame,
73    stream_id: u64,
74) {
75    if let Some(s) = qlog_streamer {
76        let ev_data = EventData::H3FrameParsed(H3FrameParsed {
77            stream_id,
78            frame: qlog_frame,
79            ..Default::default()
80        });
81
82        s.add_event_data_now(ev_data).ok();
83    }
84}
85
86#[derive(Debug, Serialize)]
87/// Represents different errors that can occur when the h3i client runs.
88pub enum ClientError {
89    /// An error during the QUIC handshake.
90    HandshakeFail,
91    /// An error during HTTP/3 exchanges.
92    HttpFail,
93    /// Some other type of error.
94    Other(String),
95}
96
97pub(crate) trait Client {
98    /// Gives mutable access to the stream parsers to update their state.
99    fn stream_parsers_mut(&mut self) -> &mut StreamParserMap;
100
101    /// Handles a response frame. This allows [`Client`]s to customize how they
102    /// construct a [`StreamMap`] from a list of frames.
103    fn handle_response_frame(&mut self, stream_id: u64, frame: H3iFrame);
104}
105
106pub(crate) type StreamParserMap = HashMap<u64, FrameParser>;
107
108pub(crate) fn execute_action(
109    action: &Action, conn: &mut quiche::Connection,
110    stream_parsers: &mut StreamParserMap,
111) {
112    match action {
113        Action::SendFrame {
114            stream_id,
115            fin_stream,
116            frame,
117        } => {
118            log::info!("frame tx id={stream_id} frame={frame:?}");
119
120            // TODO: make serialization smarter
121            let mut d = [42; 9999];
122            let mut b = octets::OctetsMut::with_slice(&mut d);
123
124            if let Some(s) = conn.qlog_streamer() {
125                let events: QlogEvents = action.into();
126                for event in events {
127                    match event {
128                        QlogEvent::Event { data, ex_data } => {
129                            // skip dummy packet
130                            if matches!(data.as_ref(), EventData::PacketSent(..))
131                            {
132                                continue;
133                            }
134
135                            s.add_event_data_ex_now(*data, ex_data).ok();
136                        },
137
138                        QlogEvent::JsonEvent(mut ev) => {
139                            // need to rewrite the event time
140                            ev.time = Instant::now()
141                                .duration_since(s.start_time())
142                                .as_secs_f32() *
143                                1000.0;
144                            s.add_event(ev).ok();
145                        },
146                    }
147                }
148            }
149            let len = frame.to_bytes(&mut b).unwrap();
150
151            // TODO - pass errors here to the connectionsummary, which means we
152            // can't initialize it when the connection's been shut
153            // down
154            conn.stream_send(*stream_id, &d[..len], *fin_stream)
155                .unwrap();
156
157            stream_parsers
158                .entry(*stream_id)
159                .or_insert_with(|| FrameParser::new(*stream_id));
160        },
161
162        Action::SendHeadersFrame {
163            stream_id,
164            fin_stream,
165            headers,
166            frame,
167            ..
168        } => {
169            log::info!("headers frame tx stream={stream_id} hdrs={headers:?}");
170
171            // TODO: make serialization smarter
172            let mut d = [42; 9999];
173            let mut b = octets::OctetsMut::with_slice(&mut d);
174
175            if let Some(s) = conn.qlog_streamer() {
176                let events: QlogEvents = action.into();
177                for event in events {
178                    match event {
179                        QlogEvent::Event { data, ex_data } => {
180                            // skip dummy packet
181                            if matches!(data.as_ref(), EventData::PacketSent(..))
182                            {
183                                continue;
184                            }
185
186                            s.add_event_data_ex_now(*data, ex_data).ok();
187                        },
188
189                        QlogEvent::JsonEvent(mut ev) => {
190                            // need to rewrite the event time
191                            ev.time = Instant::now()
192                                .duration_since(s.start_time())
193                                .as_secs_f32() *
194                                1000.0;
195                            s.add_event(ev).ok();
196                        },
197                    }
198                }
199            }
200            let len = frame.to_bytes(&mut b).unwrap();
201            conn.stream_send(*stream_id, &d[..len], *fin_stream)
202                .unwrap();
203
204            stream_parsers
205                .entry(*stream_id)
206                .or_insert_with(|| FrameParser::new(*stream_id));
207        },
208
209        Action::OpenUniStream {
210            stream_id,
211            fin_stream,
212            stream_type,
213        } => {
214            log::info!(
215                "open uni stream_id={stream_id} ty={stream_type} fin={fin_stream}"
216            );
217
218            let mut d = [42; 8];
219            let mut b = octets::OctetsMut::with_slice(&mut d);
220            b.put_varint(*stream_type).unwrap();
221            let off = b.off();
222
223            conn.stream_send(*stream_id, &d[..off], *fin_stream)
224                .unwrap();
225
226            stream_parsers
227                .entry(*stream_id)
228                .or_insert_with(|| FrameParser::new(*stream_id));
229        },
230
231        Action::StreamBytes {
232            stream_id,
233            bytes,
234            fin_stream,
235        } => {
236            log::info!(
237                "stream bytes tx id={} len={} fin={}",
238                stream_id,
239                bytes.len(),
240                fin_stream
241            );
242            conn.stream_send(*stream_id, bytes, *fin_stream).unwrap();
243
244            stream_parsers
245                .entry(*stream_id)
246                .or_insert_with(|| FrameParser::new(*stream_id));
247        },
248
249        Action::SendDatagram { payload } => {
250            log::info!("dgram tx len={}", payload.len(),);
251
252            conn.dgram_send(payload)
253                .expect("datagram extension not enabled by peer");
254        },
255
256        Action::ResetStream {
257            stream_id,
258            error_code,
259        } => {
260            log::info!(
261                "reset_stream stream_id={stream_id} error_code={error_code}"
262            );
263            if let Err(e) = conn.stream_shutdown(
264                *stream_id,
265                quiche::Shutdown::Write,
266                *error_code,
267            ) {
268                log::error!("can't send reset_stream: {e}");
269                // Clients can't reset streams they don't own. If we attempt to do
270                // this, stream_shutdown would fail, and we
271                // shouldn't create a parser.
272                return;
273            }
274
275            stream_parsers
276                .entry(*stream_id)
277                .or_insert_with(|| FrameParser::new(*stream_id));
278        },
279
280        Action::StopSending {
281            stream_id,
282            error_code,
283        } => {
284            log::info!(
285                "stop_sending stream id={stream_id} error_code={error_code}"
286            );
287
288            if let Err(e) = conn.stream_shutdown(
289                *stream_id,
290                quiche::Shutdown::Read,
291                *error_code,
292            ) {
293                log::error!("can't send stop_sending: {e}");
294            }
295
296            // A `STOP_SENDING` should elicit a `RESET_STREAM` in response, which
297            // the frame parser can automatically handle.
298            stream_parsers
299                .entry(*stream_id)
300                .or_insert_with(|| FrameParser::new(*stream_id));
301        },
302
303        Action::ConnectionClose { error } => {
304            let ConnectionError {
305                is_app,
306                error_code,
307                reason,
308            } = error;
309
310            log::info!("connection_close={error:?}");
311            let _ = conn.close(*is_app, *error_code, reason);
312        },
313
314        // Neither of these actions will manipulate the Quiche connection
315        Action::FlushPackets | Action::Wait { .. } => unreachable!(),
316    }
317}
318
319pub(crate) fn parse_streams<C: Client>(
320    conn: &mut quiche::Connection, client: &mut C,
321) -> Vec<StreamEvent> {
322    let mut responded_streams: Vec<StreamEvent> =
323        Vec::with_capacity(conn.readable().len());
324
325    for stream in conn.readable() {
326        // TODO: ignoring control streams
327        if stream % 4 != 0 {
328            continue;
329        }
330
331        loop {
332            let stream_parse_result = client
333                .stream_parsers_mut()
334                .get_mut(&stream)
335                .expect("stream readable with no parser")
336                .try_parse_frame(conn);
337
338            match stream_parse_result {
339                Ok(FrameParseResult::FrameParsed { h3i_frame, fin }) => {
340                    if let H3iFrame::Headers(ref headers) = h3i_frame {
341                        log::info!("hdrs={headers:?}");
342                    }
343
344                    handle_response_frame(
345                        client,
346                        conn.qlog_streamer(),
347                        &mut responded_streams,
348                        stream,
349                        h3i_frame,
350                    );
351
352                    if fin {
353                        handle_fin(
354                            &mut responded_streams,
355                            client.stream_parsers_mut(),
356                            stream,
357                        );
358                        break;
359                    }
360                },
361                Ok(FrameParseResult::Retry) => {},
362                Ok(FrameParseResult::Interrupted(cause)) => {
363                    if let InterruptCause::ResetStream(error_code) = cause {
364                        let frame = H3iFrame::ResetStream(ResetStream {
365                            stream_id: stream,
366                            error_code,
367                        });
368
369                        log::info!("received reset stream: {frame:?}");
370                        handle_response_frame(
371                            client,
372                            None,
373                            &mut responded_streams,
374                            stream,
375                            frame,
376                        );
377                    }
378
379                    handle_fin(
380                        &mut responded_streams,
381                        client.stream_parsers_mut(),
382                        stream,
383                    );
384                    break;
385                },
386                Err(e) => {
387                    match e {
388                        Error::TransportError(quiche::Error::Done) => {
389                            log::debug!("stream {stream} exhausted");
390                        },
391                        Error::TransportError(quiche::Error::StreamReset(
392                            error_code,
393                        )) => {
394                            let frame = H3iFrame::ResetStream(ResetStream {
395                                stream_id: stream,
396                                error_code,
397                            });
398
399                            log::info!("received reset stream: {frame:?}");
400
401                            handle_response_frame(
402                                client,
403                                None,
404                                &mut responded_streams,
405                                stream,
406                                frame,
407                            );
408
409                            client.stream_parsers_mut().remove(&stream);
410                        },
411                        _ => {
412                            log::warn!("stream read error: {e}");
413                        },
414                    };
415
416                    break;
417                },
418            }
419        }
420    }
421
422    responded_streams
423}
424
425fn handle_fin(
426    responded_streams: &mut Vec<StreamEvent>,
427    stream_parsers: &mut StreamParserMap, stream_id: u64,
428) {
429    responded_streams.push(StreamEvent {
430        stream_id,
431        event_type: StreamEventType::Finished,
432    });
433
434    stream_parsers.remove(&stream_id);
435}
436
437/// Push any responses to the [StreamMap] as well as store them in the
438/// `responded` vector
439fn handle_response_frame<C: Client>(
440    client: &mut C, qlog_streamer: Option<&mut QlogStreamer>,
441    responded_streams: &mut Vec<StreamEvent>, stream_id: u64, frame: H3iFrame,
442) {
443    let cloned = frame.clone();
444    client.handle_response_frame(stream_id, cloned);
445
446    let mut to_qlog: Option<Http3Frame> = None;
447    let mut push_to_responses: Option<StreamEvent> = None;
448
449    match frame {
450        H3iFrame::Headers(enriched_headers) => {
451            push_to_responses = Some(StreamEvent {
452                stream_id,
453                event_type: StreamEventType::Headers,
454            });
455
456            let qlog_headers: Vec<HttpHeader> = enriched_headers
457                .headers()
458                .iter()
459                .map(|h| qlog::events::h3::HttpHeader {
460                    name: String::from_utf8_lossy(h.name()).into_owned(),
461                    value: String::from_utf8_lossy(h.value()).into_owned(),
462                })
463                .collect();
464
465            to_qlog = Some(Http3Frame::Headers {
466                headers: qlog_headers,
467            });
468        },
469        H3iFrame::QuicheH3(quiche_frame) => {
470            if let QFrame::Data { .. } = quiche_frame {
471                push_to_responses = Some(StreamEvent {
472                    stream_id,
473                    event_type: StreamEventType::Data,
474                });
475            }
476
477            to_qlog = Some(quiche_frame.to_qlog());
478        },
479        H3iFrame::ResetStream(_) => {
480            push_to_responses = Some(StreamEvent {
481                stream_id,
482                event_type: StreamEventType::Finished,
483            });
484        },
485    }
486
487    if let Some(to_qlog) = to_qlog {
488        handle_qlog(qlog_streamer, to_qlog, stream_id);
489    }
490
491    if let Some(to_push) = push_to_responses {
492        responded_streams.push(to_push);
493    }
494}
495
496pub(crate) struct ParsedArgs<'a> {
497    pub(crate) bind_addr: SocketAddr,
498    pub(crate) peer_addr: SocketAddr,
499    pub(crate) connect_url: Option<&'a str>,
500}
501
502pub(crate) fn parse_args(args: &Config) -> ParsedArgs<'_> {
503    // We'll only connect to one server.
504    let connect_url = if !args.omit_sni {
505        args.host_port.split(':').next()
506    } else {
507        None
508    };
509
510    let (peer_addr, bind_addr) = resolve_socket_addrs(args);
511
512    ParsedArgs {
513        peer_addr,
514        bind_addr,
515        connect_url,
516    }
517}
518
519fn resolve_socket_addrs(args: &Config) -> (SocketAddr, SocketAddr) {
520    // Resolve server address.
521    let peer_addr = if let Some(addr) = &args.connect_to {
522        addr.parse().expect("--connect-to is expected to be a string containing an IPv4 or IPv6 address with a port. E.g. 192.0.2.0:443")
523    } else {
524        let x = format!("https://{}", args.host_port);
525        *url::Url::parse(&x)
526            .unwrap()
527            .socket_addrs(|| None)
528            .unwrap()
529            .first()
530            .unwrap()
531    };
532
533    // Bind to INADDR_ANY or IN6ADDR_ANY depending on the IP family of the
534    // server address. This is needed on macOS and BSD variants that don't
535    // support binding to IN6ADDR_ANY for both v4 and v6.
536    let bind_addr = match peer_addr {
537        std::net::SocketAddr::V4(_) => format!("0.0.0.0:{}", args.source_port),
538        std::net::SocketAddr::V6(_) => format!("[::]:{}", args.source_port),
539    };
540
541    (
542        peer_addr,
543        bind_addr.parse().expect("unable to parse bind address"),
544    )
545}