Skip to main content

h3i/client/
sync_client.rs

1// Copyright (C) 2024, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! Responsible for creating a [quiche::Connection] and managing I/O.
28
29use std::slice::Iter;
30use std::time::Duration;
31use std::time::Instant;
32
33use ring::rand::*;
34
35use crate::client::QUIC_VERSION;
36use crate::frame::H3iFrame;
37use crate::quiche;
38
39use crate::actions::h3::Action;
40use crate::actions::h3::StreamEventType;
41use crate::actions::h3::WaitType;
42use crate::actions::h3::WaitingFor;
43use crate::client::execute_action;
44use crate::client::parse_streams;
45use crate::client::ClientError;
46use crate::client::ConnectionCloseDetails;
47use crate::client::MAX_DATAGRAM_SIZE;
48use crate::config::Config;
49
50use super::parse_args;
51use super::Client;
52use super::CloseTriggerFrames;
53use super::ConnectionSummary;
54use super::ParsedArgs;
55use super::StreamMap;
56use super::StreamParserMap;
57
58#[derive(Default)]
59struct SyncClient {
60    streams: StreamMap,
61    stream_parsers: StreamParserMap,
62}
63
64impl SyncClient {
65    fn new(close_trigger_frames: Option<CloseTriggerFrames>) -> Self {
66        Self {
67            streams: StreamMap::new(close_trigger_frames),
68            ..Default::default()
69        }
70    }
71}
72
73impl Client for SyncClient {
74    fn stream_parsers_mut(&mut self) -> &mut StreamParserMap {
75        &mut self.stream_parsers
76    }
77
78    fn handle_response_frame(&mut self, stream_id: u64, frame: H3iFrame) {
79        self.streams.insert(stream_id, frame);
80    }
81}
82
83fn create_config(args: &Config, should_log_keys: bool) -> quiche::Config {
84    // Create the configuration for the QUIC connection.
85    let mut config = quiche::Config::new(QUIC_VERSION).unwrap();
86
87    config.verify_peer(args.verify_peer);
88    config.set_application_protos(&[b"h3"]).unwrap();
89    config.set_max_idle_timeout(args.idle_timeout);
90    config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE);
91    config.set_max_send_udp_payload_size(MAX_DATAGRAM_SIZE);
92    config.set_initial_max_data(10_000_000);
93    config
94        .set_initial_max_stream_data_bidi_local(args.max_stream_data_bidi_local);
95    config.set_initial_max_stream_data_bidi_remote(
96        args.max_stream_data_bidi_remote,
97    );
98    config.set_initial_max_stream_data_uni(args.max_stream_data_uni);
99    config.set_initial_max_streams_bidi(args.max_streams_bidi);
100    config.set_initial_max_streams_uni(args.max_streams_uni);
101    config.set_disable_active_migration(true);
102    config.set_active_connection_id_limit(0);
103
104    config.set_max_connection_window(args.max_window);
105    config.set_max_stream_window(args.max_stream_window);
106    config.set_enable_send_streams_blocked(true);
107    config.grease(false);
108
109    if args.enable_early_data {
110        config.enable_early_data();
111    }
112
113    if args.enable_dgram {
114        config.enable_dgram(
115            true,
116            args.dgram_recv_queue_len,
117            args.dgram_send_queue_len,
118        );
119    }
120    if should_log_keys {
121        config.log_keys()
122    }
123
124    config
125}
126
127/// Connect to a server and execute provided actions.
128///
129/// Constructs a socket and [quiche::Connection] based on the provided `args`,
130/// then iterates over `actions`.
131///
132/// If `close_trigger_frames` is specified, h3i will close the connection
133/// immediately upon receiving all of the supplied frames rather than waiting
134/// for the idle timeout. See [`CloseTriggerFrames`] for details.
135///
136/// Returns a [ConnectionSummary] on success, [ClientError] on failure.
137pub fn connect(
138    args: Config, actions: Vec<Action>,
139    close_trigger_frames: Option<CloseTriggerFrames>,
140) -> std::result::Result<ConnectionSummary, ClientError> {
141    connect_with_early_data(args, None, actions, close_trigger_frames)
142}
143
144/// Connect to a server and execute provided early_action and actions.
145///
146/// See `connect` for additional documentation.
147pub fn connect_with_early_data(
148    args: Config, early_actions: Option<Vec<Action>>, actions: Vec<Action>,
149    close_trigger_frames: Option<CloseTriggerFrames>,
150) -> std::result::Result<ConnectionSummary, ClientError> {
151    let mut buf = [0; 65535];
152    let mut out = [0; MAX_DATAGRAM_SIZE];
153
154    let ParsedArgs {
155        connect_url,
156        bind_addr,
157        peer_addr,
158    } = parse_args(&args);
159
160    // Setup the event loop.
161    let mut poll = mio::Poll::new().unwrap();
162    let mut events = mio::Events::with_capacity(1024);
163
164    // Create the UDP socket backing the QUIC connection, and register it with
165    // the event loop.
166    let mut socket = mio::net::UdpSocket::bind(bind_addr).unwrap();
167    poll.registry()
168        .register(&mut socket, mio::Token(0), mio::Interest::READABLE)
169        .unwrap();
170
171    let mut keylog = None;
172    if let Some(keylog_path) = std::env::var_os("SSLKEYLOGFILE") {
173        let file = std::fs::OpenOptions::new()
174            .create(true)
175            .append(true)
176            .open(keylog_path)
177            .unwrap();
178
179        keylog = Some(file);
180    }
181
182    let mut config = create_config(&args, keylog.is_some());
183
184    // Generate a random source connection ID for the connection.
185    let mut scid = [0; quiche::MAX_CONN_ID_LEN];
186
187    let rng = SystemRandom::new();
188    rng.fill(&mut scid[..]).unwrap();
189
190    let scid = quiche::ConnectionId::from_ref(&scid);
191
192    let Ok(local_addr) = socket.local_addr() else {
193        return Err(ClientError::Other("invalid socket".to_string()));
194    };
195
196    // Create a new client-side QUIC connection.
197    let mut conn =
198        quiche::connect(connect_url, &scid, local_addr, peer_addr, &mut config)
199            .map_err(|e| ClientError::Other(e.to_string()))?;
200
201    if let Some(session) = &args.session {
202        conn.set_session(session)
203            .map_err(|error| ClientError::Other(error.to_string()))?;
204    }
205
206    if let Some(keylog) = &mut keylog {
207        if let Ok(keylog) = keylog.try_clone() {
208            conn.set_keylog(Box::new(keylog));
209        }
210    }
211
212    log::info!(
213        "connecting to {peer_addr:} from {local_addr:} with scid {scid:?}",
214    );
215
216    let mut app_proto_selected = false;
217
218    // Send ClientHello and initiate the handshake.
219    let (write, send_info) = conn.send(&mut out).expect("initial send failed");
220
221    let mut client = SyncClient::new(close_trigger_frames);
222    // Send early data if connection is_in_early_data (resumption with 0-RTT was
223    // successful) and if we have early_actions.
224    if conn.is_in_early_data() {
225        if let Some(early_actions) = early_actions {
226            let mut early_action_iter = early_actions.iter();
227            let mut wait_duration = None;
228            let mut wait_instant = None;
229            let mut waiting_for = WaitingFor::default();
230
231            check_duration_and_do_actions(
232                &mut wait_duration,
233                &mut wait_instant,
234                &mut early_action_iter,
235                &mut conn,
236                &mut waiting_for,
237                client.stream_parsers_mut(),
238            );
239        }
240    }
241
242    while let Err(e) = socket.send_to(&out[..write], send_info.to) {
243        if e.kind() == std::io::ErrorKind::WouldBlock {
244            log::debug!(
245                "{} -> {}: send() would block",
246                socket.local_addr().unwrap(),
247                send_info.to
248            );
249            continue;
250        }
251
252        return Err(ClientError::Other(format!("send() failed: {e:?}")));
253    }
254
255    let app_data_start = std::time::Instant::now();
256
257    let mut action_iter = actions.iter();
258    let mut wait_duration = None;
259    let mut wait_instant = None;
260
261    let mut waiting_for = WaitingFor::default();
262
263    loop {
264        let actual_sleep = match (wait_duration, conn.timeout()) {
265            (Some(wait), Some(timeout)) => {
266                #[allow(clippy::comparison_chain)]
267                if timeout < wait {
268                    // shave some off the wait time so it doesn't go longer
269                    // than user really wanted.
270                    let new = wait - timeout;
271                    wait_duration = Some(new);
272                    Some(timeout)
273                } else if wait < timeout {
274                    Some(wait)
275                } else {
276                    // same, so picking either doesn't matter
277                    Some(timeout)
278                }
279            },
280            (None, Some(timeout)) => Some(timeout),
281            (Some(wait), None) => Some(wait),
282            _ => None,
283        };
284
285        log::debug!("actual sleep is {actual_sleep:?}");
286        poll.poll(&mut events, actual_sleep).unwrap();
287
288        // If the event loop reported no events, run a belt and braces check on
289        // the quiche connection's timeouts.
290        if events.is_empty() {
291            log::debug!("timed out");
292
293            conn.on_timeout();
294        }
295
296        // Read incoming UDP packets from the socket and feed them to quiche,
297        // until there are no more packets to read.
298        for event in &events {
299            let socket = match event.token() {
300                mio::Token(0) => &socket,
301
302                _ => unreachable!(),
303            };
304
305            let local_addr = socket.local_addr().unwrap();
306            'read: loop {
307                let (len, from) = match socket.recv_from(&mut buf) {
308                    Ok(v) => v,
309
310                    Err(e) => {
311                        // There are no more UDP packets to read on this socket.
312                        // Process subsequent events.
313                        if e.kind() == std::io::ErrorKind::WouldBlock {
314                            break 'read;
315                        }
316
317                        return Err(ClientError::Other(format!(
318                            "{local_addr}: recv() failed: {e:?}"
319                        )));
320                    },
321                };
322
323                let recv_info = quiche::RecvInfo {
324                    to: local_addr,
325                    from,
326                };
327
328                // Process potentially coalesced packets.
329                let _read = match conn.recv(&mut buf[..len], recv_info) {
330                    Ok(v) => v,
331
332                    Err(e) => {
333                        log::debug!("{local_addr}: recv failed: {e:?}");
334                        continue 'read;
335                    },
336                };
337            }
338        }
339
340        log::debug!("done reading");
341
342        if conn.is_closed() {
343            log::info!(
344                "connection closed with error={:?} did_idle_timeout={}, stats={:?} path_stats={:?}",
345                conn.peer_error(),
346                conn.is_timed_out(),
347                conn.stats(),
348                conn.path_stats().collect::<Vec<quiche::PathStats>>(),
349            );
350
351            if !conn.is_established() {
352                log::info!(
353                    "connection timed out after {:?}",
354                    app_data_start.elapsed(),
355                );
356
357                return Err(ClientError::HandshakeFail);
358            }
359
360            break;
361        }
362
363        // Create a new application protocol session once the QUIC connection is
364        // established.
365        if (conn.is_established() || conn.is_in_early_data()) &&
366            !app_proto_selected
367        {
368            app_proto_selected = true;
369        }
370
371        if app_proto_selected {
372            check_duration_and_do_actions(
373                &mut wait_duration,
374                &mut wait_instant,
375                &mut action_iter,
376                &mut conn,
377                &mut waiting_for,
378                client.stream_parsers_mut(),
379            );
380
381            let mut wait_cleared = false;
382            for response in parse_streams(&mut conn, &mut client) {
383                let stream_id = response.stream_id;
384
385                if let StreamEventType::Finished = response.event_type {
386                    waiting_for.clear_waits_on_stream(stream_id);
387                } else {
388                    waiting_for.remove_wait(response);
389                }
390
391                wait_cleared = true;
392            }
393
394            // Check if a CanOpenNumStreams wait is satisfied.
395            let before = waiting_for.is_empty();
396            waiting_for.check_can_open_num_streams(&conn);
397            if !before && waiting_for.is_empty() {
398                wait_cleared = true;
399            }
400
401            if client.streams.all_close_trigger_frames_seen() {
402                client.streams.close_due_to_trigger_frames(&mut conn);
403            }
404
405            if wait_cleared {
406                check_duration_and_do_actions(
407                    &mut wait_duration,
408                    &mut wait_instant,
409                    &mut action_iter,
410                    &mut conn,
411                    &mut waiting_for,
412                    client.stream_parsers_mut(),
413                );
414            }
415        }
416
417        // Provides as many CIDs as possible.
418        while conn.scids_left() > 0 {
419            let (scid, reset_token) = generate_cid_and_reset_token();
420
421            if conn.new_scid(&scid, reset_token, false).is_err() {
422                break;
423            }
424        }
425
426        // Generate outgoing QUIC packets and send them on the UDP socket, until
427        // quiche reports that there are no more packets to be sent.
428        let sockets = vec![&socket];
429
430        for socket in sockets {
431            let local_addr = socket.local_addr().unwrap();
432
433            for peer_addr in conn.paths_iter(local_addr) {
434                loop {
435                    let (write, send_info) = match conn.send_on_path(
436                        &mut out,
437                        Some(local_addr),
438                        Some(peer_addr),
439                    ) {
440                        Ok(v) => v,
441
442                        Err(quiche::Error::Done) => {
443                            break;
444                        },
445
446                        Err(e) => {
447                            log::error!(
448                                "{local_addr} -> {peer_addr}: send failed: {e:?}"
449                            );
450
451                            conn.close(false, 0x1, b"fail").ok();
452                            break;
453                        },
454                    };
455
456                    if let Err(e) = socket.send_to(&out[..write], send_info.to) {
457                        if e.kind() == std::io::ErrorKind::WouldBlock {
458                            log::debug!(
459                                "{} -> {}: send() would block",
460                                local_addr,
461                                send_info.to
462                            );
463                            break;
464                        }
465
466                        return Err(ClientError::Other(format!(
467                            "{} -> {}: send() failed: {:?}",
468                            local_addr, send_info.to, e
469                        )));
470                    }
471                }
472            }
473        }
474
475        if conn.is_closed() {
476            log::info!(
477                "connection closed, {:?} {:?}",
478                conn.stats(),
479                conn.path_stats().collect::<Vec<quiche::PathStats>>()
480            );
481
482            if !conn.is_established() {
483                log::info!(
484                    "connection timed out after {:?}",
485                    app_data_start.elapsed(),
486                );
487
488                return Err(ClientError::HandshakeFail);
489            }
490
491            break;
492        }
493    }
494
495    Ok(ConnectionSummary {
496        stream_map: client.streams,
497        stats: Some(conn.stats()),
498        path_stats: conn.path_stats().collect(),
499        conn_close_details: ConnectionCloseDetails::new(&conn),
500    })
501}
502
503fn check_duration_and_do_actions(
504    wait_duration: &mut Option<Duration>, wait_instant: &mut Option<Instant>,
505    action_iter: &mut Iter<Action>, conn: &mut quiche::Connection,
506    waiting_for: &mut WaitingFor, stream_parsers: &mut StreamParserMap,
507) {
508    match wait_duration.as_ref() {
509        None => {
510            if let Some(idle_wait) =
511                handle_actions(action_iter, conn, waiting_for, stream_parsers)
512            {
513                *wait_duration = Some(idle_wait);
514                *wait_instant = Some(Instant::now());
515
516                // TODO: the wait period could still be larger than the
517                // negotiated idle timeout.
518                // We could in theory check quiche's idle_timeout value if
519                // it was public.
520                log::info!(
521                    "waiting for {idle_wait:?} before executing more actions"
522                );
523            }
524        },
525
526        Some(period) => {
527            let now = Instant::now();
528            let then = wait_instant.unwrap();
529            log::debug!(
530                "checking if actions wait period elapsed {:?} > {:?}",
531                now.duration_since(then),
532                wait_duration
533            );
534            if now.duration_since(then) >= *period {
535                log::debug!("yup!");
536                *wait_duration = None;
537
538                if let Some(idle_wait) =
539                    handle_actions(action_iter, conn, waiting_for, stream_parsers)
540                {
541                    *wait_duration = Some(idle_wait);
542                }
543            }
544        },
545    }
546}
547
548/// Generate a new pair of Source Connection ID and reset token.
549pub fn generate_cid_and_reset_token() -> (quiche::ConnectionId<'static>, u128) {
550    let rng = SystemRandom::new();
551
552    let mut scid = [0; quiche::MAX_CONN_ID_LEN];
553    rng.fill(&mut scid[..]).unwrap();
554    let scid = scid.to_vec().into();
555
556    let mut reset_token = [0; 16];
557    rng.fill(&mut reset_token[..]).unwrap();
558
559    let reset_token = u128::from_be_bytes(reset_token);
560    (scid, reset_token)
561}
562
563fn handle_actions<'a, I>(
564    iter: &mut I, conn: &mut quiche::Connection, waiting_for: &mut WaitingFor,
565    stream_parsers: &mut StreamParserMap,
566) -> Option<Duration>
567where
568    I: Iterator<Item = &'a Action>,
569{
570    if !waiting_for.is_empty() {
571        log::debug!(
572            "won't fire an action due to waiting for responses: {waiting_for:?}"
573        );
574        return None;
575    }
576
577    // Send actions
578    for action in iter {
579        match action {
580            Action::FlushPackets => return None,
581            Action::Wait { wait_type } => match wait_type {
582                WaitType::WaitDuration(period) => return Some(*period),
583                WaitType::StreamEvent(response) => {
584                    log::info!(
585                        "waiting for {response:?} before executing more actions"
586                    );
587                    waiting_for.add_wait(response);
588                    return None;
589                },
590                WaitType::CanOpenNumStreams(required_streams) => {
591                    log::info!(
592                        "h3i: waiting for peer_streams_left_bidi >= {required_streams:?}"
593                    );
594                    waiting_for.set_required_stream_quota(*required_streams);
595                    return None;
596                },
597            },
598            action => execute_action(action, conn, stream_parsers),
599        }
600    }
601
602    None
603}