about summary refs log tree commit diff
diff options
context:
space:
mode:
authorRalf Jung <post@ralfj.de>2024-10-12 16:08:06 +0200
committerRalf Jung <post@ralfj.de>2024-11-10 11:01:57 +0100
commitd1a481216487378361a2da130bb4de3c00e2aaad (patch)
treea0a42281656946f37b464925b873bb8180b94bf4
parenta839fbf0a11bcfc2d594daeab3262f540d4b6178 (diff)
downloadrust-d1a481216487378361a2da130bb4de3c00e2aaad.tar.gz
rust-d1a481216487378361a2da130bb4de3c00e2aaad.zip
store futexes in per-allocation data rather than globally
-rw-r--r--src/tools/miri/src/concurrency/sync.rs65
-rw-r--r--src/tools/miri/src/concurrency/thread.rs2
-rw-r--r--src/tools/miri/src/shims/unix/linux/sync.rs62
-rw-r--r--src/tools/miri/src/shims/windows/sync.rs41
-rw-r--r--src/tools/miri/tests/pass-dep/concurrency/linux-futex.rs7
5 files changed, 122 insertions, 55 deletions
diff --git a/src/tools/miri/src/concurrency/sync.rs b/src/tools/miri/src/concurrency/sync.rs
index 78e5ad5deb2..02e8261a6ed 100644
--- a/src/tools/miri/src/concurrency/sync.rs
+++ b/src/tools/miri/src/concurrency/sync.rs
@@ -1,6 +1,8 @@
+use std::cell::RefCell;
 use std::collections::VecDeque;
 use std::collections::hash_map::Entry;
 use std::ops::Not;
+use std::rc::Rc;
 use std::time::Duration;
 
 use rustc_abi::Size;
@@ -121,6 +123,15 @@ struct Futex {
     clock: VClock,
 }
 
+#[derive(Default, Clone)]
+pub struct FutexRef(Rc<RefCell<Futex>>);
+
+impl VisitProvenance for FutexRef {
+    fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {
+        // No provenance in `Futex`.
+    }
+}
+
 /// A thread waiting on a futex.
 #[derive(Debug)]
 struct FutexWaiter {
@@ -137,9 +148,6 @@ pub struct SynchronizationObjects {
     rwlocks: IndexVec<RwLockId, RwLock>,
     condvars: IndexVec<CondvarId, Condvar>,
     pub(super) init_onces: IndexVec<InitOnceId, InitOnce>,
-
-    /// Futex info for the futex at the given address.
-    futexes: FxHashMap<u64, Futex>,
 }
 
 // Private extension trait for local helper methods
@@ -184,7 +192,7 @@ impl SynchronizationObjects {
 }
 
 impl<'tcx> AllocExtra<'tcx> {
-    pub fn get_sync<T: 'static>(&self, offset: Size) -> Option<&T> {
+    fn get_sync<T: 'static>(&self, offset: Size) -> Option<&T> {
         self.sync.get(&offset).and_then(|s| s.downcast_ref::<T>())
     }
 }
@@ -273,27 +281,32 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
 
     /// Get the synchronization primitive associated with the given pointer,
     /// or initialize a new one.
+    ///
+    /// Return `None` if this pointer does not point to at least 1 byte of mutable memory.
     fn get_sync_or_init<'a, T: 'static>(
         &'a mut self,
         ptr: Pointer,
-        new: impl FnOnce(&'a mut MiriMachine<'tcx>) -> InterpResult<'tcx, T>,
-    ) -> InterpResult<'tcx, &'a T>
+        new: impl FnOnce(&'a mut MiriMachine<'tcx>) -> T,
+    ) -> Option<&'a T>
     where
         'tcx: 'a,
     {
         let this = self.eval_context_mut();
-        // Ensure there is memory behind this pointer, so that this allocation
-        // is truly the only place where the data could be stored.
-        this.check_ptr_access(ptr, Size::from_bytes(1), CheckInAllocMsg::InboundsTest)?;
-
-        let (alloc, offset, _) = this.ptr_get_alloc_id(ptr, 0)?;
-        let (alloc_extra, machine) = this.get_alloc_extra_mut(alloc)?;
+        if !this.ptr_try_get_alloc_id(ptr, 0).ok().is_some_and(|(alloc_id, offset, ..)| {
+            let info = this.get_alloc_info(alloc_id);
+            info.kind == AllocKind::LiveData && info.mutbl.is_mut() && offset < info.size
+        }) {
+            return None;
+        }
+        // This cannot fail now.
+        let (alloc, offset, _) = this.ptr_get_alloc_id(ptr, 0).unwrap();
+        let (alloc_extra, machine) = this.get_alloc_extra_mut(alloc).unwrap();
         // Due to borrow checker reasons, we have to do the lookup twice.
         if alloc_extra.get_sync::<T>(offset).is_none() {
-            let new = new(machine)?;
+            let new = new(machine);
             alloc_extra.sync.insert(offset, Box::new(new));
         }
-        interp_ok(alloc_extra.get_sync::<T>(offset).unwrap())
+        Some(alloc_extra.get_sync::<T>(offset).unwrap())
     }
 
     #[inline]
@@ -690,7 +703,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
     /// On a timeout, `retval_timeout` is written to `dest` and `errno_timeout` is set as the last error.
     fn futex_wait(
         &mut self,
-        addr: u64,
+        futex_ref: FutexRef,
         bitset: u32,
         timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
         retval_succ: Scalar,
@@ -700,23 +713,25 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
     ) {
         let this = self.eval_context_mut();
         let thread = this.active_thread();
-        let futex = &mut this.machine.sync.futexes.entry(addr).or_default();
+        let mut futex = futex_ref.0.borrow_mut();
         let waiters = &mut futex.waiters;
         assert!(waiters.iter().all(|waiter| waiter.thread != thread), "thread is already waiting");
         waiters.push_back(FutexWaiter { thread, bitset });
+        drop(futex);
+
         this.block_thread(
-            BlockReason::Futex { addr },
+            BlockReason::Futex,
             timeout,
             callback!(
                 @capture<'tcx> {
-                    addr: u64,
+                    futex_ref: FutexRef,
                     retval_succ: Scalar,
                     retval_timeout: Scalar,
                     dest: MPlaceTy<'tcx>,
                     errno_timeout: IoError,
                 }
                 @unblock = |this| {
-                    let futex = this.machine.sync.futexes.get(&addr).unwrap();
+                    let futex = futex_ref.0.borrow();
                     // Acquire the clock of the futex.
                     if let Some(data_race) = &this.machine.data_race {
                         data_race.acquire_clock(&futex.clock, &this.machine.threads);
@@ -728,7 +743,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
                 @timeout = |this| {
                     // Remove the waiter from the futex.
                     let thread = this.active_thread();
-                    let futex = this.machine.sync.futexes.get_mut(&addr).unwrap();
+                    let mut futex = futex_ref.0.borrow_mut();
                     futex.waiters.retain(|waiter| waiter.thread != thread);
                     // Set errno and write return value.
                     this.set_last_error(errno_timeout)?;
@@ -739,12 +754,11 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
         );
     }
 
+    /// Wake up the first thread in the queue that matches any of the bits in the bitset.
     /// Returns whether anything was woken.
-    fn futex_wake(&mut self, addr: u64, bitset: u32) -> InterpResult<'tcx, bool> {
+    fn futex_wake(&mut self, futex_ref: &FutexRef, bitset: u32) -> InterpResult<'tcx, bool> {
         let this = self.eval_context_mut();
-        let Some(futex) = this.machine.sync.futexes.get_mut(&addr) else {
-            return interp_ok(false);
-        };
+        let mut futex = futex_ref.0.borrow_mut();
         let data_race = &this.machine.data_race;
 
         // Each futex-wake happens-before the end of the futex wait
@@ -757,7 +771,8 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
             return interp_ok(false);
         };
         let waiter = futex.waiters.remove(i).unwrap();
-        this.unblock_thread(waiter.thread, BlockReason::Futex { addr })?;
+        drop(futex);
+        this.unblock_thread(waiter.thread, BlockReason::Futex)?;
         interp_ok(true)
     }
 }
diff --git a/src/tools/miri/src/concurrency/thread.rs b/src/tools/miri/src/concurrency/thread.rs
index 7477494281d..e6a3ae897c2 100644
--- a/src/tools/miri/src/concurrency/thread.rs
+++ b/src/tools/miri/src/concurrency/thread.rs
@@ -147,7 +147,7 @@ pub enum BlockReason {
     /// Blocked on a reader-writer lock.
     RwLock(RwLockId),
     /// Blocked on a Futex variable.
-    Futex { addr: u64 },
+    Futex,
     /// Blocked on an InitOnce.
     InitOnce(InitOnceId),
     /// Blocked on epoll.
diff --git a/src/tools/miri/src/shims/unix/linux/sync.rs b/src/tools/miri/src/shims/unix/linux/sync.rs
index 6d5747d7c15..01b011d3504 100644
--- a/src/tools/miri/src/shims/unix/linux/sync.rs
+++ b/src/tools/miri/src/shims/unix/linux/sync.rs
@@ -1,6 +1,11 @@
+use crate::concurrency::sync::FutexRef;
 use crate::helpers::check_min_arg_count;
 use crate::*;
 
+struct LinuxFutex {
+    futex: FutexRef,
+}
+
 /// Implementation of the SYS_futex syscall.
 /// `args` is the arguments *including* the syscall number.
 pub fn futex<'tcx>(
@@ -27,7 +32,6 @@ pub fn futex<'tcx>(
 
     // This is a vararg function so we have to bring our own type for this pointer.
     let addr = this.ptr_to_mplace(addr, this.machine.layouts.i32);
-    let addr_usize = addr.ptr().addr().bytes();
 
     let futex_private = this.eval_libc_i32("FUTEX_PRIVATE_FLAG");
     let futex_wait = this.eval_libc_i32("FUTEX_WAIT");
@@ -63,8 +67,7 @@ pub fn futex<'tcx>(
             };
 
             if bitset == 0 {
-                this.set_last_error_and_return(LibcError("EINVAL"), dest)?;
-                return interp_ok(());
+                return this.set_last_error_and_return(LibcError("EINVAL"), dest);
             }
 
             let timeout = this.deref_pointer_as(timeout, this.libc_ty_layout("timespec"))?;
@@ -99,19 +102,18 @@ pub fn futex<'tcx>(
             // effects of this and the other thread are correctly observed,
             // otherwise we will deadlock.
             //
-            // There are two scenarios to consider:
-            // 1. If we (FUTEX_WAIT) execute first, we'll push ourselves into
-            //    the waiters queue and go to sleep. They (addr write & FUTEX_WAKE)
-            //    will see us in the queue and wake us up.
-            // 2. If they (addr write & FUTEX_WAKE) execute first, we must observe
-            //    addr's new value. If we see an outdated value that happens to equal
-            //    the expected val, then we'll put ourselves to sleep with no one to wake us
-            //    up, so we end up with a deadlock. This is prevented by having a SeqCst
-            //    fence inside FUTEX_WAKE syscall, and another SeqCst fence
-            //    below, the atomic read on addr after the SeqCst fence is guaranteed
-            //    not to see any value older than the addr write immediately before
-            //    calling FUTEX_WAKE. We'll see futex_val != val and return without
-            //    sleeping.
+            // There are two scenarios to consider, depending on whether WAIT or WAKE goes first:
+            // 1. If we (FUTEX_WAIT) execute first, we'll push ourselves into the waiters queue and
+            //    go to sleep. They (FUTEX_WAKE) will see us in the queue and wake us up. It doesn't
+            //    matter how the addr write is ordered.
+            // 2. If they (FUTEX_WAKE) execute first, that means the addr write is also before us
+            //    (FUTEX_WAIT). It is crucial that we observe addr's new value. If we see an
+            //    outdated value that happens to equal the expected val, then we'll put ourselves to
+            //    sleep with no one to wake us up, so we end up with a deadlock. This is prevented
+            //    by having a SeqCst fence inside FUTEX_WAKE syscall, and another SeqCst fence here
+            //    in FUTEX_WAIT. The atomic read on addr after the SeqCst fence is guaranteed not to
+            //    see any value older than the addr write immediately before calling FUTEX_WAKE.
+            //    We'll see futex_val != val and return without sleeping.
             //
             //    Note that the fences do not create any happens-before relationship.
             //    The read sees the write immediately before the fence not because
@@ -140,11 +142,22 @@ pub fn futex<'tcx>(
             this.atomic_fence(AtomicFenceOrd::SeqCst)?;
             // Read an `i32` through the pointer, regardless of any wrapper types.
             // It's not uncommon for `addr` to be passed as another type than `*mut i32`, such as `*const AtomicI32`.
-            let futex_val = this.read_scalar_atomic(&addr, AtomicReadOrd::Relaxed)?.to_i32()?;
+            // We do an acquire read -- it only seems reasonable that if we observe a value here, we
+            // actually establish an ordering with that value.
+            let futex_val = this.read_scalar_atomic(&addr, AtomicReadOrd::Acquire)?.to_i32()?;
             if val == futex_val {
                 // The value still matches, so we block the thread and make it wait for FUTEX_WAKE.
+
+                // This cannot fail since we already did an atomic acquire read on that pointer.
+                // Acquire reads are only allowed on mutable memory.
+                let futex_ref = this
+                    .get_sync_or_init(addr.ptr(), |_| LinuxFutex { futex: Default::default() })
+                    .unwrap()
+                    .futex
+                    .clone();
+
                 this.futex_wait(
-                    addr_usize,
+                    futex_ref,
                     bitset,
                     timeout,
                     Scalar::from_target_isize(0, this), // retval_succ
@@ -165,6 +178,17 @@ pub fn futex<'tcx>(
         // FUTEX_WAKE_BITSET: (int *addr, int op = FUTEX_WAKE, int val, const timespect *_unused, int *_unused, unsigned int bitset)
         // Same as FUTEX_WAKE, but allows you to specify a bitset to select which threads to wake up.
         op if op == futex_wake || op == futex_wake_bitset => {
+            let Some(futex_ref) =
+                this.get_sync_or_init(addr.ptr(), |_| LinuxFutex { futex: Default::default() })
+            else {
+                // No AllocId, or no live allocation at that AllocId.
+                // Return an error code. (That seems nicer than silently doing something non-intuitive.)
+                // This means that if an address gets reused by a new allocation,
+                // we'll use an independent futex queue for this... that seems acceptable.
+                return this.set_last_error_and_return(LibcError("EFAULT"), dest);
+            };
+            let futex_ref = futex_ref.futex.clone();
+
             let bitset = if op == futex_wake_bitset {
                 let [_, _, _, _, timeout, uaddr2, bitset] =
                     check_min_arg_count("`syscall(SYS_futex, FUTEX_WAKE_BITSET, ...)`", args)?;
@@ -184,7 +208,7 @@ pub fn futex<'tcx>(
             let mut n = 0;
             #[expect(clippy::arithmetic_side_effects)]
             for _ in 0..val {
-                if this.futex_wake(addr_usize, bitset)? {
+                if this.futex_wake(&futex_ref, bitset)? {
                     n += 1;
                 } else {
                     break;
diff --git a/src/tools/miri/src/shims/windows/sync.rs b/src/tools/miri/src/shims/windows/sync.rs
index 7263958411f..b03dedea146 100644
--- a/src/tools/miri/src/shims/windows/sync.rs
+++ b/src/tools/miri/src/shims/windows/sync.rs
@@ -3,6 +3,7 @@ use std::time::Duration;
 use rustc_abi::Size;
 
 use crate::concurrency::init_once::InitOnceStatus;
+use crate::concurrency::sync::FutexRef;
 use crate::*;
 
 #[derive(Copy, Clone)]
@@ -10,6 +11,10 @@ struct WindowsInitOnce {
     id: InitOnceId,
 }
 
+struct WindowsFutex {
+    futex: FutexRef,
+}
+
 impl<'tcx> EvalContextExtPriv<'tcx> for crate::MiriInterpCx<'tcx> {}
 trait EvalContextExtPriv<'tcx>: crate::MiriInterpCxExt<'tcx> {
     // Windows sync primitives are pointer sized.
@@ -168,8 +173,6 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
         let size = this.read_target_usize(size_op)?;
         let timeout_ms = this.read_scalar(timeout_op)?.to_u32()?;
 
-        let addr = ptr.addr().bytes();
-
         if size > 8 || !size.is_power_of_two() {
             let invalid_param = this.eval_windows("c", "ERROR_INVALID_PARAMETER");
             this.set_last_error(invalid_param)?;
@@ -190,13 +193,21 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
 
         let layout = this.machine.layouts.uint(size).unwrap();
         let futex_val =
-            this.read_scalar_atomic(&this.ptr_to_mplace(ptr, layout), AtomicReadOrd::Relaxed)?;
+            this.read_scalar_atomic(&this.ptr_to_mplace(ptr, layout), AtomicReadOrd::Acquire)?;
         let compare_val = this.read_scalar(&this.ptr_to_mplace(compare, layout))?;
 
         if futex_val == compare_val {
             // If the values are the same, we have to block.
+
+            // This cannot fail since we already did an atomic acquire read on that pointer.
+            let futex_ref = this
+                .get_sync_or_init(ptr, |_| WindowsFutex { futex: Default::default() })
+                .unwrap()
+                .futex
+                .clone();
+
             this.futex_wait(
-                addr,
+                futex_ref,
                 u32::MAX, // bitset
                 timeout,
                 Scalar::from_i32(1), // retval_succ
@@ -219,8 +230,15 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
         // See the Linux futex implementation for why this fence exists.
         this.atomic_fence(AtomicFenceOrd::SeqCst)?;
 
-        let addr = ptr.addr().bytes();
-        this.futex_wake(addr, u32::MAX)?;
+        let Some(futex_ref) =
+            this.get_sync_or_init(ptr, |_| WindowsFutex { futex: Default::default() })
+        else {
+            // Seems like this cannot return an error, so we just wake nobody.
+            return interp_ok(());
+        };
+        let futex_ref = futex_ref.futex.clone();
+
+        this.futex_wake(&futex_ref, u32::MAX)?;
 
         interp_ok(())
     }
@@ -232,8 +250,15 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
         // See the Linux futex implementation for why this fence exists.
         this.atomic_fence(AtomicFenceOrd::SeqCst)?;
 
-        let addr = ptr.addr().bytes();
-        while this.futex_wake(addr, u32::MAX)? {}
+        let Some(futex_ref) =
+            this.get_sync_or_init(ptr, |_| WindowsFutex { futex: Default::default() })
+        else {
+            // Seems like this cannot return an error, so we just wake nobody.
+            return interp_ok(());
+        };
+        let futex_ref = futex_ref.futex.clone();
+
+        while this.futex_wake(&futex_ref, u32::MAX)? {}
 
         interp_ok(())
     }
diff --git a/src/tools/miri/tests/pass-dep/concurrency/linux-futex.rs b/src/tools/miri/tests/pass-dep/concurrency/linux-futex.rs
index 2a36c10f7d4..d1fcf61c4c8 100644
--- a/src/tools/miri/tests/pass-dep/concurrency/linux-futex.rs
+++ b/src/tools/miri/tests/pass-dep/concurrency/linux-futex.rs
@@ -41,9 +41,12 @@ fn wake_dangling() {
     let ptr: *const i32 = &*futex;
     drop(futex);
 
-    // Wake 1 waiter. Expect zero waiters woken up, as nobody is waiting.
+    // Expect error since this is now "unmapped" memory.
+    // parking_lot relies on this:
+    // <https://github.com/Amanieu/parking_lot/blob/ca920b31312839013b4455aba1d53a4aede21b2f/core/src/thread_parker/linux.rs#L138-L145>
     unsafe {
-        assert_eq!(libc::syscall(libc::SYS_futex, ptr, libc::FUTEX_WAKE, 1), 0);
+        assert_eq!(libc::syscall(libc::SYS_futex, ptr, libc::FUTEX_WAKE, 1), -1);
+        assert_eq!(io::Error::last_os_error().raw_os_error().unwrap(), libc::EFAULT);
     }
 }