about summary refs log tree commit diff
path: root/src/libstd/rt/uv
diff options
context:
space:
mode:
authorBrian Anderson <banderson@mozilla.com>2013-05-29 18:22:28 -0700
committerBrian Anderson <banderson@mozilla.com>2013-05-29 18:22:28 -0700
commitf4ed554ddbd2dacfaa5dcc1dda99a3121f8cf2a4 (patch)
tree45f24486e6f2d6d39928462e40d2c4b5f3de2154 /src/libstd/rt/uv
parentbd30285c8467b33b6fea16be79198f7492107af3 (diff)
parent134bb0f3eeed69bbf6dc672bbbfbc802f1a018a9 (diff)
downloadrust-f4ed554ddbd2dacfaa5dcc1dda99a3121f8cf2a4.tar.gz
rust-f4ed554ddbd2dacfaa5dcc1dda99a3121f8cf2a4.zip
Merge remote-tracking branch 'brson/io' into incoming
Conflicts:
	src/libstd/rt/sched.rs
Diffstat (limited to 'src/libstd/rt/uv')
-rw-r--r--src/libstd/rt/uv/async.rs105
-rw-r--r--src/libstd/rt/uv/idle.rs62
-rw-r--r--src/libstd/rt/uv/mod.rs63
-rw-r--r--src/libstd/rt/uv/uvio.rs118
4 files changed, 271 insertions, 77 deletions
diff --git a/src/libstd/rt/uv/async.rs b/src/libstd/rt/uv/async.rs
new file mode 100644
index 00000000000..6ed06cc10b7
--- /dev/null
+++ b/src/libstd/rt/uv/async.rs
@@ -0,0 +1,105 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use libc::{c_int, c_void};
+use option::Some;
+use rt::uv::uvll;
+use rt::uv::uvll::UV_ASYNC;
+use rt::uv::{Watcher, Loop, NativeHandle, AsyncCallback, NullCallback};
+use rt::uv::WatcherInterop;
+use rt::uv::status_to_maybe_uv_error;
+
+pub struct AsyncWatcher(*uvll::uv_async_t);
+impl Watcher for AsyncWatcher { }
+
+impl AsyncWatcher {
+    pub fn new(loop_: &mut Loop, cb: AsyncCallback) -> AsyncWatcher {
+        unsafe {
+            let handle = uvll::malloc_handle(UV_ASYNC);
+            assert!(handle.is_not_null());
+            let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle);
+            watcher.install_watcher_data();
+            let data = watcher.get_watcher_data();
+            data.async_cb = Some(cb);
+            assert_eq!(0, uvll::async_init(loop_.native_handle(), handle, async_cb));
+            return watcher;
+        }
+
+        extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) {
+            let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle);
+            let status = status_to_maybe_uv_error(watcher.native_handle(), status);
+            let data = watcher.get_watcher_data();
+            let cb = data.async_cb.get_ref();
+            (*cb)(watcher, status);
+        }
+    }
+
+    pub fn send(&mut self) {
+        unsafe {
+            let handle = self.native_handle();
+            uvll::async_send(handle);
+        }
+    }
+
+    pub fn close(self, cb: NullCallback) {
+        let mut this = self;
+        let data = this.get_watcher_data();
+        assert!(data.close_cb.is_none());
+        data.close_cb = Some(cb);
+
+        unsafe {
+            uvll::close(self.native_handle(), close_cb);
+        }
+
+        extern fn close_cb(handle: *uvll::uv_stream_t) {
+            let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle);
+            {
+                let data = watcher.get_watcher_data();
+                data.close_cb.swap_unwrap()();
+            }
+            watcher.drop_watcher_data();
+            unsafe { uvll::free_handle(handle as *c_void); }
+        }
+    }
+}
+
+impl NativeHandle<*uvll::uv_async_t> for AsyncWatcher {
+    fn from_native_handle(handle: *uvll::uv_async_t) -> AsyncWatcher {
+        AsyncWatcher(handle)
+    }
+    fn native_handle(&self) -> *uvll::uv_async_t {
+        match self { &AsyncWatcher(ptr) => ptr }
+    }
+}
+
+#[cfg(test)]
+mod test {
+
+    use super::*;
+    use rt::uv::Loop;
+    use unstable::run_in_bare_thread;
+    use rt::thread::Thread;
+    use cell::Cell;
+
+    #[test]
+    fn smoke_test() {
+        do run_in_bare_thread {
+            let mut loop_ = Loop::new();
+            let watcher = AsyncWatcher::new(&mut loop_, |w, _| w.close(||()) );
+            let watcher_cell = Cell(watcher);
+            let _thread = do Thread::start {
+                let mut watcher = watcher_cell.take();
+                watcher.send();
+            };
+            loop_.run();
+            loop_.close();
+        }
+    }
+}
diff --git a/src/libstd/rt/uv/idle.rs b/src/libstd/rt/uv/idle.rs
index 2cf0b5c4872..a81ab48696a 100644
--- a/src/libstd/rt/uv/idle.rs
+++ b/src/libstd/rt/uv/idle.rs
@@ -89,3 +89,65 @@ impl NativeHandle<*uvll::uv_idle_t> for IdleWatcher {
         match self { &IdleWatcher(ptr) => ptr }
     }
 }
+
+#[cfg(test)]
+mod test {
+
+    use rt::uv::Loop;
+    use super::*;
+    use unstable::run_in_bare_thread;
+
+    #[test]
+    #[ignore(reason = "valgrind - loop destroyed before watcher?")]
+    fn idle_new_then_close() {
+        do run_in_bare_thread {
+            let mut loop_ = Loop::new();
+            let idle_watcher = { IdleWatcher::new(&mut loop_) };
+            idle_watcher.close(||());
+        }
+    }
+
+    #[test]
+    fn idle_smoke_test() {
+        do run_in_bare_thread {
+            let mut loop_ = Loop::new();
+            let mut idle_watcher = { IdleWatcher::new(&mut loop_) };
+            let mut count = 10;
+            let count_ptr: *mut int = &mut count;
+            do idle_watcher.start |idle_watcher, status| {
+                let mut idle_watcher = idle_watcher;
+                assert!(status.is_none());
+                if unsafe { *count_ptr == 10 } {
+                    idle_watcher.stop();
+                    idle_watcher.close(||());
+                } else {
+                    unsafe { *count_ptr = *count_ptr + 1; }
+                }
+            }
+            loop_.run();
+            loop_.close();
+            assert_eq!(count, 10);
+        }
+    }
+
+    #[test]
+    fn idle_start_stop_start() {
+        do run_in_bare_thread {
+            let mut loop_ = Loop::new();
+            let mut idle_watcher = { IdleWatcher::new(&mut loop_) };
+            do idle_watcher.start |idle_watcher, status| {
+                let mut idle_watcher = idle_watcher;
+                assert!(status.is_none());
+                idle_watcher.stop();
+                do idle_watcher.start |idle_watcher, status| {
+                    assert!(status.is_none());
+                    let mut idle_watcher = idle_watcher;
+                    idle_watcher.stop();
+                    idle_watcher.close(||());
+                }
+            }
+            loop_.run();
+            loop_.close();
+        }
+    }
+}
diff --git a/src/libstd/rt/uv/mod.rs b/src/libstd/rt/uv/mod.rs
index 2bd657fd864..5f9e5660814 100644
--- a/src/libstd/rt/uv/mod.rs
+++ b/src/libstd/rt/uv/mod.rs
@@ -57,6 +57,7 @@ pub use self::file::FsRequest;
 pub use self::net::{StreamWatcher, TcpWatcher};
 pub use self::idle::IdleWatcher;
 pub use self::timer::TimerWatcher;
+pub use self::async::AsyncWatcher;
 
 /// The implementation of `rtio` for libuv
 pub mod uvio;
@@ -68,6 +69,7 @@ pub mod file;
 pub mod net;
 pub mod idle;
 pub mod timer;
+pub mod async;
 
 /// XXX: Loop(*handle) is buggy with destructors. Normal structs
 /// with dtors may not be destructured, but tuple structs can,
@@ -125,6 +127,7 @@ pub type IdleCallback = ~fn(IdleWatcher, Option<UvError>);
 pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
 pub type FsCallback = ~fn(FsRequest, Option<UvError>);
 pub type TimerCallback = ~fn(TimerWatcher, Option<UvError>);
+pub type AsyncCallback = ~fn(AsyncWatcher, Option<UvError>);
 
 
 /// Callbacks used by StreamWatchers, set as custom data on the foreign handle
@@ -135,7 +138,8 @@ struct WatcherData {
     close_cb: Option<NullCallback>,
     alloc_cb: Option<AllocCallback>,
     idle_cb: Option<IdleCallback>,
-    timer_cb: Option<TimerCallback>
+    timer_cb: Option<TimerCallback>,
+    async_cb: Option<AsyncCallback>
 }
 
 pub trait WatcherInterop {
@@ -164,7 +168,8 @@ impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W {
                 close_cb: None,
                 alloc_cb: None,
                 idle_cb: None,
-                timer_cb: None
+                timer_cb: None,
+                async_cb: None
             };
             let data = transmute::<~WatcherData, *c_void>(data);
             uvll::set_data_for_uv_handle(self.native_handle(), data);
@@ -364,57 +369,3 @@ fn loop_smoke_test() {
         loop_.close();
     }
 }
-
-#[test]
-#[ignore(reason = "valgrind - loop destroyed before watcher?")]
-fn idle_new_then_close() {
-    do run_in_bare_thread {
-        let mut loop_ = Loop::new();
-        let idle_watcher = { IdleWatcher::new(&mut loop_) };
-        idle_watcher.close(||());
-    }
-}
-
-#[test]
-fn idle_smoke_test() {
-    do run_in_bare_thread {
-        let mut loop_ = Loop::new();
-        let mut idle_watcher = { IdleWatcher::new(&mut loop_) };
-        let mut count = 10;
-        let count_ptr: *mut int = &mut count;
-        do idle_watcher.start |idle_watcher, status| {
-            let mut idle_watcher = idle_watcher;
-            assert!(status.is_none());
-            if unsafe { *count_ptr == 10 } {
-                idle_watcher.stop();
-                idle_watcher.close(||());
-            } else {
-                unsafe { *count_ptr = *count_ptr + 1; }
-            }
-        }
-        loop_.run();
-        loop_.close();
-        assert_eq!(count, 10);
-    }
-}
-
-#[test]
-fn idle_start_stop_start() {
-    do run_in_bare_thread {
-        let mut loop_ = Loop::new();
-        let mut idle_watcher = { IdleWatcher::new(&mut loop_) };
-        do idle_watcher.start |idle_watcher, status| {
-            let mut idle_watcher = idle_watcher;
-            assert!(status.is_none());
-            idle_watcher.stop();
-            do idle_watcher.start |idle_watcher, status| {
-                assert!(status.is_none());
-                let mut idle_watcher = idle_watcher;
-                idle_watcher.stop();
-                idle_watcher.close(||());
-            }
-        }
-        loop_.run();
-        loop_.close();
-    }
-}
diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs
index cacd67314eb..1ee6504d11f 100644
--- a/src/libstd/rt/uv/uvio.rs
+++ b/src/libstd/rt/uv/uvio.rs
@@ -12,6 +12,7 @@ use option::*;
 use result::*;
 use ops::Drop;
 use cell::{Cell, empty_cell};
+use cast;
 use cast::transmute;
 use clone::Clone;
 use rt::io::IoError;
@@ -23,6 +24,9 @@ use rt::sched::Scheduler;
 use rt::io::{standard_error, OtherIoError};
 use rt::tube::Tube;
 use rt::local::Local;
+use rt::work_queue::WorkQueue;
+use unstable::sync::{UnsafeAtomicRcBox, AtomicInt};
+use unstable::intrinsics;
 
 #[cfg(test)] use container::Container;
 #[cfg(test)] use uint;
@@ -39,11 +43,6 @@ pub impl UvEventLoop {
             uvio: UvIoFactory(Loop::new())
         }
     }
-
-    /// A convenience constructor
-    fn new_scheduler() -> Scheduler {
-        Scheduler::new(~UvEventLoop::new())
-    }
 }
 
 impl Drop for UvEventLoop {
@@ -82,6 +81,10 @@ impl EventLoop for UvEventLoop {
         }
     }
 
+    fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallbackObject {
+        ~UvRemoteCallback::new(self.uvio.uv_loop(), f)
+    }
+
     fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> {
         Some(&mut self.uvio)
     }
@@ -101,6 +104,85 @@ fn test_callback_run_once() {
     }
 }
 
+pub struct UvRemoteCallback {
+    // The uv async handle for triggering the callback
+    async: AsyncWatcher,
+    // An atomic flag to tell the callback to exit,
+    // set from the dtor.
+    exit_flag: UnsafeAtomicRcBox<AtomicInt>
+}
+
+impl UvRemoteCallback {
+    pub fn new(loop_: &mut Loop, f: ~fn()) -> UvRemoteCallback {
+        let exit_flag = UnsafeAtomicRcBox::new(AtomicInt::new(0));
+        let exit_flag_clone = exit_flag.clone();
+        let async = do AsyncWatcher::new(loop_) |watcher, status| {
+            assert!(status.is_none());
+            f();
+            let exit_flag_ptr = exit_flag_clone.get();
+            unsafe {
+                if (*exit_flag_ptr).load() == 1 {
+                    watcher.close(||());
+                }
+            }
+        };
+        UvRemoteCallback {
+            async: async,
+            exit_flag: exit_flag
+        }
+    }
+}
+
+impl RemoteCallback for UvRemoteCallback {
+    fn fire(&mut self) { self.async.send() }
+}
+
+impl Drop for UvRemoteCallback {
+    fn finalize(&self) {
+        unsafe {
+            let mut this: &mut UvRemoteCallback = cast::transmute_mut(self);
+            let exit_flag_ptr = this.exit_flag.get();
+            (*exit_flag_ptr).store(1);
+            this.async.send();
+        }
+    }
+}
+
+#[cfg(test)]
+mod test_remote {
+    use super::*;
+    use cell;
+    use cell::Cell;
+    use rt::test::*;
+    use rt::thread::Thread;
+    use rt::tube::Tube;
+    use rt::rtio::EventLoop;
+    use rt::local::Local;
+    use rt::sched::Scheduler;
+
+    #[test]
+    fn test_uv_remote() {
+        do run_in_newsched_task {
+            let mut tube = Tube::new();
+            let tube_clone = tube.clone();
+            let remote_cell = cell::empty_cell();
+            do Local::borrow::<Scheduler>() |sched| {
+                let tube_clone = tube_clone.clone();
+                let tube_clone_cell = Cell(tube_clone);
+                let remote = do sched.event_loop.remote_callback {
+                    tube_clone_cell.take().send(1);
+                };
+                remote_cell.put_back(remote);
+            }
+            let _thread = do Thread::start {
+                remote_cell.take().fire();
+            };
+
+            assert!(tube.recv() == 1);
+        }
+    }
+}
+
 pub struct UvIoFactory(Loop);
 
 pub impl UvIoFactory {
@@ -123,12 +205,10 @@ impl IoFactory for UvIoFactory {
         assert!(scheduler.in_task_context());
 
         // Block this task and take ownership, switch to scheduler context
-        do scheduler.deschedule_running_task_and_then |task| {
+        do scheduler.deschedule_running_task_and_then |sched, task| {
 
             rtdebug!("connect: entered scheduler context");
-            do Local::borrow::<Scheduler> |scheduler| {
-                assert!(!scheduler.in_task_context());
-            }
+            assert!(!sched.in_task_context());
             let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
             let task_cell = Cell(task);
 
@@ -168,7 +248,7 @@ impl IoFactory for UvIoFactory {
             Ok(_) => Ok(~UvTcpListener::new(watcher)),
             Err(uverr) => {
                 let scheduler = Local::take::<Scheduler>();
-                do scheduler.deschedule_running_task_and_then |task| {
+                do scheduler.deschedule_running_task_and_then |_, task| {
                     let task_cell = Cell(task);
                     do watcher.as_stream().close {
                         let scheduler = Local::take::<Scheduler>();
@@ -204,7 +284,7 @@ impl Drop for UvTcpListener {
     fn finalize(&self) {
         let watcher = self.watcher();
         let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |task| {
+        do scheduler.deschedule_running_task_and_then |_, task| {
             let task_cell = Cell(task);
             do watcher.as_stream().close {
                 let scheduler = Local::take::<Scheduler>();
@@ -266,7 +346,7 @@ impl Drop for UvTcpStream {
         rtdebug!("closing tcp stream");
         let watcher = self.watcher();
         let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |task| {
+        do scheduler.deschedule_running_task_and_then |_, task| {
             let task_cell = Cell(task);
             do watcher.close {
                 let scheduler = Local::take::<Scheduler>();
@@ -285,11 +365,9 @@ impl RtioTcpStream for UvTcpStream {
         assert!(scheduler.in_task_context());
         let watcher = self.watcher();
         let buf_ptr: *&mut [u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |task| {
+        do scheduler.deschedule_running_task_and_then |sched, task| {
             rtdebug!("read: entered scheduler context");
-            do Local::borrow::<Scheduler> |scheduler| {
-                assert!(!scheduler.in_task_context());
-            }
+            assert!(!sched.in_task_context());
             let mut watcher = watcher;
             let task_cell = Cell(task);
             // XXX: We shouldn't reallocate these callbacks every
@@ -331,7 +409,7 @@ impl RtioTcpStream for UvTcpStream {
         assert!(scheduler.in_task_context());
         let watcher = self.watcher();
         let buf_ptr: *&[u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |task| {
+        do scheduler.deschedule_running_task_and_then |_, task| {
             let mut watcher = watcher;
             let task_cell = Cell(task);
             let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
@@ -425,11 +503,9 @@ fn test_read_and_block() {
                 // Yield to the other task in hopes that it
                 // will trigger a read callback while we are
                 // not ready for it
-                do scheduler.deschedule_running_task_and_then |task| {
+                do scheduler.deschedule_running_task_and_then |sched, task| {
                     let task = Cell(task);
-                    do Local::borrow::<Scheduler> |scheduler| {
-                        scheduler.enqueue_task(task.take());
-                    }
+                    sched.enqueue_task(task.take());
                 }
             }