about summary refs log tree commit diff
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2013-12-15 00:42:21 -0800
committerAlex Crichton <alex@alexcrichton.com>2013-12-24 19:59:54 -0800
commit51c03c1f35f6b076928a1e5b94ec81e6d00c3ac2 (patch)
tree5503ef16e6be59f8a0c8948fe347d48be9267a47
parent282f3d99a5ad85acbc58c03b5dfcdabf649c0c85 (diff)
downloadrust-51c03c1f35f6b076928a1e5b94ec81e6d00c3ac2.tar.gz
rust-51c03c1f35f6b076928a1e5b94ec81e6d00c3ac2.zip
green: Properly wait for main before shutdown
There was a race in the code previously where schedulers would *immediately*
shut down after spawning the main task (because the global task count would
still be 0). This fixes the logic by blocking the sched pool task in receving on
a port instead of spawning a task into the pool to receive on a port.

The modifications necessary were to have a "simple task" running by the time the
code is executing, but this is a simple enough thing to implement and I forsee
this being necessary to have implemented in the future anyway.
-rw-r--r--src/libgreen/lib.rs49
-rw-r--r--src/libgreen/simple.rs77
-rw-r--r--src/libnative/lib.rs25
-rw-r--r--src/libnative/task.rs10
-rw-r--r--src/libstd/rt/task.rs27
5 files changed, 143 insertions, 45 deletions
diff --git a/src/libgreen/lib.rs b/src/libgreen/lib.rs
index 57e2a0bfe16..7318eaaf679 100644
--- a/src/libgreen/lib.rs
+++ b/src/libgreen/lib.rs
@@ -33,7 +33,9 @@
 
 use std::os;
 use std::rt::crate_map;
+use std::rt::local::Local;
 use std::rt::rtio;
+use std::rt::task::Task;
 use std::rt::thread::Thread;
 use std::rt;
 use std::sync::atomics::{SeqCst, AtomicUint, INIT_ATOMIC_UINT};
@@ -41,7 +43,6 @@ use std::sync::deque;
 use std::task::TaskOpts;
 use std::util;
 use std::vec;
-use stdtask = std::rt::task;
 
 use sched::{Shutdown, Scheduler, SchedHandle, TaskFromFriend, NewNeighbor};
 use sleeper_list::SleeperList;
@@ -49,6 +50,7 @@ use stack::StackPool;
 use task::GreenTask;
 
 mod macros;
+mod simple;
 
 pub mod basic;
 pub mod context;
@@ -61,16 +63,20 @@ pub mod task;
 #[lang = "start"]
 pub fn lang_start(main: *u8, argc: int, argv: **u8) -> int {
     use std::cast;
-    do start(argc, argv) {
-        let main: extern "Rust" fn() = unsafe { cast::transmute(main) };
-        main();
-    }
+    let mut ret = None;
+    simple::task().run(|| {
+        ret = Some(do start(argc, argv) {
+            let main: extern "Rust" fn() = unsafe { cast::transmute(main) };
+            main();
+        })
+    });
+    ret.unwrap()
 }
 
 /// Set up a default runtime configuration, given compiler-supplied arguments.
 ///
-/// This function will block the current thread of execution until the entire
-/// pool of M:N schedulers have exited.
+/// This function will block until the entire pool of M:N schedulers have
+/// exited. This function also requires a local task to be available.
 ///
 /// # Arguments
 ///
@@ -95,24 +101,37 @@ pub fn start(argc: int, argv: **u8, main: proc()) -> int {
 
 /// Execute the main function in a pool of M:N schedulers.
 ///
-/// Configures the runtime according to the environment, by default
-/// using a task scheduler with the same number of threads as cores.
-/// Returns a process exit code.
+/// Configures the runtime according to the environment, by default using a task
+/// scheduler with the same number of threads as cores.  Returns a process exit
+/// code.
 ///
 /// This function will not return until all schedulers in the associated pool
 /// have returned.
 pub fn run(main: proc()) -> int {
+    // Create a scheduler pool and spawn the main task into this pool. We will
+    // get notified over a channel when the main task exits.
     let mut pool = SchedPool::new(PoolConfig::new());
     let (port, chan) = Chan::new();
     let mut opts = TaskOpts::new();
     opts.notify_chan = Some(chan);
     pool.spawn(opts, main);
-    do pool.spawn(TaskOpts::new()) {
-        if port.recv().is_err() {
-            os::set_exit_status(rt::DEFAULT_ERROR_CODE);
-        }
+
+    // Wait for the main task to return, and set the process error code
+    // appropriately.
+    if port.recv().is_err() {
+        os::set_exit_status(rt::DEFAULT_ERROR_CODE);
     }
-    unsafe { stdtask::wait_for_completion(); }
+
+    // Once the main task has exited and we've set our exit code, wait for all
+    // spawned sub-tasks to finish running. This is done to allow all schedulers
+    // to remain active while there are still tasks possibly running.
+    unsafe {
+        let mut task = Local::borrow(None::<Task>);
+        task.get().wait_for_other_tasks();
+    }
+
+    // Now that we're sure all tasks are dead, shut down the pool of schedulers,
+    // waiting for them all to return.
     pool.shutdown();
     os::get_exit_status()
 }
diff --git a/src/libgreen/simple.rs b/src/libgreen/simple.rs
new file mode 100644
index 00000000000..6fd2c436b2e
--- /dev/null
+++ b/src/libgreen/simple.rs
@@ -0,0 +1,77 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! A small module implementing a simple "runtime" used for bootstrapping a rust
+//! scheduler pool and then interacting with it.
+
+use std::cast;
+use std::rt::Runtime;
+use std::task::TaskOpts;
+use std::rt::rtio;
+use std::rt::local::Local;
+use std::rt::task::{Task, BlockedTask};
+use std::unstable::sync::LittleLock;
+
+struct SimpleTask {
+    lock: LittleLock,
+}
+
+impl Runtime for SimpleTask {
+    // Implement the simple tasks of descheduling and rescheduling, but only in
+    // a simple number of cases.
+    fn deschedule(mut ~self, times: uint, mut cur_task: ~Task,
+                  f: |BlockedTask| -> Result<(), BlockedTask>) {
+        assert!(times == 1);
+
+        let my_lock: *mut LittleLock = &mut self.lock;
+        cur_task.put_runtime(self as ~Runtime);
+
+        unsafe {
+            let cur_task_dupe = *cast::transmute::<&~Task, &uint>(&cur_task);
+            let task = BlockedTask::block(cur_task);
+
+            let mut guard = (*my_lock).lock();
+            match f(task) {
+                Ok(()) => guard.wait(),
+                Err(task) => { cast::forget(task.wake()); }
+            }
+            drop(guard);
+            cur_task = cast::transmute::<uint, ~Task>(cur_task_dupe);
+        }
+        Local::put(cur_task);
+    }
+    fn reawaken(mut ~self, mut to_wake: ~Task) {
+        let lock: *mut LittleLock = &mut self.lock;
+        to_wake.put_runtime(self as ~Runtime);
+        unsafe {
+            cast::forget(to_wake);
+            let _l = (*lock).lock();
+            (*lock).signal();
+        }
+    }
+
+    // These functions are all unimplemented and fail as a result. This is on
+    // purpose. A "simple task" is just that, a very simple task that can't
+    // really do a whole lot. The only purpose of the task is to get us off our
+    // feet and running.
+    fn yield_now(~self, _cur_task: ~Task) { fail!() }
+    fn maybe_yield(~self, _cur_task: ~Task) { fail!() }
+    fn spawn_sibling(~self, _cur_task: ~Task, _opts: TaskOpts, _f: proc()) {
+        fail!()
+    }
+    fn local_io<'a>(&'a mut self) -> Option<rtio::LocalIo<'a>> { None }
+    fn wrap(~self) -> ~Any { fail!() }
+}
+
+pub fn task() -> ~Task {
+    let mut task = ~Task::new();
+    task.put_runtime(~SimpleTask { lock: LittleLock::new() } as ~Runtime);
+    return task;
+}
diff --git a/src/libnative/lib.rs b/src/libnative/lib.rs
index 44b66a7804d..60ae239ee97 100644
--- a/src/libnative/lib.rs
+++ b/src/libnative/lib.rs
@@ -33,15 +33,16 @@
 //    answer is that you don't need them)
 
 use std::os;
+use std::rt::local::Local;
+use std::rt::task::Task;
 use std::rt;
-use stdtask = std::rt::task;
 
 pub mod io;
 pub mod task;
 
 
 // XXX: this should not exist here
-#[cfg(stage0, notready)]
+#[cfg(stage0)]
 #[lang = "start"]
 pub fn lang_start(main: *u8, argc: int, argv: **u8) -> int {
     use std::cast;
@@ -72,9 +73,13 @@ pub fn lang_start(main: *u8, argc: int, argv: **u8) -> int {
 /// exited.
 pub fn start(argc: int, argv: **u8, main: proc()) -> int {
     rt::init(argc, argv);
-    let exit_code = run(main);
+    let mut exit_code = None;
+    let mut main = Some(main);
+    task::new().run(|| {
+        exit_code = Some(run(main.take_unwrap()));
+    });
     unsafe { rt::cleanup(); }
-    return exit_code;
+    return exit_code.unwrap();
 }
 
 /// Executes a procedure on the current thread in a Rust task context.
@@ -82,11 +87,11 @@ pub fn start(argc: int, argv: **u8, main: proc()) -> int {
 /// This function has all of the same details as `start` except for a different
 /// number of arguments.
 pub fn run(main: proc()) -> int {
-    // Create a task, run the procedure in it, and then wait for everything.
-    task::run(task::new(), main);
-
-    // Block this OS task waiting for everything to finish.
-    unsafe { stdtask::wait_for_completion() }
-
+    // Run the main procedure and then wait for everything to finish
+    main();
+    unsafe {
+        let mut task = Local::borrow(None::<Task>);
+        task.get().wait_for_other_tasks();
+    }
     os::get_exit_status()
 }
diff --git a/src/libnative/task.rs b/src/libnative/task.rs
index 48768def067..0d5e08979ca 100644
--- a/src/libnative/task.rs
+++ b/src/libnative/task.rs
@@ -77,17 +77,11 @@ pub fn spawn_opts(opts: TaskOpts, f: proc()) {
             stack::record_stack_bounds(my_stack - stack + 1024, my_stack);
         }
 
-        run(task, f);
+        let mut f = Some(f);
+        task.run(|| { f.take_unwrap()() });
     })
 }
 
-/// Runs a task once, consuming the task. The given procedure is run inside of
-/// the task.
-pub fn run(t: ~Task, f: proc()) {
-    let mut f = Some(f);
-    t.run(|| { f.take_unwrap()(); });
-}
-
 // This structure is the glue between channels and the 1:1 scheduling mode. This
 // structure is allocated once per task.
 struct Ops {
diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs
index c0e1086483d..765f0b427cd 100644
--- a/src/libstd/rt/task.rs
+++ b/src/libstd/rt/task.rs
@@ -292,6 +292,21 @@ impl Task {
     pub fn local_io<'a>(&'a mut self) -> Option<LocalIo<'a>> {
         self.imp.get_mut_ref().local_io()
     }
+
+    /// The main function of all rust executables will by default use this
+    /// function. This function will *block* the OS thread (hence the `unsafe`)
+    /// waiting for all known tasks to complete. Once this function has
+    /// returned, it is guaranteed that no more user-defined code is still
+    /// running.
+    pub unsafe fn wait_for_other_tasks(&mut self) {
+        TASK_COUNT.fetch_sub(1, SeqCst); // don't count ourselves
+        TASK_LOCK.lock();
+        while TASK_COUNT.load(SeqCst) > 0 {
+            TASK_LOCK.wait();
+        }
+        TASK_LOCK.unlock();
+        TASK_COUNT.fetch_add(1, SeqCst); // add ourselves back in
+    }
 }
 
 impl Drop for Task {
@@ -396,18 +411,6 @@ impl Drop for Death {
     }
 }
 
-/// The main function of all rust executables will by default use this function.
-/// This function will *block* the OS thread (hence the `unsafe`) waiting for
-/// all known tasks to complete. Once this function has returned, it is
-/// guaranteed that no more user-defined code is still running.
-pub unsafe fn wait_for_completion() {
-    TASK_LOCK.lock();
-    while TASK_COUNT.load(SeqCst) > 0 {
-        TASK_LOCK.wait();
-    }
-    TASK_LOCK.unlock();
-}
-
 #[cfg(test)]
 mod test {
     use super::*;