h3i/client/
sync_client.rs

1// Copyright (C) 2024, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! Responsible for creating a [quiche::Connection] and managing I/O.
28
29use std::slice::Iter;
30use std::time::Duration;
31use std::time::Instant;
32
33use crate::frame::H3iFrame;
34use crate::quiche;
35
36use crate::actions::h3::Action;
37use crate::actions::h3::StreamEventType;
38use crate::actions::h3::WaitType;
39use crate::actions::h3::WaitingFor;
40use crate::client::build_quiche_connection;
41use crate::client::execute_action;
42use crate::client::parse_streams;
43use crate::client::ClientError;
44use crate::client::ConnectionCloseDetails;
45use crate::client::MAX_DATAGRAM_SIZE;
46use crate::config::Config;
47
48use super::Client;
49use super::CloseTriggerFrames;
50use super::ConnectionSummary;
51use super::StreamMap;
52use super::StreamParserMap;
53
54#[derive(Default)]
55struct SyncClient {
56    streams: StreamMap,
57    stream_parsers: StreamParserMap,
58}
59
60impl SyncClient {
61    fn new(close_trigger_frames: Option<CloseTriggerFrames>) -> Self {
62        Self {
63            streams: StreamMap::new(close_trigger_frames),
64            ..Default::default()
65        }
66    }
67}
68
69impl Client for SyncClient {
70    fn stream_parsers_mut(&mut self) -> &mut StreamParserMap {
71        &mut self.stream_parsers
72    }
73
74    fn handle_response_frame(&mut self, stream_id: u64, frame: H3iFrame) {
75        self.streams.insert(stream_id, frame);
76    }
77}
78
79/// Connect to a server and execute provided actions.
80///
81/// Constructs a socket and [quiche::Connection] based on the provided `args`,
82/// then iterates over `actions`.
83///
84/// If `close_trigger_frames` is specified, h3i will close the connection
85/// immediately upon receiving all of the supplied frames rather than waiting
86/// for the idle timeout. See [`CloseTriggerFrames`] for details.
87///
88/// Returns a [ConnectionSummary] on success, [ClientError] on failure.
89pub fn connect(
90    args: Config, actions: &[Action],
91    close_trigger_frames: Option<CloseTriggerFrames>,
92) -> std::result::Result<ConnectionSummary, ClientError> {
93    let mut buf = [0; 65535];
94    let mut out = [0; MAX_DATAGRAM_SIZE];
95
96    // Setup the event loop.
97    let mut poll = mio::Poll::new().unwrap();
98    let mut events = mio::Events::with_capacity(1024);
99
100    // Resolve server address.
101    let peer_addr = if let Some(addr) = &args.connect_to {
102        addr.parse().expect("--connect-to is expected to be a string containing an IPv4 or IPv6 address with a port. E.g. 192.0.2.0:443")
103    } else {
104        let x = format!("https://{}", args.host_port);
105        *url::Url::parse(&x)
106            .unwrap()
107            .socket_addrs(|| None)
108            .unwrap()
109            .first()
110            .unwrap()
111    };
112
113    // Bind to INADDR_ANY or IN6ADDR_ANY depending on the IP family of the
114    // server address. This is needed on macOS and BSD variants that don't
115    // support binding to IN6ADDR_ANY for both v4 and v6.
116    let bind_addr = match peer_addr {
117        std::net::SocketAddr::V4(_) => format!("0.0.0.0:{}", args.source_port),
118        std::net::SocketAddr::V6(_) => format!("[::]:{}", args.source_port),
119    };
120
121    // Create the UDP socket backing the QUIC connection, and register it with
122    // the event loop.
123    let mut socket =
124        mio::net::UdpSocket::bind(bind_addr.parse().unwrap()).unwrap();
125    poll.registry()
126        .register(&mut socket, mio::Token(0), mio::Interest::READABLE)
127        .unwrap();
128
129    let Ok(local_addr) = socket.local_addr() else {
130        return Err(ClientError::Other("invalid socket".to_string()));
131    };
132
133    let mut conn = build_quiche_connection(args, peer_addr, local_addr)
134        .map_err(|_| ClientError::HandshakeFail)?;
135
136    let mut app_proto_selected = false;
137
138    let (write, send_info) = conn.send(&mut out).expect("initial send failed");
139
140    while let Err(e) = socket.send_to(&out[..write], send_info.to) {
141        if e.kind() == std::io::ErrorKind::WouldBlock {
142            log::debug!(
143                "{} -> {}: send() would block",
144                socket.local_addr().unwrap(),
145                send_info.to
146            );
147            continue;
148        }
149
150        return Err(ClientError::Other(format!("send() failed: {e:?}")));
151    }
152
153    let app_data_start = std::time::Instant::now();
154
155    let mut action_iter = actions.iter();
156    let mut wait_duration = None;
157    let mut wait_instant = None;
158
159    let mut client = SyncClient::new(close_trigger_frames);
160    let mut waiting_for = WaitingFor::default();
161
162    loop {
163        let actual_sleep = match (wait_duration, conn.timeout()) {
164            (Some(wait), Some(timeout)) => {
165                #[allow(clippy::comparison_chain)]
166                if timeout < wait {
167                    // shave some off the wait time so it doesn't go longer
168                    // than user really wanted.
169                    let new = wait - timeout;
170                    wait_duration = Some(new);
171                    Some(timeout)
172                } else if wait < timeout {
173                    Some(wait)
174                } else {
175                    // same, so picking either doesn't matter
176                    Some(timeout)
177                }
178            },
179            (None, Some(timeout)) => Some(timeout),
180            (Some(wait), None) => Some(wait),
181            _ => None,
182        };
183
184        log::debug!("actual sleep is {:?}", actual_sleep);
185        poll.poll(&mut events, actual_sleep).unwrap();
186
187        // If the event loop reported no events, run a belt and braces check on
188        // the quiche connection's timeouts.
189        if events.is_empty() {
190            log::debug!("timed out");
191
192            conn.on_timeout();
193        }
194
195        // Read incoming UDP packets from the socket and feed them to quiche,
196        // until there are no more packets to read.
197        for event in &events {
198            let socket = match event.token() {
199                mio::Token(0) => &socket,
200
201                _ => unreachable!(),
202            };
203
204            let local_addr = socket.local_addr().unwrap();
205            'read: loop {
206                let (len, from) = match socket.recv_from(&mut buf) {
207                    Ok(v) => v,
208
209                    Err(e) => {
210                        // There are no more UDP packets to read on this socket.
211                        // Process subsequent events.
212                        if e.kind() == std::io::ErrorKind::WouldBlock {
213                            break 'read;
214                        }
215
216                        return Err(ClientError::Other(format!(
217                            "{local_addr}: recv() failed: {e:?}"
218                        )));
219                    },
220                };
221
222                let recv_info = quiche::RecvInfo {
223                    to: local_addr,
224                    from,
225                };
226
227                // Process potentially coalesced packets.
228                let _read = match conn.recv(&mut buf[..len], recv_info) {
229                    Ok(v) => v,
230
231                    Err(e) => {
232                        log::debug!("{}: recv failed: {:?}", local_addr, e);
233                        continue 'read;
234                    },
235                };
236            }
237        }
238
239        log::debug!("done reading");
240
241        if conn.is_closed() {
242            log::info!(
243                "connection closed with error={:?} did_idle_timeout={}, stats={:?} path_stats={:?}",
244                conn.peer_error(),
245                conn.is_timed_out(),
246                conn.stats(),
247                conn.path_stats().collect::<Vec<quiche::PathStats>>(),
248            );
249
250            if !conn.is_established() {
251                log::info!(
252                    "connection timed out after {:?}",
253                    app_data_start.elapsed(),
254                );
255
256                return Err(ClientError::HandshakeFail);
257            }
258
259            break;
260        }
261
262        // Create a new application protocol session once the QUIC connection is
263        // established.
264        if (conn.is_established() || conn.is_in_early_data()) &&
265            !app_proto_selected
266        {
267            app_proto_selected = true;
268        }
269
270        if app_proto_selected {
271            check_duration_and_do_actions(
272                &mut wait_duration,
273                &mut wait_instant,
274                &mut action_iter,
275                &mut conn,
276                &mut waiting_for,
277                client.stream_parsers_mut(),
278            );
279
280            let mut wait_cleared = false;
281            for response in parse_streams(&mut conn, &mut client) {
282                let stream_id = response.stream_id;
283
284                if let StreamEventType::Finished = response.event_type {
285                    waiting_for.clear_waits_on_stream(stream_id);
286                } else {
287                    waiting_for.remove_wait(response);
288                }
289
290                wait_cleared = true;
291            }
292
293            if client.streams.all_close_trigger_frames_seen() {
294                client.streams.close_due_to_trigger_frames(&mut conn);
295            }
296
297            if wait_cleared {
298                check_duration_and_do_actions(
299                    &mut wait_duration,
300                    &mut wait_instant,
301                    &mut action_iter,
302                    &mut conn,
303                    &mut waiting_for,
304                    client.stream_parsers_mut(),
305                );
306            }
307        }
308
309        // Provides as many CIDs as possible.
310        while conn.scids_left() > 0 {
311            let (scid, reset_token) = generate_cid_and_reset_token();
312
313            if conn.new_scid(&scid, reset_token, false).is_err() {
314                break;
315            }
316        }
317
318        // Generate outgoing QUIC packets and send them on the UDP socket, until
319        // quiche reports that there are no more packets to be sent.
320        let sockets = vec![&socket];
321
322        for socket in sockets {
323            let local_addr = socket.local_addr().unwrap();
324
325            for peer_addr in conn.paths_iter(local_addr) {
326                loop {
327                    let (write, send_info) = match conn.send_on_path(
328                        &mut out,
329                        Some(local_addr),
330                        Some(peer_addr),
331                    ) {
332                        Ok(v) => v,
333
334                        Err(quiche::Error::Done) => {
335                            break;
336                        },
337
338                        Err(e) => {
339                            log::error!(
340                                "{} -> {}: send failed: {:?}",
341                                local_addr,
342                                peer_addr,
343                                e
344                            );
345
346                            conn.close(false, 0x1, b"fail").ok();
347                            break;
348                        },
349                    };
350
351                    if let Err(e) = socket.send_to(&out[..write], send_info.to) {
352                        if e.kind() == std::io::ErrorKind::WouldBlock {
353                            log::debug!(
354                                "{} -> {}: send() would block",
355                                local_addr,
356                                send_info.to
357                            );
358                            break;
359                        }
360
361                        return Err(ClientError::Other(format!(
362                            "{} -> {}: send() failed: {:?}",
363                            local_addr, send_info.to, e
364                        )));
365                    }
366                }
367            }
368        }
369
370        if conn.is_closed() {
371            log::info!(
372                "connection closed, {:?} {:?}",
373                conn.stats(),
374                conn.path_stats().collect::<Vec<quiche::PathStats>>()
375            );
376
377            if !conn.is_established() {
378                log::info!(
379                    "connection timed out after {:?}",
380                    app_data_start.elapsed(),
381                );
382
383                return Err(ClientError::HandshakeFail);
384            }
385
386            break;
387        }
388    }
389
390    Ok(ConnectionSummary {
391        stream_map: client.streams,
392        stats: Some(conn.stats()),
393        path_stats: conn.path_stats().collect(),
394        conn_close_details: ConnectionCloseDetails::new(&conn),
395    })
396}
397
398fn check_duration_and_do_actions(
399    wait_duration: &mut Option<Duration>, wait_instant: &mut Option<Instant>,
400    action_iter: &mut Iter<Action>, conn: &mut quiche::Connection,
401    waiting_for: &mut WaitingFor, stream_parsers: &mut StreamParserMap,
402) {
403    match wait_duration.as_ref() {
404        None => {
405            if let Some(idle_wait) =
406                handle_actions(action_iter, conn, waiting_for, stream_parsers)
407            {
408                *wait_duration = Some(idle_wait);
409                *wait_instant = Some(Instant::now());
410
411                // TODO: the wait period could still be larger than the
412                // negotiated idle timeout.
413                // We could in theory check quiche's idle_timeout value if
414                // it was public.
415                log::info!(
416                    "waiting for {:?} before executing more actions",
417                    idle_wait
418                );
419            }
420        },
421
422        Some(period) => {
423            let now = Instant::now();
424            let then = wait_instant.unwrap();
425            log::debug!(
426                "checking if actions wait period elapsed {:?} > {:?}",
427                now.duration_since(then),
428                wait_duration
429            );
430            if now.duration_since(then) >= *period {
431                log::debug!("yup!");
432                *wait_duration = None;
433
434                if let Some(idle_wait) =
435                    handle_actions(action_iter, conn, waiting_for, stream_parsers)
436                {
437                    *wait_duration = Some(idle_wait);
438                }
439            }
440        },
441    }
442}
443
444/// Generate a new pair of Source Connection ID and reset token.
445pub fn generate_cid_and_reset_token() -> (quiche::ConnectionId<'static>, u128) {
446    let mut scid = [0; quiche::MAX_CONN_ID_LEN];
447    rand::RngCore::fill_bytes(&mut rand::thread_rng(), &mut scid);
448    let scid = scid.to_vec().into();
449    let mut reset_token = [0; 16];
450    rand::RngCore::fill_bytes(&mut rand::thread_rng(), &mut reset_token);
451    let reset_token = u128::from_be_bytes(reset_token);
452    (scid, reset_token)
453}
454
455/// Makes a buffered writer for a qlog.
456pub fn make_qlog_writer(
457    dir: &std::ffi::OsStr, role: &str, id: &str,
458) -> std::io::BufWriter<std::fs::File> {
459    let mut path = std::path::PathBuf::from(dir);
460    let filename = format!("{role}-{id}.sqlog");
461    path.push(filename);
462
463    match std::fs::File::create(&path) {
464        Ok(f) => std::io::BufWriter::new(f),
465
466        Err(e) => panic!(
467            "Error creating qlog file attempted path was {:?}: {}",
468            path, e
469        ),
470    }
471}
472
473fn handle_actions<'a, I>(
474    iter: &mut I, conn: &mut quiche::Connection, waiting_for: &mut WaitingFor,
475    stream_parsers: &mut StreamParserMap,
476) -> Option<Duration>
477where
478    I: Iterator<Item = &'a Action>,
479{
480    if !waiting_for.is_empty() {
481        log::debug!(
482            "won't fire an action due to waiting for responses: {:?}",
483            waiting_for
484        );
485        return None;
486    }
487
488    // Send actions
489    for action in iter {
490        match action {
491            Action::FlushPackets => return None,
492            Action::Wait { wait_type } => match wait_type {
493                WaitType::WaitDuration(period) => return Some(*period),
494                WaitType::StreamEvent(response) => {
495                    log::info!(
496                        "waiting for {:?} before executing more actions",
497                        response
498                    );
499                    waiting_for.add_wait(response);
500                    return None;
501                },
502            },
503            action => execute_action(action, conn, stream_parsers),
504        }
505    }
506
507    None
508}