1use std::io::prelude::*;
33
34use std::collections::HashMap;
35
36#[cfg(feature = "sfv")]
37use std::convert::TryFrom;
38
39use std::fmt::Write as _;
40
41use std::rc::Rc;
42
43use std::cell::RefCell;
44
45use std::path;
46
47use ring::rand::SecureRandom;
48
49use quiche::ConnectionId;
50
51use quiche::h3::NameValue;
52use quiche::h3::Priority;
53
54pub fn stdout_sink(out: String) {
55 print!("{out}");
56}
57
58const H3_MESSAGE_ERROR: u64 = 0x10E;
59
60pub mod alpns {
64 pub const HTTP_09: [&[u8]; 2] = [b"hq-interop", b"http/0.9"];
65 pub const HTTP_3: [&[u8]; 1] = [b"h3"];
66}
67
68pub struct PartialRequest {
69 pub req: Vec<u8>,
70}
71
72pub struct PartialResponse {
73 pub headers: Option<Vec<quiche::h3::Header>>,
74 pub priority: Option<quiche::h3::Priority>,
75
76 pub body: Vec<u8>,
77
78 pub written: usize,
79}
80
81pub type ClientId = u64;
82
83pub struct Client {
84 pub conn: quiche::Connection,
85
86 pub http_conn: Option<Box<dyn HttpConn>>,
87
88 pub client_id: ClientId,
89
90 pub app_proto_selected: bool,
91
92 pub partial_requests: std::collections::HashMap<u64, PartialRequest>,
93
94 pub partial_responses: std::collections::HashMap<u64, PartialResponse>,
95
96 pub max_datagram_size: usize,
97
98 pub loss_rate: f64,
99
100 pub max_send_burst: usize,
101}
102
103pub type ClientIdMap = HashMap<ConnectionId<'static>, ClientId>;
104pub type ClientMap = HashMap<ClientId, Client>;
105
106fn make_resource_writer(
113 url: &url::Url, target_path: &Option<String>, cardinal: u64,
114) -> Option<std::io::BufWriter<std::fs::File>> {
115 if let Some(tp) = target_path {
116 let resource =
117 url.path_segments().map(|c| c.collect::<Vec<_>>()).unwrap();
118
119 let mut path = format!("{}/{}", tp, resource.iter().last().unwrap());
120
121 if cardinal > 1 {
122 path = format!("{path}.{cardinal}");
123 }
124
125 match std::fs::File::create(&path) {
126 Ok(f) => return Some(std::io::BufWriter::new(f)),
127
128 Err(e) => panic!(
129 "Error creating file for {}, attempted path was {}: {}",
130 url, path, e
131 ),
132 }
133 }
134
135 None
136}
137
138fn autoindex(path: path::PathBuf, index: &str) -> path::PathBuf {
139 if let Some(path_str) = path.to_str() {
140 if path_str.ends_with('/') {
141 let path_str = format!("{path_str}{index}");
142 return path::PathBuf::from(&path_str);
143 }
144 }
145
146 path
147}
148
149pub fn make_qlog_writer(
151 dir: &std::ffi::OsStr, role: &str, id: &str,
152) -> std::io::BufWriter<std::fs::File> {
153 let mut path = std::path::PathBuf::from(dir);
154 let filename = format!("{role}-{id}.sqlog");
155 path.push(filename);
156
157 match std::fs::File::create(&path) {
158 Ok(f) => std::io::BufWriter::new(f),
159
160 Err(e) => panic!(
161 "Error creating qlog file attempted path was {:?}: {}",
162 path, e
163 ),
164 }
165}
166
167fn dump_json(reqs: &[Http3Request], output_sink: &mut dyn FnMut(String)) {
168 let mut out = String::new();
169
170 writeln!(out, "{{").unwrap();
171 writeln!(out, " \"entries\": [").unwrap();
172 let mut reqs = reqs.iter().peekable();
173
174 while let Some(req) = reqs.next() {
175 writeln!(out, " {{").unwrap();
176 writeln!(out, " \"request\":{{").unwrap();
177 writeln!(out, " \"headers\":[").unwrap();
178
179 let mut req_hdrs = req.hdrs.iter().peekable();
180 while let Some(h) = req_hdrs.next() {
181 writeln!(out, " {{").unwrap();
182 writeln!(
183 out,
184 " \"name\": \"{}\",",
185 std::str::from_utf8(h.name()).unwrap()
186 )
187 .unwrap();
188 writeln!(
189 out,
190 " \"value\": \"{}\"",
191 std::str::from_utf8(h.value()).unwrap().replace('"', "\\\"")
192 )
193 .unwrap();
194
195 if req_hdrs.peek().is_some() {
196 writeln!(out, " }},").unwrap();
197 } else {
198 writeln!(out, " }}").unwrap();
199 }
200 }
201 writeln!(out, " ]}},").unwrap();
202
203 writeln!(out, " \"response\":{{").unwrap();
204 writeln!(out, " \"headers\":[").unwrap();
205
206 let mut response_hdrs = req.response_hdrs.iter().peekable();
207 while let Some(h) = response_hdrs.next() {
208 writeln!(out, " {{").unwrap();
209 writeln!(
210 out,
211 " \"name\": \"{}\",",
212 std::str::from_utf8(h.name()).unwrap()
213 )
214 .unwrap();
215 writeln!(
216 out,
217 " \"value\": \"{}\"",
218 std::str::from_utf8(h.value()).unwrap().replace('"', "\\\"")
219 )
220 .unwrap();
221
222 if response_hdrs.peek().is_some() {
223 writeln!(out, " }},").unwrap();
224 } else {
225 writeln!(out, " }}").unwrap();
226 }
227 }
228 writeln!(out, " ],").unwrap();
229 writeln!(out, " \"body\": {:?}", req.response_body).unwrap();
230 writeln!(out, " }}").unwrap();
231
232 if reqs.peek().is_some() {
233 writeln!(out, "}},").unwrap();
234 } else {
235 writeln!(out, "}}").unwrap();
236 }
237 }
238 writeln!(out, "]").unwrap();
239 writeln!(out, "}}").unwrap();
240
241 output_sink(out);
242}
243
244pub fn hdrs_to_strings(hdrs: &[quiche::h3::Header]) -> Vec<(String, String)> {
245 hdrs.iter()
246 .map(|h| {
247 let name = String::from_utf8_lossy(h.name()).to_string();
248 let value = String::from_utf8_lossy(h.value()).to_string();
249
250 (name, value)
251 })
252 .collect()
253}
254
255pub fn generate_cid_and_reset_token<T: SecureRandom>(
257 rng: &T,
258) -> (quiche::ConnectionId<'static>, u128) {
259 let mut scid = [0; quiche::MAX_CONN_ID_LEN];
260 rng.fill(&mut scid).unwrap();
261 let scid = scid.to_vec().into();
262 let mut reset_token = [0; 16];
263 rng.fill(&mut reset_token).unwrap();
264 let reset_token = u128::from_be_bytes(reset_token);
265 (scid, reset_token)
266}
267
268pub fn priority_field_value_from_query_string(url: &url::Url) -> Option<String> {
270 let mut priority = "".to_string();
271 for param in url.query_pairs() {
272 if param.0 == "u" {
273 write!(priority, "{}={},", param.0, param.1).ok();
274 }
275
276 if param.0 == "i" && param.1 == "1" {
277 priority.push_str("i,");
278 }
279 }
280
281 if !priority.is_empty() {
282 priority.pop();
284
285 Some(priority)
286 } else {
287 None
288 }
289}
290
291pub fn priority_from_query_string(url: &url::Url) -> Option<Priority> {
293 let mut urgency = None;
294 let mut incremental = None;
295 for param in url.query_pairs() {
296 if param.0 == "u" {
297 urgency = Some(param.1.parse::<u8>().unwrap());
298 }
299
300 if param.0 == "i" && param.1 == "1" {
301 incremental = Some(true);
302 }
303 }
304
305 match (urgency, incremental) {
306 (Some(u), Some(i)) => Some(Priority::new(u, i)),
307
308 (Some(u), None) => Some(Priority::new(u, false)),
309
310 (None, Some(i)) => Some(Priority::new(3, i)),
311
312 (None, None) => None,
313 }
314}
315
316fn send_h3_dgram(
317 conn: &mut quiche::Connection, flow_id: u64, dgram_content: &[u8],
318) -> quiche::Result<()> {
319 info!(
320 "sending HTTP/3 DATAGRAM on flow_id={} with data {:?}",
321 flow_id, dgram_content
322 );
323
324 let len = octets::varint_len(flow_id) + dgram_content.len();
325 let mut d = vec![0; len];
326 let mut b = octets::OctetsMut::with_slice(&mut d);
327
328 b.put_varint(flow_id)
329 .map_err(|_| quiche::Error::BufferTooShort)?;
330 b.put_bytes(dgram_content)
331 .map_err(|_| quiche::Error::BufferTooShort)?;
332
333 conn.dgram_send(&d)
334}
335
336pub trait HttpConn {
337 fn send_requests(
338 &mut self, conn: &mut quiche::Connection, target_path: &Option<String>,
339 );
340
341 fn handle_responses(
342 &mut self, conn: &mut quiche::Connection, buf: &mut [u8],
343 req_start: &std::time::Instant,
344 );
345
346 fn report_incomplete(&self, start: &std::time::Instant) -> bool;
347
348 fn handle_requests(
349 &mut self, conn: &mut quiche::Connection,
350 partial_requests: &mut HashMap<u64, PartialRequest>,
351 partial_responses: &mut HashMap<u64, PartialResponse>, root: &str,
352 index: &str, buf: &mut [u8],
353 ) -> quiche::h3::Result<()>;
354
355 fn handle_writable(
356 &mut self, conn: &mut quiche::Connection,
357 partial_responses: &mut HashMap<u64, PartialResponse>, stream_id: u64,
358 );
359}
360
361pub fn writable_response_streams(
362 conn: &quiche::Connection,
363) -> impl Iterator<Item = u64> {
364 conn.writable().filter(|id| id % 4 == 0)
365}
366
367pub struct Http09Request {
369 url: url::Url,
370 cardinal: u64,
371 request_line: String,
372 stream_id: Option<u64>,
373 response_writer: Option<std::io::BufWriter<std::fs::File>>,
374}
375
376struct Http3Request {
378 url: url::Url,
379 cardinal: u64,
380 stream_id: Option<u64>,
381 hdrs: Vec<quiche::h3::Header>,
382 priority: Option<Priority>,
383 response_hdrs: Vec<quiche::h3::Header>,
384 response_body: Vec<u8>,
385 response_body_max: usize,
386 response_writer: Option<std::io::BufWriter<std::fs::File>>,
387}
388
389type Http3ResponseBuilderResult = std::result::Result<
390 (Vec<quiche::h3::Header>, Vec<u8>, Vec<u8>),
391 (u64, String),
392>;
393
394pub struct Http09Conn {
395 stream_id: u64,
396 reqs_sent: usize,
397 reqs_complete: usize,
398 reqs: Vec<Http09Request>,
399 output_sink: Rc<RefCell<dyn FnMut(String)>>,
400}
401
402impl Default for Http09Conn {
403 fn default() -> Self {
404 Http09Conn {
405 stream_id: Default::default(),
406 reqs_sent: Default::default(),
407 reqs_complete: Default::default(),
408 reqs: Default::default(),
409 output_sink: Rc::new(RefCell::new(stdout_sink)),
410 }
411 }
412}
413
414impl Http09Conn {
415 pub fn with_urls(
416 urls: &[url::Url], reqs_cardinal: u64,
417 output_sink: Rc<RefCell<dyn FnMut(String)>>,
418 ) -> Box<dyn HttpConn> {
419 let mut reqs = Vec::new();
420 for url in urls {
421 for i in 1..=reqs_cardinal {
422 let request_line = format!("GET {}\r\n", url.path());
423 reqs.push(Http09Request {
424 url: url.clone(),
425 cardinal: i,
426 request_line,
427 stream_id: None,
428 response_writer: None,
429 });
430 }
431 }
432
433 let h_conn = Http09Conn {
434 stream_id: 0,
435 reqs_sent: 0,
436 reqs_complete: 0,
437 reqs,
438 output_sink,
439 };
440
441 Box::new(h_conn)
442 }
443}
444
445impl HttpConn for Http09Conn {
446 fn send_requests(
447 &mut self, conn: &mut quiche::Connection, target_path: &Option<String>,
448 ) {
449 let mut reqs_done = 0;
450
451 for req in self.reqs.iter_mut().skip(self.reqs_sent) {
452 match conn.stream_send(
453 self.stream_id,
454 req.request_line.as_bytes(),
455 true,
456 ) {
457 Ok(v) => v,
458
459 Err(quiche::Error::StreamLimit) => {
460 debug!("not enough stream credits, retry later...");
461 break;
462 },
463
464 Err(e) => {
465 error!("failed to send request {:?}", e);
466 break;
467 },
468 };
469
470 debug!("sending HTTP request {:?}", req.request_line);
471
472 req.stream_id = Some(self.stream_id);
473 req.response_writer =
474 make_resource_writer(&req.url, target_path, req.cardinal);
475
476 self.stream_id += 4;
477
478 reqs_done += 1;
479 }
480
481 self.reqs_sent += reqs_done;
482 }
483
484 fn handle_responses(
485 &mut self, conn: &mut quiche::Connection, buf: &mut [u8],
486 req_start: &std::time::Instant,
487 ) {
488 for s in conn.readable() {
490 while let Ok((read, fin)) = conn.stream_recv(s, buf) {
491 trace!("received {} bytes", read);
492
493 let stream_buf = &buf[..read];
494
495 trace!(
496 "stream {} has {} bytes (fin? {})",
497 s,
498 stream_buf.len(),
499 fin
500 );
501
502 let req = self
503 .reqs
504 .iter_mut()
505 .find(|r| r.stream_id == Some(s))
506 .unwrap();
507
508 match &mut req.response_writer {
509 Some(rw) => {
510 rw.write_all(&buf[..read]).ok();
511 },
512
513 None => {
514 self.output_sink.borrow_mut()(unsafe {
515 String::from_utf8_unchecked(stream_buf.to_vec())
516 });
517 },
518 }
519
520 if &s % 4 == 0 && fin {
526 self.reqs_complete += 1;
527 let reqs_count = self.reqs.len();
528
529 debug!(
530 "{}/{} responses received",
531 self.reqs_complete, reqs_count
532 );
533
534 if self.reqs_complete == reqs_count {
535 info!(
536 "{}/{} response(s) received in {:?}, closing...",
537 self.reqs_complete,
538 reqs_count,
539 req_start.elapsed()
540 );
541
542 match conn.close(true, 0x00, b"kthxbye") {
543 Ok(_) | Err(quiche::Error::Done) => (),
545
546 Err(e) => panic!("error closing conn: {:?}", e),
547 }
548
549 break;
550 }
551 }
552 }
553 }
554 }
555
556 fn report_incomplete(&self, start: &std::time::Instant) -> bool {
557 if self.reqs_complete != self.reqs.len() {
558 error!(
559 "connection timed out after {:?} and only completed {}/{} requests",
560 start.elapsed(),
561 self.reqs_complete,
562 self.reqs.len()
563 );
564
565 return true;
566 }
567
568 false
569 }
570
571 fn handle_requests(
572 &mut self, conn: &mut quiche::Connection,
573 partial_requests: &mut HashMap<u64, PartialRequest>,
574 partial_responses: &mut HashMap<u64, PartialResponse>, root: &str,
575 index: &str, buf: &mut [u8],
576 ) -> quiche::h3::Result<()> {
577 for s in conn.readable() {
579 while let Ok((read, fin)) = conn.stream_recv(s, buf) {
580 trace!("{} received {} bytes", conn.trace_id(), read);
581
582 let stream_buf = &buf[..read];
583
584 trace!(
585 "{} stream {} has {} bytes (fin? {})",
586 conn.trace_id(),
587 s,
588 stream_buf.len(),
589 fin
590 );
591
592 let stream_buf =
593 if let Some(partial) = partial_requests.get_mut(&s) {
594 partial.req.extend_from_slice(stream_buf);
595
596 if !partial.req.ends_with(b"\r\n") {
597 return Ok(());
598 }
599
600 &partial.req
601 } else {
602 if !stream_buf.ends_with(b"\r\n") {
603 let request = PartialRequest {
604 req: stream_buf.to_vec(),
605 };
606
607 partial_requests.insert(s, request);
608 return Ok(());
609 }
610
611 stream_buf
612 };
613
614 if stream_buf.starts_with(b"GET ") {
615 let uri = &stream_buf[4..stream_buf.len() - 2];
616 let uri = String::from_utf8(uri.to_vec()).unwrap();
617 let uri = String::from(uri.lines().next().unwrap());
618 let uri = path::Path::new(&uri);
619 let mut path = path::PathBuf::from(root);
620
621 partial_requests.remove(&s);
622
623 for c in uri.components() {
624 if let path::Component::Normal(v) = c {
625 path.push(v)
626 }
627 }
628
629 path = autoindex(path, index);
630
631 info!(
632 "{} got GET request for {:?} on stream {}",
633 conn.trace_id(),
634 path,
635 s
636 );
637
638 let body = std::fs::read(path.as_path())
639 .unwrap_or_else(|_| b"Not Found!\r\n".to_vec());
640
641 info!(
642 "{} sending response of size {} on stream {}",
643 conn.trace_id(),
644 body.len(),
645 s
646 );
647
648 let written = match conn.stream_send(s, &body, true) {
649 Ok(v) => v,
650
651 Err(quiche::Error::Done) => 0,
652
653 Err(e) => {
654 error!(
655 "{} stream send failed {:?}",
656 conn.trace_id(),
657 e
658 );
659 return Err(From::from(e));
660 },
661 };
662
663 if written < body.len() {
664 let response = PartialResponse {
665 headers: None,
666 priority: None,
667 body,
668 written,
669 };
670
671 partial_responses.insert(s, response);
672 }
673 }
674 }
675 }
676
677 Ok(())
678 }
679
680 fn handle_writable(
681 &mut self, conn: &mut quiche::Connection,
682 partial_responses: &mut HashMap<u64, PartialResponse>, stream_id: u64,
683 ) {
684 debug!(
685 "{} response stream {} is writable with capacity {:?}",
686 conn.trace_id(),
687 stream_id,
688 conn.stream_capacity(stream_id)
689 );
690
691 if !partial_responses.contains_key(&stream_id) {
692 return;
693 }
694
695 let resp = partial_responses.get_mut(&stream_id).unwrap();
696 let body = &resp.body[resp.written..];
697
698 let written = match conn.stream_send(stream_id, body, true) {
699 Ok(v) => v,
700
701 Err(quiche::Error::Done) => 0,
702
703 Err(e) => {
704 partial_responses.remove(&stream_id);
705
706 error!("{} stream send failed {:?}", conn.trace_id(), e);
707 return;
708 },
709 };
710
711 resp.written += written;
712
713 if resp.written == resp.body.len() {
714 partial_responses.remove(&stream_id);
715 }
716 }
717}
718
719pub struct Http3DgramSender {
720 dgram_count: u64,
721 pub dgram_content: String,
722 pub flow_id: u64,
723 pub dgrams_sent: u64,
724}
725
726impl Http3DgramSender {
727 pub fn new(dgram_count: u64, dgram_content: String, flow_id: u64) -> Self {
728 Self {
729 dgram_count,
730 dgram_content,
731 flow_id,
732 dgrams_sent: 0,
733 }
734 }
735}
736
737fn make_h3_config(
738 max_field_section_size: Option<u64>, qpack_max_table_capacity: Option<u64>,
739 qpack_blocked_streams: Option<u64>,
740) -> quiche::h3::Config {
741 let mut config = quiche::h3::Config::new().unwrap();
742
743 if let Some(v) = max_field_section_size {
744 config.set_max_field_section_size(v);
745 }
746
747 if let Some(v) = qpack_max_table_capacity {
748 config.set_qpack_max_table_capacity(v.clamp(0, 0));
750 }
751
752 if let Some(v) = qpack_blocked_streams {
753 config.set_qpack_blocked_streams(v.clamp(0, 0));
755 }
756
757 config
758}
759
760pub struct Http3Conn {
761 h3_conn: quiche::h3::Connection,
762 reqs_hdrs_sent: usize,
763 reqs_complete: usize,
764 largest_processed_request: u64,
765 reqs: Vec<Http3Request>,
766 body: Option<Vec<u8>>,
767 sent_body_bytes: HashMap<u64, usize>,
768 dump_json: bool,
769 dgram_sender: Option<Http3DgramSender>,
770 output_sink: Rc<RefCell<dyn FnMut(String)>>,
771}
772
773impl Http3Conn {
774 #[allow(clippy::too_many_arguments)]
775 pub fn with_urls(
776 conn: &mut quiche::Connection, urls: &[url::Url], reqs_cardinal: u64,
777 req_headers: &[String], body: &Option<Vec<u8>>, method: &str,
778 send_priority_update: bool, max_field_section_size: Option<u64>,
779 qpack_max_table_capacity: Option<u64>,
780 qpack_blocked_streams: Option<u64>, dump_json: Option<usize>,
781 dgram_sender: Option<Http3DgramSender>,
782 output_sink: Rc<RefCell<dyn FnMut(String)>>,
783 ) -> Box<dyn HttpConn> {
784 let mut reqs = Vec::new();
785 for url in urls {
786 for i in 1..=reqs_cardinal {
787 let authority = match url.port() {
788 Some(port) => format!("{}:{}", url.host_str().unwrap(), port),
789
790 None => url.host_str().unwrap().to_string(),
791 };
792
793 let mut hdrs = vec![
794 quiche::h3::Header::new(b":method", method.as_bytes()),
795 quiche::h3::Header::new(b":scheme", url.scheme().as_bytes()),
796 quiche::h3::Header::new(b":authority", authority.as_bytes()),
797 quiche::h3::Header::new(
798 b":path",
799 url[url::Position::BeforePath..].as_bytes(),
800 ),
801 quiche::h3::Header::new(b"user-agent", b"quiche"),
802 ];
803
804 let priority = if send_priority_update {
805 priority_from_query_string(url)
806 } else {
807 None
808 };
809
810 for header in req_headers {
812 let header_split: Vec<&str> =
813 header.splitn(2, ": ").collect();
814
815 if header_split.len() != 2 {
816 panic!("malformed header provided - \"{}\"", header);
817 }
818
819 hdrs.push(quiche::h3::Header::new(
820 header_split[0].as_bytes(),
821 header_split[1].as_bytes(),
822 ));
823 }
824
825 if body.is_some() {
826 hdrs.push(quiche::h3::Header::new(
827 b"content-length",
828 body.as_ref().unwrap().len().to_string().as_bytes(),
829 ));
830 }
831
832 reqs.push(Http3Request {
833 url: url.clone(),
834 cardinal: i,
835 hdrs,
836 priority,
837 response_hdrs: Vec::new(),
838 response_body: Vec::new(),
839 response_body_max: dump_json.unwrap_or_default(),
840 stream_id: None,
841 response_writer: None,
842 });
843 }
844 }
845
846 let h_conn = Http3Conn {
847 h3_conn: quiche::h3::Connection::with_transport(
848 conn,
849 &make_h3_config(
850 max_field_section_size,
851 qpack_max_table_capacity,
852 qpack_blocked_streams,
853 ),
854 ).expect("Unable to create HTTP/3 connection, check the server's uni stream limit and window size"),
855 reqs_hdrs_sent: 0,
856 reqs_complete: 0,
857 largest_processed_request: 0,
858 reqs,
859 body: body.as_ref().map(|b| b.to_vec()),
860 sent_body_bytes: HashMap::new(),
861 dump_json: dump_json.is_some(),
862 dgram_sender,
863 output_sink,
864 };
865
866 Box::new(h_conn)
867 }
868
869 pub fn with_conn(
870 conn: &mut quiche::Connection, max_field_section_size: Option<u64>,
871 qpack_max_table_capacity: Option<u64>,
872 qpack_blocked_streams: Option<u64>,
873 dgram_sender: Option<Http3DgramSender>,
874 output_sink: Rc<RefCell<dyn FnMut(String)>>,
875 ) -> std::result::Result<Box<dyn HttpConn>, String> {
876 let h3_conn = quiche::h3::Connection::with_transport(
877 conn,
878 &make_h3_config(
879 max_field_section_size,
880 qpack_max_table_capacity,
881 qpack_blocked_streams,
882 ),
883 ).map_err(|_| "Unable to create HTTP/3 connection, check the client's uni stream limit and window size")?;
884
885 let h_conn = Http3Conn {
886 h3_conn,
887 reqs_hdrs_sent: 0,
888 reqs_complete: 0,
889 largest_processed_request: 0,
890 reqs: Vec::new(),
891 body: None,
892 sent_body_bytes: HashMap::new(),
893 dump_json: false,
894 dgram_sender,
895 output_sink,
896 };
897
898 Ok(Box::new(h_conn))
899 }
900
901 fn build_h3_response(
903 root: &str, index: &str, request: &[quiche::h3::Header],
904 ) -> Http3ResponseBuilderResult {
905 let mut file_path = path::PathBuf::from(root);
906 let mut scheme = None;
907 let mut authority = None;
908 let mut host = None;
909 let mut path = None;
910 let mut method = None;
911 let mut priority = vec![];
912
913 for hdr in request {
915 match hdr.name() {
916 b":scheme" => {
917 if scheme.is_some() {
918 return Err((
919 H3_MESSAGE_ERROR,
920 ":scheme cannot be duplicated".to_string(),
921 ));
922 }
923
924 scheme = Some(std::str::from_utf8(hdr.value()).unwrap());
925 },
926
927 b":authority" => {
928 if authority.is_some() {
929 return Err((
930 H3_MESSAGE_ERROR,
931 ":authority cannot be duplicated".to_string(),
932 ));
933 }
934
935 authority = Some(std::str::from_utf8(hdr.value()).unwrap());
936 },
937
938 b":path" => {
939 if path.is_some() {
940 return Err((
941 H3_MESSAGE_ERROR,
942 ":path cannot be duplicated".to_string(),
943 ));
944 }
945
946 path = Some(std::str::from_utf8(hdr.value()).unwrap())
947 },
948
949 b":method" => {
950 if method.is_some() {
951 return Err((
952 H3_MESSAGE_ERROR,
953 ":method cannot be duplicated".to_string(),
954 ));
955 }
956
957 method = Some(std::str::from_utf8(hdr.value()).unwrap())
958 },
959
960 b":protocol" => {
961 return Err((
962 H3_MESSAGE_ERROR,
963 ":protocol not supported".to_string(),
964 ));
965 },
966
967 b"priority" => priority = hdr.value().to_vec(),
968
969 b"host" => host = Some(std::str::from_utf8(hdr.value()).unwrap()),
970
971 _ => (),
972 }
973 }
974
975 let decided_method = match method {
976 Some(method) => {
977 match method {
978 "" =>
979 return Err((
980 H3_MESSAGE_ERROR,
981 ":method value cannot be empty".to_string(),
982 )),
983
984 "CONNECT" => {
985 let headers = vec![
987 quiche::h3::Header::new(
988 b":status",
989 "405".to_string().as_bytes(),
990 ),
991 quiche::h3::Header::new(b"server", b"quiche"),
992 ];
993
994 return Ok((headers, b"".to_vec(), Default::default()));
995 },
996
997 _ => method,
998 }
999 },
1000
1001 None =>
1002 return Err((
1003 H3_MESSAGE_ERROR,
1004 ":method cannot be missing".to_string(),
1005 )),
1006 };
1007
1008 let decided_scheme = match scheme {
1009 Some(scheme) => {
1010 if scheme != "http" && scheme != "https" {
1011 let headers = vec![
1012 quiche::h3::Header::new(
1013 b":status",
1014 "400".to_string().as_bytes(),
1015 ),
1016 quiche::h3::Header::new(b"server", b"quiche"),
1017 ];
1018
1019 return Ok((
1020 headers,
1021 b"Invalid scheme".to_vec(),
1022 Default::default(),
1023 ));
1024 }
1025
1026 scheme
1027 },
1028
1029 None =>
1030 return Err((
1031 H3_MESSAGE_ERROR,
1032 ":scheme cannot be missing".to_string(),
1033 )),
1034 };
1035
1036 let decided_host = match (authority, host) {
1037 (None, Some("")) =>
1038 return Err((
1039 H3_MESSAGE_ERROR,
1040 "host value cannot be empty".to_string(),
1041 )),
1042
1043 (Some(""), None) =>
1044 return Err((
1045 H3_MESSAGE_ERROR,
1046 ":authority value cannot be empty".to_string(),
1047 )),
1048
1049 (Some(""), Some("")) =>
1050 return Err((
1051 H3_MESSAGE_ERROR,
1052 ":authority and host value cannot be empty".to_string(),
1053 )),
1054
1055 (None, None) =>
1056 return Err((
1057 H3_MESSAGE_ERROR,
1058 ":authority and host missing".to_string(),
1059 )),
1060
1061 (..) => authority.unwrap(),
1063 };
1064
1065 let decided_path = match path {
1066 Some("") =>
1067 return Err((
1068 H3_MESSAGE_ERROR,
1069 ":path value cannot be empty".to_string(),
1070 )),
1071
1072 None =>
1073 return Err((
1074 H3_MESSAGE_ERROR,
1075 ":path cannot be missing".to_string(),
1076 )),
1077
1078 Some(path) => path,
1079 };
1080
1081 let url = format!("{decided_scheme}://{decided_host}{decided_path}");
1082 let url = url::Url::parse(&url).unwrap();
1083
1084 let pathbuf = path::PathBuf::from(url.path());
1085 let pathbuf = autoindex(pathbuf, index);
1086
1087 let query_priority = priority_field_value_from_query_string(&url);
1090
1091 if let Some(p) = query_priority {
1092 priority = p.as_bytes().to_vec();
1093 }
1094
1095 let (status, body) = match decided_method {
1096 "GET" => {
1097 for c in pathbuf.components() {
1098 if let path::Component::Normal(v) = c {
1099 file_path.push(v)
1100 }
1101 }
1102
1103 match std::fs::read(file_path.as_path()) {
1104 Ok(data) => (200, data),
1105
1106 Err(_) => (404, b"Not Found!".to_vec()),
1107 }
1108 },
1109
1110 _ => (405, Vec::new()),
1111 };
1112
1113 let headers = vec![
1114 quiche::h3::Header::new(b":status", status.to_string().as_bytes()),
1115 quiche::h3::Header::new(b"server", b"quiche"),
1116 quiche::h3::Header::new(
1117 b"content-length",
1118 body.len().to_string().as_bytes(),
1119 ),
1120 ];
1121
1122 Ok((headers, body, priority))
1123 }
1124}
1125
1126impl HttpConn for Http3Conn {
1127 fn send_requests(
1128 &mut self, conn: &mut quiche::Connection, target_path: &Option<String>,
1129 ) {
1130 let mut reqs_done = 0;
1131
1132 for req in self.reqs.iter_mut().skip(self.reqs_hdrs_sent) {
1134 let s = match self.h3_conn.send_request(
1135 conn,
1136 &req.hdrs,
1137 self.body.is_none(),
1138 ) {
1139 Ok(v) => v,
1140
1141 Err(quiche::h3::Error::TransportError(
1142 quiche::Error::StreamLimit,
1143 )) => {
1144 debug!("not enough stream credits, retry later...");
1145 break;
1146 },
1147
1148 Err(quiche::h3::Error::StreamBlocked) => {
1149 debug!("stream is blocked, retry later...");
1150 break;
1151 },
1152
1153 Err(e) => {
1154 error!("failed to send request {:?}", e);
1155 break;
1156 },
1157 };
1158
1159 debug!("Sent HTTP request {:?}", &req.hdrs);
1160
1161 if let Some(priority) = &req.priority {
1162 self.h3_conn
1164 .send_priority_update_for_request(conn, s, priority)
1165 .ok();
1166 }
1167
1168 req.stream_id = Some(s);
1169 req.response_writer =
1170 make_resource_writer(&req.url, target_path, req.cardinal);
1171 self.sent_body_bytes.insert(s, 0);
1172
1173 reqs_done += 1;
1174 }
1175 self.reqs_hdrs_sent += reqs_done;
1176
1177 if let Some(body) = &self.body {
1179 for (stream_id, sent_bytes) in self.sent_body_bytes.iter_mut() {
1180 if *sent_bytes == body.len() {
1181 continue;
1182 }
1183
1184 let sent = match self.h3_conn.send_body(
1187 conn,
1188 *stream_id,
1189 &body[*sent_bytes..],
1190 true,
1191 ) {
1192 Ok(v) => v,
1193
1194 Err(quiche::h3::Error::Done) => 0,
1195
1196 Err(e) => {
1197 error!("failed to send request body {:?}", e);
1198 continue;
1199 },
1200 };
1201
1202 *sent_bytes += sent;
1203 }
1204 }
1205
1206 if let Some(ds) = self.dgram_sender.as_mut() {
1208 let mut dgrams_done = 0;
1209
1210 for _ in ds.dgrams_sent..ds.dgram_count {
1211 match send_h3_dgram(conn, ds.flow_id, ds.dgram_content.as_bytes())
1212 {
1213 Ok(v) => v,
1214
1215 Err(e) => {
1216 error!("failed to send dgram {:?}", e);
1217 break;
1218 },
1219 }
1220
1221 dgrams_done += 1;
1222 }
1223
1224 ds.dgrams_sent += dgrams_done;
1225 }
1226 }
1227
1228 fn handle_responses(
1229 &mut self, conn: &mut quiche::Connection, buf: &mut [u8],
1230 req_start: &std::time::Instant,
1231 ) {
1232 loop {
1233 match self.h3_conn.poll(conn) {
1234 Ok((stream_id, quiche::h3::Event::Headers { list, .. })) => {
1235 debug!(
1236 "got response headers {:?} on stream id {}",
1237 hdrs_to_strings(&list),
1238 stream_id
1239 );
1240
1241 let req = self
1242 .reqs
1243 .iter_mut()
1244 .find(|r| r.stream_id == Some(stream_id))
1245 .unwrap();
1246
1247 req.response_hdrs = list;
1248 },
1249
1250 Ok((stream_id, quiche::h3::Event::Data)) => {
1251 while let Ok(read) =
1252 self.h3_conn.recv_body(conn, stream_id, buf)
1253 {
1254 debug!(
1255 "got {} bytes of response data on stream {}",
1256 read, stream_id
1257 );
1258
1259 let req = self
1260 .reqs
1261 .iter_mut()
1262 .find(|r| r.stream_id == Some(stream_id))
1263 .unwrap();
1264
1265 let len = std::cmp::min(
1266 read,
1267 req.response_body_max - req.response_body.len(),
1268 );
1269 req.response_body.extend_from_slice(&buf[..len]);
1270
1271 match &mut req.response_writer {
1272 Some(rw) => {
1273 rw.write_all(&buf[..read]).ok();
1274 },
1275
1276 None =>
1277 if !self.dump_json {
1278 self.output_sink.borrow_mut()(unsafe {
1279 String::from_utf8_unchecked(
1280 buf[..read].to_vec(),
1281 )
1282 });
1283 },
1284 }
1285 }
1286 },
1287
1288 Ok((_stream_id, quiche::h3::Event::Finished)) => {
1289 self.reqs_complete += 1;
1290 let reqs_count = self.reqs.len();
1291
1292 debug!(
1293 "{}/{} responses received",
1294 self.reqs_complete, reqs_count
1295 );
1296
1297 if self.reqs_complete == reqs_count {
1298 info!(
1299 "{}/{} response(s) received in {:?}, closing...",
1300 self.reqs_complete,
1301 reqs_count,
1302 req_start.elapsed()
1303 );
1304
1305 if self.dump_json {
1306 dump_json(
1307 &self.reqs,
1308 &mut *self.output_sink.borrow_mut(),
1309 );
1310 }
1311
1312 match conn.close(true, 0x100, b"kthxbye") {
1313 Ok(_) | Err(quiche::Error::Done) => (),
1315
1316 Err(e) => panic!("error closing conn: {:?}", e),
1317 }
1318
1319 break;
1320 }
1321 },
1322
1323 Ok((_stream_id, quiche::h3::Event::Reset(e))) => {
1324 error!("request was reset by peer with {}, closing...", e);
1325
1326 match conn.close(true, 0x100, b"kthxbye") {
1327 Ok(_) | Err(quiche::Error::Done) => (),
1329
1330 Err(e) => panic!("error closing conn: {:?}", e),
1331 }
1332
1333 break;
1334 },
1335
1336 Ok((
1337 prioritized_element_id,
1338 quiche::h3::Event::PriorityUpdate,
1339 )) => {
1340 info!(
1341 "{} PRIORITY_UPDATE triggered for element ID={}",
1342 conn.trace_id(),
1343 prioritized_element_id
1344 );
1345 },
1346
1347 Ok((goaway_id, quiche::h3::Event::GoAway)) => {
1348 info!(
1349 "{} got GOAWAY with ID {} ",
1350 conn.trace_id(),
1351 goaway_id
1352 );
1353 },
1354
1355 Err(quiche::h3::Error::Done) => {
1356 break;
1357 },
1358
1359 Err(e) => {
1360 error!("HTTP/3 processing failed: {:?}", e);
1361
1362 break;
1363 },
1364 }
1365 }
1366
1367 while let Ok(len) = conn.dgram_recv(buf) {
1369 let mut b = octets::Octets::with_slice(buf);
1370 if let Ok(flow_id) = b.get_varint() {
1371 info!(
1372 "Received DATAGRAM flow_id={} len={} data={:?}",
1373 flow_id,
1374 len,
1375 buf[b.off()..len].to_vec()
1376 );
1377 }
1378 }
1379 }
1380
1381 fn report_incomplete(&self, start: &std::time::Instant) -> bool {
1382 if self.reqs_complete != self.reqs.len() {
1383 error!(
1384 "connection timed out after {:?} and only completed {}/{} requests",
1385 start.elapsed(),
1386 self.reqs_complete,
1387 self.reqs.len()
1388 );
1389
1390 if self.dump_json {
1391 dump_json(&self.reqs, &mut *self.output_sink.borrow_mut());
1392 }
1393
1394 return true;
1395 }
1396
1397 false
1398 }
1399
1400 fn handle_requests(
1401 &mut self, conn: &mut quiche::Connection,
1402 _partial_requests: &mut HashMap<u64, PartialRequest>,
1403 partial_responses: &mut HashMap<u64, PartialResponse>, root: &str,
1404 index: &str, buf: &mut [u8],
1405 ) -> quiche::h3::Result<()> {
1406 loop {
1411 match self.h3_conn.poll(conn) {
1412 Ok((stream_id, quiche::h3::Event::Headers { list, .. })) => {
1413 info!(
1414 "{} got request {:?} on stream id {}",
1415 conn.trace_id(),
1416 hdrs_to_strings(&list),
1417 stream_id
1418 );
1419
1420 self.largest_processed_request =
1421 std::cmp::max(self.largest_processed_request, stream_id);
1422
1423 conn.stream_shutdown(stream_id, quiche::Shutdown::Read, 0)
1428 .unwrap();
1429
1430 let (mut headers, body, mut priority) =
1431 match Http3Conn::build_h3_response(root, index, &list) {
1432 Ok(v) => v,
1433
1434 Err((error_code, _)) => {
1435 conn.stream_shutdown(
1436 stream_id,
1437 quiche::Shutdown::Write,
1438 error_code,
1439 )
1440 .unwrap();
1441 continue;
1442 },
1443 };
1444
1445 match self.h3_conn.take_last_priority_update(stream_id) {
1446 Ok(v) => {
1447 priority = v;
1448 },
1449
1450 Err(quiche::h3::Error::Done) => (),
1451
1452 Err(e) => error!(
1453 "{} error taking PRIORITY_UPDATE {}",
1454 conn.trace_id(),
1455 e
1456 ),
1457 }
1458
1459 if !priority.is_empty() {
1460 headers.push(quiche::h3::Header::new(
1461 b"priority",
1462 priority.as_slice(),
1463 ));
1464 }
1465
1466 #[cfg(feature = "sfv")]
1467 let priority =
1468 quiche::h3::Priority::try_from(priority.as_slice())
1469 .unwrap_or_default();
1470
1471 #[cfg(not(feature = "sfv"))]
1472 let priority = quiche::h3::Priority::default();
1473
1474 info!(
1475 "{} prioritizing response on stream {} as {:?}",
1476 conn.trace_id(),
1477 stream_id,
1478 priority
1479 );
1480
1481 match self.h3_conn.send_response_with_priority(
1482 conn, stream_id, &headers, &priority, false,
1483 ) {
1484 Ok(v) => v,
1485
1486 Err(quiche::h3::Error::StreamBlocked) => {
1487 let response = PartialResponse {
1488 headers: Some(headers),
1489 priority: Some(priority),
1490 body,
1491 written: 0,
1492 };
1493
1494 partial_responses.insert(stream_id, response);
1495 continue;
1496 },
1497
1498 Err(e) => {
1499 error!(
1500 "{} stream send failed {:?}",
1501 conn.trace_id(),
1502 e
1503 );
1504
1505 break;
1506 },
1507 }
1508
1509 let response = PartialResponse {
1510 headers: None,
1511 priority: None,
1512 body,
1513 written: 0,
1514 };
1515
1516 partial_responses.insert(stream_id, response);
1517 },
1518
1519 Ok((stream_id, quiche::h3::Event::Data)) => {
1520 info!(
1521 "{} got data on stream id {}",
1522 conn.trace_id(),
1523 stream_id
1524 );
1525 },
1526
1527 Ok((_stream_id, quiche::h3::Event::Finished)) => (),
1528
1529 Ok((_stream_id, quiche::h3::Event::Reset { .. })) => (),
1530
1531 Ok((
1532 prioritized_element_id,
1533 quiche::h3::Event::PriorityUpdate,
1534 )) => {
1535 info!(
1536 "{} PRIORITY_UPDATE triggered for element ID={}",
1537 conn.trace_id(),
1538 prioritized_element_id
1539 );
1540 },
1541
1542 Ok((goaway_id, quiche::h3::Event::GoAway)) => {
1543 trace!(
1544 "{} got GOAWAY with ID {} ",
1545 conn.trace_id(),
1546 goaway_id
1547 );
1548 self.h3_conn
1549 .send_goaway(conn, self.largest_processed_request)?;
1550 },
1551
1552 Err(quiche::h3::Error::Done) => {
1553 break;
1554 },
1555
1556 Err(e) => {
1557 error!("{} HTTP/3 error {:?}", conn.trace_id(), e);
1558
1559 return Err(e);
1560 },
1561 }
1562 }
1563
1564 for stream_id in writable_response_streams(conn) {
1566 self.handle_writable(conn, partial_responses, stream_id);
1567 }
1568
1569 while let Ok(len) = conn.dgram_recv(buf) {
1571 let mut b = octets::Octets::with_slice(buf);
1572 if let Ok(flow_id) = b.get_varint() {
1573 info!(
1574 "Received DATAGRAM flow_id={} len={} data={:?}",
1575 flow_id,
1576 len,
1577 buf[b.off()..len].to_vec()
1578 );
1579 }
1580 }
1581
1582 if let Some(ds) = self.dgram_sender.as_mut() {
1583 let mut dgrams_done = 0;
1584
1585 for _ in ds.dgrams_sent..ds.dgram_count {
1586 match send_h3_dgram(conn, ds.flow_id, ds.dgram_content.as_bytes())
1587 {
1588 Ok(v) => v,
1589
1590 Err(e) => {
1591 error!("failed to send dgram {:?}", e);
1592 break;
1593 },
1594 }
1595
1596 dgrams_done += 1;
1597 }
1598
1599 ds.dgrams_sent += dgrams_done;
1600 }
1601
1602 Ok(())
1603 }
1604
1605 fn handle_writable(
1606 &mut self, conn: &mut quiche::Connection,
1607 partial_responses: &mut HashMap<u64, PartialResponse>, stream_id: u64,
1608 ) {
1609 debug!(
1610 "{} response stream {} is writable with capacity {:?}",
1611 conn.trace_id(),
1612 stream_id,
1613 conn.stream_capacity(stream_id)
1614 );
1615
1616 if !partial_responses.contains_key(&stream_id) {
1617 return;
1618 }
1619
1620 let resp = partial_responses.get_mut(&stream_id).unwrap();
1621
1622 if let (Some(headers), Some(priority)) = (&resp.headers, &resp.priority) {
1623 match self.h3_conn.send_response_with_priority(
1624 conn, stream_id, headers, priority, false,
1625 ) {
1626 Ok(_) => (),
1627
1628 Err(quiche::h3::Error::StreamBlocked) => {
1629 return;
1630 },
1631
1632 Err(e) => {
1633 error!("{} stream send failed {:?}", conn.trace_id(), e);
1634 return;
1635 },
1636 }
1637 }
1638
1639 resp.headers = None;
1640 resp.priority = None;
1641
1642 let body = &resp.body[resp.written..];
1643
1644 let written = match self.h3_conn.send_body(conn, stream_id, body, true) {
1645 Ok(v) => v,
1646
1647 Err(quiche::h3::Error::Done) => 0,
1648
1649 Err(e) => {
1650 partial_responses.remove(&stream_id);
1651
1652 error!("{} stream send failed {:?}", conn.trace_id(), e);
1653 return;
1654 },
1655 };
1656
1657 resp.written += written;
1658
1659 if resp.written == resp.body.len() {
1660 partial_responses.remove(&stream_id);
1661 }
1662 }
1663}