about summary refs log tree commit diff
diff options
context:
space:
mode:
authortoddaaro <github@opprobrio.us>2013-07-31 13:52:22 -0700
committertoddaaro <github@opprobrio.us>2013-08-01 15:18:29 -0700
commit1d82fe5aea71b1c265634f32716b268972141efb (patch)
tree876b41aeb9d01d450bdd08f855f572804f24066d
parenta5f55b3ead06886190d905cfc826bf1d072ff675 (diff)
downloadrust-1d82fe5aea71b1c265634f32716b268972141efb.tar.gz
rust-1d82fe5aea71b1c265634f32716b268972141efb.zip
fixed incorrect handling of returned scheduler option and restructured scheduler functions slightly
-rw-r--r--src/libstd/rt/comm.rs8
-rw-r--r--src/libstd/rt/mod.rs12
-rw-r--r--src/libstd/rt/sched.rs109
-rw-r--r--src/libstd/rt/task.rs4
-rw-r--r--src/libstd/rt/test.rs4
-rw-r--r--src/libstd/rt/uv/uvio.rs2
6 files changed, 64 insertions, 75 deletions
diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs
index bb106edad94..5a671d877d2 100644
--- a/src/libstd/rt/comm.rs
+++ b/src/libstd/rt/comm.rs
@@ -131,10 +131,7 @@ impl<T> ChanOne<T> {
                     // Port is blocked. Wake it up.
                     let recvr = BlockedTask::cast_from_uint(task_as_state);
                     do recvr.wake().map_consume |woken_task| {
-                        let mut sched = Local::take::<Scheduler>();
-                        rtdebug!("rendezvous send");
-                        sched.metrics.rendezvous_sends += 1;
-                        sched.schedule_task(woken_task);
+                        Scheduler::run_task(woken_task);
                     };
                 }
             }
@@ -350,8 +347,7 @@ impl<T> Drop for ChanOne<T> {
                     assert!((*this.packet()).payload.is_none());
                     let recvr = BlockedTask::cast_from_uint(task_as_state);
                     do recvr.wake().map_consume |woken_task| {
-                        let sched = Local::take::<Scheduler>();
-                        sched.schedule_task(woken_task);
+                        Scheduler::run_task(woken_task);
                     };
                 }
             }
diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs
index 73c30e5779c..f0f4b646103 100644
--- a/src/libstd/rt/mod.rs
+++ b/src/libstd/rt/mod.rs
@@ -259,6 +259,8 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
     let mut handles = ~[];
 
     do nscheds.times {
+        rtdebug!("inserting a regular scheduler");
+
         // Every scheduler is driven by an I/O event loop.
         let loop_ = ~UvEventLoop::new();
         let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone());
@@ -344,6 +346,7 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
 
     // Run each remaining scheduler in a thread.
     while !scheds.is_empty() {
+        rtdebug!("creating regular schedulers");
         let sched = scheds.pop();
         let sched_cell = Cell::new(sched);
         let thread = do Thread::start {
@@ -360,15 +363,21 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
 
     if use_main_sched {
 
+        rtdebug!("about to create the main scheduler task");
+
         let mut main_sched = main_sched.get();
 
         let home = Sched(main_sched.make_handle());
-        let mut main_task = ~Task::new_root_homed(&mut scheds[0].stack_pool,
+        let mut main_task = ~Task::new_root_homed(&mut main_sched.stack_pool,
                                                   home, main.take());
         main_task.death.on_exit = Some(on_exit.take());
+        rtdebug!("boostrapping main_task");
+
         main_sched.bootstrap(main_task);
     }
 
+    rtdebug!("waiting for threads");
+
     // Wait for schedulers
     foreach thread in threads.consume_iter() {
         thread.join();
@@ -404,7 +413,6 @@ pub fn context() -> RuntimeContext {
     if unsafe { rust_try_get_task().is_not_null() } {
         return OldTaskContext;
     } else if Local::exists::<Task>() {
-        rtdebug!("either task or scheduler context in newrt");
         // In this case we know it is a new runtime context, but we
         // need to check which one. Going to try borrowing task to
         // check. Task should always be in TLS, so hopefully this
diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs
index 816c963ad18..4abe69a7d13 100644
--- a/src/libstd/rt/sched.rs
+++ b/src/libstd/rt/sched.rs
@@ -142,11 +142,11 @@ impl Scheduler {
         local_ptr::init_tls_key();
 
         // Create a task for the scheduler with an empty context.
-        let sched_task = Task::new_sched_task();
+        let sched_task = ~Task::new_sched_task();
 
         // Now that we have an empty task struct for the scheduler
         // task, put it in TLS.
-        Local::put::(~sched_task);
+        Local::put::(sched_task);
 
         // Now, as far as all the scheduler state is concerned, we are
         // inside the "scheduler" context. So we can act like the
@@ -165,8 +165,6 @@ impl Scheduler {
         // cleaning up the memory it uses. As we didn't actually call
         // task.run() on the scheduler task we never get through all
         // the cleanup code it runs.
-
-        rtdebug!("post sched.run(), cleaning up scheduler task");
         let mut stask = Local::take::<Task>();
         stask.destroyed = true;
     }
@@ -224,6 +222,8 @@ impl Scheduler {
         // 2) A shutdown is also easy, shutdown.
         // 3) A pinned task - we resume immediately and do not return
         //    here.
+        // 4) A message from another scheduler with a non-homed task
+        //    to run here.
 
         let result = sched.interpret_message_queue();
         let sched = match result {
@@ -236,6 +236,8 @@ impl Scheduler {
             }
         };
 
+        // Second activity is to try resuming a task from the queue.
+
         let result = sched.resume_task_from_queue();
         let mut sched = match result {
             Some(sched) => {
@@ -333,8 +335,7 @@ impl Scheduler {
                 return None;
             }
             Some(TaskFromFriend(task)) => {
-                this.schedule_task_sched_context(task);
-                return None;
+                return this.sched_schedule_task(task);
             }
             Some(Wake) => {
                 this.sleepy = false;
@@ -442,8 +443,6 @@ impl Scheduler {
         }
     }
 
-    // * Task-context operations
-
     /// Called by a running task to end execution, after which it will
     /// be recycled by the scheduler for reuse in a new task.
     pub fn terminate_current_task(~self) {
@@ -457,10 +456,17 @@ impl Scheduler {
         }
     }
 
-    // If a scheduling action is performed, return None. If not,
-    // return Some(sched).
+    // Scheduling a task requires a few checks to make sure the task
+    // ends up in the appropriate location. The run_anything flag on
+    // the scheduler and the home on the task need to be checked. This
+    // helper performs that check. It takes a function that specifies
+    // how to queue the the provided task if that is the correct
+    // action. This is a "core" function that requires handling the
+    // returned Option correctly.
 
-    pub fn schedule_task(~self, task: ~Task) -> Option<~Scheduler> {
+    pub fn schedule_task(~self, task: ~Task,
+                         schedule_fn: ~fn(sched: ~Scheduler, task: ~Task))
+        -> Option<~Scheduler> {
 
         // is the task home?
         let is_home = task.is_home_no_tls(&self);
@@ -474,9 +480,7 @@ impl Scheduler {
             // here we know we are home, execute now OR we know we
             // aren't homed, and that this sched doesn't care
             rtdebug!("task: %u is on ok sched, executing", to_uint(task));
-            do this.switch_running_tasks_and_then(task) |sched, last_task| {
-                sched.enqueue_blocked_task(last_task);
-            }
+            schedule_fn(this, task);
             return None;
         } else if !homed && !this.run_anything {
             // the task isn't homed, but it can't be run here
@@ -489,35 +493,30 @@ impl Scheduler {
         }
     }
 
-    // BAD BAD BAD BAD BAD
-    // Do something instead of just copy-pasting this.
-    pub fn schedule_task_sched_context(~self, task: ~Task) -> Option<~Scheduler> {
-
-        // is the task home?
-        let is_home = task.is_home_no_tls(&self);
-
-        // does the task have a home?
-        let homed = task.homed();
-
-        let mut this = self;
-
-        if is_home || (!homed && this.run_anything) {
-            // here we know we are home, execute now OR we know we
-            // aren't homed, and that this sched doesn't care
-            rtdebug!("task: %u is on ok sched, executing", to_uint(task));
-            this.resume_task_immediately(task);
-            return None;
-        } else if !homed && !this.run_anything {
-            // the task isn't homed, but it can't be run here
-            this.enqueue_task(task);
-            return Some(this);
-        } else {
-            // task isn't home, so don't run it here, send it home
-            Scheduler::send_task_home(task);
-            return Some(this);
+    // There are two contexts in which schedule_task can be called:
+    // inside the scheduler, and inside a task. These contexts handle
+    // executing the task slightly differently. In the scheduler
+    // context case we want to receive the scheduler as an input, and
+    // manually deal with the option. In the task context case we want
+    // to use TLS to find the scheduler, and deal with the option
+    // inside the helper.
+
+    pub fn sched_schedule_task(~self, task: ~Task) -> Option<~Scheduler> {
+        do self.schedule_task(task) |sched, next_task| {
+            sched.resume_task_immediately(next_task);
         }
     }
 
+    // Task context case - use TLS.
+    pub fn run_task(task: ~Task) {
+        let sched = Local::take::<Scheduler>();
+        let opt = do sched.schedule_task(task) |sched, next_task| {
+            do sched.switch_running_tasks_and_then(next_task) |sched, last_task| {
+                sched.enqueue_blocked_task(last_task);
+            }
+        };
+        opt.map_consume(Local::put);
+    }
 
     // The primary function for changing contexts. In the current
     // design the scheduler is just a slightly modified GreenTask, so
@@ -586,7 +585,7 @@ impl Scheduler {
             Context::swap(current_task_context, next_task_context);
         }
 
-        // When the context swaps back to the scheduler we immediately
+        // When the context swaps back to this task we immediately
         // run the cleanup job, as expected by the previously called
         // swap_contexts function.
         unsafe {
@@ -599,15 +598,8 @@ impl Scheduler {
         }
     }
 
-    // There are a variety of "obvious" functions to be passed to
-    // change_task_context, so we can make a few "named cases".
-
-    // Enqueue the old task on the current scheduler.
-    pub fn enqueue_old(sched: &mut Scheduler, task: ~Task) {
-        sched.enqueue_task(task);
-    }
-
-    // Sometimes we just want the old API though.
+    // Old API for task manipulation implemented over the new core
+    // function.
 
     pub fn resume_task_immediately(~self, task: ~Task) {
         do self.change_task_context(task) |sched, stask| {
@@ -668,13 +660,6 @@ impl Scheduler {
         };
     }
 
-    // A helper that looks up the scheduler and runs a task. If it can
-    // be run now it is run now.
-    pub fn run_task(new_task: ~Task) {
-        let sched = Local::take::<Scheduler>();
-        sched.schedule_task(new_task).map_consume(Local::put);
-    }
-
     // Returns a mutable reference to both contexts involved in this
     // swap. This is unsafe - we are getting mutable internal
     // references to keep even when we don't own the tasks. It looks
@@ -692,8 +677,6 @@ impl Scheduler {
         }
     }
 
-    // * Other stuff
-
     pub fn enqueue_cleanup_job(&mut self, job: CleanupJob) {
         self.cleanup_job = Some(job);
     }
@@ -1004,22 +987,22 @@ mod test {
             let port = Cell::new(port);
             let chan = Cell::new(chan);
 
-            let _thread_one = do Thread::start {
+            let thread_one = do Thread::start {
                 let chan = Cell::new(chan.take());
                 do run_in_newsched_task_core {
                     chan.take().send(());
                 }
             };
 
-            let _thread_two = do Thread::start {
+            let thread_two = do Thread::start {
                 let port = Cell::new(port.take());
                 do run_in_newsched_task_core {
                     port.take().recv();
                 }
             };
 
-            thread1.join();
-            thread2.join();
+            thread_two.join();
+            thread_one.join();
         }
     }
 
diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs
index 13fdaded84b..fcc6ebeada6 100644
--- a/src/libstd/rt/task.rs
+++ b/src/libstd/rt/task.rs
@@ -209,7 +209,7 @@ impl Task {
     }
 
     pub fn run(&mut self, f: &fn()) {
-
+        rtdebug!("run called on task: %u", borrow::to_uint(self));
         self.unwinder.try(f);
         { let _ = self.taskgroup.take(); }
         self.death.collect_failure(!self.unwinder.unwinding);
@@ -301,7 +301,7 @@ impl Task {
 
 impl Drop for Task {
     fn drop(&self) {
-        rtdebug!("called drop for a task");
+        rtdebug!("called drop for a task: %u", borrow::to_uint(self));
         assert!(self.destroyed)
     }
 }
diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs
index d0970ec5866..22eb42e2ee8 100644
--- a/src/libstd/rt/test.rs
+++ b/src/libstd/rt/test.rs
@@ -130,7 +130,9 @@ pub fn run_in_mt_newsched_task(f: ~fn()) {
 
         while !scheds.is_empty() {
             let mut sched = scheds.pop();
-            let bootstrap_task = ~do Task::new_root(&mut sched.stack_pool) || {};
+            let bootstrap_task = ~do Task::new_root(&mut sched.stack_pool) || {
+                rtdebug!("bootstrapping non-primary scheduler");
+            };
             let bootstrap_task_cell = Cell::new(bootstrap_task);
             let sched_cell = Cell::new(sched);
             let thread = do Thread::start {
diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs
index e93333661cf..5be19752152 100644
--- a/src/libstd/rt/uv/uvio.rs
+++ b/src/libstd/rt/uv/uvio.rs
@@ -253,7 +253,7 @@ impl IoFactory for UvIoFactory {
         let scheduler = Local::take::<Scheduler>();
 
         // Block this task and take ownership, switch to scheduler context
-        do scheduler.deschedule_running_task_and_then |_sched, task| {
+        do scheduler.deschedule_running_task_and_then |_, task| {
 
             rtdebug!("connect: entered scheduler context");
             let mut tcp_watcher = TcpWatcher::new(self.uv_loop());