1use std::cmp;
28
29use std::collections::VecDeque;
30
31use crate::Error;
32use crate::Result;
33
34use crate::ranges;
35
36use super::RangeBuf;
37
38#[cfg(test)]
39const SEND_BUFFER_SIZE: usize = 5;
40
41#[cfg(not(test))]
42const SEND_BUFFER_SIZE: usize = 4096;
43
44#[derive(Debug, Default)]
54pub struct SendBuf {
55 data: VecDeque<RangeBuf>,
57
58 pos: usize,
60
61 off: u64,
63
64 emit_off: u64,
67
68 len: u64,
70
71 max_data: u64,
73
74 blocked_at: Option<u64>,
76
77 fin_off: Option<u64>,
79
80 shutdown: bool,
82
83 acked: ranges::RangeSet,
85
86 error: Option<u64>,
88}
89
90impl SendBuf {
91 pub fn new(max_data: u64) -> SendBuf {
93 SendBuf {
94 max_data,
95 ..SendBuf::default()
96 }
97 }
98
99 pub fn write(&mut self, mut data: &[u8], mut fin: bool) -> Result<usize> {
105 let max_off = self.off + data.len() as u64;
106
107 let capacity = self.cap()?;
110
111 if data.len() > capacity {
112 let len = capacity;
114 data = &data[..len];
115
116 fin = false;
118 }
119
120 if let Some(fin_off) = self.fin_off {
121 if max_off > fin_off {
123 return Err(Error::FinalSize);
124 }
125
126 if max_off == fin_off && !fin {
128 return Err(Error::FinalSize);
129 }
130 }
131
132 if fin {
133 self.fin_off = Some(max_off);
134 }
135
136 if self.ack_off() >= max_off {
138 return Ok(data.len());
139 }
140
141 if data.is_empty() {
144 return Ok(data.len());
145 }
146
147 let mut len = 0;
148
149 for chunk in data.chunks(SEND_BUFFER_SIZE) {
152 len += chunk.len();
153
154 let fin = len == data.len() && fin;
155
156 let buf = RangeBuf::from(chunk, self.off, fin);
157
158 self.data.push_back(buf);
160
161 self.off += chunk.len() as u64;
162 self.len += chunk.len() as u64;
163 }
164
165 Ok(len)
166 }
167
168 pub fn emit(&mut self, out: &mut [u8]) -> Result<(usize, bool)> {
170 let mut out_len = out.len();
171 let out_off = self.off_front();
172
173 let mut next_off = out_off;
174
175 while out_len > 0 {
176 let off_front = self.off_front();
177
178 if self.is_empty() ||
179 off_front >= self.off ||
180 off_front != next_off ||
181 off_front >= self.max_data
182 {
183 break;
184 }
185
186 let buf = match self.data.get_mut(self.pos) {
187 Some(v) => v,
188
189 None => break,
190 };
191
192 if buf.is_empty() {
193 self.pos += 1;
194 continue;
195 }
196
197 let buf_len = cmp::min(buf.len(), out_len);
198 let partial = buf_len < buf.len();
199
200 let out_pos = (next_off - out_off) as usize;
202 out[out_pos..out_pos + buf_len].copy_from_slice(&buf[..buf_len]);
203
204 self.len -= buf_len as u64;
205
206 out_len -= buf_len;
207
208 next_off = buf.off() + buf_len as u64;
209
210 buf.consume(buf_len);
211
212 if partial {
213 break;
215 }
216
217 self.pos += 1;
218 }
219
220 let fin = self.fin_off == Some(next_off);
227
228 self.emit_off = cmp::max(self.emit_off, next_off);
231
232 Ok((out.len() - out_len, fin))
233 }
234
235 pub fn update_max_data(&mut self, max_data: u64) {
237 self.max_data = cmp::max(self.max_data, max_data);
238 }
239
240 pub fn update_blocked_at(&mut self, blocked_at: Option<u64>) {
242 self.blocked_at = blocked_at;
243 }
244
245 pub fn blocked_at(&self) -> Option<u64> {
247 self.blocked_at
248 }
249
250 pub fn ack(&mut self, off: u64, len: usize) {
252 self.acked.insert(off..off + len as u64);
253 }
254
255 pub fn ack_and_drop(&mut self, off: u64, len: usize) {
256 self.ack(off, len);
257
258 let ack_off = self.ack_off();
259
260 if self.data.is_empty() {
261 return;
262 }
263
264 if off > ack_off {
265 return;
266 }
267
268 let mut drop_until = None;
269
270 for (i, buf) in self.data.iter_mut().enumerate() {
272 if buf.off >= ack_off {
275 break;
276 }
277
278 if buf.off < ack_off && ack_off < buf.max_off() {
281 break;
282 }
283
284 drop_until = Some(i);
286 }
287
288 if let Some(drop) = drop_until {
289 self.data.drain(..=drop);
290
291 self.pos = self.pos.saturating_sub(drop + 1);
295 }
296 }
297
298 pub fn retransmit(&mut self, off: u64, len: usize) {
299 let max_off = off + len as u64;
300 let ack_off = self.ack_off();
301
302 if self.data.is_empty() {
303 return;
304 }
305
306 if max_off <= ack_off {
307 return;
308 }
309
310 for i in 0..self.data.len() {
311 let buf = &mut self.data[i];
312
313 if buf.off >= max_off {
314 break;
315 }
316
317 if off > buf.max_off() {
318 continue;
319 }
320
321 let new_buf = if buf.off < max_off && max_off < buf.max_off() {
324 Some(buf.split_off((max_off - buf.off) as usize))
325 } else {
326 None
327 };
328
329 let prev_pos = buf.pos;
330
331 buf.pos = if off > buf.off && off <= buf.max_off() {
334 cmp::min(buf.pos, buf.start + (off - buf.off) as usize)
335 } else {
336 buf.start
337 };
338
339 self.pos = cmp::min(self.pos, i);
340
341 self.len += (prev_pos - buf.pos) as u64;
342
343 if let Some(b) = new_buf {
344 self.data.insert(i + 1, b);
345 }
346 }
347 }
348
349 pub fn reset(&mut self) -> (u64, u64) {
351 let unsent_off = cmp::max(self.off_front(), self.emit_off);
352 let unsent_len = self.off_back().saturating_sub(unsent_off);
353
354 self.fin_off = Some(unsent_off);
355
356 self.data.clear();
358
359 self.off = unsent_off;
361 self.ack(0, self.off as usize);
362
363 self.pos = 0;
364 self.len = 0;
365
366 (self.emit_off, unsent_len)
367 }
368
369 pub fn stop(&mut self, error_code: u64) -> Result<(u64, u64)> {
373 if self.error.is_some() {
374 return Err(Error::Done);
375 }
376
377 let (max_off, unsent) = self.reset();
378
379 self.error = Some(error_code);
380
381 Ok((max_off, unsent))
382 }
383
384 pub fn shutdown(&mut self) -> Result<(u64, u64)> {
386 if self.shutdown {
387 return Err(Error::Done);
388 }
389
390 self.shutdown = true;
391
392 Ok(self.reset())
393 }
394
395 pub fn off_back(&self) -> u64 {
397 self.off
398 }
399
400 pub fn off_front(&self) -> u64 {
402 let mut pos = self.pos;
403
404 while let Some(b) = self.data.get(pos) {
406 if !b.is_empty() {
407 return b.off();
408 }
409
410 pos += 1;
411 }
412
413 self.off
414 }
415
416 pub fn max_off(&self) -> u64 {
418 self.max_data
419 }
420
421 pub fn is_fin(&self) -> bool {
426 if self.fin_off == Some(self.off) {
427 return true;
428 }
429
430 false
431 }
432
433 pub fn is_complete(&self) -> bool {
438 if let Some(fin_off) = self.fin_off {
439 if self.acked == (0..fin_off) {
440 return true;
441 }
442 }
443
444 false
445 }
446
447 pub fn is_stopped(&self) -> bool {
449 self.error.is_some()
450 }
451
452 pub fn is_shutdown(&self) -> bool {
454 self.shutdown
455 }
456
457 pub fn is_empty(&self) -> bool {
459 self.data.is_empty()
460 }
461
462 pub fn ack_off(&self) -> u64 {
464 match self.acked.iter().next() {
465 Some(std::ops::Range { start: 0, end }) => end,
468
469 Some(_) | None => 0,
470 }
471 }
472
473 pub fn cap(&self) -> Result<usize> {
475 if let Some(e) = self.error {
477 return Err(Error::StreamStopped(e));
478 }
479
480 Ok((self.max_data - self.off) as usize)
481 }
482
483 #[allow(dead_code)]
485 pub fn bufs_count(&self) -> usize {
486 self.data.len()
487 }
488}
489
490#[cfg(test)]
491mod tests {
492 use super::*;
493
494 #[test]
495 fn empty_write() {
496 let mut buf = [0; 5];
497
498 let mut send = SendBuf::new(u64::MAX);
499 assert_eq!(send.len, 0);
500
501 let (written, fin) = send.emit(&mut buf).unwrap();
502 assert_eq!(written, 0);
503 assert!(!fin);
504 }
505
506 #[test]
507 fn multi_write() {
508 let mut buf = [0; 128];
509
510 let mut send = SendBuf::new(u64::MAX);
511 assert_eq!(send.len, 0);
512
513 let first = b"something";
514 let second = b"helloworld";
515
516 assert!(send.write(first, false).is_ok());
517 assert_eq!(send.len, 9);
518
519 assert!(send.write(second, true).is_ok());
520 assert_eq!(send.len, 19);
521
522 let (written, fin) = send.emit(&mut buf[..128]).unwrap();
523 assert_eq!(written, 19);
524 assert!(fin);
525 assert_eq!(&buf[..written], b"somethinghelloworld");
526 assert_eq!(send.len, 0);
527 }
528
529 #[test]
530 fn split_write() {
531 let mut buf = [0; 10];
532
533 let mut send = SendBuf::new(u64::MAX);
534 assert_eq!(send.len, 0);
535
536 let first = b"something";
537 let second = b"helloworld";
538
539 assert!(send.write(first, false).is_ok());
540 assert_eq!(send.len, 9);
541
542 assert!(send.write(second, true).is_ok());
543 assert_eq!(send.len, 19);
544
545 assert_eq!(send.off_front(), 0);
546
547 let (written, fin) = send.emit(&mut buf[..10]).unwrap();
548 assert_eq!(written, 10);
549 assert!(!fin);
550 assert_eq!(&buf[..written], b"somethingh");
551 assert_eq!(send.len, 9);
552
553 assert_eq!(send.off_front(), 10);
554
555 let (written, fin) = send.emit(&mut buf[..5]).unwrap();
556 assert_eq!(written, 5);
557 assert!(!fin);
558 assert_eq!(&buf[..written], b"ellow");
559 assert_eq!(send.len, 4);
560
561 assert_eq!(send.off_front(), 15);
562
563 let (written, fin) = send.emit(&mut buf[..10]).unwrap();
564 assert_eq!(written, 4);
565 assert!(fin);
566 assert_eq!(&buf[..written], b"orld");
567 assert_eq!(send.len, 0);
568
569 assert_eq!(send.off_front(), 19);
570 }
571
572 #[test]
573 fn resend() {
574 let mut buf = [0; 15];
575
576 let mut send = SendBuf::new(u64::MAX);
577 assert_eq!(send.len, 0);
578 assert_eq!(send.off_front(), 0);
579
580 let first = b"something";
581 let second = b"helloworld";
582
583 assert!(send.write(first, false).is_ok());
584 assert_eq!(send.off_front(), 0);
585
586 assert!(send.write(second, true).is_ok());
587 assert_eq!(send.off_front(), 0);
588
589 assert_eq!(send.len, 19);
590
591 let (written, fin) = send.emit(&mut buf[..4]).unwrap();
592 assert_eq!(written, 4);
593 assert!(!fin);
594 assert_eq!(&buf[..written], b"some");
595 assert_eq!(send.len, 15);
596 assert_eq!(send.off_front(), 4);
597
598 let (written, fin) = send.emit(&mut buf[..5]).unwrap();
599 assert_eq!(written, 5);
600 assert!(!fin);
601 assert_eq!(&buf[..written], b"thing");
602 assert_eq!(send.len, 10);
603 assert_eq!(send.off_front(), 9);
604
605 let (written, fin) = send.emit(&mut buf[..5]).unwrap();
606 assert_eq!(written, 5);
607 assert!(!fin);
608 assert_eq!(&buf[..written], b"hello");
609 assert_eq!(send.len, 5);
610 assert_eq!(send.off_front(), 14);
611
612 send.retransmit(4, 5);
613 assert_eq!(send.len, 10);
614 assert_eq!(send.off_front(), 4);
615
616 send.retransmit(0, 4);
617 assert_eq!(send.len, 14);
618 assert_eq!(send.off_front(), 0);
619
620 let (written, fin) = send.emit(&mut buf[..11]).unwrap();
621 assert_eq!(written, 9);
622 assert!(!fin);
623 assert_eq!(&buf[..written], b"something");
624 assert_eq!(send.len, 5);
625 assert_eq!(send.off_front(), 14);
626
627 let (written, fin) = send.emit(&mut buf[..11]).unwrap();
628 assert_eq!(written, 5);
629 assert!(fin);
630 assert_eq!(&buf[..written], b"world");
631 assert_eq!(send.len, 0);
632 assert_eq!(send.off_front(), 19);
633 }
634
635 #[test]
636 fn write_blocked_by_off() {
637 let mut buf = [0; 10];
638
639 let mut send = SendBuf::default();
640 assert_eq!(send.len, 0);
641
642 let first = b"something";
643 let second = b"helloworld";
644
645 assert_eq!(send.write(first, false), Ok(0));
646 assert_eq!(send.len, 0);
647
648 assert_eq!(send.write(second, true), Ok(0));
649 assert_eq!(send.len, 0);
650
651 send.update_max_data(5);
652
653 assert_eq!(send.write(first, false), Ok(5));
654 assert_eq!(send.len, 5);
655
656 assert_eq!(send.write(second, true), Ok(0));
657 assert_eq!(send.len, 5);
658
659 assert_eq!(send.off_front(), 0);
660
661 let (written, fin) = send.emit(&mut buf[..10]).unwrap();
662 assert_eq!(written, 5);
663 assert!(!fin);
664 assert_eq!(&buf[..written], b"somet");
665 assert_eq!(send.len, 0);
666
667 assert_eq!(send.off_front(), 5);
668
669 let (written, fin) = send.emit(&mut buf[..10]).unwrap();
670 assert_eq!(written, 0);
671 assert!(!fin);
672 assert_eq!(&buf[..written], b"");
673 assert_eq!(send.len, 0);
674
675 send.update_max_data(15);
676
677 assert_eq!(send.write(&first[5..], false), Ok(4));
678 assert_eq!(send.len, 4);
679
680 assert_eq!(send.write(second, true), Ok(6));
681 assert_eq!(send.len, 10);
682
683 assert_eq!(send.off_front(), 5);
684
685 let (written, fin) = send.emit(&mut buf[..10]).unwrap();
686 assert_eq!(written, 10);
687 assert!(!fin);
688 assert_eq!(&buf[..10], b"hinghellow");
689 assert_eq!(send.len, 0);
690
691 send.update_max_data(25);
692
693 assert_eq!(send.write(&second[6..], true), Ok(4));
694 assert_eq!(send.len, 4);
695
696 assert_eq!(send.off_front(), 15);
697
698 let (written, fin) = send.emit(&mut buf[..10]).unwrap();
699 assert_eq!(written, 4);
700 assert!(fin);
701 assert_eq!(&buf[..written], b"orld");
702 assert_eq!(send.len, 0);
703 }
704
705 #[test]
706 fn zero_len_write() {
707 let mut buf = [0; 10];
708
709 let mut send = SendBuf::new(u64::MAX);
710 assert_eq!(send.len, 0);
711
712 let first = b"something";
713
714 assert!(send.write(first, false).is_ok());
715 assert_eq!(send.len, 9);
716
717 assert!(send.write(&[], true).is_ok());
718 assert_eq!(send.len, 9);
719
720 assert_eq!(send.off_front(), 0);
721
722 let (written, fin) = send.emit(&mut buf[..10]).unwrap();
723 assert_eq!(written, 9);
724 assert!(fin);
725 assert_eq!(&buf[..written], b"something");
726 assert_eq!(send.len, 0);
727 }
728
729 #[test]
731 fn send_buf_len_on_retransmit() {
732 let mut buf = [0; 15];
733
734 let mut send = SendBuf::new(u64::MAX);
735 assert_eq!(send.len, 0);
736 assert_eq!(send.off_front(), 0);
737
738 let first = b"something";
739
740 assert!(send.write(first, false).is_ok());
741 assert_eq!(send.off_front(), 0);
742
743 assert_eq!(send.len, 9);
744
745 let (written, fin) = send.emit(&mut buf[..4]).unwrap();
746 assert_eq!(written, 4);
747 assert!(!fin);
748 assert_eq!(&buf[..written], b"some");
749 assert_eq!(send.len, 5);
750 assert_eq!(send.off_front(), 4);
751
752 send.retransmit(3, 5);
753 assert_eq!(send.len, 6);
754 assert_eq!(send.off_front(), 3);
755 }
756
757 #[test]
758 fn send_buf_final_size_retransmit() {
759 let mut buf = [0; 50];
760 let mut send = SendBuf::new(u64::MAX);
761
762 send.write(&buf, false).unwrap();
763 assert_eq!(send.off_front(), 0);
764
765 let (written, _fin) = send.emit(&mut buf).unwrap();
767 assert_eq!(written, buf.len());
768 assert_eq!(send.off_front(), buf.len() as u64);
769
770 send.retransmit(40, 10);
773
774 let (fin_off, unsent) = send.stop(0).unwrap();
778 assert_eq!(fin_off, 50);
779 assert_eq!(unsent, 0);
780 }
781}