// Copyright 2012-2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

#[allow(missing_doc)];

/**
 * The concurrency primitives you know and love.
 *
 * Maybe once we have a "core exports x only to std" mechanism, these can be
 * in std.
 */

use std::cast;
use std::comm;
use std::kinds::marker;
use std::mem::replace;
use std::sync::arc::UnsafeArc;
use std::sync::atomics;
use std::unstable::finally::Finally;

use arc::MutexArc;

/****************************************************************************
 * Internals
 ****************************************************************************/

pub mod mutex;
pub mod one;
mod mpsc_intrusive;

// Each waiting task receives on one of these.
#[doc(hidden)]
type WaitEnd = Port<()>;
#[doc(hidden)]
type SignalEnd = Chan<()>;
// A doubly-ended queue of waiting tasks.
#[doc(hidden)]
struct WaitQueue { head: Port<SignalEnd>,
                   tail: Chan<SignalEnd> }

impl WaitQueue {
    fn new() -> WaitQueue {
        let (block_head, block_tail) = Chan::new();
        WaitQueue { head: block_head, tail: block_tail }
    }

    // Signals one live task from the queue.
    fn signal(&self) -> bool {
        match self.head.try_recv() {
            comm::Data(ch) => {
                // Send a wakeup signal. If the waiter was killed, its port
                // will have closed. Keep trying until we get a live task.
                if ch.try_send(()) {
                    true
                } else {
                    self.signal()
                }
            }
            _ => false
        }
    }

    fn broadcast(&self) -> uint {
        let mut count = 0;
        loop {
            match self.head.try_recv() {
                comm::Data(ch) => {
                    if ch.try_send(()) {
                        count += 1;
                    }
                }
                _ => break
            }
        }
        count
    }

    fn wait_end(&self) -> WaitEnd {
        let (wait_end, signal_end) = Chan::new();
        assert!(self.tail.try_send(signal_end));
        wait_end
    }
}

// The building-block used to make semaphores, mutexes, and rwlocks.
struct SemInner<Q> {
    lock: mutex::Mutex,
    count: int,
    waiters: WaitQueue,
    // Can be either unit or another waitqueue. Some sems shouldn't come with
    // a condition variable attached, others should.
    blocked: Q
}

struct Sem<Q>(UnsafeArc<SemInner<Q>>);

#[doc(hidden)]
impl<Q: Send> Sem<Q> {
    fn new(count: int, q: Q) -> Sem<Q> {
        Sem(UnsafeArc::new(SemInner {
            count: count,
            waiters: WaitQueue::new(),
            blocked: q,
            lock: mutex::Mutex::new(),
        }))
    }

    unsafe fn with(&self, f: |&mut SemInner<Q>|) {
        let Sem(ref arc) = *self;
        let state = arc.get();
        let _g = (*state).lock.lock();
        f(cast::transmute(state));
    }

    pub fn acquire(&self) {
        unsafe {
            let mut waiter_nobe = None;
            self.with(|state| {
                state.count -= 1;
                if state.count < 0 {
                    // Create waiter nobe, enqueue ourself, and tell
                    // outer scope we need to block.
                    waiter_nobe = Some(state.waiters.wait_end());
                }
            });
            // Uncomment if you wish to test for sem races. Not valgrind-friendly.
            /* for _ in range(0, 1000) { task::deschedule(); } */
            // Need to wait outside the exclusive.
            if waiter_nobe.is_some() {
                let _ = waiter_nobe.unwrap().recv();
            }
        }
    }

    pub fn release(&self) {
        unsafe {
            self.with(|state| {
                state.count += 1;
                if state.count <= 0 {
                    state.waiters.signal();
                }
            })
        }
    }

    pub fn access<U>(&self, blk: || -> U) -> U {
        (|| {
            self.acquire();
            blk()
        }).finally(|| {
            self.release();
        })
    }
}
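// How the building block composes (a minimal sketch; `Sem` is crate-internal
// and not exported, so this is illustrative only): a semaphore with count 1
// behaves as a mutex, and `access` brackets the block with an
// acquire/release pair that runs the release even on failure.
//
//     let s = Sem::new(1, ());    // count 1 => mutual exclusion
//     s.access(|| {
//         // `acquire` has run; `release` runs in `finally`, even if
//         // this block fails.
//     });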
#[doc(hidden)]
impl Sem<~[WaitQueue]> {
    fn new_and_signal(count: int, num_condvars: uint) -> Sem<~[WaitQueue]> {
        let mut queues = ~[];
        for _ in range(0, num_condvars) { queues.push(WaitQueue::new()); }
        Sem::new(count, queues)
    }
}

// FIXME(#3598): Want to use an Option down below, but we need a custom enum
// that's not polymorphic to get around the fact that lifetimes are invariant
// inside of type parameters.
enum ReacquireOrderLock<'a> {
    Nothing, // c.c
    Just(&'a Semaphore),
}

/// A mechanism for atomic-unlock-and-deschedule blocking and signalling.
pub struct Condvar<'a> {
    // The 'Sem' object associated with this condvar. This is the one that's
    // atomically-unlocked-and-descheduled upon and reacquired during wakeup.
    priv sem: &'a Sem<~[WaitQueue]>,
    // This is (can be) an extra semaphore which is held around the reacquire
    // operation on the first one. This is only used in cvars associated with
    // rwlocks, and is needed to ensure that, when a downgrader is trying to
    // hand off the access lock (which would be the first field, here), a 2nd
    // writer waking up from a cvar wait can't race with a reader to steal it.
    // See the comment in write_cond for more detail.
    priv order: ReacquireOrderLock<'a>,
    // Make sure condvars are non-copyable.
    priv nopod: marker::NoPod,
}

impl<'a> Condvar<'a> {
    /**
     * Atomically drop the associated lock, and block until a signal is sent.
     *
     * # Failure
     * A task which is killed (i.e., by linked failure with another task)
     * while waiting on a condition variable will wake up, fail, and unlock
     * the associated lock as it unwinds.
     */
    pub fn wait(&self) { self.wait_on(0) }

    /**
     * As wait(), but can specify which of multiple condition variables to
     * wait on. Only a signal_on() or broadcast_on() with the same condvar_id
     * will wake this thread.
     *
     * The associated lock must have been initialised with an appropriate
     * number of condvars. The condvar_id must be between 0 and num_condvars-1
     * or else this call will fail.
     *
     * wait() is equivalent to wait_on(0).
     */
    pub fn wait_on(&self, condvar_id: uint) {
        let mut wait_end = None;
        let mut out_of_bounds = None;
        // Release lock, 'atomically' enqueuing ourselves in so doing.
        unsafe {
            self.sem.with(|state| {
                if condvar_id < state.blocked.len() {
                    // Drop the lock.
                    state.count += 1;
                    if state.count <= 0 {
                        state.waiters.signal();
                    }
                    // Create waiter nobe, and enqueue ourself to
                    // be woken up by a signaller.
                    wait_end = Some(state.blocked[condvar_id].wait_end());
                } else {
                    out_of_bounds = Some(state.blocked.len());
                }
            })
        }

        // If deschedule checks start getting inserted anywhere, we can be
        // killed before or after enqueueing.
        check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()", || {
            // Unconditionally "block". (Might not actually block if a
            // signaller already sent -- I mean 'unconditionally' in contrast
            // with acquire().)
            (|| {
                let _ = wait_end.take_unwrap().recv();
            }).finally(|| {
                // Reacquire the condvar.
                match self.order {
                    Just(lock) => lock.access(|| self.sem.acquire()),
                    Nothing => self.sem.acquire(),
                }
            })
        })
    }

    /// Wake up a blocked task. Returns false if there was no blocked task.
    pub fn signal(&self) -> bool { self.signal_on(0) }

    /// As signal, but with a specified condvar_id. See wait_on.
    pub fn signal_on(&self, condvar_id: uint) -> bool {
        unsafe {
            let mut out_of_bounds = None;
            let mut result = false;
            self.sem.with(|state| {
                if condvar_id < state.blocked.len() {
                    result = state.blocked[condvar_id].signal();
                } else {
                    out_of_bounds = Some(state.blocked.len());
                }
            });
            check_cvar_bounds(out_of_bounds, condvar_id, "cond.signal_on()",
                              || result)
        }
    }

    /// Wake up all blocked tasks. Returns the number of tasks woken.
    pub fn broadcast(&self) -> uint { self.broadcast_on(0) }

    /// As broadcast, but with a specified condvar_id. See wait_on.
    pub fn broadcast_on(&self, condvar_id: uint) -> uint {
        let mut out_of_bounds = None;
        let mut queue = None;
        unsafe {
            self.sem.with(|state| {
                if condvar_id < state.blocked.len() {
                    // To avoid :broadcast_heavy, we make a new waitqueue,
                    // swap it out with the old one, and broadcast on the
                    // old one outside of the little-lock.
                    queue = Some(replace(&mut state.blocked[condvar_id],
                                         WaitQueue::new()));
                } else {
                    out_of_bounds = Some(state.blocked.len());
                }
            });
            check_cvar_bounds(out_of_bounds, condvar_id, "cond.broadcast_on()",
                              || {
                queue.take_unwrap().broadcast()
            })
        }
    }
}
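// A minimal wait/signal sketch, assuming this crate is linked as `sync`.
// Condvars are handed out by the lock types below (e.g. Mutex::lock_cond);
// the child can only enter the mutex once the parent's wait() has
// atomically released it, so the signal cannot be lost.
//
//     use sync::Mutex;
//
//     let m = Mutex::new();
//     m.lock_cond(|cond| {
//         let m2 = m.clone();
//         spawn(proc() {
//             m2.lock_cond(|cond| {
//                 cond.signal();  // wake the parent
//             });
//         });
//         cond.wait();            // unlock, block, relock on wakeup
//     });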
// Checks whether a condvar ID was out of bounds, and fails if so, or does
// something else next on success.
#[inline]
#[doc(hidden)]
fn check_cvar_bounds<U>(
                     out_of_bounds: Option<uint>,
                     id: uint,
                     act: &str,
                     blk: || -> U)
                     -> U {
    match out_of_bounds {
        Some(0) =>
            fail!("{} with illegal ID {} - this lock has no condvars!", act, id),
        Some(length) =>
            fail!("{} with illegal ID {} - ID must be less than {}", act, id, length),
        None => blk()
    }
}

#[doc(hidden)]
impl Sem<~[WaitQueue]> {
    // The only other places that condvars get built are rwlock.write_cond()
    // and rwlock_write_mode.
    pub fn access_cond<U>(&self, blk: |c: &Condvar| -> U) -> U {
        self.access(|| {
            blk(&Condvar {
                sem: self,
                order: Nothing,
                nopod: marker::NoPod
            })
        })
    }
}

/****************************************************************************
 * Semaphores
 ****************************************************************************/

/// A counting, blocking, bounded-waiting semaphore.
pub struct Semaphore { priv sem: Sem<()> }

impl Clone for Semaphore {
    /// Create a new handle to the semaphore.
    fn clone(&self) -> Semaphore {
        let Sem(ref lock) = self.sem;
        Semaphore { sem: Sem(lock.clone()) }
    }
}

impl Semaphore {
    /// Create a new semaphore with the specified count.
    pub fn new(count: int) -> Semaphore {
        Semaphore { sem: Sem::new(count, ()) }
    }

    /**
     * Acquire a resource represented by the semaphore. Blocks if necessary
     * until resource(s) become available.
     */
    pub fn acquire(&self) { (&self.sem).acquire() }

    /**
     * Release a held resource represented by the semaphore. Wakes a blocked
     * contending task, if any exist. Won't block the caller.
     */
    pub fn release(&self) { (&self.sem).release() }

    /// Run a function with ownership of one of the semaphore's resources.
    pub fn access<U>(&self, blk: || -> U) -> U { (&self.sem).access(blk) }
}
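// A minimal usage sketch, assuming this crate is linked as `sync`: a count
// of 2 admits two concurrent holders, and cloned handles share the count.
//
//     use sync::Semaphore;
//
//     let s = Semaphore::new(2);
//     let s2 = s.clone();
//     spawn(proc() {
//         s2.access(|| { /* holds one resource */ });
//     });
//     s.access(|| { /* may run concurrently with the child */ });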
/****************************************************************************
 * Mutexes
 ****************************************************************************/

/**
 * A blocking, bounded-waiting, mutual exclusion lock with an associated
 * FIFO condition variable.
 *
 * # Failure
 * A task which fails while holding a mutex will unlock the mutex as it
 * unwinds.
 */
pub struct Mutex { priv sem: Sem<~[WaitQueue]> }

impl Clone for Mutex {
    /// Create a new handle to the mutex.
    fn clone(&self) -> Mutex {
        let Sem(ref queue) = self.sem;
        Mutex { sem: Sem(queue.clone()) }
    }
}

impl Mutex {
    /// Create a new mutex, with one associated condvar.
    pub fn new() -> Mutex { Mutex::new_with_condvars(1) }

    /**
     * Create a new mutex, with a specified number of associated condvars.
     * This will allow calling wait_on/signal_on/broadcast_on with condvar IDs
     * between 0 and num_condvars-1. (If num_condvars is 0, lock_cond will be
     * allowed but any operations on the condvar will fail.)
     */
    pub fn new_with_condvars(num_condvars: uint) -> Mutex {
        Mutex { sem: Sem::new_and_signal(1, num_condvars) }
    }

    /// Run a function with ownership of the mutex.
    pub fn lock<U>(&self, blk: || -> U) -> U {
        (&self.sem).access(blk)
    }

    /// Run a function with ownership of the mutex and a handle to a condvar.
    pub fn lock_cond<U>(&self, blk: |c: &Condvar| -> U) -> U {
        (&self.sem).access_cond(blk)
    }
}
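// A minimal mutual-exclusion sketch, assuming this crate is linked as
// `sync`: the two critical sections below never overlap.
//
//     use sync::Mutex;
//
//     let m = Mutex::new();
//     let m2 = m.clone();
//     spawn(proc() {
//         m2.lock(|| { /* critical section */ });
//     });
//     m.lock(|| { /* critical section */ });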
/****************************************************************************
 * Reader-writer locks
 ****************************************************************************/

// NB: Wikipedia - Readers-writers_problem#The_third_readers-writers_problem

#[doc(hidden)]
struct RWLockInner {
    // You might ask, "Why don't you need to use an atomic for the mode flag?"
    // This flag affects the behaviour of readers (for plain readers, they
    // assert on it; for downgraders, they use it to decide which mode to
    // unlock for). Consider that the flag is only unset when the very last
    // reader exits; therefore, it can never be unset during a reader/reader
    // (or reader/downgrader) race.
    // By the way, if we didn't care about the assert in the read unlock path,
    // we could instead store the mode flag in write_downgrade's stack frame,
    // and have the downgrade tokens store a reference to it.
    read_mode: bool,
    // The only way the count flag is ever accessed is with xadd. Since it is
    // a read-modify-write operation, multiple xadds on different cores will
    // always be consistent with respect to each other, so a monotonic/relaxed
    // consistency ordering suffices (i.e., no extra barriers are needed).
    // FIXME(#6598): The atomics module has no relaxed ordering flag, so I use
    // acquire/release orderings superfluously. Change these someday.
    read_count: atomics::AtomicUint,
}

/**
 * A blocking, no-starvation, reader-writer lock with an associated condvar.
 *
 * # Failure
 * A task which fails while holding an rwlock will unlock the rwlock as it
 * unwinds.
 */
pub struct RWLock {
    priv order_lock:  Semaphore,
    priv access_lock: Sem<~[WaitQueue]>,
    priv state:       UnsafeArc<RWLockInner>,
}

impl RWLock {
    /// Create a new rwlock, with one associated condvar.
    pub fn new() -> RWLock { RWLock::new_with_condvars(1) }

    /**
     * Create a new rwlock, with a specified number of associated condvars.
     * Similar to Mutex::new_with_condvars.
     */
    pub fn new_with_condvars(num_condvars: uint) -> RWLock {
        let state = UnsafeArc::new(RWLockInner {
            read_mode:  false,
            read_count: atomics::AtomicUint::new(0),
        });
        RWLock { order_lock:  Semaphore::new(1),
                 access_lock: Sem::new_and_signal(1, num_condvars),
                 state:       state, }
    }

    /// Create a new handle to the rwlock.
    pub fn clone(&self) -> RWLock {
        let Sem(ref access_lock_queue) = self.access_lock;
        RWLock { order_lock:  (&(self.order_lock)).clone(),
                 access_lock: Sem(access_lock_queue.clone()),
                 state:       self.state.clone() }
    }

    /**
     * Run a function with the rwlock in read mode. Calls to 'read' from
     * other tasks may run concurrently with this one.
     */
    pub fn read<U>(&self, blk: || -> U) -> U {
        unsafe {
            (&self.order_lock).access(|| {
                let state = &mut *self.state.get();
                let old_count = state.read_count.fetch_add(1, atomics::Acquire);
                if old_count == 0 {
                    (&self.access_lock).acquire();
                    state.read_mode = true;
                }
            });
            (|| {
                blk()
            }).finally(|| {
                let state = &mut *self.state.get();
                assert!(state.read_mode);
                let old_count = state.read_count.fetch_sub(1, atomics::Release);
                assert!(old_count > 0);
                if old_count == 1 {
                    state.read_mode = false;
                    // Note: this release used to be outside of a locked access
                    // to exclusive-protected state. If this code is ever
                    // converted back to such (instead of using atomic ops),
                    // this access MUST NOT go inside the exclusive access.
                    (&self.access_lock).release();
                }
            })
        }
    }

    /**
     * Run a function with the rwlock in write mode. No calls to 'read' or
     * 'write' from other tasks will run concurrently with this one.
     */
    pub fn write<U>(&self, blk: || -> U) -> U {
        (&self.order_lock).acquire();
        (&self.access_lock).access(|| {
            (&self.order_lock).release();
            blk()
        })
    }

    /**
     * As write(), but also with a handle to a condvar. Waiting on this
     * condvar will allow readers and writers alike to take the rwlock before
     * the waiting task is signalled. (Note: a writer that waited and then
     * was signalled might reacquire the lock before other waiting writers.)
     */
    pub fn write_cond<U>(&self, blk: |c: &Condvar| -> U) -> U {
        // It's important to thread our order lock into the condvar, so that
        // when a cond.wait() wakes up, it uses it while reacquiring the
        // access lock. If we permitted a waking-up writer to "cut in line",
        // there could arise a subtle race when a downgrader attempts to hand
        // off the reader cloud lock to a waiting reader. This race is tested
        // in arc.rs (test_rw_write_cond_downgrade_read_race) and looks like:
        //
        // T1 (writer)              T2 (downgrader)        T3 (reader)
        // [in cond.wait()]
        //                          [locks for writing]
        //                          [holds access_lock]
        // [is signalled, perhaps by
        //  downgrader or a 4th thread]
        // tries to lock access(!)
        //                                                 lock order_lock
        //                                                 xadd read_count[0->1]
        //                                                 tries to lock access
        //                          [downgrade]
        //                          xadd read_count[1->2]
        //                          unlock access
        //
        // Since T1 contended on the access lock before T3 did, it will steal
        // the lock handoff. Adding order_lock in the condvar reacquire path
        // solves this because T1 will hold order_lock while waiting on access,
        // which will cause T3 to have to wait until T1 finishes its write,
        // which can't happen until T2 finishes the downgrade-read entirely.
        // The astute reader will also note that making waking writers use the
        // order_lock is better for not starving readers.
        (&self.order_lock).acquire();
        (&self.access_lock).access_cond(|cond| {
            (&self.order_lock).release();
            let opt_lock = Just(&self.order_lock);
            blk(&Condvar { sem: cond.sem, order: opt_lock,
                           nopod: marker::NoPod })
        })
    }

    /**
     * As write(), but with the ability to atomically 'downgrade' the lock;
     * i.e., to become a reader without letting other writers get the lock in
     * the meantime (such as unlocking and then re-locking as a reader would
     * do). The block takes a "write mode token" argument, which can be
     * transformed into a "read mode token" by calling downgrade().
     *
     * # Example
     *
     * ```rust
     * use sync::RWLock;
     *
     * let lock = RWLock::new();
     * lock.write_downgrade(|mut write_token| {
     *     write_token.write_cond(|condvar| {
     *         // ... exclusive access ...
     *     });
     *     let read_token = lock.downgrade(write_token);
     *     read_token.read(|| {
     *         // ... shared access ...
     *     })
     * })
     * ```
     */
    pub fn write_downgrade<U>(&self, blk: |v: RWLockWriteMode| -> U) -> U {
        // Implementation slightly different from the slicker 'write's above.
        // The exit path is conditional on whether the caller downgrades.
        (&self.order_lock).acquire();
        (&self.access_lock).acquire();
        (&self.order_lock).release();
        (|| {
            blk(RWLockWriteMode { lock: self, nopod: marker::NoPod })
        }).finally(|| {
            let writer_or_last_reader;
            // Check if we're releasing from read mode or from write mode.
            let state = unsafe { &mut *self.state.get() };
            if state.read_mode {
                // Releasing from read mode.
                let old_count = state.read_count.fetch_sub(1, atomics::Release);
                assert!(old_count > 0);
                // Check if other readers remain.
                if old_count == 1 {
                    // Case 1: Writer downgraded & was the last reader
                    writer_or_last_reader = true;
                    state.read_mode = false;
                } else {
                    // Case 2: Writer downgraded & was not the last reader
                    writer_or_last_reader = false;
                }
            } else {
                // Case 3: Writer did not downgrade
                writer_or_last_reader = true;
            }
            if writer_or_last_reader {
                // Nobody left inside; release the "reader cloud" lock.
                (&self.access_lock).release();
            }
        })
    }

    /// To be called inside of the write_downgrade block.
    pub fn downgrade<'a>(&self, token: RWLockWriteMode<'a>)
                         -> RWLockReadMode<'a> {
        if !((self as *RWLock) == (token.lock as *RWLock)) {
            fail!("Can't downgrade() with a different rwlock's write_mode!");
        }
        unsafe {
            let state = &mut *self.state.get();
            assert!(!state.read_mode);
            state.read_mode = true;
            // If a reader attempts to enter at this point, both the
            // downgrader and reader will set the mode flag. This is fine.
            let old_count = state.read_count.fetch_add(1, atomics::Release);
            // If another reader was already blocking, we need to hand-off
            // the "reader cloud" access lock to them.
            if old_count != 0 {
                // Guaranteed not to let another writer in, because
                // another reader was holding the order_lock. Hence they
                // must be the one to get the access_lock (because all
                // access_locks are acquired with order_lock held). See
                // the comment in write_cond for more justification.
                (&self.access_lock).release();
            }
        }
        RWLockReadMode { lock: token.lock, nopod: marker::NoPod }
    }
}
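// A minimal reader/writer sketch, assuming this crate is linked as `sync`:
// reads may overlap each other, while a write excludes everything else.
//
//     use sync::RWLock;
//
//     let lock = RWLock::new();
//     let lock2 = lock.clone();
//     spawn(proc() {
//         lock2.read(|| { /* other readers may be inside too */ });
//     });
//     lock.write(|| { /* no other reader or writer can be inside */ });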
/// The "write permission" token used for rwlock.write_downgrade().
pub struct RWLockWriteMode<'a> { priv lock: &'a RWLock,
                                 priv nopod: marker::NoPod }

/// The "read permission" token used for rwlock.write_downgrade().
pub struct RWLockReadMode<'a> { priv lock: &'a RWLock,
                                priv nopod: marker::NoPod }

impl<'a> RWLockWriteMode<'a> {
    /// Access the pre-downgrade rwlock in write mode.
    pub fn write<U>(&self, blk: || -> U) -> U { blk() }
    /// Access the pre-downgrade rwlock in write mode with a condvar.
    pub fn write_cond<U>(&self, blk: |c: &Condvar| -> U) -> U {
        // Need to make the condvar use the order lock when reacquiring the
        // access lock. See comment in RWLock::write_cond for why.
        blk(&Condvar { sem:   &self.lock.access_lock,
                       order: Just(&self.lock.order_lock),
                       nopod: marker::NoPod })
    }
}

impl<'a> RWLockReadMode<'a> {
    /// Access the post-downgrade rwlock in read mode.
    pub fn read<U>(&self, blk: || -> U) -> U { blk() }
}
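// A variant of the write_downgrade doc example above that uses plain
// write() instead of write_cond() (a sketch; same caveats as that example):
//
//     let lock = RWLock::new();
//     lock.write_downgrade(|write_token| {
//         write_token.write(|| { /* exclusive access */ });
//         let read_token = lock.downgrade(write_token);
//         read_token.read(|| {
//             // shared access, with no writer admitted in between
//         })
//     });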
/// A barrier enables multiple tasks to synchronize the beginning
/// of some computation.
///
/// ```rust
/// use sync::Barrier;
///
/// let barrier = Barrier::new(10);
/// for _ in range(0, 10) {
///     let c = barrier.clone();
///     // The same messages will be printed together.
///     // You will NOT see any interleaving.
///     spawn(proc() {
///         println!("before wait");
///         c.wait();
///         println!("after wait");
///     });
/// }
/// ```
#[deriving(Clone)]
pub struct Barrier {
    priv arc: MutexArc<BarrierState>,
    priv num_tasks: uint,
}

// The inner state of a double barrier
struct BarrierState {
    count: uint,
    generation_id: uint,
}

impl Barrier {
    /// Create a new barrier that can block a given number of tasks.
    pub fn new(num_tasks: uint) -> Barrier {
        Barrier {
            arc: MutexArc::new(BarrierState {
                count: 0,
                generation_id: 0,
            }),
            num_tasks: num_tasks,
        }
    }

    /// Block the current task until a certain number of tasks is waiting.
    pub fn wait(&self) {
        self.arc.access_cond(|state, cond| {
            let local_gen = state.generation_id;
            state.count += 1;
            if state.count < self.num_tasks {
                // We need a while loop to guard against spurious wakeups.
                // http://en.wikipedia.org/wiki/Spurious_wakeup
                while local_gen == state.generation_id &&
                      state.count < self.num_tasks {
                    cond.wait();
                }
            } else {
                state.count = 0;
                state.generation_id += 1;
                cond.broadcast();
            }
        });
    }
}
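// Because `generation_id` distinguishes rounds, a barrier can be reused
// safely: a waiter from the next round cannot wake a straggler from the
// previous one. A minimal two-task sketch, assuming this crate is linked
// as `sync`:
//
//     use sync::Barrier;
//
//     let barrier = Barrier::new(2);
//     let b2 = barrier.clone();
//     spawn(proc() {
//         b2.wait();   // round (generation) 0
//         b2.wait();   // round 1
//     });
//     barrier.wait();
//     barrier.wait();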
/****************************************************************************
 * Tests
 ****************************************************************************/

#[cfg(test)]
mod tests {
    use sync::{Semaphore, Mutex, RWLock, Barrier, Condvar};

    use std::cast;
    use std::result;
    use std::task;
    use std::comm::Empty;

    /************************************************************************
     * Semaphore tests
     ************************************************************************/
    #[test]
    fn test_sem_acquire_release() {
        let s = Semaphore::new(1);
        s.acquire();
        s.release();
        s.acquire();
    }
    #[test]
    fn test_sem_basic() {
        let s = Semaphore::new(1);
        s.access(|| { })
    }
    #[test]
    fn test_sem_as_mutex() {
        let s = Semaphore::new(1);
        let s2 = s.clone();
        task::spawn(proc() {
            s2.access(|| {
                for _ in range(0, 5) { task::deschedule(); }
            })
        });
        s.access(|| {
            for _ in range(0, 5) { task::deschedule(); }
        })
    }
    #[test]
    fn test_sem_as_cvar() {
        /* Child waits and parent signals */
        let (p, c) = Chan::new();
        let s = Semaphore::new(0);
        let s2 = s.clone();
        task::spawn(proc() {
            s2.acquire();
            c.send(());
        });
        for _ in range(0, 5) { task::deschedule(); }
        s.release();
        let _ = p.recv();

        /* Parent waits and child signals */
        let (p, c) = Chan::new();
        let s = Semaphore::new(0);
        let s2 = s.clone();
        task::spawn(proc() {
            for _ in range(0, 5) { task::deschedule(); }
            s2.release();
            let _ = p.recv();
        });
        s.acquire();
        c.send(());
    }
    #[test]
    fn test_sem_multi_resource() {
        // Parent and child both get in the critical section at the same
        // time, and shake hands.
        let s = Semaphore::new(2);
        let s2 = s.clone();
        let (p1, c1) = Chan::new();
        let (p2, c2) = Chan::new();
        task::spawn(proc() {
            s2.access(|| {
                let _ = p2.recv();
                c1.send(());
            })
        });
        s.access(|| {
            c2.send(());
            let _ = p1.recv();
        })
    }
    #[test]
    fn test_sem_runtime_friendly_blocking() {
        // Force the runtime to schedule two threads on the same sched_loop.
        // When one blocks, it should schedule the other one.
        let s = Semaphore::new(1);
        let s2 = s.clone();
        let (p, c) = Chan::new();
        let mut child_data = Some((s2, c));
        s.access(|| {
            let (s2, c) = child_data.take_unwrap();
            task::spawn(proc() {
                c.send(());
                s2.access(|| { });
                c.send(());
            });
            let _ = p.recv(); // wait for child to come alive
            for _ in range(0, 5) { task::deschedule(); } // let the child contend
        });
        let _ = p.recv(); // wait for child to be done
    }
    /************************************************************************
     * Mutex tests
     ************************************************************************/
    #[test]
    fn test_mutex_lock() {
        // Unsafely achieve shared state, and do the textbook
        // "load tmp = move ptr; inc tmp; store ptr <- tmp" dance.
        let (p, c) = Chan::new();
        let m = Mutex::new();
        let m2 = m.clone();
        let mut sharedstate = ~0;
        {
            let ptr: *int = &*sharedstate;
            task::spawn(proc() {
                let sharedstate: &mut int =
                    unsafe { cast::transmute(ptr) };
                access_shared(sharedstate, &m2, 10);
                c.send(());
            });
        }
        {
            access_shared(sharedstate, &m, 10);
            let _ = p.recv();

            assert_eq!(*sharedstate, 20);
        }

        fn access_shared(sharedstate: &mut int, m: &Mutex, n: uint) {
            for _ in range(0, n) {
                m.lock(|| {
                    let oldval = *sharedstate;
                    task::deschedule();
                    *sharedstate = oldval + 1;
                })
            }
        }
    }
    #[test]
    fn test_mutex_cond_wait() {
        let m = Mutex::new();

        // Child wakes up parent
        m.lock_cond(|cond| {
            let m2 = m.clone();
            task::spawn(proc() {
                m2.lock_cond(|cond| {
                    let woken = cond.signal();
                    assert!(woken);
                })
            });
            cond.wait();
        });
        // Parent wakes up child
        let (port, chan) = Chan::new();
        let m3 = m.clone();
        task::spawn(proc() {
            m3.lock_cond(|cond| {
                chan.send(());
                cond.wait();
                chan.send(());
            })
        });
        let _ = port.recv(); // Wait until child gets in the mutex
        m.lock_cond(|cond| {
            let woken = cond.signal();
            assert!(woken);
        });
        let _ = port.recv(); // Wait until child wakes up
    }
    #[cfg(test)]
    fn test_mutex_cond_broadcast_helper(num_waiters: uint) {
        let m = Mutex::new();
        let mut ports = ~[];

        for _ in range(0, num_waiters) {
            let mi = m.clone();
            let (port, chan) = Chan::new();
            ports.push(port);
            task::spawn(proc() {
                mi.lock_cond(|cond| {
                    chan.send(());
                    cond.wait();
                    chan.send(());
                })
            });
        }

        // wait until all children get in the mutex
        for port in ports.mut_iter() { let _ = port.recv(); }
        m.lock_cond(|cond| {
            let num_woken = cond.broadcast();
            assert_eq!(num_woken, num_waiters);
        });
        // wait until all children wake up
        for port in ports.mut_iter() { let _ = port.recv(); }
    }
    #[test]
    fn test_mutex_cond_broadcast() {
        test_mutex_cond_broadcast_helper(12);
    }
    #[test]
    fn test_mutex_cond_broadcast_none() {
        test_mutex_cond_broadcast_helper(0);
    }
    #[test]
    fn test_mutex_cond_no_waiter() {
        let m = Mutex::new();
        let m2 = m.clone();
        let _ = task::try(proc() {
            m.lock_cond(|_x| { })
        });
        m2.lock_cond(|cond| {
            assert!(!cond.signal());
        })
    }
    #[test]
    fn test_mutex_killed_simple() {
        // Mutex must get automatically unlocked if failed/killed within.
        let m = Mutex::new();
        let m2 = m.clone();

        let result: result::Result<(), ~Any> = task::try(proc() {
            m2.lock(|| {
                fail!();
            })
        });
        assert!(result.is_err());
        // child task must have finished by the time try returns
        m.lock(|| { })
    }
    #[ignore(reason = "linked failure")]
    #[test]
    fn test_mutex_killed_cond() {
        // Getting killed during cond wait must not corrupt the mutex while
        // unwinding (e.g. double unlock).
        let m = Mutex::new();
        let m2 = m.clone();

        let result: result::Result<(), ~Any> = task::try(proc() {
            let (p, c) = Chan::new();
            task::spawn(proc() { // linked
                let _ = p.recv(); // wait for sibling to get in the mutex
                task::deschedule();
                fail!();
            });
            m2.lock_cond(|cond| {
                c.send(()); // tell sibling go ahead
                cond.wait(); // block forever
            })
        });
        assert!(result.is_err());
        // child task must have finished by the time try returns
        m.lock_cond(|cond| {
            let woken = cond.signal();
            assert!(!woken);
        })
    }
    #[ignore(reason = "linked failure")]
    #[test]
    fn test_mutex_killed_broadcast() {
        use std::unstable::finally::Finally;

        let m = Mutex::new();
        let m2 = m.clone();
        let (p, c) = Chan::new();

        let result: result::Result<(), ~Any> = task::try(proc() {
            let mut sibling_convos = ~[];
            for _ in range(0, 2) {
                let (p, c) = Chan::new();
                sibling_convos.push(p);
                let mi = m2.clone();
                // spawn sibling task
                task::spawn(proc() { // linked
                    mi.lock_cond(|cond| {
                        c.send(()); // tell sibling to go ahead
                        (|| {
                            cond.wait(); // block forever
                        }).finally(|| {
                            error!("task unwinding and sending");
                            c.send(());
                            error!("task unwinding and done sending");
                        })
                    })
                });
            }
            for p in sibling_convos.mut_iter() {
                let _ = p.recv(); // wait for sibling to get in the mutex
            }
            m2.lock(|| { });
            c.send(sibling_convos); // let parent wait on all children
            fail!();
        });
        assert!(result.is_err());
        // child task must have finished by the time try returns
        let mut r = p.recv();
        for p in r.mut_iter() { p.recv(); } // wait on all its siblings
        m.lock_cond(|cond| {
            let woken = cond.broadcast();
            assert_eq!(woken, 0);
        })
    }
    #[test]
    fn test_mutex_cond_signal_on_0() {
        // Tests that signal_on(0) is equivalent to signal().
        let m = Mutex::new();
        m.lock_cond(|cond| {
            let m2 = m.clone();
            task::spawn(proc() {
                m2.lock_cond(|cond| {
                    cond.signal_on(0);
                })
            });
            cond.wait();
        })
    }
    #[test]
    #[ignore(reason = "linked failure?")]
    fn test_mutex_different_conds() {
        let result = task::try(proc() {
            let m = Mutex::new_with_condvars(2);
            let m2 = m.clone();
            let (p, c) = Chan::new();
            task::spawn(proc() {
                m2.lock_cond(|cond| {
                    c.send(());
                    cond.wait_on(1);
                })
            });
            let _ = p.recv();
            m.lock_cond(|cond| {
                if !cond.signal_on(0) {
                    fail!(); // success; punt sibling awake.
                }
            })
        });
        assert!(result.is_err());
    }
    #[test]
    fn test_mutex_no_condvars() {
        let result = task::try(proc() {
            let m = Mutex::new_with_condvars(0);
            m.lock_cond(|cond| { cond.wait(); })
        });
        assert!(result.is_err());
        let result = task::try(proc() {
            let m = Mutex::new_with_condvars(0);
            m.lock_cond(|cond| { cond.signal(); })
        });
        assert!(result.is_err());
        let result = task::try(proc() {
            let m = Mutex::new_with_condvars(0);
            m.lock_cond(|cond| { cond.broadcast(); })
        });
        assert!(result.is_err());
    }
    /************************************************************************
     * Reader/writer lock tests
     ************************************************************************/
    #[cfg(test)]
    pub enum RWLockMode { Read, Write, Downgrade, DowngradeRead }
    #[cfg(test)]
    fn lock_rwlock_in_mode(x: &RWLock, mode: RWLockMode, blk: ||) {
        match mode {
            Read => x.read(blk),
            Write => x.write(blk),
            Downgrade =>
                x.write_downgrade(|mode| {
                    mode.write(|| { blk() });
                }),
            DowngradeRead =>
                x.write_downgrade(|mode| {
                    let mode = x.downgrade(mode);
                    mode.read(|| { blk() });
                }),
        }
    }
    #[cfg(test)]
    fn test_rwlock_exclusion(x: &RWLock,
                             mode1: RWLockMode,
                             mode2: RWLockMode) {
        // Test mutual exclusion between readers and writers. Just like the
        // mutex mutual exclusion test, a ways above.
        let (p, c) = Chan::new();
        let x2 = x.clone();
        let mut sharedstate = ~0;
        {
            let ptr: *int = &*sharedstate;
            task::spawn(proc() {
                let sharedstate: &mut int =
                    unsafe { cast::transmute(ptr) };
                access_shared(sharedstate, &x2, mode1, 10);
                c.send(());
            });
        }
        {
            access_shared(sharedstate, x, mode2, 10);
            let _ = p.recv();

            assert_eq!(*sharedstate, 20);
        }

        fn access_shared(sharedstate: &mut int, x: &RWLock, mode: RWLockMode,
                         n: uint) {
            for _ in range(0, n) {
                lock_rwlock_in_mode(x, mode, || {
                    let oldval = *sharedstate;
                    task::deschedule();
                    *sharedstate = oldval + 1;
                })
            }
        }
    }
    #[test]
    fn test_rwlock_readers_wont_modify_the_data() {
        test_rwlock_exclusion(&RWLock::new(), Read, Write);
        test_rwlock_exclusion(&RWLock::new(), Write, Read);
        test_rwlock_exclusion(&RWLock::new(), Read, Downgrade);
        test_rwlock_exclusion(&RWLock::new(), Downgrade, Read);
    }
    #[test]
    fn test_rwlock_writers_and_writers() {
        test_rwlock_exclusion(&RWLock::new(), Write, Write);
        test_rwlock_exclusion(&RWLock::new(), Write, Downgrade);
        test_rwlock_exclusion(&RWLock::new(), Downgrade, Write);
        test_rwlock_exclusion(&RWLock::new(), Downgrade, Downgrade);
    }
    #[cfg(test)]
    fn test_rwlock_handshake(x: &RWLock,
                             mode1: RWLockMode,
                             mode2: RWLockMode,
                             make_mode2_go_first: bool) {
        // Much like sem_multi_resource.
        let x2 = x.clone();
        let (p1, c1) = Chan::new();
        let (p2, c2) = Chan::new();
        task::spawn(proc() {
            if !make_mode2_go_first {
                let _ = p2.recv(); // parent sends to us once it locks, or ...
            }
            lock_rwlock_in_mode(&x2, mode2, || {
                if make_mode2_go_first {
                    c1.send(()); // ... we send to it once we lock
                }
                let _ = p2.recv();
                c1.send(());
            })
        });
        if make_mode2_go_first {
            let _ = p1.recv(); // child sends to us once it locks, or ...
        }
        lock_rwlock_in_mode(x, mode1, || {
            if !make_mode2_go_first {
                c2.send(()); // ... we send to it once we lock
            }
            c2.send(());
            let _ = p1.recv();
        })
    }
    #[test]
    fn test_rwlock_readers_and_readers() {
        test_rwlock_handshake(&RWLock::new(), Read, Read, false);
        // The downgrader needs to get in before the reader gets in, otherwise
        // they cannot end up reading at the same time.
        test_rwlock_handshake(&RWLock::new(), DowngradeRead, Read, false);
        test_rwlock_handshake(&RWLock::new(), Read, DowngradeRead, true);
        // Two downgrade_reads can never both end up reading at the same time.
    }
    #[test]
    fn test_rwlock_downgrade_unlock() {
        // Tests that downgrade can unlock the lock in both modes
        let x = RWLock::new();
        lock_rwlock_in_mode(&x, Downgrade, || { });
        test_rwlock_handshake(&x, Read, Read, false);
        let y = RWLock::new();
        lock_rwlock_in_mode(&y, DowngradeRead, || { });
        test_rwlock_exclusion(&y, Write, Write);
    }
    #[test]
    fn test_rwlock_read_recursive() {
        let x = RWLock::new();
        x.read(|| { x.read(|| { }) })
    }
    #[test]
    fn test_rwlock_cond_wait() {
        // As test_mutex_cond_wait above.
        let x = RWLock::new();

        // Child wakes up parent
        x.write_cond(|cond| {
            let x2 = x.clone();
            task::spawn(proc() {
                x2.write_cond(|cond| {
                    let woken = cond.signal();
                    assert!(woken);
                })
            });
            cond.wait();
        });
        // Parent wakes up child
        let (port, chan) = Chan::new();
        let x3 = x.clone();
        task::spawn(proc() {
            x3.write_cond(|cond| {
                chan.send(());
                cond.wait();
                chan.send(());
            })
        });
        let _ = port.recv(); // Wait until child gets in the rwlock
        x.read(|| { }); // Must be able to get in as a reader in the meantime
        x.write_cond(|cond| { // Or as another writer
            let woken = cond.signal();
            assert!(woken);
        });
        let _ = port.recv(); // Wait until child wakes up
        x.read(|| { }); // Just for good measure
    }
    #[cfg(test)]
    fn test_rwlock_cond_broadcast_helper(num_waiters: uint,
                                         dg1: bool,
                                         dg2: bool) {
        // Much like the mutex broadcast test. Downgrade-enabled.
        fn lock_cond(x: &RWLock, downgrade: bool, blk: |c: &Condvar|) {
            if downgrade {
                x.write_downgrade(|mode| {
                    mode.write_cond(|c| { blk(c) });
                });
            } else {
                x.write_cond(|c| { blk(c) });
            }
        }
        let x = RWLock::new();
        let mut ports = ~[];

        for _ in range(0, num_waiters) {
            let xi = x.clone();
            let (port, chan) = Chan::new();
            ports.push(port);
            task::spawn(proc() {
                lock_cond(&xi, dg1, |cond| {
                    chan.send(());
                    cond.wait();
                    chan.send(());
                })
            });
        }

        // wait until all children get in the mutex
        for port in ports.mut_iter() { let _ = port.recv(); }
        lock_cond(&x, dg2, |cond| {
            let num_woken = cond.broadcast();
            assert_eq!(num_woken, num_waiters);
        });
        // wait until all children wake up
        for port in ports.mut_iter() { let _ = port.recv(); }
    }
    #[test]
    fn test_rwlock_cond_broadcast() {
        test_rwlock_cond_broadcast_helper(0, true, true);
        test_rwlock_cond_broadcast_helper(0, true, false);
        test_rwlock_cond_broadcast_helper(0, false, true);
        test_rwlock_cond_broadcast_helper(0, false, false);
        test_rwlock_cond_broadcast_helper(12, true, true);
        test_rwlock_cond_broadcast_helper(12, true, false);
        test_rwlock_cond_broadcast_helper(12, false, true);
        test_rwlock_cond_broadcast_helper(12, false, false);
    }
    #[cfg(test)]
    fn rwlock_kill_helper(mode1: RWLockMode, mode2: RWLockMode) {
        // Mutex must get automatically unlocked if failed/killed within.
        let x = RWLock::new();
        let x2 = x.clone();

        let result: result::Result<(), ~Any> = task::try(proc() {
            lock_rwlock_in_mode(&x2, mode1, || {
                fail!();
            })
        });
        assert!(result.is_err());
        // child task must have finished by the time try returns
        lock_rwlock_in_mode(&x, mode2, || { })
    }
    #[test]
    fn test_rwlock_reader_killed_writer() {
        rwlock_kill_helper(Read, Write);
    }
    #[test]
    fn test_rwlock_writer_killed_reader() {
        rwlock_kill_helper(Write, Read);
    }
    #[test]
    fn test_rwlock_reader_killed_reader() {
        rwlock_kill_helper(Read, Read);
    }
    #[test]
    fn test_rwlock_writer_killed_writer() {
        rwlock_kill_helper(Write, Write);
    }
    #[test]
    fn test_rwlock_kill_downgrader() {
        rwlock_kill_helper(Downgrade, Read);
        rwlock_kill_helper(Read, Downgrade);
        rwlock_kill_helper(Downgrade, Write);
        rwlock_kill_helper(Write, Downgrade);
        rwlock_kill_helper(DowngradeRead, Read);
        rwlock_kill_helper(Read, DowngradeRead);
        rwlock_kill_helper(DowngradeRead, Write);
        rwlock_kill_helper(Write, DowngradeRead);
        rwlock_kill_helper(DowngradeRead, Downgrade);
        rwlock_kill_helper(DowngradeRead, Downgrade);
        rwlock_kill_helper(Downgrade, DowngradeRead);
        rwlock_kill_helper(Downgrade, DowngradeRead);
    }
    #[test] #[should_fail]
    fn test_rwlock_downgrade_cant_swap() {
        // Tests that you can't downgrade with a different rwlock's token.
        let x = RWLock::new();
        let y = RWLock::new();
        x.write_downgrade(|xwrite| {
            let mut xopt = Some(xwrite);
            y.write_downgrade(|_ywrite| {
                y.downgrade(xopt.take_unwrap());
                error!("oops, y.downgrade(x) should have failed!");
            })
        })
    }
    /************************************************************************
     * Barrier tests
     ************************************************************************/
    #[test]
    fn test_barrier() {
        let barrier = Barrier::new(10);
        let (port, chan) = Chan::new();

        for _ in range(0, 9) {
            let c = barrier.clone();
            let chan = chan.clone();
            spawn(proc() {
                c.wait();
                chan.send(true);
            });
        }

        // At this point, all spawned tasks should be blocked,
        // so we shouldn't get anything from the port
        assert!(match port.try_recv() {
            Empty => true,
            _ => false,
        });

        barrier.wait();
        // Now, the barrier is cleared and we should get data.
        for _ in range(0, 9) {
            port.recv();
        }
    }
}