about summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
authorBrian Anderson <banderson@mozilla.com>2013-01-25 00:52:50 -0800
committerBrian Anderson <banderson@mozilla.com>2013-01-29 19:54:55 -0800
commitda4b3768971c7c025ba8a85ebf59572fd752dfb6 (patch)
treee47abf6553bffeaaa35d682f74f0cc42df5e913e /src
parent87acde8826af4dfd8391cbccc48526381796dab3 (diff)
downloadrust-da4b3768971c7c025ba8a85ebf59572fd752dfb6.tar.gz
rust-da4b3768971c7c025ba8a85ebf59572fd752dfb6.zip
std: Stop using oldcomm
Diffstat (limited to 'src')
-rw-r--r--src/libstd/arc.rs1
-rw-r--r--src/libstd/c_vec.rs1
-rw-r--r--src/libstd/net_ip.rs71
-rw-r--r--src/libstd/net_tcp.rs579
-rw-r--r--src/libstd/test.rs42
-rw-r--r--src/libstd/timer.rs91
-rw-r--r--src/libstd/uv_iotask.rs34
-rw-r--r--src/libstd/uv_ll.rs59
8 files changed, 429 insertions, 449 deletions
diff --git a/src/libstd/arc.rs b/src/libstd/arc.rs
index a45e2b32941..e50245168b1 100644
--- a/src/libstd/arc.rs
+++ b/src/libstd/arc.rs
@@ -483,7 +483,6 @@ mod tests {
     use arc::*;
     use arc;
 
-    use core::oldcomm::*;
     use core::option::{Some, None};
     use core::option;
     use core::pipes;
diff --git a/src/libstd/c_vec.rs b/src/libstd/c_vec.rs
index 359d3039229..c190d08687a 100644
--- a/src/libstd/c_vec.rs
+++ b/src/libstd/c_vec.rs
@@ -38,7 +38,6 @@
 #[forbid(deprecated_mode)];
 
 use core::libc;
-use core::oldcomm;
 use core::option;
 use core::prelude::*;
 use core::ptr;
diff --git a/src/libstd/net_ip.rs b/src/libstd/net_ip.rs
index 72e58cbd5d3..839d0d23a61 100644
--- a/src/libstd/net_ip.rs
+++ b/src/libstd/net_ip.rs
@@ -12,8 +12,8 @@
 #[forbid(deprecated_mode)];
 
 use core::libc;
-use core::oldcomm;
 use core::prelude::*;
+use core::pipes::{stream, SharedChan};
 use core::ptr;
 use core::result;
 use core::str;
@@ -113,40 +113,40 @@ enum IpGetAddrErr {
  * A `result<~[ip_addr], ip_get_addr_err>` instance that will contain
  * a vector of `ip_addr` results, in the case of success, or an error
  * object in the case of failure
- */
+*/
 pub fn get_addr(node: &str, iotask: &iotask)
-        -> result::Result<~[IpAddr], IpGetAddrErr> {
-    do oldcomm::listen |output_ch| {
-        do str::as_buf(node) |node_ptr, len| {
-            unsafe {
-                log(debug, fmt!("slice len %?", len));
-                let handle = create_uv_getaddrinfo_t();
-                let handle_ptr = ptr::addr_of(&handle);
-                let handle_data = GetAddrData {
-                    output_ch: output_ch
-                };
-                let handle_data_ptr = ptr::addr_of(&handle_data);
-                do interact(iotask) |loop_ptr| {
-                    unsafe {
-                        let result = uv_getaddrinfo(
-                            loop_ptr,
-                            handle_ptr,
-                            get_addr_cb,
-                            node_ptr,
-                            ptr::null(),
-                            ptr::null());
-                        match result {
-                          0i32 => {
+    -> result::Result<~[IpAddr], IpGetAddrErr> {
+    let (output_po, output_ch) = stream();
+    let output_ch = SharedChan(output_ch);
+    do str::as_buf(node) |node_ptr, len| {
+        unsafe {
+            log(debug, fmt!("slice len %?", len));
+            let handle = create_uv_getaddrinfo_t();
+            let handle_ptr = ptr::addr_of(&handle);
+            let handle_data = GetAddrData {
+                output_ch: output_ch.clone()
+            };
+            let handle_data_ptr = ptr::addr_of(&handle_data);
+            do interact(iotask) |loop_ptr| {
+                unsafe {
+                    let result = uv_getaddrinfo(
+                        loop_ptr,
+                        handle_ptr,
+                        get_addr_cb,
+                        node_ptr,
+                        ptr::null(),
+                        ptr::null());
+                    match result {
+                        0i32 => {
                             set_data_for_req(handle_ptr, handle_data_ptr);
-                          }
-                          _ => {
+                        }
+                        _ => {
                             output_ch.send(result::Err(GetAddrUnknownError));
-                          }
                         }
                     }
-                };
-                output_ch.recv()
-            }
+                }
+            };
+            output_po.recv()
         }
     }
 }
@@ -300,7 +300,7 @@ pub mod v6 {
 }
 
 struct GetAddrData {
-    output_ch: oldcomm::Chan<result::Result<~[IpAddr],IpGetAddrErr>>
+    output_ch: SharedChan<result::Result<~[IpAddr],IpGetAddrErr>>
 }
 
 extern fn get_addr_cb(handle: *uv_getaddrinfo_t, status: libc::c_int,
@@ -309,6 +309,7 @@ extern fn get_addr_cb(handle: *uv_getaddrinfo_t, status: libc::c_int,
         log(debug, ~"in get_addr_cb");
         let handle_data = get_data_for_req(handle) as
             *GetAddrData;
+        let output_ch = (*handle_data).output_ch.clone();
         if status == 0i32 {
             if res != (ptr::null::<addrinfo>()) {
                 let mut out_vec = ~[];
@@ -326,7 +327,7 @@ extern fn get_addr_cb(handle: *uv_getaddrinfo_t, status: libc::c_int,
                     else {
                         log(debug, ~"curr_addr is not of family AF_INET or "+
                             ~"AF_INET6. Error.");
-                        (*handle_data).output_ch.send(
+                        output_ch.send(
                             result::Err(GetAddrUnknownError));
                         break;
                     };
@@ -344,17 +345,17 @@ extern fn get_addr_cb(handle: *uv_getaddrinfo_t, status: libc::c_int,
                 }
                 log(debug, fmt!("successful process addrinfo result, len: %?",
                                 vec::len(out_vec)));
-                (*handle_data).output_ch.send(result::Ok(move out_vec));
+                output_ch.send(result::Ok(move out_vec));
             }
             else {
                 log(debug, ~"addrinfo pointer is NULL");
-                (*handle_data).output_ch.send(
+                output_ch.send(
                     result::Err(GetAddrUnknownError));
             }
         }
         else {
             log(debug, ~"status != 0 error in get_addr_cb");
-            (*handle_data).output_ch.send(
+            output_ch.send(
                 result::Err(GetAddrUnknownError));
         }
         if res != (ptr::null::<addrinfo>()) {
diff --git a/src/libstd/net_tcp.rs b/src/libstd/net_tcp.rs
index 608723a6ca5..d9e4bfc540c 100644
--- a/src/libstd/net_tcp.rs
+++ b/src/libstd/net_tcp.rs
@@ -23,7 +23,7 @@ use core::io::{Reader, ReaderUtil, Writer};
 use core::io;
 use core::libc::size_t;
 use core::libc;
-use core::oldcomm;
+use core::pipes::{stream, Chan, Port, SharedChan};
 use core::prelude::*;
 use core::ptr;
 use core::result::{Result};
@@ -146,19 +146,22 @@ pub fn connect(input_ip: ip::IpAddr, port: uint,
                iotask: &IoTask)
     -> result::Result<TcpSocket, TcpConnectErrData> {
     unsafe {
-        let result_po = oldcomm::Port::<ConnAttempt>();
-        let closed_signal_po = oldcomm::Port::<()>();
-        let conn_data = {
-            result_ch: oldcomm::Chan(&result_po),
-            closed_signal_ch: oldcomm::Chan(&closed_signal_po)
+        let (result_po, result_ch) = stream::<ConnAttempt>();
+        let result_ch = SharedChan(result_ch);
+        let (closed_signal_po, closed_signal_ch) = stream::<()>();
+        let closed_signal_ch = SharedChan(closed_signal_ch);
+        let conn_data = ConnectReqData {
+            result_ch: result_ch,
+            closed_signal_ch: closed_signal_ch
         };
         let conn_data_ptr = ptr::addr_of(&conn_data);
-        let reader_po = oldcomm::Port::<result::Result<~[u8], TcpErrData>>();
+        let (reader_po, reader_ch) = stream::<Result<~[u8], TcpErrData>>();
+        let reader_ch = SharedChan(reader_ch);
         let stream_handle_ptr = malloc_uv_tcp_t();
         *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
         let socket_data = @TcpSocketData {
-            reader_po: reader_po,
-            reader_ch: oldcomm::Chan(&reader_po),
+            reader_po: @reader_po,
+            reader_ch: reader_ch,
             stream_handle_ptr: stream_handle_ptr,
             connect_req: uv::ll::connect_t(),
             write_req: uv::ll::write_t(),
@@ -169,7 +172,6 @@ pub fn connect(input_ip: ip::IpAddr, port: uint,
             iotask: iotask.clone()
         };
         let socket_data_ptr = ptr::addr_of(&(*socket_data));
-        debug!("tcp_connect result_ch %?", conn_data.result_ch);
         // get an unsafe representation of our stream_handle_ptr that
         // we can send into the interact cb to be handled in libuv..
         debug!("stream_handle_ptr outside interact %?",
@@ -238,8 +240,9 @@ pub fn connect(input_ip: ip::IpAddr, port: uint,
                                 // somesuch
                                 let err_data =
                                     uv::ll::get_last_err_data(loop_ptr);
-                                oldcomm::send((*conn_data_ptr).result_ch,
-                                              ConnFailure(err_data));
+                                let result_ch = (*conn_data_ptr)
+                                    .result_ch.clone();
+                                result_ch.send(ConnFailure(err_data));
                                 uv::ll::set_data_for_uv_handle(
                                     stream_handle_ptr,
                                     conn_data_ptr);
@@ -251,19 +254,19 @@ pub fn connect(input_ip: ip::IpAddr, port: uint,
                     _ => {
                         // failure to create a tcp handle
                         let err_data = uv::ll::get_last_err_data(loop_ptr);
-                        oldcomm::send((*conn_data_ptr).result_ch,
-                                      ConnFailure(err_data));
+                        let result_ch = (*conn_data_ptr).result_ch.clone();
+                        result_ch.send(ConnFailure(err_data));
                     }
                 }
             }
         }
-        match oldcomm::recv(result_po) {
+        match result_po.recv() {
             ConnSuccess => {
                 debug!("tcp::connect - received success on result_po");
                 result::Ok(TcpSocket(socket_data))
             }
             ConnFailure(ref err_data) => {
-                oldcomm::recv(closed_signal_po);
+                closed_signal_po.recv();
                 debug!("tcp::connect - received failure on result_po");
                 // still have to free the malloc'd stream handle..
                 rustrt::rust_uv_current_kernel_free(stream_handle_ptr
@@ -359,7 +362,7 @@ pub fn write_future(sock: &TcpSocket, raw_write_data: ~[u8])
  * `tcp_err_data` record
  */
 pub fn read_start(sock: &TcpSocket)
-    -> result::Result<oldcomm::Port<
+    -> result::Result<@Port<
         result::Result<~[u8], TcpErrData>>, TcpErrData> {
     unsafe {
         let socket_data = ptr::addr_of(&(*(sock.socket_data)));
@@ -374,12 +377,9 @@ pub fn read_start(sock: &TcpSocket)
  *
  * * `sock` - a `net::tcp::tcp_socket` that you wish to stop reading on
  */
-pub fn read_stop(sock: &TcpSocket,
-             read_port: oldcomm::Port<result::Result<~[u8], TcpErrData>>) ->
+pub fn read_stop(sock: &TcpSocket) ->
     result::Result<(), TcpErrData> {
     unsafe {
-        debug!(
-            "taking the read_port out of commission %?", read_port);
         let socket_data = ptr::addr_of(&(*sock.socket_data));
         read_stop_common_impl(socket_data)
     }
@@ -519,14 +519,16 @@ pub fn accept(new_conn: TcpNewConnection)
             NewTcpConn(server_handle_ptr) => {
                 let server_data_ptr = uv::ll::get_data_for_uv_handle(
                     server_handle_ptr) as *TcpListenFcData;
-                let reader_po = oldcomm::Port();
+                let (reader_po, reader_ch) = stream::<
+                    Result<~[u8], TcpErrData>>();
+                let reader_ch = SharedChan(reader_ch);
                 let iotask = &(*server_data_ptr).iotask;
                 let stream_handle_ptr = malloc_uv_tcp_t();
                 *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) =
                     uv::ll::tcp_t();
                 let client_socket_data: @TcpSocketData = @TcpSocketData {
-                    reader_po: reader_po,
-                    reader_ch: oldcomm::Chan(&reader_po),
+                    reader_po: @reader_po,
+                    reader_ch: reader_ch,
                     stream_handle_ptr : stream_handle_ptr,
                     connect_req : uv::ll::connect_t(),
                     write_req : uv::ll::write_t(),
@@ -538,8 +540,8 @@ pub fn accept(new_conn: TcpNewConnection)
                 let client_stream_handle_ptr =
                     (*client_socket_data_ptr).stream_handle_ptr;
 
-                let result_po = oldcomm::Port::<Option<TcpErrData>>();
-                let result_ch = oldcomm::Chan(&result_po);
+                let (result_po, result_ch) = stream::<Option<TcpErrData>>();
+                let result_ch = SharedChan(result_ch);
 
                 // UNSAFE LIBUV INTERACTION BEGIN
                 // .. normally this happens within the context of
@@ -565,11 +567,11 @@ pub fn accept(new_conn: TcpNewConnection)
                                     client_stream_handle_ptr,
                                     client_socket_data_ptr
                                     as *libc::c_void);
-                                oldcomm::send(result_ch, None);
+                                result_ch.send(None);
                             }
                             _ => {
                                 log(debug, ~"failed to accept client conn");
-                                oldcomm::send(result_ch, Some(
+                                result_ch.send(Some(
                                     uv::ll::get_last_err_data(
                                         loop_ptr).to_tcp_err()));
                             }
@@ -577,13 +579,13 @@ pub fn accept(new_conn: TcpNewConnection)
                     }
                     _ => {
                         log(debug, ~"failed to accept client stream");
-                        oldcomm::send(result_ch, Some(
+                        result_ch.send(Some(
                             uv::ll::get_last_err_data(
                                 loop_ptr).to_tcp_err()));
                     }
                 }
                 // UNSAFE LIBUV INTERACTION END
-                match oldcomm::recv(result_po) {
+                match result_po.recv() {
                     Some(copy err_data) => result::Err(err_data),
                     None => result::Ok(TcpSocket(client_socket_data))
                 }
@@ -622,9 +624,9 @@ pub fn accept(new_conn: TcpNewConnection)
  */
 pub fn listen(host_ip: ip::IpAddr, port: uint, backlog: uint,
               iotask: &IoTask,
-              on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>),
+              on_establish_cb: fn~(SharedChan<Option<TcpErrData>>),
               new_connect_cb: fn~(TcpNewConnection,
-                                  oldcomm::Chan<Option<TcpErrData>>))
+                                  SharedChan<Option<TcpErrData>>))
     -> result::Result<(), TcpListenErrData> {
     do listen_common(move host_ip, port, backlog, iotask,
                      move on_establish_cb)
@@ -634,7 +636,7 @@ pub fn listen(host_ip: ip::IpAddr, port: uint, backlog: uint,
             let server_data_ptr = uv::ll::get_data_for_uv_handle(handle)
                 as *TcpListenFcData;
             let new_conn = NewTcpConn(handle);
-            let kill_ch = (*server_data_ptr).kill_ch;
+            let kill_ch = (*server_data_ptr).kill_ch.clone();
             new_connect_cb(new_conn, kill_ch);
         }
     }
@@ -642,19 +644,20 @@ pub fn listen(host_ip: ip::IpAddr, port: uint, backlog: uint,
 
 fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint,
           iotask: &IoTask,
-          on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>),
+          on_establish_cb: fn~(SharedChan<Option<TcpErrData>>),
           on_connect_cb: fn~(*uv::ll::uv_tcp_t))
     -> result::Result<(), TcpListenErrData> {
     unsafe {
-        let stream_closed_po = oldcomm::Port::<()>();
-        let kill_po = oldcomm::Port::<Option<TcpErrData>>();
-        let kill_ch = oldcomm::Chan(&kill_po);
+        let (stream_closed_po, stream_closed_ch) = stream::<()>();
+        let stream_closed_ch = SharedChan(stream_closed_ch);
+        let (kill_po, kill_ch) = stream::<Option<TcpErrData>>();
+        let kill_ch = SharedChan(kill_ch);
         let server_stream = uv::ll::tcp_t();
         let server_stream_ptr = ptr::addr_of(&server_stream);
         let server_data: TcpListenFcData = TcpListenFcData {
             server_stream_ptr: server_stream_ptr,
-            stream_closed_ch: oldcomm::Chan(&stream_closed_po),
-            kill_ch: kill_ch,
+            stream_closed_ch: stream_closed_ch,
+            kill_ch: kill_ch.clone(),
             on_connect_cb: move on_connect_cb,
             iotask: iotask.clone(),
             ipv6: match &host_ip {
@@ -665,77 +668,78 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint,
         };
         let server_data_ptr = ptr::addr_of(&server_data);
 
-        let setup_result = do oldcomm::listen |setup_ch| {
-            // this is to address a compiler warning about
-            // an implicit copy.. it seems that double nested
-            // will defeat a move sigil, as is done to the host_ip
-            // arg above.. this same pattern works w/o complaint in
-            // tcp::connect (because the iotask::interact cb isn't
-            // nested within a core::comm::listen block)
-            let loc_ip = copy(host_ip);
-            do iotask::interact(iotask) |move loc_ip, loop_ptr| {
-                unsafe {
-                    match uv::ll::tcp_init(loop_ptr, server_stream_ptr) {
-                        0i32 => {
-                            uv::ll::set_data_for_uv_handle(
-                                server_stream_ptr,
-                                server_data_ptr);
-                            let addr_str = ip::format_addr(&loc_ip);
-                            let bind_result = match loc_ip {
-                                ip::Ipv4(ref addr) => {
-                                    log(debug, fmt!("addr: %?", addr));
-                                    let in_addr = uv::ll::ip4_addr(
-                                        addr_str,
-                                        port as int);
-                                    uv::ll::tcp_bind(server_stream_ptr,
-                                                     ptr::addr_of(&in_addr))
-                                }
-                                ip::Ipv6(ref addr) => {
-                                    log(debug, fmt!("addr: %?", addr));
-                                    let in_addr = uv::ll::ip6_addr(
-                                        addr_str,
-                                        port as int);
-                                    uv::ll::tcp_bind6(server_stream_ptr,
-                                                      ptr::addr_of(&in_addr))
-                                }
-                            };
-                            match bind_result {
-                                0i32 => {
-                                    match uv::ll::listen(
-                                        server_stream_ptr,
-                                        backlog as libc::c_int,
-                                        tcp_lfc_on_connection_cb) {
-                                        0i32 => oldcomm::send(setup_ch, None),
-                                        _ => {
-                                            log(debug,
-                                                ~"failure to uv_tcp_init");
-                                            let err_data =
-                                                uv::ll::get_last_err_data(
-                                                    loop_ptr);
-                                            oldcomm::send(setup_ch,
-                                                          Some(err_data));
-                                        }
+        let (setup_po, setup_ch) = stream();
+
+        // this is to address a compiler warning about
+        // an implicit copy.. it seems that double nested
+        // will defeat a move sigil, as is done to the host_ip
+        // arg above.. this same pattern works w/o complaint in
+        // tcp::connect (because the iotask::interact cb isn't
+        // nested within a core::comm::listen block)
+        let loc_ip = copy(host_ip);
+        do iotask::interact(iotask) |move loc_ip, loop_ptr| {
+            unsafe {
+                match uv::ll::tcp_init(loop_ptr, server_stream_ptr) {
+                    0i32 => {
+                        uv::ll::set_data_for_uv_handle(
+                            server_stream_ptr,
+                            server_data_ptr);
+                        let addr_str = ip::format_addr(&loc_ip);
+                        let bind_result = match loc_ip {
+                            ip::Ipv4(ref addr) => {
+                                log(debug, fmt!("addr: %?", addr));
+                                let in_addr = uv::ll::ip4_addr(
+                                    addr_str,
+                                    port as int);
+                                uv::ll::tcp_bind(server_stream_ptr,
+                                                 ptr::addr_of(&in_addr))
+                            }
+                            ip::Ipv6(ref addr) => {
+                                log(debug, fmt!("addr: %?", addr));
+                                let in_addr = uv::ll::ip6_addr(
+                                    addr_str,
+                                    port as int);
+                                uv::ll::tcp_bind6(server_stream_ptr,
+                                                  ptr::addr_of(&in_addr))
+                            }
+                        };
+                        match bind_result {
+                            0i32 => {
+                                match uv::ll::listen(
+                                    server_stream_ptr,
+                                    backlog as libc::c_int,
+                                    tcp_lfc_on_connection_cb) {
+                                    0i32 => setup_ch.send(None),
+                                    _ => {
+                                        log(debug,
+                                            ~"failure to uv_tcp_init");
+                                        let err_data =
+                                            uv::ll::get_last_err_data(
+                                                loop_ptr);
+                                        setup_ch.send(Some(err_data));
                                     }
                                 }
-                                _ => {
-                                    log(debug, ~"failure to uv_tcp_bind");
-                                    let err_data = uv::ll::get_last_err_data(
-                                        loop_ptr);
-                                    oldcomm::send(setup_ch, Some(err_data));
-                                }
+                            }
+                            _ => {
+                                log(debug, ~"failure to uv_tcp_bind");
+                                let err_data = uv::ll::get_last_err_data(
+                                    loop_ptr);
+                                setup_ch.send(Some(err_data));
                             }
                         }
-                        _ => {
-                            log(debug, ~"failure to uv_tcp_bind");
-                            let err_data = uv::ll::get_last_err_data(
-                                loop_ptr);
-                            oldcomm::send(setup_ch, Some(err_data));
-                        }
+                    }
+                    _ => {
+                        log(debug, ~"failure to uv_tcp_bind");
+                        let err_data = uv::ll::get_last_err_data(
+                            loop_ptr);
+                        setup_ch.send(Some(err_data));
                     }
                 }
             }
-            setup_ch.recv()
-        };
+        }
+
+        let setup_result = setup_po.recv();
+
         match setup_result {
             Some(ref err_data) => {
                 do iotask::interact(iotask) |loop_ptr| {
@@ -767,8 +771,8 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint,
                 }
             }
             None => {
-                on_establish_cb(kill_ch);
-                let kill_result = oldcomm::recv(kill_po);
+                on_establish_cb(kill_ch.clone());
+                let kill_result = kill_po.recv();
                 do iotask::interact(iotask) |loop_ptr| {
                     unsafe {
                         log(debug,
@@ -816,14 +820,13 @@ pub fn socket_buf(sock: TcpSocket) -> TcpSocketBuf {
 
 /// Convenience methods extending `net::tcp::tcp_socket`
 impl TcpSocket {
-    pub fn read_start() -> result::Result<oldcomm::Port<
+    pub fn read_start() -> result::Result<@Port<
         result::Result<~[u8], TcpErrData>>, TcpErrData> {
         read_start(&self)
     }
-    pub fn read_stop(read_port:
-                 oldcomm::Port<result::Result<~[u8], TcpErrData>>) ->
+    pub fn read_stop() ->
         result::Result<(), TcpErrData> {
-        read_stop(&self, move read_port)
+        read_stop(&self)
     }
     fn read(timeout_msecs: uint) ->
         result::Result<~[u8], TcpErrData> {
@@ -995,9 +998,9 @@ impl TcpSocketBuf: io::Writer {
 
 fn tear_down_socket_data(socket_data: @TcpSocketData) {
     unsafe {
-        let closed_po = oldcomm::Port::<()>();
-        let closed_ch = oldcomm::Chan(&closed_po);
-        let close_data = {
+        let (closed_po, closed_ch) = stream::<()>();
+        let closed_ch = SharedChan(closed_ch);
+        let close_data = TcpSocketCloseData {
             closed_ch: closed_ch
         };
         let close_data_ptr = ptr::addr_of(&close_data);
@@ -1012,7 +1015,7 @@ fn tear_down_socket_data(socket_data: @TcpSocketData) {
                 uv::ll::close(stream_handle_ptr, tcp_socket_dtor_close_cb);
             }
         };
-        oldcomm::recv(closed_po);
+        closed_po.recv();
         //the line below will most likely crash
         //log(debug, fmt!("about to free socket_data at %?", socket_data));
         rustrt::rust_uv_current_kernel_free(stream_handle_ptr
@@ -1038,9 +1041,9 @@ fn read_common_impl(socket_data: *TcpSocketData, timeout_msecs: uint)
             log(debug, ~"tcp::read before recv_timeout");
             let read_result = if timeout_msecs > 0u {
                 timer::recv_timeout(
-                    iotask, timeout_msecs, result::get(&rs_result))
+                    iotask, timeout_msecs, result::unwrap(rs_result))
             } else {
-                Some(oldcomm::recv(result::get(&rs_result)))
+                Some(result::get(&rs_result).recv())
             };
             log(debug, ~"tcp::read after recv_timeout");
             match move read_result {
@@ -1068,8 +1071,7 @@ fn read_stop_common_impl(socket_data: *TcpSocketData) ->
     result::Result<(), TcpErrData> {
     unsafe {
         let stream_handle_ptr = (*socket_data).stream_handle_ptr;
-        let stop_po = oldcomm::Port::<Option<TcpErrData>>();
-        let stop_ch = oldcomm::Chan(&stop_po);
+        let (stop_po, stop_ch) = stream::<Option<TcpErrData>>();
         do iotask::interact(&(*socket_data).iotask) |loop_ptr| {
             unsafe {
                 log(debug, ~"in interact cb for tcp::read_stop");
@@ -1077,17 +1079,17 @@ fn read_stop_common_impl(socket_data: *TcpSocketData) ->
                                         as *uv::ll::uv_stream_t) {
                     0i32 => {
                         log(debug, ~"successfully called uv_read_stop");
-                        oldcomm::send(stop_ch, None);
+                        stop_ch.send(None);
                     }
                     _ => {
                         log(debug, ~"failure in calling uv_read_stop");
                         let err_data = uv::ll::get_last_err_data(loop_ptr);
-                        oldcomm::send(stop_ch, Some(err_data.to_tcp_err()));
+                        stop_ch.send(Some(err_data.to_tcp_err()));
                     }
                 }
             }
         }
-        match oldcomm::recv(stop_po) {
+        match stop_po.recv() {
             Some(move err_data) => Err(err_data),
             None => Ok(())
         }
@@ -1096,12 +1098,11 @@ fn read_stop_common_impl(socket_data: *TcpSocketData) ->
 
 // shared impl for read_start
 fn read_start_common_impl(socket_data: *TcpSocketData)
-    -> result::Result<oldcomm::Port<
+    -> result::Result<@Port<
         result::Result<~[u8], TcpErrData>>, TcpErrData> {
     unsafe {
         let stream_handle_ptr = (*socket_data).stream_handle_ptr;
-        let start_po = oldcomm::Port::<Option<uv::ll::uv_err_data>>();
-        let start_ch = oldcomm::Chan(&start_po);
+        let (start_po, start_ch) = stream::<Option<uv::ll::uv_err_data>>();
         log(debug, ~"in tcp::read_start before interact loop");
         do iotask::interact(&(*socket_data).iotask) |loop_ptr| {
             unsafe {
@@ -1113,19 +1114,22 @@ fn read_start_common_impl(socket_data: *TcpSocketData)
                                          on_tcp_read_cb) {
                     0i32 => {
                         log(debug, ~"success doing uv_read_start");
-                        oldcomm::send(start_ch, None);
+                        start_ch.send(None);
                     }
                     _ => {
                         log(debug, ~"error attempting uv_read_start");
                         let err_data = uv::ll::get_last_err_data(loop_ptr);
-                        oldcomm::send(start_ch, Some(err_data));
+                        start_ch.send(Some(err_data));
                     }
                 }
             }
         }
-        match oldcomm::recv(start_po) {
-            Some(ref err_data) => result::Err(err_data.to_tcp_err()),
-            None => result::Ok((*socket_data).reader_po)
+        match start_po.recv() {
+            Some(ref err_data) => result::Err(
+                err_data.to_tcp_err()),
+            None => {
+                result::Ok((*socket_data).reader_po)
+            }
         }
     }
 }
@@ -1144,9 +1148,10 @@ fn write_common_impl(socket_data_ptr: *TcpSocketData,
             vec::raw::to_ptr(raw_write_data),
             vec::len(raw_write_data)) ];
         let write_buf_vec_ptr = ptr::addr_of(&write_buf_vec);
-        let result_po = oldcomm::Port::<TcpWriteResult>();
-        let write_data = {
-            result_ch: oldcomm::Chan(&result_po)
+        let (result_po, result_ch) = stream::<TcpWriteResult>();
+        let result_ch = SharedChan(result_ch);
+        let write_data = WriteReqData {
+            result_ch: result_ch
         };
         let write_data_ptr = ptr::addr_of(&write_data);
         do iotask::interact(&(*socket_data_ptr).iotask) |loop_ptr| {
@@ -1165,8 +1170,8 @@ fn write_common_impl(socket_data_ptr: *TcpSocketData,
                     _ => {
                         log(debug, ~"error invoking uv_write()");
                         let err_data = uv::ll::get_last_err_data(loop_ptr);
-                        oldcomm::send((*write_data_ptr).result_ch,
-                                      TcpWriteError(err_data.to_tcp_err()));
+                        let result_ch = (*write_data_ptr).result_ch.clone();
+                        result_ch.send(TcpWriteError(err_data.to_tcp_err()));
                     }
                 }
             }
@@ -1175,7 +1180,7 @@ fn write_common_impl(socket_data_ptr: *TcpSocketData,
         // and waiting here for the write to complete, we should transfer
         // ownership of everything to the I/O task and let it deal with the
         // aftermath, so we don't have to sit here blocking.
-        match oldcomm::recv(result_po) {
+        match result_po.recv() {
             TcpWriteSuccess => Ok(()),
             TcpWriteError(move err_data) => Err(err_data)
         }
@@ -1188,8 +1193,8 @@ enum TcpNewConnection {
 
 struct TcpListenFcData {
     server_stream_ptr: *uv::ll::uv_tcp_t,
-    stream_closed_ch: oldcomm::Chan<()>,
-    kill_ch: oldcomm::Chan<Option<TcpErrData>>,
+    stream_closed_ch: SharedChan<()>,
+    kill_ch: SharedChan<Option<TcpErrData>>,
     on_connect_cb: fn~(*uv::ll::uv_tcp_t),
     iotask: IoTask,
     ipv6: bool,
@@ -1200,7 +1205,8 @@ extern fn tcp_lfc_close_cb(handle: *uv::ll::uv_tcp_t) {
     unsafe {
         let server_data_ptr = uv::ll::get_data_for_uv_handle(
             handle) as *TcpListenFcData;
-        oldcomm::send((*server_data_ptr).stream_closed_ch, ());
+        let stream_closed_ch = (*server_data_ptr).stream_closed_ch.clone();
+        stream_closed_ch.send(());
     }
 }
 
@@ -1209,13 +1215,13 @@ extern fn tcp_lfc_on_connection_cb(handle: *uv::ll::uv_tcp_t,
     unsafe {
         let server_data_ptr = uv::ll::get_data_for_uv_handle(handle)
             as *TcpListenFcData;
-        let kill_ch = (*server_data_ptr).kill_ch;
+        let kill_ch = (*server_data_ptr).kill_ch.clone();
         if (*server_data_ptr).active {
             match status {
               0i32 => ((*server_data_ptr).on_connect_cb)(handle),
               _ => {
                 let loop_ptr = uv::ll::get_loop_for_uv_handle(handle);
-                oldcomm::send(kill_ch,
+                kill_ch.send(
                            Some(uv::ll::get_last_err_data(loop_ptr)
                                 .to_tcp_err()));
                 (*server_data_ptr).active = false;
@@ -1243,7 +1249,7 @@ enum TcpWriteResult {
 }
 
 enum TcpReadStartResult {
-    TcpReadStartSuccess(oldcomm::Port<TcpReadResult>),
+    TcpReadStartSuccess(Port<TcpReadResult>),
     TcpReadStartError(TcpErrData)
 }
 
@@ -1278,8 +1284,8 @@ extern fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t,
             let err_data = uv::ll::get_last_err_data(loop_ptr).to_tcp_err();
             log(debug, fmt!("on_tcp_read_cb: incoming err.. name %? msg %?",
                             err_data.err_name, err_data.err_msg));
-            let reader_ch = (*socket_data_ptr).reader_ch;
-            oldcomm::send(reader_ch, result::Err(err_data));
+            let reader_ch = &(*socket_data_ptr).reader_ch;
+            reader_ch.send(result::Err(err_data));
           }
           // do nothing .. unneeded buf
           0 => (),
@@ -1287,10 +1293,10 @@ extern fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t,
           _ => {
             // we have data
             log(debug, fmt!("tcp on_read_cb nread: %d", nread as int));
-            let reader_ch = (*socket_data_ptr).reader_ch;
+            let reader_ch = &(*socket_data_ptr).reader_ch;
             let buf_base = uv::ll::get_base_from_buf(buf);
             let new_bytes = vec::from_buf(buf_base, nread as uint);
-            oldcomm::send(reader_ch, result::Ok(new_bytes));
+            reader_ch.send(result::Ok(new_bytes));
           }
         }
         uv::ll::free_base_of_buf(buf);
@@ -1313,15 +1319,15 @@ extern fn on_alloc_cb(handle: *libc::c_void,
 }
 
 struct TcpSocketCloseData {
-    closed_ch: oldcomm::Chan<()>,
+    closed_ch: SharedChan<()>,
 }
 
 extern fn tcp_socket_dtor_close_cb(handle: *uv::ll::uv_tcp_t) {
     unsafe {
         let data = uv::ll::get_data_for_uv_handle(handle)
             as *TcpSocketCloseData;
-        let closed_ch = (*data).closed_ch;
-        oldcomm::send(closed_ch, ());
+        let closed_ch = (*data).closed_ch.clone();
+        closed_ch.send(());
         log(debug, ~"tcp_socket_dtor_close_cb exiting..");
     }
 }
@@ -1333,33 +1339,35 @@ extern fn tcp_write_complete_cb(write_req: *uv::ll::uv_write_t,
             as *WriteReqData;
         if status == 0i32 {
             log(debug, ~"successful write complete");
-            oldcomm::send((*write_data_ptr).result_ch, TcpWriteSuccess);
+            let result_ch = (*write_data_ptr).result_ch.clone();
+            result_ch.send(TcpWriteSuccess);
         } else {
             let stream_handle_ptr = uv::ll::get_stream_handle_from_write_req(
                 write_req);
             let loop_ptr = uv::ll::get_loop_for_uv_handle(stream_handle_ptr);
             let err_data = uv::ll::get_last_err_data(loop_ptr);
             log(debug, ~"failure to write");
-            oldcomm::send((*write_data_ptr).result_ch,
-                             TcpWriteError(err_data.to_tcp_err()));
+            let result_ch = (*write_data_ptr).result_ch.clone();
+            result_ch.send(TcpWriteError(err_data.to_tcp_err()));
         }
     }
 }
 
 struct WriteReqData {
-    result_ch: oldcomm::Chan<TcpWriteResult>,
+    result_ch: SharedChan<TcpWriteResult>,
 }
 
 struct ConnectReqData {
-    result_ch: oldcomm::Chan<ConnAttempt>,
-    closed_signal_ch: oldcomm::Chan<()>,
+    result_ch: SharedChan<ConnAttempt>,
+    closed_signal_ch: SharedChan<()>,
 }
 
 extern fn stream_error_close_cb(handle: *uv::ll::uv_tcp_t) {
     unsafe {
         let data = uv::ll::get_data_for_uv_handle(handle) as
             *ConnectReqData;
-        oldcomm::send((*data).closed_signal_ch, ());
+        let closed_signal_ch = (*data).closed_signal_ch.clone();
+        closed_signal_ch.send(());
         log(debug, fmt!("exiting steam_error_close_cb for %?", handle));
     }
 }
@@ -1375,14 +1383,14 @@ extern fn tcp_connect_on_connect_cb(connect_req_ptr: *uv::ll::uv_connect_t,
     unsafe {
         let conn_data_ptr = (uv::ll::get_data_for_req(connect_req_ptr)
                           as *ConnectReqData);
-        let result_ch = (*conn_data_ptr).result_ch;
+        let result_ch = (*conn_data_ptr).result_ch.clone();
         log(debug, fmt!("tcp_connect result_ch %?", result_ch));
         let tcp_stream_ptr =
             uv::ll::get_stream_handle_from_connect_req(connect_req_ptr);
         match status {
           0i32 => {
             log(debug, ~"successful tcp connection!");
-            oldcomm::send(result_ch, ConnSuccess);
+            result_ch.send(ConnSuccess);
           }
           _ => {
             log(debug, ~"error in tcp_connect_on_connect_cb");
@@ -1390,7 +1398,7 @@ extern fn tcp_connect_on_connect_cb(connect_req_ptr: *uv::ll::uv_connect_t,
             let err_data = uv::ll::get_last_err_data(loop_ptr);
             log(debug, fmt!("err_data %? %?", err_data.err_name,
                             err_data.err_msg));
-            oldcomm::send(result_ch, ConnFailure(err_data));
+            result_ch.send(ConnFailure(err_data));
             uv::ll::set_data_for_uv_handle(tcp_stream_ptr,
                                            conn_data_ptr);
             uv::ll::close(tcp_stream_ptr, stream_error_close_cb);
@@ -1406,8 +1414,8 @@ enum ConnAttempt {
 }
 
 struct TcpSocketData {
-    reader_po: oldcomm::Port<result::Result<~[u8], TcpErrData>>,
-    reader_ch: oldcomm::Chan<result::Result<~[u8], TcpErrData>>,
+    reader_po: @Port<result::Result<~[u8], TcpErrData>>,
+    reader_ch: SharedChan<result::Result<~[u8], TcpErrData>>,
     stream_handle_ptr: *uv::ll::uv_tcp_t,
     connect_req: uv::ll::uv_connect_t,
     write_req: uv::ll::uv_write_t,
@@ -1431,7 +1439,7 @@ pub mod test {
     use uv;
 
     use core::io;
-    use core::oldcomm;
+    use core::pipes::{stream, Chan, Port, SharedChan};
     use core::prelude::*;
     use core::result;
     use core::str;
@@ -1546,39 +1554,33 @@ pub mod test {
         let expected_req = ~"ping";
         let expected_resp = ~"pong";
 
-        let server_result_po = oldcomm::Port::<~str>();
-        let server_result_ch = oldcomm::Chan(&server_result_po);
+        let (server_result_po, server_result_ch) = stream::<~str>();
 
-        let cont_po = oldcomm::Port::<()>();
-        let cont_ch = oldcomm::Chan(&cont_po);
+        let (cont_po, cont_ch) = stream::<()>();
+        let cont_ch = SharedChan(cont_ch);
         // server
         let hl_loop_clone = hl_loop.clone();
         do task::spawn_sched(task::ManualThreads(1u)) {
-            let actual_req = do oldcomm::listen |server_ch| {
-                run_tcp_test_server(
-                    server_ip,
-                    server_port,
-                    expected_resp,
-                    server_ch,
-                    cont_ch,
-                    &hl_loop_clone)
-            };
+            let cont_ch = cont_ch.clone();
+            let actual_req = run_tcp_test_server(
+                server_ip,
+                server_port,
+                expected_resp,
+                cont_ch.clone(),
+                &hl_loop_clone);
             server_result_ch.send(actual_req);
         };
-        oldcomm::recv(cont_po);
+        cont_po.recv();
         // client
         debug!("server started, firing up client..");
-        let actual_resp_result = do oldcomm::listen |client_ch| {
-            run_tcp_test_client(
-                server_ip,
-                server_port,
-                expected_req,
-                client_ch,
-                hl_loop)
-        };
+        let actual_resp_result = run_tcp_test_client(
+            server_ip,
+            server_port,
+            expected_req,
+            hl_loop);
         assert actual_resp_result.is_ok();
         let actual_resp = actual_resp_result.get();
-        let actual_req = oldcomm::recv(server_result_po);
+        let actual_req = server_result_po.recv();
         debug!("REQ: expected: '%s' actual: '%s'",
                        expected_req, actual_req);
         debug!("RESP: expected: '%s' actual: '%s'",
@@ -1592,50 +1594,41 @@ pub mod test {
         let server_port = 8887u;
         let expected_resp = ~"pong";
 
-        let server_result_po = oldcomm::Port::<~str>();
-        let server_result_ch = oldcomm::Chan(&server_result_po);
-
-        let cont_po = oldcomm::Port::<()>();
-        let cont_ch = oldcomm::Chan(&cont_po);
+        let (cont_po, cont_ch) = stream::<()>();
+        let cont_ch = SharedChan(cont_ch);
         // server
         let hl_loop_clone = hl_loop.clone();
         do task::spawn_sched(task::ManualThreads(1u)) {
-            let actual_req = do oldcomm::listen |server_ch| {
-                run_tcp_test_server(
-                    server_ip,
-                    server_port,
-                    expected_resp,
-                    server_ch,
-                    cont_ch,
-                    &hl_loop_clone)
-            };
-            server_result_ch.send(actual_req);
+            let cont_ch = cont_ch.clone();
+            run_tcp_test_server(
+                server_ip,
+                server_port,
+                expected_resp,
+                cont_ch.clone(),
+                &hl_loop_clone);
         };
-        oldcomm::recv(cont_po);
+        cont_po.recv();
         // client
         debug!("server started, firing up client..");
-        do oldcomm::listen |client_ch| {
-            let server_ip_addr = ip::v4::parse_addr(server_ip);
-            let iotask = uv::global_loop::get();
-            let connect_result = connect(move server_ip_addr, server_port,
-                                         &iotask);
-
-            let sock = result::unwrap(move connect_result);
-
-            debug!("testing peer address");
-            // This is what we are actually testing!
-            assert net::ip::format_addr(&sock.get_peer_addr()) ==
-                ~"127.0.0.1";
-            assert net::ip::get_port(&sock.get_peer_addr()) == 8887;
-
-            // Fulfill the protocol the test server expects
-            let resp_bytes = str::to_bytes(~"ping");
-            tcp_write_single(&sock, resp_bytes);
-            debug!("message sent");
-            let read_result = sock.read(0u);
-            client_ch.send(str::from_bytes(read_result.get()));
-            debug!("result read");
-        };
+        let server_ip_addr = ip::v4::parse_addr(server_ip);
+        let iotask = uv::global_loop::get();
+        let connect_result = connect(move server_ip_addr, server_port,
+                                     &iotask);
+
+        let sock = result::unwrap(move connect_result);
+
+        debug!("testing peer address");
+        // This is what we are actually testing!
+        assert net::ip::format_addr(&sock.get_peer_addr()) ==
+            ~"127.0.0.1";
+        assert net::ip::get_port(&sock.get_peer_addr()) == 8887;
+
+        // Fulfill the protocol the test server expects
+        let resp_bytes = str::to_bytes(~"ping");
+        tcp_write_single(&sock, resp_bytes);
+        debug!("message sent");
+        sock.read(0u);
+        debug!("result read");
     }
     pub fn impl_gl_tcp_ipv4_client_error_connection_refused() {
         let hl_loop = &uv::global_loop::get();
@@ -1644,14 +1637,11 @@ pub mod test {
         let expected_req = ~"ping";
         // client
         debug!("firing up client..");
-        let actual_resp_result = do oldcomm::listen |client_ch| {
-            run_tcp_test_client(
-                server_ip,
-                server_port,
-                expected_req,
-                client_ch,
-                hl_loop)
-        };
+        let actual_resp_result = run_tcp_test_client(
+            server_ip,
+            server_port,
+            expected_req,
+            hl_loop);
         match actual_resp_result.get_err() {
           ConnectionRefused => (),
           _ => fail ~"unknown error.. expected connection_refused"
@@ -1664,26 +1654,20 @@ pub mod test {
         let expected_req = ~"ping";
         let expected_resp = ~"pong";
 
-        let server_result_po = oldcomm::Port::<~str>();
-        let server_result_ch = oldcomm::Chan(&server_result_po);
-
-        let cont_po = oldcomm::Port::<()>();
-        let cont_ch = oldcomm::Chan(&cont_po);
+        let (cont_po, cont_ch) = stream::<()>();
+        let cont_ch = SharedChan(cont_ch);
         // server
         let hl_loop_clone = hl_loop.clone();
         do task::spawn_sched(task::ManualThreads(1u)) {
-            let actual_req = do oldcomm::listen |server_ch| {
-                run_tcp_test_server(
-                    server_ip,
-                    server_port,
-                    expected_resp,
-                    server_ch,
-                    cont_ch,
-                    &hl_loop_clone)
-            };
-            server_result_ch.send(actual_req);
-        };
-        oldcomm::recv(cont_po);
+            let cont_ch = cont_ch.clone();
+            run_tcp_test_server(
+                server_ip,
+                server_port,
+                expected_resp,
+                cont_ch.clone(),
+                &hl_loop_clone);
+        }
+        cont_po.recv();
         // this one should fail..
         let listen_err = run_tcp_test_server_fail(
                             server_ip,
@@ -1691,14 +1675,11 @@ pub mod test {
                             hl_loop);
         // client.. just doing this so that the first server tears down
         debug!("server started, firing up client..");
-        do oldcomm::listen |client_ch| {
-            run_tcp_test_client(
-                server_ip,
-                server_port,
-                expected_req,
-                client_ch,
-                hl_loop)
-        };
+        run_tcp_test_client(
+            server_ip,
+            server_port,
+            expected_req,
+            hl_loop);
         match listen_err {
           AddressInUse => {
             assert true;
@@ -1736,26 +1717,23 @@ pub mod test {
         let expected_req = ~"ping";
         let expected_resp = ~"pong";
 
-        let server_result_po = oldcomm::Port::<~str>();
-        let server_result_ch = oldcomm::Chan(&server_result_po);
+        let (server_result_po, server_result_ch) = stream::<~str>();
 
-        let cont_po = oldcomm::Port::<()>();
-        let cont_ch = oldcomm::Chan(&cont_po);
+        let (cont_po, cont_ch) = stream::<()>();
+        let cont_ch = SharedChan(cont_ch);
         // server
         let iotask_clone = iotask.clone();
         do task::spawn_sched(task::ManualThreads(1u)) {
-            let actual_req = do oldcomm::listen |server_ch| {
-                run_tcp_test_server(
-                    server_ip,
-                    server_port,
-                    expected_resp,
-                    server_ch,
-                    cont_ch,
-                    &iotask_clone)
-            };
+            let cont_ch = cont_ch.clone();
+            let actual_req = run_tcp_test_server(
+                server_ip,
+                server_port,
+                expected_resp,
+                cont_ch.clone(),
+                &iotask_clone);
             server_result_ch.send(actual_req);
         };
-        oldcomm::recv(cont_po);
+        cont_po.recv();
         // client
         let server_addr = ip::v4::parse_addr(server_ip);
         let conn_result = connect(server_addr, server_port, iotask);
@@ -1770,7 +1748,7 @@ pub mod test {
             buf_read(sock_buf, resp_buf.len())
         };
 
-        let actual_req = oldcomm::recv(server_result_po);
+        let actual_req = server_result_po.recv();
         log(debug, fmt!("REQ: expected: '%s' actual: '%s'",
                        expected_req, actual_req));
         log(debug, fmt!("RESP: expected: '%s' actual: '%s'",
@@ -1788,26 +1766,20 @@ pub mod test {
         let expected_req = ~"GET /";
         let expected_resp = ~"A string\nwith multiple lines\n";
 
-        let server_result_po = oldcomm::Port::<~str>();
-        let server_result_ch = oldcomm::Chan(&server_result_po);
-
-        let cont_po = oldcomm::Port::<()>();
-        let cont_ch = oldcomm::Chan(&cont_po);
+        let (cont_po, cont_ch) = stream::<()>();
+        let cont_ch = SharedChan(cont_ch);
         // server
         let hl_loop_clone = hl_loop.clone();
         do task::spawn_sched(task::ManualThreads(1u)) {
-            let actual_req = do oldcomm::listen |server_ch| {
-                run_tcp_test_server(
-                    server_ip,
-                    server_port,
-                    expected_resp,
-                    server_ch,
-                    cont_ch,
-                    &hl_loop_clone)
-            };
-            server_result_ch.send(actual_req);
+            let cont_ch = cont_ch.clone();
+            run_tcp_test_server(
+                server_ip,
+                server_port,
+                expected_resp,
+                cont_ch.clone(),
+                &hl_loop_clone);
         };
-        oldcomm::recv(cont_po);
+        cont_po.recv();
         // client
         debug!("server started, firing up client..");
         let server_addr = ip::v4::parse_addr(server_ip);
@@ -1841,22 +1813,25 @@ pub mod test {
     }
 
     fn run_tcp_test_server(server_ip: &str, server_port: uint, resp: ~str,
-                          server_ch: oldcomm::Chan<~str>,
-                          cont_ch: oldcomm::Chan<()>,
+                          cont_ch: SharedChan<()>,
                           iotask: &IoTask) -> ~str {
+        let (server_po, server_ch) = stream::<~str>();
+        let server_ch = SharedChan(server_ch);
         let server_ip_addr = ip::v4::parse_addr(server_ip);
         let listen_result = listen(move server_ip_addr, server_port, 128,
                                    iotask,
             // on_establish_cb -- called when listener is set up
             |kill_ch| {
-                debug!("establish_cb %?", kill_ch);
-                oldcomm::send(cont_ch, ());
+                debug!("establish_cb %?",
+                    kill_ch);
+                cont_ch.send(());
             },
             // risky to run this on the loop, but some users
             // will want the POWER
             |new_conn, kill_ch| {
-            debug!("SERVER: new connection!");
-            do oldcomm::listen |cont_ch| {
+                debug!("SERVER: new connection!");
+                let (cont_po, cont_ch) = stream();
+                let server_ch = server_ch.clone();
                 do task::spawn_sched(task::ManualThreads(1u)) {
                     debug!("SERVER: starting worker for new req");
 
@@ -1865,8 +1840,9 @@ pub mod test {
                     if result::is_err(&accept_result) {
                         debug!("SERVER: error accept connection");
                         let err_data = result::get_err(&accept_result);
-                        oldcomm::send(kill_ch, Some(err_data));
-                        debug!("SERVER/WORKER: send on err cont ch");
+                        kill_ch.send(Some(err_data));
+                        debug!(
+                            "SERVER/WORKER: send on err cont ch");
                         cont_ch.send(());
                     }
                     else {
@@ -1889,12 +1865,12 @@ pub mod test {
                             debug!("SERVER: before write");
                             tcp_write_single(&sock, str::to_bytes(resp));
                             debug!("SERVER: after write.. die");
-                            oldcomm::send(kill_ch, None);
+                            kill_ch.send(None);
                           }
                           result::Err(move err_data) => {
                             debug!("SERVER: error recvd: %s %s",
                                 err_data.err_name, err_data.err_msg);
-                            oldcomm::send(kill_ch, Some(err_data));
+                            kill_ch.send(Some(err_data));
                             server_ch.send(~"");
                           }
                         }
@@ -1902,9 +1878,7 @@ pub mod test {
                     }
                 }
                 debug!("SERVER: waiting to recv on cont_ch");
-                cont_ch.recv()
-            };
-            debug!("SERVER: recv'd on cont_ch..leaving listen cb");
+                cont_po.recv();
         });
         // err check on listen_result
         if result::is_err(&listen_result) {
@@ -1921,7 +1895,7 @@ pub mod test {
               }
             }
         }
-        let ret_val = server_ch.recv();
+        let ret_val = server_po.recv();
         debug!("SERVER: exited and got return val: '%s'", ret_val);
         ret_val
     }
@@ -1949,7 +1923,6 @@ pub mod test {
     }
 
     fn run_tcp_test_client(server_ip: &str, server_port: uint, resp: &str,
-                          client_ch: oldcomm::Chan<~str>,
                           iotask: &IoTask) -> result::Result<~str,
                                                     TcpConnectErrData> {
         let server_ip_addr = ip::v4::parse_addr(server_ip);
@@ -1972,9 +1945,9 @@ pub mod test {
                 Ok(~"")
             }
             else {
-                client_ch.send(str::from_bytes(read_result.get()));
-                let ret_val = client_ch.recv();
-                debug!("CLIENT: after client_ch recv ret: '%s'", ret_val);
+                let ret_val = str::from_bytes(read_result.get());
+                debug!("CLIENT: after client_ch recv ret: '%s'",
+                   ret_val);
                 Ok(ret_val)
             }
         }
diff --git a/src/libstd/test.rs b/src/libstd/test.rs
index 2db1a51e34a..58bc32b71af 100644
--- a/src/libstd/test.rs
+++ b/src/libstd/test.rs
@@ -27,7 +27,7 @@ use core::either;
 use core::io::WriterUtil;
 use core::io;
 use core::libc::size_t;
-use core::oldcomm;
+use core::pipes::{stream, Chan, Port, SharedChan};
 use core::option;
 use core::prelude::*;
 use core::result;
@@ -305,8 +305,8 @@ fn run_tests(opts: &TestOpts,
     let mut wait_idx = 0;
     let mut done_idx = 0;
 
-    let p = oldcomm::Port();
-    let ch = oldcomm::Chan(&p);
+    let (p, ch) = stream();
+    let ch = SharedChan(ch);
 
     while done_idx < total {
         while wait_idx < concurrency && run_idx < total {
@@ -317,12 +317,12 @@ fn run_tests(opts: &TestOpts,
                 // that hang forever.
                 callback(TeWait(copy test));
             }
-            run_test(move test, ch);
+            run_test(move test, ch.clone());
             wait_idx += 1;
             run_idx += 1;
         }
 
-        let (test, result) = oldcomm::recv(p);
+        let (test, result) = p.recv();
         if concurrency != 1 {
             callback(TeWait(copy test));
         }
@@ -406,9 +406,9 @@ struct TestFuture {
     wait: fn@() -> TestResult,
 }
 
-pub fn run_test(test: TestDesc, monitor_ch: oldcomm::Chan<MonitorMsg>) {
+pub fn run_test(test: TestDesc, monitor_ch: SharedChan<MonitorMsg>) {
     if test.ignore {
-        oldcomm::send(monitor_ch, (copy test, TrIgnored));
+        monitor_ch.send((copy test, TrIgnored));
         return;
     }
 
@@ -420,7 +420,7 @@ pub fn run_test(test: TestDesc, monitor_ch: oldcomm::Chan<MonitorMsg>) {
         }).spawn(move testfn);
         let task_result = option::unwrap(move result_future).recv();
         let test_result = calc_result(&test, task_result == task::Success);
-        oldcomm::send(monitor_ch, (copy test, test_result));
+        monitor_ch.send((copy test, test_result));
     };
 }
 
@@ -440,7 +440,7 @@ mod tests {
     use test::{TestOpts, run_test};
 
     use core::either;
-    use core::oldcomm;
+    use core::pipes::{stream, SharedChan};
     use core::option;
     use core::vec;
 
@@ -453,10 +453,10 @@ mod tests {
             ignore: true,
             should_fail: false
         };
-        let p = oldcomm::Port();
-        let ch = oldcomm::Chan(&p);
+        let (p, ch) = stream();
+        let ch = SharedChan(ch);
         run_test(desc, ch);
-        let (_, res) = oldcomm::recv(p);
+        let (_, res) = p.recv();
         assert res != TrOk;
     }
 
@@ -469,10 +469,10 @@ mod tests {
             ignore: true,
             should_fail: false
         };
-        let p = oldcomm::Port();
-        let ch = oldcomm::Chan(&p);
+        let (p, ch) = stream();
+        let ch = SharedChan(ch);
         run_test(desc, ch);
-        let (_, res) = oldcomm::recv(p);
+        let (_, res) = p.recv();
         assert res == TrIgnored;
     }
 
@@ -486,10 +486,10 @@ mod tests {
             ignore: false,
             should_fail: true
         };
-        let p = oldcomm::Port();
-        let ch = oldcomm::Chan(&p);
+        let (p, ch) = stream();
+        let ch = SharedChan(ch);
         run_test(desc, ch);
-        let (_, res) = oldcomm::recv(p);
+        let (_, res) = p.recv();
         assert res == TrOk;
     }
 
@@ -502,10 +502,10 @@ mod tests {
             ignore: false,
             should_fail: true
         };
-        let p = oldcomm::Port();
-        let ch = oldcomm::Chan(&p);
+        let (p, ch) = stream();
+        let ch = SharedChan(ch);
         run_test(desc, ch);
-        let (_, res) = oldcomm::recv(p);
+        let (_, res) = p.recv();
         assert res == TrFailed;
     }
 
diff --git a/src/libstd/timer.rs b/src/libstd/timer.rs
index d62e2cbf05d..b967f92a22e 100644
--- a/src/libstd/timer.rs
+++ b/src/libstd/timer.rs
@@ -18,7 +18,9 @@ use uv::iotask::IoTask;
 
 use core::either;
 use core::libc;
-use core::oldcomm;
+use core::libc::c_void;
+use core::cast::transmute;
+use core::pipes::{stream, Chan, SharedChan, Port, select2i};
 use core::prelude::*;
 use core::ptr;
 use core;
@@ -41,12 +43,11 @@ use core;
  */
 pub fn delayed_send<T: Owned>(iotask: &IoTask,
                               msecs: uint,
-                              ch: oldcomm::Chan<T>,
+                              ch: &Chan<T>,
                               val: T) {
         unsafe {
-            let timer_done_po = oldcomm::Port::<()>();
-            let timer_done_ch = oldcomm::Chan(&timer_done_po);
-            let timer_done_ch_ptr = ptr::addr_of(&timer_done_ch);
+            let (timer_done_po, timer_done_ch) = stream::<()>();
+            let timer_done_ch = SharedChan(timer_done_ch);
             let timer = uv::ll::timer_t();
             let timer_ptr = ptr::addr_of(&timer);
             do iotask::interact(iotask) |loop_ptr| {
@@ -56,9 +57,15 @@ pub fn delayed_send<T: Owned>(iotask: &IoTask,
                         let start_result = uv::ll::timer_start(
                             timer_ptr, delayed_send_cb, msecs, 0u);
                         if (start_result == 0i32) {
+                            // Note: putting the channel into a ~
+                            // to cast to *c_void
+                            let timer_done_ch_clone = ~timer_done_ch.clone();
+                            let timer_done_ch_ptr = transmute::<
+                                ~SharedChan<()>, *c_void>(
+                                timer_done_ch_clone);
                             uv::ll::set_data_for_uv_handle(
                                 timer_ptr,
-                                timer_done_ch_ptr as *libc::c_void);
+                                timer_done_ch_ptr);
                         } else {
                             let error_msg = uv::ll::get_last_err_info(
                                 loop_ptr);
@@ -73,11 +80,11 @@ pub fn delayed_send<T: Owned>(iotask: &IoTask,
                 }
             };
             // delayed_send_cb has been processed by libuv
-            oldcomm::recv(timer_done_po);
+            timer_done_po.recv();
             // notify the caller immediately
-            oldcomm::send(ch, move(val));
+            ch.send(val);
             // uv_close for this timer has been processed
-            oldcomm::recv(timer_done_po);
+            timer_done_po.recv();
     };
 }
 
@@ -93,10 +100,9 @@ pub fn delayed_send<T: Owned>(iotask: &IoTask,
  * * msecs - an amount of time, in milliseconds, for the current task to block
  */
 pub fn sleep(iotask: &IoTask, msecs: uint) {
-    let exit_po = oldcomm::Port::<()>();
-    let exit_ch = oldcomm::Chan(&exit_po);
-    delayed_send(iotask, msecs, exit_ch, ());
-    oldcomm::recv(exit_po);
+    let (exit_po, exit_ch) = stream::<()>();
+    delayed_send(iotask, msecs, &exit_ch, ());
+    exit_po.recv();
 }
 
 /**
@@ -121,20 +127,17 @@ pub fn sleep(iotask: &IoTask, msecs: uint) {
  */
 pub fn recv_timeout<T: Copy Owned>(iotask: &IoTask,
                                    msecs: uint,
-                                   wait_po: oldcomm::Port<T>)
+                                   wait_po: &Port<T>)
                                 -> Option<T> {
-    let timeout_po = oldcomm::Port::<()>();
-    let timeout_ch = oldcomm::Chan(&timeout_po);
-    delayed_send(iotask, msecs, timeout_ch, ());
+    let (timeout_po, timeout_ch) = stream::<()>();
+    delayed_send(iotask, msecs, &timeout_ch, ());
     // FIXME: This could be written clearer (#2618)
     either::either(
-        |left_val| {
-            log(debug, fmt!("recv_time .. left_val %?",
-                           left_val));
+        |_| {
             None
-        }, |right_val| {
-            Some(*right_val)
-        }, &oldcomm::select2(timeout_po, wait_po)
+        }, |_| {
+            Some(wait_po.recv())
+        }, &select2i(&timeout_po, wait_po)
     )
 }
 
@@ -144,11 +147,14 @@ extern fn delayed_send_cb(handle: *uv::ll::uv_timer_t,
     unsafe {
         log(debug,
             fmt!("delayed_send_cb handle %? status %?", handle, status));
-        let timer_done_ch =
-            *(uv::ll::get_data_for_uv_handle(handle) as *oldcomm::Chan<()>);
+        // Faking a borrowed pointer to our ~SharedChan
+        let timer_done_ch_ptr: &*c_void = &uv::ll::get_data_for_uv_handle(
+            handle);
+        let timer_done_ch_ptr = transmute::<&*c_void, &~SharedChan<()>>(
+            timer_done_ch_ptr);
         let stop_result = uv::ll::timer_stop(handle);
         if (stop_result == 0i32) {
-            oldcomm::send(timer_done_ch, ());
+            timer_done_ch_ptr.send(());
             uv::ll::close(handle, delayed_send_close_cb);
         } else {
             let loop_ptr = uv::ll::get_loop_for_uv_handle(handle);
@@ -161,9 +167,10 @@ extern fn delayed_send_cb(handle: *uv::ll::uv_timer_t,
 extern fn delayed_send_close_cb(handle: *uv::ll::uv_timer_t) {
     unsafe {
         log(debug, fmt!("delayed_send_close_cb handle %?", handle));
-        let timer_done_ch =
-            *(uv::ll::get_data_for_uv_handle(handle) as *oldcomm::Chan<()>);
-        oldcomm::send(timer_done_ch, ());
+        let timer_done_ch_ptr = uv::ll::get_data_for_uv_handle(handle);
+        let timer_done_ch = transmute::<*c_void, ~SharedChan<()>>(
+            timer_done_ch_ptr);
+        timer_done_ch.send(());
     }
 }
 
@@ -175,9 +182,9 @@ mod test {
     use uv;
 
     use core::iter;
-    use core::oldcomm;
     use core::rand;
     use core::task;
+    use core::pipes::{stream, SharedChan};
 
     #[test]
     pub fn test_gl_timer_simple_sleep_test() {
@@ -195,8 +202,8 @@ mod test {
 
     #[test]
     pub fn test_gl_timer_sleep_stress2() {
-        let po = oldcomm::Port();
-        let ch = oldcomm::Chan(&po);
+        let (po, ch) = stream();
+        let ch = SharedChan(ch);
         let hl_loop = &uv::global_loop::get();
 
         let repeat = 20u;
@@ -210,8 +217,10 @@ mod test {
 
         for iter::repeat(repeat) {
 
+            let ch = ch.clone();
             for spec.each |spec| {
                 let (times, maxms) = *spec;
+                let ch = ch.clone();
                 let hl_loop_clone = hl_loop.clone();
                 do task::spawn {
                     use rand::*;
@@ -219,13 +228,13 @@ mod test {
                     for iter::repeat(times) {
                         sleep(&hl_loop_clone, rng.next() as uint % maxms);
                     }
-                    oldcomm::send(ch, ());
+                    ch.send(());
                 }
             }
         }
 
         for iter::repeat(repeat * spec.len()) {
-            oldcomm::recv(po)
+            po.recv()
         }
     }
 
@@ -246,14 +255,13 @@ mod test {
             task::yield();
 
             let expected = rand::rng().gen_str(16u);
-            let test_po = core::comm::port::<str>();
-            let test_ch = core::comm::chan(test_po);
+            let (test_po, test_ch) = stream::<~str>();
 
             do task::spawn() {
-                delayed_send(hl_loop, 1u, test_ch, expected);
+                delayed_send(hl_loop, 1u, &test_ch, expected);
             };
 
-            match recv_timeout(hl_loop, 10u, test_po) {
+            match recv_timeout(hl_loop, 10u, &test_po) {
               Some(val) => {
                 assert val == expected;
                 successes += 1;
@@ -274,14 +282,13 @@ mod test {
 
         for iter::repeat(times as uint) {
             let expected = rand::Rng().gen_str(16u);
-            let test_po = oldcomm::Port::<~str>();
-            let test_ch = oldcomm::Chan(&test_po);
+            let (test_po, test_ch) = stream::<~str>();
             let hl_loop_clone = hl_loop.clone();
             do task::spawn() {
-                delayed_send(&hl_loop_clone, 50u, test_ch, expected);
+                delayed_send(&hl_loop_clone, 50u, &test_ch, expected);
             };
 
-            match recv_timeout(&hl_loop, 1u, test_po) {
+            match recv_timeout(&hl_loop, 1u, &test_po) {
               None => successes += 1,
               _ => failures += 1
             };
diff --git a/src/libstd/uv_iotask.rs b/src/libstd/uv_iotask.rs
index dc0092aadfa..6fcbccf8183 100644
--- a/src/libstd/uv_iotask.rs
+++ b/src/libstd/uv_iotask.rs
@@ -209,16 +209,17 @@ mod test {
 
     use core::iter;
     use core::libc;
-    use core::oldcomm;
     use core::ptr;
     use core::task;
+    use core::pipes::{stream, Chan, SharedChan, Port};
 
     extern fn async_close_cb(handle: *ll::uv_async_t) {
         unsafe {
             log(debug, fmt!("async_close_cb handle %?", handle));
-            let exit_ch = (*(ll::get_data_for_uv_handle(handle)
+            let exit_ch = &(*(ll::get_data_for_uv_handle(handle)
                             as *AhData)).exit_ch;
-            oldcomm::send(exit_ch, ());
+            let exit_ch = exit_ch.clone();
+            exit_ch.send(());
         }
     }
     extern fn async_handle_cb(handle: *ll::uv_async_t, status: libc::c_int) {
@@ -230,17 +231,16 @@ mod test {
     }
     struct AhData {
         iotask: IoTask,
-        exit_ch: oldcomm::Chan<()>
+        exit_ch: SharedChan<()>
     }
     fn impl_uv_iotask_async(iotask: &IoTask) {
         unsafe {
             let async_handle = ll::async_t();
             let ah_ptr = ptr::addr_of(&async_handle);
-            let exit_po = oldcomm::Port::<()>();
-            let exit_ch = oldcomm::Chan(&exit_po);
+            let (exit_po, exit_ch) = stream::<()>();
             let ah_data = AhData {
                 iotask: iotask.clone(),
-                exit_ch: exit_ch
+                exit_ch: SharedChan(exit_ch)
             };
             let ah_data_ptr: *AhData = unsafe {
                 ptr::to_unsafe_ptr(&ah_data)
@@ -256,13 +256,13 @@ mod test {
                 }
             };
             debug!("waiting for async close");
-            oldcomm::recv(exit_po);
+            exit_po.recv();
         }
     }
 
     // this fn documents the bear minimum neccesary to roll your own
     // high_level_loop
-    unsafe fn spawn_test_loop(exit_ch: oldcomm::Chan<()>) -> IoTask {
+    unsafe fn spawn_test_loop(exit_ch: ~Chan<()>) -> IoTask {
         let (iotask_port, iotask_ch) = stream::<IoTask>();
         do task::spawn_sched(task::ManualThreads(1u)) {
             debug!("about to run a test loop");
@@ -287,9 +287,8 @@ mod test {
     #[test]
     fn test_uv_iotask_async() {
         unsafe {
-            let exit_po = oldcomm::Port::<()>();
-            let exit_ch = oldcomm::Chan(&exit_po);
-            let iotask = &spawn_test_loop(exit_ch);
+            let (exit_po, exit_ch) = stream::<()>();
+            let iotask = &spawn_test_loop(~exit_ch);
 
             debug!("spawned iotask");
 
@@ -300,24 +299,25 @@ mod test {
             // race-condition type situations.. this ensures that the
             // loop lives until, at least, all of the
             // impl_uv_hl_async() runs have been called, at least.
-            let work_exit_po = oldcomm::Port::<()>();
-            let work_exit_ch = oldcomm::Chan(&work_exit_po);
+            let (work_exit_po, work_exit_ch) = stream::<()>();
+            let work_exit_ch = SharedChan(work_exit_ch);
             for iter::repeat(7u) {
                 let iotask_clone = iotask.clone();
+                let work_exit_ch_clone = work_exit_ch.clone();
                 do task::spawn_sched(task::ManualThreads(1u)) {
                     debug!("async");
                     impl_uv_iotask_async(&iotask_clone);
                     debug!("done async");
-                    oldcomm::send(work_exit_ch, ());
+                    work_exit_ch_clone.send(());
                 };
             };
             for iter::repeat(7u) {
                 debug!("waiting");
-                oldcomm::recv(work_exit_po);
+                work_exit_po.recv();
             };
             log(debug, ~"sending teardown_loop msg..");
             exit(iotask);
-            oldcomm::recv(exit_po);
+            exit_po.recv();
             log(debug, ~"after recv on exit_po.. exiting..");
         }
     }
diff --git a/src/libstd/uv_ll.rs b/src/libstd/uv_ll.rs
index 499d36a6613..8bef6eb6c91 100644
--- a/src/libstd/uv_ll.rs
+++ b/src/libstd/uv_ll.rs
@@ -39,6 +39,7 @@ use core::ptr::to_unsafe_ptr;
 use core::ptr;
 use core::str;
 use core::vec;
+use core::pipes::{stream, Chan, SharedChan, Port};
 
 // libuv struct mappings
 pub struct uv_ip4_addr {
@@ -1132,7 +1133,6 @@ pub mod test {
     use uv_ll::*;
 
     use core::libc;
-    use core::oldcomm;
     use core::ptr;
     use core::str;
     use core::sys;
@@ -1148,7 +1148,7 @@ pub mod test {
     struct request_wrapper {
         write_req: *uv_write_t,
         req_buf: *~[uv_buf_t],
-        read_chan: *oldcomm::Chan<~str>,
+        read_chan: SharedChan<~str>,
     }
 
     extern fn after_close_cb(handle: *libc::c_void) {
@@ -1187,9 +1187,9 @@ pub mod test {
                 let buf_base = get_base_from_buf(buf);
                 let buf_len = get_len_from_buf(buf);
                 let bytes = vec::from_buf(buf_base, buf_len as uint);
-                let read_chan = *((*client_data).read_chan);
+                let read_chan = (*client_data).read_chan.clone();
                 let msg_from_server = str::from_bytes(bytes);
-                oldcomm::send(read_chan, msg_from_server);
+                read_chan.send(msg_from_server);
                 close(stream as *libc::c_void, after_close_cb)
             }
             else if (nread == -1) {
@@ -1257,7 +1257,7 @@ pub mod test {
     }
 
     fn impl_uv_tcp_request(ip: &str, port: int, req_str: &str,
-                          client_chan: *oldcomm::Chan<~str>) {
+                          client_chan: SharedChan<~str>) {
         unsafe {
             let test_loop = loop_new();
             let tcp_handle = tcp_t();
@@ -1283,9 +1283,11 @@ pub mod test {
             log(debug, fmt!("tcp req: tcp stream: %d write_handle: %d",
                              tcp_handle_ptr as int,
                              write_handle_ptr as int));
-            let client_data = { writer_handle: write_handle_ptr,
-                        req_buf: ptr::addr_of(&req_msg),
-                        read_chan: client_chan };
+            let client_data = request_wrapper {
+                write_req: write_handle_ptr,
+                req_buf: ptr::addr_of(&req_msg),
+                read_chan: client_chan
+            };
 
             let tcp_init_result = tcp_init(
                 test_loop as *libc::c_void, tcp_handle_ptr);
@@ -1388,8 +1390,8 @@ pub mod test {
                     log(debug, ~"SERVER: client req contains kill_msg!");
                     log(debug, ~"SERVER: sending response to client");
                     read_stop(client_stream_ptr);
-                    let server_chan = *((*client_data).server_chan);
-                    oldcomm::send(server_chan, request_str);
+                    let server_chan = (*client_data).server_chan.clone();
+                    server_chan.send(request_str);
                     let write_result = write(
                         write_req,
                         client_stream_ptr as *libc::c_void,
@@ -1484,12 +1486,12 @@ pub mod test {
         server: *uv_tcp_t,
         server_kill_msg: ~str,
         server_resp_buf: *~[uv_buf_t],
-        server_chan: *oldcomm::Chan<~str>,
+        server_chan: SharedChan<~str>,
         server_write_req: *uv_write_t,
     }
 
     struct async_handle_data {
-        continue_chan: *oldcomm::Chan<bool>,
+        continue_chan: SharedChan<bool>,
     }
 
     extern fn async_close_cb(handle: *libc::c_void) {
@@ -1506,9 +1508,9 @@ pub mod test {
             // do its thang
             let data = get_data_for_uv_handle(
                 async_handle as *libc::c_void) as *async_handle_data;
-            let continue_chan = *((*data).continue_chan);
+            let continue_chan = (*data).continue_chan.clone();
             let should_continue = status == 0i32;
-            oldcomm::send(continue_chan, should_continue);
+            continue_chan.send(should_continue);
             close(async_handle as *libc::c_void, async_close_cb);
         }
     }
@@ -1517,8 +1519,8 @@ pub mod test {
                           server_port: int,
                           +kill_server_msg: ~str,
                           +server_resp_msg: ~str,
-                          server_chan: *oldcomm::Chan<~str>,
-                          continue_chan: *oldcomm::Chan<bool>) {
+                          server_chan: SharedChan<~str>,
+                          continue_chan: SharedChan<bool>) {
         unsafe {
             let test_loop = loop_new();
             let tcp_server = tcp_t();
@@ -1626,36 +1628,35 @@ pub mod test {
             let port = 8886;
             let kill_server_msg = ~"does a dog have buddha nature?";
             let server_resp_msg = ~"mu!";
-            let client_port = oldcomm::Port::<~str>();
-            let client_chan = oldcomm::Chan::<~str>(&client_port);
-            let server_port = oldcomm::Port::<~str>();
-            let server_chan = oldcomm::Chan::<~str>(&server_port);
+            let (client_port, client_chan) = stream::<~str>();
+            let client_chan = SharedChan(client_chan);
+            let (server_port, server_chan) = stream::<~str>();
+            let server_chan = SharedChan(server_chan);
 
-            let continue_port = oldcomm::Port::<bool>();
-            let continue_chan = oldcomm::Chan::<bool>(&continue_port);
-            let continue_chan_ptr = ptr::addr_of(&continue_chan);
+            let (continue_port, continue_chan) = stream::<bool>();
+            let continue_chan = SharedChan(continue_chan);
 
             do task::spawn_sched(task::ManualThreads(1)) {
                 impl_uv_tcp_server(bind_ip, port,
                                    kill_server_msg,
                                    server_resp_msg,
-                                   ptr::addr_of(&server_chan),
-                                   continue_chan_ptr);
+                                   server_chan.clone(),
+                                   continue_chan.clone());
             };
 
             // block until the server up is.. possibly a race?
             log(debug, ~"before receiving on server continue_port");
-            oldcomm::recv(continue_port);
+            continue_port.recv();
             log(debug, ~"received on continue port, set up tcp client");
 
             do task::spawn_sched(task::ManualThreads(1u)) {
                 impl_uv_tcp_request(request_ip, port,
                                    kill_server_msg,
-                                   ptr::addr_of(&client_chan));
+                                   client_chan.clone());
             };
 
-            let msg_from_client = oldcomm::recv(server_port);
-            let msg_from_server = oldcomm::recv(client_port);
+            let msg_from_client = server_port.recv();
+            let msg_from_server = client_port.recv();
 
             assert str::contains(msg_from_client, kill_server_msg);
             assert str::contains(msg_from_server, server_resp_msg);