about summary refs log tree commit diff
path: root/src/libstd
diff options
context:
space:
mode:
authortoddaaro <github@opprobrio.us>2013-08-16 13:41:30 -0700
committertoddaaro <github@opprobrio.us>2013-08-16 16:37:09 -0700
commitf83835b0e7680188b46db5869f49672fef33ff40 (patch)
tree1406a0624c0ad95dd6e59ebe7af667bc26413551 /src/libstd
parent88d8baa76b0c03f6aceeb1f92120af664047ac02 (diff)
downloadrust-f83835b0e7680188b46db5869f49672fef33ff40.tar.gz
rust-f83835b0e7680188b46db5869f49672fef33ff40.zip
Moved the logic for a pausible idle callback into a new type - PausibleIdleCallback and placed the appropriate signatures in rtio and implementation into uvio.
Diffstat (limited to 'src/libstd')
-rw-r--r--src/libstd/rt/rtio.rs13
-rw-r--r--src/libstd/rt/sched.rs96
-rw-r--r--src/libstd/rt/uv/uvio.rs90
3 files changed, 103 insertions, 96 deletions
diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs
index a7c794fb5f1..e29c30ba033 100644
--- a/src/libstd/rt/rtio.rs
+++ b/src/libstd/rt/rtio.rs
@@ -24,10 +24,12 @@ pub type RtioTcpStreamObject = uvio::UvTcpStream;
 pub type RtioTcpListenerObject = uvio::UvTcpListener;
 pub type RtioUdpSocketObject = uvio::UvUdpSocket;
 pub type RtioTimerObject = uvio::UvTimer;
+pub type PausibleIdleCallback = uvio::UvPausibleIdleCallback;
 
 pub trait EventLoop {
     fn run(&mut self);
     fn callback(&mut self, ~fn());
+    fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback;
     fn callback_ms(&mut self, ms: u64, ~fn());
     fn remote_callback(&mut self, ~fn()) -> ~RemoteCallbackObject;
     /// The asynchronous I/O services. Not all event loops may provide one
@@ -35,11 +37,12 @@ pub trait EventLoop {
 }
 
 pub trait RemoteCallback {
-    /// Trigger the remote callback. Note that the number of times the callback
-    /// is run is not guaranteed. All that is guaranteed is that, after calling 'fire',
-    /// the callback will be called at least once, but multiple callbacks may be coalesced
-    /// and callbacks may be called more often requested. Destruction also triggers the
-    /// callback.
+    /// Trigger the remote callback. Note that the number of times the
+    /// callback is run is not guaranteed. All that is guaranteed is
+    /// that, after calling 'fire', the callback will be called at
+    /// least once, but multiple callbacks may be coalesced and
+    /// callbacks may be called more often requested. Destruction also
+    /// triggers the callback.
     fn fire(&mut self);
 }
 
diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs
index b07591183b7..77e3a913e2f 100644
--- a/src/libstd/rt/sched.rs
+++ b/src/libstd/rt/sched.rs
@@ -23,14 +23,13 @@ use super::message_queue::MessageQueue;
 use rt::kill::BlockedTask;
 use rt::local_ptr;
 use rt::local::Local;
-use rt::rtio::RemoteCallback;
+use rt::rtio::{RemoteCallback, PausibleIdleCallback};
 use rt::metrics::SchedMetrics;
 use borrow::{to_uint};
 use cell::Cell;
 use rand::{XorShiftRng, RngUtil};
 use iterator::{range};
 use vec::{OwnedVector};
-use rt::uv::idle::IdleWatcher;
 
 /// The Scheduler is responsible for coordinating execution of Coroutines
 /// on a single thread. When the scheduler is running it is owned by
@@ -78,10 +77,13 @@ pub struct Scheduler {
     friend_handle: Option<SchedHandle>,
     /// A fast XorShift rng for scheduler use
     rng: XorShiftRng,
-    /// An IdleWatcher
-    idle_watcher: Option<IdleWatcher>,
-    /// A flag to indicate whether or not the idle callback is active.
-    idle_flag: bool
+    /// A toggleable idle callback
+    idle_callback: ~PausibleIdleCallback
+}
+
+enum CleanupJob {
+    DoNothing,
+    GiveTask(~Task, UnsafeTaskReceiver)
 }
 
 pub struct SchedHandle {
@@ -97,11 +99,6 @@ pub enum SchedMessage {
     TaskFromFriend(~Task)
 }
 
-enum CleanupJob {
-    DoNothing,
-    GiveTask(~Task, UnsafeTaskReceiver)
-}
-
 impl Scheduler {
 
     pub fn sched_id(&self) -> uint { to_uint(self) }
@@ -126,7 +123,10 @@ impl Scheduler {
                        sleeper_list: SleeperList,
                        run_anything: bool,
                        friend: Option<SchedHandle>)
-        -> Scheduler {
+        -> Scheduler {        
+
+        let mut event_loop = event_loop;
+        let idle_callback = event_loop.pausible_idle_callback();
 
         Scheduler {
             sleeper_list: sleeper_list,
@@ -142,9 +142,8 @@ impl Scheduler {
             metrics: SchedMetrics::new(),
             run_anything: run_anything,
             friend_handle: friend,
-            rng: XorShiftRng::new(),
-            idle_watcher: None,
-            idle_flag: false
+            rng: XorShiftRng::new(),            
+            idle_callback: idle_callback
         }
     }
 
@@ -172,7 +171,7 @@ impl Scheduler {
         // Before starting our first task, make sure the idle callback
         // is active. As we do not start in the sleep state this is
         // important.
-        this.activate_idle();
+        this.idle_callback.start(Scheduler::run_sched_once);
 
         // Now, as far as all the scheduler state is concerned, we are
         // inside the "scheduler" context. So we can act like the
@@ -194,7 +193,7 @@ impl Scheduler {
         // cleaning up the memory it uses. As we didn't actually call
         // task.run() on the scheduler task we never get through all
         // the cleanup code it runs.
-        let mut stask = Local::take::<Task>();
+        let mut stask = Local::take::<Task>();        
 
         rtdebug!("stopping scheduler %u", stask.sched.get_ref().sched_id());
 
@@ -202,6 +201,9 @@ impl Scheduler {
         let message = stask.sched.get_mut_ref().message_queue.pop();
         assert!(message.is_none());
 
+        // Close the idle callback.
+        stask.sched.get_mut_ref().idle_callback.close();
+
         stask.destroyed = true;
     }
 
@@ -211,11 +213,6 @@ impl Scheduler {
 
         let mut self_sched = self;
 
-        // Always run through the scheduler loop at least once so that
-        // we enter the sleep state and can then be woken up by other
-        // schedulers.
-//        self_sched.event_loop.callback(Scheduler::run_sched_once);
-
         // This is unsafe because we need to place the scheduler, with
         // the event_loop inside, inside our task. But we still need a
         // mutable reference to the event_loop to give it the "run"
@@ -252,7 +249,7 @@ impl Scheduler {
 
         // Assume that we need to continue idling unless we reach the
         // end of this function without performing an action.
-        sched.activate_idle();
+        sched.idle_callback.resume();
 
         // Our first task is to read mail to see if we have important
         // messages.
@@ -300,12 +297,12 @@ impl Scheduler {
             let handle = sched.make_handle();
             sched.sleeper_list.push(handle);
             // Since we are sleeping, deactivate the idle callback.
-            sched.pause_idle();
+            sched.idle_callback.pause();
         } else {
             rtdebug!("not sleeping, already doing so or no_sleep set");
             // We may not be sleeping, but we still need to deactivate
             // the idle callback.
-            sched.pause_idle();
+            sched.idle_callback.pause();
         }
 
         // Finished a cycle without using the Scheduler. Place it back
@@ -313,46 +310,6 @@ impl Scheduler {
         Local::put(sched);
     }
 
-    fn activate_idle(&mut self) {
-        rtdebug!("activating idle");
-        if self.idle_flag {
-            rtassert!(self.idle_watcher.is_some());
-            rtdebug!("idle flag already set, not reactivating idle watcher");
-        } else {
-            rtdebug!("idle flag was false, reactivating idle watcher");
-            self.idle_flag = true;
-            if self.idle_watcher.is_none() {
-                // There's no idle handle yet. Create one
-                let mut idle_watcher = IdleWatcher::new(self.event_loop.uvio.uv_loop());
-                do idle_watcher.start |_idle_watcher, _status| {
-                    Scheduler::run_sched_once();
-                }
-                self.idle_watcher = Some(idle_watcher);
-            } else {
-                self.idle_watcher.get_mut_ref().restart();
-            }
-        }            
-    }
-
-    fn pause_idle(&mut self) {
-        rtassert!(self.idle_watcher.is_some());
-        rtassert!(self.idle_flag);
-
-        rtdebug!("stopping idle watcher");
-
-        self.idle_flag = false;
-        if !self.no_sleep {
-            self.idle_watcher.get_mut_ref().stop();
-        } else {
-            rtdebug!("closing idle watcher");
-            // The scheduler is trying to exit. Destroy the idle watcher
-            // to drop the reference to the event loop.
-            let mut idle_watcher = self.idle_watcher.take_unwrap();
-            idle_watcher.stop();
-            idle_watcher.close(||());
-        }
-    }
-
     pub fn make_handle(&mut self) -> SchedHandle {
         let remote = self.event_loop.remote_callback(Scheduler::run_sched_once);
 
@@ -376,10 +333,9 @@ impl Scheduler {
 
         // We push the task onto our local queue clone.
         this.work_queue.push(task);
-//        this.event_loop.callback(Scheduler::run_sched_once);
 
         // There is definitely work to be done later. Make sure we wake up for it.
-        this.activate_idle();
+        this.idle_callback.resume();
 
         // We've made work available. Notify a
         // sleeping scheduler.
@@ -420,28 +376,23 @@ impl Scheduler {
         let mut this = self;
         match this.message_queue.pop() {
             Some(PinnedTask(task)) => {
-//                this.event_loop.callback(Scheduler::run_sched_once);
                 let mut task = task;
                 task.give_home(Sched(this.make_handle()));
                 this.resume_task_immediately(task);
                 return None;
             }
             Some(TaskFromFriend(task)) => {
-//                this.event_loop.callback(Scheduler::run_sched_once);
                 rtdebug!("got a task from a friend. lovely!");
                 this.sched_schedule_task(task).map_move(Local::put);
                 return None;
             }
             Some(Wake) => {
-//                this.event_loop.callback(Scheduler::run_sched_once);
                 this.sleepy = false;
                 Local::put(this);
                 return None;
-//                return Some(this);
             }
             Some(Shutdown) => {
                 rtdebug!("shutting down");
-//                this.event_loop.callback(Scheduler::run_sched_once);
                 if this.sleepy {
                     // There may be an outstanding handle on the
                     // sleeper list.  Pop them all to make sure that's
@@ -463,7 +414,6 @@ impl Scheduler {
 
                 Local::put(this);
                 return None;
-//                return Some(this);
             }
             None => {
                 return Some(this);
diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs
index 1ad295641a7..12a6f6f37f7 100644
--- a/src/libstd/rt/uv/uvio.rs
+++ b/src/libstd/rt/uv/uvio.rs
@@ -116,6 +116,15 @@ impl EventLoop for UvEventLoop {
         }
     }
 
+    fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback {
+        let idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
+        return ~UvPausibleIdleCallback {
+            watcher: idle_watcher,
+            idle_flag: false,
+            closed: false
+        };
+    }
+
     fn callback_ms(&mut self, ms: u64, f: ~fn()) {
         let mut timer =  TimerWatcher::new(self.uvio.uv_loop());
         do timer.start(ms, 0) |timer, status| {
@@ -134,6 +143,44 @@ impl EventLoop for UvEventLoop {
     }
 }
 
+pub struct UvPausibleIdleCallback {
+    watcher: IdleWatcher,
+    idle_flag: bool,
+    closed: bool
+}
+
+impl UvPausibleIdleCallback {
+    #[inline]
+    pub fn start(&mut self, f: ~fn()) {
+        do self.watcher.start |_idle_watcher, _status| {
+            f();
+        };
+        self.idle_flag = true;
+    }
+    #[inline]
+    pub fn pause(&mut self) {
+        if self.idle_flag == true {
+            self.watcher.stop();
+            self.idle_flag = false;
+        }
+    }
+    #[inline]
+    pub fn resume(&mut self) {
+        if self.idle_flag == false {
+            self.watcher.restart();
+            self.idle_flag = true;
+        }
+    }
+    #[inline]
+    pub fn close(&mut self) {
+        self.pause();
+        if !self.closed {
+            self.closed = true;
+            self.watcher.close(||());
+        }
+    }                
+}
+
 #[test]
 fn test_callback_run_once() {
     do run_in_bare_thread {
@@ -163,24 +210,31 @@ impl UvRemoteCallback {
         let async = do AsyncWatcher::new(loop_) |watcher, status| {
             assert!(status.is_none());
 
-            // The synchronization logic here is subtle. To review, the uv async handle
-            // type promises that, after it is triggered the remote callback is definitely
-            // called at least once. UvRemoteCallback needs to maintain those semantics
-            // while also shutting down cleanly from the dtor. In our case that means that,
-            // when the UvRemoteCallback dtor calls `async.send()`, here `f` is always called
-            // later.
-
-            // In the dtor both the exit flag is set and the async callback fired under a lock.
-            // Here, before calling `f`, we take the lock and check the flag. Because we are
-            // checking the flag before calling `f`, and the flag is set under the same lock
-            // as the send, then if the flag is set then we're guaranteed to call `f` after
-            // the final send.
-
-            // If the check was done after `f()` then there would be a period between that call
-            // and the check where the dtor could be called in the other thread, missing the
-            // final callback while still destroying the handle.
-
-            let should_exit = unsafe { exit_flag_clone.with_imm(|&should_exit| should_exit) };
+            // The synchronization logic here is subtle. To review,
+            // the uv async handle type promises that, after it is
+            // triggered the remote callback is definitely called at
+            // least once. UvRemoteCallback needs to maintain those
+            // semantics while also shutting down cleanly from the
+            // dtor. In our case that means that, when the
+            // UvRemoteCallback dtor calls `async.send()`, here `f` is
+            // always called later.
+
+            // In the dtor both the exit flag is set and the async
+            // callback fired under a lock.  Here, before calling `f`,
+            // we take the lock and check the flag. Because we are
+            // checking the flag before calling `f`, and the flag is
+            // set under the same lock as the send, then if the flag
+            // is set then we're guaranteed to call `f` after the
+            // final send.
+
+            // If the check was done after `f()` then there would be a
+            // period between that call and the check where the dtor
+            // could be called in the other thread, missing the final
+            // callback while still destroying the handle.
+
+            let should_exit = unsafe { 
+                exit_flag_clone.with_imm(|&should_exit| should_exit) 
+            };
 
             f();