1mod client;
28pub mod connection;
30mod datagram;
31mod hooks;
34mod server;
35mod streams;
36
37use std::collections::BTreeMap;
38use std::error::Error;
39use std::fmt;
40use std::marker::PhantomData;
41use std::sync::Arc;
42
43use datagram_socket::StreamClosureKind;
44use foundations::telemetry::log;
45use futures::FutureExt;
46use futures_util::stream::FuturesUnordered;
47use quiche::h3;
48use tokio::select;
49use tokio::sync::mpsc;
50use tokio::sync::mpsc::error::TryRecvError;
51use tokio::sync::mpsc::error::TrySendError;
52use tokio::sync::mpsc::UnboundedReceiver;
53use tokio::sync::mpsc::UnboundedSender;
54use tokio_stream::StreamExt;
55use tokio_util::sync::PollSender;
56
57use self::hooks::DriverHooks;
58use self::hooks::InboundHeaders;
59use self::streams::FlowCtx;
60use self::streams::HaveUpstreamCapacity;
61use self::streams::ReceivedDownstreamData;
62use self::streams::StreamCtx;
63use self::streams::StreamReady;
64use self::streams::WaitForDownstreamData;
65use self::streams::WaitForStream;
66use self::streams::WaitForUpstreamCapacity;
67use crate::buf_factory::BufFactory;
68use crate::buf_factory::PooledBuf;
69use crate::buf_factory::PooledDgram;
70use crate::http3::settings::Http3Settings;
71use crate::http3::H3AuditStats;
72use crate::metrics::Metrics;
73use crate::quic::HandshakeInfo;
74use crate::quic::QuicCommand;
75use crate::quic::QuicheConnection;
76use crate::ApplicationOverQuic;
77use crate::QuicResult;
78
79pub use self::client::ClientEventStream;
80pub use self::client::ClientH3Command;
81pub use self::client::ClientH3Controller;
82pub use self::client::ClientH3Driver;
83pub use self::client::ClientH3Event;
84pub use self::client::ClientRequestSender;
85pub use self::client::NewClientRequest;
86pub use self::server::ServerEventStream;
87pub use self::server::ServerH3Command;
88pub use self::server::ServerH3Controller;
89pub use self::server::ServerH3Driver;
90pub use self::server::ServerH3Event;
91
/// Priority used for outgoing header frames that do not carry an explicit
/// priority of their own (see `process_write_frame`).
const DEFAULT_PRIO: h3::Priority = h3::Priority::new(3, true);

/// Stream capacity for builds without `test` or `debug_assertions`.
// NOTE(review): not referenced in this part of the file; presumably it sizes
// the per-stream channels created in the `streams`/hooks code — confirm.
#[cfg(not(any(test, debug_assertions)))]
const STREAM_CAPACITY: usize = 16;
/// Stream capacity for test and debug builds, reduced to a single slot.
// NOTE(review): presumably kept tiny so backpressure paths are exercised in
// tests — confirm against the users of this constant.
#[cfg(any(test, debug_assertions))]
const STREAM_CAPACITY: usize = 1;
/// Capacity of the datagram channels: used both for the driver-wide outbound
/// datagram channel and for each per-flow context.
const FLOW_CAPACITY: usize = 2048;
106
/// Sending half used to hand [`OutboundFrame`]s to the driver.
pub type OutboundFrameSender = PollSender<OutboundFrame>;

/// Receiving half on which the driver reads [`OutboundFrame`]s.
type OutboundFrameStream = mpsc::Receiver<OutboundFrame>;

/// Sending half on which the driver delivers [`InboundFrame`]s.
type InboundFrameSender = PollSender<InboundFrame>;

/// Receiving half from which [`InboundFrame`]s produced by the driver are
/// read.
pub type InboundFrameStream = mpsc::Receiver<InboundFrame>;
122
/// Errors that terminate the driver's work on a connection.
#[derive(Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum H3ConnectionError {
    /// The controller side of a channel is gone: sending an event failed or
    /// the datagram channel reported a disconnect.
    ControllerWentAway,
    /// An error reported by the HTTP/3 layer (transport errors from quiche
    /// are wrapped in `h3::Error::TransportError`).
    H3(h3::Error),
    /// Returned when a GOAWAY event is polled from the HTTP/3 connection.
    GoAway,
    /// Data was signalled for a stream that is not in the stream map.
    NonexistentStream,
    /// A post-accept timeout was hit.
    // NOTE(review): not produced in this part of the file; presumably raised
    // by the server hooks — confirm.
    PostAcceptTimeout,
}
142
143impl From<h3::Error> for H3ConnectionError {
144 fn from(err: h3::Error) -> Self {
145 H3ConnectionError::H3(err)
146 }
147}
148
149impl From<quiche::Error> for H3ConnectionError {
150 fn from(err: quiche::Error) -> Self {
151 H3ConnectionError::H3(h3::Error::TransportError(err))
152 }
153}
154
// Marker impl: the default `Error` methods are sufficient; the `Debug` and
// `Display` requirements are satisfied by the derive and the impl in this file.
impl Error for H3ConnectionError {}
156
157impl fmt::Display for H3ConnectionError {
158 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
159 let s: &dyn fmt::Display = match self {
160 Self::ControllerWentAway => &"controller went away",
161 Self::H3(e) => e,
162 Self::GoAway => &"goaway",
163 Self::NonexistentStream => &"nonexistent stream",
164 Self::PostAcceptTimeout => &"post accept timeout hit",
165 };
166
167 write!(f, "H3ConnectionError: {s}")
168 }
169}
170
/// Result alias for driver operations that fail with [`H3ConnectionError`].
type H3ConnectionResult<T> = Result<T, H3ConnectionError>;
172
/// Headers received on a stream, bundled with the channels associated with
/// that stream.
pub struct IncomingH3Headers {
    /// ID of the stream the headers arrived on.
    pub stream_id: u64,
    /// The received header list.
    pub headers: Vec<h3::Header>,
    /// Sender for [`OutboundFrame`]s.
    // NOTE(review): presumably frames sent here are written to the peer on
    // this stream — confirm where the struct is constructed.
    pub send: OutboundFrameSender,
    /// Receiver for [`InboundFrame`]s.
    // NOTE(review): presumably yields body data read from this stream —
    // confirm where the struct is constructed.
    pub recv: InboundFrameStream,
    /// Read-side fin flag.
    // NOTE(review): presumably true when the peer finished the stream right
    // after the headers — confirm.
    pub read_fin: bool,
    /// Shared audit statistics handle.
    pub h3_audit_stats: Arc<H3AuditStats>,
}
193
194impl fmt::Debug for IncomingH3Headers {
195 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196 f.debug_struct("IncomingH3Headers")
197 .field("stream_id", &self.stream_id)
198 .field("headers", &self.headers)
199 .field("read_fin", &self.read_fin)
200 .field("h3_audit_stats", &self.h3_audit_stats)
201 .finish()
202 }
203}
204
/// Events the driver sends to the controller over the event channel.
#[derive(Debug)]
pub enum H3Event {
    /// The peer's raw settings, forwarded once they become available.
    IncomingSettings {
        /// Raw `(identifier, value)` pairs as reported by the HTTP/3
        /// connection.
        settings: Vec<(u64, u64)>,
    },

    /// Headers were received on a stream.
    IncomingHeaders(IncomingH3Headers),

    /// A datagram arrived for a flow ID that had no context yet.
    NewFlow {
        /// The flow ID taken from the datagram.
        flow_id: u64,
        /// Clone of the driver's outbound datagram sender.
        send: OutboundFrameSender,
        /// Receiver paired with the newly created flow context.
        recv: InboundFrameStream,
    },
    /// The HTTP/3 layer reported a reset for `stream_id`.
    ResetStream { stream_id: u64 },
    /// The work loop ended with an HTTP/3 error.
    ConnectionError(h3::Error),
    /// The connection is shutting down, optionally with the error that
    /// caused it.
    ConnectionShutdown(Option<H3ConnectionError>),
    /// Body progress on a stream: either `num_bytes` of data were read
    /// (`fin == false`) or the stream finished (`num_bytes == 0`,
    /// `fin == true`).
    BodyBytesReceived {
        /// Stream the bytes belong to.
        stream_id: u64,
        /// Number of body bytes read in this step.
        num_bytes: u64,
        /// Whether this event marks the end of the stream's body.
        fin: bool,
    },
    /// The driver removed `stream_id` from its stream map. Only emitted when
    /// the QUIC connection is a server.
    StreamClosed { stream_id: u64 },
}
255
256impl H3Event {
257 fn from_error(err: &H3ConnectionError) -> Option<Self> {
259 Some(match err {
260 H3ConnectionError::H3(e) => Self::ConnectionError(*e),
261 H3ConnectionError::PostAcceptTimeout => Self::ConnectionShutdown(
262 Some(H3ConnectionError::PostAcceptTimeout),
263 ),
264 _ => return None,
265 })
266 }
267}
268
/// Frames handed to the driver for transmission.
#[derive(Debug)]
pub enum OutboundFrame {
    /// Headers to send, with an optional priority; the driver falls back to
    /// its default priority when none is given.
    Headers(Vec<h3::Header>, Option<quiche::h3::Priority>),
    /// Body bytes plus a fin flag (zero-copy buffer variant).
    #[cfg(feature = "zero-copy")]
    Body(crate::buf_factory::QuicheBuf, bool),
    /// Body bytes plus a fin flag.
    #[cfg(not(feature = "zero-copy"))]
    Body(PooledBuf, bool),
    /// A datagram and the flow ID it is sent on.
    Datagram(PooledDgram, u64),
    /// Signals an error on the stream; the driver closes the stream with the
    /// HTTP/3 `MessageError` code in both directions.
    PeerStreamError,
    /// Asks the driver to shut down `stream_id` and drop the context of
    /// `flow_id`.
    FlowShutdown { flow_id: u64, stream_id: u64 },
}
291
292impl OutboundFrame {
293 pub fn body(body: PooledBuf, fin: bool) -> Self {
295 #[cfg(feature = "zero-copy")]
296 let body = crate::buf_factory::QuicheBuf::new(body);
297
298 OutboundFrame::Body(body, fin)
299 }
300}
301
/// Frames the driver delivers after reading them from the connection.
#[derive(Debug)]
pub enum InboundFrame {
    /// Body bytes read from a stream; the flag is `true` on the final
    /// (empty) frame that signals the end of the body.
    Body(PooledBuf, bool),
    /// A received datagram.
    Datagram(PooledDgram),
}
312
/// Drives an HTTP/3 connection on top of a QUIC connection, exchanging
/// events and commands with an [`H3Controller`] over channels.
pub struct H3Driver<H: DriverHooks> {
    /// Configuration used to create the HTTP/3 connection once the QUIC
    /// connection is established.
    h3_config: h3::Config,
    /// The HTTP/3 connection; `None` until `on_conn_established` runs.
    conn: Option<h3::Connection>,
    /// Endpoint-specific behavior supplied through [`DriverHooks`].
    hooks: H,
    /// Sends events to the controller.
    h3_event_sender: mpsc::UnboundedSender<H::Event>,
    /// Receives commands from the controller.
    cmd_recv: mpsc::UnboundedReceiver<H::Command>,

    /// Per-stream state, keyed by stream ID.
    stream_map: BTreeMap<u64, StreamCtx>,
    /// Per-flow datagram state, keyed by flow ID.
    flow_map: BTreeMap<u64, FlowCtx>,
    /// Futures of streams waiting for channel readiness; each resolves to a
    /// `StreamReady` value handled by `upstream_ready`.
    waiting_streams: FuturesUnordered<WaitForStream>,

    /// Receives outbound datagrams and flow shutdown requests.
    dgram_recv: OutboundFrameStream,
    /// Sender paired with `dgram_recv`; cloned into every `NewFlow` event.
    dgram_send: OutboundFrameSender,

    /// Scratch buffer for reading body data; also exposed via `buffer()`.
    pooled_buf: PooledBuf,
    /// Largest stream ID inserted so far; used as the GOAWAY ID and reported
    /// as a metric on close.
    max_stream_seen: u64,

    /// Set once the peer's settings have been sent to the controller so they
    /// are forwarded only once.
    settings_received_and_forwarded: bool,
}
361
362impl<H: DriverHooks> H3Driver<H> {
363 pub fn new(http3_settings: Http3Settings) -> (Self, H3Controller<H>) {
369 let (dgram_send, dgram_recv) = mpsc::channel(FLOW_CAPACITY);
370 let (cmd_sender, cmd_recv) = mpsc::unbounded_channel();
371 let (h3_event_sender, h3_event_recv) = mpsc::unbounded_channel();
372
373 (
374 H3Driver {
375 h3_config: (&http3_settings).into(),
376 conn: None,
377 hooks: H::new(&http3_settings),
378 h3_event_sender,
379 cmd_recv,
380
381 stream_map: BTreeMap::new(),
382 flow_map: BTreeMap::new(),
383
384 dgram_recv,
385 dgram_send: PollSender::new(dgram_send),
386 pooled_buf: BufFactory::get_max_buf(),
387 max_stream_seen: 0,
388
389 waiting_streams: FuturesUnordered::new(),
390
391 settings_received_and_forwarded: false,
392 },
393 H3Controller {
394 cmd_sender,
395 h3_event_recv: Some(h3_event_recv),
396 },
397 )
398 }
399
400 fn get_or_insert_flow(
403 &mut self, flow_id: u64,
404 ) -> H3ConnectionResult<&mut FlowCtx> {
405 use std::collections::btree_map::Entry;
406 Ok(match self.flow_map.entry(flow_id) {
407 Entry::Vacant(e) => {
408 let (flow, recv) = FlowCtx::new(FLOW_CAPACITY);
410 let flow_req = H3Event::NewFlow {
411 flow_id,
412 recv,
413 send: self.dgram_send.clone(),
414 };
415 self.h3_event_sender
416 .send(flow_req.into())
417 .map_err(|_| H3ConnectionError::ControllerWentAway)?;
418 e.insert(flow)
419 },
420 Entry::Occupied(e) => e.into_mut(),
421 })
422 }
423
424 fn insert_stream(&mut self, stream_id: u64, ctx: StreamCtx) {
426 self.stream_map.insert(stream_id, ctx);
427 self.max_stream_seen = self.max_stream_seen.max(stream_id);
428 }
429
/// Reads available body data for `stream_id` from the HTTP/3 connection
/// and forwards it on the stream's inbound channel until the channel, the
/// connection, or the stream stops it.
///
/// # Errors
///
/// Fails if the HTTP/3 connection is not present or if `stream_id` is not
/// in the stream map; also propagates errors from `finish_stream` and
/// `process_h3_fin`.
fn process_h3_data(
    &mut self, qconn: &mut QuicheConnection, stream_id: u64,
) -> H3ConnectionResult<()> {
    let conn = self.conn.as_mut().ok_or(Self::connection_not_present())?;
    let ctx = self
        .stream_map
        .get_mut(&stream_id)
        .ok_or(H3ConnectionError::NonexistentStream)?;

    // Outcome of the forwarding loop below.
    enum StreamStatus {
        // Nothing more to do right now; `close` asks for the stream to be
        // finished.
        Done { close: bool },
        // The channel is full while there is still something to deliver.
        Blocked,
    }

    let status = loop {
        // Without a usable sender there is nowhere to deliver data.
        let Some(sender) = ctx.send.as_ref().and_then(PollSender::get_ref)
        else {
            break StreamStatus::Done { close: false };
        };

        // Reserve a slot first so data is only read from the connection
        // when it can actually be delivered.
        let permit = match sender.try_reserve() {
            Ok(permit) => permit,
            Err(TrySendError::Closed(())) => {
                // The receiving side is gone; close only once both
                // directions have seen fin.
                break StreamStatus::Done {
                    close: ctx.fin_sent && ctx.fin_recv,
                };
            },
            Err(TrySendError::Full(())) => {
                // Wait for capacity only if a fin or readable data is
                // pending.
                if ctx.fin_recv || qconn.stream_readable(stream_id) {
                    break StreamStatus::Blocked;
                }
                break StreamStatus::Done { close: false };
            },
        };

        if ctx.fin_recv {
            // Signal the end of the body with an empty frame flagged fin.
            permit
                .send(InboundFrame::Body(BufFactory::get_empty_buf(), true));
            break StreamStatus::Done {
                close: ctx.fin_sent,
            };
        }

        match conn.recv_body(qconn, stream_id, &mut self.pooled_buf) {
            Ok(n) => {
                // Hand the filled buffer off and install a fresh one for
                // the next read.
                let mut body = std::mem::replace(
                    &mut self.pooled_buf,
                    BufFactory::get_max_buf(),
                );
                body.truncate(n);

                ctx.audit_stats.add_downstream_bytes_recvd(n as u64);
                let event = H3Event::BodyBytesReceived {
                    stream_id,
                    num_bytes: n as u64,
                    fin: false,
                };
                // Best effort: a failed event send does not stop the data
                // path.
                let _ = self.h3_event_sender.send(event.into());

                permit.send(InboundFrame::Body(body, false));
            },
            // No more data available right now.
            Err(h3::Error::Done) =>
                break StreamStatus::Done { close: false },
            // Any other read error closes the stream.
            Err(_) => break StreamStatus::Done { close: true },
        }
    };

    match status {
        StreamStatus::Done { close } => {
            if close {
                return self.finish_stream(qconn, stream_id, None, None);
            }

            // The stream finished on the QUIC level but fin has not been
            // recorded yet: handle it now.
            if !ctx.fin_recv && qconn.stream_finished(stream_id) {
                return self.process_h3_fin(qconn, stream_id);
            }
        },
        StreamStatus::Blocked => {
            // Resume via `upstream_write_ready` once the wait resolves.
            self.waiting_streams.push(ctx.wait_for_send(stream_id));
        },
    }

    Ok(())
}
527
528 fn process_h3_fin(
530 &mut self, qconn: &mut QuicheConnection, stream_id: u64,
531 ) -> H3ConnectionResult<()> {
532 let ctx = self.stream_map.get_mut(&stream_id).filter(|c| !c.fin_recv);
533 let Some(ctx) = ctx else {
534 return Ok(());
536 };
537
538 ctx.fin_recv = true;
539 ctx.audit_stats
540 .set_recvd_stream_fin(StreamClosureKind::Explicit);
541
542 let event = H3Event::BodyBytesReceived {
546 stream_id,
547 num_bytes: 0,
548 fin: true,
549 };
550 let _ = self.h3_event_sender.send(event.into());
551
552 self.process_h3_data(qconn, stream_id)
555 }
556
557 fn process_read_event(
561 &mut self, qconn: &mut QuicheConnection, stream_id: u64, event: h3::Event,
562 ) -> H3ConnectionResult<()> {
563 self.forward_settings()?;
564
565 match event {
566 h3::Event::Headers { list, more_frames } =>
568 H::headers_received(self, qconn, InboundHeaders {
569 stream_id,
570 headers: list,
571 has_body: more_frames,
572 }),
573
574 h3::Event::Data => self.process_h3_data(qconn, stream_id),
575 h3::Event::Finished => self.process_h3_fin(qconn, stream_id),
576
577 h3::Event::Reset(code) => {
578 if let Some(ctx) = self.stream_map.get(&stream_id) {
579 ctx.audit_stats.set_recvd_reset_stream_error_code(code as _);
580 }
581
582 self.h3_event_sender
583 .send(H3Event::ResetStream { stream_id }.into())
584 .map_err(|_| H3ConnectionError::ControllerWentAway)?;
585
586 self.finish_stream(qconn, stream_id, None, None)
587 },
588
589 h3::Event::PriorityUpdate => Ok(()),
590 h3::Event::GoAway => Err(H3ConnectionError::GoAway),
591 }
592 }
593
594 fn forward_settings(&mut self) -> H3ConnectionResult<()> {
600 if self.settings_received_and_forwarded {
601 return Ok(());
602 }
603
604 if let Some(settings) = self.conn_mut()?.peer_settings_raw() {
606 let incoming_settings = H3Event::IncomingSettings {
607 settings: settings.to_vec(),
608 };
609
610 self.h3_event_sender
611 .send(incoming_settings.into())
612 .map_err(|_| H3ConnectionError::ControllerWentAway)?;
613
614 self.settings_received_and_forwarded = true;
615 }
616 Ok(())
617 }
618
/// Tries to write the frame queued in `ctx` to the HTTP/3 connection.
///
/// Returns `Ok(())` when there is no queued frame or the frame was
/// written completely. The queued frame itself is not cleared here.
///
/// # Errors
///
/// * `StreamBlocked` when only part of a body could be written.
/// * `MessageError` for a `PeerStreamError` frame.
/// * `TransportError(Done)` after the final body frame was written on a
///   stream whose fin was already received.
/// * Any error returned by the HTTP/3 send functions.
///
/// # Panics
///
/// Panics on a body frame with fin if `ctx.recv` is `None`, and on
/// `FlowShutdown` or `Datagram` frames, which are not expected on streams.
fn process_write_frame(
    conn: &mut h3::Connection, qconn: &mut QuicheConnection,
    ctx: &mut StreamCtx,
) -> h3::Result<()> {
    let Some(frame) = &mut ctx.queued_frame else {
        return Ok(());
    };

    let audit_stats = &ctx.audit_stats;
    let stream_id = audit_stats.stream_id();

    match frame {
        OutboundFrame::Headers(headers, priority) => {
            let prio = priority.as_ref().unwrap_or(&DEFAULT_PRIO);

            // The first header block goes out as the response; later ones
            // are sent as additional headers.
            if ctx.initial_headers_sent {
                conn.send_additional_headers_with_priority(
                    qconn, stream_id, headers, prio, false, false,
                )
            } else {
                conn.send_response_with_priority(
                    qconn, stream_id, headers, prio, false,
                )
                // Only mark the headers as sent when the send succeeded.
                .inspect(|_| ctx.initial_headers_sent = true)
            }
        },

        OutboundFrame::Body(body, fin) => {
            let len = body.as_ref().len();
            // An empty body without fin carries nothing to send.
            if len == 0 && !*fin {
                return Ok(());
            }
            if *fin {
                // No further frames are accepted after the final one.
                ctx.recv.as_mut().expect("channel").close();
            }
            #[cfg(feature = "zero-copy")]
            let n = conn.send_body_zc(qconn, stream_id, body, *fin)?;

            #[cfg(not(feature = "zero-copy"))]
            let n = conn.send_body(qconn, stream_id, body, *fin)?;

            audit_stats.add_downstream_bytes_sent(n as _);
            if n != len {
                // Partial write: drop the written prefix so a retry
                // continues with the remainder.
                // NOTE(review): presumably the zero-copy send consumes the
                // written bytes from `body` itself — confirm.
                #[cfg(not(feature = "zero-copy"))]
                body.pop_front(n);

                Err(h3::Error::StreamBlocked)
            } else {
                if *fin {
                    ctx.fin_sent = true;
                    audit_stats
                        .set_sent_stream_fin(StreamClosureKind::Explicit);
                    if ctx.fin_recv {
                        // Both directions are finished; this error makes
                        // `process_writable_stream` finish the stream.
                        return Err(h3::Error::TransportError(
                            quiche::Error::Done,
                        ));
                    }
                }
                Ok(())
            }
        },

        OutboundFrame::PeerStreamError => Err(h3::Error::MessageError),

        OutboundFrame::FlowShutdown { .. } => {
            unreachable!("Only flows send shutdowns")
        },

        OutboundFrame::Datagram(..) => {
            unreachable!("Only flows send datagrams")
        },
    }
}
709
710 fn upstream_ready(
718 &mut self, qconn: &mut QuicheConnection, ready: StreamReady,
719 ) -> H3ConnectionResult<()> {
720 match ready {
721 StreamReady::Downstream(r) => self.upstream_read_ready(qconn, r),
722 StreamReady::Upstream(r) => self.upstream_write_ready(qconn, r),
723 }
724 }
725
726 fn upstream_read_ready(
727 &mut self, qconn: &mut QuicheConnection,
728 read_ready: ReceivedDownstreamData,
729 ) -> H3ConnectionResult<()> {
730 let ReceivedDownstreamData {
731 stream_id,
732 chan,
733 data,
734 } = read_ready;
735
736 match self.stream_map.get_mut(&stream_id) {
737 None => Ok(()),
738 Some(stream) => {
739 stream.recv = Some(chan);
740 stream.queued_frame = data;
741 self.process_writable_stream(qconn, stream_id)
742 },
743 }
744 }
745
746 fn upstream_write_ready(
747 &mut self, qconn: &mut QuicheConnection,
748 write_ready: HaveUpstreamCapacity,
749 ) -> H3ConnectionResult<()> {
750 let HaveUpstreamCapacity {
751 stream_id,
752 mut chan,
753 } = write_ready;
754
755 match self.stream_map.get_mut(&stream_id) {
756 None => Ok(()),
757 Some(stream) => {
758 chan.abort_send(); stream.send = Some(chan);
760 self.process_h3_data(qconn, stream_id)
761 },
762 }
763 }
764
765 fn dgram_ready(
767 &mut self, qconn: &mut QuicheConnection, frame: OutboundFrame,
768 ) -> H3ConnectionResult<()> {
769 let mut frame = Ok(frame);
770
771 loop {
772 match frame {
773 Ok(OutboundFrame::Datagram(dgram, flow_id)) => {
774 let _ = datagram::send_h3_dgram(qconn, flow_id, dgram);
776 },
777 Ok(OutboundFrame::FlowShutdown { flow_id, stream_id }) => {
778 self.finish_stream(
779 qconn,
780 stream_id,
781 Some(quiche::h3::WireErrorCode::NoError as u64),
782 Some(quiche::h3::WireErrorCode::NoError as u64),
783 )?;
784 self.flow_map.remove(&flow_id);
785 break;
786 },
787 Ok(_) => unreachable!("Flows can't send frame of other types"),
788 Err(TryRecvError::Empty) => break,
789 Err(TryRecvError::Disconnected) =>
790 return Err(H3ConnectionError::ControllerWentAway),
791 }
792
793 frame = self.dgram_recv.try_recv();
794 }
795
796 Ok(())
797 }
798
799 fn conn_mut(&mut self) -> H3ConnectionResult<&mut h3::Connection> {
804 self.conn.as_mut().ok_or(Self::connection_not_present())
805 }
806
807 const fn connection_not_present() -> H3ConnectionError {
811 H3ConnectionError::H3(h3::Error::TransportError(quiche::Error::TlsFail))
812 }
813
814 fn finish_stream(
818 &mut self, qconn: &mut QuicheConnection, stream_id: u64,
819 read: Option<u64>, write: Option<u64>,
820 ) -> H3ConnectionResult<()> {
821 let Some(stream_ctx) = self.stream_map.remove(&stream_id) else {
822 return Ok(());
823 };
824
825 let audit_stats = &stream_ctx.audit_stats;
826
827 if let Some(err) = read {
828 audit_stats.set_sent_stop_sending_error_code(err as _);
829 let _ = qconn.stream_shutdown(stream_id, quiche::Shutdown::Read, err);
830 }
831
832 if let Some(err) = write {
833 audit_stats.set_sent_reset_stream_error_code(err as _);
834 let _ =
835 qconn.stream_shutdown(stream_id, quiche::Shutdown::Write, err);
836 }
837
838 for pending in self.waiting_streams.iter_mut() {
840 match pending {
841 WaitForStream::Downstream(WaitForDownstreamData {
842 stream_id: id,
843 chan: Some(chan),
844 }) if stream_id == *id => {
845 chan.close();
846 },
847 WaitForStream::Upstream(WaitForUpstreamCapacity {
848 stream_id: id,
849 chan: Some(chan),
850 }) if stream_id == *id => {
851 chan.close();
852 },
853 _ => {},
854 }
855 }
856
857 if let Some(mapped_flow_id) = stream_ctx.associated_dgram_flow_id {
860 self.flow_map.remove(&mapped_flow_id);
861 }
862
863 if qconn.is_server() {
864 let _ = self
866 .h3_event_sender
867 .send(H3Event::StreamClosed { stream_id }.into());
868 }
869
870 Ok(())
871 }
872
873 fn handle_core_command(
876 &mut self, qconn: &mut QuicheConnection, cmd: H3Command,
877 ) -> H3ConnectionResult<()> {
878 match cmd {
879 H3Command::QuicCmd(cmd) => cmd.execute(qconn),
880 H3Command::GoAway => {
881 let max_id = self.max_stream_seen;
882 self.conn_mut()
883 .expect("connection should be established")
884 .send_goaway(qconn, max_id)?;
885 },
886 }
887 Ok(())
888 }
889}
890
891impl<H: DriverHooks> H3Driver<H> {
892 fn process_available_dgrams(
895 &mut self, qconn: &mut QuicheConnection,
896 ) -> H3ConnectionResult<()> {
897 loop {
898 match datagram::receive_h3_dgram(qconn) {
899 Ok((flow_id, dgram)) => {
900 self.get_or_insert_flow(flow_id)?.send_best_effort(dgram);
901 },
902 Err(quiche::Error::Done) => return Ok(()),
903 Err(err) => return Err(H3ConnectionError::from(err)),
904 }
905 }
906 }
907
/// Writes as many queued frames as possible for `stream_id`.
///
/// Each round writes the queued frame and then tries to fetch the next
/// one from the stream's receiver. The loop stops when the write is
/// blocked, the receiver is missing, disconnected, or empty (in which
/// case a wait is registered), or an error finishes the stream.
///
/// Does nothing if the stream is not in the stream map.
///
/// # Errors
///
/// Fails if the HTTP/3 connection is not present; otherwise propagates
/// the result of `finish_stream`.
fn process_writable_stream(
    &mut self, qconn: &mut QuicheConnection, stream_id: u64,
) -> H3ConnectionResult<()> {
    let conn = self.conn.as_mut().ok_or(Self::connection_not_present())?;
    let Some(ctx) = self.stream_map.get_mut(&stream_id) else {
        return Ok(());
    };

    loop {
        match Self::process_write_frame(conn, qconn, ctx) {
            // Written completely (or nothing was queued).
            Ok(()) => ctx.queued_frame = None,
            // Keep the queued frame and retry on a later call.
            Err(h3::Error::StreamBlocked | h3::Error::Done) => break,
            // A stream error was signalled: shut down both directions with
            // the HTTP/3 message error code.
            Err(h3::Error::MessageError) => {
                return self.finish_stream(
                    qconn,
                    stream_id,
                    Some(quiche::h3::WireErrorCode::MessageError as u64),
                    Some(quiche::h3::WireErrorCode::MessageError as u64),
                );
            },
            // The write was stopped with code `e`: record it and shut down
            // the read side with the same code.
            Err(h3::Error::TransportError(quiche::Error::StreamStopped(
                e,
            ))) => {
                ctx.audit_stats.set_recvd_stop_sending_error_code(e as i64);
                return self.finish_stream(qconn, stream_id, Some(e), None);
            },
            // Finish the stream named by the error itself.
            Err(h3::Error::TransportError(
                quiche::Error::InvalidStreamState(stream),
            )) => {
                return self.finish_stream(qconn, stream, None, None);
            },
            // Everything else, including the `TransportError(Done)` that
            // `process_write_frame` returns once both fins are done.
            Err(_) => {
                return self.finish_stream(qconn, stream_id, None, None);
            },
        }

        // Without a receiver there is no source for further frames.
        let Some(recv) = ctx.recv.as_mut() else {
            return Ok(());
        };

        match recv.try_recv() {
            Ok(frame) => ctx.queued_frame = Some(frame),
            Err(TryRecvError::Disconnected) => break,
            Err(TryRecvError::Empty) => {
                // Resume via `upstream_read_ready` once a frame arrives.
                self.waiting_streams.push(ctx.wait_for_recv(stream_id));
                break;
            },
        }
    }

    Ok(())
}
970
971 fn record_quiche_error(qconn: &mut QuicheConnection, metrics: &impl Metrics) {
974 if let Some(err) = qconn.local_error() {
976 if err.is_app {
977 metrics.local_h3_conn_close_error_count(err.error_code.into())
978 } else {
979 metrics.local_quic_conn_close_error_count(err.error_code.into())
980 }
981 .inc();
982 } else if let Some(err) = qconn.peer_error() {
983 if err.is_app {
984 metrics.peer_h3_conn_close_error_count(err.error_code.into())
985 } else {
986 metrics.peer_quic_conn_close_error_count(err.error_code.into())
987 }
988 .inc();
989 }
990 }
991}
992
993impl<H: DriverHooks> ApplicationOverQuic for H3Driver<H> {
994 fn on_conn_established(
995 &mut self, quiche_conn: &mut QuicheConnection,
996 handshake_info: &HandshakeInfo,
997 ) -> QuicResult<()> {
998 let conn = h3::Connection::with_transport(quiche_conn, &self.h3_config)?;
999 self.conn = Some(conn);
1000
1001 H::conn_established(self, quiche_conn, handshake_info)?;
1002 Ok(())
1003 }
1004
/// Returns `true` once the HTTP/3 connection has been created.
#[inline]
fn should_act(&self) -> bool {
    self.conn.is_some()
}
1009
/// Exposes the driver's pooled scratch buffer as a mutable byte slice.
#[inline]
fn buffer(&mut self) -> &mut [u8] {
    &mut self.pooled_buf
}
1014
1015 fn process_reads(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
1021 loop {
1022 match self.conn_mut()?.poll(qconn) {
1023 Ok((stream_id, event)) =>
1024 self.process_read_event(qconn, stream_id, event)?,
1025 Err(h3::Error::Done) => break,
1026 Err(err) => {
1027 log::debug!("connection closed due to h3 protocol error"; "error"=>?err);
1031 return Ok(());
1032 },
1033 };
1034 }
1035
1036 self.process_available_dgrams(qconn)?;
1037 Ok(())
1038 }
1039
1040 fn process_writes(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
1044 while let Some(stream_id) = qconn.stream_writable_next() {
1045 self.process_writable_stream(qconn, stream_id)?;
1046 }
1047
1048 while let Some(Some(ready)) = self.waiting_streams.next().now_or_never() {
1050 self.upstream_ready(qconn, ready)?;
1051 }
1052
1053 Ok(())
1054 }
1055
1056 fn on_conn_close<M: Metrics>(
1059 &mut self, quiche_conn: &mut QuicheConnection, metrics: &M,
1060 work_loop_result: &QuicResult<()>,
1061 ) {
1062 let max_stream_seen = self.max_stream_seen;
1063 metrics
1064 .maximum_writable_streams()
1065 .observe(max_stream_seen as f64);
1066
1067 let Err(work_loop_error) = work_loop_result else {
1068 return;
1069 };
1070
1071 Self::record_quiche_error(quiche_conn, metrics);
1072
1073 let Some(h3_err) = work_loop_error.downcast_ref::<H3ConnectionError>()
1074 else {
1075 log::error!("Found non-H3ConnectionError"; "error" => %work_loop_error);
1076 return;
1077 };
1078
1079 if matches!(h3_err, H3ConnectionError::ControllerWentAway) {
1080 let _ =
1082 quiche_conn.close(true, h3::WireErrorCode::NoError as u64, &[]);
1083 return;
1084 }
1085
1086 if let Some(ev) = H3Event::from_error(h3_err) {
1087 let _ = self.h3_event_sender.send(ev.into());
1088 #[expect(clippy::needless_return)]
1089 return; }
1091 }
1092
/// Waits for the next piece of work and handles it.
///
/// The sources are checked in a fixed order (`biased`): resolved stream
/// waits, outbound datagram frames, controller commands, and finally the
/// hooks' wait action (only when `H::has_wait_action` allows it).
/// Afterwards one more command is handled if one is immediately
/// available.
///
/// # Errors
///
/// Propagates the error of whichever handler ran.
#[inline]
async fn wait_for_data(
    &mut self, qconn: &mut QuicheConnection,
) -> QuicResult<()> {
    select! {
        // Poll the branches top to bottom instead of in random order.
        biased;
        Some(ready) = self.waiting_streams.next() => self.upstream_ready(qconn, ready),
        Some(dgram) = self.dgram_recv.recv() => self.dgram_ready(qconn, dgram),
        Some(cmd) = self.cmd_recv.recv() => H::conn_command(self, qconn, cmd),
        r = self.hooks.wait_for_action(qconn), if H::has_wait_action(self) => r,
    }?;

    // Commands come last in the biased order above, so give one pending
    // command a chance to run on every call.
    if let Ok(cmd) = self.cmd_recv.try_recv() {
        H::conn_command(self, qconn, cmd)?;
    }

    Ok(())
}
1116}
1117
1118impl<H: DriverHooks> Drop for H3Driver<H> {
1119 fn drop(&mut self) {
1120 for stream in self.stream_map.values() {
1121 stream
1122 .audit_stats
1123 .set_recvd_stream_fin(StreamClosureKind::Implicit);
1124 }
1125 }
1126}
1127
/// Commands handled by the driver regardless of its hooks.
#[derive(Debug)]
pub enum H3Command {
    /// Run the wrapped command against the QUIC connection.
    QuicCmd(QuicCommand),
    /// Send a GOAWAY frame carrying the largest stream ID seen so far.
    GoAway,
}
1142
/// Sender that accepts values of type `T` and forwards them, converted into
/// `C`, on an unbounded channel.
pub struct RequestSender<C, T> {
    /// Underlying channel carrying the converted values.
    sender: UnboundedSender<C>,
    /// Ties the accepted input type `T` to the sender without storing a `T`.
    _r: PhantomData<fn() -> T>,
}
1150
1151impl<C, T: Into<C>> RequestSender<C, T> {
1152 #[inline(always)]
1155 pub fn send(&self, v: T) -> Result<(), mpsc::error::SendError<C>> {
1156 self.sender.send(v.into())
1157 }
1158}
1159
1160impl<C, T> Clone for RequestSender<C, T> {
1161 fn clone(&self) -> Self {
1162 Self {
1163 sender: self.sender.clone(),
1164 _r: Default::default(),
1165 }
1166 }
1167}
1168
/// Counterpart of [`H3Driver`]: sends commands to the driver and receives
/// its events.
pub struct H3Controller<H: DriverHooks> {
    /// Sends commands to the driver.
    cmd_sender: UnboundedSender<H::Command>,
    /// Receives events from the driver; `None` after the receiver has been
    /// taken with `take_event_receiver`.
    h3_event_recv: Option<UnboundedReceiver<H::Event>>,
}
1184
1185impl<H: DriverHooks> H3Controller<H> {
1186 pub fn event_receiver_mut(&mut self) -> &mut UnboundedReceiver<H::Event> {
1189 self.h3_event_recv
1190 .as_mut()
1191 .expect("No event receiver on H3Controller")
1192 }
1193
1194 pub fn take_event_receiver(&mut self) -> UnboundedReceiver<H::Event> {
1196 self.h3_event_recv
1197 .take()
1198 .expect("No event receiver on H3Controller")
1199 }
1200
1201 pub fn cmd_sender(&self) -> RequestSender<H::Command, QuicCommand> {
1203 RequestSender {
1204 sender: self.cmd_sender.clone(),
1205 _r: Default::default(),
1206 }
1207 }
1208
1209 pub fn send_goaway(&self) {
1211 let _ = self.cmd_sender.send(H3Command::GoAway.into());
1212 }
1213}