Skip to main content

quiche_apps/
common.rs

1// Copyright (C) 2020, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! Quiche application utilities.
28//!
29//! This module provides some utility functions that are common to quiche
30//! applications.
31
32use std::io::prelude::*;
33
34use std::collections::HashMap;
35
36#[cfg(feature = "sfv")]
37use std::convert::TryFrom;
38
39use std::fmt::Write as _;
40
41use std::rc::Rc;
42
43use std::cell::RefCell;
44
45use std::path;
46
47use ring::rand::SecureRandom;
48
49use quiche::ConnectionId;
50
51use quiche::h3::NameValue;
52use quiche::h3::Priority;
53
54pub fn stdout_sink(out: String) {
55    print!("{out}");
56}
57
58const H3_MESSAGE_ERROR: u64 = 0x10E;
59
60/// ALPN helpers.
61///
62/// This module contains constants and functions for working with ALPN.
63pub mod alpns {
64    pub const HTTP_09: [&[u8]; 2] = [b"hq-interop", b"http/0.9"];
65    pub const HTTP_3: [&[u8]; 1] = [b"h3"];
66}
67
68pub struct PartialRequest {
69    pub req: Vec<u8>,
70}
71
72pub struct PartialResponse {
73    pub headers: Option<Vec<quiche::h3::Header>>,
74    pub priority: Option<quiche::h3::Priority>,
75
76    pub body: Vec<u8>,
77
78    pub written: usize,
79}
80
81pub type ClientId = u64;
82
83pub struct Client {
84    pub conn: quiche::Connection,
85
86    pub http_conn: Option<Box<dyn HttpConn>>,
87
88    pub client_id: ClientId,
89
90    pub app_proto_selected: bool,
91
92    pub partial_requests: std::collections::HashMap<u64, PartialRequest>,
93
94    pub partial_responses: std::collections::HashMap<u64, PartialResponse>,
95
96    pub max_datagram_size: usize,
97
98    pub loss_rate: f64,
99
100    pub max_send_burst: usize,
101}
102
103pub type ClientIdMap = HashMap<ConnectionId<'static>, ClientId>;
104pub type ClientMap = HashMap<ClientId, Client>;
105
106/// Makes a buffered writer for a resource with a target URL.
107///
108/// The file will have the same name as the resource's last path segment value.
109/// Multiple requests for the same URL are indicated by the value of `cardinal`,
110/// any value "N" greater than 1, will cause ".N" to be appended to the
111/// filename.
112fn make_resource_writer(
113    url: &url::Url, target_path: &Option<String>, cardinal: u64,
114) -> Option<std::io::BufWriter<std::fs::File>> {
115    if let Some(tp) = target_path {
116        let resource =
117            url.path_segments().map(|c| c.collect::<Vec<_>>()).unwrap();
118
119        let mut path = format!("{}/{}", tp, resource.iter().last().unwrap());
120
121        if cardinal > 1 {
122            path = format!("{path}.{cardinal}");
123        }
124
125        match std::fs::File::create(&path) {
126            Ok(f) => return Some(std::io::BufWriter::new(f)),
127
128            Err(e) => panic!(
129                "Error creating file for {url}, attempted path was {path}: {e}"
130            ),
131        }
132    }
133
134    None
135}
136
137fn autoindex(path: path::PathBuf, index: &str) -> path::PathBuf {
138    if let Some(path_str) = path.to_str() {
139        if path_str.ends_with('/') {
140            let path_str = format!("{path_str}{index}");
141            return path::PathBuf::from(&path_str);
142        }
143    }
144
145    path
146}
147
148/// Makes a buffered writer for a qlog.
149pub fn make_qlog_writer(
150    dir: &std::ffi::OsStr, role: &str, id: &str,
151) -> std::io::BufWriter<std::fs::File> {
152    let mut path = std::path::PathBuf::from(dir);
153    let filename = format!("{role}-{id}.sqlog");
154    path.push(filename);
155
156    match std::fs::File::create(&path) {
157        Ok(f) => std::io::BufWriter::new(f),
158
159        Err(e) =>
160            panic!("Error creating qlog file attempted path was {path:?}: {e}"),
161    }
162}
163
164fn dump_json(reqs: &[Http3Request], output_sink: &mut dyn FnMut(String)) {
165    let mut out = String::new();
166
167    writeln!(out, "{{").unwrap();
168    writeln!(out, "  \"entries\": [").unwrap();
169    let mut reqs = reqs.iter().peekable();
170
171    while let Some(req) = reqs.next() {
172        writeln!(out, "  {{").unwrap();
173        writeln!(out, "    \"request\":{{").unwrap();
174        writeln!(out, "      \"headers\":[").unwrap();
175
176        let mut req_hdrs = req.hdrs.iter().peekable();
177        while let Some(h) = req_hdrs.next() {
178            writeln!(out, "        {{").unwrap();
179            writeln!(
180                out,
181                "          \"name\": \"{}\",",
182                std::str::from_utf8(h.name()).unwrap()
183            )
184            .unwrap();
185            writeln!(
186                out,
187                "          \"value\": \"{}\"",
188                std::str::from_utf8(h.value()).unwrap().replace('"', "\\\"")
189            )
190            .unwrap();
191
192            if req_hdrs.peek().is_some() {
193                writeln!(out, "        }},").unwrap();
194            } else {
195                writeln!(out, "        }}").unwrap();
196            }
197        }
198        writeln!(out, "      ]}},").unwrap();
199
200        writeln!(out, "    \"response\":{{").unwrap();
201        writeln!(out, "      \"headers\":[").unwrap();
202
203        let mut response_hdrs = req.response_hdrs.iter().peekable();
204        while let Some(h) = response_hdrs.next() {
205            writeln!(out, "        {{").unwrap();
206            writeln!(
207                out,
208                "          \"name\": \"{}\",",
209                std::str::from_utf8(h.name()).unwrap()
210            )
211            .unwrap();
212            writeln!(
213                out,
214                "          \"value\": \"{}\"",
215                std::str::from_utf8(h.value()).unwrap().replace('"', "\\\"")
216            )
217            .unwrap();
218
219            if response_hdrs.peek().is_some() {
220                writeln!(out, "        }},").unwrap();
221            } else {
222                writeln!(out, "        }}").unwrap();
223            }
224        }
225        writeln!(out, "      ],").unwrap();
226        writeln!(out, "      \"body\": {:?}", req.response_body).unwrap();
227        writeln!(out, "    }}").unwrap();
228
229        if reqs.peek().is_some() {
230            writeln!(out, "}},").unwrap();
231        } else {
232            writeln!(out, "}}").unwrap();
233        }
234    }
235    writeln!(out, "]").unwrap();
236    writeln!(out, "}}").unwrap();
237
238    output_sink(out);
239}
240
241pub fn hdrs_to_strings(hdrs: &[quiche::h3::Header]) -> Vec<(String, String)> {
242    hdrs.iter()
243        .map(|h| {
244            let name = String::from_utf8_lossy(h.name()).to_string();
245            let value = String::from_utf8_lossy(h.value()).to_string();
246
247            (name, value)
248        })
249        .collect()
250}
251
252/// Generate a new pair of Source Connection ID and reset token.
253pub fn generate_cid_and_reset_token<T: SecureRandom>(
254    rng: &T,
255) -> (quiche::ConnectionId<'static>, u128) {
256    let mut scid = [0; quiche::MAX_CONN_ID_LEN];
257    rng.fill(&mut scid).unwrap();
258    let scid = scid.to_vec().into();
259    let mut reset_token = [0; 16];
260    rng.fill(&mut reset_token).unwrap();
261    let reset_token = u128::from_be_bytes(reset_token);
262    (scid, reset_token)
263}
264
265/// Construct a priority field value from quiche apps custom query string.
266pub fn priority_field_value_from_query_string(url: &url::Url) -> Option<String> {
267    let mut priority = "".to_string();
268    for param in url.query_pairs() {
269        if param.0 == "u" {
270            write!(priority, "{}={},", param.0, param.1).ok();
271        }
272
273        if param.0 == "i" && param.1 == "1" {
274            priority.push_str("i,");
275        }
276    }
277
278    if !priority.is_empty() {
279        // remove trailing comma
280        priority.pop();
281
282        Some(priority)
283    } else {
284        None
285    }
286}
287
288/// Construct a Priority from quiche apps custom query string.
289pub fn priority_from_query_string(url: &url::Url) -> Option<Priority> {
290    let mut urgency = None;
291    let mut incremental = None;
292    for param in url.query_pairs() {
293        if param.0 == "u" {
294            urgency = Some(param.1.parse::<u8>().unwrap());
295        }
296
297        if param.0 == "i" && param.1 == "1" {
298            incremental = Some(true);
299        }
300    }
301
302    match (urgency, incremental) {
303        (Some(u), Some(i)) => Some(Priority::new(u, i)),
304
305        (Some(u), None) => Some(Priority::new(u, false)),
306
307        (None, Some(i)) => Some(Priority::new(3, i)),
308
309        (None, None) => None,
310    }
311}
312
313fn send_h3_dgram(
314    conn: &mut quiche::Connection, flow_id: u64, dgram_content: &[u8],
315) -> quiche::Result<()> {
316    info!(
317        "sending HTTP/3 DATAGRAM on flow_id={flow_id} with data {dgram_content:?}"
318    );
319
320    let len = octets::varint_len(flow_id) + dgram_content.len();
321    let mut d = vec![0; len];
322    let mut b = octets::OctetsMut::with_slice(&mut d);
323
324    b.put_varint(flow_id)
325        .map_err(|_| quiche::Error::BufferTooShort)?;
326    b.put_bytes(dgram_content)
327        .map_err(|_| quiche::Error::BufferTooShort)?;
328
329    conn.dgram_send(&d)
330}
331
332pub trait HttpConn {
333    fn send_requests(
334        &mut self, conn: &mut quiche::Connection, target_path: &Option<String>,
335    );
336
337    fn handle_responses(
338        &mut self, conn: &mut quiche::Connection, buf: &mut [u8],
339        req_start: &std::time::Instant,
340    );
341
342    fn report_incomplete(&self, start: &std::time::Instant) -> bool;
343
344    fn handle_requests(
345        &mut self, conn: &mut quiche::Connection,
346        partial_requests: &mut HashMap<u64, PartialRequest>,
347        partial_responses: &mut HashMap<u64, PartialResponse>, root: &str,
348        index: &str, buf: &mut [u8],
349    ) -> quiche::h3::Result<()>;
350
351    fn handle_writable(
352        &mut self, conn: &mut quiche::Connection,
353        partial_responses: &mut HashMap<u64, PartialResponse>, stream_id: u64,
354    );
355}
356
357pub fn writable_response_streams(
358    conn: &quiche::Connection,
359) -> impl Iterator<Item = u64> {
360    conn.writable().filter(|id| id % 4 == 0)
361}
362
363/// Represents an HTTP/0.9 formatted request.
364pub struct Http09Request {
365    url: url::Url,
366    cardinal: u64,
367    request_line: String,
368    stream_id: Option<u64>,
369    response_writer: Option<std::io::BufWriter<std::fs::File>>,
370}
371
372/// Represents an HTTP/3 formatted request.
373struct Http3Request {
374    url: url::Url,
375    cardinal: u64,
376    stream_id: Option<u64>,
377    hdrs: Vec<quiche::h3::Header>,
378    priority: Option<Priority>,
379    response_hdrs: Vec<quiche::h3::Header>,
380    response_body: Vec<u8>,
381    response_body_max: usize,
382    response_writer: Option<std::io::BufWriter<std::fs::File>>,
383}
384
385type Http3ResponseBuilderResult = std::result::Result<
386    (Vec<quiche::h3::Header>, Vec<u8>, Vec<u8>),
387    (u64, String),
388>;
389
390pub struct Http09Conn {
391    stream_id: u64,
392    reqs_sent: usize,
393    reqs_complete: usize,
394    reqs: Vec<Http09Request>,
395    output_sink: Rc<RefCell<dyn FnMut(String)>>,
396}
397
398impl Default for Http09Conn {
399    fn default() -> Self {
400        Http09Conn {
401            stream_id: Default::default(),
402            reqs_sent: Default::default(),
403            reqs_complete: Default::default(),
404            reqs: Default::default(),
405            output_sink: Rc::new(RefCell::new(stdout_sink)),
406        }
407    }
408}
409
410impl Http09Conn {
411    pub fn with_urls(
412        urls: &[url::Url], reqs_cardinal: u64,
413        output_sink: Rc<RefCell<dyn FnMut(String)>>,
414    ) -> Box<dyn HttpConn> {
415        let mut reqs = Vec::new();
416        for url in urls {
417            for i in 1..=reqs_cardinal {
418                let request_line = format!("GET {}\r\n", url.path());
419                reqs.push(Http09Request {
420                    url: url.clone(),
421                    cardinal: i,
422                    request_line,
423                    stream_id: None,
424                    response_writer: None,
425                });
426            }
427        }
428
429        let h_conn = Http09Conn {
430            stream_id: 0,
431            reqs_sent: 0,
432            reqs_complete: 0,
433            reqs,
434            output_sink,
435        };
436
437        Box::new(h_conn)
438    }
439}
440
441impl HttpConn for Http09Conn {
442    fn send_requests(
443        &mut self, conn: &mut quiche::Connection, target_path: &Option<String>,
444    ) {
445        let mut reqs_done = 0;
446
447        for req in self.reqs.iter_mut().skip(self.reqs_sent) {
448            match conn.stream_send(
449                self.stream_id,
450                req.request_line.as_bytes(),
451                true,
452            ) {
453                Ok(v) => v,
454
455                Err(quiche::Error::StreamLimit) => {
456                    debug!("not enough stream credits, retry later...");
457                    break;
458                },
459
460                Err(e) => {
461                    error!("failed to send request {e:?}");
462                    break;
463                },
464            };
465
466            debug!("sending HTTP request {:?}", req.request_line);
467
468            req.stream_id = Some(self.stream_id);
469            req.response_writer =
470                make_resource_writer(&req.url, target_path, req.cardinal);
471
472            self.stream_id += 4;
473
474            reqs_done += 1;
475        }
476
477        self.reqs_sent += reqs_done;
478    }
479
480    fn handle_responses(
481        &mut self, conn: &mut quiche::Connection, buf: &mut [u8],
482        req_start: &std::time::Instant,
483    ) {
484        // Process all readable streams.
485        for s in conn.readable() {
486            while let Ok((read, fin)) = conn.stream_recv(s, buf) {
487                trace!("received {read} bytes");
488
489                let stream_buf = &buf[..read];
490
491                trace!(
492                    "stream {} has {} bytes (fin? {})",
493                    s,
494                    stream_buf.len(),
495                    fin
496                );
497
498                let req = self
499                    .reqs
500                    .iter_mut()
501                    .find(|r| r.stream_id == Some(s))
502                    .unwrap();
503
504                match &mut req.response_writer {
505                    Some(rw) => {
506                        rw.write_all(&buf[..read]).ok();
507                    },
508
509                    None => {
510                        self.output_sink.borrow_mut()(unsafe {
511                            String::from_utf8_unchecked(stream_buf.to_vec())
512                        });
513                    },
514                }
515
516                // The server reported that it has no more data to send on
517                // a client-initiated
518                // bidirectional stream, which means
519                // we got the full response. If all responses are received
520                // then close the connection.
521                if &s % 4 == 0 && fin {
522                    self.reqs_complete += 1;
523                    let reqs_count = self.reqs.len();
524
525                    debug!(
526                        "{}/{} responses received",
527                        self.reqs_complete, reqs_count
528                    );
529
530                    if self.reqs_complete == reqs_count {
531                        info!(
532                            "{}/{} response(s) received in {:?}, closing...",
533                            self.reqs_complete,
534                            reqs_count,
535                            req_start.elapsed()
536                        );
537
538                        match conn.close(true, 0x00, b"kthxbye") {
539                            // Already closed.
540                            Ok(_) | Err(quiche::Error::Done) => (),
541
542                            Err(e) => panic!("error closing conn: {e:?}"),
543                        }
544
545                        break;
546                    }
547                }
548            }
549        }
550    }
551
552    fn report_incomplete(&self, start: &std::time::Instant) -> bool {
553        if self.reqs_complete != self.reqs.len() {
554            error!(
555                "connection timed out after {:?} and only completed {}/{} requests",
556                start.elapsed(),
557                self.reqs_complete,
558                self.reqs.len()
559            );
560
561            return true;
562        }
563
564        false
565    }
566
567    fn handle_requests(
568        &mut self, conn: &mut quiche::Connection,
569        partial_requests: &mut HashMap<u64, PartialRequest>,
570        partial_responses: &mut HashMap<u64, PartialResponse>, root: &str,
571        index: &str, buf: &mut [u8],
572    ) -> quiche::h3::Result<()> {
573        // Process all readable streams.
574        for s in conn.readable() {
575            while let Ok((read, fin)) = conn.stream_recv(s, buf) {
576                trace!("{} received {} bytes", conn.trace_id(), read);
577
578                let stream_buf = &buf[..read];
579
580                trace!(
581                    "{} stream {} has {} bytes (fin? {})",
582                    conn.trace_id(),
583                    s,
584                    stream_buf.len(),
585                    fin
586                );
587
588                let stream_buf =
589                    if let Some(partial) = partial_requests.get_mut(&s) {
590                        partial.req.extend_from_slice(stream_buf);
591
592                        if !partial.req.ends_with(b"\r\n") {
593                            return Ok(());
594                        }
595
596                        &partial.req
597                    } else {
598                        if !stream_buf.ends_with(b"\r\n") {
599                            let request = PartialRequest {
600                                req: stream_buf.to_vec(),
601                            };
602
603                            partial_requests.insert(s, request);
604                            return Ok(());
605                        }
606
607                        stream_buf
608                    };
609
610                if stream_buf.starts_with(b"GET ") {
611                    let uri = &stream_buf[4..stream_buf.len() - 2];
612                    let uri = String::from_utf8(uri.to_vec()).unwrap();
613                    let uri = String::from(uri.lines().next().unwrap());
614                    let uri = path::Path::new(&uri);
615                    let mut path = path::PathBuf::from(root);
616
617                    partial_requests.remove(&s);
618
619                    for c in uri.components() {
620                        if let path::Component::Normal(v) = c {
621                            path.push(v)
622                        }
623                    }
624
625                    path = autoindex(path, index);
626
627                    info!(
628                        "{} got GET request for {:?} on stream {}",
629                        conn.trace_id(),
630                        path,
631                        s
632                    );
633
634                    let body = std::fs::read(path.as_path())
635                        .unwrap_or_else(|_| b"Not Found!\r\n".to_vec());
636
637                    info!(
638                        "{} sending response of size {} on stream {}",
639                        conn.trace_id(),
640                        body.len(),
641                        s
642                    );
643
644                    let written = match conn.stream_send(s, &body, true) {
645                        Ok(v) => v,
646
647                        Err(quiche::Error::Done) => 0,
648
649                        Err(e) => {
650                            error!(
651                                "{} stream send failed {:?}",
652                                conn.trace_id(),
653                                e
654                            );
655                            return Err(From::from(e));
656                        },
657                    };
658
659                    if written < body.len() {
660                        let response = PartialResponse {
661                            headers: None,
662                            priority: None,
663                            body,
664                            written,
665                        };
666
667                        partial_responses.insert(s, response);
668                    }
669                }
670            }
671        }
672
673        Ok(())
674    }
675
676    fn handle_writable(
677        &mut self, conn: &mut quiche::Connection,
678        partial_responses: &mut HashMap<u64, PartialResponse>, stream_id: u64,
679    ) {
680        let stream_cap = conn.stream_capacity(stream_id);
681
682        debug!(
683            "{} response stream {} is writable with capacity {:?}",
684            conn.trace_id(),
685            stream_id,
686            stream_cap,
687        );
688
689        if !partial_responses.contains_key(&stream_id) {
690            return;
691        }
692
693        let resp = partial_responses.get_mut(&stream_id).unwrap();
694        let body = &resp.body[resp.written..];
695
696        let written = match conn.stream_send(stream_id, body, true) {
697            Ok(v) => v,
698
699            Err(quiche::Error::Done) => 0,
700
701            Err(e) => {
702                partial_responses.remove(&stream_id);
703
704                error!("{} stream send failed {:?}", conn.trace_id(), e);
705                return;
706            },
707        };
708
709        resp.written += written;
710
711        if resp.written == resp.body.len() {
712            partial_responses.remove(&stream_id);
713        }
714    }
715}
716
717pub struct Http3DgramSender {
718    dgram_count: u64,
719    pub dgram_content: String,
720    pub flow_id: u64,
721    pub dgrams_sent: u64,
722}
723
724impl Http3DgramSender {
725    pub fn new(dgram_count: u64, dgram_content: String, flow_id: u64) -> Self {
726        Self {
727            dgram_count,
728            dgram_content,
729            flow_id,
730            dgrams_sent: 0,
731        }
732    }
733}
734
735fn make_h3_config(
736    max_field_section_size: Option<u64>, qpack_max_table_capacity: Option<u64>,
737    qpack_blocked_streams: Option<u64>,
738) -> quiche::h3::Config {
739    let mut config = quiche::h3::Config::new().unwrap();
740
741    if let Some(v) = max_field_section_size {
742        config.set_max_field_section_size(v);
743    }
744
745    if let Some(v) = qpack_max_table_capacity {
746        // quiche doesn't support dynamic QPACK, so clamp to 0 for now.
747        config.set_qpack_max_table_capacity(v.clamp(0, 0));
748    }
749
750    if let Some(v) = qpack_blocked_streams {
751        // quiche doesn't support dynamic QPACK, so clamp to 0 for now.
752        config.set_qpack_blocked_streams(v.clamp(0, 0));
753    }
754
755    config
756}
757
758pub struct Http3Conn {
759    h3_conn: quiche::h3::Connection,
760    reqs_hdrs_sent: usize,
761    reqs_complete: usize,
762    largest_processed_request: u64,
763    reqs: Vec<Http3Request>,
764    body: Option<Vec<u8>>,
765    sent_body_bytes: HashMap<u64, usize>,
766    dump_json: bool,
767    dgram_sender: Option<Http3DgramSender>,
768    output_sink: Rc<RefCell<dyn FnMut(String)>>,
769}
770
771impl Http3Conn {
772    #[allow(clippy::too_many_arguments)]
773    pub fn with_urls(
774        conn: &mut quiche::Connection, urls: &[url::Url], reqs_cardinal: u64,
775        req_headers: &[String], body: &Option<Vec<u8>>, method: &str,
776        send_priority_update: bool, max_field_section_size: Option<u64>,
777        qpack_max_table_capacity: Option<u64>,
778        qpack_blocked_streams: Option<u64>, dump_json: Option<usize>,
779        dgram_sender: Option<Http3DgramSender>,
780        output_sink: Rc<RefCell<dyn FnMut(String)>>,
781    ) -> Box<dyn HttpConn> {
782        let mut reqs = Vec::new();
783        for url in urls {
784            for i in 1..=reqs_cardinal {
785                let authority = match url.port() {
786                    Some(port) => format!("{}:{}", url.host_str().unwrap(), port),
787
788                    None => url.host_str().unwrap().to_string(),
789                };
790
791                let mut hdrs = vec![
792                    quiche::h3::Header::new(b":method", method.as_bytes()),
793                    quiche::h3::Header::new(b":scheme", url.scheme().as_bytes()),
794                    quiche::h3::Header::new(b":authority", authority.as_bytes()),
795                    quiche::h3::Header::new(
796                        b":path",
797                        url[url::Position::BeforePath..].as_bytes(),
798                    ),
799                    quiche::h3::Header::new(b"user-agent", b"quiche"),
800                ];
801
802                let priority = if send_priority_update {
803                    priority_from_query_string(url)
804                } else {
805                    None
806                };
807
808                // Add custom headers to the request.
809                for header in req_headers {
810                    let header_split: Vec<&str> =
811                        header.splitn(2, ": ").collect();
812
813                    if header_split.len() != 2 {
814                        panic!("malformed header provided - \"{header}\"");
815                    }
816
817                    hdrs.push(quiche::h3::Header::new(
818                        header_split[0].as_bytes(),
819                        header_split[1].as_bytes(),
820                    ));
821                }
822
823                if body.is_some() {
824                    hdrs.push(quiche::h3::Header::new(
825                        b"content-length",
826                        body.as_ref().unwrap().len().to_string().as_bytes(),
827                    ));
828                }
829
830                reqs.push(Http3Request {
831                    url: url.clone(),
832                    cardinal: i,
833                    hdrs,
834                    priority,
835                    response_hdrs: Vec::new(),
836                    response_body: Vec::new(),
837                    response_body_max: dump_json.unwrap_or_default(),
838                    stream_id: None,
839                    response_writer: None,
840                });
841            }
842        }
843
844        let h_conn = Http3Conn {
845            h3_conn: quiche::h3::Connection::with_transport(
846                conn,
847                &make_h3_config(
848                    max_field_section_size,
849                    qpack_max_table_capacity,
850                    qpack_blocked_streams,
851                ),
852            ).expect("Unable to create HTTP/3 connection, check the server's uni stream limit and window size"),
853            reqs_hdrs_sent: 0,
854            reqs_complete: 0,
855            largest_processed_request: 0,
856            reqs,
857            body: body.as_ref().map(|b| b.to_vec()),
858            sent_body_bytes: HashMap::new(),
859            dump_json: dump_json.is_some(),
860            dgram_sender,
861            output_sink,
862        };
863
864        Box::new(h_conn)
865    }
866
867    pub fn with_conn(
868        conn: &mut quiche::Connection, max_field_section_size: Option<u64>,
869        qpack_max_table_capacity: Option<u64>,
870        qpack_blocked_streams: Option<u64>,
871        dgram_sender: Option<Http3DgramSender>,
872        output_sink: Rc<RefCell<dyn FnMut(String)>>,
873    ) -> std::result::Result<Box<dyn HttpConn>, String> {
874        let h3_conn = quiche::h3::Connection::with_transport(
875            conn,
876            &make_h3_config(
877                max_field_section_size,
878                qpack_max_table_capacity,
879                qpack_blocked_streams,
880            ),
881        ).map_err(|_| "Unable to create HTTP/3 connection, check the client's uni stream limit and window size")?;
882
883        let h_conn = Http3Conn {
884            h3_conn,
885            reqs_hdrs_sent: 0,
886            reqs_complete: 0,
887            largest_processed_request: 0,
888            reqs: Vec::new(),
889            body: None,
890            sent_body_bytes: HashMap::new(),
891            dump_json: false,
892            dgram_sender,
893            output_sink,
894        };
895
896        Ok(Box::new(h_conn))
897    }
898
899    /// Builds an HTTP/3 response given a request.
900    fn build_h3_response(
901        root: &str, index: &str, request: &[quiche::h3::Header],
902    ) -> Http3ResponseBuilderResult {
903        let mut file_path = path::PathBuf::from(root);
904        let mut scheme = None;
905        let mut authority = None;
906        let mut host = None;
907        let mut path = None;
908        let mut method = None;
909        let mut priority = vec![];
910
911        // Parse some of the request headers.
912        for hdr in request {
913            match hdr.name() {
914                b":scheme" => {
915                    if scheme.is_some() {
916                        return Err((
917                            H3_MESSAGE_ERROR,
918                            ":scheme cannot be duplicated".to_string(),
919                        ));
920                    }
921
922                    scheme = Some(std::str::from_utf8(hdr.value()).unwrap());
923                },
924
925                b":authority" => {
926                    if authority.is_some() {
927                        return Err((
928                            H3_MESSAGE_ERROR,
929                            ":authority cannot be duplicated".to_string(),
930                        ));
931                    }
932
933                    authority = Some(std::str::from_utf8(hdr.value()).unwrap());
934                },
935
936                b":path" => {
937                    if path.is_some() {
938                        return Err((
939                            H3_MESSAGE_ERROR,
940                            ":path cannot be duplicated".to_string(),
941                        ));
942                    }
943
944                    path = Some(std::str::from_utf8(hdr.value()).unwrap())
945                },
946
947                b":method" => {
948                    if method.is_some() {
949                        return Err((
950                            H3_MESSAGE_ERROR,
951                            ":method cannot be duplicated".to_string(),
952                        ));
953                    }
954
955                    method = Some(std::str::from_utf8(hdr.value()).unwrap())
956                },
957
958                b":protocol" => {
959                    return Err((
960                        H3_MESSAGE_ERROR,
961                        ":protocol not supported".to_string(),
962                    ));
963                },
964
965                b"priority" => priority = hdr.value().to_vec(),
966
967                b"host" => host = Some(std::str::from_utf8(hdr.value()).unwrap()),
968
969                _ => (),
970            }
971        }
972
973        let decided_method = match method {
974            Some(method) => {
975                match method {
976                    "" =>
977                        return Err((
978                            H3_MESSAGE_ERROR,
979                            ":method value cannot be empty".to_string(),
980                        )),
981
982                    "CONNECT" => {
983                        // not allowed
984                        let headers = vec![
985                            quiche::h3::Header::new(
986                                b":status",
987                                "405".to_string().as_bytes(),
988                            ),
989                            quiche::h3::Header::new(b"server", b"quiche"),
990                        ];
991
992                        return Ok((headers, b"".to_vec(), Default::default()));
993                    },
994
995                    _ => method,
996                }
997            },
998
999            None =>
1000                return Err((
1001                    H3_MESSAGE_ERROR,
1002                    ":method cannot be missing".to_string(),
1003                )),
1004        };
1005
1006        let decided_scheme = match scheme {
1007            Some(scheme) => {
1008                if scheme != "http" && scheme != "https" {
1009                    let headers = vec![
1010                        quiche::h3::Header::new(
1011                            b":status",
1012                            "400".to_string().as_bytes(),
1013                        ),
1014                        quiche::h3::Header::new(b"server", b"quiche"),
1015                    ];
1016
1017                    return Ok((
1018                        headers,
1019                        b"Invalid scheme".to_vec(),
1020                        Default::default(),
1021                    ));
1022                }
1023
1024                scheme
1025            },
1026
1027            None =>
1028                return Err((
1029                    H3_MESSAGE_ERROR,
1030                    ":scheme cannot be missing".to_string(),
1031                )),
1032        };
1033
1034        let decided_host = match (authority, host) {
1035            (None, Some("")) =>
1036                return Err((
1037                    H3_MESSAGE_ERROR,
1038                    "host value cannot be empty".to_string(),
1039                )),
1040
1041            (Some(""), None) =>
1042                return Err((
1043                    H3_MESSAGE_ERROR,
1044                    ":authority value cannot be empty".to_string(),
1045                )),
1046
1047            (Some(""), Some("")) =>
1048                return Err((
1049                    H3_MESSAGE_ERROR,
1050                    ":authority and host value cannot be empty".to_string(),
1051                )),
1052
1053            (None, None) =>
1054                return Err((
1055                    H3_MESSAGE_ERROR,
1056                    ":authority and host missing".to_string(),
1057                )),
1058
1059            // Any other combo, prefer :authority
1060            (..) => authority.unwrap(),
1061        };
1062
1063        let decided_path = match path {
1064            Some("") =>
1065                return Err((
1066                    H3_MESSAGE_ERROR,
1067                    ":path value cannot be empty".to_string(),
1068                )),
1069
1070            None =>
1071                return Err((
1072                    H3_MESSAGE_ERROR,
1073                    ":path cannot be missing".to_string(),
1074                )),
1075
1076            Some(path) => path,
1077        };
1078
1079        let url = format!("{decided_scheme}://{decided_host}{decided_path}");
1080        let url = url::Url::parse(&url).unwrap();
1081
1082        let pathbuf = path::PathBuf::from(url.path());
1083        let pathbuf = autoindex(pathbuf, index);
1084
1085        // Priority query string takes precedence over the header.
1086        // So replace the header with one built here.
1087        let query_priority = priority_field_value_from_query_string(&url);
1088
1089        if let Some(p) = query_priority {
1090            priority = p.as_bytes().to_vec();
1091        }
1092
1093        let (status, body) = match decided_method {
1094            "GET" => {
1095                const STREAM_BYTES_PREFIX: &str = "/stream-bytes/";
1096                const STREAM_BYTES_FILL: u8 = 0x57;
1097
1098                if let Some(suffix) = url.path().strip_prefix(STREAM_BYTES_PREFIX)
1099                {
1100                    let n = suffix.parse::<usize>().unwrap_or(0);
1101                    (200, vec![STREAM_BYTES_FILL; n])
1102                } else {
1103                    for c in pathbuf.components() {
1104                        if let path::Component::Normal(v) = c {
1105                            file_path.push(v)
1106                        }
1107                    }
1108
1109                    match std::fs::read(file_path.as_path()) {
1110                        Ok(data) => (200, data),
1111
1112                        Err(_) => (404, b"Not Found!".to_vec()),
1113                    }
1114                }
1115            },
1116
1117            _ => (405, Vec::new()),
1118        };
1119
1120        let headers = vec![
1121            quiche::h3::Header::new(b":status", status.to_string().as_bytes()),
1122            quiche::h3::Header::new(b"server", b"quiche"),
1123            quiche::h3::Header::new(
1124                b"content-length",
1125                body.len().to_string().as_bytes(),
1126            ),
1127        ];
1128
1129        Ok((headers, body, priority))
1130    }
1131}
1132
1133impl HttpConn for Http3Conn {
1134    fn send_requests(
1135        &mut self, conn: &mut quiche::Connection, target_path: &Option<String>,
1136    ) {
1137        let mut reqs_done = 0;
1138
1139        // First send headers.
1140        for req in self.reqs.iter_mut().skip(self.reqs_hdrs_sent) {
1141            let s = match self.h3_conn.send_request(
1142                conn,
1143                &req.hdrs,
1144                self.body.is_none(),
1145            ) {
1146                Ok(v) => v,
1147
1148                Err(quiche::h3::Error::TransportError(
1149                    quiche::Error::StreamLimit,
1150                )) => {
1151                    debug!("not enough stream credits, retry later...");
1152                    break;
1153                },
1154
1155                Err(quiche::h3::Error::StreamBlocked) => {
1156                    debug!("stream is blocked, retry later...");
1157                    break;
1158                },
1159
1160                Err(e) => {
1161                    error!("failed to send request {e:?}");
1162                    break;
1163                },
1164            };
1165
1166            debug!("Sent HTTP request {:?}", &req.hdrs);
1167
1168            if let Some(priority) = &req.priority {
1169                // If sending the priority fails, don't try again.
1170                self.h3_conn
1171                    .send_priority_update_for_request(conn, s, priority)
1172                    .ok();
1173            }
1174
1175            req.stream_id = Some(s);
1176            req.response_writer =
1177                make_resource_writer(&req.url, target_path, req.cardinal);
1178            self.sent_body_bytes.insert(s, 0);
1179
1180            reqs_done += 1;
1181        }
1182        self.reqs_hdrs_sent += reqs_done;
1183
1184        // Then send any remaining body.
1185        if let Some(body) = &self.body {
1186            for (stream_id, sent_bytes) in self.sent_body_bytes.iter_mut() {
1187                if *sent_bytes == body.len() {
1188                    continue;
1189                }
1190
1191                // Always try to send all remaining bytes, so always set fin to
1192                // true.
1193                let sent = match self.h3_conn.send_body(
1194                    conn,
1195                    *stream_id,
1196                    &body[*sent_bytes..],
1197                    true,
1198                ) {
1199                    Ok(v) => v,
1200
1201                    Err(quiche::h3::Error::Done) => 0,
1202
1203                    Err(e) => {
1204                        error!("failed to send request body {e:?}");
1205                        continue;
1206                    },
1207                };
1208
1209                *sent_bytes += sent;
1210            }
1211        }
1212
1213        // And finally any DATAGRAMS.
1214        if let Some(ds) = self.dgram_sender.as_mut() {
1215            let mut dgrams_done = 0;
1216
1217            for _ in ds.dgrams_sent..ds.dgram_count {
1218                match send_h3_dgram(conn, ds.flow_id, ds.dgram_content.as_bytes())
1219                {
1220                    Ok(v) => v,
1221
1222                    Err(e) => {
1223                        error!("failed to send dgram {e:?}");
1224                        break;
1225                    },
1226                }
1227
1228                dgrams_done += 1;
1229            }
1230
1231            ds.dgrams_sent += dgrams_done;
1232        }
1233    }
1234
1235    fn handle_responses(
1236        &mut self, conn: &mut quiche::Connection, buf: &mut [u8],
1237        req_start: &std::time::Instant,
1238    ) {
1239        loop {
1240            match self.h3_conn.poll(conn) {
1241                Ok((stream_id, quiche::h3::Event::Headers { list, .. })) => {
1242                    debug!(
1243                        "got response headers {:?} on stream id {}",
1244                        hdrs_to_strings(&list),
1245                        stream_id
1246                    );
1247
1248                    let req = self
1249                        .reqs
1250                        .iter_mut()
1251                        .find(|r| r.stream_id == Some(stream_id))
1252                        .unwrap();
1253
1254                    req.response_hdrs = list;
1255                },
1256
1257                Ok((stream_id, quiche::h3::Event::Data)) => {
1258                    while let Ok(read) =
1259                        self.h3_conn.recv_body(conn, stream_id, buf)
1260                    {
1261                        debug!(
1262                            "got {read} bytes of response data on stream {stream_id}"
1263                        );
1264
1265                        let req = self
1266                            .reqs
1267                            .iter_mut()
1268                            .find(|r| r.stream_id == Some(stream_id))
1269                            .unwrap();
1270
1271                        let len = std::cmp::min(
1272                            read,
1273                            req.response_body_max - req.response_body.len(),
1274                        );
1275                        req.response_body.extend_from_slice(&buf[..len]);
1276
1277                        match &mut req.response_writer {
1278                            Some(rw) => {
1279                                rw.write_all(&buf[..read]).ok();
1280                            },
1281
1282                            None =>
1283                                if !self.dump_json {
1284                                    self.output_sink.borrow_mut()(unsafe {
1285                                        String::from_utf8_unchecked(
1286                                            buf[..read].to_vec(),
1287                                        )
1288                                    });
1289                                },
1290                        }
1291                    }
1292                },
1293
1294                Ok((_stream_id, quiche::h3::Event::Finished)) => {
1295                    self.reqs_complete += 1;
1296                    let reqs_count = self.reqs.len();
1297
1298                    debug!(
1299                        "{}/{} responses received",
1300                        self.reqs_complete, reqs_count
1301                    );
1302
1303                    if self.reqs_complete == reqs_count {
1304                        info!(
1305                            "{}/{} response(s) received in {:?}, closing...",
1306                            self.reqs_complete,
1307                            reqs_count,
1308                            req_start.elapsed()
1309                        );
1310
1311                        if self.dump_json {
1312                            dump_json(
1313                                &self.reqs,
1314                                &mut *self.output_sink.borrow_mut(),
1315                            );
1316                        }
1317
1318                        match conn.close(true, 0x100, b"kthxbye") {
1319                            // Already closed.
1320                            Ok(_) | Err(quiche::Error::Done) => (),
1321
1322                            Err(e) => panic!("error closing conn: {e:?}"),
1323                        }
1324
1325                        break;
1326                    }
1327                },
1328
1329                Ok((_stream_id, quiche::h3::Event::Reset(e))) => {
1330                    error!("request was reset by peer with {e}, closing...");
1331
1332                    match conn.close(true, 0x100, b"kthxbye") {
1333                        // Already closed.
1334                        Ok(_) | Err(quiche::Error::Done) => (),
1335
1336                        Err(e) => panic!("error closing conn: {e:?}"),
1337                    }
1338
1339                    break;
1340                },
1341
1342                Ok((
1343                    prioritized_element_id,
1344                    quiche::h3::Event::PriorityUpdate,
1345                )) => {
1346                    info!(
1347                        "{} PRIORITY_UPDATE triggered for element ID={}",
1348                        conn.trace_id(),
1349                        prioritized_element_id
1350                    );
1351                },
1352
1353                Ok((goaway_id, quiche::h3::Event::GoAway)) => {
1354                    info!(
1355                        "{} got GOAWAY with ID {} ",
1356                        conn.trace_id(),
1357                        goaway_id
1358                    );
1359                },
1360
1361                Err(quiche::h3::Error::Done) => {
1362                    break;
1363                },
1364
1365                Err(e) => {
1366                    error!("HTTP/3 processing failed: {e:?}");
1367
1368                    break;
1369                },
1370            }
1371        }
1372
1373        // Process datagram-related events.
1374        while let Ok(len) = conn.dgram_recv(buf) {
1375            let mut b = octets::Octets::with_slice(buf);
1376            if let Ok(flow_id) = b.get_varint() {
1377                info!(
1378                    "Received DATAGRAM flow_id={} len={} data={:?}",
1379                    flow_id,
1380                    len,
1381                    buf[b.off()..len].to_vec()
1382                );
1383            }
1384        }
1385    }
1386
1387    fn report_incomplete(&self, start: &std::time::Instant) -> bool {
1388        if self.reqs_complete != self.reqs.len() {
1389            error!(
1390                "connection timed out after {:?} and only completed {}/{} requests",
1391                start.elapsed(),
1392                self.reqs_complete,
1393                self.reqs.len()
1394            );
1395
1396            if self.dump_json {
1397                dump_json(&self.reqs, &mut *self.output_sink.borrow_mut());
1398            }
1399
1400            return true;
1401        }
1402
1403        false
1404    }
1405
1406    fn handle_requests(
1407        &mut self, conn: &mut quiche::Connection,
1408        _partial_requests: &mut HashMap<u64, PartialRequest>,
1409        partial_responses: &mut HashMap<u64, PartialResponse>, root: &str,
1410        index: &str, buf: &mut [u8],
1411    ) -> quiche::h3::Result<()> {
1412        // Process HTTP stream-related events.
1413        //
1414        // This loops over any and all received HTTP requests and sends just the
1415        // HTTP response headers.
1416        loop {
1417            match self.h3_conn.poll(conn) {
1418                Ok((stream_id, quiche::h3::Event::Headers { list, .. })) => {
1419                    info!(
1420                        "{} got request {:?} on stream id {}",
1421                        conn.trace_id(),
1422                        hdrs_to_strings(&list),
1423                        stream_id
1424                    );
1425
1426                    self.largest_processed_request =
1427                        std::cmp::max(self.largest_processed_request, stream_id);
1428
1429                    // We decide the response based on headers alone, so
1430                    // stop reading the request stream so that any body
1431                    // is ignored and pointless Data events are not
1432                    // generated.
1433                    conn.stream_shutdown(stream_id, quiche::Shutdown::Read, 0)
1434                        .unwrap();
1435
1436                    let (mut headers, body, mut priority) =
1437                        match Http3Conn::build_h3_response(root, index, &list) {
1438                            Ok(v) => v,
1439
1440                            Err((error_code, _)) => {
1441                                conn.stream_shutdown(
1442                                    stream_id,
1443                                    quiche::Shutdown::Write,
1444                                    error_code,
1445                                )
1446                                .unwrap();
1447                                continue;
1448                            },
1449                        };
1450
1451                    match self.h3_conn.take_last_priority_update(stream_id) {
1452                        Ok(v) => {
1453                            priority = v;
1454                        },
1455
1456                        Err(quiche::h3::Error::Done) => (),
1457
1458                        Err(e) => error!(
1459                            "{} error taking PRIORITY_UPDATE {}",
1460                            conn.trace_id(),
1461                            e
1462                        ),
1463                    }
1464
1465                    if !priority.is_empty() {
1466                        headers.push(quiche::h3::Header::new(
1467                            b"priority",
1468                            priority.as_slice(),
1469                        ));
1470                    }
1471
1472                    #[cfg(feature = "sfv")]
1473                    let priority =
1474                        quiche::h3::Priority::try_from(priority.as_slice())
1475                            .unwrap_or_default();
1476
1477                    #[cfg(not(feature = "sfv"))]
1478                    let priority = quiche::h3::Priority::default();
1479
1480                    info!(
1481                        "{} prioritizing response on stream {} as {:?}",
1482                        conn.trace_id(),
1483                        stream_id,
1484                        priority
1485                    );
1486
1487                    match self.h3_conn.send_response_with_priority(
1488                        conn, stream_id, &headers, &priority, false,
1489                    ) {
1490                        Ok(v) => v,
1491
1492                        Err(quiche::h3::Error::StreamBlocked) => {
1493                            let response = PartialResponse {
1494                                headers: Some(headers),
1495                                priority: Some(priority),
1496                                body,
1497                                written: 0,
1498                            };
1499
1500                            partial_responses.insert(stream_id, response);
1501                            continue;
1502                        },
1503
1504                        Err(e) => {
1505                            error!(
1506                                "{} stream send failed {:?}",
1507                                conn.trace_id(),
1508                                e
1509                            );
1510
1511                            break;
1512                        },
1513                    }
1514
1515                    let response = PartialResponse {
1516                        headers: None,
1517                        priority: None,
1518                        body,
1519                        written: 0,
1520                    };
1521
1522                    partial_responses.insert(stream_id, response);
1523                },
1524
1525                Ok((stream_id, quiche::h3::Event::Data)) => {
1526                    info!(
1527                        "{} got data on stream id {}",
1528                        conn.trace_id(),
1529                        stream_id
1530                    );
1531                },
1532
1533                Ok((_stream_id, quiche::h3::Event::Finished)) => (),
1534
1535                Ok((_stream_id, quiche::h3::Event::Reset { .. })) => (),
1536
1537                Ok((
1538                    prioritized_element_id,
1539                    quiche::h3::Event::PriorityUpdate,
1540                )) => {
1541                    info!(
1542                        "{} PRIORITY_UPDATE triggered for element ID={}",
1543                        conn.trace_id(),
1544                        prioritized_element_id
1545                    );
1546                },
1547
1548                Ok((goaway_id, quiche::h3::Event::GoAway)) => {
1549                    trace!(
1550                        "{} got GOAWAY with ID {} ",
1551                        conn.trace_id(),
1552                        goaway_id
1553                    );
1554                    self.h3_conn
1555                        .send_goaway(conn, self.largest_processed_request)?;
1556                },
1557
1558                Err(quiche::h3::Error::Done) => {
1559                    break;
1560                },
1561
1562                Err(e) => {
1563                    error!("{} HTTP/3 error {:?}", conn.trace_id(), e);
1564
1565                    return Err(e);
1566                },
1567            }
1568        }
1569
1570        // Visit all writable response streams to send HTTP content.
1571        for stream_id in writable_response_streams(conn) {
1572            self.handle_writable(conn, partial_responses, stream_id);
1573        }
1574
1575        // Process datagram-related events.
1576        while let Ok(len) = conn.dgram_recv(buf) {
1577            let mut b = octets::Octets::with_slice(buf);
1578            if let Ok(flow_id) = b.get_varint() {
1579                info!(
1580                    "Received DATAGRAM flow_id={} len={} data={:?}",
1581                    flow_id,
1582                    len,
1583                    buf[b.off()..len].to_vec()
1584                );
1585            }
1586        }
1587
1588        if let Some(ds) = self.dgram_sender.as_mut() {
1589            let mut dgrams_done = 0;
1590
1591            for _ in ds.dgrams_sent..ds.dgram_count {
1592                match send_h3_dgram(conn, ds.flow_id, ds.dgram_content.as_bytes())
1593                {
1594                    Ok(v) => v,
1595
1596                    Err(e) => {
1597                        error!("failed to send dgram {e:?}");
1598                        break;
1599                    },
1600                }
1601
1602                dgrams_done += 1;
1603            }
1604
1605            ds.dgrams_sent += dgrams_done;
1606        }
1607
1608        Ok(())
1609    }
1610
1611    fn handle_writable(
1612        &mut self, conn: &mut quiche::Connection,
1613        partial_responses: &mut HashMap<u64, PartialResponse>, stream_id: u64,
1614    ) {
1615        let stream_cap = conn.stream_capacity(stream_id);
1616
1617        debug!(
1618            "{} response stream {} is writable with capacity {:?}",
1619            conn.trace_id(),
1620            stream_id,
1621            stream_cap,
1622        );
1623
1624        if !partial_responses.contains_key(&stream_id) {
1625            return;
1626        }
1627
1628        let resp = partial_responses.get_mut(&stream_id).unwrap();
1629
1630        if let (Some(headers), Some(priority)) = (&resp.headers, &resp.priority) {
1631            match self.h3_conn.send_response_with_priority(
1632                conn, stream_id, headers, priority, false,
1633            ) {
1634                Ok(_) => (),
1635
1636                Err(quiche::h3::Error::StreamBlocked) => {
1637                    return;
1638                },
1639
1640                Err(e) => {
1641                    error!("{} stream send failed {:?}", conn.trace_id(), e);
1642                    return;
1643                },
1644            }
1645        }
1646
1647        resp.headers = None;
1648        resp.priority = None;
1649
1650        let body = &resp.body[resp.written..];
1651
1652        let written = match self.h3_conn.send_body(conn, stream_id, body, true) {
1653            Ok(v) => v,
1654
1655            Err(quiche::h3::Error::Done) => 0,
1656
1657            Err(e) => {
1658                partial_responses.remove(&stream_id);
1659
1660                error!("{} stream send failed {:?}", conn.trace_id(), e);
1661                return;
1662            },
1663        };
1664
1665        resp.written += written;
1666
1667        if resp.written == resp.body.len() {
1668            partial_responses.remove(&stream_id);
1669        }
1670    }
1671}