diff options
| author | Ben Blum <bblum@andrew.cmu.edu> | 2013-07-11 14:29:33 -0400 |
|---|---|---|
| committer | Ben Blum <bblum@andrew.cmu.edu> | 2013-07-20 05:08:57 -0400 |
| commit | 9ad199754923e6d0ce8a004087036bf5bd347fbf (patch) | |
| tree | 8be4273c3a051201dfa6f16e990cd187ec2cf49e /src/libstd | |
| parent | 0101f35f276d0ef1ab841a179d01d0c66a18b38a (diff) | |
| download | rust-9ad199754923e6d0ce8a004087036bf5bd347fbf.tar.gz rust-9ad199754923e6d0ce8a004087036bf5bd347fbf.zip | |
Change the HOF context switchers to pass a BlockedTask instead of a ~Task.
Diffstat (limited to 'src/libstd')
| -rw-r--r-- | src/libstd/rt/comm.rs | 32 | ||||
| -rw-r--r-- | src/libstd/rt/mod.rs | 2 | ||||
| -rw-r--r-- | src/libstd/rt/sched.rs | 83 | ||||
| -rw-r--r-- | src/libstd/rt/test.rs | 6 | ||||
| -rw-r--r-- | src/libstd/rt/tube.rs | 12 | ||||
| -rw-r--r-- | src/libstd/rt/uv/uvio.rs | 24 | ||||
| -rw-r--r-- | src/libstd/task/mod.rs | 3 |
7 files changed, 102 insertions, 60 deletions
diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index fba61711297..46c99d21d9d 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -19,7 +19,7 @@ use option::*; use cast; use util; use ops::Drop; -use rt::task::Task; +use rt::kill::BlockedTask; use kinds::Send; use rt::sched::Scheduler; use rt::local::Local; @@ -30,13 +30,13 @@ use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable}; use cell::Cell; use clone::Clone; -/// A combined refcount / ~Task pointer. +/// A combined refcount / BlockedTask-as-uint pointer. /// /// Can be equal to the following values: /// /// * 2 - both endpoints are alive /// * 1 - either the sender or the receiver is dead, determined by context -/// * <ptr> - A pointer to a blocked Task that can be transmuted to ~Task +/// * <ptr> - A pointer to a blocked Task (see BlockedTask::cast_{to,from}_uint) type State = uint; static STATE_BOTH: State = 2; @@ -137,11 +137,13 @@ impl<T> ChanOne<T> { } task_as_state => { // Port is blocked. Wake it up. - let recvr: ~Task = cast::transmute(task_as_state); - let mut sched = Local::take::<Scheduler>(); - rtdebug!("rendezvous send"); - sched.metrics.rendezvous_sends += 1; - sched.schedule_task(recvr); + let recvr = BlockedTask::cast_from_uint(task_as_state); + do recvr.wake().map_consume |woken_task| { + let mut sched = Local::take::<Scheduler>(); + rtdebug!("rendezvous send"); + sched.metrics.rendezvous_sends += 1; + sched.schedule_task(woken_task); + }; } } } @@ -177,7 +179,7 @@ impl<T> PortOne<T> { // an acquire barrier to prevent reordering of the subsequent read // of the payload. Also issues a release barrier to prevent reordering // of any previous writes to the task structure. - let task_as_state: State = cast::transmute(task); + let task_as_state = task.cast_to_uint(); let oldstate = (*packet).state.swap(task_as_state, SeqCst); match oldstate { STATE_BOTH => { @@ -193,8 +195,8 @@ impl<T> PortOne<T> { // NB: We have to drop back into the scheduler event loop here // instead of switching immediately back or we could end up // triggering infinite recursion on the scheduler's stack. - let task: ~Task = cast::transmute(task_as_state); - sched.enqueue_task(task); + let recvr = BlockedTask::cast_from_uint(task_as_state); + sched.enqueue_blocked_task(recvr); } _ => util::unreachable() } @@ -258,9 +260,11 @@ impl<T> Drop for ChanOneHack<T> { task_as_state => { // The port is blocked waiting for a message we will never send. Wake it. assert!((*this.packet()).payload.is_none()); - let recvr: ~Task = cast::transmute(task_as_state); - let sched = Local::take::<Scheduler>(); - sched.schedule_task(recvr); + let recvr = BlockedTask::cast_from_uint(task_as_state); + do recvr.wake().map_consume |woken_task| { + let sched = Local::take::<Scheduler>(); + sched.schedule_task(woken_task); + }; } } } diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 0da7a8f8fc3..85537f476d4 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -367,7 +367,7 @@ fn test_context() { let sched = Local::take::<Scheduler>(); do sched.deschedule_running_task_and_then() |sched, task| { assert_eq!(context(), SchedulerContext); - sched.enqueue_task(task); + sched.enqueue_blocked_task(task); } }; sched.enqueue_task(task); diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 4b51508f0a4..d8d61806a5b 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -8,7 +8,8 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use option::*; +use either::{Left, Right}; +use option::{Option, Some, None}; use sys; use cast::transmute; use clone::Clone; @@ -20,6 +21,7 @@ use super::rtio::{EventLoop, EventLoopObject, RemoteCallbackObject}; use super::context::Context; use super::task::{Task, AnySched, Sched}; use super::message_queue::MessageQueue; +use rt::kill::BlockedTask; use rt::local_ptr; use rt::local::Local; use rt::rtio::RemoteCallback; @@ -271,6 +273,14 @@ impl Scheduler { }; } + /// As enqueue_task, but with the possibility for the blocked task to + /// already have been killed. + pub fn enqueue_blocked_task(&mut self, blocked_task: BlockedTask) { + do blocked_task.wake().map_consume |task| { + self.enqueue_task(task); + }; + } + // * Scheduler-context operations fn interpret_message_queue(~self) -> bool { @@ -412,14 +422,26 @@ impl Scheduler { /// Called by a running task to end execution, after which it will /// be recycled by the scheduler for reuse in a new task. pub fn terminate_current_task(~self) { - assert!(self.in_task_context()); + let mut this = self; + assert!(this.in_task_context()); rtdebug!("ending running task"); - do self.deschedule_running_task_and_then |sched, dead_task| { - let mut dead_task = dead_task; - let coroutine = dead_task.coroutine.take_unwrap(); - coroutine.recycle(&mut sched.stack_pool); + // This task is post-cleanup, so it must be unkillable. This sequence + // of descheduling and recycling must not get interrupted by a kill. + // FIXME(#7544): Make this use an inner descheduler, like yield should. + this.current_task.get_mut_ref().death.unkillable += 1; + + do this.deschedule_running_task_and_then |sched, dead_task| { + match dead_task.wake() { + Some(dead_task) => { + let mut dead_task = dead_task; + dead_task.death.unkillable -= 1; // FIXME(#7544) ugh + let coroutine = dead_task.coroutine.take_unwrap(); + coroutine.recycle(&mut sched.stack_pool); + } + None => rtabort!("dead task killed before recycle"), + } } rtabort!("control reached end of task"); @@ -440,7 +462,7 @@ impl Scheduler { // here we know we are home, execute now OR we know we // aren't homed, and that this sched doesn't care do this.switch_running_tasks_and_then(task) |sched, last_task| { - sched.enqueue_task(last_task); + sched.enqueue_blocked_task(last_task); } } else if !homed && !this.run_anything { // the task isn't homed, but it can't be run here @@ -491,6 +513,13 @@ impl Scheduler { } } + pub fn resume_blocked_task_immediately(~self, blocked_task: BlockedTask) { + match blocked_task.wake() { + Some(task) => self.resume_task_immediately(task), + None => Local::put(self), + }; + } + /// Block a running task, context switch to the scheduler, then pass the /// blocked task to a closure. /// @@ -503,7 +532,7 @@ impl Scheduler { /// This passes a Scheduler pointer to the fn after the context switch /// in order to prevent that fn from performing further scheduling operations. /// Doing further scheduling could easily result in infinite recursion. - pub fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, ~Task)) { + pub fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, BlockedTask)) { let mut this = self; assert!(this.in_task_context()); @@ -512,8 +541,8 @@ impl Scheduler { unsafe { let blocked_task = this.current_task.take_unwrap(); - let f_fake_region = transmute::<&fn(&mut Scheduler, ~Task), - &fn(&mut Scheduler, ~Task)>(f); + let f_fake_region = transmute::<&fn(&mut Scheduler, BlockedTask), + &fn(&mut Scheduler, BlockedTask)>(f); let f_opaque = ClosureConverter::from_fn(f_fake_region); this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque)); } @@ -539,7 +568,7 @@ impl Scheduler { /// You would want to think hard about doing this, e.g. if there are /// pending I/O events it would be a bad idea. pub fn switch_running_tasks_and_then(~self, next_task: ~Task, - f: &fn(&mut Scheduler, ~Task)) { + f: &fn(&mut Scheduler, BlockedTask)) { let mut this = self; assert!(this.in_task_context()); @@ -548,8 +577,8 @@ impl Scheduler { let old_running_task = this.current_task.take_unwrap(); let f_fake_region = unsafe { - transmute::<&fn(&mut Scheduler, ~Task), - &fn(&mut Scheduler, ~Task)>(f) + transmute::<&fn(&mut Scheduler, BlockedTask), + &fn(&mut Scheduler, BlockedTask)>(f) }; let f_opaque = ClosureConverter::from_fn(f_fake_region); this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque)); @@ -590,7 +619,15 @@ impl Scheduler { let cleanup_job = self.cleanup_job.take_unwrap(); match cleanup_job { DoNothing => { } - GiveTask(task, f) => (f.to_fn())(self, task) + GiveTask(task, f) => { + let f = f.to_fn(); + // Task might need to receive a kill signal instead of blocking. + // We can call the "and_then" only if it blocks successfully. + match BlockedTask::try_block(task) { + Left(killed_task) => self.enqueue_task(killed_task), + Right(blocked_task) => f(self, blocked_task), + } + } } } @@ -663,12 +700,14 @@ impl SchedHandle { // complaining type UnsafeTaskReceiver = sys::Closure; trait ClosureConverter { - fn from_fn(&fn(&mut Scheduler, ~Task)) -> Self; - fn to_fn(self) -> &fn(&mut Scheduler, ~Task); + fn from_fn(&fn(&mut Scheduler, BlockedTask)) -> Self; + fn to_fn(self) -> &fn(&mut Scheduler, BlockedTask); } impl ClosureConverter for UnsafeTaskReceiver { - fn from_fn(f: &fn(&mut Scheduler, ~Task)) -> UnsafeTaskReceiver { unsafe { transmute(f) } } - fn to_fn(self) -> &fn(&mut Scheduler, ~Task) { unsafe { transmute(self) } } + fn from_fn(f: &fn(&mut Scheduler, BlockedTask)) -> UnsafeTaskReceiver { + unsafe { transmute(f) } + } + fn to_fn(self) -> &fn(&mut Scheduler, BlockedTask) { unsafe { transmute(self) } } } @@ -928,8 +967,7 @@ mod test { }; // Context switch directly to the new task do sched.switch_running_tasks_and_then(task2) |sched, task1| { - let task1 = Cell::new(task1); - sched.enqueue_task(task1.take()); + sched.enqueue_blocked_task(task1); } unsafe { *count_ptr = *count_ptr + 1; } }; @@ -980,9 +1018,8 @@ mod test { let sched = Local::take::<Scheduler>(); assert!(sched.in_task_context()); do sched.deschedule_running_task_and_then() |sched, task| { - let task = Cell::new(task); assert!(!sched.in_task_context()); - sched.enqueue_task(task.take()); + sched.enqueue_blocked_task(task); } }; sched.enqueue_task(task); @@ -1004,7 +1041,7 @@ mod test { do sched.event_loop.callback_ms(10) { rtdebug!("in callback"); let mut sched = Local::take::<Scheduler>(); - sched.enqueue_task(task.take()); + sched.enqueue_blocked_task(task.take()); Local::put(sched); } } diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index a4242d83ecd..1562160550a 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -170,7 +170,7 @@ pub fn spawntask_immediately(f: ~fn()) { let sched = Local::take::<Scheduler>(); do sched.switch_running_tasks_and_then(task) |sched, task| { - sched.enqueue_task(task); + sched.enqueue_blocked_task(task); } } @@ -214,7 +214,7 @@ pub fn spawntask_random(f: ~fn()) { if run_now { do sched.switch_running_tasks_and_then(task) |sched, task| { - sched.enqueue_task(task); + sched.enqueue_blocked_task(task); } } else { sched.enqueue_task(task); @@ -284,7 +284,7 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> { let sched = Local::take::<Scheduler>(); do sched.switch_running_tasks_and_then(new_task) |sched, old_task| { - sched.enqueue_task(old_task); + sched.enqueue_blocked_task(old_task); } rtdebug!("enqueued the new task, now waiting on exit_status"); diff --git a/src/libstd/rt/tube.rs b/src/libstd/rt/tube.rs index f61eee8859b..bc223d8f3f7 100644 --- a/src/libstd/rt/tube.rs +++ b/src/libstd/rt/tube.rs @@ -18,13 +18,13 @@ use clone::Clone; use super::rc::RC; use rt::sched::Scheduler; use rt::{context, TaskContext, SchedulerContext}; +use rt::kill::BlockedTask; use rt::local::Local; -use rt::task::Task; use vec::OwnedVector; use container::Container; struct TubeState<T> { - blocked_task: Option<~Task>, + blocked_task: Option<BlockedTask>, buf: ~[T] } @@ -55,7 +55,7 @@ impl<T> Tube<T> { rtdebug!("waking blocked tube"); let task = (*state).blocked_task.take_unwrap(); let sched = Local::take::<Scheduler>(); - sched.resume_task_immediately(task); + sched.resume_blocked_task_immediately(task); } } } @@ -111,7 +111,7 @@ mod test { do sched.deschedule_running_task_and_then |sched, task| { let mut tube_clone = tube_clone_cell.take(); tube_clone.send(1); - sched.enqueue_task(task); + sched.enqueue_blocked_task(task); } assert!(tube.recv() == 1); @@ -133,7 +133,7 @@ mod test { // sending will wake it up. tube_clone.send(1); } - sched.enqueue_task(task); + sched.enqueue_blocked_task(task); } assert!(tube.recv() == 1); @@ -168,7 +168,7 @@ mod test { } } - sched.enqueue_task(task); + sched.enqueue_blocked_task(task); } for int::range(0, MAX) |i| { diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index 5d0c64c6867..7046afe8551 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -227,7 +227,7 @@ impl IoFactory for UvIoFactory { // Context switch let scheduler = Local::take::<Scheduler>(); - scheduler.resume_task_immediately(task_cell.take()); + scheduler.resume_blocked_task_immediately(task_cell.take()); } else { rtdebug!("status is some"); let task_cell = Cell::new(task_cell.take()); @@ -235,7 +235,7 @@ impl IoFactory for UvIoFactory { let res = Err(uv_error_to_io_error(status.get())); unsafe { (*result_cell_ptr).put_back(res); } let scheduler = Local::take::<Scheduler>(); - scheduler.resume_task_immediately(task_cell.take()); + scheduler.resume_blocked_task_immediately(task_cell.take()); } }; } @@ -255,7 +255,7 @@ impl IoFactory for UvIoFactory { let task_cell = Cell::new(task); do watcher.as_stream().close { let scheduler = Local::take::<Scheduler>(); - scheduler.resume_task_immediately(task_cell.take()); + scheduler.resume_blocked_task_immediately(task_cell.take()); } } Err(uv_error_to_io_error(uverr)) @@ -273,7 +273,7 @@ impl IoFactory for UvIoFactory { let task_cell = Cell::new(task); do watcher.close { let scheduler = Local::take::<Scheduler>(); - scheduler.resume_task_immediately(task_cell.take()); + scheduler.resume_blocked_task_immediately(task_cell.take()); } } Err(uv_error_to_io_error(uverr)) @@ -309,7 +309,7 @@ impl Drop for UvTcpListener { let task_cell = Cell::new(task); do watcher.as_stream().close { let scheduler = Local::take::<Scheduler>(); - scheduler.resume_task_immediately(task_cell.take()); + scheduler.resume_blocked_task_immediately(task_cell.take()); } } } @@ -372,7 +372,7 @@ impl Drop for UvTcpStream { let task_cell = Cell::new(task); do self.close { let scheduler = Local::take::<Scheduler>(); - scheduler.resume_task_immediately(task_cell.take()); + scheduler.resume_blocked_task_immediately(task_cell.take()); } } } @@ -419,7 +419,7 @@ impl RtioTcpStream for UvTcpStream { unsafe { (*result_cell_ptr).put_back(result); } let scheduler = Local::take::<Scheduler>(); - scheduler.resume_task_immediately(task_cell.take()); + scheduler.resume_blocked_task_immediately(task_cell.take()); } } @@ -447,7 +447,7 @@ impl RtioTcpStream for UvTcpStream { unsafe { (*result_cell_ptr).put_back(result); } let scheduler = Local::take::<Scheduler>(); - scheduler.resume_task_immediately(task_cell.take()); + scheduler.resume_blocked_task_immediately(task_cell.take()); } } @@ -473,7 +473,7 @@ impl Drop for UvUdpSocket { let task_cell = Cell::new(task); do self.close { let scheduler = Local::take::<Scheduler>(); - scheduler.resume_task_immediately(task_cell.take()); + scheduler.resume_blocked_task_immediately(task_cell.take()); } } } @@ -513,7 +513,7 @@ impl RtioUdpSocket for UvUdpSocket { unsafe { (*result_cell_ptr).put_back(result); } let scheduler = Local::take::<Scheduler>(); - scheduler.resume_task_immediately(task_cell.take()); + scheduler.resume_blocked_task_immediately(task_cell.take()); } } @@ -540,7 +540,7 @@ impl RtioUdpSocket for UvUdpSocket { unsafe { (*result_cell_ptr).put_back(result); } let scheduler = Local::take::<Scheduler>(); - scheduler.resume_task_immediately(task_cell.take()); + scheduler.resume_blocked_task_immediately(task_cell.take()); } } @@ -678,7 +678,7 @@ fn test_read_and_block() { // not ready for it do scheduler.deschedule_running_task_and_then |sched, task| { let task = Cell::new(task); - sched.enqueue_task(task.take()); + sched.enqueue_blocked_task(task.take()); } } diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs index f2c1d2ffd9d..11e2b99d7c0 100644 --- a/src/libstd/task/mod.rs +++ b/src/libstd/task/mod.rs @@ -515,9 +515,10 @@ pub fn yield() { } _ => { // XXX: What does yield really mean in newsched? + // FIXME(#7544): Optimize this, since we know we won't block. let sched = Local::take::<Scheduler>(); do sched.deschedule_running_task_and_then |sched, task| { - sched.enqueue_task(task); + sched.enqueue_blocked_task(task); } } } |
