h3i/client/
sync_client.rs

1// Copyright (C) 2024, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! Responsible for creating a [quiche::Connection] and managing I/O.
28
29use std::slice::Iter;
30use std::time::Duration;
31use std::time::Instant;
32
33use ring::rand::*;
34
35use crate::client::QUIC_VERSION;
36use crate::frame::H3iFrame;
37use crate::quiche;
38
39use crate::actions::h3::Action;
40use crate::actions::h3::StreamEventType;
41use crate::actions::h3::WaitType;
42use crate::actions::h3::WaitingFor;
43use crate::client::execute_action;
44use crate::client::parse_streams;
45use crate::client::ClientError;
46use crate::client::ConnectionCloseDetails;
47use crate::client::MAX_DATAGRAM_SIZE;
48use crate::config::Config;
49
50use super::parse_args;
51use super::Client;
52use super::CloseTriggerFrames;
53use super::ConnectionSummary;
54use super::ParsedArgs;
55use super::StreamMap;
56use super::StreamParserMap;
57
58#[derive(Default)]
59struct SyncClient {
60    streams: StreamMap,
61    stream_parsers: StreamParserMap,
62}
63
64impl SyncClient {
65    fn new(close_trigger_frames: Option<CloseTriggerFrames>) -> Self {
66        Self {
67            streams: StreamMap::new(close_trigger_frames),
68            ..Default::default()
69        }
70    }
71}
72
73impl Client for SyncClient {
74    fn stream_parsers_mut(&mut self) -> &mut StreamParserMap {
75        &mut self.stream_parsers
76    }
77
78    fn handle_response_frame(&mut self, stream_id: u64, frame: H3iFrame) {
79        self.streams.insert(stream_id, frame);
80    }
81}
82
83fn create_config(args: &Config, should_log_keys: bool) -> quiche::Config {
84    // Create the configuration for the QUIC connection.
85    let mut config = quiche::Config::new(QUIC_VERSION).unwrap();
86
87    config.verify_peer(args.verify_peer);
88    config.set_application_protos(&[b"h3"]).unwrap();
89    config.set_max_idle_timeout(args.idle_timeout);
90    config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE);
91    config.set_max_send_udp_payload_size(MAX_DATAGRAM_SIZE);
92    config.set_initial_max_data(10_000_000);
93    config
94        .set_initial_max_stream_data_bidi_local(args.max_stream_data_bidi_local);
95    config.set_initial_max_stream_data_bidi_remote(
96        args.max_stream_data_bidi_remote,
97    );
98    config.set_initial_max_stream_data_uni(args.max_stream_data_uni);
99    config.set_initial_max_streams_bidi(args.max_streams_bidi);
100    config.set_initial_max_streams_uni(args.max_streams_uni);
101    config.set_disable_active_migration(true);
102    config.set_active_connection_id_limit(0);
103
104    config.set_max_connection_window(args.max_window);
105    config.set_max_stream_window(args.max_stream_window);
106    config.grease(false);
107
108    if args.enable_dgram {
109        config.enable_dgram(
110            true,
111            args.dgram_recv_queue_len,
112            args.dgram_send_queue_len,
113        );
114    }
115    if should_log_keys {
116        config.log_keys()
117    }
118
119    config
120}
121
122/// Connect to a server and execute provided actions.
123///
124/// Constructs a socket and [quiche::Connection] based on the provided `args`,
125/// then iterates over `actions`.
126///
127/// If `close_trigger_frames` is specified, h3i will close the connection
128/// immediately upon receiving all of the supplied frames rather than waiting
129/// for the idle timeout. See [`CloseTriggerFrames`] for details.
130///
131/// Returns a [ConnectionSummary] on success, [ClientError] on failure.
132pub fn connect(
133    args: Config, actions: Vec<Action>,
134    close_trigger_frames: Option<CloseTriggerFrames>,
135) -> std::result::Result<ConnectionSummary, ClientError> {
136    let mut buf = [0; 65535];
137    let mut out = [0; MAX_DATAGRAM_SIZE];
138
139    let ParsedArgs {
140        connect_url,
141        bind_addr,
142        peer_addr,
143    } = parse_args(&args);
144
145    // Setup the event loop.
146    let mut poll = mio::Poll::new().unwrap();
147    let mut events = mio::Events::with_capacity(1024);
148
149    // Create the UDP socket backing the QUIC connection, and register it with
150    // the event loop.
151    let mut socket = mio::net::UdpSocket::bind(bind_addr).unwrap();
152    poll.registry()
153        .register(&mut socket, mio::Token(0), mio::Interest::READABLE)
154        .unwrap();
155
156    let mut keylog = None;
157    if let Some(keylog_path) = std::env::var_os("SSLKEYLOGFILE") {
158        let file = std::fs::OpenOptions::new()
159            .create(true)
160            .append(true)
161            .open(keylog_path)
162            .unwrap();
163
164        keylog = Some(file);
165    }
166
167    let mut config = create_config(&args, keylog.is_some());
168
169    // Generate a random source connection ID for the connection.
170    let mut scid = [0; quiche::MAX_CONN_ID_LEN];
171
172    let rng = SystemRandom::new();
173    rng.fill(&mut scid[..]).unwrap();
174
175    let scid = quiche::ConnectionId::from_ref(&scid);
176
177    let Ok(local_addr) = socket.local_addr() else {
178        return Err(ClientError::Other("invalid socket".to_string()));
179    };
180
181    // Create a QUIC connection and initiate handshake.
182    let mut conn =
183        quiche::connect(connect_url, &scid, local_addr, peer_addr, &mut config)
184            .map_err(|e| ClientError::Other(e.to_string()))?;
185
186    if let Some(keylog) = &mut keylog {
187        if let Ok(keylog) = keylog.try_clone() {
188            conn.set_keylog(Box::new(keylog));
189        }
190    }
191
192    log::info!(
193        "connecting to {peer_addr:} from {local_addr:} with scid {scid:?}",
194    );
195
196    let mut app_proto_selected = false;
197
198    let (write, send_info) = conn.send(&mut out).expect("initial send failed");
199
200    while let Err(e) = socket.send_to(&out[..write], send_info.to) {
201        if e.kind() == std::io::ErrorKind::WouldBlock {
202            log::debug!(
203                "{} -> {}: send() would block",
204                socket.local_addr().unwrap(),
205                send_info.to
206            );
207            continue;
208        }
209
210        return Err(ClientError::Other(format!("send() failed: {e:?}")));
211    }
212
213    let app_data_start = std::time::Instant::now();
214
215    let mut action_iter = actions.iter();
216    let mut wait_duration = None;
217    let mut wait_instant = None;
218
219    let mut client = SyncClient::new(close_trigger_frames);
220    let mut waiting_for = WaitingFor::default();
221
222    loop {
223        let actual_sleep = match (wait_duration, conn.timeout()) {
224            (Some(wait), Some(timeout)) => {
225                #[allow(clippy::comparison_chain)]
226                if timeout < wait {
227                    // shave some off the wait time so it doesn't go longer
228                    // than user really wanted.
229                    let new = wait - timeout;
230                    wait_duration = Some(new);
231                    Some(timeout)
232                } else if wait < timeout {
233                    Some(wait)
234                } else {
235                    // same, so picking either doesn't matter
236                    Some(timeout)
237                }
238            },
239            (None, Some(timeout)) => Some(timeout),
240            (Some(wait), None) => Some(wait),
241            _ => None,
242        };
243
244        log::debug!("actual sleep is {actual_sleep:?}");
245        poll.poll(&mut events, actual_sleep).unwrap();
246
247        // If the event loop reported no events, run a belt and braces check on
248        // the quiche connection's timeouts.
249        if events.is_empty() {
250            log::debug!("timed out");
251
252            conn.on_timeout();
253        }
254
255        // Read incoming UDP packets from the socket and feed them to quiche,
256        // until there are no more packets to read.
257        for event in &events {
258            let socket = match event.token() {
259                mio::Token(0) => &socket,
260
261                _ => unreachable!(),
262            };
263
264            let local_addr = socket.local_addr().unwrap();
265            'read: loop {
266                let (len, from) = match socket.recv_from(&mut buf) {
267                    Ok(v) => v,
268
269                    Err(e) => {
270                        // There are no more UDP packets to read on this socket.
271                        // Process subsequent events.
272                        if e.kind() == std::io::ErrorKind::WouldBlock {
273                            break 'read;
274                        }
275
276                        return Err(ClientError::Other(format!(
277                            "{local_addr}: recv() failed: {e:?}"
278                        )));
279                    },
280                };
281
282                let recv_info = quiche::RecvInfo {
283                    to: local_addr,
284                    from,
285                };
286
287                // Process potentially coalesced packets.
288                let _read = match conn.recv(&mut buf[..len], recv_info) {
289                    Ok(v) => v,
290
291                    Err(e) => {
292                        log::debug!("{local_addr}: recv failed: {e:?}");
293                        continue 'read;
294                    },
295                };
296            }
297        }
298
299        log::debug!("done reading");
300
301        if conn.is_closed() {
302            log::info!(
303                "connection closed with error={:?} did_idle_timeout={}, stats={:?} path_stats={:?}",
304                conn.peer_error(),
305                conn.is_timed_out(),
306                conn.stats(),
307                conn.path_stats().collect::<Vec<quiche::PathStats>>(),
308            );
309
310            if !conn.is_established() {
311                log::info!(
312                    "connection timed out after {:?}",
313                    app_data_start.elapsed(),
314                );
315
316                return Err(ClientError::HandshakeFail);
317            }
318
319            break;
320        }
321
322        // Create a new application protocol session once the QUIC connection is
323        // established.
324        if (conn.is_established() || conn.is_in_early_data()) &&
325            !app_proto_selected
326        {
327            app_proto_selected = true;
328        }
329
330        if app_proto_selected {
331            check_duration_and_do_actions(
332                &mut wait_duration,
333                &mut wait_instant,
334                &mut action_iter,
335                &mut conn,
336                &mut waiting_for,
337                client.stream_parsers_mut(),
338            );
339
340            let mut wait_cleared = false;
341            for response in parse_streams(&mut conn, &mut client) {
342                let stream_id = response.stream_id;
343
344                if let StreamEventType::Finished = response.event_type {
345                    waiting_for.clear_waits_on_stream(stream_id);
346                } else {
347                    waiting_for.remove_wait(response);
348                }
349
350                wait_cleared = true;
351            }
352
353            if client.streams.all_close_trigger_frames_seen() {
354                client.streams.close_due_to_trigger_frames(&mut conn);
355            }
356
357            if wait_cleared {
358                check_duration_and_do_actions(
359                    &mut wait_duration,
360                    &mut wait_instant,
361                    &mut action_iter,
362                    &mut conn,
363                    &mut waiting_for,
364                    client.stream_parsers_mut(),
365                );
366            }
367        }
368
369        // Provides as many CIDs as possible.
370        while conn.scids_left() > 0 {
371            let (scid, reset_token) = generate_cid_and_reset_token();
372
373            if conn.new_scid(&scid, reset_token, false).is_err() {
374                break;
375            }
376        }
377
378        // Generate outgoing QUIC packets and send them on the UDP socket, until
379        // quiche reports that there are no more packets to be sent.
380        let sockets = vec![&socket];
381
382        for socket in sockets {
383            let local_addr = socket.local_addr().unwrap();
384
385            for peer_addr in conn.paths_iter(local_addr) {
386                loop {
387                    let (write, send_info) = match conn.send_on_path(
388                        &mut out,
389                        Some(local_addr),
390                        Some(peer_addr),
391                    ) {
392                        Ok(v) => v,
393
394                        Err(quiche::Error::Done) => {
395                            break;
396                        },
397
398                        Err(e) => {
399                            log::error!(
400                                "{local_addr} -> {peer_addr}: send failed: {e:?}"
401                            );
402
403                            conn.close(false, 0x1, b"fail").ok();
404                            break;
405                        },
406                    };
407
408                    if let Err(e) = socket.send_to(&out[..write], send_info.to) {
409                        if e.kind() == std::io::ErrorKind::WouldBlock {
410                            log::debug!(
411                                "{} -> {}: send() would block",
412                                local_addr,
413                                send_info.to
414                            );
415                            break;
416                        }
417
418                        return Err(ClientError::Other(format!(
419                            "{} -> {}: send() failed: {:?}",
420                            local_addr, send_info.to, e
421                        )));
422                    }
423                }
424            }
425        }
426
427        if conn.is_closed() {
428            log::info!(
429                "connection closed, {:?} {:?}",
430                conn.stats(),
431                conn.path_stats().collect::<Vec<quiche::PathStats>>()
432            );
433
434            if !conn.is_established() {
435                log::info!(
436                    "connection timed out after {:?}",
437                    app_data_start.elapsed(),
438                );
439
440                return Err(ClientError::HandshakeFail);
441            }
442
443            break;
444        }
445    }
446
447    Ok(ConnectionSummary {
448        stream_map: client.streams,
449        stats: Some(conn.stats()),
450        path_stats: conn.path_stats().collect(),
451        conn_close_details: ConnectionCloseDetails::new(&conn),
452    })
453}
454
455fn check_duration_and_do_actions(
456    wait_duration: &mut Option<Duration>, wait_instant: &mut Option<Instant>,
457    action_iter: &mut Iter<Action>, conn: &mut quiche::Connection,
458    waiting_for: &mut WaitingFor, stream_parsers: &mut StreamParserMap,
459) {
460    match wait_duration.as_ref() {
461        None => {
462            if let Some(idle_wait) =
463                handle_actions(action_iter, conn, waiting_for, stream_parsers)
464            {
465                *wait_duration = Some(idle_wait);
466                *wait_instant = Some(Instant::now());
467
468                // TODO: the wait period could still be larger than the
469                // negotiated idle timeout.
470                // We could in theory check quiche's idle_timeout value if
471                // it was public.
472                log::info!(
473                    "waiting for {idle_wait:?} before executing more actions"
474                );
475            }
476        },
477
478        Some(period) => {
479            let now = Instant::now();
480            let then = wait_instant.unwrap();
481            log::debug!(
482                "checking if actions wait period elapsed {:?} > {:?}",
483                now.duration_since(then),
484                wait_duration
485            );
486            if now.duration_since(then) >= *period {
487                log::debug!("yup!");
488                *wait_duration = None;
489
490                if let Some(idle_wait) =
491                    handle_actions(action_iter, conn, waiting_for, stream_parsers)
492                {
493                    *wait_duration = Some(idle_wait);
494                }
495            }
496        },
497    }
498}
499
500/// Generate a new pair of Source Connection ID and reset token.
501pub fn generate_cid_and_reset_token() -> (quiche::ConnectionId<'static>, u128) {
502    let rng = SystemRandom::new();
503
504    let mut scid = [0; quiche::MAX_CONN_ID_LEN];
505    rng.fill(&mut scid[..]).unwrap();
506    let scid = scid.to_vec().into();
507
508    let mut reset_token = [0; 16];
509    rng.fill(&mut reset_token[..]).unwrap();
510
511    let reset_token = u128::from_be_bytes(reset_token);
512    (scid, reset_token)
513}
514
515fn handle_actions<'a, I>(
516    iter: &mut I, conn: &mut quiche::Connection, waiting_for: &mut WaitingFor,
517    stream_parsers: &mut StreamParserMap,
518) -> Option<Duration>
519where
520    I: Iterator<Item = &'a Action>,
521{
522    if !waiting_for.is_empty() {
523        log::debug!(
524            "won't fire an action due to waiting for responses: {waiting_for:?}"
525        );
526        return None;
527    }
528
529    // Send actions
530    for action in iter {
531        match action {
532            Action::FlushPackets => return None,
533            Action::Wait { wait_type } => match wait_type {
534                WaitType::WaitDuration(period) => return Some(*period),
535                WaitType::StreamEvent(response) => {
536                    log::info!(
537                        "waiting for {response:?} before executing more actions"
538                    );
539                    waiting_for.add_wait(response);
540                    return None;
541                },
542            },
543            action => execute_action(action, conn, stream_parsers),
544        }
545    }
546
547    None
548}