Skip to main content

quiche_apps/
common.rs

1// Copyright (C) 2020, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! Quiche application utilities.
28//!
29//! This module provides some utility functions that are common to quiche
30//! applications.
31
32use std::io::prelude::*;
33
34use std::collections::HashMap;
35
36#[cfg(feature = "sfv")]
37use std::convert::TryFrom;
38
39use std::fmt::Write as _;
40
41use std::rc::Rc;
42
43use std::cell::RefCell;
44
45use std::path;
46
47use ring::rand::SecureRandom;
48
49use quiche::ConnectionId;
50
51use quiche::h3::NameValue;
52use quiche::h3::Priority;
53
54pub fn stdout_sink(out: String) {
55    print!("{out}");
56}
57
58const H3_MESSAGE_ERROR: u64 = 0x10E;
59
60/// ALPN helpers.
61///
62/// This module contains constants and functions for working with ALPN.
63pub mod alpns {
64    pub const HTTP_09: [&[u8]; 2] = [b"hq-interop", b"http/0.9"];
65    pub const HTTP_3: [&[u8]; 1] = [b"h3"];
66}
67
68pub struct PartialRequest {
69    pub req: Vec<u8>,
70}
71
72pub struct PartialResponse {
73    pub headers: Option<Vec<quiche::h3::Header>>,
74    pub priority: Option<quiche::h3::Priority>,
75
76    pub body: Vec<u8>,
77
78    pub written: usize,
79}
80
81pub type ClientId = u64;
82
83pub struct Client {
84    pub conn: quiche::Connection,
85
86    pub http_conn: Option<Box<dyn HttpConn>>,
87
88    pub client_id: ClientId,
89
90    pub app_proto_selected: bool,
91
92    pub partial_requests: std::collections::HashMap<u64, PartialRequest>,
93
94    pub partial_responses: std::collections::HashMap<u64, PartialResponse>,
95
96    pub max_datagram_size: usize,
97
98    pub loss_rate: f64,
99
100    pub max_send_burst: usize,
101}
102
103pub type ClientIdMap = HashMap<ConnectionId<'static>, ClientId>;
104pub type ClientMap = HashMap<ClientId, Client>;
105
106/// Makes a buffered writer for a resource with a target URL.
107///
108/// The file will have the same name as the resource's last path segment value.
109/// Multiple requests for the same URL are indicated by the value of `cardinal`,
110/// any value "N" greater than 1, will cause ".N" to be appended to the
111/// filename.
112fn make_resource_writer(
113    url: &url::Url, target_path: &Option<String>, cardinal: u64,
114) -> Option<std::io::BufWriter<std::fs::File>> {
115    if let Some(tp) = target_path {
116        let resource =
117            url.path_segments().map(|c| c.collect::<Vec<_>>()).unwrap();
118
119        let mut path = format!("{}/{}", tp, resource.iter().last().unwrap());
120
121        if cardinal > 1 {
122            path = format!("{path}.{cardinal}");
123        }
124
125        match std::fs::File::create(&path) {
126            Ok(f) => return Some(std::io::BufWriter::new(f)),
127
128            Err(e) => panic!(
129                "Error creating file for {url}, attempted path was {path}: {e}"
130            ),
131        }
132    }
133
134    None
135}
136
137fn autoindex(path: path::PathBuf, index: &str) -> path::PathBuf {
138    if let Some(path_str) = path.to_str() {
139        if path_str.ends_with('/') {
140            let path_str = format!("{path_str}{index}");
141            return path::PathBuf::from(&path_str);
142        }
143    }
144
145    path
146}
147
148/// Makes a buffered writer for a qlog.
149pub fn make_qlog_writer(
150    dir: &std::ffi::OsStr, role: &str, id: &str,
151) -> std::io::BufWriter<std::fs::File> {
152    let mut path = std::path::PathBuf::from(dir);
153    let filename = format!("{role}-{id}.sqlog");
154    path.push(filename);
155
156    match std::fs::File::create(&path) {
157        Ok(f) => std::io::BufWriter::new(f),
158
159        Err(e) =>
160            panic!("Error creating qlog file attempted path was {path:?}: {e}"),
161    }
162}
163
164fn dump_json(reqs: &[Http3Request], output_sink: &mut dyn FnMut(String)) {
165    let mut out = String::new();
166
167    writeln!(out, "{{").unwrap();
168    writeln!(out, "  \"entries\": [").unwrap();
169    let mut reqs = reqs.iter().peekable();
170
171    while let Some(req) = reqs.next() {
172        writeln!(out, "  {{").unwrap();
173        writeln!(out, "    \"request\":{{").unwrap();
174        writeln!(out, "      \"headers\":[").unwrap();
175
176        let mut req_hdrs = req.hdrs.iter().peekable();
177        while let Some(h) = req_hdrs.next() {
178            writeln!(out, "        {{").unwrap();
179            writeln!(
180                out,
181                "          \"name\": \"{}\",",
182                std::str::from_utf8(h.name()).unwrap()
183            )
184            .unwrap();
185            writeln!(
186                out,
187                "          \"value\": \"{}\"",
188                std::str::from_utf8(h.value()).unwrap().replace('"', "\\\"")
189            )
190            .unwrap();
191
192            if req_hdrs.peek().is_some() {
193                writeln!(out, "        }},").unwrap();
194            } else {
195                writeln!(out, "        }}").unwrap();
196            }
197        }
198        writeln!(out, "      ]}},").unwrap();
199
200        writeln!(out, "    \"response\":{{").unwrap();
201        writeln!(out, "      \"headers\":[").unwrap();
202
203        let mut response_hdrs = req.response_hdrs.iter().peekable();
204        while let Some(h) = response_hdrs.next() {
205            writeln!(out, "        {{").unwrap();
206            writeln!(
207                out,
208                "          \"name\": \"{}\",",
209                std::str::from_utf8(h.name()).unwrap()
210            )
211            .unwrap();
212            writeln!(
213                out,
214                "          \"value\": \"{}\"",
215                std::str::from_utf8(h.value()).unwrap().replace('"', "\\\"")
216            )
217            .unwrap();
218
219            if response_hdrs.peek().is_some() {
220                writeln!(out, "        }},").unwrap();
221            } else {
222                writeln!(out, "        }}").unwrap();
223            }
224        }
225        writeln!(out, "      ],").unwrap();
226        writeln!(out, "      \"body\": {:?}", req.response_body).unwrap();
227        writeln!(out, "    }}").unwrap();
228
229        if reqs.peek().is_some() {
230            writeln!(out, "}},").unwrap();
231        } else {
232            writeln!(out, "}}").unwrap();
233        }
234    }
235    writeln!(out, "]").unwrap();
236    writeln!(out, "}}").unwrap();
237
238    output_sink(out);
239}
240
241pub fn hdrs_to_strings(hdrs: &[quiche::h3::Header]) -> Vec<(String, String)> {
242    hdrs.iter()
243        .map(|h| {
244            let name = String::from_utf8_lossy(h.name()).to_string();
245            let value = String::from_utf8_lossy(h.value()).to_string();
246
247            (name, value)
248        })
249        .collect()
250}
251
252/// Generate a new pair of Source Connection ID and reset token.
253pub fn generate_cid_and_reset_token<T: SecureRandom>(
254    rng: &T,
255) -> (quiche::ConnectionId<'static>, u128) {
256    let mut scid = [0; quiche::MAX_CONN_ID_LEN];
257    rng.fill(&mut scid).unwrap();
258    let scid = scid.to_vec().into();
259    let mut reset_token = [0; 16];
260    rng.fill(&mut reset_token).unwrap();
261    let reset_token = u128::from_be_bytes(reset_token);
262    (scid, reset_token)
263}
264
265/// Construct a priority field value from quiche apps custom query string.
266pub fn priority_field_value_from_query_string(url: &url::Url) -> Option<String> {
267    let mut priority = "".to_string();
268    for param in url.query_pairs() {
269        if param.0 == "u" {
270            write!(priority, "{}={},", param.0, param.1).ok();
271        }
272
273        if param.0 == "i" && param.1 == "1" {
274            priority.push_str("i,");
275        }
276    }
277
278    if !priority.is_empty() {
279        // remove trailing comma
280        priority.pop();
281
282        Some(priority)
283    } else {
284        None
285    }
286}
287
288/// Construct a Priority from quiche apps custom query string.
289pub fn priority_from_query_string(url: &url::Url) -> Option<Priority> {
290    let mut urgency = None;
291    let mut incremental = None;
292    for param in url.query_pairs() {
293        if param.0 == "u" {
294            urgency = Some(param.1.parse::<u8>().unwrap());
295        }
296
297        if param.0 == "i" && param.1 == "1" {
298            incremental = Some(true);
299        }
300    }
301
302    match (urgency, incremental) {
303        (Some(u), Some(i)) => Some(Priority::new(u, i)),
304
305        (Some(u), None) => Some(Priority::new(u, false)),
306
307        (None, Some(i)) => Some(Priority::new(3, i)),
308
309        (None, None) => None,
310    }
311}
312
313fn send_h3_dgram(
314    conn: &mut quiche::Connection, flow_id: u64, dgram_content: &[u8],
315) -> quiche::Result<()> {
316    info!(
317        "sending HTTP/3 DATAGRAM on flow_id={flow_id} with data {dgram_content:?}"
318    );
319
320    let len = octets::varint_len(flow_id) + dgram_content.len();
321    let mut d = vec![0; len];
322    let mut b = octets::OctetsMut::with_slice(&mut d);
323
324    b.put_varint(flow_id)
325        .map_err(|_| quiche::Error::BufferTooShort)?;
326    b.put_bytes(dgram_content)
327        .map_err(|_| quiche::Error::BufferTooShort)?;
328
329    conn.dgram_send(&d)
330}
331
332pub trait HttpConn {
333    fn send_requests(
334        &mut self, conn: &mut quiche::Connection, target_path: &Option<String>,
335    );
336
337    fn handle_responses(
338        &mut self, conn: &mut quiche::Connection, buf: &mut [u8],
339        req_start: &std::time::Instant,
340    );
341
342    fn report_incomplete(&self, start: &std::time::Instant) -> bool;
343
344    fn handle_requests(
345        &mut self, conn: &mut quiche::Connection,
346        partial_requests: &mut HashMap<u64, PartialRequest>,
347        partial_responses: &mut HashMap<u64, PartialResponse>, root: &str,
348        index: &str, buf: &mut [u8],
349    ) -> quiche::h3::Result<()>;
350
351    fn handle_writable(
352        &mut self, conn: &mut quiche::Connection,
353        partial_responses: &mut HashMap<u64, PartialResponse>, stream_id: u64,
354    );
355}
356
357pub fn writable_response_streams(
358    conn: &quiche::Connection,
359) -> impl Iterator<Item = u64> {
360    conn.writable().filter(|id| id % 4 == 0)
361}
362
363/// Represents an HTTP/0.9 formatted request.
364pub struct Http09Request {
365    url: url::Url,
366    cardinal: u64,
367    request_line: String,
368    stream_id: Option<u64>,
369    response_writer: Option<std::io::BufWriter<std::fs::File>>,
370}
371
372/// Represents an HTTP/3 formatted request.
373struct Http3Request {
374    url: url::Url,
375    cardinal: u64,
376    stream_id: Option<u64>,
377    hdrs: Vec<quiche::h3::Header>,
378    priority: Option<Priority>,
379    response_hdrs: Vec<quiche::h3::Header>,
380    response_body: Vec<u8>,
381    response_body_max: usize,
382    response_writer: Option<std::io::BufWriter<std::fs::File>>,
383}
384
385type Http3ResponseBuilderResult = std::result::Result<
386    (Vec<quiche::h3::Header>, Vec<u8>, Vec<u8>),
387    (u64, String),
388>;
389
390pub struct Http09Conn {
391    stream_id: u64,
392    reqs_sent: usize,
393    reqs_complete: usize,
394    reqs: Vec<Http09Request>,
395    output_sink: Rc<RefCell<dyn FnMut(String)>>,
396}
397
398impl Default for Http09Conn {
399    fn default() -> Self {
400        Http09Conn {
401            stream_id: Default::default(),
402            reqs_sent: Default::default(),
403            reqs_complete: Default::default(),
404            reqs: Default::default(),
405            output_sink: Rc::new(RefCell::new(stdout_sink)),
406        }
407    }
408}
409
410impl Http09Conn {
411    pub fn with_urls(
412        urls: &[url::Url], reqs_cardinal: u64,
413        output_sink: Rc<RefCell<dyn FnMut(String)>>,
414    ) -> Box<dyn HttpConn> {
415        let mut reqs = Vec::new();
416        for url in urls {
417            for i in 1..=reqs_cardinal {
418                let request_line = format!("GET {}\r\n", url.path());
419                reqs.push(Http09Request {
420                    url: url.clone(),
421                    cardinal: i,
422                    request_line,
423                    stream_id: None,
424                    response_writer: None,
425                });
426            }
427        }
428
429        let h_conn = Http09Conn {
430            stream_id: 0,
431            reqs_sent: 0,
432            reqs_complete: 0,
433            reqs,
434            output_sink,
435        };
436
437        Box::new(h_conn)
438    }
439}
440
441impl HttpConn for Http09Conn {
442    fn send_requests(
443        &mut self, conn: &mut quiche::Connection, target_path: &Option<String>,
444    ) {
445        let mut reqs_done = 0;
446
447        for req in self.reqs.iter_mut().skip(self.reqs_sent) {
448            match conn.stream_send(
449                self.stream_id,
450                req.request_line.as_bytes(),
451                true,
452            ) {
453                Ok(v) => v,
454
455                Err(quiche::Error::StreamLimit) => {
456                    debug!("not enough stream credits, retry later...");
457                    break;
458                },
459
460                Err(e) => {
461                    error!("failed to send request {e:?}");
462                    break;
463                },
464            };
465
466            debug!("sending HTTP request {:?}", req.request_line);
467
468            req.stream_id = Some(self.stream_id);
469            req.response_writer =
470                make_resource_writer(&req.url, target_path, req.cardinal);
471
472            self.stream_id += 4;
473
474            reqs_done += 1;
475        }
476
477        self.reqs_sent += reqs_done;
478    }
479
480    fn handle_responses(
481        &mut self, conn: &mut quiche::Connection, buf: &mut [u8],
482        req_start: &std::time::Instant,
483    ) {
484        // Process all readable streams.
485        for s in conn.readable() {
486            while let Ok((read, fin)) = conn.stream_recv(s, buf) {
487                trace!("received {read} bytes");
488
489                let stream_buf = &buf[..read];
490
491                trace!(
492                    "stream {} has {} bytes (fin? {})",
493                    s,
494                    stream_buf.len(),
495                    fin
496                );
497
498                let req = self
499                    .reqs
500                    .iter_mut()
501                    .find(|r| r.stream_id == Some(s))
502                    .unwrap();
503
504                match &mut req.response_writer {
505                    Some(rw) => {
506                        rw.write_all(&buf[..read]).ok();
507                    },
508
509                    None => {
510                        self.output_sink.borrow_mut()(unsafe {
511                            String::from_utf8_unchecked(stream_buf.to_vec())
512                        });
513                    },
514                }
515
516                // The server reported that it has no more data to send on
517                // a client-initiated
518                // bidirectional stream, which means
519                // we got the full response. If all responses are received
520                // then close the connection.
521                if &s % 4 == 0 && fin {
522                    self.reqs_complete += 1;
523                    let reqs_count = self.reqs.len();
524
525                    debug!(
526                        "{}/{} responses received",
527                        self.reqs_complete, reqs_count
528                    );
529
530                    if self.reqs_complete == reqs_count {
531                        info!(
532                            "{}/{} response(s) received in {:?}, closing...",
533                            self.reqs_complete,
534                            reqs_count,
535                            req_start.elapsed()
536                        );
537
538                        match conn.close(true, 0x00, b"kthxbye") {
539                            // Already closed.
540                            Ok(_) | Err(quiche::Error::Done) => (),
541
542                            Err(e) => panic!("error closing conn: {e:?}"),
543                        }
544
545                        break;
546                    }
547                }
548            }
549        }
550    }
551
552    fn report_incomplete(&self, start: &std::time::Instant) -> bool {
553        if self.reqs_complete != self.reqs.len() {
554            error!(
555                "connection timed out after {:?} and only completed {}/{} requests",
556                start.elapsed(),
557                self.reqs_complete,
558                self.reqs.len()
559            );
560
561            return true;
562        }
563
564        false
565    }
566
567    fn handle_requests(
568        &mut self, conn: &mut quiche::Connection,
569        partial_requests: &mut HashMap<u64, PartialRequest>,
570        partial_responses: &mut HashMap<u64, PartialResponse>, root: &str,
571        index: &str, buf: &mut [u8],
572    ) -> quiche::h3::Result<()> {
573        // Process all readable streams.
574        for s in conn.readable() {
575            while let Ok((read, fin)) = conn.stream_recv(s, buf) {
576                trace!("{} received {} bytes", conn.trace_id(), read);
577
578                let stream_buf = &buf[..read];
579
580                trace!(
581                    "{} stream {} has {} bytes (fin? {})",
582                    conn.trace_id(),
583                    s,
584                    stream_buf.len(),
585                    fin
586                );
587
588                let stream_buf =
589                    if let Some(partial) = partial_requests.get_mut(&s) {
590                        partial.req.extend_from_slice(stream_buf);
591
592                        if !partial.req.ends_with(b"\r\n") {
593                            return Ok(());
594                        }
595
596                        &partial.req
597                    } else {
598                        if !stream_buf.ends_with(b"\r\n") {
599                            let request = PartialRequest {
600                                req: stream_buf.to_vec(),
601                            };
602
603                            partial_requests.insert(s, request);
604                            return Ok(());
605                        }
606
607                        stream_buf
608                    };
609
610                if stream_buf.starts_with(b"GET ") {
611                    let uri = &stream_buf[4..stream_buf.len() - 2];
612                    let uri = String::from_utf8(uri.to_vec()).unwrap();
613                    let uri = String::from(uri.lines().next().unwrap());
614                    let uri = path::Path::new(&uri);
615                    let mut path = path::PathBuf::from(root);
616
617                    partial_requests.remove(&s);
618
619                    for c in uri.components() {
620                        if let path::Component::Normal(v) = c {
621                            path.push(v)
622                        }
623                    }
624
625                    path = autoindex(path, index);
626
627                    info!(
628                        "{} got GET request for {:?} on stream {}",
629                        conn.trace_id(),
630                        path,
631                        s
632                    );
633
634                    let body = std::fs::read(path.as_path())
635                        .unwrap_or_else(|_| b"Not Found!\r\n".to_vec());
636
637                    info!(
638                        "{} sending response of size {} on stream {}",
639                        conn.trace_id(),
640                        body.len(),
641                        s
642                    );
643
644                    let written = match conn.stream_send(s, &body, true) {
645                        Ok(v) => v,
646
647                        Err(quiche::Error::Done) => 0,
648
649                        Err(e) => {
650                            error!(
651                                "{} stream send failed {:?}",
652                                conn.trace_id(),
653                                e
654                            );
655                            return Err(From::from(e));
656                        },
657                    };
658
659                    if written < body.len() {
660                        let response = PartialResponse {
661                            headers: None,
662                            priority: None,
663                            body,
664                            written,
665                        };
666
667                        partial_responses.insert(s, response);
668                    }
669                }
670            }
671        }
672
673        Ok(())
674    }
675
676    fn handle_writable(
677        &mut self, conn: &mut quiche::Connection,
678        partial_responses: &mut HashMap<u64, PartialResponse>, stream_id: u64,
679    ) {
680        let stream_cap = conn.stream_capacity(stream_id);
681
682        debug!(
683            "{} response stream {} is writable with capacity {:?}",
684            conn.trace_id(),
685            stream_id,
686            stream_cap,
687        );
688
689        if !partial_responses.contains_key(&stream_id) {
690            return;
691        }
692
693        let resp = partial_responses.get_mut(&stream_id).unwrap();
694        let body = &resp.body[resp.written..];
695
696        let written = match conn.stream_send(stream_id, body, true) {
697            Ok(v) => v,
698
699            Err(quiche::Error::Done) => 0,
700
701            Err(e) => {
702                partial_responses.remove(&stream_id);
703
704                error!("{} stream send failed {:?}", conn.trace_id(), e);
705                return;
706            },
707        };
708
709        resp.written += written;
710
711        if resp.written == resp.body.len() {
712            partial_responses.remove(&stream_id);
713        }
714    }
715}
716
717pub struct Http3DgramSender {
718    dgram_count: u64,
719    pub dgram_content: String,
720    pub flow_id: u64,
721    pub dgrams_sent: u64,
722}
723
724impl Http3DgramSender {
725    pub fn new(dgram_count: u64, dgram_content: String, flow_id: u64) -> Self {
726        Self {
727            dgram_count,
728            dgram_content,
729            flow_id,
730            dgrams_sent: 0,
731        }
732    }
733}
734
735fn make_h3_config(
736    max_field_section_size: Option<u64>, qpack_max_table_capacity: Option<u64>,
737    qpack_blocked_streams: Option<u64>,
738) -> quiche::h3::Config {
739    let mut config = quiche::h3::Config::new().unwrap();
740
741    if let Some(v) = max_field_section_size {
742        config.set_max_field_section_size(v);
743    }
744
745    if let Some(v) = qpack_max_table_capacity {
746        // quiche doesn't support dynamic QPACK, so clamp to 0 for now.
747        config.set_qpack_max_table_capacity(v.clamp(0, 0));
748    }
749
750    if let Some(v) = qpack_blocked_streams {
751        // quiche doesn't support dynamic QPACK, so clamp to 0 for now.
752        config.set_qpack_blocked_streams(v.clamp(0, 0));
753    }
754
755    config
756}
757
758pub struct Http3Conn {
759    h3_conn: quiche::h3::Connection,
760    reqs_hdrs_sent: usize,
761    reqs_complete: usize,
762    largest_processed_request: u64,
763    reqs: Vec<Http3Request>,
764    body: Option<Vec<u8>>,
765    sent_body_bytes: HashMap<u64, usize>,
766    dump_json: bool,
767    dgram_sender: Option<Http3DgramSender>,
768    output_sink: Rc<RefCell<dyn FnMut(String)>>,
769}
770
771impl Http3Conn {
772    #[allow(clippy::too_many_arguments)]
773    pub fn with_urls(
774        conn: &mut quiche::Connection, urls: &[url::Url], reqs_cardinal: u64,
775        req_headers: &[String], body: &Option<Vec<u8>>, method: &str,
776        send_priority_update: bool, max_field_section_size: Option<u64>,
777        qpack_max_table_capacity: Option<u64>,
778        qpack_blocked_streams: Option<u64>, dump_json: Option<usize>,
779        dgram_sender: Option<Http3DgramSender>,
780        output_sink: Rc<RefCell<dyn FnMut(String)>>,
781    ) -> Box<dyn HttpConn> {
782        let mut reqs = Vec::new();
783        for url in urls {
784            for i in 1..=reqs_cardinal {
785                let authority = match url.port() {
786                    Some(port) => format!("{}:{}", url.host_str().unwrap(), port),
787
788                    None => url.host_str().unwrap().to_string(),
789                };
790
791                let mut hdrs = vec![
792                    quiche::h3::Header::new(b":method", method.as_bytes()),
793                    quiche::h3::Header::new(b":scheme", url.scheme().as_bytes()),
794                    quiche::h3::Header::new(b":authority", authority.as_bytes()),
795                    quiche::h3::Header::new(
796                        b":path",
797                        url[url::Position::BeforePath..].as_bytes(),
798                    ),
799                    quiche::h3::Header::new(b"user-agent", b"quiche"),
800                ];
801
802                let priority = if send_priority_update {
803                    priority_from_query_string(url)
804                } else {
805                    None
806                };
807
808                // Add custom headers to the request.
809                for header in req_headers {
810                    let header_split: Vec<&str> =
811                        header.splitn(2, ": ").collect();
812
813                    if header_split.len() != 2 {
814                        panic!("malformed header provided - \"{header}\"");
815                    }
816
817                    hdrs.push(quiche::h3::Header::new(
818                        header_split[0].as_bytes(),
819                        header_split[1].as_bytes(),
820                    ));
821                }
822
823                if body.is_some() {
824                    hdrs.push(quiche::h3::Header::new(
825                        b"content-length",
826                        body.as_ref().unwrap().len().to_string().as_bytes(),
827                    ));
828                }
829
830                reqs.push(Http3Request {
831                    url: url.clone(),
832                    cardinal: i,
833                    hdrs,
834                    priority,
835                    response_hdrs: Vec::new(),
836                    response_body: Vec::new(),
837                    response_body_max: dump_json.unwrap_or_default(),
838                    stream_id: None,
839                    response_writer: None,
840                });
841            }
842        }
843
844        let h_conn = Http3Conn {
845            h3_conn: quiche::h3::Connection::with_transport(
846                conn,
847                &make_h3_config(
848                    max_field_section_size,
849                    qpack_max_table_capacity,
850                    qpack_blocked_streams,
851                ),
852            ).expect("Unable to create HTTP/3 connection, check the server's uni stream limit and window size"),
853            reqs_hdrs_sent: 0,
854            reqs_complete: 0,
855            largest_processed_request: 0,
856            reqs,
857            body: body.as_ref().map(|b| b.to_vec()),
858            sent_body_bytes: HashMap::new(),
859            dump_json: dump_json.is_some(),
860            dgram_sender,
861            output_sink,
862        };
863
864        Box::new(h_conn)
865    }
866
867    pub fn with_conn(
868        conn: &mut quiche::Connection, max_field_section_size: Option<u64>,
869        qpack_max_table_capacity: Option<u64>,
870        qpack_blocked_streams: Option<u64>,
871        dgram_sender: Option<Http3DgramSender>,
872        output_sink: Rc<RefCell<dyn FnMut(String)>>,
873    ) -> std::result::Result<Box<dyn HttpConn>, String> {
874        let h3_conn = quiche::h3::Connection::with_transport(
875            conn,
876            &make_h3_config(
877                max_field_section_size,
878                qpack_max_table_capacity,
879                qpack_blocked_streams,
880            ),
881        ).map_err(|_| "Unable to create HTTP/3 connection, check the client's uni stream limit and window size")?;
882
883        let h_conn = Http3Conn {
884            h3_conn,
885            reqs_hdrs_sent: 0,
886            reqs_complete: 0,
887            largest_processed_request: 0,
888            reqs: Vec::new(),
889            body: None,
890            sent_body_bytes: HashMap::new(),
891            dump_json: false,
892            dgram_sender,
893            output_sink,
894        };
895
896        Ok(Box::new(h_conn))
897    }
898
899    /// Builds an HTTP/3 response given a request.
900    fn build_h3_response(
901        root: &str, index: &str, request: &[quiche::h3::Header],
902    ) -> Http3ResponseBuilderResult {
903        let mut file_path = path::PathBuf::from(root);
904        let mut scheme = None;
905        let mut authority = None;
906        let mut host = None;
907        let mut path = None;
908        let mut method = None;
909        let mut priority = vec![];
910
911        // Parse some of the request headers.
912        for hdr in request {
913            match hdr.name() {
914                b":scheme" => {
915                    if scheme.is_some() {
916                        return Err((
917                            H3_MESSAGE_ERROR,
918                            ":scheme cannot be duplicated".to_string(),
919                        ));
920                    }
921
922                    scheme = Some(std::str::from_utf8(hdr.value()).unwrap());
923                },
924
925                b":authority" => {
926                    if authority.is_some() {
927                        return Err((
928                            H3_MESSAGE_ERROR,
929                            ":authority cannot be duplicated".to_string(),
930                        ));
931                    }
932
933                    authority = Some(std::str::from_utf8(hdr.value()).unwrap());
934                },
935
936                b":path" => {
937                    if path.is_some() {
938                        return Err((
939                            H3_MESSAGE_ERROR,
940                            ":path cannot be duplicated".to_string(),
941                        ));
942                    }
943
944                    path = Some(std::str::from_utf8(hdr.value()).unwrap())
945                },
946
947                b":method" => {
948                    if method.is_some() {
949                        return Err((
950                            H3_MESSAGE_ERROR,
951                            ":method cannot be duplicated".to_string(),
952                        ));
953                    }
954
955                    method = Some(std::str::from_utf8(hdr.value()).unwrap())
956                },
957
958                b":protocol" => {
959                    return Err((
960                        H3_MESSAGE_ERROR,
961                        ":protocol not supported".to_string(),
962                    ));
963                },
964
965                b"priority" => priority = hdr.value().to_vec(),
966
967                b"host" => host = Some(std::str::from_utf8(hdr.value()).unwrap()),
968
969                _ => (),
970            }
971        }
972
973        let decided_method = match method {
974            Some(method) => {
975                match method {
976                    "" =>
977                        return Err((
978                            H3_MESSAGE_ERROR,
979                            ":method value cannot be empty".to_string(),
980                        )),
981
982                    "CONNECT" => {
983                        // not allowed
984                        let headers = vec![
985                            quiche::h3::Header::new(
986                                b":status",
987                                "405".to_string().as_bytes(),
988                            ),
989                            quiche::h3::Header::new(b"server", b"quiche"),
990                        ];
991
992                        return Ok((headers, b"".to_vec(), Default::default()));
993                    },
994
995                    _ => method,
996                }
997            },
998
999            None =>
1000                return Err((
1001                    H3_MESSAGE_ERROR,
1002                    ":method cannot be missing".to_string(),
1003                )),
1004        };
1005
1006        let decided_scheme = match scheme {
1007            Some(scheme) => {
1008                if scheme != "http" && scheme != "https" {
1009                    let headers = vec![
1010                        quiche::h3::Header::new(
1011                            b":status",
1012                            "400".to_string().as_bytes(),
1013                        ),
1014                        quiche::h3::Header::new(b"server", b"quiche"),
1015                    ];
1016
1017                    return Ok((
1018                        headers,
1019                        b"Invalid scheme".to_vec(),
1020                        Default::default(),
1021                    ));
1022                }
1023
1024                scheme
1025            },
1026
1027            None =>
1028                return Err((
1029                    H3_MESSAGE_ERROR,
1030                    ":scheme cannot be missing".to_string(),
1031                )),
1032        };
1033
1034        let decided_host = match (authority, host) {
1035            (None, Some("")) =>
1036                return Err((
1037                    H3_MESSAGE_ERROR,
1038                    "host value cannot be empty".to_string(),
1039                )),
1040
1041            (Some(""), None) =>
1042                return Err((
1043                    H3_MESSAGE_ERROR,
1044                    ":authority value cannot be empty".to_string(),
1045                )),
1046
1047            (Some(""), Some("")) =>
1048                return Err((
1049                    H3_MESSAGE_ERROR,
1050                    ":authority and host value cannot be empty".to_string(),
1051                )),
1052
1053            (None, None) =>
1054                return Err((
1055                    H3_MESSAGE_ERROR,
1056                    ":authority and host missing".to_string(),
1057                )),
1058
1059            // Any other combo, prefer :authority
1060            (..) => authority.unwrap(),
1061        };
1062
1063        let decided_path = match path {
1064            Some("") =>
1065                return Err((
1066                    H3_MESSAGE_ERROR,
1067                    ":path value cannot be empty".to_string(),
1068                )),
1069
1070            None =>
1071                return Err((
1072                    H3_MESSAGE_ERROR,
1073                    ":path cannot be missing".to_string(),
1074                )),
1075
1076            Some(path) => path,
1077        };
1078
1079        let url = format!("{decided_scheme}://{decided_host}{decided_path}");
1080        let url = url::Url::parse(&url).unwrap();
1081
1082        let pathbuf = path::PathBuf::from(url.path());
1083        let pathbuf = autoindex(pathbuf, index);
1084
1085        // Priority query string takes precedence over the header.
1086        // So replace the header with one built here.
1087        let query_priority = priority_field_value_from_query_string(&url);
1088
1089        if let Some(p) = query_priority {
1090            priority = p.as_bytes().to_vec();
1091        }
1092
1093        let (status, body) = match decided_method {
1094            "GET" => {
1095                for c in pathbuf.components() {
1096                    if let path::Component::Normal(v) = c {
1097                        file_path.push(v)
1098                    }
1099                }
1100
1101                match std::fs::read(file_path.as_path()) {
1102                    Ok(data) => (200, data),
1103
1104                    Err(_) => (404, b"Not Found!".to_vec()),
1105                }
1106            },
1107
1108            _ => (405, Vec::new()),
1109        };
1110
1111        let headers = vec![
1112            quiche::h3::Header::new(b":status", status.to_string().as_bytes()),
1113            quiche::h3::Header::new(b"server", b"quiche"),
1114            quiche::h3::Header::new(
1115                b"content-length",
1116                body.len().to_string().as_bytes(),
1117            ),
1118        ];
1119
1120        Ok((headers, body, priority))
1121    }
1122}
1123
1124impl HttpConn for Http3Conn {
1125    fn send_requests(
1126        &mut self, conn: &mut quiche::Connection, target_path: &Option<String>,
1127    ) {
1128        let mut reqs_done = 0;
1129
1130        // First send headers.
1131        for req in self.reqs.iter_mut().skip(self.reqs_hdrs_sent) {
1132            let s = match self.h3_conn.send_request(
1133                conn,
1134                &req.hdrs,
1135                self.body.is_none(),
1136            ) {
1137                Ok(v) => v,
1138
1139                Err(quiche::h3::Error::TransportError(
1140                    quiche::Error::StreamLimit,
1141                )) => {
1142                    debug!("not enough stream credits, retry later...");
1143                    break;
1144                },
1145
1146                Err(quiche::h3::Error::StreamBlocked) => {
1147                    debug!("stream is blocked, retry later...");
1148                    break;
1149                },
1150
1151                Err(e) => {
1152                    error!("failed to send request {e:?}");
1153                    break;
1154                },
1155            };
1156
1157            debug!("Sent HTTP request {:?}", &req.hdrs);
1158
1159            if let Some(priority) = &req.priority {
1160                // If sending the priority fails, don't try again.
1161                self.h3_conn
1162                    .send_priority_update_for_request(conn, s, priority)
1163                    .ok();
1164            }
1165
1166            req.stream_id = Some(s);
1167            req.response_writer =
1168                make_resource_writer(&req.url, target_path, req.cardinal);
1169            self.sent_body_bytes.insert(s, 0);
1170
1171            reqs_done += 1;
1172        }
1173        self.reqs_hdrs_sent += reqs_done;
1174
1175        // Then send any remaining body.
1176        if let Some(body) = &self.body {
1177            for (stream_id, sent_bytes) in self.sent_body_bytes.iter_mut() {
1178                if *sent_bytes == body.len() {
1179                    continue;
1180                }
1181
1182                // Always try to send all remaining bytes, so always set fin to
1183                // true.
1184                let sent = match self.h3_conn.send_body(
1185                    conn,
1186                    *stream_id,
1187                    &body[*sent_bytes..],
1188                    true,
1189                ) {
1190                    Ok(v) => v,
1191
1192                    Err(quiche::h3::Error::Done) => 0,
1193
1194                    Err(e) => {
1195                        error!("failed to send request body {e:?}");
1196                        continue;
1197                    },
1198                };
1199
1200                *sent_bytes += sent;
1201            }
1202        }
1203
1204        // And finally any DATAGRAMS.
1205        if let Some(ds) = self.dgram_sender.as_mut() {
1206            let mut dgrams_done = 0;
1207
1208            for _ in ds.dgrams_sent..ds.dgram_count {
1209                match send_h3_dgram(conn, ds.flow_id, ds.dgram_content.as_bytes())
1210                {
1211                    Ok(v) => v,
1212
1213                    Err(e) => {
1214                        error!("failed to send dgram {e:?}");
1215                        break;
1216                    },
1217                }
1218
1219                dgrams_done += 1;
1220            }
1221
1222            ds.dgrams_sent += dgrams_done;
1223        }
1224    }
1225
1226    fn handle_responses(
1227        &mut self, conn: &mut quiche::Connection, buf: &mut [u8],
1228        req_start: &std::time::Instant,
1229    ) {
1230        loop {
1231            match self.h3_conn.poll(conn) {
1232                Ok((stream_id, quiche::h3::Event::Headers { list, .. })) => {
1233                    debug!(
1234                        "got response headers {:?} on stream id {}",
1235                        hdrs_to_strings(&list),
1236                        stream_id
1237                    );
1238
1239                    let req = self
1240                        .reqs
1241                        .iter_mut()
1242                        .find(|r| r.stream_id == Some(stream_id))
1243                        .unwrap();
1244
1245                    req.response_hdrs = list;
1246                },
1247
1248                Ok((stream_id, quiche::h3::Event::Data)) => {
1249                    while let Ok(read) =
1250                        self.h3_conn.recv_body(conn, stream_id, buf)
1251                    {
1252                        debug!(
1253                            "got {read} bytes of response data on stream {stream_id}"
1254                        );
1255
1256                        let req = self
1257                            .reqs
1258                            .iter_mut()
1259                            .find(|r| r.stream_id == Some(stream_id))
1260                            .unwrap();
1261
1262                        let len = std::cmp::min(
1263                            read,
1264                            req.response_body_max - req.response_body.len(),
1265                        );
1266                        req.response_body.extend_from_slice(&buf[..len]);
1267
1268                        match &mut req.response_writer {
1269                            Some(rw) => {
1270                                rw.write_all(&buf[..read]).ok();
1271                            },
1272
1273                            None =>
1274                                if !self.dump_json {
1275                                    self.output_sink.borrow_mut()(unsafe {
1276                                        String::from_utf8_unchecked(
1277                                            buf[..read].to_vec(),
1278                                        )
1279                                    });
1280                                },
1281                        }
1282                    }
1283                },
1284
1285                Ok((_stream_id, quiche::h3::Event::Finished)) => {
1286                    self.reqs_complete += 1;
1287                    let reqs_count = self.reqs.len();
1288
1289                    debug!(
1290                        "{}/{} responses received",
1291                        self.reqs_complete, reqs_count
1292                    );
1293
1294                    if self.reqs_complete == reqs_count {
1295                        info!(
1296                            "{}/{} response(s) received in {:?}, closing...",
1297                            self.reqs_complete,
1298                            reqs_count,
1299                            req_start.elapsed()
1300                        );
1301
1302                        if self.dump_json {
1303                            dump_json(
1304                                &self.reqs,
1305                                &mut *self.output_sink.borrow_mut(),
1306                            );
1307                        }
1308
1309                        match conn.close(true, 0x100, b"kthxbye") {
1310                            // Already closed.
1311                            Ok(_) | Err(quiche::Error::Done) => (),
1312
1313                            Err(e) => panic!("error closing conn: {e:?}"),
1314                        }
1315
1316                        break;
1317                    }
1318                },
1319
1320                Ok((_stream_id, quiche::h3::Event::Reset(e))) => {
1321                    error!("request was reset by peer with {e}, closing...");
1322
1323                    match conn.close(true, 0x100, b"kthxbye") {
1324                        // Already closed.
1325                        Ok(_) | Err(quiche::Error::Done) => (),
1326
1327                        Err(e) => panic!("error closing conn: {e:?}"),
1328                    }
1329
1330                    break;
1331                },
1332
1333                Ok((
1334                    prioritized_element_id,
1335                    quiche::h3::Event::PriorityUpdate,
1336                )) => {
1337                    info!(
1338                        "{} PRIORITY_UPDATE triggered for element ID={}",
1339                        conn.trace_id(),
1340                        prioritized_element_id
1341                    );
1342                },
1343
1344                Ok((goaway_id, quiche::h3::Event::GoAway)) => {
1345                    info!(
1346                        "{} got GOAWAY with ID {} ",
1347                        conn.trace_id(),
1348                        goaway_id
1349                    );
1350                },
1351
1352                Err(quiche::h3::Error::Done) => {
1353                    break;
1354                },
1355
1356                Err(e) => {
1357                    error!("HTTP/3 processing failed: {e:?}");
1358
1359                    break;
1360                },
1361            }
1362        }
1363
1364        // Process datagram-related events.
1365        while let Ok(len) = conn.dgram_recv(buf) {
1366            let mut b = octets::Octets::with_slice(buf);
1367            if let Ok(flow_id) = b.get_varint() {
1368                info!(
1369                    "Received DATAGRAM flow_id={} len={} data={:?}",
1370                    flow_id,
1371                    len,
1372                    buf[b.off()..len].to_vec()
1373                );
1374            }
1375        }
1376    }
1377
1378    fn report_incomplete(&self, start: &std::time::Instant) -> bool {
1379        if self.reqs_complete != self.reqs.len() {
1380            error!(
1381                "connection timed out after {:?} and only completed {}/{} requests",
1382                start.elapsed(),
1383                self.reqs_complete,
1384                self.reqs.len()
1385            );
1386
1387            if self.dump_json {
1388                dump_json(&self.reqs, &mut *self.output_sink.borrow_mut());
1389            }
1390
1391            return true;
1392        }
1393
1394        false
1395    }
1396
1397    fn handle_requests(
1398        &mut self, conn: &mut quiche::Connection,
1399        _partial_requests: &mut HashMap<u64, PartialRequest>,
1400        partial_responses: &mut HashMap<u64, PartialResponse>, root: &str,
1401        index: &str, buf: &mut [u8],
1402    ) -> quiche::h3::Result<()> {
1403        // Process HTTP stream-related events.
1404        //
1405        // This loops over any and all received HTTP requests and sends just the
1406        // HTTP response headers.
1407        loop {
1408            match self.h3_conn.poll(conn) {
1409                Ok((stream_id, quiche::h3::Event::Headers { list, .. })) => {
1410                    info!(
1411                        "{} got request {:?} on stream id {}",
1412                        conn.trace_id(),
1413                        hdrs_to_strings(&list),
1414                        stream_id
1415                    );
1416
1417                    self.largest_processed_request =
1418                        std::cmp::max(self.largest_processed_request, stream_id);
1419
1420                    // We decide the response based on headers alone, so
1421                    // stop reading the request stream so that any body
1422                    // is ignored and pointless Data events are not
1423                    // generated.
1424                    conn.stream_shutdown(stream_id, quiche::Shutdown::Read, 0)
1425                        .unwrap();
1426
1427                    let (mut headers, body, mut priority) =
1428                        match Http3Conn::build_h3_response(root, index, &list) {
1429                            Ok(v) => v,
1430
1431                            Err((error_code, _)) => {
1432                                conn.stream_shutdown(
1433                                    stream_id,
1434                                    quiche::Shutdown::Write,
1435                                    error_code,
1436                                )
1437                                .unwrap();
1438                                continue;
1439                            },
1440                        };
1441
1442                    match self.h3_conn.take_last_priority_update(stream_id) {
1443                        Ok(v) => {
1444                            priority = v;
1445                        },
1446
1447                        Err(quiche::h3::Error::Done) => (),
1448
1449                        Err(e) => error!(
1450                            "{} error taking PRIORITY_UPDATE {}",
1451                            conn.trace_id(),
1452                            e
1453                        ),
1454                    }
1455
1456                    if !priority.is_empty() {
1457                        headers.push(quiche::h3::Header::new(
1458                            b"priority",
1459                            priority.as_slice(),
1460                        ));
1461                    }
1462
1463                    #[cfg(feature = "sfv")]
1464                    let priority =
1465                        quiche::h3::Priority::try_from(priority.as_slice())
1466                            .unwrap_or_default();
1467
1468                    #[cfg(not(feature = "sfv"))]
1469                    let priority = quiche::h3::Priority::default();
1470
1471                    info!(
1472                        "{} prioritizing response on stream {} as {:?}",
1473                        conn.trace_id(),
1474                        stream_id,
1475                        priority
1476                    );
1477
1478                    match self.h3_conn.send_response_with_priority(
1479                        conn, stream_id, &headers, &priority, false,
1480                    ) {
1481                        Ok(v) => v,
1482
1483                        Err(quiche::h3::Error::StreamBlocked) => {
1484                            let response = PartialResponse {
1485                                headers: Some(headers),
1486                                priority: Some(priority),
1487                                body,
1488                                written: 0,
1489                            };
1490
1491                            partial_responses.insert(stream_id, response);
1492                            continue;
1493                        },
1494
1495                        Err(e) => {
1496                            error!(
1497                                "{} stream send failed {:?}",
1498                                conn.trace_id(),
1499                                e
1500                            );
1501
1502                            break;
1503                        },
1504                    }
1505
1506                    let response = PartialResponse {
1507                        headers: None,
1508                        priority: None,
1509                        body,
1510                        written: 0,
1511                    };
1512
1513                    partial_responses.insert(stream_id, response);
1514                },
1515
1516                Ok((stream_id, quiche::h3::Event::Data)) => {
1517                    info!(
1518                        "{} got data on stream id {}",
1519                        conn.trace_id(),
1520                        stream_id
1521                    );
1522                },
1523
1524                Ok((_stream_id, quiche::h3::Event::Finished)) => (),
1525
1526                Ok((_stream_id, quiche::h3::Event::Reset { .. })) => (),
1527
1528                Ok((
1529                    prioritized_element_id,
1530                    quiche::h3::Event::PriorityUpdate,
1531                )) => {
1532                    info!(
1533                        "{} PRIORITY_UPDATE triggered for element ID={}",
1534                        conn.trace_id(),
1535                        prioritized_element_id
1536                    );
1537                },
1538
1539                Ok((goaway_id, quiche::h3::Event::GoAway)) => {
1540                    trace!(
1541                        "{} got GOAWAY with ID {} ",
1542                        conn.trace_id(),
1543                        goaway_id
1544                    );
1545                    self.h3_conn
1546                        .send_goaway(conn, self.largest_processed_request)?;
1547                },
1548
1549                Err(quiche::h3::Error::Done) => {
1550                    break;
1551                },
1552
1553                Err(e) => {
1554                    error!("{} HTTP/3 error {:?}", conn.trace_id(), e);
1555
1556                    return Err(e);
1557                },
1558            }
1559        }
1560
1561        // Visit all writable response streams to send HTTP content.
1562        for stream_id in writable_response_streams(conn) {
1563            self.handle_writable(conn, partial_responses, stream_id);
1564        }
1565
1566        // Process datagram-related events.
1567        while let Ok(len) = conn.dgram_recv(buf) {
1568            let mut b = octets::Octets::with_slice(buf);
1569            if let Ok(flow_id) = b.get_varint() {
1570                info!(
1571                    "Received DATAGRAM flow_id={} len={} data={:?}",
1572                    flow_id,
1573                    len,
1574                    buf[b.off()..len].to_vec()
1575                );
1576            }
1577        }
1578
1579        if let Some(ds) = self.dgram_sender.as_mut() {
1580            let mut dgrams_done = 0;
1581
1582            for _ in ds.dgrams_sent..ds.dgram_count {
1583                match send_h3_dgram(conn, ds.flow_id, ds.dgram_content.as_bytes())
1584                {
1585                    Ok(v) => v,
1586
1587                    Err(e) => {
1588                        error!("failed to send dgram {e:?}");
1589                        break;
1590                    },
1591                }
1592
1593                dgrams_done += 1;
1594            }
1595
1596            ds.dgrams_sent += dgrams_done;
1597        }
1598
1599        Ok(())
1600    }
1601
1602    fn handle_writable(
1603        &mut self, conn: &mut quiche::Connection,
1604        partial_responses: &mut HashMap<u64, PartialResponse>, stream_id: u64,
1605    ) {
1606        let stream_cap = conn.stream_capacity(stream_id);
1607
1608        debug!(
1609            "{} response stream {} is writable with capacity {:?}",
1610            conn.trace_id(),
1611            stream_id,
1612            stream_cap,
1613        );
1614
1615        if !partial_responses.contains_key(&stream_id) {
1616            return;
1617        }
1618
1619        let resp = partial_responses.get_mut(&stream_id).unwrap();
1620
1621        if let (Some(headers), Some(priority)) = (&resp.headers, &resp.priority) {
1622            match self.h3_conn.send_response_with_priority(
1623                conn, stream_id, headers, priority, false,
1624            ) {
1625                Ok(_) => (),
1626
1627                Err(quiche::h3::Error::StreamBlocked) => {
1628                    return;
1629                },
1630
1631                Err(e) => {
1632                    error!("{} stream send failed {:?}", conn.trace_id(), e);
1633                    return;
1634                },
1635            }
1636        }
1637
1638        resp.headers = None;
1639        resp.priority = None;
1640
1641        let body = &resp.body[resp.written..];
1642
1643        let written = match self.h3_conn.send_body(conn, stream_id, body, true) {
1644            Ok(v) => v,
1645
1646            Err(quiche::h3::Error::Done) => 0,
1647
1648            Err(e) => {
1649                partial_responses.remove(&stream_id);
1650
1651                error!("{} stream send failed {:?}", conn.trace_id(), e);
1652                return;
1653            },
1654        };
1655
1656        resp.written += written;
1657
1658        if resp.written == resp.body.len() {
1659            partial_responses.remove(&stream_id);
1660        }
1661    }
1662}