about summary refs log tree commit diff
path: root/src/libstd/thread/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstd/thread/mod.rs')
-rw-r--r--src/libstd/thread/mod.rs113
1 files changed, 88 insertions, 25 deletions
diff --git a/src/libstd/thread/mod.rs b/src/libstd/thread/mod.rs
index 30887b16c60..ee49bf796b8 100644
--- a/src/libstd/thread/mod.rs
+++ b/src/libstd/thread/mod.rs
@@ -25,11 +25,15 @@
 //!
 //! Fatal logic errors in Rust cause *thread panic*, during which
 //! a thread will unwind the stack, running destructors and freeing
-//! owned resources. Thread panic is unrecoverable from within
-//! the panicking thread (i.e. there is no 'try/catch' in Rust), but
-//! the panic may optionally be detected from a different thread. If
-//! the main thread panics, the application will exit with a non-zero
-//! exit code.
+//! owned resources. While not meant as a 'try/catch' mechanism, panics
+//! in Rust can nonetheless be caught (unless compiling with `panic=abort`) with
+//! [`catch_unwind`](../../std/panic/fn.catch_unwind.html) and recovered
+//! from, or alternatively be resumed with
+//! [`resume_unwind`](../../std/panic/fn.resume_unwind.html). If the panic
+//! is not caught the thread will exit, but the panic may optionally be
+//! detected from a different thread with [`join`]. If the main thread panics
+//! without the panic being caught, the application will exit with a
+//! non-zero exit code.
 //!
 //! When the main thread of a Rust program terminates, the entire program shuts
 //! down, even if other threads are still running. However, this module provides
@@ -171,6 +175,8 @@ use panic;
 use panicking;
 use str;
 use sync::{Mutex, Condvar, Arc};
+use sync::atomic::AtomicUsize;
+use sync::atomic::Ordering::SeqCst;
 use sys::thread as imp;
 use sys_common::mutex;
 use sys_common::thread_info;
@@ -485,15 +491,17 @@ impl Builder {
 /// let (tx, rx) = channel();
 ///
 /// let sender = thread::spawn(move || {
-///     let _ = tx.send("Hello, thread".to_owned());
+///     tx.send("Hello, thread".to_owned())
+///         .expect("Unable to send on channel");
 /// });
 ///
 /// let receiver = thread::spawn(move || {
-///     println!("{}", rx.recv().unwrap());
+///     let value = rx.recv().expect("Unable to receive from channel");
+///     println!("{}", value);
 /// });
 ///
-/// let _ = sender.join();
-/// let _ = receiver.join();
+/// sender.join().expect("The sender thread has panicked");
+/// receiver.join().expect("The receiver thread has panicked");
 /// ```
 ///
 /// A thread can also return a value through its [`JoinHandle`], you can use
@@ -692,6 +700,11 @@ pub fn sleep(dur: Duration) {
     imp::Thread::sleep(dur)
 }
 
+// constants for park/unpark
+const EMPTY: usize = 0;
+const PARKED: usize = 1;
+const NOTIFIED: usize = 2;
+
 /// Blocks unless or until the current thread's token is made available.
 ///
 /// A call to `park` does not guarantee that the thread will remain parked
@@ -769,11 +782,27 @@ pub fn sleep(dur: Duration) {
 #[stable(feature = "rust1", since = "1.0.0")]
 pub fn park() {
     let thread = current();
-    let mut guard = thread.inner.lock.lock().unwrap();
-    while !*guard {
-        guard = thread.inner.cvar.wait(guard).unwrap();
+
+    // If we were previously notified then we consume this notification and
+    // return quickly.
+    if thread.inner.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
+        return
+    }
+
+    // Otherwise we need to coordinate going to sleep
+    let mut m = thread.inner.lock.lock().unwrap();
+    match thread.inner.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
+        Ok(_) => {}
+        Err(NOTIFIED) => return, // notified after we locked
+        Err(_) => panic!("inconsistent park state"),
+    }
+    loop {
+        m = thread.inner.cvar.wait(m).unwrap();
+        match thread.inner.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) {
+            Ok(_) => return, // got a notification
+            Err(_) => {} // spurious wakeup, go back to sleep
+        }
     }
-    *guard = false;
 }
 
 /// Use [`park_timeout`].
@@ -840,12 +869,30 @@ pub fn park_timeout_ms(ms: u32) {
 #[stable(feature = "park_timeout", since = "1.4.0")]
 pub fn park_timeout(dur: Duration) {
     let thread = current();
-    let mut guard = thread.inner.lock.lock().unwrap();
-    if !*guard {
-        let (g, _) = thread.inner.cvar.wait_timeout(guard, dur).unwrap();
-        guard = g;
+
+    // Like `park` above we have a fast path for an already-notified thread, and
+    // afterwards we start coordinating for a sleep.
+    // return quickly.
+    if thread.inner.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
+        return
+    }
+    let m = thread.inner.lock.lock().unwrap();
+    match thread.inner.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
+        Ok(_) => {}
+        Err(NOTIFIED) => return, // notified after we locked
+        Err(_) => panic!("inconsistent park_timeout state"),
+    }
+
+    // Wait with a timeout, and if we spuriously wake up or otherwise wake up
+    // from a notification we just want to unconditionally set the state back to
+    // empty, either consuming a notification or un-flagging ourselves as
+    // parked.
+    let (_m, _result) = thread.inner.cvar.wait_timeout(m, dur).unwrap();
+    match thread.inner.state.swap(EMPTY, SeqCst) {
+        NOTIFIED => {} // got a notification, hurray!
+        PARKED => {} // no notification, alas
+        n => panic!("inconsistent park_timeout state: {}", n),
     }
-    *guard = false;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -912,7 +959,10 @@ impl ThreadId {
 struct Inner {
     name: Option<CString>,      // Guaranteed to be UTF-8
     id: ThreadId,
-    lock: Mutex<bool>,          // true when there is a buffered unpark
+
+    // state for thread park/unpark
+    state: AtomicUsize,
+    lock: Mutex<()>,
     cvar: Condvar,
 }
 
@@ -956,7 +1006,8 @@ impl Thread {
             inner: Arc::new(Inner {
                 name: cname,
                 id: ThreadId::new(),
-                lock: Mutex::new(false),
+                state: AtomicUsize::new(EMPTY),
+                lock: Mutex::new(()),
                 cvar: Condvar::new(),
             })
         }
@@ -996,10 +1047,22 @@ impl Thread {
     /// [park]: fn.park.html
     #[stable(feature = "rust1", since = "1.0.0")]
     pub fn unpark(&self) {
-        let mut guard = self.inner.lock.lock().unwrap();
-        if !*guard {
-            *guard = true;
-            self.inner.cvar.notify_one();
+        loop {
+            match self.inner.state.compare_exchange(EMPTY, NOTIFIED, SeqCst, SeqCst) {
+                Ok(_) => return, // no one was waiting
+                Err(NOTIFIED) => return, // already unparked
+                Err(PARKED) => {} // gotta go wake someone up
+                _ => panic!("inconsistent state in unpark"),
+            }
+
+            // Coordinate wakeup through the mutex and a condvar notification
+            let _lock = self.inner.lock.lock().unwrap();
+            match self.inner.state.compare_exchange(PARKED, NOTIFIED, SeqCst, SeqCst) {
+                Ok(_) => return self.inner.cvar.notify_one(),
+                Err(NOTIFIED) => return, // a different thread unparked
+                Err(EMPTY) => {} // parked thread went away, try again
+                _ => panic!("inconsistent state in unpark"),
+            }
         }
     }
 
@@ -1192,7 +1255,7 @@ impl<T> JoinInner<T> {
 ///     });
 /// });
 ///
-/// let _ = original_thread.join();
+/// original_thread.join().expect("The thread being joined has panicked");
 /// println!("Original thread is joined.");
 ///
 /// // We make sure that the new thread has time to run, before the main