about summary refs log tree commit diff
diff options
context:
space:
mode:
authorEric Holk <eholk@mozilla.com>2011-07-23 19:03:02 -0700
committerEric Holk <eholk@mozilla.com>2011-07-28 10:47:28 -0700
commit62bc6b51136760b1d4f4b691aaa089bdb9bf0af5 (patch)
treebd4787e8bd4eed7b3ca7b3d99ece0fc75ae444fa
parentb51f5c395cc3458e428159b908ca95b1777e66e2 (diff)
downloadrust-62bc6b51136760b1d4f4b691aaa089bdb9bf0af5.tar.gz
rust-62bc6b51136760b1d4f4b691aaa089bdb9bf0af5.zip
Per-thread scheduling. Closes #682.
Tasks are spawned on a random thread. Currently they stay there, but
we should add task migration and load balancing in the future. This
should dramatically improve our task performance benchmarks.
-rw-r--r--src/rt/circular_buffer.cpp84
-rw-r--r--src/rt/rust.cpp7
-rw-r--r--src/rt/rust_chan.cpp50
-rw-r--r--src/rt/rust_kernel.cpp85
-rw-r--r--src/rt/rust_kernel.h42
-rw-r--r--src/rt/rust_scheduler.cpp56
-rw-r--r--src/rt/rust_scheduler.h16
-rw-r--r--src/rt/rust_task.cpp14
-rw-r--r--src/rt/rust_task.h1
-rw-r--r--src/rt/rust_upcall.cpp10
-rw-r--r--src/rt/rust_util.h3
-rw-r--r--src/rt/sync/sync.h2
-rw-r--r--src/rt/test/rust_test_runtime.cpp13
-rw-r--r--src/test/run-pass/lib-task.rs41
14 files changed, 239 insertions, 185 deletions
diff --git a/src/rt/circular_buffer.cpp b/src/rt/circular_buffer.cpp
index b645a08e563..aa0127d8c25 100644
--- a/src/rt/circular_buffer.cpp
+++ b/src/rt/circular_buffer.cpp
@@ -5,7 +5,6 @@
 #include "rust_internal.h"
 
 circular_buffer::circular_buffer(rust_kernel *kernel, size_t unit_sz) :
-    sched(kernel->sched),
     kernel(kernel),
     unit_sz(unit_sz),
     _buffer_sz(initial_size()),
@@ -13,26 +12,26 @@ circular_buffer::circular_buffer(rust_kernel *kernel, size_t unit_sz) :
     _unread(0),
     _buffer((uint8_t *)kernel->malloc(_buffer_sz, "circular_buffer")) {
 
-    A(sched, unit_sz, "Unit size must be larger than zero.");
+    // A(sched, unit_sz, "Unit size must be larger than zero.");
 
-    DLOG(sched, mem, "new circular_buffer(buffer_sz=%d, unread=%d)"
-         "-> circular_buffer=0x%" PRIxPTR,
-         _buffer_sz, _unread, this);
+    // DLOG(sched, mem, "new circular_buffer(buffer_sz=%d, unread=%d)"
+    //      "-> circular_buffer=0x%" PRIxPTR,
+    //      _buffer_sz, _unread, this);
 
-    A(sched, _buffer, "Failed to allocate buffer.");
+    // A(sched, _buffer, "Failed to allocate buffer.");
 }
 
 circular_buffer::~circular_buffer() {
-    DLOG(sched, mem, "~circular_buffer 0x%" PRIxPTR, this);
-    I(sched, _buffer);
-    W(sched, _unread == 0,
-      "freeing circular_buffer with %d unread bytes", _unread);
+    // DLOG(sched, mem, "~circular_buffer 0x%" PRIxPTR, this);
+    // I(sched, _buffer);
+    // W(sched, _unread == 0,
+    //   "freeing circular_buffer with %d unread bytes", _unread);
     kernel->free(_buffer);
 }
 
 size_t
 circular_buffer::initial_size() {
-    I(sched, unit_sz > 0);
+    // I(sched, unit_sz > 0);
     return INITIAL_CIRCULAR_BUFFER_SIZE_IN_UNITS * unit_sz;
 }
 
@@ -41,8 +40,8 @@ circular_buffer::initial_size() {
  */
 void
 circular_buffer::transfer(void *dst) {
-    I(sched, dst);
-    I(sched, _unread <= _buffer_sz);
+    // I(sched, dst);
+    // I(sched, _unread <= _buffer_sz);
 
     uint8_t *ptr = (uint8_t *) dst;
 
@@ -54,13 +53,13 @@ circular_buffer::transfer(void *dst) {
     } else {
         head_sz = _buffer_sz - _next;
     }
-    I(sched, _next + head_sz <= _buffer_sz);
+    // I(sched, _next + head_sz <= _buffer_sz);
     memcpy(ptr, _buffer + _next, head_sz);
 
     // Then copy any other items from the beginning of the buffer
-    I(sched, _unread >= head_sz);
+    // I(sched, _unread >= head_sz);
     size_t tail_sz = _unread - head_sz;
-    I(sched, head_sz + tail_sz <= _buffer_sz);
+    // I(sched, head_sz + tail_sz <= _buffer_sz);
     memcpy(ptr + head_sz, _buffer, tail_sz);
 }
 
@@ -70,21 +69,21 @@ circular_buffer::transfer(void *dst) {
  */
 void
 circular_buffer::enqueue(void *src) {
-    I(sched, src);
-    I(sched, _unread <= _buffer_sz);
-    I(sched, _buffer);
+    // I(sched, src);
+    // I(sched, _unread <= _buffer_sz);
+    // I(sched, _buffer);
 
     // Grow if necessary.
     if (_unread == _buffer_sz) {
         grow();
     }
 
-    DLOG(sched, mem, "circular_buffer enqueue "
-         "unread: %d, next: %d, buffer_sz: %d, unit_sz: %d",
-         _unread, _next, _buffer_sz, unit_sz);
+    // DLOG(sched, mem, "circular_buffer enqueue "
+    //      "unread: %d, next: %d, buffer_sz: %d, unit_sz: %d",
+    //      _unread, _next, _buffer_sz, unit_sz);
 
-    I(sched, _unread < _buffer_sz);
-    I(sched, _unread + unit_sz <= _buffer_sz);
+    // I(sched, _unread < _buffer_sz);
+    // I(sched, _unread + unit_sz <= _buffer_sz);
 
     // Copy data
     size_t dst_idx = _next + _unread;
@@ -92,15 +91,15 @@ circular_buffer::enqueue(void *src) {
     if (dst_idx >= _buffer_sz) {
         dst_idx -= _buffer_sz;
 
-        I(sched, _next >= unit_sz);
-        I(sched, dst_idx <= _next - unit_sz);
+        // I(sched, _next >= unit_sz);
+        // I(sched, dst_idx <= _next - unit_sz);
     }
 
-    I(sched, dst_idx + unit_sz <= _buffer_sz);
+    // I(sched, dst_idx + unit_sz <= _buffer_sz);
     memcpy(&_buffer[dst_idx], src, unit_sz);
     _unread += unit_sz;
 
-    DLOG(sched, mem, "circular_buffer pushed data at index: %d", dst_idx);
+    // DLOG(sched, mem, "circular_buffer pushed data at index: %d", dst_idx);
 }
 
 /**
@@ -110,17 +109,17 @@ circular_buffer::enqueue(void *src) {
  */
 void
 circular_buffer::dequeue(void *dst) {
-    I(sched, unit_sz > 0);
-    I(sched, _unread >= unit_sz);
-    I(sched, _unread <= _buffer_sz);
-    I(sched, _buffer);
+    // I(sched, unit_sz > 0);
+    // I(sched, _unread >= unit_sz);
+    // I(sched, _unread <= _buffer_sz);
+    // I(sched, _buffer);
 
-    DLOG(sched, mem,
-             "circular_buffer dequeue "
-             "unread: %d, next: %d, buffer_sz: %d, unit_sz: %d",
-             _unread, _next, _buffer_sz, unit_sz);
+    // DLOG(sched, mem,
+    //          "circular_buffer dequeue "
+    //          "unread: %d, next: %d, buffer_sz: %d, unit_sz: %d",
+    //          _unread, _next, _buffer_sz, unit_sz);
 
-    I(sched, _next + unit_sz <= _buffer_sz);
+    // I(sched, _next + unit_sz <= _buffer_sz);
     if (dst != NULL) {
         memcpy(dst, &_buffer[_next], unit_sz);
     }
@@ -140,8 +139,9 @@ circular_buffer::dequeue(void *dst) {
 void
 circular_buffer::grow() {
     size_t new_buffer_sz = _buffer_sz * 2;
-    I(sched, new_buffer_sz <= MAX_CIRCULAR_BUFFER_SIZE);
-    DLOG(sched, mem, "circular_buffer is growing to %d bytes", new_buffer_sz);
+    // I(sched, new_buffer_sz <= MAX_CIRCULAR_BUFFER_SIZE);
+    // DLOG(sched, mem, "circular_buffer is growing to %d bytes",
+    //      new_buffer_sz);
     void *new_buffer = kernel->malloc(new_buffer_sz,
                                     "new circular_buffer (grow)");
     transfer(new_buffer);
@@ -154,9 +154,9 @@ circular_buffer::grow() {
 void
 circular_buffer::shrink() {
     size_t new_buffer_sz = _buffer_sz / 2;
-    I(sched, initial_size() <= new_buffer_sz);
-    DLOG(sched, mem, "circular_buffer is shrinking to %d bytes",
-         new_buffer_sz);
+    // I(sched, initial_size() <= new_buffer_sz);
+    // DLOG(sched, mem, "circular_buffer is shrinking to %d bytes",
+    //      new_buffer_sz);
     void *new_buffer = kernel->malloc(new_buffer_sz,
                                     "new circular_buffer (shrink)");
     transfer(new_buffer);
diff --git a/src/rt/rust.cpp b/src/rt/rust.cpp
index 06097e34197..df1486952eb 100644
--- a/src/rt/rust.cpp
+++ b/src/rt/rust.cpp
@@ -140,9 +140,10 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) {
 
     update_log_settings(crate_map, getenv("RUST_LOG"));
     enable_claims(getenv("CHECK_CLAIMS"));
+    int num_threads = get_num_threads();
 
     rust_srv *srv = new rust_srv();
-    rust_kernel *kernel = new rust_kernel(srv);
+    rust_kernel *kernel = new rust_kernel(srv, num_threads);
     kernel->start();
     rust_task *root_task = kernel->create_task(NULL, "main");
     rust_scheduler *sched = root_task->sched;
@@ -158,11 +159,9 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) {
 
     root_task->start(main_fn, (uintptr_t)args->args);
 
-    int num_threads = get_num_threads();
-
     DLOG(sched, dom, "Using %d worker threads.", num_threads);
 
-    int ret = kernel->start_task_threads(num_threads);
+    int ret = kernel->start_task_threads();
     delete args;
     delete kernel;
     delete srv;
diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp
index 470be6e6a37..9253d7d0361 100644
--- a/src/rt/rust_chan.cpp
+++ b/src/rt/rust_chan.cpp
@@ -13,17 +13,17 @@ rust_chan::rust_chan(rust_kernel *kernel, maybe_proxy<rust_port> *port,
     if (port) {
         associate(port);
     }
-    DLOG(kernel->sched, comm, "new rust_chan(task=0x%" PRIxPTR
-        ", port=0x%" PRIxPTR ") -> chan=0x%" PRIxPTR,
-        (uintptr_t) task, (uintptr_t) port, (uintptr_t) this);
+    // DLOG(task->sched, comm, "new rust_chan(task=0x%" PRIxPTR
+    //     ", port=0x%" PRIxPTR ") -> chan=0x%" PRIxPTR,
+    //     (uintptr_t) task, (uintptr_t) port, (uintptr_t) this);
 }
 
 rust_chan::~rust_chan() {
-    DLOG(kernel->sched, comm, "del rust_chan(task=0x%" PRIxPTR ")",
-         (uintptr_t) this);
+    // DLOG(kernel->sched, comm, "del rust_chan(task=0x%" PRIxPTR ")",
+    //      (uintptr_t) this);
 
-    A(kernel->sched, is_associated() == false,
-      "Channel must be disassociated before being freed.");
+    // A(kernel->sched, is_associated() == false,
+    //   "Channel must be disassociated before being freed.");
 }
 
 /**
@@ -33,9 +33,9 @@ void rust_chan::associate(maybe_proxy<rust_port> *port) {
     this->port = port;
     if (port->is_proxy() == false) {
         scoped_lock with(port->referent()->lock);
-        DLOG(kernel->sched, task,
-            "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR,
-            this, port);
+        // DLOG(kernel->sched, task,
+        //     "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR,
+        //     this, port);
         ++this->ref_count;
         this->task = port->referent()->task;
         this->task->ref();
@@ -51,14 +51,14 @@ bool rust_chan::is_associated() {
  * Unlink this channel from its associated port.
  */
 void rust_chan::disassociate() {
-    A(kernel->sched, is_associated(),
-      "Channel must be associated with a port.");
+    // A(kernel->sched, is_associated(),
+    //   "Channel must be associated with a port.");
 
     if (port->is_proxy() == false) {
         scoped_lock with(port->referent()->lock);
-        DLOG(kernel->sched, task,
-            "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR,
-            this, port->referent());
+        // DLOG(kernel->sched, task,
+        //     "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR,
+        //     this, port->referent());
         --this->ref_count;
         --this->task->ref_count;
         this->task = NULL;
@@ -73,8 +73,8 @@ void rust_chan::disassociate() {
  * Attempt to send data to the associated port.
  */
 void rust_chan::send(void *sptr) {
-    rust_scheduler *sched = kernel->sched;
-    I(sched, !port->is_proxy());
+    // rust_scheduler *sched = kernel->sched;
+    // I(sched, !port->is_proxy());
 
     rust_port *target_port = port->referent();
     // TODO: We can probably avoid this lock by using atomic operations in
@@ -84,13 +84,13 @@ void rust_chan::send(void *sptr) {
     buffer.enqueue(sptr);
 
     if (!is_associated()) {
-        W(sched, is_associated(),
-          "rust_chan::transmit with no associated port.");
+        // W(sched, is_associated(),
+        //   "rust_chan::transmit with no associated port.");
         return;
     }
 
-    A(sched, !buffer.is_empty(),
-      "rust_chan::transmit with nothing to send.");
+    // A(sched, !buffer.is_empty(),
+    //   "rust_chan::transmit with nothing to send.");
 
     if (port->is_proxy()) {
         data_message::send(buffer.peek(), buffer.unit_sz, "send data",
@@ -98,7 +98,7 @@ void rust_chan::send(void *sptr) {
         buffer.dequeue(NULL);
     } else {
         if (target_port->task->blocked_on(target_port)) {
-            DLOG(sched, comm, "dequeued in rendezvous_ptr");
+            // DLOG(sched, comm, "dequeued in rendezvous_ptr");
             buffer.dequeue(target_port->task->rendezvous_ptr);
             target_port->task->rendezvous_ptr = 0;
             target_port->task->wakeup(target_port);
@@ -120,7 +120,7 @@ rust_chan *rust_chan::clone(maybe_proxy<rust_task> *target) {
         rust_handle<rust_port> *handle =
             task->sched->kernel->get_port_handle(port->as_referent());
         maybe_proxy<rust_port> *proxy = new rust_proxy<rust_port> (handle);
-        DLOG(kernel->sched, mem, "new proxy: " PTR, proxy);
+        DLOG(task->sched, mem, "new proxy: " PTR, proxy);
         port = proxy;
         target_task = target->as_proxy()->handle()->referent();
     }
@@ -133,8 +133,8 @@ rust_chan *rust_chan::clone(maybe_proxy<rust_task> *target) {
  * appear to be live, causing modify-after-free errors.
  */
 void rust_chan::destroy() {
-    A(kernel->sched, ref_count == 0,
-      "Channel's ref count should be zero.");
+    // A(kernel->sched, ref_count == 0,
+    //   "Channel's ref count should be zero.");
 
     if (is_associated()) {
         if (port->is_proxy()) {
diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp
index 53c2d945b09..1eb82602798 100644
--- a/src/rt/rust_kernel.cpp
+++ b/src/rt/rust_kernel.cpp
@@ -7,36 +7,40 @@
       }                                                    \
   } while (0)
 
-rust_kernel::rust_kernel(rust_srv *srv) :
+rust_kernel::rust_kernel(rust_srv *srv, size_t num_threads) :
     _region(srv, true),
     _log(srv, NULL),
-    _srv(srv),
-    _interrupt_kernel_loop(FALSE)
+    srv(srv),
+    _interrupt_kernel_loop(FALSE),
+    num_threads(num_threads),
+    rval(0),
+    live_tasks(0)
 {
-    sched = create_scheduler("main");
+    isaac_init(this, &rctx);
+    create_schedulers();
 }
 
 rust_scheduler *
-rust_kernel::create_scheduler(const char *name) {
+rust_kernel::create_scheduler(int id) {
     _kernel_lock.lock();
     rust_message_queue *message_queue =
-        new (this, "rust_message_queue") rust_message_queue(_srv, this);
-    rust_srv *srv = _srv->clone();
+        new (this, "rust_message_queue") rust_message_queue(srv, this);
+    rust_srv *srv = this->srv->clone();
     rust_scheduler *sched =
         new (this, "rust_scheduler")
-        rust_scheduler(this, message_queue, srv, name);
+        rust_scheduler(this, message_queue, srv, id);
     rust_handle<rust_scheduler> *handle = internal_get_sched_handle(sched);
     message_queue->associate(handle);
     message_queues.append(message_queue);
-    KLOG("created scheduler: " PTR ", name: %s, index: %d",
-         sched, name, sched->list_index);
+    KLOG("created scheduler: " PTR ", id: %d, index: %d",
+         sched, id, sched->list_index);
     _kernel_lock.signal_all();
     _kernel_lock.unlock();
     return sched;
 }
 
 void
-rust_kernel::destroy_scheduler() {
+rust_kernel::destroy_scheduler(rust_scheduler *sched) {
     _kernel_lock.lock();
     KLOG("deleting scheduler: " PTR ", name: %s, index: %d",
         sched, sched->name, sched->list_index);
@@ -48,6 +52,18 @@ rust_kernel::destroy_scheduler() {
     _kernel_lock.unlock();
 }
 
+void rust_kernel::create_schedulers() {
+    for(int i = 0; i < num_threads; ++i) {
+        threads.push(create_scheduler(i));
+    }
+}
+
+void rust_kernel::destroy_schedulers() {
+    for(int i = 0; i < num_threads; ++i) {
+        destroy_scheduler(threads[i]);
+    }
+}
+
 rust_handle<rust_scheduler> *
 rust_kernel::internal_get_sched_handle(rust_scheduler *sched) {
     rust_handle<rust_scheduler> *handle = NULL;
@@ -59,14 +75,6 @@ rust_kernel::internal_get_sched_handle(rust_scheduler *sched) {
     return handle;
 }
 
-rust_handle<rust_scheduler> *
-rust_kernel::get_sched_handle(rust_scheduler *sched) {
-    _kernel_lock.lock();
-    rust_handle<rust_scheduler> *handle = internal_get_sched_handle(sched);
-    _kernel_lock.unlock();
-    return handle;
-}
-
 rust_handle<rust_task> *
 rust_kernel::get_task_handle(rust_task *task) {
     _kernel_lock.lock();
@@ -98,7 +106,9 @@ rust_kernel::get_port_handle(rust_port *port) {
 
 void
 rust_kernel::log_all_scheduler_state() {
-    sched->log_state();
+    for(int i = 0; i < num_threads; ++i) {
+        threads[i]->log_state();
+    }
 }
 
 /**
@@ -170,7 +180,7 @@ rust_kernel::terminate_kernel_loop() {
 }
 
 rust_kernel::~rust_kernel() {
-    destroy_scheduler();
+    destroy_schedulers();
 
     terminate_kernel_loop();
 
@@ -193,7 +203,7 @@ rust_kernel::~rust_kernel() {
 
     rust_message_queue *queue = NULL;
     while (message_queues.pop(&queue)) {
-        K(_srv, queue->is_empty(), "Kernel message queue should be empty "
+        K(srv, queue->is_empty(), "Kernel message queue should be empty "
           "before killing the kernel.");
         delete queue;
     }
@@ -240,30 +250,25 @@ rust_kernel::signal_kernel_lock() {
     _kernel_lock.unlock();
 }
 
-int rust_kernel::start_task_threads(int num_threads)
+int rust_kernel::start_task_threads()
 {
-    rust_task_thread *thread = NULL;
-
-    // -1, because this thread will also be a thread.
-    for(int i = 0; i < num_threads - 1; ++i) {
-        thread = new rust_task_thread(i + 1, this);
+    for(int i = 0; i < num_threads; ++i) {
+        rust_scheduler *thread = threads[i];
         thread->start();
-        threads.push(thread);
     }
 
-    sched->start_main_loop(0);
-
-    while(threads.pop(&thread)) {
+    for(int i = 0; i < num_threads; ++i) {
+        rust_scheduler *thread = threads[i];
         thread->join();
-        delete thread;
     }
 
-    return sched->rval;
+    return rval;
 }
 
 rust_task *
 rust_kernel::create_task(rust_task *spawner, const char *name) {
-    return sched->create_task(spawner, name);
+    // TODO: use a different rand.
+    return threads[rand(&rctx) % num_threads]->create_task(spawner, name);
 }
 
 #ifdef __WIN32__
@@ -285,16 +290,6 @@ rust_kernel::win32_require(LPCTSTR fn, BOOL ok) {
 }
 #endif
 
-rust_task_thread::rust_task_thread(int id, rust_kernel *owner)
-    : id(id), owner(owner)
-{
-}
-
-void rust_task_thread::run()
-{
-    owner->sched->start_main_loop(id);
-}
-
 //
 // Local Variables:
 // mode: C++
diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h
index 07f4ff2f787..8be9bb96e90 100644
--- a/src/rt/rust_kernel.h
+++ b/src/rt/rust_kernel.h
@@ -45,7 +45,10 @@ class rust_task_thread;
 class rust_kernel : public rust_thread {
     memory_region _region;
     rust_log _log;
-    rust_srv *_srv;
+
+public:
+    rust_srv *srv;
+private:
 
     /**
      * Task proxy objects are kernel owned handles to Rust objects.
@@ -62,20 +65,29 @@ class rust_kernel : public rust_thread {
 
     lock_and_signal _kernel_lock;
 
+    const size_t num_threads;
+
     void terminate_kernel_loop();
     void pump_message_queues();
 
     rust_handle<rust_scheduler> *
     internal_get_sched_handle(rust_scheduler *sched);
 
-    array_list<rust_task_thread *> threads;
+    array_list<rust_scheduler *> threads;
+
+    randctx rctx;
 
-    rust_scheduler *create_scheduler(const char *name);
-    void destroy_scheduler();
+    rust_scheduler *create_scheduler(int id);
+    void destroy_scheduler(rust_scheduler *sched);
+
+    void create_schedulers();
+    void destroy_schedulers();
 
 public:
-    rust_scheduler *sched;
-    lock_and_signal scheduler_lock;
+
+    int rval;
+
+    volatile int live_tasks;
 
     /**
      * Message queues are kernel objects and are associated with domains.
@@ -86,11 +98,10 @@ public:
      */
     indexed_list<rust_message_queue> message_queues;
 
-    rust_handle<rust_scheduler> *get_sched_handle(rust_scheduler *sched);
     rust_handle<rust_task> *get_task_handle(rust_task *task);
     rust_handle<rust_port> *get_port_handle(rust_port *port);
 
-    rust_kernel(rust_srv *srv);
+    rust_kernel(rust_srv *srv, size_t num_threads);
 
     bool is_deadlocked();
 
@@ -113,10 +124,7 @@ public:
     void *realloc(void *mem, size_t size);
     void free(void *mem);
 
-    // FIXME: this should go away
-    inline rust_scheduler *get_scheduler() const { return sched; }
-
-    int start_task_threads(int num_threads);
+    int start_task_threads();
 
 #ifdef __WIN32__
     void win32_require(LPCTSTR fn, BOOL ok);
@@ -125,14 +133,4 @@ public:
     rust_task *create_task(rust_task *spawner, const char *name);
 };
 
-class rust_task_thread : public rust_thread {
-    int id;
-    rust_kernel *owner;
-
-public:
-    rust_task_thread(int id, rust_kernel *owner);
-
-    virtual void run();
-};
-
 #endif /* RUST_KERNEL_H */
diff --git a/src/rt/rust_scheduler.cpp b/src/rt/rust_scheduler.cpp
index 09a78cebddb..437be04e272 100644
--- a/src/rt/rust_scheduler.cpp
+++ b/src/rt/rust_scheduler.cpp
@@ -4,21 +4,23 @@
 #include "globals.h"
 
 rust_scheduler::rust_scheduler(rust_kernel *kernel,
-    rust_message_queue *message_queue, rust_srv *srv,
-    const char *name) :
+                               rust_message_queue *message_queue,
+                               rust_srv *srv,
+                               int id) :
     interrupt_flag(0),
     _log(srv, this),
     log_lvl(log_note),
     srv(srv),
-    name(name),
+    // TODO: calculate a per scheduler name.
+    name("main"),
     newborn_tasks(this, "newborn"),
     running_tasks(this, "running"),
     blocked_tasks(this, "blocked"),
     dead_tasks(this, "dead"),
     cache(this),
-    rval(0),
     kernel(kernel),
-    message_queue(message_queue)
+    message_queue(message_queue),
+    id(id)
 {
     LOGPTR(this, "new dom", (uintptr_t)this);
     isaac_init(this, &rctx);
@@ -47,9 +49,9 @@ rust_scheduler::activate(rust_task *task) {
 
     task->ctx.next = &ctx;
     DLOG(this, task, "descheduling...");
-    kernel->scheduler_lock.unlock();
+    lock.unlock();
     task->ctx.swap(ctx);
-    kernel->scheduler_lock.lock();
+    lock.lock();
     DLOG(this, task, "task has returned");
 }
 
@@ -67,8 +69,8 @@ void
 rust_scheduler::fail() {
     log(NULL, log_err, "domain %s @0x%" PRIxPTR " root task failed",
         name, this);
-    I(this, rval == 0);
-    rval = 1;
+    I(this, kernel->rval == 0);
+    kernel->rval = 1;
     exit(1);
 }
 
@@ -82,7 +84,7 @@ rust_scheduler::number_of_live_tasks() {
  */
 void
 rust_scheduler::reap_dead_tasks(int id) {
-    I(this, kernel->scheduler_lock.lock_held_by_current_thread());
+    I(this, lock.lock_held_by_current_thread());
     for (size_t i = 0; i < dead_tasks.length(); ) {
         rust_task *task = dead_tasks[i];
         // Make sure this task isn't still running somewhere else...
@@ -93,6 +95,7 @@ rust_scheduler::reap_dead_tasks(int id) {
                 "deleting unreferenced dead task %s @0x%" PRIxPTR,
                 task->name, task);
             delete task;
+            sync::decrement(kernel->live_tasks);
             continue;
         }
         ++i;
@@ -180,9 +183,9 @@ rust_scheduler::log_state() {
  * Returns once no more tasks can be scheduled and all task ref_counts
  * drop to zero.
  */
-int
-rust_scheduler::start_main_loop(int id) {
-    kernel->scheduler_lock.lock();
+void
+rust_scheduler::start_main_loop() {
+    lock.lock();
 
     // Make sure someone is watching, to pull us out of infinite loops.
     //
@@ -193,11 +196,11 @@ rust_scheduler::start_main_loop(int id) {
 
     DLOG(this, dom, "started domain loop %d", id);
 
-    while (number_of_live_tasks() > 0) {
+    while (kernel->live_tasks > 0) {
         A(this, kernel->is_deadlocked() == false, "deadlock");
 
-        DLOG(this, dom, "worker %d, number_of_live_tasks = %d",
-             id, number_of_live_tasks());
+        DLOG(this, dom, "worker %d, number_of_live_tasks = %d, total = %d",
+             id, number_of_live_tasks(), kernel->live_tasks);
 
         drain_incoming_message_queue(true);
 
@@ -212,11 +215,12 @@ rust_scheduler::start_main_loop(int id) {
             DLOG(this, task,
                  "all tasks are blocked, scheduler id %d yielding ...",
                  id);
-            kernel->scheduler_lock.unlock();
+            lock.unlock();
             sync::sleep(100);
-            kernel->scheduler_lock.lock();
+            lock.lock();
             DLOG(this, task,
                 "scheduler resuming ...");
+            reap_dead_tasks(id);
             continue;
         }
 
@@ -264,19 +268,18 @@ rust_scheduler::start_main_loop(int id) {
                 "scheduler yielding ...",
                 dead_tasks.length());
             log_state();
-            kernel->scheduler_lock.unlock();
+            lock.unlock();
             sync::yield();
-            kernel->scheduler_lock.lock();
+            lock.lock();
         } else {
             drain_incoming_message_queue(true);
         }
         reap_dead_tasks(id);
     }
 
-    DLOG(this, dom, "finished main-loop %d (dom.rval = %d)", id, rval);
+    DLOG(this, dom, "finished main-loop %d", id);
 
-    kernel->scheduler_lock.unlock();
-    return rval;
+    lock.unlock();
 }
 
 rust_crate_cache *
@@ -296,9 +299,16 @@ rust_scheduler::create_task(rust_task *spawner, const char *name) {
         task->on_wakeup(spawner->_on_wakeup);
     }
     newborn_tasks.append(task);
+
+    sync::increment(kernel->live_tasks);
+
     return task;
 }
 
+void rust_scheduler::run() {
+    this->start_main_loop();
+}
+
 //
 // Local Variables:
 // mode: C++
diff --git a/src/rt/rust_scheduler.h b/src/rt/rust_scheduler.h
index cabcdf210a8..c53e6157f06 100644
--- a/src/rt/rust_scheduler.h
+++ b/src/rt/rust_scheduler.h
@@ -27,7 +27,8 @@ public:
 };
 
 struct rust_scheduler : public kernel_owned<rust_scheduler>,
-                        rc_base<rust_scheduler>
+                        rc_base<rust_scheduler>,
+                        rust_thread
 {
     // Fields known to the compiler:
     uintptr_t interrupt_flag;
@@ -46,7 +47,6 @@ struct rust_scheduler : public kernel_owned<rust_scheduler>,
     rust_crate_cache cache;
 
     randctx rctx;
-    int rval;
 
     rust_kernel *kernel;
     int32_t list_index;
@@ -57,6 +57,10 @@ struct rust_scheduler : public kernel_owned<rust_scheduler>,
     // Incoming messages from other domains.
     rust_message_queue *message_queue;
 
+    const int id;
+
+    lock_and_signal lock;
+
 #ifndef __WIN32__
     pthread_attr_t attr;
 #endif
@@ -64,8 +68,8 @@ struct rust_scheduler : public kernel_owned<rust_scheduler>,
     // Only a pointer to 'name' is kept, so it must live as long as this
     // domain.
     rust_scheduler(rust_kernel *kernel,
-             rust_message_queue *message_queue, rust_srv *srv,
-             const char *name);
+                   rust_message_queue *message_queue, rust_srv *srv,
+                   int id);
     ~rust_scheduler();
     void activate(rust_task *task);
     void log(rust_task *task, uint32_t level, char const *fmt, ...);
@@ -80,11 +84,13 @@ struct rust_scheduler : public kernel_owned<rust_scheduler>,
     void reap_dead_tasks(int id);
     rust_task *schedule_task(int id);
 
-    int start_main_loop(int id);
+    void start_main_loop();
 
     void log_state();
 
     rust_task *create_task(rust_task *spawner, const char *name);
+
+    virtual void run();
 };
 
 inline rust_log &
diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp
index de6b00acb3f..10ea48f57c2 100644
--- a/src/rt/rust_task.cpp
+++ b/src/rt/rust_task.cpp
@@ -83,7 +83,8 @@ rust_task::rust_task(rust_scheduler *sched, rust_task_list *state,
     pinned_on(-1),
     local_region(&sched->srv->local_region),
     _on_wakeup(NULL),
-    failed(false)
+    failed(false),
+    propagate_failure(true)
 {
     LOGPTR(sched, "new task", (uintptr_t)this);
     DLOG(sched, task, "sizeof(task) = %d (0x%x)", sizeof *this, sizeof *this);
@@ -207,8 +208,8 @@ rust_task::kill() {
     // Unblock the task so it can unwind.
     unblock();
 
-    // if (this == sched->root_task)
-    //     sched->fail();
+    if (NULL == supervisor && propagate_failure)
+        sched->fail();
 
     LOG(this, task, "preparing to unwind task: 0x%" PRIxPTR, this);
     // run_on_resume(rust_unwind_glue);
@@ -229,6 +230,8 @@ rust_task::fail() {
         supervisor->kill();
     }
     // FIXME: implement unwinding again.
+    if (NULL == supervisor && propagate_failure)
+        sched->fail();
     failed = true;
 }
 
@@ -248,6 +251,7 @@ rust_task::unsupervise()
              " disconnecting from supervisor %s @0x%" PRIxPTR,
              name, this, supervisor->name, supervisor);
     supervisor = NULL;
+    propagate_failure = false;
 }
 
 void
@@ -397,8 +401,8 @@ rust_task::free(void *p, bool is_gc)
 
 void
 rust_task::transition(rust_task_list *src, rust_task_list *dst) {
-    I(sched, !kernel->scheduler_lock.lock_held_by_current_thread());
-    scoped_lock with(kernel->scheduler_lock);
+    I(sched, !sched->lock.lock_held_by_current_thread());
+    scoped_lock with(sched->lock);
     DLOG(sched, task,
          "task %s " PTR " state change '%s' -> '%s' while in '%s'",
          name, (uintptr_t)this, src->name, dst->name, state->name);
diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h
index b1984b9d40b..9b1a3a39582 100644
--- a/src/rt/rust_task.h
+++ b/src/rt/rust_task.h
@@ -91,6 +91,7 @@ rust_task : public maybe_proxy<rust_task>,
 
     // Indicates that the task ended in failure
     bool failed;
+    bool propagate_failure;
 
     lock_and_signal lock;
 
diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp
index 8313399130c..3415b6b62ae 100644
--- a/src/rt/rust_upcall.cpp
+++ b/src/rt/rust_upcall.cpp
@@ -541,9 +541,9 @@ extern "C" CDECL rust_task *
 upcall_new_task(rust_task *spawner, rust_vec *name) {
     // name is a rust string structure.
     LOG_UPCALL_ENTRY(spawner);
-    scoped_lock with(spawner->kernel->scheduler_lock);
-    rust_scheduler *sched = spawner->sched;
-    rust_task *task = sched->create_task(spawner, (const char *)name->data);
+    scoped_lock with(spawner->sched->lock);
+    rust_task *task =
+        spawner->kernel->create_task(spawner, (const char *)name->data);
     return task;
 }
 
@@ -584,7 +584,7 @@ upcall_ivec_resize_shared(rust_task *task,
                           rust_ivec *v,
                           size_t newsz) {
     LOG_UPCALL_ENTRY(task);
-    scoped_lock with(task->kernel->scheduler_lock);
+    scoped_lock with(task->sched->lock);
     I(task->sched, !v->fill);
 
     size_t new_alloc = next_power_of_two(newsz);
@@ -604,7 +604,7 @@ upcall_ivec_spill_shared(rust_task *task,
                          rust_ivec *v,
                          size_t newsz) {
     LOG_UPCALL_ENTRY(task);
-    scoped_lock with(task->kernel->scheduler_lock);
+    scoped_lock with(task->sched->lock);
     size_t new_alloc = next_power_of_two(newsz);
 
     rust_ivec_heap *heap_part = (rust_ivec_heap *)
diff --git a/src/rt/rust_util.h b/src/rt/rust_util.h
index e1644e9f3cf..89e7f2e7bed 100644
--- a/src/rt/rust_util.h
+++ b/src/rt/rust_util.h
@@ -126,8 +126,9 @@ next_power_of_two(size_t s)
 
 // Initialization helper for ISAAC RNG
 
+template <typename sched_or_kernel>
 static inline void
-isaac_init(rust_scheduler *sched, randctx *rctx)
+isaac_init(sched_or_kernel *sched, randctx *rctx)
 {
         memset(rctx, 0, sizeof(randctx));
 
diff --git a/src/rt/sync/sync.h b/src/rt/sync/sync.h
index a932ef1c2ca..8298f402881 100644
--- a/src/rt/sync/sync.h
+++ b/src/rt/sync/sync.h
@@ -1,4 +1,4 @@
-// -*- c++-mode -*-
+// -*- c++ -*-
 #ifndef SYNC_H
 #define SYNC_H
 
diff --git a/src/rt/test/rust_test_runtime.cpp b/src/rt/test/rust_test_runtime.cpp
index f9a99d9acb1..1e7c10944a7 100644
--- a/src/rt/test/rust_test_runtime.cpp
+++ b/src/rt/test/rust_test_runtime.cpp
@@ -11,17 +11,16 @@ rust_test_runtime::~rust_test_runtime() {
 
 void
 rust_domain_test::worker::run() {
-    rust_scheduler *handle = kernel->get_scheduler();
     for (int i = 0; i < TASKS; i++) {
-        handle->create_task(NULL, "child");
+        kernel->create_task(NULL, "child");
     }
-    sync::sleep(rand(&handle->rctx) % 1000);
+    //sync::sleep(rand(&handle->rctx) % 1000);
 }
 
 bool
 rust_domain_test::run() {
     rust_srv srv;
-    rust_kernel kernel(&srv);
+    rust_kernel kernel(&srv, 1);
 
     array_list<worker *> workers;
     for (int i = 0; i < DOMAINS; i++) {
@@ -47,13 +46,13 @@ void
 rust_task_test::worker::run() {
     rust_task *root_task = kernel->create_task(NULL, "main");
     root_task->start((uintptr_t)&task_entry, (uintptr_t)NULL);
-    root_task->sched->start_main_loop(0);
+    root_task->sched->start_main_loop();
 }
 
 bool
 rust_task_test::run() {
     rust_srv srv;
-    rust_kernel kernel(&srv);
+    rust_kernel kernel(&srv, 1);
 
     array_list<worker *> workers;
     for (int i = 0; i < DOMAINS; i++) {
@@ -62,6 +61,6 @@ rust_task_test::run() {
         worker->start();
     }
 
-    sync::sleep(rand(&kernel.sched->rctx) % 1000);
+    //sync::sleep(rand(&kernel.sched->rctx) % 1000);
     return true;
 }
diff --git a/src/test/run-pass/lib-task.rs b/src/test/run-pass/lib-task.rs
new file mode 100644
index 00000000000..313ec8afcf1
--- /dev/null
+++ b/src/test/run-pass/lib-task.rs
@@ -0,0 +1,41 @@
+
+
+// xfail-stage0
+
+use std;
+import std::task;
+
+fn test_sleep() { task::sleep(1000000u); }
+
+fn test_unsupervise() {
+  fn f() {
+    task::unsupervise();
+    fail;
+  }
+  spawn f();
+}
+
+fn test_join() {
+  fn winner() {
+  }
+
+  auto wintask = spawn winner();
+
+  assert task::join(wintask) == task::tr_success;
+
+  fn failer() {
+    task::unsupervise();
+    fail;
+  }
+
+  auto failtask = spawn failer();
+
+  assert task::join(failtask) == task::tr_failure;
+}
+
+fn main() {
+  // FIXME: Why aren't we running this?
+  //test_sleep();
+  test_unsupervise();
+  test_join();
+}