diff options
Diffstat (limited to 'src/libstd/thread/mod.rs')
| -rw-r--r-- | src/libstd/thread/mod.rs | 113 |
1 files changed, 88 insertions, 25 deletions
diff --git a/src/libstd/thread/mod.rs b/src/libstd/thread/mod.rs index 30887b16c60..ee49bf796b8 100644 --- a/src/libstd/thread/mod.rs +++ b/src/libstd/thread/mod.rs @@ -25,11 +25,15 @@ //! //! Fatal logic errors in Rust cause *thread panic*, during which //! a thread will unwind the stack, running destructors and freeing -//! owned resources. Thread panic is unrecoverable from within -//! the panicking thread (i.e. there is no 'try/catch' in Rust), but -//! the panic may optionally be detected from a different thread. If -//! the main thread panics, the application will exit with a non-zero -//! exit code. +//! owned resources. While not meant as a 'try/catch' mechanism, panics +//! in Rust can nonetheless be caught (unless compiling with `panic=abort`) with +//! [`catch_unwind`](../../std/panic/fn.catch_unwind.html) and recovered +//! from, or alternatively be resumed with +//! [`resume_unwind`](../../std/panic/fn.resume_unwind.html). If the panic +//! is not caught the thread will exit, but the panic may optionally be +//! detected from a different thread with [`join`]. If the main thread panics +//! without the panic being caught, the application will exit with a +//! non-zero exit code. //! //! When the main thread of a Rust program terminates, the entire program shuts //! down, even if other threads are still running. However, this module provides @@ -171,6 +175,8 @@ use panic; use panicking; use str; use sync::{Mutex, Condvar, Arc}; +use sync::atomic::AtomicUsize; +use sync::atomic::Ordering::SeqCst; use sys::thread as imp; use sys_common::mutex; use sys_common::thread_info; @@ -485,15 +491,17 @@ impl Builder { /// let (tx, rx) = channel(); /// /// let sender = thread::spawn(move || { -/// let _ = tx.send("Hello, thread".to_owned()); +/// tx.send("Hello, thread".to_owned()) +/// .expect("Unable to send on channel"); /// }); /// /// let receiver = thread::spawn(move || { -/// println!("{}", rx.recv().unwrap()); +/// let value = rx.recv().expect("Unable to receive from channel"); +/// println!("{}", value); /// }); /// -/// let _ = sender.join(); -/// let _ = receiver.join(); +/// sender.join().expect("The sender thread has panicked"); +/// receiver.join().expect("The receiver thread has panicked"); /// ``` /// /// A thread can also return a value through its [`JoinHandle`], you can use @@ -692,6 +700,11 @@ pub fn sleep(dur: Duration) { imp::Thread::sleep(dur) } +// constants for park/unpark +const EMPTY: usize = 0; +const PARKED: usize = 1; +const NOTIFIED: usize = 2; + /// Blocks unless or until the current thread's token is made available. /// /// A call to `park` does not guarantee that the thread will remain parked @@ -769,11 +782,27 @@ pub fn sleep(dur: Duration) { #[stable(feature = "rust1", since = "1.0.0")] pub fn park() { let thread = current(); - let mut guard = thread.inner.lock.lock().unwrap(); - while !*guard { - guard = thread.inner.cvar.wait(guard).unwrap(); + + // If we were previously notified then we consume this notification and + // return quickly. + if thread.inner.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() { + return + } + + // Otherwise we need to coordinate going to sleep + let mut m = thread.inner.lock.lock().unwrap(); + match thread.inner.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { + Ok(_) => {} + Err(NOTIFIED) => return, // notified after we locked + Err(_) => panic!("inconsistent park state"), + } + loop { + m = thread.inner.cvar.wait(m).unwrap(); + match thread.inner.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) { + Ok(_) => return, // got a notification + Err(_) => {} // spurious wakeup, go back to sleep + } } - *guard = false; } /// Use [`park_timeout`]. @@ -840,12 +869,30 @@ pub fn park_timeout_ms(ms: u32) { #[stable(feature = "park_timeout", since = "1.4.0")] pub fn park_timeout(dur: Duration) { let thread = current(); - let mut guard = thread.inner.lock.lock().unwrap(); - if !*guard { - let (g, _) = thread.inner.cvar.wait_timeout(guard, dur).unwrap(); - guard = g; + + // Like `park` above we have a fast path for an already-notified thread, and + // afterwards we start coordinating for a sleep. + // return quickly. + if thread.inner.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() { + return + } + let m = thread.inner.lock.lock().unwrap(); + match thread.inner.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { + Ok(_) => {} + Err(NOTIFIED) => return, // notified after we locked + Err(_) => panic!("inconsistent park_timeout state"), + } + + // Wait with a timeout, and if we spuriously wake up or otherwise wake up + // from a notification we just want to unconditionally set the state back to + // empty, either consuming a notification or un-flagging ourselves as + // parked. + let (_m, _result) = thread.inner.cvar.wait_timeout(m, dur).unwrap(); + match thread.inner.state.swap(EMPTY, SeqCst) { + NOTIFIED => {} // got a notification, hurray! + PARKED => {} // no notification, alas + n => panic!("inconsistent park_timeout state: {}", n), } - *guard = false; } //////////////////////////////////////////////////////////////////////////////// @@ -912,7 +959,10 @@ impl ThreadId { struct Inner { name: Option<CString>, // Guaranteed to be UTF-8 id: ThreadId, - lock: Mutex<bool>, // true when there is a buffered unpark + + // state for thread park/unpark + state: AtomicUsize, + lock: Mutex<()>, cvar: Condvar, } @@ -956,7 +1006,8 @@ impl Thread { inner: Arc::new(Inner { name: cname, id: ThreadId::new(), - lock: Mutex::new(false), + state: AtomicUsize::new(EMPTY), + lock: Mutex::new(()), cvar: Condvar::new(), }) } @@ -996,10 +1047,22 @@ impl Thread { /// [park]: fn.park.html #[stable(feature = "rust1", since = "1.0.0")] pub fn unpark(&self) { - let mut guard = self.inner.lock.lock().unwrap(); - if !*guard { - *guard = true; - self.inner.cvar.notify_one(); + loop { + match self.inner.state.compare_exchange(EMPTY, NOTIFIED, SeqCst, SeqCst) { + Ok(_) => return, // no one was waiting + Err(NOTIFIED) => return, // already unparked + Err(PARKED) => {} // gotta go wake someone up + _ => panic!("inconsistent state in unpark"), + } + + // Coordinate wakeup through the mutex and a condvar notification + let _lock = self.inner.lock.lock().unwrap(); + match self.inner.state.compare_exchange(PARKED, NOTIFIED, SeqCst, SeqCst) { + Ok(_) => return self.inner.cvar.notify_one(), + Err(NOTIFIED) => return, // a different thread unparked + Err(EMPTY) => {} // parked thread went away, try again + _ => panic!("inconsistent state in unpark"), + } } } @@ -1192,7 +1255,7 @@ impl<T> JoinInner<T> { /// }); /// }); /// -/// let _ = original_thread.join(); +/// original_thread.join().expect("The thread being joined has panicked"); /// println!("Original thread is joined."); /// /// // We make sure that the new thread has time to run, before the main |
