1mod client;
28pub mod connection;
30mod datagram;
31mod hooks;
34mod server;
35mod streams;
36
37use std::collections::BTreeMap;
38use std::error::Error;
39use std::fmt;
40use std::marker::PhantomData;
41use std::sync::Arc;
42use std::time::Instant;
43
44use datagram_socket::StreamClosureKind;
45use foundations::telemetry::log;
46use futures::FutureExt;
47use futures_util::stream::FuturesUnordered;
48use quiche::h3;
49use tokio::select;
50use tokio::sync::mpsc;
51use tokio::sync::mpsc::error::TryRecvError;
52use tokio::sync::mpsc::error::TrySendError;
53use tokio::sync::mpsc::UnboundedReceiver;
54use tokio::sync::mpsc::UnboundedSender;
55use tokio_stream::StreamExt;
56use tokio_util::sync::PollSender;
57
58use self::hooks::DriverHooks;
59use self::hooks::InboundHeaders;
60use self::streams::FlowCtx;
61use self::streams::HaveUpstreamCapacity;
62use self::streams::ReceivedDownstreamData;
63use self::streams::StreamCtx;
64use self::streams::StreamReady;
65use self::streams::WaitForDownstreamData;
66use self::streams::WaitForStream;
67use self::streams::WaitForUpstreamCapacity;
68use crate::buf_factory::BufFactory;
69use crate::buf_factory::PooledBuf;
70use crate::buf_factory::PooledDgram;
71use crate::http3::settings::Http3Settings;
72use crate::http3::H3AuditStats;
73use crate::metrics::Metrics;
74use crate::quic::HandshakeInfo;
75use crate::quic::QuicCommand;
76use crate::quic::QuicheConnection;
77use crate::ApplicationOverQuic;
78use crate::QuicResult;
79
80pub use self::client::ClientEventStream;
81pub use self::client::ClientH3Command;
82pub use self::client::ClientH3Controller;
83pub use self::client::ClientH3Driver;
84pub use self::client::ClientH3Event;
85pub use self::client::ClientRequestSender;
86pub use self::client::NewClientRequest;
87pub use self::server::ServerEventStream;
88pub use self::server::ServerH3Command;
89pub use self::server::ServerH3Controller;
90pub use self::server::ServerH3Driver;
91pub use self::server::ServerH3Event;
92
/// Priority applied to outbound headers when the frame does not carry an
/// explicit one (urgency 3, incremental).
const DEFAULT_PRIO: h3::Priority = h3::Priority::new(3, true);

/// Stream capacity constant for builds that are neither test nor debug
/// builds.
// NOTE(review): not referenced in this part of the file; presumably sizes the
// per-stream channels created elsewhere in this module — confirm.
#[cfg(not(any(test, debug_assertions)))]
const STREAM_CAPACITY: usize = 16;
/// Stream capacity constant for test and debug builds.
#[cfg(any(test, debug_assertions))]
const STREAM_CAPACITY: usize = 1;

/// Capacity of the driver's shared outbound datagram channel, and the value
/// handed to `FlowCtx::new` for every new flow.
const FLOW_CAPACITY: usize = 2048;

/// Sending half of a channel carrying [`OutboundFrame`]s.
pub type OutboundFrameSender = PollSender<OutboundFrame>;

/// Receiving half of a channel carrying [`OutboundFrame`]s.
type OutboundFrameStream = mpsc::Receiver<OutboundFrame>;

/// Sending half of a channel carrying [`InboundFrame`]s.
type InboundFrameSender = PollSender<InboundFrame>;

/// Receiving half of a channel carrying [`InboundFrame`]s.
pub type InboundFrameStream = mpsc::Receiver<InboundFrame>;
123
/// Errors that can terminate the work loop of an [`H3Driver`].
#[derive(Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum H3ConnectionError {
    /// The controller side of a driver channel is gone: an event could not be
    /// sent, or the datagram channel reported `Disconnected`.
    ControllerWentAway,
    /// An error reported by the HTTP/3 layer. Transport-level
    /// `quiche::Error`s are wrapped as `h3::Error::TransportError`.
    H3(h3::Error),
    /// A `GoAway` event was polled from the HTTP/3 connection.
    GoAway,
    /// Data was processed for a stream id that is not in the driver's stream
    /// map.
    NonexistentStream,
    /// Displayed as "post accept timeout hit".
    // NOTE(review): not produced in this part of the file — confirm where it
    // is raised.
    PostAcceptTimeout,
}
143
144impl From<h3::Error> for H3ConnectionError {
145 fn from(err: h3::Error) -> Self {
146 H3ConnectionError::H3(err)
147 }
148}
149
150impl From<quiche::Error> for H3ConnectionError {
151 fn from(err: quiche::Error) -> Self {
152 H3ConnectionError::H3(h3::Error::TransportError(err))
153 }
154}
155
156impl Error for H3ConnectionError {}
157
158impl fmt::Display for H3ConnectionError {
159 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
160 let s: &dyn fmt::Display = match self {
161 Self::ControllerWentAway => &"controller went away",
162 Self::H3(e) => e,
163 Self::GoAway => &"goaway",
164 Self::NonexistentStream => &"nonexistent stream",
165 Self::PostAcceptTimeout => &"post accept timeout hit",
166 };
167
168 write!(f, "H3ConnectionError: {s}")
169 }
170}
171
/// Result alias whose error type is [`H3ConnectionError`].
type H3ConnectionResult<T> = Result<T, H3ConnectionError>;
173
/// Headers received on a stream, bundled with the channels and statistics
/// belonging to that stream.
pub struct IncomingH3Headers {
    /// Id of the stream the headers arrived on.
    pub stream_id: u64,
    /// The received header list.
    pub headers: Vec<h3::Header>,
    /// Sender of [`OutboundFrame`]s for this stream.
    pub send: OutboundFrameSender,
    /// Receiver of [`InboundFrame`]s for this stream.
    pub recv: InboundFrameStream,
    // NOTE(review): presumably true when the stream was already finished by
    // the peer at the time the headers were read — confirm against the hooks
    // that build this struct.
    pub read_fin: bool,
    /// Shared audit statistics handle for this stream.
    pub h3_audit_stats: Arc<H3AuditStats>,
}
194
195impl fmt::Debug for IncomingH3Headers {
196 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
197 f.debug_struct("IncomingH3Headers")
198 .field("stream_id", &self.stream_id)
199 .field("headers", &self.headers)
200 .field("read_fin", &self.read_fin)
201 .field("h3_audit_stats", &self.h3_audit_stats)
202 .finish()
203 }
204}
205
/// Events the [`H3Driver`] sends to its controller over the event channel.
#[derive(Debug)]
pub enum H3Event {
    /// The peer's raw settings, forwarded once per connection.
    IncomingSettings {
        /// Raw `(identifier, value)` pairs as reported by
        /// `peer_settings_raw()`.
        settings: Vec<(u64, u64)>,
    },

    /// Headers were received on a stream.
    IncomingHeaders(IncomingH3Headers),

    /// A datagram flow id was seen for the first time.
    NewFlow {
        /// The id of the new flow.
        flow_id: u64,
        /// Clone of the driver's shared outbound datagram sender.
        send: OutboundFrameSender,
        /// Receiver for the flow's inbound frames.
        recv: InboundFrameStream,
    },
    /// The HTTP/3 layer reported a reset for `stream_id`.
    ResetStream { stream_id: u64 },
    /// The work loop ended with an HTTP/3 error.
    ConnectionError(h3::Error),
    /// The connection shut down, optionally carrying the cause.
    ConnectionShutdown(Option<H3ConnectionError>),
    /// Body bytes (or the stream's fin) were received on a stream.
    BodyBytesReceived {
        /// Stream the bytes were read from.
        stream_id: u64,
        /// Number of bytes read; 0 for the fin notification.
        num_bytes: u64,
        /// True when this event reports the end of the stream.
        fin: bool,
    },
    /// The driver removed `stream_id` from its stream map. Only emitted when
    /// the QUIC connection is the server side.
    StreamClosed { stream_id: u64 },
}
256
257impl H3Event {
258 fn from_error(err: &H3ConnectionError) -> Option<Self> {
260 Some(match err {
261 H3ConnectionError::H3(e) => Self::ConnectionError(*e),
262 H3ConnectionError::PostAcceptTimeout => Self::ConnectionShutdown(
263 Some(H3ConnectionError::PostAcceptTimeout),
264 ),
265 _ => return None,
266 })
267 }
268}
269
/// Frames handed to the [`H3Driver`] for transmission to the peer.
#[derive(Debug)]
pub enum OutboundFrame {
    /// Headers to send, with an optional priority; the driver falls back to
    /// its default priority when none is given.
    Headers(Vec<h3::Header>, Option<quiche::h3::Priority>),
    /// Body bytes plus a fin flag (zero-copy buffer variant).
    #[cfg(feature = "zero-copy")]
    Body(crate::buf_factory::QuicheBuf, bool),
    /// Body bytes plus a fin flag.
    #[cfg(not(feature = "zero-copy"))]
    Body(PooledBuf, bool),
    /// A datagram and the flow id it is sent on.
    Datagram(PooledDgram, u64),
    /// Makes the driver fail the stream with an HTTP/3 `MessageError`.
    PeerStreamError,
    /// Asks the driver to finish `stream_id` and drop `flow_id` from its
    /// flow map.
    FlowShutdown { flow_id: u64, stream_id: u64 },
}
292
impl OutboundFrame {
    /// Builds an [`OutboundFrame::Body`] from a pooled buffer and a fin flag.
    pub fn body(body: PooledBuf, fin: bool) -> Self {
        // With the zero-copy feature the `Body` variant holds a `QuicheBuf`,
        // so the pooled buffer is wrapped (shadowing `body`) first.
        #[cfg(feature = "zero-copy")]
        let body = crate::buf_factory::QuicheBuf::new(body);

        OutboundFrame::Body(body, fin)
    }
}
302
/// Frames the [`H3Driver`] delivers from the peer.
#[derive(Debug)]
pub enum InboundFrame {
    /// Body bytes plus a flag marking the end of the stream.
    Body(PooledBuf, bool),
    /// A received datagram.
    Datagram(PooledDgram),
}
313
/// Drives an HTTP/3 connection on top of a QUIC connection, exchanging
/// events and commands with an [`H3Controller`] over channels.
pub struct H3Driver<H: DriverHooks> {
    /// Configuration used to create the HTTP/3 connection.
    h3_config: h3::Config,
    /// The HTTP/3 connection; `None` until `on_conn_established` has run.
    conn: Option<h3::Connection>,
    /// Endpoint-specific behaviour supplied by the `DriverHooks` impl.
    hooks: H,
    /// Sends events to the controller.
    h3_event_sender: mpsc::UnboundedSender<H::Event>,
    /// Receives commands from the controller.
    cmd_recv: mpsc::UnboundedReceiver<H::Command>,

    /// Per-stream state, keyed by stream id.
    stream_map: BTreeMap<u64, StreamCtx>,
    /// Per-flow datagram state, keyed by flow id.
    flow_map: BTreeMap<u64, FlowCtx>,
    /// Futures for streams waiting on their channels (data to send, or
    /// capacity to deliver received data).
    waiting_streams: FuturesUnordered<WaitForStream>,

    /// Receives outbound datagram and flow-shutdown frames from all flows.
    dgram_recv: OutboundFrameStream,
    /// Sender for `dgram_recv`; cloned into every `H3Event::NewFlow`.
    dgram_send: OutboundFrameSender,

    /// Scratch buffer exposed through `buffer()` and used to receive body
    /// data.
    pooled_buf: PooledBuf,
    /// Largest stream id passed to `insert_stream` so far.
    max_stream_seen: u64,

    /// True once the peer's settings were sent to the controller.
    settings_received_and_forwarded: bool,
}
362
363impl<H: DriverHooks> H3Driver<H> {
364 pub fn new(http3_settings: Http3Settings) -> (Self, H3Controller<H>) {
370 let (dgram_send, dgram_recv) = mpsc::channel(FLOW_CAPACITY);
371 let (cmd_sender, cmd_recv) = mpsc::unbounded_channel();
372 let (h3_event_sender, h3_event_recv) = mpsc::unbounded_channel();
373
374 (
375 H3Driver {
376 h3_config: (&http3_settings).into(),
377 conn: None,
378 hooks: H::new(&http3_settings),
379 h3_event_sender,
380 cmd_recv,
381
382 stream_map: BTreeMap::new(),
383 flow_map: BTreeMap::new(),
384
385 dgram_recv,
386 dgram_send: PollSender::new(dgram_send),
387 pooled_buf: BufFactory::get_max_buf(),
388 max_stream_seen: 0,
389
390 waiting_streams: FuturesUnordered::new(),
391
392 settings_received_and_forwarded: false,
393 },
394 H3Controller {
395 cmd_sender,
396 h3_event_recv: Some(h3_event_recv),
397 },
398 )
399 }
400
401 fn get_or_insert_flow(
404 &mut self, flow_id: u64,
405 ) -> H3ConnectionResult<&mut FlowCtx> {
406 use std::collections::btree_map::Entry;
407 Ok(match self.flow_map.entry(flow_id) {
408 Entry::Vacant(e) => {
409 let (flow, recv) = FlowCtx::new(FLOW_CAPACITY);
411 let flow_req = H3Event::NewFlow {
412 flow_id,
413 recv,
414 send: self.dgram_send.clone(),
415 };
416 self.h3_event_sender
417 .send(flow_req.into())
418 .map_err(|_| H3ConnectionError::ControllerWentAway)?;
419 e.insert(flow)
420 },
421 Entry::Occupied(e) => e.into_mut(),
422 })
423 }
424
425 fn insert_stream(&mut self, stream_id: u64, ctx: StreamCtx) {
427 self.stream_map.insert(stream_id, ctx);
428 self.max_stream_seen = self.max_stream_seen.max(stream_id);
429 }
430
    /// Reads body data from `stream_id` and forwards it on the stream's
    /// inbound channel until the stream runs dry, the channel is full or
    /// closed, or the fin has been delivered.
    ///
    /// # Errors
    ///
    /// Fails when there is no HTTP/3 connection yet, with
    /// `H3ConnectionError::NonexistentStream` when the stream is not in the
    /// stream map, or with whatever `finish_stream` / `process_h3_fin`
    /// return.
    fn process_h3_data(
        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
    ) -> H3ConnectionResult<()> {
        let conn = self.conn.as_mut().ok_or(Self::connection_not_present())?;
        let ctx = self
            .stream_map
            .get_mut(&stream_id)
            .ok_or(H3ConnectionError::NonexistentStream)?;

        // Outcome of the read loop below.
        enum StreamStatus {
            // Nothing more to do right now; `close` asks for the stream to be
            // finished.
            Done { close: bool },
            // The inbound channel is full while there is still something to
            // deliver.
            Blocked,
        }

        let status = loop {
            // No usable sender: it is absent from the context (e.g. moved
            // into a pending wait future) or `get_ref` yields nothing.
            let Some(sender) = ctx.send.as_ref().and_then(PollSender::get_ref)
            else {
                break StreamStatus::Done { close: false };
            };

            // Reserve a slot first so a frame is only read when it can be
            // delivered.
            let permit = match sender.try_reserve() {
                Ok(permit) => permit,
                Err(TrySendError::Closed(())) => {
                    // The receiver is gone; only close the stream once both
                    // directions have seen their fin.
                    break StreamStatus::Done {
                        close: ctx.fin_sent && ctx.fin_recv,
                    };
                },
                Err(TrySendError::Full(())) => {
                    // Only wait for capacity if there is a fin or readable
                    // data still to deliver.
                    if ctx.fin_recv || qconn.stream_readable(stream_id) {
                        break StreamStatus::Blocked;
                    }
                    break StreamStatus::Done { close: false };
                },
            };

            if ctx.fin_recv {
                // Deliver the fin as an empty body frame.
                permit
                    .send(InboundFrame::Body(BufFactory::get_empty_buf(), true));
                break StreamStatus::Done {
                    close: ctx.fin_sent,
                };
            }

            match conn.recv_body(qconn, stream_id, &mut self.pooled_buf) {
                Ok(n) => {
                    // Hand the filled buffer over and replace the scratch
                    // buffer with a fresh one.
                    let mut body = std::mem::replace(
                        &mut self.pooled_buf,
                        BufFactory::get_max_buf(),
                    );
                    body.truncate(n);

                    ctx.audit_stats.add_downstream_bytes_recvd(n as u64);
                    let event = H3Event::BodyBytesReceived {
                        stream_id,
                        num_bytes: n as u64,
                        fin: false,
                    };
                    // Best effort: a failed event send is ignored here.
                    let _ = self.h3_event_sender.send(event.into());

                    permit.send(InboundFrame::Body(body, false));
                },
                Err(h3::Error::Done) =>
                    break StreamStatus::Done { close: false },
                // Any other read error closes the stream.
                Err(_) => break StreamStatus::Done { close: true },
            }
        };

        match status {
            StreamStatus::Done { close } => {
                if close {
                    return self.finish_stream(qconn, stream_id, None, None);
                }

                // The transport reports the stream finished but the fin has
                // not been recorded yet: process it now.
                if !ctx.fin_recv && qconn.stream_finished(stream_id) {
                    return self.process_h3_fin(qconn, stream_id);
                }
            },
            StreamStatus::Blocked => {
                // Resume once the inbound channel has capacity again.
                self.waiting_streams.push(ctx.wait_for_send(stream_id));
            },
        }

        Ok(())
    }
528
529 fn process_h3_fin(
531 &mut self, qconn: &mut QuicheConnection, stream_id: u64,
532 ) -> H3ConnectionResult<()> {
533 let ctx = self.stream_map.get_mut(&stream_id).filter(|c| !c.fin_recv);
534 let Some(ctx) = ctx else {
535 return Ok(());
537 };
538
539 ctx.fin_recv = true;
540 ctx.audit_stats
541 .set_recvd_stream_fin(StreamClosureKind::Explicit);
542
543 let event = H3Event::BodyBytesReceived {
547 stream_id,
548 num_bytes: 0,
549 fin: true,
550 };
551 let _ = self.h3_event_sender.send(event.into());
552
553 self.process_h3_data(qconn, stream_id)
556 }
557
    /// Dispatches one event polled from the HTTP/3 connection for
    /// `stream_id`.
    ///
    /// Pending peer settings are forwarded to the controller before the
    /// event itself is handled.
    ///
    /// # Errors
    ///
    /// `H3ConnectionError::GoAway` for a `GoAway` event,
    /// `H3ConnectionError::ControllerWentAway` when a reset cannot be
    /// reported, or any error from the per-event handlers.
    fn process_read_event(
        &mut self, qconn: &mut QuicheConnection, stream_id: u64, event: h3::Event,
    ) -> H3ConnectionResult<()> {
        self.forward_settings()?;

        match event {
            // Header handling is endpoint-specific and delegated to the
            // hooks.
            h3::Event::Headers { list, more_frames } =>
                H::headers_received(self, qconn, InboundHeaders {
                    stream_id,
                    headers: list,
                    has_body: more_frames,
                }),

            h3::Event::Data => self.process_h3_data(qconn, stream_id),
            h3::Event::Finished => self.process_h3_fin(qconn, stream_id),

            h3::Event::Reset(code) => {
                // Record the reset code if the stream is still tracked.
                if let Some(ctx) = self.stream_map.get(&stream_id) {
                    ctx.audit_stats.set_recvd_reset_stream_error_code(code as _);
                }

                self.h3_event_sender
                    .send(H3Event::ResetStream { stream_id }.into())
                    .map_err(|_| H3ConnectionError::ControllerWentAway)?;

                self.finish_stream(qconn, stream_id, None, None)
            },

            // Priority updates are ignored.
            h3::Event::PriorityUpdate => Ok(()),
            h3::Event::GoAway => Err(H3ConnectionError::GoAway),
        }
    }
594
595 fn forward_settings(&mut self) -> H3ConnectionResult<()> {
601 if self.settings_received_and_forwarded {
602 return Ok(());
603 }
604
605 if let Some(settings) = self.conn_mut()?.peer_settings_raw() {
607 let incoming_settings = H3Event::IncomingSettings {
608 settings: settings.to_vec(),
609 };
610
611 self.h3_event_sender
612 .send(incoming_settings.into())
613 .map_err(|_| H3ConnectionError::ControllerWentAway)?;
614
615 self.settings_received_and_forwarded = true;
616 }
617 Ok(())
618 }
619
    /// Tries to write the stream's queued frame, if any, to the connection.
    ///
    /// Returns `Ok(())` when there is nothing queued or the frame was fully
    /// written. The frame stays in `ctx.queued_frame` either way; clearing
    /// it is left to the caller.
    ///
    /// # Errors
    ///
    /// - `h3::Error::StreamBlocked` when a body was only partially written.
    /// - `h3::Error::MessageError` for `OutboundFrame::PeerStreamError`.
    /// - `h3::Error::TransportError(quiche::Error::Done)` when the fin was
    ///   just sent and the peer's fin was already received.
    /// - Any error returned by the underlying send calls.
    ///
    /// # Panics
    ///
    /// Panics on `FlowShutdown` and `Datagram` frames, which are only
    /// expected on the datagram channel.
    fn process_write_frame(
        conn: &mut h3::Connection, qconn: &mut QuicheConnection,
        ctx: &mut StreamCtx,
    ) -> h3::Result<()> {
        let Some(frame) = &mut ctx.queued_frame else {
            return Ok(());
        };

        let audit_stats = &ctx.audit_stats;
        let stream_id = audit_stats.stream_id();

        match frame {
            OutboundFrame::Headers(headers, priority) => {
                let prio = priority.as_ref().unwrap_or(&DEFAULT_PRIO);

                let res = if ctx.initial_headers_sent {
                    conn.send_additional_headers_with_priority(
                        qconn, stream_id, headers, prio, false, false,
                    )
                } else {
                    // First headers on this stream; remember that they went
                    // out only when the send succeeded.
                    conn.send_response_with_priority(
                        qconn, stream_id, headers, prio, false,
                    )
                    .inspect(|_| ctx.initial_headers_sent = true)
                };

                // Remember when the headers first failed to flush ...
                if let Err(h3::Error::StreamBlocked) = res {
                    ctx.first_full_headers_flush_fail_time
                        .get_or_insert(Instant::now());
                }

                // ... and report how long it took once they do go out.
                if res.is_ok() {
                    if let Some(first) =
                        ctx.first_full_headers_flush_fail_time.take()
                    {
                        ctx.audit_stats.add_header_flush_duration(
                            Instant::now().duration_since(first),
                        );
                    }
                }

                res
            },

            OutboundFrame::Body(body, fin) => {
                let len = body.as_ref().len();
                // An empty body without fin carries nothing to send.
                if len == 0 && !*fin {
                    return Ok(());
                }
                // Drop the receiver once a fin frame is being written; no
                // further frames are read for this stream.
                if *fin {
                    ctx.recv = None;
                }
                #[cfg(feature = "zero-copy")]
                let n = conn.send_body_zc(qconn, stream_id, body, *fin)?;

                #[cfg(not(feature = "zero-copy"))]
                let n = conn.send_body(qconn, stream_id, body, *fin)?;

                audit_stats.add_downstream_bytes_sent(n as _);
                if n != len {
                    // Partial write: without zero-copy, drop the bytes
                    // already written so a retry resumes after them.
                    #[cfg(not(feature = "zero-copy"))]
                    body.pop_front(n);

                    Err(h3::Error::StreamBlocked)
                } else {
                    if *fin {
                        ctx.fin_sent = true;
                        audit_stats
                            .set_sent_stream_fin(StreamClosureKind::Explicit);
                        // Both directions are finished. This error falls into
                        // the catch-all arm of `process_writable_stream`,
                        // which finishes the stream.
                        if ctx.fin_recv {
                            return Err(h3::Error::TransportError(
                                quiche::Error::Done,
                            ));
                        }
                    }
                    Ok(())
                }
            },

            OutboundFrame::PeerStreamError => Err(h3::Error::MessageError),

            OutboundFrame::FlowShutdown { .. } => {
                unreachable!("Only flows send shutdowns")
            },

            OutboundFrame::Datagram(..) => {
                unreachable!("Only flows send datagrams")
            },
        }
    }
729
730 fn upstream_ready(
738 &mut self, qconn: &mut QuicheConnection, ready: StreamReady,
739 ) -> H3ConnectionResult<()> {
740 match ready {
741 StreamReady::Downstream(r) => self.upstream_read_ready(qconn, r),
742 StreamReady::Upstream(r) => self.upstream_write_ready(qconn, r),
743 }
744 }
745
746 fn upstream_read_ready(
747 &mut self, qconn: &mut QuicheConnection,
748 read_ready: ReceivedDownstreamData,
749 ) -> H3ConnectionResult<()> {
750 let ReceivedDownstreamData {
751 stream_id,
752 chan,
753 data,
754 } = read_ready;
755
756 match self.stream_map.get_mut(&stream_id) {
757 None => Ok(()),
758 Some(stream) => {
759 stream.recv = Some(chan);
760 stream.queued_frame = data;
761 self.process_writable_stream(qconn, stream_id)
762 },
763 }
764 }
765
766 fn upstream_write_ready(
767 &mut self, qconn: &mut QuicheConnection,
768 write_ready: HaveUpstreamCapacity,
769 ) -> H3ConnectionResult<()> {
770 let HaveUpstreamCapacity {
771 stream_id,
772 mut chan,
773 } = write_ready;
774
775 match self.stream_map.get_mut(&stream_id) {
776 None => Ok(()),
777 Some(stream) => {
778 chan.abort_send(); stream.send = Some(chan);
780 self.process_h3_data(qconn, stream_id)
781 },
782 }
783 }
784
785 fn dgram_ready(
787 &mut self, qconn: &mut QuicheConnection, frame: OutboundFrame,
788 ) -> H3ConnectionResult<()> {
789 let mut frame = Ok(frame);
790
791 loop {
792 match frame {
793 Ok(OutboundFrame::Datagram(dgram, flow_id)) => {
794 let _ = datagram::send_h3_dgram(qconn, flow_id, dgram);
796 },
797 Ok(OutboundFrame::FlowShutdown { flow_id, stream_id }) => {
798 self.finish_stream(
799 qconn,
800 stream_id,
801 Some(quiche::h3::WireErrorCode::NoError as u64),
802 Some(quiche::h3::WireErrorCode::NoError as u64),
803 )?;
804 self.flow_map.remove(&flow_id);
805 break;
806 },
807 Ok(_) => unreachable!("Flows can't send frame of other types"),
808 Err(TryRecvError::Empty) => break,
809 Err(TryRecvError::Disconnected) =>
810 return Err(H3ConnectionError::ControllerWentAway),
811 }
812
813 frame = self.dgram_recv.try_recv();
814 }
815
816 Ok(())
817 }
818
819 fn conn_mut(&mut self) -> H3ConnectionResult<&mut h3::Connection> {
824 self.conn.as_mut().ok_or(Self::connection_not_present())
825 }
826
    /// The error reported when the HTTP/3 connection is needed but does not
    /// exist: a `TlsFail` transport error wrapped as an HTTP/3 error.
    // NOTE(review): presumably `TlsFail` stands in for "handshake not
    // completed" — confirm the intended error code.
    const fn connection_not_present() -> H3ConnectionError {
        H3ConnectionError::H3(h3::Error::TransportError(quiche::Error::TlsFail))
    }
833
    /// Removes `stream_id` from the driver and releases everything attached
    /// to it.
    ///
    /// * `read` - if set, the read side is shut down with this error code.
    /// * `write` - if set, the write side is shut down with this error code.
    ///
    /// Unknown stream ids are a no-op. Failures of the transport shutdown
    /// calls and of the final event send are ignored; the function currently
    /// always returns `Ok(())`.
    fn finish_stream(
        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
        read: Option<u64>, write: Option<u64>,
    ) -> H3ConnectionResult<()> {
        let Some(stream_ctx) = self.stream_map.remove(&stream_id) else {
            return Ok(());
        };

        let audit_stats = &stream_ctx.audit_stats;

        if let Some(err) = read {
            audit_stats.set_sent_stop_sending_error_code(err as _);
            let _ = qconn.stream_shutdown(stream_id, quiche::Shutdown::Read, err);
        }

        if let Some(err) = write {
            audit_stats.set_sent_reset_stream_error_code(err as _);
            let _ =
                qconn.stream_shutdown(stream_id, quiche::Shutdown::Write, err);
        }

        // Close the channels held by wait futures that still belong to this
        // stream.
        for pending in self.waiting_streams.iter_mut() {
            match pending {
                WaitForStream::Downstream(WaitForDownstreamData {
                    stream_id: id,
                    chan: Some(chan),
                }) if stream_id == *id => {
                    chan.close();
                },
                WaitForStream::Upstream(WaitForUpstreamCapacity {
                    stream_id: id,
                    chan: Some(chan),
                }) if stream_id == *id => {
                    chan.close();
                },
                _ => {},
            }
        }

        // Drop the datagram flow associated with the stream, if any.
        if let Some(mapped_flow_id) = stream_ctx.associated_dgram_flow_id {
            self.flow_map.remove(&mapped_flow_id);
        }

        // Only the server side reports closed streams to the controller.
        if qconn.is_server() {
            let _ = self
                .h3_event_sender
                .send(H3Event::StreamClosed { stream_id }.into());
        }

        Ok(())
    }
892
893 fn handle_core_command(
896 &mut self, qconn: &mut QuicheConnection, cmd: H3Command,
897 ) -> H3ConnectionResult<()> {
898 match cmd {
899 H3Command::QuicCmd(cmd) => cmd.execute(qconn),
900 H3Command::GoAway => {
901 let max_id = self.max_stream_seen;
902 self.conn_mut()
903 .expect("connection should be established")
904 .send_goaway(qconn, max_id)?;
905 },
906 }
907 Ok(())
908 }
909}
910
911impl<H: DriverHooks> H3Driver<H> {
912 fn process_available_dgrams(
915 &mut self, qconn: &mut QuicheConnection,
916 ) -> H3ConnectionResult<()> {
917 loop {
918 match datagram::receive_h3_dgram(qconn) {
919 Ok((flow_id, dgram)) => {
920 self.get_or_insert_flow(flow_id)?.send_best_effort(dgram);
921 },
922 Err(quiche::Error::Done) => return Ok(()),
923 Err(err) => return Err(H3ConnectionError::from(err)),
924 }
925 }
926 }
927
    /// Writes as many queued frames as possible to `stream_id`.
    ///
    /// Alternates between writing the queued frame and pulling the next one
    /// from the stream's receiver, until the stream blocks, the receiver
    /// runs dry or disconnects, or an error finishes the stream. Unknown
    /// stream ids are a no-op.
    ///
    /// # Errors
    ///
    /// Fails when there is no HTTP/3 connection, or with whatever
    /// `finish_stream` returns.
    fn process_writable_stream(
        &mut self, qconn: &mut QuicheConnection, stream_id: u64,
    ) -> H3ConnectionResult<()> {
        let conn = self.conn.as_mut().ok_or(Self::connection_not_present())?;
        let Some(ctx) = self.stream_map.get_mut(&stream_id) else {
            return Ok(());
        };

        loop {
            match Self::process_write_frame(conn, qconn, ctx) {
                // Fully written (or nothing queued): clear the slot.
                Ok(()) => ctx.queued_frame = None,
                // Not writable right now; the frame stays queued.
                Err(h3::Error::StreamBlocked | h3::Error::Done) => break,
                Err(h3::Error::MessageError) => {
                    return self.finish_stream(
                        qconn,
                        stream_id,
                        Some(quiche::h3::WireErrorCode::MessageError as u64),
                        Some(quiche::h3::WireErrorCode::MessageError as u64),
                    );
                },
                // The peer stopped reading: record its code and shut down
                // our read side with the same code.
                Err(h3::Error::TransportError(quiche::Error::StreamStopped(
                    e,
                ))) => {
                    ctx.audit_stats.set_recvd_stop_sending_error_code(e as i64);
                    return self.finish_stream(qconn, stream_id, Some(e), None);
                },
                // Finish the stream named by the error itself.
                Err(h3::Error::TransportError(
                    quiche::Error::InvalidStreamState(stream),
                )) => {
                    return self.finish_stream(qconn, stream, None, None);
                },
                Err(_) => {
                    return self.finish_stream(qconn, stream_id, None, None);
                },
            }

            // No receiver in the context (dropped after a fin frame, or
            // moved into a wait future): nothing more to pull.
            let Some(recv) = ctx.recv.as_mut() else {
                return Ok(());
            };

            match recv.try_recv() {
                Ok(frame) => ctx.queued_frame = Some(frame),
                Err(TryRecvError::Disconnected) => break,
                Err(TryRecvError::Empty) => {
                    // Resume once the receiver has a frame.
                    self.waiting_streams.push(ctx.wait_for_recv(stream_id));
                    break;
                },
            }
        }

        Ok(())
    }
992
993 fn record_quiche_error(qconn: &mut QuicheConnection, metrics: &impl Metrics) {
996 if let Some(err) = qconn.local_error() {
998 if err.is_app {
999 metrics.local_h3_conn_close_error_count(err.error_code.into())
1000 } else {
1001 metrics.local_quic_conn_close_error_count(err.error_code.into())
1002 }
1003 .inc();
1004 } else if let Some(err) = qconn.peer_error() {
1005 if err.is_app {
1006 metrics.peer_h3_conn_close_error_count(err.error_code.into())
1007 } else {
1008 metrics.peer_quic_conn_close_error_count(err.error_code.into())
1009 }
1010 .inc();
1011 }
1012 }
1013}
1014
1015impl<H: DriverHooks> ApplicationOverQuic for H3Driver<H> {
1016 fn on_conn_established(
1017 &mut self, quiche_conn: &mut QuicheConnection,
1018 handshake_info: &HandshakeInfo,
1019 ) -> QuicResult<()> {
1020 let conn = h3::Connection::with_transport(quiche_conn, &self.h3_config)?;
1021 self.conn = Some(conn);
1022
1023 H::conn_established(self, quiche_conn, handshake_info)?;
1024 Ok(())
1025 }
1026
    /// True once the HTTP/3 connection has been created.
    #[inline]
    fn should_act(&self) -> bool {
        self.conn.is_some()
    }
1031
    /// Exposes the driver's pooled scratch buffer as a mutable byte slice.
    #[inline]
    fn buffer(&mut self) -> &mut [u8] {
        &mut self.pooled_buf
    }
1036
    /// Polls the HTTP/3 connection for events until it reports `Done`, then
    /// processes any available datagrams.
    ///
    /// # Errors
    ///
    /// Errors from event handling or datagram processing are propagated.
    /// An error from `poll` itself is only logged.
    fn process_reads(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
        loop {
            match self.conn_mut()?.poll(qconn) {
                Ok((stream_id, event)) =>
                    self.process_read_event(qconn, stream_id, event)?,
                Err(h3::Error::Done) => break,
                Err(err) => {
                    // A poll error is logged, not propagated. Returning here
                    // also skips datagram processing for this round.
                    log::debug!("connection closed due to h3 protocol error"; "error"=>?err);
                    return Ok(());
                },
            };
        }

        self.process_available_dgrams(qconn)?;
        Ok(())
    }
1061
    /// Writes queued frames for every stream the transport reports as
    /// writable, then handles wait futures that have already completed.
    fn process_writes(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
        while let Some(stream_id) = qconn.stream_writable_next() {
            self.process_writable_stream(qconn, stream_id)?;
        }

        // `now_or_never` polls once without waiting, so only futures that
        // are ready right now are handled.
        while let Some(Some(ready)) = self.waiting_streams.next().now_or_never() {
            self.upstream_ready(qconn, ready)?;
        }

        Ok(())
    }
1077
    /// Final bookkeeping when the connection closes.
    ///
    /// Always records `max_stream_seen` in the `maximum_writable_streams`
    /// metric. If the work loop ended with an error, it also records the
    /// close error and then either closes the QUIC connection (controller
    /// went away) or reports the error to the controller as an event.
    fn on_conn_close<M: Metrics>(
        &mut self, quiche_conn: &mut QuicheConnection, metrics: &M,
        work_loop_result: &QuicResult<()>,
    ) {
        let max_stream_seen = self.max_stream_seen;
        metrics
            .maximum_writable_streams()
            .observe(max_stream_seen as f64);

        let Err(work_loop_error) = work_loop_result else {
            return;
        };

        Self::record_quiche_error(quiche_conn, metrics);

        let Some(h3_err) = work_loop_error.downcast_ref::<H3ConnectionError>()
        else {
            log::error!("Found non-H3ConnectionError"; "error" => %work_loop_error);
            return;
        };

        // Nobody is left to receive an event; close the connection without
        // an error code instead. A failure to close is ignored.
        if matches!(h3_err, H3ConnectionError::ControllerWentAway) {
            let _ =
                quiche_conn.close(true, h3::WireErrorCode::NoError as u64, &[]);
            return;
        }

        if let Some(ev) = H3Event::from_error(h3_err) {
            let _ = self.h3_event_sender.send(ev.into());
            // NOTE(review): the explicit return is redundant today;
            // presumably kept so code added below is not run for this case.
            #[expect(clippy::needless_return)]
            return;
        }
    }
1114
    /// Waits for the next piece of work and handles it.
    ///
    /// The sources are polled in this fixed order (`biased`): a ready
    /// waiting stream, an outbound datagram frame, a controller command,
    /// and, only if the hooks report a wait action, the hooks' own action.
    #[inline]
    async fn wait_for_data(
        &mut self, qconn: &mut QuicheConnection,
    ) -> QuicResult<()> {
        select! {
            biased;
            Some(ready) = self.waiting_streams.next() => self.upstream_ready(qconn, ready),
            Some(dgram) = self.dgram_recv.recv() => self.dgram_ready(qconn, dgram),
            Some(cmd) = self.cmd_recv.recv() => H::conn_command(self, qconn, cmd),
            r = self.hooks.wait_for_action(qconn), if H::has_wait_action(self) => r,
        }?;

        // Handle one more command if one is already queued, whichever branch
        // fired above.
        if let Ok(cmd) = self.cmd_recv.try_recv() {
            H::conn_command(self, qconn, cmd)?;
        }

        Ok(())
    }
1138}
1139
1140impl<H: DriverHooks> Drop for H3Driver<H> {
1141 fn drop(&mut self) {
1142 for stream in self.stream_map.values() {
1143 stream
1144 .audit_stats
1145 .set_recvd_stream_fin(StreamClosureKind::Implicit);
1146 }
1147 }
1148}
1149
/// Commands handled by every [`H3Driver`], regardless of endpoint type.
#[derive(Debug)]
pub enum H3Command {
    /// Run the wrapped command against the QUIC connection.
    QuicCmd(QuicCommand),
    /// Send a GOAWAY frame carrying the largest stream id seen so far.
    GoAway,
}
1164
/// Handle for sending values of type `T` over a command channel of type
/// `C`; values are converted with `Into<C>` when sent.
pub struct RequestSender<C, T> {
    /// The underlying unbounded command channel.
    sender: UnboundedSender<C>,
    /// Ties the handle to `T` without storing a `T`.
    _r: PhantomData<fn() -> T>,
}
1172
impl<C, T: Into<C>> RequestSender<C, T> {
    /// Converts `v` into the channel's command type and sends it.
    ///
    /// # Errors
    ///
    /// Returns the underlying channel's `SendError` if the send fails.
    #[inline(always)]
    pub fn send(&self, v: T) -> Result<(), mpsc::error::SendError<C>> {
        self.sender.send(v.into())
    }
}
1181
1182impl<C, T> Clone for RequestSender<C, T> {
1183 fn clone(&self) -> Self {
1184 Self {
1185 sender: self.sender.clone(),
1186 _r: Default::default(),
1187 }
1188 }
1189}
1190
/// The application-facing counterpart of an [`H3Driver`]: sends commands to
/// the driver and receives its events.
pub struct H3Controller<H: DriverHooks> {
    /// Sends commands to the driver.
    cmd_sender: UnboundedSender<H::Command>,
    /// Receives events from the driver; `None` after
    /// `take_event_receiver`.
    h3_event_recv: Option<UnboundedReceiver<H::Event>>,
}
1206
impl<H: DriverHooks> H3Controller<H> {
    /// Mutable access to the driver's event receiver.
    ///
    /// # Panics
    ///
    /// Panics if the receiver was already removed with
    /// [`Self::take_event_receiver`].
    pub fn event_receiver_mut(&mut self) -> &mut UnboundedReceiver<H::Event> {
        self.h3_event_recv
            .as_mut()
            .expect("No event receiver on H3Controller")
    }

    /// Moves the event receiver out of the controller.
    ///
    /// # Panics
    ///
    /// Panics if the receiver was already taken.
    pub fn take_event_receiver(&mut self) -> UnboundedReceiver<H::Event> {
        self.h3_event_recv
            .take()
            .expect("No event receiver on H3Controller")
    }

    /// Creates a handle for sending [`QuicCommand`]s to the driver.
    pub fn cmd_sender(&self) -> RequestSender<H::Command, QuicCommand> {
        RequestSender {
            sender: self.cmd_sender.clone(),
            _r: Default::default(),
        }
    }

    /// Asks the driver to send a GOAWAY frame. A failed send on the command
    /// channel is ignored.
    pub fn send_goaway(&self) {
        let _ = self.cmd_sender.send(H3Command::GoAway.into());
    }
}