1use std::cmp;
28
29use std::collections::VecDeque;
30
31use crate::range_buf::BufSplit;
32use crate::range_buf::RangeBuf;
33use crate::BufFactory;
34use crate::Error;
35use crate::Result;
36
37use crate::range_buf::DefaultBufFactory;
38use crate::ranges;
39
// Size of the chunks `SendBuf::write()` splits incoming data into before
// copying each chunk into its own buffer.
//
// Test builds use a tiny value; presumably so that short test payloads still
// span several chunks -- confirm with the tests' authors.
#[cfg(test)]
const SEND_BUFFER_SIZE: usize = 5;

// Chunk size used by regular (non-test) builds.
#[cfg(not(test))]
const SEND_BUFFER_SIZE: usize = 4096;
45
/// A reservation of space at the tail of a [`SendBuf`], handed out by
/// `SendBuf::reserve_for_write()`.
///
/// Chunks are added through `append_buf()` until the reservation is used up;
/// the `Drop` implementation asserts that no reserved bytes were left unfilled.
struct SendReserve<'a, F: BufFactory> {
    /// The send buffer the reserved bytes get appended to.
    inner: &'a mut SendBuf<F>,
    /// Number of bytes that may still be appended through this reservation.
    reserved: usize,
    /// Whether the chunk that completes the reservation is flagged as final.
    fin: bool,
}
51
52impl<F: BufFactory> SendReserve<'_, F> {
53 fn append_buf(&mut self, buf: F::Buf) -> Result<()> {
54 let len = buf.as_ref().len();
55 let inner = &mut self.inner;
56
57 if len > self.reserved {
58 return Err(Error::BufferTooShort);
59 }
60
61 let fin: bool = self.reserved == len && self.fin;
62
63 let buf = RangeBuf::from_raw(buf, inner.off, fin);
64
65 inner.data.push_back(buf);
67
68 inner.off += len as u64;
69 inner.len += len as u64;
70 self.reserved -= len;
71
72 Ok(())
73 }
74}
75
impl<F: BufFactory> Drop for SendReserve<'_, F> {
    /// Panics if the reservation is dropped while `reserved` is still
    /// non-zero, i.e. before it was filled completely.
    fn drop(&mut self) {
        assert_eq!(self.reserved, 0)
    }
}
81
/// Send-side buffer of a stream.
///
/// Written data is queued as a sequence of `RangeBuf` chunks until it is
/// emitted, and can be re-queued with `retransmit()` or released with
/// `ack_and_drop()`.
#[derive(Debug, Default)]
pub struct SendBuf<F = DefaultBufFactory>
where
    F: BufFactory,
{
    /// Queued chunks. New chunks are pushed at the back, and chunks are
    /// drained from the front by `ack_and_drop()`.
    data: VecDeque<RangeBuf<F>>,

    /// Index into `data` of the buffer `emit()` reads from next.
    pos: usize,

    /// Offset just past the last byte appended so far (see `off_back()`).
    off: u64,

    /// Highest offset `emit()` has reached so far; never decreases, even when
    /// data is re-queued by `retransmit()`.
    emit_off: u64,

    /// Number of queued bytes waiting to be emitted: grows on append and
    /// retransmit, shrinks on emit.
    len: u64,

    /// Upper bound for `off`; `cap()` reports `max_data - off`.
    max_data: u64,

    /// Value last stored through `update_blocked_at()`.
    /// NOTE(review): presumably the offset at which the stream was last
    /// blocked -- confirm against callers.
    blocked_at: Option<u64>,

    /// Final offset of the stream once known; set by a write with `fin` or by
    /// `reset()`.
    fin_off: Option<u64>,

    /// Set once `shutdown()` has been called.
    shutdown: bool,

    /// Offset ranges that were reported as acknowledged through `ack()`.
    acked: ranges::RangeSet,

    /// Error code recorded by `stop()`; while set, `cap()` fails with
    /// `Error::StreamStopped`.
    error: Option<u64>,
}
130
131impl<F: BufFactory> SendBuf<F> {
132 pub fn new(max_data: u64) -> SendBuf<F> {
134 SendBuf {
135 max_data,
136 ..SendBuf::default()
137 }
138 }
139
140 fn reserve_for_write(
142 &mut self, mut len: usize, mut fin: bool,
143 ) -> Result<SendReserve<'_, F>> {
144 let max_off = self.off + len as u64;
145
146 if len > self.cap()? {
149 len = self.cap()?;
150 fin = false;
151 }
152
153 if let Some(fin_off) = self.fin_off {
154 if max_off > fin_off {
156 return Err(Error::FinalSize);
157 }
158
159 if max_off == fin_off && !fin {
161 return Err(Error::FinalSize);
162 }
163 }
164
165 if fin {
166 self.fin_off = Some(max_off);
167 }
168
169 if self.ack_off() >= max_off {
171 return Ok(SendReserve {
172 inner: self,
173 reserved: 0,
174 fin,
175 });
176 }
177
178 Ok(SendReserve {
179 inner: self,
180 reserved: len,
181 fin,
182 })
183 }
184
185 pub fn write(&mut self, data: &[u8], fin: bool) -> Result<usize> {
191 let mut reserve = self.reserve_for_write(data.len(), fin)?;
192
193 if reserve.reserved == 0 {
194 return Ok(0);
195 }
196
197 let ret = reserve.reserved;
198
199 for chunk in data[..reserve.reserved].chunks(SEND_BUFFER_SIZE) {
202 reserve.append_buf(F::buf_from_slice(chunk))?;
203 }
204
205 Ok(ret)
206 }
207
208 pub fn append_buf(
214 &mut self, mut data: F::Buf, cap: usize, fin: bool,
215 ) -> Result<(usize, Option<F::Buf>)>
216 where
217 F::Buf: BufSplit,
218 {
219 let len = data.as_ref().len();
220 let mut reserve = self.reserve_for_write(cap.min(len), fin)?;
221
222 if reserve.reserved == 0 {
223 return Ok((0, Some(data)));
224 }
225
226 let remainder =
227 (reserve.reserved < len).then(|| data.split_at(reserve.reserved));
228
229 let ret = reserve.reserved;
230
231 reserve.append_buf(data)?;
232
233 Ok((ret, remainder))
234 }
235
    /// Copies queued stream data into `out`, starting at the current front
    /// offset (`off_front()`).
    ///
    /// Returns the number of bytes copied and whether the end of the copied
    /// data coincides with the stream's final offset. As written, this
    /// function always returns `Ok`.
    pub fn emit(&mut self, out: &mut [u8]) -> Result<(usize, bool)> {
        // Space left in `out`.
        let mut out_len = out.len();
        // Stream offset that corresponds to `out[0]`.
        let out_off = self.off_front();

        // Stream offset the next copied byte must have.
        let mut next_off = out_off;

        while out_len > 0 {
            let off_front = self.off_front();

            // Stop when nothing is queued, everything was already emitted,
            // the next pending data is not contiguous with what has been
            // copied so far, or the front offset has reached `max_data`.
            if self.is_empty() ||
                off_front >= self.off ||
                off_front != next_off ||
                off_front >= self.max_data
            {
                break;
            }

            let buf = match self.data.get_mut(self.pos) {
                Some(v) => v,

                None => break,
            };

            // Skip over buffers that have nothing left to send.
            if buf.is_empty() {
                self.pos += 1;
                continue;
            }

            let buf_len = cmp::min(buf.len(), out_len);
            let partial = buf_len < buf.len();

            // Copy data to the output buffer.
            let out_pos = (next_off - out_off) as usize;
            out[out_pos..out_pos + buf_len].copy_from_slice(&buf[..buf_len]);

            self.len -= buf_len as u64;

            out_len -= buf_len;

            // Must be computed before `consume()` advances the buffer.
            next_off = buf.off() + buf_len as u64;

            buf.consume(buf_len);

            if partial {
                // `out` is full; the rest of this buffer stays queued.
                break;
            }

            self.pos += 1;
        }

        // `fin` is derived by comparing the end of the emitted data with the
        // stream's final offset (if known).
        let fin = self.fin_off == Some(next_off);

        // Remember the highest offset that was ever emitted.
        self.emit_off = cmp::max(self.emit_off, next_off);

        Ok((out.len() - out_len, fin))
    }
302
303 pub fn update_max_data(&mut self, max_data: u64) {
305 self.max_data = cmp::max(self.max_data, max_data);
306 }
307
    /// Stores `blocked_at`, replacing any previously stored value.
    pub fn update_blocked_at(&mut self, blocked_at: Option<u64>) {
        self.blocked_at = blocked_at;
    }
312
    /// Returns the value last stored through `update_blocked_at()`, if any.
    pub fn blocked_at(&self) -> Option<u64> {
        self.blocked_at
    }
317
318 pub fn ack(&mut self, off: u64, len: usize) {
320 self.acked.insert(off..off + len as u64);
321 }
322
323 pub fn ack_and_drop(&mut self, off: u64, len: usize) {
324 self.ack(off, len);
325
326 let ack_off = self.ack_off();
327
328 if self.data.is_empty() {
329 return;
330 }
331
332 if off > ack_off {
333 return;
334 }
335
336 let mut drop_until = None;
337
338 for (i, buf) in self.data.iter_mut().enumerate() {
340 if buf.off >= ack_off {
343 break;
344 }
345
346 if buf.off < ack_off && ack_off < buf.max_off() {
349 break;
350 }
351
352 drop_until = Some(i);
354 }
355
356 if let Some(drop) = drop_until {
357 self.data.drain(..=drop);
358
359 self.pos = self.pos.saturating_sub(drop + 1);
363 }
364 }
365
    /// Re-queues the `len` bytes starting at offset `off` so that `emit()`
    /// hands them out again.
    ///
    /// Nothing happens when no buffers are queued or when the whole range is
    /// at or below the contiguously acked offset.
    pub fn retransmit(&mut self, off: u64, len: usize) {
        let max_off = off + len as u64;
        let ack_off = self.ack_off();

        if self.data.is_empty() {
            return;
        }

        if max_off <= ack_off {
            return;
        }

        // The upper bound is fixed before the loop, so buffers inserted below
        // grow the queue without extending the iteration.
        for i in 0..self.data.len() {
            let buf = &mut self.data[i];

            // This buffer starts at or after the end of the range; stop.
            if buf.off >= max_off {
                break;
            }

            // This buffer ends before the range starts; look at the next one.
            if off > buf.max_off() {
                continue;
            }

            // Split the buffer in two if the retransmit range ends before the
            // buffer's final offset, so that the tail is left untouched.
            let new_buf = if buf.off < max_off && max_off < buf.max_off() {
                Some(buf.split_off((max_off - buf.off) as usize))
            } else {
                None
            };

            let prev_pos = buf.pos;

            // Move the buffer's read position back: to the start of the range
            // if the range begins inside this buffer (never forward, hence the
            // `min`), otherwise to the start of the buffer.
            buf.pos = if off > buf.off && off <= buf.max_off() {
                cmp::min(buf.pos, buf.start + (off - buf.off) as usize)
            } else {
                buf.start
            };

            // Make `emit()` resume from the earliest rewound buffer.
            self.pos = cmp::min(self.pos, i);

            // The bytes between the new and the old position count as
            // pending again.
            self.len += (prev_pos - buf.pos) as u64;

            if let Some(b) = new_buf {
                self.data.insert(i + 1, b);
            }
        }
    }
416
417 pub fn reset(&mut self) -> (u64, u64) {
419 let unsent_off = cmp::max(self.off_front(), self.emit_off);
420 let unsent_len = self.off_back().saturating_sub(unsent_off);
421
422 self.fin_off = Some(unsent_off);
423
424 self.data.clear();
426
427 self.off = unsent_off;
429 self.ack(0, self.off as usize);
430
431 self.pos = 0;
432 self.len = 0;
433
434 (self.emit_off, unsent_len)
435 }
436
437 pub fn stop(&mut self, error_code: u64) -> Result<(u64, u64)> {
441 if self.error.is_some() {
442 return Err(Error::Done);
443 }
444
445 let (max_off, unsent) = self.reset();
446
447 self.error = Some(error_code);
448
449 Ok((max_off, unsent))
450 }
451
452 pub fn shutdown(&mut self) -> Result<(u64, u64)> {
454 if self.shutdown {
455 return Err(Error::Done);
456 }
457
458 self.shutdown = true;
459
460 Ok(self.reset())
461 }
462
    /// Returns the offset just past the last byte appended to the buffer.
    pub fn off_back(&self) -> u64 {
        self.off
    }
467
468 pub fn off_front(&self) -> u64 {
470 let mut pos = self.pos;
471
472 while let Some(b) = self.data.get(pos) {
474 if !b.is_empty() {
475 return b.off();
476 }
477
478 pos += 1;
479 }
480
481 self.off
482 }
483
    /// Returns the current `max_data` limit, i.e. the highest offset the
    /// buffer may be filled up to.
    pub fn max_off(&self) -> u64 {
        self.max_data
    }
488
489 pub fn is_fin(&self) -> bool {
494 if self.fin_off == Some(self.off) {
495 return true;
496 }
497
498 false
499 }
500
501 pub fn is_complete(&self) -> bool {
506 if let Some(fin_off) = self.fin_off {
507 if self.acked == (0..fin_off) {
508 return true;
509 }
510 }
511
512 false
513 }
514
    /// Returns true if an error code was recorded, which `stop()` does.
    pub fn is_stopped(&self) -> bool {
        self.error.is_some()
    }
519
    /// Returns true once `shutdown()` has been called.
    pub fn is_shutdown(&self) -> bool {
        self.shutdown
    }
524
    /// Returns true if no buffers are queued at all.
    ///
    /// Note that this checks the queue itself, not the number of bytes still
    /// pending (`len`).
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }
529
530 pub fn ack_off(&self) -> u64 {
532 match self.acked.iter().next() {
533 Some(std::ops::Range { start: 0, end }) => end,
536
537 Some(_) | None => 0,
538 }
539 }
540
541 pub fn cap(&self) -> Result<usize> {
543 if let Some(e) = self.error {
545 return Err(Error::StreamStopped(e));
546 }
547
548 Ok((self.max_data - self.off) as usize)
549 }
550
    /// Returns the number of buffers currently queued.
    // NOTE(review): `dead_code` is allowed presumably because this is not
    // called in every build configuration -- confirm.
    #[allow(dead_code)]
    pub fn bufs_count(&self) -> usize {
        self.data.len()
    }
556}
557
558#[cfg(test)]
559mod tests {
560 use super::*;
561
    /// Emitting from a buffer nothing was written to yields no data and no
    /// fin.
    #[test]
    fn empty_write() {
        let mut buf = [0; 5];

        let mut send = <SendBuf>::new(u64::MAX);
        assert_eq!(send.len, 0);

        let (written, fin) = send.emit(&mut buf).unwrap();
        assert_eq!(written, 0);
        assert!(!fin);
    }
573
    /// Two consecutive writes are emitted back to back by a single `emit()`
    /// call, with fin reported at the end of the second write.
    #[test]
    fn multi_write() {
        let mut buf = [0; 128];

        let mut send = <SendBuf>::new(u64::MAX);
        assert_eq!(send.len, 0);

        let first = b"something";
        let second = b"helloworld";

        assert!(send.write(first, false).is_ok());
        assert_eq!(send.len, 9);

        assert!(send.write(second, true).is_ok());
        assert_eq!(send.len, 19);

        // The output buffer is large enough to take everything at once.
        let (written, fin) = send.emit(&mut buf[..128]).unwrap();
        assert_eq!(written, 19);
        assert!(fin);
        assert_eq!(&buf[..written], b"somethinghelloworld");
        assert_eq!(send.len, 0);
    }
596
    /// Data can be emitted piecewise with output buffers smaller than what is
    /// queued; fin is only reported by the call that reaches the final offset.
    #[test]
    fn split_write() {
        let mut buf = [0; 10];

        let mut send = <SendBuf>::new(u64::MAX);
        assert_eq!(send.len, 0);

        let first = b"something";
        let second = b"helloworld";

        assert!(send.write(first, false).is_ok());
        assert_eq!(send.len, 9);

        assert!(send.write(second, true).is_ok());
        assert_eq!(send.len, 19);

        assert_eq!(send.off_front(), 0);

        // First 10 bytes: all of `first` plus one byte of `second`.
        let (written, fin) = send.emit(&mut buf[..10]).unwrap();
        assert_eq!(written, 10);
        assert!(!fin);
        assert_eq!(&buf[..written], b"somethingh");
        assert_eq!(send.len, 9);

        assert_eq!(send.off_front(), 10);

        let (written, fin) = send.emit(&mut buf[..5]).unwrap();
        assert_eq!(written, 5);
        assert!(!fin);
        assert_eq!(&buf[..written], b"ellow");
        assert_eq!(send.len, 4);

        assert_eq!(send.off_front(), 15);

        // Only 4 bytes are left, so the output buffer is not filled up.
        let (written, fin) = send.emit(&mut buf[..10]).unwrap();
        assert_eq!(written, 4);
        assert!(fin);
        assert_eq!(&buf[..written], b"orld");
        assert_eq!(send.len, 0);

        assert_eq!(send.off_front(), 19);
    }
639
    /// Ranges that were already emitted can be re-queued with `retransmit()`
    /// and are emitted again before `emit()` continues with data that was
    /// never sent.
    #[test]
    fn resend() {
        let mut buf = [0; 15];

        let mut send = <SendBuf>::new(u64::MAX);
        assert_eq!(send.len, 0);
        assert_eq!(send.off_front(), 0);

        let first = b"something";
        let second = b"helloworld";

        assert!(send.write(first, false).is_ok());
        assert_eq!(send.off_front(), 0);

        assert!(send.write(second, true).is_ok());
        assert_eq!(send.off_front(), 0);

        assert_eq!(send.len, 19);

        let (written, fin) = send.emit(&mut buf[..4]).unwrap();
        assert_eq!(written, 4);
        assert!(!fin);
        assert_eq!(&buf[..written], b"some");
        assert_eq!(send.len, 15);
        assert_eq!(send.off_front(), 4);

        let (written, fin) = send.emit(&mut buf[..5]).unwrap();
        assert_eq!(written, 5);
        assert!(!fin);
        assert_eq!(&buf[..written], b"thing");
        assert_eq!(send.len, 10);
        assert_eq!(send.off_front(), 9);

        let (written, fin) = send.emit(&mut buf[..5]).unwrap();
        assert_eq!(written, 5);
        assert!(!fin);
        assert_eq!(&buf[..written], b"hello");
        assert_eq!(send.len, 5);
        assert_eq!(send.off_front(), 14);

        // Re-queue "thing" (offsets 4..9).
        send.retransmit(4, 5);
        assert_eq!(send.len, 10);
        assert_eq!(send.off_front(), 4);

        // Re-queue "some" (offsets 0..4) as well.
        send.retransmit(0, 4);
        assert_eq!(send.len, 14);
        assert_eq!(send.off_front(), 0);

        // Only the contiguous re-queued range is emitted, even though the
        // output buffer has room for more.
        let (written, fin) = send.emit(&mut buf[..11]).unwrap();
        assert_eq!(written, 9);
        assert!(!fin);
        assert_eq!(&buf[..written], b"something");
        assert_eq!(send.len, 5);
        assert_eq!(send.off_front(), 14);

        let (written, fin) = send.emit(&mut buf[..11]).unwrap();
        assert_eq!(written, 5);
        assert!(fin);
        assert_eq!(&buf[..written], b"world");
        assert_eq!(send.len, 0);
        assert_eq!(send.off_front(), 19);
    }
702
    /// Writes are limited by `max_data`: they accept only as many bytes as
    /// fit, and accept more once the limit is raised.
    #[test]
    fn write_blocked_by_off() {
        let mut buf = [0; 10];

        // A default buffer starts with `max_data == 0`, i.e. no capacity.
        let mut send = <SendBuf>::default();
        assert_eq!(send.len, 0);

        let first = b"something";
        let second = b"helloworld";

        assert_eq!(send.write(first, false), Ok(0));
        assert_eq!(send.len, 0);

        assert_eq!(send.write(second, true), Ok(0));
        assert_eq!(send.len, 0);

        send.update_max_data(5);

        // Only the first 5 bytes fit.
        assert_eq!(send.write(first, false), Ok(5));
        assert_eq!(send.len, 5);

        assert_eq!(send.write(second, true), Ok(0));
        assert_eq!(send.len, 5);

        assert_eq!(send.off_front(), 0);

        let (written, fin) = send.emit(&mut buf[..10]).unwrap();
        assert_eq!(written, 5);
        assert!(!fin);
        assert_eq!(&buf[..written], b"somet");
        assert_eq!(send.len, 0);

        assert_eq!(send.off_front(), 5);

        let (written, fin) = send.emit(&mut buf[..10]).unwrap();
        assert_eq!(written, 0);
        assert!(!fin);
        assert_eq!(&buf[..written], b"");
        assert_eq!(send.len, 0);

        send.update_max_data(15);

        // The rest of `first` fits now, plus 6 bytes of `second`.
        assert_eq!(send.write(&first[5..], false), Ok(4));
        assert_eq!(send.len, 4);

        assert_eq!(send.write(second, true), Ok(6));
        assert_eq!(send.len, 10);

        assert_eq!(send.off_front(), 5);

        let (written, fin) = send.emit(&mut buf[..10]).unwrap();
        assert_eq!(written, 10);
        assert!(!fin);
        assert_eq!(&buf[..10], b"hinghellow");
        assert_eq!(send.len, 0);

        send.update_max_data(25);

        // The remaining 4 bytes of `second`, this time with fin accepted.
        assert_eq!(send.write(&second[6..], true), Ok(4));
        assert_eq!(send.len, 4);

        assert_eq!(send.off_front(), 15);

        let (written, fin) = send.emit(&mut buf[..10]).unwrap();
        assert_eq!(written, 4);
        assert!(fin);
        assert_eq!(&buf[..written], b"orld");
        assert_eq!(send.len, 0);
    }
772
    /// An empty write with fin sets the final offset without queueing data;
    /// the fin is then reported together with the previously written data.
    #[test]
    fn zero_len_write() {
        let mut buf = [0; 10];

        let mut send = <SendBuf>::new(u64::MAX);
        assert_eq!(send.len, 0);

        let first = b"something";

        assert!(send.write(first, false).is_ok());
        assert_eq!(send.len, 9);

        // Zero bytes, fin only: the amount of pending data doesn't change.
        assert!(send.write(&[], true).is_ok());
        assert_eq!(send.len, 9);

        assert_eq!(send.off_front(), 0);

        let (written, fin) = send.emit(&mut buf[..10]).unwrap();
        assert_eq!(written, 9);
        assert!(fin);
        assert_eq!(&buf[..written], b"something");
        assert_eq!(send.len, 0);
    }
796
    /// Retransmitting a range that only partly overlaps already emitted data
    /// increases `len` just by the overlapping part.
    #[test]
    fn send_buf_len_on_retransmit() {
        let mut buf = [0; 15];

        let mut send = <SendBuf>::new(u64::MAX);
        assert_eq!(send.len, 0);
        assert_eq!(send.off_front(), 0);

        let first = b"something";

        assert!(send.write(first, false).is_ok());
        assert_eq!(send.off_front(), 0);

        assert_eq!(send.len, 9);

        let (written, fin) = send.emit(&mut buf[..4]).unwrap();
        assert_eq!(written, 4);
        assert!(!fin);
        assert_eq!(&buf[..written], b"some");
        assert_eq!(send.len, 5);
        assert_eq!(send.off_front(), 4);

        // Range 3..8: only the byte at offset 3 was emitted before, so `len`
        // grows by exactly one.
        send.retransmit(3, 5);
        assert_eq!(send.len, 6);
        assert_eq!(send.off_front(), 3);
    }
824
    /// Stopping a stream after a retransmit reports the highest offset that
    /// was ever emitted as the final size, not the rewound front offset.
    #[test]
    fn send_buf_final_size_retransmit() {
        let mut buf = [0; 50];
        let mut send = <SendBuf>::new(u64::MAX);

        send.write(&buf, false).unwrap();
        assert_eq!(send.off_front(), 0);

        // Emit the whole buffer.
        let (written, _fin) = send.emit(&mut buf).unwrap();
        assert_eq!(written, buf.len());
        assert_eq!(send.off_front(), buf.len() as u64);

        // Re-queue the last 10 bytes, which moves the front offset back.
        send.retransmit(40, 10);

        // All 50 bytes were emitted once, so the final size is 50 and
        // nothing counts as unsent.
        let (fin_off, unsent) = send.stop(0).unwrap();
        assert_eq!(fin_off, 50);
        assert_eq!(unsent, 0);
    }
849}