tokio_quiche/quic/io/
connection_stage.rs1use std::fmt::Debug;
28use std::ops::ControlFlow;
29use std::time::Instant;
30
31use tokio::sync::mpsc;
32
33use crate::quic::connection::ApplicationOverQuic;
34use crate::quic::connection::HandshakeError;
35use crate::quic::connection::HandshakeInfo;
36use crate::quic::connection::Incoming;
37use crate::quic::connection::QuicConnectionStatsShared;
38use crate::quic::QuicheConnection;
39use crate::QuicResult;
40
41pub trait ConnectionStage: Send + Debug {
54 fn on_read<A: ApplicationOverQuic>(
55 &mut self, _received_packets: bool, _qconn: &mut QuicheConnection,
56 _ctx: &mut ConnectionStageContext<A>,
57 ) -> QuicResult<()> {
58 Ok(())
59 }
60
61 fn on_flush<A: ApplicationOverQuic>(
62 &mut self, _qconn: &mut QuicheConnection,
63 _ctx: &mut ConnectionStageContext<A>,
64 ) -> ControlFlow<QuicResult<()>> {
65 ControlFlow::Continue(())
66 }
67
68 fn wait_deadline(&mut self) -> Option<Instant> {
69 None
70 }
71
72 fn post_wait(
73 &self, _qconn: &mut QuicheConnection,
74 ) -> ControlFlow<QuicResult<()>> {
75 ControlFlow::Continue(())
76 }
77}
78
79pub struct ConnectionStageContext<A> {
81 pub in_pkt: Option<Incoming>,
82 pub application: A,
83 pub incoming_pkt_receiver: mpsc::Receiver<Incoming>,
84 pub stats: QuicConnectionStatsShared,
85}
86
87impl<A> ConnectionStageContext<A>
88where
89 A: ApplicationOverQuic,
90{
91 pub fn buffer(&mut self) -> &mut [u8] {
94 self.application.buffer()
95 }
96}
97
98#[derive(Debug)]
99pub struct Handshake {
100 pub handshake_info: HandshakeInfo,
101}
102
103impl Handshake {
104 fn check_handshake_timeout_expired(
105 &self, conn: &mut QuicheConnection,
106 ) -> QuicResult<()> {
107 if self.handshake_info.is_expired() {
108 let _ = conn.close(
109 false,
110 quiche::WireErrorCode::ApplicationError as u64,
111 &[],
112 );
113 return Err(HandshakeError::Timeout.into());
114 }
115
116 Ok(())
117 }
118}
119
120impl ConnectionStage for Handshake {
121 fn on_flush<A: ApplicationOverQuic>(
122 &mut self, qconn: &mut QuicheConnection,
123 _ctx: &mut ConnectionStageContext<A>,
124 ) -> ControlFlow<QuicResult<()>> {
125 if qconn.is_established() || qconn.is_in_early_data() {
128 ControlFlow::Break(Ok(()))
129 } else {
130 ControlFlow::Continue(())
131 }
132 }
133
134 fn wait_deadline(&mut self) -> Option<Instant> {
135 self.handshake_info.deadline()
136 }
137
138 fn post_wait(
139 &self, qconn: &mut QuicheConnection,
140 ) -> ControlFlow<QuicResult<()>> {
141 match self.check_handshake_timeout_expired(qconn) {
142 Ok(_) => ControlFlow::Continue(()),
143 Err(e) => ControlFlow::Break(Err(e)),
144 }
145 }
146}
147
148#[derive(Debug)]
149pub struct RunningApplication;
150
151impl ConnectionStage for RunningApplication {
152 fn on_read<A: ApplicationOverQuic>(
153 &mut self, received_packets: bool, qconn: &mut QuicheConnection,
154 ctx: &mut ConnectionStageContext<A>,
155 ) -> QuicResult<()> {
156 if ctx.application.should_act() {
157 if received_packets {
158 ctx.application.process_reads(qconn)?;
159 }
160
161 if qconn.is_established() {
162 ctx.application.process_writes(qconn)?;
163 }
164 }
165
166 Ok(())
167 }
168}
169
170#[derive(Debug)]
171pub struct Close {
172 pub work_loop_result: QuicResult<()>,
173}
174
175impl ConnectionStage for Close {}