diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2014-05-19 16:01:48 -0700 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2014-05-19 16:01:48 -0700 |
| commit | 7db02e20f2140530a9402f7d7452b10cac6fdf7b (patch) | |
| tree | 4318e0a2453a45b3d8ece84d0124d8b56db18869 /src/libstd/sync | |
| parent | efbd3724c012d68afd428beaa22f0d5aabff007d (diff) | |
| download | rust-7db02e20f2140530a9402f7d7452b10cac6fdf7b.tar.gz rust-7db02e20f2140530a9402f7d7452b10cac6fdf7b.zip | |
std: Rebuild mpmc queues on Unsafe/Arc
This removes usage of UnsafeArc and uses proper self mutability for concurrent types.
Diffstat (limited to 'src/libstd/sync')
| -rw-r--r-- | src/libstd/sync/mpmc_bounded_queue.rs | 50 |
1 files changed, 27 insertions, 23 deletions
diff --git a/src/libstd/sync/mpmc_bounded_queue.rs b/src/libstd/sync/mpmc_bounded_queue.rs index 2df5031b482..7fb98e14086 100644 --- a/src/libstd/sync/mpmc_bounded_queue.rs +++ b/src/libstd/sync/mpmc_bounded_queue.rs @@ -29,13 +29,15 @@ // http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue +use alloc::arc::Arc; + use clone::Clone; use kinds::Send; use num::next_power_of_two; use option::{Option, Some, None}; -use sync::arc::UnsafeArc; use sync::atomics::{AtomicUint,Relaxed,Release,Acquire}; use vec::Vec; +use ty::Unsafe; struct Node<T> { sequence: AtomicUint, @@ -44,7 +46,7 @@ struct Node<T> { struct State<T> { pad0: [u8, ..64], - buffer: Vec<Node<T>>, + buffer: Vec<Unsafe<Node<T>>>, mask: uint, pad1: [u8, ..64], enqueue_pos: AtomicUint, @@ -54,7 +56,7 @@ struct State<T> { } pub struct Queue<T> { - state: UnsafeArc<State<T>>, + state: Arc<State<T>>, } impl<T: Send> State<T> { @@ -70,7 +72,7 @@ impl<T: Send> State<T> { capacity }; let buffer = Vec::from_fn(capacity, |i| { - Node { sequence:AtomicUint::new(i), value: None } + Unsafe::new(Node { sequence:AtomicUint::new(i), value: None }) }); State{ pad0: [0, ..64], @@ -84,19 +86,21 @@ impl<T: Send> State<T> { } } - fn push(&mut self, value: T) -> bool { + fn push(&self, value: T) -> bool { let mask = self.mask; let mut pos = self.enqueue_pos.load(Relaxed); loop { - let node = self.buffer.get_mut(pos & mask); - let seq = node.sequence.load(Acquire); + let node = self.buffer.get(pos & mask); + let seq = unsafe { (*node.get()).sequence.load(Acquire) }; let diff: int = seq as int - pos as int; if diff == 0 { let enqueue_pos = self.enqueue_pos.compare_and_swap(pos, pos+1, Relaxed); if enqueue_pos == pos { - node.value = Some(value); - node.sequence.store(pos+1, Release); + unsafe { + (*node.get()).value = Some(value); + (*node.get()).sequence.store(pos+1, Release); + } break } else { pos = enqueue_pos; @@ -110,19 +114,21 @@ impl<T: Send> State<T> { true } - fn pop(&mut self) -> Option<T> { + fn pop(&self) -> Option<T> { let mask = self.mask; let mut pos = self.dequeue_pos.load(Relaxed); loop { - let node = self.buffer.get_mut(pos & mask); - let seq = node.sequence.load(Acquire); + let node = self.buffer.get(pos & mask); + let seq = unsafe { (*node.get()).sequence.load(Acquire) }; let diff: int = seq as int - (pos + 1) as int; if diff == 0 { let dequeue_pos = self.dequeue_pos.compare_and_swap(pos, pos+1, Relaxed); if dequeue_pos == pos { - let value = node.value.take(); - node.sequence.store(pos + mask + 1, Release); - return value + unsafe { + let value = (*node.get()).value.take(); + (*node.get()).sequence.store(pos + mask + 1, Release); + return value + } } else { pos = dequeue_pos; } @@ -138,24 +144,22 @@ impl<T: Send> State<T> { impl<T: Send> Queue<T> { pub fn with_capacity(capacity: uint) -> Queue<T> { Queue{ - state: UnsafeArc::new(State::with_capacity(capacity)) + state: Arc::new(State::with_capacity(capacity)) } } - pub fn push(&mut self, value: T) -> bool { - unsafe { (*self.state.get()).push(value) } + pub fn push(&self, value: T) -> bool { + self.state.push(value) } - pub fn pop(&mut self) -> Option<T> { - unsafe { (*self.state.get()).pop() } + pub fn pop(&self) -> Option<T> { + self.state.pop() } } impl<T: Send> Clone for Queue<T> { fn clone(&self) -> Queue<T> { - Queue { - state: self.state.clone() - } + Queue { state: self.state.clone() } } } |
