about summary refs log tree commit diff
path: root/src/libstd/sync/spsc_queue.rs
diff options
context:
space:
mode:
authorbors <bors@rust-lang.org>2014-05-21 17:31:29 -0700
committerbors <bors@rust-lang.org>2014-05-21 17:31:29 -0700
commit257a73ce8273d026f2af1a5021ae2d1a4e7b95e5 (patch)
tree2f7d66fc0f7de80105252babe27757e7ea94951a /src/libstd/sync/spsc_queue.rs
parent5f3f0918ad70cd9b0bfcd2f93aea0218ec92fb87 (diff)
parentfdf935a5249edd0be0f14385a099963e43c7a29b (diff)
downloadrust-257a73ce8273d026f2af1a5021ae2d1a4e7b95e5.tar.gz
rust-257a73ce8273d026f2af1a5021ae2d1a4e7b95e5.zip
auto merge of #14301 : alexcrichton/rust/remove-unsafe-arc, r=brson
This type can be built with `Arc<Unsafe<T>>` now that liballoc exists.
Diffstat (limited to 'src/libstd/sync/spsc_queue.rs')
-rw-r--r--src/libstd/sync/spsc_queue.rs70
1 files changed, 37 insertions, 33 deletions
diff --git a/src/libstd/sync/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs
index ed6d690def0..fb515c9db6e 100644
--- a/src/libstd/sync/spsc_queue.rs
+++ b/src/libstd/sync/spsc_queue.rs
@@ -40,6 +40,7 @@ use option::{Some, None, Option};
 use owned::Box;
 use ptr::RawPtr;
 use sync::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release};
+use ty::Unsafe;
 
 // Node within the linked list queue of messages to send
 struct Node<T> {
@@ -51,18 +52,18 @@ struct Node<T> {
 }
 
 /// The single-producer single-consumer queue. This structure is not cloneable,
-/// but it can be safely shared in an UnsafeArc if it is guaranteed that there
+/// but it can be safely shared in an Arc if it is guaranteed that there
 /// is only one popper and one pusher touching the queue at any one point in
 /// time.
 pub struct Queue<T> {
     // consumer fields
-    tail: *mut Node<T>, // where to pop from
+    tail: Unsafe<*mut Node<T>>, // where to pop from
     tail_prev: AtomicPtr<Node<T>>, // where to pop from
 
     // producer fields
-    head: *mut Node<T>,      // where to push to
-    first: *mut Node<T>,     // where to get new nodes from
-    tail_copy: *mut Node<T>, // between first/tail
+    head: Unsafe<*mut Node<T>>,      // where to push to
+    first: Unsafe<*mut Node<T>>,     // where to get new nodes from
+    tail_copy: Unsafe<*mut Node<T>>, // between first/tail
 
     // Cache maintenance fields. Additions and subtractions are stored
     // separately in order to allow them to use nonatomic addition/subtraction.
@@ -101,11 +102,11 @@ impl<T: Send> Queue<T> {
         let n2 = Node::new();
         unsafe { (*n1).next.store(n2, Relaxed) }
         Queue {
-            tail: n2,
+            tail: Unsafe::new(n2),
             tail_prev: AtomicPtr::new(n1),
-            head: n2,
-            first: n1,
-            tail_copy: n1,
+            head: Unsafe::new(n2),
+            first: Unsafe::new(n1),
+            tail_copy: Unsafe::new(n1),
             cache_bound: bound,
             cache_additions: AtomicUint::new(0),
             cache_subtractions: AtomicUint::new(0),
@@ -114,7 +115,7 @@ impl<T: Send> Queue<T> {
 
     /// Pushes a new value onto this queue. Note that to use this function
     /// safely, it must be externally guaranteed that there is only one pusher.
-    pub fn push(&mut self, t: T) {
+    pub fn push(&self, t: T) {
         unsafe {
             // Acquire a node (which either uses a cached one or allocates a new
             // one), and then append this to the 'head' node.
@@ -122,35 +123,35 @@ impl<T: Send> Queue<T> {
             assert!((*n).value.is_none());
             (*n).value = Some(t);
             (*n).next.store(0 as *mut Node<T>, Relaxed);
-            (*self.head).next.store(n, Release);
-            self.head = n;
+            (**self.head.get()).next.store(n, Release);
+            *self.head.get() = n;
         }
     }
 
-    unsafe fn alloc(&mut self) -> *mut Node<T> {
+    unsafe fn alloc(&self) -> *mut Node<T> {
         // First try to see if we can consume the 'first' node for our uses.
         // We try to avoid as many atomic instructions as possible here, so
         // the addition to cache_subtractions is not atomic (plus we're the
         // only one subtracting from the cache).
-        if self.first != self.tail_copy {
+        if *self.first.get() != *self.tail_copy.get() {
             if self.cache_bound > 0 {
                 let b = self.cache_subtractions.load(Relaxed);
                 self.cache_subtractions.store(b + 1, Relaxed);
             }
-            let ret = self.first;
-            self.first = (*ret).next.load(Relaxed);
+            let ret = *self.first.get();
+            *self.first.get() = (*ret).next.load(Relaxed);
             return ret;
         }
         // If the above fails, then update our copy of the tail and try
         // again.
-        self.tail_copy = self.tail_prev.load(Acquire);
-        if self.first != self.tail_copy {
+        *self.tail_copy.get() = self.tail_prev.load(Acquire);
+        if *self.first.get() != *self.tail_copy.get() {
             if self.cache_bound > 0 {
                 let b = self.cache_subtractions.load(Relaxed);
                 self.cache_subtractions.store(b + 1, Relaxed);
             }
-            let ret = self.first;
-            self.first = (*ret).next.load(Relaxed);
+            let ret = *self.first.get();
+            *self.first.get() = (*ret).next.load(Relaxed);
             return ret;
         }
         // If all of that fails, then we have to allocate a new node
@@ -160,19 +161,19 @@ impl<T: Send> Queue<T> {
 
     /// Attempts to pop a value from this queue. Remember that to use this type
     /// safely you must ensure that there is only one popper at a time.
-    pub fn pop(&mut self) -> Option<T> {
+    pub fn pop(&self) -> Option<T> {
         unsafe {
             // The `tail` node is not actually a used node, but rather a
             // sentinel from where we should start popping from. Hence, look at
             // tail's next field and see if we can use it. If we do a pop, then
             // the current tail node is a candidate for going into the cache.
-            let tail = self.tail;
+            let tail = *self.tail.get();
             let next = (*tail).next.load(Acquire);
             if next.is_null() { return None }
             assert!((*next).value.is_some());
             let ret = (*next).value.take();
 
-            self.tail = next;
+            *self.tail.get() = next;
             if self.cache_bound == 0 {
                 self.tail_prev.store(tail, Release);
             } else {
@@ -197,11 +198,11 @@ impl<T: Send> Queue<T> {
 
     /// Attempts to peek at the head of the queue, returning `None` if the queue
     /// has no data currently
-    pub fn peek<'a>(&'a mut self) -> Option<&'a mut T> {
+    pub fn peek<'a>(&'a self) -> Option<&'a mut T> {
         // This is essentially the same as above with all the popping bits
         // stripped out.
         unsafe {
-            let tail = self.tail;
+            let tail = *self.tail.get();
             let next = (*tail).next.load(Acquire);
             if next.is_null() { return None }
             return (*next).value.as_mut();
@@ -213,7 +214,7 @@ impl<T: Send> Queue<T> {
 impl<T: Send> Drop for Queue<T> {
     fn drop(&mut self) {
         unsafe {
-            let mut cur = self.first;
+            let mut cur = *self.first.get();
             while !cur.is_null() {
                 let next = (*cur).next.load(Relaxed);
                 let _n: Box<Node<T>> = mem::transmute(cur);
@@ -226,13 +227,15 @@ impl<T: Send> Drop for Queue<T> {
 #[cfg(test)]
 mod test {
     use prelude::*;
+
+    use alloc::arc::Arc;
     use native;
+
     use super::Queue;
-    use sync::arc::UnsafeArc;
 
     #[test]
     fn smoke() {
-        let mut q = Queue::new(0);
+        let q = Queue::new(0);
         q.push(1);
         q.push(2);
         assert_eq!(q.pop(), Some(1));
@@ -247,14 +250,14 @@ mod test {
 
     #[test]
     fn drop_full() {
-        let mut q = Queue::new(0);
+        let q = Queue::new(0);
         q.push(box 1);
         q.push(box 2);
     }
 
     #[test]
     fn smoke_bound() {
-        let mut q = Queue::new(1);
+        let q = Queue::new(1);
         q.push(1);
         q.push(2);
         assert_eq!(q.pop(), Some(1));
@@ -273,12 +276,13 @@ mod test {
         stress_bound(1);
 
         fn stress_bound(bound: uint) {
-            let (a, b) = UnsafeArc::new2(Queue::new(bound));
+            let a = Arc::new(Queue::new(bound));
+            let b = a.clone();
             let (tx, rx) = channel();
             native::task::spawn(proc() {
                 for _ in range(0, 100000) {
                     loop {
-                        match unsafe { (*b.get()).pop() } {
+                        match b.pop() {
                             Some(1) => break,
                             Some(_) => fail!(),
                             None => {}
@@ -288,7 +292,7 @@ mod test {
                 tx.send(());
             });
             for _ in range(0, 100000) {
-                unsafe { (*a.get()).push(1); }
+                a.push(1);
             }
             rx.recv();
         }