h3i/client/
mod.rs

1// Copyright (C) 2024, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! The main h3i client runner.
28//!
//! The client is responsible for connecting to an indicated server, executing
//! a series of [Action]s, and capturing the results in a
//! [ConnectionSummary].
32
33pub mod connection_summary;
34pub mod sync_client;
35
36use connection_summary::*;
37use qlog::events::h3::HttpHeader;
38use quiche::ConnectionError;
39
40use std::collections::HashMap;
41use std::net::SocketAddr;
42use std::time::Instant;
43
44use crate::actions::h3::Action;
45use crate::actions::h3::StreamEvent;
46use crate::actions::h3::StreamEventType;
47use crate::config::Config;
48use crate::frame::H3iFrame;
49use crate::frame::ResetStream;
50use crate::frame_parser::FrameParseResult;
51use crate::frame_parser::FrameParser;
52use crate::frame_parser::InterruptCause;
53use crate::recordreplay::qlog::QlogEvent;
54use crate::recordreplay::qlog::*;
55use qlog::events::h3::H3FrameParsed;
56use qlog::events::h3::Http3Frame;
57use qlog::events::EventData;
58use qlog::streamer::QlogStreamer;
59use serde::Serialize;
60
61use quiche::h3::frame::Frame as QFrame;
62use quiche::h3::Error;
63use quiche::h3::NameValue;
64use quiche::Connection;
65use quiche::Result;
66use quiche::{
67    self,
68};
69
/// UDP payload size limit, in bytes, applied to both the receive and send
/// directions of the QUIC connection.
const MAX_DATAGRAM_SIZE: usize = 1350;

/// QUIC wire version passed to `quiche::Config::new`.
const QUIC_VERSION: u32 = 1;
72
73pub fn build_quiche_connection(
74    args: Config, peer_addr: SocketAddr, local_addr: SocketAddr,
75) -> Result<Connection> {
76    // We'll only connect to one server.
77    let connect_url = if !args.omit_sni {
78        args.host_port.split(':').next()
79    } else {
80        None
81    };
82
83    // Create the configuration for the QUIC connection.
84    let mut config = quiche::Config::new(QUIC_VERSION).unwrap();
85
86    config.verify_peer(args.verify_peer);
87    config.set_application_protos(&[b"h3"]).unwrap();
88    config.set_max_idle_timeout(args.idle_timeout);
89    config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE);
90    config.set_max_send_udp_payload_size(MAX_DATAGRAM_SIZE);
91    config.set_initial_max_data(10_000_000);
92    config
93        .set_initial_max_stream_data_bidi_local(args.max_stream_data_bidi_local);
94    config.set_initial_max_stream_data_bidi_remote(
95        args.max_stream_data_bidi_remote,
96    );
97    config.set_initial_max_stream_data_uni(args.max_stream_data_uni);
98    config.set_initial_max_streams_bidi(args.max_streams_bidi);
99    config.set_initial_max_streams_uni(args.max_streams_uni);
100    config.set_disable_active_migration(true);
101    config.set_active_connection_id_limit(0);
102
103    config.set_max_connection_window(args.max_window);
104    config.set_max_stream_window(args.max_stream_window);
105
106    let mut keylog = None;
107
108    if let Some(keylog_path) = std::env::var_os("SSLKEYLOGFILE") {
109        let file = std::fs::OpenOptions::new()
110            .create(true)
111            .append(true)
112            .open(keylog_path)
113            .unwrap();
114
115        keylog = Some(file);
116
117        config.log_keys();
118    }
119
120    config.grease(false);
121
122    // Generate a random source connection ID for the connection.
123    let mut scid = [0; quiche::MAX_CONN_ID_LEN];
124    rand::RngCore::fill_bytes(&mut rand::thread_rng(), &mut scid);
125
126    let scid = quiche::ConnectionId::from_ref(&scid);
127
128    // Create a QUIC connection and initiate handshake.
129    let mut conn =
130        quiche::connect(connect_url, &scid, local_addr, peer_addr, &mut config)?;
131
132    if let Some(keylog) = &mut keylog {
133        if let Ok(keylog) = keylog.try_clone() {
134            conn.set_keylog(Box::new(keylog));
135        }
136    }
137
138    log::info!(
139        "connecting to {:} from {:} with scid {:?}",
140        peer_addr,
141        local_addr,
142        scid,
143    );
144
145    Ok(conn)
146}
147
148fn handle_qlog(
149    qlog_streamer: Option<&mut QlogStreamer>, qlog_frame: Http3Frame,
150    stream_id: u64,
151) {
152    if let Some(s) = qlog_streamer {
153        let ev_data = EventData::H3FrameParsed(H3FrameParsed {
154            stream_id,
155            frame: qlog_frame,
156            ..Default::default()
157        });
158
159        s.add_event_data_now(ev_data).ok();
160    }
161}
162
163#[derive(Debug, Serialize)]
164/// Represents different errors that can occur when [sync_client] runs.
165pub enum ClientError {
166    /// An error during the QUIC handshake.
167    HandshakeFail,
168    /// An error during HTTP/3 exchanges.
169    HttpFail,
170    /// Some other type of error.
171    Other(String),
172}
173
174pub(crate) trait Client {
175    /// Gives mutable access to the stream parsers to update their state.
176    fn stream_parsers_mut(&mut self) -> &mut StreamParserMap;
177
178    /// Handles a response frame. This allows [`Client`]s to customize how they
179    /// construct a [`StreamMap`] from a list of frames.
180    fn handle_response_frame(&mut self, stream_id: u64, frame: H3iFrame);
181}
182
183pub(crate) type StreamParserMap = HashMap<u64, FrameParser>;
184
185pub(crate) fn execute_action(
186    action: &Action, conn: &mut quiche::Connection,
187    stream_parsers: &mut StreamParserMap,
188) {
189    match action {
190        Action::SendFrame {
191            stream_id,
192            fin_stream,
193            frame,
194        } => {
195            log::info!("frame tx id={} frame={:?}", stream_id, frame);
196
197            // TODO: make serialization smarter
198            let mut d = [42; 9999];
199            let mut b = octets::OctetsMut::with_slice(&mut d);
200
201            if let Some(s) = conn.qlog_streamer() {
202                let events: QlogEvents = action.into();
203                for event in events {
204                    match event {
205                        QlogEvent::Event { data, ex_data } => {
206                            // skip dummy packet
207                            if matches!(data.as_ref(), EventData::PacketSent(..))
208                            {
209                                continue;
210                            }
211
212                            s.add_event_data_ex_now(*data, ex_data).ok();
213                        },
214
215                        QlogEvent::JsonEvent(mut ev) => {
216                            // need to rewrite the event time
217                            ev.time = Instant::now()
218                                .duration_since(s.start_time())
219                                .as_secs_f32() *
220                                1000.0;
221                            s.add_event(ev).ok();
222                        },
223                    }
224                }
225            }
226            let len = frame.to_bytes(&mut b).unwrap();
227
228            // TODO - pass errors here to the connectionsummary, which means we
229            // can't initialize it when the connection's been shut
230            // down
231            conn.stream_send(*stream_id, &d[..len], *fin_stream)
232                .unwrap();
233
234            stream_parsers
235                .entry(*stream_id)
236                .or_insert_with(|| FrameParser::new(*stream_id));
237        },
238
239        Action::SendHeadersFrame {
240            stream_id,
241            fin_stream,
242            headers,
243            frame,
244            ..
245        } => {
246            log::info!(
247                "headers frame tx stream={} hdrs={:?}",
248                stream_id,
249                headers
250            );
251
252            // TODO: make serialization smarter
253            let mut d = [42; 9999];
254            let mut b = octets::OctetsMut::with_slice(&mut d);
255
256            if let Some(s) = conn.qlog_streamer() {
257                let events: QlogEvents = action.into();
258                for event in events {
259                    match event {
260                        QlogEvent::Event { data, ex_data } => {
261                            // skip dummy packet
262                            if matches!(data.as_ref(), EventData::PacketSent(..))
263                            {
264                                continue;
265                            }
266
267                            s.add_event_data_ex_now(*data, ex_data).ok();
268                        },
269
270                        QlogEvent::JsonEvent(mut ev) => {
271                            // need to rewrite the event time
272                            ev.time = Instant::now()
273                                .duration_since(s.start_time())
274                                .as_secs_f32() *
275                                1000.0;
276                            s.add_event(ev).ok();
277                        },
278                    }
279                }
280            }
281            let len = frame.to_bytes(&mut b).unwrap();
282            conn.stream_send(*stream_id, &d[..len], *fin_stream)
283                .unwrap();
284
285            stream_parsers
286                .entry(*stream_id)
287                .or_insert_with(|| FrameParser::new(*stream_id));
288        },
289
290        Action::OpenUniStream {
291            stream_id,
292            fin_stream,
293            stream_type,
294        } => {
295            log::info!(
296                "open uni stream_id={} ty={} fin={}",
297                stream_id,
298                stream_type,
299                fin_stream
300            );
301
302            let mut d = [42; 8];
303            let mut b = octets::OctetsMut::with_slice(&mut d);
304            b.put_varint(*stream_type).unwrap();
305            let off = b.off();
306
307            conn.stream_send(*stream_id, &d[..off], *fin_stream)
308                .unwrap();
309
310            stream_parsers
311                .entry(*stream_id)
312                .or_insert_with(|| FrameParser::new(*stream_id));
313        },
314
315        Action::StreamBytes {
316            stream_id,
317            bytes,
318            fin_stream,
319        } => {
320            log::info!(
321                "stream bytes tx id={} len={} fin={}",
322                stream_id,
323                bytes.len(),
324                fin_stream
325            );
326            conn.stream_send(*stream_id, bytes, *fin_stream).unwrap();
327
328            stream_parsers
329                .entry(*stream_id)
330                .or_insert_with(|| FrameParser::new(*stream_id));
331        },
332
333        Action::ResetStream {
334            stream_id,
335            error_code,
336        } => {
337            log::info!(
338                "reset_stream stream_id={} error_code={}",
339                stream_id,
340                error_code
341            );
342            if let Err(e) = conn.stream_shutdown(
343                *stream_id,
344                quiche::Shutdown::Write,
345                *error_code,
346            ) {
347                log::error!("can't send reset_stream: {}", e);
348                // Clients can't reset streams they don't own. If we attempt to do
349                // this, stream_shutdown would fail, and we
350                // shouldn't create a parser.
351                return;
352            }
353
354            stream_parsers
355                .entry(*stream_id)
356                .or_insert_with(|| FrameParser::new(*stream_id));
357        },
358
359        Action::StopSending {
360            stream_id,
361            error_code,
362        } => {
363            log::info!(
364                "stop_sending stream id={} error_code={}",
365                stream_id,
366                error_code
367            );
368
369            if let Err(e) = conn.stream_shutdown(
370                *stream_id,
371                quiche::Shutdown::Read,
372                *error_code,
373            ) {
374                log::error!("can't send stop_sending: {}", e);
375            }
376
377            // A `STOP_SENDING` should elicit a `RESET_STREAM` in response, which
378            // the frame parser can automatically handle.
379            stream_parsers
380                .entry(*stream_id)
381                .or_insert_with(|| FrameParser::new(*stream_id));
382        },
383
384        Action::ConnectionClose { error } => {
385            let ConnectionError {
386                is_app,
387                error_code,
388                reason,
389            } = error;
390
391            log::info!("connection_close={error:?}");
392            let _ = conn.close(*is_app, *error_code, reason);
393        },
394
395        // Neither of these actions will manipulate the Quiche connection
396        Action::FlushPackets | Action::Wait { .. } => unreachable!(),
397    }
398}
399
400pub(crate) fn parse_streams<C: Client>(
401    conn: &mut quiche::Connection, client: &mut C,
402) -> Vec<StreamEvent> {
403    let mut responded_streams: Vec<StreamEvent> =
404        Vec::with_capacity(conn.readable().len());
405
406    for stream in conn.readable() {
407        // TODO: ignoring control streams
408        if stream % 4 != 0 {
409            continue;
410        }
411
412        loop {
413            let stream_parse_result = client
414                .stream_parsers_mut()
415                .get_mut(&stream)
416                .expect("stream readable with no parser")
417                .try_parse_frame(conn);
418
419            match stream_parse_result {
420                Ok(FrameParseResult::FrameParsed { h3i_frame, fin }) => {
421                    if let H3iFrame::Headers(ref headers) = h3i_frame {
422                        log::info!("hdrs={:?}", headers);
423                    }
424
425                    handle_response_frame(
426                        client,
427                        conn.qlog_streamer(),
428                        &mut responded_streams,
429                        stream,
430                        h3i_frame,
431                    );
432
433                    if fin {
434                        handle_fin(
435                            &mut responded_streams,
436                            client.stream_parsers_mut(),
437                            stream,
438                        );
439                        break;
440                    }
441                },
442                Ok(FrameParseResult::Retry) => {},
443                Ok(FrameParseResult::Interrupted(cause)) => {
444                    if let InterruptCause::ResetStream(error_code) = cause {
445                        let frame = H3iFrame::ResetStream(ResetStream {
446                            stream_id: stream,
447                            error_code,
448                        });
449
450                        log::info!("received reset stream: {:?}", frame);
451                        handle_response_frame(
452                            client,
453                            None,
454                            &mut responded_streams,
455                            stream,
456                            frame,
457                        );
458                    }
459
460                    handle_fin(
461                        &mut responded_streams,
462                        client.stream_parsers_mut(),
463                        stream,
464                    );
465                    break;
466                },
467                Err(e) => {
468                    match e {
469                        Error::TransportError(quiche::Error::Done) => {
470                            log::debug!("stream {stream} exhausted");
471                        },
472                        Error::TransportError(quiche::Error::StreamReset(
473                            error_code,
474                        )) => {
475                            let frame = H3iFrame::ResetStream(ResetStream {
476                                stream_id: stream,
477                                error_code,
478                            });
479
480                            log::info!("received reset stream: {:?}", frame);
481
482                            handle_response_frame(
483                                client,
484                                None,
485                                &mut responded_streams,
486                                stream,
487                                frame,
488                            );
489
490                            client.stream_parsers_mut().remove(&stream);
491                        },
492                        _ => {
493                            log::warn!("stream read error: {e}");
494                        },
495                    };
496
497                    break;
498                },
499            }
500        }
501    }
502
503    responded_streams
504}
505
506fn handle_fin(
507    responded_streams: &mut Vec<StreamEvent>,
508    stream_parsers: &mut StreamParserMap, stream_id: u64,
509) {
510    responded_streams.push(StreamEvent {
511        stream_id,
512        event_type: StreamEventType::Finished,
513    });
514
515    stream_parsers.remove(&stream_id);
516}
517
518/// Push any responses to the [StreamMap] as well as store them in the
519/// `responded` vector
520fn handle_response_frame<C: Client>(
521    client: &mut C, qlog_streamer: Option<&mut QlogStreamer>,
522    responded_streams: &mut Vec<StreamEvent>, stream_id: u64, frame: H3iFrame,
523) {
524    let cloned = frame.clone();
525    client.handle_response_frame(stream_id, cloned);
526
527    let mut to_qlog: Option<Http3Frame> = None;
528    let mut push_to_responses: Option<StreamEvent> = None;
529
530    match frame {
531        H3iFrame::Headers(enriched_headers) => {
532            push_to_responses = Some(StreamEvent {
533                stream_id,
534                event_type: StreamEventType::Headers,
535            });
536
537            let qlog_headers: Vec<HttpHeader> = enriched_headers
538                .headers()
539                .iter()
540                .map(|h| qlog::events::h3::HttpHeader {
541                    name: String::from_utf8_lossy(h.name()).into_owned(),
542                    value: String::from_utf8_lossy(h.value()).into_owned(),
543                })
544                .collect();
545
546            to_qlog = Some(Http3Frame::Headers {
547                headers: qlog_headers,
548            });
549        },
550        H3iFrame::QuicheH3(quiche_frame) => {
551            if let QFrame::Data { .. } = quiche_frame {
552                push_to_responses = Some(StreamEvent {
553                    stream_id,
554                    event_type: StreamEventType::Data,
555                });
556            }
557
558            to_qlog = Some(quiche_frame.to_qlog());
559        },
560        H3iFrame::ResetStream(_) => {
561            push_to_responses = Some(StreamEvent {
562                stream_id,
563                event_type: StreamEventType::Finished,
564            });
565        },
566    }
567
568    if let Some(to_qlog) = to_qlog {
569        handle_qlog(qlog_streamer, to_qlog, stream_id);
570    }
571
572    if let Some(to_push) = push_to_responses {
573        responded_streams.push(to_push);
574    }
575}