h3i/client/
async_client.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! Responsible for creating a [tokio_quiche::quic::QuicheConnection] and
28//! yielding I/O to tokio-quiche.
29
30use buffer_pool::ConsumeBuffer;
31use buffer_pool::Pooled;
32use log;
33use quiche::PathStats;
34use quiche::Stats;
35use std::future::Future;
36use std::pin::Pin;
37use std::task::Context;
38use std::task::Poll;
39use std::time::Duration;
40use tokio::select;
41use tokio::sync::mpsc;
42use tokio::sync::oneshot;
43use tokio::time::sleep;
44use tokio::time::sleep_until;
45use tokio::time::Instant;
46use tokio_quiche::buf_factory::BufFactory;
47use tokio_quiche::metrics::Metrics;
48use tokio_quiche::quic::HandshakeInfo;
49use tokio_quiche::quic::QuicheConnection;
50use tokio_quiche::settings::Hooks;
51use tokio_quiche::settings::QuicSettings;
52use tokio_quiche::socket::Socket;
53use tokio_quiche::ApplicationOverQuic;
54use tokio_quiche::ConnectionParams;
55use tokio_quiche::QuicResult;
56
57use crate::actions::h3::Action;
58use crate::actions::h3::WaitType;
59use crate::actions::h3::WaitingFor;
60use crate::client::execute_action;
61use crate::client::parse_args;
62use crate::client::parse_streams;
63use crate::client::ClientError;
64use crate::client::CloseTriggerFrames;
65use crate::client::ConnectionSummary;
66use crate::client::ParsedArgs;
67use crate::client::StreamMap;
68use crate::client::MAX_DATAGRAM_SIZE;
69use crate::config::Config as H3iConfig;
70use crate::frame::H3iFrame;
71use crate::quiche;
72
73use super::Client;
74use super::ConnectionCloseDetails;
75use super::StreamParserMap;
76
77/// Connect to the socket.
78pub async fn connect(
79    args: &H3iConfig, frame_actions: Vec<Action>,
80    close_trigger_frames: Option<CloseTriggerFrames>,
81) -> std::result::Result<BuildingConnectionSummary, ClientError> {
82    let quic_settings = create_config(args);
83    let mut connection_params =
84        ConnectionParams::new_client(quic_settings, None, Hooks::default());
85
86    connection_params.session = args.session.clone();
87
88    let ParsedArgs {
89        connect_url,
90        bind_addr,
91        peer_addr,
92    } = parse_args(args);
93
94    let socket = tokio::net::UdpSocket::bind(bind_addr).await.unwrap();
95    socket.connect(peer_addr).await.unwrap();
96
97    log::info!(
98        "connecting to {:} from {:}",
99        peer_addr,
100        socket.local_addr().unwrap()
101    );
102
103    let (h3i, conn_summary_fut) =
104        H3iDriver::new(frame_actions, close_trigger_frames);
105    match tokio_quiche::quic::connect_with_config(
106        Socket::try_from(socket).unwrap(),
107        connect_url,
108        &connection_params,
109        h3i,
110    )
111    .await
112    {
113        Ok(_) => Ok(conn_summary_fut),
114        Err(_) => Err(ClientError::HandshakeFail),
115    }
116}
117
118fn create_config(args: &H3iConfig) -> QuicSettings {
119    let mut quic_settings = QuicSettings::default();
120
121    quic_settings.verify_peer = args.verify_peer;
122    quic_settings.max_idle_timeout =
123        Some(Duration::from_millis(args.idle_timeout));
124    quic_settings.max_recv_udp_payload_size = MAX_DATAGRAM_SIZE;
125    quic_settings.max_send_udp_payload_size = MAX_DATAGRAM_SIZE;
126    quic_settings.initial_max_data = 10_000_000;
127    quic_settings.initial_max_stream_data_bidi_local =
128        args.max_stream_data_bidi_local;
129    quic_settings.initial_max_stream_data_bidi_remote =
130        args.max_stream_data_bidi_remote;
131    quic_settings.initial_max_stream_data_uni = args.max_stream_data_uni;
132    quic_settings.initial_max_streams_bidi = args.max_streams_bidi;
133    quic_settings.initial_max_streams_uni = args.max_streams_uni;
134    quic_settings.disable_active_migration = true;
135    quic_settings.active_connection_id_limit = 0;
136    quic_settings.max_connection_window = args.max_window;
137    quic_settings.max_stream_window = args.max_stream_window;
138    quic_settings.grease = false;
139
140    quic_settings.capture_quiche_logs = true;
141    quic_settings.keylog_file = std::env::var_os("SSLKEYLOGFILE")
142        .and_then(|os_str| os_str.into_string().ok());
143
144    quic_settings.enable_dgram = args.enable_dgram;
145    quic_settings.dgram_recv_max_queue_len = args.dgram_recv_queue_len;
146    quic_settings.dgram_send_max_queue_len = args.dgram_send_queue_len;
147
148    quic_settings
149}
150
151/// The [`Future`] used to build a [`ConnectionSummary`].
152///
153/// At a high level, [`H3iDriver`] will interact with the UDP socket directly,
154/// sending and receiving data as necessary. As new data is received, it will
155/// send [`ConnectionRecord`]s to this struct, which uses these records to
156/// construct the [`ConnectionSummary`].
157#[must_use = "must await to get a ConnectionSummary"]
158pub struct BuildingConnectionSummary {
159    rx: mpsc::UnboundedReceiver<ConnectionRecord>,
160    summary: Option<ConnectionSummary>,
161    seen_all_close_trigger_frames: Option<oneshot::Sender<()>>,
162}
163
164impl BuildingConnectionSummary {
165    fn new(
166        rx: mpsc::UnboundedReceiver<ConnectionRecord>,
167        close_trigger_frames: Option<CloseTriggerFrames>,
168        trigger_frame_tx: oneshot::Sender<()>,
169    ) -> Self {
170        let summary = ConnectionSummary {
171            stream_map: StreamMap::new(close_trigger_frames),
172            ..Default::default()
173        };
174
175        Self {
176            rx,
177            summary: Some(summary),
178            seen_all_close_trigger_frames: Some(trigger_frame_tx),
179        }
180    }
181}
182
183impl Future for BuildingConnectionSummary {
184    type Output = ConnectionSummary;
185
186    fn poll(
187        mut self: Pin<&mut Self>, cx: &mut Context<'_>,
188    ) -> Poll<Self::Output> {
189        while let Poll::Ready(Some(record)) = self.rx.poll_recv(cx) {
190            // Grab all records received from the current event loop iteration and
191            // insert them into the in-progress summary
192            let summary = self.summary.as_mut().expect("summary already taken");
193
194            match record {
195                ConnectionRecord::StreamedFrame { stream_id, frame } => {
196                    let stream_map = &mut summary.stream_map;
197                    stream_map.insert(stream_id, frame);
198
199                    if stream_map.all_close_trigger_frames_seen() {
200                        // Signal the H3iDriver task to close the connection.
201                        if let Some(expected_tx) =
202                            self.seen_all_close_trigger_frames.take()
203                        {
204                            let _ = expected_tx.send(());
205                        }
206                    }
207                },
208                ConnectionRecord::ConnectionStats(s) => summary.stats = Some(s),
209                ConnectionRecord::PathStats(ps) => summary.path_stats = ps,
210                ConnectionRecord::Close(d) => summary.conn_close_details = d,
211            };
212        }
213
214        if self.rx.is_closed() {
215            // The sender drops when the Tokio-Quiche IOW finishes, so the
216            // connection is done and we're safe to yield the summary.
217            let summary = self.summary.take().expect("summary already taken");
218            Poll::Ready(summary)
219        } else {
220            Poll::Pending
221        }
222    }
223}
224
225pub struct H3iDriver {
226    buffer: Pooled<ConsumeBuffer>,
227    actions: Vec<Action>,
228    actions_executed: usize,
229    next_fire_time: Instant,
230    waiting_for_responses: WaitingFor,
231    record_tx: mpsc::UnboundedSender<ConnectionRecord>,
232    stream_parsers: StreamParserMap,
233    close_trigger_seen_rx: oneshot::Receiver<()>,
234}
235
236impl H3iDriver {
237    fn new(
238        actions: Vec<Action>, close_trigger_frames: Option<CloseTriggerFrames>,
239    ) -> (Self, BuildingConnectionSummary) {
240        let (record_tx, record_rx) = mpsc::unbounded_channel();
241        let (close_trigger_seen_tx, close_trigger_seen_rx) = oneshot::channel();
242        let fut = BuildingConnectionSummary::new(
243            record_rx,
244            close_trigger_frames,
245            close_trigger_seen_tx,
246        );
247
248        (
249            Self {
250                buffer: BufFactory::get_max_buf(),
251                actions,
252                actions_executed: 0,
253                next_fire_time: Instant::now(),
254                waiting_for_responses: WaitingFor::default(),
255                record_tx,
256                stream_parsers: StreamParserMap::default(),
257                close_trigger_seen_rx,
258            },
259            fut,
260        )
261    }
262
263    /// If the next action should fire.
264    fn should_fire(&self) -> bool {
265        Instant::now() >= self.next_fire_time
266    }
267
268    /// Insert all waits into the waiting set.
269    fn register_waits(&mut self) {
270        while self.actions_executed < self.actions.len() {
271            if let Action::Wait { wait_type } =
272                &self.actions[self.actions_executed]
273            {
274                self.actions_executed += 1;
275
276                match wait_type {
277                    WaitType::WaitDuration(duration) => {
278                        self.next_fire_time = Instant::now() + *duration;
279
280                        log::debug!(
281                            "h3i: waiting for responses: {:?}",
282                            self.waiting_for_responses
283                        );
284                    },
285                    WaitType::StreamEvent(event) => {
286                        self.waiting_for_responses.add_wait(event);
287                    },
288                }
289            } else {
290                break;
291            }
292        }
293    }
294}
295
296impl Client for H3iDriver {
297    fn stream_parsers_mut(&mut self) -> &mut StreamParserMap {
298        &mut self.stream_parsers
299    }
300
301    fn handle_response_frame(
302        &mut self, stream_id: u64, frame: crate::frame::H3iFrame,
303    ) {
304        self.record_tx
305            .send(ConnectionRecord::StreamedFrame { stream_id, frame })
306            .expect("H3iDriver task dropped")
307    }
308}
309
310impl ApplicationOverQuic for H3iDriver {
311    fn on_conn_established(
312        &mut self, _qconn: &mut QuicheConnection, _handshake_info: &HandshakeInfo,
313    ) -> QuicResult<()> {
314        log::info!("h3i: HTTP/3 connection established");
315        Ok(())
316    }
317
318    fn should_act(&self) -> bool {
319        // Even if the connection wasn't established, we should still send
320        // terminal records to the summary
321        true
322    }
323
324    fn process_reads(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
325        log::trace!("h3i: process_reads");
326
327        // This is executed in process_reads so that work_loop can clear any waits
328        // on the current event loop iteration - if it was in process_writes, we
329        // could potentially miss waits and hang the client.
330        self.register_waits();
331
332        let stream_events = parse_streams(qconn, self);
333        for event in stream_events {
334            self.waiting_for_responses.remove_wait(event);
335        }
336
337        Ok(())
338    }
339
340    fn process_writes(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
341        log::trace!("h3i: process_writes");
342
343        if !self.waiting_for_responses.is_empty() {
344            log::debug!(
345                "awaiting responses on streams {:?}, skipping further action",
346                self.waiting_for_responses
347            );
348
349            return Ok(());
350        }
351
352        // Re-create the iterator so we can mutably borrow the stream parser map
353        let iter = self.actions.clone().into_iter().skip(self.actions_executed);
354
355        for action in iter {
356            match action {
357                Action::SendFrame { .. } |
358                Action::StreamBytes { .. } |
359                Action::SendDatagram { .. } |
360                Action::ResetStream { .. } |
361                Action::StopSending { .. } |
362                Action::OpenUniStream { .. } |
363                Action::ConnectionClose { .. } |
364                Action::SendHeadersFrame { .. } => {
365                    if self.should_fire() {
366                        // Reset the fire time such that the next action will
367                        // still fire.
368                        self.next_fire_time = Instant::now();
369
370                        execute_action(&action, qconn, self.stream_parsers_mut());
371                        self.actions_executed += 1;
372                    } else {
373                        break;
374                    }
375                },
376                Action::Wait { .. } => {
377                    // Break out of the write phase if we see a wait, since waits
378                    // have to be registered in the read
379                    // phase. The actions_executed pointer will be
380                    // incremented there as well
381                    break;
382                },
383                Action::FlushPackets => {
384                    self.actions_executed += 1;
385                    break;
386                },
387            }
388        }
389
390        Ok(())
391    }
392
393    async fn wait_for_data(
394        &mut self, qconn: &mut QuicheConnection,
395    ) -> QuicResult<()> {
396        log::trace!("h3i: wait_for_data");
397
398        let sleep_fut = if !self.should_fire() {
399            sleep_until(self.next_fire_time)
400        } else {
401            // If we have nothing to send, allow the IOW to resolve wait_for_data
402            // on its own (whether via Quiche timer or incoming data).
403            sleep(Duration::MAX)
404        };
405
406        select! {
407            rx = &mut self.close_trigger_seen_rx, if !self.close_trigger_seen_rx.is_terminated() => {
408                // NOTE: wait_for_data can be called again after all close triggers have been seen,
409                // depending on how long it takes quiche to mark the connection as closed.
410                // Therefore we can't re-poll the receiver or we'd panic.
411                if rx.is_ok() {
412                    // TODO: customizable close trigger frames
413                    let _ = qconn.close(true, quiche::h3::WireErrorCode::NoError as u64, b"saw all expected frames");
414                }
415            }
416            _ = sleep_fut => {}
417        }
418
419        Ok(())
420    }
421
422    fn buffer(&mut self) -> &mut [u8] {
423        &mut self.buffer
424    }
425
426    fn on_conn_close<M: Metrics>(
427        &mut self, qconn: &mut QuicheConnection, _metrics: &M,
428        _work_loop_result: &QuicResult<()>,
429    ) {
430        let _ = self
431            .record_tx
432            .send(ConnectionRecord::Close(ConnectionCloseDetails::new(qconn)));
433
434        let _ = self
435            .record_tx
436            .send(ConnectionRecord::ConnectionStats(qconn.stats()));
437
438        let conn_path_stats = qconn.path_stats().collect::<Vec<PathStats>>();
439        let _ = self
440            .record_tx
441            .send(ConnectionRecord::PathStats(conn_path_stats));
442    }
443}
444
445pub enum ConnectionRecord {
446    StreamedFrame { stream_id: u64, frame: H3iFrame },
447    Close(ConnectionCloseDetails),
448    PathStats(Vec<PathStats>),
449    ConnectionStats(Stats),
450}