tokio_quiche/quic/io/
gso.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use std::io;
28use std::net::SocketAddr;
29use std::time::Instant;
30
31#[cfg(target_os = "linux")]
32mod linux_imports {
33    pub(super) use nix::sys::socket::sendmsg;
34    pub(super) use nix::sys::socket::ControlMessage;
35    pub(super) use nix::sys::socket::MsgFlags;
36    pub(super) use nix::sys::socket::SockaddrStorage;
37    pub(super) use smallvec::SmallVec;
38    pub(super) use std::io::ErrorKind;
39    pub(super) use std::os::fd::AsRawFd;
40    pub(super) use tokio::io::Interest;
41}
42
43#[cfg(target_os = "linux")]
44use self::linux_imports::*;
45
// Maximum number of packets that can be sent in a single UDP GSO send.
47pub(crate) const UDP_MAX_SEGMENT_COUNT: usize = 64;
48
#[cfg(not(feature = "gcongestion"))]
/// Returns the largest send-buffer size that does not leave a short packet
/// at the end of the buffer.
///
/// The upper bound is `min(send_quantum, max_capacity)`. When a segment size
/// is known, the bound is rounded down to a whole multiple of it.
///
/// For example,
///
/// - bound = 1000 and mss = 100, return 1000
/// - bound = 1000 and mss = 90, return 990
///
/// so that no trailing 10-byte packet is produced.
///
/// A `segment_size` of `None` or `Some(0)` disables the rounding and the
/// bound is returned unchanged. If the segment size is larger than the bound,
/// the result is 0 because not even one full segment fits.
pub(crate) fn tune_max_send_size(
    segment_size: Option<usize>, send_quantum: usize, max_capacity: usize,
) -> usize {
    let max_send_buf_size = send_quantum.min(max_capacity);

    match segment_size {
        // Drop the partial segment at the end. The `mss > 0` guard keeps a
        // zero segment size from causing a division-by-zero panic.
        Some(mss) if mss > 0 => max_send_buf_size - max_send_buf_size % mss,

        _ => max_send_buf_size,
    }
}
70
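// 65507 is the largest UDP payload that fits in one IPv4 datagram:
// 65535 (max IP packet) - 20 (IPv4 header) - 8 (UDP header).
// Background: https://wiki.cfdata.org/pages/viewpage.action?pageId=436188159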
72pub(crate) const UDP_MAX_GSO_PACKET_SIZE: usize = 65507;
73
74#[cfg(target_os = "linux")]
75#[derive(Copy, Clone, Debug)]
76pub(crate) enum PktInfo {
77    V4(libc::in_pktinfo),
78    V6(libc::in6_pktinfo),
79}
80
81#[cfg(target_os = "linux")]
82impl PktInfo {
83    fn make_cmsg(&'_ self) -> ControlMessage {
84        match self {
85            Self::V4(pkt) => ControlMessage::Ipv4PacketInfo(pkt),
86            Self::V6(pkt) => ControlMessage::Ipv6PacketInfo(pkt),
87        }
88    }
89
90    fn from_socket_addr(addr: SocketAddr) -> Self {
91        match addr {
92            SocketAddr::V4(ipv4) => {
93                // This is basically a safe wrapper around `mem::transmute()`.
94                // Calling this on the raw octets will ensure they
95                // become a native-endian, kernel-readable u32
96                let s_addr = u32::from_ne_bytes(ipv4.ip().octets());
97
98                Self::V4(libc::in_pktinfo {
99                    ipi_ifindex: 0,
100                    ipi_spec_dst: libc::in_addr { s_addr },
101                    ipi_addr: libc::in_addr { s_addr: 0 },
102                })
103            },
104            SocketAddr::V6(ipv6) => Self::V6(libc::in6_pktinfo {
105                ipi6_ifindex: 0,
106                ipi6_addr: libc::in6_addr {
107                    s6_addr: ipv6.ip().octets(),
108                },
109            }),
110        }
111    }
112}
113
114#[cfg(all(target_os = "linux", not(feature = "fuzzing")))]
115pub async fn send_to(
116    socket: &tokio::net::UdpSocket, to: SocketAddr, from: Option<SocketAddr>,
117    send_buf: &[u8], segment_size: usize, num_pkts: usize,
118    tx_time: Option<Instant>,
119) -> io::Result<usize> {
120    // An instant with the value of zero, since [`Instant`] is backed by a version
121    // of timespec this allows to extract raw values from an [`Instant`]
122    const INSTANT_ZERO: Instant = unsafe { std::mem::transmute(0u128) };
123
124    loop {
125        let iov = [std::io::IoSlice::new(send_buf)];
126        let segment_size_u16 = segment_size as u16;
127
128        let raw_time = tx_time
129            .map(|t| t.duration_since(INSTANT_ZERO).as_nanos() as u64)
130            .unwrap_or(0);
131
132        let pkt_info = from.map(PktInfo::from_socket_addr);
133
134        let mut cmsgs: SmallVec<[ControlMessage; 3]> = SmallVec::new();
135
136        if num_pkts > 1 {
137            // Create cmsg for UDP_SEGMENT.
138            cmsgs.push(ControlMessage::UdpGsoSegments(&segment_size_u16));
139        }
140
141        if tx_time.is_some() {
142            // Create cmsg for TXTIME.
143            cmsgs.push(ControlMessage::TxTime(&raw_time));
144        }
145
146        if let Some(pkt) = pkt_info.as_ref() {
147            // Create cmsg for IP(V6)_PKTINFO.
148            cmsgs.push(pkt.make_cmsg());
149        }
150
151        let addr = SockaddrStorage::from(to);
152
153        // Must use [`try_io`] so tokio can properly clear its readyness flag
154        let res = socket.try_io(Interest::WRITABLE, || {
155            let fd = socket.as_raw_fd();
156            sendmsg(fd, &iov, &cmsgs, MsgFlags::empty(), Some(&addr))
157                .map_err(Into::into)
158        });
159
160        match res {
161            // Wait for the socket to become writable and try again
162            Err(e) if e.kind() == ErrorKind::WouldBlock =>
163                socket.writable().await?,
164            res => return res,
165        }
166    }
167}
168
#[cfg(any(not(target_os = "linux"), feature = "fuzzing"))]
/// Fallback sender used when not building for Linux or when the `fuzzing`
/// feature is enabled.
///
/// Forwards `send_buf` to `to` with the socket's own `send_to`. The source
/// address, segment size, packet count and transmit time are accepted only so
/// the signature lines up with the Linux variant; none of them is used.
///
/// Returns whatever the socket's `send_to` reports.
pub(crate) async fn send_to(
    socket: &tokio::net::UdpSocket, to: SocketAddr, _from: Option<SocketAddr>,
    send_buf: &[u8], _segment_size: usize, _num_pkts: usize,
    _tx_time: Option<Instant>,
) -> io::Result<usize> {
    let bytes_sent = socket.send_to(send_buf, to).await?;

    Ok(bytes_sent)
}
177
#[cfg(all(target_os = "linux", test))]
mod test {
    /// Checks that an all-zero bit pattern still behaves as the origin of the
    /// clock value stored inside [`Instant`].
    ///
    /// If this test begins to fail, it means the implementation of
    /// [`Instant`] has changed in the std library.
    #[test]
    fn instant_zero() {
        use std::mem::transmute;
        use std::time::Instant;

        // Local stand-in shaped like the [`Timespec`] backing [`Instant`].
        #[derive(Debug)]
        struct Timespec {
            tv_sec: i64,
            tv_nsec: u32,
        }

        const NANOS_PER_SEC: u128 = 1_000_000_000;
        const INSTANT_ZERO: Instant =
            unsafe { transmute::<u128, Instant>(0u128) };

        let sample = Instant::now();

        // Elapsed time as reported through the public API.
        let via_api = sample.duration_since(INSTANT_ZERO).as_nanos();

        // The same quantity rebuilt from the raw seconds/nanoseconds fields.
        let Timespec { tv_sec, tv_nsec } =
            unsafe { transmute::<Instant, Timespec>(sample) };
        let via_raw_fields = tv_sec as u128 * NANOS_PER_SEC + tv_nsec as u128;

        assert_eq!(via_api, via_raw_fields);
    }
}