about summary refs log tree commit diff
path: root/src/libstd/sync
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2014-05-19 16:01:48 -0700
committerAlex Crichton <alex@alexcrichton.com>2014-05-19 16:01:48 -0700
commit7db02e20f2140530a9402f7d7452b10cac6fdf7b (patch)
tree4318e0a2453a45b3d8ece84d0124d8b56db18869 /src/libstd/sync
parentefbd3724c012d68afd428beaa22f0d5aabff007d (diff)
downloadrust-7db02e20f2140530a9402f7d7452b10cac6fdf7b.tar.gz
rust-7db02e20f2140530a9402f7d7452b10cac6fdf7b.zip
std: Rebuild mpmc queues on Unsafe/Arc
This removes usage of UnsafeArc and uses proper self mutability for concurrent
types.
Diffstat (limited to 'src/libstd/sync')
-rw-r--r--src/libstd/sync/mpmc_bounded_queue.rs50
1 files changed, 27 insertions, 23 deletions
diff --git a/src/libstd/sync/mpmc_bounded_queue.rs b/src/libstd/sync/mpmc_bounded_queue.rs
index 2df5031b482..7fb98e14086 100644
--- a/src/libstd/sync/mpmc_bounded_queue.rs
+++ b/src/libstd/sync/mpmc_bounded_queue.rs
@@ -29,13 +29,15 @@
 
 // http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
 
+use alloc::arc::Arc;
+
 use clone::Clone;
 use kinds::Send;
 use num::next_power_of_two;
 use option::{Option, Some, None};
-use sync::arc::UnsafeArc;
 use sync::atomics::{AtomicUint,Relaxed,Release,Acquire};
 use vec::Vec;
+use ty::Unsafe;
 
 struct Node<T> {
     sequence: AtomicUint,
@@ -44,7 +46,7 @@ struct Node<T> {
 
 struct State<T> {
     pad0: [u8, ..64],
-    buffer: Vec<Node<T>>,
+    buffer: Vec<Unsafe<Node<T>>>,
     mask: uint,
     pad1: [u8, ..64],
     enqueue_pos: AtomicUint,
@@ -54,7 +56,7 @@ struct State<T> {
 }
 
 pub struct Queue<T> {
-    state: UnsafeArc<State<T>>,
+    state: Arc<State<T>>,
 }
 
 impl<T: Send> State<T> {
@@ -70,7 +72,7 @@ impl<T: Send> State<T> {
             capacity
         };
         let buffer = Vec::from_fn(capacity, |i| {
-            Node { sequence:AtomicUint::new(i), value: None }
+            Unsafe::new(Node { sequence:AtomicUint::new(i), value: None })
         });
         State{
             pad0: [0, ..64],
@@ -84,19 +86,21 @@ impl<T: Send> State<T> {
         }
     }
 
-    fn push(&mut self, value: T) -> bool {
+    fn push(&self, value: T) -> bool {
         let mask = self.mask;
         let mut pos = self.enqueue_pos.load(Relaxed);
         loop {
-            let node = self.buffer.get_mut(pos & mask);
-            let seq = node.sequence.load(Acquire);
+            let node = self.buffer.get(pos & mask);
+            let seq = unsafe { (*node.get()).sequence.load(Acquire) };
             let diff: int = seq as int - pos as int;
 
             if diff == 0 {
                 let enqueue_pos = self.enqueue_pos.compare_and_swap(pos, pos+1, Relaxed);
                 if enqueue_pos == pos {
-                    node.value = Some(value);
-                    node.sequence.store(pos+1, Release);
+                    unsafe {
+                        (*node.get()).value = Some(value);
+                        (*node.get()).sequence.store(pos+1, Release);
+                    }
                     break
                 } else {
                     pos = enqueue_pos;
@@ -110,19 +114,21 @@ impl<T: Send> State<T> {
         true
     }
 
-    fn pop(&mut self) -> Option<T> {
+    fn pop(&self) -> Option<T> {
         let mask = self.mask;
         let mut pos = self.dequeue_pos.load(Relaxed);
         loop {
-            let node = self.buffer.get_mut(pos & mask);
-            let seq = node.sequence.load(Acquire);
+            let node = self.buffer.get(pos & mask);
+            let seq = unsafe { (*node.get()).sequence.load(Acquire) };
             let diff: int = seq as int - (pos + 1) as int;
             if diff == 0 {
                 let dequeue_pos = self.dequeue_pos.compare_and_swap(pos, pos+1, Relaxed);
                 if dequeue_pos == pos {
-                    let value = node.value.take();
-                    node.sequence.store(pos + mask + 1, Release);
-                    return value
+                    unsafe {
+                        let value = (*node.get()).value.take();
+                        (*node.get()).sequence.store(pos + mask + 1, Release);
+                        return value
+                    }
                 } else {
                     pos = dequeue_pos;
                 }
@@ -138,24 +144,22 @@ impl<T: Send> State<T> {
 impl<T: Send> Queue<T> {
     pub fn with_capacity(capacity: uint) -> Queue<T> {
         Queue{
-            state: UnsafeArc::new(State::with_capacity(capacity))
+            state: Arc::new(State::with_capacity(capacity))
         }
     }
 
-    pub fn push(&mut self, value: T) -> bool {
-        unsafe { (*self.state.get()).push(value) }
+    pub fn push(&self, value: T) -> bool {
+        self.state.push(value)
     }
 
-    pub fn pop(&mut self) -> Option<T> {
-        unsafe { (*self.state.get()).pop() }
+    pub fn pop(&self) -> Option<T> {
+        self.state.pop()
     }
 }
 
 impl<T: Send> Clone for Queue<T> {
     fn clone(&self) -> Queue<T> {
-        Queue {
-            state: self.state.clone()
-        }
+        Queue { state: self.state.clone() }
     }
 }