about summary refs log tree commit diff
diff options
context:
space:
mode:
authorRalf Jung <post@ralfj.de>2024-08-16 22:34:50 +0200
committerRalf Jung <post@ralfj.de>2024-08-17 11:32:17 +0200
commit99d742e9b0b7191543567faa67a04c65a4d1bcaf (patch)
tree1fe5f90afde2a40369d7f3df2b0cb8487c6e9ab6
parent78dfb8a10870689491c89db55baf1cc4688cb972 (diff)
downloadrust-99d742e9b0b7191543567faa67a04c65a4d1bcaf.tar.gz
rust-99d742e9b0b7191543567faa67a04c65a4d1bcaf.zip
implement pipe and pipe2
-rw-r--r--src/tools/miri/src/shims/unix/foreign_items.rs15
-rw-r--r--src/tools/miri/src/shims/unix/linux/epoll.rs5
-rw-r--r--src/tools/miri/src/shims/unix/mod.rs4
-rw-r--r--src/tools/miri/src/shims/unix/unnamed_socket.rs (renamed from src/tools/miri/src/shims/unix/socket.rs)104
-rw-r--r--src/tools/miri/tests/pass-dep/libc/libc-pipe.rs106
-rw-r--r--src/tools/miri/tests/pass-dep/libc/libc-socketpair.rs20
6 files changed, 223 insertions, 31 deletions
diff --git a/src/tools/miri/src/shims/unix/foreign_items.rs b/src/tools/miri/src/shims/unix/foreign_items.rs
index 6c35281ecf2..273a99b3116 100644
--- a/src/tools/miri/src/shims/unix/foreign_items.rs
+++ b/src/tools/miri/src/shims/unix/foreign_items.rs
@@ -288,14 +288,25 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
                 this.write_scalar(result, dest)?;
             }
 
-            // Sockets
+            // Unnamed sockets and pipes
             "socketpair" => {
                 let [domain, type_, protocol, sv] =
                     this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
-
                 let result = this.socketpair(domain, type_, protocol, sv)?;
                 this.write_scalar(result, dest)?;
             }
+            "pipe" => {
+                let [pipefd] =
+                    this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
+                let result = this.pipe2(pipefd, /*flags*/ None)?;
+                this.write_scalar(result, dest)?;
+            }
+            "pipe2" => {
+                let [pipefd, flags] =
+                    this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
+                let result = this.pipe2(pipefd, Some(flags))?;
+                this.write_scalar(result, dest)?;
+            }
 
             // Time
             "gettimeofday" => {
diff --git a/src/tools/miri/src/shims/unix/linux/epoll.rs b/src/tools/miri/src/shims/unix/linux/epoll.rs
index 53f27868aeb..127817d5bbe 100644
--- a/src/tools/miri/src/shims/unix/linux/epoll.rs
+++ b/src/tools/miri/src/shims/unix/linux/epoll.rs
@@ -62,9 +62,10 @@ pub struct EpollEventInterest {
 
 /// EpollReadyEvents reflects the readiness of a file description.
 pub struct EpollReadyEvents {
-    /// The associated file is available for read(2) operations.
+    /// The associated file is available for read(2) operations, in the sense that a read will not block.
+    /// (I.e., returning EOF is considered "ready".)
     pub epollin: bool,
-    /// The associated file is available for write(2) operations.
+    /// The associated file is available for write(2) operations, in the sense that a write will not block.
     pub epollout: bool,
     /// Stream socket peer closed connection, or shut down writing
     /// half of connection.
diff --git a/src/tools/miri/src/shims/unix/mod.rs b/src/tools/miri/src/shims/unix/mod.rs
index 8cfa659d90a..7da6d7b02a2 100644
--- a/src/tools/miri/src/shims/unix/mod.rs
+++ b/src/tools/miri/src/shims/unix/mod.rs
@@ -4,9 +4,9 @@ mod env;
 mod fd;
 mod fs;
 mod mem;
-mod socket;
 mod sync;
 mod thread;
+mod unnamed_socket;
 
 mod android;
 mod freebsd;
@@ -23,9 +23,9 @@ pub use env::EvalContextExt as _;
 pub use fd::EvalContextExt as _;
 pub use fs::EvalContextExt as _;
 pub use mem::EvalContextExt as _;
-pub use socket::EvalContextExt as _;
 pub use sync::EvalContextExt as _;
 pub use thread::EvalContextExt as _;
+pub use unnamed_socket::EvalContextExt as _;
 
 // Make up some constants.
 const UID: u32 = 1000;
diff --git a/src/tools/miri/src/shims/unix/socket.rs b/src/tools/miri/src/shims/unix/unnamed_socket.rs
index 3f2adb6e79c..f8553e67992 100644
--- a/src/tools/miri/src/shims/unix/socket.rs
+++ b/src/tools/miri/src/shims/unix/unnamed_socket.rs
@@ -1,3 +1,7 @@
+//! This implements "anonymous" sockets, that do not correspond to anything on the host system and
+//! are entirely implemented inside Miri.
+//! We also use the same infrastructure to implement unnamed pipes.
+
 use std::cell::{OnceCell, RefCell};
 use std::collections::VecDeque;
 use std::io;
@@ -16,8 +20,9 @@ const MAX_SOCKETPAIR_BUFFER_CAPACITY: usize = 212992;
 /// Pair of connected sockets.
 #[derive(Debug)]
 struct SocketPair {
-    /// The buffer we are reading from.
-    readbuf: RefCell<Buffer>,
+    /// The buffer we are reading from, or `None` if this is the writing end of a pipe.
+    /// (In that case, the peer FD will be the reading end of that pipe.)
+    readbuf: Option<RefCell<Buffer>>,
     /// The `SocketPair` file descriptor that is our "peer", and that holds the buffer we are
     /// writing to. This is a weak reference because the other side may be closed before us; all
     /// future writes will then trigger EPIPE.
@@ -55,17 +60,25 @@ impl FileDescription for SocketPair {
         let mut epoll_ready_events = EpollReadyEvents::new();
 
         // Check if it is readable.
-        let readbuf = self.readbuf.borrow();
-        if !readbuf.buf.is_empty() {
+        if let Some(readbuf) = &self.readbuf {
+            if !readbuf.borrow().buf.is_empty() {
+                epoll_ready_events.epollin = true;
+            }
+        } else {
+            // Without a read buffer, reading never blocks, so we are always ready.
             epoll_ready_events.epollin = true;
         }
 
         // Check if is writable.
         if let Some(peer_fd) = self.peer_fd().upgrade() {
-            let writebuf = &peer_fd.downcast::<SocketPair>().unwrap().readbuf.borrow();
-            let data_size = writebuf.buf.len();
-            let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(data_size);
-            if available_space != 0 {
+            if let Some(writebuf) = &peer_fd.downcast::<SocketPair>().unwrap().readbuf {
+                let data_size = writebuf.borrow().buf.len();
+                let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(data_size);
+                if available_space != 0 {
+                    epoll_ready_events.epollout = true;
+                }
+            } else {
+                // Without a write buffer, writing never blocks.
                 epoll_ready_events.epollout = true;
             }
         } else {
@@ -108,7 +121,12 @@ impl FileDescription for SocketPair {
             return Ok(Ok(0));
         }
 
-        let mut readbuf = self.readbuf.borrow_mut();
+        let Some(readbuf) = &self.readbuf else {
+            // FIXME: This should return EBADF, but there's no nice way to do that as there's no
+            // corresponding ErrorKind variant.
+            throw_unsup_format!("reading from the write end of a pipe");
+        };
+        let mut readbuf = readbuf.borrow_mut();
         if readbuf.buf.is_empty() {
             if self.peer_fd().upgrade().is_none() {
                 // Socketpair with no peer and empty buffer.
@@ -176,7 +194,13 @@ impl FileDescription for SocketPair {
             // closed.
             return Ok(Err(Error::from(ErrorKind::BrokenPipe)));
         };
-        let mut writebuf = peer_fd.downcast::<SocketPair>().unwrap().readbuf.borrow_mut();
+
+        let Some(writebuf) = &peer_fd.downcast::<SocketPair>().unwrap().readbuf else {
+            // FIXME: This should return EBADF, but there's no nice way to do that as there's no
+            // corresponding ErrorKind variant.
+            throw_unsup_format!("writing to the reading end of a pipe");
+        };
+        let mut writebuf = writebuf.borrow_mut();
         let data_size = writebuf.buf.len();
         let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(data_size);
         if available_space == 0 {
@@ -227,12 +251,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
 
         let mut is_sock_nonblock = false;
 
-        // Parse and remove the type flags that we support. If type != 0 after removing,
-        // unsupported flags are used.
-        if type_ & this.eval_libc_i32("SOCK_STREAM") == this.eval_libc_i32("SOCK_STREAM") {
-            type_ &= !(this.eval_libc_i32("SOCK_STREAM"));
-        }
-
+        // Parse and remove the type flags that we support.
         // SOCK_NONBLOCK only exists on Linux.
         if this.tcx.sess.target.os == "linux" {
             if type_ & this.eval_libc_i32("SOCK_NONBLOCK") == this.eval_libc_i32("SOCK_NONBLOCK") {
@@ -253,7 +272,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
                                  and AF_LOCAL are allowed",
                 domain
             );
-        } else if type_ != 0 {
+        } else if type_ != this.eval_libc_i32("SOCK_STREAM") {
             throw_unsup_format!(
                 "socketpair: type {:#x} is unsupported, only SOCK_STREAM, \
                                  SOCK_CLOEXEC and SOCK_NONBLOCK are allowed",
@@ -269,12 +288,12 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
         // Generate file descriptions.
         let fds = &mut this.machine.fds;
         let fd0 = fds.new_ref(SocketPair {
-            readbuf: RefCell::new(Buffer::new()),
+            readbuf: Some(RefCell::new(Buffer::new())),
             peer_fd: OnceCell::new(),
             is_nonblock: is_sock_nonblock,
         });
         let fd1 = fds.new_ref(SocketPair {
-            readbuf: RefCell::new(Buffer::new()),
+            readbuf: Some(RefCell::new(Buffer::new())),
             peer_fd: OnceCell::new(),
             is_nonblock: is_sock_nonblock,
         });
@@ -295,4 +314,51 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
 
         Ok(Scalar::from_i32(0))
     }
+
+    fn pipe2(
+        &mut self,
+        pipefd: &OpTy<'tcx>,
+        flags: Option<&OpTy<'tcx>>,
+    ) -> InterpResult<'tcx, Scalar> {
+        let this = self.eval_context_mut();
+
+        let pipefd = this.deref_pointer(pipefd)?;
+        let flags = match flags {
+            Some(flags) => this.read_scalar(flags)?.to_i32()?,
+            None => 0,
+        };
+
+        // As usual we ignore CLOEXEC.
+        let cloexec = this.eval_libc_i32("O_CLOEXEC");
+        if flags != 0 && flags != cloexec {
+            throw_unsup_format!("unsupported flags in `pipe2`");
+        }
+
+        // Generate file descriptions.
+        // pipefd[0] refers to the read end of the pipe.
+        let fds = &mut this.machine.fds;
+        let fd0 = fds.new_ref(SocketPair {
+            readbuf: Some(RefCell::new(Buffer::new())),
+            peer_fd: OnceCell::new(),
+            is_nonblock: false,
+        });
+        let fd1 =
+            fds.new_ref(SocketPair { readbuf: None, peer_fd: OnceCell::new(), is_nonblock: false });
+
+        // Make the file descriptions point to each other.
+        fd0.downcast::<SocketPair>().unwrap().peer_fd.set(fd1.downgrade()).unwrap();
+        fd1.downcast::<SocketPair>().unwrap().peer_fd.set(fd0.downgrade()).unwrap();
+
+        // Insert the file description to the fd table, generating the file descriptors.
+        let pipefd0 = fds.insert(fd0);
+        let pipefd1 = fds.insert(fd1);
+
+        // Return file descriptors to the caller.
+        let pipefd0 = Scalar::from_int(pipefd0, pipefd.layout.size);
+        let pipefd1 = Scalar::from_int(pipefd1, pipefd.layout.size);
+        this.write_scalar(pipefd0, &pipefd)?;
+        this.write_scalar(pipefd1, &pipefd.offset(pipefd.layout.size, pipefd.layout, this)?)?;
+
+        Ok(Scalar::from_i32(0))
+    }
 }
diff --git a/src/tools/miri/tests/pass-dep/libc/libc-pipe.rs b/src/tools/miri/tests/pass-dep/libc/libc-pipe.rs
new file mode 100644
index 00000000000..a57cad124b6
--- /dev/null
+++ b/src/tools/miri/tests/pass-dep/libc/libc-pipe.rs
@@ -0,0 +1,106 @@
+//@ignore-target-windows: No libc pipe on Windows
+// test_race depends on a deterministic schedule.
+//@compile-flags: -Zmiri-preemption-rate=0
+use std::thread;
+fn main() {
+    test_pipe();
+    test_pipe_threaded();
+    test_race();
+}
+
+fn test_pipe() {
+    let mut fds = [-1, -1];
+    let mut res = unsafe { libc::pipe(fds.as_mut_ptr()) };
+    assert_eq!(res, 0);
+
+    // Read size == data available in buffer.
+    let data = "12345".as_bytes().as_ptr();
+    res = unsafe { libc::write(fds[1], data as *const libc::c_void, 5).try_into().unwrap() };
+    assert_eq!(res, 5);
+    let mut buf3: [u8; 5] = [0; 5];
+    res = unsafe {
+        libc::read(fds[0], buf3.as_mut_ptr().cast(), buf3.len() as libc::size_t).try_into().unwrap()
+    };
+    assert_eq!(res, 5);
+    assert_eq!(buf3, "12345".as_bytes());
+
+    // Read size > data available in buffer.
+    let data = "123".as_bytes().as_ptr();
+    res = unsafe { libc::write(fds[1], data as *const libc::c_void, 3).try_into().unwrap() };
+    assert_eq!(res, 3);
+    let mut buf4: [u8; 5] = [0; 5];
+    res = unsafe {
+        libc::read(fds[0], buf4.as_mut_ptr().cast(), buf4.len() as libc::size_t).try_into().unwrap()
+    };
+    assert_eq!(res, 3);
+    assert_eq!(&buf4[0..3], "123".as_bytes());
+}
+
+fn test_pipe_threaded() {
+    let mut fds = [-1, -1];
+    let mut res = unsafe { libc::pipe(fds.as_mut_ptr()) };
+    assert_eq!(res, 0);
+
+    let thread1 = thread::spawn(move || {
+        let mut buf: [u8; 5] = [0; 5];
+        let res: i64 = unsafe {
+            libc::read(fds[0], buf.as_mut_ptr().cast(), buf.len() as libc::size_t)
+                .try_into()
+                .unwrap()
+        };
+        assert_eq!(res, 5);
+        assert_eq!(buf, "abcde".as_bytes());
+    });
+    // FIXME: we should yield here once blocking is implemented.
+    //thread::yield_now();
+    let data = "abcde".as_bytes().as_ptr();
+    res = unsafe { libc::write(fds[1], data as *const libc::c_void, 5).try_into().unwrap() };
+    assert_eq!(res, 5);
+    thread1.join().unwrap();
+
+    // Read and write from different direction
+    let thread2 = thread::spawn(move || {
+        // FIXME: we should yield here once blocking is implemented.
+        //thread::yield_now();
+        let data = "12345".as_bytes().as_ptr();
+        let res: i64 =
+            unsafe { libc::write(fds[1], data as *const libc::c_void, 5).try_into().unwrap() };
+        assert_eq!(res, 5);
+    });
+    // FIXME: we should not yield here once blocking is implemented.
+    thread::yield_now();
+    let mut buf: [u8; 5] = [0; 5];
+    res = unsafe {
+        libc::read(fds[0], buf.as_mut_ptr().cast(), buf.len() as libc::size_t).try_into().unwrap()
+    };
+    assert_eq!(res, 5);
+    assert_eq!(buf, "12345".as_bytes());
+    thread2.join().unwrap();
+}
+
+fn test_race() {
+    static mut VAL: u8 = 0;
+    let mut fds = [-1, -1];
+    let mut res = unsafe { libc::pipe(fds.as_mut_ptr()) };
+    assert_eq!(res, 0);
+    let thread1 = thread::spawn(move || {
+        let mut buf: [u8; 1] = [0; 1];
+        // write() from the main thread will occur before the read() here
+        // because preemption is disabled and the main thread yields after write().
+        let res: i32 = unsafe {
+            libc::read(fds[0], buf.as_mut_ptr().cast(), buf.len() as libc::size_t)
+                .try_into()
+                .unwrap()
+        };
+        assert_eq!(res, 1);
+        assert_eq!(buf, "a".as_bytes());
+        // The read above establishes a happens-before so it is now safe to access this global variable.
+        unsafe { assert_eq!(VAL, 1) };
+    });
+    unsafe { VAL = 1 };
+    let data = "a".as_bytes().as_ptr();
+    res = unsafe { libc::write(fds[1], data as *const libc::c_void, 1).try_into().unwrap() };
+    assert_eq!(res, 1);
+    thread::yield_now();
+    thread1.join().unwrap();
+}
diff --git a/src/tools/miri/tests/pass-dep/libc/libc-socketpair.rs b/src/tools/miri/tests/pass-dep/libc/libc-socketpair.rs
index 324c0127ee9..254be89d482 100644
--- a/src/tools/miri/tests/pass-dep/libc/libc-socketpair.rs
+++ b/src/tools/miri/tests/pass-dep/libc/libc-socketpair.rs
@@ -66,9 +66,6 @@ fn test_socketpair_threaded() {
         unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) };
     assert_eq!(res, 0);
 
-    let data = "abcde".as_bytes().as_ptr();
-    res = unsafe { libc::write(fds[0], data as *const libc::c_void, 5).try_into().unwrap() };
-    assert_eq!(res, 5);
     let thread1 = thread::spawn(move || {
         let mut buf: [u8; 5] = [0; 5];
         let res: i64 = unsafe {
@@ -79,23 +76,33 @@ fn test_socketpair_threaded() {
         assert_eq!(res, 5);
         assert_eq!(buf, "abcde".as_bytes());
     });
+    // FIXME: we should yield here once blocking is implemented.
+    //thread::yield_now();
+    let data = "abcde".as_bytes().as_ptr();
+    res = unsafe { libc::write(fds[0], data as *const libc::c_void, 5).try_into().unwrap() };
+    assert_eq!(res, 5);
     thread1.join().unwrap();
 
     // Read and write from different direction
     let thread2 = thread::spawn(move || {
+        // FIXME: we should yield here once blocking is implemented.
+        //thread::yield_now();
         let data = "12345".as_bytes().as_ptr();
         let res: i64 =
-            unsafe { libc::write(fds[0], data as *const libc::c_void, 5).try_into().unwrap() };
+            unsafe { libc::write(fds[1], data as *const libc::c_void, 5).try_into().unwrap() };
         assert_eq!(res, 5);
     });
-    thread2.join().unwrap();
+    // FIXME: we should not yield here once blocking is implemented.
+    thread::yield_now();
     let mut buf: [u8; 5] = [0; 5];
     res = unsafe {
-        libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t).try_into().unwrap()
+        libc::read(fds[0], buf.as_mut_ptr().cast(), buf.len() as libc::size_t).try_into().unwrap()
     };
     assert_eq!(res, 5);
     assert_eq!(buf, "12345".as_bytes());
+    thread2.join().unwrap();
 }
+
 fn test_race() {
     static mut VAL: u8 = 0;
     let mut fds = [-1, -1];
@@ -113,6 +120,7 @@ fn test_race() {
         };
         assert_eq!(res, 1);
         assert_eq!(buf, "a".as_bytes());
+        // The read above establishes a happens-before so it is now safe to access this global variable.
         unsafe { assert_eq!(VAL, 1) };
     });
     unsafe { VAL = 1 };