about summary refs log tree commit diff
path: root/src/libstd/sync
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2015-01-02 09:19:00 -0800
committerAlex Crichton <alex@alexcrichton.com>2015-01-02 09:19:00 -0800
commit009ec5d2b0c4ab0e7dc7ab2f6b15754b4da14caf (patch)
tree8b441fd58860857f2e7bf5eabbf2226b92bf13c7 /src/libstd/sync
parent0101bbe7acb38e8113c0cafeb7d5ae0be6448e5b (diff)
parentf3a7ec7028c76b3a1c6051131328f372b068e33a (diff)
downloadrust-009ec5d2b0c4ab0e7dc7ab2f6b15754b4da14caf.tar.gz
rust-009ec5d2b0c4ab0e7dc7ab2f6b15754b4da14caf.zip
rollup merge of #20315: alexcrichton/std-sync
Conflicts:
	src/libstd/rt/exclusive.rs
	src/libstd/sync/barrier.rs
	src/libstd/sys/unix/pipe.rs
	src/test/bench/shootout-binarytrees.rs
	src/test/bench/shootout-fannkuch-redux.rs
Diffstat (limited to 'src/libstd/sync')
-rw-r--r--src/libstd/sync/atomic.rs8
-rw-r--r--src/libstd/sync/barrier.rs53
-rw-r--r--src/libstd/sync/condvar.rs2
-rw-r--r--src/libstd/sync/future.rs7
-rw-r--r--src/libstd/sync/mod.rs2
-rw-r--r--src/libstd/sync/mpsc/blocking.rs4
-rw-r--r--src/libstd/sync/once.rs33
-rw-r--r--src/libstd/sync/semaphore.rs3
-rw-r--r--src/libstd/sync/task_pool.rs5
9 files changed, 81 insertions, 36 deletions
diff --git a/src/libstd/sync/atomic.rs b/src/libstd/sync/atomic.rs
index a88932f21cb..d4d7607bde3 100644
--- a/src/libstd/sync/atomic.rs
+++ b/src/libstd/sync/atomic.rs
@@ -86,15 +86,15 @@
 //! Keep a global count of live tasks:
 //!
 //! ```
-//! use std::sync::atomic::{AtomicUint, SeqCst, INIT_ATOMIC_UINT};
+//! use std::sync::atomic::{AtomicUint, SeqCst, ATOMIC_UINT_INIT};
 //!
-//! static GLOBAL_TASK_COUNT: AtomicUint = INIT_ATOMIC_UINT;
+//! static GLOBAL_TASK_COUNT: AtomicUint = ATOMIC_UINT_INIT;
 //!
 //! let old_task_count = GLOBAL_TASK_COUNT.fetch_add(1, SeqCst);
 //! println!("live tasks: {}", old_task_count + 1);
 //! ```
 
-#![allow(deprecated)]
+#![stable]
 
 use alloc::boxed::Box;
 use core::mem;
@@ -102,6 +102,7 @@ use core::prelude::{Send, Drop, None, Option, Some};
 
 pub use core::atomic::{AtomicBool, AtomicInt, AtomicUint, AtomicPtr};
 pub use core::atomic::{INIT_ATOMIC_BOOL, INIT_ATOMIC_INT, INIT_ATOMIC_UINT};
+pub use core::atomic::{ATOMIC_BOOL_INIT, ATOMIC_INT_INIT, ATOMIC_UINT_INIT};
 pub use core::atomic::fence;
 pub use core::atomic::Ordering::{mod, Relaxed, Release, Acquire, AcqRel, SeqCst};
 
@@ -116,6 +117,7 @@ pub struct AtomicOption<T> {
     p: AtomicUint,
 }
 
+#[allow(deprecated)]
 impl<T: Send> AtomicOption<T> {
     /// Create a new `AtomicOption`
     pub fn new(p: Box<T>) -> AtomicOption<T> {
diff --git a/src/libstd/sync/barrier.rs b/src/libstd/sync/barrier.rs
index 50e4f848d82..bf5da3e7cba 100644
--- a/src/libstd/sync/barrier.rs
+++ b/src/libstd/sync/barrier.rs
@@ -8,7 +8,6 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use kinds::{Send, Sync};
 use sync::{Mutex, Condvar};
 
 /// A barrier enables multiple tasks to synchronize the beginning
@@ -30,29 +29,32 @@ use sync::{Mutex, Condvar};
 ///     }).detach();
 /// }
 /// ```
+#[stable]
 pub struct Barrier {
     lock: Mutex<BarrierState>,
     cvar: Condvar,
     num_threads: uint,
 }
 
-unsafe impl Send for Barrier {}
-unsafe impl Sync for Barrier {}
-
 // The inner state of a double barrier
 struct BarrierState {
     count: uint,
     generation_id: uint,
 }
 
-unsafe impl Send for BarrierState {}
-unsafe impl Sync for BarrierState {}
+/// A result returned from wait.
+///
+/// Currently this opaque structure only has one method, `.is_leader()`. Only
+/// one thread will receive a result that will return `true` from this function.
+#[allow(missing_copy_implementations)]
+pub struct BarrierWaitResult(bool);
 
 impl Barrier {
     /// Create a new barrier that can block a given number of threads.
     ///
     /// A barrier will block `n`-1 threads which call `wait` and then wake up
     /// all threads at once when the `n`th thread calls `wait`.
+    #[stable]
     pub fn new(n: uint) -> Barrier {
         Barrier {
             lock: Mutex::new(BarrierState {
@@ -68,7 +70,13 @@ impl Barrier {
     ///
     /// Barriers are re-usable after all threads have rendezvoused once, and can
     /// be used continuously.
-    pub fn wait(&self) {
+    ///
+    /// A single (arbitrary) thread will receive a `BarrierWaitResult` that
+    /// returns `true` from `is_leader` when returning from this function, and
+    /// all other threads will receive a result that will return `false` from
+    /// `is_leader`
+    #[stable]
+    pub fn wait(&self) -> BarrierWaitResult {
         let mut lock = self.lock.lock().unwrap();
         let local_gen = lock.generation_id;
         lock.count += 1;
@@ -79,14 +87,25 @@ impl Barrier {
                   lock.count < self.num_threads {
                 lock = self.cvar.wait(lock).unwrap();
             }
+            BarrierWaitResult(false)
         } else {
             lock.count = 0;
             lock.generation_id += 1;
             self.cvar.notify_all();
+            BarrierWaitResult(true)
         }
     }
 }
 
+impl BarrierWaitResult {
+    /// Return whether this thread from `wait` is the "leader thread".
+    ///
+    /// Only one thread will have `true` returned from their result, all other
+    /// threads will have `false` returned.
+    #[stable]
+    pub fn is_leader(&self) -> bool { self.0 }
+}
+
 #[cfg(test)]
 mod tests {
     use prelude::v1::*;
@@ -97,15 +116,16 @@ mod tests {
 
     #[test]
     fn test_barrier() {
-        let barrier = Arc::new(Barrier::new(10));
+        const N: uint = 10;
+
+        let barrier = Arc::new(Barrier::new(N));
         let (tx, rx) = channel();
 
-        for _ in range(0u, 9) {
+        for _ in range(0u, N - 1) {
             let c = barrier.clone();
             let tx = tx.clone();
             Thread::spawn(move|| {
-                c.wait();
-                tx.send(true).unwrap();
+                tx.send(c.wait().is_leader()).unwrap();
             }).detach();
         }
 
@@ -116,10 +136,15 @@ mod tests {
             _ => false,
         });
 
-        barrier.wait();
+        let mut leader_found = barrier.wait().is_leader();
+
         // Now, the barrier is cleared and we should get data.
-        for _ in range(0u, 9) {
-            rx.recv().unwrap();
+        for _ in range(0u, N - 1) {
+            if rx.recv().unwrap() {
+                assert!(!leader_found);
+                leader_found = true;
+            }
         }
+        assert!(leader_found);
     }
 }
diff --git a/src/libstd/sync/condvar.rs b/src/libstd/sync/condvar.rs
index d71cdeb25fd..8d40a854aaf 100644
--- a/src/libstd/sync/condvar.rs
+++ b/src/libstd/sync/condvar.rs
@@ -88,7 +88,7 @@ unsafe impl Sync for StaticCondvar {}
 #[unstable = "may be merged with Condvar in the future"]
 pub const CONDVAR_INIT: StaticCondvar = StaticCondvar {
     inner: sys::CONDVAR_INIT,
-    mutex: atomic::INIT_ATOMIC_UINT,
+    mutex: atomic::ATOMIC_UINT_INIT,
 };
 
 impl Condvar {
diff --git a/src/libstd/sync/future.rs b/src/libstd/sync/future.rs
index e3620617d57..e5245251ea8 100644
--- a/src/libstd/sync/future.rs
+++ b/src/libstd/sync/future.rs
@@ -8,8 +8,8 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-//! A type representing values that may be computed concurrently and operations for working with
-//! them.
+//! A type representing values that may be computed concurrently and operations
+//! for working with them.
 //!
 //! # Example
 //!
@@ -23,6 +23,9 @@
 //! ```
 
 #![allow(missing_docs)]
+#![unstable = "futures as-is have yet to be deeply reevaluated with recent \
+               core changes to Rust's synchronization story, and will likely \
+               become stable in the future but are unstable until that time"]
 
 use core::prelude::*;
 use core::mem::replace;
diff --git a/src/libstd/sync/mod.rs b/src/libstd/sync/mod.rs
index 1f8e5d7ee37..c09c3b45d3e 100644
--- a/src/libstd/sync/mod.rs
+++ b/src/libstd/sync/mod.rs
@@ -26,7 +26,7 @@ pub use self::rwlock::{RWLockReadGuard, RWLockWriteGuard};
 pub use self::condvar::{Condvar, StaticCondvar, CONDVAR_INIT};
 pub use self::once::{Once, ONCE_INIT};
 pub use self::semaphore::{Semaphore, SemaphoreGuard};
-pub use self::barrier::Barrier;
+pub use self::barrier::{Barrier, BarrierWaitResult};
 pub use self::poison::{PoisonError, TryLockError, TryLockResult, LockResult};
 
 pub use self::future::Future;
diff --git a/src/libstd/sync/mpsc/blocking.rs b/src/libstd/sync/mpsc/blocking.rs
index 412b7161305..a5299012723 100644
--- a/src/libstd/sync/mpsc/blocking.rs
+++ b/src/libstd/sync/mpsc/blocking.rs
@@ -11,7 +11,7 @@
 //! Generic support for building blocking abstractions.
 
 use thread::Thread;
-use sync::atomic::{AtomicBool, INIT_ATOMIC_BOOL, Ordering};
+use sync::atomic::{AtomicBool, ATOMIC_BOOL_INIT, Ordering};
 use sync::Arc;
 use kinds::{Sync, Send};
 use kinds::marker::{NoSend, NoSync};
@@ -40,7 +40,7 @@ pub struct WaitToken {
 pub fn tokens() -> (WaitToken, SignalToken) {
     let inner = Arc::new(Inner {
         thread: Thread::current(),
-        woken: INIT_ATOMIC_BOOL,
+        woken: ATOMIC_BOOL_INIT,
     });
     let wait_token = WaitToken {
         inner: inner.clone(),
diff --git a/src/libstd/sync/once.rs b/src/libstd/sync/once.rs
index 17b7b70c301..9e9a17e482f 100644
--- a/src/libstd/sync/once.rs
+++ b/src/libstd/sync/once.rs
@@ -32,10 +32,11 @@ use sync::{StaticMutex, MUTEX_INIT};
 ///
 /// static START: Once = ONCE_INIT;
 ///
-/// START.doit(|| {
+/// START.call_once(|| {
 ///     // run initialization here
 /// });
 /// ```
+#[stable]
 pub struct Once {
     mutex: StaticMutex,
     cnt: atomic::AtomicInt,
@@ -45,23 +46,25 @@ pub struct Once {
 unsafe impl Sync for Once {}
 
 /// Initialization value for static `Once` values.
+#[stable]
 pub const ONCE_INIT: Once = Once {
     mutex: MUTEX_INIT,
-    cnt: atomic::INIT_ATOMIC_INT,
-    lock_cnt: atomic::INIT_ATOMIC_INT,
+    cnt: atomic::ATOMIC_INT_INIT,
+    lock_cnt: atomic::ATOMIC_INT_INIT,
 };
 
 impl Once {
     /// Perform an initialization routine once and only once. The given closure
-    /// will be executed if this is the first time `doit` has been called, and
-    /// otherwise the routine will *not* be invoked.
+    /// will be executed if this is the first time `call_once` has been called,
+    /// and otherwise the routine will *not* be invoked.
     ///
     /// This method will block the calling task if another initialization
     /// routine is currently running.
     ///
     /// When this function returns, it is guaranteed that some initialization
     /// has run and completed (it may not be the closure specified).
-    pub fn doit<F>(&'static self, f: F) where F: FnOnce() {
+    #[stable]
+    pub fn call_once<F>(&'static self, f: F) where F: FnOnce() {
         // Optimize common path: load is much cheaper than fetch_add.
         if self.cnt.load(atomic::SeqCst) < 0 {
             return
@@ -91,13 +94,13 @@ impl Once {
         //
         // It is crucial that the negative value is swapped in *after* the
         // initialization routine has completed because otherwise new threads
-        // calling `doit` will return immediately before the initialization has
-        // completed.
+        // calling `call_once` will return immediately before the initialization
+        // has completed.
 
         let prev = self.cnt.fetch_add(1, atomic::SeqCst);
         if prev < 0 {
             // Make sure we never overflow, we'll never have int::MIN
-            // simultaneous calls to `doit` to make this value go back to 0
+            // simultaneous calls to `call_once` to make this value go back to 0
             self.cnt.store(int::MIN, atomic::SeqCst);
             return
         }
@@ -118,6 +121,10 @@ impl Once {
             unsafe { self.mutex.destroy() }
         }
     }
+
+    /// Deprecated
+    #[deprecated = "renamed to `call_once`"]
+    pub fn doit<F>(&'static self, f: F) where F: FnOnce() { self.call_once(f) }
 }
 
 #[cfg(test)]
@@ -132,9 +139,9 @@ mod test {
     fn smoke_once() {
         static O: Once = ONCE_INIT;
         let mut a = 0i;
-        O.doit(|| a += 1);
+        O.call_once(|| a += 1);
         assert_eq!(a, 1);
-        O.doit(|| a += 1);
+        O.call_once(|| a += 1);
         assert_eq!(a, 1);
     }
 
@@ -149,7 +156,7 @@ mod test {
             Thread::spawn(move|| {
                 for _ in range(0u, 4) { Thread::yield_now() }
                 unsafe {
-                    O.doit(|| {
+                    O.call_once(|| {
                         assert!(!run);
                         run = true;
                     });
@@ -160,7 +167,7 @@ mod test {
         }
 
         unsafe {
-            O.doit(|| {
+            O.call_once(|| {
                 assert!(!run);
                 run = true;
             });
diff --git a/src/libstd/sync/semaphore.rs b/src/libstd/sync/semaphore.rs
index b03c0e08035..c0ff674ba0f 100644
--- a/src/libstd/sync/semaphore.rs
+++ b/src/libstd/sync/semaphore.rs
@@ -8,6 +8,9 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
+#![unstable = "the interaction between semaphores and the acquisition/release \
+               of resources is currently unclear"]
+
 use ops::Drop;
 use sync::{Mutex, Condvar};
 
diff --git a/src/libstd/sync/task_pool.rs b/src/libstd/sync/task_pool.rs
index c34fa66d12a..088827dc084 100644
--- a/src/libstd/sync/task_pool.rs
+++ b/src/libstd/sync/task_pool.rs
@@ -10,6 +10,11 @@
 
 //! Abstraction of a thread pool for basic parallelism.
 
+#![unstable = "the semantics of a failing task and whether a thread is \
+               re-attached to a thread pool are somewhat unclear, and the \
+               utility of this type in `std::sync` is questionable with \
+               respect to the jobs of other primitives"]
+
 use core::prelude::*;
 
 use sync::{Arc, Mutex};