about summary refs log tree commit diff
diff options
context:
space:
mode:
authorBen Blum <bblum@andrew.cmu.edu>2012-08-07 18:03:30 -0400
committerBen Blum <bblum@andrew.cmu.edu>2012-08-07 18:18:48 -0400
commitf2ca54991f2af6bf3195e78bb22e509cda9bcf9f (patch)
treece6bc45a1742a143b5f5734fb8e4b223d6bb0ce1
parentbdbad614ac7ea948fdd0d9cfb509d9b05008c3c2 (diff)
downloadrust-f2ca54991f2af6bf3195e78bb22e509cda9bcf9f.tar.gz
rust-f2ca54991f2af6bf3195e78bb22e509cda9bcf9f.zip
Add sync.rs with counting blocking semaphores
-rw-r--r--src/libcore/core.rc3
-rw-r--r--src/libcore/sync.rs189
2 files changed, 191 insertions, 1 deletions
diff --git a/src/libcore/core.rc b/src/libcore/core.rc
index d78da3b2533..06b9c6f2c9e 100644
--- a/src/libcore/core.rc
+++ b/src/libcore/core.rc
@@ -39,7 +39,7 @@ export float, f32, f64;
 export box, char, str, ptr, vec, at_vec, bool;
 export either, option, result, iter;
 export libc, os, io, run, rand, sys, unsafe, logging;
-export arc, comm, task, future, pipes;
+export arc, comm, task, future, pipes, sync;
 export extfmt;
 // The test harness links against core, so don't include runtime in tests.
 // FIXME (#2861): Uncomment this after snapshot gets updated.
@@ -204,6 +204,7 @@ mod comm;
 mod task;
 mod future;
 mod pipes;
+mod sync;
 
 // Runtime and language-primitive support
 
diff --git a/src/libcore/sync.rs b/src/libcore/sync.rs
new file mode 100644
index 00000000000..3e18a9b1634
--- /dev/null
+++ b/src/libcore/sync.rs
@@ -0,0 +1,189 @@
+/**
+ * The concurrency primitives you know and love.
+ *
+ * Maybe once we have a "core exports x only to std" mechanism, these can be
+ * in std.
+ */
+
+export semaphore, new_semaphore;
+
+// FIXME (#3119) This shouldn't be a thing exported from core.
+import arc::exclusive;
+
+// Each waiting task receives on one of these. FIXME #3125 make these oneshot.
+type wait_end = pipes::port<()>;
+type signal_end = pipes::chan<()>;
+// A doubly-ended queue of waiting tasks.
+type waitqueue = { head: pipes::port<signal_end>,
+                   tail: pipes::chan<signal_end> };
+
+fn new_waiter() -> (signal_end, wait_end) { pipes::stream() }
+
+/// A counting semaphore.
+enum semaphore = exclusive<{
+    mut count: int,
+    waiters:   waitqueue,
+}>;
+
+/// Create a new semaphore with the specified count.
+fn new_semaphore(count: int) -> semaphore {
+    let (tail, head) = pipes::stream();
+    semaphore(exclusive({ mut count: count,
+                          waiters: { head: head, tail: tail } }))
+}
+
+impl semaphore for &semaphore {
+    /// Creates a new handle to the semaphore.
+    fn clone() -> semaphore {
+        semaphore((**self).clone())
+    }
+
+    /**
+     * Acquires a resource represented by the semaphore. Blocks if necessary
+     * until resource(s) become available.
+     */
+    fn wait() {
+        let mut waiter_nobe = none;
+        unsafe {
+            do (**self).with |state| {
+                state.count -= 1;
+                if state.count < 0 {
+                    let (signal_end,wait_end) = new_waiter();
+                    waiter_nobe = some(wait_end);
+                    // Enqueue ourself.
+                    state.waiters.tail.send(signal_end);
+                }
+            }
+        }
+        for 1000.times { task::yield(); }
+        // Need to wait outside the exclusive.
+        if waiter_nobe.is_some() {
+            let _ = option::unwrap(waiter_nobe).recv();
+        }
+    }
+
+    /**
+     * Release a held resource represented by the semaphore. Wakes a blocked
+     * contending task, if any exist.
+     */
+    fn signal() {
+        unsafe {
+            do (**self).with |state| {
+                state.count += 1;
+                // The peek is mandatory to make sure recv doesn't block.
+                if state.count >= 0 && state.waiters.head.peek() {
+                    // Pop off the waitqueue and send a wakeup signal. If the
+                    // waiter was killed, its port will have closed, and send
+                    // will fail. Keep trying until we get a live task.
+                    state.waiters.head.recv().send(());
+                    // to-do: use this version when it's ready, kill-friendly.
+                    // while !state.waiters.head.recv().try_send(()) { }
+                }
+            }
+        }
+    }
+
+    /// Runs a function with ownership of one of the semaphore's resources.
+    fn access<U>(blk: fn() -> U) -> U {
+        self.wait();
+        let _x = sem_release(self);
+        blk()
+    }
+}
+
+// FIXME(#3136) should go inside of access()
+struct sem_release {
+    sem: &semaphore;
+    new(sem: &semaphore) { self.sem = sem; }
+    drop { self.sem.signal(); }
+}
+
+#[cfg(test)]
+mod tests {
+    #[test]
+    fn test_sem_as_mutex() {
+        let s = ~new_semaphore(1);
+        let s2 = ~s.clone();
+        do task::spawn {
+            do s2.access {
+                for 10.times { task::yield(); }
+            }
+        }
+        do s.access {
+            for 10.times { task::yield(); }
+        }
+    }
+    #[test]
+    fn test_sem_as_cvar() {
+        /* Child waits and parent signals */
+        let (c,p) = pipes::stream();
+        let s = ~new_semaphore(0);
+        let s2 = ~s.clone();
+        do task::spawn {
+            s2.wait();
+            c.send(());
+        }
+        for 10.times { task::yield(); }
+        s.signal();
+        let _ = p.recv();
+        
+        /* Parent waits and child signals */
+        let (c,p) = pipes::stream();
+        let s = ~new_semaphore(0);
+        let s2 = ~s.clone();
+        do task::spawn {
+            for 10.times { task::yield(); }
+            s2.signal();
+            let _ = p.recv();
+        }
+        s.wait();
+        c.send(());
+    }
+    #[test]
+    fn test_sem_mutual_exclusion() {
+        let (c,p) = pipes::stream();
+        let s = ~new_semaphore(1);
+        let s2 = ~s.clone();
+        let sharedstate = ~0;
+        let ptr = ptr::addr_of(*sharedstate);
+        do task::spawn {
+            let sharedstate = unsafe { unsafe::reinterpret_cast(ptr) };
+            access_shared(sharedstate, s2, 10);
+            c.send(());
+        }
+        access_shared(sharedstate, s, 10);
+        let _ = p.recv();
+
+        assert *sharedstate == 20;
+
+        fn access_shared(sharedstate: &mut int, sem: &semaphore, n: uint) {
+            for n.times {
+                do sem.access {
+                    let oldval = *sharedstate;
+                    task::yield();
+                    *sharedstate = oldval + 1;
+                }
+            }
+        }
+    }
+    #[test]
+    fn test_sem_runtime_friendly_blocking() {
+        do task::spawn_sched(task::manual_threads(1)) {
+            let s = ~new_semaphore(1);
+            let s2 = ~s.clone();
+            let (c,p) = pipes::stream();
+            let child_data = ~mut some((s2,c));
+            do s.access {
+                let (s2,c) = option::swap_unwrap(child_data);
+                do task::spawn {
+                    c.send(());
+                    do s2.access { }
+                    c.send(());
+                }
+                let _ = p.recv(); // wait for child to come alive
+                for 5.times { task::yield(); } // let the child contend
+            }
+            let _ = p.recv(); // wait for child to be done
+        }
+    }
+}