From efbd3724c012d68afd428beaa22f0d5aabff007d Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 19 May 2014 15:51:31 -0700 Subject: std: Rebuild sync::deque on Arc This also removes the `&mut self` requirement, using the correct `&self` requirement for concurrent types. --- src/libstd/sync/deque.rs | 52 +++++++++++++++++++++++++----------------------- 1 file changed, 27 insertions(+), 25 deletions(-) (limited to 'src/libstd/sync') diff --git a/src/libstd/sync/deque.rs b/src/libstd/sync/deque.rs index 30b95ffb34f..42a8bd88652 100644 --- a/src/libstd/sync/deque.rs +++ b/src/libstd/sync/deque.rs @@ -48,6 +48,8 @@ // FIXME: all atomic operations in this module use a SeqCst ordering. That is // probably overkill +use alloc::arc::Arc; + use clone::Clone; use iter::{range, Iterator}; use kinds::Send; @@ -58,7 +60,6 @@ use owned::Box; use ptr::RawPtr; use ptr; use slice::ImmutableVector; -use sync::arc::UnsafeArc; use sync::atomics::{AtomicInt, AtomicPtr, SeqCst}; use unstable::sync::Exclusive; use rt::heap::{allocate, deallocate}; @@ -87,14 +88,14 @@ struct Deque { /// /// There may only be one worker per deque. pub struct Worker { - deque: UnsafeArc>, + deque: Arc>, } /// The stealing half of the work-stealing deque. Stealers have access to the /// opposite end of the deque from the worker, and they only have access to the /// `steal` method. pub struct Stealer { - deque: UnsafeArc>, + deque: Arc>, } /// When stealing some data, this is an enumeration of the possible outcomes. @@ -149,12 +150,13 @@ impl BufferPool { /// Allocates a new work-stealing deque which will send/receiving memory to /// and from this buffer pool. - pub fn deque(&mut self) -> (Worker, Stealer) { - let (a, b) = UnsafeArc::new2(Deque::new(self.clone())); + pub fn deque(&self) -> (Worker, Stealer) { + let a = Arc::new(Deque::new(self.clone())); + let b = a.clone(); (Worker { deque: a }, Stealer { deque: b }) } - fn alloc(&mut self, bits: int) -> Box> { + fn alloc(&self, bits: int) -> Box> { unsafe { self.pool.with(|pool| { match pool.iter().position(|x| x.size() >= (1 << bits)) { @@ -165,7 +167,7 @@ impl BufferPool { } } - fn free(&mut self, buf: Box>) { + fn free(&self, buf: Box>) { unsafe { let mut buf = Some(buf); self.pool.with(|pool| { @@ -185,34 +187,34 @@ impl Clone for BufferPool { impl Worker { /// Pushes data onto the front of this work queue. - pub fn push(&mut self, t: T) { - unsafe { (*self.deque.get()).push(t) } + pub fn push(&self, t: T) { + unsafe { self.deque.push(t) } } /// Pops data off the front of the work queue, returning `None` on an empty /// queue. - pub fn pop(&mut self) -> Option { - unsafe { (*self.deque.get()).pop() } + pub fn pop(&self) -> Option { + unsafe { self.deque.pop() } } /// Gets access to the buffer pool that this worker is attached to. This can /// be used to create more deques which share the same buffer pool as this /// deque. - pub fn pool<'a>(&'a mut self) -> &'a mut BufferPool { - unsafe { &mut (*self.deque.get()).pool } + pub fn pool<'a>(&'a self) -> &'a BufferPool { + &self.deque.pool } } impl Stealer { /// Steals work off the end of the queue (opposite of the worker's end) - pub fn steal(&mut self) -> Stolen { - unsafe { (*self.deque.get()).steal() } + pub fn steal(&self) -> Stolen { + unsafe { self.deque.steal() } } /// Gets access to the buffer pool that this stealer is attached to. This /// can be used to create more deques which share the same buffer pool as /// this deque. - pub fn pool<'a>(&'a mut self) -> &'a mut BufferPool { - unsafe { &mut (*self.deque.get()).pool } + pub fn pool<'a>(&'a self) -> &'a BufferPool { + &self.deque.pool } } @@ -224,7 +226,7 @@ impl Clone for Stealer { // personally going to heavily comment what's going on here. impl Deque { - fn new(mut pool: BufferPool) -> Deque { + fn new(pool: BufferPool) -> Deque { let buf = pool.alloc(MIN_BITS); Deque { bottom: AtomicInt::new(0), @@ -234,7 +236,7 @@ impl Deque { } } - unsafe fn push(&mut self, data: T) { + unsafe fn push(&self, data: T) { let mut b = self.bottom.load(SeqCst); let t = self.top.load(SeqCst); let mut a = self.array.load(SeqCst); @@ -250,7 +252,7 @@ impl Deque { self.bottom.store(b + 1, SeqCst); } - unsafe fn pop(&mut self) -> Option { + unsafe fn pop(&self) -> Option { let b = self.bottom.load(SeqCst); let a = self.array.load(SeqCst); let b = b - 1; @@ -276,7 +278,7 @@ impl Deque { } } - unsafe fn steal(&mut self) -> Stolen { + unsafe fn steal(&self) -> Stolen { let t = self.top.load(SeqCst); let old = self.array.load(SeqCst); let b = self.bottom.load(SeqCst); @@ -298,7 +300,7 @@ impl Deque { } } - unsafe fn maybe_shrink(&mut self, b: int, t: int) { + unsafe fn maybe_shrink(&self, b: int, t: int) { let a = self.array.load(SeqCst); if b - t < (*a).size() / K && b - t > (1 << MIN_BITS) { self.swap_buffer(b, a, (*a).resize(b, t, -1)); @@ -312,7 +314,7 @@ impl Deque { // after this method has called 'free' on it. The continued usage is simply // a read followed by a forget, but we must make sure that the memory can // continue to be read after we flag this buffer for reclamation. - unsafe fn swap_buffer(&mut self, b: int, old: *mut Buffer, + unsafe fn swap_buffer(&self, b: int, old: *mut Buffer, buf: Buffer) -> *mut Buffer { let newbuf: *mut Buffer = transmute(box buf); self.array.store(newbuf, SeqCst); @@ -373,7 +375,7 @@ impl Buffer { // Unsafe because this unsafely overwrites possibly uninitialized or // initialized data. - unsafe fn put(&mut self, i: int, t: T) { + unsafe fn put(&self, i: int, t: T) { let ptr = self.storage.offset(i & self.mask()); ptr::copy_nonoverlapping_memory(ptr as *mut T, &t as *T, 1); forget(t); @@ -382,7 +384,7 @@ impl Buffer { // Again, unsafe because this has incredibly dubious ownership violations. // It is assumed that this buffer is immediately dropped. unsafe fn resize(&self, b: int, t: int, delta: int) -> Buffer { - let mut buf = Buffer::new(self.log_size + delta); + let buf = Buffer::new(self.log_size + delta); for i in range(t, b) { buf.put(i, self.get(i)); } -- cgit 1.4.1-3-g733a5 From 7db02e20f2140530a9402f7d7452b10cac6fdf7b Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 19 May 2014 16:01:48 -0700 Subject: std: Rebuild mpmc queues on Unsafe/Arc This removes usage of UnsafeArc and uses proper self mutability for concurrent types. --- src/libstd/sync/mpmc_bounded_queue.rs | 50 +++++++++++++++++++---------------- 1 file changed, 27 insertions(+), 23 deletions(-) (limited to 'src/libstd/sync') diff --git a/src/libstd/sync/mpmc_bounded_queue.rs b/src/libstd/sync/mpmc_bounded_queue.rs index 2df5031b482..7fb98e14086 100644 --- a/src/libstd/sync/mpmc_bounded_queue.rs +++ b/src/libstd/sync/mpmc_bounded_queue.rs @@ -29,13 +29,15 @@ // http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue +use alloc::arc::Arc; + use clone::Clone; use kinds::Send; use num::next_power_of_two; use option::{Option, Some, None}; -use sync::arc::UnsafeArc; use sync::atomics::{AtomicUint,Relaxed,Release,Acquire}; use vec::Vec; +use ty::Unsafe; struct Node { sequence: AtomicUint, @@ -44,7 +46,7 @@ struct Node { struct State { pad0: [u8, ..64], - buffer: Vec>, + buffer: Vec>>, mask: uint, pad1: [u8, ..64], enqueue_pos: AtomicUint, @@ -54,7 +56,7 @@ struct State { } pub struct Queue { - state: UnsafeArc>, + state: Arc>, } impl State { @@ -70,7 +72,7 @@ impl State { capacity }; let buffer = Vec::from_fn(capacity, |i| { - Node { sequence:AtomicUint::new(i), value: None } + Unsafe::new(Node { sequence:AtomicUint::new(i), value: None }) }); State{ pad0: [0, ..64], @@ -84,19 +86,21 @@ impl State { } } - fn push(&mut self, value: T) -> bool { + fn push(&self, value: T) -> bool { let mask = self.mask; let mut pos = self.enqueue_pos.load(Relaxed); loop { - let node = self.buffer.get_mut(pos & mask); - let seq = node.sequence.load(Acquire); + let node = self.buffer.get(pos & mask); + let seq = unsafe { (*node.get()).sequence.load(Acquire) }; let diff: int = seq as int - pos as int; if diff == 0 { let enqueue_pos = self.enqueue_pos.compare_and_swap(pos, pos+1, Relaxed); if enqueue_pos == pos { - node.value = Some(value); - node.sequence.store(pos+1, Release); + unsafe { + (*node.get()).value = Some(value); + (*node.get()).sequence.store(pos+1, Release); + } break } else { pos = enqueue_pos; @@ -110,19 +114,21 @@ impl State { true } - fn pop(&mut self) -> Option { + fn pop(&self) -> Option { let mask = self.mask; let mut pos = self.dequeue_pos.load(Relaxed); loop { - let node = self.buffer.get_mut(pos & mask); - let seq = node.sequence.load(Acquire); + let node = self.buffer.get(pos & mask); + let seq = unsafe { (*node.get()).sequence.load(Acquire) }; let diff: int = seq as int - (pos + 1) as int; if diff == 0 { let dequeue_pos = self.dequeue_pos.compare_and_swap(pos, pos+1, Relaxed); if dequeue_pos == pos { - let value = node.value.take(); - node.sequence.store(pos + mask + 1, Release); - return value + unsafe { + let value = (*node.get()).value.take(); + (*node.get()).sequence.store(pos + mask + 1, Release); + return value + } } else { pos = dequeue_pos; } @@ -138,24 +144,22 @@ impl State { impl Queue { pub fn with_capacity(capacity: uint) -> Queue { Queue{ - state: UnsafeArc::new(State::with_capacity(capacity)) + state: Arc::new(State::with_capacity(capacity)) } } - pub fn push(&mut self, value: T) -> bool { - unsafe { (*self.state.get()).push(value) } + pub fn push(&self, value: T) -> bool { + self.state.push(value) } - pub fn pop(&mut self) -> Option { - unsafe { (*self.state.get()).pop() } + pub fn pop(&self) -> Option { + self.state.pop() } } impl Clone for Queue { fn clone(&self) -> Queue { - Queue { - state: self.state.clone() - } + Queue { state: self.state.clone() } } } -- cgit 1.4.1-3-g733a5 From 2966e970cabdf7103ad61c840c72bf58352150e0 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 19 May 2014 17:27:52 -0700 Subject: std: Rebuild mpsc queue with Unsafe/&self This removes the incorrect `&mut self` taken because it can alias among many threads. --- src/libstd/sync/mpsc_queue.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) (limited to 'src/libstd/sync') diff --git a/src/libstd/sync/mpsc_queue.rs b/src/libstd/sync/mpsc_queue.rs index 4cdcd05e9b4..23afb9487ec 100644 --- a/src/libstd/sync/mpsc_queue.rs +++ b/src/libstd/sync/mpsc_queue.rs @@ -45,6 +45,7 @@ use option::{Option, None, Some}; use owned::Box; use ptr::RawPtr; use sync::atomics::{AtomicPtr, Release, Acquire, AcqRel, Relaxed}; +use ty::Unsafe; /// A result of the `pop` function. pub enum PopResult { @@ -69,7 +70,7 @@ struct Node { /// popper at a time (many pushers are allowed). pub struct Queue { head: AtomicPtr>, - tail: *mut Node, + tail: Unsafe<*mut Node>, } impl Node { @@ -88,12 +89,12 @@ impl Queue { let stub = unsafe { Node::new(None) }; Queue { head: AtomicPtr::new(stub), - tail: stub, + tail: Unsafe::new(stub), } } /// Pushes a new value onto this queue. - pub fn push(&mut self, t: T) { + pub fn push(&self, t: T) { unsafe { let n = Node::new(Some(t)); let prev = self.head.swap(n, AcqRel); @@ -111,13 +112,13 @@ impl Queue { /// /// This inconsistent state means that this queue does indeed have data, but /// it does not currently have access to it at this time. - pub fn pop(&mut self) -> PopResult { + pub fn pop(&self) -> PopResult { unsafe { - let tail = self.tail; + let tail = *self.tail.get(); let next = (*tail).next.load(Acquire); if !next.is_null() { - self.tail = next; + *self.tail.get() = next; assert!((*tail).value.is_none()); assert!((*next).value.is_some()); let ret = (*next).value.take_unwrap(); @@ -131,7 +132,7 @@ impl Queue { /// Attempts to pop data from this queue, but doesn't attempt too hard. This /// will canonicalize inconsistent states to a `None` value. - pub fn casual_pop(&mut self) -> Option { + pub fn casual_pop(&self) -> Option { match self.pop() { Data(t) => Some(t), Empty | Inconsistent => None, @@ -143,7 +144,7 @@ impl Queue { impl Drop for Queue { fn drop(&mut self) { unsafe { - let mut cur = self.tail; + let mut cur = *self.tail.get(); while !cur.is_null() { let next = (*cur).next.load(Relaxed); let _: Box> = mem::transmute(cur); -- cgit 1.4.1-3-g733a5 From fe93c3d47ed0cdf0a0cbac66a9f35ddb4c6783a2 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 19 May 2014 17:32:04 -0700 Subject: std: Rebuild spsc with Unsafe/&self This removes the incorrect usage of `&mut self` in a concurrent setting. --- src/libstd/sync/spsc_queue.rs | 51 ++++++++++++++++++++++--------------------- 1 file changed, 26 insertions(+), 25 deletions(-) (limited to 'src/libstd/sync') diff --git a/src/libstd/sync/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs index ed6d690def0..b9827ee6b2a 100644 --- a/src/libstd/sync/spsc_queue.rs +++ b/src/libstd/sync/spsc_queue.rs @@ -40,6 +40,7 @@ use option::{Some, None, Option}; use owned::Box; use ptr::RawPtr; use sync::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release}; +use ty::Unsafe; // Node within the linked list queue of messages to send struct Node { @@ -56,13 +57,13 @@ struct Node { /// time. pub struct Queue { // consumer fields - tail: *mut Node, // where to pop from + tail: Unsafe<*mut Node>, // where to pop from tail_prev: AtomicPtr>, // where to pop from // producer fields - head: *mut Node, // where to push to - first: *mut Node, // where to get new nodes from - tail_copy: *mut Node, // between first/tail + head: Unsafe<*mut Node>, // where to push to + first: Unsafe<*mut Node>, // where to get new nodes from + tail_copy: Unsafe<*mut Node>, // between first/tail // Cache maintenance fields. Additions and subtractions are stored // separately in order to allow them to use nonatomic addition/subtraction. @@ -101,11 +102,11 @@ impl Queue { let n2 = Node::new(); unsafe { (*n1).next.store(n2, Relaxed) } Queue { - tail: n2, + tail: Unsafe::new(n2), tail_prev: AtomicPtr::new(n1), - head: n2, - first: n1, - tail_copy: n1, + head: Unsafe::new(n2), + first: Unsafe::new(n1), + tail_copy: Unsafe::new(n1), cache_bound: bound, cache_additions: AtomicUint::new(0), cache_subtractions: AtomicUint::new(0), @@ -114,7 +115,7 @@ impl Queue { /// Pushes a new value onto this queue. Note that to use this function /// safely, it must be externally guaranteed that there is only one pusher. - pub fn push(&mut self, t: T) { + pub fn push(&self, t: T) { unsafe { // Acquire a node (which either uses a cached one or allocates a new // one), and then append this to the 'head' node. @@ -122,35 +123,35 @@ impl Queue { assert!((*n).value.is_none()); (*n).value = Some(t); (*n).next.store(0 as *mut Node, Relaxed); - (*self.head).next.store(n, Release); - self.head = n; + (**self.head.get()).next.store(n, Release); + *self.head.get() = n; } } - unsafe fn alloc(&mut self) -> *mut Node { + unsafe fn alloc(&self) -> *mut Node { // First try to see if we can consume the 'first' node for our uses. // We try to avoid as many atomic instructions as possible here, so // the addition to cache_subtractions is not atomic (plus we're the // only one subtracting from the cache). - if self.first != self.tail_copy { + if *self.first.get() != *self.tail_copy.get() { if self.cache_bound > 0 { let b = self.cache_subtractions.load(Relaxed); self.cache_subtractions.store(b + 1, Relaxed); } - let ret = self.first; - self.first = (*ret).next.load(Relaxed); + let ret = *self.first.get(); + *self.first.get() = (*ret).next.load(Relaxed); return ret; } // If the above fails, then update our copy of the tail and try // again. - self.tail_copy = self.tail_prev.load(Acquire); - if self.first != self.tail_copy { + *self.tail_copy.get() = self.tail_prev.load(Acquire); + if *self.first.get() != *self.tail_copy.get() { if self.cache_bound > 0 { let b = self.cache_subtractions.load(Relaxed); self.cache_subtractions.store(b + 1, Relaxed); } - let ret = self.first; - self.first = (*ret).next.load(Relaxed); + let ret = *self.first.get(); + *self.first.get() = (*ret).next.load(Relaxed); return ret; } // If all of that fails, then we have to allocate a new node @@ -160,19 +161,19 @@ impl Queue { /// Attempts to pop a value from this queue. Remember that to use this type /// safely you must ensure that there is only one popper at a time. - pub fn pop(&mut self) -> Option { + pub fn pop(&self) -> Option { unsafe { // The `tail` node is not actually a used node, but rather a // sentinel from where we should start popping from. Hence, look at // tail's next field and see if we can use it. If we do a pop, then // the current tail node is a candidate for going into the cache. - let tail = self.tail; + let tail = *self.tail.get(); let next = (*tail).next.load(Acquire); if next.is_null() { return None } assert!((*next).value.is_some()); let ret = (*next).value.take(); - self.tail = next; + *self.tail.get() = next; if self.cache_bound == 0 { self.tail_prev.store(tail, Release); } else { @@ -197,11 +198,11 @@ impl Queue { /// Attempts to peek at the head of the queue, returning `None` if the queue /// has no data currently - pub fn peek<'a>(&'a mut self) -> Option<&'a mut T> { + pub fn peek<'a>(&'a self) -> Option<&'a mut T> { // This is essentially the same as above with all the popping bits // stripped out. unsafe { - let tail = self.tail; + let tail = *self.tail.get(); let next = (*tail).next.load(Acquire); if next.is_null() { return None } return (*next).value.as_mut(); @@ -213,7 +214,7 @@ impl Queue { impl Drop for Queue { fn drop(&mut self) { unsafe { - let mut cur = self.first; + let mut cur = *self.first.get(); while !cur.is_null() { let next = (*cur).next.load(Relaxed); let _n: Box> = mem::transmute(cur); -- cgit 1.4.1-3-g733a5 From 73729e94c87281dd7193dbdc86b4de2963b8fd72 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 19 May 2014 17:50:57 -0700 Subject: std: Move comm primitives away from UnsafeArc They currently still use `&mut self`, this migration was aimed towards moving from UnsafeArc to Arc> --- src/libstd/comm/mod.rs | 49 ++++++++++++++++++++++++------------------- src/libstd/comm/oneshot.rs | 2 +- src/libstd/sync/mpsc_queue.rs | 9 ++++---- src/libstd/sync/spsc_queue.rs | 9 +++++--- 4 files changed, 39 insertions(+), 30 deletions(-) (limited to 'src/libstd/sync') diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index df0c6f3b8d3..fd5b92ba469 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -271,6 +271,8 @@ // And now that you've seen all the races that I found and attempted to fix, // here's the code for you to find some more! +use alloc::arc::Arc; + use cell::Cell; use clone::Clone; use iter::Iterator; @@ -283,7 +285,6 @@ use owned::Box; use result::{Ok, Err, Result}; use rt::local::Local; use rt::task::{Task, BlockedTask}; -use sync::arc::UnsafeArc; use ty::Unsafe; pub use comm::select::{Select, Handle}; @@ -352,7 +353,7 @@ pub struct Sender { /// The sending-half of Rust's synchronous channel type. This half can only be /// owned by one task, but it can be cloned to send to other tasks. pub struct SyncSender { - inner: UnsafeArc>, + inner: Arc>>, // can't share in an arc marker: marker::NoShare, } @@ -386,10 +387,10 @@ pub enum TrySendError { } enum Flavor { - Oneshot(UnsafeArc>), - Stream(UnsafeArc>), - Shared(UnsafeArc>), - Sync(UnsafeArc>), + Oneshot(Arc>>), + Stream(Arc>>), + Shared(Arc>>), + Sync(Arc>>), } #[doc(hidden)] @@ -435,8 +436,8 @@ impl UnsafeFlavor for Receiver { /// println!("{}", rx.recv()); /// ``` pub fn channel() -> (Sender, Receiver) { - let (a, b) = UnsafeArc::new2(oneshot::Packet::new()); - (Sender::new(Oneshot(b)), Receiver::new(Oneshot(a))) + let a = Arc::new(Unsafe::new(oneshot::Packet::new())); + (Sender::new(Oneshot(a.clone())), Receiver::new(Oneshot(a))) } /// Creates a new synchronous, bounded channel. @@ -471,8 +472,8 @@ pub fn channel() -> (Sender, Receiver) { /// assert_eq!(rx.recv(), 2); /// ``` pub fn sync_channel(bound: uint) -> (SyncSender, Receiver) { - let (a, b) = UnsafeArc::new2(sync::Packet::new(bound)); - (SyncSender::new(a), Receiver::new(Sync(b))) + let a = Arc::new(Unsafe::new(sync::Packet::new(bound))); + (SyncSender::new(a.clone()), Receiver::new(Sync(a))) } //////////////////////////////////////////////////////////////////////////////// @@ -557,13 +558,13 @@ impl Sender { let (new_inner, ret) = match *unsafe { self.inner() } { Oneshot(ref p) => { - let p = p.get(); unsafe { + let p = p.get(); if !(*p).sent() { return (*p).send(t); } else { - let (a, b) = UnsafeArc::new2(stream::Packet::new()); - match (*p).upgrade(Receiver::new(Stream(b))) { + let a = Arc::new(Unsafe::new(stream::Packet::new())); + match (*p).upgrade(Receiver::new(Stream(a.clone()))) { oneshot::UpSuccess => { let ret = (*a.get()).send(t); (a, ret) @@ -598,17 +599,21 @@ impl Clone for Sender { fn clone(&self) -> Sender { let (packet, sleeper) = match *unsafe { self.inner() } { Oneshot(ref p) => { - let (a, b) = UnsafeArc::new2(shared::Packet::new()); - match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } { - oneshot::UpSuccess | oneshot::UpDisconnected => (b, None), - oneshot::UpWoke(task) => (b, Some(task)) + let a = Arc::new(Unsafe::new(shared::Packet::new())); + match unsafe { + (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) + } { + oneshot::UpSuccess | oneshot::UpDisconnected => (a, None), + oneshot::UpWoke(task) => (a, Some(task)) } } Stream(ref p) => { - let (a, b) = UnsafeArc::new2(shared::Packet::new()); - match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } { - stream::UpSuccess | stream::UpDisconnected => (b, None), - stream::UpWoke(task) => (b, Some(task)), + let a = Arc::new(Unsafe::new(shared::Packet::new())); + match unsafe { + (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) + } { + stream::UpSuccess | stream::UpDisconnected => (a, None), + stream::UpWoke(task) => (a, Some(task)), } } Shared(ref p) => { @@ -645,7 +650,7 @@ impl Drop for Sender { //////////////////////////////////////////////////////////////////////////////// impl SyncSender { - fn new(inner: UnsafeArc>) -> SyncSender { + fn new(inner: Arc>>) -> SyncSender { SyncSender { inner: inner, marker: marker::NoShare } } diff --git a/src/libstd/comm/oneshot.rs b/src/libstd/comm/oneshot.rs index a7124e50b66..f9e8fd1e534 100644 --- a/src/libstd/comm/oneshot.rs +++ b/src/libstd/comm/oneshot.rs @@ -15,7 +15,7 @@ /// this type is to have one and exactly one allocation when the chan/port pair /// is created. /// -/// Another possible optimization would be to not use an UnsafeArc box because +/// Another possible optimization would be to not use an Arc box because /// in theory we know when the shared packet can be deallocated (no real need /// for the atomic reference counting), but I was having trouble how to destroy /// the data early in a drop of a Port. diff --git a/src/libstd/sync/mpsc_queue.rs b/src/libstd/sync/mpsc_queue.rs index 23afb9487ec..f2f95da1842 100644 --- a/src/libstd/sync/mpsc_queue.rs +++ b/src/libstd/sync/mpsc_queue.rs @@ -158,9 +158,10 @@ impl Drop for Queue { mod tests { use prelude::*; + use alloc::arc::Arc; + use native; use super::{Queue, Data, Empty, Inconsistent}; - use sync::arc::UnsafeArc; #[test] fn test_full() { @@ -179,14 +180,14 @@ mod tests { Inconsistent | Data(..) => fail!() } let (tx, rx) = channel(); - let q = UnsafeArc::new(q); + let q = Arc::new(q); for _ in range(0, nthreads) { let tx = tx.clone(); let q = q.clone(); native::task::spawn(proc() { for i in range(0, nmsgs) { - unsafe { (*q.get()).push(i); } + q.push(i); } tx.send(()); }); @@ -194,7 +195,7 @@ mod tests { let mut i = 0u; while i < nthreads * nmsgs { - match unsafe { (*q.get()).pop() } { + match q.pop() { Empty | Inconsistent => {}, Data(_) => { i += 1 } } diff --git a/src/libstd/sync/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs index b9827ee6b2a..093933c82fc 100644 --- a/src/libstd/sync/spsc_queue.rs +++ b/src/libstd/sync/spsc_queue.rs @@ -52,7 +52,7 @@ struct Node { } /// The single-producer single-consumer queue. This structure is not cloneable, -/// but it can be safely shared in an UnsafeArc if it is guaranteed that there +/// but it can be safely shared in an Arc if it is guaranteed that there /// is only one popper and one pusher touching the queue at any one point in /// time. pub struct Queue { @@ -227,9 +227,11 @@ impl Drop for Queue { #[cfg(test)] mod test { use prelude::*; + + use alloc::arc::Arc; use native; + use super::Queue; - use sync::arc::UnsafeArc; #[test] fn smoke() { @@ -274,7 +276,8 @@ mod test { stress_bound(1); fn stress_bound(bound: uint) { - let (a, b) = UnsafeArc::new2(Queue::new(bound)); + let a = Arc::new(Queue::new(bound)); + let b = a.clone(); let (tx, rx) = channel(); native::task::spawn(proc() { for _ in range(0, 100000) { -- cgit 1.4.1-3-g733a5 From 4c8a4d241a984fdc0b8a015dceca2a006f2b7146 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 19 May 2014 17:51:50 -0700 Subject: std: Remove UnsafeArc This type has been superseded by Arc>. The UnsafeArc type is a relic of an era that has long since past, and with the introduction of liballoc the standard library is able to use the Arc smart pointer. With little need left for UnsafeArc, it was removed. All existing code using UnsafeArc should either be reevaluated to whether it can use only Arc, or it should transition to Arc> [breaking-change] --- src/libstd/sync/arc.rs | 189 ---------------------------------- src/libstd/sync/deque.rs | 31 +++--- src/libstd/sync/mod.rs | 1 - src/libstd/sync/mpmc_bounded_queue.rs | 6 +- src/libstd/sync/mpsc_queue.rs | 4 +- src/libstd/sync/spsc_queue.rs | 10 +- 6 files changed, 23 insertions(+), 218 deletions(-) delete mode 100644 src/libstd/sync/arc.rs (limited to 'src/libstd/sync') diff --git a/src/libstd/sync/arc.rs b/src/libstd/sync/arc.rs deleted file mode 100644 index 7dcfe62ffb8..00000000000 --- a/src/libstd/sync/arc.rs +++ /dev/null @@ -1,189 +0,0 @@ -// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! Atomically reference counted data -//! -//! This modules contains the implementation of an atomically reference counted -//! pointer for the purpose of sharing data between tasks. This is obviously a -//! very unsafe primitive to use, but it has its use cases when implementing -//! concurrent data structures and similar tasks. -//! -//! Great care must be taken to ensure that data races do not arise through the -//! usage of `UnsafeArc`, and this often requires some form of external -//! synchronization. The only guarantee provided to you by this class is that -//! the underlying data will remain valid (not free'd) so long as the reference -//! count is greater than one. - -use clone::Clone; -use iter::Iterator; -use kinds::Send; -use mem; -use ops::Drop; -use owned::Box; -use ptr::RawPtr; -use sync::atomics::{fence, AtomicUint, Relaxed, Acquire, Release}; -use ty::Unsafe; -use vec::Vec; - -/// An atomically reference counted pointer. -/// -/// Enforces no shared-memory safety. -#[unsafe_no_drop_flag] -pub struct UnsafeArc { - data: *mut ArcData, -} - -struct ArcData { - count: AtomicUint, - data: Unsafe, -} - -unsafe fn new_inner(data: T, refcount: uint) -> *mut ArcData { - let data = box ArcData { - count: AtomicUint::new(refcount), - data: Unsafe::new(data) - }; - mem::transmute(data) -} - -impl UnsafeArc { - /// Creates a new `UnsafeArc` which wraps the given data. - pub fn new(data: T) -> UnsafeArc { - unsafe { UnsafeArc { data: new_inner(data, 1) } } - } - - /// As new(), but returns an extra pre-cloned handle. - pub fn new2(data: T) -> (UnsafeArc, UnsafeArc) { - unsafe { - let ptr = new_inner(data, 2); - (UnsafeArc { data: ptr }, UnsafeArc { data: ptr }) - } - } - - /// As new(), but returns a vector of as many pre-cloned handles as - /// requested. - pub fn newN(data: T, num_handles: uint) -> Vec> { - unsafe { - if num_handles == 0 { - vec![] // need to free data here - } else { - let ptr = new_inner(data, num_handles); - let v = Vec::from_fn(num_handles, |_| UnsafeArc { data: ptr }); - v - } - } - } - - /// Gets a pointer to the inner shared data. Note that care must be taken to - /// ensure that the outer `UnsafeArc` does not fall out of scope while this - /// pointer is in use, otherwise it could possibly contain a use-after-free. - #[inline] - pub fn get(&self) -> *mut T { - unsafe { - debug_assert!((*self.data).count.load(Relaxed) > 0); - return (*self.data).data.get(); - } - } - - /// Gets an immutable pointer to the inner shared data. This has the same - /// caveats as the `get` method. - #[inline] - pub fn get_immut(&self) -> *T { - unsafe { - debug_assert!((*self.data).count.load(Relaxed) > 0); - return (*self.data).data.get() as *T; - } - } - - /// checks if this is the only reference to the arc protected data - #[inline] - pub fn is_owned(&self) -> bool { - unsafe { - (*self.data).count.load(Relaxed) == 1 - } - } -} - -impl Clone for UnsafeArc { - fn clone(&self) -> UnsafeArc { - unsafe { - // Using a relaxed ordering is alright here, as knowledge of the original reference - // prevents other threads from erroneously deleting the object. - // - // As explained in the [Boost documentation][1], - // Increasing the reference counter can always be done with memory_order_relaxed: New - // references to an object can only be formed from an existing reference, and passing - // an existing reference from one thread to another must already provide any required - // synchronization. - // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html) - let old_count = (*self.data).count.fetch_add(1, Relaxed); - debug_assert!(old_count >= 1); - return UnsafeArc { data: self.data }; - } - } -} - -#[unsafe_destructor] -impl Drop for UnsafeArc{ - fn drop(&mut self) { - unsafe { - // Happens when destructing an unwrapper's handle and from - // `#[unsafe_no_drop_flag]` - if self.data.is_null() { - return - } - // Because `fetch_sub` is already atomic, we do not need to synchronize with other - // threads unless we are going to delete the object. - let old_count = (*self.data).count.fetch_sub(1, Release); - debug_assert!(old_count >= 1); - if old_count == 1 { - // This fence is needed to prevent reordering of use of the data and deletion of - // the data. Because it is marked `Release`, the decreasing of the reference count - // sychronizes with this `Acquire` fence. This means that use of the data happens - // before decreasing the refernce count, which happens before this fence, which - // happens before the deletion of the data. - // - // As explained in the [Boost documentation][1], - // It is important to enforce any possible access to the object in one thread - // (through an existing reference) to *happen before* deleting the object in a - // different thread. This is achieved by a "release" operation after dropping a - // reference (any access to the object through this reference must obviously - // happened before), and an "acquire" operation before deleting the object. - // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html) - fence(Acquire); - let _: Box> = mem::transmute(self.data); - } - } - } -} - -#[cfg(test)] -mod tests { - use prelude::*; - use super::UnsafeArc; - use mem::size_of; - - #[test] - fn test_size() { - assert_eq!(size_of::>(), size_of::<*[int, ..10]>()); - } - - #[test] - fn arclike_newN() { - // Tests that the many-refcounts-at-once constructors don't leak. - let _ = UnsafeArc::new2("hello".to_owned().to_owned()); - let x = UnsafeArc::newN("hello".to_owned().to_owned(), 0); - assert_eq!(x.len(), 0) - let x = UnsafeArc::newN("hello".to_owned().to_owned(), 1); - assert_eq!(x.len(), 1) - let x = UnsafeArc::newN("hello".to_owned().to_owned(), 10); - assert_eq!(x.len(), 10) - } -} diff --git a/src/libstd/sync/deque.rs b/src/libstd/sync/deque.rs index 42a8bd88652..c6446775b0c 100644 --- a/src/libstd/sync/deque.rs +++ b/src/libstd/sync/deque.rs @@ -417,8 +417,8 @@ mod tests { #[test] fn smoke() { - let mut pool = BufferPool::new(); - let (mut w, mut s) = pool.deque(); + let pool = BufferPool::new(); + let (w, s) = pool.deque(); assert_eq!(w.pop(), None); assert_eq!(s.steal(), Empty); w.push(1); @@ -432,10 +432,9 @@ mod tests { #[test] fn stealpush() { static AMT: int = 100000; - let mut pool = BufferPool::::new(); - let (mut w, s) = pool.deque(); + let pool = BufferPool::::new(); + let (w, s) = pool.deque(); let t = Thread::start(proc() { - let mut s = s; let mut left = AMT; while left > 0 { match s.steal() { @@ -458,10 +457,9 @@ mod tests { #[test] fn stealpush_large() { static AMT: int = 100000; - let mut pool = BufferPool::<(int, int)>::new(); - let (mut w, s) = pool.deque(); + let pool = BufferPool::<(int, int)>::new(); + let (w, s) = pool.deque(); let t = Thread::start(proc() { - let mut s = s; let mut left = AMT; while left > 0 { match s.steal() { @@ -479,7 +477,7 @@ mod tests { t.join(); } - fn stampede(mut w: Worker>, s: Stealer>, + fn stampede(w: Worker>, s: Stealer>, nthreads: int, amt: uint) { for _ in range(0, amt) { w.push(box 20); @@ -491,7 +489,6 @@ mod tests { let s = s.clone(); Thread::start(proc() { unsafe { - let mut s = s; while (*unsafe_remaining).load(SeqCst) > 0 { match s.steal() { Data(box 20) => { @@ -520,7 +517,7 @@ mod tests { #[test] fn run_stampede() { - let mut pool = BufferPool::>::new(); + let pool = BufferPool::>::new(); let (w, s) = pool.deque(); stampede(w, s, 8, 10000); } @@ -528,7 +525,7 @@ mod tests { #[test] fn many_stampede() { static AMT: uint = 4; - let mut pool = BufferPool::>::new(); + let pool = BufferPool::>::new(); let threads = range(0, AMT).map(|_| { let (w, s) = pool.deque(); Thread::start(proc() { @@ -547,14 +544,13 @@ mod tests { static NTHREADS: int = 8; static mut DONE: AtomicBool = INIT_ATOMIC_BOOL; static mut HITS: AtomicUint = INIT_ATOMIC_UINT; - let mut pool = BufferPool::::new(); - let (mut w, s) = pool.deque(); + let pool = BufferPool::::new(); + let (w, s) = pool.deque(); let threads = range(0, NTHREADS).map(|_| { let s = s.clone(); Thread::start(proc() { unsafe { - let mut s = s; loop { match s.steal() { Data(2) => { HITS.fetch_add(1, SeqCst); } @@ -606,8 +602,8 @@ mod tests { static AMT: int = 10000; static NTHREADS: int = 4; static mut DONE: AtomicBool = INIT_ATOMIC_BOOL; - let mut pool = BufferPool::<(int, uint)>::new(); - let (mut w, s) = pool.deque(); + let pool = BufferPool::<(int, uint)>::new(); + let (w, s) = pool.deque(); let (threads, hits) = vec::unzip(range(0, NTHREADS).map(|_| { let s = s.clone(); @@ -617,7 +613,6 @@ mod tests { }; (Thread::start(proc() { unsafe { - let mut s = s; loop { match s.steal() { Data((1, 2)) => { diff --git a/src/libstd/sync/mod.rs b/src/libstd/sync/mod.rs index 3213c538152..b2cf427edc8 100644 --- a/src/libstd/sync/mod.rs +++ b/src/libstd/sync/mod.rs @@ -15,7 +15,6 @@ //! and/or blocking at all, but rather provide the necessary tools to build //! other types of concurrent primitives. -pub mod arc; pub mod atomics; pub mod deque; pub mod mpmc_bounded_queue; diff --git a/src/libstd/sync/mpmc_bounded_queue.rs b/src/libstd/sync/mpmc_bounded_queue.rs index 7fb98e14086..ffad9c1c583 100644 --- a/src/libstd/sync/mpmc_bounded_queue.rs +++ b/src/libstd/sync/mpmc_bounded_queue.rs @@ -173,7 +173,7 @@ mod tests { fn test() { let nthreads = 8u; let nmsgs = 1000u; - let mut q = Queue::with_capacity(nthreads*nmsgs); + let q = Queue::with_capacity(nthreads*nmsgs); assert_eq!(None, q.pop()); let (tx, rx) = channel(); @@ -181,7 +181,7 @@ mod tests { let q = q.clone(); let tx = tx.clone(); native::task::spawn(proc() { - let mut q = q; + let q = q; for i in range(0, nmsgs) { assert!(q.push(i)); } @@ -195,7 +195,7 @@ mod tests { completion_rxs.push(rx); let q = q.clone(); native::task::spawn(proc() { - let mut q = q; + let q = q; let mut i = 0u; loop { match q.pop() { diff --git a/src/libstd/sync/mpsc_queue.rs b/src/libstd/sync/mpsc_queue.rs index f2f95da1842..4db24e82d37 100644 --- a/src/libstd/sync/mpsc_queue.rs +++ b/src/libstd/sync/mpsc_queue.rs @@ -165,7 +165,7 @@ mod tests { #[test] fn test_full() { - let mut q = Queue::new(); + let q = Queue::new(); q.push(box 1); q.push(box 2); } @@ -174,7 +174,7 @@ mod tests { fn test() { let nthreads = 8u; let nmsgs = 1000u; - let mut q = Queue::new(); + let q = Queue::new(); match q.pop() { Empty => {} Inconsistent | Data(..) => fail!() diff --git a/src/libstd/sync/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs index 093933c82fc..fb515c9db6e 100644 --- a/src/libstd/sync/spsc_queue.rs +++ b/src/libstd/sync/spsc_queue.rs @@ -235,7 +235,7 @@ mod test { #[test] fn smoke() { - let mut q = Queue::new(0); + let q = Queue::new(0); q.push(1); q.push(2); assert_eq!(q.pop(), Some(1)); @@ -250,14 +250,14 @@ mod test { #[test] fn drop_full() { - let mut q = Queue::new(0); + let q = Queue::new(0); q.push(box 1); q.push(box 2); } #[test] fn smoke_bound() { - let mut q = Queue::new(1); + let q = Queue::new(1); q.push(1); q.push(2); assert_eq!(q.pop(), Some(1)); @@ -282,7 +282,7 @@ mod test { native::task::spawn(proc() { for _ in range(0, 100000) { loop { - match unsafe { (*b.get()).pop() } { + match b.pop() { Some(1) => break, Some(_) => fail!(), None => {} @@ -292,7 +292,7 @@ mod test { tx.send(()); }); for _ in range(0, 100000) { - unsafe { (*a.get()).push(1); } + a.push(1); } rx.recv(); } -- cgit 1.4.1-3-g733a5 From fdf935a5249edd0be0f14385a099963e43c7a29b Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Tue, 20 May 2014 18:54:31 -0700 Subject: std,green: Mark some queue types as NoShare --- src/libgreen/message_queue.rs | 8 ++++++-- src/libnative/io/net.rs | 4 ++-- src/libnative/io/pipe_unix.rs | 2 +- src/libnative/io/pipe_win32.rs | 16 ++++++++-------- src/libstd/sync/deque.rs | 12 +++++++++--- 5 files changed, 26 insertions(+), 16 deletions(-) (limited to 'src/libstd/sync') diff --git a/src/libgreen/message_queue.rs b/src/libgreen/message_queue.rs index 99dbf9c8919..137c4933645 100644 --- a/src/libgreen/message_queue.rs +++ b/src/libgreen/message_queue.rs @@ -10,6 +10,7 @@ use alloc::arc::Arc; use mpsc = std::sync::mpsc_queue; +use std::kinds::marker; pub enum PopResult { Inconsistent, @@ -19,15 +20,18 @@ pub enum PopResult { pub fn queue() -> (Consumer, Producer) { let a = Arc::new(mpsc::Queue::new()); - (Consumer { inner: a.clone() }, Producer { inner: a }) + (Consumer { inner: a.clone(), noshare: marker::NoShare }, + Producer { inner: a, noshare: marker::NoShare }) } pub struct Producer { inner: Arc>, + noshare: marker::NoShare, } pub struct Consumer { inner: Arc>, + noshare: marker::NoShare, } impl Consumer { @@ -56,6 +60,6 @@ impl Producer { impl Clone for Producer { fn clone(&self) -> Producer { - Producer { inner: self.inner.clone() } + Producer { inner: self.inner.clone(), noshare: marker::NoShare } } } diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs index a67d0439dbf..8bd8bc71a49 100644 --- a/src/libnative/io/net.rs +++ b/src/libnative/io/net.rs @@ -326,7 +326,7 @@ impl TcpStream { fn lock_nonblocking<'a>(&'a self) -> Guard<'a> { let ret = Guard { fd: self.fd(), - guard: unsafe { (*self.inner.get()).lock.lock() }, + guard: unsafe { self.inner.lock.lock() }, }; assert!(util::set_nonblocking(self.fd(), true).is_ok()); ret @@ -597,7 +597,7 @@ impl UdpSocket { fn lock_nonblocking<'a>(&'a self) -> Guard<'a> { let ret = Guard { fd: self.fd(), - guard: unsafe { (*self.inner.get()).lock.lock() }, + guard: unsafe { self.inner.lock.lock() }, }; assert!(util::set_nonblocking(self.fd(), true).is_ok()); ret diff --git a/src/libnative/io/pipe_unix.rs b/src/libnative/io/pipe_unix.rs index 8742fc58af1..a53a58b6cec 100644 --- a/src/libnative/io/pipe_unix.rs +++ b/src/libnative/io/pipe_unix.rs @@ -138,7 +138,7 @@ impl UnixStream { fn lock_nonblocking<'a>(&'a self) -> net::Guard<'a> { let ret = net::Guard { fd: self.fd(), - guard: self.inner.lock.lock(), + guard: unsafe { self.inner.lock.lock() }, }; assert!(util::set_nonblocking(self.fd(), true).is_ok()); ret diff --git a/src/libnative/io/pipe_win32.rs b/src/libnative/io/pipe_win32.rs index c90c3824bba..c9dbdc8331b 100644 --- a/src/libnative/io/pipe_win32.rs +++ b/src/libnative/io/pipe_win32.rs @@ -320,11 +320,11 @@ impl UnixStream { fn handle(&self) -> libc::HANDLE { self.inner.handle } fn read_closed(&self) -> bool { - unsafe { (*self.inner.get()).read_closed.load(atomics::SeqCst) } + self.inner.read_closed.load(atomics::SeqCst) } fn write_closed(&self) -> bool { - unsafe { (*self.inner.get()).write_closed.load(atomics::SeqCst) } + self.inner.write_closed.load(atomics::SeqCst) } fn cancel_io(&self) -> IoResult<()> { @@ -353,7 +353,7 @@ impl rtio::RtioPipe for UnixStream { // acquire the lock. // // See comments in close_read() about why this lock is necessary. - let guard = unsafe { (*self.inner.get()).lock.lock() }; + let guard = unsafe { self.inner.lock.lock() }; if self.read_closed() { return Err(io::standard_error(io::EndOfFile)) } @@ -429,7 +429,7 @@ impl rtio::RtioPipe for UnixStream { // going after we woke up. // // See comments in close_read() about why this lock is necessary. - let guard = unsafe { (*self.inner.get()).lock.lock() }; + let guard = unsafe { self.inner.lock.lock() }; if self.write_closed() { return Err(io::standard_error(io::BrokenPipe)) } @@ -514,15 +514,15 @@ impl rtio::RtioPipe for UnixStream { // close_read() between steps 1 and 2. By atomically executing steps 1 // and 2 with a lock with respect to close_read(), we're guaranteed that // no thread will erroneously sit in a read forever. - let _guard = unsafe { (*self.inner.get()).lock.lock() }; - unsafe { (*self.inner.get()).read_closed.store(true, atomics::SeqCst) } + let _guard = unsafe { self.inner.lock.lock() }; + self.inner.read_closed.store(true, atomics::SeqCst); self.cancel_io() } fn close_write(&mut self) -> IoResult<()> { // see comments in close_read() for why this lock is necessary - let _guard = unsafe { (*self.inner.get()).lock.lock() }; - unsafe { (*self.inner.get()).write_closed.store(true, atomics::SeqCst) } + let _guard = unsafe { self.inner.lock.lock() }; + self.inner.write_closed.store(true, atomics::SeqCst); self.cancel_io() } diff --git a/src/libstd/sync/deque.rs b/src/libstd/sync/deque.rs index c6446775b0c..a3fdc4d3eaf 100644 --- a/src/libstd/sync/deque.rs +++ b/src/libstd/sync/deque.rs @@ -53,16 +53,17 @@ use alloc::arc::Arc; use clone::Clone; use iter::{range, Iterator}; use kinds::Send; +use kinds::marker; use mem::{forget, min_align_of, size_of, transmute}; use ops::Drop; use option::{Option, Some, None}; use owned::Box; use ptr::RawPtr; use ptr; +use rt::heap::{allocate, deallocate}; use slice::ImmutableVector; use sync::atomics::{AtomicInt, AtomicPtr, SeqCst}; use unstable::sync::Exclusive; -use rt::heap::{allocate, deallocate}; use vec::Vec; // Once the queue is less than 1/K full, then it will be downsized. Note that @@ -89,6 +90,7 @@ struct Deque { /// There may only be one worker per deque. pub struct Worker { deque: Arc>, + noshare: marker::NoShare, } /// The stealing half of the work-stealing deque. Stealers have access to the @@ -96,6 +98,7 @@ pub struct Worker { /// `steal` method. pub struct Stealer { deque: Arc>, + noshare: marker::NoShare, } /// When stealing some data, this is an enumeration of the possible outcomes. @@ -153,7 +156,8 @@ impl BufferPool { pub fn deque(&self) -> (Worker, Stealer) { let a = Arc::new(Deque::new(self.clone())); let b = a.clone(); - (Worker { deque: a }, Stealer { deque: b }) + (Worker { deque: a, noshare: marker::NoShare }, + Stealer { deque: b, noshare: marker::NoShare }) } fn alloc(&self, bits: int) -> Box> { @@ -219,7 +223,9 @@ impl Stealer { } impl Clone for Stealer { - fn clone(&self) -> Stealer { Stealer { deque: self.deque.clone() } } + fn clone(&self) -> Stealer { + Stealer { deque: self.deque.clone(), noshare: marker::NoShare } + } } // Almost all of this code can be found directly in the paper so I'm not -- cgit 1.4.1-3-g733a5