quiche/stream/
send_buf.rs

1// Copyright (C) 2023, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::cmp;
28
29use std::collections::VecDeque;
30
31use crate::range_buf::BufSplit;
32use crate::range_buf::RangeBuf;
33use crate::BufFactory;
34use crate::Error;
35use crate::Result;
36
37use crate::range_buf::DefaultBufFactory;
38use crate::ranges;
39
/// Maximum size of the chunks `SendBuf::write` splits its input into.
///
/// Test builds use a very small value, presumably so that short test inputs
/// already span several chunks.
#[cfg(test)]
const SEND_BUFFER_SIZE: usize = 5;

/// Maximum size of the chunks `SendBuf::write` splits its input into
/// (non-test builds).
#[cfg(not(test))]
const SEND_BUFFER_SIZE: usize = 4096;
45
/// A pending reservation of space at the tail of a [`SendBuf`].
///
/// Created by `SendBuf::reserve_for_write`. Buffers are then pushed through
/// `append_buf` until the reservation is used up; the `Drop` implementation
/// asserts that no reserved bytes are left over.
struct SendReserve<'a, F: BufFactory> {
    /// The send buffer the reserved bytes are appended to.
    inner: &'a mut SendBuf<F>,
    /// Number of reserved bytes that have not been appended yet.
    reserved: usize,
    /// Whether the buffer that exhausts the reservation is flagged as `fin`.
    fin: bool,
}
51
52impl<F: BufFactory> SendReserve<'_, F> {
53    fn append_buf(&mut self, buf: F::Buf) -> Result<()> {
54        let len = buf.as_ref().len();
55        let inner = &mut self.inner;
56
57        if len > self.reserved {
58            return Err(Error::BufferTooShort);
59        }
60
61        let fin: bool = self.reserved == len && self.fin;
62
63        let buf = RangeBuf::from_raw(buf, inner.off, fin);
64
65        // The new data can simply be appended at the end of the send buffer.
66        inner.data.push_back(buf);
67
68        inner.off += len as u64;
69        inner.len += len as u64;
70        self.reserved -= len;
71
72        Ok(())
73    }
74}
75
impl<F: BufFactory> Drop for SendReserve<'_, F> {
    /// Checks that the whole reservation was consumed.
    ///
    /// # Panics
    ///
    /// Panics if the reservation is dropped while `reserved` is non-zero,
    /// i.e. some reserved bytes were never appended.
    fn drop(&mut self) {
        assert_eq!(self.reserved, 0)
    }
}
81
/// Send-side stream buffer.
///
/// Stream data scheduled to be sent to the peer is buffered in a list of data
/// chunks ordered by offset in ascending order. Contiguous data can then be
/// read into a slice.
///
/// By default, new data is appended at the end of the stream, but data that
/// was already emitted can be marked for retransmission again, which rewinds
/// the read position within the buffered chunks.
#[derive(Debug, Default)]
pub struct SendBuf<F = DefaultBufFactory>
where
    F: BufFactory,
{
    /// Chunks of data to be sent, ordered by offset.
    data: VecDeque<RangeBuf<F>>,

    /// The index (into `data`) of the buffer that needs to be sent next.
    pos: usize,

    /// The maximum offset of data buffered in the stream.
    off: u64,

    /// The maximum offset of data sent to the peer, regardless of
    /// retransmissions.
    emit_off: u64,

    /// The number of buffered bytes still waiting to be emitted. It shrinks
    /// as data is emitted and grows again when data is marked for
    /// retransmission.
    len: u64,

    /// The maximum offset we are allowed to send to the peer.
    max_data: u64,

    /// The last offset the stream was blocked at, if any.
    blocked_at: Option<u64>,

    /// The final stream offset written to the stream, if any.
    fin_off: Option<u64>,

    /// Whether the stream's send-side has been shut down.
    shutdown: bool,

    /// Ranges of data offsets that have been acked.
    acked: ranges::RangeSet,

    /// The error code received via STOP_SENDING.
    error: Option<u64>,
}
130
131impl<F: BufFactory> SendBuf<F> {
132    /// Creates a new send buffer.
133    pub fn new(max_data: u64) -> SendBuf<F> {
134        SendBuf {
135            max_data,
136            ..SendBuf::default()
137        }
138    }
139
140    /// Try to reserve the required number of bytes to be sent
141    fn reserve_for_write(
142        &mut self, mut len: usize, mut fin: bool,
143    ) -> Result<SendReserve<'_, F>> {
144        let max_off = self.off + len as u64;
145
146        // Get the stream send capacity. This will return an error if the stream
147        // was stopped.
148        if len > self.cap()? {
149            len = self.cap()?;
150            fin = false;
151        }
152
153        if let Some(fin_off) = self.fin_off {
154            // Can't write past final offset.
155            if max_off > fin_off {
156                return Err(Error::FinalSize);
157            }
158
159            // Can't "undo" final offset.
160            if max_off == fin_off && !fin {
161                return Err(Error::FinalSize);
162            }
163        }
164
165        if fin {
166            self.fin_off = Some(max_off);
167        }
168
169        // Don't queue data that was already fully acked.
170        if self.ack_off() >= max_off {
171            return Ok(SendReserve {
172                inner: self,
173                reserved: 0,
174                fin,
175            });
176        }
177
178        Ok(SendReserve {
179            inner: self,
180            reserved: len,
181            fin,
182        })
183    }
184
185    /// Inserts the given slice of data at the end of the buffer.
186    ///
187    /// The number of bytes that were actually stored in the buffer is returned
188    /// (this may be lower than the size of the input buffer, in case of partial
189    /// writes).
190    pub fn write(&mut self, data: &[u8], fin: bool) -> Result<usize> {
191        let mut reserve = self.reserve_for_write(data.len(), fin)?;
192
193        if reserve.reserved == 0 {
194            return Ok(0);
195        }
196
197        let ret = reserve.reserved;
198
199        // Split the remaining input data into consistently-sized buffers to
200        // avoid fragmentation.
201        for chunk in data[..reserve.reserved].chunks(SEND_BUFFER_SIZE) {
202            reserve.append_buf(F::buf_from_slice(chunk))?;
203        }
204
205        Ok(ret)
206    }
207
208    /// Inserts the given buffer of data at the end of the buffer.
209    ///
210    /// The number of bytes that were actually stored in the buffer is returned
211    /// (this may be lower than the size of the input buffer, in case of partial
212    /// writes, in which case the unwritten buffer is also returned).
213    pub fn append_buf(
214        &mut self, mut data: F::Buf, cap: usize, fin: bool,
215    ) -> Result<(usize, Option<F::Buf>)>
216    where
217        F::Buf: BufSplit,
218    {
219        let len = data.as_ref().len();
220        let mut reserve = self.reserve_for_write(cap.min(len), fin)?;
221
222        if reserve.reserved == 0 {
223            return Ok((0, Some(data)));
224        }
225
226        let remainder =
227            (reserve.reserved < len).then(|| data.split_at(reserve.reserved));
228
229        let ret = reserve.reserved;
230
231        reserve.append_buf(data)?;
232
233        Ok((ret, remainder))
234    }
235
    /// Writes data from the send buffer into the given output buffer.
    ///
    /// Copies pending data starting at `off_front()` into `out`, and stops
    /// when `out` is full, when there is nothing left to send, when the next
    /// pending chunk is not contiguous with the data copied so far, or when
    /// the front offset reaches `max_data`.
    ///
    /// Returns the number of bytes copied and whether the copied data ends
    /// exactly at the stream's final offset.
    pub fn emit(&mut self, out: &mut [u8]) -> Result<(usize, bool)> {
        // Space left in the output buffer.
        let mut out_len = out.len();
        // Stream offset that corresponds to `out[0]`.
        let out_off = self.off_front();

        // Stream offset the next copied chunk must start at.
        let mut next_off = out_off;

        while out_len > 0 {
            let off_front = self.off_front();

            // Stop if there is nothing to send, if the next pending data is
            // not contiguous with what was copied so far, or if the flow
            // control limit was reached.
            if self.is_empty() ||
                off_front >= self.off ||
                off_front != next_off ||
                off_front >= self.max_data
            {
                break;
            }

            let buf = match self.data.get_mut(self.pos) {
                Some(v) => v,

                None => break,
            };

            // Skip buffers that have nothing left to send.
            if buf.is_empty() {
                self.pos += 1;
                continue;
            }

            let buf_len = cmp::min(buf.len(), out_len);
            // Whether only part of this buffer fits in the output.
            let partial = buf_len < buf.len();

            // Copy data to the output buffer.
            let out_pos = (next_off - out_off) as usize;
            out[out_pos..out_pos + buf_len].copy_from_slice(&buf[..buf_len]);

            self.len -= buf_len as u64;

            out_len -= buf_len;

            next_off = buf.off() + buf_len as u64;

            buf.consume(buf_len);

            if partial {
                // We reached the maximum capacity, so end here.
                break;
            }

            self.pos += 1;
        }

        // Override the `fin` flag set for the output buffer by matching the
        // buffer's maximum offset against the stream's final offset (if known).
        //
        // This is more efficient than tracking `fin` using the range buffers
        // themselves, and lets us avoid queueing empty buffers just so we can
        // propagate the final size.
        let fin = self.fin_off == Some(next_off);

        // Record the largest offset that has been sent so we can accurately
        // report final_size
        self.emit_off = cmp::max(self.emit_off, next_off);

        Ok((out.len() - out_len, fin))
    }
302
303    /// Updates the max_data limit to the given value.
304    pub fn update_max_data(&mut self, max_data: u64) {
305        self.max_data = cmp::max(self.max_data, max_data);
306    }
307
    /// Updates the last offset the stream was blocked at, if any.
    ///
    /// The stored value is replaced unconditionally; passing `None` clears
    /// it.
    pub fn update_blocked_at(&mut self, blocked_at: Option<u64>) {
        self.blocked_at = blocked_at;
    }
312
    /// The last offset the stream was blocked at, if any.
    ///
    /// Returns the value most recently stored by `update_blocked_at()`.
    pub fn blocked_at(&self) -> Option<u64> {
        self.blocked_at
    }
317
318    /// Increments the acked data offset.
319    pub fn ack(&mut self, off: u64, len: usize) {
320        self.acked.insert(off..off + len as u64);
321    }
322
323    pub fn ack_and_drop(&mut self, off: u64, len: usize) {
324        self.ack(off, len);
325
326        let ack_off = self.ack_off();
327
328        if self.data.is_empty() {
329            return;
330        }
331
332        if off > ack_off {
333            return;
334        }
335
336        let mut drop_until = None;
337
338        // Drop contiguously acked data from the front of the buffer.
339        for (i, buf) in self.data.iter_mut().enumerate() {
340            // Newly acked range is past highest contiguous acked range, so we
341            // can't drop it.
342            if buf.off >= ack_off {
343                break;
344            }
345
346            // Highest contiguous acked range falls within newly acked range,
347            // so we can't drop it.
348            if buf.off < ack_off && ack_off < buf.max_off() {
349                break;
350            }
351
352            // Newly acked range can be dropped.
353            drop_until = Some(i);
354        }
355
356        if let Some(drop) = drop_until {
357            self.data.drain(..=drop);
358
359            // When a buffer is marked for retransmission, but then acked before
360            // it could be retransmitted, we might end up decreasing the SendBuf
361            // position too much, so make sure that doesn't happen.
362            self.pos = self.pos.saturating_sub(drop + 1);
363        }
364    }
365
    /// Marks the `len` bytes of stream data starting at `off` as needing to
    /// be sent again.
    ///
    /// Buffers overlapping the range have their read position rewound, the
    /// send position is moved back to the first affected buffer, and the
    /// pending length is increased by the amount of data re-queued. Does
    /// nothing if no data is buffered or if the whole range lies within the
    /// contiguously acked prefix.
    pub fn retransmit(&mut self, off: u64, len: usize) {
        let max_off = off + len as u64;
        let ack_off = self.ack_off();

        if self.data.is_empty() {
            return;
        }

        // The whole range was already acked, nothing to retransmit.
        if max_off <= ack_off {
            return;
        }

        for i in 0..self.data.len() {
            let buf = &mut self.data[i];

            // Buffers are ordered by offset, so nothing past this point can
            // overlap the range.
            if buf.off >= max_off {
                break;
            }

            // This buffer ends before the range starts.
            if off > buf.max_off() {
                continue;
            }

            // Split the buffer into 2 if the retransmit range ends before the
            // buffer's final offset.
            let new_buf = if buf.off < max_off && max_off < buf.max_off() {
                Some(buf.split_off((max_off - buf.off) as usize))
            } else {
                None
            };

            let prev_pos = buf.pos;

            // Reduce the buffer's position (expand the buffer) if the retransmit
            // range is past the buffer's starting offset.
            buf.pos = if off > buf.off && off <= buf.max_off() {
                cmp::min(buf.pos, buf.start + (off - buf.off) as usize)
            } else {
                buf.start
            };

            // Make sure sending resumes from this buffer.
            self.pos = cmp::min(self.pos, i);

            // Account for the data that became pending again.
            self.len += (prev_pos - buf.pos) as u64;

            // The part split off above is kept right after the buffer it was
            // taken from.
            if let Some(b) = new_buf {
                self.data.insert(i + 1, b);
            }
        }
    }
416
417    /// Resets the stream at the current offset and clears all buffered data.
418    pub fn reset(&mut self) -> (u64, u64) {
419        let unsent_off = cmp::max(self.off_front(), self.emit_off);
420        let unsent_len = self.off_back().saturating_sub(unsent_off);
421
422        self.fin_off = Some(unsent_off);
423
424        // Drop all buffered data.
425        self.data.clear();
426
427        // Mark relevant data as acked.
428        self.off = unsent_off;
429        self.ack(0, self.off as usize);
430
431        self.pos = 0;
432        self.len = 0;
433
434        (self.emit_off, unsent_len)
435    }
436
437    /// Resets the streams and records the received error code.
438    ///
439    /// Calling this again after the first time has no effect.
440    pub fn stop(&mut self, error_code: u64) -> Result<(u64, u64)> {
441        if self.error.is_some() {
442            return Err(Error::Done);
443        }
444
445        let (max_off, unsent) = self.reset();
446
447        self.error = Some(error_code);
448
449        Ok((max_off, unsent))
450    }
451
452    /// Shuts down sending data.
453    pub fn shutdown(&mut self) -> Result<(u64, u64)> {
454        if self.shutdown {
455            return Err(Error::Done);
456        }
457
458        self.shutdown = true;
459
460        Ok(self.reset())
461    }
462
    /// Returns the largest offset of data buffered.
    ///
    /// This is the offset at which newly written data gets appended.
    pub fn off_back(&self) -> u64 {
        self.off
    }
467
468    /// Returns the lowest offset of data buffered.
469    pub fn off_front(&self) -> u64 {
470        let mut pos = self.pos;
471
472        // Skip empty buffers from the start of the queue.
473        while let Some(b) = self.data.get(pos) {
474            if !b.is_empty() {
475                return b.off();
476            }
477
478            pos += 1;
479        }
480
481        self.off
482    }
483
    /// The maximum offset we are allowed to send to the peer.
    ///
    /// This is the current `max_data` limit, as set at construction and
    /// raised by `update_max_data()`.
    pub fn max_off(&self) -> u64 {
        self.max_data
    }
488
489    /// Returns true if all data in the stream has been sent.
490    ///
491    /// This happens when the stream's send final size is known, and the
492    /// application has already written data up to that point.
493    pub fn is_fin(&self) -> bool {
494        if self.fin_off == Some(self.off) {
495            return true;
496        }
497
498        false
499    }
500
501    /// Returns true if the send-side of the stream is complete.
502    ///
503    /// This happens when the stream's send final size is known, and the peer
504    /// has already acked all stream data up to that point.
505    pub fn is_complete(&self) -> bool {
506        if let Some(fin_off) = self.fin_off {
507            if self.acked == (0..fin_off) {
508                return true;
509            }
510        }
511
512        false
513    }
514
    /// Returns true if the stream was stopped before completion.
    ///
    /// This is the case once `stop()` has recorded an error code.
    pub fn is_stopped(&self) -> bool {
        self.error.is_some()
    }
519
    /// Returns true if the stream was shut down.
    ///
    /// This is the case once `shutdown()` has been called.
    pub fn is_shutdown(&self) -> bool {
        self.shutdown
    }
524
    /// Returns true if there is no data.
    ///
    /// Only checks whether the chunk queue is empty: buffers that were
    /// already emitted but are still queued count as data here.
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }
529
530    /// Returns the highest contiguously acked offset.
531    pub fn ack_off(&self) -> u64 {
532        match self.acked.iter().next() {
533            // Only consider the initial range if it contiguously covers the
534            // start of the stream (i.e. from offset 0).
535            Some(std::ops::Range { start: 0, end }) => end,
536
537            Some(_) | None => 0,
538        }
539    }
540
541    /// Returns the outgoing flow control capacity.
542    pub fn cap(&self) -> Result<usize> {
543        // The stream was stopped, so return the error code instead.
544        if let Some(e) = self.error {
545            return Err(Error::StreamStopped(e));
546        }
547
548        Ok((self.max_data - self.off) as usize)
549    }
550
    /// Returns the number of separate buffers stored.
    ///
    /// Counts every chunk still in the queue, including chunks that were
    /// already emitted.
    #[allow(dead_code)]
    pub fn bufs_count(&self) -> usize {
        self.data.len()
    }
556}
557
558#[cfg(test)]
559mod tests {
560    use super::*;
561
562    #[test]
563    fn empty_write() {
564        let mut buf = [0; 5];
565
566        let mut send = <SendBuf>::new(u64::MAX);
567        assert_eq!(send.len, 0);
568
569        let (written, fin) = send.emit(&mut buf).unwrap();
570        assert_eq!(written, 0);
571        assert!(!fin);
572    }
573
574    #[test]
575    fn multi_write() {
576        let mut buf = [0; 128];
577
578        let mut send = <SendBuf>::new(u64::MAX);
579        assert_eq!(send.len, 0);
580
581        let first = b"something";
582        let second = b"helloworld";
583
584        assert!(send.write(first, false).is_ok());
585        assert_eq!(send.len, 9);
586
587        assert!(send.write(second, true).is_ok());
588        assert_eq!(send.len, 19);
589
590        let (written, fin) = send.emit(&mut buf[..128]).unwrap();
591        assert_eq!(written, 19);
592        assert!(fin);
593        assert_eq!(&buf[..written], b"somethinghelloworld");
594        assert_eq!(send.len, 0);
595    }
596
    /// Data written in two calls can be emitted in pieces whose boundaries
    /// don't line up with the writes, and `fin` is only reported by the emit
    /// that reaches the final offset.
    #[test]
    fn split_write() {
        let mut buf = [0; 10];

        let mut send = <SendBuf>::new(u64::MAX);
        assert_eq!(send.len, 0);

        let first = b"something";
        let second = b"helloworld";

        assert!(send.write(first, false).is_ok());
        assert_eq!(send.len, 9);

        assert!(send.write(second, true).is_ok());
        assert_eq!(send.len, 19);

        assert_eq!(send.off_front(), 0);

        // First emit spans the end of the first write and the start of the
        // second one.
        let (written, fin) = send.emit(&mut buf[..10]).unwrap();
        assert_eq!(written, 10);
        assert!(!fin);
        assert_eq!(&buf[..written], b"somethingh");
        assert_eq!(send.len, 9);

        assert_eq!(send.off_front(), 10);

        // Output buffer smaller than the remaining data.
        let (written, fin) = send.emit(&mut buf[..5]).unwrap();
        assert_eq!(written, 5);
        assert!(!fin);
        assert_eq!(&buf[..written], b"ellow");
        assert_eq!(send.len, 4);

        assert_eq!(send.off_front(), 15);

        // Output buffer larger than the remaining data: the final offset is
        // reached, so `fin` is reported.
        let (written, fin) = send.emit(&mut buf[..10]).unwrap();
        assert_eq!(written, 4);
        assert!(fin);
        assert_eq!(&buf[..written], b"orld");
        assert_eq!(send.len, 0);

        assert_eq!(send.off_front(), 19);
    }
639
    /// Data marked for retransmission is emitted again, and an emit stops at
    /// the first gap between re-queued data and data not sent yet.
    #[test]
    fn resend() {
        let mut buf = [0; 15];

        let mut send = <SendBuf>::new(u64::MAX);
        assert_eq!(send.len, 0);
        assert_eq!(send.off_front(), 0);

        let first = b"something";
        let second = b"helloworld";

        assert!(send.write(first, false).is_ok());
        assert_eq!(send.off_front(), 0);

        assert!(send.write(second, true).is_ok());
        assert_eq!(send.off_front(), 0);

        assert_eq!(send.len, 19);

        // Emit the first 14 bytes in three steps.
        let (written, fin) = send.emit(&mut buf[..4]).unwrap();
        assert_eq!(written, 4);
        assert!(!fin);
        assert_eq!(&buf[..written], b"some");
        assert_eq!(send.len, 15);
        assert_eq!(send.off_front(), 4);

        let (written, fin) = send.emit(&mut buf[..5]).unwrap();
        assert_eq!(written, 5);
        assert!(!fin);
        assert_eq!(&buf[..written], b"thing");
        assert_eq!(send.len, 10);
        assert_eq!(send.off_front(), 9);

        let (written, fin) = send.emit(&mut buf[..5]).unwrap();
        assert_eq!(written, 5);
        assert!(!fin);
        assert_eq!(&buf[..written], b"hello");
        assert_eq!(send.len, 5);
        assert_eq!(send.off_front(), 14);

        // Re-queue "thing" (offsets 4..9).
        send.retransmit(4, 5);
        assert_eq!(send.len, 10);
        assert_eq!(send.off_front(), 4);

        // Re-queue "some" (offsets 0..4).
        send.retransmit(0, 4);
        assert_eq!(send.len, 14);
        assert_eq!(send.off_front(), 0);

        // Only the re-queued 9 bytes are emitted even though the output has
        // room for more: the next pending data starts at offset 14, which is
        // not contiguous with offset 9.
        let (written, fin) = send.emit(&mut buf[..11]).unwrap();
        assert_eq!(written, 9);
        assert!(!fin);
        assert_eq!(&buf[..written], b"something");
        assert_eq!(send.len, 5);
        assert_eq!(send.off_front(), 14);

        // The data that was never sent comes last and completes the stream.
        let (written, fin) = send.emit(&mut buf[..11]).unwrap();
        assert_eq!(written, 5);
        assert!(fin);
        assert_eq!(&buf[..written], b"world");
        assert_eq!(send.len, 0);
        assert_eq!(send.off_front(), 19);
    }
702
    /// Writes are truncated to the flow control limit, and more data is
    /// accepted as the limit is raised.
    #[test]
    fn write_blocked_by_off() {
        let mut buf = [0; 10];

        // A default send buffer starts with a limit of 0.
        let mut send = <SendBuf>::default();
        assert_eq!(send.len, 0);

        let first = b"something";
        let second = b"helloworld";

        // No capacity at all: nothing is accepted.
        assert_eq!(send.write(first, false), Ok(0));
        assert_eq!(send.len, 0);

        assert_eq!(send.write(second, true), Ok(0));
        assert_eq!(send.len, 0);

        send.update_max_data(5);

        // Only the first 5 bytes fit.
        assert_eq!(send.write(first, false), Ok(5));
        assert_eq!(send.len, 5);

        assert_eq!(send.write(second, true), Ok(0));
        assert_eq!(send.len, 5);

        assert_eq!(send.off_front(), 0);

        let (written, fin) = send.emit(&mut buf[..10]).unwrap();
        assert_eq!(written, 5);
        assert!(!fin);
        assert_eq!(&buf[..written], b"somet");
        assert_eq!(send.len, 0);

        assert_eq!(send.off_front(), 5);

        // Everything buffered was already emitted.
        let (written, fin) = send.emit(&mut buf[..10]).unwrap();
        assert_eq!(written, 0);
        assert!(!fin);
        assert_eq!(&buf[..written], b"");
        assert_eq!(send.len, 0);

        send.update_max_data(15);

        // The rest of the first chunk fits, the second one only partially.
        assert_eq!(send.write(&first[5..], false), Ok(4));
        assert_eq!(send.len, 4);

        assert_eq!(send.write(second, true), Ok(6));
        assert_eq!(send.len, 10);

        assert_eq!(send.off_front(), 5);

        // The truncated write did not record a final size, so `fin` is not
        // reported.
        let (written, fin) = send.emit(&mut buf[..10]).unwrap();
        assert_eq!(written, 10);
        assert!(!fin);
        assert_eq!(&buf[..10], b"hinghellow");
        assert_eq!(send.len, 0);

        send.update_max_data(25);

        // The remaining bytes now fit, along with the final size.
        assert_eq!(send.write(&second[6..], true), Ok(4));
        assert_eq!(send.len, 4);

        assert_eq!(send.off_front(), 15);

        let (written, fin) = send.emit(&mut buf[..10]).unwrap();
        assert_eq!(written, 4);
        assert!(fin);
        assert_eq!(&buf[..written], b"orld");
        assert_eq!(send.len, 0);
    }
772
773    #[test]
774    fn zero_len_write() {
775        let mut buf = [0; 10];
776
777        let mut send = <SendBuf>::new(u64::MAX);
778        assert_eq!(send.len, 0);
779
780        let first = b"something";
781
782        assert!(send.write(first, false).is_ok());
783        assert_eq!(send.len, 9);
784
785        assert!(send.write(&[], true).is_ok());
786        assert_eq!(send.len, 9);
787
788        assert_eq!(send.off_front(), 0);
789
790        let (written, fin) = send.emit(&mut buf[..10]).unwrap();
791        assert_eq!(written, 9);
792        assert!(fin);
793        assert_eq!(&buf[..written], b"something");
794        assert_eq!(send.len, 0);
795    }
796
797    /// Check SendBuf::len calculation on a retransmit case
798    #[test]
799    fn send_buf_len_on_retransmit() {
800        let mut buf = [0; 15];
801
802        let mut send = <SendBuf>::new(u64::MAX);
803        assert_eq!(send.len, 0);
804        assert_eq!(send.off_front(), 0);
805
806        let first = b"something";
807
808        assert!(send.write(first, false).is_ok());
809        assert_eq!(send.off_front(), 0);
810
811        assert_eq!(send.len, 9);
812
813        let (written, fin) = send.emit(&mut buf[..4]).unwrap();
814        assert_eq!(written, 4);
815        assert!(!fin);
816        assert_eq!(&buf[..written], b"some");
817        assert_eq!(send.len, 5);
818        assert_eq!(send.off_front(), 4);
819
820        send.retransmit(3, 5);
821        assert_eq!(send.len, 6);
822        assert_eq!(send.off_front(), 3);
823    }
824
825    #[test]
826    fn send_buf_final_size_retransmit() {
827        let mut buf = [0; 50];
828        let mut send = <SendBuf>::new(u64::MAX);
829
830        send.write(&buf, false).unwrap();
831        assert_eq!(send.off_front(), 0);
832
833        // Emit the whole buffer
834        let (written, _fin) = send.emit(&mut buf).unwrap();
835        assert_eq!(written, buf.len());
836        assert_eq!(send.off_front(), buf.len() as u64);
837
838        // Server decides to retransmit the last 10 bytes. It's possible
839        // it's not actually lost and that the client did receive it.
840        send.retransmit(40, 10);
841
842        // Server receives STOP_SENDING from client. The final_size we
843        // send in the RESET_STREAM should be 50. If we send anything less,
844        // it's a FINAL_SIZE_ERROR.
845        let (fin_off, unsent) = send.stop(0).unwrap();
846        assert_eq!(fin_off, 50);
847        assert_eq!(unsent, 0);
848    }
849}