tokio_quiche/http3/driver/
server.rs1use std::ops::Deref;
28use std::sync::Arc;
29
30use tokio::sync::mpsc;
31
32use super::datagram;
33use super::DriverHooks;
34use super::H3Command;
35use super::H3ConnectionError;
36use super::H3ConnectionResult;
37use super::H3Controller;
38use super::H3Driver;
39use super::H3Event;
40use super::InboundHeaders;
41use super::IncomingH3Headers;
42use super::StreamCtx;
43use super::STREAM_CAPACITY;
44use crate::http3::settings::Http3Settings;
45use crate::http3::settings::Http3SettingsEnforcer;
46use crate::http3::settings::Http3TimeoutType;
47use crate::http3::settings::TimeoutKey;
48use crate::quic::HandshakeInfo;
49use crate::quic::QuicCommand;
50use crate::quic::QuicheConnection;
51
52pub type ServerH3Driver = H3Driver<ServerHooks>;
56pub type ServerH3Controller = H3Controller<ServerHooks>;
59pub type ServerEventStream = mpsc::UnboundedReceiver<ServerH3Event>;
63
/// Raw bytes of a priority value, stored exactly as supplied.
///
/// The wrapper performs no parsing or validation of the bytes. It
/// dereferences to `[u8]` so the content can be inspected directly, and two
/// values compare equal when their underlying bytes are equal.
///
/// NOTE(review): presumably this carries the field value of the most recent
/// priority update seen for a request stream — confirm against the producer.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RawPriorityValue(Vec<u8>);

impl From<Vec<u8>> for RawPriorityValue {
    /// Wraps `value` without copying or inspecting it.
    fn from(value: Vec<u8>) -> Self {
        Self(value)
    }
}

impl Deref for RawPriorityValue {
    type Target = [u8];

    /// Borrows the wrapped bytes as a slice.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
80
81#[derive(Debug)]
83pub enum ServerH3Event {
84 Core(H3Event),
85
86 Headers {
87 incoming_headers: IncomingH3Headers,
88 priority: Option<RawPriorityValue>,
90 },
91}
92
93impl From<H3Event> for ServerH3Event {
94 fn from(ev: H3Event) -> Self {
95 match ev {
96 H3Event::IncomingHeaders(incoming_headers) => Self::Headers {
97 incoming_headers,
98 priority: None,
99 },
100 _ => Self::Core(ev),
101 }
102 }
103}
104
105#[derive(Debug)]
107pub enum ServerH3Command {
108 Core(H3Command),
109}
110
111impl From<H3Command> for ServerH3Command {
112 fn from(cmd: H3Command) -> Self {
113 Self::Core(cmd)
114 }
115}
116
117impl From<QuicCommand> for ServerH3Command {
118 fn from(cmd: QuicCommand) -> Self {
119 Self::Core(H3Command::QuicCmd(cmd))
120 }
121}
122
/// Urgency passed to `stream_priority` for a request stream when its headers
/// are first handled by `ServerHooks::handle_request`.
///
/// NOTE(review): the name suggests a temporary boost that lasts until the
/// response headers set the final priority — confirm against the write path.
const PRE_HEADERS_BOOSTED_PRIORITY_URGENCY: u8 = 64;

/// Incremental flag passed to `stream_priority` alongside
/// [`PRE_HEADERS_BOOSTED_PRIORITY_URGENCY`].
const PRE_HEADERS_BOOSTED_PRIORITY_INCREMENTAL: bool = false;
130
131pub struct ServerHooks {
132 settings_enforcer: Http3SettingsEnforcer,
134 requests: u64,
136
137 post_accept_timeout: Option<TimeoutKey>,
140}
141
142impl ServerHooks {
143 fn handle_request(
148 driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
149 headers: InboundHeaders,
150 ) -> H3ConnectionResult<()> {
151 let InboundHeaders {
152 stream_id,
153 headers,
154 has_body,
155 } = headers;
156
157 if driver.stream_map.contains_key(&stream_id) {
161 return Ok(());
162 }
163
164 let (mut stream_ctx, send, recv) =
165 StreamCtx::new(stream_id, STREAM_CAPACITY);
166
167 if let Some(flow_id) = datagram::extract_flow_id(stream_id, &headers) {
168 let _ = driver.get_or_insert_flow(flow_id)?;
169 stream_ctx.associated_dgram_flow_id = Some(flow_id);
170 }
171
172 let latest_priority_update: Option<RawPriorityValue> = driver
173 .conn_mut()?
174 .take_last_priority_update(stream_id)
175 .ok()
176 .map(|v| v.into());
177
178 qconn
182 .stream_priority(
183 stream_id,
184 PRE_HEADERS_BOOSTED_PRIORITY_URGENCY,
185 PRE_HEADERS_BOOSTED_PRIORITY_INCREMENTAL,
186 )
187 .ok();
188
189 let headers = IncomingH3Headers {
190 stream_id,
191 headers,
192 send,
193 recv,
194 read_fin: !has_body,
195 h3_audit_stats: Arc::clone(&stream_ctx.audit_stats),
196 };
197
198 driver
199 .waiting_streams
200 .push(stream_ctx.wait_for_recv(stream_id));
201 driver.insert_stream(stream_id, stream_ctx);
202
203 driver
204 .h3_event_sender
205 .send(ServerH3Event::Headers {
206 incoming_headers: headers,
207 priority: latest_priority_update,
208 })
209 .map_err(|_| H3ConnectionError::ControllerWentAway)?;
210 driver.hooks.requests += 1;
211
212 Ok(())
213 }
214}
215
216#[allow(private_interfaces)]
217impl DriverHooks for ServerHooks {
218 type Command = ServerH3Command;
219 type Event = ServerH3Event;
220
221 fn new(settings: &Http3Settings) -> Self {
222 Self {
223 settings_enforcer: settings.into(),
224 requests: 0,
225 post_accept_timeout: None,
226 }
227 }
228
229 fn conn_established(
230 driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
231 handshake_info: &HandshakeInfo,
232 ) -> H3ConnectionResult<()> {
233 assert!(
234 qconn.is_server(),
235 "ServerH3Driver requires a server-side QUIC connection"
236 );
237
238 if let Some(post_accept_timeout) =
239 driver.hooks.settings_enforcer.post_accept_timeout()
240 {
241 let remaining = post_accept_timeout
242 .checked_sub(handshake_info.elapsed())
243 .ok_or(H3ConnectionError::PostAcceptTimeout)?;
244
245 let key = driver
246 .hooks
247 .settings_enforcer
248 .add_timeout(Http3TimeoutType::PostAccept, remaining);
249 driver.hooks.post_accept_timeout = Some(key);
250 }
251
252 Ok(())
253 }
254
255 fn headers_received(
256 driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
257 headers: InboundHeaders,
258 ) -> H3ConnectionResult<()> {
259 if driver
260 .hooks
261 .settings_enforcer
262 .enforce_requests_limit(driver.hooks.requests)
263 {
264 let _ =
265 qconn.close(true, quiche::h3::WireErrorCode::NoError as u64, &[]);
266 return Ok(());
267 }
268
269 if let Some(timeout) = driver.hooks.post_accept_timeout.take() {
270 driver.hooks.settings_enforcer.cancel_timeout(timeout);
273 }
274
275 Self::handle_request(driver, qconn, headers)
276 }
277
278 fn conn_command(
279 driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
280 cmd: Self::Command,
281 ) -> H3ConnectionResult<()> {
282 let ServerH3Command::Core(cmd) = cmd;
283 driver.handle_core_command(qconn, cmd)
284 }
285
286 fn has_wait_action(driver: &mut H3Driver<Self>) -> bool {
287 driver.hooks.settings_enforcer.has_pending_timeouts()
288 }
289
290 async fn wait_for_action(
291 &mut self, qconn: &mut QuicheConnection,
292 ) -> H3ConnectionResult<()> {
293 self.settings_enforcer.enforce_timeouts(qconn).await?;
294 Err(H3ConnectionError::PostAcceptTimeout)
295 }
296}