tokio_quiche/quic/router/
acceptor.rs1use std::fs::File;
28use std::io;
29use std::sync::Arc;
30use std::time::Instant;
31
32use datagram_socket::DatagramSocketSend;
33use datagram_socket::DatagramSocketSendExt;
34use datagram_socket::MAX_DATAGRAM_SIZE;
35use quiche::ConnectionId;
36use quiche::Header;
37use quiche::Type as PacketType;
38use task_killswitch::spawn_with_killswitch;
39
40use crate::metrics::labels;
41use crate::metrics::Metrics;
42use crate::quic::addr_validation_token::AddrValidationTokenManager;
43use crate::quic::make_qlog_writer;
44use crate::quic::router::NewConnection;
45use crate::quic::Incoming;
46use crate::ConnectionIdGenerator;
47use crate::QuicResultExt;
48
49use super::InitialPacketHandler;
50
51pub(crate) struct ConnectionAcceptor<S, M> {
54 config: ConnectionAcceptorConfig,
55 socket: Arc<S>,
56 socket_cookie: u64,
57 token_manager: AddrValidationTokenManager,
58 cid_generator: Box<dyn ConnectionIdGenerator<'static>>,
59 metrics: M,
60}
61
/// Settings that decide how `ConnectionAcceptor` treats incoming connections.
pub(crate) struct ConnectionAcceptorConfig {
    /// When `true`, connections are accepted straight away, without the
    /// retry-token round trip.
    pub(crate) disable_client_ip_validation: bool,
    /// Directory handed to `make_qlog_writer` for each accepted connection;
    /// `None` turns qlog output off.
    pub(crate) qlog_dir: Option<String>,
    /// Keylog file that is cloned for each accepted connection; `None` turns
    /// keylogging off.
    pub(crate) keylog_file: Option<File>,
    /// Linux only: when `true`, handshake replies are sent with the local
    /// address the packet arrived on as their source address.
    #[cfg(target_os = "linux")]
    pub(crate) with_pktinfo: bool,
}
69
70impl<S, M> ConnectionAcceptor<S, M>
71where
72 S: DatagramSocketSend + Send + 'static,
73 M: Metrics,
74{
75 pub(crate) fn new(
76 config: ConnectionAcceptorConfig, socket: Arc<S>, socket_cookie: u64,
77 token_manager: AddrValidationTokenManager,
78 cid_generator: Box<dyn ConnectionIdGenerator<'static>>, metrics: M,
79 ) -> Self {
80 Self {
81 config,
82 socket,
83 socket_cookie,
84 token_manager,
85 cid_generator,
86 metrics,
87 }
88 }
89
90 fn accept_conn(
91 &mut self, incoming: Incoming, scid: ConnectionId<'static>,
92 original_dcid: Option<&ConnectionId>,
93 pending_cid: Option<ConnectionId<'static>>,
94 quiche_config: &mut quiche::Config,
95 ) -> io::Result<Option<NewConnection>> {
96 let handshake_start_time = Instant::now();
97
98 #[cfg(feature = "zero-copy")]
99 let mut conn = quiche::accept_with_buf_factory(
100 &scid,
101 original_dcid,
102 incoming.local_addr,
103 incoming.peer_addr,
104 quiche_config,
105 )
106 .into_io()?;
107
108 #[cfg(not(feature = "zero-copy"))]
109 let mut conn = quiche::accept(
110 &scid,
111 original_dcid,
112 incoming.local_addr,
113 incoming.peer_addr,
114 quiche_config,
115 )
116 .into_io()?;
117
118 if let Some(qlog_dir) = &self.config.qlog_dir {
119 let id = format!("{:?}", &scid);
120 if let Ok(writer) = make_qlog_writer(qlog_dir, &id) {
121 conn.set_qlog(
122 std::boxed::Box::new(writer),
123 "tokio-quiche qlog".to_string(),
124 format!("tokio-quiche qlog id={id}"),
125 );
126 }
127 }
128
129 if let Some(keylog_file) = &self.config.keylog_file {
130 if let Ok(keylog_clone) = keylog_file.try_clone() {
131 conn.set_keylog(Box::new(keylog_clone));
132 }
133 }
134
135 Ok(Some(NewConnection {
136 conn,
137 handshake_start_time,
138 pending_cid,
139 initial_pkt: Some(incoming),
140 }))
141 }
142
143 fn handshake_reply(
144 &self, incoming: Incoming,
145 writer: impl FnOnce(&mut [u8]) -> io::Result<usize>,
146 ) -> io::Result<Option<NewConnection>> {
147 let mut send_buf = [0u8; MAX_DATAGRAM_SIZE];
148 let written = writer(&mut send_buf)?;
149 let socket = Arc::clone(&self.socket);
150 #[cfg(target_os = "linux")]
151 let with_pktinfo = self.config.with_pktinfo;
152
153 spawn_with_killswitch(async move {
154 let send_buf = &send_buf[..written];
155 let to = incoming.peer_addr;
156
157 #[allow(unused_variables)]
158 let Some(udp) = socket.as_udp_socket() else {
159 let _ = socket.send_to(send_buf, to).await;
160 return;
161 };
162
163 #[cfg(target_os = "linux")]
164 {
165 let from = Some(incoming.local_addr).filter(|_| with_pktinfo);
166 let _ = crate::quic::io::gso::send_to(
167 udp, to, from, send_buf, 1, 1, None,
168 )
169 .await;
170 }
171
172 #[cfg(not(target_os = "linux"))]
173 let _ = socket.send_to(send_buf, to).await;
174 });
175
176 Ok(None)
177 }
178
179 fn stateless_retry(
180 &mut self, incoming: Incoming, hdr: Header,
181 ) -> io::Result<Option<NewConnection>> {
182 let scid = self.new_connection_id();
183
184 let token = self.token_manager.gen(&hdr.dcid, incoming.peer_addr);
185
186 self.handshake_reply(incoming, move |buf| {
187 quiche::retry(&hdr.scid, &hdr.dcid, &scid, &token, hdr.version, buf)
188 .into_io()
189 })
190 }
191
192 fn new_connection_id(&self) -> ConnectionId<'static> {
193 self.cid_generator.new_connection_id(self.socket_cookie)
194 }
195}
196
197impl<S, M> InitialPacketHandler for ConnectionAcceptor<S, M>
198where
199 S: DatagramSocketSend + Send + 'static,
200 M: Metrics,
201{
202 fn handle_initials(
203 &mut self, incoming: Incoming, hdr: quiche::Header<'static>,
204 quiche_config: &mut quiche::Config,
205 ) -> io::Result<Option<NewConnection>> {
206 if hdr.ty != PacketType::Initial {
207 if let Err(e) = self
210 .cid_generator
211 .verify_connection_id(self.socket_cookie, &hdr.dcid)
212 {
213 self.metrics.invalid_cid_packet_count(e).inc();
214 }
215
216 Err(labels::QuicInvalidInitialPacketError::WrongType(hdr.ty))?;
217 }
218
219 if !quiche::version_is_supported(hdr.version) {
220 return self.handshake_reply(incoming, |buf| {
221 quiche::negotiate_version(&hdr.scid, &hdr.dcid, buf).into_io()
222 });
223 }
224
225 let (scid, original_dcid, pending_cid) = if self
226 .config
227 .disable_client_ip_validation
228 {
229 (self.new_connection_id(), None, Some(hdr.dcid))
230 } else {
231 let token = hdr.token.as_ref().unwrap();
233
234 if token.is_empty() {
235 return self.stateless_retry(incoming, hdr);
236 }
237
238 (
239 hdr.dcid,
240 Some(
241 self.token_manager
242 .validate_and_extract_original_dcid(token, incoming.peer_addr)
243 .or(Err(
244 labels::QuicInvalidInitialPacketError::TokenValidationFail,
245 ))?,
246 ),
247 None,
248 )
249 };
250
251 self.accept_conn(
252 incoming,
253 scid,
254 original_dcid.as_ref(),
255 pending_cid,
256 quiche_config,
257 )
258 }
259}