diff options
| author | Ben Blum <bblum@andrew.cmu.edu> | 2013-07-02 13:37:19 -0400 |
|---|---|---|
| committer | Ben Blum <bblum@andrew.cmu.edu> | 2013-07-20 05:08:55 -0400 |
| commit | 10a400ffaa8b67add66a62bde0ef3c415d7aed5f (patch) | |
| tree | 10c5798e9fe2a1c62ebf0cf629f7034c992314ed /src/libstd/unstable/sync.rs | |
| parent | 55adc4467b4364d949774d5bfd1eba4a16c0b810 (diff) | |
| download | rust-10a400ffaa8b67add66a62bde0ef3c415d7aed5f.tar.gz rust-10a400ffaa8b67add66a62bde0ef3c415d7aed5f.zip | |
Reimplement ARC::unwrap() and friends.
Diffstat (limited to 'src/libstd/unstable/sync.rs')
| -rw-r--r-- | src/libstd/unstable/sync.rs | 230 |
1 files changed, 211 insertions, 19 deletions
diff --git a/src/libstd/unstable/sync.rs b/src/libstd/unstable/sync.rs index cfdbb4930eb..f80ad7239b5 100644 --- a/src/libstd/unstable/sync.rs +++ b/src/libstd/unstable/sync.rs @@ -9,12 +9,15 @@ // except according to those terms. use cast; +use cell::Cell; +use comm; use libc; +use ptr; use option::*; use task; use task::atomically; +use unstable::atomics::{AtomicOption,AtomicUint,Acquire,Release,SeqCst}; use unstable::finally::Finally; -use unstable::intrinsics; use ops::Drop; use clone::Clone; use kinds::Send; @@ -27,14 +30,22 @@ pub struct UnsafeAtomicRcBox<T> { } struct AtomicRcBoxData<T> { - count: int, + count: AtomicUint, + // An unwrapper uses this protocol to communicate with the "other" task that + // drops the last refcount on an arc. Unfortunately this can't be a proper + // pipe protocol because the unwrapper has to access both stages at once. + // FIXME(#7544): Maybe use AtomicPtr instead (to avoid xchg in take() later)? + unwrapper: AtomicOption<(comm::ChanOne<()>, comm::PortOne<bool>)>, + // FIXME(#3224) should be able to make this non-option to save memory data: Option<T>, } impl<T: Send> UnsafeAtomicRcBox<T> { pub fn new(data: T) -> UnsafeAtomicRcBox<T> { unsafe { - let data = ~AtomicRcBoxData { count: 1, data: Some(data) }; + let data = ~AtomicRcBoxData { count: AtomicUint::new(1), + unwrapper: AtomicOption::empty(), + data: Some(data) }; let ptr = cast::transmute(data); return UnsafeAtomicRcBox { data: ptr }; } @@ -44,7 +55,7 @@ impl<T: Send> UnsafeAtomicRcBox<T> { pub unsafe fn get(&self) -> *mut T { let mut data: ~AtomicRcBoxData<T> = cast::transmute(self.data); - assert!(data.count > 0); + assert!(data.count.load(Acquire) > 0); // no barrier is really needed let r: *mut T = data.data.get_mut_ref(); cast::forget(data); return r; @@ -53,20 +64,88 @@ impl<T: Send> UnsafeAtomicRcBox<T> { #[inline] pub unsafe fn get_immut(&self) -> *T { - let mut data: ~AtomicRcBoxData<T> = cast::transmute(self.data); - assert!(data.count > 0); - let r: *T = cast::transmute_immut(data.data.get_mut_ref()); + let data: ~AtomicRcBoxData<T> = cast::transmute(self.data); + assert!(data.count.load(Acquire) > 0); // no barrier is really needed + let r: *T = data.data.get_ref(); cast::forget(data); return r; } + + /// Wait until all other handles are dropped, then retrieve the enclosed + /// data. See extra::arc::ARC for specific semantics documentation. + /// If called when the task is already unkillable, unwrap will unkillably + /// block; otherwise, an unwrapping task can be killed by linked failure. + pub unsafe fn unwrap(self) -> T { + let this = Cell::new(self); // argh + do task::unkillable { + let mut this = this.take(); + let mut data: ~AtomicRcBoxData<T> = cast::transmute(this.data); + // Set up the unwrap protocol. + let (p1,c1) = comm::oneshot(); // () + let (p2,c2) = comm::oneshot(); // bool + // Try to put our server end in the unwrapper slot. + // This needs no barrier -- it's protected by the release barrier on + // the xadd, and the acquire+release barrier in the destructor's xadd. + // FIXME(#6598) Change Acquire to Relaxed. + if data.unwrapper.fill(~(c1,p2), Acquire).is_none() { + // Got in. Tell this handle's destructor not to run (we are now it). + this.data = ptr::mut_null(); + // Drop our own reference. + let old_count = data.count.fetch_sub(1, Release); + assert!(old_count >= 1); + if old_count == 1 { + // We were the last owner. Can unwrap immediately. + // AtomicOption's destructor will free the server endpoint. + // FIXME(#3224): it should be like this + // let ~AtomicRcBoxData { data: user_data, _ } = data; + // user_data + data.data.take_unwrap() + } else { + // The *next* person who sees the refcount hit 0 will wake us. + let p1 = Cell::new(p1); // argh + // Unlike the above one, this cell is necessary. It will get + // taken either in the do block or in the finally block. + let c2_and_data = Cell::new((c2,data)); + do (|| { + do task::rekillable { p1.take().recv(); } + // Got here. Back in the 'unkillable' without getting killed. + let (c2, data) = c2_and_data.take(); + c2.send(true); + // FIXME(#3224): it should be like this + // let ~AtomicRcBoxData { data: user_data, _ } = data; + // user_data + let mut data = data; + data.data.take_unwrap() + }).finally { + if task::failing() { + // Killed during wait. Because this might happen while + // someone else still holds a reference, we can't free + // the data now; the "other" last refcount will free it. + let (c2, data) = c2_and_data.take(); + c2.send(false); + cast::forget(data); + } else { + assert!(c2_and_data.is_empty()); + } + } + } + } else { + // If 'put' returns the server end back to us, we were rejected; + // someone else was trying to unwrap. Avoid guaranteed deadlock. + cast::forget(data); + fail!("Another task is already unwrapping this ARC!"); + } + } + } } impl<T: Send> Clone for UnsafeAtomicRcBox<T> { fn clone(&self) -> UnsafeAtomicRcBox<T> { unsafe { let mut data: ~AtomicRcBoxData<T> = cast::transmute(self.data); - let new_count = intrinsics::atomic_xadd(&mut data.count, 1) + 1; - assert!(new_count >= 2); + // This barrier might be unnecessary, but I'm not sure... + let old_count = data.count.fetch_add(1, Acquire); + assert!(old_count >= 1); cast::forget(data); return UnsafeAtomicRcBox { data: self.data }; } @@ -77,12 +156,37 @@ impl<T: Send> Clone for UnsafeAtomicRcBox<T> { impl<T> Drop for UnsafeAtomicRcBox<T>{ fn drop(&self) { unsafe { + if self.data.is_null() { + return; // Happens when destructing an unwrapper's handle. + } do task::unkillable { let mut data: ~AtomicRcBoxData<T> = cast::transmute(self.data); - let new_count = intrinsics::atomic_xsub(&mut data.count, 1) - 1; - assert!(new_count >= 0); - if new_count == 0 { - // drop glue takes over. + // Must be acquire+release, not just release, to make sure this + // doesn't get reordered to after the unwrapper pointer load. + let old_count = data.count.fetch_sub(1, SeqCst); + assert!(old_count >= 1); + if old_count == 1 { + // Were we really last, or should we hand off to an + // unwrapper? It's safe to not xchg because the unwrapper + // will set the unwrap lock *before* dropping his/her + // reference. In effect, being here means we're the only + // *awake* task with the data. + match data.unwrapper.take(Acquire) { + Some(~(message,response)) => { + // Send 'ready' and wait for a response. + message.send(()); + // Unkillable wait. Message guaranteed to come. + if response.recv() { + // Other task got the data. + cast::forget(data); + } else { + // Other task was killed. drop glue takes over. + } + } + None => { + // drop glue takes over. + } + } } else { cast::forget(data); } @@ -139,6 +243,13 @@ struct ExData<T> { /** * An arc over mutable data that is protected by a lock. For library use only. + * + * # Safety note + * + * This uses a pthread mutex, not one that's aware of the userspace scheduler. + * The user of an exclusive must be careful not to invoke any functions that may + * reschedule the task while holding the lock, or deadlock may result. If you + * need to block or yield while accessing shared state, use extra::sync::RWARC. */ pub struct Exclusive<T> { x: UnsafeAtomicRcBox<ExData<T>> @@ -189,12 +300,13 @@ impl<T:Send> Exclusive<T> { f(cast::transmute_immut(x)) } } -} -fn compare_and_swap(address: &mut int, oldval: int, newval: int) -> bool { - unsafe { - let old = intrinsics::atomic_cxchg(address, oldval, newval); - old == oldval + pub fn unwrap(self) -> T { + let Exclusive { x: x } = self; + // Someday we might need to unkillably unwrap an exclusive, but not today. + let inner = unsafe { x.unwrap() }; + let ExData { data: user_data, _ } = inner; // will destroy the LittleLock + user_data } } @@ -208,10 +320,13 @@ extern { #[cfg(test)] mod tests { use super::*; + use cell::Cell; use comm; - use super::exclusive; + use option::*; + use super::{exclusive, UnsafeAtomicRcBox}; use task; use uint; + use util; #[test] fn exclusive_arc() { @@ -263,4 +378,81 @@ mod tests { } } } + + #[test] + fn unsafe_unwrap_basic() { + unsafe { + let x = UnsafeAtomicRcBox::new(~~"hello"); + assert!(x.unwrap() == ~~"hello"); + } + } + + #[test] + fn exclusive_unwrap_basic() { + // Unlike the above, also tests no double-freeing of the LittleLock. + let x = exclusive(~~"hello"); + assert!(x.unwrap() == ~~"hello"); + } + + #[test] + fn exclusive_unwrap_contended() { + let x = exclusive(~~"hello"); + let x2 = Cell::new(x.clone()); + do task::spawn { + let x2 = x2.take(); + unsafe { do x2.with |_hello| { } } + task::yield(); + } + assert!(x.unwrap() == ~~"hello"); + + // Now try the same thing, but with the child task blocking. + let x = exclusive(~~"hello"); + let x2 = Cell::new(x.clone()); + let mut res = None; + let mut builder = task::task(); + builder.future_result(|r| res = Some(r)); + do builder.spawn { + let x2 = x2.take(); + assert!(x2.unwrap() == ~~"hello"); + } + // Have to get rid of our reference before blocking. + util::ignore(x); + res.unwrap().recv(); + } + + #[test] #[should_fail] #[ignore(cfg(windows))] + fn exclusive_unwrap_conflict() { + let x = exclusive(~~"hello"); + let x2 = Cell::new(x.clone()); + let mut res = None; + let mut builder = task::task(); + builder.future_result(|r| res = Some(r)); + do builder.spawn { + let x2 = x2.take(); + assert!(x2.unwrap() == ~~"hello"); + } + assert!(x.unwrap() == ~~"hello"); + // See #4689 for why this can't be just "res.recv()". + assert!(res.unwrap().recv() == task::Success); + } + + #[test] #[ignore(cfg(windows))] + fn exclusive_unwrap_deadlock() { + // This is not guaranteed to get to the deadlock before being killed, + // but it will show up sometimes, and if the deadlock were not there, + // the test would nondeterministically fail. + let result = do task::try { + // a task that has two references to the same exclusive will + // deadlock when it unwraps. nothing to be done about that. + let x = exclusive(~~"hello"); + let x2 = x.clone(); + do task::spawn { + for 10.times { task::yield(); } // try to let the unwrapper go + fail!(); // punt it awake from its deadlock + } + let _z = x.unwrap(); + unsafe { do x2.with |_hello| { } } + }; + assert!(result.is_err()); + } } |
