diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/libstd/macros.rs | 9 | ||||
| -rw-r--r-- | src/libstd/rt/comm.rs | 3 | ||||
| -rw-r--r-- | src/libstd/rt/context.rs | 6 | ||||
| -rw-r--r-- | src/libstd/rt/io/net/tcp.rs | 60 | ||||
| -rw-r--r-- | src/libstd/rt/io/net/udp.rs | 16 | ||||
| -rw-r--r-- | src/libstd/rt/local.rs | 146 | ||||
| -rw-r--r-- | src/libstd/rt/mod.rs | 110 | ||||
| -rw-r--r-- | src/libstd/rt/sched.rs | 1027 | ||||
| -rw-r--r-- | src/libstd/rt/task.rs | 215 | ||||
| -rw-r--r-- | src/libstd/rt/test.rs | 221 | ||||
| -rw-r--r-- | src/libstd/rt/tube.rs | 5 | ||||
| -rw-r--r-- | src/libstd/rt/uv/mod.rs | 5 | ||||
| -rw-r--r-- | src/libstd/rt/uv/uvio.rs | 40 | ||||
| -rw-r--r-- | src/libstd/task/spawn.rs | 43 | ||||
| -rw-r--r-- | src/libstd/unstable/lang.rs | 3 |
15 files changed, 887 insertions, 1022 deletions
diff --git a/src/libstd/macros.rs b/src/libstd/macros.rs index 7748c43efcd..04058887970 100644 --- a/src/libstd/macros.rs +++ b/src/libstd/macros.rs @@ -23,9 +23,14 @@ macro_rules! rtdebug_ ( } ) ) -// An alternate version with no output, for turning off logging +// An alternate version with no output, for turning off logging. An +// earlier attempt that did not call the fmt! macro was insufficient, +// as a case of the "let bind each variable" approach eventually +// failed without an error message describing the invocation site. macro_rules! rtdebug ( - ($( $arg:expr),+) => ( $(let _ = $arg)*; ) + ($( $arg:expr),+) => ( { + let _x = fmt!( $($arg),+ ); + }) ) macro_rules! rtassert ( diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index 79ee8405531..491bdbe9b06 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -743,7 +743,7 @@ mod test { do run_in_newsched_task { let (port, chan) = oneshot::<~int>(); let port_cell = Cell::new(port); - do spawntask_immediately { + do spawntask { assert!(port_cell.take().recv() == ~10); } @@ -1019,5 +1019,4 @@ mod test { } } } - } diff --git a/src/libstd/rt/context.rs b/src/libstd/rt/context.rs index b30a55978f7..890ad061a68 100644 --- a/src/libstd/rt/context.rs +++ b/src/libstd/rt/context.rs @@ -49,12 +49,11 @@ impl Context { let argp: *c_void = unsafe { transmute::<&~fn(), *c_void>(&*start) }; let sp: *uint = stack.end(); let sp: *mut uint = unsafe { transmute_mut_unsafe(sp) }; - // Save and then immediately load the current context, // which we will then modify to call the given function when restored let mut regs = new_regs(); unsafe { - swap_registers(transmute_mut_region(&mut *regs), transmute_region(&*regs)) + swap_registers(transmute_mut_region(&mut *regs), transmute_region(&*regs)); }; initialize_call_frame(&mut *regs, fp, argp, sp); @@ -72,13 +71,14 @@ impl Context { then loading the registers from a previously saved Context. */ pub fn swap(out_context: &mut Context, in_context: &Context) { + rtdebug!("swapping contexts"); let out_regs: &mut Registers = match out_context { &Context { regs: ~ref mut r, _ } => r }; let in_regs: &Registers = match in_context { &Context { regs: ~ref r, _ } => r }; - + rtdebug!("doing raw swap"); unsafe { swap_registers(out_regs, in_regs) }; } } diff --git a/src/libstd/rt/io/net/tcp.rs b/src/libstd/rt/io/net/tcp.rs index 1d7dafc4302..edfd3a92b5f 100644 --- a/src/libstd/rt/io/net/tcp.rs +++ b/src/libstd/rt/io/net/tcp.rs @@ -186,7 +186,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; @@ -194,7 +194,7 @@ mod test { assert!(buf[0] == 99); } - do spawntask_immediately { + do spawntask { let mut stream = TcpStream::connect(addr); stream.write([99]); } @@ -206,7 +206,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip6(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; @@ -214,7 +214,7 @@ mod test { assert!(buf[0] == 99); } - do spawntask_immediately { + do spawntask { let mut stream = TcpStream::connect(addr); stream.write([99]); } @@ -226,7 +226,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; @@ -234,7 +234,7 @@ mod test { assert!(nread.is_none()); } - do spawntask_immediately { + do spawntask { let _stream = TcpStream::connect(addr); // Close } @@ -246,7 +246,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip6(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; @@ -254,7 +254,7 @@ mod test { assert!(nread.is_none()); } - do spawntask_immediately { + do spawntask { let _stream = TcpStream::connect(addr); // Close } @@ -266,7 +266,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; @@ -276,7 +276,7 @@ mod test { assert!(nread.is_none()); } - do spawntask_immediately { + do spawntask { let _stream = TcpStream::connect(addr); // Close } @@ -288,7 +288,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip6(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; @@ -298,7 +298,7 @@ mod test { assert!(nread.is_none()); } - do spawntask_immediately { + do spawntask { let _stream = TcpStream::connect(addr); // Close } @@ -310,7 +310,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let buf = [0]; @@ -327,7 +327,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { let _stream = TcpStream::connect(addr); // Close } @@ -339,7 +339,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip6(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let buf = [0]; @@ -356,7 +356,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { let _stream = TcpStream::connect(addr); // Close } @@ -369,7 +369,7 @@ mod test { let addr = next_test_ip4(); let max = 10; - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); do max.times { let mut stream = listener.accept(); @@ -379,8 +379,8 @@ mod test { } } - do spawntask_immediately { - do max.times { + do spawntask { + for max.times { let mut stream = TcpStream::connect(addr); stream.write([99]); } @@ -394,7 +394,7 @@ mod test { let addr = next_test_ip6(); let max = 10; - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); do max.times { let mut stream = listener.accept(); @@ -404,8 +404,8 @@ mod test { } } - do spawntask_immediately { - do max.times { + do spawntask { + for max.times { let mut stream = TcpStream::connect(addr); stream.write([99]); } @@ -419,13 +419,13 @@ mod test { let addr = next_test_ip4(); static MAX: int = 10; - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); for int::range(0, MAX) |i| { let stream = Cell::new(listener.accept()); rtdebug!("accepted"); // Start another task to handle the connection - do spawntask_immediately { + do spawntask { let mut stream = stream.take(); let mut buf = [0]; stream.read(buf); @@ -440,7 +440,7 @@ mod test { fn connect(i: int, addr: IpAddr) { if i == MAX { return } - do spawntask_immediately { + do spawntask { rtdebug!("connecting"); let mut stream = TcpStream::connect(addr); // Connect again before writing @@ -458,13 +458,13 @@ mod test { let addr = next_test_ip6(); static MAX: int = 10; - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); for int::range(0, MAX) |i| { let stream = Cell::new(listener.accept()); rtdebug!("accepted"); // Start another task to handle the connection - do spawntask_immediately { + do spawntask { let mut stream = stream.take(); let mut buf = [0]; stream.read(buf); @@ -479,7 +479,7 @@ mod test { fn connect(i: int, addr: IpAddr) { if i == MAX { return } - do spawntask_immediately { + do spawntask { rtdebug!("connecting"); let mut stream = TcpStream::connect(addr); // Connect again before writing @@ -497,7 +497,7 @@ mod test { let addr = next_test_ip4(); static MAX: int = 10; - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); for int::range(0, MAX) |_| { let stream = Cell::new(listener.accept()); @@ -535,7 +535,7 @@ mod test { let addr = next_test_ip6(); static MAX: int = 10; - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); for int::range(0, MAX) |_| { let stream = Cell::new(listener.accept()); diff --git a/src/libstd/rt/io/net/udp.rs b/src/libstd/rt/io/net/udp.rs index d186ad15f4a..76200d6f86e 100644 --- a/src/libstd/rt/io/net/udp.rs +++ b/src/libstd/rt/io/net/udp.rs @@ -132,7 +132,7 @@ mod test { let server_ip = next_test_ip4(); let client_ip = next_test_ip4(); - do spawntask_immediately { + do spawntask { match UdpSocket::bind(server_ip) { Some(ref mut server) => { let mut buf = [0]; @@ -149,7 +149,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { match UdpSocket::bind(client_ip) { Some(ref mut client) => client.sendto([99], server_ip), None => fail!() @@ -164,7 +164,7 @@ mod test { let server_ip = next_test_ip6(); let client_ip = next_test_ip6(); - do spawntask_immediately { + do spawntask { match UdpSocket::bind(server_ip) { Some(ref mut server) => { let mut buf = [0]; @@ -181,7 +181,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { match UdpSocket::bind(client_ip) { Some(ref mut client) => client.sendto([99], server_ip), None => fail!() @@ -196,7 +196,7 @@ mod test { let server_ip = next_test_ip4(); let client_ip = next_test_ip4(); - do spawntask_immediately { + do spawntask { match UdpSocket::bind(server_ip) { Some(server) => { let server = ~server; @@ -214,7 +214,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { match UdpSocket::bind(client_ip) { Some(client) => { let client = ~client; @@ -233,7 +233,7 @@ mod test { let server_ip = next_test_ip6(); let client_ip = next_test_ip6(); - do spawntask_immediately { + do spawntask { match UdpSocket::bind(server_ip) { Some(server) => { let server = ~server; @@ -251,7 +251,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { match UdpSocket::bind(client_ip) { Some(client) => { let client = ~client; diff --git a/src/libstd/rt/local.rs b/src/libstd/rt/local.rs index b47bbf3edf0..34e3a0241a9 100644 --- a/src/libstd/rt/local.rs +++ b/src/libstd/rt/local.rs @@ -14,6 +14,7 @@ use rt::task::Task; use rt::local_ptr; use rt::rtio::{EventLoop, IoFactoryObject}; //use borrow::to_uint; +use cell::Cell; pub trait Local { fn put(value: ~Self); @@ -24,40 +25,56 @@ pub trait Local { unsafe fn try_unsafe_borrow() -> Option<*mut Self>; } -impl Local for Scheduler { - fn put(value: ~Scheduler) { unsafe { local_ptr::put(value) }} - fn take() -> ~Scheduler { unsafe { local_ptr::take() } } +impl Local for Task { + fn put(value: ~Task) { unsafe { local_ptr::put(value) } } + fn take() -> ~Task { unsafe { local_ptr::take() } } fn exists() -> bool { local_ptr::exists() } - fn borrow<T>(f: &fn(&mut Scheduler) -> T) -> T { + fn borrow<T>(f: &fn(&mut Task) -> T) -> T { let mut res: Option<T> = None; let res_ptr: *mut Option<T> = &mut res; unsafe { - do local_ptr::borrow |sched| { -// rtdebug!("successfully unsafe borrowed sched pointer"); - let result = f(sched); + do local_ptr::borrow |task| { + let result = f(task); *res_ptr = Some(result); } } match res { Some(r) => { r } - None => rtabort!("function failed!") + None => { rtabort!("function failed in local_borrow") } } } - unsafe fn unsafe_borrow() -> *mut Scheduler { local_ptr::unsafe_borrow() } - unsafe fn try_unsafe_borrow() -> Option<*mut Scheduler> { rtabort!("unimpl") } + unsafe fn unsafe_borrow() -> *mut Task { local_ptr::unsafe_borrow() } + unsafe fn try_unsafe_borrow() -> Option<*mut Task> { rtabort!("unimpl task try_unsafe_borrow") } } -impl Local for Task { - fn put(_value: ~Task) { rtabort!("unimpl") } - fn take() -> ~Task { rtabort!("unimpl") } - fn exists() -> bool { rtabort!("unimpl") } - fn borrow<T>(f: &fn(&mut Task) -> T) -> T { - do Local::borrow::<Scheduler, T> |sched| { -// rtdebug!("sched about to grab current_task"); - match sched.current_task { +impl Local for Scheduler { + fn put(value: ~Scheduler) { + let value = Cell::new(value); + do Local::borrow::<Task,()> |task| { + let task = task; + task.sched = Some(value.take()); + }; + } + fn take() -> ~Scheduler { + do Local::borrow::<Task,~Scheduler> |task| { + let sched = task.sched.take_unwrap(); + let task = task; + task.sched = None; + sched + } + } + fn exists() -> bool { + do Local::borrow::<Task,bool> |task| { + match task.sched { + Some(ref _task) => true, + None => false + } + } + } + fn borrow<T>(f: &fn(&mut Scheduler) -> T) -> T { + do Local::borrow::<Task, T> |task| { + match task.sched { Some(~ref mut task) => { -// rtdebug!("current task pointer: %x", to_uint(task)); -// rtdebug!("current task heap pointer: %x", to_uint(&task.heap)); f(task) } None => { @@ -66,20 +83,19 @@ impl Local for Task { } } } - unsafe fn unsafe_borrow() -> *mut Task { - match (*Local::unsafe_borrow::<Scheduler>()).current_task { - Some(~ref mut task) => { - let s: *mut Task = &mut *task; + unsafe fn unsafe_borrow() -> *mut Scheduler { + match (*Local::unsafe_borrow::<Task>()).sched { + Some(~ref mut sched) => { + let s: *mut Scheduler = &mut *sched; return s; } None => { - // Don't fail. Infinite recursion rtabort!("no scheduler") } } } - unsafe fn try_unsafe_borrow() -> Option<*mut Task> { - if Local::exists::<Scheduler>() { + unsafe fn try_unsafe_borrow() -> Option<*mut Scheduler> { + if Local::exists::<Task>() { Some(Local::unsafe_borrow()) } else { None @@ -101,57 +117,69 @@ impl Local for IoFactoryObject { unsafe fn try_unsafe_borrow() -> Option<*mut IoFactoryObject> { rtabort!("unimpl") } } + #[cfg(test)] mod test { use unstable::run_in_bare_thread; use rt::test::*; - use rt::sched::Scheduler; +// use rt::sched::Scheduler; use super::*; + use rt::task::Task; + use rt::local_ptr; #[test] - fn thread_local_scheduler_smoke_test() { - do run_in_bare_thread { - let scheduler = ~new_test_uv_sched(); - Local::put(scheduler); - let _scheduler: ~Scheduler = Local::take(); - } + fn thread_local_task_smoke_test() { + local_ptr::init_tls_key(); + let mut sched = ~new_test_uv_sched(); + let task = ~Task::new_root(&mut sched.stack_pool, || {}); + Local::put(task); + let task: ~Task = Local::take(); + cleanup_task(task); } #[test] - fn thread_local_scheduler_two_instances() { - do run_in_bare_thread { - let scheduler = ~new_test_uv_sched(); - Local::put(scheduler); - let _scheduler: ~Scheduler = Local::take(); - let scheduler = ~new_test_uv_sched(); - Local::put(scheduler); - let _scheduler: ~Scheduler = Local::take(); - } + fn thread_local_task_two_instances() { + local_ptr::init_tls_key(); + let mut sched = ~new_test_uv_sched(); + let task = ~Task::new_root(&mut sched.stack_pool, || {}); + Local::put(task); + let task: ~Task = Local::take(); + cleanup_task(task); + let task = ~Task::new_root(&mut sched.stack_pool, || {}); + Local::put(task); + let task: ~Task = Local::take(); + cleanup_task(task); + } #[test] fn borrow_smoke_test() { - do run_in_bare_thread { - let scheduler = ~new_test_uv_sched(); - Local::put(scheduler); - unsafe { - let _scheduler: *mut Scheduler = Local::unsafe_borrow(); - } - let _scheduler: ~Scheduler = Local::take(); + local_ptr::init_tls_key(); + let mut sched = ~new_test_uv_sched(); + let task = ~Task::new_root(&mut sched.stack_pool, || {}); + Local::put(task); + + unsafe { + let _task: *mut Task = Local::unsafe_borrow(); } + let task: ~Task = Local::take(); + cleanup_task(task); } #[test] fn borrow_with_return() { - do run_in_bare_thread { - let scheduler = ~new_test_uv_sched(); - Local::put(scheduler); - let res = do Local::borrow::<Scheduler,bool> |_sched| { - true - }; - assert!(res); - let _scheduler: ~Scheduler = Local::take(); - } + local_ptr::init_tls_key(); + let mut sched = ~new_test_uv_sched(); + let task = ~Task::new_root(&mut sched.stack_pool, || {}); + Local::put(task); + + let res = do Local::borrow::<Task,bool> |_task| { + true + }; + assert!(res) + let task: ~Task = Local::take(); + cleanup_task(task); } } + diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 3bcf6787824..268d402adf5 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -67,9 +67,10 @@ use iter::Times; use iterator::{Iterator, IteratorUtil}; use option::{Some, None}; use ptr::RawPtr; +use rt::local::Local; use rt::sched::{Scheduler, Shutdown}; use rt::sleeper_list::SleeperList; -use rt::task::{Task, Sched}; +use rt::task::{Task, SchedTask, GreenTask}; use rt::thread::Thread; use rt::work_queue::WorkQueue; use rt::uv::uvio::UvEventLoop; @@ -309,44 +310,53 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { } }; - // Build the main task and queue it up - match main_sched { - None => { - // The default case where we don't need a scheduler on the main thread. - // Just put an unpinned task onto one of the default schedulers. - let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool, main); - main_task.death.on_exit = Some(on_exit); - main_task.name = Some(~"main"); - scheds[0].enqueue_task(main_task); - } - Some(ref mut main_sched) => { - let home = Sched(main_sched.make_handle()); - let mut main_task = ~Task::new_root_homed(&mut scheds[0].stack_pool, home, main); - main_task.death.on_exit = Some(on_exit); - main_task.name = Some(~"main"); - main_sched.enqueue_task(main_task); - } - }; - - // Run each scheduler in a thread. let mut threads = ~[]; - while !scheds.is_empty() { + + if !use_main_sched { + + // In the case where we do not use a main_thread scheduler we + // run the main task in one of our threads. + + let main_cell = Cell::new(main); + let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool, + main_cell.take()); + main_task.death.on_exit = Some(on_exit); + let main_task_cell = Cell::new(main_task); + let sched = scheds.pop(); let sched_cell = Cell::new(sched); let thread = do Thread::start { let sched = sched_cell.take(); - sched.run(); + sched.bootstrap(main_task_cell.take()); }; - threads.push(thread); } - // Run the main-thread scheduler - match main_sched { - Some(sched) => { let _ = sched.run(); }, - None => () + // Run each remaining scheduler in a thread. + while !scheds.is_empty() { + let sched = scheds.pop(); + let sched_cell = Cell::new(sched); + let thread = do Thread::start { + let mut sched = sched_cell.take(); + let bootstrap_task = ~do Task::new_root(&mut sched.stack_pool) || { + rtdebug!("boostraping a non-primary scheduler"); + }; + sched.bootstrap(bootstrap_task); + }; + threads.push(thread); } + // If we do have a main thread scheduler, run it now. + + if use_main_sched { + let home = Sched(main_sched.make_handle()); + let mut main_task = ~Task::new_root_homed(&mut scheds[0].stack_pool, + home, main); + main_task.death.on_exit = Some(on_exit); + let main_task_cell = Cell::new(main_task); + sched.bootstrap(main_task); + } + // Wait for schedulers foreach thread in threads.consume_iter() { thread.join(); @@ -378,27 +388,23 @@ pub enum RuntimeContext { pub fn context() -> RuntimeContext { use task::rt::rust_task; - use self::local::Local; - use self::sched::Scheduler; - // XXX: Hitting TLS twice to check if the scheduler exists - // then to check for the task is not good for perf if unsafe { rust_try_get_task().is_not_null() } { return OldTaskContext; - } else { - if Local::exists::<Scheduler>() { - let context = Cell::new_empty(); - do Local::borrow::<Scheduler, ()> |sched| { - if sched.in_task_context() { - context.put_back(TaskContext); - } else { - context.put_back(SchedulerContext); - } + } else if Local::exists::<Task>() { + rtdebug!("either task or scheduler context in newrt"); + // In this case we know it is a new runtime context, but we + // need to check which one. Going to try borrowing task to + // check. Task should always be in TLS, so hopefully this + // doesn't conflict with other ops that borrow. + return do Local::borrow::<Task,RuntimeContext> |task| { + match task.task_type { + SchedTask => SchedulerContext, + GreenTask(_) => TaskContext } - return context.take(); - } else { - return GlobalContext; - } + }; + } else { + return GlobalContext; } extern { @@ -410,23 +416,9 @@ pub fn context() -> RuntimeContext { #[test] fn test_context() { use unstable::run_in_bare_thread; - use self::sched::{Scheduler}; - use rt::local::Local; - use rt::test::new_test_uv_sched; assert_eq!(context(), OldTaskContext); do run_in_bare_thread { assert_eq!(context(), GlobalContext); - let mut sched = ~new_test_uv_sched(); - let task = ~do Task::new_root(&mut sched.stack_pool) { - assert_eq!(context(), TaskContext); - let sched = Local::take::<Scheduler>(); - do sched.deschedule_running_task_and_then() |sched, task| { - assert_eq!(context(), SchedulerContext); - sched.enqueue_blocked_task(task); - } - }; - sched.enqueue_task(task); - sched.run(); } } diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index ae4ca2b9783..0326c2cbfe5 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -10,7 +10,8 @@ use either::{Left, Right}; use option::{Option, Some, None}; -use cast::transmute; +use sys; +use cast::{transmute, transmute_mut_region, transmute_mut_unsafe}; use clone::Clone; use unstable::raw; @@ -27,6 +28,7 @@ use rt::local::Local; use rt::rtio::RemoteCallback; use rt::metrics::SchedMetrics; use borrow::{to_uint}; +use cell::Cell; /// The Scheduler is responsible for coordinating execution of Coroutines /// on a single thread. When the scheduler is running it is owned by @@ -59,11 +61,8 @@ pub struct Scheduler { stack_pool: StackPool, /// The event loop used to drive the scheduler and perform I/O event_loop: ~EventLoopObject, - /// The scheduler's saved context. - /// Always valid when a task is executing, otherwise not - priv saved_context: Context, - /// The currently executing task - current_task: Option<~Task>, + /// The scheduler runs on a special task. + sched_task: Option<~Task>, /// An action performed after a context switch on behalf of the /// code running before the context switch priv cleanup_job: Option<CleanupJob>, @@ -90,7 +89,6 @@ enum CleanupJob { } impl Scheduler { - pub fn in_task_context(&self) -> bool { self.current_task.is_some() } pub fn sched_id(&self) -> uint { to_uint(self) } @@ -103,15 +101,14 @@ impl Scheduler { } + // When you create a scheduler it isn't yet "in" a task, so the + // task field is None. pub fn new_special(event_loop: ~EventLoopObject, work_queue: WorkQueue<~Task>, sleeper_list: SleeperList, run_anything: bool) -> Scheduler { - // Lazily initialize the runtime TLS key - local_ptr::init_tls_key(); - Scheduler { sleeper_list: sleeper_list, message_queue: MessageQueue::new(), @@ -120,8 +117,7 @@ impl Scheduler { event_loop: event_loop, work_queue: work_queue, stack_pool: StackPool::new(), - saved_context: Context::empty(), - current_task: None, + sched_task: None, cleanup_job: None, metrics: SchedMetrics::new(), run_anything: run_anything @@ -132,8 +128,47 @@ impl Scheduler { // the scheduler itself doesn't have to call event_loop.run. // That will be important for embedding the runtime into external // event loops. - pub fn run(~self) -> ~Scheduler { - assert!(!self.in_task_context()); + + // Take a main task to run, and a scheduler to run it in. Create a + // scheduler task and bootstrap into it. + pub fn bootstrap(~self, task: ~Task) { + + // Initialize the TLS key. + local_ptr::init_tls_key(); + + // Create a task for the scheduler with an empty context. + let sched_task = Task::new_sched_task(); + + // Now that we have an empty task struct for the scheduler + // task, put it in TLS. + Local::put::(~sched_task); + + // Now, as far as all the scheduler state is concerned, we are + // inside the "scheduler" context. So we can act like the + // scheduler and resume the provided task. + self.resume_task_immediately(task); + + // Now we are back in the scheduler context, having + // successfully run the input task. Start by running the + // scheduler. Grab it out of TLS - performing the scheduler + // action will have given it away. + let sched = Local::take::<Scheduler>(); + sched.run(); + + // Now that we are done with the scheduler, clean up the + // scheduler task. Do so by removing it from TLS and manually + // cleaning up the memory it uses. As we didn't actually call + // task.run() on the scheduler task we never get through all + // the cleanup code it runs. + + rtdebug!("post sched.run(), cleaning up scheduler task"); + let mut stask = Local::take::<Task>(); + stask.destroyed = true; + } + + // This does not return a scheduler, as the scheduler is placed + // inside the task. + pub fn run(~self) { let mut self_sched = self; @@ -142,79 +177,88 @@ impl Scheduler { // schedulers. self_sched.event_loop.callback(Scheduler::run_sched_once); + // This is unsafe because we need to place the scheduler, with + // the event_loop inside, inside our task. But we still need a + // mutable reference to the event_loop to give it the "run" + // command. unsafe { - let event_loop: *mut ~EventLoopObject = { - let event_loop: *mut ~EventLoopObject = &mut self_sched.event_loop; - event_loop - }; + let event_loop: *mut ~EventLoopObject = &mut self_sched.event_loop; - // Give ownership of the scheduler (self) to the thread - Local::put(self_sched); + // Our scheduler must be in the task before the event loop + // is started. + let self_sched = Cell::new(self_sched); + do Local::borrow::<Task,()> |stask| { + stask.sched = Some(self_sched.take()); + }; (*event_loop).run(); } - - rtdebug!("run taking sched"); - let sched = Local::take::<Scheduler>(); - // XXX: Reenable this once we're using a per-scheduler queue. With a shared - // queue this is not true - //assert!(sched.work_queue.is_empty()); - rtdebug!("scheduler metrics: %s\n", { - use to_str::ToStr; - sched.metrics.to_str() - }); - return sched; } - fn run_sched_once() { + // One iteration of the scheduler loop, always run at least once. - let mut sched = Local::take::<Scheduler>(); - sched.metrics.turns += 1; - - // First, check the message queue for instructions. - // XXX: perf. Check for messages without atomics. - // It's ok if we miss messages occasionally, as long as - // we sync and check again before sleeping. - if sched.interpret_message_queue() { - // We performed a scheduling action. There may be other work - // to do yet, so let's try again later. - rtdebug!("run_sched_once, interpret_message_queue taking sched"); - let mut sched = Local::take::<Scheduler>(); - sched.metrics.messages_received += 1; - sched.event_loop.callback(Scheduler::run_sched_once); - Local::put(sched); - return; - } + // The model for this function is that you continue through it + // until you either use the scheduler while performing a schedule + // action, in which case you give it away and do not return, or + // you reach the end and sleep. In the case that a scheduler + // action is performed the loop is evented such that this function + // is called again. + fn run_sched_once() { - // Now, look in the work queue for tasks to run - rtdebug!("run_sched_once taking"); + // When we reach the scheduler context via the event loop we + // already have a scheduler stored in our local task, so we + // start off by taking it. This is the only path through the + // scheduler where we get the scheduler this way. let sched = Local::take::<Scheduler>(); - if sched.resume_task_from_queue() { - // We performed a scheduling action. There may be other work - // to do yet, so let's try again later. - do Local::borrow::<Scheduler, ()> |sched| { - sched.metrics.tasks_resumed_from_queue += 1; - sched.event_loop.callback(Scheduler::run_sched_once); + + // Our first task is to read mail to see if we have important + // messages. + + // 1) A wake message is easy, mutate sched struct and return + // it. + // 2) A shutdown is also easy, shutdown. + // 3) A pinned task - we resume immediately and do not return + // here. + + let result = sched.interpret_message_queue(); + let sched = match result { + Some(sched) => { + // We did not resume a task, so we returned. + sched } - return; - } + None => { + return; + } + }; + + let result = sched.resume_task_from_queue(); + let mut sched = match result { + Some(sched) => { + // Failed to dequeue a task, so we return. + sched + } + None => { + return; + } + }; // If we got here then there was no work to do. // Generate a SchedHandle and push it to the sleeper list so // somebody can wake us up later. - rtdebug!("no work to do"); - do Local::borrow::<Scheduler, ()> |sched| { - sched.metrics.wasted_turns += 1; - if !sched.sleepy && !sched.no_sleep { - rtdebug!("sleeping"); - sched.metrics.sleepy_times += 1; - sched.sleepy = true; - let handle = sched.make_handle(); - sched.sleeper_list.push(handle); - } else { - rtdebug!("not sleeping"); - } + sched.metrics.wasted_turns += 1; + if !sched.sleepy && !sched.no_sleep { + rtdebug!("scheduler has no work to do, going to sleep"); + sched.metrics.sleepy_times += 1; + sched.sleepy = true; + let handle = sched.make_handle(); + sched.sleeper_list.push(handle); + } else { + rtdebug!("not sleeping, already doing so or no_sleep set"); } + + // Finished a cycle without using the Scheduler. Place it back + // in TLS. + Local::put(sched); } pub fn make_handle(&mut self) -> SchedHandle { @@ -234,18 +278,6 @@ impl Scheduler { /// to the work queue directly. pub fn enqueue_task(&mut self, task: ~Task) { - // We don't want to queue tasks that belong on other threads, - // so we send them home at enqueue time. - - // The borrow checker doesn't like our disassembly of the - // Coroutine struct and partial use and mutation of the - // fields. So completely disassemble here and stop using? - - // XXX perf: I think we might be able to shuffle this code to - // only destruct when we need to. - - rtdebug!("a task was queued on: %u", self.sched_id()); - let this = self; // We push the task onto our local queue clone. @@ -283,30 +315,23 @@ impl Scheduler { // * Scheduler-context operations - fn interpret_message_queue(~self) -> bool { - assert!(!self.in_task_context()); - - rtdebug!("looking for scheduler messages"); + // This function returns None if the scheduler is "used", or it + // returns the still-available scheduler. + fn interpret_message_queue(~self) -> Option<~Scheduler> { let mut this = self; match this.message_queue.pop() { Some(PinnedTask(task)) => { - rtdebug!("recv BiasedTask message in sched: %u", - this.sched_id()); let mut task = task; - task.home = Some(Sched(this.make_handle())); + task.give_home(Sched(this.make_handle())); this.resume_task_immediately(task); - return true; + return None; } - Some(Wake) => { - rtdebug!("recv Wake message"); this.sleepy = false; - Local::put(this); - return true; + return Some(this); } Some(Shutdown) => { - rtdebug!("recv Shutdown message"); if this.sleepy { // There may be an outstanding handle on the // sleeper list. Pop them all to make sure that's @@ -325,12 +350,14 @@ impl Scheduler { // event loop references we will shut down. this.no_sleep = true; this.sleepy = false; - Local::put(this); - return true; + // YYY: Does a shutdown count as a "use" of the + // scheduler? This seems to work - so I'm leaving it + // this way despite not having a solid rational for + // why I should return the scheduler here. + return Some(this); } None => { - Local::put(this); - return false; + return Some(this); } } } @@ -338,7 +365,7 @@ impl Scheduler { /// Given an input Coroutine sends it back to its home scheduler. fn send_task_home(task: ~Task) { let mut task = task; - let mut home = task.home.take_unwrap(); + let mut home = task.take_unwrap_home(); match home { Sched(ref mut home_handle) => { home_handle.send(PinnedTask(task)); @@ -351,69 +378,45 @@ impl Scheduler { // Resume a task from the queue - but also take into account that // it might not belong here. - fn resume_task_from_queue(~self) -> bool { - assert!(!self.in_task_context()); - rtdebug!("looking in work queue for task to schedule"); + // If we perform a scheduler action we give away the scheduler ~ + // pointer, if it is still available we return it. + + fn resume_task_from_queue(~self) -> Option<~Scheduler> { + let mut this = self; - // The borrow checker imposes the possibly absurd requirement - // that we split this into two match expressions. This is due - // to the inspection of the internal bits of task, as that - // can't be in scope when we act on task. match this.work_queue.pop() { Some(task) => { - let action_id = { - let home = &task.home; - match home { - &Some(Sched(ref home_handle)) - if home_handle.sched_id != this.sched_id() => { - SendHome - } - &Some(AnySched) if this.run_anything => { - ResumeNow - } - &Some(AnySched) => { - Requeue - } - &Some(Sched(_)) => { - ResumeNow - } - &None => { - Homeless + let mut task = task; + let home = task.take_unwrap_home(); + match home { + Sched(home_handle) => { + if home_handle.sched_id != this.sched_id() { + task.give_home(Sched(home_handle)); + Scheduler::send_task_home(task); + return Some(this); + } else { + task.give_home(Sched(home_handle)); + this.resume_task_immediately(task); + return None; } } - }; - - match action_id { - SendHome => { - rtdebug!("sending task home"); - Scheduler::send_task_home(task); - Local::put(this); - return false; - } - ResumeNow => { - rtdebug!("resuming now"); + AnySched if this.run_anything => { + task.give_home(AnySched); this.resume_task_immediately(task); - return true; + return None; } - Requeue => { - rtdebug!("re-queueing") + AnySched => { + task.give_home(AnySched); this.enqueue_task(task); - Local::put(this); - return false; - } - Homeless => { - rtabort!("task home was None!"); + return Some(this); } } } - None => { - rtdebug!("no tasks in queue"); - Local::put(this); - return false; - } + return Some(this); + } } } @@ -422,33 +425,20 @@ impl Scheduler { /// Called by a running task to end execution, after which it will /// be recycled by the scheduler for reuse in a new task. pub fn terminate_current_task(~self) { + // Similar to deschedule running task and then, but cannot go through + // the task-blocking path. The task is already dying. let mut this = self; - assert!(this.in_task_context()); - - rtdebug!("ending running task"); - - // This task is post-cleanup, so it must be unkillable. This sequence - // of descheduling and recycling must not get interrupted by a kill. - // FIXME(#7544): Make this use an inner descheduler, like yield should. - this.current_task.get_mut_ref().death.unkillable += 1; - - do this.deschedule_running_task_and_then |sched, dead_task| { - match dead_task.wake() { - Some(dead_task) => { - let mut dead_task = dead_task; - dead_task.death.unkillable -= 1; // FIXME(#7544) ugh - let coroutine = dead_task.coroutine.take_unwrap(); - coroutine.recycle(&mut sched.stack_pool); - } - None => rtabort!("dead task killed before recycle"), - } + let stask = this.sched_task.take_unwrap(); + do this.change_task_context(stask) |sched, mut dead_task| { + let coroutine = dead_task.coroutine.take_unwrap(); + coroutine.recycle(&mut sched.stack_pool); } - - rtabort!("control reached end of task"); } - pub fn schedule_task(~self, task: ~Task) { - assert!(self.in_task_context()); + // If a scheduling action is performed, return None. If not, + // return Some(sched). + + pub fn schedule_task(~self, task: ~Task) -> Option<~Scheduler> { // is the task home? let is_home = task.is_home_no_tls(&self); @@ -461,55 +451,115 @@ impl Scheduler { if is_home || (!homed && this.run_anything) { // here we know we are home, execute now OR we know we // aren't homed, and that this sched doesn't care + rtdebug!("task: %u is on ok sched, executing", to_uint(task)); do this.switch_running_tasks_and_then(task) |sched, last_task| { sched.enqueue_blocked_task(last_task); } + return None; } else if !homed && !this.run_anything { // the task isn't homed, but it can't be run here this.enqueue_task(task); - Local::put(this); + return Some(this); } else { // task isn't home, so don't run it here, send it home Scheduler::send_task_home(task); - Local::put(this); + return Some(this); } } - // Core scheduling ops - - pub fn resume_task_immediately(~self, task: ~Task) { + // The primary function for changing contexts. In the current + // design the scheduler is just a slightly modified GreenTask, so + // all context swaps are from Task to Task. The only difference + // between the various cases is where the inputs come from, and + // what is done with the resulting task. That is specified by the + // cleanup function f, which takes the scheduler and the + // old task as inputs. + + pub fn change_task_context(~self, + next_task: ~Task, + f: &fn(&mut Scheduler, ~Task)) { let mut this = self; - assert!(!this.in_task_context()); - rtdebug!("scheduling a task"); - this.metrics.context_switches_sched_to_task += 1; + // The current task is grabbed from TLS, not taken as an input. + let current_task: ~Task = Local::take::<Task>(); - // Store the task in the scheduler so it can be grabbed later - this.current_task = Some(task); - this.enqueue_cleanup_job(DoNothing); + // These transmutes do something fishy with a closure. + let f_fake_region = unsafe { + transmute::<&fn(&mut Scheduler, ~Task), + &fn(&mut Scheduler, ~Task)>(f) + }; + let f_opaque = ClosureConverter::from_fn(f_fake_region); + + // The current task is placed inside an enum with the cleanup + // function. This enum is then placed inside the scheduler. + this.enqueue_cleanup_job(GiveTask(current_task, f_opaque)); - Local::put(this); + // The scheduler is then placed inside the next task. + let mut next_task = next_task; + next_task.sched = Some(this); - // Take pointers to both the task and scheduler's saved registers. + // However we still need an internal mutable pointer to the + // original task. The strategy here was "arrange memory, then + // get pointers", so we crawl back up the chain using + // transmute to eliminate borrowck errors. unsafe { - let sched = Local::unsafe_borrow::<Scheduler>(); - let (sched_context, _, next_task_context) = (*sched).get_contexts(); - let next_task_context = next_task_context.unwrap(); - // Context switch to the task, restoring it's registers - // and saving the scheduler's - Context::swap(sched_context, next_task_context); - let sched = Local::unsafe_borrow::<Scheduler>(); - // The running task should have passed ownership elsewhere - assert!((*sched).current_task.is_none()); + let sched: &mut Scheduler = + transmute_mut_region(*next_task.sched.get_mut_ref()); + + let current_task: &mut Task = match sched.cleanup_job { + Some(GiveTask(ref task, _)) => { + transmute_mut_region(*transmute_mut_unsafe(task)) + } + Some(DoNothing) => { + rtabort!("no next task"); + } + None => { + rtabort!("no cleanup job"); + } + }; + + let (current_task_context, next_task_context) = + Scheduler::get_contexts(current_task, next_task); + + // Done with everything - put the next task in TLS. This + // works because due to transmute the borrow checker + // believes that we have no internal pointers to + // next_task. + Local::put(next_task); + + // The raw context swap operation. The next action taken + // will be running the cleanup job from the context of the + // next task. + Context::swap(current_task_context, next_task_context); + } - // Running tasks may have asked us to do some cleanup + // When the context swaps back to the scheduler we immediately + // run the cleanup job, as expected by the previously called + // swap_contexts function. + unsafe { + let sched = Local::unsafe_borrow::<Scheduler>(); (*sched).run_cleanup_job(); // Must happen after running the cleanup job (of course). - // Might not be running in task context; if not, a later call to - // resume_task_immediately will take care of this. - (*sched).current_task.map(|t| t.death.check_killed()); + let task = Local::unsafe_borrow::<Task>(); + (*task).death.check_killed(); + } + } + + // There are a variety of "obvious" functions to be passed to + // change_task_context, so we can make a few "named cases". + + // Enqueue the old task on the current scheduler. + pub fn enqueue_old(sched: &mut Scheduler, task: ~Task) { + sched.enqueue_task(task); + } + + // Sometimes we just want the old API though. + + pub fn resume_task_immediately(~self, task: ~Task) { + do self.change_task_context(task) |sched, stask| { + sched.sched_task = Some(stask); } } @@ -533,152 +583,78 @@ impl Scheduler { /// in order to prevent that fn from performing further scheduling operations. /// Doing further scheduling could easily result in infinite recursion. pub fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, BlockedTask)) { + // Trickier - we need to get the scheduler task out of self + // and use it as the destination. let mut this = self; - assert!(this.in_task_context()); - - rtdebug!("blocking task"); - this.metrics.context_switches_task_to_sched += 1; - - unsafe { - let blocked_task = this.current_task.take_unwrap(); - let f_fake_region = transmute::<&fn(&mut Scheduler, BlockedTask), - &fn(&mut Scheduler, BlockedTask)>(f); - let f_opaque = ClosureConverter::from_fn(f_fake_region); - this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque)); - } - - Local::put(this); - - unsafe { - let sched = Local::unsafe_borrow::<Scheduler>(); - let (sched_context, last_task_context, _) = (*sched).get_contexts(); - let last_task_context = last_task_context.unwrap(); - Context::swap(last_task_context, sched_context); - - // We could be executing in a different thread now - let sched = Local::unsafe_borrow::<Scheduler>(); - (*sched).run_cleanup_job(); - - // As above, must happen after running the cleanup job. - (*sched).current_task.map(|t| t.death.check_killed()); - } + let stask = this.sched_task.take_unwrap(); + // Otherwise this is the same as below. + this.switch_running_tasks_and_then(stask, f); } - /// Switch directly to another task, without going through the scheduler. - /// You would want to think hard about doing this, e.g. if there are - /// pending I/O events it would be a bad idea. pub fn switch_running_tasks_and_then(~self, next_task: ~Task, f: &fn(&mut Scheduler, BlockedTask)) { - let mut this = self; - assert!(this.in_task_context()); - - rtdebug!("switching tasks"); - this.metrics.context_switches_task_to_task += 1; + // This is where we convert the BlockedTask-taking closure into one + // that takes just a Task, and is aware of the block-or-killed protocol. + do self.change_task_context(next_task) |sched, task| { + // Task might need to receive a kill signal instead of blocking. + // We can call the "and_then" only if it blocks successfully. + match BlockedTask::try_block(task) { + Left(killed_task) => sched.enqueue_task(killed_task), + Right(blocked_task) => f(sched, blocked_task), + } + } + } - let old_running_task = this.current_task.take_unwrap(); - let f_fake_region = unsafe { - transmute::<&fn(&mut Scheduler, BlockedTask), - &fn(&mut Scheduler, BlockedTask)>(f) + // A helper that looks up the scheduler and runs a task later by + // enqueuing it. + pub fn run_task_later(next_task: ~Task) { + // We aren't performing a scheduler operation, so we want to + // put the Scheduler back when we finish. + let next_task = Cell::new(next_task); + do Local::borrow::<Scheduler,()> |sched| { + sched.enqueue_task(next_task.take()); }; - let f_opaque = ClosureConverter::from_fn(f_fake_region); - this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque)); - this.current_task = Some(next_task); + } - Local::put(this); + // A helper that looks up the scheduler and runs a task. If it can + // be run now it is run now. + pub fn run_task(new_task: ~Task) { + let sched = Local::take::<Scheduler>(); + sched.schedule_task(new_task).map_consume(Local::put); + } + // Returns a mutable reference to both contexts involved in this + // swap. This is unsafe - we are getting mutable internal + // references to keep even when we don't own the tasks. It looks + // kinda safe because we are doing transmutes before passing in + // the arguments. + pub fn get_contexts<'a>(current_task: &mut Task, next_task: &mut Task) -> + (&'a mut Context, &'a mut Context) { + let current_task_context = + &mut current_task.coroutine.get_mut_ref().saved_context; + let next_task_context = + &mut next_task.coroutine.get_mut_ref().saved_context; unsafe { - let sched = Local::unsafe_borrow::<Scheduler>(); - let (_, last_task_context, next_task_context) = (*sched).get_contexts(); - let last_task_context = last_task_context.unwrap(); - let next_task_context = next_task_context.unwrap(); - Context::swap(last_task_context, next_task_context); - - // We could be executing in a different thread now - let sched = Local::unsafe_borrow::<Scheduler>(); - (*sched).run_cleanup_job(); - - // As above, must happen after running the cleanup job. - (*sched).current_task.map(|t| t.death.check_killed()); + (transmute_mut_region(current_task_context), + transmute_mut_region(next_task_context)) } } - - // * Other stuff pub fn enqueue_cleanup_job(&mut self, job: CleanupJob) { - assert!(self.cleanup_job.is_none()); self.cleanup_job = Some(job); } pub fn run_cleanup_job(&mut self) { rtdebug!("running cleanup job"); - - assert!(self.cleanup_job.is_some()); - let cleanup_job = self.cleanup_job.take_unwrap(); match cleanup_job { DoNothing => { } - GiveTask(task, f) => { - let f = f.to_fn(); - // Task might need to receive a kill signal instead of blocking. - // We can call the "and_then" only if it blocks successfully. - match BlockedTask::try_block(task) { - Left(killed_task) => self.enqueue_task(killed_task), - Right(blocked_task) => f(self, blocked_task), - } - } + GiveTask(task, f) => f.to_fn()(self, task) } } - /// Get mutable references to all the contexts that may be involved in a - /// context switch. - /// - /// Returns (the scheduler context, the optional context of the - /// task in the cleanup list, the optional context of the task in - /// the current task slot). When context switching to a task, - /// callers should first arrange for that task to be located in the - /// Scheduler's current_task slot and set up the - /// post-context-switch cleanup job. - pub fn get_contexts<'a>(&'a mut self) -> (&'a mut Context, - Option<&'a mut Context>, - Option<&'a mut Context>) { - let last_task = match self.cleanup_job { - Some(GiveTask(~ref task, _)) => { - Some(task) - } - Some(DoNothing) => { - None - } - None => fail!("all context switches should have a cleanup job") - }; - // XXX: Pattern matching mutable pointers above doesn't work - // because borrowck thinks the three patterns are conflicting - // borrows - unsafe { - let last_task = transmute::<Option<&Task>, Option<&mut Task>>(last_task); - let last_task_context = match last_task { - Some(t) => { - Some(&mut t.coroutine.get_mut_ref().saved_context) - } - None => { - None - } - }; - let next_task_context = match self.current_task { - Some(ref mut t) => { - Some(&mut t.coroutine.get_mut_ref().saved_context) - } - None => { - None - } - }; - // XXX: These transmutes can be removed after snapshot - return (transmute(&mut self.saved_context), - last_task_context, - transmute(next_task_context)); - } - } } // The cases for the below function. @@ -700,29 +676,73 @@ impl SchedHandle { // complaining type UnsafeTaskReceiver = raw::Closure; trait ClosureConverter { - fn from_fn(&fn(&mut Scheduler, BlockedTask)) -> Self; - fn to_fn(self) -> &fn(&mut Scheduler, BlockedTask); + fn from_fn(&fn(&mut Scheduler, ~Task)) -> Self; + fn to_fn(self) -> &fn(&mut Scheduler, ~Task); } impl ClosureConverter for UnsafeTaskReceiver { - fn from_fn(f: &fn(&mut Scheduler, BlockedTask)) -> UnsafeTaskReceiver { + fn from_fn(f: &fn(&mut Scheduler, ~Task)) -> UnsafeTaskReceiver { unsafe { transmute(f) } } - fn to_fn(self) -> &fn(&mut Scheduler, BlockedTask) { unsafe { transmute(self) } } + fn to_fn(self) -> &fn(&mut Scheduler, ~Task) { unsafe { transmute(self) } } } - #[cfg(test)] mod test { + use rt::test::*; + use unstable::run_in_bare_thread; + use borrow::to_uint; + use rt::local::*; + use rt::sched::{Scheduler}; + use uint; use int; use cell::Cell; - use unstable::run_in_bare_thread; - use task::spawn; - use rt::local::Local; - use rt::test::*; - use super::*; use rt::thread::Thread; - use borrow::to_uint; - use rt::task::{Task,Sched}; + use rt::task::{Task, Sched}; + use option::{Some}; + + #[test] + fn trivial_run_in_newsched_task_test() { + let mut task_ran = false; + let task_ran_ptr: *mut bool = &mut task_ran; + do run_in_newsched_task || { + unsafe { *task_ran_ptr = true }; + rtdebug!("executed from the new scheduler") + } + assert!(task_ran); + } + + #[test] + fn multiple_task_test() { + let total = 10; + let mut task_run_count = 0; + let task_run_count_ptr: *mut uint = &mut task_run_count; + do run_in_newsched_task || { + for uint::range(0,total) |_| { + do spawntask || { + unsafe { *task_run_count_ptr = *task_run_count_ptr + 1}; + } + } + } + assert!(task_run_count == total); + } + + #[test] + fn multiple_task_nested_test() { + let mut task_run_count = 0; + let task_run_count_ptr: *mut uint = &mut task_run_count; + do run_in_newsched_task || { + do spawntask || { + unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 }; + do spawntask || { + unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 }; + do spawntask || { + unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 }; + } + } + } + } + assert!(task_run_count == 3); + } // Confirm that a sched_id actually is the uint form of the // pointer to the scheduler struct. @@ -745,46 +765,50 @@ mod test { } } - // A simple test to check if a homed task run on a single - // scheduler ends up executing while home. + + // A very simple test that confirms that a task executing on the + // home scheduler notices that it is home. #[test] fn test_home_sched() { do run_in_bare_thread { let mut task_ran = false; let task_ran_ptr: *mut bool = &mut task_ran; - let mut sched = ~new_test_uv_sched(); + let mut sched = ~new_test_uv_sched(); let sched_handle = sched.make_handle(); - let sched_id = sched.sched_id(); - let task = ~do Task::new_root_homed(&mut sched.stack_pool, - Sched(sched_handle)) { + let mut task = ~do Task::new_root_homed(&mut sched.stack_pool, + Sched(sched_handle)) { unsafe { *task_ran_ptr = true }; - let sched = Local::take::<Scheduler>(); - assert!(sched.sched_id() == sched_id); - Local::put::<Scheduler>(sched); + assert!(Task::on_appropriate_sched()); }; - sched.enqueue_task(task); - sched.run(); - assert!(task_ran); + + let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status); + task.death.on_exit = Some(on_exit); + + sched.bootstrap(task); } } - // A test for each state of schedule_task + // An advanced test that checks all four possible states that a + // (task,sched) can be in regarding homes. + #[test] fn test_schedule_home_states() { use rt::uv::uvio::UvEventLoop; - use rt::sched::Shutdown; use rt::sleeper_list::SleeperList; use rt::work_queue::WorkQueue; + use rt::sched::Shutdown; + use borrow; + use rt::comm::*; do run_in_bare_thread { let sleepers = SleeperList::new(); let work_queue = WorkQueue::new(); - // our normal scheduler + // Our normal scheduler let mut normal_sched = ~Scheduler::new( ~UvEventLoop::new(), work_queue.clone(), @@ -792,113 +816,93 @@ mod test { let normal_handle = Cell::new(normal_sched.make_handle()); - // our special scheduler + // Our special scheduler let mut special_sched = ~Scheduler::new_special( ~UvEventLoop::new(), work_queue.clone(), sleepers.clone(), - true); + false); let special_handle = Cell::new(special_sched.make_handle()); - let special_handle2 = Cell::new(special_sched.make_handle()); - let special_id = special_sched.sched_id(); + let t1_handle = special_sched.make_handle(); let t4_handle = special_sched.make_handle(); - let t1f = ~do Task::new_root_homed(&mut special_sched.stack_pool, - Sched(t1_handle)) || { - let is_home = Task::is_home_using_id(special_id); - rtdebug!("t1 should be home: %b", is_home); - assert!(is_home); - }; - let t1f = Cell::new(t1f); + // Four test tasks: + // 1) task is home on special + // 2) task not homed, sched doesn't care + // 3) task not homed, sched requeues + // 4) task not home, send home - let t2f = ~do Task::new_root(&mut normal_sched.stack_pool) { - let on_special = Task::on_special(); - rtdebug!("t2 should not be on special: %b", on_special); - assert!(!on_special); + let task1 = ~do Task::new_root_homed(&mut special_sched.stack_pool, + Sched(t1_handle)) || { + rtassert!(Task::on_appropriate_sched()); }; - let t2f = Cell::new(t2f); + rtdebug!("task1 id: **%u**", borrow::to_uint(task1)); - let t3f = ~do Task::new_root(&mut normal_sched.stack_pool) { - // not on special - let on_special = Task::on_special(); - rtdebug!("t3 should not be on special: %b", on_special); - assert!(!on_special); - }; - let t3f = Cell::new(t3f); - - let t4f = ~do Task::new_root_homed(&mut special_sched.stack_pool, - Sched(t4_handle)) { - // is home - let home = Task::is_home_using_id(special_id); - rtdebug!("t4 should be home: %b", home); - assert!(home); + let task2 = ~do Task::new_root(&mut normal_sched.stack_pool) { + rtassert!(Task::on_appropriate_sched()); }; - let t4f = Cell::new(t4f); - // we have four tests, make them as closures - let t1: ~fn() = || { - // task is home on special - let task = t1f.take(); - let sched = Local::take::<Scheduler>(); - sched.schedule_task(task); + let task3 = ~do Task::new_root(&mut normal_sched.stack_pool) { + rtassert!(Task::on_appropriate_sched()); }; - let t2: ~fn() = || { - // not homed, task doesn't care - let task = t2f.take(); - let sched = Local::take::<Scheduler>(); - sched.schedule_task(task); - }; - let t3: ~fn() = || { - // task not homed, must leave - let task = t3f.take(); - let sched = Local::take::<Scheduler>(); - sched.schedule_task(task); - }; - let t4: ~fn() = || { - // task not home, send home - let task = t4f.take(); - let sched = Local::take::<Scheduler>(); - sched.schedule_task(task); + + let task4 = ~do Task::new_root_homed(&mut special_sched.stack_pool, + Sched(t4_handle)) { + rtassert!(Task::on_appropriate_sched()); }; + rtdebug!("task4 id: **%u**", borrow::to_uint(task4)); + + let task1 = Cell::new(task1); + let task2 = Cell::new(task2); + let task3 = Cell::new(task3); + let task4 = Cell::new(task4); - let t1 = Cell::new(t1); - let t2 = Cell::new(t2); - let t3 = Cell::new(t3); - let t4 = Cell::new(t4); - - // build a main task that runs our four tests - let main_task = ~do Task::new_root(&mut normal_sched.stack_pool) { - // the two tasks that require a normal start location - t2.take()(); - t4.take()(); - normal_handle.take().send(Shutdown); - special_handle.take().send(Shutdown); + // Signal from the special task that we are done. + let (port, chan) = oneshot::<()>(); + let port = Cell::new(port); + let chan = Cell::new(chan); + + let normal_task = ~do Task::new_root(&mut normal_sched.stack_pool) { + rtdebug!("*about to submit task2*"); + Scheduler::run_task(task2.take()); + rtdebug!("*about to submit task4*"); + Scheduler::run_task(task4.take()); + rtdebug!("*normal_task done*"); + port.take().recv(); + let mut nh = normal_handle.take(); + nh.send(Shutdown); + let mut sh = special_handle.take(); + sh.send(Shutdown); }; - // task to run the two "special start" tests - let special_task = ~do Task::new_root_homed( - &mut special_sched.stack_pool, - Sched(special_handle2.take())) { - t1.take()(); - t3.take()(); + rtdebug!("normal task: %u", borrow::to_uint(normal_task)); + + let special_task = ~do Task::new_root(&mut special_sched.stack_pool) { + rtdebug!("*about to submit task1*"); + Scheduler::run_task(task1.take()); + rtdebug!("*about to submit task3*"); + Scheduler::run_task(task3.take()); + rtdebug!("*done with special_task*"); + chan.take().send(()); }; - // enqueue the main tasks - normal_sched.enqueue_task(special_task); - normal_sched.enqueue_task(main_task); + rtdebug!("special task: %u", borrow::to_uint(special_task)); + + let special_sched = Cell::new(special_sched); + let normal_sched = Cell::new(normal_sched); + let special_task = Cell::new(special_task); + let normal_task = Cell::new(normal_task); - let nsched_cell = Cell::new(normal_sched); let normal_thread = do Thread::start { - let sched = nsched_cell.take(); - sched.run(); + normal_sched.take().bootstrap(normal_task.take()); + rtdebug!("finished with normal_thread"); }; - let ssched_cell = Cell::new(special_sched); let special_thread = do Thread::start { - let sched = ssched_cell.take(); - sched.run(); + special_sched.take().bootstrap(special_task.take()); + rtdebug!("finished with special_sched"); }; normal_thread.join(); @@ -906,7 +910,6 @@ mod test { } } - // Do it a lot #[test] fn test_stress_schedule_task_states() { let n = stress_factor() * 120; @@ -916,123 +919,13 @@ mod test { } #[test] - fn test_simple_scheduling() { - do run_in_bare_thread { - let mut task_ran = false; - let task_ran_ptr: *mut bool = &mut task_ran; - - let mut sched = ~new_test_uv_sched(); - let task = ~do Task::new_root(&mut sched.stack_pool) { - unsafe { *task_ran_ptr = true; } - }; - sched.enqueue_task(task); - sched.run(); - assert!(task_ran); - } - } - - #[test] - fn test_several_tasks() { - do run_in_bare_thread { - let total = 10; - let mut task_count = 0; - let task_count_ptr: *mut int = &mut task_count; - - let mut sched = ~new_test_uv_sched(); - for int::range(0, total) |_| { - let task = ~do Task::new_root(&mut sched.stack_pool) { - unsafe { *task_count_ptr = *task_count_ptr + 1; } - }; - sched.enqueue_task(task); - } - sched.run(); - assert_eq!(task_count, total); - } - } - - #[test] - fn test_swap_tasks_then() { - do run_in_bare_thread { - let mut count = 0; - let count_ptr: *mut int = &mut count; - - let mut sched = ~new_test_uv_sched(); - let task1 = ~do Task::new_root(&mut sched.stack_pool) { - unsafe { *count_ptr = *count_ptr + 1; } - let mut sched = Local::take::<Scheduler>(); - let task2 = ~do Task::new_root(&mut sched.stack_pool) { - unsafe { *count_ptr = *count_ptr + 1; } - }; - // Context switch directly to the new task - do sched.switch_running_tasks_and_then(task2) |sched, task1| { - sched.enqueue_blocked_task(task1); - } - unsafe { *count_ptr = *count_ptr + 1; } - }; - sched.enqueue_task(task1); - sched.run(); - assert_eq!(count, 3); - } - } - - #[bench] #[test] #[ignore(reason = "long test")] - fn test_run_a_lot_of_tasks_queued() { - do run_in_bare_thread { - static MAX: int = 1000000; - let mut count = 0; - let count_ptr: *mut int = &mut count; - - let mut sched = ~new_test_uv_sched(); - - let start_task = ~do Task::new_root(&mut sched.stack_pool) { - run_task(count_ptr); - }; - sched.enqueue_task(start_task); - sched.run(); - - assert_eq!(count, MAX); - - fn run_task(count_ptr: *mut int) { - do Local::borrow::<Scheduler, ()> |sched| { - let task = ~do Task::new_root(&mut sched.stack_pool) { - unsafe { - *count_ptr = *count_ptr + 1; - if *count_ptr != MAX { - run_task(count_ptr); - } - } - }; - sched.enqueue_task(task); - } - }; - } - } - - #[test] - fn test_block_task() { - do run_in_bare_thread { - let mut sched = ~new_test_uv_sched(); - let task = ~do Task::new_root(&mut sched.stack_pool) { - let sched = Local::take::<Scheduler>(); - assert!(sched.in_task_context()); - do sched.deschedule_running_task_and_then() |sched, task| { - assert!(!sched.in_task_context()); - sched.enqueue_blocked_task(task); - } - }; - sched.enqueue_task(task); - sched.run(); - } - } - - #[test] fn test_io_callback() { // This is a regression test that when there are no schedulable tasks // in the work queue, but we are performing I/O, that once we do put // something in the work queue again the scheduler picks it up and doesn't // exit before emptying the work queue do run_in_newsched_task { - do spawn { + do spawntask { let sched = Local::take::<Scheduler>(); do sched.deschedule_running_task_and_then |sched, task| { let task = Cell::new(task); @@ -1053,34 +946,21 @@ mod test { do run_in_bare_thread { let (port, chan) = oneshot::<()>(); - let port_cell = Cell::new(port); - let chan_cell = Cell::new(chan); - let mut sched1 = ~new_test_uv_sched(); - let handle1 = sched1.make_handle(); - let handle1_cell = Cell::new(handle1); - let task1 = ~do Task::new_root(&mut sched1.stack_pool) { - chan_cell.take().send(()); - }; - sched1.enqueue_task(task1); - - let mut sched2 = ~new_test_uv_sched(); - let task2 = ~do Task::new_root(&mut sched2.stack_pool) { - port_cell.take().recv(); - // Release the other scheduler's handle so it can exit - handle1_cell.take(); - }; - sched2.enqueue_task(task2); + let port = Cell::new(port); + let chan = Cell::new(chan); - let sched1_cell = Cell::new(sched1); - let thread1 = do Thread::start { - let sched1 = sched1_cell.take(); - sched1.run(); + let _thread_one = do Thread::start { + let chan = Cell::new(chan.take()); + do run_in_newsched_task_core { + chan.take().send(()); + } }; - let sched2_cell = Cell::new(sched2); - let thread2 = do Thread::start { - let sched2 = sched2_cell.take(); - sched2.run(); + let _thread_two = do Thread::start { + let port = Cell::new(port.take()); + do run_in_newsched_task_core { + port.take().recv(); + } }; thread1.join(); @@ -1112,21 +992,21 @@ mod test { } } - #[test] + #[test] fn thread_ring() { use rt::comm::*; use comm::{GenericPort, GenericChan}; do run_in_mt_newsched_task { - let (end_port, end_chan) = oneshot(); + let (end_port, end_chan) = oneshot(); let n_tasks = 10; let token = 2000; - let (p, ch1) = stream(); + let (p, ch1) = stream(); let mut p = p; - ch1.send((token, end_chan)); - let mut i = 2; + ch1.send((token, end_chan)); + let mut i = 2; while i <= n_tasks { let (next_p, ch) = stream(); let imm_i = i; @@ -1151,9 +1031,9 @@ mod test { while (true) { match p.recv() { (1, end_chan) => { - debug!("%d\n", id); - end_chan.send(()); - return; + debug!("%d\n", id); + end_chan.send(()); + return; } (token, end_chan) => { debug!("thread: %d got token: %d", id, token); @@ -1178,15 +1058,16 @@ mod test { impl Drop for S { fn drop(&self) { - let _foo = @0; + let _foo = @0; } } let s = S { field: () }; do spawntask { - let _ss = &s; + let _ss = &s; } } } + } diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index c1b799796d1..bc603bede97 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -30,21 +30,34 @@ use rt::context::Context; use task::spawn::Taskgroup; use cell::Cell; +// The Task struct represents all state associated with a rust +// task. There are at this point two primary "subtypes" of task, +// however instead of using a subtype we just have a "task_type" field +// in the struct. This contains a pointer to another struct that holds +// the type-specific state. + pub struct Task { heap: LocalHeap, gc: GarbageCollector, storage: LocalStorage, logger: StdErrLogger, unwinder: Unwinder, - home: Option<SchedHome>, taskgroup: Option<Taskgroup>, death: Death, destroyed: bool, coroutine: Option<~Coroutine>, // FIXME(#6874/#7599) use StringRef to save on allocations name: Option<~str>, + sched: Option<~Scheduler>, + task_type: TaskType +} + +pub enum TaskType { + GreenTask(Option<~SchedHome>), + SchedTask } +/// A coroutine is nothing more than a (register context, stack) pair. pub struct Coroutine { /// The segment of stack on which the task is currently running or /// if the task is blocked, on which the task will resume @@ -54,6 +67,7 @@ pub struct Coroutine { saved_context: Context } +/// Some tasks have a deciated home scheduler that they must run on. pub enum SchedHome { AnySched, Sched(SchedHandle) @@ -68,6 +82,58 @@ pub struct Unwinder { impl Task { + // A helper to build a new task using the dynamically found + // scheduler and task. Only works in GreenTask context. + pub fn build_homed_child(f: ~fn(), home: SchedHome) -> ~Task { + let f = Cell::new(f); + let home = Cell::new(home); + do Local::borrow::<Task, ~Task> |running_task| { + let mut sched = running_task.sched.take_unwrap(); + let new_task = ~running_task.new_child_homed(&mut sched.stack_pool, + home.take(), + f.take()); + running_task.sched = Some(sched); + new_task + } + } + + pub fn build_child(f: ~fn()) -> ~Task { + Task::build_homed_child(f, AnySched) + } + + pub fn build_homed_root(f: ~fn(), home: SchedHome) -> ~Task { + let f = Cell::new(f); + let home = Cell::new(home); + do Local::borrow::<Task, ~Task> |running_task| { + let mut sched = running_task.sched.take_unwrap(); + let new_task = ~Task::new_root_homed(&mut sched.stack_pool, + home.take(), + f.take()); + running_task.sched = Some(sched); + new_task + } + } + + pub fn build_root(f: ~fn()) -> ~Task { + Task::build_homed_root(f, AnySched) + } + + pub fn new_sched_task() -> Task { + Task { + heap: LocalHeap::new(), + gc: GarbageCollector, + storage: LocalStorage(ptr::null(), None), + logger: StdErrLogger, + unwinder: Unwinder { unwinding: false }, + taskgroup: None, + death: Death::new(), + destroyed: false, + coroutine: Some(~Coroutine::empty()), + sched: None, + task_type: SchedTask + } + } + pub fn new_root(stack_pool: &mut StackPool, start: ~fn()) -> Task { Task::new_root_homed(stack_pool, AnySched, start) @@ -88,12 +154,13 @@ impl Task { storage: LocalStorage(ptr::null(), None), logger: StdErrLogger, unwinder: Unwinder { unwinding: false }, - home: Some(home), taskgroup: None, death: Death::new(), destroyed: false, coroutine: Some(~Coroutine::new(stack_pool, start)), name: None, + sched: None, + task_type: GreenTask(Some(~home)) } } @@ -106,7 +173,6 @@ impl Task { gc: GarbageCollector, storage: LocalStorage(ptr::null(), None), logger: StdErrLogger, - home: Some(home), unwinder: Unwinder { unwinding: false }, taskgroup: None, // FIXME(#7544) make watching optional @@ -114,19 +180,35 @@ impl Task { destroyed: false, coroutine: Some(~Coroutine::new(stack_pool, start)), name: None, + sched: None, + task_type: GreenTask(Some(~home)) } } pub fn give_home(&mut self, new_home: SchedHome) { - self.home = Some(new_home); + match self.task_type { + GreenTask(ref mut home) => { + *home = Some(~new_home); + } + SchedTask => { + rtabort!("type error: used SchedTask as GreenTask"); + } + } } - pub fn run(&mut self, f: &fn()) { - // This is just an assertion that `run` was called unsafely - // and this instance of Task is still accessible. - do Local::borrow::<Task, ()> |task| { - assert!(borrow::ref_eq(task, self)); + pub fn take_unwrap_home(&mut self) -> SchedHome { + match self.task_type { + GreenTask(ref mut home) => { + let out = home.take_unwrap(); + return *out; + } + SchedTask => { + rtabort!("type error: used SchedTask as GreenTask"); + } } + } + + pub fn run(&mut self, f: &fn()) { self.unwinder.try(f); { let _ = self.taskgroup.take(); } @@ -141,6 +223,8 @@ impl Task { /// thread-local-storage. fn destroy(&mut self) { + rtdebug!("DESTROYING TASK: %u", borrow::to_uint(self)); + do Local::borrow::<Task, ()> |task| { assert!(borrow::ref_eq(task, self)); } @@ -158,63 +242,68 @@ impl Task { self.destroyed = true; } - /// Check if *task* is currently home. - pub fn is_home(&self) -> bool { - do Local::borrow::<Scheduler,bool> |sched| { - match self.home { - Some(AnySched) => { false } - Some(Sched(SchedHandle { sched_id: ref id, _ })) => { - *id == sched.sched_id() - } - None => { rtabort!("task home of None") } - } - } - } + // New utility functions for homes. pub fn is_home_no_tls(&self, sched: &~Scheduler) -> bool { - match self.home { - Some(AnySched) => { false } - Some(Sched(SchedHandle { sched_id: ref id, _ })) => { + match self.task_type { + GreenTask(Some(~AnySched)) => { false } + GreenTask(Some(~Sched(SchedHandle { sched_id: ref id, _}))) => { *id == sched.sched_id() } - None => {rtabort!("task home of None") } - } - } - - pub fn is_home_using_id(sched_id: uint) -> bool { - do Local::borrow::<Task,bool> |task| { - match task.home { - Some(Sched(SchedHandle { sched_id: ref id, _ })) => { - *id == sched_id - } - Some(AnySched) => { false } - None => { rtabort!("task home of None") } + GreenTask(None) => { + rtabort!("task without home"); + } + SchedTask => { + // Awe yea + rtabort!("type error: expected: GreenTask, found: SchedTask"); } } } - /// Check if this *task* has a home. pub fn homed(&self) -> bool { - match self.home { - Some(AnySched) => { false } - Some(Sched(_)) => { true } - None => { - rtabort!("task home of None") + match self.task_type { + GreenTask(Some(~AnySched)) => { false } + GreenTask(Some(~Sched(SchedHandle { _ }))) => { true } + GreenTask(None) => { + rtabort!("task without home"); + } + SchedTask => { + rtabort!("type error: expected: GreenTask, found: SchedTask"); } } } - /// On a special scheduler? - pub fn on_special() -> bool { - do Local::borrow::<Scheduler,bool> |sched| { - !sched.run_anything + // Grab both the scheduler and the task from TLS and check if the + // task is executing on an appropriate scheduler. + pub fn on_appropriate_sched() -> bool { + do Local::borrow::<Task,bool> |task| { + let sched_id = task.sched.get_ref().sched_id(); + let sched_run_anything = task.sched.get_ref().run_anything; + match task.task_type { + GreenTask(Some(~AnySched)) => { + rtdebug!("anysched task in sched check ****"); + sched_run_anything + } + GreenTask(Some(~Sched(SchedHandle { sched_id: ref id, _ }))) => { + rtdebug!("homed task in sched check ****"); + *id == sched_id + } + GreenTask(None) => { + rtabort!("task without home"); + } + SchedTask => { + rtabort!("type error: expected: GreenTask, found: SchedTask"); + } + } } } - } impl Drop for Task { - fn drop(&self) { assert!(self.destroyed) } + fn drop(&self) { + rtdebug!("called drop for a task"); + assert!(self.destroyed) + } } // Coroutines represent nothing more than a context and a stack @@ -234,19 +323,33 @@ impl Coroutine { } } + pub fn empty() -> Coroutine { + Coroutine { + current_stack_segment: StackSegment::new(0), + saved_context: Context::empty() + } + } + fn build_start_wrapper(start: ~fn()) -> ~fn() { let start_cell = Cell::new(start); let wrapper: ~fn() = || { // First code after swap to this new context. Run our // cleanup job. unsafe { - let sched = Local::unsafe_borrow::<Scheduler>(); - (*sched).run_cleanup_job(); - let sched = Local::unsafe_borrow::<Scheduler>(); - let task = (*sched).current_task.get_mut_ref(); + // Again - might work while safe, or it might not. + do Local::borrow::<Scheduler,()> |sched| { + (sched).run_cleanup_job(); + } + + // To call the run method on a task we need a direct + // reference to it. The task is in TLS, so we can + // simply unsafe_borrow it to get this reference. We + // need to still have the task in TLS though, so we + // need to unsafe_borrow. + let task = Local::unsafe_borrow::<Task>(); - do task.run { + do (*task).run { // N.B. Removing `start` from the start wrapper // closure by emptying a cell is critical for // correctness. The ~Task pointer, and in turn the @@ -262,8 +365,11 @@ impl Coroutine { }; } + // We remove the sched from the Task in TLS right now. let sched = Local::take::<Scheduler>(); - sched.terminate_current_task(); + // ... allowing us to give it away when performing a + // scheduling operation. + sched.terminate_current_task() }; return wrapper; } @@ -465,3 +571,4 @@ mod test { } } } + diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index ec1094ed4f2..d0970ec5866 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -18,14 +18,12 @@ use iterator::Iterator; use vec::{OwnedVector, MutableVector}; use super::io::net::ip::{IpAddr, Ipv4, Ipv6}; use rt::sched::Scheduler; -use rt::local::Local; use unstable::run_in_bare_thread; use rt::thread::Thread; use rt::task::Task; use rt::uv::uvio::UvEventLoop; use rt::work_queue::WorkQueue; use rt::sleeper_list::SleeperList; -use rt::task::{Sched}; use rt::comm::oneshot; use result::{Result, Ok, Err}; @@ -34,29 +32,37 @@ pub fn new_test_uv_sched() -> Scheduler { let mut sched = Scheduler::new(~UvEventLoop::new(), WorkQueue::new(), SleeperList::new()); + // Don't wait for the Shutdown message sched.no_sleep = true; return sched; + } -/// Creates a new scheduler in a new thread and runs a task in it, -/// then waits for the scheduler to exit. Failure of the task -/// will abort the process. pub fn run_in_newsched_task(f: ~fn()) { let f = Cell::new(f); - do run_in_bare_thread { - let mut sched = ~new_test_uv_sched(); - let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status); - let mut task = ~Task::new_root(&mut sched.stack_pool, - f.take()); - rtdebug!("newsched_task: %x", ::borrow::to_uint(task)); - task.death.on_exit = Some(on_exit); - sched.enqueue_task(task); - sched.run(); + run_in_newsched_task_core(f.take()); } } +pub fn run_in_newsched_task_core(f: ~fn()) { + + use rt::sched::Shutdown; + + let mut sched = ~new_test_uv_sched(); + let exit_handle = Cell::new(sched.make_handle()); + + let on_exit: ~fn(bool) = |exit_status| { + exit_handle.take().send(Shutdown); + rtassert!(exit_status); + }; + let mut task = ~Task::new_root(&mut sched.stack_pool, f); + task.death.on_exit = Some(on_exit); + + sched.bootstrap(task); +} + /// Create more than one scheduler and run a function in a task /// in one of the schedulers. The schedulers will stay alive /// until the function `f` returns. @@ -65,7 +71,7 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { use from_str::FromStr; use rt::sched::Shutdown; - let f_cell = Cell::new(f); + let f = Cell::new(f); do run_in_bare_thread { let nthreads = match os::getenv("RUST_RT_TEST_THREADS") { @@ -95,7 +101,6 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { scheds.push(sched); } - let f_cell = Cell::new(f_cell.take()); let handles = Cell::new(handles); let on_exit: ~fn(bool) = |exit_status| { let mut handles = handles.take(); @@ -107,18 +112,30 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { rtassert!(exit_status); }; let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool, - f_cell.take()); + f.take()); main_task.death.on_exit = Some(on_exit); - scheds[0].enqueue_task(main_task); let mut threads = ~[]; + let main_task = Cell::new(main_task); - while !scheds.is_empty() { + let main_thread = { let sched = scheds.pop(); let sched_cell = Cell::new(sched); + do Thread::start { + let sched = sched_cell.take(); + sched.bootstrap(main_task.take()); + } + }; + threads.push(main_thread); + + while !scheds.is_empty() { + let mut sched = scheds.pop(); + let bootstrap_task = ~do Task::new_root(&mut sched.stack_pool) || {}; + let bootstrap_task_cell = Cell::new(bootstrap_task); + let sched_cell = Cell::new(sched); let thread = do Thread::start { let sched = sched_cell.take(); - sched.run(); + sched.bootstrap(bootstrap_task_cell.take()); }; threads.push(thread); @@ -134,187 +151,52 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { /// Test tasks will abort on failure instead of unwinding pub fn spawntask(f: ~fn()) { - use super::sched::*; - let f = Cell::new(f); - - let task = unsafe { - let sched = Local::unsafe_borrow::<Scheduler>(); - rtdebug!("spawntask taking the scheduler from TLS"); - - - do Local::borrow::<Task, ~Task>() |running_task| { - ~running_task.new_child(&mut (*sched).stack_pool, f.take()) - } - }; - - rtdebug!("new task pointer: %x", ::borrow::to_uint(task)); - - let sched = Local::take::<Scheduler>(); - rtdebug!("spawntask scheduling the new task"); - sched.schedule_task(task); -} - - -/// Create a new task and run it right now. Aborts on failure -pub fn spawntask_immediately(f: ~fn()) { - use super::sched::*; - - let f = Cell::new(f); - - let task = unsafe { - let sched = Local::unsafe_borrow::<Scheduler>(); - do Local::borrow::<Task, ~Task>() |running_task| { - ~running_task.new_child(&mut (*sched).stack_pool, - f.take()) - } - }; - - let sched = Local::take::<Scheduler>(); - do sched.switch_running_tasks_and_then(task) |sched, task| { - sched.enqueue_blocked_task(task); - } + Scheduler::run_task(Task::build_child(f)); } /// Create a new task and run it right now. Aborts on failure pub fn spawntask_later(f: ~fn()) { - use super::sched::*; - let f = Cell::new(f); - - let task = unsafe { - let sched = Local::unsafe_borrow::<Scheduler>(); - do Local::borrow::<Task, ~Task>() |running_task| { - ~running_task.new_child(&mut (*sched).stack_pool, f.take()) - } - }; - - let mut sched = Local::take::<Scheduler>(); - sched.enqueue_task(task); - Local::put(sched); + Scheduler::run_task_later(Task::build_child(f)); } -/// Spawn a task and either run it immediately or run it later pub fn spawntask_random(f: ~fn()) { - use super::sched::*; use rand::{Rand, rng}; - let f = Cell::new(f); - - let task = unsafe { - let sched = Local::unsafe_borrow::<Scheduler>(); - do Local::borrow::<Task, ~Task>() |running_task| { - ~running_task.new_child(&mut (*sched).stack_pool, - f.take()) - - } - }; - - let mut sched = Local::take::<Scheduler>(); - let mut rng = rng(); let run_now: bool = Rand::rand(&mut rng); if run_now { - do sched.switch_running_tasks_and_then(task) |sched, task| { - sched.enqueue_blocked_task(task); - } + spawntask(f) } else { - sched.enqueue_task(task); - Local::put(sched); + spawntask_later(f) } } -/// Spawn a task, with the current scheduler as home, and queue it to -/// run later. -pub fn spawntask_homed(scheds: &mut ~[~Scheduler], f: ~fn()) { - use super::sched::*; - use rand::{rng, RngUtil}; - let mut rng = rng(); - - let task = { - let sched = &mut scheds[rng.gen_int_range(0,scheds.len() as int)]; - let handle = sched.make_handle(); - let home_id = handle.sched_id; - - // now that we know where this is going, build a new function - // that can assert it is in the right place - let af: ~fn() = || { - do Local::borrow::<Scheduler,()>() |sched| { - rtdebug!("home_id: %u, runtime loc: %u", - home_id, - sched.sched_id()); - assert!(home_id == sched.sched_id()); - }; - f() - }; - - ~Task::new_root_homed(&mut sched.stack_pool, - Sched(handle), - af) - }; - let dest_sched = &mut scheds[rng.gen_int_range(0,scheds.len() as int)]; - // enqueue it for future execution - dest_sched.enqueue_task(task); -} - -/// Spawn a task and wait for it to finish, returning whether it -/// completed successfully or failed -pub fn spawntask_try(f: ~fn()) -> Result<(), ()> { - use cell::Cell; - use super::sched::*; - - let f = Cell::new(f); +pub fn spawntask_try(f: ~fn()) -> Result<(),()> { let (port, chan) = oneshot(); let chan = Cell::new(chan); let on_exit: ~fn(bool) = |exit_status| chan.take().send(exit_status); - let mut new_task = unsafe { - let sched = Local::unsafe_borrow::<Scheduler>(); - do Local::borrow::<Task, ~Task> |_running_task| { - - // I don't understand why using a child task here fails. I - // think the fail status is propogating back up the task - // tree and triggering a fail for the parent, which we - // aren't correctly expecting. - - // ~running_task.new_child(&mut (*sched).stack_pool, - ~Task::new_root(&mut (*sched).stack_pool, - f.take()) - } - }; - new_task.death.on_exit = Some(on_exit); - let sched = Local::take::<Scheduler>(); - do sched.switch_running_tasks_and_then(new_task) |sched, old_task| { - sched.enqueue_blocked_task(old_task); - } + let mut new_task = Task::build_root(f); + new_task.death.on_exit = Some(on_exit); - rtdebug!("enqueued the new task, now waiting on exit_status"); + Scheduler::run_task(new_task); let exit_status = port.recv(); if exit_status { Ok(()) } else { Err(()) } + } /// Spawn a new task in a new scheduler and return a thread handle. pub fn spawntask_thread(f: ~fn()) -> Thread { - use rt::sched::*; let f = Cell::new(f); - let task = unsafe { - let sched = Local::unsafe_borrow::<Scheduler>(); - do Local::borrow::<Task, ~Task>() |running_task| { - ~running_task.new_child(&mut (*sched).stack_pool, - f.take()) - } - }; - - let task = Cell::new(task); - let thread = do Thread::start { - let mut sched = ~new_test_uv_sched(); - sched.enqueue_task(task.take()); - sched.run(); + run_in_newsched_task_core(f.take()); }; + return thread; } @@ -323,11 +205,14 @@ pub fn with_test_task(blk: ~fn(~Task) -> ~Task) { do run_in_bare_thread { let mut sched = ~new_test_uv_sched(); let task = blk(~Task::new_root(&mut sched.stack_pool, ||{})); - sched.enqueue_task(task); - sched.run(); + cleanup_task(task); } } +/// Use to cleanup tasks created for testing but not "run". +pub fn cleanup_task(mut task: ~Task) { + task.destroyed = true; +} /// Get a port number, starting at 9600, for use in tests pub fn next_test_port() -> u16 { diff --git a/src/libstd/rt/tube.rs b/src/libstd/rt/tube.rs index bc223d8f3f7..ae455a6ad04 100644 --- a/src/libstd/rt/tube.rs +++ b/src/libstd/rt/tube.rs @@ -17,7 +17,6 @@ use option::*; use clone::Clone; use super::rc::RC; use rt::sched::Scheduler; -use rt::{context, TaskContext, SchedulerContext}; use rt::kill::BlockedTask; use rt::local::Local; use vec::OwnedVector; @@ -44,8 +43,6 @@ impl<T> Tube<T> { pub fn send(&mut self, val: T) { rtdebug!("tube send"); - assert!(context() == SchedulerContext); - unsafe { let state = self.p.unsafe_borrow_mut(); (*state).buf.push(val); @@ -61,8 +58,6 @@ impl<T> Tube<T> { } pub fn recv(&mut self) -> T { - assert!(context() == TaskContext); - unsafe { let state = self.p.unsafe_borrow_mut(); if !(*state).buf.is_empty() { diff --git a/src/libstd/rt/uv/mod.rs b/src/libstd/rt/uv/mod.rs index 638d510614a..fa5c497a877 100644 --- a/src/libstd/rt/uv/mod.rs +++ b/src/libstd/rt/uv/mod.rs @@ -51,7 +51,7 @@ use rt::io::net::ip::IpAddr; use rt::io::IoError; -#[cfg(test)] use unstable::run_in_bare_thread; +//#[cfg(test)] use unstable::run_in_bare_thread; pub use self::file::FsRequest; pub use self::net::{StreamWatcher, TcpWatcher, UdpWatcher}; @@ -333,7 +333,7 @@ pub fn vec_from_uv_buf(buf: Buf) -> Option<~[u8]> { return None; } } - +/* #[test] fn test_slice_to_uv_buf() { let slice = [0, .. 20]; @@ -360,3 +360,4 @@ fn loop_smoke_test() { loop_.close(); } } +*/ diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index 53ccd20186d..27970cc52af 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -33,7 +33,7 @@ use unstable::sync::Exclusive; #[cfg(test)] use container::Container; #[cfg(test)] use uint; #[cfg(test)] use unstable::run_in_bare_thread; -#[cfg(test)] use rt::test::{spawntask_immediately, +#[cfg(test)] use rt::test::{spawntask, next_test_ip4, run_in_newsched_task}; @@ -251,13 +251,11 @@ impl IoFactory for UvIoFactory { let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell; let scheduler = Local::take::<Scheduler>(); - assert!(scheduler.in_task_context()); // Block this task and take ownership, switch to scheduler context - do scheduler.deschedule_running_task_and_then |sched, task| { + do scheduler.deschedule_running_task_and_then |_sched, task| { rtdebug!("connect: entered scheduler context"); - assert!(!sched.in_task_context()); let mut tcp_watcher = TcpWatcher::new(self.uv_loop()); let task_cell = Cell::new(task); @@ -458,11 +456,9 @@ impl RtioTcpStream for UvTcpStream { let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell; let scheduler = Local::take::<Scheduler>(); - assert!(scheduler.in_task_context()); let buf_ptr: *&mut [u8] = &buf; - do scheduler.deschedule_running_task_and_then |sched, task| { + do scheduler.deschedule_running_task_and_then |_sched, task| { rtdebug!("read: entered scheduler context"); - assert!(!sched.in_task_context()); let task_cell = Cell::new(task); // XXX: We shouldn't reallocate these callbacks every // call to read @@ -500,7 +496,6 @@ impl RtioTcpStream for UvTcpStream { let result_cell = Cell::new_empty(); let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell; let scheduler = Local::take::<Scheduler>(); - assert!(scheduler.in_task_context()); let buf_ptr: *&[u8] = &buf; do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); @@ -602,11 +597,9 @@ impl RtioUdpSocket for UvUdpSocket { let result_cell_ptr: *Cell<Result<(uint, IpAddr), IoError>> = &result_cell; let scheduler = Local::take::<Scheduler>(); - assert!(scheduler.in_task_context()); let buf_ptr: *&mut [u8] = &buf; - do scheduler.deschedule_running_task_and_then |sched, task| { + do scheduler.deschedule_running_task_and_then |_sched, task| { rtdebug!("recvfrom: entered scheduler context"); - assert!(!sched.in_task_context()); let task_cell = Cell::new(task); let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) }; do self.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| { @@ -637,7 +630,6 @@ impl RtioUdpSocket for UvUdpSocket { let result_cell = Cell::new_empty(); let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell; let scheduler = Local::take::<Scheduler>(); - assert!(scheduler.in_task_context()); let buf_ptr: *&[u8] = &buf; do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); @@ -845,7 +837,7 @@ fn test_simple_tcp_server_and_client() { let addr = next_test_ip4(); // Start the server first so it's listening when we connect - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::<IoFactoryObject>(); let mut listener = (*io).tcp_bind(addr).unwrap(); @@ -860,7 +852,7 @@ fn test_simple_tcp_server_and_client() { } } - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::<IoFactoryObject>(); let mut stream = (*io).tcp_connect(addr).unwrap(); @@ -876,7 +868,7 @@ fn test_simple_udp_server_and_client() { let server_addr = next_test_ip4(); let client_addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::<IoFactoryObject>(); let mut server_socket = (*io).udp_bind(server_addr).unwrap(); @@ -891,7 +883,7 @@ fn test_simple_udp_server_and_client() { } } - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::<IoFactoryObject>(); let mut client_socket = (*io).udp_bind(client_addr).unwrap(); @@ -906,7 +898,7 @@ fn test_read_and_block() { do run_in_newsched_task { let addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() }; let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() }; let mut stream = listener.accept().unwrap(); @@ -939,7 +931,7 @@ fn test_read_and_block() { assert!(reads > 1); } - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::<IoFactoryObject>(); let mut stream = (*io).tcp_connect(addr).unwrap(); @@ -959,7 +951,7 @@ fn test_read_read_read() { let addr = next_test_ip4(); static MAX: uint = 500000; - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::<IoFactoryObject>(); let mut listener = (*io).tcp_bind(addr).unwrap(); @@ -973,7 +965,7 @@ fn test_read_read_read() { } } - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::<IoFactoryObject>(); let mut stream = (*io).tcp_connect(addr).unwrap(); @@ -999,7 +991,7 @@ fn test_udp_twice() { let server_addr = next_test_ip4(); let client_addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::<IoFactoryObject>(); let mut client = (*io).udp_bind(client_addr).unwrap(); @@ -1008,7 +1000,7 @@ fn test_udp_twice() { } } - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::<IoFactoryObject>(); let mut server = (*io).udp_bind(server_addr).unwrap(); @@ -1036,7 +1028,7 @@ fn test_udp_many_read() { let client_in_addr = next_test_ip4(); static MAX: uint = 500_000; - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::<IoFactoryObject>(); let mut server_out = (*io).udp_bind(server_out_addr).unwrap(); @@ -1059,7 +1051,7 @@ fn test_udp_many_read() { } } - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::<IoFactoryObject>(); let mut client_out = (*io).udp_bind(client_out_addr).unwrap(); diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs index 4558f8e32c1..88f214ef4c0 100644 --- a/src/libstd/task/spawn.rs +++ b/src/libstd/task/spawn.rs @@ -653,22 +653,16 @@ fn enlist_many(child: TaskHandle, child_arc: &TaskGroupArc, pub fn spawn_raw(opts: TaskOpts, f: ~fn()) { match context() { - OldTaskContext => { - spawn_raw_oldsched(opts, f) - } - TaskContext => { - spawn_raw_newsched(opts, f) - } - SchedulerContext => { - fail!("can't spawn from scheduler context") - } - GlobalContext => { - fail!("can't spawn from global context") - } + OldTaskContext => spawn_raw_oldsched(opts, f), + TaskContext => spawn_raw_newsched(opts, f), + SchedulerContext => fail!("can't spawn from scheduler context"), + GlobalContext => fail!("can't spawn from global context"), } } fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) { + use rt::sched::*; + let child_data = Cell::new(gen_child_taskgroup(opts.linked, opts.supervised)); let indestructible = opts.indestructible; @@ -700,19 +694,11 @@ fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) { } }; - let mut task = unsafe { - let sched = Local::unsafe_borrow::<Scheduler>(); - rtdebug!("unsafe borrowed sched"); - - if opts.watched { - let child_wrapper = Cell::new(child_wrapper); - do Local::borrow::<Task, ~Task>() |running_task| { - ~running_task.new_child(&mut (*sched).stack_pool, child_wrapper.take()) - } - } else { - // An unwatched task is a new root in the exit-code propagation tree - ~Task::new_root(&mut (*sched).stack_pool, child_wrapper) - } + let mut task = if opts.watched { + Task::build_child(child_wrapper) + } else { + // An unwatched task is a new root in the exit-code propagation tree + Task::build_root(child_wrapper) }; if opts.notify_chan.is_some() { @@ -727,12 +713,9 @@ fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) { } task.name = opts.name.take(); + rtdebug!("spawn calling run_task"); + Scheduler::run_task(task); - rtdebug!("spawn about to take scheduler"); - - let sched = Local::take::<Scheduler>(); - rtdebug!("took sched in spawn"); - sched.schedule_task(task); } fn spawn_raw_oldsched(mut opts: TaskOpts, f: ~fn()) { diff --git a/src/libstd/unstable/lang.rs b/src/libstd/unstable/lang.rs index 7a5e1116c32..74604b4ea17 100644 --- a/src/libstd/unstable/lang.rs +++ b/src/libstd/unstable/lang.rs @@ -70,9 +70,6 @@ pub unsafe fn local_malloc(td: *c_char, size: uintptr_t) -> *c_char { _ => { let mut alloc = ::ptr::null(); do Local::borrow::<Task,()> |task| { - rtdebug!("task pointer: %x, heap pointer: %x", - ::borrow::to_uint(task), - ::borrow::to_uint(&task.heap)); alloc = task.heap.alloc(td as *c_void, size as uint) as *c_char; } return alloc; |
