// tokio_quiche/http3/driver/streams.rs
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::time::Instant;

use tokio::sync::mpsc;
use tokio_util::sync::PollSender;

use super::InboundFrame;
use super::InboundFrameSender;
use super::InboundFrameStream;
use super::OutboundFrame;
use super::OutboundFrameSender;
use super::OutboundFrameStream;
use crate::http3::H3AuditStats;
44
45pub(crate) struct StreamCtx {
46    pub(crate) send: Option<InboundFrameSender>,
48    pub(crate) recv: Option<OutboundFrameStream>,
50    pub(crate) queued_frame: Option<OutboundFrame>,
53    pub(crate) audit_stats: Arc<H3AuditStats>,
54    pub(crate) initial_headers_sent: bool,
56    pub(crate) first_full_headers_flush_fail_time: Option<Instant>,
58    pub(crate) fin_or_reset_recv: bool,
61    pub(crate) fin_or_reset_sent: bool,
63    pub(crate) associated_dgram_flow_id: Option<u64>,
66}
67
68impl StreamCtx {
69    pub(crate) fn new(
72        stream_id: u64, capacity: usize,
73    ) -> (Self, OutboundFrameSender, InboundFrameStream) {
74        let (forward_sender, forward_receiver) = mpsc::channel(capacity);
75        let (backward_sender, backward_receiver) = mpsc::channel(capacity);
76
77        let ctx = StreamCtx {
78            send: Some(PollSender::new(forward_sender)),
79            recv: Some(backward_receiver),
80            queued_frame: None,
81            audit_stats: Arc::new(H3AuditStats::new(stream_id)),
82
83            initial_headers_sent: false,
84            first_full_headers_flush_fail_time: None,
85
86            fin_or_reset_recv: false,
87            fin_or_reset_sent: false,
88
89            associated_dgram_flow_id: None,
90        };
91
92        (ctx, PollSender::new(backward_sender), forward_receiver)
93    }
94
95    pub(crate) fn wait_for_send(&mut self, stream_id: u64) -> WaitForStream {
97        WaitForStream::Upstream(WaitForUpstreamCapacity {
98            stream_id,
99            chan: self.send.take(),
100        })
101    }
102
103    pub(crate) fn wait_for_recv(&mut self, stream_id: u64) -> WaitForStream {
105        WaitForStream::Downstream(WaitForDownstreamData {
106            stream_id,
107            chan: self.recv.take(),
108        })
109    }
110
111    pub(crate) fn handle_recvd_stop_sending(&mut self, wire_err_code: u64) {
117        debug_assert!(!self.fin_or_reset_sent);
118        self.audit_stats
123            .set_recvd_stop_sending_error_code(wire_err_code as i64);
124        self.fin_or_reset_sent = true;
125        self.queued_frame = None;
128        debug_assert!(self.recv.is_some());
129        self.recv = None;
130    }
131
132    pub(crate) fn handle_recvd_reset(&mut self, wire_err_code: u64) {
133        debug_assert!(!self.fin_or_reset_recv);
134        self.audit_stats
138            .set_recvd_reset_stream_error_code(wire_err_code as i64);
139        self.fin_or_reset_recv = true;
140        self.send = None;
141    }
142
143    pub(crate) fn handle_sent_reset(&mut self, wire_err_code: u64) {
144        debug_assert!(!self.fin_or_reset_sent);
145        self.audit_stats
146            .set_sent_reset_stream_error_code(wire_err_code as i64);
147        self.fin_or_reset_sent = true;
148    }
149
150    pub(crate) fn handle_sent_stop_sending(&mut self, wire_err_code: u64) {
151        debug_assert!(!self.fin_or_reset_recv);
152        self.audit_stats
153            .set_sent_stop_sending_error_code(wire_err_code as i64);
154        self.fin_or_reset_recv = true;
158        self.send = None;
159    }
160
161    pub(crate) fn both_directions_done(&self) -> bool {
162        self.fin_or_reset_recv && self.fin_or_reset_sent
163    }
164}
165
166pub(crate) struct FlowCtx {
167    send: mpsc::Sender<InboundFrame>,
169    }
171
172impl FlowCtx {
173    pub(crate) fn new(capacity: usize) -> (Self, InboundFrameStream) {
176        let (forward_sender, forward_receiver) = mpsc::channel(capacity);
177        let ctx = FlowCtx {
178            send: forward_sender,
179        };
180        (ctx, forward_receiver)
181    }
182
183    pub(crate) fn send_best_effort(&self, datagram: InboundFrame) {
186        let _ = self.send.try_send(datagram);
187    }
188}
189
190pub(crate) enum WaitForStream {
191    Downstream(WaitForDownstreamData),
192    Upstream(WaitForUpstreamCapacity),
193}
194
195pub(crate) enum StreamReady {
196    Downstream(ReceivedDownstreamData),
197    Upstream(HaveUpstreamCapacity),
198}
199
200impl Future for WaitForStream {
201    type Output = StreamReady;
202
203    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
204        match self.get_mut() {
205            WaitForStream::Downstream(d) =>
206                Pin::new(d).poll(cx).map(StreamReady::Downstream),
207            WaitForStream::Upstream(u) =>
208                Pin::new(u).poll(cx).map(StreamReady::Upstream),
209        }
210    }
211}
212
213pub(crate) struct WaitForDownstreamData {
214    pub(crate) stream_id: u64,
215    pub(crate) chan: Option<OutboundFrameStream>,
216}
217
218pub(crate) struct ReceivedDownstreamData {
219    pub(crate) stream_id: u64,
220    pub(crate) chan: OutboundFrameStream,
221    pub(crate) data: Option<OutboundFrame>,
222}
223
224impl Future for WaitForDownstreamData {
225    type Output = ReceivedDownstreamData;
226
227    fn poll(
228        mut self: Pin<&mut Self>, cx: &mut Context<'_>,
229    ) -> Poll<Self::Output> {
230        self.chan.as_mut().unwrap().poll_recv(cx).map(|data| {
233            ReceivedDownstreamData {
234                stream_id: self.stream_id,
235                chan: self.chan.take().unwrap(),
236                data,
237            }
238        })
239    }
240}
241
242pub(crate) struct WaitForUpstreamCapacity {
243    pub(crate) stream_id: u64,
244    pub(crate) chan: Option<InboundFrameSender>,
245}
246
247pub(crate) struct HaveUpstreamCapacity {
248    pub(crate) stream_id: u64,
249    pub(crate) chan: InboundFrameSender,
250}
251
252impl Future for WaitForUpstreamCapacity {
253    type Output = HaveUpstreamCapacity;
254
255    fn poll(
256        mut self: Pin<&mut Self>, cx: &mut Context<'_>,
257    ) -> Poll<Self::Output> {
258        match self.chan.as_mut().unwrap().poll_reserve(cx) {
261            Poll::Ready(_) => Poll::Ready(HaveUpstreamCapacity {
262                stream_id: self.stream_id,
263                chan: self.chan.take().unwrap(),
264            }),
265            Poll::Pending => Poll::Pending,
266        }
267    }
268}