tokio_quiche/http3/driver/
streams.rs1use std::future::Future;
28use std::pin::Pin;
29use std::sync::Arc;
30use std::task::Context;
31use std::task::Poll;
32
33use tokio::sync::mpsc;
34use tokio_util::sync::PollSender;
35
36use super::InboundFrame;
37use super::InboundFrameSender;
38use super::InboundFrameStream;
39use super::OutboundFrame;
40use super::OutboundFrameSender;
41use super::OutboundFrameStream;
42use crate::http3::H3AuditStats;
43
44pub(crate) struct StreamCtx {
45 pub(crate) send: Option<InboundFrameSender>,
47 pub(crate) recv: Option<OutboundFrameStream>,
49 pub(crate) queued_frame: Option<OutboundFrame>,
52 pub(crate) audit_stats: Arc<H3AuditStats>,
53 pub(crate) initial_headers_sent: bool,
55 pub(crate) fin_recv: bool,
57 pub(crate) fin_sent: bool,
59 pub(crate) associated_dgram_flow_id: Option<u64>,
62}
63
64impl StreamCtx {
65 pub(crate) fn new(
68 stream_id: u64, capacity: usize,
69 ) -> (Self, OutboundFrameSender, InboundFrameStream) {
70 let (forward_sender, forward_receiver) = mpsc::channel(capacity);
71 let (backward_sender, backward_receiver) = mpsc::channel(capacity);
72
73 let ctx = StreamCtx {
74 send: Some(PollSender::new(forward_sender)),
75 recv: Some(backward_receiver),
76 queued_frame: None,
77 audit_stats: Arc::new(H3AuditStats::new(stream_id)),
78
79 initial_headers_sent: false,
80
81 fin_recv: false,
82 fin_sent: false,
83
84 associated_dgram_flow_id: None,
85 };
86
87 (ctx, PollSender::new(backward_sender), forward_receiver)
88 }
89
90 pub(crate) fn wait_for_send(&mut self, stream_id: u64) -> WaitForStream {
92 WaitForStream::Upstream(WaitForUpstreamCapacity {
93 stream_id,
94 chan: self.send.take(),
95 })
96 }
97
98 pub(crate) fn wait_for_recv(&mut self, stream_id: u64) -> WaitForStream {
100 WaitForStream::Downstream(WaitForDownstreamData {
101 stream_id,
102 chan: self.recv.take(),
103 })
104 }
105}
106
107pub(crate) struct FlowCtx {
108 send: mpsc::Sender<InboundFrame>,
110 }
112
113impl FlowCtx {
114 pub(crate) fn new(capacity: usize) -> (Self, InboundFrameStream) {
117 let (forward_sender, forward_receiver) = mpsc::channel(capacity);
118 let ctx = FlowCtx {
119 send: forward_sender,
120 };
121 (ctx, forward_receiver)
122 }
123
124 pub(crate) fn send_best_effort(&self, datagram: InboundFrame) {
127 let _ = self.send.try_send(datagram);
128 }
129}
130
131pub(crate) enum WaitForStream {
132 Downstream(WaitForDownstreamData),
133 Upstream(WaitForUpstreamCapacity),
134}
135
136pub(crate) enum StreamReady {
137 Downstream(ReceivedDownstreamData),
138 Upstream(HaveUpstreamCapacity),
139}
140
141impl Future for WaitForStream {
142 type Output = StreamReady;
143
144 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
145 match self.get_mut() {
146 WaitForStream::Downstream(d) =>
147 Pin::new(d).poll(cx).map(StreamReady::Downstream),
148 WaitForStream::Upstream(u) =>
149 Pin::new(u).poll(cx).map(StreamReady::Upstream),
150 }
151 }
152}
153
154pub(crate) struct WaitForDownstreamData {
155 pub(crate) stream_id: u64,
156 pub(crate) chan: Option<OutboundFrameStream>,
157}
158
159pub(crate) struct ReceivedDownstreamData {
160 pub(crate) stream_id: u64,
161 pub(crate) chan: OutboundFrameStream,
162 pub(crate) data: Option<OutboundFrame>,
163}
164
165impl Future for WaitForDownstreamData {
166 type Output = ReceivedDownstreamData;
167
168 fn poll(
169 mut self: Pin<&mut Self>, cx: &mut Context<'_>,
170 ) -> Poll<Self::Output> {
171 self.chan.as_mut().unwrap().poll_recv(cx).map(|data| {
174 ReceivedDownstreamData {
175 stream_id: self.stream_id,
176 chan: self.chan.take().unwrap(),
177 data,
178 }
179 })
180 }
181}
182
183pub(crate) struct WaitForUpstreamCapacity {
184 pub(crate) stream_id: u64,
185 pub(crate) chan: Option<InboundFrameSender>,
186}
187
188pub(crate) struct HaveUpstreamCapacity {
189 pub(crate) stream_id: u64,
190 pub(crate) chan: InboundFrameSender,
191}
192
193impl Future for WaitForUpstreamCapacity {
194 type Output = HaveUpstreamCapacity;
195
196 fn poll(
197 mut self: Pin<&mut Self>, cx: &mut Context<'_>,
198 ) -> Poll<Self::Output> {
199 match self.chan.as_mut().unwrap().poll_reserve(cx) {
202 Poll::Ready(_) => Poll::Ready(HaveUpstreamCapacity {
203 stream_id: self.stream_id,
204 chan: self.chan.take().unwrap(),
205 }),
206 Poll::Pending => Poll::Pending,
207 }
208 }
209}