tokio_quiche/http3/driver/
server.rs1use std::ops::Deref;
28use std::sync::Arc;
29
30use tokio::sync::mpsc;
31
32use super::datagram;
33use super::DriverHooks;
34use super::H3Command;
35use super::H3ConnectionError;
36use super::H3ConnectionResult;
37use super::H3Controller;
38use super::H3Driver;
39use super::H3Event;
40use super::InboundHeaders;
41use super::IncomingH3Headers;
42use super::StreamCtx;
43use super::STREAM_CAPACITY;
44use crate::http3::settings::Http3Settings;
45use crate::http3::settings::Http3SettingsEnforcer;
46use crate::http3::settings::Http3TimeoutType;
47use crate::http3::settings::TimeoutKey;
48use crate::quic::HandshakeInfo;
49use crate::quic::QuicCommand;
50use crate::quic::QuicheConnection;
51
/// An [`H3Driver`] specialized with the server-side [`ServerHooks`].
pub type ServerH3Driver = H3Driver<ServerHooks>;

/// The [`H3Controller`] counterpart of [`ServerH3Driver`], specialized with
/// the same [`ServerHooks`].
pub type ServerH3Controller = H3Controller<ServerHooks>;

/// Unbounded tokio mpsc receiver that yields [`ServerH3Event`]s.
pub type ServerEventStream = mpsc::UnboundedReceiver<ServerH3Event>;
63
/// Owned, uninterpreted bytes of a priority value.
///
/// In this file the value is produced from the result of
/// `take_last_priority_update` for a request stream and handed to the
/// application inside [`ServerH3Event::Headers`]. The bytes are neither parsed
/// nor validated here; read them through the `Deref<Target = [u8]>` impl.
#[derive(Clone, Debug)]
pub struct RawPriorityValue(Vec<u8>);

impl From<Vec<u8>> for RawPriorityValue {
    /// Takes ownership of `value` as-is, without copying or validation.
    fn from(value: Vec<u8>) -> Self {
        Self(value)
    }
}

impl Deref for RawPriorityValue {
    type Target = [u8];

    /// Borrows the wrapped bytes as a slice.
    fn deref(&self) -> &[u8] {
        self.0.as_slice()
    }
}
80
/// Boolean flag attached to [`ServerH3Event::Headers`].
///
/// In this file it is filled from `qconn.is_in_early_data()` at the moment the
/// headers event is built. Dereference it (`*flag`) to read the `bool`.
#[derive(Clone, Debug)]
pub struct IsInEarlyData(bool);

impl IsInEarlyData {
    /// Wraps the given flag. Private: only this module constructs values.
    fn new(is_in_early_data: bool) -> Self {
        Self(is_in_early_data)
    }
}

impl Deref for IsInEarlyData {
    type Target = bool;

    /// Borrows the wrapped flag.
    fn deref(&self) -> &bool {
        let Self(flag) = self;
        flag
    }
}
98
/// Events delivered to the application by a [`ServerH3Driver`]; this is the
/// `DriverHooks::Event` type of [`ServerHooks`].
#[derive(Debug)]
pub enum ServerH3Event {
    /// A core [`H3Event`], wrapped unchanged. The `From<H3Event>` impl uses
    /// this variant for everything except `H3Event::IncomingHeaders`.
    Core(H3Event),

    /// Headers of a new request, together with server-only metadata.
    Headers {
        /// The request headers and the stream handles that come with them.
        incoming_headers: IncomingH3Headers,
        /// Result of `take_last_priority_update` for the stream at the time
        /// the request was registered; `None` when that call returned an
        /// error or when the event was built through `From<H3Event>`.
        priority: Option<RawPriorityValue>,
        /// Value of `is_in_early_data()` on the QUIC connection when the event
        /// was built; `false` when the event was built through
        /// `From<H3Event>`.
        is_in_early_data: IsInEarlyData,
    },
}
111
112impl From<H3Event> for ServerH3Event {
113 fn from(ev: H3Event) -> Self {
114 match ev {
115 H3Event::IncomingHeaders(incoming_headers) => {
116 Self::Headers {
122 incoming_headers,
123 priority: None,
124 is_in_early_data: IsInEarlyData::new(false),
125 }
126 },
127 _ => Self::Core(ev),
128 }
129 }
130}
131
/// Commands accepted by a [`ServerH3Driver`]; this is the
/// `DriverHooks::Command` type of [`ServerHooks`].
#[derive(Debug)]
pub enum ServerH3Command {
    /// A core [`H3Command`], forwarded to `H3Driver::handle_core_command`.
    Core(H3Command),
}
137
138impl From<H3Command> for ServerH3Command {
139 fn from(cmd: H3Command) -> Self {
140 Self::Core(cmd)
141 }
142}
143
144impl From<QuicCommand> for ServerH3Command {
145 fn from(cmd: QuicCommand) -> Self {
146 Self::Core(H3Command::QuicCmd(cmd))
147 }
148}
149
/// Urgency passed to `stream_priority` for a request stream in
/// `ServerHooks::handle_request`, before the headers event is sent to the
/// controller.
///
/// NOTE(review): per the quiche `stream_priority` docs a lower urgency means a
/// higher priority, so this presumably boosts the stream until a later
/// priority is applied elsewhere — confirm against the response write path.
const PRE_HEADERS_BOOSTED_PRIORITY_URGENCY: u8 = 64;

/// `incremental` flag passed alongside
/// [`PRE_HEADERS_BOOSTED_PRIORITY_URGENCY`] in the same `stream_priority`
/// call.
const PRE_HEADERS_BOOSTED_PRIORITY_INCREMENTAL: bool = false;
157
/// Server-side [`DriverHooks`] state carried by a [`ServerH3Driver`].
pub struct ServerHooks {
    /// Applies the request limit and the timeouts derived from the
    /// [`Http3Settings`] this value was created with.
    settings_enforcer: Http3SettingsEnforcer,
    /// Number of requests forwarded to the controller so far. Incremented
    /// after each headers event is sent successfully and checked against the
    /// request limit on every incoming HEADERS.
    requests: u64,

    /// Key of the pending post-accept timeout, if one was registered in
    /// `conn_established`. Taken and cancelled when HEADERS are received.
    post_accept_timeout: Option<TimeoutKey>,
    /// Copy of `Http3Settings::enable_extended_connect`; gates the datagram
    /// flow lookup when a request is registered.
    extended_connect_enabled: bool,
}
171
172impl ServerHooks {
173 fn handle_request(
178 driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
179 headers: InboundHeaders,
180 ) -> H3ConnectionResult<()> {
181 let InboundHeaders {
182 stream_id,
183 headers,
184 has_body,
185 } = headers;
186
187 if driver.stream_map.contains_key(&stream_id) {
191 return Ok(());
192 }
193
194 let (mut stream_ctx, send, recv) =
195 StreamCtx::new(stream_id, STREAM_CAPACITY);
196
197 if driver.hooks.extended_connect_enabled() {
204 if let Some(flow_id) =
205 datagram::extract_quarter_stream_id(stream_id, &headers)
206 {
207 let _ = driver.get_or_insert_flow(flow_id)?;
208 stream_ctx.associated_dgram_flow_id = Some(flow_id);
209 }
210 }
211
212 let latest_priority_update: Option<RawPriorityValue> = driver
213 .conn_mut()?
214 .take_last_priority_update(stream_id)
215 .ok()
216 .map(|v| v.into());
217
218 qconn
222 .stream_priority(
223 stream_id,
224 PRE_HEADERS_BOOSTED_PRIORITY_URGENCY,
225 PRE_HEADERS_BOOSTED_PRIORITY_INCREMENTAL,
226 )
227 .ok();
228
229 let headers = IncomingH3Headers {
230 stream_id,
231 headers,
232 send,
233 recv,
234 read_fin: !has_body,
235 h3_audit_stats: Arc::clone(&stream_ctx.audit_stats),
236 };
237
238 driver
239 .waiting_streams
240 .push(stream_ctx.wait_for_recv(stream_id));
241 driver.insert_stream(stream_id, stream_ctx);
242
243 driver
244 .h3_event_sender
245 .send(ServerH3Event::Headers {
246 incoming_headers: headers,
247 priority: latest_priority_update,
248 is_in_early_data: IsInEarlyData::new(qconn.is_in_early_data()),
249 })
250 .map_err(|_| H3ConnectionError::ControllerWentAway)?;
251 driver.hooks.requests += 1;
252
253 Ok(())
254 }
255}
256
257#[allow(private_interfaces)]
258impl DriverHooks for ServerHooks {
259 type Command = ServerH3Command;
260 type Event = ServerH3Event;
261
262 fn new(settings: &Http3Settings) -> Self {
263 Self {
264 settings_enforcer: settings.into(),
265 requests: 0,
266 post_accept_timeout: None,
267 extended_connect_enabled: settings.enable_extended_connect,
268 }
269 }
270
271 fn extended_connect_enabled(&self) -> bool {
272 self.extended_connect_enabled
273 }
274
275 fn conn_established(
276 driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
277 handshake_info: &HandshakeInfo,
278 ) -> H3ConnectionResult<()> {
279 assert!(
280 qconn.is_server(),
281 "ServerH3Driver requires a server-side QUIC connection"
282 );
283
284 if let Some(post_accept_timeout) =
285 driver.hooks.settings_enforcer.post_accept_timeout()
286 {
287 let remaining = post_accept_timeout
288 .checked_sub(handshake_info.elapsed())
289 .ok_or(H3ConnectionError::PostAcceptTimeout)?;
290
291 let key = driver
292 .hooks
293 .settings_enforcer
294 .add_timeout(Http3TimeoutType::PostAccept, remaining);
295 driver.hooks.post_accept_timeout = Some(key);
296 }
297
298 Ok(())
299 }
300
301 fn headers_received(
302 driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
303 headers: InboundHeaders,
304 ) -> H3ConnectionResult<()> {
305 if driver
306 .hooks
307 .settings_enforcer
308 .enforce_requests_limit(driver.hooks.requests)
309 {
310 let _ =
311 qconn.close(true, quiche::h3::WireErrorCode::NoError as u64, &[]);
312 return Ok(());
313 }
314
315 if let Some(timeout) = driver.hooks.post_accept_timeout.take() {
316 driver.hooks.settings_enforcer.cancel_timeout(timeout);
319 }
320
321 Self::handle_request(driver, qconn, headers)
322 }
323
324 fn conn_command(
325 driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
326 cmd: Self::Command,
327 ) -> H3ConnectionResult<()> {
328 let ServerH3Command::Core(cmd) = cmd;
329 driver.handle_core_command(qconn, cmd)
330 }
331
332 fn has_wait_action(driver: &mut H3Driver<Self>) -> bool {
333 driver.hooks.settings_enforcer.has_pending_timeouts()
334 }
335
336 async fn wait_for_action(
337 &mut self, qconn: &mut QuicheConnection,
338 ) -> H3ConnectionResult<()> {
339 self.settings_enforcer.enforce_timeouts(qconn).await?;
340 Err(H3ConnectionError::PostAcceptTimeout)
341 }
342}