about summary refs log tree commit diff
path: root/src/libstd/comm
diff options
context:
space:
mode:
authorAaron Turon <aturon@mozilla.com>2014-12-01 08:49:32 -0800
committerAaron Turon <aturon@mozilla.com>2014-12-18 23:31:51 -0800
commitd8e4780b0b59636cd979a60434a407142e407ac9 (patch)
tree8b13d1e484c9b461ca1b5f0c80ef5ac35da1c44b /src/libstd/comm
parent7fd7ce682dd6f98d456d817a297b15bdc9841190 (diff)
downloadrust-d8e4780b0b59636cd979a60434a407142e407ac9.tar.gz
rust-d8e4780b0b59636cd979a60434a407142e407ac9.zip
Remove rt::{mutex, exclusive}
Diffstat (limited to 'src/libstd/comm')
-rw-r--r--src/libstd/comm/sync.rs111
1 files changed, 62 insertions, 49 deletions
diff --git a/src/libstd/comm/sync.rs b/src/libstd/comm/sync.rs
index 7e87596429c..9e4bdb15b00 100644
--- a/src/libstd/comm/sync.rs
+++ b/src/libstd/comm/sync.rs
@@ -52,9 +52,7 @@ pub struct Packet<T> {
     /// the other shared channel already had the code implemented
     channels: atomic::AtomicUint,
 
-    /// The state field is protected by this mutex
-    lock: NativeMutex,
-    state: UnsafeCell<State<T>>,
+    lock: Mutex<State<T>>,
 }
 
 struct State<T> {
@@ -107,9 +105,25 @@ pub enum Failure {
 
 /// Atomically blocks the current thread, placing it into `slot`, unlocking `lock`
 /// in the meantime. This re-locks the mutex upon returning.
+fn wait<'a, 'b, T>(lock: &'a Mutex<State<T>>,
+                   guard: MutexGuard<'b, State<T>>,
+                   f: fn(BlockedTask) -> Blocker)
+                   -> MutexGuard<'a, State<T>>
+{
+    let me: Box<Task> = Local::take();
+    me.deschedule(1, |task| {
+        match mem::replace(&mut guard.blocker, f(task)) {
+            NoneBlocked => {}
+            _ => unreachable!(),
+        }
+        mem::drop(guard);
+        Ok(())
+    });
+    lock.lock()
+}
 
-/// Wakes up a thread, dropping the lock at the correct time
-fn wakeup<T>(token: SignalToken, guard: MutexGuard<State<T>>) {
+/// Wakes up a task, dropping the lock at the correct time
+fn wakeup<T>(task: BlockedTask, guard: MutexGuard<State<T>>) {
     // We need to be careful to wake up the waiting task *outside* of the mutex
     // in case it incurs a context switch.
     drop(guard);
@@ -120,8 +134,7 @@ impl<T: Send> Packet<T> {
     pub fn new(cap: uint) -> Packet<T> {
         Packet {
             channels: atomic::AtomicUint::new(1),
-            lock: unsafe { NativeMutex::new() },
-            state: UnsafeCell::new(State {
+            lock: Mutex::new(State {
                 disconnected: false,
                 blocker: NoneBlocked,
                 cap: cap,
@@ -161,17 +174,17 @@ impl<T: Send> Packet<T> {
         if guard.disconnected { return Err(t) }
         guard.buf.enqueue(t);
 
-        match mem::replace(&mut state.blocker, NoneBlocked) {
+        match mem::replace(&mut guard.blocker, NoneBlocked) {
             // if our capacity is 0, then we need to wait for a receiver to be
             // available to take our data. After waiting, we check again to make
             // sure the port didn't go away in the meantime. If it did, we need
             // to hand back our data.
-            NoneBlocked if state.cap == 0 => {
+            NoneBlocked if guard.cap == 0 => {
                 let mut canceled = false;
-                assert!(state.canceled.is_none());
-                state.canceled = Some(unsafe { mem::transmute(&mut canceled) });
-                wait(&mut state.blocker, BlockedSender, &self.lock);
-                if canceled {Err(state.buf.dequeue())} else {Ok(())}
+                assert!(guard.canceled.is_none());
+                guard.canceled = Some(unsafe { mem::transmute(&mut canceled) });
+                let guard = wait(&self.lock, guard, BlockedSender);
+                if canceled {Err(guard.buf.dequeue())} else {Ok(())}
             }
 
             // success, we buffered some data
@@ -185,15 +198,15 @@ impl<T: Send> Packet<T> {
     }
 
     pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> {
-        let (guard, state) = self.lock();
-        if state.disconnected {
+        let guard = self.lock.lock();
+        if guard.disconnected {
             Err(super::RecvDisconnected(t))
-        } else if state.buf.size() == state.buf.cap() {
+        } else if guard.buf.size() == guard.buf.cap() {
             Err(super::Full(t))
-        } else if state.cap == 0 {
+        } else if guard.cap == 0 {
             // With capacity 0, even though we have buffer space we can't
             // transfer the data unless there's a receiver waiting.
-            match mem::replace(&mut state.blocker, NoneBlocked) {
+            match mem::replace(&mut guard.blocker, NoneBlocked) {
                 NoneBlocked => Err(super::Full(t)),
                 BlockedSender(..) => unreachable!(),
                 BlockedReceiver(token) => {
@@ -227,28 +240,28 @@ impl<T: Send> Packet<T> {
         // Wait for the buffer to have something in it. No need for a while loop
         // because we're the only receiver.
         let mut waited = false;
-        if !state.disconnected && state.buf.size() == 0 {
-            wait(&mut state.blocker, BlockedReceiver, &self.lock);
+        if !guard.disconnected && guard.buf.size() == 0 {
+            wait(&mut guard.blocker, BlockedReceiver, &self.lock);
             waited = true;
         }
-        if state.disconnected && state.buf.size() == 0 { return Err(()) }
+        if guard.disconnected && guard.buf.size() == 0 { return Err(()) }
 
         // Pick up the data, wake up our neighbors, and carry on
-        assert!(state.buf.size() > 0);
-        let ret = state.buf.dequeue();
+        assert!(guard.buf.size() > 0);
+        let ret = guard.buf.dequeue();
         self.wakeup_senders(waited, guard, state);
         return Ok(ret);
     }
 
     pub fn try_recv(&self) -> Result<T, Failure> {
-        let (guard, state) = self.lock();
+        let guard = self.lock();
 
         // Easy cases first
-        if state.disconnected { return Err(Disconnected) }
-        if state.buf.size() == 0 { return Err(Empty) }
+        if guard.disconnected { return Err(Disconnected) }
+        if guard.buf.size() == 0 { return Err(Empty) }
 
         // Be sure to wake up neighbors
-        let ret = Ok(state.buf.dequeue());
+        let ret = Ok(guard.buf.dequeue());
         self.wakeup_senders(false, guard, state);
 
         return ret;
@@ -265,8 +278,8 @@ impl<T: Send> Packet<T> {
         // If this is a no-buffer channel (cap == 0), then if we didn't wait we
         // need to ACK the sender. If we waited, then the sender waking us up
         // was already the ACK.
-        let pending_sender2 = if state.cap == 0 && !waited {
-            match mem::replace(&mut state.blocker, NoneBlocked) {
+        let pending_sender2 = if guard.cap == 0 && !waited {
+            match mem::replace(&mut guard.blocker, NoneBlocked) {
                 NoneBlocked => None,
                 BlockedReceiver(..) => unreachable!(),
                 BlockedSender(token) => {
@@ -277,7 +290,7 @@ impl<T: Send> Packet<T> {
         } else {
             None
         };
-        mem::drop((state, guard));
+        mem::drop(guard);
 
         // only outside of the lock do we wake up the pending tasks
         pending_sender1.map(|t| t.signal());
@@ -298,10 +311,10 @@ impl<T: Send> Packet<T> {
         }
 
         // Not much to do other than wake up a receiver if one's there
-        let (guard, state) = self.lock();
-        if state.disconnected { return }
-        state.disconnected = true;
-        match mem::replace(&mut state.blocker, NoneBlocked) {
+        let guard = self.lock();
+        if guard.disconnected { return }
+        guard.disconnected = true;
+        match mem::replace(&mut guard.blocker, NoneBlocked) {
             NoneBlocked => {}
             BlockedSender(..) => unreachable!(),
             BlockedReceiver(token) => wakeup(token, guard),
@@ -309,27 +322,27 @@ impl<T: Send> Packet<T> {
     }
 
     pub fn drop_port(&self) {
-        let (guard, state) = self.lock();
+        let guard = self.lock();
 
-        if state.disconnected { return }
-        state.disconnected = true;
+        if guard.disconnected { return }
+        guard.disconnected = true;
 
         // If the capacity is 0, then the sender may want its data back after
         // we're disconnected. Otherwise it's now our responsibility to destroy
         // the buffered data. As with many other portions of this code, this
         // needs to be careful to destroy the data *outside* of the lock to
         // prevent deadlock.
-        let _data = if state.cap != 0 {
-            mem::replace(&mut state.buf.buf, Vec::new())
+        let _data = if guard.cap != 0 {
+            mem::replace(&mut guard.buf.buf, Vec::new())
         } else {
             Vec::new()
         };
-        let mut queue = mem::replace(&mut state.queue, Queue {
+        let mut queue = mem::replace(&mut guard.queue, Queue {
             head: 0 as *mut Node,
             tail: 0 as *mut Node,
         });
 
-        let waiter = match mem::replace(&mut state.blocker, NoneBlocked) {
+        let waiter = match mem::replace(&mut guard.blocker, NoneBlocked) {
             NoneBlocked => None,
             BlockedSender(token) => {
                 *guard.canceled.take().unwrap() = true;
@@ -337,7 +350,7 @@ impl<T: Send> Packet<T> {
             }
             BlockedReceiver(..) => unreachable!(),
         };
-        mem::drop((state, guard));
+        mem::drop(guard);
 
         loop {
             match queue.dequeue() {
@@ -355,8 +368,8 @@ impl<T: Send> Packet<T> {
     // If Ok, the value is whether this port has data, if Err, then the upgraded
     // port needs to be checked instead of this one.
     pub fn can_recv(&self) -> bool {
-        let (_g, state) = self.lock();
-        state.disconnected || state.buf.size() > 0
+        let guard = self.lock();
+        guard.disconnected || guard.buf.size() > 0
     }
 
     // Attempts to start selection on this port. This can either succeed or fail
@@ -380,8 +393,8 @@ impl<T: Send> Packet<T> {
     //
     // The return value indicates whether there's data on this port.
     pub fn abort_selection(&self) -> bool {
-        let (_g, state) = self.lock();
-        match mem::replace(&mut state.blocker, NoneBlocked) {
+        let guard = self.lock();
+        match mem::replace(&mut guard.blocker, NoneBlocked) {
             NoneBlocked => true,
             BlockedSender(token) => {
                 guard.blocker = BlockedSender(token);
@@ -396,9 +409,9 @@ impl<T: Send> Packet<T> {
 impl<T: Send> Drop for Packet<T> {
     fn drop(&mut self) {
         assert_eq!(self.channels.load(atomic::SeqCst), 0);
-        let (_g, state) = self.lock();
-        assert!(state.queue.dequeue().is_none());
-        assert!(state.canceled.is_none());
+        let guard = self.lock();
+        assert!(guard.queue.dequeue().is_none());
+        assert!(guard.canceled.is_none());
     }
 }