-rw-r--r--  src/tools/miri/src/alloc_addresses/mod.rs            |   4
-rw-r--r--  src/tools/miri/src/concurrency/data_race.rs          |  20
-rw-r--r--  src/tools/miri/src/concurrency/init_once.rs          | 110
-rw-r--r--  src/tools/miri/src/concurrency/sync.rs                | 533
-rw-r--r--  src/tools/miri/src/concurrency/thread.rs              | 515
-rw-r--r--  src/tools/miri/src/diagnostics.rs                     |   8
-rw-r--r--  src/tools/miri/src/lib.rs                             |   3
-rw-r--r--  src/tools/miri/src/machine.rs                         |   7
-rw-r--r--  src/tools/miri/src/shims/time.rs                      |  42
-rw-r--r--  src/tools/miri/src/shims/tls.rs                       |   6
-rw-r--r--  src/tools/miri/src/shims/unix/foreign_items.rs        |  16
-rw-r--r--  src/tools/miri/src/shims/unix/linux/sync.rs           |  72
-rw-r--r--  src/tools/miri/src/shims/unix/macos/foreign_items.rs  |   2
-rw-r--r--  src/tools/miri/src/shims/unix/sync.rs                 | 213
-rw-r--r--  src/tools/miri/src/shims/unix/thread.rs               |   2
-rw-r--r--  src/tools/miri/src/shims/windows/foreign_items.rs     |  11
-rw-r--r--  src/tools/miri/src/shims/windows/sync.rs              | 161
-rw-r--r--  src/tools/miri/src/shims/windows/thread.rs            |   2
18 files changed, 832 insertions(+), 895 deletions(-)
diff --git a/src/tools/miri/src/alloc_addresses/mod.rs b/src/tools/miri/src/alloc_addresses/mod.rs
index 58241538795..9ec9ae317f4 100644
--- a/src/tools/miri/src/alloc_addresses/mod.rs
+++ b/src/tools/miri/src/alloc_addresses/mod.rs
@@ -169,7 +169,7 @@ trait EvalContextExtPriv<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
                     size,
                     align,
                     memory_kind,
-                    ecx.get_active_thread(),
+                    ecx.active_thread(),
                 ) {
                     if let Some(clock) = clock {
                         ecx.acquire_clock(&clock);
@@ -367,7 +367,7 @@ impl<'mir, 'tcx> MiriMachine<'mir, 'tcx> {
         // `alloc_id_from_addr` any more.
         global_state.exposed.remove(&dead_id);
         // Also remember this address for future reuse.
-        let thread = self.threads.get_active_thread_id();
+        let thread = self.threads.active_thread();
         global_state.reuse.add_addr(rng, addr, size, align, kind, thread, || {
             if let Some(data_race) = &self.data_race {
                 data_race.release_clock(&self.threads).clone()
diff --git a/src/tools/miri/src/concurrency/data_race.rs b/src/tools/miri/src/concurrency/data_race.rs
index da6fa4f3405..f22dbf49e8b 100644
--- a/src/tools/miri/src/concurrency/data_race.rs
+++ b/src/tools/miri/src/concurrency/data_race.rs
@@ -839,7 +839,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: MiriInterpCxExt<'mir, 'tcx> {
     fn acquire_clock(&self, clock: &VClock) {
         let this = self.eval_context_ref();
         if let Some(data_race) = &this.machine.data_race {
-            data_race.acquire_clock(clock, this.get_active_thread());
+            data_race.acquire_clock(clock, &this.machine.threads);
         }
     }
 }
@@ -1662,13 +1662,14 @@ impl GlobalState {
     /// This should be called strictly before any calls to
     /// `thread_joined`.
     #[inline]
-    pub fn thread_terminated(&mut self, thread_mgr: &ThreadManager<'_, '_>, current_span: Span) {
+    pub fn thread_terminated(&mut self, thread_mgr: &ThreadManager<'_, '_>) {
         let current_index = self.active_thread_index(thread_mgr);
 
         // Increment the clock to a unique termination timestamp.
         let vector_clocks = self.vector_clocks.get_mut();
         let current_clocks = &mut vector_clocks[current_index];
-        current_clocks.increment_clock(current_index, current_span);
+        current_clocks
+            .increment_clock(current_index, thread_mgr.active_thread_ref().current_span());
 
         // Load the current thread id for the executing vector.
         let vector_info = self.vector_info.get_mut();
@@ -1722,11 +1723,12 @@ impl GlobalState {
         format!("thread `{thread_name}`")
     }
 
-    /// Acquire the given clock into the given thread, establishing synchronization with
+    /// Acquire the given clock into the current thread, establishing synchronization with
     /// the moment when that clock snapshot was taken via `release_clock`.
     /// As this is an acquire operation, the thread timestamp is not
     /// incremented.
-    pub fn acquire_clock(&self, clock: &VClock, thread: ThreadId) {
+    pub fn acquire_clock<'mir, 'tcx>(&self, clock: &VClock, threads: &ThreadManager<'mir, 'tcx>) {
+        let thread = threads.active_thread();
         let (_, mut clocks) = self.thread_state_mut(thread);
         clocks.clock.join(clock);
     }
@@ -1738,7 +1740,7 @@ impl GlobalState {
         &self,
         threads: &ThreadManager<'mir, 'tcx>,
     ) -> Ref<'_, VClock> {
-        let thread = threads.get_active_thread_id();
+        let thread = threads.active_thread();
         let span = threads.active_thread_ref().current_span();
         // We increment the clock each time this happens, to ensure no two releases
         // can be confused with each other.
@@ -1782,7 +1784,7 @@ impl GlobalState {
         &self,
         thread_mgr: &ThreadManager<'_, '_>,
     ) -> (VectorIdx, Ref<'_, ThreadClockSet>) {
-        self.thread_state(thread_mgr.get_active_thread_id())
+        self.thread_state(thread_mgr.active_thread())
     }
 
     /// Load the current vector clock in use and the current set of thread clocks
@@ -1792,14 +1794,14 @@ impl GlobalState {
         &self,
         thread_mgr: &ThreadManager<'_, '_>,
     ) -> (VectorIdx, RefMut<'_, ThreadClockSet>) {
-        self.thread_state_mut(thread_mgr.get_active_thread_id())
+        self.thread_state_mut(thread_mgr.active_thread())
     }
 
     /// Return the current thread, should be the same
     /// as the data-race active thread.
     #[inline]
     fn active_thread_index(&self, thread_mgr: &ThreadManager<'_, '_>) -> VectorIdx {
-        let active_thread_id = thread_mgr.get_active_thread_id();
+        let active_thread_id = thread_mgr.active_thread();
         self.thread_index(active_thread_id)
     }
 
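[Editorial note: the `acquire_clock`/`release_clock` doc comments in the hunks above describe the vector-clock handshake used by the data-race detector: a release bumps the releasing thread's own timestamp and snapshots its clock, and a later acquire joins that snapshot into the acquiring thread's clock without bumping it. Below is a minimal, self-contained sketch of that operation; `VClock` here is a toy stand-in, not Miri's real type.]

```rust
/// Toy stand-in for Miri's vector clock; just enough to show the
/// acquire/release discipline described in the comments above.
#[derive(Clone, Debug)]
struct VClock(Vec<u64>);

impl VClock {
    /// Acquire: component-wise maximum. The acquiring thread learns about
    /// everything the releasing thread had observed, without bumping its
    /// own timestamp.
    fn join(&mut self, other: &VClock) {
        if self.0.len() < other.0.len() {
            self.0.resize(other.0.len(), 0);
        }
        for (mine, theirs) in self.0.iter_mut().zip(&other.0) {
            *mine = (*mine).max(*theirs);
        }
    }

    /// Release: bump the releasing thread's own component so that no two
    /// releases can be confused with each other.
    fn increment(&mut self, thread: usize) {
        if self.0.len() <= thread {
            self.0.resize(thread + 1, 0);
        }
        self.0[thread] += 1;
    }
}

fn main() {
    let mut releaser = VClock(vec![3, 0]); // thread 0, e.g. unlocking a mutex
    let mut acquirer = VClock(vec![0, 5]); // thread 1, locking it later

    releaser.increment(0);
    let snapshot = releaser.clone(); // stored in the mutex as its release clock

    acquirer.join(&snapshot); // the later lock acquires that clock
    assert_eq!(acquirer.0, vec![4, 5]);
    println!("acquirer after join: {:?}", acquirer);
}
```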
diff --git a/src/tools/miri/src/concurrency/init_once.rs b/src/tools/miri/src/concurrency/init_once.rs
index 6469c90c693..1ee84273b58 100644
--- a/src/tools/miri/src/concurrency/init_once.rs
+++ b/src/tools/miri/src/concurrency/init_once.rs
@@ -4,29 +4,11 @@ use rustc_index::Idx;
 use rustc_middle::ty::layout::TyAndLayout;
 
 use super::sync::EvalContextExtPriv as _;
-use super::thread::MachineCallback;
 use super::vector_clock::VClock;
 use crate::*;
 
 declare_id!(InitOnceId);
 
-/// A thread waiting on an InitOnce object.
-struct InitOnceWaiter<'mir, 'tcx> {
-    /// The thread that is waiting.
-    thread: ThreadId,
-    /// The callback that should be executed, after the thread has been woken up.
-    callback: Box<dyn MachineCallback<'mir, 'tcx> + 'tcx>,
-}
-
-impl<'mir, 'tcx> std::fmt::Debug for InitOnceWaiter<'mir, 'tcx> {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("InitOnce")
-            .field("thread", &self.thread)
-            .field("callback", &"dyn MachineCallback")
-            .finish()
-    }
-}
-
 #[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
 /// The current status of a one time initialization.
 pub enum InitOnceStatus {
@@ -38,68 +20,14 @@ pub enum InitOnceStatus {
 
 /// The one time initialization state.
 #[derive(Default, Debug)]
-pub(super) struct InitOnce<'mir, 'tcx> {
+pub(super) struct InitOnce {
     status: InitOnceStatus,
-    waiters: VecDeque<InitOnceWaiter<'mir, 'tcx>>,
+    waiters: VecDeque<ThreadId>,
     clock: VClock,
 }
 
-impl<'mir, 'tcx> VisitProvenance for InitOnce<'mir, 'tcx> {
-    fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
-        for waiter in self.waiters.iter() {
-            waiter.callback.visit_provenance(visit);
-        }
-    }
-}
-
 impl<'mir, 'tcx: 'mir> EvalContextExtPriv<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {}
 trait EvalContextExtPriv<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
-    /// Synchronize with the previous initialization attempt of an InitOnce.
-    #[inline]
-    fn init_once_observe_attempt(&mut self, id: InitOnceId) {
-        let this = self.eval_context_mut();
-        let current_thread = this.get_active_thread();
-
-        if let Some(data_race) = &this.machine.data_race {
-            data_race.acquire_clock(&this.machine.sync.init_onces[id].clock, current_thread);
-        }
-    }
-
-    #[inline]
-    fn init_once_wake_waiter(
-        &mut self,
-        id: InitOnceId,
-        waiter: InitOnceWaiter<'mir, 'tcx>,
-    ) -> InterpResult<'tcx> {
-        let this = self.eval_context_mut();
-        let current_thread = this.get_active_thread();
-
-        this.unblock_thread(waiter.thread, BlockReason::InitOnce(id));
-
-        // Call callback, with the woken-up thread as `current`.
-        this.set_active_thread(waiter.thread);
-        this.init_once_observe_attempt(id);
-        waiter.callback.call(this)?;
-        this.set_active_thread(current_thread);
-
-        Ok(())
-    }
-}
-
-impl<'mir, 'tcx: 'mir> EvalContextExt<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {}
-pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
-    fn init_once_get_or_create_id(
-        &mut self,
-        lock_op: &OpTy<'tcx, Provenance>,
-        lock_layout: TyAndLayout<'tcx>,
-        offset: u64,
-    ) -> InterpResult<'tcx, InitOnceId> {
-        let this = self.eval_context_mut();
-        this.init_once_get_or_create(|ecx, next_id| {
-            ecx.get_or_create_id(next_id, lock_op, lock_layout, offset)
-        })
-    }
-
     /// Provides the closure with the next InitOnceId. Creates that InitOnce if the closure returns None,
     /// otherwise returns the value from the closure.
     #[inline]
@@ -120,6 +48,21 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
             Ok(new_index)
         }
     }
+}
+
+impl<'mir, 'tcx: 'mir> EvalContextExt<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {}
+pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
+    fn init_once_get_or_create_id(
+        &mut self,
+        lock_op: &OpTy<'tcx, Provenance>,
+        lock_layout: TyAndLayout<'tcx>,
+        offset: u64,
+    ) -> InterpResult<'tcx, InitOnceId> {
+        let this = self.eval_context_mut();
+        this.init_once_get_or_create(|ecx, next_id| {
+            ecx.get_or_create_id(next_id, lock_op, lock_layout, offset)
+        })
+    }
 
     #[inline]
     fn init_once_status(&mut self, id: InitOnceId) -> InitOnceStatus {
@@ -132,14 +75,14 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
     fn init_once_enqueue_and_block(
         &mut self,
         id: InitOnceId,
-        thread: ThreadId,
-        callback: Box<dyn MachineCallback<'mir, 'tcx> + 'tcx>,
+        callback: impl UnblockCallback<'mir, 'tcx> + 'tcx,
     ) {
         let this = self.eval_context_mut();
+        let thread = this.active_thread();
         let init_once = &mut this.machine.sync.init_onces[id];
         assert_ne!(init_once.status, InitOnceStatus::Complete, "queueing on complete init once");
-        init_once.waiters.push_back(InitOnceWaiter { thread, callback });
-        this.block_thread(thread, BlockReason::InitOnce(id));
+        init_once.waiters.push_back(thread);
+        this.block_thread(BlockReason::InitOnce(id), None, callback);
     }
 
     /// Begin initializing this InitOnce. Must only be called after checking that it is currently
@@ -177,7 +120,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
         // Wake up everyone.
         // need to take the queue to avoid having `this` be borrowed multiple times
         for waiter in std::mem::take(&mut init_once.waiters) {
-            this.init_once_wake_waiter(id, waiter)?;
+            this.unblock_thread(waiter, BlockReason::InitOnce(id))?;
         }
 
         Ok(())
@@ -192,6 +135,8 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
             InitOnceStatus::Begun,
             "failing already completed or uninit init once"
         );
+        // This is again uninitialized.
+        init_once.status = InitOnceStatus::Uninitialized;
 
         // Each complete happens-before the end of the wait
         if let Some(data_race) = &this.machine.data_race {
@@ -200,10 +145,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
 
         // Wake up one waiting thread, so they can go ahead and try to init this.
         if let Some(waiter) = init_once.waiters.pop_front() {
-            this.init_once_wake_waiter(id, waiter)?;
-        } else {
-            // Nobody there to take this, so go back to 'uninit'
-            init_once.status = InitOnceStatus::Uninitialized;
+            this.unblock_thread(waiter, BlockReason::InitOnce(id))?;
         }
 
         Ok(())
@@ -221,6 +163,6 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
             "observing the completion of incomplete init once"
         );
 
-        this.init_once_observe_attempt(id);
+        this.acquire_clock(&this.machine.sync.init_onces[id].clock);
     }
 }
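[Editorial note: the InitOnce changes above replace per-waiter callbacks with plain thread ids, make a failed attempt reset the status to `Uninitialized` before waking at most one waiter, and wake all waiters on completion. A self-contained toy version of that state machine is sketched below; the type and method names only mirror the diff and are not Miri's actual API.]

```rust
use std::collections::VecDeque;

#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
enum InitOnceStatus {
    #[default]
    Uninitialized,
    Begun,
    Complete,
}

#[derive(Default, Debug)]
struct InitOnce {
    status: InitOnceStatus,
    /// Blocked threads, by id; the real code also stores a happens-before clock.
    waiters: VecDeque<u32>,
}

impl InitOnce {
    fn begin(&mut self) {
        assert_eq!(self.status, InitOnceStatus::Uninitialized);
        self.status = InitOnceStatus::Begun;
    }

    /// Completion wakes *all* waiters.
    fn complete(&mut self) -> Vec<u32> {
        assert_eq!(self.status, InitOnceStatus::Begun);
        self.status = InitOnceStatus::Complete;
        self.waiters.drain(..).collect()
    }

    /// A failed attempt first resets the status to `Uninitialized`, then wakes
    /// at most *one* waiter, which may retry the initialization itself.
    fn fail(&mut self) -> Option<u32> {
        assert_eq!(self.status, InitOnceStatus::Begun);
        self.status = InitOnceStatus::Uninitialized;
        self.waiters.pop_front()
    }
}

fn main() {
    let mut once = InitOnce::default();
    once.begin();
    once.waiters.extend([1u32, 2, 3]);
    assert_eq!(once.fail(), Some(1)); // thread 1 gets to retry
    once.begin();
    assert_eq!(once.complete(), vec![2, 3]); // everyone else is woken
}
```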
diff --git a/src/tools/miri/src/concurrency/sync.rs b/src/tools/miri/src/concurrency/sync.rs
index a498d9e0a52..df10e15fe10 100644
--- a/src/tools/miri/src/concurrency/sync.rs
+++ b/src/tools/miri/src/concurrency/sync.rs
@@ -111,19 +111,10 @@ struct RwLock {
 
 declare_id!(CondvarId);
 
-/// A thread waiting on a conditional variable.
-#[derive(Debug)]
-struct CondvarWaiter {
-    /// The thread that is waiting on this variable.
-    thread: ThreadId,
-    /// The mutex on which the thread is waiting.
-    lock: MutexId,
-}
-
 /// The conditional variable state.
 #[derive(Default, Debug)]
 struct Condvar {
-    waiters: VecDeque<CondvarWaiter>,
+    waiters: VecDeque<ThreadId>,
     /// Tracks the happens-before relationship
     /// between a cond-var signal and a cond-var
     /// wait during a non-spurious signal event.
@@ -155,20 +146,12 @@ struct FutexWaiter {
 
 /// The state of all synchronization objects.
 #[derive(Default, Debug)]
-pub struct SynchronizationObjects<'mir, 'tcx> {
+pub struct SynchronizationObjects {
     mutexes: IndexVec<MutexId, Mutex>,
     rwlocks: IndexVec<RwLockId, RwLock>,
     condvars: IndexVec<CondvarId, Condvar>,
     futexes: FxHashMap<u64, Futex>,
-    pub(super) init_onces: IndexVec<InitOnceId, InitOnce<'mir, 'tcx>>,
-}
-
-impl<'mir, 'tcx> VisitProvenance for SynchronizationObjects<'mir, 'tcx> {
-    fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
-        for init_once in self.init_onces.iter() {
-            init_once.visit_provenance(visit);
-        }
-    }
+    pub(super) init_onces: IndexVec<InitOnceId, InitOnce>,
 }
 
 // Private extension trait for local helper methods
@@ -210,45 +193,69 @@ pub(super) trait EvalContextExtPriv<'mir, 'tcx: 'mir>:
         })
     }
 
-    /// Take a reader out of the queue waiting for the lock.
-    /// Returns `true` if some thread got the rwlock.
+    /// Provides the closure with the next MutexId. Creates that mutex if the closure returns None,
+    /// otherwise returns the value from the closure.
     #[inline]
-    fn rwlock_dequeue_and_lock_reader(&mut self, id: RwLockId) -> bool {
+    fn mutex_get_or_create<F>(&mut self, existing: F) -> InterpResult<'tcx, MutexId>
+    where
+        F: FnOnce(&mut MiriInterpCx<'mir, 'tcx>, MutexId) -> InterpResult<'tcx, Option<MutexId>>,
+    {
         let this = self.eval_context_mut();
-        if let Some(reader) = this.machine.sync.rwlocks[id].reader_queue.pop_front() {
-            this.unblock_thread(reader, BlockReason::RwLock(id));
-            this.rwlock_reader_lock(id, reader);
-            true
+        let next_index = this.machine.sync.mutexes.next_index();
+        if let Some(old) = existing(this, next_index)? {
+            if this.machine.sync.mutexes.get(old).is_none() {
+                throw_ub_format!("mutex has invalid ID");
+            }
+            Ok(old)
         } else {
-            false
+            let new_index = this.machine.sync.mutexes.push(Default::default());
+            assert_eq!(next_index, new_index);
+            Ok(new_index)
         }
     }
 
-    /// Take the writer out of the queue waiting for the lock.
-    /// Returns `true` if some thread got the rwlock.
+    /// Provides the closure with the next RwLockId. Creates that RwLock if the closure returns None,
+    /// otherwise returns the value from the closure.
     #[inline]
-    fn rwlock_dequeue_and_lock_writer(&mut self, id: RwLockId) -> bool {
+    fn rwlock_get_or_create<F>(&mut self, existing: F) -> InterpResult<'tcx, RwLockId>
+    where
+        F: FnOnce(&mut MiriInterpCx<'mir, 'tcx>, RwLockId) -> InterpResult<'tcx, Option<RwLockId>>,
+    {
         let this = self.eval_context_mut();
-        if let Some(writer) = this.machine.sync.rwlocks[id].writer_queue.pop_front() {
-            this.unblock_thread(writer, BlockReason::RwLock(id));
-            this.rwlock_writer_lock(id, writer);
-            true
+        let next_index = this.machine.sync.rwlocks.next_index();
+        if let Some(old) = existing(this, next_index)? {
+            if this.machine.sync.rwlocks.get(old).is_none() {
+                throw_ub_format!("rwlock has invalid ID");
+            }
+            Ok(old)
         } else {
-            false
+            let new_index = this.machine.sync.rwlocks.push(Default::default());
+            assert_eq!(next_index, new_index);
+            Ok(new_index)
         }
     }
 
-    /// Take a thread out of the queue waiting for the mutex, and lock
-    /// the mutex for it. Returns `true` if some thread has the mutex now.
+    /// Provides the closure with the next CondvarId. Creates that Condvar if the closure returns None,
+    /// otherwise returns the value from the closure.
     #[inline]
-    fn mutex_dequeue_and_lock(&mut self, id: MutexId) -> bool {
+    fn condvar_get_or_create<F>(&mut self, existing: F) -> InterpResult<'tcx, CondvarId>
+    where
+        F: FnOnce(
+            &mut MiriInterpCx<'mir, 'tcx>,
+            CondvarId,
+        ) -> InterpResult<'tcx, Option<CondvarId>>,
+    {
         let this = self.eval_context_mut();
-        if let Some(thread) = this.machine.sync.mutexes[id].queue.pop_front() {
-            this.unblock_thread(thread, BlockReason::Mutex(id));
-            this.mutex_lock(id, thread);
-            true
+        let next_index = this.machine.sync.condvars.next_index();
+        if let Some(old) = existing(this, next_index)? {
+            if this.machine.sync.condvars.get(old).is_none() {
+                throw_ub_format!("condvar has invalid ID");
+            }
+            Ok(old)
         } else {
-            false
+            let new_index = this.machine.sync.condvars.push(Default::default());
+            assert_eq!(next_index, new_index);
+            Ok(new_index)
         }
     }
 }
@@ -296,27 +303,6 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
     }
 
     #[inline]
-    /// Provides the closure with the next MutexId. Creates that mutex if the closure returns None,
-    /// otherwise returns the value from the closure
-    fn mutex_get_or_create<F>(&mut self, existing: F) -> InterpResult<'tcx, MutexId>
-    where
-        F: FnOnce(&mut MiriInterpCx<'mir, 'tcx>, MutexId) -> InterpResult<'tcx, Option<MutexId>>,
-    {
-        let this = self.eval_context_mut();
-        let next_index = this.machine.sync.mutexes.next_index();
-        if let Some(old) = existing(this, next_index)? {
-            if this.machine.sync.mutexes.get(old).is_none() {
-                throw_ub_format!("mutex has invalid ID");
-            }
-            Ok(old)
-        } else {
-            let new_index = this.machine.sync.mutexes.push(Default::default());
-            assert_eq!(next_index, new_index);
-            Ok(new_index)
-        }
-    }
-
-    #[inline]
     /// Get the id of the thread that currently owns this lock.
     fn mutex_get_owner(&mut self, id: MutexId) -> ThreadId {
         let this = self.eval_context_ref();
@@ -331,8 +317,9 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
     }
 
     /// Lock by setting the mutex owner and increasing the lock count.
-    fn mutex_lock(&mut self, id: MutexId, thread: ThreadId) {
+    fn mutex_lock(&mut self, id: MutexId) {
         let this = self.eval_context_mut();
+        let thread = this.active_thread();
         let mutex = &mut this.machine.sync.mutexes[id];
         if let Some(current_owner) = mutex.owner {
             assert_eq!(thread, current_owner, "mutex already locked by another thread");
@@ -345,7 +332,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
         }
         mutex.lock_count = mutex.lock_count.checked_add(1).unwrap();
         if let Some(data_race) = &this.machine.data_race {
-            data_race.acquire_clock(&mutex.clock, thread);
+            data_race.acquire_clock(&mutex.clock, &this.machine.threads);
         }
     }
 
@@ -353,14 +340,14 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
     /// count. If the lock count reaches 0, release the lock and potentially
     /// give to a new owner. If the lock was not locked by the current thread,
     /// return `None`.
-    fn mutex_unlock(&mut self, id: MutexId) -> Option<usize> {
+    fn mutex_unlock(&mut self, id: MutexId) -> InterpResult<'tcx, Option<usize>> {
         let this = self.eval_context_mut();
         let mutex = &mut this.machine.sync.mutexes[id];
-        if let Some(current_owner) = mutex.owner {
+        Ok(if let Some(current_owner) = mutex.owner {
             // Mutex is locked.
-            if current_owner != this.machine.threads.get_active_thread_id() {
+            if current_owner != this.machine.threads.active_thread() {
                 // Only the owner can unlock the mutex.
-                return None;
+                return Ok(None);
             }
             let old_lock_count = mutex.lock_count;
             mutex.lock_count = old_lock_count
@@ -373,42 +360,52 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
                 if let Some(data_race) = &this.machine.data_race {
                     mutex.clock.clone_from(&data_race.release_clock(&this.machine.threads));
                 }
-                this.mutex_dequeue_and_lock(id);
+                if let Some(thread) = this.machine.sync.mutexes[id].queue.pop_front() {
+                    this.unblock_thread(thread, BlockReason::Mutex(id))?;
+                }
             }
             Some(old_lock_count)
         } else {
             // Mutex is not locked.
             None
-        }
+        })
     }
 
     /// Put the thread into the queue waiting for the mutex.
+    /// Once the Mutex becomes available, `retval` will be written to `dest`.
     #[inline]
-    fn mutex_enqueue_and_block(&mut self, id: MutexId, thread: ThreadId) {
+    fn mutex_enqueue_and_block(
+        &mut self,
+        id: MutexId,
+        retval: Scalar<Provenance>,
+        dest: MPlaceTy<'tcx, Provenance>,
+    ) {
         let this = self.eval_context_mut();
         assert!(this.mutex_is_locked(id), "queueing on unlocked mutex");
+        let thread = this.active_thread();
         this.machine.sync.mutexes[id].queue.push_back(thread);
-        this.block_thread(thread, BlockReason::Mutex(id));
-    }
+        this.block_thread(BlockReason::Mutex(id), None, Callback { id, retval, dest });
 
-    /// Provides the closure with the next RwLockId. Creates that RwLock if the closure returns None,
-    /// otherwise returns the value from the closure
-    #[inline]
-    fn rwlock_get_or_create<F>(&mut self, existing: F) -> InterpResult<'tcx, RwLockId>
-    where
-        F: FnOnce(&mut MiriInterpCx<'mir, 'tcx>, RwLockId) -> InterpResult<'tcx, Option<RwLockId>>,
-    {
-        let this = self.eval_context_mut();
-        let next_index = this.machine.sync.rwlocks.next_index();
-        if let Some(old) = existing(this, next_index)? {
-            if this.machine.sync.rwlocks.get(old).is_none() {
-                throw_ub_format!("rwlock has invalid ID");
+        struct Callback<'tcx> {
+            id: MutexId,
+            retval: Scalar<Provenance>,
+            dest: MPlaceTy<'tcx, Provenance>,
+        }
+        impl<'tcx> VisitProvenance for Callback<'tcx> {
+            fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
+                let Callback { id: _, retval, dest } = self;
+                retval.visit_provenance(visit);
+                dest.visit_provenance(visit);
+            }
+        }
+        impl<'mir, 'tcx: 'mir> UnblockCallback<'mir, 'tcx> for Callback<'tcx> {
+            fn unblock(self: Box<Self>, this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> {
+                assert!(!this.mutex_is_locked(self.id));
+                this.mutex_lock(self.id);
+
+                this.write_scalar(self.retval, &self.dest)?;
+                Ok(())
             }
-            Ok(old)
-        } else {
-            let new_index = this.machine.sync.rwlocks.push(Default::default());
-            assert_eq!(next_index, new_index);
-            Ok(new_index)
         }
     }
 
@@ -437,23 +434,24 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
 
     /// Read-lock the lock by adding the `reader` the list of threads that own
     /// this lock.
-    fn rwlock_reader_lock(&mut self, id: RwLockId, reader: ThreadId) {
+    fn rwlock_reader_lock(&mut self, id: RwLockId) {
         let this = self.eval_context_mut();
+        let thread = this.active_thread();
         assert!(!this.rwlock_is_write_locked(id), "the lock is write locked");
-        trace!("rwlock_reader_lock: {:?} now also held (one more time) by {:?}", id, reader);
+        trace!("rwlock_reader_lock: {:?} now also held (one more time) by {:?}", id, thread);
         let rwlock = &mut this.machine.sync.rwlocks[id];
-        let count = rwlock.readers.entry(reader).or_insert(0);
+        let count = rwlock.readers.entry(thread).or_insert(0);
         *count = count.checked_add(1).expect("the reader counter overflowed");
         if let Some(data_race) = &this.machine.data_race {
-            data_race.acquire_clock(&rwlock.clock_unlocked, reader);
+            data_race.acquire_clock(&rwlock.clock_unlocked, &this.machine.threads);
         }
     }
 
     /// Try read-unlock the lock for the current threads and potentially give the lock to a new owner.
     /// Returns `true` if succeeded, `false` if this `reader` did not hold the lock.
-    fn rwlock_reader_unlock(&mut self, id: RwLockId) -> bool {
+    fn rwlock_reader_unlock(&mut self, id: RwLockId) -> InterpResult<'tcx, bool> {
         let this = self.eval_context_mut();
-        let thread = this.get_active_thread();
+        let thread = this.active_thread();
         let rwlock = &mut this.machine.sync.rwlocks[id];
         match rwlock.readers.entry(thread) {
             Entry::Occupied(mut entry) => {
@@ -467,7 +465,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
                     trace!("rwlock_reader_unlock: {:?} held one less time by {:?}", id, thread);
                 }
             }
-            Entry::Vacant(_) => return false, // we did not even own this lock
+            Entry::Vacant(_) => return Ok(false), // we did not even own this lock
         }
         if let Some(data_race) = &this.machine.data_race {
             // Add this to the shared-release clock of all concurrent readers.
@@ -481,48 +479,79 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
             // happen-before the writers
             let rwlock = &mut this.machine.sync.rwlocks[id];
             rwlock.clock_unlocked.clone_from(&rwlock.clock_current_readers);
-            this.rwlock_dequeue_and_lock_writer(id);
+            // See if there is a thread to unblock.
+            if let Some(writer) = rwlock.writer_queue.pop_front() {
+                this.unblock_thread(writer, BlockReason::RwLock(id))?;
+            }
         }
-        true
+        Ok(true)
     }
 
     /// Put the reader in the queue waiting for the lock and block it.
+    /// Once the lock becomes available, `retval` will be written to `dest`.
     #[inline]
-    fn rwlock_enqueue_and_block_reader(&mut self, id: RwLockId, reader: ThreadId) {
+    fn rwlock_enqueue_and_block_reader(
+        &mut self,
+        id: RwLockId,
+        retval: Scalar<Provenance>,
+        dest: MPlaceTy<'tcx, Provenance>,
+    ) {
         let this = self.eval_context_mut();
+        let thread = this.active_thread();
         assert!(this.rwlock_is_write_locked(id), "read-queueing on not write locked rwlock");
-        this.machine.sync.rwlocks[id].reader_queue.push_back(reader);
-        this.block_thread(reader, BlockReason::RwLock(id));
+        this.machine.sync.rwlocks[id].reader_queue.push_back(thread);
+        this.block_thread(BlockReason::RwLock(id), None, Callback { id, retval, dest });
+
+        struct Callback<'tcx> {
+            id: RwLockId,
+            retval: Scalar<Provenance>,
+            dest: MPlaceTy<'tcx, Provenance>,
+        }
+        impl<'tcx> VisitProvenance for Callback<'tcx> {
+            fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
+                let Callback { id: _, retval, dest } = self;
+                retval.visit_provenance(visit);
+                dest.visit_provenance(visit);
+            }
+        }
+        impl<'mir, 'tcx: 'mir> UnblockCallback<'mir, 'tcx> for Callback<'tcx> {
+            fn unblock(self: Box<Self>, this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> {
+                this.rwlock_reader_lock(self.id);
+                this.write_scalar(self.retval, &self.dest)?;
+                Ok(())
+            }
+        }
     }
 
     /// Lock by setting the writer that owns the lock.
     #[inline]
-    fn rwlock_writer_lock(&mut self, id: RwLockId, writer: ThreadId) {
+    fn rwlock_writer_lock(&mut self, id: RwLockId) {
         let this = self.eval_context_mut();
+        let thread = this.active_thread();
         assert!(!this.rwlock_is_locked(id), "the rwlock is already locked");
-        trace!("rwlock_writer_lock: {:?} now held by {:?}", id, writer);
+        trace!("rwlock_writer_lock: {:?} now held by {:?}", id, thread);
         let rwlock = &mut this.machine.sync.rwlocks[id];
-        rwlock.writer = Some(writer);
+        rwlock.writer = Some(thread);
         if let Some(data_race) = &this.machine.data_race {
-            data_race.acquire_clock(&rwlock.clock_unlocked, writer);
+            data_race.acquire_clock(&rwlock.clock_unlocked, &this.machine.threads);
         }
     }
 
     /// Try to unlock an rwlock held by the current thread.
     /// Return `false` if it is held by another thread.
     #[inline]
-    fn rwlock_writer_unlock(&mut self, id: RwLockId) -> bool {
+    fn rwlock_writer_unlock(&mut self, id: RwLockId) -> InterpResult<'tcx, bool> {
         let this = self.eval_context_mut();
-        let thread = this.get_active_thread();
+        let thread = this.active_thread();
         let rwlock = &mut this.machine.sync.rwlocks[id];
-        if let Some(current_writer) = rwlock.writer {
+        Ok(if let Some(current_writer) = rwlock.writer {
             if current_writer != thread {
                 // Only the owner can unlock the rwlock.
-                return false;
+                return Ok(false);
             }
             rwlock.writer = None;
             trace!("rwlock_writer_unlock: {:?} unlocked by {:?}", id, thread);
-            // Release memory to next lock holder.
+            // Record release clock for next lock holder.
             if let Some(data_race) = &this.machine.data_race {
                 rwlock.clock_unlocked.clone_from(&*data_race.release_clock(&this.machine.threads));
             }
@@ -531,50 +560,54 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
             // We are prioritizing writers here against the readers. As a
             // result, not only readers can starve writers, but also writers can
             // starve readers.
-            if this.rwlock_dequeue_and_lock_writer(id) {
-                // Someone got the write lock, nice.
+            if let Some(writer) = rwlock.writer_queue.pop_front() {
+                this.unblock_thread(writer, BlockReason::RwLock(id))?;
             } else {
-                // Give the lock to all readers.
-                while this.rwlock_dequeue_and_lock_reader(id) {
-                    // Rinse and repeat.
+                // Take the entire read queue and wake them all up.
+                let readers = std::mem::take(&mut rwlock.reader_queue);
+                for reader in readers {
+                    this.unblock_thread(reader, BlockReason::RwLock(id))?;
                 }
             }
             true
         } else {
             false
-        }
+        })
     }
 
     /// Put the writer in the queue waiting for the lock.
+    /// Once the lock becomes available, `retval` will be written to `dest`.
     #[inline]
-    fn rwlock_enqueue_and_block_writer(&mut self, id: RwLockId, writer: ThreadId) {
+    fn rwlock_enqueue_and_block_writer(
+        &mut self,
+        id: RwLockId,
+        retval: Scalar<Provenance>,
+        dest: MPlaceTy<'tcx, Provenance>,
+    ) {
         let this = self.eval_context_mut();
         assert!(this.rwlock_is_locked(id), "write-queueing on unlocked rwlock");
-        this.machine.sync.rwlocks[id].writer_queue.push_back(writer);
-        this.block_thread(writer, BlockReason::RwLock(id));
-    }
-
-    /// Provides the closure with the next CondvarId. Creates that Condvar if the closure returns None,
-    /// otherwise returns the value from the closure
-    #[inline]
-    fn condvar_get_or_create<F>(&mut self, existing: F) -> InterpResult<'tcx, CondvarId>
-    where
-        F: FnOnce(
-            &mut MiriInterpCx<'mir, 'tcx>,
-            CondvarId,
-        ) -> InterpResult<'tcx, Option<CondvarId>>,
-    {
-        let this = self.eval_context_mut();
-        let next_index = this.machine.sync.condvars.next_index();
-        if let Some(old) = existing(this, next_index)? {
-            if this.machine.sync.condvars.get(old).is_none() {
-                throw_ub_format!("condvar has invalid ID");
+        let thread = this.active_thread();
+        this.machine.sync.rwlocks[id].writer_queue.push_back(thread);
+        this.block_thread(BlockReason::RwLock(id), None, Callback { id, retval, dest });
+
+        struct Callback<'tcx> {
+            id: RwLockId,
+            retval: Scalar<Provenance>,
+            dest: MPlaceTy<'tcx, Provenance>,
+        }
+        impl<'tcx> VisitProvenance for Callback<'tcx> {
+            fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
+                let Callback { id: _, retval, dest } = self;
+                retval.visit_provenance(visit);
+                dest.visit_provenance(visit);
+            }
+        }
+        impl<'mir, 'tcx: 'mir> UnblockCallback<'mir, 'tcx> for Callback<'tcx> {
+            fn unblock(self: Box<Self>, this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> {
+                this.rwlock_writer_lock(self.id);
+                this.write_scalar(self.retval, &self.dest)?;
+                Ok(())
             }
-            Ok(old)
-        } else {
-            let new_index = this.machine.sync.condvars.push(Default::default());
-            assert_eq!(next_index, new_index);
-            Ok(new_index)
         }
     }
 
@@ -585,17 +618,106 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
         !this.machine.sync.condvars[id].waiters.is_empty()
     }
 
-    /// Mark that the thread is waiting on the conditional variable.
-    fn condvar_wait(&mut self, id: CondvarId, thread: ThreadId, lock: MutexId) {
+    /// Release the mutex and let the current thread wait on the given condition variable.
+    /// Once it is signaled, the mutex will be acquired and `retval_succ` will be written to `dest`.
+    /// If the timeout happens first, `retval_timeout` will be written to `dest`.
+    fn condvar_wait(
+        &mut self,
+        condvar: CondvarId,
+        mutex: MutexId,
+        timeout: Option<Timeout>,
+        retval_succ: Scalar<Provenance>,
+        retval_timeout: Scalar<Provenance>,
+        dest: MPlaceTy<'tcx, Provenance>,
+    ) -> InterpResult<'tcx> {
         let this = self.eval_context_mut();
-        let waiters = &mut this.machine.sync.condvars[id].waiters;
-        assert!(waiters.iter().all(|waiter| waiter.thread != thread), "thread is already waiting");
-        waiters.push_back(CondvarWaiter { thread, lock });
+        if let Some(old_locked_count) = this.mutex_unlock(mutex)? {
+            if old_locked_count != 1 {
+                throw_unsup_format!(
+                    "awaiting a condvar on a mutex acquired multiple times is not supported"
+                );
+            }
+        } else {
+            throw_ub_format!(
+                "awaiting a condvar on a mutex that is unlocked or owned by a different thread"
+            );
+        }
+        let thread = this.active_thread();
+        let waiters = &mut this.machine.sync.condvars[condvar].waiters;
+        waiters.push_back(thread);
+        this.block_thread(
+            BlockReason::Condvar(condvar),
+            timeout,
+            Callback { condvar, mutex, retval_succ, retval_timeout, dest },
+        );
+        return Ok(());
+
+        struct Callback<'tcx> {
+            condvar: CondvarId,
+            mutex: MutexId,
+            retval_succ: Scalar<Provenance>,
+            retval_timeout: Scalar<Provenance>,
+            dest: MPlaceTy<'tcx, Provenance>,
+        }
+        impl<'tcx> VisitProvenance for Callback<'tcx> {
+            fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
+                let Callback { condvar: _, mutex: _, retval_succ, retval_timeout, dest } = self;
+                retval_succ.visit_provenance(visit);
+                retval_timeout.visit_provenance(visit);
+                dest.visit_provenance(visit);
+            }
+        }
+        impl<'tcx, 'mir> Callback<'tcx> {
+            #[allow(clippy::boxed_local)]
+            fn reacquire_mutex(
+                self: Box<Self>,
+                this: &mut MiriInterpCx<'mir, 'tcx>,
+                retval: Scalar<Provenance>,
+            ) -> InterpResult<'tcx> {
+                if this.mutex_is_locked(self.mutex) {
+                    assert_ne!(this.mutex_get_owner(self.mutex), this.active_thread());
+                    this.mutex_enqueue_and_block(self.mutex, retval, self.dest);
+                } else {
+                    // We can have it right now!
+                    this.mutex_lock(self.mutex);
+                    // Don't forget to write the return value.
+                    this.write_scalar(retval, &self.dest)?;
+                }
+                Ok(())
+            }
+        }
+        impl<'mir, 'tcx: 'mir> UnblockCallback<'mir, 'tcx> for Callback<'tcx> {
+            fn unblock(self: Box<Self>, this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> {
+                // The condvar was signaled. Make sure we get the clock for that.
+                if let Some(data_race) = &this.machine.data_race {
+                    data_race.acquire_clock(
+                        &this.machine.sync.condvars[self.condvar].clock,
+                        &this.machine.threads,
+                    );
+                }
+                // Try to acquire the mutex.
+                // The timeout only applies to the first wait (until the signal), not for mutex acquisition.
+                let retval = self.retval_succ;
+                self.reacquire_mutex(this, retval)
+            }
+            fn timeout(
+                self: Box<Self>,
+                this: &mut InterpCx<'mir, 'tcx, MiriMachine<'mir, 'tcx>>,
+            ) -> InterpResult<'tcx> {
+                // We have to remove the waiter from the queue again.
+                let thread = this.active_thread();
+                let waiters = &mut this.machine.sync.condvars[self.condvar].waiters;
+                waiters.retain(|waiter| *waiter != thread);
+                // Now get back the lock.
+                let retval = self.retval_timeout;
+                self.reacquire_mutex(this, retval)
+            }
+        }
     }
 
     /// Wake up some thread (if there is any) sleeping on the conditional
-    /// variable.
-    fn condvar_signal(&mut self, id: CondvarId) -> Option<(ThreadId, MutexId)> {
+    /// variable. Returns `true` iff any thread was woken up.
+    fn condvar_signal(&mut self, id: CondvarId) -> InterpResult<'tcx, bool> {
         let this = self.eval_context_mut();
         let condvar = &mut this.machine.sync.condvars[id];
         let data_race = &this.machine.data_race;
@@ -604,32 +726,87 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
         if let Some(data_race) = data_race {
             condvar.clock.clone_from(&*data_race.release_clock(&this.machine.threads));
         }
-        condvar.waiters.pop_front().map(|waiter| {
-            if let Some(data_race) = data_race {
-                data_race.acquire_clock(&condvar.clock, waiter.thread);
-            }
-            (waiter.thread, waiter.lock)
-        })
-    }
-
-    #[inline]
-    /// Remove the thread from the queue of threads waiting on this conditional variable.
-    fn condvar_remove_waiter(&mut self, id: CondvarId, thread: ThreadId) {
-        let this = self.eval_context_mut();
-        this.machine.sync.condvars[id].waiters.retain(|waiter| waiter.thread != thread);
+        let Some(waiter) = condvar.waiters.pop_front() else {
+            return Ok(false);
+        };
+        this.unblock_thread(waiter, BlockReason::Condvar(id))?;
+        Ok(true)
     }
 
-    fn futex_wait(&mut self, addr: u64, thread: ThreadId, bitset: u32) {
+    /// Wait for the futex to be signaled, or a timeout.
+    /// On a signal, `retval_succ` is written to `dest`.
+    /// On a timeout, `retval_timeout` is written to `dest` and `errno_timeout` is set as the last error.
+    fn futex_wait(
+        &mut self,
+        addr: u64,
+        bitset: u32,
+        timeout: Option<Timeout>,
+        retval_succ: Scalar<Provenance>,
+        retval_timeout: Scalar<Provenance>,
+        dest: MPlaceTy<'tcx, Provenance>,
+        errno_timeout: Scalar<Provenance>,
+    ) {
         let this = self.eval_context_mut();
+        let thread = this.active_thread();
         let futex = &mut this.machine.sync.futexes.entry(addr).or_default();
         let waiters = &mut futex.waiters;
         assert!(waiters.iter().all(|waiter| waiter.thread != thread), "thread is already waiting");
         waiters.push_back(FutexWaiter { thread, bitset });
+        this.block_thread(
+            BlockReason::Futex { addr },
+            timeout,
+            Callback { addr, retval_succ, retval_timeout, dest, errno_timeout },
+        );
+
+        struct Callback<'tcx> {
+            addr: u64,
+            retval_succ: Scalar<Provenance>,
+            retval_timeout: Scalar<Provenance>,
+            dest: MPlaceTy<'tcx, Provenance>,
+            errno_timeout: Scalar<Provenance>,
+        }
+        impl<'tcx> VisitProvenance for Callback<'tcx> {
+            fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
+                let Callback { addr: _, retval_succ, retval_timeout, dest, errno_timeout } = self;
+                retval_succ.visit_provenance(visit);
+                retval_timeout.visit_provenance(visit);
+                dest.visit_provenance(visit);
+                errno_timeout.visit_provenance(visit);
+            }
+        }
+        impl<'mir, 'tcx: 'mir> UnblockCallback<'mir, 'tcx> for Callback<'tcx> {
+            fn unblock(self: Box<Self>, this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> {
+                let futex = this.machine.sync.futexes.get(&self.addr).unwrap();
+                // Acquire the clock of the futex.
+                if let Some(data_race) = &this.machine.data_race {
+                    data_race.acquire_clock(&futex.clock, &this.machine.threads);
+                }
+                // Write the return value.
+                this.write_scalar(self.retval_succ, &self.dest)?;
+                Ok(())
+            }
+            fn timeout(
+                self: Box<Self>,
+                this: &mut InterpCx<'mir, 'tcx, MiriMachine<'mir, 'tcx>>,
+            ) -> InterpResult<'tcx> {
+                // Remove the waiter from the futex.
+                let thread = this.active_thread();
+                let futex = this.machine.sync.futexes.get_mut(&self.addr).unwrap();
+                futex.waiters.retain(|waiter| waiter.thread != thread);
+                // Set errno and write return value.
+                this.set_last_error(self.errno_timeout)?;
+                this.write_scalar(self.retval_timeout, &self.dest)?;
+                Ok(())
+            }
+        }
     }
 
-    fn futex_wake(&mut self, addr: u64, bitset: u32) -> Option<ThreadId> {
+    /// Returns whether anything was woken.
+    fn futex_wake(&mut self, addr: u64, bitset: u32) -> InterpResult<'tcx, bool> {
         let this = self.eval_context_mut();
-        let futex = &mut this.machine.sync.futexes.get_mut(&addr)?;
+        let Some(futex) = this.machine.sync.futexes.get_mut(&addr) else {
+            return Ok(false);
+        };
         let data_race = &this.machine.data_race;
 
         // Each futex-wake happens-before the end of the futex wait
@@ -638,19 +815,11 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
         }
 
         // Wake up the first thread in the queue that matches any of the bits in the bitset.
-        futex.waiters.iter().position(|w| w.bitset & bitset != 0).map(|i| {
-            let waiter = futex.waiters.remove(i).unwrap();
-            if let Some(data_race) = data_race {
-                data_race.acquire_clock(&futex.clock, waiter.thread);
-            }
-            waiter.thread
-        })
-    }
-
-    fn futex_remove_waiter(&mut self, addr: u64, thread: ThreadId) {
-        let this = self.eval_context_mut();
-        if let Some(futex) = this.machine.sync.futexes.get_mut(&addr) {
-            futex.waiters.retain(|waiter| waiter.thread != thread);
-        }
+        let Some(i) = futex.waiters.iter().position(|w| w.bitset & bitset != 0) else {
+            return Ok(false);
+        };
+        let waiter = futex.waiters.remove(i).unwrap();
+        this.unblock_thread(waiter.thread, BlockReason::Futex { addr })?;
+        Ok(true)
     }
 }
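[Editorial note: across sync.rs, the old waiter structs and dequeue-and-lock helpers are replaced by plain thread ids plus a callback stored with the blocked thread: the callback's unblock path reacquires the lock and writes `retval` to `dest`, while its timeout path handles the timed-out case. The sketch below is a standalone toy model of that dispatch, loosely modeled on the `UnblockCallback` trait in this diff; the names and types are simplified stand-ins, not Miri's real interpreter API.]

```rust
use std::mem;

/// Simplified stand-in for the unblock-callback trait introduced in this diff.
trait UnblockCallback {
    fn unblock(self: Box<Self>) -> String;
    fn timeout(self: Box<Self>) -> String {
        unreachable!("blocked without a timeout")
    }
}

enum ThreadState {
    Enabled,
    Blocked { callback: Box<dyn UnblockCallback> },
}

/// What a condvar waiter does when it is signaled or when its timeout fires.
struct CondvarWait {
    mutex_id: usize,
}

impl UnblockCallback for CondvarWait {
    fn unblock(self: Box<Self>) -> String {
        format!("signaled: reacquire mutex {}", self.mutex_id)
    }
    fn timeout(self: Box<Self>) -> String {
        format!("timed out: reacquire mutex {}, then report the timeout", self.mutex_id)
    }
}

/// The "scheduler" wakes a blocked thread by taking its callback and
/// running either the unblock or the timeout path.
fn wake(state: &mut ThreadState, timed_out: bool) -> Option<String> {
    match mem::replace(state, ThreadState::Enabled) {
        ThreadState::Blocked { callback } =>
            Some(if timed_out { callback.timeout() } else { callback.unblock() }),
        ThreadState::Enabled => None,
    }
}

fn main() {
    let mut signaled = ThreadState::Blocked { callback: Box::new(CondvarWait { mutex_id: 7 }) };
    let mut expired = ThreadState::Blocked { callback: Box::new(CondvarWait { mutex_id: 7 }) };
    println!("{}", wake(&mut signaled, false).unwrap());
    println!("{}", wake(&mut expired, true).unwrap());
}
```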
diff --git a/src/tools/miri/src/concurrency/thread.rs b/src/tools/miri/src/concurrency/thread.rs
index cdc9cba6646..43aac8a3243 100644
--- a/src/tools/miri/src/concurrency/thread.rs
+++ b/src/tools/miri/src/concurrency/thread.rs
@@ -1,7 +1,6 @@
 //! Implements threads.
 
-use std::cell::RefCell;
-use std::collections::hash_map::Entry;
+use std::mem;
 use std::num::TryFromIntError;
 use std::sync::atomic::Ordering::Relaxed;
 use std::task::Poll;
@@ -41,12 +40,23 @@ pub enum TlsAllocAction {
     Leak,
 }
 
-/// Trait for callbacks that can be executed when some event happens, such as after a timeout.
-pub trait MachineCallback<'mir, 'tcx>: VisitProvenance {
-    fn call(&self, ecx: &mut InterpCx<'mir, 'tcx, MiriMachine<'mir, 'tcx>>) -> InterpResult<'tcx>;
-}
+/// Trait for callbacks that are executed when a thread gets unblocked.
+pub trait UnblockCallback<'mir, 'tcx>: VisitProvenance {
+    fn unblock(
+        self: Box<Self>,
+        ecx: &mut InterpCx<'mir, 'tcx, MiriMachine<'mir, 'tcx>>,
+    ) -> InterpResult<'tcx>;
 
-type TimeoutCallback<'mir, 'tcx> = Box<dyn MachineCallback<'mir, 'tcx> + 'tcx>;
+    fn timeout(
+        self: Box<Self>,
+        _ecx: &mut InterpCx<'mir, 'tcx, MiriMachine<'mir, 'tcx>>,
+    ) -> InterpResult<'tcx> {
+        unreachable!(
+            "timeout on a thread that was blocked without a timeout (or someone forgot to overwrite this method)"
+        )
+    }
+}
+type DynUnblockCallback<'mir, 'tcx> = Box<dyn UnblockCallback<'mir, 'tcx> + 'tcx>;
 
 /// A thread identifier.
 #[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
@@ -117,17 +127,45 @@ pub enum BlockReason {
 }
 
 /// The state of a thread.
-#[derive(Debug, Copy, Clone, PartialEq, Eq)]
-pub enum ThreadState {
+enum ThreadState<'mir, 'tcx> {
     /// The thread is enabled and can be executed.
     Enabled,
     /// The thread is blocked on something.
-    Blocked(BlockReason),
+    Blocked {
+        reason: BlockReason,
+        timeout: Option<Timeout>,
+        callback: DynUnblockCallback<'mir, 'tcx>,
+    },
     /// The thread has terminated its execution. We do not delete terminated
     /// threads (FIXME: why?).
     Terminated,
 }
 
+impl<'mir, 'tcx> std::fmt::Debug for ThreadState<'mir, 'tcx> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::Enabled => write!(f, "Enabled"),
+            Self::Blocked { reason, timeout, .. } =>
+                f.debug_struct("Blocked").field("reason", reason).field("timeout", timeout).finish(),
+            Self::Terminated => write!(f, "Terminated"),
+        }
+    }
+}
+
+impl<'mir, 'tcx> ThreadState<'mir, 'tcx> {
+    fn is_enabled(&self) -> bool {
+        matches!(self, ThreadState::Enabled)
+    }
+
+    fn is_terminated(&self) -> bool {
+        matches!(self, ThreadState::Terminated)
+    }
+
+    fn is_blocked_on(&self, reason: BlockReason) -> bool {
+        matches!(*self, ThreadState::Blocked { reason: actual_reason, .. } if actual_reason == reason)
+    }
+}
+
 /// The join status of a thread.
 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
 enum ThreadJoinStatus {
@@ -142,7 +180,7 @@ enum ThreadJoinStatus {
 
 /// A thread.
 pub struct Thread<'mir, 'tcx> {
-    state: ThreadState,
+    state: ThreadState<'mir, 'tcx>,
 
     /// Name of the thread.
     thread_name: Option<Vec<u8>>,
@@ -323,41 +361,24 @@ impl VisitProvenance for Frame<'_, '_, Provenance, FrameExtra<'_>> {
     }
 }
 
-/// A specific moment in time.
+/// The moment in time when a blocked thread should be woken up.
 #[derive(Debug)]
-pub enum CallbackTime {
+pub enum Timeout {
     Monotonic(Instant),
     RealTime(SystemTime),
 }
 
-impl CallbackTime {
+impl Timeout {
     /// How long do we have to wait from now until the specified time?
     fn get_wait_time(&self, clock: &Clock) -> Duration {
         match self {
-            CallbackTime::Monotonic(instant) => instant.duration_since(clock.now()),
-            CallbackTime::RealTime(time) =>
-                time.duration_since(SystemTime::now()).unwrap_or(Duration::new(0, 0)),
+            Timeout::Monotonic(instant) => instant.duration_since(clock.now()),
+            Timeout::RealTime(time) =>
+                time.duration_since(SystemTime::now()).unwrap_or(Duration::ZERO),
         }
     }
 }
 
-/// Callbacks are used to implement timeouts. For example, waiting on a
-/// conditional variable with a timeout creates a callback that is called after
-/// the specified time and unblocks the thread. If another thread signals on the
-/// conditional variable, the signal handler deletes the callback.
-struct TimeoutCallbackInfo<'mir, 'tcx> {
-    /// The callback should be called no earlier than this time.
-    call_time: CallbackTime,
-    /// The called function.
-    callback: TimeoutCallback<'mir, 'tcx>,
-}
-
-impl<'mir, 'tcx> std::fmt::Debug for TimeoutCallbackInfo<'mir, 'tcx> {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "TimeoutCallback({:?})", self.call_time)
-    }
-}
-
 /// A set of threads.
 #[derive(Debug)]
 pub struct ThreadManager<'mir, 'tcx> {
@@ -369,11 +390,9 @@ pub struct ThreadManager<'mir, 'tcx> {
     threads: IndexVec<ThreadId, Thread<'mir, 'tcx>>,
     /// A mapping from a thread-local static to an allocation id of a thread
     /// specific allocation.
-    thread_local_alloc_ids: RefCell<FxHashMap<(DefId, ThreadId), Pointer<Provenance>>>,
+    thread_local_alloc_ids: FxHashMap<(DefId, ThreadId), Pointer<Provenance>>,
     /// A flag that indicates that we should change the active thread.
     yield_active_thread: bool,
-    /// Callbacks that are called once the specified time passes.
-    timeout_callbacks: FxHashMap<ThreadId, TimeoutCallbackInfo<'mir, 'tcx>>,
 }
 
 impl VisitProvenance for ThreadManager<'_, '_> {
@@ -381,7 +400,6 @@ impl VisitProvenance for ThreadManager<'_, '_> {
         let ThreadManager {
             threads,
             thread_local_alloc_ids,
-            timeout_callbacks,
             active_thread: _,
             yield_active_thread: _,
         } = self;
@@ -389,12 +407,9 @@ impl VisitProvenance for ThreadManager<'_, '_> {
         for thread in threads {
             thread.visit_provenance(visit);
         }
-        for ptr in thread_local_alloc_ids.borrow().values() {
+        for ptr in thread_local_alloc_ids.values() {
             ptr.visit_provenance(visit);
         }
-        for callback in timeout_callbacks.values() {
-            callback.callback.visit_provenance(visit);
-        }
     }
 }
 
@@ -408,7 +423,6 @@ impl<'mir, 'tcx> Default for ThreadManager<'mir, 'tcx> {
             threads,
             thread_local_alloc_ids: Default::default(),
             yield_active_thread: false,
-            timeout_callbacks: FxHashMap::default(),
         }
     }
 }
@@ -430,18 +444,15 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
     /// Check if we have an allocation for the given thread local static for the
     /// active thread.
     fn get_thread_local_alloc_id(&self, def_id: DefId) -> Option<Pointer<Provenance>> {
-        self.thread_local_alloc_ids.borrow().get(&(def_id, self.active_thread)).cloned()
+        self.thread_local_alloc_ids.get(&(def_id, self.active_thread)).cloned()
     }
 
     /// Set the pointer for the allocation of the given thread local
     /// static for the active thread.
     ///
     /// Panics if a thread local is initialized twice for the same thread.
-    fn set_thread_local_alloc(&self, def_id: DefId, ptr: Pointer<Provenance>) {
-        self.thread_local_alloc_ids
-            .borrow_mut()
-            .try_insert((def_id, self.active_thread), ptr)
-            .unwrap();
+    fn set_thread_local_alloc(&mut self, def_id: DefId, ptr: Pointer<Provenance>) {
+        self.thread_local_alloc_ids.try_insert((def_id, self.active_thread), ptr).unwrap();
     }
 
     /// Borrow the stack of the active thread.
@@ -480,7 +491,7 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
     }
 
     /// Get the id of the currently active thread.
-    pub fn get_active_thread_id(&self) -> ThreadId {
+    pub fn active_thread(&self) -> ThreadId {
         self.active_thread
     }
 
@@ -492,17 +503,17 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
     /// Get the total of threads that are currently live, i.e., not yet terminated.
     /// (They might be blocked.)
     pub fn get_live_thread_count(&self) -> usize {
-        self.threads.iter().filter(|t| !matches!(t.state, ThreadState::Terminated)).count()
+        self.threads.iter().filter(|t| !t.state.is_terminated()).count()
     }
 
     /// Has the given thread terminated?
     fn has_terminated(&self, thread_id: ThreadId) -> bool {
-        self.threads[thread_id].state == ThreadState::Terminated
+        self.threads[thread_id].state.is_terminated()
     }
 
     /// Have all threads terminated?
     fn have_all_terminated(&self) -> bool {
-        self.threads.iter().all(|thread| thread.state == ThreadState::Terminated)
+        self.threads.iter().all(|thread| thread.state.is_terminated())
     }
 
     /// Enable the thread for execution. The thread must be terminated.
@@ -532,8 +543,7 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
     fn detach_thread(&mut self, id: ThreadId, allow_terminated_joined: bool) -> InterpResult<'tcx> {
         trace!("detaching {:?}", id);
 
-        let is_ub = if allow_terminated_joined && self.threads[id].state == ThreadState::Terminated
-        {
+        let is_ub = if allow_terminated_joined && self.threads[id].state.is_terminated() {
             // "Detached" in particular means "not yet joined". Redundant detaching is still UB.
             self.threads[id].join_status == ThreadJoinStatus::Detached
         } else {
@@ -561,15 +571,41 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
         // Mark the joined thread as being joined so that we detect if other
         // threads try to join it.
         self.threads[joined_thread_id].join_status = ThreadJoinStatus::Joined;
-        if self.threads[joined_thread_id].state != ThreadState::Terminated {
-            // The joined thread is still running, we need to wait for it.
-            self.active_thread_mut().state =
-                ThreadState::Blocked(BlockReason::Join(joined_thread_id));
+        if !self.threads[joined_thread_id].state.is_terminated() {
             trace!(
                 "{:?} blocked on {:?} when trying to join",
                 self.active_thread,
                 joined_thread_id
             );
+            // The joined thread is still running, we need to wait for it.
+            // Once we get unblocked, perform the appropriate synchronization.
+            self.block_thread(
+                BlockReason::Join(joined_thread_id),
+                None,
+                Callback { joined_thread_id },
+            );
+
+            struct Callback {
+                joined_thread_id: ThreadId,
+            }
+            impl VisitProvenance for Callback {
+                fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {}
+            }
+            impl<'mir, 'tcx: 'mir> UnblockCallback<'mir, 'tcx> for Callback {
+                fn unblock(
+                    self: Box<Self>,
+                    this: &mut MiriInterpCx<'mir, 'tcx>,
+                ) -> InterpResult<'tcx> {
+                    if let Some(data_race) = &mut this.machine.data_race {
+                        data_race.thread_joined(
+                            &this.machine.threads,
+                            this.machine.threads.active_thread(),
+                            self.joined_thread_id,
+                        );
+                    }
+                    Ok(())
+                }
+            }
         } else {
             // The thread has already terminated - mark join happens-before
             if let Some(data_race) = data_race {
@@ -596,9 +632,9 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
 
         // Sanity check `join_status`.
         assert!(
-            self.threads.iter().all(|thread| {
-                thread.state != ThreadState::Blocked(BlockReason::Join(joined_thread_id))
-            }),
+            self.threads
+                .iter()
+                .all(|thread| { !thread.state.is_blocked_on(BlockReason::Join(joined_thread_id)) }),
             "this thread already has threads waiting for its termination"
         );
 
@@ -620,18 +656,15 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
     }
 
     /// Put the thread into the blocked state.
-    fn block_thread(&mut self, thread: ThreadId, reason: BlockReason) {
-        let state = &mut self.threads[thread].state;
-        assert_eq!(*state, ThreadState::Enabled);
-        *state = ThreadState::Blocked(reason);
-    }
-
-    /// Put the blocked thread into the enabled state.
-    /// Sanity-checks that the thread previously was blocked for the right reason.
-    fn unblock_thread(&mut self, thread: ThreadId, reason: BlockReason) {
-        let state = &mut self.threads[thread].state;
-        assert_eq!(*state, ThreadState::Blocked(reason));
-        *state = ThreadState::Enabled;
+    fn block_thread(
+        &mut self,
+        reason: BlockReason,
+        timeout: Option<Timeout>,
+        callback: impl UnblockCallback<'mir, 'tcx> + 'tcx,
+    ) {
+        let state = &mut self.threads[self.active_thread].state;
+        assert!(state.is_enabled());
+        *state = ThreadState::Blocked { reason, timeout, callback: Box::new(callback) }
     }
 
     /// Change the active thread to some enabled thread.
@@ -642,87 +675,18 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
         self.yield_active_thread = true;
     }
 
-    /// Register the given `callback` to be called once the `call_time` passes.
-    ///
-    /// The callback will be called with `thread` being the active thread, and
-    /// the callback may not change the active thread.
-    fn register_timeout_callback(
-        &mut self,
-        thread: ThreadId,
-        call_time: CallbackTime,
-        callback: TimeoutCallback<'mir, 'tcx>,
-    ) {
-        self.timeout_callbacks
-            .try_insert(thread, TimeoutCallbackInfo { call_time, callback })
-            .unwrap();
-    }
-
-    /// Unregister the callback for the `thread`.
-    fn unregister_timeout_callback_if_exists(&mut self, thread: ThreadId) {
-        self.timeout_callbacks.remove(&thread);
-    }
-
-    /// Get a callback that is ready to be called.
-    fn get_ready_callback(
-        &mut self,
-        clock: &Clock,
-    ) -> Option<(ThreadId, TimeoutCallback<'mir, 'tcx>)> {
-        // We iterate over all threads in the order of their indices because
-        // this allows us to have a deterministic scheduler.
-        for thread in self.threads.indices() {
-            match self.timeout_callbacks.entry(thread) {
-                Entry::Occupied(entry) => {
-                    if entry.get().call_time.get_wait_time(clock) == Duration::new(0, 0) {
-                        return Some((thread, entry.remove().callback));
-                    }
-                }
-                Entry::Vacant(_) => {}
-            }
-        }
-        None
-    }
-
-    /// Wakes up threads joining on the active one and deallocates thread-local statics.
-    /// The `AllocId` that can now be freed are returned.
-    fn thread_terminated(
-        &mut self,
-        mut data_race: Option<&mut data_race::GlobalState>,
-        current_span: Span,
-    ) -> Vec<Pointer<Provenance>> {
-        let mut free_tls_statics = Vec::new();
-        {
-            let mut thread_local_statics = self.thread_local_alloc_ids.borrow_mut();
-            thread_local_statics.retain(|&(_def_id, thread), &mut alloc_id| {
-                if thread != self.active_thread {
-                    // Keep this static around.
-                    return true;
-                }
-                // Delete this static from the map and from memory.
-                // We cannot free directly here as we cannot use `?` in this context.
-                free_tls_statics.push(alloc_id);
-                false
-            });
-        }
-        // Set the thread into a terminated state in the data-race detector.
-        if let Some(ref mut data_race) = data_race {
-            data_race.thread_terminated(self, current_span);
-        }
-        // Check if we need to unblock any threads.
-        let mut joined_threads = vec![]; // store which threads joined, we'll need it
-        for (i, thread) in self.threads.iter_enumerated_mut() {
-            if thread.state == ThreadState::Blocked(BlockReason::Join(self.active_thread)) {
-                // The thread has terminated, mark happens-before edge to joining thread
-                if data_race.is_some() {
-                    joined_threads.push(i);
+    /// Get the wait time for the next timeout, or `None` if no timeout is pending.
+    fn next_callback_wait_time(&self, clock: &Clock) -> Option<Duration> {
+        self.threads
+            .iter()
+            .filter_map(|t| {
+                match &t.state {
+                    ThreadState::Blocked { timeout: Some(timeout), .. } =>
+                        Some(timeout.get_wait_time(clock)),
+                    _ => None,
                 }
-                trace!("unblocking {:?} because {:?} terminated", i, self.active_thread);
-                thread.state = ThreadState::Enabled;
-            }
-        }
-        for &i in &joined_threads {
-            data_race.as_mut().unwrap().thread_joined(self, i, self.active_thread);
-        }
-        free_tls_statics
+            })
+            .min()
     }
 
     /// Decide which action to take next and on which thread.
@@ -733,9 +697,7 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
     /// blocked, terminated, or has explicitly asked to be preempted).
     fn schedule(&mut self, clock: &Clock) -> InterpResult<'tcx, SchedulingAction> {
         // This thread and the program can keep going.
-        if self.threads[self.active_thread].state == ThreadState::Enabled
-            && !self.yield_active_thread
-        {
+        if self.threads[self.active_thread].state.is_enabled() && !self.yield_active_thread {
             // The currently active thread is still enabled, just continue with it.
             return Ok(SchedulingAction::ExecuteStep);
         }
@@ -745,9 +707,8 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
         // `pthread_cond_timedwait`, "an error is returned if [...] the absolute time specified by
         // abstime has already been passed at the time of the call".
         // <https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_cond_timedwait.html>
-        let potential_sleep_time =
-            self.timeout_callbacks.values().map(|info| info.call_time.get_wait_time(clock)).min();
-        if potential_sleep_time == Some(Duration::new(0, 0)) {
+        let potential_sleep_time = self.next_callback_wait_time(clock);
+        if potential_sleep_time == Some(Duration::ZERO) {
             return Ok(SchedulingAction::ExecuteTimeoutCallback);
         }
         // No callbacks immediately scheduled, pick a regular thread to execute.
@@ -765,7 +726,7 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
             .chain(self.threads.iter_enumerated().take(self.active_thread.index()));
         for (id, thread) in threads {
             debug_assert_ne!(self.active_thread, id);
-            if thread.state == ThreadState::Enabled {
+            if thread.state.is_enabled() {
                 info!(
                     "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
                     self.get_thread_display_name(id),
@@ -776,11 +737,11 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
             }
         }
         self.yield_active_thread = false;
-        if self.threads[self.active_thread].state == ThreadState::Enabled {
+        if self.threads[self.active_thread].state.is_enabled() {
             return Ok(SchedulingAction::ExecuteStep);
         }
         // We have not found a thread to execute.
-        if self.threads.iter().all(|thread| thread.state == ThreadState::Terminated) {
+        if self.threads.iter().all(|thread| thread.state.is_terminated()) {
             unreachable!("all threads terminated without the main thread terminating?!");
         } else if let Some(sleep_time) = potential_sleep_time {
             // All threads are currently blocked, but we have unexecuted
@@ -799,29 +760,40 @@ trait EvalContextPrivExt<'mir, 'tcx: 'mir>: MiriInterpCxExt<'mir, 'tcx> {
     #[inline]
     fn run_timeout_callback(&mut self) -> InterpResult<'tcx> {
         let this = self.eval_context_mut();
-        let (thread, callback) = if let Some((thread, callback)) =
-            this.machine.threads.get_ready_callback(&this.machine.clock)
-        {
-            (thread, callback)
-        } else {
-            // get_ready_callback can return None if the computer's clock
-            // was shifted after calling the scheduler and before the call
-            // to get_ready_callback (see issue
-            // https://github.com/rust-lang/miri/issues/1763). In this case,
-            // just do nothing, which effectively just returns to the
-            // scheduler.
-            return Ok(());
-        };
-        // This back-and-forth with `set_active_thread` is here because of two
-        // design decisions:
-        // 1. Make the caller and not the callback responsible for changing
-        //    thread.
-        // 2. Make the scheduler the only place that can change the active
-        //    thread.
-        let old_thread = this.set_active_thread(thread);
-        callback.call(this)?;
-        this.set_active_thread(old_thread);
-        Ok(())
+        let mut found_callback = None;
+        // Find a blocked thread that has timed out.
+        for (id, thread) in this.machine.threads.threads.iter_enumerated_mut() {
+            match &thread.state {
+                ThreadState::Blocked { timeout: Some(timeout), .. }
+                    if timeout.get_wait_time(&this.machine.clock) == Duration::ZERO =>
+                {
+                    let old_state = mem::replace(&mut thread.state, ThreadState::Enabled);
+                    let ThreadState::Blocked { callback, .. } = old_state else { unreachable!() };
+                    found_callback = Some((id, callback));
+                    // Run the callback (after the loop, because of borrow-checking).
+                    break;
+                }
+                _ => {}
+            }
+        }
+        if let Some((thread, callback)) = found_callback {
+            // This back-and-forth with `set_active_thread` is here because of two
+            // design decisions:
+            // 1. Make the caller and not the callback responsible for changing
+            //    thread.
+            // 2. Make the scheduler the only place that can change the active
+            //    thread.
+            let old_thread = this.machine.threads.set_active_thread_id(thread);
+            callback.timeout(this)?;
+            this.machine.threads.set_active_thread_id(old_thread);
+        }
+        // `found_callback` can remain `None` if the computer's clock was shifted
+        // after calling the scheduler and before we checked the timeouts above (see
+        // https://github.com/rust-lang/miri/issues/1763). In this case, just do
+        // nothing, which effectively returns to the scheduler.
+        return Ok(());
     }
 
     #[inline]
@@ -904,7 +876,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
 
         // Finally switch to new thread so that we can push the first stackframe.
         // After this all accesses will be treated as occurring in the new thread.
-        let old_thread_id = this.set_active_thread(new_thread_id);
+        let old_thread_id = this.machine.threads.set_active_thread_id(new_thread_id);
 
         // Perform the function pointer load in the new thread frame.
         let instance = this.get_ptr_fn(start_routine)?.as_instance()?;
@@ -923,11 +895,110 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
         )?;
 
         // Restore the old active thread frame.
-        this.set_active_thread(old_thread_id);
+        this.machine.threads.set_active_thread_id(old_thread_id);
 
         Ok(new_thread_id)
     }
 
+    /// Handles thread termination of the active thread: wakes up threads joining on this one,
+    /// and deals with the thread's thread-local statics according to `tls_alloc_action`.
+    ///
+    /// This is called by the eval loop when a thread's on_stack_empty returns `Ready`.
+    fn terminate_active_thread(&mut self, tls_alloc_action: TlsAllocAction) -> InterpResult<'tcx> {
+        let this = self.eval_context_mut();
+        // Mark thread as terminated.
+        let thread = this.active_thread_mut();
+        assert!(thread.stack.is_empty(), "only threads with an empty stack can be terminated");
+        thread.state = ThreadState::Terminated;
+        if let Some(ref mut data_race) = this.machine.data_race {
+            data_race.thread_terminated(&this.machine.threads);
+        }
+        // Deallocate TLS.
+        let gone_thread = this.active_thread();
+        {
+            let mut free_tls_statics = Vec::new();
+            this.machine.threads.thread_local_alloc_ids.retain(
+                |&(_def_id, thread), &mut alloc_id| {
+                    if thread != gone_thread {
+                        // A different thread, keep this static around.
+                        return true;
+                    }
+                    // Delete this static from the map and from memory.
+                    // We cannot free directly here as we cannot use `?` in this context.
+                    free_tls_statics.push(alloc_id);
+                    false
+                },
+            );
+            // Now free the TLS statics.
+            for ptr in free_tls_statics {
+                match tls_alloc_action {
+                    TlsAllocAction::Deallocate =>
+                        this.deallocate_ptr(ptr.into(), None, MiriMemoryKind::Tls.into())?,
+                    TlsAllocAction::Leak =>
+                        if let Some(alloc) = ptr.provenance.get_alloc_id() {
+                            trace!(
+                                "Thread-local static leaked and stored as static root: {:?}",
+                                alloc
+                            );
+                            this.machine.static_roots.push(alloc);
+                        },
+                }
+            }
+        }
+        // Unblock joining threads.
+        let unblock_reason = BlockReason::Join(gone_thread);
+        let threads = &this.machine.threads.threads;
+        let joining_threads = threads
+            .iter_enumerated()
+            .filter(|(_, thread)| thread.state.is_blocked_on(unblock_reason))
+            .map(|(id, _)| id)
+            .collect::<Vec<_>>();
+        for thread in joining_threads {
+            this.unblock_thread(thread, unblock_reason)?;
+        }
+
+        Ok(())
+    }
+
+    /// Block the current thread, with an optional timeout.
+    /// The callback will be invoked when the thread gets unblocked.
+    #[inline]
+    fn block_thread(
+        &mut self,
+        reason: BlockReason,
+        timeout: Option<Timeout>,
+        callback: impl UnblockCallback<'mir, 'tcx> + 'tcx,
+    ) {
+        let this = self.eval_context_mut();
+        if !this.machine.communicate() && matches!(timeout, Some(Timeout::RealTime(..))) {
+            panic!("cannot have `RealTime` callback with isolation enabled!")
+        }
+        this.machine.threads.block_thread(reason, timeout, callback);
+    }
+
+    /// Put the blocked thread into the enabled state.
+    /// Sanity-checks that the thread previously was blocked for the right reason.
+    fn unblock_thread(&mut self, thread: ThreadId, reason: BlockReason) -> InterpResult<'tcx> {
+        let this = self.eval_context_mut();
+        let old_state =
+            mem::replace(&mut this.machine.threads.threads[thread].state, ThreadState::Enabled);
+        let callback = match old_state {
+            ThreadState::Blocked { reason: actual_reason, callback, .. } => {
+                assert_eq!(
+                    reason, actual_reason,
+                    "unblock_thread: thread was blocked for the wrong reason"
+                );
+                callback
+            }
+            _ => panic!("unblock_thread: thread was not blocked"),
+        };
+        // The callback must be executed in the previously blocked thread.
+        let old_thread = this.machine.threads.set_active_thread_id(thread);
+        callback.unblock(this)?;
+        this.machine.threads.set_active_thread_id(old_thread);
+        Ok(())
+    }
+
     #[inline]
     fn detach_thread(
         &mut self,
@@ -955,15 +1026,9 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
     }
 
     #[inline]
-    fn set_active_thread(&mut self, thread_id: ThreadId) -> ThreadId {
-        let this = self.eval_context_mut();
-        this.machine.threads.set_active_thread_id(thread_id)
-    }
-
-    #[inline]
-    fn get_active_thread(&self) -> ThreadId {
+    fn active_thread(&self) -> ThreadId {
         let this = self.eval_context_ref();
-        this.machine.threads.get_active_thread_id()
+        this.machine.threads.active_thread()
     }
 
     #[inline]
@@ -1026,16 +1091,6 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
     }
 
     #[inline]
-    fn block_thread(&mut self, thread: ThreadId, reason: BlockReason) {
-        self.eval_context_mut().machine.threads.block_thread(thread, reason);
-    }
-
-    #[inline]
-    fn unblock_thread(&mut self, thread: ThreadId, reason: BlockReason) {
-        self.eval_context_mut().machine.threads.unblock_thread(thread, reason);
-    }
-
-    #[inline]
     fn yield_active_thread(&mut self) {
         self.eval_context_mut().machine.threads.yield_active_thread();
     }
@@ -1050,26 +1105,6 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
         }
     }
 
-    #[inline]
-    fn register_timeout_callback(
-        &mut self,
-        thread: ThreadId,
-        call_time: CallbackTime,
-        callback: TimeoutCallback<'mir, 'tcx>,
-    ) {
-        let this = self.eval_context_mut();
-        if !this.machine.communicate() && matches!(call_time, CallbackTime::RealTime(..)) {
-            panic!("cannot have `RealTime` callback with isolation enabled!")
-        }
-        this.machine.threads.register_timeout_callback(thread, call_time, callback);
-    }
-
-    #[inline]
-    fn unregister_timeout_callback_if_exists(&mut self, thread: ThreadId) {
-        let this = self.eval_context_mut();
-        this.machine.threads.unregister_timeout_callback_if_exists(thread);
-    }
-
     /// Run the core interpreter loop. Returns only when an interrupt occurs (an error or program
     /// termination).
     fn run_threads(&mut self) -> InterpResult<'tcx, !> {
@@ -1099,32 +1134,4 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
             }
         }
     }
-
-    /// Handles thread termination of the active thread: wakes up threads joining on this one,
-    /// and deals with the thread's thread-local statics according to `tls_alloc_action`.
-    ///
-    /// This is called by the eval loop when a thread's on_stack_empty returns `Ready`.
-    #[inline]
-    fn terminate_active_thread(&mut self, tls_alloc_action: TlsAllocAction) -> InterpResult<'tcx> {
-        let this = self.eval_context_mut();
-        let thread = this.active_thread_mut();
-        assert!(thread.stack.is_empty(), "only threads with an empty stack can be terminated");
-        thread.state = ThreadState::Terminated;
-
-        let current_span = this.machine.current_span();
-        let thread_local_allocations =
-            this.machine.threads.thread_terminated(this.machine.data_race.as_mut(), current_span);
-        for ptr in thread_local_allocations {
-            match tls_alloc_action {
-                TlsAllocAction::Deallocate =>
-                    this.deallocate_ptr(ptr.into(), None, MiriMemoryKind::Tls.into())?,
-                TlsAllocAction::Leak =>
-                    if let Some(alloc) = ptr.provenance.get_alloc_id() {
-                        trace!("Thread-local static leaked and stored as static root: {:?}", alloc);
-                        this.machine.static_roots.push(alloc);
-                    },
-            }
-        }
-        Ok(())
-    }
 }
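
The core idea of the thread.rs change above is that a blocked thread now carries its wake-up logic itself: the `Blocked` state owns the block reason, an optional timeout, and the callback to run once the thread is unblocked or the timeout fires, replacing the old per-thread `timeout_callbacks` map. Below is a minimal standalone sketch of that state machine; `Threads`, `Sleep`, and `JoinWaiter` are illustrative stand-ins, not Miri's actual `ThreadManager` or callback types.

```rust
use std::time::{Duration, Instant};

// Minimal sketch of the new blocking model: a blocked thread owns its block
// reason, an optional deadline, and the callback to run on wakeup or timeout.
trait UnblockCallback {
    /// Called when the event the thread was waiting for has happened.
    fn unblock(self: Box<Self>);
    /// Called when the timeout expired before the event happened.
    fn timeout(self: Box<Self>);
}

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum BlockReason {
    Sleep,
    Join(usize),
}

enum ThreadState {
    Enabled,
    Blocked { reason: BlockReason, timeout: Option<Instant>, callback: Box<dyn UnblockCallback> },
}

struct Threads {
    threads: Vec<ThreadState>,
    active: usize,
}

impl Threads {
    /// Block the active thread; the callback decides what happens on wakeup.
    fn block_active(
        &mut self,
        reason: BlockReason,
        timeout: Option<Instant>,
        callback: impl UnblockCallback + 'static,
    ) {
        assert!(matches!(self.threads[self.active], ThreadState::Enabled));
        self.threads[self.active] =
            ThreadState::Blocked { reason, timeout, callback: Box::new(callback) };
    }

    /// Wake a thread that was blocked for `expected`, running its unblock callback.
    fn unblock(&mut self, id: usize, expected: BlockReason) {
        match std::mem::replace(&mut self.threads[id], ThreadState::Enabled) {
            ThreadState::Blocked { reason, callback, .. } => {
                assert_eq!(reason, expected, "thread was blocked for the wrong reason");
                callback.unblock();
            }
            _ => panic!("thread was not blocked"),
        }
    }

    /// Fire the timeout callback of the first thread whose deadline has passed.
    fn fire_expired_timeout(&mut self, now: Instant) {
        for state in &mut self.threads {
            if let ThreadState::Blocked { timeout: Some(deadline), .. } = state {
                if *deadline <= now {
                    let old = std::mem::replace(state, ThreadState::Enabled);
                    let ThreadState::Blocked { callback, .. } = old else { unreachable!() };
                    callback.timeout();
                    return;
                }
            }
        }
    }
}

/// A sleeping thread has no waker: only the timeout may end the block.
struct Sleep;
impl UnblockCallback for Sleep {
    fn unblock(self: Box<Self>) {
        panic!("a sleeping thread is only ever woken by its timeout");
    }
    fn timeout(self: Box<Self>) {
        println!("sleep finished");
    }
}

/// A joining thread has no timeout: it is woken when the joined thread terminates.
struct JoinWaiter;
impl UnblockCallback for JoinWaiter {
    fn unblock(self: Box<Self>) {
        println!("joined thread has terminated");
    }
    fn timeout(self: Box<Self>) {
        unreachable!("a plain join has no timeout");
    }
}

fn main() {
    let mut threads =
        Threads { threads: vec![ThreadState::Enabled, ThreadState::Enabled], active: 0 };
    threads.block_active(BlockReason::Sleep, Some(Instant::now()), Sleep);
    threads.active = 1;
    threads.block_active(BlockReason::Join(0), None, JoinWaiter);

    threads.fire_expired_timeout(Instant::now() + Duration::from_millis(1)); // wakes the sleeper
    threads.unblock(1, BlockReason::Join(0)); // thread 0 "terminated", wake the joiner
}
```

Keeping the callback inside the `Blocked` state means a thread can only be unblocked by running its callback, which is what lets the join, sleep, futex, and condvar shims below drop their ad-hoc timeout bookkeeping.
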
diff --git a/src/tools/miri/src/diagnostics.rs b/src/tools/miri/src/diagnostics.rs
index 189c4a20bf6..468bbb85ddd 100644
--- a/src/tools/miri/src/diagnostics.rs
+++ b/src/tools/miri/src/diagnostics.rs
@@ -411,7 +411,7 @@ pub fn report_error<'tcx, 'mir>(
         vec![],
         helps,
         &stacktrace,
-        Some(ecx.get_active_thread()),
+        Some(ecx.active_thread()),
         &ecx.machine,
     );
 
@@ -419,7 +419,7 @@ pub fn report_error<'tcx, 'mir>(
 
     if show_all_threads {
         for (thread, stack) in ecx.machine.threads.all_stacks() {
-            if thread != ecx.get_active_thread() {
+            if thread != ecx.active_thread() {
                 let stacktrace = Frame::generate_stacktrace_from_stack(stack);
                 let (stacktrace, was_pruned) = prune_stacktrace(stacktrace, &ecx.machine);
                 any_pruned |= was_pruned;
@@ -684,7 +684,7 @@ impl<'mir, 'tcx> MiriMachine<'mir, 'tcx> {
             notes,
             helps,
             &stacktrace,
-            Some(self.threads.get_active_thread_id()),
+            Some(self.threads.active_thread()),
             self,
         );
     }
@@ -712,7 +712,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
             vec![],
             vec![],
             &stacktrace,
-            Some(this.get_active_thread()),
+            Some(this.active_thread()),
             &this.machine,
         );
     }
diff --git a/src/tools/miri/src/lib.rs b/src/tools/miri/src/lib.rs
index 1a663ba0704..2fa88b6a246 100644
--- a/src/tools/miri/src/lib.rs
+++ b/src/tools/miri/src/lib.rs
@@ -123,7 +123,8 @@ pub use crate::concurrency::{
     init_once::{EvalContextExt as _, InitOnceId},
     sync::{CondvarId, EvalContextExt as _, MutexId, RwLockId, SynchronizationObjects},
     thread::{
-        BlockReason, CallbackTime, EvalContextExt as _, StackEmptyCallback, ThreadId, ThreadManager,
+        BlockReason, EvalContextExt as _, StackEmptyCallback, ThreadId, ThreadManager, Timeout,
+        UnblockCallback,
     },
 };
 pub use crate::diagnostics::{
diff --git a/src/tools/miri/src/machine.rs b/src/tools/miri/src/machine.rs
index cd9ab03dc68..e09fd2122f8 100644
--- a/src/tools/miri/src/machine.rs
+++ b/src/tools/miri/src/machine.rs
@@ -473,7 +473,7 @@ pub struct MiriMachine<'mir, 'tcx> {
     /// The set of threads.
     pub(crate) threads: ThreadManager<'mir, 'tcx>,
     /// The state of the primitive synchronization objects.
-    pub(crate) sync: SynchronizationObjects<'mir, 'tcx>,
+    pub(crate) sync: SynchronizationObjects,
 
     /// Precomputed `TyLayout`s for primitive data types that are commonly used inside Miri.
     pub(crate) layouts: PrimitiveLayouts<'tcx>,
@@ -770,7 +770,7 @@ impl VisitProvenance for MiriMachine<'_, '_> {
         #[rustfmt::skip]
         let MiriMachine {
             threads,
-            sync,
+            sync: _,
             tls,
             env_vars,
             main_fn_ret_place,
@@ -819,7 +819,6 @@ impl VisitProvenance for MiriMachine<'_, '_> {
         } = self;
 
         threads.visit_provenance(visit);
-        sync.visit_provenance(visit);
         tls.visit_provenance(visit);
         env_vars.visit_provenance(visit);
         dirs.visit_provenance(visit);
@@ -1371,7 +1370,7 @@ impl<'mir, 'tcx> Machine<'mir, 'tcx> for MiriMachine<'mir, 'tcx> {
             Some(profiler.start_recording_interval_event_detached(
                 *name,
                 measureme::EventId::from_label(*name),
-                ecx.get_active_thread().to_u32(),
+                ecx.active_thread().to_u32(),
             ))
         } else {
             None
diff --git a/src/tools/miri/src/shims/time.rs b/src/tools/miri/src/shims/time.rs
index 941c61caa53..a99006f3970 100644
--- a/src/tools/miri/src/shims/time.rs
+++ b/src/tools/miri/src/shims/time.rs
@@ -6,7 +6,6 @@ use std::time::{Duration, SystemTime};
 use chrono::{DateTime, Datelike, Offset, Timelike, Utc};
 use chrono_tz::Tz;
 
-use crate::concurrency::thread::MachineCallback;
 use crate::*;
 
 /// Returns the time elapsed between the provided time and the unix epoch as a `Duration`.
@@ -336,16 +335,9 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
         let timeout_time = now
             .checked_add(duration)
             .unwrap_or_else(|| now.checked_add(Duration::from_secs(3600)).unwrap());
+        let timeout_time = Timeout::Monotonic(timeout_time);
 
-        let active_thread = this.get_active_thread();
-        this.block_thread(active_thread, BlockReason::Sleep);
-
-        this.register_timeout_callback(
-            active_thread,
-            CallbackTime::Monotonic(timeout_time),
-            Box::new(UnblockCallback { thread_to_unblock: active_thread }),
-        );
-
+        this.block_thread(BlockReason::Sleep, Some(timeout_time), SleepCallback);
         Ok(0)
     }
 
@@ -359,31 +351,25 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
 
         let duration = Duration::from_millis(timeout_ms.into());
         let timeout_time = this.machine.clock.now().checked_add(duration).unwrap();
+        let timeout_time = Timeout::Monotonic(timeout_time);
 
-        let active_thread = this.get_active_thread();
-        this.block_thread(active_thread, BlockReason::Sleep);
-
-        this.register_timeout_callback(
-            active_thread,
-            CallbackTime::Monotonic(timeout_time),
-            Box::new(UnblockCallback { thread_to_unblock: active_thread }),
-        );
-
+        this.block_thread(BlockReason::Sleep, Some(timeout_time), SleepCallback);
         Ok(())
     }
 }
 
-struct UnblockCallback {
-    thread_to_unblock: ThreadId,
-}
-
-impl VisitProvenance for UnblockCallback {
+struct SleepCallback;
+impl VisitProvenance for SleepCallback {
     fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {}
 }
-
-impl<'mir, 'tcx: 'mir> MachineCallback<'mir, 'tcx> for UnblockCallback {
-    fn call(&self, ecx: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> {
-        ecx.unblock_thread(self.thread_to_unblock, BlockReason::Sleep);
+impl<'mir, 'tcx: 'mir> UnblockCallback<'mir, 'tcx> for SleepCallback {
+    fn timeout(self: Box<Self>, _this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> {
         Ok(())
     }
+    fn unblock(
+        self: Box<Self>,
+        _this: &mut InterpCx<'mir, 'tcx, MiriMachine<'mir, 'tcx>>,
+    ) -> InterpResult<'tcx> {
+        panic!("a sleeping thread should only ever be woken up via the timeout")
+    }
 }
diff --git a/src/tools/miri/src/shims/tls.rs b/src/tools/miri/src/shims/tls.rs
index 0dec12f0b65..3dc85cc70be 100644
--- a/src/tools/miri/src/shims/tls.rs
+++ b/src/tools/miri/src/shims/tls.rs
@@ -282,7 +282,7 @@ impl<'tcx> TlsDtorsState<'tcx> {
                     }
                 }
                 Done => {
-                    this.machine.tls.delete_all_thread_tls(this.get_active_thread());
+                    this.machine.tls.delete_all_thread_tls(this.active_thread());
                     return Ok(Poll::Ready(()));
                 }
             }
@@ -332,7 +332,7 @@ trait EvalContextPrivExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
     /// executed.
     fn schedule_macos_tls_dtor(&mut self) -> InterpResult<'tcx> {
         let this = self.eval_context_mut();
-        let thread_id = this.get_active_thread();
+        let thread_id = this.active_thread();
         if let Some((instance, data)) = this.machine.tls.macos_thread_dtors.remove(&thread_id) {
             trace!("Running macos dtor {:?} on {:?} at {:?}", instance, data, thread_id);
 
@@ -354,7 +354,7 @@ trait EvalContextPrivExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
         state: &mut RunningDtorState,
     ) -> InterpResult<'tcx, Poll<()>> {
         let this = self.eval_context_mut();
-        let active_thread = this.get_active_thread();
+        let active_thread = this.active_thread();
 
         // Fetch next dtor after `key`.
         let dtor = match this.machine.tls.fetch_tls_dtor(state.last_key, active_thread) {
diff --git a/src/tools/miri/src/shims/unix/foreign_items.rs b/src/tools/miri/src/shims/unix/foreign_items.rs
index 86dd23f31c7..e75df88876b 100644
--- a/src/tools/miri/src/shims/unix/foreign_items.rs
+++ b/src/tools/miri/src/shims/unix/foreign_items.rs
@@ -388,14 +388,14 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
             "pthread_getspecific" => {
                 let [key] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
                 let key = this.read_scalar(key)?.to_bits(key.layout.size)?;
-                let active_thread = this.get_active_thread();
+                let active_thread = this.active_thread();
                 let ptr = this.machine.tls.load_tls(key, active_thread, this)?;
                 this.write_scalar(ptr, dest)?;
             }
             "pthread_setspecific" => {
                 let [key, new_ptr] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
                 let key = this.read_scalar(key)?.to_bits(key.layout.size)?;
-                let active_thread = this.get_active_thread();
+                let active_thread = this.active_thread();
                 let new_data = this.read_scalar(new_ptr)?;
                 this.machine.tls.store_tls(key, active_thread, new_data, &*this.tcx)?;
 
@@ -426,8 +426,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
             }
             "pthread_mutex_lock" => {
                 let [mutex] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
-                let result = this.pthread_mutex_lock(mutex)?;
-                this.write_scalar(Scalar::from_i32(result), dest)?;
+                this.pthread_mutex_lock(mutex, dest)?;
             }
             "pthread_mutex_trylock" => {
                 let [mutex] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
@@ -446,8 +445,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
             }
             "pthread_rwlock_rdlock" => {
                 let [rwlock] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
-                let result = this.pthread_rwlock_rdlock(rwlock)?;
-                this.write_scalar(Scalar::from_i32(result), dest)?;
+                this.pthread_rwlock_rdlock(rwlock, dest)?;
             }
             "pthread_rwlock_tryrdlock" => {
                 let [rwlock] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
@@ -456,8 +454,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
             }
             "pthread_rwlock_wrlock" => {
                 let [rwlock] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
-                let result = this.pthread_rwlock_wrlock(rwlock)?;
-                this.write_scalar(Scalar::from_i32(result), dest)?;
+                this.pthread_rwlock_wrlock(rwlock, dest)?;
             }
             "pthread_rwlock_trywrlock" => {
                 let [rwlock] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
@@ -513,8 +510,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
             }
             "pthread_cond_wait" => {
                 let [cond, mutex] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
-                let result = this.pthread_cond_wait(cond, mutex)?;
-                this.write_scalar(Scalar::from_i32(result), dest)?;
+                this.pthread_cond_wait(cond, mutex, dest)?;
             }
             "pthread_cond_timedwait" => {
                 let [cond, mutex, abstime] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
diff --git a/src/tools/miri/src/shims/unix/linux/sync.rs b/src/tools/miri/src/shims/unix/linux/sync.rs
index d4a6cd96f48..2c7c4dd65ce 100644
--- a/src/tools/miri/src/shims/unix/linux/sync.rs
+++ b/src/tools/miri/src/shims/unix/linux/sync.rs
@@ -1,6 +1,5 @@
 use std::time::SystemTime;
 
-use crate::concurrency::thread::MachineCallback;
 use crate::*;
 
 /// Implementation of the SYS_futex syscall.
@@ -32,7 +31,6 @@ pub fn futex<'tcx>(
     let op = this.read_scalar(&args[1])?.to_i32()?;
     let val = this.read_scalar(&args[2])?.to_i32()?;
 
-    let thread = this.get_active_thread();
     // This is a vararg function so we have to bring our own type for this pointer.
     let addr = this.ptr_to_mplace(addr, this.machine.layouts.i32);
     let addr_usize = addr.ptr().addr().bytes();
@@ -107,22 +105,18 @@ pub fn futex<'tcx>(
                 Some(if wait_bitset {
                     // FUTEX_WAIT_BITSET uses an absolute timestamp.
                     if realtime {
-                        CallbackTime::RealTime(
-                            SystemTime::UNIX_EPOCH.checked_add(duration).unwrap(),
-                        )
+                        Timeout::RealTime(SystemTime::UNIX_EPOCH.checked_add(duration).unwrap())
                     } else {
-                        CallbackTime::Monotonic(
+                        Timeout::Monotonic(
                             this.machine.clock.anchor().checked_add(duration).unwrap(),
                         )
                     }
                 } else {
                     // FUTEX_WAIT uses a relative timestamp.
                     if realtime {
-                        CallbackTime::RealTime(SystemTime::now().checked_add(duration).unwrap())
+                        Timeout::RealTime(SystemTime::now().checked_add(duration).unwrap())
                     } else {
-                        CallbackTime::Monotonic(
-                            this.machine.clock.now().checked_add(duration).unwrap(),
-                        )
+                        Timeout::Monotonic(this.machine.clock.now().checked_add(duration).unwrap())
                     }
                 })
             };
@@ -158,7 +152,7 @@ pub fn futex<'tcx>(
             //    to see an up-to-date value.
             //
             // The above case distinction is valid since both FUTEX_WAIT and FUTEX_WAKE
-            // contain a SeqCst fence, therefore inducting a total order between the operations.
+            // contain a SeqCst fence, therefore inducing a total order between the operations.
             // It is also critical that the fence, the atomic load, and the comparison in FUTEX_WAIT
             // altogether happen atomically. If the other thread's fence in FUTEX_WAKE
             // gets interleaved after our fence, then we lose the guarantee on the
@@ -174,48 +168,16 @@ pub fn futex<'tcx>(
             // It's not uncommon for `addr` to be passed as another type than `*mut i32`, such as `*const AtomicI32`.
             let futex_val = this.read_scalar_atomic(&addr, AtomicReadOrd::Relaxed)?.to_i32()?;
             if val == futex_val {
-                // The value still matches, so we block the thread make it wait for FUTEX_WAKE.
-                this.block_thread(thread, BlockReason::Futex { addr: addr_usize });
-                this.futex_wait(addr_usize, thread, bitset);
-                // Succesfully waking up from FUTEX_WAIT always returns zero.
-                this.write_scalar(Scalar::from_target_isize(0, this), dest)?;
-                // Register a timeout callback if a timeout was specified.
-                // This callback will override the return value when the timeout triggers.
-                if let Some(timeout_time) = timeout_time {
-                    struct Callback<'tcx> {
-                        thread: ThreadId,
-                        addr_usize: u64,
-                        dest: MPlaceTy<'tcx, Provenance>,
-                    }
-
-                    impl<'tcx> VisitProvenance for Callback<'tcx> {
-                        fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
-                            let Callback { thread: _, addr_usize: _, dest } = self;
-                            dest.visit_provenance(visit);
-                        }
-                    }
-
-                    impl<'mir, 'tcx: 'mir> MachineCallback<'mir, 'tcx> for Callback<'tcx> {
-                        fn call(&self, this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> {
-                            this.unblock_thread(
-                                self.thread,
-                                BlockReason::Futex { addr: self.addr_usize },
-                            );
-                            this.futex_remove_waiter(self.addr_usize, self.thread);
-                            let etimedout = this.eval_libc("ETIMEDOUT");
-                            this.set_last_error(etimedout)?;
-                            this.write_scalar(Scalar::from_target_isize(-1, this), &self.dest)?;
-
-                            Ok(())
-                        }
-                    }
-
-                    this.register_timeout_callback(
-                        thread,
-                        timeout_time,
-                        Box::new(Callback { thread, addr_usize, dest: dest.clone() }),
-                    );
-                }
+                // The value still matches, so we block the thread and make it wait for FUTEX_WAKE.
+                this.futex_wait(
+                    addr_usize,
+                    bitset,
+                    timeout_time,
+                    Scalar::from_target_isize(0, this), // retval_succ
+                    Scalar::from_target_isize(-1, this), // retval_timeout
+                    dest.clone(),
+                    this.eval_libc("ETIMEDOUT"),
+                );
             } else {
                 // The futex value doesn't match the expected value, so we return failure
                 // right away without sleeping: -1 and errno set to EAGAIN.
@@ -257,9 +219,7 @@ pub fn futex<'tcx>(
             let mut n = 0;
             #[allow(clippy::arithmetic_side_effects)]
             for _ in 0..val {
-                if let Some(thread) = this.futex_wake(addr_usize, bitset) {
-                    this.unblock_thread(thread, BlockReason::Futex { addr: addr_usize });
-                    this.unregister_timeout_callback_if_exists(thread);
+                if this.futex_wake(addr_usize, bitset)? {
                     n += 1;
                 } else {
                     break;
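
The futex comment above hinges on the check of the futex word and the decision to block being atomic relative to the waker's store and fence. A simplified userspace analogue of that protocol, using a mutex/condvar pair in place of the SeqCst fences (`Futex` here is purely illustrative, not a Miri or libc type):

```rust
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::{Arc, Condvar, Mutex};

struct Futex {
    word: AtomicI32,
    guard: Mutex<()>,
    cond: Condvar,
}

impl Futex {
    fn new(initial: i32) -> Self {
        Futex { word: AtomicI32::new(initial), guard: Mutex::new(()), cond: Condvar::new() }
    }

    /// FUTEX_WAIT: block while the futex word still equals `expected`.
    fn wait(&self, expected: i32) {
        let mut held = self.guard.lock().unwrap();
        // The load and the decision to block happen under the same lock the waker
        // uses, playing the role of the SeqCst fences in the real protocol.
        while self.word.load(Ordering::SeqCst) == expected {
            held = self.cond.wait(held).unwrap();
        }
    }

    /// Store a new value, then FUTEX_WAKE all waiters.
    fn store_and_wake(&self, new: i32) {
        let _held = self.guard.lock().unwrap();
        self.word.store(new, Ordering::SeqCst);
        self.cond.notify_all();
    }
}

fn main() {
    let futex = Arc::new(Futex::new(0));
    let waiter = {
        let futex = Arc::clone(&futex);
        std::thread::spawn(move || futex.wait(0))
    };
    // The waiter either sees the new value before blocking or is woken afterwards;
    // the wake is never lost in between.
    futex.store_and_wake(1);
    waiter.join().unwrap();
}
```

Because the waker updates the word and signals while holding the same lock under which the waiter re-checks it, a wake cannot slip in between the waiter's check and its block, which is the property the FUTEX_WAIT/FUTEX_WAKE case distinction above relies on.
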
diff --git a/src/tools/miri/src/shims/unix/macos/foreign_items.rs b/src/tools/miri/src/shims/unix/macos/foreign_items.rs
index 2b9ce746a56..7c489ec4e3c 100644
--- a/src/tools/miri/src/shims/unix/macos/foreign_items.rs
+++ b/src/tools/miri/src/shims/unix/macos/foreign_items.rs
@@ -131,7 +131,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
                 let dtor = this.read_pointer(dtor)?;
                 let dtor = this.get_ptr_fn(dtor)?.as_instance()?;
                 let data = this.read_scalar(data)?;
-                let active_thread = this.get_active_thread();
+                let active_thread = this.active_thread();
                 this.machine.tls.set_macos_thread_dtor(active_thread, dtor, data)?;
             }
 
diff --git a/src/tools/miri/src/shims/unix/sync.rs b/src/tools/miri/src/shims/unix/sync.rs
index 79358da0894..b2864269b2e 100644
--- a/src/tools/miri/src/shims/unix/sync.rs
+++ b/src/tools/miri/src/shims/unix/sync.rs
@@ -3,7 +3,6 @@ use std::time::SystemTime;
 
 use rustc_target::abi::Size;
 
-use crate::concurrency::thread::MachineCallback;
 use crate::*;
 
 // pthread_mutexattr_t is either 4 or 8 bytes, depending on the platform.
@@ -373,59 +372,6 @@ fn cond_set_clock_id<'mir, 'tcx: 'mir>(
     )
 }
 
-/// Try to reacquire the mutex associated with the condition variable after we
-/// were signaled.
-fn reacquire_cond_mutex<'mir, 'tcx: 'mir>(
-    ecx: &mut MiriInterpCx<'mir, 'tcx>,
-    thread: ThreadId,
-    condvar: CondvarId,
-    mutex: MutexId,
-) -> InterpResult<'tcx> {
-    ecx.unblock_thread(thread, BlockReason::Condvar(condvar));
-    if ecx.mutex_is_locked(mutex) {
-        ecx.mutex_enqueue_and_block(mutex, thread);
-    } else {
-        ecx.mutex_lock(mutex, thread);
-    }
-    Ok(())
-}
-
-/// After a thread waiting on a condvar was signaled:
-/// Reacquire the conditional variable and remove the timeout callback if any
-/// was registered.
-fn post_cond_signal<'mir, 'tcx: 'mir>(
-    ecx: &mut MiriInterpCx<'mir, 'tcx>,
-    thread: ThreadId,
-    condvar: CondvarId,
-    mutex: MutexId,
-) -> InterpResult<'tcx> {
-    reacquire_cond_mutex(ecx, thread, condvar, mutex)?;
-    // Waiting for the mutex is not included in the waiting time because we need
-    // to acquire the mutex always even if we get a timeout.
-    ecx.unregister_timeout_callback_if_exists(thread);
-    Ok(())
-}
-
-/// Release the mutex associated with the condition variable because we are
-/// entering the waiting state.
-fn release_cond_mutex_and_block<'mir, 'tcx: 'mir>(
-    ecx: &mut MiriInterpCx<'mir, 'tcx>,
-    thread: ThreadId,
-    condvar: CondvarId,
-    mutex: MutexId,
-) -> InterpResult<'tcx> {
-    assert_eq!(ecx.get_active_thread(), thread);
-    if let Some(old_locked_count) = ecx.mutex_unlock(mutex) {
-        if old_locked_count != 1 {
-            throw_unsup_format!("awaiting on a lock acquired multiple times is not supported");
-        }
-    } else {
-        throw_ub_format!("awaiting on unlocked or owned by a different thread mutex");
-    }
-    ecx.block_thread(thread, BlockReason::Condvar(condvar));
-    Ok(())
-}
-
 impl<'mir, 'tcx> EvalContextExt<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {}
 pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
     fn pthread_mutexattr_init(
@@ -531,19 +477,21 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
         Ok(0)
     }
 
-    fn pthread_mutex_lock(&mut self, mutex_op: &OpTy<'tcx, Provenance>) -> InterpResult<'tcx, i32> {
+    fn pthread_mutex_lock(
+        &mut self,
+        mutex_op: &OpTy<'tcx, Provenance>,
+        dest: &MPlaceTy<'tcx, Provenance>,
+    ) -> InterpResult<'tcx> {
         let this = self.eval_context_mut();
 
         let kind = mutex_get_kind(this, mutex_op)?;
         let id = mutex_get_id(this, mutex_op)?;
-        let active_thread = this.get_active_thread();
 
-        if this.mutex_is_locked(id) {
+        let ret = if this.mutex_is_locked(id) {
             let owner_thread = this.mutex_get_owner(id);
-            if owner_thread != active_thread {
-                // Enqueue the active thread.
-                this.mutex_enqueue_and_block(id, active_thread);
-                Ok(0)
+            if owner_thread != this.active_thread() {
+                this.mutex_enqueue_and_block(id, Scalar::from_i32(0), dest.clone());
+                return Ok(());
             } else {
                 // Trying to acquire the same mutex again.
                 if is_mutex_kind_default(this, kind)? {
@@ -551,10 +499,10 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
                 } else if is_mutex_kind_normal(this, kind)? {
                     throw_machine_stop!(TerminationInfo::Deadlock);
                 } else if kind == this.eval_libc_i32("PTHREAD_MUTEX_ERRORCHECK") {
-                    Ok(this.eval_libc_i32("EDEADLK"))
+                    this.eval_libc_i32("EDEADLK")
                 } else if kind == this.eval_libc_i32("PTHREAD_MUTEX_RECURSIVE") {
-                    this.mutex_lock(id, active_thread);
-                    Ok(0)
+                    this.mutex_lock(id);
+                    0
                 } else {
                     throw_unsup_format!(
                         "called pthread_mutex_lock on an unsupported type of mutex"
@@ -563,9 +511,11 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
             }
         } else {
             // The mutex is unlocked. Let's lock it.
-            this.mutex_lock(id, active_thread);
-            Ok(0)
-        }
+            this.mutex_lock(id);
+            0
+        };
+        this.write_scalar(Scalar::from_i32(ret), dest)?;
+        Ok(())
     }
 
     fn pthread_mutex_trylock(
@@ -576,11 +526,10 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
 
         let kind = mutex_get_kind(this, mutex_op)?;
         let id = mutex_get_id(this, mutex_op)?;
-        let active_thread = this.get_active_thread();
 
         if this.mutex_is_locked(id) {
             let owner_thread = this.mutex_get_owner(id);
-            if owner_thread != active_thread {
+            if owner_thread != this.active_thread() {
                 Ok(this.eval_libc_i32("EBUSY"))
             } else {
                 if is_mutex_kind_default(this, kind)?
@@ -589,7 +538,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
                 {
                     Ok(this.eval_libc_i32("EBUSY"))
                 } else if kind == this.eval_libc_i32("PTHREAD_MUTEX_RECURSIVE") {
-                    this.mutex_lock(id, active_thread);
+                    this.mutex_lock(id);
                     Ok(0)
                 } else {
                     throw_unsup_format!(
@@ -599,7 +548,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
             }
         } else {
             // The mutex is unlocked. Let's lock it.
-            this.mutex_lock(id, active_thread);
+            this.mutex_lock(id);
             Ok(0)
         }
     }
@@ -613,7 +562,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
         let kind = mutex_get_kind(this, mutex_op)?;
         let id = mutex_get_id(this, mutex_op)?;
 
-        if let Some(_old_locked_count) = this.mutex_unlock(id) {
+        if let Some(_old_locked_count) = this.mutex_unlock(id)? {
             // The mutex was locked by the current thread.
             Ok(0)
         } else {
@@ -666,19 +615,20 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
     fn pthread_rwlock_rdlock(
         &mut self,
         rwlock_op: &OpTy<'tcx, Provenance>,
-    ) -> InterpResult<'tcx, i32> {
+        dest: &MPlaceTy<'tcx, Provenance>,
+    ) -> InterpResult<'tcx> {
         let this = self.eval_context_mut();
 
         let id = rwlock_get_id(this, rwlock_op)?;
-        let active_thread = this.get_active_thread();
 
         if this.rwlock_is_write_locked(id) {
-            this.rwlock_enqueue_and_block_reader(id, active_thread);
-            Ok(0)
+            this.rwlock_enqueue_and_block_reader(id, Scalar::from_i32(0), dest.clone());
         } else {
-            this.rwlock_reader_lock(id, active_thread);
-            Ok(0)
+            this.rwlock_reader_lock(id);
+            this.write_null(dest)?;
         }
+
+        Ok(())
     }
 
     fn pthread_rwlock_tryrdlock(
@@ -688,12 +638,11 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
         let this = self.eval_context_mut();
 
         let id = rwlock_get_id(this, rwlock_op)?;
-        let active_thread = this.get_active_thread();
 
         if this.rwlock_is_write_locked(id) {
             Ok(this.eval_libc_i32("EBUSY"))
         } else {
-            this.rwlock_reader_lock(id, active_thread);
+            this.rwlock_reader_lock(id);
             Ok(0)
         }
     }
@@ -701,11 +650,11 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
     fn pthread_rwlock_wrlock(
         &mut self,
         rwlock_op: &OpTy<'tcx, Provenance>,
-    ) -> InterpResult<'tcx, i32> {
+        dest: &MPlaceTy<'tcx, Provenance>,
+    ) -> InterpResult<'tcx> {
         let this = self.eval_context_mut();
 
         let id = rwlock_get_id(this, rwlock_op)?;
-        let active_thread = this.get_active_thread();
 
         if this.rwlock_is_locked(id) {
             // Note: this will deadlock if the lock is already locked by this
@@ -720,12 +669,13 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
             // report the deadlock only when no thread can continue execution,
             // but we could detect that this lock is already locked and report
             // an error.)
-            this.rwlock_enqueue_and_block_writer(id, active_thread);
+            this.rwlock_enqueue_and_block_writer(id, Scalar::from_i32(0), dest.clone());
         } else {
-            this.rwlock_writer_lock(id, active_thread);
+            this.rwlock_writer_lock(id);
+            this.write_null(dest)?;
         }
 
-        Ok(0)
+        Ok(())
     }
 
     fn pthread_rwlock_trywrlock(
@@ -735,12 +685,11 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
         let this = self.eval_context_mut();
 
         let id = rwlock_get_id(this, rwlock_op)?;
-        let active_thread = this.get_active_thread();
 
         if this.rwlock_is_locked(id) {
             Ok(this.eval_libc_i32("EBUSY"))
         } else {
-            this.rwlock_writer_lock(id, active_thread);
+            this.rwlock_writer_lock(id);
             Ok(0)
         }
     }
@@ -754,9 +703,9 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
         let id = rwlock_get_id(this, rwlock_op)?;
 
         #[allow(clippy::if_same_then_else)]
-        if this.rwlock_reader_unlock(id) {
+        if this.rwlock_reader_unlock(id)? {
             Ok(0)
-        } else if this.rwlock_writer_unlock(id) {
+        } else if this.rwlock_writer_unlock(id)? {
             Ok(0)
         } else {
             throw_ub_format!("unlocked an rwlock that was not locked by the active thread");
@@ -885,10 +834,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
     fn pthread_cond_signal(&mut self, cond_op: &OpTy<'tcx, Provenance>) -> InterpResult<'tcx, i32> {
         let this = self.eval_context_mut();
         let id = cond_get_id(this, cond_op)?;
-        if let Some((thread, mutex)) = this.condvar_signal(id) {
-            post_cond_signal(this, thread, id, mutex)?;
-        }
-
+        this.condvar_signal(id)?;
         Ok(0)
     }
 
@@ -898,11 +844,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
     ) -> InterpResult<'tcx, i32> {
         let this = self.eval_context_mut();
         let id = cond_get_id(this, cond_op)?;
-
-        while let Some((thread, mutex)) = this.condvar_signal(id) {
-            post_cond_signal(this, thread, id, mutex)?;
-        }
-
+        while this.condvar_signal(id)? {}
         Ok(0)
     }
 
@@ -910,17 +852,23 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
         &mut self,
         cond_op: &OpTy<'tcx, Provenance>,
         mutex_op: &OpTy<'tcx, Provenance>,
-    ) -> InterpResult<'tcx, i32> {
+        dest: &MPlaceTy<'tcx, Provenance>,
+    ) -> InterpResult<'tcx> {
         let this = self.eval_context_mut();
 
         let id = cond_get_id(this, cond_op)?;
         let mutex_id = mutex_get_id(this, mutex_op)?;
-        let active_thread = this.get_active_thread();
 
-        release_cond_mutex_and_block(this, active_thread, id, mutex_id)?;
-        this.condvar_wait(id, active_thread, mutex_id);
+        this.condvar_wait(
+            id,
+            mutex_id,
+            None, // no timeout
+            Scalar::from_i32(0), // retval_succ
+            Scalar::from_i32(0), // retval_timeout -- unused
+            dest.clone(),
+        )?;
 
-        Ok(0)
+        Ok(())
     }
 
     fn pthread_cond_timedwait(
@@ -934,7 +882,6 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
 
         let id = cond_get_id(this, cond_op)?;
         let mutex_id = mutex_get_id(this, mutex_op)?;
-        let active_thread = this.get_active_thread();
 
         // Extract the timeout.
         let clock_id = cond_get_clock_id(this, cond_op)?;
@@ -948,61 +895,23 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
                 return Ok(());
             }
         };
-
         let timeout_time = if is_cond_clock_realtime(this, clock_id) {
             this.check_no_isolation("`pthread_cond_timedwait` with `CLOCK_REALTIME`")?;
-            CallbackTime::RealTime(SystemTime::UNIX_EPOCH.checked_add(duration).unwrap())
+            Timeout::RealTime(SystemTime::UNIX_EPOCH.checked_add(duration).unwrap())
         } else if clock_id == this.eval_libc_i32("CLOCK_MONOTONIC") {
-            CallbackTime::Monotonic(this.machine.clock.anchor().checked_add(duration).unwrap())
+            Timeout::Monotonic(this.machine.clock.anchor().checked_add(duration).unwrap())
         } else {
             throw_unsup_format!("unsupported clock id: {}", clock_id);
         };
 
-        release_cond_mutex_and_block(this, active_thread, id, mutex_id)?;
-        this.condvar_wait(id, active_thread, mutex_id);
-
-        // We return success for now and override it in the timeout callback.
-        this.write_scalar(Scalar::from_i32(0), dest)?;
-
-        struct Callback<'tcx> {
-            active_thread: ThreadId,
-            mutex_id: MutexId,
-            id: CondvarId,
-            dest: MPlaceTy<'tcx, Provenance>,
-        }
-
-        impl<'tcx> VisitProvenance for Callback<'tcx> {
-            fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
-                let Callback { active_thread: _, mutex_id: _, id: _, dest } = self;
-                dest.visit_provenance(visit);
-            }
-        }
-
-        impl<'mir, 'tcx: 'mir> MachineCallback<'mir, 'tcx> for Callback<'tcx> {
-            fn call(&self, ecx: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> {
-                assert_eq!(self.active_thread, ecx.get_active_thread());
-                // We are not waiting for the condvar any more, wait for the
-                // mutex instead.
-                reacquire_cond_mutex(ecx, self.active_thread, self.id, self.mutex_id)?;
-
-                // Remove the thread from the conditional variable.
-                ecx.condvar_remove_waiter(self.id, self.active_thread);
-
-                // Set the return value: we timed out.
-                let etimedout = ecx.eval_libc("ETIMEDOUT");
-                ecx.write_scalar(etimedout, &self.dest)?;
-
-                Ok(())
-            }
-        }
-
-        // Register the timeout callback.
-        let dest = dest.clone();
-        this.register_timeout_callback(
-            active_thread,
-            timeout_time,
-            Box::new(Callback { active_thread, mutex_id, id, dest }),
-        );
+        this.condvar_wait(
+            id,
+            mutex_id,
+            Some(timeout_time),
+            Scalar::from_i32(0),
+            this.eval_libc("ETIMEDOUT"), // retval_timeout
+            dest.clone(),
+        )?;
 
         Ok(())
     }
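
(Illustrative aside, not part of the diff: the new `condvar_wait` helper shown above takes both possible return values up front, so the shim no longer registers its own timeout callback. Below is a minimal standalone Rust sketch of that pattern under assumed names; only `Timeout` matches a type used in the diff, everything else is hypothetical.)

// Sketch: the caller supplies the success and timeout return values when it
// starts waiting; the blocking machinery later reports whichever one applies.
use std::time::{Duration, Instant};

enum Timeout {
    Monotonic(Instant),
}

enum WaitOutcome {
    Signalled,
    TimedOut,
}

fn wait_result(outcome: WaitOutcome, retval_succ: i32, retval_timeout: i32) -> i32 {
    match outcome {
        WaitOutcome::Signalled => retval_succ,
        WaitOutcome::TimedOut => retval_timeout,
    }
}

fn main() {
    // A deadline 10ms from now, analogous to `Timeout::Monotonic` above.
    let deadline = Timeout::Monotonic(Instant::now() + Duration::from_millis(10));
    let Timeout::Monotonic(_at) = deadline;
    // 0 for success, ETIMEDOUT (110 on Linux) for a timed-out wait.
    assert_eq!(wait_result(WaitOutcome::Signalled, 0, 110), 0);
    assert_eq!(wait_result(WaitOutcome::TimedOut, 0, 110), 110);
}
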
diff --git a/src/tools/miri/src/shims/unix/thread.rs b/src/tools/miri/src/shims/unix/thread.rs
index 9e09401a815..f8787ad90e0 100644
--- a/src/tools/miri/src/shims/unix/thread.rs
+++ b/src/tools/miri/src/shims/unix/thread.rs
@@ -63,7 +63,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
     fn pthread_self(&mut self) -> InterpResult<'tcx, Scalar<Provenance>> {
         let this = self.eval_context_mut();
 
-        let thread_id = this.get_active_thread();
+        let thread_id = this.active_thread();
         Ok(Scalar::from_uint(thread_id.to_u32(), this.libc_ty_layout("pthread_t").size))
     }
 
diff --git a/src/tools/miri/src/shims/windows/foreign_items.rs b/src/tools/miri/src/shims/windows/foreign_items.rs
index 91def80227d..462c7ffcdcc 100644
--- a/src/tools/miri/src/shims/windows/foreign_items.rs
+++ b/src/tools/miri/src/shims/windows/foreign_items.rs
@@ -354,7 +354,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
             "TlsGetValue" => {
                 let [key] = this.check_shim(abi, Abi::System { unwind: false }, link_name, args)?;
                 let key = u128::from(this.read_scalar(key)?.to_u32()?);
-                let active_thread = this.get_active_thread();
+                let active_thread = this.active_thread();
                 let ptr = this.machine.tls.load_tls(key, active_thread, this)?;
                 this.write_scalar(ptr, dest)?;
             }
@@ -362,7 +362,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
                 let [key, new_ptr] =
                     this.check_shim(abi, Abi::System { unwind: false }, link_name, args)?;
                 let key = u128::from(this.read_scalar(key)?.to_u32()?);
-                let active_thread = this.get_active_thread();
+                let active_thread = this.active_thread();
                 let new_data = this.read_scalar(new_ptr)?;
                 this.machine.tls.store_tls(key, active_thread, new_data, &*this.tcx)?;
 
@@ -423,8 +423,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
             "InitOnceBeginInitialize" => {
                 let [ptr, flags, pending, context] =
                     this.check_shim(abi, Abi::System { unwind: false }, link_name, args)?;
-                let result = this.InitOnceBeginInitialize(ptr, flags, pending, context)?;
-                this.write_scalar(result, dest)?;
+                this.InitOnceBeginInitialize(ptr, flags, pending, context, dest)?;
             }
             "InitOnceComplete" => {
                 let [ptr, flags, context] =
@@ -502,7 +501,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
 
                 let thread = match Handle::from_scalar(handle, this)? {
                     Some(Handle::Thread(thread)) => thread,
-                    Some(Handle::Pseudo(PseudoHandle::CurrentThread)) => this.get_active_thread(),
+                    Some(Handle::Pseudo(PseudoHandle::CurrentThread)) => this.active_thread(),
                     _ => this.invalid_handle("SetThreadDescription")?,
                 };
 
@@ -520,7 +519,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
 
                 let thread = match Handle::from_scalar(handle, this)? {
                     Some(Handle::Thread(thread)) => thread,
-                    Some(Handle::Pseudo(PseudoHandle::CurrentThread)) => this.get_active_thread(),
+                    Some(Handle::Pseudo(PseudoHandle::CurrentThread)) => this.active_thread(),
                     _ => this.invalid_handle("SetThreadDescription")?,
                 };
                 // Looks like the default thread name is empty.
diff --git a/src/tools/miri/src/shims/windows/sync.rs b/src/tools/miri/src/shims/windows/sync.rs
index 836b9e92595..1e71fc92400 100644
--- a/src/tools/miri/src/shims/windows/sync.rs
+++ b/src/tools/miri/src/shims/windows/sync.rs
@@ -3,7 +3,6 @@ use std::time::Duration;
 use rustc_target::abi::Size;
 
 use crate::concurrency::init_once::InitOnceStatus;
-use crate::concurrency::thread::MachineCallback;
 use crate::*;
 
 impl<'mir, 'tcx> EvalContextExtPriv<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {}
@@ -18,6 +17,31 @@ trait EvalContextExtPriv<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
         let this = self.eval_context_mut();
         this.init_once_get_or_create_id(init_once_op, this.windows_ty_layout("INIT_ONCE"), 0)
     }
+
+    /// Returns `true` if we were successful, `false` if we would block.
+    fn init_once_try_begin(
+        &mut self,
+        id: InitOnceId,
+        pending_place: &MPlaceTy<'tcx, Provenance>,
+        dest: &MPlaceTy<'tcx, Provenance>,
+    ) -> InterpResult<'tcx, bool> {
+        let this = self.eval_context_mut();
+        Ok(match this.init_once_status(id) {
+            InitOnceStatus::Uninitialized => {
+                this.init_once_begin(id);
+                this.write_scalar(this.eval_windows("c", "TRUE"), pending_place)?;
+                this.write_scalar(this.eval_windows("c", "TRUE"), dest)?;
+                true
+            }
+            InitOnceStatus::Complete => {
+                this.init_once_observe_completed(id);
+                this.write_scalar(this.eval_windows("c", "FALSE"), pending_place)?;
+                this.write_scalar(this.eval_windows("c", "TRUE"), dest)?;
+                true
+            }
+            InitOnceStatus::Begun => false,
+        })
+    }
 }
 
 impl<'mir, 'tcx> EvalContextExt<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {}
@@ -29,9 +53,9 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
         flags_op: &OpTy<'tcx, Provenance>,
         pending_op: &OpTy<'tcx, Provenance>,
         context_op: &OpTy<'tcx, Provenance>,
-    ) -> InterpResult<'tcx, Scalar<Provenance>> {
+        dest: &MPlaceTy<'tcx, Provenance>,
+    ) -> InterpResult<'tcx> {
         let this = self.eval_context_mut();
-        let active_thread = this.get_active_thread();
 
         let id = this.init_once_get_id(init_once_op)?;
         let flags = this.read_scalar(flags_op)?.to_u32()?;
@@ -46,58 +70,34 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
             throw_unsup_format!("non-null `lpContext` in `InitOnceBeginInitialize`");
         }
 
-        match this.init_once_status(id) {
-            InitOnceStatus::Uninitialized => {
-                this.init_once_begin(id);
-                this.write_scalar(this.eval_windows("c", "TRUE"), &pending_place)?;
-            }
-            InitOnceStatus::Begun => {
-                // Someone else is already on it.
-                // Block this thread until they are done.
-                // When we are woken up, set the `pending` flag accordingly.
-                struct Callback<'tcx> {
-                    init_once_id: InitOnceId,
-                    pending_place: MPlaceTy<'tcx, Provenance>,
-                }
-
-                impl<'tcx> VisitProvenance for Callback<'tcx> {
-                    fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
-                        let Callback { init_once_id: _, pending_place } = self;
-                        pending_place.visit_provenance(visit);
-                    }
-                }
-
-                impl<'mir, 'tcx> MachineCallback<'mir, 'tcx> for Callback<'tcx> {
-                    fn call(&self, this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> {
-                        let pending = match this.init_once_status(self.init_once_id) {
-                            InitOnceStatus::Uninitialized =>
-                                unreachable!(
-                                    "status should have either been set to begun or complete"
-                                ),
-                            InitOnceStatus::Begun => this.eval_windows("c", "TRUE"),
-                            InitOnceStatus::Complete => this.eval_windows("c", "FALSE"),
-                        };
-
-                        this.write_scalar(pending, &self.pending_place)?;
-
-                        Ok(())
-                    }
-                }
-
-                this.init_once_enqueue_and_block(
-                    id,
-                    active_thread,
-                    Box::new(Callback { init_once_id: id, pending_place }),
-                )
+        if this.init_once_try_begin(id, &pending_place, dest)? {
+            // Done!
+            return Ok(());
+        }
+
+        // We have to block, and then try again when we are woken up.
+        this.init_once_enqueue_and_block(id, Callback { id, pending_place, dest: dest.clone() });
+        return Ok(());
+
+        struct Callback<'tcx> {
+            id: InitOnceId,
+            pending_place: MPlaceTy<'tcx, Provenance>,
+            dest: MPlaceTy<'tcx, Provenance>,
+        }
+        impl<'tcx> VisitProvenance for Callback<'tcx> {
+            fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
+                let Callback { id: _, dest, pending_place } = self;
+                pending_place.visit_provenance(visit);
+                dest.visit_provenance(visit);
             }
-            InitOnceStatus::Complete => {
-                this.init_once_observe_completed(id);
-                this.write_scalar(this.eval_windows("c", "FALSE"), &pending_place)?;
+        }
+        impl<'mir, 'tcx> UnblockCallback<'mir, 'tcx> for Callback<'tcx> {
+            fn unblock(self: Box<Self>, this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> {
+                let ret = this.init_once_try_begin(self.id, &self.pending_place, &self.dest)?;
+                assert!(ret, "we were woken up but init_once_try_begin still failed");
+                Ok(())
             }
         }
-
-        // This always succeeds (even if the thread is blocked, we will succeed if we ever unblock).
-        Ok(this.eval_windows("c", "TRUE"))
     }
 
     fn InitOnceComplete(
@@ -155,7 +155,6 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
         let size = this.read_target_usize(size_op)?;
         let timeout_ms = this.read_scalar(timeout_op)?.to_u32()?;
 
-        let thread = this.get_active_thread();
         let addr = ptr.addr().bytes();
 
         if size > 8 || !size.is_power_of_two() {
@@ -170,7 +169,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
             None
         } else {
             let duration = Duration::from_millis(timeout_ms.into());
-            Some(CallbackTime::Monotonic(this.machine.clock.now().checked_add(duration).unwrap()))
+            Some(Timeout::Monotonic(this.machine.clock.now().checked_add(duration).unwrap()))
         };
 
         // See the Linux futex implementation for why this fence exists.
@@ -183,41 +182,15 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
 
         if futex_val == compare_val {
             // If the values are the same, we have to block.
-            this.block_thread(thread, BlockReason::Futex { addr });
-            this.futex_wait(addr, thread, u32::MAX);
-
-            if let Some(timeout_time) = timeout_time {
-                struct Callback<'tcx> {
-                    thread: ThreadId,
-                    addr: u64,
-                    dest: MPlaceTy<'tcx, Provenance>,
-                }
-
-                impl<'tcx> VisitProvenance for Callback<'tcx> {
-                    fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
-                        let Callback { thread: _, addr: _, dest } = self;
-                        dest.visit_provenance(visit);
-                    }
-                }
-
-                impl<'mir, 'tcx: 'mir> MachineCallback<'mir, 'tcx> for Callback<'tcx> {
-                    fn call(&self, this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> {
-                        this.unblock_thread(self.thread, BlockReason::Futex { addr: self.addr });
-                        this.futex_remove_waiter(self.addr, self.thread);
-                        let error_timeout = this.eval_windows("c", "ERROR_TIMEOUT");
-                        this.set_last_error(error_timeout)?;
-                        this.write_scalar(Scalar::from_i32(0), &self.dest)?;
-
-                        Ok(())
-                    }
-                }
-
-                this.register_timeout_callback(
-                    thread,
-                    timeout_time,
-                    Box::new(Callback { thread, addr, dest: dest.clone() }),
-                );
-            }
+            this.futex_wait(
+                addr,
+                u32::MAX, // bitset
+                timeout_time,
+                Scalar::from_i32(1), // retval_succ
+                Scalar::from_i32(0), // retval_timeout
+                dest.clone(),
+                this.eval_windows("c", "ERROR_TIMEOUT"),
+            );
         }
 
         this.write_scalar(Scalar::from_i32(1), dest)?;
@@ -234,10 +207,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
         this.atomic_fence(AtomicFenceOrd::SeqCst)?;
 
         let addr = ptr.addr().bytes();
-        if let Some(thread) = this.futex_wake(addr, u32::MAX) {
-            this.unblock_thread(thread, BlockReason::Futex { addr });
-            this.unregister_timeout_callback_if_exists(thread);
-        }
+        this.futex_wake(addr, u32::MAX)?;
 
         Ok(())
     }
@@ -250,10 +220,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
         this.atomic_fence(AtomicFenceOrd::SeqCst)?;
 
         let addr = ptr.addr().bytes();
-        while let Some(thread) = this.futex_wake(addr, u32::MAX) {
-            this.unblock_thread(thread, BlockReason::Futex { addr });
-            this.unregister_timeout_callback_if_exists(thread);
-        }
+        while this.futex_wake(addr, u32::MAX)? {}
 
         Ok(())
     }
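
(Illustrative aside, not part of the diff: the blocking callbacks above now implement an `UnblockCallback` trait whose method consumes the callback. A minimal standalone sketch of that receiver shape follows, with assumed names; only the `self: Box<Self>` signature mirrors the `unblock` calls in the diff.)

// Sketch: an unblock callback runs at most once, so it takes `self: Box<Self>`
// and may move its captured data (for example the destination place) instead of
// only borrowing it as a `&self` method would.
trait Unblock {
    fn unblock(self: Box<Self>, woken_by_timeout: bool);
}

struct Retry {
    label: String,
}

impl Unblock for Retry {
    fn unblock(self: Box<Self>, woken_by_timeout: bool) {
        // Because the callback consumes itself, it owns `label` and can move it out.
        let label: String = self.label;
        if woken_by_timeout {
            println!("timed out: {label}");
        } else {
            println!("woken up, retrying: {label}");
        }
    }
}

fn main() {
    let cb: Box<dyn Unblock> = Box::new(Retry { label: "InitOnceBeginInitialize".into() });
    cb.unblock(false);
}
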
diff --git a/src/tools/miri/src/shims/windows/thread.rs b/src/tools/miri/src/shims/windows/thread.rs
index 3953a881a72..047f52f50be 100644
--- a/src/tools/miri/src/shims/windows/thread.rs
+++ b/src/tools/miri/src/shims/windows/thread.rs
@@ -69,7 +69,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
             Some(Handle::Thread(thread)) => thread,
             // Unlike on posix, the outcome of joining the current thread is not documented.
             // On current Windows, it just deadlocks.
-            Some(Handle::Pseudo(PseudoHandle::CurrentThread)) => this.get_active_thread(),
+            Some(Handle::Pseudo(PseudoHandle::CurrentThread)) => this.active_thread(),
             _ => this.invalid_handle("WaitForSingleObject")?,
         };