about summary refs log tree commit diff
path: root/src/libstd/comm
diff options
context:
space:
mode:
authorAaron Turon <aturon@mozilla.com>2014-12-06 18:34:37 -0800
committerAaron Turon <aturon@mozilla.com>2014-12-18 23:31:51 -0800
commit43ae4b3301cc0605839778ecf59effb32b752e33 (patch)
treeaa111f5adc1eaa1e996847e1437d1b1b40821ce0 /src/libstd/comm
parent14c1a103bc3f78721df1dc860a75a477c8275e3a (diff)
downloadrust-43ae4b3301cc0605839778ecf59effb32b752e33.tar.gz
rust-43ae4b3301cc0605839778ecf59effb32b752e33.zip
Fallout from new thread API
Diffstat (limited to 'src/libstd/comm')
-rw-r--r--src/libstd/comm/blocking.rs6
-rw-r--r--src/libstd/comm/mod.rs121
-rw-r--r--src/libstd/comm/oneshot.rs7
-rw-r--r--src/libstd/comm/select.rs23
-rw-r--r--src/libstd/comm/shared.rs2
-rw-r--r--src/libstd/comm/stream.rs5
-rw-r--r--src/libstd/comm/sync.rs40
7 files changed, 69 insertions, 135 deletions
diff --git a/src/libstd/comm/blocking.rs b/src/libstd/comm/blocking.rs
index 5e9a01d0151..bb097265756 100644
--- a/src/libstd/comm/blocking.rs
+++ b/src/libstd/comm/blocking.rs
@@ -32,7 +32,7 @@ pub struct WaitToken {
     no_send: NoSend,
 }
 
-fn token() -> (WaitToken, SignalToken) {
+pub fn tokens() -> (WaitToken, SignalToken) {
     let inner = Arc::new(Inner {
         thread: Thread::current(),
         woken: INIT_ATOMIC_BOOL,
@@ -48,7 +48,7 @@ fn token() -> (WaitToken, SignalToken) {
 }
 
 impl SignalToken {
-    fn signal(&self) -> bool {
+    pub fn signal(&self) -> bool {
         let wake = !self.inner.woken.compare_and_swap(false, true, Ordering::SeqCst);
         if wake {
             self.inner.thread.unpark();
@@ -73,7 +73,7 @@ impl SignalToken {
 }
 
 impl WaitToken {
-    fn wait(self) {
+    pub fn wait(self) {
         while !self.inner.woken.load(Ordering::SeqCst) {
             Thread::park()
         }
diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs
index e5ec0078c5e..236a055b91e 100644
--- a/src/libstd/comm/mod.rs
+++ b/src/libstd/comm/mod.rs
@@ -317,8 +317,10 @@ use core::kinds::marker;
 use core::mem;
 use core::cell::UnsafeCell;
 
-pub use comm::select::{Select, Handle};
-use comm::select::StartResult::*;
+pub use self::select::{Select, Handle};
+use self::select::StartResult;
+use self::select::StartResult::*;
+use self::blocking::SignalToken;
 
 macro_rules! test {
     { fn $name:ident() $b:block $(#[$a:meta])*} => (
@@ -330,7 +332,7 @@ macro_rules! test {
 
             use comm::*;
             use super::*;
-            use task;
+            use thread::Thread;
 
             $(#[$a])* #[test] fn f() { $b }
         }
@@ -593,12 +595,12 @@ impl<T: Send> Sender<T> {
                                 (a, ret)
                             }
                             oneshot::UpDisconnected => (a, Err(t)),
-                            oneshot::UpWoke(task) => {
-                                // This send cannot panic because the task is
+                            oneshot::UpWoke(token) => {
+                                // This send cannot panic because the thread is
                                 // asleep (we're looking at it), so the receiver
                                 // can't go away.
                                 (*a.get()).send(t).ok().unwrap();
-                                task.wake().map(|t| t.reawaken());
+                                token.signal();
                                 (a, Ok(()))
                             }
                         }
@@ -937,7 +939,7 @@ impl<T: Send> select::Packet for Receiver<T> {
         }
     }
 
-    fn start_selection(&self, mut token: SignalToken) -> bool {
+    fn start_selection(&self, mut token: SignalToken) -> StartResult {
         loop {
             let (t, new_port) = match *unsafe { self.inner() } {
                 Oneshot(ref p) => {
@@ -1240,11 +1242,11 @@ mod test {
 
     test! { fn oneshot_single_thread_recv_chan_close() {
         // Receiving on a closed chan will panic
-        let res = task::try(move|| {
+        let res = Thread::with_join(move|| {
             let (tx, rx) = channel::<int>();
             drop(tx);
             rx.recv();
-        });
+        }).join();
         // What is our res?
         assert!(res.is_err());
     } }
@@ -1312,9 +1314,9 @@ mod test {
         spawn(move|| {
             drop(tx);
         });
-        let res = task::try(move|| {
+        let res = Thread::with_join(move|| {
             assert!(rx.recv() == box 10);
-        });
+        }).join();
         assert!(res.is_err());
     } }
 
@@ -1334,19 +1336,19 @@ mod test {
             spawn(move|| {
                 drop(rx);
             });
-            let _ = task::try(move|| {
+            let _ = Thread::with_join(move|| {
                 tx.send(1);
-            });
+            }).join();
         }
     } }
 
     test! { fn oneshot_multi_thread_recv_close_stress() {
         for _ in range(0, stress_factor()) {
             let (tx, rx) = channel::<int>();
-            spawn(move|| {
-                let res = task::try(move|| {
+            spawn(proc() {
+                let res = Thread::with_join(move|| {
                     rx.recv();
-                });
+                }).join();
                 assert!(res.is_err());
             });
             spawn(move|| {
@@ -1495,7 +1497,7 @@ mod test {
             tx2.send(());
         });
         // make sure the other task has gone to sleep
-        for _ in range(0u, 5000) { task::deschedule(); }
+        for _ in range(0u, 5000) { Thread::yield_now(); }
 
         // upgrade to a shared chan and send a message
         let t = tx.clone();
@@ -1504,45 +1506,7 @@ mod test {
 
         // wait for the child task to exit before we exit
         rx2.recv();
-    } }
-
-    test! { fn sends_off_the_runtime() {
-        use rt::thread::Thread;
-
-        let (tx, rx) = channel();
-        let t = Thread::start(move|| {
-            for _ in range(0u, 1000) {
-                tx.send(());
-            }
-        });
-        for _ in range(0u, 1000) {
-            rx.recv();
-        }
-        t.join();
-    } }
-
-    test! { fn try_recvs_off_the_runtime() {
-        use rt::thread::Thread;
-
-        let (tx, rx) = channel();
-        let (cdone, pdone) = channel();
-        let t = Thread::start(move|| {
-            let mut hits = 0u;
-            while hits < 10 {
-                match rx.try_recv() {
-                    Ok(()) => { hits += 1; }
-                    Err(Empty) => { Thread::yield_now(); }
-                    Err(Disconnected) => return,
-                }
-            }
-            cdone.send(());
-        });
-        for _ in range(0u, 10) {
-            tx.send(());
-        }
-        t.join();
-        pdone.recv();
-    } }
+    })
 }
 
 #[cfg(test)]
@@ -1700,11 +1664,11 @@ mod sync_tests {
 
     test! { fn oneshot_single_thread_recv_chan_close() {
         // Receiving on a closed chan will panic
-        let res = task::try(move|| {
+        let res = Thread::with_join(move|| {
             let (tx, rx) = sync_channel::<int>(0);
             drop(tx);
             rx.recv();
-        });
+        }).join();
         // What is our res?
         assert!(res.is_err());
     } }
@@ -1777,9 +1741,9 @@ mod sync_tests {
         spawn(move|| {
             drop(tx);
         });
-        let res = task::try(move|| {
+        let res = Thread::with_join(move|| {
             assert!(rx.recv() == box 10);
-        });
+        }).join();
         assert!(res.is_err());
     } }
 
@@ -1799,19 +1763,19 @@ mod sync_tests {
             spawn(move|| {
                 drop(rx);
             });
-            let _ = task::try(move|| {
+            let _ = Thread::with_join(move || {
                 tx.send(1);
-            });
+            }).join();
         }
     } }
 
     test! { fn oneshot_multi_thread_recv_close_stress() {
         for _ in range(0, stress_factor()) {
             let (tx, rx) = sync_channel::<int>(0);
-            spawn(move|| {
-                let res = task::try(move|| {
+            spawn(proc() {
+                let res = Thread::with_join(move|| {
                     rx.recv();
-                });
+                }).join();
                 assert!(res.is_err());
             });
             spawn(move|| {
@@ -1960,7 +1924,7 @@ mod sync_tests {
             tx2.send(());
         });
         // make sure the other task has gone to sleep
-        for _ in range(0u, 5000) { task::deschedule(); }
+        for _ in range(0u, 5000) { Thread::yield_now(); }
 
         // upgrade to a shared chan and send a message
         let t = tx.clone();
@@ -1971,29 +1935,6 @@ mod sync_tests {
         rx2.recv();
     } }
 
-    test! { fn try_recvs_off_the_runtime() {
-        use rt::thread::Thread;
-
-        let (tx, rx) = sync_channel::<()>(0);
-        let (cdone, pdone) = channel();
-        let t = Thread::start(move|| {
-            let mut hits = 0u;
-            while hits < 10 {
-                match rx.try_recv() {
-                    Ok(()) => { hits += 1; }
-                    Err(Empty) => { Thread::yield_now(); }
-                    Err(Disconnected) => return,
-                }
-            }
-            cdone.send(());
-        });
-        for _ in range(0u, 10) {
-            tx.send(());
-        }
-        t.join();
-        pdone.recv();
-    } }
-
     test! { fn send_opt1() {
         let (tx, rx) = sync_channel::<int>(0);
         spawn(move|| { rx.recv(); });
@@ -2052,7 +1993,7 @@ mod sync_tests {
     test! { fn try_send4() {
         let (tx, rx) = sync_channel::<int>(0);
         spawn(move|| {
-            for _ in range(0u, 1000) { task::deschedule(); }
+            for _ in range(0u, 1000) { Thread::yield_now(); }
             assert_eq!(tx.try_send(1), Ok(()));
         });
         assert_eq!(rx.recv(), 1);
diff --git a/src/libstd/comm/oneshot.rs b/src/libstd/comm/oneshot.rs
index 68f3f229cb4..9c5a6518845 100644
--- a/src/libstd/comm/oneshot.rs
+++ b/src/libstd/comm/oneshot.rs
@@ -39,9 +39,8 @@ use self::MyUpgrade::*;
 
 use core::prelude::*;
 
-use alloc::boxed::Box;
 use comm::Receiver;
-use comm::blocking::{mod, WaitToken, SignalToken};
+use comm::blocking::{mod, SignalToken};
 use core::mem;
 use sync::atomic;
 
@@ -143,7 +142,7 @@ impl<T: Send> Packet<T> {
         // Attempt to not block the task (it's a little expensive). If it looks
         // like we're not empty, then immediately go through to `try_recv`.
         if self.state.load(atomic::SeqCst) == EMPTY {
-            let (wait_token, signal_token) = blocking::token();
+            let (wait_token, signal_token) = blocking::tokens();
             let ptr = unsafe { signal_token.cast_to_uint() };
 
             // race with senders to enter the blocking state
@@ -332,7 +331,7 @@ impl<T: Send> Packet<T> {
 
             // If we've got a blocked task, then use an atomic to gain ownership
             // of it (may fail)
-            BLOCKED => self.state.compare_and_swap(BLOCKED, EMPTY, atomic::SeqCst)
+            ptr => self.state.compare_and_swap(ptr, EMPTY, atomic::SeqCst)
         };
 
         // Now that we've got ownership of our state, figure out what to do
diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs
index 536d38c6e55..690b5861c22 100644
--- a/src/libstd/comm/select.rs
+++ b/src/libstd/comm/select.rs
@@ -54,7 +54,6 @@
 
 use core::prelude::*;
 
-use alloc::boxed::Box;
 use core::cell::Cell;
 use core::kinds::marker;
 use core::mem;
@@ -63,8 +62,6 @@ use core::uint;
 use comm::Receiver;
 use comm::blocking::{mod, SignalToken};
 
-use self::StartResult::*;
-
 /// The "receiver set" of the select interface. This structure is used to manage
 /// a set of receivers which are being selected over.
 pub struct Select {
@@ -190,8 +187,8 @@ impl Select {
             let (wait_token, signal_token) = blocking::tokens();
             for (i, handle) in self.iter().enumerate() {
                 match (*handle).packet.start_selection(signal_token.clone()) {
-                    Installed => {}
-                    Abort => {
+                    StartResult::Installed => {}
+                    StartResult::Abort => {
                         // Go back and abort the already-begun selections
                         for handle in self.iter().take(i) {
                             (*handle).packet.abort_selection();
@@ -417,10 +414,10 @@ mod test {
         let (tx3, rx3) = channel::<int>();
 
         spawn(move|| {
-            for _ in range(0u, 20) { task::deschedule(); }
+            for _ in range(0u, 20) { Thread::yield_now(); }
             tx1.send(1);
             rx3.recv();
-            for _ in range(0u, 20) { task::deschedule(); }
+            for _ in range(0u, 20) { Thread::yield_now(); }
         });
 
         select! {
@@ -440,7 +437,7 @@ mod test {
         let (tx3, rx3) = channel::<()>();
 
         spawn(move|| {
-            for _ in range(0u, 20) { task::deschedule(); }
+            for _ in range(0u, 20) { Thread::yield_now(); }
             tx1.send(1);
             tx2.send(2);
             rx3.recv();
@@ -541,7 +538,7 @@ mod test {
             tx3.send(());
         });
 
-        for _ in range(0u, 1000) { task::deschedule(); }
+        for _ in range(0u, 1000) { Thread::yield_now(); }
         drop(tx1.clone());
         tx2.send(());
         rx3.recv();
@@ -644,7 +641,7 @@ mod test {
             tx2.send(());
         });
 
-        for _ in range(0u, 100) { task::deschedule() }
+        for _ in range(0u, 100) { Thread::yield_now() }
         tx1.send(());
         rx2.recv();
     } }
@@ -663,7 +660,7 @@ mod test {
             tx2.send(());
         });
 
-        for _ in range(0u, 100) { task::deschedule() }
+        for _ in range(0u, 100) { Thread::yield_now() }
         tx1.send(());
         rx2.recv();
     } }
@@ -681,7 +678,7 @@ mod test {
             tx2.send(());
         });
 
-        for _ in range(0u, 100) { task::deschedule() }
+        for _ in range(0u, 100) { Thread::yield_now() }
         tx1.send(());
         rx2.recv();
     } }
@@ -697,7 +694,7 @@ mod test {
     test! { fn sync2() {
         let (tx, rx) = sync_channel::<int>(0);
         spawn(move|| {
-            for _ in range(0u, 100) { task::deschedule() }
+            for _ in range(0u, 100) { Thread::yield_now() }
             tx.send(1);
         });
         select! {
diff --git a/src/libstd/comm/shared.rs b/src/libstd/comm/shared.rs
index 1f1ea2ca9a1..1022694e634 100644
--- a/src/libstd/comm/shared.rs
+++ b/src/libstd/comm/shared.rs
@@ -22,7 +22,6 @@ pub use self::Failure::*;
 
 use core::prelude::*;
 
-use alloc::boxed::Box;
 use core::cmp;
 use core::int;
 
@@ -31,6 +30,7 @@ use comm::mpsc_queue as mpsc;
 use comm::blocking::{mod, SignalToken};
 use comm::select::StartResult;
 use comm::select::StartResult::*;
+use thread::Thread;
 
 const DISCONNECTED: int = int::MIN;
 const FUDGE: int = 1024;
diff --git a/src/libstd/comm/stream.rs b/src/libstd/comm/stream.rs
index a15366d5ebc..b68f626060e 100644
--- a/src/libstd/comm/stream.rs
+++ b/src/libstd/comm/stream.rs
@@ -24,7 +24,6 @@ use self::Message::*;
 
 use core::prelude::*;
 
-use alloc::boxed::Box;
 use core::cmp;
 use core::int;
 use thread::Thread;
@@ -32,7 +31,7 @@ use thread::Thread;
 use sync::atomic;
 use comm::spsc_queue as spsc;
 use comm::Receiver;
-use comm::blocking::{mod, WaitToken, SignalToken};
+use comm::blocking::{mod, SignalToken};
 
 const DISCONNECTED: int = int::MIN;
 #[cfg(test)]
@@ -147,7 +146,7 @@ impl<T: Send> Packet<T> {
         let ptr = self.to_wake.load(atomic::SeqCst);
         self.to_wake.store(0, atomic::SeqCst);
         assert!(ptr != 0);
-        unsafe { SignaToken::cast_from_uint(ptr) }
+        unsafe { SignalToken::cast_from_uint(ptr) }
     }
 
     // Decrements the count on the channel for a sleeper, returning the sleeper
diff --git a/src/libstd/comm/sync.rs b/src/libstd/comm/sync.rs
index 9e4bdb15b00..b24c6d21fba 100644
--- a/src/libstd/comm/sync.rs
+++ b/src/libstd/comm/sync.rs
@@ -38,10 +38,8 @@ use core::prelude::*;
 pub use self::Failure::*;
 use self::Blocker::*;
 
-use alloc::boxed::Box;
 use vec::Vec;
 use core::mem;
-use core::cell::UnsafeCell;
 
 use sync::{atomic, Mutex, MutexGuard};
 use comm::blocking::{mod, WaitToken, SignalToken};
@@ -105,10 +103,10 @@ pub enum Failure {
 
 /// Atomically blocks the current thread, placing it into `slot`, unlocking `lock`
 /// in the meantime. This re-locks the mutex upon returning.
-fn wait<'a, 'b, T>(lock: &'a Mutex<State<T>>,
-                   guard: MutexGuard<'b, State<T>>,
-                   f: fn(BlockedTask) -> Blocker)
-                   -> MutexGuard<'a, State<T>>
+fn wait<'a, 'b, T: Send>(lock: &'a Mutex<State<T>>,
+                         mut guard: MutexGuard<'b, State<T>>,
+                         f: fn(SignalToken) -> Blocker)
+                         -> MutexGuard<'a, State<T>>
 {
     let me: Box<Task> = Local::take();
     me.deschedule(1, |task| {
@@ -170,7 +168,7 @@ impl<T: Send> Packet<T> {
     }
 
     pub fn send(&self, t: T) -> Result<(), T> {
-        let guard = self.acquire_send_slot();
+        let mut guard = self.acquire_send_slot();
         if guard.disconnected { return Err(t) }
         guard.buf.enqueue(t);
 
@@ -183,7 +181,7 @@ impl<T: Send> Packet<T> {
                 let mut canceled = false;
                 assert!(guard.canceled.is_none());
                 guard.canceled = Some(unsafe { mem::transmute(&mut canceled) });
-                let guard = wait(&self.lock, guard, BlockedSender);
+                let mut guard = wait(&self.lock, guard, BlockedSender);
                 if canceled {Err(guard.buf.dequeue())} else {Ok(())}
             }
 
@@ -198,7 +196,7 @@ impl<T: Send> Packet<T> {
     }
 
     pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> {
-        let guard = self.lock.lock();
+        let mut guard = self.lock.lock();
         if guard.disconnected {
             Err(super::RecvDisconnected(t))
         } else if guard.buf.size() == guard.buf.cap() {
@@ -235,13 +233,13 @@ impl<T: Send> Packet<T> {
     // When reading this, remember that there can only ever be one receiver at
     // time.
     pub fn recv(&self) -> Result<T, ()> {
-        let guard = self.lock.lock();
+        let mut guard = self.lock.lock();
 
         // Wait for the buffer to have something in it. No need for a while loop
         // because we're the only receiver.
         let mut waited = false;
         if !guard.disconnected && guard.buf.size() == 0 {
-            wait(&mut guard.blocker, BlockedReceiver, &self.lock);
+            guard = wait(&self.lock, guard, BlockedReceiver);
             waited = true;
         }
         if guard.disconnected && guard.buf.size() == 0 { return Err(()) }
@@ -249,12 +247,12 @@ impl<T: Send> Packet<T> {
         // Pick up the data, wake up our neighbors, and carry on
         assert!(guard.buf.size() > 0);
         let ret = guard.buf.dequeue();
-        self.wakeup_senders(waited, guard, state);
+        self.wakeup_senders(waited, guard);
         return Ok(ret);
     }
 
     pub fn try_recv(&self) -> Result<T, Failure> {
-        let guard = self.lock();
+        let mut guard = self.lock.lock();
 
         // Easy cases first
         if guard.disconnected { return Err(Disconnected) }
@@ -262,7 +260,7 @@ impl<T: Send> Packet<T> {
 
         // Be sure to wake up neighbors
         let ret = Ok(guard.buf.dequeue());
-        self.wakeup_senders(false, guard, state);
+        self.wakeup_senders(false, guard);
 
         return ret;
     }
@@ -272,7 +270,7 @@ impl<T: Send> Packet<T> {
     // * `waited` - flag if the receiver blocked to receive some data, or if it
     //              just picked up some data on the way out
     // * `guard` - the lock guard that is held over this channel's lock
-    fn wakeup_senders(&self, waited: bool, guard: MutexGuard<State<T>>) {
+    fn wakeup_senders(&self, waited: bool, mut guard: MutexGuard<State<T>>) {
         let pending_sender1: Option<SignalToken> = guard.queue.dequeue();
 
         // If this is a no-buffer channel (cap == 0), then if we didn't wait we
@@ -311,7 +309,7 @@ impl<T: Send> Packet<T> {
         }
 
         // Not much to do other than wake up a receiver if one's there
-        let guard = self.lock();
+        let mut guard = self.lock.lock();
         if guard.disconnected { return }
         guard.disconnected = true;
         match mem::replace(&mut guard.blocker, NoneBlocked) {
@@ -322,7 +320,7 @@ impl<T: Send> Packet<T> {
     }
 
     pub fn drop_port(&self) {
-        let guard = self.lock();
+        let mut guard = self.lock.lock();
 
         if guard.disconnected { return }
         guard.disconnected = true;
@@ -368,14 +366,14 @@ impl<T: Send> Packet<T> {
     // If Ok, the value is whether this port has data, if Err, then the upgraded
     // port needs to be checked instead of this one.
     pub fn can_recv(&self) -> bool {
-        let guard = self.lock();
+        let guard = self.lock.lock();
         guard.disconnected || guard.buf.size() > 0
     }
 
     // Attempts to start selection on this port. This can either succeed or fail
     // because there is data waiting.
     pub fn start_selection(&self, token: SignalToken) -> StartResult {
-        let guard = self.lock();
+        let mut guard = self.lock.lock();
         if guard.disconnected || guard.buf.size() > 0 {
             Abort
         } else {
@@ -393,7 +391,7 @@ impl<T: Send> Packet<T> {
     //
     // The return value indicates whether there's data on this port.
     pub fn abort_selection(&self) -> bool {
-        let guard = self.lock();
+        let mut guard = self.lock.lock();
         match mem::replace(&mut guard.blocker, NoneBlocked) {
             NoneBlocked => true,
             BlockedSender(token) => {
@@ -409,7 +407,7 @@ impl<T: Send> Packet<T> {
 impl<T: Send> Drop for Packet<T> {
     fn drop(&mut self) {
         assert_eq!(self.channels.load(atomic::SeqCst), 0);
-        let guard = self.lock();
+        let mut guard = self.lock.lock();
         assert!(guard.queue.dequeue().is_none());
         assert!(guard.canceled.is_none());
     }