tokio_quiche/http3/driver/
server.rs1use std::ops::Deref;
28use std::sync::Arc;
29
30use tokio::sync::mpsc;
31
32use super::datagram;
33use super::DriverHooks;
34use super::H3Command;
35use super::H3ConnectionError;
36use super::H3ConnectionResult;
37use super::H3Controller;
38use super::H3Driver;
39use super::H3Event;
40use super::InboundHeaders;
41use super::IncomingH3Headers;
42use super::StreamCtx;
43use super::STREAM_CAPACITY;
44use crate::http3::settings::Http3Settings;
45use crate::http3::settings::Http3SettingsEnforcer;
46use crate::http3::settings::Http3TimeoutType;
47use crate::http3::settings::TimeoutKey;
48use crate::quic::HandshakeInfo;
49use crate::quic::QuicCommand;
50use crate::quic::QuicheConnection;
51
52pub type ServerH3Driver = H3Driver<ServerHooks>;
56pub type ServerH3Controller = H3Controller<ServerHooks>;
59pub type ServerEventStream = mpsc::UnboundedReceiver<ServerH3Event>;
63
64#[derive(Clone, Debug)]
65pub struct RawPriorityValue(Vec<u8>);
66
67impl From<Vec<u8>> for RawPriorityValue {
68 fn from(value: Vec<u8>) -> Self {
69 RawPriorityValue(value)
70 }
71}
72
73impl Deref for RawPriorityValue {
74 type Target = [u8];
75
76 fn deref(&self) -> &Self::Target {
77 &self.0
78 }
79}
80
81#[derive(Clone, Debug)]
83pub struct IsInEarlyData(bool);
84
85impl IsInEarlyData {
86 fn new(is_in_early_data: bool) -> Self {
87 IsInEarlyData(is_in_early_data)
88 }
89}
90
91impl Deref for IsInEarlyData {
92 type Target = bool;
93
94 fn deref(&self) -> &Self::Target {
95 &self.0
96 }
97}
98
99#[derive(Debug)]
101pub enum ServerH3Event {
102 Core(H3Event),
103
104 Headers {
105 incoming_headers: IncomingH3Headers,
106 priority: Option<RawPriorityValue>,
108 is_in_early_data: IsInEarlyData,
109 },
110}
111
112impl From<H3Event> for ServerH3Event {
113 fn from(ev: H3Event) -> Self {
114 match ev {
115 H3Event::IncomingHeaders(incoming_headers) => {
116 Self::Headers {
122 incoming_headers,
123 priority: None,
124 is_in_early_data: IsInEarlyData::new(false),
125 }
126 },
127 _ => Self::Core(ev),
128 }
129 }
130}
131
132#[derive(Debug)]
134pub enum ServerH3Command {
135 Core(H3Command),
136}
137
138impl From<H3Command> for ServerH3Command {
139 fn from(cmd: H3Command) -> Self {
140 Self::Core(cmd)
141 }
142}
143
144impl From<QuicCommand> for ServerH3Command {
145 fn from(cmd: QuicCommand) -> Self {
146 Self::Core(H3Command::QuicCmd(cmd))
147 }
148}
149
150const PRE_HEADERS_BOOSTED_PRIORITY_URGENCY: u8 = 64;
154const PRE_HEADERS_BOOSTED_PRIORITY_INCREMENTAL: bool = false;
157
158pub struct ServerHooks {
159 settings_enforcer: Http3SettingsEnforcer,
161 requests: u64,
163
164 post_accept_timeout: Option<TimeoutKey>,
167}
168
169impl ServerHooks {
170 fn handle_request(
175 driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
176 headers: InboundHeaders,
177 ) -> H3ConnectionResult<()> {
178 let InboundHeaders {
179 stream_id,
180 headers,
181 has_body,
182 } = headers;
183
184 if driver.stream_map.contains_key(&stream_id) {
188 return Ok(());
189 }
190
191 let (mut stream_ctx, send, recv) =
192 StreamCtx::new(stream_id, STREAM_CAPACITY);
193
194 if let Some(flow_id) = datagram::extract_flow_id(stream_id, &headers) {
195 let _ = driver.get_or_insert_flow(flow_id)?;
196 stream_ctx.associated_dgram_flow_id = Some(flow_id);
197 }
198
199 let latest_priority_update: Option<RawPriorityValue> = driver
200 .conn_mut()?
201 .take_last_priority_update(stream_id)
202 .ok()
203 .map(|v| v.into());
204
205 qconn
209 .stream_priority(
210 stream_id,
211 PRE_HEADERS_BOOSTED_PRIORITY_URGENCY,
212 PRE_HEADERS_BOOSTED_PRIORITY_INCREMENTAL,
213 )
214 .ok();
215
216 let headers = IncomingH3Headers {
217 stream_id,
218 headers,
219 send,
220 recv,
221 read_fin: !has_body,
222 h3_audit_stats: Arc::clone(&stream_ctx.audit_stats),
223 };
224
225 driver
226 .waiting_streams
227 .push(stream_ctx.wait_for_recv(stream_id));
228 driver.insert_stream(stream_id, stream_ctx);
229
230 driver
231 .h3_event_sender
232 .send(ServerH3Event::Headers {
233 incoming_headers: headers,
234 priority: latest_priority_update,
235 is_in_early_data: IsInEarlyData::new(qconn.is_in_early_data()),
236 })
237 .map_err(|_| H3ConnectionError::ControllerWentAway)?;
238 driver.hooks.requests += 1;
239
240 Ok(())
241 }
242}
243
244#[allow(private_interfaces)]
245impl DriverHooks for ServerHooks {
246 type Command = ServerH3Command;
247 type Event = ServerH3Event;
248
249 fn new(settings: &Http3Settings) -> Self {
250 Self {
251 settings_enforcer: settings.into(),
252 requests: 0,
253 post_accept_timeout: None,
254 }
255 }
256
257 fn conn_established(
258 driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
259 handshake_info: &HandshakeInfo,
260 ) -> H3ConnectionResult<()> {
261 assert!(
262 qconn.is_server(),
263 "ServerH3Driver requires a server-side QUIC connection"
264 );
265
266 if let Some(post_accept_timeout) =
267 driver.hooks.settings_enforcer.post_accept_timeout()
268 {
269 let remaining = post_accept_timeout
270 .checked_sub(handshake_info.elapsed())
271 .ok_or(H3ConnectionError::PostAcceptTimeout)?;
272
273 let key = driver
274 .hooks
275 .settings_enforcer
276 .add_timeout(Http3TimeoutType::PostAccept, remaining);
277 driver.hooks.post_accept_timeout = Some(key);
278 }
279
280 Ok(())
281 }
282
283 fn headers_received(
284 driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
285 headers: InboundHeaders,
286 ) -> H3ConnectionResult<()> {
287 if driver
288 .hooks
289 .settings_enforcer
290 .enforce_requests_limit(driver.hooks.requests)
291 {
292 let _ =
293 qconn.close(true, quiche::h3::WireErrorCode::NoError as u64, &[]);
294 return Ok(());
295 }
296
297 if let Some(timeout) = driver.hooks.post_accept_timeout.take() {
298 driver.hooks.settings_enforcer.cancel_timeout(timeout);
301 }
302
303 Self::handle_request(driver, qconn, headers)
304 }
305
306 fn conn_command(
307 driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
308 cmd: Self::Command,
309 ) -> H3ConnectionResult<()> {
310 let ServerH3Command::Core(cmd) = cmd;
311 driver.handle_core_command(qconn, cmd)
312 }
313
314 fn has_wait_action(driver: &mut H3Driver<Self>) -> bool {
315 driver.hooks.settings_enforcer.has_pending_timeouts()
316 }
317
318 async fn wait_for_action(
319 &mut self, qconn: &mut QuicheConnection,
320 ) -> H3ConnectionResult<()> {
321 self.settings_enforcer.enforce_timeouts(qconn).await?;
322 Err(H3ConnectionError::PostAcceptTimeout)
323 }
324}