h3i/client/
async_client.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! Responsible for creating a [tokio_quiche::quic::QuicheConnection] and
28//! yielding I/O to tokio-quiche.
29
30use buffer_pool::ConsumeBuffer;
31use buffer_pool::Pooled;
32use log;
33use quiche::PathStats;
34use quiche::Stats;
35use std::future::Future;
36use std::pin::Pin;
37use std::task::Context;
38use std::task::Poll;
39use std::time::Duration;
40use tokio::select;
41use tokio::sync::mpsc;
42use tokio::sync::oneshot;
43use tokio::time::sleep_until;
44use tokio::time::Instant;
45use tokio_quiche::buf_factory::BufFactory;
46use tokio_quiche::metrics::Metrics;
47use tokio_quiche::quic::HandshakeInfo;
48use tokio_quiche::quic::QuicheConnection;
49use tokio_quiche::settings::Hooks;
50use tokio_quiche::settings::QuicSettings;
51use tokio_quiche::socket::Socket;
52use tokio_quiche::ApplicationOverQuic;
53use tokio_quiche::ConnectionParams;
54use tokio_quiche::QuicResult;
55
56use crate::actions::h3::Action;
57use crate::actions::h3::WaitType;
58use crate::actions::h3::WaitingFor;
59use crate::client::execute_action;
60use crate::client::parse_args;
61use crate::client::parse_streams;
62use crate::client::ClientError;
63use crate::client::CloseTriggerFrames;
64use crate::client::ConnectionSummary;
65use crate::client::ParsedArgs;
66use crate::client::StreamMap;
67use crate::client::MAX_DATAGRAM_SIZE;
68use crate::config::Config as H3iConfig;
69use crate::frame::H3iFrame;
70use crate::quiche;
71
72use super::Client;
73use super::ConnectionCloseDetails;
74use super::StreamParserMap;
75
76/// Connect to the socket.
77pub async fn connect(
78    args: &H3iConfig, frame_actions: Vec<Action>,
79    close_trigger_frames: Option<CloseTriggerFrames>,
80) -> std::result::Result<BuildingConnectionSummary, ClientError> {
81    let quic_settings = create_config(args);
82    let connection_params =
83        ConnectionParams::new_client(quic_settings, None, Hooks::default());
84
85    let ParsedArgs {
86        connect_url,
87        bind_addr,
88        peer_addr,
89    } = parse_args(args);
90
91    let socket = tokio::net::UdpSocket::bind(bind_addr).await.unwrap();
92    socket.connect(peer_addr).await.unwrap();
93
94    log::info!(
95        "connecting to {:} from {:}",
96        peer_addr,
97        socket.local_addr().unwrap()
98    );
99
100    let (h3i, conn_summary_fut) =
101        H3iDriver::new(frame_actions, close_trigger_frames);
102    match tokio_quiche::quic::connect_with_config(
103        Socket::try_from(socket).unwrap(),
104        connect_url,
105        &connection_params,
106        h3i,
107    )
108    .await
109    {
110        Ok(_) => Ok(conn_summary_fut),
111        Err(_) => Err(ClientError::HandshakeFail),
112    }
113}
114
115fn create_config(args: &H3iConfig) -> QuicSettings {
116    let mut quic_settings = QuicSettings::default();
117
118    quic_settings.verify_peer = args.verify_peer;
119    quic_settings.max_idle_timeout =
120        Some(Duration::from_millis(args.idle_timeout));
121    quic_settings.max_recv_udp_payload_size = MAX_DATAGRAM_SIZE;
122    quic_settings.max_send_udp_payload_size = MAX_DATAGRAM_SIZE;
123    quic_settings.initial_max_data = 10_000_000;
124    quic_settings.initial_max_stream_data_bidi_local =
125        args.max_stream_data_bidi_local;
126    quic_settings.initial_max_stream_data_bidi_remote =
127        args.max_stream_data_bidi_remote;
128    quic_settings.initial_max_stream_data_uni = args.max_stream_data_uni;
129    quic_settings.initial_max_streams_bidi = args.max_streams_bidi;
130    quic_settings.initial_max_streams_uni = args.max_streams_uni;
131    quic_settings.disable_active_migration = true;
132    quic_settings.active_connection_id_limit = 0;
133    quic_settings.max_connection_window = args.max_window;
134    quic_settings.max_stream_window = args.max_stream_window;
135    quic_settings.grease = false;
136
137    quic_settings.capture_quiche_logs = true;
138    quic_settings.keylog_file = std::env::var_os("SSLKEYLOGFILE")
139        .and_then(|os_str| os_str.into_string().ok());
140
141    quic_settings
142}
143
/// The [`Future`] used to build a [`ConnectionSummary`].
///
/// At a high level, [`H3iDriver`] will interact with the UDP socket directly,
/// sending and receiving data as necessary. As new data is received, it will
/// send [`ConnectionRecord`]s to this struct, which uses these records to
/// construct the [`ConnectionSummary`].
#[must_use = "must await to get a ConnectionSummary"]
pub struct BuildingConnectionSummary {
    /// Receiving half of the record channel; the sending half is owned by
    /// the [`H3iDriver`].
    rx: mpsc::UnboundedReceiver<ConnectionRecord>,
    /// The summary under construction. Taken (left as `None`) once the
    /// future has yielded it.
    summary: Option<ConnectionSummary>,
    /// Used once to tell the [`H3iDriver`] that every close trigger frame
    /// has been seen. `None` after the signal has been sent.
    seen_all_close_trigger_frames: Option<oneshot::Sender<()>>,
}
156
157impl BuildingConnectionSummary {
158    fn new(
159        rx: mpsc::UnboundedReceiver<ConnectionRecord>,
160        close_trigger_frames: Option<CloseTriggerFrames>,
161        trigger_frame_tx: oneshot::Sender<()>,
162    ) -> Self {
163        let summary = ConnectionSummary {
164            stream_map: StreamMap::new(close_trigger_frames),
165            ..Default::default()
166        };
167
168        Self {
169            rx,
170            summary: Some(summary),
171            seen_all_close_trigger_frames: Some(trigger_frame_tx),
172        }
173    }
174}
175
176impl Future for BuildingConnectionSummary {
177    type Output = ConnectionSummary;
178
179    fn poll(
180        mut self: Pin<&mut Self>, cx: &mut Context<'_>,
181    ) -> Poll<Self::Output> {
182        while let Poll::Ready(Some(record)) = self.rx.poll_recv(cx) {
183            // Grab all records received from the current event loop iteration and
184            // insert them into the in-progress summary
185            let summary = self.summary.as_mut().expect("summary already taken");
186
187            match record {
188                ConnectionRecord::StreamedFrame { stream_id, frame } => {
189                    let stream_map = &mut summary.stream_map;
190                    stream_map.insert(stream_id, frame);
191
192                    if stream_map.all_close_trigger_frames_seen() {
193                        // Signal the H3iDriver task to close the connection.
194                        if let Some(expected_tx) =
195                            self.seen_all_close_trigger_frames.take()
196                        {
197                            let _ = expected_tx.send(());
198                        }
199                    }
200                },
201                ConnectionRecord::ConnectionStats(s) => summary.stats = Some(s),
202                ConnectionRecord::PathStats(ps) => summary.path_stats = ps,
203                ConnectionRecord::Close(d) => summary.conn_close_details = d,
204            };
205        }
206
207        if self.rx.is_closed() {
208            // The sender drops when the Tokio-Quiche IOW finishes, so the
209            // connection is done and we're safe to yield the summary.
210            let summary = self.summary.take().expect("summary already taken");
211            Poll::Ready(summary)
212        } else {
213            Poll::Pending
214        }
215    }
216}
217
/// The tokio-quiche application that executes a list of h3i [`Action`]s on a
/// connection and reports what it observes as [`ConnectionRecord`]s.
pub struct H3iDriver {
    /// Buffer handed out through `ApplicationOverQuic::buffer`.
    buffer: Pooled<ConsumeBuffer>,
    /// The actions to run, in order.
    actions: Vec<Action>,
    /// Index into `actions` of the next action to handle.
    actions_executed: usize,
    /// Earliest instant at which the next action may be executed.
    next_fire_time: Instant,
    /// Stream events that must be observed before further actions run.
    waiting_for_responses: WaitingFor,
    /// Sends records to the paired [`BuildingConnectionSummary`].
    record_tx: mpsc::UnboundedSender<ConnectionRecord>,
    /// Parser state exposed through `Client::stream_parsers_mut` and passed
    /// to `execute_action`.
    stream_parsers: StreamParserMap,
    /// Resolves once the [`BuildingConnectionSummary`] has seen all close
    /// trigger frames.
    close_trigger_seen_rx: oneshot::Receiver<()>,
}
228
229impl H3iDriver {
230    fn new(
231        actions: Vec<Action>, close_trigger_frames: Option<CloseTriggerFrames>,
232    ) -> (Self, BuildingConnectionSummary) {
233        let (record_tx, record_rx) = mpsc::unbounded_channel();
234        let (close_trigger_seen_tx, close_trigger_seen_rx) = oneshot::channel();
235        let fut = BuildingConnectionSummary::new(
236            record_rx,
237            close_trigger_frames,
238            close_trigger_seen_tx,
239        );
240
241        (
242            Self {
243                buffer: BufFactory::get_max_buf(),
244                actions,
245                actions_executed: 0,
246                next_fire_time: Instant::now(),
247                waiting_for_responses: WaitingFor::default(),
248                record_tx,
249                stream_parsers: StreamParserMap::default(),
250                close_trigger_seen_rx,
251            },
252            fut,
253        )
254    }
255
256    /// If the next action should fire.
257    fn should_fire(&self) -> bool {
258        Instant::now() >= self.next_fire_time
259    }
260
261    /// Insert all waits into the waiting set.
262    fn register_waits(&mut self) {
263        while self.actions_executed < self.actions.len() {
264            if let Action::Wait { wait_type } =
265                &self.actions[self.actions_executed]
266            {
267                self.actions_executed += 1;
268
269                match wait_type {
270                    WaitType::WaitDuration(duration) => {
271                        self.next_fire_time = Instant::now() + *duration;
272
273                        log::debug!(
274                            "h3i: waiting for responses: {:?}",
275                            self.waiting_for_responses
276                        );
277                    },
278                    WaitType::StreamEvent(event) => {
279                        self.waiting_for_responses.add_wait(event);
280                    },
281                }
282            } else {
283                break;
284            }
285        }
286    }
287}
288
289impl Client for H3iDriver {
290    fn stream_parsers_mut(&mut self) -> &mut StreamParserMap {
291        &mut self.stream_parsers
292    }
293
294    fn handle_response_frame(
295        &mut self, stream_id: u64, frame: crate::frame::H3iFrame,
296    ) {
297        self.record_tx
298            .send(ConnectionRecord::StreamedFrame { stream_id, frame })
299            .expect("H3iDriver task dropped")
300    }
301}
302
303impl ApplicationOverQuic for H3iDriver {
304    fn on_conn_established(
305        &mut self, _qconn: &mut QuicheConnection, _handshake_info: &HandshakeInfo,
306    ) -> QuicResult<()> {
307        log::info!("h3i: HTTP/3 connection established");
308        Ok(())
309    }
310
311    fn should_act(&self) -> bool {
312        // Even if the connection wasn't established, we should still send
313        // terminal records to the summary
314        true
315    }
316
317    fn process_reads(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
318        log::trace!("h3i: process_reads");
319
320        // This is executed in process_reads so that work_loop can clear any waits
321        // on the current event loop iteration - if it was in process_writes, we
322        // could potentially miss waits and hang the client.
323        self.register_waits();
324
325        let stream_events = parse_streams(qconn, self);
326        for event in stream_events {
327            self.waiting_for_responses.remove_wait(event);
328        }
329
330        Ok(())
331    }
332
333    fn process_writes(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
334        log::trace!("h3i: process_writes");
335
336        if !self.waiting_for_responses.is_empty() {
337            log::debug!(
338                "awaiting responses on streams {:?}, skipping further action",
339                self.waiting_for_responses
340            );
341
342            return Ok(());
343        }
344
345        // Re-create the iterator so we can mutably borrow the stream parser map
346        let mut iter =
347            self.actions.clone().into_iter().skip(self.actions_executed);
348        while let Some(action) = iter.next() {
349            match action {
350                Action::SendFrame { .. } |
351                Action::StreamBytes { .. } |
352                Action::ResetStream { .. } |
353                Action::StopSending { .. } |
354                Action::OpenUniStream { .. } |
355                Action::ConnectionClose { .. } |
356                Action::SendHeadersFrame { .. } => {
357                    if self.should_fire() {
358                        // Reset the fire time such that the next action will
359                        // still fire.
360                        self.next_fire_time = Instant::now();
361
362                        execute_action(&action, qconn, self.stream_parsers_mut());
363                        self.actions_executed += 1;
364                    } else {
365                        break;
366                    }
367                },
368                Action::Wait { .. } => {
369                    // Break out of the write phase if we see a wait, since waits
370                    // have to be registered in the read
371                    // phase. The actions_executed pointer will be
372                    // incremented there as well
373                    break;
374                },
375                Action::FlushPackets => {
376                    self.actions_executed += 1;
377                    break;
378                },
379            }
380        }
381
382        Ok(())
383    }
384
385    async fn wait_for_data(
386        &mut self, qconn: &mut QuicheConnection,
387    ) -> QuicResult<()> {
388        log::trace!("h3i: wait_for_data");
389
390        let waiting = !self.should_fire();
391        select! {
392            rx = &mut self.close_trigger_seen_rx, if !self.close_trigger_seen_rx.is_terminated() => {
393                // NOTE: wait_for_data can be called again after all close triggers have been seen,
394                // depending on how long it takes quiche to mark the connection as closed.
395                // Therefore we can't re-poll the receiver or we'd panic.
396                if rx.is_ok() {
397                    // TODO: customizable close trigger frames
398                    let _ = qconn.close(true, quiche::h3::WireErrorCode::NoError as u64, b"saw all expected frames");
399                }
400            }
401            _ = sleep_until(self.next_fire_time), if waiting => {
402                log::debug!("h3i: releasing wait timer");
403            }
404        }
405
406        Ok(())
407    }
408
409    fn buffer(&mut self) -> &mut [u8] {
410        &mut self.buffer
411    }
412
413    fn on_conn_close<M: Metrics>(
414        &mut self, qconn: &mut QuicheConnection, _metrics: &M,
415        _work_loop_result: &QuicResult<()>,
416    ) {
417        let _ = self
418            .record_tx
419            .send(ConnectionRecord::Close(ConnectionCloseDetails::new(&qconn)));
420
421        let _ = self
422            .record_tx
423            .send(ConnectionRecord::ConnectionStats(qconn.stats()));
424
425        let conn_path_stats = qconn.path_stats().collect::<Vec<PathStats>>();
426        let _ = self
427            .record_tx
428            .send(ConnectionRecord::PathStats(conn_path_stats));
429    }
430}
431
/// A piece of information sent from the [`H3iDriver`] to the
/// [`BuildingConnectionSummary`] over the record channel.
pub enum ConnectionRecord {
    /// A frame that was received on the stream with the given ID.
    StreamedFrame { stream_id: u64, frame: H3iFrame },
    /// Close details captured from the connection when it closed.
    Close(ConnectionCloseDetails),
    /// Per-path statistics collected when the connection closed.
    PathStats(Vec<PathStats>),
    /// Connection-level statistics collected when the connection closed.
    ConnectionStats(Stats),
}