diff options
| author | toddaaro <github@opprobrio.us> | 2013-08-16 13:41:30 -0700 |
|---|---|---|
| committer | toddaaro <github@opprobrio.us> | 2013-08-16 16:37:09 -0700 |
| commit | f83835b0e7680188b46db5869f49672fef33ff40 (patch) | |
| tree | 1406a0624c0ad95dd6e59ebe7af667bc26413551 /src/libstd/rt | |
| parent | 88d8baa76b0c03f6aceeb1f92120af664047ac02 (diff) | |
| download | rust-f83835b0e7680188b46db5869f49672fef33ff40.tar.gz rust-f83835b0e7680188b46db5869f49672fef33ff40.zip | |
Moved the logic for a pausible idle callback into a new type - PausibleIdleCallback and placed the appropriate signatures in rtio and implementation into uvio.
Diffstat (limited to 'src/libstd/rt')
| -rw-r--r-- | src/libstd/rt/rtio.rs | 13 | ||||
| -rw-r--r-- | src/libstd/rt/sched.rs | 96 | ||||
| -rw-r--r-- | src/libstd/rt/uv/uvio.rs | 90 |
3 files changed, 103 insertions, 96 deletions
diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index a7c794fb5f1..e29c30ba033 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -24,10 +24,12 @@ pub type RtioTcpStreamObject = uvio::UvTcpStream; pub type RtioTcpListenerObject = uvio::UvTcpListener; pub type RtioUdpSocketObject = uvio::UvUdpSocket; pub type RtioTimerObject = uvio::UvTimer; +pub type PausibleIdleCallback = uvio::UvPausibleIdleCallback; pub trait EventLoop { fn run(&mut self); fn callback(&mut self, ~fn()); + fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback; fn callback_ms(&mut self, ms: u64, ~fn()); fn remote_callback(&mut self, ~fn()) -> ~RemoteCallbackObject; /// The asynchronous I/O services. Not all event loops may provide one @@ -35,11 +37,12 @@ pub trait EventLoop { } pub trait RemoteCallback { - /// Trigger the remote callback. Note that the number of times the callback - /// is run is not guaranteed. All that is guaranteed is that, after calling 'fire', - /// the callback will be called at least once, but multiple callbacks may be coalesced - /// and callbacks may be called more often requested. Destruction also triggers the - /// callback. + /// Trigger the remote callback. Note that the number of times the + /// callback is run is not guaranteed. All that is guaranteed is + /// that, after calling 'fire', the callback will be called at + /// least once, but multiple callbacks may be coalesced and + /// callbacks may be called more often requested. Destruction also + /// triggers the callback. fn fire(&mut self); } diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index b07591183b7..77e3a913e2f 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -23,14 +23,13 @@ use super::message_queue::MessageQueue; use rt::kill::BlockedTask; use rt::local_ptr; use rt::local::Local; -use rt::rtio::RemoteCallback; +use rt::rtio::{RemoteCallback, PausibleIdleCallback}; use rt::metrics::SchedMetrics; use borrow::{to_uint}; use cell::Cell; use rand::{XorShiftRng, RngUtil}; use iterator::{range}; use vec::{OwnedVector}; -use rt::uv::idle::IdleWatcher; /// The Scheduler is responsible for coordinating execution of Coroutines /// on a single thread. When the scheduler is running it is owned by @@ -78,10 +77,13 @@ pub struct Scheduler { friend_handle: Option<SchedHandle>, /// A fast XorShift rng for scheduler use rng: XorShiftRng, - /// An IdleWatcher - idle_watcher: Option<IdleWatcher>, - /// A flag to indicate whether or not the idle callback is active. - idle_flag: bool + /// A toggleable idle callback + idle_callback: ~PausibleIdleCallback +} + +enum CleanupJob { + DoNothing, + GiveTask(~Task, UnsafeTaskReceiver) } pub struct SchedHandle { @@ -97,11 +99,6 @@ pub enum SchedMessage { TaskFromFriend(~Task) } -enum CleanupJob { - DoNothing, - GiveTask(~Task, UnsafeTaskReceiver) -} - impl Scheduler { pub fn sched_id(&self) -> uint { to_uint(self) } @@ -126,7 +123,10 @@ impl Scheduler { sleeper_list: SleeperList, run_anything: bool, friend: Option<SchedHandle>) - -> Scheduler { + -> Scheduler { + + let mut event_loop = event_loop; + let idle_callback = event_loop.pausible_idle_callback(); Scheduler { sleeper_list: sleeper_list, @@ -142,9 +142,8 @@ impl Scheduler { metrics: SchedMetrics::new(), run_anything: run_anything, friend_handle: friend, - rng: XorShiftRng::new(), - idle_watcher: None, - idle_flag: false + rng: XorShiftRng::new(), + idle_callback: idle_callback } } @@ -172,7 +171,7 @@ impl Scheduler { // Before starting our first task, make sure the idle callback // is active. As we do not start in the sleep state this is // important. - this.activate_idle(); + this.idle_callback.start(Scheduler::run_sched_once); // Now, as far as all the scheduler state is concerned, we are // inside the "scheduler" context. So we can act like the @@ -194,7 +193,7 @@ impl Scheduler { // cleaning up the memory it uses. As we didn't actually call // task.run() on the scheduler task we never get through all // the cleanup code it runs. - let mut stask = Local::take::<Task>(); + let mut stask = Local::take::<Task>(); rtdebug!("stopping scheduler %u", stask.sched.get_ref().sched_id()); @@ -202,6 +201,9 @@ impl Scheduler { let message = stask.sched.get_mut_ref().message_queue.pop(); assert!(message.is_none()); + // Close the idle callback. + stask.sched.get_mut_ref().idle_callback.close(); + stask.destroyed = true; } @@ -211,11 +213,6 @@ impl Scheduler { let mut self_sched = self; - // Always run through the scheduler loop at least once so that - // we enter the sleep state and can then be woken up by other - // schedulers. -// self_sched.event_loop.callback(Scheduler::run_sched_once); - // This is unsafe because we need to place the scheduler, with // the event_loop inside, inside our task. But we still need a // mutable reference to the event_loop to give it the "run" @@ -252,7 +249,7 @@ impl Scheduler { // Assume that we need to continue idling unless we reach the // end of this function without performing an action. - sched.activate_idle(); + sched.idle_callback.resume(); // Our first task is to read mail to see if we have important // messages. @@ -300,12 +297,12 @@ impl Scheduler { let handle = sched.make_handle(); sched.sleeper_list.push(handle); // Since we are sleeping, deactivate the idle callback. - sched.pause_idle(); + sched.idle_callback.pause(); } else { rtdebug!("not sleeping, already doing so or no_sleep set"); // We may not be sleeping, but we still need to deactivate // the idle callback. - sched.pause_idle(); + sched.idle_callback.pause(); } // Finished a cycle without using the Scheduler. Place it back @@ -313,46 +310,6 @@ impl Scheduler { Local::put(sched); } - fn activate_idle(&mut self) { - rtdebug!("activating idle"); - if self.idle_flag { - rtassert!(self.idle_watcher.is_some()); - rtdebug!("idle flag already set, not reactivating idle watcher"); - } else { - rtdebug!("idle flag was false, reactivating idle watcher"); - self.idle_flag = true; - if self.idle_watcher.is_none() { - // There's no idle handle yet. Create one - let mut idle_watcher = IdleWatcher::new(self.event_loop.uvio.uv_loop()); - do idle_watcher.start |_idle_watcher, _status| { - Scheduler::run_sched_once(); - } - self.idle_watcher = Some(idle_watcher); - } else { - self.idle_watcher.get_mut_ref().restart(); - } - } - } - - fn pause_idle(&mut self) { - rtassert!(self.idle_watcher.is_some()); - rtassert!(self.idle_flag); - - rtdebug!("stopping idle watcher"); - - self.idle_flag = false; - if !self.no_sleep { - self.idle_watcher.get_mut_ref().stop(); - } else { - rtdebug!("closing idle watcher"); - // The scheduler is trying to exit. Destroy the idle watcher - // to drop the reference to the event loop. - let mut idle_watcher = self.idle_watcher.take_unwrap(); - idle_watcher.stop(); - idle_watcher.close(||()); - } - } - pub fn make_handle(&mut self) -> SchedHandle { let remote = self.event_loop.remote_callback(Scheduler::run_sched_once); @@ -376,10 +333,9 @@ impl Scheduler { // We push the task onto our local queue clone. this.work_queue.push(task); -// this.event_loop.callback(Scheduler::run_sched_once); // There is definitely work to be done later. Make sure we wake up for it. - this.activate_idle(); + this.idle_callback.resume(); // We've made work available. Notify a // sleeping scheduler. @@ -420,28 +376,23 @@ impl Scheduler { let mut this = self; match this.message_queue.pop() { Some(PinnedTask(task)) => { -// this.event_loop.callback(Scheduler::run_sched_once); let mut task = task; task.give_home(Sched(this.make_handle())); this.resume_task_immediately(task); return None; } Some(TaskFromFriend(task)) => { -// this.event_loop.callback(Scheduler::run_sched_once); rtdebug!("got a task from a friend. lovely!"); this.sched_schedule_task(task).map_move(Local::put); return None; } Some(Wake) => { -// this.event_loop.callback(Scheduler::run_sched_once); this.sleepy = false; Local::put(this); return None; -// return Some(this); } Some(Shutdown) => { rtdebug!("shutting down"); -// this.event_loop.callback(Scheduler::run_sched_once); if this.sleepy { // There may be an outstanding handle on the // sleeper list. Pop them all to make sure that's @@ -463,7 +414,6 @@ impl Scheduler { Local::put(this); return None; -// return Some(this); } None => { return Some(this); diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index 1ad295641a7..12a6f6f37f7 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -116,6 +116,15 @@ impl EventLoop for UvEventLoop { } } + fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback { + let idle_watcher = IdleWatcher::new(self.uvio.uv_loop()); + return ~UvPausibleIdleCallback { + watcher: idle_watcher, + idle_flag: false, + closed: false + }; + } + fn callback_ms(&mut self, ms: u64, f: ~fn()) { let mut timer = TimerWatcher::new(self.uvio.uv_loop()); do timer.start(ms, 0) |timer, status| { @@ -134,6 +143,44 @@ impl EventLoop for UvEventLoop { } } +pub struct UvPausibleIdleCallback { + watcher: IdleWatcher, + idle_flag: bool, + closed: bool +} + +impl UvPausibleIdleCallback { + #[inline] + pub fn start(&mut self, f: ~fn()) { + do self.watcher.start |_idle_watcher, _status| { + f(); + }; + self.idle_flag = true; + } + #[inline] + pub fn pause(&mut self) { + if self.idle_flag == true { + self.watcher.stop(); + self.idle_flag = false; + } + } + #[inline] + pub fn resume(&mut self) { + if self.idle_flag == false { + self.watcher.restart(); + self.idle_flag = true; + } + } + #[inline] + pub fn close(&mut self) { + self.pause(); + if !self.closed { + self.closed = true; + self.watcher.close(||()); + } + } +} + #[test] fn test_callback_run_once() { do run_in_bare_thread { @@ -163,24 +210,31 @@ impl UvRemoteCallback { let async = do AsyncWatcher::new(loop_) |watcher, status| { assert!(status.is_none()); - // The synchronization logic here is subtle. To review, the uv async handle - // type promises that, after it is triggered the remote callback is definitely - // called at least once. UvRemoteCallback needs to maintain those semantics - // while also shutting down cleanly from the dtor. In our case that means that, - // when the UvRemoteCallback dtor calls `async.send()`, here `f` is always called - // later. - - // In the dtor both the exit flag is set and the async callback fired under a lock. - // Here, before calling `f`, we take the lock and check the flag. Because we are - // checking the flag before calling `f`, and the flag is set under the same lock - // as the send, then if the flag is set then we're guaranteed to call `f` after - // the final send. - - // If the check was done after `f()` then there would be a period between that call - // and the check where the dtor could be called in the other thread, missing the - // final callback while still destroying the handle. - - let should_exit = unsafe { exit_flag_clone.with_imm(|&should_exit| should_exit) }; + // The synchronization logic here is subtle. To review, + // the uv async handle type promises that, after it is + // triggered the remote callback is definitely called at + // least once. UvRemoteCallback needs to maintain those + // semantics while also shutting down cleanly from the + // dtor. In our case that means that, when the + // UvRemoteCallback dtor calls `async.send()`, here `f` is + // always called later. + + // In the dtor both the exit flag is set and the async + // callback fired under a lock. Here, before calling `f`, + // we take the lock and check the flag. Because we are + // checking the flag before calling `f`, and the flag is + // set under the same lock as the send, then if the flag + // is set then we're guaranteed to call `f` after the + // final send. + + // If the check was done after `f()` then there would be a + // period between that call and the check where the dtor + // could be called in the other thread, missing the final + // callback while still destroying the handle. + + let should_exit = unsafe { + exit_flag_clone.with_imm(|&should_exit| should_exit) + }; f(); |
