Skip to main content

h3i/client/
mod.rs

1// Copyright (C) 2024, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! The main h3i client runner.
28//!
//! The client is responsible for connecting to an indicated server, executing
//! a series of [Action]s, and capturing the results in a
//! [ConnectionSummary].
32
33#[cfg(feature = "async")]
34pub mod async_client;
35pub mod connection_summary;
36pub mod sync_client;
37
38use connection_summary::*;
39use qlog::events::http3::FrameParsed;
40use qlog::events::http3::HttpHeader;
41use quiche::ConnectionError;
42
43use std::collections::HashMap;
44use std::net::SocketAddr;
45use std::time::Instant;
46
47use crate::actions::h3::Action;
48use crate::actions::h3::ExpectedStreamSendResult;
49use crate::actions::h3::StreamEvent;
50use crate::actions::h3::StreamEventType;
51use crate::config::Config;
52use crate::frame::H3iFrame;
53use crate::frame::ResetStream;
54use crate::frame_parser::FrameParseResult;
55use crate::frame_parser::FrameParser;
56use crate::frame_parser::InterruptCause;
57use crate::recordreplay::qlog::QlogEvent;
58use crate::recordreplay::qlog::*;
59use qlog::events::http3::Http3Frame;
60use qlog::events::EventData;
61use qlog::streamer::QlogStreamer;
62use serde::Serialize;
63
64use crate::quiche;
65use quiche::h3::frame::Frame as QFrame;
66use quiche::h3::Error;
67use quiche::h3::NameValue;
68
/// Datagram size limit, in bytes.
// NOTE(review): not referenced in this file; presumably consumed by the
// `sync_client` / `async_client` submodules - confirm.
const MAX_DATAGRAM_SIZE: usize = 1350;
/// QUIC version number.
// NOTE(review): not referenced in this file; presumably consumed by the
// client submodules - confirm.
const QUIC_VERSION: u32 = 1;
71
72fn handle_qlog(
73    qlog_streamer: Option<&mut QlogStreamer>, qlog_frame: Http3Frame,
74    stream_id: u64,
75) {
76    if let Some(s) = qlog_streamer {
77        let ev_data = EventData::Http3FrameParsed(FrameParsed {
78            stream_id,
79            frame: qlog_frame,
80            ..Default::default()
81        });
82
83        s.add_event_data_now(ev_data).ok();
84    }
85}
86
87#[derive(Debug, Serialize)]
88/// Represents different errors that can occur when the h3i client runs.
89pub enum ClientError {
90    /// An error during the QUIC handshake.
91    HandshakeFail,
92    /// An error during HTTP/3 exchanges.
93    HttpFail,
94    /// Some other type of error.
95    Other(String),
96}
97
98pub(crate) trait Client {
99    /// Gives mutable access to the stream parsers to update their state.
100    fn stream_parsers_mut(&mut self) -> &mut StreamParserMap;
101
102    /// Handles a response frame. This allows [`Client`]s to customize how they
103    /// construct a [`StreamMap`] from a list of frames.
104    fn handle_response_frame(&mut self, stream_id: u64, frame: H3iFrame);
105}
106
107pub(crate) type StreamParserMap = HashMap<u64, FrameParser>;
108
109fn validate_stream_send_result(
110    result: quiche::Result<usize>, expected: &ExpectedStreamSendResult,
111    stream_id: u64,
112) {
113    match expected {
114        ExpectedStreamSendResult::Ok => {
115            result.unwrap_or_else(|err| {
116                panic!(
117                    "Expected stream_send on stream {} to succeed, but got: {:?}",
118                    stream_id, err
119                )
120            });
121        },
122        ExpectedStreamSendResult::OkExact(expected_bytes) => {
123            match result {
124                Ok(actual_bytes) if actual_bytes == *expected_bytes => {},
125                Ok(actual_bytes) => panic!(
126                    "Expected stream_send on stream {} to write {} bytes, got {}",
127                    stream_id, expected_bytes, actual_bytes
128                ),
129                Err(err) => panic!(
130                    "Expected stream_send on stream {} to write {} bytes, got error: {:?}",
131                    stream_id, expected_bytes, err
132                ),
133            }
134        },
135        ExpectedStreamSendResult::Error(expected_err) => {
136            match result {
137                Ok(written) => panic!(
138                    "Expected stream_send on stream {} to fail with {:?}, but wrote {} bytes",
139                    stream_id, expected_err, written
140                ),
141                Err(actual_err) if &actual_err == expected_err => {},
142                Err(actual_err) => panic!(
143                    "Expected stream_send on stream {} to fail with {:?}, got {:?}",
144                    stream_id, expected_err, actual_err
145                ),
146            }
147        },
148    }
149}
150
151pub(crate) fn execute_action<F: quiche::BufFactory>(
152    action: &Action, conn: &mut quiche::Connection<F>,
153    stream_parsers: &mut StreamParserMap,
154) {
155    match action {
156        Action::SendFrame {
157            stream_id,
158            fin_stream,
159            frame,
160            expected_result,
161        } => {
162            log::info!("frame tx id={stream_id} frame={frame:?}");
163
164            // TODO: make serialization smarter
165            let mut d = [42; 9999];
166            let mut b = octets::OctetsMut::with_slice(&mut d);
167
168            if let Some(s) = conn.qlog_streamer() {
169                let events: QlogEvents = action.into();
170                for event in events {
171                    match event {
172                        QlogEvent::Event { data, ex_data } => {
173                            // skip dummy packet
174                            if matches!(
175                                data.as_ref(),
176                                EventData::QuicPacketSent(..)
177                            ) {
178                                continue;
179                            }
180
181                            s.add_event_data_ex_now(*data, ex_data).ok();
182                        },
183
184                        QlogEvent::JsonEvent(mut ev) => {
185                            // need to rewrite the event time
186                            ev.time = Instant::now()
187                                .duration_since(s.start_time())
188                                .as_secs_f64() *
189                                1000.0;
190                            s.add_event(ev).ok();
191                        },
192                    }
193                }
194            }
195            let len = frame.to_bytes(&mut b).unwrap();
196
197            // TODO - consider storying result in ConnectionSummary too.
198            let result = conn.stream_send(*stream_id, &d[..len], *fin_stream);
199            validate_stream_send_result(result, expected_result, *stream_id);
200
201            stream_parsers
202                .entry(*stream_id)
203                .or_insert_with(|| FrameParser::new(*stream_id));
204        },
205
206        Action::SendHeadersFrame {
207            stream_id,
208            fin_stream,
209            headers,
210            frame,
211            expected_result,
212            ..
213        } => {
214            log::info!("headers frame tx stream={stream_id} hdrs={headers:?}");
215
216            // TODO: make serialization smarter
217            let mut d = [42; 9999];
218            let mut b = octets::OctetsMut::with_slice(&mut d);
219
220            if let Some(s) = conn.qlog_streamer() {
221                let events: QlogEvents = action.into();
222                for event in events {
223                    match event {
224                        QlogEvent::Event { data, ex_data } => {
225                            // skip dummy packet
226                            if matches!(
227                                data.as_ref(),
228                                EventData::QuicPacketSent(..)
229                            ) {
230                                continue;
231                            }
232
233                            s.add_event_data_ex_now(*data, ex_data).ok();
234                        },
235
236                        QlogEvent::JsonEvent(mut ev) => {
237                            // need to rewrite the event time
238                            ev.time = Instant::now()
239                                .duration_since(s.start_time())
240                                .as_secs_f64() *
241                                1000.0;
242                            s.add_event(ev).ok();
243                        },
244                    }
245                }
246            }
247            let len = frame.to_bytes(&mut b).unwrap();
248            let result = conn.stream_send(*stream_id, &d[..len], *fin_stream);
249            validate_stream_send_result(result, expected_result, *stream_id);
250
251            stream_parsers
252                .entry(*stream_id)
253                .or_insert_with(|| FrameParser::new(*stream_id));
254        },
255
256        Action::OpenUniStream {
257            stream_id,
258            fin_stream,
259            stream_type,
260            expected_result,
261        } => {
262            log::info!(
263                "open uni stream_id={stream_id} ty={stream_type} fin={fin_stream}"
264            );
265
266            let mut d = [42; 8];
267            let mut b = octets::OctetsMut::with_slice(&mut d);
268            b.put_varint(*stream_type).unwrap();
269            let off = b.off();
270
271            let result = conn.stream_send(*stream_id, &d[..off], *fin_stream);
272            validate_stream_send_result(result, expected_result, *stream_id);
273
274            stream_parsers
275                .entry(*stream_id)
276                .or_insert_with(|| FrameParser::new(*stream_id));
277        },
278
279        Action::StreamBytes {
280            stream_id,
281            bytes,
282            fin_stream,
283            expected_result,
284        } => {
285            log::info!(
286                "stream bytes tx id={} len={} fin={}",
287                stream_id,
288                bytes.len(),
289                fin_stream
290            );
291            let result = conn.stream_send(*stream_id, bytes, *fin_stream);
292            validate_stream_send_result(result, expected_result, *stream_id);
293
294            stream_parsers
295                .entry(*stream_id)
296                .or_insert_with(|| FrameParser::new(*stream_id));
297        },
298
299        Action::SendDatagram { payload } => {
300            log::info!("dgram tx len={}", payload.len(),);
301
302            conn.dgram_send(payload)
303                .expect("datagram extension not enabled by peer");
304        },
305
306        Action::ResetStream {
307            stream_id,
308            error_code,
309        } => {
310            log::info!(
311                "reset_stream stream_id={stream_id} error_code={error_code}"
312            );
313            if let Err(e) = conn.stream_shutdown(
314                *stream_id,
315                quiche::Shutdown::Write,
316                *error_code,
317            ) {
318                log::error!("can't send reset_stream: {e}");
319                // Clients can't reset streams they don't own. If we attempt to do
320                // this, stream_shutdown would fail, and we
321                // shouldn't create a parser.
322                return;
323            }
324
325            stream_parsers
326                .entry(*stream_id)
327                .or_insert_with(|| FrameParser::new(*stream_id));
328        },
329
330        Action::StopSending {
331            stream_id,
332            error_code,
333        } => {
334            log::info!(
335                "stop_sending stream id={stream_id} error_code={error_code}"
336            );
337
338            if let Err(e) = conn.stream_shutdown(
339                *stream_id,
340                quiche::Shutdown::Read,
341                *error_code,
342            ) {
343                log::error!("can't send stop_sending: {e}");
344            }
345
346            // A `STOP_SENDING` should elicit a `RESET_STREAM` in response, which
347            // the frame parser can automatically handle.
348            stream_parsers
349                .entry(*stream_id)
350                .or_insert_with(|| FrameParser::new(*stream_id));
351        },
352
353        Action::ConnectionClose { error } => {
354            let ConnectionError {
355                is_app,
356                error_code,
357                reason,
358            } = error;
359
360            log::info!("connection_close={error:?}");
361            let _ = conn.close(*is_app, *error_code, reason);
362        },
363
364        // Neither of these actions will manipulate the Quiche connection
365        Action::FlushPackets | Action::Wait { .. } => unreachable!(),
366    }
367}
368
369pub(crate) fn parse_streams<F: quiche::BufFactory, C: Client>(
370    conn: &mut quiche::Connection<F>, client: &mut C,
371) -> Vec<StreamEvent> {
372    let mut responded_streams: Vec<StreamEvent> =
373        Vec::with_capacity(conn.readable().len());
374
375    for stream in conn.readable() {
376        // TODO: ignoring control streams
377        if stream % 4 != 0 {
378            continue;
379        }
380
381        loop {
382            let stream_parse_result = client
383                .stream_parsers_mut()
384                .get_mut(&stream)
385                .expect("stream readable with no parser")
386                .try_parse_frame(conn);
387
388            match stream_parse_result {
389                Ok(FrameParseResult::FrameParsed { h3i_frame, fin }) => {
390                    if let H3iFrame::Headers(ref headers) = h3i_frame {
391                        log::info!("hdrs={headers:?}");
392                    }
393
394                    handle_response_frame(
395                        client,
396                        conn.qlog_streamer(),
397                        &mut responded_streams,
398                        stream,
399                        h3i_frame,
400                    );
401
402                    if fin {
403                        handle_fin(
404                            &mut responded_streams,
405                            client.stream_parsers_mut(),
406                            stream,
407                        );
408                        break;
409                    }
410                },
411                Ok(FrameParseResult::Retry) => {},
412                Ok(FrameParseResult::Interrupted(cause)) => {
413                    if let InterruptCause::ResetStream(error_code) = cause {
414                        let frame = H3iFrame::ResetStream(ResetStream {
415                            stream_id: stream,
416                            error_code,
417                        });
418
419                        log::info!("received reset stream: {frame:?}");
420                        handle_response_frame(
421                            client,
422                            None,
423                            &mut responded_streams,
424                            stream,
425                            frame,
426                        );
427                    }
428
429                    handle_fin(
430                        &mut responded_streams,
431                        client.stream_parsers_mut(),
432                        stream,
433                    );
434                    break;
435                },
436                Err(e) => {
437                    match e {
438                        Error::TransportError(quiche::Error::Done) => {
439                            log::debug!("stream {stream} exhausted");
440                        },
441                        Error::TransportError(quiche::Error::StreamReset(
442                            error_code,
443                        )) => {
444                            let frame = H3iFrame::ResetStream(ResetStream {
445                                stream_id: stream,
446                                error_code,
447                            });
448
449                            log::info!("received reset stream: {frame:?}");
450
451                            handle_response_frame(
452                                client,
453                                None,
454                                &mut responded_streams,
455                                stream,
456                                frame,
457                            );
458
459                            client.stream_parsers_mut().remove(&stream);
460                        },
461                        _ => {
462                            log::warn!("stream read error: {e}");
463                        },
464                    };
465
466                    break;
467                },
468            }
469        }
470    }
471
472    responded_streams
473}
474
475fn handle_fin(
476    responded_streams: &mut Vec<StreamEvent>,
477    stream_parsers: &mut StreamParserMap, stream_id: u64,
478) {
479    responded_streams.push(StreamEvent {
480        stream_id,
481        event_type: StreamEventType::Finished,
482    });
483
484    stream_parsers.remove(&stream_id);
485}
486
487/// Push any responses to the [StreamMap] as well as store them in the
488/// `responded` vector
489fn handle_response_frame<C: Client>(
490    client: &mut C, qlog_streamer: Option<&mut QlogStreamer>,
491    responded_streams: &mut Vec<StreamEvent>, stream_id: u64, frame: H3iFrame,
492) {
493    let cloned = frame.clone();
494    client.handle_response_frame(stream_id, cloned);
495
496    let mut to_qlog: Option<Http3Frame> = None;
497    let mut push_to_responses: Option<StreamEvent> = None;
498
499    match frame {
500        H3iFrame::Headers(enriched_headers) => {
501            push_to_responses = Some(StreamEvent {
502                stream_id,
503                event_type: StreamEventType::Headers,
504            });
505
506            let qlog_headers: Vec<HttpHeader> = enriched_headers
507                .headers()
508                .iter()
509                .map(|h| qlog::events::http3::HttpHeader {
510                    name: Some(String::from_utf8_lossy(h.name()).into_owned()),
511                    name_bytes: None,
512                    value: Some(String::from_utf8_lossy(h.value()).into_owned()),
513                    value_bytes: None,
514                })
515                .collect();
516
517            to_qlog = Some(Http3Frame::Headers {
518                headers: qlog_headers,
519                raw: None,
520            });
521        },
522        H3iFrame::QuicheH3(quiche_frame) => {
523            if let QFrame::Data { .. } = quiche_frame {
524                push_to_responses = Some(StreamEvent {
525                    stream_id,
526                    event_type: StreamEventType::Data,
527                });
528            }
529
530            to_qlog = Some(quiche_frame.to_qlog());
531        },
532        H3iFrame::ResetStream(_) => {
533            push_to_responses = Some(StreamEvent {
534                stream_id,
535                event_type: StreamEventType::Finished,
536            });
537        },
538    }
539
540    if let Some(to_qlog) = to_qlog {
541        handle_qlog(qlog_streamer, to_qlog, stream_id);
542    }
543
544    if let Some(to_push) = push_to_responses {
545        responded_streams.push(to_push);
546    }
547}
548
/// Connection parameters derived from a [`Config`] by [`parse_args`].
pub(crate) struct ParsedArgs<'a> {
    /// Local address to bind to: the wildcard address of the same IP family
    /// as `peer_addr`, with the configured source port.
    pub(crate) bind_addr: SocketAddr,
    /// Resolved address of the server.
    pub(crate) peer_addr: SocketAddr,
    /// The part of the configured `host_port` before the first `:`, or `None`
    /// when SNI is omitted.
    pub(crate) connect_url: Option<&'a str>,
}
554
555pub(crate) fn parse_args(args: &Config) -> ParsedArgs<'_> {
556    // We'll only connect to one server.
557    let connect_url = if !args.omit_sni {
558        args.host_port.split(':').next()
559    } else {
560        None
561    };
562
563    let (peer_addr, bind_addr) = resolve_socket_addrs(args);
564
565    ParsedArgs {
566        peer_addr,
567        bind_addr,
568        connect_url,
569    }
570}
571
572fn resolve_socket_addrs(args: &Config) -> (SocketAddr, SocketAddr) {
573    // Resolve server address.
574    let peer_addr = if let Some(addr) = &args.connect_to {
575        addr.parse().expect("--connect-to is expected to be a string containing an IPv4 or IPv6 address with a port. E.g. 192.0.2.0:443")
576    } else {
577        let x = format!("https://{}", args.host_port);
578        *url::Url::parse(&x)
579            .unwrap()
580            .socket_addrs(|| None)
581            .unwrap()
582            .first()
583            .unwrap()
584    };
585
586    // Bind to INADDR_ANY or IN6ADDR_ANY depending on the IP family of the
587    // server address. This is needed on macOS and BSD variants that don't
588    // support binding to IN6ADDR_ANY for both v4 and v6.
589    let bind_addr = match peer_addr {
590        std::net::SocketAddr::V4(_) => format!("0.0.0.0:{}", args.source_port),
591        std::net::SocketAddr::V6(_) => format!("[::]:{}", args.source_port),
592    };
593
594    (
595        peer_addr,
596        bind_addr.parse().expect("unable to parse bind address"),
597    )
598}