Skip to main content

tokio_quiche/http3/driver/
server.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::ops::Deref;
28use std::sync::Arc;
29
30use tokio::sync::mpsc;
31
32use super::datagram;
33use super::DriverHooks;
34use super::H3Command;
35use super::H3ConnectionError;
36use super::H3ConnectionResult;
37use super::H3Controller;
38use super::H3Driver;
39use super::H3Event;
40use super::InboundHeaders;
41use super::IncomingH3Headers;
42use super::StreamCtx;
43use super::STREAM_CAPACITY;
44use crate::http3::settings::Http3Settings;
45use crate::http3::settings::Http3SettingsEnforcer;
46use crate::http3::settings::Http3TimeoutType;
47use crate::http3::settings::TimeoutKey;
48use crate::quic::HandshakeInfo;
49use crate::quic::QuicCommand;
50use crate::quic::QuicheConnection;
51
52/// An [H3Driver] for a server-side HTTP/3 connection. See [H3Driver] for
53/// details. Emits [`ServerH3Event`]s and expects [`ServerH3Command`]s for
54/// control.
55pub type ServerH3Driver = H3Driver<ServerHooks>;
56/// The [H3Controller] type paired with [ServerH3Driver]. See [H3Controller] for
57/// details.
58pub type ServerH3Controller = H3Controller<ServerHooks>;
59/// Receives [`ServerH3Event`]s from a [ServerH3Driver]. This is the control
60/// stream which describes what is happening on the connection, but does not
61/// transfer data.
62pub type ServerEventStream = mpsc::UnboundedReceiver<ServerH3Event>;
63
64#[derive(Clone, Debug)]
65pub struct RawPriorityValue(Vec<u8>);
66
67impl From<Vec<u8>> for RawPriorityValue {
68    fn from(value: Vec<u8>) -> Self {
69        RawPriorityValue(value)
70    }
71}
72
73impl Deref for RawPriorityValue {
74    type Target = [u8];
75
76    fn deref(&self) -> &Self::Target {
77        &self.0
78    }
79}
80
81/// The request was received during early data (0-RTT).
82#[derive(Clone, Debug)]
83pub struct IsInEarlyData(bool);
84
85impl IsInEarlyData {
86    fn new(is_in_early_data: bool) -> Self {
87        IsInEarlyData(is_in_early_data)
88    }
89}
90
91impl Deref for IsInEarlyData {
92    type Target = bool;
93
94    fn deref(&self) -> &Self::Target {
95        &self.0
96    }
97}
98
99/// Events produced by [ServerH3Driver].
100#[derive(Debug)]
101pub enum ServerH3Event {
102    Core(H3Event),
103
104    Headers {
105        incoming_headers: IncomingH3Headers,
106        /// The latest PRIORITY_UPDATE frame value, if any.
107        priority: Option<RawPriorityValue>,
108        is_in_early_data: IsInEarlyData,
109    },
110}
111
112impl From<H3Event> for ServerH3Event {
113    fn from(ev: H3Event) -> Self {
114        match ev {
115            H3Event::IncomingHeaders(incoming_headers) => {
116                // Server `incoming_headers` are exclusively created in
117                // `ServerHooks::handle_request`, which correctly serializes the
118                // RawPriorityValue and IsInEarlyData values.
119                //
120                // See `H3Driver::process_read_event` for implementation details.
121                Self::Headers {
122                    incoming_headers,
123                    priority: None,
124                    is_in_early_data: IsInEarlyData::new(false),
125                }
126            },
127            _ => Self::Core(ev),
128        }
129    }
130}
131
132/// Commands accepted by [ServerH3Driver].
133#[derive(Debug)]
134pub enum ServerH3Command {
135    Core(H3Command),
136}
137
138impl From<H3Command> for ServerH3Command {
139    fn from(cmd: H3Command) -> Self {
140        Self::Core(cmd)
141    }
142}
143
144impl From<QuicCommand> for ServerH3Command {
145    fn from(cmd: QuicCommand) -> Self {
146        Self::Core(H3Command::QuicCmd(cmd))
147    }
148}
149
150// Quiche urgency is an 8-bit space. Internally, quiche reserves 0 for HTTP/3
151// control streams and request are shifted up by 124. Any value in that range is
152// suitable here.
153const PRE_HEADERS_BOOSTED_PRIORITY_URGENCY: u8 = 64;
154// Non-incremental streams are served in stream ID order, matching the client
155// FIFO expectation.
156const PRE_HEADERS_BOOSTED_PRIORITY_INCREMENTAL: bool = false;
157
158pub struct ServerHooks {
159    /// Helper to enforce limits and timeouts on an HTTP/3 connection.
160    settings_enforcer: Http3SettingsEnforcer,
161    /// Tracks the number of requests that have been handled by this driver.
162    requests: u64,
163
164    /// Handle to the post-accept timeout entry. If present, the server must
165    /// receive a HEADERS frame before this timeout.
166    post_accept_timeout: Option<TimeoutKey>,
167    /// Whether the extended CONNECT protocol is enabled. When disabled,
168    /// skip DATAGRAM flow creation for `:protocol` requests.
169    extended_connect_enabled: bool,
170}
171
172impl ServerHooks {
173    /// Handles a new request, creating a stream context, checking for a
174    /// potential DATAGRAM flow (CONNECT-{UDP,IP}) and sending a relevant
175    /// [`H3Event`] to the [ServerH3Controller] for application-level
176    /// processing.
177    fn handle_request(
178        driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
179        headers: InboundHeaders,
180    ) -> H3ConnectionResult<()> {
181        let InboundHeaders {
182            stream_id,
183            headers,
184            has_body,
185        } = headers;
186
187        // Multiple HEADERS frames can be received on a single stream, but only
188        // the first one is an actual request. For now ignore any additional
189        // HEADERS (e.g. "trailers").
190        if driver.stream_map.contains_key(&stream_id) {
191            return Ok(());
192        }
193
194        let (mut stream_ctx, send, recv) =
195            StreamCtx::new(stream_id, STREAM_CAPACITY);
196
197        // When Extended CONNECT is enabled, extract the datagram flow ID
198        // from the request headers and register it in flow_map. This
199        // allows incoming DATAGRAMs to be routed to the correct stream.
200        // Note: flow_map entries are considered active work, so the
201        // connection will not be closed in cleanup_stream() while
202        // flows remain.
203        if driver.hooks.extended_connect_enabled() {
204            if let Some(flow_id) =
205                datagram::extract_quarter_stream_id(stream_id, &headers)
206            {
207                let _ = driver.get_or_insert_flow(flow_id)?;
208                stream_ctx.associated_dgram_flow_id = Some(flow_id);
209            }
210        }
211
212        let latest_priority_update: Option<RawPriorityValue> = driver
213            .conn_mut()?
214            .take_last_priority_update(stream_id)
215            .ok()
216            .map(|v| v.into());
217
218        // Boost the priority of the stream until we write response headers via
219        // process_write_frame(), which will set the desired priority. Since it
220        // will get set later, just swallow any error here.
221        qconn
222            .stream_priority(
223                stream_id,
224                PRE_HEADERS_BOOSTED_PRIORITY_URGENCY,
225                PRE_HEADERS_BOOSTED_PRIORITY_INCREMENTAL,
226            )
227            .ok();
228
229        let headers = IncomingH3Headers {
230            stream_id,
231            headers,
232            send,
233            recv,
234            read_fin: !has_body,
235            h3_audit_stats: Arc::clone(&stream_ctx.audit_stats),
236        };
237
238        driver
239            .waiting_streams
240            .push(stream_ctx.wait_for_recv(stream_id));
241        driver.insert_stream(stream_id, stream_ctx);
242
243        driver
244            .h3_event_sender
245            .send(ServerH3Event::Headers {
246                incoming_headers: headers,
247                priority: latest_priority_update,
248                is_in_early_data: IsInEarlyData::new(qconn.is_in_early_data()),
249            })
250            .map_err(|_| H3ConnectionError::ControllerWentAway)?;
251        driver.hooks.requests += 1;
252
253        Ok(())
254    }
255}
256
257#[allow(private_interfaces)]
258impl DriverHooks for ServerHooks {
259    type Command = ServerH3Command;
260    type Event = ServerH3Event;
261
262    fn new(settings: &Http3Settings) -> Self {
263        Self {
264            settings_enforcer: settings.into(),
265            requests: 0,
266            post_accept_timeout: None,
267            extended_connect_enabled: settings.enable_extended_connect,
268        }
269    }
270
271    fn extended_connect_enabled(&self) -> bool {
272        self.extended_connect_enabled
273    }
274
275    fn conn_established(
276        driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
277        handshake_info: &HandshakeInfo,
278    ) -> H3ConnectionResult<()> {
279        assert!(
280            qconn.is_server(),
281            "ServerH3Driver requires a server-side QUIC connection"
282        );
283
284        if let Some(post_accept_timeout) =
285            driver.hooks.settings_enforcer.post_accept_timeout()
286        {
287            let remaining = post_accept_timeout
288                .checked_sub(handshake_info.elapsed())
289                .ok_or(H3ConnectionError::PostAcceptTimeout)?;
290
291            let key = driver
292                .hooks
293                .settings_enforcer
294                .add_timeout(Http3TimeoutType::PostAccept, remaining);
295            driver.hooks.post_accept_timeout = Some(key);
296        }
297
298        Ok(())
299    }
300
301    fn headers_received(
302        driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
303        headers: InboundHeaders,
304    ) -> H3ConnectionResult<()> {
305        if driver
306            .hooks
307            .settings_enforcer
308            .enforce_requests_limit(driver.hooks.requests)
309        {
310            let _ =
311                qconn.close(true, quiche::h3::WireErrorCode::NoError as u64, &[]);
312            return Ok(());
313        }
314
315        if let Some(timeout) = driver.hooks.post_accept_timeout.take() {
316            // We've seen the first Headers event for the connection,
317            // so we can abort the post-accept timeout
318            driver.hooks.settings_enforcer.cancel_timeout(timeout);
319        }
320
321        Self::handle_request(driver, qconn, headers)
322    }
323
324    fn conn_command(
325        driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
326        cmd: Self::Command,
327    ) -> H3ConnectionResult<()> {
328        let ServerH3Command::Core(cmd) = cmd;
329        driver.handle_core_command(qconn, cmd)
330    }
331
332    fn has_wait_action(driver: &mut H3Driver<Self>) -> bool {
333        driver.hooks.settings_enforcer.has_pending_timeouts()
334    }
335
336    async fn wait_for_action(
337        &mut self, qconn: &mut QuicheConnection,
338    ) -> H3ConnectionResult<()> {
339        self.settings_enforcer.enforce_timeouts(qconn).await?;
340        Err(H3ConnectionError::PostAcceptTimeout)
341    }
342}