diff options
| author | Brian Anderson <banderson@mozilla.com> | 2013-05-12 14:35:52 -0700 |
|---|---|---|
| committer | Brian Anderson <banderson@mozilla.com> | 2013-05-14 14:52:07 -0700 |
| commit | ee0ce64d9db10aebc491454b6595d6edf69fe513 (patch) | |
| tree | 562b2b6b4d4e292842c37830d531c680a802d335 /src/libcore | |
| parent | 204e3d82ccf5015e39f847aafea148d5180ab951 (diff) | |
| download | rust-ee0ce64d9db10aebc491454b6595d6edf69fe513.tar.gz rust-ee0ce64d9db10aebc491454b6595d6edf69fe513.zip | |
core::rt: Wait for handles to close
Diffstat (limited to 'src/libcore')
| -rw-r--r-- | src/libcore/rt/uv/idle.rs | 15 | ||||
| -rw-r--r-- | src/libcore/rt/uv/mod.rs | 6 | ||||
| -rw-r--r-- | src/libcore/rt/uv/uvio.rs | 63 |
3 files changed, 60 insertions, 24 deletions
diff --git a/src/libcore/rt/uv/idle.rs b/src/libcore/rt/uv/idle.rs index 518429eeaff..fecb9391caa 100644 --- a/src/libcore/rt/uv/idle.rs +++ b/src/libcore/rt/uv/idle.rs @@ -11,7 +11,7 @@ use libc::c_int; use option::Some; use rt::uv::uvll; -use rt::uv::{Watcher, Loop, NativeHandle, IdleCallback}; +use rt::uv::{Watcher, Loop, NativeHandle, IdleCallback, NullCallback}; use rt::uv::status_to_maybe_uv_error; pub struct IdleWatcher(*uvll::uv_idle_t); @@ -57,12 +57,23 @@ pub impl IdleWatcher { } } - fn close(self) { + fn close(self, cb: NullCallback) { + { + let mut this = self; + let data = this.get_watcher_data(); + assert!(data.close_cb.is_none()); + data.close_cb = Some(cb); + } + unsafe { uvll::close(self.native_handle(), close_cb) }; extern fn close_cb(handle: *uvll::uv_idle_t) { unsafe { let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle); + { + let mut data = idle_watcher.get_watcher_data(); + data.close_cb.swap_unwrap()(); + } idle_watcher.drop_watcher_data(); uvll::idle_delete(handle); } diff --git a/src/libcore/rt/uv/mod.rs b/src/libcore/rt/uv/mod.rs index 2c83873359a..684099d7fd1 100644 --- a/src/libcore/rt/uv/mod.rs +++ b/src/libcore/rt/uv/mod.rs @@ -356,7 +356,7 @@ fn idle_new_then_close() { do run_in_bare_thread { let mut loop_ = Loop::new(); let idle_watcher = { IdleWatcher::new(&mut loop_) }; - idle_watcher.close(); + idle_watcher.close(||()); } } @@ -372,7 +372,7 @@ fn idle_smoke_test() { assert!(status.is_none()); if unsafe { *count_ptr == 10 } { idle_watcher.stop(); - idle_watcher.close(); + idle_watcher.close(||()); } else { unsafe { *count_ptr = *count_ptr + 1; } } @@ -396,7 +396,7 @@ fn idle_start_stop_start() { assert!(status.is_none()); let mut idle_watcher = idle_watcher; idle_watcher.stop(); - idle_watcher.close(); + idle_watcher.close(||()); } } loop_.run(); diff --git a/src/libcore/rt/uv/uvio.rs b/src/libcore/rt/uv/uvio.rs index 2218c0734fb..c031d7a1a69 100644 --- a/src/libcore/rt/uv/uvio.rs +++ b/src/libcore/rt/uv/uvio.rs @@ -66,7 +66,7 @@ impl EventLoop for UvEventLoop { assert!(status.is_none()); let mut idle_watcher = idle_watcher; idle_watcher.stop(); - idle_watcher.close(); + idle_watcher.close(||()); f(); } } @@ -124,22 +124,26 @@ impl IoFactory for UvIoFactory { // Wait for a connection do tcp_watcher.connect(addr) |stream_watcher, status| { rtdebug!("connect: in connect callback"); - let maybe_stream = if status.is_none() { + if status.is_none() { rtdebug!("status is none"); - Ok(~UvTcpStream { watcher: stream_watcher }) + let res = Ok(~UvTcpStream { watcher: stream_watcher }); + + // Store the stream in the task's stack + unsafe { (*result_cell_ptr).put_back(res); } + + // Context switch + let scheduler = local_sched::take(); + scheduler.resume_task_immediately(task_cell.take()); } else { rtdebug!("status is some"); - // XXX: Wait for close - stream_watcher.close(||()); - Err(uv_error_to_io_error(status.get())) + let task_cell = Cell(task_cell.take()); + do stream_watcher.close { + let res = Err(uv_error_to_io_error(status.get())); + unsafe { (*result_cell_ptr).put_back(res); } + let scheduler = local_sched::take(); + scheduler.resume_task_immediately(task_cell.take()); + } }; - - // Store the stream in the task's stack - unsafe { (*result_cell_ptr).put_back(maybe_stream); } - - // Context switch - let scheduler = local_sched::take(); - scheduler.resume_task_immediately(task_cell.take()); } } @@ -152,8 +156,14 @@ impl IoFactory for UvIoFactory { match watcher.bind(addr) { Ok(_) => Ok(~UvTcpListener::new(watcher)), Err(uverr) => { - // XXX: Should we wait until close completes? - watcher.as_stream().close(||()); + let scheduler = local_sched::take(); + do scheduler.deschedule_running_task_and_then |task| { + let task_cell = Cell(task); + do watcher.as_stream().close { + let scheduler = local_sched::take(); + scheduler.resume_task_immediately(task_cell.take()); + } + } Err(uv_error_to_io_error(uverr)) } } @@ -181,8 +191,15 @@ impl UvTcpListener { impl Drop for UvTcpListener { fn finalize(&self) { - // XXX: Need to wait until close finishes before returning - self.watcher().as_stream().close(||()); + let watcher = self.watcher(); + let scheduler = local_sched::take(); + do scheduler.deschedule_running_task_and_then |task| { + let task_cell = Cell(task); + do watcher.as_stream().close { + let scheduler = local_sched::take(); + scheduler.resume_task_immediately(task_cell.take()); + } + } } } @@ -235,8 +252,16 @@ impl UvTcpStream { impl Drop for UvTcpStream { fn finalize(&self) { - rtdebug!("closing stream"); - self.watcher().close(||()); + rtdebug!("closing tcp stream"); + let watcher = self.watcher(); + let scheduler = local_sched::take(); + do scheduler.deschedule_running_task_and_then |task| { + let task_cell = Cell(task); + do watcher.close { + let scheduler = local_sched::take(); + scheduler.resume_task_immediately(task_cell.take()); + } + } } } |
