about summary refs log tree commit diff
path: root/src/libstd
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstd')
-rw-r--r--src/libstd/unstable/sync.rs230
1 files changed, 211 insertions, 19 deletions
diff --git a/src/libstd/unstable/sync.rs b/src/libstd/unstable/sync.rs
index cfdbb4930eb..f80ad7239b5 100644
--- a/src/libstd/unstable/sync.rs
+++ b/src/libstd/unstable/sync.rs
@@ -9,12 +9,15 @@
 // except according to those terms.
 
 use cast;
+use cell::Cell;
+use comm;
 use libc;
+use ptr;
 use option::*;
 use task;
 use task::atomically;
+use unstable::atomics::{AtomicOption,AtomicUint,Acquire,Release,SeqCst};
 use unstable::finally::Finally;
-use unstable::intrinsics;
 use ops::Drop;
 use clone::Clone;
 use kinds::Send;
@@ -27,14 +30,22 @@ pub struct UnsafeAtomicRcBox<T> {
 }
 
 struct AtomicRcBoxData<T> {
-    count: int,
+    count: AtomicUint,
+    // An unwrapper uses this protocol to communicate with the "other" task that
+    // drops the last refcount on an arc. Unfortunately this can't be a proper
+    // pipe protocol because the unwrapper has to access both stages at once.
+    // FIXME(#7544): Maybe use AtomicPtr instead (to avoid xchg in take() later)?
+    unwrapper: AtomicOption<(comm::ChanOne<()>, comm::PortOne<bool>)>,
+    // FIXME(#3224) should be able to make this non-option to save memory
     data: Option<T>,
 }
 
 impl<T: Send> UnsafeAtomicRcBox<T> {
     pub fn new(data: T) -> UnsafeAtomicRcBox<T> {
         unsafe {
-            let data = ~AtomicRcBoxData { count: 1, data: Some(data) };
+            let data = ~AtomicRcBoxData { count: AtomicUint::new(1),
+                                          unwrapper: AtomicOption::empty(),
+                                          data: Some(data) };
             let ptr = cast::transmute(data);
             return UnsafeAtomicRcBox { data: ptr };
         }
@@ -44,7 +55,7 @@ impl<T: Send> UnsafeAtomicRcBox<T> {
     pub unsafe fn get(&self) -> *mut T
     {
         let mut data: ~AtomicRcBoxData<T> = cast::transmute(self.data);
-        assert!(data.count > 0);
+        assert!(data.count.load(Acquire) > 0); // no barrier is really needed
         let r: *mut T = data.data.get_mut_ref();
         cast::forget(data);
         return r;
@@ -53,20 +64,88 @@ impl<T: Send> UnsafeAtomicRcBox<T> {
     #[inline]
     pub unsafe fn get_immut(&self) -> *T
     {
-        let mut data: ~AtomicRcBoxData<T> = cast::transmute(self.data);
-        assert!(data.count > 0);
-        let r: *T = cast::transmute_immut(data.data.get_mut_ref());
+        let data: ~AtomicRcBoxData<T> = cast::transmute(self.data);
+        assert!(data.count.load(Acquire) > 0); // no barrier is really needed
+        let r: *T = data.data.get_ref();
         cast::forget(data);
         return r;
     }
+
+    /// Wait until all other handles are dropped, then retrieve the enclosed
+    /// data. See extra::arc::ARC for specific semantics documentation.
+    /// If called when the task is already unkillable, unwrap will unkillably
+    /// block; otherwise, an unwrapping task can be killed by linked failure.
+    pub unsafe fn unwrap(self) -> T {
+        let this = Cell::new(self); // argh
+        do task::unkillable {
+            let mut this = this.take();
+            let mut data: ~AtomicRcBoxData<T> = cast::transmute(this.data);
+            // Set up the unwrap protocol.
+            let (p1,c1) = comm::oneshot(); // ()
+            let (p2,c2) = comm::oneshot(); // bool
+            // Try to put our server end in the unwrapper slot.
+            // This needs no barrier -- it's protected by the release barrier on
+            // the xadd, and the acquire+release barrier in the destructor's xadd.
+            // FIXME(#6598) Change Acquire to Relaxed.
+            if data.unwrapper.fill(~(c1,p2), Acquire).is_none() {
+                // Got in. Tell this handle's destructor not to run (we are now it).
+                this.data = ptr::mut_null();
+                // Drop our own reference.
+                let old_count = data.count.fetch_sub(1, Release);
+                assert!(old_count >= 1);
+                if old_count == 1 {
+                    // We were the last owner. Can unwrap immediately.
+                    // AtomicOption's destructor will free the server endpoint.
+                    // FIXME(#3224): it should be like this
+                    // let ~AtomicRcBoxData { data: user_data, _ } = data;
+                    // user_data
+                    data.data.take_unwrap()
+                } else {
+                    // The *next* person who sees the refcount hit 0 will wake us.
+                    let p1 = Cell::new(p1); // argh
+                    // Unlike the above one, this cell is necessary. It will get
+                    // taken either in the do block or in the finally block.
+                    let c2_and_data = Cell::new((c2,data));
+                    do (|| {
+                        do task::rekillable { p1.take().recv(); }
+                        // Got here. Back in the 'unkillable' without getting killed.
+                        let (c2, data) = c2_and_data.take();
+                        c2.send(true);
+                        // FIXME(#3224): it should be like this
+                        // let ~AtomicRcBoxData { data: user_data, _ } = data;
+                        // user_data
+                        let mut data = data;
+                        data.data.take_unwrap()
+                    }).finally {
+                        if task::failing() {
+                            // Killed during wait. Because this might happen while
+                            // someone else still holds a reference, we can't free
+                            // the data now; the "other" last refcount will free it.
+                            let (c2, data) = c2_and_data.take();
+                            c2.send(false);
+                            cast::forget(data);
+                        } else {
+                            assert!(c2_and_data.is_empty());
+                        }
+                    }
+                }
+            } else {
+                // If 'put' returns the server end back to us, we were rejected;
+                // someone else was trying to unwrap. Avoid guaranteed deadlock.
+                cast::forget(data);
+                fail!("Another task is already unwrapping this ARC!");
+            }
+        }
+    }
 }
 
 impl<T: Send> Clone for UnsafeAtomicRcBox<T> {
     fn clone(&self) -> UnsafeAtomicRcBox<T> {
         unsafe {
             let mut data: ~AtomicRcBoxData<T> = cast::transmute(self.data);
-            let new_count = intrinsics::atomic_xadd(&mut data.count, 1) + 1;
-            assert!(new_count >= 2);
+            // This barrier might be unnecessary, but I'm not sure...
+            let old_count = data.count.fetch_add(1, Acquire);
+            assert!(old_count >= 1);
             cast::forget(data);
             return UnsafeAtomicRcBox { data: self.data };
         }
@@ -77,12 +156,37 @@ impl<T: Send> Clone for UnsafeAtomicRcBox<T> {
 impl<T> Drop for UnsafeAtomicRcBox<T>{
     fn drop(&self) {
         unsafe {
+            if self.data.is_null() {
+                return; // Happens when destructing an unwrapper's handle.
+            }
             do task::unkillable {
                 let mut data: ~AtomicRcBoxData<T> = cast::transmute(self.data);
-                let new_count = intrinsics::atomic_xsub(&mut data.count, 1) - 1;
-                assert!(new_count >= 0);
-                if new_count == 0 {
-                    // drop glue takes over.
+                // Must be acquire+release, not just release, to make sure this
+                // doesn't get reordered to after the unwrapper pointer load.
+                let old_count = data.count.fetch_sub(1, SeqCst);
+                assert!(old_count >= 1);
+                if old_count == 1 {
+                    // Were we really last, or should we hand off to an
+                    // unwrapper? It's safe to not xchg because the unwrapper
+                    // will set the unwrap lock *before* dropping his/her
+                    // reference. In effect, being here means we're the only
+                    // *awake* task with the data.
+                    match data.unwrapper.take(Acquire) {
+                        Some(~(message,response)) => {
+                            // Send 'ready' and wait for a response.
+                            message.send(());
+                            // Unkillable wait. Message guaranteed to come.
+                            if response.recv() {
+                                // Other task got the data.
+                                cast::forget(data);
+                            } else {
+                                // Other task was killed. drop glue takes over.
+                            }
+                        }
+                        None => {
+                            // drop glue takes over.
+                        }
+                    }
                 } else {
                     cast::forget(data);
                 }
@@ -139,6 +243,13 @@ struct ExData<T> {
 
 /**
  * An arc over mutable data that is protected by a lock. For library use only.
+ *
+ * # Safety note
+ *
+ * This uses a pthread mutex, not one that's aware of the userspace scheduler.
+ * The user of an exclusive must be careful not to invoke any functions that may
+ * reschedule the task while holding the lock, or deadlock may result. If you
+ * need to block or yield while accessing shared state, use extra::sync::RWARC.
  */
 pub struct Exclusive<T> {
     x: UnsafeAtomicRcBox<ExData<T>>
@@ -189,12 +300,13 @@ impl<T:Send> Exclusive<T> {
             f(cast::transmute_immut(x))
         }
     }
-}
 
-fn compare_and_swap(address: &mut int, oldval: int, newval: int) -> bool {
-    unsafe {
-        let old = intrinsics::atomic_cxchg(address, oldval, newval);
-        old == oldval
+    pub fn unwrap(self) -> T {
+        let Exclusive { x: x } = self;
+        // Someday we might need to unkillably unwrap an exclusive, but not today.
+        let inner = unsafe { x.unwrap() };
+        let ExData { data: user_data, _ } = inner; // will destroy the LittleLock
+        user_data
     }
 }
 
@@ -208,10 +320,13 @@ extern {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use cell::Cell;
     use comm;
-    use super::exclusive;
+    use option::*;
+    use super::{exclusive, UnsafeAtomicRcBox};
     use task;
     use uint;
+    use util;
 
     #[test]
     fn exclusive_arc() {
@@ -263,4 +378,81 @@ mod tests {
             }
         }
     }
+
+    #[test]
+    fn unsafe_unwrap_basic() {
+        unsafe {
+            let x = UnsafeAtomicRcBox::new(~~"hello");
+            assert!(x.unwrap() == ~~"hello");
+        }
+    }
+
+    #[test]
+    fn exclusive_unwrap_basic() {
+        // Unlike the above, also tests no double-freeing of the LittleLock.
+        let x = exclusive(~~"hello");
+        assert!(x.unwrap() == ~~"hello");
+    }
+
+    #[test]
+    fn exclusive_unwrap_contended() {
+        let x = exclusive(~~"hello");
+        let x2 = Cell::new(x.clone());
+        do task::spawn {
+            let x2 = x2.take();
+            unsafe { do x2.with |_hello| { } }
+            task::yield();
+        }
+        assert!(x.unwrap() == ~~"hello");
+
+        // Now try the same thing, but with the child task blocking.
+        let x = exclusive(~~"hello");
+        let x2 = Cell::new(x.clone());
+        let mut res = None;
+        let mut builder = task::task();
+        builder.future_result(|r| res = Some(r));
+        do builder.spawn {
+            let x2 = x2.take();
+            assert!(x2.unwrap() == ~~"hello");
+        }
+        // Have to get rid of our reference before blocking.
+        util::ignore(x);
+        res.unwrap().recv();
+    }
+
+    #[test] #[should_fail] #[ignore(cfg(windows))]
+    fn exclusive_unwrap_conflict() {
+        let x = exclusive(~~"hello");
+        let x2 = Cell::new(x.clone());
+        let mut res = None;
+        let mut builder = task::task();
+        builder.future_result(|r| res = Some(r));
+        do builder.spawn {
+            let x2 = x2.take();
+            assert!(x2.unwrap() == ~~"hello");
+        }
+        assert!(x.unwrap() == ~~"hello");
+        // See #4689 for why this can't be just "res.recv()".
+        assert!(res.unwrap().recv() == task::Success);
+    }
+
+    #[test] #[ignore(cfg(windows))]
+    fn exclusive_unwrap_deadlock() {
+        // This is not guaranteed to get to the deadlock before being killed,
+        // but it will show up sometimes, and if the deadlock were not there,
+        // the test would nondeterministically fail.
+        let result = do task::try {
+            // a task that has two references to the same exclusive will
+            // deadlock when it unwraps. nothing to be done about that.
+            let x = exclusive(~~"hello");
+            let x2 = x.clone();
+            do task::spawn {
+                for 10.times { task::yield(); } // try to let the unwrapper go
+                fail!(); // punt it awake from its deadlock
+            }
+            let _z = x.unwrap();
+            unsafe { do x2.with |_hello| { } }
+        };
+        assert!(result.is_err());
+    }
 }