diff options
Diffstat (limited to 'src/libstd')
| -rw-r--r-- | src/libstd/sync/arc.rs | 189 | ||||
| -rw-r--r-- | src/libstd/sync/deque.rs | 31 | ||||
| -rw-r--r-- | src/libstd/sync/mod.rs | 1 | ||||
| -rw-r--r-- | src/libstd/sync/mpmc_bounded_queue.rs | 6 | ||||
| -rw-r--r-- | src/libstd/sync/mpsc_queue.rs | 4 | ||||
| -rw-r--r-- | src/libstd/sync/spsc_queue.rs | 10 |
6 files changed, 23 insertions, 218 deletions
diff --git a/src/libstd/sync/arc.rs b/src/libstd/sync/arc.rs deleted file mode 100644 index 7dcfe62ffb8..00000000000 --- a/src/libstd/sync/arc.rs +++ /dev/null @@ -1,189 +0,0 @@ -// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! Atomically reference counted data -//! -//! This modules contains the implementation of an atomically reference counted -//! pointer for the purpose of sharing data between tasks. This is obviously a -//! very unsafe primitive to use, but it has its use cases when implementing -//! concurrent data structures and similar tasks. -//! -//! Great care must be taken to ensure that data races do not arise through the -//! usage of `UnsafeArc`, and this often requires some form of external -//! synchronization. The only guarantee provided to you by this class is that -//! the underlying data will remain valid (not free'd) so long as the reference -//! count is greater than one. - -use clone::Clone; -use iter::Iterator; -use kinds::Send; -use mem; -use ops::Drop; -use owned::Box; -use ptr::RawPtr; -use sync::atomics::{fence, AtomicUint, Relaxed, Acquire, Release}; -use ty::Unsafe; -use vec::Vec; - -/// An atomically reference counted pointer. -/// -/// Enforces no shared-memory safety. -#[unsafe_no_drop_flag] -pub struct UnsafeArc<T> { - data: *mut ArcData<T>, -} - -struct ArcData<T> { - count: AtomicUint, - data: Unsafe<T>, -} - -unsafe fn new_inner<T: Send>(data: T, refcount: uint) -> *mut ArcData<T> { - let data = box ArcData { - count: AtomicUint::new(refcount), - data: Unsafe::new(data) - }; - mem::transmute(data) -} - -impl<T: Send> UnsafeArc<T> { - /// Creates a new `UnsafeArc` which wraps the given data. - pub fn new(data: T) -> UnsafeArc<T> { - unsafe { UnsafeArc { data: new_inner(data, 1) } } - } - - /// As new(), but returns an extra pre-cloned handle. - pub fn new2(data: T) -> (UnsafeArc<T>, UnsafeArc<T>) { - unsafe { - let ptr = new_inner(data, 2); - (UnsafeArc { data: ptr }, UnsafeArc { data: ptr }) - } - } - - /// As new(), but returns a vector of as many pre-cloned handles as - /// requested. - pub fn newN(data: T, num_handles: uint) -> Vec<UnsafeArc<T>> { - unsafe { - if num_handles == 0 { - vec![] // need to free data here - } else { - let ptr = new_inner(data, num_handles); - let v = Vec::from_fn(num_handles, |_| UnsafeArc { data: ptr }); - v - } - } - } - - /// Gets a pointer to the inner shared data. Note that care must be taken to - /// ensure that the outer `UnsafeArc` does not fall out of scope while this - /// pointer is in use, otherwise it could possibly contain a use-after-free. - #[inline] - pub fn get(&self) -> *mut T { - unsafe { - debug_assert!((*self.data).count.load(Relaxed) > 0); - return (*self.data).data.get(); - } - } - - /// Gets an immutable pointer to the inner shared data. This has the same - /// caveats as the `get` method. - #[inline] - pub fn get_immut(&self) -> *T { - unsafe { - debug_assert!((*self.data).count.load(Relaxed) > 0); - return (*self.data).data.get() as *T; - } - } - - /// checks if this is the only reference to the arc protected data - #[inline] - pub fn is_owned(&self) -> bool { - unsafe { - (*self.data).count.load(Relaxed) == 1 - } - } -} - -impl<T: Send> Clone for UnsafeArc<T> { - fn clone(&self) -> UnsafeArc<T> { - unsafe { - // Using a relaxed ordering is alright here, as knowledge of the original reference - // prevents other threads from erroneously deleting the object. - // - // As explained in the [Boost documentation][1], - // Increasing the reference counter can always be done with memory_order_relaxed: New - // references to an object can only be formed from an existing reference, and passing - // an existing reference from one thread to another must already provide any required - // synchronization. - // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html) - let old_count = (*self.data).count.fetch_add(1, Relaxed); - debug_assert!(old_count >= 1); - return UnsafeArc { data: self.data }; - } - } -} - -#[unsafe_destructor] -impl<T> Drop for UnsafeArc<T>{ - fn drop(&mut self) { - unsafe { - // Happens when destructing an unwrapper's handle and from - // `#[unsafe_no_drop_flag]` - if self.data.is_null() { - return - } - // Because `fetch_sub` is already atomic, we do not need to synchronize with other - // threads unless we are going to delete the object. - let old_count = (*self.data).count.fetch_sub(1, Release); - debug_assert!(old_count >= 1); - if old_count == 1 { - // This fence is needed to prevent reordering of use of the data and deletion of - // the data. Because it is marked `Release`, the decreasing of the reference count - // sychronizes with this `Acquire` fence. This means that use of the data happens - // before decreasing the refernce count, which happens before this fence, which - // happens before the deletion of the data. - // - // As explained in the [Boost documentation][1], - // It is important to enforce any possible access to the object in one thread - // (through an existing reference) to *happen before* deleting the object in a - // different thread. This is achieved by a "release" operation after dropping a - // reference (any access to the object through this reference must obviously - // happened before), and an "acquire" operation before deleting the object. - // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html) - fence(Acquire); - let _: Box<ArcData<T>> = mem::transmute(self.data); - } - } - } -} - -#[cfg(test)] -mod tests { - use prelude::*; - use super::UnsafeArc; - use mem::size_of; - - #[test] - fn test_size() { - assert_eq!(size_of::<UnsafeArc<[int, ..10]>>(), size_of::<*[int, ..10]>()); - } - - #[test] - fn arclike_newN() { - // Tests that the many-refcounts-at-once constructors don't leak. - let _ = UnsafeArc::new2("hello".to_owned().to_owned()); - let x = UnsafeArc::newN("hello".to_owned().to_owned(), 0); - assert_eq!(x.len(), 0) - let x = UnsafeArc::newN("hello".to_owned().to_owned(), 1); - assert_eq!(x.len(), 1) - let x = UnsafeArc::newN("hello".to_owned().to_owned(), 10); - assert_eq!(x.len(), 10) - } -} diff --git a/src/libstd/sync/deque.rs b/src/libstd/sync/deque.rs index 42a8bd88652..c6446775b0c 100644 --- a/src/libstd/sync/deque.rs +++ b/src/libstd/sync/deque.rs @@ -417,8 +417,8 @@ mod tests { #[test] fn smoke() { - let mut pool = BufferPool::new(); - let (mut w, mut s) = pool.deque(); + let pool = BufferPool::new(); + let (w, s) = pool.deque(); assert_eq!(w.pop(), None); assert_eq!(s.steal(), Empty); w.push(1); @@ -432,10 +432,9 @@ mod tests { #[test] fn stealpush() { static AMT: int = 100000; - let mut pool = BufferPool::<int>::new(); - let (mut w, s) = pool.deque(); + let pool = BufferPool::<int>::new(); + let (w, s) = pool.deque(); let t = Thread::start(proc() { - let mut s = s; let mut left = AMT; while left > 0 { match s.steal() { @@ -458,10 +457,9 @@ mod tests { #[test] fn stealpush_large() { static AMT: int = 100000; - let mut pool = BufferPool::<(int, int)>::new(); - let (mut w, s) = pool.deque(); + let pool = BufferPool::<(int, int)>::new(); + let (w, s) = pool.deque(); let t = Thread::start(proc() { - let mut s = s; let mut left = AMT; while left > 0 { match s.steal() { @@ -479,7 +477,7 @@ mod tests { t.join(); } - fn stampede(mut w: Worker<Box<int>>, s: Stealer<Box<int>>, + fn stampede(w: Worker<Box<int>>, s: Stealer<Box<int>>, nthreads: int, amt: uint) { for _ in range(0, amt) { w.push(box 20); @@ -491,7 +489,6 @@ mod tests { let s = s.clone(); Thread::start(proc() { unsafe { - let mut s = s; while (*unsafe_remaining).load(SeqCst) > 0 { match s.steal() { Data(box 20) => { @@ -520,7 +517,7 @@ mod tests { #[test] fn run_stampede() { - let mut pool = BufferPool::<Box<int>>::new(); + let pool = BufferPool::<Box<int>>::new(); let (w, s) = pool.deque(); stampede(w, s, 8, 10000); } @@ -528,7 +525,7 @@ mod tests { #[test] fn many_stampede() { static AMT: uint = 4; - let mut pool = BufferPool::<Box<int>>::new(); + let pool = BufferPool::<Box<int>>::new(); let threads = range(0, AMT).map(|_| { let (w, s) = pool.deque(); Thread::start(proc() { @@ -547,14 +544,13 @@ mod tests { static NTHREADS: int = 8; static mut DONE: AtomicBool = INIT_ATOMIC_BOOL; static mut HITS: AtomicUint = INIT_ATOMIC_UINT; - let mut pool = BufferPool::<int>::new(); - let (mut w, s) = pool.deque(); + let pool = BufferPool::<int>::new(); + let (w, s) = pool.deque(); let threads = range(0, NTHREADS).map(|_| { let s = s.clone(); Thread::start(proc() { unsafe { - let mut s = s; loop { match s.steal() { Data(2) => { HITS.fetch_add(1, SeqCst); } @@ -606,8 +602,8 @@ mod tests { static AMT: int = 10000; static NTHREADS: int = 4; static mut DONE: AtomicBool = INIT_ATOMIC_BOOL; - let mut pool = BufferPool::<(int, uint)>::new(); - let (mut w, s) = pool.deque(); + let pool = BufferPool::<(int, uint)>::new(); + let (w, s) = pool.deque(); let (threads, hits) = vec::unzip(range(0, NTHREADS).map(|_| { let s = s.clone(); @@ -617,7 +613,6 @@ mod tests { }; (Thread::start(proc() { unsafe { - let mut s = s; loop { match s.steal() { Data((1, 2)) => { diff --git a/src/libstd/sync/mod.rs b/src/libstd/sync/mod.rs index 3213c538152..b2cf427edc8 100644 --- a/src/libstd/sync/mod.rs +++ b/src/libstd/sync/mod.rs @@ -15,7 +15,6 @@ //! and/or blocking at all, but rather provide the necessary tools to build //! other types of concurrent primitives. -pub mod arc; pub mod atomics; pub mod deque; pub mod mpmc_bounded_queue; diff --git a/src/libstd/sync/mpmc_bounded_queue.rs b/src/libstd/sync/mpmc_bounded_queue.rs index 7fb98e14086..ffad9c1c583 100644 --- a/src/libstd/sync/mpmc_bounded_queue.rs +++ b/src/libstd/sync/mpmc_bounded_queue.rs @@ -173,7 +173,7 @@ mod tests { fn test() { let nthreads = 8u; let nmsgs = 1000u; - let mut q = Queue::with_capacity(nthreads*nmsgs); + let q = Queue::with_capacity(nthreads*nmsgs); assert_eq!(None, q.pop()); let (tx, rx) = channel(); @@ -181,7 +181,7 @@ mod tests { let q = q.clone(); let tx = tx.clone(); native::task::spawn(proc() { - let mut q = q; + let q = q; for i in range(0, nmsgs) { assert!(q.push(i)); } @@ -195,7 +195,7 @@ mod tests { completion_rxs.push(rx); let q = q.clone(); native::task::spawn(proc() { - let mut q = q; + let q = q; let mut i = 0u; loop { match q.pop() { diff --git a/src/libstd/sync/mpsc_queue.rs b/src/libstd/sync/mpsc_queue.rs index f2f95da1842..4db24e82d37 100644 --- a/src/libstd/sync/mpsc_queue.rs +++ b/src/libstd/sync/mpsc_queue.rs @@ -165,7 +165,7 @@ mod tests { #[test] fn test_full() { - let mut q = Queue::new(); + let q = Queue::new(); q.push(box 1); q.push(box 2); } @@ -174,7 +174,7 @@ mod tests { fn test() { let nthreads = 8u; let nmsgs = 1000u; - let mut q = Queue::new(); + let q = Queue::new(); match q.pop() { Empty => {} Inconsistent | Data(..) => fail!() diff --git a/src/libstd/sync/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs index 093933c82fc..fb515c9db6e 100644 --- a/src/libstd/sync/spsc_queue.rs +++ b/src/libstd/sync/spsc_queue.rs @@ -235,7 +235,7 @@ mod test { #[test] fn smoke() { - let mut q = Queue::new(0); + let q = Queue::new(0); q.push(1); q.push(2); assert_eq!(q.pop(), Some(1)); @@ -250,14 +250,14 @@ mod test { #[test] fn drop_full() { - let mut q = Queue::new(0); + let q = Queue::new(0); q.push(box 1); q.push(box 2); } #[test] fn smoke_bound() { - let mut q = Queue::new(1); + let q = Queue::new(1); q.push(1); q.push(2); assert_eq!(q.pop(), Some(1)); @@ -282,7 +282,7 @@ mod test { native::task::spawn(proc() { for _ in range(0, 100000) { loop { - match unsafe { (*b.get()).pop() } { + match b.pop() { Some(1) => break, Some(_) => fail!(), None => {} @@ -292,7 +292,7 @@ mod test { tx.send(()); }); for _ in range(0, 100000) { - unsafe { (*a.get()).push(1); } + a.push(1); } rx.recv(); } |
