Skip to main content

datagram_socket/
datagram.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27use futures_util::future::poll_fn;
28use futures_util::ready;
29use futures_util::FutureExt;
30use std::future::Future;
31use std::io;
32use std::net::Ipv4Addr;
33use std::net::SocketAddr;
34use std::net::SocketAddrV4;
35use std::sync::Arc;
36use std::task::Context;
37use std::task::Poll;
38use tokio::io::ReadBuf;
39use tokio::net::UdpSocket;
40use tokio::task::coop::unconstrained;
41
42#[cfg(unix)]
43use std::os::fd::AsFd;
44#[cfg(unix)]
45use std::os::fd::BorrowedFd;
46#[cfg(unix)]
47use std::os::fd::FromRawFd;
48#[cfg(unix)]
49use std::os::fd::IntoRawFd;
50#[cfg(unix)]
51use std::os::fd::OwnedFd;
52#[cfg(unix)]
53use tokio::net::UnixDatagram;
54
55use crate::socket_stats::AsSocketStats;
56
/// The largest datagram we expect to support.
///
/// UDP and Unix sockets can support larger datagrams than this, but we only
/// expect to support packets coming to/from the Internet.
pub const MAX_DATAGRAM_SIZE: usize = 1500;
61
/// Marker trait with no methods of its own.
///
/// It is implemented automatically for every [`DatagramSocket`] that also
/// implements `AsSocketStats`; see the blanket impl below.
pub trait DatagramSocketWithStats: DatagramSocket {}

// Blanket impl: any `DatagramSocket + AsSocketStats` type is a
// `DatagramSocketWithStats`.
impl<T> DatagramSocketWithStats for T where T: DatagramSocket + AsSocketStats {}
65
66pub trait RawPoolBufDatagramIo: Send {
67    fn poll_send_datagrams(
68        &mut self, cx: &mut Context, datagrams: &mut [crate::DgramBuffer],
69    ) -> Poll<io::Result<usize>>;
70
71    fn poll_recv_dgram(
72        &mut self, cx: &mut Context,
73    ) -> Poll<io::Result<crate::DgramBuffer>>;
74
75    fn poll_recv_datagrams(
76        &mut self, cx: &mut Context, buffer: &mut Vec<crate::DgramBuffer>,
77        dgram_limit: usize,
78    ) -> Poll<io::Result<usize>> {
79        for i in 0..dgram_limit {
80            match self.poll_recv_dgram(cx) {
81                Poll::Ready(Ok(buf)) => buffer.push(buf),
82                Poll::Ready(Err(err)) =>
83                    if i > 0 {
84                        return Poll::Ready(Ok(i));
85                    } else {
86                        return Poll::Ready(Err(err));
87                    },
88                Poll::Pending =>
89                    if i > 0 {
90                        return Poll::Ready(Ok(i));
91                    } else {
92                        return Poll::Pending;
93                    },
94            }
95        }
96
97        Poll::Ready(Ok(dgram_limit))
98    }
99}
100
/// Describes an implementation of a connected datagram socket.
///
/// Rather than using Socket for datagram-oriented sockets, the DatagramSocket
/// trait purposely does not implement AsyncRead/AsyncWrite, which are traits
/// with stream semantics. For example, the `AsyncReadExt::read_exact` method
/// issues as many reads as possible to fill the buffer provided.
///
/// For a similar reason, [`std::net::UdpSocket`] does not implement
/// [`io::Read`] nor does [`tokio::net::UdpSocket`] implement
/// [`tokio::io::AsyncRead`].
pub trait DatagramSocket:
    DatagramSocketSend + DatagramSocketRecv + 'static
{
    /// Borrows the file descriptor behind this socket, or returns `None` if
    /// the implementation does not expose one.
    #[cfg(unix)]
    fn as_raw_io(&self) -> Option<BorrowedFd<'_>>;

    /// Consumes the socket and returns its file descriptor as an
    /// [`OwnedFd`], or `None` if the implementation cannot hand it over.
    #[cfg(unix)]
    fn into_fd(self) -> Option<OwnedFd>;

    /// Returns the socket's [`RawPoolBufDatagramIo`] interface, if it has
    /// one. The default implementation returns `None`.
    fn as_buf_io(&mut self) -> Option<&mut dyn RawPoolBufDatagramIo> {
        None
    }
}
124
125/// Describes the send half of a connected datagram socket.
126pub trait DatagramSocketSend: Sync {
127    /// Attempts to send data on the socket to the remote address to which it
128    /// was previously connected.
129    ///
130    /// Note that on multiple calls to a `poll_*` method in the send direction,
131    /// only the `Waker` from the `Context` passed to the most recent call will
132    /// be scheduled to receive a wakeup.
133    ///
134    /// # Return value
135    ///
136    /// The function returns:
137    ///
138    /// * `Poll::Pending` if the socket is not available to write
139    /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent
140    /// * `Poll::Ready(Err(e))` if an error is encountered.
141    ///
142    /// # Errors
143    ///
144    /// This function may encounter any standard I/O error except `WouldBlock`.
145    fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>>;
146
147    /// Attempts to send data on the socket to a given address.
148    ///
149    /// If this socket only supports a single address, it should forward to
150    /// `send`. It should *not* panic or discard the data.
151    /// It's recommended that this return an error if `addr` doesn't match the
152    /// only supported address.
153    ///
154    /// Note that on multiple calls to a `poll_*` method in the send direction,
155    /// only the `Waker` from the `Context` passed to the most recent call
156    /// will be scheduled to receive a wakeup.
157    ///
158    /// # Return value
159    ///
160    /// The function returns:
161    ///
162    /// * `Poll::Pending` if the socket is not ready to write
163    /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent.
164    /// * `Poll::Ready(Err(e))` if an error is encountered.
165    ///
166    /// # Errors
167    ///
168    /// This function may encounter any standard I/O error except `WouldBlock`.
169    fn poll_send_to(
170        &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
171    ) -> Poll<io::Result<usize>>;
172
173    /// Attempts to send multiple packets of data on the socket to the remote
174    /// address to which it was previously connected.
175    ///
176    /// Note that on multiple calls to a `poll_*` method in the send direction,
177    /// only the `Waker` from the `Context` passed to the most recent call
178    /// will be scheduled to receive a wakeup.
179    ///
180    /// # Return value
181    ///
182    /// The function returns:
183    ///
184    /// * `Poll::Pending` if the socket is not ready to write
185    /// * `Poll::Ready(Ok(n))` `n` is the number of packets sent. If any packet
186    ///   was sent only partially, that information is lost.
187    /// * `Poll::Ready(Err(e))` if an error is encountered.
188    ///
189    /// # Errors
190    ///
191    /// This function may encounter any standard I/O error except `WouldBlock`.
192    fn poll_send_many(
193        &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
194    ) -> Poll<io::Result<usize>> {
195        let mut sent = 0;
196
197        for buf in bufs {
198            match self.poll_send(cx, buf.filled()) {
199                Poll::Ready(Ok(_)) => sent += 1,
200                Poll::Ready(err) => {
201                    if sent == 0 {
202                        return Poll::Ready(err);
203                    }
204                    break;
205                },
206                Poll::Pending => {
207                    if sent == 0 {
208                        return Poll::Pending;
209                    }
210                    break;
211                },
212            }
213        }
214
215        Poll::Ready(Ok(sent))
216    }
217
218    /// If the underlying socket is a `UdpSocket`, return the reference to it.
219    fn as_udp_socket(&self) -> Option<&UdpSocket> {
220        None
221    }
222
223    /// Returns the socket address of the remote peer this socket was connected
224    /// to.
225    fn peer_addr(&self) -> Option<SocketAddr> {
226        None
227    }
228}
229
230/// Writes datagrams to a socket.
231///
232/// Implemented as an extension trait, adding utility methods to all
233/// [`DatagramSocketSend`] types. Callers will tend to import this trait instead
234/// of [`DatagramSocketSend`].
235///
236/// [`DatagramSocketSend`]: DatagramSocketSend
237pub trait DatagramSocketSendExt: DatagramSocketSend {
238    /// Sends data on the socket to the remote address that the socket is
239    /// connected to.
240    fn send(&self, buf: &[u8]) -> impl Future<Output = io::Result<usize>> {
241        poll_fn(move |cx| self.poll_send(cx, buf))
242    }
243
244    /// Sends data on the socket to the given address. On success, returns the
245    /// number of bytes written.
246    fn send_to(
247        &self, buf: &[u8], addr: SocketAddr,
248    ) -> impl Future<Output = io::Result<usize>> {
249        poll_fn(move |cx| self.poll_send_to(cx, buf, addr))
250    }
251
252    /// Sends multiple data packets on the socket to the to the remote address
253    /// that the socket is connected to. On success, returns the number of
254    /// packets sent.
255    fn send_many(
256        &self, bufs: &[ReadBuf<'_>],
257    ) -> impl Future<Output = io::Result<usize>> {
258        poll_fn(move |cx| self.poll_send_many(cx, bufs))
259    }
260
261    fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
262        match unconstrained(poll_fn(|cx| self.poll_send(cx, buf))).now_or_never()
263        {
264            Some(result) => result,
265            None => Err(io::ErrorKind::WouldBlock.into()),
266        }
267    }
268
269    fn try_send_many(&self, bufs: &[ReadBuf<'_>]) -> io::Result<usize> {
270        match unconstrained(poll_fn(|cx| self.poll_send_many(cx, bufs)))
271            .now_or_never()
272        {
273            Some(result) => result,
274            None => Err(io::ErrorKind::WouldBlock.into()),
275        }
276    }
277}
278
279/// Describes the receive half of a connected datagram socket.
280pub trait DatagramSocketRecv: Send {
281    /// Attempts to receive a single datagram message on the socket from the
282    /// remote address to which it is `connect`ed.
283    ///
284    /// Note that on multiple calls to a `poll_*` method in the `recv`
285    /// direction, only the `Waker` from the `Context` passed to the most
286    /// recent call will be scheduled to receive a wakeup.
287    ///
288    /// # Return value
289    ///
290    /// The function returns:
291    ///
292    /// * `Poll::Pending` if the socket is not ready to read
293    /// * `Poll::Ready(Ok(()))` reads data `ReadBuf` if the socket is ready
294    /// * `Poll::Ready(Err(e))` if an error is encountered.
295    ///
296    /// # Errors
297    ///
298    /// This function may encounter any standard I/O error except `WouldBlock`.
299    fn poll_recv(
300        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
301    ) -> Poll<io::Result<()>>;
302
303    /// Attempts to receive a single datagram on the socket.
304    ///
305    /// Note that on multiple calls to a `poll_*` method in the `recv`
306    /// direction, only the `Waker` from the `Context` passed to the most
307    /// recent call will be scheduled to receive a wakeup.
308    ///
309    /// # Return value
310    ///
311    /// The function returns:
312    ///
313    /// * `Poll::Pending` if the socket is not ready to read
314    /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the
315    ///   socket is ready
316    /// * `Poll::Ready(Err(e))` if an error is encountered.
317    ///
318    /// # Errors
319    ///
320    /// This function may encounter any standard I/O error except `WouldBlock`.
321    fn poll_recv_from(
322        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
323    ) -> Poll<io::Result<SocketAddr>> {
324        self.poll_recv(cx, buf).map_ok(|_| {
325            SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))
326        })
327    }
328
329    /// Attempts to receive multiple datagrams on the socket from the remote
330    /// address to which it is `connect`ed.
331    ///
332    /// Note that on multiple calls to a `poll_*` method in the `recv`
333    /// direction, only the `Waker` from the `Context` passed to the most
334    /// recent call will be scheduled to receive a wakeup.
335    ///
336    /// # Return value
337    ///
338    /// The function returns:
339    ///
340    /// * `Poll::Pending` if the socket is not ready to read
341    /// * `Poll::Ready(Ok(n))` reads data `ReadBuf` if the socket is ready `n`
342    ///   is the number of datagrams read.
343    /// * `Poll::Ready(Err(e))` if an error is encountered.
344    ///
345    /// # Errors
346    ///
347    /// This function may encounter any standard I/O error except `WouldBlock`.
348    fn poll_recv_many(
349        &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
350    ) -> Poll<io::Result<usize>> {
351        let mut read = 0;
352
353        for buf in bufs {
354            match self.poll_recv(cx, buf) {
355                Poll::Ready(Ok(())) => read += 1,
356
357                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
358
359                // Only return `Poll::Ready` if at least one datagram was
360                // successfully read, otherwise block.
361                Poll::Pending if read == 0 => return Poll::Pending,
362                Poll::Pending => break,
363            }
364        }
365
366        Poll::Ready(Ok(read))
367    }
368
369    /// If the underlying socket is a `UdpSocket`, return the reference to it.
370    fn as_udp_socket(&self) -> Option<&UdpSocket> {
371        None
372    }
373}
374
375/// Reads datagrams from a socket.
376///
377/// Implemented as an extension trait, adding utility methods to all
378/// [`DatagramSocketRecv`] types. Callers will tend to import this trait instead
379/// of [`DatagramSocketRecv`].
380///
381/// [`DatagramSocketRecv`]: DatagramSocketRecv
382pub trait DatagramSocketRecvExt: DatagramSocketRecv {
383    /// Receives a single datagram message on the socket from the remote address
384    /// to which it is connected. On success, returns the number of bytes read.
385    fn recv(
386        &mut self, buf: &mut [u8],
387    ) -> impl Future<Output = io::Result<usize>> + Send {
388        poll_fn(|cx| {
389            let mut buf = ReadBuf::new(buf);
390
391            ready!(self.poll_recv(cx, &mut buf)?);
392
393            Poll::Ready(Ok(buf.filled().len()))
394        })
395    }
396
397    /// Receives a single datagram message on the socket. On success, returns
398    /// the number of bytes read and the origin.
399    fn recv_from(
400        &mut self, buf: &mut [u8],
401    ) -> impl Future<Output = io::Result<(usize, SocketAddr)>> + Send {
402        poll_fn(|cx| {
403            let mut buf = ReadBuf::new(buf);
404
405            let addr = ready!(self.poll_recv_from(cx, &mut buf)?);
406
407            Poll::Ready(Ok((buf.filled().len(), addr)))
408        })
409    }
410
411    /// Receives multiple datagrams on the socket from the remote address
412    /// to which it is connected. Returns the number of buffers used (i.e.
413    /// number of datagrams read). Each used buffer can be read up to its
414    /// `filled().len()`.
415    fn recv_many(
416        &mut self, bufs: &mut [ReadBuf<'_>],
417    ) -> impl Future<Output = io::Result<usize>> + Send {
418        poll_fn(|cx| self.poll_recv_many(cx, bufs))
419    }
420}
421
// Every `DatagramSocketSend` type, sized or not, gets the
// `DatagramSocketSendExt` helper methods.
impl<T: DatagramSocketSend + ?Sized> DatagramSocketSendExt for T {}

// Every `DatagramSocketRecv` type, sized or not, gets the
// `DatagramSocketRecvExt` helper methods.
impl<T: DatagramSocketRecv + ?Sized> DatagramSocketRecvExt for T {}
425
/// A convenience trait that can be implemented for any type that wants to
/// forward its `DatagramSocketSend` functionality to an inner field/socket.
///
/// Types implementing this trait that are also `Sync` receive
/// `DatagramSocketSend` through a blanket impl.
pub trait AsDatagramSocketSend {
    /// The inner type that performs the sends.
    type AsSend: DatagramSocketSend + ?Sized;

    /// Borrows the inner sender.
    fn as_datagram_socket_send(&self) -> &Self::AsSend;
}
434
/// A convenience trait that can be implemented for any type that wants to
/// forward its `DatagramSocketRecv` functionality to an inner field/socket.
///
/// Types implementing this trait that are also `Send` receive
/// `DatagramSocketRecv` through a blanket impl.
pub trait AsDatagramSocketRecv {
    /// The inner type that performs the receives.
    type AsRecv: DatagramSocketRecv + ?Sized;

    /// Mutably borrows the inner receiver.
    fn as_datagram_socket_recv(&mut self) -> &mut Self::AsRecv;

    /// Borrows the inner receiver through a shared reference, for methods
    /// that only take `&self`.
    fn as_shared_datagram_socket_recv(&self) -> &Self::AsRecv;
}
444
445impl<T: AsDatagramSocketSend + Sync> DatagramSocketSend for T {
446    #[inline]
447    fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
448        self.as_datagram_socket_send().poll_send(cx, buf)
449    }
450
451    #[inline]
452    fn poll_send_to(
453        &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
454    ) -> Poll<io::Result<usize>> {
455        self.as_datagram_socket_send().poll_send_to(cx, buf, addr)
456    }
457
458    #[inline]
459    fn poll_send_many(
460        &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
461    ) -> Poll<io::Result<usize>> {
462        self.as_datagram_socket_send().poll_send_many(cx, bufs)
463    }
464
465    #[inline]
466    fn as_udp_socket(&self) -> Option<&UdpSocket> {
467        self.as_datagram_socket_send().as_udp_socket()
468    }
469
470    #[inline]
471    fn peer_addr(&self) -> Option<SocketAddr> {
472        self.as_datagram_socket_send().peer_addr()
473    }
474}
475
476impl<T: AsDatagramSocketRecv + Send> DatagramSocketRecv for T {
477    #[inline]
478    fn poll_recv(
479        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
480    ) -> Poll<io::Result<()>> {
481        self.as_datagram_socket_recv().poll_recv(cx, buf)
482    }
483
484    #[inline]
485    fn poll_recv_from(
486        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
487    ) -> Poll<io::Result<SocketAddr>> {
488        self.as_datagram_socket_recv().poll_recv_from(cx, buf)
489    }
490
491    #[inline]
492    fn poll_recv_many(
493        &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
494    ) -> Poll<io::Result<usize>> {
495        self.as_datagram_socket_recv().poll_recv_many(cx, bufs)
496    }
497
498    #[inline]
499    fn as_udp_socket(&self) -> Option<&UdpSocket> {
500        self.as_shared_datagram_socket_recv().as_udp_socket()
501    }
502}
503
504impl<T> AsDatagramSocketSend for &mut T
505where
506    T: DatagramSocketSend + Send + ?Sized,
507{
508    type AsSend = T;
509
510    fn as_datagram_socket_send(&self) -> &Self::AsSend {
511        self
512    }
513}
514
515impl<T> AsDatagramSocketSend for Box<T>
516where
517    T: DatagramSocketSend + Send + ?Sized,
518{
519    type AsSend = T;
520
521    fn as_datagram_socket_send(&self) -> &Self::AsSend {
522        self
523    }
524}
525
526impl<T> AsDatagramSocketSend for Arc<T>
527where
528    T: DatagramSocketSend + Send + ?Sized,
529{
530    type AsSend = T;
531
532    fn as_datagram_socket_send(&self) -> &Self::AsSend {
533        self
534    }
535}
536
537impl<T> AsDatagramSocketRecv for &mut T
538where
539    T: DatagramSocketRecv + Send + ?Sized,
540{
541    type AsRecv = T;
542
543    fn as_datagram_socket_recv(&mut self) -> &mut Self::AsRecv {
544        self
545    }
546
547    fn as_shared_datagram_socket_recv(&self) -> &Self::AsRecv {
548        self
549    }
550}
551
552impl<T> AsDatagramSocketRecv for Box<T>
553where
554    T: DatagramSocketRecv + Send + ?Sized,
555{
556    type AsRecv = T;
557
558    fn as_datagram_socket_recv(&mut self) -> &mut Self::AsRecv {
559        self
560    }
561
562    fn as_shared_datagram_socket_recv(&self) -> &Self::AsRecv {
563        self
564    }
565}
566
567impl DatagramSocket for UdpSocket {
568    #[cfg(unix)]
569    fn as_raw_io(&self) -> Option<BorrowedFd<'_>> {
570        Some(self.as_fd())
571    }
572
573    #[cfg(unix)]
574    fn into_fd(self) -> Option<OwnedFd> {
575        Some(into_owned_fd(self.into_std().ok()?))
576    }
577}
578
// Send half for a tokio `UdpSocket`. The calls are written with an explicit
// `UdpSocket::` path because the socket type and the trait share method
// names.
impl DatagramSocketSend for UdpSocket {
    #[inline]
    fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
        UdpSocket::poll_send(self, cx, buf)
    }

    #[inline]
    fn poll_send_to(
        &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
    ) -> Poll<io::Result<usize>> {
        UdpSocket::poll_send_to(self, cx, buf, addr)
    }

    // Linux only: batches are handed to the crate's `poll_sendmmsg!` macro
    // instead of the trait's one-datagram-at-a-time default.
    // NOTE(review): presumably a sendmmsg(2) wrapper; confirm in the macro
    // definition.
    #[cfg(target_os = "linux")]
    #[inline]
    fn poll_send_many(
        &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
    ) -> Poll<io::Result<usize>> {
        crate::poll_sendmmsg!(self, cx, bufs)
    }

    fn as_udp_socket(&self) -> Option<&UdpSocket> {
        Some(self)
    }

    fn peer_addr(&self) -> Option<SocketAddr> {
        // This calls the socket's own `peer_addr`, which returns a `Result`;
        // a failed lookup is reported as `None`.
        self.peer_addr().ok()
    }
}
608
609impl DatagramSocketRecv for UdpSocket {
610    #[inline]
611    fn poll_recv(
612        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
613    ) -> Poll<io::Result<()>> {
614        UdpSocket::poll_recv(self, cx, buf)
615    }
616
617    #[cfg(target_os = "linux")]
618    #[inline]
619    fn poll_recv_many(
620        &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
621    ) -> Poll<io::Result<usize>> {
622        crate::poll_recvmmsg!(self, cx, bufs)
623    }
624
625    fn as_udp_socket(&self) -> Option<&UdpSocket> {
626        Some(self)
627    }
628}
629
// File-descriptor access for a shared (`Arc`) tokio `UdpSocket`.
impl DatagramSocket for Arc<UdpSocket> {
    #[cfg(unix)]
    fn as_raw_io(&self) -> Option<BorrowedFd<'_>> {
        Some(self.as_fd())
    }

    // Always `None`: this impl never gives up the descriptor.
    // NOTE(review): presumably because other `Arc` clones may still be using
    // the socket; confirm the intent.
    #[cfg(unix)]
    fn into_fd(self) -> Option<OwnedFd> {
        None
    }
}
641
// Receive half for a shared (`Arc`) tokio `UdpSocket`. The calls are written
// with an explicit `UdpSocket::` path because the socket type and the trait
// share method names.
impl DatagramSocketRecv for Arc<UdpSocket> {
    #[inline]
    fn poll_recv(
        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        UdpSocket::poll_recv(self, cx, buf)
    }

    // Overrides the trait default so that the socket's own
    // `poll_recv_from` supplies the origin address.
    #[inline]
    fn poll_recv_from(
        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<SocketAddr>> {
        UdpSocket::poll_recv_from(self, cx, buf)
    }

    // Linux only: batches are handed to the crate's `poll_recvmmsg!` macro
    // instead of the trait's one-datagram-at-a-time default.
    #[cfg(target_os = "linux")]
    #[inline]
    fn poll_recv_many(
        &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
    ) -> Poll<io::Result<usize>> {
        crate::poll_recvmmsg!(self, cx, bufs)
    }

    fn as_udp_socket(&self) -> Option<&UdpSocket> {
        Some(self)
    }
}
669
670#[cfg(unix)]
671impl DatagramSocket for UnixDatagram {
672    fn as_raw_io(&self) -> Option<BorrowedFd<'_>> {
673        Some(self.as_fd())
674    }
675
676    fn into_fd(self) -> Option<OwnedFd> {
677        Some(into_owned_fd(self.into_std().ok()?))
678    }
679}
680
// Send half for a tokio `UnixDatagram`.
#[cfg(unix)]
impl DatagramSocketSend for UnixDatagram {
    #[inline]
    fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
        UnixDatagram::poll_send(self, cx, buf)
    }

    // Addressed sends are rejected: every call completes immediately with an
    // `Unsupported` error and sends nothing.
    #[inline]
    fn poll_send_to(
        &self, _: &mut Context, _: &[u8], _: SocketAddr,
    ) -> Poll<io::Result<usize>> {
        Poll::Ready(Err(io::Error::new(
            io::ErrorKind::Unsupported,
            "invalid address family",
        )))
    }

    // Linux only: batches are handed to the crate's `poll_sendmmsg!` macro
    // instead of the trait's one-datagram-at-a-time default.
    #[cfg(target_os = "linux")]
    #[inline]
    fn poll_send_many(
        &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
    ) -> Poll<io::Result<usize>> {
        crate::poll_sendmmsg!(self, cx, bufs)
    }
}
706
// Receive half for a tokio `UnixDatagram`. `poll_recv_from` is not
// overridden, so the trait default applies to it.
#[cfg(unix)]
impl DatagramSocketRecv for UnixDatagram {
    #[inline]
    fn poll_recv(
        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        UnixDatagram::poll_recv(self, cx, buf)
    }

    // Linux only: batches are handed to the crate's `poll_recvmmsg!` macro
    // instead of the trait's one-datagram-at-a-time default.
    #[cfg(target_os = "linux")]
    #[inline]
    fn poll_recv_many(
        &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
    ) -> Poll<io::Result<usize>> {
        crate::poll_recvmmsg!(self, cx, bufs)
    }
}
724
// Receive half for a shared (`Arc`) tokio `UnixDatagram`. `poll_recv_from`
// is not overridden, so the trait default applies to it.
#[cfg(unix)]
impl DatagramSocketRecv for Arc<UnixDatagram> {
    #[inline]
    fn poll_recv(
        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        UnixDatagram::poll_recv(self, cx, buf)
    }

    // Linux only: batches are handed to the crate's `poll_recvmmsg!` macro
    // instead of the trait's one-datagram-at-a-time default.
    #[cfg(target_os = "linux")]
    #[inline]
    fn poll_recv_many(
        &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
    ) -> Poll<io::Result<usize>> {
        crate::poll_recvmmsg!(self, cx, bufs)
    }
}
742
/// Converts any value that can give up its raw file descriptor into an
/// [`OwnedFd`].
///
/// This plays the role of `Into<OwnedFd>::into` for types that implement
/// [`IntoRawFd`] but are not assumed to implement `Into<OwnedFd>`.
#[cfg(unix)]
fn into_owned_fd<F: IntoRawFd>(into_fd: F) -> OwnedFd {
    let raw_fd = into_fd.into_raw_fd();

    // SAFETY: `into_raw_fd` consumed `into_fd` and transferred ownership of
    // the descriptor to us, so `raw_fd` is owned by nothing else and the
    // returned `OwnedFd` becomes its only owner.
    unsafe { OwnedFd::from_raw_fd(raw_fd) }
}
749
/// A cheap wrapper around a datagram socket which describes if it is connected
/// to an explicit peer.
///
/// This struct essentially forwards its underlying socket's `send_to()` method
/// to `send()` if the socket is explicitly connected to a peer. This is helpful
/// for preventing issues on platforms that do not support `send_to` on
/// already-connected sockets.
///
/// For example, MacOS errors if `send_to` is used on a socket that's already
/// connected. Only `send` can be used. By using `MaybeConnectedSocket`, you can
/// use the same `send` and `send_to` APIs in both client- and server-side code.
/// Clients, usually with connected sockets, will then forward `send_to` to
/// `send`, whereas servers, usually with unconnected sockets, will use
/// `send_to`.
///
/// # Warning
/// A socket's "connectedness" is determined once, when it is created. If the
/// socket is created as connected, then later disconnected from its peer, its
/// `send_to()` call will fail.
#[derive(Clone)]
pub struct MaybeConnectedSocket<T> {
    // The wrapped socket.
    inner: T,
    // Peer address read from `inner` when the wrapper is constructed; `Some`
    // means the socket is treated as connected.
    peer: Option<SocketAddr>,
}
774
775impl<T: DatagramSocketSend> MaybeConnectedSocket<T> {
776    pub fn new(inner: T) -> Self {
777        Self {
778            peer: inner.peer_addr(),
779            inner,
780        }
781    }
782
783    /// Provides access to the wrapped socket, allowing the user to override
784    /// `send_to()` behavior if required.
785    pub fn inner(&self) -> &T {
786        &self.inner
787    }
788
789    /// Consumes `self`, returning the wrapped socket.
790    pub fn into_inner(self) -> T {
791        self.inner
792    }
793}
794
795impl<T: DatagramSocketSend> DatagramSocketSend for MaybeConnectedSocket<T> {
796    #[inline]
797    fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
798        self.inner.poll_send(cx, buf)
799    }
800
801    #[inline]
802    fn poll_send_to(
803        &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
804    ) -> Poll<io::Result<usize>> {
805        if let Some(peer) = self.peer {
806            debug_assert_eq!(peer, addr);
807            self.inner.poll_send(cx, buf)
808        } else {
809            self.inner.poll_send_to(cx, buf, addr)
810        }
811    }
812
813    #[inline]
814    fn poll_send_many(
815        &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
816    ) -> Poll<io::Result<usize>> {
817        self.inner.poll_send_many(cx, bufs)
818    }
819
820    #[inline]
821    fn as_udp_socket(&self) -> Option<&UdpSocket> {
822        self.inner.as_udp_socket()
823    }
824
825    #[inline]
826    fn peer_addr(&self) -> Option<SocketAddr> {
827        self.peer
828    }
829}