1use buffer_pool::RawPoolBufDatagramIo;
28use futures_util::future::poll_fn;
29use futures_util::ready;
30use futures_util::FutureExt;
31use std::future::Future;
32use std::io;
33use std::net::Ipv4Addr;
34use std::net::SocketAddr;
35use std::net::SocketAddrV4;
36use std::sync::Arc;
37use std::task::Context;
38use std::task::Poll;
39use tokio::io::ReadBuf;
40use tokio::net::UdpSocket;
41use tokio::task::coop::unconstrained;
42
43#[cfg(unix)]
44use std::os::fd::AsFd;
45#[cfg(unix)]
46use std::os::fd::BorrowedFd;
47#[cfg(unix)]
48use std::os::fd::FromRawFd;
49#[cfg(unix)]
50use std::os::fd::IntoRawFd;
51#[cfg(unix)]
52use std::os::fd::OwnedFd;
53#[cfg(unix)]
54use tokio::net::UnixDatagram;
55
56use crate::socket_stats::AsSocketStats;
57
58pub const MAX_DATAGRAM_SIZE: usize = 1500;
62
63pub trait DatagramSocketWithStats: DatagramSocket {}
64
65impl<T> DatagramSocketWithStats for T where T: DatagramSocket + AsSocketStats {}
66
67pub trait DatagramSocket:
78 DatagramSocketSend + DatagramSocketRecv + 'static
79{
80 #[cfg(unix)]
81 fn as_raw_io(&self) -> Option<BorrowedFd<'_>>;
82
83 #[cfg(unix)]
84 fn into_fd(self) -> Option<OwnedFd>;
85
86 fn as_buf_io(&mut self) -> Option<&mut dyn RawPoolBufDatagramIo> {
87 None
88 }
89}
90
91pub trait DatagramSocketSend: Sync {
93 fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>>;
112
113 fn poll_send_to(
136 &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
137 ) -> Poll<io::Result<usize>>;
138
139 fn poll_send_many(
159 &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
160 ) -> Poll<io::Result<usize>> {
161 let mut sent = 0;
162
163 for buf in bufs {
164 match self.poll_send(cx, buf.filled()) {
165 Poll::Ready(Ok(_)) => sent += 1,
166 Poll::Ready(err) => {
167 if sent == 0 {
168 return Poll::Ready(err);
169 }
170 break;
171 },
172 Poll::Pending => {
173 if sent == 0 {
174 return Poll::Pending;
175 }
176 break;
177 },
178 }
179 }
180
181 Poll::Ready(Ok(sent))
182 }
183
184 fn as_udp_socket(&self) -> Option<&UdpSocket> {
186 None
187 }
188
189 fn peer_addr(&self) -> Option<SocketAddr> {
192 None
193 }
194}
195
196pub trait DatagramSocketSendExt: DatagramSocketSend {
204 fn send(&self, buf: &[u8]) -> impl Future<Output = io::Result<usize>> {
207 poll_fn(move |cx| self.poll_send(cx, buf))
208 }
209
210 fn send_to(
213 &self, buf: &[u8], addr: SocketAddr,
214 ) -> impl Future<Output = io::Result<usize>> {
215 poll_fn(move |cx| self.poll_send_to(cx, buf, addr))
216 }
217
218 fn send_many(
222 &self, bufs: &[ReadBuf<'_>],
223 ) -> impl Future<Output = io::Result<usize>> {
224 poll_fn(move |cx| self.poll_send_many(cx, bufs))
225 }
226
227 fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
228 match unconstrained(poll_fn(|cx| self.poll_send(cx, buf))).now_or_never()
229 {
230 Some(result) => result,
231 None => Err(io::ErrorKind::WouldBlock.into()),
232 }
233 }
234
235 fn try_send_many(&self, bufs: &[ReadBuf<'_>]) -> io::Result<usize> {
236 match unconstrained(poll_fn(|cx| self.poll_send_many(cx, bufs)))
237 .now_or_never()
238 {
239 Some(result) => result,
240 None => Err(io::ErrorKind::WouldBlock.into()),
241 }
242 }
243}
244
245pub trait DatagramSocketRecv: Send {
247 fn poll_recv(
266 &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
267 ) -> Poll<io::Result<()>>;
268
269 fn poll_recv_from(
288 &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
289 ) -> Poll<io::Result<SocketAddr>> {
290 self.poll_recv(cx, buf).map_ok(|_| {
291 SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))
292 })
293 }
294
295 fn poll_recv_many(
315 &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
316 ) -> Poll<io::Result<usize>> {
317 let mut read = 0;
318
319 for buf in bufs {
320 match self.poll_recv(cx, buf) {
321 Poll::Ready(Ok(())) => read += 1,
322
323 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
324
325 Poll::Pending if read == 0 => return Poll::Pending,
328 Poll::Pending => break,
329 }
330 }
331
332 Poll::Ready(Ok(read))
333 }
334
335 fn as_udp_socket(&self) -> Option<&UdpSocket> {
337 None
338 }
339}
340
341pub trait DatagramSocketRecvExt: DatagramSocketRecv {
349 fn recv(
352 &mut self, buf: &mut [u8],
353 ) -> impl Future<Output = io::Result<usize>> + Send {
354 poll_fn(|cx| {
355 let mut buf = ReadBuf::new(buf);
356
357 ready!(self.poll_recv(cx, &mut buf)?);
358
359 Poll::Ready(Ok(buf.filled().len()))
360 })
361 }
362
363 fn recv_from(
366 &mut self, buf: &mut [u8],
367 ) -> impl Future<Output = io::Result<(usize, SocketAddr)>> + Send {
368 poll_fn(|cx| {
369 let mut buf = ReadBuf::new(buf);
370
371 let addr = ready!(self.poll_recv_from(cx, &mut buf)?);
372
373 Poll::Ready(Ok((buf.filled().len(), addr)))
374 })
375 }
376
377 fn recv_many(
382 &mut self, bufs: &mut [ReadBuf<'_>],
383 ) -> impl Future<Output = io::Result<usize>> + Send {
384 poll_fn(|cx| self.poll_recv_many(cx, bufs))
385 }
386}
387
388impl<T: DatagramSocketSend + ?Sized> DatagramSocketSendExt for T {}
389
390impl<T: DatagramSocketRecv + ?Sized> DatagramSocketRecvExt for T {}
391
392pub trait AsDatagramSocketSend {
396 type AsSend: DatagramSocketSend + ?Sized;
397
398 fn as_datagram_socket_send(&self) -> &Self::AsSend;
399}
400
401pub trait AsDatagramSocketRecv {
405 type AsRecv: DatagramSocketRecv + ?Sized;
406
407 fn as_datagram_socket_recv(&mut self) -> &mut Self::AsRecv;
408 fn as_shared_datagram_socket_recv(&self) -> &Self::AsRecv;
409}
410
411impl<T: AsDatagramSocketSend + Sync> DatagramSocketSend for T {
412 #[inline]
413 fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
414 self.as_datagram_socket_send().poll_send(cx, buf)
415 }
416
417 #[inline]
418 fn poll_send_to(
419 &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
420 ) -> Poll<io::Result<usize>> {
421 self.as_datagram_socket_send().poll_send_to(cx, buf, addr)
422 }
423
424 #[inline]
425 fn poll_send_many(
426 &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
427 ) -> Poll<io::Result<usize>> {
428 self.as_datagram_socket_send().poll_send_many(cx, bufs)
429 }
430
431 #[inline]
432 fn as_udp_socket(&self) -> Option<&UdpSocket> {
433 self.as_datagram_socket_send().as_udp_socket()
434 }
435
436 #[inline]
437 fn peer_addr(&self) -> Option<SocketAddr> {
438 self.as_datagram_socket_send().peer_addr()
439 }
440}
441
442impl<T: AsDatagramSocketRecv + Send> DatagramSocketRecv for T {
443 #[inline]
444 fn poll_recv(
445 &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
446 ) -> Poll<io::Result<()>> {
447 self.as_datagram_socket_recv().poll_recv(cx, buf)
448 }
449
450 #[inline]
451 fn poll_recv_from(
452 &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
453 ) -> Poll<io::Result<SocketAddr>> {
454 self.as_datagram_socket_recv().poll_recv_from(cx, buf)
455 }
456
457 #[inline]
458 fn poll_recv_many(
459 &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
460 ) -> Poll<io::Result<usize>> {
461 self.as_datagram_socket_recv().poll_recv_many(cx, bufs)
462 }
463
464 #[inline]
465 fn as_udp_socket(&self) -> Option<&UdpSocket> {
466 self.as_shared_datagram_socket_recv().as_udp_socket()
467 }
468}
469
470impl<T> AsDatagramSocketSend for &mut T
471where
472 T: DatagramSocketSend + Send + ?Sized,
473{
474 type AsSend = T;
475
476 fn as_datagram_socket_send(&self) -> &Self::AsSend {
477 self
478 }
479}
480
481impl<T> AsDatagramSocketSend for Box<T>
482where
483 T: DatagramSocketSend + Send + ?Sized,
484{
485 type AsSend = T;
486
487 fn as_datagram_socket_send(&self) -> &Self::AsSend {
488 self
489 }
490}
491
492impl<T> AsDatagramSocketSend for Arc<T>
493where
494 T: DatagramSocketSend + Send + ?Sized,
495{
496 type AsSend = T;
497
498 fn as_datagram_socket_send(&self) -> &Self::AsSend {
499 self
500 }
501}
502
503impl<T> AsDatagramSocketRecv for &mut T
504where
505 T: DatagramSocketRecv + Send + ?Sized,
506{
507 type AsRecv = T;
508
509 fn as_datagram_socket_recv(&mut self) -> &mut Self::AsRecv {
510 self
511 }
512
513 fn as_shared_datagram_socket_recv(&self) -> &Self::AsRecv {
514 self
515 }
516}
517
518impl<T> AsDatagramSocketRecv for Box<T>
519where
520 T: DatagramSocketRecv + Send + ?Sized,
521{
522 type AsRecv = T;
523
524 fn as_datagram_socket_recv(&mut self) -> &mut Self::AsRecv {
525 self
526 }
527
528 fn as_shared_datagram_socket_recv(&self) -> &Self::AsRecv {
529 self
530 }
531}
532
533impl DatagramSocket for UdpSocket {
534 #[cfg(unix)]
535 fn as_raw_io(&self) -> Option<BorrowedFd<'_>> {
536 Some(self.as_fd())
537 }
538
539 #[cfg(unix)]
540 fn into_fd(self) -> Option<OwnedFd> {
541 Some(into_owned_fd(self.into_std().ok()?))
542 }
543}
544
545impl DatagramSocketSend for UdpSocket {
546 #[inline]
547 fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
548 UdpSocket::poll_send(self, cx, buf)
549 }
550
551 #[inline]
552 fn poll_send_to(
553 &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
554 ) -> Poll<io::Result<usize>> {
555 UdpSocket::poll_send_to(self, cx, buf, addr)
556 }
557
558 #[cfg(target_os = "linux")]
559 #[inline]
560 fn poll_send_many(
561 &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
562 ) -> Poll<io::Result<usize>> {
563 crate::poll_sendmmsg!(self, cx, bufs)
564 }
565
566 fn as_udp_socket(&self) -> Option<&UdpSocket> {
567 Some(self)
568 }
569
570 fn peer_addr(&self) -> Option<SocketAddr> {
571 self.peer_addr().ok()
572 }
573}
574
575impl DatagramSocketRecv for UdpSocket {
576 #[inline]
577 fn poll_recv(
578 &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
579 ) -> Poll<io::Result<()>> {
580 UdpSocket::poll_recv(self, cx, buf)
581 }
582
583 #[cfg(target_os = "linux")]
584 #[inline]
585 fn poll_recv_many(
586 &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
587 ) -> Poll<io::Result<usize>> {
588 crate::poll_recvmmsg!(self, cx, bufs)
589 }
590
591 fn as_udp_socket(&self) -> Option<&UdpSocket> {
592 Some(self)
593 }
594}
595
596impl DatagramSocket for Arc<UdpSocket> {
597 #[cfg(unix)]
598 fn as_raw_io(&self) -> Option<BorrowedFd<'_>> {
599 Some(self.as_fd())
600 }
601
602 #[cfg(unix)]
603 fn into_fd(self) -> Option<OwnedFd> {
604 None
605 }
606}
607
608impl DatagramSocketRecv for Arc<UdpSocket> {
609 #[inline]
610 fn poll_recv(
611 &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
612 ) -> Poll<io::Result<()>> {
613 UdpSocket::poll_recv(self, cx, buf)
614 }
615
616 #[inline]
617 fn poll_recv_from(
618 &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
619 ) -> Poll<io::Result<SocketAddr>> {
620 UdpSocket::poll_recv_from(self, cx, buf)
621 }
622
623 #[cfg(target_os = "linux")]
624 #[inline]
625 fn poll_recv_many(
626 &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
627 ) -> Poll<io::Result<usize>> {
628 crate::poll_recvmmsg!(self, cx, bufs)
629 }
630
631 fn as_udp_socket(&self) -> Option<&UdpSocket> {
632 Some(self)
633 }
634}
635
636#[cfg(unix)]
637impl DatagramSocket for UnixDatagram {
638 fn as_raw_io(&self) -> Option<BorrowedFd<'_>> {
639 Some(self.as_fd())
640 }
641
642 fn into_fd(self) -> Option<OwnedFd> {
643 Some(into_owned_fd(self.into_std().ok()?))
644 }
645}
646
647#[cfg(unix)]
648impl DatagramSocketSend for UnixDatagram {
649 #[inline]
650 fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
651 UnixDatagram::poll_send(self, cx, buf)
652 }
653
654 #[inline]
655 fn poll_send_to(
656 &self, _: &mut Context, _: &[u8], _: SocketAddr,
657 ) -> Poll<io::Result<usize>> {
658 Poll::Ready(Err(io::Error::new(
659 io::ErrorKind::Unsupported,
660 "invalid address family",
661 )))
662 }
663
664 #[cfg(target_os = "linux")]
665 #[inline]
666 fn poll_send_many(
667 &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
668 ) -> Poll<io::Result<usize>> {
669 crate::poll_sendmmsg!(self, cx, bufs)
670 }
671}
672
673#[cfg(unix)]
674impl DatagramSocketRecv for UnixDatagram {
675 #[inline]
676 fn poll_recv(
677 &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
678 ) -> Poll<io::Result<()>> {
679 UnixDatagram::poll_recv(self, cx, buf)
680 }
681
682 #[cfg(target_os = "linux")]
683 #[inline]
684 fn poll_recv_many(
685 &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
686 ) -> Poll<io::Result<usize>> {
687 crate::poll_recvmmsg!(self, cx, bufs)
688 }
689}
690
691#[cfg(unix)]
692impl DatagramSocketRecv for Arc<UnixDatagram> {
693 #[inline]
694 fn poll_recv(
695 &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>,
696 ) -> Poll<io::Result<()>> {
697 UnixDatagram::poll_recv(self, cx, buf)
698 }
699
700 #[cfg(target_os = "linux")]
701 #[inline]
702 fn poll_recv_many(
703 &mut self, cx: &mut Context<'_>, bufs: &mut [ReadBuf<'_>],
704 ) -> Poll<io::Result<usize>> {
705 crate::poll_recvmmsg!(self, cx, bufs)
706 }
707}
708
709#[cfg(unix)]
712fn into_owned_fd<F: IntoRawFd>(into_fd: F) -> OwnedFd {
713 unsafe { OwnedFd::from_raw_fd(into_fd.into_raw_fd()) }
714}
715
716#[derive(Clone)]
736pub struct MaybeConnectedSocket<T> {
737 inner: T,
738 peer: Option<SocketAddr>,
739}
740
741impl<T: DatagramSocketSend> MaybeConnectedSocket<T> {
742 pub fn new(inner: T) -> Self {
743 Self {
744 peer: inner.peer_addr(),
745 inner,
746 }
747 }
748
749 pub fn inner(&self) -> &T {
752 &self.inner
753 }
754
755 pub fn into_inner(self) -> T {
757 self.inner
758 }
759}
760
761impl<T: DatagramSocketSend> DatagramSocketSend for MaybeConnectedSocket<T> {
762 #[inline]
763 fn poll_send(&self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
764 self.inner.poll_send(cx, buf)
765 }
766
767 #[inline]
768 fn poll_send_to(
769 &self, cx: &mut Context, buf: &[u8], addr: SocketAddr,
770 ) -> Poll<io::Result<usize>> {
771 if let Some(peer) = self.peer {
772 debug_assert_eq!(peer, addr);
773 self.inner.poll_send(cx, buf)
774 } else {
775 self.inner.poll_send_to(cx, buf, addr)
776 }
777 }
778
779 #[inline]
780 fn poll_send_many(
781 &self, cx: &mut Context, bufs: &[ReadBuf<'_>],
782 ) -> Poll<io::Result<usize>> {
783 self.inner.poll_send_many(cx, bufs)
784 }
785
786 #[inline]
787 fn as_udp_socket(&self) -> Option<&UdpSocket> {
788 self.inner.as_udp_socket()
789 }
790
791 #[inline]
792 fn peer_addr(&self) -> Option<SocketAddr> {
793 self.peer
794 }
795}