datagram_socket/
datagram.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use buffer_pool::RawPoolBufDatagramIo;
28use futures_util::future::poll_fn;
29use futures_util::ready;
30use futures_util::FutureExt;
31use std::future::Future;
32use std::io;
33use std::net::Ipv4Addr;
34use std::net::SocketAddr;
35use std::net::SocketAddrV4;
36use std::sync::Arc;
37use std::task::Context;
38use std::task::Poll;
39use tokio::io::ReadBuf;
40use tokio::net::UdpSocket;
41use tokio::task::coop::unconstrained;
42
43#[cfg(unix)]
44use std::os::fd::AsFd;
45#[cfg(unix)]
46use std::os::fd::BorrowedFd;
47#[cfg(unix)]
48use std::os::fd::FromRawFd;
49#[cfg(unix)]
50use std::os::fd::IntoRawFd;
51#[cfg(unix)]
52use std::os::fd::OwnedFd;
53#[cfg(unix)]
54use tokio::net::UnixDatagram;
55
56use crate::socket_stats::AsSocketStats;
57
58// This is the largest datagram we expect to support.
59// UDP and Unix sockets can support larger datagrams than this, but we only
60// expect to support packets coming to/from the Internet.
61pub const MAX_DATAGRAM_SIZE: usize = 1500;
62
63pub trait DatagramSocketWithStats: DatagramSocket {}
64
65impl<T> DatagramSocketWithStats for T where T: DatagramSocket + AsSocketStats {}
66
67/// Describes an implementation of a connected datagram socket.
68///
69/// Rather than using Socket for datagram-oriented sockets, the DatagramSocket
70/// trait purposely does not implement AsyncRead/AsyncWrite, which are traits
71/// with stream semantics. For example, the `AsyncReadExt::read_exact` method
72/// which issues as many reads as possible to fill the buffer provided.
73///
74/// For a similar reason, [`std::net::UdpSocket`] does not implement
75/// [`io::Read`] nor does [`tokio::net::UdpSocket`] implement
76/// [`tokio::io::AsyncRead`].
77pub trait DatagramSocket:
78    DatagramSocketSend + DatagramSocketRecv + 'static
79{
80    #[cfg(unix)]
81    fn as_raw_io(&self) -> Option<BorrowedFd<'_>>;
82
83    #[cfg(unix)]
84    fn into_fd(self) -> Option<OwnedFd>;
85
86    fn as_buf_io(&mut self) -> Option<&mut dyn RawPoolBufDatagramIo> {
87        None
88    }
89}
90
91/// Describes the send half of a connected datagram socket.
92pub trait DatagramSocketSend: Sync {
93    /// Attempts to send data on the socket to the remote address to which it
94    /// was previously connected.
95    ///
96    /// Note that on multiple calls to a `poll_*` method in the send direction,
97    /// only the `Waker` from the `Context` passed to the most recent call will
98    /// be scheduled to receive a wakeup.
99    ///
100    /// # Return value
101    ///
102    /// The function returns:
103    ///
104    /// * `Poll::Pending` if the socket is not available to write
105    /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent
106    /// * `Poll::Ready(Err(e))` if an error is encountered.
107    ///
108    /// # Errors
109    ///
110    /// This function may encounter any standard I/O error except `WouldBlock`.
111    fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>>;
112
113    /// Attempts to send data on the socket to a given address.
114    ///
115    /// If this socket only supports a single address, it should forward to
116    /// `send`. It should *not* panic or discard the data.
117    /// It's recommended that this return an error if `addr` doesn't match the
118    /// only supported address.
119    ///
120    /// Note that on multiple calls to a `poll_*` method in the send direction,
121    /// only the `Waker` from the `Context` passed to the most recent call
122    /// will be scheduled to receive a wakeup.
123    ///
124    /// # Return value
125    ///
126    /// The function returns:
127    ///
128    /// * `Poll::Pending` if the socket is not ready to write
129    /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent.
130    /// * `Poll::Ready(Err(e))` if an error is encountered.
131    ///
132    /// # Errors
133    ///
134    /// This function may encounter any standard I/O error except `WouldBlock`.
135    fn poll_send_to(
136        &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
137    ) -> Poll<io::Result<usize>>;
138
139    /// Attempts to send multiple packets of data on the socket to the remote
140    /// address to which it was previously connected.
141    ///
142    /// Note that on multiple calls to a `poll_*` method in the send direction,
143    /// only the `Waker` from the `Context` passed to the most recent call
144    /// will be scheduled to receive a wakeup.
145    ///
146    /// # Return value
147    ///
148    /// The function returns:
149    ///
150    /// * `Poll::Pending` if the socket is not ready to write
151    /// * `Poll::Ready(Ok(n))` `n` is the number of packets sent. If any packet
152    ///   was sent only partially, that information is lost.
153    /// * `Poll::Ready(Err(e))` if an error is encountered.
154    ///
155    /// # Errors
156    ///
157    /// This function may encounter any standard I/O error except `WouldBlock`.
158    fn poll_send_many(
159        &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
160    ) -> Poll<io::Result<usize>> {
161        let mut sent = 0;
162
163        for buf in bufs {
164            match self.poll_send(cx, buf.filled()) {
165                Poll::Ready(Ok(_)) => sent += 1,
166                Poll::Ready(err) => {
167                    if sent == 0 {
168                        return Poll::Ready(err);
169                    }
170                    break;
171                },
172                Poll::Pending => {
173                    if sent == 0 {
174                        return Poll::Pending;
175                    }
176                    break;
177                },
178            }
179        }
180
181        Poll::Ready(Ok(sent))
182    }
183
184    /// If the underlying socket is a `UdpSocket`, return the reference to it.
185    fn as_udp_socket(&self) -> Option<&UdpSocket> {
186        None
187    }
188
189    /// Returns the socket address of the remote peer this socket was connected
190    /// to.
191    fn peer_addr(&self) -> Option<SocketAddr> {
192        None
193    }
194}
195
196/// Writes datagrams to a socket.
197///
198/// Implemented as an extension trait, adding utility methods to all
199/// [`DatagramSocketSend`] types. Callers will tend to import this trait instead
200/// of [`DatagramSocketSend`].
201///
202/// [`DatagramSocketSend`]: DatagramSocketSend
203pub trait DatagramSocketSendExt: DatagramSocketSend {
204    /// Sends data on the socket to the remote address that the socket is
205    /// connected to.
206    fn send(&self, buf: &[u8]) -> impl Future<Output = io::Result<usize>> {
207        poll_fn(move |cx| self.poll_send(cx, buf))
208    }
209
210    /// Sends data on the socket to the given address. On success, returns the
211    /// number of bytes written.
212    fn send_to(
213        &self, buf: &[u8], addr: SocketAddr,
214    ) -> impl Future<Output = io::Result<usize>> {
215        poll_fn(move |cx| self.poll_send_to(cx, buf, addr))
216    }
217
218    /// Sends multiple data packets on the socket to the to the remote address
219    /// that the socket is connected to. On success, returns the number of
220    /// packets sent.
221    fn send_many(
222        &self, bufs: &[ReadBuf<'_>],
223    ) -> impl Future<Output = io::Result<usize>> {
224        poll_fn(move |cx| self.poll_send_many(cx, bufs))
225    }
226
227    fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
228        match unconstrained(poll_fn(|cx| self.poll_send(cx, buf))).now_or_never()
229        {
230            Some(result) => result,
231            None => Err(io::ErrorKind::WouldBlock.into()),
232        }
233    }
234
235    fn try_send_many(&self, bufs: &[ReadBuf<'_>]) -> io::Result<usize> {
236        match unconstrained(poll_fn(|cx| self.poll_send_many(cx, bufs)))
237            .now_or_never()
238        {
239            Some(result) => result,
240            None => Err(io::ErrorKind::WouldBlock.into()),
241        }
242    }
243}
244
245/// Describes the receive half of a connected datagram socket.
246pub trait DatagramSocketRecv: Send {
247    /// Attempts to receive a single datagram message on the socket from the
248    /// remote address to which it is `connect`ed.
249    ///
250    /// Note that on multiple calls to a `poll_*` method in the `recv`
251    /// direction, only the `Waker` from the `Context` passed to the most
252    /// recent call will be scheduled to receive a wakeup.
253    ///
254    /// # Return value
255    ///
256    /// The function returns:
257    ///
258    /// * `Poll::Pending` if the socket is not ready to read
259    /// * `Poll::Ready(Ok(()))` reads data `ReadBuf` if the socket is ready
260    /// * `Poll::Ready(Err(e))` if an error is encountered.
261    ///
262    /// # Errors
263    ///
264    /// This function may encounter any standard I/O error except `WouldBlock`.
265    fn poll_recv(
266        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
267    ) -> Poll<io::Result<()>>;
268
269    /// Attempts to receive a single datagram on the socket.
270    ///
271    /// Note that on multiple calls to a `poll_*` method in the `recv`
272    /// direction, only the `Waker` from the `Context` passed to the most
273    /// recent call will be scheduled to receive a wakeup.
274    ///
275    /// # Return value
276    ///
277    /// The function returns:
278    ///
279    /// * `Poll::Pending` if the socket is not ready to read
280    /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the
281    ///   socket is ready
282    /// * `Poll::Ready(Err(e))` if an error is encountered.
283    ///
284    /// # Errors
285    ///
286    /// This function may encounter any standard I/O error except `WouldBlock`.
287    fn poll_recv_from(
288        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
289    ) -> Poll<io::Result<SocketAddr>> {
290        self.poll_recv(cx, buf).map_ok(|_| {
291            SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))
292        })
293    }
294
295    /// Attempts to receive multiple datagrams on the socket from the remote
296    /// address to which it is `connect`ed.
297    ///
298    /// Note that on multiple calls to a `poll_*` method in the `recv`
299    /// direction, only the `Waker` from the `Context` passed to the most
300    /// recent call will be scheduled to receive a wakeup.
301    ///
302    /// # Return value
303    ///
304    /// The function returns:
305    ///
306    /// * `Poll::Pending` if the socket is not ready to read
307    /// * `Poll::Ready(Ok(n))` reads data `ReadBuf` if the socket is ready `n`
308    ///   is the number of datagrams read.
309    /// * `Poll::Ready(Err(e))` if an error is encountered.
310    ///
311    /// # Errors
312    ///
313    /// This function may encounter any standard I/O error except `WouldBlock`.
314    fn poll_recv_many(
315        &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
316    ) -> Poll<io::Result<usize>> {
317        let mut read = 0;
318
319        for buf in bufs {
320            match self.poll_recv(cx, buf) {
321                Poll::Ready(Ok(())) => read += 1,
322
323                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
324
325                // Only return `Poll::Ready` if at least one datagram was
326                // successfully read, otherwise block.
327                Poll::Pending if read == 0 => return Poll::Pending,
328                Poll::Pending => break,
329            }
330        }
331
332        Poll::Ready(Ok(read))
333    }
334
335    /// If the underlying socket is a `UdpSocket`, return the reference to it.
336    fn as_udp_socket(&self) -> Option<&UdpSocket> {
337        None
338    }
339}
340
341/// Reads datagrams from a socket.
342///
343/// Implemented as an extension trait, adding utility methods to all
344/// [`DatagramSocketRecv`] types. Callers will tend to import this trait instead
345/// of [`DatagramSocketRecv`].
346///
347/// [`DatagramSocketRecv`]: DatagramSocketRecv
348pub trait DatagramSocketRecvExt: DatagramSocketRecv {
349    /// Receives a single datagram message on the socket from the remote address
350    /// to which it is connected. On success, returns the number of bytes read.
351    fn recv(
352        &mut self, buf: &mut [u8],
353    ) -> impl Future<Output = io::Result<usize>> + Send {
354        poll_fn(|cx| {
355            let mut buf = ReadBuf::new(buf);
356
357            ready!(self.poll_recv(cx, &mut buf)?);
358
359            Poll::Ready(Ok(buf.filled().len()))
360        })
361    }
362
363    /// Receives a single datagram message on the socket. On success, returns
364    /// the number of bytes read and the origin.
365    fn recv_from(
366        &mut self, buf: &mut [u8],
367    ) -> impl Future<Output = io::Result<(usize, SocketAddr)>> + Send {
368        poll_fn(|cx| {
369            let mut buf = ReadBuf::new(buf);
370
371            let addr = ready!(self.poll_recv_from(cx, &mut buf)?);
372
373            Poll::Ready(Ok((buf.filled().len(), addr)))
374        })
375    }
376
377    /// Receives multiple datagrams on the socket from the remote address
378    /// to which it is connected. Returns the number of buffers used (i.e.
379    /// number of datagrams read). Each used buffer can be read up to its
380    /// `filled().len()`.
381    fn recv_many(
382        &mut self, bufs: &mut [ReadBuf<'_>],
383    ) -> impl Future<Output = io::Result<usize>> + Send {
384        poll_fn(|cx| self.poll_recv_many(cx, bufs))
385    }
386}
387
388impl<T: DatagramSocketSend + ?Sized> DatagramSocketSendExt for T {}
389
390impl<T: DatagramSocketRecv + ?Sized> DatagramSocketRecvExt for T {}
391
392/// A convenience method that can be implemented for any type if it wants
393/// to forward its `DatagramSocketSend` functionality to an inner field/socket.
394/// This automatically derives `DatagramSocketSend`.
395pub trait AsDatagramSocketSend {
396    type AsSend: DatagramSocketSend + ?Sized;
397
398    fn as_datagram_socket_send(&self) -> &Self::AsSend;
399}
400
401/// A convenience method that can be implemented for any type if it wants
402/// to forward its `DatagramSocketRecv` functionality to an inner field/socket.
403/// This automatically derives `DatagramSocketRecv`.
404pub trait AsDatagramSocketRecv {
405    type AsRecv: DatagramSocketRecv + ?Sized;
406
407    fn as_datagram_socket_recv(&mut self) -> &mut Self::AsRecv;
408    fn as_shared_datagram_socket_recv(&self) -> &Self::AsRecv;
409}
410
411impl<T: AsDatagramSocketSend + Sync> DatagramSocketSend for T {
412    #[inline]
413    fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
414        self.as_datagram_socket_send().poll_send(cx, buf)
415    }
416
417    #[inline]
418    fn poll_send_to(
419        &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
420    ) -> Poll<io::Result<usize>> {
421        self.as_datagram_socket_send().poll_send_to(cx, buf, addr)
422    }
423
424    #[inline]
425    fn poll_send_many(
426        &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
427    ) -> Poll<io::Result<usize>> {
428        self.as_datagram_socket_send().poll_send_many(cx, bufs)
429    }
430
431    #[inline]
432    fn as_udp_socket(&self) -> Option<&UdpSocket> {
433        self.as_datagram_socket_send().as_udp_socket()
434    }
435
436    #[inline]
437    fn peer_addr(&self) -> Option<SocketAddr> {
438        self.as_datagram_socket_send().peer_addr()
439    }
440}
441
442impl<T: AsDatagramSocketRecv + Send> DatagramSocketRecv for T {
443    #[inline]
444    fn poll_recv(
445        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
446    ) -> Poll<io::Result<()>> {
447        self.as_datagram_socket_recv().poll_recv(cx, buf)
448    }
449
450    #[inline]
451    fn poll_recv_from(
452        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
453    ) -> Poll<io::Result<SocketAddr>> {
454        self.as_datagram_socket_recv().poll_recv_from(cx, buf)
455    }
456
457    #[inline]
458    fn poll_recv_many(
459        &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
460    ) -> Poll<io::Result<usize>> {
461        self.as_datagram_socket_recv().poll_recv_many(cx, bufs)
462    }
463
464    #[inline]
465    fn as_udp_socket(&self) -> Option<&UdpSocket> {
466        self.as_shared_datagram_socket_recv().as_udp_socket()
467    }
468}
469
470impl<T> AsDatagramSocketSend for &mut T
471where
472    T: DatagramSocketSend + Send + ?Sized,
473{
474    type AsSend = T;
475
476    fn as_datagram_socket_send(&self) -> &Self::AsSend {
477        self
478    }
479}
480
481impl<T> AsDatagramSocketSend for Box<T>
482where
483    T: DatagramSocketSend + Send + ?Sized,
484{
485    type AsSend = T;
486
487    fn as_datagram_socket_send(&self) -> &Self::AsSend {
488        self
489    }
490}
491
492impl<T> AsDatagramSocketSend for Arc<T>
493where
494    T: DatagramSocketSend + Send + ?Sized,
495{
496    type AsSend = T;
497
498    fn as_datagram_socket_send(&self) -> &Self::AsSend {
499        self
500    }
501}
502
503impl<T> AsDatagramSocketRecv for &mut T
504where
505    T: DatagramSocketRecv + Send + ?Sized,
506{
507    type AsRecv = T;
508
509    fn as_datagram_socket_recv(&mut self) -> &mut Self::AsRecv {
510        self
511    }
512
513    fn as_shared_datagram_socket_recv(&self) -> &Self::AsRecv {
514        self
515    }
516}
517
518impl<T> AsDatagramSocketRecv for Box<T>
519where
520    T: DatagramSocketRecv + Send + ?Sized,
521{
522    type AsRecv = T;
523
524    fn as_datagram_socket_recv(&mut self) -> &mut Self::AsRecv {
525        self
526    }
527
528    fn as_shared_datagram_socket_recv(&self) -> &Self::AsRecv {
529        self
530    }
531}
532
533impl DatagramSocket for UdpSocket {
534    #[cfg(unix)]
535    fn as_raw_io(&self) -> Option<BorrowedFd<'_>> {
536        Some(self.as_fd())
537    }
538
539    #[cfg(unix)]
540    fn into_fd(self) -> Option<OwnedFd> {
541        Some(into_owned_fd(self.into_std().ok()?))
542    }
543}
544
545impl DatagramSocketSend for UdpSocket {
546    #[inline]
547    fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
548        UdpSocket::poll_send(self, cx, buf)
549    }
550
551    #[inline]
552    fn poll_send_to(
553        &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
554    ) -> Poll<io::Result<usize>> {
555        UdpSocket::poll_send_to(self, cx, buf, addr)
556    }
557
558    #[cfg(target_os = "linux")]
559    #[inline]
560    fn poll_send_many(
561        &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
562    ) -> Poll<io::Result<usize>> {
563        crate::poll_sendmmsg!(self, cx, bufs)
564    }
565
566    fn as_udp_socket(&self) -> Option<&UdpSocket> {
567        Some(self)
568    }
569
570    fn peer_addr(&self) -> Option<SocketAddr> {
571        self.peer_addr().ok()
572    }
573}
574
575impl DatagramSocketRecv for UdpSocket {
576    #[inline]
577    fn poll_recv(
578        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
579    ) -> Poll<io::Result<()>> {
580        UdpSocket::poll_recv(self, cx, buf)
581    }
582
583    #[cfg(target_os = "linux")]
584    #[inline]
585    fn poll_recv_many(
586        &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
587    ) -> Poll<io::Result<usize>> {
588        crate::poll_recvmmsg!(self, cx, bufs)
589    }
590
591    fn as_udp_socket(&self) -> Option<&UdpSocket> {
592        Some(self)
593    }
594}
595
596impl DatagramSocket for Arc<UdpSocket> {
597    #[cfg(unix)]
598    fn as_raw_io(&self) -> Option<BorrowedFd<'_>> {
599        Some(self.as_fd())
600    }
601
602    #[cfg(unix)]
603    fn into_fd(self) -> Option<OwnedFd> {
604        None
605    }
606}
607
608impl DatagramSocketRecv for Arc<UdpSocket> {
609    #[inline]
610    fn poll_recv(
611        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
612    ) -> Poll<io::Result<()>> {
613        UdpSocket::poll_recv(self, cx, buf)
614    }
615
616    #[inline]
617    fn poll_recv_from(
618        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
619    ) -> Poll<io::Result<SocketAddr>> {
620        UdpSocket::poll_recv_from(self, cx, buf)
621    }
622
623    #[cfg(target_os = "linux")]
624    #[inline]
625    fn poll_recv_many(
626        &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
627    ) -> Poll<io::Result<usize>> {
628        crate::poll_recvmmsg!(self, cx, bufs)
629    }
630
631    fn as_udp_socket(&self) -> Option<&UdpSocket> {
632        Some(self)
633    }
634}
635
636#[cfg(unix)]
637impl DatagramSocket for UnixDatagram {
638    fn as_raw_io(&self) -> Option<BorrowedFd<'_>> {
639        Some(self.as_fd())
640    }
641
642    fn into_fd(self) -> Option<OwnedFd> {
643        Some(into_owned_fd(self.into_std().ok()?))
644    }
645}
646
647#[cfg(unix)]
648impl DatagramSocketSend for UnixDatagram {
649    #[inline]
650    fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
651        UnixDatagram::poll_send(self, cx, buf)
652    }
653
654    #[inline]
655    fn poll_send_to(
656        &self, _: &mut Context, _: &[u8], _: SocketAddr,
657    ) -> Poll<io::Result<usize>> {
658        Poll::Ready(Err(io::Error::new(
659            io::ErrorKind::Unsupported,
660            "invalid address family",
661        )))
662    }
663
664    #[cfg(target_os = "linux")]
665    #[inline]
666    fn poll_send_many(
667        &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
668    ) -> Poll<io::Result<usize>> {
669        crate::poll_sendmmsg!(self, cx, bufs)
670    }
671}
672
673#[cfg(unix)]
674impl DatagramSocketRecv for UnixDatagram {
675    #[inline]
676    fn poll_recv(
677        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
678    ) -> Poll<io::Result<()>> {
679        UnixDatagram::poll_recv(self, cx, buf)
680    }
681
682    #[cfg(target_os = "linux")]
683    #[inline]
684    fn poll_recv_many(
685        &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
686    ) -> Poll<io::Result<usize>> {
687        crate::poll_recvmmsg!(self, cx, bufs)
688    }
689}
690
691#[cfg(unix)]
692impl DatagramSocketRecv for Arc<UnixDatagram> {
693    #[inline]
694    fn poll_recv(
695        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
696    ) -> Poll<io::Result<()>> {
697        UnixDatagram::poll_recv(self, cx, buf)
698    }
699
700    #[cfg(target_os = "linux")]
701    #[inline]
702    fn poll_recv_many(
703        &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
704    ) -> Poll<io::Result<usize>> {
705        crate::poll_recvmmsg!(self, cx, bufs)
706    }
707}
708
709/// `Into<OwnedFd>::into` for types (tokio sockets etc) that don't implement
710/// `From<OwnedFd>`.
711#[cfg(unix)]
712fn into_owned_fd<F: IntoRawFd>(into_fd: F) -> OwnedFd {
713    unsafe { OwnedFd::from_raw_fd(into_fd.into_raw_fd()) }
714}
715
716/// A cheap wrapper around a datagram socket which describes if it is connected
717/// to an explicit peer.
718///
719/// This struct essentially forwards its underlying socket's `send_to()` method
720/// to `send()` if the socket is explicitly connected to a peer. This is helpful
721/// for preventing issues on platforms that do not support `send_to` on
722/// already-connected sockets.
723///
724/// # Warning
725/// A socket's "connectedness" is determined once, when it is created. If the
726/// socket is created as connected, then later disconnected from its peer, its
727/// `send_to()` call will fail.
728///
729/// For example, MacOS errors if `send_to` is used on a socket that's already
730/// connected. Only `send` can be used. By using `MaybeConnectedSocket`, you can
731/// use the same `send` and `send_to` APIs in both client- and server-side code.
732/// Clients, usually with connected sockets, will then forward `send_to` to
733/// `send`, whereas servers, usually with unconnected sockets, will use
734/// `send_to`.
735#[derive(Clone)]
736pub struct MaybeConnectedSocket<T> {
737    inner: T,
738    peer: Option<SocketAddr>,
739}
740
741impl<T: DatagramSocketSend> MaybeConnectedSocket<T> {
742    pub fn new(inner: T) -> Self {
743        Self {
744            peer: inner.peer_addr(),
745            inner,
746        }
747    }
748
749    /// Provides access to the wrapped socket, allowing the user to override
750    /// `send_to()` behavior if required.
751    pub fn inner(&self) -> &T {
752        &self.inner
753    }
754
755    /// Consumes `self`, returning the wrapped socket.
756    pub fn into_inner(self) -> T {
757        self.inner
758    }
759}
760
761impl<T: DatagramSocketSend> DatagramSocketSend for MaybeConnectedSocket<T> {
762    #[inline]
763    fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
764        self.inner.poll_send(cx, buf)
765    }
766
767    #[inline]
768    fn poll_send_to(
769        &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
770    ) -> Poll<io::Result<usize>> {
771        if let Some(peer) = self.peer {
772            debug_assert_eq!(peer, addr);
773            self.inner.poll_send(cx, buf)
774        } else {
775            self.inner.poll_send_to(cx, buf, addr)
776        }
777    }
778
779    #[inline]
780    fn poll_send_many(
781        &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
782    ) -> Poll<io::Result<usize>> {
783        self.inner.poll_send_many(cx, bufs)
784    }
785
786    #[inline]
787    fn as_udp_socket(&self) -> Option<&UdpSocket> {
788        self.inner.as_udp_socket()
789    }
790
791    #[inline]
792    fn peer_addr(&self) -> Option<SocketAddr> {
793        self.peer
794    }
795}