tokio_quiche/quic/io/
worker.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::net::SocketAddr;
28use std::ops::ControlFlow;
29use std::sync::Arc;
30use std::task::Poll;
31use std::time::Duration;
32use std::time::Instant;
33#[cfg(feature = "perf-quic-listener-metrics")]
34use std::time::SystemTime;
35
36use super::connection_stage::Close;
37use super::connection_stage::ConnectionStage;
38use super::connection_stage::ConnectionStageContext;
39use super::connection_stage::Handshake;
40use super::connection_stage::RunningApplication;
41use super::gso::*;
42use super::utilization_estimator::BandwidthReporter;
43
44use crate::metrics::labels;
45use crate::metrics::Metrics;
46use crate::quic::connection::ApplicationOverQuic;
47use crate::quic::connection::HandshakeError;
48use crate::quic::connection::Incoming;
49use crate::quic::connection::QuicConnectionStats;
50use crate::quic::router::ConnectionMapCommand;
51use crate::quic::QuicheConnection;
52use crate::QuicResult;
53
54use boring::ssl::SslRef;
55use datagram_socket::DatagramSocketSend;
56use datagram_socket::DatagramSocketSendExt;
57use datagram_socket::MaybeConnectedSocket;
58use datagram_socket::QuicAuditStats;
59use foundations::telemetry::log;
60use quiche::ConnectionId;
61use quiche::Error as QuicheError;
62use quiche::SendInfo;
63use tokio::select;
64use tokio::sync::mpsc;
65use tokio::time;
66
/// Number of incoming packets to be buffered in the incoming channel.
pub(crate) const INCOMING_QUEUE_SIZE: usize = 2048;

/// Upper bound on the number of packets sent in one burst before the work
/// loop goes back to draining the incoming packet queue. Derived from
/// [`INCOMING_QUEUE_SIZE`] (one sixteenth of the queue capacity).
pub(crate) const CHECK_INCOMING_QUEUE_RATIO: usize = INCOMING_QUEUE_SIZE / 16;

/// If the pending pacing release time is closer to "now" than this threshold
/// (or already in the past), the work loop sends immediately instead of
/// waiting for the timer.
const RELEASE_TIMER_THRESHOLD: Duration = Duration::from_micros(250);

/// Stop queuing GSO packets, if packet size is below this threshold.
const GSO_THRESHOLD: usize = 1_000;
78
/// Static configuration for the sending half of an [`IoWorker`].
pub struct WriterConfig {
    /// Connection ID that is still mapped in the connection map on behalf of
    /// this connection. It is taken and unmapped (via
    /// `ConnectionMapCommand::UnmapCid`) once the connection is established
    /// or closed.
    pub pending_cid: Option<ConnectionId<'static>>,
    /// Destination address used for every outgoing datagram.
    pub peer_addr: SocketAddr,
    /// When `true` (and the socket is a UDP socket), several equally sized
    /// packets are batched into one buffer and sent with GSO; otherwise the
    /// buffer is flushed after every packet.
    pub with_gso: bool,
    /// Whether packet release times (`tx_time`) are handed to the socket
    /// layer instead of being ignored.
    pub pacing_offload: bool,
    /// Whether the local source address reported by quiche is forwarded to
    /// the socket layer when sending.
    pub with_pktinfo: bool,
}
86
/// Mutable bookkeeping for the send path. It is reset/updated on every call
/// to `fill_send_buffer` and consumed by `flush_buffer_to_socket`.
#[derive(Default)]
pub(crate) struct WriteState {
    // Snapshot of `qconn.is_established()` taken after the last buffer fill.
    conn_established: bool,
    // Number of bytes currently written to the send buffer and not yet
    // flushed.
    bytes_written: usize,
    // Size of the first packet of the current batch (the GSO segment size),
    // or `bytes_written` if no segment size was recorded.
    segment_size: usize,
    // Number of packets written to the send buffer during the last fill.
    num_pkts: usize,
    // Release time handed to the socket layer for the current batch, if
    // pacing is enabled.
    tx_time: Option<Instant>,
    // Whether the work loop should keep gathering/flushing packets.
    has_pending_data: bool,
    // If pacer schedules packets too far into the future, we want to pause
    // sending, until the future arrives
    next_release_time: Option<Instant>,
    // If set, outgoing packets will be sent to the peer from the `send_from`
    // address rather than the listening socket.
    send_from: Option<SocketAddr>,
}
102
/// Everything an [`IoWorker`] needs apart from its connection stage.
///
/// This is the stage-independent part of a worker: it is used to construct
/// an `IoWorker` via `IoWorker::new` and can be recovered from one through
/// the `From<IoWorker<..>>` impl, which lets the same state be carried from
/// one stage to the next.
pub(crate) struct IoWorkerParams<Tx, M> {
    /// Socket that outgoing datagrams are written to.
    pub(crate) socket: MaybeConnectedSocket<Tx>,
    /// Kept alive for the lifetime of the worker; see the field of the same
    /// name on `IoWorker`.
    pub(crate) shutdown_tx: mpsc::Sender<()>,
    /// Send-side configuration.
    pub(crate) cfg: WriterConfig,
    /// Shared per-connection audit statistics.
    pub(crate) audit_log_stats: Arc<QuicAuditStats>,
    /// Send-side bookkeeping carried across stages.
    pub(crate) write_state: WriteState,
    /// Channel used to ask the router to unmap/remove connection IDs.
    pub(crate) conn_map_cmd_tx: mpsc::UnboundedSender<ConnectionMapCommand>,
    /// Start time for the handshake-response latency metric; consumed on the
    /// first flush to the socket.
    #[cfg(feature = "perf-quic-listener-metrics")]
    pub(crate) init_rx_time: Option<SystemTime>,
    /// Metrics sink.
    pub(crate) metrics: M,
}
114
/// Per-connection I/O driver, parameterised over the socket sender `Tx`, the
/// metrics sink `M` and the current connection stage `S`.
pub(crate) struct IoWorker<Tx, M, S> {
    /// Socket that outgoing datagrams are written to.
    socket: MaybeConnectedSocket<Tx>,
    /// A field that signals to the listener task that the connection has gone
    /// away (nothing is sent here, listener task just detects the sender
    /// has dropped)
    shutdown_tx: mpsc::Sender<()>,
    /// Send-side configuration.
    cfg: WriterConfig,
    /// Shared per-connection audit statistics, updated throughout the work
    /// loop and on close.
    audit_log_stats: Arc<QuicAuditStats>,
    /// Send-side bookkeeping shared between gathering and flushing.
    write_state: WriteState,
    /// Channel used to ask the router to unmap/remove connection IDs.
    conn_map_cmd_tx: mpsc::UnboundedSender<ConnectionMapCommand>,
    /// Start time for the handshake-response latency metric; consumed on the
    /// first flush to the socket.
    #[cfg(feature = "perf-quic-listener-metrics")]
    init_rx_time: Option<SystemTime>,
    /// Metrics sink.
    metrics: M,
    /// Stage-specific state and hooks (`on_read`, `on_flush`, `post_wait`,
    /// `wait_deadline`).
    conn_stage: S,
    /// Bandwidth/loss estimator updated once per work loop iteration.
    bw_estimator: BandwidthReporter,
}
131
132impl<Tx, M, S> IoWorker<Tx, M, S>
133where
134    Tx: DatagramSocketSend + Send,
135    M: Metrics,
136    S: ConnectionStage,
137{
138    pub(crate) fn new(params: IoWorkerParams<Tx, M>, conn_stage: S) -> Self {
139        let bw_estimator =
140            BandwidthReporter::new(params.metrics.utilized_bandwidth());
141
142        log::trace!("Creating IoWorker with stage: {conn_stage:?}");
143
144        Self {
145            socket: params.socket,
146            shutdown_tx: params.shutdown_tx,
147            cfg: params.cfg,
148            audit_log_stats: params.audit_log_stats,
149            write_state: params.write_state,
150            conn_map_cmd_tx: params.conn_map_cmd_tx,
151            #[cfg(feature = "perf-quic-listener-metrics")]
152            init_rx_time: params.init_rx_time,
153            metrics: params.metrics,
154            conn_stage,
155            bw_estimator,
156        }
157    }
158
    /// Drives the connection for the current stage until the stage asks to
    /// stop, the connection is closed, or an error occurs.
    ///
    /// Every iteration of the outer loop:
    ///
    /// 1. drains all queued incoming packets into `qconn` and notifies the
    ///    stage through `on_read`;
    /// 2. gathers outgoing packets from `qconn` and flushes them to the
    ///    socket, going back to step 1 once at least
    ///    [`CHECK_INCOMING_QUEUE_RATIO`] packets were sent in a row;
    /// 3. updates the bandwidth estimator and the audit statistics;
    /// 4. re-arms the sleep timer with the earliest of the quiche timeout,
    ///    the pacing release time and the stage's wait deadline;
    /// 5. waits for the timer, a new incoming packet, or the application.
    ///
    /// Returns `Ok(())` when the connection is found closed right after
    /// gathering packets, the value carried by `ControlFlow::Break` when
    /// `on_flush` or `post_wait` break out of the loop, or the first error
    /// propagated with `?`.
    async fn work_loop<A: ApplicationOverQuic>(
        &mut self, qconn: &mut QuicheConnection,
        ctx: &mut ConnectionStageContext<A>,
    ) -> QuicResult<()> {
        // Timer duration used whenever no concrete deadline is known.
        const DEFAULT_SLEEP: Duration = Duration::from_secs(60);
        // Deadline the `sleep` timer is currently armed with (`None` means
        // the default sleep is in effect).
        let mut current_deadline: Option<Instant> = None;
        let sleep = time::sleep(DEFAULT_SLEEP);
        tokio::pin!(sleep);

        loop {
            // Sampled once per outer iteration; every time computation below
            // (including the timer resets) uses this value.
            let now = Instant::now();

            // Always attempt at least one read/gather pass per iteration.
            self.write_state.has_pending_data = true;

            while self.write_state.has_pending_data {
                let mut packets_sent = 0;

                // Try to clear all received packets every so often, because
                // incoming packets contain acks, and because the
                // receive queue has a very limited size, once it is full incoming
                // packets get stalled indefinitely
                let mut did_recv = false;
                // The packet stashed by the `select!` below (if any) is
                // processed first, then whatever is already queued.
                while let Some(pkt) = ctx
                    .in_pkt
                    .take()
                    .or_else(|| ctx.incoming_pkt_receiver.try_recv().ok())
                {
                    self.process_incoming(qconn, pkt)?;
                    did_recv = true;
                }

                self.conn_stage.on_read(did_recv, qconn, ctx)?;

                // Sending is allowed when no release time is pending, or when
                // the pending release time is in the past or less than
                // RELEASE_TIMER_THRESHOLD away from `now`.
                let can_release = match self.write_state.next_release_time {
                    None => true,
                    Some(next_release) =>
                        next_release
                            .checked_duration_since(now)
                            .unwrap_or_default() <
                            RELEASE_TIMER_THRESHOLD,
                };

                self.write_state.has_pending_data &= can_release;

                // Send in bursts; the bound on `packets_sent` forces a return
                // to the receive step above.
                while self.write_state.has_pending_data &&
                    packets_sent < CHECK_INCOMING_QUEUE_RATIO
                {
                    self.gather_data_from_quiche_conn(qconn, ctx.buffer())?;

                    // Break if the connection is closed
                    if qconn.is_closed() {
                        return Ok(());
                    }

                    self.flush_buffer_to_socket(ctx.buffer()).await;
                    packets_sent += self.write_state.num_pkts;

                    if let ControlFlow::Break(reason) =
                        self.conn_stage.on_flush(qconn, ctx)
                    {
                        return reason;
                    }
                }
            }

            self.bw_estimator.update(qconn, now);

            self.audit_log_stats
                .set_max_bandwidth(self.bw_estimator.max_bandwidth);
            // The estimator's loss value is scaled by 100 and rounded before
            // being stored as an integer percentage.
            self.audit_log_stats.set_max_loss_pct(
                (self.bw_estimator.max_loss_pct * 100_f32).round() as u8,
            );

            // Earliest of: quiche timeout, pacing release time, stage
            // deadline. `None` values are ignored.
            let new_deadline = min_of_some(
                qconn.timeout_instant(),
                self.write_state.next_release_time,
            );
            let new_deadline =
                min_of_some(new_deadline, self.conn_stage.wait_deadline());

            // Only touch the timer when the deadline actually changed.
            if new_deadline != current_deadline {
                current_deadline = new_deadline;

                sleep
                    .as_mut()
                    .reset(new_deadline.unwrap_or(now + DEFAULT_SLEEP).into());
            }

            // Split the borrows of `ctx` so that the receiver and the
            // application can be used by different `select!` arms.
            let incoming_recv = &mut ctx.incoming_pkt_receiver;
            let application = &mut ctx.application;
            select! {
                biased;
                () = &mut sleep => {
                    // It's very important that we keep the timeout arm at the top of this loop so
                    // that we poll it every time we need to. Since this is a biased `select!`, if
                    // we put this behind another arm, we could theoretically starve the sleep arm
                    // and hang connections.
                    //
                    // See https://docs.rs/tokio/latest/tokio/macro.select.html#fairness for more
                    qconn.on_timeout();

                    // The timer fired: clear the pacing hold-off and fall
                    // back to the default sleep until a new deadline is
                    // computed on the next iteration.
                    self.write_state.next_release_time = None;
                    current_deadline = None;
                    sleep.as_mut().reset((now + DEFAULT_SLEEP).into());
                }
                // Stash the packet; it is processed at the top of the next
                // iteration together with anything else that is queued.
                Some(pkt) = incoming_recv.recv() => ctx.in_pkt = Some(pkt),
                // TODO(erittenhouse): would be nice to decouple wait_for_data from the
                // application, but wait_for_quiche relies on IOW methods, so we can't write a
                // default implementation for ConnectionStage
                status = self.wait_for_data_or_handshake(qconn, application) => status?,
            };

            if let ControlFlow::Break(reason) = self.conn_stage.post_wait(qconn) {
                return reason;
            }
        }
    }
276
277    #[inline]
278    fn gather_data_from_quiche_conn(
279        &mut self, qconn: &mut QuicheConnection, send_buf: &mut [u8],
280    ) -> QuicResult<usize> {
281        self.fill_send_buffer(qconn, send_buf)
282    }
283
284    #[cfg(feature = "perf-quic-listener-metrics")]
285    fn measure_complete_handshake_time(&mut self) {
286        if let Some(init_rx_time) = self.init_rx_time.take() {
287            if let Ok(delta) = init_rx_time.elapsed() {
288                self.metrics
289                    .handshake_time_seconds(
290                        labels::QuicHandshakeStage::HandshakeResponse,
291                    )
292                    .observe(delta.as_nanos() as u64);
293            }
294        }
295    }
296
    /// Writes as many packets from `qconn` into `send_buf` as can be sent in
    /// a single socket operation and records the result in `write_state`.
    ///
    /// Without GSO exactly one packet is written per call. With GSO, packets
    /// are appended until one of the following happens: quiche has nothing
    /// more to send, the batch is full (`UDP_MAX_SEGMENT_COUNT` packets or
    /// the send-size limit), a packet differs in size from the first packet
    /// of the batch or is smaller than [`GSO_THRESHOLD`], or (with
    /// gcongestion) the next packet has a different release time and cannot
    /// be part of a burst.
    ///
    /// Pacing: if the release time is too far in the future,
    /// `write_state.next_release_time` is set, `has_pending_data` is cleared
    /// and `Ok(0)` is returned so that the work loop waits before sending
    /// more.
    ///
    /// # Returns
    ///
    /// * `Ok(n)` with `n > 0`: the size of the last packet written (not the
    ///   batch total, which is `write_state.bytes_written`).
    /// * `Ok(0)`: quiche had nothing to send, or sending was deferred for
    ///   pacing. `write_state.bytes_written` may still be non-zero in the
    ///   non-gcongestion deferral path and in the "nothing more to send"
    ///   case after earlier packets were batched.
    /// * `Err(_)`: the error returned by `write_packet_to_buffer`.
    fn fill_send_buffer(
        &mut self, qconn: &mut QuicheConnection, send_buf: &mut [u8],
    ) -> QuicResult<usize> {
        // Size of the first packet in this batch; `None` until one packet
        // has been written.
        let mut segment_size = None;
        // `SendInfo` of the first packet in this batch.
        let mut send_info = None;

        // Start a fresh batch.
        self.write_state.num_pkts = 0;
        self.write_state.bytes_written = 0;

        let now = Instant::now();

        // Never use more of the buffer than a single GSO send can carry.
        let send_buf = {
            let trunc = UDP_MAX_GSO_PACKET_SIZE.min(send_buf.len());
            &mut send_buf[..trunc]
        };

        #[cfg(feature = "gcongestion")]
        let gcongestion_enabled = true;

        #[cfg(not(feature = "gcongestion"))]
        let gcongestion_enabled = qconn.gcongestion_enabled().unwrap_or(false);

        // With gcongestion the release decision is known before any packet
        // is generated, so pacing can be checked up front.
        let initial_release_decision = if gcongestion_enabled {
            let initial_release_decision = qconn
                .get_next_release_time()
                .filter(|_| self.pacing_enabled(qconn));

            if let Some(future_release_time) =
                initial_release_decision.as_ref().and_then(|v| v.time(now))
            {
                let max_into_fut = qconn.max_release_into_future();

                // Too far in the future: do not generate anything now and
                // ask the work loop to come back after 80% of the allowed
                // look-ahead.
                if future_release_time.duration_since(now) >= max_into_fut {
                    self.write_state.next_release_time =
                        Some(now + max_into_fut.mul_f32(0.8));
                    self.write_state.has_pending_data = false;
                    return Ok(0);
                }
            }

            initial_release_decision
        } else {
            None
        };

        // Append packets to the buffer until a flush condition is hit. The
        // loop evaluates to the outcome of the last write attempt.
        let buffer_write_outcome = loop {
            let outcome = self.write_packet_to_buffer(
                qconn,
                send_buf,
                &mut send_info,
                segment_size,
            );

            let packet_size = match outcome {
                // quiche has nothing (more) to send.
                Ok(0) => {
                    self.write_state.has_pending_data = false;

                    break Ok(0);
                },
                Ok(bytes_written) => {
                    self.write_state.has_pending_data = true;

                    bytes_written
                },
                Err(e) => break Err(e),
            };

            // Flush to network after generating a single packet when GSO
            // is disabled.
            if !self.cfg.with_gso {
                break outcome;
            }

            #[cfg(not(feature = "gcongestion"))]
            let max_send_size = if !gcongestion_enabled {
                // Only call qconn.send_quantum when !gcongestion_enabled.
                tune_max_send_size(
                    segment_size,
                    qconn.send_quantum(),
                    send_buf.len(),
                )
            } else {
                usize::MAX
            };

            #[cfg(feature = "gcongestion")]
            let max_send_size = usize::MAX;

            // If segment_size is known, update the maximum of
            // GSO sender buffer size to the multiple of
            // segment_size.
            let buffer_is_full = self.write_state.num_pkts ==
                UDP_MAX_SEGMENT_COUNT ||
                self.write_state.bytes_written >= max_send_size;

            if buffer_is_full {
                break outcome;
            }

            // Flush to network when the newly generated packet size is
            // different from previously written packet, as GSO needs packets
            // to have the same size, except for the last one in the buffer.
            // The last packet may be smaller than the previous size.
            match segment_size {
                Some(size)
                    if packet_size != size || packet_size < GSO_THRESHOLD =>
                    break outcome,
                // First packet of the batch defines the segment size.
                None => segment_size = Some(packet_size),
                _ => (),
            }

            if gcongestion_enabled {
                // If the release time of next packet is different, or it can't be
                // part of a burst, start the next batch
                if let Some(initial_release_decision) = initial_release_decision {
                    match qconn.get_next_release_time() {
                        Some(release)
                            if release.can_burst() ||
                                release.time_eq(
                                    &initial_release_decision,
                                    now,
                                ) => {},
                        _ => break outcome,
                    }
                }
            }
        };

        // Release time handed to the socket layer; `None` unless pacing is
        // enabled.
        let tx_time = if gcongestion_enabled {
            initial_release_decision
                .filter(|_| self.pacing_enabled(qconn))
                // Return the time from the release decision if release_decision.time > now, else None.
                .and_then(|v| v.time(now))
        } else {
            send_info
                .filter(|_| self.pacing_enabled(qconn))
                .map(|v| v.at)
        };

        self.write_state.conn_established = qconn.is_established();
        self.write_state.tx_time = tx_time;
        // With no recorded segment size the whole buffer content counts as
        // one segment.
        self.write_state.segment_size =
            segment_size.unwrap_or(self.write_state.bytes_written);

        // Without gcongestion the release time is only known after the
        // packets were generated, so the "too far in the future" check
        // happens here. `bytes_written` is left as is.
        if !gcongestion_enabled {
            if let Some(time) = tx_time {
                const DEFAULT_MAX_INTO_FUTURE: Duration =
                    Duration::from_millis(1);
                if time
                    .checked_duration_since(now)
                    .map(|d| d > DEFAULT_MAX_INTO_FUTURE)
                    .unwrap_or(false)
                {
                    self.write_state.next_release_time =
                        Some(now + DEFAULT_MAX_INTO_FUTURE.mul_f32(0.8));
                    self.write_state.has_pending_data = false;
                    return Ok(0);
                }
            }
        }

        buffer_write_outcome
    }
460
461    #[cfg(not(feature = "gcongestion"))]
462    fn pacing_enabled(&self, qconn: &QuicheConnection) -> bool {
463        self.cfg.pacing_offload && qconn.pacing_enabled()
464    }
465
    /// Returns `true` when release times should be handed to the socket
    /// layer. In `gcongestion` builds only the `pacing_offload` configuration
    /// flag is consulted; the connection is not queried.
    #[cfg(feature = "gcongestion")]
    fn pacing_enabled(&self, _qconn: &QuicheConnection) -> bool {
        self.cfg.pacing_offload
    }
470
471    fn write_packet_to_buffer(
472        &mut self, qconn: &mut QuicheConnection, send_buf: &mut [u8],
473        send_info: &mut Option<SendInfo>, segment_size: Option<usize>,
474    ) -> QuicResult<usize> {
475        let mut send_buf = &mut send_buf[self.write_state.bytes_written..];
476        if send_buf.len() > segment_size.unwrap_or(usize::MAX) {
477            // Never let the buffer be longer than segment size, for GSO to
478            // function properly
479            send_buf = &mut send_buf[..segment_size.unwrap_or(usize::MAX)];
480        }
481
482        match qconn.send(send_buf) {
483            Ok((packet_size, info)) => {
484                let _ = send_info.get_or_insert(info);
485
486                self.write_state.bytes_written += packet_size;
487                self.write_state.num_pkts += 1;
488                self.write_state.send_from =
489                    send_info.as_ref().map(|info| info.from);
490
491                Ok(packet_size)
492            },
493            Err(QuicheError::Done) => {
494                // Flush to network and yield when there are no
495                // more packets to write.
496                Ok(0)
497            },
498            Err(e) => {
499                let error_code = if let Some(local_error) = qconn.local_error() {
500                    local_error.error_code
501                } else {
502                    let internal_error_code =
503                        quiche::WireErrorCode::InternalError as u64;
504                    let _ = qconn.close(false, internal_error_code, &[]);
505
506                    internal_error_code
507                };
508
509                self.audit_log_stats
510                    .set_sent_conn_close_transport_error_code(error_code as i64);
511
512                Err(Box::new(e))
513            },
514        }
515    }
516
517    async fn flush_buffer_to_socket(&mut self, send_buf: &[u8]) {
518        if self.write_state.bytes_written > 0 {
519            let current_send_buf = &send_buf[..self.write_state.bytes_written];
520            let send_res = if let (Some(udp_socket), true) =
521                (self.socket.as_udp_socket(), self.cfg.with_gso)
522            {
523                // Only UDP supports GSO
524                send_to(
525                    udp_socket,
526                    self.cfg.peer_addr,
527                    self.write_state.send_from.filter(|_| self.cfg.with_pktinfo),
528                    current_send_buf,
529                    self.write_state.segment_size,
530                    self.write_state.tx_time,
531                    self.metrics
532                        .write_errors(labels::QuicWriteError::WouldBlock),
533                )
534                .await
535            } else {
536                self.socket
537                    .send_to(current_send_buf, self.cfg.peer_addr)
538                    .await
539            };
540
541            #[cfg(feature = "perf-quic-listener-metrics")]
542            self.measure_complete_handshake_time();
543
544            match send_res {
545                Ok(n) =>
546                    if n < self.write_state.bytes_written {
547                        self.metrics
548                            .write_errors(labels::QuicWriteError::Partial)
549                            .inc();
550                    },
551                Err(_) => {
552                    self.metrics.write_errors(labels::QuicWriteError::Err).inc();
553                },
554            }
555        }
556    }
557
558    /// Process the incoming packet
559    fn process_incoming(
560        &mut self, qconn: &mut QuicheConnection, mut pkt: Incoming,
561    ) -> QuicResult<()> {
562        let recv_info = quiche::RecvInfo {
563            from: pkt.peer_addr,
564            to: pkt.local_addr,
565        };
566
567        if let Some(gro) = pkt.gro {
568            for dgram in pkt.buf.chunks_mut(gro as usize) {
569                qconn.recv(dgram, recv_info)?;
570            }
571        } else {
572            qconn.recv(&mut pkt.buf, recv_info)?;
573        }
574
575        Ok(())
576    }
577
578    /// When a connection is established, process application data, if not the
579    /// task is probably polled following a wakeup from boring, so we check
580    /// if quiche has any handshake packets to send.
581    async fn wait_for_data_or_handshake<A: ApplicationOverQuic>(
582        &mut self, qconn: &mut QuicheConnection, quic_application: &mut A,
583    ) -> QuicResult<()> {
584        if quic_application.should_act() {
585            quic_application.wait_for_data(qconn).await
586        } else {
587            self.wait_for_quiche(qconn, quic_application).await
588        }
589    }
590
591    /// Check if Quiche has any packets to send and flush them to socket.
592    ///
593    /// # Example
594    ///
595    /// This function can be used, for example, to drive an asynchronous TLS
596    /// handshake. Each call to `gather_data_from_quiche_conn` attempts to
597    /// progress the handshake via a call to `quiche::Connection.send()` -
598    /// once one of the `gather_data_from_quiche_conn()` calls writes to the
599    /// send buffer, we flush it to the network socket.
600    async fn wait_for_quiche<App: ApplicationOverQuic>(
601        &mut self, qconn: &mut QuicheConnection, app: &mut App,
602    ) -> QuicResult<()> {
603        let populate_send_buf = std::future::poll_fn(|_| {
604            match self.gather_data_from_quiche_conn(qconn, app.buffer()) {
605                Ok(bytes_written) => {
606                    // We need to avoid consecutive calls to gather(), which write
607                    // data to the buffer, without a flush().
608                    // If we don't avoid those consecutive calls, we end
609                    // up overwriting data in the buffer or unnecessarily waiting
610                    // for more calls to drive_handshake()
611                    // before calling the handshake complete.
612                    if bytes_written == 0 && self.write_state.bytes_written == 0 {
613                        Poll::Pending
614                    } else {
615                        Poll::Ready(Ok(()))
616                    }
617                },
618                _ => Poll::Ready(Err(quiche::Error::TlsFail)),
619            }
620        })
621        .await;
622
623        if populate_send_buf.is_err() {
624            return Err(Box::new(quiche::Error::TlsFail));
625        }
626
627        self.flush_buffer_to_socket(app.buffer()).await;
628
629        Ok(())
630    }
631}
632
/// State of a connection whose handshake stage finished successfully.
///
/// Produced by the handshake worker's `run` (inside
/// [`RunningOrClosing::Running`]).
pub struct Running<Tx, M, A> {
    /// Stage-independent worker state recovered from the handshake worker.
    pub(crate) params: IoWorkerParams<Tx, M>,
    /// Stage context (application, incoming packet queue, ...).
    pub(crate) context: ConnectionStageContext<A>,
    /// The underlying quiche connection.
    pub(crate) qconn: QuicheConnection,
}
638
639impl<Tx, M, A> Running<Tx, M, A> {
640    pub fn ssl(&mut self) -> &mut SslRef {
641        self.qconn.as_mut()
642    }
643}
644
/// State of a connection that is about to be closed.
///
/// Produced by the handshake and running-application workers when their
/// `run` method ends.
pub(crate) struct Closing<Tx, M, A> {
    /// Stage-independent worker state recovered from the previous worker.
    pub(crate) params: IoWorkerParams<Tx, M>,
    /// Stage context (application, incoming packet queue, ...).
    pub(crate) context: ConnectionStageContext<A>,
    /// Outcome of the stage that led to closing; the close stage uses it to
    /// decide which metrics and audit fields to record.
    pub(crate) work_loop_result: QuicResult<()>,
    /// The underlying quiche connection.
    pub(crate) qconn: QuicheConnection,
}
651
/// Result of the handshake stage.
pub enum RunningOrClosing<Tx, M, A> {
    /// The handshake completed and the connection-established hook
    /// succeeded.
    Running(Running<Tx, M, A>),
    /// The handshake failed, the connection was closed, or the
    /// connection-established hook failed.
    Closing(Closing<Tx, M, A>),
}
656
657impl<Tx, M> IoWorker<Tx, M, Handshake>
658where
659    Tx: DatagramSocketSend + Send,
660    M: Metrics,
661{
662    pub(crate) async fn run<A>(
663        mut self, mut qconn: QuicheConnection, mut ctx: ConnectionStageContext<A>,
664    ) -> RunningOrClosing<Tx, M, A>
665    where
666        A: ApplicationOverQuic,
667    {
668        // This makes an assumption that the waker being set in ex_data is stable
669        // accross the active task's lifetime. Moving a future that encompasses an
670        // async callback from this task accross a channel, for example, will
671        // cause issues as this waker will then be stale and attempt to
672        // wake the wrong task.
673        std::future::poll_fn(|cx| {
674            let ssl = qconn.as_mut();
675            ssl.set_task_waker(Some(cx.waker().clone()));
676
677            Poll::Ready(())
678        })
679        .await;
680
681        let mut work_loop_result = self.work_loop(&mut qconn, &mut ctx).await;
682        if work_loop_result.is_ok() && qconn.is_closed() {
683            work_loop_result = Err(HandshakeError::ConnectionClosed.into());
684        }
685
686        if let Err(err) = &work_loop_result {
687            self.metrics.failed_handshakes(err.into()).inc();
688
689            return RunningOrClosing::Closing(Closing {
690                params: self.into(),
691                context: ctx,
692                work_loop_result,
693                qconn,
694            });
695        };
696
697        match self.on_conn_established(&mut qconn, &mut ctx.application) {
698            Ok(()) => RunningOrClosing::Running(Running {
699                params: self.into(),
700                context: ctx,
701                qconn,
702            }),
703            Err(e) => {
704                foundations::telemetry::log::warn!(
705                    "Handshake stage on_connection_established failed"; "error"=>%e
706                );
707
708                RunningOrClosing::Closing(Closing {
709                    params: self.into(),
710                    context: ctx,
711                    work_loop_result,
712                    qconn,
713                })
714            },
715        }
716    }
717
718    fn on_conn_established<App: ApplicationOverQuic>(
719        &mut self, qconn: &mut QuicheConnection, driver: &mut App,
720    ) -> QuicResult<()> {
721        // Only calculate the QUIC handshake duration and call the driver's
722        // on_conn_established hook if this is the first time
723        // is_established == true.
724        if self.audit_log_stats.transport_handshake_duration_us() == -1 {
725            self.conn_stage.handshake_info.set_elapsed();
726            let handshake_info = &self.conn_stage.handshake_info;
727
728            self.audit_log_stats
729                .set_transport_handshake_duration(handshake_info.elapsed());
730
731            driver.on_conn_established(qconn, handshake_info)?;
732        }
733
734        if let Some(cid) = self.cfg.pending_cid.take() {
735            let _ = self
736                .conn_map_cmd_tx
737                .send(ConnectionMapCommand::UnmapCid(cid));
738        }
739
740        Ok(())
741    }
742}
743
744impl<Tx, M, S> From<IoWorker<Tx, M, S>> for IoWorkerParams<Tx, M> {
745    fn from(value: IoWorker<Tx, M, S>) -> Self {
746        Self {
747            socket: value.socket,
748            shutdown_tx: value.shutdown_tx,
749            cfg: value.cfg,
750            audit_log_stats: value.audit_log_stats,
751            write_state: value.write_state,
752            conn_map_cmd_tx: value.conn_map_cmd_tx,
753            #[cfg(feature = "perf-quic-listener-metrics")]
754            init_rx_time: value.init_rx_time,
755            metrics: value.metrics,
756        }
757    }
758}
759
760impl<Tx, M> IoWorker<Tx, M, RunningApplication>
761where
762    Tx: DatagramSocketSend + Send,
763    M: Metrics,
764{
765    pub(crate) async fn run<A: ApplicationOverQuic>(
766        mut self, mut qconn: QuicheConnection, mut ctx: ConnectionStageContext<A>,
767    ) -> Closing<Tx, M, A> {
768        // Perform a single call to process_reads()/process_writes(),
769        // unconditionally, to ensure that any application data (e.g.
770        // STREAM frames or datagrams) processed by the Handshake
771        // stage are properly passed to the application.
772        if let Err(e) = self.conn_stage.on_read(true, &mut qconn, &mut ctx) {
773            return Closing {
774                params: self.into(),
775                context: ctx,
776                work_loop_result: Err(e),
777                qconn,
778            };
779        };
780
781        let work_loop_result = self.work_loop(&mut qconn, &mut ctx).await;
782
783        Closing {
784            params: self.into(),
785            context: ctx,
786            work_loop_result,
787            qconn,
788        }
789    }
790}
791
792impl<Tx, M> IoWorker<Tx, M, Close>
793where
794    Tx: DatagramSocketSend + Send,
795    M: Metrics,
796{
797    pub(crate) async fn close<A: ApplicationOverQuic>(
798        mut self, qconn: &mut QuicheConnection,
799        ctx: &mut ConnectionStageContext<A>,
800    ) {
801        if self.conn_stage.work_loop_result.is_ok() &&
802            self.bw_estimator.max_bandwidth > 0
803        {
804            let metrics = &self.metrics;
805
806            metrics
807                .max_bandwidth_mbps()
808                .observe(self.bw_estimator.max_bandwidth as f64 * 1e-6);
809
810            metrics
811                .max_loss_pct()
812                .observe(self.bw_estimator.max_loss_pct as f64 * 100.);
813        }
814
815        if ctx.application.should_act() {
816            ctx.application.on_conn_close(
817                qconn,
818                &self.metrics,
819                &self.conn_stage.work_loop_result,
820            );
821        }
822
823        // TODO: this assumes that the tidy_up operation can be completed in one
824        // send (ignoring flow/congestion control constraints). We should
825        // guarantee that it gets sent by doublechecking the
826        // gathered/flushed byte totals and retry if they don't match.
827        let _ = self.gather_data_from_quiche_conn(qconn, ctx.buffer());
828        self.flush_buffer_to_socket(ctx.buffer()).await;
829
830        *ctx.stats.lock().unwrap() = QuicConnectionStats::from_conn(qconn);
831
832        if let Some(err) = qconn.peer_error() {
833            if err.is_app {
834                self.audit_log_stats
835                    .set_recvd_conn_close_application_error_code(
836                        err.error_code as _,
837                    );
838            } else {
839                self.audit_log_stats
840                    .set_recvd_conn_close_transport_error_code(
841                        err.error_code as _,
842                    );
843            }
844        }
845
846        self.close_connection(qconn);
847
848        if let Err(work_loop_error) = self.conn_stage.work_loop_result {
849            self.audit_log_stats
850                .set_connection_close_reason(work_loop_error);
851        }
852    }
853
854    fn close_connection(&mut self, qconn: &QuicheConnection) {
855        let scid = qconn.source_id().into_owned();
856
857        if let Some(cid) = self.cfg.pending_cid.take() {
858            let _ = self
859                .conn_map_cmd_tx
860                .send(ConnectionMapCommand::UnmapCid(cid));
861        }
862
863        let _ = self
864            .conn_map_cmd_tx
865            .send(ConnectionMapCommand::RemoveScid(scid));
866
867        self.metrics.connections_in_memory().dec();
868    }
869}
870
/// Returns the smaller of the two values when both are present, the one that
/// is present when only one is, and `None` when neither is.
fn min_of_some<T: Ord>(v1: Option<T>, v2: Option<T>) -> Option<T> {
    match (v1, v2) {
        (Some(lhs), Some(rhs)) => Some(std::cmp::min(lhs, rhs)),
        // At most one side is present here; pass it through as is.
        (None, only) | (only, None) => only,
    }
}