about summary refs log tree commit diff
diff options
context:
space:
mode:
authorBrian Anderson <banderson@mozilla.com>2013-01-19 23:38:17 -0800
committerBrian Anderson <banderson@mozilla.com>2013-01-23 17:35:34 -0800
commitb9608fe4232c4014daa540849d471b1791b41fa6 (patch)
tree10bd75fb21036ba97b406550c4c21ebe526ae2d5
parentfb9299346af9b951890db80e47eb65625997f160 (diff)
downloadrust-b9608fe4232c4014daa540849d471b1791b41fa6.tar.gz
rust-b9608fe4232c4014daa540849d471b1791b41fa6.zip
std: Convert uv_global_loop to use pipes
-rw-r--r--src/libcore/private/global.rs15
-rw-r--r--src/libstd/flatpipes.rs3
-rw-r--r--src/libstd/net_ip.rs6
-rw-r--r--src/libstd/net_tcp.rs74
-rw-r--r--src/libstd/timer.rs21
-rw-r--r--src/libstd/uv_global_loop.rs106
-rw-r--r--src/libstd/uv_iotask.rs72
-rw-r--r--src/rt/rust_kernel.cpp1
-rw-r--r--src/rt/rust_kernel.h5
-rw-r--r--src/rt/rust_uv.cpp9
-rw-r--r--src/rt/rustrt.def.in1
-rw-r--r--src/test/run-pass/pipe-detect-term.rs2
-rw-r--r--src/test/run-pass/pipe-select.rs2
-rw-r--r--src/test/run-pass/pipe-sleep.rs2
14 files changed, 172 insertions, 147 deletions
diff --git a/src/libcore/private/global.rs b/src/libcore/private/global.rs
index 8c1500353ae..d9230e08dc7 100644
--- a/src/libcore/private/global.rs
+++ b/src/libcore/private/global.rs
@@ -114,6 +114,21 @@ unsafe fn global_data_modify_<T: Owned>(
     }
 }
 
+pub unsafe fn global_data_clone<T: Owned Clone>(
+    key: GlobalDataKey<T>) -> Option<T> {
+    let mut maybe_clone: Option<T> = None;
+    do global_data_modify(key) |current| {
+        match &current {
+            &Some(~ref value) => {
+                maybe_clone = Some(value.clone());
+            }
+            &None => ()
+        }
+        current
+    }
+    return maybe_clone;
+}
+
 // GlobalState is a map from keys to unique pointers and a
 // destructor. Keys are pointers derived from the type of the
 // global value.  There is a single GlobalState instance per runtime.
diff --git a/src/libstd/flatpipes.rs b/src/libstd/flatpipes.rs
index 0607055db5c..cc788dfee22 100644
--- a/src/libstd/flatpipes.rs
+++ b/src/libstd/flatpipes.rs
@@ -782,7 +782,6 @@ mod test {
         let (finish_port, finish_chan) = pipes::stream();
 
         let addr = ip::v4::parse_addr("127.0.0.1");
-        let iotask = uv::global_loop::get();
 
         let begin_connect_chan = Cell(move begin_connect_chan);
         let accept_chan = Cell(move accept_chan);
@@ -790,6 +789,7 @@ mod test {
         // The server task
         do task::spawn |copy addr, move begin_connect_chan,
                         move accept_chan| {
+            let iotask = &uv::global_loop::get();
             let begin_connect_chan = begin_connect_chan.take();
             let accept_chan = accept_chan.take();
             let listen_res = do tcp::listen(
@@ -821,6 +821,7 @@ mod test {
             begin_connect_port.recv();
 
             debug!("connecting");
+            let iotask = &uv::global_loop::get();
             let connect_result = tcp::connect(copy addr, port, iotask);
             assert connect_result.is_ok();
             let sock = result::unwrap(move connect_result);
diff --git a/src/libstd/net_ip.rs b/src/libstd/net_ip.rs
index fad583a668b..080c5514ac8 100644
--- a/src/libstd/net_ip.rs
+++ b/src/libstd/net_ip.rs
@@ -114,7 +114,7 @@ enum IpGetAddrErr {
  * a vector of `ip_addr` results, in the case of success, or an error
  * object in the case of failure
  */
-pub fn get_addr(node: &str, iotask: iotask)
+pub fn get_addr(node: &str, iotask: &iotask)
         -> result::Result<~[IpAddr], IpGetAddrErr> {
     do oldcomm::listen |output_ch| {
         do str::as_buf(node) |node_ptr, len| unsafe {
@@ -413,7 +413,7 @@ mod test {
     #[ignore(reason = "valgrind says it's leaky")]
     fn test_ip_get_addr() {
         let localhost_name = ~"localhost";
-        let iotask = uv::global_loop::get();
+        let iotask = &uv::global_loop::get();
         let ga_result = get_addr(localhost_name, iotask);
         if result::is_err(&ga_result) {
             fail ~"got err result from net::ip::get_addr();"
@@ -439,7 +439,7 @@ mod test {
     #[ignore(reason = "valgrind says it's leaky")]
     fn test_ip_get_addr_bad_input() {
         let localhost_name = ~"sjkl234m,./sdf";
-        let iotask = uv::global_loop::get();
+        let iotask = &uv::global_loop::get();
         let ga_result = get_addr(localhost_name, iotask);
         assert result::is_err(&ga_result);
     }
diff --git a/src/libstd/net_tcp.rs b/src/libstd/net_tcp.rs
index 847962c1773..75c7a7cbfb9 100644
--- a/src/libstd/net_tcp.rs
+++ b/src/libstd/net_tcp.rs
@@ -142,7 +142,7 @@ pub enum TcpConnectErrData {
  * `net::tcp::tcp_connect_err_data` instance will be returned
  */
 pub fn connect(input_ip: ip::IpAddr, port: uint,
-           iotask: IoTask)
+               iotask: &IoTask)
     -> result::Result<TcpSocket, TcpConnectErrData> unsafe {
     let result_po = oldcomm::Port::<ConnAttempt>();
     let closed_signal_po = oldcomm::Port::<()>();
@@ -164,7 +164,7 @@ pub fn connect(input_ip: ip::IpAddr, port: uint,
             ip::Ipv4(_) => { false }
             ip::Ipv6(_) => { true }
         },
-        iotask: iotask
+        iotask: iotask.clone()
     };
     let socket_data_ptr = ptr::addr_of(&(*socket_data));
     log(debug, fmt!("tcp_connect result_ch %?", conn_data.result_ch));
@@ -496,17 +496,17 @@ pub fn accept(new_conn: TcpNewConnection)
         let server_data_ptr = uv::ll::get_data_for_uv_handle(
             server_handle_ptr) as *TcpListenFcData;
         let reader_po = oldcomm::Port();
-        let iotask = (*server_data_ptr).iotask;
+        let iotask = &(*server_data_ptr).iotask;
         let stream_handle_ptr = malloc_uv_tcp_t();
         *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
-        let client_socket_data = @{
+        let client_socket_data: @TcpSocketData = @{
             reader_po: reader_po,
             reader_ch: oldcomm::Chan(&reader_po),
             stream_handle_ptr : stream_handle_ptr,
             connect_req : uv::ll::connect_t(),
             write_req : uv::ll::write_t(),
             ipv6: (*server_data_ptr).ipv6,
-            iotask : iotask
+            iotask : iotask.clone()
         };
         let client_socket_data_ptr = ptr::addr_of(&(*client_socket_data));
         let client_stream_handle_ptr =
@@ -588,10 +588,10 @@ pub fn accept(new_conn: TcpNewConnection)
  * of listen exiting because of an error
  */
 pub fn listen(host_ip: ip::IpAddr, port: uint, backlog: uint,
-          iotask: IoTask,
-          on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>),
-          new_connect_cb: fn~(TcpNewConnection,
-                               oldcomm::Chan<Option<TcpErrData>>))
+              iotask: &IoTask,
+              on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>),
+              new_connect_cb: fn~(TcpNewConnection,
+                                  oldcomm::Chan<Option<TcpErrData>>))
     -> result::Result<(), TcpListenErrData> unsafe {
     do listen_common(move host_ip, port, backlog, iotask,
                      move on_establish_cb)
@@ -606,7 +606,7 @@ pub fn listen(host_ip: ip::IpAddr, port: uint, backlog: uint,
 }
 
 fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint,
-          iotask: IoTask,
+          iotask: &IoTask,
           on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>),
           on_connect_cb: fn~(*uv::ll::uv_tcp_t))
     -> result::Result<(), TcpListenErrData> unsafe {
@@ -615,12 +615,12 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint,
     let kill_ch = oldcomm::Chan(&kill_po);
     let server_stream = uv::ll::tcp_t();
     let server_stream_ptr = ptr::addr_of(&server_stream);
-    let server_data = {
+    let server_data: TcpListenFcData = {
         server_stream_ptr: server_stream_ptr,
         stream_closed_ch: oldcomm::Chan(&stream_closed_po),
         kill_ch: kill_ch,
         on_connect_cb: move on_connect_cb,
-        iotask: iotask,
+        iotask: iotask.clone(),
         ipv6: match &host_ip {
             &ip::Ipv4(_) => { false }
             &ip::Ipv6(_) => { true }
@@ -895,7 +895,7 @@ fn tear_down_socket_data(socket_data: @TcpSocketData) unsafe {
     };
     let close_data_ptr = ptr::addr_of(&close_data);
     let stream_handle_ptr = (*socket_data).stream_handle_ptr;
-    do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe {
+    do iotask::interact(&(*socket_data).iotask) |loop_ptr| unsafe {
         log(debug, fmt!("interact dtor for tcp_socket stream %? loop %?",
             stream_handle_ptr, loop_ptr));
         uv::ll::set_data_for_uv_handle(stream_handle_ptr,
@@ -916,7 +916,7 @@ fn read_common_impl(socket_data: *TcpSocketData, timeout_msecs: uint)
     use timer;
 
     log(debug, ~"starting tcp::read");
-    let iotask = (*socket_data).iotask;
+    let iotask = &(*socket_data).iotask;
     let rs_result = read_start_common_impl(socket_data);
     if result::is_err(&rs_result) {
         let err_data = result::get_err(&rs_result);
@@ -956,7 +956,7 @@ fn read_stop_common_impl(socket_data: *TcpSocketData) ->
     let stream_handle_ptr = (*socket_data).stream_handle_ptr;
     let stop_po = oldcomm::Port::<Option<TcpErrData>>();
     let stop_ch = oldcomm::Chan(&stop_po);
-    do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe {
+    do iotask::interact(&(*socket_data).iotask) |loop_ptr| unsafe {
         log(debug, ~"in interact cb for tcp::read_stop");
         match uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) {
           0i32 => {
@@ -984,7 +984,7 @@ fn read_start_common_impl(socket_data: *TcpSocketData)
     let start_po = oldcomm::Port::<Option<uv::ll::uv_err_data>>();
     let start_ch = oldcomm::Chan(&start_po);
     log(debug, ~"in tcp::read_start before interact loop");
-    do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe {
+    do iotask::interact(&(*socket_data).iotask) |loop_ptr| unsafe {
         log(debug, fmt!("in tcp::read_start interact cb %?", loop_ptr));
         match uv::ll::read_start(stream_handle_ptr as *uv::ll::uv_stream_t,
                                on_alloc_cb,
@@ -1024,7 +1024,7 @@ fn write_common_impl(socket_data_ptr: *TcpSocketData,
         result_ch: oldcomm::Chan(&result_po)
     };
     let write_data_ptr = ptr::addr_of(&write_data);
-    do iotask::interact((*socket_data_ptr).iotask) |loop_ptr| unsafe {
+    do iotask::interact(&(*socket_data_ptr).iotask) |loop_ptr| unsafe {
         log(debug, fmt!("in interact cb for tcp::write %?", loop_ptr));
         match uv::ll::write(write_req_ptr,
                           stream_handle_ptr,
@@ -1369,7 +1369,7 @@ pub mod test {
         }
     }
     pub fn impl_gl_tcp_ipv4_server_and_client() {
-        let hl_loop = uv::global_loop::get();
+        let hl_loop = &uv::global_loop::get();
         let server_ip = ~"127.0.0.1";
         let server_port = 8888u;
         let expected_req = ~"ping";
@@ -1381,6 +1381,7 @@ pub mod test {
         let cont_po = oldcomm::Port::<()>();
         let cont_ch = oldcomm::Chan(&cont_po);
         // server
+        let hl_loop_clone = hl_loop.clone();
         do task::spawn_sched(task::ManualThreads(1u)) {
             let actual_req = do oldcomm::listen |server_ch| {
                 run_tcp_test_server(
@@ -1389,7 +1390,7 @@ pub mod test {
                     expected_resp,
                     server_ch,
                     cont_ch,
-                    hl_loop)
+                    &hl_loop_clone)
             };
             server_result_ch.send(actual_req);
         };
@@ -1415,7 +1416,7 @@ pub mod test {
         assert str::contains(actual_resp, expected_resp);
     }
     pub fn impl_gl_tcp_ipv4_get_peer_addr() {
-        let hl_loop = uv::global_loop::get();
+        let hl_loop = &uv::global_loop::get();
         let server_ip = ~"127.0.0.1";
         let server_port = 8887u;
         let expected_resp = ~"pong";
@@ -1426,6 +1427,7 @@ pub mod test {
         let cont_po = oldcomm::Port::<()>();
         let cont_ch = oldcomm::Chan(&cont_po);
         // server
+        let hl_loop_clone = hl_loop.clone();
         do task::spawn_sched(task::ManualThreads(1u)) {
             let actual_req = do oldcomm::listen |server_ch| {
                 run_tcp_test_server(
@@ -1434,7 +1436,7 @@ pub mod test {
                     expected_resp,
                     server_ch,
                     cont_ch,
-                    hl_loop)
+                    &hl_loop_clone)
             };
             server_result_ch.send(actual_req);
         };
@@ -1445,10 +1447,11 @@ pub mod test {
             let server_ip_addr = ip::v4::parse_addr(server_ip);
             let iotask = uv::global_loop::get();
             let connect_result = connect(move server_ip_addr, server_port,
-                                         iotask);
+                                         &iotask);
 
             let sock = result::unwrap(move connect_result);
 
+            debug!("testing peer address");
             // This is what we are actually testing!
             assert net::ip::format_addr(&sock.get_peer_addr()) ==
                 ~"127.0.0.1";
@@ -1457,12 +1460,14 @@ pub mod test {
             // Fulfill the protocol the test server expects
             let resp_bytes = str::to_bytes(~"ping");
             tcp_write_single(&sock, resp_bytes);
+            debug!("message sent");
             let read_result = sock.read(0u);
             client_ch.send(str::from_bytes(read_result.get()));
+            debug!("result read");
         };
     }
     pub fn impl_gl_tcp_ipv4_client_error_connection_refused() {
-        let hl_loop = uv::global_loop::get();
+        let hl_loop = &uv::global_loop::get();
         let server_ip = ~"127.0.0.1";
         let server_port = 8889u;
         let expected_req = ~"ping";
@@ -1482,7 +1487,7 @@ pub mod test {
         }
     }
     pub fn impl_gl_tcp_ipv4_server_address_in_use() {
-        let hl_loop = uv::global_loop::get();
+        let hl_loop = &uv::global_loop::get();
         let server_ip = ~"127.0.0.1";
         let server_port = 8890u;
         let expected_req = ~"ping";
@@ -1494,6 +1499,7 @@ pub mod test {
         let cont_po = oldcomm::Port::<()>();
         let cont_ch = oldcomm::Chan(&cont_po);
         // server
+        let hl_loop_clone = hl_loop.clone();
         do task::spawn_sched(task::ManualThreads(1u)) {
             let actual_req = do oldcomm::listen |server_ch| {
                 run_tcp_test_server(
@@ -1502,7 +1508,7 @@ pub mod test {
                     expected_resp,
                     server_ch,
                     cont_ch,
-                    hl_loop)
+                    &hl_loop_clone)
             };
             server_result_ch.send(actual_req);
         };
@@ -1533,7 +1539,7 @@ pub mod test {
         }
     }
     pub fn impl_gl_tcp_ipv4_server_access_denied() {
-        let hl_loop = uv::global_loop::get();
+        let hl_loop = &uv::global_loop::get();
         let server_ip = ~"127.0.0.1";
         let server_port = 80u;
         // this one should fail..
@@ -1553,7 +1559,7 @@ pub mod test {
     }
     pub fn impl_gl_tcp_ipv4_server_client_reader_writer() {
 
-        let iotask = uv::global_loop::get();
+        let iotask = &uv::global_loop::get();
         let server_ip = ~"127.0.0.1";
         let server_port = 8891u;
         let expected_req = ~"ping";
@@ -1565,6 +1571,7 @@ pub mod test {
         let cont_po = oldcomm::Port::<()>();
         let cont_ch = oldcomm::Chan(&cont_po);
         // server
+        let iotask_clone = iotask.clone();
         do task::spawn_sched(task::ManualThreads(1u)) {
             let actual_req = do oldcomm::listen |server_ch| {
                 run_tcp_test_server(
@@ -1573,7 +1580,7 @@ pub mod test {
                     expected_resp,
                     server_ch,
                     cont_ch,
-                    iotask)
+                    &iotask_clone)
             };
             server_result_ch.send(actual_req);
         };
@@ -1604,7 +1611,7 @@ pub mod test {
     pub fn impl_tcp_socket_impl_reader_handles_eof() {
         use core::io::{Reader,ReaderUtil};
 
-        let hl_loop = uv::global_loop::get();
+        let hl_loop = &uv::global_loop::get();
         let server_ip = ~"127.0.0.1";
         let server_port = 10041u;
         let expected_req = ~"GET /";
@@ -1616,6 +1623,7 @@ pub mod test {
         let cont_po = oldcomm::Port::<()>();
         let cont_ch = oldcomm::Chan(&cont_po);
         // server
+        let hl_loop_clone = hl_loop.clone();
         do task::spawn_sched(task::ManualThreads(1u)) {
             let actual_req = do oldcomm::listen |server_ch| {
                 run_tcp_test_server(
@@ -1624,7 +1632,7 @@ pub mod test {
                     expected_resp,
                     server_ch,
                     cont_ch,
-                    hl_loop)
+                    &hl_loop_clone)
             };
             server_result_ch.send(actual_req);
         };
@@ -1664,7 +1672,7 @@ pub mod test {
     fn run_tcp_test_server(server_ip: &str, server_port: uint, resp: ~str,
                           server_ch: oldcomm::Chan<~str>,
                           cont_ch: oldcomm::Chan<()>,
-                          iotask: IoTask) -> ~str {
+                          iotask: &IoTask) -> ~str {
         let server_ip_addr = ip::v4::parse_addr(server_ip);
         let listen_result = listen(move server_ip_addr, server_port, 128,
                                    iotask,
@@ -1751,7 +1759,7 @@ pub mod test {
     }
 
     fn run_tcp_test_server_fail(server_ip: &str, server_port: uint,
-                          iotask: IoTask) -> TcpListenErrData {
+                                iotask: &IoTask) -> TcpListenErrData {
         let server_ip_addr = ip::v4::parse_addr(server_ip);
         let listen_result = listen(move server_ip_addr, server_port, 128,
                                    iotask,
@@ -1775,7 +1783,7 @@ pub mod test {
 
     fn run_tcp_test_client(server_ip: &str, server_port: uint, resp: &str,
                           client_ch: oldcomm::Chan<~str>,
-                          iotask: IoTask) -> result::Result<~str,
+                          iotask: &IoTask) -> result::Result<~str,
                                                     TcpConnectErrData> {
         let server_ip_addr = ip::v4::parse_addr(server_ip);
 
diff --git a/src/libstd/timer.rs b/src/libstd/timer.rs
index 18c623c2bd8..0f0aa2a011e 100644
--- a/src/libstd/timer.rs
+++ b/src/libstd/timer.rs
@@ -39,7 +39,7 @@ use core;
  * * ch - a channel of type T to send a `val` on
  * * val - a value of type T to send over the provided `ch`
  */
-pub fn delayed_send<T: Owned>(iotask: IoTask,
+pub fn delayed_send<T: Owned>(iotask: &IoTask,
                               msecs: uint,
                               ch: oldcomm::Chan<T>,
                               val: T) {
@@ -90,7 +90,7 @@ pub fn delayed_send<T: Owned>(iotask: IoTask,
  * * `iotask` - a `uv::iotask` that the tcp request will run on
  * * msecs - an amount of time, in milliseconds, for the current task to block
  */
-pub fn sleep(iotask: IoTask, msecs: uint) {
+pub fn sleep(iotask: &IoTask, msecs: uint) {
     let exit_po = oldcomm::Port::<()>();
     let exit_ch = oldcomm::Chan(&exit_po);
     delayed_send(iotask, msecs, exit_ch, ());
@@ -117,7 +117,7 @@ pub fn sleep(iotask: IoTask, msecs: uint) {
  * on the provided port in the allotted timeout period, then the result will
  * be a `some(T)`. If not, then `none` will be returned.
  */
-pub fn recv_timeout<T: Copy Owned>(iotask: IoTask,
+pub fn recv_timeout<T: Copy Owned>(iotask: &IoTask,
                                    msecs: uint,
                                    wait_po: oldcomm::Port<T>)
                                 -> Option<T> {
@@ -177,13 +177,13 @@ mod test {
 
     #[test]
     fn test_gl_timer_simple_sleep_test() {
-        let hl_loop = uv::global_loop::get();
+        let hl_loop = &uv::global_loop::get();
         sleep(hl_loop, 1u);
     }
 
     #[test]
     fn test_gl_timer_sleep_stress1() {
-        let hl_loop = uv::global_loop::get();
+        let hl_loop = &uv::global_loop::get();
         for iter::repeat(50u) {
             sleep(hl_loop, 1u);
         }
@@ -193,7 +193,7 @@ mod test {
     fn test_gl_timer_sleep_stress2() {
         let po = oldcomm::Port();
         let ch = oldcomm::Chan(&po);
-        let hl_loop = uv::global_loop::get();
+        let hl_loop = &uv::global_loop::get();
 
         let repeat = 20u;
         let spec = {
@@ -208,11 +208,12 @@ mod test {
 
             for spec.each |spec| {
                 let (times, maxms) = *spec;
+                let hl_loop_clone = hl_loop.clone();
                 do task::spawn {
                     use rand::*;
                     let rng = Rng();
                     for iter::repeat(times) {
-                        sleep(hl_loop, rng.next() as uint % maxms);
+                        sleep(&hl_loop_clone, rng.next() as uint % maxms);
                     }
                     oldcomm::send(ch, ());
                 }
@@ -271,12 +272,12 @@ mod test {
             let expected = rand::Rng().gen_str(16u);
             let test_po = oldcomm::Port::<~str>();
             let test_ch = oldcomm::Chan(&test_po);
-
+            let hl_loop_clone = hl_loop.clone();
             do task::spawn() {
-                delayed_send(hl_loop, 50u, test_ch, expected);
+                delayed_send(&hl_loop_clone, 50u, test_ch, expected);
             };
 
-            match recv_timeout(hl_loop, 1u, test_po) {
+            match recv_timeout(&hl_loop, 1u, test_po) {
               None => successes += 1,
               _ => failures += 1
             };
diff --git a/src/libstd/uv_global_loop.rs b/src/libstd/uv_global_loop.rs
index 276cb9cab64..097e923225a 100644
--- a/src/libstd/uv_global_loop.rs
+++ b/src/libstd/uv_global_loop.rs
@@ -19,16 +19,16 @@ use uv_iotask::{IoTask, spawn_iotask};
 
 use core::either::{Left, Right};
 use core::libc;
-use core::oldcomm::{Port, Chan, select2, listen};
-use core::private::{chan_from_global_ptr, weaken_task};
+use core::pipes::{Port, Chan, SharedChan, select2i};
+use core::private::global::{global_data_clone_create,
+                            global_data_clone};
+use core::private::weak_task::weaken_task;
 use core::str;
-use core::task::TaskBuilder;
+use core::task::{task, SingleThreaded, spawn};
 use core::task;
 use core::vec;
-
-extern mod rustrt {
-    unsafe fn rust_uv_get_kernel_global_chan_ptr() -> *libc::uintptr_t;
-}
+use core::clone::Clone;
+use core::option::{Some, None};
 
 /**
  * Race-free helper to get access to a global task where a libuv
@@ -49,64 +49,58 @@ pub fn get() -> IoTask {
 #[doc(hidden)]
 fn get_monitor_task_gl() -> IoTask unsafe {
 
-    let monitor_loop_chan_ptr = rustrt::rust_uv_get_kernel_global_chan_ptr();
+    type MonChan = Chan<IoTask>;
 
-    debug!("ENTERING global_loop::get() loop chan: %?",
-           monitor_loop_chan_ptr);
+    struct GlobalIoTask(IoTask);
 
-    debug!("before priv::chan_from_global_ptr");
-    type MonChan = Chan<IoTask>;
+    impl GlobalIoTask: Clone {
+        fn clone(&self) -> GlobalIoTask {
+            GlobalIoTask((**self).clone())
+        }
+    }
 
-    let monitor_ch =
-        do chan_from_global_ptr::<MonChan>(monitor_loop_chan_ptr,
-                                           || {
-                                                task::task().sched_mode
-                                                (task::SingleThreaded)
-                                                .unlinked()
-                                           }) |msg_po| unsafe {
-        debug!("global monitor task starting");
-
-        // As a weak task the runtime will notify us when to exit
-        do weaken_task() |weak_exit_po| {
-            debug!("global monitor task is now weak");
-            let hl_loop = spawn_loop();
-            loop {
-                debug!("in outer_loop...");
-                match select2(weak_exit_po, msg_po) {
-                  Left(weak_exit) => {
-                    // all normal tasks have ended, tell the
-                    // libuv loop to tear_down, then exit
-                    debug!("weak_exit_po recv'd msg: %?", weak_exit);
-                    iotask::exit(hl_loop);
-                    break;
-                  }
-                  Right(fetch_ch) => {
-                    debug!("hl_loop req recv'd: %?", fetch_ch);
-                    fetch_ch.send(hl_loop);
-                  }
+    fn key(_: GlobalIoTask) { }
+
+    match global_data_clone(key) {
+        Some(GlobalIoTask(iotask)) => iotask,
+        None => {
+            let iotask: IoTask = spawn_loop();
+            let mut installed = false;
+            let final_iotask = do global_data_clone_create(key) {
+                installed = true;
+                ~GlobalIoTask(iotask.clone())
+            };
+            if installed {
+                do task().unlinked().spawn() unsafe {
+                    debug!("global monitor task starting");
+                    // As a weak task the runtime will notify us when to exit
+                    do weaken_task |weak_exit_po| {
+                        debug!("global monitor task is now weak");
+                        weak_exit_po.recv();
+                        iotask::exit(&iotask);
+                        debug!("global monitor task is leaving weakend state");
+                    };
+                    debug!("global monitor task exiting");
                 }
+            } else {
+                iotask::exit(&iotask);
             }
-            debug!("global monitor task is leaving weakend state");
-        };
-        debug!("global monitor task exiting");
-    };
 
-    // once we have a chan to the monitor loop, we ask it for
-    // the libuv loop's async handle
-    do listen |fetch_ch| {
-        monitor_ch.send(fetch_ch);
-        fetch_ch.recv()
+            match final_iotask {
+                GlobalIoTask(iotask) => iotask
+            }
+        }
     }
 }
 
 fn spawn_loop() -> IoTask {
-    let builder = do task::task().add_wrapper |task_body| {
+    let builder = do task().add_wrapper |task_body| {
         fn~(move task_body) {
             // The I/O loop task also needs to be weak so it doesn't keep
             // the runtime alive
             unsafe {
-                do weaken_task |weak_exit_po| {
-                    debug!("global libuv task is now weak %?", weak_exit_po);
+                do weaken_task |_| {
+                    debug!("global libuv task is now weak");
                     task_body();
 
                     // We don't wait for the exit message on weak_exit_po
@@ -118,6 +112,7 @@ fn spawn_loop() -> IoTask {
             }
         }
     };
+    let builder = builder.unlinked();
     spawn_iotask(move builder)
 }
 
@@ -147,7 +142,7 @@ mod test {
                              _status: libc::c_int) unsafe {
         log(debug, ~"in simple timer cb");
         ll::timer_stop(timer_ptr);
-        let hl_loop = get_gl();
+        let hl_loop = &get_gl();
         do iotask::interact(hl_loop) |_loop_ptr| unsafe {
             log(debug, ~"closing timer");
             ll::close(timer_ptr, simple_timer_close_cb);
@@ -157,7 +152,7 @@ mod test {
         log(debug, ~"exiting simple timer cb");
     }
 
-    fn impl_uv_hl_simple_timer(iotask: IoTask) unsafe {
+    fn impl_uv_hl_simple_timer(iotask: &IoTask) unsafe {
         let exit_po = oldcomm::Port::<bool>();
         let exit_ch = oldcomm::Chan(&exit_po);
         let exit_ch_ptr = ptr::addr_of(&exit_ch);
@@ -190,10 +185,11 @@ mod test {
 
     #[test]
     fn test_gl_uv_global_loop_high_level_global_timer() unsafe {
-        let hl_loop = get_gl();
+        let hl_loop = &get_gl();
         let exit_po = oldcomm::Port::<()>();
         let exit_ch = oldcomm::Chan(&exit_po);
         task::spawn_sched(task::ManualThreads(1u), || {
+            let hl_loop = &get_gl();
             impl_uv_hl_simple_timer(hl_loop);
             oldcomm::send(exit_ch, ());
         });
@@ -206,12 +202,12 @@ mod test {
     #[test]
     #[ignore]
     fn test_stress_gl_uv_global_loop_high_level_global_timer() unsafe {
-        let hl_loop = get_gl();
         let exit_po = oldcomm::Port::<()>();
         let exit_ch = oldcomm::Chan(&exit_po);
         let cycles = 5000u;
         for iter::repeat(cycles) {
             task::spawn_sched(task::ManualThreads(1u), || {
+                let hl_loop = &get_gl();
                 impl_uv_hl_simple_timer(hl_loop);
                 oldcomm::send(exit_ch, ());
             });
diff --git a/src/libstd/uv_iotask.rs b/src/libstd/uv_iotask.rs
index 409d73c2539..c50a19cc5c1 100644
--- a/src/libstd/uv_iotask.rs
+++ b/src/libstd/uv_iotask.rs
@@ -20,7 +20,7 @@ use ll = uv_ll;
 
 use core::libc::c_void;
 use core::libc;
-use core::oldcomm::{Port, Chan, listen};
+use core::pipes::{stream, Port, Chan, SharedChan};
 use core::prelude::*;
 use core::ptr::addr_of;
 use core::task::TaskBuilder;
@@ -30,22 +30,30 @@ use core::task;
 pub enum IoTask {
     IoTask_({
         async_handle: *ll::uv_async_t,
-        op_chan: Chan<IoTaskMsg>
+        op_chan: SharedChan<IoTaskMsg>
     })
 }
 
+impl IoTask: Clone {
+    fn clone(&self) -> IoTask {
+        IoTask_({
+            async_handle: self.async_handle,
+            op_chan: self.op_chan.clone()
+        })
+    }
+}
+
 pub fn spawn_iotask(task: task::TaskBuilder) -> IoTask {
 
-    do listen |iotask_ch| {
+    let (iotask_port, iotask_chan) = stream();
 
-        do task.sched_mode(task::SingleThreaded).spawn {
-            debug!("entering libuv task");
-            run_loop(iotask_ch);
-            debug!("libuv task exiting");
-        };
+    do task.sched_mode(task::SingleThreaded).spawn {
+        debug!("entering libuv task");
+        run_loop(&iotask_chan);
+        debug!("libuv task exiting");
+    };
 
-        iotask_ch.recv()
-    }
+    iotask_port.recv()
 }
 
 
@@ -71,7 +79,7 @@ pub fn spawn_iotask(task: task::TaskBuilder) -> IoTask {
  * module. It is not safe to send the `loop_ptr` param to this callback out
  * via ports/chans.
  */
-pub unsafe fn interact(iotask: IoTask,
+pub unsafe fn interact(iotask: &IoTask,
                    cb: fn~(*c_void)) {
     send_msg(iotask, Interaction(move cb));
 }
@@ -83,7 +91,7 @@ pub unsafe fn interact(iotask: IoTask,
  * async handle and do a sanity check to make sure that all other handles are
  * closed, causing a failure otherwise.
  */
-pub fn exit(iotask: IoTask) unsafe {
+pub fn exit(iotask: &IoTask) unsafe {
     send_msg(iotask, TeardownLoop);
 }
 
@@ -96,8 +104,9 @@ enum IoTaskMsg {
 }
 
 /// Run the loop and begin handling messages
-fn run_loop(iotask_ch: Chan<IoTask>) unsafe {
+fn run_loop(iotask_ch: &Chan<IoTask>) unsafe {
 
+    debug!("creating loop");
     let loop_ptr = ll::loop_new();
 
     // set up the special async handle we'll use to allow multi-task
@@ -108,10 +117,12 @@ fn run_loop(iotask_ch: Chan<IoTask>) unsafe {
     // associate the async handle with the loop
     ll::async_init(loop_ptr, async_handle, wake_up_cb);
 
+    let (msg_po, msg_ch) = stream::<IoTaskMsg>();
+
     // initialize our loop data and store it in the loop
     let data: IoTaskLoopData = {
         async_handle: async_handle,
-        msg_po: Port()
+        msg_po: msg_po
     };
     ll::set_data_for_uv_handle(async_handle, addr_of(&data));
 
@@ -119,7 +130,7 @@ fn run_loop(iotask_ch: Chan<IoTask>) unsafe {
     // while we dwell in the I/O loop
     let iotask = IoTask_({
         async_handle: async_handle,
-        op_chan: data.msg_po.chan()
+        op_chan: SharedChan(msg_ch)
     });
     iotask_ch.send(iotask);
 
@@ -136,7 +147,7 @@ type IoTaskLoopData = {
     msg_po: Port<IoTaskMsg>
 };
 
-fn send_msg(iotask: IoTask,
+fn send_msg(iotask: &IoTask,
             msg: IoTaskMsg) unsafe {
     iotask.op_chan.send(move msg);
     ll::async_send(iotask.async_handle);
@@ -151,7 +162,7 @@ extern fn wake_up_cb(async_handle: *ll::uv_async_t,
 
     let loop_ptr = ll::get_loop_for_uv_handle(async_handle);
     let data = ll::get_data_for_uv_handle(async_handle) as *IoTaskLoopData;
-    let msg_po = (*data).msg_po;
+    let msg_po = &(*data).msg_po;
 
     while msg_po.peek() {
         match msg_po.recv() {
@@ -203,34 +214,37 @@ mod test {
         iotask: IoTask,
         exit_ch: oldcomm::Chan<()>
     };
-    fn impl_uv_iotask_async(iotask: IoTask) unsafe {
+    fn impl_uv_iotask_async(iotask: &IoTask) unsafe {
         let async_handle = ll::async_t();
         let ah_ptr = ptr::addr_of(&async_handle);
         let exit_po = oldcomm::Port::<()>();
         let exit_ch = oldcomm::Chan(&exit_po);
         let ah_data = {
-            iotask: iotask,
+            iotask: iotask.clone(),
             exit_ch: exit_ch
         };
-        let ah_data_ptr = ptr::addr_of(&ah_data);
+        let ah_data_ptr: *AhData = ptr::to_unsafe_ptr(&ah_data);
+        debug!("about to interact");
         do interact(iotask) |loop_ptr| unsafe {
+            debug!("interacting");
             ll::async_init(loop_ptr, ah_ptr, async_handle_cb);
             ll::set_data_for_uv_handle(ah_ptr, ah_data_ptr as *libc::c_void);
             ll::async_send(ah_ptr);
         };
+        debug!("waiting for async close");
         oldcomm::recv(exit_po);
     }
 
     // this fn documents the bear minimum neccesary to roll your own
     // high_level_loop
     unsafe fn spawn_test_loop(exit_ch: oldcomm::Chan<()>) -> IoTask {
-        let iotask_port = oldcomm::Port::<IoTask>();
-        let iotask_ch = oldcomm::Chan(&iotask_port);
+        let (iotask_port, iotask_ch) = stream::<IoTask>();
         do task::spawn_sched(task::ManualThreads(1u)) {
-            run_loop(iotask_ch);
+            debug!("about to run a test loop");
+            run_loop(&iotask_ch);
             exit_ch.send(());
         };
-        return oldcomm::recv(iotask_port);
+        return iotask_port.recv();
     }
 
     extern fn lifetime_handle_close(handle: *libc::c_void) unsafe {
@@ -247,7 +261,9 @@ mod test {
     fn test_uv_iotask_async() unsafe {
         let exit_po = oldcomm::Port::<()>();
         let exit_ch = oldcomm::Chan(&exit_po);
-        let iotask = spawn_test_loop(exit_ch);
+        let iotask = &spawn_test_loop(exit_ch);
+
+        debug!("spawned iotask");
 
         // using this handle to manage the lifetime of the high_level_loop,
         // as it will exit the first time one of the impl_uv_hl_async() is
@@ -258,12 +274,16 @@ mod test {
         let work_exit_po = oldcomm::Port::<()>();
         let work_exit_ch = oldcomm::Chan(&work_exit_po);
         for iter::repeat(7u) {
+            let iotask_clone = iotask.clone();
             do task::spawn_sched(task::ManualThreads(1u)) {
-                impl_uv_iotask_async(iotask);
+                debug!("async");
+                impl_uv_iotask_async(&iotask_clone);
+                debug!("done async");
                 oldcomm::send(work_exit_ch, ());
             };
         };
         for iter::repeat(7u) {
+            debug!("waiting");
             oldcomm::recv(work_exit_po);
         };
         log(debug, ~"sending teardown_loop msg..");
diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp
index 7e342878841..8ca49ea6a57 100644
--- a/src/rt/rust_kernel.cpp
+++ b/src/rt/rust_kernel.cpp
@@ -34,7 +34,6 @@ rust_kernel::rust_kernel(rust_env *env) :
     sched_reaper(this),
     osmain_driver(NULL),
     non_weak_tasks(0),
-    global_loop_chan(0),
     at_exit_runner(NULL),
     at_exit_started(false),
     env(env),
diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h
index 477e59d1b3e..8ba0405b86e 100644
--- a/src/rt/rust_kernel.h
+++ b/src/rt/rust_kernel.h
@@ -129,9 +129,6 @@ class rust_kernel {
     void end_weak_tasks();
     void begin_shutdown();
 
-    // Used to communicate with the process-side, global libuv loop
-    uintptr_t global_loop_chan;
-
     lock_and_signal at_exit_lock;
     spawn_fn at_exit_runner;
     bool at_exit_started;
@@ -190,8 +187,6 @@ public:
 
     bool send_to_port(rust_port_id chan, void *sptr);
 
-    uintptr_t* get_global_loop() { return &global_loop_chan; }
-
     void register_exit_function(spawn_fn runner, fn_env_pair *f);
 };
 
diff --git a/src/rt/rust_uv.cpp b/src/rt/rust_uv.cpp
index 53d8177bcf8..2dc70088628 100644
--- a/src/rt/rust_uv.cpp
+++ b/src/rt/rust_uv.cpp
@@ -513,15 +513,6 @@ rust_uv_ip6_port(struct sockaddr_in6* src) {
     return ntohs(src->sin6_port);
 }
 
-extern "C" uintptr_t*
-rust_uv_get_kernel_global_chan_ptr() {
-    uintptr_t* result = rust_get_current_task()->kernel->get_global_loop();
-    rust_task* task = rust_get_current_task();
-    LOG(task, stdlib, "global loop: %lu", (unsigned long int)result);
-    LOG(task, stdlib,"global loop val: %lu", (unsigned long int)*result);
-    return result;
-}
-
 extern "C" void*
 rust_uv_current_kernel_malloc(size_t size) {
     return current_kernel_malloc(size, "rust_uv_current_kernel_malloc");
diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in
index dd84e5ff6e7..8e8ce9ee509 100644
--- a/src/rt/rustrt.def.in
+++ b/src/rt/rustrt.def.in
@@ -158,7 +158,6 @@ rust_uv_get_data_for_req
 rust_uv_set_data_for_req
 rust_uv_get_base_from_buf
 rust_uv_get_len_from_buf
-rust_uv_get_kernel_global_chan_ptr
 rust_uv_current_kernel_malloc
 rust_uv_current_kernel_free
 rust_uv_getaddrinfo
diff --git a/src/test/run-pass/pipe-detect-term.rs b/src/test/run-pass/pipe-detect-term.rs
index c2d4be04191..10b13d8757f 100644
--- a/src/test/run-pass/pipe-detect-term.rs
+++ b/src/test/run-pass/pipe-detect-term.rs
@@ -27,7 +27,7 @@ proto! oneshot (
 )
 
 fn main() {
-    let iotask = uv::global_loop::get();
+    let iotask = &uv::global_loop::get();
     
     pipes::spawn_service(oneshot::init, |p| { 
         match try_recv(move p) {
diff --git a/src/test/run-pass/pipe-select.rs b/src/test/run-pass/pipe-select.rs
index e71d0c4931d..e138f2562aa 100644
--- a/src/test/run-pass/pipe-select.rs
+++ b/src/test/run-pass/pipe-select.rs
@@ -35,7 +35,7 @@ fn main() {
     use oneshot::client::*;
     use stream::client::*;
 
-    let iotask = uv::global_loop::get();
+    let iotask = &uv::global_loop::get();
     
     let c = pipes::spawn_service(stream::init, |p| { 
         error!("waiting for pipes");
diff --git a/src/test/run-pass/pipe-sleep.rs b/src/test/run-pass/pipe-sleep.rs
index 4a6e7b4ce36..ae7e4e7fb0c 100644
--- a/src/test/run-pass/pipe-sleep.rs
+++ b/src/test/run-pass/pipe-sleep.rs
@@ -27,7 +27,7 @@ fn main() {
 
     let c = pipes::spawn_service(oneshot::init, |p| { recv(move p); });
 
-    let iotask = uv::global_loop::get();
+    let iotask = &uv::global_loop::get();
     sleep(iotask, 500);
     
     signal(move c);