diff options
| author | bors <bors@rust-lang.org> | 2014-05-21 17:31:29 -0700 |
|---|---|---|
| committer | bors <bors@rust-lang.org> | 2014-05-21 17:31:29 -0700 |
| commit | 257a73ce8273d026f2af1a5021ae2d1a4e7b95e5 (patch) | |
| tree | 2f7d66fc0f7de80105252babe27757e7ea94951a /src/libstd/sync/spsc_queue.rs | |
| parent | 5f3f0918ad70cd9b0bfcd2f93aea0218ec92fb87 (diff) | |
| parent | fdf935a5249edd0be0f14385a099963e43c7a29b (diff) | |
| download | rust-257a73ce8273d026f2af1a5021ae2d1a4e7b95e5.tar.gz rust-257a73ce8273d026f2af1a5021ae2d1a4e7b95e5.zip | |
auto merge of #14301 : alexcrichton/rust/remove-unsafe-arc, r=brson
This type can be built with `Arc<Unsafe<T>>` now that liballoc exists.
Diffstat (limited to 'src/libstd/sync/spsc_queue.rs')
| -rw-r--r-- | src/libstd/sync/spsc_queue.rs | 70 |
1 files changed, 37 insertions, 33 deletions
diff --git a/src/libstd/sync/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs index ed6d690def0..fb515c9db6e 100644 --- a/src/libstd/sync/spsc_queue.rs +++ b/src/libstd/sync/spsc_queue.rs @@ -40,6 +40,7 @@ use option::{Some, None, Option}; use owned::Box; use ptr::RawPtr; use sync::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release}; +use ty::Unsafe; // Node within the linked list queue of messages to send struct Node<T> { @@ -51,18 +52,18 @@ struct Node<T> { } /// The single-producer single-consumer queue. This structure is not cloneable, -/// but it can be safely shared in an UnsafeArc if it is guaranteed that there +/// but it can be safely shared in an Arc if it is guaranteed that there /// is only one popper and one pusher touching the queue at any one point in /// time. pub struct Queue<T> { // consumer fields - tail: *mut Node<T>, // where to pop from + tail: Unsafe<*mut Node<T>>, // where to pop from tail_prev: AtomicPtr<Node<T>>, // where to pop from // producer fields - head: *mut Node<T>, // where to push to - first: *mut Node<T>, // where to get new nodes from - tail_copy: *mut Node<T>, // between first/tail + head: Unsafe<*mut Node<T>>, // where to push to + first: Unsafe<*mut Node<T>>, // where to get new nodes from + tail_copy: Unsafe<*mut Node<T>>, // between first/tail // Cache maintenance fields. Additions and subtractions are stored // separately in order to allow them to use nonatomic addition/subtraction. @@ -101,11 +102,11 @@ impl<T: Send> Queue<T> { let n2 = Node::new(); unsafe { (*n1).next.store(n2, Relaxed) } Queue { - tail: n2, + tail: Unsafe::new(n2), tail_prev: AtomicPtr::new(n1), - head: n2, - first: n1, - tail_copy: n1, + head: Unsafe::new(n2), + first: Unsafe::new(n1), + tail_copy: Unsafe::new(n1), cache_bound: bound, cache_additions: AtomicUint::new(0), cache_subtractions: AtomicUint::new(0), @@ -114,7 +115,7 @@ impl<T: Send> Queue<T> { /// Pushes a new value onto this queue. Note that to use this function /// safely, it must be externally guaranteed that there is only one pusher. - pub fn push(&mut self, t: T) { + pub fn push(&self, t: T) { unsafe { // Acquire a node (which either uses a cached one or allocates a new // one), and then append this to the 'head' node. @@ -122,35 +123,35 @@ impl<T: Send> Queue<T> { assert!((*n).value.is_none()); (*n).value = Some(t); (*n).next.store(0 as *mut Node<T>, Relaxed); - (*self.head).next.store(n, Release); - self.head = n; + (**self.head.get()).next.store(n, Release); + *self.head.get() = n; } } - unsafe fn alloc(&mut self) -> *mut Node<T> { + unsafe fn alloc(&self) -> *mut Node<T> { // First try to see if we can consume the 'first' node for our uses. // We try to avoid as many atomic instructions as possible here, so // the addition to cache_subtractions is not atomic (plus we're the // only one subtracting from the cache). - if self.first != self.tail_copy { + if *self.first.get() != *self.tail_copy.get() { if self.cache_bound > 0 { let b = self.cache_subtractions.load(Relaxed); self.cache_subtractions.store(b + 1, Relaxed); } - let ret = self.first; - self.first = (*ret).next.load(Relaxed); + let ret = *self.first.get(); + *self.first.get() = (*ret).next.load(Relaxed); return ret; } // If the above fails, then update our copy of the tail and try // again. - self.tail_copy = self.tail_prev.load(Acquire); - if self.first != self.tail_copy { + *self.tail_copy.get() = self.tail_prev.load(Acquire); + if *self.first.get() != *self.tail_copy.get() { if self.cache_bound > 0 { let b = self.cache_subtractions.load(Relaxed); self.cache_subtractions.store(b + 1, Relaxed); } - let ret = self.first; - self.first = (*ret).next.load(Relaxed); + let ret = *self.first.get(); + *self.first.get() = (*ret).next.load(Relaxed); return ret; } // If all of that fails, then we have to allocate a new node @@ -160,19 +161,19 @@ impl<T: Send> Queue<T> { /// Attempts to pop a value from this queue. Remember that to use this type /// safely you must ensure that there is only one popper at a time. - pub fn pop(&mut self) -> Option<T> { + pub fn pop(&self) -> Option<T> { unsafe { // The `tail` node is not actually a used node, but rather a // sentinel from where we should start popping from. Hence, look at // tail's next field and see if we can use it. If we do a pop, then // the current tail node is a candidate for going into the cache. - let tail = self.tail; + let tail = *self.tail.get(); let next = (*tail).next.load(Acquire); if next.is_null() { return None } assert!((*next).value.is_some()); let ret = (*next).value.take(); - self.tail = next; + *self.tail.get() = next; if self.cache_bound == 0 { self.tail_prev.store(tail, Release); } else { @@ -197,11 +198,11 @@ impl<T: Send> Queue<T> { /// Attempts to peek at the head of the queue, returning `None` if the queue /// has no data currently - pub fn peek<'a>(&'a mut self) -> Option<&'a mut T> { + pub fn peek<'a>(&'a self) -> Option<&'a mut T> { // This is essentially the same as above with all the popping bits // stripped out. unsafe { - let tail = self.tail; + let tail = *self.tail.get(); let next = (*tail).next.load(Acquire); if next.is_null() { return None } return (*next).value.as_mut(); @@ -213,7 +214,7 @@ impl<T: Send> Queue<T> { impl<T: Send> Drop for Queue<T> { fn drop(&mut self) { unsafe { - let mut cur = self.first; + let mut cur = *self.first.get(); while !cur.is_null() { let next = (*cur).next.load(Relaxed); let _n: Box<Node<T>> = mem::transmute(cur); @@ -226,13 +227,15 @@ impl<T: Send> Drop for Queue<T> { #[cfg(test)] mod test { use prelude::*; + + use alloc::arc::Arc; use native; + use super::Queue; - use sync::arc::UnsafeArc; #[test] fn smoke() { - let mut q = Queue::new(0); + let q = Queue::new(0); q.push(1); q.push(2); assert_eq!(q.pop(), Some(1)); @@ -247,14 +250,14 @@ mod test { #[test] fn drop_full() { - let mut q = Queue::new(0); + let q = Queue::new(0); q.push(box 1); q.push(box 2); } #[test] fn smoke_bound() { - let mut q = Queue::new(1); + let q = Queue::new(1); q.push(1); q.push(2); assert_eq!(q.pop(), Some(1)); @@ -273,12 +276,13 @@ mod test { stress_bound(1); fn stress_bound(bound: uint) { - let (a, b) = UnsafeArc::new2(Queue::new(bound)); + let a = Arc::new(Queue::new(bound)); + let b = a.clone(); let (tx, rx) = channel(); native::task::spawn(proc() { for _ in range(0, 100000) { loop { - match unsafe { (*b.get()).pop() } { + match b.pop() { Some(1) => break, Some(_) => fail!(), None => {} @@ -288,7 +292,7 @@ mod test { tx.send(()); }); for _ in range(0, 100000) { - unsafe { (*a.get()).push(1); } + a.push(1); } rx.recv(); } |
