From 090b247056a9dd2d4d4a32c631fe2f0ddd3e744d Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Tue, 8 Jan 2013 19:46:12 -0800 Subject: Spawn new tasks onto the primary scheduler by default. #3760 --- src/rt/rust_kernel.cpp | 91 ++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 74 insertions(+), 17 deletions(-) (limited to 'src/rt/rust_kernel.cpp') diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp index 8871d133ea1..cc98b474ee3 100644 --- a/src/rt/rust_kernel.cpp +++ b/src/rt/rust_kernel.cpp @@ -30,6 +30,7 @@ rust_kernel::rust_kernel(rust_env *env) : rval(0), max_sched_id(1), killed(false), + already_exiting(false), sched_reaper(this), osmain_driver(NULL), non_weak_tasks(0), @@ -38,13 +39,20 @@ rust_kernel::rust_kernel(rust_env *env) : env(env) { - // Create the single threaded scheduler that will run on the platform's // main thread - rust_manual_sched_launcher_factory *launchfac = + rust_manual_sched_launcher_factory *osmain_launchfac = new rust_manual_sched_launcher_factory(); - osmain_scheduler = create_scheduler(launchfac, 1, false); - osmain_driver = launchfac->get_driver(); + osmain_scheduler = create_scheduler(osmain_launchfac, 1, false); + osmain_driver = osmain_launchfac->get_driver(); + + // Create the primary scheduler + rust_thread_sched_launcher_factory *main_launchfac = + new rust_thread_sched_launcher_factory(); + main_scheduler = create_scheduler(main_launchfac, + env->num_sched_threads, + false); + sched_reaper.start(); } @@ -103,15 +111,22 @@ rust_kernel::create_scheduler(rust_sched_launcher_factory *launchfac, { scoped_lock with(sched_lock); + /*if (sched_table.size() == 2) { + // The main and OS main schedulers may not exit while there are + // other schedulers + KLOG_("Disallowing main scheduler to exit"); + rust_scheduler *main_sched = + get_scheduler_by_id_nolock(main_scheduler); + assert(main_sched != NULL); + main_sched->disallow_exit(); + } if (sched_table.size() == 1) { - // The OS main scheduler may not exit while there are other - // schedulers KLOG_("Disallowing osmain scheduler to exit"); - rust_scheduler *sched = + rust_scheduler *osmain_sched = get_scheduler_by_id_nolock(osmain_scheduler); - assert(sched != NULL); - sched->disallow_exit(); - } + assert(osmain_sched != NULL); + osmain_sched->disallow_exit(); + }*/ id = max_sched_id++; assert(id != INTPTR_MAX && "Hit the maximum scheduler id"); @@ -175,14 +190,21 @@ rust_kernel::wait_for_schedulers() sched_table.erase(iter); sched->join_task_threads(); sched->deref(); + /*if (sched_table.size() == 2) { + KLOG_("Allowing main scheduler to exit"); + // It's only the main schedulers left. Tell them to exit + rust_scheduler *main_sched = + get_scheduler_by_id_nolock(main_scheduler); + assert(main_sched != NULL); + main_sched->allow_exit(); + } if (sched_table.size() == 1) { KLOG_("Allowing osmain scheduler to exit"); - // It's only the osmain scheduler left. Tell it to exit - rust_scheduler *sched = + rust_scheduler *osmain_sched = get_scheduler_by_id_nolock(osmain_scheduler); - assert(sched != NULL); - sched->allow_exit(); - } + assert(osmain_sched != NULL); + osmain_sched->allow_exit(); + }*/ } if (!sched_table.empty()) { sched_lock.wait(); @@ -318,13 +340,31 @@ rust_kernel::register_task() { KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks); } +void +rust_kernel::allow_scheduler_exit() { + scoped_lock with(sched_lock); + + KLOG_("Allowing main scheduler to exit"); + // It's only the main schedulers left. Tell them to exit + rust_scheduler *main_sched = + get_scheduler_by_id_nolock(main_scheduler); + assert(main_sched != NULL); + main_sched->allow_exit(); + + KLOG_("Allowing osmain scheduler to exit"); + rust_scheduler *osmain_sched = + get_scheduler_by_id_nolock(osmain_scheduler); + assert(osmain_sched != NULL); + osmain_sched->allow_exit(); +} + void rust_kernel::unregister_task() { KLOG_("Unregistering task"); uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks); KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks); if (new_non_weak_tasks == 0) { - end_weak_tasks(); + begin_shutdown(); } } @@ -338,7 +378,7 @@ rust_kernel::weaken_task(rust_port_id chan) { uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks); KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks); if (new_non_weak_tasks == 0) { - end_weak_tasks(); + begin_shutdown(); } } @@ -374,6 +414,23 @@ rust_kernel::end_weak_tasks() { } } +void +rust_kernel::begin_shutdown() { + { + scoped_lock with(sched_lock); + // FIXME #4410: This shouldn't be necessary, but because of + // unweaken_task this may end up getting called multiple times. + if (already_exiting) { + return; + } else { + already_exiting = true; + } + } + + allow_scheduler_exit(); + end_weak_tasks(); +} + bool rust_kernel::send_to_port(rust_port_id chan, void *sptr) { KLOG_("rust_port_id*_send port: 0x%" PRIxPTR, (uintptr_t) chan); -- cgit 1.4.1-3-g733a5 From ac435af73a0009daf22164ee2f081a7c98ca844c Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Fri, 11 Jan 2013 15:55:14 -0800 Subject: Add at_exit function #4450 --- src/libcore/private.rs | 3 ++ src/libcore/private/at_exit.rs | 86 ++++++++++++++++++++++++++++++++++++++++++ src/rt/rust_builtin.cpp | 5 +++ src/rt/rust_kernel.cpp | 44 +++++++++++++++++++++ src/rt/rust_kernel.h | 18 +++++++++ src/rt/rustrt.def.in | 1 + 6 files changed, 157 insertions(+) create mode 100644 src/libcore/private/at_exit.rs (limited to 'src/rt/rust_kernel.cpp') diff --git a/src/libcore/private.rs b/src/libcore/private.rs index d3002ba9316..48489dab488 100644 --- a/src/libcore/private.rs +++ b/src/libcore/private.rs @@ -28,6 +28,9 @@ use task; use task::{TaskBuilder, atomically}; use uint; +#[path = "private/at_exit.rs"] +pub mod at_exit; + extern mod rustrt { #[legacy_exports]; unsafe fn rust_task_weaken(ch: rust_port_id); diff --git a/src/libcore/private/at_exit.rs b/src/libcore/private/at_exit.rs new file mode 100644 index 00000000000..7ac252ea102 --- /dev/null +++ b/src/libcore/private/at_exit.rs @@ -0,0 +1,86 @@ +use sys; +use cast; +use ptr; +use task; +use uint; +use vec; +use rand; +use libc::{c_void, size_t}; + +/** +Register a function to be run during runtime shutdown. + +After all non-weak tasks have exited, registered exit functions will +execute, in random order, on the primary scheduler. Each function runs +in its own unsupervised task. +*/ +pub fn at_exit(f: ~fn()) unsafe { + let runner: &fn(*ExitFunctions) = exit_runner; + let runner_pair: sys::Closure = cast::transmute(runner); + let runner_ptr = runner_pair.code; + let runner_ptr = cast::transmute(runner_ptr); + rustrt::rust_register_exit_function(runner_ptr, ~f); +} + +// NB: The double pointer indirection here is because ~fn() is a fat +// pointer and due to FFI problems I am more comfortable making the +// interface use a normal pointer +extern mod rustrt { + fn rust_register_exit_function(runner: *c_void, f: ~~fn()); +} + +struct ExitFunctions { + // The number of exit functions + count: size_t, + // The buffer of exit functions + start: *~~fn() +} + +fn exit_runner(exit_fns: *ExitFunctions) unsafe { + let exit_fns = &*exit_fns; + let count = (*exit_fns).count; + let start = (*exit_fns).start; + + // NB: from_buf memcpys from the source, which will + // give us ownership of the array of functions + let mut exit_fns_vec = vec::from_buf(start, count as uint); + // Let's not make any promises about execution order + rand::Rng().shuffle_mut(exit_fns_vec); + + debug!("running %u exit functions", exit_fns_vec.len()); + + while exit_fns_vec.is_not_empty() { + match exit_fns_vec.pop() { + ~f => { + task::task().supervised().spawn(f); + } + } + } +} + +#[abi = "rust-intrinsic"] +pub extern mod rusti { + fn move_val_init(dst: &mut T, -src: T); + fn init() -> T; +} + +#[test] +fn test_at_exit() { + let i = 10; + do at_exit { + debug!("at_exit1"); + assert i == 10; + } +} + +#[test] +fn test_at_exit_many() { + let i = 10; + for uint::range(20, 100) |j| { + do at_exit { + debug!("at_exit2"); + assert i == 10; + assert j > i; + } + } +} \ No newline at end of file diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index cbc58e85db6..a37aea13e40 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -1026,6 +1026,11 @@ rust_raw_thread_join_delete(raw_thread *thread) { delete thread; } +extern "C" void +rust_register_exit_function(spawn_fn runner, fn_env_pair *f) { + rust_task *task = rust_get_current_task(); + task->kernel->register_exit_function(runner, f); +} // // Local Variables: diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp index cc98b474ee3..3042b006a92 100644 --- a/src/rt/rust_kernel.cpp +++ b/src/rt/rust_kernel.cpp @@ -36,6 +36,8 @@ rust_kernel::rust_kernel(rust_env *env) : non_weak_tasks(0), global_loop_chan(0), global_env_chan(0), + at_exit_runner(NULL), + at_exit_started(false), env(env) { @@ -427,6 +429,7 @@ rust_kernel::begin_shutdown() { } } + run_exit_functions(); allow_scheduler_exit(); end_weak_tasks(); } @@ -446,6 +449,47 @@ rust_kernel::send_to_port(rust_port_id chan, void *sptr) { } } +void +rust_kernel::register_exit_function(spawn_fn runner, fn_env_pair *f) { + scoped_lock with(at_exit_lock); + + assert(!at_exit_started && "registering at_exit function after exit"); + + if (at_exit_runner) { + assert(runner == at_exit_runner + && "there can be only one at_exit_runner"); + } + + at_exit_runner = runner; + at_exit_fns.push_back(f); +} + +void +rust_kernel::run_exit_functions() { + rust_task *task; + + { + scoped_lock with(at_exit_lock); + + assert(!at_exit_started && "running exit functions twice?"); + + at_exit_started = true; + + if (at_exit_runner == NULL) { + return; + } + + rust_scheduler *sched = get_scheduler_by_id(main_sched_id()); + assert(sched); + task = sched->create_task(NULL, "at_exit"); + + final_exit_fns.count = at_exit_fns.size(); + final_exit_fns.start = at_exit_fns.data(); + } + + task->start(at_exit_runner, NULL, &final_exit_fns); +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h index 13fd8934172..b1548e92cdb 100644 --- a/src/rt/rust_kernel.h +++ b/src/rt/rust_kernel.h @@ -50,6 +50,7 @@ #include "memory_region.h" #include "rust_log.h" #include "rust_sched_reaper.h" +#include "rust_type.h" #include "util/hash_map.h" class rust_scheduler; @@ -66,6 +67,13 @@ typedef intptr_t rust_port_id; typedef std::map sched_map; +// This is defined as a struct only because we need a single pointer to pass +// to the Rust function that runs the at_exit functions +struct exit_functions { + size_t count; + fn_env_pair **start; +}; + class rust_kernel { memory_region _region; rust_log _log; @@ -126,6 +134,14 @@ class rust_kernel { // Used to serialize access to getenv/setenv uintptr_t global_env_chan; + lock_and_signal at_exit_lock; + spawn_fn at_exit_runner; + bool at_exit_started; + std::vector at_exit_fns; + exit_functions final_exit_fns; + + void run_exit_functions(); + public: struct rust_env *env; @@ -175,6 +191,8 @@ public: uintptr_t* get_global_loop() { return &global_loop_chan; } uintptr_t* get_global_env_chan() { return &global_env_chan; } + + void register_exit_function(spawn_fn runner, fn_env_pair *f); }; template struct kernel_owned { diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index cce4e411e02..719505079e6 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -210,3 +210,4 @@ linenoiseHistorySave linenoiseHistoryLoad rust_raw_thread_start rust_raw_thread_join_delete +rust_register_exit_function -- cgit 1.4.1-3-g733a5 From db1abbec4ca9f18a224441c483cf23bb4f8361fd Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Sat, 12 Jan 2013 23:27:46 -0800 Subject: core: Add private global data interface. #3915 --- src/libcore/private.rs | 8 ++ src/libcore/private/global.rs | 257 ++++++++++++++++++++++++++++++++++++++++++ src/rt/rust_builtin.cpp | 6 + src/rt/rust_kernel.cpp | 4 +- src/rt/rust_kernel.h | 1 + src/rt/rustrt.def.in | 1 + 6 files changed, 275 insertions(+), 2 deletions(-) create mode 100644 src/libcore/private/global.rs (limited to 'src/rt/rust_kernel.cpp') diff --git a/src/libcore/private.rs b/src/libcore/private.rs index 48489dab488..ef25cb52e8b 100644 --- a/src/libcore/private.rs +++ b/src/libcore/private.rs @@ -30,6 +30,8 @@ use uint; #[path = "private/at_exit.rs"] pub mod at_exit; +#[path = "private/global.rs"] +pub mod global; extern mod rustrt { #[legacy_exports]; @@ -522,6 +524,12 @@ pub unsafe fn clone_shared_mutable_state(rc: &SharedMutableState) ArcDestruct((*rc).data) } +impl SharedMutableState: Clone { + fn clone(&self) -> SharedMutableState unsafe { + clone_shared_mutable_state(self) + } +} + /****************************************************************************/ #[allow(non_camel_case_types)] // runtime type diff --git a/src/libcore/private/global.rs b/src/libcore/private/global.rs new file mode 100644 index 00000000000..8c1500353ae --- /dev/null +++ b/src/libcore/private/global.rs @@ -0,0 +1,257 @@ +/*! +Global data + +An interface for creating and retrieving values with global +(per-runtime) scope. + +Global values are stored in a map and protected by a single global +mutex. Operations are provided for accessing and cloning the value +under the mutex. + +Because all globals go through a single mutex, they should be used +sparingly. The interface is intended to be used with clonable, +atomically reference counted synchronization types, like ARCs, in +which case the value should be cached locally whenever possible to +avoid hitting the mutex. +*/ + +use cast::{transmute, reinterpret_cast}; +use clone::Clone; +use kinds::Owned; +use libc::{c_void, uintptr_t}; +use option::{Option, Some, None}; +use ops::Drop; +use pipes; +use private::{Exclusive, exclusive}; +use private::{SharedMutableState, shared_mutable_state}; +use private::{get_shared_immutable_state}; +use private::at_exit::at_exit; +use send_map::linear::LinearMap; +use sys::Closure; +use task::spawn; +use uint; + +pub type GlobalDataKey = &fn(v: T); + +pub unsafe fn global_data_clone_create( + key: GlobalDataKey, create: &fn() -> ~T) -> T { + /*! + * Clone a global value or, if it has not been created, + * first construct the value then return a clone. + * + * # Safety note + * + * Both the clone operation and the constructor are + * called while the global lock is held. Recursive + * use of the global interface in either of these + * operations will result in deadlock. + */ + global_data_clone_create_(key_ptr(key), create) +} + +unsafe fn global_data_clone_create_( + key: uint, create: &fn() -> ~T) -> T { + + let mut clone_value: Option = None; + do global_data_modify_(key) |value: Option<~T>| { + match value { + None => { + let value = create(); + clone_value = Some(value.clone()); + Some(value) + } + Some(value) => { + clone_value = Some(value.clone()); + Some(value) + } + } + } + return clone_value.unwrap(); +} + +unsafe fn global_data_modify( + key: GlobalDataKey, op: &fn(Option<~T>) -> Option<~T>) { + + global_data_modify_(key_ptr(key), op) +} + +unsafe fn global_data_modify_( + key: uint, op: &fn(Option<~T>) -> Option<~T>) { + + let mut old_dtor = None; + do get_global_state().with |gs| unsafe { + let (maybe_new_value, maybe_dtor) = match gs.map.pop(&key) { + Some((ptr, dtor)) => { + let value: ~T = transmute(ptr); + (op(Some(value)), Some(dtor)) + } + None => { + (op(None), None) + } + }; + match maybe_new_value { + Some(value) => { + let data: *c_void = transmute(value); + let dtor: ~fn() = match maybe_dtor { + Some(dtor) => dtor, + None => { + let dtor: ~fn() = || unsafe { + let _destroy_value: ~T = transmute(data); + }; + dtor + } + }; + let value = (data, dtor); + gs.map.insert(key, value); + } + None => { + match maybe_dtor { + Some(dtor) => old_dtor = Some(dtor), + None => () + } + } + } + } +} + +// GlobalState is a map from keys to unique pointers and a +// destructor. Keys are pointers derived from the type of the +// global value. There is a single GlobalState instance per runtime. +struct GlobalState { + map: LinearMap +} + +impl GlobalState: Drop { + fn finalize(&self) { + for self.map.each_value |v| { + match v { + &(_, ref dtor) => (*dtor)() + } + } + } +} + +fn get_global_state() -> Exclusive unsafe { + + const POISON: int = -1; + + // XXX: Doing atomic_cxchg to initialize the global state + // lazily, which wouldn't be necessary with a runtime written + // in Rust + let global_ptr = rust_get_global_data_ptr(); + + if *global_ptr == 0 { + // Global state doesn't exist yet, probably + + // The global state object + let state = GlobalState { + map: LinearMap() + }; + + // It's under a reference-counted mutex + let state = ~exclusive(state); + + // Convert it to an integer + let state_ptr: &Exclusive = state; + let state_i: int = transmute(state_ptr); + + // Swap our structure into the global pointer + let prev_i = atomic_cxchg(&mut *global_ptr, 0, state_i); + + // Sanity check that we're not trying to reinitialize after shutdown + assert prev_i != POISON; + + if prev_i == 0 { + // Successfully installed the global pointer + + // Take a handle to return + let clone = state.clone(); + + // Install a runtime exit function to destroy the global object + do at_exit || unsafe { + // Poison the global pointer + let prev_i = atomic_cxchg(&mut *global_ptr, state_i, POISON); + assert prev_i == state_i; + + // Capture the global state object in the at_exit closure + // so that it is destroyed at the right time + let _capture_global_state = &state; + }; + return clone; + } else { + // Somebody else initialized the globals first + let state: &Exclusive = transmute(prev_i); + return state.clone(); + } + } else { + let state: &Exclusive = transmute(*global_ptr); + return state.clone(); + } +} + +fn key_ptr(key: GlobalDataKey) -> uint unsafe { + let closure: Closure = reinterpret_cast(&key); + return transmute(closure.code); +} + +extern { + fn rust_get_global_data_ptr() -> *mut int; +} + +#[abi = "rust-intrinsic"] +extern { + fn atomic_cxchg(dst: &mut int, old: int, src: int) -> int; +} + +#[test] +fn test_clone_rc() unsafe { + type MyType = SharedMutableState; + + fn key(_v: SharedMutableState) { } + + for uint::range(0, 100) |_| { + do spawn unsafe { + let val = do global_data_clone_create(key) { + ~shared_mutable_state(10) + }; + + assert get_shared_immutable_state(&val) == &10; + } + } +} + +#[test] +fn test_modify() unsafe { + type MyType = SharedMutableState; + + fn key(_v: SharedMutableState) { } + + do global_data_modify(key) |v| unsafe { + match v { + None => { + Some(~shared_mutable_state(10)) + } + _ => fail + } + } + + do global_data_modify(key) |v| { + match v { + Some(sms) => { + let v = get_shared_immutable_state(sms); + assert *v == 10; + None + }, + _ => fail + } + } + + do global_data_modify(key) |v| unsafe { + match v { + None => { + Some(~shared_mutable_state(10)) + } + _ => fail + } + } +} diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index a37aea13e40..221afb89b23 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -1032,6 +1032,12 @@ rust_register_exit_function(spawn_fn runner, fn_env_pair *f) { task->kernel->register_exit_function(runner, f); } +extern "C" void * +rust_get_global_data_ptr() { + rust_task *task = rust_get_current_task(); + return &task->kernel->global_data; +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp index 3042b006a92..9c6ba9dcda3 100644 --- a/src/rt/rust_kernel.cpp +++ b/src/rt/rust_kernel.cpp @@ -38,8 +38,8 @@ rust_kernel::rust_kernel(rust_env *env) : global_env_chan(0), at_exit_runner(NULL), at_exit_started(false), - env(env) - + env(env), + global_data(0) { // Create the single threaded scheduler that will run on the platform's // main thread diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h index b1548e92cdb..99b230f7872 100644 --- a/src/rt/rust_kernel.h +++ b/src/rt/rust_kernel.h @@ -144,6 +144,7 @@ class rust_kernel { public: struct rust_env *env; + uintptr_t global_data; rust_kernel(rust_env *env); diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index 719505079e6..8c26832f349 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -211,3 +211,4 @@ linenoiseHistoryLoad rust_raw_thread_start rust_raw_thread_join_delete rust_register_exit_function +rust_get_global_data_ptr \ No newline at end of file -- cgit 1.4.1-3-g733a5 From 8852279a9ecac970e30b6d92d7efdcbd5485769c Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Tue, 15 Jan 2013 19:53:35 -0800 Subject: core: Add new weak task API --- src/libcore/pipes.rs | 10 +++ src/libcore/private.rs | 2 + src/libcore/private/weak_task.rs | 187 +++++++++++++++++++++++++++++++++++++++ src/rt/rust_builtin.cpp | 12 +++ src/rt/rust_kernel.cpp | 24 +++-- src/rt/rust_kernel.h | 2 + src/rt/rustrt.def.in | 4 +- 7 files changed, 233 insertions(+), 8 deletions(-) create mode 100644 src/libcore/private/weak_task.rs (limited to 'src/rt/rust_kernel.cpp') diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs index 2ff4effbd6e..2865c942138 100644 --- a/src/libcore/pipes.rs +++ b/src/libcore/pipes.rs @@ -1234,6 +1234,16 @@ pub fn oneshot() -> (PortOne, ChanOne) { (port, chan) } +impl PortOne { + fn recv(self) -> T { recv_one(self) } + fn try_recv(self) -> Option { try_recv_one(self) } +} + +impl ChanOne { + fn send(self, data: T) { send_one(self, data) } + fn try_send(self, data: T) -> bool { try_send_one(self, data) } +} + /** * Receive a message from a oneshot pipe, failing if the connection was * closed. diff --git a/src/libcore/private.rs b/src/libcore/private.rs index 3eadce1c30c..aa976ee745d 100644 --- a/src/libcore/private.rs +++ b/src/libcore/private.rs @@ -34,6 +34,8 @@ pub mod at_exit; pub mod global; #[path = "private/finally.rs"] pub mod finally; +#[path = "private/weak_task.rs"] +pub mod weak_task; extern mod rustrt { #[legacy_exports]; diff --git a/src/libcore/private/weak_task.rs b/src/libcore/private/weak_task.rs new file mode 100644 index 00000000000..868361b0e60 --- /dev/null +++ b/src/libcore/private/weak_task.rs @@ -0,0 +1,187 @@ +/*! +Weak tasks + +Weak tasks are a runtime feature for building global services that +do not keep the runtime alive. Normally the runtime exits when all +tasks exits, but if a task is weak then the runtime may exit while +it is running, sending a notification to the task that the runtime +is trying to shut down. +*/ + +use option::{Some, None, swap_unwrap}; +use private::at_exit::at_exit; +use private::global::global_data_clone_create; +use private::finally::Finally; +use pipes::{Port, Chan, SharedChan, stream}; +use task::{Task, task, spawn}; +use task::rt::{task_id, get_task_id}; +use send_map::linear::LinearMap; +use ops::Drop; + +type ShutdownMsg = (); + +// XXX: This could be a PortOne but I've experienced bugginess +// with oneshot pipes and try_send +pub unsafe fn weaken_task(f: &fn(Port)) { + let service = global_data_clone_create(global_data_key, + create_global_service); + let (shutdown_port, shutdown_chan) = stream::(); + let shutdown_port = ~mut Some(shutdown_port); + let task = get_task_id(); + // Expect the weak task service to be alive + assert service.try_send(RegisterWeakTask(task, shutdown_chan)); + unsafe { rust_inc_weak_task_count(); } + do fn&() { + let shutdown_port = swap_unwrap(&mut *shutdown_port); + f(shutdown_port) + }.finally || { + unsafe { rust_dec_weak_task_count(); } + // Service my have already exited + service.send(UnregisterWeakTask(task)); + } +} + +type WeakTaskService = SharedChan; +type TaskHandle = task_id; + +fn global_data_key(_v: WeakTaskService) { } + +enum ServiceMsg { + RegisterWeakTask(TaskHandle, Chan), + UnregisterWeakTask(TaskHandle), + Shutdown +} + +fn create_global_service() -> ~WeakTaskService { + + debug!("creating global weak task service"); + let (port, chan) = stream::(); + let port = ~mut Some(port); + let chan = SharedChan(chan); + let chan_clone = chan.clone(); + + do task().unlinked().spawn { + debug!("running global weak task service"); + let port = swap_unwrap(&mut *port); + let port = ~mut Some(port); + do fn&() { + let port = swap_unwrap(&mut *port); + // The weak task service is itself a weak task + debug!("weakening the weak service task"); + unsafe { rust_inc_weak_task_count(); } + run_weak_task_service(port); + }.finally { + debug!("unweakening the weak service task"); + unsafe { rust_dec_weak_task_count(); } + } + } + + do at_exit { + debug!("shutting down weak task service"); + chan.send(Shutdown); + } + + return ~chan_clone; +} + +fn run_weak_task_service(port: Port) { + + let mut shutdown_map = LinearMap(); + + loop { + match port.recv() { + RegisterWeakTask(task, shutdown_chan) => { + let previously_unregistered = + shutdown_map.insert(task, shutdown_chan); + assert previously_unregistered; + } + UnregisterWeakTask(task) => { + match shutdown_map.pop(&task) { + Some(shutdown_chan) => { + // Oneshot pipes must send, even though + // nobody will receive this + shutdown_chan.send(()); + } + None => fail + } + } + Shutdown => break + } + } + + do shutdown_map.consume |_, shutdown_chan| { + // Weak task may have already exited + shutdown_chan.send(()); + } +} + +extern { + unsafe fn rust_inc_weak_task_count(); + unsafe fn rust_dec_weak_task_count(); +} + +#[test] +fn test_simple() unsafe { + let (port, chan) = stream(); + do spawn unsafe { + do weaken_task |_signal| { + } + chan.send(()); + } + port.recv(); +} + +#[test] +fn test_weak_weak() unsafe { + let (port, chan) = stream(); + do spawn unsafe { + do weaken_task |_signal| { + } + do weaken_task |_signal| { + } + chan.send(()); + } + port.recv(); +} + +#[test] +fn test_wait_for_signal() unsafe { + do spawn unsafe { + do weaken_task |signal| { + signal.recv(); + } + } +} + +#[test] +fn test_wait_for_signal_many() unsafe { + use uint; + for uint::range(0, 100) |_| { + do spawn unsafe { + do weaken_task |signal| { + signal.recv(); + } + } + } +} + +#[test] +fn test_select_stream_and_oneshot() unsafe { + use pipes::select2i; + use either::{Left, Right}; + + let (port, chan) = stream(); + let (waitport, waitchan) = stream(); + do spawn unsafe { + do weaken_task |signal| { + match select2i(&port, &signal) { + Left(*) => (), + Right(*) => fail + } + } + waitchan.send(()); + } + chan.send(()); + waitport.recv(); +} + diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index 221afb89b23..a5e1260d4a5 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -1038,6 +1038,18 @@ rust_get_global_data_ptr() { return &task->kernel->global_data; } +extern "C" void +rust_inc_weak_task_count() { + rust_task *task = rust_get_current_task(); + task->kernel->inc_weak_task_count(); +} + +extern "C" void +rust_dec_weak_task_count() { + rust_task *task = rust_get_current_task(); + task->kernel->dec_weak_task_count(); +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp index 9c6ba9dcda3..d270ac07633 100644 --- a/src/rt/rust_kernel.cpp +++ b/src/rt/rust_kernel.cpp @@ -377,17 +377,12 @@ rust_kernel::weaken_task(rust_port_id chan) { KLOG_("Weakening task with channel %" PRIdPTR, chan); weak_task_chans.push_back(chan); } - uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks); - KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks); - if (new_non_weak_tasks == 0) { - begin_shutdown(); - } + inc_weak_task_count(); } void rust_kernel::unweaken_task(rust_port_id chan) { - uintptr_t new_non_weak_tasks = sync::increment(non_weak_tasks); - KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks); + dec_weak_task_count(); { scoped_lock with(weak_task_lock); KLOG_("Unweakening task with channel %" PRIdPTR, chan); @@ -399,6 +394,21 @@ rust_kernel::unweaken_task(rust_port_id chan) { } } +void +rust_kernel::inc_weak_task_count() { + uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks); + KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks); + if (new_non_weak_tasks == 0) { + begin_shutdown(); + } +} + +void +rust_kernel::dec_weak_task_count() { + uintptr_t new_non_weak_tasks = sync::increment(non_weak_tasks); + KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks); +} + void rust_kernel::end_weak_tasks() { std::vector chancopies; diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h index 99b230f7872..f90ecf01a7b 100644 --- a/src/rt/rust_kernel.h +++ b/src/rt/rust_kernel.h @@ -187,6 +187,8 @@ public: void unregister_task(); void weaken_task(rust_port_id chan); void unweaken_task(rust_port_id chan); + void inc_weak_task_count(); + void dec_weak_task_count(); bool send_to_port(rust_port_id chan, void *sptr); diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index 8c26832f349..5be823d8fde 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -211,4 +211,6 @@ linenoiseHistoryLoad rust_raw_thread_start rust_raw_thread_join_delete rust_register_exit_function -rust_get_global_data_ptr \ No newline at end of file +rust_get_global_data_ptr +rust_inc_weak_task_count +rust_dec_weak_task_count \ No newline at end of file -- cgit 1.4.1-3-g733a5 From fb9299346af9b951890db80e47eb65625997f160 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Sat, 19 Jan 2013 22:54:29 -0800 Subject: core: Convert getenv/setenv to use a mutex This much simpler implementation uses a global mutex and eliminates the kernel environment channel. --- src/libcore/os.rs | 204 ++++++++++++++++-------------------------------- src/rt/rust_builtin.cpp | 6 -- src/rt/rust_kernel.cpp | 1 - src/rt/rust_kernel.h | 3 - src/rt/rustrt.def.in | 1 - 5 files changed, 68 insertions(+), 147 deletions(-) (limited to 'src/rt/rust_kernel.cpp') diff --git a/src/libcore/os.rs b/src/libcore/os.rs index ff3253a8223..2de7ecf7dff 100644 --- a/src/libcore/os.rs +++ b/src/libcore/os.rs @@ -139,169 +139,101 @@ pub mod win32 { } } -pub fn getenv(n: &str) -> Option<~str> { - global_env::getenv(n) -} +/* +Accessing environment variables is not generally threadsafe. +This uses a per-runtime lock to serialize access. +XXX: It would probably be appropriate to make this a real global +*/ +fn with_env_lock(f: &fn() -> T) -> T { + use private::global::global_data_clone_create; + use private::{Exclusive, exclusive}; + + struct SharedValue(()); + type ValueMutex = Exclusive; + fn key(_: ValueMutex) { } -pub fn setenv(n: &str, v: &str) { - global_env::setenv(n, v) -} + unsafe { + let lock: ValueMutex = global_data_clone_create(key, || { + ~exclusive(SharedValue(())) + }); -pub fn env() -> ~[(~str,~str)] { - global_env::env() + lock.with_imm(|_| f() ) + } } -mod global_env { - //! Internal module for serializing access to getenv/setenv - use either; - use libc; - use oldcomm; - use option::Option; - use private; - use str; - use task; - +pub fn env() -> ~[(~str,~str)] { extern mod rustrt { - unsafe fn rust_global_env_chan_ptr() -> *libc::uintptr_t; - } - - enum Msg { - MsgGetEnv(~str, oldcomm::Chan>), - MsgSetEnv(~str, ~str, oldcomm::Chan<()>), - MsgEnv(oldcomm::Chan<~[(~str,~str)]>) - } - - pub fn getenv(n: &str) -> Option<~str> { - let env_ch = get_global_env_chan(); - let po = oldcomm::Port(); - oldcomm::send(env_ch, MsgGetEnv(str::from_slice(n), - oldcomm::Chan(&po))); - oldcomm::recv(po) - } - - pub fn setenv(n: &str, v: &str) { - let env_ch = get_global_env_chan(); - let po = oldcomm::Port(); - oldcomm::send(env_ch, MsgSetEnv(str::from_slice(n), - str::from_slice(v), - oldcomm::Chan(&po))); - oldcomm::recv(po) + unsafe fn rust_env_pairs() -> ~[~str]; } - pub fn env() -> ~[(~str,~str)] { - let env_ch = get_global_env_chan(); - let po = oldcomm::Port(); - oldcomm::send(env_ch, MsgEnv(oldcomm::Chan(&po))); - oldcomm::recv(po) - } - - fn get_global_env_chan() -> oldcomm::Chan { - unsafe { - let global_ptr = rustrt::rust_global_env_chan_ptr(); - private::chan_from_global_ptr(global_ptr, || { - // FIXME (#2621): This would be a good place to use a very - // small foreign stack - task::task().sched_mode(task::SingleThreaded).unlinked() - }, global_env_task) - } - } - - fn global_env_task(msg_po: oldcomm::Port) { - unsafe { - do private::weaken_task |weak_po| { - loop { - match oldcomm::select2(msg_po, weak_po) { - either::Left(MsgGetEnv(ref n, resp_ch)) => { - oldcomm::send(resp_ch, impl_::getenv(*n)) - } - either::Left(MsgSetEnv(ref n, ref v, resp_ch)) => { - oldcomm::send(resp_ch, impl_::setenv(*n, *v)) - } - either::Left(MsgEnv(resp_ch)) => { - oldcomm::send(resp_ch, impl_::env()) - } - either::Right(_) => break - } - } + unsafe { + do with_env_lock { + let mut pairs = ~[]; + for vec::each(rustrt::rust_env_pairs()) |p| { + let vs = str::splitn_char(*p, '=', 1u); + assert vec::len(vs) == 2u; + pairs.push((copy vs[0], copy vs[1])); } + move pairs } } +} - mod impl_ { - use cast; - use libc; - use option::Option; - use option; - use ptr; - use str; - use vec; - - extern mod rustrt { - unsafe fn rust_env_pairs() -> ~[~str]; - } - - pub fn env() -> ~[(~str,~str)] { - unsafe { - let mut pairs = ~[]; - for vec::each(rustrt::rust_env_pairs()) |p| { - let vs = str::splitn_char(*p, '=', 1u); - assert vec::len(vs) == 2u; - pairs.push((copy vs[0], copy vs[1])); - } - move pairs - } - } - - #[cfg(unix)] - pub fn getenv(n: &str) -> Option<~str> { - unsafe { - let s = str::as_c_str(n, |s| libc::getenv(s)); - return if ptr::null::() == cast::reinterpret_cast(&s) { - option::None::<~str> - } else { - let s = cast::reinterpret_cast(&s); - option::Some::<~str>(str::raw::from_buf(s)) - }; +#[cfg(unix)] +pub fn getenv(n: &str) -> Option<~str> { + unsafe { + do with_env_lock { + let s = str::as_c_str(n, |s| libc::getenv(s)); + if ptr::null::() == cast::reinterpret_cast(&s) { + option::None::<~str> + } else { + let s = cast::reinterpret_cast(&s); + option::Some::<~str>(str::raw::from_buf(s)) } } + } +} - #[cfg(windows)] - pub fn getenv(n: &str) -> Option<~str> { - unsafe { - use os::win32::{as_utf16_p, fill_utf16_buf_and_decode}; - do as_utf16_p(n) |u| { - do fill_utf16_buf_and_decode() |buf, sz| { - libc::GetEnvironmentVariableW(u, buf, sz) - } +#[cfg(windows)] +pub fn getenv(n: &str) -> Option<~str> { + unsafe { + do with_env_lock { + use os::win32::{as_utf16_p, fill_utf16_buf_and_decode}; + do as_utf16_p(n) |u| { + do fill_utf16_buf_and_decode() |buf, sz| { + libc::GetEnvironmentVariableW(u, buf, sz) } } } + } +} - #[cfg(unix)] - pub fn setenv(n: &str, v: &str) { - unsafe { - do str::as_c_str(n) |nbuf| { - do str::as_c_str(v) |vbuf| { - libc::funcs::posix01::unistd::setenv(nbuf, vbuf, 1); - } +#[cfg(unix)] +pub fn setenv(n: &str, v: &str) { + unsafe { + do with_env_lock { + do str::as_c_str(n) |nbuf| { + do str::as_c_str(v) |vbuf| { + libc::funcs::posix01::unistd::setenv(nbuf, vbuf, 1); } } } + } +} - #[cfg(windows)] - pub fn setenv(n: &str, v: &str) { - unsafe { - use os::win32::as_utf16_p; - do as_utf16_p(n) |nbuf| { - do as_utf16_p(v) |vbuf| { - libc::SetEnvironmentVariableW(nbuf, vbuf); - } +#[cfg(windows)] +pub fn setenv(n: &str, v: &str) { + unsafe { + do with_env_lock { + use os::win32::as_utf16_p; + do as_utf16_p(n) |nbuf| { + do as_utf16_p(v) |vbuf| { + libc::SetEnvironmentVariableW(nbuf, vbuf); } } } - } } diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index a5e1260d4a5..327337f441d 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -870,12 +870,6 @@ rust_task_unweaken(rust_port_id chan) { task->kernel->unweaken_task(chan); } -extern "C" CDECL uintptr_t* -rust_global_env_chan_ptr() { - rust_task *task = rust_get_current_task(); - return task->kernel->get_global_env_chan(); -} - extern "C" void rust_task_inhibit_kill(rust_task *task) { task->inhibit_kill(); diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp index d270ac07633..7e342878841 100644 --- a/src/rt/rust_kernel.cpp +++ b/src/rt/rust_kernel.cpp @@ -35,7 +35,6 @@ rust_kernel::rust_kernel(rust_env *env) : osmain_driver(NULL), non_weak_tasks(0), global_loop_chan(0), - global_env_chan(0), at_exit_runner(NULL), at_exit_started(false), env(env), diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h index f90ecf01a7b..477e59d1b3e 100644 --- a/src/rt/rust_kernel.h +++ b/src/rt/rust_kernel.h @@ -131,8 +131,6 @@ class rust_kernel { // Used to communicate with the process-side, global libuv loop uintptr_t global_loop_chan; - // Used to serialize access to getenv/setenv - uintptr_t global_env_chan; lock_and_signal at_exit_lock; spawn_fn at_exit_runner; @@ -193,7 +191,6 @@ public: bool send_to_port(rust_port_id chan, void *sptr); uintptr_t* get_global_loop() { return &global_loop_chan; } - uintptr_t* get_global_env_chan() { return &global_env_chan; } void register_exit_function(spawn_fn runner, fn_env_pair *f); }; diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index 5be823d8fde..dd84e5ff6e7 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -174,7 +174,6 @@ rust_dbg_do_nothing rust_dbg_breakpoint rust_osmain_sched_id rust_compare_and_swap_ptr -rust_global_env_chan_ptr rust_port_take rust_port_drop rust_port_task -- cgit 1.4.1-3-g733a5 From b9608fe4232c4014daa540849d471b1791b41fa6 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Sat, 19 Jan 2013 23:38:17 -0800 Subject: std: Convert uv_global_loop to use pipes --- src/libcore/private/global.rs | 15 +++++ src/libstd/flatpipes.rs | 3 +- src/libstd/net_ip.rs | 6 +- src/libstd/net_tcp.rs | 74 +++++++++++++----------- src/libstd/timer.rs | 21 +++---- src/libstd/uv_global_loop.rs | 106 ++++++++++++++++------------------ src/libstd/uv_iotask.rs | 72 ++++++++++++++--------- src/rt/rust_kernel.cpp | 1 - src/rt/rust_kernel.h | 5 -- src/rt/rust_uv.cpp | 9 --- src/rt/rustrt.def.in | 1 - src/test/run-pass/pipe-detect-term.rs | 2 +- src/test/run-pass/pipe-select.rs | 2 +- src/test/run-pass/pipe-sleep.rs | 2 +- 14 files changed, 172 insertions(+), 147 deletions(-) (limited to 'src/rt/rust_kernel.cpp') diff --git a/src/libcore/private/global.rs b/src/libcore/private/global.rs index 8c1500353ae..d9230e08dc7 100644 --- a/src/libcore/private/global.rs +++ b/src/libcore/private/global.rs @@ -114,6 +114,21 @@ unsafe fn global_data_modify_( } } +pub unsafe fn global_data_clone( + key: GlobalDataKey) -> Option { + let mut maybe_clone: Option = None; + do global_data_modify(key) |current| { + match ¤t { + &Some(~ref value) => { + maybe_clone = Some(value.clone()); + } + &None => () + } + current + } + return maybe_clone; +} + // GlobalState is a map from keys to unique pointers and a // destructor. Keys are pointers derived from the type of the // global value. There is a single GlobalState instance per runtime. diff --git a/src/libstd/flatpipes.rs b/src/libstd/flatpipes.rs index 0607055db5c..cc788dfee22 100644 --- a/src/libstd/flatpipes.rs +++ b/src/libstd/flatpipes.rs @@ -782,7 +782,6 @@ mod test { let (finish_port, finish_chan) = pipes::stream(); let addr = ip::v4::parse_addr("127.0.0.1"); - let iotask = uv::global_loop::get(); let begin_connect_chan = Cell(move begin_connect_chan); let accept_chan = Cell(move accept_chan); @@ -790,6 +789,7 @@ mod test { // The server task do task::spawn |copy addr, move begin_connect_chan, move accept_chan| { + let iotask = &uv::global_loop::get(); let begin_connect_chan = begin_connect_chan.take(); let accept_chan = accept_chan.take(); let listen_res = do tcp::listen( @@ -821,6 +821,7 @@ mod test { begin_connect_port.recv(); debug!("connecting"); + let iotask = &uv::global_loop::get(); let connect_result = tcp::connect(copy addr, port, iotask); assert connect_result.is_ok(); let sock = result::unwrap(move connect_result); diff --git a/src/libstd/net_ip.rs b/src/libstd/net_ip.rs index fad583a668b..080c5514ac8 100644 --- a/src/libstd/net_ip.rs +++ b/src/libstd/net_ip.rs @@ -114,7 +114,7 @@ enum IpGetAddrErr { * a vector of `ip_addr` results, in the case of success, or an error * object in the case of failure */ -pub fn get_addr(node: &str, iotask: iotask) +pub fn get_addr(node: &str, iotask: &iotask) -> result::Result<~[IpAddr], IpGetAddrErr> { do oldcomm::listen |output_ch| { do str::as_buf(node) |node_ptr, len| unsafe { @@ -413,7 +413,7 @@ mod test { #[ignore(reason = "valgrind says it's leaky")] fn test_ip_get_addr() { let localhost_name = ~"localhost"; - let iotask = uv::global_loop::get(); + let iotask = &uv::global_loop::get(); let ga_result = get_addr(localhost_name, iotask); if result::is_err(&ga_result) { fail ~"got err result from net::ip::get_addr();" @@ -439,7 +439,7 @@ mod test { #[ignore(reason = "valgrind says it's leaky")] fn test_ip_get_addr_bad_input() { let localhost_name = ~"sjkl234m,./sdf"; - let iotask = uv::global_loop::get(); + let iotask = &uv::global_loop::get(); let ga_result = get_addr(localhost_name, iotask); assert result::is_err(&ga_result); } diff --git a/src/libstd/net_tcp.rs b/src/libstd/net_tcp.rs index 847962c1773..75c7a7cbfb9 100644 --- a/src/libstd/net_tcp.rs +++ b/src/libstd/net_tcp.rs @@ -142,7 +142,7 @@ pub enum TcpConnectErrData { * `net::tcp::tcp_connect_err_data` instance will be returned */ pub fn connect(input_ip: ip::IpAddr, port: uint, - iotask: IoTask) + iotask: &IoTask) -> result::Result unsafe { let result_po = oldcomm::Port::(); let closed_signal_po = oldcomm::Port::<()>(); @@ -164,7 +164,7 @@ pub fn connect(input_ip: ip::IpAddr, port: uint, ip::Ipv4(_) => { false } ip::Ipv6(_) => { true } }, - iotask: iotask + iotask: iotask.clone() }; let socket_data_ptr = ptr::addr_of(&(*socket_data)); log(debug, fmt!("tcp_connect result_ch %?", conn_data.result_ch)); @@ -496,17 +496,17 @@ pub fn accept(new_conn: TcpNewConnection) let server_data_ptr = uv::ll::get_data_for_uv_handle( server_handle_ptr) as *TcpListenFcData; let reader_po = oldcomm::Port(); - let iotask = (*server_data_ptr).iotask; + let iotask = &(*server_data_ptr).iotask; let stream_handle_ptr = malloc_uv_tcp_t(); *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t(); - let client_socket_data = @{ + let client_socket_data: @TcpSocketData = @{ reader_po: reader_po, reader_ch: oldcomm::Chan(&reader_po), stream_handle_ptr : stream_handle_ptr, connect_req : uv::ll::connect_t(), write_req : uv::ll::write_t(), ipv6: (*server_data_ptr).ipv6, - iotask : iotask + iotask : iotask.clone() }; let client_socket_data_ptr = ptr::addr_of(&(*client_socket_data)); let client_stream_handle_ptr = @@ -588,10 +588,10 @@ pub fn accept(new_conn: TcpNewConnection) * of listen exiting because of an error */ pub fn listen(host_ip: ip::IpAddr, port: uint, backlog: uint, - iotask: IoTask, - on_establish_cb: fn~(oldcomm::Chan>), - new_connect_cb: fn~(TcpNewConnection, - oldcomm::Chan>)) + iotask: &IoTask, + on_establish_cb: fn~(oldcomm::Chan>), + new_connect_cb: fn~(TcpNewConnection, + oldcomm::Chan>)) -> result::Result<(), TcpListenErrData> unsafe { do listen_common(move host_ip, port, backlog, iotask, move on_establish_cb) @@ -606,7 +606,7 @@ pub fn listen(host_ip: ip::IpAddr, port: uint, backlog: uint, } fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint, - iotask: IoTask, + iotask: &IoTask, on_establish_cb: fn~(oldcomm::Chan>), on_connect_cb: fn~(*uv::ll::uv_tcp_t)) -> result::Result<(), TcpListenErrData> unsafe { @@ -615,12 +615,12 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint, let kill_ch = oldcomm::Chan(&kill_po); let server_stream = uv::ll::tcp_t(); let server_stream_ptr = ptr::addr_of(&server_stream); - let server_data = { + let server_data: TcpListenFcData = { server_stream_ptr: server_stream_ptr, stream_closed_ch: oldcomm::Chan(&stream_closed_po), kill_ch: kill_ch, on_connect_cb: move on_connect_cb, - iotask: iotask, + iotask: iotask.clone(), ipv6: match &host_ip { &ip::Ipv4(_) => { false } &ip::Ipv6(_) => { true } @@ -895,7 +895,7 @@ fn tear_down_socket_data(socket_data: @TcpSocketData) unsafe { }; let close_data_ptr = ptr::addr_of(&close_data); let stream_handle_ptr = (*socket_data).stream_handle_ptr; - do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe { + do iotask::interact(&(*socket_data).iotask) |loop_ptr| unsafe { log(debug, fmt!("interact dtor for tcp_socket stream %? loop %?", stream_handle_ptr, loop_ptr)); uv::ll::set_data_for_uv_handle(stream_handle_ptr, @@ -916,7 +916,7 @@ fn read_common_impl(socket_data: *TcpSocketData, timeout_msecs: uint) use timer; log(debug, ~"starting tcp::read"); - let iotask = (*socket_data).iotask; + let iotask = &(*socket_data).iotask; let rs_result = read_start_common_impl(socket_data); if result::is_err(&rs_result) { let err_data = result::get_err(&rs_result); @@ -956,7 +956,7 @@ fn read_stop_common_impl(socket_data: *TcpSocketData) -> let stream_handle_ptr = (*socket_data).stream_handle_ptr; let stop_po = oldcomm::Port::>(); let stop_ch = oldcomm::Chan(&stop_po); - do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe { + do iotask::interact(&(*socket_data).iotask) |loop_ptr| unsafe { log(debug, ~"in interact cb for tcp::read_stop"); match uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) { 0i32 => { @@ -984,7 +984,7 @@ fn read_start_common_impl(socket_data: *TcpSocketData) let start_po = oldcomm::Port::>(); let start_ch = oldcomm::Chan(&start_po); log(debug, ~"in tcp::read_start before interact loop"); - do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe { + do iotask::interact(&(*socket_data).iotask) |loop_ptr| unsafe { log(debug, fmt!("in tcp::read_start interact cb %?", loop_ptr)); match uv::ll::read_start(stream_handle_ptr as *uv::ll::uv_stream_t, on_alloc_cb, @@ -1024,7 +1024,7 @@ fn write_common_impl(socket_data_ptr: *TcpSocketData, result_ch: oldcomm::Chan(&result_po) }; let write_data_ptr = ptr::addr_of(&write_data); - do iotask::interact((*socket_data_ptr).iotask) |loop_ptr| unsafe { + do iotask::interact(&(*socket_data_ptr).iotask) |loop_ptr| unsafe { log(debug, fmt!("in interact cb for tcp::write %?", loop_ptr)); match uv::ll::write(write_req_ptr, stream_handle_ptr, @@ -1369,7 +1369,7 @@ pub mod test { } } pub fn impl_gl_tcp_ipv4_server_and_client() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 8888u; let expected_req = ~"ping"; @@ -1381,6 +1381,7 @@ pub mod test { let cont_po = oldcomm::Port::<()>(); let cont_ch = oldcomm::Chan(&cont_po); // server + let hl_loop_clone = hl_loop.clone(); do task::spawn_sched(task::ManualThreads(1u)) { let actual_req = do oldcomm::listen |server_ch| { run_tcp_test_server( @@ -1389,7 +1390,7 @@ pub mod test { expected_resp, server_ch, cont_ch, - hl_loop) + &hl_loop_clone) }; server_result_ch.send(actual_req); }; @@ -1415,7 +1416,7 @@ pub mod test { assert str::contains(actual_resp, expected_resp); } pub fn impl_gl_tcp_ipv4_get_peer_addr() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 8887u; let expected_resp = ~"pong"; @@ -1426,6 +1427,7 @@ pub mod test { let cont_po = oldcomm::Port::<()>(); let cont_ch = oldcomm::Chan(&cont_po); // server + let hl_loop_clone = hl_loop.clone(); do task::spawn_sched(task::ManualThreads(1u)) { let actual_req = do oldcomm::listen |server_ch| { run_tcp_test_server( @@ -1434,7 +1436,7 @@ pub mod test { expected_resp, server_ch, cont_ch, - hl_loop) + &hl_loop_clone) }; server_result_ch.send(actual_req); }; @@ -1445,10 +1447,11 @@ pub mod test { let server_ip_addr = ip::v4::parse_addr(server_ip); let iotask = uv::global_loop::get(); let connect_result = connect(move server_ip_addr, server_port, - iotask); + &iotask); let sock = result::unwrap(move connect_result); + debug!("testing peer address"); // This is what we are actually testing! assert net::ip::format_addr(&sock.get_peer_addr()) == ~"127.0.0.1"; @@ -1457,12 +1460,14 @@ pub mod test { // Fulfill the protocol the test server expects let resp_bytes = str::to_bytes(~"ping"); tcp_write_single(&sock, resp_bytes); + debug!("message sent"); let read_result = sock.read(0u); client_ch.send(str::from_bytes(read_result.get())); + debug!("result read"); }; } pub fn impl_gl_tcp_ipv4_client_error_connection_refused() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 8889u; let expected_req = ~"ping"; @@ -1482,7 +1487,7 @@ pub mod test { } } pub fn impl_gl_tcp_ipv4_server_address_in_use() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 8890u; let expected_req = ~"ping"; @@ -1494,6 +1499,7 @@ pub mod test { let cont_po = oldcomm::Port::<()>(); let cont_ch = oldcomm::Chan(&cont_po); // server + let hl_loop_clone = hl_loop.clone(); do task::spawn_sched(task::ManualThreads(1u)) { let actual_req = do oldcomm::listen |server_ch| { run_tcp_test_server( @@ -1502,7 +1508,7 @@ pub mod test { expected_resp, server_ch, cont_ch, - hl_loop) + &hl_loop_clone) }; server_result_ch.send(actual_req); }; @@ -1533,7 +1539,7 @@ pub mod test { } } pub fn impl_gl_tcp_ipv4_server_access_denied() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 80u; // this one should fail.. @@ -1553,7 +1559,7 @@ pub mod test { } pub fn impl_gl_tcp_ipv4_server_client_reader_writer() { - let iotask = uv::global_loop::get(); + let iotask = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 8891u; let expected_req = ~"ping"; @@ -1565,6 +1571,7 @@ pub mod test { let cont_po = oldcomm::Port::<()>(); let cont_ch = oldcomm::Chan(&cont_po); // server + let iotask_clone = iotask.clone(); do task::spawn_sched(task::ManualThreads(1u)) { let actual_req = do oldcomm::listen |server_ch| { run_tcp_test_server( @@ -1573,7 +1580,7 @@ pub mod test { expected_resp, server_ch, cont_ch, - iotask) + &iotask_clone) }; server_result_ch.send(actual_req); }; @@ -1604,7 +1611,7 @@ pub mod test { pub fn impl_tcp_socket_impl_reader_handles_eof() { use core::io::{Reader,ReaderUtil}; - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 10041u; let expected_req = ~"GET /"; @@ -1616,6 +1623,7 @@ pub mod test { let cont_po = oldcomm::Port::<()>(); let cont_ch = oldcomm::Chan(&cont_po); // server + let hl_loop_clone = hl_loop.clone(); do task::spawn_sched(task::ManualThreads(1u)) { let actual_req = do oldcomm::listen |server_ch| { run_tcp_test_server( @@ -1624,7 +1632,7 @@ pub mod test { expected_resp, server_ch, cont_ch, - hl_loop) + &hl_loop_clone) }; server_result_ch.send(actual_req); }; @@ -1664,7 +1672,7 @@ pub mod test { fn run_tcp_test_server(server_ip: &str, server_port: uint, resp: ~str, server_ch: oldcomm::Chan<~str>, cont_ch: oldcomm::Chan<()>, - iotask: IoTask) -> ~str { + iotask: &IoTask) -> ~str { let server_ip_addr = ip::v4::parse_addr(server_ip); let listen_result = listen(move server_ip_addr, server_port, 128, iotask, @@ -1751,7 +1759,7 @@ pub mod test { } fn run_tcp_test_server_fail(server_ip: &str, server_port: uint, - iotask: IoTask) -> TcpListenErrData { + iotask: &IoTask) -> TcpListenErrData { let server_ip_addr = ip::v4::parse_addr(server_ip); let listen_result = listen(move server_ip_addr, server_port, 128, iotask, @@ -1775,7 +1783,7 @@ pub mod test { fn run_tcp_test_client(server_ip: &str, server_port: uint, resp: &str, client_ch: oldcomm::Chan<~str>, - iotask: IoTask) -> result::Result<~str, + iotask: &IoTask) -> result::Result<~str, TcpConnectErrData> { let server_ip_addr = ip::v4::parse_addr(server_ip); diff --git a/src/libstd/timer.rs b/src/libstd/timer.rs index 18c623c2bd8..0f0aa2a011e 100644 --- a/src/libstd/timer.rs +++ b/src/libstd/timer.rs @@ -39,7 +39,7 @@ use core; * * ch - a channel of type T to send a `val` on * * val - a value of type T to send over the provided `ch` */ -pub fn delayed_send(iotask: IoTask, +pub fn delayed_send(iotask: &IoTask, msecs: uint, ch: oldcomm::Chan, val: T) { @@ -90,7 +90,7 @@ pub fn delayed_send(iotask: IoTask, * * `iotask` - a `uv::iotask` that the tcp request will run on * * msecs - an amount of time, in milliseconds, for the current task to block */ -pub fn sleep(iotask: IoTask, msecs: uint) { +pub fn sleep(iotask: &IoTask, msecs: uint) { let exit_po = oldcomm::Port::<()>(); let exit_ch = oldcomm::Chan(&exit_po); delayed_send(iotask, msecs, exit_ch, ()); @@ -117,7 +117,7 @@ pub fn sleep(iotask: IoTask, msecs: uint) { * on the provided port in the allotted timeout period, then the result will * be a `some(T)`. If not, then `none` will be returned. */ -pub fn recv_timeout(iotask: IoTask, +pub fn recv_timeout(iotask: &IoTask, msecs: uint, wait_po: oldcomm::Port) -> Option { @@ -177,13 +177,13 @@ mod test { #[test] fn test_gl_timer_simple_sleep_test() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); sleep(hl_loop, 1u); } #[test] fn test_gl_timer_sleep_stress1() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); for iter::repeat(50u) { sleep(hl_loop, 1u); } @@ -193,7 +193,7 @@ mod test { fn test_gl_timer_sleep_stress2() { let po = oldcomm::Port(); let ch = oldcomm::Chan(&po); - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let repeat = 20u; let spec = { @@ -208,11 +208,12 @@ mod test { for spec.each |spec| { let (times, maxms) = *spec; + let hl_loop_clone = hl_loop.clone(); do task::spawn { use rand::*; let rng = Rng(); for iter::repeat(times) { - sleep(hl_loop, rng.next() as uint % maxms); + sleep(&hl_loop_clone, rng.next() as uint % maxms); } oldcomm::send(ch, ()); } @@ -271,12 +272,12 @@ mod test { let expected = rand::Rng().gen_str(16u); let test_po = oldcomm::Port::<~str>(); let test_ch = oldcomm::Chan(&test_po); - + let hl_loop_clone = hl_loop.clone(); do task::spawn() { - delayed_send(hl_loop, 50u, test_ch, expected); + delayed_send(&hl_loop_clone, 50u, test_ch, expected); }; - match recv_timeout(hl_loop, 1u, test_po) { + match recv_timeout(&hl_loop, 1u, test_po) { None => successes += 1, _ => failures += 1 }; diff --git a/src/libstd/uv_global_loop.rs b/src/libstd/uv_global_loop.rs index 276cb9cab64..097e923225a 100644 --- a/src/libstd/uv_global_loop.rs +++ b/src/libstd/uv_global_loop.rs @@ -19,16 +19,16 @@ use uv_iotask::{IoTask, spawn_iotask}; use core::either::{Left, Right}; use core::libc; -use core::oldcomm::{Port, Chan, select2, listen}; -use core::private::{chan_from_global_ptr, weaken_task}; +use core::pipes::{Port, Chan, SharedChan, select2i}; +use core::private::global::{global_data_clone_create, + global_data_clone}; +use core::private::weak_task::weaken_task; use core::str; -use core::task::TaskBuilder; +use core::task::{task, SingleThreaded, spawn}; use core::task; use core::vec; - -extern mod rustrt { - unsafe fn rust_uv_get_kernel_global_chan_ptr() -> *libc::uintptr_t; -} +use core::clone::Clone; +use core::option::{Some, None}; /** * Race-free helper to get access to a global task where a libuv @@ -49,64 +49,58 @@ pub fn get() -> IoTask { #[doc(hidden)] fn get_monitor_task_gl() -> IoTask unsafe { - let monitor_loop_chan_ptr = rustrt::rust_uv_get_kernel_global_chan_ptr(); + type MonChan = Chan; - debug!("ENTERING global_loop::get() loop chan: %?", - monitor_loop_chan_ptr); + struct GlobalIoTask(IoTask); - debug!("before priv::chan_from_global_ptr"); - type MonChan = Chan; + impl GlobalIoTask: Clone { + fn clone(&self) -> GlobalIoTask { + GlobalIoTask((**self).clone()) + } + } - let monitor_ch = - do chan_from_global_ptr::(monitor_loop_chan_ptr, - || { - task::task().sched_mode - (task::SingleThreaded) - .unlinked() - }) |msg_po| unsafe { - debug!("global monitor task starting"); - - // As a weak task the runtime will notify us when to exit - do weaken_task() |weak_exit_po| { - debug!("global monitor task is now weak"); - let hl_loop = spawn_loop(); - loop { - debug!("in outer_loop..."); - match select2(weak_exit_po, msg_po) { - Left(weak_exit) => { - // all normal tasks have ended, tell the - // libuv loop to tear_down, then exit - debug!("weak_exit_po recv'd msg: %?", weak_exit); - iotask::exit(hl_loop); - break; - } - Right(fetch_ch) => { - debug!("hl_loop req recv'd: %?", fetch_ch); - fetch_ch.send(hl_loop); - } + fn key(_: GlobalIoTask) { } + + match global_data_clone(key) { + Some(GlobalIoTask(iotask)) => iotask, + None => { + let iotask: IoTask = spawn_loop(); + let mut installed = false; + let final_iotask = do global_data_clone_create(key) { + installed = true; + ~GlobalIoTask(iotask.clone()) + }; + if installed { + do task().unlinked().spawn() unsafe { + debug!("global monitor task starting"); + // As a weak task the runtime will notify us when to exit + do weaken_task |weak_exit_po| { + debug!("global monitor task is now weak"); + weak_exit_po.recv(); + iotask::exit(&iotask); + debug!("global monitor task is leaving weakend state"); + }; + debug!("global monitor task exiting"); } + } else { + iotask::exit(&iotask); } - debug!("global monitor task is leaving weakend state"); - }; - debug!("global monitor task exiting"); - }; - // once we have a chan to the monitor loop, we ask it for - // the libuv loop's async handle - do listen |fetch_ch| { - monitor_ch.send(fetch_ch); - fetch_ch.recv() + match final_iotask { + GlobalIoTask(iotask) => iotask + } + } } } fn spawn_loop() -> IoTask { - let builder = do task::task().add_wrapper |task_body| { + let builder = do task().add_wrapper |task_body| { fn~(move task_body) { // The I/O loop task also needs to be weak so it doesn't keep // the runtime alive unsafe { - do weaken_task |weak_exit_po| { - debug!("global libuv task is now weak %?", weak_exit_po); + do weaken_task |_| { + debug!("global libuv task is now weak"); task_body(); // We don't wait for the exit message on weak_exit_po @@ -118,6 +112,7 @@ fn spawn_loop() -> IoTask { } } }; + let builder = builder.unlinked(); spawn_iotask(move builder) } @@ -147,7 +142,7 @@ mod test { _status: libc::c_int) unsafe { log(debug, ~"in simple timer cb"); ll::timer_stop(timer_ptr); - let hl_loop = get_gl(); + let hl_loop = &get_gl(); do iotask::interact(hl_loop) |_loop_ptr| unsafe { log(debug, ~"closing timer"); ll::close(timer_ptr, simple_timer_close_cb); @@ -157,7 +152,7 @@ mod test { log(debug, ~"exiting simple timer cb"); } - fn impl_uv_hl_simple_timer(iotask: IoTask) unsafe { + fn impl_uv_hl_simple_timer(iotask: &IoTask) unsafe { let exit_po = oldcomm::Port::(); let exit_ch = oldcomm::Chan(&exit_po); let exit_ch_ptr = ptr::addr_of(&exit_ch); @@ -190,10 +185,11 @@ mod test { #[test] fn test_gl_uv_global_loop_high_level_global_timer() unsafe { - let hl_loop = get_gl(); + let hl_loop = &get_gl(); let exit_po = oldcomm::Port::<()>(); let exit_ch = oldcomm::Chan(&exit_po); task::spawn_sched(task::ManualThreads(1u), || { + let hl_loop = &get_gl(); impl_uv_hl_simple_timer(hl_loop); oldcomm::send(exit_ch, ()); }); @@ -206,12 +202,12 @@ mod test { #[test] #[ignore] fn test_stress_gl_uv_global_loop_high_level_global_timer() unsafe { - let hl_loop = get_gl(); let exit_po = oldcomm::Port::<()>(); let exit_ch = oldcomm::Chan(&exit_po); let cycles = 5000u; for iter::repeat(cycles) { task::spawn_sched(task::ManualThreads(1u), || { + let hl_loop = &get_gl(); impl_uv_hl_simple_timer(hl_loop); oldcomm::send(exit_ch, ()); }); diff --git a/src/libstd/uv_iotask.rs b/src/libstd/uv_iotask.rs index 409d73c2539..c50a19cc5c1 100644 --- a/src/libstd/uv_iotask.rs +++ b/src/libstd/uv_iotask.rs @@ -20,7 +20,7 @@ use ll = uv_ll; use core::libc::c_void; use core::libc; -use core::oldcomm::{Port, Chan, listen}; +use core::pipes::{stream, Port, Chan, SharedChan}; use core::prelude::*; use core::ptr::addr_of; use core::task::TaskBuilder; @@ -30,22 +30,30 @@ use core::task; pub enum IoTask { IoTask_({ async_handle: *ll::uv_async_t, - op_chan: Chan + op_chan: SharedChan }) } +impl IoTask: Clone { + fn clone(&self) -> IoTask { + IoTask_({ + async_handle: self.async_handle, + op_chan: self.op_chan.clone() + }) + } +} + pub fn spawn_iotask(task: task::TaskBuilder) -> IoTask { - do listen |iotask_ch| { + let (iotask_port, iotask_chan) = stream(); - do task.sched_mode(task::SingleThreaded).spawn { - debug!("entering libuv task"); - run_loop(iotask_ch); - debug!("libuv task exiting"); - }; + do task.sched_mode(task::SingleThreaded).spawn { + debug!("entering libuv task"); + run_loop(&iotask_chan); + debug!("libuv task exiting"); + }; - iotask_ch.recv() - } + iotask_port.recv() } @@ -71,7 +79,7 @@ pub fn spawn_iotask(task: task::TaskBuilder) -> IoTask { * module. It is not safe to send the `loop_ptr` param to this callback out * via ports/chans. */ -pub unsafe fn interact(iotask: IoTask, +pub unsafe fn interact(iotask: &IoTask, cb: fn~(*c_void)) { send_msg(iotask, Interaction(move cb)); } @@ -83,7 +91,7 @@ pub unsafe fn interact(iotask: IoTask, * async handle and do a sanity check to make sure that all other handles are * closed, causing a failure otherwise. */ -pub fn exit(iotask: IoTask) unsafe { +pub fn exit(iotask: &IoTask) unsafe { send_msg(iotask, TeardownLoop); } @@ -96,8 +104,9 @@ enum IoTaskMsg { } /// Run the loop and begin handling messages -fn run_loop(iotask_ch: Chan) unsafe { +fn run_loop(iotask_ch: &Chan) unsafe { + debug!("creating loop"); let loop_ptr = ll::loop_new(); // set up the special async handle we'll use to allow multi-task @@ -108,10 +117,12 @@ fn run_loop(iotask_ch: Chan) unsafe { // associate the async handle with the loop ll::async_init(loop_ptr, async_handle, wake_up_cb); + let (msg_po, msg_ch) = stream::(); + // initialize our loop data and store it in the loop let data: IoTaskLoopData = { async_handle: async_handle, - msg_po: Port() + msg_po: msg_po }; ll::set_data_for_uv_handle(async_handle, addr_of(&data)); @@ -119,7 +130,7 @@ fn run_loop(iotask_ch: Chan) unsafe { // while we dwell in the I/O loop let iotask = IoTask_({ async_handle: async_handle, - op_chan: data.msg_po.chan() + op_chan: SharedChan(msg_ch) }); iotask_ch.send(iotask); @@ -136,7 +147,7 @@ type IoTaskLoopData = { msg_po: Port }; -fn send_msg(iotask: IoTask, +fn send_msg(iotask: &IoTask, msg: IoTaskMsg) unsafe { iotask.op_chan.send(move msg); ll::async_send(iotask.async_handle); @@ -151,7 +162,7 @@ extern fn wake_up_cb(async_handle: *ll::uv_async_t, let loop_ptr = ll::get_loop_for_uv_handle(async_handle); let data = ll::get_data_for_uv_handle(async_handle) as *IoTaskLoopData; - let msg_po = (*data).msg_po; + let msg_po = &(*data).msg_po; while msg_po.peek() { match msg_po.recv() { @@ -203,34 +214,37 @@ mod test { iotask: IoTask, exit_ch: oldcomm::Chan<()> }; - fn impl_uv_iotask_async(iotask: IoTask) unsafe { + fn impl_uv_iotask_async(iotask: &IoTask) unsafe { let async_handle = ll::async_t(); let ah_ptr = ptr::addr_of(&async_handle); let exit_po = oldcomm::Port::<()>(); let exit_ch = oldcomm::Chan(&exit_po); let ah_data = { - iotask: iotask, + iotask: iotask.clone(), exit_ch: exit_ch }; - let ah_data_ptr = ptr::addr_of(&ah_data); + let ah_data_ptr: *AhData = ptr::to_unsafe_ptr(&ah_data); + debug!("about to interact"); do interact(iotask) |loop_ptr| unsafe { + debug!("interacting"); ll::async_init(loop_ptr, ah_ptr, async_handle_cb); ll::set_data_for_uv_handle(ah_ptr, ah_data_ptr as *libc::c_void); ll::async_send(ah_ptr); }; + debug!("waiting for async close"); oldcomm::recv(exit_po); } // this fn documents the bear minimum neccesary to roll your own // high_level_loop unsafe fn spawn_test_loop(exit_ch: oldcomm::Chan<()>) -> IoTask { - let iotask_port = oldcomm::Port::(); - let iotask_ch = oldcomm::Chan(&iotask_port); + let (iotask_port, iotask_ch) = stream::(); do task::spawn_sched(task::ManualThreads(1u)) { - run_loop(iotask_ch); + debug!("about to run a test loop"); + run_loop(&iotask_ch); exit_ch.send(()); }; - return oldcomm::recv(iotask_port); + return iotask_port.recv(); } extern fn lifetime_handle_close(handle: *libc::c_void) unsafe { @@ -247,7 +261,9 @@ mod test { fn test_uv_iotask_async() unsafe { let exit_po = oldcomm::Port::<()>(); let exit_ch = oldcomm::Chan(&exit_po); - let iotask = spawn_test_loop(exit_ch); + let iotask = &spawn_test_loop(exit_ch); + + debug!("spawned iotask"); // using this handle to manage the lifetime of the high_level_loop, // as it will exit the first time one of the impl_uv_hl_async() is @@ -258,12 +274,16 @@ mod test { let work_exit_po = oldcomm::Port::<()>(); let work_exit_ch = oldcomm::Chan(&work_exit_po); for iter::repeat(7u) { + let iotask_clone = iotask.clone(); do task::spawn_sched(task::ManualThreads(1u)) { - impl_uv_iotask_async(iotask); + debug!("async"); + impl_uv_iotask_async(&iotask_clone); + debug!("done async"); oldcomm::send(work_exit_ch, ()); }; }; for iter::repeat(7u) { + debug!("waiting"); oldcomm::recv(work_exit_po); }; log(debug, ~"sending teardown_loop msg.."); diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp index 7e342878841..8ca49ea6a57 100644 --- a/src/rt/rust_kernel.cpp +++ b/src/rt/rust_kernel.cpp @@ -34,7 +34,6 @@ rust_kernel::rust_kernel(rust_env *env) : sched_reaper(this), osmain_driver(NULL), non_weak_tasks(0), - global_loop_chan(0), at_exit_runner(NULL), at_exit_started(false), env(env), diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h index 477e59d1b3e..8ba0405b86e 100644 --- a/src/rt/rust_kernel.h +++ b/src/rt/rust_kernel.h @@ -129,9 +129,6 @@ class rust_kernel { void end_weak_tasks(); void begin_shutdown(); - // Used to communicate with the process-side, global libuv loop - uintptr_t global_loop_chan; - lock_and_signal at_exit_lock; spawn_fn at_exit_runner; bool at_exit_started; @@ -190,8 +187,6 @@ public: bool send_to_port(rust_port_id chan, void *sptr); - uintptr_t* get_global_loop() { return &global_loop_chan; } - void register_exit_function(spawn_fn runner, fn_env_pair *f); }; diff --git a/src/rt/rust_uv.cpp b/src/rt/rust_uv.cpp index 53d8177bcf8..2dc70088628 100644 --- a/src/rt/rust_uv.cpp +++ b/src/rt/rust_uv.cpp @@ -513,15 +513,6 @@ rust_uv_ip6_port(struct sockaddr_in6* src) { return ntohs(src->sin6_port); } -extern "C" uintptr_t* -rust_uv_get_kernel_global_chan_ptr() { - uintptr_t* result = rust_get_current_task()->kernel->get_global_loop(); - rust_task* task = rust_get_current_task(); - LOG(task, stdlib, "global loop: %lu", (unsigned long int)result); - LOG(task, stdlib,"global loop val: %lu", (unsigned long int)*result); - return result; -} - extern "C" void* rust_uv_current_kernel_malloc(size_t size) { return current_kernel_malloc(size, "rust_uv_current_kernel_malloc"); diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index dd84e5ff6e7..8e8ce9ee509 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -158,7 +158,6 @@ rust_uv_get_data_for_req rust_uv_set_data_for_req rust_uv_get_base_from_buf rust_uv_get_len_from_buf -rust_uv_get_kernel_global_chan_ptr rust_uv_current_kernel_malloc rust_uv_current_kernel_free rust_uv_getaddrinfo diff --git a/src/test/run-pass/pipe-detect-term.rs b/src/test/run-pass/pipe-detect-term.rs index c2d4be04191..10b13d8757f 100644 --- a/src/test/run-pass/pipe-detect-term.rs +++ b/src/test/run-pass/pipe-detect-term.rs @@ -27,7 +27,7 @@ proto! oneshot ( ) fn main() { - let iotask = uv::global_loop::get(); + let iotask = &uv::global_loop::get(); pipes::spawn_service(oneshot::init, |p| { match try_recv(move p) { diff --git a/src/test/run-pass/pipe-select.rs b/src/test/run-pass/pipe-select.rs index e71d0c4931d..e138f2562aa 100644 --- a/src/test/run-pass/pipe-select.rs +++ b/src/test/run-pass/pipe-select.rs @@ -35,7 +35,7 @@ fn main() { use oneshot::client::*; use stream::client::*; - let iotask = uv::global_loop::get(); + let iotask = &uv::global_loop::get(); let c = pipes::spawn_service(stream::init, |p| { error!("waiting for pipes"); diff --git a/src/test/run-pass/pipe-sleep.rs b/src/test/run-pass/pipe-sleep.rs index 4a6e7b4ce36..ae7e4e7fb0c 100644 --- a/src/test/run-pass/pipe-sleep.rs +++ b/src/test/run-pass/pipe-sleep.rs @@ -27,7 +27,7 @@ fn main() { let c = pipes::spawn_service(oneshot::init, |p| { recv(move p); }); - let iotask = uv::global_loop::get(); + let iotask = &uv::global_loop::get(); sleep(iotask, 500); signal(move c); -- cgit 1.4.1-3-g733a5 From cc9ab2c0339aa00566ee6c5d12383278c7bd7eef Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Mon, 21 Jan 2013 19:22:55 -0800 Subject: Remove old comm-based weak task interface --- src/libcore/private.rs | 102 ------------------------------------------------ src/rt/rust_builtin.cpp | 12 ------ src/rt/rust_kernel.cpp | 42 -------------------- src/rt/rust_kernel.h | 7 ---- src/rt/rustrt.def.in | 2 - 5 files changed, 165 deletions(-) (limited to 'src/rt/rust_kernel.cpp') diff --git a/src/libcore/private.rs b/src/libcore/private.rs index c3068d6f61b..03207330f31 100644 --- a/src/libcore/private.rs +++ b/src/libcore/private.rs @@ -39,8 +39,6 @@ pub mod weak_task; extern mod rustrt { #[legacy_exports]; - unsafe fn rust_task_weaken(ch: rust_port_id); - unsafe fn rust_task_unweaken(ch: rust_port_id); unsafe fn rust_create_little_lock() -> rust_little_lock; unsafe fn rust_destroy_little_lock(lock: rust_little_lock); @@ -92,111 +90,11 @@ fn test_run_in_bare_thread() unsafe { } } -#[allow(non_camel_case_types)] // runtime type -type rust_port_id = uint; - fn compare_and_swap(address: &mut int, oldval: int, newval: int) -> bool { let old = rusti::atomic_cxchg(address, oldval, newval); old == oldval } -/** - * Convert the current task to a 'weak' task temporarily - * - * As a weak task it will not be counted towards the runtime's set - * of live tasks. When there are no more outstanding live (non-weak) tasks - * the runtime will send an exit message on the provided channel. - * - * This function is super-unsafe. Do not use. - * - * # Safety notes - * - * * Weak tasks must either die on their own or exit upon receipt of - * the exit message. Failure to do so will cause the runtime to never - * exit - * * Tasks must not call `weaken_task` multiple times. This will - * break the kernel's accounting of live tasks. - * * Weak tasks must not be supervised. A supervised task keeps - * a reference to its parent, so the parent will not die. - */ -pub unsafe fn weaken_task(f: fn(oldcomm::Port<()>)) { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); - unsafe { - rustrt::rust_task_weaken(cast::reinterpret_cast(&ch)); - } - let _unweaken = Unweaken(ch); - f(po); - - struct Unweaken { - ch: oldcomm::Chan<()>, - drop unsafe { - rustrt::rust_task_unweaken(cast::reinterpret_cast(&self.ch)); - } - } - - fn Unweaken(ch: oldcomm::Chan<()>) -> Unweaken { - Unweaken { - ch: ch - } - } -} - -#[test] -pub fn test_weaken_task_then_unweaken() { - do task::try { - unsafe { - do weaken_task |_po| { - } - } - }; -} - -#[test] -pub fn test_weaken_task_wait() { - do task::spawn_unlinked { - unsafe { - do weaken_task |po| { - oldcomm::recv(po); - } - } - } -} - -#[test] -pub fn test_weaken_task_stress() { - // Create a bunch of weak tasks - for iter::repeat(100u) { - do task::spawn { - unsafe { - do weaken_task |_po| { - } - } - } - do task::spawn_unlinked { - unsafe { - do weaken_task |po| { - // Wait for it to tell us to die - oldcomm::recv(po); - } - } - } - } -} - -#[test] -#[ignore(cfg(windows))] -pub fn test_weaken_task_fail() { - let res = do task::try { - unsafe { - do weaken_task |_po| { - fail; - } - } - }; - assert result::is_err(&res); -} - /**************************************************************************** * Shared state & exclusive ARC ****************************************************************************/ diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index 327337f441d..4fcfc11b325 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -858,18 +858,6 @@ rust_compare_and_swap_ptr(intptr_t *address, return sync::compare_and_swap(address, oldval, newval); } -extern "C" CDECL void -rust_task_weaken(rust_port_id chan) { - rust_task *task = rust_get_current_task(); - task->kernel->weaken_task(chan); -} - -extern "C" CDECL void -rust_task_unweaken(rust_port_id chan) { - rust_task *task = rust_get_current_task(); - task->kernel->unweaken_task(chan); -} - extern "C" void rust_task_inhibit_kill(rust_task *task) { task->inhibit_kill(); diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp index 8ca49ea6a57..c365f3cca1e 100644 --- a/src/rt/rust_kernel.cpp +++ b/src/rt/rust_kernel.cpp @@ -368,30 +368,6 @@ rust_kernel::unregister_task() { } } -void -rust_kernel::weaken_task(rust_port_id chan) { - { - scoped_lock with(weak_task_lock); - KLOG_("Weakening task with channel %" PRIdPTR, chan); - weak_task_chans.push_back(chan); - } - inc_weak_task_count(); -} - -void -rust_kernel::unweaken_task(rust_port_id chan) { - dec_weak_task_count(); - { - scoped_lock with(weak_task_lock); - KLOG_("Unweakening task with channel %" PRIdPTR, chan); - std::vector::iterator iter = - std::find(weak_task_chans.begin(), weak_task_chans.end(), chan); - if (iter != weak_task_chans.end()) { - weak_task_chans.erase(iter); - } - } -} - void rust_kernel::inc_weak_task_count() { uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks); @@ -407,23 +383,6 @@ rust_kernel::dec_weak_task_count() { KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks); } -void -rust_kernel::end_weak_tasks() { - std::vector chancopies; - { - scoped_lock with(weak_task_lock); - chancopies = weak_task_chans; - weak_task_chans.clear(); - } - while (!chancopies.empty()) { - rust_port_id chan = chancopies.back(); - chancopies.pop_back(); - KLOG_("Notifying weak task " PRIdPTR, chan); - uintptr_t token = 0; - send_to_port(chan, &token); - } -} - void rust_kernel::begin_shutdown() { { @@ -439,7 +398,6 @@ rust_kernel::begin_shutdown() { run_exit_functions(); allow_scheduler_exit(); - end_weak_tasks(); } bool diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h index 8ba0405b86e..c25cef9fef9 100644 --- a/src/rt/rust_kernel.h +++ b/src/rt/rust_kernel.h @@ -119,14 +119,9 @@ class rust_kernel { // An atomically updated count of the live, 'non-weak' tasks uintptr_t non_weak_tasks; - // Protects weak_task_chans - lock_and_signal weak_task_lock; - // A list of weak tasks that need to be told when to exit - std::vector weak_task_chans; rust_scheduler* get_scheduler_by_id_nolock(rust_sched_id id); void allow_scheduler_exit(); - void end_weak_tasks(); void begin_shutdown(); lock_and_signal at_exit_lock; @@ -180,8 +175,6 @@ public: void register_task(); void unregister_task(); - void weaken_task(rust_port_id chan); - void unweaken_task(rust_port_id chan); void inc_weak_task_count(); void dec_weak_task_count(); diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index 8e8ce9ee509..eb9db6c1d57 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -61,8 +61,6 @@ rust_task_yield rust_task_is_unwinding rust_get_task rust_get_stack_segment -rust_task_weaken -rust_task_unweaken rust_log_str start_task vec_reserve_shared_actual -- cgit 1.4.1-3-g733a5