Skip to main content

h3i/client/
async_client.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! Responsible for creating a [tokio_quiche::quic::QuicheConnection] and
28//! yielding I/O to tokio-quiche.
29
30use log;
31use quiche::PathStats;
32use quiche::Stats;
33use std::future::Future;
34use std::pin::Pin;
35use std::task::Context;
36use std::task::Poll;
37use std::time::Duration;
38use tokio::select;
39use tokio::sync::mpsc;
40use tokio::sync::oneshot;
41use tokio::time::sleep;
42use tokio::time::sleep_until;
43use tokio::time::Instant;
44use tokio_quiche::buf_factory::BufFactory;
45use tokio_quiche::metrics::Metrics;
46use tokio_quiche::quic::HandshakeInfo;
47use tokio_quiche::quic::QuicheConnection;
48use tokio_quiche::settings::Hooks;
49use tokio_quiche::settings::QuicSettings;
50use tokio_quiche::socket::Socket;
51use tokio_quiche::ApplicationOverQuic;
52use tokio_quiche::ConnectionParams;
53use tokio_quiche::QuicResult;
54
55use crate::actions::h3::Action;
56use crate::actions::h3::WaitType;
57use crate::actions::h3::WaitingFor;
58use crate::client::execute_action;
59use crate::client::parse_args;
60use crate::client::parse_streams;
61use crate::client::ClientError;
62use crate::client::CloseTriggerFrames;
63use crate::client::ConnectionSummary;
64use crate::client::ParsedArgs;
65use crate::client::StreamMap;
66use crate::client::MAX_DATAGRAM_SIZE;
67use crate::config::Config as H3iConfig;
68use crate::frame::H3iFrame;
69use crate::quiche;
70
71use super::Client;
72use super::ConnectionCloseDetails;
73use super::StreamParserMap;
74
75/// Connect to the socket.
76pub async fn connect(
77    args: &H3iConfig, frame_actions: Vec<Action>,
78    close_trigger_frames: Option<CloseTriggerFrames>,
79) -> std::result::Result<BuildingConnectionSummary, ClientError> {
80    let quic_settings = create_config(args);
81    let mut connection_params =
82        ConnectionParams::new_client(quic_settings, None, Hooks::default());
83
84    connection_params.session = args.session.clone();
85
86    let ParsedArgs {
87        connect_url,
88        bind_addr,
89        peer_addr,
90    } = parse_args(args);
91
92    let socket = tokio::net::UdpSocket::bind(bind_addr).await.unwrap();
93    socket.connect(peer_addr).await.unwrap();
94
95    log::info!(
96        "connecting to {:} from {:}",
97        peer_addr,
98        socket.local_addr().unwrap()
99    );
100
101    let (h3i, conn_summary_fut) =
102        H3iDriver::new(frame_actions, close_trigger_frames);
103    match tokio_quiche::quic::connect_with_config(
104        Socket::try_from(socket).unwrap(),
105        connect_url,
106        &connection_params,
107        h3i,
108    )
109    .await
110    {
111        Ok(_) => Ok(conn_summary_fut),
112        Err(_) => Err(ClientError::HandshakeFail),
113    }
114}
115
116fn create_config(args: &H3iConfig) -> QuicSettings {
117    let mut quic_settings = QuicSettings::default();
118
119    quic_settings.verify_peer = args.verify_peer;
120    quic_settings.max_idle_timeout =
121        Some(Duration::from_millis(args.idle_timeout));
122    quic_settings.max_recv_udp_payload_size = MAX_DATAGRAM_SIZE;
123    quic_settings.max_send_udp_payload_size = MAX_DATAGRAM_SIZE;
124    quic_settings.initial_max_data = 10_000_000;
125    quic_settings.initial_max_stream_data_bidi_local =
126        args.max_stream_data_bidi_local;
127    quic_settings.initial_max_stream_data_bidi_remote =
128        args.max_stream_data_bidi_remote;
129    quic_settings.initial_max_stream_data_uni = args.max_stream_data_uni;
130    quic_settings.initial_max_streams_bidi = args.max_streams_bidi;
131    quic_settings.initial_max_streams_uni = args.max_streams_uni;
132    quic_settings.disable_active_migration = true;
133    quic_settings.active_connection_id_limit = 0;
134    quic_settings.max_connection_window = args.max_window;
135    quic_settings.max_stream_window = args.max_stream_window;
136    quic_settings.enable_send_streams_blocked = true;
137    quic_settings.grease = false;
138
139    quic_settings.capture_quiche_logs = true;
140    quic_settings.keylog_file = std::env::var_os("SSLKEYLOGFILE")
141        .and_then(|os_str| os_str.into_string().ok());
142
143    quic_settings.enable_dgram = args.enable_dgram;
144    quic_settings.dgram_recv_max_queue_len = args.dgram_recv_queue_len;
145    quic_settings.dgram_send_max_queue_len = args.dgram_send_queue_len;
146
147    quic_settings
148}
149
150/// The [`Future`] used to build a [`ConnectionSummary`].
151///
152/// At a high level, [`H3iDriver`] will interact with the UDP socket directly,
153/// sending and receiving data as necessary. As new data is received, it will
154/// send [`ConnectionRecord`]s to this struct, which uses these records to
155/// construct the [`ConnectionSummary`].
156#[must_use = "must await to get a ConnectionSummary"]
157pub struct BuildingConnectionSummary {
158    rx: mpsc::UnboundedReceiver<ConnectionRecord>,
159    summary: Option<ConnectionSummary>,
160    seen_all_close_trigger_frames: Option<oneshot::Sender<()>>,
161}
162
163impl BuildingConnectionSummary {
164    fn new(
165        rx: mpsc::UnboundedReceiver<ConnectionRecord>,
166        close_trigger_frames: Option<CloseTriggerFrames>,
167        trigger_frame_tx: oneshot::Sender<()>,
168    ) -> Self {
169        let summary = ConnectionSummary {
170            stream_map: StreamMap::new(close_trigger_frames),
171            ..Default::default()
172        };
173
174        Self {
175            rx,
176            summary: Some(summary),
177            seen_all_close_trigger_frames: Some(trigger_frame_tx),
178        }
179    }
180}
181
182impl Future for BuildingConnectionSummary {
183    type Output = ConnectionSummary;
184
185    fn poll(
186        mut self: Pin<&mut Self>, cx: &mut Context<'_>,
187    ) -> Poll<Self::Output> {
188        while let Poll::Ready(Some(record)) = self.rx.poll_recv(cx) {
189            // Grab all records received from the current event loop iteration and
190            // insert them into the in-progress summary
191            let summary = self.summary.as_mut().expect("summary already taken");
192
193            match record {
194                ConnectionRecord::StreamedFrame { stream_id, frame } => {
195                    let stream_map = &mut summary.stream_map;
196                    stream_map.insert(stream_id, frame);
197
198                    if stream_map.all_close_trigger_frames_seen() {
199                        // Signal the H3iDriver task to close the connection.
200                        if let Some(expected_tx) =
201                            self.seen_all_close_trigger_frames.take()
202                        {
203                            let _ = expected_tx.send(());
204                        }
205                    }
206                },
207                ConnectionRecord::ConnectionStats(s) => summary.stats = Some(s),
208                ConnectionRecord::PathStats(ps) => summary.path_stats = ps,
209                ConnectionRecord::Close(d) => summary.conn_close_details = d,
210            };
211        }
212
213        if self.rx.is_closed() {
214            // The sender drops when the Tokio-Quiche IOW finishes, so the
215            // connection is done and we're safe to yield the summary.
216            let summary = self.summary.take().expect("summary already taken");
217            Poll::Ready(summary)
218        } else {
219            Poll::Pending
220        }
221    }
222}
223
224pub struct H3iDriver {
225    buffer: Vec<u8>,
226    actions: Vec<Action>,
227    actions_executed: usize,
228    next_fire_time: Instant,
229    waiting_for_responses: WaitingFor,
230    record_tx: mpsc::UnboundedSender<ConnectionRecord>,
231    stream_parsers: StreamParserMap,
232    close_trigger_seen_rx: oneshot::Receiver<()>,
233}
234
235impl H3iDriver {
236    fn new(
237        actions: Vec<Action>, close_trigger_frames: Option<CloseTriggerFrames>,
238    ) -> (Self, BuildingConnectionSummary) {
239        let (record_tx, record_rx) = mpsc::unbounded_channel();
240        let (close_trigger_seen_tx, close_trigger_seen_rx) = oneshot::channel();
241        let fut = BuildingConnectionSummary::new(
242            record_rx,
243            close_trigger_frames,
244            close_trigger_seen_tx,
245        );
246
247        (
248            Self {
249                buffer: vec![0u8; BufFactory::MAX_BUF_SIZE],
250                actions,
251                actions_executed: 0,
252                next_fire_time: Instant::now(),
253                waiting_for_responses: WaitingFor::default(),
254                record_tx,
255                stream_parsers: StreamParserMap::default(),
256                close_trigger_seen_rx,
257            },
258            fut,
259        )
260    }
261
262    /// If the next action should fire.
263    fn should_fire(&self) -> bool {
264        Instant::now() >= self.next_fire_time
265    }
266
267    /// Insert all waits into the waiting set.
268    fn register_waits(&mut self) {
269        while self.actions_executed < self.actions.len() {
270            if let Action::Wait { wait_type } =
271                &self.actions[self.actions_executed]
272            {
273                self.actions_executed += 1;
274
275                match wait_type {
276                    WaitType::WaitDuration(duration) => {
277                        self.next_fire_time = Instant::now() + *duration;
278
279                        log::debug!(
280                            "h3i: waiting for responses: {:?}",
281                            self.waiting_for_responses
282                        );
283                    },
284                    WaitType::StreamEvent(event) => {
285                        self.waiting_for_responses.add_wait(event);
286                    },
287                    WaitType::CanOpenNumStreams(required_streams) => {
288                        log::info!(
289                            "h3i: waiting for peer_streams_left_bidi >= {required_streams:?}"
290                        );
291                        self.waiting_for_responses
292                            .set_required_stream_quota(*required_streams);
293                    },
294                }
295            } else {
296                break;
297            }
298        }
299    }
300}
301
302impl Client for H3iDriver {
303    fn stream_parsers_mut(&mut self) -> &mut StreamParserMap {
304        &mut self.stream_parsers
305    }
306
307    fn handle_response_frame(
308        &mut self, stream_id: u64, frame: crate::frame::H3iFrame,
309    ) {
310        self.record_tx
311            .send(ConnectionRecord::StreamedFrame { stream_id, frame })
312            .expect("H3iDriver task dropped")
313    }
314}
315
316impl ApplicationOverQuic for H3iDriver {
317    fn on_conn_established(
318        &mut self, _qconn: &mut QuicheConnection, _handshake_info: &HandshakeInfo,
319    ) -> QuicResult<()> {
320        log::info!("h3i: HTTP/3 connection established");
321        Ok(())
322    }
323
324    fn should_act(&self) -> bool {
325        // Even if the connection wasn't established, we should still send
326        // terminal records to the summary
327        true
328    }
329
330    fn process_reads(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
331        log::trace!("h3i: process_reads");
332
333        // This is executed in process_reads so that work_loop can clear any waits
334        // on the current event loop iteration - if it was in process_writes, we
335        // could potentially miss waits and hang the client.
336        self.register_waits();
337
338        let stream_events = parse_streams(qconn, self);
339        for event in stream_events {
340            self.waiting_for_responses.remove_wait(event);
341        }
342
343        self.waiting_for_responses.check_can_open_num_streams(qconn);
344
345        Ok(())
346    }
347
348    fn process_writes(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
349        log::trace!("h3i: process_writes");
350
351        if !self.waiting_for_responses.is_empty() {
352            log::debug!(
353                "awaiting responses on streams {:?}, skipping further action",
354                self.waiting_for_responses
355            );
356
357            return Ok(());
358        }
359
360        // Re-create the iterator so we can mutably borrow the stream parser map
361        let iter = self.actions.clone().into_iter().skip(self.actions_executed);
362
363        for action in iter {
364            match action {
365                Action::SendFrame { .. } |
366                Action::StreamBytes { .. } |
367                Action::SendDatagram { .. } |
368                Action::ResetStream { .. } |
369                Action::StopSending { .. } |
370                Action::OpenUniStream { .. } |
371                Action::ConnectionClose { .. } |
372                Action::SendHeadersFrame { .. } => {
373                    if self.should_fire() {
374                        // Reset the fire time such that the next action will
375                        // still fire.
376                        self.next_fire_time = Instant::now();
377
378                        execute_action(&action, qconn, self.stream_parsers_mut());
379                        self.actions_executed += 1;
380                    } else {
381                        break;
382                    }
383                },
384                Action::Wait { .. } => {
385                    // Break out of the write phase if we see a wait, since waits
386                    // have to be registered in the read
387                    // phase. The actions_executed pointer will be
388                    // incremented there as well
389                    break;
390                },
391                Action::FlushPackets => {
392                    self.actions_executed += 1;
393                    break;
394                },
395            }
396        }
397
398        Ok(())
399    }
400
401    async fn wait_for_data(
402        &mut self, qconn: &mut QuicheConnection,
403    ) -> QuicResult<()> {
404        log::trace!("h3i: wait_for_data");
405
406        let sleep_fut = if !self.should_fire() {
407            sleep_until(self.next_fire_time)
408        } else {
409            // If we have nothing to send, allow the IOW to resolve wait_for_data
410            // on its own (whether via Quiche timer or incoming data).
411            sleep(Duration::MAX)
412        };
413
414        select! {
415            rx = &mut self.close_trigger_seen_rx, if !self.close_trigger_seen_rx.is_terminated() => {
416                // NOTE: wait_for_data can be called again after all close triggers have been seen,
417                // depending on how long it takes quiche to mark the connection as closed.
418                // Therefore we can't re-poll the receiver or we'd panic.
419                if rx.is_ok() {
420                    // TODO: customizable close trigger frames
421                    let _ = qconn.close(true, quiche::h3::WireErrorCode::NoError as u64, b"saw all expected frames");
422                }
423            }
424            _ = sleep_fut => {}
425        }
426
427        Ok(())
428    }
429
430    fn buffer(&mut self) -> &mut [u8] {
431        &mut self.buffer
432    }
433
434    fn on_conn_close<M: Metrics>(
435        &mut self, qconn: &mut QuicheConnection, _metrics: &M,
436        _work_loop_result: &QuicResult<()>,
437    ) {
438        let _ = self
439            .record_tx
440            .send(ConnectionRecord::Close(ConnectionCloseDetails::new(qconn)));
441
442        let _ = self
443            .record_tx
444            .send(ConnectionRecord::ConnectionStats(qconn.stats()));
445
446        let conn_path_stats = qconn.path_stats().collect::<Vec<PathStats>>();
447        let _ = self
448            .record_tx
449            .send(ConnectionRecord::PathStats(conn_path_stats));
450    }
451}
452
453pub enum ConnectionRecord {
454    StreamedFrame { stream_id: u64, frame: H3iFrame },
455    Close(ConnectionCloseDetails),
456    PathStats(Vec<PathStats>),
457    ConnectionStats(Stats),
458}