diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2014-06-07 11:13:26 -0700 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2014-06-11 10:00:43 -0700 |
| commit | b1c9ce9c6f0eb7d4a7df1aad6b6799f4b548181c (patch) | |
| tree | 219196013c141f0f2110ac1df21db05433a71e4b /src/libstd/sync | |
| parent | c690191a84728c289a4b3dc17b07934a66311d9d (diff) | |
| download | rust-b1c9ce9c6f0eb7d4a7df1aad6b6799f4b548181c.tar.gz rust-b1c9ce9c6f0eb7d4a7df1aad6b6799f4b548181c.zip | |
sync: Move underneath libstd
This commit is the final step in the libstd facade, #13851. The purpose of this commit is to move libsync underneath the standard library, behind the facade. This will allow core primitives like channels, queues, and atomics to all live in the same location. There were a few notable changes and a few breaking changes as part of this movement: * The `Vec` and `String` types are reexported at the top level of libcollections * The `unreachable!()` macro was copied to libcore * The `std::rt::thread` module was moved to librustrt, but it is still reexported at the same location. * The `std::comm` module was moved to libsync * The `sync::comm` module was moved under `sync::comm`, and renamed to `duplex`. It is now a private module with types/functions being reexported under `sync::comm`. This is a breaking change for any existing users of duplex streams. * All concurrent queues/deques were moved directly under libsync. They are also all marked with #![experimental] for now if they are public. * The `task_pool` and `future` modules no longer live in libsync, but rather live under `std::sync`. They will forever live at this location, but they may move to libsync if the `std::task` module moves as well. [breaking-change]
Diffstat (limited to 'src/libstd/sync')
| -rw-r--r-- | src/libstd/sync/atomics.rs | 234 | ||||
| -rw-r--r-- | src/libstd/sync/deque.rs | 666 | ||||
| -rw-r--r-- | src/libstd/sync/future.rs | 209 | ||||
| -rw-r--r-- | src/libstd/sync/mod.rs | 16 | ||||
| -rw-r--r-- | src/libstd/sync/mpmc_bounded_queue.rs | 220 | ||||
| -rw-r--r-- | src/libstd/sync/mpsc_queue.rs | 208 | ||||
| -rw-r--r-- | src/libstd/sync/spsc_queue.rs | 300 | ||||
| -rw-r--r-- | src/libstd/sync/task_pool.rs | 98 |
8 files changed, 318 insertions, 1633 deletions
diff --git a/src/libstd/sync/atomics.rs b/src/libstd/sync/atomics.rs deleted file mode 100644 index b2565a6a449..00000000000 --- a/src/libstd/sync/atomics.rs +++ /dev/null @@ -1,234 +0,0 @@ -// Copyright 2012-2014 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! Atomic types -//! -//! Atomic types provide primitive shared-memory communication between -//! threads, and are the building blocks of other concurrent -//! types. -//! -//! This module defines atomic versions of a select number of primitive -//! types, including `AtomicBool`, `AtomicInt`, `AtomicUint`, and `AtomicOption`. -//! Atomic types present operations that, when used correctly, synchronize -//! updates between threads. -//! -//! Each method takes an `Ordering` which represents the strength of -//! the memory barrier for that operation. These orderings are the -//! same as [C++11 atomic orderings][1]. -//! -//! [1]: http://gcc.gnu.org/wiki/Atomic/GCCMM/AtomicSync -//! -//! Atomic variables are safe to share between threads (they implement `Share`) -//! but they do not themselves provide the mechanism for sharing. The most -//! common way to share an atomic variable is to put it into an `Arc` (an -//! atomically-reference-counted shared pointer). -//! -//! Most atomic types may be stored in static variables, initialized using -//! the provided static initializers like `INIT_ATOMIC_BOOL`. Atomic statics -//! are often used for lazy global initialization. -//! -//! -//! # Examples -//! -//! A simple spinlock: -//! -//! ``` -//! extern crate sync; -//! -//! use sync::Arc; -//! use std::sync::atomics::{AtomicUint, SeqCst}; -//! use std::task::deschedule; -//! -//! fn main() { -//! let spinlock = Arc::new(AtomicUint::new(1)); -//! -//! let spinlock_clone = spinlock.clone(); -//! spawn(proc() { -//! spinlock_clone.store(0, SeqCst); -//! }); -//! -//! // Wait for the other task to release the lock -//! while spinlock.load(SeqCst) != 0 { -//! // Since tasks may not be preemptive (if they are green threads) -//! // yield to the scheduler to let the other task run. Low level -//! // concurrent code needs to take into account Rust's two threading -//! // models. -//! deschedule(); -//! } -//! } -//! ``` -//! -//! Transferring a heap object with `AtomicOption`: -//! -//! ``` -//! extern crate sync; -//! -//! use sync::Arc; -//! use std::sync::atomics::{AtomicOption, SeqCst}; -//! -//! fn main() { -//! struct BigObject; -//! -//! let shared_big_object = Arc::new(AtomicOption::empty()); -//! -//! let shared_big_object_clone = shared_big_object.clone(); -//! spawn(proc() { -//! let unwrapped_big_object = shared_big_object_clone.take(SeqCst); -//! if unwrapped_big_object.is_some() { -//! println!("got a big object from another task"); -//! } else { -//! println!("other task hasn't sent big object yet"); -//! } -//! }); -//! -//! shared_big_object.swap(box BigObject, SeqCst); -//! } -//! ``` -//! -//! Keep a global count of live tasks: -//! -//! ``` -//! use std::sync::atomics::{AtomicUint, SeqCst, INIT_ATOMIC_UINT}; -//! -//! static mut GLOBAL_TASK_COUNT: AtomicUint = INIT_ATOMIC_UINT; -//! -//! unsafe { -//! let old_task_count = GLOBAL_TASK_COUNT.fetch_add(1, SeqCst); -//! println!("live tasks: {}", old_task_count + 1); -//! } -//! ``` - -use mem; -use ops::Drop; -use option::{Option,Some,None}; -use owned::Box; - -pub use core::atomics::{AtomicBool, AtomicInt, AtomicUint, AtomicPtr}; -pub use core::atomics::{Ordering, Relaxed, Release, Acquire, AcqRel, SeqCst}; -pub use core::atomics::{INIT_ATOMIC_BOOL, INIT_ATOMIC_INT, INIT_ATOMIC_UINT}; -pub use core::atomics::fence; - -/// An atomic, nullable unique pointer -/// -/// This can be used as the concurrency primitive for operations that transfer -/// owned heap objects across tasks. -#[unsafe_no_drop_flag] -pub struct AtomicOption<T> { - p: AtomicUint, -} - -impl<T> AtomicOption<T> { - /// Create a new `AtomicOption` - pub fn new(p: Box<T>) -> AtomicOption<T> { - unsafe { AtomicOption { p: AtomicUint::new(mem::transmute(p)) } } - } - - /// Create a new `AtomicOption` that doesn't contain a value - pub fn empty() -> AtomicOption<T> { AtomicOption { p: AtomicUint::new(0) } } - - /// Store a value, returning the old value - #[inline] - pub fn swap(&self, val: Box<T>, order: Ordering) -> Option<Box<T>> { - let val = unsafe { mem::transmute(val) }; - - match self.p.swap(val, order) { - 0 => None, - n => Some(unsafe { mem::transmute(n) }), - } - } - - /// Remove the value, leaving the `AtomicOption` empty. - #[inline] - pub fn take(&self, order: Ordering) -> Option<Box<T>> { - unsafe { self.swap(mem::transmute(0), order) } - } - - /// Replace an empty value with a non-empty value. - /// - /// Succeeds if the option is `None` and returns `None` if so. If - /// the option was already `Some`, returns `Some` of the rejected - /// value. - #[inline] - pub fn fill(&self, val: Box<T>, order: Ordering) -> Option<Box<T>> { - unsafe { - let val = mem::transmute(val); - let expected = mem::transmute(0); - let oldval = self.p.compare_and_swap(expected, val, order); - if oldval == expected { - None - } else { - Some(mem::transmute(val)) - } - } - } - - /// Returns `true` if the `AtomicOption` is empty. - /// - /// Be careful: The caller must have some external method of ensuring the - /// result does not get invalidated by another task after this returns. - #[inline] - pub fn is_empty(&self, order: Ordering) -> bool { - self.p.load(order) as uint == 0 - } -} - -#[unsafe_destructor] -impl<T> Drop for AtomicOption<T> { - fn drop(&mut self) { - let _ = self.take(SeqCst); - } -} - -#[cfg(test)] -mod test { - use option::*; - use super::*; - - #[test] - fn option_empty() { - let option: AtomicOption<()> = AtomicOption::empty(); - assert!(option.is_empty(SeqCst)); - } - - #[test] - fn option_swap() { - let p = AtomicOption::new(box 1); - let a = box 2; - - let b = p.swap(a, SeqCst); - - assert!(b == Some(box 1)); - assert!(p.take(SeqCst) == Some(box 2)); - } - - #[test] - fn option_take() { - let p = AtomicOption::new(box 1); - - assert!(p.take(SeqCst) == Some(box 1)); - assert!(p.take(SeqCst) == None); - - let p2 = box 2; - p.swap(p2, SeqCst); - - assert!(p.take(SeqCst) == Some(box 2)); - } - - #[test] - fn option_fill() { - let p = AtomicOption::new(box 1); - assert!(p.fill(box 2, SeqCst).is_some()); // should fail; shouldn't leak! - assert!(p.take(SeqCst) == Some(box 1)); - - assert!(p.fill(box 2, SeqCst).is_none()); // shouldn't fail - assert!(p.take(SeqCst) == Some(box 2)); - } -} - diff --git a/src/libstd/sync/deque.rs b/src/libstd/sync/deque.rs deleted file mode 100644 index 39e420685ab..00000000000 --- a/src/libstd/sync/deque.rs +++ /dev/null @@ -1,666 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! A (mostly) lock-free concurrent work-stealing deque -//! -//! This module contains an implementation of the Chase-Lev work stealing deque -//! described in "Dynamic Circular Work-Stealing Deque". The implementation is -//! heavily based on the pseudocode found in the paper. -//! -//! This implementation does not want to have the restriction of a garbage -//! collector for reclamation of buffers, and instead it uses a shared pool of -//! buffers. This shared pool is required for correctness in this -//! implementation. -//! -//! The only lock-synchronized portions of this deque are the buffer allocation -//! and deallocation portions. Otherwise all operations are lock-free. -//! -//! # Example -//! -//! use std::rt::deque::BufferPool; -//! -//! let mut pool = BufferPool::new(); -//! let (mut worker, mut stealer) = pool.deque(); -//! -//! // Only the worker may push/pop -//! worker.push(1); -//! worker.pop(); -//! -//! // Stealers take data from the other end of the deque -//! worker.push(1); -//! stealer.steal(); -//! -//! // Stealers can be cloned to have many stealers stealing in parallel -//! worker.push(1); -//! let mut stealer2 = stealer.clone(); -//! stealer2.steal(); - -// NB: the "buffer pool" strategy is not done for speed, but rather for -// correctness. For more info, see the comment on `swap_buffer` - -// FIXME: all atomic operations in this module use a SeqCst ordering. That is -// probably overkill - -use alloc::arc::Arc; - -use clone::Clone; -use iter::{range, Iterator}; -use kinds::Send; -use kinds::marker; -use mem::{forget, min_align_of, size_of, transmute, overwrite}; -use ops::Drop; -use option::{Option, Some, None}; -use owned::Box; -use ptr::RawPtr; -use ptr; -use rt::heap::{allocate, deallocate}; -use slice::ImmutableVector; -use sync::atomics::{AtomicInt, AtomicPtr, SeqCst}; -use rt::exclusive::Exclusive; -use vec::Vec; - -// Once the queue is less than 1/K full, then it will be downsized. Note that -// the deque requires that this number be less than 2. -static K: int = 4; - -// Minimum number of bits that a buffer size should be. No buffer will resize to -// under this value, and all deques will initially contain a buffer of this -// size. -// -// The size in question is 1 << MIN_BITS -static MIN_BITS: int = 7; - -struct Deque<T> { - bottom: AtomicInt, - top: AtomicInt, - array: AtomicPtr<Buffer<T>>, - pool: BufferPool<T>, -} - -/// Worker half of the work-stealing deque. This worker has exclusive access to -/// one side of the deque, and uses `push` and `pop` method to manipulate it. -/// -/// There may only be one worker per deque. -pub struct Worker<T> { - deque: Arc<Deque<T>>, - noshare: marker::NoShare, -} - -/// The stealing half of the work-stealing deque. Stealers have access to the -/// opposite end of the deque from the worker, and they only have access to the -/// `steal` method. -pub struct Stealer<T> { - deque: Arc<Deque<T>>, - noshare: marker::NoShare, -} - -/// When stealing some data, this is an enumeration of the possible outcomes. -#[deriving(PartialEq, Show)] -pub enum Stolen<T> { - /// The deque was empty at the time of stealing - Empty, - /// The stealer lost the race for stealing data, and a retry may return more - /// data. - Abort, - /// The stealer has successfully stolen some data. - Data(T), -} - -/// The allocation pool for buffers used by work-stealing deques. Right now this -/// structure is used for reclamation of memory after it is no longer in use by -/// deques. -/// -/// This data structure is protected by a mutex, but it is rarely used. Deques -/// will only use this structure when allocating a new buffer or deallocating a -/// previous one. -pub struct BufferPool<T> { - pool: Arc<Exclusive<Vec<Box<Buffer<T>>>>>, -} - -/// An internal buffer used by the chase-lev deque. This structure is actually -/// implemented as a circular buffer, and is used as the intermediate storage of -/// the data in the deque. -/// -/// This type is implemented with *T instead of Vec<T> for two reasons: -/// -/// 1. There is nothing safe about using this buffer. This easily allows the -/// same value to be read twice in to rust, and there is nothing to -/// prevent this. The usage by the deque must ensure that one of the -/// values is forgotten. Furthermore, we only ever want to manually run -/// destructors for values in this buffer (on drop) because the bounds -/// are defined by the deque it's owned by. -/// -/// 2. We can certainly avoid bounds checks using *T instead of Vec<T>, although -/// LLVM is probably pretty good at doing this already. -struct Buffer<T> { - storage: *T, - log_size: int, -} - -impl<T: Send> BufferPool<T> { - /// Allocates a new buffer pool which in turn can be used to allocate new - /// deques. - pub fn new() -> BufferPool<T> { - BufferPool { pool: Arc::new(Exclusive::new(vec!())) } - } - - /// Allocates a new work-stealing deque which will send/receiving memory to - /// and from this buffer pool. - pub fn deque(&self) -> (Worker<T>, Stealer<T>) { - let a = Arc::new(Deque::new(self.clone())); - let b = a.clone(); - (Worker { deque: a, noshare: marker::NoShare }, - Stealer { deque: b, noshare: marker::NoShare }) - } - - fn alloc(&self, bits: int) -> Box<Buffer<T>> { - unsafe { - let mut pool = self.pool.lock(); - match pool.iter().position(|x| x.size() >= (1 << bits)) { - Some(i) => pool.remove(i).unwrap(), - None => box Buffer::new(bits) - } - } - } - - fn free(&self, buf: Box<Buffer<T>>) { - unsafe { - let mut pool = self.pool.lock(); - match pool.iter().position(|v| v.size() > buf.size()) { - Some(i) => pool.insert(i, buf), - None => pool.push(buf), - } - } - } -} - -impl<T: Send> Clone for BufferPool<T> { - fn clone(&self) -> BufferPool<T> { BufferPool { pool: self.pool.clone() } } -} - -impl<T: Send> Worker<T> { - /// Pushes data onto the front of this work queue. - pub fn push(&self, t: T) { - unsafe { self.deque.push(t) } - } - /// Pops data off the front of the work queue, returning `None` on an empty - /// queue. - pub fn pop(&self) -> Option<T> { - unsafe { self.deque.pop() } - } - - /// Gets access to the buffer pool that this worker is attached to. This can - /// be used to create more deques which share the same buffer pool as this - /// deque. - pub fn pool<'a>(&'a self) -> &'a BufferPool<T> { - &self.deque.pool - } -} - -impl<T: Send> Stealer<T> { - /// Steals work off the end of the queue (opposite of the worker's end) - pub fn steal(&self) -> Stolen<T> { - unsafe { self.deque.steal() } - } - - /// Gets access to the buffer pool that this stealer is attached to. This - /// can be used to create more deques which share the same buffer pool as - /// this deque. - pub fn pool<'a>(&'a self) -> &'a BufferPool<T> { - &self.deque.pool - } -} - -impl<T: Send> Clone for Stealer<T> { - fn clone(&self) -> Stealer<T> { - Stealer { deque: self.deque.clone(), noshare: marker::NoShare } - } -} - -// Almost all of this code can be found directly in the paper so I'm not -// personally going to heavily comment what's going on here. - -impl<T: Send> Deque<T> { - fn new(pool: BufferPool<T>) -> Deque<T> { - let buf = pool.alloc(MIN_BITS); - Deque { - bottom: AtomicInt::new(0), - top: AtomicInt::new(0), - array: AtomicPtr::new(unsafe { transmute(buf) }), - pool: pool, - } - } - - unsafe fn push(&self, data: T) { - let mut b = self.bottom.load(SeqCst); - let t = self.top.load(SeqCst); - let mut a = self.array.load(SeqCst); - let size = b - t; - if size >= (*a).size() - 1 { - // You won't find this code in the chase-lev deque paper. This is - // alluded to in a small footnote, however. We always free a buffer - // when growing in order to prevent leaks. - a = self.swap_buffer(b, a, (*a).resize(b, t, 1)); - b = self.bottom.load(SeqCst); - } - (*a).put(b, data); - self.bottom.store(b + 1, SeqCst); - } - - unsafe fn pop(&self) -> Option<T> { - let b = self.bottom.load(SeqCst); - let a = self.array.load(SeqCst); - let b = b - 1; - self.bottom.store(b, SeqCst); - let t = self.top.load(SeqCst); - let size = b - t; - if size < 0 { - self.bottom.store(t, SeqCst); - return None; - } - let data = (*a).get(b); - if size > 0 { - self.maybe_shrink(b, t); - return Some(data); - } - if self.top.compare_and_swap(t, t + 1, SeqCst) == t { - self.bottom.store(t + 1, SeqCst); - return Some(data); - } else { - self.bottom.store(t + 1, SeqCst); - forget(data); // someone else stole this value - return None; - } - } - - unsafe fn steal(&self) -> Stolen<T> { - let t = self.top.load(SeqCst); - let old = self.array.load(SeqCst); - let b = self.bottom.load(SeqCst); - let a = self.array.load(SeqCst); - let size = b - t; - if size <= 0 { return Empty } - if size % (*a).size() == 0 { - if a == old && t == self.top.load(SeqCst) { - return Empty - } - return Abort - } - let data = (*a).get(t); - if self.top.compare_and_swap(t, t + 1, SeqCst) == t { - Data(data) - } else { - forget(data); // someone else stole this value - Abort - } - } - - unsafe fn maybe_shrink(&self, b: int, t: int) { - let a = self.array.load(SeqCst); - if b - t < (*a).size() / K && b - t > (1 << MIN_BITS) { - self.swap_buffer(b, a, (*a).resize(b, t, -1)); - } - } - - // Helper routine not mentioned in the paper which is used in growing and - // shrinking buffers to swap in a new buffer into place. As a bit of a - // recap, the whole point that we need a buffer pool rather than just - // calling malloc/free directly is that stealers can continue using buffers - // after this method has called 'free' on it. The continued usage is simply - // a read followed by a forget, but we must make sure that the memory can - // continue to be read after we flag this buffer for reclamation. - unsafe fn swap_buffer(&self, b: int, old: *mut Buffer<T>, - buf: Buffer<T>) -> *mut Buffer<T> { - let newbuf: *mut Buffer<T> = transmute(box buf); - self.array.store(newbuf, SeqCst); - let ss = (*newbuf).size(); - self.bottom.store(b + ss, SeqCst); - let t = self.top.load(SeqCst); - if self.top.compare_and_swap(t, t + ss, SeqCst) != t { - self.bottom.store(b, SeqCst); - } - self.pool.free(transmute(old)); - return newbuf; - } -} - - -#[unsafe_destructor] -impl<T: Send> Drop for Deque<T> { - fn drop(&mut self) { - let t = self.top.load(SeqCst); - let b = self.bottom.load(SeqCst); - let a = self.array.load(SeqCst); - // Free whatever is leftover in the dequeue, and then move the buffer - // back into the pool. - for i in range(t, b) { - let _: T = unsafe { (*a).get(i) }; - } - self.pool.free(unsafe { transmute(a) }); - } -} - -#[inline] -fn buffer_alloc_size<T>(log_size: int) -> uint { - (1 << log_size) * size_of::<T>() -} - -impl<T: Send> Buffer<T> { - unsafe fn new(log_size: int) -> Buffer<T> { - let size = buffer_alloc_size::<T>(log_size); - let buffer = allocate(size, min_align_of::<T>()); - Buffer { - storage: buffer as *T, - log_size: log_size, - } - } - - fn size(&self) -> int { 1 << self.log_size } - - // Apparently LLVM cannot optimize (foo % (1 << bar)) into this implicitly - fn mask(&self) -> int { (1 << self.log_size) - 1 } - - unsafe fn elem(&self, i: int) -> *T { self.storage.offset(i & self.mask()) } - - // This does not protect against loading duplicate values of the same cell, - // nor does this clear out the contents contained within. Hence, this is a - // very unsafe method which the caller needs to treat specially in case a - // race is lost. - unsafe fn get(&self, i: int) -> T { - ptr::read(self.elem(i)) - } - - // Unsafe because this unsafely overwrites possibly uninitialized or - // initialized data. - unsafe fn put(&self, i: int, t: T) { - overwrite(self.elem(i) as *mut T, t); - } - - // Again, unsafe because this has incredibly dubious ownership violations. - // It is assumed that this buffer is immediately dropped. - unsafe fn resize(&self, b: int, t: int, delta: int) -> Buffer<T> { - let buf = Buffer::new(self.log_size + delta); - for i in range(t, b) { - buf.put(i, self.get(i)); - } - return buf; - } -} - -#[unsafe_destructor] -impl<T: Send> Drop for Buffer<T> { - fn drop(&mut self) { - // It is assumed that all buffers are empty on drop. - let size = buffer_alloc_size::<T>(self.log_size); - unsafe { deallocate(self.storage as *mut u8, size, min_align_of::<T>()) } - } -} - -#[cfg(test)] -mod tests { - use prelude::*; - use super::{Data, BufferPool, Abort, Empty, Worker, Stealer}; - - use mem; - use owned::Box; - use rt::thread::Thread; - use rand; - use rand::Rng; - use sync::atomics::{AtomicBool, INIT_ATOMIC_BOOL, SeqCst, - AtomicUint, INIT_ATOMIC_UINT}; - use vec; - - #[test] - fn smoke() { - let pool = BufferPool::new(); - let (w, s) = pool.deque(); - assert_eq!(w.pop(), None); - assert_eq!(s.steal(), Empty); - w.push(1); - assert_eq!(w.pop(), Some(1)); - w.push(1); - assert_eq!(s.steal(), Data(1)); - w.push(1); - assert_eq!(s.clone().steal(), Data(1)); - } - - #[test] - fn stealpush() { - static AMT: int = 100000; - let pool = BufferPool::<int>::new(); - let (w, s) = pool.deque(); - let t = Thread::start(proc() { - let mut left = AMT; - while left > 0 { - match s.steal() { - Data(i) => { - assert_eq!(i, 1); - left -= 1; - } - Abort | Empty => {} - } - } - }); - - for _ in range(0, AMT) { - w.push(1); - } - - t.join(); - } - - #[test] - fn stealpush_large() { - static AMT: int = 100000; - let pool = BufferPool::<(int, int)>::new(); - let (w, s) = pool.deque(); - let t = Thread::start(proc() { - let mut left = AMT; - while left > 0 { - match s.steal() { - Data((1, 10)) => { left -= 1; } - Data(..) => fail!(), - Abort | Empty => {} - } - } - }); - - for _ in range(0, AMT) { - w.push((1, 10)); - } - - t.join(); - } - - fn stampede(w: Worker<Box<int>>, s: Stealer<Box<int>>, - nthreads: int, amt: uint) { - for _ in range(0, amt) { - w.push(box 20); - } - let mut remaining = AtomicUint::new(amt); - let unsafe_remaining: *mut AtomicUint = &mut remaining; - - let threads = range(0, nthreads).map(|_| { - let s = s.clone(); - Thread::start(proc() { - unsafe { - while (*unsafe_remaining).load(SeqCst) > 0 { - match s.steal() { - Data(box 20) => { - (*unsafe_remaining).fetch_sub(1, SeqCst); - } - Data(..) => fail!(), - Abort | Empty => {} - } - } - } - }) - }).collect::<Vec<Thread<()>>>(); - - while remaining.load(SeqCst) > 0 { - match w.pop() { - Some(box 20) => { remaining.fetch_sub(1, SeqCst); } - Some(..) => fail!(), - None => {} - } - } - - for thread in threads.move_iter() { - thread.join(); - } - } - - #[test] - fn run_stampede() { - let pool = BufferPool::<Box<int>>::new(); - let (w, s) = pool.deque(); - stampede(w, s, 8, 10000); - } - - #[test] - fn many_stampede() { - static AMT: uint = 4; - let pool = BufferPool::<Box<int>>::new(); - let threads = range(0, AMT).map(|_| { - let (w, s) = pool.deque(); - Thread::start(proc() { - stampede(w, s, 4, 10000); - }) - }).collect::<Vec<Thread<()>>>(); - - for thread in threads.move_iter() { - thread.join(); - } - } - - #[test] - fn stress() { - static AMT: int = 100000; - static NTHREADS: int = 8; - static mut DONE: AtomicBool = INIT_ATOMIC_BOOL; - static mut HITS: AtomicUint = INIT_ATOMIC_UINT; - let pool = BufferPool::<int>::new(); - let (w, s) = pool.deque(); - - let threads = range(0, NTHREADS).map(|_| { - let s = s.clone(); - Thread::start(proc() { - unsafe { - loop { - match s.steal() { - Data(2) => { HITS.fetch_add(1, SeqCst); } - Data(..) => fail!(), - _ if DONE.load(SeqCst) => break, - _ => {} - } - } - } - }) - }).collect::<Vec<Thread<()>>>(); - - let mut rng = rand::task_rng(); - let mut expected = 0; - while expected < AMT { - if rng.gen_range(0, 3) == 2 { - match w.pop() { - None => {} - Some(2) => unsafe { HITS.fetch_add(1, SeqCst); }, - Some(_) => fail!(), - } - } else { - expected += 1; - w.push(2); - } - } - - unsafe { - while HITS.load(SeqCst) < AMT as uint { - match w.pop() { - None => {} - Some(2) => { HITS.fetch_add(1, SeqCst); }, - Some(_) => fail!(), - } - } - DONE.store(true, SeqCst); - } - - for thread in threads.move_iter() { - thread.join(); - } - - assert_eq!(unsafe { HITS.load(SeqCst) }, expected as uint); - } - - #[test] - #[ignore(cfg(windows))] // apparently windows scheduling is weird? - fn no_starvation() { - static AMT: int = 10000; - static NTHREADS: int = 4; - static mut DONE: AtomicBool = INIT_ATOMIC_BOOL; - let pool = BufferPool::<(int, uint)>::new(); - let (w, s) = pool.deque(); - - let (threads, hits) = vec::unzip(range(0, NTHREADS).map(|_| { - let s = s.clone(); - let unique_box = box AtomicUint::new(0); - let thread_box = unsafe { - *mem::transmute::<&Box<AtomicUint>, **mut AtomicUint>(&unique_box) - }; - (Thread::start(proc() { - unsafe { - loop { - match s.steal() { - Data((1, 2)) => { - (*thread_box).fetch_add(1, SeqCst); - } - Data(..) => fail!(), - _ if DONE.load(SeqCst) => break, - _ => {} - } - } - } - }), unique_box) - })); - - let mut rng = rand::task_rng(); - let mut myhit = false; - let mut iter = 0; - 'outer: loop { - for _ in range(0, rng.gen_range(0, AMT)) { - if !myhit && rng.gen_range(0, 3) == 2 { - match w.pop() { - None => {} - Some((1, 2)) => myhit = true, - Some(_) => fail!(), - } - } else { - w.push((1, 2)); - } - } - iter += 1; - - debug!("loop iteration {}", iter); - for (i, slot) in hits.iter().enumerate() { - let amt = slot.load(SeqCst); - debug!("thread {}: {}", i, amt); - if amt == 0 { continue 'outer; } - } - if myhit { - break - } - } - - unsafe { DONE.store(true, SeqCst); } - - for thread in threads.move_iter() { - thread.join(); - } - } -} diff --git a/src/libstd/sync/future.rs b/src/libstd/sync/future.rs new file mode 100644 index 00000000000..bc748324fcd --- /dev/null +++ b/src/libstd/sync/future.rs @@ -0,0 +1,209 @@ +// Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +/*! + * A type representing values that may be computed concurrently and + * operations for working with them. + * + * # Example + * + * ```rust + * use std::sync::Future; + * # fn fib(n: uint) -> uint {42}; + * # fn make_a_sandwich() {}; + * let mut delayed_fib = Future::spawn(proc() { fib(5000) }); + * make_a_sandwich(); + * println!("fib(5000) = {}", delayed_fib.get()) + * ``` + */ + +#![allow(missing_doc)] + +use core::prelude::*; +use core::mem::replace; + +use comm::{Receiver, channel}; +use task::spawn; + +/// A type encapsulating the result of a computation which may not be complete +pub struct Future<A> { + state: FutureState<A>, +} + +enum FutureState<A> { + Pending(proc():Send -> A), + Evaluating, + Forced(A) +} + +/// Methods on the `future` type +impl<A:Clone> Future<A> { + pub fn get(&mut self) -> A { + //! Get the value of the future. + (*(self.get_ref())).clone() + } +} + +impl<A> Future<A> { + /// Gets the value from this future, forcing evaluation. + pub fn unwrap(mut self) -> A { + self.get_ref(); + let state = replace(&mut self.state, Evaluating); + match state { + Forced(v) => v, + _ => fail!( "Logic error." ), + } + } + + pub fn get_ref<'a>(&'a mut self) -> &'a A { + /*! + * Executes the future's closure and then returns a reference + * to the result. The reference lasts as long as + * the future. + */ + match self.state { + Forced(ref v) => return v, + Evaluating => fail!("Recursive forcing of future!"), + Pending(_) => { + match replace(&mut self.state, Evaluating) { + Forced(_) | Evaluating => fail!("Logic error."), + Pending(f) => { + self.state = Forced(f()); + self.get_ref() + } + } + } + } + } + + pub fn from_value(val: A) -> Future<A> { + /*! + * Create a future from a value. + * + * The value is immediately available and calling `get` later will + * not block. + */ + + Future {state: Forced(val)} + } + + pub fn from_fn(f: proc():Send -> A) -> Future<A> { + /*! + * Create a future from a function. + * + * The first time that the value is requested it will be retrieved by + * calling the function. Note that this function is a local + * function. It is not spawned into another task. + */ + + Future {state: Pending(f)} + } +} + +impl<A:Send> Future<A> { + pub fn from_receiver(rx: Receiver<A>) -> Future<A> { + /*! + * Create a future from a port + * + * The first time that the value is requested the task will block + * waiting for the result to be received on the port. + */ + + Future::from_fn(proc() { + rx.recv() + }) + } + + pub fn spawn(blk: proc():Send -> A) -> Future<A> { + /*! + * Create a future from a unique closure. + * + * The closure will be run in a new task and its result used as the + * value of the future. + */ + + let (tx, rx) = channel(); + + spawn(proc() { + tx.send(blk()); + }); + + Future::from_receiver(rx) + } +} + +#[cfg(test)] +mod test { + use prelude::*; + use sync::Future; + use task; + + #[test] + fn test_from_value() { + let mut f = Future::from_value("snail".to_string()); + assert_eq!(f.get(), "snail".to_string()); + } + + #[test] + fn test_from_receiver() { + let (tx, rx) = channel(); + tx.send("whale".to_string()); + let mut f = Future::from_receiver(rx); + assert_eq!(f.get(), "whale".to_string()); + } + + #[test] + fn test_from_fn() { + let mut f = Future::from_fn(proc() "brail".to_string()); + assert_eq!(f.get(), "brail".to_string()); + } + + #[test] + fn test_interface_get() { + let mut f = Future::from_value("fail".to_string()); + assert_eq!(f.get(), "fail".to_string()); + } + + #[test] + fn test_interface_unwrap() { + let f = Future::from_value("fail".to_string()); + assert_eq!(f.unwrap(), "fail".to_string()); + } + + #[test] + fn test_get_ref_method() { + let mut f = Future::from_value(22); + assert_eq!(*f.get_ref(), 22); + } + + #[test] + fn test_spawn() { + let mut f = Future::spawn(proc() "bale".to_string()); + assert_eq!(f.get(), "bale".to_string()); + } + + #[test] + #[should_fail] + fn test_futurefail() { + let mut f = Future::spawn(proc() fail!()); + let _x: String = f.get(); + } + + #[test] + fn test_sendable_future() { + let expected = "schlorf"; + let f = Future::spawn(proc() { expected }); + task::spawn(proc() { + let mut f = f; + let actual = f.get(); + assert_eq!(actual, expected); + }); + } +} diff --git a/src/libstd/sync/mod.rs b/src/libstd/sync/mod.rs index b2cf427edc8..5f45ce25502 100644 --- a/src/libstd/sync/mod.rs +++ b/src/libstd/sync/mod.rs @@ -15,8 +15,14 @@ //! and/or blocking at all, but rather provide the necessary tools to build //! other types of concurrent primitives. -pub mod atomics; -pub mod deque; -pub mod mpmc_bounded_queue; -pub mod mpsc_queue; -pub mod spsc_queue; +pub use core_sync::{atomics, deque, mpmc_bounded_queue, mpsc_queue, spsc_queue}; +pub use core_sync::{Arc, Weak, Mutex, MutexGuard, Condvar, Barrier}; +pub use core_sync::{RWLock, RWLockReadGuard, RWLockWriteGuard}; +pub use core_sync::{Semaphore, SemaphoreGuard}; +pub use core_sync::one::{Once, ONCE_INIT}; + +pub use self::future::Future; +pub use self::task_pool::TaskPool; + +mod future; +mod task_pool; diff --git a/src/libstd/sync/mpmc_bounded_queue.rs b/src/libstd/sync/mpmc_bounded_queue.rs deleted file mode 100644 index ffad9c1c583..00000000000 --- a/src/libstd/sync/mpmc_bounded_queue.rs +++ /dev/null @@ -1,220 +0,0 @@ -/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED - * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT - * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, - * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE - * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF - * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * The views and conclusions contained in the software and documentation are - * those of the authors and should not be interpreted as representing official - * policies, either expressed or implied, of Dmitry Vyukov. - */ - -#![allow(missing_doc, dead_code)] - -// http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue - -use alloc::arc::Arc; - -use clone::Clone; -use kinds::Send; -use num::next_power_of_two; -use option::{Option, Some, None}; -use sync::atomics::{AtomicUint,Relaxed,Release,Acquire}; -use vec::Vec; -use ty::Unsafe; - -struct Node<T> { - sequence: AtomicUint, - value: Option<T>, -} - -struct State<T> { - pad0: [u8, ..64], - buffer: Vec<Unsafe<Node<T>>>, - mask: uint, - pad1: [u8, ..64], - enqueue_pos: AtomicUint, - pad2: [u8, ..64], - dequeue_pos: AtomicUint, - pad3: [u8, ..64], -} - -pub struct Queue<T> { - state: Arc<State<T>>, -} - -impl<T: Send> State<T> { - fn with_capacity(capacity: uint) -> State<T> { - let capacity = if capacity < 2 || (capacity & (capacity - 1)) != 0 { - if capacity < 2 { - 2u - } else { - // use next power of 2 as capacity - next_power_of_two(capacity) - } - } else { - capacity - }; - let buffer = Vec::from_fn(capacity, |i| { - Unsafe::new(Node { sequence:AtomicUint::new(i), value: None }) - }); - State{ - pad0: [0, ..64], - buffer: buffer, - mask: capacity-1, - pad1: [0, ..64], - enqueue_pos: AtomicUint::new(0), - pad2: [0, ..64], - dequeue_pos: AtomicUint::new(0), - pad3: [0, ..64], - } - } - - fn push(&self, value: T) -> bool { - let mask = self.mask; - let mut pos = self.enqueue_pos.load(Relaxed); - loop { - let node = self.buffer.get(pos & mask); - let seq = unsafe { (*node.get()).sequence.load(Acquire) }; - let diff: int = seq as int - pos as int; - - if diff == 0 { - let enqueue_pos = self.enqueue_pos.compare_and_swap(pos, pos+1, Relaxed); - if enqueue_pos == pos { - unsafe { - (*node.get()).value = Some(value); - (*node.get()).sequence.store(pos+1, Release); - } - break - } else { - pos = enqueue_pos; - } - } else if diff < 0 { - return false - } else { - pos = self.enqueue_pos.load(Relaxed); - } - } - true - } - - fn pop(&self) -> Option<T> { - let mask = self.mask; - let mut pos = self.dequeue_pos.load(Relaxed); - loop { - let node = self.buffer.get(pos & mask); - let seq = unsafe { (*node.get()).sequence.load(Acquire) }; - let diff: int = seq as int - (pos + 1) as int; - if diff == 0 { - let dequeue_pos = self.dequeue_pos.compare_and_swap(pos, pos+1, Relaxed); - if dequeue_pos == pos { - unsafe { - let value = (*node.get()).value.take(); - (*node.get()).sequence.store(pos + mask + 1, Release); - return value - } - } else { - pos = dequeue_pos; - } - } else if diff < 0 { - return None - } else { - pos = self.dequeue_pos.load(Relaxed); - } - } - } -} - -impl<T: Send> Queue<T> { - pub fn with_capacity(capacity: uint) -> Queue<T> { - Queue{ - state: Arc::new(State::with_capacity(capacity)) - } - } - - pub fn push(&self, value: T) -> bool { - self.state.push(value) - } - - pub fn pop(&self) -> Option<T> { - self.state.pop() - } -} - -impl<T: Send> Clone for Queue<T> { - fn clone(&self) -> Queue<T> { - Queue { state: self.state.clone() } - } -} - -#[cfg(test)] -mod tests { - use prelude::*; - use super::Queue; - use native; - - #[test] - fn test() { - let nthreads = 8u; - let nmsgs = 1000u; - let q = Queue::with_capacity(nthreads*nmsgs); - assert_eq!(None, q.pop()); - let (tx, rx) = channel(); - - for _ in range(0, nthreads) { - let q = q.clone(); - let tx = tx.clone(); - native::task::spawn(proc() { - let q = q; - for i in range(0, nmsgs) { - assert!(q.push(i)); - } - tx.send(()); - }); - } - - let mut completion_rxs = vec![]; - for _ in range(0, nthreads) { - let (tx, rx) = channel(); - completion_rxs.push(rx); - let q = q.clone(); - native::task::spawn(proc() { - let q = q; - let mut i = 0u; - loop { - match q.pop() { - None => {}, - Some(_) => { - i += 1; - if i == nmsgs { break } - } - } - } - tx.send(i); - }); - } - - for rx in completion_rxs.mut_iter() { - assert_eq!(nmsgs, rx.recv()); - } - for _ in range(0, nthreads) { - rx.recv(); - } - } -} diff --git a/src/libstd/sync/mpsc_queue.rs b/src/libstd/sync/mpsc_queue.rs deleted file mode 100644 index 4db24e82d37..00000000000 --- a/src/libstd/sync/mpsc_queue.rs +++ /dev/null @@ -1,208 +0,0 @@ -/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED - * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT - * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, - * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE - * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF - * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * The views and conclusions contained in the software and documentation are - * those of the authors and should not be interpreted as representing official - * policies, either expressed or implied, of Dmitry Vyukov. - */ - -//! A mostly lock-free multi-producer, single consumer queue. -//! -//! This module contains an implementation of a concurrent MPSC queue. This -//! queue can be used to share data between tasks, and is also used as the -//! building block of channels in rust. -//! -//! Note that the current implementation of this queue has a caveat of the `pop` -//! method, and see the method for more information about it. Due to this -//! caveat, this queue may not be appropriate for all use-cases. - -// http://www.1024cores.net/home/lock-free-algorithms -// /queues/non-intrusive-mpsc-node-based-queue - -use kinds::Send; -use mem; -use ops::Drop; -use option::{Option, None, Some}; -use owned::Box; -use ptr::RawPtr; -use sync::atomics::{AtomicPtr, Release, Acquire, AcqRel, Relaxed}; -use ty::Unsafe; - -/// A result of the `pop` function. -pub enum PopResult<T> { - /// Some data has been popped - Data(T), - /// The queue is empty - Empty, - /// The queue is in an inconsistent state. Popping data should succeed, but - /// some pushers have yet to make enough progress in order allow a pop to - /// succeed. It is recommended that a pop() occur "in the near future" in - /// order to see if the sender has made progress or not - Inconsistent, -} - -struct Node<T> { - next: AtomicPtr<Node<T>>, - value: Option<T>, -} - -/// The multi-producer single-consumer structure. This is not cloneable, but it -/// may be safely shared so long as it is guaranteed that there is only one -/// popper at a time (many pushers are allowed). -pub struct Queue<T> { - head: AtomicPtr<Node<T>>, - tail: Unsafe<*mut Node<T>>, -} - -impl<T> Node<T> { - unsafe fn new(v: Option<T>) -> *mut Node<T> { - mem::transmute(box Node { - next: AtomicPtr::new(0 as *mut Node<T>), - value: v, - }) - } -} - -impl<T: Send> Queue<T> { - /// Creates a new queue that is safe to share among multiple producers and - /// one consumer. - pub fn new() -> Queue<T> { - let stub = unsafe { Node::new(None) }; - Queue { - head: AtomicPtr::new(stub), - tail: Unsafe::new(stub), - } - } - - /// Pushes a new value onto this queue. - pub fn push(&self, t: T) { - unsafe { - let n = Node::new(Some(t)); - let prev = self.head.swap(n, AcqRel); - (*prev).next.store(n, Release); - } - } - - /// Pops some data from this queue. - /// - /// Note that the current implementation means that this function cannot - /// return `Option<T>`. It is possible for this queue to be in an - /// inconsistent state where many pushes have succeeded and completely - /// finished, but pops cannot return `Some(t)`. This inconsistent state - /// happens when a pusher is pre-empted at an inopportune moment. - /// - /// This inconsistent state means that this queue does indeed have data, but - /// it does not currently have access to it at this time. - pub fn pop(&self) -> PopResult<T> { - unsafe { - let tail = *self.tail.get(); - let next = (*tail).next.load(Acquire); - - if !next.is_null() { - *self.tail.get() = next; - assert!((*tail).value.is_none()); - assert!((*next).value.is_some()); - let ret = (*next).value.take_unwrap(); - let _: Box<Node<T>> = mem::transmute(tail); - return Data(ret); - } - - if self.head.load(Acquire) == tail {Empty} else {Inconsistent} - } - } - - /// Attempts to pop data from this queue, but doesn't attempt too hard. This - /// will canonicalize inconsistent states to a `None` value. - pub fn casual_pop(&self) -> Option<T> { - match self.pop() { - Data(t) => Some(t), - Empty | Inconsistent => None, - } - } -} - -#[unsafe_destructor] -impl<T: Send> Drop for Queue<T> { - fn drop(&mut self) { - unsafe { - let mut cur = *self.tail.get(); - while !cur.is_null() { - let next = (*cur).next.load(Relaxed); - let _: Box<Node<T>> = mem::transmute(cur); - cur = next; - } - } - } -} - -#[cfg(test)] -mod tests { - use prelude::*; - - use alloc::arc::Arc; - - use native; - use super::{Queue, Data, Empty, Inconsistent}; - - #[test] - fn test_full() { - let q = Queue::new(); - q.push(box 1); - q.push(box 2); - } - - #[test] - fn test() { - let nthreads = 8u; - let nmsgs = 1000u; - let q = Queue::new(); - match q.pop() { - Empty => {} - Inconsistent | Data(..) => fail!() - } - let (tx, rx) = channel(); - let q = Arc::new(q); - - for _ in range(0, nthreads) { - let tx = tx.clone(); - let q = q.clone(); - native::task::spawn(proc() { - for i in range(0, nmsgs) { - q.push(i); - } - tx.send(()); - }); - } - - let mut i = 0u; - while i < nthreads * nmsgs { - match q.pop() { - Empty | Inconsistent => {}, - Data(_) => { i += 1 } - } - } - drop(tx); - for _ in range(0, nthreads) { - rx.recv(); - } - } -} diff --git a/src/libstd/sync/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs deleted file mode 100644 index fb515c9db6e..00000000000 --- a/src/libstd/sync/spsc_queue.rs +++ /dev/null @@ -1,300 +0,0 @@ -/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED - * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT - * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, - * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE - * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF - * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * The views and conclusions contained in the software and documentation are - * those of the authors and should not be interpreted as representing official - * policies, either expressed or implied, of Dmitry Vyukov. - */ - -// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue - -//! A single-producer single-consumer concurrent queue -//! -//! This module contains the implementation of an SPSC queue which can be used -//! concurrently between two tasks. This data structure is safe to use and -//! enforces the semantics that there is one pusher and one popper. - -use kinds::Send; -use mem; -use ops::Drop; -use option::{Some, None, Option}; -use owned::Box; -use ptr::RawPtr; -use sync::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release}; -use ty::Unsafe; - -// Node within the linked list queue of messages to send -struct Node<T> { - // FIXME: this could be an uninitialized T if we're careful enough, and - // that would reduce memory usage (and be a bit faster). - // is it worth it? - value: Option<T>, // nullable for re-use of nodes - next: AtomicPtr<Node<T>>, // next node in the queue -} - -/// The single-producer single-consumer queue. This structure is not cloneable, -/// but it can be safely shared in an Arc if it is guaranteed that there -/// is only one popper and one pusher touching the queue at any one point in -/// time. -pub struct Queue<T> { - // consumer fields - tail: Unsafe<*mut Node<T>>, // where to pop from - tail_prev: AtomicPtr<Node<T>>, // where to pop from - - // producer fields - head: Unsafe<*mut Node<T>>, // where to push to - first: Unsafe<*mut Node<T>>, // where to get new nodes from - tail_copy: Unsafe<*mut Node<T>>, // between first/tail - - // Cache maintenance fields. Additions and subtractions are stored - // separately in order to allow them to use nonatomic addition/subtraction. - cache_bound: uint, - cache_additions: AtomicUint, - cache_subtractions: AtomicUint, -} - -impl<T: Send> Node<T> { - fn new() -> *mut Node<T> { - unsafe { - mem::transmute(box Node { - value: None, - next: AtomicPtr::new(0 as *mut Node<T>), - }) - } - } -} - -impl<T: Send> Queue<T> { - /// Creates a new queue. The producer returned is connected to the consumer - /// to push all data to the consumer. - /// - /// # Arguments - /// - /// * `bound` - This queue implementation is implemented with a linked - /// list, and this means that a push is always a malloc. In - /// order to amortize this cost, an internal cache of nodes is - /// maintained to prevent a malloc from always being - /// necessary. This bound is the limit on the size of the - /// cache (if desired). If the value is 0, then the cache has - /// no bound. Otherwise, the cache will never grow larger than - /// `bound` (although the queue itself could be much larger. - pub fn new(bound: uint) -> Queue<T> { - let n1 = Node::new(); - let n2 = Node::new(); - unsafe { (*n1).next.store(n2, Relaxed) } - Queue { - tail: Unsafe::new(n2), - tail_prev: AtomicPtr::new(n1), - head: Unsafe::new(n2), - first: Unsafe::new(n1), - tail_copy: Unsafe::new(n1), - cache_bound: bound, - cache_additions: AtomicUint::new(0), - cache_subtractions: AtomicUint::new(0), - } - } - - /// Pushes a new value onto this queue. Note that to use this function - /// safely, it must be externally guaranteed that there is only one pusher. - pub fn push(&self, t: T) { - unsafe { - // Acquire a node (which either uses a cached one or allocates a new - // one), and then append this to the 'head' node. - let n = self.alloc(); - assert!((*n).value.is_none()); - (*n).value = Some(t); - (*n).next.store(0 as *mut Node<T>, Relaxed); - (**self.head.get()).next.store(n, Release); - *self.head.get() = n; - } - } - - unsafe fn alloc(&self) -> *mut Node<T> { - // First try to see if we can consume the 'first' node for our uses. - // We try to avoid as many atomic instructions as possible here, so - // the addition to cache_subtractions is not atomic (plus we're the - // only one subtracting from the cache). - if *self.first.get() != *self.tail_copy.get() { - if self.cache_bound > 0 { - let b = self.cache_subtractions.load(Relaxed); - self.cache_subtractions.store(b + 1, Relaxed); - } - let ret = *self.first.get(); - *self.first.get() = (*ret).next.load(Relaxed); - return ret; - } - // If the above fails, then update our copy of the tail and try - // again. - *self.tail_copy.get() = self.tail_prev.load(Acquire); - if *self.first.get() != *self.tail_copy.get() { - if self.cache_bound > 0 { - let b = self.cache_subtractions.load(Relaxed); - self.cache_subtractions.store(b + 1, Relaxed); - } - let ret = *self.first.get(); - *self.first.get() = (*ret).next.load(Relaxed); - return ret; - } - // If all of that fails, then we have to allocate a new node - // (there's nothing in the node cache). - Node::new() - } - - /// Attempts to pop a value from this queue. Remember that to use this type - /// safely you must ensure that there is only one popper at a time. - pub fn pop(&self) -> Option<T> { - unsafe { - // The `tail` node is not actually a used node, but rather a - // sentinel from where we should start popping from. Hence, look at - // tail's next field and see if we can use it. If we do a pop, then - // the current tail node is a candidate for going into the cache. - let tail = *self.tail.get(); - let next = (*tail).next.load(Acquire); - if next.is_null() { return None } - assert!((*next).value.is_some()); - let ret = (*next).value.take(); - - *self.tail.get() = next; - if self.cache_bound == 0 { - self.tail_prev.store(tail, Release); - } else { - // FIXME: this is dubious with overflow. - let additions = self.cache_additions.load(Relaxed); - let subtractions = self.cache_subtractions.load(Relaxed); - let size = additions - subtractions; - - if size < self.cache_bound { - self.tail_prev.store(tail, Release); - self.cache_additions.store(additions + 1, Relaxed); - } else { - (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed); - // We have successfully erased all references to 'tail', so - // now we can safely drop it. - let _: Box<Node<T>> = mem::transmute(tail); - } - } - return ret; - } - } - - /// Attempts to peek at the head of the queue, returning `None` if the queue - /// has no data currently - pub fn peek<'a>(&'a self) -> Option<&'a mut T> { - // This is essentially the same as above with all the popping bits - // stripped out. - unsafe { - let tail = *self.tail.get(); - let next = (*tail).next.load(Acquire); - if next.is_null() { return None } - return (*next).value.as_mut(); - } - } -} - -#[unsafe_destructor] -impl<T: Send> Drop for Queue<T> { - fn drop(&mut self) { - unsafe { - let mut cur = *self.first.get(); - while !cur.is_null() { - let next = (*cur).next.load(Relaxed); - let _n: Box<Node<T>> = mem::transmute(cur); - cur = next; - } - } - } -} - -#[cfg(test)] -mod test { - use prelude::*; - - use alloc::arc::Arc; - use native; - - use super::Queue; - - #[test] - fn smoke() { - let q = Queue::new(0); - q.push(1); - q.push(2); - assert_eq!(q.pop(), Some(1)); - assert_eq!(q.pop(), Some(2)); - assert_eq!(q.pop(), None); - q.push(3); - q.push(4); - assert_eq!(q.pop(), Some(3)); - assert_eq!(q.pop(), Some(4)); - assert_eq!(q.pop(), None); - } - - #[test] - fn drop_full() { - let q = Queue::new(0); - q.push(box 1); - q.push(box 2); - } - - #[test] - fn smoke_bound() { - let q = Queue::new(1); - q.push(1); - q.push(2); - assert_eq!(q.pop(), Some(1)); - assert_eq!(q.pop(), Some(2)); - assert_eq!(q.pop(), None); - q.push(3); - q.push(4); - assert_eq!(q.pop(), Some(3)); - assert_eq!(q.pop(), Some(4)); - assert_eq!(q.pop(), None); - } - - #[test] - fn stress() { - stress_bound(0); - stress_bound(1); - - fn stress_bound(bound: uint) { - let a = Arc::new(Queue::new(bound)); - let b = a.clone(); - let (tx, rx) = channel(); - native::task::spawn(proc() { - for _ in range(0, 100000) { - loop { - match b.pop() { - Some(1) => break, - Some(_) => fail!(), - None => {} - } - } - } - tx.send(()); - }); - for _ in range(0, 100000) { - a.push(1); - } - rx.recv(); - } - } -} diff --git a/src/libstd/sync/task_pool.rs b/src/libstd/sync/task_pool.rs new file mode 100644 index 00000000000..7667badf0e7 --- /dev/null +++ b/src/libstd/sync/task_pool.rs @@ -0,0 +1,98 @@ +// Copyright 2012 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +#![allow(missing_doc)] + +/// A task pool abstraction. Useful for achieving predictable CPU +/// parallelism. + +use core::prelude::*; + +use task; +use task::spawn; +use vec::Vec; +use comm::{channel, Sender}; + +enum Msg<T> { + Execute(proc(&T):Send), + Quit +} + +pub struct TaskPool<T> { + channels: Vec<Sender<Msg<T>>>, + next_index: uint, +} + +#[unsafe_destructor] +impl<T> Drop for TaskPool<T> { + fn drop(&mut self) { + for channel in self.channels.mut_iter() { + channel.send(Quit); + } + } +} + +impl<T> TaskPool<T> { + /// Spawns a new task pool with `n_tasks` tasks. If the `sched_mode` + /// is None, the tasks run on this scheduler; otherwise, they run on a + /// new scheduler with the given mode. The provided `init_fn_factory` + /// returns a function which, given the index of the task, should return + /// local data to be kept around in that task. + pub fn new(n_tasks: uint, + init_fn_factory: || -> proc(uint):Send -> T) + -> TaskPool<T> { + assert!(n_tasks >= 1); + + let channels = Vec::from_fn(n_tasks, |i| { + let (tx, rx) = channel::<Msg<T>>(); + let init_fn = init_fn_factory(); + + let task_body = proc() { + let local_data = init_fn(i); + loop { + match rx.recv() { + Execute(f) => f(&local_data), + Quit => break + } + } + }; + + // Run on this scheduler. + task::spawn(task_body); + + tx + }); + + return TaskPool { + channels: channels, + next_index: 0, + }; + } + + /// Executes the function `f` on a task in the pool. The function + /// receives a reference to the local data returned by the `init_fn`. + pub fn execute(&mut self, f: proc(&T):Send) { + self.channels.get(self.next_index).send(Execute(f)); + self.next_index += 1; + if self.next_index == self.channels.len() { self.next_index = 0; } + } +} + +#[test] +fn test_task_pool() { + let f: || -> proc(uint):Send -> uint = || { + let g: proc(uint):Send -> uint = proc(i) i; + g + }; + let mut pool = TaskPool::new(4, f); + for _ in range(0, 8) { + pool.execute(proc(i) println!("Hello from thread {}!", *i)); + } +} |
