diff options
| -rw-r--r-- | src/libextra/arc.rs | 65 | ||||
| -rw-r--r-- | src/libextra/sync.rs | 5 | ||||
| -rw-r--r-- | src/librustc/middle/trans/debuginfo.rs | 2 | ||||
| -rw-r--r-- | src/libstd/lib.rs | 1 | ||||
| -rw-r--r-- | src/libstd/sync/arc.rs | 153 | ||||
| -rw-r--r-- | src/libstd/sync/atomics.rs (renamed from src/libstd/unstable/atomics.rs) | 9 | ||||
| -rw-r--r-- | src/libstd/sync/deque.rs (renamed from src/libstd/rt/deque.rs) | 13 | ||||
| -rw-r--r-- | src/libstd/sync/mod.rs | 23 | ||||
| -rw-r--r-- | src/libstd/sync/mpmc_bounded_queue.rs (renamed from src/libstd/rt/mpmc_bounded_queue.rs) | 10 | ||||
| -rw-r--r-- | src/libstd/sync/mpsc_queue.rs (renamed from src/libstd/rt/mpsc_queue.rs) | 50 | ||||
| -rw-r--r-- | src/libstd/sync/spsc_queue.rs (renamed from src/libstd/rt/spsc_queue.rs) | 42 | ||||
| -rw-r--r-- | src/libstd/unstable/dynamic_lib.rs | 9 | ||||
| -rw-r--r-- | src/libstd/unstable/mod.rs | 1 | ||||
| -rw-r--r-- | src/libstd/unstable/mutex.rs | 2 | ||||
| -rw-r--r-- | src/libstd/unstable/sync.rs | 481 |
15 files changed, 288 insertions, 578 deletions
diff --git a/src/libextra/arc.rs b/src/libextra/arc.rs index c1763c37bb5..a411c4e9185 100644 --- a/src/libextra/arc.rs +++ b/src/libextra/arc.rs @@ -45,7 +45,7 @@ use sync; use sync::{Mutex, RWLock}; use std::cast; -use std::unstable::sync::UnsafeArc; +use std::sync::arc::UnsafeArc; use std::task; use std::borrow; @@ -127,20 +127,6 @@ impl<T:Freeze+Send> Arc<T> { pub fn get<'a>(&'a self) -> &'a T { unsafe { &*self.x.get_immut() } } - - /** - * Retrieve the data back out of the Arc. This function blocks until the - * reference given to it is the last existing one, and then unwrap the data - * instead of destroying it. - * - * If multiple tasks call unwrap, all but the first will fail. Do not call - * unwrap from a task that holds another reference to the same Arc; it is - * guaranteed to deadlock. - */ - pub fn unwrap(self) -> T { - let Arc { x: x } = self; - x.unwrap() - } } impl<T:Freeze + Send> Clone for Arc<T> { @@ -247,22 +233,6 @@ impl<T:Send> MutexArc<T> { cond: cond }) }) } - - /** - * Retrieves the data, blocking until all other references are dropped, - * exactly as arc::unwrap. - * - * Will additionally fail if another task has failed while accessing the arc. - */ - pub fn unwrap(self) -> T { - let MutexArc { x: x } = self; - let inner = x.unwrap(); - let MutexArcInner { failed: failed, data: data, .. } = inner; - if failed { - fail!("Can't unwrap poisoned MutexArc - another task failed inside!"); - } - data - } } impl<T:Freeze + Send> MutexArc<T> { @@ -503,23 +473,6 @@ impl<T:Freeze + Send> RWArc<T> { } } } - - /** - * Retrieves the data, blocking until all other references are dropped, - * exactly as arc::unwrap. - * - * Will additionally fail if another task has failed while accessing the arc - * in write mode. - */ - pub fn unwrap(self) -> T { - let RWArc { x: x, .. } = self; - let inner = x.unwrap(); - let RWArcInner { failed: failed, data: data, .. } = inner; - if failed { - fail!("Can't unwrap poisoned RWArc - another task failed inside!") - } - data - } } // Borrowck rightly complains about immutably aliasing the rwlock in order to @@ -689,22 +642,6 @@ mod tests { }) } - #[test] #[should_fail] - pub fn test_mutex_arc_unwrap_poison() { - let arc = MutexArc::new(1); - let arc2 = ~(&arc).clone(); - let (p, c) = Chan::new(); - do task::spawn { - arc2.access(|one| { - c.send(()); - assert!(*one == 2); - }) - } - let _ = p.recv(); - let one = arc.unwrap(); - assert!(one == 1); - } - #[test] fn test_unsafe_mutex_arc_nested() { unsafe { diff --git a/src/libextra/sync.rs b/src/libextra/sync.rs index 57a7f38696d..fb11eb6a3c4 100644 --- a/src/libextra/sync.rs +++ b/src/libextra/sync.rs @@ -19,8 +19,9 @@ use std::borrow; -use std::unstable::sync::{Exclusive, UnsafeArc}; -use std::unstable::atomics; +use std::unstable::sync::Exclusive; +use std::sync::arc::UnsafeArc; +use std::sync::atomics; use std::unstable::finally::Finally; use std::util; use std::util::NonCopyable; diff --git a/src/librustc/middle/trans/debuginfo.rs b/src/librustc/middle/trans/debuginfo.rs index a77e8f764f3..61fadb7e236 100644 --- a/src/librustc/middle/trans/debuginfo.rs +++ b/src/librustc/middle/trans/debuginfo.rs @@ -146,7 +146,7 @@ use std::hashmap::HashMap; use std::hashmap::HashSet; use std::libc::{c_uint, c_ulonglong, c_longlong}; use std::ptr; -use std::unstable::atomics; +use std::sync::atomics; use std::vec; use syntax::codemap::{Span, Pos}; use syntax::{ast, codemap, ast_util, ast_map, opt_vec}; diff --git a/src/libstd/lib.rs b/src/libstd/lib.rs index b2b856c5c83..200e4e63261 100644 --- a/src/libstd/lib.rs +++ b/src/libstd/lib.rs @@ -159,6 +159,7 @@ pub mod trie; pub mod task; pub mod comm; pub mod local_data; +pub mod sync; /* Runtime and platform support */ diff --git a/src/libstd/sync/arc.rs b/src/libstd/sync/arc.rs new file mode 100644 index 00000000000..7632ec6cf29 --- /dev/null +++ b/src/libstd/sync/arc.rs @@ -0,0 +1,153 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Atomically reference counted data +//! +//! This modules contains the implementation of an atomically reference counted +//! pointer for the purpose of sharing data between tasks. This is obviously a +//! very unsafe primitive to use, but it has its use cases when implementing +//! concurrent data structures and similar tasks. +//! +//! Great care must be taken to ensure that data races do not arise through the +//! usage of `UnsafeArc`, and this often requires some form of external +//! synchronization. The only guarantee provided to you by this class is that +//! the underlying data will remain valid (not free'd) so long as the reference +//! count is greater than one. + +use cast; +use clone::Clone; +use kinds::Send; +use ops::Drop; +use ptr::RawPtr; +use sync::atomics::{AtomicUint, SeqCst, Relaxed, Acquire}; +use vec; + +/// An atomically reference counted pointer. +/// +/// Enforces no shared-memory safety. +#[unsafe_no_drop_flag] +pub struct UnsafeArc<T> { + priv data: *mut ArcData<T>, +} + +struct ArcData<T> { + count: AtomicUint, + data: T, +} + +unsafe fn new_inner<T: Send>(data: T, refcount: uint) -> *mut ArcData<T> { + let data = ~ArcData { count: AtomicUint::new(refcount), data: data }; + cast::transmute(data) +} + +impl<T: Send> UnsafeArc<T> { + /// Creates a new `UnsafeArc` which wraps the given data. + pub fn new(data: T) -> UnsafeArc<T> { + unsafe { UnsafeArc { data: new_inner(data, 1) } } + } + + /// As new(), but returns an extra pre-cloned handle. + pub fn new2(data: T) -> (UnsafeArc<T>, UnsafeArc<T>) { + unsafe { + let ptr = new_inner(data, 2); + (UnsafeArc { data: ptr }, UnsafeArc { data: ptr }) + } + } + + /// As new(), but returns a vector of as many pre-cloned handles as + /// requested. + pub fn newN(data: T, num_handles: uint) -> ~[UnsafeArc<T>] { + unsafe { + if num_handles == 0 { + ~[] // need to free data here + } else { + let ptr = new_inner(data, num_handles); + vec::from_fn(num_handles, |_| UnsafeArc { data: ptr }) + } + } + } + + /// Gets a pointer to the inner shared data. Note that care must be taken to + /// ensure that the outer `UnsafeArc` does not fall out of scope while this + /// pointer is in use, otherwise it could possibly contain a use-after-free. + #[inline] + pub fn get(&self) -> *mut T { + unsafe { + assert!((*self.data).count.load(Relaxed) > 0); + return &mut (*self.data).data as *mut T; + } + } + + /// Gets an immutable pointer to the inner shared data. This has the same + /// caveats as the `get` method. + #[inline] + pub fn get_immut(&self) -> *T { + unsafe { + assert!((*self.data).count.load(Relaxed) > 0); + return &(*self.data).data as *T; + } + } +} + +impl<T: Send> Clone for UnsafeArc<T> { + fn clone(&self) -> UnsafeArc<T> { + unsafe { + // This barrier might be unnecessary, but I'm not sure... + let old_count = (*self.data).count.fetch_add(1, Acquire); + assert!(old_count >= 1); + return UnsafeArc { data: self.data }; + } + } +} + +#[unsafe_destructor] +impl<T> Drop for UnsafeArc<T>{ + fn drop(&mut self) { + unsafe { + // Happens when destructing an unwrapper's handle and from + // `#[unsafe_no_drop_flag]` + if self.data.is_null() { + return + } + // Must be acquire+release, not just release, to make sure this + // doesn't get reordered to after the unwrapper pointer load. + let old_count = (*self.data).count.fetch_sub(1, SeqCst); + assert!(old_count >= 1); + if old_count == 1 { + let _: ~ArcData<T> = cast::transmute(self.data); + } + } + } +} + +#[cfg(test)] +mod tests { + use prelude::*; + use super::UnsafeArc; + use task; + use mem::size_of; + + #[test] + fn test_size() { + assert_eq!(size_of::<UnsafeArc<[int, ..10]>>(), size_of::<*[int, ..10]>()); + } + + #[test] + fn arclike_newN() { + // Tests that the many-refcounts-at-once constructors don't leak. + let _ = UnsafeArc::new2(~~"hello"); + let x = UnsafeArc::newN(~~"hello", 0); + assert_eq!(x.len(), 0) + let x = UnsafeArc::newN(~~"hello", 1); + assert_eq!(x.len(), 1) + let x = UnsafeArc::newN(~~"hello", 10); + assert_eq!(x.len(), 10) + } +} diff --git a/src/libstd/unstable/atomics.rs b/src/libstd/sync/atomics.rs index 9aaccb3ebba..bc9d99c0f37 100644 --- a/src/libstd/unstable/atomics.rs +++ b/src/libstd/sync/atomics.rs @@ -11,13 +11,16 @@ /*! * Atomic types * - * Basic atomic types supporting atomic operations. Each method takes an `Ordering` which - * represents the strength of the memory barrier for that operation. These orderings are the same - * as C++11 atomic orderings [http://gcc.gnu.org/wiki/Atomic/GCCMM/AtomicSync] + * Basic atomic types supporting atomic operations. Each method takes an + * `Ordering` which represents the strength of the memory barrier for that + * operation. These orderings are the same as C++11 atomic orderings + * [http://gcc.gnu.org/wiki/Atomic/GCCMM/AtomicSync] * * All atomic types are a single word in size. */ +#[allow(missing_doc)]; + use unstable::intrinsics; use cast; use option::{Option,Some,None}; diff --git a/src/libstd/rt/deque.rs b/src/libstd/sync/deque.rs index 770fc9ffa12..4d0efcd6ee1 100644 --- a/src/libstd/rt/deque.rs +++ b/src/libstd/sync/deque.rs @@ -50,15 +50,18 @@ use cast; use clone::Clone; -use iter::range; +use iter::{range, Iterator}; use kinds::Send; use libc; use mem; use ops::Drop; use option::{Option, Some, None}; use ptr; -use unstable::atomics::{AtomicInt, AtomicPtr, SeqCst}; -use unstable::sync::{UnsafeArc, Exclusive}; +use ptr::RawPtr; +use sync::arc::UnsafeArc; +use sync::atomics::{AtomicInt, AtomicPtr, SeqCst}; +use unstable::sync::Exclusive; +use vec::{OwnedVector, ImmutableVector}; // Once the queue is less than 1/K full, then it will be downsized. Note that // the deque requires that this number be less than 2. @@ -399,8 +402,8 @@ mod tests { use rt::thread::Thread; use rand; use rand::Rng; - use unstable::atomics::{AtomicBool, INIT_ATOMIC_BOOL, SeqCst, - AtomicUint, INIT_ATOMIC_UINT}; + use sync::atomics::{AtomicBool, INIT_ATOMIC_BOOL, SeqCst, + AtomicUint, INIT_ATOMIC_UINT}; use vec; #[test] diff --git a/src/libstd/sync/mod.rs b/src/libstd/sync/mod.rs new file mode 100644 index 00000000000..3213c538152 --- /dev/null +++ b/src/libstd/sync/mod.rs @@ -0,0 +1,23 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Useful synchronization primitives +//! +//! This modules contains useful safe and unsafe synchronization primitives. +//! Most of the primitives in this module do not provide any sort of locking +//! and/or blocking at all, but rather provide the necessary tools to build +//! other types of concurrent primitives. + +pub mod arc; +pub mod atomics; +pub mod deque; +pub mod mpmc_bounded_queue; +pub mod mpsc_queue; +pub mod spsc_queue; diff --git a/src/libstd/rt/mpmc_bounded_queue.rs b/src/libstd/sync/mpmc_bounded_queue.rs index 25a3ba8ab48..b623976306d 100644 --- a/src/libstd/rt/mpmc_bounded_queue.rs +++ b/src/libstd/sync/mpmc_bounded_queue.rs @@ -25,15 +25,17 @@ * policies, either expressed or implied, of Dmitry Vyukov. */ +#[allow(missing_doc, dead_code)]; + // http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue -use unstable::sync::UnsafeArc; -use unstable::atomics::{AtomicUint,Relaxed,Release,Acquire}; -use option::*; -use vec; use clone::Clone; use kinds::Send; use num::{Exponential,Algebraic,Round}; +use option::{Option, Some, None}; +use sync::arc::UnsafeArc; +use sync::atomics::{AtomicUint,Relaxed,Release,Acquire}; +use vec; struct Node<T> { sequence: AtomicUint, diff --git a/src/libstd/rt/mpsc_queue.rs b/src/libstd/sync/mpsc_queue.rs index d575028af70..89e56e3fa67 100644 --- a/src/libstd/rt/mpsc_queue.rs +++ b/src/libstd/sync/mpsc_queue.rs @@ -26,6 +26,14 @@ */ //! A mostly lock-free multi-producer, single consumer queue. +//! +//! This module contains an implementation of a concurrent MPSC queue. This +//! queue can be used to share data between tasks, and is also used as the +//! building block of channels in rust. +//! +//! Note that the current implementation of this queue has a caveat of the `pop` +//! method, and see the method for more information about it. Due to this +//! caveat, this queue may not be appropriate for all use-cases. // http://www.1024cores.net/home/lock-free-algorithms // /queues/non-intrusive-mpsc-node-based-queue @@ -35,9 +43,11 @@ use clone::Clone; use kinds::Send; use ops::Drop; use option::{Option, None, Some}; -use unstable::atomics::{AtomicPtr, Release, Acquire, AcqRel, Relaxed}; -use unstable::sync::UnsafeArc; +use ptr::RawPtr; +use sync::arc::UnsafeArc; +use sync::atomics::{AtomicPtr, Release, Acquire, AcqRel, Relaxed}; +/// A result of the `pop` function. pub enum PopResult<T> { /// Some data has been popped Data(T), @@ -61,10 +71,14 @@ struct State<T, P> { packet: P, } +/// The consumer half of this concurrent queue. This half is used to receive +/// data from the producers. pub struct Consumer<T, P> { priv state: UnsafeArc<State<T, P>>, } +/// The production half of the concurrent queue. This handle may be cloned in +/// order to make handles for new producers. pub struct Producer<T, P> { priv state: UnsafeArc<State<T, P>>, } @@ -75,6 +89,11 @@ impl<T: Send, P: Send> Clone for Producer<T, P> { } } +/// Creates a new MPSC queue. The given argument `p` is a user-defined "packet" +/// of information which will be shared by the consumer and the producer which +/// can be re-acquired via the `packet` function. This is helpful when extra +/// state is shared between the producer and consumer, but note that there is no +/// synchronization performed of this data. pub fn queue<T: Send, P: Send>(p: P) -> (Consumer<T, P>, Producer<T, P>) { unsafe { let (a, b) = UnsafeArc::new2(State::new(p)); @@ -92,7 +111,7 @@ impl<T> Node<T> { } impl<T: Send, P: Send> State<T, P> { - pub unsafe fn new(p: P) -> State<T, P> { + unsafe fn new(p: P) -> State<T, P> { let stub = Node::new(None); State { head: AtomicPtr::new(stub), @@ -122,10 +141,6 @@ impl<T: Send, P: Send> State<T, P> { if self.head.load(Acquire) == tail {Empty} else {Inconsistent} } - - unsafe fn is_empty(&mut self) -> bool { - return (*self.tail).next.load(Acquire).is_null(); - } } #[unsafe_destructor] @@ -143,27 +158,42 @@ impl<T: Send, P: Send> Drop for State<T, P> { } impl<T: Send, P: Send> Producer<T, P> { + /// Pushes a new value onto this queue. pub fn push(&mut self, value: T) { unsafe { (*self.state.get()).push(value) } } - pub fn is_empty(&self) -> bool { - unsafe{ (*self.state.get()).is_empty() } - } + /// Gets an unsafe pointer to the user-defined packet shared by the + /// producers and the consumer. Note that care must be taken to ensure that + /// the lifetime of the queue outlives the usage of the returned pointer. pub unsafe fn packet(&self) -> *mut P { &mut (*self.state.get()).packet as *mut P } } impl<T: Send, P: Send> Consumer<T, P> { + /// Pops some data from this queue. + /// + /// Note that the current implementation means that this function cannot + /// return `Option<T>`. It is possible for this queue to be in an + /// inconsistent state where many pushes have suceeded and completely + /// finished, but pops cannot return `Some(t)`. This inconsistent state + /// happens when a pusher is pre-empted at an inopportune moment. + /// + /// This inconsistent state means that this queue does indeed have data, but + /// it does not currently have access to it at this time. pub fn pop(&mut self) -> PopResult<T> { unsafe { (*self.state.get()).pop() } } + /// Attempts to pop data from this queue, but doesn't attempt too hard. This + /// will canonicalize inconsistent states to a `None` value. pub fn casual_pop(&mut self) -> Option<T> { match self.pop() { Data(t) => Some(t), Empty | Inconsistent => None, } } + /// Gets an unsafe pointer to the underlying user-defined packet. See + /// `Producer.packet` for more information. pub unsafe fn packet(&self) -> *mut P { &mut (*self.state.get()).packet as *mut P } diff --git a/src/libstd/rt/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs index f14533d726a..c4abba04659 100644 --- a/src/libstd/rt/spsc_queue.rs +++ b/src/libstd/sync/spsc_queue.rs @@ -26,12 +26,20 @@ */ // http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue + +//! A single-producer single-consumer concurrent queue +//! +//! This module contains the implementation of an SPSC queue which can be used +//! concurrently between two tasks. This data structure is safe to use and +//! enforces the semantics that there is one pusher and one popper. + use cast; use kinds::Send; use ops::Drop; use option::{Some, None, Option}; -use unstable::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release}; -use unstable::sync::UnsafeArc; +use ptr::RawPtr; +use sync::arc::UnsafeArc; +use sync::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release}; // Node within the linked list queue of messages to send struct Node<T> { @@ -64,14 +72,34 @@ struct State<T, P> { packet: P, } +/// Producer half of this queue. This handle is used to push data to the +/// consumer. pub struct Producer<T, P> { priv state: UnsafeArc<State<T, P>>, } +/// Consumer half of this queue. This handle is used to receive data from the +/// producer. pub struct Consumer<T, P> { priv state: UnsafeArc<State<T, P>>, } +/// Creates a new queue. The producer returned is connected to the consumer to +/// push all data to the consumer. +/// +/// # Arguments +/// +/// * `bound` - This queue implementation is implemented with a linked list, +/// and this means that a push is always a malloc. In order to +/// amortize this cost, an internal cache of nodes is maintained +/// to prevent a malloc from always being necessary. This bound is +/// the limit on the size of the cache (if desired). If the value +/// is 0, then the cache has no bound. Otherwise, the cache will +/// never grow larger than `bound` (although the queue itself +/// could be much larger. +/// +/// * `p` - This is the user-defined packet of data which will also be shared +/// between the producer and consumer. pub fn queue<T: Send, P: Send>(bound: uint, p: P) -> (Consumer<T, P>, Producer<T, P>) { @@ -105,21 +133,31 @@ impl<T: Send> Node<T> { } impl<T: Send, P: Send> Producer<T, P> { + /// Pushes data onto the queue pub fn push(&mut self, t: T) { unsafe { (*self.state.get()).push(t) } } + /// Tests whether the queue is empty. Note that if this function returns + /// `false`, the return value is significant, but if the return value is + /// `true` then almost no meaning can be attached to the return value. pub fn is_empty(&self) -> bool { unsafe { (*self.state.get()).is_empty() } } + /// Acquires an unsafe pointer to the underlying user-defined packet. Note + /// that care must be taken to ensure that the queue outlives the usage of + /// the packet (because it is an unsafe pointer). pub unsafe fn packet(&self) -> *mut P { &mut (*self.state.get()).packet as *mut P } } impl<T: Send, P: Send> Consumer<T, P> { + /// Pops some data from this queue, returning `None` when the queue is + /// empty. pub fn pop(&mut self) -> Option<T> { unsafe { (*self.state.get()).pop() } } + /// Same function as the producer's `packet` method. pub unsafe fn packet(&self) -> *mut P { &mut (*self.state.get()).packet as *mut P } diff --git a/src/libstd/unstable/dynamic_lib.rs b/src/libstd/unstable/dynamic_lib.rs index 03b25fbd044..0569fe32c58 100644 --- a/src/libstd/unstable/dynamic_lib.rs +++ b/src/libstd/unstable/dynamic_lib.rs @@ -140,7 +140,6 @@ pub mod dl { use path; use ptr; use str; - use unstable::sync::atomic; use result::*; pub unsafe fn open_external(filename: &path::Path) -> *libc::c_void { @@ -158,11 +157,7 @@ pub mod dl { static mut lock: Mutex = MUTEX_INIT; unsafe { // dlerror isn't thread safe, so we need to lock around this entire - // sequence. `atomic` asserts that we don't do anything that - // would cause this task to be descheduled, which could deadlock - // the scheduler if it happens while the lock is held. - // FIXME #9105 use a Rust mutex instead of C++ mutexes. - let _guard = atomic(); + // sequence lock.lock(); let _old_error = dlerror(); @@ -208,7 +203,6 @@ pub mod dl { use libc; use path; use ptr; - use unstable::sync::atomic; use result::*; pub unsafe fn open_external(filename: &path::Path) -> *libc::c_void { @@ -225,7 +219,6 @@ pub mod dl { pub fn check_for_errors_in<T>(f: || -> T) -> Result<T, ~str> { unsafe { - let _guard = atomic(); SetLastError(0); let result = f(); diff --git a/src/libstd/unstable/mod.rs b/src/libstd/unstable/mod.rs index 043d99eb1b8..f70d0b5169f 100644 --- a/src/libstd/unstable/mod.rs +++ b/src/libstd/unstable/mod.rs @@ -22,7 +22,6 @@ pub mod simd; pub mod lang; pub mod sync; pub mod mutex; -pub mod atomics; pub mod raw; /** diff --git a/src/libstd/unstable/mutex.rs b/src/libstd/unstable/mutex.rs index 3e7a861b385..eaf716f2726 100644 --- a/src/libstd/unstable/mutex.rs +++ b/src/libstd/unstable/mutex.rs @@ -48,7 +48,7 @@ #[allow(non_camel_case_types)]; use libc::c_void; -use unstable::atomics; +use sync::atomics; pub struct Mutex { // pointers for the lock/cond handles, atomically updated diff --git a/src/libstd/unstable/sync.rs b/src/libstd/unstable/sync.rs index 50fae1e0239..ad36f71cdea 100644 --- a/src/libstd/unstable/sync.rs +++ b/src/libstd/unstable/sync.rs @@ -8,353 +8,12 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use cast; -use comm::{Chan, Port}; -use ptr; -use option::{Option,Some,None}; -use task; -use unstable::atomics::{AtomicOption,AtomicUint,Acquire,Release,Relaxed,SeqCst}; -use unstable::mutex::Mutex; -use ops::Drop; use clone::Clone; use kinds::Send; -use vec; - -/// An atomically reference counted pointer. -/// -/// Enforces no shared-memory safety. -//#[unsafe_no_drop_flag] FIXME: #9758 -pub struct UnsafeArc<T> { - data: *mut ArcData<T>, -} - -pub enum UnsafeArcUnwrap<T> { - UnsafeArcSelf(UnsafeArc<T>), - UnsafeArcT(T) -} - -#[cfg(test)] -impl<T> UnsafeArcUnwrap<T> { - fn expect_t(self, msg: &'static str) -> T { - match self { - UnsafeArcSelf(_) => fail!(msg), - UnsafeArcT(t) => t - } - } - - fn is_self(&self) -> bool { - match *self { - UnsafeArcSelf(_) => true, - UnsafeArcT(_) => false - } - } -} - -struct ArcData<T> { - count: AtomicUint, - // An unwrapper uses this protocol to communicate with the "other" task that - // drops the last refcount on an arc. Unfortunately this can't be a proper - // pipe protocol because the unwrapper has to access both stages at once. - // FIXME(#7544): Maybe use AtomicPtr instead (to avoid xchg in take() later)? - unwrapper: AtomicOption<(Chan<()>, Port<bool>)>, - // FIXME(#3224) should be able to make this non-option to save memory - data: Option<T>, -} - -unsafe fn new_inner<T: Send>(data: T, refcount: uint) -> *mut ArcData<T> { - let data = ~ArcData { count: AtomicUint::new(refcount), - unwrapper: AtomicOption::empty(), - data: Some(data) }; - cast::transmute(data) -} - -/// A helper object used by `UnsafeArc::unwrap`. -struct ChannelAndDataGuard<T> { - channel: Option<Chan<bool>>, - data: Option<~ArcData<T>>, -} - -#[unsafe_destructor] -impl<T> Drop for ChannelAndDataGuard<T> { - fn drop(&mut self) { - if task::failing() { - // Killed during wait. Because this might happen while - // someone else still holds a reference, we can't free - // the data now; the "other" last refcount will free it. - unsafe { - let channel = self.channel.take_unwrap(); - let data = self.data.take_unwrap(); - channel.send(false); - cast::forget(data); - } - } - } -} - -impl<T> ChannelAndDataGuard<T> { - fn unwrap(mut self) -> (Chan<bool>, ~ArcData<T>) { - (self.channel.take_unwrap(), self.data.take_unwrap()) - } -} - -impl<T: Send> UnsafeArc<T> { - pub fn new(data: T) -> UnsafeArc<T> { - unsafe { UnsafeArc { data: new_inner(data, 1) } } - } - - /// As new(), but returns an extra pre-cloned handle. - pub fn new2(data: T) -> (UnsafeArc<T>, UnsafeArc<T>) { - unsafe { - let ptr = new_inner(data, 2); - (UnsafeArc { data: ptr }, UnsafeArc { data: ptr }) - } - } - - /// As new(), but returns a vector of as many pre-cloned handles as requested. - pub fn newN(data: T, num_handles: uint) -> ~[UnsafeArc<T>] { - unsafe { - if num_handles == 0 { - ~[] // need to free data here - } else { - let ptr = new_inner(data, num_handles); - vec::from_fn(num_handles, |_| UnsafeArc { data: ptr }) - } - } - } - - /// As newN(), but from an already-existing handle. Uses one xadd. - pub fn cloneN(self, num_handles: uint) -> ~[UnsafeArc<T>] { - if num_handles == 0 { - ~[] // The "num_handles - 1" trick (below) fails in the 0 case. - } else { - unsafe { - // Minus one because we are recycling the given handle's refcount. - let old_count = (*self.data).count.fetch_add(num_handles - 1, Acquire); - // let old_count = (*self.data).count.fetch_add(num_handles, Acquire); - assert!(old_count >= 1); - let ptr = self.data; - cast::forget(self); // Don't run the destructor on this handle. - vec::from_fn(num_handles, |_| UnsafeArc { data: ptr }) - } - } - } - - #[inline] - pub fn get(&self) -> *mut T { - unsafe { - assert!((*self.data).count.load(Relaxed) > 0); - let r: *mut T = (*self.data).data.get_mut_ref(); - return r; - } - } - - #[inline] - pub fn get_immut(&self) -> *T { - unsafe { - assert!((*self.data).count.load(Relaxed) > 0); - let r: *T = (*self.data).data.get_ref(); - return r; - } - } - - /// Wait until all other handles are dropped, then retrieve the enclosed - /// data. See extra::arc::Arc for specific semantics documentation. - /// If called when the task is already unkillable, unwrap will unkillably - /// block; otherwise, an unwrapping task can be killed by linked failure. - pub fn unwrap(self) -> T { - unsafe { - let mut this = self; - // The ~ dtor needs to run if this code succeeds. - let mut data: ~ArcData<T> = cast::transmute(this.data); - // Set up the unwrap protocol. - let (p1,c1) = Chan::new(); // () - let (p2,c2) = Chan::new(); // bool - // Try to put our server end in the unwrapper slot. - // This needs no barrier -- it's protected by the release barrier on - // the xadd, and the acquire+release barrier in the destructor's xadd. - if data.unwrapper.fill(~(c1,p2), Relaxed).is_none() { - // Got in. Tell this handle's destructor not to run (we are now it). - this.data = ptr::mut_null(); - // Drop our own reference. - let old_count = data.count.fetch_sub(1, Release); - assert!(old_count >= 1); - if old_count == 1 { - // We were the last owner. Can unwrap immediately. - // AtomicOption's destructor will free the server endpoint. - // FIXME(#3224): it should be like this - // let ~ArcData { data: user_data, _ } = data; - // user_data - data.data.take_unwrap() - } else { - // The *next* person who sees the refcount hit 0 will wake us. - let c2_and_data = ChannelAndDataGuard { - channel: Some(c2), - data: Some(data), - }; - p1.recv(); - // Got here. Back in the 'unkillable' without getting killed. - let (c2, data) = c2_and_data.unwrap(); - c2.send(true); - // FIXME(#3224): it should be like this - // let ~ArcData { data: user_data, _ } = data; - // user_data - let mut data = data; - data.data.take_unwrap() - } - } else { - // If 'put' returns the server end back to us, we were rejected; - // someone else was trying to unwrap. Avoid guaranteed deadlock. - cast::forget(data); - fail!("Another task is already unwrapping this Arc!"); - } - } - } - - /// As unwrap above, but without blocking. Returns 'UnsafeArcSelf(self)' if this is - /// not the last reference; 'UnsafeArcT(unwrapped_data)' if so. - pub fn try_unwrap(mut self) -> UnsafeArcUnwrap<T> { - unsafe { - // The ~ dtor needs to run if this code succeeds. - let mut data: ~ArcData<T> = cast::transmute(self.data); - // This can of course race with anybody else who has a handle, but in - // such a case, the returned count will always be at least 2. If we - // see 1, no race was possible. All that matters is 1 or not-1. - let count = data.count.load(Acquire); - assert!(count >= 1); - // The more interesting race is one with an unwrapper. They may have - // already dropped their count -- but if so, the unwrapper pointer - // will have been set first, which the barriers ensure we will see. - // (Note: using is_empty(), not take(), to not free the unwrapper.) - if count == 1 && data.unwrapper.is_empty(Acquire) { - // Tell this handle's destructor not to run (we are now it). - self.data = ptr::mut_null(); - // FIXME(#3224) as above - UnsafeArcT(data.data.take_unwrap()) - } else { - cast::forget(data); - UnsafeArcSelf(self) - } - } - } -} - -impl<T: Send> Clone for UnsafeArc<T> { - fn clone(&self) -> UnsafeArc<T> { - unsafe { - // This barrier might be unnecessary, but I'm not sure... - let old_count = (*self.data).count.fetch_add(1, Acquire); - assert!(old_count >= 1); - return UnsafeArc { data: self.data }; - } - } -} - -#[unsafe_destructor] -impl<T> Drop for UnsafeArc<T>{ - fn drop(&mut self) { - unsafe { - // Happens when destructing an unwrapper's handle and from `#[unsafe_no_drop_flag]` - if self.data.is_null() { - return - } - let mut data: ~ArcData<T> = cast::transmute(self.data); - // Must be acquire+release, not just release, to make sure this - // doesn't get reordered to after the unwrapper pointer load. - let old_count = data.count.fetch_sub(1, SeqCst); - assert!(old_count >= 1); - if old_count == 1 { - // Were we really last, or should we hand off to an - // unwrapper? It's safe to not xchg because the unwrapper - // will set the unwrap lock *before* dropping his/her - // reference. In effect, being here means we're the only - // *awake* task with the data. - match data.unwrapper.take(Acquire) { - Some(~(message, response)) => { - // Send 'ready' and wait for a response. - message.send(()); - // Unkillable wait. Message guaranteed to come. - if response.recv() { - // Other task got the data. - cast::forget(data); - } else { - // Other task was killed. drop glue takes over. - } - } - None => { - // drop glue takes over. - } - } - } else { - cast::forget(data); - } - } - } -} - - -/****************************************************************************/ - -pub struct AtomicGuard { - on: bool, -} - -impl Drop for AtomicGuard { - fn drop(&mut self) { - use rt::task::{Task, GreenTask, SchedTask}; - use rt::local::Local; - - if self.on { - unsafe { - let task_opt: Option<*mut Task> = Local::try_unsafe_borrow(); - match task_opt { - Some(t) => { - match (*t).task_type { - GreenTask(_) => (*t).death.allow_deschedule(), - SchedTask => {} - } - } - None => {} - } - } - } - } -} - -/** - * Enables a runtime assertion that no operation while the returned guard is - * live uses scheduler operations (deschedule, recv, spawn, etc). This is for - * use with pthread mutexes, which may block the entire scheduler thread, - * rather than just one task, and is hence prone to deadlocks if mixed with - * descheduling. - * - * NOTE: THIS DOES NOT PROVIDE LOCKING, or any sort of critical-section - * synchronization whatsoever. It only makes sense to use for CPU-local issues. - */ -// FIXME(#8140) should not be pub -pub unsafe fn atomic() -> AtomicGuard { - use rt::task::{Task, GreenTask, SchedTask}; - use rt::local::Local; - - let task_opt: Option<*mut Task> = Local::try_unsafe_borrow(); - match task_opt { - Some(t) => { - match (*t).task_type { - GreenTask(_) => { - (*t).death.inhibit_deschedule(); - return AtomicGuard { - on: true, - }; - } - SchedTask => {} - } - } - None => {} - } - - AtomicGuard { - on: false, - } -} +use ops::Drop; +use option::{Option,Some,None}; +use sync::arc::UnsafeArc; +use unstable::mutex::Mutex; pub struct LittleLock { priv l: Mutex, @@ -496,14 +155,6 @@ impl<T:Send> Exclusive<T> { l.wait(); } } - - pub fn unwrap(self) -> T { - let Exclusive { x: x } = self; - // Someday we might need to unkillably unwrap an Exclusive, but not today. - let inner = x.unwrap(); - let ExData { data: user_data, .. } = inner; // will destroy the LittleLock - user_data - } } #[cfg(test)] @@ -514,20 +165,6 @@ mod tests { use task; use mem::size_of; - //#[unsafe_no_drop_flag] FIXME: #9758 - #[ignore] - #[test] - fn test_size() { - assert_eq!(size_of::<UnsafeArc<[int, ..10]>>(), size_of::<*[int, ..10]>()); - } - - #[test] - fn test_atomic() { - // NB. The whole runtime will abort on an 'atomic-sleep' violation, - // so we can't really test for the converse behaviour. - unsafe { let _ = atomic(); } // oughtn't fail - } - #[test] fn exclusive_new_arc() { unsafe { @@ -570,114 +207,4 @@ mod tests { x.with(|one| assert_eq!(*one, 1)); } } - - #[test] - fn arclike_newN() { - // Tests that the many-refcounts-at-once constructors don't leak. - let _ = UnsafeArc::new2(~~"hello"); - let x = UnsafeArc::newN(~~"hello", 0); - assert_eq!(x.len(), 0) - let x = UnsafeArc::newN(~~"hello", 1); - assert_eq!(x.len(), 1) - let x = UnsafeArc::newN(~~"hello", 10); - assert_eq!(x.len(), 10) - } - - #[test] - fn arclike_cloneN() { - // Tests that the many-refcounts-at-once special-clone doesn't leak. - let x = UnsafeArc::new(~~"hello"); - let x = x.cloneN(0); - assert_eq!(x.len(), 0); - let x = UnsafeArc::new(~~"hello"); - let x = x.cloneN(1); - assert_eq!(x.len(), 1); - let x = UnsafeArc::new(~~"hello"); - let x = x.cloneN(10); - assert_eq!(x.len(), 10); - } - - #[test] - fn arclike_unwrap_basic() { - let x = UnsafeArc::new(~~"hello"); - assert!(x.unwrap() == ~~"hello"); - } - - #[test] - fn arclike_try_unwrap() { - let x = UnsafeArc::new(~~"hello"); - assert!(x.try_unwrap().expect_t("try_unwrap failed") == ~~"hello"); - } - - #[test] - fn arclike_try_unwrap_fail() { - let x = UnsafeArc::new(~~"hello"); - let x2 = x.clone(); - let left_x = x.try_unwrap(); - assert!(left_x.is_self()); - drop(left_x); - assert!(x2.try_unwrap().expect_t("try_unwrap none") == ~~"hello"); - } - - #[test] - fn arclike_try_unwrap_unwrap_race() { - // When an unwrap and a try_unwrap race, the unwrapper should always win. - let x = UnsafeArc::new(~~"hello"); - let x2 = x.clone(); - let (p,c) = Chan::new(); - do task::spawn { - c.send(()); - assert!(x2.unwrap() == ~~"hello"); - c.send(()); - } - p.recv(); - task::deschedule(); // Try to make the unwrapper get blocked first. - let left_x = x.try_unwrap(); - assert!(left_x.is_self()); - drop(left_x); - p.recv(); - } - - #[test] - fn exclusive_new_unwrap_basic() { - // Unlike the above, also tests no double-freeing of the LittleLock. - let x = Exclusive::new(~~"hello"); - assert!(x.unwrap() == ~~"hello"); - } - - #[test] - fn exclusive_new_unwrap_contended() { - let x = Exclusive::new(~~"hello"); - let x2 = x.clone(); - do task::spawn { - unsafe { x2.with(|_hello| ()); } - task::deschedule(); - } - assert!(x.unwrap() == ~~"hello"); - - // Now try the same thing, but with the child task blocking. - let x = Exclusive::new(~~"hello"); - let x2 = x.clone(); - let mut builder = task::task(); - let res = builder.future_result(); - do builder.spawn { - assert!(x2.unwrap() == ~~"hello"); - } - // Have to get rid of our reference before blocking. - drop(x); - res.recv(); - } - - #[test] #[should_fail] - fn exclusive_new_unwrap_conflict() { - let x = Exclusive::new(~~"hello"); - let x2 = x.clone(); - let mut builder = task::task(); - let res = builder.future_result(); - do builder.spawn { - assert!(x2.unwrap() == ~~"hello"); - } - assert!(x.unwrap() == ~~"hello"); - assert!(res.recv().is_ok()); - } } |
