about summary refs log tree commit diff
path: root/src/libstd
diff options
context:
space:
mode:
author    Ben Blum <bblum@andrew.cmu.edu>  2013-07-11 14:29:33 -0400
committer Ben Blum <bblum@andrew.cmu.edu>  2013-07-20 05:08:57 -0400
commit    9ad199754923e6d0ce8a004087036bf5bd347fbf (patch)
tree      8be4273c3a051201dfa6f16e990cd187ec2cf49e /src/libstd
parent    0101f35f276d0ef1ab841a179d01d0c66a18b38a (diff)
download  rust-9ad199754923e6d0ce8a004087036bf5bd347fbf.tar.gz
          rust-9ad199754923e6d0ce8a004087036bf5bd347fbf.zip
Change the HOF context switchers to pass a BlockedTask instead of a ~Task.
Diffstat (limited to 'src/libstd')
-rw-r--r--  src/libstd/rt/comm.rs      32
-rw-r--r--  src/libstd/rt/mod.rs        2
-rw-r--r--  src/libstd/rt/sched.rs     83
-rw-r--r--  src/libstd/rt/test.rs       6
-rw-r--r--  src/libstd/rt/tube.rs      12
-rw-r--r--  src/libstd/rt/uv/uvio.rs   24
-rw-r--r--  src/libstd/task/mod.rs      3
7 files changed, 102 insertions(+), 60 deletions(-)
diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs
index fba61711297..46c99d21d9d 100644
--- a/src/libstd/rt/comm.rs
+++ b/src/libstd/rt/comm.rs
@@ -19,7 +19,7 @@ use option::*;
 use cast;
 use util;
 use ops::Drop;
-use rt::task::Task;
+use rt::kill::BlockedTask;
 use kinds::Send;
 use rt::sched::Scheduler;
 use rt::local::Local;
@@ -30,13 +30,13 @@ use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
 use cell::Cell;
 use clone::Clone;
 
-/// A combined refcount / ~Task pointer.
+/// A combined refcount / BlockedTask-as-uint pointer.
 ///
 /// Can be equal to the following values:
 ///
 /// * 2 - both endpoints are alive
 /// * 1 - either the sender or the receiver is dead, determined by context
-/// * <ptr> - A pointer to a blocked Task that can be transmuted to ~Task
+/// * <ptr> - A pointer to a blocked Task (see BlockedTask::cast_{to,from}_uint)
 type State = uint;
 
 static STATE_BOTH: State = 2;
@@ -137,11 +137,13 @@ impl<T> ChanOne<T> {
                 }
                 task_as_state => {
                     // Port is blocked. Wake it up.
-                    let recvr: ~Task = cast::transmute(task_as_state);
-                    let mut sched = Local::take::<Scheduler>();
-                    rtdebug!("rendezvous send");
-                    sched.metrics.rendezvous_sends += 1;
-                    sched.schedule_task(recvr);
+                    let recvr = BlockedTask::cast_from_uint(task_as_state);
+                    do recvr.wake().map_consume |woken_task| {
+                        let mut sched = Local::take::<Scheduler>();
+                        rtdebug!("rendezvous send");
+                        sched.metrics.rendezvous_sends += 1;
+                        sched.schedule_task(woken_task);
+                    };
                 }
             }
         }
@@ -177,7 +179,7 @@ impl<T> PortOne<T> {
                 // an acquire barrier to prevent reordering of the subsequent read
                 // of the payload. Also issues a release barrier to prevent reordering
                 // of any previous writes to the task structure.
-                let task_as_state: State = cast::transmute(task);
+                let task_as_state = task.cast_to_uint();
                 let oldstate = (*packet).state.swap(task_as_state, SeqCst);
                 match oldstate {
                     STATE_BOTH => {
@@ -193,8 +195,8 @@ impl<T> PortOne<T> {
                         // NB: We have to drop back into the scheduler event loop here
                         // instead of switching immediately back or we could end up
                         // triggering infinite recursion on the scheduler's stack.
-                        let task: ~Task = cast::transmute(task_as_state);
-                        sched.enqueue_task(task);
+                        let recvr = BlockedTask::cast_from_uint(task_as_state);
+                        sched.enqueue_blocked_task(recvr);
                     }
                     _ => util::unreachable()
                 }
@@ -258,9 +260,11 @@ impl<T> Drop for ChanOneHack<T> {
                 task_as_state => {
                     // The port is blocked waiting for a message we will never send. Wake it.
                     assert!((*this.packet()).payload.is_none());
-                    let recvr: ~Task = cast::transmute(task_as_state);
-                    let sched = Local::take::<Scheduler>();
-                    sched.schedule_task(recvr);
+                    let recvr = BlockedTask::cast_from_uint(task_as_state);
+                    do recvr.wake().map_consume |woken_task| {
+                        let sched = Local::take::<Scheduler>();
+                        sched.schedule_task(woken_task);
+                    };
                 }
             }
         }
diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs
index 0da7a8f8fc3..85537f476d4 100644
--- a/src/libstd/rt/mod.rs
+++ b/src/libstd/rt/mod.rs
@@ -367,7 +367,7 @@ fn test_context() {
             let sched = Local::take::<Scheduler>();
             do sched.deschedule_running_task_and_then() |sched, task| {
                 assert_eq!(context(), SchedulerContext);
-                sched.enqueue_task(task);
+                sched.enqueue_blocked_task(task);
             }
         };
         sched.enqueue_task(task);
diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs
index 4b51508f0a4..d8d61806a5b 100644
--- a/src/libstd/rt/sched.rs
+++ b/src/libstd/rt/sched.rs
@@ -8,7 +8,8 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use option::*;
+use either::{Left, Right};
+use option::{Option, Some, None};
 use sys;
 use cast::transmute;
 use clone::Clone;
@@ -20,6 +21,7 @@ use super::rtio::{EventLoop, EventLoopObject, RemoteCallbackObject};
 use super::context::Context;
 use super::task::{Task, AnySched, Sched};
 use super::message_queue::MessageQueue;
+use rt::kill::BlockedTask;
 use rt::local_ptr;
 use rt::local::Local;
 use rt::rtio::RemoteCallback;
@@ -271,6 +273,14 @@ impl Scheduler {
         };
     }
 
+    /// As enqueue_task, but with the possibility for the blocked task to
+    /// already have been killed.
+    pub fn enqueue_blocked_task(&mut self, blocked_task: BlockedTask) {
+        do blocked_task.wake().map_consume |task| {
+            self.enqueue_task(task);
+        };
+    }
+
     // * Scheduler-context operations
 
     fn interpret_message_queue(~self) -> bool {
@@ -412,14 +422,26 @@ impl Scheduler {
     /// Called by a running task to end execution, after which it will
     /// be recycled by the scheduler for reuse in a new task.
     pub fn terminate_current_task(~self) {
-        assert!(self.in_task_context());
+        let mut this = self;
+        assert!(this.in_task_context());
 
         rtdebug!("ending running task");
 
-        do self.deschedule_running_task_and_then |sched, dead_task| {
-            let mut dead_task = dead_task;
-            let coroutine = dead_task.coroutine.take_unwrap();
-            coroutine.recycle(&mut sched.stack_pool);
+        // This task is post-cleanup, so it must be unkillable. This sequence
+        // of descheduling and recycling must not get interrupted by a kill.
+        // FIXME(#7544): Make this use an inner descheduler, like yield should.
+        this.current_task.get_mut_ref().death.unkillable += 1;
+
+        do this.deschedule_running_task_and_then |sched, dead_task| {
+            match dead_task.wake() {
+                Some(dead_task) => {
+                    let mut dead_task = dead_task;
+                    dead_task.death.unkillable -= 1; // FIXME(#7544) ugh
+                    let coroutine = dead_task.coroutine.take_unwrap();
+                    coroutine.recycle(&mut sched.stack_pool);
+                }
+                None => rtabort!("dead task killed before recycle"),
+            }
         }
 
         rtabort!("control reached end of task");
@@ -440,7 +462,7 @@ impl Scheduler {
             // here we know we are home, execute now OR we know we
             // aren't homed, and that this sched doesn't care
             do this.switch_running_tasks_and_then(task) |sched, last_task| {
-                sched.enqueue_task(last_task);
+                sched.enqueue_blocked_task(last_task);
             }
         } else if !homed && !this.run_anything {
             // the task isn't homed, but it can't be run here
@@ -491,6 +513,13 @@ impl Scheduler {
         }
     }
 
+    pub fn resume_blocked_task_immediately(~self, blocked_task: BlockedTask) {
+        match blocked_task.wake() {
+            Some(task) => self.resume_task_immediately(task),
+            None => Local::put(self),
+        };
+    }
+
     /// Block a running task, context switch to the scheduler, then pass the
     /// blocked task to a closure.
     ///
@@ -503,7 +532,7 @@ impl Scheduler {
     /// This passes a Scheduler pointer to the fn after the context switch
     /// in order to prevent that fn from performing further scheduling operations.
     /// Doing further scheduling could easily result in infinite recursion.
-    pub fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, ~Task)) {
+    pub fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, BlockedTask)) {
         let mut this = self;
         assert!(this.in_task_context());
 
@@ -512,8 +541,8 @@ impl Scheduler {
 
         unsafe {
             let blocked_task = this.current_task.take_unwrap();
-            let f_fake_region = transmute::<&fn(&mut Scheduler, ~Task),
-                                            &fn(&mut Scheduler, ~Task)>(f);
+            let f_fake_region = transmute::<&fn(&mut Scheduler, BlockedTask),
+                                            &fn(&mut Scheduler, BlockedTask)>(f);
             let f_opaque = ClosureConverter::from_fn(f_fake_region);
             this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
         }
@@ -539,7 +568,7 @@ impl Scheduler {
     /// You would want to think hard about doing this, e.g. if there are
     /// pending I/O events it would be a bad idea.
     pub fn switch_running_tasks_and_then(~self, next_task: ~Task,
-                                         f: &fn(&mut Scheduler, ~Task)) {
+                                         f: &fn(&mut Scheduler, BlockedTask)) {
         let mut this = self;
         assert!(this.in_task_context());
 
@@ -548,8 +577,8 @@ impl Scheduler {
 
         let old_running_task = this.current_task.take_unwrap();
         let f_fake_region = unsafe {
-            transmute::<&fn(&mut Scheduler, ~Task),
-                        &fn(&mut Scheduler, ~Task)>(f)
+            transmute::<&fn(&mut Scheduler, BlockedTask),
+                        &fn(&mut Scheduler, BlockedTask)>(f)
         };
         let f_opaque = ClosureConverter::from_fn(f_fake_region);
         this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque));
@@ -590,7 +619,15 @@ impl Scheduler {
         let cleanup_job = self.cleanup_job.take_unwrap();
         match cleanup_job {
             DoNothing => { }
-            GiveTask(task, f) => (f.to_fn())(self, task)
+            GiveTask(task, f) => {
+                let f = f.to_fn();
+                // Task might need to receive a kill signal instead of blocking.
+                // We can call the "and_then" only if it blocks successfully.
+                match BlockedTask::try_block(task) {
+                    Left(killed_task) => self.enqueue_task(killed_task),
+                    Right(blocked_task) => f(self, blocked_task),
+                }
+            }
         }
     }
 
@@ -663,12 +700,14 @@ impl SchedHandle {
 // complaining
 type UnsafeTaskReceiver = sys::Closure;
 trait ClosureConverter {
-    fn from_fn(&fn(&mut Scheduler, ~Task)) -> Self;
-    fn to_fn(self) -> &fn(&mut Scheduler, ~Task);
+    fn from_fn(&fn(&mut Scheduler, BlockedTask)) -> Self;
+    fn to_fn(self) -> &fn(&mut Scheduler, BlockedTask);
 }
 impl ClosureConverter for UnsafeTaskReceiver {
-    fn from_fn(f: &fn(&mut Scheduler, ~Task)) -> UnsafeTaskReceiver { unsafe { transmute(f) } }
-    fn to_fn(self) -> &fn(&mut Scheduler, ~Task) { unsafe { transmute(self) } }
+    fn from_fn(f: &fn(&mut Scheduler, BlockedTask)) -> UnsafeTaskReceiver {
+        unsafe { transmute(f) }
+    }
+    fn to_fn(self) -> &fn(&mut Scheduler, BlockedTask) { unsafe { transmute(self) } }
 }
 
 
@@ -928,8 +967,7 @@ mod test {
                 };
                 // Context switch directly to the new task
                 do sched.switch_running_tasks_and_then(task2) |sched, task1| {
-                    let task1 = Cell::new(task1);
-                    sched.enqueue_task(task1.take());
+                    sched.enqueue_blocked_task(task1);
                 }
                 unsafe { *count_ptr = *count_ptr + 1; }
             };
@@ -980,9 +1018,8 @@ mod test {
                 let sched = Local::take::<Scheduler>();
                 assert!(sched.in_task_context());
                 do sched.deschedule_running_task_and_then() |sched, task| {
-                    let task = Cell::new(task);
                     assert!(!sched.in_task_context());
-                    sched.enqueue_task(task.take());
+                    sched.enqueue_blocked_task(task);
                 }
             };
             sched.enqueue_task(task);
@@ -1004,7 +1041,7 @@ mod test {
                     do sched.event_loop.callback_ms(10) {
                         rtdebug!("in callback");
                         let mut sched = Local::take::<Scheduler>();
-                        sched.enqueue_task(task.take());
+                        sched.enqueue_blocked_task(task.take());
                         Local::put(sched);
                     }
                 }
diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs
index a4242d83ecd..1562160550a 100644
--- a/src/libstd/rt/test.rs
+++ b/src/libstd/rt/test.rs
@@ -170,7 +170,7 @@ pub fn spawntask_immediately(f: ~fn()) {
 
     let sched = Local::take::<Scheduler>();
     do sched.switch_running_tasks_and_then(task) |sched, task| {
-        sched.enqueue_task(task);
+        sched.enqueue_blocked_task(task);
     }
 }
 
@@ -214,7 +214,7 @@ pub fn spawntask_random(f: ~fn()) {
 
     if run_now {
         do sched.switch_running_tasks_and_then(task) |sched, task| {
-            sched.enqueue_task(task);
+            sched.enqueue_blocked_task(task);
         }
     } else {
         sched.enqueue_task(task);
@@ -284,7 +284,7 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
 
     let sched = Local::take::<Scheduler>();
     do sched.switch_running_tasks_and_then(new_task) |sched, old_task| {
-        sched.enqueue_task(old_task);
+        sched.enqueue_blocked_task(old_task);
     }
 
     rtdebug!("enqueued the new task, now waiting on exit_status");
diff --git a/src/libstd/rt/tube.rs b/src/libstd/rt/tube.rs
index f61eee8859b..bc223d8f3f7 100644
--- a/src/libstd/rt/tube.rs
+++ b/src/libstd/rt/tube.rs
@@ -18,13 +18,13 @@ use clone::Clone;
 use super::rc::RC;
 use rt::sched::Scheduler;
 use rt::{context, TaskContext, SchedulerContext};
+use rt::kill::BlockedTask;
 use rt::local::Local;
-use rt::task::Task;
 use vec::OwnedVector;
 use container::Container;
 
 struct TubeState<T> {
-    blocked_task: Option<~Task>,
+    blocked_task: Option<BlockedTask>,
     buf: ~[T]
 }
 
@@ -55,7 +55,7 @@ impl<T> Tube<T> {
                 rtdebug!("waking blocked tube");
                 let task = (*state).blocked_task.take_unwrap();
                 let sched = Local::take::<Scheduler>();
-                sched.resume_task_immediately(task);
+                sched.resume_blocked_task_immediately(task);
             }
         }
     }
@@ -111,7 +111,7 @@ mod test {
             do sched.deschedule_running_task_and_then |sched, task| {
                 let mut tube_clone = tube_clone_cell.take();
                 tube_clone.send(1);
-                sched.enqueue_task(task);
+                sched.enqueue_blocked_task(task);
             }
 
             assert!(tube.recv() == 1);
@@ -133,7 +133,7 @@ mod test {
                     // sending will wake it up.
                     tube_clone.send(1);
                 }
-                sched.enqueue_task(task);
+                sched.enqueue_blocked_task(task);
             }
 
             assert!(tube.recv() == 1);
@@ -168,7 +168,7 @@ mod test {
                     }
                 }
 
-                sched.enqueue_task(task);
+                sched.enqueue_blocked_task(task);
             }
 
             for int::range(0, MAX) |i| {
diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs
index 5d0c64c6867..7046afe8551 100644
--- a/src/libstd/rt/uv/uvio.rs
+++ b/src/libstd/rt/uv/uvio.rs
@@ -227,7 +227,7 @@ impl IoFactory for UvIoFactory {
 
                     // Context switch
                     let scheduler = Local::take::<Scheduler>();
-                    scheduler.resume_task_immediately(task_cell.take());
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
                 } else {
                     rtdebug!("status is some");
                     let task_cell = Cell::new(task_cell.take());
@@ -235,7 +235,7 @@ impl IoFactory for UvIoFactory {
                         let res = Err(uv_error_to_io_error(status.get()));
                         unsafe { (*result_cell_ptr).put_back(res); }
                         let scheduler = Local::take::<Scheduler>();
-                        scheduler.resume_task_immediately(task_cell.take());
+                        scheduler.resume_blocked_task_immediately(task_cell.take());
                     }
                 };
             }
@@ -255,7 +255,7 @@ impl IoFactory for UvIoFactory {
                     let task_cell = Cell::new(task);
                     do watcher.as_stream().close {
                         let scheduler = Local::take::<Scheduler>();
-                        scheduler.resume_task_immediately(task_cell.take());
+                        scheduler.resume_blocked_task_immediately(task_cell.take());
                     }
                 }
                 Err(uv_error_to_io_error(uverr))
@@ -273,7 +273,7 @@ impl IoFactory for UvIoFactory {
                     let task_cell = Cell::new(task);
                     do watcher.close {
                         let scheduler = Local::take::<Scheduler>();
-                        scheduler.resume_task_immediately(task_cell.take());
+                        scheduler.resume_blocked_task_immediately(task_cell.take());
                     }
                 }
                 Err(uv_error_to_io_error(uverr))
@@ -309,7 +309,7 @@ impl Drop for UvTcpListener {
             let task_cell = Cell::new(task);
             do watcher.as_stream().close {
                 let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_task_immediately(task_cell.take());
+                scheduler.resume_blocked_task_immediately(task_cell.take());
             }
         }
     }
@@ -372,7 +372,7 @@ impl Drop for UvTcpStream {
             let task_cell = Cell::new(task);
             do self.close {
                 let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_task_immediately(task_cell.take());
+                scheduler.resume_blocked_task_immediately(task_cell.take());
             }
         }
     }
@@ -419,7 +419,7 @@ impl RtioTcpStream for UvTcpStream {
                 unsafe { (*result_cell_ptr).put_back(result); }
 
                 let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_task_immediately(task_cell.take());
+                scheduler.resume_blocked_task_immediately(task_cell.take());
             }
         }
 
@@ -447,7 +447,7 @@ impl RtioTcpStream for UvTcpStream {
                 unsafe { (*result_cell_ptr).put_back(result); }
 
                 let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_task_immediately(task_cell.take());
+                scheduler.resume_blocked_task_immediately(task_cell.take());
             }
         }
 
@@ -473,7 +473,7 @@ impl Drop for UvUdpSocket {
             let task_cell = Cell::new(task);
             do self.close {
                 let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_task_immediately(task_cell.take());
+                scheduler.resume_blocked_task_immediately(task_cell.take());
             }
         }
     }
@@ -513,7 +513,7 @@ impl RtioUdpSocket for UvUdpSocket {
                 unsafe { (*result_cell_ptr).put_back(result); }
 
                 let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_task_immediately(task_cell.take());
+                scheduler.resume_blocked_task_immediately(task_cell.take());
             }
         }
 
@@ -540,7 +540,7 @@ impl RtioUdpSocket for UvUdpSocket {
                 unsafe { (*result_cell_ptr).put_back(result); }
 
                 let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_task_immediately(task_cell.take());
+                scheduler.resume_blocked_task_immediately(task_cell.take());
             }
         }
 
@@ -678,7 +678,7 @@ fn test_read_and_block() {
                 // not ready for it
                 do scheduler.deschedule_running_task_and_then |sched, task| {
                     let task = Cell::new(task);
-                    sched.enqueue_task(task.take());
+                    sched.enqueue_blocked_task(task.take());
                 }
             }
 
diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs
index f2c1d2ffd9d..11e2b99d7c0 100644
--- a/src/libstd/task/mod.rs
+++ b/src/libstd/task/mod.rs
@@ -515,9 +515,10 @@ pub fn yield() {
             }
             _ => {
                 // XXX: What does yield really mean in newsched?
+                // FIXME(#7544): Optimize this, since we know we won't block.
                 let sched = Local::take::<Scheduler>();
                 do sched.deschedule_running_task_and_then |sched, task| {
-                    sched.enqueue_task(task);
+                    sched.enqueue_blocked_task(task);
                 }
             }
         }