diff options
Diffstat (limited to 'src/libstd/rt/task.rs')
| -rw-r--r-- | src/libstd/rt/task.rs | 561 |
1 files changed, 561 insertions, 0 deletions
diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs new file mode 100644 index 00000000000..babd111b3c2 --- /dev/null +++ b/src/libstd/rt/task.rs @@ -0,0 +1,561 @@ +// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Language-level runtime services that should reasonably expected +//! to be available 'everywhere'. Unwinding, local storage, and logging. +//! Even a 'freestanding' Rust would likely want to implement this. + +pub use self::BlockedTask::*; +use self::TaskState::*; + +use any::Any; +use boxed::Box; +use sync::Arc; +use sync::atomic::{AtomicUint, SeqCst}; +use iter::{IteratorExt, Take}; +use kinds::marker; +use mem; +use ops::FnMut; +use core::prelude::{Clone, Drop, Err, Iterator, None, Ok, Option, Send, Some}; +use core::prelude::{drop}; +use str::SendStr; +use thunk::Thunk; + +use rt; +use rt::bookkeeping; +use rt::mutex::NativeMutex; +use rt::local::Local; +use rt::thread::{mod, Thread}; +use sys_common::stack; +use rt::unwind; +use rt::unwind::Unwinder; + +/// State associated with Rust tasks. +/// +/// This structure is currently undergoing major changes, and is +/// likely to be move/be merged with a `Thread` structure. +pub struct Task { + pub unwinder: Unwinder, + pub death: Death, + pub name: Option<SendStr>, + + state: TaskState, + lock: NativeMutex, // native synchronization + awoken: bool, // used to prevent spurious wakeups + + // This field holds the known bounds of the stack in (lo, hi) form. Not all + // native tasks necessarily know their precise bounds, hence this is + // optional. + stack_bounds: (uint, uint), + + stack_guard: uint +} + +// Once a task has entered the `Armed` state it must be destroyed via `drop`, +// and no other method. This state is used to track this transition. +#[deriving(PartialEq)] +enum TaskState { + New, + Armed, + Destroyed, +} + +pub struct TaskOpts { + /// Invoke this procedure with the result of the task when it finishes. + pub on_exit: Option<Thunk<Result>>, + /// A name for the task-to-be, for identification in panic messages + pub name: Option<SendStr>, + /// The size of the stack for the spawned task + pub stack_size: Option<uint>, +} + +/// Indicates the manner in which a task exited. +/// +/// A task that completes without panicking is considered to exit successfully. +/// +/// If you wish for this result's delivery to block until all +/// children tasks complete, recommend using a result future. +pub type Result = ::core::result::Result<(), Box<Any + Send>>; + +/// A handle to a blocked task. Usually this means having the Box<Task> +/// pointer by ownership, but if the task is killable, a killer can steal it +/// at any time. +pub enum BlockedTask { + Owned(Box<Task>), + Shared(Arc<AtomicUint>), +} + +/// Per-task state related to task death, killing, panic, etc. +pub struct Death { + pub on_exit: Option<Thunk<Result>>, +} + +pub struct BlockedTasks { + inner: Arc<AtomicUint>, +} + +impl Task { + /// Creates a new uninitialized task. + pub fn new(stack_bounds: Option<(uint, uint)>, stack_guard: Option<uint>) -> Task { + Task { + unwinder: Unwinder::new(), + death: Death::new(), + state: New, + name: None, + lock: unsafe { NativeMutex::new() }, + awoken: false, + // these *should* get overwritten + stack_bounds: stack_bounds.unwrap_or((0, 0)), + stack_guard: stack_guard.unwrap_or(0) + } + } + + pub fn spawn<F>(opts: TaskOpts, f: F) + where F : FnOnce(), F : Send + { + Task::spawn_thunk(opts, Thunk::new(f)) + } + + fn spawn_thunk(opts: TaskOpts, f: Thunk) { + let TaskOpts { name, stack_size, on_exit } = opts; + + let mut task = box Task::new(None, None); + task.name = name; + task.death.on_exit = on_exit; + + let stack = stack_size.unwrap_or(rt::min_stack()); + + // Note that this increment must happen *before* the spawn in order to + // guarantee that if this task exits it will always end up waiting for + // the spawned task to exit. + let token = bookkeeping::increment(); + + // Spawning a new OS thread guarantees that __morestack will never get + // triggered, but we must manually set up the actual stack bounds once + // this function starts executing. This raises the lower limit by a bit + // because by the time that this function is executing we've already + // consumed at least a little bit of stack (we don't know the exact byte + // address at which our stack started). + Thread::spawn_stack(stack, move|| { + let something_around_the_top_of_the_stack = 1; + let addr = &something_around_the_top_of_the_stack as *const int; + let my_stack = addr as uint; + unsafe { + stack::record_os_managed_stack_bounds(my_stack - stack + 1024, + my_stack); + } + task.stack_guard = thread::current_guard_page(); + task.stack_bounds = (my_stack - stack + 1024, my_stack); + + let mut f = Some(f); + drop(task.run(|| { f.take().unwrap().invoke(()) }).destroy()); + drop(token); + }) + } + + /// Consumes ownership of a task, runs some code, and returns the task back. + /// + /// This function can be used as an emulated "try/catch" to interoperate + /// with the rust runtime at the outermost boundary. It is not possible to + /// use this function in a nested fashion (a try/catch inside of another + /// try/catch). Invoking this function is quite cheap. + /// + /// If the closure `f` succeeds, then the returned task can be used again + /// for another invocation of `run`. If the closure `f` panics then `self` + /// will be internally destroyed along with all of the other associated + /// resources of this task. The `on_exit` callback is invoked with the + /// cause of panic (not returned here). This can be discovered by querying + /// `is_destroyed()`. + /// + /// Note that it is possible to view partial execution of the closure `f` + /// because it is not guaranteed to run to completion, but this function is + /// guaranteed to return if it panicks. Care should be taken to ensure that + /// stack references made by `f` are handled appropriately. + /// + /// It is invalid to call this function with a task that has been previously + /// destroyed via a failed call to `run`. + pub fn run(mut self: Box<Task>, f: ||) -> Box<Task> { + assert!(!self.is_destroyed(), "cannot re-use a destroyed task"); + + // First, make sure that no one else is in TLS. This does not allow + // recursive invocations of run(). If there's no one else, then + // relinquish ownership of ourselves back into TLS. + if Local::exists(None::<Task>) { + panic!("cannot run a task recursively inside another"); + } + self.state = Armed; + Local::put(self); + + // There are two primary reasons that general try/catch is unsafe. The + // first is that we do not support nested try/catch. The above check for + // an existing task in TLS is sufficient for this invariant to be + // upheld. The second is that unwinding while unwinding is not defined. + // We take care of that by having an 'unwinding' flag in the task + // itself. For these reasons, this unsafety should be ok. + let result = unsafe { unwind::try(f) }; + + // After running the closure given return the task back out if it ran + // successfully, or clean up the task if it panicked. + let task: Box<Task> = Local::take(); + match result { + Ok(()) => task, + Err(cause) => { task.cleanup(Err(cause)) } + } + } + + /// Destroy all associated resources of this task. + /// + /// This function will perform any necessary clean up to prepare the task + /// for destruction. It is required that this is called before a `Task` + /// falls out of scope. + /// + /// The returned task cannot be used for running any more code, but it may + /// be used to extract the runtime as necessary. + pub fn destroy(self: Box<Task>) -> Box<Task> { + if self.is_destroyed() { + self + } else { + self.cleanup(Ok(())) + } + } + + /// Cleans up a task, processing the result of the task as appropriate. + /// + /// This function consumes ownership of the task, deallocating it once it's + /// done being processed. It is assumed that TLD and the local heap have + /// already been destroyed and/or annihilated. + fn cleanup(mut self: Box<Task>, result: Result) -> Box<Task> { + // After taking care of the data above, we need to transmit the result + // of this task. + let what_to_do = self.death.on_exit.take(); + Local::put(self); + + // FIXME: this is running in a seriously constrained context. If this + // allocates TLD then it will likely abort the runtime. Similarly, + // if this panics, this will also likely abort the runtime. + // + // This closure is currently limited to a channel send via the + // standard library's task interface, but this needs + // reconsideration to whether it's a reasonable thing to let a + // task to do or not. + match what_to_do { + Some(f) => { f.invoke(result) } + None => { drop(result) } + } + + // Now that we're done, we remove the task from TLS and flag it for + // destruction. + let mut task: Box<Task> = Local::take(); + task.state = Destroyed; + return task; + } + + /// Queries whether this can be destroyed or not. + pub fn is_destroyed(&self) -> bool { self.state == Destroyed } + + /// Deschedules the current task, invoking `f` `amt` times. It is not + /// recommended to use this function directly, but rather communication + /// primitives in `std::comm` should be used. + // + // This function gets a little interesting. There are a few safety and + // ownership violations going on here, but this is all done in the name of + // shared state. Additionally, all of the violations are protected with a + // mutex, so in theory there are no races. + // + // The first thing we need to do is to get a pointer to the task's internal + // mutex. This address will not be changing (because the task is allocated + // on the heap). We must have this handle separately because the task will + // have its ownership transferred to the given closure. We're guaranteed, + // however, that this memory will remain valid because *this* is the current + // task's execution thread. + // + // The next weird part is where ownership of the task actually goes. We + // relinquish it to the `f` blocking function, but upon returning this + // function needs to replace the task back in TLS. There is no communication + // from the wakeup thread back to this thread about the task pointer, and + // there's really no need to. In order to get around this, we cast the task + // to a `uint` which is then used at the end of this function to cast back + // to a `Box<Task>` object. Naturally, this looks like it violates + // ownership semantics in that there may be two `Box<Task>` objects. + // + // The fun part is that the wakeup half of this implementation knows to + // "forget" the task on the other end. This means that the awakening half of + // things silently relinquishes ownership back to this thread, but not in a + // way that the compiler can understand. The task's memory is always valid + // for both tasks because these operations are all done inside of a mutex. + // + // You'll also find that if blocking fails (the `f` function hands the + // BlockedTask back to us), we will `mem::forget` the handles. The + // reasoning for this is the same logic as above in that the task silently + // transfers ownership via the `uint`, not through normal compiler + // semantics. + // + // On a mildly unrelated note, it should also be pointed out that OS + // condition variables are susceptible to spurious wakeups, which we need to + // be ready for. In order to accommodate for this fact, we have an extra + // `awoken` field which indicates whether we were actually woken up via some + // invocation of `reawaken`. This flag is only ever accessed inside the + // lock, so there's no need to make it atomic. + pub fn deschedule<F>(mut self: Box<Task>, times: uint, mut f: F) where + F: FnMut(BlockedTask) -> ::core::result::Result<(), BlockedTask>, + { + unsafe { + let me = &mut *self as *mut Task; + let task = BlockedTask::block(self); + + if times == 1 { + let guard = (*me).lock.lock(); + (*me).awoken = false; + match f(task) { + Ok(()) => { + while !(*me).awoken { + guard.wait(); + } + } + Err(task) => { mem::forget(task.wake()); } + } + } else { + let iter = task.make_selectable(times); + let guard = (*me).lock.lock(); + (*me).awoken = false; + + // Apply the given closure to all of the "selectable tasks", + // bailing on the first one that produces an error. Note that + // care must be taken such that when an error is occurred, we + // may not own the task, so we may still have to wait for the + // task to become available. In other words, if task.wake() + // returns `None`, then someone else has ownership and we must + // wait for their signal. + match iter.map(f).filter_map(|a| a.err()).next() { + None => {} + Some(task) => { + match task.wake() { + Some(task) => { + mem::forget(task); + (*me).awoken = true; + } + None => {} + } + } + } + while !(*me).awoken { + guard.wait(); + } + } + // put the task back in TLS, and everything is as it once was. + Local::put(mem::transmute(me)); + } + } + + /// Wakes up a previously blocked task. This function can only be + /// called on tasks that were previously blocked in `deschedule`. + // + // See the comments on `deschedule` for why the task is forgotten here, and + // why it's valid to do so. + pub fn reawaken(mut self: Box<Task>) { + unsafe { + let me = &mut *self as *mut Task; + mem::forget(self); + let guard = (*me).lock.lock(); + (*me).awoken = true; + guard.signal(); + } + } + + /// Yields control of this task to another task. This function will + /// eventually return, but possibly not immediately. This is used as an + /// opportunity to allow other tasks a chance to run. + pub fn yield_now() { + Thread::yield_now(); + } + + /// Returns the stack bounds for this task in (lo, hi) format. The stack + /// bounds may not be known for all tasks, so the return value may be + /// `None`. + pub fn stack_bounds(&self) -> (uint, uint) { + self.stack_bounds + } + + /// Returns the stack guard for this task, if known. + pub fn stack_guard(&self) -> Option<uint> { + if self.stack_guard != 0 { + Some(self.stack_guard) + } else { + None + } + } + + /// Consume this task, flagging it as a candidate for destruction. + /// + /// This function is required to be invoked to destroy a task. A task + /// destroyed through a normal drop will abort. + pub fn drop(mut self) { + self.state = Destroyed; + } +} + +impl Drop for Task { + fn drop(&mut self) { + rtdebug!("called drop for a task: {}", self as *mut Task as uint); + rtassert!(self.state != Armed); + } +} + +impl TaskOpts { + pub fn new() -> TaskOpts { + TaskOpts { on_exit: None, name: None, stack_size: None } + } +} + +impl Iterator<BlockedTask> for BlockedTasks { + fn next(&mut self) -> Option<BlockedTask> { + Some(Shared(self.inner.clone())) + } +} + +impl BlockedTask { + /// Returns Some if the task was successfully woken; None if already killed. + pub fn wake(self) -> Option<Box<Task>> { + match self { + Owned(task) => Some(task), + Shared(arc) => { + match arc.swap(0, SeqCst) { + 0 => None, + n => Some(unsafe { mem::transmute(n) }), + } + } + } + } + + /// Reawakens this task if ownership is acquired. If finer-grained control + /// is desired, use `wake` instead. + pub fn reawaken(self) { + self.wake().map(|t| t.reawaken()); + } + + // This assertion has two flavours because the wake involves an atomic op. + // In the faster version, destructors will panic dramatically instead. + #[cfg(not(test))] pub fn trash(self) { } + #[cfg(test)] pub fn trash(self) { assert!(self.wake().is_none()); } + + /// Create a blocked task, unless the task was already killed. + pub fn block(task: Box<Task>) -> BlockedTask { + Owned(task) + } + + /// Converts one blocked task handle to a list of many handles to the same. + pub fn make_selectable(self, num_handles: uint) -> Take<BlockedTasks> { + let arc = match self { + Owned(task) => { + let flag = unsafe { AtomicUint::new(mem::transmute(task)) }; + Arc::new(flag) + } + Shared(arc) => arc.clone(), + }; + BlockedTasks{ inner: arc }.take(num_handles) + } + + /// Convert to an unsafe uint value. Useful for storing in a pipe's state + /// flag. + #[inline] + pub unsafe fn cast_to_uint(self) -> uint { + match self { + Owned(task) => { + let blocked_task_ptr: uint = mem::transmute(task); + rtassert!(blocked_task_ptr & 0x1 == 0); + blocked_task_ptr + } + Shared(arc) => { + let blocked_task_ptr: uint = mem::transmute(box arc); + rtassert!(blocked_task_ptr & 0x1 == 0); + blocked_task_ptr | 0x1 + } + } + } + + /// Convert from an unsafe uint value. Useful for retrieving a pipe's state + /// flag. + #[inline] + pub unsafe fn cast_from_uint(blocked_task_ptr: uint) -> BlockedTask { + if blocked_task_ptr & 0x1 == 0 { + Owned(mem::transmute(blocked_task_ptr)) + } else { + let ptr: Box<Arc<AtomicUint>> = + mem::transmute(blocked_task_ptr & !1); + Shared(*ptr) + } + } +} + +impl Death { + pub fn new() -> Death { + Death { on_exit: None } + } +} + +#[cfg(test)] +mod test { + use super::*; + use prelude::*; + use task; + use rt::unwind; + + #[test] + fn unwind() { + let result = task::try(move|| ()); + rtdebug!("trying first assert"); + assert!(result.is_ok()); + let result = task::try(move|| -> () panic!()); + rtdebug!("trying second assert"); + assert!(result.is_err()); + } + + #[test] + fn rng() { + use rand::{StdRng, Rng}; + let mut r = StdRng::new().ok().unwrap(); + let _ = r.next_u32(); + } + + #[test] + fn comm_stream() { + let (tx, rx) = channel(); + tx.send(10i); + assert!(rx.recv() == 10); + } + + #[test] + fn comm_shared_chan() { + let (tx, rx) = channel(); + tx.send(10i); + assert!(rx.recv() == 10); + } + + #[test] + #[should_fail] + fn test_begin_unwind() { + use rt::unwind::begin_unwind; + begin_unwind("cause", &(file!(), line!())) + } + + #[test] + fn drop_new_task_ok() { + drop(Task::new(None, None)); + } + + // Task blocking tests + + #[test] + fn block_and_wake() { + let task = box Task::new(None, None); + let task = BlockedTask::block(task).wake().unwrap(); + task.drop(); + } +} |
