1mod client;
28pub mod connection;
30mod datagram;
31mod hooks;
34mod server;
35mod streams;
36
37use std::collections::BTreeMap;
38use std::error::Error;
39use std::fmt;
40use std::marker::PhantomData;
41use std::sync::Arc;
42
43use datagram_socket::StreamClosureKind;
44use foundations::telemetry::log;
45use futures::FutureExt;
46use futures_util::stream::FuturesUnordered;
47use quiche::h3;
48use tokio::select;
49use tokio::sync::mpsc;
50use tokio::sync::mpsc::error::TryRecvError;
51use tokio::sync::mpsc::error::TrySendError;
52use tokio::sync::mpsc::UnboundedReceiver;
53use tokio::sync::mpsc::UnboundedSender;
54use tokio_stream::StreamExt;
55use tokio_util::sync::PollSender;
56
57use self::hooks::DriverHooks;
58use self::hooks::InboundHeaders;
59use self::streams::FlowCtx;
60use self::streams::HaveUpstreamCapacity;
61use self::streams::ReceivedDownstreamData;
62use self::streams::StreamCtx;
63use self::streams::StreamReady;
64use self::streams::WaitForDownstreamData;
65use self::streams::WaitForStream;
66use self::streams::WaitForUpstreamCapacity;
67use crate::buf_factory::BufFactory;
68use crate::buf_factory::PooledBuf;
69use crate::buf_factory::PooledDgram;
70use crate::http3::settings::Http3Settings;
71use crate::http3::H3AuditStats;
72use crate::metrics::Metrics;
73use crate::quic::HandshakeInfo;
74use crate::quic::QuicCommand;
75use crate::quic::QuicheConnection;
76use crate::ApplicationOverQuic;
77use crate::QuicResult;
78
79pub use self::client::ClientEventStream;
80pub use self::client::ClientH3Command;
81pub use self::client::ClientH3Controller;
82pub use self::client::ClientH3Driver;
83pub use self::client::ClientH3Event;
84pub use self::client::ClientRequestSender;
85pub use self::client::NewClientRequest;
86pub use self::server::ServerEventStream;
87pub use self::server::ServerH3Command;
88pub use self::server::ServerH3Controller;
89pub use self::server::ServerH3Driver;
90pub use self::server::ServerH3Event;
91
92const DEFAULT_PRIO: h3::Priority = h3::Priority::new(3, true);
95
96#[cfg(not(any(test, debug_assertions)))]
99const STREAM_CAPACITY: usize = 16;
100#[cfg(any(test, debug_assertions))]
101const STREAM_CAPACITY: usize = 1; const FLOW_CAPACITY: usize = 2048;
106
107pub type OutboundFrameSender = PollSender<OutboundFrame>;
110
111type OutboundFrameStream = mpsc::Receiver<OutboundFrame>;
114
115type InboundFrameSender = PollSender<InboundFrame>;
118
119pub type InboundFrameStream = mpsc::Receiver<InboundFrame>;
122
123#[derive(Debug, PartialEq, Eq)]
128#[non_exhaustive]
129pub enum H3ConnectionError {
130 ControllerWentAway,
132 H3(h3::Error),
134 GoAway,
136 NonexistentStream,
138 PostAcceptTimeout,
141}
142
143impl From<h3::Error> for H3ConnectionError {
144 fn from(err: h3::Error) -> Self {
145 H3ConnectionError::H3(err)
146 }
147}
148
149impl From<quiche::Error> for H3ConnectionError {
150 fn from(err: quiche::Error) -> Self {
151 H3ConnectionError::H3(h3::Error::TransportError(err))
152 }
153}
154
155impl Error for H3ConnectionError {}
156
157impl fmt::Display for H3ConnectionError {
158 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
159 let s: &dyn fmt::Display = match self {
160 Self::ControllerWentAway => &"controller went away",
161 Self::H3(e) => e,
162 Self::GoAway => &"goaway",
163 Self::NonexistentStream => &"nonexistent stream",
164 Self::PostAcceptTimeout => &"post accept timeout hit",
165 };
166
167 write!(f, "H3ConnectionError: {s}")
168 }
169}
170
171type H3ConnectionResult<T> = Result<T, H3ConnectionError>;
172
173#[derive(Debug)]
178pub struct IncomingH3Headers {
179 pub stream_id: u64,
181 pub headers: Vec<h3::Header>,
183 pub send: OutboundFrameSender,
187 pub recv: InboundFrameStream,
189 pub read_fin: bool,
191 pub h3_audit_stats: Arc<H3AuditStats>,
193}
194
195#[derive(Debug)]
201pub enum H3Event {
202 IncomingSettings {
204 settings: Vec<(u64, u64)>,
206 },
207
208 IncomingHeaders(IncomingH3Headers),
212
213 NewFlow {
216 flow_id: u64,
218 send: OutboundFrameSender,
220 recv: InboundFrameStream,
222 },
223 ResetStream { stream_id: u64 },
226 ConnectionError(h3::Error),
228 ConnectionShutdown(Option<H3ConnectionError>),
231 BodyBytesReceived {
233 stream_id: u64,
235 num_bytes: u64,
237 fin: bool,
239 },
240 StreamClosed { stream_id: u64 },
244}
245
246impl H3Event {
247 fn from_error(err: &H3ConnectionError) -> Option<Self> {
249 Some(match err {
250 H3ConnectionError::H3(e) => Self::ConnectionError(*e),
251 H3ConnectionError::PostAcceptTimeout => Self::ConnectionShutdown(
252 Some(H3ConnectionError::PostAcceptTimeout),
253 ),
254 _ => return None,
255 })
256 }
257}
258
259#[derive(Debug)]
265pub enum OutboundFrame {
266 Headers(Vec<h3::Header>, Option<quiche::h3::Priority>),
268 #[cfg(feature = "zero-copy")]
270 Body(crate::buf_factory::QuicheBuf, bool),
271 #[cfg(not(feature = "zero-copy"))]
273 Body(PooledBuf, bool),
274 Datagram(PooledDgram, u64),
276 PeerStreamError,
278 FlowShutdown { flow_id: u64, stream_id: u64 },
280}
281
282impl OutboundFrame {
283 pub fn body(body: PooledBuf, fin: bool) -> Self {
285 #[cfg(feature = "zero-copy")]
286 let body = crate::buf_factory::QuicheBuf::new(body);
287
288 OutboundFrame::Body(body, fin)
289 }
290}
291
292#[derive(Debug)]
296pub enum InboundFrame {
297 Body(PooledBuf, bool),
299 Datagram(PooledDgram),
301}
302
303pub struct H3Driver<H: DriverHooks> {
313 h3_config: h3::Config,
316 conn: Option<h3::Connection>,
319 hooks: H,
321 h3_event_sender: mpsc::UnboundedSender<H::Event>,
323 cmd_recv: mpsc::UnboundedReceiver<H::Command>,
325
326 stream_map: BTreeMap<u64, StreamCtx>,
329 flow_map: BTreeMap<u64, FlowCtx>,
332 waiting_streams: FuturesUnordered<WaitForStream>,
336
337 dgram_recv: OutboundFrameStream,
339 dgram_send: OutboundFrameSender,
341
342 pooled_buf: PooledBuf,
344 max_stream_seen: u64,
346
347 settings_received_and_forwarded: bool,
350}
351
352impl<H: DriverHooks> H3Driver<H> {
353 pub fn new(http3_settings: Http3Settings) -> (Self, H3Controller<H>) {
359 let (dgram_send, dgram_recv) = mpsc::channel(FLOW_CAPACITY);
360 let (cmd_sender, cmd_recv) = mpsc::unbounded_channel();
361 let (h3_event_sender, h3_event_recv) = mpsc::unbounded_channel();
362
363 (
364 H3Driver {
365 h3_config: (&http3_settings).into(),
366 conn: None,
367 hooks: H::new(&http3_settings),
368 h3_event_sender,
369 cmd_recv,
370
371 stream_map: BTreeMap::new(),
372 flow_map: BTreeMap::new(),
373
374 dgram_recv,
375 dgram_send: PollSender::new(dgram_send),
376 pooled_buf: BufFactory::get_max_buf(),
377 max_stream_seen: 0,
378
379 waiting_streams: FuturesUnordered::new(),
380
381 settings_received_and_forwarded: false,
382 },
383 H3Controller {
384 cmd_sender,
385 h3_event_recv: Some(h3_event_recv),
386 },
387 )
388 }
389
390 fn get_or_insert_flow(
393 &mut self, flow_id: u64,
394 ) -> H3ConnectionResult<&mut FlowCtx> {
395 use std::collections::btree_map::Entry;
396 Ok(match self.flow_map.entry(flow_id) {
397 Entry::Vacant(e) => {
398 let (flow, recv) = FlowCtx::new(FLOW_CAPACITY);
400 let flow_req = H3Event::NewFlow {
401 flow_id,
402 recv,
403 send: self.dgram_send.clone(),
404 };
405 self.h3_event_sender
406 .send(flow_req.into())
407 .map_err(|_| H3ConnectionError::ControllerWentAway)?;
408 e.insert(flow)
409 },
410 Entry::Occupied(e) => e.into_mut(),
411 })
412 }
413
414 fn insert_stream(&mut self, stream_id: u64, ctx: StreamCtx) {
416 self.stream_map.insert(stream_id, ctx);
417 self.max_stream_seen = self.max_stream_seen.max(stream_id);
418 }
419
/// Reads body data for `stream_id` from the HTTP/3 connection and forwards
/// it on the stream's inbound channel.
///
/// Data is only pulled out of quiche while a send permit can be reserved on
/// the channel, so nothing is read that cannot be delivered. Once the
/// stream's fin has been recorded (`ctx.fin_recv`), an empty body frame
/// with the fin flag set is delivered instead of reading more data.
///
/// # Errors
///
/// Returns the `connection_not_present()` error when no HTTP/3 connection
/// exists, `NonexistentStream` when `stream_id` is not in the stream map,
/// and otherwise whatever `finish_stream` or `process_h3_fin` return.
fn process_h3_data(
    &mut self, qconn: &mut QuicheConnection, stream_id: u64,
) -> H3ConnectionResult<()> {
    // `conn` and `ctx` borrow different fields of `self`, so both can be
    // used side by side below.
    let conn = self.conn.as_mut().ok_or(Self::connection_not_present())?;
    let ctx = self
        .stream_map
        .get_mut(&stream_id)
        .ok_or(H3ConnectionError::NonexistentStream)?;

    // Outcome of the read loop below.
    enum StreamStatus {
        // Reading stopped; `close` says whether the stream should be
        // finished.
        Done { close: bool },
        // There may be more to deliver but the channel is full.
        Blocked,
    }

    let status = loop {
        // No sender is available on the context right now.
        // NOTE(review): presumably it was moved into a pending
        // `wait_for_send` future — confirm in `streams`.
        let Some(sender) = ctx.send.as_ref().and_then(PollSender::get_ref)
        else {
            break StreamStatus::Done { close: false };
        };

        // Reserve channel capacity before taking anything out of quiche.
        let permit = match sender.try_reserve() {
            Ok(permit) => permit,
            Err(TrySendError::Closed(())) => {
                // The receiving side is gone; close the stream only if
                // both directions have already seen their fin.
                break StreamStatus::Done {
                    close: ctx.fin_sent && ctx.fin_recv,
                };
            },
            Err(TrySendError::Full(())) => {
                // Wait for capacity only when there is something left to
                // deliver: the pending fin or readable stream data.
                if ctx.fin_recv || qconn.stream_readable(stream_id) {
                    break StreamStatus::Blocked;
                }
                break StreamStatus::Done { close: false };
            },
        };

        if ctx.fin_recv {
            // Deliver the end-of-body marker: an empty buffer with fin set.
            permit
                .send(InboundFrame::Body(BufFactory::get_empty_buf(), true));
            break StreamStatus::Done {
                close: ctx.fin_sent,
            };
        }

        match conn.recv_body(qconn, stream_id, &mut self.pooled_buf) {
            Ok(n) => {
                // Hand the filled buffer off and install a fresh one for
                // the next read.
                let mut body = std::mem::replace(
                    &mut self.pooled_buf,
                    BufFactory::get_max_buf(),
                );
                body.truncate(n);

                ctx.audit_stats.add_downstream_bytes_recvd(n as u64);
                let event = H3Event::BodyBytesReceived {
                    stream_id,
                    num_bytes: n as u64,
                    fin: false,
                };
                // A failure to deliver this event is ignored.
                let _ = self.h3_event_sender.send(event.into());

                permit.send(InboundFrame::Body(body, false));
            },
            // Nothing more to read at the moment.
            Err(h3::Error::Done) =>
                break StreamStatus::Done { close: false },
            // Any other read error closes the stream.
            Err(_) => break StreamStatus::Done { close: true },
        }
    };

    match status {
        StreamStatus::Done { close } => {
            if close {
                return self.finish_stream(qconn, stream_id, None, None);
            }

            // The QUIC stream is finished but the fin has not been
            // recorded on the context yet: process it right away.
            if !ctx.fin_recv && qconn.stream_finished(stream_id) {
                return self.process_h3_fin(qconn, stream_id);
            }
        },
        StreamStatus::Blocked => {
            // Queue a future that resolves through `wait_for_send`; its
            // result is later handled by `upstream_ready`.
            self.waiting_streams.push(ctx.wait_for_send(stream_id));
        },
    }

    Ok(())
}
517
518 fn process_h3_fin(
520 &mut self, qconn: &mut QuicheConnection, stream_id: u64,
521 ) -> H3ConnectionResult<()> {
522 let ctx = self.stream_map.get_mut(&stream_id).filter(|c| !c.fin_recv);
523 let Some(ctx) = ctx else {
524 return Ok(());
526 };
527
528 ctx.fin_recv = true;
529 ctx.audit_stats
530 .set_recvd_stream_fin(StreamClosureKind::Explicit);
531
532 let event = H3Event::BodyBytesReceived {
536 stream_id,
537 num_bytes: 0,
538 fin: true,
539 };
540 let _ = self.h3_event_sender.send(event.into());
541
542 self.process_h3_data(qconn, stream_id)
545 }
546
547 fn process_read_event(
551 &mut self, qconn: &mut QuicheConnection, stream_id: u64, event: h3::Event,
552 ) -> H3ConnectionResult<()> {
553 self.forward_settings()?;
554
555 match event {
556 h3::Event::Headers { list, more_frames } =>
558 H::headers_received(self, qconn, InboundHeaders {
559 stream_id,
560 headers: list,
561 has_body: more_frames,
562 }),
563
564 h3::Event::Data => self.process_h3_data(qconn, stream_id),
565 h3::Event::Finished => self.process_h3_fin(qconn, stream_id),
566
567 h3::Event::Reset(code) => {
568 if let Some(ctx) = self.stream_map.get(&stream_id) {
569 ctx.audit_stats.set_recvd_reset_stream_error_code(code as _);
570 }
571
572 self.h3_event_sender
573 .send(H3Event::ResetStream { stream_id }.into())
574 .map_err(|_| H3ConnectionError::ControllerWentAway)?;
575
576 self.finish_stream(qconn, stream_id, None, None)
577 },
578
579 h3::Event::PriorityUpdate => Ok(()),
580 h3::Event::GoAway => Err(H3ConnectionError::GoAway),
581 }
582 }
583
584 fn forward_settings(&mut self) -> H3ConnectionResult<()> {
590 if self.settings_received_and_forwarded {
591 return Ok(());
592 }
593
594 if let Some(settings) = self.conn_mut()?.peer_settings_raw() {
596 let incoming_settings = H3Event::IncomingSettings {
597 settings: settings.to_vec(),
598 };
599
600 self.h3_event_sender
601 .send(incoming_settings.into())
602 .map_err(|_| H3ConnectionError::ControllerWentAway)?;
603
604 self.settings_received_and_forwarded = true;
605 }
606 Ok(())
607 }
608
/// Writes the frame queued on `ctx`, if any, to the peer.
///
/// This function never clears `ctx.queued_frame` itself. `Ok(())` means
/// there was nothing queued or the frame was handled completely; the caller
/// uses that to drop the frame from the queue.
///
/// # Errors
///
/// * `h3::Error::StreamBlocked` when only part of a body was accepted.
/// * `h3::Error::MessageError` for a `PeerStreamError` frame.
/// * `h3::Error::TransportError(quiche::Error::Done)` after the final body
///   frame was written on a stream whose fin was already received.
/// * Any error returned by the quiche send calls.
///
/// # Panics
///
/// Panics if a final body frame is written while `ctx.recv` is `None`, or
/// if the queued frame is a `FlowShutdown` or `Datagram`.
fn process_write_frame(
    conn: &mut h3::Connection, qconn: &mut QuicheConnection,
    ctx: &mut StreamCtx,
) -> h3::Result<()> {
    let Some(frame) = &mut ctx.queued_frame else {
        return Ok(());
    };

    // The stream id is taken from the stream's audit stats.
    let audit_stats = &ctx.audit_stats;
    let stream_id = audit_stats.stream_id();

    match frame {
        OutboundFrame::Headers(headers, priority) => {
            let prio = priority.as_ref().unwrap_or(&DEFAULT_PRIO);

            if ctx.initial_headers_sent {
                // Not the first header block on this stream.
                conn.send_additional_headers_with_priority(
                    qconn, stream_id, headers, prio, false, false,
                )
            } else {
                // First header block; remember it only if the send
                // succeeded.
                conn.send_response_with_priority(
                    qconn, stream_id, headers, prio, false,
                )
                .inspect(|_| ctx.initial_headers_sent = true)
            }
        },

        OutboundFrame::Body(body, fin) => {
            let len = body.as_ref().len();
            if len == 0 && !*fin {
                // An empty, non-final body has nothing to write; report
                // success so the frame is dropped.
                return Ok(());
            }
            if *fin {
                // This is the last body frame: close the receiving end of
                // the outbound channel.
                ctx.recv.as_mut().expect("channel").close();
            }
            #[cfg(feature = "zero-copy")]
            let n = conn.send_body_zc(qconn, stream_id, body, *fin)?;

            #[cfg(not(feature = "zero-copy"))]
            let n = conn.send_body(qconn, stream_id, body, *fin)?;

            audit_stats.add_downstream_bytes_sent(n as _);
            if n != len {
                // Partial write: keep the unsent remainder queued.
                // NOTE(review): with `zero-copy` the buffer is not advanced
                // here; presumably `send_body_zc` does that — confirm.
                #[cfg(not(feature = "zero-copy"))]
                body.pop_front(n);

                Err(h3::Error::StreamBlocked)
            } else {
                if *fin {
                    ctx.fin_sent = true;
                    audit_stats
                        .set_sent_stream_fin(StreamClosureKind::Explicit);
                    if ctx.fin_recv {
                        // Both directions are done. This is a transport
                        // error wrapping `Done` rather than
                        // `h3::Error::Done`, so `process_writable_stream`
                        // handles it in its catch-all arm and finishes
                        // the stream.
                        return Err(h3::Error::TransportError(
                            quiche::Error::Done,
                        ));
                    }
                }
                Ok(())
            }
        },

        OutboundFrame::PeerStreamError => Err(h3::Error::MessageError),

        OutboundFrame::FlowShutdown { .. } => {
            unreachable!("Only flows send shutdowns")
        },

        OutboundFrame::Datagram(..) => {
            unreachable!("Only flows send datagrams")
        },
    }
}
699
700 fn upstream_ready(
708 &mut self, qconn: &mut QuicheConnection, ready: StreamReady,
709 ) -> H3ConnectionResult<()> {
710 match ready {
711 StreamReady::Downstream(r) => self.upstream_read_ready(qconn, r),
712 StreamReady::Upstream(r) => self.upstream_write_ready(qconn, r),
713 }
714 }
715
716 fn upstream_read_ready(
717 &mut self, qconn: &mut QuicheConnection,
718 read_ready: ReceivedDownstreamData,
719 ) -> H3ConnectionResult<()> {
720 let ReceivedDownstreamData {
721 stream_id,
722 chan,
723 data,
724 } = read_ready;
725
726 match self.stream_map.get_mut(&stream_id) {
727 None => Ok(()),
728 Some(stream) => {
729 stream.recv = Some(chan);
730 stream.queued_frame = data;
731 self.process_writable_stream(qconn, stream_id)
732 },
733 }
734 }
735
736 fn upstream_write_ready(
737 &mut self, qconn: &mut QuicheConnection,
738 write_ready: HaveUpstreamCapacity,
739 ) -> H3ConnectionResult<()> {
740 let HaveUpstreamCapacity {
741 stream_id,
742 mut chan,
743 } = write_ready;
744
745 match self.stream_map.get_mut(&stream_id) {
746 None => Ok(()),
747 Some(stream) => {
748 chan.abort_send(); stream.send = Some(chan);
750 self.process_h3_data(qconn, stream_id)
751 },
752 }
753 }
754
755 fn dgram_ready(
757 &mut self, qconn: &mut QuicheConnection, frame: OutboundFrame,
758 ) -> H3ConnectionResult<()> {
759 let mut frame = Ok(frame);
760
761 loop {
762 match frame {
763 Ok(OutboundFrame::Datagram(dgram, flow_id)) => {
764 let _ = datagram::send_h3_dgram(qconn, flow_id, dgram);
766 },
767 Ok(OutboundFrame::FlowShutdown { flow_id, stream_id }) => {
768 self.finish_stream(
769 qconn,
770 stream_id,
771 Some(quiche::h3::WireErrorCode::NoError as u64),
772 Some(quiche::h3::WireErrorCode::NoError as u64),
773 )?;
774 self.flow_map.remove(&flow_id);
775 break;
776 },
777 Ok(_) => unreachable!("Flows can't send frame of other types"),
778 Err(TryRecvError::Empty) => break,
779 Err(TryRecvError::Disconnected) =>
780 return Err(H3ConnectionError::ControllerWentAway),
781 }
782
783 frame = self.dgram_recv.try_recv();
784 }
785
786 Ok(())
787 }
788
789 fn conn_mut(&mut self) -> H3ConnectionResult<&mut h3::Connection> {
794 self.conn.as_mut().ok_or(Self::connection_not_present())
795 }
796
797 const fn connection_not_present() -> H3ConnectionError {
801 H3ConnectionError::H3(h3::Error::TransportError(quiche::Error::TlsFail))
802 }
803
804 fn finish_stream(
808 &mut self, qconn: &mut QuicheConnection, stream_id: u64,
809 read: Option<u64>, write: Option<u64>,
810 ) -> H3ConnectionResult<()> {
811 let Some(stream_ctx) = self.stream_map.remove(&stream_id) else {
812 return Ok(());
813 };
814
815 let audit_stats = &stream_ctx.audit_stats;
816
817 if let Some(err) = read {
818 audit_stats.set_sent_stop_sending_error_code(err as _);
819 let _ = qconn.stream_shutdown(stream_id, quiche::Shutdown::Read, err);
820 }
821
822 if let Some(err) = write {
823 audit_stats.set_sent_reset_stream_error_code(err as _);
824 let _ =
825 qconn.stream_shutdown(stream_id, quiche::Shutdown::Write, err);
826 }
827
828 for pending in self.waiting_streams.iter_mut() {
830 match pending {
831 WaitForStream::Downstream(WaitForDownstreamData {
832 stream_id: id,
833 chan: Some(chan),
834 }) if stream_id == *id => {
835 chan.close();
836 },
837 WaitForStream::Upstream(WaitForUpstreamCapacity {
838 stream_id: id,
839 chan: Some(chan),
840 }) if stream_id == *id => {
841 chan.close();
842 },
843 _ => {},
844 }
845 }
846
847 if let Some(mapped_flow_id) = stream_ctx.associated_dgram_flow_id {
850 self.flow_map.remove(&mapped_flow_id);
851 }
852
853 if qconn.is_server() {
854 let _ = self
856 .h3_event_sender
857 .send(H3Event::StreamClosed { stream_id }.into());
858 }
859
860 Ok(())
861 }
862
863 fn handle_core_command(
866 &mut self, qconn: &mut QuicheConnection, cmd: H3Command,
867 ) -> H3ConnectionResult<()> {
868 match cmd {
869 H3Command::QuicCmd(cmd) => cmd.execute(qconn),
870 H3Command::GoAway => {
871 let max_id = self.max_stream_seen;
872 self.conn_mut()
873 .expect("connection should be established")
874 .send_goaway(qconn, max_id)?;
875 },
876 }
877 Ok(())
878 }
879}
880
881impl<H: DriverHooks> H3Driver<H> {
882 fn process_available_dgrams(
885 &mut self, qconn: &mut QuicheConnection,
886 ) -> H3ConnectionResult<()> {
887 loop {
888 match datagram::receive_h3_dgram(qconn) {
889 Ok((flow_id, dgram)) => {
890 self.get_or_insert_flow(flow_id)?.send_best_effort(dgram);
891 },
892 Err(quiche::Error::Done) => return Ok(()),
893 Err(err) => return Err(H3ConnectionError::from(err)),
894 }
895 }
896 }
897
/// Writes as many queued frames as possible for `stream_id`.
///
/// Each iteration writes the frame queued on the stream context and then
/// tries to pull the next frame from the stream's outbound channel, until
/// the stream blocks, the channel is empty or disconnected, or an error
/// finishes the stream. Unknown stream ids are ignored.
///
/// # Errors
///
/// Returns the `connection_not_present()` error when no HTTP/3 connection
/// exists, or the result of `finish_stream`.
fn process_writable_stream(
    &mut self, qconn: &mut QuicheConnection, stream_id: u64,
) -> H3ConnectionResult<()> {
    // `conn` and `ctx` borrow different fields of `self`.
    let conn = self.conn.as_mut().ok_or(Self::connection_not_present())?;
    let Some(ctx) = self.stream_map.get_mut(&stream_id) else {
        return Ok(());
    };

    loop {
        match Self::process_write_frame(conn, qconn, ctx) {
            // Frame handled (or none queued): clear the slot.
            Ok(()) => ctx.queued_frame = None,
            // Cannot write more right now; the frame stays queued.
            Err(h3::Error::StreamBlocked | h3::Error::Done) => break,
            Err(h3::Error::MessageError) => {
                // Close both directions with the `MessageError` code.
                return self.finish_stream(
                    qconn,
                    stream_id,
                    Some(quiche::h3::WireErrorCode::MessageError as u64),
                    Some(quiche::h3::WireErrorCode::MessageError as u64),
                );
            },
            Err(h3::Error::TransportError(quiche::Error::StreamStopped(
                e,
            ))) => {
                // Record the code carried by the error and shut down the
                // read side with the same code.
                ctx.audit_stats.set_recvd_stop_sending_error_code(e as i64);
                return self.finish_stream(qconn, stream_id, Some(e), None);
            },
            Err(h3::Error::TransportError(
                quiche::Error::InvalidStreamState(stream),
            )) => {
                // Finishes the stream id carried by the error.
                return self.finish_stream(qconn, stream, None, None);
            },
            // Every other error, including the
            // `TransportError(quiche::Error::Done)` signal from
            // `process_write_frame`, finishes the stream without
            // shutdown codes.
            Err(_) => {
                return self.finish_stream(qconn, stream_id, None, None);
            },
        }

        // No receiver on the context.
        // NOTE(review): presumably it is held by a pending
        // `wait_for_recv` future — confirm in `streams`.
        let Some(recv) = ctx.recv.as_mut() else {
            return Ok(());
        };

        match recv.try_recv() {
            Ok(frame) => ctx.queued_frame = Some(frame),
            Err(TryRecvError::Disconnected) => break,
            Err(TryRecvError::Empty) => {
                // Nothing queued yet: wait asynchronously for the next
                // frame.
                self.waiting_streams.push(ctx.wait_for_recv(stream_id));
                break;
            },
        }
    }

    Ok(())
}
960
961 fn record_quiche_error(qconn: &mut QuicheConnection, metrics: &impl Metrics) {
964 if let Some(err) = qconn.local_error() {
966 if err.is_app {
967 metrics.local_h3_conn_close_error_count(err.error_code.into())
968 } else {
969 metrics.local_quic_conn_close_error_count(err.error_code.into())
970 }
971 .inc();
972 } else if let Some(err) = qconn.peer_error() {
973 if err.is_app {
974 metrics.peer_h3_conn_close_error_count(err.error_code.into())
975 } else {
976 metrics.peer_quic_conn_close_error_count(err.error_code.into())
977 }
978 .inc();
979 }
980 }
981}
982
983impl<H: DriverHooks> ApplicationOverQuic for H3Driver<H> {
984 fn on_conn_established(
985 &mut self, quiche_conn: &mut QuicheConnection,
986 handshake_info: &HandshakeInfo,
987 ) -> QuicResult<()> {
988 let conn = h3::Connection::with_transport(quiche_conn, &self.h3_config)?;
989 self.conn = Some(conn);
990
991 H::conn_established(self, quiche_conn, handshake_info)?;
992 Ok(())
993 }
994
995 #[inline]
996 fn should_act(&self) -> bool {
997 self.conn.is_some()
998 }
999
1000 #[inline]
1001 fn buffer(&mut self) -> &mut [u8] {
1002 &mut self.pooled_buf
1003 }
1004
1005 fn process_reads(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
1011 loop {
1012 match self.conn_mut()?.poll(qconn) {
1013 Ok((stream_id, event)) =>
1014 self.process_read_event(qconn, stream_id, event)?,
1015 Err(h3::Error::Done) => break,
1016 Err(err) => {
1017 log::debug!("connection closed due to h3 protocol error"; "error"=>?err);
1021 return Ok(());
1022 },
1023 };
1024 }
1025
1026 self.process_available_dgrams(qconn)?;
1027 Ok(())
1028 }
1029
1030 fn process_writes(&mut self, qconn: &mut QuicheConnection) -> QuicResult<()> {
1034 while let Some(stream_id) = qconn.stream_writable_next() {
1035 self.process_writable_stream(qconn, stream_id)?;
1036 }
1037
1038 while let Some(Some(ready)) = self.waiting_streams.next().now_or_never() {
1040 self.upstream_ready(qconn, ready)?;
1041 }
1042
1043 Ok(())
1044 }
1045
1046 fn on_conn_close<M: Metrics>(
1049 &mut self, quiche_conn: &mut QuicheConnection, metrics: &M,
1050 work_loop_result: &QuicResult<()>,
1051 ) {
1052 let max_stream_seen = self.max_stream_seen;
1053 metrics
1054 .maximum_writable_streams()
1055 .observe(max_stream_seen as f64);
1056
1057 let Err(work_loop_error) = work_loop_result else {
1058 return;
1059 };
1060
1061 Self::record_quiche_error(quiche_conn, metrics);
1062
1063 let Some(h3_err) = work_loop_error.downcast_ref::<H3ConnectionError>()
1064 else {
1065 log::error!("Found non-H3ConnectionError"; "error" => %work_loop_error);
1066 return;
1067 };
1068
1069 if matches!(h3_err, H3ConnectionError::ControllerWentAway) {
1070 let _ =
1072 quiche_conn.close(true, h3::WireErrorCode::NoError as u64, &[]);
1073 return;
1074 }
1075
1076 if let Some(ev) = H3Event::from_error(h3_err) {
1077 let _ = self.h3_event_sender.send(ev.into());
1078 #[expect(clippy::needless_return)]
1079 return; }
1081 }
1082
/// Waits for the next piece of work for the driver and handles it.
///
/// The sources are polled in a fixed order (`biased`): streams whose
/// channel wait completed, outbound datagram frames, controller commands,
/// and finally the hook's wait action (only when `H::has_wait_action`
/// returns `true`).
///
/// # Errors
///
/// Returns the error of whichever handler ran.
#[inline]
async fn wait_for_data(
    &mut self, qconn: &mut QuicheConnection,
) -> QuicResult<()> {
    select! {
        biased;
        Some(ready) = self.waiting_streams.next() => self.upstream_ready(qconn, ready),
        Some(dgram) = self.dgram_recv.recv() => self.dgram_ready(qconn, dgram),
        Some(cmd) = self.cmd_recv.recv() => H::conn_command(self, qconn, cmd),
        r = self.hooks.wait_for_action(qconn), if H::has_wait_action(self) => r,
    }?;

    // Commands sit behind streams and datagrams in the biased select
    // above, so additionally take at most one command on every call
    // without waiting.
    if let Ok(cmd) = self.cmd_recv.try_recv() {
        H::conn_command(self, qconn, cmd)?;
    }

    Ok(())
}
1106}
1107
1108impl<H: DriverHooks> Drop for H3Driver<H> {
1109 fn drop(&mut self) {
1110 for stream in self.stream_map.values() {
1111 stream
1112 .audit_stats
1113 .set_recvd_stream_fin(StreamClosureKind::Implicit);
1114 }
1115 }
1116}
1117
1118#[derive(Debug)]
1124pub enum H3Command {
1125 QuicCmd(QuicCommand),
1128 GoAway,
1131}
1132
1133pub struct RequestSender<C, T> {
1136 sender: UnboundedSender<C>,
1137 _r: PhantomData<fn() -> T>,
1139}
1140
1141impl<C, T: Into<C>> RequestSender<C, T> {
1142 #[inline(always)]
1145 pub fn send(&self, v: T) -> Result<(), mpsc::error::SendError<C>> {
1146 self.sender.send(v.into())
1147 }
1148}
1149
1150impl<C, T> Clone for RequestSender<C, T> {
1151 fn clone(&self) -> Self {
1152 Self {
1153 sender: self.sender.clone(),
1154 _r: Default::default(),
1155 }
1156 }
1157}
1158
1159pub struct H3Controller<H: DriverHooks> {
1167 cmd_sender: UnboundedSender<H::Command>,
1170 h3_event_recv: Option<UnboundedReceiver<H::Event>>,
1173}
1174
1175impl<H: DriverHooks> H3Controller<H> {
1176 pub fn event_receiver_mut(&mut self) -> &mut UnboundedReceiver<H::Event> {
1179 self.h3_event_recv
1180 .as_mut()
1181 .expect("No event receiver on H3Controller")
1182 }
1183
1184 pub fn take_event_receiver(&mut self) -> UnboundedReceiver<H::Event> {
1186 self.h3_event_recv
1187 .take()
1188 .expect("No event receiver on H3Controller")
1189 }
1190
1191 pub fn cmd_sender(&self) -> RequestSender<H::Command, QuicCommand> {
1193 RequestSender {
1194 sender: self.cmd_sender.clone(),
1195 _r: Default::default(),
1196 }
1197 }
1198
1199 pub fn send_goaway(&self) {
1201 let _ = self.cmd_sender.send(H3Command::GoAway.into());
1202 }
1203}