tokio_quiche/http3/driver/
server.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::ops::Deref;
28use std::sync::Arc;
29
30use tokio::sync::mpsc;
31
32use super::datagram;
33use super::DriverHooks;
34use super::H3Command;
35use super::H3ConnectionError;
36use super::H3ConnectionResult;
37use super::H3Controller;
38use super::H3Driver;
39use super::H3Event;
40use super::InboundHeaders;
41use super::IncomingH3Headers;
42use super::StreamCtx;
43use super::STREAM_CAPACITY;
44use crate::http3::settings::Http3Settings;
45use crate::http3::settings::Http3SettingsEnforcer;
46use crate::http3::settings::Http3TimeoutType;
47use crate::http3::settings::TimeoutKey;
48use crate::quic::HandshakeInfo;
49use crate::quic::QuicCommand;
50use crate::quic::QuicheConnection;
51
52/// An [H3Driver] for a server-side HTTP/3 connection. See [H3Driver] for
53/// details. Emits [`ServerH3Event`]s and expects [`ServerH3Command`]s for
54/// control.
55pub type ServerH3Driver = H3Driver<ServerHooks>;
56/// The [H3Controller] type paired with [ServerH3Driver]. See [H3Controller] for
57/// details.
58pub type ServerH3Controller = H3Controller<ServerHooks>;
59/// Receives [`ServerH3Event`]s from a [ServerH3Driver]. This is the control
60/// stream which describes what is happening on the connection, but does not
61/// transfer data.
62pub type ServerEventStream = mpsc::UnboundedReceiver<ServerH3Event>;
63
/// Owned, uninterpreted bytes of a priority value.
///
/// [`ServerH3Event::Headers`] uses this type to carry the latest
/// PRIORITY_UPDATE frame value. The bytes are never parsed here; they are
/// exposed read-only as a `[u8]` slice through [`Deref`].
#[derive(Clone, Debug)]
pub struct RawPriorityValue(Vec<u8>);

impl From<Vec<u8>> for RawPriorityValue {
    /// Wraps `value` as-is, taking ownership of the buffer.
    fn from(value: Vec<u8>) -> Self {
        Self(value)
    }
}

impl Deref for RawPriorityValue {
    type Target = [u8];

    /// Borrows the wrapped bytes as a slice.
    fn deref(&self) -> &Self::Target {
        self.0.as_slice()
    }
}
80
81/// Events produced by [ServerH3Driver].
82#[derive(Debug)]
83pub enum ServerH3Event {
84    Core(H3Event),
85
86    Headers {
87        incoming_headers: IncomingH3Headers,
88        /// The latest PRIORITY_UPDATE frame value, if any.
89        priority: Option<RawPriorityValue>,
90    },
91}
92
93impl From<H3Event> for ServerH3Event {
94    fn from(ev: H3Event) -> Self {
95        match ev {
96            H3Event::IncomingHeaders(incoming_headers) => Self::Headers {
97                incoming_headers,
98                priority: None,
99            },
100            _ => Self::Core(ev),
101        }
102    }
103}
104
105/// Commands accepted by [ServerH3Driver].
106#[derive(Debug)]
107pub enum ServerH3Command {
108    Core(H3Command),
109}
110
111impl From<H3Command> for ServerH3Command {
112    fn from(cmd: H3Command) -> Self {
113        Self::Core(cmd)
114    }
115}
116
117impl From<QuicCommand> for ServerH3Command {
118    fn from(cmd: QuicCommand) -> Self {
119        Self::Core(H3Command::QuicCmd(cmd))
120    }
121}
122
// Urgency given to a request stream before its response headers are written.
// Quiche urgency is an 8-bit space. Internally, quiche reserves 0 for HTTP/3
// control streams and requests are shifted up by 124. Any value in that range
// is suitable here.
const PRE_HEADERS_BOOSTED_PRIORITY_URGENCY: u8 = 64;
// Incremental flag used together with the boosted urgency. Non-incremental
// streams are served in stream ID order, matching the client FIFO
// expectation.
const PRE_HEADERS_BOOSTED_PRIORITY_INCREMENTAL: bool = false;
130
131pub struct ServerHooks {
132    /// Helper to enforce limits and timeouts on an HTTP/3 connection.
133    settings_enforcer: Http3SettingsEnforcer,
134    /// Tracks the number of requests that have been handled by this driver.
135    requests: u64,
136
137    /// Handle to the post-accept timeout entry. If present, the server must
138    /// receive a HEADERS frame before this timeout.
139    post_accept_timeout: Option<TimeoutKey>,
140}
141
142impl ServerHooks {
143    /// Handles a new request, creating a stream context, checking for a
144    /// potential DATAGRAM flow (CONNECT-{UDP,IP}) and sending a relevant
145    /// [`H3Event`] to the [ServerH3Controller] for application-level
146    /// processing.
147    fn handle_request(
148        driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
149        headers: InboundHeaders,
150    ) -> H3ConnectionResult<()> {
151        let InboundHeaders {
152            stream_id,
153            headers,
154            has_body,
155        } = headers;
156
157        // Multiple HEADERS frames can be received on a single stream, but only
158        // the first one is an actual request. For now ignore any additional
159        // HEADERS (e.g. "trailers").
160        if driver.stream_map.contains_key(&stream_id) {
161            return Ok(());
162        }
163
164        let (mut stream_ctx, send, recv) =
165            StreamCtx::new(stream_id, STREAM_CAPACITY);
166
167        if let Some(flow_id) = datagram::extract_flow_id(stream_id, &headers) {
168            let _ = driver.get_or_insert_flow(flow_id)?;
169            stream_ctx.associated_dgram_flow_id = Some(flow_id);
170        }
171
172        let latest_priority_update: Option<RawPriorityValue> = driver
173            .conn_mut()?
174            .take_last_priority_update(stream_id)
175            .ok()
176            .map(|v| v.into());
177
178        // Boost the priority of the stream until we write response headers via
179        // process_write_frame(), which will set the desired priority. Since it
180        // will get set later, just swallow any error here.
181        qconn
182            .stream_priority(
183                stream_id,
184                PRE_HEADERS_BOOSTED_PRIORITY_URGENCY,
185                PRE_HEADERS_BOOSTED_PRIORITY_INCREMENTAL,
186            )
187            .ok();
188
189        let headers = IncomingH3Headers {
190            stream_id,
191            headers,
192            send,
193            recv,
194            read_fin: !has_body,
195            h3_audit_stats: Arc::clone(&stream_ctx.audit_stats),
196        };
197
198        driver
199            .waiting_streams
200            .push(stream_ctx.wait_for_recv(stream_id));
201        driver.insert_stream(stream_id, stream_ctx);
202
203        driver
204            .h3_event_sender
205            .send(ServerH3Event::Headers {
206                incoming_headers: headers,
207                priority: latest_priority_update,
208            })
209            .map_err(|_| H3ConnectionError::ControllerWentAway)?;
210        driver.hooks.requests += 1;
211
212        Ok(())
213    }
214}
215
216#[allow(private_interfaces)]
217impl DriverHooks for ServerHooks {
218    type Command = ServerH3Command;
219    type Event = ServerH3Event;
220
221    fn new(settings: &Http3Settings) -> Self {
222        Self {
223            settings_enforcer: settings.into(),
224            requests: 0,
225            post_accept_timeout: None,
226        }
227    }
228
229    fn conn_established(
230        driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
231        handshake_info: &HandshakeInfo,
232    ) -> H3ConnectionResult<()> {
233        assert!(
234            qconn.is_server(),
235            "ServerH3Driver requires a server-side QUIC connection"
236        );
237
238        if let Some(post_accept_timeout) =
239            driver.hooks.settings_enforcer.post_accept_timeout()
240        {
241            let remaining = post_accept_timeout
242                .checked_sub(handshake_info.elapsed())
243                .ok_or(H3ConnectionError::PostAcceptTimeout)?;
244
245            let key = driver
246                .hooks
247                .settings_enforcer
248                .add_timeout(Http3TimeoutType::PostAccept, remaining);
249            driver.hooks.post_accept_timeout = Some(key);
250        }
251
252        Ok(())
253    }
254
255    fn headers_received(
256        driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
257        headers: InboundHeaders,
258    ) -> H3ConnectionResult<()> {
259        if driver
260            .hooks
261            .settings_enforcer
262            .enforce_requests_limit(driver.hooks.requests)
263        {
264            let _ =
265                qconn.close(true, quiche::h3::WireErrorCode::NoError as u64, &[]);
266            return Ok(());
267        }
268
269        if let Some(timeout) = driver.hooks.post_accept_timeout.take() {
270            // We've seen the first Headers event for the connection,
271            // so we can abort the post-accept timeout
272            driver.hooks.settings_enforcer.cancel_timeout(timeout);
273        }
274
275        Self::handle_request(driver, qconn, headers)
276    }
277
278    fn conn_command(
279        driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
280        cmd: Self::Command,
281    ) -> H3ConnectionResult<()> {
282        let ServerH3Command::Core(cmd) = cmd;
283        driver.handle_core_command(qconn, cmd)
284    }
285
286    fn has_wait_action(driver: &mut H3Driver<Self>) -> bool {
287        driver.hooks.settings_enforcer.has_pending_timeouts()
288    }
289
290    async fn wait_for_action(
291        &mut self, qconn: &mut QuicheConnection,
292    ) -> H3ConnectionResult<()> {
293        self.settings_enforcer.enforce_timeouts(qconn).await?;
294        Err(H3ConnectionError::PostAcceptTimeout)
295    }
296}