tokio_quiche/http3/driver/
streams.rs1use std::future::Future;
28use std::pin::Pin;
29use std::sync::Arc;
30use std::task::Context;
31use std::task::Poll;
32use std::time::Instant;
33
34use tokio::sync::mpsc;
35use tokio_util::sync::PollSender;
36
37use super::InboundFrame;
38use super::InboundFrameSender;
39use super::InboundFrameStream;
40use super::OutboundFrame;
41use super::OutboundFrameSender;
42use super::OutboundFrameStream;
43use crate::http3::H3AuditStats;
44
45pub(crate) struct StreamCtx {
46 pub(crate) send: Option<InboundFrameSender>,
48 pub(crate) recv: Option<OutboundFrameStream>,
50 pub(crate) queued_frame: Option<OutboundFrame>,
53 pub(crate) audit_stats: Arc<H3AuditStats>,
54 pub(crate) initial_headers_sent: bool,
56 pub(crate) first_full_headers_flush_fail_time: Option<Instant>,
58 pub(crate) fin_or_reset_recv: bool,
61 pub(crate) fin_or_reset_sent: bool,
63 pub(crate) associated_dgram_flow_id: Option<u64>,
66}
67
68impl StreamCtx {
69 pub(crate) fn new(
72 stream_id: u64, capacity: usize,
73 ) -> (Self, OutboundFrameSender, InboundFrameStream) {
74 let (forward_sender, forward_receiver) = mpsc::channel(capacity);
75 let (backward_sender, backward_receiver) = mpsc::channel(capacity);
76
77 let ctx = StreamCtx {
78 send: Some(PollSender::new(forward_sender)),
79 recv: Some(backward_receiver),
80 queued_frame: None,
81 audit_stats: Arc::new(H3AuditStats::new(stream_id)),
82
83 initial_headers_sent: false,
84 first_full_headers_flush_fail_time: None,
85
86 fin_or_reset_recv: false,
87 fin_or_reset_sent: false,
88
89 associated_dgram_flow_id: None,
90 };
91
92 (ctx, PollSender::new(backward_sender), forward_receiver)
93 }
94
95 pub(crate) fn wait_for_send(&mut self, stream_id: u64) -> WaitForStream {
97 WaitForStream::Upstream(WaitForUpstreamCapacity {
98 stream_id,
99 chan: self.send.take(),
100 })
101 }
102
103 pub(crate) fn wait_for_recv(&mut self, stream_id: u64) -> WaitForStream {
105 WaitForStream::Downstream(WaitForDownstreamData {
106 stream_id,
107 chan: self.recv.take(),
108 })
109 }
110
111 pub(crate) fn handle_recvd_stop_sending(&mut self, wire_err_code: u64) {
117 self.audit_stats
119 .set_recvd_stop_sending_error_code(wire_err_code as i64);
120
121 if self.fin_or_reset_sent {
122 return;
126 }
127
128 self.fin_or_reset_sent = true;
133 self.queued_frame = None;
136 self.recv = None;
137 }
138
139 pub(crate) fn handle_recvd_reset(&mut self, wire_err_code: u64) {
140 debug_assert!(!self.fin_or_reset_recv);
141 self.audit_stats
145 .set_recvd_reset_stream_error_code(wire_err_code as i64);
146 self.fin_or_reset_recv = true;
147 self.send = None;
148 }
149
150 pub(crate) fn handle_sent_reset(&mut self, wire_err_code: u64) {
151 debug_assert!(!self.fin_or_reset_sent);
152 self.audit_stats
153 .set_sent_reset_stream_error_code(wire_err_code as i64);
154 self.fin_or_reset_sent = true;
155 }
156
157 pub(crate) fn handle_sent_stop_sending(&mut self, wire_err_code: u64) {
158 debug_assert!(!self.fin_or_reset_recv);
159 self.audit_stats
160 .set_sent_stop_sending_error_code(wire_err_code as i64);
161 self.fin_or_reset_recv = true;
165 self.send = None;
166 }
167
168 pub(crate) fn both_directions_done(&self) -> bool {
169 self.fin_or_reset_recv && self.fin_or_reset_sent
170 }
171}
172
173pub(crate) struct FlowCtx {
174 send: mpsc::Sender<InboundFrame>,
176 }
178
179impl FlowCtx {
180 pub(crate) fn new(capacity: usize) -> (Self, InboundFrameStream) {
183 let (forward_sender, forward_receiver) = mpsc::channel(capacity);
184 let ctx = FlowCtx {
185 send: forward_sender,
186 };
187 (ctx, forward_receiver)
188 }
189
190 pub(crate) fn send_best_effort(&self, datagram: InboundFrame) {
193 let _ = self.send.try_send(datagram);
194 }
195}
196
197pub(crate) enum WaitForStream {
198 Downstream(WaitForDownstreamData),
199 Upstream(WaitForUpstreamCapacity),
200}
201
202pub(crate) enum StreamReady {
203 Downstream(ReceivedDownstreamData),
204 Upstream(HaveUpstreamCapacity),
205}
206
207impl Future for WaitForStream {
208 type Output = StreamReady;
209
210 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
211 match self.get_mut() {
212 WaitForStream::Downstream(d) =>
213 Pin::new(d).poll(cx).map(StreamReady::Downstream),
214 WaitForStream::Upstream(u) =>
215 Pin::new(u).poll(cx).map(StreamReady::Upstream),
216 }
217 }
218}
219
220pub(crate) struct WaitForDownstreamData {
221 pub(crate) stream_id: u64,
222 pub(crate) chan: Option<OutboundFrameStream>,
223}
224
225pub(crate) struct ReceivedDownstreamData {
226 pub(crate) stream_id: u64,
227 pub(crate) chan: OutboundFrameStream,
228 pub(crate) data: Option<OutboundFrame>,
229}
230
231impl Future for WaitForDownstreamData {
232 type Output = ReceivedDownstreamData;
233
234 fn poll(
235 mut self: Pin<&mut Self>, cx: &mut Context<'_>,
236 ) -> Poll<Self::Output> {
237 self.chan.as_mut().unwrap().poll_recv(cx).map(|data| {
240 ReceivedDownstreamData {
241 stream_id: self.stream_id,
242 chan: self.chan.take().unwrap(),
243 data,
244 }
245 })
246 }
247}
248
249pub(crate) struct WaitForUpstreamCapacity {
250 pub(crate) stream_id: u64,
251 pub(crate) chan: Option<InboundFrameSender>,
252}
253
254pub(crate) struct HaveUpstreamCapacity {
255 pub(crate) stream_id: u64,
256 pub(crate) chan: InboundFrameSender,
257}
258
259impl Future for WaitForUpstreamCapacity {
260 type Output = HaveUpstreamCapacity;
261
262 fn poll(
263 mut self: Pin<&mut Self>, cx: &mut Context<'_>,
264 ) -> Poll<Self::Output> {
265 match self.chan.as_mut().unwrap().poll_reserve(cx) {
268 Poll::Ready(_) => Poll::Ready(HaveUpstreamCapacity {
269 stream_id: self.stream_id,
270 chan: self.chan.take().unwrap(),
271 }),
272 Poll::Pending => Poll::Pending,
273 }
274 }
275}