diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2015-01-02 09:19:00 -0800 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2015-01-02 09:19:00 -0800 |
| commit | 009ec5d2b0c4ab0e7dc7ab2f6b15754b4da14caf (patch) | |
| tree | 8b441fd58860857f2e7bf5eabbf2226b92bf13c7 /src/libstd/sync | |
| parent | 0101bbe7acb38e8113c0cafeb7d5ae0be6448e5b (diff) | |
| parent | f3a7ec7028c76b3a1c6051131328f372b068e33a (diff) | |
| download | rust-009ec5d2b0c4ab0e7dc7ab2f6b15754b4da14caf.tar.gz rust-009ec5d2b0c4ab0e7dc7ab2f6b15754b4da14caf.zip | |
rollup merge of #20315: alexcrichton/std-sync
Conflicts: src/libstd/rt/exclusive.rs src/libstd/sync/barrier.rs src/libstd/sys/unix/pipe.rs src/test/bench/shootout-binarytrees.rs src/test/bench/shootout-fannkuch-redux.rs
Diffstat (limited to 'src/libstd/sync')
| -rw-r--r-- | src/libstd/sync/atomic.rs | 8 | ||||
| -rw-r--r-- | src/libstd/sync/barrier.rs | 53 | ||||
| -rw-r--r-- | src/libstd/sync/condvar.rs | 2 | ||||
| -rw-r--r-- | src/libstd/sync/future.rs | 7 | ||||
| -rw-r--r-- | src/libstd/sync/mod.rs | 2 | ||||
| -rw-r--r-- | src/libstd/sync/mpsc/blocking.rs | 4 | ||||
| -rw-r--r-- | src/libstd/sync/once.rs | 33 | ||||
| -rw-r--r-- | src/libstd/sync/semaphore.rs | 3 | ||||
| -rw-r--r-- | src/libstd/sync/task_pool.rs | 5 |
9 files changed, 81 insertions, 36 deletions
diff --git a/src/libstd/sync/atomic.rs b/src/libstd/sync/atomic.rs index a88932f21cb..d4d7607bde3 100644 --- a/src/libstd/sync/atomic.rs +++ b/src/libstd/sync/atomic.rs @@ -86,15 +86,15 @@ //! Keep a global count of live tasks: //! //! ``` -//! use std::sync::atomic::{AtomicUint, SeqCst, INIT_ATOMIC_UINT}; +//! use std::sync::atomic::{AtomicUint, SeqCst, ATOMIC_UINT_INIT}; //! -//! static GLOBAL_TASK_COUNT: AtomicUint = INIT_ATOMIC_UINT; +//! static GLOBAL_TASK_COUNT: AtomicUint = ATOMIC_UINT_INIT; //! //! let old_task_count = GLOBAL_TASK_COUNT.fetch_add(1, SeqCst); //! println!("live tasks: {}", old_task_count + 1); //! ``` -#![allow(deprecated)] +#![stable] use alloc::boxed::Box; use core::mem; @@ -102,6 +102,7 @@ use core::prelude::{Send, Drop, None, Option, Some}; pub use core::atomic::{AtomicBool, AtomicInt, AtomicUint, AtomicPtr}; pub use core::atomic::{INIT_ATOMIC_BOOL, INIT_ATOMIC_INT, INIT_ATOMIC_UINT}; +pub use core::atomic::{ATOMIC_BOOL_INIT, ATOMIC_INT_INIT, ATOMIC_UINT_INIT}; pub use core::atomic::fence; pub use core::atomic::Ordering::{mod, Relaxed, Release, Acquire, AcqRel, SeqCst}; @@ -116,6 +117,7 @@ pub struct AtomicOption<T> { p: AtomicUint, } +#[allow(deprecated)] impl<T: Send> AtomicOption<T> { /// Create a new `AtomicOption` pub fn new(p: Box<T>) -> AtomicOption<T> { diff --git a/src/libstd/sync/barrier.rs b/src/libstd/sync/barrier.rs index 50e4f848d82..bf5da3e7cba 100644 --- a/src/libstd/sync/barrier.rs +++ b/src/libstd/sync/barrier.rs @@ -8,7 +8,6 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use kinds::{Send, Sync}; use sync::{Mutex, Condvar}; /// A barrier enables multiple tasks to synchronize the beginning @@ -30,29 +29,32 @@ use sync::{Mutex, Condvar}; /// }).detach(); /// } /// ``` +#[stable] pub struct Barrier { lock: Mutex<BarrierState>, cvar: Condvar, num_threads: uint, } -unsafe impl Send for Barrier {} -unsafe impl Sync for Barrier {} - // The inner state of a double barrier struct BarrierState { count: uint, generation_id: uint, } -unsafe impl Send for BarrierState {} -unsafe impl Sync for BarrierState {} +/// A result returned from wait. +/// +/// Currently this opaque structure only has one method, `.is_leader()`. Only +/// one thread will receive a result that will return `true` from this function. +#[allow(missing_copy_implementations)] +pub struct BarrierWaitResult(bool); impl Barrier { /// Create a new barrier that can block a given number of threads. /// /// A barrier will block `n`-1 threads which call `wait` and then wake up /// all threads at once when the `n`th thread calls `wait`. + #[stable] pub fn new(n: uint) -> Barrier { Barrier { lock: Mutex::new(BarrierState { @@ -68,7 +70,13 @@ impl Barrier { /// /// Barriers are re-usable after all threads have rendezvoused once, and can /// be used continuously. - pub fn wait(&self) { + /// + /// A single (arbitrary) thread will receive a `BarrierWaitResult` that + /// returns `true` from `is_leader` when returning from this function, and + /// all other threads will receive a result that will return `false` from + /// `is_leader` + #[stable] + pub fn wait(&self) -> BarrierWaitResult { let mut lock = self.lock.lock().unwrap(); let local_gen = lock.generation_id; lock.count += 1; @@ -79,14 +87,25 @@ impl Barrier { lock.count < self.num_threads { lock = self.cvar.wait(lock).unwrap(); } + BarrierWaitResult(false) } else { lock.count = 0; lock.generation_id += 1; self.cvar.notify_all(); + BarrierWaitResult(true) } } } +impl BarrierWaitResult { + /// Return whether this thread from `wait` is the "leader thread". + /// + /// Only one thread will have `true` returned from their result, all other + /// threads will have `false` returned. + #[stable] + pub fn is_leader(&self) -> bool { self.0 } +} + #[cfg(test)] mod tests { use prelude::v1::*; @@ -97,15 +116,16 @@ mod tests { #[test] fn test_barrier() { - let barrier = Arc::new(Barrier::new(10)); + const N: uint = 10; + + let barrier = Arc::new(Barrier::new(N)); let (tx, rx) = channel(); - for _ in range(0u, 9) { + for _ in range(0u, N - 1) { let c = barrier.clone(); let tx = tx.clone(); Thread::spawn(move|| { - c.wait(); - tx.send(true).unwrap(); + tx.send(c.wait().is_leader()).unwrap(); }).detach(); } @@ -116,10 +136,15 @@ mod tests { _ => false, }); - barrier.wait(); + let mut leader_found = barrier.wait().is_leader(); + // Now, the barrier is cleared and we should get data. - for _ in range(0u, 9) { - rx.recv().unwrap(); + for _ in range(0u, N - 1) { + if rx.recv().unwrap() { + assert!(!leader_found); + leader_found = true; + } } + assert!(leader_found); } } diff --git a/src/libstd/sync/condvar.rs b/src/libstd/sync/condvar.rs index d71cdeb25fd..8d40a854aaf 100644 --- a/src/libstd/sync/condvar.rs +++ b/src/libstd/sync/condvar.rs @@ -88,7 +88,7 @@ unsafe impl Sync for StaticCondvar {} #[unstable = "may be merged with Condvar in the future"] pub const CONDVAR_INIT: StaticCondvar = StaticCondvar { inner: sys::CONDVAR_INIT, - mutex: atomic::INIT_ATOMIC_UINT, + mutex: atomic::ATOMIC_UINT_INIT, }; impl Condvar { diff --git a/src/libstd/sync/future.rs b/src/libstd/sync/future.rs index e3620617d57..e5245251ea8 100644 --- a/src/libstd/sync/future.rs +++ b/src/libstd/sync/future.rs @@ -8,8 +8,8 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -//! A type representing values that may be computed concurrently and operations for working with -//! them. +//! A type representing values that may be computed concurrently and operations +//! for working with them. //! //! # Example //! @@ -23,6 +23,9 @@ //! ``` #![allow(missing_docs)] +#![unstable = "futures as-is have yet to be deeply reevaluated with recent \ + core changes to Rust's synchronization story, and will likely \ + become stable in the future but are unstable until that time"] use core::prelude::*; use core::mem::replace; diff --git a/src/libstd/sync/mod.rs b/src/libstd/sync/mod.rs index 1f8e5d7ee37..c09c3b45d3e 100644 --- a/src/libstd/sync/mod.rs +++ b/src/libstd/sync/mod.rs @@ -26,7 +26,7 @@ pub use self::rwlock::{RWLockReadGuard, RWLockWriteGuard}; pub use self::condvar::{Condvar, StaticCondvar, CONDVAR_INIT}; pub use self::once::{Once, ONCE_INIT}; pub use self::semaphore::{Semaphore, SemaphoreGuard}; -pub use self::barrier::Barrier; +pub use self::barrier::{Barrier, BarrierWaitResult}; pub use self::poison::{PoisonError, TryLockError, TryLockResult, LockResult}; pub use self::future::Future; diff --git a/src/libstd/sync/mpsc/blocking.rs b/src/libstd/sync/mpsc/blocking.rs index 412b7161305..a5299012723 100644 --- a/src/libstd/sync/mpsc/blocking.rs +++ b/src/libstd/sync/mpsc/blocking.rs @@ -11,7 +11,7 @@ //! Generic support for building blocking abstractions. use thread::Thread; -use sync::atomic::{AtomicBool, INIT_ATOMIC_BOOL, Ordering}; +use sync::atomic::{AtomicBool, ATOMIC_BOOL_INIT, Ordering}; use sync::Arc; use kinds::{Sync, Send}; use kinds::marker::{NoSend, NoSync}; @@ -40,7 +40,7 @@ pub struct WaitToken { pub fn tokens() -> (WaitToken, SignalToken) { let inner = Arc::new(Inner { thread: Thread::current(), - woken: INIT_ATOMIC_BOOL, + woken: ATOMIC_BOOL_INIT, }); let wait_token = WaitToken { inner: inner.clone(), diff --git a/src/libstd/sync/once.rs b/src/libstd/sync/once.rs index 17b7b70c301..9e9a17e482f 100644 --- a/src/libstd/sync/once.rs +++ b/src/libstd/sync/once.rs @@ -32,10 +32,11 @@ use sync::{StaticMutex, MUTEX_INIT}; /// /// static START: Once = ONCE_INIT; /// -/// START.doit(|| { +/// START.call_once(|| { /// // run initialization here /// }); /// ``` +#[stable] pub struct Once { mutex: StaticMutex, cnt: atomic::AtomicInt, @@ -45,23 +46,25 @@ pub struct Once { unsafe impl Sync for Once {} /// Initialization value for static `Once` values. +#[stable] pub const ONCE_INIT: Once = Once { mutex: MUTEX_INIT, - cnt: atomic::INIT_ATOMIC_INT, - lock_cnt: atomic::INIT_ATOMIC_INT, + cnt: atomic::ATOMIC_INT_INIT, + lock_cnt: atomic::ATOMIC_INT_INIT, }; impl Once { /// Perform an initialization routine once and only once. The given closure - /// will be executed if this is the first time `doit` has been called, and - /// otherwise the routine will *not* be invoked. + /// will be executed if this is the first time `call_once` has been called, + /// and otherwise the routine will *not* be invoked. /// /// This method will block the calling task if another initialization /// routine is currently running. /// /// When this function returns, it is guaranteed that some initialization /// has run and completed (it may not be the closure specified). - pub fn doit<F>(&'static self, f: F) where F: FnOnce() { + #[stable] + pub fn call_once<F>(&'static self, f: F) where F: FnOnce() { // Optimize common path: load is much cheaper than fetch_add. if self.cnt.load(atomic::SeqCst) < 0 { return @@ -91,13 +94,13 @@ impl Once { // // It is crucial that the negative value is swapped in *after* the // initialization routine has completed because otherwise new threads - // calling `doit` will return immediately before the initialization has - // completed. + // calling `call_once` will return immediately before the initialization + // has completed. let prev = self.cnt.fetch_add(1, atomic::SeqCst); if prev < 0 { // Make sure we never overflow, we'll never have int::MIN - // simultaneous calls to `doit` to make this value go back to 0 + // simultaneous calls to `call_once` to make this value go back to 0 self.cnt.store(int::MIN, atomic::SeqCst); return } @@ -118,6 +121,10 @@ impl Once { unsafe { self.mutex.destroy() } } } + + /// Deprecated + #[deprecated = "renamed to `call_once`"] + pub fn doit<F>(&'static self, f: F) where F: FnOnce() { self.call_once(f) } } #[cfg(test)] @@ -132,9 +139,9 @@ mod test { fn smoke_once() { static O: Once = ONCE_INIT; let mut a = 0i; - O.doit(|| a += 1); + O.call_once(|| a += 1); assert_eq!(a, 1); - O.doit(|| a += 1); + O.call_once(|| a += 1); assert_eq!(a, 1); } @@ -149,7 +156,7 @@ mod test { Thread::spawn(move|| { for _ in range(0u, 4) { Thread::yield_now() } unsafe { - O.doit(|| { + O.call_once(|| { assert!(!run); run = true; }); @@ -160,7 +167,7 @@ mod test { } unsafe { - O.doit(|| { + O.call_once(|| { assert!(!run); run = true; }); diff --git a/src/libstd/sync/semaphore.rs b/src/libstd/sync/semaphore.rs index b03c0e08035..c0ff674ba0f 100644 --- a/src/libstd/sync/semaphore.rs +++ b/src/libstd/sync/semaphore.rs @@ -8,6 +8,9 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +#![unstable = "the interaction between semaphores and the acquisition/release \ + of resources is currently unclear"] + use ops::Drop; use sync::{Mutex, Condvar}; diff --git a/src/libstd/sync/task_pool.rs b/src/libstd/sync/task_pool.rs index c34fa66d12a..088827dc084 100644 --- a/src/libstd/sync/task_pool.rs +++ b/src/libstd/sync/task_pool.rs @@ -10,6 +10,11 @@ //! Abstraction of a thread pool for basic parallelism. +#![unstable = "the semantics of a failing task and whether a thread is \ + re-attached to a thread pool are somewhat unclear, and the \ + utility of this type in `std::sync` is questionable with \ + respect to the jobs of other primitives"] + use core::prelude::*; use sync::{Arc, Mutex}; |
