| field | value | date |
|---|---|---|
| author | Alex Crichton <alex@alexcrichton.com> | 2013-12-12 17:27:37 -0800 |
| committer | Alex Crichton <alex@alexcrichton.com> | 2013-12-24 14:42:00 -0800 |
| commit | a55c57284d8341ee5b22c5372e77ac0af9479dc5 | |
| tree | 73d06098fa02bf822a6709761f67342e8ba1fe6a /src/libstd/rt | |
| parent | dafb310ba131b34a8819566201dc8d0af9bbd406 | |
| download | rust-a55c57284d8341ee5b22c5372e77ac0af9479dc5.tar.gz, rust-a55c57284d8341ee5b22c5372e77ac0af9479dc5.zip | |
std: Introduce std::sync
For now, this moves the following modules to std::sync:

* UnsafeArc (also removed unwrap method)
* mpsc_queue
* spsc_queue
* atomics
* mpmc_bounded_queue
* deque

We may want to remove some of the queues, but for now this moves things out of std::rt into std::sync.
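For readers coming from modern Rust: these queues are early ancestors of today's stabilized `std::sync` machinery; the mpsc/spsc queues in particular underpin what later became `std::sync::mpsc`. A minimal sketch of that modern descendant, assuming current stable Rust (not the 2013 API in the diff below):

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    // Multi-producer, single-consumer: clone the sender freely.
    let (tx, rx) = mpsc::channel();
    for id in 0..4 {
        let tx = tx.clone();
        thread::spawn(move || tx.send(id).unwrap());
    }
    drop(tx); // close the channel so `rx` sees the end of the stream
    let mut got: Vec<i32> = rx.iter().collect();
    got.sort();
    assert_eq!(got, vec![0, 1, 2, 3]);
}
```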
Diffstat (limited to 'src/libstd/rt')
| mode | file | lines deleted |
|---|---|---|
| -rw-r--r-- | src/libstd/rt/deque.rs | 658 |
| -rw-r--r-- | src/libstd/rt/mpmc_bounded_queue.rs | 209 |
| -rw-r--r-- | src/libstd/rt/mpsc_queue.rs | 215 |
| -rw-r--r-- | src/libstd/rt/spsc_queue.rs | 296 |
4 files changed, 0 insertions, 1378 deletions
```diff
diff --git a/src/libstd/rt/deque.rs b/src/libstd/rt/deque.rs
deleted file mode 100644
index 770fc9ffa12..00000000000
--- a/src/libstd/rt/deque.rs
+++ /dev/null
@@ -1,658 +0,0 @@
-// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
-// file at the top-level directory of this distribution and at
-// http://rust-lang.org/COPYRIGHT.
-//
-// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
-// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
-// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
-// option. This file may not be copied, modified, or distributed
-// except according to those terms.
-
-//! A (mostly) lock-free concurrent work-stealing deque
-//!
-//! This module contains an implementation of the Chase-Lev work stealing deque
-//! described in "Dynamic Circular Work-Stealing Deque". The implementation is
-//! heavily based on the pseudocode found in the paper.
-//!
-//! This implementation does not want to have the restriction of a garbage
-//! collector for reclamation of buffers, and instead it uses a shared pool of
-//! buffers. This shared pool is required for correctness in this
-//! implementation.
-//!
-//! The only lock-synchronized portions of this deque are the buffer allocation
-//! and deallocation portions. Otherwise all operations are lock-free.
-//!
-//! # Example
-//!
-//!     use std::rt::deque::BufferPool;
-//!
-//!     let mut pool = BufferPool::new();
-//!     let (mut worker, mut stealer) = pool.deque();
-//!
-//!     // Only the worker may push/pop
-//!     worker.push(1);
-//!     worker.pop();
-//!
-//!     // Stealers take data from the other end of the deque
-//!     worker.push(1);
-//!     stealer.steal();
-//!
-//!     // Stealers can be cloned to have many stealers stealing in parallel
-//!     worker.push(1);
-//!     let mut stealer2 = stealer.clone();
-//!     stealer2.steal();
-
-// NB: the "buffer pool" strategy is not done for speed, but rather for
-// correctness. For more info, see the comment on `swap_buffer`
-
-// XXX: all atomic operations in this module use a SeqCst ordering. That is
-// probably overkill
-
-use cast;
-use clone::Clone;
-use iter::range;
-use kinds::Send;
-use libc;
-use mem;
-use ops::Drop;
-use option::{Option, Some, None};
-use ptr;
-use unstable::atomics::{AtomicInt, AtomicPtr, SeqCst};
-use unstable::sync::{UnsafeArc, Exclusive};
-
-// Once the queue is less than 1/K full, then it will be downsized. Note that
-// the deque requires that this number be less than 2.
-static K: int = 4;
-
-// Minimum number of bits that a buffer size should be. No buffer will resize to
-// under this value, and all deques will initially contain a buffer of this
-// size.
-//
-// The size in question is 1 << MIN_BITS
-static MIN_BITS: int = 7;
-
-struct Deque<T> {
-    bottom: AtomicInt,
-    top: AtomicInt,
-    array: AtomicPtr<Buffer<T>>,
-    pool: BufferPool<T>,
-}
-
-/// Worker half of the work-stealing deque. This worker has exclusive access to
-/// one side of the deque, and uses `push` and `pop` method to manipulate it.
-///
-/// There may only be one worker per deque.
-pub struct Worker<T> {
-    priv deque: UnsafeArc<Deque<T>>,
-}
-
-/// The stealing half of the work-stealing deque. Stealers have access to the
-/// opposite end of the deque from the worker, and they only have access to the
-/// `steal` method.
-pub struct Stealer<T> {
-    priv deque: UnsafeArc<Deque<T>>,
-}
-
-/// When stealing some data, this is an enumeration of the possible outcomes.
-#[deriving(Eq)]
-pub enum Stolen<T> {
-    /// The deque was empty at the time of stealing
-    Empty,
-    /// The stealer lost the race for stealing data, and a retry may return more
-    /// data.
-    Abort,
-    /// The stealer has successfully stolen some data.
-    Data(T),
-}
-
-/// The allocation pool for buffers used by work-stealing deques. Right now this
-/// structure is used for reclamation of memory after it is no longer in use by
-/// deques.
-///
-/// This data structure is protected by a mutex, but it is rarely used. Deques
-/// will only use this structure when allocating a new buffer or deallocating a
-/// previous one.
-pub struct BufferPool<T> {
-    priv pool: Exclusive<~[~Buffer<T>]>,
-}
-
-/// An internal buffer used by the chase-lev deque. This structure is actually
-/// implemented as a circular buffer, and is used as the intermediate storage of
-/// the data in the deque.
-///
-/// This type is implemented with *T instead of ~[T] for two reasons:
-///
-///   1. There is nothing safe about using this buffer. This easily allows the
-///      same value to be read twice in to rust, and there is nothing to
-///      prevent this. The usage by the deque must ensure that one of the
-///      values is forgotten. Furthermore, we only ever want to manually run
-///      destructors for values in this buffer (on drop) because the bounds
-///      are defined by the deque it's owned by.
-///
-///   2. We can certainly avoid bounds checks using *T instead of ~[T], although
-///      LLVM is probably pretty good at doing this already.
-struct Buffer<T> {
-    storage: *T,
-    log_size: int,
-}
-
-impl<T: Send> BufferPool<T> {
-    /// Allocates a new buffer pool which in turn can be used to allocate new
-    /// deques.
-    pub fn new() -> BufferPool<T> {
-        BufferPool { pool: Exclusive::new(~[]) }
-    }
-
-    /// Allocates a new work-stealing deque which will send/receiving memory to
-    /// and from this buffer pool.
-    pub fn deque(&mut self) -> (Worker<T>, Stealer<T>) {
-        let (a, b) = UnsafeArc::new2(Deque::new(self.clone()));
-        (Worker { deque: a }, Stealer { deque: b })
-    }
-
-    fn alloc(&mut self, bits: int) -> ~Buffer<T> {
-        unsafe {
-            self.pool.with(|pool| {
-                match pool.iter().position(|x| x.size() >= (1 << bits)) {
-                    Some(i) => pool.remove(i),
-                    None => ~Buffer::new(bits)
-                }
-            })
-        }
-    }
-
-    fn free(&mut self, buf: ~Buffer<T>) {
-        unsafe {
-            let mut buf = Some(buf);
-            self.pool.with(|pool| {
-                let buf = buf.take_unwrap();
-                match pool.iter().position(|v| v.size() > buf.size()) {
-                    Some(i) => pool.insert(i, buf),
-                    None => pool.push(buf),
-                }
-            })
-        }
-    }
-}
-
-impl<T: Send> Clone for BufferPool<T> {
-    fn clone(&self) -> BufferPool<T> { BufferPool { pool: self.pool.clone() } }
-}
-
-impl<T: Send> Worker<T> {
-    /// Pushes data onto the front of this work queue.
-    pub fn push(&mut self, t: T) {
-        unsafe { (*self.deque.get()).push(t) }
-    }
-    /// Pops data off the front of the work queue, returning `None` on an empty
-    /// queue.
-    pub fn pop(&mut self) -> Option<T> {
-        unsafe { (*self.deque.get()).pop() }
-    }
-
-    /// Gets access to the buffer pool that this worker is attached to. This can
-    /// be used to create more deques which share the same buffer pool as this
-    /// deque.
-    pub fn pool<'a>(&'a mut self) -> &'a mut BufferPool<T> {
-        unsafe { &mut (*self.deque.get()).pool }
-    }
-}
-
-impl<T: Send> Stealer<T> {
-    /// Steals work off the end of the queue (opposite of the worker's end)
-    pub fn steal(&mut self) -> Stolen<T> {
-        unsafe { (*self.deque.get()).steal() }
-    }
-
-    /// Gets access to the buffer pool that this stealer is attached to. This
-    /// can be used to create more deques which share the same buffer pool as
-    /// this deque.
-    pub fn pool<'a>(&'a mut self) -> &'a mut BufferPool<T> {
-        unsafe { &mut (*self.deque.get()).pool }
-    }
-}
-
-impl<T: Send> Clone for Stealer<T> {
-    fn clone(&self) -> Stealer<T> { Stealer { deque: self.deque.clone() } }
-}
-
-// Almost all of this code can be found directly in the paper so I'm not
-// personally going to heavily comment what's going on here.
-
-impl<T: Send> Deque<T> {
-    fn new(mut pool: BufferPool<T>) -> Deque<T> {
-        let buf = pool.alloc(MIN_BITS);
-        Deque {
-            bottom: AtomicInt::new(0),
-            top: AtomicInt::new(0),
-            array: AtomicPtr::new(unsafe { cast::transmute(buf) }),
-            pool: pool,
-        }
-    }
-
-    unsafe fn push(&mut self, data: T) {
-        let mut b = self.bottom.load(SeqCst);
-        let t = self.top.load(SeqCst);
-        let mut a = self.array.load(SeqCst);
-        let size = b - t;
-        if size >= (*a).size() - 1 {
-            // You won't find this code in the chase-lev deque paper. This is
-            // alluded to in a small footnote, however. We always free a buffer
-            // when growing in order to prevent leaks.
-            a = self.swap_buffer(b, a, (*a).resize(b, t, 1));
-            b = self.bottom.load(SeqCst);
-        }
-        (*a).put(b, data);
-        self.bottom.store(b + 1, SeqCst);
-    }
-
-    unsafe fn pop(&mut self) -> Option<T> {
-        let b = self.bottom.load(SeqCst);
-        let a = self.array.load(SeqCst);
-        let b = b - 1;
-        self.bottom.store(b, SeqCst);
-        let t = self.top.load(SeqCst);
-        let size = b - t;
-        if size < 0 {
-            self.bottom.store(t, SeqCst);
-            return None;
-        }
-        let data = (*a).get(b);
-        if size > 0 {
-            self.maybe_shrink(b, t);
-            return Some(data);
-        }
-        if self.top.compare_and_swap(t, t + 1, SeqCst) == t {
-            self.bottom.store(t + 1, SeqCst);
-            return Some(data);
-        } else {
-            self.bottom.store(t + 1, SeqCst);
-            cast::forget(data); // someone else stole this value
-            return None;
-        }
-    }
-
-    unsafe fn steal(&mut self) -> Stolen<T> {
-        let t = self.top.load(SeqCst);
-        let old = self.array.load(SeqCst);
-        let b = self.bottom.load(SeqCst);
-        let a = self.array.load(SeqCst);
-        let size = b - t;
-        if size <= 0 { return Empty }
-        if size % (*a).size() == 0 {
-            if a == old && t == self.top.load(SeqCst) {
-                return Empty
-            }
-            return Abort
-        }
-        let data = (*a).get(t);
-        if self.top.compare_and_swap(t, t + 1, SeqCst) == t {
-            Data(data)
-        } else {
-            cast::forget(data); // someone else stole this value
-            Abort
-        }
-    }
-
-    unsafe fn maybe_shrink(&mut self, b: int, t: int) {
-        let a = self.array.load(SeqCst);
-        if b - t < (*a).size() / K && b - t > (1 << MIN_BITS) {
-            self.swap_buffer(b, a, (*a).resize(b, t, -1));
-        }
-    }
-
-    // Helper routine not mentioned in the paper which is used in growing and
-    // shrinking buffers to swap in a new buffer into place. As a bit of a
-    // recap, the whole point that we need a buffer pool rather than just
-    // calling malloc/free directly is that stealers can continue using buffers
-    // after this method has called 'free' on it. The continued usage is simply
-    // a read followed by a forget, but we must make sure that the memory can
-    // continue to be read after we flag this buffer for reclamation.
-    unsafe fn swap_buffer(&mut self, b: int, old: *mut Buffer<T>,
-                          buf: Buffer<T>) -> *mut Buffer<T> {
-        let newbuf: *mut Buffer<T> = cast::transmute(~buf);
-        self.array.store(newbuf, SeqCst);
-        let ss = (*newbuf).size();
-        self.bottom.store(b + ss, SeqCst);
-        let t = self.top.load(SeqCst);
-        if self.top.compare_and_swap(t, t + ss, SeqCst) != t {
-            self.bottom.store(b, SeqCst);
-        }
-        self.pool.free(cast::transmute(old));
-        return newbuf;
-    }
-}
-
-
-#[unsafe_destructor]
-impl<T: Send> Drop for Deque<T> {
-    fn drop(&mut self) {
-        let t = self.top.load(SeqCst);
-        let b = self.bottom.load(SeqCst);
-        let a = self.array.load(SeqCst);
-        // Free whatever is leftover in the dequeue, and then move the buffer
-        // back into the pool.
-        for i in range(t, b) {
-            let _: T = unsafe { (*a).get(i) };
-        }
-        self.pool.free(unsafe { cast::transmute(a) });
-    }
-}
-
-impl<T: Send> Buffer<T> {
-    unsafe fn new(log_size: int) -> Buffer<T> {
-        let size = (1 << log_size) * mem::size_of::<T>();
-        let buffer = libc::malloc(size as libc::size_t);
-        assert!(!buffer.is_null());
-        Buffer {
-            storage: buffer as *T,
-            log_size: log_size,
-        }
-    }
-
-    fn size(&self) -> int { 1 << self.log_size }
-
-    // Apparently LLVM cannot optimize (foo % (1 << bar)) into this implicitly
-    fn mask(&self) -> int { (1 << self.log_size) - 1 }
-
-    // This does not protect against loading duplicate values of the same cell,
-    // nor does this clear out the contents contained within. Hence, this is a
-    // very unsafe method which the caller needs to treat specially in case a
-    // race is lost.
-    unsafe fn get(&self, i: int) -> T {
-        ptr::read_ptr(self.storage.offset(i & self.mask()))
-    }
-
-    // Unsafe because this unsafely overwrites possibly uninitialized or
-    // initialized data.
-    unsafe fn put(&mut self, i: int, t: T) {
-        let ptr = self.storage.offset(i & self.mask());
-        ptr::copy_nonoverlapping_memory(ptr as *mut T, &t as *T, 1);
-        cast::forget(t);
-    }
-
-    // Again, unsafe because this has incredibly dubious ownership violations.
-    // It is assumed that this buffer is immediately dropped.
-    unsafe fn resize(&self, b: int, t: int, delta: int) -> Buffer<T> {
-        let mut buf = Buffer::new(self.log_size + delta);
-        for i in range(t, b) {
-            buf.put(i, self.get(i));
-        }
-        return buf;
-    }
-}
-
-#[unsafe_destructor]
-impl<T: Send> Drop for Buffer<T> {
-    fn drop(&mut self) {
-        // It is assumed that all buffers are empty on drop.
-        unsafe { libc::free(self.storage as *libc::c_void) }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use prelude::*;
-    use super::{Data, BufferPool, Abort, Empty, Worker, Stealer};
-
-    use cast;
-    use rt::thread::Thread;
-    use rand;
-    use rand::Rng;
-    use unstable::atomics::{AtomicBool, INIT_ATOMIC_BOOL, SeqCst,
-                            AtomicUint, INIT_ATOMIC_UINT};
-    use vec;
-
-    #[test]
-    fn smoke() {
-        let mut pool = BufferPool::new();
-        let (mut w, mut s) = pool.deque();
-        assert_eq!(w.pop(), None);
-        assert_eq!(s.steal(), Empty);
-        w.push(1);
-        assert_eq!(w.pop(), Some(1));
-        w.push(1);
-        assert_eq!(s.steal(), Data(1));
-        w.push(1);
-        assert_eq!(s.clone().steal(), Data(1));
-    }
-
-    #[test]
-    fn stealpush() {
-        static AMT: int = 100000;
-        let mut pool = BufferPool::<int>::new();
-        let (mut w, s) = pool.deque();
-        let t = do Thread::start {
-            let mut s = s;
-            let mut left = AMT;
-            while left > 0 {
-                match s.steal() {
-                    Data(i) => {
-                        assert_eq!(i, 1);
-                        left -= 1;
-                    }
-                    Abort | Empty => {}
-                }
-            }
-        };
-
-        for _ in range(0, AMT) {
-            w.push(1);
-        }
-
-        t.join();
-    }
-
-    #[test]
-    fn stealpush_large() {
-        static AMT: int = 100000;
-        let mut pool = BufferPool::<(int, int)>::new();
-        let (mut w, s) = pool.deque();
-        let t = do Thread::start {
-            let mut s = s;
-            let mut left = AMT;
-            while left > 0 {
-                match s.steal() {
-                    Data((1, 10)) => { left -= 1; }
-                    Data(..) => fail!(),
-                    Abort | Empty => {}
-                }
-            }
-        };
-
-        for _ in range(0, AMT) {
-            w.push((1, 10));
-        }
-
-        t.join();
-    }
-
-    fn stampede(mut w: Worker<~int>, s: Stealer<~int>,
-                nthreads: int, amt: uint) {
-        for _ in range(0, amt) {
-            w.push(~20);
-        }
-        let mut remaining = AtomicUint::new(amt);
-        let unsafe_remaining: *mut AtomicUint = &mut remaining;
-
-        let threads = range(0, nthreads).map(|_| {
-            let s = s.clone();
-            do Thread::start {
-                unsafe {
-                    let mut s = s;
-                    while (*unsafe_remaining).load(SeqCst) > 0 {
-                        match s.steal() {
-                            Data(~20) => {
-                                (*unsafe_remaining).fetch_sub(1, SeqCst);
-                            }
-                            Data(..) => fail!(),
-                            Abort | Empty => {}
-                        }
-                    }
-                }
-            }
-        }).to_owned_vec();
-
-        while remaining.load(SeqCst) > 0 {
-            match w.pop() {
-                Some(~20) => { remaining.fetch_sub(1, SeqCst); }
-                Some(..) => fail!(),
-                None => {}
-            }
-        }
-
-        for thread in threads.move_iter() {
-            thread.join();
-        }
-    }
-
-    #[test]
-    fn run_stampede() {
-        let mut pool = BufferPool::<~int>::new();
-        let (w, s) = pool.deque();
-        stampede(w, s, 8, 10000);
-    }
-
-    #[test]
-    fn many_stampede() {
-        static AMT: uint = 4;
-        let mut pool = BufferPool::<~int>::new();
-        let threads = range(0, AMT).map(|_| {
-            let (w, s) = pool.deque();
-            do Thread::start {
-                stampede(w, s, 4, 10000);
-            }
-        }).to_owned_vec();
-
-        for thread in threads.move_iter() {
-            thread.join();
-        }
-    }
-
-    #[test]
-    fn stress() {
-        static AMT: int = 100000;
-        static NTHREADS: int = 8;
-        static mut DONE: AtomicBool = INIT_ATOMIC_BOOL;
-        static mut HITS: AtomicUint = INIT_ATOMIC_UINT;
-        let mut pool = BufferPool::<int>::new();
-        let (mut w, s) = pool.deque();
-
-        let threads = range(0, NTHREADS).map(|_| {
-            let s = s.clone();
-            do Thread::start {
-                unsafe {
-                    let mut s = s;
-                    loop {
-                        match s.steal() {
-                            Data(2) => { HITS.fetch_add(1, SeqCst); }
-                            Data(..) => fail!(),
-                            _ if DONE.load(SeqCst) => break,
-                            _ => {}
-                        }
-                    }
-                }
-            }
-        }).to_owned_vec();
-
-        let mut rng = rand::task_rng();
-        let mut expected = 0;
-        while expected < AMT {
-            if rng.gen_range(0, 3) == 2 {
-                match w.pop() {
-                    None => {}
-                    Some(2) => unsafe { HITS.fetch_add(1, SeqCst); },
-                    Some(_) => fail!(),
-                }
-            } else {
-                expected += 1;
-                w.push(2);
-            }
-        }
-
-        unsafe {
-            while HITS.load(SeqCst) < AMT as uint {
-                match w.pop() {
-                    None => {}
-                    Some(2) => { HITS.fetch_add(1, SeqCst); },
-                    Some(_) => fail!(),
-                }
-            }
-            DONE.store(true, SeqCst);
-        }
-
-        for thread in threads.move_iter() {
-            thread.join();
-        }
-
-        assert_eq!(unsafe { HITS.load(SeqCst) }, expected as uint);
    }
-
-    #[test]
-    #[ignore(cfg(windows))] // apparently windows scheduling is weird?
-    fn no_starvation() {
-        static AMT: int = 10000;
-        static NTHREADS: int = 4;
-        static mut DONE: AtomicBool = INIT_ATOMIC_BOOL;
-        let mut pool = BufferPool::<(int, uint)>::new();
-        let (mut w, s) = pool.deque();
-
-        let (threads, hits) = vec::unzip(range(0, NTHREADS).map(|_| {
-            let s = s.clone();
-            let unique_box = ~AtomicUint::new(0);
-            let thread_box = unsafe {
-                *cast::transmute::<&~AtomicUint,**mut AtomicUint>(&unique_box)
-            };
-            (do Thread::start {
-                unsafe {
-                    let mut s = s;
-                    loop {
-                        match s.steal() {
-                            Data((1, 2)) => {
-                                (*thread_box).fetch_add(1, SeqCst);
-                            }
-                            Data(..) => fail!(),
-                            _ if DONE.load(SeqCst) => break,
-                            _ => {}
-                        }
-                    }
-                }
-            }, unique_box)
-        }));
-
-        let mut rng = rand::task_rng();
-        let mut myhit = false;
-        let mut iter = 0;
-        'outer: loop {
-            for _ in range(0, rng.gen_range(0, AMT)) {
-                if !myhit && rng.gen_range(0, 3) == 2 {
-                    match w.pop() {
-                        None => {}
-                        Some((1, 2)) => myhit = true,
-                        Some(_) => fail!(),
-                    }
-                } else {
-                    w.push((1, 2));
-                }
-            }
-            iter += 1;
-
-            debug!("loop iteration {}", iter);
-            for (i, slot) in hits.iter().enumerate() {
-                let amt = slot.load(SeqCst);
-                debug!("thread {}: {}", i, amt);
-                if amt == 0 { continue 'outer; }
-            }
-            if myhit {
-                break
-            }
-        }
-
-        unsafe { DONE.store(true, SeqCst); }
-
-        for thread in threads.move_iter() {
-            thread.join();
-        }
-    }
-}
```
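The Chase-Lev deque deleted above lives on outside the standard library: the `crossbeam-deque` crate implements the same worker/stealer split with the same three steal outcomes. A hedged sketch of equivalent usage, assuming `crossbeam-deque` is available as a dependency:

```rust
use crossbeam_deque::{Steal, Worker};

fn main() {
    // Only the worker may push/pop; stealers take from the other end.
    let worker: Worker<i32> = Worker::new_lifo();
    let stealer = worker.stealer(); // clone it for parallel stealers

    worker.push(1);
    assert_eq!(worker.pop(), Some(1));

    worker.push(2);
    // `Steal::Retry` is the analogue of `Abort` above: the stealer lost a
    // race and should try again.
    match stealer.steal() {
        Steal::Success(v) => assert_eq!(v, 2),
        Steal::Empty | Steal::Retry => {}
    }
}
```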
```diff
diff --git a/src/libstd/rt/mpmc_bounded_queue.rs b/src/libstd/rt/mpmc_bounded_queue.rs
deleted file mode 100644
index 25a3ba8ab48..00000000000
--- a/src/libstd/rt/mpmc_bounded_queue.rs
+++ /dev/null
@@ -1,209 +0,0 @@
-/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- *    1. Redistributions of source code must retain the above copyright notice,
- *       this list of conditions and the following disclaimer.
- *
- *    2. Redistributions in binary form must reproduce the above copyright
- *       notice, this list of conditions and the following disclaimer in the
- *       documentation and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
- * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
- * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
- * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
- * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
- * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
- * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
- * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
- * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and documentation are
- * those of the authors and should not be interpreted as representing official
- * policies, either expressed or implied, of Dmitry Vyukov.
- */
-
-// http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
-
-use unstable::sync::UnsafeArc;
-use unstable::atomics::{AtomicUint,Relaxed,Release,Acquire};
-use option::*;
-use vec;
-use clone::Clone;
-use kinds::Send;
-use num::{Exponential,Algebraic,Round};
-
-struct Node<T> {
-    sequence: AtomicUint,
-    value: Option<T>,
-}
-
-struct State<T> {
-    pad0: [u8, ..64],
-    buffer: ~[Node<T>],
-    mask: uint,
-    pad1: [u8, ..64],
-    enqueue_pos: AtomicUint,
-    pad2: [u8, ..64],
-    dequeue_pos: AtomicUint,
-    pad3: [u8, ..64],
-}
-
-pub struct Queue<T> {
-    priv state: UnsafeArc<State<T>>,
-}
-
-impl<T: Send> State<T> {
-    fn with_capacity(capacity: uint) -> State<T> {
-        let capacity = if capacity < 2 || (capacity & (capacity - 1)) != 0 {
-            if capacity < 2 {
-                2u
-            } else {
-                // use next power of 2 as capacity
-                2f64.pow(&((capacity as f64).log2().ceil())) as uint
-            }
-        } else {
-            capacity
-        };
-        let buffer = vec::from_fn(capacity, |i:uint| {
-            Node{sequence:AtomicUint::new(i),value:None}
-        });
-        State{
-            pad0: [0, ..64],
-            buffer: buffer,
-            mask: capacity-1,
-            pad1: [0, ..64],
-            enqueue_pos: AtomicUint::new(0),
-            pad2: [0, ..64],
-            dequeue_pos: AtomicUint::new(0),
-            pad3: [0, ..64],
-        }
-    }
-
-    fn push(&mut self, value: T) -> bool {
-        let mask = self.mask;
-        let mut pos = self.enqueue_pos.load(Relaxed);
-        loop {
-            let node = &mut self.buffer[pos & mask];
-            let seq = node.sequence.load(Acquire);
-            let diff: int = seq as int - pos as int;
-
-            if diff == 0 {
-                let enqueue_pos = self.enqueue_pos.compare_and_swap(pos, pos+1, Relaxed);
-                if enqueue_pos == pos {
-                    node.value = Some(value);
-                    node.sequence.store(pos+1, Release);
-                    break
-                } else {
-                    pos = enqueue_pos;
-                }
-            } else if (diff < 0) {
-                return false
-            } else {
-                pos = self.enqueue_pos.load(Relaxed);
-            }
-        }
-        true
-    }
-
-    fn pop(&mut self) -> Option<T> {
-        let mask = self.mask;
-        let mut pos = self.dequeue_pos.load(Relaxed);
-        loop {
-            let node = &mut self.buffer[pos & mask];
-            let seq = node.sequence.load(Acquire);
-            let diff: int = seq as int - (pos + 1) as int;
-            if diff == 0 {
-                let dequeue_pos = self.dequeue_pos.compare_and_swap(pos, pos+1, Relaxed);
-                if dequeue_pos == pos {
-                    let value = node.value.take();
-                    node.sequence.store(pos + mask + 1, Release);
-                    return value
-                } else {
-                    pos = dequeue_pos;
-                }
-            } else if diff < 0 {
-                return None
-            } else {
-                pos = self.dequeue_pos.load(Relaxed);
-            }
-        }
-    }
-}
-
-impl<T: Send> Queue<T> {
-    pub fn with_capacity(capacity: uint) -> Queue<T> {
-        Queue{
-            state: UnsafeArc::new(State::with_capacity(capacity))
-        }
-    }
-
-    pub fn push(&mut self, value: T) -> bool {
-        unsafe { (*self.state.get()).push(value) }
-    }
-
-    pub fn pop(&mut self) -> Option<T> {
-        unsafe { (*self.state.get()).pop() }
-    }
-}
-
-impl<T: Send> Clone for Queue<T> {
-    fn clone(&self) -> Queue<T> {
-        Queue {
-            state: self.state.clone()
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use prelude::*;
-    use option::*;
-    use task;
-    use super::Queue;
-
-    #[test]
-    fn test() {
-        let nthreads = 8u;
-        let nmsgs = 1000u;
-        let mut q = Queue::with_capacity(nthreads*nmsgs);
-        assert_eq!(None, q.pop());
-
-        for _ in range(0, nthreads) {
-            let q = q.clone();
-            do task::spawn_sched(task::SingleThreaded) {
-                let mut q = q;
-                for i in range(0, nmsgs) {
-                    assert!(q.push(i));
-                }
-            }
-        }
-
-        let mut completion_ports = ~[];
-        for _ in range(0, nthreads) {
-            let (completion_port, completion_chan) = Chan::new();
-            completion_ports.push(completion_port);
-            let q = q.clone();
-            do task::spawn_sched(task::SingleThreaded) {
-                let mut q = q;
-                let mut i = 0u;
-                loop {
-                    match q.pop() {
-                        None => {},
-                        Some(_) => {
-                            i += 1;
-                            if i == nmsgs { break }
-                        }
-                    }
-                }
-                completion_chan.send(i);
-            }
-        }
-
-        for completion_port in completion_ports.mut_iter() {
-            assert_eq!(nmsgs, completion_port.recv());
-        }
-    }
-}
```
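One detail worth noting in `State::with_capacity` above: the requested capacity is rounded up to a power of two via floating-point `log2`/`pow`, so that `pos & mask` can stand in for a modulo. Modern Rust exposes this computation directly on integers; a small sketch of the equivalent, assuming only today's standard library:

```rust
/// Round a requested queue capacity up to the power of two that the
/// index-mask trick requires (minimum 2, matching the code above).
fn capacity_for(requested: usize) -> usize {
    requested.max(2).next_power_of_two()
}

fn main() {
    assert_eq!(capacity_for(0), 2);
    assert_eq!(capacity_for(5), 8);
    assert_eq!(capacity_for(8), 8); // already a power of two: unchanged
    let mask = capacity_for(5) - 1;
    assert_eq!(13 & mask, 13 % 8); // `pos & mask` == `pos % capacity`
}
```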
```diff
diff --git a/src/libstd/rt/mpsc_queue.rs b/src/libstd/rt/mpsc_queue.rs
deleted file mode 100644
index d575028af70..00000000000
--- a/src/libstd/rt/mpsc_queue.rs
+++ /dev/null
@@ -1,215 +0,0 @@
-/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- *    1. Redistributions of source code must retain the above copyright notice,
- *       this list of conditions and the following disclaimer.
- *
- *    2. Redistributions in binary form must reproduce the above copyright
- *       notice, this list of conditions and the following disclaimer in the
- *       documentation and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
- * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
- * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
- * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
- * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
- * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
- * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
- * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
- * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and documentation are
- * those of the authors and should not be interpreted as representing official
- * policies, either expressed or implied, of Dmitry Vyukov.
- */
-
-//! A mostly lock-free multi-producer, single consumer queue.
-
-// http://www.1024cores.net/home/lock-free-algorithms
-// /queues/non-intrusive-mpsc-node-based-queue
-
-use cast;
-use clone::Clone;
-use kinds::Send;
-use ops::Drop;
-use option::{Option, None, Some};
-use unstable::atomics::{AtomicPtr, Release, Acquire, AcqRel, Relaxed};
-use unstable::sync::UnsafeArc;
-
-pub enum PopResult<T> {
-    /// Some data has been popped
-    Data(T),
-    /// The queue is empty
-    Empty,
-    /// The queue is in an inconsistent state. Popping data should succeed, but
-    /// some pushers have yet to make enough progress in order allow a pop to
-    /// succeed. It is recommended that a pop() occur "in the near future" in
-    /// order to see if the sender has made progress or not
-    Inconsistent,
-}
-
-struct Node<T> {
-    next: AtomicPtr<Node<T>>,
-    value: Option<T>,
-}
-
-struct State<T, P> {
-    head: AtomicPtr<Node<T>>,
-    tail: *mut Node<T>,
-    packet: P,
-}
-
-pub struct Consumer<T, P> {
-    priv state: UnsafeArc<State<T, P>>,
-}
-
-pub struct Producer<T, P> {
-    priv state: UnsafeArc<State<T, P>>,
-}
-
-impl<T: Send, P: Send> Clone for Producer<T, P> {
-    fn clone(&self) -> Producer<T, P> {
-        Producer { state: self.state.clone() }
-    }
-}
-
-pub fn queue<T: Send, P: Send>(p: P) -> (Consumer<T, P>, Producer<T, P>) {
-    unsafe {
-        let (a, b) = UnsafeArc::new2(State::new(p));
-        (Consumer { state: a }, Producer { state: b })
-    }
-}
-
-impl<T> Node<T> {
-    unsafe fn new(v: Option<T>) -> *mut Node<T> {
-        cast::transmute(~Node {
-            next: AtomicPtr::new(0 as *mut Node<T>),
-            value: v,
-        })
-    }
-}
-
-impl<T: Send, P: Send> State<T, P> {
-    pub unsafe fn new(p: P) -> State<T, P> {
-        let stub = Node::new(None);
-        State {
-            head: AtomicPtr::new(stub),
-            tail: stub,
-            packet: p,
-        }
-    }
-
-    unsafe fn push(&mut self, t: T) {
-        let n = Node::new(Some(t));
-        let prev = self.head.swap(n, AcqRel);
-        (*prev).next.store(n, Release);
-    }
-
-    unsafe fn pop(&mut self) -> PopResult<T> {
-        let tail = self.tail;
-        let next = (*tail).next.load(Acquire);
-
-        if !next.is_null() {
-            self.tail = next;
-            assert!((*tail).value.is_none());
-            assert!((*next).value.is_some());
-            let ret = (*next).value.take_unwrap();
-            let _: ~Node<T> = cast::transmute(tail);
-            return Data(ret);
-        }
-
-        if self.head.load(Acquire) == tail {Empty} else {Inconsistent}
-    }
-
-    unsafe fn is_empty(&mut self) -> bool {
-        return (*self.tail).next.load(Acquire).is_null();
-    }
-}
-
-#[unsafe_destructor]
-impl<T: Send, P: Send> Drop for State<T, P> {
-    fn drop(&mut self) {
-        unsafe {
-            let mut cur = self.tail;
-            while !cur.is_null() {
-                let next = (*cur).next.load(Relaxed);
-                let _: ~Node<T> = cast::transmute(cur);
-                cur = next;
-            }
-        }
-    }
-}
-
-impl<T: Send, P: Send> Producer<T, P> {
-    pub fn push(&mut self, value: T) {
-        unsafe { (*self.state.get()).push(value) }
-    }
-    pub fn is_empty(&self) -> bool {
-        unsafe{ (*self.state.get()).is_empty() }
-    }
-    pub unsafe fn packet(&self) -> *mut P {
-        &mut (*self.state.get()).packet as *mut P
-    }
-}
-
-impl<T: Send, P: Send> Consumer<T, P> {
-    pub fn pop(&mut self) -> PopResult<T> {
-        unsafe { (*self.state.get()).pop() }
-    }
-    pub fn casual_pop(&mut self) -> Option<T> {
-        match self.pop() {
-            Data(t) => Some(t),
-            Empty | Inconsistent => None,
-        }
-    }
-    pub unsafe fn packet(&self) -> *mut P {
-        &mut (*self.state.get()).packet as *mut P
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use prelude::*;
-
-    use task;
-    use super::{queue, Data, Empty, Inconsistent};
-
-    #[test]
-    fn test_full() {
-        let (_, mut p) = queue(());
-        p.push(~1);
-        p.push(~2);
-    }
-
-    #[test]
-    fn test() {
-        let nthreads = 8u;
-        let nmsgs = 1000u;
-        let (mut c, p) = queue(());
-        match c.pop() {
-            Empty => {}
-            Inconsistent | Data(..) => fail!()
-        }
-
-        for _ in range(0, nthreads) {
-            let q = p.clone();
-            do task::spawn_sched(task::SingleThreaded) {
-                let mut q = q;
-                for i in range(0, nmsgs) {
-                    q.push(i);
-                }
-            }
-        }
-
-        let mut i = 0u;
-        while i < nthreads * nmsgs {
-            match c.pop() {
-                Empty | Inconsistent => {},
-                Data(_) => { i += 1 }
-            }
-        }
-    }
-}
```
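The push/pop pair above is Vyukov's non-intrusive MPSC algorithm: a producer atomically swaps itself into `head` and only then links the previous node forward, so a consumer can momentarily observe the `Inconsistent` window between the swap and the link. A minimal modern-Rust port of just that core — a hedged sketch, exercised single-threaded here, which collapses `Empty`/`Inconsistent` into `None` for brevity:

```rust
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering::{AcqRel, Acquire, Relaxed, Release}};

struct Node<T> {
    next: AtomicPtr<Node<T>>,
    value: Option<T>, // None only for the stub node
}

struct MpscQueue<T> {
    head: AtomicPtr<Node<T>>, // producers swap new nodes in here
    tail: *mut Node<T>,       // consumer-owned: the current stub
}

impl<T> MpscQueue<T> {
    fn new() -> Self {
        let stub = Box::into_raw(Box::new(Node {
            next: AtomicPtr::new(ptr::null_mut()),
            value: None,
        }));
        MpscQueue { head: AtomicPtr::new(stub), tail: stub }
    }

    fn push(&self, t: T) {
        let n = Box::into_raw(Box::new(Node {
            next: AtomicPtr::new(ptr::null_mut()),
            value: Some(t),
        }));
        // Swap ourselves in, then link the old head to us. A pop that runs
        // between these two steps sees the "inconsistent" window.
        let prev = self.head.swap(n, AcqRel);
        unsafe { (*prev).next.store(n, Release) };
    }

    fn pop(&mut self) -> Option<T> {
        unsafe {
            let tail = self.tail;
            let next = (*tail).next.load(Acquire);
            if next.is_null() {
                return None; // empty, or mid-push; a full port reports both
            }
            self.tail = next;
            drop(Box::from_raw(tail)); // free old stub; `next` is the new stub
            (*next).value.take()
        }
    }
}

impl<T> Drop for MpscQueue<T> {
    fn drop(&mut self) {
        // Walk and free the remaining nodes, as the original Drop impl does.
        let mut cur = self.tail;
        while !cur.is_null() {
            let next = unsafe { (*cur).next.load(Relaxed) };
            drop(unsafe { Box::from_raw(cur) });
            cur = next;
        }
    }
}

fn main() {
    let mut q = MpscQueue::new();
    q.push(1);
    q.push(2);
    assert_eq!(q.pop(), Some(1));
    assert_eq!(q.pop(), Some(2));
    assert_eq!(q.pop(), None);
}
```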
```diff
diff --git a/src/libstd/rt/spsc_queue.rs b/src/libstd/rt/spsc_queue.rs
deleted file mode 100644
index f14533d726a..00000000000
--- a/src/libstd/rt/spsc_queue.rs
+++ /dev/null
@@ -1,296 +0,0 @@
-/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- *    1. Redistributions of source code must retain the above copyright notice,
- *       this list of conditions and the following disclaimer.
- *
- *    2. Redistributions in binary form must reproduce the above copyright
- *       notice, this list of conditions and the following disclaimer in the
- *       documentation and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
- * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
- * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
- * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
- * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
- * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
- * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
- * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
- * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and documentation are
- * those of the authors and should not be interpreted as representing official
- * policies, either expressed or implied, of Dmitry Vyukov.
- */
-
-// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
-
-use cast;
-use kinds::Send;
-use ops::Drop;
-use option::{Some, None, Option};
-use unstable::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release};
-use unstable::sync::UnsafeArc;
-
-// Node within the linked list queue of messages to send
-struct Node<T> {
-    // XXX: this could be an uninitialized T if we're careful enough, and
-    //      that would reduce memory usage (and be a bit faster).
-    //      is it worth it?
-    value: Option<T>,         // nullable for re-use of nodes
-    next: AtomicPtr<Node<T>>, // next node in the queue
-}
-
-// The producer/consumer halves both need access to the `tail` field, and if
-// they both have access to that we may as well just give them both access
-// to this whole structure.
-struct State<T, P> {
-    // consumer fields
-    tail: *mut Node<T>,            // where to pop from
-    tail_prev: AtomicPtr<Node<T>>, // where to pop from
-
-    // producer fields
-    head: *mut Node<T>,      // where to push to
-    first: *mut Node<T>,     // where to get new nodes from
-    tail_copy: *mut Node<T>, // between first/tail
-
-    // Cache maintenance fields. Additions and subtractions are stored
-    // separately in order to allow them to use nonatomic addition/subtraction.
-    cache_bound: uint,
-    cache_additions: AtomicUint,
-    cache_subtractions: AtomicUint,
-
-    packet: P,
-}
-
-pub struct Producer<T, P> {
-    priv state: UnsafeArc<State<T, P>>,
-}
-
-pub struct Consumer<T, P> {
-    priv state: UnsafeArc<State<T, P>>,
-}
-
-pub fn queue<T: Send, P: Send>(bound: uint,
-                               p: P) -> (Consumer<T, P>, Producer<T, P>)
-{
-    let n1 = Node::new();
-    let n2 = Node::new();
-    unsafe { (*n1).next.store(n2, Relaxed) }
-    let state = State {
-        tail: n2,
-        tail_prev: AtomicPtr::new(n1),
-        head: n2,
-        first: n1,
-        tail_copy: n1,
-        cache_bound: bound,
-        cache_additions: AtomicUint::new(0),
-        cache_subtractions: AtomicUint::new(0),
-        packet: p,
-    };
-    let (arc1, arc2) = UnsafeArc::new2(state);
-    (Consumer { state: arc1 }, Producer { state: arc2 })
-}
-
-impl<T: Send> Node<T> {
-    fn new() -> *mut Node<T> {
-        unsafe {
-            cast::transmute(~Node {
-                value: None,
-                next: AtomicPtr::new(0 as *mut Node<T>),
-            })
-        }
-    }
-}
-
-impl<T: Send, P: Send> Producer<T, P> {
-    pub fn push(&mut self, t: T) {
-        unsafe { (*self.state.get()).push(t) }
-    }
-    pub fn is_empty(&self) -> bool {
-        unsafe { (*self.state.get()).is_empty() }
-    }
-    pub unsafe fn packet(&self) -> *mut P {
-        &mut (*self.state.get()).packet as *mut P
-    }
-}
-
-impl<T: Send, P: Send> Consumer<T, P> {
-    pub fn pop(&mut self) -> Option<T> {
-        unsafe { (*self.state.get()).pop() }
-    }
-    pub unsafe fn packet(&self) -> *mut P {
-        &mut (*self.state.get()).packet as *mut P
-    }
-}
-
-impl<T: Send, P: Send> State<T, P> {
-    // remember that there is only one thread executing `push` (and only one
-    // thread executing `pop`)
-    unsafe fn push(&mut self, t: T) {
-        // Acquire a node (which either uses a cached one or allocates a new
-        // one), and then append this to the 'head' node.
-        let n = self.alloc();
-        assert!((*n).value.is_none());
-        (*n).value = Some(t);
-        (*n).next.store(0 as *mut Node<T>, Relaxed);
-        (*self.head).next.store(n, Release);
-        self.head = n;
-    }
-
-    unsafe fn alloc(&mut self) -> *mut Node<T> {
-        // First try to see if we can consume the 'first' node for our uses.
-        // We try to avoid as many atomic instructions as possible here, so
-        // the addition to cache_subtractions is not atomic (plus we're the
-        // only one subtracting from the cache).
-        if self.first != self.tail_copy {
-            if self.cache_bound > 0 {
-                let b = self.cache_subtractions.load(Relaxed);
-                self.cache_subtractions.store(b + 1, Relaxed);
-            }
-            let ret = self.first;
-            self.first = (*ret).next.load(Relaxed);
-            return ret;
-        }
-        // If the above fails, then update our copy of the tail and try
-        // again.
-        self.tail_copy = self.tail_prev.load(Acquire);
-        if self.first != self.tail_copy {
-            if self.cache_bound > 0 {
-                let b = self.cache_subtractions.load(Relaxed);
-                self.cache_subtractions.store(b + 1, Relaxed);
-            }
-            let ret = self.first;
-            self.first = (*ret).next.load(Relaxed);
-            return ret;
-        }
-        // If all of that fails, then we have to allocate a new node
-        // (there's nothing in the node cache).
-        Node::new()
-    }
-
-    // remember that there is only one thread executing `pop` (and only one
-    // thread executing `push`)
-    unsafe fn pop(&mut self) -> Option<T> {
-        // The `tail` node is not actually a used node, but rather a
-        // sentinel from where we should start popping from. Hence, look at
-        // tail's next field and see if we can use it. If we do a pop, then
-        // the current tail node is a candidate for going into the cache.
-        let tail = self.tail;
-        let next = (*tail).next.load(Acquire);
-        if next.is_null() { return None }
-        assert!((*next).value.is_some());
-        let ret = (*next).value.take();
-
-        self.tail = next;
-        if self.cache_bound == 0 {
-            self.tail_prev.store(tail, Release);
-        } else {
-            // XXX: this is dubious with overflow.
-            let additions = self.cache_additions.load(Relaxed);
-            let subtractions = self.cache_subtractions.load(Relaxed);
-            let size = additions - subtractions;
-
-            if size < self.cache_bound {
-                self.tail_prev.store(tail, Release);
-                self.cache_additions.store(additions + 1, Relaxed);
-            } else {
-                (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed);
-                // We have successfully erased all references to 'tail', so
-                // now we can safely drop it.
-                let _: ~Node<T> = cast::transmute(tail);
-            }
-        }
-        return ret;
-    }
-
-    unsafe fn is_empty(&self) -> bool {
-        let tail = self.tail;
-        let next = (*tail).next.load(Acquire);
-        return next.is_null();
-    }
-}
-
-#[unsafe_destructor]
-impl<T: Send, P: Send> Drop for State<T, P> {
-    fn drop(&mut self) {
-        unsafe {
-            let mut cur = self.first;
-            while !cur.is_null() {
-                let next = (*cur).next.load(Relaxed);
-                let _n: ~Node<T> = cast::transmute(cur);
-                cur = next;
-            }
-        }
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use prelude::*;
-    use super::queue;
-    use task;
-
-    #[test]
-    fn smoke() {
-        let (mut c, mut p) = queue(0, ());
-        p.push(1);
-        p.push(2);
-        assert_eq!(c.pop(), Some(1));
-        assert_eq!(c.pop(), Some(2));
-        assert_eq!(c.pop(), None);
-        p.push(3);
-        p.push(4);
-        assert_eq!(c.pop(), Some(3));
-        assert_eq!(c.pop(), Some(4));
-        assert_eq!(c.pop(), None);
-    }
-
-    #[test]
-    fn drop_full() {
-        let (_, mut p) = queue(0, ());
-        p.push(~1);
-        p.push(~2);
-    }
-
-    #[test]
-    fn smoke_bound() {
-        let (mut c, mut p) = queue(1, ());
-        p.push(1);
-        p.push(2);
-        assert_eq!(c.pop(), Some(1));
-        assert_eq!(c.pop(), Some(2));
-        assert_eq!(c.pop(), None);
-        p.push(3);
-        p.push(4);
-        assert_eq!(c.pop(), Some(3));
-        assert_eq!(c.pop(), Some(4));
-        assert_eq!(c.pop(), None);
-    }
-
-    #[test]
-    fn stress() {
-        stress_bound(0);
-        stress_bound(1);
-
-        fn stress_bound(bound: uint) {
-            let (c, mut p) = queue(bound, ());
-            do task::spawn_sched(task::SingleThreaded) {
-                let mut c = c;
-                for _ in range(0, 100000) {
-                    loop {
-                        match c.pop() {
-                            Some(1) => break,
-                            Some(_) => fail!(),
-                            None => {}
-                        }
-                    }
-                }
-            }
-            for _ in range(0, 100000) {
-                p.push(1);
-            }
-        }
-    }
-}
```
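For contrast with the unbounded, node-caching design above, the same single-producer/single-consumer discipline also yields a very small bounded ring buffer: each side owns one index and only ever reads the other side's index with `Acquire`. This is a hedged sketch of that variant (not a port of the linked-list queue), sound only if the caller upholds one producer and one consumer; the demo below is single-threaded, and for brevity the sketch leaks any items still in the ring on drop:

```rust
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicUsize, Ordering::{Acquire, Relaxed, Release}};

struct SpscRing<T> {
    slots: Box<[UnsafeCell<MaybeUninit<T>>]>,
    head: AtomicUsize, // consumer-owned: next index to pop
    tail: AtomicUsize, // producer-owned: next index to push
}

// Safety: only under this sketch's contract of exactly one producer
// thread and one consumer thread.
unsafe impl<T: Send> Sync for SpscRing<T> {}

impl<T> SpscRing<T> {
    fn with_capacity(cap: usize) -> Self {
        let slots = (0..cap.max(1))
            .map(|_| UnsafeCell::new(MaybeUninit::uninit()))
            .collect();
        SpscRing { slots, head: AtomicUsize::new(0), tail: AtomicUsize::new(0) }
    }

    /// Producer side. Returns the value back if the ring is full.
    fn push(&self, t: T) -> Result<(), T> {
        let tail = self.tail.load(Relaxed); // we own tail
        let head = self.head.load(Acquire); // observe consumer progress
        if tail.wrapping_sub(head) == self.slots.len() {
            return Err(t); // full
        }
        let slot = &self.slots[tail % self.slots.len()];
        unsafe { (*slot.get()).write(t) };
        self.tail.store(tail.wrapping_add(1), Release); // publish the write
        Ok(())
    }

    /// Consumer side.
    fn pop(&self) -> Option<T> {
        let head = self.head.load(Relaxed); // we own head
        let tail = self.tail.load(Acquire); // observe producer progress
        if head == tail {
            return None; // empty
        }
        let slot = &self.slots[head % self.slots.len()];
        let t = unsafe { (*slot.get()).assume_init_read() };
        self.head.store(head.wrapping_add(1), Release);
        Some(t)
    }
}

fn main() {
    let q = SpscRing::with_capacity(2);
    assert!(q.push(1).is_ok());
    assert!(q.push(2).is_ok());
    assert!(q.push(3).is_err()); // full
    assert_eq!(q.pop(), Some(1));
    assert_eq!(q.pop(), Some(2));
    assert_eq!(q.pop(), None);
}
```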
