about summary refs log tree commit diff
diff options
context:
space:
mode:
authorOli Scherer <github35764891676564198441@oli-obk.de>2024-12-27 13:13:19 +0000
committerGitHub <noreply@github.com>2024-12-27 13:13:19 +0000
commit5ccf10edf133ad673429ab4e7090459689c78b57 (patch)
tree2a03bd4a2ab5b44218291c5b3129804176408df6
parent3623dfd42b9a32a8ce7bbfe8141b574c4584afea (diff)
parent638351324975fc145cbdb0a30d48b9e3525c020a (diff)
downloadrust-5ccf10edf133ad673429ab4e7090459689c78b57.tar.gz
rust-5ccf10edf133ad673429ab4e7090459689c78b57.zip
Merge pull request #4112 from RalfJung/socket-cleanup
Socket read/write cleanup
-rw-r--r--src/tools/miri/src/shims/unix/linux_like/eventfd.rs55
-rw-r--r--src/tools/miri/src/shims/unix/unnamed_socket.rs158
-rw-r--r--src/tools/miri/tests/fail-dep/libc/socketpair-close-while-blocked.rs37
-rw-r--r--src/tools/miri/tests/fail-dep/libc/socketpair-close-while-blocked.stderr35
4 files changed, 169 insertions, 116 deletions
diff --git a/src/tools/miri/src/shims/unix/linux_like/eventfd.rs b/src/tools/miri/src/shims/unix/linux_like/eventfd.rs
index 4bbe417ea8d..ed81207f54e 100644
--- a/src/tools/miri/src/shims/unix/linux_like/eventfd.rs
+++ b/src/tools/miri/src/shims/unix/linux_like/eventfd.rs
@@ -62,11 +62,10 @@ impl FileDescription for Event {
             return ecx.set_last_error_and_return(ErrorKind::InvalidInput, dest);
         }
 
-        // eventfd read at the size of u64.
+        // Turn the pointer into a place at the right type.
         let buf_place = ecx.ptr_to_mplace_unaligned(ptr, ty);
 
-        let weak_eventfd = self_ref.downgrade();
-        eventfd_read(buf_place, dest, weak_eventfd, ecx)
+        eventfd_read(buf_place, dest, self_ref, ecx)
     }
 
     /// A write call adds the 8-byte integer value supplied in
@@ -97,18 +96,10 @@ impl FileDescription for Event {
             return ecx.set_last_error_and_return(ErrorKind::InvalidInput, dest);
         }
 
-        // Read the user-supplied value from the pointer.
+        // Turn the pointer into a place at the right type.
         let buf_place = ecx.ptr_to_mplace_unaligned(ptr, ty);
-        let num = ecx.read_scalar(&buf_place)?.to_u64()?;
 
-        // u64::MAX as input is invalid because the maximum value of counter is u64::MAX - 1.
-        if num == u64::MAX {
-            return ecx.set_last_error_and_return(ErrorKind::InvalidInput, dest);
-        }
-        // If the addition does not let the counter to exceed the maximum value, update the counter.
-        // Else, block.
-        let weak_eventfd = self_ref.downgrade();
-        eventfd_write(num, buf_place, dest, weak_eventfd, ecx)
+        eventfd_write(buf_place, dest, self_ref, ecx)
     }
 
     fn as_unix(&self) -> &dyn UnixFileDescription {
@@ -193,20 +184,22 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
 /// Block thread if the value addition will exceed u64::MAX -1,
 /// else just add the user-supplied value to current counter.
 fn eventfd_write<'tcx>(
-    num: u64,
     buf_place: MPlaceTy<'tcx>,
     dest: &MPlaceTy<'tcx>,
-    weak_eventfd: WeakFileDescriptionRef,
+    eventfd_ref: &FileDescriptionRef,
     ecx: &mut MiriInterpCx<'tcx>,
 ) -> InterpResult<'tcx> {
-    let Some(eventfd_ref) = weak_eventfd.upgrade() else {
-        throw_unsup_format!("eventfd FD got closed while blocking.")
-    };
-
     // Since we pass the weak file description ref, it is guaranteed to be
     // an eventfd file description.
     let eventfd = eventfd_ref.downcast::<Event>().unwrap();
 
+    // Figure out which value we should add.
+    let num = ecx.read_scalar(&buf_place)?.to_u64()?;
+    // u64::MAX as input is invalid because the maximum value of counter is u64::MAX - 1.
+    if num == u64::MAX {
+        return ecx.set_last_error_and_return(ErrorKind::InvalidInput, dest);
+    }
+
     match eventfd.counter.get().checked_add(num) {
         Some(new_count @ 0..=MAX_COUNTER) => {
             // Future `read` calls will synchronize with this write, so update the FD clock.
@@ -219,7 +212,7 @@ fn eventfd_write<'tcx>(
 
             // The state changed; we check and update the status of all supported event
             // types for current file description.
-            ecx.check_and_update_readiness(&eventfd_ref)?;
+            ecx.check_and_update_readiness(eventfd_ref)?;
 
             // Unblock *all* threads previously blocked on `read`.
             // We need to take out the blocked thread ids and unblock them together,
@@ -244,6 +237,7 @@ fn eventfd_write<'tcx>(
 
             eventfd.blocked_write_tid.borrow_mut().push(ecx.active_thread());
 
+            let weak_eventfd = eventfd_ref.downgrade();
             ecx.block_thread(
                 BlockReason::Eventfd,
                 None,
@@ -255,8 +249,10 @@ fn eventfd_write<'tcx>(
                         weak_eventfd: WeakFileDescriptionRef,
                     }
                     @unblock = |this| {
-                        // When we get unblocked, try again.
-                        eventfd_write(num, buf_place, &dest, weak_eventfd, this)
+                        // When we get unblocked, try again. We know the ref is still valid,
+                        // otherwise there couldn't be a `write` that unblocks us.
+                        let eventfd_ref = weak_eventfd.upgrade().unwrap();
+                        eventfd_write(buf_place, &dest, &eventfd_ref, this)
                     }
                 ),
             );
@@ -270,13 +266,9 @@ fn eventfd_write<'tcx>(
 fn eventfd_read<'tcx>(
     buf_place: MPlaceTy<'tcx>,
     dest: &MPlaceTy<'tcx>,
-    weak_eventfd: WeakFileDescriptionRef,
+    eventfd_ref: &FileDescriptionRef,
     ecx: &mut MiriInterpCx<'tcx>,
 ) -> InterpResult<'tcx> {
-    let Some(eventfd_ref) = weak_eventfd.upgrade() else {
-        throw_unsup_format!("eventfd FD got closed while blocking.")
-    };
-
     // Since we pass the weak file description ref to the callback function, it is guaranteed to be
     // an eventfd file description.
     let eventfd = eventfd_ref.downcast::<Event>().unwrap();
@@ -293,6 +285,7 @@ fn eventfd_read<'tcx>(
 
         eventfd.blocked_read_tid.borrow_mut().push(ecx.active_thread());
 
+        let weak_eventfd = eventfd_ref.downgrade();
         ecx.block_thread(
             BlockReason::Eventfd,
             None,
@@ -303,8 +296,10 @@ fn eventfd_read<'tcx>(
                     weak_eventfd: WeakFileDescriptionRef,
                 }
                 @unblock = |this| {
-                    // When we get unblocked, try again.
-                    eventfd_read(buf_place, &dest, weak_eventfd, this)
+                    // When we get unblocked, try again. We know the ref is still valid,
+                    // otherwise there couldn't be a `write` that unblocks us.
+                    let eventfd_ref = weak_eventfd.upgrade().unwrap();
+                    eventfd_read(buf_place, &dest, &eventfd_ref, this)
                 }
             ),
         );
@@ -317,7 +312,7 @@ fn eventfd_read<'tcx>(
 
         // The state changed; we check and update the status of all supported event
         // types for current file description.
-        ecx.check_and_update_readiness(&eventfd_ref)?;
+        ecx.check_and_update_readiness(eventfd_ref)?;
 
         // Unblock *all* threads previously blocked on `write`.
         // We need to take out the blocked thread ids and unblock them together,
diff --git a/src/tools/miri/src/shims/unix/unnamed_socket.rs b/src/tools/miri/src/shims/unix/unnamed_socket.rs
index 86ebe95762a..4285786f063 100644
--- a/src/tools/miri/src/shims/unix/unnamed_socket.rs
+++ b/src/tools/miri/src/shims/unix/unnamed_socket.rs
@@ -96,26 +96,7 @@ impl FileDescription for AnonSocket {
         dest: &MPlaceTy<'tcx>,
         ecx: &mut MiriInterpCx<'tcx>,
     ) -> InterpResult<'tcx> {
-        // Always succeed on read size 0.
-        if len == 0 {
-            return ecx.return_read_success(ptr, &[], 0, dest);
-        }
-
-        let Some(readbuf) = &self.readbuf else {
-            // FIXME: This should return EBADF, but there's no nice way to do that as there's no
-            // corresponding ErrorKind variant.
-            throw_unsup_format!("reading from the write end of a pipe");
-        };
-
-        if readbuf.borrow().buf.is_empty() && self.is_nonblock {
-            // Non-blocking socketpair with writer and empty buffer.
-            // https://linux.die.net/man/2/read
-            // EAGAIN or EWOULDBLOCK can be returned for socket,
-            // POSIX.1-2001 allows either error to be returned for this case.
-            // Since there is no ErrorKind for EAGAIN, WouldBlock is used.
-            return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
-        }
-        anonsocket_read(self_ref.downgrade(), len, ptr, dest.clone(), ecx)
+        anonsocket_read(self_ref, len, ptr, dest, ecx)
     }
 
     fn write<'tcx>(
@@ -127,31 +108,7 @@ impl FileDescription for AnonSocket {
         dest: &MPlaceTy<'tcx>,
         ecx: &mut MiriInterpCx<'tcx>,
     ) -> InterpResult<'tcx> {
-        // Always succeed on write size 0.
-        // ("If count is zero and fd refers to a file other than a regular file, the results are not specified.")
-        if len == 0 {
-            return ecx.return_write_success(0, dest);
-        }
-
-        // We are writing to our peer's readbuf.
-        let Some(peer_fd) = self.peer_fd().upgrade() else {
-            // If the upgrade from Weak to Rc fails, it indicates that all read ends have been
-            // closed.
-            return ecx.set_last_error_and_return(ErrorKind::BrokenPipe, dest);
-        };
-
-        let Some(writebuf) = &peer_fd.downcast::<AnonSocket>().unwrap().readbuf else {
-            // FIXME: This should return EBADF, but there's no nice way to do that as there's no
-            // corresponding ErrorKind variant.
-            throw_unsup_format!("writing to the reading end of a pipe");
-        };
-        let available_space =
-            MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());
-        if available_space == 0 && self.is_nonblock {
-            // Non-blocking socketpair with a full buffer.
-            return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
-        }
-        anonsocket_write(self_ref.downgrade(), ptr, len, dest.clone(), ecx)
+        anonsocket_write(self_ref, ptr, len, dest, ecx)
     }
 
     fn as_unix(&self) -> &dyn UnixFileDescription {
@@ -161,50 +118,65 @@ impl FileDescription for AnonSocket {
 
 /// Write to AnonSocket based on the space available and return the written byte size.
 fn anonsocket_write<'tcx>(
-    weak_self_ref: WeakFileDescriptionRef,
+    self_ref: &FileDescriptionRef,
     ptr: Pointer,
     len: usize,
-    dest: MPlaceTy<'tcx>,
+    dest: &MPlaceTy<'tcx>,
     ecx: &mut MiriInterpCx<'tcx>,
 ) -> InterpResult<'tcx> {
-    let Some(self_ref) = weak_self_ref.upgrade() else {
-        // FIXME:  We should raise a deadlock error if the self_ref upgrade failed.
-        throw_unsup_format!("This will be a deadlock error in future")
-    };
     let self_anonsocket = self_ref.downcast::<AnonSocket>().unwrap();
+
+    // Always succeed on write size 0.
+    // ("If count is zero and fd refers to a file other than a regular file, the results are not specified.")
+    if len == 0 {
+        return ecx.return_write_success(0, dest);
+    }
+
+    // We are writing to our peer's readbuf.
     let Some(peer_fd) = self_anonsocket.peer_fd().upgrade() else {
         // If the upgrade from Weak to Rc fails, it indicates that all read ends have been
-        // closed.
-        return ecx.set_last_error_and_return(ErrorKind::BrokenPipe, &dest);
+        // closed. It is an error to write even if there would be space.
+        return ecx.set_last_error_and_return(ErrorKind::BrokenPipe, dest);
     };
+
     let Some(writebuf) = &peer_fd.downcast::<AnonSocket>().unwrap().readbuf else {
-        // FIXME: This should return EBADF, but there's no nice way to do that as there's no
-        // corresponding ErrorKind variant.
-        throw_unsup_format!("writing to the reading end of a pipe")
+        // Writing to the read end of a pipe.
+        return ecx.set_last_error_and_return(IoError::LibcError("EBADF"), dest);
     };
 
+    // Let's see if we can write.
     let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());
-
     if available_space == 0 {
-        // Blocking socketpair with a full buffer.
-        let dest = dest.clone();
-        self_anonsocket.blocked_write_tid.borrow_mut().push(ecx.active_thread());
-        ecx.block_thread(
-            BlockReason::UnnamedSocket,
-            None,
-            callback!(
-                @capture<'tcx> {
-                    weak_self_ref: WeakFileDescriptionRef,
-                    ptr: Pointer,
-                    len: usize,
-                    dest: MPlaceTy<'tcx>,
-                }
-                @unblock = |this| {
-                    anonsocket_write(weak_self_ref, ptr, len, dest, this)
-                }
-            ),
-        );
+        if self_anonsocket.is_nonblock {
+            // Non-blocking socketpair with a full buffer.
+            return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
+        } else {
+            // Blocking socketpair with a full buffer.
+            // Block the current thread; only keep a weak ref for this.
+            let weak_self_ref = self_ref.downgrade();
+            let dest = dest.clone();
+            self_anonsocket.blocked_write_tid.borrow_mut().push(ecx.active_thread());
+            ecx.block_thread(
+                BlockReason::UnnamedSocket,
+                None,
+                callback!(
+                    @capture<'tcx> {
+                        weak_self_ref: WeakFileDescriptionRef,
+                        ptr: Pointer,
+                        len: usize,
+                        dest: MPlaceTy<'tcx>,
+                    }
+                    @unblock = |this| {
+                        // If we got unblocked, then our peer successfully upgraded its weak
+                        // ref to us. That means we can also upgrade our weak ref.
+                        let self_ref = weak_self_ref.upgrade().unwrap();
+                        anonsocket_write(&self_ref, ptr, len, &dest, this)
+                    }
+                ),
+            );
+        }
     } else {
+        // There is space to write!
         let mut writebuf = writebuf.borrow_mut();
         // Remember this clock so `read` can synchronize with us.
         ecx.release_clock(|clock| {
@@ -229,25 +201,26 @@ fn anonsocket_write<'tcx>(
             ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?;
         }
 
-        return ecx.return_write_success(actual_write_size, &dest);
+        return ecx.return_write_success(actual_write_size, dest);
     }
     interp_ok(())
 }
 
 /// Read from AnonSocket and return the number of bytes read.
 fn anonsocket_read<'tcx>(
-    weak_self_ref: WeakFileDescriptionRef,
+    self_ref: &FileDescriptionRef,
     len: usize,
     ptr: Pointer,
-    dest: MPlaceTy<'tcx>,
+    dest: &MPlaceTy<'tcx>,
     ecx: &mut MiriInterpCx<'tcx>,
 ) -> InterpResult<'tcx> {
-    let Some(self_ref) = weak_self_ref.upgrade() else {
-        // FIXME:  We should raise a deadlock error if the self_ref upgrade failed.
-        throw_unsup_format!("This will be a deadlock error in future")
-    };
     let self_anonsocket = self_ref.downcast::<AnonSocket>().unwrap();
 
+    // Always succeed on read size 0.
+    if len == 0 {
+        return ecx.return_read_success(ptr, &[], 0, dest);
+    }
+
     let Some(readbuf) = &self_anonsocket.readbuf else {
         // FIXME: This should return EBADF, but there's no nice way to do that as there's no
         // corresponding ErrorKind variant.
@@ -258,10 +231,19 @@ fn anonsocket_read<'tcx>(
         if self_anonsocket.peer_fd().upgrade().is_none() {
             // Socketpair with no peer and empty buffer.
             // 0 bytes successfully read indicates end-of-file.
-            return ecx.return_read_success(ptr, &[], 0, &dest);
+            return ecx.return_read_success(ptr, &[], 0, dest);
+        } else if self_anonsocket.is_nonblock {
+            // Non-blocking socketpair with writer and empty buffer.
+            // https://linux.die.net/man/2/read
+            // EAGAIN or EWOULDBLOCK can be returned for socket,
+            // POSIX.1-2001 allows either error to be returned for this case.
+            // Since there is no ErrorKind for EAGAIN, WouldBlock is used.
+            return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
         } else {
             // Blocking socketpair with writer and empty buffer.
-            let weak_self_ref = weak_self_ref.clone();
+            // Block the current thread; only keep a weak ref for this.
+            let weak_self_ref = self_ref.downgrade();
+            let dest = dest.clone();
             self_anonsocket.blocked_read_tid.borrow_mut().push(ecx.active_thread());
             ecx.block_thread(
                 BlockReason::UnnamedSocket,
@@ -274,12 +256,16 @@ fn anonsocket_read<'tcx>(
                         dest: MPlaceTy<'tcx>,
                     }
                     @unblock = |this| {
-                        anonsocket_read(weak_self_ref, len, ptr, dest, this)
+                        // If we got unblocked, then our peer successfully upgraded its weak
+                        // ref to us. That means we can also upgrade our weak ref.
+                        let self_ref = weak_self_ref.upgrade().unwrap();
+                        anonsocket_read(&self_ref, len, ptr, &dest, this)
                     }
                 ),
             );
         }
     } else {
+        // There's data to be read!
         let mut bytes = vec![0; len];
         let mut readbuf = readbuf.borrow_mut();
         // Synchronize with all previous writes to this buffer.
@@ -313,7 +299,7 @@ fn anonsocket_read<'tcx>(
             }
         };
 
-        return ecx.return_read_success(ptr, &bytes, actual_read_size, &dest);
+        return ecx.return_read_success(ptr, &bytes, actual_read_size, dest);
     }
     interp_ok(())
 }
diff --git a/src/tools/miri/tests/fail-dep/libc/socketpair-close-while-blocked.rs b/src/tools/miri/tests/fail-dep/libc/socketpair-close-while-blocked.rs
new file mode 100644
index 00000000000..8413e118819
--- /dev/null
+++ b/src/tools/miri/tests/fail-dep/libc/socketpair-close-while-blocked.rs
@@ -0,0 +1,37 @@
+//! This is a regression test for <https://github.com/rust-lang/miri/issues/3947>: we had some
+//! faulty logic around `release_clock` that led to this code not reporting a data race.
+//~^^ERROR: deadlock
+//@ignore-target: windows # no libc socketpair on Windows
+//@compile-flags: -Zmiri-preemption-rate=0 -Zmiri-address-reuse-rate=0
+//@error-in-other-file: deadlock
+use std::thread;
+
+fn main() {
+    let mut fds = [-1, -1];
+    let res = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) };
+    assert_eq!(res, 0);
+
+    let thread1 = thread::spawn(move || {
+        let mut buf: [u8; 1] = [0; 1];
+        let _res: i32 = unsafe {
+            libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) //~ERROR: deadlock
+                .try_into()
+                .unwrap()
+        };
+    });
+    let thread2 = thread::spawn(move || {
+        // Close the FD that the other thread is blocked on.
+        unsafe { libc::close(fds[1]) };
+    });
+
+    // Run the other threads.
+    thread::yield_now();
+
+    // When they are both done, continue here.
+    let data = "a".as_bytes().as_ptr();
+    let res = unsafe { libc::write(fds[0], data as *const libc::c_void, 1) };
+    assert_eq!(res, -1);
+
+    thread1.join().unwrap();
+    thread2.join().unwrap();
+}
diff --git a/src/tools/miri/tests/fail-dep/libc/socketpair-close-while-blocked.stderr b/src/tools/miri/tests/fail-dep/libc/socketpair-close-while-blocked.stderr
new file mode 100644
index 00000000000..fe196f5d7d7
--- /dev/null
+++ b/src/tools/miri/tests/fail-dep/libc/socketpair-close-while-blocked.stderr
@@ -0,0 +1,35 @@
+error: deadlock: the evaluated program deadlocked
+  --> RUSTLIB/std/src/sys/pal/PLATFORM/thread.rs:LL:CC
+   |
+LL |         let ret = unsafe { libc::pthread_join(id, ptr::null_mut()) };
+   |                                                                  ^ the evaluated program deadlocked
+   |
+   = note: BACKTRACE:
+   = note: inside `std::sys::pal::PLATFORM::thread::Thread::join` at RUSTLIB/std/src/sys/pal/PLATFORM/thread.rs:LL:CC
+   = note: inside `std::thread::JoinInner::<'_, ()>::join` at RUSTLIB/std/src/thread/mod.rs:LL:CC
+   = note: inside `std::thread::JoinHandle::<()>::join` at RUSTLIB/std/src/thread/mod.rs:LL:CC
+note: inside `main`
+  --> tests/fail-dep/libc/socketpair-close-while-blocked.rs:LL:CC
+   |
+LL |     thread1.join().unwrap();
+   |     ^^^^^^^^^^^^^^
+
+error: deadlock: the evaluated program deadlocked
+  --> tests/fail-dep/libc/socketpair-close-while-blocked.rs:LL:CC
+   |
+LL |             libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t)
+   |                                                                                  ^ the evaluated program deadlocked
+   |
+   = note: BACKTRACE on thread `unnamed-ID`:
+   = note: inside closure at tests/fail-dep/libc/socketpair-close-while-blocked.rs:LL:CC
+
+error: deadlock: the evaluated program deadlocked
+   |
+   = note: the evaluated program deadlocked
+   = note: (no span available)
+   = note: BACKTRACE on thread `unnamed-ID`:
+
+note: some details are omitted, run with `MIRIFLAGS=-Zmiri-backtrace=full` for a verbose backtrace
+
+error: aborting due to 3 previous errors
+