1use buffer_pool::RawPoolBufDatagramIo;
28use futures_util::future::poll_fn;
29use futures_util::ready;
30use futures_util::FutureExt;
31use std::future::Future;
32use std::io;
33use std::net::Ipv4Addr;
34use std::net::SocketAddr;
35use std::net::SocketAddrV4;
36use std::sync::Arc;
37use std::task::Context;
38use std::task::Poll;
39use tokio::io::ReadBuf;
40use tokio::net::UdpSocket;
41use tokio::task::coop::unconstrained;
42
43#[cfg(unix)]
44use std::os::fd::AsFd;
45#[cfg(unix)]
46use std::os::fd::BorrowedFd;
47#[cfg(unix)]
48use std::os::fd::FromRawFd;
49#[cfg(unix)]
50use std::os::fd::IntoRawFd;
51#[cfg(unix)]
52use std::os::fd::OwnedFd;
53#[cfg(unix)]
54use tokio::net::UnixDatagram;
55
56use crate::socket_stats::AsSocketStats;
57
58pub const MAX_DATAGRAM_SIZE: usize = 1500;
62
63pub trait DatagramSocketWithStats: DatagramSocket {}
64
65impl<T> DatagramSocketWithStats for T where T: DatagramSocket + AsSocketStats {}
66
67pub trait DatagramSocket:
78    DatagramSocketSend + DatagramSocketRecv + 'static
79{
80    #[cfg(unix)]
81    fn as_raw_io(&self) -> Option<BorrowedFd<'_>>;
82
83    #[cfg(unix)]
84    fn into_fd(self) -> Option<OwnedFd>;
85
86    fn as_buf_io(&mut self) -> Option<&mut dyn RawPoolBufDatagramIo> {
87        None
88    }
89}
90
91pub trait DatagramSocketSend: Sync {
93    fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>>;
112
113    fn poll_send_to(
136        &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
137    ) -> Poll<io::Result<usize>>;
138
139    fn poll_send_many(
159        &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
160    ) -> Poll<io::Result<usize>> {
161        let mut sent = 0;
162
163        for buf in bufs {
164            match self.poll_send(cx, buf.filled()) {
165                Poll::Ready(Ok(_)) => sent += 1,
166                Poll::Ready(err) => {
167                    if sent == 0 {
168                        return Poll::Ready(err);
169                    }
170                    break;
171                },
172                Poll::Pending => {
173                    if sent == 0 {
174                        return Poll::Pending;
175                    }
176                    break;
177                },
178            }
179        }
180
181        Poll::Ready(Ok(sent))
182    }
183
184    fn as_udp_socket(&self) -> Option<&UdpSocket> {
186        None
187    }
188
189    fn peer_addr(&self) -> Option<SocketAddr> {
192        None
193    }
194}
195
196pub trait DatagramSocketSendExt: DatagramSocketSend {
204    fn send(&self, buf: &[u8]) -> impl Future<Output = io::Result<usize>> {
207        poll_fn(move |cx| self.poll_send(cx, buf))
208    }
209
210    fn send_to(
213        &self, buf: &[u8], addr: SocketAddr,
214    ) -> impl Future<Output = io::Result<usize>> {
215        poll_fn(move |cx| self.poll_send_to(cx, buf, addr))
216    }
217
218    fn send_many(
222        &self, bufs: &[ReadBuf<'_>],
223    ) -> impl Future<Output = io::Result<usize>> {
224        poll_fn(move |cx| self.poll_send_many(cx, bufs))
225    }
226
227    fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
228        match unconstrained(poll_fn(|cx| self.poll_send(cx, buf))).now_or_never()
229        {
230            Some(result) => result,
231            None => Err(io::ErrorKind::WouldBlock.into()),
232        }
233    }
234
235    fn try_send_many(&self, bufs: &[ReadBuf<'_>]) -> io::Result<usize> {
236        match unconstrained(poll_fn(|cx| self.poll_send_many(cx, bufs)))
237            .now_or_never()
238        {
239            Some(result) => result,
240            None => Err(io::ErrorKind::WouldBlock.into()),
241        }
242    }
243}
244
245pub trait DatagramSocketRecv: Send {
247    fn poll_recv(
266        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
267    ) -> Poll<io::Result<()>>;
268
269    fn poll_recv_from(
288        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
289    ) -> Poll<io::Result<SocketAddr>> {
290        self.poll_recv(cx, buf).map_ok(|_| {
291            SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))
292        })
293    }
294
295    fn poll_recv_many(
315        &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
316    ) -> Poll<io::Result<usize>> {
317        let mut read = 0;
318
319        for buf in bufs {
320            match self.poll_recv(cx, buf) {
321                Poll::Ready(Ok(())) => read += 1,
322
323                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
324
325                Poll::Pending if read == 0 => return Poll::Pending,
328                Poll::Pending => break,
329            }
330        }
331
332        Poll::Ready(Ok(read))
333    }
334
335    fn as_udp_socket(&self) -> Option<&UdpSocket> {
337        None
338    }
339}
340
341pub trait DatagramSocketRecvExt: DatagramSocketRecv {
349    fn recv(
352        &mut self, buf: &mut [u8],
353    ) -> impl Future<Output = io::Result<usize>> + Send {
354        poll_fn(|cx| {
355            let mut buf = ReadBuf::new(buf);
356
357            ready!(self.poll_recv(cx, &mut buf)?);
358
359            Poll::Ready(Ok(buf.filled().len()))
360        })
361    }
362
363    fn recv_from(
366        &mut self, buf: &mut [u8],
367    ) -> impl Future<Output = io::Result<(usize, SocketAddr)>> + Send {
368        poll_fn(|cx| {
369            let mut buf = ReadBuf::new(buf);
370
371            let addr = ready!(self.poll_recv_from(cx, &mut buf)?);
372
373            Poll::Ready(Ok((buf.filled().len(), addr)))
374        })
375    }
376
377    fn recv_many(
382        &mut self, bufs: &mut [ReadBuf<'_>],
383    ) -> impl Future<Output = io::Result<usize>> + Send {
384        poll_fn(|cx| self.poll_recv_many(cx, bufs))
385    }
386}
387
388impl<T: DatagramSocketSend + ?Sized> DatagramSocketSendExt for T {}
389
390impl<T: DatagramSocketRecv + ?Sized> DatagramSocketRecvExt for T {}
391
392pub trait AsDatagramSocketSend {
396    type AsSend: DatagramSocketSend + ?Sized;
397
398    fn as_datagram_socket_send(&self) -> &Self::AsSend;
399}
400
401pub trait AsDatagramSocketRecv {
405    type AsRecv: DatagramSocketRecv + ?Sized;
406
407    fn as_datagram_socket_recv(&mut self) -> &mut Self::AsRecv;
408    fn as_shared_datagram_socket_recv(&self) -> &Self::AsRecv;
409}
410
411impl<T: AsDatagramSocketSend + Sync> DatagramSocketSend for T {
412    #[inline]
413    fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
414        self.as_datagram_socket_send().poll_send(cx, buf)
415    }
416
417    #[inline]
418    fn poll_send_to(
419        &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
420    ) -> Poll<io::Result<usize>> {
421        self.as_datagram_socket_send().poll_send_to(cx, buf, addr)
422    }
423
424    #[inline]
425    fn poll_send_many(
426        &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
427    ) -> Poll<io::Result<usize>> {
428        self.as_datagram_socket_send().poll_send_many(cx, bufs)
429    }
430
431    #[inline]
432    fn as_udp_socket(&self) -> Option<&UdpSocket> {
433        self.as_datagram_socket_send().as_udp_socket()
434    }
435
436    #[inline]
437    fn peer_addr(&self) -> Option<SocketAddr> {
438        self.as_datagram_socket_send().peer_addr()
439    }
440}
441
442impl<T: AsDatagramSocketRecv + Send> DatagramSocketRecv for T {
443    #[inline]
444    fn poll_recv(
445        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
446    ) -> Poll<io::Result<()>> {
447        self.as_datagram_socket_recv().poll_recv(cx, buf)
448    }
449
450    #[inline]
451    fn poll_recv_from(
452        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
453    ) -> Poll<io::Result<SocketAddr>> {
454        self.as_datagram_socket_recv().poll_recv_from(cx, buf)
455    }
456
457    #[inline]
458    fn poll_recv_many(
459        &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
460    ) -> Poll<io::Result<usize>> {
461        self.as_datagram_socket_recv().poll_recv_many(cx, bufs)
462    }
463
464    #[inline]
465    fn as_udp_socket(&self) -> Option<&UdpSocket> {
466        self.as_shared_datagram_socket_recv().as_udp_socket()
467    }
468}
469
470impl<T> AsDatagramSocketSend for &mut T
471where
472    T: DatagramSocketSend + Send + ?Sized,
473{
474    type AsSend = T;
475
476    fn as_datagram_socket_send(&self) -> &Self::AsSend {
477        self
478    }
479}
480
481impl<T> AsDatagramSocketSend for Box<T>
482where
483    T: DatagramSocketSend + Send + ?Sized,
484{
485    type AsSend = T;
486
487    fn as_datagram_socket_send(&self) -> &Self::AsSend {
488        self
489    }
490}
491
492impl<T> AsDatagramSocketSend for Arc<T>
493where
494    T: DatagramSocketSend + Send + ?Sized,
495{
496    type AsSend = T;
497
498    fn as_datagram_socket_send(&self) -> &Self::AsSend {
499        self
500    }
501}
502
503impl<T> AsDatagramSocketRecv for &mut T
504where
505    T: DatagramSocketRecv + Send + ?Sized,
506{
507    type AsRecv = T;
508
509    fn as_datagram_socket_recv(&mut self) -> &mut Self::AsRecv {
510        self
511    }
512
513    fn as_shared_datagram_socket_recv(&self) -> &Self::AsRecv {
514        self
515    }
516}
517
518impl<T> AsDatagramSocketRecv for Box<T>
519where
520    T: DatagramSocketRecv + Send + ?Sized,
521{
522    type AsRecv = T;
523
524    fn as_datagram_socket_recv(&mut self) -> &mut Self::AsRecv {
525        self
526    }
527
528    fn as_shared_datagram_socket_recv(&self) -> &Self::AsRecv {
529        self
530    }
531}
532
533impl DatagramSocket for UdpSocket {
534    #[cfg(unix)]
535    fn as_raw_io(&self) -> Option<BorrowedFd<'_>> {
536        Some(self.as_fd())
537    }
538
539    #[cfg(unix)]
540    fn into_fd(self) -> Option<OwnedFd> {
541        Some(into_owned_fd(self.into_std().ok()?))
542    }
543}
544
545impl DatagramSocketSend for UdpSocket {
546    #[inline]
547    fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
548        UdpSocket::poll_send(self, cx, buf)
549    }
550
551    #[inline]
552    fn poll_send_to(
553        &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
554    ) -> Poll<io::Result<usize>> {
555        UdpSocket::poll_send_to(self, cx, buf, addr)
556    }
557
558    #[cfg(target_os = "linux")]
559    #[inline]
560    fn poll_send_many(
561        &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
562    ) -> Poll<io::Result<usize>> {
563        crate::poll_sendmmsg!(self, cx, bufs)
564    }
565
566    fn as_udp_socket(&self) -> Option<&UdpSocket> {
567        Some(self)
568    }
569
570    fn peer_addr(&self) -> Option<SocketAddr> {
571        self.peer_addr().ok()
572    }
573}
574
575impl DatagramSocketRecv for UdpSocket {
576    #[inline]
577    fn poll_recv(
578        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
579    ) -> Poll<io::Result<()>> {
580        UdpSocket::poll_recv(self, cx, buf)
581    }
582
583    #[cfg(target_os = "linux")]
584    #[inline]
585    fn poll_recv_many(
586        &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
587    ) -> Poll<io::Result<usize>> {
588        crate::poll_recvmmsg!(self, cx, bufs)
589    }
590
591    fn as_udp_socket(&self) -> Option<&UdpSocket> {
592        Some(self)
593    }
594}
595
596impl DatagramSocket for Arc<UdpSocket> {
597    #[cfg(unix)]
598    fn as_raw_io(&self) -> Option<BorrowedFd<'_>> {
599        Some(self.as_fd())
600    }
601
602    #[cfg(unix)]
603    fn into_fd(self) -> Option<OwnedFd> {
604        None
605    }
606}
607
608impl DatagramSocketRecv for Arc<UdpSocket> {
609    #[inline]
610    fn poll_recv(
611        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
612    ) -> Poll<io::Result<()>> {
613        UdpSocket::poll_recv(self, cx, buf)
614    }
615
616    #[inline]
617    fn poll_recv_from(
618        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
619    ) -> Poll<io::Result<SocketAddr>> {
620        UdpSocket::poll_recv_from(self, cx, buf)
621    }
622
623    #[cfg(target_os = "linux")]
624    #[inline]
625    fn poll_recv_many(
626        &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
627    ) -> Poll<io::Result<usize>> {
628        crate::poll_recvmmsg!(self, cx, bufs)
629    }
630
631    fn as_udp_socket(&self) -> Option<&UdpSocket> {
632        Some(self)
633    }
634}
635
636#[cfg(unix)]
637impl DatagramSocket for UnixDatagram {
638    fn as_raw_io(&self) -> Option<BorrowedFd<'_>> {
639        Some(self.as_fd())
640    }
641
642    fn into_fd(self) -> Option<OwnedFd> {
643        Some(into_owned_fd(self.into_std().ok()?))
644    }
645}
646
647#[cfg(unix)]
648impl DatagramSocketSend for UnixDatagram {
649    #[inline]
650    fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
651        UnixDatagram::poll_send(self, cx, buf)
652    }
653
654    #[inline]
655    fn poll_send_to(
656        &self, _: &mut Context, _: &[u8], _: SocketAddr,
657    ) -> Poll<io::Result<usize>> {
658        Poll::Ready(Err(io::Error::new(
659            io::ErrorKind::Unsupported,
660            "invalid address family",
661        )))
662    }
663
664    #[cfg(target_os = "linux")]
665    #[inline]
666    fn poll_send_many(
667        &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
668    ) -> Poll<io::Result<usize>> {
669        crate::poll_sendmmsg!(self, cx, bufs)
670    }
671}
672
673#[cfg(unix)]
674impl DatagramSocketRecv for UnixDatagram {
675    #[inline]
676    fn poll_recv(
677        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
678    ) -> Poll<io::Result<()>> {
679        UnixDatagram::poll_recv(self, cx, buf)
680    }
681
682    #[cfg(target_os = "linux")]
683    #[inline]
684    fn poll_recv_many(
685        &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
686    ) -> Poll<io::Result<usize>> {
687        crate::poll_recvmmsg!(self, cx, bufs)
688    }
689}
690
691#[cfg(unix)]
692impl DatagramSocketRecv for Arc<UnixDatagram> {
693    #[inline]
694    fn poll_recv(
695        &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
696    ) -> Poll<io::Result<()>> {
697        UnixDatagram::poll_recv(self, cx, buf)
698    }
699
700    #[cfg(target_os = "linux")]
701    #[inline]
702    fn poll_recv_many(
703        &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
704    ) -> Poll<io::Result<usize>> {
705        crate::poll_recvmmsg!(self, cx, bufs)
706    }
707}
708
709#[cfg(unix)]
712fn into_owned_fd<F: IntoRawFd>(into_fd: F) -> OwnedFd {
713    unsafe { OwnedFd::from_raw_fd(into_fd.into_raw_fd()) }
714}
715
716#[derive(Clone)]
736pub struct MaybeConnectedSocket<T> {
737    inner: T,
738    peer: Option<SocketAddr>,
739}
740
741impl<T: DatagramSocketSend> MaybeConnectedSocket<T> {
742    pub fn new(inner: T) -> Self {
743        Self {
744            peer: inner.peer_addr(),
745            inner,
746        }
747    }
748
749    pub fn inner(&self) -> &T {
752        &self.inner
753    }
754
755    pub fn into_inner(self) -> T {
757        self.inner
758    }
759}
760
761impl<T: DatagramSocketSend> DatagramSocketSend for MaybeConnectedSocket<T> {
762    #[inline]
763    fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
764        self.inner.poll_send(cx, buf)
765    }
766
767    #[inline]
768    fn poll_send_to(
769        &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
770    ) -> Poll<io::Result<usize>> {
771        if let Some(peer) = self.peer {
772            debug_assert_eq!(peer, addr);
773            self.inner.poll_send(cx, buf)
774        } else {
775            self.inner.poll_send_to(cx, buf, addr)
776        }
777    }
778
779    #[inline]
780    fn poll_send_many(
781        &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
782    ) -> Poll<io::Result<usize>> {
783        self.inner.poll_send_many(cx, bufs)
784    }
785
786    #[inline]
787    fn as_udp_socket(&self) -> Option<&UdpSocket> {
788        self.inner.as_udp_socket()
789    }
790
791    #[inline]
792    fn peer_addr(&self) -> Option<SocketAddr> {
793        self.peer
794    }
795}