h3i/client/
sync_client.rs

1// Copyright (C) 2024, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! Responsible for creating a [quiche::Connection] and managing I/O.
28
29use std::slice::Iter;
30use std::time::Duration;
31use std::time::Instant;
32
33use crate::client::QUIC_VERSION;
34use crate::frame::H3iFrame;
35use crate::quiche;
36
37use crate::actions::h3::Action;
38use crate::actions::h3::StreamEventType;
39use crate::actions::h3::WaitType;
40use crate::actions::h3::WaitingFor;
41use crate::client::execute_action;
42use crate::client::parse_streams;
43use crate::client::ClientError;
44use crate::client::ConnectionCloseDetails;
45use crate::client::MAX_DATAGRAM_SIZE;
46use crate::config::Config;
47
48use super::parse_args;
49use super::Client;
50use super::CloseTriggerFrames;
51use super::ConnectionSummary;
52use super::ParsedArgs;
53use super::StreamMap;
54use super::StreamParserMap;
55
56#[derive(Default)]
57struct SyncClient {
58    streams: StreamMap,
59    stream_parsers: StreamParserMap,
60}
61
62impl SyncClient {
63    fn new(close_trigger_frames: Option<CloseTriggerFrames>) -> Self {
64        Self {
65            streams: StreamMap::new(close_trigger_frames),
66            ..Default::default()
67        }
68    }
69}
70
71impl Client for SyncClient {
72    fn stream_parsers_mut(&mut self) -> &mut StreamParserMap {
73        &mut self.stream_parsers
74    }
75
76    fn handle_response_frame(&mut self, stream_id: u64, frame: H3iFrame) {
77        self.streams.insert(stream_id, frame);
78    }
79}
80
81fn create_config(args: &Config, should_log_keys: bool) -> quiche::Config {
82    // Create the configuration for the QUIC connection.
83    let mut config = quiche::Config::new(QUIC_VERSION).unwrap();
84
85    config.verify_peer(args.verify_peer);
86    config.set_application_protos(&[b"h3"]).unwrap();
87    config.set_max_idle_timeout(args.idle_timeout);
88    config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE);
89    config.set_max_send_udp_payload_size(MAX_DATAGRAM_SIZE);
90    config.set_initial_max_data(10_000_000);
91    config
92        .set_initial_max_stream_data_bidi_local(args.max_stream_data_bidi_local);
93    config.set_initial_max_stream_data_bidi_remote(
94        args.max_stream_data_bidi_remote,
95    );
96    config.set_initial_max_stream_data_uni(args.max_stream_data_uni);
97    config.set_initial_max_streams_bidi(args.max_streams_bidi);
98    config.set_initial_max_streams_uni(args.max_streams_uni);
99    config.set_disable_active_migration(true);
100    config.set_active_connection_id_limit(0);
101
102    config.set_max_connection_window(args.max_window);
103    config.set_max_stream_window(args.max_stream_window);
104    config.grease(false);
105
106    if should_log_keys {
107        config.log_keys()
108    }
109
110    config
111}
112
113/// Connect to a server and execute provided actions.
114///
115/// Constructs a socket and [quiche::Connection] based on the provided `args`,
116/// then iterates over `actions`.
117///
118/// If `close_trigger_frames` is specified, h3i will close the connection
119/// immediately upon receiving all of the supplied frames rather than waiting
120/// for the idle timeout. See [`CloseTriggerFrames`] for details.
121///
122/// Returns a [ConnectionSummary] on success, [ClientError] on failure.
123pub fn connect(
124    args: Config, actions: Vec<Action>,
125    close_trigger_frames: Option<CloseTriggerFrames>,
126) -> std::result::Result<ConnectionSummary, ClientError> {
127    let mut buf = [0; 65535];
128    let mut out = [0; MAX_DATAGRAM_SIZE];
129
130    let ParsedArgs {
131        connect_url,
132        bind_addr,
133        peer_addr,
134    } = parse_args(&args);
135
136    // Setup the event loop.
137    let mut poll = mio::Poll::new().unwrap();
138    let mut events = mio::Events::with_capacity(1024);
139
140    // Create the UDP socket backing the QUIC connection, and register it with
141    // the event loop.
142    let mut socket = mio::net::UdpSocket::bind(bind_addr).unwrap();
143    poll.registry()
144        .register(&mut socket, mio::Token(0), mio::Interest::READABLE)
145        .unwrap();
146
147    let mut keylog = None;
148    if let Some(keylog_path) = std::env::var_os("SSLKEYLOGFILE") {
149        let file = std::fs::OpenOptions::new()
150            .create(true)
151            .append(true)
152            .open(keylog_path)
153            .unwrap();
154
155        keylog = Some(file);
156    }
157
158    let mut config = create_config(&args, keylog.is_some());
159
160    // Generate a random source connection ID for the connection.
161    let mut scid = [0; quiche::MAX_CONN_ID_LEN];
162    rand::RngCore::fill_bytes(&mut rand::thread_rng(), &mut scid);
163
164    let scid = quiche::ConnectionId::from_ref(&scid);
165
166    let Ok(local_addr) = socket.local_addr() else {
167        return Err(ClientError::Other("invalid socket".to_string()));
168    };
169
170    // Create a QUIC connection and initiate handshake.
171    let mut conn =
172        quiche::connect(connect_url, &scid, local_addr, peer_addr, &mut config)
173            .map_err(|e| ClientError::Other(e.to_string()))?;
174
175    if let Some(keylog) = &mut keylog {
176        if let Ok(keylog) = keylog.try_clone() {
177            conn.set_keylog(Box::new(keylog));
178        }
179    }
180
181    log::info!(
182        "connecting to {peer_addr:} from {local_addr:} with scid {scid:?}",
183    );
184
185    let mut app_proto_selected = false;
186
187    let (write, send_info) = conn.send(&mut out).expect("initial send failed");
188
189    while let Err(e) = socket.send_to(&out[..write], send_info.to) {
190        if e.kind() == std::io::ErrorKind::WouldBlock {
191            log::debug!(
192                "{} -> {}: send() would block",
193                socket.local_addr().unwrap(),
194                send_info.to
195            );
196            continue;
197        }
198
199        return Err(ClientError::Other(format!("send() failed: {e:?}")));
200    }
201
202    let app_data_start = std::time::Instant::now();
203
204    let mut action_iter = actions.iter();
205    let mut wait_duration = None;
206    let mut wait_instant = None;
207
208    let mut client = SyncClient::new(close_trigger_frames);
209    let mut waiting_for = WaitingFor::default();
210
211    loop {
212        let actual_sleep = match (wait_duration, conn.timeout()) {
213            (Some(wait), Some(timeout)) => {
214                #[allow(clippy::comparison_chain)]
215                if timeout < wait {
216                    // shave some off the wait time so it doesn't go longer
217                    // than user really wanted.
218                    let new = wait - timeout;
219                    wait_duration = Some(new);
220                    Some(timeout)
221                } else if wait < timeout {
222                    Some(wait)
223                } else {
224                    // same, so picking either doesn't matter
225                    Some(timeout)
226                }
227            },
228            (None, Some(timeout)) => Some(timeout),
229            (Some(wait), None) => Some(wait),
230            _ => None,
231        };
232
233        log::debug!("actual sleep is {actual_sleep:?}");
234        poll.poll(&mut events, actual_sleep).unwrap();
235
236        // If the event loop reported no events, run a belt and braces check on
237        // the quiche connection's timeouts.
238        if events.is_empty() {
239            log::debug!("timed out");
240
241            conn.on_timeout();
242        }
243
244        // Read incoming UDP packets from the socket and feed them to quiche,
245        // until there are no more packets to read.
246        for event in &events {
247            let socket = match event.token() {
248                mio::Token(0) => &socket,
249
250                _ => unreachable!(),
251            };
252
253            let local_addr = socket.local_addr().unwrap();
254            'read: loop {
255                let (len, from) = match socket.recv_from(&mut buf) {
256                    Ok(v) => v,
257
258                    Err(e) => {
259                        // There are no more UDP packets to read on this socket.
260                        // Process subsequent events.
261                        if e.kind() == std::io::ErrorKind::WouldBlock {
262                            break 'read;
263                        }
264
265                        return Err(ClientError::Other(format!(
266                            "{local_addr}: recv() failed: {e:?}"
267                        )));
268                    },
269                };
270
271                let recv_info = quiche::RecvInfo {
272                    to: local_addr,
273                    from,
274                };
275
276                // Process potentially coalesced packets.
277                let _read = match conn.recv(&mut buf[..len], recv_info) {
278                    Ok(v) => v,
279
280                    Err(e) => {
281                        log::debug!("{local_addr}: recv failed: {e:?}");
282                        continue 'read;
283                    },
284                };
285            }
286        }
287
288        log::debug!("done reading");
289
290        if conn.is_closed() {
291            log::info!(
292                "connection closed with error={:?} did_idle_timeout={}, stats={:?} path_stats={:?}",
293                conn.peer_error(),
294                conn.is_timed_out(),
295                conn.stats(),
296                conn.path_stats().collect::<Vec<quiche::PathStats>>(),
297            );
298
299            if !conn.is_established() {
300                log::info!(
301                    "connection timed out after {:?}",
302                    app_data_start.elapsed(),
303                );
304
305                return Err(ClientError::HandshakeFail);
306            }
307
308            break;
309        }
310
311        // Create a new application protocol session once the QUIC connection is
312        // established.
313        if (conn.is_established() || conn.is_in_early_data()) &&
314            !app_proto_selected
315        {
316            app_proto_selected = true;
317        }
318
319        if app_proto_selected {
320            check_duration_and_do_actions(
321                &mut wait_duration,
322                &mut wait_instant,
323                &mut action_iter,
324                &mut conn,
325                &mut waiting_for,
326                client.stream_parsers_mut(),
327            );
328
329            let mut wait_cleared = false;
330            for response in parse_streams(&mut conn, &mut client) {
331                let stream_id = response.stream_id;
332
333                if let StreamEventType::Finished = response.event_type {
334                    waiting_for.clear_waits_on_stream(stream_id);
335                } else {
336                    waiting_for.remove_wait(response);
337                }
338
339                wait_cleared = true;
340            }
341
342            if client.streams.all_close_trigger_frames_seen() {
343                client.streams.close_due_to_trigger_frames(&mut conn);
344            }
345
346            if wait_cleared {
347                check_duration_and_do_actions(
348                    &mut wait_duration,
349                    &mut wait_instant,
350                    &mut action_iter,
351                    &mut conn,
352                    &mut waiting_for,
353                    client.stream_parsers_mut(),
354                );
355            }
356        }
357
358        // Provides as many CIDs as possible.
359        while conn.scids_left() > 0 {
360            let (scid, reset_token) = generate_cid_and_reset_token();
361
362            if conn.new_scid(&scid, reset_token, false).is_err() {
363                break;
364            }
365        }
366
367        // Generate outgoing QUIC packets and send them on the UDP socket, until
368        // quiche reports that there are no more packets to be sent.
369        let sockets = vec![&socket];
370
371        for socket in sockets {
372            let local_addr = socket.local_addr().unwrap();
373
374            for peer_addr in conn.paths_iter(local_addr) {
375                loop {
376                    let (write, send_info) = match conn.send_on_path(
377                        &mut out,
378                        Some(local_addr),
379                        Some(peer_addr),
380                    ) {
381                        Ok(v) => v,
382
383                        Err(quiche::Error::Done) => {
384                            break;
385                        },
386
387                        Err(e) => {
388                            log::error!(
389                                "{local_addr} -> {peer_addr}: send failed: {e:?}"
390                            );
391
392                            conn.close(false, 0x1, b"fail").ok();
393                            break;
394                        },
395                    };
396
397                    if let Err(e) = socket.send_to(&out[..write], send_info.to) {
398                        if e.kind() == std::io::ErrorKind::WouldBlock {
399                            log::debug!(
400                                "{} -> {}: send() would block",
401                                local_addr,
402                                send_info.to
403                            );
404                            break;
405                        }
406
407                        return Err(ClientError::Other(format!(
408                            "{} -> {}: send() failed: {:?}",
409                            local_addr, send_info.to, e
410                        )));
411                    }
412                }
413            }
414        }
415
416        if conn.is_closed() {
417            log::info!(
418                "connection closed, {:?} {:?}",
419                conn.stats(),
420                conn.path_stats().collect::<Vec<quiche::PathStats>>()
421            );
422
423            if !conn.is_established() {
424                log::info!(
425                    "connection timed out after {:?}",
426                    app_data_start.elapsed(),
427                );
428
429                return Err(ClientError::HandshakeFail);
430            }
431
432            break;
433        }
434    }
435
436    Ok(ConnectionSummary {
437        stream_map: client.streams,
438        stats: Some(conn.stats()),
439        path_stats: conn.path_stats().collect(),
440        conn_close_details: ConnectionCloseDetails::new(&conn),
441    })
442}
443
444fn check_duration_and_do_actions(
445    wait_duration: &mut Option<Duration>, wait_instant: &mut Option<Instant>,
446    action_iter: &mut Iter<Action>, conn: &mut quiche::Connection,
447    waiting_for: &mut WaitingFor, stream_parsers: &mut StreamParserMap,
448) {
449    match wait_duration.as_ref() {
450        None => {
451            if let Some(idle_wait) =
452                handle_actions(action_iter, conn, waiting_for, stream_parsers)
453            {
454                *wait_duration = Some(idle_wait);
455                *wait_instant = Some(Instant::now());
456
457                // TODO: the wait period could still be larger than the
458                // negotiated idle timeout.
459                // We could in theory check quiche's idle_timeout value if
460                // it was public.
461                log::info!(
462                    "waiting for {idle_wait:?} before executing more actions"
463                );
464            }
465        },
466
467        Some(period) => {
468            let now = Instant::now();
469            let then = wait_instant.unwrap();
470            log::debug!(
471                "checking if actions wait period elapsed {:?} > {:?}",
472                now.duration_since(then),
473                wait_duration
474            );
475            if now.duration_since(then) >= *period {
476                log::debug!("yup!");
477                *wait_duration = None;
478
479                if let Some(idle_wait) =
480                    handle_actions(action_iter, conn, waiting_for, stream_parsers)
481                {
482                    *wait_duration = Some(idle_wait);
483                }
484            }
485        },
486    }
487}
488
489/// Generate a new pair of Source Connection ID and reset token.
490pub fn generate_cid_and_reset_token() -> (quiche::ConnectionId<'static>, u128) {
491    let mut scid = [0; quiche::MAX_CONN_ID_LEN];
492    rand::RngCore::fill_bytes(&mut rand::thread_rng(), &mut scid);
493    let scid = scid.to_vec().into();
494    let mut reset_token = [0; 16];
495    rand::RngCore::fill_bytes(&mut rand::thread_rng(), &mut reset_token);
496    let reset_token = u128::from_be_bytes(reset_token);
497    (scid, reset_token)
498}
499
500fn handle_actions<'a, I>(
501    iter: &mut I, conn: &mut quiche::Connection, waiting_for: &mut WaitingFor,
502    stream_parsers: &mut StreamParserMap,
503) -> Option<Duration>
504where
505    I: Iterator<Item = &'a Action>,
506{
507    if !waiting_for.is_empty() {
508        log::debug!(
509            "won't fire an action due to waiting for responses: {waiting_for:?}"
510        );
511        return None;
512    }
513
514    // Send actions
515    for action in iter {
516        match action {
517            Action::FlushPackets => return None,
518            Action::Wait { wait_type } => match wait_type {
519                WaitType::WaitDuration(period) => return Some(*period),
520                WaitType::StreamEvent(response) => {
521                    log::info!(
522                        "waiting for {response:?} before executing more actions"
523                    );
524                    waiting_for.add_wait(response);
525                    return None;
526                },
527            },
528            action => execute_action(action, conn, stream_parsers),
529        }
530    }
531
532    None
533}