diff options
| author | bors <bors@rust-lang.org> | 2014-01-01 13:21:48 -0800 |
|---|---|---|
| committer | bors <bors@rust-lang.org> | 2014-01-01 13:21:48 -0800 |
| commit | 48918fab7226593a4ad406cd83edb46e5c15dd15 (patch) | |
| tree | 26b5b7697a984e21b9fb7aae896fee6e484405ca /src/libstd/rt | |
| parent | c34ef5d7e4f44f8e65600a2c3866f5861c401ea1 (diff) | |
| parent | 3f11f8738201dcf230a1647e30c312c980513b37 (diff) | |
| download | rust-48918fab7226593a4ad406cd83edb46e5c15dd15.tar.gz rust-48918fab7226593a4ad406cd83edb46e5c15dd15.zip | |
auto merge of #11212 : alexcrichton/rust/local-task-count, r=brson
For libgreen, bookeeping should not be global but rather on a per-pool basis. Inside libnative, it's known that there must be a global counter with a mutex/cvar. The benefit of taking this strategy is to remove this functionality from libstd to allow fine-grained control of it through libnative/libgreen. Notably, helper threads in libnative can manually decrement the global count so they don't count towards the global count of threads. Also, the shutdown process of *all* sched pools is now dependent on the number of tasks in the pool being 0 rather than this only being a hardcoded solution for the initial sched pool in libgreen. This involved adding a Local::try_take() method on the Local trait in order for the channel wakeup to work inside of libgreen. The channel send was happening from a SchedTask when there is no Task available in TLS, and now this is possible to work (remote wakeups are always possible, just a little slower).
Diffstat (limited to 'src/libstd/rt')
| -rw-r--r-- | src/libstd/rt/local.rs | 23 | ||||
| -rw-r--r-- | src/libstd/rt/local_ptr.rs | 40 | ||||
| -rw-r--r-- | src/libstd/rt/task.rs | 33 |
3 files changed, 59 insertions, 37 deletions
diff --git a/src/libstd/rt/local.rs b/src/libstd/rt/local.rs index 1c04b6b43ce..b4a6f06c2a4 100644 --- a/src/libstd/rt/local.rs +++ b/src/libstd/rt/local.rs @@ -16,6 +16,7 @@ use rt::local_ptr; pub trait Local<Borrowed> { fn put(value: ~Self); fn take() -> ~Self; + fn try_take() -> Option<~Self>; fn exists(unused_value: Option<Self>) -> bool; fn borrow(unused_value: Option<Self>) -> Borrowed; unsafe fn unsafe_take() -> ~Self; @@ -28,6 +29,8 @@ impl Local<local_ptr::Borrowed<Task>> for Task { fn put(value: ~Task) { unsafe { local_ptr::put(value) } } #[inline] fn take() -> ~Task { unsafe { local_ptr::take() } } + #[inline] + fn try_take() -> Option<~Task> { unsafe { local_ptr::try_take() } } fn exists(_: Option<Task>) -> bool { local_ptr::exists() } #[inline] fn borrow(_: Option<Task>) -> local_ptr::Borrowed<Task> { @@ -47,7 +50,7 @@ impl Local<local_ptr::Borrowed<Task>> for Task { #[cfg(test)] mod test { - use option::None; + use option::{None, Option}; use unstable::run_in_bare_thread; use super::*; use rt::task::Task; @@ -56,7 +59,6 @@ mod test { #[test] fn thread_local_task_smoke_test() { do run_in_bare_thread { - local_ptr::init(); let task = ~Task::new(); Local::put(task); let task: ~Task = Local::take(); @@ -67,7 +69,6 @@ mod test { #[test] fn thread_local_task_two_instances() { do run_in_bare_thread { - local_ptr::init(); let task = ~Task::new(); Local::put(task); let task: ~Task = Local::take(); @@ -83,7 +84,6 @@ mod test { #[test] fn borrow_smoke_test() { do run_in_bare_thread { - local_ptr::init(); let task = ~Task::new(); Local::put(task); @@ -98,7 +98,6 @@ mod test { #[test] fn borrow_with_return() { do run_in_bare_thread { - local_ptr::init(); let task = ~Task::new(); Local::put(task); @@ -111,6 +110,20 @@ mod test { } } + #[test] + fn try_take() { + do run_in_bare_thread { + let task = ~Task::new(); + Local::put(task); + + let t: ~Task = Local::try_take().unwrap(); + let u: Option<~Task> = Local::try_take(); + assert!(u.is_none()); + + cleanup_task(t); + } + } + fn cleanup_task(mut t: ~Task) { t.destroyed = true; } diff --git a/src/libstd/rt/local_ptr.rs b/src/libstd/rt/local_ptr.rs index f13691a7bfe..546a6476b57 100644 --- a/src/libstd/rt/local_ptr.rs +++ b/src/libstd/rt/local_ptr.rs @@ -117,6 +117,24 @@ pub mod compiled { ptr } + /// Optionally take ownership of a pointer from thread-local storage. + /// + /// # Safety note + /// + /// Does not validate the pointer type. + #[inline] + pub unsafe fn try_take<T>() -> Option<~T> { + let ptr = RT_TLS_PTR; + if ptr.is_null() { + None + } else { + let ptr: ~T = cast::transmute(ptr); + // can't use `as`, due to type not matching with `cfg(test)` + RT_TLS_PTR = cast::transmute(0); + Some(ptr) + } + } + /// Take ownership of a pointer from thread-local storage. /// /// # Safety note @@ -205,6 +223,28 @@ pub mod native { return ptr; } + /// Optionally take ownership of a pointer from thread-local storage. + /// + /// # Safety note + /// + /// Does not validate the pointer type. + #[inline] + pub unsafe fn try_take<T>() -> Option<~T> { + match maybe_tls_key() { + Some(key) => { + let void_ptr: *mut c_void = tls::get(key); + if void_ptr.is_null() { + None + } else { + let ptr: ~T = cast::transmute(void_ptr); + tls::set(key, ptr::mut_null()); + Some(ptr) + } + } + None => None + } + } + /// Take ownership of a pointer from thread-local storage. /// /// # Safety note diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index e6ab159a769..583a1e0657c 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -34,21 +34,13 @@ use rt::rtio::LocalIo; use rt::unwind::Unwinder; use send_str::SendStr; use sync::arc::UnsafeArc; -use sync::atomics::{AtomicUint, SeqCst, INIT_ATOMIC_UINT}; +use sync::atomics::{AtomicUint, SeqCst}; use task::{TaskResult, TaskOpts}; use unstable::finally::Finally; -use unstable::mutex::{Mutex, MUTEX_INIT}; #[cfg(stage0)] pub use rt::unwind::begin_unwind; -// These two statics are used as bookeeping to keep track of the rust runtime's -// count of threads. In 1:1 contexts, this is used to know when to return from -// the main function, and in M:N contexts this is used to know when to shut down -// the pool of schedulers. -static mut TASK_COUNT: AtomicUint = INIT_ATOMIC_UINT; -static mut TASK_LOCK: Mutex = MUTEX_INIT; - // The Task struct represents all state associated with a rust // task. There are at this point two primary "subtypes" of task, // however instead of using a subtype we just have a "task_type" field @@ -127,7 +119,6 @@ impl Task { *cast::transmute::<&~Task, &*mut Task>(&self) }; Local::put(self); - unsafe { TASK_COUNT.fetch_add(1, SeqCst); } // The only try/catch block in the world. Attempt to run the task's // client-specified code and catch any failures. @@ -194,13 +185,6 @@ impl Task { unsafe { let me: *mut Task = Local::unsafe_borrow(); (*me).death.collect_failure((*me).unwinder.result()); - - // see comments on these statics for why they're used - if TASK_COUNT.fetch_sub(1, SeqCst) == 1 { - TASK_LOCK.lock(); - TASK_LOCK.signal(); - TASK_LOCK.unlock(); - } } let mut me: ~Task = Local::take(); me.destroyed = true; @@ -293,21 +277,6 @@ impl Task { pub fn local_io<'a>(&'a mut self) -> Option<LocalIo<'a>> { self.imp.get_mut_ref().local_io() } - - /// The main function of all rust executables will by default use this - /// function. This function will *block* the OS thread (hence the `unsafe`) - /// waiting for all known tasks to complete. Once this function has - /// returned, it is guaranteed that no more user-defined code is still - /// running. - pub unsafe fn wait_for_other_tasks(&mut self) { - TASK_COUNT.fetch_sub(1, SeqCst); // don't count ourselves - TASK_LOCK.lock(); - while TASK_COUNT.load(SeqCst) > 0 { - TASK_LOCK.wait(); - } - TASK_LOCK.unlock(); - TASK_COUNT.fetch_add(1, SeqCst); // add ourselves back in - } } impl Drop for Task { |
