tokio_quiche/http3/driver/
server.rs1use std::sync::Arc;
28
29use tokio::sync::mpsc;
30
31use super::datagram;
32use super::DriverHooks;
33use super::H3Command;
34use super::H3ConnectionError;
35use super::H3ConnectionResult;
36use super::H3Controller;
37use super::H3Driver;
38use super::H3Event;
39use super::InboundHeaders;
40use super::IncomingH3Headers;
41use super::StreamCtx;
42use super::STREAM_CAPACITY;
43use crate::http3::settings::Http3Settings;
44use crate::http3::settings::Http3SettingsEnforcer;
45use crate::http3::settings::Http3TimeoutType;
46use crate::http3::settings::TimeoutKey;
47use crate::quic::HandshakeInfo;
48use crate::quic::QuicCommand;
49use crate::quic::QuicheConnection;
50
51pub type ServerH3Driver = H3Driver<ServerHooks>;
55pub type ServerH3Controller = H3Controller<ServerHooks>;
58pub type ServerEventStream = mpsc::UnboundedReceiver<ServerH3Event>;
62
63#[derive(Debug)]
65pub enum ServerH3Event {
66 Core(H3Event),
67}
68
69impl From<H3Event> for ServerH3Event {
70 fn from(ev: H3Event) -> Self {
71 Self::Core(ev)
72 }
73}
74
75#[derive(Debug)]
77pub enum ServerH3Command {
78 Core(H3Command),
79}
80
81impl From<H3Command> for ServerH3Command {
82 fn from(cmd: H3Command) -> Self {
83 Self::Core(cmd)
84 }
85}
86
87impl From<QuicCommand> for ServerH3Command {
88 fn from(cmd: QuicCommand) -> Self {
89 Self::Core(H3Command::QuicCmd(cmd))
90 }
91}
92
93pub struct ServerHooks {
94 settings_enforcer: Http3SettingsEnforcer,
96 requests: u64,
98
99 post_accept_timeout: Option<TimeoutKey>,
102}
103
104impl ServerHooks {
105 fn handle_request(
110 driver: &mut H3Driver<Self>, headers: InboundHeaders,
111 ) -> H3ConnectionResult<()> {
112 let InboundHeaders {
113 stream_id,
114 headers,
115 has_body,
116 } = headers;
117
118 if driver.stream_map.contains_key(&stream_id) {
122 return Ok(());
123 }
124
125 let (mut stream_ctx, send, recv) =
126 StreamCtx::new(stream_id, STREAM_CAPACITY);
127
128 if let Some(flow_id) = datagram::extract_flow_id(stream_id, &headers) {
129 let _ = driver.get_or_insert_flow(flow_id)?;
130 stream_ctx.associated_dgram_flow_id = Some(flow_id);
131 }
132
133 let headers = IncomingH3Headers {
134 stream_id,
135 headers,
136 send,
137 recv,
138 read_fin: !has_body,
139 h3_audit_stats: Arc::clone(&stream_ctx.audit_stats),
140 };
141
142 driver
143 .waiting_streams
144 .push(stream_ctx.wait_for_recv(stream_id));
145 driver.insert_stream(stream_id, stream_ctx);
146
147 driver
148 .h3_event_sender
149 .send(H3Event::IncomingHeaders(headers).into())
150 .map_err(|_| H3ConnectionError::ControllerWentAway)?;
151 driver.hooks.requests += 1;
152
153 Ok(())
154 }
155}
156
157#[allow(private_interfaces)]
158impl DriverHooks for ServerHooks {
159 type Command = ServerH3Command;
160 type Event = ServerH3Event;
161
162 fn new(settings: &Http3Settings) -> Self {
163 Self {
164 settings_enforcer: settings.into(),
165 requests: 0,
166 post_accept_timeout: None,
167 }
168 }
169
170 fn conn_established(
171 driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
172 handshake_info: &HandshakeInfo,
173 ) -> H3ConnectionResult<()> {
174 assert!(
175 qconn.is_server(),
176 "ServerH3Driver requires a server-side QUIC connection"
177 );
178
179 if let Some(post_accept_timeout) =
180 driver.hooks.settings_enforcer.post_accept_timeout()
181 {
182 let remaining = post_accept_timeout
183 .checked_sub(handshake_info.elapsed())
184 .ok_or(H3ConnectionError::PostAcceptTimeout)?;
185
186 let key = driver
187 .hooks
188 .settings_enforcer
189 .add_timeout(Http3TimeoutType::PostAccept, remaining);
190 driver.hooks.post_accept_timeout = Some(key);
191 }
192
193 Ok(())
194 }
195
196 fn headers_received(
197 driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
198 headers: InboundHeaders,
199 ) -> H3ConnectionResult<()> {
200 if driver
201 .hooks
202 .settings_enforcer
203 .enforce_requests_limit(driver.hooks.requests)
204 {
205 let _ =
206 qconn.close(true, quiche::h3::WireErrorCode::NoError as u64, &[]);
207 return Ok(());
208 }
209
210 if let Some(timeout) = driver.hooks.post_accept_timeout.take() {
211 driver.hooks.settings_enforcer.cancel_timeout(timeout);
214 }
215
216 Self::handle_request(driver, headers)
217 }
218
219 fn conn_command(
220 driver: &mut H3Driver<Self>, qconn: &mut QuicheConnection,
221 cmd: Self::Command,
222 ) -> H3ConnectionResult<()> {
223 let ServerH3Command::Core(cmd) = cmd;
224 driver.handle_core_command(qconn, cmd)
225 }
226
227 fn has_wait_action(driver: &mut H3Driver<Self>) -> bool {
228 driver.hooks.settings_enforcer.has_pending_timeouts()
229 }
230
231 async fn wait_for_action(
232 &mut self, qconn: &mut QuicheConnection,
233 ) -> H3ConnectionResult<()> {
234 self.settings_enforcer.enforce_timeouts(qconn).await?;
235 Err(H3ConnectionError::PostAcceptTimeout)
236 }
237}