tokio_quiche/http3/driver/
server.rs1use std::ops::Deref;
28use std::sync::Arc;
29
30use tokio::sync::mpsc;
31
32use super::datagram;
33use super::DriverHooks;
34use super::H3Command;
35use super::H3ConnectionError;
36use super::H3ConnectionResult;
37use super::H3Controller;
38use super::H3Driver;
39use super::H3Event;
40use super::InboundHeaders;
41use super::IncomingH3Headers;
42use super::StreamCtx;
43use super::STREAM_CAPACITY;
44use crate::http3::settings::Http3Settings;
45use crate::http3::settings::Http3SettingsEnforcer;
46use crate::http3::settings::Http3TimeoutType;
47use crate::http3::settings::TimeoutKey;
48use crate::quic::HandshakeInfo;
49use crate::quic::QuicCommand;
50use crate::quic::QuicheConnection;
51
/// An [`H3Driver`] specialised with [`ServerHooks`].
pub type ServerH3Driver = H3Driver<ServerHooks>;
/// An [`H3Controller`] specialised with [`ServerHooks`].
pub type ServerH3Controller = H3Controller<ServerHooks>;
/// The receiving half of an unbounded channel carrying [`ServerH3Event`]s.
pub type ServerEventStream = mpsc::UnboundedReceiver<ServerH3Event>;
63
/// Owned, uninterpreted bytes of a priority value.
///
/// The bytes are stored exactly as handed to the `From<Vec<u8>>` conversion;
/// no parsing or validation happens here. Read access goes through `Deref`,
/// which exposes the contents as a `[u8]` slice.
#[derive(Clone, Debug)]
pub struct RawPriorityValue(Vec<u8>);

impl From<Vec<u8>> for RawPriorityValue {
    /// Takes ownership of `value` and wraps it without copying.
    fn from(value: Vec<u8>) -> Self {
        Self(value)
    }
}

impl Deref for RawPriorityValue {
    type Target = [u8];

    /// Borrows the wrapped bytes as a slice.
    fn deref(&self) -> &[u8] {
        self.0.as_slice()
    }
}
80
81#[derive(Debug)]
83pub enum ServerH3Event {
84    Core(H3Event),
85
86    Headers {
87        incoming_headers: IncomingH3Headers,
88        priority: Option<RawPriorityValue>,
90    },
91}
92
93impl From<H3Event> for ServerH3Event {
94    fn from(ev: H3Event) -> Self {
95        match ev {
96            H3Event::IncomingHeaders(incoming_headers) => Self::Headers {
97                incoming_headers,
98                priority: None,
99            },
100            _ => Self::Core(ev),
101        }
102    }
103}
104
105#[derive(Debug)]
107pub enum ServerH3Command {
108    Core(H3Command),
109}
110
111impl From<H3Command> for ServerH3Command {
112    fn from(cmd: H3Command) -> Self {
113        Self::Core(cmd)
114    }
115}
116
117impl From<QuicCommand> for ServerH3Command {
118    fn from(cmd: QuicCommand) -> Self {
119        Self::Core(H3Command::QuicCmd(cmd))
120    }
121}
122
/// Urgency argument passed to `stream_priority` for a request stream while
/// its first headers are being handled in `ServerHooks::handle_request`.
// NOTE(review): judging by the name, this is a temporary boost that is
// replaced once response headers are written — confirm against the write path.
const PRE_HEADERS_BOOSTED_PRIORITY_URGENCY: u8 = 64;
/// Incremental flag passed to `stream_priority` together with
/// [`PRE_HEADERS_BOOSTED_PRIORITY_URGENCY`].
const PRE_HEADERS_BOOSTED_PRIORITY_INCREMENTAL: bool = false;
130
/// Server-specific state plugged into [`H3Driver`] through the
/// [`DriverHooks`] trait.
pub struct ServerHooks {
    /// Built from the connection's `Http3Settings`; consulted for the request
    /// limit and used to register, cancel and await timeouts.
    settings_enforcer: Http3SettingsEnforcer,
    /// Number of requests whose `Headers` event has been sent to the
    /// controller so far. Starts at zero.
    requests: u64,

    /// Key of the `PostAccept` timeout registered in `conn_established`, if
    /// any. Taken and cancelled in `headers_received` once the request limit
    /// check has passed.
    post_accept_timeout: Option<TimeoutKey>,
}
141
142impl ServerHooks {
143    fn handle_request(
148        driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
149        headers: InboundHeaders,
150    ) -> H3ConnectionResult<()> {
151        let InboundHeaders {
152            stream_id,
153            headers,
154            has_body,
155        } = headers;
156
157        if driver.stream_map.contains_key(&stream_id) {
161            return Ok(());
162        }
163
164        let (mut stream_ctx, send, recv) =
165            StreamCtx::new(stream_id, STREAM_CAPACITY);
166
167        if let Some(flow_id) = datagram::extract_flow_id(stream_id, &headers) {
168            let _ = driver.get_or_insert_flow(flow_id)?;
169            stream_ctx.associated_dgram_flow_id = Some(flow_id);
170        }
171
172        let latest_priority_update: Option<RawPriorityValue> = driver
173            .conn_mut()?
174            .take_last_priority_update(stream_id)
175            .ok()
176            .map(|v| v.into());
177
178        qconn
182            .stream_priority(
183                stream_id,
184                PRE_HEADERS_BOOSTED_PRIORITY_URGENCY,
185                PRE_HEADERS_BOOSTED_PRIORITY_INCREMENTAL,
186            )
187            .ok();
188
189        let headers = IncomingH3Headers {
190            stream_id,
191            headers,
192            send,
193            recv,
194            read_fin: !has_body,
195            h3_audit_stats: Arc::clone(&stream_ctx.audit_stats),
196        };
197
198        driver
199            .waiting_streams
200            .push(stream_ctx.wait_for_recv(stream_id));
201        driver.insert_stream(stream_id, stream_ctx);
202
203        driver
204            .h3_event_sender
205            .send(ServerH3Event::Headers {
206                incoming_headers: headers,
207                priority: latest_priority_update,
208            })
209            .map_err(|_| H3ConnectionError::ControllerWentAway)?;
210        driver.hooks.requests += 1;
211
212        Ok(())
213    }
214}
215
216#[allow(private_interfaces)]
217impl DriverHooks for ServerHooks {
218    type Command = ServerH3Command;
219    type Event = ServerH3Event;
220
221    fn new(settings: &Http3Settings) -> Self {
222        Self {
223            settings_enforcer: settings.into(),
224            requests: 0,
225            post_accept_timeout: None,
226        }
227    }
228
229    fn conn_established(
230        driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
231        handshake_info: &HandshakeInfo,
232    ) -> H3ConnectionResult<()> {
233        assert!(
234            qconn.is_server(),
235            "ServerH3Driver requires a server-side QUIC connection"
236        );
237
238        if let Some(post_accept_timeout) =
239            driver.hooks.settings_enforcer.post_accept_timeout()
240        {
241            let remaining = post_accept_timeout
242                .checked_sub(handshake_info.elapsed())
243                .ok_or(H3ConnectionError::PostAcceptTimeout)?;
244
245            let key = driver
246                .hooks
247                .settings_enforcer
248                .add_timeout(Http3TimeoutType::PostAccept, remaining);
249            driver.hooks.post_accept_timeout = Some(key);
250        }
251
252        Ok(())
253    }
254
255    fn headers_received(
256        driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
257        headers: InboundHeaders,
258    ) -> H3ConnectionResult<()> {
259        if driver
260            .hooks
261            .settings_enforcer
262            .enforce_requests_limit(driver.hooks.requests)
263        {
264            let _ =
265                qconn.close(true, quiche::h3::WireErrorCode::NoError as u64, &[]);
266            return Ok(());
267        }
268
269        if let Some(timeout) = driver.hooks.post_accept_timeout.take() {
270            driver.hooks.settings_enforcer.cancel_timeout(timeout);
273        }
274
275        Self::handle_request(driver, qconn, headers)
276    }
277
278    fn conn_command(
279        driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
280        cmd: Self::Command,
281    ) -> H3ConnectionResult<()> {
282        let ServerH3Command::Core(cmd) = cmd;
283        driver.handle_core_command(qconn, cmd)
284    }
285
286    fn has_wait_action(driver: &mut H3Driver<Self>) -> bool {
287        driver.hooks.settings_enforcer.has_pending_timeouts()
288    }
289
290    async fn wait_for_action(
291        &mut self, qconn: &mut QuicheConnection,
292    ) -> H3ConnectionResult<()> {
293        self.settings_enforcer.enforce_timeouts(qconn).await?;
294        Err(H3ConnectionError::PostAcceptTimeout)
295    }
296}