diff options
| author | bors <bors@rust-lang.org> | 2013-10-25 00:46:11 -0700 |
|---|---|---|
| committer | bors <bors@rust-lang.org> | 2013-10-25 00:46:11 -0700 |
| commit | deeca5d586bfaa4aa60246f671a8d611d38f6248 (patch) | |
| tree | a0aaa22d41ef38f3ef93ab72e399860edd31ff25 /src/libstd/rt | |
| parent | ac82d185b0a9d04bb4e85578aad558da784a2be4 (diff) | |
| parent | 64a5c3bc1ee869990f8205374f9dac837a475dbd (diff) | |
| download | rust-deeca5d586bfaa4aa60246f671a8d611d38f6248.tar.gz rust-deeca5d586bfaa4aa60246f671a8d611d38f6248.zip | |
auto merge of #10054 : alexcrichton/rust/basic-event-loop, r=brson
This is more progress towards #9128 and all its related tree of issues. This implements a new `BasicLoop` on top of pthreads synchronization primitives (wrapped in `LittleLock`). This also removes the wonky `callback_ms` function from the interface of the event loop. After #9901 is taking forever to land, I'm going to try to do all this runtime work in much smaller chunks than before. Right now this will not work unless #9901 lands first, but I'm close to landing it (hopefully), and I wanted to go ahead and get this reviewed before throwing it at bors later on down the road. This "pausible idle callback" is also a bit of a weird idea, but it wasn't as difficult to implement as callback_ms so I'm more semi-ok with it.
Diffstat (limited to 'src/libstd/rt')
| -rw-r--r-- | src/libstd/rt/basic.rs | 256 | ||||
| -rw-r--r-- | src/libstd/rt/io/mod.rs | 7 | ||||
| -rw-r--r-- | src/libstd/rt/mod.rs | 3 | ||||
| -rw-r--r-- | src/libstd/rt/rtio.rs | 1 | ||||
| -rw-r--r-- | src/libstd/rt/sched.rs | 39 | ||||
| -rw-r--r-- | src/libstd/rt/task.rs | 4 | ||||
| -rw-r--r-- | src/libstd/rt/test.rs | 44 | ||||
| -rw-r--r-- | src/libstd/rt/uv/uvio.rs | 9 |
8 files changed, 329 insertions, 34 deletions
diff --git a/src/libstd/rt/basic.rs b/src/libstd/rt/basic.rs new file mode 100644 index 00000000000..86d3f8a52ba --- /dev/null +++ b/src/libstd/rt/basic.rs @@ -0,0 +1,256 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! This is a basic event loop implementation not meant for any "real purposes" +//! other than testing the scheduler and proving that it's possible to have a +//! pluggable event loop. + +use prelude::*; + +use cast; +use rt::rtio::{EventLoop, IoFactory, RemoteCallback, PausibleIdleCallback}; +use unstable::sync::Exclusive; +use util; + +/// This is the only exported function from this module. +pub fn event_loop() -> ~EventLoop { + ~BasicLoop::new() as ~EventLoop +} + +struct BasicLoop { + work: ~[~fn()], // pending work + idle: Option<*BasicPausible>, // only one is allowed + remotes: ~[(uint, ~fn())], + next_remote: uint, + messages: Exclusive<~[Message]> +} + +enum Message { RunRemote(uint), RemoveRemote(uint) } + +struct Time { + sec: u64, + nsec: u64, +} + +impl Ord for Time { + fn lt(&self, other: &Time) -> bool { + self.sec < other.sec || self.nsec < other.nsec + } +} + +impl BasicLoop { + fn new() -> BasicLoop { + BasicLoop { + work: ~[], + idle: None, + next_remote: 0, + remotes: ~[], + messages: Exclusive::new(~[]), + } + } + + /// Process everything in the work queue (continually) + fn work(&mut self) { + while self.work.len() > 0 { + for work in util::replace(&mut self.work, ~[]).move_iter() { + work(); + } + } + } + + fn remote_work(&mut self) { + let messages = unsafe { + do self.messages.with |messages| { + if messages.len() > 0 { + Some(util::replace(messages, ~[])) + } else { + None + } + } + }; + let messages = match messages { + Some(m) => m, None => return + }; + for message in messages.iter() { + self.message(*message); + } + } + + fn message(&mut self, message: Message) { + match message { + RunRemote(i) => { + match self.remotes.iter().find(|& &(id, _)| id == i) { + Some(&(_, ref f)) => (*f)(), + None => unreachable!() + } + } + RemoveRemote(i) => { + match self.remotes.iter().position(|&(id, _)| id == i) { + Some(i) => { self.remotes.remove(i); } + None => unreachable!() + } + } + } + } + + /// Run the idle callback if one is registered + fn idle(&mut self) { + unsafe { + match self.idle { + Some(idle) => { + if (*idle).active { + (*(*idle).work.get_ref())(); + } + } + None => {} + } + } + } + + fn has_idle(&self) -> bool { + unsafe { self.idle.is_some() && (**self.idle.get_ref()).active } + } +} + +impl EventLoop for BasicLoop { + fn run(&mut self) { + // Not exactly efficient, but it gets the job done. + while self.remotes.len() > 0 || self.work.len() > 0 || self.has_idle() { + + self.work(); + self.remote_work(); + + if self.has_idle() { + self.idle(); + continue + } + + unsafe { + // We block here if we have no messages to process and we may + // receive a message at a later date + do self.messages.hold_and_wait |messages| { + self.remotes.len() > 0 && + messages.len() == 0 && + self.work.len() == 0 + } + } + } + } + + fn callback(&mut self, f: ~fn()) { + self.work.push(f); + } + + // XXX: Seems like a really weird requirement to have an event loop provide. + fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback { + let callback = ~BasicPausible::new(self); + rtassert!(self.idle.is_none()); + unsafe { + let cb_ptr: &*BasicPausible = cast::transmute(&callback); + self.idle = Some(*cb_ptr); + } + return callback as ~PausibleIdleCallback; + } + + fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallback { + let id = self.next_remote; + self.next_remote += 1; + self.remotes.push((id, f)); + ~BasicRemote::new(self.messages.clone(), id) as ~RemoteCallback + } + + /// This has no bindings for local I/O + fn io<'a>(&'a mut self, _: &fn(&'a mut IoFactory)) {} +} + +struct BasicRemote { + queue: Exclusive<~[Message]>, + id: uint, +} + +impl BasicRemote { + fn new(queue: Exclusive<~[Message]>, id: uint) -> BasicRemote { + BasicRemote { queue: queue, id: id } + } +} + +impl RemoteCallback for BasicRemote { + fn fire(&mut self) { + unsafe { + do self.queue.hold_and_signal |queue| { + queue.push(RunRemote(self.id)); + } + } + } +} + +impl Drop for BasicRemote { + fn drop(&mut self) { + unsafe { + do self.queue.hold_and_signal |queue| { + queue.push(RemoveRemote(self.id)); + } + } + } +} + +struct BasicPausible { + eloop: *mut BasicLoop, + work: Option<~fn()>, + active: bool, +} + +impl BasicPausible { + fn new(eloop: &mut BasicLoop) -> BasicPausible { + BasicPausible { + active: false, + work: None, + eloop: eloop, + } + } +} + +impl PausibleIdleCallback for BasicPausible { + fn start(&mut self, f: ~fn()) { + rtassert!(!self.active && self.work.is_none()); + self.active = true; + self.work = Some(f); + } + fn pause(&mut self) { + self.active = false; + } + fn resume(&mut self) { + self.active = true; + } + fn close(&mut self) { + self.active = false; + self.work = None; + } +} + +impl Drop for BasicPausible { + fn drop(&mut self) { + unsafe { + (*self.eloop).idle = None; + } + } +} + +fn time() -> Time { + #[fixed_stack_segment]; #[inline(never)]; + extern { + fn get_time(sec: &mut i64, nsec: &mut i32); + } + let mut sec = 0; + let mut nsec = 0; + unsafe { get_time(&mut sec, &mut nsec) } + + Time { sec: sec as u64, nsec: nsec as u64 } +} diff --git a/src/libstd/rt/io/mod.rs b/src/libstd/rt/io/mod.rs index 758c9779165..decf801d592 100644 --- a/src/libstd/rt/io/mod.rs +++ b/src/libstd/rt/io/mod.rs @@ -606,6 +606,13 @@ pub fn standard_error(kind: IoErrorKind) -> IoError { detail: None } } + IoUnavailable => { + IoError { + kind: IoUnavailable, + desc: "I/O is unavailable", + detail: None + } + } _ => fail!() } } diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 66d7a6bf488..5113c28aa08 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -102,6 +102,9 @@ pub mod shouldnt_be_public { // Internal macros used by the runtime. mod macros; +/// Basic implementation of an EventLoop, provides no I/O interfaces +mod basic; + /// The global (exchange) heap. pub mod global_heap; diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 66a0676a2f4..29f728a5e0c 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -28,7 +28,6 @@ pub trait EventLoop { fn run(&mut self); fn callback(&mut self, ~fn()); fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback; - fn callback_ms(&mut self, ms: u64, ~fn()); fn remote_callback(&mut self, ~fn()) -> ~RemoteCallback; /// The asynchronous I/O services. Not all event loops may provide one diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 6e661884616..b008a8a74f2 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -62,8 +62,6 @@ pub struct Scheduler { /// no longer try to go to sleep, but exit instead. no_sleep: bool, stack_pool: StackPool, - /// The event loop used to drive the scheduler and perform I/O - event_loop: ~EventLoop, /// The scheduler runs on a special task. When it is not running /// it is stored here instead of the work queue. priv sched_task: Option<~Task>, @@ -85,7 +83,17 @@ pub struct Scheduler { priv yield_check_count: uint, /// A flag to tell the scheduler loop it needs to do some stealing /// in order to introduce randomness as part of a yield - priv steal_for_yield: bool + priv steal_for_yield: bool, + + // n.b. currently destructors of an object are run in top-to-bottom in order + // of field declaration. Due to its nature, the pausible idle callback + // must have some sort of handle to the event loop, so it needs to get + // destroyed before the event loop itself. For this reason, we destroy + // the event loop last to ensure that any unsafe references to it are + // destroyed before it's actually destroyed. + + /// The event loop used to drive the scheduler and perform I/O + event_loop: ~EventLoop, } /// An indication of how hard to work on a given operation, the difference @@ -905,7 +913,7 @@ mod test { use cell::Cell; use rt::thread::Thread; use rt::task::{Task, Sched}; - use rt::rtio::EventLoop; + use rt::basic; use rt::util; use option::{Some}; @@ -1005,7 +1013,6 @@ mod test { #[test] fn test_schedule_home_states() { - use rt::uv::uvio::UvEventLoop; use rt::sleeper_list::SleeperList; use rt::work_queue::WorkQueue; use rt::sched::Shutdown; @@ -1021,7 +1028,7 @@ mod test { // Our normal scheduler let mut normal_sched = ~Scheduler::new( - ~UvEventLoop::new() as ~EventLoop, + basic::event_loop(), normal_queue, queues.clone(), sleepers.clone()); @@ -1032,7 +1039,7 @@ mod test { // Our special scheduler let mut special_sched = ~Scheduler::new_special( - ~UvEventLoop::new() as ~EventLoop, + basic::event_loop(), special_queue.clone(), queues.clone(), sleepers.clone(), @@ -1137,22 +1144,15 @@ mod test { #[test] fn test_io_callback() { + use rt::io::timer; + // This is a regression test that when there are no schedulable tasks // in the work queue, but we are performing I/O, that once we do put // something in the work queue again the scheduler picks it up and doesn't // exit before emptying the work queue - do run_in_newsched_task { + do run_in_uv_task { do spawntask { - let sched: ~Scheduler = Local::take(); - do sched.deschedule_running_task_and_then |sched, task| { - let task = Cell::new(task); - do sched.event_loop.callback_ms(10) { - rtdebug!("in callback"); - let mut sched: ~Scheduler = Local::take(); - sched.enqueue_blocked_task(task.take()); - Local::put(sched); - } - } + timer::sleep(10); } } } @@ -1192,7 +1192,6 @@ mod test { use rt::work_queue::WorkQueue; use rt::sleeper_list::SleeperList; use rt::stack::StackPool; - use rt::uv::uvio::UvEventLoop; use rt::sched::{Shutdown, TaskFromFriend}; use util; @@ -1203,7 +1202,7 @@ mod test { let queues = ~[queue.clone()]; let mut sched = ~Scheduler::new( - ~UvEventLoop::new() as ~EventLoop, + basic::event_loop(), queue, queues.clone(), sleepers.clone()); diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index 1ea68bb52d7..7bf124ad312 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -637,7 +637,7 @@ mod test { #[test] fn rng() { - do run_in_newsched_task() { + do run_in_uv_task() { use rand::{rng, Rng}; let mut r = rng(); let _ = r.next_u32(); @@ -646,7 +646,7 @@ mod test { #[test] fn logging() { - do run_in_newsched_task() { + do run_in_uv_task() { info!("here i am. logging in a newsched task"); } } diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index c238b1dfba1..e4bbfe0a5a3 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -21,6 +21,7 @@ use iter::{Iterator, range}; use super::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr}; use vec::{OwnedVector, MutableVector, ImmutableVector}; use path::GenericPath; +use rt::basic; use rt::sched::Scheduler; use rt::rtio::EventLoop; use unstable::{run_in_bare_thread}; @@ -48,6 +49,28 @@ pub fn new_test_uv_sched() -> Scheduler { } +pub fn new_test_sched() -> Scheduler { + + let queue = WorkQueue::new(); + let queues = ~[queue.clone()]; + + let mut sched = Scheduler::new(basic::event_loop(), + queue, + queues, + SleeperList::new()); + + // Don't wait for the Shutdown message + sched.no_sleep = true; + return sched; +} + +pub fn run_in_uv_task(f: ~fn()) { + let f = Cell::new(f); + do run_in_bare_thread { + run_in_uv_task_core(f.take()); + } +} + pub fn run_in_newsched_task(f: ~fn()) { let f = Cell::new(f); do run_in_bare_thread { @@ -55,7 +78,7 @@ pub fn run_in_newsched_task(f: ~fn()) { } } -pub fn run_in_newsched_task_core(f: ~fn()) { +pub fn run_in_uv_task_core(f: ~fn()) { use rt::sched::Shutdown; @@ -72,6 +95,23 @@ pub fn run_in_newsched_task_core(f: ~fn()) { sched.bootstrap(task); } +pub fn run_in_newsched_task_core(f: ~fn()) { + + use rt::sched::Shutdown; + + let mut sched = ~new_test_sched(); + let exit_handle = Cell::new(sched.make_handle()); + + let on_exit: ~fn(bool) = |exit_status| { + exit_handle.take().send(Shutdown); + rtassert!(exit_status); + }; + let mut task = ~Task::new_root(&mut sched.stack_pool, None, f); + task.death.on_exit = Some(on_exit); + + sched.bootstrap(task); +} + #[cfg(target_os="macos")] #[allow(non_camel_case_types)] mod darwin_fd_limit { @@ -310,7 +350,7 @@ pub fn spawntask_thread(f: ~fn()) -> Thread { /// Get a ~Task for testing purposes other than actually scheduling it. pub fn with_test_task(blk: ~fn(~Task) -> ~Task) { do run_in_bare_thread { - let mut sched = ~new_test_uv_sched(); + let mut sched = ~new_test_sched(); let task = blk(~Task::new_root(&mut sched.stack_pool, None, ||{})); cleanup_task(task); } diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index 29370c484eb..eee89365fb5 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -222,15 +222,6 @@ impl EventLoop for UvEventLoop { } as ~PausibleIdleCallback } - fn callback_ms(&mut self, ms: u64, f: ~fn()) { - let mut timer = TimerWatcher::new(self.uvio.uv_loop()); - do timer.start(ms, 0) |timer, status| { - assert!(status.is_none()); - timer.close(||()); - f(); - } - } - fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallback { ~UvRemoteCallback::new(self.uvio.uv_loop(), f) as ~RemoteCallback } |
