diff options
| author | Brian Anderson <banderson@mozilla.com> | 2013-01-25 17:51:53 -0800 |
|---|---|---|
| committer | Brian Anderson <banderson@mozilla.com> | 2013-01-25 18:06:30 -0800 |
| commit | 1ef83945c1d76c9f2b9b0d087ceac65963087be7 (patch) | |
| tree | b114ac7f84a0a10a3d207da0bcd3e9dcc2418e7b | |
| parent | d1f771ca341bc93e2ebe9f0ef9979a71a8e3c6d8 (diff) | |
| parent | 19aa88cd64c81b77b874f9396a43fedfa28f14ee (diff) | |
| download | rust-1ef83945c1d76c9f2b9b0d087ceac65963087be7.tar.gz rust-1ef83945c1d76c9f2b9b0d087ceac65963087be7.zip | |
Merge remote-tracking branch 'brson/nocommupstream'
Conflicts: src/libcore/private.rs src/libcore/task/mod.rs src/libcore/task/spawn.rs src/libstd/net_tcp.rs src/libstd/uv_global_loop.rs src/libstd/uv_iotask.rs
| -rw-r--r-- | src/libcore/os.rs | 204 | ||||
| -rw-r--r-- | src/libcore/pipes.rs | 10 | ||||
| -rw-r--r-- | src/libcore/private.rs | 274 | ||||
| -rw-r--r-- | src/libcore/private/at_exit.rs | 98 | ||||
| -rw-r--r-- | src/libcore/private/finally.rs | 98 | ||||
| -rw-r--r-- | src/libcore/private/global.rs | 296 | ||||
| -rw-r--r-- | src/libcore/private/weak_task.rs | 207 | ||||
| -rw-r--r-- | src/libcore/run.rs | 13 | ||||
| -rw-r--r-- | src/libcore/task/mod.rs | 258 | ||||
| -rw-r--r-- | src/libcore/task/spawn.rs | 67 | ||||
| -rw-r--r-- | src/libstd/flatpipes.rs | 3 | ||||
| -rw-r--r-- | src/libstd/net_ip.rs | 6 | ||||
| -rw-r--r-- | src/libstd/net_tcp.rs | 712 | ||||
| -rw-r--r-- | src/libstd/timer.rs | 21 | ||||
| -rw-r--r-- | src/libstd/uv_global_loop.rs | 205 | ||||
| -rw-r--r-- | src/libstd/uv_iotask.rs | 114 | ||||
| -rw-r--r-- | src/rt/rust.cpp | 4 | ||||
| -rw-r--r-- | src/rt/rust_builtin.cpp | 46 | ||||
| -rw-r--r-- | src/rt/rust_kernel.cpp | 165 | ||||
| -rw-r--r-- | src/rt/rust_kernel.h | 43 | ||||
| -rw-r--r-- | src/rt/rust_uv.cpp | 9 | ||||
| -rw-r--r-- | src/rt/rustrt.def.in | 8 | ||||
| -rw-r--r-- | src/test/run-pass/pipe-detect-term.rs | 2 | ||||
| -rw-r--r-- | src/test/run-pass/pipe-select.rs | 2 | ||||
| -rw-r--r-- | src/test/run-pass/pipe-sleep.rs | 2 |
25 files changed, 1683 insertions, 1184 deletions
diff --git a/src/libcore/os.rs b/src/libcore/os.rs index cf86f45379c..6f568e9b2a7 100644 --- a/src/libcore/os.rs +++ b/src/libcore/os.rs @@ -141,169 +141,101 @@ pub mod win32 { } } -pub fn getenv(n: &str) -> Option<~str> { - global_env::getenv(n) -} +/* +Accessing environment variables is not generally threadsafe. +This uses a per-runtime lock to serialize access. +XXX: It would probably be appropriate to make this a real global +*/ +fn with_env_lock<T>(f: &fn() -> T) -> T { + use private::global::global_data_clone_create; + use private::{Exclusive, exclusive}; + + struct SharedValue(()); + type ValueMutex = Exclusive<SharedValue>; + fn key(_: ValueMutex) { } -pub fn setenv(n: &str, v: &str) { - global_env::setenv(n, v) -} + unsafe { + let lock: ValueMutex = global_data_clone_create(key, || { + ~exclusive(SharedValue(())) + }); -pub fn env() -> ~[(~str,~str)] { - global_env::env() + lock.with_imm(|_| f() ) + } } -mod global_env { - //! Internal module for serializing access to getenv/setenv - use either; - use libc; - use oldcomm; - use option::Option; - use private; - use str; - use task; - +pub fn env() -> ~[(~str,~str)] { extern mod rustrt { - unsafe fn rust_global_env_chan_ptr() -> *libc::uintptr_t; - } - - enum Msg { - MsgGetEnv(~str, oldcomm::Chan<Option<~str>>), - MsgSetEnv(~str, ~str, oldcomm::Chan<()>), - MsgEnv(oldcomm::Chan<~[(~str,~str)]>) - } - - pub fn getenv(n: &str) -> Option<~str> { - let env_ch = get_global_env_chan(); - let po = oldcomm::Port(); - oldcomm::send(env_ch, MsgGetEnv(str::from_slice(n), - oldcomm::Chan(&po))); - oldcomm::recv(po) - } - - pub fn setenv(n: &str, v: &str) { - let env_ch = get_global_env_chan(); - let po = oldcomm::Port(); - oldcomm::send(env_ch, MsgSetEnv(str::from_slice(n), - str::from_slice(v), - oldcomm::Chan(&po))); - oldcomm::recv(po) - } - - pub fn env() -> ~[(~str,~str)] { - let env_ch = get_global_env_chan(); - let po = oldcomm::Port(); - oldcomm::send(env_ch, MsgEnv(oldcomm::Chan(&po))); - oldcomm::recv(po) - } - - fn get_global_env_chan() -> oldcomm::Chan<Msg> { - unsafe { - let global_ptr = rustrt::rust_global_env_chan_ptr(); - private::chan_from_global_ptr(global_ptr, || { - // FIXME (#2621): This would be a good place to use a very - // small foreign stack - task::task().sched_mode(task::SingleThreaded).unlinked() - }, global_env_task) - } + unsafe fn rust_env_pairs() -> ~[~str]; } - fn global_env_task(msg_po: oldcomm::Port<Msg>) { - unsafe { - do private::weaken_task |weak_po| { - loop { - match oldcomm::select2(msg_po, weak_po) { - either::Left(MsgGetEnv(ref n, resp_ch)) => { - oldcomm::send(resp_ch, impl_::getenv(*n)) - } - either::Left(MsgSetEnv(ref n, ref v, resp_ch)) => { - oldcomm::send(resp_ch, impl_::setenv(*n, *v)) - } - either::Left(MsgEnv(resp_ch)) => { - oldcomm::send(resp_ch, impl_::env()) - } - either::Right(_) => break - } - } + unsafe { + do with_env_lock { + let mut pairs = ~[]; + for vec::each(rustrt::rust_env_pairs()) |p| { + let vs = str::splitn_char(*p, '=', 1u); + assert vec::len(vs) == 2u; + pairs.push((copy vs[0], copy vs[1])); } + move pairs } } +} - mod impl_ { - use cast; - use libc; - use option::Option; - use option; - use ptr; - use str; - use vec; - - extern mod rustrt { - unsafe fn rust_env_pairs() -> ~[~str]; - } - - pub fn env() -> ~[(~str,~str)] { - unsafe { - let mut pairs = ~[]; - for vec::each(rustrt::rust_env_pairs()) |p| { - let vs = str::splitn_char(*p, '=', 1u); - assert vec::len(vs) == 2u; - pairs.push((copy vs[0], copy vs[1])); - } - move pairs - } - } - - #[cfg(unix)] - pub fn getenv(n: &str) -> Option<~str> { - unsafe { - let s = str::as_c_str(n, |s| libc::getenv(s)); - return if ptr::null::<u8>() == cast::reinterpret_cast(&s) { - option::None::<~str> - } else { - let s = cast::reinterpret_cast(&s); - option::Some::<~str>(str::raw::from_buf(s)) - }; +#[cfg(unix)] +pub fn getenv(n: &str) -> Option<~str> { + unsafe { + do with_env_lock { + let s = str::as_c_str(n, |s| libc::getenv(s)); + if ptr::null::<u8>() == cast::reinterpret_cast(&s) { + option::None::<~str> + } else { + let s = cast::reinterpret_cast(&s); + option::Some::<~str>(str::raw::from_buf(s)) } } + } +} - #[cfg(windows)] - pub fn getenv(n: &str) -> Option<~str> { - unsafe { - use os::win32::{as_utf16_p, fill_utf16_buf_and_decode}; - do as_utf16_p(n) |u| { - do fill_utf16_buf_and_decode() |buf, sz| { - libc::GetEnvironmentVariableW(u, buf, sz) - } +#[cfg(windows)] +pub fn getenv(n: &str) -> Option<~str> { + unsafe { + do with_env_lock { + use os::win32::{as_utf16_p, fill_utf16_buf_and_decode}; + do as_utf16_p(n) |u| { + do fill_utf16_buf_and_decode() |buf, sz| { + libc::GetEnvironmentVariableW(u, buf, sz) } } } + } +} - #[cfg(unix)] - pub fn setenv(n: &str, v: &str) { - unsafe { - do str::as_c_str(n) |nbuf| { - do str::as_c_str(v) |vbuf| { - libc::funcs::posix01::unistd::setenv(nbuf, vbuf, 1); - } +#[cfg(unix)] +pub fn setenv(n: &str, v: &str) { + unsafe { + do with_env_lock { + do str::as_c_str(n) |nbuf| { + do str::as_c_str(v) |vbuf| { + libc::funcs::posix01::unistd::setenv(nbuf, vbuf, 1); } } } + } +} - #[cfg(windows)] - pub fn setenv(n: &str, v: &str) { - unsafe { - use os::win32::as_utf16_p; - do as_utf16_p(n) |nbuf| { - do as_utf16_p(v) |vbuf| { - libc::SetEnvironmentVariableW(nbuf, vbuf); - } +#[cfg(windows)] +pub fn setenv(n: &str, v: &str) { + unsafe { + do with_env_lock { + use os::win32::as_utf16_p; + do as_utf16_p(n) |nbuf| { + do as_utf16_p(v) |vbuf| { + libc::SetEnvironmentVariableW(nbuf, vbuf); } } } - } } diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs index 0ef30668dbc..cecc954cdf3 100644 --- a/src/libcore/pipes.rs +++ b/src/libcore/pipes.rs @@ -1286,6 +1286,16 @@ pub fn oneshot<T: Owned>() -> (PortOne<T>, ChanOne<T>) { (port, chan) } +impl<T: Owned> PortOne<T> { + fn recv(self) -> T { recv_one(self) } + fn try_recv(self) -> Option<T> { try_recv_one(self) } +} + +impl<T: Owned> ChanOne<T> { + fn send(self, data: T) { send_one(self, data) } + fn try_send(self, data: T) -> bool { try_send_one(self, data) } +} + /** * Receive a message from a oneshot pipe, failing if the connection was * closed. diff --git a/src/libcore/private.rs b/src/libcore/private.rs index ad27729cc9f..332c763f151 100644 --- a/src/libcore/private.rs +++ b/src/libcore/private.rs @@ -18,7 +18,6 @@ use cast; use iter; use libc; -use oldcomm; use option; use pipes; use prelude::*; @@ -28,10 +27,17 @@ use task; use task::{TaskBuilder, atomically}; use uint; +#[path = "private/at_exit.rs"] +pub mod at_exit; +#[path = "private/global.rs"] +pub mod global; +#[path = "private/finally.rs"] +pub mod finally; +#[path = "private/weak_task.rs"] +pub mod weak_task; + extern mod rustrt { #[legacy_exports]; - unsafe fn rust_task_weaken(ch: rust_port_id); - unsafe fn rust_task_unweaken(ch: rust_port_id); unsafe fn rust_create_little_lock() -> rust_little_lock; unsafe fn rust_destroy_little_lock(lock: rust_little_lock); @@ -87,11 +93,6 @@ fn test_run_in_bare_thread() { } } -#[allow(non_camel_case_types)] // runtime type -type rust_port_id = uint; - -type GlobalPtr = *libc::uintptr_t; - fn compare_and_swap(address: &mut int, oldval: int, newval: int) -> bool { unsafe { let old = rusti::atomic_cxchg(address, oldval, newval); @@ -99,255 +100,6 @@ fn compare_and_swap(address: &mut int, oldval: int, newval: int) -> bool { } } -/** - * Atomically gets a channel from a pointer to a pointer-sized memory location - * or, if no channel exists creates and installs a new channel and sets up a - * new task to receive from it. - */ -pub unsafe fn chan_from_global_ptr<T: Owned>( - global: GlobalPtr, - task_fn: fn() -> task::TaskBuilder, - f: fn~(oldcomm::Port<T>) -) -> oldcomm::Chan<T> { - - enum Msg { - Proceed, - Abort - } - - log(debug,~"ENTERING chan_from_global_ptr, before is_prob_zero check"); - let is_probably_zero = *global == 0u; - log(debug,~"after is_prob_zero check"); - if is_probably_zero { - log(debug,~"is probably zero..."); - // There's no global channel. We must make it - - let (setup1_po, setup1_ch) = pipes::stream(); - let (setup2_po, setup2_ch) = pipes::stream(); - - // FIXME #4422: Ugly type inference hint - let setup2_po: pipes::Port<Msg> = setup2_po; - - do task_fn().spawn |move f, move setup1_ch, move setup2_po| { - let po = oldcomm::Port::<T>(); - let ch = oldcomm::Chan(&po); - setup1_ch.send(ch); - - // Wait to hear if we are the official instance of - // this global task - match setup2_po.recv() { - Proceed => f(move po), - Abort => () - } - }; - - log(debug,~"before setup recv.."); - // This is the proposed global channel - let ch = setup1_po.recv(); - // 0 is our sentinal value. It is not a valid channel - assert *ch != 0; - - // Install the channel - log(debug,~"BEFORE COMPARE AND SWAP"); - let swapped = compare_and_swap( - cast::reinterpret_cast(&global), - 0, cast::reinterpret_cast(&ch)); - log(debug,fmt!("AFTER .. swapped? %?", swapped)); - - if swapped { - // Success! - setup2_ch.send(Proceed); - ch - } else { - // Somebody else got in before we did - setup2_ch.send(Abort); - cast::reinterpret_cast(&*global) - } - } else { - log(debug, ~"global != 0"); - cast::reinterpret_cast(&*global) - } -} - -#[test] -pub fn test_from_global_chan1() { - - // This is unreadable, right? - - // The global channel - let globchan = 0; - let globchanp = ptr::addr_of(&globchan); - - // Create the global channel, attached to a new task - let ch = unsafe { - do chan_from_global_ptr(globchanp, task::task) |po| { - let ch = oldcomm::recv(po); - oldcomm::send(ch, true); - let ch = oldcomm::recv(po); - oldcomm::send(ch, true); - } - }; - // Talk to it - let po = oldcomm::Port(); - oldcomm::send(ch, oldcomm::Chan(&po)); - assert oldcomm::recv(po) == true; - - // This one just reuses the previous channel - let ch = unsafe { - do chan_from_global_ptr(globchanp, task::task) |po| { - let ch = oldcomm::recv(po); - oldcomm::send(ch, false); - } - }; - - // Talk to the original global task - let po = oldcomm::Port(); - oldcomm::send(ch, oldcomm::Chan(&po)); - assert oldcomm::recv(po) == true; -} - -#[test] -pub fn test_from_global_chan2() { - - for iter::repeat(100) { - // The global channel - let globchan = 0; - let globchanp = ptr::addr_of(&globchan); - - let resultpo = oldcomm::Port(); - let resultch = oldcomm::Chan(&resultpo); - - // Spawn a bunch of tasks that all want to compete to - // create the global channel - for uint::range(0, 10) |i| { - do task::spawn { - let ch = unsafe { - do chan_from_global_ptr( - globchanp, task::task) |po| { - - for uint::range(0, 10) |_j| { - let ch = oldcomm::recv(po); - oldcomm::send(ch, {i}); - } - } - }; - let po = oldcomm::Port(); - oldcomm::send(ch, oldcomm::Chan(&po)); - // We are The winner if our version of the - // task was installed - let winner = oldcomm::recv(po); - oldcomm::send(resultch, winner == i); - } - } - // There should be only one winner - let mut winners = 0u; - for uint::range(0u, 10u) |_i| { - let res = oldcomm::recv(resultpo); - if res { winners += 1u }; - } - assert winners == 1u; - } -} - -/** - * Convert the current task to a 'weak' task temporarily - * - * As a weak task it will not be counted towards the runtime's set - * of live tasks. When there are no more outstanding live (non-weak) tasks - * the runtime will send an exit message on the provided channel. - * - * This function is super-unsafe. Do not use. - * - * # Safety notes - * - * * Weak tasks must either die on their own or exit upon receipt of - * the exit message. Failure to do so will cause the runtime to never - * exit - * * Tasks must not call `weaken_task` multiple times. This will - * break the kernel's accounting of live tasks. - * * Weak tasks must not be supervised. A supervised task keeps - * a reference to its parent, so the parent will not die. - */ -pub unsafe fn weaken_task(f: fn(oldcomm::Port<()>)) { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); - unsafe { - rustrt::rust_task_weaken(cast::reinterpret_cast(&ch)); - } - let _unweaken = Unweaken(ch); - f(po); - - struct Unweaken { - ch: oldcomm::Chan<()>, - drop { - unsafe { - rustrt::rust_task_unweaken(cast::reinterpret_cast(&self.ch)); - } - } - } - - fn Unweaken(ch: oldcomm::Chan<()>) -> Unweaken { - Unweaken { - ch: ch - } - } -} - -#[test] -pub fn test_weaken_task_then_unweaken() { - do task::try { - unsafe { - do weaken_task |_po| { - } - } - }; -} - -#[test] -pub fn test_weaken_task_wait() { - do task::spawn_unlinked { - unsafe { - do weaken_task |po| { - oldcomm::recv(po); - } - } - } -} - -#[test] -pub fn test_weaken_task_stress() { - // Create a bunch of weak tasks - for iter::repeat(100u) { - do task::spawn { - unsafe { - do weaken_task |_po| { - } - } - } - do task::spawn_unlinked { - unsafe { - do weaken_task |po| { - // Wait for it to tell us to die - oldcomm::recv(po); - } - } - } - } -} - -#[test] -#[ignore(cfg(windows))] -pub fn test_weaken_task_fail() { - let res = do task::try { - unsafe { - do weaken_task |_po| { - fail; - } - } - }; - assert result::is_err(&res); -} - /**************************************************************************** * Shared state & exclusive ARC ****************************************************************************/ @@ -533,6 +285,14 @@ pub unsafe fn clone_shared_mutable_state<T: Owned>(rc: &SharedMutableState<T>) ArcDestruct((*rc).data) } +impl<T: Owned> SharedMutableState<T>: Clone { + fn clone(&self) -> SharedMutableState<T> { + unsafe { + clone_shared_mutable_state(self) + } + } +} + /****************************************************************************/ #[allow(non_camel_case_types)] // runtime type diff --git a/src/libcore/private/at_exit.rs b/src/libcore/private/at_exit.rs new file mode 100644 index 00000000000..a87301dbe07 --- /dev/null +++ b/src/libcore/private/at_exit.rs @@ -0,0 +1,98 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use sys; +use cast; +use ptr; +use task; +use uint; +use vec; +use rand; +use libc::{c_void, size_t}; + +/** +Register a function to be run during runtime shutdown. + +After all non-weak tasks have exited, registered exit functions will +execute, in random order, on the primary scheduler. Each function runs +in its own unsupervised task. +*/ +pub fn at_exit(f: ~fn()) { + unsafe { + let runner: &fn(*ExitFunctions) = exit_runner; + let runner_pair: sys::Closure = cast::transmute(runner); + let runner_ptr = runner_pair.code; + let runner_ptr = cast::transmute(runner_ptr); + rustrt::rust_register_exit_function(runner_ptr, ~f); + } +} + +// NB: The double pointer indirection here is because ~fn() is a fat +// pointer and due to FFI problems I am more comfortable making the +// interface use a normal pointer +extern mod rustrt { + fn rust_register_exit_function(runner: *c_void, f: ~~fn()); +} + +struct ExitFunctions { + // The number of exit functions + count: size_t, + // The buffer of exit functions + start: *~~fn() +} + +fn exit_runner(exit_fns: *ExitFunctions) { + let exit_fns = unsafe { &*exit_fns }; + let count = (*exit_fns).count; + let start = (*exit_fns).start; + + // NB: from_buf memcpys from the source, which will + // give us ownership of the array of functions + let mut exit_fns_vec = unsafe { vec::from_buf(start, count as uint) }; + // Let's not make any promises about execution order + rand::Rng().shuffle_mut(exit_fns_vec); + + debug!("running %u exit functions", exit_fns_vec.len()); + + while !exit_fns_vec.is_empty() { + match exit_fns_vec.pop() { + ~f => { + task::task().supervised().spawn(f); + } + } + } +} + +#[abi = "rust-intrinsic"] +pub extern mod rusti { + fn move_val_init<T>(dst: &mut T, -src: T); + fn init<T>() -> T; +} + +#[test] +fn test_at_exit() { + let i = 10; + do at_exit { + debug!("at_exit1"); + assert i == 10; + } +} + +#[test] +fn test_at_exit_many() { + let i = 10; + for uint::range(20, 100) |j| { + do at_exit { + debug!("at_exit2"); + assert i == 10; + assert j > i; + } + } +} \ No newline at end of file diff --git a/src/libcore/private/finally.rs b/src/libcore/private/finally.rs new file mode 100644 index 00000000000..66e23ff4336 --- /dev/null +++ b/src/libcore/private/finally.rs @@ -0,0 +1,98 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +/*! +The Finally trait provides a method, `finally` on +stack closures that emulates Java-style try/finally blocks. + +# Example + +~~~ +do || { + ... +}.finally { + alway_run_this(); +} +~~~ +*/ + +use ops::Drop; +use task::{spawn, failing}; + +pub trait Finally<T> { + fn finally(&self, +dtor: &fn()) -> T; +} + +impl<T> &fn() -> T: Finally<T> { + // XXX: Should not require a mode here + fn finally(&self, +dtor: &fn()) -> T { + let _d = Finallyalizer { + dtor: dtor + }; + + (*self)() + } +} + +struct Finallyalizer { + dtor: &fn() +} + +impl Finallyalizer: Drop { + fn finalize(&self) { + (self.dtor)(); + } +} + +#[test] +fn test_success() { + let mut i = 0; + do (|| { + i = 10; + }).finally { + assert !failing(); + assert i == 10; + i = 20; + } + assert i == 20; +} + +#[test] +#[ignore(cfg(windows))] +#[should_fail] +fn test_fail() { + let mut i = 0; + do (|| { + i = 10; + fail; + }).finally { + assert failing(); + assert i == 10; + } +} + +#[test] +fn test_retval() { + let i = do (fn&() -> int { + 10 + }).finally { }; + assert i == 10; +} + +#[test] +fn test_compact() { + // XXX Should be able to use a fn item instead + // of a closure for do_some_fallible_work, + // but it's a type error. + let do_some_fallible_work: &fn() = || { }; + fn but_always_run_this_function() { } + do_some_fallible_work.finally( + but_always_run_this_function); +} \ No newline at end of file diff --git a/src/libcore/private/global.rs b/src/libcore/private/global.rs new file mode 100644 index 00000000000..69319abc009 --- /dev/null +++ b/src/libcore/private/global.rs @@ -0,0 +1,296 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +/*! +Global data + +An interface for creating and retrieving values with global +(per-runtime) scope. + +Global values are stored in a map and protected by a single global +mutex. Operations are provided for accessing and cloning the value +under the mutex. + +Because all globals go through a single mutex, they should be used +sparingly. The interface is intended to be used with clonable, +atomically reference counted synchronization types, like ARCs, in +which case the value should be cached locally whenever possible to +avoid hitting the mutex. +*/ + +use cast::{transmute, reinterpret_cast}; +use clone::Clone; +use kinds::Owned; +use libc::{c_void, uintptr_t}; +use option::{Option, Some, None}; +use ops::Drop; +use pipes; +use private::{Exclusive, exclusive}; +use private::{SharedMutableState, shared_mutable_state}; +use private::{get_shared_immutable_state}; +use private::at_exit::at_exit; +use hashmap::linear::LinearMap; +use sys::Closure; +use task::spawn; +use uint; + +pub type GlobalDataKey<T: Owned> = &fn(v: T); + +pub unsafe fn global_data_clone_create<T: Owned Clone>( + key: GlobalDataKey<T>, create: &fn() -> ~T) -> T { + /*! + * Clone a global value or, if it has not been created, + * first construct the value then return a clone. + * + * # Safety note + * + * Both the clone operation and the constructor are + * called while the global lock is held. Recursive + * use of the global interface in either of these + * operations will result in deadlock. + */ + global_data_clone_create_(key_ptr(key), create) +} + +unsafe fn global_data_clone_create_<T: Owned Clone>( + key: uint, create: &fn() -> ~T) -> T { + + let mut clone_value: Option<T> = None; + do global_data_modify_(key) |value: Option<~T>| { + match value { + None => { + let value = create(); + clone_value = Some(value.clone()); + Some(value) + } + Some(value) => { + clone_value = Some(value.clone()); + Some(value) + } + } + } + return clone_value.unwrap(); +} + +unsafe fn global_data_modify<T: Owned>( + key: GlobalDataKey<T>, op: &fn(Option<~T>) -> Option<~T>) { + + global_data_modify_(key_ptr(key), op) +} + +unsafe fn global_data_modify_<T: Owned>( + key: uint, op: &fn(Option<~T>) -> Option<~T>) { + + let mut old_dtor = None; + do get_global_state().with |gs| { + let (maybe_new_value, maybe_dtor) = match gs.map.pop(&key) { + Some((ptr, dtor)) => { + let value: ~T = transmute(ptr); + (op(Some(value)), Some(dtor)) + } + None => { + (op(None), None) + } + }; + match maybe_new_value { + Some(value) => { + let data: *c_void = transmute(value); + let dtor: ~fn() = match maybe_dtor { + Some(dtor) => dtor, + None => { + let dtor: ~fn() = || unsafe { + let _destroy_value: ~T = transmute(data); + }; + dtor + } + }; + let value = (data, dtor); + gs.map.insert(key, value); + } + None => { + match maybe_dtor { + Some(dtor) => old_dtor = Some(dtor), + None => () + } + } + } + } +} + +pub unsafe fn global_data_clone<T: Owned Clone>( + key: GlobalDataKey<T>) -> Option<T> { + let mut maybe_clone: Option<T> = None; + do global_data_modify(key) |current| { + match ¤t { + &Some(~ref value) => { + maybe_clone = Some(value.clone()); + } + &None => () + } + current + } + return maybe_clone; +} + +// GlobalState is a map from keys to unique pointers and a +// destructor. Keys are pointers derived from the type of the +// global value. There is a single GlobalState instance per runtime. +struct GlobalState { + map: LinearMap<uint, (*c_void, ~fn())> +} + +impl GlobalState: Drop { + fn finalize(&self) { + for self.map.each_value |v| { + match v { + &(_, ref dtor) => (*dtor)() + } + } + } +} + +fn get_global_state() -> Exclusive<GlobalState> { + + const POISON: int = -1; + + // XXX: Doing atomic_cxchg to initialize the global state + // lazily, which wouldn't be necessary with a runtime written + // in Rust + let global_ptr = unsafe { rust_get_global_data_ptr() }; + + if unsafe { *global_ptr } == 0 { + // Global state doesn't exist yet, probably + + // The global state object + let state = GlobalState { + map: LinearMap::new() + }; + + // It's under a reference-counted mutex + let state = ~exclusive(state); + + // Convert it to an integer + let state_ptr: &Exclusive<GlobalState> = state; + let state_i: int = unsafe { transmute(state_ptr) }; + + // Swap our structure into the global pointer + let prev_i = unsafe { atomic_cxchg(&mut *global_ptr, 0, state_i) }; + + // Sanity check that we're not trying to reinitialize after shutdown + assert prev_i != POISON; + + if prev_i == 0 { + // Successfully installed the global pointer + + // Take a handle to return + let clone = state.clone(); + + // Install a runtime exit function to destroy the global object + do at_exit { + // Poison the global pointer + let prev_i = unsafe { + atomic_cxchg(&mut *global_ptr, state_i, POISON) + }; + assert prev_i == state_i; + + // Capture the global state object in the at_exit closure + // so that it is destroyed at the right time + let _capture_global_state = &state; + }; + return clone; + } else { + // Somebody else initialized the globals first + let state: &Exclusive<GlobalState> = unsafe { transmute(prev_i) }; + return state.clone(); + } + } else { + let state: &Exclusive<GlobalState> = unsafe { + transmute(*global_ptr) + }; + return state.clone(); + } +} + +fn key_ptr<T: Owned>(key: GlobalDataKey<T>) -> uint { + unsafe { + let closure: Closure = reinterpret_cast(&key); + return transmute(closure.code); + } +} + +extern { + fn rust_get_global_data_ptr() -> *mut int; +} + +#[abi = "rust-intrinsic"] +extern { + fn atomic_cxchg(dst: &mut int, old: int, src: int) -> int; +} + +#[test] +fn test_clone_rc() { + type MyType = SharedMutableState<int>; + + fn key(_v: SharedMutableState<int>) { } + + for uint::range(0, 100) |_| { + do spawn { + unsafe { + let val = do global_data_clone_create(key) { + ~shared_mutable_state(10) + }; + + assert get_shared_immutable_state(&val) == &10; + } + } + } +} + +#[test] +fn test_modify() { + type MyType = SharedMutableState<int>; + + fn key(_v: SharedMutableState<int>) { } + + unsafe { + do global_data_modify(key) |v| { + match v { + None => { + unsafe { + Some(~shared_mutable_state(10)) + } + } + _ => fail + } + } + + do global_data_modify(key) |v| { + match v { + Some(sms) => { + let v = get_shared_immutable_state(sms); + assert *v == 10; + None + }, + _ => fail + } + } + + do global_data_modify(key) |v| { + match v { + None => { + unsafe { + Some(~shared_mutable_state(10)) + } + } + _ => fail + } + } + } +} diff --git a/src/libcore/private/weak_task.rs b/src/libcore/private/weak_task.rs new file mode 100644 index 00000000000..25a03ff960f --- /dev/null +++ b/src/libcore/private/weak_task.rs @@ -0,0 +1,207 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +/*! +Weak tasks + +Weak tasks are a runtime feature for building global services that +do not keep the runtime alive. Normally the runtime exits when all +tasks exits, but if a task is weak then the runtime may exit while +it is running, sending a notification to the task that the runtime +is trying to shut down. +*/ + +use option::{Some, None, swap_unwrap}; +use private::at_exit::at_exit; +use private::global::global_data_clone_create; +use private::finally::Finally; +use pipes::{Port, Chan, SharedChan, stream}; +use task::{Task, task, spawn}; +use task::rt::{task_id, get_task_id}; +use hashmap::linear::LinearMap; +use ops::Drop; + +type ShutdownMsg = (); + +// XXX: This could be a PortOne but I've experienced bugginess +// with oneshot pipes and try_send +pub unsafe fn weaken_task(f: &fn(Port<ShutdownMsg>)) { + let service = global_data_clone_create(global_data_key, + create_global_service); + let (shutdown_port, shutdown_chan) = stream::<ShutdownMsg>(); + let shutdown_port = ~mut Some(shutdown_port); + let task = get_task_id(); + // Expect the weak task service to be alive + assert service.try_send(RegisterWeakTask(task, shutdown_chan)); + unsafe { rust_inc_weak_task_count(); } + do fn&() { + let shutdown_port = swap_unwrap(&mut *shutdown_port); + f(shutdown_port) + }.finally || { + unsafe { rust_dec_weak_task_count(); } + // Service my have already exited + service.send(UnregisterWeakTask(task)); + } +} + +type WeakTaskService = SharedChan<ServiceMsg>; +type TaskHandle = task_id; + +fn global_data_key(_v: WeakTaskService) { } + +enum ServiceMsg { + RegisterWeakTask(TaskHandle, Chan<ShutdownMsg>), + UnregisterWeakTask(TaskHandle), + Shutdown +} + +fn create_global_service() -> ~WeakTaskService { + + debug!("creating global weak task service"); + let (port, chan) = stream::<ServiceMsg>(); + let port = ~mut Some(port); + let chan = SharedChan(chan); + let chan_clone = chan.clone(); + + do task().unlinked().spawn { + debug!("running global weak task service"); + let port = swap_unwrap(&mut *port); + let port = ~mut Some(port); + do fn&() { + let port = swap_unwrap(&mut *port); + // The weak task service is itself a weak task + debug!("weakening the weak service task"); + unsafe { rust_inc_weak_task_count(); } + run_weak_task_service(port); + }.finally { + debug!("unweakening the weak service task"); + unsafe { rust_dec_weak_task_count(); } + } + } + + do at_exit { + debug!("shutting down weak task service"); + chan.send(Shutdown); + } + + return ~chan_clone; +} + +fn run_weak_task_service(port: Port<ServiceMsg>) { + + let mut shutdown_map = LinearMap::new(); + + loop { + match port.recv() { + RegisterWeakTask(task, shutdown_chan) => { + let previously_unregistered = + shutdown_map.insert(task, shutdown_chan); + assert previously_unregistered; + } + UnregisterWeakTask(task) => { + match shutdown_map.pop(&task) { + Some(shutdown_chan) => { + // Oneshot pipes must send, even though + // nobody will receive this + shutdown_chan.send(()); + } + None => fail + } + } + Shutdown => break + } + } + + do shutdown_map.consume |_, shutdown_chan| { + // Weak task may have already exited + shutdown_chan.send(()); + } +} + +extern { + unsafe fn rust_inc_weak_task_count(); + unsafe fn rust_dec_weak_task_count(); +} + +#[test] +fn test_simple() { + let (port, chan) = stream(); + do spawn { + unsafe { + do weaken_task |_signal| { + } + } + chan.send(()); + } + port.recv(); +} + +#[test] +fn test_weak_weak() { + let (port, chan) = stream(); + do spawn { + unsafe { + do weaken_task |_signal| { + } + do weaken_task |_signal| { + } + } + chan.send(()); + } + port.recv(); +} + +#[test] +fn test_wait_for_signal() { + do spawn { + unsafe { + do weaken_task |signal| { + signal.recv(); + } + } + } +} + +#[test] +fn test_wait_for_signal_many() { + use uint; + for uint::range(0, 100) |_| { + do spawn { + unsafe { + do weaken_task |signal| { + signal.recv(); + } + } + } + } +} + +#[test] +fn test_select_stream_and_oneshot() { + use pipes::select2i; + use either::{Left, Right}; + + let (port, chan) = stream(); + let (waitport, waitchan) = stream(); + do spawn { + unsafe { + do weaken_task |signal| { + match select2i(&port, &signal) { + Left(*) => (), + Right(*) => fail + } + } + } + waitchan.send(()); + } + chan.send(()); + waitport.recv(); +} + diff --git a/src/libcore/run.rs b/src/libcore/run.rs index 8960d40b85a..eeae7f5b291 100644 --- a/src/libcore/run.rs +++ b/src/libcore/run.rs @@ -17,7 +17,7 @@ use io; use io::ReaderUtil; use libc; use libc::{pid_t, c_void, c_int}; -use oldcomm; +use pipes::{stream, SharedChan}; use option::{Some, None}; use os; use prelude::*; @@ -336,22 +336,23 @@ pub fn program_output(prog: &str, args: &[~str]) -> // in parallel so we don't deadlock while blocking on one // or the other. FIXME (#2625): Surely there's a much more // clever way to do this. - let p = oldcomm::Port(); - let ch = oldcomm::Chan(&p); + let (p, ch) = stream(); + let ch = SharedChan(ch); + let ch_clone = ch.clone(); do task::spawn_sched(task::SingleThreaded) { let errput = readclose(pipe_err.in); - oldcomm::send(ch, (2, move errput)); + ch.send((2, move errput)); }; do task::spawn_sched(task::SingleThreaded) { let output = readclose(pipe_out.in); - oldcomm::send(ch, (1, move output)); + ch_clone.send((1, move output)); }; let status = run::waitpid(pid); let mut errs = ~""; let mut outs = ~""; let mut count = 2; while count > 0 { - let stream = oldcomm::recv(p); + let stream = p.recv(); match stream { (1, copy s) => { outs = move s; diff --git a/src/libcore/task/mod.rs b/src/libcore/task/mod.rs index a4d99bf5db4..aa82309c78a 100644 --- a/src/libcore/task/mod.rs +++ b/src/libcore/task/mod.rs @@ -43,16 +43,15 @@ use cmp; use cmp::Eq; use iter; use libc; -use oldcomm; use option; use result::Result; -use pipes::{stream, Chan, Port}; +use pipes::{stream, Chan, Port, SharedChan}; use pipes; use prelude::*; use ptr; use result; use task::local_data_priv::{local_get, local_set}; -use task::rt::{task_id, rust_task}; +use task::rt::{task_id, sched_id, rust_task}; use task; use util; use util::replace; @@ -62,6 +61,12 @@ pub mod local_data; pub mod rt; pub mod spawn; +/// A handle to a scheduler +#[deriving_eq] +pub enum Scheduler { + SchedulerHandle(sched_id) +} + /// A handle to a task #[deriving_eq] pub enum Task { @@ -95,7 +100,21 @@ impl TaskResult : Eq { } /// Scheduler modes +#[deriving_eq] pub enum SchedMode { + /// Run task on the default scheduler + DefaultScheduler, + /// Run task on the current scheduler + CurrentScheduler, + /// Run task on a specific scheduler + ExistingScheduler(Scheduler), + /** + * Tasks are scheduled on the main OS thread + * + * The main OS thread is the thread used to launch the runtime which, + * in most cases, is the process's initial thread as created by the OS. + */ + PlatformThread, /// All tasks run in the same OS thread SingleThreaded, /// Tasks are distributed among available CPUs @@ -104,53 +123,6 @@ pub enum SchedMode { ThreadPerTask, /// Tasks are distributed among a fixed number of OS threads ManualThreads(uint), - /** - * Tasks are scheduled on the main OS thread - * - * The main OS thread is the thread used to launch the runtime which, - * in most cases, is the process's initial thread as created by the OS. - */ - PlatformThread -} - -impl SchedMode : cmp::Eq { - pure fn eq(&self, other: &SchedMode) -> bool { - match (*self) { - SingleThreaded => { - match (*other) { - SingleThreaded => true, - _ => false - } - } - ThreadPerCore => { - match (*other) { - ThreadPerCore => true, - _ => false - } - } - ThreadPerTask => { - match (*other) { - ThreadPerTask => true, - _ => false - } - } - ManualThreads(e0a) => { - match (*other) { - ManualThreads(e0b) => e0a == e0b, - _ => false - } - } - PlatformThread => { - match (*other) { - PlatformThread => true, - _ => false - } - } - } - } - pure fn ne(&self, other: &SchedMode) -> bool { - !(*self).eq(other) - } } /** @@ -204,7 +176,7 @@ pub struct TaskOpts { linked: bool, supervised: bool, mut notify_chan: Option<Chan<TaskResult>>, - sched: Option<SchedOpts>, + sched: SchedOpts } /** @@ -369,11 +341,8 @@ impl TaskBuilder { opts: TaskOpts { linked: self.opts.linked, supervised: self.opts.supervised, - notify_chan: notify_chan, - sched: Some(SchedOpts { - mode: mode, - foreign_stack_size: None, - }) + notify_chan: move notify_chan, + sched: SchedOpts { mode: mode, foreign_stack_size: None} }, can_not_copy: None, .. self.consume() @@ -457,18 +426,17 @@ impl TaskBuilder { * Fails if a future_result was already set for this task. */ fn try<T: Owned>(f: fn~() -> T) -> Result<T,()> { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); + let (po, ch) = stream::<T>(); let mut result = None; let fr_task_builder = self.future_result(|+r| { result = Some(move r); }); - do fr_task_builder.spawn |move f| { - oldcomm::send(ch, f()); + do fr_task_builder.spawn |move f, move ch| { + ch.send(f()); } match option::unwrap(move result).recv() { - Success => result::Ok(oldcomm::recv(po)), + Success => result::Ok(po.recv()), Failure => result::Err(()) } } @@ -489,7 +457,10 @@ pub fn default_task_opts() -> TaskOpts { linked: true, supervised: false, notify_chan: None, - sched: None + sched: SchedOpts { + mode: DefaultScheduler, + foreign_stack_size: None + } } } @@ -542,10 +513,9 @@ pub fn spawn_with<A:Owned>(arg: A, f: fn~(v: A)) { pub fn spawn_sched(mode: SchedMode, f: fn~()) { /*! - * Creates a new scheduler and executes a task on it - * - * Tasks subsequently spawned by that task will also execute on - * the new scheduler. When there are no more tasks to execute the + * Creates a new task on a new or existing scheduler + + * When there are no more tasks to execute the * scheduler terminates. * * # Failure @@ -599,6 +569,10 @@ pub fn get_task() -> Task { } } +pub fn get_scheduler() -> Scheduler { + SchedulerHandle(unsafe { rt::rust_get_sched_id() }) +} + /** * Temporarily make the task unkillable * @@ -711,17 +685,18 @@ fn test_cant_dup_task_builder() { #[test] #[ignore(cfg(windows))] fn test_spawn_unlinked_unsup_no_fail_down() { // grandchild sends on a port - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); + let (po, ch) = stream(); + let ch = SharedChan(ch); do spawn_unlinked { + let ch = ch.clone(); do spawn_unlinked { // Give middle task a chance to fail-but-not-kill-us. for iter::repeat(16) { task::yield(); } - oldcomm::send(ch, ()); // If killed first, grandparent hangs. + ch.send(()); // If killed first, grandparent hangs. } fail; // Shouldn't kill either (grand)parent or (grand)child. } - oldcomm::recv(po); + po.recv(); } #[test] #[ignore(cfg(windows))] fn test_spawn_unlinked_unsup_no_fail_up() { // child unlinked fails @@ -741,8 +716,7 @@ fn test_spawn_unlinked_sup_fail_down() { #[test] #[should_fail] #[ignore(cfg(windows))] fn test_spawn_linked_sup_fail_up() { // child fails; parent fails - let po = oldcomm::Port::<()>(); - let _ch = oldcomm::Chan(&po); + let (po, _ch) = stream::<()>(); // Unidirectional "parenting" shouldn't override bidirectional linked. // We have to cheat with opts - the interface doesn't support them because // they don't make sense (redundant with task().supervised()). @@ -760,7 +734,7 @@ fn test_spawn_linked_sup_fail_up() { // child fails; parent fails .. b0 }; do b1.spawn { fail; } - oldcomm::recv(po); // We should get punted awake + po.recv(); // We should get punted awake } #[test] #[should_fail] #[ignore(cfg(windows))] fn test_spawn_linked_sup_fail_down() { // parent fails; child fails @@ -784,11 +758,10 @@ fn test_spawn_linked_sup_fail_down() { // parent fails; child fails } #[test] #[should_fail] #[ignore(cfg(windows))] fn test_spawn_linked_unsup_fail_up() { // child fails; parent fails - let po = oldcomm::Port::<()>(); - let _ch = oldcomm::Chan(&po); + let (po, _ch) = stream::<()>(); // Default options are to spawn linked & unsupervised. do spawn { fail; } - oldcomm::recv(po); // We should get punted awake + po.recv(); // We should get punted awake } #[test] #[should_fail] #[ignore(cfg(windows))] fn test_spawn_linked_unsup_fail_down() { // parent fails; child fails @@ -856,27 +829,25 @@ fn test_spawn_linked_sup_propagate_sibling() { #[test] fn test_run_basic() { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); + let (po, ch) = stream::<()>(); do task().spawn { - oldcomm::send(ch, ()); + ch.send(()); } - oldcomm::recv(po); + po.recv(); } #[test] fn test_add_wrapper() { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); + let (po, ch) = stream::<()>(); let b0 = task(); let b1 = do b0.add_wrapper |body| { fn~(move body) { body(); - oldcomm::send(ch, ()); + ch.send(()); } }; do b1.spawn { } - oldcomm::recv(po); + po.recv(); } #[test] @@ -929,52 +900,46 @@ fn test_spawn_sched_no_threads() { #[test] fn test_spawn_sched() { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); + let (po, ch) = stream::<()>(); + let ch = SharedChan(ch); - fn f(i: int, ch: oldcomm::Chan<()>) { - unsafe { - let parent_sched_id = rt::rust_get_sched_id(); + fn f(i: int, ch: SharedChan<()>) { + let parent_sched_id = unsafe { rt::rust_get_sched_id() }; - do spawn_sched(SingleThreaded) { - unsafe { - let child_sched_id = rt::rust_get_sched_id(); - assert parent_sched_id != child_sched_id; - - if (i == 0) { - oldcomm::send(ch, ()); - } else { - f(i - 1, ch); - } - } - }; - } + do spawn_sched(SingleThreaded) { + let child_sched_id = unsafe { rt::rust_get_sched_id() }; + assert parent_sched_id != child_sched_id; + + if (i == 0) { + ch.send(()); + } else { + f(i - 1, ch.clone()); + } + }; } f(10, ch); - oldcomm::recv(po); + po.recv(); } #[test] -fn test_spawn_sched_childs_on_same_sched() { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); +fn test_spawn_sched_childs_on_default_sched() { + let (po, ch) = stream(); + + // Assuming tests run on the default scheduler + let default_id = unsafe { rt::rust_get_sched_id() }; do spawn_sched(SingleThreaded) { - unsafe { - let parent_sched_id = rt::rust_get_sched_id(); - do spawn { - unsafe { - let child_sched_id = rt::rust_get_sched_id(); - // This should be on the same scheduler - assert parent_sched_id == child_sched_id; - oldcomm::send(ch, ()); - } - }; - } + let parent_sched_id = unsafe { rt::rust_get_sched_id() }; + do spawn { + let child_sched_id = unsafe { rt::rust_get_sched_id() }; + assert parent_sched_id != child_sched_id; + assert child_sched_id == default_id; + ch.send(()); + }; }; - oldcomm::recv(po); + po.recv(); } #[nolink] @@ -996,10 +961,8 @@ fn test_spawn_sched_blocking() { // without affecting other schedulers for iter::repeat(20u) { - let start_po = oldcomm::Port(); - let start_ch = oldcomm::Chan(&start_po); - let fin_po = oldcomm::Port(); - let fin_ch = oldcomm::Chan(&fin_po); + let (start_po, start_ch) = stream(); + let (fin_po, fin_ch) = stream(); let lock = testrt::rust_dbg_lock_create(); @@ -1007,44 +970,42 @@ fn test_spawn_sched_blocking() { unsafe { testrt::rust_dbg_lock_lock(lock); - oldcomm::send(start_ch, ()); + start_ch.send(()); // Block the scheduler thread testrt::rust_dbg_lock_wait(lock); testrt::rust_dbg_lock_unlock(lock); - oldcomm::send(fin_ch, ()); + fin_ch.send(()); } }; // Wait until the other task has its lock - oldcomm::recv(start_po); + start_po.recv(); - fn pingpong(po: oldcomm::Port<int>, ch: oldcomm::Chan<int>) { + fn pingpong(po: &Port<int>, ch: &Chan<int>) { let mut val = 20; while val > 0 { - val = oldcomm::recv(po); - oldcomm::send(ch, val - 1); + val = po.recv(); + ch.send(val - 1); } } - let setup_po = oldcomm::Port(); - let setup_ch = oldcomm::Chan(&setup_po); - let parent_po = oldcomm::Port(); - let parent_ch = oldcomm::Chan(&parent_po); + let (setup_po, setup_ch) = stream(); + let (parent_po, parent_ch) = stream(); do spawn { - let child_po = oldcomm::Port(); - oldcomm::send(setup_ch, oldcomm::Chan(&child_po)); - pingpong(child_po, parent_ch); + let (child_po, child_ch) = stream(); + setup_ch.send(child_ch); + pingpong(&child_po, &parent_ch); }; - let child_ch = oldcomm::recv(setup_po); - oldcomm::send(child_ch, 20); - pingpong(parent_po, child_ch); + let child_ch = setup_po.recv(); + child_ch.send(20); + pingpong(&parent_po, &child_ch); testrt::rust_dbg_lock_lock(lock); testrt::rust_dbg_lock_signal(lock); testrt::rust_dbg_lock_unlock(lock); - oldcomm::recv(fin_po); + fin_po.recv(); testrt::rust_dbg_lock_destroy(lock); } } @@ -1052,18 +1013,17 @@ fn test_spawn_sched_blocking() { #[cfg(test)] fn avoid_copying_the_body(spawnfn: fn(v: fn~())) { - let p = oldcomm::Port::<uint>(); - let ch = oldcomm::Chan(&p); + let (p, ch) = stream::<uint>(); let x = ~1; let x_in_parent = ptr::addr_of(&(*x)) as uint; do spawnfn |move x| { let x_in_child = ptr::addr_of(&(*x)) as uint; - oldcomm::send(ch, x_in_child); + ch.send(x_in_child); } - let x_in_child = oldcomm::recv(p); + let x_in_child = p.recv(); assert x_in_parent == x_in_child; } @@ -1101,20 +1061,18 @@ fn test_avoid_copying_the_body_unlinked() { #[test] fn test_platform_thread() { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); + let (po, ch) = stream(); do task().sched_mode(PlatformThread).spawn { - oldcomm::send(ch, ()); + ch.send(()); } - oldcomm::recv(po); + po.recv(); } #[test] #[ignore(cfg(windows))] #[should_fail] fn test_unkillable() { - let po = oldcomm::Port(); - let ch = po.chan(); + let (po, ch) = stream(); // We want to do this after failing do spawn_unlinked { @@ -1242,7 +1200,7 @@ fn test_spawn_thread_on_demand() { let (port2, chan2) = pipes::stream(); - do spawn() |move chan2| { + do spawn_sched(CurrentScheduler) |move chan2| { chan2.send(()); } diff --git a/src/libcore/task/spawn.rs b/src/libcore/task/spawn.rs index edeacb31e1d..a5ab4af40be 100644 --- a/src/libcore/task/spawn.rs +++ b/src/libcore/task/spawn.rs @@ -74,9 +74,8 @@ #[warn(deprecated_mode)]; use cast; -use oldcomm; use option; -use pipes::{Chan, Port}; +use pipes::{stream, Chan, Port}; use pipes; use prelude::*; use private; @@ -88,6 +87,7 @@ use task::rt::rust_closure; use task::rt; use task::{Failure, ManualThreads, PlatformThread, SchedOpts, SingleThreaded}; use task::{Success, TaskOpts, TaskResult, ThreadPerCore, ThreadPerTask}; +use task::{ExistingScheduler, SchedulerHandle}; use task::{default_task_opts, unkillable}; use uint; use util; @@ -536,9 +536,9 @@ pub fn spawn_raw(opts: TaskOpts, f: fn~()) { // Agh. Get move-mode items into the closure. FIXME (#2829) let (child_tg, ancestors, f) = option::swap_unwrap(child_data); // Create child task. - let new_task = match opts.sched { - None => rt::new_task(), - Some(sched_opts) => new_task_in_new_sched(sched_opts) + let new_task = match opts.sched.mode { + DefaultScheduler => rt::new_task(), + _ => new_task_in_sched(opts.sched) }; assert !new_task.is_null(); // Getting killed after here would leak the task. @@ -642,31 +642,35 @@ pub fn spawn_raw(opts: TaskOpts, f: fn~()) { } } - fn new_task_in_new_sched(opts: SchedOpts) -> *rust_task { - unsafe { - if opts.foreign_stack_size != None { - fail ~"foreign_stack_size scheduler option unimplemented"; - } + fn new_task_in_sched(opts: SchedOpts) -> *rust_task { + if opts.foreign_stack_size != None { + fail ~"foreign_stack_size scheduler option unimplemented"; + } - let num_threads = match opts.mode { - SingleThreaded => 1u, - ThreadPerCore => rt::rust_num_threads(), - ThreadPerTask => { - fail ~"ThreadPerTask scheduling mode unimplemented" - } - ManualThreads(threads) => { - if threads == 0u { - fail ~"can not create a scheduler with no threads"; - } - threads - } - PlatformThread => 0u /* Won't be used */ - }; + let num_threads = match opts.mode { + DefaultScheduler + | CurrentScheduler + | ExistingScheduler(*) + | PlatformThread => 0u, /* Won't be used */ + SingleThreaded => 1u, + ThreadPerCore => unsafe { rt::rust_num_threads() }, + ThreadPerTask => { + fail ~"ThreadPerTask scheduling mode unimplemented" + } + ManualThreads(threads) => { + if threads == 0u { + fail ~"can not create a scheduler with no threads"; + } + threads + } + }; - let sched_id = if opts.mode != PlatformThread { - rt::rust_new_sched(num_threads) - } else { - rt::rust_osmain_sched_id() + unsafe { + let sched_id = match opts.mode { + CurrentScheduler => rt::rust_get_sched_id(), + ExistingScheduler(SchedulerHandle(id)) => id, + PlatformThread => rt::rust_osmain_sched_id(), + _ => rt::rust_new_sched(num_threads) }; rt::rust_new_task_in_sched(sched_id) } @@ -675,12 +679,11 @@ pub fn spawn_raw(opts: TaskOpts, f: fn~()) { #[test] fn test_spawn_raw_simple() { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); + let (po, ch) = stream(); do spawn_raw(default_task_opts()) { - oldcomm::send(ch, ()); + ch.send(()); } - oldcomm::recv(po); + po.recv(); } #[test] diff --git a/src/libstd/flatpipes.rs b/src/libstd/flatpipes.rs index ea7b2442bb9..afc3e72e636 100644 --- a/src/libstd/flatpipes.rs +++ b/src/libstd/flatpipes.rs @@ -792,7 +792,6 @@ mod test { let (finish_port, finish_chan) = pipes::stream(); let addr = ip::v4::parse_addr("127.0.0.1"); - let iotask = uv::global_loop::get(); let begin_connect_chan = Cell(move begin_connect_chan); let accept_chan = Cell(move accept_chan); @@ -800,6 +799,7 @@ mod test { // The server task do task::spawn |copy addr, move begin_connect_chan, move accept_chan| { + let iotask = &uv::global_loop::get(); let begin_connect_chan = begin_connect_chan.take(); let accept_chan = accept_chan.take(); let listen_res = do tcp::listen( @@ -831,6 +831,7 @@ mod test { begin_connect_port.recv(); debug!("connecting"); + let iotask = &uv::global_loop::get(); let connect_result = tcp::connect(copy addr, port, iotask); assert connect_result.is_ok(); let sock = result::unwrap(move connect_result); diff --git a/src/libstd/net_ip.rs b/src/libstd/net_ip.rs index 84c3b755649..72e58cbd5d3 100644 --- a/src/libstd/net_ip.rs +++ b/src/libstd/net_ip.rs @@ -114,7 +114,7 @@ enum IpGetAddrErr { * a vector of `ip_addr` results, in the case of success, or an error * object in the case of failure */ -pub fn get_addr(node: &str, iotask: iotask) +pub fn get_addr(node: &str, iotask: &iotask) -> result::Result<~[IpAddr], IpGetAddrErr> { do oldcomm::listen |output_ch| { do str::as_buf(node) |node_ptr, len| { @@ -419,7 +419,7 @@ mod test { #[ignore(reason = "valgrind says it's leaky")] fn test_ip_get_addr() { let localhost_name = ~"localhost"; - let iotask = uv::global_loop::get(); + let iotask = &uv::global_loop::get(); let ga_result = get_addr(localhost_name, iotask); if result::is_err(&ga_result) { fail ~"got err result from net::ip::get_addr();" @@ -445,7 +445,7 @@ mod test { #[ignore(reason = "valgrind says it's leaky")] fn test_ip_get_addr_bad_input() { let localhost_name = ~"sjkl234m,./sdf"; - let iotask = uv::global_loop::get(); + let iotask = &uv::global_loop::get(); let ga_result = get_addr(localhost_name, iotask); assert result::is_err(&ga_result); } diff --git a/src/libstd/net_tcp.rs b/src/libstd/net_tcp.rs index aa5eec2b43c..8d6de369479 100644 --- a/src/libstd/net_tcp.rs +++ b/src/libstd/net_tcp.rs @@ -143,7 +143,7 @@ pub enum TcpConnectErrData { * `net::tcp::tcp_connect_err_data` instance will be returned */ pub fn connect(input_ip: ip::IpAddr, port: uint, - iotask: IoTask) + iotask: &IoTask) -> result::Result<TcpSocket, TcpConnectErrData> { unsafe { let result_po = oldcomm::Port::<ConnAttempt>(); @@ -166,106 +166,116 @@ pub fn connect(input_ip: ip::IpAddr, port: uint, ip::Ipv4(_) => { false } ip::Ipv6(_) => { true } }, - iotask: iotask + iotask: iotask.clone() }; let socket_data_ptr = ptr::addr_of(&(*socket_data)); log(debug, fmt!("tcp_connect result_ch %?", conn_data.result_ch)); // get an unsafe representation of our stream_handle_ptr that // we can send into the interact cb to be handled in libuv.. log(debug, fmt!("stream_handle_ptr outside interact %?", - stream_handle_ptr)); + stream_handle_ptr)); do iotask::interact(iotask) |move input_ip, loop_ptr| { unsafe { log(debug, ~"in interact cb for tcp client connect.."); log(debug, fmt!("stream_handle_ptr in interact %?", - stream_handle_ptr)); + stream_handle_ptr)); match uv::ll::tcp_init( loop_ptr, stream_handle_ptr) { - 0i32 => { - log(debug, ~"tcp_init successful"); - log(debug, ~"dealing w/ ipv4 connection.."); - let connect_req_ptr = - ptr::addr_of(&((*socket_data_ptr).connect_req)); - let addr_str = ip::format_addr(&input_ip); - let connect_result = match input_ip { - ip::Ipv4(ref addr) => { - // have to "recreate" the sockaddr_in/6 - // since the ip_addr discards the port - // info.. should probably add an additional - // rust type that actually is closer to - // what the libuv API expects (ip str + port num) - log(debug, fmt!("addr: %?", addr)); - let in_addr = uv::ll::ip4_addr(addr_str, port as int); - uv::ll::tcp_connect( - connect_req_ptr, - stream_handle_ptr, - ptr::addr_of(&in_addr), - tcp_connect_on_connect_cb) - } - ip::Ipv6(ref addr) => { - log(debug, fmt!("addr: %?", addr)); - let in_addr = uv::ll::ip6_addr(addr_str, port as int); - uv::ll::tcp_connect6( - connect_req_ptr, - stream_handle_ptr, - ptr::addr_of(&in_addr), - tcp_connect_on_connect_cb) - } - }; - match connect_result { - 0i32 => { - log(debug, ~"tcp_connect successful"); - // reusable data that we'll have for the - // duration.. - uv::ll::set_data_for_uv_handle(stream_handle_ptr, - socket_data_ptr as - *libc::c_void); - // just so the connect_cb can send the - // outcome.. - uv::ll::set_data_for_req(connect_req_ptr, - conn_data_ptr); - log(debug, ~"leaving tcp_connect interact cb..."); - // let tcp_connect_on_connect_cb send on - // the result_ch, now.. - } - _ => { - // immediate connect failure.. probably a garbage - // ip or somesuch + 0i32 => { + log(debug, ~"tcp_init successful"); + log(debug, ~"dealing w/ ipv4 connection.."); + let connect_req_ptr = + ptr::addr_of(&((*socket_data_ptr).connect_req)); + let addr_str = ip::format_addr(&input_ip); + let connect_result = match input_ip { + ip::Ipv4(ref addr) => { + // have to "recreate" the + // sockaddr_in/6 since the ip_addr + // discards the port info.. should + // probably add an additional rust + // type that actually is closer to + // what the libuv API expects (ip str + // + port num) + log(debug, fmt!("addr: %?", addr)); + let in_addr = uv::ll::ip4_addr(addr_str, + port as int); + uv::ll::tcp_connect( + connect_req_ptr, + stream_handle_ptr, + ptr::addr_of(&in_addr), + tcp_connect_on_connect_cb) + } + ip::Ipv6(ref addr) => { + log(debug, fmt!("addr: %?", addr)); + let in_addr = uv::ll::ip6_addr(addr_str, + port as int); + uv::ll::tcp_connect6( + connect_req_ptr, + stream_handle_ptr, + ptr::addr_of(&in_addr), + tcp_connect_on_connect_cb) + } + }; + match connect_result { + 0i32 => { + log(debug, ~"tcp_connect successful"); + // reusable data that we'll have for the + // duration.. + uv::ll::set_data_for_uv_handle( + stream_handle_ptr, + socket_data_ptr as + *libc::c_void); + // just so the connect_cb can send the + // outcome.. + uv::ll::set_data_for_req(connect_req_ptr, + conn_data_ptr); + log(debug, + ~"leaving tcp_connect interact cb..."); + // let tcp_connect_on_connect_cb send on + // the result_ch, now.. + } + _ => { + // immediate connect + // failure.. probably a garbage ip or + // somesuch + let err_data = + uv::ll::get_last_err_data(loop_ptr); + oldcomm::send((*conn_data_ptr).result_ch, + ConnFailure(err_data)); + uv::ll::set_data_for_uv_handle( + stream_handle_ptr, + conn_data_ptr); + uv::ll::close(stream_handle_ptr, + stream_error_close_cb); + } + } + } + _ => { + // failure to create a tcp handle let err_data = uv::ll::get_last_err_data(loop_ptr); oldcomm::send((*conn_data_ptr).result_ch, - ConnFailure(err_data)); - uv::ll::set_data_for_uv_handle(stream_handle_ptr, - conn_data_ptr); - uv::ll::close(stream_handle_ptr, - stream_error_close_cb); - } + ConnFailure(err_data)); } - } - _ => { - // failure to create a tcp handle - let err_data = uv::ll::get_last_err_data(loop_ptr); - oldcomm::send((*conn_data_ptr).result_ch, - ConnFailure(err_data)); - } } } - }; + } match oldcomm::recv(result_po) { - ConnSuccess => { - log(debug, ~"tcp::connect - received success on result_po"); - result::Ok(TcpSocket(socket_data)) - } - ConnFailure(ref err_data) => { - oldcomm::recv(closed_signal_po); - log(debug, ~"tcp::connect - received failure on result_po"); - // still have to free the malloc'd stream handle.. - rustrt::rust_uv_current_kernel_free(stream_handle_ptr - as *libc::c_void); - let tcp_conn_err = match err_data.err_name { - ~"ECONNREFUSED" => ConnectionRefused, - _ => GenericConnectErr(err_data.err_name, err_data.err_msg) - }; - result::Err(tcp_conn_err) - } + ConnSuccess => { + log(debug, ~"tcp::connect - received success on result_po"); + result::Ok(TcpSocket(socket_data)) + } + ConnFailure(ref err_data) => { + oldcomm::recv(closed_signal_po); + log(debug, ~"tcp::connect - received failure on result_po"); + // still have to free the malloc'd stream handle.. + rustrt::rust_uv_current_kernel_free(stream_handle_ptr + as *libc::c_void); + let tcp_conn_err = match err_data.err_name { + ~"ECONNREFUSED" => ConnectionRefused, + _ => GenericConnectErr(err_data.err_name, + err_data.err_msg) + }; + result::Err(tcp_conn_err) + } } } } @@ -506,71 +516,79 @@ fn read_future(sock: &TcpSocket, timeout_msecs: uint) pub fn accept(new_conn: TcpNewConnection) -> result::Result<TcpSocket, TcpErrData> { unsafe { - match new_conn { - NewTcpConn(server_handle_ptr) => { - let server_data_ptr = uv::ll::get_data_for_uv_handle( - server_handle_ptr) as *TcpListenFcData; - let reader_po = oldcomm::Port(); - let iotask = (*server_data_ptr).iotask; - let stream_handle_ptr = malloc_uv_tcp_t(); - *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t(); - let client_socket_data = @TcpSocketData { - reader_po: reader_po, - reader_ch: oldcomm::Chan(&reader_po), - stream_handle_ptr : stream_handle_ptr, - connect_req : uv::ll::connect_t(), - write_req : uv::ll::write_t(), - ipv6: (*server_data_ptr).ipv6, - iotask : iotask - }; - let client_socket_data_ptr = ptr::addr_of(&(*client_socket_data)); - let client_stream_handle_ptr = - (*client_socket_data_ptr).stream_handle_ptr; - - let result_po = oldcomm::Port::<Option<TcpErrData>>(); - let result_ch = oldcomm::Chan(&result_po); - - // UNSAFE LIBUV INTERACTION BEGIN - // .. normally this happens within the context of - // a call to uv::hl::interact.. but we're breaking - // the rules here because this always has to be - // called within the context of a listen() new_connect_cb - // callback (or it will likely fail and drown your cat) - log(debug, ~"in interact cb for tcp::accept"); - let loop_ptr = uv::ll::get_loop_for_uv_handle( - server_handle_ptr); - match uv::ll::tcp_init(loop_ptr, client_stream_handle_ptr) { - 0i32 => { - log(debug, ~"uv_tcp_init successful for client stream"); - match uv::ll::accept( - server_handle_ptr as *libc::c_void, - client_stream_handle_ptr as *libc::c_void) { - 0i32 => { - log(debug, ~"successfully accepted client connection"); - uv::ll::set_data_for_uv_handle(client_stream_handle_ptr, - client_socket_data_ptr - as *libc::c_void); - oldcomm::send(result_ch, None); - } - _ => { - log(debug, ~"failed to accept client conn"); - oldcomm::send(result_ch, Some( - uv::ll::get_last_err_data(loop_ptr).to_tcp_err())); - } + match new_conn{ + NewTcpConn(server_handle_ptr) => { + let server_data_ptr = uv::ll::get_data_for_uv_handle( + server_handle_ptr) as *TcpListenFcData; + let reader_po = oldcomm::Port(); + let iotask = &(*server_data_ptr).iotask; + let stream_handle_ptr = malloc_uv_tcp_t(); + *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = + uv::ll::tcp_t(); + let client_socket_data: @TcpSocketData = @TcpSocketData { + reader_po: reader_po, + reader_ch: oldcomm::Chan(&reader_po), + stream_handle_ptr : stream_handle_ptr, + connect_req : uv::ll::connect_t(), + write_req : uv::ll::write_t(), + ipv6: (*server_data_ptr).ipv6, + iotask : iotask.clone() + }; + let client_socket_data_ptr = ptr::addr_of( + &(*client_socket_data)); + let client_stream_handle_ptr = + (*client_socket_data_ptr).stream_handle_ptr; + + let result_po = oldcomm::Port::<Option<TcpErrData>>(); + let result_ch = oldcomm::Chan(&result_po); + + // UNSAFE LIBUV INTERACTION BEGIN + // .. normally this happens within the context of + // a call to uv::hl::interact.. but we're breaking + // the rules here because this always has to be + // called within the context of a listen() new_connect_cb + // callback (or it will likely fail and drown your cat) + log(debug, ~"in interact cb for tcp::accept"); + let loop_ptr = uv::ll::get_loop_for_uv_handle( + server_handle_ptr); + match uv::ll::tcp_init(loop_ptr, client_stream_handle_ptr) { + 0i32 => { + log(debug, ~"uv_tcp_init successful for \ + client stream"); + match uv::ll::accept( + server_handle_ptr as *libc::c_void, + client_stream_handle_ptr as *libc::c_void) { + 0i32 => { + log(debug, + ~"successfully accepted client \ + connection"); + uv::ll::set_data_for_uv_handle( + client_stream_handle_ptr, + client_socket_data_ptr + as *libc::c_void); + oldcomm::send(result_ch, None); + } + _ => { + log(debug, ~"failed to accept client conn"); + oldcomm::send(result_ch, Some( + uv::ll::get_last_err_data( + loop_ptr).to_tcp_err())); + } + } + } + _ => { + log(debug, ~"failed to accept client stream"); + oldcomm::send(result_ch, Some( + uv::ll::get_last_err_data( + loop_ptr).to_tcp_err())); + } + } + // UNSAFE LIBUV INTERACTION END + match oldcomm::recv(result_po) { + Some(copy err_data) => result::Err(err_data), + None => result::Ok(TcpSocket(client_socket_data)) } - } - _ => { - log(debug, ~"failed to init client stream"); - oldcomm::send(result_ch, Some( - uv::ll::get_last_err_data(loop_ptr).to_tcp_err())); - } - } - // UNSAFE LIBUV INTERACTION END - match oldcomm::recv(result_po) { - Some(copy err_data) => result::Err(err_data), - None => result::Ok(TcpSocket(client_socket_data)) } - } } } } @@ -604,30 +622,27 @@ pub fn accept(new_conn: TcpNewConnection) * of listen exiting because of an error */ pub fn listen(host_ip: ip::IpAddr, port: uint, backlog: uint, - iotask: IoTask, - on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>), - new_connect_cb: fn~(TcpNewConnection, - oldcomm::Chan<Option<TcpErrData>>)) + iotask: &IoTask, + on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>), + new_connect_cb: fn~(TcpNewConnection, + oldcomm::Chan<Option<TcpErrData>>)) -> result::Result<(), TcpListenErrData> { - unsafe { - do listen_common(move host_ip, port, backlog, iotask, - move on_establish_cb) - // on_connect_cb - |move new_connect_cb, handle| { - unsafe { - let server_data_ptr = - uv::ll::get_data_for_uv_handle(handle) - as *TcpListenFcData; - let new_conn = NewTcpConn(handle); - let kill_ch = (*server_data_ptr).kill_ch; - new_connect_cb(new_conn, kill_ch); - } - } + do listen_common(move host_ip, port, backlog, iotask, + move on_establish_cb) + // on_connect_cb + |move new_connect_cb, handle| { + unsafe { + let server_data_ptr = uv::ll::get_data_for_uv_handle(handle) + as *TcpListenFcData; + let new_conn = NewTcpConn(handle); + let kill_ch = (*server_data_ptr).kill_ch; + new_connect_cb(new_conn, kill_ch); + } } } fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint, - iotask: IoTask, + iotask: &IoTask, on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>), on_connect_cb: fn~(*uv::ll::uv_tcp_t)) -> result::Result<(), TcpListenErrData> { @@ -637,12 +652,12 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint, let kill_ch = oldcomm::Chan(&kill_po); let server_stream = uv::ll::tcp_t(); let server_stream_ptr = ptr::addr_of(&server_stream); - let server_data = { + let server_data: TcpListenFcData = TcpListenFcData { server_stream_ptr: server_stream_ptr, stream_closed_ch: oldcomm::Chan(&stream_closed_po), kill_ch: kill_ch, on_connect_cb: move on_connect_cb, - iotask: iotask, + iotask: iotask.clone(), ipv6: match &host_ip { &ip::Ipv4(_) => { false } &ip::Ipv6(_) => { true } @@ -662,114 +677,123 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint, do iotask::interact(iotask) |move loc_ip, loop_ptr| { unsafe { match uv::ll::tcp_init(loop_ptr, server_stream_ptr) { - 0i32 => { - uv::ll::set_data_for_uv_handle( - server_stream_ptr, - server_data_ptr); - let addr_str = ip::format_addr(&loc_ip); - let bind_result = match loc_ip { - ip::Ipv4(ref addr) => { - log(debug, fmt!("addr: %?", addr)); - let in_addr = uv::ll::ip4_addr(addr_str, - port as int); - uv::ll::tcp_bind(server_stream_ptr, - ptr::addr_of(&in_addr)) - } - ip::Ipv6(ref addr) => { - log(debug, fmt!("addr: %?", addr)); - let in_addr = uv::ll::ip6_addr(addr_str, - port as int); - uv::ll::tcp_bind6(server_stream_ptr, - ptr::addr_of(&in_addr)) - } - }; - match bind_result { - 0i32 => { - match uv::ll::listen(server_stream_ptr, - backlog as libc::c_int, - tcp_lfc_on_connection_cb) { - 0i32 => oldcomm::send(setup_ch, None), - _ => { - log(debug, ~"failure to uv_listen()"); - let err_data = uv::ll::get_last_err_data( - loop_ptr); - oldcomm::send(setup_ch, Some(err_data)); - } + 0i32 => { + uv::ll::set_data_for_uv_handle( + server_stream_ptr, + server_data_ptr); + let addr_str = ip::format_addr(&loc_ip); + let bind_result = match loc_ip { + ip::Ipv4(ref addr) => { + log(debug, fmt!("addr: %?", addr)); + let in_addr = uv::ll::ip4_addr( + addr_str, + port as int); + uv::ll::tcp_bind(server_stream_ptr, + ptr::addr_of(&in_addr)) + } + ip::Ipv6(ref addr) => { + log(debug, fmt!("addr: %?", addr)); + let in_addr = uv::ll::ip6_addr( + addr_str, + port as int); + uv::ll::tcp_bind6(server_stream_ptr, + ptr::addr_of(&in_addr)) + } + }; + match bind_result { + 0i32 => { + match uv::ll::listen( + server_stream_ptr, + backlog as libc::c_int, + tcp_lfc_on_connection_cb) { + 0i32 => oldcomm::send(setup_ch, None), + _ => { + log(debug, + ~"failure to uv_tcp_init"); + let err_data = + uv::ll::get_last_err_data( + loop_ptr); + oldcomm::send(setup_ch, + Some(err_data)); + } + } + } + _ => { + log(debug, ~"failure to uv_tcp_bind"); + let err_data = uv::ll::get_last_err_data( + loop_ptr); + oldcomm::send(setup_ch, Some(err_data)); + } } - } - _ => { + } + _ => { log(debug, ~"failure to uv_tcp_bind"); let err_data = uv::ll::get_last_err_data( loop_ptr); oldcomm::send(setup_ch, Some(err_data)); - } } - } - _ => { - log(debug, ~"failure to uv_tcp_init"); - let err_data = uv::ll::get_last_err_data(loop_ptr); - oldcomm::send(setup_ch, Some(err_data)); - } } - }; + } } setup_ch.recv() }; match setup_result { - Some(ref err_data) => { - do iotask::interact(iotask) |loop_ptr| { - unsafe { - log(debug, - fmt!("tcp::listen post-kill recv hl interact %?", - loop_ptr)); - (*server_data_ptr).active = false; - uv::ll::close(server_stream_ptr, tcp_lfc_close_cb); + Some(ref err_data) => { + do iotask::interact(iotask) |loop_ptr| { + unsafe { + log(debug, + fmt!("tcp::listen post-kill recv hl interact %?", + loop_ptr)); + (*server_data_ptr).active = false; + uv::ll::close(server_stream_ptr, tcp_lfc_close_cb); + } + }; + stream_closed_po.recv(); + match err_data.err_name { + ~"EACCES" => { + log(debug, ~"Got EACCES error"); + result::Err(AccessDenied) + } + ~"EADDRINUSE" => { + log(debug, ~"Got EADDRINUSE error"); + result::Err(AddressInUse) + } + _ => { + log(debug, fmt!("Got '%s' '%s' libuv error", + err_data.err_name, err_data.err_msg)); + result::Err( + GenericListenErr(err_data.err_name, + err_data.err_msg)) + } } - }; - stream_closed_po.recv(); - match err_data.err_name { - ~"EACCES" => { - log(debug, ~"Got EACCES error"); - result::Err(AccessDenied) - } - ~"EADDRINUSE" => { - log(debug, ~"Got EADDRINUSE error"); - result::Err(AddressInUse) - } - _ => { - log(debug, fmt!("Got '%s' '%s' libuv error", - err_data.err_name, err_data.err_msg)); - result::Err( - GenericListenErr(err_data.err_name, err_data.err_msg)) - } } - } - None => { - on_establish_cb(kill_ch); - let kill_result = oldcomm::recv(kill_po); - do iotask::interact(iotask) |loop_ptr| { - unsafe { - log(debug, - fmt!("tcp::listen post-kill recv hl interact %?", - loop_ptr)); - (*server_data_ptr).active = false; - uv::ll::close(server_stream_ptr, tcp_lfc_close_cb); + None => { + on_establish_cb(kill_ch); + let kill_result = oldcomm::recv(kill_po); + do iotask::interact(iotask) |loop_ptr| { + unsafe { + log(debug, + fmt!("tcp::listen post-kill recv hl interact %?", + loop_ptr)); + (*server_data_ptr).active = false; + uv::ll::close(server_stream_ptr, tcp_lfc_close_cb); + } + }; + stream_closed_po.recv(); + match kill_result { + // some failure post bind/listen + Some(ref err_data) => result::Err(GenericListenErr( + err_data.err_name, + err_data.err_msg)), + // clean exit + None => result::Ok(()) } - }; - stream_closed_po.recv(); - match kill_result { - // some failure post bind/listen - Some(ref err_data) => result::Err(GenericListenErr( - err_data.err_name, - err_data.err_msg)), - // clean exit - None => result::Ok(()) } - } } } } + /** * Convert a `net::tcp::tcp_socket` to a `net::tcp::tcp_socket_buf`. * @@ -936,11 +960,11 @@ fn tear_down_socket_data(socket_data: @TcpSocketData) { }; let close_data_ptr = ptr::addr_of(&close_data); let stream_handle_ptr = (*socket_data).stream_handle_ptr; - do iotask::interact((*socket_data).iotask) |loop_ptr| { + do iotask::interact(&(*socket_data).iotask) |loop_ptr| { unsafe { log(debug, fmt!("interact dtor for tcp_socket stream %? loop %?", - stream_handle_ptr, loop_ptr)); + stream_handle_ptr, loop_ptr)); uv::ll::set_data_for_uv_handle(stream_handle_ptr, close_data_ptr); uv::ll::close(stream_handle_ptr, tcp_socket_dtor_close_cb); @@ -950,7 +974,7 @@ fn tear_down_socket_data(socket_data: @TcpSocketData) { //the line below will most likely crash //log(debug, fmt!("about to free socket_data at %?", socket_data)); rustrt::rust_uv_current_kernel_free(stream_handle_ptr - as *libc::c_void); + as *libc::c_void); log(debug, ~"exiting dtor for tcp_socket"); } } @@ -962,7 +986,7 @@ fn read_common_impl(socket_data: *TcpSocketData, timeout_msecs: uint) use timer; log(debug, ~"starting tcp::read"); - let iotask = (*socket_data).iotask; + let iotask = &(*socket_data).iotask; let rs_result = read_start_common_impl(socket_data); if result::is_err(&rs_result) { let err_data = result::get_err(&rs_result); @@ -972,26 +996,26 @@ fn read_common_impl(socket_data: *TcpSocketData, timeout_msecs: uint) log(debug, ~"tcp::read before recv_timeout"); let read_result = if timeout_msecs > 0u { timer::recv_timeout( - iotask, timeout_msecs, result::get(&rs_result)) + iotask, timeout_msecs, result::get(&rs_result)) } else { Some(oldcomm::recv(result::get(&rs_result))) }; log(debug, ~"tcp::read after recv_timeout"); match move read_result { - None => { - log(debug, ~"tcp::read: timed out.."); - let err_data = TcpErrData { - err_name: ~"TIMEOUT", - err_msg: ~"req timed out" - }; - read_stop_common_impl(socket_data); - result::Err(err_data) - } - Some(move data_result) => { - log(debug, ~"tcp::read got data"); - read_stop_common_impl(socket_data); - data_result - } + None => { + log(debug, ~"tcp::read: timed out.."); + let err_data = TcpErrData { + err_name: ~"TIMEOUT", + err_msg: ~"req timed out" + }; + read_stop_common_impl(socket_data); + result::Err(err_data) + } + Some(move data_result) => { + log(debug, ~"tcp::read got data"); + read_stop_common_impl(socket_data); + data_result + } } } } @@ -1004,27 +1028,26 @@ fn read_stop_common_impl(socket_data: *TcpSocketData) -> let stream_handle_ptr = (*socket_data).stream_handle_ptr; let stop_po = oldcomm::Port::<Option<TcpErrData>>(); let stop_ch = oldcomm::Chan(&stop_po); - do iotask::interact((*socket_data).iotask) |loop_ptr| { + do iotask::interact(&(*socket_data).iotask) |loop_ptr| { unsafe { log(debug, ~"in interact cb for tcp::read_stop"); - match uv::ll::read_stop(stream_handle_ptr as - *uv::ll::uv_stream_t) { - 0i32 => { - log(debug, ~"successfully called uv_read_stop"); - oldcomm::send(stop_ch, None); - } - _ => { - log(debug, ~"failure in calling uv_read_stop"); - let err_data = uv::ll::get_last_err_data(loop_ptr); - oldcomm::send(stop_ch, Some(err_data.to_tcp_err())); - } + match uv::ll::read_stop(stream_handle_ptr + as *uv::ll::uv_stream_t) { + 0i32 => { + log(debug, ~"successfully called uv_read_stop"); + oldcomm::send(stop_ch, None); + } + _ => { + log(debug, ~"failure in calling uv_read_stop"); + let err_data = uv::ll::get_last_err_data(loop_ptr); + oldcomm::send(stop_ch, Some(err_data.to_tcp_err())); + } } } - }; - + } match oldcomm::recv(stop_po) { - Some(move err_data) => Err(err_data), - None => Ok(()) + Some(move err_data) => Err(err_data), + None => Ok(()) } } } @@ -1038,29 +1061,29 @@ fn read_start_common_impl(socket_data: *TcpSocketData) let start_po = oldcomm::Port::<Option<uv::ll::uv_err_data>>(); let start_ch = oldcomm::Chan(&start_po); log(debug, ~"in tcp::read_start before interact loop"); - do iotask::interact((*socket_data).iotask) |loop_ptr| { + do iotask::interact(&(*socket_data).iotask) |loop_ptr| { unsafe { - log(debug, - fmt!("in tcp::read_start interact cb %?", loop_ptr)); - match uv::ll::read_start(stream_handle_ptr as - *uv::ll::uv_stream_t, + log(debug, fmt!("in tcp::read_start interact cb %?", + loop_ptr)); + match uv::ll::read_start(stream_handle_ptr + as *uv::ll::uv_stream_t, on_alloc_cb, on_tcp_read_cb) { - 0i32 => { - log(debug, ~"success doing uv_read_start"); - oldcomm::send(start_ch, None); - } - _ => { - log(debug, ~"error attempting uv_read_start"); - let err_data = uv::ll::get_last_err_data(loop_ptr); - oldcomm::send(start_ch, Some(err_data)); - } + 0i32 => { + log(debug, ~"success doing uv_read_start"); + oldcomm::send(start_ch, None); + } + _ => { + log(debug, ~"error attempting uv_read_start"); + let err_data = uv::ll::get_last_err_data(loop_ptr); + oldcomm::send(start_ch, Some(err_data)); + } } } - }; + } match oldcomm::recv(start_po) { - Some(ref err_data) => result::Err(err_data.to_tcp_err()), - None => result::Ok((*socket_data).reader_po) + Some(ref err_data) => result::Err(err_data.to_tcp_err()), + None => result::Ok((*socket_data).reader_po) } } } @@ -1084,27 +1107,28 @@ fn write_common_impl(socket_data_ptr: *TcpSocketData, result_ch: oldcomm::Chan(&result_po) }; let write_data_ptr = ptr::addr_of(&write_data); - do iotask::interact((*socket_data_ptr).iotask) |loop_ptr| { + do iotask::interact(&(*socket_data_ptr).iotask) |loop_ptr| { unsafe { log(debug, fmt!("in interact cb for tcp::write %?", loop_ptr)); match uv::ll::write(write_req_ptr, - stream_handle_ptr, - write_buf_vec_ptr, - tcp_write_complete_cb) { - 0i32 => { - log(debug, ~"uv_write() invoked successfully"); - uv::ll::set_data_for_req(write_req_ptr, write_data_ptr); - } - _ => { - log(debug, ~"error invoking uv_write()"); - let err_data = uv::ll::get_last_err_data(loop_ptr); - oldcomm::send((*write_data_ptr).result_ch, - TcpWriteError(err_data.to_tcp_err())); - } + stream_handle_ptr, + write_buf_vec_ptr, + tcp_write_complete_cb) { + 0i32 => { + log(debug, ~"uv_write() invoked successfully"); + uv::ll::set_data_for_req(write_req_ptr, + write_data_ptr); + } + _ => { + log(debug, ~"error invoking uv_write()"); + let err_data = uv::ll::get_last_err_data(loop_ptr); + oldcomm::send((*write_data_ptr).result_ch, + TcpWriteError(err_data.to_tcp_err())); + } } } - }; + } // FIXME (#2656): Instead of passing unsafe pointers to local data, // and waiting here for the write to complete, we should transfer // ownership of everything to the I/O task and let it deal with the @@ -1473,7 +1497,7 @@ pub mod test { } } pub fn impl_gl_tcp_ipv4_server_and_client() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 8888u; let expected_req = ~"ping"; @@ -1485,6 +1509,7 @@ pub mod test { let cont_po = oldcomm::Port::<()>(); let cont_ch = oldcomm::Chan(&cont_po); // server + let hl_loop_clone = hl_loop.clone(); do task::spawn_sched(task::ManualThreads(1u)) { let actual_req = do oldcomm::listen |server_ch| { run_tcp_test_server( @@ -1493,7 +1518,7 @@ pub mod test { expected_resp, server_ch, cont_ch, - hl_loop) + &hl_loop_clone) }; server_result_ch.send(actual_req); }; @@ -1519,7 +1544,7 @@ pub mod test { assert str::contains(actual_resp, expected_resp); } pub fn impl_gl_tcp_ipv4_get_peer_addr() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 8887u; let expected_resp = ~"pong"; @@ -1530,6 +1555,7 @@ pub mod test { let cont_po = oldcomm::Port::<()>(); let cont_ch = oldcomm::Chan(&cont_po); // server + let hl_loop_clone = hl_loop.clone(); do task::spawn_sched(task::ManualThreads(1u)) { let actual_req = do oldcomm::listen |server_ch| { run_tcp_test_server( @@ -1538,7 +1564,7 @@ pub mod test { expected_resp, server_ch, cont_ch, - hl_loop) + &hl_loop_clone) }; server_result_ch.send(actual_req); }; @@ -1549,10 +1575,11 @@ pub mod test { let server_ip_addr = ip::v4::parse_addr(server_ip); let iotask = uv::global_loop::get(); let connect_result = connect(move server_ip_addr, server_port, - iotask); + &iotask); let sock = result::unwrap(move connect_result); + debug!("testing peer address"); // This is what we are actually testing! assert net::ip::format_addr(&sock.get_peer_addr()) == ~"127.0.0.1"; @@ -1561,12 +1588,14 @@ pub mod test { // Fulfill the protocol the test server expects let resp_bytes = str::to_bytes(~"ping"); tcp_write_single(&sock, resp_bytes); + debug!("message sent"); let read_result = sock.read(0u); client_ch.send(str::from_bytes(read_result.get())); + debug!("result read"); }; } pub fn impl_gl_tcp_ipv4_client_error_connection_refused() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 8889u; let expected_req = ~"ping"; @@ -1586,7 +1615,7 @@ pub mod test { } } pub fn impl_gl_tcp_ipv4_server_address_in_use() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 8890u; let expected_req = ~"ping"; @@ -1598,6 +1627,7 @@ pub mod test { let cont_po = oldcomm::Port::<()>(); let cont_ch = oldcomm::Chan(&cont_po); // server + let hl_loop_clone = hl_loop.clone(); do task::spawn_sched(task::ManualThreads(1u)) { let actual_req = do oldcomm::listen |server_ch| { run_tcp_test_server( @@ -1606,7 +1636,7 @@ pub mod test { expected_resp, server_ch, cont_ch, - hl_loop) + &hl_loop_clone) }; server_result_ch.send(actual_req); }; @@ -1637,7 +1667,7 @@ pub mod test { } } pub fn impl_gl_tcp_ipv4_server_access_denied() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 80u; // this one should fail.. @@ -1657,7 +1687,7 @@ pub mod test { } pub fn impl_gl_tcp_ipv4_server_client_reader_writer() { - let iotask = uv::global_loop::get(); + let iotask = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 8891u; let expected_req = ~"ping"; @@ -1669,6 +1699,7 @@ pub mod test { let cont_po = oldcomm::Port::<()>(); let cont_ch = oldcomm::Chan(&cont_po); // server + let iotask_clone = iotask.clone(); do task::spawn_sched(task::ManualThreads(1u)) { let actual_req = do oldcomm::listen |server_ch| { run_tcp_test_server( @@ -1677,7 +1708,7 @@ pub mod test { expected_resp, server_ch, cont_ch, - iotask) + &iotask_clone) }; server_result_ch.send(actual_req); }; @@ -1708,7 +1739,7 @@ pub mod test { pub fn impl_tcp_socket_impl_reader_handles_eof() { use core::io::{Reader,ReaderUtil}; - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 10041u; let expected_req = ~"GET /"; @@ -1720,6 +1751,7 @@ pub mod test { let cont_po = oldcomm::Port::<()>(); let cont_ch = oldcomm::Chan(&cont_po); // server + let hl_loop_clone = hl_loop.clone(); do task::spawn_sched(task::ManualThreads(1u)) { let actual_req = do oldcomm::listen |server_ch| { run_tcp_test_server( @@ -1728,7 +1760,7 @@ pub mod test { expected_resp, server_ch, cont_ch, - hl_loop) + &hl_loop_clone) }; server_result_ch.send(actual_req); }; @@ -1768,7 +1800,7 @@ pub mod test { fn run_tcp_test_server(server_ip: &str, server_port: uint, resp: ~str, server_ch: oldcomm::Chan<~str>, cont_ch: oldcomm::Chan<()>, - iotask: IoTask) -> ~str { + iotask: &IoTask) -> ~str { let server_ip_addr = ip::v4::parse_addr(server_ip); let listen_result = listen(move server_ip_addr, server_port, 128, iotask, @@ -1855,7 +1887,7 @@ pub mod test { } fn run_tcp_test_server_fail(server_ip: &str, server_port: uint, - iotask: IoTask) -> TcpListenErrData { + iotask: &IoTask) -> TcpListenErrData { let server_ip_addr = ip::v4::parse_addr(server_ip); let listen_result = listen(move server_ip_addr, server_port, 128, iotask, @@ -1879,7 +1911,7 @@ pub mod test { fn run_tcp_test_client(server_ip: &str, server_port: uint, resp: &str, client_ch: oldcomm::Chan<~str>, - iotask: IoTask) -> result::Result<~str, + iotask: &IoTask) -> result::Result<~str, TcpConnectErrData> { let server_ip_addr = ip::v4::parse_addr(server_ip); diff --git a/src/libstd/timer.rs b/src/libstd/timer.rs index f8147c532e6..a3e39bc7bbc 100644 --- a/src/libstd/timer.rs +++ b/src/libstd/timer.rs @@ -39,7 +39,7 @@ use core; * * ch - a channel of type T to send a `val` on * * val - a value of type T to send over the provided `ch` */ -pub fn delayed_send<T: Owned>(iotask: IoTask, +pub fn delayed_send<T: Owned>(iotask: &IoTask, msecs: uint, ch: oldcomm::Chan<T>, val: T) { @@ -92,7 +92,7 @@ pub fn delayed_send<T: Owned>(iotask: IoTask, * * `iotask` - a `uv::iotask` that the tcp request will run on * * msecs - an amount of time, in milliseconds, for the current task to block */ -pub fn sleep(iotask: IoTask, msecs: uint) { +pub fn sleep(iotask: &IoTask, msecs: uint) { let exit_po = oldcomm::Port::<()>(); let exit_ch = oldcomm::Chan(&exit_po); delayed_send(iotask, msecs, exit_ch, ()); @@ -119,7 +119,7 @@ pub fn sleep(iotask: IoTask, msecs: uint) { * on the provided port in the allotted timeout period, then the result will * be a `some(T)`. If not, then `none` will be returned. */ -pub fn recv_timeout<T: Copy Owned>(iotask: IoTask, +pub fn recv_timeout<T: Copy Owned>(iotask: &IoTask, msecs: uint, wait_po: oldcomm::Port<T>) -> Option<T> { @@ -183,13 +183,13 @@ mod test { #[test] fn test_gl_timer_simple_sleep_test() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); sleep(hl_loop, 1u); } #[test] fn test_gl_timer_sleep_stress1() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); for iter::repeat(50u) { sleep(hl_loop, 1u); } @@ -199,7 +199,7 @@ mod test { fn test_gl_timer_sleep_stress2() { let po = oldcomm::Port(); let ch = oldcomm::Chan(&po); - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let repeat = 20u; let spec = { @@ -214,11 +214,12 @@ mod test { for spec.each |spec| { let (times, maxms) = *spec; + let hl_loop_clone = hl_loop.clone(); do task::spawn { use rand::*; let rng = Rng(); for iter::repeat(times) { - sleep(hl_loop, rng.next() as uint % maxms); + sleep(&hl_loop_clone, rng.next() as uint % maxms); } oldcomm::send(ch, ()); } @@ -277,12 +278,12 @@ mod test { let expected = rand::Rng().gen_str(16u); let test_po = oldcomm::Port::<~str>(); let test_ch = oldcomm::Chan(&test_po); - + let hl_loop_clone = hl_loop.clone(); do task::spawn() { - delayed_send(hl_loop, 50u, test_ch, expected); + delayed_send(&hl_loop_clone, 50u, test_ch, expected); }; - match recv_timeout(hl_loop, 1u, test_po) { + match recv_timeout(&hl_loop, 1u, test_po) { None => successes += 1, _ => failures += 1 }; diff --git a/src/libstd/uv_global_loop.rs b/src/libstd/uv_global_loop.rs index 3a2c3b7c135..8ae3e24abee 100644 --- a/src/libstd/uv_global_loop.rs +++ b/src/libstd/uv_global_loop.rs @@ -19,16 +19,16 @@ use uv_iotask::{IoTask, spawn_iotask}; use core::either::{Left, Right}; use core::libc; -use core::oldcomm::{Port, Chan, select2, listen}; -use core::private::{chan_from_global_ptr, weaken_task}; +use core::pipes::{Port, Chan, SharedChan, select2i}; +use core::private::global::{global_data_clone_create, + global_data_clone}; +use core::private::weak_task::weaken_task; use core::str; -use core::task::TaskBuilder; +use core::task::{task, SingleThreaded, spawn}; use core::task; use core::vec; - -extern mod rustrt { - unsafe fn rust_uv_get_kernel_global_chan_ptr() -> *libc::uintptr_t; -} +use core::clone::Clone; +use core::option::{Some, None}; /** * Race-free helper to get access to a global task where a libuv @@ -48,69 +48,64 @@ pub fn get() -> IoTask { #[doc(hidden)] fn get_monitor_task_gl() -> IoTask { - unsafe { - let monitor_loop_chan_ptr = - rustrt::rust_uv_get_kernel_global_chan_ptr(); - - debug!("ENTERING global_loop::get() loop chan: %?", - monitor_loop_chan_ptr); - - debug!("before priv::chan_from_global_ptr"); - type MonChan = Chan<IoTask>; - - let monitor_ch = - do chan_from_global_ptr::<MonChan>(monitor_loop_chan_ptr, - || { - task::task().sched_mode - (task::SingleThreaded) - .unlinked() - }) |msg_po| { - unsafe { - debug!("global monitor task starting"); - - // As a weak task the runtime will notify us when to exit - do weaken_task() |weak_exit_po| { - debug!("global monitor task is now weak"); - let hl_loop = spawn_loop(); - loop { - debug!("in outer_loop..."); - match select2(weak_exit_po, msg_po) { - Left(weak_exit) => { - // all normal tasks have ended, tell the - // libuv loop to tear_down, then exit - debug!("weak_exit_po recv'd msg: %?", weak_exit); - iotask::exit(hl_loop); - break; - } - Right(fetch_ch) => { - debug!("hl_loop req recv'd: %?", fetch_ch); - fetch_ch.send(hl_loop); - } - } + + type MonChan = Chan<IoTask>; + + struct GlobalIoTask(IoTask); + + impl GlobalIoTask: Clone { + fn clone(&self) -> GlobalIoTask { + GlobalIoTask((**self).clone()) + } + } + + fn key(_: GlobalIoTask) { } + + match unsafe { global_data_clone(key) } { + Some(GlobalIoTask(iotask)) => iotask, + None => { + let iotask: IoTask = spawn_loop(); + let mut installed = false; + let final_iotask = unsafe { + do global_data_clone_create(key) { + installed = true; + ~GlobalIoTask(iotask.clone()) + } + }; + if installed { + do task().unlinked().spawn() { + unsafe { + debug!("global monitor task starting"); + // As a weak task the runtime will notify us + // when to exit + do weaken_task |weak_exit_po| { + debug!("global monitor task is weak"); + weak_exit_po.recv(); + iotask::exit(&iotask); + debug!("global monitor task is unweak"); + }; + debug!("global monitor task exiting"); } - debug!("global monitor task is leaving weakend state"); - }; - debug!("global monitor task exiting"); + } + } else { + iotask::exit(&iotask); } - }; - // once we have a chan to the monitor loop, we ask it for - // the libuv loop's async handle - do listen |fetch_ch| { - monitor_ch.send(fetch_ch); - fetch_ch.recv() + match final_iotask { + GlobalIoTask(iotask) => iotask + } } } } fn spawn_loop() -> IoTask { - let builder = do task::task().add_wrapper |task_body| { + let builder = do task().add_wrapper |task_body| { fn~(move task_body) { // The I/O loop task also needs to be weak so it doesn't keep // the runtime alive unsafe { - do weaken_task |weak_exit_po| { - debug!("global libuv task is now weak %?", weak_exit_po); + do weaken_task |_| { + debug!("global libuv task is now weak"); task_body(); // We don't wait for the exit message on weak_exit_po @@ -122,6 +117,7 @@ fn spawn_loop() -> IoTask { } } }; + let builder = builder.unlinked(); spawn_iotask(move builder) } @@ -135,16 +131,18 @@ mod test { use core::iter; use core::libc; - use core::oldcomm; use core::ptr; use core::task; + use core::cast::transmute; + use core::libc::c_void; + use core::pipes::{stream, SharedChan, Chan}; extern fn simple_timer_close_cb(timer_ptr: *ll::uv_timer_t) { unsafe { let exit_ch_ptr = ll::get_data_for_uv_handle( - timer_ptr as *libc::c_void) as *oldcomm::Chan<bool>; - let exit_ch = *exit_ch_ptr; - oldcomm::send(exit_ch, true); + timer_ptr as *libc::c_void); + let exit_ch = transmute::<*c_void, ~Chan<bool>>(exit_ch_ptr); + exit_ch.send(true); log(debug, fmt!("EXIT_CH_PTR simple_timer_close_cb exit_ch_ptr: %?", exit_ch_ptr)); @@ -155,26 +153,25 @@ mod test { unsafe { log(debug, ~"in simple timer cb"); ll::timer_stop(timer_ptr); - let hl_loop = get_gl(); + let hl_loop = &get_gl(); do iotask::interact(hl_loop) |_loop_ptr| { + log(debug, ~"closing timer"); unsafe { - log(debug, ~"closing timer"); ll::close(timer_ptr, simple_timer_close_cb); - log(debug, ~"about to deref exit_ch_ptr"); - log(debug, ~"after msg sent on deref'd exit_ch"); } + log(debug, ~"about to deref exit_ch_ptr"); + log(debug, ~"after msg sent on deref'd exit_ch"); }; log(debug, ~"exiting simple timer cb"); } } - fn impl_uv_hl_simple_timer(iotask: IoTask) { + fn impl_uv_hl_simple_timer(iotask: &IoTask) { unsafe { - let exit_po = oldcomm::Port::<bool>(); - let exit_ch = oldcomm::Chan(&exit_po); - let exit_ch_ptr = ptr::addr_of(&exit_ch); + let (exit_po, exit_ch) = stream::<bool>(); + let exit_ch_ptr: *libc::c_void = transmute(~exit_ch); log(debug, fmt!("EXIT_CH_PTR newly created exit_ch_ptr: %?", - exit_ch_ptr)); + exit_ch_ptr)); let timer_handle = ll::timer_t(); let timer_ptr = ptr::addr_of(&timer_handle); do iotask::interact(iotask) |loop_ptr| { @@ -184,20 +181,22 @@ mod test { if(init_status == 0i32) { ll::set_data_for_uv_handle( timer_ptr as *libc::c_void, - exit_ch_ptr as *libc::c_void); + exit_ch_ptr); let start_status = ll::timer_start(timer_ptr, simple_timer_cb, - 1u, - 0u); - if start_status != 0 { + 1u, 0u); + if(start_status == 0i32) { + } + else { fail ~"failure on ll::timer_start()"; } - } else { + } + else { fail ~"failure on ll::timer_init()"; } } }; - oldcomm::recv(exit_po); + exit_po.recv(); log(debug, ~"global_loop timer test: msg recv on exit_po, done.."); } @@ -205,17 +204,15 @@ mod test { #[test] fn test_gl_uv_global_loop_high_level_global_timer() { - unsafe { - let hl_loop = get_gl(); - let exit_po = oldcomm::Port::<()>(); - let exit_ch = oldcomm::Chan(&exit_po); - task::spawn_sched(task::ManualThreads(1u), || { - impl_uv_hl_simple_timer(hl_loop); - oldcomm::send(exit_ch, ()); - }); + let hl_loop = &get_gl(); + let (exit_po, exit_ch) = stream::<()>(); + task::spawn_sched(task::ManualThreads(1u), || { + let hl_loop = &get_gl(); impl_uv_hl_simple_timer(hl_loop); - oldcomm::recv(exit_po); - } + exit_ch.send(()); + }); + impl_uv_hl_simple_timer(hl_loop); + exit_po.recv(); } // keeping this test ignored until some kind of stress-test-harness @@ -223,23 +220,21 @@ mod test { #[test] #[ignore] fn test_stress_gl_uv_global_loop_high_level_global_timer() { - unsafe { - let hl_loop = get_gl(); - let exit_po = oldcomm::Port::<()>(); - let exit_ch = oldcomm::Chan(&exit_po); - let cycles = 5000u; - for iter::repeat(cycles) { - task::spawn_sched(task::ManualThreads(1u), || { - impl_uv_hl_simple_timer(hl_loop); - oldcomm::send(exit_ch, ()); - }); - }; - for iter::repeat(cycles) { - oldcomm::recv(exit_po); - }; - log(debug, - ~"test_stress_gl_uv_global_loop_high_level_global_timer"+ - ~" exiting sucessfully!"); - } + let (exit_po, exit_ch) = stream::<()>(); + let exit_ch = SharedChan(exit_ch); + let cycles = 5000u; + for iter::repeat(cycles) { + let exit_ch_clone = exit_ch.clone(); + task::spawn_sched(task::ManualThreads(1u), || { + let hl_loop = &get_gl(); + impl_uv_hl_simple_timer(hl_loop); + exit_ch_clone.send(()); + }); + }; + for iter::repeat(cycles) { + exit_po.recv(); + }; + log(debug, ~"test_stress_gl_uv_global_loop_high_level_global_timer"+ + ~" exiting sucessfully!"); } } diff --git a/src/libstd/uv_iotask.rs b/src/libstd/uv_iotask.rs index 0a3d64a02a4..dc0092aadfa 100644 --- a/src/libstd/uv_iotask.rs +++ b/src/libstd/uv_iotask.rs @@ -20,7 +20,7 @@ use ll = uv_ll; use core::libc::c_void; use core::libc; -use core::oldcomm::{Port, Chan, listen}; +use core::pipes::{stream, Port, Chan, SharedChan}; use core::prelude::*; use core::ptr::addr_of; use core::task::TaskBuilder; @@ -30,22 +30,30 @@ use core::task; pub enum IoTask { IoTask_({ async_handle: *ll::uv_async_t, - op_chan: Chan<IoTaskMsg> + op_chan: SharedChan<IoTaskMsg> }) } +impl IoTask: Clone { + fn clone(&self) -> IoTask { + IoTask_({ + async_handle: self.async_handle, + op_chan: self.op_chan.clone() + }) + } +} + pub fn spawn_iotask(task: task::TaskBuilder) -> IoTask { - do listen |iotask_ch| { + let (iotask_port, iotask_chan) = stream(); - do task.sched_mode(task::SingleThreaded).spawn { - debug!("entering libuv task"); - run_loop(iotask_ch); - debug!("libuv task exiting"); - }; + do task.sched_mode(task::SingleThreaded).spawn { + debug!("entering libuv task"); + run_loop(&iotask_chan); + debug!("libuv task exiting"); + }; - iotask_ch.recv() - } + iotask_port.recv() } @@ -71,7 +79,7 @@ pub fn spawn_iotask(task: task::TaskBuilder) -> IoTask { * module. It is not safe to send the `loop_ptr` param to this callback out * via ports/chans. */ -pub unsafe fn interact(iotask: IoTask, +pub unsafe fn interact(iotask: &IoTask, cb: fn~(*c_void)) { send_msg(iotask, Interaction(move cb)); } @@ -83,7 +91,7 @@ pub unsafe fn interact(iotask: IoTask, * async handle and do a sanity check to make sure that all other handles are * closed, causing a failure otherwise. */ -pub fn exit(iotask: IoTask) { +pub fn exit(iotask: &IoTask) { unsafe { send_msg(iotask, TeardownLoop); } @@ -98,8 +106,10 @@ enum IoTaskMsg { } /// Run the loop and begin handling messages -fn run_loop(iotask_ch: Chan<IoTask>) { +fn run_loop(iotask_ch: &Chan<IoTask>) { + unsafe { + debug!("creating loop"); let loop_ptr = ll::loop_new(); // set up the special async handle we'll use to allow multi-task @@ -110,10 +120,12 @@ fn run_loop(iotask_ch: Chan<IoTask>) { // associate the async handle with the loop ll::async_init(loop_ptr, async_handle, wake_up_cb); + let (msg_po, msg_ch) = stream::<IoTaskMsg>(); + // initialize our loop data and store it in the loop - let data = IoTaskLoopData { + let data: IoTaskLoopData = IoTaskLoopData { async_handle: async_handle, - msg_po: Port() + msg_po: msg_po }; ll::set_data_for_uv_handle(async_handle, addr_of(&data)); @@ -121,7 +133,7 @@ fn run_loop(iotask_ch: Chan<IoTask>) { // while we dwell in the I/O loop let iotask = IoTask_({ async_handle: async_handle, - op_chan: data.msg_po.chan() + op_chan: SharedChan(msg_ch) }); iotask_ch.send(iotask); @@ -139,9 +151,10 @@ struct IoTaskLoopData { msg_po: Port<IoTaskMsg>, } -fn send_msg(iotask: IoTask, msg: IoTaskMsg) { +fn send_msg(iotask: &IoTask, + msg: IoTaskMsg) { + iotask.op_chan.send(move msg); unsafe { - iotask.op_chan.send(move msg); ll::async_send(iotask.async_handle); } } @@ -149,19 +162,20 @@ fn send_msg(iotask: IoTask, msg: IoTaskMsg) { /// Dispatch all pending messages extern fn wake_up_cb(async_handle: *ll::uv_async_t, status: int) { - unsafe { - log(debug, fmt!("wake_up_cb extern.. handle: %? status: %?", - async_handle, status)); + log(debug, fmt!("wake_up_cb extern.. handle: %? status: %?", + async_handle, status)); + + unsafe { let loop_ptr = ll::get_loop_for_uv_handle(async_handle); - let data = ll::get_data_for_uv_handle(async_handle) - as *IoTaskLoopData; - let msg_po = (*data).msg_po; + let data = + ll::get_data_for_uv_handle(async_handle) as *IoTaskLoopData; + let msg_po = &(*data).msg_po; while msg_po.peek() { match msg_po.recv() { - Interaction(ref cb) => (*cb)(loop_ptr), - TeardownLoop => begin_teardown(data) + Interaction(ref cb) => (*cb)(loop_ptr), + TeardownLoop => begin_teardown(data) } } } @@ -216,27 +230,32 @@ mod test { } struct AhData { iotask: IoTask, - exit_ch: oldcomm::Chan<()>, + exit_ch: oldcomm::Chan<()> } - fn impl_uv_iotask_async(iotask: IoTask) { + fn impl_uv_iotask_async(iotask: &IoTask) { unsafe { let async_handle = ll::async_t(); let ah_ptr = ptr::addr_of(&async_handle); let exit_po = oldcomm::Port::<()>(); let exit_ch = oldcomm::Chan(&exit_po); - let ah_data = { - iotask: iotask, + let ah_data = AhData { + iotask: iotask.clone(), exit_ch: exit_ch }; - let ah_data_ptr = ptr::addr_of(&ah_data); + let ah_data_ptr: *AhData = unsafe { + ptr::to_unsafe_ptr(&ah_data) + }; + debug!("about to interact"); do interact(iotask) |loop_ptr| { unsafe { + debug!("interacting"); ll::async_init(loop_ptr, ah_ptr, async_handle_cb); - ll::set_data_for_uv_handle(ah_ptr, - ah_data_ptr as *libc::c_void); + ll::set_data_for_uv_handle( + ah_ptr, ah_data_ptr as *libc::c_void); ll::async_send(ah_ptr); } }; + debug!("waiting for async close"); oldcomm::recv(exit_po); } } @@ -244,13 +263,13 @@ mod test { // this fn documents the bear minimum neccesary to roll your own // high_level_loop unsafe fn spawn_test_loop(exit_ch: oldcomm::Chan<()>) -> IoTask { - let iotask_port = oldcomm::Port::<IoTask>(); - let iotask_ch = oldcomm::Chan(&iotask_port); + let (iotask_port, iotask_ch) = stream::<IoTask>(); do task::spawn_sched(task::ManualThreads(1u)) { - run_loop(iotask_ch); + debug!("about to run a test loop"); + run_loop(&iotask_ch); exit_ch.send(()); }; - return oldcomm::recv(iotask_port); + return iotask_port.recv(); } extern fn lifetime_handle_close(handle: *libc::c_void) { @@ -270,23 +289,30 @@ mod test { unsafe { let exit_po = oldcomm::Port::<()>(); let exit_ch = oldcomm::Chan(&exit_po); - let iotask = spawn_test_loop(exit_ch); + let iotask = &spawn_test_loop(exit_ch); + + debug!("spawned iotask"); // using this handle to manage the lifetime of the - // high_level_loop, as it will exit the first time one of the - // impl_uv_hl_async() is cleaned up with no one ref'd handles on - // the loop (Which can happen under race-condition type - // situations.. this ensures that the loop lives until, at least, - // all of the impl_uv_hl_async() runs have been called, at least. + // high_level_loop, as it will exit the first time one of + // the impl_uv_hl_async() is cleaned up with no one ref'd + // handles on the loop (Which can happen under + // race-condition type situations.. this ensures that the + // loop lives until, at least, all of the + // impl_uv_hl_async() runs have been called, at least. let work_exit_po = oldcomm::Port::<()>(); let work_exit_ch = oldcomm::Chan(&work_exit_po); for iter::repeat(7u) { + let iotask_clone = iotask.clone(); do task::spawn_sched(task::ManualThreads(1u)) { - impl_uv_iotask_async(iotask); + debug!("async"); + impl_uv_iotask_async(&iotask_clone); + debug!("done async"); oldcomm::send(work_exit_ch, ()); }; }; for iter::repeat(7u) { + debug!("waiting"); oldcomm::recv(work_exit_po); }; log(debug, ~"sending teardown_loop msg.."); diff --git a/src/rt/rust.cpp b/src/rt/rust.cpp index f21a7441640..803da32cbc8 100644 --- a/src/rt/rust.cpp +++ b/src/rt/rust.cpp @@ -43,8 +43,8 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) { rust_kernel *kernel = new rust_kernel(env); - // Create the main scheduler and the main task - rust_sched_id sched_id = kernel->create_scheduler(env->num_sched_threads); + // Create the main task + rust_sched_id sched_id = kernel->main_sched_id(); rust_scheduler *sched = kernel->get_scheduler_by_id(sched_id); assert(sched != NULL); rust_task *root_task = sched->create_task(NULL, "main"); diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index de69272aca1..4fcfc11b325 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -652,7 +652,10 @@ new_task_common(rust_scheduler *sched, rust_task *parent) { extern "C" CDECL rust_task* new_task() { rust_task *task = rust_get_current_task(); - return new_task_common(task->sched, task); + rust_sched_id sched_id = task->kernel->main_sched_id(); + rust_scheduler *sched = task->kernel->get_scheduler_by_id(sched_id); + assert(sched != NULL && "should always have a main scheduler"); + return new_task_common(sched, task); } extern "C" CDECL rust_task* @@ -855,24 +858,6 @@ rust_compare_and_swap_ptr(intptr_t *address, return sync::compare_and_swap(address, oldval, newval); } -extern "C" CDECL void -rust_task_weaken(rust_port_id chan) { - rust_task *task = rust_get_current_task(); - task->kernel->weaken_task(chan); -} - -extern "C" CDECL void -rust_task_unweaken(rust_port_id chan) { - rust_task *task = rust_get_current_task(); - task->kernel->unweaken_task(chan); -} - -extern "C" CDECL uintptr_t* -rust_global_env_chan_ptr() { - rust_task *task = rust_get_current_task(); - return task->kernel->get_global_env_chan(); -} - extern "C" void rust_task_inhibit_kill(rust_task *task) { task->inhibit_kill(); @@ -1023,6 +1008,29 @@ rust_raw_thread_join_delete(raw_thread *thread) { delete thread; } +extern "C" void +rust_register_exit_function(spawn_fn runner, fn_env_pair *f) { + rust_task *task = rust_get_current_task(); + task->kernel->register_exit_function(runner, f); +} + +extern "C" void * +rust_get_global_data_ptr() { + rust_task *task = rust_get_current_task(); + return &task->kernel->global_data; +} + +extern "C" void +rust_inc_weak_task_count() { + rust_task *task = rust_get_current_task(); + task->kernel->inc_weak_task_count(); +} + +extern "C" void +rust_dec_weak_task_count() { + rust_task *task = rust_get_current_task(); + task->kernel->dec_weak_task_count(); +} // // Local Variables: diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp index 8871d133ea1..c365f3cca1e 100644 --- a/src/rt/rust_kernel.cpp +++ b/src/rt/rust_kernel.cpp @@ -30,21 +30,29 @@ rust_kernel::rust_kernel(rust_env *env) : rval(0), max_sched_id(1), killed(false), + already_exiting(false), sched_reaper(this), osmain_driver(NULL), non_weak_tasks(0), - global_loop_chan(0), - global_env_chan(0), - env(env) - + at_exit_runner(NULL), + at_exit_started(false), + env(env), + global_data(0) { - // Create the single threaded scheduler that will run on the platform's // main thread - rust_manual_sched_launcher_factory *launchfac = + rust_manual_sched_launcher_factory *osmain_launchfac = new rust_manual_sched_launcher_factory(); - osmain_scheduler = create_scheduler(launchfac, 1, false); - osmain_driver = launchfac->get_driver(); + osmain_scheduler = create_scheduler(osmain_launchfac, 1, false); + osmain_driver = osmain_launchfac->get_driver(); + + // Create the primary scheduler + rust_thread_sched_launcher_factory *main_launchfac = + new rust_thread_sched_launcher_factory(); + main_scheduler = create_scheduler(main_launchfac, + env->num_sched_threads, + false); + sched_reaper.start(); } @@ -103,15 +111,22 @@ rust_kernel::create_scheduler(rust_sched_launcher_factory *launchfac, { scoped_lock with(sched_lock); + /*if (sched_table.size() == 2) { + // The main and OS main schedulers may not exit while there are + // other schedulers + KLOG_("Disallowing main scheduler to exit"); + rust_scheduler *main_sched = + get_scheduler_by_id_nolock(main_scheduler); + assert(main_sched != NULL); + main_sched->disallow_exit(); + } if (sched_table.size() == 1) { - // The OS main scheduler may not exit while there are other - // schedulers KLOG_("Disallowing osmain scheduler to exit"); - rust_scheduler *sched = + rust_scheduler *osmain_sched = get_scheduler_by_id_nolock(osmain_scheduler); - assert(sched != NULL); - sched->disallow_exit(); - } + assert(osmain_sched != NULL); + osmain_sched->disallow_exit(); + }*/ id = max_sched_id++; assert(id != INTPTR_MAX && "Hit the maximum scheduler id"); @@ -175,14 +190,21 @@ rust_kernel::wait_for_schedulers() sched_table.erase(iter); sched->join_task_threads(); sched->deref(); + /*if (sched_table.size() == 2) { + KLOG_("Allowing main scheduler to exit"); + // It's only the main schedulers left. Tell them to exit + rust_scheduler *main_sched = + get_scheduler_by_id_nolock(main_scheduler); + assert(main_sched != NULL); + main_sched->allow_exit(); + } if (sched_table.size() == 1) { KLOG_("Allowing osmain scheduler to exit"); - // It's only the osmain scheduler left. Tell it to exit - rust_scheduler *sched = + rust_scheduler *osmain_sched = get_scheduler_by_id_nolock(osmain_scheduler); - assert(sched != NULL); - sched->allow_exit(); - } + assert(osmain_sched != NULL); + osmain_sched->allow_exit(); + }*/ } if (!sched_table.empty()) { sched_lock.wait(); @@ -319,59 +341,63 @@ rust_kernel::register_task() { } void +rust_kernel::allow_scheduler_exit() { + scoped_lock with(sched_lock); + + KLOG_("Allowing main scheduler to exit"); + // It's only the main schedulers left. Tell them to exit + rust_scheduler *main_sched = + get_scheduler_by_id_nolock(main_scheduler); + assert(main_sched != NULL); + main_sched->allow_exit(); + + KLOG_("Allowing osmain scheduler to exit"); + rust_scheduler *osmain_sched = + get_scheduler_by_id_nolock(osmain_scheduler); + assert(osmain_sched != NULL); + osmain_sched->allow_exit(); +} + +void rust_kernel::unregister_task() { KLOG_("Unregistering task"); uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks); KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks); if (new_non_weak_tasks == 0) { - end_weak_tasks(); + begin_shutdown(); } } void -rust_kernel::weaken_task(rust_port_id chan) { - { - scoped_lock with(weak_task_lock); - KLOG_("Weakening task with channel %" PRIdPTR, chan); - weak_task_chans.push_back(chan); - } +rust_kernel::inc_weak_task_count() { uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks); KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks); if (new_non_weak_tasks == 0) { - end_weak_tasks(); + begin_shutdown(); } } void -rust_kernel::unweaken_task(rust_port_id chan) { +rust_kernel::dec_weak_task_count() { uintptr_t new_non_weak_tasks = sync::increment(non_weak_tasks); KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks); - { - scoped_lock with(weak_task_lock); - KLOG_("Unweakening task with channel %" PRIdPTR, chan); - std::vector<rust_port_id>::iterator iter = - std::find(weak_task_chans.begin(), weak_task_chans.end(), chan); - if (iter != weak_task_chans.end()) { - weak_task_chans.erase(iter); - } - } } void -rust_kernel::end_weak_tasks() { - std::vector<rust_port_id> chancopies; +rust_kernel::begin_shutdown() { { - scoped_lock with(weak_task_lock); - chancopies = weak_task_chans; - weak_task_chans.clear(); - } - while (!chancopies.empty()) { - rust_port_id chan = chancopies.back(); - chancopies.pop_back(); - KLOG_("Notifying weak task " PRIdPTR, chan); - uintptr_t token = 0; - send_to_port(chan, &token); + scoped_lock with(sched_lock); + // FIXME #4410: This shouldn't be necessary, but because of + // unweaken_task this may end up getting called multiple times. + if (already_exiting) { + return; + } else { + already_exiting = true; + } } + + run_exit_functions(); + allow_scheduler_exit(); } bool @@ -389,6 +415,47 @@ rust_kernel::send_to_port(rust_port_id chan, void *sptr) { } } +void +rust_kernel::register_exit_function(spawn_fn runner, fn_env_pair *f) { + scoped_lock with(at_exit_lock); + + assert(!at_exit_started && "registering at_exit function after exit"); + + if (at_exit_runner) { + assert(runner == at_exit_runner + && "there can be only one at_exit_runner"); + } + + at_exit_runner = runner; + at_exit_fns.push_back(f); +} + +void +rust_kernel::run_exit_functions() { + rust_task *task; + + { + scoped_lock with(at_exit_lock); + + assert(!at_exit_started && "running exit functions twice?"); + + at_exit_started = true; + + if (at_exit_runner == NULL) { + return; + } + + rust_scheduler *sched = get_scheduler_by_id(main_sched_id()); + assert(sched); + task = sched->create_task(NULL, "at_exit"); + + final_exit_fns.count = at_exit_fns.size(); + final_exit_fns.start = at_exit_fns.data(); + } + + task->start(at_exit_runner, NULL, &final_exit_fns); +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h index 48522b57d5c..a7c6249e3db 100644 --- a/src/rt/rust_kernel.h +++ b/src/rt/rust_kernel.h @@ -49,6 +49,7 @@ #include "memory_region.h" #include "rust_log.h" #include "rust_sched_reaper.h" +#include "rust_type.h" #include "util/hash_map.h" class rust_scheduler; @@ -65,6 +66,13 @@ typedef intptr_t rust_port_id; typedef std::map<rust_sched_id, rust_scheduler*> sched_map; +// This is defined as a struct only because we need a single pointer to pass +// to the Rust function that runs the at_exit functions +struct exit_functions { + size_t count; + fn_env_pair **start; +}; + class rust_kernel { memory_region _region; rust_log _log; @@ -81,7 +89,8 @@ class rust_kernel { lock_and_signal rval_lock; int rval; - // Protects max_sched_id and sched_table, join_list, killed + // Protects max_sched_id and sched_table, join_list, killed, + // already_exiting lock_and_signal sched_lock; // The next scheduler id rust_sched_id max_sched_id; @@ -94,8 +103,13 @@ class rust_kernel { // task group fails). This propagates to all new schedulers and tasks // created after it is set. bool killed; + bool already_exiting; + rust_sched_reaper sched_reaper; + + // The primary scheduler + rust_sched_id main_scheduler; // The single-threaded scheduler that uses the main thread rust_sched_id osmain_scheduler; // Runs the single-threaded scheduler that executes tasks @@ -104,21 +118,22 @@ class rust_kernel { // An atomically updated count of the live, 'non-weak' tasks uintptr_t non_weak_tasks; - // Protects weak_task_chans - lock_and_signal weak_task_lock; - // A list of weak tasks that need to be told when to exit - std::vector<rust_port_id> weak_task_chans; rust_scheduler* get_scheduler_by_id_nolock(rust_sched_id id); - void end_weak_tasks(); + void allow_scheduler_exit(); + void begin_shutdown(); + + lock_and_signal at_exit_lock; + spawn_fn at_exit_runner; + bool at_exit_started; + std::vector<fn_env_pair*> at_exit_fns; + exit_functions final_exit_fns; - // Used to communicate with the process-side, global libuv loop - uintptr_t global_loop_chan; - // Used to serialize access to getenv/setenv - uintptr_t global_env_chan; + void run_exit_functions(); public: struct rust_env *env; + uintptr_t global_data; rust_kernel(rust_env *env); @@ -154,17 +169,17 @@ public: void set_exit_status(int code); + rust_sched_id main_sched_id() { return main_scheduler; } rust_sched_id osmain_sched_id() { return osmain_scheduler; } void register_task(); void unregister_task(); - void weaken_task(rust_port_id chan); - void unweaken_task(rust_port_id chan); + void inc_weak_task_count(); + void dec_weak_task_count(); bool send_to_port(rust_port_id chan, void *sptr); - uintptr_t* get_global_loop() { return &global_loop_chan; } - uintptr_t* get_global_env_chan() { return &global_env_chan; } + void register_exit_function(spawn_fn runner, fn_env_pair *f); }; template <typename T> struct kernel_owned { diff --git a/src/rt/rust_uv.cpp b/src/rt/rust_uv.cpp index 53d8177bcf8..2dc70088628 100644 --- a/src/rt/rust_uv.cpp +++ b/src/rt/rust_uv.cpp @@ -513,15 +513,6 @@ rust_uv_ip6_port(struct sockaddr_in6* src) { return ntohs(src->sin6_port); } -extern "C" uintptr_t* -rust_uv_get_kernel_global_chan_ptr() { - uintptr_t* result = rust_get_current_task()->kernel->get_global_loop(); - rust_task* task = rust_get_current_task(); - LOG(task, stdlib, "global loop: %lu", (unsigned long int)result); - LOG(task, stdlib,"global loop val: %lu", (unsigned long int)*result); - return result; -} - extern "C" void* rust_uv_current_kernel_malloc(size_t size) { return current_kernel_malloc(size, "rust_uv_current_kernel_malloc"); diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index cce4e411e02..eb9db6c1d57 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -61,8 +61,6 @@ rust_task_yield rust_task_is_unwinding rust_get_task rust_get_stack_segment -rust_task_weaken -rust_task_unweaken rust_log_str start_task vec_reserve_shared_actual @@ -158,7 +156,6 @@ rust_uv_get_data_for_req rust_uv_set_data_for_req rust_uv_get_base_from_buf rust_uv_get_len_from_buf -rust_uv_get_kernel_global_chan_ptr rust_uv_current_kernel_malloc rust_uv_current_kernel_free rust_uv_getaddrinfo @@ -174,7 +171,6 @@ rust_dbg_do_nothing rust_dbg_breakpoint rust_osmain_sched_id rust_compare_and_swap_ptr -rust_global_env_chan_ptr rust_port_take rust_port_drop rust_port_task @@ -210,3 +206,7 @@ linenoiseHistorySave linenoiseHistoryLoad rust_raw_thread_start rust_raw_thread_join_delete +rust_register_exit_function +rust_get_global_data_ptr +rust_inc_weak_task_count +rust_dec_weak_task_count \ No newline at end of file diff --git a/src/test/run-pass/pipe-detect-term.rs b/src/test/run-pass/pipe-detect-term.rs index c2d4be04191..10b13d8757f 100644 --- a/src/test/run-pass/pipe-detect-term.rs +++ b/src/test/run-pass/pipe-detect-term.rs @@ -27,7 +27,7 @@ proto! oneshot ( ) fn main() { - let iotask = uv::global_loop::get(); + let iotask = &uv::global_loop::get(); pipes::spawn_service(oneshot::init, |p| { match try_recv(move p) { diff --git a/src/test/run-pass/pipe-select.rs b/src/test/run-pass/pipe-select.rs index e71d0c4931d..e138f2562aa 100644 --- a/src/test/run-pass/pipe-select.rs +++ b/src/test/run-pass/pipe-select.rs @@ -35,7 +35,7 @@ fn main() { use oneshot::client::*; use stream::client::*; - let iotask = uv::global_loop::get(); + let iotask = &uv::global_loop::get(); let c = pipes::spawn_service(stream::init, |p| { error!("waiting for pipes"); diff --git a/src/test/run-pass/pipe-sleep.rs b/src/test/run-pass/pipe-sleep.rs index 4a6e7b4ce36..ae7e4e7fb0c 100644 --- a/src/test/run-pass/pipe-sleep.rs +++ b/src/test/run-pass/pipe-sleep.rs @@ -27,7 +27,7 @@ fn main() { let c = pipes::spawn_service(oneshot::init, |p| { recv(move p); }); - let iotask = uv::global_loop::get(); + let iotask = &uv::global_loop::get(); sleep(iotask, 500); signal(move c); |
