1use std::net::SocketAddr;
28use std::ops::ControlFlow;
29use std::sync::Arc;
30use std::task::Poll;
31use std::time::Duration;
32use std::time::Instant;
33#[cfg(feature = "perf-quic-listener-metrics")]
34use std::time::SystemTime;
35
36use super::connection_stage::Close;
37use super::connection_stage::ConnectionStage;
38use super::connection_stage::ConnectionStageContext;
39use super::connection_stage::Handshake;
40use super::connection_stage::RunningApplication;
41use super::gso::*;
42use super::utilization_estimator::BandwidthReporter;
43
44use crate::metrics::labels;
45use crate::metrics::Metrics;
46use crate::quic::connection::ApplicationOverQuic;
47use crate::quic::connection::HandshakeError;
48use crate::quic::connection::Incoming;
49use crate::quic::connection::QuicConnectionStats;
50use crate::quic::router::ConnectionMapCommand;
51use crate::quic::QuicheConnection;
52use crate::QuicResult;
53
54use boring::ssl::SslRef;
55use datagram_socket::DatagramSocketSend;
56use datagram_socket::DatagramSocketSendExt;
57use datagram_socket::MaybeConnectedSocket;
58use datagram_socket::QuicAuditStats;
59use foundations::telemetry::log;
60use quiche::ConnectionId;
61use quiche::Error as QuicheError;
62use quiche::SendInfo;
63use tokio::select;
64use tokio::sync::mpsc;
65use tokio::time;
66
/// Base queue size from which [`CHECK_INCOMING_QUEUE_RATIO`] is derived.
///
/// NOTE(review): presumably the capacity of the per-connection incoming
/// packet channel; that channel is created outside this file -- confirm.
pub(crate) const INCOMING_QUEUE_SIZE: usize = 2048;

/// Upper bound on the number of packets the work loop sends before it goes
/// back to draining the incoming packet queue.
pub(crate) const CHECK_INCOMING_QUEUE_RATIO: usize = INCOMING_QUEUE_SIZE / 16;

/// A scheduled release time that is less than this far in the future is
/// treated by the work loop as already due.
const RELEASE_TIMER_THRESHOLD: Duration = Duration::from_micros(250);

/// Once a GSO batch has a segment size, a packet smaller than this ends the
/// batch.
const GSO_THRESHOLD: usize = 1_000;
78
79pub struct WriterConfig {
80 pub pending_cid: Option<ConnectionId<'static>>,
81 pub peer_addr: SocketAddr,
82 pub with_gso: bool,
83 pub pacing_offload: bool,
84 pub with_pktinfo: bool,
85}
86
/// Mutable bookkeeping for the send path, shared between gathering packets
/// from quiche and flushing them to the socket.
#[derive(Default)]
pub(crate) struct WriteState {
    /// Snapshot of `qconn.is_established()` taken at the end of the last
    /// gather.
    conn_established: bool,
    /// Number of bytes currently sitting in the send buffer; reset at the
    /// start of every gather.
    bytes_written: usize,
    /// Segment size to use for the next flush. Falls back to
    /// `bytes_written` when only a single packet was gathered.
    segment_size: usize,
    /// Number of packets written during the last gather.
    num_pkts: usize,
    /// Transmission time to request for the gathered packets, if pacing
    /// applies.
    tx_time: Option<Instant>,
    /// Whether another gather is expected to produce more data.
    has_pending_data: bool,
    /// When set, sending is held back until (roughly) this instant.
    next_release_time: Option<Instant>,
    /// Local address quiche asked the first packet of the batch to be sent
    /// from.
    send_from: Option<SocketAddr>,
}
102
103pub(crate) struct IoWorkerParams<Tx, M> {
104 pub(crate) socket: MaybeConnectedSocket<Tx>,
105 pub(crate) shutdown_tx: mpsc::Sender<()>,
106 pub(crate) cfg: WriterConfig,
107 pub(crate) audit_log_stats: Arc<QuicAuditStats>,
108 pub(crate) write_state: WriteState,
109 pub(crate) conn_map_cmd_tx: mpsc::UnboundedSender<ConnectionMapCommand>,
110 #[cfg(feature = "perf-quic-listener-metrics")]
111 pub(crate) init_rx_time: Option<SystemTime>,
112 pub(crate) metrics: M,
113}
114
115pub(crate) struct IoWorker<Tx, M, S> {
116 socket: MaybeConnectedSocket<Tx>,
117 shutdown_tx: mpsc::Sender<()>,
121 cfg: WriterConfig,
122 audit_log_stats: Arc<QuicAuditStats>,
123 write_state: WriteState,
124 conn_map_cmd_tx: mpsc::UnboundedSender<ConnectionMapCommand>,
125 #[cfg(feature = "perf-quic-listener-metrics")]
126 init_rx_time: Option<SystemTime>,
127 metrics: M,
128 conn_stage: S,
129 bw_estimator: BandwidthReporter,
130}
131
132impl<Tx, M, S> IoWorker<Tx, M, S>
133where
134 Tx: DatagramSocketSend + Send,
135 M: Metrics,
136 S: ConnectionStage,
137{
138 pub(crate) fn new(params: IoWorkerParams<Tx, M>, conn_stage: S) -> Self {
139 let bw_estimator =
140 BandwidthReporter::new(params.metrics.utilized_bandwidth());
141
142 log::trace!("Creating IoWorker with stage: {conn_stage:?}");
143
144 Self {
145 socket: params.socket,
146 shutdown_tx: params.shutdown_tx,
147 cfg: params.cfg,
148 audit_log_stats: params.audit_log_stats,
149 write_state: params.write_state,
150 conn_map_cmd_tx: params.conn_map_cmd_tx,
151 #[cfg(feature = "perf-quic-listener-metrics")]
152 init_rx_time: params.init_rx_time,
153 metrics: params.metrics,
154 conn_stage,
155 bw_estimator,
156 }
157 }
158
159 async fn work_loop<A: ApplicationOverQuic>(
160 &mut self, qconn: &mut QuicheConnection,
161 ctx: &mut ConnectionStageContext<A>,
162 ) -> QuicResult<()> {
163 const DEFAULT_SLEEP: Duration = Duration::from_secs(60);
164 let mut current_deadline: Option<Instant> = None;
165 let sleep = time::sleep(DEFAULT_SLEEP);
166 tokio::pin!(sleep);
167
168 loop {
169 let now = Instant::now();
170
171 self.write_state.has_pending_data = true;
172
173 while self.write_state.has_pending_data {
174 let mut packets_sent = 0;
175
176 let mut did_recv = false;
181 while let Some(pkt) = ctx
182 .in_pkt
183 .take()
184 .or_else(|| ctx.incoming_pkt_receiver.try_recv().ok())
185 {
186 self.process_incoming(qconn, pkt)?;
187 did_recv = true;
188 }
189
190 self.conn_stage.on_read(did_recv, qconn, ctx)?;
191
192 let can_release = match self.write_state.next_release_time {
193 None => true,
194 Some(next_release) =>
195 next_release
196 .checked_duration_since(now)
197 .unwrap_or_default() <
198 RELEASE_TIMER_THRESHOLD,
199 };
200
201 self.write_state.has_pending_data &= can_release;
202
203 while self.write_state.has_pending_data &&
204 packets_sent < CHECK_INCOMING_QUEUE_RATIO
205 {
206 self.gather_data_from_quiche_conn(qconn, ctx.buffer())?;
207
208 if qconn.is_closed() {
210 return Ok(());
211 }
212
213 self.flush_buffer_to_socket(ctx.buffer()).await;
214 packets_sent += self.write_state.num_pkts;
215
216 if let ControlFlow::Break(reason) =
217 self.conn_stage.on_flush(qconn, ctx)
218 {
219 return reason;
220 }
221 }
222 }
223
224 self.bw_estimator.update(qconn, now);
225
226 self.audit_log_stats
227 .set_max_bandwidth(self.bw_estimator.max_bandwidth);
228 self.audit_log_stats.set_max_loss_pct(
229 (self.bw_estimator.max_loss_pct * 100_f32).round() as u8,
230 );
231
232 let new_deadline = min_of_some(
233 qconn.timeout_instant(),
234 self.write_state.next_release_time,
235 );
236 let new_deadline =
237 min_of_some(new_deadline, self.conn_stage.wait_deadline());
238
239 if new_deadline != current_deadline {
240 current_deadline = new_deadline;
241
242 sleep
243 .as_mut()
244 .reset(new_deadline.unwrap_or(now + DEFAULT_SLEEP).into());
245 }
246
247 let incoming_recv = &mut ctx.incoming_pkt_receiver;
248 let application = &mut ctx.application;
249 select! {
250 biased;
251 () = &mut sleep => {
252 qconn.on_timeout();
259
260 self.write_state.next_release_time = None;
261 current_deadline = None;
262 sleep.as_mut().reset((now + DEFAULT_SLEEP).into());
263 }
264 Some(pkt) = incoming_recv.recv() => ctx.in_pkt = Some(pkt),
265 status = self.wait_for_data_or_handshake(qconn, application) => status?,
269 };
270
271 if let ControlFlow::Break(reason) = self.conn_stage.post_wait(qconn) {
272 return reason;
273 }
274 }
275 }
276
277 #[inline]
278 fn gather_data_from_quiche_conn(
279 &mut self, qconn: &mut QuicheConnection, send_buf: &mut [u8],
280 ) -> QuicResult<usize> {
281 self.fill_send_buffer(qconn, send_buf)
282 }
283
284 #[cfg(feature = "perf-quic-listener-metrics")]
285 fn measure_complete_handshake_time(&mut self) {
286 if let Some(init_rx_time) = self.init_rx_time.take() {
287 if let Ok(delta) = init_rx_time.elapsed() {
288 self.metrics
289 .handshake_time_seconds(
290 labels::QuicHandshakeStage::HandshakeResponse,
291 )
292 .observe(delta.as_nanos() as u64);
293 }
294 }
295 }
296
297 fn fill_send_buffer(
298 &mut self, qconn: &mut QuicheConnection, send_buf: &mut [u8],
299 ) -> QuicResult<usize> {
300 let mut segment_size = None;
301 let mut send_info = None;
302
303 self.write_state.num_pkts = 0;
304 self.write_state.bytes_written = 0;
305
306 let now = Instant::now();
307
308 let send_buf = {
309 let trunc = UDP_MAX_GSO_PACKET_SIZE.min(send_buf.len());
310 &mut send_buf[..trunc]
311 };
312
313 #[cfg(feature = "gcongestion")]
314 let gcongestion_enabled = true;
315
316 #[cfg(not(feature = "gcongestion"))]
317 let gcongestion_enabled = qconn.gcongestion_enabled().unwrap_or(false);
318
319 let initial_release_decision = if gcongestion_enabled {
320 let initial_release_decision = qconn
321 .get_next_release_time()
322 .filter(|_| self.pacing_enabled(qconn));
323
324 if let Some(future_release_time) =
325 initial_release_decision.as_ref().and_then(|v| v.time(now))
326 {
327 let max_into_fut = qconn.max_release_into_future();
328
329 if future_release_time.duration_since(now) >= max_into_fut {
330 self.write_state.next_release_time =
331 Some(now + max_into_fut.mul_f32(0.8));
332 self.write_state.has_pending_data = false;
333 return Ok(0);
334 }
335 }
336
337 initial_release_decision
338 } else {
339 None
340 };
341
342 let buffer_write_outcome = loop {
343 let outcome = self.write_packet_to_buffer(
344 qconn,
345 send_buf,
346 &mut send_info,
347 segment_size,
348 );
349
350 let packet_size = match outcome {
351 Ok(0) => {
352 self.write_state.has_pending_data = false;
353
354 break Ok(0);
355 },
356 Ok(bytes_written) => {
357 self.write_state.has_pending_data = true;
358
359 bytes_written
360 },
361 Err(e) => break Err(e),
362 };
363
364 if !self.cfg.with_gso {
367 break outcome;
368 }
369
370 #[cfg(not(feature = "gcongestion"))]
371 let max_send_size = if !gcongestion_enabled {
372 tune_max_send_size(
374 segment_size,
375 qconn.send_quantum(),
376 send_buf.len(),
377 )
378 } else {
379 usize::MAX
380 };
381
382 #[cfg(feature = "gcongestion")]
383 let max_send_size = usize::MAX;
384
385 let buffer_is_full = self.write_state.num_pkts ==
389 UDP_MAX_SEGMENT_COUNT ||
390 self.write_state.bytes_written >= max_send_size;
391
392 if buffer_is_full {
393 break outcome;
394 }
395
396 match segment_size {
401 Some(size)
402 if packet_size != size || packet_size < GSO_THRESHOLD =>
403 break outcome,
404 None => segment_size = Some(packet_size),
405 _ => (),
406 }
407
408 if gcongestion_enabled {
409 if let Some(initial_release_decision) = initial_release_decision {
412 match qconn.get_next_release_time() {
413 Some(release)
414 if release.can_burst() ||
415 release.time_eq(
416 &initial_release_decision,
417 now,
418 ) => {},
419 _ => break outcome,
420 }
421 }
422 }
423 };
424
425 let tx_time = if gcongestion_enabled {
426 initial_release_decision
427 .filter(|_| self.pacing_enabled(qconn))
428 .and_then(|v| v.time(now))
430 } else {
431 send_info
432 .filter(|_| self.pacing_enabled(qconn))
433 .map(|v| v.at)
434 };
435
436 self.write_state.conn_established = qconn.is_established();
437 self.write_state.tx_time = tx_time;
438 self.write_state.segment_size =
439 segment_size.unwrap_or(self.write_state.bytes_written);
440
441 if !gcongestion_enabled {
442 if let Some(time) = tx_time {
443 const DEFAULT_MAX_INTO_FUTURE: Duration =
444 Duration::from_millis(1);
445 if time
446 .checked_duration_since(now)
447 .map(|d| d > DEFAULT_MAX_INTO_FUTURE)
448 .unwrap_or(false)
449 {
450 self.write_state.next_release_time =
451 Some(now + DEFAULT_MAX_INTO_FUTURE.mul_f32(0.8));
452 self.write_state.has_pending_data = false;
453 return Ok(0);
454 }
455 }
456 }
457
458 buffer_write_outcome
459 }
460
461 #[cfg(not(feature = "gcongestion"))]
462 fn pacing_enabled(&self, qconn: &QuicheConnection) -> bool {
463 self.cfg.pacing_offload && qconn.pacing_enabled()
464 }
465
466 #[cfg(feature = "gcongestion")]
467 fn pacing_enabled(&self, _qconn: &QuicheConnection) -> bool {
468 self.cfg.pacing_offload
469 }
470
471 fn write_packet_to_buffer(
472 &mut self, qconn: &mut QuicheConnection, send_buf: &mut [u8],
473 send_info: &mut Option<SendInfo>, segment_size: Option<usize>,
474 ) -> QuicResult<usize> {
475 let mut send_buf = &mut send_buf[self.write_state.bytes_written..];
476 if send_buf.len() > segment_size.unwrap_or(usize::MAX) {
477 send_buf = &mut send_buf[..segment_size.unwrap_or(usize::MAX)];
480 }
481
482 match qconn.send(send_buf) {
483 Ok((packet_size, info)) => {
484 let _ = send_info.get_or_insert(info);
485
486 self.write_state.bytes_written += packet_size;
487 self.write_state.num_pkts += 1;
488 self.write_state.send_from =
489 send_info.as_ref().map(|info| info.from);
490
491 Ok(packet_size)
492 },
493 Err(QuicheError::Done) => {
494 Ok(0)
497 },
498 Err(e) => {
499 let error_code = if let Some(local_error) = qconn.local_error() {
500 local_error.error_code
501 } else {
502 let internal_error_code =
503 quiche::WireErrorCode::InternalError as u64;
504 let _ = qconn.close(false, internal_error_code, &[]);
505
506 internal_error_code
507 };
508
509 self.audit_log_stats
510 .set_sent_conn_close_transport_error_code(error_code as i64);
511
512 Err(Box::new(e))
513 },
514 }
515 }
516
517 async fn flush_buffer_to_socket(&mut self, send_buf: &[u8]) {
518 if self.write_state.bytes_written > 0 {
519 let current_send_buf = &send_buf[..self.write_state.bytes_written];
520 let send_res = if let (Some(udp_socket), true) =
521 (self.socket.as_udp_socket(), self.cfg.with_gso)
522 {
523 send_to(
525 udp_socket,
526 self.cfg.peer_addr,
527 self.write_state.send_from.filter(|_| self.cfg.with_pktinfo),
528 current_send_buf,
529 self.write_state.segment_size,
530 self.write_state.tx_time,
531 self.metrics
532 .write_errors(labels::QuicWriteError::WouldBlock),
533 )
534 .await
535 } else {
536 self.socket
537 .send_to(current_send_buf, self.cfg.peer_addr)
538 .await
539 };
540
541 #[cfg(feature = "perf-quic-listener-metrics")]
542 self.measure_complete_handshake_time();
543
544 match send_res {
545 Ok(n) =>
546 if n < self.write_state.bytes_written {
547 self.metrics
548 .write_errors(labels::QuicWriteError::Partial)
549 .inc();
550 },
551 Err(_) => {
552 self.metrics.write_errors(labels::QuicWriteError::Err).inc();
553 },
554 }
555 }
556 }
557
558 fn process_incoming(
560 &mut self, qconn: &mut QuicheConnection, mut pkt: Incoming,
561 ) -> QuicResult<()> {
562 let recv_info = quiche::RecvInfo {
563 from: pkt.peer_addr,
564 to: pkt.local_addr,
565 };
566
567 if let Some(gro) = pkt.gro {
568 for dgram in pkt.buf.chunks_mut(gro as usize) {
569 qconn.recv(dgram, recv_info)?;
570 }
571 } else {
572 qconn.recv(&mut pkt.buf, recv_info)?;
573 }
574
575 Ok(())
576 }
577
578 async fn wait_for_data_or_handshake<A: ApplicationOverQuic>(
582 &mut self, qconn: &mut QuicheConnection, quic_application: &mut A,
583 ) -> QuicResult<()> {
584 if quic_application.should_act() {
585 quic_application.wait_for_data(qconn).await
586 } else {
587 self.wait_for_quiche(qconn, quic_application).await
588 }
589 }
590
591 async fn wait_for_quiche<App: ApplicationOverQuic>(
601 &mut self, qconn: &mut QuicheConnection, app: &mut App,
602 ) -> QuicResult<()> {
603 let populate_send_buf = std::future::poll_fn(|_| {
604 match self.gather_data_from_quiche_conn(qconn, app.buffer()) {
605 Ok(bytes_written) => {
606 if bytes_written == 0 && self.write_state.bytes_written == 0 {
613 Poll::Pending
614 } else {
615 Poll::Ready(Ok(()))
616 }
617 },
618 _ => Poll::Ready(Err(quiche::Error::TlsFail)),
619 }
620 })
621 .await;
622
623 if populate_send_buf.is_err() {
624 return Err(Box::new(quiche::Error::TlsFail));
625 }
626
627 self.flush_buffer_to_socket(app.buffer()).await;
628
629 Ok(())
630 }
631}
632
633pub struct Running<Tx, M, A> {
634 pub(crate) params: IoWorkerParams<Tx, M>,
635 pub(crate) context: ConnectionStageContext<A>,
636 pub(crate) qconn: QuicheConnection,
637}
638
639impl<Tx, M, A> Running<Tx, M, A> {
640 pub fn ssl(&mut self) -> &mut SslRef {
641 self.qconn.as_mut()
642 }
643}
644
645pub(crate) struct Closing<Tx, M, A> {
646 pub(crate) params: IoWorkerParams<Tx, M>,
647 pub(crate) context: ConnectionStageContext<A>,
648 pub(crate) work_loop_result: QuicResult<()>,
649 pub(crate) qconn: QuicheConnection,
650}
651
652pub enum RunningOrClosing<Tx, M, A> {
653 Running(Running<Tx, M, A>),
654 Closing(Closing<Tx, M, A>),
655}
656
657impl<Tx, M> IoWorker<Tx, M, Handshake>
658where
659 Tx: DatagramSocketSend + Send,
660 M: Metrics,
661{
662 pub(crate) async fn run<A>(
663 mut self, mut qconn: QuicheConnection, mut ctx: ConnectionStageContext<A>,
664 ) -> RunningOrClosing<Tx, M, A>
665 where
666 A: ApplicationOverQuic,
667 {
668 std::future::poll_fn(|cx| {
674 let ssl = qconn.as_mut();
675 ssl.set_task_waker(Some(cx.waker().clone()));
676
677 Poll::Ready(())
678 })
679 .await;
680
681 let mut work_loop_result = self.work_loop(&mut qconn, &mut ctx).await;
682 if work_loop_result.is_ok() && qconn.is_closed() {
683 work_loop_result = Err(HandshakeError::ConnectionClosed.into());
684 }
685
686 if let Err(err) = &work_loop_result {
687 self.metrics.failed_handshakes(err.into()).inc();
688
689 return RunningOrClosing::Closing(Closing {
690 params: self.into(),
691 context: ctx,
692 work_loop_result,
693 qconn,
694 });
695 };
696
697 match self.on_conn_established(&mut qconn, &mut ctx.application) {
698 Ok(()) => RunningOrClosing::Running(Running {
699 params: self.into(),
700 context: ctx,
701 qconn,
702 }),
703 Err(e) => {
704 foundations::telemetry::log::warn!(
705 "Handshake stage on_connection_established failed"; "error"=>%e
706 );
707
708 RunningOrClosing::Closing(Closing {
709 params: self.into(),
710 context: ctx,
711 work_loop_result,
712 qconn,
713 })
714 },
715 }
716 }
717
718 fn on_conn_established<App: ApplicationOverQuic>(
719 &mut self, qconn: &mut QuicheConnection, driver: &mut App,
720 ) -> QuicResult<()> {
721 if self.audit_log_stats.transport_handshake_duration_us() == -1 {
725 self.conn_stage.handshake_info.set_elapsed();
726 let handshake_info = &self.conn_stage.handshake_info;
727
728 self.audit_log_stats
729 .set_transport_handshake_duration(handshake_info.elapsed());
730
731 driver.on_conn_established(qconn, handshake_info)?;
732 }
733
734 if let Some(cid) = self.cfg.pending_cid.take() {
735 let _ = self
736 .conn_map_cmd_tx
737 .send(ConnectionMapCommand::UnmapCid(cid));
738 }
739
740 Ok(())
741 }
742}
743
744impl<Tx, M, S> From<IoWorker<Tx, M, S>> for IoWorkerParams<Tx, M> {
745 fn from(value: IoWorker<Tx, M, S>) -> Self {
746 Self {
747 socket: value.socket,
748 shutdown_tx: value.shutdown_tx,
749 cfg: value.cfg,
750 audit_log_stats: value.audit_log_stats,
751 write_state: value.write_state,
752 conn_map_cmd_tx: value.conn_map_cmd_tx,
753 #[cfg(feature = "perf-quic-listener-metrics")]
754 init_rx_time: value.init_rx_time,
755 metrics: value.metrics,
756 }
757 }
758}
759
760impl<Tx, M> IoWorker<Tx, M, RunningApplication>
761where
762 Tx: DatagramSocketSend + Send,
763 M: Metrics,
764{
765 pub(crate) async fn run<A: ApplicationOverQuic>(
766 mut self, mut qconn: QuicheConnection, mut ctx: ConnectionStageContext<A>,
767 ) -> Closing<Tx, M, A> {
768 if let Err(e) = self.conn_stage.on_read(true, &mut qconn, &mut ctx) {
773 return Closing {
774 params: self.into(),
775 context: ctx,
776 work_loop_result: Err(e),
777 qconn,
778 };
779 };
780
781 let work_loop_result = self.work_loop(&mut qconn, &mut ctx).await;
782
783 Closing {
784 params: self.into(),
785 context: ctx,
786 work_loop_result,
787 qconn,
788 }
789 }
790}
791
792impl<Tx, M> IoWorker<Tx, M, Close>
793where
794 Tx: DatagramSocketSend + Send,
795 M: Metrics,
796{
797 pub(crate) async fn close<A: ApplicationOverQuic>(
798 mut self, qconn: &mut QuicheConnection,
799 ctx: &mut ConnectionStageContext<A>,
800 ) {
801 if self.conn_stage.work_loop_result.is_ok() &&
802 self.bw_estimator.max_bandwidth > 0
803 {
804 let metrics = &self.metrics;
805
806 metrics
807 .max_bandwidth_mbps()
808 .observe(self.bw_estimator.max_bandwidth as f64 * 1e-6);
809
810 metrics
811 .max_loss_pct()
812 .observe(self.bw_estimator.max_loss_pct as f64 * 100.);
813 }
814
815 if ctx.application.should_act() {
816 ctx.application.on_conn_close(
817 qconn,
818 &self.metrics,
819 &self.conn_stage.work_loop_result,
820 );
821 }
822
823 let _ = self.gather_data_from_quiche_conn(qconn, ctx.buffer());
828 self.flush_buffer_to_socket(ctx.buffer()).await;
829
830 *ctx.stats.lock().unwrap() = QuicConnectionStats::from_conn(qconn);
831
832 if let Some(err) = qconn.peer_error() {
833 if err.is_app {
834 self.audit_log_stats
835 .set_recvd_conn_close_application_error_code(
836 err.error_code as _,
837 );
838 } else {
839 self.audit_log_stats
840 .set_recvd_conn_close_transport_error_code(
841 err.error_code as _,
842 );
843 }
844 }
845
846 self.close_connection(qconn);
847
848 if let Err(work_loop_error) = self.conn_stage.work_loop_result {
849 self.audit_log_stats
850 .set_connection_close_reason(work_loop_error);
851 }
852 }
853
854 fn close_connection(&mut self, qconn: &QuicheConnection) {
855 let scid = qconn.source_id().into_owned();
856
857 if let Some(cid) = self.cfg.pending_cid.take() {
858 let _ = self
859 .conn_map_cmd_tx
860 .send(ConnectionMapCommand::UnmapCid(cid));
861 }
862
863 let _ = self
864 .conn_map_cmd_tx
865 .send(ConnectionMapCommand::RemoveScid(scid));
866
867 self.metrics.connections_in_memory().dec();
868 }
869}
870
/// Returns the smaller of the two values when both are present, the one
/// that is present when only one is, and `None` when neither is.
fn min_of_some<T: Ord>(v1: Option<T>, v2: Option<T>) -> Option<T> {
    match v1 {
        None => v2,
        Some(a) => Some(match v2 {
            Some(b) => a.min(b),
            None => a,
        }),
    }
}