1use std::net::SocketAddr;
28use std::ops::ControlFlow;
29use std::sync::Arc;
30use std::task::Poll;
31use std::time::Duration;
32use std::time::Instant;
33#[cfg(feature = "perf-quic-listener-metrics")]
34use std::time::SystemTime;
35
36use super::connection_stage::Close;
37use super::connection_stage::ConnectionStage;
38use super::connection_stage::ConnectionStageContext;
39use super::connection_stage::Handshake;
40use super::connection_stage::RunningApplication;
41use super::gso::*;
42use super::utilization_estimator::BandwidthReporter;
43
44use crate::metrics::labels;
45use crate::metrics::Metrics;
46use crate::quic::connection::ApplicationOverQuic;
47use crate::quic::connection::HandshakeError;
48use crate::quic::connection::Incoming;
49use crate::quic::connection::QuicConnectionStats;
50use crate::quic::router::ConnectionMapCommand;
51use crate::quic::QuicheConnection;
52use crate::QuicResult;
53
54use boring::ssl::SslRef;
55use datagram_socket::DatagramSocketSend;
56use datagram_socket::DatagramSocketSendExt;
57use datagram_socket::MaybeConnectedSocket;
58use datagram_socket::QuicAuditStats;
59use foundations::telemetry::log;
60use quiche::ConnectionId;
61use quiche::Error as QuicheError;
62use quiche::SendInfo;
63use tokio::select;
64use tokio::sync::mpsc;
65use tokio::time;
66
/// Nominal capacity of the incoming-packet queue.
///
/// NOTE(review): in this file it is only used to derive
/// `CHECK_INCOMING_QUEUE_RATIO`; the queue itself is presumably created with
/// this size elsewhere — confirm.
pub(crate) const INCOMING_QUEUE_SIZE: usize = 2_048;

/// Maximum number of packets the work loop sends in one burst before it goes
/// back to draining the incoming queue (one sixteenth of the queue size).
pub(crate) const CHECK_INCOMING_QUEUE_RATIO: usize = INCOMING_QUEUE_SIZE / 16;

/// A pacing release time that is closer than this to "now" is treated as
/// already reached instead of being waited for with a timer.
const RELEASE_TIMER_THRESHOLD: Duration = Duration::from_micros(250);

/// Size in bytes below which a packet (other than the first) closes the
/// current GSO batch.
const GSO_THRESHOLD: usize = 1000;
78
/// Per-connection settings that control how outgoing packets are written to
/// the socket.
pub struct WriterConfig {
    /// Connection ID to unmap from the router (`ConnectionMapCommand::UnmapCid`).
    /// It is taken and unmapped either when the handshake stage finishes
    /// `on_conn_established` or when the connection is closed, whichever
    /// happens first.
    pub pending_cid: Option<ConnectionId<'static>>,
    /// Destination address for every datagram sent on this connection.
    pub peer_addr: SocketAddr,
    /// Enables batching several packets into one GSO send when the socket is
    /// a UDP socket.
    pub with_gso: bool,
    /// Enables pacing offload. Packets carry a transmit time, and the
    /// pacer's release time is honoured before sending more.
    pub pacing_offload: bool,
    /// Passes the local source address reported by quiche to the GSO send
    /// path.
    pub with_pktinfo: bool,
}
86
/// Bookkeeping for the batch of packets currently held in the send buffer,
/// plus pacing state carried across batches.
///
/// `num_pkts` and `bytes_written` are reset at the start of every
/// `fill_send_buffer` call.
#[derive(Default)]
pub(crate) struct WriteState {
    /// Value of `qconn.is_established()` recorded at the end of the last
    /// buffer fill.
    conn_established: bool,
    /// Number of bytes currently written into the send buffer.
    bytes_written: usize,
    /// GSO segment size handed to the socket. When the batch never fixed
    /// one, this is the total bytes written.
    segment_size: usize,
    /// Number of packets currently written into the send buffer.
    num_pkts: usize,
    /// Transmit time handed to the socket when pacing offload is enabled.
    tx_time: Option<Instant>,
    /// Whether quiche may still have packets to send; cleared once quiche
    /// reports nothing left, or when sending is deferred for pacing.
    has_pending_data: bool,
    /// Pacing deadline. The work loop holds back sending until this is
    /// within `RELEASE_TIMER_THRESHOLD` of now, and wakes up for it.
    next_release_time: Option<Instant>,
    /// Local address from the first `SendInfo` of the current batch. It is
    /// only passed to the socket when `with_pktinfo` is set.
    send_from: Option<SocketAddr>,
}
102
/// Stage-independent state of a connection's I/O worker.
///
/// An `IoWorker` is built from these params plus a connection stage, and is
/// converted back into them (via `From<IoWorker>`) when moving to the next
/// stage.
pub(crate) struct IoWorkerParams<Tx, M> {
    /// Socket the connection's datagrams are sent on.
    pub(crate) socket: MaybeConnectedSocket<Tx>,
    // NOTE(review): not used in this part of the file; presumably signals
    // connection shutdown to a listener task — confirm.
    pub(crate) shutdown_tx: mpsc::Sender<()>,
    /// Socket-level write settings.
    pub(crate) cfg: WriterConfig,
    /// Audit statistics updated with handshake duration and close codes.
    pub(crate) audit_log_stats: Arc<QuicAuditStats>,
    /// Send-buffer bookkeeping carried across stages.
    pub(crate) write_state: WriteState,
    /// Channel used to unmap/remove this connection's IDs in the router.
    pub(crate) conn_map_cmd_tx: mpsc::UnboundedSender<ConnectionMapCommand>,
    /// Start time for the handshake-response timing metric; consumed on the
    /// first flush.
    #[cfg(feature = "perf-quic-listener-metrics")]
    pub(crate) init_rx_time: Option<SystemTime>,
    /// Metrics sink.
    pub(crate) metrics: M,
}
114
/// Drives a single connection's socket I/O while it is in stage `S`.
///
/// Holds the stage-independent state from `IoWorkerParams` together with the
/// stage itself and a bandwidth reporter.
pub(crate) struct IoWorker<Tx, M, S> {
    socket: MaybeConnectedSocket<Tx>,
    shutdown_tx: mpsc::Sender<()>,
    cfg: WriterConfig,
    audit_log_stats: Arc<QuicAuditStats>,
    write_state: WriteState,
    conn_map_cmd_tx: mpsc::UnboundedSender<ConnectionMapCommand>,
    #[cfg(feature = "perf-quic-listener-metrics")]
    init_rx_time: Option<SystemTime>,
    metrics: M,
    /// Stage-specific hooks and state (handshake, running, close).
    conn_stage: S,
    /// Updated on every work-loop iteration; its peak bandwidth and loss are
    /// reported to metrics when the connection closes.
    bw_estimator: BandwidthReporter,
}
131
132impl<Tx, M, S> IoWorker<Tx, M, S>
133where
134 Tx: DatagramSocketSend + Send,
135 M: Metrics,
136 S: ConnectionStage,
137{
138 pub(crate) fn new(params: IoWorkerParams<Tx, M>, conn_stage: S) -> Self {
139 let bw_estimator =
140 BandwidthReporter::new(params.metrics.utilized_bandwidth());
141
142 log::trace!("Creating IoWorker with stage: {conn_stage:?}");
143
144 Self {
145 socket: params.socket,
146 shutdown_tx: params.shutdown_tx,
147 cfg: params.cfg,
148 audit_log_stats: params.audit_log_stats,
149 write_state: params.write_state,
150 conn_map_cmd_tx: params.conn_map_cmd_tx,
151 #[cfg(feature = "perf-quic-listener-metrics")]
152 init_rx_time: params.init_rx_time,
153 metrics: params.metrics,
154 conn_stage,
155 bw_estimator,
156 }
157 }
158
159 async fn work_loop<A: ApplicationOverQuic>(
160 &mut self, qconn: &mut QuicheConnection,
161 ctx: &mut ConnectionStageContext<A>,
162 ) -> QuicResult<()> {
163 const DEFAULT_SLEEP: Duration = Duration::from_secs(60);
164 let mut current_deadline: Option<Instant> = None;
165 let sleep = time::sleep(DEFAULT_SLEEP);
166 tokio::pin!(sleep);
167
168 loop {
169 let now = Instant::now();
170
171 self.write_state.has_pending_data = true;
172
173 while self.write_state.has_pending_data {
174 let mut packets_sent = 0;
175
176 let mut did_recv = false;
181 while let Some(pkt) = ctx
182 .in_pkt
183 .take()
184 .or_else(|| ctx.incoming_pkt_receiver.try_recv().ok())
185 {
186 self.process_incoming(qconn, pkt)?;
187 did_recv = true;
188 }
189
190 self.conn_stage.on_read(did_recv, qconn, ctx)?;
191
192 let can_release = match self.write_state.next_release_time {
193 None => true,
194 Some(next_release) =>
195 next_release
196 .checked_duration_since(now)
197 .unwrap_or_default() <
198 RELEASE_TIMER_THRESHOLD,
199 };
200
201 self.write_state.has_pending_data &= can_release;
202
203 while self.write_state.has_pending_data &&
204 packets_sent < CHECK_INCOMING_QUEUE_RATIO
205 {
206 self.gather_data_from_quiche_conn(qconn, ctx.buffer())?;
207
208 if qconn.is_closed() {
210 return Ok(());
211 }
212
213 self.flush_buffer_to_socket(ctx.buffer()).await;
214 packets_sent += self.write_state.num_pkts;
215
216 if let ControlFlow::Break(reason) =
217 self.conn_stage.on_flush(qconn, ctx)
218 {
219 return reason;
220 }
221 }
222 }
223
224 self.bw_estimator.update(qconn, now);
225
226 let new_deadline = min_of_some(
227 qconn.timeout_instant(),
228 self.write_state.next_release_time,
229 );
230 let new_deadline =
231 min_of_some(new_deadline, self.conn_stage.wait_deadline());
232
233 if new_deadline != current_deadline {
234 current_deadline = new_deadline;
235
236 sleep
237 .as_mut()
238 .reset(new_deadline.unwrap_or(now + DEFAULT_SLEEP).into());
239 }
240
241 let incoming_recv = &mut ctx.incoming_pkt_receiver;
242 let application = &mut ctx.application;
243 select! {
244 biased;
245 () = &mut sleep => {
246 qconn.on_timeout();
253
254 self.write_state.next_release_time = None;
255 current_deadline = None;
256 sleep.as_mut().reset((now + DEFAULT_SLEEP).into());
257 }
258 Some(pkt) = incoming_recv.recv() => ctx.in_pkt = Some(pkt),
259 status = self.wait_for_data_or_handshake(qconn, application) => status?,
263 };
264
265 if let ControlFlow::Break(reason) = self.conn_stage.post_wait(qconn) {
266 return reason;
267 }
268 }
269 }
270
271 #[inline]
272 fn gather_data_from_quiche_conn(
273 &mut self, qconn: &mut QuicheConnection, send_buf: &mut [u8],
274 ) -> QuicResult<usize> {
275 self.fill_send_buffer(qconn, send_buf)
276 }
277
278 #[cfg(feature = "perf-quic-listener-metrics")]
279 fn measure_complete_handshake_time(&mut self) {
280 if let Some(init_rx_time) = self.init_rx_time.take() {
281 if let Ok(delta) = init_rx_time.elapsed() {
282 self.metrics
283 .handshake_time_seconds(
284 labels::QuicHandshakeStage::HandshakeResponse,
285 )
286 .observe(delta.as_nanos() as u64);
287 }
288 }
289 }
290
291 fn fill_send_buffer(
292 &mut self, qconn: &mut QuicheConnection, send_buf: &mut [u8],
293 ) -> QuicResult<usize> {
294 let mut segment_size = None;
295 let mut send_info = None;
296
297 self.write_state.num_pkts = 0;
298 self.write_state.bytes_written = 0;
299
300 let now = Instant::now();
301
302 let send_buf = {
303 let trunc = UDP_MAX_GSO_PACKET_SIZE.min(send_buf.len());
304 &mut send_buf[..trunc]
305 };
306
307 #[cfg(feature = "gcongestion")]
308 let gcongestion_enabled = true;
309
310 #[cfg(not(feature = "gcongestion"))]
311 let gcongestion_enabled = qconn.gcongestion_enabled().unwrap_or(false);
312
313 let initial_release_decision = if gcongestion_enabled {
314 let initial_release_decision = qconn
315 .get_next_release_time()
316 .filter(|_| self.cfg.pacing_offload);
317
318 if let Some(future_release_time) =
319 initial_release_decision.as_ref().and_then(|v| v.time(now))
320 {
321 let max_into_fut = qconn.max_release_into_future();
322
323 if future_release_time.duration_since(now) >= max_into_fut {
324 self.write_state.next_release_time =
325 Some(now + max_into_fut.mul_f32(0.8));
326 self.write_state.has_pending_data = false;
327 return Ok(0);
328 }
329 }
330
331 initial_release_decision
332 } else {
333 None
334 };
335
336 let buffer_write_outcome = loop {
337 let outcome = self.write_packet_to_buffer(
338 qconn,
339 send_buf,
340 &mut send_info,
341 segment_size,
342 );
343
344 let packet_size = match outcome {
345 Ok(0) => {
346 self.write_state.has_pending_data = false;
347
348 break Ok(0);
349 },
350 Ok(bytes_written) => {
351 self.write_state.has_pending_data = true;
352
353 bytes_written
354 },
355 Err(e) => break Err(e),
356 };
357
358 if !self.cfg.with_gso {
361 break outcome;
362 }
363
364 #[cfg(not(feature = "gcongestion"))]
365 let max_send_size = if !gcongestion_enabled {
366 tune_max_send_size(
368 segment_size,
369 qconn.send_quantum(),
370 send_buf.len(),
371 )
372 } else {
373 usize::MAX
374 };
375
376 #[cfg(feature = "gcongestion")]
377 let max_send_size = usize::MAX;
378
379 let buffer_is_full = self.write_state.num_pkts ==
383 UDP_MAX_SEGMENT_COUNT ||
384 self.write_state.bytes_written >= max_send_size;
385
386 if buffer_is_full {
387 break outcome;
388 }
389
390 match segment_size {
395 Some(size)
396 if packet_size != size || packet_size < GSO_THRESHOLD =>
397 break outcome,
398 None => segment_size = Some(packet_size),
399 _ => (),
400 }
401
402 if gcongestion_enabled {
403 if let Some(initial_release_decision) = initial_release_decision {
406 match qconn.get_next_release_time() {
407 Some(release)
408 if release.can_burst() ||
409 release.time_eq(
410 &initial_release_decision,
411 now,
412 ) => {},
413 _ => break outcome,
414 }
415 }
416 }
417 };
418
419 let tx_time = if gcongestion_enabled {
420 initial_release_decision
421 .filter(|_| self.cfg.pacing_offload)
422 .and_then(|v| v.time(now))
424 } else {
425 send_info.filter(|_| self.cfg.pacing_offload).map(|v| v.at)
426 };
427
428 self.write_state.conn_established = qconn.is_established();
429 self.write_state.tx_time = tx_time;
430 self.write_state.segment_size =
431 segment_size.unwrap_or(self.write_state.bytes_written);
432
433 if !gcongestion_enabled {
434 if let Some(time) = tx_time {
435 const DEFAULT_MAX_INTO_FUTURE: Duration =
436 Duration::from_millis(1);
437 if time
438 .checked_duration_since(now)
439 .map(|d| d > DEFAULT_MAX_INTO_FUTURE)
440 .unwrap_or(false)
441 {
442 self.write_state.next_release_time =
443 Some(now + DEFAULT_MAX_INTO_FUTURE.mul_f32(0.8));
444 self.write_state.has_pending_data = false;
445 return Ok(0);
446 }
447 }
448 }
449
450 buffer_write_outcome
451 }
452
453 fn write_packet_to_buffer(
454 &mut self, qconn: &mut QuicheConnection, send_buf: &mut [u8],
455 send_info: &mut Option<SendInfo>, segment_size: Option<usize>,
456 ) -> QuicResult<usize> {
457 let mut send_buf = &mut send_buf[self.write_state.bytes_written..];
458 if send_buf.len() > segment_size.unwrap_or(usize::MAX) {
459 send_buf = &mut send_buf[..segment_size.unwrap_or(usize::MAX)];
462 }
463
464 match qconn.send(send_buf) {
465 Ok((packet_size, info)) => {
466 let _ = send_info.get_or_insert(info);
467
468 self.write_state.bytes_written += packet_size;
469 self.write_state.num_pkts += 1;
470 self.write_state.send_from =
471 send_info.as_ref().map(|info| info.from);
472
473 Ok(packet_size)
474 },
475 Err(QuicheError::Done) => {
476 Ok(0)
479 },
480 Err(e) => {
481 if let Some(local_error) = qconn.local_error() {
482 self.audit_log_stats
483 .set_sent_conn_close_transport_error_code(
484 local_error.error_code as i64,
485 );
486 log::error!(
487 "quiche::send failed and connection closed with error_code: {}",
488 local_error.error_code
489 );
490 } else {
491 let internal_error_code =
492 quiche::WireErrorCode::InternalError as u64;
493
494 self.audit_log_stats
495 .set_sent_conn_close_transport_error_code(
496 internal_error_code as i64,
497 );
498
499 let _ = qconn.close(false, internal_error_code, &[]);
500 log::error!(
501 "quiche::send failed, closing connection with INTERNAL_ERROR: {}",
502 internal_error_code
503 );
504 }
505
506 Err(Box::new(e))
507 },
508 }
509 }
510
511 async fn flush_buffer_to_socket(&mut self, send_buf: &[u8]) {
512 if self.write_state.bytes_written > 0 {
513 let current_send_buf = &send_buf[..self.write_state.bytes_written];
514 let send_res = if let (Some(udp_socket), true) =
515 (self.socket.as_udp_socket(), self.cfg.with_gso)
516 {
517 send_to(
519 udp_socket,
520 self.cfg.peer_addr,
521 self.write_state.send_from.filter(|_| self.cfg.with_pktinfo),
522 current_send_buf,
523 self.write_state.segment_size,
524 self.write_state.num_pkts,
525 self.write_state.tx_time,
526 )
527 .await
528 } else {
529 self.socket
530 .send_to(current_send_buf, self.cfg.peer_addr)
531 .await
532 };
533
534 #[cfg(feature = "perf-quic-listener-metrics")]
535 self.measure_complete_handshake_time();
536
537 match send_res {
538 Ok(n) =>
539 if n < self.write_state.bytes_written {
540 self.metrics
541 .write_errors(labels::QuicWriteError::Partial)
542 .inc();
543 },
544 Err(_) => {
545 self.metrics.write_errors(labels::QuicWriteError::Err).inc();
546 },
547 }
548 }
549 }
550
551 fn process_incoming(
553 &mut self, qconn: &mut QuicheConnection, mut pkt: Incoming,
554 ) -> QuicResult<()> {
555 let recv_info = quiche::RecvInfo {
556 from: pkt.peer_addr,
557 to: pkt.local_addr,
558 };
559
560 if let Some(gro) = pkt.gro {
561 for dgram in pkt.buf.chunks_mut(gro as usize) {
562 qconn.recv(dgram, recv_info)?;
563 }
564 } else {
565 qconn.recv(&mut pkt.buf, recv_info)?;
566 }
567
568 Ok(())
569 }
570
571 async fn wait_for_data_or_handshake<A: ApplicationOverQuic>(
575 &mut self, qconn: &mut QuicheConnection, quic_application: &mut A,
576 ) -> QuicResult<()> {
577 if quic_application.should_act() {
578 quic_application.wait_for_data(qconn).await
579 } else {
580 self.wait_for_quiche(qconn, quic_application).await
581 }
582 }
583
584 async fn wait_for_quiche<App: ApplicationOverQuic>(
594 &mut self, qconn: &mut QuicheConnection, app: &mut App,
595 ) -> QuicResult<()> {
596 let populate_send_buf = std::future::poll_fn(|_| {
597 match self.gather_data_from_quiche_conn(qconn, app.buffer()) {
598 Ok(bytes_written) => {
599 if bytes_written == 0 && self.write_state.bytes_written == 0 {
606 Poll::Pending
607 } else {
608 Poll::Ready(Ok(()))
609 }
610 },
611 _ => Poll::Ready(Err(quiche::Error::TlsFail)),
612 }
613 })
614 .await;
615
616 if populate_send_buf.is_err() {
617 return Err(Box::new(quiche::Error::TlsFail));
618 }
619
620 self.flush_buffer_to_socket(app.buffer()).await;
621
622 Ok(())
623 }
624}
625
/// Connection state produced by a successful handshake stage.
pub struct Running<Tx, M, A> {
    /// Stage-independent worker state to build the next worker from.
    pub(crate) params: IoWorkerParams<Tx, M>,
    /// Stage context (application, packet receiver, buffers, stats).
    pub(crate) context: ConnectionStageContext<A>,
    /// The established quiche connection.
    pub(crate) qconn: QuicheConnection,
}
631
632impl<Tx, M, A> Running<Tx, M, A> {
633 pub fn ssl(&mut self) -> &mut SslRef {
634 self.qconn.as_mut()
635 }
636}
637
/// Connection state handed to the close stage once a work loop has ended.
pub(crate) struct Closing<Tx, M, A> {
    /// Stage-independent worker state to build the closing worker from.
    pub(crate) params: IoWorkerParams<Tx, M>,
    /// Stage context (application, packet receiver, buffers, stats).
    pub(crate) context: ConnectionStageContext<A>,
    /// Outcome reported by the stage that ended.
    pub(crate) work_loop_result: QuicResult<()>,
    /// The quiche connection being torn down.
    pub(crate) qconn: QuicheConnection,
}
644
/// Result of the handshake stage: the connection either proceeds to the
/// running-application stage or must be closed.
pub enum RunningOrClosing<Tx, M, A> {
    /// Handshake and `on_conn_established` succeeded.
    Running(Running<Tx, M, A>),
    /// The handshake failed, or post-handshake setup failed.
    Closing(Closing<Tx, M, A>),
}
649
650impl<Tx, M> IoWorker<Tx, M, Handshake>
651where
652 Tx: DatagramSocketSend + Send,
653 M: Metrics,
654{
655 pub(crate) async fn run<A>(
656 mut self, mut qconn: QuicheConnection, mut ctx: ConnectionStageContext<A>,
657 ) -> RunningOrClosing<Tx, M, A>
658 where
659 A: ApplicationOverQuic,
660 {
661 std::future::poll_fn(|cx| {
667 let ssl = qconn.as_mut();
668 ssl.set_task_waker(Some(cx.waker().clone()));
669
670 Poll::Ready(())
671 })
672 .await;
673
674 let mut work_loop_result = self.work_loop(&mut qconn, &mut ctx).await;
675 if work_loop_result.is_ok() && qconn.is_closed() {
676 work_loop_result = Err(HandshakeError::ConnectionClosed.into());
677 }
678
679 if let Err(err) = &work_loop_result {
680 self.metrics.failed_handshakes(err.into()).inc();
681
682 return RunningOrClosing::Closing(Closing {
683 params: self.into(),
684 context: ctx,
685 work_loop_result,
686 qconn,
687 });
688 };
689
690 match self.on_conn_established(&mut qconn, &mut ctx.application) {
691 Ok(()) => RunningOrClosing::Running(Running {
692 params: self.into(),
693 context: ctx,
694 qconn,
695 }),
696 Err(e) => {
697 foundations::telemetry::log::warn!(
698 "Handshake stage on_connection_established failed"; "error"=>%e
699 );
700
701 RunningOrClosing::Closing(Closing {
702 params: self.into(),
703 context: ctx,
704 work_loop_result,
705 qconn,
706 })
707 },
708 }
709 }
710
711 fn on_conn_established<App: ApplicationOverQuic>(
712 &mut self, qconn: &mut QuicheConnection, driver: &mut App,
713 ) -> QuicResult<()> {
714 if self.audit_log_stats.transport_handshake_duration_us() == -1 {
718 self.conn_stage.handshake_info.set_elapsed();
719 let handshake_info = &self.conn_stage.handshake_info;
720
721 self.audit_log_stats
722 .set_transport_handshake_duration(handshake_info.elapsed());
723
724 driver.on_conn_established(qconn, handshake_info)?;
725 }
726
727 if let Some(cid) = self.cfg.pending_cid.take() {
728 let _ = self
729 .conn_map_cmd_tx
730 .send(ConnectionMapCommand::UnmapCid(cid));
731 }
732
733 Ok(())
734 }
735}
736
737impl<Tx, M, S> From<IoWorker<Tx, M, S>> for IoWorkerParams<Tx, M> {
738 fn from(value: IoWorker<Tx, M, S>) -> Self {
739 Self {
740 socket: value.socket,
741 shutdown_tx: value.shutdown_tx,
742 cfg: value.cfg,
743 audit_log_stats: value.audit_log_stats,
744 write_state: value.write_state,
745 conn_map_cmd_tx: value.conn_map_cmd_tx,
746 #[cfg(feature = "perf-quic-listener-metrics")]
747 init_rx_time: value.init_rx_time,
748 metrics: value.metrics,
749 }
750 }
751}
752
753impl<Tx, M> IoWorker<Tx, M, RunningApplication>
754where
755 Tx: DatagramSocketSend + Send,
756 M: Metrics,
757{
758 pub(crate) async fn run<A: ApplicationOverQuic>(
759 mut self, mut qconn: QuicheConnection, mut ctx: ConnectionStageContext<A>,
760 ) -> Closing<Tx, M, A> {
761 let work_loop_result = self.work_loop(&mut qconn, &mut ctx).await;
762
763 Closing {
764 params: self.into(),
765 context: ctx,
766 work_loop_result,
767 qconn,
768 }
769 }
770}
771
772impl<Tx, M> IoWorker<Tx, M, Close>
773where
774 Tx: DatagramSocketSend + Send,
775 M: Metrics,
776{
777 pub(crate) async fn close<A: ApplicationOverQuic>(
778 mut self, qconn: &mut QuicheConnection,
779 ctx: &mut ConnectionStageContext<A>,
780 ) {
781 if self.conn_stage.work_loop_result.is_ok() &&
782 self.bw_estimator.max_bandwidth > 0
783 {
784 let metrics = &self.metrics;
785
786 metrics
787 .max_bandwidth_mbps()
788 .observe(self.bw_estimator.max_bandwidth as f64 * 1e-6);
789
790 metrics
791 .max_loss_pct()
792 .observe(self.bw_estimator.max_loss_pct as f64 * 100.);
793 }
794
795 if ctx.application.should_act() {
796 ctx.application.on_conn_close(
797 qconn,
798 &self.metrics,
799 &self.conn_stage.work_loop_result,
800 );
801 }
802
803 let _ = self.gather_data_from_quiche_conn(qconn, ctx.buffer());
808 self.flush_buffer_to_socket(ctx.buffer()).await;
809
810 *ctx.stats.lock().unwrap() = QuicConnectionStats::from_conn(qconn);
811
812 if let Some(err) = qconn.peer_error() {
813 if err.is_app {
814 self.audit_log_stats
815 .set_recvd_conn_close_application_error_code(
816 err.error_code as _,
817 );
818 } else {
819 self.audit_log_stats
820 .set_recvd_conn_close_transport_error_code(
821 err.error_code as _,
822 );
823 }
824 }
825
826 self.close_connection(qconn);
827
828 if let Err(work_loop_error) = self.conn_stage.work_loop_result {
829 self.audit_log_stats
830 .set_connection_close_reason(work_loop_error);
831 }
832 }
833
834 fn close_connection(&mut self, qconn: &QuicheConnection) {
835 let scid = qconn.source_id().into_owned();
836
837 if let Some(cid) = self.cfg.pending_cid.take() {
838 let _ = self
839 .conn_map_cmd_tx
840 .send(ConnectionMapCommand::UnmapCid(cid));
841 }
842
843 let _ = self
844 .conn_map_cmd_tx
845 .send(ConnectionMapCommand::RemoveScid(scid));
846
847 self.metrics.connections_in_memory().dec();
848 }
849}
850
/// Returns the smaller of two optional values, ignoring `None`s.
///
/// Yields `None` only when both inputs are `None`. On a tie the first
/// argument wins.
fn min_of_some<T: Ord>(v1: Option<T>, v2: Option<T>) -> Option<T> {
    v1.into_iter().chain(v2).min()
}