about summary refs log tree commit diff
path: root/src/libcore/task.rs
diff options
context:
space:
mode:
authorBrian Anderson <banderson@mozilla.com>2012-02-18 16:34:42 -0800
committerBrian Anderson <banderson@mozilla.com>2012-02-20 18:58:04 -0800
commit4220dcf1e9de2c2d2c329ecefa80108b63a69145 (patch)
treebd70397f064225f8d38b692f5f8120ed9e732d9e /src/libcore/task.rs
parentfbc95ba0184a417ff6d8b2b417f210c960e142cc (diff)
downloadrust-4220dcf1e9de2c2d2c329ecefa80108b63a69145.tar.gz
rust-4220dcf1e9de2c2d2c329ecefa80108b63a69145.zip
core: New task API
Diffstat (limited to 'src/libcore/task.rs')
-rw-r--r--src/libcore/task.rs1183
1 files changed, 767 insertions, 416 deletions
diff --git a/src/libcore/task.rs b/src/libcore/task.rs
index ad1f4a7fc7a..79316d5310d 100644
--- a/src/libcore/task.rs
+++ b/src/libcore/task.rs
@@ -1,5 +1,4 @@
-/*
-Module: task
+#[doc = "
 
 Task management.
 
@@ -13,564 +12,916 @@ true: when a parent task fails its children will continue executing. When
 the root (main) task fails, all tasks fail, and then so does the entire
 process.
 
-A task may remove itself from this failure propagation mechanism by
-calling the <unsupervise> function, after which failure will only
-result in the termination of that task.
-
 Tasks may execute in parallel and are scheduled automatically by the runtime.
 
 Example:
 
-> spawn {||
->   log(debug, "Hello, World!");
-> };
+    spawn {||
+        log(error, \"Hello, World!\");
+    }
 
-*/
-import cast = unsafe::reinterpret_cast;
-import comm;
-import ptr;
-import c = ctypes;
+"];
 
 export task;
-export joinable_task;
-export yield;
-export task_notification;
-export join;
-export unsupervise;
 export task_result;
-export tr_success;
-export tr_failure;
-export get_task;
+export notification;
+export sched_mode;
+export sched_opts;
+export task_opts;
+export task_builder::{};
+
+export default_task_opts;
+export mk_task_builder;
+export get_opts;
+export set_opts;
+export add_wrapper;
+export run;
+
+export future_result;
+export future_task;
+export unsupervise;
+export run_listener;
+
 export spawn;
-export spawn_joinable;
-export spawn_connected;
+export spawn_listener;
 export spawn_sched;
-export connected_fn;
-export connected_task;
-export currently_unwinding;
 export try;
 
-#[abi = "rust-intrinsic"]
-native mod rusti {
-    // these must run on the Rust stack so that they can swap stacks etc:
-    fn task_yield(task: *rust_task, &killed: bool);
+export yield;
+export failing;
+export get_task;
+
+
+/* Data types */
+
+#[doc = "A handle to a task"]
+enum task = task_id;
+
+#[doc = "
+
+Indicates the manner in which a task exited.
+
+A task that completes without failing and whose supervised children complete
+without failing is considered to exit successfully.
+
+FIXME: This description does not indicate the current behavior for linked
+failure.
+
+"]
+enum task_result {
+    success,
+    failure,
 }
 
-type rust_closure = {
-    fnptr: c::intptr_t, envptr: c::intptr_t
+#[doc = "
+
+A message type for notifying of task lifecycle events
+
+"]
+enum notification {
+    #[doc = "Sent when a task exits with the task handle and result"]
+    exit(task, task_result)
+}
+
+#[doc = "Scheduler modes"]
+enum sched_mode {
+    #[doc = "All tasks run in the same OS thread"]
+    single_threaded,
+    #[doc = "Tasks are distributed among available CPUs"]
+    thread_per_core,
+    #[doc = "Each task runs in its own OS thread"]
+    thread_per_task,
+    #[doc = "Tasks are distributed among a fixed number of OS threads"]
+    manual_threads(uint),
+}
+
+#[doc = "
+
+Scheduler configuration options
+
+Fields:
+
+* sched_mode - The operating mode of the scheduler
+
+* native_stack_size - The size of the native stack, in bytes
+
+    Rust code runs on Rust-specific stacks. When Rust code calls native code
+    (via functions in native modules) it switches to a typical, large stack
+    appropriate for running code written in languages like C. By default these
+    native stacks have unspecified size, but with this option their size can
+    be precisely specified.
+
+"]
+type sched_opts = {
+    mode: sched_mode,
+    native_stack_size: option<uint>,
 };
 
-#[link_name = "rustrt"]
-#[abi = "cdecl"]
-native mod rustrt {
-    fn rust_get_sched_id() -> sched_id;
-    fn rust_new_sched(num_threads: c::uintptr_t) -> sched_id;
+#[doc = "
 
-    fn get_task_id() -> task_id;
-    fn rust_get_task() -> *rust_task;
+Task configuration options
 
-    fn new_task() -> task_id;
-    fn rust_new_task_in_sched(id: sched_id) -> task_id;
+Fields:
 
-    fn rust_task_config_notify(
-        id: task_id, &&chan: comm::chan<task_notification>);
+* supervise - Do not propagate failure to the parent task
 
-    fn start_task(id: task, closure: *rust_closure);
+    All tasks are linked together via a tree, from parents to children. By
+    default children are 'supervised' by their parent and when they fail
+    so too will their parents. Settings this flag to false disables that
+    behavior.
 
-    fn rust_task_is_unwinding(rt: *rust_task) -> bool;
-    fn unsupervise();
-}
+* notify_chan - Enable lifecycle notifications on the given channel
 
-/* Section: Types */
+* sched - Specify the configuration of a new scheduler to create the task in
 
-type rust_task = *ctypes::void;
+    By default, every task is created in the same scheduler as its
+    parent, where it is scheduled cooperatively with all other tasks
+    in that scheduler. Some specialized applications may want more
+    control over their scheduling, in which case they can be spawned
+    into a new scheduler with the specific properties required.
 
-type sched_id = int;
-type task_id = int;
+    This is of particular importance for libraries which want to call
+    into native code that blocks. Without doing so in a different
+    scheduler other tasks will be impeded or even blocked indefinitely.
 
-/*
-Type: task
+"]
+type task_opts = {
+    supervise: bool,
+    notify_chan: option<comm::chan<notification>>,
+    sched: option<sched_opts>,
+};
+
+#[doc = "
 
-A handle to a task
-*/
-type task = task_id;
+The task builder type.
 
-/*
-Function: spawn
+Provides detailed control over the properties and behavior of new tasks.
 
-Creates and executes a new child task
+"]
+// NB: Builders are designed to be single-use because they do stateful
+// things that get weird when reusing - e.g. if you create a result future
+// it only applies to a single task, so then you have to maintain some
+// potentially tricky state to ensure that everything behaves correctly
+// when you try to reuse the builder to spawn a new task. We'll just
+// sidestep that whole issue by making builder's uncopyable and making
+// the run function move them in.
+enum task_builder = {
+    mutable opts: task_opts,
+    mutable gen_body: fn@(+fn~()) -> fn~(),
+    can_not_copy: option<comm::port<()>>
+};
 
-Sets up a new task with its own call stack and schedules it to be
-executed.  Upon execution, the closure `f()` will be invoked.
 
-Parameters:
+/* Task construction */
 
-f - A function to execute in the new task
+fn default_task_opts() -> task_opts {
+    #[doc = "
 
-Returns:
+    The default task options
 
-A handle to the new task
-*/
-fn spawn(+f: fn~()) -> task {
-    spawn_inner(f, none, new_task_in_this_sched)
-}
+    By default all tasks are supervised by their parent, are spawned
+    into the same scheduler, and do not post lifecycle notifications.
 
-fn spawn_inner(
-    -f: fn~(),
-    notify: option<comm::chan<task_notification>>,
-    new_task: fn() -> task_id
-) -> task unsafe {
-    let closure: *rust_closure = unsafe::reinterpret_cast(ptr::addr_of(f));
-    #debug("spawn: closure={%x,%x}", (*closure).fnptr, (*closure).envptr);
-    let id = new_task();
+    "];
 
-    // set up notifications if they are enabled.
-    option::may(notify) {|c|
-        rustrt::rust_task_config_notify(id, c);
+    {
+        supervise: true,
+        notify_chan: none,
+        sched: none
     }
+}
 
-    rustrt::start_task(id, closure);
-    unsafe::leak(f);
-    ret id;
+fn mk_task_builder() -> task_builder {
+    #[doc = "Construct a task_builder"];
+
+    let body_identity = fn@(+body: fn~()) -> fn~() { body };
+
+    task_builder({
+        mutable opts: default_task_opts(),
+        mutable gen_body: body_identity,
+        can_not_copy: none
+    })
+}
+
+fn get_opts(builder: task_builder) -> task_opts {
+    #[doc = "Get the task_opts associated with a task_builder"];
+
+    builder.opts
+}
+
+fn set_opts(builder: task_builder, opts: task_opts) {
+    #[doc = "
+
+    Set the task_opts associated with a task_builder
+
+    To update a single option use a pattern like the following:
+
+        set_opts(builder, {
+            supervise: false
+            with get_opts(builder)
+        });
+
+    "];
+
+    builder.opts = opts;
 }
 
-fn new_task_in_this_sched() -> task_id {
-    rustrt::new_task()
+fn add_wrapper(builder: task_builder, gen_body: fn@(+fn~()) -> fn~()) {
+    #[doc = "
+
+    Add a wrapper to the body of the spawned task.
+
+    Before the task is spawned it is passed through a 'body generator'
+    function that may perform local setup operations as well as wrap
+    the task body in remote setup operations. With this the behavior
+    of tasks can be extended in simple ways.
+
+    This function augments the current body generator with a new body
+    generator by applying the task body which results from the
+    existing body generator to the new body generator.
+
+    "];
+
+    let prev_gen_body = builder.gen_body;
+    builder.gen_body = fn@(+body: fn~()) -> fn~() {
+        gen_body(prev_gen_body(body))
+    };
 }
 
-fn new_task_in_new_sched(num_threads: uint) -> task_id {
-    let sched_id = rustrt::rust_new_sched(num_threads);
-    rustrt::rust_new_task_in_sched(sched_id)
+fn run(-builder: task_builder, +f: fn~()) {
+    #[doc(desc = "
+
+    Creates and exucutes a new child task
+
+    Sets up a new task with its own call stack and schedules it to run
+    the provided unique closure. The task has the properties and behavior
+    specified by `builder`.
+
+    ", failure = "
+
+    When spawning into a new scheduler, the number of threads requested
+    must be greater than zero.
+
+    ")];
+
+    let body = builder.gen_body(f);
+    spawn_raw(builder.opts, body);
 }
 
-/*
-Function: spawn_sched
 
-Creates a new scheduler and executes a task on it. Tasks subsequently
-spawned by that task will also execute on the new scheduler. When
-there are no more tasks to execute the scheduler terminates.
+/* Builder convenience functions */
+
+fn future_result(builder: task_builder) -> future::future<task_result> {
+    #[doc = "
+
+    Get a future representing the exit status of the task.
+
+    Taking the value of the future will block until the child task terminates.
 
-Arguments:
+    Note that the future returning by this function is only useful for
+    obtaining the value of the next task to be spawning with the
+    builder. If additional tasks are spawned with the same builder
+    then a new result future must be obtained prior to spawning each
+    task.
 
-num_threads - The number of OS threads to dedicate schedule tasks on
-f - A unique closure to execute as a task on the new scheduler
+    "];
 
-Failure:
+    // FIXME (1087, 1857): Once linked failure and notification are
+    // handled in the library, I can imagine implementing this by just
+    // registering an arbitrary number of task::on_exit handlers and
+    // sending out messages.
 
-The number of threads must be greater than 0
+    let po = comm::port();
+    let ch = comm::chan(po);
 
-*/
-fn spawn_sched(num_threads: uint, +f: fn~()) -> task {
-    if num_threads < 1u {
-        fail "Can not create a scheduler with no threads";
+    set_opts(builder, {
+        notify_chan: some(ch)
+        with get_opts(builder)
+    });
+
+    future::from_fn {||
+        alt comm::recv(po) {
+          exit(_, result) { result }
+        }
     }
-    spawn_inner(f, none, bind new_task_in_new_sched(num_threads))
-}
-
-/*
-Type: joinable_task
-
-A task that sends notification upon termination
-*/
-type joinable_task = (task, comm::port<task_notification>);
-
-fn spawn_joinable(+f: fn~()) -> joinable_task {
-    let notify_port = comm::port();
-    let notify_chan = comm::chan(notify_port);
-    let task = spawn_inner(f, some(notify_chan), new_task_in_this_sched);
-    ret (task, notify_port);
-    /*
-    resource notify_rsrc(data: (comm::chan<task_notification>,
-                                task,
-                                @mutable task_result)) {
-        let (chan, task, tr) = data;
-        let msg = exit(task, *tr);
-        comm::send(chan, msg);
+}
+
+fn future_task(builder: task_builder) -> future::future<task> {
+    #[doc = "Get a future representing the handle to the new task"];
+
+    let po = comm::port();
+    let ch = comm::chan(po);
+    add_wrapper(builder) {|body|
+        fn~[move body]() {
+            comm::send(ch, get_task());
+            body();
+        }
     }
+    future::from_port(po)
+}
 
-    let notify_port = comm::port();
-    let notify_chan = comm::chan(notify_port);
-    let g = fn~[copy notify_chan; move f]() {
-        let this_task = rustrt::get_task_id();
-        let result = @mutable tr_failure;
-        let _rsrc = notify_rsrc((notify_chan, this_task, result));
-        f();
-        *result = tr_success; // rsrc will fire msg when fn returns
-    };
-    let task = spawn(g);
-    ret (task, notify_port);
-    */
+fn unsupervise(builder: task_builder) {
+    #[doc = "Configures the new task to not propagate failure to its parent"];
+
+    set_opts(builder, {
+        supervise: false
+        with get_opts(builder)
+    });
 }
 
-/*
-Tag: task_result
+fn run_listener<A:send>(-builder: task_builder,
+                        +f: fn~(comm::port<A>)) -> comm::chan<A> {
+    #[doc = "
 
-Indicates the manner in which a task exited
-*/
-enum task_result {
-    /* Variant: tr_success */
-    tr_success,
-    /* Variant: tr_failure */
-    tr_failure,
+    Runs a new task while providing a channel from the parent to the child
+
+    Sets up a communication channel from the current task to the new
+    child task, passes the port to child's body, and returns a channel
+    linked to the port to the parent.
+
+    This encapsulates some boilerplate handshaking logic that would
+    otherwise be required to establish communication from the parent
+    to the child.
+    "];
+
+    let setup_po = comm::port();
+    let setup_ch = comm::chan(setup_po);
+
+    run(builder, fn~[move f]() {
+        let po = comm::port();
+        let ch = comm::chan(po);
+        comm::send(setup_ch, ch);
+        f(po);
+    });
+
+    comm::recv(setup_po)
 }
 
-/*
-Tag: task_notification
 
-Message sent upon task exit to indicate normal or abnormal termination
-*/
-enum task_notification {
-    /* Variant: exit */
-    exit(task, task_result),
+/* Spawn convenience functions */
+
+fn spawn(+f: fn~()) {
+    #[doc = "
+
+    Creates and exucutes a new child task
+
+    Sets up a new task with its own call stack and schedules it to run
+    the provided unique closure.
+
+    This function is equivalent to `run(mk_task_builder(), f)`.
+    "];
+
+    run(mk_task_builder(), f);
 }
 
-/*
-Type: connected_fn
+fn spawn_listener<A:send>(+f: fn~(comm::port<A>)) -> comm::chan<A> {
+    #[doc = "
 
-The prototype for a connected child task function.  Such a function will be
-supplied with a channel to send messages to the parent and a port to receive
-messages from the parent. The type parameter `ToCh` is the type for messages
-sent from the parent to the child and `FrCh` is the type for messages sent
-from the child to the parent. */
-type connected_fn<ToCh, FrCh> = fn~(comm::port<ToCh>, comm::chan<FrCh>);
+    Runs a new task while providing a channel from the parent to the child
 
-/*
-Type: connected_fn
+    Sets up a communication channel from the current task to the new
+    child task, passes the port to child's body, and returns a channel
+    linked to the port to the parent.
 
-The result type of <spawn_connected>
-*/
-type connected_task<ToCh, FrCh> = {
-    from_child: comm::port<FrCh>,
-    to_child: comm::chan<ToCh>,
-    task: task
-};
+    This encapsulates some boilerplate handshaking logic that would
+    otherwise be required to establish communication from the parent
+    to the child.
 
-/*
-Function: spawn_connected
+    The simplest way to establish bidirectional communication between
+    a parent in child is as follows:
 
-Spawns a child task along with a port/channel for exchanging messages
-with the parent task.  The type `ToCh` represents messages sent to the child
-and `FrCh` messages received from the child.
+        let po = comm::port();
+        let ch = comm::chan(po);
+        let ch = spawn_listener {|po|
+            // Now the child has a port called 'po' to read from and
+            // an environment-captured channel called 'ch'.
+        };
+        // Likewise, the parent has both a 'po' and 'ch'
+
+    This function is equivalent to `run_listener(mk_task_builder(), f)`.
+
+    "];
+
+    run_listener(mk_task_builder(), f)
+}
 
-Parameters:
+fn spawn_sched(mode: sched_mode, +f: fn~()) {
+    #[doc(desc = "
 
-f - the child function to execute
+    Creates a new scheduler and executes a task on it
 
-Returns:
+    Tasks subsequently spawned by that task will also execute on
+    the new scheduler. When there are no more tasks to execute the
+    scheduler terminates.
 
-The new child task along with the port to receive messages and the channel
-to send messages.
-*/
-fn spawn_connected<ToCh:send, FrCh:send>(+f: connected_fn<ToCh, FrCh>)
-    -> connected_task<ToCh,FrCh> {
-    let from_child_port = comm::port::<FrCh>();
-    let from_child_chan = comm::chan(from_child_port);
-    let get_to_child_port = comm::port::<comm::chan<ToCh>>();
-    let get_to_child_chan = comm::chan(get_to_child_port);
-    let child_task = spawn(fn~[move f]() {
-        let to_child_port = comm::port::<ToCh>();
-        comm::send(get_to_child_chan, comm::chan(to_child_port));
-        f(to_child_port, from_child_chan);
+    ", failure = "
+
+    In manual threads mode the number of threads requested must be
+    greater than zero.
+
+    ")];
+
+    let builder = mk_task_builder();
+    set_opts(builder, {
+        sched: some({
+            mode: mode,
+            native_stack_size: none
+        })
+        with get_opts(builder)
     });
-    let to_child_chan = comm::recv(get_to_child_port);
-    ret {from_child: from_child_port,
-         to_child: to_child_chan,
-         task: child_task};
+    run(builder, f);
 }
 
-/* Section: Operations */
+fn try<T:send>(+f: fn~() -> T) -> result::t<T,()> {
+    #[doc(desc = "
 
-/*
-Type: get_task
+    Execute a function in another task and return either the return value
+    of the function or result::err.
 
-Retreives a handle to the currently executing task
-*/
-fn get_task() -> task { rustrt::get_task_id() }
+    ", return = "
 
-/*
-Function: yield
+    If the function executed successfully then try returns result::ok
+    containing the value returned by the function. If the function fails
+    then try returns result::err containing nil.
 
-Yield control to the task scheduler
+    ")];
+
+    let po = comm::port();
+    let ch = comm::chan(po);
+    let builder = mk_task_builder();
+    unsupervise(builder);
+    let result = future_result(builder);
+    run(builder, fn~[move f]() {
+        comm::send(ch, f());
+    });
+    alt future::get(result) {
+      success { result::ok(comm::recv(po)) }
+      failure { result::err(()) }
+    }
+}
+
+
+/* Lifecycle functions */
 
-The scheduler may schedule another task to execute.
-*/
 fn yield() {
-    let task = rustrt::rust_get_task();
+    #[doc = "Yield control to the task scheduler"];
+
+    let task_ = rustrt::rust_get_task();
     let killed = false;
-    rusti::task_yield(task, killed);
-    if killed && !currently_unwinding() {
+    rusti::task_yield(task_, killed);
+    if killed && !failing() {
         fail "killed";
     }
 }
 
-/*
-Function: join
+fn failing() -> bool {
+    #[doc = "True if the running task has failed"];
 
-Wait for a child task to exit
+    rustrt::rust_task_is_unwinding(rustrt::rust_get_task())
+}
 
-The child task must have been spawned with <spawn_joinable>, which
-produces a notification port that the child uses to communicate its
-exit status.
+fn get_task() -> task {
+    #[doc = "Get a handle to the running task"];
 
-Returns:
+    task(rustrt::get_task_id())
+}
 
-A task_result indicating whether the task terminated normally or failed
-*/
-fn join(task_port: joinable_task) -> task_result {
-    let (id, port) = task_port;
-    alt comm::recv::<task_notification>(port) {
-      exit(_id, res) {
-        if _id == id {
-            ret res
-        } else {
-            fail #fmt["join received id %d, expected %d", _id, id]
+
+/* Internal */
+
+type sched_id = int;
+type task_id = int;
+
+// These are both opaque runtime/compiler types that we don't know the
+// structure of and should only deal with via unsafe pointer
+type rust_task = ctypes::void;
+type rust_closure = ctypes::void;
+
+fn spawn_raw(opts: task_opts, +f: fn~()) unsafe {
+
+    let f = if opts.supervise {
+        f
+    } else {
+        // FIXME: The runtime supervision API is weird here because it
+        // was designed to let the child unsupervise itself, when what
+        // we actually want is for parents to unsupervise new
+        // children.
+        fn~[move f]() {
+            rustrt::unsupervise();
+            f();
         }
+    };
+
+    let fptr = ptr::addr_of(f);
+    let closure: *rust_closure = unsafe::reinterpret_cast(fptr);
+
+    let task_id = alt opts.sched {
+      none {
+        rustrt::new_task()
+      }
+      some(sched_opts) {
+        new_task_in_new_sched(sched_opts)
       }
+    };
+
+    option::may(opts.notify_chan) {|c|
+        // FIXME (1087): Would like to do notification in Rust
+        rustrt::rust_task_config_notify(task_id, c);
     }
-}
 
-/*
-Function: unsupervise
+    rustrt::start_task(task_id, closure);
+    unsafe::leak(f);
 
-Detaches this task from its parent in the task tree
+    fn new_task_in_new_sched(opts: sched_opts) -> task_id {
+        if opts.native_stack_size != none {
+            fail "native_stack_size scheduler option unimplemented";
+        }
 
-An unsupervised task will not propagate its failure up the task tree
-*/
-fn unsupervise() {
-    rustrt::unsupervise();
-}
+        let num_threads = alt opts.mode {
+          single_threaded { 1u }
+          thread_per_core {
+            fail "thread_per_core scheduling mode unimplemented"
+          }
+          thread_per_task {
+            fail "thread_per_task scheduling mode unimplemented"
+          }
+          manual_threads(threads) {
+            if threads == 0u {
+                fail "can not create a scheduler with no threads";
+            }
+            threads
+          }
+        };
 
-/*
-Function: currently_unwinding()
+        let sched_id = rustrt::rust_new_sched(num_threads);
+        rustrt::rust_new_task_in_sched(sched_id)
+    }
 
-True if we are currently unwinding after a failure.
-*/
-fn currently_unwinding() -> bool {
-    rustrt::rust_task_is_unwinding(rustrt::rust_get_task())
 }
 
-/*
-Function: try
+#[abi = "rust-intrinsic"]
+native mod rusti {
+    fn task_yield(task: *rust_task, &killed: bool);
+}
 
-Execute a function in another task and return either the return value
-of the function or result::err.
+native mod rustrt {
+    fn rust_get_sched_id() -> sched_id;
+    fn rust_new_sched(num_threads: ctypes::uintptr_t) -> sched_id;
 
-Returns:
+    fn get_task_id() -> task_id;
+    fn rust_get_task() -> *rust_task;
 
-If the function executed successfully then try returns result::ok
-containing the value returned by the function. If the function fails
-then try returns result::err containing nil.
-*/
-fn try<T:send>(+f: fn~() -> T) -> result::t<T,()> {
-    let p = comm::port();
-    let ch = comm::chan(p);
-    alt join(spawn_joinable {||
-        unsupervise();
-        comm::send(ch, f());
-    }) {
-      tr_success { result::ok(comm::recv(p)) }
-      tr_failure { result::err(()) }
+    fn new_task() -> task_id;
+    fn rust_new_task_in_sched(id: sched_id) -> task_id;
+
+    fn rust_task_config_notify(
+        id: task_id, &&chan: comm::chan<notification>);
+
+    fn start_task(id: task_id, closure: *rust_closure);
+
+    fn rust_task_is_unwinding(rt: *rust_task) -> bool;
+    fn unsupervise();
+}
+
+
+#[test]
+fn test_spawn_raw_simple() {
+    let po = comm::port();
+    let ch = comm::chan(po);
+    spawn_raw(default_task_opts()) {||
+        comm::send(ch, ());
     }
+    comm::recv(po);
 }
 
-#[cfg(test)]
-mod tests {
-    // FIXME: Leaks on windows
-    #[test]
-    #[ignore(cfg(target_os = "win32"))]
-    fn test_unsupervise() {
-        fn f() { unsupervise(); fail; }
-        spawn {|| f();};
+#[test]
+#[ignore(cfg(target_os = "win32"))]
+fn test_spawn_raw_unsupervise() {
+    let opts = {
+        supervise: false
+        with default_task_opts()
+    };
+    spawn_raw(opts) {||
+        fail;
     }
+}
 
-    #[test]
-    fn test_lib_spawn() {
-        fn foo() { #error("Hello, World!"); }
-        spawn {|| foo();};
+#[test]
+#[ignore(cfg(target_os = "win32"))]
+fn test_spawn_raw_notify() {
+    let task_po = comm::port();
+    let task_ch = comm::chan(task_po);
+    let notify_po = comm::port();
+    let notify_ch = comm::chan(notify_po);
+
+    let opts = {
+        notify_chan: some(notify_ch)
+        with default_task_opts()
+    };
+    spawn_raw(opts) {||
+        comm::send(task_ch, get_task());
     }
+    let task_ = comm::recv(task_po);
+    assert comm::recv(notify_po) == exit(task_, success);
 
-    #[test]
-    fn test_lib_spawn2() {
-        fn foo(x: int) { assert (x == 42); }
-        spawn {|| foo(42);};
+    let opts = {
+        supervise: false,
+        notify_chan: some(notify_ch)
+        with default_task_opts()
+    };
+    spawn_raw(opts) {||
+        comm::send(task_ch, get_task());
+        fail;
     }
+    let task_ = comm::recv(task_po);
+    assert comm::recv(notify_po) == exit(task_, failure);
+}
 
-    #[test]
-    fn test_join_chan() {
-        fn winner() { }
+#[test]
+fn test_run_basic() {
+    let po = comm::port();
+    let ch = comm::chan(po);
+    let builder = mk_task_builder();
+    run(builder) {||
+        comm::send(ch, ());
+    }
+    comm::recv(po);
+}
 
-        let t = spawn_joinable {|| winner();};
-        alt join(t) {
-          tr_success {/* yay! */ }
-          _ { fail "invalid task status received" }
+#[test]
+fn test_add_wrapper() {
+    let po = comm::port();
+    let ch = comm::chan(po);
+    let builder = mk_task_builder();
+    add_wrapper(builder) {|body|
+        fn~() {
+            body();
+            comm::send(ch, ());
         }
     }
+    run(builder) {||}
+    comm::recv(po);
+}
 
-    // FIXME: Leaks on windows
-    #[test]
-    #[ignore(cfg(target_os = "win32"))]
-    fn test_join_chan_fail() {
-        fn failer() { unsupervise(); fail }
+#[test]
+#[ignore(cfg(target_os = "win32"))]
+fn test_future_result() {
+    let builder = mk_task_builder();
+    let result = future_result(builder);
+    run(builder) {||}
+    assert future::get(result) == success;
+
+    let builder = mk_task_builder();
+    let result = future_result(builder);
+    unsupervise(builder);
+    run(builder) {|| fail }
+    assert future::get(result) == failure;
+}
 
-        let t = spawn_joinable {|| failer();};
-        alt join(t) {
-          tr_failure {/* yay! */ }
-          _ { fail "invalid task status received" }
-        }
-    }
+#[test]
+fn test_future_task() {
+    let po = comm::port();
+    let ch = comm::chan(po);
+    let builder = mk_task_builder();
+    let task1 = future_task(builder);
+    run(builder) {|| comm::send(ch, get_task()) }
+    assert future::get(task1) == comm::recv(po);
+}
 
-    #[test]
-    fn spawn_polymorphic() {
-        fn foo<T:send>(x: T) { log(error, x); }
-        spawn {|| foo(true);};
-        spawn {|| foo(42);};
-    }
+#[test]
+fn test_spawn_listiner_bidi() {
+    let po = comm::port();
+    let ch = comm::chan(po);
+    let ch = spawn_listener {|po|
+        // Now the child has a port called 'po' to read from and
+        // an environment-captured channel called 'ch'.
+        let res = comm::recv(po);
+        assert res == "ping";
+        comm::send(ch, "pong");
+    };
+    // Likewise, the parent has both a 'po' and 'ch'
+    comm::send(ch, "ping");
+    let res = comm::recv(po);
+    assert res == "pong";
+}
 
-    #[test]
-    fn try_success() {
-        alt try {||
-            "Success!"
-        } {
-            result::ok("Success!") { }
-            _ { fail; }
-        }
+#[test]
+fn test_try_success() {
+    alt try {||
+        "Success!"
+    } {
+        result::ok("Success!") { }
+        _ { fail; }
     }
+}
 
-    #[test]
-    #[ignore(cfg(target_os = "win32"))]
-    fn try_fail() {
-        alt try {||
-            fail
-        } {
-            result::err(()) { }
-            _ { fail; }
-        }
+#[test]
+#[ignore(cfg(target_os = "win32"))]
+fn test_try_fail() {
+    alt try {||
+        fail
+    } {
+        result::err(()) { }
+        _ { fail; }
     }
+}
 
-    #[test]
-    #[should_fail]
-    #[ignore(cfg(target_os = "win32"))]
-    fn spawn_sched_no_threads() {
-        spawn_sched(0u) {|| };
-    }
+#[test]
+#[should_fail]
+#[ignore(cfg(target_os = "win32"))]
+fn test_spawn_sched_no_threads() {
+    spawn_sched(manual_threads(0u)) {|| };
+}
 
-    #[test]
-    fn spawn_sched_1() {
-        let po = comm::port();
-        let ch = comm::chan(po);
+#[test]
+fn test_spawn_sched() {
+    let po = comm::port();
+    let ch = comm::chan(po);
 
-        fn f(i: int, ch: comm::chan<()>) {
-            let parent_sched_id = rustrt::rust_get_sched_id();
+    fn f(i: int, ch: comm::chan<()>) {
+        let parent_sched_id = rustrt::rust_get_sched_id();
 
-            spawn_sched(1u) {||
-                let child_sched_id = rustrt::rust_get_sched_id();
-                assert parent_sched_id != child_sched_id;
+        spawn_sched(single_threaded) {||
+            let child_sched_id = rustrt::rust_get_sched_id();
+            assert parent_sched_id != child_sched_id;
 
-                if (i == 0) {
-                    comm::send(ch, ());
-                } else {
-                    f(i - 1, ch);
-                }
-            };
+            if (i == 0) {
+                comm::send(ch, ());
+            } else {
+                f(i - 1, ch);
+            }
+        };
 
-        }
-        f(10, ch);
-        comm::recv(po);
     }
+    f(10, ch);
+    comm::recv(po);
+}
 
-    #[test]
-    fn spawn_sched_childs_on_same_sched() {
-        let po = comm::port();
-        let ch = comm::chan(po);
-
-        spawn_sched(1u) {||
-            let parent_sched_id = rustrt::rust_get_sched_id();
-            spawn {||
-                let child_sched_id = rustrt::rust_get_sched_id();
-                // This should be on the same scheduler
-                assert parent_sched_id == child_sched_id;
-                comm::send(ch, ());
-            };
+#[test]
+fn test_spawn_sched_childs_on_same_sched() {
+    let po = comm::port();
+    let ch = comm::chan(po);
+
+    spawn_sched(single_threaded) {||
+        let parent_sched_id = rustrt::rust_get_sched_id();
+        spawn {||
+            let child_sched_id = rustrt::rust_get_sched_id();
+            // This should be on the same scheduler
+            assert parent_sched_id == child_sched_id;
+            comm::send(ch, ());
         };
+    };
 
-        comm::recv(po);
-    }
+    comm::recv(po);
+}
 
-    #[nolink]
-    native mod rt {
-        fn rust_dbg_lock_create() -> *ctypes::void;
-        fn rust_dbg_lock_destroy(lock: *ctypes::void);
-        fn rust_dbg_lock_lock(lock: *ctypes::void);
-        fn rust_dbg_lock_unlock(lock: *ctypes::void);
-        fn rust_dbg_lock_wait(lock: *ctypes::void);
-        fn rust_dbg_lock_signal(lock: *ctypes::void);
-    }
+#[nolink]
+#[cfg(test)]
+native mod testrt {
+    fn rust_dbg_lock_create() -> *ctypes::void;
+    fn rust_dbg_lock_destroy(lock: *ctypes::void);
+    fn rust_dbg_lock_lock(lock: *ctypes::void);
+    fn rust_dbg_lock_unlock(lock: *ctypes::void);
+    fn rust_dbg_lock_wait(lock: *ctypes::void);
+    fn rust_dbg_lock_signal(lock: *ctypes::void);
+}
 
-    #[test]
-    fn spawn_sched_blocking() {
+#[test]
+fn test_spawn_sched_blocking() {
 
-        // Testing that a task in one scheduler can block natively
-        // without affecting other schedulers
-        iter::repeat(20u) {||
+    // Testing that a task in one scheduler can block natively
+    // without affecting other schedulers
+    iter::repeat(20u) {||
 
-            let start_po = comm::port();
-            let start_ch = comm::chan(start_po);
-            let fin_po = comm::port();
-            let fin_ch = comm::chan(fin_po);
+        let start_po = comm::port();
+        let start_ch = comm::chan(start_po);
+        let fin_po = comm::port();
+        let fin_ch = comm::chan(fin_po);
 
-            let lock = rt::rust_dbg_lock_create();
+        let lock = testrt::rust_dbg_lock_create();
 
-            spawn_sched(1u) {||
-                rt::rust_dbg_lock_lock(lock);
+        spawn_sched(single_threaded) {||
+            testrt::rust_dbg_lock_lock(lock);
 
-                comm::send(start_ch, ());
+            comm::send(start_ch, ());
 
-                // Block the scheduler thread
-                rt::rust_dbg_lock_wait(lock);
-                rt::rust_dbg_lock_unlock(lock);
+            // Block the scheduler thread
+            testrt::rust_dbg_lock_wait(lock);
+            testrt::rust_dbg_lock_unlock(lock);
 
-                comm::send(fin_ch, ());
-            };
+            comm::send(fin_ch, ());
+        };
 
-            // Wait until the other task has its lock
-            comm::recv(start_po);
+        // Wait until the other task has its lock
+        comm::recv(start_po);
 
-            fn pingpong(po: comm::port<int>, ch: comm::chan<int>) {
-                let val = 20;
-                while val > 0 {
-                    val = comm::recv(po);
-                    comm::send(ch, val - 1);
-                }
+        fn pingpong(po: comm::port<int>, ch: comm::chan<int>) {
+            let val = 20;
+            while val > 0 {
+                val = comm::recv(po);
+                comm::send(ch, val - 1);
             }
-
-            let setup_po = comm::port();
-            let setup_ch = comm::chan(setup_po);
-            let parent_po = comm::port();
-            let parent_ch = comm::chan(parent_po);
-            spawn {||
-                let child_po = comm::port();
-                comm::send(setup_ch, comm::chan(child_po));
-                pingpong(child_po, parent_ch);
-            };
-
-            let child_ch = comm::recv(setup_po);
-            comm::send(child_ch, 20);
-            pingpong(parent_po, child_ch);
-            rt::rust_dbg_lock_lock(lock);
-            rt::rust_dbg_lock_signal(lock);
-            rt::rust_dbg_lock_unlock(lock);
-            comm::recv(fin_po);
-            rt::rust_dbg_lock_destroy(lock);
         }
+
+        let setup_po = comm::port();
+        let setup_ch = comm::chan(setup_po);
+        let parent_po = comm::port();
+        let parent_ch = comm::chan(parent_po);
+        spawn {||
+            let child_po = comm::port();
+            comm::send(setup_ch, comm::chan(child_po));
+            pingpong(child_po, parent_ch);
+        };
+
+        let child_ch = comm::recv(setup_po);
+        comm::send(child_ch, 20);
+        pingpong(parent_po, child_ch);
+        testrt::rust_dbg_lock_lock(lock);
+        testrt::rust_dbg_lock_signal(lock);
+        testrt::rust_dbg_lock_unlock(lock);
+        comm::recv(fin_po);
+        testrt::rust_dbg_lock_destroy(lock);
     }
+}
 
+#[cfg(test)]
+fn avoid_copying_the_body(spawnfn: fn(+fn~())) {
+    let p = comm::port::<uint>();
+    let ch = comm::chan(p);
+
+    let x = ~1;
+    let x_in_parent = ptr::addr_of(*x) as uint;
+
+    spawnfn(fn~[move x]() {
+        let x_in_child = ptr::addr_of(*x) as uint;
+        comm::send(ch, x_in_child);
+    });
+
+    let x_in_child = comm::recv(p);
+    assert x_in_parent == x_in_child;
 }
 
+#[test]
+fn test_avoid_copying_the_body_spawn() {
+    avoid_copying_the_body(spawn);
+}
 
-// Local Variables:
-// mode: rust;
-// fill-column: 78;
-// indent-tabs-mode: nil
-// c-basic-offset: 4
-// buffer-file-coding-system: utf-8-unix
-// End:
+#[test]
+fn test_avoid_copying_the_body_spawn_listener() {
+    avoid_copying_the_body {|f|
+        spawn_listener(fn~[move f](_po: comm::port<int>) {
+            f();
+        });
+    }
+}
+
+#[test]
+fn test_avoid_copying_the_body_run() {
+    avoid_copying_the_body {|f|
+        let builder = mk_task_builder();
+        run(builder, fn~[move f]() {
+            f();
+        });
+    }
+}
+
+#[test]
+fn test_avoid_copying_the_body_run_listener() {
+    avoid_copying_the_body {|f|
+        let builder = mk_task_builder();
+        run_listener(builder,fn~[move f](_po: comm::port<int>) {
+            f();
+        });
+    }
+}
+
+#[test]
+fn test_avoid_copying_the_body_try() {
+    avoid_copying_the_body {|f|
+        try(fn~[move f]() {
+            f();
+        });
+    }
+}
+
+#[test]
+fn test_avoid_copying_the_body_future_task() {
+    avoid_copying_the_body {|f|
+        let builder = mk_task_builder();
+        future_task(builder);
+        run(builder, fn~[move f]() {
+            f();
+        });
+    }
+}
+
+#[test]
+fn test_avoid_copying_the_body_unsupervise() {
+    avoid_copying_the_body {|f|
+        let builder = mk_task_builder();
+        unsupervise(builder);
+        run(builder, fn~[move f]() {
+            f();
+        });
+    }
+}
\ No newline at end of file