quiche_apps/
common.rs

1// Copyright (C) 2020, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! Quiche application utilities.
28//!
29//! This module provides some utility functions that are common to quiche
30//! applications.
31
32use std::io::prelude::*;
33
34use std::collections::HashMap;
35
36#[cfg(feature = "sfv")]
37use std::convert::TryFrom;
38
39use std::fmt::Write as _;
40
41use std::rc::Rc;
42
43use std::cell::RefCell;
44
45use std::path;
46
47use ring::rand::SecureRandom;
48
49use quiche::ConnectionId;
50
51use quiche::h3::NameValue;
52use quiche::h3::Priority;
53
/// Default output sink: prints `out` to standard output exactly as given.
///
/// No newline is appended.
pub fn stdout_sink(out: String) {
    print!("{}", out);
}
57
/// HTTP/3 `H3_MESSAGE_ERROR` application error code, reported when a request
/// is malformed (duplicated or missing pseudo-headers, and similar).
const H3_MESSAGE_ERROR: u64 = 0x010e;
59
/// ALPN helpers.
///
/// Groups the ALPN protocol identifiers by the application protocol they
/// select.
pub mod alpns {
    /// Identifiers that select HTTP/0.9 over QUIC.
    pub const HTTP_09: [&[u8]; 2] = [b"hq-interop", b"http/0.9"];

    /// Identifiers that select HTTP/3.
    pub const HTTP_3: [&[u8]; 1] = [b"h3"];
}
67
/// A request whose bytes have only been partially received so far.
pub struct PartialRequest {
    /// Raw request bytes accumulated from the stream until the request is
    /// complete.
    pub req: Vec<u8>,
}
71
72pub struct PartialResponse {
73    pub headers: Option<Vec<quiche::h3::Header>>,
74    pub priority: Option<quiche::h3::Priority>,
75
76    pub body: Vec<u8>,
77
78    pub written: usize,
79}
80
/// Identifier used as the key for a [`Client`] in a [`ClientMap`].
pub type ClientId = u64;
82
83pub struct Client {
84    pub conn: quiche::Connection,
85
86    pub http_conn: Option<Box<dyn HttpConn>>,
87
88    pub client_id: ClientId,
89
90    pub app_proto_selected: bool,
91
92    pub partial_requests: std::collections::HashMap<u64, PartialRequest>,
93
94    pub partial_responses: std::collections::HashMap<u64, PartialResponse>,
95
96    pub max_datagram_size: usize,
97
98    pub loss_rate: f64,
99
100    pub max_send_burst: usize,
101}
102
103pub type ClientIdMap = HashMap<ConnectionId<'static>, ClientId>;
104pub type ClientMap = HashMap<ClientId, Client>;
105
106/// Makes a buffered writer for a resource with a target URL.
107///
108/// The file will have the same name as the resource's last path segment value.
109/// Multiple requests for the same URL are indicated by the value of `cardinal`,
110/// any value "N" greater than 1, will cause ".N" to be appended to the
111/// filename.
112fn make_resource_writer(
113    url: &url::Url, target_path: &Option<String>, cardinal: u64,
114) -> Option<std::io::BufWriter<std::fs::File>> {
115    if let Some(tp) = target_path {
116        let resource =
117            url.path_segments().map(|c| c.collect::<Vec<_>>()).unwrap();
118
119        let mut path = format!("{}/{}", tp, resource.iter().last().unwrap());
120
121        if cardinal > 1 {
122            path = format!("{path}.{cardinal}");
123        }
124
125        match std::fs::File::create(&path) {
126            Ok(f) => return Some(std::io::BufWriter::new(f)),
127
128            Err(e) => panic!(
129                "Error creating file for {url}, attempted path was {path}: {e}"
130            ),
131        }
132    }
133
134    None
135}
136
/// Appends `index` to `path` when `path` denotes a directory.
///
/// A path is treated as a directory when its UTF-8 form ends with `/`; any
/// other path (including one that is not valid UTF-8) is returned unchanged.
fn autoindex(path: path::PathBuf, index: &str) -> path::PathBuf {
    let is_dir = matches!(path.to_str(), Some(s) if s.ends_with('/'));

    if !is_dir {
        return path;
    }

    // Plain concatenation: the path already ends with the separator.
    let mut joined = path.into_os_string();
    joined.push(index);

    path::PathBuf::from(joined)
}
147
/// Makes a buffered writer for a qlog.
///
/// The file is created inside `dir` and named `{role}-{id}.sqlog`.
///
/// # Panics
///
/// Panics if the file cannot be created.
pub fn make_qlog_writer(
    dir: &std::ffi::OsStr, role: &str, id: &str,
) -> std::io::BufWriter<std::fs::File> {
    let path = std::path::Path::new(dir).join(format!("{role}-{id}.sqlog"));

    let file = std::fs::File::create(&path).unwrap_or_else(|e| {
        panic!("Error creating qlog file attempted path was {path:?}: {e}")
    });

    std::io::BufWriter::new(file)
}
163
164fn dump_json(reqs: &[Http3Request], output_sink: &mut dyn FnMut(String)) {
165    let mut out = String::new();
166
167    writeln!(out, "{{").unwrap();
168    writeln!(out, "  \"entries\": [").unwrap();
169    let mut reqs = reqs.iter().peekable();
170
171    while let Some(req) = reqs.next() {
172        writeln!(out, "  {{").unwrap();
173        writeln!(out, "    \"request\":{{").unwrap();
174        writeln!(out, "      \"headers\":[").unwrap();
175
176        let mut req_hdrs = req.hdrs.iter().peekable();
177        while let Some(h) = req_hdrs.next() {
178            writeln!(out, "        {{").unwrap();
179            writeln!(
180                out,
181                "          \"name\": \"{}\",",
182                std::str::from_utf8(h.name()).unwrap()
183            )
184            .unwrap();
185            writeln!(
186                out,
187                "          \"value\": \"{}\"",
188                std::str::from_utf8(h.value()).unwrap().replace('"', "\\\"")
189            )
190            .unwrap();
191
192            if req_hdrs.peek().is_some() {
193                writeln!(out, "        }},").unwrap();
194            } else {
195                writeln!(out, "        }}").unwrap();
196            }
197        }
198        writeln!(out, "      ]}},").unwrap();
199
200        writeln!(out, "    \"response\":{{").unwrap();
201        writeln!(out, "      \"headers\":[").unwrap();
202
203        let mut response_hdrs = req.response_hdrs.iter().peekable();
204        while let Some(h) = response_hdrs.next() {
205            writeln!(out, "        {{").unwrap();
206            writeln!(
207                out,
208                "          \"name\": \"{}\",",
209                std::str::from_utf8(h.name()).unwrap()
210            )
211            .unwrap();
212            writeln!(
213                out,
214                "          \"value\": \"{}\"",
215                std::str::from_utf8(h.value()).unwrap().replace('"', "\\\"")
216            )
217            .unwrap();
218
219            if response_hdrs.peek().is_some() {
220                writeln!(out, "        }},").unwrap();
221            } else {
222                writeln!(out, "        }}").unwrap();
223            }
224        }
225        writeln!(out, "      ],").unwrap();
226        writeln!(out, "      \"body\": {:?}", req.response_body).unwrap();
227        writeln!(out, "    }}").unwrap();
228
229        if reqs.peek().is_some() {
230            writeln!(out, "}},").unwrap();
231        } else {
232            writeln!(out, "}}").unwrap();
233        }
234    }
235    writeln!(out, "]").unwrap();
236    writeln!(out, "}}").unwrap();
237
238    output_sink(out);
239}
240
241pub fn hdrs_to_strings(hdrs: &[quiche::h3::Header]) -> Vec<(String, String)> {
242    hdrs.iter()
243        .map(|h| {
244            let name = String::from_utf8_lossy(h.name()).to_string();
245            let value = String::from_utf8_lossy(h.value()).to_string();
246
247            (name, value)
248        })
249        .collect()
250}
251
252/// Generate a new pair of Source Connection ID and reset token.
253pub fn generate_cid_and_reset_token<T: SecureRandom>(
254    rng: &T,
255) -> (quiche::ConnectionId<'static>, u128) {
256    let mut scid = [0; quiche::MAX_CONN_ID_LEN];
257    rng.fill(&mut scid).unwrap();
258    let scid = scid.to_vec().into();
259    let mut reset_token = [0; 16];
260    rng.fill(&mut reset_token).unwrap();
261    let reset_token = u128::from_be_bytes(reset_token);
262    (scid, reset_token)
263}
264
265/// Construct a priority field value from quiche apps custom query string.
266pub fn priority_field_value_from_query_string(url: &url::Url) -> Option<String> {
267    let mut priority = "".to_string();
268    for param in url.query_pairs() {
269        if param.0 == "u" {
270            write!(priority, "{}={},", param.0, param.1).ok();
271        }
272
273        if param.0 == "i" && param.1 == "1" {
274            priority.push_str("i,");
275        }
276    }
277
278    if !priority.is_empty() {
279        // remove trailing comma
280        priority.pop();
281
282        Some(priority)
283    } else {
284        None
285    }
286}
287
288/// Construct a Priority from quiche apps custom query string.
289pub fn priority_from_query_string(url: &url::Url) -> Option<Priority> {
290    let mut urgency = None;
291    let mut incremental = None;
292    for param in url.query_pairs() {
293        if param.0 == "u" {
294            urgency = Some(param.1.parse::<u8>().unwrap());
295        }
296
297        if param.0 == "i" && param.1 == "1" {
298            incremental = Some(true);
299        }
300    }
301
302    match (urgency, incremental) {
303        (Some(u), Some(i)) => Some(Priority::new(u, i)),
304
305        (Some(u), None) => Some(Priority::new(u, false)),
306
307        (None, Some(i)) => Some(Priority::new(3, i)),
308
309        (None, None) => None,
310    }
311}
312
313fn send_h3_dgram(
314    conn: &mut quiche::Connection, flow_id: u64, dgram_content: &[u8],
315) -> quiche::Result<()> {
316    info!(
317        "sending HTTP/3 DATAGRAM on flow_id={flow_id} with data {dgram_content:?}"
318    );
319
320    let len = octets::varint_len(flow_id) + dgram_content.len();
321    let mut d = vec![0; len];
322    let mut b = octets::OctetsMut::with_slice(&mut d);
323
324    b.put_varint(flow_id)
325        .map_err(|_| quiche::Error::BufferTooShort)?;
326    b.put_bytes(dgram_content)
327        .map_err(|_| quiche::Error::BufferTooShort)?;
328
329    conn.dgram_send(&d)
330}
331
332pub trait HttpConn {
333    fn send_requests(
334        &mut self, conn: &mut quiche::Connection, target_path: &Option<String>,
335    );
336
337    fn handle_responses(
338        &mut self, conn: &mut quiche::Connection, buf: &mut [u8],
339        req_start: &std::time::Instant,
340    );
341
342    fn report_incomplete(&self, start: &std::time::Instant) -> bool;
343
344    fn handle_requests(
345        &mut self, conn: &mut quiche::Connection,
346        partial_requests: &mut HashMap<u64, PartialRequest>,
347        partial_responses: &mut HashMap<u64, PartialResponse>, root: &str,
348        index: &str, buf: &mut [u8],
349    ) -> quiche::h3::Result<()>;
350
351    fn handle_writable(
352        &mut self, conn: &mut quiche::Connection,
353        partial_responses: &mut HashMap<u64, PartialResponse>, stream_id: u64,
354    );
355}
356
357pub fn writable_response_streams(
358    conn: &quiche::Connection,
359) -> impl Iterator<Item = u64> {
360    conn.writable().filter(|id| id % 4 == 0)
361}
362
363/// Represents an HTTP/0.9 formatted request.
364pub struct Http09Request {
365    url: url::Url,
366    cardinal: u64,
367    request_line: String,
368    stream_id: Option<u64>,
369    response_writer: Option<std::io::BufWriter<std::fs::File>>,
370}
371
372/// Represents an HTTP/3 formatted request.
373struct Http3Request {
374    url: url::Url,
375    cardinal: u64,
376    stream_id: Option<u64>,
377    hdrs: Vec<quiche::h3::Header>,
378    priority: Option<Priority>,
379    response_hdrs: Vec<quiche::h3::Header>,
380    response_body: Vec<u8>,
381    response_body_max: usize,
382    response_writer: Option<std::io::BufWriter<std::fs::File>>,
383}
384
385type Http3ResponseBuilderResult = std::result::Result<
386    (Vec<quiche::h3::Header>, Vec<u8>, Vec<u8>),
387    (u64, String),
388>;
389
390pub struct Http09Conn {
391    stream_id: u64,
392    reqs_sent: usize,
393    reqs_complete: usize,
394    reqs: Vec<Http09Request>,
395    output_sink: Rc<RefCell<dyn FnMut(String)>>,
396}
397
398impl Default for Http09Conn {
399    fn default() -> Self {
400        Http09Conn {
401            stream_id: Default::default(),
402            reqs_sent: Default::default(),
403            reqs_complete: Default::default(),
404            reqs: Default::default(),
405            output_sink: Rc::new(RefCell::new(stdout_sink)),
406        }
407    }
408}
409
410impl Http09Conn {
411    pub fn with_urls(
412        urls: &[url::Url], reqs_cardinal: u64,
413        output_sink: Rc<RefCell<dyn FnMut(String)>>,
414    ) -> Box<dyn HttpConn> {
415        let mut reqs = Vec::new();
416        for url in urls {
417            for i in 1..=reqs_cardinal {
418                let request_line = format!("GET {}\r\n", url.path());
419                reqs.push(Http09Request {
420                    url: url.clone(),
421                    cardinal: i,
422                    request_line,
423                    stream_id: None,
424                    response_writer: None,
425                });
426            }
427        }
428
429        let h_conn = Http09Conn {
430            stream_id: 0,
431            reqs_sent: 0,
432            reqs_complete: 0,
433            reqs,
434            output_sink,
435        };
436
437        Box::new(h_conn)
438    }
439}
440
441impl HttpConn for Http09Conn {
442    fn send_requests(
443        &mut self, conn: &mut quiche::Connection, target_path: &Option<String>,
444    ) {
445        let mut reqs_done = 0;
446
447        for req in self.reqs.iter_mut().skip(self.reqs_sent) {
448            match conn.stream_send(
449                self.stream_id,
450                req.request_line.as_bytes(),
451                true,
452            ) {
453                Ok(v) => v,
454
455                Err(quiche::Error::StreamLimit) => {
456                    debug!("not enough stream credits, retry later...");
457                    break;
458                },
459
460                Err(e) => {
461                    error!("failed to send request {e:?}");
462                    break;
463                },
464            };
465
466            debug!("sending HTTP request {:?}", req.request_line);
467
468            req.stream_id = Some(self.stream_id);
469            req.response_writer =
470                make_resource_writer(&req.url, target_path, req.cardinal);
471
472            self.stream_id += 4;
473
474            reqs_done += 1;
475        }
476
477        self.reqs_sent += reqs_done;
478    }
479
480    fn handle_responses(
481        &mut self, conn: &mut quiche::Connection, buf: &mut [u8],
482        req_start: &std::time::Instant,
483    ) {
484        // Process all readable streams.
485        for s in conn.readable() {
486            while let Ok((read, fin)) = conn.stream_recv(s, buf) {
487                trace!("received {read} bytes");
488
489                let stream_buf = &buf[..read];
490
491                trace!(
492                    "stream {} has {} bytes (fin? {})",
493                    s,
494                    stream_buf.len(),
495                    fin
496                );
497
498                let req = self
499                    .reqs
500                    .iter_mut()
501                    .find(|r| r.stream_id == Some(s))
502                    .unwrap();
503
504                match &mut req.response_writer {
505                    Some(rw) => {
506                        rw.write_all(&buf[..read]).ok();
507                    },
508
509                    None => {
510                        self.output_sink.borrow_mut()(unsafe {
511                            String::from_utf8_unchecked(stream_buf.to_vec())
512                        });
513                    },
514                }
515
516                // The server reported that it has no more data to send on
517                // a client-initiated
518                // bidirectional stream, which means
519                // we got the full response. If all responses are received
520                // then close the connection.
521                if &s % 4 == 0 && fin {
522                    self.reqs_complete += 1;
523                    let reqs_count = self.reqs.len();
524
525                    debug!(
526                        "{}/{} responses received",
527                        self.reqs_complete, reqs_count
528                    );
529
530                    if self.reqs_complete == reqs_count {
531                        info!(
532                            "{}/{} response(s) received in {:?}, closing...",
533                            self.reqs_complete,
534                            reqs_count,
535                            req_start.elapsed()
536                        );
537
538                        match conn.close(true, 0x00, b"kthxbye") {
539                            // Already closed.
540                            Ok(_) | Err(quiche::Error::Done) => (),
541
542                            Err(e) => panic!("error closing conn: {e:?}"),
543                        }
544
545                        break;
546                    }
547                }
548            }
549        }
550    }
551
552    fn report_incomplete(&self, start: &std::time::Instant) -> bool {
553        if self.reqs_complete != self.reqs.len() {
554            error!(
555                "connection timed out after {:?} and only completed {}/{} requests",
556                start.elapsed(),
557                self.reqs_complete,
558                self.reqs.len()
559            );
560
561            return true;
562        }
563
564        false
565    }
566
567    fn handle_requests(
568        &mut self, conn: &mut quiche::Connection,
569        partial_requests: &mut HashMap<u64, PartialRequest>,
570        partial_responses: &mut HashMap<u64, PartialResponse>, root: &str,
571        index: &str, buf: &mut [u8],
572    ) -> quiche::h3::Result<()> {
573        // Process all readable streams.
574        for s in conn.readable() {
575            while let Ok((read, fin)) = conn.stream_recv(s, buf) {
576                trace!("{} received {} bytes", conn.trace_id(), read);
577
578                let stream_buf = &buf[..read];
579
580                trace!(
581                    "{} stream {} has {} bytes (fin? {})",
582                    conn.trace_id(),
583                    s,
584                    stream_buf.len(),
585                    fin
586                );
587
588                let stream_buf =
589                    if let Some(partial) = partial_requests.get_mut(&s) {
590                        partial.req.extend_from_slice(stream_buf);
591
592                        if !partial.req.ends_with(b"\r\n") {
593                            return Ok(());
594                        }
595
596                        &partial.req
597                    } else {
598                        if !stream_buf.ends_with(b"\r\n") {
599                            let request = PartialRequest {
600                                req: stream_buf.to_vec(),
601                            };
602
603                            partial_requests.insert(s, request);
604                            return Ok(());
605                        }
606
607                        stream_buf
608                    };
609
610                if stream_buf.starts_with(b"GET ") {
611                    let uri = &stream_buf[4..stream_buf.len() - 2];
612                    let uri = String::from_utf8(uri.to_vec()).unwrap();
613                    let uri = String::from(uri.lines().next().unwrap());
614                    let uri = path::Path::new(&uri);
615                    let mut path = path::PathBuf::from(root);
616
617                    partial_requests.remove(&s);
618
619                    for c in uri.components() {
620                        if let path::Component::Normal(v) = c {
621                            path.push(v)
622                        }
623                    }
624
625                    path = autoindex(path, index);
626
627                    info!(
628                        "{} got GET request for {:?} on stream {}",
629                        conn.trace_id(),
630                        path,
631                        s
632                    );
633
634                    let body = std::fs::read(path.as_path())
635                        .unwrap_or_else(|_| b"Not Found!\r\n".to_vec());
636
637                    info!(
638                        "{} sending response of size {} on stream {}",
639                        conn.trace_id(),
640                        body.len(),
641                        s
642                    );
643
644                    let written = match conn.stream_send(s, &body, true) {
645                        Ok(v) => v,
646
647                        Err(quiche::Error::Done) => 0,
648
649                        Err(e) => {
650                            error!(
651                                "{} stream send failed {:?}",
652                                conn.trace_id(),
653                                e
654                            );
655                            return Err(From::from(e));
656                        },
657                    };
658
659                    if written < body.len() {
660                        let response = PartialResponse {
661                            headers: None,
662                            priority: None,
663                            body,
664                            written,
665                        };
666
667                        partial_responses.insert(s, response);
668                    }
669                }
670            }
671        }
672
673        Ok(())
674    }
675
676    fn handle_writable(
677        &mut self, conn: &mut quiche::Connection,
678        partial_responses: &mut HashMap<u64, PartialResponse>, stream_id: u64,
679    ) {
680        debug!(
681            "{} response stream {} is writable with capacity {:?}",
682            conn.trace_id(),
683            stream_id,
684            conn.stream_capacity(stream_id)
685        );
686
687        if !partial_responses.contains_key(&stream_id) {
688            return;
689        }
690
691        let resp = partial_responses.get_mut(&stream_id).unwrap();
692        let body = &resp.body[resp.written..];
693
694        let written = match conn.stream_send(stream_id, body, true) {
695            Ok(v) => v,
696
697            Err(quiche::Error::Done) => 0,
698
699            Err(e) => {
700                partial_responses.remove(&stream_id);
701
702                error!("{} stream send failed {:?}", conn.trace_id(), e);
703                return;
704            },
705        };
706
707        resp.written += written;
708
709        if resp.written == resp.body.len() {
710            partial_responses.remove(&stream_id);
711        }
712    }
713}
714
/// Settings and progress for sending HTTP/3 DATAGRAMs.
pub struct Http3DgramSender {
    /// Number of datagrams given at construction.
    dgram_count: u64,

    /// Content given at construction for the datagrams.
    pub dgram_content: String,

    /// Flow ID given at construction.
    pub flow_id: u64,

    /// Counter of datagrams sent; starts at 0.
    pub dgrams_sent: u64,
}

impl Http3DgramSender {
    /// Creates a sender for `dgram_count` datagrams carrying
    /// `dgram_content` on `flow_id`, with nothing sent yet.
    pub fn new(dgram_count: u64, dgram_content: String, flow_id: u64) -> Self {
        Http3DgramSender {
            dgrams_sent: 0,
            dgram_count,
            dgram_content,
            flow_id,
        }
    }
}
732
733fn make_h3_config(
734    max_field_section_size: Option<u64>, qpack_max_table_capacity: Option<u64>,
735    qpack_blocked_streams: Option<u64>,
736) -> quiche::h3::Config {
737    let mut config = quiche::h3::Config::new().unwrap();
738
739    if let Some(v) = max_field_section_size {
740        config.set_max_field_section_size(v);
741    }
742
743    if let Some(v) = qpack_max_table_capacity {
744        // quiche doesn't support dynamic QPACK, so clamp to 0 for now.
745        config.set_qpack_max_table_capacity(v.clamp(0, 0));
746    }
747
748    if let Some(v) = qpack_blocked_streams {
749        // quiche doesn't support dynamic QPACK, so clamp to 0 for now.
750        config.set_qpack_blocked_streams(v.clamp(0, 0));
751    }
752
753    config
754}
755
756pub struct Http3Conn {
757    h3_conn: quiche::h3::Connection,
758    reqs_hdrs_sent: usize,
759    reqs_complete: usize,
760    largest_processed_request: u64,
761    reqs: Vec<Http3Request>,
762    body: Option<Vec<u8>>,
763    sent_body_bytes: HashMap<u64, usize>,
764    dump_json: bool,
765    dgram_sender: Option<Http3DgramSender>,
766    output_sink: Rc<RefCell<dyn FnMut(String)>>,
767}
768
769impl Http3Conn {
770    #[allow(clippy::too_many_arguments)]
771    pub fn with_urls(
772        conn: &mut quiche::Connection, urls: &[url::Url], reqs_cardinal: u64,
773        req_headers: &[String], body: &Option<Vec<u8>>, method: &str,
774        send_priority_update: bool, max_field_section_size: Option<u64>,
775        qpack_max_table_capacity: Option<u64>,
776        qpack_blocked_streams: Option<u64>, dump_json: Option<usize>,
777        dgram_sender: Option<Http3DgramSender>,
778        output_sink: Rc<RefCell<dyn FnMut(String)>>,
779    ) -> Box<dyn HttpConn> {
780        let mut reqs = Vec::new();
781        for url in urls {
782            for i in 1..=reqs_cardinal {
783                let authority = match url.port() {
784                    Some(port) => format!("{}:{}", url.host_str().unwrap(), port),
785
786                    None => url.host_str().unwrap().to_string(),
787                };
788
789                let mut hdrs = vec![
790                    quiche::h3::Header::new(b":method", method.as_bytes()),
791                    quiche::h3::Header::new(b":scheme", url.scheme().as_bytes()),
792                    quiche::h3::Header::new(b":authority", authority.as_bytes()),
793                    quiche::h3::Header::new(
794                        b":path",
795                        url[url::Position::BeforePath..].as_bytes(),
796                    ),
797                    quiche::h3::Header::new(b"user-agent", b"quiche"),
798                ];
799
800                let priority = if send_priority_update {
801                    priority_from_query_string(url)
802                } else {
803                    None
804                };
805
806                // Add custom headers to the request.
807                for header in req_headers {
808                    let header_split: Vec<&str> =
809                        header.splitn(2, ": ").collect();
810
811                    if header_split.len() != 2 {
812                        panic!("malformed header provided - \"{header}\"");
813                    }
814
815                    hdrs.push(quiche::h3::Header::new(
816                        header_split[0].as_bytes(),
817                        header_split[1].as_bytes(),
818                    ));
819                }
820
821                if body.is_some() {
822                    hdrs.push(quiche::h3::Header::new(
823                        b"content-length",
824                        body.as_ref().unwrap().len().to_string().as_bytes(),
825                    ));
826                }
827
828                reqs.push(Http3Request {
829                    url: url.clone(),
830                    cardinal: i,
831                    hdrs,
832                    priority,
833                    response_hdrs: Vec::new(),
834                    response_body: Vec::new(),
835                    response_body_max: dump_json.unwrap_or_default(),
836                    stream_id: None,
837                    response_writer: None,
838                });
839            }
840        }
841
842        let h_conn = Http3Conn {
843            h3_conn: quiche::h3::Connection::with_transport(
844                conn,
845                &make_h3_config(
846                    max_field_section_size,
847                    qpack_max_table_capacity,
848                    qpack_blocked_streams,
849                ),
850            ).expect("Unable to create HTTP/3 connection, check the server's uni stream limit and window size"),
851            reqs_hdrs_sent: 0,
852            reqs_complete: 0,
853            largest_processed_request: 0,
854            reqs,
855            body: body.as_ref().map(|b| b.to_vec()),
856            sent_body_bytes: HashMap::new(),
857            dump_json: dump_json.is_some(),
858            dgram_sender,
859            output_sink,
860        };
861
862        Box::new(h_conn)
863    }
864
865    pub fn with_conn(
866        conn: &mut quiche::Connection, max_field_section_size: Option<u64>,
867        qpack_max_table_capacity: Option<u64>,
868        qpack_blocked_streams: Option<u64>,
869        dgram_sender: Option<Http3DgramSender>,
870        output_sink: Rc<RefCell<dyn FnMut(String)>>,
871    ) -> std::result::Result<Box<dyn HttpConn>, String> {
872        let h3_conn = quiche::h3::Connection::with_transport(
873            conn,
874            &make_h3_config(
875                max_field_section_size,
876                qpack_max_table_capacity,
877                qpack_blocked_streams,
878            ),
879        ).map_err(|_| "Unable to create HTTP/3 connection, check the client's uni stream limit and window size")?;
880
881        let h_conn = Http3Conn {
882            h3_conn,
883            reqs_hdrs_sent: 0,
884            reqs_complete: 0,
885            largest_processed_request: 0,
886            reqs: Vec::new(),
887            body: None,
888            sent_body_bytes: HashMap::new(),
889            dump_json: false,
890            dgram_sender,
891            output_sink,
892        };
893
894        Ok(Box::new(h_conn))
895    }
896
897    /// Builds an HTTP/3 response given a request.
898    fn build_h3_response(
899        root: &str, index: &str, request: &[quiche::h3::Header],
900    ) -> Http3ResponseBuilderResult {
901        let mut file_path = path::PathBuf::from(root);
902        let mut scheme = None;
903        let mut authority = None;
904        let mut host = None;
905        let mut path = None;
906        let mut method = None;
907        let mut priority = vec![];
908
909        // Parse some of the request headers.
910        for hdr in request {
911            match hdr.name() {
912                b":scheme" => {
913                    if scheme.is_some() {
914                        return Err((
915                            H3_MESSAGE_ERROR,
916                            ":scheme cannot be duplicated".to_string(),
917                        ));
918                    }
919
920                    scheme = Some(std::str::from_utf8(hdr.value()).unwrap());
921                },
922
923                b":authority" => {
924                    if authority.is_some() {
925                        return Err((
926                            H3_MESSAGE_ERROR,
927                            ":authority cannot be duplicated".to_string(),
928                        ));
929                    }
930
931                    authority = Some(std::str::from_utf8(hdr.value()).unwrap());
932                },
933
934                b":path" => {
935                    if path.is_some() {
936                        return Err((
937                            H3_MESSAGE_ERROR,
938                            ":path cannot be duplicated".to_string(),
939                        ));
940                    }
941
942                    path = Some(std::str::from_utf8(hdr.value()).unwrap())
943                },
944
945                b":method" => {
946                    if method.is_some() {
947                        return Err((
948                            H3_MESSAGE_ERROR,
949                            ":method cannot be duplicated".to_string(),
950                        ));
951                    }
952
953                    method = Some(std::str::from_utf8(hdr.value()).unwrap())
954                },
955
956                b":protocol" => {
957                    return Err((
958                        H3_MESSAGE_ERROR,
959                        ":protocol not supported".to_string(),
960                    ));
961                },
962
963                b"priority" => priority = hdr.value().to_vec(),
964
965                b"host" => host = Some(std::str::from_utf8(hdr.value()).unwrap()),
966
967                _ => (),
968            }
969        }
970
971        let decided_method = match method {
972            Some(method) => {
973                match method {
974                    "" =>
975                        return Err((
976                            H3_MESSAGE_ERROR,
977                            ":method value cannot be empty".to_string(),
978                        )),
979
980                    "CONNECT" => {
981                        // not allowed
982                        let headers = vec![
983                            quiche::h3::Header::new(
984                                b":status",
985                                "405".to_string().as_bytes(),
986                            ),
987                            quiche::h3::Header::new(b"server", b"quiche"),
988                        ];
989
990                        return Ok((headers, b"".to_vec(), Default::default()));
991                    },
992
993                    _ => method,
994                }
995            },
996
997            None =>
998                return Err((
999                    H3_MESSAGE_ERROR,
1000                    ":method cannot be missing".to_string(),
1001                )),
1002        };
1003
1004        let decided_scheme = match scheme {
1005            Some(scheme) => {
1006                if scheme != "http" && scheme != "https" {
1007                    let headers = vec![
1008                        quiche::h3::Header::new(
1009                            b":status",
1010                            "400".to_string().as_bytes(),
1011                        ),
1012                        quiche::h3::Header::new(b"server", b"quiche"),
1013                    ];
1014
1015                    return Ok((
1016                        headers,
1017                        b"Invalid scheme".to_vec(),
1018                        Default::default(),
1019                    ));
1020                }
1021
1022                scheme
1023            },
1024
1025            None =>
1026                return Err((
1027                    H3_MESSAGE_ERROR,
1028                    ":scheme cannot be missing".to_string(),
1029                )),
1030        };
1031
1032        let decided_host = match (authority, host) {
1033            (None, Some("")) =>
1034                return Err((
1035                    H3_MESSAGE_ERROR,
1036                    "host value cannot be empty".to_string(),
1037                )),
1038
1039            (Some(""), None) =>
1040                return Err((
1041                    H3_MESSAGE_ERROR,
1042                    ":authority value cannot be empty".to_string(),
1043                )),
1044
1045            (Some(""), Some("")) =>
1046                return Err((
1047                    H3_MESSAGE_ERROR,
1048                    ":authority and host value cannot be empty".to_string(),
1049                )),
1050
1051            (None, None) =>
1052                return Err((
1053                    H3_MESSAGE_ERROR,
1054                    ":authority and host missing".to_string(),
1055                )),
1056
1057            // Any other combo, prefer :authority
1058            (..) => authority.unwrap(),
1059        };
1060
1061        let decided_path = match path {
1062            Some("") =>
1063                return Err((
1064                    H3_MESSAGE_ERROR,
1065                    ":path value cannot be empty".to_string(),
1066                )),
1067
1068            None =>
1069                return Err((
1070                    H3_MESSAGE_ERROR,
1071                    ":path cannot be missing".to_string(),
1072                )),
1073
1074            Some(path) => path,
1075        };
1076
1077        let url = format!("{decided_scheme}://{decided_host}{decided_path}");
1078        let url = url::Url::parse(&url).unwrap();
1079
1080        let pathbuf = path::PathBuf::from(url.path());
1081        let pathbuf = autoindex(pathbuf, index);
1082
1083        // Priority query string takes precedence over the header.
1084        // So replace the header with one built here.
1085        let query_priority = priority_field_value_from_query_string(&url);
1086
1087        if let Some(p) = query_priority {
1088            priority = p.as_bytes().to_vec();
1089        }
1090
1091        let (status, body) = match decided_method {
1092            "GET" => {
1093                for c in pathbuf.components() {
1094                    if let path::Component::Normal(v) = c {
1095                        file_path.push(v)
1096                    }
1097                }
1098
1099                match std::fs::read(file_path.as_path()) {
1100                    Ok(data) => (200, data),
1101
1102                    Err(_) => (404, b"Not Found!".to_vec()),
1103                }
1104            },
1105
1106            _ => (405, Vec::new()),
1107        };
1108
1109        let headers = vec![
1110            quiche::h3::Header::new(b":status", status.to_string().as_bytes()),
1111            quiche::h3::Header::new(b"server", b"quiche"),
1112            quiche::h3::Header::new(
1113                b"content-length",
1114                body.len().to_string().as_bytes(),
1115            ),
1116        ];
1117
1118        Ok((headers, body, priority))
1119    }
1120}
1121
1122impl HttpConn for Http3Conn {
1123    fn send_requests(
1124        &mut self, conn: &mut quiche::Connection, target_path: &Option<String>,
1125    ) {
1126        let mut reqs_done = 0;
1127
1128        // First send headers.
1129        for req in self.reqs.iter_mut().skip(self.reqs_hdrs_sent) {
1130            let s = match self.h3_conn.send_request(
1131                conn,
1132                &req.hdrs,
1133                self.body.is_none(),
1134            ) {
1135                Ok(v) => v,
1136
1137                Err(quiche::h3::Error::TransportError(
1138                    quiche::Error::StreamLimit,
1139                )) => {
1140                    debug!("not enough stream credits, retry later...");
1141                    break;
1142                },
1143
1144                Err(quiche::h3::Error::StreamBlocked) => {
1145                    debug!("stream is blocked, retry later...");
1146                    break;
1147                },
1148
1149                Err(e) => {
1150                    error!("failed to send request {e:?}");
1151                    break;
1152                },
1153            };
1154
1155            debug!("Sent HTTP request {:?}", &req.hdrs);
1156
1157            if let Some(priority) = &req.priority {
1158                // If sending the priority fails, don't try again.
1159                self.h3_conn
1160                    .send_priority_update_for_request(conn, s, priority)
1161                    .ok();
1162            }
1163
1164            req.stream_id = Some(s);
1165            req.response_writer =
1166                make_resource_writer(&req.url, target_path, req.cardinal);
1167            self.sent_body_bytes.insert(s, 0);
1168
1169            reqs_done += 1;
1170        }
1171        self.reqs_hdrs_sent += reqs_done;
1172
1173        // Then send any remaining body.
1174        if let Some(body) = &self.body {
1175            for (stream_id, sent_bytes) in self.sent_body_bytes.iter_mut() {
1176                if *sent_bytes == body.len() {
1177                    continue;
1178                }
1179
1180                // Always try to send all remaining bytes, so always set fin to
1181                // true.
1182                let sent = match self.h3_conn.send_body(
1183                    conn,
1184                    *stream_id,
1185                    &body[*sent_bytes..],
1186                    true,
1187                ) {
1188                    Ok(v) => v,
1189
1190                    Err(quiche::h3::Error::Done) => 0,
1191
1192                    Err(e) => {
1193                        error!("failed to send request body {e:?}");
1194                        continue;
1195                    },
1196                };
1197
1198                *sent_bytes += sent;
1199            }
1200        }
1201
1202        // And finally any DATAGRAMS.
1203        if let Some(ds) = self.dgram_sender.as_mut() {
1204            let mut dgrams_done = 0;
1205
1206            for _ in ds.dgrams_sent..ds.dgram_count {
1207                match send_h3_dgram(conn, ds.flow_id, ds.dgram_content.as_bytes())
1208                {
1209                    Ok(v) => v,
1210
1211                    Err(e) => {
1212                        error!("failed to send dgram {e:?}");
1213                        break;
1214                    },
1215                }
1216
1217                dgrams_done += 1;
1218            }
1219
1220            ds.dgrams_sent += dgrams_done;
1221        }
1222    }
1223
1224    fn handle_responses(
1225        &mut self, conn: &mut quiche::Connection, buf: &mut [u8],
1226        req_start: &std::time::Instant,
1227    ) {
1228        loop {
1229            match self.h3_conn.poll(conn) {
1230                Ok((stream_id, quiche::h3::Event::Headers { list, .. })) => {
1231                    debug!(
1232                        "got response headers {:?} on stream id {}",
1233                        hdrs_to_strings(&list),
1234                        stream_id
1235                    );
1236
1237                    let req = self
1238                        .reqs
1239                        .iter_mut()
1240                        .find(|r| r.stream_id == Some(stream_id))
1241                        .unwrap();
1242
1243                    req.response_hdrs = list;
1244                },
1245
1246                Ok((stream_id, quiche::h3::Event::Data)) => {
1247                    while let Ok(read) =
1248                        self.h3_conn.recv_body(conn, stream_id, buf)
1249                    {
1250                        debug!(
1251                            "got {read} bytes of response data on stream {stream_id}"
1252                        );
1253
1254                        let req = self
1255                            .reqs
1256                            .iter_mut()
1257                            .find(|r| r.stream_id == Some(stream_id))
1258                            .unwrap();
1259
1260                        let len = std::cmp::min(
1261                            read,
1262                            req.response_body_max - req.response_body.len(),
1263                        );
1264                        req.response_body.extend_from_slice(&buf[..len]);
1265
1266                        match &mut req.response_writer {
1267                            Some(rw) => {
1268                                rw.write_all(&buf[..read]).ok();
1269                            },
1270
1271                            None =>
1272                                if !self.dump_json {
1273                                    self.output_sink.borrow_mut()(unsafe {
1274                                        String::from_utf8_unchecked(
1275                                            buf[..read].to_vec(),
1276                                        )
1277                                    });
1278                                },
1279                        }
1280                    }
1281                },
1282
1283                Ok((_stream_id, quiche::h3::Event::Finished)) => {
1284                    self.reqs_complete += 1;
1285                    let reqs_count = self.reqs.len();
1286
1287                    debug!(
1288                        "{}/{} responses received",
1289                        self.reqs_complete, reqs_count
1290                    );
1291
1292                    if self.reqs_complete == reqs_count {
1293                        info!(
1294                            "{}/{} response(s) received in {:?}, closing...",
1295                            self.reqs_complete,
1296                            reqs_count,
1297                            req_start.elapsed()
1298                        );
1299
1300                        if self.dump_json {
1301                            dump_json(
1302                                &self.reqs,
1303                                &mut *self.output_sink.borrow_mut(),
1304                            );
1305                        }
1306
1307                        match conn.close(true, 0x100, b"kthxbye") {
1308                            // Already closed.
1309                            Ok(_) | Err(quiche::Error::Done) => (),
1310
1311                            Err(e) => panic!("error closing conn: {e:?}"),
1312                        }
1313
1314                        break;
1315                    }
1316                },
1317
1318                Ok((_stream_id, quiche::h3::Event::Reset(e))) => {
1319                    error!("request was reset by peer with {e}, closing...");
1320
1321                    match conn.close(true, 0x100, b"kthxbye") {
1322                        // Already closed.
1323                        Ok(_) | Err(quiche::Error::Done) => (),
1324
1325                        Err(e) => panic!("error closing conn: {e:?}"),
1326                    }
1327
1328                    break;
1329                },
1330
1331                Ok((
1332                    prioritized_element_id,
1333                    quiche::h3::Event::PriorityUpdate,
1334                )) => {
1335                    info!(
1336                        "{} PRIORITY_UPDATE triggered for element ID={}",
1337                        conn.trace_id(),
1338                        prioritized_element_id
1339                    );
1340                },
1341
1342                Ok((goaway_id, quiche::h3::Event::GoAway)) => {
1343                    info!(
1344                        "{} got GOAWAY with ID {} ",
1345                        conn.trace_id(),
1346                        goaway_id
1347                    );
1348                },
1349
1350                Err(quiche::h3::Error::Done) => {
1351                    break;
1352                },
1353
1354                Err(e) => {
1355                    error!("HTTP/3 processing failed: {e:?}");
1356
1357                    break;
1358                },
1359            }
1360        }
1361
1362        // Process datagram-related events.
1363        while let Ok(len) = conn.dgram_recv(buf) {
1364            let mut b = octets::Octets::with_slice(buf);
1365            if let Ok(flow_id) = b.get_varint() {
1366                info!(
1367                    "Received DATAGRAM flow_id={} len={} data={:?}",
1368                    flow_id,
1369                    len,
1370                    buf[b.off()..len].to_vec()
1371                );
1372            }
1373        }
1374    }
1375
1376    fn report_incomplete(&self, start: &std::time::Instant) -> bool {
1377        if self.reqs_complete != self.reqs.len() {
1378            error!(
1379                "connection timed out after {:?} and only completed {}/{} requests",
1380                start.elapsed(),
1381                self.reqs_complete,
1382                self.reqs.len()
1383            );
1384
1385            if self.dump_json {
1386                dump_json(&self.reqs, &mut *self.output_sink.borrow_mut());
1387            }
1388
1389            return true;
1390        }
1391
1392        false
1393    }
1394
1395    fn handle_requests(
1396        &mut self, conn: &mut quiche::Connection,
1397        _partial_requests: &mut HashMap<u64, PartialRequest>,
1398        partial_responses: &mut HashMap<u64, PartialResponse>, root: &str,
1399        index: &str, buf: &mut [u8],
1400    ) -> quiche::h3::Result<()> {
1401        // Process HTTP stream-related events.
1402        //
1403        // This loops over any and all received HTTP requests and sends just the
1404        // HTTP response headers.
1405        loop {
1406            match self.h3_conn.poll(conn) {
1407                Ok((stream_id, quiche::h3::Event::Headers { list, .. })) => {
1408                    info!(
1409                        "{} got request {:?} on stream id {}",
1410                        conn.trace_id(),
1411                        hdrs_to_strings(&list),
1412                        stream_id
1413                    );
1414
1415                    self.largest_processed_request =
1416                        std::cmp::max(self.largest_processed_request, stream_id);
1417
1418                    // We decide the response based on headers alone, so
1419                    // stop reading the request stream so that any body
1420                    // is ignored and pointless Data events are not
1421                    // generated.
1422                    conn.stream_shutdown(stream_id, quiche::Shutdown::Read, 0)
1423                        .unwrap();
1424
1425                    let (mut headers, body, mut priority) =
1426                        match Http3Conn::build_h3_response(root, index, &list) {
1427                            Ok(v) => v,
1428
1429                            Err((error_code, _)) => {
1430                                conn.stream_shutdown(
1431                                    stream_id,
1432                                    quiche::Shutdown::Write,
1433                                    error_code,
1434                                )
1435                                .unwrap();
1436                                continue;
1437                            },
1438                        };
1439
1440                    match self.h3_conn.take_last_priority_update(stream_id) {
1441                        Ok(v) => {
1442                            priority = v;
1443                        },
1444
1445                        Err(quiche::h3::Error::Done) => (),
1446
1447                        Err(e) => error!(
1448                            "{} error taking PRIORITY_UPDATE {}",
1449                            conn.trace_id(),
1450                            e
1451                        ),
1452                    }
1453
1454                    if !priority.is_empty() {
1455                        headers.push(quiche::h3::Header::new(
1456                            b"priority",
1457                            priority.as_slice(),
1458                        ));
1459                    }
1460
1461                    #[cfg(feature = "sfv")]
1462                    let priority =
1463                        quiche::h3::Priority::try_from(priority.as_slice())
1464                            .unwrap_or_default();
1465
1466                    #[cfg(not(feature = "sfv"))]
1467                    let priority = quiche::h3::Priority::default();
1468
1469                    info!(
1470                        "{} prioritizing response on stream {} as {:?}",
1471                        conn.trace_id(),
1472                        stream_id,
1473                        priority
1474                    );
1475
1476                    match self.h3_conn.send_response_with_priority(
1477                        conn, stream_id, &headers, &priority, false,
1478                    ) {
1479                        Ok(v) => v,
1480
1481                        Err(quiche::h3::Error::StreamBlocked) => {
1482                            let response = PartialResponse {
1483                                headers: Some(headers),
1484                                priority: Some(priority),
1485                                body,
1486                                written: 0,
1487                            };
1488
1489                            partial_responses.insert(stream_id, response);
1490                            continue;
1491                        },
1492
1493                        Err(e) => {
1494                            error!(
1495                                "{} stream send failed {:?}",
1496                                conn.trace_id(),
1497                                e
1498                            );
1499
1500                            break;
1501                        },
1502                    }
1503
1504                    let response = PartialResponse {
1505                        headers: None,
1506                        priority: None,
1507                        body,
1508                        written: 0,
1509                    };
1510
1511                    partial_responses.insert(stream_id, response);
1512                },
1513
1514                Ok((stream_id, quiche::h3::Event::Data)) => {
1515                    info!(
1516                        "{} got data on stream id {}",
1517                        conn.trace_id(),
1518                        stream_id
1519                    );
1520                },
1521
1522                Ok((_stream_id, quiche::h3::Event::Finished)) => (),
1523
1524                Ok((_stream_id, quiche::h3::Event::Reset { .. })) => (),
1525
1526                Ok((
1527                    prioritized_element_id,
1528                    quiche::h3::Event::PriorityUpdate,
1529                )) => {
1530                    info!(
1531                        "{} PRIORITY_UPDATE triggered for element ID={}",
1532                        conn.trace_id(),
1533                        prioritized_element_id
1534                    );
1535                },
1536
1537                Ok((goaway_id, quiche::h3::Event::GoAway)) => {
1538                    trace!(
1539                        "{} got GOAWAY with ID {} ",
1540                        conn.trace_id(),
1541                        goaway_id
1542                    );
1543                    self.h3_conn
1544                        .send_goaway(conn, self.largest_processed_request)?;
1545                },
1546
1547                Err(quiche::h3::Error::Done) => {
1548                    break;
1549                },
1550
1551                Err(e) => {
1552                    error!("{} HTTP/3 error {:?}", conn.trace_id(), e);
1553
1554                    return Err(e);
1555                },
1556            }
1557        }
1558
1559        // Visit all writable response streams to send HTTP content.
1560        for stream_id in writable_response_streams(conn) {
1561            self.handle_writable(conn, partial_responses, stream_id);
1562        }
1563
1564        // Process datagram-related events.
1565        while let Ok(len) = conn.dgram_recv(buf) {
1566            let mut b = octets::Octets::with_slice(buf);
1567            if let Ok(flow_id) = b.get_varint() {
1568                info!(
1569                    "Received DATAGRAM flow_id={} len={} data={:?}",
1570                    flow_id,
1571                    len,
1572                    buf[b.off()..len].to_vec()
1573                );
1574            }
1575        }
1576
1577        if let Some(ds) = self.dgram_sender.as_mut() {
1578            let mut dgrams_done = 0;
1579
1580            for _ in ds.dgrams_sent..ds.dgram_count {
1581                match send_h3_dgram(conn, ds.flow_id, ds.dgram_content.as_bytes())
1582                {
1583                    Ok(v) => v,
1584
1585                    Err(e) => {
1586                        error!("failed to send dgram {e:?}");
1587                        break;
1588                    },
1589                }
1590
1591                dgrams_done += 1;
1592            }
1593
1594            ds.dgrams_sent += dgrams_done;
1595        }
1596
1597        Ok(())
1598    }
1599
1600    fn handle_writable(
1601        &mut self, conn: &mut quiche::Connection,
1602        partial_responses: &mut HashMap<u64, PartialResponse>, stream_id: u64,
1603    ) {
1604        debug!(
1605            "{} response stream {} is writable with capacity {:?}",
1606            conn.trace_id(),
1607            stream_id,
1608            conn.stream_capacity(stream_id)
1609        );
1610
1611        if !partial_responses.contains_key(&stream_id) {
1612            return;
1613        }
1614
1615        let resp = partial_responses.get_mut(&stream_id).unwrap();
1616
1617        if let (Some(headers), Some(priority)) = (&resp.headers, &resp.priority) {
1618            match self.h3_conn.send_response_with_priority(
1619                conn, stream_id, headers, priority, false,
1620            ) {
1621                Ok(_) => (),
1622
1623                Err(quiche::h3::Error::StreamBlocked) => {
1624                    return;
1625                },
1626
1627                Err(e) => {
1628                    error!("{} stream send failed {:?}", conn.trace_id(), e);
1629                    return;
1630                },
1631            }
1632        }
1633
1634        resp.headers = None;
1635        resp.priority = None;
1636
1637        let body = &resp.body[resp.written..];
1638
1639        let written = match self.h3_conn.send_body(conn, stream_id, body, true) {
1640            Ok(v) => v,
1641
1642            Err(quiche::h3::Error::Done) => 0,
1643
1644            Err(e) => {
1645                partial_responses.remove(&stream_id);
1646
1647                error!("{} stream send failed {:?}", conn.trace_id(), e);
1648                return;
1649            },
1650        };
1651
1652        resp.written += written;
1653
1654        if resp.written == resp.body.len() {
1655            partial_responses.remove(&stream_id);
1656        }
1657    }
1658}