diff options
| author | Aaron Turon <aturon@mozilla.com> | 2014-12-06 18:34:37 -0800 |
|---|---|---|
| committer | Aaron Turon <aturon@mozilla.com> | 2014-12-18 23:31:51 -0800 |
| commit | 43ae4b3301cc0605839778ecf59effb32b752e33 (patch) | |
| tree | aa111f5adc1eaa1e996847e1437d1b1b40821ce0 /src/libstd/comm | |
| parent | 14c1a103bc3f78721df1dc860a75a477c8275e3a (diff) | |
| download | rust-43ae4b3301cc0605839778ecf59effb32b752e33.tar.gz rust-43ae4b3301cc0605839778ecf59effb32b752e33.zip | |
Fallout from new thread API
Diffstat (limited to 'src/libstd/comm')
| -rw-r--r-- | src/libstd/comm/blocking.rs | 6 | ||||
| -rw-r--r-- | src/libstd/comm/mod.rs | 121 | ||||
| -rw-r--r-- | src/libstd/comm/oneshot.rs | 7 | ||||
| -rw-r--r-- | src/libstd/comm/select.rs | 23 | ||||
| -rw-r--r-- | src/libstd/comm/shared.rs | 2 | ||||
| -rw-r--r-- | src/libstd/comm/stream.rs | 5 | ||||
| -rw-r--r-- | src/libstd/comm/sync.rs | 40 |
7 files changed, 69 insertions, 135 deletions
diff --git a/src/libstd/comm/blocking.rs b/src/libstd/comm/blocking.rs index 5e9a01d0151..bb097265756 100644 --- a/src/libstd/comm/blocking.rs +++ b/src/libstd/comm/blocking.rs @@ -32,7 +32,7 @@ pub struct WaitToken { no_send: NoSend, } -fn token() -> (WaitToken, SignalToken) { +pub fn tokens() -> (WaitToken, SignalToken) { let inner = Arc::new(Inner { thread: Thread::current(), woken: INIT_ATOMIC_BOOL, @@ -48,7 +48,7 @@ fn token() -> (WaitToken, SignalToken) { } impl SignalToken { - fn signal(&self) -> bool { + pub fn signal(&self) -> bool { let wake = !self.inner.woken.compare_and_swap(false, true, Ordering::SeqCst); if wake { self.inner.thread.unpark(); @@ -73,7 +73,7 @@ impl SignalToken { } impl WaitToken { - fn wait(self) { + pub fn wait(self) { while !self.inner.woken.load(Ordering::SeqCst) { Thread::park() } diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index e5ec0078c5e..236a055b91e 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -317,8 +317,10 @@ use core::kinds::marker; use core::mem; use core::cell::UnsafeCell; -pub use comm::select::{Select, Handle}; -use comm::select::StartResult::*; +pub use self::select::{Select, Handle}; +use self::select::StartResult; +use self::select::StartResult::*; +use self::blocking::SignalToken; macro_rules! test { { fn $name:ident() $b:block $(#[$a:meta])*} => ( @@ -330,7 +332,7 @@ macro_rules! test { use comm::*; use super::*; - use task; + use thread::Thread; $(#[$a])* #[test] fn f() { $b } } @@ -593,12 +595,12 @@ impl<T: Send> Sender<T> { (a, ret) } oneshot::UpDisconnected => (a, Err(t)), - oneshot::UpWoke(task) => { - // This send cannot panic because the task is + oneshot::UpWoke(token) => { + // This send cannot panic because the thread is // asleep (we're looking at it), so the receiver // can't go away. (*a.get()).send(t).ok().unwrap(); - task.wake().map(|t| t.reawaken()); + token.signal(); (a, Ok(())) } } @@ -937,7 +939,7 @@ impl<T: Send> select::Packet for Receiver<T> { } } - fn start_selection(&self, mut token: SignalToken) -> bool { + fn start_selection(&self, mut token: SignalToken) -> StartResult { loop { let (t, new_port) = match *unsafe { self.inner() } { Oneshot(ref p) => { @@ -1240,11 +1242,11 @@ mod test { test! { fn oneshot_single_thread_recv_chan_close() { // Receiving on a closed chan will panic - let res = task::try(move|| { + let res = Thread::with_join(move|| { let (tx, rx) = channel::<int>(); drop(tx); rx.recv(); - }); + }).join(); // What is our res? assert!(res.is_err()); } } @@ -1312,9 +1314,9 @@ mod test { spawn(move|| { drop(tx); }); - let res = task::try(move|| { + let res = Thread::with_join(move|| { assert!(rx.recv() == box 10); - }); + }).join(); assert!(res.is_err()); } } @@ -1334,19 +1336,19 @@ mod test { spawn(move|| { drop(rx); }); - let _ = task::try(move|| { + let _ = Thread::with_join(move|| { tx.send(1); - }); + }).join(); } } } test! { fn oneshot_multi_thread_recv_close_stress() { for _ in range(0, stress_factor()) { let (tx, rx) = channel::<int>(); - spawn(move|| { - let res = task::try(move|| { + spawn(proc() { + let res = Thread::with_join(move|| { rx.recv(); - }); + }).join(); assert!(res.is_err()); }); spawn(move|| { @@ -1495,7 +1497,7 @@ mod test { tx2.send(()); }); // make sure the other task has gone to sleep - for _ in range(0u, 5000) { task::deschedule(); } + for _ in range(0u, 5000) { Thread::yield_now(); } // upgrade to a shared chan and send a message let t = tx.clone(); @@ -1504,45 +1506,7 @@ mod test { // wait for the child task to exit before we exit rx2.recv(); - } } - - test! { fn sends_off_the_runtime() { - use rt::thread::Thread; - - let (tx, rx) = channel(); - let t = Thread::start(move|| { - for _ in range(0u, 1000) { - tx.send(()); - } - }); - for _ in range(0u, 1000) { - rx.recv(); - } - t.join(); - } } - - test! { fn try_recvs_off_the_runtime() { - use rt::thread::Thread; - - let (tx, rx) = channel(); - let (cdone, pdone) = channel(); - let t = Thread::start(move|| { - let mut hits = 0u; - while hits < 10 { - match rx.try_recv() { - Ok(()) => { hits += 1; } - Err(Empty) => { Thread::yield_now(); } - Err(Disconnected) => return, - } - } - cdone.send(()); - }); - for _ in range(0u, 10) { - tx.send(()); - } - t.join(); - pdone.recv(); - } } + }) } #[cfg(test)] @@ -1700,11 +1664,11 @@ mod sync_tests { test! { fn oneshot_single_thread_recv_chan_close() { // Receiving on a closed chan will panic - let res = task::try(move|| { + let res = Thread::with_join(move|| { let (tx, rx) = sync_channel::<int>(0); drop(tx); rx.recv(); - }); + }).join(); // What is our res? assert!(res.is_err()); } } @@ -1777,9 +1741,9 @@ mod sync_tests { spawn(move|| { drop(tx); }); - let res = task::try(move|| { + let res = Thread::with_join(move|| { assert!(rx.recv() == box 10); - }); + }).join(); assert!(res.is_err()); } } @@ -1799,19 +1763,19 @@ mod sync_tests { spawn(move|| { drop(rx); }); - let _ = task::try(move|| { + let _ = Thread::with_join(move || { tx.send(1); - }); + }).join(); } } } test! { fn oneshot_multi_thread_recv_close_stress() { for _ in range(0, stress_factor()) { let (tx, rx) = sync_channel::<int>(0); - spawn(move|| { - let res = task::try(move|| { + spawn(proc() { + let res = Thread::with_join(move|| { rx.recv(); - }); + }).join(); assert!(res.is_err()); }); spawn(move|| { @@ -1960,7 +1924,7 @@ mod sync_tests { tx2.send(()); }); // make sure the other task has gone to sleep - for _ in range(0u, 5000) { task::deschedule(); } + for _ in range(0u, 5000) { Thread::yield_now(); } // upgrade to a shared chan and send a message let t = tx.clone(); @@ -1971,29 +1935,6 @@ mod sync_tests { rx2.recv(); } } - test! { fn try_recvs_off_the_runtime() { - use rt::thread::Thread; - - let (tx, rx) = sync_channel::<()>(0); - let (cdone, pdone) = channel(); - let t = Thread::start(move|| { - let mut hits = 0u; - while hits < 10 { - match rx.try_recv() { - Ok(()) => { hits += 1; } - Err(Empty) => { Thread::yield_now(); } - Err(Disconnected) => return, - } - } - cdone.send(()); - }); - for _ in range(0u, 10) { - tx.send(()); - } - t.join(); - pdone.recv(); - } } - test! { fn send_opt1() { let (tx, rx) = sync_channel::<int>(0); spawn(move|| { rx.recv(); }); @@ -2052,7 +1993,7 @@ mod sync_tests { test! { fn try_send4() { let (tx, rx) = sync_channel::<int>(0); spawn(move|| { - for _ in range(0u, 1000) { task::deschedule(); } + for _ in range(0u, 1000) { Thread::yield_now(); } assert_eq!(tx.try_send(1), Ok(())); }); assert_eq!(rx.recv(), 1); diff --git a/src/libstd/comm/oneshot.rs b/src/libstd/comm/oneshot.rs index 68f3f229cb4..9c5a6518845 100644 --- a/src/libstd/comm/oneshot.rs +++ b/src/libstd/comm/oneshot.rs @@ -39,9 +39,8 @@ use self::MyUpgrade::*; use core::prelude::*; -use alloc::boxed::Box; use comm::Receiver; -use comm::blocking::{mod, WaitToken, SignalToken}; +use comm::blocking::{mod, SignalToken}; use core::mem; use sync::atomic; @@ -143,7 +142,7 @@ impl<T: Send> Packet<T> { // Attempt to not block the task (it's a little expensive). If it looks // like we're not empty, then immediately go through to `try_recv`. if self.state.load(atomic::SeqCst) == EMPTY { - let (wait_token, signal_token) = blocking::token(); + let (wait_token, signal_token) = blocking::tokens(); let ptr = unsafe { signal_token.cast_to_uint() }; // race with senders to enter the blocking state @@ -332,7 +331,7 @@ impl<T: Send> Packet<T> { // If we've got a blocked task, then use an atomic to gain ownership // of it (may fail) - BLOCKED => self.state.compare_and_swap(BLOCKED, EMPTY, atomic::SeqCst) + ptr => self.state.compare_and_swap(ptr, EMPTY, atomic::SeqCst) }; // Now that we've got ownership of our state, figure out what to do diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs index 536d38c6e55..690b5861c22 100644 --- a/src/libstd/comm/select.rs +++ b/src/libstd/comm/select.rs @@ -54,7 +54,6 @@ use core::prelude::*; -use alloc::boxed::Box; use core::cell::Cell; use core::kinds::marker; use core::mem; @@ -63,8 +62,6 @@ use core::uint; use comm::Receiver; use comm::blocking::{mod, SignalToken}; -use self::StartResult::*; - /// The "receiver set" of the select interface. This structure is used to manage /// a set of receivers which are being selected over. pub struct Select { @@ -190,8 +187,8 @@ impl Select { let (wait_token, signal_token) = blocking::tokens(); for (i, handle) in self.iter().enumerate() { match (*handle).packet.start_selection(signal_token.clone()) { - Installed => {} - Abort => { + StartResult::Installed => {} + StartResult::Abort => { // Go back and abort the already-begun selections for handle in self.iter().take(i) { (*handle).packet.abort_selection(); @@ -417,10 +414,10 @@ mod test { let (tx3, rx3) = channel::<int>(); spawn(move|| { - for _ in range(0u, 20) { task::deschedule(); } + for _ in range(0u, 20) { Thread::yield_now(); } tx1.send(1); rx3.recv(); - for _ in range(0u, 20) { task::deschedule(); } + for _ in range(0u, 20) { Thread::yield_now(); } }); select! { @@ -440,7 +437,7 @@ mod test { let (tx3, rx3) = channel::<()>(); spawn(move|| { - for _ in range(0u, 20) { task::deschedule(); } + for _ in range(0u, 20) { Thread::yield_now(); } tx1.send(1); tx2.send(2); rx3.recv(); @@ -541,7 +538,7 @@ mod test { tx3.send(()); }); - for _ in range(0u, 1000) { task::deschedule(); } + for _ in range(0u, 1000) { Thread::yield_now(); } drop(tx1.clone()); tx2.send(()); rx3.recv(); @@ -644,7 +641,7 @@ mod test { tx2.send(()); }); - for _ in range(0u, 100) { task::deschedule() } + for _ in range(0u, 100) { Thread::yield_now() } tx1.send(()); rx2.recv(); } } @@ -663,7 +660,7 @@ mod test { tx2.send(()); }); - for _ in range(0u, 100) { task::deschedule() } + for _ in range(0u, 100) { Thread::yield_now() } tx1.send(()); rx2.recv(); } } @@ -681,7 +678,7 @@ mod test { tx2.send(()); }); - for _ in range(0u, 100) { task::deschedule() } + for _ in range(0u, 100) { Thread::yield_now() } tx1.send(()); rx2.recv(); } } @@ -697,7 +694,7 @@ mod test { test! { fn sync2() { let (tx, rx) = sync_channel::<int>(0); spawn(move|| { - for _ in range(0u, 100) { task::deschedule() } + for _ in range(0u, 100) { Thread::yield_now() } tx.send(1); }); select! { diff --git a/src/libstd/comm/shared.rs b/src/libstd/comm/shared.rs index 1f1ea2ca9a1..1022694e634 100644 --- a/src/libstd/comm/shared.rs +++ b/src/libstd/comm/shared.rs @@ -22,7 +22,6 @@ pub use self::Failure::*; use core::prelude::*; -use alloc::boxed::Box; use core::cmp; use core::int; @@ -31,6 +30,7 @@ use comm::mpsc_queue as mpsc; use comm::blocking::{mod, SignalToken}; use comm::select::StartResult; use comm::select::StartResult::*; +use thread::Thread; const DISCONNECTED: int = int::MIN; const FUDGE: int = 1024; diff --git a/src/libstd/comm/stream.rs b/src/libstd/comm/stream.rs index a15366d5ebc..b68f626060e 100644 --- a/src/libstd/comm/stream.rs +++ b/src/libstd/comm/stream.rs @@ -24,7 +24,6 @@ use self::Message::*; use core::prelude::*; -use alloc::boxed::Box; use core::cmp; use core::int; use thread::Thread; @@ -32,7 +31,7 @@ use thread::Thread; use sync::atomic; use comm::spsc_queue as spsc; use comm::Receiver; -use comm::blocking::{mod, WaitToken, SignalToken}; +use comm::blocking::{mod, SignalToken}; const DISCONNECTED: int = int::MIN; #[cfg(test)] @@ -147,7 +146,7 @@ impl<T: Send> Packet<T> { let ptr = self.to_wake.load(atomic::SeqCst); self.to_wake.store(0, atomic::SeqCst); assert!(ptr != 0); - unsafe { SignaToken::cast_from_uint(ptr) } + unsafe { SignalToken::cast_from_uint(ptr) } } // Decrements the count on the channel for a sleeper, returning the sleeper diff --git a/src/libstd/comm/sync.rs b/src/libstd/comm/sync.rs index 9e4bdb15b00..b24c6d21fba 100644 --- a/src/libstd/comm/sync.rs +++ b/src/libstd/comm/sync.rs @@ -38,10 +38,8 @@ use core::prelude::*; pub use self::Failure::*; use self::Blocker::*; -use alloc::boxed::Box; use vec::Vec; use core::mem; -use core::cell::UnsafeCell; use sync::{atomic, Mutex, MutexGuard}; use comm::blocking::{mod, WaitToken, SignalToken}; @@ -105,10 +103,10 @@ pub enum Failure { /// Atomically blocks the current thread, placing it into `slot`, unlocking `lock` /// in the meantime. This re-locks the mutex upon returning. -fn wait<'a, 'b, T>(lock: &'a Mutex<State<T>>, - guard: MutexGuard<'b, State<T>>, - f: fn(BlockedTask) -> Blocker) - -> MutexGuard<'a, State<T>> +fn wait<'a, 'b, T: Send>(lock: &'a Mutex<State<T>>, + mut guard: MutexGuard<'b, State<T>>, + f: fn(SignalToken) -> Blocker) + -> MutexGuard<'a, State<T>> { let me: Box<Task> = Local::take(); me.deschedule(1, |task| { @@ -170,7 +168,7 @@ impl<T: Send> Packet<T> { } pub fn send(&self, t: T) -> Result<(), T> { - let guard = self.acquire_send_slot(); + let mut guard = self.acquire_send_slot(); if guard.disconnected { return Err(t) } guard.buf.enqueue(t); @@ -183,7 +181,7 @@ impl<T: Send> Packet<T> { let mut canceled = false; assert!(guard.canceled.is_none()); guard.canceled = Some(unsafe { mem::transmute(&mut canceled) }); - let guard = wait(&self.lock, guard, BlockedSender); + let mut guard = wait(&self.lock, guard, BlockedSender); if canceled {Err(guard.buf.dequeue())} else {Ok(())} } @@ -198,7 +196,7 @@ impl<T: Send> Packet<T> { } pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> { - let guard = self.lock.lock(); + let mut guard = self.lock.lock(); if guard.disconnected { Err(super::RecvDisconnected(t)) } else if guard.buf.size() == guard.buf.cap() { @@ -235,13 +233,13 @@ impl<T: Send> Packet<T> { // When reading this, remember that there can only ever be one receiver at // time. pub fn recv(&self) -> Result<T, ()> { - let guard = self.lock.lock(); + let mut guard = self.lock.lock(); // Wait for the buffer to have something in it. No need for a while loop // because we're the only receiver. let mut waited = false; if !guard.disconnected && guard.buf.size() == 0 { - wait(&mut guard.blocker, BlockedReceiver, &self.lock); + guard = wait(&self.lock, guard, BlockedReceiver); waited = true; } if guard.disconnected && guard.buf.size() == 0 { return Err(()) } @@ -249,12 +247,12 @@ impl<T: Send> Packet<T> { // Pick up the data, wake up our neighbors, and carry on assert!(guard.buf.size() > 0); let ret = guard.buf.dequeue(); - self.wakeup_senders(waited, guard, state); + self.wakeup_senders(waited, guard); return Ok(ret); } pub fn try_recv(&self) -> Result<T, Failure> { - let guard = self.lock(); + let mut guard = self.lock.lock(); // Easy cases first if guard.disconnected { return Err(Disconnected) } @@ -262,7 +260,7 @@ impl<T: Send> Packet<T> { // Be sure to wake up neighbors let ret = Ok(guard.buf.dequeue()); - self.wakeup_senders(false, guard, state); + self.wakeup_senders(false, guard); return ret; } @@ -272,7 +270,7 @@ impl<T: Send> Packet<T> { // * `waited` - flag if the receiver blocked to receive some data, or if it // just picked up some data on the way out // * `guard` - the lock guard that is held over this channel's lock - fn wakeup_senders(&self, waited: bool, guard: MutexGuard<State<T>>) { + fn wakeup_senders(&self, waited: bool, mut guard: MutexGuard<State<T>>) { let pending_sender1: Option<SignalToken> = guard.queue.dequeue(); // If this is a no-buffer channel (cap == 0), then if we didn't wait we @@ -311,7 +309,7 @@ impl<T: Send> Packet<T> { } // Not much to do other than wake up a receiver if one's there - let guard = self.lock(); + let mut guard = self.lock.lock(); if guard.disconnected { return } guard.disconnected = true; match mem::replace(&mut guard.blocker, NoneBlocked) { @@ -322,7 +320,7 @@ impl<T: Send> Packet<T> { } pub fn drop_port(&self) { - let guard = self.lock(); + let mut guard = self.lock.lock(); if guard.disconnected { return } guard.disconnected = true; @@ -368,14 +366,14 @@ impl<T: Send> Packet<T> { // If Ok, the value is whether this port has data, if Err, then the upgraded // port needs to be checked instead of this one. pub fn can_recv(&self) -> bool { - let guard = self.lock(); + let guard = self.lock.lock(); guard.disconnected || guard.buf.size() > 0 } // Attempts to start selection on this port. This can either succeed or fail // because there is data waiting. pub fn start_selection(&self, token: SignalToken) -> StartResult { - let guard = self.lock(); + let mut guard = self.lock.lock(); if guard.disconnected || guard.buf.size() > 0 { Abort } else { @@ -393,7 +391,7 @@ impl<T: Send> Packet<T> { // // The return value indicates whether there's data on this port. pub fn abort_selection(&self) -> bool { - let guard = self.lock(); + let mut guard = self.lock.lock(); match mem::replace(&mut guard.blocker, NoneBlocked) { NoneBlocked => true, BlockedSender(token) => { @@ -409,7 +407,7 @@ impl<T: Send> Packet<T> { impl<T: Send> Drop for Packet<T> { fn drop(&mut self) { assert_eq!(self.channels.load(atomic::SeqCst), 0); - let guard = self.lock(); + let mut guard = self.lock.lock(); assert!(guard.queue.dequeue().is_none()); assert!(guard.canceled.is_none()); } |
