diff options
| author | Aaron Turon <aturon@mozilla.com> | 2014-12-01 08:49:32 -0800 |
|---|---|---|
| committer | Aaron Turon <aturon@mozilla.com> | 2014-12-18 23:31:51 -0800 |
| commit | d8e4780b0b59636cd979a60434a407142e407ac9 (patch) | |
| tree | 8b13d1e484c9b461ca1b5f0c80ef5ac35da1c44b /src/libstd/comm | |
| parent | 7fd7ce682dd6f98d456d817a297b15bdc9841190 (diff) | |
| download | rust-d8e4780b0b59636cd979a60434a407142e407ac9.tar.gz rust-d8e4780b0b59636cd979a60434a407142e407ac9.zip | |
Remove rt::{mutex, exclusive}
Diffstat (limited to 'src/libstd/comm')
| -rw-r--r-- | src/libstd/comm/sync.rs | 111 |
1 files changed, 62 insertions, 49 deletions
diff --git a/src/libstd/comm/sync.rs b/src/libstd/comm/sync.rs index 7e87596429c..9e4bdb15b00 100644 --- a/src/libstd/comm/sync.rs +++ b/src/libstd/comm/sync.rs @@ -52,9 +52,7 @@ pub struct Packet<T> { /// the other shared channel already had the code implemented channels: atomic::AtomicUint, - /// The state field is protected by this mutex - lock: NativeMutex, - state: UnsafeCell<State<T>>, + lock: Mutex<State<T>>, } struct State<T> { @@ -107,9 +105,25 @@ pub enum Failure { /// Atomically blocks the current thread, placing it into `slot`, unlocking `lock` /// in the meantime. This re-locks the mutex upon returning. +fn wait<'a, 'b, T>(lock: &'a Mutex<State<T>>, + guard: MutexGuard<'b, State<T>>, + f: fn(BlockedTask) -> Blocker) + -> MutexGuard<'a, State<T>> +{ + let me: Box<Task> = Local::take(); + me.deschedule(1, |task| { + match mem::replace(&mut guard.blocker, f(task)) { + NoneBlocked => {} + _ => unreachable!(), + } + mem::drop(guard); + Ok(()) + }); + lock.lock() +} -/// Wakes up a thread, dropping the lock at the correct time -fn wakeup<T>(token: SignalToken, guard: MutexGuard<State<T>>) { +/// Wakes up a task, dropping the lock at the correct time +fn wakeup<T>(task: BlockedTask, guard: MutexGuard<State<T>>) { // We need to be careful to wake up the waiting task *outside* of the mutex // in case it incurs a context switch. drop(guard); @@ -120,8 +134,7 @@ impl<T: Send> Packet<T> { pub fn new(cap: uint) -> Packet<T> { Packet { channels: atomic::AtomicUint::new(1), - lock: unsafe { NativeMutex::new() }, - state: UnsafeCell::new(State { + lock: Mutex::new(State { disconnected: false, blocker: NoneBlocked, cap: cap, @@ -161,17 +174,17 @@ impl<T: Send> Packet<T> { if guard.disconnected { return Err(t) } guard.buf.enqueue(t); - match mem::replace(&mut state.blocker, NoneBlocked) { + match mem::replace(&mut guard.blocker, NoneBlocked) { // if our capacity is 0, then we need to wait for a receiver to be // available to take our data. After waiting, we check again to make // sure the port didn't go away in the meantime. If it did, we need // to hand back our data. - NoneBlocked if state.cap == 0 => { + NoneBlocked if guard.cap == 0 => { let mut canceled = false; - assert!(state.canceled.is_none()); - state.canceled = Some(unsafe { mem::transmute(&mut canceled) }); - wait(&mut state.blocker, BlockedSender, &self.lock); - if canceled {Err(state.buf.dequeue())} else {Ok(())} + assert!(guard.canceled.is_none()); + guard.canceled = Some(unsafe { mem::transmute(&mut canceled) }); + let guard = wait(&self.lock, guard, BlockedSender); + if canceled {Err(guard.buf.dequeue())} else {Ok(())} } // success, we buffered some data @@ -185,15 +198,15 @@ impl<T: Send> Packet<T> { } pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> { - let (guard, state) = self.lock(); - if state.disconnected { + let guard = self.lock.lock(); + if guard.disconnected { Err(super::RecvDisconnected(t)) - } else if state.buf.size() == state.buf.cap() { + } else if guard.buf.size() == guard.buf.cap() { Err(super::Full(t)) - } else if state.cap == 0 { + } else if guard.cap == 0 { // With capacity 0, even though we have buffer space we can't // transfer the data unless there's a receiver waiting. - match mem::replace(&mut state.blocker, NoneBlocked) { + match mem::replace(&mut guard.blocker, NoneBlocked) { NoneBlocked => Err(super::Full(t)), BlockedSender(..) => unreachable!(), BlockedReceiver(token) => { @@ -227,28 +240,28 @@ impl<T: Send> Packet<T> { // Wait for the buffer to have something in it. No need for a while loop // because we're the only receiver. let mut waited = false; - if !state.disconnected && state.buf.size() == 0 { - wait(&mut state.blocker, BlockedReceiver, &self.lock); + if !guard.disconnected && guard.buf.size() == 0 { + wait(&mut guard.blocker, BlockedReceiver, &self.lock); waited = true; } - if state.disconnected && state.buf.size() == 0 { return Err(()) } + if guard.disconnected && guard.buf.size() == 0 { return Err(()) } // Pick up the data, wake up our neighbors, and carry on - assert!(state.buf.size() > 0); - let ret = state.buf.dequeue(); + assert!(guard.buf.size() > 0); + let ret = guard.buf.dequeue(); self.wakeup_senders(waited, guard, state); return Ok(ret); } pub fn try_recv(&self) -> Result<T, Failure> { - let (guard, state) = self.lock(); + let guard = self.lock(); // Easy cases first - if state.disconnected { return Err(Disconnected) } - if state.buf.size() == 0 { return Err(Empty) } + if guard.disconnected { return Err(Disconnected) } + if guard.buf.size() == 0 { return Err(Empty) } // Be sure to wake up neighbors - let ret = Ok(state.buf.dequeue()); + let ret = Ok(guard.buf.dequeue()); self.wakeup_senders(false, guard, state); return ret; @@ -265,8 +278,8 @@ impl<T: Send> Packet<T> { // If this is a no-buffer channel (cap == 0), then if we didn't wait we // need to ACK the sender. If we waited, then the sender waking us up // was already the ACK. - let pending_sender2 = if state.cap == 0 && !waited { - match mem::replace(&mut state.blocker, NoneBlocked) { + let pending_sender2 = if guard.cap == 0 && !waited { + match mem::replace(&mut guard.blocker, NoneBlocked) { NoneBlocked => None, BlockedReceiver(..) => unreachable!(), BlockedSender(token) => { @@ -277,7 +290,7 @@ impl<T: Send> Packet<T> { } else { None }; - mem::drop((state, guard)); + mem::drop(guard); // only outside of the lock do we wake up the pending tasks pending_sender1.map(|t| t.signal()); @@ -298,10 +311,10 @@ impl<T: Send> Packet<T> { } // Not much to do other than wake up a receiver if one's there - let (guard, state) = self.lock(); - if state.disconnected { return } - state.disconnected = true; - match mem::replace(&mut state.blocker, NoneBlocked) { + let guard = self.lock(); + if guard.disconnected { return } + guard.disconnected = true; + match mem::replace(&mut guard.blocker, NoneBlocked) { NoneBlocked => {} BlockedSender(..) => unreachable!(), BlockedReceiver(token) => wakeup(token, guard), @@ -309,27 +322,27 @@ impl<T: Send> Packet<T> { } pub fn drop_port(&self) { - let (guard, state) = self.lock(); + let guard = self.lock(); - if state.disconnected { return } - state.disconnected = true; + if guard.disconnected { return } + guard.disconnected = true; // If the capacity is 0, then the sender may want its data back after // we're disconnected. Otherwise it's now our responsibility to destroy // the buffered data. As with many other portions of this code, this // needs to be careful to destroy the data *outside* of the lock to // prevent deadlock. - let _data = if state.cap != 0 { - mem::replace(&mut state.buf.buf, Vec::new()) + let _data = if guard.cap != 0 { + mem::replace(&mut guard.buf.buf, Vec::new()) } else { Vec::new() }; - let mut queue = mem::replace(&mut state.queue, Queue { + let mut queue = mem::replace(&mut guard.queue, Queue { head: 0 as *mut Node, tail: 0 as *mut Node, }); - let waiter = match mem::replace(&mut state.blocker, NoneBlocked) { + let waiter = match mem::replace(&mut guard.blocker, NoneBlocked) { NoneBlocked => None, BlockedSender(token) => { *guard.canceled.take().unwrap() = true; @@ -337,7 +350,7 @@ impl<T: Send> Packet<T> { } BlockedReceiver(..) => unreachable!(), }; - mem::drop((state, guard)); + mem::drop(guard); loop { match queue.dequeue() { @@ -355,8 +368,8 @@ impl<T: Send> Packet<T> { // If Ok, the value is whether this port has data, if Err, then the upgraded // port needs to be checked instead of this one. pub fn can_recv(&self) -> bool { - let (_g, state) = self.lock(); - state.disconnected || state.buf.size() > 0 + let guard = self.lock(); + guard.disconnected || guard.buf.size() > 0 } // Attempts to start selection on this port. This can either succeed or fail @@ -380,8 +393,8 @@ impl<T: Send> Packet<T> { // // The return value indicates whether there's data on this port. pub fn abort_selection(&self) -> bool { - let (_g, state) = self.lock(); - match mem::replace(&mut state.blocker, NoneBlocked) { + let guard = self.lock(); + match mem::replace(&mut guard.blocker, NoneBlocked) { NoneBlocked => true, BlockedSender(token) => { guard.blocker = BlockedSender(token); @@ -396,9 +409,9 @@ impl<T: Send> Packet<T> { impl<T: Send> Drop for Packet<T> { fn drop(&mut self) { assert_eq!(self.channels.load(atomic::SeqCst), 0); - let (_g, state) = self.lock(); - assert!(state.queue.dequeue().is_none()); - assert!(state.canceled.is_none()); + let guard = self.lock(); + assert!(guard.queue.dequeue().is_none()); + assert!(guard.canceled.is_none()); } } |
