1mod client;
28pub mod connection;
30mod datagram;
31mod hooks;
34mod server;
35mod streams;
36
37use std::collections::BTreeMap;
38use std::error::Error;
39use std::fmt;
40use std::marker::PhantomData;
41use std::sync::Arc;
42
43use datagram_socket::StreamClosureKind;
44use foundations::telemetry::log;
45use futures::FutureExt;
46use futures_util::stream::FuturesUnordered;
47use quiche::h3;
48use tokio::select;
49use tokio::sync::mpsc;
50use tokio::sync::mpsc::error::TryRecvError;
51use tokio::sync::mpsc::error::TrySendError;
52use tokio::sync::mpsc::UnboundedReceiver;
53use tokio::sync::mpsc::UnboundedSender;
54use tokio_stream::StreamExt;
55use tokio_util::sync::PollSender;
56
57use self::hooks::DriverHooks;
58use self::hooks::InboundHeaders;
59use self::streams::FlowCtx;
60use self::streams::HaveUpstreamCapacity;
61use self::streams::ReceivedDownstreamData;
62use self::streams::StreamCtx;
63use self::streams::StreamReady;
64use self::streams::WaitForDownstreamData;
65use self::streams::WaitForStream;
66use self::streams::WaitForUpstreamCapacity;
67use crate::buf_factory::BufFactory;
68use crate::buf_factory::PooledBuf;
69use crate::buf_factory::PooledDgram;
70use crate::http3::settings::Http3Settings;
71use crate::http3::H3AuditStats;
72use crate::metrics::Metrics;
73use crate::quic::HandshakeInfo;
74use crate::quic::QuicCommand;
75use crate::quic::QuicheConnection;
76use crate::ApplicationOverQuic;
77use crate::QuicResult;
78
79pub use self::client::ClientEventStream;
80pub use self::client::ClientH3Command;
81pub use self::client::ClientH3Controller;
82pub use self::client::ClientH3Driver;
83pub use self::client::ClientH3Event;
84pub use self::client::ClientRequestSender;
85pub use self::client::NewClientRequest;
86pub use self::server::ServerEventStream;
87pub use self::server::ServerH3Command;
88pub use self::server::ServerH3Controller;
89pub use self::server::ServerH3Driver;
90pub use self::server::ServerH3Event;
91
/// Priority passed to `send_response_with_priority` for every initial
/// response sent by the driver.
///
/// NOTE(review): per quiche's `h3::Priority::new(urgency, incremental)` this
/// is urgency 3 with the incremental flag set — confirm against the quiche
/// version in use.
const DEFAULT_PRIO: h3::Priority = h3::Priority::new(3, true);
95
/// Capacity used for per-stream channels: `1` in test and debug builds,
/// `16` otherwise.
///
/// NOTE(review): not referenced in this part of the file; presumably the
/// tiny test/debug value exists to exercise back-pressure paths — confirm
/// against the users of this constant.
const STREAM_CAPACITY: usize = if cfg!(any(test, debug_assertions)) {
    1
} else {
    16
};

/// Capacity of the datagram channels: both the shared outbound datagram
/// channel created in `H3Driver::new` and each per-flow channel created by
/// `FlowCtx::new`.
const FLOW_CAPACITY: usize = 1 << 11;
106
/// Sending half of a channel carrying [`OutboundFrame`]s towards the driver.
/// Handed out to the application in [`IncomingH3Headers`] and
/// [`H3Event::NewFlow`].
pub type OutboundFrameSender = PollSender<OutboundFrame>;

/// Receiving half of an [`OutboundFrame`] channel, read by the driver.
type OutboundFrameStream = mpsc::Receiver<OutboundFrame>;

/// Sending half of a channel carrying [`InboundFrame`]s; the driver uses it
/// to forward data received from the peer.
type InboundFrameSender = PollSender<InboundFrame>;

/// Receiving half of an [`InboundFrame`] channel. Handed out to the
/// application in [`IncomingH3Headers`] and [`H3Event::NewFlow`].
pub type InboundFrameStream = mpsc::Receiver<InboundFrame>;
122
/// Errors that can terminate the driver's work on an HTTP/3 connection.
#[derive(Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum H3ConnectionError {
    /// A channel to or from the controller is closed: an event could not be
    /// delivered, or the datagram channel was disconnected.
    ControllerWentAway,
    /// An error reported by quiche's HTTP/3 layer. Transport-level
    /// `quiche::Error`s are wrapped as `h3::Error::TransportError`.
    H3(h3::Error),
    /// The peer sent a GOAWAY (`h3::Event::GoAway` was polled).
    GoAway,
    /// An operation referred to a stream that is not in the driver's stream
    /// map.
    NonexistentStream,
    /// NOTE(review): not produced in this part of the file; presumably raised
    /// by the hooks when a post-accept timeout fires — confirm.
    PostAcceptTimeout,
}
142
143impl From<h3::Error> for H3ConnectionError {
144 fn from(err: h3::Error) -> Self {
145 H3ConnectionError::H3(err)
146 }
147}
148
149impl From<quiche::Error> for H3ConnectionError {
150 fn from(err: quiche::Error) -> Self {
151 H3ConnectionError::H3(h3::Error::TransportError(err))
152 }
153}
154
// Marker implementation: relies on the `Debug` and `Display` impls and does
// not expose a `source()`.
impl Error for H3ConnectionError {}
156
157impl fmt::Display for H3ConnectionError {
158 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
159 let s: &dyn fmt::Display = match self {
160 Self::ControllerWentAway => &"controller went away",
161 Self::H3(e) => e,
162 Self::GoAway => &"goaway",
163 Self::NonexistentStream => &"nonexistent stream",
164 Self::PostAcceptTimeout => &"post accept timeout hit",
165 };
166
167 write!(f, "H3ConnectionError: {s}")
168 }
169}
170
/// Result type used by the driver's internal methods; the error is always an
/// [`H3ConnectionError`].
type H3ConnectionResult<T> = Result<T, H3ConnectionError>;
172
/// Headers received from the peer together with the channels and statistics
/// of the stream they belong to. Carried by [`H3Event::IncomingHeaders`].
#[derive(Debug)]
pub struct IncomingH3Headers {
    /// ID of the stream the headers belong to.
    pub stream_id: u64,
    /// The received header list.
    pub headers: Vec<h3::Header>,
    /// Channel for sending [`OutboundFrame`]s to the driver.
    /// NOTE(review): presumably scoped to this stream — confirm where the
    /// struct is built.
    pub send: OutboundFrameSender,
    /// Channel on which the driver delivers [`InboundFrame`]s.
    /// NOTE(review): presumably the body of this stream — confirm where the
    /// struct is built.
    pub recv: InboundFrameStream,
    /// NOTE(review): presumably `true` when the read side was already
    /// finished as the headers arrived (no body follows) — confirm.
    pub read_fin: bool,
    /// Shared audit statistics handle.
    /// NOTE(review): presumably the same per-stream stats the driver updates
    /// — confirm.
    pub h3_audit_stats: Arc<H3AuditStats>,
}
194
/// Events the driver sends to the controller over the event channel.
#[derive(Debug)]
pub enum H3Event {
    /// The peer's raw HTTP/3 settings. Sent once, the first time they are
    /// available while processing a read event.
    IncomingSettings {
        /// Raw `(identifier, value)` pairs as reported by
        /// `peer_settings_raw()`.
        settings: Vec<(u64, u64)>,
    },

    /// Headers were received from the peer.
    IncomingHeaders(IncomingH3Headers),

    /// A datagram flow was seen for the first time.
    NewFlow {
        /// ID of the new flow.
        flow_id: u64,
        /// Clone of the driver's shared outbound datagram sender.
        send: OutboundFrameSender,
        /// Receiver paired with the flow's `FlowCtx`.
        recv: InboundFrameStream,
    },
    /// The peer reset the stream; the driver finishes the stream right after
    /// emitting this event.
    ResetStream { stream_id: u64 },
    /// The work loop ended with an HTTP/3 error.
    ConnectionError(h3::Error),
    /// The connection was shut down; built by `from_error` for
    /// [`H3ConnectionError::PostAcceptTimeout`].
    ConnectionShutdown(Option<H3ConnectionError>),
    /// Body data was read from a stream, or the stream's fin was received.
    BodyBytesReceived {
        /// Stream the bytes were read from.
        stream_id: u64,
        /// Number of body bytes read; `0` for the fin notification.
        num_bytes: u64,
        /// `true` only for the notification emitted when the fin is
        /// processed.
        fin: bool,
    },
    /// A stream was removed from the driver. Only emitted when the
    /// connection is a server.
    StreamClosed { stream_id: u64 },
}
245
246impl H3Event {
247 fn from_error(err: &H3ConnectionError) -> Option<Self> {
249 Some(match err {
250 H3ConnectionError::H3(e) => Self::ConnectionError(*e),
251 H3ConnectionError::PostAcceptTimeout => Self::ConnectionShutdown(
252 Some(H3ConnectionError::PostAcceptTimeout),
253 ),
254 _ => return None,
255 })
256 }
257}
258
/// Frames handed to the driver for transmission to the peer.
#[derive(Debug)]
pub enum OutboundFrame {
    /// Headers to send on a stream: sent as the initial response, or (without
    /// the `gcongestion` feature) as additional headers once the initial
    /// headers have been sent.
    Headers(Vec<h3::Header>),
    /// Body bytes plus a `fin` flag (zero-copy buffer flavour).
    #[cfg(feature = "zero-copy")]
    Body(crate::buf_factory::QuicheBuf, bool),
    /// Body bytes plus a `fin` flag.
    #[cfg(not(feature = "zero-copy"))]
    Body(PooledBuf, bool),
    /// A datagram and the flow ID it is sent on.
    Datagram(PooledDgram, u64),
    /// Signals a stream-level error; the driver shuts the stream down in both
    /// directions with the HTTP/3 `MessageError` code.
    PeerStreamError,
    /// Sent by a flow to shut itself down: the driver finishes `stream_id`
    /// with `NoError` in both directions and forgets `flow_id`.
    FlowShutdown { flow_id: u64, stream_id: u64 },
}
281
impl OutboundFrame {
    /// Builds an [`OutboundFrame::Body`] from a pooled buffer and a `fin`
    /// flag, independent of whether the `zero-copy` feature is enabled.
    pub fn body(body: PooledBuf, fin: bool) -> Self {
        // With `zero-copy` the `Body` variant stores a `QuicheBuf`, so the
        // pooled buffer is wrapped first; otherwise it is stored as-is.
        #[cfg(feature = "zero-copy")]
        let body = crate::buf_factory::QuicheBuf::new(body);

        OutboundFrame::Body(body, fin)
    }
}
291
/// Frames the driver delivers to the application.
#[derive(Debug)]
pub enum InboundFrame {
    /// A chunk of body data and a `fin` flag. The driver signals the end of
    /// the body with an empty buffer and `fin == true`.
    Body(PooledBuf, bool),
    /// A received datagram.
    /// NOTE(review): presumably delivered through the flow's `FlowCtx` —
    /// confirm in the `streams` module.
    Datagram(PooledDgram),
}
302
/// Drives an HTTP/3 connection on top of a QUIC connection, moving frames
/// between quiche and the application's channels. Client/server specific
/// behaviour is supplied by the [`DriverHooks`] type parameter.
pub struct H3Driver<H: DriverHooks> {
    /// HTTP/3 configuration derived from the `Http3Settings` passed to `new`;
    /// used to create `conn` once the QUIC connection is established.
    h3_config: h3::Config,
    /// The HTTP/3 connection; `None` until `on_conn_established` has run.
    conn: Option<h3::Connection>,
    /// Client- or server-specific hooks.
    hooks: H,
    /// Sends events to the [`H3Controller`].
    h3_event_sender: mpsc::UnboundedSender<H::Event>,
    /// Receives commands from the [`H3Controller`].
    cmd_recv: mpsc::UnboundedReceiver<H::Command>,

    /// Per-stream state, keyed by stream ID.
    stream_map: BTreeMap<u64, StreamCtx>,
    /// Per-flow datagram state, keyed by flow ID.
    flow_map: BTreeMap<u64, FlowCtx>,
    /// Futures for streams that are waiting for channel data or channel
    /// capacity; each resolves to a `StreamReady`.
    waiting_streams: FuturesUnordered<WaitForStream>,

    /// Receives datagrams and flow shutdowns sent by flows.
    dgram_recv: OutboundFrameStream,
    /// Sender paired with `dgram_recv`; cloned into every
    /// [`H3Event::NewFlow`].
    dgram_send: OutboundFrameSender,

    /// Scratch buffer: exposed through `buffer()` and used as the target for
    /// body reads.
    pooled_buf: PooledBuf,
    /// Largest stream ID registered via `insert_stream`; used for GOAWAY and
    /// reported as a metric on close.
    max_stream_seen: u64,

    /// Whether the peer's settings were already forwarded to the controller.
    settings_received_and_forwarded: bool,
}
351
352impl<H: DriverHooks> H3Driver<H> {
353 pub fn new(http3_settings: Http3Settings) -> (Self, H3Controller<H>) {
359 let (dgram_send, dgram_recv) = mpsc::channel(FLOW_CAPACITY);
360 let (cmd_sender, cmd_recv) = mpsc::unbounded_channel();
361 let (h3_event_sender, h3_event_recv) = mpsc::unbounded_channel();
362
363 (
364 H3Driver {
365 h3_config: (&http3_settings).into(),
366 conn: None,
367 hooks: H::new(&http3_settings),
368 h3_event_sender,
369 cmd_recv,
370
371 stream_map: BTreeMap::new(),
372 flow_map: BTreeMap::new(),
373
374 dgram_recv,
375 dgram_send: PollSender::new(dgram_send),
376 pooled_buf: BufFactory::get_max_buf(),
377 max_stream_seen: 0,
378
379 waiting_streams: FuturesUnordered::new(),
380
381 settings_received_and_forwarded: false,
382 },
383 H3Controller {
384 cmd_sender,
385 h3_event_recv: Some(h3_event_recv),
386 },
387 )
388 }
389
390 fn get_or_insert_flow(
393 &mut self, flow_id: u64,
394 ) -> H3ConnectionResult<&mut FlowCtx> {
395 use std::collections::btree_map::Entry;
396 Ok(match self.flow_map.entry(flow_id) {
397 Entry::Vacant(e) => {
398 let (flow, recv) = FlowCtx::new(FLOW_CAPACITY);
400 let flow_req = H3Event::NewFlow {
401 flow_id,
402 recv,
403 send: self.dgram_send.clone(),
404 };
405 self.h3_event_sender
406 .send(flow_req.into())
407 .map_err(|_| H3ConnectionError::ControllerWentAway)?;
408 e.insert(flow)
409 },
410 Entry::Occupied(e) => e.into_mut(),
411 })
412 }
413
414 fn insert_stream(&mut self, stream_id: u64, ctx: StreamCtx) {
416 self.stream_map.insert(stream_id, ctx);
417 self.max_stream_seen = self.max_stream_seen.max(stream_id);
418 }
419
    /// Moves readable body data of `stream_id` from quiche into the stream's
    /// inbound frame channel.
    ///
    /// Each loop iteration reserves one slot on the channel, reads one chunk
    /// with `recv_body`, reports it via [`H3Event::BodyBytesReceived`] and
    /// forwards it as [`InboundFrame::Body`]. Once the fin has been received
    /// an empty body frame with `fin == true` is forwarded instead.
    ///
    /// Depending on why the loop stopped, the stream is finished, registered
    /// in `waiting_streams` until the channel has capacity, or — if quiche
    /// reports the stream as finished — handed to `process_h3_fin`.
    ///
    /// # Errors
    /// Fails if no HTTP/3 connection is attached, with
    /// [`H3ConnectionError::NonexistentStream`] if `stream_id` is unknown, or
    /// with whatever `finish_stream` / `process_h3_fin` return.
    fn process_h3_data(
        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
    ) -> H3ConnectionResult<()> {
        // `conn` and `ctx` borrow different fields of `self`, so both can be
        // held mutably at the same time.
        let conn = self.conn.as_mut().ok_or(Self::connection_not_present())?;
        let ctx = self
            .stream_map
            .get_mut(&stream_id)
            .ok_or(H3ConnectionError::NonexistentStream)?;

        // Why the forwarding loop below stopped.
        enum StreamStatus {
            // Nothing more to do right now; `close` asks for the stream to be
            // finished.
            Done { close: bool },
            // The channel is full and there is still something to deliver.
            Blocked,
        }

        let status = loop {
            // No sender available. NOTE(review): presumably it is parked in a
            // `waiting_streams` future until capacity is available (it is put
            // back by `upstream_write_ready`) — confirm in `streams`.
            let Some(sender) = ctx.send.as_ref().and_then(PollSender::get_ref)
            else {
                break StreamStatus::Done { close: false };
            };

            let permit = match sender.try_reserve() {
                Ok(permit) => permit,
                // The receiving side is gone: only close the stream if both
                // directions have already seen their fin.
                Err(TrySendError::Closed(())) => {
                    break StreamStatus::Done {
                        close: ctx.fin_sent && ctx.fin_recv,
                    };
                },
                // No capacity: wait for it only if there is still a fin or
                // readable data to deliver.
                Err(TrySendError::Full(())) => {
                    if ctx.fin_recv || qconn.stream_readable(stream_id) {
                        break StreamStatus::Blocked;
                    }
                    break StreamStatus::Done { close: false };
                },
            };

            // The fin was already received: forward end-of-body and close the
            // stream if our own fin has been sent as well.
            if ctx.fin_recv {
                permit
                    .send(InboundFrame::Body(BufFactory::get_empty_buf(), true));
                break StreamStatus::Done {
                    close: ctx.fin_sent,
                };
            }

            match conn.recv_body(qconn, stream_id, &mut self.pooled_buf) {
                Ok(n) => {
                    // Hand the filled buffer to the receiver and install a
                    // fresh scratch buffer for the next read.
                    let mut body = std::mem::replace(
                        &mut self.pooled_buf,
                        BufFactory::get_max_buf(),
                    );
                    body.truncate(n);

                    ctx.audit_stats.add_downstream_bytes_recvd(n as u64);
                    let event = H3Event::BodyBytesReceived {
                        stream_id,
                        num_bytes: n as u64,
                        fin: false,
                    };
                    // Best effort: a closed event channel is ignored here.
                    let _ = self.h3_event_sender.send(event.into());

                    permit.send(InboundFrame::Body(body, false));
                },
                // No more body data available for now.
                Err(h3::Error::Done) =>
                    break StreamStatus::Done { close: false },
                // Any other read error closes the stream.
                Err(_) => break StreamStatus::Done { close: true },
            }
        };

        match status {
            StreamStatus::Done { close } => {
                if close {
                    return self.finish_stream(qconn, stream_id, None, None);
                }

                // quiche reports the stream as finished but the fin has not
                // been processed yet: process it now.
                if !ctx.fin_recv && qconn.stream_finished(stream_id) {
                    return self.process_h3_fin(qconn, stream_id);
                }
            },
            StreamStatus::Blocked => {
                // Resume once the inbound channel has capacity again.
                self.waiting_streams.push(ctx.wait_for_send(stream_id));
            },
        }

        Ok(())
    }
517
518 fn process_h3_fin(
520 &mut self, qconn: &mut QuicheConnection, stream_id: u64,
521 ) -> H3ConnectionResult<()> {
522 let ctx = self.stream_map.get_mut(&stream_id).filter(|c| !c.fin_recv);
523 let Some(ctx) = ctx else {
524 return Ok(());
526 };
527
528 ctx.fin_recv = true;
529 ctx.audit_stats
530 .set_recvd_stream_fin(StreamClosureKind::Explicit);
531
532 let event = H3Event::BodyBytesReceived {
536 stream_id,
537 num_bytes: 0,
538 fin: true,
539 };
540 let _ = self.h3_event_sender.send(event.into());
541
542 self.process_h3_data(qconn, stream_id)
545 }
546
    /// Dispatches a single event returned by `h3::Connection::poll` for
    /// `stream_id`.
    ///
    /// Before handling the event, the peer's settings are forwarded to the
    /// controller if that has not happened yet.
    ///
    /// # Errors
    /// [`H3ConnectionError::GoAway`] for a GOAWAY event,
    /// [`H3ConnectionError::ControllerWentAway`] if a required event cannot
    /// be delivered, or any error from the invoked handler.
    fn process_read_event(
        &mut self, qconn: &mut QuicheConnection, stream_id: u64, event: h3::Event,
    ) -> H3ConnectionResult<()> {
        self.forward_settings()?;

        match event {
            // The field reporting whether a body follows is named differently
            // depending on the `gcongestion` feature; both arms forward the
            // same information to the hooks.
            #[cfg(not(feature = "gcongestion"))]
            h3::Event::Headers { list, more_frames } =>
                H::headers_received(self, qconn, InboundHeaders {
                    stream_id,
                    headers: list,
                    has_body: more_frames,
                }),

            #[cfg(feature = "gcongestion")]
            h3::Event::Headers { list, has_body } =>
                H::headers_received(self, qconn, InboundHeaders {
                    stream_id,
                    headers: list,
                    has_body,
                }),

            h3::Event::Data => self.process_h3_data(qconn, stream_id),
            h3::Event::Finished => self.process_h3_fin(qconn, stream_id),

            h3::Event::Reset(code) => {
                // Record the peer's reset code if the stream is still known.
                if let Some(ctx) = self.stream_map.get(&stream_id) {
                    ctx.audit_stats.set_recvd_reset_stream_error_code(code as _);
                }

                self.h3_event_sender
                    .send(H3Event::ResetStream { stream_id }.into())
                    .map_err(|_| H3ConnectionError::ControllerWentAway)?;

                self.finish_stream(qconn, stream_id, None, None)
            },

            // Priority updates are ignored.
            h3::Event::PriorityUpdate => Ok(()),
            h3::Event::GoAway => Err(H3ConnectionError::GoAway),
        }
    }
592
593 fn forward_settings(&mut self) -> H3ConnectionResult<()> {
599 if self.settings_received_and_forwarded {
600 return Ok(());
601 }
602
603 if let Some(settings) = self.conn_mut()?.peer_settings_raw() {
605 let incoming_settings = H3Event::IncomingSettings {
606 settings: settings.to_vec(),
607 };
608
609 self.h3_event_sender
610 .send(incoming_settings.into())
611 .map_err(|_| H3ConnectionError::ControllerWentAway)?;
612
613 self.settings_received_and_forwarded = true;
614 }
615 Ok(())
616 }
617
618 fn process_write_frame(
624 conn: &mut h3::Connection, qconn: &mut QuicheConnection,
625 ctx: &mut StreamCtx,
626 ) -> h3::Result<()> {
627 let Some(frame) = &mut ctx.queued_frame else {
628 return Ok(());
629 };
630
631 let audit_stats = &ctx.audit_stats;
632 let stream_id = audit_stats.stream_id();
633
634 match frame {
635 #[cfg(not(feature = "gcongestion"))]
637 OutboundFrame::Headers(headers) if ctx.initial_headers_sent => conn
638 .send_additional_headers(qconn, stream_id, headers, false, false),
639
640 OutboundFrame::Headers(headers) => conn
642 .send_response_with_priority(
643 qconn,
644 stream_id,
645 headers,
646 &DEFAULT_PRIO,
647 false,
648 )
649 .inspect(|_| ctx.initial_headers_sent = true),
650
651 OutboundFrame::Body(body, fin) => {
652 let len = body.as_ref().len();
653 if *fin {
654 ctx.recv.as_mut().expect("channel").close();
658 }
659 #[cfg(feature = "zero-copy")]
660 let n = conn.send_body_zc(qconn, stream_id, body, *fin)?;
661
662 #[cfg(not(feature = "zero-copy"))]
663 let n = conn.send_body(qconn, stream_id, body, *fin)?;
664
665 audit_stats.add_downstream_bytes_sent(n as _);
666 if n != len {
667 #[cfg(not(feature = "zero-copy"))]
670 body.pop_front(n);
671
672 Err(h3::Error::StreamBlocked)
673 } else {
674 if *fin {
675 ctx.fin_sent = true;
676 audit_stats
677 .set_sent_stream_fin(StreamClosureKind::Explicit);
678 if ctx.fin_recv {
679 return Err(h3::Error::TransportError(
682 quiche::Error::Done,
683 ));
684 }
685 }
686 Ok(())
687 }
688 },
689
690 OutboundFrame::PeerStreamError => Err(h3::Error::MessageError),
691
692 OutboundFrame::FlowShutdown { .. } => {
693 unreachable!("Only flows send shutdowns")
694 },
695
696 OutboundFrame::Datagram(..) => {
697 unreachable!("Only flows send datagrams")
698 },
699 }
700 }
701
702 fn upstream_ready(
710 &mut self, qconn: &mut QuicheConnection, ready: StreamReady,
711 ) -> H3ConnectionResult<()> {
712 match ready {
713 StreamReady::Downstream(r) => self.upstream_read_ready(qconn, r),
714 StreamReady::Upstream(r) => self.upstream_write_ready(qconn, r),
715 }
716 }
717
718 fn upstream_read_ready(
719 &mut self, qconn: &mut QuicheConnection,
720 read_ready: ReceivedDownstreamData,
721 ) -> H3ConnectionResult<()> {
722 let ReceivedDownstreamData {
723 stream_id,
724 chan,
725 data,
726 } = read_ready;
727
728 match self.stream_map.get_mut(&stream_id) {
729 None => Ok(()),
730 Some(stream) => {
731 stream.recv = Some(chan);
732 stream.queued_frame = data;
733 self.process_writable_stream(qconn, stream_id)
734 },
735 }
736 }
737
738 fn upstream_write_ready(
739 &mut self, qconn: &mut QuicheConnection,
740 write_ready: HaveUpstreamCapacity,
741 ) -> H3ConnectionResult<()> {
742 let HaveUpstreamCapacity {
743 stream_id,
744 mut chan,
745 } = write_ready;
746
747 match self.stream_map.get_mut(&stream_id) {
748 None => Ok(()),
749 Some(stream) => {
750 chan.abort_send(); stream.send = Some(chan);
752 self.process_h3_data(qconn, stream_id)
753 },
754 }
755 }
756
757 fn dgram_ready(
759 &mut self, qconn: &mut QuicheConnection, frame: OutboundFrame,
760 ) -> H3ConnectionResult<()> {
761 let mut frame = Ok(frame);
762
763 loop {
764 match frame {
765 Ok(OutboundFrame::Datagram(dgram, flow_id)) => {
766 let _ = datagram::send_h3_dgram(qconn, flow_id, dgram);
768 },
769 Ok(OutboundFrame::FlowShutdown { flow_id, stream_id }) => {
770 self.finish_stream(
771 qconn,
772 stream_id,
773 Some(quiche::h3::WireErrorCode::NoError as u64),
774 Some(quiche::h3::WireErrorCode::NoError as u64),
775 )?;
776 self.flow_map.remove(&flow_id);
777 break;
778 },
779 Ok(_) => unreachable!("Flows can't send frame of other types"),
780 Err(TryRecvError::Empty) => break,
781 Err(TryRecvError::Disconnected) =>
782 return Err(H3ConnectionError::ControllerWentAway),
783 }
784
785 frame = self.dgram_recv.try_recv();
786 }
787
788 Ok(())
789 }
790
791 fn conn_mut(&mut self) -> H3ConnectionResult<&mut h3::Connection> {
796 self.conn.as_mut().ok_or(Self::connection_not_present())
797 }
798
    /// Error reported when the driver is used before an HTTP/3 connection is
    /// attached to it. Expressed as a `quiche::Error::TlsFail` transport
    /// error wrapped in [`H3ConnectionError::H3`].
    const fn connection_not_present() -> H3ConnectionError {
        H3ConnectionError::H3(h3::Error::TransportError(quiche::Error::TlsFail))
    }
805
    /// Removes `stream_id` from the driver and releases everything attached
    /// to it.
    ///
    /// `read` / `write`, when set, are the error codes used to shut down the
    /// read / write side of the QUIC stream; `None` leaves that side
    /// untouched. Returns `Ok(())` without doing anything if the stream is
    /// not known. In the visible code this function never returns an error.
    fn finish_stream(
        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
        read: Option<u64>, write: Option<u64>,
    ) -> H3ConnectionResult<()> {
        let Some(stream_ctx) = self.stream_map.remove(&stream_id) else {
            return Ok(());
        };

        let audit_stats = &stream_ctx.audit_stats;

        // Shutting down the read side is recorded as a sent STOP_SENDING;
        // errors from the shutdown itself are ignored.
        if let Some(err) = read {
            audit_stats.set_sent_stop_sending_error_code(err as _);
            let _ = qconn.stream_shutdown(stream_id, quiche::Shutdown::Read, err);
        }

        // Shutting down the write side is recorded as a sent RESET_STREAM;
        // errors from the shutdown itself are ignored.
        if let Some(err) = write {
            audit_stats.set_sent_reset_stream_error_code(err as _);
            let _ =
                qconn.stream_shutdown(stream_id, quiche::Shutdown::Write, err);
        }

        // Close the channels held by any wait futures still pending for this
        // stream.
        for pending in self.waiting_streams.iter_mut() {
            match pending {
                WaitForStream::Downstream(WaitForDownstreamData {
                    stream_id: id,
                    chan: Some(chan),
                }) if stream_id == *id => {
                    chan.close();
                },
                WaitForStream::Upstream(WaitForUpstreamCapacity {
                    stream_id: id,
                    chan: Some(chan),
                }) if stream_id == *id => {
                    chan.close();
                },
                _ => {},
            }
        }

        // Forget the datagram flow associated with this stream, if any.
        if let Some(mapped_flow_id) = stream_ctx.associated_dgram_flow_id {
            self.flow_map.remove(&mapped_flow_id);
        }

        // Only servers report closed streams; a closed event channel is
        // ignored.
        if qconn.is_server() {
            let _ = self
                .h3_event_sender
                .send(H3Event::StreamClosed { stream_id }.into());
        }

        Ok(())
    }
864
865 fn handle_core_command(
868 &mut self, qconn: &mut QuicheConnection, cmd: H3Command,
869 ) -> H3ConnectionResult<()> {
870 match cmd {
871 H3Command::QuicCmd(cmd) => cmd.execute(qconn),
872 H3Command::GoAway => {
873 let max_id = self.max_stream_seen;
874 self.conn_mut()
875 .expect("connection should be established")
876 .send_goaway(qconn, max_id)?;
877 },
878 }
879 Ok(())
880 }
881}
882
883impl<H: DriverHooks> H3Driver<H> {
884 fn process_available_dgrams(
887 &mut self, qconn: &mut QuicheConnection,
888 ) -> H3ConnectionResult<()> {
889 loop {
890 match datagram::receive_h3_dgram(qconn) {
891 Ok((flow_id, dgram)) => {
892 self.get_or_insert_flow(flow_id)?.send_best_effort(dgram);
893 },
894 Err(quiche::Error::Done) => return Ok(()),
895 Err(err) => return Err(H3ConnectionError::from(err)),
896 }
897 }
898 }
899
    /// Writes as many queued outbound frames of `stream_id` as possible.
    ///
    /// Alternates between writing the currently queued frame and pulling the
    /// next one from the stream's receiver, until the write blocks, the
    /// receiver is empty (the stream is then registered in
    /// `waiting_streams`), the receiver is disconnected, or an error finishes
    /// the stream. Returns `Ok(())` if the stream is unknown.
    ///
    /// # Errors
    /// Fails if no HTTP/3 connection is attached; otherwise returns whatever
    /// `finish_stream` returns.
    fn process_writable_stream(
        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
    ) -> H3ConnectionResult<()> {
        // `conn` and `ctx` borrow different fields of `self`.
        let conn = self.conn.as_mut().ok_or(Self::connection_not_present())?;
        let Some(ctx) = self.stream_map.get_mut(&stream_id) else {
            return Ok(());
        };

        loop {
            match Self::process_write_frame(conn, qconn, ctx) {
                // Frame fully handled (or nothing was queued).
                Ok(()) => ctx.queued_frame = None,
                // Cannot write more right now; the frame stays queued.
                Err(h3::Error::StreamBlocked | h3::Error::Done) => break,
                // Stream-level error: shut down both directions with the
                // HTTP/3 `MessageError` code.
                Err(h3::Error::MessageError) => {
                    return self.finish_stream(
                        qconn,
                        stream_id,
                        Some(quiche::h3::WireErrorCode::MessageError as u64),
                        Some(quiche::h3::WireErrorCode::MessageError as u64),
                    );
                },
                // The peer stopped the stream: record its code and shut down
                // our read side with the same code.
                Err(h3::Error::TransportError(quiche::Error::StreamStopped(
                    e,
                ))) => {
                    ctx.audit_stats.set_recvd_stop_sending_error_code(e as i64);
                    return self.finish_stream(qconn, stream_id, Some(e), None);
                },
                // Finishes the stream ID carried by the error rather than
                // `stream_id`.
                Err(h3::Error::TransportError(
                    quiche::Error::InvalidStreamState(stream),
                )) => {
                    return self.finish_stream(qconn, stream, None, None);
                },
                // Any other error, including the "both fins done" signal from
                // `process_write_frame`, finishes the stream without shutdown
                // codes.
                Err(_) => {
                    return self.finish_stream(qconn, stream_id, None, None);
                },
            }

            // No receiver in the context: nothing more can be pulled here.
            // NOTE(review): presumably it is held by a `waiting_streams`
            // future that is already waiting for data — confirm in `streams`.
            let Some(recv) = ctx.recv.as_mut() else {
                return Ok(());
            };

            // Queue the next frame, or wait until one arrives.
            match recv.try_recv() {
                Ok(frame) => ctx.queued_frame = Some(frame),
                Err(TryRecvError::Disconnected) => break,
                Err(TryRecvError::Empty) => {
                    self.waiting_streams.push(ctx.wait_for_recv(stream_id));
                    break;
                },
            }
        }

        Ok(())
    }
962
963 fn record_quiche_error(qconn: &mut QuicheConnection, metrics: &impl Metrics) {
966 if let Some(err) = qconn.local_error() {
968 if err.is_app {
969 metrics.local_h3_conn_close_error_count(err.error_code.into())
970 } else {
971 metrics.local_quic_conn_close_error_count(err.error_code.into())
972 }
973 .inc();
974 } else if let Some(err) = qconn.peer_error() {
975 if err.is_app {
976 metrics.peer_h3_conn_close_error_count(err.error_code.into())
977 } else {
978 metrics.peer_quic_conn_close_error_count(err.error_code.into())
979 }
980 .inc();
981 }
982 }
983}
984
985impl<H: DriverHooks> ApplicationOverQuic for H3Driver<H> {
986 fn on_conn_established(
987 &mut self, quiche_conn: &mut QuicheConnection,
988 handshake_info: &HandshakeInfo,
989 ) -> QuicResult<()> {
990 let conn = h3::Connection::with_transport(quiche_conn, &self.h3_config)?;
991 self.conn = Some(conn);
992
993 H::conn_established(self, quiche_conn, handshake_info)?;
994 Ok(())
995 }
996
    /// Returns `true` once the HTTP/3 connection has been created.
    #[inline]
    fn should_act(&self) -> bool {
        self.conn.is_some()
    }
1001
    /// Exposes the driver's scratch buffer as a mutable byte slice. The same
    /// buffer is used as the target of body reads in `process_h3_data`.
    #[inline]
    fn buffer(&mut self) -> &mut [u8] {
        &mut self.pooled_buf
    }
1006
1007 fn process_reads(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
1013 loop {
1014 match self.conn_mut()?.poll(qconn) {
1015 Ok((stream_id, event)) =>
1016 self.process_read_event(qconn, stream_id, event)?,
1017 Err(h3::Error::Done) => break,
1018 Err(err) => {
1019 log::debug!("connection closed due to h3 protocol error"; "error"=>?err);
1023 return Ok(());
1024 },
1025 };
1026 }
1027
1028 self.process_available_dgrams(qconn)?;
1029 Ok(())
1030 }
1031
1032 fn process_writes(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
1036 while let Some(stream_id) = qconn.stream_writable_next() {
1037 self.process_writable_stream(qconn, stream_id)?;
1038 }
1039
1040 while let Some(Some(ready)) = self.waiting_streams.next().now_or_never() {
1042 self.upstream_ready(qconn, ready)?;
1043 }
1044
1045 Ok(())
1046 }
1047
    /// Records metrics for the closed connection and, if the work loop ended
    /// with an error, reports it to the peer or the controller.
    ///
    /// - The highest stream ID seen is always observed on the
    ///   `maximum_writable_streams` metric.
    /// - On error, the close-error counters are updated from `quiche_conn`.
    /// - If the controller went away, the QUIC connection is closed with
    ///   `NoError`.
    /// - Otherwise the error is forwarded to the controller if it maps to an
    ///   [`H3Event`].
    fn on_conn_close<M: Metrics>(
        &mut self, quiche_conn: &mut QuicheConnection, metrics: &M,
        work_loop_result: &QuicResult<()>,
    ) {
        let max_stream_seen = self.max_stream_seen;
        metrics
            .maximum_writable_streams()
            .observe(max_stream_seen as f64);

        // Nothing else to do for a clean shutdown.
        let Err(work_loop_error) = work_loop_result else {
            return;
        };

        Self::record_quiche_error(quiche_conn, metrics);

        let Some(h3_err) = work_loop_error.downcast_ref::<H3ConnectionError>()
        else {
            log::error!("Found non-H3ConnectionError"; "error" => %work_loop_error);
            return;
        };

        if matches!(h3_err, H3ConnectionError::ControllerWentAway) {
            // Nobody is left to produce responses, so close the connection
            // towards the peer; a failure to close is ignored.
            let _ =
                quiche_conn.close(true, h3::WireErrorCode::NoError as u64, &[]);
            return;
        }

        if let Some(ev) = H3Event::from_error(h3_err) {
            // Best effort: a closed event channel is ignored.
            let _ = self.h3_event_sender.send(ev.into());
            // Explicit return so code added below later does not run for
            // this case by accident.
            #[expect(clippy::needless_return)]
            return;
        }
    }
1084
    /// Waits until one source of work is ready and handles it.
    ///
    /// Sources, in priority order (the `select!` is `biased`, so branches are
    /// polled top to bottom): a waiting stream becoming ready, an outbound
    /// datagram, a controller command, and — only if the hooks report one —
    /// the hooks' own wait action. Afterwards, at most one already-pending
    /// controller command is handled as well.
    #[inline]
    async fn wait_for_data(
        &mut self, qconn: &mut QuicheConnection,
    ) -> QuicResult<()> {
        select! {
            biased;
            Some(ready) = self.waiting_streams.next() => self.upstream_ready(qconn, ready),
            Some(dgram) = self.dgram_recv.recv() => self.dgram_ready(qconn, dgram),
            Some(cmd) = self.cmd_recv.recv() => H::conn_command(self, qconn, cmd),
            r = self.hooks.wait_for_action(qconn), if H::has_wait_action(self) => r,
        }?;

        // Commands sit low in the biased select; this extra non-blocking
        // check lets a pending command run even when the higher-priority
        // branches are always ready.
        if let Ok(cmd) = self.cmd_recv.try_recv() {
            H::conn_command(self, qconn, cmd)?;
        }

        Ok(())
    }
1108}
1109
1110impl<H: DriverHooks> Drop for H3Driver<H> {
1111 fn drop(&mut self) {
1112 for stream in self.stream_map.values() {
1113 stream
1114 .audit_stats
1115 .set_recvd_stream_fin(StreamClosureKind::Implicit);
1116 }
1117 }
1118}
1119
/// Commands handled by the driver regardless of whether it runs as client or
/// server.
#[derive(Debug)]
pub enum H3Command {
    /// A command executed directly on the QUIC connection.
    QuicCmd(QuicCommand),
    /// Send a GOAWAY frame carrying the highest stream ID seen so far.
    GoAway,
}
1134
/// Sends values of type `T` over a command channel of type `C`, converting
/// them with `Into<C>` on the way.
pub struct RequestSender<C, T> {
    /// The underlying command channel.
    sender: UnboundedSender<C>,
    /// Ties the accepted request type `T` to this sender without storing a
    /// `T`.
    _r: PhantomData<fn() -> T>,
}
1142
1143impl<C, T: Into<C>> RequestSender<C, T> {
1144 #[inline(always)]
1147 pub fn send(&self, v: T) -> Result<(), mpsc::error::SendError<C>> {
1148 self.sender.send(v.into())
1149 }
1150}
1151
1152impl<C, T> Clone for RequestSender<C, T> {
1153 fn clone(&self) -> Self {
1154 Self {
1155 sender: self.sender.clone(),
1156 _r: Default::default(),
1157 }
1158 }
1159}
1160
/// Application-side handle to an [`H3Driver`]: sends commands to the driver
/// and receives its events.
pub struct H3Controller<H: DriverHooks> {
    /// Sends commands to the driver.
    cmd_sender: UnboundedSender<H::Command>,
    /// Receives events from the driver; `None` after
    /// `take_event_receiver` has been called.
    h3_event_recv: Option<UnboundedReceiver<H::Event>>,
}
1176
1177impl<H: DriverHooks> H3Controller<H> {
1178 pub fn event_receiver_mut(&mut self) -> &mut UnboundedReceiver<H::Event> {
1181 self.h3_event_recv
1182 .as_mut()
1183 .expect("No event receiver on H3Controller")
1184 }
1185
1186 pub fn take_event_receiver(&mut self) -> UnboundedReceiver<H::Event> {
1188 self.h3_event_recv
1189 .take()
1190 .expect("No event receiver on H3Controller")
1191 }
1192
1193 pub fn cmd_sender(&self) -> RequestSender<H::Command, QuicCommand> {
1195 RequestSender {
1196 sender: self.cmd_sender.clone(),
1197 _r: Default::default(),
1198 }
1199 }
1200
1201 pub fn send_goaway(&self) {
1203 let _ = self.cmd_sender.send(H3Command::GoAway.into());
1204 }
1205}