tokio_quiche/http3/driver/
streams.rs1use std::future::Future;
28use std::pin::Pin;
29use std::sync::Arc;
30use std::task::Context;
31use std::task::Poll;
32use std::time::Instant;
33
34use tokio::sync::mpsc;
35use tokio_util::sync::PollSender;
36
37use super::InboundFrame;
38use super::InboundFrameSender;
39use super::InboundFrameStream;
40use super::OutboundFrame;
41use super::OutboundFrameSender;
42use super::OutboundFrameStream;
43use crate::http3::H3AuditStats;
44
45pub(crate) struct StreamCtx {
46 pub(crate) send: Option<InboundFrameSender>,
48 pub(crate) recv: Option<OutboundFrameStream>,
50 pub(crate) queued_frame: Option<OutboundFrame>,
53 pub(crate) audit_stats: Arc<H3AuditStats>,
54 pub(crate) initial_headers_sent: bool,
56 pub(crate) first_full_headers_flush_fail_time: Option<Instant>,
58 pub(crate) fin_or_reset_recv: bool,
61 pub(crate) fin_or_reset_sent: bool,
63 pub(crate) associated_dgram_flow_id: Option<u64>,
66}
67
68impl StreamCtx {
69 pub(crate) fn new(
72 stream_id: u64, capacity: usize,
73 ) -> (Self, OutboundFrameSender, InboundFrameStream) {
74 let (forward_sender, forward_receiver) = mpsc::channel(capacity);
75 let (backward_sender, backward_receiver) = mpsc::channel(capacity);
76
77 let ctx = StreamCtx {
78 send: Some(PollSender::new(forward_sender)),
79 recv: Some(backward_receiver),
80 queued_frame: None,
81 audit_stats: Arc::new(H3AuditStats::new(stream_id)),
82
83 initial_headers_sent: false,
84 first_full_headers_flush_fail_time: None,
85
86 fin_or_reset_recv: false,
87 fin_or_reset_sent: false,
88
89 associated_dgram_flow_id: None,
90 };
91
92 (ctx, PollSender::new(backward_sender), forward_receiver)
93 }
94
95 pub(crate) fn wait_for_send(&mut self, stream_id: u64) -> WaitForStream {
97 WaitForStream::Upstream(WaitForUpstreamCapacity {
98 stream_id,
99 chan: self.send.take(),
100 })
101 }
102
103 pub(crate) fn wait_for_recv(&mut self, stream_id: u64) -> WaitForStream {
105 WaitForStream::Downstream(WaitForDownstreamData {
106 stream_id,
107 chan: self.recv.take(),
108 })
109 }
110
111 pub(crate) fn handle_recvd_stop_sending(&mut self, wire_err_code: u64) {
117 debug_assert!(!self.fin_or_reset_sent);
118 self.audit_stats
123 .set_recvd_stop_sending_error_code(wire_err_code as i64);
124 self.fin_or_reset_sent = true;
125 self.queued_frame = None;
128 debug_assert!(self.recv.is_some());
129 self.recv = None;
130 }
131
132 pub(crate) fn handle_recvd_reset(&mut self, wire_err_code: u64) {
133 debug_assert!(!self.fin_or_reset_recv);
134 self.audit_stats
138 .set_recvd_reset_stream_error_code(wire_err_code as i64);
139 self.fin_or_reset_recv = true;
140 self.send = None;
141 }
142
143 pub(crate) fn handle_sent_reset(&mut self, wire_err_code: u64) {
144 debug_assert!(!self.fin_or_reset_sent);
145 self.audit_stats
146 .set_sent_reset_stream_error_code(wire_err_code as i64);
147 self.fin_or_reset_sent = true;
148 }
149
150 pub(crate) fn handle_sent_stop_sending(&mut self, wire_err_code: u64) {
151 debug_assert!(!self.fin_or_reset_recv);
152 self.audit_stats
153 .set_sent_stop_sending_error_code(wire_err_code as i64);
154 self.fin_or_reset_recv = true;
158 self.send = None;
159 }
160
161 pub(crate) fn both_directions_done(&self) -> bool {
162 self.fin_or_reset_recv && self.fin_or_reset_sent
163 }
164}
165
166pub(crate) struct FlowCtx {
167 send: mpsc::Sender<InboundFrame>,
169 }
171
172impl FlowCtx {
173 pub(crate) fn new(capacity: usize) -> (Self, InboundFrameStream) {
176 let (forward_sender, forward_receiver) = mpsc::channel(capacity);
177 let ctx = FlowCtx {
178 send: forward_sender,
179 };
180 (ctx, forward_receiver)
181 }
182
183 pub(crate) fn send_best_effort(&self, datagram: InboundFrame) {
186 let _ = self.send.try_send(datagram);
187 }
188}
189
190pub(crate) enum WaitForStream {
191 Downstream(WaitForDownstreamData),
192 Upstream(WaitForUpstreamCapacity),
193}
194
195pub(crate) enum StreamReady {
196 Downstream(ReceivedDownstreamData),
197 Upstream(HaveUpstreamCapacity),
198}
199
200impl Future for WaitForStream {
201 type Output = StreamReady;
202
203 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
204 match self.get_mut() {
205 WaitForStream::Downstream(d) =>
206 Pin::new(d).poll(cx).map(StreamReady::Downstream),
207 WaitForStream::Upstream(u) =>
208 Pin::new(u).poll(cx).map(StreamReady::Upstream),
209 }
210 }
211}
212
213pub(crate) struct WaitForDownstreamData {
214 pub(crate) stream_id: u64,
215 pub(crate) chan: Option<OutboundFrameStream>,
216}
217
218pub(crate) struct ReceivedDownstreamData {
219 pub(crate) stream_id: u64,
220 pub(crate) chan: OutboundFrameStream,
221 pub(crate) data: Option<OutboundFrame>,
222}
223
224impl Future for WaitForDownstreamData {
225 type Output = ReceivedDownstreamData;
226
227 fn poll(
228 mut self: Pin<&mut Self>, cx: &mut Context<'_>,
229 ) -> Poll<Self::Output> {
230 self.chan.as_mut().unwrap().poll_recv(cx).map(|data| {
233 ReceivedDownstreamData {
234 stream_id: self.stream_id,
235 chan: self.chan.take().unwrap(),
236 data,
237 }
238 })
239 }
240}
241
242pub(crate) struct WaitForUpstreamCapacity {
243 pub(crate) stream_id: u64,
244 pub(crate) chan: Option<InboundFrameSender>,
245}
246
247pub(crate) struct HaveUpstreamCapacity {
248 pub(crate) stream_id: u64,
249 pub(crate) chan: InboundFrameSender,
250}
251
252impl Future for WaitForUpstreamCapacity {
253 type Output = HaveUpstreamCapacity;
254
255 fn poll(
256 mut self: Pin<&mut Self>, cx: &mut Context<'_>,
257 ) -> Poll<Self::Output> {
258 match self.chan.as_mut().unwrap().poll_reserve(cx) {
261 Poll::Ready(_) => Poll::Ready(HaveUpstreamCapacity {
262 stream_id: self.stream_id,
263 chan: self.chan.take().unwrap(),
264 }),
265 Poll::Pending => Poll::Pending,
266 }
267 }
268}