1pub(crate) mod acceptor;
28pub(crate) mod connector;
29
30use super::connection::ConnectionMap;
31use super::connection::HandshakeInfo;
32use super::connection::Incoming;
33use super::connection::InitialQuicConnection;
34use super::connection::QuicConnectionParams;
35use super::io::worker::WriterConfig;
36use super::QuicheConnection;
37use crate::buf_factory::BufFactory;
38use crate::buf_factory::PooledBuf;
39use crate::metrics::labels;
40use crate::metrics::quic_expensive_metrics_ip_reduce;
41use crate::metrics::Metrics;
42use crate::settings::Config;
43
44use datagram_socket::DatagramSocketRecv;
45use datagram_socket::DatagramSocketSend;
46use foundations::telemetry::log;
47#[cfg(target_os = "linux")]
48use foundations::telemetry::metrics::Counter;
49#[cfg(target_os = "linux")]
50use foundations::telemetry::metrics::TimeHistogram;
51#[cfg(target_os = "linux")]
52use libc::sockaddr_in;
53#[cfg(target_os = "linux")]
54use libc::sockaddr_in6;
55use quiche::ConnectionId;
56use quiche::Header;
57use quiche::MAX_CONN_ID_LEN;
58use std::default::Default;
59use std::future::Future;
60use std::io;
61use std::net::SocketAddr;
62use std::pin::Pin;
63use std::sync::Arc;
64use std::task::ready;
65use std::task::Context;
66use std::task::Poll;
67use std::time::Instant;
68use std::time::SystemTime;
69use task_killswitch::spawn_with_killswitch;
70use tokio::sync::mpsc;
71
/// Receiving end of the router's accept queue.
///
/// Each item is either a newly created [`InitialQuicConnection`] or an
/// [`io::Error`] that the router forwarded to the accept queue.
type ConnStream<Tx, M> = mpsc::Receiver<io::Result<InitialQuicConnection<Tx, M>>>;
73
#[cfg(feature = "perf-quic-listener-metrics")]
mod listener_stage_timer {
    use foundations::telemetry::metrics::TimeHistogram;
    use std::time::Instant;

    /// Scope guard that reports the duration of a listener stage.
    ///
    /// When the guard is dropped, the time elapsed since `start` is observed
    /// into `time_hist`, expressed in nanoseconds.
    pub(super) struct ListenerStageTimer {
        start: Instant,
        time_hist: TimeHistogram,
    }

    impl ListenerStageTimer {
        /// Creates a guard that measures from `start` and reports into
        /// `time_hist`.
        pub(super) fn new(start: Instant, time_hist: TimeHistogram) -> Self {
            Self { start, time_hist }
        }
    }

    impl Drop for ListenerStageTimer {
        fn drop(&mut self) {
            let elapsed_nanos = self.start.elapsed().as_nanos();
            self.time_hist.observe(elapsed_nanos as u64);
        }
    }
}
99
/// Result of receiving a single datagram from the listening socket.
#[derive(Debug)]
struct PollRecvData {
    /// Number of bytes that were written into the receive buffer.
    bytes: usize,
    /// Address the datagram was received from.
    src_addr: SocketAddr,
    /// Original destination address reported by the kernel, set only when it
    /// differs from the router's configured local address.
    dst_addr_override: Option<SocketAddr>,
    /// Receive timestamp taken from the `ScmTimestampns` control message, if
    /// one was delivered.
    rx_time: Option<SystemTime>,
    /// Value of the `UdpGroSegments` control message, if one was delivered.
    gro: Option<u16>,
}
111
/// Requests to modify the router's connection map.
///
/// Every connection spawned by the router receives a clone of the command
/// sender; the router applies queued commands from its poll loop.
pub enum ConnectionMapCommand {
    /// Removes the mapping for an additional connection ID
    /// (`ConnectionMap::unmap_cid`).
    UnmapCid(ConnectionId<'static>),
    /// Removes the connection registered under the given source connection ID
    /// (`ConnectionMap::remove`).
    RemoveScid(ConnectionId<'static>),
}
118
/// Future that reads datagrams from a socket and routes them.
///
/// Packets whose destination connection ID is present in the connection map
/// are forwarded to that connection's channel. All other packets are handed to
/// the [`InitialPacketHandler`], which may produce a [`NewConnection`]; new
/// connections are registered in the map and pushed onto the accept queue.
pub struct InboundPacketRouter<Tx, Rx, M, I>
where
    Tx: DatagramSocketSend + Send + 'static,
    M: Metrics,
{
    /// Send half of the socket, shared with every spawned connection.
    socket_tx: Arc<Tx>,
    /// Receive half of the socket that the router polls for datagrams.
    socket_rx: Rx,
    /// Address used as the local address of incoming packets unless the kernel
    /// reports a different original destination.
    local_addr: SocketAddr,
    config: Config,
    /// Lookup table from connection IDs to per-connection packet senders.
    conns: ConnectionMap,
    /// Handler for packets that do not belong to a known connection.
    incoming_packet_handler: I,
    /// Cloned into every spawned connection. Set to `None` once the accept
    /// queue is closed, after which no new connections are created.
    shutdown_tx: Option<mpsc::Sender<()>>,
    /// The router future completes as soon as polling this receiver is ready.
    // NOTE(review): presumably that happens once every `shutdown_tx` clone has
    // been dropped - confirm against the connection implementation.
    shutdown_rx: mpsc::Receiver<()>,
    /// Sender side of the connection-map command queue, cloned into every
    /// spawned connection.
    conn_map_cmd_tx: mpsc::UnboundedSender<ConnectionMapCommand>,
    /// Receiver side of the connection-map command queue, drained whenever the
    /// socket has no more packets ready.
    conn_map_cmd_rx: mpsc::UnboundedReceiver<ConnectionMapCommand>,
    /// Accept queue; bounded by `config.listen_backlog`.
    accept_sink: mpsc::Sender<io::Result<InitialQuicConnection<Tx, M>>>,
    metrics: M,
    /// Last drop counter value seen in an `RxqOvfl` control message.
    #[cfg(target_os = "linux")]
    udp_drop_count: u32,

    /// Control message buffer reused across `recvmsg` calls.
    #[cfg(target_os = "linux")]
    reusable_cmsg_space: Vec<u8>,

    /// Buffer the next datagram is received into. It is swapped for a fresh
    /// buffer each time a packet has been received.
    current_buf: PooledBuf,

    /// Histogram for the `QueueWaiting` handshake stage; observes the time
    /// between the kernel receive timestamp and the moment the packet is read.
    #[cfg(target_os = "linux")]
    metrics_handshake_time_seconds: TimeHistogram,
    /// Counter that accumulates the increments of `udp_drop_count`.
    #[cfg(target_os = "linux")]
    metrics_udp_drop_count: Counter,
}
164
165impl<Tx, Rx, M, I> InboundPacketRouter<Tx, Rx, M, I>
166where
167 Tx: DatagramSocketSend + Send + 'static,
168 Rx: DatagramSocketRecv,
169 M: Metrics,
170 I: InitialPacketHandler,
171{
172 pub(crate) fn new(
173 config: Config, socket_tx: Arc<Tx>, socket_rx: Rx,
174 local_addr: SocketAddr, incoming_packet_handler: I, metrics: M,
175 ) -> (Self, ConnStream<Tx, M>) {
176 let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
177 let (accept_sink, accept_stream) = mpsc::channel(config.listen_backlog);
178 let (conn_map_cmd_tx, conn_map_cmd_rx) = mpsc::unbounded_channel();
179
180 (
181 InboundPacketRouter {
182 local_addr,
183 socket_tx,
184 socket_rx,
185 conns: ConnectionMap::default(),
186 incoming_packet_handler,
187 shutdown_tx: Some(shutdown_tx),
188 shutdown_rx,
189 conn_map_cmd_tx,
190 conn_map_cmd_rx,
191 accept_sink,
192 #[cfg(target_os = "linux")]
193 udp_drop_count: 0,
194 #[cfg(target_os = "linux")]
195 reusable_cmsg_space: nix::cmsg_space!(u32, nix::sys::time::TimeSpec, u16, sockaddr_in, sockaddr_in6),
201 config,
202
203 current_buf: BufFactory::get_max_buf(),
204
205 #[cfg(target_os = "linux")]
206 metrics_handshake_time_seconds: metrics.handshake_time_seconds(labels::QuicHandshakeStage::QueueWaiting),
207 #[cfg(target_os = "linux")]
208 metrics_udp_drop_count: metrics.udp_drop_count(),
209
210 metrics,
211
212 },
213 accept_stream,
214 )
215 }
216
    /// Routes a single received packet.
    ///
    /// The packet is forwarded to an existing connection when its destination
    /// connection ID is found in the connection map. Otherwise it is passed to
    /// the [`InitialPacketHandler`], and any [`NewConnection`] it returns is
    /// spawned via `spawn_new_connection`.
    ///
    /// # Errors
    ///
    /// Returns an error if the packet header cannot be parsed, if the initial
    /// packet handler fails, or if spawning the new connection fails.
    fn on_incoming(&mut self, mut incoming: Incoming) -> io::Result<()> {
        #[cfg(feature = "perf-quic-listener-metrics")]
        let start = std::time::Instant::now();

        // Fast path: try to route short-header packets without parsing the
        // full header. A failed `try_send` is deliberately ignored, which
        // drops the packet.
        if let Some(dcid) = short_dcid(&incoming.buf) {
            if let Some(ev_sender) = self.conns.get(&dcid) {
                let _ = ev_sender.try_send(incoming);
                return Ok(());
            }
        }

        // Parse failures caused by malformed input are mapped to the
        // `FailedToParse` label; any other quiche error is wrapped as-is.
        let hdr = Header::from_slice(&mut incoming.buf, MAX_CONN_ID_LEN)
            .map_err(|e| match e {
                quiche::Error::BufferTooShort | quiche::Error::InvalidPacket =>
                    labels::QuicInvalidInitialPacketError::FailedToParse.into(),
                e => io::Error::other(e),
            })?;

        // Second lookup, this time with the DCID from the parsed header.
        if let Some(ev_sender) = self.conns.get(&hdr.dcid) {
            let _ = ev_sender.try_send(incoming);
            return Ok(());
        }

        // From here on the packet does not belong to a known connection. The
        // timer reports into the `HandshakeProtocol` stage when it is dropped
        // at the end of this function.
        #[cfg(feature = "perf-quic-listener-metrics")]
        let _timer = listener_stage_timer::ListenerStageTimer::new(
            start,
            self.metrics.handshake_time_seconds(
                labels::QuicHandshakeStage::HandshakeProtocol,
            ),
        );

        // No new connections are accepted once shutdown has begun.
        if self.shutdown_tx.is_none() {
            return Ok(());
        }

        // Copy what is needed later, since `incoming` is moved into the
        // handler below.
        let local_addr = incoming.local_addr;
        let peer_addr = incoming.peer_addr;

        #[cfg(feature = "perf-quic-listener-metrics")]
        let init_rx_time = incoming.rx_time;

        let new_connection = self.incoming_packet_handler.handle_initials(
            incoming,
            hdr,
            self.config.as_mut(),
        )?;

        match new_connection {
            Some(new_connection) => self.spawn_new_connection(
                new_connection,
                local_addr,
                peer_addr,
                #[cfg(feature = "perf-quic-listener-metrics")]
                init_rx_time,
            ),
            None => Ok(()),
        }
    }
275
    /// Turns a [`NewConnection`] into an [`InitialQuicConnection`], registers
    /// it in the connection map and pushes it onto the accept queue.
    ///
    /// Does nothing (and returns `Ok`) when the router is shutting down.
    ///
    /// # Errors
    ///
    /// Returns the `AcceptQueueOverflow` error when no slot can be reserved in
    /// the accept queue; the connection is dropped in that case.
    fn spawn_new_connection(
        &mut self, new_connection: NewConnection, local_addr: SocketAddr,
        peer_addr: SocketAddr,
        #[cfg(feature = "perf-quic-listener-metrics")] init_rx_time: Option<
            SystemTime,
        >,
    ) -> io::Result<()> {
        let NewConnection {
            conn,
            pending_cid,
            handshake_start_time,
            initial_pkt,
        } = new_connection;

        // Without a shutdown sender the router is shutting down, so no new
        // connection is created.
        let Some(ref shutdown_tx) = self.shutdown_tx else {
            return Ok(());
        };
        // Reserve the accept-queue slot up front so that nothing is registered
        // for a connection that cannot be delivered.
        let Ok(send_permit) = self.accept_sink.try_reserve() else {
            return Err(
                labels::QuicInvalidInitialPacketError::AcceptQueueOverflow.into(),
            );
        };

        let scid = conn.source_id().into_owned();
        let writer_cfg = WriterConfig {
            peer_addr,
            pending_cid: pending_cid.clone(),
            with_gso: self.config.has_gso,
            pacing_offload: self.config.pacing_offload,
            // The pktinfo capability is selected by the address family of the
            // router's configured local address.
            with_pktinfo: if self.local_addr.is_ipv4() {
                self.config.has_ippktinfo
            } else {
                self.config.has_ipv6pktinfo
            },
        };

        let handshake_info = HandshakeInfo::new(
            handshake_start_time,
            self.config.handshake_timeout,
        );

        let conn = InitialQuicConnection::new(QuicConnectionParams {
            writer_cfg,
            initial_pkt,
            shutdown_tx: shutdown_tx.clone(),
            conn_map_cmd_tx: self.conn_map_cmd_tx.clone(),
            scid: scid.clone(),
            metrics: self.metrics.clone(),
            #[cfg(feature = "perf-quic-listener-metrics")]
            init_rx_time,
            handshake_info,
            quiche_conn: conn,
            socket: Arc::clone(&self.socket_tx),
            local_addr,
            peer_addr,
        });

        // The audit log takes wall-clock time, so the monotonic handshake
        // start is converted first.
        conn.audit_log_stats
            .set_transport_handshake_start(instant_to_system(
                handshake_start_time,
            ));

        self.conns.insert(scid, &conn);

        // Additionally route packets addressed to the pending connection ID,
        // if there is one, to this connection.
        if let Some(pending_cid) = pending_cid {
            self.conns.map_cid(pending_cid, &conn);
        }

        self.metrics.accepted_initial_packet_count().inc();
        if self.config.enable_expensive_packet_count_metrics {
            if let Some(peer_ip) =
                quic_expensive_metrics_ip_reduce(conn.peer_addr().ip())
            {
                self.metrics
                    .expensive_accepted_initial_packet_count(peer_ip)
                    .inc();
            }
        }

        send_permit.send(Ok(conn));
        Ok(())
    }
367}
368
369impl<Tx, Rx, M, I> InboundPacketRouter<Tx, Rx, M, I>
370where
371 Tx: DatagramSocketSend + Send + Sync + 'static,
372 Rx: DatagramSocketRecv,
373 M: Metrics,
374 I: InitialPacketHandler,
375{
376 fn poll_recv_from(
379 &mut self, cx: &mut Context<'_>,
380 ) -> Poll<io::Result<PollRecvData>> {
381 let mut buf = tokio::io::ReadBuf::new(&mut self.current_buf);
382 let addr = ready!(self.socket_rx.poll_recv_from(cx, &mut buf))?;
383 Poll::Ready(Ok(PollRecvData {
384 bytes: buf.filled().len(),
385 src_addr: addr,
386 rx_time: None,
387 gro: None,
388 dst_addr_override: None,
389 }))
390 }
391
392 fn poll_recv_and_rx_time(
393 &mut self, cx: &mut Context<'_>,
394 ) -> Poll<io::Result<PollRecvData>> {
395 #[cfg(not(target_os = "linux"))]
396 {
397 self.poll_recv_from(cx)
398 }
399
400 #[cfg(target_os = "linux")]
401 {
402 use nix::errno::Errno;
403 use nix::sys::socket::*;
404 use std::net::SocketAddrV4;
405 use std::net::SocketAddrV6;
406 use std::os::fd::AsRawFd;
407 use tokio::io::Interest;
408
409 let Some(udp_socket) = self.socket_rx.as_udp_socket() else {
410 return self.poll_recv_from(cx);
413 };
414
415 self.reusable_cmsg_space.clear();
416
417 loop {
418 let iov_s = &mut [io::IoSliceMut::new(&mut self.current_buf)];
419 match udp_socket.try_io(Interest::READABLE, || {
420 recvmsg::<SockaddrStorage>(
421 udp_socket.as_raw_fd(),
422 iov_s,
423 Some(&mut self.reusable_cmsg_space),
424 MsgFlags::empty(),
425 )
426 .map_err(|x| x.into())
427 }) {
428 Ok(r) => {
429 let bytes = r.bytes;
430
431 let address = match r.address {
432 Some(inner) => inner,
433 _ => return Poll::Ready(Err(Errno::EINVAL.into())),
434 };
435
436 let peer_addr = match address.family() {
437 Some(AddressFamily::Inet) => SocketAddrV4::from(
438 *address.as_sockaddr_in().unwrap(),
439 )
440 .into(),
441 Some(AddressFamily::Inet6) => SocketAddrV6::from(
442 *address.as_sockaddr_in6().unwrap(),
443 )
444 .into(),
445 _ => {
446 return Poll::Ready(Err(Errno::EINVAL.into()));
447 },
448 };
449
450 let mut rx_time = None;
451 let mut gro = None;
452 let mut dst_addr_override = None;
453
454 for cmsg in r.cmsgs() {
455 match cmsg {
456 ControlMessageOwned::RxqOvfl(c) => {
457 if c != self.udp_drop_count {
458 self.metrics_udp_drop_count.inc_by(
459 (c - self.udp_drop_count) as u64,
460 );
461 self.udp_drop_count = c;
462 }
463 },
464 ControlMessageOwned::ScmTimestampns(val) => {
465 rx_time = SystemTime::UNIX_EPOCH
466 .checked_add(val.into());
467 if let Some(delta) =
468 rx_time.and_then(|rx_time| {
469 rx_time.elapsed().ok()
470 })
471 {
472 self.metrics_handshake_time_seconds
473 .observe(delta.as_nanos() as u64);
474 }
475 },
476 ControlMessageOwned::UdpGroSegments(val) =>
477 gro = Some(val),
478 ControlMessageOwned::Ipv4OrigDstAddr(val) => {
479 let source_addr = std::net::Ipv4Addr::from(
480 u32::to_be(val.sin_addr.s_addr),
481 );
482 let source_port = u16::to_be(val.sin_port);
483
484 let parsed_addr =
485 SocketAddr::V4(SocketAddrV4::new(
486 source_addr,
487 source_port,
488 ));
489
490 dst_addr_override = resolve_dst_addr(
491 &self.local_addr,
492 &parsed_addr,
493 );
494 },
495 ControlMessageOwned::Ipv6OrigDstAddr(val) => {
496 let source_addr = std::net::Ipv6Addr::from(
501 val.sin6_addr.s6_addr,
502 );
503 let source_port = u16::to_be(val.sin6_port);
504 let source_flowinfo =
505 u32::to_be(val.sin6_flowinfo);
506 let source_scope =
507 u32::to_be(val.sin6_scope_id);
508
509 let parsed_addr =
510 SocketAddr::V6(SocketAddrV6::new(
511 source_addr,
512 source_port,
513 source_flowinfo,
514 source_scope,
515 ));
516
517 dst_addr_override = resolve_dst_addr(
518 &self.local_addr,
519 &parsed_addr,
520 );
521 },
522 ControlMessageOwned::Ipv4PacketInfo(_) |
523 ControlMessageOwned::Ipv6PacketInfo(_) => {
524 },
529 _ => {
530 return Poll::Ready(
531 Err(Errno::EINVAL.into()),
532 );
533 },
534 };
535 }
536
537 return Poll::Ready(Ok(PollRecvData {
538 bytes,
539 src_addr: peer_addr,
540 dst_addr_override,
541 rx_time,
542 gro,
543 }));
544 },
545 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
546 ready!(udp_socket.poll_recv_ready(cx))?
554 },
555 Err(e) => return Poll::Ready(Err(e)),
556 }
557 }
558 }
559 }
560
561 fn handle_conn_map_commands(&mut self) {
562 while let Ok(req) = self.conn_map_cmd_rx.try_recv() {
563 match req {
564 ConnectionMapCommand::UnmapCid(cid) => self.conns.unmap_cid(&cid),
565 ConnectionMapCommand::RemoveScid(scid) =>
566 self.conns.remove(&scid),
567 }
568 }
569 }
570}
571
572fn short_dcid(buf: &[u8]) -> Option<ConnectionId<'_>> {
574 let is_short_dcid = buf.first()? >> 7 == 0;
575
576 if is_short_dcid {
577 buf.get(1..1 + MAX_CONN_ID_LEN).map(ConnectionId::from_ref)
578 } else {
579 None
580 }
581}
582
/// Converts an [`Instant`] into a [`SystemTime`] using the current offset
/// between the monotonic and the wall clock.
///
/// Instants in the past are mapped before the current system time, instants
/// in the future after it.
fn instant_to_system(ts: Instant) -> SystemTime {
    let mono_now = Instant::now();
    let wall_now = SystemTime::now();

    match mono_now.checked_duration_since(ts) {
        // `ts` is now or in the past.
        Some(age) => wall_now - age,
        // `ts` lies in the future.
        None => {
            let ahead = ts.checked_duration_since(mono_now).expect("now < ts");
            wall_now + ahead
        },
    }
}
595
/// Decides whether the destination address reported by the kernel should
/// replace the configured local address.
///
/// Returns `Some(*parsed)` when `parsed` differs from `local`, and `None`
/// when both are equal.
#[cfg(target_os = "linux")]
fn resolve_dst_addr(
    local: &SocketAddr, parsed: &SocketAddr,
) -> Option<SocketAddr> {
    if local == parsed {
        None
    } else {
        Some(*parsed)
    }
}
615
616impl<Tx, Rx, M, I> Future for InboundPacketRouter<Tx, Rx, M, I>
617where
618 Tx: DatagramSocketSend + Send + Sync + 'static,
619 Rx: DatagramSocketRecv + Unpin,
620 M: Metrics,
621 I: InitialPacketHandler + Unpin,
622{
623 type Output = io::Result<()>;
624
    /// Drives the router: receives packets and routes them until the socket
    /// has nothing more to read.
    ///
    /// Resolves with `Ok(())` once `shutdown_rx` becomes ready. Receive errors
    /// are logged and do not terminate the future.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        let server_addr = self.local_addr;

        loop {
            // Give the handler a chance to run on every iteration. Its errors
            // are delivered from a separate task so that a full accept queue
            // cannot block packet routing.
            if let Err(error) = self.incoming_packet_handler.update(cx) {
                let sender = self.accept_sink.clone();
                spawn_with_killswitch(async move {
                    let _ = sender.send(Err(error)).await;
                });
            }

            match self.poll_recv_and_rx_time(cx) {
                Poll::Ready(Ok(PollRecvData {
                    bytes,
                    src_addr: peer_addr,
                    dst_addr_override,
                    rx_time,
                    gro,
                })) => {
                    // Take ownership of the filled buffer and install a fresh
                    // one for the next receive.
                    let mut buf = std::mem::replace(
                        &mut self.current_buf,
                        BufFactory::get_max_buf(),
                    );
                    buf.truncate(bytes);

                    // Prefer the destination address reported by the kernel
                    // over the configured local address.
                    let send_from = if let Some(dst_addr) = dst_addr_override {
                        log::trace!("overriding local address"; "actual_local" => format!("{:?}", dst_addr), "configured_local" => format!("{:?}", server_addr));
                        dst_addr
                    } else {
                        server_addr
                    };

                    let res = self.on_incoming(Incoming {
                        peer_addr,
                        local_addr: send_from,
                        buf,
                        rx_time,
                        gro,
                    });

                    if let Err(e) = res {
                        let err_type = initial_packet_error_type(&e);
                        self.metrics
                            .rejected_initial_packet_count(err_type.clone())
                            .inc();

                        if self.config.enable_expensive_packet_count_metrics {
                            if let Some(peer_ip) =
                                quic_expensive_metrics_ip_reduce(peer_addr.ip())
                            {
                                self.metrics
                                    .expensive_rejected_initial_packet_count(
                                        err_type.clone(),
                                        peer_ip,
                                    )
                                    .inc();
                            }
                        }

                        // Only unexpected errors are surfaced on the accept
                        // queue. `try_send` is used so that a full queue
                        // drops the error instead of blocking routing.
                        if matches!(
                            err_type,
                            labels::QuicInvalidInitialPacketError::Unexpected
                        ) {
                            let _ = self.accept_sink.try_send(Err(e));
                        }
                    }
                },

                Poll::Ready(Err(e)) => {
                    log::error!("Incoming packet router encountered recvmsg error"; "error" => e);
                    continue;
                },

                Poll::Pending => {
                    // Once the accept queue's receiver is gone, drop the
                    // router's own shutdown sender; from then on no new
                    // connections are created.
                    if self.shutdown_tx.is_some() && self.accept_sink.is_closed()
                    {
                        self.shutdown_tx = None;
                    }

                    // The router is done as soon as the shutdown channel
                    // reports readiness.
                    if self.shutdown_rx.poll_recv(cx).is_ready() {
                        return Poll::Ready(Ok(()));
                    }

                    // Apply pending connection-map changes while the socket
                    // is idle.
                    self.handle_conn_map_commands();

                    return Poll::Pending;
                },
            }
        }
    }
719}
720
721fn initial_packet_error_type(
726 e: &io::Error,
727) -> labels::QuicInvalidInitialPacketError {
728 Some(e)
729 .filter(|e| e.kind() == io::ErrorKind::Other)
730 .and_then(io::Error::get_ref)
731 .and_then(|e| e.downcast_ref())
732 .map_or(
733 labels::QuicInvalidInitialPacketError::Unexpected,
734 Clone::clone,
735 )
736}
737
/// Handler for packets that do not belong to a connection known to the
/// [`InboundPacketRouter`].
pub trait InitialPacketHandler {
    /// Called by the router at the start of every iteration of its poll loop,
    /// with the router task's context.
    ///
    /// An error returned here is forwarded to the accept queue. The default
    /// implementation does nothing.
    fn update(&mut self, _ctx: &mut Context<'_>) -> io::Result<()> {
        Ok(())
    }

    /// Processes a packet whose destination connection ID is not in the
    /// router's connection map.
    ///
    /// Returning `Ok(Some(_))` makes the router spawn the new connection;
    /// `Ok(None)` means there is nothing further to do for this packet. An
    /// error is counted by the router as a rejected initial packet.
    fn handle_initials(
        &mut self, incoming: Incoming, hdr: Header<'static>,
        quiche_config: &mut quiche::Config,
    ) -> io::Result<Option<NewConnection>>;
}
756
/// A connection created by an [`InitialPacketHandler`] that the router still
/// has to register and hand to the accept queue.
pub struct NewConnection {
    /// The underlying quiche connection; its source ID becomes the primary key
    /// in the router's connection map.
    conn: QuicheConnection,
    /// Optional additional connection ID that is also mapped to the new
    /// connection and passed on to its writer configuration.
    pending_cid: Option<ConnectionId<'static>>,
    /// Optional packet that is passed on to the new connection's parameters.
    initial_pkt: Option<Incoming>,
    /// Start of the handshake; used for the handshake timeout bookkeeping and
    /// for the audit log.
    handshake_start_time: Instant,
}
767
#[cfg(all(test, unix))]
mod tests {
    use super::acceptor::ConnectionAcceptor;
    use super::acceptor::ConnectionAcceptorConfig;
    use super::*;

    use crate::http3::settings::Http3Settings;
    use crate::metrics::DefaultMetrics;
    use crate::quic::connection::SimpleConnectionIdGenerator;
    use crate::settings::Config;
    use crate::settings::Hooks;
    use crate::settings::QuicSettings;
    use crate::settings::TlsCertificatePaths;
    use crate::socket::SocketCapabilities;
    use crate::ConnectionParams;
    use crate::ServerH3Driver;

    use datagram_socket::MAX_DATAGRAM_SIZE;
    use h3i::actions::h3::Action;
    use std::sync::Arc;
    use std::time::Duration;
    use tokio::net::UdpSocket;
    use tokio::time;

    const TEST_CERT_FILE: &str = concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/",
        "../quiche/examples/cert.crt"
    );
    const TEST_KEY_FILE: &str = concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/",
        "../quiche/examples/cert.key"
    );

    /// Connects a blocking h3i client to `host_port` whose only action is to
    /// close the connection. The outcome of the client run is ignored.
    fn test_connect(host_port: String) {
        let h3i_config = h3i::config::Config::new()
            .with_host_port("test.com".to_string())
            .with_idle_timeout(2000)
            .with_connect_to(host_port)
            .verify_peer(false)
            .build()
            .unwrap();

        let conn_close = h3i::quiche::ConnectionError {
            is_app: true,
            error_code: h3i::quiche::WireErrorCode::NoError as _,
            reason: Vec::new(),
        };
        let actions = vec![Action::ConnectionClose { error: conn_close }];

        let _ = h3i::client::sync_client::connect(h3i_config, actions, None);
    }

    /// Smoke test: a connection accepted through the router is eventually
    /// torn down. If it is not, the final `closed()` future never resolves
    /// and the test hangs.
    #[tokio::test]
    async fn test_timeout() {
        // A very short idle timeout lets the connection be reclaimed quickly.
        let quic_settings = QuicSettings {
            max_idle_timeout: Some(Duration::from_millis(1)),
            max_recv_udp_payload_size: MAX_DATAGRAM_SIZE,
            max_send_udp_payload_size: MAX_DATAGRAM_SIZE,
            ..Default::default()
        };

        let tls_cert_settings = TlsCertificatePaths {
            cert: TEST_CERT_FILE,
            private_key: TEST_KEY_FILE,
            kind: crate::settings::CertificateKind::X509,
        };

        let params = ConnectionParams::new_server(
            quic_settings,
            tls_cert_settings,
            Hooks::default(),
        );
        let config = Config::new(&params, SocketCapabilities::default()).unwrap();

        let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
        let local_addr = socket.local_addr().unwrap();
        let host_port = local_addr.to_string();
        let socket_tx = Arc::new(socket);
        let socket_rx = Arc::clone(&socket_tx);

        let acceptor = ConnectionAcceptor::new(
            ConnectionAcceptorConfig {
                disable_client_ip_validation: config.disable_client_ip_validation,
                qlog_dir: config.qlog_dir.clone(),
                keylog_file: config
                    .keylog_file
                    .as_ref()
                    .and_then(|f| f.try_clone().ok()),
                #[cfg(target_os = "linux")]
                with_pktinfo: false,
            },
            Arc::clone(&socket_tx),
            0,
            Default::default(),
            Box::new(SimpleConnectionIdGenerator),
            DefaultMetrics,
        );

        let (socket_driver, mut incoming) = InboundPacketRouter::new(
            config,
            socket_tx,
            socket_rx,
            local_addr,
            acceptor,
            DefaultMetrics,
        );
        tokio::spawn(socket_driver);

        // Run the blocking client on its own thread so it does not stall the
        // async runtime.
        std::thread::spawn(move || test_connect(host_port));

        time::pause();

        // Wait for the router to hand over the client's connection.
        let (h3_driver, _) = ServerH3Driver::new(Http3Settings::default());
        let conn = incoming.recv().await.unwrap().unwrap();
        let drop_check = conn.incoming_ev_sender.clone();
        let _conn = conn.start(h3_driver);

        // Jump far past the idle timeout, then let real time flow again.
        time::advance(Duration::new(30, 0)).await;
        time::resume();

        // Resolves once the receiving side of the connection's packet channel
        // is gone.
        drop_check.closed().await;
    }
}