tokio_quiche/quic/io/
connection_stage.rs1use std::fmt::Debug;
28use std::ops::ControlFlow;
29use std::time::Instant;
30
31use tokio::sync::mpsc;
32
33use crate::quic::connection::ApplicationOverQuic;
34use crate::quic::connection::HandshakeError;
35use crate::quic::connection::HandshakeInfo;
36use crate::quic::connection::Incoming;
37use crate::quic::connection::QuicConnectionStatsShared;
38use crate::quic::QuicheConnection;
39use crate::QuicResult;
40
41pub trait ConnectionStage: Send + Debug {
54 fn on_read<A: ApplicationOverQuic>(
55 &mut self, _received_packets: bool, _qconn: &mut QuicheConnection,
56 _ctx: &mut ConnectionStageContext<A>,
57 ) -> QuicResult<()> {
58 Ok(())
59 }
60
61 fn on_flush<A: ApplicationOverQuic>(
62 &mut self, _qconn: &mut QuicheConnection,
63 _ctx: &mut ConnectionStageContext<A>,
64 ) -> ControlFlow<QuicResult<()>> {
65 ControlFlow::Continue(())
66 }
67
68 fn wait_deadline(&mut self) -> Option<Instant> {
69 None
70 }
71
72 fn post_wait(
73 &self, _qconn: &mut QuicheConnection,
74 ) -> ControlFlow<QuicResult<()>> {
75 ControlFlow::Continue(())
76 }
77}
78
79pub struct ConnectionStageContext<A> {
81 pub in_pkt: Option<Incoming>,
82 pub application: A,
83 pub incoming_pkt_receiver: mpsc::Receiver<Incoming>,
84 pub stats: QuicConnectionStatsShared,
85}
86
87impl<A> ConnectionStageContext<A>
88where
89 A: ApplicationOverQuic,
90{
91 pub fn buffer(&mut self) -> &mut [u8] {
94 self.application.buffer()
95 }
96}
97
98#[derive(Debug)]
99pub struct Handshake {
100 pub handshake_info: HandshakeInfo,
101}
102
103impl Handshake {
104 fn check_handshake_timeout_expired(
105 &self, conn: &mut QuicheConnection,
106 ) -> QuicResult<()> {
107 if self.handshake_info.is_expired() {
108 let _ = conn.close(
109 false,
110 quiche::WireErrorCode::ApplicationError as u64,
111 &[],
112 );
113 return Err(HandshakeError::Timeout.into());
114 }
115
116 Ok(())
117 }
118}
119
120impl ConnectionStage for Handshake {
121 fn on_flush<A: ApplicationOverQuic>(
122 &mut self, qconn: &mut QuicheConnection,
123 _ctx: &mut ConnectionStageContext<A>,
124 ) -> ControlFlow<QuicResult<()>> {
125 if qconn.is_established() {
126 ControlFlow::Break(Ok(()))
127 } else {
128 ControlFlow::Continue(())
129 }
130 }
131
132 fn wait_deadline(&mut self) -> Option<Instant> {
133 self.handshake_info.deadline()
134 }
135
136 fn post_wait(
137 &self, qconn: &mut QuicheConnection,
138 ) -> ControlFlow<QuicResult<()>> {
139 match self.check_handshake_timeout_expired(qconn) {
140 Ok(_) => ControlFlow::Continue(()),
141 Err(e) => ControlFlow::Break(Err(e)),
142 }
143 }
144}
145
146#[derive(Debug)]
147pub struct RunningApplication;
148
149impl ConnectionStage for RunningApplication {
150 fn on_read<A: ApplicationOverQuic>(
151 &mut self, received_packets: bool, qconn: &mut QuicheConnection,
152 ctx: &mut ConnectionStageContext<A>,
153 ) -> QuicResult<()> {
154 if ctx.application.should_act() {
155 if received_packets {
156 ctx.application.process_reads(qconn)?;
157 }
158
159 if qconn.is_established() {
160 ctx.application.process_writes(qconn)?;
161 }
162 }
163
164 Ok(())
165 }
166}
167
168#[derive(Debug)]
169pub struct Close {
170 pub work_loop_result: QuicResult<()>,
171}
172
173impl ConnectionStage for Close {}