about summary refs log tree commit diff
diff options
context:
space:
mode:
authorBrian Anderson <banderson@mozilla.com>2013-04-18 19:32:32 -0700
committerBrian Anderson <banderson@mozilla.com>2013-04-19 12:05:18 -0700
commitd261bb32d95732ef8aa74b010bb8c98f058785b2 (patch)
treedf04e4ff3827332c712cc997a01c5cf1e0575ff5
parent15ece0c23ef9b2e696ea4e81bf088e37fedc5d01 (diff)
downloadrust-d261bb32d95732ef8aa74b010bb8c98f058785b2.tar.gz
rust-d261bb32d95732ef8aa74b010bb8c98f058785b2.zip
core: More tweaks to the thread-local scheduler interface
-rw-r--r--src/libcore/rt/mod.rs14
-rw-r--r--src/libcore/rt/sched/local_sched.rs (renamed from src/libcore/rt/sched/local.rs)22
-rw-r--r--src/libcore/rt/sched/mod.rs142
-rw-r--r--src/libcore/rt/uvio.rs68
-rw-r--r--src/libcore/task/spawn.rs2
5 files changed, 125 insertions, 123 deletions
diff --git a/src/libcore/rt/mod.rs b/src/libcore/rt/mod.rs
index 2b9f147bf62..e93e0c6fc6c 100644
--- a/src/libcore/rt/mod.rs
+++ b/src/libcore/rt/mod.rs
@@ -106,16 +106,16 @@ pub enum RuntimeContext {
 pub fn context() -> RuntimeContext {
 
     use task::rt::rust_task;
-    use self::sched::Scheduler;
+    use self::sched::local_sched;
 
     // XXX: Hitting TLS twice to check if the scheduler exists
     // then to check for the task is not good for perf
     if unsafe { rust_try_get_task().is_not_null() } {
         return OldTaskContext;
     } else {
-        if Scheduler::have_local() {
+        if local_sched::exists() {
             let context = ::cell::empty_cell();
-            do Scheduler::borrow_local |sched| {
+            do local_sched::borrow |sched| {
                 if sched.in_task_context() {
                     context.put_back(TaskContext);
                 } else {
@@ -137,7 +137,7 @@ pub fn context() -> RuntimeContext {
 #[test]
 fn test_context() {
     use unstable::run_in_bare_thread;
-    use self::sched::{Scheduler, Task};
+    use self::sched::{local_sched, Task};
     use self::uvio::UvEventLoop;
     use cell::Cell;
 
@@ -147,11 +147,11 @@ fn test_context() {
         let mut sched = ~UvEventLoop::new_scheduler();
         let task = ~do Task::new(&mut sched.stack_pool) {
             assert!(context() == TaskContext);
-            let sched = Scheduler::take_local();
+            let sched = local_sched::take();
             do sched.deschedule_running_task_and_then() |task| {
                 assert!(context() == SchedulerContext);
                 let task = Cell(task);
-                do Scheduler::borrow_local |sched| {
+                do local_sched::borrow |sched| {
                     sched.task_queue.push_back(task.take());
                 }
             }
@@ -166,7 +166,7 @@ fn test_context() {
 pub fn run_in_newsched_task(f: ~fn()) {
     use cell::Cell;
     use unstable::run_in_bare_thread;
-    use self::sched::{Scheduler, Task};
+    use self::sched::Task;
     use self::uvio::UvEventLoop;
 
     let f = Cell(Cell(f));
diff --git a/src/libcore/rt/sched/local.rs b/src/libcore/rt/sched/local_sched.rs
index 0eb97ee67ec..2ab50252ac6 100644
--- a/src/libcore/rt/sched/local.rs
+++ b/src/libcore/rt/sched/local_sched.rs
@@ -16,6 +16,7 @@ use libc::c_void;
 use cast::transmute;
 
 use super::Scheduler;
+use super::super::rtio::IoFactoryObject;
 use tls = super::super::thread_local_storage;
 #[cfg(test)] use super::super::uvio::UvEventLoop;
 
@@ -50,11 +51,21 @@ pub fn exists() -> bool {
     }
 }
 
+/// Borrow the thread-local scheduler from thread-local storage.
+/// While the scheduler is borrowed it is not available in TLS.
+pub fn borrow(f: &fn(&mut Scheduler)) {
+    let mut sched = take();
+    f(sched);
+    put(sched);
+}
+
 /// Borrow a mutable reference to the thread-local Scheduler
+///
 /// # Safety Note
+///
 /// Because this leaves the Scheduler in thread-local storage it is possible
 /// For the Scheduler pointer to be aliased
-pub unsafe fn borrow() -> &mut Scheduler {
+pub unsafe fn unsafe_borrow() -> &mut Scheduler {
     unsafe {
         let key = tls_key();
         let mut void_sched: *mut c_void = tls::get(key);
@@ -70,6 +81,13 @@ pub unsafe fn borrow() -> &mut Scheduler {
     }
 }
 
+pub unsafe fn unsafe_borrow_io() -> &mut IoFactoryObject {
+    unsafe {
+        let sched = unsafe_borrow();
+        return sched.event_loop.io().unwrap();
+    }
+}
+
 fn tls_key() -> tls::Key {
     maybe_tls_key().get()
 }
@@ -125,7 +143,7 @@ fn borrow_smoke_test() {
     let scheduler = ~UvEventLoop::new_scheduler();
     put(scheduler);
     unsafe {
-        let _scheduler = borrow();
+        let _scheduler = unsafe_borrow();
     }
     let _scheduler = take();
 }
diff --git a/src/libcore/rt/sched/mod.rs b/src/libcore/rt/sched/mod.rs
index 1141ea480c9..fe443437e36 100644
--- a/src/libcore/rt/sched/mod.rs
+++ b/src/libcore/rt/sched/mod.rs
@@ -14,7 +14,7 @@ use cast::transmute;
 
 use super::work_queue::WorkQueue;
 use super::stack::{StackPool, StackSegment};
-use super::rtio::{EventLoop, EventLoopObject, IoFactoryObject};
+use super::rtio::{EventLoop, EventLoopObject};
 use super::context::Context;
 
 #[cfg(test)] use super::uvio::UvEventLoop;
@@ -22,7 +22,8 @@ use super::context::Context;
 #[cfg(test)] use int;
 #[cfg(test)] use cell::Cell;
 
-mod local;
+// A more convenient name for external callers, e.g. `local_sched::take()`
+pub mod local_sched;
 
 /// The Scheduler is responsible for coordinating execution of Tasks
 /// on a single thread. When the scheduler is running it is owned by
@@ -90,52 +91,25 @@ pub impl Scheduler {
         assert!(!self.in_task_context());
 
         // Give ownership of the scheduler (self) to the thread
-        local::put(self);
-
-        let scheduler = unsafe { local::borrow() };
-        fn run_scheduler_once() {
-            let scheduler = Scheduler::take_local();
-            if scheduler.resume_task_from_queue() {
-                // Ok, a task ran. Nice! We'll do it again later
-                do Scheduler::borrow_local |scheduler| {
-                    scheduler.event_loop.callback(run_scheduler_once);
+        local_sched::put(self);
+
+        unsafe {
+            let scheduler = local_sched::unsafe_borrow();
+            fn run_scheduler_once() {
+                let scheduler = local_sched::take();
+                if scheduler.resume_task_from_queue() {
+                    // Ok, a task ran. Nice! We'll do it again later
+                    do local_sched::borrow |scheduler| {
+                        scheduler.event_loop.callback(run_scheduler_once);
+                    }
                 }
             }
-        }
-
-        scheduler.event_loop.callback(run_scheduler_once);
-        scheduler.event_loop.run();
-
-        return local::take();
-    }
 
-    /// Get a mutable pointer to the thread-local I/O
-    /// # Safety Note
-    /// This allows other mutable aliases to the scheduler, both in the current
-    /// execution context and other execution contexts.
-    unsafe fn borrow_local_io() -> &mut IoFactoryObject {
-        unsafe {
-            let io = local::borrow().event_loop.io().unwrap();
-            transmute::<&mut IoFactoryObject, &mut IoFactoryObject>(io)
+            scheduler.event_loop.callback(run_scheduler_once);
+            scheduler.event_loop.run();
         }
-    }
-
-    /// Borrow the thread-local scheduler from thread-local storage.
-    /// While the scheduler is borrowed it is not available in TLS.
-    fn borrow_local(f: &fn(&mut Scheduler)) {
-        let mut sched = local::take();
-        f(sched);
-        local::put(sched);
-    }
-
-    /// Take ownership of the scheduler from thread local storage
-    fn take_local() -> ~Scheduler {
-        local::take()
-    }
 
-    /// Just check whether there is a local scheduler
-    fn have_local() -> bool {
-        local::exists()
+        return local_sched::take();
     }
 
     // * Scheduler-context operations
@@ -151,7 +125,7 @@ pub impl Scheduler {
             }
             None => {
                 rtdebug!("no tasks in queue");
-                local::put(self);
+                local_sched::put(self);
                 return false;
             }
         }
@@ -167,22 +141,24 @@ pub impl Scheduler {
         self.current_task = Some(task);
         self.enqueue_cleanup_job(DoNothing);
 
-        local::put(self);
+        local_sched::put(self);
 
         // Take pointers to both the task and scheduler's saved registers.
-        let sched = unsafe { local::borrow() };
-        let (sched_context, _, next_task_context) = sched.get_contexts();
-        let next_task_context = next_task_context.unwrap();
-        // Context switch to the task, restoring it's registers
-        // and saving the scheduler's
-        Context::swap(sched_context, next_task_context);
-
-        let sched = unsafe { local::borrow() };
-        // The running task should have passed ownership elsewhere
-        assert!(sched.current_task.is_none());
-
-        // Running tasks may have asked us to do some cleanup
-        sched.run_cleanup_job();
+        unsafe {
+            let sched = local_sched::unsafe_borrow();
+            let (sched_context, _, next_task_context) = sched.get_contexts();
+            let next_task_context = next_task_context.unwrap();
+            // Context switch to the task, restoring it's registers
+            // and saving the scheduler's
+            Context::swap(sched_context, next_task_context);
+
+            let sched = local_sched::unsafe_borrow();
+            // The running task should have passed ownership elsewhere
+            assert!(sched.current_task.is_none());
+
+            // Running tasks may have asked us to do some cleanup
+            sched.run_cleanup_job();
+        }
     }
 
 
@@ -199,9 +175,9 @@ pub impl Scheduler {
         let dead_task = self.current_task.swap_unwrap();
         self.enqueue_cleanup_job(RecycleTask(dead_task));
 
-        local::put(self);
+        local_sched::put(self);
 
-        let sched = unsafe { local::borrow() };
+        let sched = unsafe { local_sched::unsafe_borrow() };
         let (sched_context, last_task_context, _) = sched.get_contexts();
         let last_task_context = last_task_context.unwrap();
         Context::swap(last_task_context, sched_context);
@@ -228,15 +204,15 @@ pub impl Scheduler {
         let f_opaque = ClosureConverter::from_fn(f_fake_region);
         self.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
 
-        local::put(self);
+        local_sched::put(self);
 
-        let sched = unsafe { local::borrow() };
+        let sched = unsafe { local_sched::unsafe_borrow() };
         let (sched_context, last_task_context, _) = sched.get_contexts();
         let last_task_context = last_task_context.unwrap();
         Context::swap(last_task_context, sched_context);
 
         // We could be executing in a different thread now
-        let sched = unsafe { local::borrow() };
+        let sched = unsafe { local_sched::unsafe_borrow() };
         sched.run_cleanup_job();
     }
 
@@ -253,17 +229,19 @@ pub impl Scheduler {
         self.enqueue_cleanup_job(RescheduleTask(old_running_task));
         self.current_task = Some(next_task);
 
-        local::put(self);
+        local_sched::put(self);
 
-        let sched = unsafe { local::borrow() };
-        let (_, last_task_context, next_task_context) = sched.get_contexts();
-        let last_task_context = last_task_context.unwrap();
-        let next_task_context = next_task_context.unwrap();
-        Context::swap(last_task_context, next_task_context);
-
-        // We could be executing in a different thread now
-        let sched = unsafe { local::borrow() };
-        sched.run_cleanup_job();
+        unsafe {
+            let sched = local_sched::unsafe_borrow();
+            let (_, last_task_context, next_task_context) = sched.get_contexts();
+            let last_task_context = last_task_context.unwrap();
+            let next_task_context = next_task_context.unwrap();
+            Context::swap(last_task_context, next_task_context);
+
+            // We could be executing in a different thread now
+            let sched = local_sched::unsafe_borrow();
+            sched.run_cleanup_job();
+        }
     }
 
     // * Other stuff
@@ -363,12 +341,14 @@ pub impl Task {
             // This is the first code to execute after the initial
             // context switch to the task. The previous context may
             // have asked us to do some cleanup.
-            let sched = unsafe { local::borrow() };
-            sched.run_cleanup_job();
+            unsafe {
+                let sched = local_sched::unsafe_borrow();
+                sched.run_cleanup_job();
+            }
 
             start();
 
-            let sched = Scheduler::take_local();
+            let sched = local_sched::take();
             sched.terminate_current_task();
         };
         return wrapper;
@@ -428,7 +408,7 @@ fn test_swap_tasks() {
         let mut sched = ~UvEventLoop::new_scheduler();
         let task1 = ~do Task::new(&mut sched.stack_pool) {
             unsafe { *count_ptr = *count_ptr + 1; }
-            let mut sched = Scheduler::take_local();
+            let mut sched = local_sched::take();
             let task2 = ~do Task::new(&mut sched.stack_pool) {
                 unsafe { *count_ptr = *count_ptr + 1; }
             };
@@ -460,7 +440,7 @@ fn test_run_a_lot_of_tasks_queued() {
         assert!(count == MAX);
 
         fn run_task(count_ptr: *mut int) {
-            do Scheduler::borrow_local |sched| {
+            do local_sched::borrow |sched| {
                 let task = ~do Task::new(&mut sched.stack_pool) {
                     unsafe {
                         *count_ptr = *count_ptr + 1;
@@ -493,7 +473,7 @@ fn test_run_a_lot_of_tasks_direct() {
         assert!(count == MAX);
 
         fn run_task(count_ptr: *mut int) {
-            let mut sched = Scheduler::take_local();
+            let mut sched = local_sched::take();
             let task = ~do Task::new(&mut sched.stack_pool) {
                 unsafe {
                     *count_ptr = *count_ptr + 1;
@@ -513,11 +493,11 @@ fn test_block_task() {
     do run_in_bare_thread {
         let mut sched = ~UvEventLoop::new_scheduler();
         let task = ~do Task::new(&mut sched.stack_pool) {
-            let sched = Scheduler::take_local();
+            let sched = local_sched::take();
             assert!(sched.in_task_context());
             do sched.deschedule_running_task_and_then() |task| {
                 let task = Cell(task);
-                do Scheduler::borrow_local |sched| {
+                do local_sched::borrow |sched| {
                     assert!(!sched.in_task_context());
                     sched.task_queue.push_back(task.take());
                 }
diff --git a/src/libcore/rt/uvio.rs b/src/libcore/rt/uvio.rs
index b069c67a5f7..ff539739835 100644
--- a/src/libcore/rt/uvio.rs
+++ b/src/libcore/rt/uvio.rs
@@ -17,7 +17,7 @@ use super::rtio::*;
 use ops::Drop;
 use cell::{Cell, empty_cell};
 use cast::transmute;
-use super::sched::Scheduler;
+use super::sched::{Scheduler, local_sched};
 
 #[cfg(test)] use super::sched::Task;
 #[cfg(test)] use unstable::run_in_bare_thread;
@@ -121,14 +121,14 @@ impl IoFactory for UvIoFactory {
         let result_cell = empty_cell();
         let result_cell_ptr: *Cell<Option<~StreamObject>> = &result_cell;
 
-        let scheduler = Scheduler::take_local();
+        let scheduler = local_sched::take();
         assert!(scheduler.in_task_context());
 
         // Block this task and take ownership, switch to scheduler context
         do scheduler.deschedule_running_task_and_then |task| {
 
             rtdebug!("connect: entered scheduler context");
-            do Scheduler::borrow_local |scheduler| {
+            do local_sched::borrow |scheduler| {
                 assert!(!scheduler.in_task_context());
             }
             let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
@@ -150,7 +150,7 @@ impl IoFactory for UvIoFactory {
                 unsafe { (*result_cell_ptr).put_back(maybe_stream); }
 
                 // Context switch
-                let scheduler = Scheduler::take_local();
+                let scheduler = local_sched::take();
                 scheduler.resume_task_immediately(task_cell.take());
             }
         }
@@ -195,7 +195,7 @@ impl TcpListener for UvTcpListener {
 
         let server_tcp_watcher = self.watcher();
 
-        let scheduler = Scheduler::take_local();
+        let scheduler = local_sched::take();
         assert!(scheduler.in_task_context());
 
         do scheduler.deschedule_running_task_and_then |task| {
@@ -218,7 +218,7 @@ impl TcpListener for UvTcpListener {
 
                 rtdebug!("resuming task from listen");
                 // Context switch
-                let scheduler = Scheduler::take_local();
+                let scheduler = local_sched::take();
                 scheduler.resume_task_immediately(task_cell.take());
             }
         }
@@ -258,13 +258,13 @@ impl Stream for UvStream {
         let result_cell = empty_cell();
         let result_cell_ptr: *Cell<Result<uint, ()>> = &result_cell;
 
-        let scheduler = Scheduler::take_local();
+        let scheduler = local_sched::take();
         assert!(scheduler.in_task_context());
         let watcher = self.watcher();
         let buf_ptr: *&mut [u8] = &buf;
         do scheduler.deschedule_running_task_and_then |task| {
             rtdebug!("read: entered scheduler context");
-            do Scheduler::borrow_local |scheduler| {
+            do local_sched::borrow |scheduler| {
                 assert!(!scheduler.in_task_context());
             }
             let mut watcher = watcher;
@@ -292,7 +292,7 @@ impl Stream for UvStream {
 
                 unsafe { (*result_cell_ptr).put_back(result); }
 
-                let scheduler = Scheduler::take_local();
+                let scheduler = local_sched::take();
                 scheduler.resume_task_immediately(task_cell.take());
             }
         }
@@ -304,7 +304,7 @@ impl Stream for UvStream {
     fn write(&mut self, buf: &[u8]) -> Result<(), ()> {
         let result_cell = empty_cell();
         let result_cell_ptr: *Cell<Result<(), ()>> = &result_cell;
-        let scheduler = Scheduler::take_local();
+        let scheduler = local_sched::take();
         assert!(scheduler.in_task_context());
         let watcher = self.watcher();
         let buf_ptr: *&[u8] = &buf;
@@ -323,7 +323,7 @@ impl Stream for UvStream {
 
                 unsafe { (*result_cell_ptr).put_back(result); }
 
-                let scheduler = Scheduler::take_local();
+                let scheduler = local_sched::take();
                 scheduler.resume_task_immediately(task_cell.take());
             }
         }
@@ -339,7 +339,7 @@ fn test_simple_io_no_connect() {
     do run_in_bare_thread {
         let mut sched = ~UvEventLoop::new_scheduler();
         let task = ~do Task::new(&mut sched.stack_pool) {
-            let io = unsafe { Scheduler::borrow_local_io() };
+            let io = unsafe { local_sched::unsafe_borrow_io() };
             let addr = Ipv4(127, 0, 0, 1, 2926);
             let maybe_chan = io.connect(addr);
             assert!(maybe_chan.is_none());
@@ -357,25 +357,29 @@ fn test_simple_tcp_server_and_client() {
         let addr = Ipv4(127, 0, 0, 1, 2929);
 
         let client_task = ~do Task::new(&mut sched.stack_pool) {
-            let io = unsafe { Scheduler::borrow_local_io() };
-            let mut stream = io.connect(addr).unwrap();
-            stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-            stream.close();
+            unsafe {
+                let io = local_sched::unsafe_borrow_io();
+                let mut stream = io.connect(addr).unwrap();
+                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+                stream.close();
+            }
         };
 
         let server_task = ~do Task::new(&mut sched.stack_pool) {
-            let io = unsafe { Scheduler::borrow_local_io() };
-            let mut listener = io.bind(addr).unwrap();
-            let mut stream = listener.listen().unwrap();
-            let mut buf = [0, .. 2048];
-            let nread = stream.read(buf).unwrap();
-            assert!(nread == 8);
-            for uint::range(0, nread) |i| {
-                rtdebug!("%u", buf[i] as uint);
-                assert!(buf[i] == i as u8);
+            unsafe {
+                let io = local_sched::unsafe_borrow_io();
+                let mut listener = io.bind(addr).unwrap();
+                let mut stream = listener.listen().unwrap();
+                let mut buf = [0, .. 2048];
+                let nread = stream.read(buf).unwrap();
+                assert!(nread == 8);
+                for uint::range(0, nread) |i| {
+                    rtdebug!("%u", buf[i] as uint);
+                    assert!(buf[i] == i as u8);
+                }
+                stream.close();
+                listener.close();
             }
-            stream.close();
-            listener.close();
         };
 
         // Start the server first so it listens before the client connects
@@ -392,7 +396,7 @@ fn test_read_and_block() {
         let addr = Ipv4(127, 0, 0, 1, 2930);
 
         let client_task = ~do Task::new(&mut sched.stack_pool) {
-            let io = unsafe { Scheduler::borrow_local_io() };
+            let io = unsafe { local_sched::unsafe_borrow_io() };
             let mut stream = io.connect(addr).unwrap();
             stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
             stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
@@ -402,7 +406,7 @@ fn test_read_and_block() {
         };
 
         let server_task = ~do Task::new(&mut sched.stack_pool) {
-            let io = unsafe { Scheduler::borrow_local_io() };
+            let io = unsafe { local_sched::unsafe_borrow_io() };
             let mut listener = io.bind(addr).unwrap();
             let mut stream = listener.listen().unwrap();
             let mut buf = [0, .. 2048];
@@ -420,13 +424,13 @@ fn test_read_and_block() {
                 }
                 reads += 1;
 
-                let scheduler = Scheduler::take_local();
+                let scheduler = local_sched::take();
                 // Yield to the other task in hopes that it
                 // will trigger a read callback while we are
                 // not ready for it
                 do scheduler.deschedule_running_task_and_then |task| {
                     let task = Cell(task);
-                    do Scheduler::borrow_local |scheduler| {
+                    do local_sched::borrow |scheduler| {
                         scheduler.task_queue.push_back(task.take());
                     }
                 }
@@ -453,7 +457,7 @@ fn test_read_read_read() {
         let addr = Ipv4(127, 0, 0, 1, 2931);
 
         let client_task = ~do Task::new(&mut sched.stack_pool) {
-            let io = unsafe { Scheduler::borrow_local_io() };
+            let io = unsafe { local_sched::unsafe_borrow_io() };
             let mut stream = io.connect(addr).unwrap();
             let mut buf = [0, .. 2048];
             let mut total_bytes_read = 0;
diff --git a/src/libcore/task/spawn.rs b/src/libcore/task/spawn.rs
index 47e38602995..5b45f498319 100644
--- a/src/libcore/task/spawn.rs
+++ b/src/libcore/task/spawn.rs
@@ -553,7 +553,7 @@ fn spawn_raw_newsched(opts: TaskOpts, f: ~fn()) {
     use rt::sched::*;
 
     // XXX: How to schedule a new task is a policy decision that shouldn't be made here
-    let mut sched = Scheduler::take_local();
+    let mut sched = local_sched::take();
     let task = ~Task::new(&mut sched.stack_pool, f);
     sched.resume_task_from_running_task_direct(task);
 }