tokio_quiche/http3/driver/
server.rs1use std::ops::Deref;
28use std::sync::Arc;
29
30use tokio::sync::mpsc;
31
32use super::datagram;
33use super::DriverHooks;
34use super::H3Command;
35use super::H3ConnectionError;
36use super::H3ConnectionResult;
37use super::H3Controller;
38use super::H3Driver;
39use super::H3Event;
40use super::InboundHeaders;
41use super::IncomingH3Headers;
42use super::StreamCtx;
43use super::STREAM_CAPACITY;
44use crate::http3::settings::Http3Settings;
45use crate::http3::settings::Http3SettingsEnforcer;
46use crate::http3::settings::Http3TimeoutType;
47use crate::http3::settings::TimeoutKey;
48use crate::quic::HandshakeInfo;
49use crate::quic::QuicCommand;
50use crate::quic::QuicheConnection;
51
52pub type ServerH3Driver = H3Driver<ServerHooks>;
56pub type ServerH3Controller = H3Controller<ServerHooks>;
59pub type ServerEventStream = mpsc::UnboundedReceiver<ServerH3Event>;
63
64#[derive(Clone, Debug)]
65pub struct RawPriorityValue(Vec<u8>);
66
67impl From<Vec<u8>> for RawPriorityValue {
68 fn from(value: Vec<u8>) -> Self {
69 RawPriorityValue(value)
70 }
71}
72
73impl Deref for RawPriorityValue {
74 type Target = [u8];
75
76 fn deref(&self) -> &Self::Target {
77 &self.0
78 }
79}
80
81#[derive(Clone, Debug)]
83pub struct IsInEarlyData(bool);
84
85impl IsInEarlyData {
86 fn new(is_in_early_data: bool) -> Self {
87 IsInEarlyData(is_in_early_data)
88 }
89}
90
91impl Deref for IsInEarlyData {
92 type Target = bool;
93
94 fn deref(&self) -> &Self::Target {
95 &self.0
96 }
97}
98
99#[derive(Debug)]
101pub enum ServerH3Event {
102 Core(H3Event),
103
104 Headers {
105 incoming_headers: IncomingH3Headers,
106 priority: Option<RawPriorityValue>,
108 is_in_early_data: IsInEarlyData,
109 },
110}
111
112impl From<H3Event> for ServerH3Event {
113 fn from(ev: H3Event) -> Self {
114 match ev {
115 H3Event::IncomingHeaders(incoming_headers) => {
116 Self::Headers {
122 incoming_headers,
123 priority: None,
124 is_in_early_data: IsInEarlyData::new(false),
125 }
126 },
127 _ => Self::Core(ev),
128 }
129 }
130}
131
132#[derive(Debug)]
134pub enum ServerH3Command {
135 Core(H3Command),
136}
137
138impl From<H3Command> for ServerH3Command {
139 fn from(cmd: H3Command) -> Self {
140 Self::Core(cmd)
141 }
142}
143
144impl From<QuicCommand> for ServerH3Command {
145 fn from(cmd: QuicCommand) -> Self {
146 Self::Core(H3Command::QuicCmd(cmd))
147 }
148}
149
150const PRE_HEADERS_BOOSTED_PRIORITY_URGENCY: u8 = 64;
154const PRE_HEADERS_BOOSTED_PRIORITY_INCREMENTAL: bool = false;
157
158pub struct ServerHooks {
159 settings_enforcer: Http3SettingsEnforcer,
161 requests: u64,
163
164 post_accept_timeout: Option<TimeoutKey>,
167}
168
169impl ServerHooks {
170 fn handle_request(
175 driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
176 headers: InboundHeaders,
177 ) -> H3ConnectionResult<()> {
178 let InboundHeaders {
179 stream_id,
180 headers,
181 has_body,
182 } = headers;
183
184 if driver.stream_map.contains_key(&stream_id) {
188 return Ok(());
189 }
190
191 let (mut stream_ctx, send, recv) =
192 StreamCtx::new(stream_id, STREAM_CAPACITY);
193
194 if let Some(flow_id) =
195 datagram::extract_quarter_stream_id(stream_id, &headers)
196 {
197 let _ = driver.get_or_insert_flow(flow_id)?;
198 stream_ctx.associated_dgram_flow_id = Some(flow_id);
199 }
200
201 let latest_priority_update: Option<RawPriorityValue> = driver
202 .conn_mut()?
203 .take_last_priority_update(stream_id)
204 .ok()
205 .map(|v| v.into());
206
207 qconn
211 .stream_priority(
212 stream_id,
213 PRE_HEADERS_BOOSTED_PRIORITY_URGENCY,
214 PRE_HEADERS_BOOSTED_PRIORITY_INCREMENTAL,
215 )
216 .ok();
217
218 let headers = IncomingH3Headers {
219 stream_id,
220 headers,
221 send,
222 recv,
223 read_fin: !has_body,
224 h3_audit_stats: Arc::clone(&stream_ctx.audit_stats),
225 };
226
227 driver
228 .waiting_streams
229 .push(stream_ctx.wait_for_recv(stream_id));
230 driver.insert_stream(stream_id, stream_ctx);
231
232 driver
233 .h3_event_sender
234 .send(ServerH3Event::Headers {
235 incoming_headers: headers,
236 priority: latest_priority_update,
237 is_in_early_data: IsInEarlyData::new(qconn.is_in_early_data()),
238 })
239 .map_err(|_| H3ConnectionError::ControllerWentAway)?;
240 driver.hooks.requests += 1;
241
242 Ok(())
243 }
244}
245
246#[allow(private_interfaces)]
247impl DriverHooks for ServerHooks {
248 type Command = ServerH3Command;
249 type Event = ServerH3Event;
250
251 fn new(settings: &Http3Settings) -> Self {
252 Self {
253 settings_enforcer: settings.into(),
254 requests: 0,
255 post_accept_timeout: None,
256 }
257 }
258
259 fn conn_established(
260 driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
261 handshake_info: &HandshakeInfo,
262 ) -> H3ConnectionResult<()> {
263 assert!(
264 qconn.is_server(),
265 "ServerH3Driver requires a server-side QUIC connection"
266 );
267
268 if let Some(post_accept_timeout) =
269 driver.hooks.settings_enforcer.post_accept_timeout()
270 {
271 let remaining = post_accept_timeout
272 .checked_sub(handshake_info.elapsed())
273 .ok_or(H3ConnectionError::PostAcceptTimeout)?;
274
275 let key = driver
276 .hooks
277 .settings_enforcer
278 .add_timeout(Http3TimeoutType::PostAccept, remaining);
279 driver.hooks.post_accept_timeout = Some(key);
280 }
281
282 Ok(())
283 }
284
285 fn headers_received(
286 driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
287 headers: InboundHeaders,
288 ) -> H3ConnectionResult<()> {
289 if driver
290 .hooks
291 .settings_enforcer
292 .enforce_requests_limit(driver.hooks.requests)
293 {
294 let _ =
295 qconn.close(true, quiche::h3::WireErrorCode::NoError as u64, &[]);
296 return Ok(());
297 }
298
299 if let Some(timeout) = driver.hooks.post_accept_timeout.take() {
300 driver.hooks.settings_enforcer.cancel_timeout(timeout);
303 }
304
305 Self::handle_request(driver, qconn, headers)
306 }
307
308 fn conn_command(
309 driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
310 cmd: Self::Command,
311 ) -> H3ConnectionResult<()> {
312 let ServerH3Command::Core(cmd) = cmd;
313 driver.handle_core_command(qconn, cmd)
314 }
315
316 fn has_wait_action(driver: &mut H3Driver<Self>) -> bool {
317 driver.hooks.settings_enforcer.has_pending_timeouts()
318 }
319
320 async fn wait_for_action(
321 &mut self, qconn: &mut QuicheConnection,
322 ) -> H3ConnectionResult<()> {
323 self.settings_enforcer.enforce_timeouts(qconn).await?;
324 Err(H3ConnectionError::PostAcceptTimeout)
325 }
326}