1use std::io::prelude::*;
33
34use std::collections::HashMap;
35
36#[cfg(feature = "sfv")]
37use std::convert::TryFrom;
38
39use std::fmt::Write as _;
40
41use std::rc::Rc;
42
43use std::cell::RefCell;
44
45use std::path;
46
47use ring::rand::SecureRandom;
48
49use quiche::ConnectionId;
50
51use quiche::h3::NameValue;
52use quiche::h3::Priority;
53
54pub fn stdout_sink(out: String) {
55 print!("{out}");
56}
57
58const H3_MESSAGE_ERROR: u64 = 0x10E;
59
60pub mod alpns {
64 pub const HTTP_09: [&[u8]; 2] = [b"hq-interop", b"http/0.9"];
65 pub const HTTP_3: [&[u8]; 1] = [b"h3"];
66}
67
68pub struct PartialRequest {
69 pub req: Vec<u8>,
70}
71
72pub struct PartialResponse {
73 pub headers: Option<Vec<quiche::h3::Header>>,
74 pub priority: Option<quiche::h3::Priority>,
75
76 pub body: Vec<u8>,
77
78 pub written: usize,
79}
80
81pub type ClientId = u64;
82
83pub struct Client {
84 pub conn: quiche::Connection,
85
86 pub http_conn: Option<Box<dyn HttpConn>>,
87
88 pub client_id: ClientId,
89
90 pub app_proto_selected: bool,
91
92 pub partial_requests: std::collections::HashMap<u64, PartialRequest>,
93
94 pub partial_responses: std::collections::HashMap<u64, PartialResponse>,
95
96 pub max_datagram_size: usize,
97
98 pub loss_rate: f64,
99
100 pub max_send_burst: usize,
101}
102
103pub type ClientIdMap = HashMap<ConnectionId<'static>, ClientId>;
104pub type ClientMap = HashMap<ClientId, Client>;
105
106fn make_resource_writer(
113 url: &url::Url, target_path: &Option<String>, cardinal: u64,
114) -> Option<std::io::BufWriter<std::fs::File>> {
115 if let Some(tp) = target_path {
116 let resource =
117 url.path_segments().map(|c| c.collect::<Vec<_>>()).unwrap();
118
119 let mut path = format!("{}/{}", tp, resource.iter().last().unwrap());
120
121 if cardinal > 1 {
122 path = format!("{path}.{cardinal}");
123 }
124
125 match std::fs::File::create(&path) {
126 Ok(f) => return Some(std::io::BufWriter::new(f)),
127
128 Err(e) => panic!(
129 "Error creating file for {url}, attempted path was {path}: {e}"
130 ),
131 }
132 }
133
134 None
135}
136
137fn autoindex(path: path::PathBuf, index: &str) -> path::PathBuf {
138 if let Some(path_str) = path.to_str() {
139 if path_str.ends_with('/') {
140 let path_str = format!("{path_str}{index}");
141 return path::PathBuf::from(&path_str);
142 }
143 }
144
145 path
146}
147
148pub fn make_qlog_writer(
150 dir: &std::ffi::OsStr, role: &str, id: &str,
151) -> std::io::BufWriter<std::fs::File> {
152 let mut path = std::path::PathBuf::from(dir);
153 let filename = format!("{role}-{id}.sqlog");
154 path.push(filename);
155
156 match std::fs::File::create(&path) {
157 Ok(f) => std::io::BufWriter::new(f),
158
159 Err(e) =>
160 panic!("Error creating qlog file attempted path was {path:?}: {e}"),
161 }
162}
163
164fn dump_json(reqs: &[Http3Request], output_sink: &mut dyn FnMut(String)) {
165 let mut out = String::new();
166
167 writeln!(out, "{{").unwrap();
168 writeln!(out, " \"entries\": [").unwrap();
169 let mut reqs = reqs.iter().peekable();
170
171 while let Some(req) = reqs.next() {
172 writeln!(out, " {{").unwrap();
173 writeln!(out, " \"request\":{{").unwrap();
174 writeln!(out, " \"headers\":[").unwrap();
175
176 let mut req_hdrs = req.hdrs.iter().peekable();
177 while let Some(h) = req_hdrs.next() {
178 writeln!(out, " {{").unwrap();
179 writeln!(
180 out,
181 " \"name\": \"{}\",",
182 std::str::from_utf8(h.name()).unwrap()
183 )
184 .unwrap();
185 writeln!(
186 out,
187 " \"value\": \"{}\"",
188 std::str::from_utf8(h.value()).unwrap().replace('"', "\\\"")
189 )
190 .unwrap();
191
192 if req_hdrs.peek().is_some() {
193 writeln!(out, " }},").unwrap();
194 } else {
195 writeln!(out, " }}").unwrap();
196 }
197 }
198 writeln!(out, " ]}},").unwrap();
199
200 writeln!(out, " \"response\":{{").unwrap();
201 writeln!(out, " \"headers\":[").unwrap();
202
203 let mut response_hdrs = req.response_hdrs.iter().peekable();
204 while let Some(h) = response_hdrs.next() {
205 writeln!(out, " {{").unwrap();
206 writeln!(
207 out,
208 " \"name\": \"{}\",",
209 std::str::from_utf8(h.name()).unwrap()
210 )
211 .unwrap();
212 writeln!(
213 out,
214 " \"value\": \"{}\"",
215 std::str::from_utf8(h.value()).unwrap().replace('"', "\\\"")
216 )
217 .unwrap();
218
219 if response_hdrs.peek().is_some() {
220 writeln!(out, " }},").unwrap();
221 } else {
222 writeln!(out, " }}").unwrap();
223 }
224 }
225 writeln!(out, " ],").unwrap();
226 writeln!(out, " \"body\": {:?}", req.response_body).unwrap();
227 writeln!(out, " }}").unwrap();
228
229 if reqs.peek().is_some() {
230 writeln!(out, "}},").unwrap();
231 } else {
232 writeln!(out, "}}").unwrap();
233 }
234 }
235 writeln!(out, "]").unwrap();
236 writeln!(out, "}}").unwrap();
237
238 output_sink(out);
239}
240
241pub fn hdrs_to_strings(hdrs: &[quiche::h3::Header]) -> Vec<(String, String)> {
242 hdrs.iter()
243 .map(|h| {
244 let name = String::from_utf8_lossy(h.name()).to_string();
245 let value = String::from_utf8_lossy(h.value()).to_string();
246
247 (name, value)
248 })
249 .collect()
250}
251
252pub fn generate_cid_and_reset_token<T: SecureRandom>(
254 rng: &T,
255) -> (quiche::ConnectionId<'static>, u128) {
256 let mut scid = [0; quiche::MAX_CONN_ID_LEN];
257 rng.fill(&mut scid).unwrap();
258 let scid = scid.to_vec().into();
259 let mut reset_token = [0; 16];
260 rng.fill(&mut reset_token).unwrap();
261 let reset_token = u128::from_be_bytes(reset_token);
262 (scid, reset_token)
263}
264
265pub fn priority_field_value_from_query_string(url: &url::Url) -> Option<String> {
267 let mut priority = "".to_string();
268 for param in url.query_pairs() {
269 if param.0 == "u" {
270 write!(priority, "{}={},", param.0, param.1).ok();
271 }
272
273 if param.0 == "i" && param.1 == "1" {
274 priority.push_str("i,");
275 }
276 }
277
278 if !priority.is_empty() {
279 priority.pop();
281
282 Some(priority)
283 } else {
284 None
285 }
286}
287
288pub fn priority_from_query_string(url: &url::Url) -> Option<Priority> {
290 let mut urgency = None;
291 let mut incremental = None;
292 for param in url.query_pairs() {
293 if param.0 == "u" {
294 urgency = Some(param.1.parse::<u8>().unwrap());
295 }
296
297 if param.0 == "i" && param.1 == "1" {
298 incremental = Some(true);
299 }
300 }
301
302 match (urgency, incremental) {
303 (Some(u), Some(i)) => Some(Priority::new(u, i)),
304
305 (Some(u), None) => Some(Priority::new(u, false)),
306
307 (None, Some(i)) => Some(Priority::new(3, i)),
308
309 (None, None) => None,
310 }
311}
312
313fn send_h3_dgram(
314 conn: &mut quiche::Connection, flow_id: u64, dgram_content: &[u8],
315) -> quiche::Result<()> {
316 info!(
317 "sending HTTP/3 DATAGRAM on flow_id={flow_id} with data {dgram_content:?}"
318 );
319
320 let len = octets::varint_len(flow_id) + dgram_content.len();
321 let mut d = vec![0; len];
322 let mut b = octets::OctetsMut::with_slice(&mut d);
323
324 b.put_varint(flow_id)
325 .map_err(|_| quiche::Error::BufferTooShort)?;
326 b.put_bytes(dgram_content)
327 .map_err(|_| quiche::Error::BufferTooShort)?;
328
329 conn.dgram_send(&d)
330}
331
332pub trait HttpConn {
333 fn send_requests(
334 &mut self, conn: &mut quiche::Connection, target_path: &Option<String>,
335 );
336
337 fn handle_responses(
338 &mut self, conn: &mut quiche::Connection, buf: &mut [u8],
339 req_start: &std::time::Instant,
340 );
341
342 fn report_incomplete(&self, start: &std::time::Instant) -> bool;
343
344 fn handle_requests(
345 &mut self, conn: &mut quiche::Connection,
346 partial_requests: &mut HashMap<u64, PartialRequest>,
347 partial_responses: &mut HashMap<u64, PartialResponse>, root: &str,
348 index: &str, buf: &mut [u8],
349 ) -> quiche::h3::Result<()>;
350
351 fn handle_writable(
352 &mut self, conn: &mut quiche::Connection,
353 partial_responses: &mut HashMap<u64, PartialResponse>, stream_id: u64,
354 );
355}
356
357pub fn writable_response_streams(
358 conn: &quiche::Connection,
359) -> impl Iterator<Item = u64> {
360 conn.writable().filter(|id| id % 4 == 0)
361}
362
363pub struct Http09Request {
365 url: url::Url,
366 cardinal: u64,
367 request_line: String,
368 stream_id: Option<u64>,
369 response_writer: Option<std::io::BufWriter<std::fs::File>>,
370}
371
372struct Http3Request {
374 url: url::Url,
375 cardinal: u64,
376 stream_id: Option<u64>,
377 hdrs: Vec<quiche::h3::Header>,
378 priority: Option<Priority>,
379 response_hdrs: Vec<quiche::h3::Header>,
380 response_body: Vec<u8>,
381 response_body_max: usize,
382 response_writer: Option<std::io::BufWriter<std::fs::File>>,
383}
384
385type Http3ResponseBuilderResult = std::result::Result<
386 (Vec<quiche::h3::Header>, Vec<u8>, Vec<u8>),
387 (u64, String),
388>;
389
390pub struct Http09Conn {
391 stream_id: u64,
392 reqs_sent: usize,
393 reqs_complete: usize,
394 reqs: Vec<Http09Request>,
395 output_sink: Rc<RefCell<dyn FnMut(String)>>,
396}
397
398impl Default for Http09Conn {
399 fn default() -> Self {
400 Http09Conn {
401 stream_id: Default::default(),
402 reqs_sent: Default::default(),
403 reqs_complete: Default::default(),
404 reqs: Default::default(),
405 output_sink: Rc::new(RefCell::new(stdout_sink)),
406 }
407 }
408}
409
410impl Http09Conn {
411 pub fn with_urls(
412 urls: &[url::Url], reqs_cardinal: u64,
413 output_sink: Rc<RefCell<dyn FnMut(String)>>,
414 ) -> Box<dyn HttpConn> {
415 let mut reqs = Vec::new();
416 for url in urls {
417 for i in 1..=reqs_cardinal {
418 let request_line = format!("GET {}\r\n", url.path());
419 reqs.push(Http09Request {
420 url: url.clone(),
421 cardinal: i,
422 request_line,
423 stream_id: None,
424 response_writer: None,
425 });
426 }
427 }
428
429 let h_conn = Http09Conn {
430 stream_id: 0,
431 reqs_sent: 0,
432 reqs_complete: 0,
433 reqs,
434 output_sink,
435 };
436
437 Box::new(h_conn)
438 }
439}
440
441impl HttpConn for Http09Conn {
442 fn send_requests(
443 &mut self, conn: &mut quiche::Connection, target_path: &Option<String>,
444 ) {
445 let mut reqs_done = 0;
446
447 for req in self.reqs.iter_mut().skip(self.reqs_sent) {
448 match conn.stream_send(
449 self.stream_id,
450 req.request_line.as_bytes(),
451 true,
452 ) {
453 Ok(v) => v,
454
455 Err(quiche::Error::StreamLimit) => {
456 debug!("not enough stream credits, retry later...");
457 break;
458 },
459
460 Err(e) => {
461 error!("failed to send request {e:?}");
462 break;
463 },
464 };
465
466 debug!("sending HTTP request {:?}", req.request_line);
467
468 req.stream_id = Some(self.stream_id);
469 req.response_writer =
470 make_resource_writer(&req.url, target_path, req.cardinal);
471
472 self.stream_id += 4;
473
474 reqs_done += 1;
475 }
476
477 self.reqs_sent += reqs_done;
478 }
479
480 fn handle_responses(
481 &mut self, conn: &mut quiche::Connection, buf: &mut [u8],
482 req_start: &std::time::Instant,
483 ) {
484 for s in conn.readable() {
486 while let Ok((read, fin)) = conn.stream_recv(s, buf) {
487 trace!("received {read} bytes");
488
489 let stream_buf = &buf[..read];
490
491 trace!(
492 "stream {} has {} bytes (fin? {})",
493 s,
494 stream_buf.len(),
495 fin
496 );
497
498 let req = self
499 .reqs
500 .iter_mut()
501 .find(|r| r.stream_id == Some(s))
502 .unwrap();
503
504 match &mut req.response_writer {
505 Some(rw) => {
506 rw.write_all(&buf[..read]).ok();
507 },
508
509 None => {
510 self.output_sink.borrow_mut()(unsafe {
511 String::from_utf8_unchecked(stream_buf.to_vec())
512 });
513 },
514 }
515
516 if &s % 4 == 0 && fin {
522 self.reqs_complete += 1;
523 let reqs_count = self.reqs.len();
524
525 debug!(
526 "{}/{} responses received",
527 self.reqs_complete, reqs_count
528 );
529
530 if self.reqs_complete == reqs_count {
531 info!(
532 "{}/{} response(s) received in {:?}, closing...",
533 self.reqs_complete,
534 reqs_count,
535 req_start.elapsed()
536 );
537
538 match conn.close(true, 0x00, b"kthxbye") {
539 Ok(_) | Err(quiche::Error::Done) => (),
541
542 Err(e) => panic!("error closing conn: {e:?}"),
543 }
544
545 break;
546 }
547 }
548 }
549 }
550 }
551
552 fn report_incomplete(&self, start: &std::time::Instant) -> bool {
553 if self.reqs_complete != self.reqs.len() {
554 error!(
555 "connection timed out after {:?} and only completed {}/{} requests",
556 start.elapsed(),
557 self.reqs_complete,
558 self.reqs.len()
559 );
560
561 return true;
562 }
563
564 false
565 }
566
567 fn handle_requests(
568 &mut self, conn: &mut quiche::Connection,
569 partial_requests: &mut HashMap<u64, PartialRequest>,
570 partial_responses: &mut HashMap<u64, PartialResponse>, root: &str,
571 index: &str, buf: &mut [u8],
572 ) -> quiche::h3::Result<()> {
573 for s in conn.readable() {
575 while let Ok((read, fin)) = conn.stream_recv(s, buf) {
576 trace!("{} received {} bytes", conn.trace_id(), read);
577
578 let stream_buf = &buf[..read];
579
580 trace!(
581 "{} stream {} has {} bytes (fin? {})",
582 conn.trace_id(),
583 s,
584 stream_buf.len(),
585 fin
586 );
587
588 let stream_buf =
589 if let Some(partial) = partial_requests.get_mut(&s) {
590 partial.req.extend_from_slice(stream_buf);
591
592 if !partial.req.ends_with(b"\r\n") {
593 return Ok(());
594 }
595
596 &partial.req
597 } else {
598 if !stream_buf.ends_with(b"\r\n") {
599 let request = PartialRequest {
600 req: stream_buf.to_vec(),
601 };
602
603 partial_requests.insert(s, request);
604 return Ok(());
605 }
606
607 stream_buf
608 };
609
610 if stream_buf.starts_with(b"GET ") {
611 let uri = &stream_buf[4..stream_buf.len() - 2];
612 let uri = String::from_utf8(uri.to_vec()).unwrap();
613 let uri = String::from(uri.lines().next().unwrap());
614 let uri = path::Path::new(&uri);
615 let mut path = path::PathBuf::from(root);
616
617 partial_requests.remove(&s);
618
619 for c in uri.components() {
620 if let path::Component::Normal(v) = c {
621 path.push(v)
622 }
623 }
624
625 path = autoindex(path, index);
626
627 info!(
628 "{} got GET request for {:?} on stream {}",
629 conn.trace_id(),
630 path,
631 s
632 );
633
634 let body = std::fs::read(path.as_path())
635 .unwrap_or_else(|_| b"Not Found!\r\n".to_vec());
636
637 info!(
638 "{} sending response of size {} on stream {}",
639 conn.trace_id(),
640 body.len(),
641 s
642 );
643
644 let written = match conn.stream_send(s, &body, true) {
645 Ok(v) => v,
646
647 Err(quiche::Error::Done) => 0,
648
649 Err(e) => {
650 error!(
651 "{} stream send failed {:?}",
652 conn.trace_id(),
653 e
654 );
655 return Err(From::from(e));
656 },
657 };
658
659 if written < body.len() {
660 let response = PartialResponse {
661 headers: None,
662 priority: None,
663 body,
664 written,
665 };
666
667 partial_responses.insert(s, response);
668 }
669 }
670 }
671 }
672
673 Ok(())
674 }
675
676 fn handle_writable(
677 &mut self, conn: &mut quiche::Connection,
678 partial_responses: &mut HashMap<u64, PartialResponse>, stream_id: u64,
679 ) {
680 let stream_cap = conn.stream_capacity(stream_id);
681
682 debug!(
683 "{} response stream {} is writable with capacity {:?}",
684 conn.trace_id(),
685 stream_id,
686 stream_cap,
687 );
688
689 if !partial_responses.contains_key(&stream_id) {
690 return;
691 }
692
693 let resp = partial_responses.get_mut(&stream_id).unwrap();
694 let body = &resp.body[resp.written..];
695
696 let written = match conn.stream_send(stream_id, body, true) {
697 Ok(v) => v,
698
699 Err(quiche::Error::Done) => 0,
700
701 Err(e) => {
702 partial_responses.remove(&stream_id);
703
704 error!("{} stream send failed {:?}", conn.trace_id(), e);
705 return;
706 },
707 };
708
709 resp.written += written;
710
711 if resp.written == resp.body.len() {
712 partial_responses.remove(&stream_id);
713 }
714 }
715}
716
717pub struct Http3DgramSender {
718 dgram_count: u64,
719 pub dgram_content: String,
720 pub flow_id: u64,
721 pub dgrams_sent: u64,
722}
723
724impl Http3DgramSender {
725 pub fn new(dgram_count: u64, dgram_content: String, flow_id: u64) -> Self {
726 Self {
727 dgram_count,
728 dgram_content,
729 flow_id,
730 dgrams_sent: 0,
731 }
732 }
733}
734
735fn make_h3_config(
736 max_field_section_size: Option<u64>, qpack_max_table_capacity: Option<u64>,
737 qpack_blocked_streams: Option<u64>,
738) -> quiche::h3::Config {
739 let mut config = quiche::h3::Config::new().unwrap();
740
741 if let Some(v) = max_field_section_size {
742 config.set_max_field_section_size(v);
743 }
744
745 if let Some(v) = qpack_max_table_capacity {
746 config.set_qpack_max_table_capacity(v.clamp(0, 0));
748 }
749
750 if let Some(v) = qpack_blocked_streams {
751 config.set_qpack_blocked_streams(v.clamp(0, 0));
753 }
754
755 config
756}
757
758pub struct Http3Conn {
759 h3_conn: quiche::h3::Connection,
760 reqs_hdrs_sent: usize,
761 reqs_complete: usize,
762 largest_processed_request: u64,
763 reqs: Vec<Http3Request>,
764 body: Option<Vec<u8>>,
765 sent_body_bytes: HashMap<u64, usize>,
766 dump_json: bool,
767 dgram_sender: Option<Http3DgramSender>,
768 output_sink: Rc<RefCell<dyn FnMut(String)>>,
769}
770
771impl Http3Conn {
772 #[allow(clippy::too_many_arguments)]
773 pub fn with_urls(
774 conn: &mut quiche::Connection, urls: &[url::Url], reqs_cardinal: u64,
775 req_headers: &[String], body: &Option<Vec<u8>>, method: &str,
776 send_priority_update: bool, max_field_section_size: Option<u64>,
777 qpack_max_table_capacity: Option<u64>,
778 qpack_blocked_streams: Option<u64>, dump_json: Option<usize>,
779 dgram_sender: Option<Http3DgramSender>,
780 output_sink: Rc<RefCell<dyn FnMut(String)>>,
781 ) -> Box<dyn HttpConn> {
782 let mut reqs = Vec::new();
783 for url in urls {
784 for i in 1..=reqs_cardinal {
785 let authority = match url.port() {
786 Some(port) => format!("{}:{}", url.host_str().unwrap(), port),
787
788 None => url.host_str().unwrap().to_string(),
789 };
790
791 let mut hdrs = vec![
792 quiche::h3::Header::new(b":method", method.as_bytes()),
793 quiche::h3::Header::new(b":scheme", url.scheme().as_bytes()),
794 quiche::h3::Header::new(b":authority", authority.as_bytes()),
795 quiche::h3::Header::new(
796 b":path",
797 url[url::Position::BeforePath..].as_bytes(),
798 ),
799 quiche::h3::Header::new(b"user-agent", b"quiche"),
800 ];
801
802 let priority = if send_priority_update {
803 priority_from_query_string(url)
804 } else {
805 None
806 };
807
808 for header in req_headers {
810 let header_split: Vec<&str> =
811 header.splitn(2, ": ").collect();
812
813 if header_split.len() != 2 {
814 panic!("malformed header provided - \"{header}\"");
815 }
816
817 hdrs.push(quiche::h3::Header::new(
818 header_split[0].as_bytes(),
819 header_split[1].as_bytes(),
820 ));
821 }
822
823 if body.is_some() {
824 hdrs.push(quiche::h3::Header::new(
825 b"content-length",
826 body.as_ref().unwrap().len().to_string().as_bytes(),
827 ));
828 }
829
830 reqs.push(Http3Request {
831 url: url.clone(),
832 cardinal: i,
833 hdrs,
834 priority,
835 response_hdrs: Vec::new(),
836 response_body: Vec::new(),
837 response_body_max: dump_json.unwrap_or_default(),
838 stream_id: None,
839 response_writer: None,
840 });
841 }
842 }
843
844 let h_conn = Http3Conn {
845 h3_conn: quiche::h3::Connection::with_transport(
846 conn,
847 &make_h3_config(
848 max_field_section_size,
849 qpack_max_table_capacity,
850 qpack_blocked_streams,
851 ),
852 ).expect("Unable to create HTTP/3 connection, check the server's uni stream limit and window size"),
853 reqs_hdrs_sent: 0,
854 reqs_complete: 0,
855 largest_processed_request: 0,
856 reqs,
857 body: body.as_ref().map(|b| b.to_vec()),
858 sent_body_bytes: HashMap::new(),
859 dump_json: dump_json.is_some(),
860 dgram_sender,
861 output_sink,
862 };
863
864 Box::new(h_conn)
865 }
866
867 pub fn with_conn(
868 conn: &mut quiche::Connection, max_field_section_size: Option<u64>,
869 qpack_max_table_capacity: Option<u64>,
870 qpack_blocked_streams: Option<u64>,
871 dgram_sender: Option<Http3DgramSender>,
872 output_sink: Rc<RefCell<dyn FnMut(String)>>,
873 ) -> std::result::Result<Box<dyn HttpConn>, String> {
874 let h3_conn = quiche::h3::Connection::with_transport(
875 conn,
876 &make_h3_config(
877 max_field_section_size,
878 qpack_max_table_capacity,
879 qpack_blocked_streams,
880 ),
881 ).map_err(|_| "Unable to create HTTP/3 connection, check the client's uni stream limit and window size")?;
882
883 let h_conn = Http3Conn {
884 h3_conn,
885 reqs_hdrs_sent: 0,
886 reqs_complete: 0,
887 largest_processed_request: 0,
888 reqs: Vec::new(),
889 body: None,
890 sent_body_bytes: HashMap::new(),
891 dump_json: false,
892 dgram_sender,
893 output_sink,
894 };
895
896 Ok(Box::new(h_conn))
897 }
898
899 fn build_h3_response(
901 root: &str, index: &str, request: &[quiche::h3::Header],
902 ) -> Http3ResponseBuilderResult {
903 let mut file_path = path::PathBuf::from(root);
904 let mut scheme = None;
905 let mut authority = None;
906 let mut host = None;
907 let mut path = None;
908 let mut method = None;
909 let mut priority = vec![];
910
911 for hdr in request {
913 match hdr.name() {
914 b":scheme" => {
915 if scheme.is_some() {
916 return Err((
917 H3_MESSAGE_ERROR,
918 ":scheme cannot be duplicated".to_string(),
919 ));
920 }
921
922 scheme = Some(std::str::from_utf8(hdr.value()).unwrap());
923 },
924
925 b":authority" => {
926 if authority.is_some() {
927 return Err((
928 H3_MESSAGE_ERROR,
929 ":authority cannot be duplicated".to_string(),
930 ));
931 }
932
933 authority = Some(std::str::from_utf8(hdr.value()).unwrap());
934 },
935
936 b":path" => {
937 if path.is_some() {
938 return Err((
939 H3_MESSAGE_ERROR,
940 ":path cannot be duplicated".to_string(),
941 ));
942 }
943
944 path = Some(std::str::from_utf8(hdr.value()).unwrap())
945 },
946
947 b":method" => {
948 if method.is_some() {
949 return Err((
950 H3_MESSAGE_ERROR,
951 ":method cannot be duplicated".to_string(),
952 ));
953 }
954
955 method = Some(std::str::from_utf8(hdr.value()).unwrap())
956 },
957
958 b":protocol" => {
959 return Err((
960 H3_MESSAGE_ERROR,
961 ":protocol not supported".to_string(),
962 ));
963 },
964
965 b"priority" => priority = hdr.value().to_vec(),
966
967 b"host" => host = Some(std::str::from_utf8(hdr.value()).unwrap()),
968
969 _ => (),
970 }
971 }
972
973 let decided_method = match method {
974 Some(method) => {
975 match method {
976 "" =>
977 return Err((
978 H3_MESSAGE_ERROR,
979 ":method value cannot be empty".to_string(),
980 )),
981
982 "CONNECT" => {
983 let headers = vec![
985 quiche::h3::Header::new(
986 b":status",
987 "405".to_string().as_bytes(),
988 ),
989 quiche::h3::Header::new(b"server", b"quiche"),
990 ];
991
992 return Ok((headers, b"".to_vec(), Default::default()));
993 },
994
995 _ => method,
996 }
997 },
998
999 None =>
1000 return Err((
1001 H3_MESSAGE_ERROR,
1002 ":method cannot be missing".to_string(),
1003 )),
1004 };
1005
1006 let decided_scheme = match scheme {
1007 Some(scheme) => {
1008 if scheme != "http" && scheme != "https" {
1009 let headers = vec![
1010 quiche::h3::Header::new(
1011 b":status",
1012 "400".to_string().as_bytes(),
1013 ),
1014 quiche::h3::Header::new(b"server", b"quiche"),
1015 ];
1016
1017 return Ok((
1018 headers,
1019 b"Invalid scheme".to_vec(),
1020 Default::default(),
1021 ));
1022 }
1023
1024 scheme
1025 },
1026
1027 None =>
1028 return Err((
1029 H3_MESSAGE_ERROR,
1030 ":scheme cannot be missing".to_string(),
1031 )),
1032 };
1033
1034 let decided_host = match (authority, host) {
1035 (None, Some("")) =>
1036 return Err((
1037 H3_MESSAGE_ERROR,
1038 "host value cannot be empty".to_string(),
1039 )),
1040
1041 (Some(""), None) =>
1042 return Err((
1043 H3_MESSAGE_ERROR,
1044 ":authority value cannot be empty".to_string(),
1045 )),
1046
1047 (Some(""), Some("")) =>
1048 return Err((
1049 H3_MESSAGE_ERROR,
1050 ":authority and host value cannot be empty".to_string(),
1051 )),
1052
1053 (None, None) =>
1054 return Err((
1055 H3_MESSAGE_ERROR,
1056 ":authority and host missing".to_string(),
1057 )),
1058
1059 (..) => authority.unwrap(),
1061 };
1062
1063 let decided_path = match path {
1064 Some("") =>
1065 return Err((
1066 H3_MESSAGE_ERROR,
1067 ":path value cannot be empty".to_string(),
1068 )),
1069
1070 None =>
1071 return Err((
1072 H3_MESSAGE_ERROR,
1073 ":path cannot be missing".to_string(),
1074 )),
1075
1076 Some(path) => path,
1077 };
1078
1079 let url = format!("{decided_scheme}://{decided_host}{decided_path}");
1080 let url = url::Url::parse(&url).unwrap();
1081
1082 let pathbuf = path::PathBuf::from(url.path());
1083 let pathbuf = autoindex(pathbuf, index);
1084
1085 let query_priority = priority_field_value_from_query_string(&url);
1088
1089 if let Some(p) = query_priority {
1090 priority = p.as_bytes().to_vec();
1091 }
1092
1093 let (status, body) = match decided_method {
1094 "GET" => {
1095 const STREAM_BYTES_PREFIX: &str = "/stream-bytes/";
1096 const STREAM_BYTES_FILL: u8 = 0x57;
1097
1098 if let Some(suffix) = url.path().strip_prefix(STREAM_BYTES_PREFIX)
1099 {
1100 let n = suffix.parse::<usize>().unwrap_or(0);
1101 (200, vec![STREAM_BYTES_FILL; n])
1102 } else {
1103 for c in pathbuf.components() {
1104 if let path::Component::Normal(v) = c {
1105 file_path.push(v)
1106 }
1107 }
1108
1109 match std::fs::read(file_path.as_path()) {
1110 Ok(data) => (200, data),
1111
1112 Err(_) => (404, b"Not Found!".to_vec()),
1113 }
1114 }
1115 },
1116
1117 _ => (405, Vec::new()),
1118 };
1119
1120 let headers = vec![
1121 quiche::h3::Header::new(b":status", status.to_string().as_bytes()),
1122 quiche::h3::Header::new(b"server", b"quiche"),
1123 quiche::h3::Header::new(
1124 b"content-length",
1125 body.len().to_string().as_bytes(),
1126 ),
1127 ];
1128
1129 Ok((headers, body, priority))
1130 }
1131}
1132
1133impl HttpConn for Http3Conn {
1134 fn send_requests(
1135 &mut self, conn: &mut quiche::Connection, target_path: &Option<String>,
1136 ) {
1137 let mut reqs_done = 0;
1138
1139 for req in self.reqs.iter_mut().skip(self.reqs_hdrs_sent) {
1141 let s = match self.h3_conn.send_request(
1142 conn,
1143 &req.hdrs,
1144 self.body.is_none(),
1145 ) {
1146 Ok(v) => v,
1147
1148 Err(quiche::h3::Error::TransportError(
1149 quiche::Error::StreamLimit,
1150 )) => {
1151 debug!("not enough stream credits, retry later...");
1152 break;
1153 },
1154
1155 Err(quiche::h3::Error::StreamBlocked) => {
1156 debug!("stream is blocked, retry later...");
1157 break;
1158 },
1159
1160 Err(e) => {
1161 error!("failed to send request {e:?}");
1162 break;
1163 },
1164 };
1165
1166 debug!("Sent HTTP request {:?}", &req.hdrs);
1167
1168 if let Some(priority) = &req.priority {
1169 self.h3_conn
1171 .send_priority_update_for_request(conn, s, priority)
1172 .ok();
1173 }
1174
1175 req.stream_id = Some(s);
1176 req.response_writer =
1177 make_resource_writer(&req.url, target_path, req.cardinal);
1178 self.sent_body_bytes.insert(s, 0);
1179
1180 reqs_done += 1;
1181 }
1182 self.reqs_hdrs_sent += reqs_done;
1183
1184 if let Some(body) = &self.body {
1186 for (stream_id, sent_bytes) in self.sent_body_bytes.iter_mut() {
1187 if *sent_bytes == body.len() {
1188 continue;
1189 }
1190
1191 let sent = match self.h3_conn.send_body(
1194 conn,
1195 *stream_id,
1196 &body[*sent_bytes..],
1197 true,
1198 ) {
1199 Ok(v) => v,
1200
1201 Err(quiche::h3::Error::Done) => 0,
1202
1203 Err(e) => {
1204 error!("failed to send request body {e:?}");
1205 continue;
1206 },
1207 };
1208
1209 *sent_bytes += sent;
1210 }
1211 }
1212
1213 if let Some(ds) = self.dgram_sender.as_mut() {
1215 let mut dgrams_done = 0;
1216
1217 for _ in ds.dgrams_sent..ds.dgram_count {
1218 match send_h3_dgram(conn, ds.flow_id, ds.dgram_content.as_bytes())
1219 {
1220 Ok(v) => v,
1221
1222 Err(e) => {
1223 error!("failed to send dgram {e:?}");
1224 break;
1225 },
1226 }
1227
1228 dgrams_done += 1;
1229 }
1230
1231 ds.dgrams_sent += dgrams_done;
1232 }
1233 }
1234
1235 fn handle_responses(
1236 &mut self, conn: &mut quiche::Connection, buf: &mut [u8],
1237 req_start: &std::time::Instant,
1238 ) {
1239 loop {
1240 match self.h3_conn.poll(conn) {
1241 Ok((stream_id, quiche::h3::Event::Headers { list, .. })) => {
1242 debug!(
1243 "got response headers {:?} on stream id {}",
1244 hdrs_to_strings(&list),
1245 stream_id
1246 );
1247
1248 let req = self
1249 .reqs
1250 .iter_mut()
1251 .find(|r| r.stream_id == Some(stream_id))
1252 .unwrap();
1253
1254 req.response_hdrs = list;
1255 },
1256
1257 Ok((stream_id, quiche::h3::Event::Data)) => {
1258 while let Ok(read) =
1259 self.h3_conn.recv_body(conn, stream_id, buf)
1260 {
1261 debug!(
1262 "got {read} bytes of response data on stream {stream_id}"
1263 );
1264
1265 let req = self
1266 .reqs
1267 .iter_mut()
1268 .find(|r| r.stream_id == Some(stream_id))
1269 .unwrap();
1270
1271 let len = std::cmp::min(
1272 read,
1273 req.response_body_max - req.response_body.len(),
1274 );
1275 req.response_body.extend_from_slice(&buf[..len]);
1276
1277 match &mut req.response_writer {
1278 Some(rw) => {
1279 rw.write_all(&buf[..read]).ok();
1280 },
1281
1282 None =>
1283 if !self.dump_json {
1284 self.output_sink.borrow_mut()(unsafe {
1285 String::from_utf8_unchecked(
1286 buf[..read].to_vec(),
1287 )
1288 });
1289 },
1290 }
1291 }
1292 },
1293
1294 Ok((_stream_id, quiche::h3::Event::Finished)) => {
1295 self.reqs_complete += 1;
1296 let reqs_count = self.reqs.len();
1297
1298 debug!(
1299 "{}/{} responses received",
1300 self.reqs_complete, reqs_count
1301 );
1302
1303 if self.reqs_complete == reqs_count {
1304 info!(
1305 "{}/{} response(s) received in {:?}, closing...",
1306 self.reqs_complete,
1307 reqs_count,
1308 req_start.elapsed()
1309 );
1310
1311 if self.dump_json {
1312 dump_json(
1313 &self.reqs,
1314 &mut *self.output_sink.borrow_mut(),
1315 );
1316 }
1317
1318 match conn.close(true, 0x100, b"kthxbye") {
1319 Ok(_) | Err(quiche::Error::Done) => (),
1321
1322 Err(e) => panic!("error closing conn: {e:?}"),
1323 }
1324
1325 break;
1326 }
1327 },
1328
1329 Ok((_stream_id, quiche::h3::Event::Reset(e))) => {
1330 error!("request was reset by peer with {e}, closing...");
1331
1332 match conn.close(true, 0x100, b"kthxbye") {
1333 Ok(_) | Err(quiche::Error::Done) => (),
1335
1336 Err(e) => panic!("error closing conn: {e:?}"),
1337 }
1338
1339 break;
1340 },
1341
1342 Ok((
1343 prioritized_element_id,
1344 quiche::h3::Event::PriorityUpdate,
1345 )) => {
1346 info!(
1347 "{} PRIORITY_UPDATE triggered for element ID={}",
1348 conn.trace_id(),
1349 prioritized_element_id
1350 );
1351 },
1352
1353 Ok((goaway_id, quiche::h3::Event::GoAway)) => {
1354 info!(
1355 "{} got GOAWAY with ID {} ",
1356 conn.trace_id(),
1357 goaway_id
1358 );
1359 },
1360
1361 Err(quiche::h3::Error::Done) => {
1362 break;
1363 },
1364
1365 Err(e) => {
1366 error!("HTTP/3 processing failed: {e:?}");
1367
1368 break;
1369 },
1370 }
1371 }
1372
1373 while let Ok(len) = conn.dgram_recv(buf) {
1375 let mut b = octets::Octets::with_slice(buf);
1376 if let Ok(flow_id) = b.get_varint() {
1377 info!(
1378 "Received DATAGRAM flow_id={} len={} data={:?}",
1379 flow_id,
1380 len,
1381 buf[b.off()..len].to_vec()
1382 );
1383 }
1384 }
1385 }
1386
1387 fn report_incomplete(&self, start: &std::time::Instant) -> bool {
1388 if self.reqs_complete != self.reqs.len() {
1389 error!(
1390 "connection timed out after {:?} and only completed {}/{} requests",
1391 start.elapsed(),
1392 self.reqs_complete,
1393 self.reqs.len()
1394 );
1395
1396 if self.dump_json {
1397 dump_json(&self.reqs, &mut *self.output_sink.borrow_mut());
1398 }
1399
1400 return true;
1401 }
1402
1403 false
1404 }
1405
1406 fn handle_requests(
1407 &mut self, conn: &mut quiche::Connection,
1408 _partial_requests: &mut HashMap<u64, PartialRequest>,
1409 partial_responses: &mut HashMap<u64, PartialResponse>, root: &str,
1410 index: &str, buf: &mut [u8],
1411 ) -> quiche::h3::Result<()> {
1412 loop {
1417 match self.h3_conn.poll(conn) {
1418 Ok((stream_id, quiche::h3::Event::Headers { list, .. })) => {
1419 info!(
1420 "{} got request {:?} on stream id {}",
1421 conn.trace_id(),
1422 hdrs_to_strings(&list),
1423 stream_id
1424 );
1425
1426 self.largest_processed_request =
1427 std::cmp::max(self.largest_processed_request, stream_id);
1428
1429 conn.stream_shutdown(stream_id, quiche::Shutdown::Read, 0)
1434 .unwrap();
1435
1436 let (mut headers, body, mut priority) =
1437 match Http3Conn::build_h3_response(root, index, &list) {
1438 Ok(v) => v,
1439
1440 Err((error_code, _)) => {
1441 conn.stream_shutdown(
1442 stream_id,
1443 quiche::Shutdown::Write,
1444 error_code,
1445 )
1446 .unwrap();
1447 continue;
1448 },
1449 };
1450
1451 match self.h3_conn.take_last_priority_update(stream_id) {
1452 Ok(v) => {
1453 priority = v;
1454 },
1455
1456 Err(quiche::h3::Error::Done) => (),
1457
1458 Err(e) => error!(
1459 "{} error taking PRIORITY_UPDATE {}",
1460 conn.trace_id(),
1461 e
1462 ),
1463 }
1464
1465 if !priority.is_empty() {
1466 headers.push(quiche::h3::Header::new(
1467 b"priority",
1468 priority.as_slice(),
1469 ));
1470 }
1471
1472 #[cfg(feature = "sfv")]
1473 let priority =
1474 quiche::h3::Priority::try_from(priority.as_slice())
1475 .unwrap_or_default();
1476
1477 #[cfg(not(feature = "sfv"))]
1478 let priority = quiche::h3::Priority::default();
1479
1480 info!(
1481 "{} prioritizing response on stream {} as {:?}",
1482 conn.trace_id(),
1483 stream_id,
1484 priority
1485 );
1486
1487 match self.h3_conn.send_response_with_priority(
1488 conn, stream_id, &headers, &priority, false,
1489 ) {
1490 Ok(v) => v,
1491
1492 Err(quiche::h3::Error::StreamBlocked) => {
1493 let response = PartialResponse {
1494 headers: Some(headers),
1495 priority: Some(priority),
1496 body,
1497 written: 0,
1498 };
1499
1500 partial_responses.insert(stream_id, response);
1501 continue;
1502 },
1503
1504 Err(e) => {
1505 error!(
1506 "{} stream send failed {:?}",
1507 conn.trace_id(),
1508 e
1509 );
1510
1511 break;
1512 },
1513 }
1514
1515 let response = PartialResponse {
1516 headers: None,
1517 priority: None,
1518 body,
1519 written: 0,
1520 };
1521
1522 partial_responses.insert(stream_id, response);
1523 },
1524
1525 Ok((stream_id, quiche::h3::Event::Data)) => {
1526 info!(
1527 "{} got data on stream id {}",
1528 conn.trace_id(),
1529 stream_id
1530 );
1531 },
1532
1533 Ok((_stream_id, quiche::h3::Event::Finished)) => (),
1534
1535 Ok((_stream_id, quiche::h3::Event::Reset { .. })) => (),
1536
1537 Ok((
1538 prioritized_element_id,
1539 quiche::h3::Event::PriorityUpdate,
1540 )) => {
1541 info!(
1542 "{} PRIORITY_UPDATE triggered for element ID={}",
1543 conn.trace_id(),
1544 prioritized_element_id
1545 );
1546 },
1547
1548 Ok((goaway_id, quiche::h3::Event::GoAway)) => {
1549 trace!(
1550 "{} got GOAWAY with ID {} ",
1551 conn.trace_id(),
1552 goaway_id
1553 );
1554 self.h3_conn
1555 .send_goaway(conn, self.largest_processed_request)?;
1556 },
1557
1558 Err(quiche::h3::Error::Done) => {
1559 break;
1560 },
1561
1562 Err(e) => {
1563 error!("{} HTTP/3 error {:?}", conn.trace_id(), e);
1564
1565 return Err(e);
1566 },
1567 }
1568 }
1569
1570 for stream_id in writable_response_streams(conn) {
1572 self.handle_writable(conn, partial_responses, stream_id);
1573 }
1574
1575 while let Ok(len) = conn.dgram_recv(buf) {
1577 let mut b = octets::Octets::with_slice(buf);
1578 if let Ok(flow_id) = b.get_varint() {
1579 info!(
1580 "Received DATAGRAM flow_id={} len={} data={:?}",
1581 flow_id,
1582 len,
1583 buf[b.off()..len].to_vec()
1584 );
1585 }
1586 }
1587
1588 if let Some(ds) = self.dgram_sender.as_mut() {
1589 let mut dgrams_done = 0;
1590
1591 for _ in ds.dgrams_sent..ds.dgram_count {
1592 match send_h3_dgram(conn, ds.flow_id, ds.dgram_content.as_bytes())
1593 {
1594 Ok(v) => v,
1595
1596 Err(e) => {
1597 error!("failed to send dgram {e:?}");
1598 break;
1599 },
1600 }
1601
1602 dgrams_done += 1;
1603 }
1604
1605 ds.dgrams_sent += dgrams_done;
1606 }
1607
1608 Ok(())
1609 }
1610
1611 fn handle_writable(
1612 &mut self, conn: &mut quiche::Connection,
1613 partial_responses: &mut HashMap<u64, PartialResponse>, stream_id: u64,
1614 ) {
1615 let stream_cap = conn.stream_capacity(stream_id);
1616
1617 debug!(
1618 "{} response stream {} is writable with capacity {:?}",
1619 conn.trace_id(),
1620 stream_id,
1621 stream_cap,
1622 );
1623
1624 if !partial_responses.contains_key(&stream_id) {
1625 return;
1626 }
1627
1628 let resp = partial_responses.get_mut(&stream_id).unwrap();
1629
1630 if let (Some(headers), Some(priority)) = (&resp.headers, &resp.priority) {
1631 match self.h3_conn.send_response_with_priority(
1632 conn, stream_id, headers, priority, false,
1633 ) {
1634 Ok(_) => (),
1635
1636 Err(quiche::h3::Error::StreamBlocked) => {
1637 return;
1638 },
1639
1640 Err(e) => {
1641 error!("{} stream send failed {:?}", conn.trace_id(), e);
1642 return;
1643 },
1644 }
1645 }
1646
1647 resp.headers = None;
1648 resp.priority = None;
1649
1650 let body = &resp.body[resp.written..];
1651
1652 let written = match self.h3_conn.send_body(conn, stream_id, body, true) {
1653 Ok(v) => v,
1654
1655 Err(quiche::h3::Error::Done) => 0,
1656
1657 Err(e) => {
1658 partial_responses.remove(&stream_id);
1659
1660 error!("{} stream send failed {:?}", conn.trace_id(), e);
1661 return;
1662 },
1663 };
1664
1665 resp.written += written;
1666
1667 if resp.written == resp.body.len() {
1668 partial_responses.remove(&stream_id);
1669 }
1670 }
1671}