about summary refs log tree commit diff
path: root/src/libstd/rt
diff options
context:
space:
mode:
authorbors <bors@rust-lang.org>2013-10-25 00:46:11 -0700
committerbors <bors@rust-lang.org>2013-10-25 00:46:11 -0700
commitdeeca5d586bfaa4aa60246f671a8d611d38f6248 (patch)
treea0aaa22d41ef38f3ef93ab72e399860edd31ff25 /src/libstd/rt
parentac82d185b0a9d04bb4e85578aad558da784a2be4 (diff)
parent64a5c3bc1ee869990f8205374f9dac837a475dbd (diff)
downloadrust-deeca5d586bfaa4aa60246f671a8d611d38f6248.tar.gz
rust-deeca5d586bfaa4aa60246f671a8d611d38f6248.zip
auto merge of #10054 : alexcrichton/rust/basic-event-loop, r=brson
This is more progress towards #9128 and all its related tree of issues. This implements a new `BasicLoop` on top of pthreads synchronization primitives (wrapped in `LittleLock`). This also removes the wonky `callback_ms` function from the interface of the event loop.

After #9901 is taking forever to land, I'm going to try to do all this runtime work in much smaller chunks than before. Right now this will not work unless #9901 lands first, but I'm close to landing it (hopefully), and I wanted to go ahead and get this reviewed before throwing it at bors later on down the road.

This "pausible idle callback" is also a bit of a weird idea, but it wasn't as difficult to implement as callback_ms so I'm more semi-ok with it.
Diffstat (limited to 'src/libstd/rt')
-rw-r--r--src/libstd/rt/basic.rs256
-rw-r--r--src/libstd/rt/io/mod.rs7
-rw-r--r--src/libstd/rt/mod.rs3
-rw-r--r--src/libstd/rt/rtio.rs1
-rw-r--r--src/libstd/rt/sched.rs39
-rw-r--r--src/libstd/rt/task.rs4
-rw-r--r--src/libstd/rt/test.rs44
-rw-r--r--src/libstd/rt/uv/uvio.rs9
8 files changed, 329 insertions, 34 deletions
diff --git a/src/libstd/rt/basic.rs b/src/libstd/rt/basic.rs
new file mode 100644
index 00000000000..86d3f8a52ba
--- /dev/null
+++ b/src/libstd/rt/basic.rs
@@ -0,0 +1,256 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! This is a basic event loop implementation not meant for any "real purposes"
+//! other than testing the scheduler and proving that it's possible to have a
+//! pluggable event loop.
+
+use prelude::*;
+
+use cast;
+use rt::rtio::{EventLoop, IoFactory, RemoteCallback, PausibleIdleCallback};
+use unstable::sync::Exclusive;
+use util;
+
+/// This is the only exported function from this module.
+pub fn event_loop() -> ~EventLoop {
+    ~BasicLoop::new() as ~EventLoop
+}
+
+struct BasicLoop {
+    work: ~[~fn()],               // pending work
+    idle: Option<*BasicPausible>, // only one is allowed
+    remotes: ~[(uint, ~fn())],
+    next_remote: uint,
+    messages: Exclusive<~[Message]>
+}
+
+enum Message { RunRemote(uint), RemoveRemote(uint) }
+
+struct Time {
+    sec: u64,
+    nsec: u64,
+}
+
+impl Ord for Time {
+    fn lt(&self, other: &Time) -> bool {
+        self.sec < other.sec || self.nsec < other.nsec
+    }
+}
+
+impl BasicLoop {
+    fn new() -> BasicLoop {
+        BasicLoop {
+            work: ~[],
+            idle: None,
+            next_remote: 0,
+            remotes: ~[],
+            messages: Exclusive::new(~[]),
+        }
+    }
+
+    /// Process everything in the work queue (continually)
+    fn work(&mut self) {
+        while self.work.len() > 0 {
+            for work in util::replace(&mut self.work, ~[]).move_iter() {
+                work();
+            }
+        }
+    }
+
+    fn remote_work(&mut self) {
+        let messages = unsafe {
+            do self.messages.with |messages| {
+                if messages.len() > 0 {
+                    Some(util::replace(messages, ~[]))
+                } else {
+                    None
+                }
+            }
+        };
+        let messages = match messages {
+            Some(m) => m, None => return
+        };
+        for message in messages.iter() {
+            self.message(*message);
+        }
+    }
+
+    fn message(&mut self, message: Message) {
+        match message {
+            RunRemote(i) => {
+                match self.remotes.iter().find(|& &(id, _)| id == i) {
+                    Some(&(_, ref f)) => (*f)(),
+                    None => unreachable!()
+                }
+            }
+            RemoveRemote(i) => {
+                match self.remotes.iter().position(|&(id, _)| id == i) {
+                    Some(i) => { self.remotes.remove(i); }
+                    None => unreachable!()
+                }
+            }
+        }
+    }
+
+    /// Run the idle callback if one is registered
+    fn idle(&mut self) {
+        unsafe {
+            match self.idle {
+                Some(idle) => {
+                    if (*idle).active {
+                        (*(*idle).work.get_ref())();
+                    }
+                }
+                None => {}
+            }
+        }
+    }
+
+    fn has_idle(&self) -> bool {
+        unsafe { self.idle.is_some() && (**self.idle.get_ref()).active }
+    }
+}
+
+impl EventLoop for BasicLoop {
+    fn run(&mut self) {
+        // Not exactly efficient, but it gets the job done.
+        while self.remotes.len() > 0 || self.work.len() > 0 || self.has_idle() {
+
+            self.work();
+            self.remote_work();
+
+            if self.has_idle() {
+                self.idle();
+                continue
+            }
+
+            unsafe {
+                // We block here if we have no messages to process and we may
+                // receive a message at a later date
+                do self.messages.hold_and_wait |messages| {
+                    self.remotes.len() > 0 &&
+                        messages.len() == 0 &&
+                        self.work.len() == 0
+                }
+            }
+        }
+    }
+
+    fn callback(&mut self, f: ~fn()) {
+        self.work.push(f);
+    }
+
+    // XXX: Seems like a really weird requirement to have an event loop provide.
+    fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback {
+        let callback = ~BasicPausible::new(self);
+        rtassert!(self.idle.is_none());
+        unsafe {
+            let cb_ptr: &*BasicPausible = cast::transmute(&callback);
+            self.idle = Some(*cb_ptr);
+        }
+        return callback as ~PausibleIdleCallback;
+    }
+
+    fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallback {
+        let id = self.next_remote;
+        self.next_remote += 1;
+        self.remotes.push((id, f));
+        ~BasicRemote::new(self.messages.clone(), id) as ~RemoteCallback
+    }
+
+    /// This has no bindings for local I/O
+    fn io<'a>(&'a mut self, _: &fn(&'a mut IoFactory)) {}
+}
+
+struct BasicRemote {
+    queue: Exclusive<~[Message]>,
+    id: uint,
+}
+
+impl BasicRemote {
+    fn new(queue: Exclusive<~[Message]>, id: uint) -> BasicRemote {
+        BasicRemote { queue: queue, id: id }
+    }
+}
+
+impl RemoteCallback for BasicRemote {
+    fn fire(&mut self) {
+        unsafe {
+            do self.queue.hold_and_signal |queue| {
+                queue.push(RunRemote(self.id));
+            }
+        }
+    }
+}
+
+impl Drop for BasicRemote {
+    fn drop(&mut self) {
+        unsafe {
+            do self.queue.hold_and_signal |queue| {
+                queue.push(RemoveRemote(self.id));
+            }
+        }
+    }
+}
+
+struct BasicPausible {
+    eloop: *mut BasicLoop,
+    work: Option<~fn()>,
+    active: bool,
+}
+
+impl BasicPausible {
+    fn new(eloop: &mut BasicLoop) -> BasicPausible {
+        BasicPausible {
+            active: false,
+            work: None,
+            eloop: eloop,
+        }
+    }
+}
+
+impl PausibleIdleCallback for BasicPausible {
+    fn start(&mut self, f: ~fn()) {
+        rtassert!(!self.active && self.work.is_none());
+        self.active = true;
+        self.work = Some(f);
+    }
+    fn pause(&mut self) {
+        self.active = false;
+    }
+    fn resume(&mut self) {
+        self.active = true;
+    }
+    fn close(&mut self) {
+        self.active = false;
+        self.work = None;
+    }
+}
+
+impl Drop for BasicPausible {
+    fn drop(&mut self) {
+        unsafe {
+            (*self.eloop).idle = None;
+        }
+    }
+}
+
+fn time() -> Time {
+    #[fixed_stack_segment]; #[inline(never)];
+    extern {
+        fn get_time(sec: &mut i64, nsec: &mut i32);
+    }
+    let mut sec = 0;
+    let mut nsec = 0;
+    unsafe { get_time(&mut sec, &mut nsec) }
+
+    Time { sec: sec as u64, nsec: nsec as u64 }
+}
diff --git a/src/libstd/rt/io/mod.rs b/src/libstd/rt/io/mod.rs
index 758c9779165..decf801d592 100644
--- a/src/libstd/rt/io/mod.rs
+++ b/src/libstd/rt/io/mod.rs
@@ -606,6 +606,13 @@ pub fn standard_error(kind: IoErrorKind) -> IoError {
                 detail: None
             }
         }
+        IoUnavailable => {
+            IoError {
+                kind: IoUnavailable,
+                desc: "I/O is unavailable",
+                detail: None
+            }
+        }
         _ => fail!()
     }
 }
diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs
index 66d7a6bf488..5113c28aa08 100644
--- a/src/libstd/rt/mod.rs
+++ b/src/libstd/rt/mod.rs
@@ -102,6 +102,9 @@ pub mod shouldnt_be_public {
 // Internal macros used by the runtime.
 mod macros;
 
+/// Basic implementation of an EventLoop, provides no I/O interfaces
+mod basic;
+
 /// The global (exchange) heap.
 pub mod global_heap;
 
diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs
index 66a0676a2f4..29f728a5e0c 100644
--- a/src/libstd/rt/rtio.rs
+++ b/src/libstd/rt/rtio.rs
@@ -28,7 +28,6 @@ pub trait EventLoop {
     fn run(&mut self);
     fn callback(&mut self, ~fn());
     fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback;
-    fn callback_ms(&mut self, ms: u64, ~fn());
     fn remote_callback(&mut self, ~fn()) -> ~RemoteCallback;
 
     /// The asynchronous I/O services. Not all event loops may provide one
diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs
index 6e661884616..b008a8a74f2 100644
--- a/src/libstd/rt/sched.rs
+++ b/src/libstd/rt/sched.rs
@@ -62,8 +62,6 @@ pub struct Scheduler {
     /// no longer try to go to sleep, but exit instead.
     no_sleep: bool,
     stack_pool: StackPool,
-    /// The event loop used to drive the scheduler and perform I/O
-    event_loop: ~EventLoop,
     /// The scheduler runs on a special task. When it is not running
     /// it is stored here instead of the work queue.
     priv sched_task: Option<~Task>,
@@ -85,7 +83,17 @@ pub struct Scheduler {
     priv yield_check_count: uint,
     /// A flag to tell the scheduler loop it needs to do some stealing
     /// in order to introduce randomness as part of a yield
-    priv steal_for_yield: bool
+    priv steal_for_yield: bool,
+
+    // n.b. currently destructors of an object are run in top-to-bottom in order
+    //      of field declaration. Due to its nature, the pausible idle callback
+    //      must have some sort of handle to the event loop, so it needs to get
+    //      destroyed before the event loop itself. For this reason, we destroy
+    //      the event loop last to ensure that any unsafe references to it are
+    //      destroyed before it's actually destroyed.
+
+    /// The event loop used to drive the scheduler and perform I/O
+    event_loop: ~EventLoop,
 }
 
 /// An indication of how hard to work on a given operation, the difference
@@ -905,7 +913,7 @@ mod test {
     use cell::Cell;
     use rt::thread::Thread;
     use rt::task::{Task, Sched};
-    use rt::rtio::EventLoop;
+    use rt::basic;
     use rt::util;
     use option::{Some};
 
@@ -1005,7 +1013,6 @@ mod test {
     #[test]
     fn test_schedule_home_states() {
 
-        use rt::uv::uvio::UvEventLoop;
         use rt::sleeper_list::SleeperList;
         use rt::work_queue::WorkQueue;
         use rt::sched::Shutdown;
@@ -1021,7 +1028,7 @@ mod test {
 
             // Our normal scheduler
             let mut normal_sched = ~Scheduler::new(
-                ~UvEventLoop::new() as ~EventLoop,
+                basic::event_loop(),
                 normal_queue,
                 queues.clone(),
                 sleepers.clone());
@@ -1032,7 +1039,7 @@ mod test {
 
             // Our special scheduler
             let mut special_sched = ~Scheduler::new_special(
-                ~UvEventLoop::new() as ~EventLoop,
+                basic::event_loop(),
                 special_queue.clone(),
                 queues.clone(),
                 sleepers.clone(),
@@ -1137,22 +1144,15 @@ mod test {
 
     #[test]
     fn test_io_callback() {
+        use rt::io::timer;
+
         // This is a regression test that when there are no schedulable tasks
         // in the work queue, but we are performing I/O, that once we do put
         // something in the work queue again the scheduler picks it up and doesn't
         // exit before emptying the work queue
-        do run_in_newsched_task {
+        do run_in_uv_task {
             do spawntask {
-                let sched: ~Scheduler = Local::take();
-                do sched.deschedule_running_task_and_then |sched, task| {
-                    let task = Cell::new(task);
-                    do sched.event_loop.callback_ms(10) {
-                        rtdebug!("in callback");
-                        let mut sched: ~Scheduler = Local::take();
-                        sched.enqueue_blocked_task(task.take());
-                        Local::put(sched);
-                    }
-                }
+                timer::sleep(10);
             }
         }
     }
@@ -1192,7 +1192,6 @@ mod test {
         use rt::work_queue::WorkQueue;
         use rt::sleeper_list::SleeperList;
         use rt::stack::StackPool;
-        use rt::uv::uvio::UvEventLoop;
         use rt::sched::{Shutdown, TaskFromFriend};
         use util;
 
@@ -1203,7 +1202,7 @@ mod test {
                 let queues = ~[queue.clone()];
 
                 let mut sched = ~Scheduler::new(
-                    ~UvEventLoop::new() as ~EventLoop,
+                    basic::event_loop(),
                     queue,
                     queues.clone(),
                     sleepers.clone());
diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs
index 1ea68bb52d7..7bf124ad312 100644
--- a/src/libstd/rt/task.rs
+++ b/src/libstd/rt/task.rs
@@ -637,7 +637,7 @@ mod test {
 
     #[test]
     fn rng() {
-        do run_in_newsched_task() {
+        do run_in_uv_task() {
             use rand::{rng, Rng};
             let mut r = rng();
             let _ = r.next_u32();
@@ -646,7 +646,7 @@ mod test {
 
     #[test]
     fn logging() {
-        do run_in_newsched_task() {
+        do run_in_uv_task() {
             info!("here i am. logging in a newsched task");
         }
     }
diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs
index c238b1dfba1..e4bbfe0a5a3 100644
--- a/src/libstd/rt/test.rs
+++ b/src/libstd/rt/test.rs
@@ -21,6 +21,7 @@ use iter::{Iterator, range};
 use super::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr};
 use vec::{OwnedVector, MutableVector, ImmutableVector};
 use path::GenericPath;
+use rt::basic;
 use rt::sched::Scheduler;
 use rt::rtio::EventLoop;
 use unstable::{run_in_bare_thread};
@@ -48,6 +49,28 @@ pub fn new_test_uv_sched() -> Scheduler {
 
 }
 
+pub fn new_test_sched() -> Scheduler {
+
+    let queue = WorkQueue::new();
+    let queues = ~[queue.clone()];
+
+    let mut sched = Scheduler::new(basic::event_loop(),
+                                   queue,
+                                   queues,
+                                   SleeperList::new());
+
+    // Don't wait for the Shutdown message
+    sched.no_sleep = true;
+    return sched;
+}
+
+pub fn run_in_uv_task(f: ~fn()) {
+    let f = Cell::new(f);
+    do run_in_bare_thread {
+        run_in_uv_task_core(f.take());
+    }
+}
+
 pub fn run_in_newsched_task(f: ~fn()) {
     let f = Cell::new(f);
     do run_in_bare_thread {
@@ -55,7 +78,7 @@ pub fn run_in_newsched_task(f: ~fn()) {
     }
 }
 
-pub fn run_in_newsched_task_core(f: ~fn()) {
+pub fn run_in_uv_task_core(f: ~fn()) {
 
     use rt::sched::Shutdown;
 
@@ -72,6 +95,23 @@ pub fn run_in_newsched_task_core(f: ~fn()) {
     sched.bootstrap(task);
 }
 
+pub fn run_in_newsched_task_core(f: ~fn()) {
+
+    use rt::sched::Shutdown;
+
+    let mut sched = ~new_test_sched();
+    let exit_handle = Cell::new(sched.make_handle());
+
+    let on_exit: ~fn(bool) = |exit_status| {
+        exit_handle.take().send(Shutdown);
+        rtassert!(exit_status);
+    };
+    let mut task = ~Task::new_root(&mut sched.stack_pool, None, f);
+    task.death.on_exit = Some(on_exit);
+
+    sched.bootstrap(task);
+}
+
 #[cfg(target_os="macos")]
 #[allow(non_camel_case_types)]
 mod darwin_fd_limit {
@@ -310,7 +350,7 @@ pub fn spawntask_thread(f: ~fn()) -> Thread {
 /// Get a ~Task for testing purposes other than actually scheduling it.
 pub fn with_test_task(blk: ~fn(~Task) -> ~Task) {
     do run_in_bare_thread {
-        let mut sched = ~new_test_uv_sched();
+        let mut sched = ~new_test_sched();
         let task = blk(~Task::new_root(&mut sched.stack_pool, None, ||{}));
         cleanup_task(task);
     }
diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs
index 29370c484eb..eee89365fb5 100644
--- a/src/libstd/rt/uv/uvio.rs
+++ b/src/libstd/rt/uv/uvio.rs
@@ -222,15 +222,6 @@ impl EventLoop for UvEventLoop {
         } as ~PausibleIdleCallback
     }
 
-    fn callback_ms(&mut self, ms: u64, f: ~fn()) {
-        let mut timer =  TimerWatcher::new(self.uvio.uv_loop());
-        do timer.start(ms, 0) |timer, status| {
-            assert!(status.is_none());
-            timer.close(||());
-            f();
-        }
-    }
-
     fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallback {
         ~UvRemoteCallback::new(self.uvio.uv_loop(), f) as ~RemoteCallback
     }