about summary refs log tree commit diff
diff options
context:
space:
mode:
authorbors <bors@rust-lang.org>2020-09-11 21:54:31 +0000
committerbors <bors@rust-lang.org>2020-09-11 21:54:31 +0000
commit99111606fcda4fdb0646e4f7ee0f6cbcb76fb84a (patch)
tree51009f5f6a8807768441b04360e1e7891e617aa0
parentbc57bd8c7e3e28f8bed4fced3973bfe04949918f (diff)
parent64b8fd7920e177023364c7d9dada0d3f8fad3911 (diff)
downloadrust-99111606fcda4fdb0646e4f7ee0f6cbcb76fb84a.tar.gz
rust-99111606fcda4fdb0646e4f7ee0f6cbcb76fb84a.zip
Auto merge of #73761 - rijenkii:master, r=KodrAus
Add `peek` and `peek_from` to `UnixStream` and `UnixDatagram`

This is my first PR, so I'm sure I've done some things wrong.

This PR:
  * adds `peek` function to `UnixStream`;
  * adds `peek` and `peek_from` to `UnixDatagram`;
  * moves `UnixDatagram::recv_from` implementation to a private function `recv_from_flags`, as `peek_from` uses the same code, just with different flags.

I've taken the documentation from `TcpStream` and `UdpStream`, so it may or may not make sense (I'm bad with english words).
Also, I'm not sure what I should write in the `unstable` attribute, so I've made up the name and set the issue to "none".

Closes #68565.
-rw-r--r--library/std/src/sys/unix/ext/net.rs132
-rw-r--r--library/std/src/sys/unix/ext/net/tests.rs80
2 files changed, 192 insertions, 20 deletions
diff --git a/library/std/src/sys/unix/ext/net.rs b/library/std/src/sys/unix/ext/net.rs
index 0e07106f5ce..3fbd0cb58b1 100644
--- a/library/std/src/sys/unix/ext/net.rs
+++ b/library/std/src/sys/unix/ext/net.rs
@@ -594,6 +594,32 @@ impl UnixStream {
     pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
         self.0.shutdown(how)
     }
+
+    /// Receives data on the socket from the remote address to which it is
+    /// connected, without removing that data from the queue. On success,
+    /// returns the number of bytes peeked.
+    ///
+    /// Successive calls return the same data. This is accomplished by passing
+    /// `MSG_PEEK` as a flag to the underlying `recv` system call.
+    ///
+    /// # Examples
+    ///
+    /// ```no_run
+    /// #![feature(unix_socket_peek)]
+    ///
+    /// use std::os::unix::net::UnixStream;
+    ///
+    /// fn main() -> std::io::Result<()> {
+    ///     let socket = UnixStream::connect("/tmp/sock")?;
+    ///     let mut buf = [0; 10];
+    ///     let len = socket.peek(&mut buf).expect("peek failed");
+    ///     Ok(())
+    /// }
+    /// ```
+    #[unstable(feature = "unix_socket_peek", issue = "none")]
+    pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
+        self.0.peek(buf)
+    }
 }
 
 #[stable(feature = "unix_socket", since = "1.10.0")]
@@ -1291,6 +1317,33 @@ impl UnixDatagram {
         SocketAddr::new(|addr, len| unsafe { libc::getpeername(*self.0.as_inner(), addr, len) })
     }
 
+    fn recv_from_flags(
+        &self,
+        buf: &mut [u8],
+        flags: libc::c_int,
+    ) -> io::Result<(usize, SocketAddr)> {
+        let mut count = 0;
+        let addr = SocketAddr::new(|addr, len| unsafe {
+            count = libc::recvfrom(
+                *self.0.as_inner(),
+                buf.as_mut_ptr() as *mut _,
+                buf.len(),
+                flags,
+                addr,
+                len,
+            );
+            if count > 0 {
+                1
+            } else if count == 0 {
+                0
+            } else {
+                -1
+            }
+        })?;
+
+        Ok((count as usize, addr))
+    }
+
     /// Receives data from the socket.
     ///
     /// On success, returns the number of bytes read and the address from
@@ -1311,26 +1364,7 @@ impl UnixDatagram {
     /// ```
     #[stable(feature = "unix_socket", since = "1.10.0")]
     pub fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
-        let mut count = 0;
-        let addr = SocketAddr::new(|addr, len| unsafe {
-            count = libc::recvfrom(
-                *self.0.as_inner(),
-                buf.as_mut_ptr() as *mut _,
-                buf.len(),
-                0,
-                addr,
-                len,
-            );
-            if count > 0 {
-                1
-            } else if count == 0 {
-                0
-            } else {
-                -1
-            }
-        })?;
-
-        Ok((count as usize, addr))
+        self.recv_from_flags(buf, 0)
     }
 
     /// Receives data from the socket.
@@ -1601,6 +1635,64 @@ impl UnixDatagram {
     pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
         self.0.shutdown(how)
     }
+
+    /// Receives data on the socket from the remote address to which it is
+    /// connected, without removing that data from the queue. On success,
+    /// returns the number of bytes peeked.
+    ///
+    /// Successive calls return the same data. This is accomplished by passing
+    /// `MSG_PEEK` as a flag to the underlying `recv` system call.
+    ///
+    /// # Examples
+    ///
+    /// ```no_run
+    /// #![feature(unix_socket_peek)]
+    ///
+    /// use std::os::unix::net::UnixDatagram;
+    ///
+    /// fn main() -> std::io::Result<()> {
+    ///     let socket = UnixDatagram::bind("/tmp/sock")?;
+    ///     let mut buf = [0; 10];
+    ///     let len = socket.peek(&mut buf).expect("peek failed");
+    ///     Ok(())
+    /// }
+    /// ```
+    #[unstable(feature = "unix_socket_peek", issue = "none")]
+    pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
+        self.0.peek(buf)
+    }
+
+    /// Receives a single datagram message on the socket, without removing it from the
+    /// queue. On success, returns the number of bytes read and the origin.
+    ///
+    /// The function must be called with valid byte array `buf` of sufficient size to
+    /// hold the message bytes. If a message is too long to fit in the supplied buffer,
+    /// excess bytes may be discarded.
+    ///
+    /// Successive calls return the same data. This is accomplished by passing
+    /// `MSG_PEEK` as a flag to the underlying `recvfrom` system call.
+    ///
+    /// Do not use this function to implement busy waiting, instead use `libc::poll` to
+    /// synchronize IO events on one or more sockets.
+    ///
+    /// # Examples
+    ///
+    /// ```no_run
+    /// #![feature(unix_socket_peek)]
+    ///
+    /// use std::os::unix::net::UnixDatagram;
+    ///
+    /// fn main() -> std::io::Result<()> {
+    ///     let socket = UnixDatagram::bind("/tmp/sock")?;
+    ///     let mut buf = [0; 10];
+    ///     let (len, addr) = socket.peek_from(&mut buf).expect("peek failed");
+    ///     Ok(())
+    /// }
+    /// ```
+    #[unstable(feature = "unix_socket_peek", issue = "none")]
+    pub fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
+        self.recv_from_flags(buf, libc::MSG_PEEK)
+    }
 }
 
 #[stable(feature = "unix_socket", since = "1.10.0")]
diff --git a/library/std/src/sys/unix/ext/net/tests.rs b/library/std/src/sys/unix/ext/net/tests.rs
index be98766f0f3..ee73a6ed538 100644
--- a/library/std/src/sys/unix/ext/net/tests.rs
+++ b/library/std/src/sys/unix/ext/net/tests.rs
@@ -372,3 +372,83 @@ fn test_unix_datagram_timeout_zero_duration() {
 fn abstract_namespace_not_allowed() {
     assert!(UnixStream::connect("\0asdf").is_err());
 }
+
+#[test]
+fn test_unix_stream_peek() {
+    let (txdone, rxdone) = crate::sync::mpsc::channel();
+
+    let dir = tmpdir();
+    let path = dir.path().join("sock");
+
+    let listener = or_panic!(UnixListener::bind(&path));
+    let thread = thread::spawn(move || {
+        let mut stream = or_panic!(listener.accept()).0;
+        or_panic!(stream.write_all(&[1, 3, 3, 7]));
+        or_panic!(rxdone.recv());
+    });
+
+    let mut stream = or_panic!(UnixStream::connect(&path));
+    let mut buf = [0; 10];
+    for _ in 0..2 {
+        assert_eq!(or_panic!(stream.peek(&mut buf)), 4);
+    }
+    assert_eq!(or_panic!(stream.read(&mut buf)), 4);
+
+    or_panic!(stream.set_nonblocking(true));
+    match stream.peek(&mut buf) {
+        Ok(_) => panic!("expected error"),
+        Err(ref e) if e.kind() == ErrorKind::WouldBlock => {}
+        Err(e) => panic!("unexpected error: {}", e),
+    }
+
+    or_panic!(txdone.send(()));
+    thread.join().unwrap();
+}
+
+#[test]
+fn test_unix_datagram_peek() {
+    let dir = tmpdir();
+    let path1 = dir.path().join("sock");
+
+    let sock1 = or_panic!(UnixDatagram::bind(&path1));
+    let sock2 = or_panic!(UnixDatagram::unbound());
+    or_panic!(sock2.connect(&path1));
+
+    let msg = b"hello world";
+    or_panic!(sock2.send(msg));
+    for _ in 0..2 {
+        let mut buf = [0; 11];
+        let size = or_panic!(sock1.peek(&mut buf));
+        assert_eq!(size, 11);
+        assert_eq!(msg, &buf[..]);
+    }
+
+    let mut buf = [0; 11];
+    let size = or_panic!(sock1.recv(&mut buf));
+    assert_eq!(size, 11);
+    assert_eq!(msg, &buf[..]);
+}
+
+#[test]
+fn test_unix_datagram_peek_from() {
+    let dir = tmpdir();
+    let path1 = dir.path().join("sock");
+
+    let sock1 = or_panic!(UnixDatagram::bind(&path1));
+    let sock2 = or_panic!(UnixDatagram::unbound());
+    or_panic!(sock2.connect(&path1));
+
+    let msg = b"hello world";
+    or_panic!(sock2.send(msg));
+    for _ in 0..2 {
+        let mut buf = [0; 11];
+        let (size, _) = or_panic!(sock1.peek_from(&mut buf));
+        assert_eq!(size, 11);
+        assert_eq!(msg, &buf[..]);
+    }
+
+    let mut buf = [0; 11];
+    let size = or_panic!(sock1.recv(&mut buf));
+    assert_eq!(size, 11);
+    assert_eq!(msg, &buf[..]);
+}