diff options
| author | Jason Toffaletti <jason@topsy.com> | 2013-10-03 19:23:47 -0700 |
|---|---|---|
| committer | Brian Anderson <banderson@mozilla.com> | 2013-10-25 18:27:45 -0700 |
| commit | c62d604531456e96de506b835207223136361dc2 (patch) | |
| tree | cc01486b04a5e0b8bcc99fb462e6c767501612af /src/libstd/rt/mpsc_queue.rs | |
| parent | dcdcd309fb9baf06f835831c83b94a5ad1cdd568 (diff) | |
| download | rust-c62d604531456e96de506b835207223136361dc2.tar.gz rust-c62d604531456e96de506b835207223136361dc2.zip | |
lock-free queue for scheduler message queue
Diffstat (limited to 'src/libstd/rt/mpsc_queue.rs')
| -rw-r--r-- | src/libstd/rt/mpsc_queue.rs | 203 |
1 files changed, 203 insertions, 0 deletions
diff --git a/src/libstd/rt/mpsc_queue.rs b/src/libstd/rt/mpsc_queue.rs new file mode 100644 index 00000000000..57b7d4f469b --- /dev/null +++ b/src/libstd/rt/mpsc_queue.rs @@ -0,0 +1,203 @@ +/* Multi-producer/single-consumer queue + * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT + * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE + * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and documentation are + * those of the authors and should not be interpreted as representing official + * policies, either expressed or implied, of Dmitry Vyukov. + */ + +//! A mostly lock-free multi-producer, single consumer queue. + +use unstable::sync::UnsafeArc; +use unstable::atomics::{AtomicPtr,Relaxed,Release,Acquire}; +use ptr::{mut_null, to_mut_unsafe_ptr}; +use cast; +use option::*; +use clone::Clone; +use default::Default; +use kinds::Send; +use fmt; + +struct Node<T> { + next: AtomicPtr<Node<T>>, + value: Option<T>, +} + +impl<T> Node<T> { + fn new(value: T) -> Node<T> { + Node{next: AtomicPtr::new(mut_null()), value: Some(value)} + } +} + +impl<T> Default for Node<T> { + fn default() -> Node<T> { + Node{next: AtomicPtr::new(mut_null()), value: None} + } +} + +struct State<T> { + stub: Node<T>, + head: AtomicPtr<Node<T>>, + tail: *mut Node<T>, +} + +struct Queue<T> { + priv state: UnsafeArc<State<T>>, +} + +impl<T: Send> Clone for Queue<T> { + fn clone(&self) -> Queue<T> { + Queue { + state: self.state.clone() + } + } +} + +impl<T: Send> fmt::Default for Queue<T> { + fn fmt(value: &Queue<T>, f: &mut fmt::Formatter) { + write!(f.buf, "Queue({})", value.state.get()); + } +} + +impl<T: Send> Queue<T> { + pub fn new() -> Queue<T> { + let mut q = Queue{state: UnsafeArc::new(State { + stub: Default::default(), + head: AtomicPtr::new(mut_null()), + tail: mut_null(), + })}; + let stub = q.get_stub_unsafe(); + q.get_head().store(stub, Relaxed); + q.set_tail(stub); + q + } + + pub fn push(&mut self, value: T) { + unsafe { + let node = cast::transmute(~Node::new(value)); + self.push_node(node); + } + } + + fn push_node(&mut self, node: *mut Node<T>) { + unsafe { + (*node).next.store(mut_null(), Release); + let prev = (*self.state.get()).head.swap(node, Relaxed); + (*prev).next.store(node, Release); + } + } + + fn get_stub_unsafe(&mut self) -> *mut Node<T> { + unsafe { to_mut_unsafe_ptr(&mut (*self.state.get()).stub) } + } + + fn get_head(&mut self) -> &mut AtomicPtr<Node<T>> { + unsafe { &mut (*self.state.get()).head } + } + + fn get_tail(&mut self) -> *mut Node<T> { + unsafe { (*self.state.get()).tail } + } + + fn set_tail(&mut self, tail: *mut Node<T>) { + unsafe { (*self.state.get()).tail = tail } + } + + pub fn casual_pop(&mut self) -> Option<T> { + self.pop() + } + + pub fn pop(&mut self) -> Option<T> { + unsafe { + let mut tail = self.get_tail(); + let mut next = (*tail).next.load(Acquire); + let stub = self.get_stub_unsafe(); + if tail == stub { + if mut_null() == next { + return None + } + self.set_tail(next); + tail = next; + next = (*next).next.load(Acquire); + } + if next != mut_null() { + let tail: ~Node<T> = cast::transmute(tail); + self.set_tail(next); + return tail.value + } + let head = self.get_head().load(Relaxed); + if tail != head { + return None + } + self.push_node(stub); + next = (*tail).next.load(Acquire); + if next != mut_null() { + let tail: ~Node<T> = cast::transmute(tail); + self.set_tail(next); + return tail.value + } + } + None + } +} + +#[cfg(test)] +mod tests { + use prelude::*; + use option::*; + use task; + use comm; + use fmt; + use super::Queue; + + #[test] + fn test() { + let nthreads = 8u; + let nmsgs = 1000u; + let mut q = Queue::new(); + assert_eq!(None, q.pop()); + + for _ in range(0, nthreads) { + let (port, chan) = comm::stream(); + chan.send(q.clone()); + do task::spawn_sched(task::SingleThreaded) { + let mut q = port.recv(); + for i in range(0, nmsgs) { + q.push(i); + } + } + } + + let mut i = 0u; + loop { + match q.pop() { + None => {}, + Some(_) => { + i += 1; + if i == nthreads*nmsgs { break } + } + } + } + } +} + |
