quiche_apps/
common.rs

1// Copyright (C) 2020, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! Quiche application utilities.
28//!
29//! This module provides some utility functions that are common to quiche
30//! applications.
31
32use std::io::prelude::*;
33
34use std::collections::HashMap;
35
36#[cfg(feature = "sfv")]
37use std::convert::TryFrom;
38
39use std::fmt::Write as _;
40
41use std::rc::Rc;
42
43use std::cell::RefCell;
44
45use std::path;
46
47use ring::rand::SecureRandom;
48
49use quiche::ConnectionId;
50
51use quiche::h3::NameValue;
52use quiche::h3::Priority;
53
/// Default output sink: prints `out` to standard output exactly as given,
/// without appending a newline.
pub fn stdout_sink(out: String) {
    print!("{}", out);
}
57
/// Error code reported when a request is rejected as malformed, e.g. because
/// a pseudo-header is duplicated. The value matches the `H3_MESSAGE_ERROR`
/// code (0x010e) defined by HTTP/3.
const H3_MESSAGE_ERROR: u64 = 0x10E;
59
/// ALPN helpers.
///
/// This module contains constants for working with ALPN.
pub mod alpns {
    /// ALPN protocol identifiers grouped under the HTTP/0.9 name.
    pub const HTTP_09: [&[u8]; 2] = [b"hq-interop", b"http/0.9"];
    /// ALPN protocol identifier for HTTP/3.
    pub const HTTP_3: [&[u8]; 1] = [b"h3"];
}
67
/// A request whose bytes have only been partially received on a stream.
///
/// The HTTP/0.9 request handler stores one of these per stream until the
/// accumulated bytes end with `\r\n`.
pub struct PartialRequest {
    /// Raw request bytes accumulated so far.
    pub req: Vec<u8>,
}
71
72pub struct PartialResponse {
73    pub headers: Option<Vec<quiche::h3::Header>>,
74    pub priority: Option<quiche::h3::Priority>,
75
76    pub body: Vec<u8>,
77
78    pub written: usize,
79}
80
/// Numeric identifier used as the key for a [`Client`] in a [`ClientMap`].
pub type ClientId = u64;
82
83pub struct Client {
84    pub conn: quiche::Connection,
85
86    pub http_conn: Option<Box<dyn HttpConn>>,
87
88    pub client_id: ClientId,
89
90    pub app_proto_selected: bool,
91
92    pub partial_requests: std::collections::HashMap<u64, PartialRequest>,
93
94    pub partial_responses: std::collections::HashMap<u64, PartialResponse>,
95
96    pub max_datagram_size: usize,
97
98    pub loss_rate: f64,
99
100    pub max_send_burst: usize,
101}
102
/// Maps a QUIC connection ID to the identifier of the client it belongs to.
pub type ClientIdMap = HashMap<ConnectionId<'static>, ClientId>;
/// Maps a client identifier to that client's state.
pub type ClientMap = HashMap<ClientId, Client>;
105
106/// Makes a buffered writer for a resource with a target URL.
107///
108/// The file will have the same name as the resource's last path segment value.
109/// Multiple requests for the same URL are indicated by the value of `cardinal`,
110/// any value "N" greater than 1, will cause ".N" to be appended to the
111/// filename.
112fn make_resource_writer(
113    url: &url::Url, target_path: &Option<String>, cardinal: u64,
114) -> Option<std::io::BufWriter<std::fs::File>> {
115    if let Some(tp) = target_path {
116        let resource =
117            url.path_segments().map(|c| c.collect::<Vec<_>>()).unwrap();
118
119        let mut path = format!("{}/{}", tp, resource.iter().last().unwrap());
120
121        if cardinal > 1 {
122            path = format!("{path}.{cardinal}");
123        }
124
125        match std::fs::File::create(&path) {
126            Ok(f) => return Some(std::io::BufWriter::new(f)),
127
128            Err(e) => panic!(
129                "Error creating file for {}, attempted path was {}: {}",
130                url, path, e
131            ),
132        }
133    }
134
135    None
136}
137
/// Appends `index` to `path` when the path's textual form ends with `/`.
///
/// Paths that are not valid UTF-8, or that do not end with a slash, are
/// returned unchanged.
fn autoindex(path: path::PathBuf, index: &str) -> path::PathBuf {
    let with_index = match path.to_str() {
        Some(dir) if dir.ends_with('/') => Some(format!("{dir}{index}")),
        _ => None,
    };

    match with_index {
        Some(joined) => path::PathBuf::from(joined),
        None => path,
    }
}
148
/// Makes a buffered writer for a qlog.
///
/// The file is created inside `dir` and named `{role}-{id}.sqlog`.
///
/// # Panics
///
/// Panics if the file cannot be created.
pub fn make_qlog_writer(
    dir: &std::ffi::OsStr, role: &str, id: &str,
) -> std::io::BufWriter<std::fs::File> {
    let path = std::path::Path::new(dir).join(format!("{role}-{id}.sqlog"));

    let file = std::fs::File::create(&path).unwrap_or_else(|e| {
        panic!(
            "Error creating qlog file attempted path was {:?}: {}",
            path, e
        )
    });

    std::io::BufWriter::new(file)
}
166
167fn dump_json(reqs: &[Http3Request], output_sink: &mut dyn FnMut(String)) {
168    let mut out = String::new();
169
170    writeln!(out, "{{").unwrap();
171    writeln!(out, "  \"entries\": [").unwrap();
172    let mut reqs = reqs.iter().peekable();
173
174    while let Some(req) = reqs.next() {
175        writeln!(out, "  {{").unwrap();
176        writeln!(out, "    \"request\":{{").unwrap();
177        writeln!(out, "      \"headers\":[").unwrap();
178
179        let mut req_hdrs = req.hdrs.iter().peekable();
180        while let Some(h) = req_hdrs.next() {
181            writeln!(out, "        {{").unwrap();
182            writeln!(
183                out,
184                "          \"name\": \"{}\",",
185                std::str::from_utf8(h.name()).unwrap()
186            )
187            .unwrap();
188            writeln!(
189                out,
190                "          \"value\": \"{}\"",
191                std::str::from_utf8(h.value()).unwrap().replace('"', "\\\"")
192            )
193            .unwrap();
194
195            if req_hdrs.peek().is_some() {
196                writeln!(out, "        }},").unwrap();
197            } else {
198                writeln!(out, "        }}").unwrap();
199            }
200        }
201        writeln!(out, "      ]}},").unwrap();
202
203        writeln!(out, "    \"response\":{{").unwrap();
204        writeln!(out, "      \"headers\":[").unwrap();
205
206        let mut response_hdrs = req.response_hdrs.iter().peekable();
207        while let Some(h) = response_hdrs.next() {
208            writeln!(out, "        {{").unwrap();
209            writeln!(
210                out,
211                "          \"name\": \"{}\",",
212                std::str::from_utf8(h.name()).unwrap()
213            )
214            .unwrap();
215            writeln!(
216                out,
217                "          \"value\": \"{}\"",
218                std::str::from_utf8(h.value()).unwrap().replace('"', "\\\"")
219            )
220            .unwrap();
221
222            if response_hdrs.peek().is_some() {
223                writeln!(out, "        }},").unwrap();
224            } else {
225                writeln!(out, "        }}").unwrap();
226            }
227        }
228        writeln!(out, "      ],").unwrap();
229        writeln!(out, "      \"body\": {:?}", req.response_body).unwrap();
230        writeln!(out, "    }}").unwrap();
231
232        if reqs.peek().is_some() {
233            writeln!(out, "}},").unwrap();
234        } else {
235            writeln!(out, "}}").unwrap();
236        }
237    }
238    writeln!(out, "]").unwrap();
239    writeln!(out, "}}").unwrap();
240
241    output_sink(out);
242}
243
244pub fn hdrs_to_strings(hdrs: &[quiche::h3::Header]) -> Vec<(String, String)> {
245    hdrs.iter()
246        .map(|h| {
247            let name = String::from_utf8_lossy(h.name()).to_string();
248            let value = String::from_utf8_lossy(h.value()).to_string();
249
250            (name, value)
251        })
252        .collect()
253}
254
255/// Generate a new pair of Source Connection ID and reset token.
256pub fn generate_cid_and_reset_token<T: SecureRandom>(
257    rng: &T,
258) -> (quiche::ConnectionId<'static>, u128) {
259    let mut scid = [0; quiche::MAX_CONN_ID_LEN];
260    rng.fill(&mut scid).unwrap();
261    let scid = scid.to_vec().into();
262    let mut reset_token = [0; 16];
263    rng.fill(&mut reset_token).unwrap();
264    let reset_token = u128::from_be_bytes(reset_token);
265    (scid, reset_token)
266}
267
268/// Construct a priority field value from quiche apps custom query string.
269pub fn priority_field_value_from_query_string(url: &url::Url) -> Option<String> {
270    let mut priority = "".to_string();
271    for param in url.query_pairs() {
272        if param.0 == "u" {
273            write!(priority, "{}={},", param.0, param.1).ok();
274        }
275
276        if param.0 == "i" && param.1 == "1" {
277            priority.push_str("i,");
278        }
279    }
280
281    if !priority.is_empty() {
282        // remove trailing comma
283        priority.pop();
284
285        Some(priority)
286    } else {
287        None
288    }
289}
290
291/// Construct a Priority from quiche apps custom query string.
292pub fn priority_from_query_string(url: &url::Url) -> Option<Priority> {
293    let mut urgency = None;
294    let mut incremental = None;
295    for param in url.query_pairs() {
296        if param.0 == "u" {
297            urgency = Some(param.1.parse::<u8>().unwrap());
298        }
299
300        if param.0 == "i" && param.1 == "1" {
301            incremental = Some(true);
302        }
303    }
304
305    match (urgency, incremental) {
306        (Some(u), Some(i)) => Some(Priority::new(u, i)),
307
308        (Some(u), None) => Some(Priority::new(u, false)),
309
310        (None, Some(i)) => Some(Priority::new(3, i)),
311
312        (None, None) => None,
313    }
314}
315
316fn send_h3_dgram(
317    conn: &mut quiche::Connection, flow_id: u64, dgram_content: &[u8],
318) -> quiche::Result<()> {
319    info!(
320        "sending HTTP/3 DATAGRAM on flow_id={} with data {:?}",
321        flow_id, dgram_content
322    );
323
324    let len = octets::varint_len(flow_id) + dgram_content.len();
325    let mut d = vec![0; len];
326    let mut b = octets::OctetsMut::with_slice(&mut d);
327
328    b.put_varint(flow_id)
329        .map_err(|_| quiche::Error::BufferTooShort)?;
330    b.put_bytes(dgram_content)
331        .map_err(|_| quiche::Error::BufferTooShort)?;
332
333    conn.dgram_send(&d)
334}
335
/// Operations an HTTP layer running on top of a QUIC connection must provide,
/// covering both the requesting side (`send_requests`, `handle_responses`,
/// `report_incomplete`) and the serving side (`handle_requests`,
/// `handle_writable`).
pub trait HttpConn {
    /// Sends pending requests on `conn`.
    ///
    /// `target_path` is an optional directory; the HTTP/0.9 implementation
    /// writes response bodies to files inside it when it is `Some`.
    fn send_requests(
        &mut self, conn: &mut quiche::Connection, target_path: &Option<String>,
    );

    /// Processes response data available on `conn`, using `buf` as scratch
    /// space for reads. `req_start` is the instant used when reporting how
    /// long the exchange took.
    fn handle_responses(
        &mut self, conn: &mut quiche::Connection, buf: &mut [u8],
        req_start: &std::time::Instant,
    );

    /// Reports whether some requests are still incomplete; `start` is used
    /// to report the elapsed time. Returns `true` when requests are missing.
    fn report_incomplete(&self, start: &std::time::Instant) -> bool;

    /// Processes request data available on `conn`, using `buf` as scratch
    /// space for reads.
    ///
    /// `partial_requests` and `partial_responses` hold per-stream state for
    /// requests not yet fully received and responses not yet fully sent.
    /// The HTTP/0.9 implementation serves files from below `root`, passing
    /// `index` to its index-file helper.
    fn handle_requests(
        &mut self, conn: &mut quiche::Connection,
        partial_requests: &mut HashMap<u64, PartialRequest>,
        partial_responses: &mut HashMap<u64, PartialResponse>, root: &str,
        index: &str, buf: &mut [u8],
    ) -> quiche::h3::Result<()>;

    /// Continues sending the partial response stored for `stream_id`, if
    /// there is one.
    fn handle_writable(
        &mut self, conn: &mut quiche::Connection,
        partial_responses: &mut HashMap<u64, PartialResponse>, stream_id: u64,
    );
}
360
361pub fn writable_response_streams(
362    conn: &quiche::Connection,
363) -> impl Iterator<Item = u64> {
364    conn.writable().filter(|id| id % 4 == 0)
365}
366
/// Represents an HTTP/0.9 formatted request.
pub struct Http09Request {
    /// URL being requested.
    url: url::Url,
    /// 1-based index of this request among repeated requests for `url`.
    cardinal: u64,
    /// Request line sent on the stream (`GET <path>\r\n`).
    request_line: String,
    /// Stream the request was sent on; `None` until it has been sent.
    stream_id: Option<u64>,
    /// File writer receiving the response body; `None` when the response is
    /// passed to the output sink instead.
    response_writer: Option<std::io::BufWriter<std::fs::File>>,
}
375
/// Represents an HTTP/3 formatted request.
struct Http3Request {
    /// URL being requested.
    url: url::Url,
    /// 1-based index of this request among repeated requests for `url`.
    cardinal: u64,
    /// NOTE(review): presumably the stream the request was sent on, `None`
    /// until sent -- the code assigning it is outside this view.
    stream_id: Option<u64>,
    /// Request headers.
    hdrs: Vec<quiche::h3::Header>,
    /// Priority taken from the URL query string, when priority updates were
    /// requested.
    priority: Option<Priority>,
    /// Response headers; emitted in the JSON dump.
    response_hdrs: Vec<quiche::h3::Header>,
    /// Captured response body bytes; emitted in the JSON dump.
    response_body: Vec<u8>,
    /// NOTE(review): presumably the maximum number of body bytes retained in
    /// `response_body`; initialized from the `dump_json` option -- confirm.
    response_body_max: usize,
    /// NOTE(review): presumably a file writer for the response body -- the
    /// code using it is outside this view.
    response_writer: Option<std::io::BufWriter<std::fs::File>>,
}
388
/// Result of building an HTTP/3 response.
///
/// `Ok` carries the response headers and two byte vectors (the body followed
/// by, presumably, the raw priority value -- TODO confirm); `Err` carries an
/// error code and a human-readable reason.
type Http3ResponseBuilderResult = std::result::Result<
    (Vec<quiche::h3::Header>, Vec<u8>, Vec<u8>),
    (u64, String),
>;
393
/// HTTP/0.9 layer state for one connection.
pub struct Http09Conn {
    /// Stream ID to use for the next request; advanced by 4 per request.
    stream_id: u64,
    /// Number of requests sent so far.
    reqs_sent: usize,
    /// Number of requests whose response has been fully received.
    reqs_complete: usize,
    /// All requests to issue on this connection.
    reqs: Vec<Http09Request>,
    /// Receives response data when it is not being written to a file.
    output_sink: Rc<RefCell<dyn FnMut(String)>>,
}
401
402impl Default for Http09Conn {
403    fn default() -> Self {
404        Http09Conn {
405            stream_id: Default::default(),
406            reqs_sent: Default::default(),
407            reqs_complete: Default::default(),
408            reqs: Default::default(),
409            output_sink: Rc::new(RefCell::new(stdout_sink)),
410        }
411    }
412}
413
414impl Http09Conn {
415    pub fn with_urls(
416        urls: &[url::Url], reqs_cardinal: u64,
417        output_sink: Rc<RefCell<dyn FnMut(String)>>,
418    ) -> Box<dyn HttpConn> {
419        let mut reqs = Vec::new();
420        for url in urls {
421            for i in 1..=reqs_cardinal {
422                let request_line = format!("GET {}\r\n", url.path());
423                reqs.push(Http09Request {
424                    url: url.clone(),
425                    cardinal: i,
426                    request_line,
427                    stream_id: None,
428                    response_writer: None,
429                });
430            }
431        }
432
433        let h_conn = Http09Conn {
434            stream_id: 0,
435            reqs_sent: 0,
436            reqs_complete: 0,
437            reqs,
438            output_sink,
439        };
440
441        Box::new(h_conn)
442    }
443}
444
445impl HttpConn for Http09Conn {
446    fn send_requests(
447        &mut self, conn: &mut quiche::Connection, target_path: &Option<String>,
448    ) {
449        let mut reqs_done = 0;
450
451        for req in self.reqs.iter_mut().skip(self.reqs_sent) {
452            match conn.stream_send(
453                self.stream_id,
454                req.request_line.as_bytes(),
455                true,
456            ) {
457                Ok(v) => v,
458
459                Err(quiche::Error::StreamLimit) => {
460                    debug!("not enough stream credits, retry later...");
461                    break;
462                },
463
464                Err(e) => {
465                    error!("failed to send request {:?}", e);
466                    break;
467                },
468            };
469
470            debug!("sending HTTP request {:?}", req.request_line);
471
472            req.stream_id = Some(self.stream_id);
473            req.response_writer =
474                make_resource_writer(&req.url, target_path, req.cardinal);
475
476            self.stream_id += 4;
477
478            reqs_done += 1;
479        }
480
481        self.reqs_sent += reqs_done;
482    }
483
484    fn handle_responses(
485        &mut self, conn: &mut quiche::Connection, buf: &mut [u8],
486        req_start: &std::time::Instant,
487    ) {
488        // Process all readable streams.
489        for s in conn.readable() {
490            while let Ok((read, fin)) = conn.stream_recv(s, buf) {
491                trace!("received {} bytes", read);
492
493                let stream_buf = &buf[..read];
494
495                trace!(
496                    "stream {} has {} bytes (fin? {})",
497                    s,
498                    stream_buf.len(),
499                    fin
500                );
501
502                let req = self
503                    .reqs
504                    .iter_mut()
505                    .find(|r| r.stream_id == Some(s))
506                    .unwrap();
507
508                match &mut req.response_writer {
509                    Some(rw) => {
510                        rw.write_all(&buf[..read]).ok();
511                    },
512
513                    None => {
514                        self.output_sink.borrow_mut()(unsafe {
515                            String::from_utf8_unchecked(stream_buf.to_vec())
516                        });
517                    },
518                }
519
520                // The server reported that it has no more data to send on
521                // a client-initiated
522                // bidirectional stream, which means
523                // we got the full response. If all responses are received
524                // then close the connection.
525                if &s % 4 == 0 && fin {
526                    self.reqs_complete += 1;
527                    let reqs_count = self.reqs.len();
528
529                    debug!(
530                        "{}/{} responses received",
531                        self.reqs_complete, reqs_count
532                    );
533
534                    if self.reqs_complete == reqs_count {
535                        info!(
536                            "{}/{} response(s) received in {:?}, closing...",
537                            self.reqs_complete,
538                            reqs_count,
539                            req_start.elapsed()
540                        );
541
542                        match conn.close(true, 0x00, b"kthxbye") {
543                            // Already closed.
544                            Ok(_) | Err(quiche::Error::Done) => (),
545
546                            Err(e) => panic!("error closing conn: {:?}", e),
547                        }
548
549                        break;
550                    }
551                }
552            }
553        }
554    }
555
556    fn report_incomplete(&self, start: &std::time::Instant) -> bool {
557        if self.reqs_complete != self.reqs.len() {
558            error!(
559                "connection timed out after {:?} and only completed {}/{} requests",
560                start.elapsed(),
561                self.reqs_complete,
562                self.reqs.len()
563            );
564
565            return true;
566        }
567
568        false
569    }
570
571    fn handle_requests(
572        &mut self, conn: &mut quiche::Connection,
573        partial_requests: &mut HashMap<u64, PartialRequest>,
574        partial_responses: &mut HashMap<u64, PartialResponse>, root: &str,
575        index: &str, buf: &mut [u8],
576    ) -> quiche::h3::Result<()> {
577        // Process all readable streams.
578        for s in conn.readable() {
579            while let Ok((read, fin)) = conn.stream_recv(s, buf) {
580                trace!("{} received {} bytes", conn.trace_id(), read);
581
582                let stream_buf = &buf[..read];
583
584                trace!(
585                    "{} stream {} has {} bytes (fin? {})",
586                    conn.trace_id(),
587                    s,
588                    stream_buf.len(),
589                    fin
590                );
591
592                let stream_buf =
593                    if let Some(partial) = partial_requests.get_mut(&s) {
594                        partial.req.extend_from_slice(stream_buf);
595
596                        if !partial.req.ends_with(b"\r\n") {
597                            return Ok(());
598                        }
599
600                        &partial.req
601                    } else {
602                        if !stream_buf.ends_with(b"\r\n") {
603                            let request = PartialRequest {
604                                req: stream_buf.to_vec(),
605                            };
606
607                            partial_requests.insert(s, request);
608                            return Ok(());
609                        }
610
611                        stream_buf
612                    };
613
614                if stream_buf.starts_with(b"GET ") {
615                    let uri = &stream_buf[4..stream_buf.len() - 2];
616                    let uri = String::from_utf8(uri.to_vec()).unwrap();
617                    let uri = String::from(uri.lines().next().unwrap());
618                    let uri = path::Path::new(&uri);
619                    let mut path = path::PathBuf::from(root);
620
621                    partial_requests.remove(&s);
622
623                    for c in uri.components() {
624                        if let path::Component::Normal(v) = c {
625                            path.push(v)
626                        }
627                    }
628
629                    path = autoindex(path, index);
630
631                    info!(
632                        "{} got GET request for {:?} on stream {}",
633                        conn.trace_id(),
634                        path,
635                        s
636                    );
637
638                    let body = std::fs::read(path.as_path())
639                        .unwrap_or_else(|_| b"Not Found!\r\n".to_vec());
640
641                    info!(
642                        "{} sending response of size {} on stream {}",
643                        conn.trace_id(),
644                        body.len(),
645                        s
646                    );
647
648                    let written = match conn.stream_send(s, &body, true) {
649                        Ok(v) => v,
650
651                        Err(quiche::Error::Done) => 0,
652
653                        Err(e) => {
654                            error!(
655                                "{} stream send failed {:?}",
656                                conn.trace_id(),
657                                e
658                            );
659                            return Err(From::from(e));
660                        },
661                    };
662
663                    if written < body.len() {
664                        let response = PartialResponse {
665                            headers: None,
666                            priority: None,
667                            body,
668                            written,
669                        };
670
671                        partial_responses.insert(s, response);
672                    }
673                }
674            }
675        }
676
677        Ok(())
678    }
679
680    fn handle_writable(
681        &mut self, conn: &mut quiche::Connection,
682        partial_responses: &mut HashMap<u64, PartialResponse>, stream_id: u64,
683    ) {
684        debug!(
685            "{} response stream {} is writable with capacity {:?}",
686            conn.trace_id(),
687            stream_id,
688            conn.stream_capacity(stream_id)
689        );
690
691        if !partial_responses.contains_key(&stream_id) {
692            return;
693        }
694
695        let resp = partial_responses.get_mut(&stream_id).unwrap();
696        let body = &resp.body[resp.written..];
697
698        let written = match conn.stream_send(stream_id, body, true) {
699            Ok(v) => v,
700
701            Err(quiche::Error::Done) => 0,
702
703            Err(e) => {
704                partial_responses.remove(&stream_id);
705
706                error!("{} stream send failed {:?}", conn.trace_id(), e);
707                return;
708            },
709        };
710
711        resp.written += written;
712
713        if resp.written == resp.body.len() {
714            partial_responses.remove(&stream_id);
715        }
716    }
717}
718
/// Parameters and progress for sending HTTP/3 DATAGRAMs.
pub struct Http3DgramSender {
    /// NOTE(review): presumably the total number of datagrams to send --
    /// the code reading it is outside this view.
    dgram_count: u64,
    /// Content carried by each datagram.
    pub dgram_content: String,
    /// Flow ID the datagrams are sent on.
    pub flow_id: u64,
    /// Number of datagrams sent so far; starts at 0.
    pub dgrams_sent: u64,
}

impl Http3DgramSender {
    /// Creates a sender for `dgram_count` datagrams carrying `dgram_content`
    /// on `flow_id`, with nothing sent yet.
    pub fn new(dgram_count: u64, dgram_content: String, flow_id: u64) -> Self {
        Http3DgramSender {
            dgrams_sent: 0,
            dgram_count,
            dgram_content,
            flow_id,
        }
    }
}
736
737fn make_h3_config(
738    max_field_section_size: Option<u64>, qpack_max_table_capacity: Option<u64>,
739    qpack_blocked_streams: Option<u64>,
740) -> quiche::h3::Config {
741    let mut config = quiche::h3::Config::new().unwrap();
742
743    if let Some(v) = max_field_section_size {
744        config.set_max_field_section_size(v);
745    }
746
747    if let Some(v) = qpack_max_table_capacity {
748        // quiche doesn't support dynamic QPACK, so clamp to 0 for now.
749        config.set_qpack_max_table_capacity(v.clamp(0, 0));
750    }
751
752    if let Some(v) = qpack_blocked_streams {
753        // quiche doesn't support dynamic QPACK, so clamp to 0 for now.
754        config.set_qpack_blocked_streams(v.clamp(0, 0));
755    }
756
757    config
758}
759
/// HTTP/3 layer state for one connection.
pub struct Http3Conn {
    /// The HTTP/3 connection running on top of the QUIC transport.
    h3_conn: quiche::h3::Connection,
    /// NOTE(review): presumably the number of requests whose headers have
    /// been sent -- the code updating it is outside this view.
    reqs_hdrs_sent: usize,
    /// NOTE(review): presumably the number of completed requests -- confirm.
    reqs_complete: usize,
    /// NOTE(review): presumably the largest request stream ID processed so
    /// far -- confirm.
    largest_processed_request: u64,
    /// Requests to issue on this connection (empty when created with
    /// `with_conn`).
    reqs: Vec<Http3Request>,
    /// Optional request body shared by all requests.
    body: Option<Vec<u8>>,
    /// NOTE(review): presumably body bytes sent so far, keyed by stream ID
    /// -- confirm.
    sent_body_bytes: HashMap<u64, usize>,
    /// Whether the JSON dump option was given.
    dump_json: bool,
    /// Optional HTTP/3 DATAGRAM sender.
    dgram_sender: Option<Http3DgramSender>,
    /// Sink for textual output.
    output_sink: Rc<RefCell<dyn FnMut(String)>>,
}
772
773impl Http3Conn {
774    #[allow(clippy::too_many_arguments)]
775    pub fn with_urls(
776        conn: &mut quiche::Connection, urls: &[url::Url], reqs_cardinal: u64,
777        req_headers: &[String], body: &Option<Vec<u8>>, method: &str,
778        send_priority_update: bool, max_field_section_size: Option<u64>,
779        qpack_max_table_capacity: Option<u64>,
780        qpack_blocked_streams: Option<u64>, dump_json: Option<usize>,
781        dgram_sender: Option<Http3DgramSender>,
782        output_sink: Rc<RefCell<dyn FnMut(String)>>,
783    ) -> Box<dyn HttpConn> {
784        let mut reqs = Vec::new();
785        for url in urls {
786            for i in 1..=reqs_cardinal {
787                let authority = match url.port() {
788                    Some(port) => format!("{}:{}", url.host_str().unwrap(), port),
789
790                    None => url.host_str().unwrap().to_string(),
791                };
792
793                let mut hdrs = vec![
794                    quiche::h3::Header::new(b":method", method.as_bytes()),
795                    quiche::h3::Header::new(b":scheme", url.scheme().as_bytes()),
796                    quiche::h3::Header::new(b":authority", authority.as_bytes()),
797                    quiche::h3::Header::new(
798                        b":path",
799                        url[url::Position::BeforePath..].as_bytes(),
800                    ),
801                    quiche::h3::Header::new(b"user-agent", b"quiche"),
802                ];
803
804                let priority = if send_priority_update {
805                    priority_from_query_string(url)
806                } else {
807                    None
808                };
809
810                // Add custom headers to the request.
811                for header in req_headers {
812                    let header_split: Vec<&str> =
813                        header.splitn(2, ": ").collect();
814
815                    if header_split.len() != 2 {
816                        panic!("malformed header provided - \"{}\"", header);
817                    }
818
819                    hdrs.push(quiche::h3::Header::new(
820                        header_split[0].as_bytes(),
821                        header_split[1].as_bytes(),
822                    ));
823                }
824
825                if body.is_some() {
826                    hdrs.push(quiche::h3::Header::new(
827                        b"content-length",
828                        body.as_ref().unwrap().len().to_string().as_bytes(),
829                    ));
830                }
831
832                reqs.push(Http3Request {
833                    url: url.clone(),
834                    cardinal: i,
835                    hdrs,
836                    priority,
837                    response_hdrs: Vec::new(),
838                    response_body: Vec::new(),
839                    response_body_max: dump_json.unwrap_or_default(),
840                    stream_id: None,
841                    response_writer: None,
842                });
843            }
844        }
845
846        let h_conn = Http3Conn {
847            h3_conn: quiche::h3::Connection::with_transport(
848                conn,
849                &make_h3_config(
850                    max_field_section_size,
851                    qpack_max_table_capacity,
852                    qpack_blocked_streams,
853                ),
854            ).expect("Unable to create HTTP/3 connection, check the server's uni stream limit and window size"),
855            reqs_hdrs_sent: 0,
856            reqs_complete: 0,
857            largest_processed_request: 0,
858            reqs,
859            body: body.as_ref().map(|b| b.to_vec()),
860            sent_body_bytes: HashMap::new(),
861            dump_json: dump_json.is_some(),
862            dgram_sender,
863            output_sink,
864        };
865
866        Box::new(h_conn)
867    }
868
869    pub fn with_conn(
870        conn: &mut quiche::Connection, max_field_section_size: Option<u64>,
871        qpack_max_table_capacity: Option<u64>,
872        qpack_blocked_streams: Option<u64>,
873        dgram_sender: Option<Http3DgramSender>,
874        output_sink: Rc<RefCell<dyn FnMut(String)>>,
875    ) -> std::result::Result<Box<dyn HttpConn>, String> {
876        let h3_conn = quiche::h3::Connection::with_transport(
877            conn,
878            &make_h3_config(
879                max_field_section_size,
880                qpack_max_table_capacity,
881                qpack_blocked_streams,
882            ),
883        ).map_err(|_| "Unable to create HTTP/3 connection, check the client's uni stream limit and window size")?;
884
885        let h_conn = Http3Conn {
886            h3_conn,
887            reqs_hdrs_sent: 0,
888            reqs_complete: 0,
889            largest_processed_request: 0,
890            reqs: Vec::new(),
891            body: None,
892            sent_body_bytes: HashMap::new(),
893            dump_json: false,
894            dgram_sender,
895            output_sink,
896        };
897
898        Ok(Box::new(h_conn))
899    }
900
901    /// Builds an HTTP/3 response given a request.
902    fn build_h3_response(
903        root: &str, index: &str, request: &[quiche::h3::Header],
904    ) -> Http3ResponseBuilderResult {
905        let mut file_path = path::PathBuf::from(root);
906        let mut scheme = None;
907        let mut authority = None;
908        let mut host = None;
909        let mut path = None;
910        let mut method = None;
911        let mut priority = vec![];
912
913        // Parse some of the request headers.
914        for hdr in request {
915            match hdr.name() {
916                b":scheme" => {
917                    if scheme.is_some() {
918                        return Err((
919                            H3_MESSAGE_ERROR,
920                            ":scheme cannot be duplicated".to_string(),
921                        ));
922                    }
923
924                    scheme = Some(std::str::from_utf8(hdr.value()).unwrap());
925                },
926
927                b":authority" => {
928                    if authority.is_some() {
929                        return Err((
930                            H3_MESSAGE_ERROR,
931                            ":authority cannot be duplicated".to_string(),
932                        ));
933                    }
934
935                    authority = Some(std::str::from_utf8(hdr.value()).unwrap());
936                },
937
938                b":path" => {
939                    if path.is_some() {
940                        return Err((
941                            H3_MESSAGE_ERROR,
942                            ":path cannot be duplicated".to_string(),
943                        ));
944                    }
945
946                    path = Some(std::str::from_utf8(hdr.value()).unwrap())
947                },
948
949                b":method" => {
950                    if method.is_some() {
951                        return Err((
952                            H3_MESSAGE_ERROR,
953                            ":method cannot be duplicated".to_string(),
954                        ));
955                    }
956
957                    method = Some(std::str::from_utf8(hdr.value()).unwrap())
958                },
959
960                b":protocol" => {
961                    return Err((
962                        H3_MESSAGE_ERROR,
963                        ":protocol not supported".to_string(),
964                    ));
965                },
966
967                b"priority" => priority = hdr.value().to_vec(),
968
969                b"host" => host = Some(std::str::from_utf8(hdr.value()).unwrap()),
970
971                _ => (),
972            }
973        }
974
975        let decided_method = match method {
976            Some(method) => {
977                match method {
978                    "" =>
979                        return Err((
980                            H3_MESSAGE_ERROR,
981                            ":method value cannot be empty".to_string(),
982                        )),
983
984                    "CONNECT" => {
985                        // not allowed
986                        let headers = vec![
987                            quiche::h3::Header::new(
988                                b":status",
989                                "405".to_string().as_bytes(),
990                            ),
991                            quiche::h3::Header::new(b"server", b"quiche"),
992                        ];
993
994                        return Ok((headers, b"".to_vec(), Default::default()));
995                    },
996
997                    _ => method,
998                }
999            },
1000
1001            None =>
1002                return Err((
1003                    H3_MESSAGE_ERROR,
1004                    ":method cannot be missing".to_string(),
1005                )),
1006        };
1007
1008        let decided_scheme = match scheme {
1009            Some(scheme) => {
1010                if scheme != "http" && scheme != "https" {
1011                    let headers = vec![
1012                        quiche::h3::Header::new(
1013                            b":status",
1014                            "400".to_string().as_bytes(),
1015                        ),
1016                        quiche::h3::Header::new(b"server", b"quiche"),
1017                    ];
1018
1019                    return Ok((
1020                        headers,
1021                        b"Invalid scheme".to_vec(),
1022                        Default::default(),
1023                    ));
1024                }
1025
1026                scheme
1027            },
1028
1029            None =>
1030                return Err((
1031                    H3_MESSAGE_ERROR,
1032                    ":scheme cannot be missing".to_string(),
1033                )),
1034        };
1035
1036        let decided_host = match (authority, host) {
1037            (None, Some("")) =>
1038                return Err((
1039                    H3_MESSAGE_ERROR,
1040                    "host value cannot be empty".to_string(),
1041                )),
1042
1043            (Some(""), None) =>
1044                return Err((
1045                    H3_MESSAGE_ERROR,
1046                    ":authority value cannot be empty".to_string(),
1047                )),
1048
1049            (Some(""), Some("")) =>
1050                return Err((
1051                    H3_MESSAGE_ERROR,
1052                    ":authority and host value cannot be empty".to_string(),
1053                )),
1054
1055            (None, None) =>
1056                return Err((
1057                    H3_MESSAGE_ERROR,
1058                    ":authority and host missing".to_string(),
1059                )),
1060
1061            // Any other combo, prefer :authority
1062            (..) => authority.unwrap(),
1063        };
1064
1065        let decided_path = match path {
1066            Some("") =>
1067                return Err((
1068                    H3_MESSAGE_ERROR,
1069                    ":path value cannot be empty".to_string(),
1070                )),
1071
1072            None =>
1073                return Err((
1074                    H3_MESSAGE_ERROR,
1075                    ":path cannot be missing".to_string(),
1076                )),
1077
1078            Some(path) => path,
1079        };
1080
1081        let url = format!("{decided_scheme}://{decided_host}{decided_path}");
1082        let url = url::Url::parse(&url).unwrap();
1083
1084        let pathbuf = path::PathBuf::from(url.path());
1085        let pathbuf = autoindex(pathbuf, index);
1086
1087        // Priority query string takes precedence over the header.
1088        // So replace the header with one built here.
1089        let query_priority = priority_field_value_from_query_string(&url);
1090
1091        if let Some(p) = query_priority {
1092            priority = p.as_bytes().to_vec();
1093        }
1094
1095        let (status, body) = match decided_method {
1096            "GET" => {
1097                for c in pathbuf.components() {
1098                    if let path::Component::Normal(v) = c {
1099                        file_path.push(v)
1100                    }
1101                }
1102
1103                match std::fs::read(file_path.as_path()) {
1104                    Ok(data) => (200, data),
1105
1106                    Err(_) => (404, b"Not Found!".to_vec()),
1107                }
1108            },
1109
1110            _ => (405, Vec::new()),
1111        };
1112
1113        let headers = vec![
1114            quiche::h3::Header::new(b":status", status.to_string().as_bytes()),
1115            quiche::h3::Header::new(b"server", b"quiche"),
1116            quiche::h3::Header::new(
1117                b"content-length",
1118                body.len().to_string().as_bytes(),
1119            ),
1120        ];
1121
1122        Ok((headers, body, priority))
1123    }
1124}
1125
1126impl HttpConn for Http3Conn {
1127    fn send_requests(
1128        &mut self, conn: &mut quiche::Connection, target_path: &Option<String>,
1129    ) {
1130        let mut reqs_done = 0;
1131
1132        // First send headers.
1133        for req in self.reqs.iter_mut().skip(self.reqs_hdrs_sent) {
1134            let s = match self.h3_conn.send_request(
1135                conn,
1136                &req.hdrs,
1137                self.body.is_none(),
1138            ) {
1139                Ok(v) => v,
1140
1141                Err(quiche::h3::Error::TransportError(
1142                    quiche::Error::StreamLimit,
1143                )) => {
1144                    debug!("not enough stream credits, retry later...");
1145                    break;
1146                },
1147
1148                Err(quiche::h3::Error::StreamBlocked) => {
1149                    debug!("stream is blocked, retry later...");
1150                    break;
1151                },
1152
1153                Err(e) => {
1154                    error!("failed to send request {:?}", e);
1155                    break;
1156                },
1157            };
1158
1159            debug!("Sent HTTP request {:?}", &req.hdrs);
1160
1161            if let Some(priority) = &req.priority {
1162                // If sending the priority fails, don't try again.
1163                self.h3_conn
1164                    .send_priority_update_for_request(conn, s, priority)
1165                    .ok();
1166            }
1167
1168            req.stream_id = Some(s);
1169            req.response_writer =
1170                make_resource_writer(&req.url, target_path, req.cardinal);
1171            self.sent_body_bytes.insert(s, 0);
1172
1173            reqs_done += 1;
1174        }
1175        self.reqs_hdrs_sent += reqs_done;
1176
1177        // Then send any remaining body.
1178        if let Some(body) = &self.body {
1179            for (stream_id, sent_bytes) in self.sent_body_bytes.iter_mut() {
1180                if *sent_bytes == body.len() {
1181                    continue;
1182                }
1183
1184                // Always try to send all remaining bytes, so always set fin to
1185                // true.
1186                let sent = match self.h3_conn.send_body(
1187                    conn,
1188                    *stream_id,
1189                    &body[*sent_bytes..],
1190                    true,
1191                ) {
1192                    Ok(v) => v,
1193
1194                    Err(quiche::h3::Error::Done) => 0,
1195
1196                    Err(e) => {
1197                        error!("failed to send request body {:?}", e);
1198                        continue;
1199                    },
1200                };
1201
1202                *sent_bytes += sent;
1203            }
1204        }
1205
1206        // And finally any DATAGRAMS.
1207        if let Some(ds) = self.dgram_sender.as_mut() {
1208            let mut dgrams_done = 0;
1209
1210            for _ in ds.dgrams_sent..ds.dgram_count {
1211                match send_h3_dgram(conn, ds.flow_id, ds.dgram_content.as_bytes())
1212                {
1213                    Ok(v) => v,
1214
1215                    Err(e) => {
1216                        error!("failed to send dgram {:?}", e);
1217                        break;
1218                    },
1219                }
1220
1221                dgrams_done += 1;
1222            }
1223
1224            ds.dgrams_sent += dgrams_done;
1225        }
1226    }
1227
1228    fn handle_responses(
1229        &mut self, conn: &mut quiche::Connection, buf: &mut [u8],
1230        req_start: &std::time::Instant,
1231    ) {
1232        loop {
1233            match self.h3_conn.poll(conn) {
1234                Ok((stream_id, quiche::h3::Event::Headers { list, .. })) => {
1235                    debug!(
1236                        "got response headers {:?} on stream id {}",
1237                        hdrs_to_strings(&list),
1238                        stream_id
1239                    );
1240
1241                    let req = self
1242                        .reqs
1243                        .iter_mut()
1244                        .find(|r| r.stream_id == Some(stream_id))
1245                        .unwrap();
1246
1247                    req.response_hdrs = list;
1248                },
1249
1250                Ok((stream_id, quiche::h3::Event::Data)) => {
1251                    while let Ok(read) =
1252                        self.h3_conn.recv_body(conn, stream_id, buf)
1253                    {
1254                        debug!(
1255                            "got {} bytes of response data on stream {}",
1256                            read, stream_id
1257                        );
1258
1259                        let req = self
1260                            .reqs
1261                            .iter_mut()
1262                            .find(|r| r.stream_id == Some(stream_id))
1263                            .unwrap();
1264
1265                        let len = std::cmp::min(
1266                            read,
1267                            req.response_body_max - req.response_body.len(),
1268                        );
1269                        req.response_body.extend_from_slice(&buf[..len]);
1270
1271                        match &mut req.response_writer {
1272                            Some(rw) => {
1273                                rw.write_all(&buf[..read]).ok();
1274                            },
1275
1276                            None =>
1277                                if !self.dump_json {
1278                                    self.output_sink.borrow_mut()(unsafe {
1279                                        String::from_utf8_unchecked(
1280                                            buf[..read].to_vec(),
1281                                        )
1282                                    });
1283                                },
1284                        }
1285                    }
1286                },
1287
1288                Ok((_stream_id, quiche::h3::Event::Finished)) => {
1289                    self.reqs_complete += 1;
1290                    let reqs_count = self.reqs.len();
1291
1292                    debug!(
1293                        "{}/{} responses received",
1294                        self.reqs_complete, reqs_count
1295                    );
1296
1297                    if self.reqs_complete == reqs_count {
1298                        info!(
1299                            "{}/{} response(s) received in {:?}, closing...",
1300                            self.reqs_complete,
1301                            reqs_count,
1302                            req_start.elapsed()
1303                        );
1304
1305                        if self.dump_json {
1306                            dump_json(
1307                                &self.reqs,
1308                                &mut *self.output_sink.borrow_mut(),
1309                            );
1310                        }
1311
1312                        match conn.close(true, 0x100, b"kthxbye") {
1313                            // Already closed.
1314                            Ok(_) | Err(quiche::Error::Done) => (),
1315
1316                            Err(e) => panic!("error closing conn: {:?}", e),
1317                        }
1318
1319                        break;
1320                    }
1321                },
1322
1323                Ok((_stream_id, quiche::h3::Event::Reset(e))) => {
1324                    error!("request was reset by peer with {}, closing...", e);
1325
1326                    match conn.close(true, 0x100, b"kthxbye") {
1327                        // Already closed.
1328                        Ok(_) | Err(quiche::Error::Done) => (),
1329
1330                        Err(e) => panic!("error closing conn: {:?}", e),
1331                    }
1332
1333                    break;
1334                },
1335
1336                Ok((
1337                    prioritized_element_id,
1338                    quiche::h3::Event::PriorityUpdate,
1339                )) => {
1340                    info!(
1341                        "{} PRIORITY_UPDATE triggered for element ID={}",
1342                        conn.trace_id(),
1343                        prioritized_element_id
1344                    );
1345                },
1346
1347                Ok((goaway_id, quiche::h3::Event::GoAway)) => {
1348                    info!(
1349                        "{} got GOAWAY with ID {} ",
1350                        conn.trace_id(),
1351                        goaway_id
1352                    );
1353                },
1354
1355                Err(quiche::h3::Error::Done) => {
1356                    break;
1357                },
1358
1359                Err(e) => {
1360                    error!("HTTP/3 processing failed: {:?}", e);
1361
1362                    break;
1363                },
1364            }
1365        }
1366
1367        // Process datagram-related events.
1368        while let Ok(len) = conn.dgram_recv(buf) {
1369            let mut b = octets::Octets::with_slice(buf);
1370            if let Ok(flow_id) = b.get_varint() {
1371                info!(
1372                    "Received DATAGRAM flow_id={} len={} data={:?}",
1373                    flow_id,
1374                    len,
1375                    buf[b.off()..len].to_vec()
1376                );
1377            }
1378        }
1379    }
1380
1381    fn report_incomplete(&self, start: &std::time::Instant) -> bool {
1382        if self.reqs_complete != self.reqs.len() {
1383            error!(
1384                "connection timed out after {:?} and only completed {}/{} requests",
1385                start.elapsed(),
1386                self.reqs_complete,
1387                self.reqs.len()
1388            );
1389
1390            if self.dump_json {
1391                dump_json(&self.reqs, &mut *self.output_sink.borrow_mut());
1392            }
1393
1394            return true;
1395        }
1396
1397        false
1398    }
1399
1400    fn handle_requests(
1401        &mut self, conn: &mut quiche::Connection,
1402        _partial_requests: &mut HashMap<u64, PartialRequest>,
1403        partial_responses: &mut HashMap<u64, PartialResponse>, root: &str,
1404        index: &str, buf: &mut [u8],
1405    ) -> quiche::h3::Result<()> {
1406        // Process HTTP stream-related events.
1407        //
1408        // This loops over any and all received HTTP requests and sends just the
1409        // HTTP response headers.
1410        loop {
1411            match self.h3_conn.poll(conn) {
1412                Ok((stream_id, quiche::h3::Event::Headers { list, .. })) => {
1413                    info!(
1414                        "{} got request {:?} on stream id {}",
1415                        conn.trace_id(),
1416                        hdrs_to_strings(&list),
1417                        stream_id
1418                    );
1419
1420                    self.largest_processed_request =
1421                        std::cmp::max(self.largest_processed_request, stream_id);
1422
1423                    // We decide the response based on headers alone, so
1424                    // stop reading the request stream so that any body
1425                    // is ignored and pointless Data events are not
1426                    // generated.
1427                    conn.stream_shutdown(stream_id, quiche::Shutdown::Read, 0)
1428                        .unwrap();
1429
1430                    let (mut headers, body, mut priority) =
1431                        match Http3Conn::build_h3_response(root, index, &list) {
1432                            Ok(v) => v,
1433
1434                            Err((error_code, _)) => {
1435                                conn.stream_shutdown(
1436                                    stream_id,
1437                                    quiche::Shutdown::Write,
1438                                    error_code,
1439                                )
1440                                .unwrap();
1441                                continue;
1442                            },
1443                        };
1444
1445                    match self.h3_conn.take_last_priority_update(stream_id) {
1446                        Ok(v) => {
1447                            priority = v;
1448                        },
1449
1450                        Err(quiche::h3::Error::Done) => (),
1451
1452                        Err(e) => error!(
1453                            "{} error taking PRIORITY_UPDATE {}",
1454                            conn.trace_id(),
1455                            e
1456                        ),
1457                    }
1458
1459                    if !priority.is_empty() {
1460                        headers.push(quiche::h3::Header::new(
1461                            b"priority",
1462                            priority.as_slice(),
1463                        ));
1464                    }
1465
1466                    #[cfg(feature = "sfv")]
1467                    let priority =
1468                        quiche::h3::Priority::try_from(priority.as_slice())
1469                            .unwrap_or_default();
1470
1471                    #[cfg(not(feature = "sfv"))]
1472                    let priority = quiche::h3::Priority::default();
1473
1474                    info!(
1475                        "{} prioritizing response on stream {} as {:?}",
1476                        conn.trace_id(),
1477                        stream_id,
1478                        priority
1479                    );
1480
1481                    match self.h3_conn.send_response_with_priority(
1482                        conn, stream_id, &headers, &priority, false,
1483                    ) {
1484                        Ok(v) => v,
1485
1486                        Err(quiche::h3::Error::StreamBlocked) => {
1487                            let response = PartialResponse {
1488                                headers: Some(headers),
1489                                priority: Some(priority),
1490                                body,
1491                                written: 0,
1492                            };
1493
1494                            partial_responses.insert(stream_id, response);
1495                            continue;
1496                        },
1497
1498                        Err(e) => {
1499                            error!(
1500                                "{} stream send failed {:?}",
1501                                conn.trace_id(),
1502                                e
1503                            );
1504
1505                            break;
1506                        },
1507                    }
1508
1509                    let response = PartialResponse {
1510                        headers: None,
1511                        priority: None,
1512                        body,
1513                        written: 0,
1514                    };
1515
1516                    partial_responses.insert(stream_id, response);
1517                },
1518
1519                Ok((stream_id, quiche::h3::Event::Data)) => {
1520                    info!(
1521                        "{} got data on stream id {}",
1522                        conn.trace_id(),
1523                        stream_id
1524                    );
1525                },
1526
1527                Ok((_stream_id, quiche::h3::Event::Finished)) => (),
1528
1529                Ok((_stream_id, quiche::h3::Event::Reset { .. })) => (),
1530
1531                Ok((
1532                    prioritized_element_id,
1533                    quiche::h3::Event::PriorityUpdate,
1534                )) => {
1535                    info!(
1536                        "{} PRIORITY_UPDATE triggered for element ID={}",
1537                        conn.trace_id(),
1538                        prioritized_element_id
1539                    );
1540                },
1541
1542                Ok((goaway_id, quiche::h3::Event::GoAway)) => {
1543                    trace!(
1544                        "{} got GOAWAY with ID {} ",
1545                        conn.trace_id(),
1546                        goaway_id
1547                    );
1548                    self.h3_conn
1549                        .send_goaway(conn, self.largest_processed_request)?;
1550                },
1551
1552                Err(quiche::h3::Error::Done) => {
1553                    break;
1554                },
1555
1556                Err(e) => {
1557                    error!("{} HTTP/3 error {:?}", conn.trace_id(), e);
1558
1559                    return Err(e);
1560                },
1561            }
1562        }
1563
1564        // Visit all writable response streams to send HTTP content.
1565        for stream_id in writable_response_streams(conn) {
1566            self.handle_writable(conn, partial_responses, stream_id);
1567        }
1568
1569        // Process datagram-related events.
1570        while let Ok(len) = conn.dgram_recv(buf) {
1571            let mut b = octets::Octets::with_slice(buf);
1572            if let Ok(flow_id) = b.get_varint() {
1573                info!(
1574                    "Received DATAGRAM flow_id={} len={} data={:?}",
1575                    flow_id,
1576                    len,
1577                    buf[b.off()..len].to_vec()
1578                );
1579            }
1580        }
1581
1582        if let Some(ds) = self.dgram_sender.as_mut() {
1583            let mut dgrams_done = 0;
1584
1585            for _ in ds.dgrams_sent..ds.dgram_count {
1586                match send_h3_dgram(conn, ds.flow_id, ds.dgram_content.as_bytes())
1587                {
1588                    Ok(v) => v,
1589
1590                    Err(e) => {
1591                        error!("failed to send dgram {:?}", e);
1592                        break;
1593                    },
1594                }
1595
1596                dgrams_done += 1;
1597            }
1598
1599            ds.dgrams_sent += dgrams_done;
1600        }
1601
1602        Ok(())
1603    }
1604
1605    fn handle_writable(
1606        &mut self, conn: &mut quiche::Connection,
1607        partial_responses: &mut HashMap<u64, PartialResponse>, stream_id: u64,
1608    ) {
1609        debug!(
1610            "{} response stream {} is writable with capacity {:?}",
1611            conn.trace_id(),
1612            stream_id,
1613            conn.stream_capacity(stream_id)
1614        );
1615
1616        if !partial_responses.contains_key(&stream_id) {
1617            return;
1618        }
1619
1620        let resp = partial_responses.get_mut(&stream_id).unwrap();
1621
1622        if let (Some(headers), Some(priority)) = (&resp.headers, &resp.priority) {
1623            match self.h3_conn.send_response_with_priority(
1624                conn, stream_id, headers, priority, false,
1625            ) {
1626                Ok(_) => (),
1627
1628                Err(quiche::h3::Error::StreamBlocked) => {
1629                    return;
1630                },
1631
1632                Err(e) => {
1633                    error!("{} stream send failed {:?}", conn.trace_id(), e);
1634                    return;
1635                },
1636            }
1637        }
1638
1639        resp.headers = None;
1640        resp.priority = None;
1641
1642        let body = &resp.body[resp.written..];
1643
1644        let written = match self.h3_conn.send_body(conn, stream_id, body, true) {
1645            Ok(v) => v,
1646
1647            Err(quiche::h3::Error::Done) => 0,
1648
1649            Err(e) => {
1650                partial_responses.remove(&stream_id);
1651
1652                error!("{} stream send failed {:?}", conn.trace_id(), e);
1653                return;
1654            },
1655        };
1656
1657        resp.written += written;
1658
1659        if resp.written == resp.body.len() {
1660            partial_responses.remove(&stream_id);
1661        }
1662    }
1663}