diff options
| author | Aaron Turon <aturon@mozilla.com> | 2014-11-23 12:52:37 -0800 |
|---|---|---|
| committer | Aaron Turon <aturon@mozilla.com> | 2014-11-24 10:51:39 -0800 |
| commit | 985acfdb67d550d0259fcdcfbeed0a86ec3da9d0 (patch) | |
| tree | 0c5c9056f11c6f3f602310e1592345e931676c18 /src/libstd/sync | |
| parent | 54c628cb849ad53b66f0d738dc8c83529a9d08d2 (diff) | |
| download | rust-985acfdb67d550d0259fcdcfbeed0a86ec3da9d0.tar.gz rust-985acfdb67d550d0259fcdcfbeed0a86ec3da9d0.zip | |
Merge libsync into libstd
This patch merges the `libsync` crate into `libstd`, undoing part of the facade. This is in preparation for ultimately merging `librustrt`, as well as the upcoming rewrite of `sync`. Because this removes the `libsync` crate, it is a: [breaking-change] However, all uses of `libsync` should be able to reroute through `std::sync` and `std::comm` instead.
Diffstat (limited to 'src/libstd/sync')
| -rw-r--r-- | src/libstd/sync/atomic.rs | 223 | ||||
| -rw-r--r-- | src/libstd/sync/deque.rs | 663 | ||||
| -rw-r--r-- | src/libstd/sync/lock.rs | 828 | ||||
| -rw-r--r-- | src/libstd/sync/mod.rs | 38 | ||||
| -rw-r--r-- | src/libstd/sync/mpmc_bounded_queue.rs | 219 | ||||
| -rw-r--r-- | src/libstd/sync/mpsc_queue.rs | 210 | ||||
| -rw-r--r-- | src/libstd/sync/mutex.rs | 218 | ||||
| -rw-r--r-- | src/libstd/sync/one.rs | 170 | ||||
| -rw-r--r-- | src/libstd/sync/raw.rs | 1129 | ||||
| -rw-r--r-- | src/libstd/sync/spsc_queue.rs | 385 |
10 files changed, 4076 insertions, 7 deletions
diff --git a/src/libstd/sync/atomic.rs b/src/libstd/sync/atomic.rs new file mode 100644 index 00000000000..2bb55188113 --- /dev/null +++ b/src/libstd/sync/atomic.rs @@ -0,0 +1,223 @@ +// Copyright 2012-2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Atomic types +//! +//! Atomic types provide primitive shared-memory communication between +//! threads, and are the building blocks of other concurrent +//! types. +//! +//! This module defines atomic versions of a select number of primitive +//! types, including `AtomicBool`, `AtomicInt`, `AtomicUint`, and `AtomicOption`. +//! Atomic types present operations that, when used correctly, synchronize +//! updates between threads. +//! +//! Each method takes an `Ordering` which represents the strength of +//! the memory barrier for that operation. These orderings are the +//! same as [C++11 atomic orderings][1]. +//! +//! [1]: http://gcc.gnu.org/wiki/Atomic/GCCMM/AtomicSync +//! +//! Atomic variables are safe to share between threads (they implement `Sync`) +//! but they do not themselves provide the mechanism for sharing. The most +//! common way to share an atomic variable is to put it into an `Arc` (an +//! atomically-reference-counted shared pointer). +//! +//! Most atomic types may be stored in static variables, initialized using +//! the provided static initializers like `INIT_ATOMIC_BOOL`. Atomic statics +//! are often used for lazy global initialization. +//! +//! +//! # Examples +//! +//! A simple spinlock: +//! +//! ``` +//! use std::sync::Arc; +//! use std::sync::atomic::{AtomicUint, SeqCst}; +//! +//! fn main() { +//! let spinlock = Arc::new(AtomicUint::new(1)); +//! +//! let spinlock_clone = spinlock.clone(); +//! spawn(proc() { +//! spinlock_clone.store(0, SeqCst); +//! }); +//! +//! // Wait for the other task to release the lock +//! while spinlock.load(SeqCst) != 0 {} +//! } +//! ``` +//! +//! Transferring a heap object with `AtomicOption`: +//! +//! ``` +//! use std::sync::Arc; +//! use std::sync::atomic::{AtomicOption, SeqCst}; +//! +//! fn main() { +//! struct BigObject; +//! +//! let shared_big_object = Arc::new(AtomicOption::empty()); +//! +//! let shared_big_object_clone = shared_big_object.clone(); +//! spawn(proc() { +//! let unwrapped_big_object = shared_big_object_clone.take(SeqCst); +//! if unwrapped_big_object.is_some() { +//! println!("got a big object from another task"); +//! } else { +//! println!("other task hasn't sent big object yet"); +//! } +//! }); +//! +//! shared_big_object.swap(box BigObject, SeqCst); +//! } +//! ``` +//! +//! Keep a global count of live tasks: +//! +//! ``` +//! use std::sync::atomic::{AtomicUint, SeqCst, INIT_ATOMIC_UINT}; +//! +//! static GLOBAL_TASK_COUNT: AtomicUint = INIT_ATOMIC_UINT; +//! +//! let old_task_count = GLOBAL_TASK_COUNT.fetch_add(1, SeqCst); +//! println!("live tasks: {}", old_task_count + 1); +//! ``` + +#![allow(deprecated)] + +use alloc::boxed::Box; +use core::mem; +use core::prelude::{Send, Drop, None, Option, Some}; + +pub use core::atomic::{AtomicBool, AtomicInt, AtomicUint, AtomicPtr}; +pub use core::atomic::{Ordering, Relaxed, Release, Acquire, AcqRel, SeqCst}; +pub use core::atomic::{INIT_ATOMIC_BOOL, INIT_ATOMIC_INT, INIT_ATOMIC_UINT}; +pub use core::atomic::fence; + +/// An atomic, nullable unique pointer +/// +/// This can be used as the concurrency primitive for operations that transfer +/// owned heap objects across tasks. +#[unsafe_no_drop_flag] +#[deprecated = "no longer used; will eventually be replaced by a higher-level\ + concept like MVar"] +pub struct AtomicOption<T> { + p: AtomicUint, +} + +impl<T: Send> AtomicOption<T> { + /// Create a new `AtomicOption` + pub fn new(p: Box<T>) -> AtomicOption<T> { + unsafe { AtomicOption { p: AtomicUint::new(mem::transmute(p)) } } + } + + /// Create a new `AtomicOption` that doesn't contain a value + pub fn empty() -> AtomicOption<T> { AtomicOption { p: AtomicUint::new(0) } } + + /// Store a value, returning the old value + #[inline] + pub fn swap(&self, val: Box<T>, order: Ordering) -> Option<Box<T>> { + let val = unsafe { mem::transmute(val) }; + + match self.p.swap(val, order) { + 0 => None, + n => Some(unsafe { mem::transmute(n) }), + } + } + + /// Remove the value, leaving the `AtomicOption` empty. + #[inline] + pub fn take(&self, order: Ordering) -> Option<Box<T>> { + unsafe { self.swap(mem::transmute(0u), order) } + } + + /// Replace an empty value with a non-empty value. + /// + /// Succeeds if the option is `None` and returns `None` if so. If + /// the option was already `Some`, returns `Some` of the rejected + /// value. + #[inline] + pub fn fill(&self, val: Box<T>, order: Ordering) -> Option<Box<T>> { + unsafe { + let val = mem::transmute(val); + let expected = mem::transmute(0u); + let oldval = self.p.compare_and_swap(expected, val, order); + if oldval == expected { + None + } else { + Some(mem::transmute(val)) + } + } + } + + /// Returns `true` if the `AtomicOption` is empty. + /// + /// Be careful: The caller must have some external method of ensuring the + /// result does not get invalidated by another task after this returns. + #[inline] + pub fn is_empty(&self, order: Ordering) -> bool { + self.p.load(order) as uint == 0 + } +} + +#[unsafe_destructor] +impl<T: Send> Drop for AtomicOption<T> { + fn drop(&mut self) { + let _ = self.take(SeqCst); + } +} + +#[cfg(test)] +mod test { + use prelude::*; + use super::*; + + #[test] + fn option_empty() { + let option: AtomicOption<()> = AtomicOption::empty(); + assert!(option.is_empty(SeqCst)); + } + + #[test] + fn option_swap() { + let p = AtomicOption::new(box 1i); + let a = box 2i; + + let b = p.swap(a, SeqCst); + + assert!(b == Some(box 1)); + assert!(p.take(SeqCst) == Some(box 2)); + } + + #[test] + fn option_take() { + let p = AtomicOption::new(box 1i); + + assert!(p.take(SeqCst) == Some(box 1)); + assert!(p.take(SeqCst) == None); + + let p2 = box 2i; + p.swap(p2, SeqCst); + + assert!(p.take(SeqCst) == Some(box 2)); + } + + #[test] + fn option_fill() { + let p = AtomicOption::new(box 1i); + assert!(p.fill(box 2i, SeqCst).is_some()); // should fail; shouldn't leak! + assert!(p.take(SeqCst) == Some(box 1)); + + assert!(p.fill(box 2i, SeqCst).is_none()); // shouldn't fail + assert!(p.take(SeqCst) == Some(box 2)); + } +} diff --git a/src/libstd/sync/deque.rs b/src/libstd/sync/deque.rs new file mode 100644 index 00000000000..33f6f77eb62 --- /dev/null +++ b/src/libstd/sync/deque.rs @@ -0,0 +1,663 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! A (mostly) lock-free concurrent work-stealing deque +//! +//! This module contains an implementation of the Chase-Lev work stealing deque +//! described in "Dynamic Circular Work-Stealing Deque". The implementation is +//! heavily based on the pseudocode found in the paper. +//! +//! This implementation does not want to have the restriction of a garbage +//! collector for reclamation of buffers, and instead it uses a shared pool of +//! buffers. This shared pool is required for correctness in this +//! implementation. +//! +//! The only lock-synchronized portions of this deque are the buffer allocation +//! and deallocation portions. Otherwise all operations are lock-free. +//! +//! # Example +//! +//! use std::sync::deque::BufferPool; +//! +//! let mut pool = BufferPool::new(); +//! let (mut worker, mut stealer) = pool.deque(); +//! +//! // Only the worker may push/pop +//! worker.push(1i); +//! worker.pop(); +//! +//! // Stealers take data from the other end of the deque +//! worker.push(1i); +//! stealer.steal(); +//! +//! // Stealers can be cloned to have many stealers stealing in parallel +//! worker.push(1i); +//! let mut stealer2 = stealer.clone(); +//! stealer2.steal(); + +#![experimental] + +// NB: the "buffer pool" strategy is not done for speed, but rather for +// correctness. For more info, see the comment on `swap_buffer` + +// FIXME: all atomic operations in this module use a SeqCst ordering. That is +// probably overkill + +pub use self::Stolen::*; + +use core::prelude::*; + +use alloc::arc::Arc; +use alloc::heap::{allocate, deallocate}; +use alloc::boxed::Box; +use vec::Vec; +use core::kinds::marker; +use core::mem::{forget, min_align_of, size_of, transmute}; +use core::ptr; +use rustrt::exclusive::Exclusive; + +use sync::atomic::{AtomicInt, AtomicPtr, SeqCst}; + +// Once the queue is less than 1/K full, then it will be downsized. Note that +// the deque requires that this number be less than 2. +static K: int = 4; + +// Minimum number of bits that a buffer size should be. No buffer will resize to +// under this value, and all deques will initially contain a buffer of this +// size. +// +// The size in question is 1 << MIN_BITS +static MIN_BITS: uint = 7; + +struct Deque<T> { + bottom: AtomicInt, + top: AtomicInt, + array: AtomicPtr<Buffer<T>>, + pool: BufferPool<T>, +} + +/// Worker half of the work-stealing deque. This worker has exclusive access to +/// one side of the deque, and uses `push` and `pop` method to manipulate it. +/// +/// There may only be one worker per deque. +pub struct Worker<T> { + deque: Arc<Deque<T>>, + _noshare: marker::NoSync, +} + +/// The stealing half of the work-stealing deque. Stealers have access to the +/// opposite end of the deque from the worker, and they only have access to the +/// `steal` method. +pub struct Stealer<T> { + deque: Arc<Deque<T>>, + _noshare: marker::NoSync, +} + +/// When stealing some data, this is an enumeration of the possible outcomes. +#[deriving(PartialEq, Show)] +pub enum Stolen<T> { + /// The deque was empty at the time of stealing + Empty, + /// The stealer lost the race for stealing data, and a retry may return more + /// data. + Abort, + /// The stealer has successfully stolen some data. + Data(T), +} + +/// The allocation pool for buffers used by work-stealing deques. Right now this +/// structure is used for reclamation of memory after it is no longer in use by +/// deques. +/// +/// This data structure is protected by a mutex, but it is rarely used. Deques +/// will only use this structure when allocating a new buffer or deallocating a +/// previous one. +pub struct BufferPool<T> { + pool: Arc<Exclusive<Vec<Box<Buffer<T>>>>>, +} + +/// An internal buffer used by the chase-lev deque. This structure is actually +/// implemented as a circular buffer, and is used as the intermediate storage of +/// the data in the deque. +/// +/// This type is implemented with *T instead of Vec<T> for two reasons: +/// +/// 1. There is nothing safe about using this buffer. This easily allows the +/// same value to be read twice in to rust, and there is nothing to +/// prevent this. The usage by the deque must ensure that one of the +/// values is forgotten. Furthermore, we only ever want to manually run +/// destructors for values in this buffer (on drop) because the bounds +/// are defined by the deque it's owned by. +/// +/// 2. We can certainly avoid bounds checks using *T instead of Vec<T>, although +/// LLVM is probably pretty good at doing this already. +struct Buffer<T> { + storage: *const T, + log_size: uint, +} + +impl<T: Send> BufferPool<T> { + /// Allocates a new buffer pool which in turn can be used to allocate new + /// deques. + pub fn new() -> BufferPool<T> { + BufferPool { pool: Arc::new(Exclusive::new(Vec::new())) } + } + + /// Allocates a new work-stealing deque which will send/receiving memory to + /// and from this buffer pool. + pub fn deque(&self) -> (Worker<T>, Stealer<T>) { + let a = Arc::new(Deque::new(self.clone())); + let b = a.clone(); + (Worker { deque: a, _noshare: marker::NoSync }, + Stealer { deque: b, _noshare: marker::NoSync }) + } + + fn alloc(&mut self, bits: uint) -> Box<Buffer<T>> { + unsafe { + let mut pool = self.pool.lock(); + match pool.iter().position(|x| x.size() >= (1 << bits)) { + Some(i) => pool.remove(i).unwrap(), + None => box Buffer::new(bits) + } + } + } + + fn free(&self, buf: Box<Buffer<T>>) { + unsafe { + let mut pool = self.pool.lock(); + match pool.iter().position(|v| v.size() > buf.size()) { + Some(i) => pool.insert(i, buf), + None => pool.push(buf), + } + } + } +} + +impl<T: Send> Clone for BufferPool<T> { + fn clone(&self) -> BufferPool<T> { BufferPool { pool: self.pool.clone() } } +} + +impl<T: Send> Worker<T> { + /// Pushes data onto the front of this work queue. + pub fn push(&self, t: T) { + unsafe { self.deque.push(t) } + } + /// Pops data off the front of the work queue, returning `None` on an empty + /// queue. + pub fn pop(&self) -> Option<T> { + unsafe { self.deque.pop() } + } + + /// Gets access to the buffer pool that this worker is attached to. This can + /// be used to create more deques which share the same buffer pool as this + /// deque. + pub fn pool<'a>(&'a self) -> &'a BufferPool<T> { + &self.deque.pool + } +} + +impl<T: Send> Stealer<T> { + /// Steals work off the end of the queue (opposite of the worker's end) + pub fn steal(&self) -> Stolen<T> { + unsafe { self.deque.steal() } + } + + /// Gets access to the buffer pool that this stealer is attached to. This + /// can be used to create more deques which share the same buffer pool as + /// this deque. + pub fn pool<'a>(&'a self) -> &'a BufferPool<T> { + &self.deque.pool + } +} + +impl<T: Send> Clone for Stealer<T> { + fn clone(&self) -> Stealer<T> { + Stealer { deque: self.deque.clone(), _noshare: marker::NoSync } + } +} + +// Almost all of this code can be found directly in the paper so I'm not +// personally going to heavily comment what's going on here. + +impl<T: Send> Deque<T> { + fn new(mut pool: BufferPool<T>) -> Deque<T> { + let buf = pool.alloc(MIN_BITS); + Deque { + bottom: AtomicInt::new(0), + top: AtomicInt::new(0), + array: AtomicPtr::new(unsafe { transmute(buf) }), + pool: pool, + } + } + + unsafe fn push(&self, data: T) { + let mut b = self.bottom.load(SeqCst); + let t = self.top.load(SeqCst); + let mut a = self.array.load(SeqCst); + let size = b - t; + if size >= (*a).size() - 1 { + // You won't find this code in the chase-lev deque paper. This is + // alluded to in a small footnote, however. We always free a buffer + // when growing in order to prevent leaks. + a = self.swap_buffer(b, a, (*a).resize(b, t, 1)); + b = self.bottom.load(SeqCst); + } + (*a).put(b, data); + self.bottom.store(b + 1, SeqCst); + } + + unsafe fn pop(&self) -> Option<T> { + let b = self.bottom.load(SeqCst); + let a = self.array.load(SeqCst); + let b = b - 1; + self.bottom.store(b, SeqCst); + let t = self.top.load(SeqCst); + let size = b - t; + if size < 0 { + self.bottom.store(t, SeqCst); + return None; + } + let data = (*a).get(b); + if size > 0 { + self.maybe_shrink(b, t); + return Some(data); + } + if self.top.compare_and_swap(t, t + 1, SeqCst) == t { + self.bottom.store(t + 1, SeqCst); + return Some(data); + } else { + self.bottom.store(t + 1, SeqCst); + forget(data); // someone else stole this value + return None; + } + } + + unsafe fn steal(&self) -> Stolen<T> { + let t = self.top.load(SeqCst); + let old = self.array.load(SeqCst); + let b = self.bottom.load(SeqCst); + let a = self.array.load(SeqCst); + let size = b - t; + if size <= 0 { return Empty } + if size % (*a).size() == 0 { + if a == old && t == self.top.load(SeqCst) { + return Empty + } + return Abort + } + let data = (*a).get(t); + if self.top.compare_and_swap(t, t + 1, SeqCst) == t { + Data(data) + } else { + forget(data); // someone else stole this value + Abort + } + } + + unsafe fn maybe_shrink(&self, b: int, t: int) { + let a = self.array.load(SeqCst); + if b - t < (*a).size() / K && b - t > (1 << MIN_BITS) { + self.swap_buffer(b, a, (*a).resize(b, t, -1)); + } + } + + // Helper routine not mentioned in the paper which is used in growing and + // shrinking buffers to swap in a new buffer into place. As a bit of a + // recap, the whole point that we need a buffer pool rather than just + // calling malloc/free directly is that stealers can continue using buffers + // after this method has called 'free' on it. The continued usage is simply + // a read followed by a forget, but we must make sure that the memory can + // continue to be read after we flag this buffer for reclamation. + unsafe fn swap_buffer(&self, b: int, old: *mut Buffer<T>, + buf: Buffer<T>) -> *mut Buffer<T> { + let newbuf: *mut Buffer<T> = transmute(box buf); + self.array.store(newbuf, SeqCst); + let ss = (*newbuf).size(); + self.bottom.store(b + ss, SeqCst); + let t = self.top.load(SeqCst); + if self.top.compare_and_swap(t, t + ss, SeqCst) != t { + self.bottom.store(b, SeqCst); + } + self.pool.free(transmute(old)); + return newbuf; + } +} + + +#[unsafe_destructor] +impl<T: Send> Drop for Deque<T> { + fn drop(&mut self) { + let t = self.top.load(SeqCst); + let b = self.bottom.load(SeqCst); + let a = self.array.load(SeqCst); + // Free whatever is leftover in the dequeue, and then move the buffer + // back into the pool. + for i in range(t, b) { + let _: T = unsafe { (*a).get(i) }; + } + self.pool.free(unsafe { transmute(a) }); + } +} + +#[inline] +fn buffer_alloc_size<T>(log_size: uint) -> uint { + (1 << log_size) * size_of::<T>() +} + +impl<T: Send> Buffer<T> { + unsafe fn new(log_size: uint) -> Buffer<T> { + let size = buffer_alloc_size::<T>(log_size); + let buffer = allocate(size, min_align_of::<T>()); + if buffer.is_null() { ::alloc::oom() } + Buffer { + storage: buffer as *const T, + log_size: log_size, + } + } + + fn size(&self) -> int { 1 << self.log_size } + + // Apparently LLVM cannot optimize (foo % (1 << bar)) into this implicitly + fn mask(&self) -> int { (1 << self.log_size) - 1 } + + unsafe fn elem(&self, i: int) -> *const T { + self.storage.offset(i & self.mask()) + } + + // This does not protect against loading duplicate values of the same cell, + // nor does this clear out the contents contained within. Hence, this is a + // very unsafe method which the caller needs to treat specially in case a + // race is lost. + unsafe fn get(&self, i: int) -> T { + ptr::read(self.elem(i)) + } + + // Unsafe because this unsafely overwrites possibly uninitialized or + // initialized data. + unsafe fn put(&self, i: int, t: T) { + ptr::write(self.elem(i) as *mut T, t); + } + + // Again, unsafe because this has incredibly dubious ownership violations. + // It is assumed that this buffer is immediately dropped. + unsafe fn resize(&self, b: int, t: int, delta: int) -> Buffer<T> { + // NB: not entirely obvious, but thanks to 2's complement, + // casting delta to uint and then adding gives the desired + // effect. + let buf = Buffer::new(self.log_size + delta as uint); + for i in range(t, b) { + buf.put(i, self.get(i)); + } + return buf; + } +} + +#[unsafe_destructor] +impl<T: Send> Drop for Buffer<T> { + fn drop(&mut self) { + // It is assumed that all buffers are empty on drop. + let size = buffer_alloc_size::<T>(self.log_size); + unsafe { deallocate(self.storage as *mut u8, size, min_align_of::<T>()) } + } +} + +#[cfg(test)] +mod tests { + use prelude::*; + use super::{Data, BufferPool, Abort, Empty, Worker, Stealer}; + + use mem; + use rustrt::thread::Thread; + use rand; + use rand::Rng; + use sync::atomic::{AtomicBool, INIT_ATOMIC_BOOL, SeqCst, + AtomicUint, INIT_ATOMIC_UINT}; + use vec; + + #[test] + fn smoke() { + let pool = BufferPool::new(); + let (w, s) = pool.deque(); + assert_eq!(w.pop(), None); + assert_eq!(s.steal(), Empty); + w.push(1i); + assert_eq!(w.pop(), Some(1)); + w.push(1); + assert_eq!(s.steal(), Data(1)); + w.push(1); + assert_eq!(s.clone().steal(), Data(1)); + } + + #[test] + fn stealpush() { + static AMT: int = 100000; + let pool = BufferPool::<int>::new(); + let (w, s) = pool.deque(); + let t = Thread::start(proc() { + let mut left = AMT; + while left > 0 { + match s.steal() { + Data(i) => { + assert_eq!(i, 1); + left -= 1; + } + Abort | Empty => {} + } + } + }); + + for _ in range(0, AMT) { + w.push(1); + } + + t.join(); + } + + #[test] + fn stealpush_large() { + static AMT: int = 100000; + let pool = BufferPool::<(int, int)>::new(); + let (w, s) = pool.deque(); + let t = Thread::start(proc() { + let mut left = AMT; + while left > 0 { + match s.steal() { + Data((1, 10)) => { left -= 1; } + Data(..) => panic!(), + Abort | Empty => {} + } + } + }); + + for _ in range(0, AMT) { + w.push((1, 10)); + } + + t.join(); + } + + fn stampede(w: Worker<Box<int>>, s: Stealer<Box<int>>, + nthreads: int, amt: uint) { + for _ in range(0, amt) { + w.push(box 20); + } + let mut remaining = AtomicUint::new(amt); + let unsafe_remaining: *mut AtomicUint = &mut remaining; + + let threads = range(0, nthreads).map(|_| { + let s = s.clone(); + Thread::start(proc() { + unsafe { + while (*unsafe_remaining).load(SeqCst) > 0 { + match s.steal() { + Data(box 20) => { + (*unsafe_remaining).fetch_sub(1, SeqCst); + } + Data(..) => panic!(), + Abort | Empty => {} + } + } + } + }) + }).collect::<Vec<Thread<()>>>(); + + while remaining.load(SeqCst) > 0 { + match w.pop() { + Some(box 20) => { remaining.fetch_sub(1, SeqCst); } + Some(..) => panic!(), + None => {} + } + } + + for thread in threads.into_iter() { + thread.join(); + } + } + + #[test] + fn run_stampede() { + let pool = BufferPool::<Box<int>>::new(); + let (w, s) = pool.deque(); + stampede(w, s, 8, 10000); + } + + #[test] + fn many_stampede() { + static AMT: uint = 4; + let pool = BufferPool::<Box<int>>::new(); + let threads = range(0, AMT).map(|_| { + let (w, s) = pool.deque(); + Thread::start(proc() { + stampede(w, s, 4, 10000); + }) + }).collect::<Vec<Thread<()>>>(); + + for thread in threads.into_iter() { + thread.join(); + } + } + + #[test] + fn stress() { + static AMT: int = 100000; + static NTHREADS: int = 8; + static DONE: AtomicBool = INIT_ATOMIC_BOOL; + static HITS: AtomicUint = INIT_ATOMIC_UINT; + let pool = BufferPool::<int>::new(); + let (w, s) = pool.deque(); + + let threads = range(0, NTHREADS).map(|_| { + let s = s.clone(); + Thread::start(proc() { + loop { + match s.steal() { + Data(2) => { HITS.fetch_add(1, SeqCst); } + Data(..) => panic!(), + _ if DONE.load(SeqCst) => break, + _ => {} + } + } + }) + }).collect::<Vec<Thread<()>>>(); + + let mut rng = rand::task_rng(); + let mut expected = 0; + while expected < AMT { + if rng.gen_range(0i, 3) == 2 { + match w.pop() { + None => {} + Some(2) => { HITS.fetch_add(1, SeqCst); }, + Some(_) => panic!(), + } + } else { + expected += 1; + w.push(2); + } + } + + while HITS.load(SeqCst) < AMT as uint { + match w.pop() { + None => {} + Some(2) => { HITS.fetch_add(1, SeqCst); }, + Some(_) => panic!(), + } + } + DONE.store(true, SeqCst); + + for thread in threads.into_iter() { + thread.join(); + } + + assert_eq!(HITS.load(SeqCst), expected as uint); + } + + #[test] + #[cfg_attr(windows, ignore)] // apparently windows scheduling is weird? + fn no_starvation() { + static AMT: int = 10000; + static NTHREADS: int = 4; + static DONE: AtomicBool = INIT_ATOMIC_BOOL; + let pool = BufferPool::<(int, uint)>::new(); + let (w, s) = pool.deque(); + + let (threads, hits) = vec::unzip(range(0, NTHREADS).map(|_| { + let s = s.clone(); + let unique_box = box AtomicUint::new(0); + let thread_box = unsafe { + *mem::transmute::<&Box<AtomicUint>, + *const *mut AtomicUint>(&unique_box) + }; + (Thread::start(proc() { + unsafe { + loop { + match s.steal() { + Data((1, 2)) => { + (*thread_box).fetch_add(1, SeqCst); + } + Data(..) => panic!(), + _ if DONE.load(SeqCst) => break, + _ => {} + } + } + } + }), unique_box) + })); + + let mut rng = rand::task_rng(); + let mut myhit = false; + 'outer: loop { + for _ in range(0, rng.gen_range(0, AMT)) { + if !myhit && rng.gen_range(0i, 3) == 2 { + match w.pop() { + None => {} + Some((1, 2)) => myhit = true, + Some(_) => panic!(), + } + } else { + w.push((1, 2)); + } + } + + for slot in hits.iter() { + let amt = slot.load(SeqCst); + if amt == 0 { continue 'outer; } + } + if myhit { + break + } + } + + DONE.store(true, SeqCst); + + for thread in threads.into_iter() { + thread.join(); + } + } +} diff --git a/src/libstd/sync/lock.rs b/src/libstd/sync/lock.rs new file mode 100644 index 00000000000..6b63f7ae618 --- /dev/null +++ b/src/libstd/sync/lock.rs @@ -0,0 +1,828 @@ +// Copyright 2012-2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Wrappers for safe, shared, mutable memory between tasks +//! +//! The wrappers in this module build on the primitives from `sync::raw` to +//! provide safe interfaces around using the primitive locks. These primitives +//! implement a technique called "poisoning" where when a task panicked with a +//! held lock, all future attempts to use the lock will panic. +//! +//! For example, if two tasks are contending on a mutex and one of them panics +//! after grabbing the lock, the second task will immediately panic because the +//! lock is now poisoned. + +use core::prelude::*; + +use self::Inner::*; + +use core::cell::UnsafeCell; +use rustrt::local::Local; +use rustrt::task::Task; + +use super::raw; + +/**************************************************************************** + * Poisoning helpers + ****************************************************************************/ + +struct PoisonOnFail<'a> { + flag: &'a mut bool, + failed: bool, +} + +fn failing() -> bool { + Local::borrow(None::<Task>).unwinder.unwinding() +} + +impl<'a> PoisonOnFail<'a> { + fn check(flag: bool, name: &str) { + if flag { + panic!("Poisoned {} - another task failed inside!", name); + } + } + + fn new<'a>(flag: &'a mut bool, name: &str) -> PoisonOnFail<'a> { + PoisonOnFail::check(*flag, name); + PoisonOnFail { + flag: flag, + failed: failing() + } + } +} + +#[unsafe_destructor] +impl<'a> Drop for PoisonOnFail<'a> { + fn drop(&mut self) { + if !self.failed && failing() { + *self.flag = true; + } + } +} + +/**************************************************************************** + * Condvar + ****************************************************************************/ + +enum Inner<'a> { + InnerMutex(raw::MutexGuard<'a>), + InnerRWLock(raw::RWLockWriteGuard<'a>), +} + +impl<'b> Inner<'b> { + fn cond<'a>(&'a self) -> &'a raw::Condvar<'b> { + match *self { + InnerMutex(ref m) => &m.cond, + InnerRWLock(ref m) => &m.cond, + } + } +} + +/// A condition variable, a mechanism for unlock-and-descheduling and +/// signaling, for use with the lock types. +pub struct Condvar<'a> { + name: &'static str, + // n.b. Inner must be after PoisonOnFail because we must set the poison flag + // *inside* the mutex, and struct fields are destroyed top-to-bottom + // (destroy the lock guard last). + poison: PoisonOnFail<'a>, + inner: Inner<'a>, +} + +impl<'a> Condvar<'a> { + /// Atomically exit the associated lock and block until a signal is sent. + /// + /// wait() is equivalent to wait_on(0). + /// + /// # Panics + /// + /// A task which is killed while waiting on a condition variable will wake + /// up, panic, and unlock the associated lock as it unwinds. + #[inline] + pub fn wait(&self) { self.wait_on(0) } + + /// Atomically exit the associated lock and block on a specified condvar + /// until a signal is sent on that same condvar. + /// + /// The associated lock must have been initialised with an appropriate + /// number of condvars. The condvar_id must be between 0 and num_condvars-1 + /// or else this call will fail. + #[inline] + pub fn wait_on(&self, condvar_id: uint) { + assert!(!*self.poison.flag); + self.inner.cond().wait_on(condvar_id); + // This is why we need to wrap sync::condvar. + PoisonOnFail::check(*self.poison.flag, self.name); + } + + /// Wake up a blocked task. Returns false if there was no blocked task. + #[inline] + pub fn signal(&self) -> bool { self.signal_on(0) } + + /// Wake up a blocked task on a specified condvar (as + /// sync::cond.signal_on). Returns false if there was no blocked task. + #[inline] + pub fn signal_on(&self, condvar_id: uint) -> bool { + assert!(!*self.poison.flag); + self.inner.cond().signal_on(condvar_id) + } + + /// Wake up all blocked tasks. Returns the number of tasks woken. + #[inline] + pub fn broadcast(&self) -> uint { self.broadcast_on(0) } + + /// Wake up all blocked tasks on a specified condvar (as + /// sync::cond.broadcast_on). Returns the number of tasks woken. + #[inline] + pub fn broadcast_on(&self, condvar_id: uint) -> uint { + assert!(!*self.poison.flag); + self.inner.cond().broadcast_on(condvar_id) + } +} + +/**************************************************************************** + * Mutex + ****************************************************************************/ + +/// A wrapper type which provides synchronized access to the underlying data, of +/// type `T`. A mutex always provides exclusive access, and concurrent requests +/// will block while the mutex is already locked. +/// +/// # Example +/// +/// ``` +/// use std::sync::{Mutex, Arc}; +/// +/// let mutex = Arc::new(Mutex::new(1i)); +/// let mutex2 = mutex.clone(); +/// +/// spawn(proc() { +/// let mut val = mutex2.lock(); +/// *val += 1; +/// val.cond.signal(); +/// }); +/// +/// let value = mutex.lock(); +/// while *value != 2 { +/// value.cond.wait(); +/// } +/// ``` +pub struct Mutex<T> { + lock: raw::Mutex, + failed: UnsafeCell<bool>, + data: UnsafeCell<T>, +} + +/// An guard which is created by locking a mutex. Through this guard the +/// underlying data can be accessed. +pub struct MutexGuard<'a, T:'a> { + // FIXME #12808: strange name to try to avoid interfering with + // field accesses of the contained type via Deref + _data: &'a mut T, + /// Inner condition variable connected to the locked mutex that this guard + /// was created from. This can be used for atomic-unlock-and-deschedule. + pub cond: Condvar<'a>, +} + +impl<T: Send> Mutex<T> { + /// Creates a new mutex to protect the user-supplied data. + pub fn new(user_data: T) -> Mutex<T> { + Mutex::new_with_condvars(user_data, 1) + } + + /// Create a new mutex, with a specified number of associated condvars. + /// + /// This will allow calling wait_on/signal_on/broadcast_on with condvar IDs + /// between 0 and num_condvars-1. (If num_condvars is 0, lock_cond will be + /// allowed but any operations on the condvar will fail.) + pub fn new_with_condvars(user_data: T, num_condvars: uint) -> Mutex<T> { + Mutex { + lock: raw::Mutex::new_with_condvars(num_condvars), + failed: UnsafeCell::new(false), + data: UnsafeCell::new(user_data), + } + } + + /// Access the underlying mutable data with mutual exclusion from other + /// tasks. The returned value is an RAII guard which will unlock the mutex + /// when dropped. All concurrent tasks attempting to lock the mutex will + /// block while the returned value is still alive. + /// + /// # Panics + /// + /// Panicking while inside the Mutex will unlock the Mutex while unwinding, so + /// that other tasks won't block forever. It will also poison the Mutex: + /// any tasks that subsequently try to access it (including those already + /// blocked on the mutex) will also panic immediately. + #[inline] + pub fn lock<'a>(&'a self) -> MutexGuard<'a, T> { + let guard = self.lock.lock(); + + // These two accesses are safe because we're guaranteed at this point + // that we have exclusive access to this mutex. We are indeed able to + // promote ourselves from &Mutex to `&mut T` + let poison = unsafe { &mut *self.failed.get() }; + let data = unsafe { &mut *self.data.get() }; + + MutexGuard { + _data: data, + cond: Condvar { + name: "Mutex", + poison: PoisonOnFail::new(poison, "Mutex"), + inner: InnerMutex(guard), + }, + } + } +} + +impl<'a, T: Send> Deref<T> for MutexGuard<'a, T> { + fn deref<'a>(&'a self) -> &'a T { &*self._data } +} +impl<'a, T: Send> DerefMut<T> for MutexGuard<'a, T> { + fn deref_mut<'a>(&'a mut self) -> &'a mut T { &mut *self._data } +} + +/**************************************************************************** + * R/W lock protected lock + ****************************************************************************/ + +/// A dual-mode reader-writer lock. The data can be accessed mutably or +/// immutably, and immutably-accessing tasks may run concurrently. +/// +/// # Example +/// +/// ``` +/// use std::sync::{RWLock, Arc}; +/// +/// let lock1 = Arc::new(RWLock::new(1i)); +/// let lock2 = lock1.clone(); +/// +/// spawn(proc() { +/// let mut val = lock2.write(); +/// *val = 3; +/// let val = val.downgrade(); +/// println!("{}", *val); +/// }); +/// +/// let val = lock1.read(); +/// println!("{}", *val); +/// ``` +pub struct RWLock<T> { + lock: raw::RWLock, + failed: UnsafeCell<bool>, + data: UnsafeCell<T>, +} + +/// A guard which is created by locking an rwlock in write mode. Through this +/// guard the underlying data can be accessed. +pub struct RWLockWriteGuard<'a, T:'a> { + // FIXME #12808: strange name to try to avoid interfering with + // field accesses of the contained type via Deref + _data: &'a mut T, + /// Inner condition variable that can be used to sleep on the write mode of + /// this rwlock. + pub cond: Condvar<'a>, +} + +/// A guard which is created by locking an rwlock in read mode. Through this +/// guard the underlying data can be accessed. +pub struct RWLockReadGuard<'a, T:'a> { + // FIXME #12808: strange names to try to avoid interfering with + // field accesses of the contained type via Deref + _data: &'a T, + _guard: raw::RWLockReadGuard<'a>, +} + +impl<T: Send + Sync> RWLock<T> { + /// Create a reader/writer lock with the supplied data. + pub fn new(user_data: T) -> RWLock<T> { + RWLock::new_with_condvars(user_data, 1) + } + + /// Create a reader/writer lock with the supplied data and a specified number + /// of condvars (as sync::RWLock::new_with_condvars). + pub fn new_with_condvars(user_data: T, num_condvars: uint) -> RWLock<T> { + RWLock { + lock: raw::RWLock::new_with_condvars(num_condvars), + failed: UnsafeCell::new(false), + data: UnsafeCell::new(user_data), + } + } + + /// Access the underlying data mutably. Locks the rwlock in write mode; + /// other readers and writers will block. + /// + /// # Panics + /// + /// Panicking while inside the lock will unlock the lock while unwinding, so + /// that other tasks won't block forever. As Mutex.lock, it will also poison + /// the lock, so subsequent readers and writers will both also panic. + #[inline] + pub fn write<'a>(&'a self) -> RWLockWriteGuard<'a, T> { + let guard = self.lock.write(); + + // These two accesses are safe because we're guaranteed at this point + // that we have exclusive access to this rwlock. We are indeed able to + // promote ourselves from &RWLock to `&mut T` + let poison = unsafe { &mut *self.failed.get() }; + let data = unsafe { &mut *self.data.get() }; + + RWLockWriteGuard { + _data: data, + cond: Condvar { + name: "RWLock", + poison: PoisonOnFail::new(poison, "RWLock"), + inner: InnerRWLock(guard), + }, + } + } + + /// Access the underlying data immutably. May run concurrently with other + /// reading tasks. + /// + /// # Panics + /// + /// Panicking will unlock the lock while unwinding. However, unlike all other + /// access modes, this will not poison the lock. + pub fn read<'a>(&'a self) -> RWLockReadGuard<'a, T> { + let guard = self.lock.read(); + PoisonOnFail::check(unsafe { *self.failed.get() }, "RWLock"); + RWLockReadGuard { + _guard: guard, + _data: unsafe { &*self.data.get() }, + } + } +} + +impl<'a, T: Send + Sync> RWLockWriteGuard<'a, T> { + /// Consumes this write lock token, returning a new read lock token. + /// + /// This will allow pending readers to come into the lock. + pub fn downgrade(self) -> RWLockReadGuard<'a, T> { + let RWLockWriteGuard { _data, cond } = self; + // convert the data to read-only explicitly + let data = &*_data; + let guard = match cond.inner { + InnerMutex(..) => unreachable!(), + InnerRWLock(guard) => guard.downgrade() + }; + RWLockReadGuard { _guard: guard, _data: data } + } +} + +impl<'a, T: Send + Sync> Deref<T> for RWLockReadGuard<'a, T> { + fn deref<'a>(&'a self) -> &'a T { self._data } +} +impl<'a, T: Send + Sync> Deref<T> for RWLockWriteGuard<'a, T> { + fn deref<'a>(&'a self) -> &'a T { &*self._data } +} +impl<'a, T: Send + Sync> DerefMut<T> for RWLockWriteGuard<'a, T> { + fn deref_mut<'a>(&'a mut self) -> &'a mut T { &mut *self._data } +} + +/**************************************************************************** + * Barrier + ****************************************************************************/ + +/// A barrier enables multiple tasks to synchronize the beginning +/// of some computation. +/// +/// ```rust +/// use std::sync::{Arc, Barrier}; +/// +/// let barrier = Arc::new(Barrier::new(10)); +/// for _ in range(0u, 10) { +/// let c = barrier.clone(); +/// // The same messages will be printed together. +/// // You will NOT see any interleaving. +/// spawn(proc() { +/// println!("before wait"); +/// c.wait(); +/// println!("after wait"); +/// }); +/// } +/// ``` +pub struct Barrier { + lock: Mutex<BarrierState>, + num_tasks: uint, +} + +// The inner state of a double barrier +struct BarrierState { + count: uint, + generation_id: uint, +} + +impl Barrier { + /// Create a new barrier that can block a given number of tasks. + pub fn new(num_tasks: uint) -> Barrier { + Barrier { + lock: Mutex::new(BarrierState { + count: 0, + generation_id: 0, + }), + num_tasks: num_tasks, + } + } + + /// Block the current task until a certain number of tasks is waiting. + pub fn wait(&self) { + let mut lock = self.lock.lock(); + let local_gen = lock.generation_id; + lock.count += 1; + if lock.count < self.num_tasks { + // We need a while loop to guard against spurious wakeups. + // http://en.wikipedia.org/wiki/Spurious_wakeup + while local_gen == lock.generation_id && + lock.count < self.num_tasks { + lock.cond.wait(); + } + } else { + lock.count = 0; + lock.generation_id += 1; + lock.cond.broadcast(); + } + } +} + +/**************************************************************************** + * Tests + ****************************************************************************/ + +#[cfg(test)] +mod tests { + use prelude::*; + use comm::Empty; + use task; + use task::try_future; + use sync::Arc; + + use super::{Mutex, Barrier, RWLock}; + + #[test] + fn test_mutex_arc_condvar() { + let arc = Arc::new(Mutex::new(false)); + let arc2 = arc.clone(); + let (tx, rx) = channel(); + task::spawn(proc() { + // wait until parent gets in + rx.recv(); + let mut lock = arc2.lock(); + *lock = true; + lock.cond.signal(); + }); + + let lock = arc.lock(); + tx.send(()); + assert!(!*lock); + while !*lock { + lock.cond.wait(); + } + } + + #[test] #[should_fail] + fn test_arc_condvar_poison() { + let arc = Arc::new(Mutex::new(1i)); + let arc2 = arc.clone(); + let (tx, rx) = channel(); + + spawn(proc() { + rx.recv(); + let lock = arc2.lock(); + lock.cond.signal(); + // Parent should fail when it wakes up. + panic!(); + }); + + let lock = arc.lock(); + tx.send(()); + while *lock == 1 { + lock.cond.wait(); + } + } + + #[test] #[should_fail] + fn test_mutex_arc_poison() { + let arc = Arc::new(Mutex::new(1i)); + let arc2 = arc.clone(); + let _ = task::try(proc() { + let lock = arc2.lock(); + assert_eq!(*lock, 2); + }); + let lock = arc.lock(); + assert_eq!(*lock, 1); + } + + #[test] + fn test_mutex_arc_nested() { + // Tests nested mutexes and access + // to underlying data. + let arc = Arc::new(Mutex::new(1i)); + let arc2 = Arc::new(Mutex::new(arc)); + task::spawn(proc() { + let lock = arc2.lock(); + let lock2 = lock.deref().lock(); + assert_eq!(*lock2, 1); + }); + } + + #[test] + fn test_mutex_arc_access_in_unwind() { + let arc = Arc::new(Mutex::new(1i)); + let arc2 = arc.clone(); + let _ = task::try::<()>(proc() { + struct Unwinder { + i: Arc<Mutex<int>>, + } + impl Drop for Unwinder { + fn drop(&mut self) { + let mut lock = self.i.lock(); + *lock += 1; + } + } + let _u = Unwinder { i: arc2 }; + panic!(); + }); + let lock = arc.lock(); + assert_eq!(*lock, 2); + } + + #[test] #[should_fail] + fn test_rw_arc_poison_wr() { + let arc = Arc::new(RWLock::new(1i)); + let arc2 = arc.clone(); + let _ = task::try(proc() { + let lock = arc2.write(); + assert_eq!(*lock, 2); + }); + let lock = arc.read(); + assert_eq!(*lock, 1); + } + #[test] #[should_fail] + fn test_rw_arc_poison_ww() { + let arc = Arc::new(RWLock::new(1i)); + let arc2 = arc.clone(); + let _ = task::try(proc() { + let lock = arc2.write(); + assert_eq!(*lock, 2); + }); + let lock = arc.write(); + assert_eq!(*lock, 1); + } + #[test] + fn test_rw_arc_no_poison_rr() { + let arc = Arc::new(RWLock::new(1i)); + let arc2 = arc.clone(); + let _ = task::try(proc() { + let lock = arc2.read(); + assert_eq!(*lock, 2); + }); + let lock = arc.read(); + assert_eq!(*lock, 1); + } + #[test] + fn test_rw_arc_no_poison_rw() { + let arc = Arc::new(RWLock::new(1i)); + let arc2 = arc.clone(); + let _ = task::try(proc() { + let lock = arc2.read(); + assert_eq!(*lock, 2); + }); + let lock = arc.write(); + assert_eq!(*lock, 1); + } + #[test] + fn test_rw_arc_no_poison_dr() { + let arc = Arc::new(RWLock::new(1i)); + let arc2 = arc.clone(); + let _ = task::try(proc() { + let lock = arc2.write().downgrade(); + assert_eq!(*lock, 2); + }); + let lock = arc.write(); + assert_eq!(*lock, 1); + } + + #[test] + fn test_rw_arc() { + let arc = Arc::new(RWLock::new(0i)); + let arc2 = arc.clone(); + let (tx, rx) = channel(); + + task::spawn(proc() { + let mut lock = arc2.write(); + for _ in range(0u, 10) { + let tmp = *lock; + *lock = -1; + task::deschedule(); + *lock = tmp + 1; + } + tx.send(()); + }); + + // Readers try to catch the writer in the act + let mut children = Vec::new(); + for _ in range(0u, 5) { + let arc3 = arc.clone(); + children.push(try_future(proc() { + let lock = arc3.read(); + assert!(*lock >= 0); + })); + } + + // Wait for children to pass their asserts + for r in children.iter_mut() { + assert!(r.get_ref().is_ok()); + } + + // Wait for writer to finish + rx.recv(); + let lock = arc.read(); + assert_eq!(*lock, 10); + } + + #[test] + fn test_rw_arc_access_in_unwind() { + let arc = Arc::new(RWLock::new(1i)); + let arc2 = arc.clone(); + let _ = task::try::<()>(proc() { + struct Unwinder { + i: Arc<RWLock<int>>, + } + impl Drop for Unwinder { + fn drop(&mut self) { + let mut lock = self.i.write(); + *lock += 1; + } + } + let _u = Unwinder { i: arc2 }; + panic!(); + }); + let lock = arc.read(); + assert_eq!(*lock, 2); + } + + #[test] + fn test_rw_downgrade() { + // (1) A downgrader gets in write mode and does cond.wait. + // (2) A writer gets in write mode, sets state to 42, and does signal. + // (3) Downgrader wakes, sets state to 31337. + // (4) tells writer and all other readers to contend as it downgrades. + // (5) Writer attempts to set state back to 42, while downgraded task + // and all reader tasks assert that it's 31337. + let arc = Arc::new(RWLock::new(0i)); + + // Reader tasks + let mut reader_convos = Vec::new(); + for _ in range(0u, 10) { + let ((tx1, rx1), (tx2, rx2)) = (channel(), channel()); + reader_convos.push((tx1, rx2)); + let arcn = arc.clone(); + task::spawn(proc() { + rx1.recv(); // wait for downgrader to give go-ahead + let lock = arcn.read(); + assert_eq!(*lock, 31337); + tx2.send(()); + }); + } + + // Writer task + let arc2 = arc.clone(); + let ((tx1, rx1), (tx2, rx2)) = (channel(), channel()); + task::spawn(proc() { + rx1.recv(); + { + let mut lock = arc2.write(); + assert_eq!(*lock, 0); + *lock = 42; + lock.cond.signal(); + } + rx1.recv(); + { + let mut lock = arc2.write(); + // This shouldn't happen until after the downgrade read + // section, and all other readers, finish. + assert_eq!(*lock, 31337); + *lock = 42; + } + tx2.send(()); + }); + + // Downgrader (us) + let mut lock = arc.write(); + tx1.send(()); // send to another writer who will wake us up + while *lock == 0 { + lock.cond.wait(); + } + assert_eq!(*lock, 42); + *lock = 31337; + // send to other readers + for &(ref mut rc, _) in reader_convos.iter_mut() { + rc.send(()) + } + let lock = lock.downgrade(); + // complete handshake with other readers + for &(_, ref mut rp) in reader_convos.iter_mut() { + rp.recv() + } + tx1.send(()); // tell writer to try again + assert_eq!(*lock, 31337); + drop(lock); + + rx2.recv(); // complete handshake with writer + } + + #[cfg(test)] + fn test_rw_write_cond_downgrade_read_race_helper() { + // Tests that when a downgrader hands off the "reader cloud" lock + // because of a contending reader, a writer can't race to get it + // instead, which would result in readers_and_writers. This tests + // the raw module rather than this one, but it's here because an + // rwarc gives us extra shared state to help check for the race. + let x = Arc::new(RWLock::new(true)); + let (tx, rx) = channel(); + + // writer task + let xw = x.clone(); + task::spawn(proc() { + let mut lock = xw.write(); + tx.send(()); // tell downgrader it's ok to go + lock.cond.wait(); + // The core of the test is here: the condvar reacquire path + // must involve order_lock, so that it cannot race with a reader + // trying to receive the "reader cloud lock hand-off". + *lock = false; + }); + + rx.recv(); // wait for writer to get in + + let lock = x.write(); + assert!(*lock); + // make writer contend in the cond-reacquire path + lock.cond.signal(); + // make a reader task to trigger the "reader cloud lock" handoff + let xr = x.clone(); + let (tx, rx) = channel(); + task::spawn(proc() { + tx.send(()); + drop(xr.read()); + }); + rx.recv(); // wait for reader task to exist + + let lock = lock.downgrade(); + // if writer mistakenly got in, make sure it mutates state + // before we assert on it + for _ in range(0u, 5) { task::deschedule(); } + // make sure writer didn't get in. + assert!(*lock); + } + #[test] + fn test_rw_write_cond_downgrade_read_race() { + // Ideally the above test case would have deschedule statements in it + // that helped to expose the race nearly 100% of the time... but adding + // deschedules in the intuitively-right locations made it even less + // likely, and I wasn't sure why :( . This is a mediocre "next best" + // option. + for _ in range(0u, 8) { + test_rw_write_cond_downgrade_read_race_helper(); + } + } + + /************************************************************************ + * Barrier tests + ************************************************************************/ + #[test] + fn test_barrier() { + let barrier = Arc::new(Barrier::new(10)); + let (tx, rx) = channel(); + + for _ in range(0u, 9) { + let c = barrier.clone(); + let tx = tx.clone(); + spawn(proc() { + c.wait(); + tx.send(true); + }); + } + + // At this point, all spawned tasks should be blocked, + // so we shouldn't get anything from the port + assert!(match rx.try_recv() { + Err(Empty) => true, + _ => false, + }); + + barrier.wait(); + // Now, the barrier is cleared and we should get data. + for _ in range(0u, 9) { + rx.recv(); + } + } +} diff --git a/src/libstd/sync/mod.rs b/src/libstd/sync/mod.rs index 38e1e952f77..944b852db35 100644 --- a/src/libstd/sync/mod.rs +++ b/src/libstd/sync/mod.rs @@ -17,17 +17,41 @@ #![experimental] -#[stable] -pub use core_sync::atomic; +pub use self::one::{Once, ONCE_INIT}; + +pub use alloc::arc::{Arc, Weak}; +pub use self::lock::{Mutex, MutexGuard, Condvar, Barrier, + RWLock, RWLockReadGuard, RWLockWriteGuard}; -pub use core_sync::{deque, mpmc_bounded_queue, mpsc_queue, spsc_queue}; -pub use core_sync::{Arc, Weak, Mutex, MutexGuard, Condvar, Barrier}; -pub use core_sync::{RWLock, RWLockReadGuard, RWLockWriteGuard}; -pub use core_sync::{Semaphore, SemaphoreGuard}; -pub use core_sync::one::{Once, ONCE_INIT}; +// The mutex/rwlock in this module are not meant for reexport +pub use self::raw::{Semaphore, SemaphoreGuard}; pub use self::future::Future; pub use self::task_pool::TaskPool; +// Core building blocks for all primitives in this crate + +#[stable] +pub mod atomic; + +// Concurrent data structures + +pub mod spsc_queue; +pub mod mpsc_queue; +pub mod mpmc_bounded_queue; +pub mod deque; + +// Low-level concurrency primitives + +mod raw; +mod mutex; +mod one; + +// Higher level primitives based on those above + +mod lock; + +// Task management + mod future; mod task_pool; diff --git a/src/libstd/sync/mpmc_bounded_queue.rs b/src/libstd/sync/mpmc_bounded_queue.rs new file mode 100644 index 00000000000..dca2d4098c6 --- /dev/null +++ b/src/libstd/sync/mpmc_bounded_queue.rs @@ -0,0 +1,219 @@ +/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT + * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE + * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and documentation are + * those of the authors and should not be interpreted as representing official + * policies, either expressed or implied, of Dmitry Vyukov. + */ + +#![experimental] +#![allow(missing_docs, dead_code)] + +// http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue + +use core::prelude::*; + +use alloc::arc::Arc; +use vec::Vec; +use core::num::UnsignedInt; +use core::cell::UnsafeCell; + +use sync::atomic::{AtomicUint,Relaxed,Release,Acquire}; + +struct Node<T> { + sequence: AtomicUint, + value: Option<T>, +} + +struct State<T> { + pad0: [u8, ..64], + buffer: Vec<UnsafeCell<Node<T>>>, + mask: uint, + pad1: [u8, ..64], + enqueue_pos: AtomicUint, + pad2: [u8, ..64], + dequeue_pos: AtomicUint, + pad3: [u8, ..64], +} + +pub struct Queue<T> { + state: Arc<State<T>>, +} + +impl<T: Send> State<T> { + fn with_capacity(capacity: uint) -> State<T> { + let capacity = if capacity < 2 || (capacity & (capacity - 1)) != 0 { + if capacity < 2 { + 2u + } else { + // use next power of 2 as capacity + capacity.next_power_of_two() + } + } else { + capacity + }; + let buffer = Vec::from_fn(capacity, |i| { + UnsafeCell::new(Node { sequence:AtomicUint::new(i), value: None }) + }); + State{ + pad0: [0, ..64], + buffer: buffer, + mask: capacity-1, + pad1: [0, ..64], + enqueue_pos: AtomicUint::new(0), + pad2: [0, ..64], + dequeue_pos: AtomicUint::new(0), + pad3: [0, ..64], + } + } + + fn push(&self, value: T) -> bool { + let mask = self.mask; + let mut pos = self.enqueue_pos.load(Relaxed); + loop { + let node = &self.buffer[pos & mask]; + let seq = unsafe { (*node.get()).sequence.load(Acquire) }; + let diff: int = seq as int - pos as int; + + if diff == 0 { + let enqueue_pos = self.enqueue_pos.compare_and_swap(pos, pos+1, Relaxed); + if enqueue_pos == pos { + unsafe { + (*node.get()).value = Some(value); + (*node.get()).sequence.store(pos+1, Release); + } + break + } else { + pos = enqueue_pos; + } + } else if diff < 0 { + return false + } else { + pos = self.enqueue_pos.load(Relaxed); + } + } + true + } + + fn pop(&self) -> Option<T> { + let mask = self.mask; + let mut pos = self.dequeue_pos.load(Relaxed); + loop { + let node = &self.buffer[pos & mask]; + let seq = unsafe { (*node.get()).sequence.load(Acquire) }; + let diff: int = seq as int - (pos + 1) as int; + if diff == 0 { + let dequeue_pos = self.dequeue_pos.compare_and_swap(pos, pos+1, Relaxed); + if dequeue_pos == pos { + unsafe { + let value = (*node.get()).value.take(); + (*node.get()).sequence.store(pos + mask + 1, Release); + return value + } + } else { + pos = dequeue_pos; + } + } else if diff < 0 { + return None + } else { + pos = self.dequeue_pos.load(Relaxed); + } + } + } +} + +impl<T: Send> Queue<T> { + pub fn with_capacity(capacity: uint) -> Queue<T> { + Queue{ + state: Arc::new(State::with_capacity(capacity)) + } + } + + pub fn push(&self, value: T) -> bool { + self.state.push(value) + } + + pub fn pop(&self) -> Option<T> { + self.state.pop() + } +} + +impl<T: Send> Clone for Queue<T> { + fn clone(&self) -> Queue<T> { + Queue { state: self.state.clone() } + } +} + +#[cfg(test)] +mod tests { + use prelude::*; + use super::Queue; + + #[test] + fn test() { + let nthreads = 8u; + let nmsgs = 1000u; + let q = Queue::with_capacity(nthreads*nmsgs); + assert_eq!(None, q.pop()); + let (tx, rx) = channel(); + + for _ in range(0, nthreads) { + let q = q.clone(); + let tx = tx.clone(); + spawn(proc() { + let q = q; + for i in range(0, nmsgs) { + assert!(q.push(i)); + } + tx.send(()); + }); + } + + let mut completion_rxs = vec![]; + for _ in range(0, nthreads) { + let (tx, rx) = channel(); + completion_rxs.push(rx); + let q = q.clone(); + spawn(proc() { + let q = q; + let mut i = 0u; + loop { + match q.pop() { + None => {}, + Some(_) => { + i += 1; + if i == nmsgs { break } + } + } + } + tx.send(i); + }); + } + + for rx in completion_rxs.iter_mut() { + assert_eq!(nmsgs, rx.recv()); + } + for _ in range(0, nthreads) { + rx.recv(); + } + } +} diff --git a/src/libstd/sync/mpsc_queue.rs b/src/libstd/sync/mpsc_queue.rs new file mode 100644 index 00000000000..09212e4dfb6 --- /dev/null +++ b/src/libstd/sync/mpsc_queue.rs @@ -0,0 +1,210 @@ +/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT + * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE + * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and documentation are + * those of the authors and should not be interpreted as representing official + * policies, either expressed or implied, of Dmitry Vyukov. + */ + +//! A mostly lock-free multi-producer, single consumer queue. +//! +//! This module contains an implementation of a concurrent MPSC queue. This +//! queue can be used to share data between tasks, and is also used as the +//! building block of channels in rust. +//! +//! Note that the current implementation of this queue has a caveat of the `pop` +//! method, and see the method for more information about it. Due to this +//! caveat, this queue may not be appropriate for all use-cases. + +#![experimental] + +// http://www.1024cores.net/home/lock-free-algorithms +// /queues/non-intrusive-mpsc-node-based-queue + +pub use self::PopResult::*; + +use core::prelude::*; + +use alloc::boxed::Box; +use core::mem; +use core::cell::UnsafeCell; + +use sync::atomic::{AtomicPtr, Release, Acquire, AcqRel, Relaxed}; + +/// A result of the `pop` function. +pub enum PopResult<T> { + /// Some data has been popped + Data(T), + /// The queue is empty + Empty, + /// The queue is in an inconsistent state. Popping data should succeed, but + /// some pushers have yet to make enough progress in order allow a pop to + /// succeed. It is recommended that a pop() occur "in the near future" in + /// order to see if the sender has made progress or not + Inconsistent, +} + +struct Node<T> { + next: AtomicPtr<Node<T>>, + value: Option<T>, +} + +/// The multi-producer single-consumer structure. This is not cloneable, but it +/// may be safely shared so long as it is guaranteed that there is only one +/// popper at a time (many pushers are allowed). +pub struct Queue<T> { + head: AtomicPtr<Node<T>>, + tail: UnsafeCell<*mut Node<T>>, +} + +impl<T> Node<T> { + unsafe fn new(v: Option<T>) -> *mut Node<T> { + mem::transmute(box Node { + next: AtomicPtr::new(0 as *mut Node<T>), + value: v, + }) + } +} + +impl<T: Send> Queue<T> { + /// Creates a new queue that is safe to share among multiple producers and + /// one consumer. + pub fn new() -> Queue<T> { + let stub = unsafe { Node::new(None) }; + Queue { + head: AtomicPtr::new(stub), + tail: UnsafeCell::new(stub), + } + } + + /// Pushes a new value onto this queue. + pub fn push(&self, t: T) { + unsafe { + let n = Node::new(Some(t)); + let prev = self.head.swap(n, AcqRel); + (*prev).next.store(n, Release); + } + } + + /// Pops some data from this queue. + /// + /// Note that the current implementation means that this function cannot + /// return `Option<T>`. It is possible for this queue to be in an + /// inconsistent state where many pushes have succeeded and completely + /// finished, but pops cannot return `Some(t)`. This inconsistent state + /// happens when a pusher is pre-empted at an inopportune moment. + /// + /// This inconsistent state means that this queue does indeed have data, but + /// it does not currently have access to it at this time. + pub fn pop(&self) -> PopResult<T> { + unsafe { + let tail = *self.tail.get(); + let next = (*tail).next.load(Acquire); + + if !next.is_null() { + *self.tail.get() = next; + assert!((*tail).value.is_none()); + assert!((*next).value.is_some()); + let ret = (*next).value.take().unwrap(); + let _: Box<Node<T>> = mem::transmute(tail); + return Data(ret); + } + + if self.head.load(Acquire) == tail {Empty} else {Inconsistent} + } + } + + /// Attempts to pop data from this queue, but doesn't attempt too hard. This + /// will canonicalize inconsistent states to a `None` value. + pub fn casual_pop(&self) -> Option<T> { + match self.pop() { + Data(t) => Some(t), + Empty | Inconsistent => None, + } + } +} + +#[unsafe_destructor] +impl<T: Send> Drop for Queue<T> { + fn drop(&mut self) { + unsafe { + let mut cur = *self.tail.get(); + while !cur.is_null() { + let next = (*cur).next.load(Relaxed); + let _: Box<Node<T>> = mem::transmute(cur); + cur = next; + } + } + } +} + +#[cfg(test)] +mod tests { + use prelude::*; + + use alloc::arc::Arc; + + use super::{Queue, Data, Empty, Inconsistent}; + + #[test] + fn test_full() { + let q = Queue::new(); + q.push(box 1i); + q.push(box 2i); + } + + #[test] + fn test() { + let nthreads = 8u; + let nmsgs = 1000u; + let q = Queue::new(); + match q.pop() { + Empty => {} + Inconsistent | Data(..) => panic!() + } + let (tx, rx) = channel(); + let q = Arc::new(q); + + for _ in range(0, nthreads) { + let tx = tx.clone(); + let q = q.clone(); + spawn(proc() { + for i in range(0, nmsgs) { + q.push(i); + } + tx.send(()); + }); + } + + let mut i = 0u; + while i < nthreads * nmsgs { + match q.pop() { + Empty | Inconsistent => {}, + Data(_) => { i += 1 } + } + } + drop(tx); + for _ in range(0, nthreads) { + rx.recv(); + } + } +} diff --git a/src/libstd/sync/mutex.rs b/src/libstd/sync/mutex.rs new file mode 100644 index 00000000000..c9e90210c30 --- /dev/null +++ b/src/libstd/sync/mutex.rs @@ -0,0 +1,218 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! A simple native mutex implementation. Warning: this API is likely +//! to change soon. + +#![allow(dead_code)] + +use core::prelude::*; +use alloc::boxed::Box; +use rustrt::mutex; + +pub const LOCKED: uint = 1 << 0; +pub const BLOCKED: uint = 1 << 1; + +/// A mutual exclusion primitive useful for protecting shared data +/// +/// This mutex will properly block tasks waiting for the lock to become +/// available. The mutex can also be statically initialized or created via a +/// `new` constructor. +/// +/// # Example +/// +/// ```rust,ignore +/// use std::sync::mutex::Mutex; +/// +/// let m = Mutex::new(); +/// let guard = m.lock(); +/// // do some work +/// drop(guard); // unlock the lock +/// ``` +pub struct Mutex { + // Note that this static mutex is in a *box*, not inlined into the struct + // itself. This is done for memory safety reasons with the usage of a + // StaticNativeMutex inside the static mutex above. Once a native mutex has + // been used once, its address can never change (it can't be moved). This + // mutex type can be safely moved at any time, so to ensure that the native + // mutex is used correctly we box the inner lock to give it a constant + // address. + lock: Box<StaticMutex>, +} + +/// The static mutex type is provided to allow for static allocation of mutexes. +/// +/// Note that this is a separate type because using a Mutex correctly means that +/// it needs to have a destructor run. In Rust, statics are not allowed to have +/// destructors. As a result, a `StaticMutex` has one extra method when compared +/// to a `Mutex`, a `destroy` method. This method is unsafe to call, and +/// documentation can be found directly on the method. +/// +/// # Example +/// +/// ```rust,ignore +/// use std::sync::mutex::{StaticMutex, MUTEX_INIT}; +/// +/// static LOCK: StaticMutex = MUTEX_INIT; +/// +/// { +/// let _g = LOCK.lock(); +/// // do some productive work +/// } +/// // lock is unlocked here. +/// ``` +pub struct StaticMutex { + lock: mutex::StaticNativeMutex, +} + +/// An RAII implementation of a "scoped lock" of a mutex. When this structure is +/// dropped (falls out of scope), the lock will be unlocked. +#[must_use] +pub struct Guard<'a> { + guard: mutex::LockGuard<'a>, +} + +fn lift_guard(guard: mutex::LockGuard) -> Guard { + Guard { guard: guard } +} + +/// Static initialization of a mutex. This constant can be used to initialize +/// other mutex constants. +pub const MUTEX_INIT: StaticMutex = StaticMutex { + lock: mutex::NATIVE_MUTEX_INIT +}; + +impl StaticMutex { + /// Attempts to grab this lock, see `Mutex::try_lock` + pub fn try_lock<'a>(&'a self) -> Option<Guard<'a>> { + unsafe { self.lock.trylock().map(lift_guard) } + } + + /// Acquires this lock, see `Mutex::lock` + pub fn lock<'a>(&'a self) -> Guard<'a> { + lift_guard(unsafe { self.lock.lock() }) + } + + /// Deallocates resources associated with this static mutex. + /// + /// This method is unsafe because it provides no guarantees that there are + /// no active users of this mutex, and safety is not guaranteed if there are + /// active users of this mutex. + /// + /// This method is required to ensure that there are no memory leaks on + /// *all* platforms. It may be the case that some platforms do not leak + /// memory if this method is not called, but this is not guaranteed to be + /// true on all platforms. + pub unsafe fn destroy(&self) { + self.lock.destroy() + } +} + +impl Mutex { + /// Creates a new mutex in an unlocked state ready for use. + pub fn new() -> Mutex { + Mutex { + lock: box StaticMutex { + lock: unsafe { mutex::StaticNativeMutex::new() }, + } + } + } + + /// Attempts to acquire this lock. + /// + /// If the lock could not be acquired at this time, then `None` is returned. + /// Otherwise, an RAII guard is returned. The lock will be unlocked when the + /// guard is dropped. + /// + /// This function does not block. + pub fn try_lock<'a>(&'a self) -> Option<Guard<'a>> { + self.lock.try_lock() + } + + /// Acquires a mutex, blocking the current task until it is able to do so. + /// + /// This function will block the local task until it is available to acquire + /// the mutex. Upon returning, the task is the only task with the mutex + /// held. An RAII guard is returned to allow scoped unlock of the lock. When + /// the guard goes out of scope, the mutex will be unlocked. + pub fn lock<'a>(&'a self) -> Guard<'a> { self.lock.lock() } +} + +impl Drop for Mutex { + fn drop(&mut self) { + // This is actually safe b/c we know that there is no further usage of + // this mutex (it's up to the user to arrange for a mutex to get + // dropped, that's not our job) + unsafe { self.lock.destroy() } + } +} + +#[cfg(test)] +mod test { + use prelude::*; + use super::{Mutex, StaticMutex, MUTEX_INIT}; + + #[test] + fn smoke() { + let m = Mutex::new(); + drop(m.lock()); + drop(m.lock()); + } + + #[test] + fn smoke_static() { + static M: StaticMutex = MUTEX_INIT; + unsafe { + drop(M.lock()); + drop(M.lock()); + M.destroy(); + } + } + + #[test] + fn lots_and_lots() { + static M: StaticMutex = MUTEX_INIT; + static mut CNT: uint = 0; + static J: uint = 1000; + static K: uint = 3; + + fn inc() { + for _ in range(0, J) { + unsafe { + let _g = M.lock(); + CNT += 1; + } + } + } + + let (tx, rx) = channel(); + for _ in range(0, K) { + let tx2 = tx.clone(); + spawn(proc() { inc(); tx2.send(()); }); + let tx2 = tx.clone(); + spawn(proc() { inc(); tx2.send(()); }); + } + + drop(tx); + for _ in range(0, 2 * K) { + rx.recv(); + } + assert_eq!(unsafe {CNT}, J * K * 2); + unsafe { + M.destroy(); + } + } + + #[test] + fn trylock() { + let m = Mutex::new(); + assert!(m.try_lock().is_some()); + } +} diff --git a/src/libstd/sync/one.rs b/src/libstd/sync/one.rs new file mode 100644 index 00000000000..f710a6da59b --- /dev/null +++ b/src/libstd/sync/one.rs @@ -0,0 +1,170 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! A "once initialization" primitive +//! +//! This primitive is meant to be used to run one-time initialization. An +//! example use case would be for initializing an FFI library. + +use core::prelude::*; + +use core::int; +use core::atomic; + +use super::mutex::{StaticMutex, MUTEX_INIT}; + +/// A synchronization primitive which can be used to run a one-time global +/// initialization. Useful for one-time initialization for FFI or related +/// functionality. This type can only be constructed with the `ONCE_INIT` +/// value. +/// +/// # Example +/// +/// ```rust,ignore +/// use std::sync::one::{Once, ONCE_INIT}; +/// +/// static START: Once = ONCE_INIT; +/// +/// START.doit(|| { +/// // run initialization here +/// }); +/// ``` +pub struct Once { + mutex: StaticMutex, + cnt: atomic::AtomicInt, + lock_cnt: atomic::AtomicInt, +} + +/// Initialization value for static `Once` values. +pub const ONCE_INIT: Once = Once { + mutex: MUTEX_INIT, + cnt: atomic::INIT_ATOMIC_INT, + lock_cnt: atomic::INIT_ATOMIC_INT, +}; + +impl Once { + /// Perform an initialization routine once and only once. The given closure + /// will be executed if this is the first time `doit` has been called, and + /// otherwise the routine will *not* be invoked. + /// + /// This method will block the calling task if another initialization + /// routine is currently running. + /// + /// When this function returns, it is guaranteed that some initialization + /// has run and completed (it may not be the closure specified). + pub fn doit(&self, f: ||) { + // Optimize common path: load is much cheaper than fetch_add. + if self.cnt.load(atomic::SeqCst) < 0 { + return + } + + // Implementation-wise, this would seem like a fairly trivial primitive. + // The stickler part is where our mutexes currently require an + // allocation, and usage of a `Once` shouldn't leak this allocation. + // + // This means that there must be a deterministic destroyer of the mutex + // contained within (because it's not needed after the initialization + // has run). + // + // The general scheme here is to gate all future threads once + // initialization has completed with a "very negative" count, and to + // allow through threads to lock the mutex if they see a non negative + // count. For all threads grabbing the mutex, exactly one of them should + // be responsible for unlocking the mutex, and this should only be done + // once everyone else is done with the mutex. + // + // This atomicity is achieved by swapping a very negative value into the + // shared count when the initialization routine has completed. This will + // read the number of threads which will at some point attempt to + // acquire the mutex. This count is then squirreled away in a separate + // variable, and the last person on the way out of the mutex is then + // responsible for destroying the mutex. + // + // It is crucial that the negative value is swapped in *after* the + // initialization routine has completed because otherwise new threads + // calling `doit` will return immediately before the initialization has + // completed. + + let prev = self.cnt.fetch_add(1, atomic::SeqCst); + if prev < 0 { + // Make sure we never overflow, we'll never have int::MIN + // simultaneous calls to `doit` to make this value go back to 0 + self.cnt.store(int::MIN, atomic::SeqCst); + return + } + + // If the count is negative, then someone else finished the job, + // otherwise we run the job and record how many people will try to grab + // this lock + let guard = self.mutex.lock(); + if self.cnt.load(atomic::SeqCst) > 0 { + f(); + let prev = self.cnt.swap(int::MIN, atomic::SeqCst); + self.lock_cnt.store(prev, atomic::SeqCst); + } + drop(guard); + + // Last one out cleans up after everyone else, no leaks! + if self.lock_cnt.fetch_add(-1, atomic::SeqCst) == 1 { + unsafe { self.mutex.destroy() } + } + } +} + +#[cfg(test)] +mod test { + use prelude::*; + use task; + use super::{ONCE_INIT, Once}; + + #[test] + fn smoke_once() { + static O: Once = ONCE_INIT; + let mut a = 0i; + O.doit(|| a += 1); + assert_eq!(a, 1); + O.doit(|| a += 1); + assert_eq!(a, 1); + } + + #[test] + fn stampede_once() { + static O: Once = ONCE_INIT; + static mut run: bool = false; + + let (tx, rx) = channel(); + for _ in range(0u, 10) { + let tx = tx.clone(); + spawn(proc() { + for _ in range(0u, 4) { task::deschedule() } + unsafe { + O.doit(|| { + assert!(!run); + run = true; + }); + assert!(run); + } + tx.send(()); + }); + } + + unsafe { + O.doit(|| { + assert!(!run); + run = true; + }); + assert!(run); + } + + for _ in range(0u, 10) { + rx.recv(); + } + } +} diff --git a/src/libstd/sync/raw.rs b/src/libstd/sync/raw.rs new file mode 100644 index 00000000000..ff3f2c9462c --- /dev/null +++ b/src/libstd/sync/raw.rs @@ -0,0 +1,1129 @@ +// Copyright 2012-2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Raw concurrency primitives you know and love. +//! +//! These primitives are not recommended for general use, but are provided for +//! flavorful use-cases. It is recommended to use the types at the top of the +//! `sync` crate which wrap values directly and provide safer abstractions for +//! containing data. + +// A side-effect of merging libsync into libstd; will go away once +// libsync rewrite lands +#![allow(dead_code)] + +use core::prelude::*; +use self::ReacquireOrderLock::*; + +use core::atomic; +use core::finally::Finally; +use core::kinds::marker; +use core::mem; +use core::cell::UnsafeCell; +use vec::Vec; + +use super::mutex; +use comm::{Receiver, Sender, channel}; + +/**************************************************************************** + * Internals + ****************************************************************************/ + +// Each waiting task receives on one of these. +type WaitEnd = Receiver<()>; +type SignalEnd = Sender<()>; +// A doubly-ended queue of waiting tasks. +struct WaitQueue { + head: Receiver<SignalEnd>, + tail: Sender<SignalEnd>, +} + +impl WaitQueue { + fn new() -> WaitQueue { + let (block_tail, block_head) = channel(); + WaitQueue { head: block_head, tail: block_tail } + } + + // Signals one live task from the queue. + fn signal(&self) -> bool { + match self.head.try_recv() { + Ok(ch) => { + // Send a wakeup signal. If the waiter was killed, its port will + // have closed. Keep trying until we get a live task. + if ch.send_opt(()).is_ok() { + true + } else { + self.signal() + } + } + _ => false + } + } + + fn broadcast(&self) -> uint { + let mut count = 0; + loop { + match self.head.try_recv() { + Ok(ch) => { + if ch.send_opt(()).is_ok() { + count += 1; + } + } + _ => break + } + } + count + } + + fn wait_end(&self) -> WaitEnd { + let (signal_end, wait_end) = channel(); + self.tail.send(signal_end); + wait_end + } +} + +// The building-block used to make semaphores, mutexes, and rwlocks. +struct Sem<Q> { + lock: mutex::Mutex, + // n.b, we need Sem to be `Sync`, but the WaitQueue type is not send/share + // (for good reason). We have an internal invariant on this semaphore, + // however, that the queue is never accessed outside of a locked + // context. + inner: UnsafeCell<SemInner<Q>> +} + +struct SemInner<Q> { + count: int, + waiters: WaitQueue, + // Can be either unit or another waitqueue. Some sems shouldn't come with + // a condition variable attached, others should. + blocked: Q, +} + +#[must_use] +struct SemGuard<'a, Q:'a> { + sem: &'a Sem<Q>, +} + +impl<Q: Send> Sem<Q> { + fn new(count: int, q: Q) -> Sem<Q> { + assert!(count >= 0, + "semaphores cannot be initialized with negative values"); + Sem { + lock: mutex::Mutex::new(), + inner: UnsafeCell::new(SemInner { + waiters: WaitQueue::new(), + count: count, + blocked: q, + }) + } + } + + unsafe fn with(&self, f: |&mut SemInner<Q>|) { + let _g = self.lock.lock(); + // This &mut is safe because, due to the lock, we are the only one who can touch the data + f(&mut *self.inner.get()) + } + + pub fn acquire(&self) { + unsafe { + let mut waiter_nobe = None; + self.with(|state| { + state.count -= 1; + if state.count < 0 { + // Create waiter nobe, enqueue ourself, and tell + // outer scope we need to block. + waiter_nobe = Some(state.waiters.wait_end()); + } + }); + // Uncomment if you wish to test for sem races. Not + // valgrind-friendly. + /* for _ in range(0u, 1000) { task::deschedule(); } */ + // Need to wait outside the exclusive. + if waiter_nobe.is_some() { + let _ = waiter_nobe.unwrap().recv(); + } + } + } + + pub fn release(&self) { + unsafe { + self.with(|state| { + state.count += 1; + if state.count <= 0 { + state.waiters.signal(); + } + }) + } + } + + pub fn access<'a>(&'a self) -> SemGuard<'a, Q> { + self.acquire(); + SemGuard { sem: self } + } +} + +#[unsafe_destructor] +impl<'a, Q: Send> Drop for SemGuard<'a, Q> { + fn drop(&mut self) { + self.sem.release(); + } +} + +impl Sem<Vec<WaitQueue>> { + fn new_and_signal(count: int, num_condvars: uint) -> Sem<Vec<WaitQueue>> { + let mut queues = Vec::new(); + for _ in range(0, num_condvars) { queues.push(WaitQueue::new()); } + Sem::new(count, queues) + } + + // The only other places that condvars get built are rwlock.write_cond() + // and rwlock_write_mode. + pub fn access_cond<'a>(&'a self) -> SemCondGuard<'a> { + SemCondGuard { + guard: self.access(), + cvar: Condvar { sem: self, order: Nothing, nocopy: marker::NoCopy }, + } + } +} + +// FIXME(#3598): Want to use an Option down below, but we need a custom enum +// that's not polymorphic to get around the fact that lifetimes are invariant +// inside of type parameters. +enum ReacquireOrderLock<'a> { + Nothing, // c.c + Just(&'a Semaphore), +} + +/// A mechanism for atomic-unlock-and-deschedule blocking and signalling. +pub struct Condvar<'a> { + // The 'Sem' object associated with this condvar. This is the one that's + // atomically-unlocked-and-descheduled upon and reacquired during wakeup. + sem: &'a Sem<Vec<WaitQueue> >, + // This is (can be) an extra semaphore which is held around the reacquire + // operation on the first one. This is only used in cvars associated with + // rwlocks, and is needed to ensure that, when a downgrader is trying to + // hand off the access lock (which would be the first field, here), a 2nd + // writer waking up from a cvar wait can't race with a reader to steal it, + // See the comment in write_cond for more detail. + order: ReacquireOrderLock<'a>, + // Make sure condvars are non-copyable. + nocopy: marker::NoCopy, +} + +impl<'a> Condvar<'a> { + /// Atomically drop the associated lock, and block until a signal is sent. + /// + /// # Panics + /// + /// A task which is killed while waiting on a condition variable will wake + /// up, panic, and unlock the associated lock as it unwinds. + pub fn wait(&self) { self.wait_on(0) } + + /// As wait(), but can specify which of multiple condition variables to + /// wait on. Only a signal_on() or broadcast_on() with the same condvar_id + /// will wake this thread. + /// + /// The associated lock must have been initialised with an appropriate + /// number of condvars. The condvar_id must be between 0 and num_condvars-1 + /// or else this call will panic. + /// + /// wait() is equivalent to wait_on(0). + pub fn wait_on(&self, condvar_id: uint) { + let mut wait_end = None; + let mut out_of_bounds = None; + // Release lock, 'atomically' enqueuing ourselves in so doing. + unsafe { + self.sem.with(|state| { + if condvar_id < state.blocked.len() { + // Drop the lock. + state.count += 1; + if state.count <= 0 { + state.waiters.signal(); + } + // Create waiter nobe, and enqueue ourself to + // be woken up by a signaller. + wait_end = Some(state.blocked[condvar_id].wait_end()); + } else { + out_of_bounds = Some(state.blocked.len()); + } + }) + } + + // If deschedule checks start getting inserted anywhere, we can be + // killed before or after enqueueing. + check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()", || { + // Unconditionally "block". (Might not actually block if a + // signaller already sent -- I mean 'unconditionally' in contrast + // with acquire().) + (|| { + let _ = wait_end.take().unwrap().recv(); + }).finally(|| { + // Reacquire the condvar. + match self.order { + Just(lock) => { + let _g = lock.access(); + self.sem.acquire(); + } + Nothing => self.sem.acquire(), + } + }) + }) + } + + /// Wake up a blocked task. Returns false if there was no blocked task. + pub fn signal(&self) -> bool { self.signal_on(0) } + + /// As signal, but with a specified condvar_id. See wait_on. + pub fn signal_on(&self, condvar_id: uint) -> bool { + unsafe { + let mut out_of_bounds = None; + let mut result = false; + self.sem.with(|state| { + if condvar_id < state.blocked.len() { + result = state.blocked[condvar_id].signal(); + } else { + out_of_bounds = Some(state.blocked.len()); + } + }); + check_cvar_bounds(out_of_bounds, + condvar_id, + "cond.signal_on()", + || result) + } + } + + /// Wake up all blocked tasks. Returns the number of tasks woken. + pub fn broadcast(&self) -> uint { self.broadcast_on(0) } + + /// As broadcast, but with a specified condvar_id. See wait_on. + pub fn broadcast_on(&self, condvar_id: uint) -> uint { + let mut out_of_bounds = None; + let mut queue = None; + unsafe { + self.sem.with(|state| { + if condvar_id < state.blocked.len() { + // To avoid :broadcast_heavy, we make a new waitqueue, + // swap it out with the old one, and broadcast on the + // old one outside of the little-lock. + queue = Some(mem::replace(&mut state.blocked[condvar_id], + WaitQueue::new())); + } else { + out_of_bounds = Some(state.blocked.len()); + } + }); + check_cvar_bounds(out_of_bounds, + condvar_id, + "cond.signal_on()", + || { + queue.take().unwrap().broadcast() + }) + } + } +} + +// Checks whether a condvar ID was out of bounds, and panics if so, or does +// something else next on success. +#[inline] +fn check_cvar_bounds<U>( + out_of_bounds: Option<uint>, + id: uint, + act: &str, + blk: || -> U) + -> U { + match out_of_bounds { + Some(0) => + panic!("{} with illegal ID {} - this lock has no condvars!", act, id), + Some(length) => + panic!("{} with illegal ID {} - ID must be less than {}", act, id, length), + None => blk() + } +} + +#[must_use] +struct SemCondGuard<'a> { + guard: SemGuard<'a, Vec<WaitQueue>>, + cvar: Condvar<'a>, +} + +/**************************************************************************** + * Semaphores + ****************************************************************************/ + +/// A counting, blocking, bounded-waiting semaphore. +pub struct Semaphore { + sem: Sem<()>, +} + +/// An RAII guard used to represent an acquired resource to a semaphore. When +/// dropped, this value will release the resource back to the semaphore. +#[must_use] +pub struct SemaphoreGuard<'a> { + _guard: SemGuard<'a, ()>, +} + +impl Semaphore { + /// Create a new semaphore with the specified count. + /// + /// # Panics + /// + /// This function will panic if `count` is negative. + pub fn new(count: int) -> Semaphore { + Semaphore { sem: Sem::new(count, ()) } + } + + /// Acquire a resource represented by the semaphore. Blocks if necessary + /// until resource(s) become available. + pub fn acquire(&self) { self.sem.acquire() } + + /// Release a held resource represented by the semaphore. Wakes a blocked + /// contending task, if any exist. Won't block the caller. + pub fn release(&self) { self.sem.release() } + + /// Acquire a resource of this semaphore, returning an RAII guard which will + /// release the resource when dropped. + pub fn access<'a>(&'a self) -> SemaphoreGuard<'a> { + SemaphoreGuard { _guard: self.sem.access() } + } +} + +/**************************************************************************** + * Mutexes + ****************************************************************************/ + +/// A blocking, bounded-waiting, mutual exclusion lock with an associated +/// FIFO condition variable. +/// +/// # Panics +/// +/// A task which panicks while holding a mutex will unlock the mutex as it +/// unwinds. +pub struct Mutex { + sem: Sem<Vec<WaitQueue>>, +} + +/// An RAII structure which is used to gain access to a mutex's condition +/// variable. Additionally, when a value of this type is dropped, the +/// corresponding mutex is also unlocked. +#[must_use] +pub struct MutexGuard<'a> { + _guard: SemGuard<'a, Vec<WaitQueue>>, + /// Inner condition variable which is connected to the outer mutex, and can + /// be used for atomic-unlock-and-deschedule. + pub cond: Condvar<'a>, +} + +impl Mutex { + /// Create a new mutex, with one associated condvar. + pub fn new() -> Mutex { Mutex::new_with_condvars(1) } + + /// Create a new mutex, with a specified number of associated condvars. This + /// will allow calling wait_on/signal_on/broadcast_on with condvar IDs + /// between 0 and num_condvars-1. (If num_condvars is 0, lock_cond will be + /// allowed but any operations on the condvar will panic.) + pub fn new_with_condvars(num_condvars: uint) -> Mutex { + Mutex { sem: Sem::new_and_signal(1, num_condvars) } + } + + /// Acquires ownership of this mutex, returning an RAII guard which will + /// unlock the mutex when dropped. The associated condition variable can + /// also be accessed through the returned guard. + pub fn lock<'a>(&'a self) -> MutexGuard<'a> { + let SemCondGuard { guard, cvar } = self.sem.access_cond(); + MutexGuard { _guard: guard, cond: cvar } + } +} + +/**************************************************************************** + * Reader-writer locks + ****************************************************************************/ + +// NB: Wikipedia - Readers-writers_problem#The_third_readers-writers_problem + +/// A blocking, no-starvation, reader-writer lock with an associated condvar. +/// +/// # Panics +/// +/// A task which panics while holding an rwlock will unlock the rwlock as it +/// unwinds. +pub struct RWLock { + order_lock: Semaphore, + access_lock: Sem<Vec<WaitQueue>>, + + // The only way the count flag is ever accessed is with xadd. Since it is + // a read-modify-write operation, multiple xadds on different cores will + // always be consistent with respect to each other, so a monotonic/relaxed + // consistency ordering suffices (i.e., no extra barriers are needed). + // + // FIXME(#6598): The atomics module has no relaxed ordering flag, so I use + // acquire/release orderings superfluously. Change these someday. + read_count: atomic::AtomicUint, +} + +/// An RAII helper which is created by acquiring a read lock on an RWLock. When +/// dropped, this will unlock the RWLock. +#[must_use] +pub struct RWLockReadGuard<'a> { + lock: &'a RWLock, +} + +/// An RAII helper which is created by acquiring a write lock on an RWLock. When +/// dropped, this will unlock the RWLock. +/// +/// A value of this type can also be consumed to downgrade to a read-only lock. +#[must_use] +pub struct RWLockWriteGuard<'a> { + lock: &'a RWLock, + /// Inner condition variable that is connected to the write-mode of the + /// outer rwlock. + pub cond: Condvar<'a>, +} + +impl RWLock { + /// Create a new rwlock, with one associated condvar. + pub fn new() -> RWLock { RWLock::new_with_condvars(1) } + + /// Create a new rwlock, with a specified number of associated condvars. + /// Similar to mutex_with_condvars. + pub fn new_with_condvars(num_condvars: uint) -> RWLock { + RWLock { + order_lock: Semaphore::new(1), + access_lock: Sem::new_and_signal(1, num_condvars), + read_count: atomic::AtomicUint::new(0), + } + } + + /// Acquires a read-lock, returning an RAII guard that will unlock the lock + /// when dropped. Calls to 'read' from other tasks may run concurrently with + /// this one. + pub fn read<'a>(&'a self) -> RWLockReadGuard<'a> { + let _guard = self.order_lock.access(); + let old_count = self.read_count.fetch_add(1, atomic::Acquire); + if old_count == 0 { + self.access_lock.acquire(); + } + RWLockReadGuard { lock: self } + } + + /// Acquire a write-lock, returning an RAII guard that will unlock the lock + /// when dropped. No calls to 'read' or 'write' from other tasks will run + /// concurrently with this one. + /// + /// You can also downgrade a write to a read by calling the `downgrade` + /// method on the returned guard. Additionally, the guard will contain a + /// `Condvar` attached to this lock. + /// + /// # Example + /// + /// ```{rust,ignore} + /// use std::sync::raw::RWLock; + /// + /// let lock = RWLock::new(); + /// let write = lock.write(); + /// // ... exclusive access ... + /// let read = write.downgrade(); + /// // ... shared access ... + /// drop(read); + /// ``` + pub fn write<'a>(&'a self) -> RWLockWriteGuard<'a> { + let _g = self.order_lock.access(); + self.access_lock.acquire(); + + // It's important to thread our order lock into the condvar, so that + // when a cond.wait() wakes up, it uses it while reacquiring the + // access lock. If we permitted a waking-up writer to "cut in line", + // there could arise a subtle race when a downgrader attempts to hand + // off the reader cloud lock to a waiting reader. This race is tested + // in arc.rs (test_rw_write_cond_downgrade_read_race) and looks like: + // T1 (writer) T2 (downgrader) T3 (reader) + // [in cond.wait()] + // [locks for writing] + // [holds access_lock] + // [is signalled, perhaps by + // downgrader or a 4th thread] + // tries to lock access(!) + // lock order_lock + // xadd read_count[0->1] + // tries to lock access + // [downgrade] + // xadd read_count[1->2] + // unlock access + // Since T1 contended on the access lock before T3 did, it will steal + // the lock handoff. Adding order_lock in the condvar reacquire path + // solves this because T1 will hold order_lock while waiting on access, + // which will cause T3 to have to wait until T1 finishes its write, + // which can't happen until T2 finishes the downgrade-read entirely. + // The astute reader will also note that making waking writers use the + // order_lock is better for not starving readers. + RWLockWriteGuard { + lock: self, + cond: Condvar { + sem: &self.access_lock, + order: Just(&self.order_lock), + nocopy: marker::NoCopy, + } + } + } +} + +impl<'a> RWLockWriteGuard<'a> { + /// Consumes this write lock and converts it into a read lock. + pub fn downgrade(self) -> RWLockReadGuard<'a> { + let lock = self.lock; + // Don't run the destructor of the write guard, we're in charge of + // things from now on + unsafe { mem::forget(self) } + + let old_count = lock.read_count.fetch_add(1, atomic::Release); + // If another reader was already blocking, we need to hand-off + // the "reader cloud" access lock to them. + if old_count != 0 { + // Guaranteed not to let another writer in, because + // another reader was holding the order_lock. Hence they + // must be the one to get the access_lock (because all + // access_locks are acquired with order_lock held). See + // the comment in write_cond for more justification. + lock.access_lock.release(); + } + RWLockReadGuard { lock: lock } + } +} + +#[unsafe_destructor] +impl<'a> Drop for RWLockWriteGuard<'a> { + fn drop(&mut self) { + self.lock.access_lock.release(); + } +} + +#[unsafe_destructor] +impl<'a> Drop for RWLockReadGuard<'a> { + fn drop(&mut self) { + let old_count = self.lock.read_count.fetch_sub(1, atomic::Release); + assert!(old_count > 0); + if old_count == 1 { + // Note: this release used to be outside of a locked access + // to exclusive-protected state. If this code is ever + // converted back to such (instead of using atomic ops), + // this access MUST NOT go inside the exclusive access. + self.lock.access_lock.release(); + } + } +} + +/**************************************************************************** + * Tests + ****************************************************************************/ + +#[cfg(test)] +mod tests { + pub use self::RWLockMode::*; + + use sync::Arc; + use prelude::*; + use super::{Semaphore, Mutex, RWLock, Condvar}; + + use mem; + use result; + use task; + + /************************************************************************ + * Semaphore tests + ************************************************************************/ + #[test] + fn test_sem_acquire_release() { + let s = Semaphore::new(1); + s.acquire(); + s.release(); + s.acquire(); + } + #[test] + fn test_sem_basic() { + let s = Semaphore::new(1); + let _g = s.access(); + } + #[test] + #[should_fail] + fn test_sem_basic2() { + Semaphore::new(-1); + } + #[test] + fn test_sem_as_mutex() { + let s = Arc::new(Semaphore::new(1)); + let s2 = s.clone(); + task::spawn(proc() { + let _g = s2.access(); + for _ in range(0u, 5) { task::deschedule(); } + }); + let _g = s.access(); + for _ in range(0u, 5) { task::deschedule(); } + } + #[test] + fn test_sem_as_cvar() { + /* Child waits and parent signals */ + let (tx, rx) = channel(); + let s = Arc::new(Semaphore::new(0)); + let s2 = s.clone(); + task::spawn(proc() { + s2.acquire(); + tx.send(()); + }); + for _ in range(0u, 5) { task::deschedule(); } + s.release(); + let _ = rx.recv(); + + /* Parent waits and child signals */ + let (tx, rx) = channel(); + let s = Arc::new(Semaphore::new(0)); + let s2 = s.clone(); + task::spawn(proc() { + for _ in range(0u, 5) { task::deschedule(); } + s2.release(); + let _ = rx.recv(); + }); + s.acquire(); + tx.send(()); + } + #[test] + fn test_sem_multi_resource() { + // Parent and child both get in the critical section at the same + // time, and shake hands. + let s = Arc::new(Semaphore::new(2)); + let s2 = s.clone(); + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + task::spawn(proc() { + let _g = s2.access(); + let _ = rx2.recv(); + tx1.send(()); + }); + let _g = s.access(); + tx2.send(()); + let _ = rx1.recv(); + } + #[test] + fn test_sem_runtime_friendly_blocking() { + // Force the runtime to schedule two threads on the same sched_loop. + // When one blocks, it should schedule the other one. + let s = Arc::new(Semaphore::new(1)); + let s2 = s.clone(); + let (tx, rx) = channel(); + { + let _g = s.access(); + task::spawn(proc() { + tx.send(()); + drop(s2.access()); + tx.send(()); + }); + rx.recv(); // wait for child to come alive + for _ in range(0u, 5) { task::deschedule(); } // let the child contend + } + rx.recv(); // wait for child to be done + } + /************************************************************************ + * Mutex tests + ************************************************************************/ + #[test] + fn test_mutex_lock() { + // Unsafely achieve shared state, and do the textbook + // "load tmp = move ptr; inc tmp; store ptr <- tmp" dance. + let (tx, rx) = channel(); + let m = Arc::new(Mutex::new()); + let m2 = m.clone(); + let mut sharedstate = box 0; + { + let ptr: *mut int = &mut *sharedstate; + task::spawn(proc() { + access_shared(ptr, &m2, 10); + tx.send(()); + }); + } + { + access_shared(&mut *sharedstate, &m, 10); + let _ = rx.recv(); + + assert_eq!(*sharedstate, 20); + } + + fn access_shared(sharedstate: *mut int, m: &Arc<Mutex>, n: uint) { + for _ in range(0u, n) { + let _g = m.lock(); + let oldval = unsafe { *sharedstate }; + task::deschedule(); + unsafe { *sharedstate = oldval + 1; } + } + } + } + #[test] + fn test_mutex_cond_wait() { + let m = Arc::new(Mutex::new()); + + // Child wakes up parent + { + let lock = m.lock(); + let m2 = m.clone(); + task::spawn(proc() { + let lock = m2.lock(); + let woken = lock.cond.signal(); + assert!(woken); + }); + lock.cond.wait(); + } + // Parent wakes up child + let (tx, rx) = channel(); + let m3 = m.clone(); + task::spawn(proc() { + let lock = m3.lock(); + tx.send(()); + lock.cond.wait(); + tx.send(()); + }); + rx.recv(); // Wait until child gets in the mutex + { + let lock = m.lock(); + let woken = lock.cond.signal(); + assert!(woken); + } + rx.recv(); // Wait until child wakes up + } + + fn test_mutex_cond_broadcast_helper(num_waiters: uint) { + let m = Arc::new(Mutex::new()); + let mut rxs = Vec::new(); + + for _ in range(0u, num_waiters) { + let mi = m.clone(); + let (tx, rx) = channel(); + rxs.push(rx); + task::spawn(proc() { + let lock = mi.lock(); + tx.send(()); + lock.cond.wait(); + tx.send(()); + }); + } + + // wait until all children get in the mutex + for rx in rxs.iter_mut() { rx.recv(); } + { + let lock = m.lock(); + let num_woken = lock.cond.broadcast(); + assert_eq!(num_woken, num_waiters); + } + // wait until all children wake up + for rx in rxs.iter_mut() { rx.recv(); } + } + #[test] + fn test_mutex_cond_broadcast() { + test_mutex_cond_broadcast_helper(12); + } + #[test] + fn test_mutex_cond_broadcast_none() { + test_mutex_cond_broadcast_helper(0); + } + #[test] + fn test_mutex_cond_no_waiter() { + let m = Arc::new(Mutex::new()); + let m2 = m.clone(); + let _ = task::try(proc() { + drop(m.lock()); + }); + let lock = m2.lock(); + assert!(!lock.cond.signal()); + } + #[test] + fn test_mutex_killed_simple() { + use any::Any; + + // Mutex must get automatically unlocked if panicked/killed within. + let m = Arc::new(Mutex::new()); + let m2 = m.clone(); + + let result: result::Result<(), Box<Any + Send>> = task::try(proc() { + let _lock = m2.lock(); + panic!(); + }); + assert!(result.is_err()); + // child task must have finished by the time try returns + drop(m.lock()); + } + #[test] + fn test_mutex_cond_signal_on_0() { + // Tests that signal_on(0) is equivalent to signal(). + let m = Arc::new(Mutex::new()); + let lock = m.lock(); + let m2 = m.clone(); + task::spawn(proc() { + let lock = m2.lock(); + lock.cond.signal_on(0); + }); + lock.cond.wait(); + } + #[test] + fn test_mutex_no_condvars() { + let result = task::try(proc() { + let m = Mutex::new_with_condvars(0); + m.lock().cond.wait(); + }); + assert!(result.is_err()); + let result = task::try(proc() { + let m = Mutex::new_with_condvars(0); + m.lock().cond.signal(); + }); + assert!(result.is_err()); + let result = task::try(proc() { + let m = Mutex::new_with_condvars(0); + m.lock().cond.broadcast(); + }); + assert!(result.is_err()); + } + /************************************************************************ + * Reader/writer lock tests + ************************************************************************/ + #[cfg(test)] + pub enum RWLockMode { Read, Write, Downgrade, DowngradeRead } + #[cfg(test)] + fn lock_rwlock_in_mode(x: &Arc<RWLock>, mode: RWLockMode, blk: ||) { + match mode { + Read => { let _g = x.read(); blk() } + Write => { let _g = x.write(); blk() } + Downgrade => { let _g = x.write(); blk() } + DowngradeRead => { let _g = x.write().downgrade(); blk() } + } + } + #[cfg(test)] + fn test_rwlock_exclusion(x: Arc<RWLock>, + mode1: RWLockMode, + mode2: RWLockMode) { + // Test mutual exclusion between readers and writers. Just like the + // mutex mutual exclusion test, a ways above. + let (tx, rx) = channel(); + let x2 = x.clone(); + let mut sharedstate = box 0; + { + let ptr: *const int = &*sharedstate; + task::spawn(proc() { + let sharedstate: &mut int = + unsafe { mem::transmute(ptr) }; + access_shared(sharedstate, &x2, mode1, 10); + tx.send(()); + }); + } + { + access_shared(&mut *sharedstate, &x, mode2, 10); + let _ = rx.recv(); + + assert_eq!(*sharedstate, 20); + } + + fn access_shared(sharedstate: &mut int, x: &Arc<RWLock>, + mode: RWLockMode, n: uint) { + for _ in range(0u, n) { + lock_rwlock_in_mode(x, mode, || { + let oldval = *sharedstate; + task::deschedule(); + *sharedstate = oldval + 1; + }) + } + } + } + #[test] + fn test_rwlock_readers_wont_modify_the_data() { + test_rwlock_exclusion(Arc::new(RWLock::new()), Read, Write); + test_rwlock_exclusion(Arc::new(RWLock::new()), Write, Read); + test_rwlock_exclusion(Arc::new(RWLock::new()), Read, Downgrade); + test_rwlock_exclusion(Arc::new(RWLock::new()), Downgrade, Read); + test_rwlock_exclusion(Arc::new(RWLock::new()), Write, DowngradeRead); + test_rwlock_exclusion(Arc::new(RWLock::new()), DowngradeRead, Write); + } + #[test] + fn test_rwlock_writers_and_writers() { + test_rwlock_exclusion(Arc::new(RWLock::new()), Write, Write); + test_rwlock_exclusion(Arc::new(RWLock::new()), Write, Downgrade); + test_rwlock_exclusion(Arc::new(RWLock::new()), Downgrade, Write); + test_rwlock_exclusion(Arc::new(RWLock::new()), Downgrade, Downgrade); + } + #[cfg(test)] + fn test_rwlock_handshake(x: Arc<RWLock>, + mode1: RWLockMode, + mode2: RWLockMode, + make_mode2_go_first: bool) { + // Much like sem_multi_resource. + let x2 = x.clone(); + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + task::spawn(proc() { + if !make_mode2_go_first { + rx2.recv(); // parent sends to us once it locks, or ... + } + lock_rwlock_in_mode(&x2, mode2, || { + if make_mode2_go_first { + tx1.send(()); // ... we send to it once we lock + } + rx2.recv(); + tx1.send(()); + }) + }); + if make_mode2_go_first { + rx1.recv(); // child sends to us once it locks, or ... + } + lock_rwlock_in_mode(&x, mode1, || { + if !make_mode2_go_first { + tx2.send(()); // ... we send to it once we lock + } + tx2.send(()); + rx1.recv(); + }) + } + #[test] + fn test_rwlock_readers_and_readers() { + test_rwlock_handshake(Arc::new(RWLock::new()), Read, Read, false); + // The downgrader needs to get in before the reader gets in, otherwise + // they cannot end up reading at the same time. + test_rwlock_handshake(Arc::new(RWLock::new()), DowngradeRead, Read, false); + test_rwlock_handshake(Arc::new(RWLock::new()), Read, DowngradeRead, true); + // Two downgrade_reads can never both end up reading at the same time. + } + #[test] + fn test_rwlock_downgrade_unlock() { + // Tests that downgrade can unlock the lock in both modes + let x = Arc::new(RWLock::new()); + lock_rwlock_in_mode(&x, Downgrade, || { }); + test_rwlock_handshake(x, Read, Read, false); + let y = Arc::new(RWLock::new()); + lock_rwlock_in_mode(&y, DowngradeRead, || { }); + test_rwlock_exclusion(y, Write, Write); + } + #[test] + fn test_rwlock_read_recursive() { + let x = RWLock::new(); + let _g1 = x.read(); + let _g2 = x.read(); + } + #[test] + fn test_rwlock_cond_wait() { + // As test_mutex_cond_wait above. + let x = Arc::new(RWLock::new()); + + // Child wakes up parent + { + let lock = x.write(); + let x2 = x.clone(); + task::spawn(proc() { + let lock = x2.write(); + assert!(lock.cond.signal()); + }); + lock.cond.wait(); + } + // Parent wakes up child + let (tx, rx) = channel(); + let x3 = x.clone(); + task::spawn(proc() { + let lock = x3.write(); + tx.send(()); + lock.cond.wait(); + tx.send(()); + }); + rx.recv(); // Wait until child gets in the rwlock + drop(x.read()); // Must be able to get in as a reader + { + let x = x.write(); + assert!(x.cond.signal()); + } + rx.recv(); // Wait until child wakes up + drop(x.read()); // Just for good measure + } + #[cfg(test)] + fn test_rwlock_cond_broadcast_helper(num_waiters: uint) { + // Much like the mutex broadcast test. Downgrade-enabled. + fn lock_cond(x: &Arc<RWLock>, blk: |c: &Condvar|) { + let lock = x.write(); + blk(&lock.cond); + } + + let x = Arc::new(RWLock::new()); + let mut rxs = Vec::new(); + + for _ in range(0u, num_waiters) { + let xi = x.clone(); + let (tx, rx) = channel(); + rxs.push(rx); + task::spawn(proc() { + lock_cond(&xi, |cond| { + tx.send(()); + cond.wait(); + tx.send(()); + }) + }); + } + + // wait until all children get in the mutex + for rx in rxs.iter_mut() { let _ = rx.recv(); } + lock_cond(&x, |cond| { + let num_woken = cond.broadcast(); + assert_eq!(num_woken, num_waiters); + }); + // wait until all children wake up + for rx in rxs.iter_mut() { let _ = rx.recv(); } + } + #[test] + fn test_rwlock_cond_broadcast() { + test_rwlock_cond_broadcast_helper(0); + test_rwlock_cond_broadcast_helper(12); + } + #[cfg(test)] + fn rwlock_kill_helper(mode1: RWLockMode, mode2: RWLockMode) { + use any::Any; + + // Mutex must get automatically unlocked if panicked/killed within. + let x = Arc::new(RWLock::new()); + let x2 = x.clone(); + + let result: result::Result<(), Box<Any + Send>> = task::try(proc() { + lock_rwlock_in_mode(&x2, mode1, || { + panic!(); + }) + }); + assert!(result.is_err()); + // child task must have finished by the time try returns + lock_rwlock_in_mode(&x, mode2, || { }) + } + #[test] + fn test_rwlock_reader_killed_writer() { + rwlock_kill_helper(Read, Write); + } + #[test] + fn test_rwlock_writer_killed_reader() { + rwlock_kill_helper(Write, Read); + } + #[test] + fn test_rwlock_reader_killed_reader() { + rwlock_kill_helper(Read, Read); + } + #[test] + fn test_rwlock_writer_killed_writer() { + rwlock_kill_helper(Write, Write); + } + #[test] + fn test_rwlock_kill_downgrader() { + rwlock_kill_helper(Downgrade, Read); + rwlock_kill_helper(Read, Downgrade); + rwlock_kill_helper(Downgrade, Write); + rwlock_kill_helper(Write, Downgrade); + rwlock_kill_helper(DowngradeRead, Read); + rwlock_kill_helper(Read, DowngradeRead); + rwlock_kill_helper(DowngradeRead, Write); + rwlock_kill_helper(Write, DowngradeRead); + rwlock_kill_helper(DowngradeRead, Downgrade); + rwlock_kill_helper(DowngradeRead, Downgrade); + rwlock_kill_helper(Downgrade, DowngradeRead); + rwlock_kill_helper(Downgrade, DowngradeRead); + } +} diff --git a/src/libstd/sync/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs new file mode 100644 index 00000000000..f0eabe61737 --- /dev/null +++ b/src/libstd/sync/spsc_queue.rs @@ -0,0 +1,385 @@ +/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT + * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE + * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and documentation are + * those of the authors and should not be interpreted as representing official + * policies, either expressed or implied, of Dmitry Vyukov. + */ + +// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue + +//! A single-producer single-consumer concurrent queue +//! +//! This module contains the implementation of an SPSC queue which can be used +//! concurrently between two tasks. This data structure is safe to use and +//! enforces the semantics that there is one pusher and one popper. + +#![experimental] + +use core::prelude::*; + +use alloc::boxed::Box; +use core::mem; +use core::cell::UnsafeCell; +use alloc::arc::Arc; + +use sync::atomic::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release}; + +// Node within the linked list queue of messages to send +struct Node<T> { + // FIXME: this could be an uninitialized T if we're careful enough, and + // that would reduce memory usage (and be a bit faster). + // is it worth it? + value: Option<T>, // nullable for re-use of nodes + next: AtomicPtr<Node<T>>, // next node in the queue +} + +/// The single-producer single-consumer queue. This structure is not cloneable, +/// but it can be safely shared in an Arc if it is guaranteed that there +/// is only one popper and one pusher touching the queue at any one point in +/// time. +pub struct Queue<T> { + // consumer fields + tail: UnsafeCell<*mut Node<T>>, // where to pop from + tail_prev: AtomicPtr<Node<T>>, // where to pop from + + // producer fields + head: UnsafeCell<*mut Node<T>>, // where to push to + first: UnsafeCell<*mut Node<T>>, // where to get new nodes from + tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail + + // Cache maintenance fields. Additions and subtractions are stored + // separately in order to allow them to use nonatomic addition/subtraction. + cache_bound: uint, + cache_additions: AtomicUint, + cache_subtractions: AtomicUint, +} + +/// A safe abstraction for the consumer in a single-producer single-consumer +/// queue. +pub struct Consumer<T> { + inner: Arc<Queue<T>> +} + +impl<T: Send> Consumer<T> { + /// Attempts to pop the value from the head of the queue, returning `None` + /// if the queue is empty. + pub fn pop(&mut self) -> Option<T> { + self.inner.pop() + } + + /// Attempts to peek at the head of the queue, returning `None` if the queue + /// is empty. + pub fn peek<'a>(&'a mut self) -> Option<&'a mut T> { + self.inner.peek() + } +} + +/// A safe abstraction for the producer in a single-producer single-consumer +/// queue. +pub struct Producer<T> { + inner: Arc<Queue<T>> +} + +impl<T: Send> Producer<T> { + /// Pushes a new value onto the queue. + pub fn push(&mut self, t: T) { + self.inner.push(t) + } +} + +impl<T: Send> Node<T> { + fn new() -> *mut Node<T> { + unsafe { + mem::transmute(box Node { + value: None, + next: AtomicPtr::new(0 as *mut Node<T>), + }) + } + } +} + +/// Creates a new queue with a consumer-producer pair. +/// +/// The producer returned is connected to the consumer to push all data to +/// the consumer. +/// +/// # Arguments +/// +/// * `bound` - This queue implementation is implemented with a linked +/// list, and this means that a push is always a malloc. In +/// order to amortize this cost, an internal cache of nodes is +/// maintained to prevent a malloc from always being +/// necessary. This bound is the limit on the size of the +/// cache (if desired). If the value is 0, then the cache has +/// no bound. Otherwise, the cache will never grow larger than +/// `bound` (although the queue itself could be much larger. +pub fn queue<T: Send>(bound: uint) -> (Consumer<T>, Producer<T>) { + let q = unsafe { Queue::new(bound) }; + let arc = Arc::new(q); + let consumer = Consumer { inner: arc.clone() }; + let producer = Producer { inner: arc }; + + (consumer, producer) +} + +impl<T: Send> Queue<T> { + /// Creates a new queue. + /// + /// This is unsafe as the type system doesn't enforce a single + /// consumer-producer relationship. It also allows the consumer to `pop` + /// items while there is a `peek` active due to all methods having a + /// non-mutable receiver. + /// + /// # Arguments + /// + /// * `bound` - This queue implementation is implemented with a linked + /// list, and this means that a push is always a malloc. In + /// order to amortize this cost, an internal cache of nodes is + /// maintained to prevent a malloc from always being + /// necessary. This bound is the limit on the size of the + /// cache (if desired). If the value is 0, then the cache has + /// no bound. Otherwise, the cache will never grow larger than + /// `bound` (although the queue itself could be much larger. + pub unsafe fn new(bound: uint) -> Queue<T> { + let n1 = Node::new(); + let n2 = Node::new(); + (*n1).next.store(n2, Relaxed); + Queue { + tail: UnsafeCell::new(n2), + tail_prev: AtomicPtr::new(n1), + head: UnsafeCell::new(n2), + first: UnsafeCell::new(n1), + tail_copy: UnsafeCell::new(n1), + cache_bound: bound, + cache_additions: AtomicUint::new(0), + cache_subtractions: AtomicUint::new(0), + } + } + + /// Pushes a new value onto this queue. Note that to use this function + /// safely, it must be externally guaranteed that there is only one pusher. + pub fn push(&self, t: T) { + unsafe { + // Acquire a node (which either uses a cached one or allocates a new + // one), and then append this to the 'head' node. + let n = self.alloc(); + assert!((*n).value.is_none()); + (*n).value = Some(t); + (*n).next.store(0 as *mut Node<T>, Relaxed); + (**self.head.get()).next.store(n, Release); + *self.head.get() = n; + } + } + + unsafe fn alloc(&self) -> *mut Node<T> { + // First try to see if we can consume the 'first' node for our uses. + // We try to avoid as many atomic instructions as possible here, so + // the addition to cache_subtractions is not atomic (plus we're the + // only one subtracting from the cache). + if *self.first.get() != *self.tail_copy.get() { + if self.cache_bound > 0 { + let b = self.cache_subtractions.load(Relaxed); + self.cache_subtractions.store(b + 1, Relaxed); + } + let ret = *self.first.get(); + *self.first.get() = (*ret).next.load(Relaxed); + return ret; + } + // If the above fails, then update our copy of the tail and try + // again. + *self.tail_copy.get() = self.tail_prev.load(Acquire); + if *self.first.get() != *self.tail_copy.get() { + if self.cache_bound > 0 { + let b = self.cache_subtractions.load(Relaxed); + self.cache_subtractions.store(b + 1, Relaxed); + } + let ret = *self.first.get(); + *self.first.get() = (*ret).next.load(Relaxed); + return ret; + } + // If all of that fails, then we have to allocate a new node + // (there's nothing in the node cache). + Node::new() + } + + /// Attempts to pop a value from this queue. Remember that to use this type + /// safely you must ensure that there is only one popper at a time. + pub fn pop(&self) -> Option<T> { + unsafe { + // The `tail` node is not actually a used node, but rather a + // sentinel from where we should start popping from. Hence, look at + // tail's next field and see if we can use it. If we do a pop, then + // the current tail node is a candidate for going into the cache. + let tail = *self.tail.get(); + let next = (*tail).next.load(Acquire); + if next.is_null() { return None } + assert!((*next).value.is_some()); + let ret = (*next).value.take(); + + *self.tail.get() = next; + if self.cache_bound == 0 { + self.tail_prev.store(tail, Release); + } else { + // FIXME: this is dubious with overflow. + let additions = self.cache_additions.load(Relaxed); + let subtractions = self.cache_subtractions.load(Relaxed); + let size = additions - subtractions; + + if size < self.cache_bound { + self.tail_prev.store(tail, Release); + self.cache_additions.store(additions + 1, Relaxed); + } else { + (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed); + // We have successfully erased all references to 'tail', so + // now we can safely drop it. + let _: Box<Node<T>> = mem::transmute(tail); + } + } + return ret; + } + } + + /// Attempts to peek at the head of the queue, returning `None` if the queue + /// has no data currently + /// + /// # Warning + /// The reference returned is invalid if it is not used before the consumer + /// pops the value off the queue. If the producer then pushes another value + /// onto the queue, it will overwrite the value pointed to by the reference. + pub fn peek<'a>(&'a self) -> Option<&'a mut T> { + // This is essentially the same as above with all the popping bits + // stripped out. + unsafe { + let tail = *self.tail.get(); + let next = (*tail).next.load(Acquire); + if next.is_null() { return None } + return (*next).value.as_mut(); + } + } +} + +#[unsafe_destructor] +impl<T: Send> Drop for Queue<T> { + fn drop(&mut self) { + unsafe { + let mut cur = *self.first.get(); + while !cur.is_null() { + let next = (*cur).next.load(Relaxed); + let _n: Box<Node<T>> = mem::transmute(cur); + cur = next; + } + } + } +} + +#[cfg(test)] +mod test { + use prelude::*; + + use super::{queue}; + + #[test] + fn smoke() { + let (mut consumer, mut producer) = queue(0); + producer.push(1i); + producer.push(2); + assert_eq!(consumer.pop(), Some(1i)); + assert_eq!(consumer.pop(), Some(2)); + assert_eq!(consumer.pop(), None); + producer.push(3); + producer.push(4); + assert_eq!(consumer.pop(), Some(3)); + assert_eq!(consumer.pop(), Some(4)); + assert_eq!(consumer.pop(), None); + } + + #[test] + fn peek() { + let (mut consumer, mut producer) = queue(0); + producer.push(vec![1i]); + + // Ensure the borrowchecker works + match consumer.peek() { + Some(vec) => match vec.as_slice() { + // Note that `pop` is not allowed here due to borrow + [1] => {} + _ => return + }, + None => unreachable!() + } + + consumer.pop(); + } + + #[test] + fn drop_full() { + let (_, mut producer) = queue(0); + producer.push(box 1i); + producer.push(box 2i); + } + + #[test] + fn smoke_bound() { + let (mut consumer, mut producer) = queue(1); + producer.push(1i); + producer.push(2); + assert_eq!(consumer.pop(), Some(1)); + assert_eq!(consumer.pop(), Some(2)); + assert_eq!(consumer.pop(), None); + producer.push(3); + producer.push(4); + assert_eq!(consumer.pop(), Some(3)); + assert_eq!(consumer.pop(), Some(4)); + assert_eq!(consumer.pop(), None); + } + + #[test] + fn stress() { + stress_bound(0); + stress_bound(1); + + fn stress_bound(bound: uint) { + let (consumer, mut producer) = queue(bound); + + let (tx, rx) = channel(); + spawn(proc() { + // Move the consumer to a local mutable slot + let mut consumer = consumer; + for _ in range(0u, 100000) { + loop { + match consumer.pop() { + Some(1i) => break, + Some(_) => panic!(), + None => {} + } + } + } + tx.send(()); + }); + for _ in range(0i, 100000) { + producer.push(1); + } + rx.recv(); + } + } +} |
