diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2013-12-12 17:27:37 -0800 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2013-12-24 14:42:00 -0800 |
| commit | a55c57284d8341ee5b22c5372e77ac0af9479dc5 (patch) | |
| tree | 73d06098fa02bf822a6709761f67342e8ba1fe6a /src/libstd/rt/spsc_queue.rs | |
| parent | dafb310ba131b34a8819566201dc8d0af9bbd406 (diff) | |
| download | rust-a55c57284d8341ee5b22c5372e77ac0af9479dc5.tar.gz rust-a55c57284d8341ee5b22c5372e77ac0af9479dc5.zip | |
std: Introduce std::sync
For now, this moves the following modules to std::sync * UnsafeArc (also removed unwrap method) * mpsc_queue * spsc_queue * atomics * mpmc_bounded_queue * deque We may want to remove some of the queues, but for now this moves things out of std::rt into std::sync
Diffstat (limited to 'src/libstd/rt/spsc_queue.rs')
| -rw-r--r-- | src/libstd/rt/spsc_queue.rs | 296 |
1 files changed, 0 insertions, 296 deletions
diff --git a/src/libstd/rt/spsc_queue.rs b/src/libstd/rt/spsc_queue.rs deleted file mode 100644 index f14533d726a..00000000000 --- a/src/libstd/rt/spsc_queue.rs +++ /dev/null @@ -1,296 +0,0 @@ -/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED - * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT - * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, - * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE - * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF - * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * The views and conclusions contained in the software and documentation are - * those of the authors and should not be interpreted as representing official - * policies, either expressed or implied, of Dmitry Vyukov. - */ - -// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue -use cast; -use kinds::Send; -use ops::Drop; -use option::{Some, None, Option}; -use unstable::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release}; -use unstable::sync::UnsafeArc; - -// Node within the linked list queue of messages to send -struct Node<T> { - // XXX: this could be an uninitialized T if we're careful enough, and - // that would reduce memory usage (and be a bit faster). - // is it worth it? - value: Option<T>, // nullable for re-use of nodes - next: AtomicPtr<Node<T>>, // next node in the queue -} - -// The producer/consumer halves both need access to the `tail` field, and if -// they both have access to that we may as well just give them both access -// to this whole structure. -struct State<T, P> { - // consumer fields - tail: *mut Node<T>, // where to pop from - tail_prev: AtomicPtr<Node<T>>, // where to pop from - - // producer fields - head: *mut Node<T>, // where to push to - first: *mut Node<T>, // where to get new nodes from - tail_copy: *mut Node<T>, // between first/tail - - // Cache maintenance fields. Additions and subtractions are stored - // separately in order to allow them to use nonatomic addition/subtraction. - cache_bound: uint, - cache_additions: AtomicUint, - cache_subtractions: AtomicUint, - - packet: P, -} - -pub struct Producer<T, P> { - priv state: UnsafeArc<State<T, P>>, -} - -pub struct Consumer<T, P> { - priv state: UnsafeArc<State<T, P>>, -} - -pub fn queue<T: Send, P: Send>(bound: uint, - p: P) -> (Consumer<T, P>, Producer<T, P>) -{ - let n1 = Node::new(); - let n2 = Node::new(); - unsafe { (*n1).next.store(n2, Relaxed) } - let state = State { - tail: n2, - tail_prev: AtomicPtr::new(n1), - head: n2, - first: n1, - tail_copy: n1, - cache_bound: bound, - cache_additions: AtomicUint::new(0), - cache_subtractions: AtomicUint::new(0), - packet: p, - }; - let (arc1, arc2) = UnsafeArc::new2(state); - (Consumer { state: arc1 }, Producer { state: arc2 }) -} - -impl<T: Send> Node<T> { - fn new() -> *mut Node<T> { - unsafe { - cast::transmute(~Node { - value: None, - next: AtomicPtr::new(0 as *mut Node<T>), - }) - } - } -} - -impl<T: Send, P: Send> Producer<T, P> { - pub fn push(&mut self, t: T) { - unsafe { (*self.state.get()).push(t) } - } - pub fn is_empty(&self) -> bool { - unsafe { (*self.state.get()).is_empty() } - } - pub unsafe fn packet(&self) -> *mut P { - &mut (*self.state.get()).packet as *mut P - } -} - -impl<T: Send, P: Send> Consumer<T, P> { - pub fn pop(&mut self) -> Option<T> { - unsafe { (*self.state.get()).pop() } - } - pub unsafe fn packet(&self) -> *mut P { - &mut (*self.state.get()).packet as *mut P - } -} - -impl<T: Send, P: Send> State<T, P> { - // remember that there is only one thread executing `push` (and only one - // thread executing `pop`) - unsafe fn push(&mut self, t: T) { - // Acquire a node (which either uses a cached one or allocates a new - // one), and then append this to the 'head' node. - let n = self.alloc(); - assert!((*n).value.is_none()); - (*n).value = Some(t); - (*n).next.store(0 as *mut Node<T>, Relaxed); - (*self.head).next.store(n, Release); - self.head = n; - } - - unsafe fn alloc(&mut self) -> *mut Node<T> { - // First try to see if we can consume the 'first' node for our uses. - // We try to avoid as many atomic instructions as possible here, so - // the addition to cache_subtractions is not atomic (plus we're the - // only one subtracting from the cache). - if self.first != self.tail_copy { - if self.cache_bound > 0 { - let b = self.cache_subtractions.load(Relaxed); - self.cache_subtractions.store(b + 1, Relaxed); - } - let ret = self.first; - self.first = (*ret).next.load(Relaxed); - return ret; - } - // If the above fails, then update our copy of the tail and try - // again. - self.tail_copy = self.tail_prev.load(Acquire); - if self.first != self.tail_copy { - if self.cache_bound > 0 { - let b = self.cache_subtractions.load(Relaxed); - self.cache_subtractions.store(b + 1, Relaxed); - } - let ret = self.first; - self.first = (*ret).next.load(Relaxed); - return ret; - } - // If all of that fails, then we have to allocate a new node - // (there's nothing in the node cache). - Node::new() - } - - // remember that there is only one thread executing `pop` (and only one - // thread executing `push`) - unsafe fn pop(&mut self) -> Option<T> { - // The `tail` node is not actually a used node, but rather a - // sentinel from where we should start popping from. Hence, look at - // tail's next field and see if we can use it. If we do a pop, then - // the current tail node is a candidate for going into the cache. - let tail = self.tail; - let next = (*tail).next.load(Acquire); - if next.is_null() { return None } - assert!((*next).value.is_some()); - let ret = (*next).value.take(); - - self.tail = next; - if self.cache_bound == 0 { - self.tail_prev.store(tail, Release); - } else { - // XXX: this is dubious with overflow. - let additions = self.cache_additions.load(Relaxed); - let subtractions = self.cache_subtractions.load(Relaxed); - let size = additions - subtractions; - - if size < self.cache_bound { - self.tail_prev.store(tail, Release); - self.cache_additions.store(additions + 1, Relaxed); - } else { - (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed); - // We have successfully erased all references to 'tail', so - // now we can safely drop it. - let _: ~Node<T> = cast::transmute(tail); - } - } - return ret; - } - - unsafe fn is_empty(&self) -> bool { - let tail = self.tail; - let next = (*tail).next.load(Acquire); - return next.is_null(); - } -} - -#[unsafe_destructor] -impl<T: Send, P: Send> Drop for State<T, P> { - fn drop(&mut self) { - unsafe { - let mut cur = self.first; - while !cur.is_null() { - let next = (*cur).next.load(Relaxed); - let _n: ~Node<T> = cast::transmute(cur); - cur = next; - } - } - } -} - -#[cfg(test)] -mod test { - use prelude::*; - use super::queue; - use task; - - #[test] - fn smoke() { - let (mut c, mut p) = queue(0, ()); - p.push(1); - p.push(2); - assert_eq!(c.pop(), Some(1)); - assert_eq!(c.pop(), Some(2)); - assert_eq!(c.pop(), None); - p.push(3); - p.push(4); - assert_eq!(c.pop(), Some(3)); - assert_eq!(c.pop(), Some(4)); - assert_eq!(c.pop(), None); - } - - #[test] - fn drop_full() { - let (_, mut p) = queue(0, ()); - p.push(~1); - p.push(~2); - } - - #[test] - fn smoke_bound() { - let (mut c, mut p) = queue(1, ()); - p.push(1); - p.push(2); - assert_eq!(c.pop(), Some(1)); - assert_eq!(c.pop(), Some(2)); - assert_eq!(c.pop(), None); - p.push(3); - p.push(4); - assert_eq!(c.pop(), Some(3)); - assert_eq!(c.pop(), Some(4)); - assert_eq!(c.pop(), None); - } - - #[test] - fn stress() { - stress_bound(0); - stress_bound(1); - - fn stress_bound(bound: uint) { - let (c, mut p) = queue(bound, ()); - do task::spawn_sched(task::SingleThreaded) { - let mut c = c; - for _ in range(0, 100000) { - loop { - match c.pop() { - Some(1) => break, - Some(_) => fail!(), - None => {} - } - } - } - } - for _ in range(0, 100000) { - p.push(1); - } - } - } -} |
