| author | bors <bors@rust-lang.org> | 2014-05-21 17:31:29 -0700 |
|---|---|---|
| committer | bors <bors@rust-lang.org> | 2014-05-21 17:31:29 -0700 |
| commit | 257a73ce8273d026f2af1a5021ae2d1a4e7b95e5 (patch) | |
| tree | 2f7d66fc0f7de80105252babe27757e7ea94951a /src/libstd | |
| parent | 5f3f0918ad70cd9b0bfcd2f93aea0218ec92fb87 (diff) | |
| parent | fdf935a5249edd0be0f14385a099963e43c7a29b (diff) | |
auto merge of #14301 : alexcrichton/rust/remove-unsafe-arc, r=brson
This type can be built with `Arc<Unsafe<T>>` now that liballoc exists.
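The whole patch is one mechanical substitution: every `UnsafeArc<T>` becomes `Arc<Unsafe<T>>`, and the two-handle constructor `UnsafeArc::new2()` becomes an `Arc::new()` followed by a `clone()`. A minimal sketch of the pattern in today's Rust, where `std::sync::Arc` and `std::cell::UnsafeCell` stand in for 2014's `alloc::arc::Arc` and `ty::Unsafe` (the `Packet` type here is illustrative, not from the patch):

```rust
use std::cell::UnsafeCell; // modern spelling of 2014's `ty::Unsafe`
use std::sync::Arc;

// Stand-in for the channel `Packet` types touched by this patch.
struct Packet {
    value: Option<i32>,
}

fn main() {
    // Old: `let (a, b) = UnsafeArc::new2(Packet { value: None });`
    // New: one allocation behind a plain Arc, cloned for the second handle.
    let a = Arc::new(UnsafeCell::new(Packet { value: None }));
    let b = a.clone();

    // `UnsafeCell::get` hands back a raw `*mut Packet`, re-creating the
    // unchecked interior mutability UnsafeArc used to provide. This is only
    // sound because, as in the channel code, the two endpoints never touch
    // the packet at the same time.
    unsafe { (*a.get()).value = Some(1) };
    assert_eq!(unsafe { (*b.get()).value }, Some(1));
}
```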
Diffstat (limited to 'src/libstd')
| -rw-r--r-- | src/libstd/comm/mod.rs | 49 |
| -rw-r--r-- | src/libstd/comm/oneshot.rs | 2 |
| -rw-r--r-- | src/libstd/rt/task.rs | 17 |
| -rw-r--r-- | src/libstd/sync/arc.rs | 189 |
| -rw-r--r-- | src/libstd/sync/deque.rs | 95 |
| -rw-r--r-- | src/libstd/sync/mod.rs | 1 |
| -rw-r--r-- | src/libstd/sync/mpmc_bounded_queue.rs | 56 |
| -rw-r--r-- | src/libstd/sync/mpsc_queue.rs | 30 |
| -rw-r--r-- | src/libstd/sync/spsc_queue.rs | 70 |
| -rw-r--r-- | src/libstd/unstable/sync.rs | 8 |
10 files changed, 174 insertions, 343 deletions
```diff
diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs
index df0c6f3b8d3..fd5b92ba469 100644
--- a/src/libstd/comm/mod.rs
+++ b/src/libstd/comm/mod.rs
@@ -271,6 +271,8 @@
 // And now that you've seen all the races that I found and attempted to fix,
 // here's the code for you to find some more!
 
+use alloc::arc::Arc;
+
 use cell::Cell;
 use clone::Clone;
 use iter::Iterator;
@@ -283,7 +285,6 @@ use owned::Box;
 use result::{Ok, Err, Result};
 use rt::local::Local;
 use rt::task::{Task, BlockedTask};
-use sync::arc::UnsafeArc;
 use ty::Unsafe;
 
 pub use comm::select::{Select, Handle};
@@ -352,7 +353,7 @@ pub struct Sender<T> {
 /// The sending-half of Rust's synchronous channel type. This half can only be
 /// owned by one task, but it can be cloned to send to other tasks.
 pub struct SyncSender<T> {
-    inner: UnsafeArc<sync::Packet<T>>,
+    inner: Arc<Unsafe<sync::Packet<T>>>, // can't share in an arc
     marker: marker::NoShare,
 }
 
@@ -386,10 +387,10 @@ pub enum TrySendError<T> {
 }
 
 enum Flavor<T> {
-    Oneshot(UnsafeArc<oneshot::Packet<T>>),
-    Stream(UnsafeArc<stream::Packet<T>>),
-    Shared(UnsafeArc<shared::Packet<T>>),
-    Sync(UnsafeArc<sync::Packet<T>>),
+    Oneshot(Arc<Unsafe<oneshot::Packet<T>>>),
+    Stream(Arc<Unsafe<stream::Packet<T>>>),
+    Shared(Arc<Unsafe<shared::Packet<T>>>),
+    Sync(Arc<Unsafe<sync::Packet<T>>>),
 }
 
 #[doc(hidden)]
@@ -435,8 +436,8 @@ impl<T> UnsafeFlavor<T> for Receiver<T> {
 /// println!("{}", rx.recv());
 /// ```
 pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
-    let (a, b) = UnsafeArc::new2(oneshot::Packet::new());
-    (Sender::new(Oneshot(b)), Receiver::new(Oneshot(a)))
+    let a = Arc::new(Unsafe::new(oneshot::Packet::new()));
+    (Sender::new(Oneshot(a.clone())), Receiver::new(Oneshot(a)))
 }
 
 /// Creates a new synchronous, bounded channel.
@@ -471,8 +472,8 @@ pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
 /// assert_eq!(rx.recv(), 2);
 /// ```
 pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
-    let (a, b) = UnsafeArc::new2(sync::Packet::new(bound));
-    (SyncSender::new(a), Receiver::new(Sync(b)))
+    let a = Arc::new(Unsafe::new(sync::Packet::new(bound)));
+    (SyncSender::new(a.clone()), Receiver::new(Sync(a)))
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -557,13 +558,13 @@ impl<T: Send> Sender<T> {
 
         let (new_inner, ret) = match *unsafe { self.inner() } {
             Oneshot(ref p) => {
-                let p = p.get();
                 unsafe {
+                    let p = p.get();
                     if !(*p).sent() {
                         return (*p).send(t);
                     } else {
-                        let (a, b) = UnsafeArc::new2(stream::Packet::new());
-                        match (*p).upgrade(Receiver::new(Stream(b))) {
+                        let a = Arc::new(Unsafe::new(stream::Packet::new()));
+                        match (*p).upgrade(Receiver::new(Stream(a.clone()))) {
                             oneshot::UpSuccess => {
                                 let ret = (*a.get()).send(t);
                                 (a, ret)
@@ -598,17 +599,21 @@ impl<T: Send> Clone for Sender<T> {
     fn clone(&self) -> Sender<T> {
         let (packet, sleeper) = match *unsafe { self.inner() } {
             Oneshot(ref p) => {
-                let (a, b) = UnsafeArc::new2(shared::Packet::new());
-                match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } {
-                    oneshot::UpSuccess | oneshot::UpDisconnected => (b, None),
-                    oneshot::UpWoke(task) => (b, Some(task))
+                let a = Arc::new(Unsafe::new(shared::Packet::new()));
+                match unsafe {
+                    (*p.get()).upgrade(Receiver::new(Shared(a.clone())))
+                } {
+                    oneshot::UpSuccess | oneshot::UpDisconnected => (a, None),
+                    oneshot::UpWoke(task) => (a, Some(task))
                 }
             }
             Stream(ref p) => {
-                let (a, b) = UnsafeArc::new2(shared::Packet::new());
-                match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } {
-                    stream::UpSuccess | stream::UpDisconnected => (b, None),
-                    stream::UpWoke(task) => (b, Some(task)),
+                let a = Arc::new(Unsafe::new(shared::Packet::new()));
+                match unsafe {
+                    (*p.get()).upgrade(Receiver::new(Shared(a.clone())))
+                } {
+                    stream::UpSuccess | stream::UpDisconnected => (a, None),
+                    stream::UpWoke(task) => (a, Some(task)),
                 }
             }
             Shared(ref p) => {
@@ -645,7 +650,7 @@ impl<T: Send> Drop for Sender<T> {
 ////////////////////////////////////////////////////////////////////////////////
 
 impl<T: Send> SyncSender<T> {
-    fn new(inner: UnsafeArc<sync::Packet<T>>) -> SyncSender<T> {
+    fn new(inner: Arc<Unsafe<sync::Packet<T>>>) -> SyncSender<T> {
         SyncSender { inner: inner, marker: marker::NoShare }
     }
diff --git a/src/libstd/comm/oneshot.rs b/src/libstd/comm/oneshot.rs
index a7124e50b66..f9e8fd1e534 100644
--- a/src/libstd/comm/oneshot.rs
+++ b/src/libstd/comm/oneshot.rs
@@ -15,7 +15,7 @@
 /// this type is to have one and exactly one allocation when the chan/port pair
 /// is created.
 ///
-/// Another possible optimization would be to not use an UnsafeArc box because
+/// Another possible optimization would be to not use an Arc box because
 /// in theory we know when the shared packet can be deallocated (no real need
 /// for the atomic reference counting), but I was having trouble how to destroy
 /// the data early in a drop of a Port.
diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs
index 31a20145306..8968747d990 100644
--- a/src/libstd/rt/task.rs
+++ b/src/libstd/rt/task.rs
@@ -13,6 +13,8 @@
 //! local storage, and logging. Even a 'freestanding' Rust would likely want
 //! to implement this.
 
+use alloc::arc::Arc;
+
 use cleanup;
 use clone::Clone;
 use comm::Sender;
@@ -32,7 +34,6 @@ use rt::local_heap::LocalHeap;
 use rt::rtio::LocalIo;
 use rt::unwind::Unwinder;
 use str::SendStr;
-use sync::arc::UnsafeArc;
 use sync::atomics::{AtomicUint, SeqCst};
 use task::{TaskResult, TaskOpts};
 use unstable::finally::Finally;
@@ -65,7 +66,7 @@ pub struct LocalStorage(pub Option<local_data::Map>);
 /// at any time.
 pub enum BlockedTask {
     Owned(Box<Task>),
-    Shared(UnsafeArc<AtomicUint>),
+    Shared(Arc<AtomicUint>),
 }
 
 pub enum DeathAction {
@@ -82,7 +83,7 @@ pub struct Death {
 }
 
 pub struct BlockedTasks {
-    inner: UnsafeArc<AtomicUint>,
+    inner: Arc<AtomicUint>,
 }
 
 impl Task {
@@ -313,10 +314,10 @@ impl BlockedTask {
     pub fn wake(self) -> Option<Box<Task>> {
         match self {
             Owned(task) => Some(task),
-            Shared(arc) => unsafe {
-                match (*arc.get()).swap(0, SeqCst) {
+            Shared(arc) => {
+                match arc.swap(0, SeqCst) {
                     0 => None,
-                    n => Some(mem::transmute(n)),
+                    n => Some(unsafe { mem::transmute(n) }),
                 }
             }
         }
@@ -343,7 +344,7 @@ impl BlockedTask {
         let arc = match self {
             Owned(task) => {
                 let flag = unsafe { AtomicUint::new(mem::transmute(task)) };
-                UnsafeArc::new(flag)
+                Arc::new(flag)
             }
             Shared(arc) => arc.clone(),
         };
@@ -375,7 +376,7 @@ impl BlockedTask {
         if blocked_task_ptr & 0x1 == 0 {
             Owned(mem::transmute(blocked_task_ptr))
         } else {
-            let ptr: Box<UnsafeArc<AtomicUint>> =
+            let ptr: Box<Arc<AtomicUint>> =
                 mem::transmute(blocked_task_ptr & !1);
             Shared(*ptr)
         }
diff --git a/src/libstd/sync/arc.rs b/src/libstd/sync/arc.rs
deleted file mode 100644
index 7dcfe62ffb8..00000000000
--- a/src/libstd/sync/arc.rs
+++ /dev/null
@@ -1,189 +0,0 @@
-// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
-// file at the top-level directory of this distribution and at
-// http://rust-lang.org/COPYRIGHT.
-//
-// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
-// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
-// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
-// option. This file may not be copied, modified, or distributed
-// except according to those terms.
-
-//! Atomically reference counted data
-//!
-//! This modules contains the implementation of an atomically reference counted
-//! pointer for the purpose of sharing data between tasks. This is obviously a
-//! very unsafe primitive to use, but it has its use cases when implementing
-//! concurrent data structures and similar tasks.
-//!
-//! Great care must be taken to ensure that data races do not arise through the
-//! usage of `UnsafeArc`, and this often requires some form of external
-//! synchronization. The only guarantee provided to you by this class is that
-//! the underlying data will remain valid (not free'd) so long as the reference
-//! count is greater than one.
-
-use clone::Clone;
-use iter::Iterator;
-use kinds::Send;
-use mem;
-use ops::Drop;
-use owned::Box;
-use ptr::RawPtr;
-use sync::atomics::{fence, AtomicUint, Relaxed, Acquire, Release};
-use ty::Unsafe;
-use vec::Vec;
-
-/// An atomically reference counted pointer.
-///
-/// Enforces no shared-memory safety.
-#[unsafe_no_drop_flag]
-pub struct UnsafeArc<T> {
-    data: *mut ArcData<T>,
-}
-
-struct ArcData<T> {
-    count: AtomicUint,
-    data: Unsafe<T>,
-}
-
-unsafe fn new_inner<T: Send>(data: T, refcount: uint) -> *mut ArcData<T> {
-    let data = box ArcData {
-        count: AtomicUint::new(refcount),
-        data: Unsafe::new(data)
-    };
-    mem::transmute(data)
-}
-
-impl<T: Send> UnsafeArc<T> {
-    /// Creates a new `UnsafeArc` which wraps the given data.
-    pub fn new(data: T) -> UnsafeArc<T> {
-        unsafe { UnsafeArc { data: new_inner(data, 1) } }
-    }
-
-    /// As new(), but returns an extra pre-cloned handle.
-    pub fn new2(data: T) -> (UnsafeArc<T>, UnsafeArc<T>) {
-        unsafe {
-            let ptr = new_inner(data, 2);
-            (UnsafeArc { data: ptr }, UnsafeArc { data: ptr })
-        }
-    }
-
-    /// As new(), but returns a vector of as many pre-cloned handles as
-    /// requested.
-    pub fn newN(data: T, num_handles: uint) -> Vec<UnsafeArc<T>> {
-        unsafe {
-            if num_handles == 0 {
-                vec![] // need to free data here
-            } else {
-                let ptr = new_inner(data, num_handles);
-                let v = Vec::from_fn(num_handles, |_| UnsafeArc { data: ptr });
-                v
-            }
-        }
-    }
-
-    /// Gets a pointer to the inner shared data. Note that care must be taken to
-    /// ensure that the outer `UnsafeArc` does not fall out of scope while this
-    /// pointer is in use, otherwise it could possibly contain a use-after-free.
-    #[inline]
-    pub fn get(&self) -> *mut T {
-        unsafe {
-            debug_assert!((*self.data).count.load(Relaxed) > 0);
-            return (*self.data).data.get();
-        }
-    }
-
-    /// Gets an immutable pointer to the inner shared data. This has the same
-    /// caveats as the `get` method.
-    #[inline]
-    pub fn get_immut(&self) -> *T {
-        unsafe {
-            debug_assert!((*self.data).count.load(Relaxed) > 0);
-            return (*self.data).data.get() as *T;
-        }
-    }
-
-    /// checks if this is the only reference to the arc protected data
-    #[inline]
-    pub fn is_owned(&self) -> bool {
-        unsafe {
-            (*self.data).count.load(Relaxed) == 1
-        }
-    }
-}
-
-impl<T: Send> Clone for UnsafeArc<T> {
-    fn clone(&self) -> UnsafeArc<T> {
-        unsafe {
-            // Using a relaxed ordering is alright here, as knowledge of the original reference
-            // prevents other threads from erroneously deleting the object.
-            //
-            // As explained in the [Boost documentation][1],
-            // Increasing the reference counter can always be done with memory_order_relaxed: New
-            // references to an object can only be formed from an existing reference, and passing
-            // an existing reference from one thread to another must already provide any required
-            // synchronization.
-            // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
-            let old_count = (*self.data).count.fetch_add(1, Relaxed);
-            debug_assert!(old_count >= 1);
-            return UnsafeArc { data: self.data };
-        }
-    }
-}
-
-#[unsafe_destructor]
-impl<T> Drop for UnsafeArc<T>{
-    fn drop(&mut self) {
-        unsafe {
-            // Happens when destructing an unwrapper's handle and from
-            // `#[unsafe_no_drop_flag]`
-            if self.data.is_null() {
-                return
-            }
-            // Because `fetch_sub` is already atomic, we do not need to synchronize with other
-            // threads unless we are going to delete the object.
-            let old_count = (*self.data).count.fetch_sub(1, Release);
-            debug_assert!(old_count >= 1);
-            if old_count == 1 {
-                // This fence is needed to prevent reordering of use of the data and deletion of
-                // the data. Because it is marked `Release`, the decreasing of the reference count
-                // sychronizes with this `Acquire` fence. This means that use of the data happens
-                // before decreasing the refernce count, which happens before this fence, which
-                // happens before the deletion of the data.
-                //
-                // As explained in the [Boost documentation][1],
-                // It is important to enforce any possible access to the object in one thread
-                // (through an existing reference) to *happen before* deleting the object in a
-                // different thread. This is achieved by a "release" operation after dropping a
-                // reference (any access to the object through this reference must obviously
-                // happened before), and an "acquire" operation before deleting the object.
-                // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
-                fence(Acquire);
-                let _: Box<ArcData<T>> = mem::transmute(self.data);
-            }
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use prelude::*;
-    use super::UnsafeArc;
-    use mem::size_of;
-
-    #[test]
-    fn test_size() {
-        assert_eq!(size_of::<UnsafeArc<[int, ..10]>>(), size_of::<*[int, ..10]>());
-    }
-
-    #[test]
-    fn arclike_newN() {
-        // Tests that the many-refcounts-at-once constructors don't leak.
-        let _ = UnsafeArc::new2("hello".to_owned().to_owned());
-        let x = UnsafeArc::newN("hello".to_owned().to_owned(), 0);
-        assert_eq!(x.len(), 0)
-        let x = UnsafeArc::newN("hello".to_owned().to_owned(), 1);
-        assert_eq!(x.len(), 1)
-        let x = UnsafeArc::newN("hello".to_owned().to_owned(), 10);
-        assert_eq!(x.len(), 10)
-    }
-}
diff --git a/src/libstd/sync/deque.rs b/src/libstd/sync/deque.rs
index 30b95ffb34f..a3fdc4d3eaf 100644
--- a/src/libstd/sync/deque.rs
+++ b/src/libstd/sync/deque.rs
@@ -48,20 +48,22 @@
 // FIXME: all atomic operations in this module use a SeqCst ordering. That is
 // probably overkill
 
+use alloc::arc::Arc;
+
 use clone::Clone;
 use iter::{range, Iterator};
 use kinds::Send;
+use kinds::marker;
 use mem::{forget, min_align_of, size_of, transmute};
 use ops::Drop;
 use option::{Option, Some, None};
 use owned::Box;
 use ptr::RawPtr;
 use ptr;
+use rt::heap::{allocate, deallocate};
 use slice::ImmutableVector;
-use sync::arc::UnsafeArc;
 use sync::atomics::{AtomicInt, AtomicPtr, SeqCst};
 use unstable::sync::Exclusive;
-use rt::heap::{allocate, deallocate};
 use vec::Vec;
 
 // Once the queue is less than 1/K full, then it will be downsized. Note that
@@ -87,14 +89,16 @@ struct Deque<T> {
 ///
 /// There may only be one worker per deque.
 pub struct Worker<T> {
-    deque: UnsafeArc<Deque<T>>,
+    deque: Arc<Deque<T>>,
+    noshare: marker::NoShare,
 }
 
 /// The stealing half of the work-stealing deque. Stealers have access to the
 /// opposite end of the deque from the worker, and they only have access to the
 /// `steal` method.
 pub struct Stealer<T> {
-    deque: UnsafeArc<Deque<T>>,
+    deque: Arc<Deque<T>>,
+    noshare: marker::NoShare,
 }
 
 /// When stealing some data, this is an enumeration of the possible outcomes.
@@ -149,12 +153,14 @@ impl<T: Send> BufferPool<T> {
 
     /// Allocates a new work-stealing deque which will send/receiving memory to
     /// and from this buffer pool.
-    pub fn deque(&mut self) -> (Worker<T>, Stealer<T>) {
-        let (a, b) = UnsafeArc::new2(Deque::new(self.clone()));
-        (Worker { deque: a }, Stealer { deque: b })
+    pub fn deque(&self) -> (Worker<T>, Stealer<T>) {
+        let a = Arc::new(Deque::new(self.clone()));
+        let b = a.clone();
+        (Worker { deque: a, noshare: marker::NoShare },
+         Stealer { deque: b, noshare: marker::NoShare })
     }
 
-    fn alloc(&mut self, bits: int) -> Box<Buffer<T>> {
+    fn alloc(&self, bits: int) -> Box<Buffer<T>> {
         unsafe {
             self.pool.with(|pool| {
                 match pool.iter().position(|x| x.size() >= (1 << bits)) {
@@ -165,7 +171,7 @@ impl<T: Send> BufferPool<T> {
         }
     }
 
-    fn free(&mut self, buf: Box<Buffer<T>>) {
+    fn free(&self, buf: Box<Buffer<T>>) {
         unsafe {
             let mut buf = Some(buf);
             self.pool.with(|pool| {
@@ -185,46 +191,48 @@ impl<T: Send> Clone for BufferPool<T> {
 
 impl<T: Send> Worker<T> {
     /// Pushes data onto the front of this work queue.
-    pub fn push(&mut self, t: T) {
-        unsafe { (*self.deque.get()).push(t) }
+    pub fn push(&self, t: T) {
+        unsafe { self.deque.push(t) }
     }
     /// Pops data off the front of the work queue, returning `None` on an empty
     /// queue.
-    pub fn pop(&mut self) -> Option<T> {
-        unsafe { (*self.deque.get()).pop() }
+    pub fn pop(&self) -> Option<T> {
+        unsafe { self.deque.pop() }
     }
     /// Gets access to the buffer pool that this worker is attached to. This can
     /// be used to create more deques which share the same buffer pool as this
     /// deque.
-    pub fn pool<'a>(&'a mut self) -> &'a mut BufferPool<T> {
-        unsafe { &mut (*self.deque.get()).pool }
+    pub fn pool<'a>(&'a self) -> &'a BufferPool<T> {
+        &self.deque.pool
     }
 }
 
 impl<T: Send> Stealer<T> {
     /// Steals work off the end of the queue (opposite of the worker's end)
-    pub fn steal(&mut self) -> Stolen<T> {
-        unsafe { (*self.deque.get()).steal() }
+    pub fn steal(&self) -> Stolen<T> {
+        unsafe { self.deque.steal() }
     }
     /// Gets access to the buffer pool that this stealer is attached to. This
     /// can be used to create more deques which share the same buffer pool as
    /// this deque.
-    pub fn pool<'a>(&'a mut self) -> &'a mut BufferPool<T> {
-        unsafe { &mut (*self.deque.get()).pool }
+    pub fn pool<'a>(&'a self) -> &'a BufferPool<T> {
+        &self.deque.pool
     }
 }
 
 impl<T: Send> Clone for Stealer<T> {
-    fn clone(&self) -> Stealer<T> { Stealer { deque: self.deque.clone() } }
+    fn clone(&self) -> Stealer<T> {
+        Stealer { deque: self.deque.clone(), noshare: marker::NoShare }
+    }
 }
 
 // Almost all of this code can be found directly in the paper so I'm not
 // personally going to heavily comment what's going on here.
 impl<T: Send> Deque<T> {
-    fn new(mut pool: BufferPool<T>) -> Deque<T> {
+    fn new(pool: BufferPool<T>) -> Deque<T> {
         let buf = pool.alloc(MIN_BITS);
         Deque {
             bottom: AtomicInt::new(0),
@@ -234,7 +242,7 @@ impl<T: Send> Deque<T> {
         }
     }
 
-    unsafe fn push(&mut self, data: T) {
+    unsafe fn push(&self, data: T) {
         let mut b = self.bottom.load(SeqCst);
         let t = self.top.load(SeqCst);
         let mut a = self.array.load(SeqCst);
@@ -250,7 +258,7 @@ impl<T: Send> Deque<T> {
         self.bottom.store(b + 1, SeqCst);
     }
 
-    unsafe fn pop(&mut self) -> Option<T> {
+    unsafe fn pop(&self) -> Option<T> {
         let b = self.bottom.load(SeqCst);
         let a = self.array.load(SeqCst);
         let b = b - 1;
@@ -276,7 +284,7 @@ impl<T: Send> Deque<T> {
         }
     }
 
-    unsafe fn steal(&mut self) -> Stolen<T> {
+    unsafe fn steal(&self) -> Stolen<T> {
         let t = self.top.load(SeqCst);
         let old = self.array.load(SeqCst);
         let b = self.bottom.load(SeqCst);
@@ -298,7 +306,7 @@ impl<T: Send> Deque<T> {
         }
     }
 
-    unsafe fn maybe_shrink(&mut self, b: int, t: int) {
+    unsafe fn maybe_shrink(&self, b: int, t: int) {
         let a = self.array.load(SeqCst);
         if b - t < (*a).size() / K && b - t > (1 << MIN_BITS) {
             self.swap_buffer(b, a, (*a).resize(b, t, -1));
@@ -312,7 +320,7 @@ impl<T: Send> Deque<T> {
     // after this method has called 'free' on it. The continued usage is simply
     // a read followed by a forget, but we must make sure that the memory can
     // continue to be read after we flag this buffer for reclamation.
-    unsafe fn swap_buffer(&mut self, b: int, old: *mut Buffer<T>,
+    unsafe fn swap_buffer(&self, b: int, old: *mut Buffer<T>,
                           buf: Buffer<T>) -> *mut Buffer<T> {
         let newbuf: *mut Buffer<T> = transmute(box buf);
         self.array.store(newbuf, SeqCst);
@@ -373,7 +381,7 @@ impl<T: Send> Buffer<T> {
 
     // Unsafe because this unsafely overwrites possibly uninitialized or
     // initialized data.
-    unsafe fn put(&mut self, i: int, t: T) {
+    unsafe fn put(&self, i: int, t: T) {
         let ptr = self.storage.offset(i & self.mask());
         ptr::copy_nonoverlapping_memory(ptr as *mut T, &t as *T, 1);
         forget(t);
@@ -382,7 +390,7 @@ impl<T: Send> Buffer<T> {
 
     // Again, unsafe because this has incredibly dubious ownership violations.
     // It is assumed that this buffer is immediately dropped.
     unsafe fn resize(&self, b: int, t: int, delta: int) -> Buffer<T> {
-        let mut buf = Buffer::new(self.log_size + delta);
+        let buf = Buffer::new(self.log_size + delta);
         for i in range(t, b) {
             buf.put(i, self.get(i));
         }
@@ -415,8 +423,8 @@ mod tests {
 
     #[test]
     fn smoke() {
-        let mut pool = BufferPool::new();
-        let (mut w, mut s) = pool.deque();
+        let pool = BufferPool::new();
+        let (w, s) = pool.deque();
         assert_eq!(w.pop(), None);
         assert_eq!(s.steal(), Empty);
         w.push(1);
@@ -430,10 +438,9 @@
     #[test]
     fn stealpush() {
         static AMT: int = 100000;
-        let mut pool = BufferPool::<int>::new();
-        let (mut w, s) = pool.deque();
+        let pool = BufferPool::<int>::new();
+        let (w, s) = pool.deque();
         let t = Thread::start(proc() {
-            let mut s = s;
             let mut left = AMT;
             while left > 0 {
                 match s.steal() {
@@ -456,10 +463,9 @@
     #[test]
     fn stealpush_large() {
         static AMT: int = 100000;
-        let mut pool = BufferPool::<(int, int)>::new();
-        let (mut w, s) = pool.deque();
+        let pool = BufferPool::<(int, int)>::new();
+        let (w, s) = pool.deque();
         let t = Thread::start(proc() {
-            let mut s = s;
             let mut left = AMT;
             while left > 0 {
                 match s.steal() {
@@ -477,7 +483,7 @@
         t.join();
     }
 
-    fn stampede(mut w: Worker<Box<int>>, s: Stealer<Box<int>>,
+    fn stampede(w: Worker<Box<int>>, s: Stealer<Box<int>>,
                 nthreads: int, amt: uint) {
         for _ in range(0, amt) {
             w.push(box 20);
@@ -489,7 +495,6 @@
             let s = s.clone();
             Thread::start(proc() {
                 unsafe {
-                    let mut s = s;
                     while (*unsafe_remaining).load(SeqCst) > 0 {
                         match s.steal() {
                             Data(box 20) => {
@@ -518,7 +523,7 @@
 
     #[test]
     fn run_stampede() {
-        let mut pool = BufferPool::<Box<int>>::new();
+        let pool = BufferPool::<Box<int>>::new();
         let (w, s) = pool.deque();
         stampede(w, s, 8, 10000);
     }
@@ -526,7 +531,7 @@
     #[test]
     fn many_stampede() {
         static AMT: uint = 4;
-        let mut pool = BufferPool::<Box<int>>::new();
+        let pool = BufferPool::<Box<int>>::new();
         let threads = range(0, AMT).map(|_| {
             let (w, s) = pool.deque();
             Thread::start(proc() {
@@ -545,14 +550,13 @@
         static NTHREADS: int = 8;
         static mut DONE: AtomicBool = INIT_ATOMIC_BOOL;
         static mut HITS: AtomicUint = INIT_ATOMIC_UINT;
-        let mut pool = BufferPool::<int>::new();
-        let (mut w, s) = pool.deque();
+        let pool = BufferPool::<int>::new();
+        let (w, s) = pool.deque();
 
         let threads = range(0, NTHREADS).map(|_| {
             let s = s.clone();
             Thread::start(proc() {
                 unsafe {
-                    let mut s = s;
                     loop {
                         match s.steal() {
                             Data(2) => { HITS.fetch_add(1, SeqCst); }
@@ -604,8 +608,8 @@
         static AMT: int = 10000;
         static NTHREADS: int = 4;
         static mut DONE: AtomicBool = INIT_ATOMIC_BOOL;
-        let mut pool = BufferPool::<(int, uint)>::new();
-        let (mut w, s) = pool.deque();
+        let pool = BufferPool::<(int, uint)>::new();
+        let (w, s) = pool.deque();
 
         let (threads, hits) = vec::unzip(range(0, NTHREADS).map(|_| {
             let s = s.clone();
@@ -615,7 +619,6 @@
             };
             (Thread::start(proc() {
                 unsafe {
-                    let mut s = s;
                     loop {
                         match s.steal() {
                             Data((1, 2)) => {
diff --git a/src/libstd/sync/mod.rs b/src/libstd/sync/mod.rs
index 3213c538152..b2cf427edc8 100644
--- a/src/libstd/sync/mod.rs
+++ b/src/libstd/sync/mod.rs
@@ -15,7 +15,6 @@
 //! and/or blocking at all, but rather provide the necessary tools to build
 //! other types of concurrent primitives.
 
-pub mod arc;
 pub mod atomics;
 pub mod deque;
 pub mod mpmc_bounded_queue;
diff --git a/src/libstd/sync/mpmc_bounded_queue.rs b/src/libstd/sync/mpmc_bounded_queue.rs
index 2df5031b482..ffad9c1c583 100644
--- a/src/libstd/sync/mpmc_bounded_queue.rs
+++ b/src/libstd/sync/mpmc_bounded_queue.rs
@@ -29,13 +29,15 @@
 
 // http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
 
+use alloc::arc::Arc;
+
 use clone::Clone;
 use kinds::Send;
 use num::next_power_of_two;
 use option::{Option, Some, None};
-use sync::arc::UnsafeArc;
 use sync::atomics::{AtomicUint,Relaxed,Release,Acquire};
 use vec::Vec;
+use ty::Unsafe;
 
 struct Node<T> {
     sequence: AtomicUint,
@@ -44,7 +46,7 @@ struct Node<T> {
 
 struct State<T> {
     pad0: [u8, ..64],
-    buffer: Vec<Node<T>>,
+    buffer: Vec<Unsafe<Node<T>>>,
     mask: uint,
     pad1: [u8, ..64],
     enqueue_pos: AtomicUint,
@@ -54,7 +56,7 @@ struct State<T> {
 }
 
 pub struct Queue<T> {
-    state: UnsafeArc<State<T>>,
+    state: Arc<State<T>>,
 }
 
 impl<T: Send> State<T> {
@@ -70,7 +72,7 @@ impl<T: Send> State<T> {
             capacity
         };
         let buffer = Vec::from_fn(capacity, |i| {
-            Node { sequence:AtomicUint::new(i), value: None }
+            Unsafe::new(Node { sequence:AtomicUint::new(i), value: None })
         });
         State{
             pad0: [0, ..64],
@@ -84,19 +86,21 @@
         }
     }
 
-    fn push(&mut self, value: T) -> bool {
+    fn push(&self, value: T) -> bool {
         let mask = self.mask;
         let mut pos = self.enqueue_pos.load(Relaxed);
         loop {
-            let node = self.buffer.get_mut(pos & mask);
-            let seq = node.sequence.load(Acquire);
+            let node = self.buffer.get(pos & mask);
+            let seq = unsafe { (*node.get()).sequence.load(Acquire) };
             let diff: int = seq as int - pos as int;
             if diff == 0 {
                 let enqueue_pos = self.enqueue_pos.compare_and_swap(pos, pos+1, Relaxed);
                 if enqueue_pos == pos {
-                    node.value = Some(value);
-                    node.sequence.store(pos+1, Release);
+                    unsafe {
+                        (*node.get()).value = Some(value);
+                        (*node.get()).sequence.store(pos+1, Release);
+                    }
                     break
                 } else {
                     pos = enqueue_pos;
@@ -110,19 +114,21 @@
         true
     }
 
-    fn pop(&mut self) -> Option<T> {
+    fn pop(&self) -> Option<T> {
         let mask = self.mask;
         let mut pos = self.dequeue_pos.load(Relaxed);
         loop {
-            let node = self.buffer.get_mut(pos & mask);
-            let seq = node.sequence.load(Acquire);
+            let node = self.buffer.get(pos & mask);
+            let seq = unsafe { (*node.get()).sequence.load(Acquire) };
             let diff: int = seq as int - (pos + 1) as int;
             if diff == 0 {
                 let dequeue_pos = self.dequeue_pos.compare_and_swap(pos, pos+1, Relaxed);
                 if dequeue_pos == pos {
-                    let value = node.value.take();
-                    node.sequence.store(pos + mask + 1, Release);
-                    return value
+                    unsafe {
+                        let value = (*node.get()).value.take();
+                        (*node.get()).sequence.store(pos + mask + 1, Release);
+                        return value
+                    }
                 } else {
                     pos = dequeue_pos;
                 }
@@ -138,24 +144,22 @@ impl<T: Send> State<T> {
 
 impl<T: Send> Queue<T> {
     pub fn with_capacity(capacity: uint) -> Queue<T> {
         Queue{
-            state: UnsafeArc::new(State::with_capacity(capacity))
+            state: Arc::new(State::with_capacity(capacity))
         }
     }
 
-    pub fn push(&mut self, value: T) -> bool {
-        unsafe { (*self.state.get()).push(value) }
+    pub fn push(&self, value: T) -> bool {
+        self.state.push(value)
     }
 
-    pub fn pop(&mut self) -> Option<T> {
-        unsafe { (*self.state.get()).pop() }
+    pub fn pop(&self) -> Option<T> {
+        self.state.pop()
     }
 }
 
 impl<T: Send> Clone for Queue<T> {
     fn clone(&self) -> Queue<T> {
-        Queue {
-            state: self.state.clone()
-        }
+        Queue { state: self.state.clone() }
     }
 }
 
@@ -169,7 +173,7 @@ mod tests {
     fn test() {
         let nthreads = 8u;
         let nmsgs = 1000u;
-        let mut q = Queue::with_capacity(nthreads*nmsgs);
+        let q = Queue::with_capacity(nthreads*nmsgs);
         assert_eq!(None, q.pop());
         let (tx, rx) = channel();
 
@@ -177,7 +181,7 @@
             let q = q.clone();
             let tx = tx.clone();
             native::task::spawn(proc() {
-                let mut q = q;
+                let q = q;
                 for i in range(0, nmsgs) {
                     assert!(q.push(i));
                 }
@@ -191,7 +195,7 @@
             completion_rxs.push(rx);
             let q = q.clone();
             native::task::spawn(proc() {
-                let mut q = q;
+                let q = q;
                 let mut i = 0u;
                 loop {
                     match q.pop() {
diff --git a/src/libstd/sync/mpsc_queue.rs b/src/libstd/sync/mpsc_queue.rs
index 4cdcd05e9b4..4db24e82d37 100644
--- a/src/libstd/sync/mpsc_queue.rs
+++ b/src/libstd/sync/mpsc_queue.rs
@@ -45,6 +45,7 @@ use option::{Option, None, Some};
 use owned::Box;
 use ptr::RawPtr;
 use sync::atomics::{AtomicPtr, Release, Acquire, AcqRel, Relaxed};
+use ty::Unsafe;
 
 /// A result of the `pop` function.
 pub enum PopResult<T> {
@@ -69,7 +70,7 @@ struct Node<T> {
 /// popper at a time (many pushers are allowed).
 pub struct Queue<T> {
     head: AtomicPtr<Node<T>>,
-    tail: *mut Node<T>,
+    tail: Unsafe<*mut Node<T>>,
 }
 
 impl<T> Node<T> {
@@ -88,12 +89,12 @@ impl<T: Send> Queue<T> {
         let stub = unsafe { Node::new(None) };
         Queue {
             head: AtomicPtr::new(stub),
-            tail: stub,
+            tail: Unsafe::new(stub),
         }
     }
 
     /// Pushes a new value onto this queue.
-    pub fn push(&mut self, t: T) {
+    pub fn push(&self, t: T) {
         unsafe {
             let n = Node::new(Some(t));
             let prev = self.head.swap(n, AcqRel);
@@ -111,13 +112,13 @@ impl<T: Send> Queue<T> {
     ///
     /// This inconsistent state means that this queue does indeed have data, but
     /// it does not currently have access to it at this time.
-    pub fn pop(&mut self) -> PopResult<T> {
+    pub fn pop(&self) -> PopResult<T> {
         unsafe {
-            let tail = self.tail;
+            let tail = *self.tail.get();
             let next = (*tail).next.load(Acquire);
 
             if !next.is_null() {
-                self.tail = next;
+                *self.tail.get() = next;
                 assert!((*tail).value.is_none());
                 assert!((*next).value.is_some());
                 let ret = (*next).value.take_unwrap();
@@ -131,7 +132,7 @@ impl<T: Send> Queue<T> {
 
     /// Attempts to pop data from this queue, but doesn't attempt too hard. This
     /// will canonicalize inconsistent states to a `None` value.
-    pub fn casual_pop(&mut self) -> Option<T> {
+    pub fn casual_pop(&self) -> Option<T> {
         match self.pop() {
             Data(t) => Some(t),
             Empty | Inconsistent => None,
@@ -143,7 +144,7 @@ impl<T: Send> Drop for Queue<T> {
     fn drop(&mut self) {
         unsafe {
-            let mut cur = self.tail;
+            let mut cur = *self.tail.get();
             while !cur.is_null() {
                 let next = (*cur).next.load(Relaxed);
                 let _: Box<Node<T>> = mem::transmute(cur);
@@ -157,13 +158,14 @@ mod tests {
     use prelude::*;
 
+    use alloc::arc::Arc;
+
     use native;
     use super::{Queue, Data, Empty, Inconsistent};
-    use sync::arc::UnsafeArc;
 
     #[test]
     fn test_full() {
-        let mut q = Queue::new();
+        let q = Queue::new();
         q.push(box 1);
         q.push(box 2);
     }
@@ -172,20 +174,20 @@
     fn test() {
         let nthreads = 8u;
         let nmsgs = 1000u;
-        let mut q = Queue::new();
+        let q = Queue::new();
         match q.pop() {
             Empty => {}
             Inconsistent | Data(..) => fail!()
         }
         let (tx, rx) = channel();
-        let q = UnsafeArc::new(q);
+        let q = Arc::new(q);
         for _ in range(0, nthreads) {
             let tx = tx.clone();
             let q = q.clone();
             native::task::spawn(proc() {
                 for i in range(0, nmsgs) {
-                    unsafe { (*q.get()).push(i); }
+                    q.push(i);
                 }
                 tx.send(());
             });
         }
@@ -193,7 +195,7 @@
         let mut i = 0u;
         while i < nthreads * nmsgs {
-            match unsafe { (*q.get()).pop() } {
+            match q.pop() {
                 Empty | Inconsistent => {},
                 Data(_) => { i += 1 }
             }
diff --git a/src/libstd/sync/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs
index ed6d690def0..fb515c9db6e 100644
--- a/src/libstd/sync/spsc_queue.rs
+++ b/src/libstd/sync/spsc_queue.rs
@@ -40,6 +40,7 @@ use option::{Some, None, Option};
 use owned::Box;
 use ptr::RawPtr;
 use sync::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release};
+use ty::Unsafe;
 
 // Node within the linked list queue of messages to send
 struct Node<T> {
@@ -51,18 +52,18 @@ struct Node<T> {
 }
 
 /// The single-producer single-consumer queue. This structure is not cloneable,
-/// but it can be safely shared in an UnsafeArc if it is guaranteed that there
+/// but it can be safely shared in an Arc if it is guaranteed that there
 /// is only one popper and one pusher touching the queue at any one point in
 /// time.
 pub struct Queue<T> {
     // consumer fields
-    tail: *mut Node<T>, // where to pop from
+    tail: Unsafe<*mut Node<T>>, // where to pop from
     tail_prev: AtomicPtr<Node<T>>, // where to pop from
 
     // producer fields
-    head: *mut Node<T>,      // where to push to
-    first: *mut Node<T>,     // where to get new nodes from
-    tail_copy: *mut Node<T>, // between first/tail
+    head: Unsafe<*mut Node<T>>,      // where to push to
+    first: Unsafe<*mut Node<T>>,     // where to get new nodes from
+    tail_copy: Unsafe<*mut Node<T>>, // between first/tail
 
     // Cache maintenance fields. Additions and subtractions are stored
     // separately in order to allow them to use nonatomic addition/subtraction.
@@ -101,11 +102,11 @@ impl<T: Send> Queue<T> {
         let n2 = Node::new();
         unsafe { (*n1).next.store(n2, Relaxed) }
         Queue {
-            tail: n2,
+            tail: Unsafe::new(n2),
             tail_prev: AtomicPtr::new(n1),
-            head: n2,
-            first: n1,
-            tail_copy: n1,
+            head: Unsafe::new(n2),
+            first: Unsafe::new(n1),
+            tail_copy: Unsafe::new(n1),
             cache_bound: bound,
             cache_additions: AtomicUint::new(0),
             cache_subtractions: AtomicUint::new(0),
@@ -114,7 +115,7 @@ impl<T: Send> Queue<T> {
 
     /// Pushes a new value onto this queue. Note that to use this function
     /// safely, it must be externally guaranteed that there is only one pusher.
-    pub fn push(&mut self, t: T) {
+    pub fn push(&self, t: T) {
         unsafe {
             // Acquire a node (which either uses a cached one or allocates a new
             // one), and then append this to the 'head' node.
@@ -122,35 +123,35 @@ impl<T: Send> Queue<T> {
             assert!((*n).value.is_none());
             (*n).value = Some(t);
             (*n).next.store(0 as *mut Node<T>, Relaxed);
-            (*self.head).next.store(n, Release);
-            self.head = n;
+            (**self.head.get()).next.store(n, Release);
+            *self.head.get() = n;
         }
     }
 
-    unsafe fn alloc(&mut self) -> *mut Node<T> {
+    unsafe fn alloc(&self) -> *mut Node<T> {
         // First try to see if we can consume the 'first' node for our uses.
         // We try to avoid as many atomic instructions as possible here, so
         // the addition to cache_subtractions is not atomic (plus we're the
         // only one subtracting from the cache).
-        if self.first != self.tail_copy {
+        if *self.first.get() != *self.tail_copy.get() {
             if self.cache_bound > 0 {
                 let b = self.cache_subtractions.load(Relaxed);
                 self.cache_subtractions.store(b + 1, Relaxed);
             }
-            let ret = self.first;
-            self.first = (*ret).next.load(Relaxed);
+            let ret = *self.first.get();
+            *self.first.get() = (*ret).next.load(Relaxed);
             return ret;
         }
         // If the above fails, then update our copy of the tail and try
         // again.
-        self.tail_copy = self.tail_prev.load(Acquire);
-        if self.first != self.tail_copy {
+        *self.tail_copy.get() = self.tail_prev.load(Acquire);
+        if *self.first.get() != *self.tail_copy.get() {
             if self.cache_bound > 0 {
                 let b = self.cache_subtractions.load(Relaxed);
                 self.cache_subtractions.store(b + 1, Relaxed);
             }
-            let ret = self.first;
-            self.first = (*ret).next.load(Relaxed);
+            let ret = *self.first.get();
+            *self.first.get() = (*ret).next.load(Relaxed);
             return ret;
         }
         // If all of that fails, then we have to allocate a new node
@@ -160,19 +161,19 @@ impl<T: Send> Queue<T> {
 
     /// Attempts to pop a value from this queue. Remember that to use this type
     /// safely you must ensure that there is only one popper at a time.
-    pub fn pop(&mut self) -> Option<T> {
+    pub fn pop(&self) -> Option<T> {
         unsafe {
             // The `tail` node is not actually a used node, but rather a
             // sentinel from where we should start popping from. Hence, look at
             // tail's next field and see if we can use it. If we do a pop, then
             // the current tail node is a candidate for going into the cache.
-            let tail = self.tail;
+            let tail = *self.tail.get();
             let next = (*tail).next.load(Acquire);
             if next.is_null() { return None }
             assert!((*next).value.is_some());
             let ret = (*next).value.take();
 
-            self.tail = next;
+            *self.tail.get() = next;
             if self.cache_bound == 0 {
                 self.tail_prev.store(tail, Release);
             } else {
@@ -197,11 +198,11 @@ impl<T: Send> Queue<T> {
 
     /// Attempts to peek at the head of the queue, returning `None` if the queue
     /// has no data currently
-    pub fn peek<'a>(&'a mut self) -> Option<&'a mut T> {
+    pub fn peek<'a>(&'a self) -> Option<&'a mut T> {
         // This is essentially the same as above with all the popping bits
         // stripped out.
         unsafe {
-            let tail = self.tail;
+            let tail = *self.tail.get();
             let next = (*tail).next.load(Acquire);
             if next.is_null() { return None }
             return (*next).value.as_mut();
@@ -213,7 +214,7 @@ impl<T: Send> Queue<T> {
 impl<T: Send> Drop for Queue<T> {
     fn drop(&mut self) {
         unsafe {
-            let mut cur = self.first;
+            let mut cur = *self.first.get();
             while !cur.is_null() {
                 let next = (*cur).next.load(Relaxed);
                 let _n: Box<Node<T>> = mem::transmute(cur);
@@ -226,13 +227,15 @@ impl<T: Send> Drop for Queue<T> {
 #[cfg(test)]
 mod test {
     use prelude::*;
+
+    use alloc::arc::Arc;
     use native;
+
     use super::Queue;
-    use sync::arc::UnsafeArc;
 
     #[test]
     fn smoke() {
-        let mut q = Queue::new(0);
+        let q = Queue::new(0);
         q.push(1);
         q.push(2);
         assert_eq!(q.pop(), Some(1));
@@ -247,14 +250,14 @@ mod test {
 
     #[test]
     fn drop_full() {
-        let mut q = Queue::new(0);
+        let q = Queue::new(0);
         q.push(box 1);
         q.push(box 2);
     }
 
     #[test]
     fn smoke_bound() {
-        let mut q = Queue::new(1);
+        let q = Queue::new(1);
         q.push(1);
         q.push(2);
         assert_eq!(q.pop(), Some(1));
@@ -273,12 +276,13 @@ mod test {
         stress_bound(1);
 
         fn stress_bound(bound: uint) {
-            let (a, b) = UnsafeArc::new2(Queue::new(bound));
+            let a = Arc::new(Queue::new(bound));
+            let b = a.clone();
             let (tx, rx) = channel();
             native::task::spawn(proc() {
                 for _ in range(0, 100000) {
                     loop {
-                        match unsafe { (*b.get()).pop() } {
+                        match b.pop() {
                             Some(1) => break,
                             Some(_) => fail!(),
                             None => {}
@@ -288,7 +292,7 @@ mod test {
                 tx.send(());
             });
             for _ in range(0, 100000) {
-                unsafe { (*a.get()).push(1); }
+                a.push(1);
             }
             rx.recv();
         }
diff --git a/src/libstd/unstable/sync.rs b/src/libstd/unstable/sync.rs
index 5be10fc27df..f0f7e40ce09 100644
--- a/src/libstd/unstable/sync.rs
+++ b/src/libstd/unstable/sync.rs
@@ -8,9 +8,11 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
+use alloc::arc::Arc;
+
 use clone::Clone;
 use kinds::Send;
-use sync::arc::UnsafeArc;
+use ty::Unsafe;
 use unstable::mutex::NativeMutex;
 
 struct ExData<T> {
@@ -30,7 +32,7 @@ struct ExData<T> {
  * need to block or deschedule while accessing shared state, use extra::sync::RWArc.
  */
 pub struct Exclusive<T> {
-    x: UnsafeArc<ExData<T>>
+    x: Arc<Unsafe<ExData<T>>>
 }
 
 impl<T:Send> Clone for Exclusive<T> {
@@ -48,7 +50,7 @@ impl<T:Send> Exclusive<T> {
             data: user_data
         };
         Exclusive {
-            x: UnsafeArc::new(data)
+            x: Arc::new(Unsafe::new(data))
         }
     }
```
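The deleted `src/libstd/sync/arc.rs` is still worth reading: its comments spell out the reference-counting protocol that `alloc::arc::Arc` relies on as well — a `Relaxed` increment on clone, and a `Release` decrement paired with an `Acquire` fence before the final deallocation. A stripped-down sketch of that protocol using today's `std::sync::atomic` names (`MiniArc` is illustrative, not a real type):

```rust
use std::sync::atomic::{fence, AtomicUsize, Ordering};

struct ArcData<T> {
    count: AtomicUsize,
    data: T,
}

struct MiniArc<T> {
    ptr: *mut ArcData<T>,
}

impl<T> Clone for MiniArc<T> {
    fn clone(&self) -> MiniArc<T> {
        // Relaxed suffices: a new handle can only be made from a live one,
        // which already keeps the count above zero (the Boost argument
        // quoted in the deleted file).
        unsafe { (*self.ptr).count.fetch_add(1, Ordering::Relaxed) };
        MiniArc { ptr: self.ptr }
    }
}

impl<T> Drop for MiniArc<T> {
    fn drop(&mut self) {
        unsafe {
            // Release orders every prior use of the data before the count
            // reaching zero...
            if (*self.ptr).count.fetch_sub(1, Ordering::Release) == 1 {
                // ...and the Acquire fence orders the deallocation after
                // all of those uses, exactly as the deleted comments say.
                fence(Ordering::Acquire);
                drop(Box::from_raw(self.ptr));
            }
        }
    }
}

fn main() {
    let data = Box::new(ArcData { count: AtomicUsize::new(1), data: 42 });
    let a = MiniArc { ptr: Box::into_raw(data) };
    let b = a.clone();
    unsafe { assert_eq!((*b.ptr).data, 42) };
    // Dropping `b` and then `a` frees the allocation exactly once.
}
```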
