diff options
| author | Jason Toffaletti <jason@topsy.com> | 2013-10-07 00:07:04 -0700 |
|---|---|---|
| committer | Brian Anderson <banderson@mozilla.com> | 2013-10-25 18:27:45 -0700 |
| commit | 5876e21225f0cf34e8caa40b18db56fa716e8c92 (patch) | |
| tree | 1b5e39fbb2954395ae992af93d4fd6501b644071 | |
| parent | bf0e6eb346665d779ad012f7def9b4948c5c6b26 (diff) | |
| download | rust-5876e21225f0cf34e8caa40b18db56fa716e8c92.tar.gz rust-5876e21225f0cf34e8caa40b18db56fa716e8c92.zip | |
add multi-producer multi-consumer bounded queue to use for sleeper list
| -rw-r--r-- | src/libstd/rt/mod.rs | 3 | ||||
| -rw-r--r-- | src/libstd/rt/mpmc_bounded_queue.rs | 199 | ||||
| -rw-r--r-- | src/libstd/rt/sleeper_list.rs | 65 |
3 files changed, 211 insertions, 56 deletions
diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 771b15588d0..d87580c83bf 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -139,6 +139,9 @@ mod message_queue; /// A mostly lock-free multi-producer, single consumer queue. mod mpsc_queue; +/// A lock-free multi-producer, multi-consumer bounded queue. +mod mpmc_bounded_queue; + /// A parallel data structure for tracking sleeping schedulers. mod sleeper_list; diff --git a/src/libstd/rt/mpmc_bounded_queue.rs b/src/libstd/rt/mpmc_bounded_queue.rs new file mode 100644 index 00000000000..8e6ac8f79c7 --- /dev/null +++ b/src/libstd/rt/mpmc_bounded_queue.rs @@ -0,0 +1,199 @@ +/* Multi-producer/multi-consumer bounded queue + * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT + * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE + * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and documentation are + * those of the authors and should not be interpreted as representing official + * policies, either expressed or implied, of Dmitry Vyukov. + */ + +use unstable::sync::UnsafeArc; +use unstable::atomics::{AtomicUint,Relaxed,Release,Acquire}; +use option::*; +use vec; +use clone::Clone; +use kinds::Send; +use num::{Exponential,Algebraic,Round}; + +struct Node<T> { + sequence: AtomicUint, + value: Option<T>, +} + +struct State<T> { + buffer: ~[Node<T>], + mask: uint, + enqueue_pos: AtomicUint, + dequeue_pos: AtomicUint, +} + +struct Queue<T> { + priv state: UnsafeArc<State<T>>, +} + +impl<T: Send> State<T> { + fn with_capacity(capacity: uint) -> State<T> { + let capacity = if capacity < 2 || (capacity & (capacity - 1)) != 0 { + // use next power of 2 as capacity + 2f64.pow(&((capacity as f64).log2().floor()+1f64)) as uint + } else { + capacity + }; + let buffer = do vec::from_fn(capacity) |i:uint| { + Node{sequence:AtomicUint::new(i),value:None} + }; + State{ + buffer: buffer, + mask: capacity-1, + enqueue_pos: AtomicUint::new(0), + dequeue_pos: AtomicUint::new(0), + } + } + + fn push(&mut self, value: T) -> bool { + let mask = self.mask; + let mut pos = self.enqueue_pos.load(Relaxed); + loop { + let node = &mut self.buffer[pos & mask]; + let seq = node.sequence.load(Acquire); + let diff: int = seq as int - pos as int; + + if diff == 0 { + let enqueue_pos = self.enqueue_pos.compare_and_swap(pos, pos+1, Relaxed); + if enqueue_pos == pos { + node.value = Some(value); + node.sequence.store(pos+1, Release); + break + } else { + pos = enqueue_pos; + } + } else if (diff < 0) { + return false + } else { + pos = self.enqueue_pos.load(Relaxed); + } + } + true + } + + fn pop(&mut self) -> Option<T> { + let mask = self.mask; + let mut pos = self.dequeue_pos.load(Relaxed); + loop { + let node = &mut self.buffer[pos & mask]; + let seq = node.sequence.load(Acquire); + let diff: int = seq as int - (pos + 1) as int; + if diff == 0 { + let dequeue_pos = self.dequeue_pos.compare_and_swap(pos, pos+1, Relaxed); + if dequeue_pos == pos { + let value = node.value.take(); + node.sequence.store(pos + mask + 1, Release); + return value + } else { + pos = dequeue_pos; + } + } else if diff < 0 { + return None + } else { + pos = self.dequeue_pos.load(Relaxed); + } + } + } +} + +impl<T: Send> Queue<T> { + pub fn with_capacity(capacity: uint) -> Queue<T> { + Queue{ + state: UnsafeArc::new(State::with_capacity(capacity)) + } + } + + pub fn push(&mut self, value: T) -> bool { + unsafe { (*self.state.get()).push(value) } + } + + pub fn pop(&mut self) -> Option<T> { + unsafe { (*self.state.get()).pop() } + } +} + +impl<T: Send> Clone for Queue<T> { + fn clone(&self) -> Queue<T> { + Queue { + state: self.state.clone() + } + } +} + +#[cfg(test)] +mod tests { + use prelude::*; + use option::*; + use task; + use comm; + use super::Queue; + + #[test] + fn test() { + let nthreads = 8u; + let nmsgs = 1000u; + let mut q = Queue::with_capacity(nthreads*nmsgs); + assert_eq!(None, q.pop()); + + for _ in range(0, nthreads) { + let (port, chan) = comm::stream(); + chan.send(q.clone()); + do task::spawn_sched(task::SingleThreaded) { + let mut q = port.recv(); + for i in range(0, nmsgs) { + assert!(q.push(i)); + } + } + } + + let mut completion_ports = ~[]; + for _ in range(0, nthreads) { + let (completion_port, completion_chan) = comm::stream(); + completion_ports.push(completion_port); + let (port, chan) = comm::stream(); + chan.send(q.clone()); + do task::spawn_sched(task::SingleThreaded) { + let mut q = port.recv(); + let mut i = 0u; + loop { + match q.pop() { + None => {}, + Some(_) => { + i += 1; + if i == nmsgs { break } + } + } + } + completion_chan.send(i); + } + } + + for completion_port in completion_ports.iter() { + assert_eq!(nmsgs, completion_port.recv()); + } + } +} diff --git a/src/libstd/rt/sleeper_list.rs b/src/libstd/rt/sleeper_list.rs index f4fdf15cda6..39c7431837f 100644 --- a/src/libstd/rt/sleeper_list.rs +++ b/src/libstd/rt/sleeper_list.rs @@ -11,84 +11,37 @@ //! Maintains a shared list of sleeping schedulers. Schedulers //! use this to wake each other up. -use container::Container; -use vec::OwnedVector; -use option::{Option, Some, None}; -use cell::Cell; -use unstable::sync::{UnsafeArc, LittleLock}; use rt::sched::SchedHandle; +use rt::mpmc_bounded_queue::Queue; +use option::*; use clone::Clone; pub struct SleeperList { - priv state: UnsafeArc<State> -} - -struct State { - count: uint, - stack: ~[SchedHandle], - lock: LittleLock + priv q: Queue<SchedHandle>, } impl SleeperList { pub fn new() -> SleeperList { - SleeperList { - state: UnsafeArc::new(State { - count: 0, - stack: ~[], - lock: LittleLock::new() - }) - } + SleeperList{q: Queue::with_capacity(8*1024)} } - pub fn push(&mut self, handle: SchedHandle) { - let handle = Cell::new(handle); - unsafe { - let state = self.state.get(); - do (*state).lock.lock { - (*state).count += 1; - (*state).stack.push(handle.take()); - } - } + pub fn push(&mut self, value: SchedHandle) { + assert!(self.q.push(value)) } pub fn pop(&mut self) -> Option<SchedHandle> { - unsafe { - let state = self.state.get(); - do (*state).lock.lock { - if !(*state).stack.is_empty() { - (*state).count -= 1; - Some((*state).stack.pop()) - } else { - None - } - } - } + self.q.pop() } - /// A pop that may sometimes miss enqueued elements, but is much faster - /// to give up without doing any synchronization pub fn casual_pop(&mut self) -> Option<SchedHandle> { - unsafe { - let state = self.state.get(); - // NB: Unsynchronized check - if (*state).count == 0 { return None; } - do (*state).lock.lock { - if !(*state).stack.is_empty() { - // NB: count is also protected by the lock - (*state).count -= 1; - Some((*state).stack.pop()) - } else { - None - } - } - } + self.q.pop() } } impl Clone for SleeperList { fn clone(&self) -> SleeperList { SleeperList { - state: self.state.clone() + q: self.q.clone() } } } |
