1use futures_util::future::poll_fn;
28use futures_util::ready;
29use futures_util::FutureExt;
30use std::future::Future;
31use std::io;
32use std::net::Ipv4Addr;
33use std::net::SocketAddr;
34use std::net::SocketAddrV4;
35use std::sync::Arc;
36use std::task::Context;
37use std::task::Poll;
38use tokio::io::ReadBuf;
39use tokio::net::UdpSocket;
40use tokio::task::coop::unconstrained;
41
42#[cfg(unix)]
43use std::os::fd::AsFd;
44#[cfg(unix)]
45use std::os::fd::BorrowedFd;
46#[cfg(unix)]
47use std::os::fd::FromRawFd;
48#[cfg(unix)]
49use std::os::fd::IntoRawFd;
50#[cfg(unix)]
51use std::os::fd::OwnedFd;
52#[cfg(unix)]
53use tokio::net::UnixDatagram;
54
55use crate::socket_stats::AsSocketStats;
56
57pub const MAX_DATAGRAM_SIZE: usize = 1500;
61
62pub trait DatagramSocketWithStats: DatagramSocket {}
63
64impl<T> DatagramSocketWithStats for T where T: DatagramSocket + AsSocketStats {}
65
66pub trait RawPoolBufDatagramIo: Send {
67 fn poll_send_datagrams(
68 &mut self, cx: &mut Context, datagrams: &mut [crate::DgramBuffer],
69 ) -> Poll<io::Result<usize>>;
70
71 fn poll_recv_dgram(
72 &mut self, cx: &mut Context,
73 ) -> Poll<io::Result<crate::DgramBuffer>>;
74
75 fn poll_recv_datagrams(
76 &mut self, cx: &mut Context, buffer: &mut Vec<crate::DgramBuffer>,
77 dgram_limit: usize,
78 ) -> Poll<io::Result<usize>> {
79 for i in 0..dgram_limit {
80 match self.poll_recv_dgram(cx) {
81 Poll::Ready(Ok(buf)) => buffer.push(buf),
82 Poll::Ready(Err(err)) =>
83 if i > 0 {
84 return Poll::Ready(Ok(i));
85 } else {
86 return Poll::Ready(Err(err));
87 },
88 Poll::Pending =>
89 if i > 0 {
90 return Poll::Ready(Ok(i));
91 } else {
92 return Poll::Pending;
93 },
94 }
95 }
96
97 Poll::Ready(Ok(dgram_limit))
98 }
99}
100
101pub trait DatagramSocket:
112 DatagramSocketSend + DatagramSocketRecv + 'static
113{
114 #[cfg(unix)]
115 fn as_raw_io(&self) -> Option<BorrowedFd<'_>>;
116
117 #[cfg(unix)]
118 fn into_fd(self) -> Option<OwnedFd>;
119
120 fn as_buf_io(&mut self) -> Option<&mut dyn RawPoolBufDatagramIo> {
121 None
122 }
123}
124
125pub trait DatagramSocketSend: Sync {
127 fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>>;
146
147 fn poll_send_to(
170 &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
171 ) -> Poll<io::Result<usize>>;
172
173 fn poll_send_many(
193 &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
194 ) -> Poll<io::Result<usize>> {
195 let mut sent = 0;
196
197 for buf in bufs {
198 match self.poll_send(cx, buf.filled()) {
199 Poll::Ready(Ok(_)) => sent += 1,
200 Poll::Ready(err) => {
201 if sent == 0 {
202 return Poll::Ready(err);
203 }
204 break;
205 },
206 Poll::Pending => {
207 if sent == 0 {
208 return Poll::Pending;
209 }
210 break;
211 },
212 }
213 }
214
215 Poll::Ready(Ok(sent))
216 }
217
218 fn as_udp_socket(&self) -> Option<&UdpSocket> {
220 None
221 }
222
223 fn peer_addr(&self) -> Option<SocketAddr> {
226 None
227 }
228}
229
230pub trait DatagramSocketSendExt: DatagramSocketSend {
238 fn send(&self, buf: &[u8]) -> impl Future<Output = io::Result<usize>> {
241 poll_fn(move |cx| self.poll_send(cx, buf))
242 }
243
244 fn send_to(
247 &self, buf: &[u8], addr: SocketAddr,
248 ) -> impl Future<Output = io::Result<usize>> {
249 poll_fn(move |cx| self.poll_send_to(cx, buf, addr))
250 }
251
252 fn send_many(
256 &self, bufs: &[ReadBuf<'_>],
257 ) -> impl Future<Output = io::Result<usize>> {
258 poll_fn(move |cx| self.poll_send_many(cx, bufs))
259 }
260
261 fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
262 match unconstrained(poll_fn(|cx| self.poll_send(cx, buf))).now_or_never()
263 {
264 Some(result) => result,
265 None => Err(io::ErrorKind::WouldBlock.into()),
266 }
267 }
268
269 fn try_send_many(&self, bufs: &[ReadBuf<'_>]) -> io::Result<usize> {
270 match unconstrained(poll_fn(|cx| self.poll_send_many(cx, bufs)))
271 .now_or_never()
272 {
273 Some(result) => result,
274 None => Err(io::ErrorKind::WouldBlock.into()),
275 }
276 }
277}
278
279pub trait DatagramSocketRecv: Send {
281 fn poll_recv(
300 &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
301 ) -> Poll<io::Result<()>>;
302
303 fn poll_recv_from(
322 &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
323 ) -> Poll<io::Result<SocketAddr>> {
324 self.poll_recv(cx, buf).map_ok(|_| {
325 SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))
326 })
327 }
328
329 fn poll_recv_many(
349 &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
350 ) -> Poll<io::Result<usize>> {
351 let mut read = 0;
352
353 for buf in bufs {
354 match self.poll_recv(cx, buf) {
355 Poll::Ready(Ok(())) => read += 1,
356
357 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
358
359 Poll::Pending if read == 0 => return Poll::Pending,
362 Poll::Pending => break,
363 }
364 }
365
366 Poll::Ready(Ok(read))
367 }
368
369 fn as_udp_socket(&self) -> Option<&UdpSocket> {
371 None
372 }
373}
374
375pub trait DatagramSocketRecvExt: DatagramSocketRecv {
383 fn recv(
386 &mut self, buf: &mut [u8],
387 ) -> impl Future<Output = io::Result<usize>> + Send {
388 poll_fn(|cx| {
389 let mut buf = ReadBuf::new(buf);
390
391 ready!(self.poll_recv(cx, &mut buf)?);
392
393 Poll::Ready(Ok(buf.filled().len()))
394 })
395 }
396
397 fn recv_from(
400 &mut self, buf: &mut [u8],
401 ) -> impl Future<Output = io::Result<(usize, SocketAddr)>> + Send {
402 poll_fn(|cx| {
403 let mut buf = ReadBuf::new(buf);
404
405 let addr = ready!(self.poll_recv_from(cx, &mut buf)?);
406
407 Poll::Ready(Ok((buf.filled().len(), addr)))
408 })
409 }
410
411 fn recv_many(
416 &mut self, bufs: &mut [ReadBuf<'_>],
417 ) -> impl Future<Output = io::Result<usize>> + Send {
418 poll_fn(|cx| self.poll_recv_many(cx, bufs))
419 }
420}
421
422impl<T: DatagramSocketSend + ?Sized> DatagramSocketSendExt for T {}
423
424impl<T: DatagramSocketRecv + ?Sized> DatagramSocketRecvExt for T {}
425
426pub trait AsDatagramSocketSend {
430 type AsSend: DatagramSocketSend + ?Sized;
431
432 fn as_datagram_socket_send(&self) -> &Self::AsSend;
433}
434
435pub trait AsDatagramSocketRecv {
439 type AsRecv: DatagramSocketRecv + ?Sized;
440
441 fn as_datagram_socket_recv(&mut self) -> &mut Self::AsRecv;
442 fn as_shared_datagram_socket_recv(&self) -> &Self::AsRecv;
443}
444
445impl<T: AsDatagramSocketSend + Sync> DatagramSocketSend for T {
446 #[inline]
447 fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
448 self.as_datagram_socket_send().poll_send(cx, buf)
449 }
450
451 #[inline]
452 fn poll_send_to(
453 &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
454 ) -> Poll<io::Result<usize>> {
455 self.as_datagram_socket_send().poll_send_to(cx, buf, addr)
456 }
457
458 #[inline]
459 fn poll_send_many(
460 &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
461 ) -> Poll<io::Result<usize>> {
462 self.as_datagram_socket_send().poll_send_many(cx, bufs)
463 }
464
465 #[inline]
466 fn as_udp_socket(&self) -> Option<&UdpSocket> {
467 self.as_datagram_socket_send().as_udp_socket()
468 }
469
470 #[inline]
471 fn peer_addr(&self) -> Option<SocketAddr> {
472 self.as_datagram_socket_send().peer_addr()
473 }
474}
475
476impl<T: AsDatagramSocketRecv + Send> DatagramSocketRecv for T {
477 #[inline]
478 fn poll_recv(
479 &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
480 ) -> Poll<io::Result<()>> {
481 self.as_datagram_socket_recv().poll_recv(cx, buf)
482 }
483
484 #[inline]
485 fn poll_recv_from(
486 &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
487 ) -> Poll<io::Result<SocketAddr>> {
488 self.as_datagram_socket_recv().poll_recv_from(cx, buf)
489 }
490
491 #[inline]
492 fn poll_recv_many(
493 &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
494 ) -> Poll<io::Result<usize>> {
495 self.as_datagram_socket_recv().poll_recv_many(cx, bufs)
496 }
497
498 #[inline]
499 fn as_udp_socket(&self) -> Option<&UdpSocket> {
500 self.as_shared_datagram_socket_recv().as_udp_socket()
501 }
502}
503
504impl<T> AsDatagramSocketSend for &mut T
505where
506 T: DatagramSocketSend + Send + ?Sized,
507{
508 type AsSend = T;
509
510 fn as_datagram_socket_send(&self) -> &Self::AsSend {
511 self
512 }
513}
514
515impl<T> AsDatagramSocketSend for Box<T>
516where
517 T: DatagramSocketSend + Send + ?Sized,
518{
519 type AsSend = T;
520
521 fn as_datagram_socket_send(&self) -> &Self::AsSend {
522 self
523 }
524}
525
526impl<T> AsDatagramSocketSend for Arc<T>
527where
528 T: DatagramSocketSend + Send + ?Sized,
529{
530 type AsSend = T;
531
532 fn as_datagram_socket_send(&self) -> &Self::AsSend {
533 self
534 }
535}
536
537impl<T> AsDatagramSocketRecv for &mut T
538where
539 T: DatagramSocketRecv + Send + ?Sized,
540{
541 type AsRecv = T;
542
543 fn as_datagram_socket_recv(&mut self) -> &mut Self::AsRecv {
544 self
545 }
546
547 fn as_shared_datagram_socket_recv(&self) -> &Self::AsRecv {
548 self
549 }
550}
551
552impl<T> AsDatagramSocketRecv for Box<T>
553where
554 T: DatagramSocketRecv + Send + ?Sized,
555{
556 type AsRecv = T;
557
558 fn as_datagram_socket_recv(&mut self) -> &mut Self::AsRecv {
559 self
560 }
561
562 fn as_shared_datagram_socket_recv(&self) -> &Self::AsRecv {
563 self
564 }
565}
566
567impl DatagramSocket for UdpSocket {
568 #[cfg(unix)]
569 fn as_raw_io(&self) -> Option<BorrowedFd<'_>> {
570 Some(self.as_fd())
571 }
572
573 #[cfg(unix)]
574 fn into_fd(self) -> Option<OwnedFd> {
575 Some(into_owned_fd(self.into_std().ok()?))
576 }
577}
578
579impl DatagramSocketSend for UdpSocket {
580 #[inline]
581 fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
582 UdpSocket::poll_send(self, cx, buf)
583 }
584
585 #[inline]
586 fn poll_send_to(
587 &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
588 ) -> Poll<io::Result<usize>> {
589 UdpSocket::poll_send_to(self, cx, buf, addr)
590 }
591
592 #[cfg(target_os = "linux")]
593 #[inline]
594 fn poll_send_many(
595 &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
596 ) -> Poll<io::Result<usize>> {
597 crate::poll_sendmmsg!(self, cx, bufs)
598 }
599
600 fn as_udp_socket(&self) -> Option<&UdpSocket> {
601 Some(self)
602 }
603
604 fn peer_addr(&self) -> Option<SocketAddr> {
605 self.peer_addr().ok()
606 }
607}
608
609impl DatagramSocketRecv for UdpSocket {
610 #[inline]
611 fn poll_recv(
612 &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
613 ) -> Poll<io::Result<()>> {
614 UdpSocket::poll_recv(self, cx, buf)
615 }
616
617 #[cfg(target_os = "linux")]
618 #[inline]
619 fn poll_recv_many(
620 &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
621 ) -> Poll<io::Result<usize>> {
622 crate::poll_recvmmsg!(self, cx, bufs)
623 }
624
625 fn as_udp_socket(&self) -> Option<&UdpSocket> {
626 Some(self)
627 }
628}
629
630impl DatagramSocket for Arc<UdpSocket> {
631 #[cfg(unix)]
632 fn as_raw_io(&self) -> Option<BorrowedFd<'_>> {
633 Some(self.as_fd())
634 }
635
636 #[cfg(unix)]
637 fn into_fd(self) -> Option<OwnedFd> {
638 None
639 }
640}
641
642impl DatagramSocketRecv for Arc<UdpSocket> {
643 #[inline]
644 fn poll_recv(
645 &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
646 ) -> Poll<io::Result<()>> {
647 UdpSocket::poll_recv(self, cx, buf)
648 }
649
650 #[inline]
651 fn poll_recv_from(
652 &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
653 ) -> Poll<io::Result<SocketAddr>> {
654 UdpSocket::poll_recv_from(self, cx, buf)
655 }
656
657 #[cfg(target_os = "linux")]
658 #[inline]
659 fn poll_recv_many(
660 &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
661 ) -> Poll<io::Result<usize>> {
662 crate::poll_recvmmsg!(self, cx, bufs)
663 }
664
665 fn as_udp_socket(&self) -> Option<&UdpSocket> {
666 Some(self)
667 }
668}
669
670#[cfg(unix)]
671impl DatagramSocket for UnixDatagram {
672 fn as_raw_io(&self) -> Option<BorrowedFd<'_>> {
673 Some(self.as_fd())
674 }
675
676 fn into_fd(self) -> Option<OwnedFd> {
677 Some(into_owned_fd(self.into_std().ok()?))
678 }
679}
680
681#[cfg(unix)]
682impl DatagramSocketSend for UnixDatagram {
683 #[inline]
684 fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
685 UnixDatagram::poll_send(self, cx, buf)
686 }
687
688 #[inline]
689 fn poll_send_to(
690 &self, _: &mut Context, _: &[u8], _: SocketAddr,
691 ) -> Poll<io::Result<usize>> {
692 Poll::Ready(Err(io::Error::new(
693 io::ErrorKind::Unsupported,
694 "invalid address family",
695 )))
696 }
697
698 #[cfg(target_os = "linux")]
699 #[inline]
700 fn poll_send_many(
701 &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
702 ) -> Poll<io::Result<usize>> {
703 crate::poll_sendmmsg!(self, cx, bufs)
704 }
705}
706
707#[cfg(unix)]
708impl DatagramSocketRecv for UnixDatagram {
709 #[inline]
710 fn poll_recv(
711 &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
712 ) -> Poll<io::Result<()>> {
713 UnixDatagram::poll_recv(self, cx, buf)
714 }
715
716 #[cfg(target_os = "linux")]
717 #[inline]
718 fn poll_recv_many(
719 &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
720 ) -> Poll<io::Result<usize>> {
721 crate::poll_recvmmsg!(self, cx, bufs)
722 }
723}
724
725#[cfg(unix)]
726impl DatagramSocketRecv for Arc<UnixDatagram> {
727 #[inline]
728 fn poll_recv(
729 &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
730 ) -> Poll<io::Result<()>> {
731 UnixDatagram::poll_recv(self, cx, buf)
732 }
733
734 #[cfg(target_os = "linux")]
735 #[inline]
736 fn poll_recv_many(
737 &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
738 ) -> Poll<io::Result<usize>> {
739 crate::poll_recvmmsg!(self, cx, bufs)
740 }
741}
742
743#[cfg(unix)]
746fn into_owned_fd<F: IntoRawFd>(into_fd: F) -> OwnedFd {
747 unsafe { OwnedFd::from_raw_fd(into_fd.into_raw_fd()) }
748}
749
750#[derive(Clone)]
770pub struct MaybeConnectedSocket<T> {
771 inner: T,
772 peer: Option<SocketAddr>,
773}
774
775impl<T: DatagramSocketSend> MaybeConnectedSocket<T> {
776 pub fn new(inner: T) -> Self {
777 Self {
778 peer: inner.peer_addr(),
779 inner,
780 }
781 }
782
783 pub fn inner(&self) -> &T {
786 &self.inner
787 }
788
789 pub fn into_inner(self) -> T {
791 self.inner
792 }
793}
794
795impl<T: DatagramSocketSend> DatagramSocketSend for MaybeConnectedSocket<T> {
796 #[inline]
797 fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
798 self.inner.poll_send(cx, buf)
799 }
800
801 #[inline]
802 fn poll_send_to(
803 &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
804 ) -> Poll<io::Result<usize>> {
805 if let Some(peer) = self.peer {
806 debug_assert_eq!(peer, addr);
807 self.inner.poll_send(cx, buf)
808 } else {
809 self.inner.poll_send_to(cx, buf, addr)
810 }
811 }
812
813 #[inline]
814 fn poll_send_many(
815 &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
816 ) -> Poll<io::Result<usize>> {
817 self.inner.poll_send_many(cx, bufs)
818 }
819
820 #[inline]
821 fn as_udp_socket(&self) -> Option<&UdpSocket> {
822 self.inner.as_udp_socket()
823 }
824
825 #[inline]
826 fn peer_addr(&self) -> Option<SocketAddr> {
827 self.peer
828 }
829}