From 5259fb9bb0f54bbcfdccce3a1fc28e01d0066e2c Mon Sep 17 00:00:00 2001 From: Hiroki6 Date: Mon, 22 Aug 2022 19:13:39 +0200 Subject: move thread.rs into concurrency --- src/concurrency/mod.rs | 1 + src/concurrency/thread.rs | 920 +++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 5 +- src/shims/time.rs | 2 +- src/shims/unix/linux/sync.rs | 2 +- src/shims/unix/sync.rs | 2 +- src/thread.rs | 920 ------------------------------------------- 7 files changed, 926 insertions(+), 926 deletions(-) create mode 100644 src/concurrency/thread.rs delete mode 100644 src/thread.rs (limited to 'src') diff --git a/src/concurrency/mod.rs b/src/concurrency/mod.rs index 07c3f0d5993..61ef3d5640e 100644 --- a/src/concurrency/mod.rs +++ b/src/concurrency/mod.rs @@ -1,5 +1,6 @@ pub mod data_race; mod range_object_map; pub mod sync; +pub mod thread; mod vector_clock; pub mod weak_memory; diff --git a/src/concurrency/thread.rs b/src/concurrency/thread.rs new file mode 100644 index 00000000000..dc8b1c29114 --- /dev/null +++ b/src/concurrency/thread.rs @@ -0,0 +1,920 @@ +//! Implements threads. + +use std::cell::RefCell; +use std::collections::hash_map::Entry; +use std::num::TryFromIntError; +use std::time::{Duration, Instant, SystemTime}; + +use log::trace; + +use rustc_data_structures::fx::FxHashMap; +use rustc_hir::def_id::DefId; +use rustc_index::vec::{Idx, IndexVec}; +use rustc_middle::mir::Mutability; +use rustc_middle::ty::layout::TyAndLayout; +use rustc_target::spec::abi::Abi; + +use crate::concurrency::data_race; +use crate::concurrency::sync::SynchronizationState; +use crate::*; + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum SchedulingAction { + /// Execute step on the active thread. + ExecuteStep, + /// Execute a timeout callback. + ExecuteTimeoutCallback, + /// Execute destructors of the active thread. + ExecuteDtors, + /// Stop the program. + Stop, +} + +/// Timeout callbacks can be created by synchronization primitives to tell the +/// scheduler that they should be called once some period of time passes. +type TimeoutCallback<'mir, 'tcx> = + Box>) -> InterpResult<'tcx> + 'tcx>; + +/// A thread identifier. +#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)] +pub struct ThreadId(u32); + +/// The main thread. When it terminates, the whole application terminates. +const MAIN_THREAD: ThreadId = ThreadId(0); + +impl ThreadId { + pub fn to_u32(self) -> u32 { + self.0 + } +} + +impl Idx for ThreadId { + fn new(idx: usize) -> Self { + ThreadId(u32::try_from(idx).unwrap()) + } + + fn index(self) -> usize { + usize::try_from(self.0).unwrap() + } +} + +impl TryFrom for ThreadId { + type Error = TryFromIntError; + fn try_from(id: u64) -> Result { + u32::try_from(id).map(Self) + } +} + +impl From for ThreadId { + fn from(id: u32) -> Self { + Self(id) + } +} + +impl From for u64 { + fn from(t: ThreadId) -> Self { + t.0.into() + } +} + +/// The state of a thread. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum ThreadState { + /// The thread is enabled and can be executed. + Enabled, + /// The thread tried to join the specified thread and is blocked until that + /// thread terminates. + BlockedOnJoin(ThreadId), + /// The thread is blocked on some synchronization primitive. It is the + /// responsibility of the synchronization primitives to track threads that + /// are blocked by them. + BlockedOnSync, + /// The thread has terminated its execution. We do not delete terminated + /// threads (FIXME: why?). + Terminated, +} + +/// The join status of a thread. 
+#[derive(Debug, Copy, Clone, PartialEq, Eq)] +enum ThreadJoinStatus { + /// The thread can be joined. + Joinable, + /// A thread is detached if its join handle was destroyed and no other + /// thread can join it. + Detached, + /// The thread was already joined by some thread and cannot be joined again. + Joined, +} + +/// A thread. +pub struct Thread<'mir, 'tcx> { + state: ThreadState, + + /// Name of the thread. + thread_name: Option>, + + /// The virtual call stack. + stack: Vec>>, + + /// The join status. + join_status: ThreadJoinStatus, + + /// The temporary used for storing the argument of + /// the call to `miri_start_panic` (the panic payload) when unwinding. + /// This is pointer-sized, and matches the `Payload` type in `src/libpanic_unwind/miri.rs`. + pub(crate) panic_payload: Option>, + + /// Last OS error location in memory. It is a 32-bit integer. + pub(crate) last_error: Option>, +} + +impl<'mir, 'tcx> Thread<'mir, 'tcx> { + /// Check if the thread is done executing (no more stack frames). If yes, + /// change the state to terminated and return `true`. + fn check_terminated(&mut self) -> bool { + if self.state == ThreadState::Enabled { + if self.stack.is_empty() { + self.state = ThreadState::Terminated; + return true; + } + } + false + } + + /// Get the name of the current thread, or `` if it was not set. + fn thread_name(&self) -> &[u8] { + if let Some(ref thread_name) = self.thread_name { thread_name } else { b"" } + } +} + +impl<'mir, 'tcx> std::fmt::Debug for Thread<'mir, 'tcx> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}({:?}, {:?})", + String::from_utf8_lossy(self.thread_name()), + self.state, + self.join_status + ) + } +} + +impl<'mir, 'tcx> Default for Thread<'mir, 'tcx> { + fn default() -> Self { + Self { + state: ThreadState::Enabled, + thread_name: None, + stack: Vec::new(), + join_status: ThreadJoinStatus::Joinable, + panic_payload: None, + last_error: None, + } + } +} + +impl<'mir, 'tcx> Thread<'mir, 'tcx> { + fn new(name: &str) -> Self { + let mut thread = Thread::default(); + thread.thread_name = Some(Vec::from(name.as_bytes())); + thread + } +} + +/// A specific moment in time. +#[derive(Debug)] +pub enum Time { + Monotonic(Instant), + RealTime(SystemTime), +} + +impl Time { + /// How long do we have to wait from now until the specified time? + fn get_wait_time(&self) -> Duration { + match self { + Time::Monotonic(instant) => instant.saturating_duration_since(Instant::now()), + Time::RealTime(time) => + time.duration_since(SystemTime::now()).unwrap_or(Duration::new(0, 0)), + } + } +} + +/// Callbacks are used to implement timeouts. For example, waiting on a +/// conditional variable with a timeout creates a callback that is called after +/// the specified time and unblocks the thread. If another thread signals on the +/// conditional variable, the signal handler deletes the callback. +struct TimeoutCallbackInfo<'mir, 'tcx> { + /// The callback should be called no earlier than this time. + call_time: Time, + /// The called function. + callback: TimeoutCallback<'mir, 'tcx>, +} + +impl<'mir, 'tcx> std::fmt::Debug for TimeoutCallbackInfo<'mir, 'tcx> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "TimeoutCallback({:?})", self.call_time) + } +} + +/// A set of threads. +#[derive(Debug)] +pub struct ThreadManager<'mir, 'tcx> { + /// Identifier of the currently active thread. + active_thread: ThreadId, + /// Threads used in the program. 
+ /// + /// Note that this vector also contains terminated threads. + threads: IndexVec>, + /// This field is pub(crate) because the synchronization primitives + /// (`crate::sync`) need a way to access it. + pub(crate) sync: SynchronizationState, + /// A mapping from a thread-local static to an allocation id of a thread + /// specific allocation. + thread_local_alloc_ids: RefCell>>, + /// A flag that indicates that we should change the active thread. + yield_active_thread: bool, + /// Callbacks that are called once the specified time passes. + timeout_callbacks: FxHashMap>, +} + +impl<'mir, 'tcx> Default for ThreadManager<'mir, 'tcx> { + fn default() -> Self { + let mut threads = IndexVec::new(); + // Create the main thread and add it to the list of threads. + threads.push(Thread::new("main")); + Self { + active_thread: ThreadId::new(0), + threads, + sync: SynchronizationState::default(), + thread_local_alloc_ids: Default::default(), + yield_active_thread: false, + timeout_callbacks: FxHashMap::default(), + } + } +} + +impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> { + pub(crate) fn init(ecx: &mut MiriEvalContext<'mir, 'tcx>) { + if ecx.tcx.sess.target.os.as_ref() != "windows" { + // The main thread can *not* be joined on except on windows. + ecx.machine.threads.threads[ThreadId::new(0)].join_status = ThreadJoinStatus::Detached; + } + } + + /// Check if we have an allocation for the given thread local static for the + /// active thread. + fn get_thread_local_alloc_id(&self, def_id: DefId) -> Option> { + self.thread_local_alloc_ids.borrow().get(&(def_id, self.active_thread)).cloned() + } + + /// Set the pointer for the allocation of the given thread local + /// static for the active thread. + /// + /// Panics if a thread local is initialized twice for the same thread. + fn set_thread_local_alloc(&self, def_id: DefId, ptr: Pointer) { + self.thread_local_alloc_ids + .borrow_mut() + .try_insert((def_id, self.active_thread), ptr) + .unwrap(); + } + + /// Borrow the stack of the active thread. + pub fn active_thread_stack(&self) -> &[Frame<'mir, 'tcx, Provenance, FrameData<'tcx>>] { + &self.threads[self.active_thread].stack + } + + /// Mutably borrow the stack of the active thread. + fn active_thread_stack_mut( + &mut self, + ) -> &mut Vec>> { + &mut self.threads[self.active_thread].stack + } + + pub fn all_stacks( + &self, + ) -> impl Iterator>]> { + self.threads.iter().map(|t| &t.stack[..]) + } + + /// Create a new thread and returns its id. + fn create_thread(&mut self) -> ThreadId { + let new_thread_id = ThreadId::new(self.threads.len()); + self.threads.push(Default::default()); + new_thread_id + } + + /// Set an active thread and return the id of the thread that was active before. + fn set_active_thread_id(&mut self, id: ThreadId) -> ThreadId { + let active_thread_id = self.active_thread; + self.active_thread = id; + assert!(self.active_thread.index() < self.threads.len()); + active_thread_id + } + + /// Get the id of the currently active thread. + pub fn get_active_thread_id(&self) -> ThreadId { + self.active_thread + } + + /// Get the total number of threads that were ever spawn by this program. + pub fn get_total_thread_count(&self) -> usize { + self.threads.len() + } + + /// Get the total of threads that are currently live, i.e., not yet terminated. + /// (They might be blocked.) + pub fn get_live_thread_count(&self) -> usize { + self.threads.iter().filter(|t| !matches!(t.state, ThreadState::Terminated)).count() + } + + /// Has the given thread terminated? 
+ fn has_terminated(&self, thread_id: ThreadId) -> bool { + self.threads[thread_id].state == ThreadState::Terminated + } + + /// Have all threads terminated? + fn have_all_terminated(&self) -> bool { + self.threads.iter().all(|thread| thread.state == ThreadState::Terminated) + } + + /// Enable the thread for execution. The thread must be terminated. + fn enable_thread(&mut self, thread_id: ThreadId) { + assert!(self.has_terminated(thread_id)); + self.threads[thread_id].state = ThreadState::Enabled; + } + + /// Get a mutable borrow of the currently active thread. + fn active_thread_mut(&mut self) -> &mut Thread<'mir, 'tcx> { + &mut self.threads[self.active_thread] + } + + /// Get a shared borrow of the currently active thread. + fn active_thread_ref(&self) -> &Thread<'mir, 'tcx> { + &self.threads[self.active_thread] + } + + /// Mark the thread as detached, which means that no other thread will try + /// to join it and the thread is responsible for cleaning up. + /// + /// `allow_terminated_joined` allows detaching joined threads that have already terminated. + /// This matches Windows's behavior for `CloseHandle`. + /// + /// See : + /// > The handle is valid until closed, even after the thread it represents has been terminated. + fn detach_thread(&mut self, id: ThreadId, allow_terminated_joined: bool) -> InterpResult<'tcx> { + trace!("detaching {:?}", id); + + let is_ub = if allow_terminated_joined && self.threads[id].state == ThreadState::Terminated + { + // "Detached" in particular means "not yet joined". Redundant detaching is still UB. + self.threads[id].join_status == ThreadJoinStatus::Detached + } else { + self.threads[id].join_status != ThreadJoinStatus::Joinable + }; + if is_ub { + throw_ub_format!("trying to detach thread that was already detached or joined"); + } + + self.threads[id].join_status = ThreadJoinStatus::Detached; + Ok(()) + } + + /// Mark that the active thread tries to join the thread with `joined_thread_id`. + fn join_thread( + &mut self, + joined_thread_id: ThreadId, + data_race: Option<&mut data_race::GlobalState>, + ) -> InterpResult<'tcx> { + if self.threads[joined_thread_id].join_status == ThreadJoinStatus::Detached { + throw_ub_format!("trying to join a detached thread"); + } + + // Mark the joined thread as being joined so that we detect if other + // threads try to join it. + self.threads[joined_thread_id].join_status = ThreadJoinStatus::Joined; + if self.threads[joined_thread_id].state != ThreadState::Terminated { + // The joined thread is still running, we need to wait for it. + self.active_thread_mut().state = ThreadState::BlockedOnJoin(joined_thread_id); + trace!( + "{:?} blocked on {:?} when trying to join", + self.active_thread, + joined_thread_id + ); + } else { + // The thread has already terminated - mark join happens-before + if let Some(data_race) = data_race { + data_race.thread_joined(self, self.active_thread, joined_thread_id); + } + } + Ok(()) + } + + /// Mark that the active thread tries to exclusively join the thread with `joined_thread_id`. 
+ /// If the thread is already joined by another thread, it will throw UB + fn join_thread_exclusive( + &mut self, + joined_thread_id: ThreadId, + data_race: Option<&mut data_race::GlobalState>, + ) -> InterpResult<'tcx> { + if self.threads[joined_thread_id].join_status == ThreadJoinStatus::Joined { + throw_ub_format!("trying to join an already joined thread"); + } + + if joined_thread_id == self.active_thread { + throw_ub_format!("trying to join itself"); + } + + assert!( + self.threads + .iter() + .all(|thread| thread.state != ThreadState::BlockedOnJoin(joined_thread_id)), + "this thread already has threads waiting for its termination" + ); + + self.join_thread(joined_thread_id, data_race) + } + + /// Set the name of the given thread. + pub fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec) { + self.threads[thread].thread_name = Some(new_thread_name); + } + + /// Get the name of the given thread. + pub fn get_thread_name(&self, thread: ThreadId) -> &[u8] { + self.threads[thread].thread_name() + } + + /// Put the thread into the blocked state. + fn block_thread(&mut self, thread: ThreadId) { + let state = &mut self.threads[thread].state; + assert_eq!(*state, ThreadState::Enabled); + *state = ThreadState::BlockedOnSync; + } + + /// Put the blocked thread into the enabled state. + fn unblock_thread(&mut self, thread: ThreadId) { + let state = &mut self.threads[thread].state; + assert_eq!(*state, ThreadState::BlockedOnSync); + *state = ThreadState::Enabled; + } + + /// Change the active thread to some enabled thread. + fn yield_active_thread(&mut self) { + // We do not yield immediately, as swapping out the current stack while executing a MIR statement + // could lead to all sorts of confusion. + // We should only switch stacks between steps. + self.yield_active_thread = true; + } + + /// Register the given `callback` to be called once the `call_time` passes. + /// + /// The callback will be called with `thread` being the active thread, and + /// the callback may not change the active thread. + fn register_timeout_callback( + &mut self, + thread: ThreadId, + call_time: Time, + callback: TimeoutCallback<'mir, 'tcx>, + ) { + self.timeout_callbacks + .try_insert(thread, TimeoutCallbackInfo { call_time, callback }) + .unwrap(); + } + + /// Unregister the callback for the `thread`. + fn unregister_timeout_callback_if_exists(&mut self, thread: ThreadId) { + self.timeout_callbacks.remove(&thread); + } + + /// Get a callback that is ready to be called. + fn get_ready_callback(&mut self) -> Option<(ThreadId, TimeoutCallback<'mir, 'tcx>)> { + // We iterate over all threads in the order of their indices because + // this allows us to have a deterministic scheduler. + for thread in self.threads.indices() { + match self.timeout_callbacks.entry(thread) { + Entry::Occupied(entry) => + if entry.get().call_time.get_wait_time() == Duration::new(0, 0) { + return Some((thread, entry.remove().callback)); + }, + Entry::Vacant(_) => {} + } + } + None + } + + /// Wakes up threads joining on the active one and deallocates thread-local statics. + /// The `AllocId` that can now be freed are returned. + fn thread_terminated( + &mut self, + mut data_race: Option<&mut data_race::GlobalState>, + ) -> Vec> { + let mut free_tls_statics = Vec::new(); + { + let mut thread_local_statics = self.thread_local_alloc_ids.borrow_mut(); + thread_local_statics.retain(|&(_def_id, thread), &mut alloc_id| { + if thread != self.active_thread { + // Keep this static around. 
+ return true; + } + // Delete this static from the map and from memory. + // We cannot free directly here as we cannot use `?` in this context. + free_tls_statics.push(alloc_id); + false + }); + } + // Set the thread into a terminated state in the data-race detector. + if let Some(ref mut data_race) = data_race { + data_race.thread_terminated(self); + } + // Check if we need to unblock any threads. + let mut joined_threads = vec![]; // store which threads joined, we'll need it + for (i, thread) in self.threads.iter_enumerated_mut() { + if thread.state == ThreadState::BlockedOnJoin(self.active_thread) { + // The thread has terminated, mark happens-before edge to joining thread + if data_race.is_some() { + joined_threads.push(i); + } + trace!("unblocking {:?} because {:?} terminated", i, self.active_thread); + thread.state = ThreadState::Enabled; + } + } + for &i in &joined_threads { + data_race.as_mut().unwrap().thread_joined(self, i, self.active_thread); + } + free_tls_statics + } + + /// Decide which action to take next and on which thread. + /// + /// The currently implemented scheduling policy is the one that is commonly + /// used in stateless model checkers such as Loom: run the active thread as + /// long as we can and switch only when we have to (the active thread was + /// blocked, terminated, or has explicitly asked to be preempted). + fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> { + // Check whether the thread has **just** terminated (`check_terminated` + // checks whether the thread has popped all its stack and if yes, sets + // the thread state to terminated). + if self.threads[self.active_thread].check_terminated() { + return Ok(SchedulingAction::ExecuteDtors); + } + // If we get here again and the thread is *still* terminated, there are no more dtors to run. + if self.threads[MAIN_THREAD].state == ThreadState::Terminated { + // The main thread terminated; stop the program. + // We do *not* run TLS dtors of remaining threads, which seems to match rustc behavior. + return Ok(SchedulingAction::Stop); + } + // This thread and the program can keep going. + if self.threads[self.active_thread].state == ThreadState::Enabled + && !self.yield_active_thread + { + // The currently active thread is still enabled, just continue with it. + return Ok(SchedulingAction::ExecuteStep); + } + // The active thread yielded. Let's see if there are any timeouts to take care of. We do + // this *before* running any other thread, to ensure that timeouts "in the past" fire before + // any other thread can take an action. This ensures that for `pthread_cond_timedwait`, "an + // error is returned if [...] the absolute time specified by abstime has already been passed + // at the time of the call". + // + let potential_sleep_time = + self.timeout_callbacks.values().map(|info| info.call_time.get_wait_time()).min(); + if potential_sleep_time == Some(Duration::new(0, 0)) { + return Ok(SchedulingAction::ExecuteTimeoutCallback); + } + // No callbacks scheduled, pick a regular thread to execute. + // The active thread blocked or yielded. So we go search for another enabled thread. + // Curcially, we start searching at the current active thread ID, rather than at 0, since we + // want to avoid always scheduling threads 0 and 1 without ever making progress in thread 2. + // + // `skip(N)` means we start iterating at thread N, so we skip 1 more to start just *after* + // the active thread. Then after that we look at `take(N)`, i.e., the threads *before* the + // active thread. 
+ let threads = self + .threads + .iter_enumerated() + .skip(self.active_thread.index() + 1) + .chain(self.threads.iter_enumerated().take(self.active_thread.index())); + for (id, thread) in threads { + debug_assert_ne!(self.active_thread, id); + if thread.state == ThreadState::Enabled { + self.active_thread = id; + break; + } + } + self.yield_active_thread = false; + if self.threads[self.active_thread].state == ThreadState::Enabled { + return Ok(SchedulingAction::ExecuteStep); + } + // We have not found a thread to execute. + if self.threads.iter().all(|thread| thread.state == ThreadState::Terminated) { + unreachable!("all threads terminated without the main thread terminating?!"); + } else if let Some(sleep_time) = potential_sleep_time { + // All threads are currently blocked, but we have unexecuted + // timeout_callbacks, which may unblock some of the threads. Hence, + // sleep until the first callback. + std::thread::sleep(sleep_time); + Ok(SchedulingAction::ExecuteTimeoutCallback) + } else { + throw_machine_stop!(TerminationInfo::Deadlock); + } + } +} + +// Public interface to thread management. +impl<'mir, 'tcx: 'mir> EvalContextExt<'mir, 'tcx> for crate::MiriEvalContext<'mir, 'tcx> {} +pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx> { + /// Get a thread-specific allocation id for the given thread-local static. + /// If needed, allocate a new one. + fn get_or_create_thread_local_alloc( + &mut self, + def_id: DefId, + ) -> InterpResult<'tcx, Pointer> { + let this = self.eval_context_mut(); + let tcx = this.tcx; + if let Some(old_alloc) = this.machine.threads.get_thread_local_alloc_id(def_id) { + // We already have a thread-specific allocation id for this + // thread-local static. + Ok(old_alloc) + } else { + // We need to allocate a thread-specific allocation id for this + // thread-local static. + // First, we compute the initial value for this static. + if tcx.is_foreign_item(def_id) { + throw_unsup_format!("foreign thread-local statics are not supported"); + } + let allocation = tcx.eval_static_initializer(def_id)?; + let mut allocation = allocation.inner().clone(); + // This allocation will be deallocated when the thread dies, so it is not in read-only memory. + allocation.mutability = Mutability::Mut; + // Create a fresh allocation with this content. + let new_alloc = this.allocate_raw_ptr(allocation, MiriMemoryKind::Tls.into())?; + this.machine.threads.set_thread_local_alloc(def_id, new_alloc); + Ok(new_alloc) + } + } + + #[inline] + fn create_thread(&mut self) -> ThreadId { + let this = self.eval_context_mut(); + let id = this.machine.threads.create_thread(); + if let Some(data_race) = &mut this.machine.data_race { + data_race.thread_created(&this.machine.threads, id); + } + id + } + + #[inline] + fn start_thread( + &mut self, + thread: Option>, + start_routine: Pointer>, + start_abi: Abi, + func_arg: ImmTy<'tcx, Provenance>, + ret_layout: TyAndLayout<'tcx>, + ) -> InterpResult<'tcx, ThreadId> { + let this = self.eval_context_mut(); + + // Create the new thread + let new_thread_id = this.create_thread(); + + // Write the current thread-id, switch to the next thread later + // to treat this write operation as occuring on the current thread. + if let Some(thread_info_place) = thread { + this.write_scalar( + Scalar::from_uint(new_thread_id.to_u32(), thread_info_place.layout.size), + &thread_info_place.into(), + )?; + } + + // Finally switch to new thread so that we can push the first stackframe. 
+ // After this all accesses will be treated as occuring in the new thread. + let old_thread_id = this.set_active_thread(new_thread_id); + + // Perform the function pointer load in the new thread frame. + let instance = this.get_ptr_fn(start_routine)?.as_instance()?; + + // Note: the returned value is currently ignored (see the FIXME in + // pthread_join in shims/unix/thread.rs) because the Rust standard library does not use + // it. + let ret_place = this.allocate(ret_layout, MiriMemoryKind::Machine.into())?; + + this.call_function( + instance, + start_abi, + &[*func_arg], + Some(&ret_place.into()), + StackPopCleanup::Root { cleanup: true }, + )?; + + // Restore the old active thread frame. + this.set_active_thread(old_thread_id); + + Ok(new_thread_id) + } + + #[inline] + fn detach_thread( + &mut self, + thread_id: ThreadId, + allow_terminated_joined: bool, + ) -> InterpResult<'tcx> { + let this = self.eval_context_mut(); + this.machine.threads.detach_thread(thread_id, allow_terminated_joined) + } + + #[inline] + fn join_thread(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> { + let this = self.eval_context_mut(); + this.machine.threads.join_thread(joined_thread_id, this.machine.data_race.as_mut())?; + Ok(()) + } + + #[inline] + fn join_thread_exclusive(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> { + let this = self.eval_context_mut(); + this.machine + .threads + .join_thread_exclusive(joined_thread_id, this.machine.data_race.as_mut())?; + Ok(()) + } + + #[inline] + fn set_active_thread(&mut self, thread_id: ThreadId) -> ThreadId { + let this = self.eval_context_mut(); + this.machine.threads.set_active_thread_id(thread_id) + } + + #[inline] + fn get_active_thread(&self) -> ThreadId { + let this = self.eval_context_ref(); + this.machine.threads.get_active_thread_id() + } + + #[inline] + fn active_thread_mut(&mut self) -> &mut Thread<'mir, 'tcx> { + let this = self.eval_context_mut(); + this.machine.threads.active_thread_mut() + } + + #[inline] + fn active_thread_ref(&self) -> &Thread<'mir, 'tcx> { + let this = self.eval_context_ref(); + this.machine.threads.active_thread_ref() + } + + #[inline] + fn get_total_thread_count(&self) -> usize { + let this = self.eval_context_ref(); + this.machine.threads.get_total_thread_count() + } + + #[inline] + fn has_terminated(&self, thread_id: ThreadId) -> bool { + let this = self.eval_context_ref(); + this.machine.threads.has_terminated(thread_id) + } + + #[inline] + fn have_all_terminated(&self) -> bool { + let this = self.eval_context_ref(); + this.machine.threads.have_all_terminated() + } + + #[inline] + fn enable_thread(&mut self, thread_id: ThreadId) { + let this = self.eval_context_mut(); + this.machine.threads.enable_thread(thread_id); + } + + #[inline] + fn active_thread_stack(&self) -> &[Frame<'mir, 'tcx, Provenance, FrameData<'tcx>>] { + let this = self.eval_context_ref(); + this.machine.threads.active_thread_stack() + } + + #[inline] + fn active_thread_stack_mut( + &mut self, + ) -> &mut Vec>> { + let this = self.eval_context_mut(); + this.machine.threads.active_thread_stack_mut() + } + + #[inline] + fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec) { + let this = self.eval_context_mut(); + this.machine.threads.set_thread_name(thread, new_thread_name); + } + + #[inline] + fn set_thread_name_wide(&mut self, thread: ThreadId, new_thread_name: &[u16]) { + let this = self.eval_context_mut(); + + // The Windows `GetThreadDescription` shim to get the thread name isn't implemented, so being lossy is 
okay. + // This is only read by diagnostics, which already use `from_utf8_lossy`. + this.machine + .threads + .set_thread_name(thread, String::from_utf16_lossy(new_thread_name).into_bytes()); + } + + #[inline] + fn get_thread_name<'c>(&'c self, thread: ThreadId) -> &'c [u8] + where + 'mir: 'c, + { + let this = self.eval_context_ref(); + this.machine.threads.get_thread_name(thread) + } + + #[inline] + fn block_thread(&mut self, thread: ThreadId) { + let this = self.eval_context_mut(); + this.machine.threads.block_thread(thread); + } + + #[inline] + fn unblock_thread(&mut self, thread: ThreadId) { + let this = self.eval_context_mut(); + this.machine.threads.unblock_thread(thread); + } + + #[inline] + fn yield_active_thread(&mut self) { + let this = self.eval_context_mut(); + this.machine.threads.yield_active_thread(); + } + + #[inline] + fn maybe_preempt_active_thread(&mut self) { + use rand::Rng as _; + + let this = self.eval_context_mut(); + if this.machine.rng.get_mut().gen_bool(this.machine.preemption_rate) { + this.yield_active_thread(); + } + } + + #[inline] + fn register_timeout_callback( + &mut self, + thread: ThreadId, + call_time: Time, + callback: TimeoutCallback<'mir, 'tcx>, + ) { + let this = self.eval_context_mut(); + this.machine.threads.register_timeout_callback(thread, call_time, callback); + } + + #[inline] + fn unregister_timeout_callback_if_exists(&mut self, thread: ThreadId) { + let this = self.eval_context_mut(); + this.machine.threads.unregister_timeout_callback_if_exists(thread); + } + + /// Execute a timeout callback on the callback's thread. + #[inline] + fn run_timeout_callback(&mut self) -> InterpResult<'tcx> { + let this = self.eval_context_mut(); + let (thread, callback) = + if let Some((thread, callback)) = this.machine.threads.get_ready_callback() { + (thread, callback) + } else { + // get_ready_callback can return None if the computer's clock + // was shifted after calling the scheduler and before the call + // to get_ready_callback (see issue + // https://github.com/rust-lang/miri/issues/1763). In this case, + // just do nothing, which effectively just returns to the + // scheduler. + return Ok(()); + }; + // This back-and-forth with `set_active_thread` is here because of two + // design decisions: + // 1. Make the caller and not the callback responsible for changing + // thread. + // 2. Make the scheduler the only place that can change the active + // thread. + let old_thread = this.set_active_thread(thread); + callback(this)?; + this.set_active_thread(old_thread); + Ok(()) + } + + /// Decide which action to take next and on which thread. + #[inline] + fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> { + let this = self.eval_context_mut(); + this.machine.threads.schedule() + } + + /// Handles thread termination of the active thread: wakes up threads joining on this one, + /// and deallocated thread-local statics. + /// + /// This is called from `tls.rs` after handling the TLS dtors. + #[inline] + fn thread_terminated(&mut self) -> InterpResult<'tcx> { + let this = self.eval_context_mut(); + for ptr in this.machine.threads.thread_terminated(this.machine.data_race.as_mut()) { + this.deallocate_ptr(ptr.into(), None, MiriMemoryKind::Tls.into())?; + } + Ok(()) + } +} diff --git a/src/lib.rs b/src/lib.rs index 5657bba6478..846290ae6c6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -62,7 +62,6 @@ mod operator; mod range_map; mod shims; mod stacked_borrows; -pub mod thread; // Establish a "crate-wide prelude": we often import `crate::*`. 
@@ -104,10 +103,10 @@ pub use crate::range_map::RangeMap; pub use crate::stacked_borrows::{ CallId, EvalContextExt as StackedBorEvalContextExt, Item, Permission, SbTag, Stack, Stacks, }; -pub use crate::thread::{ +pub use concurrency::sync::{CondvarId, EvalContextExt as SyncEvalContextExt, MutexId, RwLockId}; +pub use concurrency::thread::{ EvalContextExt as ThreadsEvalContextExt, SchedulingAction, ThreadId, ThreadManager, ThreadState, }; -pub use concurrency::sync::{CondvarId, EvalContextExt as SyncEvalContextExt, MutexId, RwLockId}; /// Insert rustc arguments at the beginning of the argument list that Miri wants to be /// set per default, for maximal validation power. pub const MIRI_DEFAULT_ARGS: &[&str] = &[ diff --git a/src/shims/time.rs b/src/shims/time.rs index 3f4600e0e34..a574a0612c4 100644 --- a/src/shims/time.rs +++ b/src/shims/time.rs @@ -1,6 +1,6 @@ use std::time::{Duration, Instant, SystemTime}; -use crate::thread::Time; +use crate::concurrency::thread::Time; use crate::*; /// Returns the time elapsed between the provided time and the unix epoch as a `Duration`. diff --git a/src/shims/unix/linux/sync.rs b/src/shims/unix/linux/sync.rs index f9c97a23721..73a042d45b8 100644 --- a/src/shims/unix/linux/sync.rs +++ b/src/shims/unix/linux/sync.rs @@ -1,4 +1,4 @@ -use crate::thread::Time; +use crate::concurrency::thread::Time; use crate::*; use rustc_target::abi::{Align, Size}; use std::time::{Instant, SystemTime}; diff --git a/src/shims/unix/sync.rs b/src/shims/unix/sync.rs index 246cb100bcb..b8504fb08d4 100644 --- a/src/shims/unix/sync.rs +++ b/src/shims/unix/sync.rs @@ -3,7 +3,7 @@ use std::time::SystemTime; use rustc_hir::LangItem; use rustc_middle::ty::{layout::TyAndLayout, query::TyCtxtAt, subst::Subst, Ty}; -use crate::thread::Time; +use crate::concurrency::thread::Time; use crate::*; // pthread_mutexattr_t is either 4 or 8 bytes, depending on the platform. diff --git a/src/thread.rs b/src/thread.rs deleted file mode 100644 index dc8b1c29114..00000000000 --- a/src/thread.rs +++ /dev/null @@ -1,920 +0,0 @@ -//! Implements threads. - -use std::cell::RefCell; -use std::collections::hash_map::Entry; -use std::num::TryFromIntError; -use std::time::{Duration, Instant, SystemTime}; - -use log::trace; - -use rustc_data_structures::fx::FxHashMap; -use rustc_hir::def_id::DefId; -use rustc_index::vec::{Idx, IndexVec}; -use rustc_middle::mir::Mutability; -use rustc_middle::ty::layout::TyAndLayout; -use rustc_target::spec::abi::Abi; - -use crate::concurrency::data_race; -use crate::concurrency::sync::SynchronizationState; -use crate::*; - -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub enum SchedulingAction { - /// Execute step on the active thread. - ExecuteStep, - /// Execute a timeout callback. - ExecuteTimeoutCallback, - /// Execute destructors of the active thread. - ExecuteDtors, - /// Stop the program. - Stop, -} - -/// Timeout callbacks can be created by synchronization primitives to tell the -/// scheduler that they should be called once some period of time passes. -type TimeoutCallback<'mir, 'tcx> = - Box>) -> InterpResult<'tcx> + 'tcx>; - -/// A thread identifier. -#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)] -pub struct ThreadId(u32); - -/// The main thread. When it terminates, the whole application terminates. 
-const MAIN_THREAD: ThreadId = ThreadId(0); - -impl ThreadId { - pub fn to_u32(self) -> u32 { - self.0 - } -} - -impl Idx for ThreadId { - fn new(idx: usize) -> Self { - ThreadId(u32::try_from(idx).unwrap()) - } - - fn index(self) -> usize { - usize::try_from(self.0).unwrap() - } -} - -impl TryFrom for ThreadId { - type Error = TryFromIntError; - fn try_from(id: u64) -> Result { - u32::try_from(id).map(Self) - } -} - -impl From for ThreadId { - fn from(id: u32) -> Self { - Self(id) - } -} - -impl From for u64 { - fn from(t: ThreadId) -> Self { - t.0.into() - } -} - -/// The state of a thread. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum ThreadState { - /// The thread is enabled and can be executed. - Enabled, - /// The thread tried to join the specified thread and is blocked until that - /// thread terminates. - BlockedOnJoin(ThreadId), - /// The thread is blocked on some synchronization primitive. It is the - /// responsibility of the synchronization primitives to track threads that - /// are blocked by them. - BlockedOnSync, - /// The thread has terminated its execution. We do not delete terminated - /// threads (FIXME: why?). - Terminated, -} - -/// The join status of a thread. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -enum ThreadJoinStatus { - /// The thread can be joined. - Joinable, - /// A thread is detached if its join handle was destroyed and no other - /// thread can join it. - Detached, - /// The thread was already joined by some thread and cannot be joined again. - Joined, -} - -/// A thread. -pub struct Thread<'mir, 'tcx> { - state: ThreadState, - - /// Name of the thread. - thread_name: Option>, - - /// The virtual call stack. - stack: Vec>>, - - /// The join status. - join_status: ThreadJoinStatus, - - /// The temporary used for storing the argument of - /// the call to `miri_start_panic` (the panic payload) when unwinding. - /// This is pointer-sized, and matches the `Payload` type in `src/libpanic_unwind/miri.rs`. - pub(crate) panic_payload: Option>, - - /// Last OS error location in memory. It is a 32-bit integer. - pub(crate) last_error: Option>, -} - -impl<'mir, 'tcx> Thread<'mir, 'tcx> { - /// Check if the thread is done executing (no more stack frames). If yes, - /// change the state to terminated and return `true`. - fn check_terminated(&mut self) -> bool { - if self.state == ThreadState::Enabled { - if self.stack.is_empty() { - self.state = ThreadState::Terminated; - return true; - } - } - false - } - - /// Get the name of the current thread, or `` if it was not set. - fn thread_name(&self) -> &[u8] { - if let Some(ref thread_name) = self.thread_name { thread_name } else { b"" } - } -} - -impl<'mir, 'tcx> std::fmt::Debug for Thread<'mir, 'tcx> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "{}({:?}, {:?})", - String::from_utf8_lossy(self.thread_name()), - self.state, - self.join_status - ) - } -} - -impl<'mir, 'tcx> Default for Thread<'mir, 'tcx> { - fn default() -> Self { - Self { - state: ThreadState::Enabled, - thread_name: None, - stack: Vec::new(), - join_status: ThreadJoinStatus::Joinable, - panic_payload: None, - last_error: None, - } - } -} - -impl<'mir, 'tcx> Thread<'mir, 'tcx> { - fn new(name: &str) -> Self { - let mut thread = Thread::default(); - thread.thread_name = Some(Vec::from(name.as_bytes())); - thread - } -} - -/// A specific moment in time. 
-#[derive(Debug)] -pub enum Time { - Monotonic(Instant), - RealTime(SystemTime), -} - -impl Time { - /// How long do we have to wait from now until the specified time? - fn get_wait_time(&self) -> Duration { - match self { - Time::Monotonic(instant) => instant.saturating_duration_since(Instant::now()), - Time::RealTime(time) => - time.duration_since(SystemTime::now()).unwrap_or(Duration::new(0, 0)), - } - } -} - -/// Callbacks are used to implement timeouts. For example, waiting on a -/// conditional variable with a timeout creates a callback that is called after -/// the specified time and unblocks the thread. If another thread signals on the -/// conditional variable, the signal handler deletes the callback. -struct TimeoutCallbackInfo<'mir, 'tcx> { - /// The callback should be called no earlier than this time. - call_time: Time, - /// The called function. - callback: TimeoutCallback<'mir, 'tcx>, -} - -impl<'mir, 'tcx> std::fmt::Debug for TimeoutCallbackInfo<'mir, 'tcx> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "TimeoutCallback({:?})", self.call_time) - } -} - -/// A set of threads. -#[derive(Debug)] -pub struct ThreadManager<'mir, 'tcx> { - /// Identifier of the currently active thread. - active_thread: ThreadId, - /// Threads used in the program. - /// - /// Note that this vector also contains terminated threads. - threads: IndexVec>, - /// This field is pub(crate) because the synchronization primitives - /// (`crate::sync`) need a way to access it. - pub(crate) sync: SynchronizationState, - /// A mapping from a thread-local static to an allocation id of a thread - /// specific allocation. - thread_local_alloc_ids: RefCell>>, - /// A flag that indicates that we should change the active thread. - yield_active_thread: bool, - /// Callbacks that are called once the specified time passes. - timeout_callbacks: FxHashMap>, -} - -impl<'mir, 'tcx> Default for ThreadManager<'mir, 'tcx> { - fn default() -> Self { - let mut threads = IndexVec::new(); - // Create the main thread and add it to the list of threads. - threads.push(Thread::new("main")); - Self { - active_thread: ThreadId::new(0), - threads, - sync: SynchronizationState::default(), - thread_local_alloc_ids: Default::default(), - yield_active_thread: false, - timeout_callbacks: FxHashMap::default(), - } - } -} - -impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> { - pub(crate) fn init(ecx: &mut MiriEvalContext<'mir, 'tcx>) { - if ecx.tcx.sess.target.os.as_ref() != "windows" { - // The main thread can *not* be joined on except on windows. - ecx.machine.threads.threads[ThreadId::new(0)].join_status = ThreadJoinStatus::Detached; - } - } - - /// Check if we have an allocation for the given thread local static for the - /// active thread. - fn get_thread_local_alloc_id(&self, def_id: DefId) -> Option> { - self.thread_local_alloc_ids.borrow().get(&(def_id, self.active_thread)).cloned() - } - - /// Set the pointer for the allocation of the given thread local - /// static for the active thread. - /// - /// Panics if a thread local is initialized twice for the same thread. - fn set_thread_local_alloc(&self, def_id: DefId, ptr: Pointer) { - self.thread_local_alloc_ids - .borrow_mut() - .try_insert((def_id, self.active_thread), ptr) - .unwrap(); - } - - /// Borrow the stack of the active thread. - pub fn active_thread_stack(&self) -> &[Frame<'mir, 'tcx, Provenance, FrameData<'tcx>>] { - &self.threads[self.active_thread].stack - } - - /// Mutably borrow the stack of the active thread. 
- fn active_thread_stack_mut( - &mut self, - ) -> &mut Vec>> { - &mut self.threads[self.active_thread].stack - } - - pub fn all_stacks( - &self, - ) -> impl Iterator>]> { - self.threads.iter().map(|t| &t.stack[..]) - } - - /// Create a new thread and returns its id. - fn create_thread(&mut self) -> ThreadId { - let new_thread_id = ThreadId::new(self.threads.len()); - self.threads.push(Default::default()); - new_thread_id - } - - /// Set an active thread and return the id of the thread that was active before. - fn set_active_thread_id(&mut self, id: ThreadId) -> ThreadId { - let active_thread_id = self.active_thread; - self.active_thread = id; - assert!(self.active_thread.index() < self.threads.len()); - active_thread_id - } - - /// Get the id of the currently active thread. - pub fn get_active_thread_id(&self) -> ThreadId { - self.active_thread - } - - /// Get the total number of threads that were ever spawn by this program. - pub fn get_total_thread_count(&self) -> usize { - self.threads.len() - } - - /// Get the total of threads that are currently live, i.e., not yet terminated. - /// (They might be blocked.) - pub fn get_live_thread_count(&self) -> usize { - self.threads.iter().filter(|t| !matches!(t.state, ThreadState::Terminated)).count() - } - - /// Has the given thread terminated? - fn has_terminated(&self, thread_id: ThreadId) -> bool { - self.threads[thread_id].state == ThreadState::Terminated - } - - /// Have all threads terminated? - fn have_all_terminated(&self) -> bool { - self.threads.iter().all(|thread| thread.state == ThreadState::Terminated) - } - - /// Enable the thread for execution. The thread must be terminated. - fn enable_thread(&mut self, thread_id: ThreadId) { - assert!(self.has_terminated(thread_id)); - self.threads[thread_id].state = ThreadState::Enabled; - } - - /// Get a mutable borrow of the currently active thread. - fn active_thread_mut(&mut self) -> &mut Thread<'mir, 'tcx> { - &mut self.threads[self.active_thread] - } - - /// Get a shared borrow of the currently active thread. - fn active_thread_ref(&self) -> &Thread<'mir, 'tcx> { - &self.threads[self.active_thread] - } - - /// Mark the thread as detached, which means that no other thread will try - /// to join it and the thread is responsible for cleaning up. - /// - /// `allow_terminated_joined` allows detaching joined threads that have already terminated. - /// This matches Windows's behavior for `CloseHandle`. - /// - /// See : - /// > The handle is valid until closed, even after the thread it represents has been terminated. - fn detach_thread(&mut self, id: ThreadId, allow_terminated_joined: bool) -> InterpResult<'tcx> { - trace!("detaching {:?}", id); - - let is_ub = if allow_terminated_joined && self.threads[id].state == ThreadState::Terminated - { - // "Detached" in particular means "not yet joined". Redundant detaching is still UB. - self.threads[id].join_status == ThreadJoinStatus::Detached - } else { - self.threads[id].join_status != ThreadJoinStatus::Joinable - }; - if is_ub { - throw_ub_format!("trying to detach thread that was already detached or joined"); - } - - self.threads[id].join_status = ThreadJoinStatus::Detached; - Ok(()) - } - - /// Mark that the active thread tries to join the thread with `joined_thread_id`. 
- fn join_thread( - &mut self, - joined_thread_id: ThreadId, - data_race: Option<&mut data_race::GlobalState>, - ) -> InterpResult<'tcx> { - if self.threads[joined_thread_id].join_status == ThreadJoinStatus::Detached { - throw_ub_format!("trying to join a detached thread"); - } - - // Mark the joined thread as being joined so that we detect if other - // threads try to join it. - self.threads[joined_thread_id].join_status = ThreadJoinStatus::Joined; - if self.threads[joined_thread_id].state != ThreadState::Terminated { - // The joined thread is still running, we need to wait for it. - self.active_thread_mut().state = ThreadState::BlockedOnJoin(joined_thread_id); - trace!( - "{:?} blocked on {:?} when trying to join", - self.active_thread, - joined_thread_id - ); - } else { - // The thread has already terminated - mark join happens-before - if let Some(data_race) = data_race { - data_race.thread_joined(self, self.active_thread, joined_thread_id); - } - } - Ok(()) - } - - /// Mark that the active thread tries to exclusively join the thread with `joined_thread_id`. - /// If the thread is already joined by another thread, it will throw UB - fn join_thread_exclusive( - &mut self, - joined_thread_id: ThreadId, - data_race: Option<&mut data_race::GlobalState>, - ) -> InterpResult<'tcx> { - if self.threads[joined_thread_id].join_status == ThreadJoinStatus::Joined { - throw_ub_format!("trying to join an already joined thread"); - } - - if joined_thread_id == self.active_thread { - throw_ub_format!("trying to join itself"); - } - - assert!( - self.threads - .iter() - .all(|thread| thread.state != ThreadState::BlockedOnJoin(joined_thread_id)), - "this thread already has threads waiting for its termination" - ); - - self.join_thread(joined_thread_id, data_race) - } - - /// Set the name of the given thread. - pub fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec) { - self.threads[thread].thread_name = Some(new_thread_name); - } - - /// Get the name of the given thread. - pub fn get_thread_name(&self, thread: ThreadId) -> &[u8] { - self.threads[thread].thread_name() - } - - /// Put the thread into the blocked state. - fn block_thread(&mut self, thread: ThreadId) { - let state = &mut self.threads[thread].state; - assert_eq!(*state, ThreadState::Enabled); - *state = ThreadState::BlockedOnSync; - } - - /// Put the blocked thread into the enabled state. - fn unblock_thread(&mut self, thread: ThreadId) { - let state = &mut self.threads[thread].state; - assert_eq!(*state, ThreadState::BlockedOnSync); - *state = ThreadState::Enabled; - } - - /// Change the active thread to some enabled thread. - fn yield_active_thread(&mut self) { - // We do not yield immediately, as swapping out the current stack while executing a MIR statement - // could lead to all sorts of confusion. - // We should only switch stacks between steps. - self.yield_active_thread = true; - } - - /// Register the given `callback` to be called once the `call_time` passes. - /// - /// The callback will be called with `thread` being the active thread, and - /// the callback may not change the active thread. - fn register_timeout_callback( - &mut self, - thread: ThreadId, - call_time: Time, - callback: TimeoutCallback<'mir, 'tcx>, - ) { - self.timeout_callbacks - .try_insert(thread, TimeoutCallbackInfo { call_time, callback }) - .unwrap(); - } - - /// Unregister the callback for the `thread`. 
- fn unregister_timeout_callback_if_exists(&mut self, thread: ThreadId) { - self.timeout_callbacks.remove(&thread); - } - - /// Get a callback that is ready to be called. - fn get_ready_callback(&mut self) -> Option<(ThreadId, TimeoutCallback<'mir, 'tcx>)> { - // We iterate over all threads in the order of their indices because - // this allows us to have a deterministic scheduler. - for thread in self.threads.indices() { - match self.timeout_callbacks.entry(thread) { - Entry::Occupied(entry) => - if entry.get().call_time.get_wait_time() == Duration::new(0, 0) { - return Some((thread, entry.remove().callback)); - }, - Entry::Vacant(_) => {} - } - } - None - } - - /// Wakes up threads joining on the active one and deallocates thread-local statics. - /// The `AllocId` that can now be freed are returned. - fn thread_terminated( - &mut self, - mut data_race: Option<&mut data_race::GlobalState>, - ) -> Vec> { - let mut free_tls_statics = Vec::new(); - { - let mut thread_local_statics = self.thread_local_alloc_ids.borrow_mut(); - thread_local_statics.retain(|&(_def_id, thread), &mut alloc_id| { - if thread != self.active_thread { - // Keep this static around. - return true; - } - // Delete this static from the map and from memory. - // We cannot free directly here as we cannot use `?` in this context. - free_tls_statics.push(alloc_id); - false - }); - } - // Set the thread into a terminated state in the data-race detector. - if let Some(ref mut data_race) = data_race { - data_race.thread_terminated(self); - } - // Check if we need to unblock any threads. - let mut joined_threads = vec![]; // store which threads joined, we'll need it - for (i, thread) in self.threads.iter_enumerated_mut() { - if thread.state == ThreadState::BlockedOnJoin(self.active_thread) { - // The thread has terminated, mark happens-before edge to joining thread - if data_race.is_some() { - joined_threads.push(i); - } - trace!("unblocking {:?} because {:?} terminated", i, self.active_thread); - thread.state = ThreadState::Enabled; - } - } - for &i in &joined_threads { - data_race.as_mut().unwrap().thread_joined(self, i, self.active_thread); - } - free_tls_statics - } - - /// Decide which action to take next and on which thread. - /// - /// The currently implemented scheduling policy is the one that is commonly - /// used in stateless model checkers such as Loom: run the active thread as - /// long as we can and switch only when we have to (the active thread was - /// blocked, terminated, or has explicitly asked to be preempted). - fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> { - // Check whether the thread has **just** terminated (`check_terminated` - // checks whether the thread has popped all its stack and if yes, sets - // the thread state to terminated). - if self.threads[self.active_thread].check_terminated() { - return Ok(SchedulingAction::ExecuteDtors); - } - // If we get here again and the thread is *still* terminated, there are no more dtors to run. - if self.threads[MAIN_THREAD].state == ThreadState::Terminated { - // The main thread terminated; stop the program. - // We do *not* run TLS dtors of remaining threads, which seems to match rustc behavior. - return Ok(SchedulingAction::Stop); - } - // This thread and the program can keep going. - if self.threads[self.active_thread].state == ThreadState::Enabled - && !self.yield_active_thread - { - // The currently active thread is still enabled, just continue with it. - return Ok(SchedulingAction::ExecuteStep); - } - // The active thread yielded. 
Let's see if there are any timeouts to take care of. We do - // this *before* running any other thread, to ensure that timeouts "in the past" fire before - // any other thread can take an action. This ensures that for `pthread_cond_timedwait`, "an - // error is returned if [...] the absolute time specified by abstime has already been passed - // at the time of the call". - // - let potential_sleep_time = - self.timeout_callbacks.values().map(|info| info.call_time.get_wait_time()).min(); - if potential_sleep_time == Some(Duration::new(0, 0)) { - return Ok(SchedulingAction::ExecuteTimeoutCallback); - } - // No callbacks scheduled, pick a regular thread to execute. - // The active thread blocked or yielded. So we go search for another enabled thread. - // Curcially, we start searching at the current active thread ID, rather than at 0, since we - // want to avoid always scheduling threads 0 and 1 without ever making progress in thread 2. - // - // `skip(N)` means we start iterating at thread N, so we skip 1 more to start just *after* - // the active thread. Then after that we look at `take(N)`, i.e., the threads *before* the - // active thread. - let threads = self - .threads - .iter_enumerated() - .skip(self.active_thread.index() + 1) - .chain(self.threads.iter_enumerated().take(self.active_thread.index())); - for (id, thread) in threads { - debug_assert_ne!(self.active_thread, id); - if thread.state == ThreadState::Enabled { - self.active_thread = id; - break; - } - } - self.yield_active_thread = false; - if self.threads[self.active_thread].state == ThreadState::Enabled { - return Ok(SchedulingAction::ExecuteStep); - } - // We have not found a thread to execute. - if self.threads.iter().all(|thread| thread.state == ThreadState::Terminated) { - unreachable!("all threads terminated without the main thread terminating?!"); - } else if let Some(sleep_time) = potential_sleep_time { - // All threads are currently blocked, but we have unexecuted - // timeout_callbacks, which may unblock some of the threads. Hence, - // sleep until the first callback. - std::thread::sleep(sleep_time); - Ok(SchedulingAction::ExecuteTimeoutCallback) - } else { - throw_machine_stop!(TerminationInfo::Deadlock); - } - } -} - -// Public interface to thread management. -impl<'mir, 'tcx: 'mir> EvalContextExt<'mir, 'tcx> for crate::MiriEvalContext<'mir, 'tcx> {} -pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx> { - /// Get a thread-specific allocation id for the given thread-local static. - /// If needed, allocate a new one. - fn get_or_create_thread_local_alloc( - &mut self, - def_id: DefId, - ) -> InterpResult<'tcx, Pointer> { - let this = self.eval_context_mut(); - let tcx = this.tcx; - if let Some(old_alloc) = this.machine.threads.get_thread_local_alloc_id(def_id) { - // We already have a thread-specific allocation id for this - // thread-local static. - Ok(old_alloc) - } else { - // We need to allocate a thread-specific allocation id for this - // thread-local static. - // First, we compute the initial value for this static. - if tcx.is_foreign_item(def_id) { - throw_unsup_format!("foreign thread-local statics are not supported"); - } - let allocation = tcx.eval_static_initializer(def_id)?; - let mut allocation = allocation.inner().clone(); - // This allocation will be deallocated when the thread dies, so it is not in read-only memory. - allocation.mutability = Mutability::Mut; - // Create a fresh allocation with this content. 
- let new_alloc = this.allocate_raw_ptr(allocation, MiriMemoryKind::Tls.into())?; - this.machine.threads.set_thread_local_alloc(def_id, new_alloc); - Ok(new_alloc) - } - } - - #[inline] - fn create_thread(&mut self) -> ThreadId { - let this = self.eval_context_mut(); - let id = this.machine.threads.create_thread(); - if let Some(data_race) = &mut this.machine.data_race { - data_race.thread_created(&this.machine.threads, id); - } - id - } - - #[inline] - fn start_thread( - &mut self, - thread: Option>, - start_routine: Pointer>, - start_abi: Abi, - func_arg: ImmTy<'tcx, Provenance>, - ret_layout: TyAndLayout<'tcx>, - ) -> InterpResult<'tcx, ThreadId> { - let this = self.eval_context_mut(); - - // Create the new thread - let new_thread_id = this.create_thread(); - - // Write the current thread-id, switch to the next thread later - // to treat this write operation as occuring on the current thread. - if let Some(thread_info_place) = thread { - this.write_scalar( - Scalar::from_uint(new_thread_id.to_u32(), thread_info_place.layout.size), - &thread_info_place.into(), - )?; - } - - // Finally switch to new thread so that we can push the first stackframe. - // After this all accesses will be treated as occuring in the new thread. - let old_thread_id = this.set_active_thread(new_thread_id); - - // Perform the function pointer load in the new thread frame. - let instance = this.get_ptr_fn(start_routine)?.as_instance()?; - - // Note: the returned value is currently ignored (see the FIXME in - // pthread_join in shims/unix/thread.rs) because the Rust standard library does not use - // it. - let ret_place = this.allocate(ret_layout, MiriMemoryKind::Machine.into())?; - - this.call_function( - instance, - start_abi, - &[*func_arg], - Some(&ret_place.into()), - StackPopCleanup::Root { cleanup: true }, - )?; - - // Restore the old active thread frame. 
- this.set_active_thread(old_thread_id); - - Ok(new_thread_id) - } - - #[inline] - fn detach_thread( - &mut self, - thread_id: ThreadId, - allow_terminated_joined: bool, - ) -> InterpResult<'tcx> { - let this = self.eval_context_mut(); - this.machine.threads.detach_thread(thread_id, allow_terminated_joined) - } - - #[inline] - fn join_thread(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> { - let this = self.eval_context_mut(); - this.machine.threads.join_thread(joined_thread_id, this.machine.data_race.as_mut())?; - Ok(()) - } - - #[inline] - fn join_thread_exclusive(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> { - let this = self.eval_context_mut(); - this.machine - .threads - .join_thread_exclusive(joined_thread_id, this.machine.data_race.as_mut())?; - Ok(()) - } - - #[inline] - fn set_active_thread(&mut self, thread_id: ThreadId) -> ThreadId { - let this = self.eval_context_mut(); - this.machine.threads.set_active_thread_id(thread_id) - } - - #[inline] - fn get_active_thread(&self) -> ThreadId { - let this = self.eval_context_ref(); - this.machine.threads.get_active_thread_id() - } - - #[inline] - fn active_thread_mut(&mut self) -> &mut Thread<'mir, 'tcx> { - let this = self.eval_context_mut(); - this.machine.threads.active_thread_mut() - } - - #[inline] - fn active_thread_ref(&self) -> &Thread<'mir, 'tcx> { - let this = self.eval_context_ref(); - this.machine.threads.active_thread_ref() - } - - #[inline] - fn get_total_thread_count(&self) -> usize { - let this = self.eval_context_ref(); - this.machine.threads.get_total_thread_count() - } - - #[inline] - fn has_terminated(&self, thread_id: ThreadId) -> bool { - let this = self.eval_context_ref(); - this.machine.threads.has_terminated(thread_id) - } - - #[inline] - fn have_all_terminated(&self) -> bool { - let this = self.eval_context_ref(); - this.machine.threads.have_all_terminated() - } - - #[inline] - fn enable_thread(&mut self, thread_id: ThreadId) { - let this = self.eval_context_mut(); - this.machine.threads.enable_thread(thread_id); - } - - #[inline] - fn active_thread_stack(&self) -> &[Frame<'mir, 'tcx, Provenance, FrameData<'tcx>>] { - let this = self.eval_context_ref(); - this.machine.threads.active_thread_stack() - } - - #[inline] - fn active_thread_stack_mut( - &mut self, - ) -> &mut Vec>> { - let this = self.eval_context_mut(); - this.machine.threads.active_thread_stack_mut() - } - - #[inline] - fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec) { - let this = self.eval_context_mut(); - this.machine.threads.set_thread_name(thread, new_thread_name); - } - - #[inline] - fn set_thread_name_wide(&mut self, thread: ThreadId, new_thread_name: &[u16]) { - let this = self.eval_context_mut(); - - // The Windows `GetThreadDescription` shim to get the thread name isn't implemented, so being lossy is okay. - // This is only read by diagnostics, which already use `from_utf8_lossy`. 
- this.machine - .threads - .set_thread_name(thread, String::from_utf16_lossy(new_thread_name).into_bytes()); - } - - #[inline] - fn get_thread_name<'c>(&'c self, thread: ThreadId) -> &'c [u8] - where - 'mir: 'c, - { - let this = self.eval_context_ref(); - this.machine.threads.get_thread_name(thread) - } - - #[inline] - fn block_thread(&mut self, thread: ThreadId) { - let this = self.eval_context_mut(); - this.machine.threads.block_thread(thread); - } - - #[inline] - fn unblock_thread(&mut self, thread: ThreadId) { - let this = self.eval_context_mut(); - this.machine.threads.unblock_thread(thread); - } - - #[inline] - fn yield_active_thread(&mut self) { - let this = self.eval_context_mut(); - this.machine.threads.yield_active_thread(); - } - - #[inline] - fn maybe_preempt_active_thread(&mut self) { - use rand::Rng as _; - - let this = self.eval_context_mut(); - if this.machine.rng.get_mut().gen_bool(this.machine.preemption_rate) { - this.yield_active_thread(); - } - } - - #[inline] - fn register_timeout_callback( - &mut self, - thread: ThreadId, - call_time: Time, - callback: TimeoutCallback<'mir, 'tcx>, - ) { - let this = self.eval_context_mut(); - this.machine.threads.register_timeout_callback(thread, call_time, callback); - } - - #[inline] - fn unregister_timeout_callback_if_exists(&mut self, thread: ThreadId) { - let this = self.eval_context_mut(); - this.machine.threads.unregister_timeout_callback_if_exists(thread); - } - - /// Execute a timeout callback on the callback's thread. - #[inline] - fn run_timeout_callback(&mut self) -> InterpResult<'tcx> { - let this = self.eval_context_mut(); - let (thread, callback) = - if let Some((thread, callback)) = this.machine.threads.get_ready_callback() { - (thread, callback) - } else { - // get_ready_callback can return None if the computer's clock - // was shifted after calling the scheduler and before the call - // to get_ready_callback (see issue - // https://github.com/rust-lang/miri/issues/1763). In this case, - // just do nothing, which effectively just returns to the - // scheduler. - return Ok(()); - }; - // This back-and-forth with `set_active_thread` is here because of two - // design decisions: - // 1. Make the caller and not the callback responsible for changing - // thread. - // 2. Make the scheduler the only place that can change the active - // thread. - let old_thread = this.set_active_thread(thread); - callback(this)?; - this.set_active_thread(old_thread); - Ok(()) - } - - /// Decide which action to take next and on which thread. - #[inline] - fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> { - let this = self.eval_context_mut(); - this.machine.threads.schedule() - } - - /// Handles thread termination of the active thread: wakes up threads joining on this one, - /// and deallocated thread-local statics. - /// - /// This is called from `tls.rs` after handling the TLS dtors. - #[inline] - fn thread_terminated(&mut self) -> InterpResult<'tcx> { - let this = self.eval_context_mut(); - for ptr in this.machine.threads.thread_terminated(this.machine.data_race.as_mut()) { - this.deallocate_ptr(ptr.into(), None, MiriMemoryKind::Tls.into())?; - } - Ok(()) - } -} -- cgit 1.4.1-3-g733a5
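
The wrap-around search in `ThreadManager::schedule` above (skip to just past the active thread, then chain the threads that come before it) is the part most prone to off-by-one mistakes. Below is a minimal, standalone sketch of the same iterator pattern over plain `usize` indices; the `State` enum and `pick_next` function are illustrative stand-ins, not Miri types.

    #[derive(Clone, Copy, PartialEq, Eq, Debug)]
    enum State {
        Enabled,
        Blocked,
        Terminated,
    }

    /// Starting just *after* `active` and wrapping around, return the first
    /// enabled thread index, if any. The active thread itself is never revisited:
    /// `skip(active + 1)` drops it from the first pass and `take(active)` stops
    /// just before it on the wrap-around pass.
    fn pick_next(states: &[State], active: usize) -> Option<usize> {
        states
            .iter()
            .enumerate()
            .skip(active + 1) // start just after the active thread...
            .chain(states.iter().enumerate().take(active)) // ...then wrap to the ones before it
            .find(|&(_, &s)| s == State::Enabled)
            .map(|(id, _)| id)
    }

    fn main() {
        let states = [State::Terminated, State::Blocked, State::Enabled, State::Enabled];
        // With thread 2 active, the search starts at 3 and finds it enabled.
        assert_eq!(pick_next(&states, 2), Some(3));
        // With thread 3 active, the wrap-around pass finds thread 2.
        assert_eq!(pick_next(&states, 3), Some(2));
        println!("wrap-around search behaves as described");
    }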
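
A second small sketch: `Time::get_wait_time` above clamps deadlines that are already in the past to a zero wait instead of failing, for both the monotonic and the real-time clock. The function names here are illustrative only.

    use std::time::{Duration, Instant, SystemTime};

    fn wait_time_monotonic(deadline: Instant) -> Duration {
        // A deadline in the past saturates to a zero wait.
        deadline.saturating_duration_since(Instant::now())
    }

    fn wait_time_realtime(deadline: SystemTime) -> Duration {
        // `duration_since` errors if the deadline is already behind us; treat that as "no wait".
        deadline.duration_since(SystemTime::now()).unwrap_or(Duration::ZERO)
    }

    fn main() {
        let past = Instant::now() - Duration::from_secs(1);
        assert_eq!(wait_time_monotonic(past), Duration::ZERO);

        let future = SystemTime::now() + Duration::from_secs(60);
        assert!(wait_time_realtime(future) > Duration::from_secs(59));

        println!("past deadlines yield a zero wait, future ones a positive one");
    }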