about summary refs log tree commit diff
path: root/src/libstd/rt
diff options
context:
space:
mode:
authorErick Tryzelaar <erick.tryzelaar@gmail.com>2013-08-08 19:27:03 -0700
committerErick Tryzelaar <erick.tryzelaar@gmail.com>2013-08-08 19:27:03 -0700
commit56730c094cf95be58fb05b0e423673aca2a98b88 (patch)
tree096a652b16d38a6f4ff65dd39657ccf308249909 /src/libstd/rt
parent03cc757fe90b88895fcf911d9cce5c04a008b127 (diff)
parent936f70bd878327d867b6f8f82061d738355a47c9 (diff)
downloadrust-56730c094cf95be58fb05b0e423673aca2a98b88.tar.gz
rust-56730c094cf95be58fb05b0e423673aca2a98b88.zip
Merge remote-tracking branch 'remotes/origin/master' into remove-str-trailing-nulls
Diffstat (limited to 'src/libstd/rt')
-rw-r--r--src/libstd/rt/comm.rs40
-rw-r--r--src/libstd/rt/env.rs28
-rw-r--r--src/libstd/rt/io/mem.rs2
-rw-r--r--src/libstd/rt/kill.rs23
-rw-r--r--src/libstd/rt/local.rs11
-rw-r--r--src/libstd/rt/mod.rs40
-rw-r--r--src/libstd/rt/sched.rs177
-rw-r--r--src/libstd/rt/select.rs6
-rw-r--r--src/libstd/rt/task.rs45
-rw-r--r--src/libstd/rt/test.rs33
10 files changed, 281 insertions, 124 deletions
diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs
index a060059f5fc..936a6526508 100644
--- a/src/libstd/rt/comm.rs
+++ b/src/libstd/rt/comm.rs
@@ -159,7 +159,7 @@ impl<T> ChanOne<T> {
                     // Port is blocked. Wake it up.
                     let recvr = BlockedTask::cast_from_uint(task_as_state);
                     if do_resched {
-                        do recvr.wake().map_consume |woken_task| {
+                        do recvr.wake().map_move |woken_task| {
                             Scheduler::run_task(woken_task);
                         };
                     } else {
@@ -225,9 +225,10 @@ impl<T> Select for PortOne<T> {
     fn optimistic_check(&mut self) -> bool {
         // The optimistic check is never necessary for correctness. For testing
         // purposes, making it randomly return false simulates a racing sender.
-        use rand::{Rand, rng};
-        let mut rng = rng();
-        let actually_check = Rand::rand(&mut rng);
+        use rand::{Rand};
+        let actually_check = do Local::borrow::<Scheduler, bool> |sched| {
+            Rand::rand(&mut sched.rng)
+        };
         if actually_check {
             unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
         } else {
@@ -381,7 +382,7 @@ impl<T> Drop for ChanOne<T> {
                     // The port is blocked waiting for a message we will never send. Wake it.
                     assert!((*this.packet()).payload.is_none());
                     let recvr = BlockedTask::cast_from_uint(task_as_state);
-                    do recvr.wake().map_consume |woken_task| {
+                    do recvr.wake().map_move |woken_task| {
                         Scheduler::run_task(woken_task);
                     };
                 }
@@ -508,7 +509,11 @@ impl<T> Peekable<T> for Port<T> {
     }
 }
 
-impl<T> Select for Port<T> {
+// XXX: Kind of gross. A Port<T> should be selectable so you can make an array
+// of them, but a &Port<T> should also be selectable so you can select2 on it
+// alongside a PortOne<U> without passing the port by value in recv_ready.
+
+impl<'self, T> Select for &'self Port<T> {
     #[inline]
     fn optimistic_check(&mut self) -> bool {
         do self.next.with_mut_ref |pone| { pone.optimistic_check() }
@@ -526,12 +531,29 @@ impl<T> Select for Port<T> {
     }
 }
 
-impl<T> SelectPort<(T, Port<T>)> for Port<T> {
-    fn recv_ready(self) -> Option<(T, Port<T>)> {
+impl<T> Select for Port<T> {
+    #[inline]
+    fn optimistic_check(&mut self) -> bool {
+        (&*self).optimistic_check()
+    }
+
+    #[inline]
+    fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
+        (&*self).block_on(sched, task)
+    }
+
+    #[inline]
+    fn unblock_from(&mut self) -> bool {
+        (&*self).unblock_from()
+    }
+}
+
+impl<'self, T> SelectPort<T> for &'self Port<T> {
+    fn recv_ready(self) -> Option<T> {
         match self.next.take().recv_ready() {
             Some(StreamPayload { val, next }) => {
                 self.next.put_back(next);
-                Some((val, self))
+                Some(val)
             }
             None => None
         }
diff --git a/src/libstd/rt/env.rs b/src/libstd/rt/env.rs
index 1d7ff173149..6e671742fb6 100644
--- a/src/libstd/rt/env.rs
+++ b/src/libstd/rt/env.rs
@@ -10,7 +10,12 @@
 
 //! Runtime environment settings
 
+use from_str::FromStr;
 use libc::{size_t, c_char, c_int};
+use option::{Some, None};
+use os;
+
+// OLD RT stuff
 
 pub struct Environment {
     /// The number of threads to use by default
@@ -47,3 +52,26 @@ pub fn get() -> &Environment {
 extern {
     fn rust_get_rt_env() -> &Environment;
 }
+
+// NEW RT stuff
+
+// Note that these are all accessed without any synchronization.
+// They are expected to be initialized once then left alone.
+
+static mut MIN_STACK: uint = 2000000;
+
+pub fn init() {
+    unsafe {
+        match os::getenv("RUST_MIN_STACK") {
+            Some(s) => match FromStr::from_str(s) {
+                Some(i) => MIN_STACK = i,
+                None => ()
+            },
+            None => ()
+        }
+    }
+}
+
+pub fn min_stack() -> uint {
+    unsafe { MIN_STACK }
+}
diff --git a/src/libstd/rt/io/mem.rs b/src/libstd/rt/io/mem.rs
index c93945a6a9a..277897e5d2e 100644
--- a/src/libstd/rt/io/mem.rs
+++ b/src/libstd/rt/io/mem.rs
@@ -26,7 +26,7 @@ pub struct MemWriter {
 }
 
 impl MemWriter {
-    pub fn new() -> MemWriter { MemWriter { buf: ~[] } }
+    pub fn new() -> MemWriter { MemWriter { buf: vec::with_capacity(128) } }
 }
 
 impl Writer for MemWriter {
diff --git a/src/libstd/rt/kill.rs b/src/libstd/rt/kill.rs
index 3372c13b877..e07cb1425bf 100644
--- a/src/libstd/rt/kill.rs
+++ b/src/libstd/rt/kill.rs
@@ -402,10 +402,10 @@ impl KillHandle {
                     || {
                         // Prefer to check tombstones that were there first,
                         // being "more fair" at the expense of tail-recursion.
-                        others.take().map_consume_default(true, |f| f()) && {
+                        others.take().map_move_default(true, |f| f()) && {
                             let mut inner = this.take().unwrap();
                             (!inner.any_child_failed) &&
-                                inner.child_tombstones.take_map_default(true, |f| f())
+                                inner.child_tombstones.take().map_move_default(true, |f| f())
                         }
                     }
                 }
@@ -424,7 +424,7 @@ impl KillHandle {
                     let others = Cell::new(other_tombstones); // :(
                     || {
                         // Prefer fairness to tail-recursion, as in above case.
-                        others.take().map_consume_default(true, |f| f()) &&
+                        others.take().map_move_default(true, |f| f()) &&
                             f.take()()
                     }
                 }
@@ -493,7 +493,7 @@ impl Death {
         { use util; util::ignore(group); }
 
         // Step 1. Decide if we need to collect child failures synchronously.
-        do self.on_exit.take_map |on_exit| {
+        do self.on_exit.take().map_move |on_exit| {
             if success {
                 // We succeeded, but our children might not. Need to wait for them.
                 let mut inner = self.kill_handle.take_unwrap().unwrap();
@@ -501,7 +501,7 @@ impl Death {
                     success = false;
                 } else {
                     // Lockless access to tombstones protected by unwrap barrier.
-                    success = inner.child_tombstones.take_map_default(true, |f| f());
+                    success = inner.child_tombstones.take().map_move_default(true, |f| f());
                 }
             }
             on_exit(success);
@@ -510,12 +510,12 @@ impl Death {
         // Step 2. Possibly alert possibly-watching parent to failure status.
         // Note that as soon as parent_handle goes out of scope, the parent
         // can successfully unwrap its handle and collect our reported status.
-        do self.watching_parent.take_map |mut parent_handle| {
+        do self.watching_parent.take().map_move |mut parent_handle| {
             if success {
                 // Our handle might be None if we had an exit callback, and
                 // already unwrapped it. But 'success' being true means no
                 // child failed, so there's nothing to do (see below case).
-                do self.kill_handle.take_map |own_handle| {
+                do self.kill_handle.take().map_move |own_handle| {
                     own_handle.reparent_children_to(&mut parent_handle);
                 };
             } else {
@@ -590,7 +590,8 @@ impl Death {
     #[inline]
     pub fn assert_may_sleep(&self) {
         if self.wont_sleep != 0 {
-            rtabort!("illegal atomic-sleep: can't deschedule inside atomically()");
+            rtabort!("illegal atomic-sleep: attempt to reschedule while \
+                      using an Exclusive or LittleLock");
         }
     }
 }
@@ -614,6 +615,7 @@ mod test {
     // Test cases don't care about the spare killed flag.
     fn make_kill_handle() -> KillHandle { let (h,_) = KillHandle::new(); h }
 
+    #[ignore(reason = "linked failure")]
     #[test]
     fn no_tombstone_success() {
         do run_in_newsched_task {
@@ -819,6 +821,7 @@ mod test {
         }
     }
 
+    #[ignore(reason = "linked failure")]
     #[test]
     fn block_and_get_killed() {
         do with_test_task |mut task| {
@@ -830,6 +833,7 @@ mod test {
         }
     }
 
+    #[ignore(reason = "linked failure")]
     #[test]
     fn block_already_killed() {
         do with_test_task |mut task| {
@@ -839,6 +843,7 @@ mod test {
         }
     }
 
+    #[ignore(reason = "linked failure")]
     #[test]
     fn block_unkillably_and_get_killed() {
         do with_test_task |mut task| {
@@ -856,6 +861,7 @@ mod test {
         }
     }
 
+    #[ignore(reason = "linked failure")]
     #[test]
     fn block_on_pipe() {
         // Tests the "killable" path of casting to/from uint.
@@ -869,6 +875,7 @@ mod test {
         }
     }
 
+    #[ignore(reason = "linked failure")]
     #[test]
     fn block_unkillably_on_pipe() {
         // Tests the "indestructible" path of casting to/from uint.
diff --git a/src/libstd/rt/local.rs b/src/libstd/rt/local.rs
index 131507196b1..7154066e7b7 100644
--- a/src/libstd/rt/local.rs
+++ b/src/libstd/rt/local.rs
@@ -126,6 +126,7 @@ impl Local for IoFactoryObject {
 
 #[cfg(test)]
 mod test {
+    use option::None;
     use unstable::run_in_bare_thread;
     use rt::test::*;
     use super::*;
@@ -137,7 +138,7 @@ mod test {
         do run_in_bare_thread {
             local_ptr::init_tls_key();
             let mut sched = ~new_test_uv_sched();
-            let task = ~Task::new_root(&mut sched.stack_pool, || {});
+            let task = ~Task::new_root(&mut sched.stack_pool, None, || {});
             Local::put(task);
             let task: ~Task = Local::take();
             cleanup_task(task);
@@ -149,11 +150,11 @@ mod test {
         do run_in_bare_thread {
             local_ptr::init_tls_key();
             let mut sched = ~new_test_uv_sched();
-            let task = ~Task::new_root(&mut sched.stack_pool, || {});
+            let task = ~Task::new_root(&mut sched.stack_pool, None, || {});
             Local::put(task);
             let task: ~Task = Local::take();
             cleanup_task(task);
-            let task = ~Task::new_root(&mut sched.stack_pool, || {});
+            let task = ~Task::new_root(&mut sched.stack_pool, None, || {});
             Local::put(task);
             let task: ~Task = Local::take();
             cleanup_task(task);
@@ -166,7 +167,7 @@ mod test {
         do run_in_bare_thread {
             local_ptr::init_tls_key();
             let mut sched = ~new_test_uv_sched();
-            let task = ~Task::new_root(&mut sched.stack_pool, || {});
+            let task = ~Task::new_root(&mut sched.stack_pool, None, || {});
             Local::put(task);
 
             unsafe {
@@ -182,7 +183,7 @@ mod test {
         do run_in_bare_thread {
             local_ptr::init_tls_key();
             let mut sched = ~new_test_uv_sched();
-            let task = ~Task::new_root(&mut sched.stack_pool, || {});
+            let task = ~Task::new_root(&mut sched.stack_pool, None, || {});
             Local::put(task);
 
             let res = do Local::borrow::<Task,bool> |_task| {
diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs
index 760ca8a9ada..01a52892f63 100644
--- a/src/libstd/rt/mod.rs
+++ b/src/libstd/rt/mod.rs
@@ -63,8 +63,7 @@ Several modules in `core` are clients of `rt`:
 use cell::Cell;
 use clone::Clone;
 use container::Container;
-use iter::Times;
-use iterator::{Iterator, IteratorUtil};
+use iterator::{Iterator, IteratorUtil, range};
 use option::{Some, None};
 use ptr::RawPtr;
 use rt::local::Local;
@@ -212,6 +211,7 @@ pub fn init(argc: int, argv: **u8, crate_map: *u8) {
     // Need to propagate the unsafety to `start`.
     unsafe {
         args::init(argc, argv);
+        env::init();
         logging::init(crate_map);
         rust_update_gc_metadata(crate_map);
     }
@@ -246,11 +246,16 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
 
     let main = Cell::new(main);
 
-    // The shared list of sleeping schedulers. Schedulers wake each other
-    // occassionally to do new work.
+    // The shared list of sleeping schedulers.
     let sleepers = SleeperList::new();
-    // The shared work queue. Temporary until work stealing is implemented.
-    let work_queue = WorkQueue::new();
+
+    // Create a work queue for each scheduler, ntimes. Create an extra
+    // for the main thread if that flag is set. We won't steal from it.
+    let mut work_queues = ~[];
+    for _ in range(0u, nscheds) {
+        let work_queue: WorkQueue<~Task> = WorkQueue::new();
+        work_queues.push(work_queue);
+    }
 
     // The schedulers.
     let mut scheds = ~[];
@@ -258,12 +263,15 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
     // sent the Shutdown message to terminate the schedulers.
     let mut handles = ~[];
 
-    do nscheds.times {
+    for i in range(0u, nscheds) {
         rtdebug!("inserting a regular scheduler");
 
         // Every scheduler is driven by an I/O event loop.
         let loop_ = ~UvEventLoop::new();
-        let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone());
+        let mut sched = ~Scheduler::new(loop_,
+                                        work_queues[i].clone(),
+                                        work_queues.clone(),
+                                        sleepers.clone());
         let handle = sched.make_handle();
 
         scheds.push(sched);
@@ -279,9 +287,14 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
         let friend_handle = friend_sched.make_handle();
         scheds.push(friend_sched);
 
+        // This scheduler needs a queue that isn't part of the stealee
+        // set.
+        let work_queue = WorkQueue::new();
+
         let main_loop = ~UvEventLoop::new();
         let mut main_sched = ~Scheduler::new_special(main_loop,
-                                                     work_queue.clone(),
+                                                     work_queue,
+                                                     work_queues.clone(),
                                                      sleepers.clone(),
                                                      false,
                                                      Some(friend_handle));
@@ -330,8 +343,7 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
         // In the case where we do not use a main_thread scheduler we
         // run the main task in one of our threads.
 
-        let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool,
-                                            main.take());
+        let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool, None, main.take());
         main_task.death.on_exit = Some(on_exit.take());
         let main_task_cell = Cell::new(main_task);
 
@@ -351,7 +363,7 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
         let sched_cell = Cell::new(sched);
         let thread = do Thread::start {
             let mut sched = sched_cell.take();
-            let bootstrap_task = ~do Task::new_root(&mut sched.stack_pool) || {
+            let bootstrap_task = ~do Task::new_root(&mut sched.stack_pool, None) || {
                 rtdebug!("boostraping a non-primary scheduler");
             };
             sched.bootstrap(bootstrap_task);
@@ -368,10 +380,10 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
         let mut main_sched = main_sched.unwrap();
 
         let home = Sched(main_sched.make_handle());
-        let mut main_task = ~Task::new_root_homed(&mut main_sched.stack_pool,
+        let mut main_task = ~Task::new_root_homed(&mut main_sched.stack_pool, None,
                                                   home, main.take());
         main_task.death.on_exit = Some(on_exit.take());
-        rtdebug!("boostrapping main_task");
+        rtdebug!("bootstrapping main_task");
 
         main_sched.bootstrap(main_task);
     }
diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs
index 1a75f2569b5..ce4e64c47d2 100644
--- a/src/libstd/rt/sched.rs
+++ b/src/libstd/rt/sched.rs
@@ -13,7 +13,6 @@ use option::{Option, Some, None};
 use cast::{transmute, transmute_mut_region, transmute_mut_unsafe};
 use clone::Clone;
 use unstable::raw;
-
 use super::sleeper_list::SleeperList;
 use super::work_queue::WorkQueue;
 use super::stack::{StackPool};
@@ -28,6 +27,9 @@ use rt::rtio::RemoteCallback;
 use rt::metrics::SchedMetrics;
 use borrow::{to_uint};
 use cell::Cell;
+use rand::{XorShiftRng, RngUtil};
+use iterator::{range};
+use vec::{OwnedVector};
 
 /// The Scheduler is responsible for coordinating execution of Coroutines
 /// on a single thread. When the scheduler is running it is owned by
@@ -37,9 +39,11 @@ use cell::Cell;
 /// XXX: This creates too many callbacks to run_sched_once, resulting
 /// in too much allocation and too many events.
 pub struct Scheduler {
-    /// A queue of available work. Under a work-stealing policy there
-    /// is one per Scheduler.
-    work_queue: WorkQueue<~Task>,
+    /// There are N work queues, one per scheduler.
+    priv work_queue: WorkQueue<~Task>,
+    /// Work queues for the other schedulers. These are created by
+    /// cloning the core work queues.
+    work_queues: ~[WorkQueue<~Task>],
     /// The queue of incoming messages from other schedulers.
     /// These are enqueued by SchedHandles after which a remote callback
     /// is triggered to handle the message.
@@ -70,7 +74,10 @@ pub struct Scheduler {
     run_anything: bool,
     /// If the scheduler shouldn't run some tasks, a friend to send
     /// them to.
-    friend_handle: Option<SchedHandle>
+    friend_handle: Option<SchedHandle>,
+    /// A fast XorShift rng for scheduler use
+    rng: XorShiftRng
+
 }
 
 pub struct SchedHandle {
@@ -97,10 +104,13 @@ impl Scheduler {
 
     pub fn new(event_loop: ~EventLoopObject,
                work_queue: WorkQueue<~Task>,
+               work_queues: ~[WorkQueue<~Task>],
                sleeper_list: SleeperList)
         -> Scheduler {
 
-        Scheduler::new_special(event_loop, work_queue, sleeper_list, true, None)
+        Scheduler::new_special(event_loop, work_queue,
+                               work_queues,
+                               sleeper_list, true, None)
 
     }
 
@@ -108,6 +118,7 @@ impl Scheduler {
     // task field is None.
     pub fn new_special(event_loop: ~EventLoopObject,
                        work_queue: WorkQueue<~Task>,
+                       work_queues: ~[WorkQueue<~Task>],
                        sleeper_list: SleeperList,
                        run_anything: bool,
                        friend: Option<SchedHandle>)
@@ -120,12 +131,14 @@ impl Scheduler {
             no_sleep: false,
             event_loop: event_loop,
             work_queue: work_queue,
+            work_queues: work_queues,
             stack_pool: StackPool::new(),
             sched_task: None,
             cleanup_job: None,
             metrics: SchedMetrics::new(),
             run_anything: run_anything,
-            friend_handle: friend
+            friend_handle: friend,
+            rng: XorShiftRng::new()
         }
     }
 
@@ -248,7 +261,7 @@ impl Scheduler {
 
         // Second activity is to try resuming a task from the queue.
 
-        let result = sched.resume_task_from_queue();
+        let result = sched.do_work();
         let mut sched = match result {
             Some(sched) => {
                 // Failed to dequeue a task, so we return.
@@ -325,7 +338,7 @@ impl Scheduler {
     /// As enqueue_task, but with the possibility for the blocked task to
     /// already have been killed.
     pub fn enqueue_blocked_task(&mut self, blocked_task: BlockedTask) {
-        do blocked_task.wake().map_consume |task| {
+        do blocked_task.wake().map_move |task| {
             self.enqueue_task(task);
         };
     }
@@ -415,47 +428,98 @@ impl Scheduler {
         }
     }
 
-    // Resume a task from the queue - but also take into account that
-    // it might not belong here.
+    // Workstealing: In this iteration of the runtime each scheduler
+    // thread has a distinct work queue. When no work is available
+    // locally, make a few attempts to steal work from the queues of
+    // other scheduler threads. If a few steals fail we end up in the
+    // old "no work" path which is fine.
+
+    // First step in the process is to find a task. This function does
+    // that by first checking the local queue, and if there is no work
+    // there, trying to steal from the remote work queues.
+    fn find_work(&mut self) -> Option<~Task> {
+        rtdebug!("scheduler looking for work");
+        match self.work_queue.pop() {
+            Some(task) => {
+                rtdebug!("found a task locally");
+                return Some(task)
+            }
+            None => {
+                // Our naive stealing, try kinda hard.
+                rtdebug!("scheduler trying to steal");
+                let _len = self.work_queues.len();
+                return self.try_steals(2);
+            }
+        }
+    }
+
+    // With no backoff try stealing n times from the queues the
+    // scheduler knows about. This naive implementation can steal from
+    // our own queue or from other special schedulers.
+    fn try_steals(&mut self, n: uint) -> Option<~Task> {
+        for _ in range(0, n) {
+            let index = self.rng.gen_uint_range(0, self.work_queues.len());
+            let work_queues = &mut self.work_queues;
+            match work_queues[index].steal() {
+                Some(task) => {
+                    rtdebug!("found task by stealing"); return Some(task)
+                }
+                None => ()
+            }
+        };
+        rtdebug!("giving up on stealing");
+        return None;
+    }
 
-    // If we perform a scheduler action we give away the scheduler ~
-    // pointer, if it is still available we return it.
+    // Given a task, execute it correctly.
+    fn process_task(~self, task: ~Task) -> Option<~Scheduler> {
+        let mut this = self;
+        let mut task = task;
 
-    fn resume_task_from_queue(~self) -> Option<~Scheduler> {
+        rtdebug!("processing a task");
 
+        let home = task.take_unwrap_home();
+        match home {
+            Sched(home_handle) => {
+                if home_handle.sched_id != this.sched_id() {
+                    rtdebug!("sending task home");
+                    task.give_home(Sched(home_handle));
+                    Scheduler::send_task_home(task);
+                    return Some(this);
+                } else {
+                    rtdebug!("running task here");
+                    task.give_home(Sched(home_handle));
+                    this.resume_task_immediately(task);
+                    return None;
+                }
+            }
+            AnySched if this.run_anything => {
+                rtdebug!("running anysched task here");
+                task.give_home(AnySched);
+                this.resume_task_immediately(task);
+                return None;
+            }
+            AnySched => {
+                rtdebug!("sending task to friend");
+                task.give_home(AnySched);
+                this.send_to_friend(task);
+                return Some(this);
+            }
+        }
+    }
+
+    // Bundle the helpers together.
+    fn do_work(~self) -> Option<~Scheduler> {
         let mut this = self;
 
-        match this.work_queue.pop() {
+        rtdebug!("scheduler calling do work");
+        match this.find_work() {
             Some(task) => {
-                let mut task = task;
-                let home = task.take_unwrap_home();
-                match home {
-                    Sched(home_handle) => {
-                        if home_handle.sched_id != this.sched_id() {
-                            task.give_home(Sched(home_handle));
-                            Scheduler::send_task_home(task);
-                            return Some(this);
-                        } else {
-                            this.event_loop.callback(Scheduler::run_sched_once);
-                            task.give_home(Sched(home_handle));
-                            this.resume_task_immediately(task);
-                            return None;
-                        }
-                    }
-                    AnySched if this.run_anything => {
-                        this.event_loop.callback(Scheduler::run_sched_once);
-                        task.give_home(AnySched);
-                        this.resume_task_immediately(task);
-                        return None;
-                    }
-                    AnySched => {
-                        task.give_home(AnySched);
-                        this.send_to_friend(task);
-                        return Some(this);
-                    }
-                }
+                rtdebug!("found some work! processing the task");
+                return this.process_task(task);
             }
             None => {
+                rtdebug!("no work was found, returning the scheduler struct");
                 return Some(this);
             }
         }
@@ -533,7 +597,7 @@ impl Scheduler {
                 sched.enqueue_blocked_task(last_task);
             }
         };
-        opt.map_consume(Local::put);
+        opt.map_move(Local::put);
     }
 
     // The primary function for changing contexts. In the current
@@ -711,7 +775,6 @@ impl Scheduler {
             GiveTask(task, f) => f.to_fn()(self, task)
         }
     }
-
 }
 
 // The cases for the below function.
@@ -745,6 +808,8 @@ impl ClosureConverter for UnsafeTaskReceiver {
 
 #[cfg(test)]
 mod test {
+    extern mod extra;
+
     use prelude::*;
     use rt::test::*;
     use unstable::run_in_bare_thread;
@@ -833,7 +898,7 @@ mod test {
             let mut sched = ~new_test_uv_sched();
             let sched_handle = sched.make_handle();
 
-            let mut task = ~do Task::new_root_homed(&mut sched.stack_pool,
+            let mut task = ~do Task::new_root_homed(&mut sched.stack_pool, None,
                                                 Sched(sched_handle)) {
                 unsafe { *task_ran_ptr = true };
                 assert!(Task::on_appropriate_sched());
@@ -862,12 +927,15 @@ mod test {
         do run_in_bare_thread {
 
             let sleepers = SleeperList::new();
-            let work_queue = WorkQueue::new();
+            let normal_queue = WorkQueue::new();
+            let special_queue = WorkQueue::new();
+            let queues = ~[normal_queue.clone(), special_queue.clone()];
 
             // Our normal scheduler
             let mut normal_sched = ~Scheduler::new(
                 ~UvEventLoop::new(),
-                work_queue.clone(),
+                normal_queue,
+                queues.clone(),
                 sleepers.clone());
 
             let normal_handle = Cell::new(normal_sched.make_handle());
@@ -877,7 +945,8 @@ mod test {
             // Our special scheduler
             let mut special_sched = ~Scheduler::new_special(
                 ~UvEventLoop::new(),
-                work_queue.clone(),
+                special_queue.clone(),
+                queues.clone(),
                 sleepers.clone(),
                 false,
                 Some(friend_handle));
@@ -893,21 +962,21 @@ mod test {
             //   3) task not homed, sched requeues
             //   4) task not home, send home
 
-            let task1 = ~do Task::new_root_homed(&mut special_sched.stack_pool,
+            let task1 = ~do Task::new_root_homed(&mut special_sched.stack_pool, None,
                                                  Sched(t1_handle)) || {
                 rtassert!(Task::on_appropriate_sched());
             };
             rtdebug!("task1 id: **%u**", borrow::to_uint(task1));
 
-            let task2 = ~do Task::new_root(&mut normal_sched.stack_pool) {
+            let task2 = ~do Task::new_root(&mut normal_sched.stack_pool, None) {
                 rtassert!(Task::on_appropriate_sched());
             };
 
-            let task3 = ~do Task::new_root(&mut normal_sched.stack_pool) {
+            let task3 = ~do Task::new_root(&mut normal_sched.stack_pool, None) {
                 rtassert!(Task::on_appropriate_sched());
             };
 
-            let task4 = ~do Task::new_root_homed(&mut special_sched.stack_pool,
+            let task4 = ~do Task::new_root_homed(&mut special_sched.stack_pool, None,
                                                  Sched(t4_handle)) {
                 rtassert!(Task::on_appropriate_sched());
             };
@@ -923,7 +992,7 @@ mod test {
             let port = Cell::new(port);
             let chan = Cell::new(chan);
 
-            let normal_task = ~do Task::new_root(&mut normal_sched.stack_pool) {
+            let normal_task = ~do Task::new_root(&mut normal_sched.stack_pool, None) {
                 rtdebug!("*about to submit task2*");
                 Scheduler::run_task(task2.take());
                 rtdebug!("*about to submit task4*");
@@ -938,7 +1007,7 @@ mod test {
 
             rtdebug!("normal task: %u", borrow::to_uint(normal_task));
 
-            let special_task = ~do Task::new_root(&mut special_sched.stack_pool) {
+            let special_task = ~do Task::new_root(&mut special_sched.stack_pool, None) {
                 rtdebug!("*about to submit task1*");
                 Scheduler::run_task(task1.take());
                 rtdebug!("*about to submit task3*");
diff --git a/src/libstd/rt/select.rs b/src/libstd/rt/select.rs
index 006b777b71b..0e8d26e9482 100644
--- a/src/libstd/rt/select.rs
+++ b/src/libstd/rt/select.rs
@@ -182,6 +182,7 @@ mod test {
     fn select_stream() {
         use util;
         use comm::GenericChan;
+        use iter::Times;
 
         // Sends 10 buffered packets, and uses select to retrieve them all.
         // Puts the port in a different spot in the vector each time.
@@ -199,9 +200,7 @@ mod test {
                 // get it back out
                 util::swap(port.get_mut_ref(), &mut ports[index]);
                 // NB. Not recv(), because optimistic_check randomly fails.
-                let (data, new_port) = port.take_unwrap().recv_ready().unwrap();
-                assert!(data == 31337);
-                port = Some(new_port);
+                assert!(port.get_ref().recv_ready().unwrap() == 31337);
             }
         }
     }
@@ -265,6 +264,7 @@ mod test {
 
         fn select_racing_senders_helper(killable: bool, send_on_chans: ~[uint]) {
             use rt::test::spawntask_random;
+            use iter::Times;
 
             do run_in_newsched_task {
                 // A bit of stress, since ordinarily this is just smoke and mirrors.
diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs
index 4c5e4bdc3c1..364439a4526 100644
--- a/src/libstd/rt/task.rs
+++ b/src/libstd/rt/task.rs
@@ -20,6 +20,7 @@ use libc::{c_void, uintptr_t};
 use ptr;
 use prelude::*;
 use option::{Option, Some, None};
+use rt::env;
 use rt::kill::Death;
 use rt::local::Local;
 use rt::logging::StdErrLogger;
@@ -85,12 +86,13 @@ impl Task {
 
     // A helper to build a new task using the dynamically found
     // scheduler and task. Only works in GreenTask context.
-    pub fn build_homed_child(f: ~fn(), home: SchedHome) -> ~Task {
+    pub fn build_homed_child(stack_size: Option<uint>, f: ~fn(), home: SchedHome) -> ~Task {
         let f = Cell::new(f);
         let home = Cell::new(home);
         do Local::borrow::<Task, ~Task> |running_task| {
             let mut sched = running_task.sched.take_unwrap();
             let new_task = ~running_task.new_child_homed(&mut sched.stack_pool,
+                                                         stack_size,
                                                          home.take(),
                                                          f.take());
             running_task.sched = Some(sched);
@@ -98,25 +100,26 @@ impl Task {
         }
     }
 
-    pub fn build_child(f: ~fn()) -> ~Task {
-        Task::build_homed_child(f, AnySched)
+    pub fn build_child(stack_size: Option<uint>, f: ~fn()) -> ~Task {
+        Task::build_homed_child(stack_size, f, AnySched)
     }
 
-    pub fn build_homed_root(f: ~fn(), home: SchedHome) -> ~Task {
+    pub fn build_homed_root(stack_size: Option<uint>, f: ~fn(), home: SchedHome) -> ~Task {
         let f = Cell::new(f);
         let home = Cell::new(home);
         do Local::borrow::<Task, ~Task> |running_task| {
             let mut sched = running_task.sched.take_unwrap();
             let new_task = ~Task::new_root_homed(&mut sched.stack_pool,
-                                                    home.take(),
-                                                    f.take());
+                                                 stack_size,
+                                                 home.take(),
+                                                 f.take());
             running_task.sched = Some(sched);
             new_task
         }
     }
 
-    pub fn build_root(f: ~fn()) -> ~Task {
-        Task::build_homed_root(f, AnySched)
+    pub fn build_root(stack_size: Option<uint>, f: ~fn()) -> ~Task {
+        Task::build_homed_root(stack_size, f, AnySched)
     }
 
     pub fn new_sched_task() -> Task {
@@ -137,17 +140,20 @@ impl Task {
     }
 
     pub fn new_root(stack_pool: &mut StackPool,
+                    stack_size: Option<uint>,
                     start: ~fn()) -> Task {
-        Task::new_root_homed(stack_pool, AnySched, start)
+        Task::new_root_homed(stack_pool, stack_size, AnySched, start)
     }
 
     pub fn new_child(&mut self,
                      stack_pool: &mut StackPool,
+                     stack_size: Option<uint>,
                      start: ~fn()) -> Task {
-        self.new_child_homed(stack_pool, AnySched, start)
+        self.new_child_homed(stack_pool, stack_size, AnySched, start)
     }
 
     pub fn new_root_homed(stack_pool: &mut StackPool,
+                          stack_size: Option<uint>,
                           home: SchedHome,
                           start: ~fn()) -> Task {
         Task {
@@ -160,7 +166,7 @@ impl Task {
             death: Death::new(),
             destroyed: false,
             name: None,
-            coroutine: Some(Coroutine::new(stack_pool, start)),
+            coroutine: Some(Coroutine::new(stack_pool, stack_size, start)),
             sched: None,
             task_type: GreenTask(Some(~home))
         }
@@ -168,6 +174,7 @@ impl Task {
 
     pub fn new_child_homed(&mut self,
                            stack_pool: &mut StackPool,
+                           stack_size: Option<uint>,
                            home: SchedHome,
                            start: ~fn()) -> Task {
         Task {
@@ -181,7 +188,7 @@ impl Task {
             death: self.death.new_child(),
             destroyed: false,
             name: None,
-            coroutine: Some(Coroutine::new(stack_pool, start)),
+            coroutine: Some(Coroutine::new(stack_pool, stack_size, start)),
             sched: None,
             task_type: GreenTask(Some(~home))
         }
@@ -325,11 +332,13 @@ impl Drop for Task {
 
 impl Coroutine {
 
-    pub fn new(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine {
-        static MIN_STACK_SIZE: uint = 3000000; // XXX: Too much stack
-
+    pub fn new(stack_pool: &mut StackPool, stack_size: Option<uint>, start: ~fn()) -> Coroutine {
+        let stack_size = match stack_size {
+            Some(size) => size,
+            None => env::min_stack()
+        };
         let start = Coroutine::build_start_wrapper(start);
-        let mut stack = stack_pool.take_segment(MIN_STACK_SIZE);
+        let mut stack = stack_pool.take_segment(stack_size);
         let initial_context = Context::new(start, &mut stack);
         Coroutine {
             current_stack_segment: stack,
@@ -465,10 +474,10 @@ mod test {
         do run_in_newsched_task() {
             static key: local_data::Key<@~str> = &local_data::Key;
             local_data::set(key, @~"data");
-            assert!(*local_data::get(key, |k| k.map(|&k| *k)).unwrap() == ~"data");
+            assert!(*local_data::get(key, |k| k.map_move(|k| *k)).unwrap() == ~"data");
             static key2: local_data::Key<@~str> = &local_data::Key;
             local_data::set(key2, @~"data");
-            assert!(*local_data::get(key2, |k| k.map(|&k| *k)).unwrap() == ~"data");
+            assert!(*local_data::get(key2, |k| k.map_move(|k| *k)).unwrap() == ~"data");
         }
     }
 
diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs
index 8b5215ae969..92366d5187f 100644
--- a/src/libstd/rt/test.rs
+++ b/src/libstd/rt/test.rs
@@ -15,8 +15,8 @@ use cell::Cell;
 use clone::Clone;
 use container::Container;
 use iterator::{Iterator, range};
-use vec::{OwnedVector, MutableVector};
 use super::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr};
+use vec::{OwnedVector, MutableVector, ImmutableVector};
 use rt::sched::Scheduler;
 use unstable::run_in_bare_thread;
 use rt::thread::Thread;
@@ -29,8 +29,12 @@ use result::{Result, Ok, Err};
 
 pub fn new_test_uv_sched() -> Scheduler {
 
+    let queue = WorkQueue::new();
+    let queues = ~[queue.clone()];
+
     let mut sched = Scheduler::new(~UvEventLoop::new(),
-                                   WorkQueue::new(),
+                                   queue,
+                                   queues,
                                    SleeperList::new());
 
     // Don't wait for the Shutdown message
@@ -57,7 +61,7 @@ pub fn run_in_newsched_task_core(f: ~fn()) {
         exit_handle.take().send(Shutdown);
         rtassert!(exit_status);
     };
-    let mut task = ~Task::new_root(&mut sched.stack_pool, f);
+    let mut task = ~Task::new_root(&mut sched.stack_pool, None, f);
     task.death.on_exit = Some(on_exit);
 
     sched.bootstrap(task);
@@ -164,15 +168,21 @@ pub fn run_in_mt_newsched_task(f: ~fn()) {
         };
 
         let sleepers = SleeperList::new();
-        let work_queue = WorkQueue::new();
 
         let mut handles = ~[];
         let mut scheds = ~[];
+        let mut work_queues = ~[];
 
         for _ in range(0u, nthreads) {
+            let work_queue = WorkQueue::new();
+            work_queues.push(work_queue);
+        }
+
+        for i in range(0u, nthreads) {
             let loop_ = ~UvEventLoop::new();
             let mut sched = ~Scheduler::new(loop_,
-                                            work_queue.clone(),
+                                            work_queues[i].clone(),
+                                            work_queues.clone(),
                                             sleepers.clone());
             let handle = sched.make_handle();
 
@@ -190,8 +200,7 @@ pub fn run_in_mt_newsched_task(f: ~fn()) {
 
             rtassert!(exit_status);
         };
-        let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool,
-                                        f.take());
+        let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool, None, f.take());
         main_task.death.on_exit = Some(on_exit);
 
         let mut threads = ~[];
@@ -209,7 +218,7 @@ pub fn run_in_mt_newsched_task(f: ~fn()) {
 
         while !scheds.is_empty() {
             let mut sched = scheds.pop();
-            let bootstrap_task = ~do Task::new_root(&mut sched.stack_pool) || {
+            let bootstrap_task = ~do Task::new_root(&mut sched.stack_pool, None) || {
                 rtdebug!("bootstrapping non-primary scheduler");
             };
             let bootstrap_task_cell = Cell::new(bootstrap_task);
@@ -232,12 +241,12 @@ pub fn run_in_mt_newsched_task(f: ~fn()) {
 
 /// Test tasks will abort on failure instead of unwinding
 pub fn spawntask(f: ~fn()) {
-    Scheduler::run_task(Task::build_child(f));
+    Scheduler::run_task(Task::build_child(None, f));
 }
 
 /// Create a new task and run it right now. Aborts on failure
 pub fn spawntask_later(f: ~fn()) {
-    Scheduler::run_task_later(Task::build_child(f));
+    Scheduler::run_task_later(Task::build_child(None, f));
 }
 
 pub fn spawntask_random(f: ~fn()) {
@@ -259,7 +268,7 @@ pub fn spawntask_try(f: ~fn()) -> Result<(),()> {
     let chan = Cell::new(chan);
     let on_exit: ~fn(bool) = |exit_status| chan.take().send(exit_status);
 
-    let mut new_task = Task::build_root(f);
+    let mut new_task = Task::build_root(None, f);
     new_task.death.on_exit = Some(on_exit);
 
     Scheduler::run_task(new_task);
@@ -285,7 +294,7 @@ pub fn spawntask_thread(f: ~fn()) -> Thread {
 pub fn with_test_task(blk: ~fn(~Task) -> ~Task) {
     do run_in_bare_thread {
         let mut sched = ~new_test_uv_sched();
-        let task = blk(~Task::new_root(&mut sched.stack_pool, ||{}));
+        let task = blk(~Task::new_root(&mut sched.stack_pool, None, ||{}));
         cleanup_task(task);
     }
 }