about summary refs log tree commit diff
path: root/src/libstd/sync
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2014-05-19 17:32:04 -0700
committerAlex Crichton <alex@alexcrichton.com>2014-05-19 17:32:04 -0700
commitfe93c3d47ed0cdf0a0cbac66a9f35ddb4c6783a2 (patch)
tree2402913e159bf6cbe21cbc3815b40b7e3b410a78 /src/libstd/sync
parent2966e970cabdf7103ad61c840c72bf58352150e0 (diff)
downloadrust-fe93c3d47ed0cdf0a0cbac66a9f35ddb4c6783a2.tar.gz
rust-fe93c3d47ed0cdf0a0cbac66a9f35ddb4c6783a2.zip
std: Rebuild spsc with Unsafe/&self
This removes the incorrect usage of `&mut self` in a concurrent setting.
Diffstat (limited to 'src/libstd/sync')
-rw-r--r--src/libstd/sync/spsc_queue.rs51
1 files changed, 26 insertions, 25 deletions
diff --git a/src/libstd/sync/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs
index ed6d690def0..b9827ee6b2a 100644
--- a/src/libstd/sync/spsc_queue.rs
+++ b/src/libstd/sync/spsc_queue.rs
@@ -40,6 +40,7 @@ use option::{Some, None, Option};
 use owned::Box;
 use ptr::RawPtr;
 use sync::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release};
+use ty::Unsafe;
 
 // Node within the linked list queue of messages to send
 struct Node<T> {
@@ -56,13 +57,13 @@ struct Node<T> {
 /// time.
 pub struct Queue<T> {
     // consumer fields
-    tail: *mut Node<T>, // where to pop from
+    tail: Unsafe<*mut Node<T>>, // where to pop from
     tail_prev: AtomicPtr<Node<T>>, // where to pop from
 
     // producer fields
-    head: *mut Node<T>,      // where to push to
-    first: *mut Node<T>,     // where to get new nodes from
-    tail_copy: *mut Node<T>, // between first/tail
+    head: Unsafe<*mut Node<T>>,      // where to push to
+    first: Unsafe<*mut Node<T>>,     // where to get new nodes from
+    tail_copy: Unsafe<*mut Node<T>>, // between first/tail
 
     // Cache maintenance fields. Additions and subtractions are stored
     // separately in order to allow them to use nonatomic addition/subtraction.
@@ -101,11 +102,11 @@ impl<T: Send> Queue<T> {
         let n2 = Node::new();
         unsafe { (*n1).next.store(n2, Relaxed) }
         Queue {
-            tail: n2,
+            tail: Unsafe::new(n2),
             tail_prev: AtomicPtr::new(n1),
-            head: n2,
-            first: n1,
-            tail_copy: n1,
+            head: Unsafe::new(n2),
+            first: Unsafe::new(n1),
+            tail_copy: Unsafe::new(n1),
             cache_bound: bound,
             cache_additions: AtomicUint::new(0),
             cache_subtractions: AtomicUint::new(0),
@@ -114,7 +115,7 @@ impl<T: Send> Queue<T> {
 
     /// Pushes a new value onto this queue. Note that to use this function
     /// safely, it must be externally guaranteed that there is only one pusher.
-    pub fn push(&mut self, t: T) {
+    pub fn push(&self, t: T) {
         unsafe {
             // Acquire a node (which either uses a cached one or allocates a new
             // one), and then append this to the 'head' node.
@@ -122,35 +123,35 @@ impl<T: Send> Queue<T> {
             assert!((*n).value.is_none());
             (*n).value = Some(t);
             (*n).next.store(0 as *mut Node<T>, Relaxed);
-            (*self.head).next.store(n, Release);
-            self.head = n;
+            (**self.head.get()).next.store(n, Release);
+            *self.head.get() = n;
         }
     }
 
-    unsafe fn alloc(&mut self) -> *mut Node<T> {
+    unsafe fn alloc(&self) -> *mut Node<T> {
         // First try to see if we can consume the 'first' node for our uses.
         // We try to avoid as many atomic instructions as possible here, so
         // the addition to cache_subtractions is not atomic (plus we're the
         // only one subtracting from the cache).
-        if self.first != self.tail_copy {
+        if *self.first.get() != *self.tail_copy.get() {
             if self.cache_bound > 0 {
                 let b = self.cache_subtractions.load(Relaxed);
                 self.cache_subtractions.store(b + 1, Relaxed);
             }
-            let ret = self.first;
-            self.first = (*ret).next.load(Relaxed);
+            let ret = *self.first.get();
+            *self.first.get() = (*ret).next.load(Relaxed);
             return ret;
         }
         // If the above fails, then update our copy of the tail and try
         // again.
-        self.tail_copy = self.tail_prev.load(Acquire);
-        if self.first != self.tail_copy {
+        *self.tail_copy.get() = self.tail_prev.load(Acquire);
+        if *self.first.get() != *self.tail_copy.get() {
             if self.cache_bound > 0 {
                 let b = self.cache_subtractions.load(Relaxed);
                 self.cache_subtractions.store(b + 1, Relaxed);
             }
-            let ret = self.first;
-            self.first = (*ret).next.load(Relaxed);
+            let ret = *self.first.get();
+            *self.first.get() = (*ret).next.load(Relaxed);
             return ret;
         }
         // If all of that fails, then we have to allocate a new node
@@ -160,19 +161,19 @@ impl<T: Send> Queue<T> {
 
     /// Attempts to pop a value from this queue. Remember that to use this type
     /// safely you must ensure that there is only one popper at a time.
-    pub fn pop(&mut self) -> Option<T> {
+    pub fn pop(&self) -> Option<T> {
         unsafe {
             // The `tail` node is not actually a used node, but rather a
             // sentinel from where we should start popping from. Hence, look at
             // tail's next field and see if we can use it. If we do a pop, then
             // the current tail node is a candidate for going into the cache.
-            let tail = self.tail;
+            let tail = *self.tail.get();
             let next = (*tail).next.load(Acquire);
             if next.is_null() { return None }
             assert!((*next).value.is_some());
             let ret = (*next).value.take();
 
-            self.tail = next;
+            *self.tail.get() = next;
             if self.cache_bound == 0 {
                 self.tail_prev.store(tail, Release);
             } else {
@@ -197,11 +198,11 @@ impl<T: Send> Queue<T> {
 
     /// Attempts to peek at the head of the queue, returning `None` if the queue
     /// has no data currently
-    pub fn peek<'a>(&'a mut self) -> Option<&'a mut T> {
+    pub fn peek<'a>(&'a self) -> Option<&'a mut T> {
         // This is essentially the same as above with all the popping bits
         // stripped out.
         unsafe {
-            let tail = self.tail;
+            let tail = *self.tail.get();
             let next = (*tail).next.load(Acquire);
             if next.is_null() { return None }
             return (*next).value.as_mut();
@@ -213,7 +214,7 @@ impl<T: Send> Queue<T> {
 impl<T: Send> Drop for Queue<T> {
     fn drop(&mut self) {
         unsafe {
-            let mut cur = self.first;
+            let mut cur = *self.first.get();
             while !cur.is_null() {
                 let next = (*cur).next.load(Relaxed);
                 let _n: Box<Node<T>> = mem::transmute(cur);