diff options
Diffstat (limited to 'src/libstd/sync')
| -rw-r--r-- | src/libstd/sync/arc.rs | 153 | ||||
| -rw-r--r-- | src/libstd/sync/atomics.rs | 603 | ||||
| -rw-r--r-- | src/libstd/sync/deque.rs | 661 | ||||
| -rw-r--r-- | src/libstd/sync/mod.rs | 23 | ||||
| -rw-r--r-- | src/libstd/sync/mpmc_bounded_queue.rs | 211 | ||||
| -rw-r--r-- | src/libstd/sync/mpsc_queue.rs | 245 | ||||
| -rw-r--r-- | src/libstd/sync/spsc_queue.rs | 334 |
7 files changed, 2230 insertions, 0 deletions
diff --git a/src/libstd/sync/arc.rs b/src/libstd/sync/arc.rs new file mode 100644 index 00000000000..7632ec6cf29 --- /dev/null +++ b/src/libstd/sync/arc.rs @@ -0,0 +1,153 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Atomically reference counted data +//! +//! This module contains the implementation of an atomically reference counted +//! pointer for the purpose of sharing data between tasks. This is obviously a +//! very unsafe primitive to use, but it has its use cases when implementing +//! concurrent data structures and similar tasks. +//! +//! Great care must be taken to ensure that data races do not arise through the +//! usage of `UnsafeArc`, and this often requires some form of external +//! synchronization. The only guarantee provided to you by this class is that +//! the underlying data will remain valid (not free'd) so long as the reference +//! count is at least one. + +use cast; +use clone::Clone; +use kinds::Send; +use ops::Drop; +use ptr::RawPtr; +use sync::atomics::{AtomicUint, SeqCst, Relaxed, Acquire}; +use vec; + +/// An atomically reference counted pointer. +/// +/// Enforces no shared-memory safety. +#[unsafe_no_drop_flag] +pub struct UnsafeArc<T> { + priv data: *mut ArcData<T>, +} + +struct ArcData<T> { + count: AtomicUint, + data: T, +} + +unsafe fn new_inner<T: Send>(data: T, refcount: uint) -> *mut ArcData<T> { + let data = ~ArcData { count: AtomicUint::new(refcount), data: data }; + cast::transmute(data) +} + +impl<T: Send> UnsafeArc<T> { + /// Creates a new `UnsafeArc` which wraps the given data. 
+ pub fn new(data: T) -> UnsafeArc<T> { + unsafe { UnsafeArc { data: new_inner(data, 1) } } + } + + /// As new(), but returns an extra pre-cloned handle. + pub fn new2(data: T) -> (UnsafeArc<T>, UnsafeArc<T>) { + unsafe { + let ptr = new_inner(data, 2); + (UnsafeArc { data: ptr }, UnsafeArc { data: ptr }) + } + } + + /// As new(), but returns a vector of as many pre-cloned handles as + /// requested. + pub fn newN(data: T, num_handles: uint) -> ~[UnsafeArc<T>] { + unsafe { + if num_handles == 0 { + ~[] // need to free data here + } else { + let ptr = new_inner(data, num_handles); + vec::from_fn(num_handles, |_| UnsafeArc { data: ptr }) + } + } + } + + /// Gets a pointer to the inner shared data. Note that care must be taken to + /// ensure that the outer `UnsafeArc` does not fall out of scope while this + /// pointer is in use, otherwise it could possibly contain a use-after-free. + #[inline] + pub fn get(&self) -> *mut T { + unsafe { + assert!((*self.data).count.load(Relaxed) > 0); + return &mut (*self.data).data as *mut T; + } + } + + /// Gets an immutable pointer to the inner shared data. This has the same + /// caveats as the `get` method. + #[inline] + pub fn get_immut(&self) -> *T { + unsafe { + assert!((*self.data).count.load(Relaxed) > 0); + return &(*self.data).data as *T; + } + } +} + +impl<T: Send> Clone for UnsafeArc<T> { + fn clone(&self) -> UnsafeArc<T> { + unsafe { + // This barrier might be unnecessary, but I'm not sure... + let old_count = (*self.data).count.fetch_add(1, Acquire); + assert!(old_count >= 1); + return UnsafeArc { data: self.data }; + } + } +} + +#[unsafe_destructor] +impl<T> Drop for UnsafeArc<T>{ + fn drop(&mut self) { + unsafe { + // Happens when destructing an unwrapper's handle and from + // `#[unsafe_no_drop_flag]` + if self.data.is_null() { + return + } + // Must be acquire+release, not just release, to make sure this + // doesn't get reordered to after the unwrapper pointer load. 
+ let old_count = (*self.data).count.fetch_sub(1, SeqCst); + assert!(old_count >= 1); + if old_count == 1 { + let _: ~ArcData<T> = cast::transmute(self.data); + } + } + } +} + +#[cfg(test)] +mod tests { + use prelude::*; + use super::UnsafeArc; + use task; + use mem::size_of; + + #[test] + fn test_size() { + assert_eq!(size_of::<UnsafeArc<[int, ..10]>>(), size_of::<*[int, ..10]>()); + } + + #[test] + fn arclike_newN() { + // Tests that the many-refcounts-at-once constructors don't leak. + let _ = UnsafeArc::new2(~~"hello"); + let x = UnsafeArc::newN(~~"hello", 0); + assert_eq!(x.len(), 0) + let x = UnsafeArc::newN(~~"hello", 1); + assert_eq!(x.len(), 1) + let x = UnsafeArc::newN(~~"hello", 10); + assert_eq!(x.len(), 10) + } +} diff --git a/src/libstd/sync/atomics.rs b/src/libstd/sync/atomics.rs new file mode 100644 index 00000000000..bc9d99c0f37 --- /dev/null +++ b/src/libstd/sync/atomics.rs @@ -0,0 +1,603 @@ +// Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +/*! + * Atomic types + * + * Basic atomic types supporting atomic operations. Each method takes an + * `Ordering` which represents the strength of the memory barrier for that + * operation. These orderings are the same as C++11 atomic orderings + * [http://gcc.gnu.org/wiki/Atomic/GCCMM/AtomicSync] + * + * All atomic types are a single word in size. + */ + +#[allow(missing_doc)]; + +use unstable::intrinsics; +use cast; +use option::{Option,Some,None}; +use libc::c_void; +use ops::Drop; +use util::NonCopyable; + +/** + * A simple atomic flag, that can be set and cleared. 
The most basic atomic type. + */ +pub struct AtomicFlag { + priv v: int, + priv nocopy: NonCopyable +} + +/** + * An atomic boolean type. + */ +pub struct AtomicBool { + priv v: uint, + priv nocopy: NonCopyable +} + +/** + * A signed atomic integer type, supporting basic atomic arithmetic operations + */ +pub struct AtomicInt { + priv v: int, + priv nocopy: NonCopyable +} + +/** + * An unsigned atomic integer type, supporting basic atomic arithmetic operations + */ +pub struct AtomicUint { + priv v: uint, + priv nocopy: NonCopyable +} + +/** + * An unsafe atomic pointer. Only supports basic atomic operations + */ +pub struct AtomicPtr<T> { + priv p: *mut T, + priv nocopy: NonCopyable +} + +/** + * An owned atomic pointer. Ensures that only a single reference to the data is held at any time. + */ +#[unsafe_no_drop_flag] +pub struct AtomicOption<T> { + priv p: *mut c_void +} + +pub enum Ordering { + Relaxed, + Release, + Acquire, + AcqRel, + SeqCst +} + +pub static INIT_ATOMIC_FLAG : AtomicFlag = AtomicFlag { v: 0, nocopy: NonCopyable }; +pub static INIT_ATOMIC_BOOL : AtomicBool = AtomicBool { v: 0, nocopy: NonCopyable }; +pub static INIT_ATOMIC_INT : AtomicInt = AtomicInt { v: 0, nocopy: NonCopyable }; +pub static INIT_ATOMIC_UINT : AtomicUint = AtomicUint { v: 0, nocopy: NonCopyable }; + +impl AtomicFlag { + + pub fn new() -> AtomicFlag { + AtomicFlag { v: 0, nocopy: NonCopyable } + } + + /** + * Clears the atomic flag + */ + #[inline] + pub fn clear(&mut self, order: Ordering) { + unsafe {atomic_store(&mut self.v, 0, order)} + } + + /** + * Sets the flag if it was previously unset, returns the previous value of the + * flag. 
+ */ + #[inline] + pub fn test_and_set(&mut self, order: Ordering) -> bool { + unsafe { atomic_compare_and_swap(&mut self.v, 0, 1, order) > 0 } + } +} + +impl AtomicBool { + pub fn new(v: bool) -> AtomicBool { + AtomicBool { v: if v { 1 } else { 0 }, nocopy: NonCopyable } + } + + #[inline] + pub fn load(&self, order: Ordering) -> bool { + unsafe { atomic_load(&self.v, order) > 0 } + } + + #[inline] + pub fn store(&mut self, val: bool, order: Ordering) { + let val = if val { 1 } else { 0 }; + + unsafe { atomic_store(&mut self.v, val, order); } + } + + #[inline] + pub fn swap(&mut self, val: bool, order: Ordering) -> bool { + let val = if val { 1 } else { 0 }; + + unsafe { atomic_swap(&mut self.v, val, order) > 0 } + } + + #[inline] + pub fn compare_and_swap(&mut self, old: bool, new: bool, order: Ordering) -> bool { + let old = if old { 1 } else { 0 }; + let new = if new { 1 } else { 0 }; + + unsafe { atomic_compare_and_swap(&mut self.v, old, new, order) > 0 } + } + + /// Returns the old value + #[inline] + pub fn fetch_and(&mut self, val: bool, order: Ordering) -> bool { + let val = if val { 1 } else { 0 }; + + unsafe { atomic_and(&mut self.v, val, order) > 0 } + } + + /// Returns the old value + #[inline] + pub fn fetch_nand(&mut self, val: bool, order: Ordering) -> bool { + let val = if val { 1 } else { 0 }; + + unsafe { atomic_nand(&mut self.v, val, order) > 0 } + } + + /// Returns the old value + #[inline] + pub fn fetch_or(&mut self, val: bool, order: Ordering) -> bool { + let val = if val { 1 } else { 0 }; + + unsafe { atomic_or(&mut self.v, val, order) > 0 } + } + + /// Returns the old value + #[inline] + pub fn fetch_xor(&mut self, val: bool, order: Ordering) -> bool { + let val = if val { 1 } else { 0 }; + + unsafe { atomic_xor(&mut self.v, val, order) > 0 } + } +} + +impl AtomicInt { + pub fn new(v: int) -> AtomicInt { + AtomicInt { v:v, nocopy: NonCopyable } + } + + #[inline] + pub fn load(&self, order: Ordering) -> int { + unsafe { atomic_load(&self.v, 
order) } + } + + #[inline] + pub fn store(&mut self, val: int, order: Ordering) { + unsafe { atomic_store(&mut self.v, val, order); } + } + + #[inline] + pub fn swap(&mut self, val: int, order: Ordering) -> int { + unsafe { atomic_swap(&mut self.v, val, order) } + } + + #[inline] + pub fn compare_and_swap(&mut self, old: int, new: int, order: Ordering) -> int { + unsafe { atomic_compare_and_swap(&mut self.v, old, new, order) } + } + + /// Returns the old value (like __sync_fetch_and_add). + #[inline] + pub fn fetch_add(&mut self, val: int, order: Ordering) -> int { + unsafe { atomic_add(&mut self.v, val, order) } + } + + /// Returns the old value (like __sync_fetch_and_sub). + #[inline] + pub fn fetch_sub(&mut self, val: int, order: Ordering) -> int { + unsafe { atomic_sub(&mut self.v, val, order) } + } +} + +impl AtomicUint { + pub fn new(v: uint) -> AtomicUint { + AtomicUint { v:v, nocopy: NonCopyable } + } + + #[inline] + pub fn load(&self, order: Ordering) -> uint { + unsafe { atomic_load(&self.v, order) } + } + + #[inline] + pub fn store(&mut self, val: uint, order: Ordering) { + unsafe { atomic_store(&mut self.v, val, order); } + } + + #[inline] + pub fn swap(&mut self, val: uint, order: Ordering) -> uint { + unsafe { atomic_swap(&mut self.v, val, order) } + } + + #[inline] + pub fn compare_and_swap(&mut self, old: uint, new: uint, order: Ordering) -> uint { + unsafe { atomic_compare_and_swap(&mut self.v, old, new, order) } + } + + /// Returns the old value (like __sync_fetch_and_add). + #[inline] + pub fn fetch_add(&mut self, val: uint, order: Ordering) -> uint { + unsafe { atomic_add(&mut self.v, val, order) } + } + + /// Returns the old value (like __sync_fetch_and_sub).. 
+ #[inline] + pub fn fetch_sub(&mut self, val: uint, order: Ordering) -> uint { + unsafe { atomic_sub(&mut self.v, val, order) } + } +} + +impl<T> AtomicPtr<T> { + pub fn new(p: *mut T) -> AtomicPtr<T> { + AtomicPtr { p:p, nocopy: NonCopyable } + } + + #[inline] + pub fn load(&self, order: Ordering) -> *mut T { + unsafe { atomic_load(&self.p, order) } + } + + #[inline] + pub fn store(&mut self, ptr: *mut T, order: Ordering) { + unsafe { atomic_store(&mut self.p, ptr, order); } + } + + #[inline] + pub fn swap(&mut self, ptr: *mut T, order: Ordering) -> *mut T { + unsafe { atomic_swap(&mut self.p, ptr, order) } + } + + #[inline] + pub fn compare_and_swap(&mut self, old: *mut T, new: *mut T, order: Ordering) -> *mut T { + unsafe { atomic_compare_and_swap(&mut self.p, old, new, order) } + } +} + +impl<T> AtomicOption<T> { + pub fn new(p: ~T) -> AtomicOption<T> { + unsafe { + AtomicOption { + p: cast::transmute(p) + } + } + } + + pub fn empty() -> AtomicOption<T> { + unsafe { + AtomicOption { + p: cast::transmute(0) + } + } + } + + #[inline] + pub fn swap(&mut self, val: ~T, order: Ordering) -> Option<~T> { + unsafe { + let val = cast::transmute(val); + + let p = atomic_swap(&mut self.p, val, order); + let pv : &uint = cast::transmute(&p); + + if *pv == 0 { + None + } else { + Some(cast::transmute(p)) + } + } + } + + #[inline] + pub fn take(&mut self, order: Ordering) -> Option<~T> { + unsafe { + self.swap(cast::transmute(0), order) + } + } + + /// A compare-and-swap. Succeeds if the option is 'None' and returns 'None' + /// if so. If the option was already 'Some', returns 'Some' of the rejected + /// value. 
+ #[inline] + pub fn fill(&mut self, val: ~T, order: Ordering) -> Option<~T> { + unsafe { + let val = cast::transmute(val); + let expected = cast::transmute(0); + let oldval = atomic_compare_and_swap(&mut self.p, expected, val, order); + if oldval == expected { + None + } else { + Some(cast::transmute(val)) + } + } + } + + /// Be careful: The caller must have some external method of ensuring the + /// result does not get invalidated by another task after this returns. + #[inline] + pub fn is_empty(&mut self, order: Ordering) -> bool { + unsafe { atomic_load(&self.p, order) == cast::transmute(0) } + } +} + +#[unsafe_destructor] +impl<T> Drop for AtomicOption<T> { + fn drop(&mut self) { + let _ = self.take(SeqCst); + } +} + +#[inline] +pub unsafe fn atomic_store<T>(dst: &mut T, val: T, order:Ordering) { + let dst = cast::transmute(dst); + let val = cast::transmute(val); + + match order { + Release => intrinsics::atomic_store_rel(dst, val), + Relaxed => intrinsics::atomic_store_relaxed(dst, val), + _ => intrinsics::atomic_store(dst, val) + } +} + +#[inline] +pub unsafe fn atomic_load<T>(dst: &T, order:Ordering) -> T { + let dst = cast::transmute(dst); + + cast::transmute(match order { + Acquire => intrinsics::atomic_load_acq(dst), + Relaxed => intrinsics::atomic_load_relaxed(dst), + _ => intrinsics::atomic_load(dst) + }) +} + +#[inline] +pub unsafe fn atomic_swap<T>(dst: &mut T, val: T, order: Ordering) -> T { + let dst = cast::transmute(dst); + let val = cast::transmute(val); + + cast::transmute(match order { + Acquire => intrinsics::atomic_xchg_acq(dst, val), + Release => intrinsics::atomic_xchg_rel(dst, val), + AcqRel => intrinsics::atomic_xchg_acqrel(dst, val), + Relaxed => intrinsics::atomic_xchg_relaxed(dst, val), + _ => intrinsics::atomic_xchg(dst, val) + }) +} + +/// Returns the old value (like __sync_fetch_and_add). 
+#[inline] +pub unsafe fn atomic_add<T>(dst: &mut T, val: T, order: Ordering) -> T { + let dst = cast::transmute(dst); + let val = cast::transmute(val); + + cast::transmute(match order { + Acquire => intrinsics::atomic_xadd_acq(dst, val), + Release => intrinsics::atomic_xadd_rel(dst, val), + AcqRel => intrinsics::atomic_xadd_acqrel(dst, val), + Relaxed => intrinsics::atomic_xadd_relaxed(dst, val), + _ => intrinsics::atomic_xadd(dst, val) + }) +} + +/// Returns the old value (like __sync_fetch_and_sub). +#[inline] +pub unsafe fn atomic_sub<T>(dst: &mut T, val: T, order: Ordering) -> T { + let dst = cast::transmute(dst); + let val = cast::transmute(val); + + cast::transmute(match order { + Acquire => intrinsics::atomic_xsub_acq(dst, val), + Release => intrinsics::atomic_xsub_rel(dst, val), + AcqRel => intrinsics::atomic_xsub_acqrel(dst, val), + Relaxed => intrinsics::atomic_xsub_relaxed(dst, val), + _ => intrinsics::atomic_xsub(dst, val) + }) +} + +#[inline] +pub unsafe fn atomic_compare_and_swap<T>(dst:&mut T, old:T, new:T, order: Ordering) -> T { + let dst = cast::transmute(dst); + let old = cast::transmute(old); + let new = cast::transmute(new); + + cast::transmute(match order { + Acquire => intrinsics::atomic_cxchg_acq(dst, old, new), + Release => intrinsics::atomic_cxchg_rel(dst, old, new), + AcqRel => intrinsics::atomic_cxchg_acqrel(dst, old, new), + Relaxed => intrinsics::atomic_cxchg_relaxed(dst, old, new), + _ => intrinsics::atomic_cxchg(dst, old, new), + }) +} + +#[inline] +pub unsafe fn atomic_and<T>(dst: &mut T, val: T, order: Ordering) -> T { + let dst = cast::transmute(dst); + let val = cast::transmute(val); + + cast::transmute(match order { + Acquire => intrinsics::atomic_and_acq(dst, val), + Release => intrinsics::atomic_and_rel(dst, val), + AcqRel => intrinsics::atomic_and_acqrel(dst, val), + Relaxed => intrinsics::atomic_and_relaxed(dst, val), + _ => intrinsics::atomic_and(dst, val) + }) +} + + +#[inline] +pub unsafe fn atomic_nand<T>(dst: &mut T, 
val: T, order: Ordering) -> T { + let dst = cast::transmute(dst); + let val = cast::transmute(val); + + cast::transmute(match order { + Acquire => intrinsics::atomic_nand_acq(dst, val), + Release => intrinsics::atomic_nand_rel(dst, val), + AcqRel => intrinsics::atomic_nand_acqrel(dst, val), + Relaxed => intrinsics::atomic_nand_relaxed(dst, val), + _ => intrinsics::atomic_nand(dst, val) + }) +} + + +#[inline] +pub unsafe fn atomic_or<T>(dst: &mut T, val: T, order: Ordering) -> T { + let dst = cast::transmute(dst); + let val = cast::transmute(val); + + cast::transmute(match order { + Acquire => intrinsics::atomic_or_acq(dst, val), + Release => intrinsics::atomic_or_rel(dst, val), + AcqRel => intrinsics::atomic_or_acqrel(dst, val), + Relaxed => intrinsics::atomic_or_relaxed(dst, val), + _ => intrinsics::atomic_or(dst, val) + }) +} + + +#[inline] +pub unsafe fn atomic_xor<T>(dst: &mut T, val: T, order: Ordering) -> T { + let dst = cast::transmute(dst); + let val = cast::transmute(val); + + cast::transmute(match order { + Acquire => intrinsics::atomic_xor_acq(dst, val), + Release => intrinsics::atomic_xor_rel(dst, val), + AcqRel => intrinsics::atomic_xor_acqrel(dst, val), + Relaxed => intrinsics::atomic_xor_relaxed(dst, val), + _ => intrinsics::atomic_xor(dst, val) + }) +} + + +/** + * An atomic fence. + * + * A fence 'A' which has `Release` ordering semantics, synchronizes with a + * fence 'B' with (at least) `Acquire` semantics, if and only if there exist + * atomic operations X and Y, both operating on some atomic object 'M' such + * that A is sequenced before X, Y is sequenced before B and Y observes + * the change to M. This provides a happens-before dependence between A and B. + * + * Atomic operations with `Release` or `Acquire` semantics can also synchronize + * with a fence. 
+ * + * A fence which has `SeqCst` ordering, in addition to having both `Acquire` and + * `Release` semantics, participates in the global program order of the other + * `SeqCst` operations and/or fences. + * + * Accepts `Acquire`, `Release`, `AcqRel` and `SeqCst` orderings. + */ +#[inline] +pub fn fence(order: Ordering) { + unsafe { + match order { + Acquire => intrinsics::atomic_fence_acq(), + Release => intrinsics::atomic_fence_rel(), + AcqRel => intrinsics::atomic_fence_acqrel(), + _ => intrinsics::atomic_fence(), + } + } +} + +#[cfg(test)] +mod test { + use option::*; + use super::*; + + #[test] + fn flag() { + let mut flg = AtomicFlag::new(); + assert!(!flg.test_and_set(SeqCst)); + assert!(flg.test_and_set(SeqCst)); + + flg.clear(SeqCst); + assert!(!flg.test_and_set(SeqCst)); + } + + #[test] + fn option_empty() { + let mut option: AtomicOption<()> = AtomicOption::empty(); + assert!(option.is_empty(SeqCst)); + } + + #[test] + fn option_swap() { + let mut p = AtomicOption::new(~1); + let a = ~2; + + let b = p.swap(a, SeqCst); + + assert_eq!(b, Some(~1)); + assert_eq!(p.take(SeqCst), Some(~2)); + } + + #[test] + fn option_take() { + let mut p = AtomicOption::new(~1); + + assert_eq!(p.take(SeqCst), Some(~1)); + assert_eq!(p.take(SeqCst), None); + + let p2 = ~2; + p.swap(p2, SeqCst); + + assert_eq!(p.take(SeqCst), Some(~2)); + } + + #[test] + fn option_fill() { + let mut p = AtomicOption::new(~1); + assert!(p.fill(~2, SeqCst).is_some()); // should fail; shouldn't leak! 
+ assert_eq!(p.take(SeqCst), Some(~1)); + + assert!(p.fill(~2, SeqCst).is_none()); // shouldn't fail + assert_eq!(p.take(SeqCst), Some(~2)); + } + + #[test] + fn bool_and() { + let mut a = AtomicBool::new(true); + assert_eq!(a.fetch_and(false, SeqCst),true); + assert_eq!(a.load(SeqCst),false); + } + + static mut S_FLAG : AtomicFlag = INIT_ATOMIC_FLAG; + static mut S_BOOL : AtomicBool = INIT_ATOMIC_BOOL; + static mut S_INT : AtomicInt = INIT_ATOMIC_INT; + static mut S_UINT : AtomicUint = INIT_ATOMIC_UINT; + + #[test] + fn static_init() { + unsafe { + assert!(!S_FLAG.test_and_set(SeqCst)); + assert!(!S_BOOL.load(SeqCst)); + assert!(S_INT.load(SeqCst) == 0); + assert!(S_UINT.load(SeqCst) == 0); + } + } +} diff --git a/src/libstd/sync/deque.rs b/src/libstd/sync/deque.rs new file mode 100644 index 00000000000..4d0efcd6ee1 --- /dev/null +++ b/src/libstd/sync/deque.rs @@ -0,0 +1,661 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! A (mostly) lock-free concurrent work-stealing deque +//! +//! This module contains an implementation of the Chase-Lev work stealing deque +//! described in "Dynamic Circular Work-Stealing Deque". The implementation is +//! heavily based on the pseudocode found in the paper. +//! +//! This implementation does not want to have the restriction of a garbage +//! collector for reclamation of buffers, and instead it uses a shared pool of +//! buffers. This shared pool is required for correctness in this +//! implementation. +//! +//! The only lock-synchronized portions of this deque are the buffer allocation +//! 
and deallocation portions. Otherwise all operations are lock-free. +//! +//! # Example +//! +//! use std::sync::deque::BufferPool; +//! +//! let mut pool = BufferPool::new(); +//! let (mut worker, mut stealer) = pool.deque(); +//! +//! // Only the worker may push/pop +//! worker.push(1); +//! worker.pop(); +//! +//! // Stealers take data from the other end of the deque +//! worker.push(1); +//! stealer.steal(); +//! +//! // Stealers can be cloned to have many stealers stealing in parallel +//! worker.push(1); +//! let mut stealer2 = stealer.clone(); +//! stealer2.steal(); + +// NB: the "buffer pool" strategy is not done for speed, but rather for +// correctness. For more info, see the comment on `swap_buffer` + +// XXX: all atomic operations in this module use a SeqCst ordering. That is +// probably overkill + +use cast; +use clone::Clone; +use iter::{range, Iterator}; +use kinds::Send; +use libc; +use mem; +use ops::Drop; +use option::{Option, Some, None}; +use ptr; +use ptr::RawPtr; +use sync::arc::UnsafeArc; +use sync::atomics::{AtomicInt, AtomicPtr, SeqCst}; +use unstable::sync::Exclusive; +use vec::{OwnedVector, ImmutableVector}; + +// Once the queue is less than 1/K full, then it will be downsized. Note that +// the deque requires that this number be greater than 2. +static K: int = 4; + +// Minimum number of bits that a buffer size should be. No buffer will resize to +// under this value, and all deques will initially contain a buffer of this +// size. +// +// The size in question is 1 << MIN_BITS +static MIN_BITS: int = 7; + +struct Deque<T> { + bottom: AtomicInt, + top: AtomicInt, + array: AtomicPtr<Buffer<T>>, + pool: BufferPool<T>, +} + +/// Worker half of the work-stealing deque. This worker has exclusive access to +/// one side of the deque, and uses `push` and `pop` methods to manipulate it. +/// +/// There may only be one worker per deque. +pub struct Worker<T> { + priv deque: UnsafeArc<Deque<T>>, +} + +/// The stealing half of the work-stealing deque. 
Stealers have access to the +/// opposite end of the deque from the worker, and they only have access to the +/// `steal` method. +pub struct Stealer<T> { + priv deque: UnsafeArc<Deque<T>>, +} + +/// When stealing some data, this is an enumeration of the possible outcomes. +#[deriving(Eq)] +pub enum Stolen<T> { + /// The deque was empty at the time of stealing + Empty, + /// The stealer lost the race for stealing data, and a retry may return more + /// data. + Abort, + /// The stealer has successfully stolen some data. + Data(T), +} + +/// The allocation pool for buffers used by work-stealing deques. Right now this +/// structure is used for reclamation of memory after it is no longer in use by +/// deques. +/// +/// This data structure is protected by a mutex, but it is rarely used. Deques +/// will only use this structure when allocating a new buffer or deallocating a +/// previous one. +pub struct BufferPool<T> { + priv pool: Exclusive<~[~Buffer<T>]>, +} + +/// An internal buffer used by the chase-lev deque. This structure is actually +/// implemented as a circular buffer, and is used as the intermediate storage of +/// the data in the deque. +/// +/// This type is implemented with *T instead of ~[T] for two reasons: +/// +/// 1. There is nothing safe about using this buffer. This easily allows the +/// same value to be read twice in to rust, and there is nothing to +/// prevent this. The usage by the deque must ensure that one of the +/// values is forgotten. Furthermore, we only ever want to manually run +/// destructors for values in this buffer (on drop) because the bounds +/// are defined by the deque it's owned by. +/// +/// 2. We can certainly avoid bounds checks using *T instead of ~[T], although +/// LLVM is probably pretty good at doing this already. +struct Buffer<T> { + storage: *T, + log_size: int, +} + +impl<T: Send> BufferPool<T> { + /// Allocates a new buffer pool which in turn can be used to allocate new + /// deques. 
+ pub fn new() -> BufferPool<T> { + BufferPool { pool: Exclusive::new(~[]) } + } + + /// Allocates a new work-stealing deque which will send/receiving memory to + /// and from this buffer pool. + pub fn deque(&mut self) -> (Worker<T>, Stealer<T>) { + let (a, b) = UnsafeArc::new2(Deque::new(self.clone())); + (Worker { deque: a }, Stealer { deque: b }) + } + + fn alloc(&mut self, bits: int) -> ~Buffer<T> { + unsafe { + self.pool.with(|pool| { + match pool.iter().position(|x| x.size() >= (1 << bits)) { + Some(i) => pool.remove(i), + None => ~Buffer::new(bits) + } + }) + } + } + + fn free(&mut self, buf: ~Buffer<T>) { + unsafe { + let mut buf = Some(buf); + self.pool.with(|pool| { + let buf = buf.take_unwrap(); + match pool.iter().position(|v| v.size() > buf.size()) { + Some(i) => pool.insert(i, buf), + None => pool.push(buf), + } + }) + } + } +} + +impl<T: Send> Clone for BufferPool<T> { + fn clone(&self) -> BufferPool<T> { BufferPool { pool: self.pool.clone() } } +} + +impl<T: Send> Worker<T> { + /// Pushes data onto the front of this work queue. + pub fn push(&mut self, t: T) { + unsafe { (*self.deque.get()).push(t) } + } + /// Pops data off the front of the work queue, returning `None` on an empty + /// queue. + pub fn pop(&mut self) -> Option<T> { + unsafe { (*self.deque.get()).pop() } + } + + /// Gets access to the buffer pool that this worker is attached to. This can + /// be used to create more deques which share the same buffer pool as this + /// deque. + pub fn pool<'a>(&'a mut self) -> &'a mut BufferPool<T> { + unsafe { &mut (*self.deque.get()).pool } + } +} + +impl<T: Send> Stealer<T> { + /// Steals work off the end of the queue (opposite of the worker's end) + pub fn steal(&mut self) -> Stolen<T> { + unsafe { (*self.deque.get()).steal() } + } + + /// Gets access to the buffer pool that this stealer is attached to. This + /// can be used to create more deques which share the same buffer pool as + /// this deque. 
+ pub fn pool<'a>(&'a mut self) -> &'a mut BufferPool<T> { + unsafe { &mut (*self.deque.get()).pool } + } +} + +impl<T: Send> Clone for Stealer<T> { + fn clone(&self) -> Stealer<T> { Stealer { deque: self.deque.clone() } } +} + +// Almost all of this code can be found directly in the paper so I'm not +// personally going to heavily comment what's going on here. + +impl<T: Send> Deque<T> { + fn new(mut pool: BufferPool<T>) -> Deque<T> { + let buf = pool.alloc(MIN_BITS); + Deque { + bottom: AtomicInt::new(0), + top: AtomicInt::new(0), + array: AtomicPtr::new(unsafe { cast::transmute(buf) }), + pool: pool, + } + } + + unsafe fn push(&mut self, data: T) { + let mut b = self.bottom.load(SeqCst); + let t = self.top.load(SeqCst); + let mut a = self.array.load(SeqCst); + let size = b - t; + if size >= (*a).size() - 1 { + // You won't find this code in the chase-lev deque paper. This is + // alluded to in a small footnote, however. We always free a buffer + // when growing in order to prevent leaks. 
+ a = self.swap_buffer(b, a, (*a).resize(b, t, 1)); + b = self.bottom.load(SeqCst); + } + (*a).put(b, data); + self.bottom.store(b + 1, SeqCst); + } + + unsafe fn pop(&mut self) -> Option<T> { + let b = self.bottom.load(SeqCst); + let a = self.array.load(SeqCst); + let b = b - 1; + self.bottom.store(b, SeqCst); + let t = self.top.load(SeqCst); + let size = b - t; + if size < 0 { + self.bottom.store(t, SeqCst); + return None; + } + let data = (*a).get(b); + if size > 0 { + self.maybe_shrink(b, t); + return Some(data); + } + if self.top.compare_and_swap(t, t + 1, SeqCst) == t { + self.bottom.store(t + 1, SeqCst); + return Some(data); + } else { + self.bottom.store(t + 1, SeqCst); + cast::forget(data); // someone else stole this value + return None; + } + } + + unsafe fn steal(&mut self) -> Stolen<T> { + let t = self.top.load(SeqCst); + let old = self.array.load(SeqCst); + let b = self.bottom.load(SeqCst); + let a = self.array.load(SeqCst); + let size = b - t; + if size <= 0 { return Empty } + if size % (*a).size() == 0 { + if a == old && t == self.top.load(SeqCst) { + return Empty + } + return Abort + } + let data = (*a).get(t); + if self.top.compare_and_swap(t, t + 1, SeqCst) == t { + Data(data) + } else { + cast::forget(data); // someone else stole this value + Abort + } + } + + unsafe fn maybe_shrink(&mut self, b: int, t: int) { + let a = self.array.load(SeqCst); + if b - t < (*a).size() / K && b - t > (1 << MIN_BITS) { + self.swap_buffer(b, a, (*a).resize(b, t, -1)); + } + } + + // Helper routine not mentioned in the paper which is used in growing and + // shrinking buffers to swap in a new buffer into place. As a bit of a + // recap, the whole point that we need a buffer pool rather than just + // calling malloc/free directly is that stealers can continue using buffers + // after this method has called 'free' on it. 
The continued usage is simply + // a read followed by a forget, but we must make sure that the memory can + // continue to be read after we flag this buffer for reclamation. + unsafe fn swap_buffer(&mut self, b: int, old: *mut Buffer<T>, + buf: Buffer<T>) -> *mut Buffer<T> { + let newbuf: *mut Buffer<T> = cast::transmute(~buf); + self.array.store(newbuf, SeqCst); + let ss = (*newbuf).size(); + self.bottom.store(b + ss, SeqCst); + let t = self.top.load(SeqCst); + if self.top.compare_and_swap(t, t + ss, SeqCst) != t { + self.bottom.store(b, SeqCst); + } + self.pool.free(cast::transmute(old)); + return newbuf; + } +} + + +#[unsafe_destructor] +impl<T: Send> Drop for Deque<T> { + fn drop(&mut self) { + let t = self.top.load(SeqCst); + let b = self.bottom.load(SeqCst); + let a = self.array.load(SeqCst); + // Free whatever is leftover in the dequeue, and then move the buffer + // back into the pool. + for i in range(t, b) { + let _: T = unsafe { (*a).get(i) }; + } + self.pool.free(unsafe { cast::transmute(a) }); + } +} + +impl<T: Send> Buffer<T> { + unsafe fn new(log_size: int) -> Buffer<T> { + let size = (1 << log_size) * mem::size_of::<T>(); + let buffer = libc::malloc(size as libc::size_t); + assert!(!buffer.is_null()); + Buffer { + storage: buffer as *T, + log_size: log_size, + } + } + + fn size(&self) -> int { 1 << self.log_size } + + // Apparently LLVM cannot optimize (foo % (1 << bar)) into this implicitly + fn mask(&self) -> int { (1 << self.log_size) - 1 } + + // This does not protect against loading duplicate values of the same cell, + // nor does this clear out the contents contained within. Hence, this is a + // very unsafe method which the caller needs to treat specially in case a + // race is lost. + unsafe fn get(&self, i: int) -> T { + ptr::read_ptr(self.storage.offset(i & self.mask())) + } + + // Unsafe because this unsafely overwrites possibly uninitialized or + // initialized data. 
+ unsafe fn put(&mut self, i: int, t: T) { + let ptr = self.storage.offset(i & self.mask()); + ptr::copy_nonoverlapping_memory(ptr as *mut T, &t as *T, 1); + cast::forget(t); + } + + // Again, unsafe because this has incredibly dubious ownership violations. + // It is assumed that this buffer is immediately dropped. + unsafe fn resize(&self, b: int, t: int, delta: int) -> Buffer<T> { + let mut buf = Buffer::new(self.log_size + delta); + for i in range(t, b) { + buf.put(i, self.get(i)); + } + return buf; + } +} + +#[unsafe_destructor] +impl<T: Send> Drop for Buffer<T> { + fn drop(&mut self) { + // It is assumed that all buffers are empty on drop. + unsafe { libc::free(self.storage as *libc::c_void) } + } +} + +#[cfg(test)] +mod tests { + use prelude::*; + use super::{Data, BufferPool, Abort, Empty, Worker, Stealer}; + + use cast; + use rt::thread::Thread; + use rand; + use rand::Rng; + use sync::atomics::{AtomicBool, INIT_ATOMIC_BOOL, SeqCst, + AtomicUint, INIT_ATOMIC_UINT}; + use vec; + + #[test] + fn smoke() { + let mut pool = BufferPool::new(); + let (mut w, mut s) = pool.deque(); + assert_eq!(w.pop(), None); + assert_eq!(s.steal(), Empty); + w.push(1); + assert_eq!(w.pop(), Some(1)); + w.push(1); + assert_eq!(s.steal(), Data(1)); + w.push(1); + assert_eq!(s.clone().steal(), Data(1)); + } + + #[test] + fn stealpush() { + static AMT: int = 100000; + let mut pool = BufferPool::<int>::new(); + let (mut w, s) = pool.deque(); + let t = do Thread::start { + let mut s = s; + let mut left = AMT; + while left > 0 { + match s.steal() { + Data(i) => { + assert_eq!(i, 1); + left -= 1; + } + Abort | Empty => {} + } + } + }; + + for _ in range(0, AMT) { + w.push(1); + } + + t.join(); + } + + #[test] + fn stealpush_large() { + static AMT: int = 100000; + let mut pool = BufferPool::<(int, int)>::new(); + let (mut w, s) = pool.deque(); + let t = do Thread::start { + let mut s = s; + let mut left = AMT; + while left > 0 { + match s.steal() { + Data((1, 10)) => { left -= 1; } + 
Data(..) => fail!(), + Abort | Empty => {} + } + } + }; + + for _ in range(0, AMT) { + w.push((1, 10)); + } + + t.join(); + } + + fn stampede(mut w: Worker<~int>, s: Stealer<~int>, + nthreads: int, amt: uint) { + for _ in range(0, amt) { + w.push(~20); + } + let mut remaining = AtomicUint::new(amt); + let unsafe_remaining: *mut AtomicUint = &mut remaining; + + let threads = range(0, nthreads).map(|_| { + let s = s.clone(); + do Thread::start { + unsafe { + let mut s = s; + while (*unsafe_remaining).load(SeqCst) > 0 { + match s.steal() { + Data(~20) => { + (*unsafe_remaining).fetch_sub(1, SeqCst); + } + Data(..) => fail!(), + Abort | Empty => {} + } + } + } + } + }).to_owned_vec(); + + while remaining.load(SeqCst) > 0 { + match w.pop() { + Some(~20) => { remaining.fetch_sub(1, SeqCst); } + Some(..) => fail!(), + None => {} + } + } + + for thread in threads.move_iter() { + thread.join(); + } + } + + #[test] + fn run_stampede() { + let mut pool = BufferPool::<~int>::new(); + let (w, s) = pool.deque(); + stampede(w, s, 8, 10000); + } + + #[test] + fn many_stampede() { + static AMT: uint = 4; + let mut pool = BufferPool::<~int>::new(); + let threads = range(0, AMT).map(|_| { + let (w, s) = pool.deque(); + do Thread::start { + stampede(w, s, 4, 10000); + } + }).to_owned_vec(); + + for thread in threads.move_iter() { + thread.join(); + } + } + + #[test] + fn stress() { + static AMT: int = 100000; + static NTHREADS: int = 8; + static mut DONE: AtomicBool = INIT_ATOMIC_BOOL; + static mut HITS: AtomicUint = INIT_ATOMIC_UINT; + let mut pool = BufferPool::<int>::new(); + let (mut w, s) = pool.deque(); + + let threads = range(0, NTHREADS).map(|_| { + let s = s.clone(); + do Thread::start { + unsafe { + let mut s = s; + loop { + match s.steal() { + Data(2) => { HITS.fetch_add(1, SeqCst); } + Data(..) 
=> fail!(), + _ if DONE.load(SeqCst) => break, + _ => {} + } + } + } + } + }).to_owned_vec(); + + let mut rng = rand::task_rng(); + let mut expected = 0; + while expected < AMT { + if rng.gen_range(0, 3) == 2 { + match w.pop() { + None => {} + Some(2) => unsafe { HITS.fetch_add(1, SeqCst); }, + Some(_) => fail!(), + } + } else { + expected += 1; + w.push(2); + } + } + + unsafe { + while HITS.load(SeqCst) < AMT as uint { + match w.pop() { + None => {} + Some(2) => { HITS.fetch_add(1, SeqCst); }, + Some(_) => fail!(), + } + } + DONE.store(true, SeqCst); + } + + for thread in threads.move_iter() { + thread.join(); + } + + assert_eq!(unsafe { HITS.load(SeqCst) }, expected as uint); + } + + #[test] + #[ignore(cfg(windows))] // apparently windows scheduling is weird? + fn no_starvation() { + static AMT: int = 10000; + static NTHREADS: int = 4; + static mut DONE: AtomicBool = INIT_ATOMIC_BOOL; + let mut pool = BufferPool::<(int, uint)>::new(); + let (mut w, s) = pool.deque(); + + let (threads, hits) = vec::unzip(range(0, NTHREADS).map(|_| { + let s = s.clone(); + let unique_box = ~AtomicUint::new(0); + let thread_box = unsafe { + *cast::transmute::<&~AtomicUint,**mut AtomicUint>(&unique_box) + }; + (do Thread::start { + unsafe { + let mut s = s; + loop { + match s.steal() { + Data((1, 2)) => { + (*thread_box).fetch_add(1, SeqCst); + } + Data(..) 
=> fail!(), + _ if DONE.load(SeqCst) => break, + _ => {} + } + } + } + }, unique_box) + })); + + let mut rng = rand::task_rng(); + let mut myhit = false; + let mut iter = 0; + 'outer: loop { + for _ in range(0, rng.gen_range(0, AMT)) { + if !myhit && rng.gen_range(0, 3) == 2 { + match w.pop() { + None => {} + Some((1, 2)) => myhit = true, + Some(_) => fail!(), + } + } else { + w.push((1, 2)); + } + } + iter += 1; + + debug!("loop iteration {}", iter); + for (i, slot) in hits.iter().enumerate() { + let amt = slot.load(SeqCst); + debug!("thread {}: {}", i, amt); + if amt == 0 { continue 'outer; } + } + if myhit { + break + } + } + + unsafe { DONE.store(true, SeqCst); } + + for thread in threads.move_iter() { + thread.join(); + } + } +} + diff --git a/src/libstd/sync/mod.rs b/src/libstd/sync/mod.rs new file mode 100644 index 00000000000..3213c538152 --- /dev/null +++ b/src/libstd/sync/mod.rs @@ -0,0 +1,23 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Useful synchronization primitives +//! +//! This module contains useful safe and unsafe synchronization primitives. +//! Most of the primitives in this module do not provide any sort of locking +//! and/or blocking at all, but rather provide the necessary tools to build +//! other types of concurrent primitives. 
+ +pub mod arc; +pub mod atomics; +pub mod deque; +pub mod mpmc_bounded_queue; +pub mod mpsc_queue; +pub mod spsc_queue; diff --git a/src/libstd/sync/mpmc_bounded_queue.rs b/src/libstd/sync/mpmc_bounded_queue.rs new file mode 100644 index 00000000000..b623976306d --- /dev/null +++ b/src/libstd/sync/mpmc_bounded_queue.rs @@ -0,0 +1,211 @@ +/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT + * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE + * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and documentation are + * those of the authors and should not be interpreted as representing official + * policies, either expressed or implied, of Dmitry Vyukov. 
+ */ + +#[allow(missing_doc, dead_code)]; + +// http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue + +use clone::Clone; +use kinds::Send; +use num::{Exponential,Algebraic,Round}; +use option::{Option, Some, None}; +use sync::arc::UnsafeArc; +use sync::atomics::{AtomicUint,Relaxed,Release,Acquire}; +use vec; + +struct Node<T> { + sequence: AtomicUint, + value: Option<T>, +} + +struct State<T> { + pad0: [u8, ..64], + buffer: ~[Node<T>], + mask: uint, + pad1: [u8, ..64], + enqueue_pos: AtomicUint, + pad2: [u8, ..64], + dequeue_pos: AtomicUint, + pad3: [u8, ..64], +} + +pub struct Queue<T> { + priv state: UnsafeArc<State<T>>, +} + +impl<T: Send> State<T> { + fn with_capacity(capacity: uint) -> State<T> { + let capacity = if capacity < 2 || (capacity & (capacity - 1)) != 0 { + if capacity < 2 { + 2u + } else { + // use next power of 2 as capacity + 2f64.pow(&((capacity as f64).log2().ceil())) as uint + } + } else { + capacity + }; + let buffer = vec::from_fn(capacity, |i:uint| { + Node{sequence:AtomicUint::new(i),value:None} + }); + State{ + pad0: [0, ..64], + buffer: buffer, + mask: capacity-1, + pad1: [0, ..64], + enqueue_pos: AtomicUint::new(0), + pad2: [0, ..64], + dequeue_pos: AtomicUint::new(0), + pad3: [0, ..64], + } + } + + fn push(&mut self, value: T) -> bool { + let mask = self.mask; + let mut pos = self.enqueue_pos.load(Relaxed); + loop { + let node = &mut self.buffer[pos & mask]; + let seq = node.sequence.load(Acquire); + let diff: int = seq as int - pos as int; + + if diff == 0 { + let enqueue_pos = self.enqueue_pos.compare_and_swap(pos, pos+1, Relaxed); + if enqueue_pos == pos { + node.value = Some(value); + node.sequence.store(pos+1, Release); + break + } else { + pos = enqueue_pos; + } + } else if (diff < 0) { + return false + } else { + pos = self.enqueue_pos.load(Relaxed); + } + } + true + } + + fn pop(&mut self) -> Option<T> { + let mask = self.mask; + let mut pos = self.dequeue_pos.load(Relaxed); + loop { + let node = &mut 
self.buffer[pos & mask]; + let seq = node.sequence.load(Acquire); + let diff: int = seq as int - (pos + 1) as int; + if diff == 0 { + let dequeue_pos = self.dequeue_pos.compare_and_swap(pos, pos+1, Relaxed); + if dequeue_pos == pos { + let value = node.value.take(); + node.sequence.store(pos + mask + 1, Release); + return value + } else { + pos = dequeue_pos; + } + } else if diff < 0 { + return None + } else { + pos = self.dequeue_pos.load(Relaxed); + } + } + } +} + +impl<T: Send> Queue<T> { + pub fn with_capacity(capacity: uint) -> Queue<T> { + Queue{ + state: UnsafeArc::new(State::with_capacity(capacity)) + } + } + + pub fn push(&mut self, value: T) -> bool { + unsafe { (*self.state.get()).push(value) } + } + + pub fn pop(&mut self) -> Option<T> { + unsafe { (*self.state.get()).pop() } + } +} + +impl<T: Send> Clone for Queue<T> { + fn clone(&self) -> Queue<T> { + Queue { + state: self.state.clone() + } + } +} + +#[cfg(test)] +mod tests { + use prelude::*; + use option::*; + use task; + use super::Queue; + + #[test] + fn test() { + let nthreads = 8u; + let nmsgs = 1000u; + let mut q = Queue::with_capacity(nthreads*nmsgs); + assert_eq!(None, q.pop()); + + for _ in range(0, nthreads) { + let q = q.clone(); + do task::spawn_sched(task::SingleThreaded) { + let mut q = q; + for i in range(0, nmsgs) { + assert!(q.push(i)); + } + } + } + + let mut completion_ports = ~[]; + for _ in range(0, nthreads) { + let (completion_port, completion_chan) = Chan::new(); + completion_ports.push(completion_port); + let q = q.clone(); + do task::spawn_sched(task::SingleThreaded) { + let mut q = q; + let mut i = 0u; + loop { + match q.pop() { + None => {}, + Some(_) => { + i += 1; + if i == nmsgs { break } + } + } + } + completion_chan.send(i); + } + } + + for completion_port in completion_ports.mut_iter() { + assert_eq!(nmsgs, completion_port.recv()); + } + } +} diff --git a/src/libstd/sync/mpsc_queue.rs b/src/libstd/sync/mpsc_queue.rs new file mode 100644 index 00000000000..89e56e3fa67 
--- /dev/null +++ b/src/libstd/sync/mpsc_queue.rs @@ -0,0 +1,245 @@ +/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT + * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE + * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and documentation are + * those of the authors and should not be interpreted as representing official + * policies, either expressed or implied, of Dmitry Vyukov. + */ + +//! A mostly lock-free multi-producer, single consumer queue. +//! +//! This module contains an implementation of a concurrent MPSC queue. This +//! queue can be used to share data between tasks, and is also used as the +//! building block of channels in rust. +//! +//! Note that the current implementation of this queue has a caveat of the `pop` +//! 
method, and see the method for more information about it. Due to this +//! caveat, this queue may not be appropriate for all use-cases. + +// http://www.1024cores.net/home/lock-free-algorithms +// /queues/non-intrusive-mpsc-node-based-queue + +use cast; +use clone::Clone; +use kinds::Send; +use ops::Drop; +use option::{Option, None, Some}; +use ptr::RawPtr; +use sync::arc::UnsafeArc; +use sync::atomics::{AtomicPtr, Release, Acquire, AcqRel, Relaxed}; + +/// A result of the `pop` function. +pub enum PopResult<T> { + /// Some data has been popped + Data(T), + /// The queue is empty + Empty, + /// The queue is in an inconsistent state. Popping data should succeed, but + /// some pushers have yet to make enough progress in order to allow a pop to + /// succeed. It is recommended that a pop() occur "in the near future" in + /// order to see if the sender has made progress or not + Inconsistent, +} + +struct Node<T> { + next: AtomicPtr<Node<T>>, + value: Option<T>, +} + +struct State<T, P> { + head: AtomicPtr<Node<T>>, + tail: *mut Node<T>, + packet: P, +} + +/// The consumer half of this concurrent queue. This half is used to receive +/// data from the producers. +pub struct Consumer<T, P> { + priv state: UnsafeArc<State<T, P>>, +} + +/// The production half of the concurrent queue. This handle may be cloned in +/// order to make handles for new producers. +pub struct Producer<T, P> { + priv state: UnsafeArc<State<T, P>>, +} + +impl<T: Send, P: Send> Clone for Producer<T, P> { + fn clone(&self) -> Producer<T, P> { + Producer { state: self.state.clone() } + } +} + +/// Creates a new MPSC queue. The given argument `p` is a user-defined "packet" +/// of information which will be shared by the consumer and the producer which +/// can be re-acquired via the `packet` function. This is helpful when extra +/// state is shared between the producer and consumer, but note that there is no +/// synchronization performed of this data. 
+pub fn queue<T: Send, P: Send>(p: P) -> (Consumer<T, P>, Producer<T, P>) { + unsafe { + let (a, b) = UnsafeArc::new2(State::new(p)); + (Consumer { state: a }, Producer { state: b }) + } +} + +impl<T> Node<T> { + unsafe fn new(v: Option<T>) -> *mut Node<T> { + cast::transmute(~Node { + next: AtomicPtr::new(0 as *mut Node<T>), + value: v, + }) + } +} + +impl<T: Send, P: Send> State<T, P> { + unsafe fn new(p: P) -> State<T, P> { + let stub = Node::new(None); + State { + head: AtomicPtr::new(stub), + tail: stub, + packet: p, + } + } + + unsafe fn push(&mut self, t: T) { + let n = Node::new(Some(t)); + let prev = self.head.swap(n, AcqRel); + (*prev).next.store(n, Release); + } + + unsafe fn pop(&mut self) -> PopResult<T> { + let tail = self.tail; + let next = (*tail).next.load(Acquire); + + if !next.is_null() { + self.tail = next; + assert!((*tail).value.is_none()); + assert!((*next).value.is_some()); + let ret = (*next).value.take_unwrap(); + let _: ~Node<T> = cast::transmute(tail); + return Data(ret); + } + + if self.head.load(Acquire) == tail {Empty} else {Inconsistent} + } +} + +#[unsafe_destructor] +impl<T: Send, P: Send> Drop for State<T, P> { + fn drop(&mut self) { + unsafe { + let mut cur = self.tail; + while !cur.is_null() { + let next = (*cur).next.load(Relaxed); + let _: ~Node<T> = cast::transmute(cur); + cur = next; + } + } + } +} + +impl<T: Send, P: Send> Producer<T, P> { + /// Pushes a new value onto this queue. + pub fn push(&mut self, value: T) { + unsafe { (*self.state.get()).push(value) } + } + /// Gets an unsafe pointer to the user-defined packet shared by the + /// producers and the consumer. Note that care must be taken to ensure that + /// the lifetime of the queue outlives the usage of the returned pointer. + pub unsafe fn packet(&self) -> *mut P { + &mut (*self.state.get()).packet as *mut P + } +} + +impl<T: Send, P: Send> Consumer<T, P> { + /// Pops some data from this queue. 
+ /// + /// Note that the current implementation means that this function cannot + /// return `Option<T>`. It is possible for this queue to be in an + /// inconsistent state where many pushes have succeeded and completely + /// finished, but pops cannot return `Some(t)`. This inconsistent state + /// happens when a pusher is pre-empted at an inopportune moment. + /// + /// This inconsistent state means that this queue does indeed have data, but + /// it does not currently have access to it at this time. + pub fn pop(&mut self) -> PopResult<T> { + unsafe { (*self.state.get()).pop() } + } + /// Attempts to pop data from this queue, but doesn't attempt too hard. This + /// will canonicalize inconsistent states to a `None` value. + pub fn casual_pop(&mut self) -> Option<T> { + match self.pop() { + Data(t) => Some(t), + Empty | Inconsistent => None, + } + } + /// Gets an unsafe pointer to the underlying user-defined packet. See + /// `Producer.packet` for more information. + pub unsafe fn packet(&self) -> *mut P { + &mut (*self.state.get()).packet as *mut P + } +} + +#[cfg(test)] +mod tests { + use prelude::*; + + use task; + use super::{queue, Data, Empty, Inconsistent}; + + #[test] + fn test_full() { + let (_, mut p) = queue(()); + p.push(~1); + p.push(~2); + } + + #[test] + fn test() { + let nthreads = 8u; + let nmsgs = 1000u; + let (mut c, p) = queue(()); + match c.pop() { + Empty => {} + Inconsistent | Data(..) => fail!() + } + + for _ in range(0, nthreads) { + let q = p.clone(); + do task::spawn_sched(task::SingleThreaded) { + let mut q = q; + for i in range(0, nmsgs) { + q.push(i); + } + } + } + + let mut i = 0u; + while i < nthreads * nmsgs { + match c.pop() { + Empty | Inconsistent => {}, + Data(_) => { i += 1 } + } + } + } +} + diff --git a/src/libstd/sync/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs new file mode 100644 index 00000000000..c4abba04659 --- /dev/null +++ b/src/libstd/sync/spsc_queue.rs @@ -0,0 +1,334 @@ +/* Copyright (c) 2010-2011 Dmitry Vyukov. 
All rights reserved. + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT + * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE + * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and documentation are + * those of the authors and should not be interpreted as representing official + * policies, either expressed or implied, of Dmitry Vyukov. + */ + +// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue + +//! A single-producer single-consumer concurrent queue +//! +//! This module contains the implementation of an SPSC queue which can be used +//! concurrently between two tasks. This data structure is safe to use and +//! enforces the semantics that there is one pusher and one popper. 
+ +use cast; +use kinds::Send; +use ops::Drop; +use option::{Some, None, Option}; +use ptr::RawPtr; +use sync::arc::UnsafeArc; +use sync::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release}; + +// Node within the linked list queue of messages to send +struct Node<T> { + // XXX: this could be an uninitialized T if we're careful enough, and + // that would reduce memory usage (and be a bit faster). + // is it worth it? + value: Option<T>, // nullable for re-use of nodes + next: AtomicPtr<Node<T>>, // next node in the queue +} + +// The producer/consumer halves both need access to the `tail` field, and if +// they both have access to that we may as well just give them both access +// to this whole structure. +struct State<T, P> { + // consumer fields + tail: *mut Node<T>, // where to pop from + tail_prev: AtomicPtr<Node<T>>, // where to pop from + + // producer fields + head: *mut Node<T>, // where to push to + first: *mut Node<T>, // where to get new nodes from + tail_copy: *mut Node<T>, // between first/tail + + // Cache maintenance fields. Additions and subtractions are stored + // separately in order to allow them to use nonatomic addition/subtraction. + cache_bound: uint, + cache_additions: AtomicUint, + cache_subtractions: AtomicUint, + + packet: P, +} + +/// Producer half of this queue. This handle is used to push data to the +/// consumer. +pub struct Producer<T, P> { + priv state: UnsafeArc<State<T, P>>, +} + +/// Consumer half of this queue. This handle is used to receive data from the +/// producer. +pub struct Consumer<T, P> { + priv state: UnsafeArc<State<T, P>>, +} + +/// Creates a new queue. The producer returned is connected to the consumer to +/// push all data to the consumer. +/// +/// # Arguments +/// +/// * `bound` - This queue implementation is implemented with a linked list, +/// and this means that a push is always a malloc. 
In order to +/// amortize this cost, an internal cache of nodes is maintained +/// to prevent a malloc from always being necessary. This bound is +/// the limit on the size of the cache (if desired). If the value +/// is 0, then the cache has no bound. Otherwise, the cache will +/// never grow larger than `bound` (although the queue itself +/// could be much larger). +/// +/// * `p` - This is the user-defined packet of data which will also be shared +/// between the producer and consumer. +pub fn queue<T: Send, P: Send>(bound: uint, + p: P) -> (Consumer<T, P>, Producer<T, P>) +{ + let n1 = Node::new(); + let n2 = Node::new(); + unsafe { (*n1).next.store(n2, Relaxed) } + let state = State { + tail: n2, + tail_prev: AtomicPtr::new(n1), + head: n2, + first: n1, + tail_copy: n1, + cache_bound: bound, + cache_additions: AtomicUint::new(0), + cache_subtractions: AtomicUint::new(0), + packet: p, + }; + let (arc1, arc2) = UnsafeArc::new2(state); + (Consumer { state: arc1 }, Producer { state: arc2 }) +} + +impl<T: Send> Node<T> { + fn new() -> *mut Node<T> { + unsafe { + cast::transmute(~Node { + value: None, + next: AtomicPtr::new(0 as *mut Node<T>), + }) + } + } +} + +impl<T: Send, P: Send> Producer<T, P> { + /// Pushes data onto the queue + pub fn push(&mut self, t: T) { + unsafe { (*self.state.get()).push(t) } + } + /// Tests whether the queue is empty. Note that if this function returns + /// `false`, the return value is significant, but if the return value is + /// `true` then almost no meaning can be attached to the return value. + pub fn is_empty(&self) -> bool { + unsafe { (*self.state.get()).is_empty() } + } + /// Acquires an unsafe pointer to the underlying user-defined packet. Note + /// that care must be taken to ensure that the queue outlives the usage of + /// the packet (because it is an unsafe pointer). 
+ pub unsafe fn packet(&self) -> *mut P { + &mut (*self.state.get()).packet as *mut P + } +} + +impl<T: Send, P: Send> Consumer<T, P> { + /// Pops some data from this queue, returning `None` when the queue is + /// empty. + pub fn pop(&mut self) -> Option<T> { + unsafe { (*self.state.get()).pop() } + } + /// Same function as the producer's `packet` method. + pub unsafe fn packet(&self) -> *mut P { + &mut (*self.state.get()).packet as *mut P + } +} + +impl<T: Send, P: Send> State<T, P> { + // remember that there is only one thread executing `push` (and only one + // thread executing `pop`) + unsafe fn push(&mut self, t: T) { + // Acquire a node (which either uses a cached one or allocates a new + // one), and then append this to the 'head' node. + let n = self.alloc(); + assert!((*n).value.is_none()); + (*n).value = Some(t); + (*n).next.store(0 as *mut Node<T>, Relaxed); + (*self.head).next.store(n, Release); + self.head = n; + } + + unsafe fn alloc(&mut self) -> *mut Node<T> { + // First try to see if we can consume the 'first' node for our uses. + // We try to avoid as many atomic instructions as possible here, so + // the addition to cache_subtractions is not atomic (plus we're the + // only one subtracting from the cache). + if self.first != self.tail_copy { + if self.cache_bound > 0 { + let b = self.cache_subtractions.load(Relaxed); + self.cache_subtractions.store(b + 1, Relaxed); + } + let ret = self.first; + self.first = (*ret).next.load(Relaxed); + return ret; + } + // If the above fails, then update our copy of the tail and try + // again. + self.tail_copy = self.tail_prev.load(Acquire); + if self.first != self.tail_copy { + if self.cache_bound > 0 { + let b = self.cache_subtractions.load(Relaxed); + self.cache_subtractions.store(b + 1, Relaxed); + } + let ret = self.first; + self.first = (*ret).next.load(Relaxed); + return ret; + } + // If all of that fails, then we have to allocate a new node + // (there's nothing in the node cache). 
+ Node::new() + } + + // remember that there is only one thread executing `pop` (and only one + // thread executing `push`) + unsafe fn pop(&mut self) -> Option<T> { + // The `tail` node is not actually a used node, but rather a + // sentinel from where we should start popping from. Hence, look at + // tail's next field and see if we can use it. If we do a pop, then + // the current tail node is a candidate for going into the cache. + let tail = self.tail; + let next = (*tail).next.load(Acquire); + if next.is_null() { return None } + assert!((*next).value.is_some()); + let ret = (*next).value.take(); + + self.tail = next; + if self.cache_bound == 0 { + self.tail_prev.store(tail, Release); + } else { + // XXX: this is dubious with overflow. + let additions = self.cache_additions.load(Relaxed); + let subtractions = self.cache_subtractions.load(Relaxed); + let size = additions - subtractions; + + if size < self.cache_bound { + self.tail_prev.store(tail, Release); + self.cache_additions.store(additions + 1, Relaxed); + } else { + (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed); + // We have successfully erased all references to 'tail', so + // now we can safely drop it. 
+ let _: ~Node<T> = cast::transmute(tail); + } + } + return ret; + } + + unsafe fn is_empty(&self) -> bool { + let tail = self.tail; + let next = (*tail).next.load(Acquire); + return next.is_null(); + } +} + +#[unsafe_destructor] +impl<T: Send, P: Send> Drop for State<T, P> { + fn drop(&mut self) { + unsafe { + let mut cur = self.first; + while !cur.is_null() { + let next = (*cur).next.load(Relaxed); + let _n: ~Node<T> = cast::transmute(cur); + cur = next; + } + } + } +} + +#[cfg(test)] +mod test { + use prelude::*; + use super::queue; + use task; + + #[test] + fn smoke() { + let (mut c, mut p) = queue(0, ()); + p.push(1); + p.push(2); + assert_eq!(c.pop(), Some(1)); + assert_eq!(c.pop(), Some(2)); + assert_eq!(c.pop(), None); + p.push(3); + p.push(4); + assert_eq!(c.pop(), Some(3)); + assert_eq!(c.pop(), Some(4)); + assert_eq!(c.pop(), None); + } + + #[test] + fn drop_full() { + let (_, mut p) = queue(0, ()); + p.push(~1); + p.push(~2); + } + + #[test] + fn smoke_bound() { + let (mut c, mut p) = queue(1, ()); + p.push(1); + p.push(2); + assert_eq!(c.pop(), Some(1)); + assert_eq!(c.pop(), Some(2)); + assert_eq!(c.pop(), None); + p.push(3); + p.push(4); + assert_eq!(c.pop(), Some(3)); + assert_eq!(c.pop(), Some(4)); + assert_eq!(c.pop(), None); + } + + #[test] + fn stress() { + stress_bound(0); + stress_bound(1); + + fn stress_bound(bound: uint) { + let (c, mut p) = queue(bound, ()); + do task::spawn_sched(task::SingleThreaded) { + let mut c = c; + for _ in range(0, 100000) { + loop { + match c.pop() { + Some(1) => break, + Some(_) => fail!(), + None => {} + } + } + } + } + for _ in range(0, 100000) { + p.push(1); + } + } + } +} |
