diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2013-12-15 18:17:43 -0800 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2013-12-16 22:55:49 -0800 |
| commit | 39a6c9d6376f96cc8b905f00b88d20e42961206c (patch) | |
| tree | 9dde8eaa226ad53aa1d80233557a30811d913ef3 | |
| parent | 529e268ab900f1b6e731af64ce2aeecda3555f4e (diff) | |
| download | rust-39a6c9d6376f96cc8b905f00b88d20e42961206c.tar.gz rust-39a6c9d6376f96cc8b905f00b88d20e42961206c.zip | |
Test fallout from std::comm rewrite
| -rw-r--r-- | doc/tutorial-tasks.md | 46 | ||||
| -rw-r--r-- | src/libextra/arc.rs | 3 | ||||
| -rw-r--r-- | src/libextra/sync.rs | 2 | ||||
| -rw-r--r-- | src/librustuv/net.rs | 39 | ||||
| -rw-r--r-- | src/librustuv/pipe.rs | 5 | ||||
| -rw-r--r-- | src/librustuv/signal.rs | 4 | ||||
| -rw-r--r-- | src/librustuv/timer.rs | 42 | ||||
| -rw-r--r-- | src/libstd/comm/mod.rs | 7 | ||||
| -rw-r--r-- | src/libstd/comm/select.rs | 13 | ||||
| -rw-r--r-- | src/libstd/io/net/unix.rs | 4 | ||||
| -rw-r--r-- | src/libstd/rt/thread.rs | 14 | ||||
| -rw-r--r-- | src/libstd/task/spawn.rs | 2 | ||||
| -rw-r--r-- | src/test/bench/msgsend-pipes-shared.rs | 9 | ||||
| -rw-r--r-- | src/test/bench/msgsend-pipes.rs | 33 | ||||
| -rw-r--r-- | src/test/bench/rt-messaging-ping-pong.rs | 4 | ||||
| -rw-r--r-- | src/test/bench/rt-parfib.rs | 3 | ||||
| -rw-r--r-- | src/test/bench/shootout-chameneos-redux.rs | 9 | ||||
| -rw-r--r-- | src/test/bench/shootout-k-nucleotide-pipes.rs | 5 | ||||
| -rw-r--r-- | src/test/bench/shootout-pfib.rs | 7 | ||||
| -rw-r--r-- | src/test/bench/shootout-threadring.rs | 4 | ||||
| -rw-r--r-- | src/test/bench/task-perf-jargon-metal-smoke.rs | 4 | ||||
| -rw-r--r-- | src/test/run-pass/hashmap-memory.rs | 2 | ||||
| -rw-r--r-- | src/test/run-pass/task-comm-14.rs | 2 |
23 files changed, 140 insertions, 123 deletions
diff --git a/doc/tutorial-tasks.md b/doc/tutorial-tasks.md index 41cd796325c..6213a0cfe1c 100644 --- a/doc/tutorial-tasks.md +++ b/doc/tutorial-tasks.md @@ -121,7 +121,7 @@ receiving messages. Pipes are low-level communication building-blocks and so come in a variety of forms, each one appropriate for a different use case. In what follows, we cover the most commonly used varieties. -The simplest way to create a pipe is to use the `comm::stream` +The simplest way to create a pipe is to use `Chan::new` function to create a `(Port, Chan)` pair. In Rust parlance, a *channel* is a sending endpoint of a pipe, and a *port* is the receiving endpoint. Consider the following example of calculating two results @@ -129,9 +129,8 @@ concurrently: ~~~~ # use std::task::spawn; -# use std::comm::{stream, Port, Chan}; -let (port, chan): (Port<int>, Chan<int>) = stream(); +let (port, chan): (Port<int>, Chan<int>) = Chan::new(); do spawn || { let result = some_expensive_computation(); @@ -150,8 +149,7 @@ stream for sending and receiving integers (the left-hand side of the `let`, a tuple into its component parts). ~~~~ -# use std::comm::{stream, Chan, Port}; -let (port, chan): (Port<int>, Chan<int>) = stream(); +let (port, chan): (Port<int>, Chan<int>) = Chan::new(); ~~~~ The child task will use the channel to send data to the parent task, @@ -160,9 +158,8 @@ spawns the child task. ~~~~ # use std::task::spawn; -# use std::comm::stream; # fn some_expensive_computation() -> int { 42 } -# let (port, chan) = stream(); +# let (port, chan) = Chan::new(); do spawn || { let result = some_expensive_computation(); chan.send(result); @@ -180,25 +177,23 @@ computation, then waits for the child's result to arrive on the port: ~~~~ -# use std::comm::{stream}; # fn some_other_expensive_computation() {} -# let (port, chan) = stream::<int>(); +# let (port, chan) = Chan::<int>::new(); # chan.send(0); some_other_expensive_computation(); let result = port.recv(); ~~~~ -The `Port` and `Chan` pair created by `stream` enables efficient communication -between a single sender and a single receiver, but multiple senders cannot use -a single `Chan`, and multiple receivers cannot use a single `Port`. What if our -example needed to compute multiple results across a number of tasks? The -following program is ill-typed: +The `Port` and `Chan` pair created by `Chan::new` enables efficient +communication between a single sender and a single receiver, but multiple +senders cannot use a single `Chan`, and multiple receivers cannot use a single +`Port`. What if our example needed to compute multiple results across a number +of tasks? The following program is ill-typed: ~~~ {.xfail-test} # use std::task::{spawn}; -# use std::comm::{stream, Port, Chan}; # fn some_expensive_computation() -> int { 42 } -let (port, chan) = stream(); +let (port, chan) = Chan::new(); do spawn { chan.send(some_expensive_computation()); @@ -216,10 +211,8 @@ Instead we can use a `SharedChan`, a type that allows a single ~~~ # use std::task::spawn; -# use std::comm::{stream, SharedChan}; -let (port, chan) = stream(); -let chan = SharedChan::new(chan); +let (port, chan) = SharedChan::new(); for init_val in range(0u, 3) { // Create a new channel handle to distribute to the child task @@ -238,23 +231,22 @@ Here we transfer ownership of the channel into a new `SharedChan` value. Like as an *affine* or *linear* type). Unlike with `Chan`, though, the programmer may duplicate a `SharedChan`, with the `clone()` method. A cloned `SharedChan` produces a new handle to the same channel, allowing multiple -tasks to send data to a single port. Between `spawn`, `stream` and +tasks to send data to a single port. Between `spawn`, `Chan` and `SharedChan`, we have enough tools to implement many useful concurrency patterns. Note that the above `SharedChan` example is somewhat contrived since -you could also simply use three `stream` pairs, but it serves to +you could also simply use three `Chan` pairs, but it serves to illustrate the point. For reference, written with multiple streams, it might look like the example below. ~~~ # use std::task::spawn; -# use std::comm::stream; # use std::vec; // Create a vector of ports, one for each child task let ports = vec::from_fn(3, |init_val| { - let (port, chan) = stream(); + let (port, chan) = Chan::new(); do spawn { chan.send(some_expensive_computation(init_val)); } @@ -341,7 +333,7 @@ fn main() { let numbers_arc = Arc::new(numbers); for num in range(1u, 10) { - let (port, chan) = stream(); + let (port, chan) = Chan::new(); chan.send(numbers_arc.clone()); do spawn { @@ -370,7 +362,7 @@ and a clone of it is sent to each task # use std::rand; # let numbers=vec::from_fn(1000000, |_| rand::random::<f64>()); # let numbers_arc = Arc::new(numbers); -# let (port, chan) = stream(); +# let (port, chan) = Chan::new(); chan.send(numbers_arc.clone()); ~~~ copying only the wrapper and not its contents. @@ -382,7 +374,7 @@ Each task recovers the underlying data by # use std::rand; # let numbers=vec::from_fn(1000000, |_| rand::random::<f64>()); # let numbers_arc=Arc::new(numbers); -# let (port, chan) = stream(); +# let (port, chan) = Chan::new(); # chan.send(numbers_arc.clone()); # let local_arc : Arc<~[f64]> = port.recv(); let task_numbers = local_arc.get(); @@ -499,7 +491,7 @@ Here is the code for the parent task: # } # fn main() { -let (from_child, to_child) = DuplexStream(); +let (from_child, to_child) = DuplexStream::new(); do spawn { stringifier(&to_child); diff --git a/src/libextra/arc.rs b/src/libextra/arc.rs index ea8066b786f..6add053fa81 100644 --- a/src/libextra/arc.rs +++ b/src/libextra/arc.rs @@ -635,9 +635,8 @@ mod tests { }) } - let mut c = Some(c); arc.access_cond(|state, cond| { - c.take_unwrawp().send(()); + c.send(()); assert!(!*state); while !*state { cond.wait(); diff --git a/src/libextra/sync.rs b/src/libextra/sync.rs index 1cc403c32f4..6e582982962 100644 --- a/src/libextra/sync.rs +++ b/src/libextra/sync.rs @@ -950,7 +950,6 @@ mod tests { let mi = m2.clone(); // spawn sibling task do task::spawn { // linked - let mut c = Some(c); mi.lock_cond(|cond| { c.send(()); // tell sibling to go ahead (|| { @@ -994,6 +993,7 @@ mod tests { }) } #[test] + #[ignore(reason = "linked failure?")] fn test_mutex_different_conds() { let result = do task::try { let m = Mutex::new_with_condvars(2); diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs index 6f1930bc7fe..ce543eafd2f 100644 --- a/src/librustuv/net.rs +++ b/src/librustuv/net.rs @@ -646,7 +646,6 @@ impl Drop for UdpWatcher { #[cfg(test)] mod test { - use std::comm::oneshot; use std::rt::test::*; use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor, RtioUdpSocket}; @@ -689,7 +688,7 @@ mod test { #[test] fn listen_ip4() { - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); let addr = next_test_ip4(); do spawn { @@ -725,7 +724,7 @@ mod test { #[test] fn listen_ip6() { - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); let addr = next_test_ip6(); do spawn { @@ -761,7 +760,7 @@ mod test { #[test] fn udp_recv_ip4() { - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); let client = next_test_ip4(); let server = next_test_ip4(); @@ -793,7 +792,7 @@ mod test { #[test] fn udp_recv_ip6() { - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); let client = next_test_ip6(); let server = next_test_ip6(); @@ -828,7 +827,7 @@ mod test { use std::rt::rtio::*; let addr = next_test_ip4(); static MAX: uint = 5000; - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawn { let listener = TcpListener::bind(local_loop(), addr).unwrap(); @@ -865,7 +864,7 @@ mod test { fn test_udp_twice() { let server_addr = next_test_ip4(); let client_addr = next_test_ip4(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawn { let mut client = UdpWatcher::bind(local_loop(), client_addr).unwrap(); @@ -896,8 +895,8 @@ mod test { let client_in_addr = next_test_ip4(); static MAX: uint = 500_000; - let (p1, c1) = oneshot(); - let (p2, c2) = oneshot(); + let (p1, c1) = Chan::new(); + let (p2, c2) = Chan::new(); do spawn { let l = local_loop(); @@ -953,12 +952,12 @@ mod test { #[test] fn test_read_and_block() { let addr = next_test_ip4(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawn { let listener = TcpListener::bind(local_loop(), addr).unwrap(); let mut acceptor = listener.listen().unwrap(); - let (port2, chan2) = stream(); + let (port2, chan2) = Chan::new(); chan.send(port2); let mut stream = acceptor.accept().unwrap(); let mut buf = [0, .. 2048]; @@ -1026,7 +1025,7 @@ mod test { // thread, close itself, and then come back to the last thread. #[test] fn test_homing_closes_correctly() { - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do task::spawn_sched(task::SingleThreaded) { let listener = UdpWatcher::bind(local_loop(), next_test_ip4()).unwrap(); @@ -1048,9 +1047,9 @@ mod test { use std::rt::sched::{Shutdown, TaskFromFriend}; use std::rt::sleeper_list::SleeperList; use std::rt::task::Task; - use std::rt::task::UnwindResult; use std::rt::thread::Thread; use std::rt::deque::BufferPool; + use std::task::TaskResult; use std::unstable::run_in_bare_thread; use uvio::UvEventLoop; @@ -1072,12 +1071,12 @@ mod test { let handle2 = sched2.make_handle(); let tasksFriendHandle = sched2.make_handle(); - let on_exit: proc(UnwindResult) = proc(exit_status) { + let on_exit: proc(TaskResult) = proc(exit_status) { let mut handle1 = handle1; let mut handle2 = handle2; handle1.send(Shutdown); handle2.send(Shutdown); - assert!(exit_status.is_success()); + assert!(exit_status.is_ok()); }; unsafe fn local_io() -> &'static mut IoFactory { @@ -1148,7 +1147,7 @@ mod test { #[should_fail] #[test] fn tcp_stream_fail_cleanup() { - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); let addr = next_test_ip4(); do spawn { @@ -1172,7 +1171,7 @@ mod test { #[should_fail] #[test] fn udp_fail_other_task() { let addr = next_test_ip4(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); // force the handle to be created on a different scheduler, failure in // the original task will force a homing operation back to this @@ -1190,7 +1189,7 @@ mod test { #[test] #[ignore(reason = "linked failure")] fn linked_failure1() { - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); let addr = next_test_ip4(); do spawn { @@ -1208,7 +1207,7 @@ mod test { #[test] #[ignore(reason = "linked failure")] fn linked_failure2() { - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); let addr = next_test_ip4(); do spawn { @@ -1229,7 +1228,7 @@ mod test { #[test] #[ignore(reason = "linked failure")] fn linked_failure3() { - let (port, chan) = stream(); + let (port, chan) = Chan::new(); let addr = next_test_ip4(); do spawn { diff --git a/src/librustuv/pipe.rs b/src/librustuv/pipe.rs index 86ebae45f19..814205cbbf1 100644 --- a/src/librustuv/pipe.rs +++ b/src/librustuv/pipe.rs @@ -231,7 +231,6 @@ impl HomingIO for PipeAcceptor { #[cfg(test)] mod tests { - use std::comm::oneshot; use std::rt::rtio::{RtioUnixListener, RtioUnixAcceptor, RtioPipe}; use std::rt::test::next_test_unix; @@ -274,7 +273,7 @@ mod tests { fn connect() { let path = next_test_unix(); let path2 = path.clone(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawn { let p = PipeListener::bind(local_loop(), &path2.to_c_str()).unwrap(); @@ -298,7 +297,7 @@ mod tests { fn connect_fail() { let path = next_test_unix(); let path2 = path.clone(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawn { let p = PipeListener::bind(local_loop(), &path2.to_c_str()).unwrap(); diff --git a/src/librustuv/signal.rs b/src/librustuv/signal.rs index 67777050cf3..f082aef003c 100644 --- a/src/librustuv/signal.rs +++ b/src/librustuv/signal.rs @@ -78,13 +78,11 @@ mod test { use super::*; use super::super::local_loop; use std::io::signal; - use std::comm::{SharedChan, stream}; #[test] fn closing_channel_during_drop_doesnt_kill_everything() { // see issue #10375, relates to timers as well. - let (port, chan) = stream(); - let chan = SharedChan::new(chan); + let (port, chan) = SharedChan::new(); let _signal = SignalWatcher::new(local_loop(), signal::Interrupt, chan); diff --git a/src/librustuv/timer.rs b/src/librustuv/timer.rs index 7efdafd2369..ab143d6e8b0 100644 --- a/src/librustuv/timer.rs +++ b/src/librustuv/timer.rs @@ -23,12 +23,13 @@ pub struct TimerWatcher { handle: *uvll::uv_timer_t, home: SchedHandle, action: Option<NextAction>, + id: uint, // see comments in timer_cb } pub enum NextAction { WakeTask(BlockedTask), SendOnce(Chan<()>), - SendMany(Chan<()>), + SendMany(Chan<()>, uint), } impl TimerWatcher { @@ -41,6 +42,7 @@ impl TimerWatcher { handle: handle, action: None, home: get_handle_to_current_scheduler!(), + id: 0, }; return me.install(); } @@ -72,6 +74,7 @@ impl RtioTimer for TimerWatcher { // we must temporarily un-home ourselves, then destroy the action, and // then re-home again. let missile = self.fire_homing_missile(); + self.id += 1; self.stop(); let _missile = match util::replace(&mut self.action, None) { None => missile, // no need to do a homing dance @@ -101,6 +104,7 @@ impl RtioTimer for TimerWatcher { // of the homing missile let _prev_action = { let _m = self.fire_homing_missile(); + self.id += 1; self.stop(); self.start(msecs, 0); util::replace(&mut self.action, Some(SendOnce(chan))) @@ -116,9 +120,10 @@ impl RtioTimer for TimerWatcher { // of the homing missile let _prev_action = { let _m = self.fire_homing_missile(); + self.id += 1; self.stop(); self.start(msecs, msecs); - util::replace(&mut self.action, Some(SendMany(chan))) + util::replace(&mut self.action, Some(SendMany(chan, self.id))) }; return port; @@ -135,10 +140,21 @@ extern fn timer_cb(handle: *uvll::uv_timer_t, status: c_int) { let sched: ~Scheduler = Local::take(); sched.resume_blocked_task_immediately(task); } - SendOnce(chan) => chan.send_deferred(()), - SendMany(chan) => { - chan.send_deferred(()); - timer.action = Some(SendMany(chan)); + SendOnce(chan) => { chan.try_send_deferred(()); } + SendMany(chan, id) => { + chan.try_send_deferred(()); + + // Note that the above operation could have performed some form of + // scheduling. This means that the timer may have decided to insert + // some other action to happen. This 'id' keeps track of the updates + // to the timer, so we only reset the action back to sending on this + // channel if the id has remained the same. This is essentially a + // bug in that we have mutably aliasable memory, but that's libuv + // for you. We're guaranteed to all be running on the same thread, + // so there's no need for any synchronization here. + if timer.id == id { + timer.action = Some(SendMany(chan, id)); + } } } } @@ -180,8 +196,8 @@ mod test { let oport = timer.oneshot(1); let pport = timer.period(1); timer.sleep(1); - assert_eq!(oport.try_recv(), None); - assert_eq!(pport.try_recv(), None); + assert_eq!(oport.recv_opt(), None); + assert_eq!(pport.recv_opt(), None); timer.oneshot(1).recv(); } @@ -230,7 +246,7 @@ mod test { let timer_port = timer.period(1000); do spawn { - timer_port.try_recv(); + timer_port.recv_opt(); } // when we drop the TimerWatcher we're going to destroy the channel, @@ -244,7 +260,7 @@ mod test { let timer_port = timer.period(1000); do spawn { - timer_port.try_recv(); + timer_port.recv_opt(); } timer.oneshot(1); @@ -256,7 +272,7 @@ mod test { let timer_port = timer.period(1000); do spawn { - timer_port.try_recv(); + timer_port.recv_opt(); } timer.sleep(1); @@ -268,7 +284,7 @@ mod test { let mut timer = TimerWatcher::new(local_loop()); timer.oneshot(1000) }; - assert_eq!(port.try_recv(), None); + assert_eq!(port.recv_opt(), None); } #[test] @@ -277,7 +293,7 @@ mod test { let mut timer = TimerWatcher::new(local_loop()); timer.period(1000) }; - assert_eq!(port.try_recv(), None); + assert_eq!(port.recv_opt(), None); } #[test] diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index 9a65e9973cb..4cbc6c7cbb7 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -255,7 +255,9 @@ macro_rules! test ( fn f() $b $($a)* #[test] fn uv() { f() } - $($a)* #[test] fn native() { + $($a)* #[test] + #[ignore(cfg(windows))] // FIXME(#11003) + fn native() { use unstable::run_in_bare_thread; run_in_bare_thread(f); } @@ -1021,6 +1023,7 @@ mod test { } #[test] + #[ignore(cfg(windows))] // FIXME(#11003) fn send_from_outside_runtime() { let (p, c) = Chan::<int>::new(); let (p1, c1) = Chan::new(); @@ -1040,6 +1043,7 @@ mod test { } #[test] + #[ignore(cfg(windows))] // FIXME(#11003) fn recv_from_outside_runtime() { let (p, c) = Chan::<int>::new(); let t = do Thread::start { @@ -1054,6 +1058,7 @@ mod test { } #[test] + #[ignore(cfg(windows))] // FIXME(#11003) fn no_runtime() { let (p1, c1) = Chan::<int>::new(); let (p2, c2) = Chan::<int>::new(); diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs index 2d9bc6e9c12..4d6b540f2a5 100644 --- a/src/libstd/comm/select.rs +++ b/src/libstd/comm/select.rs @@ -83,10 +83,10 @@ pub struct Select { /// A handle to a port which is currently a member of a `Select` set of ports. /// This handle is used to keep the port in the set as well as interact with the /// underlying port. -pub struct Handle<'self, T> { +pub struct Handle<'port, T> { id: uint, - priv selector: &'self Select, - priv port: &'self mut Port<T>, + priv selector: &'port Select, + priv port: &'port mut Port<T>, } struct PacketIterator { priv cur: *mut Packet } @@ -234,6 +234,7 @@ impl Select { assert!(!(*packet).selecting.load(Relaxed)); } + assert!(ready_id != uint::max_value); return ready_id; } } @@ -261,7 +262,7 @@ impl Select { fn iter(&self) -> PacketIterator { PacketIterator { cur: self.head } } } -impl<'self, T: Send> Handle<'self, T> { +impl<'port, T: Send> Handle<'port, T> { /// Receive a value on the underlying port. Has the same semantics as /// `Port.recv` pub fn recv(&mut self) -> T { self.port.recv() } @@ -283,7 +284,7 @@ impl Drop for Select { } #[unsafe_destructor] -impl<'self, T: Send> Drop for Handle<'self, T> { +impl<'port, T: Send> Drop for Handle<'port, T> { fn drop(&mut self) { unsafe { self.selector.remove(self.port.queue.packet()) } } @@ -437,6 +438,7 @@ mod test { } #[test] + #[ignore(cfg(windows))] // FIXME(#11003) fn stress_native() { use std::rt::thread::Thread; use std::unstable::run_in_bare_thread; @@ -470,6 +472,7 @@ mod test { } #[test] + #[ignore(cfg(windows))] // FIXME(#11003) fn native_both_ready() { use std::rt::thread::Thread; use std::unstable::run_in_bare_thread; diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs index d3fc265cf2a..49770b80060 100644 --- a/src/libstd/io/net/unix.rs +++ b/src/libstd/io/net/unix.rs @@ -163,11 +163,11 @@ mod tests { do spawntask { let mut acceptor = UnixListener::bind(&path1).listen(); chan.send(()); - server.take()(acceptor.accept().unwrap()); + server(acceptor.accept().unwrap()); } port.recv(); - client.take()(UnixStream::connect(&path2).unwrap()); + client(UnixStream::connect(&path2).unwrap()); } } diff --git a/src/libstd/rt/thread.rs b/src/libstd/rt/thread.rs index da02988c75c..6128f310a2e 100644 --- a/src/libstd/rt/thread.rs +++ b/src/libstd/rt/thread.rs @@ -132,11 +132,13 @@ impl<T: Send> Drop for Thread<T> { #[cfg(windows)] mod imp { + use super::DEFAULT_STACK_SIZE; + + use cast; + use libc; use libc::types::os::arch::extra::{LPSECURITY_ATTRIBUTES, SIZE_T, BOOL, LPVOID, DWORD, LPDWORD, HANDLE}; - use libc; - use cast; - use super::DEFAULT_STACK_SIZE; + use ptr; pub type rust_thread = HANDLE; pub type rust_thread_return = DWORD; @@ -210,9 +212,10 @@ mod imp { } #[cfg(target_os = "macos")] + #[cfg(target_os = "android")] pub unsafe fn yield_now() { assert_eq!(sched_yield(), 0); } - #[cfg(not(target_os = "macos"))] + #[cfg(not(target_os = "macos"), not(target_os = "android"))] pub unsafe fn yield_now() { assert_eq!(pthread_yield(), 0); } extern { @@ -230,8 +233,9 @@ mod imp { fn pthread_detach(thread: libc::pthread_t) -> libc::c_int; #[cfg(target_os = "macos")] + #[cfg(target_os = "android")] fn sched_yield() -> libc::c_int; - #[cfg(not(target_os = "macos"))] + #[cfg(not(target_os = "macos"), not(target_os = "android"))] fn pthread_yield() -> libc::c_int; } } diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs index eb3e19f4a5a..1148774020a 100644 --- a/src/libstd/task/spawn.rs +++ b/src/libstd/task/spawn.rs @@ -171,7 +171,7 @@ pub fn spawn_raw(mut opts: TaskOpts, f: proc()) { if opts.notify_chan.is_some() { let notify_chan = opts.notify_chan.take_unwrap(); let on_exit: proc(TaskResult) = proc(task_result) { - notify_chan.send(task_result) + notify_chan.try_send(task_result); }; task.death.on_exit = Some(on_exit); } diff --git a/src/test/bench/msgsend-pipes-shared.rs b/src/test/bench/msgsend-pipes-shared.rs index 2a5971be216..50cb00b25d4 100644 --- a/src/test/bench/msgsend-pipes-shared.rs +++ b/src/test/bench/msgsend-pipes-shared.rs @@ -20,7 +20,6 @@ extern mod extra; -use std::comm::{Port, Chan, SharedChan}; use std::comm; use std::os; use std::task; @@ -38,7 +37,7 @@ fn server(requests: &Port<request>, responses: &Chan<uint>) { let mut count = 0u; let mut done = false; while !done { - match requests.try_recv() { + match requests.recv_opt() { Some(get_count) => { responses.send(count.clone()); } Some(bytes(b)) => { //error!("server: received {:?} bytes", b); @@ -53,10 +52,8 @@ fn server(requests: &Port<request>, responses: &Chan<uint>) { } fn run(args: &[~str]) { - let (from_child, to_parent) = comm::stream(); - let (from_parent, to_child) = comm::stream(); - - let to_child = SharedChan::new(to_child); + let (from_child, to_parent) = Chan::new(); + let (from_parent, to_child) = SharedChan::new(); let size = from_str::<uint>(args[1]).unwrap(); let workers = from_str::<uint>(args[2]).unwrap(); diff --git a/src/test/bench/msgsend-pipes.rs b/src/test/bench/msgsend-pipes.rs index 1ff531324b3..3cf1a97a36e 100644 --- a/src/test/bench/msgsend-pipes.rs +++ b/src/test/bench/msgsend-pipes.rs @@ -16,7 +16,6 @@ extern mod extra; -use std::comm::{SharedChan, Chan, stream}; use std::os; use std::task; use std::uint; @@ -33,7 +32,7 @@ fn server(requests: &Port<request>, responses: &Chan<uint>) { let mut count: uint = 0; let mut done = false; while !done { - match requests.try_recv() { + match requests.recv_opt() { Some(get_count) => { responses.send(count.clone()); } Some(bytes(b)) => { //error!("server: received {:?} bytes", b); @@ -48,17 +47,15 @@ fn server(requests: &Port<request>, responses: &Chan<uint>) { } fn run(args: &[~str]) { - let (from_child, to_parent) = stream(); - let (from_parent, to_child) = stream(); - let to_child = SharedChan::new(to_child); + let (from_child, to_parent) = Chan::new(); let size = from_str::<uint>(args[1]).unwrap(); let workers = from_str::<uint>(args[2]).unwrap(); let num_bytes = 100; let start = extra::time::precise_time_s(); let mut worker_results = ~[]; - for _ in range(0u, workers) { - let to_child = to_child.clone(); + let from_parent = if workers == 1 { + let (from_parent, to_child) = Chan::new(); let mut builder = task::task(); worker_results.push(builder.future_result()); do builder.spawn { @@ -68,7 +65,23 @@ fn run(args: &[~str]) { } //error!("worker {:?} exiting", i); }; - } + from_parent + } else { + let (from_parent, to_child) = SharedChan::new(); + for _ in range(0u, workers) { + let to_child = to_child.clone(); + let mut builder = task::task(); + worker_results.push(builder.future_result()); + do builder.spawn { + for _ in range(0u, size / workers) { + //error!("worker {:?}: sending {:?} bytes", i, num_bytes); + to_child.send(bytes(num_bytes)); + } + //error!("worker {:?} exiting", i); + }; + } + from_parent + }; do task::spawn || { server(&from_parent, &to_parent); } @@ -78,8 +91,8 @@ fn run(args: &[~str]) { } //error!("sending stop message"); - to_child.send(stop); - move_out(to_child); + //to_child.send(stop); + //move_out(to_child); let result = from_child.recv(); let end = extra::time::precise_time_s(); let elapsed = end - start; diff --git a/src/test/bench/rt-messaging-ping-pong.rs b/src/test/bench/rt-messaging-ping-pong.rs index 8fa26b42e85..90d81aa7c3e 100644 --- a/src/test/bench/rt-messaging-ping-pong.rs +++ b/src/test/bench/rt-messaging-ping-pong.rs @@ -24,9 +24,9 @@ fn ping_pong_bench(n: uint, m: uint) { // Create pairs of tasks that pingpong back and forth. fn run_pair(n: uint) { // Create a stream A->B - let (pa,ca) = stream::<()>(); + let (pa,ca) = Chan::<()>::new(); // Create a stream B->A - let (pb,cb) = stream::<()>(); + let (pb,cb) = Chan::<()>::new(); do spawntask_later() || { let chan = ca; diff --git a/src/test/bench/rt-parfib.rs b/src/test/bench/rt-parfib.rs index e6519a78856..ab607d9aebc 100644 --- a/src/test/bench/rt-parfib.rs +++ b/src/test/bench/rt-parfib.rs @@ -13,7 +13,6 @@ extern mod extra; use std::os; use std::uint; use std::rt::test::spawntask_later; -use std::comm::oneshot; // A simple implementation of parfib. One subtree is found in a new // task and communicated over a oneshot pipe, the other is found @@ -24,7 +23,7 @@ fn parfib(n: uint) -> uint { return 1; } - let (port,chan) = oneshot::<uint>(); + let (port,chan) = Chan::new(); do spawntask_later { chan.send(parfib(n-1)); }; diff --git a/src/test/bench/shootout-chameneos-redux.rs b/src/test/bench/shootout-chameneos-redux.rs index 464bc664fb5..7801a64fced 100644 --- a/src/test/bench/shootout-chameneos-redux.rs +++ b/src/test/bench/shootout-chameneos-redux.rs @@ -12,7 +12,6 @@ extern mod extra; -use std::comm::{stream, SharedChan}; use std::option; use std::os; use std::task; @@ -138,10 +137,8 @@ fn creature( fn rendezvous(nn: uint, set: ~[color]) { // these ports will allow us to hear from the creatures - let (from_creatures, to_rendezvous) = stream::<CreatureInfo>(); - let to_rendezvous = SharedChan::new(to_rendezvous); - let (from_creatures_log, to_rendezvous_log) = stream::<~str>(); - let to_rendezvous_log = SharedChan::new(to_rendezvous_log); + let (from_creatures, to_rendezvous) = SharedChan::<CreatureInfo>::new(); + let (from_creatures_log, to_rendezvous_log) = SharedChan::<~str>::new(); // these channels will be passed to the creatures so they can talk to us @@ -154,7 +151,7 @@ fn rendezvous(nn: uint, set: ~[color]) { let col = *col; let to_rendezvous = to_rendezvous.clone(); let to_rendezvous_log = to_rendezvous_log.clone(); - let (from_rendezvous, to_creature) = stream(); + let (from_rendezvous, to_creature) = Chan::new(); do task::spawn { creature(ii, col, diff --git a/src/test/bench/shootout-k-nucleotide-pipes.rs b/src/test/bench/shootout-k-nucleotide-pipes.rs index a12eac50852..96de6097873 100644 --- a/src/test/bench/shootout-k-nucleotide-pipes.rs +++ b/src/test/bench/shootout-k-nucleotide-pipes.rs @@ -17,7 +17,6 @@ extern mod extra; use extra::sort; use std::cmp::Ord; -use std::comm::{stream, Port, Chan}; use std::comm; use std::hashmap::HashMap; use std::option; @@ -165,7 +164,7 @@ fn main() { // initialize each sequence sorter let sizes = ~[1u,2,3,4,6,12,18]; - let mut streams = vec::from_fn(sizes.len(), |_| Some(stream::<~str>())); + let mut streams = vec::from_fn(sizes.len(), |_| Some(Chan::<~str>::new())); let mut from_child = ~[]; let to_child = sizes.iter().zip(streams.mut_iter()).map(|(sz, stream_ref)| { let sz = *sz; @@ -174,7 +173,7 @@ fn main() { from_child.push(from_child_); - let (from_parent, to_child) = comm::stream(); + let (from_parent, to_child) = Chan::new(); do spawn { make_sequence_processor(sz, &from_parent, &to_parent_); diff --git a/src/test/bench/shootout-pfib.rs b/src/test/bench/shootout-pfib.rs index da25f1e82ee..aa060ceb097 100644 --- a/src/test/bench/shootout-pfib.rs +++ b/src/test/bench/shootout-pfib.rs @@ -21,7 +21,6 @@ extern mod extra; use extra::{time, getopts}; -use std::comm::{stream, SharedChan}; use std::os; use std::result::{Ok, Err}; use std::task; @@ -34,8 +33,7 @@ fn fib(n: int) -> int { } else if n <= 2 { c.send(1); } else { - let (pp, cc) = stream(); - let cc = SharedChan::new(cc); + let (pp, cc) = SharedChan::new(); let ch = cc.clone(); task::spawn(proc() pfib(&ch, n - 1)); let ch = cc.clone(); @@ -44,8 +42,7 @@ fn fib(n: int) -> int { } } - let (p, ch) = stream(); - let ch = SharedChan::new(ch); + let (p, ch) = SharedChan::new(); let _t = task::spawn(proc() pfib(&ch, n) ); p.recv() } diff --git a/src/test/bench/shootout-threadring.rs b/src/test/bench/shootout-threadring.rs index 5e096816306..6293b6ce866 100644 --- a/src/test/bench/shootout-threadring.rs +++ b/src/test/bench/shootout-threadring.rs @@ -13,14 +13,14 @@ use std::os; fn start(n_tasks: int, token: int) { - let (p, ch1) = stream(); + let (p, ch1) = Chan::new(); let mut p = p; let ch1 = ch1; ch1.send(token); // XXX could not get this to work with a range closure let mut i = 2; while i <= n_tasks { - let (next_p, ch) = stream(); + let (next_p, ch) = Chan::new(); let imm_i = i; let imm_p = p; do spawn { diff --git a/src/test/bench/task-perf-jargon-metal-smoke.rs b/src/test/bench/task-perf-jargon-metal-smoke.rs index 8e7b48040cd..dc31ef06fa6 100644 --- a/src/test/bench/task-perf-jargon-metal-smoke.rs +++ b/src/test/bench/task-perf-jargon-metal-smoke.rs @@ -48,9 +48,9 @@ fn main() { args.clone() }; - let (p,c) = comm::stream(); + let (p,c) = Chan::new(); child_generation(from_str::<uint>(args[1]).unwrap(), c); - if p.try_recv().is_none() { + if p.recv_opt().is_none() { fail!("it happened when we slumbered"); } } diff --git a/src/test/run-pass/hashmap-memory.rs b/src/test/run-pass/hashmap-memory.rs index bacf8353a2e..49aa8d18e90 100644 --- a/src/test/run-pass/hashmap-memory.rs +++ b/src/test/run-pass/hashmap-memory.rs @@ -81,7 +81,7 @@ mod map_reduce { mapper_done => { num_mappers -= 1; } find_reducer(k, cc) => { let mut c; - match reducers.find(&str::from_utf8(k)) { + match reducers.find(&str::from_utf8(k).to_owned()) { Some(&_c) => { c = _c; } None => { c = 0; } } diff --git a/src/test/run-pass/task-comm-14.rs b/src/test/run-pass/task-comm-14.rs index 435d68ada49..2a7a0c25a21 100644 --- a/src/test/run-pass/task-comm-14.rs +++ b/src/test/run-pass/task-comm-14.rs @@ -20,7 +20,7 @@ pub fn main() { while (i > 0) { info!("{}", i); let ch = ch.clone(); - task::spawn({let i = i; proc() { child(i, &ch) }); + task::spawn({let i = i; proc() { child(i, &ch) }}); i = i - 1; } |
