about summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
authorJeff Olson <olson.jeffery@gmail.com>2012-04-27 21:42:04 -0700
committerJeff Olson <olson.jeffery@gmail.com>2012-04-27 22:19:30 -0700
commit92e88e4e2ce79868daee1ac3f77a6aaa193b5896 (patch)
tree4f4f88e251751cc32dd6bd60992714a24493acc9 /src
parent577b888e4b87437a37903abb5b70b113a9df059d (diff)
downloadrust-92e88e4e2ce79868daee1ac3f77a6aaa193b5896.tar.gz
rust-92e88e4e2ce79868daee1ac3f77a6aaa193b5896.zip
std: another stab at a race-free global loop implementation
seems to hold up pretty well.

uv::hl API is affected.. had to do work on tests and std::timer code that
leverages the global loop/high_level_loop API.

see test_stress_gl_uv_global_loop_high_level_global_timer for a stress
example.. it takes a while to run, but it exits cleanly (something I could
never accomplish with earlier iterations of the global loop)
Diffstat (limited to 'src')
-rw-r--r--src/libstd/timer.rs14
-rw-r--r--src/libstd/uv_global_loop.rs371
-rw-r--r--src/libstd/uv_hl.rs318
3 files changed, 188 insertions, 515 deletions
diff --git a/src/libstd/timer.rs b/src/libstd/timer.rs
index a7cb1762c4f..1c646f61d6a 100644
--- a/src/libstd/timer.rs
+++ b/src/libstd/timer.rs
@@ -30,7 +30,6 @@ fn delayed_send<T: send>(msecs: uint, ch: comm::chan<T>, val: T) {
             let timer_ptr = ptr::addr_of(timer);
             let hl_loop = uv::global_loop::get();
             uv::hl::interact(hl_loop) {|loop_ptr|
-                uv::hl::ref(hl_loop, timer_ptr);
                 let init_result = uv::ll::timer_init(loop_ptr, timer_ptr);
                 if (init_result == 0i32) {
                     let start_result = uv::ll::timer_start(
@@ -54,9 +53,6 @@ fn delayed_send<T: send>(msecs: uint, ch: comm::chan<T>, val: T) {
             comm::recv(timer_done_po);
             // notify the caller immediately
             comm::send(ch, copy(val));
-            // then clean up our handle
-            uv::hl::unref_and_close(hl_loop, timer_ptr,
-                                    delayed_send_close_cb);
             // uv_close for this timer has been processed
             comm::recv(timer_done_po);
         }
@@ -122,6 +118,7 @@ crust fn delayed_send_cb(handle: *uv::ll::uv_timer_t,
     let stop_result = uv::ll::timer_stop(handle);
     if (stop_result == 0i32) {
         comm::send(timer_done_ch, ());
+        uv::ll::close(handle, delayed_send_close_cb);
     }
     else {
         let loop_ptr = uv::ll::get_loop_for_uv_handle(handle);
@@ -140,14 +137,12 @@ crust fn delayed_send_close_cb(handle: *uv::ll::uv_timer_t) unsafe {
 #[cfg(test)]
 mod test {
     #[test]
-    #[ignore]
-    fn test_timer_simple_sleep_test() {
+    fn test_gl_timer_simple_sleep_test() {
         sleep(1u);
     }
 
     #[test]
-    #[ignore]
-    fn test_timer_recv_timeout_before_time_passes() {
+    fn test_gl_timer_recv_timeout_before_time_passes() {
         let expected = rand::rng().gen_str(16u);
         let test_po = comm::port::<str>();
         let test_ch = comm::chan(test_po);
@@ -165,8 +160,7 @@ mod test {
     }
 
     #[test]
-    #[ignore]
-    fn test_timer_recv_timeout_after_time_passes() {
+    fn test_gl_timer_recv_timeout_after_time_passes() {
         let expected = rand::rng().gen_str(16u);
         let fail_msg = rand::rng().gen_str(16u);
         let test_po = comm::port::<str>();
diff --git a/src/libstd/uv_global_loop.rs b/src/libstd/uv_global_loop.rs
index 53130c04218..46d73867285 100644
--- a/src/libstd/uv_global_loop.rs
+++ b/src/libstd/uv_global_loop.rs
@@ -6,7 +6,7 @@ import ll = uv_ll;
 import hl = uv_hl;
 import get_gl = get;
 
-export get, get_single_task_gl, get_monitor_task_gl;
+export get, get_monitor_task_gl;
 
 native mod rustrt {
     fn rust_uv_get_kernel_global_chan_ptr() -> *libc::uintptr_t;
@@ -16,8 +16,7 @@ native mod rustrt {
 Race-free helper to get access to a global task where a libuv
 loop is running.
 
-Use `uv::hl::interact`, `uv::hl::ref`, `uv::hl::unref` and
-uv `uv::hl::unref_and_close` to do operations against the global
+Use `uv::hl::interact` to do operations against the global
 loop that this function returns.
 
 # Return
@@ -32,61 +31,10 @@ fn get() -> hl::high_level_loop {
 // WARNING: USE ONLY ONE get_*_task_gl fn in the scope of a process lifetime.
 #[doc(hidden)]
 fn get_monitor_task_gl() -> hl::high_level_loop {
-    let monitor_loop_chan =
-        rustrt::rust_uv_get_kernel_monitor_global_chan_ptr();
-    ret spawn_global_weak_task(
-        monitor_loop_chan,
-        {|weak_exit_po, msg_po, loop_ptr, first_msg|
-            log(debug, "monitor gl: entering inner loop");
-            unsafe {
-                monitor_task_loop_body(weak_exit_po, msg_po, loop_ptr,
-                                       copy(first_msg))
-            }
-        },
-        {|msg_ch|
-            hl::monitor_task_loop({op_chan: msg_ch})
-        });
-}
-
-// WARNING: USE ONLY ONE get_*_task_gl fn in the scope of a process lifetime.
-#[doc(hidden)]
-fn get_single_task_gl() -> hl::high_level_loop {
-    let global_loop_chan_ptr = rustrt::rust_uv_get_kernel_global_chan_ptr();
-    ret spawn_global_weak_task(
-        global_loop_chan_ptr,
-        {|weak_exit_po, msg_po, loop_ptr, first_msg|
-            log(debug, "single-task gl: about to enter inner loop");
-            unsafe {
-                single_task_loop_body(weak_exit_po, msg_po, loop_ptr,
-                                      copy(first_msg))
-            }
-        },
-        {|msg_ch|
-            log(debug, "after priv::chan_from_global_ptr");
-            unsafe {
-                let handle = get_global_async_handle_native_representation()
-                    as **ll::uv_async_t;
-                hl::single_task_loop(
-                    { async_handle: handle, op_chan: msg_ch })
-            }
-        }
-    );
-}
-
-// INTERNAL API
-
-fn spawn_global_weak_task(
-        global_loop_chan_ptr: *libc::uintptr_t,
-        weak_task_body_cb: fn~(
-            comm::port<()>,
-            comm::port<hl::high_level_msg>,
-            *libc::c_void,
-            hl::high_level_msg) -> bool,
-        after_task_spawn_cb: fn~(comm::chan<hl::high_level_msg>)
-          -> hl::high_level_loop) -> hl::high_level_loop {
+    let monitor_loop_chan_ptr =
+        rustrt::rust_uv_get_kernel_global_chan_ptr();
     log(debug, #fmt("ENTERING global_loop::get() loop chan: %?",
-       global_loop_chan_ptr));
-
+       monitor_loop_chan_ptr));
     let builder_fn = {||
         let builder = task::builder();
         let opts = {
@@ -101,206 +49,95 @@ fn spawn_global_weak_task(
     };
     unsafe {
         log(debug, "before priv::chan_from_global_ptr");
-        let msg_ch = priv::chan_from_global_ptr::<hl::high_level_msg>(
-            global_loop_chan_ptr,
-            builder_fn) {|port|
-
-            // the actual body of our global loop lives here
-            log(debug, "initialized global port task!");
-            log(debug, "GLOBAL initialized global port task!");
-            outer_global_loop_body(port, weak_task_body_cb);
+        type hl_loop_req_ch = comm::chan<hl::high_level_loop>;
+        let msg_ch = priv::chan_from_global_ptr::<hl_loop_req_ch>(
+            monitor_loop_chan_ptr,
+            builder_fn) {|msg_po|
+            log(debug, "global monitor task starting");
+            priv::weaken_task() {|weak_exit_po|
+                log(debug, "global monitor task is now weak");
+                let hl_loop_data = spawn_libuv_weak_task();
+                let hl_loop = alt hl_loop_data {
+                  (async, msg_ch) {
+                    hl::simple_task_loop({async_handle:async, op_chan:msg_ch})
+                  }
+                };
+                loop {
+                    log(debug, "in outer_loop...");
+                    let continue = either::either(
+                        {|weak_exit|
+                            // all normal tasks have ended, tell the
+                            // libuv loop to tear_down, then exit
+                            log(debug, #fmt("weak_exit_po recv'd msg: %?",
+                                           weak_exit));
+                            let ( a, loop_msg_ch )= hl_loop_data;
+                            comm::send(loop_msg_ch, hl::teardown_loop);
+                            ll::async_send(a);
+                            false
+                        }, {|fetch_ch|
+                            log(debug, #fmt("hl_loop req recv'd: %?",
+                                           fetch_ch));
+                            comm::send(fetch_ch, copy(hl_loop));
+                            true
+                        }, comm::select2(weak_exit_po, msg_po));
+                    if !continue { break; }
+                }
+                log(debug, "global monitor task is leaving weakend state");
+            };
+            log(debug, "global monitor task exiting");
         };
-        ret after_task_spawn_cb(msg_ch);
+        // once we have a chan to the monitor loop, we ask it for
+        // the libuv loop's async handle
+        let fetch_po = comm::port::<hl::high_level_loop>();
+        let fetch_ch = comm::chan(fetch_po);
+        comm::send(msg_ch, fetch_ch);
+        comm::recv(fetch_po)
     }
 }
 
-unsafe fn outer_global_loop_body(
-    msg_po: comm::port<hl::high_level_msg>,
-    weak_task_body_cb: fn~(
-        comm::port<()>,
-        comm::port<hl::high_level_msg>,
-        *libc::c_void,
-        hl::high_level_msg) -> bool) {
-    // we're going to use a single libuv-generated loop ptr
-    // for the duration of the process
-    let loop_ptr = ll::loop_new();
-
-    // data structure for loop goes here..
-
-    // immediately weaken the task this is running in.
-    priv::weaken_task() {|weak_exit_po|
-        // when we first enter this loop, we're going
-        // to wait on stand-by to receive a request to
-        // fire-up the libuv loop
-        let mut continue = true;
-        while continue {
-            log(debug, "in outer_loop...");
-            continue = either::either(
-                {|left_val|
-                    // bail out..
-                    // if we catch this msg at this point,
-                    // we should just be able to exit because
-                    // the loop isn't active
-                    log(debug, #fmt("weak_exit_po recv'd msg: %?",
-                                   left_val));
-                    false
-                }, {|right_val|
-                    weak_task_body_cb(weak_exit_po, msg_po, loop_ptr,
-                                      right_val)
-                }, comm::select2(weak_exit_po, msg_po));
-            log(debug,#fmt("GLOBAL LOOP EXITED, WAITING TO RESTART? %?",
-                       continue));
-        }
-    };
-
-    ll::loop_delete(loop_ptr);
-}
-
-unsafe fn monitor_task_loop_body(weak_exit_po_in: comm::port<()>,
-                          msg_po_in: comm::port<hl::high_level_msg>,
-                          loop_ptr: *libc::c_void,
-                          -first_interaction: hl::high_level_msg) -> bool {
-    // resend the msg to be handled in the select2 loop below..
-    comm::send(comm::chan(msg_po_in), first_interaction);
-
-    // our async_handle
-    let async_handle_po = comm::port::<*ll::uv_async_t>();
-    let async_handle_ch = comm::chan(async_handle_po);
-
-    // the msg_po that libuv will be receiving on..
-    let loop_msg_po = comm::port::<hl::high_level_msg>();
-    let loop_msg_po_ptr = ptr::addr_of(loop_msg_po);
-    let loop_msg_ch = comm::chan(loop_msg_po);
-
-    // the question of whether unsupervising this will even do any
-    // good is there.. but since this'll go into blocking in libuv with
-    // a quickness.. any errors that occur (including inside crust) will
-    // be segfaults.. so yeah.
+unsafe fn spawn_libuv_weak_task() -> (*ll::uv_async_t,
+                                      comm::chan<hl::high_level_msg>){
+    let exit_po = comm::port::<(*ll::uv_async_t,
+                              comm::chan<hl::high_level_msg>)>();
+    let exit_ch = comm::chan(exit_po);
+    
     task::spawn_sched(task::manual_threads(1u)) {||
-        let loop_msg_po_in = *loop_msg_po_ptr;
-        hl::run_high_level_loop(
-            loop_ptr,
-            loop_msg_po_in, // here the loop gets handed a different message
-                            // port, as we'll be receiving all of the messages
-                            // initially and then passing them on..
-            // before_run
-            {|async_handle|
-                log(debug,#fmt("monitor gl: before_run: async_handle %?",
-                              async_handle));
-                // when this is ran, our async_handle is set up, so let's
-                // do an async_send with it.. letting the loop know, once it
-                // starts, that is has work
-                ll::async_send(async_handle);
-                comm::send(async_handle_ch, copy(async_handle));
-            },
-            // before_msg_drain
-            {|async_handle|
-                log(debug,#fmt("monitor gl: b4_msg_drain: async_handle %?",
-                              async_handle));
-                true
-            },
-            // before_tear_down
-            {|async_handle|
-                log(debug,#fmt("monitor gl: b4_tear_down: async_handle %?",
-                              async_handle));
-            });
+        log(debug, "entering global libuv task");
+        let loop_ptr = ll::loop_new();
+        priv::weaken_task() {|weak_exit_po|
+            log(debug, #fmt("global libuv task is now weak %?",
+                            weak_exit_po));
+            let loop_msg_po = comm::port::<hl::high_level_msg>();
+            let loop_msg_ch = comm::chan(loop_msg_po);
+            hl::run_high_level_loop(
+                loop_ptr,
+                loop_msg_po,
+                // before_run
+                {|async_handle|
+                    log(debug,#fmt("global libuv: before_run %?",
+                                  async_handle));
+                    let out_data = (async_handle, loop_msg_ch);
+                    comm::send(exit_ch, out_data);
+                },
+                // before_msg_process
+                {|async_handle, loop_active|
+                    log(debug,#fmt("global libuv: before_msg_drain %? %?",
+                                  async_handle, loop_active));
+                    true
+                },
+                // before_tear_down
+                {|async_handle|
+                    log(debug,#fmt("libuv task: before_tear_down %?",
+                                  async_handle));
+                }
+            );
+            log(debug, "global libuv task is leaving weakened state");
+        };
+        ll::loop_delete(loop_ptr);
+        log(debug, "global libuv task exiting");
     };
 
-    // our loop is set up, so let's emit the handle back out to our users..
-    let async_handle = comm::recv(async_handle_po);
-    // supposed to return a bool to indicate to the enclosing loop whether
-    // it should continue or not..
-    let mut continue_inner_loop = true;
-    let mut didnt_get_hl_bailout = true;
-    while continue_inner_loop {
-        log(debug, "monitor task inner loop.. about to block on select2");
-        continue_inner_loop = either::either(
-            {|left_val|
-                // bail out..
-                log(debug, #fmt("monitor inner weak_exit_po recv'd msg: %?",
-                               left_val));
-                // TODO: make loop bail out
-                didnt_get_hl_bailout = false;
-                false
-            }, {|right_val|
-                // wake up our inner loop and pass it a msg..
-                comm::send(loop_msg_ch, copy(right_val));
-                ll::async_send(async_handle);
-                true
-            }, comm::select2(weak_exit_po_in, msg_po_in)
-        )
-    }
-    didnt_get_hl_bailout
-}
-
-unsafe fn single_task_loop_body(weak_exit_po_in: comm::port<()>,
-                          msg_po_in: comm::port<hl::high_level_msg>,
-                          loop_ptr: *libc::c_void,
-                          -first_interaction: hl::high_level_msg) -> bool {
-    // resend the msg
-    comm::send(comm::chan(msg_po_in), first_interaction);
-
-    // black magic
-    let weak_exit_po_ptr = ptr::addr_of(weak_exit_po_in);
-    hl::run_high_level_loop(
-        loop_ptr,
-        msg_po_in,
-        // before_run
-        {|async_handle|
-            log(debug,#fmt("global_loop before_run: async_handle %?",
-                          async_handle));
-            // set the handle as the global
-            set_global_async_handle(0u as *ll::uv_async_t,
-                                    async_handle);
-            // when this is ran, our async_handle is set up, so let's
-            // do an async_send with it
-            ll::async_send(async_handle);
-        },
-        // before_msg_drain
-        {|async_handle|
-            log(debug,#fmt("global_loop before_msg_drain: async_handle %?",
-                          async_handle));
-            let weak_exit_po = *weak_exit_po_ptr;
-            if(comm::peek(weak_exit_po)) {
-                // if this is true, immediately bail and return false, causing
-                // the libuv loop to start tearing down
-                log(debug,"got weak_exit meg inside libuv loop");
-                comm::recv(weak_exit_po);
-                false
-            }
-            // if no weak_exit_po msg is received, then we'll let the
-            // loop continue
-            else {
-                true
-            }
-        },
-        // before_tear_down
-        {|async_handle|
-            log(debug,#fmt("global_loop before_tear_down: async_handle %?",
-                          async_handle));
-            set_global_async_handle(async_handle,
-                                    0 as *ll::uv_async_t);
-        });
-    // supposed to return a bool to indicate to the enclosing loop whether
-    // it should continue or not..
-    ret true;
-}
-
-unsafe fn get_global_async_handle_native_representation()
-    -> *libc::uintptr_t {
-    ret rustrt::rust_uv_get_kernel_global_async_handle();
-}
-
-unsafe fn get_global_async_handle() -> *ll::uv_async_t {
-    ret (*get_global_async_handle_native_representation()) as *ll::uv_async_t;
-}
-
-unsafe fn set_global_async_handle(old: *ll::uv_async_t,
-                           new_ptr: *ll::uv_async_t) {
-    rustrt::rust_compare_and_swap_ptr(
-        get_global_async_handle_native_representation(),
-        old as libc::uintptr_t,
-        new_ptr as libc::uintptr_t);
+    comm::recv(exit_po)
 }
 
 #[cfg(test)]
@@ -320,8 +157,7 @@ mod test {
         let hl_loop = get_gl();
         hl::interact(hl_loop) {|loop_ptr|
             log(debug, "closing timer");
-            //ll::close(timer_ptr as *libc::c_void, simple_timer_close_cb);
-            hl::unref_and_close(hl_loop, timer_ptr, simple_timer_close_cb);
+            ll::close(timer_ptr, simple_timer_close_cb);
             log(debug, "about to deref exit_ch_ptr");
             log(debug, "after msg sent on deref'd exit_ch");
         };
@@ -340,7 +176,6 @@ mod test {
             log(debug, "user code inside interact loop!!!");
             let init_status = ll::timer_init(loop_ptr, timer_ptr);
             if(init_status == 0i32) {
-                hl::ref(hl_loop, timer_ptr);
                 ll::set_data_for_uv_handle(
                     timer_ptr as *libc::c_void,
                     exit_ch_ptr as *libc::c_void);
@@ -359,13 +194,39 @@ mod test {
         comm::recv(exit_po);
         log(debug, "global_loop timer test: msg recv on exit_po, done..");
     }
+
     #[test]
-    #[ignore]
-    fn test_uv_global_loop_high_level_global_timer() unsafe {
+    fn test_gl_uv_global_loop_high_level_global_timer() unsafe {
         let hl_loop = get_gl();
+        let exit_po = comm::port::<()>();
+        let exit_ch = comm::chan(exit_po);
         task::spawn_sched(task::manual_threads(1u), {||
             impl_uv_hl_simple_timer(hl_loop);
+            comm::send(exit_ch, ());
         });
         impl_uv_hl_simple_timer(hl_loop);
+        comm::recv(exit_po);
+    }
+
+    // keeping this test ignored until some kind of stress-test-harness
+    // is set up for the build bots
+    #[test]
+    #[ignore]
+    fn test_stress_gl_uv_global_loop_high_level_global_timer() unsafe {
+        let hl_loop = get_gl();
+        let exit_po = comm::port::<()>();
+        let exit_ch = comm::chan(exit_po);
+        let cycles = 5000u;
+        iter::repeat(cycles) {||
+            task::spawn_sched(task::manual_threads(1u), {||
+                impl_uv_hl_simple_timer(hl_loop);
+                comm::send(exit_ch, ());
+            });
+        };
+        iter::repeat(cycles) {||
+            comm::recv(exit_po);
+        };
+        log(debug, "test_stress_gl_uv_global_loop_high_level_global_timer"+
+            " exiting sucessfully!");
     }
 }
\ No newline at end of file
diff --git a/src/libstd/uv_hl.rs b/src/libstd/uv_hl.rs
index 8ce0fb8e5c7..75c5c6ebea4 100644
--- a/src/libstd/uv_hl.rs
+++ b/src/libstd/uv_hl.rs
@@ -6,8 +6,8 @@ provide a high-level, abstracted interface to some set of
 libuv functionality.
 "];
 
-export high_level_loop, hl_loop_ext, high_level_msg;
-export run_high_level_loop, interact, ref, unref, unref_and_close;
+export high_level_loop, high_level_msg;
+export run_high_level_loop, interact;
 
 import ll = uv_ll;
 
@@ -26,51 +26,15 @@ enum high_level_loop {
     simple_task_loop({
         async_handle: *ll::uv_async_t,
         op_chan: comm::chan<high_level_msg>
-    }),
-    single_task_loop({
-        async_handle: **ll::uv_async_t,
-        op_chan: comm::chan<high_level_msg>
-    }),
-    monitor_task_loop({
-        op_chan: comm::chan<high_level_msg>
     })
 }
 
-impl hl_loop_ext for high_level_loop {
-    fn async_handle() -> **ll::uv_async_t {
-        alt self {
-          single_task_loop({async_handle, op_chan}) {
-            ret async_handle;
-          }
-          _ {
-            fail "variant of hl::high_level_loop that doesn't include" +
-                "an async_handle field";
-          }
-        }
-    }
-    fn op_chan() -> comm::chan<high_level_msg> {
-        alt self {
-          single_task_loop({async_handle, op_chan}) {
-            ret op_chan;
-          }
-          monitor_task_loop({op_chan}) {
-            ret op_chan;
-          }
-          simple_task_loop({async_handle, op_chan}) {
-            ret op_chan;
-          }
-        }
-    }
-}
-
 #[doc="
 Represents the range of interactions with a `high_level_loop`
 "]
 enum high_level_msg {
     interaction (fn~(*libc::c_void)),
-    ref_handle (*libc::c_void),
-    manual_unref_handle (*libc::c_void, option<*u8>),
-    tear_down
+    teardown_loop
 }
 
 #[doc = "
@@ -93,7 +57,8 @@ provided `async_handle`. `uv_run` should return shortly after
 unsafe fn run_high_level_loop(loop_ptr: *libc::c_void,
                               msg_po: comm::port<high_level_msg>,
                               before_run: fn~(*ll::uv_async_t),
-                              before_msg_drain: fn~(*ll::uv_async_t) -> bool,
+                              before_msg_process:
+                                fn~(*ll::uv_async_t, bool) -> bool,
                               before_tear_down: fn~(*ll::uv_async_t)) {
     // set up the special async handle we'll use to allow multi-task
     // communication with this loop
@@ -106,11 +71,9 @@ unsafe fn run_high_level_loop(loop_ptr: *libc::c_void,
     let data: hl_loop_data = default_gl_data({
         async_handle: async_handle,
         mut active: true,
-        before_msg_drain: before_msg_drain,
+        before_msg_process: before_msg_process,
         before_tear_down: before_tear_down,
-        msg_po_ptr: ptr::addr_of(msg_po),
-        mut refd_handles: [mut],
-        mut unrefd_handles: [mut]
+        msg_po_ptr: ptr::addr_of(msg_po)
     });
     let data_ptr = ptr::addr_of(data);
     ll::set_data_for_uv_handle(async_handle, data_ptr);
@@ -143,44 +106,6 @@ unsafe fn interact(a_loop: high_level_loop,
     send_high_level_msg(a_loop, interaction(cb));
 }
 
-iface uv_handle_manager<T> {
-    fn init() -> T;
-}
-
-type safe_handle_fields<T> = {
-    hl_loop: high_level_loop,
-    handle: T,
-    close_cb: *u8
-};
-
-/*fn safe_handle<T>(a_loop: high_level_loop,
-                  handle_val: T,
-                  handle_init_cb: fn~(*libc::c_void, *T),
-                  close_cb: *u8) {
-
-resource safe_handle_container<T>(handle_fields: safe_handle_fields<T>) {
-}
-}*/
-
-
-#[doc="
-Needs to be encapsulated within `safe_handle`
-"]
-fn ref<T>(hl_loop: high_level_loop, handle: *T) unsafe {
-    send_high_level_msg(hl_loop, ref_handle(handle as *libc::c_void));
-}
-#[doc="
-Needs to be encapsulated within `safe_handle`
-"]
-fn unref<T>(hl_loop: high_level_loop, handle: *T) unsafe {
-    send_high_level_msg(hl_loop, manual_unref_handle(handle as *libc::c_void,
-                                                   none));
-}
-fn unref_and_close<T>(hl_loop: high_level_loop, handle: *T, cb: *u8) unsafe {
-    send_high_level_msg(hl_loop, manual_unref_handle(handle as *libc::c_void,
-                                                   some(cb)));
-}
-
 // INTERNAL API
 
 // data that lives for the lifetime of the high-evel oo
@@ -188,36 +113,26 @@ enum hl_loop_data {
     default_gl_data({
         async_handle: *ll::uv_async_t,
         mut active: bool,
-        before_msg_drain: fn~(*ll::uv_async_t) -> bool,
+        before_msg_process: fn~(*ll::uv_async_t, bool) -> bool,
         before_tear_down: fn~(*ll::uv_async_t),
-        msg_po_ptr: *comm::port<high_level_msg>,
-        mut refd_handles: [mut *libc::c_void],
-        mut unrefd_handles: [mut *libc::c_void]})
+        msg_po_ptr: *comm::port<high_level_msg>})
 }
 
 unsafe fn send_high_level_msg(hl_loop: high_level_loop,
-                              -msg: high_level_msg) unsafe {
-    comm::send(hl_loop.op_chan(), msg);
+                              -msg: high_level_msg) {
+    let op_chan = alt hl_loop{simple_task_loop({async_handle, op_chan}){
+      op_chan}};
+    comm::send(op_chan, msg);
 
     // if the global async handle == 0, then that means
     // the loop isn't active, so we don't need to wake it up,
     // (the loop's enclosing task should be blocking on a message
     // receive on this port)
     alt hl_loop {
-      single_task_loop({async_handle, op_chan}) {
-        if ((*async_handle) != 0 as *ll::uv_async_t) {
-            log(debug,"global async handle != 0, waking up loop..");
-            ll::async_send((*async_handle));
-        }
-        else {
-            log(debug,"GLOBAL ASYNC handle == 0");
-        }
-      }
       simple_task_loop({async_handle, op_chan}) {
         log(debug,"simple async handle != 0, waking up loop..");
         ll::async_send((async_handle));
       }
-      _ {}
     }
 }
 
@@ -228,71 +143,57 @@ unsafe fn send_high_level_msg(hl_loop: high_level_loop,
 // data member
 crust fn high_level_wake_up_cb(async_handle: *ll::uv_async_t,
                                status: int) unsafe {
-    // nothing here, yet.
     log(debug, #fmt("high_level_wake_up_cb crust.. handle: %? status: %?",
                      async_handle, status));
     let loop_ptr = ll::get_loop_for_uv_handle(async_handle);
     let data = ll::get_data_for_uv_handle(async_handle) as *hl_loop_data;
-    // we check to see if the loop is "active" (the loop is set to
-    // active = false the first time we realize we need to 'tear down',
-    // set subsequent calls to the global async handle may be triggered
-    // before all of the uv_close() calls are processed and loop exits
-    // on its own. So if the loop isn't active, we won't run the user's
-    // on_wake callback (and, consequently, let messages pile up, probably
-    // in the loops msg_po)
-    if (*data).active {
-        log(debug, "before on_wake");
-        let mut do_msg_drain = (*data).before_msg_drain(async_handle);
-        let mut continue = true;
-        if do_msg_drain {
-            let msg_po = *((*data).msg_po_ptr);
-            if comm::peek(msg_po) {
-                // if this is true, we'll iterate over the
-                // msgs waiting in msg_po until there's no more
-                log(debug,"got msg_po");
-                while(continue) {
-                    log(debug,"before alt'ing on high_level_msg");
-                    alt comm::recv(msg_po) {
+    alt (*data).active {
+      true {
+        let msg_po = *((*data).msg_po_ptr);
+        alt comm::peek(msg_po) {
+          true {
+            loop {
+                let msg = comm::recv(msg_po);
+                alt (*data).active {
+                  true {
+                    alt msg {
                       interaction(cb) {
-                        log(debug,"got interaction, before cb..");
-                        // call it..
+                        (*data).before_msg_process(async_handle,
+                                                   (*data).active);
                         cb(loop_ptr);
-                        log(debug,"after calling cb");
                       }
-                      ref_handle(handle) {
-                        high_level_ref(data, handle);
-                      }
-                      manual_unref_handle(handle, user_close_cb) {
-                        high_level_unref(data, handle, true, user_close_cb);
-                      }
-                      tear_down {
-                        log(debug,"incoming hl_msg: got tear_down");
+                      teardown_loop {
+                        begin_teardown(data);
                       }
                     }
-                    continue = comm::peek(msg_po);
+                  }
+                  false {
+                    // drop msg ?
+                  }
                 }
+                if !comm::peek(msg_po) { break; }
             }
-            else {
-                log(debug, "in hl wake_cb, no pending messages");
-            }
-        }
-        log(debug, #fmt("after on_wake, continue? %?", continue));
-        if !do_msg_drain {
-            high_level_tear_down(data);
+          }
+          false {
+            // no pending msgs
+          }
         }
+      }
+      false {
+        // loop not active
+      }
     }
 }
 
 crust fn tear_down_close_cb(handle: *ll::uv_async_t) unsafe {
-    log(debug, #fmt("tear_down_close_cb called, closing handle at %?",
-                    handle));
-    let data = ll::get_data_for_uv_handle(handle) as *hl_loop_data;
-    if vec::len((*data).refd_handles) > 0u {
-        fail "Didn't unref all high-level handles";
-    }
+    let loop_ptr = ll::get_loop_for_uv_handle(handle);
+    let loop_refs = ll::loop_refcount(loop_ptr);
+    log(debug, #fmt("tear_down_close_cb called, closing handle at %? refs %?",
+                    handle, loop_refs));
+    assert loop_refs == 1i32;
 }
 
-fn high_level_tear_down(data: *hl_loop_data) unsafe {
+fn begin_teardown(data: *hl_loop_data) unsafe {
     log(debug, "high_level_tear_down() called, close async_handle");
     // call user-suppled before_tear_down cb
     let async_handle = (*data).async_handle;
@@ -300,90 +201,6 @@ fn high_level_tear_down(data: *hl_loop_data) unsafe {
     ll::close(async_handle as *libc::c_void, tear_down_close_cb);
 }
 
-unsafe fn high_level_ref(data: *hl_loop_data, handle: *libc::c_void) {
-    log(debug,"incoming hl_msg: got ..ref_handle");
-    let mut refd_handles = (*data).refd_handles;
-    let mut unrefd_handles = (*data).unrefd_handles;
-    let handle_already_refd = refd_handles.contains(handle);
-    if handle_already_refd {
-        fail "attempt to do a high-level ref an already ref'd handle";
-    }
-    let handle_already_unrefd = unrefd_handles.contains(handle);
-    // if we are ref'ing a handle (by ptr) that was already unref'd,
-    // probably
-    if handle_already_unrefd {
-        let last_idx = vec::len(unrefd_handles) - 1u;
-        let handle_idx = vec::position_elem(unrefd_handles, handle);
-        alt handle_idx {
-          none {
-            fail "trying to remove handle that isn't in unrefd_handles";
-          }
-          some(idx) {
-            unrefd_handles[idx] <-> unrefd_handles[last_idx];
-            vec::pop(unrefd_handles);
-          }
-        }
-        (*data).unrefd_handles = unrefd_handles;
-    }
-    refd_handles += [handle];
-    (*data).refd_handles = refd_handles;
-}
-
-unsafe fn high_level_unref(data: *hl_loop_data, handle: *libc::c_void,
-                   manual_unref: bool, user_close_cb: option<*u8>) {
-    log(debug,"incoming hl_msg: got auto_unref_handle");
-    let mut refd_handles = (*data).refd_handles;
-    let mut unrefd_handles = (*data).unrefd_handles;
-    log(debug, #fmt("refs: %?, unrefs %? handle %?", vec::len(refd_handles),
-                    vec::len(unrefd_handles), handle));
-    let handle_already_refd = refd_handles.contains(handle);
-    if !handle_already_refd {
-        fail "attempting to high-level unref an untracked handle";
-    }
-    let double_unref = unrefd_handles.contains(handle);
-    if double_unref {
-        log(debug, "double unref encountered");
-        if manual_unref {
-            // will allow a user to manual unref, but only signal
-            // a fail when a double-unref is caused by a user
-            fail "attempting to high-level unref an unrefd handle";
-        }
-        else {
-            log(debug, "not failing...");
-        }
-    }
-    else {
-        log(debug, "attempting to unref handle");
-        alt user_close_cb {
-          some(cb) {
-            ll::close(handle, cb);
-          }
-          none { }
-        }
-        let last_idx = vec::len(refd_handles) - 1u;
-        let handle_idx = vec::position_elem(refd_handles, handle);
-        alt handle_idx {
-          none {
-            fail "trying to remove handle that isn't in refd_handles";
-          }
-          some(idx) {
-            refd_handles[idx] <-> refd_handles[last_idx];
-            vec::pop(refd_handles);
-          }
-        }
-        (*data).refd_handles = refd_handles;
-        unrefd_handles += [handle];
-        (*data).unrefd_handles = unrefd_handles;
-        if vec::len(refd_handles) == 0u {
-            log(debug, "0 referenced handles, start loop teardown");
-            high_level_tear_down(data);
-        }
-        else {
-            log(debug, "more than 0 referenced handles");
-        }
-    }
-
-}
 #[cfg(test)]
 mod test {
     crust fn async_close_cb(handle: *ll::uv_async_t) unsafe {
@@ -397,7 +214,7 @@ mod test {
         log(debug, #fmt("async_handle_cb handle %? status %?",handle,status));
         let hl_loop = (*(ll::get_data_for_uv_handle(handle)
                         as *ah_data)).hl_loop;
-        unref_and_close(hl_loop, handle, async_close_cb);
+        ll::close(handle, async_close_cb);
     }
     type ah_data = {
         hl_loop: high_level_loop,
@@ -414,7 +231,6 @@ mod test {
         };
         let ah_data_ptr = ptr::addr_of(ah_data);
         interact(hl_loop) {|loop_ptr|
-            ref(hl_loop, ah_ptr);
             ll::async_init(loop_ptr, ah_ptr, async_handle_cb);
             ll::set_data_for_uv_handle(ah_ptr, ah_data_ptr as *libc::c_void);
             ll::async_send(ah_ptr);
@@ -446,9 +262,9 @@ mod test {
                     }));
                 },
                 // before_msg_drain
-                {|async_handle|
-                    log(debug,#fmt("hltest before_msg_drain: async_handle %?",
-                                  async_handle));
+                {|async_handle, status|
+                    log(debug,#fmt("hltest before_msg_drain: handle %? %?",
+                                  async_handle, status));
                     true
                 },
                 // before_tear_down
@@ -473,7 +289,6 @@ mod test {
     }
 
     #[test]
-    #[ignore]
     fn test_uv_hl_async() unsafe {
         let exit_po = comm::port::<()>();
         let exit_ch = comm::chan(exit_po);
@@ -485,27 +300,30 @@ mod test {
         // under race-condition type situations.. this ensures that the loop
         // lives until, at least, all of the impl_uv_hl_async() runs have been
         // called, at least.
-        let lifetime_handle = ll::async_t();
-        let lifetime_handle_ptr = ptr::addr_of(lifetime_handle);
-        interact(hl_loop) {|loop_ptr|
-            ref(hl_loop, lifetime_handle_ptr);
-            ll::async_init(loop_ptr, lifetime_handle_ptr,
-                          lifetime_async_callback);
-        };
-
+        let work_exit_po = comm::port::<()>();
+        let work_exit_ch = comm::chan(work_exit_po);
         iter::repeat(7u) {||
             task::spawn_sched(task::manual_threads(1u), {||
                 impl_uv_hl_async(hl_loop);
+                comm::send(work_exit_ch, ());
             });
         };
-        impl_uv_hl_async(hl_loop);
-        impl_uv_hl_async(hl_loop);
-        impl_uv_hl_async(hl_loop);
-        interact(hl_loop) {|loop_ptr|
-            ll::close(lifetime_handle_ptr, lifetime_handle_close);
-            unref(hl_loop, lifetime_handle_ptr);
-            log(debug, "close and unref lifetime handle");
+        iter::repeat(7u) {||
+            comm::recv(work_exit_po);
         };
+        log(debug, "sending teardown_loop msg..");
+        // the teardown msg usually comes, in the case of the global loop,
+        // as a result of receiving a msg on the weaken_task port. but,
+        // anyone rolling their own high_level_loop can decide when to
+        // send the msg. it's assert and barf, though, if all of your
+        // handles aren't uv_close'd first
+        alt hl_loop {
+          simple_task_loop({async_handle, op_chan}) {
+            comm::send(op_chan, teardown_loop);
+            ll::async_send(async_handle);
+          }
+        }
         comm::recv(exit_po);
+        log(debug, "after recv on exit_po.. exiting..");
     }
 }