From d6b3f1f231350798f019fdb09f6c4979fb23b8d4 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Sat, 22 Mar 2014 00:46:57 -0700 Subject: sync: Move the concurrent queue to using &self This commit also lifts it up a level in the module hierarchy in the soon-to-come reorganization of libsync. --- src/libsync/mpsc_intrusive.rs | 140 +++++++++++++++++++++++++++++++++++++ src/libsync/sync/mpsc_intrusive.rs | 139 ------------------------------------ 2 files changed, 140 insertions(+), 139 deletions(-) create mode 100644 src/libsync/mpsc_intrusive.rs delete mode 100644 src/libsync/sync/mpsc_intrusive.rs (limited to 'src/libsync') diff --git a/src/libsync/mpsc_intrusive.rs b/src/libsync/mpsc_intrusive.rs new file mode 100644 index 00000000000..12e8ca48ba1 --- /dev/null +++ b/src/libsync/mpsc_intrusive.rs @@ -0,0 +1,140 @@ +/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, + * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and documentation are + * those of the authors and should not be interpreted as representing official + * policies, either expressed or implied, of Dmitry Vyukov. + */ + +//! A mostly lock-free multi-producer, single consumer queue. +//! +//! This module implements an intrusive MPSC queue. This queue is incredibly +//! unsafe (due to use of unsafe pointers for nodes), and hence is not public. + +// http://www.1024cores.net/home/lock-free-algorithms +// /queues/intrusive-mpsc-node-based-queue + +use std::cast; +use std::sync::atomics; +use std::ty::Unsafe; + +// NB: all links are done as AtomicUint instead of AtomicPtr to allow for static +// initialization. 
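// A minimal usage sketch, assuming the caller keeps each node alive for as
// long as it is on the queue (that is the intrusive contract), and remembering
// that a `None` from `pop` may only mean a concurrent push has not finished
// linking its node yet:
//
//     let q: Queue<uint> = Queue::new();
//     let mut node = Node::new(5u);
//     unsafe {
//         q.push(&mut node as *mut Node<uint>);
//         match q.pop() {
//             Some(n) => assert_eq!((*n).data, 5u),
//             None => { /* a producer may still be mid-push; retry later */ }
//         }
//     }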
+
+pub struct Node<T> {
+    next: atomics::AtomicUint,
+    data: T,
+}
+
+pub struct DummyNode {
+    next: atomics::AtomicUint,
+}
+
+pub struct Queue<T> {
+    head: atomics::AtomicUint,
+    tail: Unsafe<*mut Node<T>>,
+    stub: DummyNode,
+}
+
+impl<T: Send> Queue<T> {
+    pub fn new() -> Queue<T> {
+        Queue {
+            head: atomics::AtomicUint::new(0),
+            tail: Unsafe::new(0 as *mut Node<T>),
+            stub: DummyNode {
+                next: atomics::AtomicUint::new(0),
+            },
+        }
+    }
+
+    pub unsafe fn push(&self, node: *mut Node<T>) {
+        (*node).next.store(0, atomics::Release);
+        let prev = self.head.swap(node as uint, atomics::AcqRel);
+
+        // Note that this code is slightly modified to allow static
+        // initialization of these queues with rust's flavor of static
+        // initialization.
+        if prev == 0 {
+            self.stub.next.store(node as uint, atomics::Release);
+        } else {
+            let prev = prev as *mut Node<T>;
+            (*prev).next.store(node as uint, atomics::Release);
+        }
+    }
+
+    /// You'll note that the other MPSC queue in std::sync is non-intrusive and
+    /// returns a `PopResult` here to indicate when the queue is inconsistent.
+    /// An "inconsistent state" in the other queue means that a pusher has
+    /// pushed, but it hasn't finished linking the rest of the chain.
+    ///
+    /// This queue also suffers from this problem, but I currently haven't been
+    /// able to detangle when this actually happens. This code is translated
+    /// verbatim from the website above, and is more complicated than the
+    /// non-intrusive version.
+    ///
+    /// Right now consumers of this queue must be ready for this fact. Just
+    /// because `pop` returns `None` does not mean that there is not data
+    /// on the queue.
+    pub unsafe fn pop(&self) -> Option<*mut Node<T>> {
+        let tail = *self.tail.get();
+        let mut tail = if !tail.is_null() {tail} else {
+            cast::transmute(&self.stub)
+        };
+        let mut next = (*tail).next(atomics::Relaxed);
+        if tail as uint == &self.stub as *DummyNode as uint {
+            if next.is_null() {
+                return None;
+            }
+            *self.tail.get() = next;
+            tail = next;
+            next = (*next).next(atomics::Relaxed);
+        }
+        if !next.is_null() {
+            *self.tail.get() = next;
+            return Some(tail);
+        }
+        let head = self.head.load(atomics::Acquire) as *mut Node<T>;
+        if tail != head {
+            return None;
+        }
+        let stub = cast::transmute(&self.stub);
+        self.push(stub);
+        next = (*tail).next(atomics::Relaxed);
+        if !next.is_null() {
+            *self.tail.get() = next;
+            return Some(tail);
+        }
+        return None
+    }
+}
+
+impl<T: Send> Node<T> {
+    pub fn new(t: T) -> Node<T> {
+        Node {
+            data: t,
+            next: atomics::AtomicUint::new(0),
+        }
+    }
+    pub unsafe fn next(&self, ord: atomics::Ordering) -> *mut Node<T> {
+        cast::transmute::<uint, *mut Node<T>>(self.next.load(ord))
+    }
+}
diff --git a/src/libsync/sync/mpsc_intrusive.rs b/src/libsync/sync/mpsc_intrusive.rs
deleted file mode 100644
index 0f13a4980d9..00000000000
--- a/src/libsync/sync/mpsc_intrusive.rs
+++ /dev/null
@@ -1,139 +0,0 @@
-/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above copyright
- *    notice, this list of conditions and the following disclaimer in the
- *    documentation and/or other materials provided with the distribution.
- * - * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED - * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO - * EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, - * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, - * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING - * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, - * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * The views and conclusions contained in the software and documentation are - * those of the authors and should not be interpreted as representing official - * policies, either expressed or implied, of Dmitry Vyukov. - */ - -//! A mostly lock-free multi-producer, single consumer queue. -//! -//! This module implements an intrusive MPSC queue. This queue is incredibly -//! unsafe (due to use of unsafe pointers for nodes), and hence is not public. - -// http://www.1024cores.net/home/lock-free-algorithms -// /queues/intrusive-mpsc-node-based-queue - -use std::cast; -use std::sync::atomics; - -// NB: all links are done as AtomicUint instead of AtomicPtr to allow for static -// initialization. - -pub struct Node { - next: atomics::AtomicUint, - data: T, -} - -pub struct DummyNode { - next: atomics::AtomicUint, -} - -pub struct Queue { - head: atomics::AtomicUint, - tail: *mut Node, - stub: DummyNode, -} - -impl Queue { - pub fn new() -> Queue { - Queue { - head: atomics::AtomicUint::new(0), - tail: 0 as *mut Node, - stub: DummyNode { - next: atomics::AtomicUint::new(0), - }, - } - } - - pub unsafe fn push(&mut self, node: *mut Node) { - (*node).next.store(0, atomics::Release); - let prev = self.head.swap(node as uint, atomics::AcqRel); - - // Note that this code is slightly modified to allow static - // initialization of these queues with rust's flavor of static - // initialization. - if prev == 0 { - self.stub.next.store(node as uint, atomics::Release); - } else { - let prev = prev as *mut Node; - (*prev).next.store(node as uint, atomics::Release); - } - } - - /// You'll note that the other MPSC queue in std::sync is non-intrusive and - /// returns a `PopResult` here to indicate when the queue is inconsistent. - /// An "inconsistent state" in the other queue means that a pusher has - /// pushed, but it hasn't finished linking the rest of the chain. - /// - /// This queue also suffers from this problem, but I currently haven't been - /// able to detangle when this actually happens. This code is translated - /// verbatim from the website above, and is more complicated than the - /// non-intrusive version. - /// - /// Right now consumers of this queue must be ready for this fact. Just - /// because `pop` returns `None` does not mean that there is not data - /// on the queue. 
- pub unsafe fn pop(&mut self) -> Option<*mut Node> { - let tail = self.tail; - let mut tail = if !tail.is_null() {tail} else { - cast::transmute(&self.stub) - }; - let mut next = (*tail).next(atomics::Relaxed); - if tail as uint == &self.stub as *DummyNode as uint { - if next.is_null() { - return None; - } - self.tail = next; - tail = next; - next = (*next).next(atomics::Relaxed); - } - if !next.is_null() { - self.tail = next; - return Some(tail); - } - let head = self.head.load(atomics::Acquire) as *mut Node; - if tail != head { - return None; - } - let stub = cast::transmute(&self.stub); - self.push(stub); - next = (*tail).next(atomics::Relaxed); - if !next.is_null() { - self.tail = next; - return Some(tail); - } - return None - } -} - -impl Node { - pub fn new(t: T) -> Node { - Node { - data: t, - next: atomics::AtomicUint::new(0), - } - } - pub unsafe fn next(&mut self, ord: atomics::Ordering) -> *mut Node { - cast::transmute::>(self.next.load(ord)) - } -} -- cgit 1.4.1-3-g733a5 From 3572a30e7a4fec7f0bb0957fc72588757111f14e Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Sat, 22 Mar 2014 00:47:43 -0700 Subject: sync: Move the Mutex type to using &self This also uses the Unsafe type for any interior mutability in the type to avoid transmutes. --- src/libsync/mutex.rs | 579 ++++++++++++++++++++++++++++++++++++++++++++++ src/libsync/sync/mutex.rs | 558 -------------------------------------------- 2 files changed, 579 insertions(+), 558 deletions(-) create mode 100644 src/libsync/mutex.rs delete mode 100644 src/libsync/sync/mutex.rs (limited to 'src/libsync') diff --git a/src/libsync/mutex.rs b/src/libsync/mutex.rs new file mode 100644 index 00000000000..b01c82eb7ac --- /dev/null +++ b/src/libsync/mutex.rs @@ -0,0 +1,579 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! A proper mutex implementation regardless of the "flavor of task" which is +//! acquiring the lock. + +// # Implementation of Rust mutexes +// +// Most answers to the question of "how do I use a mutex" are "use pthreads", +// but for Rust this isn't quite sufficient. Green threads cannot acquire an OS +// mutex because they can context switch among many OS threads, leading to +// deadlocks with other green threads. +// +// Another problem for green threads grabbing an OS mutex is that POSIX dictates +// that unlocking a mutex on a different thread from where it was locked is +// undefined behavior. Remember that green threads can migrate among OS threads, +// so this would mean that we would have to pin green threads to OS threads, +// which is less than ideal. +// +// ## Using deschedule/reawaken +// +// We already have primitives for descheduling/reawakening tasks, so they're the +// first obvious choice when implementing a mutex. The idea would be to have a +// concurrent queue that everyone is pushed on to, and then the owner of the +// mutex is the one popping from the queue. +// +// Unfortunately, this is not very performant for native tasks. The suspected +// reason for this is that each native thread is suspended on its own condition +// variable, unique from all the other threads. 
In this situation, the kernel +// has no idea what the scheduling semantics are of the user program, so all of +// the threads are distributed among all cores on the system. This ends up +// having very expensive wakeups of remote cores high up in the profile when +// handing off the mutex among native tasks. On the other hand, when using an OS +// mutex, the kernel knows that all native threads are contended on the same +// mutex, so they're in theory all migrated to a single core (fast context +// switching). +// +// ## Mixing implementations +// +// From that above information, we have two constraints. The first is that +// green threads can't touch os mutexes, and the second is that native tasks +// pretty much *must* touch an os mutex. +// +// As a compromise, the queueing implementation is used for green threads and +// the os mutex is used for native threads (why not have both?). This ends up +// leading to fairly decent performance for both native threads and green +// threads on various workloads (uncontended and contended). +// +// The crux of this implementation is an atomic work which is CAS'd on many +// times in order to manage a few flags about who's blocking where and whether +// it's locked or not. + +use std::kinds::marker; +use std::mem; +use std::rt::local::Local; +use std::rt::task::{BlockedTask, Task}; +use std::rt::thread::Thread; +use std::sync::atomics; +use std::ty::Unsafe; +use std::unstable::mutex; + +use q = mpsc_intrusive; + +pub static LOCKED: uint = 1 << 0; +pub static GREEN_BLOCKED: uint = 1 << 1; +pub static NATIVE_BLOCKED: uint = 1 << 2; + +/// A mutual exclusion primitive useful for protecting shared data +/// +/// This mutex is an implementation of a lock for all flavors of tasks which may +/// be grabbing. A common problem with green threads is that they cannot grab +/// locks (if they reschedule during the lock a contender could deadlock the +/// system), but this mutex does *not* suffer this problem. +/// +/// This mutex will properly block tasks waiting for the lock to become +/// available. The mutex can also be statically initialized or created via a +/// `new` constructor. +/// +/// # Example +/// +/// ```rust +/// use sync::mutex::Mutex; +/// +/// let m = Mutex::new(); +/// let guard = m.lock(); +/// // do some work +/// drop(guard); // unlock the lock +/// ``` +pub struct Mutex { + priv lock: StaticMutex, +} + +#[deriving(Eq, Show)] +enum Flavor { + Unlocked, + TryLockAcquisition, + GreenAcquisition, + NativeAcquisition, +} + +/// The static mutex type is provided to allow for static allocation of mutexes. +/// +/// Note that this is a separate type because using a Mutex correctly means that +/// it needs to have a destructor run. In Rust, statics are not allowed to have +/// destructors. As a result, a `StaticMutex` has one extra method when compared +/// to a `Mutex`, a `destroy` method. This method is unsafe to call, and +/// documentation can be found directly on the method. +/// +/// # Example +/// +/// ```rust +/// use sync::mutex::{StaticMutex, MUTEX_INIT}; +/// +/// static mut LOCK: StaticMutex = MUTEX_INIT; +/// +/// unsafe { +/// let _g = LOCK.lock(); +/// // do some productive work +/// } +/// // lock is unlocked here. 
+/// ```
+pub struct StaticMutex {
+    /// Current set of flags on this mutex
+    priv state: atomics::AtomicUint,
+    /// an OS mutex used by native threads
+    priv lock: mutex::StaticNativeMutex,
+
+    /// Type of locking operation currently on this mutex
+    priv flavor: Unsafe<Flavor>,
+    /// uint-cast of the green thread waiting for this mutex
+    priv green_blocker: Unsafe<uint>,
+    /// uint-cast of the native thread waiting for this mutex
+    priv native_blocker: Unsafe<uint>,
+
+    /// A concurrent mpsc queue used by green threads, along with a count used
+    /// to figure out when to dequeue and enqueue.
+    priv q: q::Queue<uint>,
+    priv green_cnt: atomics::AtomicUint,
+}
+
+/// An RAII implementation of a "scoped lock" of a mutex. When this structure is
+/// dropped (falls out of scope), the lock will be unlocked.
+#[must_use]
+pub struct Guard<'a> {
+    priv lock: &'a StaticMutex,
+}
+
+/// Static initialization of a mutex. This constant can be used to initialize
+/// other mutex constants.
+pub static MUTEX_INIT: StaticMutex = StaticMutex {
+    lock: mutex::NATIVE_MUTEX_INIT,
+    state: atomics::INIT_ATOMIC_UINT,
+    flavor: Unsafe { value: Unlocked, marker1: marker::InvariantType },
+    green_blocker: Unsafe { value: 0, marker1: marker::InvariantType },
+    native_blocker: Unsafe { value: 0, marker1: marker::InvariantType },
+    green_cnt: atomics::INIT_ATOMIC_UINT,
+    q: q::Queue {
+        head: atomics::INIT_ATOMIC_UINT,
+        tail: Unsafe {
+            value: 0 as *mut q::Node<uint>,
+            marker1: marker::InvariantType,
+        },
+        stub: q::DummyNode {
+            next: atomics::INIT_ATOMIC_UINT,
+        }
+    }
+};
+
+impl StaticMutex {
+    /// Attempts to grab this lock, see `Mutex::try_lock`
+    pub fn try_lock<'a>(&'a self) -> Option<Guard<'a>> {
+        // Attempt to steal the mutex from an unlocked state.
+        //
+        // FIXME: this can mess up the fairness of the mutex, seems bad
+        match self.state.compare_and_swap(0, LOCKED, atomics::SeqCst) {
+            0 => {
+                // After acquiring the mutex, we can safely access the inner
+                // fields.
+                let prev = unsafe {
+                    mem::replace(&mut *self.flavor.get(), TryLockAcquisition)
+                };
+                assert_eq!(prev, Unlocked);
+                Some(Guard::new(self))
+            }
+            _ => None
+        }
+    }
+
+    /// Acquires this lock, see `Mutex::lock`
+    pub fn lock<'a>(&'a self) -> Guard<'a> {
+        // First, attempt to steal the mutex from an unlocked state. The "fast
+        // path" needs to have as few atomic instructions as possible, and this
+        // one cmpxchg is already pretty expensive.
+        //
+        // FIXME: this can mess up the fairness of the mutex, seems bad
+        match self.try_lock() {
+            Some(guard) => return guard,
+            None => {}
+        }
+
+        // After we've failed the fast path, then we delegate to the different
+        // locking protocols for green/native tasks. This will select two tasks
+        // to continue further (one native, one green).
+        let t: ~Task = Local::take();
+        let can_block = t.can_block();
+        let native_bit;
+        if can_block {
+            self.native_lock(t);
+            native_bit = NATIVE_BLOCKED;
+        } else {
+            self.green_lock(t);
+            native_bit = GREEN_BLOCKED;
+        }
+
+        // After we've arbitrated among task types, attempt to re-acquire the
+        // lock (avoids a deschedule). This is very important to do in order to
+        // allow threads coming out of the native_lock function to try their
+        // best to not hit a cvar in deschedule.
+        let mut old = match self.state.compare_and_swap(0, LOCKED,
+                                                        atomics::SeqCst) {
+            0 => {
+                let flavor = if can_block {
+                    NativeAcquisition
+                } else {
+                    GreenAcquisition
+                };
+                // We've acquired the lock, so this unsafe access to flavor is
+                // allowed.
+ unsafe { *self.flavor.get() = flavor; } + return Guard::new(self) + } + old => old, + }; + + // Alright, everything else failed. We need to deschedule ourselves and + // flag ourselves as waiting. Note that this case should only happen + // regularly in native/green contention. Due to try_lock and the header + // of lock stealing the lock, it's also possible for native/native + // contention to hit this location, but as less common. + let t: ~Task = Local::take(); + t.deschedule(1, |task| { + let task = unsafe { task.cast_to_uint() }; + + // These accesses are protected by the respective native/green + // mutexes which were acquired above. + let prev = if can_block { + unsafe { mem::replace(&mut *self.native_blocker.get(), task) } + } else { + unsafe { mem::replace(&mut *self.green_blocker.get(), task) } + }; + assert_eq!(prev, 0); + + loop { + assert_eq!(old & native_bit, 0); + // If the old state was locked, then we need to flag ourselves + // as blocking in the state. If the old state was unlocked, then + // we attempt to acquire the mutex. Everything here is a CAS + // loop that'll eventually make progress. + if old & LOCKED != 0 { + old = match self.state.compare_and_swap(old, + old | native_bit, + atomics::SeqCst) { + n if n == old => return Ok(()), + n => n + }; + } else { + assert_eq!(old, 0); + old = match self.state.compare_and_swap(old, + old | LOCKED, + atomics::SeqCst) { + n if n == old => { + // After acquiring the lock, we have access to the + // flavor field, and we've regained access to our + // respective native/green blocker field. + let prev = if can_block { + unsafe { + *self.native_blocker.get() = 0; + mem::replace(&mut *self.flavor.get(), + NativeAcquisition) + } + } else { + unsafe { + *self.green_blocker.get() = 0; + mem::replace(&mut *self.flavor.get(), + GreenAcquisition) + } + }; + assert_eq!(prev, Unlocked); + return Err(unsafe { + BlockedTask::cast_from_uint(task) + }) + } + n => n, + }; + } + } + }); + + Guard::new(self) + } + + // Tasks which can block are super easy. These tasks just call the blocking + // `lock()` function on an OS mutex + fn native_lock(&self, t: ~Task) { + Local::put(t); + unsafe { self.lock.lock_noguard(); } + } + + fn native_unlock(&self) { + unsafe { self.lock.unlock_noguard(); } + } + + fn green_lock(&self, t: ~Task) { + // Green threads flag their presence with an atomic counter, and if they + // fail to be the first to the mutex, they enqueue themselves on a + // concurrent internal queue with a stack-allocated node. + // + // FIXME: There isn't a cancellation currently of an enqueue, forcing + // the unlocker to spin for a bit. + if self.green_cnt.fetch_add(1, atomics::SeqCst) == 0 { + Local::put(t); + return + } + + let mut node = q::Node::new(0); + t.deschedule(1, |task| { + unsafe { + node.data = task.cast_to_uint(); + self.q.push(&mut node); + } + Ok(()) + }); + } + + fn green_unlock(&self) { + // If we're the only green thread, then no need to check the queue, + // otherwise the fixme above forces us to spin for a bit. + if self.green_cnt.fetch_sub(1, atomics::SeqCst) == 1 { return } + let node; + loop { + match unsafe { self.q.pop() } { + Some(t) => { node = t; break; } + None => Thread::yield_now(), + } + } + let task = unsafe { BlockedTask::cast_from_uint((*node).data) }; + task.wake().map(|t| t.reawaken()); + } + + fn unlock(&self) { + // Unlocking this mutex is a little tricky. 
We favor any task that is + // manually blocked (not in each of the separate locks) in order to help + // provide a little fairness (green threads will wake up the pending + // native thread and native threads will wake up the pending green + // thread). + // + // There's also the question of when we unlock the actual green/native + // locking halves as well. If we're waking up someone, then we can wait + // to unlock until we've acquired the task to wake up (we're guaranteed + // the mutex memory is still valid when there's contenders), but as soon + // as we don't find any contenders we must unlock the mutex, and *then* + // flag the mutex as unlocked. + // + // This flagging can fail, leading to another round of figuring out if a + // task needs to be woken, and in this case it's ok that the "mutex + // halves" are unlocked, we're just mainly dealing with the atomic state + // of the outer mutex. + let flavor = unsafe { mem::replace(&mut *self.flavor.get(), Unlocked) }; + + let mut state = self.state.load(atomics::SeqCst); + let mut unlocked = false; + let task; + loop { + assert!(state & LOCKED != 0); + if state & GREEN_BLOCKED != 0 { + self.unset(state, GREEN_BLOCKED); + task = unsafe { + *self.flavor.get() = GreenAcquisition; + let task = mem::replace(&mut *self.green_blocker.get(), 0); + BlockedTask::cast_from_uint(task) + }; + break; + } else if state & NATIVE_BLOCKED != 0 { + self.unset(state, NATIVE_BLOCKED); + task = unsafe { + *self.flavor.get() = NativeAcquisition; + let task = mem::replace(&mut *self.native_blocker.get(), 0); + BlockedTask::cast_from_uint(task) + }; + break; + } else { + assert_eq!(state, LOCKED); + if !unlocked { + match flavor { + GreenAcquisition => { self.green_unlock(); } + NativeAcquisition => { self.native_unlock(); } + TryLockAcquisition => {} + Unlocked => unreachable!() + } + unlocked = true; + } + match self.state.compare_and_swap(LOCKED, 0, atomics::SeqCst) { + LOCKED => return, + n => { state = n; } + } + } + } + if !unlocked { + match flavor { + GreenAcquisition => { self.green_unlock(); } + NativeAcquisition => { self.native_unlock(); } + TryLockAcquisition => {} + Unlocked => unreachable!() + } + } + + task.wake().map(|t| t.reawaken()); + } + + /// Loops around a CAS to unset the `bit` in `state` + fn unset(&self, mut state: uint, bit: uint) { + loop { + assert!(state & bit != 0); + let new = state ^ bit; + match self.state.compare_and_swap(state, new, atomics::SeqCst) { + n if n == state => break, + n => { state = n; } + } + } + } + + /// Deallocates resources associated with this static mutex. + /// + /// This method is unsafe because it provides no guarantees that there are + /// no active users of this mutex, and safety is not guaranteed if there are + /// active users of this mutex. + /// + /// This method is required to ensure that there are no memory leaks on + /// *all* platforms. It may be the case that some platforms do not leak + /// memory if this method is not called, but this is not guaranteed to be + /// true on all platforms. + pub unsafe fn destroy(&self) { + self.lock.destroy() + } +} + +impl Mutex { + /// Creates a new mutex in an unlocked state ready for use. + pub fn new() -> Mutex { + Mutex { + lock: StaticMutex { + state: atomics::AtomicUint::new(0), + flavor: Unsafe::new(Unlocked), + green_blocker: Unsafe::new(0), + native_blocker: Unsafe::new(0), + green_cnt: atomics::AtomicUint::new(0), + q: q::Queue::new(), + lock: unsafe { mutex::StaticNativeMutex::new() }, + } + } + } + + /// Attempts to acquire this lock. 
+ /// + /// If the lock could not be acquired at this time, then `None` is returned. + /// Otherwise, an RAII guard is returned. The lock will be unlocked when the + /// guard is dropped. + /// + /// This function does not block. + pub fn try_lock<'a>(&'a self) -> Option> { + self.lock.try_lock() + } + + /// Acquires a mutex, blocking the current task until it is able to do so. + /// + /// This function will block the local task until it is available to acquire + /// the mutex. Upon returning, the task is the only task with the mutex + /// held. An RAII guard is returned to allow scoped unlock of the lock. When + /// the guard goes out of scope, the mutex will be unlocked. + pub fn lock<'a>(&'a self) -> Guard<'a> { self.lock.lock() } +} + +impl<'a> Guard<'a> { + fn new<'b>(lock: &'b StaticMutex) -> Guard<'b> { + if cfg!(debug) { + // once we've acquired a lock, it's ok to access the flavor + assert!(unsafe { *lock.flavor.get() != Unlocked }); + assert!(lock.state.load(atomics::SeqCst) & LOCKED != 0); + } + Guard { lock: lock } + } +} + +#[unsafe_destructor] +impl<'a> Drop for Guard<'a> { + #[inline] + fn drop(&mut self) { + self.lock.unlock(); + } +} + +impl Drop for Mutex { + fn drop(&mut self) { + // This is actually safe b/c we know that there is no further usage of + // this mutex (it's up to the user to arrange for a mutex to get + // dropped, that's not our job) + unsafe { self.lock.destroy() } + } +} + +#[cfg(test)] +mod test { + extern crate native; + use super::{Mutex, StaticMutex, MUTEX_INIT}; + + #[test] + fn smoke() { + let m = Mutex::new(); + drop(m.lock()); + drop(m.lock()); + } + + #[test] + fn smoke_static() { + static mut m: StaticMutex = MUTEX_INIT; + unsafe { + drop(m.lock()); + drop(m.lock()); + m.destroy(); + } + } + + #[test] + fn lots_and_lots() { + static mut m: StaticMutex = MUTEX_INIT; + static mut CNT: uint = 0; + static M: uint = 1000; + static N: uint = 3; + + fn inc() { + for _ in range(0, M) { + unsafe { + let _g = m.lock(); + CNT += 1; + } + } + } + + let (tx, rx) = channel(); + for _ in range(0, N) { + let tx2 = tx.clone(); + native::task::spawn(proc() { inc(); tx2.send(()); }); + let tx2 = tx.clone(); + spawn(proc() { inc(); tx2.send(()); }); + } + + drop(tx); + for _ in range(0, 2 * N) { + rx.recv(); + } + assert_eq!(unsafe {CNT}, M * N * 2); + unsafe { + m.destroy(); + } + } + + #[test] + fn trylock() { + let m = Mutex::new(); + assert!(m.try_lock().is_some()); + } +} diff --git a/src/libsync/sync/mutex.rs b/src/libsync/sync/mutex.rs deleted file mode 100644 index 9901cda423b..00000000000 --- a/src/libsync/sync/mutex.rs +++ /dev/null @@ -1,558 +0,0 @@ -// Copyright 2014 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! A proper mutex implementation regardless of the "flavor of task" which is -//! acquiring the lock. - -// # Implementation of Rust mutexes -// -// Most answers to the question of "how do I use a mutex" are "use pthreads", -// but for Rust this isn't quite sufficient. Green threads cannot acquire an OS -// mutex because they can context switch among many OS threads, leading to -// deadlocks with other green threads. 
-// -// Another problem for green threads grabbing an OS mutex is that POSIX dictates -// that unlocking a mutex on a different thread from where it was locked is -// undefined behavior. Remember that green threads can migrate among OS threads, -// so this would mean that we would have to pin green threads to OS threads, -// which is less than ideal. -// -// ## Using deschedule/reawaken -// -// We already have primitives for descheduling/reawakening tasks, so they're the -// first obvious choice when implementing a mutex. The idea would be to have a -// concurrent queue that everyone is pushed on to, and then the owner of the -// mutex is the one popping from the queue. -// -// Unfortunately, this is not very performant for native tasks. The suspected -// reason for this is that each native thread is suspended on its own condition -// variable, unique from all the other threads. In this situation, the kernel -// has no idea what the scheduling semantics are of the user program, so all of -// the threads are distributed among all cores on the system. This ends up -// having very expensive wakeups of remote cores high up in the profile when -// handing off the mutex among native tasks. On the other hand, when using an OS -// mutex, the kernel knows that all native threads are contended on the same -// mutex, so they're in theory all migrated to a single core (fast context -// switching). -// -// ## Mixing implementations -// -// From that above information, we have two constraints. The first is that -// green threads can't touch os mutexes, and the second is that native tasks -// pretty much *must* touch an os mutex. -// -// As a compromise, the queueing implementation is used for green threads and -// the os mutex is used for native threads (why not have both?). This ends up -// leading to fairly decent performance for both native threads and green -// threads on various workloads (uncontended and contended). -// -// The crux of this implementation is an atomic work which is CAS'd on many -// times in order to manage a few flags about who's blocking where and whether -// it's locked or not. - -use std::rt::local::Local; -use std::rt::task::{BlockedTask, Task}; -use std::rt::thread::Thread; -use std::sync::atomics; -use std::unstable::mutex; - -use q = sync::mpsc_intrusive; - -pub static LOCKED: uint = 1 << 0; -pub static GREEN_BLOCKED: uint = 1 << 1; -pub static NATIVE_BLOCKED: uint = 1 << 2; - -/// A mutual exclusion primitive useful for protecting shared data -/// -/// This mutex is an implementation of a lock for all flavors of tasks which may -/// be grabbing. A common problem with green threads is that they cannot grab -/// locks (if they reschedule during the lock a contender could deadlock the -/// system), but this mutex does *not* suffer this problem. -/// -/// This mutex will properly block tasks waiting for the lock to become -/// available. The mutex can also be statically initialized or created via a -/// `new` constructor. -/// -/// # Example -/// -/// ```rust -/// use sync::mutex::Mutex; -/// -/// let mut m = Mutex::new(); -/// let guard = m.lock(); -/// // do some work -/// drop(guard); // unlock the lock -/// ``` -pub struct Mutex { - priv lock: StaticMutex, -} - -#[deriving(Eq, Show)] -enum Flavor { - Unlocked, - TryLockAcquisition, - GreenAcquisition, - NativeAcquisition, -} - -/// The static mutex type is provided to allow for static allocation of mutexes. 
-/// -/// Note that this is a separate type because using a Mutex correctly means that -/// it needs to have a destructor run. In Rust, statics are not allowed to have -/// destructors. As a result, a `StaticMutex` has one extra method when compared -/// to a `Mutex`, a `destroy` method. This method is unsafe to call, and -/// documentation can be found directly on the method. -/// -/// # Example -/// -/// ```rust -/// use sync::mutex::{StaticMutex, MUTEX_INIT}; -/// -/// static mut LOCK: StaticMutex = MUTEX_INIT; -/// -/// unsafe { -/// let _g = LOCK.lock(); -/// // do some productive work -/// } -/// // lock is unlocked here. -/// ``` -pub struct StaticMutex { - /// Current set of flags on this mutex - priv state: atomics::AtomicUint, - /// Type of locking operation currently on this mutex - priv flavor: Flavor, - /// uint-cast of the green thread waiting for this mutex - priv green_blocker: uint, - /// uint-cast of the native thread waiting for this mutex - priv native_blocker: uint, - /// an OS mutex used by native threads - priv lock: mutex::StaticNativeMutex, - - /// A concurrent mpsc queue used by green threads, along with a count used - /// to figure out when to dequeue and enqueue. - priv q: q::Queue, - priv green_cnt: atomics::AtomicUint, -} - -/// An RAII implementation of a "scoped lock" of a mutex. When this structure is -/// dropped (falls out of scope), the lock will be unlocked. -#[must_use] -pub struct Guard<'a> { - priv lock: &'a mut StaticMutex, -} - -/// Static initialization of a mutex. This constant can be used to initialize -/// other mutex constants. -pub static MUTEX_INIT: StaticMutex = StaticMutex { - lock: mutex::NATIVE_MUTEX_INIT, - state: atomics::INIT_ATOMIC_UINT, - flavor: Unlocked, - green_blocker: 0, - native_blocker: 0, - green_cnt: atomics::INIT_ATOMIC_UINT, - q: q::Queue { - head: atomics::INIT_ATOMIC_UINT, - tail: 0 as *mut q::Node, - stub: q::DummyNode { - next: atomics::INIT_ATOMIC_UINT, - } - } -}; - -impl StaticMutex { - /// Attempts to grab this lock, see `Mutex::try_lock` - pub fn try_lock<'a>(&'a mut self) -> Option> { - // Attempt to steal the mutex from an unlocked state. - // - // FIXME: this can mess up the fairness of the mutex, seems bad - match self.state.compare_and_swap(0, LOCKED, atomics::SeqCst) { - 0 => { - assert!(self.flavor == Unlocked); - self.flavor = TryLockAcquisition; - Some(Guard::new(self)) - } - _ => None - } - } - - /// Acquires this lock, see `Mutex::lock` - pub fn lock<'a>(&'a mut self) -> Guard<'a> { - // First, attempt to steal the mutex from an unlocked state. The "fast - // path" needs to have as few atomic instructions as possible, and this - // one cmpxchg is already pretty expensive. - // - // FIXME: this can mess up the fairness of the mutex, seems bad - match self.state.compare_and_swap(0, LOCKED, atomics::SeqCst) { - 0 => { - assert!(self.flavor == Unlocked); - self.flavor = TryLockAcquisition; - return Guard::new(self) - } - _ => {} - } - - // After we've failed the fast path, then we delegate to the differnet - // locking protocols for green/native tasks. This will select two tasks - // to continue further (one native, one green). - let t: ~Task = Local::take(); - let can_block = t.can_block(); - let native_bit; - if can_block { - self.native_lock(t); - native_bit = NATIVE_BLOCKED; - } else { - self.green_lock(t); - native_bit = GREEN_BLOCKED; - } - - // After we've arbitrated among task types, attempt to re-acquire the - // lock (avoids a deschedule). 
This is very important to do in order to - // allow threads coming out of the native_lock function to try their - // best to not hit a cvar in deschedule. - let mut old = match self.state.compare_and_swap(0, LOCKED, - atomics::SeqCst) { - 0 => { - self.flavor = if can_block { - NativeAcquisition - } else { - GreenAcquisition - }; - return Guard::new(self) - } - old => old, - }; - - // Alright, everything else failed. We need to deschedule ourselves and - // flag ourselves as waiting. Note that this case should only happen - // regularly in native/green contention. Due to try_lock and the header - // of lock stealing the lock, it's also possible for native/native - // contention to hit this location, but as less common. - let t: ~Task = Local::take(); - t.deschedule(1, |task| { - let task = unsafe { task.cast_to_uint() }; - if can_block { - assert_eq!(self.native_blocker, 0); - self.native_blocker = task; - } else { - assert_eq!(self.green_blocker, 0); - self.green_blocker = task; - } - - loop { - assert_eq!(old & native_bit, 0); - // If the old state was locked, then we need to flag ourselves - // as blocking in the state. If the old state was unlocked, then - // we attempt to acquire the mutex. Everything here is a CAS - // loop that'll eventually make progress. - if old & LOCKED != 0 { - old = match self.state.compare_and_swap(old, - old | native_bit, - atomics::SeqCst) { - n if n == old => return Ok(()), - n => n - }; - } else { - assert_eq!(old, 0); - old = match self.state.compare_and_swap(old, - old | LOCKED, - atomics::SeqCst) { - n if n == old => { - assert_eq!(self.flavor, Unlocked); - if can_block { - self.native_blocker = 0; - self.flavor = NativeAcquisition; - } else { - self.green_blocker = 0; - self.flavor = GreenAcquisition; - } - return Err(unsafe { - BlockedTask::cast_from_uint(task) - }) - } - n => n, - }; - } - } - }); - - Guard::new(self) - } - - // Tasks which can block are super easy. These tasks just call the blocking - // `lock()` function on an OS mutex - fn native_lock(&mut self, t: ~Task) { - Local::put(t); - unsafe { self.lock.lock_noguard(); } - } - - fn native_unlock(&mut self) { - unsafe { self.lock.unlock_noguard(); } - } - - fn green_lock(&mut self, t: ~Task) { - // Green threads flag their presence with an atomic counter, and if they - // fail to be the first to the mutex, they enqueue themselves on a - // concurrent internal queue with a stack-allocated node. - // - // FIXME: There isn't a cancellation currently of an enqueue, forcing - // the unlocker to spin for a bit. - if self.green_cnt.fetch_add(1, atomics::SeqCst) == 0 { - Local::put(t); - return - } - - let mut node = q::Node::new(0); - t.deschedule(1, |task| { - unsafe { - node.data = task.cast_to_uint(); - self.q.push(&mut node); - } - Ok(()) - }); - } - - fn green_unlock(&mut self) { - // If we're the only green thread, then no need to check the queue, - // otherwise the fixme above forces us to spin for a bit. - if self.green_cnt.fetch_sub(1, atomics::SeqCst) == 1 { return } - let node; - loop { - match unsafe { self.q.pop() } { - Some(t) => { node = t; break; } - None => Thread::yield_now(), - } - } - let task = unsafe { BlockedTask::cast_from_uint((*node).data) }; - task.wake().map(|t| t.reawaken()); - } - - fn unlock(&mut self) { - // Unlocking this mutex is a little tricky. 
We favor any task that is - // manually blocked (not in each of the separate locks) in order to help - // provide a little fairness (green threads will wake up the pending - // native thread and native threads will wake up the pending green - // thread). - // - // There's also the question of when we unlock the actual green/native - // locking halves as well. If we're waking up someone, then we can wait - // to unlock until we've acquired the task to wake up (we're guaranteed - // the mutex memory is still valid when there's contenders), but as soon - // as we don't find any contenders we must unlock the mutex, and *then* - // flag the mutex as unlocked. - // - // This flagging can fail, leading to another round of figuring out if a - // task needs to be woken, and in this case it's ok that the "mutex - // halves" are unlocked, we're just mainly dealing with the atomic state - // of the outer mutex. - let flavor = self.flavor; - self.flavor = Unlocked; - - let mut state = self.state.load(atomics::SeqCst); - let mut unlocked = false; - let task; - loop { - assert!(state & LOCKED != 0); - if state & GREEN_BLOCKED != 0 { - self.unset(state, GREEN_BLOCKED); - task = unsafe { - BlockedTask::cast_from_uint(self.green_blocker) - }; - self.green_blocker = 0; - self.flavor = GreenAcquisition; - break; - } else if state & NATIVE_BLOCKED != 0 { - self.unset(state, NATIVE_BLOCKED); - task = unsafe { - BlockedTask::cast_from_uint(self.native_blocker) - }; - self.native_blocker = 0; - self.flavor = NativeAcquisition; - break; - } else { - assert_eq!(state, LOCKED); - if !unlocked { - match flavor { - GreenAcquisition => { self.green_unlock(); } - NativeAcquisition => { self.native_unlock(); } - TryLockAcquisition => {} - Unlocked => unreachable!() - } - unlocked = true; - } - match self.state.compare_and_swap(LOCKED, 0, atomics::SeqCst) { - LOCKED => return, - n => { state = n; } - } - } - } - if !unlocked { - match flavor { - GreenAcquisition => { self.green_unlock(); } - NativeAcquisition => { self.native_unlock(); } - TryLockAcquisition => {} - Unlocked => unreachable!() - } - } - - task.wake().map(|t| t.reawaken()); - } - - /// Loops around a CAS to unset the `bit` in `state` - fn unset(&mut self, mut state: uint, bit: uint) { - loop { - assert!(state & bit != 0); - let new = state ^ bit; - match self.state.compare_and_swap(state, new, atomics::SeqCst) { - n if n == state => break, - n => { state = n; } - } - } - } - - /// Deallocates resources associated with this static mutex. - /// - /// This method is unsafe because it provides no guarantees that there are - /// no active users of this mutex, and safety is not guaranteed if there are - /// active users of this mutex. - /// - /// This method is required to ensure that there are no memory leaks on - /// *all* platforms. It may be the case that some platforms do not leak - /// memory if this method is not called, but this is not guaranteed to be - /// true on all platforms. - pub unsafe fn destroy(&mut self) { - self.lock.destroy() - } -} - -impl Mutex { - /// Creates a new mutex in an unlocked state ready for use. - pub fn new() -> Mutex { - Mutex { - lock: StaticMutex { - state: atomics::AtomicUint::new(0), - flavor: Unlocked, - green_blocker: 0, - native_blocker: 0, - green_cnt: atomics::AtomicUint::new(0), - q: q::Queue::new(), - lock: unsafe { mutex::StaticNativeMutex::new() }, - } - } - } - - /// Attempts to acquire this lock. - /// - /// If the lock could not be acquired at this time, then `None` is returned. 
- /// Otherwise, an RAII guard is returned. The lock will be unlocked when the - /// guard is dropped. - /// - /// This function does not block. - pub fn try_lock<'a>(&'a mut self) -> Option> { - self.lock.try_lock() - } - - /// Acquires a mutex, blocking the current task until it is able to do so. - /// - /// This function will block the local task until it is available to acquire - /// the mutex. Upon returning, the task is the only task with the mutex - /// held. An RAII guard is returned to allow scoped unlock of the lock. When - /// the guard goes out of scope, the mutex will be unlocked. - pub fn lock<'a>(&'a mut self) -> Guard<'a> { self.lock.lock() } -} - -impl<'a> Guard<'a> { - fn new<'b>(lock: &'b mut StaticMutex) -> Guard<'b> { - if cfg!(debug) { - assert!(lock.flavor != Unlocked); - assert!(lock.state.load(atomics::SeqCst) & LOCKED != 0); - } - Guard { lock: lock } - } -} - -#[unsafe_destructor] -impl<'a> Drop for Guard<'a> { - #[inline] - fn drop(&mut self) { - self.lock.unlock(); - } -} - -impl Drop for Mutex { - fn drop(&mut self) { - // This is actually safe b/c we know that there is no further usage of - // this mutex (it's up to the user to arrange for a mutex to get - // dropped, that's not our job) - unsafe { self.lock.destroy() } - } -} - -#[cfg(test)] -mod test { - extern crate native; - use super::{Mutex, StaticMutex, MUTEX_INIT}; - - #[test] - fn smoke() { - let mut m = Mutex::new(); - drop(m.lock()); - drop(m.lock()); - } - - #[test] - fn smoke_static() { - static mut m: StaticMutex = MUTEX_INIT; - unsafe { - drop(m.lock()); - drop(m.lock()); - m.destroy(); - } - } - - #[test] - fn lots_and_lots() { - static mut m: StaticMutex = MUTEX_INIT; - static mut CNT: uint = 0; - static M: uint = 1000; - static N: uint = 3; - - fn inc() { - for _ in range(0, M) { - unsafe { - let _g = m.lock(); - CNT += 1; - } - } - } - - let (tx, rx) = channel(); - for _ in range(0, N) { - let tx2 = tx.clone(); - native::task::spawn(proc() { inc(); tx2.send(()); }); - let tx2 = tx.clone(); - spawn(proc() { inc(); tx2.send(()); }); - } - - drop(tx); - for _ in range(0, 2 * N) { - rx.recv(); - } - assert_eq!(unsafe {CNT}, M * N * 2); - unsafe { - m.destroy(); - } - } - - #[test] - fn trylock() { - let mut m = Mutex::new(); - assert!(m.try_lock().is_some()); - } -} -- cgit 1.4.1-3-g733a5 From 53e451f4106c0eb6614b4c534744e81c6100cbbd Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Sat, 22 Mar 2014 00:49:16 -0700 Subject: sync: Move Once to using &self Similarly to the rest of the previous commits, this moves the once primitive to using &self instead of &mut self for proper sharing among many threads now. --- src/libsync/one.rs | 168 ++++++++++++++++++++++++++++++++++++++++++++++++ src/libsync/sync/one.rs | 168 ------------------------------------------------ 2 files changed, 168 insertions(+), 168 deletions(-) create mode 100644 src/libsync/one.rs delete mode 100644 src/libsync/sync/one.rs (limited to 'src/libsync') diff --git a/src/libsync/one.rs b/src/libsync/one.rs new file mode 100644 index 00000000000..161f759ca2d --- /dev/null +++ b/src/libsync/one.rs @@ -0,0 +1,168 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! A "once initialization" primitive +//! +//! 
This primitive is meant to be used to run one-time initialization. An +//! example use case would be for initializing an FFI library. + +use std::int; +use std::sync::atomics; + +use mutex::{StaticMutex, MUTEX_INIT}; + +/// A type which can be used to run a one-time global initialization. This type +/// is *unsafe* to use because it is built on top of the `Mutex` in this module. +/// It does not know whether the currently running task is in a green or native +/// context, and a blocking mutex should *not* be used under normal +/// circumstances on a green task. +/// +/// Despite its unsafety, it is often useful to have a one-time initialization +/// routine run for FFI bindings or related external functionality. This type +/// can only be statically constructed with the `ONCE_INIT` value. +/// +/// # Example +/// +/// ```rust +/// use sync::one::{Once, ONCE_INIT}; +/// +/// static mut START: Once = ONCE_INIT; +/// unsafe { +/// START.doit(|| { +/// // run initialization here +/// }); +/// } +/// ``` +pub struct Once { + priv mutex: StaticMutex, + priv cnt: atomics::AtomicInt, + priv lock_cnt: atomics::AtomicInt, +} + +/// Initialization value for static `Once` values. +pub static ONCE_INIT: Once = Once { + mutex: MUTEX_INIT, + cnt: atomics::INIT_ATOMIC_INT, + lock_cnt: atomics::INIT_ATOMIC_INT, +}; + +impl Once { + /// Perform an initialization routine once and only once. The given closure + /// will be executed if this is the first time `doit` has been called, and + /// otherwise the routine will *not* be invoked. + /// + /// This method will block the calling *os thread* if another initialization + /// routine is currently running. + /// + /// When this function returns, it is guaranteed that some initialization + /// has run and completed (it may not be the closure specified). + pub fn doit(&self, f: ||) { + // Implementation-wise, this would seem like a fairly trivial primitive. + // The stickler part is where our mutexes currently require an + // allocation, and usage of a `Once` should't leak this allocation. + // + // This means that there must be a deterministic destroyer of the mutex + // contained within (because it's not needed after the initialization + // has run). + // + // The general scheme here is to gate all future threads once + // initialization has completed with a "very negative" count, and to + // allow through threads to lock the mutex if they see a non negative + // count. For all threads grabbing the mutex, exactly one of them should + // be responsible for unlocking the mutex, and this should only be done + // once everyone else is done with the mutex. + // + // This atomicity is achieved by swapping a very negative value into the + // shared count when the initialization routine has completed. This will + // read the number of threads which will at some point attempt to + // acquire the mutex. This count is then squirreled away in a separate + // variable, and the last person on the way out of the mutex is then + // responsible for destroying the mutex. + // + // It is crucial that the negative value is swapped in *after* the + // initialization routine has completed because otherwise new threads + // calling `doit` will return immediately before the initialization has + // completed. 
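        // A worked illustration of the scheme above (a sketch, assuming three
        // tasks race into `doit` before any of them finishes): each one's
        // fetch_add below sees a non-negative count (0, 1, 2) and goes on to
        // take the mutex; the first one in runs `f`, swaps int::MIN into
        // `cnt`, and stashes the observed count (3) in `lock_cnt`; the other
        // two find the count negative under the lock and skip `f`. All three
        // then fetch_add(-1) on `lock_cnt`, and whichever takes it from 1 to 0
        // destroys the mutex. Anyone calling `doit` after the swap sees a
        // negative count from its fetch_add and returns without touching the
        // mutex at all.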
+ + let prev = self.cnt.fetch_add(1, atomics::SeqCst); + if prev < 0 { + // Make sure we never overflow, we'll never have int::MIN + // simultaneous calls to `doit` to make this value go back to 0 + self.cnt.store(int::MIN, atomics::SeqCst); + return + } + + // If the count is negative, then someone else finished the job, + // otherwise we run the job and record how many people will try to grab + // this lock + let guard = self.mutex.lock(); + if self.cnt.load(atomics::SeqCst) > 0 { + f(); + let prev = self.cnt.swap(int::MIN, atomics::SeqCst); + self.lock_cnt.store(prev, atomics::SeqCst); + } + drop(guard); + + // Last one out cleans up after everyone else, no leaks! + if self.lock_cnt.fetch_add(-1, atomics::SeqCst) == 1 { + unsafe { self.mutex.destroy() } + } + } +} + +#[cfg(test)] +mod test { + use super::{ONCE_INIT, Once}; + use std::task; + + #[test] + fn smoke_once() { + static mut o: Once = ONCE_INIT; + let mut a = 0; + unsafe { o.doit(|| a += 1); } + assert_eq!(a, 1); + unsafe { o.doit(|| a += 1); } + assert_eq!(a, 1); + } + + #[test] + fn stampede_once() { + static mut o: Once = ONCE_INIT; + static mut run: bool = false; + + let (tx, rx) = channel(); + for _ in range(0, 10) { + let tx = tx.clone(); + spawn(proc() { + for _ in range(0, 4) { task::deschedule() } + unsafe { + o.doit(|| { + assert!(!run); + run = true; + }); + assert!(run); + } + tx.send(()); + }); + } + + unsafe { + o.doit(|| { + assert!(!run); + run = true; + }); + assert!(run); + } + + for _ in range(0, 10) { + rx.recv(); + } + } +} diff --git a/src/libsync/sync/one.rs b/src/libsync/sync/one.rs deleted file mode 100644 index c5e83bed0ed..00000000000 --- a/src/libsync/sync/one.rs +++ /dev/null @@ -1,168 +0,0 @@ -// Copyright 2014 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! A "once initialization" primitive -//! -//! This primitive is meant to be used to run one-time initialization. An -//! example use case would be for initializing an FFI library. - -use std::int; -use std::sync::atomics; -use sync::mutex::{StaticMutex, MUTEX_INIT}; - -/// A type which can be used to run a one-time global initialization. This type -/// is *unsafe* to use because it is built on top of the `Mutex` in this module. -/// It does not know whether the currently running task is in a green or native -/// context, and a blocking mutex should *not* be used under normal -/// circumstances on a green task. -/// -/// Despite its unsafety, it is often useful to have a one-time initialization -/// routine run for FFI bindings or related external functionality. This type -/// can only be statically constructed with the `ONCE_INIT` value. -/// -/// # Example -/// -/// ```rust -/// use sync::one::{Once, ONCE_INIT}; -/// -/// static mut START: Once = ONCE_INIT; -/// unsafe { -/// START.doit(|| { -/// // run initialization here -/// }); -/// } -/// ``` -pub struct Once { - priv mutex: StaticMutex, - priv cnt: atomics::AtomicInt, - priv lock_cnt: atomics::AtomicInt, -} - -/// Initialization value for static `Once` values. -pub static ONCE_INIT: Once = Once { - mutex: MUTEX_INIT, - cnt: atomics::INIT_ATOMIC_INT, - lock_cnt: atomics::INIT_ATOMIC_INT, -}; - -impl Once { - /// Perform an initialization routine once and only once. 
The given closure - /// will be executed if this is the first time `doit` has been called, and - /// otherwise the routine will *not* be invoked. - /// - /// This method will block the calling *os thread* if another initialization - /// routine is currently running. - /// - /// When this function returns, it is guaranteed that some initialization - /// has run and completed (it may not be the closure specified). - pub fn doit(&mut self, f: ||) { - // Implementation-wise, this would seem like a fairly trivial primitive. - // The stickler part is where our mutexes currently require an - // allocation, and usage of a `Once` should't leak this allocation. - // - // This means that there must be a deterministic destroyer of the mutex - // contained within (because it's not needed after the initialization - // has run). - // - // The general scheme here is to gate all future threads once - // initialization has completed with a "very negative" count, and to - // allow through threads to lock the mutex if they see a non negative - // count. For all threads grabbing the mutex, exactly one of them should - // be responsible for unlocking the mutex, and this should only be done - // once everyone else is done with the mutex. - // - // This atomicity is achieved by swapping a very negative value into the - // shared count when the initialization routine has completed. This will - // read the number of threads which will at some point attempt to - // acquire the mutex. This count is then squirreled away in a separate - // variable, and the last person on the way out of the mutex is then - // responsible for destroying the mutex. - // - // It is crucial that the negative value is swapped in *after* the - // initialization routine has completed because otherwise new threads - // calling `doit` will return immediately before the initialization has - // completed. - - let prev = self.cnt.fetch_add(1, atomics::SeqCst); - if prev < 0 { - // Make sure we never overflow, we'll never have int::MIN - // simultaneous calls to `doit` to make this value go back to 0 - self.cnt.store(int::MIN, atomics::SeqCst); - return - } - - // If the count is negative, then someone else finished the job, - // otherwise we run the job and record how many people will try to grab - // this lock - { - let _guard = self.mutex.lock(); - if self.cnt.load(atomics::SeqCst) > 0 { - f(); - let prev = self.cnt.swap(int::MIN, atomics::SeqCst); - self.lock_cnt.store(prev, atomics::SeqCst); - } - } - - // Last one out cleans up after everyone else, no leaks! 
- if self.lock_cnt.fetch_add(-1, atomics::SeqCst) == 1 { - unsafe { self.mutex.destroy() } - } - } -} - -#[cfg(test)] -mod test { - use super::{ONCE_INIT, Once}; - use std::task; - - #[test] - fn smoke_once() { - static mut o: Once = ONCE_INIT; - let mut a = 0; - unsafe { o.doit(|| a += 1); } - assert_eq!(a, 1); - unsafe { o.doit(|| a += 1); } - assert_eq!(a, 1); - } - - #[test] - fn stampede_once() { - static mut o: Once = ONCE_INIT; - static mut run: bool = false; - - let (tx, rx) = channel(); - for _ in range(0, 10) { - let tx = tx.clone(); - spawn(proc() { - for _ in range(0, 4) { task::deschedule() } - unsafe { - o.doit(|| { - assert!(!run); - run = true; - }); - assert!(run); - } - tx.send(()); - }); - } - - unsafe { - o.doit(|| { - assert!(!run); - run = true; - }); - assert!(run); - } - - for _ in range(0, 10) { - rx.recv(); - } - } -} -- cgit 1.4.1-3-g733a5 From ae049e82f876e335b835edb674228090d84f811e Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Sat, 22 Mar 2014 00:50:19 -0700 Subject: sync: Rewrite the base primitives This commit rewrites the core primitives of the sync library: Mutex, RWLock, and Semaphore. These primitives now have updated, more modernized apis: * Guards are returned instead of locking with closures. All condition variables have moved inside the guards and extraneous methods have been removed. * Downgrading on an rwlock is now done through the guard instead of the rwlock itself. These types are meant to be general locks, not locks of an internal type (for external usage). New types will be introduced for locking shared data. --- src/libsync/raw.rs | 1119 +++++++++++++++++++++++++++++++++++++++ src/libsync/sync/mod.rs | 1330 ----------------------------------------------- 2 files changed, 1119 insertions(+), 1330 deletions(-) create mode 100644 src/libsync/raw.rs delete mode 100644 src/libsync/sync/mod.rs (limited to 'src/libsync') diff --git a/src/libsync/raw.rs b/src/libsync/raw.rs new file mode 100644 index 00000000000..36f0748fe71 --- /dev/null +++ b/src/libsync/raw.rs @@ -0,0 +1,1119 @@ +// Copyright 2012-2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Raw concurrency primitives you know and love. +//! +//! These primitives are not recommended for general use, but are provided for +//! flavorful use-cases. It is recommended to use the types at the top of the +//! `sync` crate which wrap values directly and provide safer abstractions for +//! containing data. + +use std::cast; +use std::comm; +use std::kinds::marker; +use std::mem::replace; +use std::sync::atomics; +use std::unstable::finally::Finally; + +use mutex; + +/**************************************************************************** + * Internals + ****************************************************************************/ + +// Each waiting task receives on one of these. +type WaitEnd = Receiver<()>; +type SignalEnd = Sender<()>; +// A doubly-ended queue of waiting tasks. +struct WaitQueue { + head: Receiver, + tail: Sender, +} + +impl WaitQueue { + fn new() -> WaitQueue { + let (block_tail, block_head) = channel(); + WaitQueue { head: block_head, tail: block_tail } + } + + // Signals one live task from the queue. 
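    // (Mechanically: `wait_end` below creates a channel, enqueues its Sender
    // on `tail`, and hands the Receiver back to the waiter to block on;
    // `signal` dequeues one Sender from `head` and fires it, moving on past
    // any whose waiting task has already gone away.)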
+ fn signal(&self) -> bool { + match self.head.try_recv() { + comm::Data(ch) => { + // Send a wakeup signal. If the waiter was killed, its port will + // have closed. Keep trying until we get a live task. + if ch.try_send(()) { + true + } else { + self.signal() + } + } + _ => false + } + } + + fn broadcast(&self) -> uint { + let mut count = 0; + loop { + match self.head.try_recv() { + comm::Data(ch) => { + if ch.try_send(()) { + count += 1; + } + } + _ => break + } + } + count + } + + fn wait_end(&self) -> WaitEnd { + let (signal_end, wait_end) = channel(); + assert!(self.tail.try_send(signal_end)); + wait_end + } +} + +// The building-block used to make semaphores, mutexes, and rwlocks. +struct Sem { + lock: mutex::Mutex, + // n.b, we need Sem to be `Share`, but the WaitQueue type is not send/share + // (for good reason). We have an internal invariant on this semaphore, + // however, that the queue is never accessed outside of a locked + // context. For this reason, we shove these behind a pointer which will + // be inferred to be `Share`. + // + // FIXME: this requires an extra allocation, which is bad. + inner: *() +} + +struct SemInner { + count: int, + waiters: WaitQueue, + // Can be either unit or another waitqueue. Some sems shouldn't come with + // a condition variable attached, others should. + blocked: Q, +} + +#[must_use] +struct SemGuard<'a, Q> { + sem: &'a Sem, +} + +impl Sem { + fn new(count: int, q: Q) -> Sem { + let inner = unsafe { + cast::transmute(~SemInner { + waiters: WaitQueue::new(), + count: count, + blocked: q, + }) + }; + Sem { + lock: mutex::Mutex::new(), + inner: inner, + } + } + + unsafe fn with(&self, f: |&mut SemInner|) { + let _g = self.lock.lock(); + f(&mut *(self.inner as *mut SemInner)) + } + + pub fn acquire(&self) { + unsafe { + let mut waiter_nobe = None; + self.with(|state| { + state.count -= 1; + if state.count < 0 { + // Create waiter nobe, enqueue ourself, and tell + // outer scope we need to block. + waiter_nobe = Some(state.waiters.wait_end()); + } + }); + // Uncomment if you wish to test for sem races. Not + // valgrind-friendly. + /* for _ in range(0, 1000) { task::deschedule(); } */ + // Need to wait outside the exclusive. + if waiter_nobe.is_some() { + let _ = waiter_nobe.unwrap().recv(); + } + } + } + + pub fn release(&self) { + unsafe { + self.with(|state| { + state.count += 1; + if state.count <= 0 { + state.waiters.signal(); + } + }) + } + } + + pub fn access<'a>(&'a self) -> SemGuard<'a, Q> { + self.acquire(); + SemGuard { sem: self } + } +} + +#[unsafe_destructor] +impl Drop for Sem { + fn drop(&mut self) { + let _waiters: ~SemInner = unsafe { cast::transmute(self.inner) }; + self.inner = 0 as *(); + } +} + +#[unsafe_destructor] +impl<'a, Q: Send> Drop for SemGuard<'a, Q> { + fn drop(&mut self) { + self.sem.release(); + } +} + +impl Sem> { + fn new_and_signal(count: int, num_condvars: uint) -> Sem> { + let mut queues = Vec::new(); + for _ in range(0, num_condvars) { queues.push(WaitQueue::new()); } + Sem::new(count, queues) + } + + // The only other places that condvars get built are rwlock.write_cond() + // and rwlock_write_mode. + pub fn access_cond<'a>(&'a self) -> SemCondGuard<'a> { + SemCondGuard { + guard: self.access(), + cvar: Condvar { sem: self, order: Nothing, nopod: marker::NoPod }, + } + } +} + +// FIXME(#3598): Want to use an Option down below, but we need a custom enum +// that's not polymorphic to get around the fact that lifetimes are invariant +// inside of type parameters. 
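The acquire/release bookkeeping above is easy to lose among the `unsafe` plumbing, so here is a rough, self-contained sketch of the same idea in present-day Rust. `SketchSem` is an invented name, and `std::sync::mpsc` channels play the role of the `WaitQueue`:

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

struct SketchSem {
    inner: Mutex<Inner>,
}

struct Inner {
    count: isize,
    waiters: VecDeque<Sender<()>>, // FIFO, like WaitQueue
}

impl SketchSem {
    fn new(count: isize) -> SketchSem {
        SketchSem { inner: Mutex::new(Inner { count, waiters: VecDeque::new() }) }
    }

    fn acquire(&self) {
        // Decrement the count; if it goes negative, enqueue a private wake-up
        // channel and block on it *outside* the lock, as Sem::acquire does.
        let wait: Option<Receiver<()>> = {
            let mut inner = self.inner.lock().unwrap();
            inner.count -= 1;
            if inner.count < 0 {
                let (tx, rx) = channel();
                inner.waiters.push_back(tx);
                Some(rx)
            } else {
                None
            }
        };
        if let Some(rx) = wait {
            let _ = rx.recv();
        }
    }

    fn release(&self) {
        // Increment the count; if someone is parked, hand the slot directly
        // to the waiter at the head of the queue.
        let mut inner = self.inner.lock().unwrap();
        inner.count += 1;
        if inner.count <= 0 {
            if let Some(tx) = inner.waiters.pop_front() {
                let _ = tx.send(());
            }
        }
    }
}
```

`SemGuard` then simply calls `release` from its destructor, which is what turns `access` into an RAII operation.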
+enum ReacquireOrderLock<'a> { + Nothing, // c.c + Just(&'a Semaphore), +} + +/// A mechanism for atomic-unlock-and-deschedule blocking and signalling. +pub struct Condvar<'a> { + // The 'Sem' object associated with this condvar. This is the one that's + // atomically-unlocked-and-descheduled upon and reacquired during wakeup. + priv sem: &'a Sem >, + // This is (can be) an extra semaphore which is held around the reacquire + // operation on the first one. This is only used in cvars associated with + // rwlocks, and is needed to ensure that, when a downgrader is trying to + // hand off the access lock (which would be the first field, here), a 2nd + // writer waking up from a cvar wait can't race with a reader to steal it, + // See the comment in write_cond for more detail. + priv order: ReacquireOrderLock<'a>, + // Make sure condvars are non-copyable. + priv nopod: marker::NoPod, +} + +impl<'a> Condvar<'a> { + /// Atomically drop the associated lock, and block until a signal is sent. + /// + /// # Failure + /// + /// A task which is killed while waiting on a condition variable will wake + /// up, fail, and unlock the associated lock as it unwinds. + pub fn wait(&self) { self.wait_on(0) } + + /// As wait(), but can specify which of multiple condition variables to + /// wait on. Only a signal_on() or broadcast_on() with the same condvar_id + /// will wake this thread. + /// + /// The associated lock must have been initialised with an appropriate + /// number of condvars. The condvar_id must be between 0 and num_condvars-1 + /// or else this call will fail. + /// + /// wait() is equivalent to wait_on(0). + pub fn wait_on(&self, condvar_id: uint) { + let mut wait_end = None; + let mut out_of_bounds = None; + // Release lock, 'atomically' enqueuing ourselves in so doing. + unsafe { + self.sem.with(|state| { + if condvar_id < state.blocked.len() { + // Drop the lock. + state.count += 1; + if state.count <= 0 { + state.waiters.signal(); + } + // Create waiter nobe, and enqueue ourself to + // be woken up by a signaller. + wait_end = Some(state.blocked.get(condvar_id).wait_end()); + } else { + out_of_bounds = Some(state.blocked.len()); + } + }) + } + + // If deschedule checks start getting inserted anywhere, we can be + // killed before or after enqueueing. + check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()", || { + // Unconditionally "block". (Might not actually block if a + // signaller already sent -- I mean 'unconditionally' in contrast + // with acquire().) + (|| { + let _ = wait_end.take_unwrap().recv(); + }).finally(|| { + // Reacquire the condvar. + match self.order { + Just(lock) => { + let _g = lock.access(); + self.sem.acquire(); + } + Nothing => self.sem.acquire(), + } + }) + }) + } + + /// Wake up a blocked task. Returns false if there was no blocked task. + pub fn signal(&self) -> bool { self.signal_on(0) } + + /// As signal, but with a specified condvar_id. See wait_on. + pub fn signal_on(&self, condvar_id: uint) -> bool { + unsafe { + let mut out_of_bounds = None; + let mut result = false; + self.sem.with(|state| { + if condvar_id < state.blocked.len() { + result = state.blocked.get(condvar_id).signal(); + } else { + out_of_bounds = Some(state.blocked.len()); + } + }); + check_cvar_bounds(out_of_bounds, + condvar_id, + "cond.signal_on()", + || result) + } + } + + /// Wake up all blocked tasks. Returns the number of tasks woken. + pub fn broadcast(&self) -> uint { self.broadcast_on(0) } + + /// As broadcast, but with a specified condvar_id. See wait_on. 
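The `wait_on`/`signal_on`/`broadcast_on` trio above amounts to several condition queues sharing one lock, selected by index. A sketch of that shape in present-day Rust; `MultiCondvar` and its methods are invented for illustration and omit the error reporting that `check_cvar_bounds` performs:

```rust
use std::sync::{Condvar, Mutex, MutexGuard};

struct MultiCondvar<T> {
    data: Mutex<T>,
    queues: Vec<Condvar>, // one wait queue per condvar_id
}

impl<T> MultiCondvar<T> {
    fn new(data: T, num_condvars: usize) -> MultiCondvar<T> {
        MultiCondvar {
            data: Mutex::new(data),
            queues: (0..num_condvars).map(|_| Condvar::new()).collect(),
        }
    }

    fn lock(&self) -> MutexGuard<'_, T> {
        self.data.lock().unwrap()
    }

    // Blocks on the queue selected by `id`; panics if `id` is out of range,
    // much as the bounds check in the original code fails the task.
    fn wait_on<'a>(&self, guard: MutexGuard<'a, T>, id: usize) -> MutexGuard<'a, T> {
        self.queues[id].wait(guard).unwrap()
    }

    fn signal_on(&self, id: usize) {
        self.queues[id].notify_one();
    }

    fn broadcast_on(&self, id: usize) {
        self.queues[id].notify_all();
    }
}
```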
+ pub fn broadcast_on(&self, condvar_id: uint) -> uint { + let mut out_of_bounds = None; + let mut queue = None; + unsafe { + self.sem.with(|state| { + if condvar_id < state.blocked.len() { + // To avoid :broadcast_heavy, we make a new waitqueue, + // swap it out with the old one, and broadcast on the + // old one outside of the little-lock. + queue = Some(replace(state.blocked.get_mut(condvar_id), + WaitQueue::new())); + } else { + out_of_bounds = Some(state.blocked.len()); + } + }); + check_cvar_bounds(out_of_bounds, + condvar_id, + "cond.signal_on()", + || { + queue.take_unwrap().broadcast() + }) + } + } +} + +// Checks whether a condvar ID was out of bounds, and fails if so, or does +// something else next on success. +#[inline] +fn check_cvar_bounds( + out_of_bounds: Option, + id: uint, + act: &str, + blk: || -> U) + -> U { + match out_of_bounds { + Some(0) => + fail!("{} with illegal ID {} - this lock has no condvars!", act, id), + Some(length) => + fail!("{} with illegal ID {} - ID must be less than {}", act, id, length), + None => blk() + } +} + +#[must_use] +struct SemCondGuard<'a> { + guard: SemGuard<'a, Vec>, + cvar: Condvar<'a>, +} + +/**************************************************************************** + * Semaphores + ****************************************************************************/ + +/// A counting, blocking, bounded-waiting semaphore. +pub struct Semaphore { + priv sem: Sem<()>, +} + +/// An RAII guard used to represent an acquired resource to a semaphore. When +/// dropped, this value will release the resource back to the semaphore. +#[must_use] +pub struct SemaphoreGuard<'a> { + priv guard: SemGuard<'a, ()>, +} + +impl Semaphore { + /// Create a new semaphore with the specified count. + pub fn new(count: int) -> Semaphore { + Semaphore { sem: Sem::new(count, ()) } + } + + /// Acquire a resource represented by the semaphore. Blocks if necessary + /// until resource(s) become available. + pub fn acquire(&self) { self.sem.acquire() } + + /// Release a held resource represented by the semaphore. Wakes a blocked + /// contending task, if any exist. Won't block the caller. + pub fn release(&self) { self.sem.release() } + + /// Acquire a resource of this semaphore, returning an RAII guard which will + /// release the resource when dropped. + pub fn access<'a>(&'a self) -> SemaphoreGuard<'a> { + SemaphoreGuard { guard: self.sem.access() } + } +} + +/**************************************************************************** + * Mutexes + ****************************************************************************/ + +/// A blocking, bounded-waiting, mutual exclusion lock with an associated +/// FIFO condition variable. +/// +/// # Failure +/// A task which fails while holding a mutex will unlock the mutex as it +/// unwinds. +pub struct Mutex { + priv sem: Sem>, +} + +/// An RAII structure which is used to gain access to a mutex's condition +/// variable. Additionally, when a value of this type is dropped, the +/// corresponding mutex is also unlocked. +#[must_use] +pub struct MutexGuard<'a> { + priv guard: SemGuard<'a, Vec>, + /// Inner condition variable which is connected to the outer mutex, and can + /// be used for atomic-unlock-and-deschedule. + cond: Condvar<'a>, +} + +impl Mutex { + /// Create a new mutex, with one associated condvar. + pub fn new() -> Mutex { Mutex::new_with_condvars(1) } + + /// Create a new mutex, with a specified number of associated condvars. 
This + /// will allow calling wait_on/signal_on/broadcast_on with condvar IDs + /// between 0 and num_condvars-1. (If num_condvars is 0, lock_cond will be + /// allowed but any operations on the condvar will fail.) + pub fn new_with_condvars(num_condvars: uint) -> Mutex { + Mutex { sem: Sem::new_and_signal(1, num_condvars) } + } + + /// Acquires ownership of this mutex, returning an RAII guard which will + /// unlock the mutex when dropped. The associated condition variable can + /// also be accessed through the returned guard. + pub fn lock<'a>(&'a self) -> MutexGuard<'a> { + let SemCondGuard { guard, cvar } = self.sem.access_cond(); + MutexGuard { guard: guard, cond: cvar } + } +} + +/**************************************************************************** + * Reader-writer locks + ****************************************************************************/ + +// NB: Wikipedia - Readers-writers_problem#The_third_readers-writers_problem + +/// A blocking, no-starvation, reader-writer lock with an associated condvar. +/// +/// # Failure +/// +/// A task which fails while holding an rwlock will unlock the rwlock as it +/// unwinds. +pub struct RWLock { + priv order_lock: Semaphore, + priv access_lock: Sem>, + + // The only way the count flag is ever accessed is with xadd. Since it is + // a read-modify-write operation, multiple xadds on different cores will + // always be consistent with respect to each other, so a monotonic/relaxed + // consistency ordering suffices (i.e., no extra barriers are needed). + // + // FIXME(#6598): The atomics module has no relaxed ordering flag, so I use + // acquire/release orderings superfluously. Change these someday. + priv read_count: atomics::AtomicUint, +} + +/// An RAII helper which is created by acquiring a read lock on an RWLock. When +/// dropped, this will unlock the RWLock. +#[must_use] +pub struct RWLockReadGuard<'a> { + priv lock: &'a RWLock, +} + +/// An RAII helper which is created by acquiring a write lock on an RWLock. When +/// dropped, this will unlock the RWLock. +/// +/// A value of this type can also be consumed to downgrade to a read-only lock. +#[must_use] +pub struct RWLockWriteGuard<'a> { + priv lock: &'a RWLock, + /// Inner condition variable that is connected to the write-mode of the + /// outer rwlock. + cond: Condvar<'a>, +} + +impl RWLock { + /// Create a new rwlock, with one associated condvar. + pub fn new() -> RWLock { RWLock::new_with_condvars(1) } + + /// Create a new rwlock, with a specified number of associated condvars. + /// Similar to mutex_with_condvars. + pub fn new_with_condvars(num_condvars: uint) -> RWLock { + RWLock { + order_lock: Semaphore::new(1), + access_lock: Sem::new_and_signal(1, num_condvars), + read_count: atomics::AtomicUint::new(0), + } + } + + /// Acquires a read-lock, returning an RAII guard that will unlock the lock + /// when dropped. Calls to 'read' from other tasks may run concurrently with + /// this one. + pub fn read<'a>(&'a self) -> RWLockReadGuard<'a> { + let _guard = self.order_lock.access(); + let old_count = self.read_count.fetch_add(1, atomics::Acquire); + if old_count == 0 { + self.access_lock.acquire(); + } + RWLockReadGuard { lock: self } + } + + /// Acquire a write-lock, returning an RAII guard that will unlock the lock + /// when dropped. No calls to 'read' or 'write' from other tasks will run + /// concurrently with this one. + /// + /// You can also downgrade a write to a read by calling the `downgrade` + /// method on the returned guard. 
Additionally, the guard will contain a + /// `Condvar` attached to this lock. + /// + /// # Example + /// + /// ```rust + /// use sync::raw::RWLock; + /// + /// let lock = RWLock::new(); + /// let write = lock.write(); + /// // ... exclusive access ... + /// let read = write.downgrade(); + /// // ... shared access ... + /// drop(read); + /// ``` + pub fn write<'a>(&'a self) -> RWLockWriteGuard<'a> { + let _g = self.order_lock.access(); + self.access_lock.acquire(); + + // It's important to thread our order lock into the condvar, so that + // when a cond.wait() wakes up, it uses it while reacquiring the + // access lock. If we permitted a waking-up writer to "cut in line", + // there could arise a subtle race when a downgrader attempts to hand + // off the reader cloud lock to a waiting reader. This race is tested + // in arc.rs (test_rw_write_cond_downgrade_read_race) and looks like: + // T1 (writer) T2 (downgrader) T3 (reader) + // [in cond.wait()] + // [locks for writing] + // [holds access_lock] + // [is signalled, perhaps by + // downgrader or a 4th thread] + // tries to lock access(!) + // lock order_lock + // xadd read_count[0->1] + // tries to lock access + // [downgrade] + // xadd read_count[1->2] + // unlock access + // Since T1 contended on the access lock before T3 did, it will steal + // the lock handoff. Adding order_lock in the condvar reacquire path + // solves this because T1 will hold order_lock while waiting on access, + // which will cause T3 to have to wait until T1 finishes its write, + // which can't happen until T2 finishes the downgrade-read entirely. + // The astute reader will also note that making waking writers use the + // order_lock is better for not starving readers. + RWLockWriteGuard { + lock: self, + cond: Condvar { + sem: &self.access_lock, + order: Just(&self.order_lock), + nopod: marker::NoPod, + } + } + } +} + +impl<'a> RWLockWriteGuard<'a> { + /// Consumes this write lock and converts it into a read lock. + pub fn downgrade(self) -> RWLockReadGuard<'a> { + let lock = self.lock; + // Don't run the destructor of the write guard, we're in charge of + // things from now on + unsafe { cast::forget(self) } + + let old_count = lock.read_count.fetch_add(1, atomics::Release); + // If another reader was already blocking, we need to hand-off + // the "reader cloud" access lock to them. + if old_count != 0 { + // Guaranteed not to let another writer in, because + // another reader was holding the order_lock. Hence they + // must be the one to get the access_lock (because all + // access_locks are acquired with order_lock held). See + // the comment in write_cond for more justification. + lock.access_lock.release(); + } + RWLockReadGuard { lock: lock } + } +} + +#[unsafe_destructor] +impl<'a> Drop for RWLockWriteGuard<'a> { + fn drop(&mut self) { + self.lock.access_lock.release(); + } +} + +#[unsafe_destructor] +impl<'a> Drop for RWLockReadGuard<'a> { + fn drop(&mut self) { + let old_count = self.lock.read_count.fetch_sub(1, atomics::Release); + assert!(old_count > 0); + if old_count == 1 { + // Note: this release used to be outside of a locked access + // to exclusive-protected state. If this code is ever + // converted back to such (instead of using atomic ops), + // this access MUST NOT go inside the exclusive access. 
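The hand-off protocol described above (order lock, reader cloud, downgrade) can be condensed into a short sketch. Everything below is invented for illustration (`BinSem`, `SketchRwLock`) and written in present-day Rust; the real lock uses the semaphores defined earlier in this file and layers condvars and failure handling on top:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Condvar, Mutex};

// Minimal binary semaphore so the sketch is self-contained.
struct BinSem { free: Mutex<bool>, cvar: Condvar }

impl BinSem {
    fn new() -> BinSem { BinSem { free: Mutex::new(true), cvar: Condvar::new() } }
    fn acquire(&self) {
        let mut free = self.free.lock().unwrap();
        while !*free { free = self.cvar.wait(free).unwrap(); }
        *free = false;
    }
    fn release(&self) {
        *self.free.lock().unwrap() = true;
        self.cvar.notify_one();
    }
}

struct SketchRwLock {
    order_lock: BinSem,   // keeps waking writers from starving readers
    access_lock: BinSem,  // held by the writer, or by the "reader cloud"
    read_count: AtomicUsize,
}

impl SketchRwLock {
    fn read_lock(&self) {
        // Readers queue behind writers on the order lock; the first reader in
        // takes the access lock on behalf of the whole reader cloud.
        self.order_lock.acquire();
        if self.read_count.fetch_add(1, Ordering::Acquire) == 0 {
            self.access_lock.acquire();
        }
        self.order_lock.release();
    }

    fn read_unlock(&self) {
        // Last reader out hands the access lock back.
        if self.read_count.fetch_sub(1, Ordering::Release) == 1 {
            self.access_lock.release();
        }
    }

    fn write_lock(&self) {
        self.order_lock.acquire();
        self.access_lock.acquire();
        self.order_lock.release();
    }

    fn write_unlock(&self) {
        self.access_lock.release();
    }

    fn downgrade_to_read(&self) {
        // The writer becomes a reader. If other readers already queued up
        // (read_count != 0), hand the access lock over to them rather than
        // keeping it, as RWLockWriteGuard::downgrade does above.
        if self.read_count.fetch_add(1, Ordering::Release) != 0 {
            self.access_lock.release();
        }
    }
}
```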
+ self.lock.access_lock.release(); + } + } +} + +/**************************************************************************** + * Tests + ****************************************************************************/ + +#[cfg(test)] +mod tests { + use arc::Arc; + use super::{Semaphore, Mutex, RWLock, Condvar}; + + use std::cast; + use std::result; + use std::task; + + /************************************************************************ + * Semaphore tests + ************************************************************************/ + #[test] + fn test_sem_acquire_release() { + let s = Semaphore::new(1); + s.acquire(); + s.release(); + s.acquire(); + } + #[test] + fn test_sem_basic() { + let s = Semaphore::new(1); + let _g = s.access(); + } + #[test] + fn test_sem_as_mutex() { + let s = Arc::new(Semaphore::new(1)); + let s2 = s.clone(); + task::spawn(proc() { + let _g = s2.access(); + for _ in range(0, 5) { task::deschedule(); } + }); + let _g = s.access(); + for _ in range(0, 5) { task::deschedule(); } + } + #[test] + fn test_sem_as_cvar() { + /* Child waits and parent signals */ + let (tx, rx) = channel(); + let s = Arc::new(Semaphore::new(0)); + let s2 = s.clone(); + task::spawn(proc() { + s2.acquire(); + tx.send(()); + }); + for _ in range(0, 5) { task::deschedule(); } + s.release(); + let _ = rx.recv(); + + /* Parent waits and child signals */ + let (tx, rx) = channel(); + let s = Arc::new(Semaphore::new(0)); + let s2 = s.clone(); + task::spawn(proc() { + for _ in range(0, 5) { task::deschedule(); } + s2.release(); + let _ = rx.recv(); + }); + s.acquire(); + tx.send(()); + } + #[test] + fn test_sem_multi_resource() { + // Parent and child both get in the critical section at the same + // time, and shake hands. + let s = Arc::new(Semaphore::new(2)); + let s2 = s.clone(); + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + task::spawn(proc() { + let _g = s2.access(); + let _ = rx2.recv(); + tx1.send(()); + }); + let _g = s.access(); + tx2.send(()); + let _ = rx1.recv(); + } + #[test] + fn test_sem_runtime_friendly_blocking() { + // Force the runtime to schedule two threads on the same sched_loop. + // When one blocks, it should schedule the other one. + let s = Arc::new(Semaphore::new(1)); + let s2 = s.clone(); + let (tx, rx) = channel(); + { + let _g = s.access(); + task::spawn(proc() { + tx.send(()); + drop(s2.access()); + tx.send(()); + }); + rx.recv(); // wait for child to come alive + for _ in range(0, 5) { task::deschedule(); } // let the child contend + } + rx.recv(); // wait for child to be done + } + /************************************************************************ + * Mutex tests + ************************************************************************/ + #[test] + fn test_mutex_lock() { + // Unsafely achieve shared state, and do the textbook + // "load tmp = move ptr; inc tmp; store ptr <- tmp" dance. 
+ let (tx, rx) = channel(); + let m = Arc::new(Mutex::new()); + let m2 = m.clone(); + let mut sharedstate = ~0; + { + let ptr: *mut int = &mut *sharedstate; + task::spawn(proc() { + access_shared(ptr, &m2, 10); + tx.send(()); + }); + } + { + access_shared(&mut *sharedstate, &m, 10); + let _ = rx.recv(); + + assert_eq!(*sharedstate, 20); + } + + fn access_shared(sharedstate: *mut int, m: &Arc, n: uint) { + for _ in range(0, n) { + let _g = m.lock(); + let oldval = unsafe { *sharedstate }; + task::deschedule(); + unsafe { *sharedstate = oldval + 1; } + } + } + } + #[test] + fn test_mutex_cond_wait() { + let m = Arc::new(Mutex::new()); + + // Child wakes up parent + { + let lock = m.lock(); + let m2 = m.clone(); + task::spawn(proc() { + let lock = m2.lock(); + let woken = lock.cond.signal(); + assert!(woken); + }); + lock.cond.wait(); + } + // Parent wakes up child + let (tx, rx) = channel(); + let m3 = m.clone(); + task::spawn(proc() { + let lock = m3.lock(); + tx.send(()); + lock.cond.wait(); + tx.send(()); + }); + rx.recv(); // Wait until child gets in the mutex + { + let lock = m.lock(); + let woken = lock.cond.signal(); + assert!(woken); + } + rx.recv(); // Wait until child wakes up + } + + fn test_mutex_cond_broadcast_helper(num_waiters: uint) { + let m = Arc::new(Mutex::new()); + let mut rxs = Vec::new(); + + for _ in range(0, num_waiters) { + let mi = m.clone(); + let (tx, rx) = channel(); + rxs.push(rx); + task::spawn(proc() { + let lock = mi.lock(); + tx.send(()); + lock.cond.wait(); + tx.send(()); + }); + } + + // wait until all children get in the mutex + for rx in rxs.mut_iter() { rx.recv(); } + { + let lock = m.lock(); + let num_woken = lock.cond.broadcast(); + assert_eq!(num_woken, num_waiters); + } + // wait until all children wake up + for rx in rxs.mut_iter() { rx.recv(); } + } + #[test] + fn test_mutex_cond_broadcast() { + test_mutex_cond_broadcast_helper(12); + } + #[test] + fn test_mutex_cond_broadcast_none() { + test_mutex_cond_broadcast_helper(0); + } + #[test] + fn test_mutex_cond_no_waiter() { + let m = Arc::new(Mutex::new()); + let m2 = m.clone(); + let _ = task::try(proc() { + drop(m.lock()); + }); + let lock = m2.lock(); + assert!(!lock.cond.signal()); + } + #[test] + fn test_mutex_killed_simple() { + use std::any::Any; + + // Mutex must get automatically unlocked if failed/killed within. + let m = Arc::new(Mutex::new()); + let m2 = m.clone(); + + let result: result::Result<(), ~Any> = task::try(proc() { + let _lock = m2.lock(); + fail!(); + }); + assert!(result.is_err()); + // child task must have finished by the time try returns + drop(m.lock()); + } + #[test] + fn test_mutex_cond_signal_on_0() { + // Tests that signal_on(0) is equivalent to signal(). 
+ let m = Arc::new(Mutex::new()); + let lock = m.lock(); + let m2 = m.clone(); + task::spawn(proc() { + let lock = m2.lock(); + lock.cond.signal_on(0); + }); + lock.cond.wait(); + } + #[test] + fn test_mutex_no_condvars() { + let result = task::try(proc() { + let m = Mutex::new_with_condvars(0); + m.lock().cond.wait(); + }); + assert!(result.is_err()); + let result = task::try(proc() { + let m = Mutex::new_with_condvars(0); + m.lock().cond.signal(); + }); + assert!(result.is_err()); + let result = task::try(proc() { + let m = Mutex::new_with_condvars(0); + m.lock().cond.broadcast(); + }); + assert!(result.is_err()); + } + /************************************************************************ + * Reader/writer lock tests + ************************************************************************/ + #[cfg(test)] + pub enum RWLockMode { Read, Write, Downgrade, DowngradeRead } + #[cfg(test)] + fn lock_rwlock_in_mode(x: &Arc, mode: RWLockMode, blk: ||) { + match mode { + Read => { let _g = x.read(); blk() } + Write => { let _g = x.write(); blk() } + Downgrade => { let _g = x.write(); blk() } + DowngradeRead => { let _g = x.write().downgrade(); blk() } + } + } + #[cfg(test)] + fn test_rwlock_exclusion(x: Arc, + mode1: RWLockMode, + mode2: RWLockMode) { + // Test mutual exclusion between readers and writers. Just like the + // mutex mutual exclusion test, a ways above. + let (tx, rx) = channel(); + let x2 = x.clone(); + let mut sharedstate = ~0; + { + let ptr: *int = &*sharedstate; + task::spawn(proc() { + let sharedstate: &mut int = + unsafe { cast::transmute(ptr) }; + access_shared(sharedstate, &x2, mode1, 10); + tx.send(()); + }); + } + { + access_shared(sharedstate, &x, mode2, 10); + let _ = rx.recv(); + + assert_eq!(*sharedstate, 20); + } + + fn access_shared(sharedstate: &mut int, x: &Arc, + mode: RWLockMode, n: uint) { + for _ in range(0, n) { + lock_rwlock_in_mode(x, mode, || { + let oldval = *sharedstate; + task::deschedule(); + *sharedstate = oldval + 1; + }) + } + } + } + #[test] + fn test_rwlock_readers_wont_modify_the_data() { + test_rwlock_exclusion(Arc::new(RWLock::new()), Read, Write); + test_rwlock_exclusion(Arc::new(RWLock::new()), Write, Read); + test_rwlock_exclusion(Arc::new(RWLock::new()), Read, Downgrade); + test_rwlock_exclusion(Arc::new(RWLock::new()), Downgrade, Read); + test_rwlock_exclusion(Arc::new(RWLock::new()), Write, DowngradeRead); + test_rwlock_exclusion(Arc::new(RWLock::new()), DowngradeRead, Write); + } + #[test] + fn test_rwlock_writers_and_writers() { + test_rwlock_exclusion(Arc::new(RWLock::new()), Write, Write); + test_rwlock_exclusion(Arc::new(RWLock::new()), Write, Downgrade); + test_rwlock_exclusion(Arc::new(RWLock::new()), Downgrade, Write); + test_rwlock_exclusion(Arc::new(RWLock::new()), Downgrade, Downgrade); + } + #[cfg(test)] + fn test_rwlock_handshake(x: Arc, + mode1: RWLockMode, + mode2: RWLockMode, + make_mode2_go_first: bool) { + // Much like sem_multi_resource. + let x2 = x.clone(); + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + task::spawn(proc() { + if !make_mode2_go_first { + rx2.recv(); // parent sends to us once it locks, or ... + } + lock_rwlock_in_mode(&x2, mode2, || { + if make_mode2_go_first { + tx1.send(()); // ... we send to it once we lock + } + rx2.recv(); + tx1.send(()); + }) + }); + if make_mode2_go_first { + rx1.recv(); // child sends to us once it locks, or ... + } + lock_rwlock_in_mode(&x, mode1, || { + if !make_mode2_go_first { + tx2.send(()); // ... 
we send to it once we lock + } + tx2.send(()); + rx1.recv(); + }) + } + #[test] + fn test_rwlock_readers_and_readers() { + test_rwlock_handshake(Arc::new(RWLock::new()), Read, Read, false); + // The downgrader needs to get in before the reader gets in, otherwise + // they cannot end up reading at the same time. + test_rwlock_handshake(Arc::new(RWLock::new()), DowngradeRead, Read, false); + test_rwlock_handshake(Arc::new(RWLock::new()), Read, DowngradeRead, true); + // Two downgrade_reads can never both end up reading at the same time. + } + #[test] + fn test_rwlock_downgrade_unlock() { + // Tests that downgrade can unlock the lock in both modes + let x = Arc::new(RWLock::new()); + lock_rwlock_in_mode(&x, Downgrade, || { }); + test_rwlock_handshake(x, Read, Read, false); + let y = Arc::new(RWLock::new()); + lock_rwlock_in_mode(&y, DowngradeRead, || { }); + test_rwlock_exclusion(y, Write, Write); + } + #[test] + fn test_rwlock_read_recursive() { + let x = RWLock::new(); + let _g1 = x.read(); + let _g2 = x.read(); + } + #[test] + fn test_rwlock_cond_wait() { + // As test_mutex_cond_wait above. + let x = Arc::new(RWLock::new()); + + // Child wakes up parent + { + let lock = x.write(); + let x2 = x.clone(); + task::spawn(proc() { + let lock = x2.write(); + assert!(lock.cond.signal()); + }); + lock.cond.wait(); + } + // Parent wakes up child + let (tx, rx) = channel(); + let x3 = x.clone(); + task::spawn(proc() { + let lock = x3.write(); + tx.send(()); + lock.cond.wait(); + tx.send(()); + }); + rx.recv(); // Wait until child gets in the rwlock + drop(x.read()); // Must be able to get in as a reader + { + let x = x.write(); + assert!(x.cond.signal()); + } + rx.recv(); // Wait until child wakes up + drop(x.read()); // Just for good measure + } + #[cfg(test)] + fn test_rwlock_cond_broadcast_helper(num_waiters: uint) { + // Much like the mutex broadcast test. Downgrade-enabled. + fn lock_cond(x: &Arc, blk: |c: &Condvar|) { + let lock = x.write(); + blk(&lock.cond); + } + + let x = Arc::new(RWLock::new()); + let mut rxs = Vec::new(); + + for _ in range(0, num_waiters) { + let xi = x.clone(); + let (tx, rx) = channel(); + rxs.push(rx); + task::spawn(proc() { + lock_cond(&xi, |cond| { + tx.send(()); + cond.wait(); + tx.send(()); + }) + }); + } + + // wait until all children get in the mutex + for rx in rxs.mut_iter() { let _ = rx.recv(); } + lock_cond(&x, |cond| { + let num_woken = cond.broadcast(); + assert_eq!(num_woken, num_waiters); + }); + // wait until all children wake up + for rx in rxs.mut_iter() { let _ = rx.recv(); } + } + #[test] + fn test_rwlock_cond_broadcast() { + test_rwlock_cond_broadcast_helper(0); + test_rwlock_cond_broadcast_helper(12); + } + #[cfg(test)] + fn rwlock_kill_helper(mode1: RWLockMode, mode2: RWLockMode) { + use std::any::Any; + + // Mutex must get automatically unlocked if failed/killed within. 
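The kill tests here all exercise one property: a task that fails while holding a lock releases it as it unwinds, because the guard's destructor runs during unwinding. A small present-day illustration (std's mutex additionally reports the panic as poisoning):

```rust
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let m = Arc::new(Mutex::new(0));
    let m2 = Arc::clone(&m);

    // The child panics while holding the lock; the guard is dropped during
    // unwinding, so the mutex is unlocked (and marked poisoned) afterwards.
    let _ = thread::spawn(move || {
        let _guard = m2.lock().unwrap();
        panic!("simulated failure while holding the lock");
    })
    .join();

    // The parent can still take the lock; it only has to clear the poison.
    assert_eq!(*m.lock().unwrap_or_else(|e| e.into_inner()), 0);
}
```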
+ let x = Arc::new(RWLock::new()); + let x2 = x.clone(); + + let result: result::Result<(), ~Any> = task::try(proc() { + lock_rwlock_in_mode(&x2, mode1, || { + fail!(); + }) + }); + assert!(result.is_err()); + // child task must have finished by the time try returns + lock_rwlock_in_mode(&x, mode2, || { }) + } + #[test] + fn test_rwlock_reader_killed_writer() { + rwlock_kill_helper(Read, Write); + } + #[test] + fn test_rwlock_writer_killed_reader() { + rwlock_kill_helper(Write, Read); + } + #[test] + fn test_rwlock_reader_killed_reader() { + rwlock_kill_helper(Read, Read); + } + #[test] + fn test_rwlock_writer_killed_writer() { + rwlock_kill_helper(Write, Write); + } + #[test] + fn test_rwlock_kill_downgrader() { + rwlock_kill_helper(Downgrade, Read); + rwlock_kill_helper(Read, Downgrade); + rwlock_kill_helper(Downgrade, Write); + rwlock_kill_helper(Write, Downgrade); + rwlock_kill_helper(DowngradeRead, Read); + rwlock_kill_helper(Read, DowngradeRead); + rwlock_kill_helper(DowngradeRead, Write); + rwlock_kill_helper(Write, DowngradeRead); + rwlock_kill_helper(DowngradeRead, Downgrade); + rwlock_kill_helper(DowngradeRead, Downgrade); + rwlock_kill_helper(Downgrade, DowngradeRead); + rwlock_kill_helper(Downgrade, DowngradeRead); + } +} diff --git a/src/libsync/sync/mod.rs b/src/libsync/sync/mod.rs deleted file mode 100644 index 2217706d4f0..00000000000 --- a/src/libsync/sync/mod.rs +++ /dev/null @@ -1,1330 +0,0 @@ -// Copyright 2012-2014 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -#[allow(missing_doc)]; - -/** - * The concurrency primitives you know and love. - * - * Maybe once we have a "core exports x only to std" mechanism, these can be - * in std. - */ - -use std::cast; -use std::comm; -use std::kinds::marker; -use std::mem::replace; -use std::sync::arc::UnsafeArc; -use std::sync::atomics; -use std::unstable::finally::Finally; - -use arc::MutexArc; - -/**************************************************************************** - * Internals - ****************************************************************************/ - -pub mod mutex; -pub mod one; -mod mpsc_intrusive; - -// Each waiting task receives on one of these. -#[doc(hidden)] -type WaitEnd = Receiver<()>; -#[doc(hidden)] -type SignalEnd = Sender<()>; -// A doubly-ended queue of waiting tasks. -#[doc(hidden)] -struct WaitQueue { head: Receiver, - tail: Sender } - -impl WaitQueue { - fn new() -> WaitQueue { - let (block_tail, block_head) = channel(); - WaitQueue { head: block_head, tail: block_tail } - } - - // Signals one live task from the queue. - fn signal(&self) -> bool { - match self.head.try_recv() { - comm::Data(ch) => { - // Send a wakeup signal. If the waiter was killed, its port will - // have closed. Keep trying until we get a live task. - if ch.try_send(()) { - true - } else { - self.signal() - } - } - _ => false - } - } - - fn broadcast(&self) -> uint { - let mut count = 0; - loop { - match self.head.try_recv() { - comm::Data(ch) => { - if ch.try_send(()) { - count += 1; - } - } - _ => break - } - } - count - } - - fn wait_end(&self) -> WaitEnd { - let (signal_end, wait_end) = channel(); - assert!(self.tail.try_send(signal_end)); - wait_end - } -} - -// The building-block used to make semaphores, mutexes, and rwlocks. 
-struct SemInner { - lock: mutex::Mutex, - count: int, - waiters: WaitQueue, - // Can be either unit or another waitqueue. Some sems shouldn't come with - // a condition variable attached, others should. - blocked: Q -} - -struct Sem(UnsafeArc>); - -#[doc(hidden)] -impl Sem { - fn new(count: int, q: Q) -> Sem { - Sem(UnsafeArc::new(SemInner { - count: count, - waiters: WaitQueue::new(), - blocked: q, - lock: mutex::Mutex::new(), - })) - } - - unsafe fn with(&self, f: |&mut SemInner|) { - let Sem(ref arc) = *self; - let state = arc.get(); - let _g = (*state).lock.lock(); - f(cast::transmute(state)); - } - - pub fn acquire(&self) { - unsafe { - let mut waiter_nobe = None; - self.with(|state| { - state.count -= 1; - if state.count < 0 { - // Create waiter nobe, enqueue ourself, and tell - // outer scope we need to block. - waiter_nobe = Some(state.waiters.wait_end()); - } - }); - // Uncomment if you wish to test for sem races. Not valgrind-friendly. - /* for _ in range(0, 1000) { task::deschedule(); } */ - // Need to wait outside the exclusive. - if waiter_nobe.is_some() { - let _ = waiter_nobe.unwrap().recv(); - } - } - } - - pub fn release(&self) { - unsafe { - self.with(|state| { - state.count += 1; - if state.count <= 0 { - state.waiters.signal(); - } - }) - } - } - - pub fn access(&self, blk: || -> U) -> U { - (|| { - self.acquire(); - blk() - }).finally(|| { - self.release(); - }) - } -} - -#[doc(hidden)] -impl Sem > { - fn new_and_signal(count: int, num_condvars: uint) - -> Sem > { - let mut queues = Vec::new(); - for _ in range(0, num_condvars) { queues.push(WaitQueue::new()); } - Sem::new(count, queues) - } -} - -// FIXME(#3598): Want to use an Option down below, but we need a custom enum -// that's not polymorphic to get around the fact that lifetimes are invariant -// inside of type parameters. -enum ReacquireOrderLock<'a> { - Nothing, // c.c - Just(&'a Semaphore), -} - -/// A mechanism for atomic-unlock-and-deschedule blocking and signalling. -pub struct Condvar<'a> { - // The 'Sem' object associated with this condvar. This is the one that's - // atomically-unlocked-and-descheduled upon and reacquired during wakeup. - priv sem: &'a Sem >, - // This is (can be) an extra semaphore which is held around the reacquire - // operation on the first one. This is only used in cvars associated with - // rwlocks, and is needed to ensure that, when a downgrader is trying to - // hand off the access lock (which would be the first field, here), a 2nd - // writer waking up from a cvar wait can't race with a reader to steal it, - // See the comment in write_cond for more detail. - priv order: ReacquireOrderLock<'a>, - // Make sure condvars are non-copyable. - priv nopod: marker::NoPod, -} - -impl<'a> Condvar<'a> { - /** - * Atomically drop the associated lock, and block until a signal is sent. - * - * # Failure - * A task which is killed (i.e., by linked failure with another task) - * while waiting on a condition variable will wake up, fail, and unlock - * the associated lock as it unwinds. - */ - pub fn wait(&self) { self.wait_on(0) } - - /** - * As wait(), but can specify which of multiple condition variables to - * wait on. Only a signal_on() or broadcast_on() with the same condvar_id - * will wake this thread. - * - * The associated lock must have been initialised with an appropriate - * number of condvars. The condvar_id must be between 0 and num_condvars-1 - * or else this call will fail. - * - * wait() is equivalent to wait_on(0). 
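The old documentation above describes the same atomically-unlock-and-block contract that condition variables have today. For reference, the equivalent dance with `std::sync::Condvar`; the boolean flag is just an invented example predicate:

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

fn main() {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair2 = Arc::clone(&pair);

    let child = thread::spawn(move || {
        let (lock, cvar) = &*pair2;
        *lock.lock().unwrap() = true;
        cvar.notify_one(); // analogous to cond.signal()
    });

    let (lock, cvar) = &*pair;
    let mut ready = lock.lock().unwrap();
    while !*ready {
        // The guard is released while this thread is blocked and is held
        // again by the time wait() returns, just as wait_on reacquires above.
        ready = cvar.wait(ready).unwrap();
    }
    drop(ready);
    child.join().unwrap();
}
```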
- */ - pub fn wait_on(&self, condvar_id: uint) { - let mut wait_end = None; - let mut out_of_bounds = None; - // Release lock, 'atomically' enqueuing ourselves in so doing. - unsafe { - self.sem.with(|state| { - if condvar_id < state.blocked.len() { - // Drop the lock. - state.count += 1; - if state.count <= 0 { - state.waiters.signal(); - } - // Create waiter nobe, and enqueue ourself to - // be woken up by a signaller. - wait_end = Some(state.blocked.get(condvar_id).wait_end()); - } else { - out_of_bounds = Some(state.blocked.len()); - } - }) - } - - // If deschedule checks start getting inserted anywhere, we can be - // killed before or after enqueueing. - check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()", || { - // Unconditionally "block". (Might not actually block if a - // signaller already sent -- I mean 'unconditionally' in contrast - // with acquire().) - (|| { - let _ = wait_end.take_unwrap().recv(); - }).finally(|| { - // Reacquire the condvar. - match self.order { - Just(lock) => lock.access(|| self.sem.acquire()), - Nothing => self.sem.acquire(), - } - }) - }) - } - - /// Wake up a blocked task. Returns false if there was no blocked task. - pub fn signal(&self) -> bool { self.signal_on(0) } - - /// As signal, but with a specified condvar_id. See wait_on. - pub fn signal_on(&self, condvar_id: uint) -> bool { - unsafe { - let mut out_of_bounds = None; - let mut result = false; - self.sem.with(|state| { - if condvar_id < state.blocked.len() { - result = state.blocked.get(condvar_id).signal(); - } else { - out_of_bounds = Some(state.blocked.len()); - } - }); - check_cvar_bounds(out_of_bounds, - condvar_id, - "cond.signal_on()", - || result) - } - } - - /// Wake up all blocked tasks. Returns the number of tasks woken. - pub fn broadcast(&self) -> uint { self.broadcast_on(0) } - - /// As broadcast, but with a specified condvar_id. See wait_on. - pub fn broadcast_on(&self, condvar_id: uint) -> uint { - let mut out_of_bounds = None; - let mut queue = None; - unsafe { - self.sem.with(|state| { - if condvar_id < state.blocked.len() { - // To avoid :broadcast_heavy, we make a new waitqueue, - // swap it out with the old one, and broadcast on the - // old one outside of the little-lock. - queue = Some(replace(state.blocked.get_mut(condvar_id), - WaitQueue::new())); - } else { - out_of_bounds = Some(state.blocked.len()); - } - }); - check_cvar_bounds(out_of_bounds, - condvar_id, - "cond.signal_on()", - || { - queue.take_unwrap().broadcast() - }) - } - } -} - -// Checks whether a condvar ID was out of bounds, and fails if so, or does -// something else next on success. -#[inline] -#[doc(hidden)] -fn check_cvar_bounds( - out_of_bounds: Option, - id: uint, - act: &str, - blk: || -> U) - -> U { - match out_of_bounds { - Some(0) => - fail!("{} with illegal ID {} - this lock has no condvars!", act, id), - Some(length) => - fail!("{} with illegal ID {} - ID must be less than {}", act, id, length), - None => blk() - } -} - -#[doc(hidden)] -impl Sem > { - // The only other places that condvars get built are rwlock.write_cond() - // and rwlock_write_mode. - pub fn access_cond(&self, blk: |c: &Condvar| -> U) -> U { - self.access(|| { - blk(&Condvar { - sem: self, - order: Nothing, - nopod: marker::NoPod - }) - }) - } -} - -/**************************************************************************** - * Semaphores - ****************************************************************************/ - -/// A counting, blocking, bounded-waiting semaphore. 
-pub struct Semaphore { priv sem: Sem<()> } - - -impl Clone for Semaphore { - /// Create a new handle to the semaphore. - fn clone(&self) -> Semaphore { - let Sem(ref lock) = self.sem; - Semaphore { sem: Sem(lock.clone()) } - } -} - -impl Semaphore { - /// Create a new semaphore with the specified count. - pub fn new(count: int) -> Semaphore { - Semaphore { sem: Sem::new(count, ()) } - } - - /** - * Acquire a resource represented by the semaphore. Blocks if necessary - * until resource(s) become available. - */ - pub fn acquire(&self) { (&self.sem).acquire() } - - /** - * Release a held resource represented by the semaphore. Wakes a blocked - * contending task, if any exist. Won't block the caller. - */ - pub fn release(&self) { (&self.sem).release() } - - /// Run a function with ownership of one of the semaphore's resources. - pub fn access(&self, blk: || -> U) -> U { (&self.sem).access(blk) } -} - -/**************************************************************************** - * Mutexes - ****************************************************************************/ - -/** - * A blocking, bounded-waiting, mutual exclusion lock with an associated - * FIFO condition variable. - * - * # Failure - * A task which fails while holding a mutex will unlock the mutex as it - * unwinds. - */ - -pub struct Mutex { priv sem: Sem > } -impl Clone for Mutex { - /// Create a new handle to the mutex. - fn clone(&self) -> Mutex { - let Sem(ref queue) = self.sem; - Mutex { sem: Sem(queue.clone()) } } -} - -impl Mutex { - /// Create a new mutex, with one associated condvar. - pub fn new() -> Mutex { Mutex::new_with_condvars(1) } - - /** - * Create a new mutex, with a specified number of associated condvars. This - * will allow calling wait_on/signal_on/broadcast_on with condvar IDs between - * 0 and num_condvars-1. (If num_condvars is 0, lock_cond will be allowed but - * any operations on the condvar will fail.) - */ - pub fn new_with_condvars(num_condvars: uint) -> Mutex { - Mutex { sem: Sem::new_and_signal(1, num_condvars) } - } - - - /// Run a function with ownership of the mutex. - pub fn lock(&self, blk: || -> U) -> U { - (&self.sem).access(blk) - } - - /// Run a function with ownership of the mutex and a handle to a condvar. - pub fn lock_cond(&self, blk: |c: &Condvar| -> U) -> U { - (&self.sem).access_cond(blk) - } -} - -/**************************************************************************** - * Reader-writer locks - ****************************************************************************/ - -// NB: Wikipedia - Readers-writers_problem#The_third_readers-writers_problem - -#[doc(hidden)] -struct RWLockInner { - // You might ask, "Why don't you need to use an atomic for the mode flag?" - // This flag affects the behaviour of readers (for plain readers, they - // assert on it; for downgraders, they use it to decide which mode to - // unlock for). Consider that the flag is only unset when the very last - // reader exits; therefore, it can never be unset during a reader/reader - // (or reader/downgrader) race. - // By the way, if we didn't care about the assert in the read unlock path, - // we could instead store the mode flag in write_downgrade's stack frame, - // and have the downgrade tokens store a reference to it. - read_mode: bool, - // The only way the count flag is ever accessed is with xadd. 
Since it is - // a read-modify-write operation, multiple xadds on different cores will - // always be consistent with respect to each other, so a monotonic/relaxed - // consistency ordering suffices (i.e., no extra barriers are needed). - // FIXME(#6598): The atomics module has no relaxed ordering flag, so I use - // acquire/release orderings superfluously. Change these someday. - read_count: atomics::AtomicUint, -} - -/** - * A blocking, no-starvation, reader-writer lock with an associated condvar. - * - * # Failure - * A task which fails while holding an rwlock will unlock the rwlock as it - * unwinds. - */ -pub struct RWLock { - priv order_lock: Semaphore, - priv access_lock: Sem >, - priv state: UnsafeArc, -} - -impl RWLock { - /// Create a new rwlock, with one associated condvar. - pub fn new() -> RWLock { RWLock::new_with_condvars(1) } - - /** - * Create a new rwlock, with a specified number of associated condvars. - * Similar to mutex_with_condvars. - */ - pub fn new_with_condvars(num_condvars: uint) -> RWLock { - let state = UnsafeArc::new(RWLockInner { - read_mode: false, - read_count: atomics::AtomicUint::new(0), - }); - RWLock { order_lock: Semaphore::new(1), - access_lock: Sem::new_and_signal(1, num_condvars), - state: state, } - } - - /// Create a new handle to the rwlock. - pub fn clone(&self) -> RWLock { - let Sem(ref access_lock_queue) = self.access_lock; - RWLock { order_lock: (&(self.order_lock)).clone(), - access_lock: Sem(access_lock_queue.clone()), - state: self.state.clone() } - } - - /** - * Run a function with the rwlock in read mode. Calls to 'read' from other - * tasks may run concurrently with this one. - */ - pub fn read(&self, blk: || -> U) -> U { - unsafe { - (&self.order_lock).access(|| { - let state = &mut *self.state.get(); - let old_count = state.read_count.fetch_add(1, atomics::Acquire); - if old_count == 0 { - (&self.access_lock).acquire(); - state.read_mode = true; - } - }); - (|| { - blk() - }).finally(|| { - let state = &mut *self.state.get(); - assert!(state.read_mode); - let old_count = state.read_count.fetch_sub(1, atomics::Release); - assert!(old_count > 0); - if old_count == 1 { - state.read_mode = false; - // Note: this release used to be outside of a locked access - // to exclusive-protected state. If this code is ever - // converted back to such (instead of using atomic ops), - // this access MUST NOT go inside the exclusive access. - (&self.access_lock).release(); - } - }) - } - } - - /** - * Run a function with the rwlock in write mode. No calls to 'read' or - * 'write' from other tasks will run concurrently with this one. - */ - pub fn write(&self, blk: || -> U) -> U { - (&self.order_lock).acquire(); - (&self.access_lock).access(|| { - (&self.order_lock).release(); - blk() - }) - } - - /** - * As write(), but also with a handle to a condvar. Waiting on this - * condvar will allow readers and writers alike to take the rwlock before - * the waiting task is signalled. (Note: a writer that waited and then - * was signalled might reacquire the lock before other waiting writers.) - */ - pub fn write_cond(&self, blk: |c: &Condvar| -> U) -> U { - // It's important to thread our order lock into the condvar, so that - // when a cond.wait() wakes up, it uses it while reacquiring the - // access lock. If we permitted a waking-up writer to "cut in line", - // there could arise a subtle race when a downgrader attempts to hand - // off the reader cloud lock to a waiting reader. 
This race is tested - // in arc.rs (test_rw_write_cond_downgrade_read_race) and looks like: - // T1 (writer) T2 (downgrader) T3 (reader) - // [in cond.wait()] - // [locks for writing] - // [holds access_lock] - // [is signalled, perhaps by - // downgrader or a 4th thread] - // tries to lock access(!) - // lock order_lock - // xadd read_count[0->1] - // tries to lock access - // [downgrade] - // xadd read_count[1->2] - // unlock access - // Since T1 contended on the access lock before T3 did, it will steal - // the lock handoff. Adding order_lock in the condvar reacquire path - // solves this because T1 will hold order_lock while waiting on access, - // which will cause T3 to have to wait until T1 finishes its write, - // which can't happen until T2 finishes the downgrade-read entirely. - // The astute reader will also note that making waking writers use the - // order_lock is better for not starving readers. - (&self.order_lock).acquire(); - (&self.access_lock).access_cond(|cond| { - (&self.order_lock).release(); - let opt_lock = Just(&self.order_lock); - blk(&Condvar { sem: cond.sem, order: opt_lock, - nopod: marker::NoPod }) - }) - } - - /** - * As write(), but with the ability to atomically 'downgrade' the lock; - * i.e., to become a reader without letting other writers get the lock in - * the meantime (such as unlocking and then re-locking as a reader would - * do). The block takes a "write mode token" argument, which can be - * transformed into a "read mode token" by calling downgrade(). Example: - * - * # Example - * - * ```rust - * use sync::RWLock; - * - * let lock = RWLock::new(); - * lock.write_downgrade(|mut write_token| { - * write_token.write_cond(|condvar| { - * // ... exclusive access ... - * }); - * let read_token = lock.downgrade(write_token); - * read_token.read(|| { - * // ... shared access ... - * }) - * }) - * ``` - */ - pub fn write_downgrade(&self, blk: |v: RWLockWriteMode| -> U) -> U { - // Implementation slightly different from the slicker 'write's above. - // The exit path is conditional on whether the caller downgrades. - (&self.order_lock).acquire(); - (&self.access_lock).acquire(); - (&self.order_lock).release(); - (|| { - blk(RWLockWriteMode { lock: self, nopod: marker::NoPod }) - }).finally(|| { - let writer_or_last_reader; - // Check if we're releasing from read mode or from write mode. - let state = unsafe { &mut *self.state.get() }; - if state.read_mode { - // Releasing from read mode. - let old_count = state.read_count.fetch_sub(1, atomics::Release); - assert!(old_count > 0); - // Check if other readers remain. - if old_count == 1 { - // Case 1: Writer downgraded & was the last reader - writer_or_last_reader = true; - state.read_mode = false; - } else { - // Case 2: Writer downgraded & was not the last reader - writer_or_last_reader = false; - } - } else { - // Case 3: Writer did not downgrade - writer_or_last_reader = true; - } - if writer_or_last_reader { - // Nobody left inside; release the "reader cloud" lock. - (&self.access_lock).release(); - } - }) - } - - /// To be called inside of the write_downgrade block. - pub fn downgrade<'a>(&self, token: RWLockWriteMode<'a>) - -> RWLockReadMode<'a> { - if !((self as *RWLock) == (token.lock as *RWLock)) { - fail!("Can't downgrade() with a different rwlock's write_mode!"); - } - unsafe { - let state = &mut *self.state.get(); - assert!(!state.read_mode); - state.read_mode = true; - // If a reader attempts to enter at this point, both the - // downgrader and reader will set the mode flag. This is fine. 
- let old_count = state.read_count.fetch_add(1, atomics::Release); - // If another reader was already blocking, we need to hand-off - // the "reader cloud" access lock to them. - if old_count != 0 { - // Guaranteed not to let another writer in, because - // another reader was holding the order_lock. Hence they - // must be the one to get the access_lock (because all - // access_locks are acquired with order_lock held). See - // the comment in write_cond for more justification. - (&self.access_lock).release(); - } - } - RWLockReadMode { lock: token.lock, nopod: marker::NoPod } - } -} - -/// The "write permission" token used for rwlock.write_downgrade(). - -pub struct RWLockWriteMode<'a> { priv lock: &'a RWLock, priv nopod: marker::NoPod } -/// The "read permission" token used for rwlock.write_downgrade(). -pub struct RWLockReadMode<'a> { priv lock: &'a RWLock, - priv nopod: marker::NoPod } - -impl<'a> RWLockWriteMode<'a> { - /// Access the pre-downgrade rwlock in write mode. - pub fn write(&self, blk: || -> U) -> U { blk() } - /// Access the pre-downgrade rwlock in write mode with a condvar. - pub fn write_cond(&self, blk: |c: &Condvar| -> U) -> U { - // Need to make the condvar use the order lock when reacquiring the - // access lock. See comment in RWLock::write_cond for why. - blk(&Condvar { sem: &self.lock.access_lock, - order: Just(&self.lock.order_lock), - nopod: marker::NoPod }) - } -} - -impl<'a> RWLockReadMode<'a> { - /// Access the post-downgrade rwlock in read mode. - pub fn read(&self, blk: || -> U) -> U { blk() } -} - -/// A barrier enables multiple tasks to synchronize the beginning -/// of some computation. -/// -/// ```rust -/// use sync::Barrier; -/// -/// let barrier = Barrier::new(10); -/// for _ in range(0, 10) { -/// let c = barrier.clone(); -/// // The same messages will be printed together. -/// // You will NOT see any interleaving. -/// spawn(proc() { -/// println!("before wait"); -/// c.wait(); -/// println!("after wait"); -/// }); -/// } -/// ``` -#[deriving(Clone)] -pub struct Barrier { - priv arc: MutexArc, - priv num_tasks: uint, -} - -// The inner state of a double barrier -struct BarrierState { - count: uint, - generation_id: uint, -} - -impl Barrier { - /// Create a new barrier that can block a given number of tasks. - pub fn new(num_tasks: uint) -> Barrier { - Barrier { - arc: MutexArc::new(BarrierState { - count: 0, - generation_id: 0, - }), - num_tasks: num_tasks, - } - } - - /// Block the current task until a certain number of tasks is waiting. - pub fn wait(&self) { - self.arc.access_cond(|state, cond| { - let local_gen = state.generation_id; - state.count += 1; - if state.count < self.num_tasks { - // We need a while loop to guard against spurious wakeups. 
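The barrier above is a counter plus a generation id, and the generation is what makes the spurious-wakeup guard work: a woken task only proceeds once the generation has advanced. A self-contained sketch in present-day Rust, with invented names and plain std types in place of `MutexArc`:

```rust
use std::sync::{Condvar, Mutex};

struct SketchBarrier {
    state: Mutex<BarrierState>,
    cvar: Condvar,
    num_threads: usize,
}

struct BarrierState {
    count: usize,
    generation_id: usize,
}

impl SketchBarrier {
    fn new(num_threads: usize) -> SketchBarrier {
        SketchBarrier {
            state: Mutex::new(BarrierState { count: 0, generation_id: 0 }),
            cvar: Condvar::new(),
            num_threads,
        }
    }

    fn wait(&self) {
        let mut state = self.state.lock().unwrap();
        let local_gen = state.generation_id;
        state.count += 1;
        if state.count < self.num_threads {
            // Loop to tolerate spurious wake-ups: only a generation change,
            // made by the last arriving thread, lets a waiter through.
            while local_gen == state.generation_id && state.count < self.num_threads {
                state = self.cvar.wait(state).unwrap();
            }
        } else {
            // The last thread to arrive resets the barrier for the next round
            // and wakes everybody.
            state.count = 0;
            state.generation_id += 1;
            self.cvar.notify_all();
        }
    }
}
```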
- // http://en.wikipedia.org/wiki/Spurious_wakeup - while local_gen == state.generation_id && state.count < self.num_tasks { - cond.wait(); - } - } else { - state.count = 0; - state.generation_id += 1; - cond.broadcast(); - } - }); - } -} - -/**************************************************************************** - * Tests - ****************************************************************************/ - -#[cfg(test)] -mod tests { - use sync::{Semaphore, Mutex, RWLock, Barrier, Condvar}; - - use std::cast; - use std::result; - use std::task; - use std::comm::Empty; - - /************************************************************************ - * Semaphore tests - ************************************************************************/ - #[test] - fn test_sem_acquire_release() { - let s = Semaphore::new(1); - s.acquire(); - s.release(); - s.acquire(); - } - #[test] - fn test_sem_basic() { - let s = Semaphore::new(1); - s.access(|| { }) - } - #[test] - fn test_sem_as_mutex() { - let s = Semaphore::new(1); - let s2 = s.clone(); - task::spawn(proc() { - s2.access(|| { - for _ in range(0, 5) { task::deschedule(); } - }) - }); - s.access(|| { - for _ in range(0, 5) { task::deschedule(); } - }) - } - #[test] - fn test_sem_as_cvar() { - /* Child waits and parent signals */ - let (tx, rx) = channel(); - let s = Semaphore::new(0); - let s2 = s.clone(); - task::spawn(proc() { - s2.acquire(); - tx.send(()); - }); - for _ in range(0, 5) { task::deschedule(); } - s.release(); - let _ = rx.recv(); - - /* Parent waits and child signals */ - let (tx, rx) = channel(); - let s = Semaphore::new(0); - let s2 = s.clone(); - task::spawn(proc() { - for _ in range(0, 5) { task::deschedule(); } - s2.release(); - let _ = rx.recv(); - }); - s.acquire(); - tx.send(()); - } - #[test] - fn test_sem_multi_resource() { - // Parent and child both get in the critical section at the same - // time, and shake hands. - let s = Semaphore::new(2); - let s2 = s.clone(); - let (tx1, rx1) = channel(); - let (tx2, rx2) = channel(); - task::spawn(proc() { - s2.access(|| { - let _ = rx2.recv(); - tx1.send(()); - }) - }); - s.access(|| { - tx2.send(()); - let _ = rx1.recv(); - }) - } - #[test] - fn test_sem_runtime_friendly_blocking() { - // Force the runtime to schedule two threads on the same sched_loop. - // When one blocks, it should schedule the other one. - let s = Semaphore::new(1); - let s2 = s.clone(); - let (tx, rx) = channel(); - let mut child_data = Some((s2, tx)); - s.access(|| { - let (s2, tx) = child_data.take_unwrap(); - task::spawn(proc() { - tx.send(()); - s2.access(|| { }); - tx.send(()); - }); - let _ = rx.recv(); // wait for child to come alive - for _ in range(0, 5) { task::deschedule(); } // let the child contend - }); - let _ = rx.recv(); // wait for child to be done - } - /************************************************************************ - * Mutex tests - ************************************************************************/ - #[test] - fn test_mutex_lock() { - // Unsafely achieve shared state, and do the textbook - // "load tmp = move ptr; inc tmp; store ptr <- tmp" dance. 
- let (tx, rx) = channel(); - let m = Mutex::new(); - let m2 = m.clone(); - let mut sharedstate = ~0; - { - let ptr: *int = &*sharedstate; - task::spawn(proc() { - let sharedstate: &mut int = - unsafe { cast::transmute(ptr) }; - access_shared(sharedstate, &m2, 10); - tx.send(()); - }); - } - { - access_shared(sharedstate, &m, 10); - let _ = rx.recv(); - - assert_eq!(*sharedstate, 20); - } - - fn access_shared(sharedstate: &mut int, m: &Mutex, n: uint) { - for _ in range(0, n) { - m.lock(|| { - let oldval = *sharedstate; - task::deschedule(); - *sharedstate = oldval + 1; - }) - } - } - } - #[test] - fn test_mutex_cond_wait() { - let m = Mutex::new(); - - // Child wakes up parent - m.lock_cond(|cond| { - let m2 = m.clone(); - task::spawn(proc() { - m2.lock_cond(|cond| { - let woken = cond.signal(); - assert!(woken); - }) - }); - cond.wait(); - }); - // Parent wakes up child - let (tx, rx) = channel(); - let m3 = m.clone(); - task::spawn(proc() { - m3.lock_cond(|cond| { - tx.send(()); - cond.wait(); - tx.send(()); - }) - }); - let _ = rx.recv(); // Wait until child gets in the mutex - m.lock_cond(|cond| { - let woken = cond.signal(); - assert!(woken); - }); - let _ = rx.recv(); // Wait until child wakes up - } - #[cfg(test)] - fn test_mutex_cond_broadcast_helper(num_waiters: uint) { - let m = Mutex::new(); - let mut rxs = vec!(); - - for _ in range(0, num_waiters) { - let mi = m.clone(); - let (tx, rx) = channel(); - rxs.push(rx); - task::spawn(proc() { - mi.lock_cond(|cond| { - tx.send(()); - cond.wait(); - tx.send(()); - }) - }); - } - - // wait until all children get in the mutex - for rx in rxs.mut_iter() { let _ = rx.recv(); } - m.lock_cond(|cond| { - let num_woken = cond.broadcast(); - assert_eq!(num_woken, num_waiters); - }); - // wait until all children wake up - for rx in rxs.mut_iter() { let _ = rx.recv(); } - } - #[test] - fn test_mutex_cond_broadcast() { - test_mutex_cond_broadcast_helper(12); - } - #[test] - fn test_mutex_cond_broadcast_none() { - test_mutex_cond_broadcast_helper(0); - } - #[test] - fn test_mutex_cond_no_waiter() { - let m = Mutex::new(); - let m2 = m.clone(); - let _ = task::try(proc() { - m.lock_cond(|_x| { }) - }); - m2.lock_cond(|cond| { - assert!(!cond.signal()); - }) - } - #[test] - fn test_mutex_killed_simple() { - use std::any::Any; - - // Mutex must get automatically unlocked if failed/killed within. - let m = Mutex::new(); - let m2 = m.clone(); - - let result: result::Result<(), ~Any> = task::try(proc() { - m2.lock(|| { - fail!(); - }) - }); - assert!(result.is_err()); - // child task must have finished by the time try returns - m.lock(|| { }) - } - #[test] - fn test_mutex_cond_signal_on_0() { - // Tests that signal_on(0) is equivalent to signal(). 
- let m = Mutex::new(); - m.lock_cond(|cond| { - let m2 = m.clone(); - task::spawn(proc() { - m2.lock_cond(|cond| { - cond.signal_on(0); - }) - }); - cond.wait(); - }) - } - #[test] - fn test_mutex_no_condvars() { - let result = task::try(proc() { - let m = Mutex::new_with_condvars(0); - m.lock_cond(|cond| { cond.wait(); }) - }); - assert!(result.is_err()); - let result = task::try(proc() { - let m = Mutex::new_with_condvars(0); - m.lock_cond(|cond| { cond.signal(); }) - }); - assert!(result.is_err()); - let result = task::try(proc() { - let m = Mutex::new_with_condvars(0); - m.lock_cond(|cond| { cond.broadcast(); }) - }); - assert!(result.is_err()); - } - /************************************************************************ - * Reader/writer lock tests - ************************************************************************/ - #[cfg(test)] - pub enum RWLockMode { Read, Write, Downgrade, DowngradeRead } - #[cfg(test)] - fn lock_rwlock_in_mode(x: &RWLock, mode: RWLockMode, blk: ||) { - match mode { - Read => x.read(blk), - Write => x.write(blk), - Downgrade => - x.write_downgrade(|mode| { - mode.write(|| { blk() }); - }), - DowngradeRead => - x.write_downgrade(|mode| { - let mode = x.downgrade(mode); - mode.read(|| { blk() }); - }), - } - } - #[cfg(test)] - fn test_rwlock_exclusion(x: &RWLock, - mode1: RWLockMode, - mode2: RWLockMode) { - // Test mutual exclusion between readers and writers. Just like the - // mutex mutual exclusion test, a ways above. - let (tx, rx) = channel(); - let x2 = x.clone(); - let mut sharedstate = ~0; - { - let ptr: *int = &*sharedstate; - task::spawn(proc() { - let sharedstate: &mut int = - unsafe { cast::transmute(ptr) }; - access_shared(sharedstate, &x2, mode1, 10); - tx.send(()); - }); - } - { - access_shared(sharedstate, x, mode2, 10); - let _ = rx.recv(); - - assert_eq!(*sharedstate, 20); - } - - fn access_shared(sharedstate: &mut int, x: &RWLock, mode: RWLockMode, - n: uint) { - for _ in range(0, n) { - lock_rwlock_in_mode(x, mode, || { - let oldval = *sharedstate; - task::deschedule(); - *sharedstate = oldval + 1; - }) - } - } - } - #[test] - fn test_rwlock_readers_wont_modify_the_data() { - test_rwlock_exclusion(&RWLock::new(), Read, Write); - test_rwlock_exclusion(&RWLock::new(), Write, Read); - test_rwlock_exclusion(&RWLock::new(), Read, Downgrade); - test_rwlock_exclusion(&RWLock::new(), Downgrade, Read); - } - #[test] - fn test_rwlock_writers_and_writers() { - test_rwlock_exclusion(&RWLock::new(), Write, Write); - test_rwlock_exclusion(&RWLock::new(), Write, Downgrade); - test_rwlock_exclusion(&RWLock::new(), Downgrade, Write); - test_rwlock_exclusion(&RWLock::new(), Downgrade, Downgrade); - } - #[cfg(test)] - fn test_rwlock_handshake(x: &RWLock, - mode1: RWLockMode, - mode2: RWLockMode, - make_mode2_go_first: bool) { - // Much like sem_multi_resource. - let x2 = x.clone(); - let (tx1, rx1) = channel(); - let (tx2, rx2) = channel(); - task::spawn(proc() { - if !make_mode2_go_first { - let _ = rx2.recv(); // parent sends to us once it locks, or ... - } - lock_rwlock_in_mode(&x2, mode2, || { - if make_mode2_go_first { - tx1.send(()); // ... we send to it once we lock - } - let _ = rx2.recv(); - tx1.send(()); - }) - }); - if make_mode2_go_first { - let _ = rx1.recv(); // child sends to us once it locks, or ... - } - lock_rwlock_in_mode(x, mode1, || { - if !make_mode2_go_first { - tx2.send(()); // ... 
we send to it once we lock - } - tx2.send(()); - let _ = rx1.recv(); - }) - } - #[test] - fn test_rwlock_readers_and_readers() { - test_rwlock_handshake(&RWLock::new(), Read, Read, false); - // The downgrader needs to get in before the reader gets in, otherwise - // they cannot end up reading at the same time. - test_rwlock_handshake(&RWLock::new(), DowngradeRead, Read, false); - test_rwlock_handshake(&RWLock::new(), Read, DowngradeRead, true); - // Two downgrade_reads can never both end up reading at the same time. - } - #[test] - fn test_rwlock_downgrade_unlock() { - // Tests that downgrade can unlock the lock in both modes - let x = RWLock::new(); - lock_rwlock_in_mode(&x, Downgrade, || { }); - test_rwlock_handshake(&x, Read, Read, false); - let y = RWLock::new(); - lock_rwlock_in_mode(&y, DowngradeRead, || { }); - test_rwlock_exclusion(&y, Write, Write); - } - #[test] - fn test_rwlock_read_recursive() { - let x = RWLock::new(); - x.read(|| { x.read(|| { }) }) - } - #[test] - fn test_rwlock_cond_wait() { - // As test_mutex_cond_wait above. - let x = RWLock::new(); - - // Child wakes up parent - x.write_cond(|cond| { - let x2 = x.clone(); - task::spawn(proc() { - x2.write_cond(|cond| { - let woken = cond.signal(); - assert!(woken); - }) - }); - cond.wait(); - }); - // Parent wakes up child - let (tx, rx) = channel(); - let x3 = x.clone(); - task::spawn(proc() { - x3.write_cond(|cond| { - tx.send(()); - cond.wait(); - tx.send(()); - }) - }); - let _ = rx.recv(); // Wait until child gets in the rwlock - x.read(|| { }); // Must be able to get in as a reader in the meantime - x.write_cond(|cond| { // Or as another writer - let woken = cond.signal(); - assert!(woken); - }); - let _ = rx.recv(); // Wait until child wakes up - x.read(|| { }); // Just for good measure - } - #[cfg(test)] - fn test_rwlock_cond_broadcast_helper(num_waiters: uint, - dg1: bool, - dg2: bool) { - // Much like the mutex broadcast test. Downgrade-enabled. - fn lock_cond(x: &RWLock, downgrade: bool, blk: |c: &Condvar|) { - if downgrade { - x.write_downgrade(|mode| { - mode.write_cond(|c| { blk(c) }); - }); - } else { - x.write_cond(|c| { blk(c) }); - } - } - let x = RWLock::new(); - let mut rxs = vec!(); - - for _ in range(0, num_waiters) { - let xi = x.clone(); - let (tx, rx) = channel(); - rxs.push(rx); - task::spawn(proc() { - lock_cond(&xi, dg1, |cond| { - tx.send(()); - cond.wait(); - tx.send(()); - }) - }); - } - - // wait until all children get in the mutex - for rx in rxs.mut_iter() { let _ = rx.recv(); } - lock_cond(&x, dg2, |cond| { - let num_woken = cond.broadcast(); - assert_eq!(num_woken, num_waiters); - }); - // wait until all children wake up - for rx in rxs.mut_iter() { let _ = rx.recv(); } - } - #[test] - fn test_rwlock_cond_broadcast() { - test_rwlock_cond_broadcast_helper(0, true, true); - test_rwlock_cond_broadcast_helper(0, true, false); - test_rwlock_cond_broadcast_helper(0, false, true); - test_rwlock_cond_broadcast_helper(0, false, false); - test_rwlock_cond_broadcast_helper(12, true, true); - test_rwlock_cond_broadcast_helper(12, true, false); - test_rwlock_cond_broadcast_helper(12, false, true); - test_rwlock_cond_broadcast_helper(12, false, false); - } - #[cfg(test)] - fn rwlock_kill_helper(mode1: RWLockMode, mode2: RWLockMode) { - use std::any::Any; - - // Mutex must get automatically unlocked if failed/killed within. 
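The handshake tests above prove that two readers really do hold the lock at the same time. The same overlap can be shown as a sketch with the current std::sync::RwLock and a pair of mpsc channels: the rendezvous can only complete while both threads are inside read().

```rust
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::thread;

fn main() {
    let lock = Arc::new(RwLock::new(0u32));
    let lock2 = Arc::clone(&lock);
    let (tx1, rx1) = channel();
    let (tx2, rx2) = channel();

    let child = thread::spawn(move || {
        let _reading = lock2.read().unwrap(); // child holds a read guard
        tx1.send(()).unwrap();                // tell the parent we are inside
        rx2.recv().unwrap();                  // stay inside until it answers
    });

    let _reading = lock.read().unwrap();      // parent reads at the same time
    rx1.recv().unwrap();                      // child is inside its read()...
    tx2.send(()).unwrap();                    // ...while we are inside ours
    child.join().unwrap();
}
```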
- let x = RWLock::new(); - let x2 = x.clone(); - - let result: result::Result<(), ~Any> = task::try(proc() { - lock_rwlock_in_mode(&x2, mode1, || { - fail!(); - }) - }); - assert!(result.is_err()); - // child task must have finished by the time try returns - lock_rwlock_in_mode(&x, mode2, || { }) - } - #[test] - fn test_rwlock_reader_killed_writer() { - rwlock_kill_helper(Read, Write); - } - #[test] - fn test_rwlock_writer_killed_reader() { - rwlock_kill_helper(Write, Read); - } - #[test] - fn test_rwlock_reader_killed_reader() { - rwlock_kill_helper(Read, Read); - } - #[test] - fn test_rwlock_writer_killed_writer() { - rwlock_kill_helper(Write, Write); - } - #[test] - fn test_rwlock_kill_downgrader() { - rwlock_kill_helper(Downgrade, Read); - rwlock_kill_helper(Read, Downgrade); - rwlock_kill_helper(Downgrade, Write); - rwlock_kill_helper(Write, Downgrade); - rwlock_kill_helper(DowngradeRead, Read); - rwlock_kill_helper(Read, DowngradeRead); - rwlock_kill_helper(DowngradeRead, Write); - rwlock_kill_helper(Write, DowngradeRead); - rwlock_kill_helper(DowngradeRead, Downgrade); - rwlock_kill_helper(DowngradeRead, Downgrade); - rwlock_kill_helper(Downgrade, DowngradeRead); - rwlock_kill_helper(Downgrade, DowngradeRead); - } - #[test] #[should_fail] - fn test_rwlock_downgrade_cant_swap() { - // Tests that you can't downgrade with a different rwlock's token. - let x = RWLock::new(); - let y = RWLock::new(); - x.write_downgrade(|xwrite| { - let mut xopt = Some(xwrite); - y.write_downgrade(|_ywrite| { - y.downgrade(xopt.take_unwrap()); - error!("oops, y.downgrade(x) should have failed!"); - }) - }) - } - - /************************************************************************ - * Barrier tests - ************************************************************************/ - #[test] - fn test_barrier() { - let barrier = Barrier::new(10); - let (tx, rx) = channel(); - - for _ in range(0, 9) { - let c = barrier.clone(); - let tx = tx.clone(); - spawn(proc() { - c.wait(); - tx.send(true); - }); - } - - // At this point, all spawned tasks should be blocked, - // so we shouldn't get anything from the port - assert!(match rx.try_recv() { - Empty => true, - _ => false, - }); - - barrier.wait(); - // Now, the barrier is cleared and we should get data. - for _ in range(0, 9) { - rx.recv(); - } - } -} -- cgit 1.4.1-3-g733a5 From 4d5aafd3a6a25967a4b0d9326bbbbc1840dbaeeb Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Sat, 22 Mar 2014 00:52:33 -0700 Subject: sync: Introduce new wrapper types for locking This introduces new synchronization types which are meant to be the foundational building blocks for sharing data among tasks. The new Mutex and RWLock types have a type parameter which is the internal data that is accessed. Access to the data is all performed through the guards returned, and the guards all have autoderef implemented for easy access. --- src/libsync/lock.rs | 816 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 816 insertions(+) create mode 100644 src/libsync/lock.rs (limited to 'src/libsync') diff --git a/src/libsync/lock.rs b/src/libsync/lock.rs new file mode 100644 index 00000000000..6ddd0d400f2 --- /dev/null +++ b/src/libsync/lock.rs @@ -0,0 +1,816 @@ +// Copyright 2012-2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. 
This file may not be copied, modified, or distributed +// except according to those terms. + +//! Wrappers for safe, shared, mutable memory between tasks +//! +//! The wrappers in this module build on the primitives from `sync::raw` to +//! provide safe interfaces around using the primitive locks. These primitives +//! implement a technique called "poisoning" where when a task failed with a +//! held lock, all future attempts to use the lock will fail. +//! +//! For example, if two tasks are contending on a mutex and one of them fails +//! after grabbing the lock, the second task will immediately fail because the +//! lock is now poisoned. + +use std::task; +use std::ty::Unsafe; + +use raw; + +/**************************************************************************** + * Poisoning helpers + ****************************************************************************/ + +struct PoisonOnFail<'a> { + flag: &'a mut bool, + failed: bool, +} + +impl<'a> PoisonOnFail<'a> { + fn check(flag: bool, name: &str) { + if flag { + fail!("Poisoned {} - another task failed inside!", name); + } + } + + fn new<'a>(flag: &'a mut bool, name: &str) -> PoisonOnFail<'a> { + PoisonOnFail::check(*flag, name); + PoisonOnFail { + flag: flag, + failed: task::failing() + } + } +} + +#[unsafe_destructor] +impl<'a> Drop for PoisonOnFail<'a> { + fn drop(&mut self) { + if !self.failed && task::failing() { + *self.flag = true; + } + } +} + +/**************************************************************************** + * Condvar + ****************************************************************************/ + +enum Inner<'a> { + InnerMutex(raw::MutexGuard<'a>), + InnerRWLock(raw::RWLockWriteGuard<'a>), +} + +impl<'b> Inner<'b> { + fn cond<'a>(&'a self) -> &'a raw::Condvar<'b> { + match *self { + InnerMutex(ref m) => &m.cond, + InnerRWLock(ref m) => &m.cond, + } + } +} + +/// A condition variable, a mechanism for unlock-and-descheduling and +/// signaling, for use with the lock types. +pub struct Condvar<'a> { + priv name: &'static str, + // n.b. Inner must be after PoisonOnFail because we must set the poison flag + // *inside* the mutex, and struct fields are destroyed top-to-bottom + // (destroy the lock guard last). + priv poison: PoisonOnFail<'a>, + priv inner: Inner<'a>, +} + +impl<'a> Condvar<'a> { + /// Atomically exit the associated lock and block until a signal is sent. + /// + /// wait() is equivalent to wait_on(0). + /// + /// # Failure + /// + /// A task which is killed while waiting on a condition variable will wake + /// up, fail, and unlock the associated lock as it unwinds. + #[inline] + pub fn wait(&self) { self.wait_on(0) } + + /// Atomically exit the associated lock and block on a specified condvar + /// until a signal is sent on that same condvar. + /// + /// The associated lock must have been initialised with an appropriate + /// number of condvars. The condvar_id must be between 0 and num_condvars-1 + /// or else this call will fail. + #[inline] + pub fn wait_on(&self, condvar_id: uint) { + assert!(!*self.poison.flag); + self.inner.cond().wait_on(condvar_id); + // This is why we need to wrap sync::condvar. + PoisonOnFail::check(*self.poison.flag, self.name); + } + + /// Wake up a blocked task. Returns false if there was no blocked task. + #[inline] + pub fn signal(&self) -> bool { self.signal_on(0) } + + /// Wake up a blocked task on a specified condvar (as + /// sync::cond.signal_on). Returns false if there was no blocked task. 
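Poisoning, as described in the module docs above, still exists in today's standard library in a slightly different form: rather than failing on the next access, std::sync::Mutex::lock returns Err(PoisonError) once some thread has panicked while holding the guard. A minimal sketch of that behaviour, assuming the modern API:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let lock = Arc::new(Mutex::new(1));
    let lock2 = Arc::clone(&lock);

    // Panic while holding the guard; the unwind marks the mutex as poisoned.
    let _ = thread::spawn(move || {
        let _guard = lock2.lock().unwrap();
        panic!("boom");
    })
    .join();

    // Later users see the poison instead of silently reading torn state.
    assert!(lock.lock().is_err());

    // into_inner() is the escape hatch for callers that decide the poisoned
    // data is still usable.
    let value = *lock.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
    assert_eq!(value, 1);
}
```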
+ #[inline] + pub fn signal_on(&self, condvar_id: uint) -> bool { + assert!(!*self.poison.flag); + self.inner.cond().signal_on(condvar_id) + } + + /// Wake up all blocked tasks. Returns the number of tasks woken. + #[inline] + pub fn broadcast(&self) -> uint { self.broadcast_on(0) } + + /// Wake up all blocked tasks on a specified condvar (as + /// sync::cond.broadcast_on). Returns the number of tasks woken. + #[inline] + pub fn broadcast_on(&self, condvar_id: uint) -> uint { + assert!(!*self.poison.flag); + self.inner.cond().broadcast_on(condvar_id) + } +} + +/**************************************************************************** + * Mutex + ****************************************************************************/ + +/// A wrapper type which provides synchronized access to the underlying data, of +/// type `T`. A mutex always provides exclusive access, and concurrent requests +/// will block while the mutex is already locked. +/// +/// # Example +/// +/// ``` +/// use sync::{Mutex, Arc}; +/// +/// let mutex = Arc::new(Mutex::new(1)); +/// let mutex2 = mutex.clone(); +/// +/// spawn(proc() { +/// let mut val = mutex2.lock(); +/// *val += 1; +/// val.cond.signal(); +/// }); +/// +/// let mut value = mutex.lock(); +/// while *value != 2 { +/// value.cond.wait(); +/// } +/// ``` +pub struct Mutex { + priv lock: raw::Mutex, + priv failed: Unsafe, + priv data: Unsafe, +} + +/// An guard which is created by locking a mutex. Through this guard the +/// underlying data can be accessed. +pub struct MutexGuard<'a, T> { + priv data: &'a mut T, + /// Inner condition variable connected to the locked mutex that this guard + /// was created from. This can be used for atomic-unlock-and-deschedule. + cond: Condvar<'a>, +} + +impl Mutex { + /// Creates a new mutex to protect the user-supplied data. + pub fn new(user_data: T) -> Mutex { + Mutex::new_with_condvars(user_data, 1) + } + + /// Create a new mutex, with a specified number of associated condvars. + /// + /// This will allow calling wait_on/signal_on/broadcast_on with condvar IDs + /// between 0 and num_condvars-1. (If num_condvars is 0, lock_cond will be + /// allowed but any operations on the condvar will fail.) + pub fn new_with_condvars(user_data: T, num_condvars: uint) -> Mutex { + Mutex { + lock: raw::Mutex::new_with_condvars(num_condvars), + failed: Unsafe::new(false), + data: Unsafe::new(user_data), + } + } + + /// Access the underlying mutable data with mutual exclusion from other + /// tasks. The returned value is an RAII guard which will unlock the mutex + /// when dropped. All concurrent tasks attempting to lock the mutex will + /// block while the returned value is still alive. + /// + /// # Failure + /// + /// Failing while inside the Mutex will unlock the Mutex while unwinding, so + /// that other tasks won't block forever. It will also poison the Mutex: + /// any tasks that subsequently try to access it (including those already + /// blocked on the mutex) will also fail immediately. + #[inline] + pub fn lock<'a>(&'a self) -> MutexGuard<'a, T> { + let guard = self.lock.lock(); + + // These two accesses are safe because we're guranteed at this point + // that we have exclusive access to this mutex. 
We are indeed able to + // promote ourselves from &Mutex to `&mut T` + let poison = unsafe { &mut *self.failed.get() }; + let data = unsafe { &mut *self.data.get() }; + + MutexGuard { + data: data, + cond: Condvar { + name: "Mutex", + poison: PoisonOnFail::new(poison, "Mutex"), + inner: InnerMutex(guard), + }, + } + } +} + +// FIXME(#13042): these should both have T: Send +impl<'a, T> Deref for MutexGuard<'a, T> { + fn deref<'a>(&'a self) -> &'a T { &*self.data } +} +impl<'a, T> DerefMut for MutexGuard<'a, T> { + fn deref_mut<'a>(&'a mut self) -> &'a mut T { &mut *self.data } +} + +/**************************************************************************** + * R/W lock protected lock + ****************************************************************************/ + +/// A dual-mode reader-writer lock. The data can be accessed mutably or +/// immutably, and immutably-accessing tasks may run concurrently. +/// +/// # Example +/// +/// ``` +/// use sync::{RWLock, Arc}; +/// +/// let lock1 = Arc::new(RWLock::new(1)); +/// let lock2 = lock1.clone(); +/// +/// spawn(proc() { +/// let mut val = lock2.write(); +/// *val = 3; +/// let val = val.downgrade(); +/// println!("{}", *val); +/// }); +/// +/// let val = lock1.read(); +/// println!("{}", *val); +/// ``` +pub struct RWLock { + priv lock: raw::RWLock, + priv failed: Unsafe, + priv data: Unsafe, +} + +/// A guard which is created by locking an rwlock in write mode. Through this +/// guard the underlying data can be accessed. +pub struct RWLockWriteGuard<'a, T> { + priv data: &'a mut T, + /// Inner condition variable that can be used to sleep on the write mode of + /// this rwlock. + cond: Condvar<'a>, +} + +/// A guard which is created by locking an rwlock in read mode. Through this +/// guard the underlying data can be accessed. +pub struct RWLockReadGuard<'a, T> { + priv data: &'a T, + priv guard: raw::RWLockReadGuard<'a>, +} + +impl RWLock { + /// Create a reader/writer lock with the supplied data. + pub fn new(user_data: T) -> RWLock { + RWLock::new_with_condvars(user_data, 1) + } + + /// Create a reader/writer lock with the supplied data and a specified number + /// of condvars (as sync::RWLock::new_with_condvars). + pub fn new_with_condvars(user_data: T, num_condvars: uint) -> RWLock { + RWLock { + lock: raw::RWLock::new_with_condvars(num_condvars), + failed: Unsafe::new(false), + data: Unsafe::new(user_data), + } + } + + /// Access the underlying data mutably. Locks the rwlock in write mode; + /// other readers and writers will block. + /// + /// # Failure + /// + /// Failing while inside the lock will unlock the lock while unwinding, so + /// that other tasks won't block forever. As Mutex.lock, it will also poison + /// the lock, so subsequent readers and writers will both also fail. + #[inline] + pub fn write<'a>(&'a self) -> RWLockWriteGuard<'a, T> { + let guard = self.lock.write(); + + // These two accesses are safe because we're guranteed at this point + // that we have exclusive access to this rwlock. We are indeed able to + // promote ourselves from &RWLock to `&mut T` + let poison = unsafe { &mut *self.failed.get() }; + let data = unsafe { &mut *self.data.get() }; + + RWLockWriteGuard { + data: data, + cond: Condvar { + name: "RWLock", + poison: PoisonOnFail::new(poison, "RWLock"), + inner: InnerRWLock(guard), + }, + } + } + + /// Access the underlying data immutably. May run concurrently with other + /// reading tasks. + /// + /// # Failure + /// + /// Failing will unlock the lock while unwinding. 
However, unlike all other + /// access modes, this will not poison the lock. + pub fn read<'a>(&'a self) -> RWLockReadGuard<'a, T> { + let guard = self.lock.read(); + PoisonOnFail::check(unsafe { *self.failed.get() }, "RWLock"); + RWLockReadGuard { + guard: guard, + data: unsafe { &*self.data.get() }, + } + } +} + +impl<'a, T: Send + Share> RWLockWriteGuard<'a, T> { + /// Consumes this write lock token, returning a new read lock token. + /// + /// This will allow pending readers to come into the lock. + pub fn downgrade(self) -> RWLockReadGuard<'a, T> { + let RWLockWriteGuard { data, cond } = self; + // convert the data to read-only explicitly + let data = &*data; + let guard = match cond.inner { + InnerMutex(..) => unreachable!(), + InnerRWLock(guard) => guard.downgrade() + }; + RWLockReadGuard { guard: guard, data: data } + } +} + +// FIXME(#13042): these should all have T: Send + Share +impl<'a, T> Deref for RWLockReadGuard<'a, T> { + fn deref<'a>(&'a self) -> &'a T { self.data } +} +impl<'a, T> Deref for RWLockWriteGuard<'a, T> { + fn deref<'a>(&'a self) -> &'a T { &*self.data } +} +impl<'a, T> DerefMut for RWLockWriteGuard<'a, T> { + fn deref_mut<'a>(&'a mut self) -> &'a mut T { &mut *self.data } +} + +/**************************************************************************** + * Barrier + ****************************************************************************/ + +/// A barrier enables multiple tasks to synchronize the beginning +/// of some computation. +/// +/// ```rust +/// use sync::{Arc, Barrier}; +/// +/// let barrier = Arc::new(Barrier::new(10)); +/// for _ in range(0, 10) { +/// let c = barrier.clone(); +/// // The same messages will be printed together. +/// // You will NOT see any interleaving. +/// spawn(proc() { +/// println!("before wait"); +/// c.wait(); +/// println!("after wait"); +/// }); +/// } +/// ``` +pub struct Barrier { + priv lock: Mutex, + priv num_tasks: uint, +} + +// The inner state of a double barrier +struct BarrierState { + count: uint, + generation_id: uint, +} + +impl Barrier { + /// Create a new barrier that can block a given number of tasks. + pub fn new(num_tasks: uint) -> Barrier { + Barrier { + lock: Mutex::new(BarrierState { + count: 0, + generation_id: 0, + }), + num_tasks: num_tasks, + } + } + + /// Block the current task until a certain number of tasks is waiting. + pub fn wait(&self) { + let mut lock = self.lock.lock(); + let local_gen = lock.generation_id; + lock.count += 1; + if lock.count < self.num_tasks { + // We need a while loop to guard against spurious wakeups. 
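The loop below re-checks both the generation id and the count, which is what makes the barrier robust against spurious wakeups. For reference, a compact sketch of the same generation-counter scheme, assuming today's std::sync::{Mutex, Condvar}; the SimpleBarrier name is illustrative, not part of this patch, and Condvar::wait_while stands in for the manual while loop:

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

struct State {
    count: usize,
    generation: usize,
}

struct SimpleBarrier {
    state: Mutex<State>,
    cvar: Condvar,
    num_threads: usize,
}

impl SimpleBarrier {
    fn new(num_threads: usize) -> SimpleBarrier {
        SimpleBarrier {
            state: Mutex::new(State { count: 0, generation: 0 }),
            cvar: Condvar::new(),
            num_threads,
        }
    }

    fn wait(&self) {
        let mut state = self.state.lock().unwrap();
        let local_gen = state.generation;
        state.count += 1;
        if state.count < self.num_threads {
            // Re-check generation and count on every wakeup: this absorbs
            // spurious wakeups and wakeups that belong to a later round.
            let _guard = self
                .cvar
                .wait_while(state, |s| {
                    s.generation == local_gen && s.count < self.num_threads
                })
                .unwrap();
        } else {
            // Last arrival: reset for the next round and release everyone.
            state.count = 0;
            state.generation += 1;
            self.cvar.notify_all();
        }
    }
}

fn main() {
    let barrier = Arc::new(SimpleBarrier::new(4));
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let b = Arc::clone(&barrier);
            thread::spawn(move || b.wait())
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
}
```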
+ // http://en.wikipedia.org/wiki/Spurious_wakeup + while local_gen == lock.generation_id && + lock.count < self.num_tasks { + lock.cond.wait(); + } + } else { + lock.count = 0; + lock.generation_id += 1; + lock.cond.broadcast(); + } + } +} + +/**************************************************************************** + * Tests + ****************************************************************************/ + +#[cfg(test)] +mod tests { + use std::comm::Empty; + use std::task; + + use arc::Arc; + use super::{Mutex, Barrier, RWLock}; + + #[test] + fn test_mutex_arc_condvar() { + let arc = Arc::new(Mutex::new(false)); + let arc2 = arc.clone(); + let (tx, rx) = channel(); + task::spawn(proc() { + // wait until parent gets in + rx.recv(); + let mut lock = arc2.lock(); + *lock = true; + lock.cond.signal(); + }); + + let lock = arc.lock(); + tx.send(()); + assert!(!*lock); + while !*lock { + lock.cond.wait(); + } + } + + #[test] #[should_fail] + fn test_arc_condvar_poison() { + let arc = Arc::new(Mutex::new(1)); + let arc2 = arc.clone(); + let (tx, rx) = channel(); + + spawn(proc() { + rx.recv(); + let lock = arc2.lock(); + lock.cond.signal(); + // Parent should fail when it wakes up. + fail!(); + }); + + let lock = arc.lock(); + tx.send(()); + while *lock == 1 { + lock.cond.wait(); + } + } + + #[test] #[should_fail] + fn test_mutex_arc_poison() { + let arc = Arc::new(Mutex::new(1)); + let arc2 = arc.clone(); + let _ = task::try(proc() { + let lock = arc2.lock(); + assert_eq!(*lock, 2); + }); + let lock = arc.lock(); + assert_eq!(*lock, 1); + } + + #[test] + fn test_mutex_arc_nested() { + // Tests nested mutexes and access + // to underlaying data. + let arc = Arc::new(Mutex::new(1)); + let arc2 = Arc::new(Mutex::new(arc)); + task::spawn(proc() { + let lock = arc2.lock(); + let lock2 = lock.deref().lock(); + assert_eq!(*lock2, 1); + }); + } + + #[test] + fn test_mutex_arc_access_in_unwind() { + let arc = Arc::new(Mutex::new(1i)); + let arc2 = arc.clone(); + let _ = task::try::<()>(proc() { + struct Unwinder { + i: Arc>, + } + impl Drop for Unwinder { + fn drop(&mut self) { + let mut lock = self.i.lock(); + *lock += 1; + } + } + let _u = Unwinder { i: arc2 }; + fail!(); + }); + let lock = arc.lock(); + assert_eq!(*lock, 2); + } + + #[test] #[should_fail] + fn test_rw_arc_poison_wr() { + let arc = Arc::new(RWLock::new(1)); + let arc2 = arc.clone(); + let _ = task::try(proc() { + let lock = arc2.write(); + assert_eq!(*lock, 2); + }); + let lock = arc.read(); + assert_eq!(*lock, 1); + } + #[test] #[should_fail] + fn test_rw_arc_poison_ww() { + let arc = Arc::new(RWLock::new(1)); + let arc2 = arc.clone(); + let _ = task::try(proc() { + let lock = arc2.write(); + assert_eq!(*lock, 2); + }); + let lock = arc.write(); + assert_eq!(*lock, 1); + } + #[test] + fn test_rw_arc_no_poison_rr() { + let arc = Arc::new(RWLock::new(1)); + let arc2 = arc.clone(); + let _ = task::try(proc() { + let lock = arc2.read(); + assert_eq!(*lock, 2); + }); + let lock = arc.read(); + assert_eq!(*lock, 1); + } + #[test] + fn test_rw_arc_no_poison_rw() { + let arc = Arc::new(RWLock::new(1)); + let arc2 = arc.clone(); + let _ = task::try(proc() { + let lock = arc2.read(); + assert_eq!(*lock, 2); + }); + let lock = arc.write(); + assert_eq!(*lock, 1); + } + #[test] + fn test_rw_arc_no_poison_dr() { + let arc = Arc::new(RWLock::new(1)); + let arc2 = arc.clone(); + let _ = task::try(proc() { + let lock = arc2.write().downgrade(); + assert_eq!(*lock, 2); + }); + let lock = arc.write(); + assert_eq!(*lock, 1); + } + + #[test] + fn 
test_rw_arc() { + let arc = Arc::new(RWLock::new(0)); + let arc2 = arc.clone(); + let (tx, rx) = channel(); + + task::spawn(proc() { + let mut lock = arc2.write(); + for _ in range(0, 10) { + let tmp = *lock; + *lock = -1; + task::deschedule(); + *lock = tmp + 1; + } + tx.send(()); + }); + + // Readers try to catch the writer in the act + let mut children = Vec::new(); + for _ in range(0, 5) { + let arc3 = arc.clone(); + let mut builder = task::task(); + children.push(builder.future_result()); + builder.spawn(proc() { + let lock = arc3.read(); + assert!(*lock >= 0); + }); + } + + // Wait for children to pass their asserts + for r in children.mut_iter() { + assert!(r.recv().is_ok()); + } + + // Wait for writer to finish + rx.recv(); + let lock = arc.read(); + assert_eq!(*lock, 10); + } + + #[test] + fn test_rw_arc_access_in_unwind() { + let arc = Arc::new(RWLock::new(1i)); + let arc2 = arc.clone(); + let _ = task::try::<()>(proc() { + struct Unwinder { + i: Arc>, + } + impl Drop for Unwinder { + fn drop(&mut self) { + let mut lock = self.i.write(); + *lock += 1; + } + } + let _u = Unwinder { i: arc2 }; + fail!(); + }); + let lock = arc.read(); + assert_eq!(*lock, 2); + } + + #[test] + fn test_rw_downgrade() { + // (1) A downgrader gets in write mode and does cond.wait. + // (2) A writer gets in write mode, sets state to 42, and does signal. + // (3) Downgrader wakes, sets state to 31337. + // (4) tells writer and all other readers to contend as it downgrades. + // (5) Writer attempts to set state back to 42, while downgraded task + // and all reader tasks assert that it's 31337. + let arc = Arc::new(RWLock::new(0)); + + // Reader tasks + let mut reader_convos = Vec::new(); + for _ in range(0, 10) { + let ((tx1, rx1), (tx2, rx2)) = (channel(), channel()); + reader_convos.push((tx1, rx2)); + let arcn = arc.clone(); + task::spawn(proc() { + rx1.recv(); // wait for downgrader to give go-ahead + let lock = arcn.read(); + assert_eq!(*lock, 31337); + tx2.send(()); + }); + } + + // Writer task + let arc2 = arc.clone(); + let ((tx1, rx1), (tx2, rx2)) = (channel(), channel()); + task::spawn(proc() { + rx1.recv(); + { + let mut lock = arc2.write(); + assert_eq!(*lock, 0); + *lock = 42; + lock.cond.signal(); + } + rx1.recv(); + { + let mut lock = arc2.write(); + // This shouldn't happen until after the downgrade read + // section, and all other readers, finish. + assert_eq!(*lock, 31337); + *lock = 42; + } + tx2.send(()); + }); + + // Downgrader (us) + let mut lock = arc.write(); + tx1.send(()); // send to another writer who will wake us up + while *lock == 0 { + lock.cond.wait(); + } + assert_eq!(*lock, 42); + *lock = 31337; + // send to other readers + for &(ref mut rc, _) in reader_convos.mut_iter() { + rc.send(()) + } + let lock = lock.downgrade(); + // complete handshake with other readers + for &(_, ref mut rp) in reader_convos.mut_iter() { + rp.recv() + } + tx1.send(()); // tell writer to try again + assert_eq!(*lock, 31337); + drop(lock); + + rx2.recv(); // complete handshake with writer + } + + #[cfg(test)] + fn test_rw_write_cond_downgrade_read_race_helper() { + // Tests that when a downgrader hands off the "reader cloud" lock + // because of a contending reader, a writer can't race to get it + // instead, which would result in readers_and_writers. This tests + // the raw module rather than this one, but it's here because an + // rwarc gives us extra shared state to help check for the race. 
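The test_rw_arc pattern above, where a writer temporarily breaks the invariant inside its critical section and readers must never observe the broken state, can be sketched with the current std::sync::RwLock:

```rust
use std::sync::{Arc, RwLock};
use std::thread;

fn main() {
    let value = Arc::new(RwLock::new(0i32));
    let writer_lock = Arc::clone(&value);

    let writer = thread::spawn(move || {
        // Hold the write guard across the whole loop: the invariant
        // (*guard >= 0) is broken and restored inside the critical section.
        let mut guard = writer_lock.write().unwrap();
        for _ in 0..10 {
            let tmp = *guard;
            *guard = -1;
            thread::yield_now();
            *guard = tmp + 1;
        }
    });

    // Readers can only run before the writer takes the lock or after it is
    // done, so they never observe the temporary -1.
    let mut readers = Vec::new();
    for _ in 0..5 {
        let reader_lock = Arc::clone(&value);
        readers.push(thread::spawn(move || {
            assert!(*reader_lock.read().unwrap() >= 0);
        }));
    }

    for r in readers {
        r.join().unwrap();
    }
    writer.join().unwrap();
    assert_eq!(*value.read().unwrap(), 10);
}
```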
+ let x = Arc::new(RWLock::new(true)); + let (tx, rx) = channel(); + + // writer task + let xw = x.clone(); + task::spawn(proc() { + let mut lock = xw.write(); + tx.send(()); // tell downgrader it's ok to go + lock.cond.wait(); + // The core of the test is here: the condvar reacquire path + // must involve order_lock, so that it cannot race with a reader + // trying to receive the "reader cloud lock hand-off". + *lock = false; + }); + + rx.recv(); // wait for writer to get in + + let lock = x.write(); + assert!(*lock); + // make writer contend in the cond-reacquire path + lock.cond.signal(); + // make a reader task to trigger the "reader cloud lock" handoff + let xr = x.clone(); + let (tx, rx) = channel(); + task::spawn(proc() { + tx.send(()); + drop(xr.read()); + }); + rx.recv(); // wait for reader task to exist + + let lock = lock.downgrade(); + // if writer mistakenly got in, make sure it mutates state + // before we assert on it + for _ in range(0, 5) { task::deschedule(); } + // make sure writer didn't get in. + assert!(*lock); + } + #[test] + fn test_rw_write_cond_downgrade_read_race() { + // Ideally the above test case would have deschedule statements in it + // that helped to expose the race nearly 100% of the time... but adding + // deschedules in the intuitively-right locations made it even less + // likely, and I wasn't sure why :( . This is a mediocre "next best" + // option. + for _ in range(0, 8) { + test_rw_write_cond_downgrade_read_race_helper(); + } + } + + /************************************************************************ + * Barrier tests + ************************************************************************/ + #[test] + fn test_barrier() { + let barrier = Arc::new(Barrier::new(10)); + let (tx, rx) = channel(); + + for _ in range(0, 9) { + let c = barrier.clone(); + let tx = tx.clone(); + spawn(proc() { + c.wait(); + tx.send(true); + }); + } + + // At this point, all spawned tasks should be blocked, + // so we shouldn't get anything from the port + assert!(match rx.try_recv() { + Empty => true, + _ => false, + }); + + barrier.wait(); + // Now, the barrier is cleared and we should get data. + for _ in range(0, 9) { + rx.recv(); + } + } +} + -- cgit 1.4.1-3-g733a5 From 64a52de8236e6405a50150c910370e161b854927 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Sat, 22 Mar 2014 00:53:58 -0700 Subject: sync: Update the arc module This removes the now-outdated MutexArc and RWArc types. These are superseded by Arc> and Arc>. The only remaining arc is the one true Arc. Additionally, the arc now has weak pointers implemented for it to assist in breaking cycles. This commit brings the arc api up to parity with the sibling Rc api, making them nearly interchangeable for inter and intra task communication. --- src/libsync/arc.rs | 1168 +++++++++++----------------------------------------- 1 file changed, 239 insertions(+), 929 deletions(-) (limited to 'src/libsync') diff --git a/src/libsync/arc.rs b/src/libsync/arc.rs index 0bc3b121a88..28841b780a4 100644 --- a/src/libsync/arc.rs +++ b/src/libsync/arc.rs @@ -11,571 +11,247 @@ /*! * Concurrency-enabled mechanisms for sharing mutable and/or immutable state * between tasks. - * - * # Example - * - * In this example, a large vector of floats is shared between several tasks. - * With simple pipes, without Arc, a copy would have to be made for each task. 
- * - * ```rust - * extern crate sync; - * extern crate rand; - * - * use std::slice; - * use sync::Arc; - * - * fn main() { - * let numbers = slice::from_fn(100, |i| (i as f32) * rand::random()); - * let shared_numbers = Arc::new(numbers); - * - * for _ in range(0, 10) { - * let (tx, rx) = channel(); - * tx.send(shared_numbers.clone()); - * - * spawn(proc() { - * let shared_numbers = rx.recv(); - * let local_numbers = shared_numbers.get(); - * - * // Work with the local numbers - * }); - * } - * } - * ``` */ -#[allow(missing_doc, dead_code)]; - - -use sync; -use sync::{Mutex, RWLock}; - use std::cast; -use std::kinds::{Share, marker}; -use std::sync::arc::UnsafeArc; -use std::task; - -/// As sync::condvar, a mechanism for unlock-and-descheduling and -/// signaling, for use with the Arc types. -pub struct ArcCondvar<'a> { - priv is_mutex: bool, - priv failed: &'a bool, - priv cond: &'a sync::Condvar<'a> +use std::ptr; +use std::rt::global_heap; +use std::sync::atomics; + +/// An atomically reference counted wrapper for shared state. +/// +/// # Example +/// +/// In this example, a large vector of floats is shared between several tasks. +/// With simple pipes, without `Arc`, a copy would have to be made for each +/// task. +/// +/// ```rust +/// use sync::Arc; +/// +/// fn main() { +/// let numbers = Vec::from_fn(100, |i| i as f32); +/// let shared_numbers = Arc::new(numbers); +/// +/// for _ in range(0, 10) { +/// let child_numbers = shared_numbers.clone(); +/// +/// spawn(proc() { +/// let local_numbers = child_numbers.as_slice(); +/// +/// // Work with the local numbers +/// }); +/// } +/// } +/// ``` +#[unsafe_no_drop_flag] +pub struct Arc { + priv x: *mut ArcInner, } -impl<'a> ArcCondvar<'a> { - /// Atomically exit the associated Arc and block until a signal is sent. - #[inline] - pub fn wait(&self) { self.wait_on(0) } - - /** - * Atomically exit the associated Arc and block on a specified condvar - * until a signal is sent on that same condvar (as sync::cond.wait_on). - * - * wait() is equivalent to wait_on(0). - */ - #[inline] - pub fn wait_on(&self, condvar_id: uint) { - assert!(!*self.failed); - self.cond.wait_on(condvar_id); - // This is why we need to wrap sync::condvar. - check_poison(self.is_mutex, *self.failed); - } - - /// Wake up a blocked task. Returns false if there was no blocked task. - #[inline] - pub fn signal(&self) -> bool { self.signal_on(0) } - - /** - * Wake up a blocked task on a specified condvar (as - * sync::cond.signal_on). Returns false if there was no blocked task. - */ - #[inline] - pub fn signal_on(&self, condvar_id: uint) -> bool { - assert!(!*self.failed); - self.cond.signal_on(condvar_id) - } - - /// Wake up all blocked tasks. Returns the number of tasks woken. - #[inline] - pub fn broadcast(&self) -> uint { self.broadcast_on(0) } - - /** - * Wake up all blocked tasks on a specified condvar (as - * sync::cond.broadcast_on). Returns the number of tasks woken. - */ - #[inline] - pub fn broadcast_on(&self, condvar_id: uint) -> uint { - assert!(!*self.failed); - self.cond.broadcast_on(condvar_id) - } +/// A weak pointer to an `Arc`. +/// +/// Weak pointers will not keep the data inside of the `Arc` alive, and can be +/// used to break cycles between `Arc` pointers. 
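Weak references keep this exact meaning in today's standard library, so the behaviour can be illustrated directly with std::sync::{Arc, Weak}: upgrading only succeeds while at least one strong reference is still alive.

```rust
use std::sync::{Arc, Weak};

fn main() {
    let strong = Arc::new(vec![1, 2, 3]);
    let weak: Weak<Vec<i32>> = Arc::downgrade(&strong);

    // While a strong reference exists, upgrade() hands back a new Arc.
    assert_eq!(weak.upgrade().map(|a| a.len()), Some(3));

    // Dropping the last strong reference destroys the data; the weak
    // reference does not keep it alive, it only keeps enough around for
    // upgrade() to report failure safely.
    drop(strong);
    assert!(weak.upgrade().is_none());
}
```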
+#[unsafe_no_drop_flag] +pub struct Weak { + priv x: *mut ArcInner, } -/**************************************************************************** - * Immutable Arc - ****************************************************************************/ - -/// An atomically reference counted wrapper for shared immutable state. -pub struct Arc { priv x: UnsafeArc } - +struct ArcInner { + strong: atomics::AtomicUint, + weak: atomics::AtomicUint, + data: T, +} -/** - * Access the underlying data in an atomically reference counted - * wrapper. - */ impl Arc { /// Create an atomically reference counted wrapper. #[inline] pub fn new(data: T) -> Arc { - Arc { x: UnsafeArc::new(data) } + // Start the weak pointer count as 1 which is the weak pointer that's + // held by all the strong pointers (kinda), see std/rc.rs for more info + let x = ~ArcInner { + strong: atomics::AtomicUint::new(1), + weak: atomics::AtomicUint::new(1), + data: data, + }; + Arc { x: unsafe { cast::transmute(x) } } } #[inline] - pub fn get<'a>(&'a self) -> &'a T { - unsafe { &*self.x.get_immut() } + fn inner<'a>(&'a self) -> &'a ArcInner { + // This unsafety is ok because while this arc is alive we're guaranteed + // that the inner pointer is valid. Furthermore, we know that the + // `ArcInner` structure itself is `Share` because the inner data is + // `Share` as well, so we're ok loaning out an immutable pointer to + // these contents. + unsafe { &*self.x } + } + + /// Downgrades a strong pointer to a weak pointer + /// + /// Weak pointers will not keep the data alive. Once all strong references + /// to the underlying data have been dropped, the data itself will be + /// destroyed. + pub fn downgrade(&self) -> Weak { + // See the clone() impl for why this is relaxed + self.inner().weak.fetch_add(1, atomics::Relaxed); + Weak { x: self.x } } } impl Clone for Arc { - /** - * Duplicate an atomically reference counted wrapper. - * - * The resulting two `arc` objects will point to the same underlying data - * object. However, one of the `arc` objects can be sent to another task, - * allowing them to share the underlying data. - */ + /// Duplicate an atomically reference counted wrapper. + /// + /// The resulting two `Arc` objects will point to the same underlying data + /// object. However, one of the `Arc` objects can be sent to another task, + /// allowing them to share the underlying data. #[inline] fn clone(&self) -> Arc { - Arc { x: self.x.clone() } + // Using a relaxed ordering is alright here, as knowledge of the + // original reference prevents other threads from erroneously deleting + // the object. + // + // As explained in the [Boost documentation][1], Increasing the + // reference counter can always be done with memory_order_relaxed: New + // references to an object can only be formed from an existing + // reference, and passing an existing reference from one thread to + // another must already provide any required synchronization. + // + // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html) + self.inner().strong.fetch_add(1, atomics::Relaxed); + Arc { x: self.x } } } -/**************************************************************************** - * Mutex protected Arc (unsafe) - ****************************************************************************/ - -#[doc(hidden)] -struct MutexArcInner { lock: Mutex, failed: bool, data: T } - -/// An Arc with mutable data protected by a blocking mutex. 
-pub struct MutexArc { - priv x: UnsafeArc>, -} - -impl Clone for MutexArc { - /// Duplicate a mutex-protected Arc. See arc::clone for more details. +// FIXME(#13042): this should have T: Send, and use self.inner() +impl Deref for Arc { #[inline] - fn clone(&self) -> MutexArc { - // NB: Cloning the underlying mutex is not necessary. Its reference - // count would be exactly the same as the shared state's. - MutexArc { x: self.x.clone() } + fn deref<'a>(&'a self) -> &'a T { + let inner = unsafe { &*self.x }; + &inner.data } } -impl MutexArc { - /// Create a mutex-protected Arc with the supplied data. - pub fn new(user_data: T) -> MutexArc { - MutexArc::new_with_condvars(user_data, 1) - } - - /** - * Create a mutex-protected Arc with the supplied data and a specified number - * of condvars (as sync::Mutex::new_with_condvars). - */ - pub fn new_with_condvars(user_data: T, num_condvars: uint) -> MutexArc { - let data = MutexArcInner { - lock: Mutex::new_with_condvars(num_condvars), - failed: false, data: user_data - }; - MutexArc { x: UnsafeArc::new(data) } - } - - /** - * Access the underlying mutable data with mutual exclusion from other - * tasks. The argument closure will be run with the mutex locked; all - * other tasks wishing to access the data will block until the closure - * finishes running. - * - * If you wish to nest MutexArcs, one strategy for ensuring safety at - * runtime is to add a "nesting level counter" inside the stored data, and - * when traversing the arcs, assert that they monotonically decrease. - * - * # Failure - * - * Failing while inside the Arc will unlock the Arc while unwinding, so - * that other tasks won't block forever. It will also poison the Arc: - * any tasks that subsequently try to access it (including those already - * blocked on the mutex) will also fail immediately. - */ - #[inline] - pub fn access(&self, blk: |x: &mut T| -> U) -> U { - let state = self.x.get(); - unsafe { - // Borrowck would complain about this if the code were - // not already unsafe. See borrow_rwlock, far below. - (&(*state).lock).lock(|| { - check_poison(true, (*state).failed); - let _z = PoisonOnFail::new(&mut (*state).failed); - blk(&mut (*state).data) - }) - } - } - - /// As access(), but with a condvar, as sync::mutex.lock_cond(). +impl Arc { + /// Acquires a mutable pointer to the inner contents by guaranteeing that + /// the reference count is one (no sharing is possible). + /// + /// This is also referred to as a copy-on-write operation because the inner + /// data is cloned if the reference count is greater than one. #[inline] - pub fn access_cond(&self, blk: |x: &mut T, c: &ArcCondvar| -> U) -> U { - let state = self.x.get(); - unsafe { - (&(*state).lock).lock_cond(|cond| { - check_poison(true, (*state).failed); - let _z = PoisonOnFail::new(&mut (*state).failed); - blk(&mut (*state).data, - &ArcCondvar {is_mutex: true, - failed: &(*state).failed, - cond: cond }) - }) - } - } -} - -// Common code for {mutex.access,rwlock.write}{,_cond}. -#[inline] -#[doc(hidden)] -fn check_poison(is_mutex: bool, failed: bool) { - if failed { - if is_mutex { - fail!("Poisoned MutexArc - another task failed inside!"); - } else { - fail!("Poisoned rw_arc - another task failed inside!"); + #[experimental] + pub fn make_unique<'a>(&'a mut self) -> &'a mut T { + if self.inner().strong.load(atomics::SeqCst) != 1 { + *self = Arc::new(self.deref().clone()) } + // This unsafety is ok because we're guaranteed that the pointer + // returned is the *only* pointer that will ever be returned to T. 
Our + // reference count is guaranteed to be 1 at this point, and we required + // the Arc itself to be `mut`, so we're returning the only possible + // reference to the inner data. + unsafe { cast::transmute_mut(self.deref()) } } } -#[doc(hidden)] -struct PoisonOnFail { - flag: *mut bool, - failed: bool, -} - -impl Drop for PoisonOnFail { +#[unsafe_destructor] +impl Drop for Arc { fn drop(&mut self) { - unsafe { - /* assert!(!*self.failed); - -- might be false in case of cond.wait() */ - if !self.failed && task::failing() { - *self.flag = true; - } + // This structure has #[unsafe_no_drop_flag], so this drop glue may run + // more than once (but it is guaranteed to be zeroed after the first if + // it's run more than once) + if self.x.is_null() { return } + + // Because `fetch_sub` is already atomic, we do not need to synchronize + // with other threads unless we are going to delete the object. This + // same logic applies to the below `fetch_sub` to the `weak` count. + if self.inner().strong.fetch_sub(1, atomics::Release) != 0 { return } + + // This fence is needed to prevent reordering of use of the data and + // deletion of the data. Because it is marked `Release`, the + // decreasing of the reference count sychronizes with this `Acquire` + // fence. This means that use of the data happens before decreasing + // the refernce count, which happens before this fence, which + // happens before the deletion of the data. + // + // As explained in the [Boost documentation][1], + // + // It is important to enforce any possible access to the object in + // one thread (through an existing reference) to *happen before* + // deleting the object in a different thread. This is achieved by a + // "release" operation after dropping a reference (any access to the + // object through this reference must obviously happened before), + // and an "acquire" operation before deleting the object. + // + // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html) + atomics::fence(atomics::Acquire); + + // Destroy the data at this time, even though we may not free the box + // allocation itself (there may still be weak pointers lying around). + unsafe { drop(ptr::read(&self.inner().data)); } + + if self.inner().weak.fetch_sub(1, atomics::Release) == 0 { + atomics::fence(atomics::Acquire); + unsafe { global_heap::exchange_free(self.x as *u8) } } } } -impl PoisonOnFail { - fn new<'a>(flag: &'a mut bool) -> PoisonOnFail { - PoisonOnFail { - flag: flag, - failed: task::failing() +impl Weak { + /// Attempts to upgrade this weak reference to a strong reference. + /// + /// This method will fail to upgrade this reference if the strong reference + /// count has already reached 0, but if there are still other active strong + /// references this function will return a new strong reference to the data + pub fn upgrade(&self) -> Option> { + // We use a CAS loop to increment the strong count instead of a + // fetch_add because once the count hits 0 is must never be above 0. 
+ let inner = self.inner(); + loop { + let n = inner.strong.load(atomics::SeqCst); + if n == 0 { return None } + let old = inner.strong.compare_and_swap(n, n + 1, atomics::SeqCst); + if old == n { return Some(Arc { x: self.x }) } } } -} -/**************************************************************************** - * R/W lock protected Arc - ****************************************************************************/ - -#[doc(hidden)] -struct RWArcInner { lock: RWLock, failed: bool, data: T } -/** - * A dual-mode Arc protected by a reader-writer lock. The data can be accessed - * mutably or immutably, and immutably-accessing tasks may run concurrently. - * - * Unlike mutex_arcs, rw_arcs are safe, because they cannot be nested. - */ -pub struct RWArc { - priv x: UnsafeArc>, - priv marker: marker::NoShare, -} - -impl Clone for RWArc { - /// Duplicate a rwlock-protected Arc. See arc::clone for more details. #[inline] - fn clone(&self) -> RWArc { - RWArc { - x: self.x.clone(), - marker: marker::NoShare - } + fn inner<'a>(&'a self) -> &'a ArcInner { + // See comments above for why this is "safe" + unsafe { &*self.x } } - } -impl RWArc { - /// Create a reader/writer Arc with the supplied data. - pub fn new(user_data: T) -> RWArc { - RWArc::new_with_condvars(user_data, 1) - } - - /** - * Create a reader/writer Arc with the supplied data and a specified number - * of condvars (as sync::RWLock::new_with_condvars). - */ - pub fn new_with_condvars(user_data: T, num_condvars: uint) -> RWArc { - let data = RWArcInner { - lock: RWLock::new_with_condvars(num_condvars), - failed: false, data: user_data - }; - RWArc { - x: UnsafeArc::new(data), - marker: marker::NoShare - } - } - - /** - * Access the underlying data mutably. Locks the rwlock in write mode; - * other readers and writers will block. - * - * # Failure - * - * Failing while inside the Arc will unlock the Arc while unwinding, so - * that other tasks won't block forever. As MutexArc.access, it will also - * poison the Arc, so subsequent readers and writers will both also fail. - */ +impl Clone for Weak { #[inline] - pub fn write(&self, blk: |x: &mut T| -> U) -> U { - unsafe { - let state = self.x.get(); - (*borrow_rwlock(state)).write(|| { - check_poison(false, (*state).failed); - let _z = PoisonOnFail::new(&mut (*state).failed); - blk(&mut (*state).data) - }) - } - } - - /// As write(), but with a condvar, as sync::rwlock.write_cond(). - #[inline] - pub fn write_cond(&self, - blk: |x: &mut T, c: &ArcCondvar| -> U) - -> U { - unsafe { - let state = self.x.get(); - (*borrow_rwlock(state)).write_cond(|cond| { - check_poison(false, (*state).failed); - let _z = PoisonOnFail::new(&mut (*state).failed); - blk(&mut (*state).data, - &ArcCondvar {is_mutex: false, - failed: &(*state).failed, - cond: cond}) - }) - } - } - - /** - * Access the underlying data immutably. May run concurrently with other - * reading tasks. - * - * # Failure - * - * Failing will unlock the Arc while unwinding. However, unlike all other - * access modes, this will not poison the Arc. - */ - pub fn read(&self, blk: |x: &T| -> U) -> U { - unsafe { - let state = self.x.get(); - (*state).lock.read(|| { - check_poison(false, (*state).failed); - blk(&(*state).data) - }) - } - } - - /** - * As write(), but with the ability to atomically 'downgrade' the lock. - * See sync::rwlock.write_downgrade(). The RWWriteMode token must be used - * to obtain the &mut T, and can be transformed into a RWReadMode token by - * calling downgrade(), after which a &T can be obtained instead. 
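The upgrade() loop above is careful never to revive a strong count that has already reached zero. With the current atomics API, where compare_and_swap has been superseded by the compare_exchange family, one way to express the same check-then-increment is fetch_update; the increment_if_nonzero helper name is illustrative, not part of this patch:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Increment `count` only if it is currently non-zero, reporting whether the
// increment happened. Once the strong count reaches zero the data may already
// be destroyed, so the count must never be revived from zero.
fn increment_if_nonzero(count: &AtomicUsize) -> bool {
    count
        .fetch_update(Ordering::Acquire, Ordering::Relaxed, |n| {
            if n == 0 { None } else { Some(n + 1) }
        })
        .is_ok()
}

fn main() {
    let strong = AtomicUsize::new(1);
    assert!(increment_if_nonzero(&strong));  // 1 -> 2: upgrade succeeds
    strong.store(0, Ordering::Release);      // last strong reference gone
    assert!(!increment_if_nonzero(&strong)); // stays at 0: upgrade fails
}
```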
- * - * # Example - * - * ```rust - * use sync::RWArc; - * - * let arc = RWArc::new(1); - * arc.write_downgrade(|mut write_token| { - * write_token.write_cond(|state, condvar| { - * // ... exclusive access with mutable state ... - * }); - * let read_token = arc.downgrade(write_token); - * read_token.read(|state| { - * // ... shared access with immutable state ... - * }); - * }) - * ``` - */ - pub fn write_downgrade(&self, blk: |v: RWWriteMode| -> U) -> U { - unsafe { - let state = self.x.get(); - (*borrow_rwlock(state)).write_downgrade(|write_mode| { - check_poison(false, (*state).failed); - blk(RWWriteMode { - data: &mut (*state).data, - token: write_mode, - poison: PoisonOnFail::new(&mut (*state).failed) - }) - }) - } - } - - /// To be called inside of the write_downgrade block. - pub fn downgrade<'a>(&self, token: RWWriteMode<'a, T>) - -> RWReadMode<'a, T> { - unsafe { - // The rwlock should assert that the token belongs to us for us. - let state = self.x.get(); - let RWWriteMode { - data: data, - token: t, - poison: _poison - } = token; - // Let readers in - let new_token = (*state).lock.downgrade(t); - // Whatever region the input reference had, it will be safe to use - // the same region for the output reference. (The only 'unsafe' part - // of this cast is removing the mutability.) - let new_data = data; - // Downgrade ensured the token belonged to us. Just a sanity check. - assert!((&(*state).data as *T as uint) == (new_data as *mut T as uint)); - // Produce new token - RWReadMode { - data: new_data, - token: new_token, - } - } - } -} - -// Borrowck rightly complains about immutably aliasing the rwlock in order to -// lock it. This wraps the unsafety, with the justification that the 'lock' -// field is never overwritten; only 'failed' and 'data'. -#[doc(hidden)] -fn borrow_rwlock(state: *mut RWArcInner) -> *RWLock { - unsafe { cast::transmute(&(*state).lock) } -} - -/// The "write permission" token used for RWArc.write_downgrade(). -pub struct RWWriteMode<'a, T> { - priv data: &'a mut T, - priv token: sync::RWLockWriteMode<'a>, - priv poison: PoisonOnFail, -} - -/// The "read permission" token used for RWArc.write_downgrade(). -pub struct RWReadMode<'a, T> { - priv data: &'a T, - priv token: sync::RWLockReadMode<'a>, -} - -impl<'a, T: Share + Send> RWWriteMode<'a, T> { - /// Access the pre-downgrade RWArc in write mode. - pub fn write(&mut self, blk: |x: &mut T| -> U) -> U { - match *self { - RWWriteMode { - data: &ref mut data, - token: ref token, - poison: _ - } => { - token.write(|| blk(data)) - } - } - } - - /// Access the pre-downgrade RWArc in write mode with a condvar. - pub fn write_cond(&mut self, - blk: |x: &mut T, c: &ArcCondvar| -> U) - -> U { - match *self { - RWWriteMode { - data: &ref mut data, - token: ref token, - poison: ref poison - } => { - token.write_cond(|cond| { - unsafe { - let cvar = ArcCondvar { - is_mutex: false, - failed: &*poison.flag, - cond: cond - }; - blk(data, &cvar) - } - }) - } - } - } -} - -impl<'a, T: Share + Send> RWReadMode<'a, T> { - /// Access the post-downgrade rwlock in read mode. 
- pub fn read(&self, blk: |x: &T| -> U) -> U { - match *self { - RWReadMode { - data: data, - token: ref token - } => { - token.read(|| blk(data)) - } - } + fn clone(&self) -> Weak { + // See comments in Arc::clone() for why this is relaxed + self.inner().weak.fetch_add(1, atomics::Relaxed); + Weak { x: self.x } } } -/**************************************************************************** - * Copy-on-write Arc - ****************************************************************************/ - -pub struct CowArc { priv x: UnsafeArc } - -/// A Copy-on-write Arc functions the same way as an `arc` except it allows -/// mutation of the contents if there is only a single reference to -/// the data. If there are multiple references the data is automatically -/// cloned and the task modifies the cloned data in place of the shared data. -impl CowArc { - /// Create a copy-on-write atomically reference counted wrapper - #[inline] - pub fn new(data: T) -> CowArc { - CowArc { x: UnsafeArc::new(data) } - } - - #[inline] - pub fn get<'a>(&'a self) -> &'a T { - unsafe { &*self.x.get_immut() } - } - - /// get a mutable reference to the contents. If there are more then one - /// reference to the contents of the `CowArc` will be cloned - /// and this reference updated to point to the cloned data. - #[inline] - pub fn get_mut<'a>(&'a mut self) -> &'a mut T { - if !self.x.is_owned() { - *self = CowArc::new(self.get().clone()) +#[unsafe_destructor] +impl Drop for Weak { + fn drop(&mut self) { + // see comments above for why this check is here + if self.x.is_null() { return } + + // If we find out that we were the last weak pointer, then its time to + // deallocate the data entirely. See the discussion in Arc::drop() about + // the memory orderings + if self.inner().weak.fetch_sub(1, atomics::Release) == 0 { + atomics::fence(atomics::Acquire); + unsafe { global_heap::exchange_free(self.x as *u8) } } - unsafe { &mut *self.x.get() } - } -} - -impl Clone for CowArc { - /// Duplicate a Copy-on-write Arc. See arc::clone for more details. - fn clone(&self) -> CowArc { - CowArc { x: self.x.clone() } } } - - -/**************************************************************************** - * Tests - ****************************************************************************/ - #[cfg(test)] +#[allow(experimental)] mod tests { - - use super::{Arc, RWArc, MutexArc, CowArc}; + use super::{Arc, Weak}; + use Mutex; use std::task; @@ -588,455 +264,89 @@ mod tests { task::spawn(proc() { let arc_v: Arc> = rx.recv(); - - let v = arc_v.get().clone(); - assert_eq!(*v.get(3), 4); + assert_eq!(*arc_v.get(3), 4); }); tx.send(arc_v.clone()); - assert_eq!(*arc_v.get().get(2), 3); - assert_eq!(*arc_v.get().get(4), 5); + assert_eq!(*arc_v.get(2), 3); + assert_eq!(*arc_v.get(4), 5); info!("{:?}", arc_v); } #[test] - fn test_mutex_arc_condvar() { - let arc = ~MutexArc::new(false); - let arc2 = ~arc.clone(); - let (tx, rx) = channel(); - task::spawn(proc() { - // wait until parent gets in - rx.recv(); - arc2.access_cond(|state, cond| { - *state = true; - cond.signal(); - }) - }); - - arc.access_cond(|state, cond| { - tx.send(()); - assert!(!*state); - while !*state { - cond.wait(); - } - }) - } - - #[test] #[should_fail] - fn test_arc_condvar_poison() { - let arc = ~MutexArc::new(1); - let arc2 = ~arc.clone(); - let (tx, rx) = channel(); - - spawn(proc() { - let _ = rx.recv(); - arc2.access_cond(|one, cond| { - cond.signal(); - // Parent should fail when it wakes up. 
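The first test above ships a clone of the Arc through a channel so that the receiving task reads the very same allocation rather than a copy of the vector. A minimal version of that round trip, assuming today's std::sync::Arc, std::sync::mpsc, and std::thread:

```rust
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::thread;

fn main() {
    let numbers = Arc::new(vec![1, 2, 3, 4, 5]);
    let (tx, rx) = channel();

    let handle = thread::spawn(move || {
        let received: Arc<Vec<i32>> = rx.recv().unwrap();
        // Same allocation as the sender's Arc; nothing was deep-copied.
        assert_eq!(received.get(3), Some(&4));
    });

    tx.send(numbers.clone()).unwrap();
    assert_eq!(numbers.get(2), Some(&3));
    handle.join().unwrap();
}
```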
- assert_eq!(*one, 0); - }) - }); - - arc.access_cond(|one, cond| { - tx.send(()); - while *one == 1 { - cond.wait(); - } - }) - } - - #[test] #[should_fail] - fn test_mutex_arc_poison() { - let arc = ~MutexArc::new(1); - let arc2 = ~arc.clone(); - let _ = task::try(proc() { - arc2.access(|one| { - assert_eq!(*one, 2); - }) - }); - arc.access(|one| { - assert_eq!(*one, 1); - }) - } - - #[test] - fn test_mutex_arc_nested() { - // Tests nested mutexes and access - // to underlaying data. - let arc = ~MutexArc::new(1); - let arc2 = ~MutexArc::new(*arc); - task::spawn(proc() { - (*arc2).access(|mutex| { - (*mutex).access(|one| { - assert!(*one == 1); - }) - }) - }); - } - - #[test] - fn test_mutex_arc_access_in_unwind() { - let arc = MutexArc::new(1i); - let arc2 = arc.clone(); - let _ = task::try::<()>(proc() { - struct Unwinder { - i: MutexArc - } - impl Drop for Unwinder { - fn drop(&mut self) { - self.i.access(|num| *num += 1); - } - } - let _u = Unwinder { i: arc2 }; - fail!(); - }); - assert_eq!(2, arc.access(|n| *n)); - } - - #[test] #[should_fail] - fn test_rw_arc_poison_wr() { - let arc = RWArc::new(1); - let arc2 = arc.clone(); - let _ = task::try(proc() { - arc2.write(|one| { - assert_eq!(*one, 2); - }) - }); - arc.read(|one| { - assert_eq!(*one, 1); - }) - } - - #[test] #[should_fail] - fn test_rw_arc_poison_ww() { - let arc = RWArc::new(1); - let arc2 = arc.clone(); - let _ = task::try(proc() { - arc2.write(|one| { - assert_eq!(*one, 2); - }) - }); - arc.write(|one| { - assert_eq!(*one, 1); - }) - } - #[test] #[should_fail] - fn test_rw_arc_poison_dw() { - let arc = RWArc::new(1); - let arc2 = arc.clone(); - let _ = task::try(proc() { - arc2.write_downgrade(|mut write_mode| { - write_mode.write(|one| { - assert_eq!(*one, 2); - }) - }) - }); - arc.write(|one| { - assert_eq!(*one, 1); - }) - } - #[test] - fn test_rw_arc_no_poison_rr() { - let arc = RWArc::new(1); - let arc2 = arc.clone(); - let _ = task::try(proc() { - arc2.read(|one| { - assert_eq!(*one, 2); - }) - }); - arc.read(|one| { - assert_eq!(*one, 1); - }) - } - #[test] - fn test_rw_arc_no_poison_rw() { - let arc = RWArc::new(1); - let arc2 = arc.clone(); - let _ = task::try(proc() { - arc2.read(|one| { - assert_eq!(*one, 2); - }) - }); - arc.write(|one| { - assert_eq!(*one, 1); - }) - } - #[test] - fn test_rw_arc_no_poison_dr() { - let arc = RWArc::new(1); - let arc2 = arc.clone(); - let _ = task::try(proc() { - arc2.write_downgrade(|write_mode| { - let read_mode = arc2.downgrade(write_mode); - read_mode.read(|one| { - assert_eq!(*one, 2); - }) - }) - }); - arc.write(|one| { - assert_eq!(*one, 1); - }) - } - #[test] - fn test_rw_arc() { - let arc = RWArc::new(0); - let arc2 = arc.clone(); - let (tx, rx) = channel(); - - task::spawn(proc() { - arc2.write(|num| { - for _ in range(0, 10) { - let tmp = *num; - *num = -1; - task::deschedule(); - *num = tmp + 1; - } - tx.send(()); - }) - }); + fn test_cowarc_clone_make_unique() { + let mut cow0 = Arc::new(75u); + let mut cow1 = cow0.clone(); + let mut cow2 = cow1.clone(); - // Readers try to catch the writer in the act - let mut children = Vec::new(); - for _ in range(0, 5) { - let arc3 = arc.clone(); - let mut builder = task::task(); - children.push(builder.future_result()); - builder.spawn(proc() { - arc3.read(|num| { - assert!(*num >= 0); - }) - }); - } + assert!(75 == *cow0.make_unique()); + assert!(75 == *cow1.make_unique()); + assert!(75 == *cow2.make_unique()); - // Wait for children to pass their asserts - for r in children.mut_iter() { - let _ = r.recv(); - } + 
*cow0.make_unique() += 1; + *cow1.make_unique() += 2; + *cow2.make_unique() += 3; - // Wait for writer to finish - rx.recv(); - arc.read(|num| { - assert_eq!(*num, 10); - }) - } + assert!(76 == *cow0); + assert!(77 == *cow1); + assert!(78 == *cow2); - #[test] - fn test_rw_arc_access_in_unwind() { - let arc = RWArc::new(1i); - let arc2 = arc.clone(); - let _ = task::try::<()>(proc() { - struct Unwinder { - i: RWArc - } - impl Drop for Unwinder { - fn drop(&mut self) { - self.i.write(|num| *num += 1); - } - } - let _u = Unwinder { i: arc2 }; - fail!(); - }); - assert_eq!(2, arc.read(|n| *n)); + // none should point to the same backing memory + assert!(*cow0 != *cow1); + assert!(*cow0 != *cow2); + assert!(*cow1 != *cow2); } #[test] - fn test_rw_downgrade() { - // (1) A downgrader gets in write mode and does cond.wait. - // (2) A writer gets in write mode, sets state to 42, and does signal. - // (3) Downgrader wakes, sets state to 31337. - // (4) tells writer and all other readers to contend as it downgrades. - // (5) Writer attempts to set state back to 42, while downgraded task - // and all reader tasks assert that it's 31337. - let arc = RWArc::new(0); - - // Reader tasks - let mut reader_convos = Vec::new(); - for _ in range(0, 10) { - let ((tx1, rx1), (tx2, rx2)) = (channel(), channel()); - reader_convos.push((tx1, rx2)); - let arcn = arc.clone(); - task::spawn(proc() { - rx1.recv(); // wait for downgrader to give go-ahead - arcn.read(|state| { - assert_eq!(*state, 31337); - tx2.send(()); - }) - }); - } - - // Writer task - let arc2 = arc.clone(); - let ((tx1, rx1), (tx2, rx2)) = (channel(), channel()); - task::spawn(proc() { - rx1.recv(); - arc2.write_cond(|state, cond| { - assert_eq!(*state, 0); - *state = 42; - cond.signal(); - }); - rx1.recv(); - arc2.write(|state| { - // This shouldn't happen until after the downgrade read - // section, and all other readers, finish. - assert_eq!(*state, 31337); - *state = 42; - }); - tx2.send(()); - }); - - // Downgrader (us) - arc.write_downgrade(|mut write_mode| { - write_mode.write_cond(|state, cond| { - tx1.send(()); // send to another writer who will wake us up - while *state == 0 { - cond.wait(); - } - assert_eq!(*state, 42); - *state = 31337; - // send to other readers - for &(ref mut rc, _) in reader_convos.mut_iter() { - rc.send(()) - } - }); - let read_mode = arc.downgrade(write_mode); - read_mode.read(|state| { - // complete handshake with other readers - for &(_, ref mut rp) in reader_convos.mut_iter() { - rp.recv() - } - tx1.send(()); // tell writer to try again - assert_eq!(*state, 31337); - }); - }); - - rx2.recv(); // complete handshake with writer - } - #[cfg(test)] - fn test_rw_write_cond_downgrade_read_race_helper() { - // Tests that when a downgrader hands off the "reader cloud" lock - // because of a contending reader, a writer can't race to get it - // instead, which would result in readers_and_writers. This tests - // the sync module rather than this one, but it's here because an - // rwarc gives us extra shared state to help check for the race. - // If you want to see this test fail, go to sync.rs and replace the - // line in RWLock::write_cond() that looks like: - // "blk(&ArcCondvar { order: opt_lock, ..*cond })" - // with just "blk(cond)". 
- let x = RWArc::new(true); - let (tx, rx) = channel(); + fn test_cowarc_clone_unique2() { + let mut cow0 = Arc::new(75u); + let cow1 = cow0.clone(); + let cow2 = cow1.clone(); - // writer task - let xw = x.clone(); - task::spawn(proc() { - xw.write_cond(|state, c| { - tx.send(()); // tell downgrader it's ok to go - c.wait(); - // The core of the test is here: the condvar reacquire path - // must involve order_lock, so that it cannot race with a reader - // trying to receive the "reader cloud lock hand-off". - *state = false; - }) - }); + assert!(75 == *cow0); + assert!(75 == *cow1); + assert!(75 == *cow2); - rx.recv(); // wait for writer to get in + *cow0.make_unique() += 1; - x.write_downgrade(|mut write_mode| { - write_mode.write_cond(|state, c| { - assert!(*state); - // make writer contend in the cond-reacquire path - c.signal(); - }); - // make a reader task to trigger the "reader cloud lock" handoff - let xr = x.clone(); - let (tx, rx) = channel(); - task::spawn(proc() { - tx.send(()); - xr.read(|_state| { }) - }); - rx.recv(); // wait for reader task to exist + assert!(76 == *cow0); + assert!(75 == *cow1); + assert!(75 == *cow2); - let read_mode = x.downgrade(write_mode); - read_mode.read(|state| { - // if writer mistakenly got in, make sure it mutates state - // before we assert on it - for _ in range(0, 5) { task::deschedule(); } - // make sure writer didn't get in. - assert!(*state); - }) - }); - } - #[test] - fn test_rw_write_cond_downgrade_read_race() { - // Ideally the above test case would have deschedule statements in it that - // helped to expose the race nearly 100% of the time... but adding - // deschedules in the intuitively-right locations made it even less likely, - // and I wasn't sure why :( . This is a mediocre "next best" option. 
- for _ in range(0, 8) { test_rw_write_cond_downgrade_read_race_helper(); } + // cow1 and cow2 should share the same contents + // cow0 should have a unique reference + assert!(*cow0 != *cow1); + assert!(*cow0 != *cow2); + assert!(*cow1 == *cow2); } #[test] - fn test_cowarc_clone() - { - let cow0 = CowArc::new(75u); - let cow1 = cow0.clone(); - let cow2 = cow1.clone(); - - assert!(75 == *cow0.get()); - assert!(75 == *cow1.get()); - assert!(75 == *cow2.get()); - - assert!(cow0.get() == cow1.get()); - assert!(cow0.get() == cow2.get()); + fn test_live() { + let x = Arc::new(5); + let y = x.downgrade(); + assert!(y.upgrade().is_some()); } #[test] - fn test_cowarc_clone_get_mut() - { - let mut cow0 = CowArc::new(75u); - let mut cow1 = cow0.clone(); - let mut cow2 = cow1.clone(); - - assert!(75 == *cow0.get_mut()); - assert!(75 == *cow1.get_mut()); - assert!(75 == *cow2.get_mut()); - - *cow0.get_mut() += 1; - *cow1.get_mut() += 2; - *cow2.get_mut() += 3; - - assert!(76 == *cow0.get()); - assert!(77 == *cow1.get()); - assert!(78 == *cow2.get()); - - // none should point to the same backing memory - assert!(cow0.get() != cow1.get()); - assert!(cow0.get() != cow2.get()); - assert!(cow1.get() != cow2.get()); + fn test_dead() { + let x = Arc::new(5); + let y = x.downgrade(); + drop(x); + assert!(y.upgrade().is_none()); } #[test] - fn test_cowarc_clone_get_mut2() - { - let mut cow0 = CowArc::new(75u); - let cow1 = cow0.clone(); - let cow2 = cow1.clone(); - - assert!(75 == *cow0.get()); - assert!(75 == *cow1.get()); - assert!(75 == *cow2.get()); - - *cow0.get_mut() += 1; + fn weak_self_cyclic() { + struct Cycle { + x: Mutex>> + } - assert!(76 == *cow0.get()); - assert!(75 == *cow1.get()); - assert!(75 == *cow2.get()); + let a = Arc::new(Cycle { x: Mutex::new(None) }); + let b = a.clone().downgrade(); + *a.deref().x.lock().deref_mut() = Some(b); - // cow1 and cow2 should share the same contents - // cow0 should have a unique reference - assert!(cow0.get() != cow1.get()); - assert!(cow0.get() != cow2.get()); - assert!(cow1.get() == cow2.get()); + // hopefully we don't double-free (or leak)... 
} } -- cgit 1.4.1-3-g733a5 From eff025797a6dc58dad724e8523223d07025ac697 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Sat, 22 Mar 2014 00:55:27 -0700 Subject: sync: Wire up all of the previous commits This updates the exports and layout of the crate --- src/libsync/lib.rs | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) (limited to 'src/libsync') diff --git a/src/libsync/lib.rs b/src/libsync/lib.rs index 70874a029ac..d166076e96e 100644 --- a/src/libsync/lib.rs +++ b/src/libsync/lib.rs @@ -20,18 +20,28 @@ html_favicon_url = "http://www.rust-lang.org/favicon.ico", html_root_url = "http://static.rust-lang.org/doc/master")]; #[feature(phase)]; +#[deny(missing_doc, deprecated_owned_vector)]; -#[cfg(test)] #[phase(syntax, link)] extern crate log; +#[cfg(test)] +#[phase(syntax, link)] extern crate log; -pub use arc::{Arc, MutexArc, RWArc, RWWriteMode, RWReadMode, ArcCondvar, CowArc}; -pub use sync::{Mutex, RWLock, Condvar, Semaphore, RWLockWriteMode, - RWLockReadMode, Barrier, one, mutex}; pub use comm::{DuplexStream, SyncSender, SyncReceiver, rendezvous, duplex}; pub use task_pool::TaskPool; pub use future::Future; +pub use arc::{Arc, Weak}; +pub use lock::{Mutex, MutexGuard, Condvar, Barrier, + RWLock, RWLockReadGuard, RWLockWriteGuard}; + +// The mutex/rwlock in this module are not meant for reexport +pub use raw::{Semaphore, SemaphoreGuard}; mod arc; -mod sync; mod comm; -mod task_pool; mod future; +mod lock; +mod mpsc_intrusive; +mod task_pool; + +pub mod raw; +pub mod mutex; +pub mod one; -- cgit 1.4.1-3-g733a5
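
Taken together, these patches replace the old MutexArc/RWArc/CowArc wrappers with a single Arc type plus a non-owning Weak handle, both re-exported from the crate root. Below is a minimal sketch (not part of the patch) of how downstream code might use the reorganized API, written in the era's syntax; it assumes the crate is linked as `sync`, and the method names come from the `pub use` lines and the tests shown above rather than from this commit itself:

    extern crate sync;

    use sync::Arc;

    fn main() {
        // One strong handle plus one weak handle to the same allocation.
        let strong = Arc::new(5);
        let weak = strong.downgrade();

        // The weak handle only yields a value while some strong Arc is alive.
        assert!(weak.upgrade().is_some());
        drop(strong);
        assert!(weak.upgrade().is_none());
    }

This mirrors the `test_live`/`test_dead` cases added above: upgrading a Weak returns an Option, so callers must handle the case where the last strong reference has already been dropped.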