From d64d26cd39a86a40feb0db7a9147cc2ae5e82994 Mon Sep 17 00:00:00 2001 From: toddaaro Date: Mon, 10 Jun 2013 15:29:02 -0700 Subject: debugged a compiler ICE when merging local::borrow changes into the main io branch and modified the incoming new file lang.rs to be api-compatible --- src/libstd/task/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/libstd/task') diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs index f24d2327358..df5b88207ec 100644 --- a/src/libstd/task/mod.rs +++ b/src/libstd/task/mod.rs @@ -514,7 +514,7 @@ pub fn failing() -> bool { } _ => { let mut unwinding = false; - do Local::borrow:: |local| { + do Local::borrow:: |local| { unwinding = match local.unwinder { Some(unwinder) => { unwinder.unwinding -- cgit 1.4.1-3-g733a5 From fd148cd3e2d08ce15272f0690f6e41d2e85ee721 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Thu, 13 Jun 2013 22:43:20 -0700 Subject: std::rt: Change the Task constructors to reflect a tree --- src/libstd/rt/mod.rs | 4 ++-- src/libstd/rt/sched.rs | 22 +++++++++++----------- src/libstd/rt/task.rs | 26 ++++++++++++++++++++++++-- src/libstd/rt/test.rs | 46 ++++++++++++++++++++++++++++++++++++---------- src/libstd/task/spawn.rs | 9 ++++++++- 5 files changed, 81 insertions(+), 26 deletions(-) (limited to 'src/libstd/task') diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 2008c4a180f..a65b07fdbcf 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -167,7 +167,7 @@ pub fn start(_argc: int, _argv: **u8, crate_map: *u8, main: ~fn()) -> int { let sleepers = SleeperList::new(); let mut sched = ~Scheduler::new(loop_, work_queue, sleepers); sched.no_sleep = true; - let main_task = ~Coroutine::new(&mut sched.stack_pool, main); + let main_task = ~Coroutine::new_root(&mut sched.stack_pool, main); sched.enqueue_task(main_task); sched.run(); @@ -241,7 +241,7 @@ fn test_context() { do run_in_bare_thread { assert_eq!(context(), GlobalContext); let mut sched = ~new_test_uv_sched(); - let task = ~do Coroutine::new(&mut sched.stack_pool) { + let task = ~do Coroutine::new_root(&mut sched.stack_pool) { assert_eq!(context(), TaskContext); let sched = Local::take::(); do sched.deschedule_running_task_and_then() |sched, task| { diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 104eb4b8bae..9abcc2ec3cc 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -518,8 +518,8 @@ impl SchedHandle { } pub impl Coroutine { - fn new(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine { - Coroutine::with_task(stack_pool, ~Task::new(), start) + fn new_root(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine { + Coroutine::with_task(stack_pool, ~Task::new_root(), start) } fn with_task(stack_pool: &mut StackPool, @@ -614,7 +614,7 @@ mod test { let task_ran_ptr: *mut bool = &mut task_ran; let mut sched = ~new_test_uv_sched(); - let task = ~do Coroutine::new(&mut sched.stack_pool) { + let task = ~do Coroutine::new_root(&mut sched.stack_pool) { unsafe { *task_ran_ptr = true; } }; sched.enqueue_task(task); @@ -632,7 +632,7 @@ mod test { let mut sched = ~new_test_uv_sched(); for int::range(0, total) |_| { - let task = ~do Coroutine::new(&mut sched.stack_pool) { + let task = ~do Coroutine::new_root(&mut sched.stack_pool) { unsafe { *task_count_ptr = *task_count_ptr + 1; } }; sched.enqueue_task(task); @@ -649,10 +649,10 @@ mod test { let count_ptr: *mut int = &mut count; let mut sched = ~new_test_uv_sched(); - let task1 = ~do Coroutine::new(&mut sched.stack_pool) { + let task1 = ~do Coroutine::new_root(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; } let mut sched = Local::take::(); - let task2 = ~do Coroutine::new(&mut sched.stack_pool) { + let task2 = ~do Coroutine::new_root(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; } }; // Context switch directly to the new task @@ -677,7 +677,7 @@ mod test { let mut sched = ~new_test_uv_sched(); - let start_task = ~do Coroutine::new(&mut sched.stack_pool) { + let start_task = ~do Coroutine::new_root(&mut sched.stack_pool) { run_task(count_ptr); }; sched.enqueue_task(start_task); @@ -687,7 +687,7 @@ mod test { fn run_task(count_ptr: *mut int) { do Local::borrow:: |sched| { - let task = ~do Coroutine::new(&mut sched.stack_pool) { + let task = ~do Coroutine::new_root(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; if *count_ptr != MAX { @@ -705,7 +705,7 @@ mod test { fn test_block_task() { do run_in_bare_thread { let mut sched = ~new_test_uv_sched(); - let task = ~do Coroutine::new(&mut sched.stack_pool) { + let task = ~do Coroutine::new_root(&mut sched.stack_pool) { let sched = Local::take::(); assert!(sched.in_task_context()); do sched.deschedule_running_task_and_then() |sched, task| { @@ -752,13 +752,13 @@ mod test { let mut sched1 = ~new_test_uv_sched(); let handle1 = sched1.make_handle(); let handle1_cell = Cell(handle1); - let task1 = ~do Coroutine::new(&mut sched1.stack_pool) { + let task1 = ~do Coroutine::new_root(&mut sched1.stack_pool) { chan_cell.take().send(()); }; sched1.enqueue_task(task1); let mut sched2 = ~new_test_uv_sched(); - let task2 = ~do Coroutine::new(&mut sched2.stack_pool) { + let task2 = ~do Coroutine::new_root(&mut sched2.stack_pool) { port_cell.take().recv(); // Release the other scheduler's handle so it can exit handle1_cell.take(); diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index cf4967b12b3..10b4672df05 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -37,7 +37,7 @@ pub struct Unwinder { } impl Task { - pub fn new() -> Task { + pub fn new_root() -> Task { Task { heap: LocalHeap::new(), gc: GarbageCollector, @@ -48,7 +48,29 @@ impl Task { } } - pub fn without_unwinding() -> Task { + pub fn new_root_without_unwinding() -> Task { + Task { + heap: LocalHeap::new(), + gc: GarbageCollector, + storage: LocalStorage(ptr::null(), None), + logger: StdErrLogger, + unwinder: None, + destroyed: false + } + } + + pub fn new_child(&mut self) -> Task { + Task { + heap: LocalHeap::new(), + gc: GarbageCollector, + storage: LocalStorage(ptr::null(), None), + logger: StdErrLogger, + unwinder: Some(Unwinder { unwinding: false }), + destroyed: false + } + } + + pub fn new_child_without_unwinding(&mut self) -> Task { Task { heap: LocalHeap::new(), gc: GarbageCollector, diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index c8df3a61203..4a4d498a26e 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -48,7 +48,7 @@ pub fn run_in_newsched_task(f: ~fn()) { do run_in_bare_thread { let mut sched = ~new_test_uv_sched(); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::without_unwinding(), + ~Task::new_root_without_unwinding(), f.take()); sched.enqueue_task(task); sched.run(); @@ -94,7 +94,7 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { let f_cell = Cell(f_cell.take()); let handles = Cell(handles); - let main_task = ~do Coroutine::new(&mut scheds[0].stack_pool) { + let main_task = ~do Coroutine::new_root(&mut scheds[0].stack_pool) { f_cell.take()(); let mut handles = handles.take(); @@ -132,9 +132,14 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { pub fn spawntask(f: ~fn()) { use super::sched::*; + let mut task = None; + do Local::borrow::() |running_task| { + task = Some(~running_task.new_child_without_unwinding()); + } + let mut sched = Local::take::(); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::without_unwinding(), + task.swap_unwrap(), f); sched.schedule_new_task(task); } @@ -143,9 +148,14 @@ pub fn spawntask(f: ~fn()) { pub fn spawntask_immediately(f: ~fn()) { use super::sched::*; + let mut task = None; + do Local::borrow::() |running_task| { + task = Some(~running_task.new_child_without_unwinding()); + } + let mut sched = Local::take::(); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::without_unwinding(), + task.swap_unwrap(), f); do sched.switch_running_tasks_and_then(task) |sched, task| { sched.enqueue_task(task); @@ -156,9 +166,14 @@ pub fn spawntask_immediately(f: ~fn()) { pub fn spawntask_later(f: ~fn()) { use super::sched::*; + let mut task = None; + do Local::borrow::() |running_task| { + task = Some(~running_task.new_child_without_unwinding()); + } + let mut sched = Local::take::(); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::without_unwinding(), + task.swap_unwrap(), f); sched.enqueue_task(task); @@ -170,14 +185,19 @@ pub fn spawntask_random(f: ~fn()) { use super::sched::*; use rand::{Rand, rng}; - let mut rng = rng(); - let run_now: bool = Rand::rand(&mut rng); + let mut task = None; + do Local::borrow::() |running_task| { + task = Some(~running_task.new_child_without_unwinding()); + } let mut sched = Local::take::(); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::without_unwinding(), + task.swap_unwrap(), f); + let mut rng = rng(); + let run_now: bool = Rand::rand(&mut rng); + if run_now { do sched.switch_running_tasks_and_then(task) |sched, task| { sched.enqueue_task(task); @@ -206,7 +226,7 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> { do sched.deschedule_running_task_and_then() |sched, old_task| { let old_task = Cell(old_task); let f = f.take(); - let new_task = ~do Coroutine::new(&mut sched.stack_pool) { + let new_task = ~do Coroutine::new_root(&mut sched.stack_pool) { do (|| { (f.take())() }).finally { @@ -229,11 +249,17 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> { pub fn spawntask_thread(f: ~fn()) -> Thread { use rt::sched::*; + let mut task = None; + do Local::borrow::() |running_task| { + task = Some(~running_task.new_child_without_unwinding()); + } + + let task = Cell(task.swap_unwrap()); let f = Cell(f); let thread = do Thread::start { let mut sched = ~new_test_uv_sched(); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::without_unwinding(), + task.take(), f.take()); sched.enqueue_task(task); sched.run(); diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs index 5941221821a..a4fbec11d72 100644 --- a/src/libstd/task/spawn.rs +++ b/src/libstd/task/spawn.rs @@ -91,6 +91,7 @@ use uint; use util; use unstable::sync::{Exclusive, exclusive}; use rt::local::Local; +use rt::task::Task; #[cfg(test)] use task::default_task_opts; @@ -576,8 +577,14 @@ pub fn spawn_raw(opts: TaskOpts, f: ~fn()) { fn spawn_raw_newsched(_opts: TaskOpts, f: ~fn()) { use rt::sched::*; + let mut task = None; + do Local::borrow::() |running_task| { + task = Some(~running_task.new_child_without_unwinding()); + } + let mut sched = Local::take::(); - let task = ~Coroutine::new(&mut sched.stack_pool, f); + let task = ~Coroutine::with_task(&mut sched.stack_pool, + task.swap_unwrap(), f); sched.schedule_new_task(task); } -- cgit 1.4.1-3-g733a5 From 90fbe38f0064836fd5e169c520d3fd19953e5604 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Thu, 13 Jun 2013 23:16:27 -0700 Subject: std::rt: Tasks must have an unwinder. Simpler --- src/libstd/rt/task.rs | 39 ++++----------------------------------- src/libstd/rt/test.rs | 12 ++++++------ src/libstd/sys.rs | 6 +----- src/libstd/task/mod.rs | 11 +---------- src/libstd/task/spawn.rs | 2 +- 5 files changed, 13 insertions(+), 57 deletions(-) (limited to 'src/libstd/task') diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index 10b4672df05..7c08dabf0bd 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -25,7 +25,7 @@ pub struct Task { gc: GarbageCollector, storage: LocalStorage, logger: StdErrLogger, - unwinder: Option, + unwinder: Unwinder, destroyed: bool } @@ -43,18 +43,7 @@ impl Task { gc: GarbageCollector, storage: LocalStorage(ptr::null(), None), logger: StdErrLogger, - unwinder: Some(Unwinder { unwinding: false }), - destroyed: false - } - } - - pub fn new_root_without_unwinding() -> Task { - Task { - heap: LocalHeap::new(), - gc: GarbageCollector, - storage: LocalStorage(ptr::null(), None), - logger: StdErrLogger, - unwinder: None, + unwinder: Unwinder { unwinding: false }, destroyed: false } } @@ -65,18 +54,7 @@ impl Task { gc: GarbageCollector, storage: LocalStorage(ptr::null(), None), logger: StdErrLogger, - unwinder: Some(Unwinder { unwinding: false }), - destroyed: false - } - } - - pub fn new_child_without_unwinding(&mut self) -> Task { - Task { - heap: LocalHeap::new(), - gc: GarbageCollector, - storage: LocalStorage(ptr::null(), None), - logger: StdErrLogger, - unwinder: None, + unwinder: Unwinder { unwinding: false }, destroyed: false } } @@ -88,16 +66,7 @@ impl Task { assert!(ptr::ref_eq(task, self)); } - match self.unwinder { - Some(ref mut unwinder) => { - // If there's an unwinder then set up the catch block - unwinder.try(f); - } - None => { - // Otherwise, just run the body - f() - } - } + self.unwinder.try(f); self.destroy(); } diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index 4a4d498a26e..ecfe93560b4 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -48,7 +48,7 @@ pub fn run_in_newsched_task(f: ~fn()) { do run_in_bare_thread { let mut sched = ~new_test_uv_sched(); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::new_root_without_unwinding(), + ~Task::new_root(), f.take()); sched.enqueue_task(task); sched.run(); @@ -134,7 +134,7 @@ pub fn spawntask(f: ~fn()) { let mut task = None; do Local::borrow::() |running_task| { - task = Some(~running_task.new_child_without_unwinding()); + task = Some(~running_task.new_child()); } let mut sched = Local::take::(); @@ -150,7 +150,7 @@ pub fn spawntask_immediately(f: ~fn()) { let mut task = None; do Local::borrow::() |running_task| { - task = Some(~running_task.new_child_without_unwinding()); + task = Some(~running_task.new_child()); } let mut sched = Local::take::(); @@ -168,7 +168,7 @@ pub fn spawntask_later(f: ~fn()) { let mut task = None; do Local::borrow::() |running_task| { - task = Some(~running_task.new_child_without_unwinding()); + task = Some(~running_task.new_child()); } let mut sched = Local::take::(); @@ -187,7 +187,7 @@ pub fn spawntask_random(f: ~fn()) { let mut task = None; do Local::borrow::() |running_task| { - task = Some(~running_task.new_child_without_unwinding()); + task = Some(~running_task.new_child()); } let mut sched = Local::take::(); @@ -251,7 +251,7 @@ pub fn spawntask_thread(f: ~fn()) -> Thread { let mut task = None; do Local::borrow::() |running_task| { - task = Some(~running_task.new_child_without_unwinding()); + task = Some(~running_task.new_child()); } let task = Cell(task.swap_unwrap()); diff --git a/src/libstd/sys.rs b/src/libstd/sys.rs index 137070ce202..77085d19567 100644 --- a/src/libstd/sys.rs +++ b/src/libstd/sys.rs @@ -226,11 +226,7 @@ pub fn begin_unwind_(msg: *c_char, file: *c_char, line: size_t) -> ! { gc::cleanup_stack_for_failure(); let task = Local::unsafe_borrow::(); - let unwinder: &mut Option = &mut (*task).unwinder; - match *unwinder { - Some(ref mut unwinder) => unwinder.begin_unwind(), - None => abort!("failure without unwinder. aborting process") - } + (*task).unwinder.begin_unwind(); } } } diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs index f24d2327358..faa505c1995 100644 --- a/src/libstd/task/mod.rs +++ b/src/libstd/task/mod.rs @@ -515,16 +515,7 @@ pub fn failing() -> bool { _ => { let mut unwinding = false; do Local::borrow:: |local| { - unwinding = match local.unwinder { - Some(unwinder) => { - unwinder.unwinding - } - None => { - // Because there is no unwinder we can't be unwinding. - // (The process will abort on failure) - false - } - } + unwinding = local.unwinder.unwinding } return unwinding; } diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs index a4fbec11d72..a17a6777a98 100644 --- a/src/libstd/task/spawn.rs +++ b/src/libstd/task/spawn.rs @@ -579,7 +579,7 @@ fn spawn_raw_newsched(_opts: TaskOpts, f: ~fn()) { let mut task = None; do Local::borrow::() |running_task| { - task = Some(~running_task.new_child_without_unwinding()); + task = Some(~running_task.new_child()); } let mut sched = Local::take::(); -- cgit 1.4.1-3-g733a5 From d1ec8b5fb85cb6fd4caed64223c5cb3fd920daab Mon Sep 17 00:00:00 2001 From: toddaaro Date: Fri, 14 Jun 2013 12:17:56 -0700 Subject: redesigned the pinning to pin deal with things on dequeue, not on enqueue --- src/libstd/macros.rs | 11 - src/libstd/rt/local.rs | 6 +- src/libstd/rt/sched.rs | 541 +++++++++++++++++++++++++++++++---------------- src/libstd/rt/task.rs | 14 +- src/libstd/rt/test.rs | 32 +-- src/libstd/task/spawn.rs | 2 +- 6 files changed, 393 insertions(+), 213 deletions(-) (limited to 'src/libstd/task') diff --git a/src/libstd/macros.rs b/src/libstd/macros.rs index bf5b36c7580..b01bd8f993c 100644 --- a/src/libstd/macros.rs +++ b/src/libstd/macros.rs @@ -49,18 +49,7 @@ pub fn do_abort() -> ! { macro_rules! abort( ($( $msg:expr),+) => ( { rtdebug!($($msg),+); - -// do_abort(); - - // NB: This is in a fn to avoid putting the `unsafe` block in - // a macro, which causes spurious 'unnecessary unsafe block' - // warnings. -// fn do_abort() -> ! { -// unsafe { ::libc::abort(); } -// } - ::macros::do_abort(); - } ) ) diff --git a/src/libstd/rt/local.rs b/src/libstd/rt/local.rs index 359cf5fc3e1..6e0fbda5ec9 100644 --- a/src/libstd/rt/local.rs +++ b/src/libstd/rt/local.rs @@ -30,7 +30,7 @@ impl Local for Scheduler { fn borrow(f: &fn(&mut Scheduler) -> T) -> T { let mut res: Option = None; let res_ptr: *mut Option = &mut res; - unsafe { + unsafe { do local_ptr::borrow |sched| { let result = f(sched); *res_ptr = Some(result); @@ -39,7 +39,7 @@ impl Local for Scheduler { match res { Some(r) => { r } None => abort!("function failed!") - } + } } unsafe fn unsafe_borrow() -> *mut Scheduler { local_ptr::unsafe_borrow() } unsafe fn try_unsafe_borrow() -> Option<*mut Scheduler> { abort!("unimpl") } @@ -139,5 +139,5 @@ mod test { assert!(res) let _scheduler: ~Scheduler = Local::take(); } - + } diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 4bc61d63824..3b8a31d1840 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -90,27 +90,10 @@ pub struct Coroutine { priv saved_context: Context, /// The heap, GC, unwinding, local storage, logging task: ~Task, - /// The scheduler that this task calls home - home_sched: SchedHome } -// To send a Coroutine to another task we have to use contained home -// information (the SchedHandle). So we need a form that doesn't -// include one. - -// XXX perf: Evaluate this structure - there should be a clever way to -// make it such that we don't need to deal with building/destructing -// on Coroutines that aren't homed. - -pub struct HomelessCoroutine { - priv current_stack_segment: StackSegment, - priv saved_context: Context, - task: ~Task -} - // A scheduler home is either a handle to the home scheduler, or an // explicit "AnySched". - pub enum SchedHome { AnySched, Sched(SchedHandle) @@ -119,7 +102,7 @@ pub enum SchedHome { pub enum SchedMessage { Wake, Shutdown, - BiasedTask(~HomelessCoroutine) + PinnedTask(~Coroutine) } enum CleanupJob { @@ -193,6 +176,7 @@ pub impl Scheduler { (*event_loop).run(); } + rtdebug!("run taking sched"); let sched = Local::take::(); // XXX: Reenable this once we're using a per-task queue. With a shared // queue this is not true @@ -214,6 +198,7 @@ pub impl Scheduler { if sched.interpret_message_queue() { // We performed a scheduling action. There may be other work // to do yet, so let's try again later. + rtdebug!("run_sched_once, interpret_message_queue taking sched"); let mut sched = Local::take::(); sched.metrics.messages_received += 1; sched.event_loop.callback(Scheduler::run_sched_once); @@ -222,6 +207,7 @@ pub impl Scheduler { } // Now, look in the work queue for tasks to run + rtdebug!("run_sched_once taking"); let sched = Local::take::(); if sched.resume_task_from_queue() { // We performed a scheduling action. There may be other work @@ -271,7 +257,7 @@ pub impl Scheduler { // We don't want to queue tasks that belong on other threads, // so we send them home at enqueue time. - + // The borrow checker doesn't like our disassembly of the // Coroutine struct and partial use and mutation of the // fields. So completely disassemble here and stop using? @@ -283,95 +269,31 @@ pub impl Scheduler { let this = self; - match task { - ~Coroutine { current_stack_segment: css, - saved_context: sc, - task: t, - home_sched: home_sched } => { - - let mut home_sched = home_sched; - - match home_sched { - Sched(ref mut home_handle) - if home_handle.sched_id != this.sched_id() => { - - // In this branch we know the task is not - // home, so we send it home. - - rtdebug!("home_handle_id: %u, loc: %u", - home_handle.sched_id, - this.sched_id()); - let homeless = ~HomelessCoroutine { - current_stack_segment: css, - saved_context: sc, - task: t - }; - home_handle.send(BiasedTask(homeless)); - rtdebug!("sent task home"); - return (); - } - Sched( ref mut home_handle) => { - - // Here we know the task is home, so we need - // to "keep" it home. Since we don't have a - // scheduler-local queue for this purpose, we - // just use our message queue. - - rtdebug!("homed task at home, sending to self"); - let homeless = ~HomelessCoroutine { - current_stack_segment: css, - saved_context: sc, - task: t - }; - home_handle.send(BiasedTask(homeless)); - rtdebug!("sent home to self"); - return (); - - } - _ => { - - // We just destroyed our Coroutine ... but now - // we want it back. Build a new one? - // XXX: perf: see above comment about not - // destroying - - let task = ~Coroutine { - current_stack_segment: css, - saved_context: sc, - task: t, - home_sched: AnySched }; - - - // We push the task onto our local queue. - this.work_queue.push(task); - this.event_loop.callback(Scheduler::run_sched_once); - - // We've made work available. Notify a - // sleeping scheduler. - - // XXX: perf. Check for a sleeper without - // synchronizing memory. It's not critical - // that we always find it. - - // XXX: perf. If there's a sleeper then we - // might as well just send it the task - // directly instead of pushing it to the - // queue. That is essentially the intent here - // and it is less work. - match this.sleeper_list.pop() { - Some(handle) => { - let mut handle = handle; - handle.send(Wake) - } - None => { (/* pass */) } - }; - } - } + // We push the task onto our local queue clone. + this.work_queue.push(task); + this.event_loop.callback(Scheduler::run_sched_once); + + // We've made work available. Notify a + // sleeping scheduler. + + // XXX: perf. Check for a sleeper without + // synchronizing memory. It's not critical + // that we always find it. + + // XXX: perf. If there's a sleeper then we + // might as well just send it the task + // directly instead of pushing it to the + // queue. That is essentially the intent here + // and it is less work. + match this.sleeper_list.pop() { + Some(handle) => { + let mut handle = handle; + handle.send(Wake) } - } + None => { (/* pass */) } + }; } - // * Scheduler-context operations fn interpret_message_queue(~self) -> bool { @@ -381,23 +303,11 @@ pub impl Scheduler { let mut this = self; match this.message_queue.pop() { - Some(BiasedTask(~HomelessCoroutine { - current_stack_segment: css, - saved_context: sc, - task: t})) => { + Some(PinnedTask(task)) => { rtdebug!("recv BiasedTask message in sched: %u", this.sched_id()); - - // Since this was the "send home" message for a task, - // we know that this is the home. So we rebuild the - // sched_handle. - - let task = ~Coroutine { - current_stack_segment: css, - saved_context: sc, - task: t, - home_sched: Sched(this.make_handle()) - }; + let mut task = task; + task.task.home = Some(Sched(this.make_handle())); this.resume_task_immediately(task); return true; } @@ -438,32 +348,93 @@ pub impl Scheduler { } } + /// Given an input Coroutine sends it back to its home scheduler. + fn send_task_home(task: ~Coroutine) { + let mut task = task; + let mut home = task.task.home.swap_unwrap(); + match home { + Sched(ref mut home_handle) => { + home_handle.send(PinnedTask(task)); + } + AnySched => { + abort!("error: cannot send anysched task home"); + } + } + } + + // Resume a task from the queue - but also take into account that + // it might not belong here. fn resume_task_from_queue(~self) -> bool { assert!(!self.in_task_context()); rtdebug!("looking in work queue for task to schedule"); let mut this = self; - if this.run_anything { - match this.work_queue.pop() { - Some(task) => { - rtdebug!("resuming task from work queue"); - this.resume_task_immediately(task); - return true; - } - None => { - rtdebug!("no tasks in queue"); - Local::put(this); - return false; + // The borrow checker imposes the possibly absurd requirement + // that we split this into two match expressions. This is due + // to the inspection of the internal bits of task, as that + // can't be in scope when we act on task. + match this.work_queue.pop() { + Some(task) => { + let action_id = { + let home = &task.task.home; + match home { + &Some(Sched(ref home_handle)) + if home_handle.sched_id != this.sched_id() => { + 0 + } + &Some(AnySched) if this.run_anything => { + 1 + } + &Some(AnySched) => { + 2 + } + &Some(Sched(_)) => { + 3 + } + &None => { + 4 + } + } + }; + + match action_id { + 0 => { + rtdebug!("sending task home"); + Scheduler::send_task_home(task); + Local::put(this); + return false; + } + 1 => { + rtdebug!("resuming now"); + this.resume_task_immediately(task); + return true; + } + 2 => { + rtdebug!("re-queueing") + this.enqueue_task(task); + Local::put(this); + return false; + } + 3 => { + rtdebug!("resuming now"); + this.resume_task_immediately(task); + return true; + } + 4 => { + abort!("task home was None!"); + } + _ => { + abort!("literally, you should not be here"); + } } } - } else { - // In this branch we have a scheduler that is not allowed - // to run unpinned tasks. As such it will only get tasks - // to run from the message queue. - rtdebug!("skipping resume_task_from_queue"); - Local::put(this); - return false; + + None => { + rtdebug!("no tasks in queue"); + Local::put(this); + return false; + } } } @@ -484,21 +455,32 @@ pub impl Scheduler { abort!("control reached end of task"); } - fn schedule_new_task(~self, task: ~Coroutine) { + pub fn schedule_task(~self, task: ~Coroutine) { assert!(self.in_task_context()); - do self.switch_running_tasks_and_then(task) |sched, last_task| { - let last_task = Cell(last_task); - sched.enqueue_task(last_task.take()); - } - } + // is the task home? + let is_home = task.is_home_no_tls(&self); - fn schedule_task(~self, task: ~Coroutine) { - assert!(self.in_task_context()); + // does the task have a home? + let homed = task.homed(); + + let mut this = self; - do self.switch_running_tasks_and_then(task) |sched, last_task| { - let last_task = Cell(last_task); - sched.enqueue_task(last_task.take()); + if is_home || (!homed && this.run_anything) { + // here we know we are home, execute now OR we know we + // aren't homed, and that this sched doesn't care + do this.switch_running_tasks_and_then(task) |sched, last_task| { + let last_task = Cell(last_task); + sched.enqueue_task(last_task.take()); + } + } else if !homed && !this.run_anything { + // the task isn't homed, but it can't be run here + this.enqueue_task(task); + Local::put(this); + } else { + // task isn't home, so don't run it here, send it home + Scheduler::send_task_home(task); + Local::put(this); } } @@ -681,19 +663,66 @@ impl SchedHandle { pub impl Coroutine { - - /// This function checks that a coroutine is running "home". - fn am_home(&self) -> bool { + /// This function checks that a coroutine is running "home". + fn is_home(&self) -> bool { + rtdebug!("checking if coroutine is home"); do Local::borrow:: |sched| { - match self.home_sched { - AnySched => { true } - Sched(SchedHandle { sched_id: ref id, _ }) => { + match self.task.home { + Some(AnySched) => { false } + Some(Sched(SchedHandle { sched_id: ref id, _ })) => { *id == sched.sched_id() } + None => { abort!("error: homeless task!"); } + } + } + } + + /// Without access to self, but with access to the "expected home + /// id", see if we are home. + fn is_home_using_id(id: uint) -> bool { + rtdebug!("checking if coroutine is home using id"); + do Local::borrow:: |sched| { + if sched.sched_id() == id { + true + } else { + false + } + } + } + + /// Check if this coroutine has a home + fn homed(&self) -> bool { + rtdebug!("checking if this coroutine has a home"); + match self.task.home { + Some(AnySched) => { false } + Some(Sched(_)) => { true } + None => { abort!("error: homeless task!"); + } + } + } + + /// A version of is_home that does not need to use TLS, it instead + /// takes local scheduler as a parameter. + fn is_home_no_tls(&self, sched: &~Scheduler) -> bool { + rtdebug!("checking if coroutine is home without tls"); + match self.task.home { + Some(AnySched) => { true } + Some(Sched(SchedHandle { sched_id: ref id, _})) => { + *id == sched.sched_id() } + None => { abort!("error: homeless task!"); } + } + } + + /// Check TLS for the scheduler to see if we are on a special + /// scheduler. + pub fn on_special() -> bool { + rtdebug!("checking if coroutine is executing on special sched"); + do Local::borrow::() |sched| { + !sched.run_anything } - } - + } + // Created new variants of "new" that takes a home scheduler // parameter. The original with_task now calls with_task_homed // using the AnySched paramter. @@ -710,19 +739,20 @@ pub impl Coroutine { task: ~Task, start: ~fn(), home: SchedHome) -> Coroutine { - + static MIN_STACK_SIZE: uint = 10000000; // XXX: Too much stack - + let start = Coroutine::build_start_wrapper(start); let mut stack = stack_pool.take_segment(MIN_STACK_SIZE); // NB: Context holds a pointer to that ~fn let initial_context = Context::new(start, &mut stack); - return Coroutine { + let mut crt = Coroutine { current_stack_segment: stack, saved_context: initial_context, task: task, - home_sched: home }; + crt.task.home = Some(home); + return crt; } fn with_task(stack_pool: &mut StackPool, @@ -841,7 +871,7 @@ mod test { let sched_handle = sched.make_handle(); let sched_id = sched.sched_id(); - + let task = ~do Coroutine::new_homed(&mut sched.stack_pool, Sched(sched_handle)) { unsafe { *task_ran_ptr = true }; @@ -855,6 +885,146 @@ mod test { } } + // A test for each state of schedule_task + + #[test] + fn test_schedule_home_states() { + + use rt::uv::uvio::UvEventLoop; + use rt::sched::Shutdown; + use rt::sleeper_list::SleeperList; + use rt::work_queue::WorkQueue; + + do run_in_bare_thread { +// let nthreads = 2; + + let sleepers = SleeperList::new(); + let work_queue = WorkQueue::new(); + + // our normal scheduler + let mut normal_sched = ~Scheduler::new( + ~UvEventLoop::new(), + work_queue.clone(), + sleepers.clone()); + + let normal_handle = Cell(normal_sched.make_handle()); + + // our special scheduler + let mut special_sched = ~Scheduler::new_special( + ~UvEventLoop::new(), + work_queue.clone(), + sleepers.clone(), + true); + + let special_handle = Cell(special_sched.make_handle()); + let special_handle2 = Cell(special_sched.make_handle()); + let special_id = special_sched.sched_id(); + let t1_handle = special_sched.make_handle(); + let t4_handle = special_sched.make_handle(); + + let t1f = ~do Coroutine::new_homed(&mut special_sched.stack_pool, + Sched(t1_handle)) { + let is_home = Coroutine::is_home_using_id(special_id); + rtdebug!("t1 should be home: %b", is_home); + assert!(is_home); + }; + let t1f = Cell(t1f); + + let t2f = ~do Coroutine::new(&mut normal_sched.stack_pool) { + let on_special = Coroutine::on_special(); + rtdebug!("t2 should not be on special: %b", on_special); + assert!(!on_special); + }; + let t2f = Cell(t2f); + + let t3f = ~do Coroutine::new(&mut normal_sched.stack_pool) { + // not on special + let on_special = Coroutine::on_special(); + rtdebug!("t3 should not be on special: %b", on_special); + assert!(!on_special); + }; + let t3f = Cell(t3f); + + let t4f = ~do Coroutine::new_homed(&mut special_sched.stack_pool, + Sched(t4_handle)) { + // is home + let home = Coroutine::is_home_using_id(special_id); + rtdebug!("t4 should be home: %b", home); + assert!(home); + }; + let t4f = Cell(t4f); + + // we have four tests, make them as closures + let t1: ~fn() = || { + // task is home on special + let task = t1f.take(); + let sched = Local::take::(); + sched.schedule_task(task); + }; + let t2: ~fn() = || { + // not homed, task doesn't care + let task = t2f.take(); + let sched = Local::take::(); + sched.schedule_task(task); + }; + let t3: ~fn() = || { + // task not homed, must leave + let task = t3f.take(); + let sched = Local::take::(); + sched.schedule_task(task); + }; + let t4: ~fn() = || { + // task not home, send home + let task = t4f.take(); + let sched = Local::take::(); + sched.schedule_task(task); + }; + + let t1 = Cell(t1); + let t2 = Cell(t2); + let t3 = Cell(t3); + let t4 = Cell(t4); + + // build a main task that runs our four tests + let main_task = ~do Coroutine::new(&mut normal_sched.stack_pool) { + // the two tasks that require a normal start location + t2.take()(); + t4.take()(); + normal_handle.take().send(Shutdown); + special_handle.take().send(Shutdown); + }; + + // task to run the two "special start" tests + let special_task = ~do Coroutine::new_homed( + &mut special_sched.stack_pool, + Sched(special_handle2.take())) { + t1.take()(); + t3.take()(); + }; + + // enqueue the main tasks + normal_sched.enqueue_task(special_task); + normal_sched.enqueue_task(main_task); + + let nsched_cell = Cell(normal_sched); + let normal_thread = do Thread::start { + let sched = nsched_cell.take(); + sched.run(); + }; + + let ssched_cell = Cell(special_sched); + let special_thread = do Thread::start { + let sched = ssched_cell.take(); + sched.run(); + }; + + // wait for the end + let _thread1 = normal_thread; + let _thread2 = special_thread; + + } + } + // The following test is a bit of a mess, but it trys to do // something tricky so I'm not sure how to get around this in the // short term. @@ -865,9 +1035,9 @@ mod test { // observe that the task is not home, and send it home. // This test is light in that it does very little. - + #[test] - fn test_transfer_task_home() { + fn test_transfer_task_home() { use rt::uv::uvio::UvEventLoop; use rt::sched::Shutdown; @@ -879,18 +1049,18 @@ mod test { use vec::OwnedVector; do run_in_bare_thread { - + static N: uint = 8; - + let sleepers = SleeperList::new(); let work_queue = WorkQueue::new(); - + let mut handles = ~[]; let mut scheds = ~[]; - + for uint::range(0, N) |_| { let loop_ = ~UvEventLoop::new(); - let mut sched = ~Scheduler::new(loop_, + let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone()); let handle = sched.make_handle(); @@ -901,7 +1071,7 @@ mod test { let handles = Cell(handles); - let home_handle = scheds[6].make_handle(); + let home_handle = scheds[6].make_handle(); let home_id = home_handle.sched_id; let home = Sched(home_handle); @@ -913,18 +1083,18 @@ mod test { sched.sched_id(), home_id); assert!(sched.sched_id() == home_id); - Local::put::(sched); + Local::put::(sched); let mut handles = handles.take(); for handles.each_mut |handle| { handle.send(Shutdown); } }; - + scheds[0].enqueue_task(main_task); - + let mut threads = ~[]; - + while !scheds.is_empty() { let sched = scheds.pop(); let sched_cell = Cell(sched); @@ -934,13 +1104,23 @@ mod test { }; threads.push(thread); } - + let _threads = threads; } } - + + // Do it a lot + + #[test] + fn test_stress_schedule_task_states() { + let n = stress_factor() * 120; + for int::range(0,n as int) |_| { + test_schedule_home_states(); + } + } + // The goal is that this is the high-stress test for making sure - // homing is working. It allocates 120*RUST_RT_STRESS tasks that + // homing is working. It allocates RUST_RT_STRESS tasks that // do nothing but assert that they are home at execution // time. These tasks are queued to random schedulers, so sometimes // they are home and sometimes not. It also runs RUST_RT_STRESS @@ -953,7 +1133,6 @@ mod test { run_in_mt_newsched_task_random_homed(); } } - #[test] fn test_simple_scheduling() { @@ -1210,8 +1389,8 @@ mod test { fn start_closure_dtor() { use ops::Drop; - // Regression test that the `start` task entrypoint can contain dtors - // that use task resources + // Regression test that the `start` task entrypoint can + // contain dtors that use task resources do run_in_newsched_task { struct S { field: () } @@ -1226,7 +1405,7 @@ mod test { do spawntask { let _ss = &s; } - } + } } } diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index 4d9851d3b40..06318ac6623 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -19,6 +19,7 @@ use cast::transmute; use rt::local::Local; use super::local_heap::LocalHeap; use rt::logging::StdErrLogger; +use rt::sched::{SchedHome, AnySched}; pub struct Task { heap: LocalHeap, @@ -26,7 +27,8 @@ pub struct Task { storage: LocalStorage, logger: StdErrLogger, unwinder: Option, - destroyed: bool + destroyed: bool, + home: Option } pub struct GarbageCollector; @@ -44,7 +46,8 @@ impl Task { storage: LocalStorage(ptr::null(), None), logger: StdErrLogger, unwinder: Some(Unwinder { unwinding: false }), - destroyed: false + destroyed: false, + home: Some(AnySched) } } @@ -55,10 +58,15 @@ impl Task { storage: LocalStorage(ptr::null(), None), logger: StdErrLogger, unwinder: None, - destroyed: false + destroyed: false, + home: Some(AnySched) } } + pub fn give_home(&mut self, new_home: SchedHome) { + self.home = Some(new_home); + } + pub fn run(&mut self, f: &fn()) { // This is just an assertion that `run` was called unsafely // and this instance of Task is still accessible. diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index ecef505ce57..bb284c02541 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -162,18 +162,19 @@ pub fn run_in_mt_newsched_task_random_homed() { for uint::range(0, nthreads) |i| { let special = (i % 2) == 0; let loop_ = ~UvEventLoop::new(); - let mut sched = ~Scheduler::new_special(loop_, work_queue.clone(), sleepers.clone(), special); + let mut sched = ~Scheduler::new_special( + loop_, work_queue.clone(), sleepers.clone(), special); let handle = sched.make_handle(); handles.push(handle); scheds.push(sched); - } + } // Schedule a pile o tasks - let n = 5*stress_factor(); + let n = 5*stress_factor(); for uint::range(0,n) |_i| { rtdebug!("creating task: %u", _i); let hf: ~fn() = || { assert!(true) }; - spawntask_homed(&mut scheds, hf); + spawntask_homed(&mut scheds, hf); } // Now we want another pile o tasks that do not ever run on a @@ -182,11 +183,11 @@ pub fn run_in_mt_newsched_task_random_homed() { let n = 5*stress_factor(); - let f: ~fn() = || { + let f: ~fn() = || { for uint::range(0,n) |_| { - let f: ~fn() = || { + let f: ~fn() = || { // Borrow the scheduler we run on and check if it is - // privliged. + // privileged. do Local::borrow:: |sched| { assert!(sched.run_anything); }; @@ -194,12 +195,12 @@ pub fn run_in_mt_newsched_task_random_homed() { spawntask_random(f); }; }; - + let f_cell = Cell(f); let handles = Cell(handles); rtdebug!("creating main task"); - + let main_task = ~do Coroutine::new(&mut scheds[0].stack_pool) { f_cell.take()(); let mut handles = handles.take(); @@ -210,7 +211,7 @@ pub fn run_in_mt_newsched_task_random_homed() { }; rtdebug!("queuing main task") - + scheds[0].enqueue_task(main_task); let mut threads = ~[]; @@ -243,11 +244,13 @@ pub fn run_in_mt_newsched_task_random_homed() { pub fn spawntask(f: ~fn()) { use super::sched::*; + rtdebug!("spawntask taking the scheduler from TLS") let mut sched = Local::take::(); let task = ~Coroutine::with_task(&mut sched.stack_pool, ~Task::without_unwinding(), f); - sched.schedule_new_task(task); + rtdebug!("spawntask scheduling the new task"); + sched.schedule_task(task); } /// Create a new task and run it right now. Aborts on failure @@ -305,7 +308,7 @@ pub fn spawntask_homed(scheds: &mut ~[~Scheduler], f: ~fn()) { use super::sched::*; use rand::{rng, RngUtil}; let mut rng = rng(); - + let task = { let sched = &mut scheds[rng.gen_int_range(0,scheds.len() as int)]; let handle = sched.make_handle(); @@ -321,14 +324,15 @@ pub fn spawntask_homed(scheds: &mut ~[~Scheduler], f: ~fn()) { assert!(home_id == sched.sched_id()); }; f() - }; - + }; + ~Coroutine::with_task_homed(&mut sched.stack_pool, ~Task::without_unwinding(), af, Sched(handle)) }; let dest_sched = &mut scheds[rng.gen_int_range(0,scheds.len() as int)]; + // enqueue it for future execution dest_sched.enqueue_task(task); } diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs index 5941221821a..5e507238f67 100644 --- a/src/libstd/task/spawn.rs +++ b/src/libstd/task/spawn.rs @@ -578,7 +578,7 @@ fn spawn_raw_newsched(_opts: TaskOpts, f: ~fn()) { let mut sched = Local::take::(); let task = ~Coroutine::new(&mut sched.stack_pool, f); - sched.schedule_new_task(task); + sched.schedule_task(task); } fn spawn_raw_oldsched(mut opts: TaskOpts, f: ~fn()) { -- cgit 1.4.1-3-g733a5 From b530ca103388c99e774868645758785d6ad6b9a9 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Sun, 23 Jun 2013 14:01:59 -0700 Subject: std: Make unlinking and task notification work with newsched --- src/libstd/rt/task.rs | 25 +++++++++++++++++++++++++ src/libstd/task/spawn.rs | 22 +++++++++++++++++++--- 2 files changed, 44 insertions(+), 3 deletions(-) (limited to 'src/libstd/task') diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index 68f7eb659b0..97c3b6a749b 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -290,4 +290,29 @@ mod test { a.next = Some(b); } } + + // XXX: This is a copy of test_future_result in std::task. + // It can be removed once the scheduler is turned on by default. + #[test] + fn future_result() { + do run_in_newsched_task { + use option::{Some, None}; + use task::*; + + let mut result = None; + let mut builder = task(); + builder.future_result(|r| result = Some(r)); + do builder.spawn {} + assert_eq!(result.unwrap().recv(), Success); + + result = None; + let mut builder = task(); + builder.future_result(|r| result = Some(r)); + builder.unlinked(); + do builder.spawn { + fail!(); + } + assert_eq!(result.unwrap().recv(), Failure); + } + } } diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs index 344a58a877f..63eb768d1c9 100644 --- a/src/libstd/task/spawn.rs +++ b/src/libstd/task/spawn.rs @@ -578,13 +578,29 @@ pub fn spawn_raw(opts: TaskOpts, f: ~fn()) { } } -fn spawn_raw_newsched(_opts: TaskOpts, f: ~fn()) { +fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) { use rt::sched::*; - let task = do Local::borrow::() |running_task| { - ~running_task.new_child() + let mut task = if opts.linked { + do Local::borrow::() |running_task| { + ~running_task.new_child() + } + } else { + // An unlinked task is a new root in the task tree + ~Task::new_root() }; + if opts.notify_chan.is_some() { + let notify_chan = opts.notify_chan.swap_unwrap(); + let notify_chan = Cell::new(notify_chan); + let on_exit: ~fn(bool) = |success| { + notify_chan.take().send( + if success { Success } else { Failure } + ) + }; + task.on_exit = Some(on_exit); + } + let mut sched = Local::take::(); let task = ~Coroutine::with_task(&mut sched.stack_pool, task, f); -- cgit 1.4.1-3-g733a5 From 5cfad4b6de3a9ab749c975338c23fc2e20b0beec Mon Sep 17 00:00:00 2001 From: toddaaro Date: Wed, 26 Jun 2013 16:41:00 -0700 Subject: Refactored the runtime to view coroutines as a component of tasks, instead of tasks as a component of coroutines. --- src/libstd/rt/comm.rs | 12 +- src/libstd/rt/join_latch.rs | 1 + src/libstd/rt/local.rs | 9 +- src/libstd/rt/mod.rs | 13 +- src/libstd/rt/sched.rs | 406 ++++++++------------------------------------ src/libstd/rt/task.rs | 176 +++++++++++++++++-- src/libstd/rt/test.rs | 277 +++++++++++------------------- src/libstd/rt/tube.rs | 5 +- src/libstd/rt/uv/uvio.rs | 5 +- src/libstd/task/mod.rs | 1 + src/libstd/task/spawn.rs | 27 ++- src/libstd/unstable/lang.rs | 4 + 12 files changed, 386 insertions(+), 550 deletions(-) (limited to 'src/libstd/task') diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index dd27c03ff51..7608bc89e02 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -20,7 +20,8 @@ use cast; use util; use ops::Drop; use kinds::Owned; -use rt::sched::{Scheduler, Coroutine}; +use rt::sched::{Scheduler}; +use rt::task::Task; use rt::local::Local; use unstable::atomics::{AtomicUint, AtomicOption, SeqCst}; use unstable::sync::UnsafeAtomicRcBox; @@ -136,7 +137,7 @@ impl ChanOne { } task_as_state => { // Port is blocked. Wake it up. - let recvr: ~Coroutine = cast::transmute(task_as_state); + let recvr: ~Task = cast::transmute(task_as_state); let mut sched = Local::take::(); rtdebug!("rendezvous send"); sched.metrics.rendezvous_sends += 1; @@ -192,7 +193,7 @@ impl PortOne { // NB: We have to drop back into the scheduler event loop here // instead of switching immediately back or we could end up // triggering infinite recursion on the scheduler's stack. - let task: ~Coroutine = cast::transmute(task_as_state); + let task: ~Task = cast::transmute(task_as_state); sched.enqueue_task(task); } _ => util::unreachable() @@ -257,7 +258,7 @@ impl Drop for ChanOneHack { task_as_state => { // The port is blocked waiting for a message we will never send. Wake it. assert!((*this.packet()).payload.is_none()); - let recvr: ~Coroutine = cast::transmute(task_as_state); + let recvr: ~Task = cast::transmute(task_as_state); let sched = Local::take::(); sched.schedule_task(recvr); } @@ -554,6 +555,8 @@ mod test { { let _c = chan; } port.recv(); }; + // What is our res? + rtdebug!("res is: %?", res.is_err()); assert!(res.is_err()); } } @@ -905,4 +908,5 @@ mod test { } } } + } diff --git a/src/libstd/rt/join_latch.rs b/src/libstd/rt/join_latch.rs index ad5cf2eb378..79c0d5da9a4 100644 --- a/src/libstd/rt/join_latch.rs +++ b/src/libstd/rt/join_latch.rs @@ -643,3 +643,4 @@ mod test { } } } + diff --git a/src/libstd/rt/local.rs b/src/libstd/rt/local.rs index 6df1ffaa453..374933ab281 100644 --- a/src/libstd/rt/local.rs +++ b/src/libstd/rt/local.rs @@ -13,6 +13,7 @@ use rt::sched::Scheduler; use rt::task::Task; use rt::local_ptr; use rt::rtio::{EventLoop, IoFactoryObject}; +//use borrow::to_uint; pub trait Local { fn put(value: ~Self); @@ -32,6 +33,7 @@ impl Local for Scheduler { let res_ptr: *mut Option = &mut res; unsafe { do local_ptr::borrow |sched| { +// rtdebug!("successfully unsafe borrowed sched pointer"); let result = f(sched); *res_ptr = Some(result); } @@ -51,9 +53,12 @@ impl Local for Task { fn exists() -> bool { rtabort!("unimpl") } fn borrow(f: &fn(&mut Task) -> T) -> T { do Local::borrow:: |sched| { +// rtdebug!("sched about to grab current_task"); match sched.current_task { Some(~ref mut task) => { - f(&mut *task.task) +// rtdebug!("current task pointer: %x", to_uint(task)); +// rtdebug!("current task heap pointer: %x", to_uint(&task.heap)); + f(task) } None => { rtabort!("no scheduler") @@ -64,7 +69,7 @@ impl Local for Task { unsafe fn unsafe_borrow() -> *mut Task { match (*Local::unsafe_borrow::()).current_task { Some(~ref mut task) => { - let s: *mut Task = &mut *task.task; + let s: *mut Task = &mut *task; return s; } None => { diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index bbf1cf0d9b7..aae194ae548 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -67,7 +67,7 @@ use iter::Times; use iterator::IteratorUtil; use option::Some; use ptr::RawPtr; -use rt::sched::{Scheduler, Coroutine, Shutdown}; +use rt::sched::{Scheduler, Shutdown}; use rt::sleeper_list::SleeperList; use rt::task::Task; use rt::thread::Thread; @@ -268,10 +268,9 @@ pub fn run(main: ~fn()) -> int { // Create and enqueue the main task. let main_cell = Cell::new(main); - let mut new_task = ~Task::new_root(); - new_task.on_exit = Some(on_exit); - let main_task = ~Coroutine::with_task(&mut scheds[0].stack_pool, - new_task, main_cell.take()); + let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool, + main_cell.take()); + main_task.on_exit = Some(on_exit); scheds[0].enqueue_task(main_task); // Run each scheduler in a thread. @@ -348,7 +347,7 @@ pub fn context() -> RuntimeContext { #[test] fn test_context() { use unstable::run_in_bare_thread; - use self::sched::{Scheduler, Coroutine}; + use self::sched::{Scheduler}; use rt::local::Local; use rt::test::new_test_uv_sched; @@ -356,7 +355,7 @@ fn test_context() { do run_in_bare_thread { assert_eq!(context(), GlobalContext); let mut sched = ~new_test_uv_sched(); - let task = ~do Coroutine::new_root(&mut sched.stack_pool) { + let task = ~do Task::new_root(&mut sched.stack_pool) { assert_eq!(context(), TaskContext); let sched = Local::take::(); do sched.deschedule_running_task_and_then() |sched, task| { diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index bbe4aa25e29..ed5cce4b35c 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -16,19 +16,15 @@ use clone::Clone; use super::sleeper_list::SleeperList; use super::work_queue::WorkQueue; -use super::stack::{StackPool, StackSegment}; +use super::stack::{StackPool}; use super::rtio::{EventLoop, EventLoopObject, RemoteCallbackObject}; use super::context::Context; -use super::task::Task; +use super::task::{Task, AnySched, Sched}; use super::message_queue::MessageQueue; use rt::local_ptr; use rt::local::Local; use rt::rtio::RemoteCallback; use rt::metrics::SchedMetrics; - -//use to_str::ToStr; - -/// To allow for using pointers as scheduler ids use borrow::{to_uint}; /// The Scheduler is responsible for coordinating execution of Coroutines @@ -41,7 +37,7 @@ use borrow::{to_uint}; pub struct Scheduler { /// A queue of available work. Under a work-stealing policy there /// is one per Scheduler. - priv work_queue: WorkQueue<~Coroutine>, + priv work_queue: WorkQueue<~Task>, /// The queue of incoming messages from other schedulers. /// These are enqueued by SchedHandles after which a remote callback /// is triggered to handle the message. @@ -66,7 +62,7 @@ pub struct Scheduler { /// Always valid when a task is executing, otherwise not priv saved_context: Context, /// The currently executing task - current_task: Option<~Coroutine>, + current_task: Option<~Task>, /// An action performed after a context switch on behalf of the /// code running before the context switch priv cleanup_job: Option, @@ -81,33 +77,15 @@ pub struct SchedHandle { sched_id: uint } -pub struct Coroutine { - /// The segment of stack on which the task is currently running or, - /// if the task is blocked, on which the task will resume execution - priv current_stack_segment: StackSegment, - /// These are always valid when the task is not running, unless - /// the task is dead - priv saved_context: Context, - /// The heap, GC, unwinding, local storage, logging - task: ~Task, -} - -// A scheduler home is either a handle to the home scheduler, or an -// explicit "AnySched". -pub enum SchedHome { - AnySched, - Sched(SchedHandle) -} - pub enum SchedMessage { Wake, Shutdown, - PinnedTask(~Coroutine) + PinnedTask(~Task) } enum CleanupJob { DoNothing, - GiveTask(~Coroutine, UnsafeTaskReceiver) + GiveTask(~Task, UnsafeTaskReceiver) } impl Scheduler { @@ -116,7 +94,7 @@ impl Scheduler { pub fn sched_id(&self) -> uint { to_uint(self) } pub fn new(event_loop: ~EventLoopObject, - work_queue: WorkQueue<~Coroutine>, + work_queue: WorkQueue<~Task>, sleeper_list: SleeperList) -> Scheduler { @@ -125,7 +103,7 @@ impl Scheduler { } pub fn new_special(event_loop: ~EventLoopObject, - work_queue: WorkQueue<~Coroutine>, + work_queue: WorkQueue<~Task>, sleeper_list: SleeperList, run_anything: bool) -> Scheduler { @@ -253,7 +231,7 @@ impl Scheduler { /// Pushes the task onto the work stealing queue and tells the /// event loop to run it later. Always use this instead of pushing /// to the work queue directly. - pub fn enqueue_task(&mut self, task: ~Coroutine) { + pub fn enqueue_task(&mut self, task: ~Task) { // We don't want to queue tasks that belong on other threads, // so we send them home at enqueue time. @@ -307,7 +285,7 @@ impl Scheduler { rtdebug!("recv BiasedTask message in sched: %u", this.sched_id()); let mut task = task; - task.task.home = Some(Sched(this.make_handle())); + task.home = Some(Sched(this.make_handle())); this.resume_task_immediately(task); return true; } @@ -349,9 +327,9 @@ impl Scheduler { } /// Given an input Coroutine sends it back to its home scheduler. - fn send_task_home(task: ~Coroutine) { + fn send_task_home(task: ~Task) { let mut task = task; - let mut home = task.task.home.swap_unwrap(); + let mut home = task.home.swap_unwrap(); match home { Sched(ref mut home_handle) => { home_handle.send(PinnedTask(task)); @@ -377,7 +355,7 @@ impl Scheduler { match this.work_queue.pop() { Some(task) => { let action_id = { - let home = &task.task.home; + let home = &task.home; match home { &Some(Sched(ref home_handle)) if home_handle.sched_id != this.sched_id() => { @@ -440,14 +418,15 @@ impl Scheduler { rtdebug!("ending running task"); do self.deschedule_running_task_and_then |sched, dead_task| { - let dead_task = Cell::new(dead_task); - dead_task.take().recycle(&mut sched.stack_pool); + let mut dead_task = dead_task; + let coroutine = dead_task.coroutine.swap_unwrap(); + coroutine.recycle(&mut sched.stack_pool); } rtabort!("control reached end of task"); } - pub fn schedule_task(~self, task: ~Coroutine) { + pub fn schedule_task(~self, task: ~Task) { assert!(self.in_task_context()); // is the task home? @@ -478,7 +457,7 @@ impl Scheduler { // Core scheduling ops - pub fn resume_task_immediately(~self, task: ~Coroutine) { + pub fn resume_task_immediately(~self, task: ~Task) { let mut this = self; assert!(!this.in_task_context()); @@ -521,7 +500,7 @@ impl Scheduler { /// This passes a Scheduler pointer to the fn after the context switch /// in order to prevent that fn from performing further scheduling operations. /// Doing further scheduling could easily result in infinite recursion. - pub fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, ~Coroutine)) { + pub fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, ~Task)) { let mut this = self; assert!(this.in_task_context()); @@ -530,8 +509,8 @@ impl Scheduler { unsafe { let blocked_task = this.current_task.swap_unwrap(); - let f_fake_region = transmute::<&fn(&mut Scheduler, ~Coroutine), - &fn(&mut Scheduler, ~Coroutine)>(f); + let f_fake_region = transmute::<&fn(&mut Scheduler, ~Task), + &fn(&mut Scheduler, ~Task)>(f); let f_opaque = ClosureConverter::from_fn(f_fake_region); this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque)); } @@ -553,8 +532,8 @@ impl Scheduler { /// Switch directly to another task, without going through the scheduler. /// You would want to think hard about doing this, e.g. if there are /// pending I/O events it would be a bad idea. - pub fn switch_running_tasks_and_then(~self, next_task: ~Coroutine, - f: &fn(&mut Scheduler, ~Coroutine)) { + pub fn switch_running_tasks_and_then(~self, next_task: ~Task, + f: &fn(&mut Scheduler, ~Task)) { let mut this = self; assert!(this.in_task_context()); @@ -563,8 +542,8 @@ impl Scheduler { let old_running_task = this.current_task.swap_unwrap(); let f_fake_region = unsafe { - transmute::<&fn(&mut Scheduler, ~Coroutine), - &fn(&mut Scheduler, ~Coroutine)>(f) + transmute::<&fn(&mut Scheduler, ~Task), + &fn(&mut Scheduler, ~Task)>(f) }; let f_opaque = ClosureConverter::from_fn(f_fake_region); this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque)); @@ -631,12 +610,22 @@ impl Scheduler { // because borrowck thinks the three patterns are conflicting // borrows unsafe { - let last_task = transmute::, Option<&mut Coroutine>>(last_task); + let last_task = transmute::, Option<&mut Task>>(last_task); let last_task_context = match last_task { - Some(t) => Some(&mut t.saved_context), None => None + Some(t) => { + Some(&mut t.coroutine.get_mut_ref().saved_context) + } + None => { + None + } }; let next_task_context = match self.current_task { - Some(ref mut t) => Some(&mut t.saved_context), None => None + Some(ref mut t) => { + Some(&mut t.coroutine.get_mut_ref().saved_context) + } + None => { + None + } }; // XXX: These transmutes can be removed after snapshot return (transmute(&mut self.saved_context), @@ -661,186 +650,34 @@ impl SchedHandle { } } -impl Coroutine { - - /// This function checks that a coroutine is running "home". - pub fn is_home(&self) -> bool { - rtdebug!("checking if coroutine is home"); - do Local::borrow:: |sched| { - match self.task.home { - Some(AnySched) => { false } - Some(Sched(SchedHandle { sched_id: ref id, _ })) => { - *id == sched.sched_id() - } - None => { rtabort!("error: homeless task!"); } - } - } - } - - /// Without access to self, but with access to the "expected home - /// id", see if we are home. - fn is_home_using_id(id: uint) -> bool { - rtdebug!("checking if coroutine is home using id"); - do Local::borrow:: |sched| { - if sched.sched_id() == id { - true - } else { - false - } - } - } - - /// Check if this coroutine has a home - fn homed(&self) -> bool { - rtdebug!("checking if this coroutine has a home"); - match self.task.home { - Some(AnySched) => { false } - Some(Sched(_)) => { true } - None => { rtabort!("error: homeless task!"); - } - } - } - - /// A version of is_home that does not need to use TLS, it instead - /// takes local scheduler as a parameter. - fn is_home_no_tls(&self, sched: &~Scheduler) -> bool { - rtdebug!("checking if coroutine is home without tls"); - match self.task.home { - Some(AnySched) => { true } - Some(Sched(SchedHandle { sched_id: ref id, _})) => { - *id == sched.sched_id() - } - None => { rtabort!("error: homeless task!"); } - } - } - - /// Check TLS for the scheduler to see if we are on a special - /// scheduler. - pub fn on_special() -> bool { - rtdebug!("checking if coroutine is executing on special sched"); - do Local::borrow::() |sched| { - !sched.run_anything - } - } - - // Created new variants of "new" that takes a home scheduler - // parameter. The original with_task now calls with_task_homed - // using the AnySched paramter. - - pub fn new_homed(stack_pool: &mut StackPool, home: SchedHome, start: ~fn()) -> Coroutine { - Coroutine::with_task_homed(stack_pool, ~Task::new_root(), start, home) - } - - pub fn new_root(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine { - Coroutine::with_task(stack_pool, ~Task::new_root(), start) - } - - pub fn with_task_homed(stack_pool: &mut StackPool, - task: ~Task, - start: ~fn(), - home: SchedHome) -> Coroutine { - - static MIN_STACK_SIZE: uint = 1000000; // XXX: Too much stack - - let start = Coroutine::build_start_wrapper(start); - let mut stack = stack_pool.take_segment(MIN_STACK_SIZE); - // NB: Context holds a pointer to that ~fn - let initial_context = Context::new(start, &mut stack); - let mut crt = Coroutine { - current_stack_segment: stack, - saved_context: initial_context, - task: task, - }; - crt.task.home = Some(home); - return crt; - } - - pub fn with_task(stack_pool: &mut StackPool, - task: ~Task, - start: ~fn()) -> Coroutine { - Coroutine::with_task_homed(stack_pool, - task, - start, - AnySched) - } - - fn build_start_wrapper(start: ~fn()) -> ~fn() { - // XXX: The old code didn't have this extra allocation - let start_cell = Cell::new(start); - let wrapper: ~fn() = || { - // This is the first code to execute after the initial - // context switch to the task. The previous context may - // have asked us to do some cleanup. - unsafe { - let sched = Local::unsafe_borrow::(); - (*sched).run_cleanup_job(); - - let sched = Local::unsafe_borrow::(); - let task = (*sched).current_task.get_mut_ref(); - // FIXME #6141: shouldn't neet to put `start()` in - // another closure - let start_cell = Cell::new(start_cell.take()); - do task.task.run { - // N.B. Removing `start` from the start wrapper - // closure by emptying a cell is critical for - // correctness. The ~Task pointer, and in turn the - // closure used to initialize the first call - // frame, is destroyed in scheduler context, not - // task context. So any captured closures must - // not contain user-definable dtors that expect to - // be in task context. By moving `start` out of - // the closure, all the user code goes out of - // scope while the task is still running. - let start = start_cell.take(); - start(); - }; - } - - let sched = Local::take::(); - sched.terminate_current_task(); - }; - return wrapper; - } - - /// Destroy the task and try to reuse its components - pub fn recycle(~self, stack_pool: &mut StackPool) { - match self { - ~Coroutine {current_stack_segment, _} => { - stack_pool.give_segment(current_stack_segment); - } - } - } -} - // XXX: Some hacks to put a &fn in Scheduler without borrowck // complaining type UnsafeTaskReceiver = sys::Closure; trait ClosureConverter { - fn from_fn(&fn(&mut Scheduler, ~Coroutine)) -> Self; - fn to_fn(self) -> &fn(&mut Scheduler, ~Coroutine); + fn from_fn(&fn(&mut Scheduler, ~Task)) -> Self; + fn to_fn(self) -> &fn(&mut Scheduler, ~Task); } impl ClosureConverter for UnsafeTaskReceiver { - fn from_fn(f: &fn(&mut Scheduler, ~Coroutine)) -> UnsafeTaskReceiver { unsafe { transmute(f) } } - fn to_fn(self) -> &fn(&mut Scheduler, ~Coroutine) { unsafe { transmute(self) } } + fn from_fn(f: &fn(&mut Scheduler, ~Task)) -> UnsafeTaskReceiver { unsafe { transmute(f) } } + fn to_fn(self) -> &fn(&mut Scheduler, ~Task) { unsafe { transmute(self) } } } + #[cfg(test)] mod test { use int; use cell::Cell; - use iterator::IteratorUtil; use unstable::run_in_bare_thread; use task::spawn; use rt::local::Local; use rt::test::*; use super::*; use rt::thread::Thread; - use ptr::to_uint; - use vec::MutableVector; + use borrow::to_uint; + use rt::task::{Task,Sched}; // Confirm that a sched_id actually is the uint form of the // pointer to the scheduler struct. - #[test] fn simple_sched_id_test() { do run_in_bare_thread { @@ -851,7 +688,6 @@ mod test { // Compare two scheduler ids that are different, this should never // fail but may catch a mistake someday. - #[test] fn compare_sched_id_test() { do run_in_bare_thread { @@ -863,7 +699,6 @@ mod test { // A simple test to check if a homed task run on a single // scheduler ends up executing while home. - #[test] fn test_home_sched() { do run_in_bare_thread { @@ -874,8 +709,8 @@ mod test { let sched_handle = sched.make_handle(); let sched_id = sched.sched_id(); - let task = ~do Coroutine::new_homed(&mut sched.stack_pool, - Sched(sched_handle)) { + let task = ~do Task::new_root_homed(&mut sched.stack_pool, + Sched(sched_handle)) { unsafe { *task_ran_ptr = true }; let sched = Local::take::(); assert!(sched.sched_id() == sched_id); @@ -888,7 +723,6 @@ mod test { } // A test for each state of schedule_task - #[test] fn test_schedule_home_states() { @@ -898,7 +732,6 @@ mod test { use rt::work_queue::WorkQueue; do run_in_bare_thread { -// let nthreads = 2; let sleepers = SleeperList::new(); let work_queue = WorkQueue::new(); @@ -924,33 +757,33 @@ mod test { let t1_handle = special_sched.make_handle(); let t4_handle = special_sched.make_handle(); - let t1f = ~do Coroutine::new_homed(&mut special_sched.stack_pool, - Sched(t1_handle)) { - let is_home = Coroutine::is_home_using_id(special_id); + let t1f = ~do Task::new_root_homed(&mut special_sched.stack_pool, + Sched(t1_handle)) || { + let is_home = Task::is_home_using_id(special_id); rtdebug!("t1 should be home: %b", is_home); assert!(is_home); }; let t1f = Cell::new(t1f); - let t2f = ~do Coroutine::new_root(&mut normal_sched.stack_pool) { - let on_special = Coroutine::on_special(); + let t2f = ~do Task::new_root(&mut normal_sched.stack_pool) { + let on_special = Task::on_special(); rtdebug!("t2 should not be on special: %b", on_special); assert!(!on_special); }; let t2f = Cell::new(t2f); - let t3f = ~do Coroutine::new_root(&mut normal_sched.stack_pool) { + let t3f = ~do Task::new_root(&mut normal_sched.stack_pool) { // not on special - let on_special = Coroutine::on_special(); + let on_special = Task::on_special(); rtdebug!("t3 should not be on special: %b", on_special); assert!(!on_special); }; let t3f = Cell::new(t3f); - let t4f = ~do Coroutine::new_homed(&mut special_sched.stack_pool, - Sched(t4_handle)) { + let t4f = ~do Task::new_root_homed(&mut special_sched.stack_pool, + Sched(t4_handle)) { // is home - let home = Coroutine::is_home_using_id(special_id); + let home = Task::is_home_using_id(special_id); rtdebug!("t4 should be home: %b", home); assert!(home); }; @@ -988,7 +821,7 @@ mod test { let t4 = Cell::new(t4); // build a main task that runs our four tests - let main_task = ~do Coroutine::new_root(&mut normal_sched.stack_pool) { + let main_task = ~do Task::new_root(&mut normal_sched.stack_pool) { // the two tasks that require a normal start location t2.take()(); t4.take()(); @@ -997,7 +830,7 @@ mod test { }; // task to run the two "special start" tests - let special_task = ~do Coroutine::new_homed( + let special_task = ~do Task::new_root_homed( &mut special_sched.stack_pool, Sched(special_handle2.take())) { t1.take()(); @@ -1027,91 +860,7 @@ mod test { } } - // The following test is a bit of a mess, but it trys to do - // something tricky so I'm not sure how to get around this in the - // short term. - - // A number of schedulers are created, and then a task is created - // and assigned a home scheduler. It is then "started" on a - // different scheduler. The scheduler it is started on should - // observe that the task is not home, and send it home. - - // This test is light in that it does very little. - - #[test] - fn test_transfer_task_home() { - - use rt::uv::uvio::UvEventLoop; - use rt::sched::Shutdown; - use rt::sleeper_list::SleeperList; - use rt::work_queue::WorkQueue; - use uint; - use container::Container; - use vec::OwnedVector; - - do run_in_bare_thread { - - static N: uint = 8; - - let sleepers = SleeperList::new(); - let work_queue = WorkQueue::new(); - - let mut handles = ~[]; - let mut scheds = ~[]; - - for uint::range(0, N) |_| { - let loop_ = ~UvEventLoop::new(); - let mut sched = ~Scheduler::new(loop_, - work_queue.clone(), - sleepers.clone()); - let handle = sched.make_handle(); - rtdebug!("sched id: %u", handle.sched_id); - handles.push(handle); - scheds.push(sched); - }; - - let handles = Cell::new(handles); - - let home_handle = scheds[6].make_handle(); - let home_id = home_handle.sched_id; - let home = Sched(home_handle); - - let main_task = ~do Coroutine::new_homed(&mut scheds[1].stack_pool, home) { - - // Here we check if the task is running on its home. - let sched = Local::take::(); - rtdebug!("run location scheduler id: %u, home: %u", - sched.sched_id(), - home_id); - assert!(sched.sched_id() == home_id); - Local::put::(sched); - - let mut handles = handles.take(); - for handles.mut_iter().advance |handle| { - handle.send(Shutdown); - } - }; - - scheds[0].enqueue_task(main_task); - - let mut threads = ~[]; - - while !scheds.is_empty() { - let sched = scheds.pop(); - let sched_cell = Cell::new(sched); - let thread = do Thread::start { - let sched = sched_cell.take(); - sched.run(); - }; - threads.push(thread); - } - - let _threads = threads; - } - } - // Do it a lot - #[test] fn test_stress_schedule_task_states() { let n = stress_factor() * 120; @@ -1120,21 +869,6 @@ mod test { } } - // The goal is that this is the high-stress test for making sure - // homing is working. It allocates RUST_RT_STRESS tasks that - // do nothing but assert that they are home at execution - // time. These tasks are queued to random schedulers, so sometimes - // they are home and sometimes not. It also runs RUST_RT_STRESS - // times. - - #[test] - fn test_stress_homed_tasks() { - let n = stress_factor(); - for int::range(0,n as int) |_| { - run_in_mt_newsched_task_random_homed(); - } - } - #[test] fn test_simple_scheduling() { do run_in_bare_thread { @@ -1142,7 +876,7 @@ mod test { let task_ran_ptr: *mut bool = &mut task_ran; let mut sched = ~new_test_uv_sched(); - let task = ~do Coroutine::new_root(&mut sched.stack_pool) { + let task = ~do Task::new_root(&mut sched.stack_pool) { unsafe { *task_ran_ptr = true; } }; sched.enqueue_task(task); @@ -1160,7 +894,7 @@ mod test { let mut sched = ~new_test_uv_sched(); for int::range(0, total) |_| { - let task = ~do Coroutine::new_root(&mut sched.stack_pool) { + let task = ~do Task::new_root(&mut sched.stack_pool) { unsafe { *task_count_ptr = *task_count_ptr + 1; } }; sched.enqueue_task(task); @@ -1177,10 +911,10 @@ mod test { let count_ptr: *mut int = &mut count; let mut sched = ~new_test_uv_sched(); - let task1 = ~do Coroutine::new_root(&mut sched.stack_pool) { + let task1 = ~do Task::new_root(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; } let mut sched = Local::take::(); - let task2 = ~do Coroutine::new_root(&mut sched.stack_pool) { + let task2 = ~do Task::new_root(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; } }; // Context switch directly to the new task @@ -1205,7 +939,7 @@ mod test { let mut sched = ~new_test_uv_sched(); - let start_task = ~do Coroutine::new_root(&mut sched.stack_pool) { + let start_task = ~do Task::new_root(&mut sched.stack_pool) { run_task(count_ptr); }; sched.enqueue_task(start_task); @@ -1215,7 +949,7 @@ mod test { fn run_task(count_ptr: *mut int) { do Local::borrow:: |sched| { - let task = ~do Coroutine::new_root(&mut sched.stack_pool) { + let task = ~do Task::new_root(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; if *count_ptr != MAX { @@ -1233,7 +967,7 @@ mod test { fn test_block_task() { do run_in_bare_thread { let mut sched = ~new_test_uv_sched(); - let task = ~do Coroutine::new_root(&mut sched.stack_pool) { + let task = ~do Task::new_root(&mut sched.stack_pool) { let sched = Local::take::(); assert!(sched.in_task_context()); do sched.deschedule_running_task_and_then() |sched, task| { @@ -1280,13 +1014,13 @@ mod test { let mut sched1 = ~new_test_uv_sched(); let handle1 = sched1.make_handle(); let handle1_cell = Cell::new(handle1); - let task1 = ~do Coroutine::new_root(&mut sched1.stack_pool) { + let task1 = ~do Task::new_root(&mut sched1.stack_pool) { chan_cell.take().send(()); }; sched1.enqueue_task(task1); let mut sched2 = ~new_test_uv_sched(); - let task2 = ~do Coroutine::new_root(&mut sched2.stack_pool) { + let task2 = ~do Task::new_root(&mut sched2.stack_pool) { port_cell.take().recv(); // Release the other scheduler's handle so it can exit handle1_cell.take(); @@ -1383,7 +1117,6 @@ mod test { } } } - } #[test] @@ -1408,5 +1141,4 @@ mod test { } } } - -} +} \ No newline at end of file diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index 97c3b6a749b..333eaa53912 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -23,8 +23,11 @@ use option::{Option, Some, None}; use rt::local::Local; use rt::logging::StdErrLogger; use super::local_heap::LocalHeap; -use rt::sched::{SchedHome, AnySched}; +use rt::sched::{Scheduler, SchedHandle}; use rt::join_latch::JoinLatch; +use rt::stack::{StackSegment, StackPool}; +use rt::context::Context; +use cell::Cell; pub struct Task { heap: LocalHeap, @@ -35,7 +38,22 @@ pub struct Task { home: Option, join_latch: Option<~JoinLatch>, on_exit: Option<~fn(bool)>, - destroyed: bool + destroyed: bool, + coroutine: Option<~Coroutine> +} + +pub struct Coroutine { + /// The segment of stack on which the task is currently running or + /// if the task is blocked, on which the task will resume + /// execution. + priv current_stack_segment: StackSegment, + /// Always valid if the task is alive and not running. + saved_context: Context +} + +pub enum SchedHome { + AnySched, + Sched(SchedHandle) } pub struct GarbageCollector; @@ -46,31 +64,50 @@ pub struct Unwinder { } impl Task { - pub fn new_root() -> Task { + + pub fn new_root(stack_pool: &mut StackPool, + start: ~fn()) -> Task { + Task::new_root_homed(stack_pool, AnySched, start) + } + + pub fn new_child(&mut self, + stack_pool: &mut StackPool, + start: ~fn()) -> Task { + self.new_child_homed(stack_pool, AnySched, start) + } + + pub fn new_root_homed(stack_pool: &mut StackPool, + home: SchedHome, + start: ~fn()) -> Task { Task { heap: LocalHeap::new(), gc: GarbageCollector, storage: LocalStorage(ptr::null(), None), logger: StdErrLogger, unwinder: Unwinder { unwinding: false }, - home: Some(AnySched), + home: Some(home), join_latch: Some(JoinLatch::new_root()), on_exit: None, - destroyed: false + destroyed: false, + coroutine: Some(~Coroutine::new(stack_pool, start)) } } - pub fn new_child(&mut self) -> Task { + pub fn new_child_homed(&mut self, + stack_pool: &mut StackPool, + home: SchedHome, + start: ~fn()) -> Task { Task { heap: LocalHeap::new(), gc: GarbageCollector, storage: LocalStorage(ptr::null(), None), logger: StdErrLogger, - home: Some(AnySched), + home: Some(home), unwinder: Unwinder { unwinding: false }, join_latch: Some(self.join_latch.get_mut_ref().new_child()), on_exit: None, - destroyed: false + destroyed: false, + coroutine: Some(~Coroutine::new(stack_pool, start)) } } @@ -108,11 +145,11 @@ impl Task { /// called unsafely, without removing Task from /// thread-local-storage. fn destroy(&mut self) { - // This is just an assertion that `destroy` was called unsafely - // and this instance of Task is still accessible. + do Local::borrow:: |task| { assert!(borrow::ref_eq(task, self)); } + match self.storage { LocalStorage(ptr, Some(ref dtor)) => { (*dtor)(ptr) @@ -125,12 +162,129 @@ impl Task { self.destroyed = true; } + + /// Check if *task* is currently home. + pub fn is_home(&self) -> bool { + do Local::borrow:: |sched| { + match self.home { + Some(AnySched) => { false } + Some(Sched(SchedHandle { sched_id: ref id, _ })) => { + *id == sched.sched_id() + } + None => { rtabort!("task home of None") } + } + } + } + + pub fn is_home_no_tls(&self, sched: &~Scheduler) -> bool { + match self.home { + Some(AnySched) => { false } + Some(Sched(SchedHandle { sched_id: ref id, _ })) => { + *id == sched.sched_id() + } + None => {rtabort!("task home of None") } + } + } + + pub fn is_home_using_id(sched_id: uint) -> bool { + do Local::borrow:: |task| { + match task.home { + Some(Sched(SchedHandle { sched_id: ref id, _ })) => { + *id == sched_id + } + Some(AnySched) => { false } + None => { rtabort!("task home of None") } + } + } + } + + /// Check if this *task* has a home. + pub fn homed(&self) -> bool { + match self.home { + Some(AnySched) => { false } + Some(Sched(_)) => { true } + None => { + rtabort!("task home of None") + } + } + } + + /// On a special scheduler? + pub fn on_special() -> bool { + do Local::borrow:: |sched| { + sched.run_anything + } + } + } impl Drop for Task { fn finalize(&self) { assert!(self.destroyed) } } +// Coroutines represent nothing more than a context and a stack +// segment. + +impl Coroutine { + + pub fn new(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine { + static MIN_STACK_SIZE: uint = 100000; // XXX: Too much stack + + let start = Coroutine::build_start_wrapper(start); + let mut stack = stack_pool.take_segment(MIN_STACK_SIZE); + let initial_context = Context::new(start, &mut stack); + Coroutine { + current_stack_segment: stack, + saved_context: initial_context + } + } + + fn build_start_wrapper(start: ~fn()) -> ~fn() { + let start_cell = Cell::new(start); + let wrapper: ~fn() = || { + // First code after swap to this new context. Run our + // cleanup job. + unsafe { + let sched = Local::unsafe_borrow::(); + (*sched).run_cleanup_job(); + + let sched = Local::unsafe_borrow::(); + let task = (*sched).current_task.get_mut_ref(); + + do task.run { + // N.B. Removing `start` from the start wrapper + // closure by emptying a cell is critical for + // correctness. The ~Task pointer, and in turn the + // closure used to initialize the first call + // frame, is destroyed in the scheduler context, + // not task context. So any captured closures must + // not contain user-definable dtors that expect to + // be in task context. By moving `start` out of + // the closure, all the user code goes our of + // scope while the task is still running. + let start = start_cell.take(); + start(); + }; + } + + let sched = Local::take::(); + sched.terminate_current_task(); + }; + return wrapper; + } + + /// Destroy coroutine and try to reuse stack segment. + pub fn recycle(~self, stack_pool: &mut StackPool) { + match self { + ~Coroutine { current_stack_segment, _ } => { + stack_pool.give_segment(current_stack_segment); + } + } + } + +} + + // Just a sanity check to make sure we are catching a Rust-thrown exception static UNWIND_TOKEN: uintptr_t = 839147; @@ -209,8 +363,10 @@ mod test { fn unwind() { do run_in_newsched_task() { let result = spawntask_try(||()); + rtdebug!("trying first assert"); assert!(result.is_ok()); let result = spawntask_try(|| fail!()); + rtdebug!("trying second assert"); assert!(result.is_err()); } } diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index b0e49684014..659c7eb4985 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -8,30 +8,27 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use cell::Cell; use uint; use option::{Some, None}; -use cell::Cell; -use clone::Clone; -use container::Container; -use iterator::IteratorUtil; -use vec::{OwnedVector, MutableVector}; -use result::{Result, Ok, Err}; -use unstable::run_in_bare_thread; +use rt::sched::Scheduler; use super::io::net::ip::{IpAddr, Ipv4}; -use rt::comm::oneshot; -use rt::task::Task; -use rt::thread::Thread; use rt::local::Local; -use rt::sched::{Scheduler, Coroutine}; -use rt::sleeper_list::SleeperList; +use unstable::run_in_bare_thread; +use rt::thread::Thread; +use rt::task::Task; +use rt::uv::uvio::UvEventLoop; use rt::work_queue::WorkQueue; +use rt::sleeper_list::SleeperList; +use rt::task::{Sched}; +use rt::comm::oneshot; +use result::{Result, Ok, Err}; pub fn new_test_uv_sched() -> Scheduler { - use rt::uv::uvio::UvEventLoop; - use rt::work_queue::WorkQueue; - use rt::sleeper_list::SleeperList; - let mut sched = Scheduler::new(~UvEventLoop::new(), WorkQueue::new(), SleeperList::new()); + let mut sched = Scheduler::new(~UvEventLoop::new(), + WorkQueue::new(), + SleeperList::new()); // Don't wait for the Shutdown message sched.no_sleep = true; return sched; @@ -41,19 +38,15 @@ pub fn new_test_uv_sched() -> Scheduler { /// then waits for the scheduler to exit. Failure of the task /// will abort the process. pub fn run_in_newsched_task(f: ~fn()) { - use super::sched::*; - use unstable::run_in_bare_thread; - let f = Cell::new(f); do run_in_bare_thread { let mut sched = ~new_test_uv_sched(); - let mut new_task = ~Task::new_root(); let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status); - new_task.on_exit = Some(on_exit); - let task = ~Coroutine::with_task(&mut sched.stack_pool, - new_task, - f.take()); + let mut task = ~Task::new_root(&mut sched.stack_pool, + f.take()); + rtdebug!("newsched_task: %x", to_uint(task)); + task.on_exit = Some(on_exit); sched.enqueue_task(task); sched.run(); } @@ -65,7 +58,6 @@ pub fn run_in_newsched_task(f: ~fn()) { pub fn run_in_mt_newsched_task(f: ~fn()) { use os; use from_str::FromStr; - use rt::uv::uvio::UvEventLoop; use rt::sched::Shutdown; use rt::util; @@ -90,7 +82,9 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { for uint::range(0, nthreads) |_| { let loop_ = ~UvEventLoop::new(); - let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone()); + let mut sched = ~Scheduler::new(loop_, + work_queue.clone(), + sleepers.clone()); let handle = sched.make_handle(); handles.push(handle); @@ -99,9 +93,7 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { let f_cell = Cell::new(f_cell.take()); let handles = Cell::new(handles); - let mut new_task = ~Task::new_root(); let on_exit: ~fn(bool) = |exit_status| { - let mut handles = handles.take(); // Tell schedulers to exit for handles.mut_iter().advance |handle| { @@ -110,9 +102,9 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { rtassert!(exit_status); }; - new_task.on_exit = Some(on_exit); - let main_task = ~Coroutine::with_task(&mut scheds[0].stack_pool, - new_task, f_cell.take()); + let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool, + f_cell.take()); + main_task.on_exit = Some(on_exit); scheds[0].enqueue_task(main_task); let mut threads = ~[]; @@ -134,144 +126,44 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { } -// THIS IS AWFUL. Copy-pasted the above initialization function but -// with a number of hacks to make it spawn tasks on a variety of -// schedulers with a variety of homes using the new spawn. - -pub fn run_in_mt_newsched_task_random_homed() { - use libc; - use os; - use from_str::FromStr; - use rt::uv::uvio::UvEventLoop; - use rt::sched::Shutdown; - - do run_in_bare_thread { - let nthreads = match os::getenv("RUST_TEST_THREADS") { - Some(nstr) => FromStr::from_str(nstr).get(), - None => unsafe { - // Using more threads than cores in test code to force - // the OS to preempt them frequently. Assuming that - // this help stress test concurrent types. - rust_get_num_cpus() * 2 - } - }; - - let sleepers = SleeperList::new(); - let work_queue = WorkQueue::new(); - - let mut handles = ~[]; - let mut scheds = ~[]; - - // create a few special schedulers, those with even indicies - // will be pinned-only - for uint::range(0, nthreads) |i| { - let special = (i % 2) == 0; - let loop_ = ~UvEventLoop::new(); - let mut sched = ~Scheduler::new_special( - loop_, work_queue.clone(), sleepers.clone(), special); - let handle = sched.make_handle(); - handles.push(handle); - scheds.push(sched); - } - - // Schedule a pile o tasks - let n = 5*stress_factor(); - for uint::range(0,n) |_i| { - rtdebug!("creating task: %u", _i); - let hf: ~fn() = || { assert!(true) }; - spawntask_homed(&mut scheds, hf); - } - - // Now we want another pile o tasks that do not ever run on a - // special scheduler, because they are normal tasks. Because - // we can we put these in the "main" task. - - let n = 5*stress_factor(); - - let f: ~fn() = || { - for uint::range(0,n) |_| { - let f: ~fn() = || { - // Borrow the scheduler we run on and check if it is - // privileged. - do Local::borrow:: |sched| { - assert!(sched.run_anything); - }; - }; - spawntask_random(f); - }; - }; - - let f_cell = Cell::new(f); - let handles = Cell::new(handles); - - rtdebug!("creating main task"); - - let main_task = ~do Coroutine::new_root(&mut scheds[0].stack_pool) { - f_cell.take()(); - let mut handles = handles.take(); - // Tell schedulers to exit - for handles.mut_iter().advance |handle| { - handle.send(Shutdown); - } - }; - - rtdebug!("queuing main task") - - scheds[0].enqueue_task(main_task); - - let mut threads = ~[]; - - while !scheds.is_empty() { - let sched = scheds.pop(); - let sched_cell = Cell::new(sched); - let thread = do Thread::start { - let sched = sched_cell.take(); - rtdebug!("running sched: %u", sched.sched_id()); - sched.run(); - }; - - threads.push(thread); - } - - rtdebug!("waiting on scheduler threads"); - - // Wait for schedulers - let _threads = threads; - } - - extern { - fn rust_get_num_cpus() -> libc::uintptr_t; - } -} - - /// Test tasks will abort on failure instead of unwinding pub fn spawntask(f: ~fn()) { use super::sched::*; + let f = Cell::new(f); - rtdebug!("spawntask taking the scheduler from TLS") - let task = do Local::borrow::() |running_task| { - ~running_task.new_child() + let task = unsafe { + let sched = Local::unsafe_borrow::(); + rtdebug!("spawntask taking the scheduler from TLS"); + + + do Local::borrow::() |running_task| { + ~running_task.new_child(&mut (*sched).stack_pool, f.take()) + } }; - let mut sched = Local::take::(); - let task = ~Coroutine::with_task(&mut sched.stack_pool, - task, f); + rtdebug!("new task pointer: %x", to_uint(task)); + + let sched = Local::take::(); rtdebug!("spawntask scheduling the new task"); sched.schedule_task(task); } + /// Create a new task and run it right now. Aborts on failure pub fn spawntask_immediately(f: ~fn()) { use super::sched::*; - let task = do Local::borrow::() |running_task| { - ~running_task.new_child() + let f = Cell::new(f); + + let task = unsafe { + let sched = Local::unsafe_borrow::(); + do Local::borrow::() |running_task| { + ~running_task.new_child(&mut (*sched).stack_pool, + f.take()) + } }; - let mut sched = Local::take::(); - let task = ~Coroutine::with_task(&mut sched.stack_pool, - task, f); + let sched = Local::take::(); do sched.switch_running_tasks_and_then(task) |sched, task| { sched.enqueue_task(task); } @@ -280,15 +172,16 @@ pub fn spawntask_immediately(f: ~fn()) { /// Create a new task and run it right now. Aborts on failure pub fn spawntask_later(f: ~fn()) { use super::sched::*; + let f = Cell::new(f); - let task = do Local::borrow::() |running_task| { - ~running_task.new_child() + let task = unsafe { + let sched = Local::unsafe_borrow::(); + do Local::borrow::() |running_task| { + ~running_task.new_child(&mut (*sched).stack_pool, f.take()) + } }; let mut sched = Local::take::(); - let task = ~Coroutine::with_task(&mut sched.stack_pool, - task, f); - sched.enqueue_task(task); Local::put(sched); } @@ -298,13 +191,18 @@ pub fn spawntask_random(f: ~fn()) { use super::sched::*; use rand::{Rand, rng}; - let task = do Local::borrow::() |running_task| { - ~running_task.new_child() + let f = Cell::new(f); + + let task = unsafe { + let sched = Local::unsafe_borrow::(); + do Local::borrow::() |running_task| { + ~running_task.new_child(&mut (*sched).stack_pool, + f.take()) + + } }; let mut sched = Local::take::(); - let task = ~Coroutine::with_task(&mut sched.stack_pool, - task, f); let mut rng = rng(); let run_now: bool = Rand::rand(&mut rng); @@ -343,33 +241,49 @@ pub fn spawntask_homed(scheds: &mut ~[~Scheduler], f: ~fn()) { f() }; - ~Coroutine::with_task_homed(&mut sched.stack_pool, - ~Task::new_root(), - af, - Sched(handle)) + ~Task::new_root_homed(&mut sched.stack_pool, + Sched(handle), + af) }; let dest_sched = &mut scheds[rng.gen_int_range(0,scheds.len() as int)]; // enqueue it for future execution dest_sched.enqueue_task(task); } -/// Spawn a task and wait for it to finish, returning whether it completed successfully or failed +/// Spawn a task and wait for it to finish, returning whether it +/// completed successfully or failed pub fn spawntask_try(f: ~fn()) -> Result<(), ()> { use cell::Cell; use super::sched::*; + let f = Cell::new(f); + let (port, chan) = oneshot(); let chan = Cell::new(chan); - let mut new_task = ~Task::new_root(); let on_exit: ~fn(bool) = |exit_status| chan.take().send(exit_status); + let mut new_task = unsafe { + let sched = Local::unsafe_borrow::(); + do Local::borrow:: |_running_task| { + + // I don't understand why using a child task here fails. I + // think the fail status is propogating back up the task + // tree and triggering a fail for the parent, which we + // aren't correctly expecting. + + // ~running_task.new_child(&mut (*sched).stack_pool, + ~Task::new_root(&mut (*sched).stack_pool, + f.take()) + } + }; new_task.on_exit = Some(on_exit); - let mut sched = Local::take::(); - let new_task = ~Coroutine::with_task(&mut sched.stack_pool, - new_task, f); + + let sched = Local::take::(); do sched.switch_running_tasks_and_then(new_task) |sched, old_task| { sched.enqueue_task(old_task); } + rtdebug!("enqueued the new task, now waiting on exit_status"); + let exit_status = port.recv(); if exit_status { Ok(()) } else { Err(()) } } @@ -378,23 +292,27 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> { pub fn spawntask_thread(f: ~fn()) -> Thread { use rt::sched::*; - let task = do Local::borrow::() |running_task| { - ~running_task.new_child() + let f = Cell::new(f); + + let task = unsafe { + let sched = Local::unsafe_borrow::(); + do Local::borrow::() |running_task| { + ~running_task.new_child(&mut (*sched).stack_pool, + f.take()) + } }; let task = Cell::new(task); - let f = Cell::new(f); + let thread = do Thread::start { let mut sched = ~new_test_uv_sched(); - let task = ~Coroutine::with_task(&mut sched.stack_pool, - task.take(), - f.take()); - sched.enqueue_task(task); + sched.enqueue_task(task.take()); sched.run(); }; return thread; } + /// Get a port number, starting at 9600, for use in tests pub fn next_test_port() -> u16 { unsafe { @@ -410,7 +328,8 @@ pub fn next_test_ip4() -> IpAddr { Ipv4(127, 0, 0, 1, next_test_port()) } -/// Get a constant that represents the number of times to repeat stress tests. Default 1. +/// Get a constant that represents the number of times to repeat +/// stress tests. Default 1. pub fn stress_factor() -> uint { use os::getenv; diff --git a/src/libstd/rt/tube.rs b/src/libstd/rt/tube.rs index 89f3d10b5e4..013eb438c36 100644 --- a/src/libstd/rt/tube.rs +++ b/src/libstd/rt/tube.rs @@ -16,14 +16,15 @@ use option::*; use clone::Clone; use super::rc::RC; -use rt::sched::{Scheduler, Coroutine}; +use rt::sched::Scheduler; use rt::{context, TaskContext, SchedulerContext}; use rt::local::Local; +use rt::task::Task; use vec::OwnedVector; use container::Container; struct TubeState { - blocked_task: Option<~Coroutine>, + blocked_task: Option<~Task>, buf: ~[T] } diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index 298277b3df0..b1708e70733 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -29,7 +29,10 @@ use unstable::sync::{Exclusive, exclusive}; #[cfg(test)] use container::Container; #[cfg(test)] use uint; #[cfg(test)] use unstable::run_in_bare_thread; -#[cfg(test)] use rt::test::*; +#[cfg(test)] use rt::test::{spawntask_immediately, + next_test_ip4, + run_in_newsched_task}; + pub struct UvEventLoop { uvio: UvIoFactory diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs index 99858feab22..b0fc6b2884f 100644 --- a/src/libstd/task/mod.rs +++ b/src/libstd/task/mod.rs @@ -1182,3 +1182,4 @@ fn test_simple_newsched_spawn() { spawn(||()) } } + diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs index 63eb768d1c9..aea8cda6a21 100644 --- a/src/libstd/task/spawn.rs +++ b/src/libstd/task/spawn.rs @@ -581,13 +581,20 @@ pub fn spawn_raw(opts: TaskOpts, f: ~fn()) { fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) { use rt::sched::*; - let mut task = if opts.linked { - do Local::borrow::() |running_task| { - ~running_task.new_child() + let f = Cell::new(f); + + let mut task = unsafe { + let sched = Local::unsafe_borrow::(); + rtdebug!("unsafe borrowed sched"); + + if opts.linked { + do Local::borrow::() |running_task| { + ~running_task.new_child(&mut (*sched).stack_pool, f.take()) + } + } else { + // An unlinked task is a new root in the task tree + ~Task::new_root(&mut (*sched).stack_pool, f.take()) } - } else { - // An unlinked task is a new root in the task tree - ~Task::new_root() }; if opts.notify_chan.is_some() { @@ -601,9 +608,13 @@ fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) { task.on_exit = Some(on_exit); } + rtdebug!("spawn about to take scheduler"); + let mut sched = Local::take::(); - let task = ~Coroutine::with_task(&mut sched.stack_pool, - task, f); + rtdebug!("took sched in spawn"); +// let task = ~Coroutine::with_task(&mut sched.stack_pool, +// task, f); +// let task = ~Task::new_root(&mut sched.stack_pool, f); sched.schedule_task(task); } diff --git a/src/libstd/unstable/lang.rs b/src/libstd/unstable/lang.rs index f750b31a466..3a071af5d4c 100644 --- a/src/libstd/unstable/lang.rs +++ b/src/libstd/unstable/lang.rs @@ -23,6 +23,7 @@ use option::{Option, Some, None}; use io; use rt::global_heap; use rt::borrowck; +use borrow::to_uint; #[allow(non_camel_case_types)] pub type rust_task = c_void; @@ -90,6 +91,9 @@ pub unsafe fn local_malloc(td: *c_char, size: uintptr_t) -> *c_char { _ => { let mut alloc = ::ptr::null(); do Local::borrow:: |task| { + rtdebug!("task pointer: %x, heap pointer: %x", + to_uint(task), + to_uint(&task.heap)); alloc = task.heap.alloc(td as *c_void, size as uint) as *c_char; } return alloc; -- cgit 1.4.1-3-g733a5 From 29c9443d854073feeceb7ec2afa5841d9d1242af Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Mon, 8 Jul 2013 18:06:17 -0700 Subject: std: Add a yield implementation for newsched --- src/libstd/task/mod.rs | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) (limited to 'src/libstd/task') diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs index ae8f1c2101d..5a3ff10ae83 100644 --- a/src/libstd/task/mod.rs +++ b/src/libstd/task/mod.rs @@ -497,11 +497,26 @@ pub fn try(f: ~fn() -> T) -> Result { pub fn yield() { //! Yield control to the task scheduler + use rt::{context, OldTaskContext}; + use rt::local::Local; + use rt::sched::Scheduler; + unsafe { - let task_ = rt::rust_get_task(); - let killed = rt::rust_task_yield(task_); - if killed && !failing() { - fail!("killed"); + match context() { + OldTaskContext => { + let task_ = rt::rust_get_task(); + let killed = rt::rust_task_yield(task_); + if killed && !failing() { + fail!("killed"); + } + } + _ => { + // XXX: What does yield really mean in newsched? + let sched = Local::take::(); + do sched.deschedule_running_task_and_then |sched, task| { + sched.enqueue_task(task); + } + } } } } -- cgit 1.4.1-3-g733a5