tokio_quiche/quic/io/
worker.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::net::SocketAddr;
28use std::ops::ControlFlow;
29use std::sync::Arc;
30use std::task::Poll;
31use std::time::Duration;
32use std::time::Instant;
33#[cfg(feature = "perf-quic-listener-metrics")]
34use std::time::SystemTime;
35
36use super::connection_stage::Close;
37use super::connection_stage::ConnectionStage;
38use super::connection_stage::ConnectionStageContext;
39use super::connection_stage::Handshake;
40use super::connection_stage::RunningApplication;
41use super::gso::*;
42use super::utilization_estimator::BandwidthReporter;
43
44use crate::metrics::labels;
45use crate::metrics::Metrics;
46use crate::quic::connection::ApplicationOverQuic;
47use crate::quic::connection::HandshakeError;
48use crate::quic::connection::Incoming;
49use crate::quic::connection::QuicConnectionStats;
50use crate::quic::router::ConnectionMapCommand;
51use crate::quic::QuicheConnection;
52use crate::QuicResult;
53
54use boring::ssl::SslRef;
55use datagram_socket::DatagramSocketSend;
56use datagram_socket::DatagramSocketSendExt;
57use datagram_socket::MaybeConnectedSocket;
58use datagram_socket::QuicAuditStats;
59use foundations::telemetry::log;
60use quiche::ConnectionId;
61use quiche::Error as QuicheError;
62use quiche::SendInfo;
63use tokio::select;
64use tokio::sync::mpsc;
65use tokio::time;
66
/// Number of incoming packets that can be buffered in a connection's incoming
/// channel.
pub(crate) const INCOMING_QUEUE_SIZE: usize = 2_048;

/// While sending, the incoming queue is checked again each time at least this
/// many packets have been sent (one sixteenth of [`INCOMING_QUEUE_SIZE`]).
pub(crate) const CHECK_INCOMING_QUEUE_RATIO: usize = INCOMING_QUEUE_SIZE / 16;

/// Sending is allowed when the pacer's next release time is less than this far
/// in the future (250 µs).
const RELEASE_TIMER_THRESHOLD: Duration = Duration::from_nanos(250_000);

/// Stop queuing GSO packets if a packet's size is below this many bytes.
const GSO_THRESHOLD: usize = 1000;
78
/// Per-connection settings that control how outgoing packets are written to
/// the socket.
pub struct WriterConfig {
    /// Connection ID that still has to be unmapped from the router's
    /// connection map (`ConnectionMapCommand::UnmapCid`). It is taken and
    /// unmapped either once the handshake completes or when the connection
    /// is closed, whichever happens first.
    pub pending_cid: Option<ConnectionId<'static>>,
    /// Destination address for every outgoing packet.
    pub peer_addr: SocketAddr,
    /// Batch several equally-sized packets into one GSO write. Only honoured
    /// for UDP sockets; when `false`, exactly one packet is written per flush.
    pub with_gso: bool,
    /// Pass pacing release times to the socket as the transmit time
    /// (`tx_time`) instead of discarding them.
    pub pacing_offload: bool,
    /// On the GSO/UDP send path, send from the source address quiche reported
    /// for the batch (`SendInfo::from`) instead of the default local address.
    pub with_pktinfo: bool,
}
86
/// Bookkeeping for the batch of packets most recently gathered into the send
/// buffer, plus pacing state that carries over between batches.
#[derive(Default)]
pub(crate) struct WriteState {
    /// Value of `qconn.is_established()` when the batch was gathered.
    conn_established: bool,
    /// Total number of bytes of the current batch in the send buffer.
    bytes_written: usize,
    /// GSO segment size of the batch; falls back to `bytes_written` when no
    /// segment size was determined.
    segment_size: usize,
    /// Number of packets in the current batch.
    num_pkts: usize,
    /// Transmit time handed to the socket when pacing is offloaded.
    tx_time: Option<Instant>,
    /// Whether quiche may still have packets to send; cleared when quiche
    /// reports `Done` or when sending is paused for pacing.
    has_pending_data: bool,
    /// If the pacer schedules packets too far into the future, sending is
    /// paused until this instant arrives.
    next_release_time: Option<Instant>,
    /// Source address from the batch's first `SendInfo`. When set and
    /// `WriterConfig::with_pktinfo` is enabled, packets are sent from this
    /// address rather than the listening socket's.
    send_from: Option<SocketAddr>,
}
102
/// Worker state that survives a connection-stage transition.
///
/// Produced from an [`IoWorker`] via `From` (which drops the stage-specific
/// state and the bandwidth reporter) and turned back into a worker by
/// `IoWorker::new`. Each field mirrors the same-named field of `IoWorker`.
pub(crate) struct IoWorkerParams<Tx, M> {
    pub(crate) socket: MaybeConnectedSocket<Tx>,
    pub(crate) shutdown_tx: mpsc::Sender<()>,
    pub(crate) cfg: WriterConfig,
    pub(crate) audit_log_stats: Arc<QuicAuditStats>,
    pub(crate) write_state: WriteState,
    pub(crate) conn_map_cmd_tx: mpsc::UnboundedSender<ConnectionMapCommand>,
    #[cfg(feature = "perf-quic-listener-metrics")]
    pub(crate) init_rx_time: Option<SystemTime>,
    pub(crate) metrics: M,
}
114
/// Drives socket I/O for a single QUIC connection during one connection stage
/// `S` (this file implements `Handshake`, `RunningApplication` and `Close`).
pub(crate) struct IoWorker<Tx, M, S> {
    /// Socket that outgoing packets are written to.
    socket: MaybeConnectedSocket<Tx>,
    /// A field that signals to the listener task that the connection has gone
    /// away (nothing is sent here, listener task just detects the sender
    /// has dropped)
    shutdown_tx: mpsc::Sender<()>,
    /// Write-path configuration (peer address, GSO, pacing, pktinfo).
    cfg: WriterConfig,
    /// Audit statistics updated with close codes, handshake duration and the
    /// close reason.
    audit_log_stats: Arc<QuicAuditStats>,
    /// State of the current outgoing batch and of pacing.
    write_state: WriteState,
    /// Channel used to unmap/remove this connection's IDs from the router's
    /// connection map.
    conn_map_cmd_tx: mpsc::UnboundedSender<ConnectionMapCommand>,
    /// Reference time for the handshake-latency metric; taken (reset to
    /// `None`) once the metric has been recorded.
    #[cfg(feature = "perf-quic-listener-metrics")]
    init_rx_time: Option<SystemTime>,
    /// Metrics sink.
    metrics: M,
    /// Stage-specific state and hooks.
    conn_stage: S,
    /// Tracks utilized bandwidth and loss; its maxima are reported when a
    /// connection whose work loop succeeded is closed.
    bw_estimator: BandwidthReporter,
}
131
132impl<Tx, M, S> IoWorker<Tx, M, S>
133where
134    Tx: DatagramSocketSend + Send,
135    M: Metrics,
136    S: ConnectionStage,
137{
138    pub(crate) fn new(params: IoWorkerParams<Tx, M>, conn_stage: S) -> Self {
139        let bw_estimator =
140            BandwidthReporter::new(params.metrics.utilized_bandwidth());
141
142        log::trace!("Creating IoWorker with stage: {conn_stage:?}");
143
144        Self {
145            socket: params.socket,
146            shutdown_tx: params.shutdown_tx,
147            cfg: params.cfg,
148            audit_log_stats: params.audit_log_stats,
149            write_state: params.write_state,
150            conn_map_cmd_tx: params.conn_map_cmd_tx,
151            #[cfg(feature = "perf-quic-listener-metrics")]
152            init_rx_time: params.init_rx_time,
153            metrics: params.metrics,
154            conn_stage,
155            bw_estimator,
156        }
157    }
158
159    async fn work_loop<A: ApplicationOverQuic>(
160        &mut self, qconn: &mut QuicheConnection,
161        ctx: &mut ConnectionStageContext<A>,
162    ) -> QuicResult<()> {
163        const DEFAULT_SLEEP: Duration = Duration::from_secs(60);
164        let mut current_deadline: Option<Instant> = None;
165        let sleep = time::sleep(DEFAULT_SLEEP);
166        tokio::pin!(sleep);
167
168        loop {
169            let now = Instant::now();
170
171            self.write_state.has_pending_data = true;
172
173            while self.write_state.has_pending_data {
174                let mut packets_sent = 0;
175
176                // Try to clear all received packets every so often, because
177                // incoming packets contain acks, and because the
178                // recieve queue has a very limited size, once it is full incoming
179                // packets get stalled indefinitely
180                let mut did_recv = false;
181                while let Some(pkt) = ctx
182                    .in_pkt
183                    .take()
184                    .or_else(|| ctx.incoming_pkt_receiver.try_recv().ok())
185                {
186                    self.process_incoming(qconn, pkt)?;
187                    did_recv = true;
188                }
189
190                self.conn_stage.on_read(did_recv, qconn, ctx)?;
191
192                let can_release = match self.write_state.next_release_time {
193                    None => true,
194                    Some(next_release) =>
195                        next_release
196                            .checked_duration_since(now)
197                            .unwrap_or_default() <
198                            RELEASE_TIMER_THRESHOLD,
199                };
200
201                self.write_state.has_pending_data &= can_release;
202
203                while self.write_state.has_pending_data &&
204                    packets_sent < CHECK_INCOMING_QUEUE_RATIO
205                {
206                    self.gather_data_from_quiche_conn(qconn, ctx.buffer())?;
207
208                    // Break if the connection is closed
209                    if qconn.is_closed() {
210                        return Ok(());
211                    }
212
213                    self.flush_buffer_to_socket(ctx.buffer()).await;
214                    packets_sent += self.write_state.num_pkts;
215
216                    if let ControlFlow::Break(reason) =
217                        self.conn_stage.on_flush(qconn, ctx)
218                    {
219                        return reason;
220                    }
221                }
222            }
223
224            self.bw_estimator.update(qconn, now);
225
226            let new_deadline = min_of_some(
227                qconn.timeout_instant(),
228                self.write_state.next_release_time,
229            );
230            let new_deadline =
231                min_of_some(new_deadline, self.conn_stage.wait_deadline());
232
233            if new_deadline != current_deadline {
234                current_deadline = new_deadline;
235
236                sleep
237                    .as_mut()
238                    .reset(new_deadline.unwrap_or(now + DEFAULT_SLEEP).into());
239            }
240
241            let incoming_recv = &mut ctx.incoming_pkt_receiver;
242            let application = &mut ctx.application;
243            select! {
244                biased;
245                () = &mut sleep => {
246                    // It's very important that we keep the timeout arm at the top of this loop so
247                    // that we poll it every time we need to. Since this is a biased `select!`, if
248                    // we put this behind another arm, we could theoretically starve the sleep arm
249                    // and hang connections.
250                    //
251                    // See https://docs.rs/tokio/latest/tokio/macro.select.html#fairness for more
252                    qconn.on_timeout();
253
254                    self.write_state.next_release_time = None;
255                    current_deadline = None;
256                    sleep.as_mut().reset((now + DEFAULT_SLEEP).into());
257                }
258                Some(pkt) = incoming_recv.recv() => ctx.in_pkt = Some(pkt),
259                // TODO(erittenhouse): would be nice to decouple wait_for_data from the
260                // application, but wait_for_quiche relies on IOW methods, so we can't write a
261                // default implementation for ConnectionStage
262                status = self.wait_for_data_or_handshake(qconn, application) => status?,
263            };
264
265            if let ControlFlow::Break(reason) = self.conn_stage.post_wait(qconn) {
266                return reason;
267            }
268        }
269    }
270
271    #[inline]
272    fn gather_data_from_quiche_conn(
273        &mut self, qconn: &mut QuicheConnection, send_buf: &mut [u8],
274    ) -> QuicResult<usize> {
275        self.fill_send_buffer(qconn, send_buf)
276    }
277
278    #[cfg(feature = "perf-quic-listener-metrics")]
279    fn measure_complete_handshake_time(&mut self) {
280        if let Some(init_rx_time) = self.init_rx_time.take() {
281            if let Ok(delta) = init_rx_time.elapsed() {
282                self.metrics
283                    .handshake_time_seconds(
284                        labels::QuicHandshakeStage::HandshakeResponse,
285                    )
286                    .observe(delta.as_nanos() as u64);
287            }
288        }
289    }
290
291    fn fill_send_buffer(
292        &mut self, qconn: &mut QuicheConnection, send_buf: &mut [u8],
293    ) -> QuicResult<usize> {
294        let mut segment_size = None;
295        let mut send_info = None;
296
297        self.write_state.num_pkts = 0;
298        self.write_state.bytes_written = 0;
299
300        let now = Instant::now();
301
302        let send_buf = {
303            let trunc = UDP_MAX_GSO_PACKET_SIZE.min(send_buf.len());
304            &mut send_buf[..trunc]
305        };
306
307        #[cfg(feature = "gcongestion")]
308        let gcongestion_enabled = true;
309
310        #[cfg(not(feature = "gcongestion"))]
311        let gcongestion_enabled = qconn.gcongestion_enabled().unwrap_or(false);
312
313        let initial_release_decision = if gcongestion_enabled {
314            let initial_release_decision = qconn
315                .get_next_release_time()
316                .filter(|_| self.cfg.pacing_offload);
317
318            if let Some(future_release_time) =
319                initial_release_decision.as_ref().and_then(|v| v.time(now))
320            {
321                let max_into_fut = qconn.max_release_into_future();
322
323                if future_release_time.duration_since(now) >= max_into_fut {
324                    self.write_state.next_release_time =
325                        Some(now + max_into_fut.mul_f32(0.8));
326                    self.write_state.has_pending_data = false;
327                    return Ok(0);
328                }
329            }
330
331            initial_release_decision
332        } else {
333            None
334        };
335
336        let buffer_write_outcome = loop {
337            let outcome = self.write_packet_to_buffer(
338                qconn,
339                send_buf,
340                &mut send_info,
341                segment_size,
342            );
343
344            let packet_size = match outcome {
345                Ok(0) => {
346                    self.write_state.has_pending_data = false;
347
348                    break Ok(0);
349                },
350                Ok(bytes_written) => {
351                    self.write_state.has_pending_data = true;
352
353                    bytes_written
354                },
355                Err(e) => break Err(e),
356            };
357
358            // Flush to network after generating a single packet when GSO
359            // is disabled.
360            if !self.cfg.with_gso {
361                break outcome;
362            }
363
364            #[cfg(not(feature = "gcongestion"))]
365            let max_send_size = if !gcongestion_enabled {
366                // Only call qconn.send_quantum when !gcongestion_enabled.
367                tune_max_send_size(
368                    segment_size,
369                    qconn.send_quantum(),
370                    send_buf.len(),
371                )
372            } else {
373                usize::MAX
374            };
375
376            #[cfg(feature = "gcongestion")]
377            let max_send_size = usize::MAX;
378
379            // If segment_size is known, update the maximum of
380            // GSO sender buffer size to the multiple of
381            // segment_size.
382            let buffer_is_full = self.write_state.num_pkts ==
383                UDP_MAX_SEGMENT_COUNT ||
384                self.write_state.bytes_written >= max_send_size;
385
386            if buffer_is_full {
387                break outcome;
388            }
389
390            // Flush to network when the newly generated packet size is
391            // different from previously written packet, as GSO needs packets
392            // to have the same size, except for the last one in the buffer.
393            // The last packet may be smaller than the previous size.
394            match segment_size {
395                Some(size)
396                    if packet_size != size || packet_size < GSO_THRESHOLD =>
397                    break outcome,
398                None => segment_size = Some(packet_size),
399                _ => (),
400            }
401
402            if gcongestion_enabled {
403                // If the release time of next packet is different, or it can't be
404                // part of a burst, start the next batch
405                if let Some(initial_release_decision) = initial_release_decision {
406                    match qconn.get_next_release_time() {
407                        Some(release)
408                            if release.can_burst() ||
409                                release.time_eq(
410                                    &initial_release_decision,
411                                    now,
412                                ) => {},
413                        _ => break outcome,
414                    }
415                }
416            }
417        };
418
419        let tx_time = if gcongestion_enabled {
420            initial_release_decision
421                .filter(|_| self.cfg.pacing_offload)
422                // Return the time from the release decision if release_decision.time > now, else None.
423                .and_then(|v| v.time(now))
424        } else {
425            send_info.filter(|_| self.cfg.pacing_offload).map(|v| v.at)
426        };
427
428        self.write_state.conn_established = qconn.is_established();
429        self.write_state.tx_time = tx_time;
430        self.write_state.segment_size =
431            segment_size.unwrap_or(self.write_state.bytes_written);
432
433        if !gcongestion_enabled {
434            if let Some(time) = tx_time {
435                const DEFAULT_MAX_INTO_FUTURE: Duration =
436                    Duration::from_millis(1);
437                if time
438                    .checked_duration_since(now)
439                    .map(|d| d > DEFAULT_MAX_INTO_FUTURE)
440                    .unwrap_or(false)
441                {
442                    self.write_state.next_release_time =
443                        Some(now + DEFAULT_MAX_INTO_FUTURE.mul_f32(0.8));
444                    self.write_state.has_pending_data = false;
445                    return Ok(0);
446                }
447            }
448        }
449
450        buffer_write_outcome
451    }
452
453    fn write_packet_to_buffer(
454        &mut self, qconn: &mut QuicheConnection, send_buf: &mut [u8],
455        send_info: &mut Option<SendInfo>, segment_size: Option<usize>,
456    ) -> QuicResult<usize> {
457        let mut send_buf = &mut send_buf[self.write_state.bytes_written..];
458        if send_buf.len() > segment_size.unwrap_or(usize::MAX) {
459            // Never let the buffer be longer than segment size, for GSO to
460            // function properly
461            send_buf = &mut send_buf[..segment_size.unwrap_or(usize::MAX)];
462        }
463
464        match qconn.send(send_buf) {
465            Ok((packet_size, info)) => {
466                let _ = send_info.get_or_insert(info);
467
468                self.write_state.bytes_written += packet_size;
469                self.write_state.num_pkts += 1;
470                self.write_state.send_from =
471                    send_info.as_ref().map(|info| info.from);
472
473                Ok(packet_size)
474            },
475            Err(QuicheError::Done) => {
476                // Flush to network and yield when there are no
477                // more packets to write.
478                Ok(0)
479            },
480            Err(e) => {
481                if let Some(local_error) = qconn.local_error() {
482                    self.audit_log_stats
483                        .set_sent_conn_close_transport_error_code(
484                            local_error.error_code as i64,
485                        );
486                    log::error!(
487                        "quiche::send failed and connection closed with error_code: {}",
488                        local_error.error_code
489                    );
490                } else {
491                    let internal_error_code =
492                        quiche::WireErrorCode::InternalError as u64;
493
494                    self.audit_log_stats
495                        .set_sent_conn_close_transport_error_code(
496                            internal_error_code as i64,
497                        );
498
499                    let _ = qconn.close(false, internal_error_code, &[]);
500                    log::error!(
501                        "quiche::send failed, closing connection with INTERNAL_ERROR: {}",
502                        internal_error_code
503                    );
504                }
505
506                Err(Box::new(e))
507            },
508        }
509    }
510
511    async fn flush_buffer_to_socket(&mut self, send_buf: &[u8]) {
512        if self.write_state.bytes_written > 0 {
513            let current_send_buf = &send_buf[..self.write_state.bytes_written];
514            let send_res = if let (Some(udp_socket), true) =
515                (self.socket.as_udp_socket(), self.cfg.with_gso)
516            {
517                // Only UDP supports GSO
518                send_to(
519                    udp_socket,
520                    self.cfg.peer_addr,
521                    self.write_state.send_from.filter(|_| self.cfg.with_pktinfo),
522                    current_send_buf,
523                    self.write_state.segment_size,
524                    self.write_state.num_pkts,
525                    self.write_state.tx_time,
526                )
527                .await
528            } else {
529                self.socket
530                    .send_to(current_send_buf, self.cfg.peer_addr)
531                    .await
532            };
533
534            #[cfg(feature = "perf-quic-listener-metrics")]
535            self.measure_complete_handshake_time();
536
537            match send_res {
538                Ok(n) =>
539                    if n < self.write_state.bytes_written {
540                        self.metrics
541                            .write_errors(labels::QuicWriteError::Partial)
542                            .inc();
543                    },
544                Err(_) => {
545                    self.metrics.write_errors(labels::QuicWriteError::Err).inc();
546                },
547            }
548        }
549    }
550
551    /// Process the incoming packet
552    fn process_incoming(
553        &mut self, qconn: &mut QuicheConnection, mut pkt: Incoming,
554    ) -> QuicResult<()> {
555        let recv_info = quiche::RecvInfo {
556            from: pkt.peer_addr,
557            to: pkt.local_addr,
558        };
559
560        if let Some(gro) = pkt.gro {
561            for dgram in pkt.buf.chunks_mut(gro as usize) {
562                qconn.recv(dgram, recv_info)?;
563            }
564        } else {
565            qconn.recv(&mut pkt.buf, recv_info)?;
566        }
567
568        Ok(())
569    }
570
571    /// When a connection is established, process application data, if not the
572    /// task is probably polled following a wakeup from boring, so we check
573    /// if quiche has any handshake packets to send.
574    async fn wait_for_data_or_handshake<A: ApplicationOverQuic>(
575        &mut self, qconn: &mut QuicheConnection, quic_application: &mut A,
576    ) -> QuicResult<()> {
577        if quic_application.should_act() {
578            quic_application.wait_for_data(qconn).await
579        } else {
580            self.wait_for_quiche(qconn, quic_application).await
581        }
582    }
583
584    /// Check if Quiche has any packets to send and flush them to socket.
585    ///
586    /// # Example
587    ///
588    /// This function can be used, for example, to drive an asynchronous TLS
589    /// handshake. Each call to `gather_data_from_quiche_conn` attempts to
590    /// progress the handshake via a call to `quiche::Connection.send()` -
591    /// once one of the `gather_data_from_quiche_conn()` calls writes to the
592    /// send buffer, we flush it to the network socket.
593    async fn wait_for_quiche<App: ApplicationOverQuic>(
594        &mut self, qconn: &mut QuicheConnection, app: &mut App,
595    ) -> QuicResult<()> {
596        let populate_send_buf = std::future::poll_fn(|_| {
597            match self.gather_data_from_quiche_conn(qconn, app.buffer()) {
598                Ok(bytes_written) => {
599                    // We need to avoid consecutive calls to gather(), which write
600                    // data to the buffer, without a flush().
601                    // If we don't avoid those consecutive calls, we end
602                    // up overwriting data in the buffer or unnecessarily waiting
603                    // for more calls to drive_handshake()
604                    // before calling the handshake complete.
605                    if bytes_written == 0 && self.write_state.bytes_written == 0 {
606                        Poll::Pending
607                    } else {
608                        Poll::Ready(Ok(()))
609                    }
610                },
611                _ => Poll::Ready(Err(quiche::Error::TlsFail)),
612            }
613        })
614        .await;
615
616        if populate_send_buf.is_err() {
617            return Err(Box::new(quiche::Error::TlsFail));
618        }
619
620        self.flush_buffer_to_socket(app.buffer()).await;
621
622        Ok(())
623    }
624}
625
/// Connection state returned by the handshake stage once the work loop
/// finished without error and `on_conn_established` succeeded.
pub struct Running<Tx, M, A> {
    /// Worker state to hand to the next stage.
    pub(crate) params: IoWorkerParams<Tx, M>,
    /// Stage context, including the application.
    pub(crate) context: ConnectionStageContext<A>,
    /// The quiche connection.
    pub(crate) qconn: QuicheConnection,
}
631
632impl<Tx, M, A> Running<Tx, M, A> {
633    pub fn ssl(&mut self) -> &mut SslRef {
634        self.qconn.as_mut()
635    }
636}
637
/// Everything the close stage needs after a stage's work loop has ended.
pub(crate) struct Closing<Tx, M, A> {
    /// Worker state to hand to the close stage.
    pub(crate) params: IoWorkerParams<Tx, M>,
    /// Stage context, including the application.
    pub(crate) context: ConnectionStageContext<A>,
    /// Result of the work loop (or of the step that ended the stage).
    pub(crate) work_loop_result: QuicResult<()>,
    /// The quiche connection.
    pub(crate) qconn: QuicheConnection,
}
644
/// Outcome of the handshake stage: the connection either proceeds to the
/// running stage or has to be closed.
pub enum RunningOrClosing<Tx, M, A> {
    /// The handshake completed and the connection can run the application.
    Running(Running<Tx, M, A>),
    /// The handshake stage failed or the connection was closed.
    Closing(Closing<Tx, M, A>),
}
649
650impl<Tx, M> IoWorker<Tx, M, Handshake>
651where
652    Tx: DatagramSocketSend + Send,
653    M: Metrics,
654{
655    pub(crate) async fn run<A>(
656        mut self, mut qconn: QuicheConnection, mut ctx: ConnectionStageContext<A>,
657    ) -> RunningOrClosing<Tx, M, A>
658    where
659        A: ApplicationOverQuic,
660    {
661        // This makes an assumption that the waker being set in ex_data is stable
662        // accross the active task's lifetime. Moving a future that encompasses an
663        // async callback from this task accross a channel, for example, will
664        // cause issues as this waker will then be stale and attempt to
665        // wake the wrong task.
666        std::future::poll_fn(|cx| {
667            let ssl = qconn.as_mut();
668            ssl.set_task_waker(Some(cx.waker().clone()));
669
670            Poll::Ready(())
671        })
672        .await;
673
674        let mut work_loop_result = self.work_loop(&mut qconn, &mut ctx).await;
675        if work_loop_result.is_ok() && qconn.is_closed() {
676            work_loop_result = Err(HandshakeError::ConnectionClosed.into());
677        }
678
679        if let Err(err) = &work_loop_result {
680            self.metrics.failed_handshakes(err.into()).inc();
681
682            return RunningOrClosing::Closing(Closing {
683                params: self.into(),
684                context: ctx,
685                work_loop_result,
686                qconn,
687            });
688        };
689
690        match self.on_conn_established(&mut qconn, &mut ctx.application) {
691            Ok(()) => RunningOrClosing::Running(Running {
692                params: self.into(),
693                context: ctx,
694                qconn,
695            }),
696            Err(e) => {
697                foundations::telemetry::log::warn!(
698                    "Handshake stage on_connection_established failed"; "error"=>%e
699                );
700
701                RunningOrClosing::Closing(Closing {
702                    params: self.into(),
703                    context: ctx,
704                    work_loop_result,
705                    qconn,
706                })
707            },
708        }
709    }
710
711    fn on_conn_established<App: ApplicationOverQuic>(
712        &mut self, qconn: &mut QuicheConnection, driver: &mut App,
713    ) -> QuicResult<()> {
714        // Only calculate the QUIC handshake duration and call the driver's
715        // on_conn_established hook if this is the first time
716        // is_established == true.
717        if self.audit_log_stats.transport_handshake_duration_us() == -1 {
718            self.conn_stage.handshake_info.set_elapsed();
719            let handshake_info = &self.conn_stage.handshake_info;
720
721            self.audit_log_stats
722                .set_transport_handshake_duration(handshake_info.elapsed());
723
724            driver.on_conn_established(qconn, handshake_info)?;
725        }
726
727        if let Some(cid) = self.cfg.pending_cid.take() {
728            let _ = self
729                .conn_map_cmd_tx
730                .send(ConnectionMapCommand::UnmapCid(cid));
731        }
732
733        Ok(())
734    }
735}
736
737impl<Tx, M, S> From<IoWorker<Tx, M, S>> for IoWorkerParams<Tx, M> {
738    fn from(value: IoWorker<Tx, M, S>) -> Self {
739        Self {
740            socket: value.socket,
741            shutdown_tx: value.shutdown_tx,
742            cfg: value.cfg,
743            audit_log_stats: value.audit_log_stats,
744            write_state: value.write_state,
745            conn_map_cmd_tx: value.conn_map_cmd_tx,
746            #[cfg(feature = "perf-quic-listener-metrics")]
747            init_rx_time: value.init_rx_time,
748            metrics: value.metrics,
749        }
750    }
751}
752
753impl<Tx, M> IoWorker<Tx, M, RunningApplication>
754where
755    Tx: DatagramSocketSend + Send,
756    M: Metrics,
757{
758    pub(crate) async fn run<A: ApplicationOverQuic>(
759        mut self, mut qconn: QuicheConnection, mut ctx: ConnectionStageContext<A>,
760    ) -> Closing<Tx, M, A> {
761        let work_loop_result = self.work_loop(&mut qconn, &mut ctx).await;
762
763        Closing {
764            params: self.into(),
765            context: ctx,
766            work_loop_result,
767            qconn,
768        }
769    }
770}
771
772impl<Tx, M> IoWorker<Tx, M, Close>
773where
774    Tx: DatagramSocketSend + Send,
775    M: Metrics,
776{
777    pub(crate) async fn close<A: ApplicationOverQuic>(
778        mut self, qconn: &mut QuicheConnection,
779        ctx: &mut ConnectionStageContext<A>,
780    ) {
781        if self.conn_stage.work_loop_result.is_ok() &&
782            self.bw_estimator.max_bandwidth > 0
783        {
784            let metrics = &self.metrics;
785
786            metrics
787                .max_bandwidth_mbps()
788                .observe(self.bw_estimator.max_bandwidth as f64 * 1e-6);
789
790            metrics
791                .max_loss_pct()
792                .observe(self.bw_estimator.max_loss_pct as f64 * 100.);
793        }
794
795        if ctx.application.should_act() {
796            ctx.application.on_conn_close(
797                qconn,
798                &self.metrics,
799                &self.conn_stage.work_loop_result,
800            );
801        }
802
803        // TODO: this assumes that the tidy_up operation can be completed in one
804        // send (ignoring flow/congestion control constraints). We should
805        // guarantee that it gets sent by doublechecking the
806        // gathered/flushed byte totals and retry if they don't match.
807        let _ = self.gather_data_from_quiche_conn(qconn, ctx.buffer());
808        self.flush_buffer_to_socket(ctx.buffer()).await;
809
810        *ctx.stats.lock().unwrap() = QuicConnectionStats::from_conn(qconn);
811
812        if let Some(err) = qconn.peer_error() {
813            if err.is_app {
814                self.audit_log_stats
815                    .set_recvd_conn_close_application_error_code(
816                        err.error_code as _,
817                    );
818            } else {
819                self.audit_log_stats
820                    .set_recvd_conn_close_transport_error_code(
821                        err.error_code as _,
822                    );
823            }
824        }
825
826        self.close_connection(qconn);
827
828        if let Err(work_loop_error) = self.conn_stage.work_loop_result {
829            self.audit_log_stats
830                .set_connection_close_reason(work_loop_error);
831        }
832    }
833
834    fn close_connection(&mut self, qconn: &QuicheConnection) {
835        let scid = qconn.source_id().into_owned();
836
837        if let Some(cid) = self.cfg.pending_cid.take() {
838            let _ = self
839                .conn_map_cmd_tx
840                .send(ConnectionMapCommand::UnmapCid(cid));
841        }
842
843        let _ = self
844            .conn_map_cmd_tx
845            .send(ConnectionMapCommand::RemoveScid(scid));
846
847        self.metrics.connections_in_memory().dec();
848    }
849}
850
/// Returns the smaller of `v1` and `v2`, treating `None` as "absent" rather
/// than as a minimum. The result is `None` only if both inputs are `None`.
/// When both values compare equal, `v1` is returned.
fn min_of_some<T: Ord>(v1: Option<T>, v2: Option<T>) -> Option<T> {
    // `Iterator::min` keeps the first of several equally-minimal items, just
    // like `Ord::min` returns its receiver on ties.
    v1.into_iter().chain(v2).min()
}