about summary refs log tree commit diff
path: root/src/libcore
diff options
context:
space:
mode:
authorBrian Anderson <banderson@mozilla.com>2013-05-12 14:35:52 -0700
committerBrian Anderson <banderson@mozilla.com>2013-05-14 14:52:07 -0700
commitee0ce64d9db10aebc491454b6595d6edf69fe513 (patch)
tree562b2b6b4d4e292842c37830d531c680a802d335 /src/libcore
parent204e3d82ccf5015e39f847aafea148d5180ab951 (diff)
downloadrust-ee0ce64d9db10aebc491454b6595d6edf69fe513.tar.gz
rust-ee0ce64d9db10aebc491454b6595d6edf69fe513.zip
core::rt: Wait for handles to close
Diffstat (limited to 'src/libcore')
-rw-r--r--src/libcore/rt/uv/idle.rs15
-rw-r--r--src/libcore/rt/uv/mod.rs6
-rw-r--r--src/libcore/rt/uv/uvio.rs63
3 files changed, 60 insertions, 24 deletions
diff --git a/src/libcore/rt/uv/idle.rs b/src/libcore/rt/uv/idle.rs
index 518429eeaff..fecb9391caa 100644
--- a/src/libcore/rt/uv/idle.rs
+++ b/src/libcore/rt/uv/idle.rs
@@ -11,7 +11,7 @@
 use libc::c_int;
 use option::Some;
 use rt::uv::uvll;
-use rt::uv::{Watcher, Loop, NativeHandle, IdleCallback};
+use rt::uv::{Watcher, Loop, NativeHandle, IdleCallback, NullCallback};
 use rt::uv::status_to_maybe_uv_error;
 
 pub struct IdleWatcher(*uvll::uv_idle_t);
@@ -57,12 +57,23 @@ pub impl IdleWatcher {
         }
     }
 
-    fn close(self) {
+    fn close(self, cb: NullCallback) {
+        {
+            let mut this = self;
+            let data = this.get_watcher_data();
+            assert!(data.close_cb.is_none());
+            data.close_cb = Some(cb);
+        }
+
         unsafe { uvll::close(self.native_handle(), close_cb) };
 
         extern fn close_cb(handle: *uvll::uv_idle_t) {
             unsafe {
                 let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
+                {
+                    let mut data = idle_watcher.get_watcher_data();
+                    data.close_cb.swap_unwrap()();
+                }
                 idle_watcher.drop_watcher_data();
                 uvll::idle_delete(handle);
             }
diff --git a/src/libcore/rt/uv/mod.rs b/src/libcore/rt/uv/mod.rs
index 2c83873359a..684099d7fd1 100644
--- a/src/libcore/rt/uv/mod.rs
+++ b/src/libcore/rt/uv/mod.rs
@@ -356,7 +356,7 @@ fn idle_new_then_close() {
     do run_in_bare_thread {
         let mut loop_ = Loop::new();
         let idle_watcher = { IdleWatcher::new(&mut loop_) };
-        idle_watcher.close();
+        idle_watcher.close(||());
     }
 }
 
@@ -372,7 +372,7 @@ fn idle_smoke_test() {
             assert!(status.is_none());
             if unsafe { *count_ptr == 10 } {
                 idle_watcher.stop();
-                idle_watcher.close();
+                idle_watcher.close(||());
             } else {
                 unsafe { *count_ptr = *count_ptr + 1; }
             }
@@ -396,7 +396,7 @@ fn idle_start_stop_start() {
                 assert!(status.is_none());
                 let mut idle_watcher = idle_watcher;
                 idle_watcher.stop();
-                idle_watcher.close();
+                idle_watcher.close(||());
             }
         }
         loop_.run();
diff --git a/src/libcore/rt/uv/uvio.rs b/src/libcore/rt/uv/uvio.rs
index 2218c0734fb..c031d7a1a69 100644
--- a/src/libcore/rt/uv/uvio.rs
+++ b/src/libcore/rt/uv/uvio.rs
@@ -66,7 +66,7 @@ impl EventLoop for UvEventLoop {
             assert!(status.is_none());
             let mut idle_watcher = idle_watcher;
             idle_watcher.stop();
-            idle_watcher.close();
+            idle_watcher.close(||());
             f();
         }
     }
@@ -124,22 +124,26 @@ impl IoFactory for UvIoFactory {
             // Wait for a connection
             do tcp_watcher.connect(addr) |stream_watcher, status| {
                 rtdebug!("connect: in connect callback");
-                let maybe_stream = if status.is_none() {
+                if status.is_none() {
                     rtdebug!("status is none");
-                    Ok(~UvTcpStream { watcher: stream_watcher })
+                    let res = Ok(~UvTcpStream { watcher: stream_watcher });
+
+                    // Store the stream in the task's stack
+                    unsafe { (*result_cell_ptr).put_back(res); }
+
+                    // Context switch
+                    let scheduler = local_sched::take();
+                    scheduler.resume_task_immediately(task_cell.take());
                 } else {
                     rtdebug!("status is some");
-                    // XXX: Wait for close
-                    stream_watcher.close(||());
-                    Err(uv_error_to_io_error(status.get()))
+                    let task_cell = Cell(task_cell.take());
+                    do stream_watcher.close {
+                        let res = Err(uv_error_to_io_error(status.get()));
+                        unsafe { (*result_cell_ptr).put_back(res); }
+                        let scheduler = local_sched::take();
+                        scheduler.resume_task_immediately(task_cell.take());
+                    }
                 };
-
-                // Store the stream in the task's stack
-                unsafe { (*result_cell_ptr).put_back(maybe_stream); }
-
-                // Context switch
-                let scheduler = local_sched::take();
-                scheduler.resume_task_immediately(task_cell.take());
             }
         }
 
@@ -152,8 +156,14 @@ impl IoFactory for UvIoFactory {
         match watcher.bind(addr) {
             Ok(_) => Ok(~UvTcpListener::new(watcher)),
             Err(uverr) => {
-                // XXX: Should we wait until close completes?
-                watcher.as_stream().close(||());
+                let scheduler = local_sched::take();
+                do scheduler.deschedule_running_task_and_then |task| {
+                    let task_cell = Cell(task);
+                    do watcher.as_stream().close {
+                        let scheduler = local_sched::take();
+                        scheduler.resume_task_immediately(task_cell.take());
+                    }
+                }
                 Err(uv_error_to_io_error(uverr))
             }
         }
@@ -181,8 +191,15 @@ impl UvTcpListener {
 
 impl Drop for UvTcpListener {
     fn finalize(&self) {
-        // XXX: Need to wait until close finishes before returning
-        self.watcher().as_stream().close(||());
+        let watcher = self.watcher();
+        let scheduler = local_sched::take();
+        do scheduler.deschedule_running_task_and_then |task| {
+            let task_cell = Cell(task);
+            do watcher.as_stream().close {
+                let scheduler = local_sched::take();
+                scheduler.resume_task_immediately(task_cell.take());
+            }
+        }
     }
 }
 
@@ -235,8 +252,16 @@ impl UvTcpStream {
 
 impl Drop for UvTcpStream {
     fn finalize(&self) {
-        rtdebug!("closing stream");
-        self.watcher().close(||());
+        rtdebug!("closing tcp stream");
+        let watcher = self.watcher();
+        let scheduler = local_sched::take();
+        do scheduler.deschedule_running_task_and_then |task| {
+            let task_cell = Cell(task);
+            do watcher.close {
+                let scheduler = local_sched::take();
+                scheduler.resume_task_immediately(task_cell.take());
+            }
+        }
     }
 }