diff options
Diffstat (limited to 'src/libstd/sync/raw.rs')
| -rw-r--r-- | src/libstd/sync/raw.rs | 1129 |
1 files changed, 1129 insertions, 0 deletions
diff --git a/src/libstd/sync/raw.rs b/src/libstd/sync/raw.rs new file mode 100644 index 00000000000..ff3f2c9462c --- /dev/null +++ b/src/libstd/sync/raw.rs @@ -0,0 +1,1129 @@ +// Copyright 2012-2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Raw concurrency primitives you know and love. +//! +//! These primitives are not recommended for general use, but are provided for +//! flavorful use-cases. It is recommended to use the types at the top of the +//! `sync` crate which wrap values directly and provide safer abstractions for +//! containing data. + +// A side-effect of merging libsync into libstd; will go away once +// libsync rewrite lands +#![allow(dead_code)] + +use core::prelude::*; +use self::ReacquireOrderLock::*; + +use core::atomic; +use core::finally::Finally; +use core::kinds::marker; +use core::mem; +use core::cell::UnsafeCell; +use vec::Vec; + +use super::mutex; +use comm::{Receiver, Sender, channel}; + +/**************************************************************************** + * Internals + ****************************************************************************/ + +// Each waiting task receives on one of these. +type WaitEnd = Receiver<()>; +type SignalEnd = Sender<()>; +// A doubly-ended queue of waiting tasks. +struct WaitQueue { + head: Receiver<SignalEnd>, + tail: Sender<SignalEnd>, +} + +impl WaitQueue { + fn new() -> WaitQueue { + let (block_tail, block_head) = channel(); + WaitQueue { head: block_head, tail: block_tail } + } + + // Signals one live task from the queue. + fn signal(&self) -> bool { + match self.head.try_recv() { + Ok(ch) => { + // Send a wakeup signal. If the waiter was killed, its port will + // have closed. Keep trying until we get a live task. + if ch.send_opt(()).is_ok() { + true + } else { + self.signal() + } + } + _ => false + } + } + + fn broadcast(&self) -> uint { + let mut count = 0; + loop { + match self.head.try_recv() { + Ok(ch) => { + if ch.send_opt(()).is_ok() { + count += 1; + } + } + _ => break + } + } + count + } + + fn wait_end(&self) -> WaitEnd { + let (signal_end, wait_end) = channel(); + self.tail.send(signal_end); + wait_end + } +} + +// The building-block used to make semaphores, mutexes, and rwlocks. +struct Sem<Q> { + lock: mutex::Mutex, + // n.b, we need Sem to be `Sync`, but the WaitQueue type is not send/share + // (for good reason). We have an internal invariant on this semaphore, + // however, that the queue is never accessed outside of a locked + // context. + inner: UnsafeCell<SemInner<Q>> +} + +struct SemInner<Q> { + count: int, + waiters: WaitQueue, + // Can be either unit or another waitqueue. Some sems shouldn't come with + // a condition variable attached, others should. + blocked: Q, +} + +#[must_use] +struct SemGuard<'a, Q:'a> { + sem: &'a Sem<Q>, +} + +impl<Q: Send> Sem<Q> { + fn new(count: int, q: Q) -> Sem<Q> { + assert!(count >= 0, + "semaphores cannot be initialized with negative values"); + Sem { + lock: mutex::Mutex::new(), + inner: UnsafeCell::new(SemInner { + waiters: WaitQueue::new(), + count: count, + blocked: q, + }) + } + } + + unsafe fn with(&self, f: |&mut SemInner<Q>|) { + let _g = self.lock.lock(); + // This &mut is safe because, due to the lock, we are the only one who can touch the data + f(&mut *self.inner.get()) + } + + pub fn acquire(&self) { + unsafe { + let mut waiter_nobe = None; + self.with(|state| { + state.count -= 1; + if state.count < 0 { + // Create waiter nobe, enqueue ourself, and tell + // outer scope we need to block. + waiter_nobe = Some(state.waiters.wait_end()); + } + }); + // Uncomment if you wish to test for sem races. Not + // valgrind-friendly. + /* for _ in range(0u, 1000) { task::deschedule(); } */ + // Need to wait outside the exclusive. + if waiter_nobe.is_some() { + let _ = waiter_nobe.unwrap().recv(); + } + } + } + + pub fn release(&self) { + unsafe { + self.with(|state| { + state.count += 1; + if state.count <= 0 { + state.waiters.signal(); + } + }) + } + } + + pub fn access<'a>(&'a self) -> SemGuard<'a, Q> { + self.acquire(); + SemGuard { sem: self } + } +} + +#[unsafe_destructor] +impl<'a, Q: Send> Drop for SemGuard<'a, Q> { + fn drop(&mut self) { + self.sem.release(); + } +} + +impl Sem<Vec<WaitQueue>> { + fn new_and_signal(count: int, num_condvars: uint) -> Sem<Vec<WaitQueue>> { + let mut queues = Vec::new(); + for _ in range(0, num_condvars) { queues.push(WaitQueue::new()); } + Sem::new(count, queues) + } + + // The only other places that condvars get built are rwlock.write_cond() + // and rwlock_write_mode. + pub fn access_cond<'a>(&'a self) -> SemCondGuard<'a> { + SemCondGuard { + guard: self.access(), + cvar: Condvar { sem: self, order: Nothing, nocopy: marker::NoCopy }, + } + } +} + +// FIXME(#3598): Want to use an Option down below, but we need a custom enum +// that's not polymorphic to get around the fact that lifetimes are invariant +// inside of type parameters. +enum ReacquireOrderLock<'a> { + Nothing, // c.c + Just(&'a Semaphore), +} + +/// A mechanism for atomic-unlock-and-deschedule blocking and signalling. +pub struct Condvar<'a> { + // The 'Sem' object associated with this condvar. This is the one that's + // atomically-unlocked-and-descheduled upon and reacquired during wakeup. + sem: &'a Sem<Vec<WaitQueue> >, + // This is (can be) an extra semaphore which is held around the reacquire + // operation on the first one. This is only used in cvars associated with + // rwlocks, and is needed to ensure that, when a downgrader is trying to + // hand off the access lock (which would be the first field, here), a 2nd + // writer waking up from a cvar wait can't race with a reader to steal it, + // See the comment in write_cond for more detail. + order: ReacquireOrderLock<'a>, + // Make sure condvars are non-copyable. + nocopy: marker::NoCopy, +} + +impl<'a> Condvar<'a> { + /// Atomically drop the associated lock, and block until a signal is sent. + /// + /// # Panics + /// + /// A task which is killed while waiting on a condition variable will wake + /// up, panic, and unlock the associated lock as it unwinds. + pub fn wait(&self) { self.wait_on(0) } + + /// As wait(), but can specify which of multiple condition variables to + /// wait on. Only a signal_on() or broadcast_on() with the same condvar_id + /// will wake this thread. + /// + /// The associated lock must have been initialised with an appropriate + /// number of condvars. The condvar_id must be between 0 and num_condvars-1 + /// or else this call will panic. + /// + /// wait() is equivalent to wait_on(0). + pub fn wait_on(&self, condvar_id: uint) { + let mut wait_end = None; + let mut out_of_bounds = None; + // Release lock, 'atomically' enqueuing ourselves in so doing. + unsafe { + self.sem.with(|state| { + if condvar_id < state.blocked.len() { + // Drop the lock. + state.count += 1; + if state.count <= 0 { + state.waiters.signal(); + } + // Create waiter nobe, and enqueue ourself to + // be woken up by a signaller. + wait_end = Some(state.blocked[condvar_id].wait_end()); + } else { + out_of_bounds = Some(state.blocked.len()); + } + }) + } + + // If deschedule checks start getting inserted anywhere, we can be + // killed before or after enqueueing. + check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()", || { + // Unconditionally "block". (Might not actually block if a + // signaller already sent -- I mean 'unconditionally' in contrast + // with acquire().) + (|| { + let _ = wait_end.take().unwrap().recv(); + }).finally(|| { + // Reacquire the condvar. + match self.order { + Just(lock) => { + let _g = lock.access(); + self.sem.acquire(); + } + Nothing => self.sem.acquire(), + } + }) + }) + } + + /// Wake up a blocked task. Returns false if there was no blocked task. + pub fn signal(&self) -> bool { self.signal_on(0) } + + /// As signal, but with a specified condvar_id. See wait_on. + pub fn signal_on(&self, condvar_id: uint) -> bool { + unsafe { + let mut out_of_bounds = None; + let mut result = false; + self.sem.with(|state| { + if condvar_id < state.blocked.len() { + result = state.blocked[condvar_id].signal(); + } else { + out_of_bounds = Some(state.blocked.len()); + } + }); + check_cvar_bounds(out_of_bounds, + condvar_id, + "cond.signal_on()", + || result) + } + } + + /// Wake up all blocked tasks. Returns the number of tasks woken. + pub fn broadcast(&self) -> uint { self.broadcast_on(0) } + + /// As broadcast, but with a specified condvar_id. See wait_on. + pub fn broadcast_on(&self, condvar_id: uint) -> uint { + let mut out_of_bounds = None; + let mut queue = None; + unsafe { + self.sem.with(|state| { + if condvar_id < state.blocked.len() { + // To avoid :broadcast_heavy, we make a new waitqueue, + // swap it out with the old one, and broadcast on the + // old one outside of the little-lock. + queue = Some(mem::replace(&mut state.blocked[condvar_id], + WaitQueue::new())); + } else { + out_of_bounds = Some(state.blocked.len()); + } + }); + check_cvar_bounds(out_of_bounds, + condvar_id, + "cond.signal_on()", + || { + queue.take().unwrap().broadcast() + }) + } + } +} + +// Checks whether a condvar ID was out of bounds, and panics if so, or does +// something else next on success. +#[inline] +fn check_cvar_bounds<U>( + out_of_bounds: Option<uint>, + id: uint, + act: &str, + blk: || -> U) + -> U { + match out_of_bounds { + Some(0) => + panic!("{} with illegal ID {} - this lock has no condvars!", act, id), + Some(length) => + panic!("{} with illegal ID {} - ID must be less than {}", act, id, length), + None => blk() + } +} + +#[must_use] +struct SemCondGuard<'a> { + guard: SemGuard<'a, Vec<WaitQueue>>, + cvar: Condvar<'a>, +} + +/**************************************************************************** + * Semaphores + ****************************************************************************/ + +/// A counting, blocking, bounded-waiting semaphore. +pub struct Semaphore { + sem: Sem<()>, +} + +/// An RAII guard used to represent an acquired resource to a semaphore. When +/// dropped, this value will release the resource back to the semaphore. +#[must_use] +pub struct SemaphoreGuard<'a> { + _guard: SemGuard<'a, ()>, +} + +impl Semaphore { + /// Create a new semaphore with the specified count. + /// + /// # Panics + /// + /// This function will panic if `count` is negative. + pub fn new(count: int) -> Semaphore { + Semaphore { sem: Sem::new(count, ()) } + } + + /// Acquire a resource represented by the semaphore. Blocks if necessary + /// until resource(s) become available. + pub fn acquire(&self) { self.sem.acquire() } + + /// Release a held resource represented by the semaphore. Wakes a blocked + /// contending task, if any exist. Won't block the caller. + pub fn release(&self) { self.sem.release() } + + /// Acquire a resource of this semaphore, returning an RAII guard which will + /// release the resource when dropped. + pub fn access<'a>(&'a self) -> SemaphoreGuard<'a> { + SemaphoreGuard { _guard: self.sem.access() } + } +} + +/**************************************************************************** + * Mutexes + ****************************************************************************/ + +/// A blocking, bounded-waiting, mutual exclusion lock with an associated +/// FIFO condition variable. +/// +/// # Panics +/// +/// A task which panicks while holding a mutex will unlock the mutex as it +/// unwinds. +pub struct Mutex { + sem: Sem<Vec<WaitQueue>>, +} + +/// An RAII structure which is used to gain access to a mutex's condition +/// variable. Additionally, when a value of this type is dropped, the +/// corresponding mutex is also unlocked. +#[must_use] +pub struct MutexGuard<'a> { + _guard: SemGuard<'a, Vec<WaitQueue>>, + /// Inner condition variable which is connected to the outer mutex, and can + /// be used for atomic-unlock-and-deschedule. + pub cond: Condvar<'a>, +} + +impl Mutex { + /// Create a new mutex, with one associated condvar. + pub fn new() -> Mutex { Mutex::new_with_condvars(1) } + + /// Create a new mutex, with a specified number of associated condvars. This + /// will allow calling wait_on/signal_on/broadcast_on with condvar IDs + /// between 0 and num_condvars-1. (If num_condvars is 0, lock_cond will be + /// allowed but any operations on the condvar will panic.) + pub fn new_with_condvars(num_condvars: uint) -> Mutex { + Mutex { sem: Sem::new_and_signal(1, num_condvars) } + } + + /// Acquires ownership of this mutex, returning an RAII guard which will + /// unlock the mutex when dropped. The associated condition variable can + /// also be accessed through the returned guard. + pub fn lock<'a>(&'a self) -> MutexGuard<'a> { + let SemCondGuard { guard, cvar } = self.sem.access_cond(); + MutexGuard { _guard: guard, cond: cvar } + } +} + +/**************************************************************************** + * Reader-writer locks + ****************************************************************************/ + +// NB: Wikipedia - Readers-writers_problem#The_third_readers-writers_problem + +/// A blocking, no-starvation, reader-writer lock with an associated condvar. +/// +/// # Panics +/// +/// A task which panics while holding an rwlock will unlock the rwlock as it +/// unwinds. +pub struct RWLock { + order_lock: Semaphore, + access_lock: Sem<Vec<WaitQueue>>, + + // The only way the count flag is ever accessed is with xadd. Since it is + // a read-modify-write operation, multiple xadds on different cores will + // always be consistent with respect to each other, so a monotonic/relaxed + // consistency ordering suffices (i.e., no extra barriers are needed). + // + // FIXME(#6598): The atomics module has no relaxed ordering flag, so I use + // acquire/release orderings superfluously. Change these someday. + read_count: atomic::AtomicUint, +} + +/// An RAII helper which is created by acquiring a read lock on an RWLock. When +/// dropped, this will unlock the RWLock. +#[must_use] +pub struct RWLockReadGuard<'a> { + lock: &'a RWLock, +} + +/// An RAII helper which is created by acquiring a write lock on an RWLock. When +/// dropped, this will unlock the RWLock. +/// +/// A value of this type can also be consumed to downgrade to a read-only lock. +#[must_use] +pub struct RWLockWriteGuard<'a> { + lock: &'a RWLock, + /// Inner condition variable that is connected to the write-mode of the + /// outer rwlock. + pub cond: Condvar<'a>, +} + +impl RWLock { + /// Create a new rwlock, with one associated condvar. + pub fn new() -> RWLock { RWLock::new_with_condvars(1) } + + /// Create a new rwlock, with a specified number of associated condvars. + /// Similar to mutex_with_condvars. + pub fn new_with_condvars(num_condvars: uint) -> RWLock { + RWLock { + order_lock: Semaphore::new(1), + access_lock: Sem::new_and_signal(1, num_condvars), + read_count: atomic::AtomicUint::new(0), + } + } + + /// Acquires a read-lock, returning an RAII guard that will unlock the lock + /// when dropped. Calls to 'read' from other tasks may run concurrently with + /// this one. + pub fn read<'a>(&'a self) -> RWLockReadGuard<'a> { + let _guard = self.order_lock.access(); + let old_count = self.read_count.fetch_add(1, atomic::Acquire); + if old_count == 0 { + self.access_lock.acquire(); + } + RWLockReadGuard { lock: self } + } + + /// Acquire a write-lock, returning an RAII guard that will unlock the lock + /// when dropped. No calls to 'read' or 'write' from other tasks will run + /// concurrently with this one. + /// + /// You can also downgrade a write to a read by calling the `downgrade` + /// method on the returned guard. Additionally, the guard will contain a + /// `Condvar` attached to this lock. + /// + /// # Example + /// + /// ```{rust,ignore} + /// use std::sync::raw::RWLock; + /// + /// let lock = RWLock::new(); + /// let write = lock.write(); + /// // ... exclusive access ... + /// let read = write.downgrade(); + /// // ... shared access ... + /// drop(read); + /// ``` + pub fn write<'a>(&'a self) -> RWLockWriteGuard<'a> { + let _g = self.order_lock.access(); + self.access_lock.acquire(); + + // It's important to thread our order lock into the condvar, so that + // when a cond.wait() wakes up, it uses it while reacquiring the + // access lock. If we permitted a waking-up writer to "cut in line", + // there could arise a subtle race when a downgrader attempts to hand + // off the reader cloud lock to a waiting reader. This race is tested + // in arc.rs (test_rw_write_cond_downgrade_read_race) and looks like: + // T1 (writer) T2 (downgrader) T3 (reader) + // [in cond.wait()] + // [locks for writing] + // [holds access_lock] + // [is signalled, perhaps by + // downgrader or a 4th thread] + // tries to lock access(!) + // lock order_lock + // xadd read_count[0->1] + // tries to lock access + // [downgrade] + // xadd read_count[1->2] + // unlock access + // Since T1 contended on the access lock before T3 did, it will steal + // the lock handoff. Adding order_lock in the condvar reacquire path + // solves this because T1 will hold order_lock while waiting on access, + // which will cause T3 to have to wait until T1 finishes its write, + // which can't happen until T2 finishes the downgrade-read entirely. + // The astute reader will also note that making waking writers use the + // order_lock is better for not starving readers. + RWLockWriteGuard { + lock: self, + cond: Condvar { + sem: &self.access_lock, + order: Just(&self.order_lock), + nocopy: marker::NoCopy, + } + } + } +} + +impl<'a> RWLockWriteGuard<'a> { + /// Consumes this write lock and converts it into a read lock. + pub fn downgrade(self) -> RWLockReadGuard<'a> { + let lock = self.lock; + // Don't run the destructor of the write guard, we're in charge of + // things from now on + unsafe { mem::forget(self) } + + let old_count = lock.read_count.fetch_add(1, atomic::Release); + // If another reader was already blocking, we need to hand-off + // the "reader cloud" access lock to them. + if old_count != 0 { + // Guaranteed not to let another writer in, because + // another reader was holding the order_lock. Hence they + // must be the one to get the access_lock (because all + // access_locks are acquired with order_lock held). See + // the comment in write_cond for more justification. + lock.access_lock.release(); + } + RWLockReadGuard { lock: lock } + } +} + +#[unsafe_destructor] +impl<'a> Drop for RWLockWriteGuard<'a> { + fn drop(&mut self) { + self.lock.access_lock.release(); + } +} + +#[unsafe_destructor] +impl<'a> Drop for RWLockReadGuard<'a> { + fn drop(&mut self) { + let old_count = self.lock.read_count.fetch_sub(1, atomic::Release); + assert!(old_count > 0); + if old_count == 1 { + // Note: this release used to be outside of a locked access + // to exclusive-protected state. If this code is ever + // converted back to such (instead of using atomic ops), + // this access MUST NOT go inside the exclusive access. + self.lock.access_lock.release(); + } + } +} + +/**************************************************************************** + * Tests + ****************************************************************************/ + +#[cfg(test)] +mod tests { + pub use self::RWLockMode::*; + + use sync::Arc; + use prelude::*; + use super::{Semaphore, Mutex, RWLock, Condvar}; + + use mem; + use result; + use task; + + /************************************************************************ + * Semaphore tests + ************************************************************************/ + #[test] + fn test_sem_acquire_release() { + let s = Semaphore::new(1); + s.acquire(); + s.release(); + s.acquire(); + } + #[test] + fn test_sem_basic() { + let s = Semaphore::new(1); + let _g = s.access(); + } + #[test] + #[should_fail] + fn test_sem_basic2() { + Semaphore::new(-1); + } + #[test] + fn test_sem_as_mutex() { + let s = Arc::new(Semaphore::new(1)); + let s2 = s.clone(); + task::spawn(proc() { + let _g = s2.access(); + for _ in range(0u, 5) { task::deschedule(); } + }); + let _g = s.access(); + for _ in range(0u, 5) { task::deschedule(); } + } + #[test] + fn test_sem_as_cvar() { + /* Child waits and parent signals */ + let (tx, rx) = channel(); + let s = Arc::new(Semaphore::new(0)); + let s2 = s.clone(); + task::spawn(proc() { + s2.acquire(); + tx.send(()); + }); + for _ in range(0u, 5) { task::deschedule(); } + s.release(); + let _ = rx.recv(); + + /* Parent waits and child signals */ + let (tx, rx) = channel(); + let s = Arc::new(Semaphore::new(0)); + let s2 = s.clone(); + task::spawn(proc() { + for _ in range(0u, 5) { task::deschedule(); } + s2.release(); + let _ = rx.recv(); + }); + s.acquire(); + tx.send(()); + } + #[test] + fn test_sem_multi_resource() { + // Parent and child both get in the critical section at the same + // time, and shake hands. + let s = Arc::new(Semaphore::new(2)); + let s2 = s.clone(); + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + task::spawn(proc() { + let _g = s2.access(); + let _ = rx2.recv(); + tx1.send(()); + }); + let _g = s.access(); + tx2.send(()); + let _ = rx1.recv(); + } + #[test] + fn test_sem_runtime_friendly_blocking() { + // Force the runtime to schedule two threads on the same sched_loop. + // When one blocks, it should schedule the other one. + let s = Arc::new(Semaphore::new(1)); + let s2 = s.clone(); + let (tx, rx) = channel(); + { + let _g = s.access(); + task::spawn(proc() { + tx.send(()); + drop(s2.access()); + tx.send(()); + }); + rx.recv(); // wait for child to come alive + for _ in range(0u, 5) { task::deschedule(); } // let the child contend + } + rx.recv(); // wait for child to be done + } + /************************************************************************ + * Mutex tests + ************************************************************************/ + #[test] + fn test_mutex_lock() { + // Unsafely achieve shared state, and do the textbook + // "load tmp = move ptr; inc tmp; store ptr <- tmp" dance. + let (tx, rx) = channel(); + let m = Arc::new(Mutex::new()); + let m2 = m.clone(); + let mut sharedstate = box 0; + { + let ptr: *mut int = &mut *sharedstate; + task::spawn(proc() { + access_shared(ptr, &m2, 10); + tx.send(()); + }); + } + { + access_shared(&mut *sharedstate, &m, 10); + let _ = rx.recv(); + + assert_eq!(*sharedstate, 20); + } + + fn access_shared(sharedstate: *mut int, m: &Arc<Mutex>, n: uint) { + for _ in range(0u, n) { + let _g = m.lock(); + let oldval = unsafe { *sharedstate }; + task::deschedule(); + unsafe { *sharedstate = oldval + 1; } + } + } + } + #[test] + fn test_mutex_cond_wait() { + let m = Arc::new(Mutex::new()); + + // Child wakes up parent + { + let lock = m.lock(); + let m2 = m.clone(); + task::spawn(proc() { + let lock = m2.lock(); + let woken = lock.cond.signal(); + assert!(woken); + }); + lock.cond.wait(); + } + // Parent wakes up child + let (tx, rx) = channel(); + let m3 = m.clone(); + task::spawn(proc() { + let lock = m3.lock(); + tx.send(()); + lock.cond.wait(); + tx.send(()); + }); + rx.recv(); // Wait until child gets in the mutex + { + let lock = m.lock(); + let woken = lock.cond.signal(); + assert!(woken); + } + rx.recv(); // Wait until child wakes up + } + + fn test_mutex_cond_broadcast_helper(num_waiters: uint) { + let m = Arc::new(Mutex::new()); + let mut rxs = Vec::new(); + + for _ in range(0u, num_waiters) { + let mi = m.clone(); + let (tx, rx) = channel(); + rxs.push(rx); + task::spawn(proc() { + let lock = mi.lock(); + tx.send(()); + lock.cond.wait(); + tx.send(()); + }); + } + + // wait until all children get in the mutex + for rx in rxs.iter_mut() { rx.recv(); } + { + let lock = m.lock(); + let num_woken = lock.cond.broadcast(); + assert_eq!(num_woken, num_waiters); + } + // wait until all children wake up + for rx in rxs.iter_mut() { rx.recv(); } + } + #[test] + fn test_mutex_cond_broadcast() { + test_mutex_cond_broadcast_helper(12); + } + #[test] + fn test_mutex_cond_broadcast_none() { + test_mutex_cond_broadcast_helper(0); + } + #[test] + fn test_mutex_cond_no_waiter() { + let m = Arc::new(Mutex::new()); + let m2 = m.clone(); + let _ = task::try(proc() { + drop(m.lock()); + }); + let lock = m2.lock(); + assert!(!lock.cond.signal()); + } + #[test] + fn test_mutex_killed_simple() { + use any::Any; + + // Mutex must get automatically unlocked if panicked/killed within. + let m = Arc::new(Mutex::new()); + let m2 = m.clone(); + + let result: result::Result<(), Box<Any + Send>> = task::try(proc() { + let _lock = m2.lock(); + panic!(); + }); + assert!(result.is_err()); + // child task must have finished by the time try returns + drop(m.lock()); + } + #[test] + fn test_mutex_cond_signal_on_0() { + // Tests that signal_on(0) is equivalent to signal(). + let m = Arc::new(Mutex::new()); + let lock = m.lock(); + let m2 = m.clone(); + task::spawn(proc() { + let lock = m2.lock(); + lock.cond.signal_on(0); + }); + lock.cond.wait(); + } + #[test] + fn test_mutex_no_condvars() { + let result = task::try(proc() { + let m = Mutex::new_with_condvars(0); + m.lock().cond.wait(); + }); + assert!(result.is_err()); + let result = task::try(proc() { + let m = Mutex::new_with_condvars(0); + m.lock().cond.signal(); + }); + assert!(result.is_err()); + let result = task::try(proc() { + let m = Mutex::new_with_condvars(0); + m.lock().cond.broadcast(); + }); + assert!(result.is_err()); + } + /************************************************************************ + * Reader/writer lock tests + ************************************************************************/ + #[cfg(test)] + pub enum RWLockMode { Read, Write, Downgrade, DowngradeRead } + #[cfg(test)] + fn lock_rwlock_in_mode(x: &Arc<RWLock>, mode: RWLockMode, blk: ||) { + match mode { + Read => { let _g = x.read(); blk() } + Write => { let _g = x.write(); blk() } + Downgrade => { let _g = x.write(); blk() } + DowngradeRead => { let _g = x.write().downgrade(); blk() } + } + } + #[cfg(test)] + fn test_rwlock_exclusion(x: Arc<RWLock>, + mode1: RWLockMode, + mode2: RWLockMode) { + // Test mutual exclusion between readers and writers. Just like the + // mutex mutual exclusion test, a ways above. + let (tx, rx) = channel(); + let x2 = x.clone(); + let mut sharedstate = box 0; + { + let ptr: *const int = &*sharedstate; + task::spawn(proc() { + let sharedstate: &mut int = + unsafe { mem::transmute(ptr) }; + access_shared(sharedstate, &x2, mode1, 10); + tx.send(()); + }); + } + { + access_shared(&mut *sharedstate, &x, mode2, 10); + let _ = rx.recv(); + + assert_eq!(*sharedstate, 20); + } + + fn access_shared(sharedstate: &mut int, x: &Arc<RWLock>, + mode: RWLockMode, n: uint) { + for _ in range(0u, n) { + lock_rwlock_in_mode(x, mode, || { + let oldval = *sharedstate; + task::deschedule(); + *sharedstate = oldval + 1; + }) + } + } + } + #[test] + fn test_rwlock_readers_wont_modify_the_data() { + test_rwlock_exclusion(Arc::new(RWLock::new()), Read, Write); + test_rwlock_exclusion(Arc::new(RWLock::new()), Write, Read); + test_rwlock_exclusion(Arc::new(RWLock::new()), Read, Downgrade); + test_rwlock_exclusion(Arc::new(RWLock::new()), Downgrade, Read); + test_rwlock_exclusion(Arc::new(RWLock::new()), Write, DowngradeRead); + test_rwlock_exclusion(Arc::new(RWLock::new()), DowngradeRead, Write); + } + #[test] + fn test_rwlock_writers_and_writers() { + test_rwlock_exclusion(Arc::new(RWLock::new()), Write, Write); + test_rwlock_exclusion(Arc::new(RWLock::new()), Write, Downgrade); + test_rwlock_exclusion(Arc::new(RWLock::new()), Downgrade, Write); + test_rwlock_exclusion(Arc::new(RWLock::new()), Downgrade, Downgrade); + } + #[cfg(test)] + fn test_rwlock_handshake(x: Arc<RWLock>, + mode1: RWLockMode, + mode2: RWLockMode, + make_mode2_go_first: bool) { + // Much like sem_multi_resource. + let x2 = x.clone(); + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + task::spawn(proc() { + if !make_mode2_go_first { + rx2.recv(); // parent sends to us once it locks, or ... + } + lock_rwlock_in_mode(&x2, mode2, || { + if make_mode2_go_first { + tx1.send(()); // ... we send to it once we lock + } + rx2.recv(); + tx1.send(()); + }) + }); + if make_mode2_go_first { + rx1.recv(); // child sends to us once it locks, or ... + } + lock_rwlock_in_mode(&x, mode1, || { + if !make_mode2_go_first { + tx2.send(()); // ... we send to it once we lock + } + tx2.send(()); + rx1.recv(); + }) + } + #[test] + fn test_rwlock_readers_and_readers() { + test_rwlock_handshake(Arc::new(RWLock::new()), Read, Read, false); + // The downgrader needs to get in before the reader gets in, otherwise + // they cannot end up reading at the same time. + test_rwlock_handshake(Arc::new(RWLock::new()), DowngradeRead, Read, false); + test_rwlock_handshake(Arc::new(RWLock::new()), Read, DowngradeRead, true); + // Two downgrade_reads can never both end up reading at the same time. + } + #[test] + fn test_rwlock_downgrade_unlock() { + // Tests that downgrade can unlock the lock in both modes + let x = Arc::new(RWLock::new()); + lock_rwlock_in_mode(&x, Downgrade, || { }); + test_rwlock_handshake(x, Read, Read, false); + let y = Arc::new(RWLock::new()); + lock_rwlock_in_mode(&y, DowngradeRead, || { }); + test_rwlock_exclusion(y, Write, Write); + } + #[test] + fn test_rwlock_read_recursive() { + let x = RWLock::new(); + let _g1 = x.read(); + let _g2 = x.read(); + } + #[test] + fn test_rwlock_cond_wait() { + // As test_mutex_cond_wait above. + let x = Arc::new(RWLock::new()); + + // Child wakes up parent + { + let lock = x.write(); + let x2 = x.clone(); + task::spawn(proc() { + let lock = x2.write(); + assert!(lock.cond.signal()); + }); + lock.cond.wait(); + } + // Parent wakes up child + let (tx, rx) = channel(); + let x3 = x.clone(); + task::spawn(proc() { + let lock = x3.write(); + tx.send(()); + lock.cond.wait(); + tx.send(()); + }); + rx.recv(); // Wait until child gets in the rwlock + drop(x.read()); // Must be able to get in as a reader + { + let x = x.write(); + assert!(x.cond.signal()); + } + rx.recv(); // Wait until child wakes up + drop(x.read()); // Just for good measure + } + #[cfg(test)] + fn test_rwlock_cond_broadcast_helper(num_waiters: uint) { + // Much like the mutex broadcast test. Downgrade-enabled. + fn lock_cond(x: &Arc<RWLock>, blk: |c: &Condvar|) { + let lock = x.write(); + blk(&lock.cond); + } + + let x = Arc::new(RWLock::new()); + let mut rxs = Vec::new(); + + for _ in range(0u, num_waiters) { + let xi = x.clone(); + let (tx, rx) = channel(); + rxs.push(rx); + task::spawn(proc() { + lock_cond(&xi, |cond| { + tx.send(()); + cond.wait(); + tx.send(()); + }) + }); + } + + // wait until all children get in the mutex + for rx in rxs.iter_mut() { let _ = rx.recv(); } + lock_cond(&x, |cond| { + let num_woken = cond.broadcast(); + assert_eq!(num_woken, num_waiters); + }); + // wait until all children wake up + for rx in rxs.iter_mut() { let _ = rx.recv(); } + } + #[test] + fn test_rwlock_cond_broadcast() { + test_rwlock_cond_broadcast_helper(0); + test_rwlock_cond_broadcast_helper(12); + } + #[cfg(test)] + fn rwlock_kill_helper(mode1: RWLockMode, mode2: RWLockMode) { + use any::Any; + + // Mutex must get automatically unlocked if panicked/killed within. + let x = Arc::new(RWLock::new()); + let x2 = x.clone(); + + let result: result::Result<(), Box<Any + Send>> = task::try(proc() { + lock_rwlock_in_mode(&x2, mode1, || { + panic!(); + }) + }); + assert!(result.is_err()); + // child task must have finished by the time try returns + lock_rwlock_in_mode(&x, mode2, || { }) + } + #[test] + fn test_rwlock_reader_killed_writer() { + rwlock_kill_helper(Read, Write); + } + #[test] + fn test_rwlock_writer_killed_reader() { + rwlock_kill_helper(Write, Read); + } + #[test] + fn test_rwlock_reader_killed_reader() { + rwlock_kill_helper(Read, Read); + } + #[test] + fn test_rwlock_writer_killed_writer() { + rwlock_kill_helper(Write, Write); + } + #[test] + fn test_rwlock_kill_downgrader() { + rwlock_kill_helper(Downgrade, Read); + rwlock_kill_helper(Read, Downgrade); + rwlock_kill_helper(Downgrade, Write); + rwlock_kill_helper(Write, Downgrade); + rwlock_kill_helper(DowngradeRead, Read); + rwlock_kill_helper(Read, DowngradeRead); + rwlock_kill_helper(DowngradeRead, Write); + rwlock_kill_helper(Write, DowngradeRead); + rwlock_kill_helper(DowngradeRead, Downgrade); + rwlock_kill_helper(DowngradeRead, Downgrade); + rwlock_kill_helper(Downgrade, DowngradeRead); + rwlock_kill_helper(Downgrade, DowngradeRead); + } +} |
