about summary refs log tree commit diff
diff options
context:
space:
mode:
authorBrian Anderson <banderson@mozilla.com>2013-01-25 17:51:53 -0800
committerBrian Anderson <banderson@mozilla.com>2013-01-25 18:06:30 -0800
commit1ef83945c1d76c9f2b9b0d087ceac65963087be7 (patch)
treeb114ac7f84a0a10a3d207da0bcd3e9dcc2418e7b
parentd1f771ca341bc93e2ebe9f0ef9979a71a8e3c6d8 (diff)
parent19aa88cd64c81b77b874f9396a43fedfa28f14ee (diff)
downloadrust-1ef83945c1d76c9f2b9b0d087ceac65963087be7.tar.gz
rust-1ef83945c1d76c9f2b9b0d087ceac65963087be7.zip
Merge remote-tracking branch 'brson/nocommupstream'
Conflicts:
	src/libcore/private.rs
	src/libcore/task/mod.rs
	src/libcore/task/spawn.rs
	src/libstd/net_tcp.rs
	src/libstd/uv_global_loop.rs
	src/libstd/uv_iotask.rs
-rw-r--r--src/libcore/os.rs204
-rw-r--r--src/libcore/pipes.rs10
-rw-r--r--src/libcore/private.rs274
-rw-r--r--src/libcore/private/at_exit.rs98
-rw-r--r--src/libcore/private/finally.rs98
-rw-r--r--src/libcore/private/global.rs296
-rw-r--r--src/libcore/private/weak_task.rs207
-rw-r--r--src/libcore/run.rs13
-rw-r--r--src/libcore/task/mod.rs258
-rw-r--r--src/libcore/task/spawn.rs67
-rw-r--r--src/libstd/flatpipes.rs3
-rw-r--r--src/libstd/net_ip.rs6
-rw-r--r--src/libstd/net_tcp.rs712
-rw-r--r--src/libstd/timer.rs21
-rw-r--r--src/libstd/uv_global_loop.rs205
-rw-r--r--src/libstd/uv_iotask.rs114
-rw-r--r--src/rt/rust.cpp4
-rw-r--r--src/rt/rust_builtin.cpp46
-rw-r--r--src/rt/rust_kernel.cpp165
-rw-r--r--src/rt/rust_kernel.h43
-rw-r--r--src/rt/rust_uv.cpp9
-rw-r--r--src/rt/rustrt.def.in8
-rw-r--r--src/test/run-pass/pipe-detect-term.rs2
-rw-r--r--src/test/run-pass/pipe-select.rs2
-rw-r--r--src/test/run-pass/pipe-sleep.rs2
25 files changed, 1683 insertions, 1184 deletions
diff --git a/src/libcore/os.rs b/src/libcore/os.rs
index cf86f45379c..6f568e9b2a7 100644
--- a/src/libcore/os.rs
+++ b/src/libcore/os.rs
@@ -141,169 +141,101 @@ pub mod win32 {
     }
 }
 
-pub fn getenv(n: &str) -> Option<~str> {
-    global_env::getenv(n)
-}
+/*
+Accessing environment variables is not generally threadsafe.
+This uses a per-runtime lock to serialize access.
+XXX: It would probably be appropriate to make this a real global
+*/
+fn with_env_lock<T>(f: &fn() -> T) -> T {
+    use private::global::global_data_clone_create;
+    use private::{Exclusive, exclusive};
+
+    struct SharedValue(());
+    type ValueMutex = Exclusive<SharedValue>;
+    fn key(_: ValueMutex) { }
 
-pub fn setenv(n: &str, v: &str) {
-    global_env::setenv(n, v)
-}
+    unsafe {
+        let lock: ValueMutex = global_data_clone_create(key, || {
+            ~exclusive(SharedValue(()))
+        });
 
-pub fn env() -> ~[(~str,~str)] {
-    global_env::env()
+        lock.with_imm(|_| f() )
+    }
 }
 
-mod global_env {
-    //! Internal module for serializing access to getenv/setenv
-    use either;
-    use libc;
-    use oldcomm;
-    use option::Option;
-    use private;
-    use str;
-    use task;
-
+pub fn env() -> ~[(~str,~str)] {
     extern mod rustrt {
-        unsafe fn rust_global_env_chan_ptr() -> *libc::uintptr_t;
-    }
-
-    enum Msg {
-        MsgGetEnv(~str, oldcomm::Chan<Option<~str>>),
-        MsgSetEnv(~str, ~str, oldcomm::Chan<()>),
-        MsgEnv(oldcomm::Chan<~[(~str,~str)]>)
-    }
-
-    pub fn getenv(n: &str) -> Option<~str> {
-        let env_ch = get_global_env_chan();
-        let po = oldcomm::Port();
-        oldcomm::send(env_ch, MsgGetEnv(str::from_slice(n),
-                                        oldcomm::Chan(&po)));
-        oldcomm::recv(po)
-    }
-
-    pub fn setenv(n: &str, v: &str) {
-        let env_ch = get_global_env_chan();
-        let po = oldcomm::Port();
-        oldcomm::send(env_ch, MsgSetEnv(str::from_slice(n),
-                                        str::from_slice(v),
-                                        oldcomm::Chan(&po)));
-        oldcomm::recv(po)
-    }
-
-    pub fn env() -> ~[(~str,~str)] {
-        let env_ch = get_global_env_chan();
-        let po = oldcomm::Port();
-        oldcomm::send(env_ch, MsgEnv(oldcomm::Chan(&po)));
-        oldcomm::recv(po)
-    }
-
-    fn get_global_env_chan() -> oldcomm::Chan<Msg> {
-        unsafe {
-            let global_ptr = rustrt::rust_global_env_chan_ptr();
-            private::chan_from_global_ptr(global_ptr, || {
-                // FIXME (#2621): This would be a good place to use a very
-                // small foreign stack
-                task::task().sched_mode(task::SingleThreaded).unlinked()
-            }, global_env_task)
-        }
+        unsafe fn rust_env_pairs() -> ~[~str];
     }
 
-    fn global_env_task(msg_po: oldcomm::Port<Msg>) {
-        unsafe {
-            do private::weaken_task |weak_po| {
-                loop {
-                    match oldcomm::select2(msg_po, weak_po) {
-                      either::Left(MsgGetEnv(ref n, resp_ch)) => {
-                        oldcomm::send(resp_ch, impl_::getenv(*n))
-                      }
-                      either::Left(MsgSetEnv(ref n, ref v, resp_ch)) => {
-                        oldcomm::send(resp_ch, impl_::setenv(*n, *v))
-                      }
-                      either::Left(MsgEnv(resp_ch)) => {
-                        oldcomm::send(resp_ch, impl_::env())
-                      }
-                      either::Right(_) => break
-                    }
-                }
+    unsafe {
+        do with_env_lock {
+            let mut pairs = ~[];
+            for vec::each(rustrt::rust_env_pairs()) |p| {
+                let vs = str::splitn_char(*p, '=', 1u);
+                assert vec::len(vs) == 2u;
+                pairs.push((copy vs[0], copy vs[1]));
             }
+            move pairs
         }
     }
+}
 
-    mod impl_ {
-        use cast;
-        use libc;
-        use option::Option;
-        use option;
-        use ptr;
-        use str;
-        use vec;
-
-        extern mod rustrt {
-            unsafe fn rust_env_pairs() -> ~[~str];
-        }
-
-        pub fn env() -> ~[(~str,~str)] {
-            unsafe {
-                let mut pairs = ~[];
-                for vec::each(rustrt::rust_env_pairs()) |p| {
-                    let vs = str::splitn_char(*p, '=', 1u);
-                    assert vec::len(vs) == 2u;
-                    pairs.push((copy vs[0], copy vs[1]));
-                }
-                move pairs
-            }
-        }
-
-        #[cfg(unix)]
-        pub fn getenv(n: &str) -> Option<~str> {
-            unsafe {
-                let s = str::as_c_str(n, |s| libc::getenv(s));
-                return if ptr::null::<u8>() == cast::reinterpret_cast(&s) {
-                    option::None::<~str>
-                } else {
-                    let s = cast::reinterpret_cast(&s);
-                    option::Some::<~str>(str::raw::from_buf(s))
-                };
+#[cfg(unix)]
+pub fn getenv(n: &str) -> Option<~str> {
+    unsafe {
+        do with_env_lock {
+            let s = str::as_c_str(n, |s| libc::getenv(s));
+            if ptr::null::<u8>() == cast::reinterpret_cast(&s) {
+                option::None::<~str>
+            } else {
+                let s = cast::reinterpret_cast(&s);
+                option::Some::<~str>(str::raw::from_buf(s))
             }
         }
+    }
+}
 
-        #[cfg(windows)]
-        pub fn getenv(n: &str) -> Option<~str> {
-            unsafe {
-                use os::win32::{as_utf16_p, fill_utf16_buf_and_decode};
-                do as_utf16_p(n) |u| {
-                    do fill_utf16_buf_and_decode() |buf, sz| {
-                        libc::GetEnvironmentVariableW(u, buf, sz)
-                    }
+#[cfg(windows)]
+pub fn getenv(n: &str) -> Option<~str> {
+    unsafe {
+        do with_env_lock {
+            use os::win32::{as_utf16_p, fill_utf16_buf_and_decode};
+            do as_utf16_p(n) |u| {
+                do fill_utf16_buf_and_decode() |buf, sz| {
+                    libc::GetEnvironmentVariableW(u, buf, sz)
                 }
             }
         }
+    }
+}
 
 
-        #[cfg(unix)]
-        pub fn setenv(n: &str, v: &str) {
-            unsafe {
-                do str::as_c_str(n) |nbuf| {
-                    do str::as_c_str(v) |vbuf| {
-                        libc::funcs::posix01::unistd::setenv(nbuf, vbuf, 1);
-                    }
+#[cfg(unix)]
+pub fn setenv(n: &str, v: &str) {
+    unsafe {
+        do with_env_lock {
+            do str::as_c_str(n) |nbuf| {
+                do str::as_c_str(v) |vbuf| {
+                    libc::funcs::posix01::unistd::setenv(nbuf, vbuf, 1);
                 }
             }
         }
+    }
+}
 
 
-        #[cfg(windows)]
-        pub fn setenv(n: &str, v: &str) {
-            unsafe {
-                use os::win32::as_utf16_p;
-                do as_utf16_p(n) |nbuf| {
-                    do as_utf16_p(v) |vbuf| {
-                        libc::SetEnvironmentVariableW(nbuf, vbuf);
-                    }
+#[cfg(windows)]
+pub fn setenv(n: &str, v: &str) {
+    unsafe {
+        do with_env_lock {
+            use os::win32::as_utf16_p;
+            do as_utf16_p(n) |nbuf| {
+                do as_utf16_p(v) |vbuf| {
+                    libc::SetEnvironmentVariableW(nbuf, vbuf);
                 }
             }
         }
-
     }
 }
 
diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs
index 0ef30668dbc..cecc954cdf3 100644
--- a/src/libcore/pipes.rs
+++ b/src/libcore/pipes.rs
@@ -1286,6 +1286,16 @@ pub fn oneshot<T: Owned>() -> (PortOne<T>, ChanOne<T>) {
     (port, chan)
 }
 
+impl<T: Owned> PortOne<T> {
+    fn recv(self) -> T { recv_one(self) }
+    fn try_recv(self) -> Option<T> { try_recv_one(self) }
+}
+
+impl<T: Owned> ChanOne<T> {
+    fn send(self, data: T) { send_one(self, data) }
+    fn try_send(self, data: T) -> bool { try_send_one(self, data) }
+}
+
 /**
  * Receive a message from a oneshot pipe, failing if the connection was
  * closed.
diff --git a/src/libcore/private.rs b/src/libcore/private.rs
index ad27729cc9f..332c763f151 100644
--- a/src/libcore/private.rs
+++ b/src/libcore/private.rs
@@ -18,7 +18,6 @@
 use cast;
 use iter;
 use libc;
-use oldcomm;
 use option;
 use pipes;
 use prelude::*;
@@ -28,10 +27,17 @@ use task;
 use task::{TaskBuilder, atomically};
 use uint;
 
+#[path = "private/at_exit.rs"]
+pub mod at_exit;
+#[path = "private/global.rs"]
+pub mod global;
+#[path = "private/finally.rs"]
+pub mod finally;
+#[path = "private/weak_task.rs"]
+pub mod weak_task;
+
 extern mod rustrt {
     #[legacy_exports];
-    unsafe fn rust_task_weaken(ch: rust_port_id);
-    unsafe fn rust_task_unweaken(ch: rust_port_id);
 
     unsafe fn rust_create_little_lock() -> rust_little_lock;
     unsafe fn rust_destroy_little_lock(lock: rust_little_lock);
@@ -87,11 +93,6 @@ fn test_run_in_bare_thread() {
     }
 }
 
-#[allow(non_camel_case_types)] // runtime type
-type rust_port_id = uint;
-
-type GlobalPtr = *libc::uintptr_t;
-
 fn compare_and_swap(address: &mut int, oldval: int, newval: int) -> bool {
     unsafe {
         let old = rusti::atomic_cxchg(address, oldval, newval);
@@ -99,255 +100,6 @@ fn compare_and_swap(address: &mut int, oldval: int, newval: int) -> bool {
     }
 }
 
-/**
- * Atomically gets a channel from a pointer to a pointer-sized memory location
- * or, if no channel exists creates and installs a new channel and sets up a
- * new task to receive from it.
- */
-pub unsafe fn chan_from_global_ptr<T: Owned>(
-    global: GlobalPtr,
-    task_fn: fn() -> task::TaskBuilder,
-    f: fn~(oldcomm::Port<T>)
-) -> oldcomm::Chan<T> {
-
-    enum Msg {
-        Proceed,
-        Abort
-    }
-
-    log(debug,~"ENTERING chan_from_global_ptr, before is_prob_zero check");
-    let is_probably_zero = *global == 0u;
-    log(debug,~"after is_prob_zero check");
-    if is_probably_zero {
-        log(debug,~"is probably zero...");
-        // There's no global channel. We must make it
-
-        let (setup1_po, setup1_ch) = pipes::stream();
-        let (setup2_po, setup2_ch) = pipes::stream();
-
-        // FIXME #4422: Ugly type inference hint
-        let setup2_po: pipes::Port<Msg> = setup2_po;
-
-        do task_fn().spawn |move f, move setup1_ch, move setup2_po| {
-            let po = oldcomm::Port::<T>();
-            let ch = oldcomm::Chan(&po);
-            setup1_ch.send(ch);
-
-            // Wait to hear if we are the official instance of
-            // this global task
-            match setup2_po.recv() {
-              Proceed => f(move po),
-              Abort => ()
-            }
-        };
-
-        log(debug,~"before setup recv..");
-        // This is the proposed global channel
-        let ch = setup1_po.recv();
-        // 0 is our sentinal value. It is not a valid channel
-        assert *ch != 0;
-
-        // Install the channel
-        log(debug,~"BEFORE COMPARE AND SWAP");
-        let swapped = compare_and_swap(
-            cast::reinterpret_cast(&global),
-            0, cast::reinterpret_cast(&ch));
-        log(debug,fmt!("AFTER .. swapped? %?", swapped));
-
-        if swapped {
-            // Success!
-            setup2_ch.send(Proceed);
-            ch
-        } else {
-            // Somebody else got in before we did
-            setup2_ch.send(Abort);
-            cast::reinterpret_cast(&*global)
-        }
-    } else {
-        log(debug, ~"global != 0");
-        cast::reinterpret_cast(&*global)
-    }
-}
-
-#[test]
-pub fn test_from_global_chan1() {
-
-    // This is unreadable, right?
-
-    // The global channel
-    let globchan = 0;
-    let globchanp = ptr::addr_of(&globchan);
-
-    // Create the global channel, attached to a new task
-    let ch = unsafe {
-        do chan_from_global_ptr(globchanp, task::task) |po| {
-            let ch = oldcomm::recv(po);
-            oldcomm::send(ch, true);
-            let ch = oldcomm::recv(po);
-            oldcomm::send(ch, true);
-        }
-    };
-    // Talk to it
-    let po = oldcomm::Port();
-    oldcomm::send(ch, oldcomm::Chan(&po));
-    assert oldcomm::recv(po) == true;
-
-    // This one just reuses the previous channel
-    let ch = unsafe {
-        do chan_from_global_ptr(globchanp, task::task) |po| {
-            let ch = oldcomm::recv(po);
-            oldcomm::send(ch, false);
-        }
-    };
-
-    // Talk to the original global task
-    let po = oldcomm::Port();
-    oldcomm::send(ch, oldcomm::Chan(&po));
-    assert oldcomm::recv(po) == true;
-}
-
-#[test]
-pub fn test_from_global_chan2() {
-
-    for iter::repeat(100) {
-        // The global channel
-        let globchan = 0;
-        let globchanp = ptr::addr_of(&globchan);
-
-        let resultpo = oldcomm::Port();
-        let resultch = oldcomm::Chan(&resultpo);
-
-        // Spawn a bunch of tasks that all want to compete to
-        // create the global channel
-        for uint::range(0, 10) |i| {
-            do task::spawn {
-                let ch = unsafe {
-                    do chan_from_global_ptr(
-                        globchanp, task::task) |po| {
-
-                        for uint::range(0, 10) |_j| {
-                            let ch = oldcomm::recv(po);
-                            oldcomm::send(ch, {i});
-                        }
-                    }
-                };
-                let po = oldcomm::Port();
-                oldcomm::send(ch, oldcomm::Chan(&po));
-                // We are The winner if our version of the
-                // task was installed
-                let winner = oldcomm::recv(po);
-                oldcomm::send(resultch, winner == i);
-            }
-        }
-        // There should be only one winner
-        let mut winners = 0u;
-        for uint::range(0u, 10u) |_i| {
-            let res = oldcomm::recv(resultpo);
-            if res { winners += 1u };
-        }
-        assert winners == 1u;
-    }
-}
-
-/**
- * Convert the current task to a 'weak' task temporarily
- *
- * As a weak task it will not be counted towards the runtime's set
- * of live tasks. When there are no more outstanding live (non-weak) tasks
- * the runtime will send an exit message on the provided channel.
- *
- * This function is super-unsafe. Do not use.
- *
- * # Safety notes
- *
- * * Weak tasks must either die on their own or exit upon receipt of
- *   the exit message. Failure to do so will cause the runtime to never
- *   exit
- * * Tasks must not call `weaken_task` multiple times. This will
- *   break the kernel's accounting of live tasks.
- * * Weak tasks must not be supervised. A supervised task keeps
- *   a reference to its parent, so the parent will not die.
- */
-pub unsafe fn weaken_task(f: fn(oldcomm::Port<()>)) {
-    let po = oldcomm::Port();
-    let ch = oldcomm::Chan(&po);
-    unsafe {
-        rustrt::rust_task_weaken(cast::reinterpret_cast(&ch));
-    }
-    let _unweaken = Unweaken(ch);
-    f(po);
-
-    struct Unweaken {
-      ch: oldcomm::Chan<()>,
-      drop {
-        unsafe {
-            rustrt::rust_task_unweaken(cast::reinterpret_cast(&self.ch));
-        }
-      }
-    }
-
-    fn Unweaken(ch: oldcomm::Chan<()>) -> Unweaken {
-        Unweaken {
-            ch: ch
-        }
-    }
-}
-
-#[test]
-pub fn test_weaken_task_then_unweaken() {
-    do task::try {
-        unsafe {
-            do weaken_task |_po| {
-            }
-        }
-    };
-}
-
-#[test]
-pub fn test_weaken_task_wait() {
-    do task::spawn_unlinked {
-        unsafe {
-            do weaken_task |po| {
-                oldcomm::recv(po);
-            }
-        }
-    }
-}
-
-#[test]
-pub fn test_weaken_task_stress() {
-    // Create a bunch of weak tasks
-    for iter::repeat(100u) {
-        do task::spawn {
-            unsafe {
-                do weaken_task |_po| {
-                }
-            }
-        }
-        do task::spawn_unlinked {
-            unsafe {
-                do weaken_task |po| {
-                    // Wait for it to tell us to die
-                    oldcomm::recv(po);
-                }
-            }
-        }
-    }
-}
-
-#[test]
-#[ignore(cfg(windows))]
-pub fn test_weaken_task_fail() {
-    let res = do task::try {
-        unsafe {
-            do weaken_task |_po| {
-                fail;
-            }
-        }
-    };
-    assert result::is_err(&res);
-}
-
 /****************************************************************************
  * Shared state & exclusive ARC
  ****************************************************************************/
@@ -533,6 +285,14 @@ pub unsafe fn clone_shared_mutable_state<T: Owned>(rc: &SharedMutableState<T>)
     ArcDestruct((*rc).data)
 }
 
+impl<T: Owned> SharedMutableState<T>: Clone {
+    fn clone(&self) -> SharedMutableState<T> {
+        unsafe {
+            clone_shared_mutable_state(self)
+        }
+    }
+}
+
 /****************************************************************************/
 
 #[allow(non_camel_case_types)] // runtime type
diff --git a/src/libcore/private/at_exit.rs b/src/libcore/private/at_exit.rs
new file mode 100644
index 00000000000..a87301dbe07
--- /dev/null
+++ b/src/libcore/private/at_exit.rs
@@ -0,0 +1,98 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use sys;
+use cast;
+use ptr;
+use task;
+use uint;
+use vec;
+use rand;
+use libc::{c_void, size_t};
+
+/**
+Register a function to be run during runtime shutdown.
+
+After all non-weak tasks have exited, registered exit functions will
+execute, in random order, on the primary scheduler. Each function runs
+in its own unsupervised task.
+*/
+pub fn at_exit(f: ~fn()) {
+    unsafe {
+        let runner: &fn(*ExitFunctions) = exit_runner;
+        let runner_pair: sys::Closure = cast::transmute(runner);
+        let runner_ptr = runner_pair.code;
+        let runner_ptr = cast::transmute(runner_ptr);
+        rustrt::rust_register_exit_function(runner_ptr, ~f);
+    }
+}
+
+// NB: The double pointer indirection here is because ~fn() is a fat
+// pointer and due to FFI problems I am more comfortable making the
+// interface use a normal pointer
+extern mod rustrt {
+    fn rust_register_exit_function(runner: *c_void, f: ~~fn());
+}
+
+struct ExitFunctions {
+    // The number of exit functions
+    count: size_t,
+    // The buffer of exit functions
+    start: *~~fn()
+}
+
+fn exit_runner(exit_fns: *ExitFunctions) {
+    let exit_fns = unsafe { &*exit_fns };
+    let count = (*exit_fns).count;
+    let start = (*exit_fns).start;
+
+    // NB: from_buf memcpys from the source, which will
+    // give us ownership of the array of functions
+    let mut exit_fns_vec = unsafe { vec::from_buf(start, count as uint) };
+    // Let's not make any promises about execution order
+    rand::Rng().shuffle_mut(exit_fns_vec);
+
+    debug!("running %u exit functions", exit_fns_vec.len());
+
+    while !exit_fns_vec.is_empty() {
+        match exit_fns_vec.pop() {
+            ~f => {
+                task::task().supervised().spawn(f);
+            }
+        }
+    }
+}
+
+#[abi = "rust-intrinsic"]
+pub extern mod rusti {
+    fn move_val_init<T>(dst: &mut T, -src: T);
+    fn init<T>() -> T;
+}
+
+#[test]
+fn test_at_exit() {
+    let i = 10;
+    do at_exit {
+        debug!("at_exit1");
+        assert i == 10;
+    }
+}
+
+#[test]
+fn test_at_exit_many() {
+    let i = 10;
+    for uint::range(20, 100) |j| {
+        do at_exit {
+            debug!("at_exit2");
+            assert i == 10;
+            assert j > i;
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/libcore/private/finally.rs b/src/libcore/private/finally.rs
new file mode 100644
index 00000000000..66e23ff4336
--- /dev/null
+++ b/src/libcore/private/finally.rs
@@ -0,0 +1,98 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+/*!
+The Finally trait provides a method, `finally` on
+stack closures that emulates Java-style try/finally blocks.
+
+# Example
+
+~~~
+do || {
+    ...
+}.finally {
+    alway_run_this();
+}
+~~~
+*/
+
+use ops::Drop;
+use task::{spawn, failing};
+
+pub trait Finally<T> {
+    fn finally(&self, +dtor: &fn()) -> T;
+}
+
+impl<T> &fn() -> T: Finally<T> {
+    // XXX: Should not require a mode here
+    fn finally(&self, +dtor: &fn()) -> T {
+        let _d = Finallyalizer {
+            dtor: dtor
+        };
+
+        (*self)()
+    }
+}
+
+struct Finallyalizer {
+    dtor: &fn()
+}
+
+impl Finallyalizer: Drop {
+    fn finalize(&self) {
+        (self.dtor)();
+    }
+}
+
+#[test]
+fn test_success() {
+    let mut i = 0;
+    do (|| {
+        i = 10;
+    }).finally {
+        assert !failing();
+        assert i == 10;
+        i = 20;
+    }
+    assert i == 20;
+}
+
+#[test]
+#[ignore(cfg(windows))]
+#[should_fail]
+fn test_fail() {
+    let mut i = 0;
+    do (|| {
+        i = 10;
+        fail;
+    }).finally {
+        assert failing();
+        assert i == 10;
+    }
+}
+
+#[test]
+fn test_retval() {
+    let i = do (fn&() -> int {
+        10
+    }).finally { };
+    assert i == 10;
+}
+
+#[test]
+fn test_compact() {
+    // XXX Should be able to use a fn item instead
+    // of a closure for do_some_fallible_work,
+    // but it's a type error.
+    let do_some_fallible_work: &fn() = || { };
+    fn but_always_run_this_function() { }
+    do_some_fallible_work.finally(
+        but_always_run_this_function);
+}
\ No newline at end of file
diff --git a/src/libcore/private/global.rs b/src/libcore/private/global.rs
new file mode 100644
index 00000000000..69319abc009
--- /dev/null
+++ b/src/libcore/private/global.rs
@@ -0,0 +1,296 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+/*!
+Global data
+
+An interface for creating and retrieving values with global
+(per-runtime) scope.
+
+Global values are stored in a map and protected by a single global
+mutex. Operations are provided for accessing and cloning the value
+under the mutex.
+
+Because all globals go through a single mutex, they should be used
+sparingly.  The interface is intended to be used with clonable,
+atomically reference counted synchronization types, like ARCs, in
+which case the value should be cached locally whenever possible to
+avoid hitting the mutex.
+*/
+
+use cast::{transmute, reinterpret_cast};
+use clone::Clone;
+use kinds::Owned;
+use libc::{c_void, uintptr_t};
+use option::{Option, Some, None};
+use ops::Drop;
+use pipes;
+use private::{Exclusive, exclusive};
+use private::{SharedMutableState, shared_mutable_state};
+use private::{get_shared_immutable_state};
+use private::at_exit::at_exit;
+use hashmap::linear::LinearMap;
+use sys::Closure;
+use task::spawn;
+use uint;
+
+pub type GlobalDataKey<T: Owned> = &fn(v: T);
+
+pub unsafe fn global_data_clone_create<T: Owned Clone>(
+    key: GlobalDataKey<T>, create: &fn() -> ~T) -> T {
+    /*!
+     * Clone a global value or, if it has not been created,
+     * first construct the value then return a clone.
+     *
+     * # Safety note
+     *
+     * Both the clone operation and the constructor are
+     * called while the global lock is held. Recursive
+     * use of the global interface in either of these
+     * operations will result in deadlock.
+     */
+    global_data_clone_create_(key_ptr(key), create)
+}
+
+unsafe fn global_data_clone_create_<T: Owned Clone>(
+    key: uint, create: &fn() -> ~T) -> T {
+
+    let mut clone_value: Option<T> = None;
+    do global_data_modify_(key) |value: Option<~T>| {
+        match value {
+            None => {
+                let value = create();
+                clone_value = Some(value.clone());
+                Some(value)
+            }
+            Some(value) => {
+                clone_value = Some(value.clone());
+                Some(value)
+            }
+        }
+    }
+    return clone_value.unwrap();
+}
+
+unsafe fn global_data_modify<T: Owned>(
+    key: GlobalDataKey<T>, op: &fn(Option<~T>) -> Option<~T>) {
+
+    global_data_modify_(key_ptr(key), op)
+}
+
+unsafe fn global_data_modify_<T: Owned>(
+    key: uint, op: &fn(Option<~T>) -> Option<~T>) {
+
+    let mut old_dtor = None;
+    do get_global_state().with |gs| {
+        let (maybe_new_value, maybe_dtor) = match gs.map.pop(&key) {
+            Some((ptr, dtor)) => {
+                let value: ~T = transmute(ptr);
+                (op(Some(value)), Some(dtor))
+            }
+            None => {
+                (op(None), None)
+            }
+        };
+        match maybe_new_value {
+            Some(value) => {
+                let data: *c_void = transmute(value);
+                let dtor: ~fn() = match maybe_dtor {
+                    Some(dtor) => dtor,
+                    None => {
+                        let dtor: ~fn() = || unsafe {
+                            let _destroy_value: ~T = transmute(data);
+                        };
+                        dtor
+                    }
+                };
+                let value = (data, dtor);
+                gs.map.insert(key, value);
+            }
+            None => {
+                match maybe_dtor {
+                    Some(dtor) => old_dtor = Some(dtor),
+                    None => ()
+                }
+            }
+        }
+    }
+}
+
+pub unsafe fn global_data_clone<T: Owned Clone>(
+    key: GlobalDataKey<T>) -> Option<T> {
+    let mut maybe_clone: Option<T> = None;
+    do global_data_modify(key) |current| {
+        match &current {
+            &Some(~ref value) => {
+                maybe_clone = Some(value.clone());
+            }
+            &None => ()
+        }
+        current
+    }
+    return maybe_clone;
+}
+
+// GlobalState is a map from keys to unique pointers and a
+// destructor. Keys are pointers derived from the type of the
+// global value.  There is a single GlobalState instance per runtime.
+struct GlobalState {
+    map: LinearMap<uint, (*c_void, ~fn())>
+}
+
+impl GlobalState: Drop {
+    fn finalize(&self) {
+        for self.map.each_value |v| {
+            match v {
+                &(_, ref dtor) => (*dtor)()
+            }
+        }
+    }
+}
+
+fn get_global_state() -> Exclusive<GlobalState> {
+
+    const POISON: int = -1;
+
+    // XXX: Doing atomic_cxchg to initialize the global state
+    // lazily, which wouldn't be necessary with a runtime written
+    // in Rust
+    let global_ptr = unsafe { rust_get_global_data_ptr() };
+
+    if unsafe { *global_ptr } == 0 {
+        // Global state doesn't exist yet, probably
+
+        // The global state object
+        let state = GlobalState {
+            map: LinearMap::new()
+        };
+
+        // It's under a reference-counted mutex
+        let state = ~exclusive(state);
+
+        // Convert it to an integer
+        let state_ptr: &Exclusive<GlobalState> = state;
+        let state_i: int = unsafe { transmute(state_ptr) };
+
+        // Swap our structure into the global pointer
+        let prev_i = unsafe { atomic_cxchg(&mut *global_ptr, 0, state_i) };
+
+        // Sanity check that we're not trying to reinitialize after shutdown
+        assert prev_i != POISON;
+
+        if prev_i == 0 {
+            // Successfully installed the global pointer
+
+            // Take a handle to return
+            let clone = state.clone();
+
+            // Install a runtime exit function to destroy the global object
+            do at_exit {
+                // Poison the global pointer
+                let prev_i = unsafe {
+                    atomic_cxchg(&mut *global_ptr, state_i, POISON)
+                };
+                assert prev_i == state_i;
+
+                // Capture the global state object in the at_exit closure
+                // so that it is destroyed at the right time
+                let _capture_global_state = &state;
+            };
+            return clone;
+        } else {
+            // Somebody else initialized the globals first
+            let state: &Exclusive<GlobalState> = unsafe { transmute(prev_i) };
+            return state.clone();
+        }
+    } else {
+        let state: &Exclusive<GlobalState> = unsafe {
+            transmute(*global_ptr)
+        };
+        return state.clone();
+    }
+}
+
+fn key_ptr<T: Owned>(key: GlobalDataKey<T>) -> uint {
+    unsafe {
+        let closure: Closure = reinterpret_cast(&key);
+        return transmute(closure.code);
+    }
+}
+
+extern {
+    fn rust_get_global_data_ptr() -> *mut int;
+}
+
+#[abi = "rust-intrinsic"]
+extern {
+    fn atomic_cxchg(dst: &mut int, old: int, src: int) -> int;
+}
+
+#[test]
+fn test_clone_rc() {
+    type MyType = SharedMutableState<int>;
+
+    fn key(_v: SharedMutableState<int>) { }
+
+    for uint::range(0, 100) |_| {
+        do spawn {
+            unsafe {
+                let val = do global_data_clone_create(key) {
+                    ~shared_mutable_state(10)
+                };
+
+                assert get_shared_immutable_state(&val) == &10;
+            }
+        }
+    }
+}
+
+#[test]
+fn test_modify() {
+    type MyType = SharedMutableState<int>;
+
+    fn key(_v: SharedMutableState<int>) { }
+
+    unsafe {
+        do global_data_modify(key) |v| {
+            match v {
+                None => {
+                    unsafe {
+                        Some(~shared_mutable_state(10))
+                    }
+                }
+                _ => fail
+            }
+        }
+
+        do global_data_modify(key) |v| {
+            match v {
+                Some(sms) => {
+                    let v = get_shared_immutable_state(sms);
+                    assert *v == 10;
+                    None
+                },
+                _ => fail
+            }
+        }
+
+        do global_data_modify(key) |v| {
+            match v {
+                None => {
+                    unsafe {
+                        Some(~shared_mutable_state(10))
+                    }
+                }
+                _ => fail
+            }
+        }
+    }
+}
diff --git a/src/libcore/private/weak_task.rs b/src/libcore/private/weak_task.rs
new file mode 100644
index 00000000000..25a03ff960f
--- /dev/null
+++ b/src/libcore/private/weak_task.rs
@@ -0,0 +1,207 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+/*!
+Weak tasks
+
+Weak tasks are a runtime feature for building global services that
+do not keep the runtime alive. Normally the runtime exits when all
+tasks exits, but if a task is weak then the runtime may exit while
+it is running, sending a notification to the task that the runtime
+is trying to shut down.
+*/
+
+use option::{Some, None, swap_unwrap};
+use private::at_exit::at_exit;
+use private::global::global_data_clone_create;
+use private::finally::Finally;
+use pipes::{Port, Chan, SharedChan, stream};
+use task::{Task, task, spawn};
+use task::rt::{task_id, get_task_id};
+use hashmap::linear::LinearMap;
+use ops::Drop;
+
+type ShutdownMsg = ();
+
+// XXX: This could be a PortOne but I've experienced bugginess
+// with oneshot pipes and try_send
+pub unsafe fn weaken_task(f: &fn(Port<ShutdownMsg>)) {
+    let service = global_data_clone_create(global_data_key,
+                                           create_global_service);
+    let (shutdown_port, shutdown_chan) = stream::<ShutdownMsg>();
+    let shutdown_port = ~mut Some(shutdown_port);
+    let task = get_task_id();
+    // Expect the weak task service to be alive
+    assert service.try_send(RegisterWeakTask(task, shutdown_chan));
+    unsafe { rust_inc_weak_task_count(); }
+    do fn&() {
+        let shutdown_port = swap_unwrap(&mut *shutdown_port);
+        f(shutdown_port)
+    }.finally || {
+        unsafe { rust_dec_weak_task_count(); }
+        // Service my have already exited
+        service.send(UnregisterWeakTask(task));
+    }
+}
+
+type WeakTaskService = SharedChan<ServiceMsg>;
+type TaskHandle = task_id;
+
+fn global_data_key(_v: WeakTaskService) { }
+
+enum ServiceMsg {
+    RegisterWeakTask(TaskHandle, Chan<ShutdownMsg>),
+    UnregisterWeakTask(TaskHandle),
+    Shutdown
+}
+
+fn create_global_service() -> ~WeakTaskService {
+
+    debug!("creating global weak task service");
+    let (port, chan) = stream::<ServiceMsg>();
+    let port = ~mut Some(port);
+    let chan = SharedChan(chan);
+    let chan_clone = chan.clone();
+
+    do task().unlinked().spawn {
+        debug!("running global weak task service");
+        let port = swap_unwrap(&mut *port);
+        let port = ~mut Some(port);
+        do fn&() {
+            let port = swap_unwrap(&mut *port);
+            // The weak task service is itself a weak task
+            debug!("weakening the weak service task");
+            unsafe { rust_inc_weak_task_count(); }
+            run_weak_task_service(port);
+        }.finally {
+            debug!("unweakening the weak service task");
+            unsafe { rust_dec_weak_task_count(); }
+        }
+    }
+
+    do at_exit {
+        debug!("shutting down weak task service");
+        chan.send(Shutdown);
+    }
+
+    return ~chan_clone;
+}
+
+fn run_weak_task_service(port: Port<ServiceMsg>) {
+
+    let mut shutdown_map = LinearMap::new();
+
+    loop {
+        match port.recv() {
+            RegisterWeakTask(task, shutdown_chan) => {
+                let previously_unregistered =
+                    shutdown_map.insert(task, shutdown_chan);
+                assert previously_unregistered;
+            }
+            UnregisterWeakTask(task) => {
+                match shutdown_map.pop(&task) {
+                    Some(shutdown_chan) => {
+                        // Oneshot pipes must send, even though
+                        // nobody will receive this
+                        shutdown_chan.send(());
+                    }
+                    None => fail
+                }
+            }
+            Shutdown => break
+        }
+    }
+
+    do shutdown_map.consume |_, shutdown_chan| {
+        // Weak task may have already exited
+        shutdown_chan.send(());
+    }
+}
+
+extern {
+    unsafe fn rust_inc_weak_task_count();
+    unsafe fn rust_dec_weak_task_count();
+}
+
+#[test]
+fn test_simple() {
+    let (port, chan) = stream();
+    do spawn {
+        unsafe {
+            do weaken_task |_signal| {
+            }
+        }
+        chan.send(());
+    }
+    port.recv();
+}
+
+#[test]
+fn test_weak_weak() {
+    let (port, chan) = stream();
+    do spawn {
+        unsafe {
+            do weaken_task |_signal| {
+            }
+            do weaken_task |_signal| {
+            }
+        }
+        chan.send(());
+    }
+    port.recv();
+}
+
+#[test]
+fn test_wait_for_signal() {
+    do spawn {
+        unsafe {
+            do weaken_task |signal| {
+                signal.recv();
+            }
+        }
+    }
+}
+
+#[test]
+fn test_wait_for_signal_many() {
+    use uint;
+    for uint::range(0, 100) |_| {
+        do spawn {
+            unsafe {
+                do weaken_task |signal| {
+                    signal.recv();
+                }
+            }
+        }
+    }
+}
+
+#[test]
+fn test_select_stream_and_oneshot() {
+    use pipes::select2i;
+    use either::{Left, Right};
+
+    let (port, chan) = stream();
+    let (waitport, waitchan) = stream();
+    do spawn {
+        unsafe {
+            do weaken_task |signal| {
+                match select2i(&port, &signal) {
+                    Left(*) => (),
+                    Right(*) => fail
+                }
+            }
+        }
+        waitchan.send(());
+    }
+    chan.send(());
+    waitport.recv();
+}
+
diff --git a/src/libcore/run.rs b/src/libcore/run.rs
index 8960d40b85a..eeae7f5b291 100644
--- a/src/libcore/run.rs
+++ b/src/libcore/run.rs
@@ -17,7 +17,7 @@ use io;
 use io::ReaderUtil;
 use libc;
 use libc::{pid_t, c_void, c_int};
-use oldcomm;
+use pipes::{stream, SharedChan};
 use option::{Some, None};
 use os;
 use prelude::*;
@@ -336,22 +336,23 @@ pub fn program_output(prog: &str, args: &[~str]) ->
         // in parallel so we don't deadlock while blocking on one
         // or the other. FIXME (#2625): Surely there's a much more
         // clever way to do this.
-        let p = oldcomm::Port();
-        let ch = oldcomm::Chan(&p);
+        let (p, ch) = stream();
+        let ch = SharedChan(ch);
+        let ch_clone = ch.clone();
         do task::spawn_sched(task::SingleThreaded) {
             let errput = readclose(pipe_err.in);
-            oldcomm::send(ch, (2, move errput));
+            ch.send((2, move errput));
         };
         do task::spawn_sched(task::SingleThreaded) {
             let output = readclose(pipe_out.in);
-            oldcomm::send(ch, (1, move output));
+            ch_clone.send((1, move output));
         };
         let status = run::waitpid(pid);
         let mut errs = ~"";
         let mut outs = ~"";
         let mut count = 2;
         while count > 0 {
-            let stream = oldcomm::recv(p);
+            let stream = p.recv();
             match stream {
                 (1, copy s) => {
                     outs = move s;
diff --git a/src/libcore/task/mod.rs b/src/libcore/task/mod.rs
index a4d99bf5db4..aa82309c78a 100644
--- a/src/libcore/task/mod.rs
+++ b/src/libcore/task/mod.rs
@@ -43,16 +43,15 @@ use cmp;
 use cmp::Eq;
 use iter;
 use libc;
-use oldcomm;
 use option;
 use result::Result;
-use pipes::{stream, Chan, Port};
+use pipes::{stream, Chan, Port, SharedChan};
 use pipes;
 use prelude::*;
 use ptr;
 use result;
 use task::local_data_priv::{local_get, local_set};
-use task::rt::{task_id, rust_task};
+use task::rt::{task_id, sched_id, rust_task};
 use task;
 use util;
 use util::replace;
@@ -62,6 +61,12 @@ pub mod local_data;
 pub mod rt;
 pub mod spawn;
 
+/// A handle to a scheduler
+#[deriving_eq]
+pub enum Scheduler {
+    SchedulerHandle(sched_id)
+}
+
 /// A handle to a task
 #[deriving_eq]
 pub enum Task {
@@ -95,7 +100,21 @@ impl TaskResult : Eq {
 }
 
 /// Scheduler modes
+#[deriving_eq]
 pub enum SchedMode {
+    /// Run task on the default scheduler
+    DefaultScheduler,
+    /// Run task on the current scheduler
+    CurrentScheduler,
+    /// Run task on a specific scheduler
+    ExistingScheduler(Scheduler),
+    /**
+     * Tasks are scheduled on the main OS thread
+     *
+     * The main OS thread is the thread used to launch the runtime which,
+     * in most cases, is the process's initial thread as created by the OS.
+     */
+    PlatformThread,
     /// All tasks run in the same OS thread
     SingleThreaded,
     /// Tasks are distributed among available CPUs
@@ -104,53 +123,6 @@ pub enum SchedMode {
     ThreadPerTask,
     /// Tasks are distributed among a fixed number of OS threads
     ManualThreads(uint),
-    /**
-     * Tasks are scheduled on the main OS thread
-     *
-     * The main OS thread is the thread used to launch the runtime which,
-     * in most cases, is the process's initial thread as created by the OS.
-     */
-    PlatformThread
-}
-
-impl SchedMode : cmp::Eq {
-    pure fn eq(&self, other: &SchedMode) -> bool {
-        match (*self) {
-            SingleThreaded => {
-                match (*other) {
-                    SingleThreaded => true,
-                    _ => false
-                }
-            }
-            ThreadPerCore => {
-                match (*other) {
-                    ThreadPerCore => true,
-                    _ => false
-                }
-            }
-            ThreadPerTask => {
-                match (*other) {
-                    ThreadPerTask => true,
-                    _ => false
-                }
-            }
-            ManualThreads(e0a) => {
-                match (*other) {
-                    ManualThreads(e0b) => e0a == e0b,
-                    _ => false
-                }
-            }
-            PlatformThread => {
-                match (*other) {
-                    PlatformThread => true,
-                    _ => false
-                }
-            }
-        }
-    }
-    pure fn ne(&self, other: &SchedMode) -> bool {
-        !(*self).eq(other)
-    }
 }
 
 /**
@@ -204,7 +176,7 @@ pub struct TaskOpts {
     linked: bool,
     supervised: bool,
     mut notify_chan: Option<Chan<TaskResult>>,
-    sched: Option<SchedOpts>,
+    sched: SchedOpts
 }
 
 /**
@@ -369,11 +341,8 @@ impl TaskBuilder {
             opts: TaskOpts {
                 linked: self.opts.linked,
                 supervised: self.opts.supervised,
-                notify_chan: notify_chan,
-                sched: Some(SchedOpts {
-                    mode: mode,
-                    foreign_stack_size: None,
-                })
+                notify_chan: move notify_chan,
+                sched: SchedOpts { mode: mode, foreign_stack_size: None}
             },
             can_not_copy: None,
             .. self.consume()
@@ -457,18 +426,17 @@ impl TaskBuilder {
      * Fails if a future_result was already set for this task.
      */
     fn try<T: Owned>(f: fn~() -> T) -> Result<T,()> {
-        let po = oldcomm::Port();
-        let ch = oldcomm::Chan(&po);
+        let (po, ch) = stream::<T>();
         let mut result = None;
 
         let fr_task_builder = self.future_result(|+r| {
             result = Some(move r);
         });
-        do fr_task_builder.spawn |move f| {
-            oldcomm::send(ch, f());
+        do fr_task_builder.spawn |move f, move ch| {
+            ch.send(f());
         }
         match option::unwrap(move result).recv() {
-            Success => result::Ok(oldcomm::recv(po)),
+            Success => result::Ok(po.recv()),
             Failure => result::Err(())
         }
     }
@@ -489,7 +457,10 @@ pub fn default_task_opts() -> TaskOpts {
         linked: true,
         supervised: false,
         notify_chan: None,
-        sched: None
+        sched: SchedOpts {
+            mode: DefaultScheduler,
+            foreign_stack_size: None
+        }
     }
 }
 
@@ -542,10 +513,9 @@ pub fn spawn_with<A:Owned>(arg: A, f: fn~(v: A)) {
 
 pub fn spawn_sched(mode: SchedMode, f: fn~()) {
     /*!
-     * Creates a new scheduler and executes a task on it
-     *
-     * Tasks subsequently spawned by that task will also execute on
-     * the new scheduler. When there are no more tasks to execute the
+     * Creates a new task on a new or existing scheduler
+
+     * When there are no more tasks to execute the
      * scheduler terminates.
      *
      * # Failure
@@ -599,6 +569,10 @@ pub fn get_task() -> Task {
     }
 }
 
+pub fn get_scheduler() -> Scheduler {
+    SchedulerHandle(unsafe { rt::rust_get_sched_id() })
+}
+
 /**
  * Temporarily make the task unkillable
  *
@@ -711,17 +685,18 @@ fn test_cant_dup_task_builder() {
 
 #[test] #[ignore(cfg(windows))]
 fn test_spawn_unlinked_unsup_no_fail_down() { // grandchild sends on a port
-    let po = oldcomm::Port();
-    let ch = oldcomm::Chan(&po);
+    let (po, ch) = stream();
+    let ch = SharedChan(ch);
     do spawn_unlinked {
+        let ch = ch.clone();
         do spawn_unlinked {
             // Give middle task a chance to fail-but-not-kill-us.
             for iter::repeat(16) { task::yield(); }
-            oldcomm::send(ch, ()); // If killed first, grandparent hangs.
+            ch.send(()); // If killed first, grandparent hangs.
         }
         fail; // Shouldn't kill either (grand)parent or (grand)child.
     }
-    oldcomm::recv(po);
+    po.recv();
 }
 #[test] #[ignore(cfg(windows))]
 fn test_spawn_unlinked_unsup_no_fail_up() { // child unlinked fails
@@ -741,8 +716,7 @@ fn test_spawn_unlinked_sup_fail_down() {
 
 #[test] #[should_fail] #[ignore(cfg(windows))]
 fn test_spawn_linked_sup_fail_up() { // child fails; parent fails
-    let po = oldcomm::Port::<()>();
-    let _ch = oldcomm::Chan(&po);
+    let (po, _ch) = stream::<()>();
     // Unidirectional "parenting" shouldn't override bidirectional linked.
     // We have to cheat with opts - the interface doesn't support them because
     // they don't make sense (redundant with task().supervised()).
@@ -760,7 +734,7 @@ fn test_spawn_linked_sup_fail_up() { // child fails; parent fails
         .. b0
     };
     do b1.spawn { fail; }
-    oldcomm::recv(po); // We should get punted awake
+    po.recv(); // We should get punted awake
 }
 #[test] #[should_fail] #[ignore(cfg(windows))]
 fn test_spawn_linked_sup_fail_down() { // parent fails; child fails
@@ -784,11 +758,10 @@ fn test_spawn_linked_sup_fail_down() { // parent fails; child fails
 }
 #[test] #[should_fail] #[ignore(cfg(windows))]
 fn test_spawn_linked_unsup_fail_up() { // child fails; parent fails
-    let po = oldcomm::Port::<()>();
-    let _ch = oldcomm::Chan(&po);
+    let (po, _ch) = stream::<()>();
     // Default options are to spawn linked & unsupervised.
     do spawn { fail; }
-    oldcomm::recv(po); // We should get punted awake
+    po.recv(); // We should get punted awake
 }
 #[test] #[should_fail] #[ignore(cfg(windows))]
 fn test_spawn_linked_unsup_fail_down() { // parent fails; child fails
@@ -856,27 +829,25 @@ fn test_spawn_linked_sup_propagate_sibling() {
 
 #[test]
 fn test_run_basic() {
-    let po = oldcomm::Port();
-    let ch = oldcomm::Chan(&po);
+    let (po, ch) = stream::<()>();
     do task().spawn {
-        oldcomm::send(ch, ());
+        ch.send(());
     }
-    oldcomm::recv(po);
+    po.recv();
 }
 
 #[test]
 fn test_add_wrapper() {
-    let po = oldcomm::Port();
-    let ch = oldcomm::Chan(&po);
+    let (po, ch) = stream::<()>();
     let b0 = task();
     let b1 = do b0.add_wrapper |body| {
         fn~(move body) {
             body();
-            oldcomm::send(ch, ());
+            ch.send(());
         }
     };
     do b1.spawn { }
-    oldcomm::recv(po);
+    po.recv();
 }
 
 #[test]
@@ -929,52 +900,46 @@ fn test_spawn_sched_no_threads() {
 
 #[test]
 fn test_spawn_sched() {
-    let po = oldcomm::Port();
-    let ch = oldcomm::Chan(&po);
+    let (po, ch) = stream::<()>();
+    let ch = SharedChan(ch);
 
-    fn f(i: int, ch: oldcomm::Chan<()>) {
-        unsafe {
-            let parent_sched_id = rt::rust_get_sched_id();
+    fn f(i: int, ch: SharedChan<()>) {
+        let parent_sched_id = unsafe { rt::rust_get_sched_id() };
 
-            do spawn_sched(SingleThreaded) {
-                unsafe {
-                    let child_sched_id = rt::rust_get_sched_id();
-                    assert parent_sched_id != child_sched_id;
-
-                    if (i == 0) {
-                        oldcomm::send(ch, ());
-                    } else {
-                        f(i - 1, ch);
-                    }
-                }
-            };
-        }
+        do spawn_sched(SingleThreaded) {
+            let child_sched_id = unsafe { rt::rust_get_sched_id() };
+            assert parent_sched_id != child_sched_id;
+
+            if (i == 0) {
+                ch.send(());
+            } else {
+                f(i - 1, ch.clone());
+            }
+        };
 
     }
     f(10, ch);
-    oldcomm::recv(po);
+    po.recv();
 }
 
 #[test]
-fn test_spawn_sched_childs_on_same_sched() {
-    let po = oldcomm::Port();
-    let ch = oldcomm::Chan(&po);
+fn test_spawn_sched_childs_on_default_sched() {
+    let (po, ch) = stream();
+
+    // Assuming tests run on the default scheduler
+    let default_id = unsafe { rt::rust_get_sched_id() };
 
     do spawn_sched(SingleThreaded) {
-        unsafe {
-            let parent_sched_id = rt::rust_get_sched_id();
-            do spawn {
-                unsafe {
-                    let child_sched_id = rt::rust_get_sched_id();
-                    // This should be on the same scheduler
-                    assert parent_sched_id == child_sched_id;
-                    oldcomm::send(ch, ());
-                }
-            };
-        }
+        let parent_sched_id = unsafe { rt::rust_get_sched_id() };
+        do spawn {
+            let child_sched_id = unsafe { rt::rust_get_sched_id() };
+            assert parent_sched_id != child_sched_id;
+            assert child_sched_id == default_id;
+            ch.send(());
+        };
     };
 
-    oldcomm::recv(po);
+    po.recv();
 }
 
 #[nolink]
@@ -996,10 +961,8 @@ fn test_spawn_sched_blocking() {
         // without affecting other schedulers
         for iter::repeat(20u) {
 
-            let start_po = oldcomm::Port();
-            let start_ch = oldcomm::Chan(&start_po);
-            let fin_po = oldcomm::Port();
-            let fin_ch = oldcomm::Chan(&fin_po);
+            let (start_po, start_ch) = stream();
+            let (fin_po, fin_ch) = stream();
 
             let lock = testrt::rust_dbg_lock_create();
 
@@ -1007,44 +970,42 @@ fn test_spawn_sched_blocking() {
                 unsafe {
                     testrt::rust_dbg_lock_lock(lock);
 
-                    oldcomm::send(start_ch, ());
+                    start_ch.send(());
 
                     // Block the scheduler thread
                     testrt::rust_dbg_lock_wait(lock);
                     testrt::rust_dbg_lock_unlock(lock);
 
-                    oldcomm::send(fin_ch, ());
+                    fin_ch.send(());
                 }
             };
 
             // Wait until the other task has its lock
-            oldcomm::recv(start_po);
+            start_po.recv();
 
-            fn pingpong(po: oldcomm::Port<int>, ch: oldcomm::Chan<int>) {
+            fn pingpong(po: &Port<int>, ch: &Chan<int>) {
                 let mut val = 20;
                 while val > 0 {
-                    val = oldcomm::recv(po);
-                    oldcomm::send(ch, val - 1);
+                    val = po.recv();
+                    ch.send(val - 1);
                 }
             }
 
-            let setup_po = oldcomm::Port();
-            let setup_ch = oldcomm::Chan(&setup_po);
-            let parent_po = oldcomm::Port();
-            let parent_ch = oldcomm::Chan(&parent_po);
+            let (setup_po, setup_ch) = stream();
+            let (parent_po, parent_ch) = stream();
             do spawn {
-                let child_po = oldcomm::Port();
-                oldcomm::send(setup_ch, oldcomm::Chan(&child_po));
-                pingpong(child_po, parent_ch);
+                let (child_po, child_ch) = stream();
+                setup_ch.send(child_ch);
+                pingpong(&child_po, &parent_ch);
             };
 
-            let child_ch = oldcomm::recv(setup_po);
-            oldcomm::send(child_ch, 20);
-            pingpong(parent_po, child_ch);
+            let child_ch = setup_po.recv();
+            child_ch.send(20);
+            pingpong(&parent_po, &child_ch);
             testrt::rust_dbg_lock_lock(lock);
             testrt::rust_dbg_lock_signal(lock);
             testrt::rust_dbg_lock_unlock(lock);
-            oldcomm::recv(fin_po);
+            fin_po.recv();
             testrt::rust_dbg_lock_destroy(lock);
         }
     }
@@ -1052,18 +1013,17 @@ fn test_spawn_sched_blocking() {
 
 #[cfg(test)]
 fn avoid_copying_the_body(spawnfn: fn(v: fn~())) {
-    let p = oldcomm::Port::<uint>();
-    let ch = oldcomm::Chan(&p);
+    let (p, ch) = stream::<uint>();
 
     let x = ~1;
     let x_in_parent = ptr::addr_of(&(*x)) as uint;
 
     do spawnfn |move x| {
         let x_in_child = ptr::addr_of(&(*x)) as uint;
-        oldcomm::send(ch, x_in_child);
+        ch.send(x_in_child);
     }
 
-    let x_in_child = oldcomm::recv(p);
+    let x_in_child = p.recv();
     assert x_in_parent == x_in_child;
 }
 
@@ -1101,20 +1061,18 @@ fn test_avoid_copying_the_body_unlinked() {
 
 #[test]
 fn test_platform_thread() {
-    let po = oldcomm::Port();
-    let ch = oldcomm::Chan(&po);
+    let (po, ch) = stream();
     do task().sched_mode(PlatformThread).spawn {
-        oldcomm::send(ch, ());
+        ch.send(());
     }
-    oldcomm::recv(po);
+    po.recv();
 }
 
 #[test]
 #[ignore(cfg(windows))]
 #[should_fail]
 fn test_unkillable() {
-    let po = oldcomm::Port();
-    let ch = po.chan();
+    let (po, ch) = stream();
 
     // We want to do this after failing
     do spawn_unlinked {
@@ -1242,7 +1200,7 @@ fn test_spawn_thread_on_demand() {
 
             let (port2, chan2) = pipes::stream();
 
-            do spawn() |move chan2| {
+            do spawn_sched(CurrentScheduler) |move chan2| {
                 chan2.send(());
             }
 
diff --git a/src/libcore/task/spawn.rs b/src/libcore/task/spawn.rs
index edeacb31e1d..a5ab4af40be 100644
--- a/src/libcore/task/spawn.rs
+++ b/src/libcore/task/spawn.rs
@@ -74,9 +74,8 @@
 #[warn(deprecated_mode)];
 
 use cast;
-use oldcomm;
 use option;
-use pipes::{Chan, Port};
+use pipes::{stream, Chan, Port};
 use pipes;
 use prelude::*;
 use private;
@@ -88,6 +87,7 @@ use task::rt::rust_closure;
 use task::rt;
 use task::{Failure, ManualThreads, PlatformThread, SchedOpts, SingleThreaded};
 use task::{Success, TaskOpts, TaskResult, ThreadPerCore, ThreadPerTask};
+use task::{ExistingScheduler, SchedulerHandle};
 use task::{default_task_opts, unkillable};
 use uint;
 use util;
@@ -536,9 +536,9 @@ pub fn spawn_raw(opts: TaskOpts, f: fn~()) {
             // Agh. Get move-mode items into the closure. FIXME (#2829)
             let (child_tg, ancestors, f) = option::swap_unwrap(child_data);
             // Create child task.
-            let new_task = match opts.sched {
-              None             => rt::new_task(),
-              Some(sched_opts) => new_task_in_new_sched(sched_opts)
+            let new_task = match opts.sched.mode {
+                DefaultScheduler => rt::new_task(),
+                _ => new_task_in_sched(opts.sched)
             };
             assert !new_task.is_null();
             // Getting killed after here would leak the task.
@@ -642,31 +642,35 @@ pub fn spawn_raw(opts: TaskOpts, f: fn~()) {
         }
     }
 
-    fn new_task_in_new_sched(opts: SchedOpts) -> *rust_task {
-        unsafe {
-            if opts.foreign_stack_size != None {
-                fail ~"foreign_stack_size scheduler option unimplemented";
-            }
+    fn new_task_in_sched(opts: SchedOpts) -> *rust_task {
+        if opts.foreign_stack_size != None {
+            fail ~"foreign_stack_size scheduler option unimplemented";
+        }
 
-            let num_threads = match opts.mode {
-              SingleThreaded => 1u,
-              ThreadPerCore => rt::rust_num_threads(),
-              ThreadPerTask => {
-                fail ~"ThreadPerTask scheduling mode unimplemented"
-              }
-              ManualThreads(threads) => {
-                if threads == 0u {
-                    fail ~"can not create a scheduler with no threads";
-                }
-                threads
-              }
-              PlatformThread => 0u /* Won't be used */
-            };
+        let num_threads = match opts.mode {
+          DefaultScheduler
+          | CurrentScheduler
+          | ExistingScheduler(*)
+          | PlatformThread => 0u, /* Won't be used */
+          SingleThreaded => 1u,
+          ThreadPerCore => unsafe { rt::rust_num_threads() },
+          ThreadPerTask => {
+            fail ~"ThreadPerTask scheduling mode unimplemented"
+          }
+          ManualThreads(threads) => {
+            if threads == 0u {
+                fail ~"can not create a scheduler with no threads";
+            }
+            threads
+          }
+        };
 
-            let sched_id = if opts.mode != PlatformThread {
-                rt::rust_new_sched(num_threads)
-            } else {
-                rt::rust_osmain_sched_id()
+        unsafe {
+            let sched_id = match opts.mode {
+                CurrentScheduler => rt::rust_get_sched_id(),
+                ExistingScheduler(SchedulerHandle(id)) => id,
+                PlatformThread => rt::rust_osmain_sched_id(),
+                _ => rt::rust_new_sched(num_threads)
             };
             rt::rust_new_task_in_sched(sched_id)
         }
@@ -675,12 +679,11 @@ pub fn spawn_raw(opts: TaskOpts, f: fn~()) {
 
 #[test]
 fn test_spawn_raw_simple() {
-    let po = oldcomm::Port();
-    let ch = oldcomm::Chan(&po);
+    let (po, ch) = stream();
     do spawn_raw(default_task_opts()) {
-        oldcomm::send(ch, ());
+        ch.send(());
     }
-    oldcomm::recv(po);
+    po.recv();
 }
 
 #[test]
diff --git a/src/libstd/flatpipes.rs b/src/libstd/flatpipes.rs
index ea7b2442bb9..afc3e72e636 100644
--- a/src/libstd/flatpipes.rs
+++ b/src/libstd/flatpipes.rs
@@ -792,7 +792,6 @@ mod test {
         let (finish_port, finish_chan) = pipes::stream();
 
         let addr = ip::v4::parse_addr("127.0.0.1");
-        let iotask = uv::global_loop::get();
 
         let begin_connect_chan = Cell(move begin_connect_chan);
         let accept_chan = Cell(move accept_chan);
@@ -800,6 +799,7 @@ mod test {
         // The server task
         do task::spawn |copy addr, move begin_connect_chan,
                         move accept_chan| {
+            let iotask = &uv::global_loop::get();
             let begin_connect_chan = begin_connect_chan.take();
             let accept_chan = accept_chan.take();
             let listen_res = do tcp::listen(
@@ -831,6 +831,7 @@ mod test {
             begin_connect_port.recv();
 
             debug!("connecting");
+            let iotask = &uv::global_loop::get();
             let connect_result = tcp::connect(copy addr, port, iotask);
             assert connect_result.is_ok();
             let sock = result::unwrap(move connect_result);
diff --git a/src/libstd/net_ip.rs b/src/libstd/net_ip.rs
index 84c3b755649..72e58cbd5d3 100644
--- a/src/libstd/net_ip.rs
+++ b/src/libstd/net_ip.rs
@@ -114,7 +114,7 @@ enum IpGetAddrErr {
  * a vector of `ip_addr` results, in the case of success, or an error
  * object in the case of failure
  */
-pub fn get_addr(node: &str, iotask: iotask)
+pub fn get_addr(node: &str, iotask: &iotask)
         -> result::Result<~[IpAddr], IpGetAddrErr> {
     do oldcomm::listen |output_ch| {
         do str::as_buf(node) |node_ptr, len| {
@@ -419,7 +419,7 @@ mod test {
     #[ignore(reason = "valgrind says it's leaky")]
     fn test_ip_get_addr() {
         let localhost_name = ~"localhost";
-        let iotask = uv::global_loop::get();
+        let iotask = &uv::global_loop::get();
         let ga_result = get_addr(localhost_name, iotask);
         if result::is_err(&ga_result) {
             fail ~"got err result from net::ip::get_addr();"
@@ -445,7 +445,7 @@ mod test {
     #[ignore(reason = "valgrind says it's leaky")]
     fn test_ip_get_addr_bad_input() {
         let localhost_name = ~"sjkl234m,./sdf";
-        let iotask = uv::global_loop::get();
+        let iotask = &uv::global_loop::get();
         let ga_result = get_addr(localhost_name, iotask);
         assert result::is_err(&ga_result);
     }
diff --git a/src/libstd/net_tcp.rs b/src/libstd/net_tcp.rs
index aa5eec2b43c..8d6de369479 100644
--- a/src/libstd/net_tcp.rs
+++ b/src/libstd/net_tcp.rs
@@ -143,7 +143,7 @@ pub enum TcpConnectErrData {
  * `net::tcp::tcp_connect_err_data` instance will be returned
  */
 pub fn connect(input_ip: ip::IpAddr, port: uint,
-           iotask: IoTask)
+               iotask: &IoTask)
     -> result::Result<TcpSocket, TcpConnectErrData> {
     unsafe {
         let result_po = oldcomm::Port::<ConnAttempt>();
@@ -166,106 +166,116 @@ pub fn connect(input_ip: ip::IpAddr, port: uint,
                 ip::Ipv4(_) => { false }
                 ip::Ipv6(_) => { true }
             },
-            iotask: iotask
+            iotask: iotask.clone()
         };
         let socket_data_ptr = ptr::addr_of(&(*socket_data));
         log(debug, fmt!("tcp_connect result_ch %?", conn_data.result_ch));
         // get an unsafe representation of our stream_handle_ptr that
         // we can send into the interact cb to be handled in libuv..
         log(debug, fmt!("stream_handle_ptr outside interact %?",
-            stream_handle_ptr));
+                        stream_handle_ptr));
         do iotask::interact(iotask) |move input_ip, loop_ptr| {
             unsafe {
                 log(debug, ~"in interact cb for tcp client connect..");
                 log(debug, fmt!("stream_handle_ptr in interact %?",
-                    stream_handle_ptr));
+                                stream_handle_ptr));
                 match uv::ll::tcp_init( loop_ptr, stream_handle_ptr) {
-                  0i32 => {
-                    log(debug, ~"tcp_init successful");
-                    log(debug, ~"dealing w/ ipv4 connection..");
-                    let connect_req_ptr =
-                        ptr::addr_of(&((*socket_data_ptr).connect_req));
-                    let addr_str = ip::format_addr(&input_ip);
-                    let connect_result = match input_ip {
-                      ip::Ipv4(ref addr) => {
-                        // have to "recreate" the sockaddr_in/6
-                        // since the ip_addr discards the port
-                        // info.. should probably add an additional
-                        // rust type that actually is closer to
-                        // what the libuv API expects (ip str + port num)
-                        log(debug, fmt!("addr: %?", addr));
-                        let in_addr = uv::ll::ip4_addr(addr_str, port as int);
-                        uv::ll::tcp_connect(
-                            connect_req_ptr,
-                            stream_handle_ptr,
-                            ptr::addr_of(&in_addr),
-                            tcp_connect_on_connect_cb)
-                      }
-                      ip::Ipv6(ref addr) => {
-                        log(debug, fmt!("addr: %?", addr));
-                        let in_addr = uv::ll::ip6_addr(addr_str, port as int);
-                        uv::ll::tcp_connect6(
-                            connect_req_ptr,
-                            stream_handle_ptr,
-                            ptr::addr_of(&in_addr),
-                            tcp_connect_on_connect_cb)
-                      }
-                    };
-                    match connect_result {
-                      0i32 => {
-                        log(debug, ~"tcp_connect successful");
-                        // reusable data that we'll have for the
-                        // duration..
-                        uv::ll::set_data_for_uv_handle(stream_handle_ptr,
-                                                   socket_data_ptr as
-                                                      *libc::c_void);
-                        // just so the connect_cb can send the
-                        // outcome..
-                        uv::ll::set_data_for_req(connect_req_ptr,
-                                                 conn_data_ptr);
-                        log(debug, ~"leaving tcp_connect interact cb...");
-                        // let tcp_connect_on_connect_cb send on
-                        // the result_ch, now..
-                      }
-                      _ => {
-                        // immediate connect failure.. probably a garbage
-                        // ip or somesuch
+                    0i32 => {
+                        log(debug, ~"tcp_init successful");
+                        log(debug, ~"dealing w/ ipv4 connection..");
+                        let connect_req_ptr =
+                            ptr::addr_of(&((*socket_data_ptr).connect_req));
+                        let addr_str = ip::format_addr(&input_ip);
+                        let connect_result = match input_ip {
+                            ip::Ipv4(ref addr) => {
+                                // have to "recreate" the
+                                // sockaddr_in/6 since the ip_addr
+                                // discards the port info.. should
+                                // probably add an additional rust
+                                // type that actually is closer to
+                                // what the libuv API expects (ip str
+                                // + port num)
+                                log(debug, fmt!("addr: %?", addr));
+                                let in_addr = uv::ll::ip4_addr(addr_str,
+                                                               port as int);
+                                uv::ll::tcp_connect(
+                                    connect_req_ptr,
+                                    stream_handle_ptr,
+                                    ptr::addr_of(&in_addr),
+                                    tcp_connect_on_connect_cb)
+                            }
+                            ip::Ipv6(ref addr) => {
+                                log(debug, fmt!("addr: %?", addr));
+                                let in_addr = uv::ll::ip6_addr(addr_str,
+                                                               port as int);
+                                uv::ll::tcp_connect6(
+                                    connect_req_ptr,
+                                    stream_handle_ptr,
+                                    ptr::addr_of(&in_addr),
+                                    tcp_connect_on_connect_cb)
+                            }
+                        };
+                        match connect_result {
+                            0i32 => {
+                                log(debug, ~"tcp_connect successful");
+                                // reusable data that we'll have for the
+                                // duration..
+                                uv::ll::set_data_for_uv_handle(
+                                    stream_handle_ptr,
+                                    socket_data_ptr as
+                                    *libc::c_void);
+                                // just so the connect_cb can send the
+                                // outcome..
+                                uv::ll::set_data_for_req(connect_req_ptr,
+                                                         conn_data_ptr);
+                                log(debug,
+                                    ~"leaving tcp_connect interact cb...");
+                                // let tcp_connect_on_connect_cb send on
+                                // the result_ch, now..
+                            }
+                            _ => {
+                                // immediate connect
+                                // failure.. probably a garbage ip or
+                                // somesuch
+                                let err_data =
+                                    uv::ll::get_last_err_data(loop_ptr);
+                                oldcomm::send((*conn_data_ptr).result_ch,
+                                              ConnFailure(err_data));
+                                uv::ll::set_data_for_uv_handle(
+                                    stream_handle_ptr,
+                                    conn_data_ptr);
+                                uv::ll::close(stream_handle_ptr,
+                                              stream_error_close_cb);
+                            }
+                        }
+                    }
+                    _ => {
+                        // failure to create a tcp handle
                         let err_data = uv::ll::get_last_err_data(loop_ptr);
                         oldcomm::send((*conn_data_ptr).result_ch,
-                                   ConnFailure(err_data));
-                        uv::ll::set_data_for_uv_handle(stream_handle_ptr,
-                                                       conn_data_ptr);
-                        uv::ll::close(stream_handle_ptr,
-                                      stream_error_close_cb);
-                      }
+                                      ConnFailure(err_data));
                     }
-                  }
-                  _ => {
-                    // failure to create a tcp handle
-                    let err_data = uv::ll::get_last_err_data(loop_ptr);
-                    oldcomm::send((*conn_data_ptr).result_ch,
-                               ConnFailure(err_data));
-                  }
                 }
             }
-        };
+        }
         match oldcomm::recv(result_po) {
-          ConnSuccess => {
-            log(debug, ~"tcp::connect - received success on result_po");
-            result::Ok(TcpSocket(socket_data))
-          }
-          ConnFailure(ref err_data) => {
-            oldcomm::recv(closed_signal_po);
-            log(debug, ~"tcp::connect - received failure on result_po");
-            // still have to free the malloc'd stream handle..
-            rustrt::rust_uv_current_kernel_free(stream_handle_ptr
-                                               as *libc::c_void);
-            let tcp_conn_err = match err_data.err_name {
-              ~"ECONNREFUSED" => ConnectionRefused,
-              _ => GenericConnectErr(err_data.err_name, err_data.err_msg)
-            };
-            result::Err(tcp_conn_err)
-          }
+            ConnSuccess => {
+                log(debug, ~"tcp::connect - received success on result_po");
+                result::Ok(TcpSocket(socket_data))
+            }
+            ConnFailure(ref err_data) => {
+                oldcomm::recv(closed_signal_po);
+                log(debug, ~"tcp::connect - received failure on result_po");
+                // still have to free the malloc'd stream handle..
+                rustrt::rust_uv_current_kernel_free(stream_handle_ptr
+                                                    as *libc::c_void);
+                let tcp_conn_err = match err_data.err_name {
+                    ~"ECONNREFUSED" => ConnectionRefused,
+                    _ => GenericConnectErr(err_data.err_name,
+                                           err_data.err_msg)
+                };
+                result::Err(tcp_conn_err)
+            }
         }
     }
 }
@@ -506,71 +516,79 @@ fn read_future(sock: &TcpSocket, timeout_msecs: uint)
 pub fn accept(new_conn: TcpNewConnection)
     -> result::Result<TcpSocket, TcpErrData> {
     unsafe {
-        match new_conn {
-          NewTcpConn(server_handle_ptr) => {
-            let server_data_ptr = uv::ll::get_data_for_uv_handle(
-                server_handle_ptr) as *TcpListenFcData;
-            let reader_po = oldcomm::Port();
-            let iotask = (*server_data_ptr).iotask;
-            let stream_handle_ptr = malloc_uv_tcp_t();
-            *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
-            let client_socket_data = @TcpSocketData {
-                reader_po: reader_po,
-                reader_ch: oldcomm::Chan(&reader_po),
-                stream_handle_ptr : stream_handle_ptr,
-                connect_req : uv::ll::connect_t(),
-                write_req : uv::ll::write_t(),
-                ipv6: (*server_data_ptr).ipv6,
-                iotask : iotask
-            };
-            let client_socket_data_ptr = ptr::addr_of(&(*client_socket_data));
-            let client_stream_handle_ptr =
-                (*client_socket_data_ptr).stream_handle_ptr;
-
-            let result_po = oldcomm::Port::<Option<TcpErrData>>();
-            let result_ch = oldcomm::Chan(&result_po);
-
-            // UNSAFE LIBUV INTERACTION BEGIN
-            // .. normally this happens within the context of
-            // a call to uv::hl::interact.. but we're breaking
-            // the rules here because this always has to be
-            // called within the context of a listen() new_connect_cb
-            // callback (or it will likely fail and drown your cat)
-            log(debug, ~"in interact cb for tcp::accept");
-            let loop_ptr = uv::ll::get_loop_for_uv_handle(
-                server_handle_ptr);
-            match uv::ll::tcp_init(loop_ptr, client_stream_handle_ptr) {
-              0i32 => {
-                log(debug, ~"uv_tcp_init successful for client stream");
-                match uv::ll::accept(
-                    server_handle_ptr as *libc::c_void,
-                    client_stream_handle_ptr as *libc::c_void) {
-                  0i32 => {
-                    log(debug, ~"successfully accepted client connection");
-                    uv::ll::set_data_for_uv_handle(client_stream_handle_ptr,
-                                                   client_socket_data_ptr
-                                                       as *libc::c_void);
-                    oldcomm::send(result_ch, None);
-                  }
-                  _ => {
-                    log(debug, ~"failed to accept client conn");
-                    oldcomm::send(result_ch, Some(
-                        uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
-                  }
+        match new_conn{
+            NewTcpConn(server_handle_ptr) => {
+                let server_data_ptr = uv::ll::get_data_for_uv_handle(
+                    server_handle_ptr) as *TcpListenFcData;
+                let reader_po = oldcomm::Port();
+                let iotask = &(*server_data_ptr).iotask;
+                let stream_handle_ptr = malloc_uv_tcp_t();
+                *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) =
+                    uv::ll::tcp_t();
+                let client_socket_data: @TcpSocketData = @TcpSocketData {
+                    reader_po: reader_po,
+                    reader_ch: oldcomm::Chan(&reader_po),
+                    stream_handle_ptr : stream_handle_ptr,
+                    connect_req : uv::ll::connect_t(),
+                    write_req : uv::ll::write_t(),
+                    ipv6: (*server_data_ptr).ipv6,
+                    iotask : iotask.clone()
+                };
+                let client_socket_data_ptr = ptr::addr_of(
+                    &(*client_socket_data));
+                let client_stream_handle_ptr =
+                    (*client_socket_data_ptr).stream_handle_ptr;
+
+                let result_po = oldcomm::Port::<Option<TcpErrData>>();
+                let result_ch = oldcomm::Chan(&result_po);
+
+                // UNSAFE LIBUV INTERACTION BEGIN
+                // .. normally this happens within the context of
+                // a call to uv::hl::interact.. but we're breaking
+                // the rules here because this always has to be
+                // called within the context of a listen() new_connect_cb
+                // callback (or it will likely fail and drown your cat)
+                log(debug, ~"in interact cb for tcp::accept");
+                let loop_ptr = uv::ll::get_loop_for_uv_handle(
+                    server_handle_ptr);
+                match uv::ll::tcp_init(loop_ptr, client_stream_handle_ptr) {
+                    0i32 => {
+                        log(debug, ~"uv_tcp_init successful for \
+                                     client stream");
+                        match uv::ll::accept(
+                            server_handle_ptr as *libc::c_void,
+                            client_stream_handle_ptr as *libc::c_void) {
+                            0i32 => {
+                                log(debug,
+                                    ~"successfully accepted client \
+                                      connection");
+                                uv::ll::set_data_for_uv_handle(
+                                    client_stream_handle_ptr,
+                                    client_socket_data_ptr
+                                    as *libc::c_void);
+                                oldcomm::send(result_ch, None);
+                            }
+                            _ => {
+                                log(debug, ~"failed to accept client conn");
+                                oldcomm::send(result_ch, Some(
+                                    uv::ll::get_last_err_data(
+                                        loop_ptr).to_tcp_err()));
+                            }
+                        }
+                    }
+                    _ => {
+                        log(debug, ~"failed to accept client stream");
+                        oldcomm::send(result_ch, Some(
+                            uv::ll::get_last_err_data(
+                                loop_ptr).to_tcp_err()));
+                    }
+                }
+                // UNSAFE LIBUV INTERACTION END
+                match oldcomm::recv(result_po) {
+                    Some(copy err_data) => result::Err(err_data),
+                    None => result::Ok(TcpSocket(client_socket_data))
                 }
-              }
-              _ => {
-                log(debug, ~"failed to init client stream");
-                oldcomm::send(result_ch, Some(
-                    uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
-              }
-            }
-            // UNSAFE LIBUV INTERACTION END
-            match oldcomm::recv(result_po) {
-              Some(copy err_data) => result::Err(err_data),
-              None => result::Ok(TcpSocket(client_socket_data))
             }
-          }
         }
     }
 }
@@ -604,30 +622,27 @@ pub fn accept(new_conn: TcpNewConnection)
  * of listen exiting because of an error
  */
 pub fn listen(host_ip: ip::IpAddr, port: uint, backlog: uint,
-          iotask: IoTask,
-          on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>),
-          new_connect_cb: fn~(TcpNewConnection,
-                               oldcomm::Chan<Option<TcpErrData>>))
+              iotask: &IoTask,
+              on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>),
+              new_connect_cb: fn~(TcpNewConnection,
+                                  oldcomm::Chan<Option<TcpErrData>>))
     -> result::Result<(), TcpListenErrData> {
-    unsafe {
-        do listen_common(move host_ip, port, backlog, iotask,
-                         move on_establish_cb)
-            // on_connect_cb
-            |move new_connect_cb, handle| {
-                unsafe {
-                    let server_data_ptr =
-                        uv::ll::get_data_for_uv_handle(handle)
-                        as *TcpListenFcData;
-                    let new_conn = NewTcpConn(handle);
-                    let kill_ch = (*server_data_ptr).kill_ch;
-                    new_connect_cb(new_conn, kill_ch);
-                }
-            }
+    do listen_common(move host_ip, port, backlog, iotask,
+                     move on_establish_cb)
+        // on_connect_cb
+        |move new_connect_cb, handle| {
+        unsafe {
+            let server_data_ptr = uv::ll::get_data_for_uv_handle(handle)
+                as *TcpListenFcData;
+            let new_conn = NewTcpConn(handle);
+            let kill_ch = (*server_data_ptr).kill_ch;
+            new_connect_cb(new_conn, kill_ch);
+        }
     }
 }
 
 fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint,
-          iotask: IoTask,
+          iotask: &IoTask,
           on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>),
           on_connect_cb: fn~(*uv::ll::uv_tcp_t))
     -> result::Result<(), TcpListenErrData> {
@@ -637,12 +652,12 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint,
         let kill_ch = oldcomm::Chan(&kill_po);
         let server_stream = uv::ll::tcp_t();
         let server_stream_ptr = ptr::addr_of(&server_stream);
-        let server_data = {
+        let server_data: TcpListenFcData = TcpListenFcData {
             server_stream_ptr: server_stream_ptr,
             stream_closed_ch: oldcomm::Chan(&stream_closed_po),
             kill_ch: kill_ch,
             on_connect_cb: move on_connect_cb,
-            iotask: iotask,
+            iotask: iotask.clone(),
             ipv6: match &host_ip {
                 &ip::Ipv4(_) => { false }
                 &ip::Ipv6(_) => { true }
@@ -662,114 +677,123 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint,
             do iotask::interact(iotask) |move loc_ip, loop_ptr| {
                 unsafe {
                     match uv::ll::tcp_init(loop_ptr, server_stream_ptr) {
-                      0i32 => {
-                        uv::ll::set_data_for_uv_handle(
-                            server_stream_ptr,
-                            server_data_ptr);
-                        let addr_str = ip::format_addr(&loc_ip);
-                        let bind_result = match loc_ip {
-                          ip::Ipv4(ref addr) => {
-                            log(debug, fmt!("addr: %?", addr));
-                            let in_addr = uv::ll::ip4_addr(addr_str,
-                                                           port as int);
-                            uv::ll::tcp_bind(server_stream_ptr,
-                                             ptr::addr_of(&in_addr))
-                          }
-                          ip::Ipv6(ref addr) => {
-                            log(debug, fmt!("addr: %?", addr));
-                            let in_addr = uv::ll::ip6_addr(addr_str,
-                                                           port as int);
-                            uv::ll::tcp_bind6(server_stream_ptr,
-                                             ptr::addr_of(&in_addr))
-                          }
-                        };
-                        match bind_result {
-                          0i32 => {
-                            match uv::ll::listen(server_stream_ptr,
-                                               backlog as libc::c_int,
-                                               tcp_lfc_on_connection_cb) {
-                              0i32 => oldcomm::send(setup_ch, None),
-                              _ => {
-                                log(debug, ~"failure to uv_listen()");
-                                let err_data = uv::ll::get_last_err_data(
-                                    loop_ptr);
-                                oldcomm::send(setup_ch, Some(err_data));
-                              }
+                        0i32 => {
+                            uv::ll::set_data_for_uv_handle(
+                                server_stream_ptr,
+                                server_data_ptr);
+                            let addr_str = ip::format_addr(&loc_ip);
+                            let bind_result = match loc_ip {
+                                ip::Ipv4(ref addr) => {
+                                    log(debug, fmt!("addr: %?", addr));
+                                    let in_addr = uv::ll::ip4_addr(
+                                        addr_str,
+                                        port as int);
+                                    uv::ll::tcp_bind(server_stream_ptr,
+                                                     ptr::addr_of(&in_addr))
+                                }
+                                ip::Ipv6(ref addr) => {
+                                    log(debug, fmt!("addr: %?", addr));
+                                    let in_addr = uv::ll::ip6_addr(
+                                        addr_str,
+                                        port as int);
+                                    uv::ll::tcp_bind6(server_stream_ptr,
+                                                      ptr::addr_of(&in_addr))
+                                }
+                            };
+                            match bind_result {
+                                0i32 => {
+                                    match uv::ll::listen(
+                                        server_stream_ptr,
+                                        backlog as libc::c_int,
+                                        tcp_lfc_on_connection_cb) {
+                                        0i32 => oldcomm::send(setup_ch, None),
+                                        _ => {
+                                            log(debug,
+                                                ~"failure to uv_tcp_init");
+                                            let err_data =
+                                                uv::ll::get_last_err_data(
+                                                    loop_ptr);
+                                            oldcomm::send(setup_ch,
+                                                          Some(err_data));
+                                        }
+                                    }
+                                }
+                                _ => {
+                                    log(debug, ~"failure to uv_tcp_bind");
+                                    let err_data = uv::ll::get_last_err_data(
+                                        loop_ptr);
+                                    oldcomm::send(setup_ch, Some(err_data));
+                                }
                             }
-                          }
-                          _ => {
+                        }
+                        _ => {
                             log(debug, ~"failure to uv_tcp_bind");
                             let err_data = uv::ll::get_last_err_data(
                                 loop_ptr);
                             oldcomm::send(setup_ch, Some(err_data));
-                          }
                         }
-                      }
-                      _ => {
-                        log(debug, ~"failure to uv_tcp_init");
-                        let err_data = uv::ll::get_last_err_data(loop_ptr);
-                        oldcomm::send(setup_ch, Some(err_data));
-                      }
                     }
-                };
+                }
             }
             setup_ch.recv()
         };
         match setup_result {
-          Some(ref err_data) => {
-            do iotask::interact(iotask) |loop_ptr| {
-                unsafe {
-                    log(debug,
-                        fmt!("tcp::listen post-kill recv hl interact %?",
-                             loop_ptr));
-                    (*server_data_ptr).active = false;
-                    uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
+            Some(ref err_data) => {
+                do iotask::interact(iotask) |loop_ptr| {
+                    unsafe {
+                        log(debug,
+                            fmt!("tcp::listen post-kill recv hl interact %?",
+                                 loop_ptr));
+                        (*server_data_ptr).active = false;
+                        uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
+                    }
+                };
+                stream_closed_po.recv();
+                match err_data.err_name {
+                    ~"EACCES" => {
+                        log(debug, ~"Got EACCES error");
+                        result::Err(AccessDenied)
+                    }
+                    ~"EADDRINUSE" => {
+                        log(debug, ~"Got EADDRINUSE error");
+                        result::Err(AddressInUse)
+                    }
+                    _ => {
+                        log(debug, fmt!("Got '%s' '%s' libuv error",
+                                        err_data.err_name, err_data.err_msg));
+                        result::Err(
+                            GenericListenErr(err_data.err_name,
+                                             err_data.err_msg))
+                    }
                 }
-            };
-            stream_closed_po.recv();
-            match err_data.err_name {
-              ~"EACCES" => {
-                log(debug, ~"Got EACCES error");
-                result::Err(AccessDenied)
-              }
-              ~"EADDRINUSE" => {
-                log(debug, ~"Got EADDRINUSE error");
-                result::Err(AddressInUse)
-              }
-              _ => {
-                log(debug, fmt!("Got '%s' '%s' libuv error",
-                                err_data.err_name, err_data.err_msg));
-                result::Err(
-                    GenericListenErr(err_data.err_name, err_data.err_msg))
-              }
             }
-          }
-          None => {
-            on_establish_cb(kill_ch);
-            let kill_result = oldcomm::recv(kill_po);
-            do iotask::interact(iotask) |loop_ptr| {
-                unsafe {
-                    log(debug,
-                        fmt!("tcp::listen post-kill recv hl interact %?",
-                             loop_ptr));
-                    (*server_data_ptr).active = false;
-                    uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
+            None => {
+                on_establish_cb(kill_ch);
+                let kill_result = oldcomm::recv(kill_po);
+                do iotask::interact(iotask) |loop_ptr| {
+                    unsafe {
+                        log(debug,
+                            fmt!("tcp::listen post-kill recv hl interact %?",
+                                 loop_ptr));
+                        (*server_data_ptr).active = false;
+                        uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
+                    }
+                };
+                stream_closed_po.recv();
+                match kill_result {
+                    // some failure post bind/listen
+                    Some(ref err_data) => result::Err(GenericListenErr(
+                        err_data.err_name,
+                        err_data.err_msg)),
+                    // clean exit
+                    None => result::Ok(())
                 }
-            };
-            stream_closed_po.recv();
-            match kill_result {
-              // some failure post bind/listen
-              Some(ref err_data) => result::Err(GenericListenErr(
-                  err_data.err_name,
-                  err_data.err_msg)),
-              // clean exit
-              None => result::Ok(())
             }
-          }
         }
     }
 }
 
+
 /**
  * Convert a `net::tcp::tcp_socket` to a `net::tcp::tcp_socket_buf`.
  *
@@ -936,11 +960,11 @@ fn tear_down_socket_data(socket_data: @TcpSocketData) {
         };
         let close_data_ptr = ptr::addr_of(&close_data);
         let stream_handle_ptr = (*socket_data).stream_handle_ptr;
-        do iotask::interact((*socket_data).iotask) |loop_ptr| {
+        do iotask::interact(&(*socket_data).iotask) |loop_ptr| {
             unsafe {
                 log(debug,
                     fmt!("interact dtor for tcp_socket stream %? loop %?",
-                    stream_handle_ptr, loop_ptr));
+                         stream_handle_ptr, loop_ptr));
                 uv::ll::set_data_for_uv_handle(stream_handle_ptr,
                                                close_data_ptr);
                 uv::ll::close(stream_handle_ptr, tcp_socket_dtor_close_cb);
@@ -950,7 +974,7 @@ fn tear_down_socket_data(socket_data: @TcpSocketData) {
         //the line below will most likely crash
         //log(debug, fmt!("about to free socket_data at %?", socket_data));
         rustrt::rust_uv_current_kernel_free(stream_handle_ptr
-                                           as *libc::c_void);
+                                            as *libc::c_void);
         log(debug, ~"exiting dtor for tcp_socket");
     }
 }
@@ -962,7 +986,7 @@ fn read_common_impl(socket_data: *TcpSocketData, timeout_msecs: uint)
         use timer;
 
         log(debug, ~"starting tcp::read");
-        let iotask = (*socket_data).iotask;
+        let iotask = &(*socket_data).iotask;
         let rs_result = read_start_common_impl(socket_data);
         if result::is_err(&rs_result) {
             let err_data = result::get_err(&rs_result);
@@ -972,26 +996,26 @@ fn read_common_impl(socket_data: *TcpSocketData, timeout_msecs: uint)
             log(debug, ~"tcp::read before recv_timeout");
             let read_result = if timeout_msecs > 0u {
                 timer::recv_timeout(
-                   iotask, timeout_msecs, result::get(&rs_result))
+                    iotask, timeout_msecs, result::get(&rs_result))
             } else {
                 Some(oldcomm::recv(result::get(&rs_result)))
             };
             log(debug, ~"tcp::read after recv_timeout");
             match move read_result {
-              None => {
-                log(debug, ~"tcp::read: timed out..");
-                let err_data = TcpErrData {
-                    err_name: ~"TIMEOUT",
-                    err_msg: ~"req timed out"
-                };
-                read_stop_common_impl(socket_data);
-                result::Err(err_data)
-              }
-              Some(move data_result) => {
-                log(debug, ~"tcp::read got data");
-                read_stop_common_impl(socket_data);
-                data_result
-              }
+                None => {
+                    log(debug, ~"tcp::read: timed out..");
+                    let err_data = TcpErrData {
+                        err_name: ~"TIMEOUT",
+                        err_msg: ~"req timed out"
+                    };
+                    read_stop_common_impl(socket_data);
+                    result::Err(err_data)
+                }
+                Some(move data_result) => {
+                    log(debug, ~"tcp::read got data");
+                    read_stop_common_impl(socket_data);
+                    data_result
+                }
             }
         }
     }
@@ -1004,27 +1028,26 @@ fn read_stop_common_impl(socket_data: *TcpSocketData) ->
         let stream_handle_ptr = (*socket_data).stream_handle_ptr;
         let stop_po = oldcomm::Port::<Option<TcpErrData>>();
         let stop_ch = oldcomm::Chan(&stop_po);
-        do iotask::interact((*socket_data).iotask) |loop_ptr| {
+        do iotask::interact(&(*socket_data).iotask) |loop_ptr| {
             unsafe {
                 log(debug, ~"in interact cb for tcp::read_stop");
-                match uv::ll::read_stop(stream_handle_ptr as
-                                        *uv::ll::uv_stream_t) {
-                  0i32 => {
-                    log(debug, ~"successfully called uv_read_stop");
-                    oldcomm::send(stop_ch, None);
-                  }
-                  _ => {
-                    log(debug, ~"failure in calling uv_read_stop");
-                    let err_data = uv::ll::get_last_err_data(loop_ptr);
-                    oldcomm::send(stop_ch, Some(err_data.to_tcp_err()));
-                  }
+                match uv::ll::read_stop(stream_handle_ptr
+                                        as *uv::ll::uv_stream_t) {
+                    0i32 => {
+                        log(debug, ~"successfully called uv_read_stop");
+                        oldcomm::send(stop_ch, None);
+                    }
+                    _ => {
+                        log(debug, ~"failure in calling uv_read_stop");
+                        let err_data = uv::ll::get_last_err_data(loop_ptr);
+                        oldcomm::send(stop_ch, Some(err_data.to_tcp_err()));
+                    }
                 }
             }
-        };
-
+        }
         match oldcomm::recv(stop_po) {
-          Some(move err_data) => Err(err_data),
-          None => Ok(())
+            Some(move err_data) => Err(err_data),
+            None => Ok(())
         }
     }
 }
@@ -1038,29 +1061,29 @@ fn read_start_common_impl(socket_data: *TcpSocketData)
         let start_po = oldcomm::Port::<Option<uv::ll::uv_err_data>>();
         let start_ch = oldcomm::Chan(&start_po);
         log(debug, ~"in tcp::read_start before interact loop");
-        do iotask::interact((*socket_data).iotask) |loop_ptr| {
+        do iotask::interact(&(*socket_data).iotask) |loop_ptr| {
             unsafe {
-                log(debug,
-                    fmt!("in tcp::read_start interact cb %?", loop_ptr));
-                match uv::ll::read_start(stream_handle_ptr as
-                                         *uv::ll::uv_stream_t,
+                log(debug, fmt!("in tcp::read_start interact cb %?",
+                                loop_ptr));
+                match uv::ll::read_start(stream_handle_ptr
+                                         as *uv::ll::uv_stream_t,
                                          on_alloc_cb,
                                          on_tcp_read_cb) {
-                  0i32 => {
-                    log(debug, ~"success doing uv_read_start");
-                    oldcomm::send(start_ch, None);
-                  }
-                  _ => {
-                    log(debug, ~"error attempting uv_read_start");
-                    let err_data = uv::ll::get_last_err_data(loop_ptr);
-                    oldcomm::send(start_ch, Some(err_data));
-                  }
+                    0i32 => {
+                        log(debug, ~"success doing uv_read_start");
+                        oldcomm::send(start_ch, None);
+                    }
+                    _ => {
+                        log(debug, ~"error attempting uv_read_start");
+                        let err_data = uv::ll::get_last_err_data(loop_ptr);
+                        oldcomm::send(start_ch, Some(err_data));
+                    }
                 }
             }
-        };
+        }
         match oldcomm::recv(start_po) {
-          Some(ref err_data) => result::Err(err_data.to_tcp_err()),
-          None => result::Ok((*socket_data).reader_po)
+            Some(ref err_data) => result::Err(err_data.to_tcp_err()),
+            None => result::Ok((*socket_data).reader_po)
         }
     }
 }
@@ -1084,27 +1107,28 @@ fn write_common_impl(socket_data_ptr: *TcpSocketData,
             result_ch: oldcomm::Chan(&result_po)
         };
         let write_data_ptr = ptr::addr_of(&write_data);
-        do iotask::interact((*socket_data_ptr).iotask) |loop_ptr| {
+        do iotask::interact(&(*socket_data_ptr).iotask) |loop_ptr| {
             unsafe {
                 log(debug, fmt!("in interact cb for tcp::write %?",
                                 loop_ptr));
                 match uv::ll::write(write_req_ptr,
-                                  stream_handle_ptr,
-                                  write_buf_vec_ptr,
-                                  tcp_write_complete_cb) {
-                  0i32 => {
-                    log(debug, ~"uv_write() invoked successfully");
-                    uv::ll::set_data_for_req(write_req_ptr, write_data_ptr);
-                  }
-                  _ => {
-                    log(debug, ~"error invoking uv_write()");
-                    let err_data = uv::ll::get_last_err_data(loop_ptr);
-                    oldcomm::send((*write_data_ptr).result_ch,
-                               TcpWriteError(err_data.to_tcp_err()));
-                  }
+                                    stream_handle_ptr,
+                                    write_buf_vec_ptr,
+                                    tcp_write_complete_cb) {
+                    0i32 => {
+                        log(debug, ~"uv_write() invoked successfully");
+                        uv::ll::set_data_for_req(write_req_ptr,
+                                                 write_data_ptr);
+                    }
+                    _ => {
+                        log(debug, ~"error invoking uv_write()");
+                        let err_data = uv::ll::get_last_err_data(loop_ptr);
+                        oldcomm::send((*write_data_ptr).result_ch,
+                                      TcpWriteError(err_data.to_tcp_err()));
+                    }
                 }
             }
-        };
+        }
         // FIXME (#2656): Instead of passing unsafe pointers to local data,
         // and waiting here for the write to complete, we should transfer
         // ownership of everything to the I/O task and let it deal with the
@@ -1473,7 +1497,7 @@ pub mod test {
         }
     }
     pub fn impl_gl_tcp_ipv4_server_and_client() {
-        let hl_loop = uv::global_loop::get();
+        let hl_loop = &uv::global_loop::get();
         let server_ip = ~"127.0.0.1";
         let server_port = 8888u;
         let expected_req = ~"ping";
@@ -1485,6 +1509,7 @@ pub mod test {
         let cont_po = oldcomm::Port::<()>();
         let cont_ch = oldcomm::Chan(&cont_po);
         // server
+        let hl_loop_clone = hl_loop.clone();
         do task::spawn_sched(task::ManualThreads(1u)) {
             let actual_req = do oldcomm::listen |server_ch| {
                 run_tcp_test_server(
@@ -1493,7 +1518,7 @@ pub mod test {
                     expected_resp,
                     server_ch,
                     cont_ch,
-                    hl_loop)
+                    &hl_loop_clone)
             };
             server_result_ch.send(actual_req);
         };
@@ -1519,7 +1544,7 @@ pub mod test {
         assert str::contains(actual_resp, expected_resp);
     }
     pub fn impl_gl_tcp_ipv4_get_peer_addr() {
-        let hl_loop = uv::global_loop::get();
+        let hl_loop = &uv::global_loop::get();
         let server_ip = ~"127.0.0.1";
         let server_port = 8887u;
         let expected_resp = ~"pong";
@@ -1530,6 +1555,7 @@ pub mod test {
         let cont_po = oldcomm::Port::<()>();
         let cont_ch = oldcomm::Chan(&cont_po);
         // server
+        let hl_loop_clone = hl_loop.clone();
         do task::spawn_sched(task::ManualThreads(1u)) {
             let actual_req = do oldcomm::listen |server_ch| {
                 run_tcp_test_server(
@@ -1538,7 +1564,7 @@ pub mod test {
                     expected_resp,
                     server_ch,
                     cont_ch,
-                    hl_loop)
+                    &hl_loop_clone)
             };
             server_result_ch.send(actual_req);
         };
@@ -1549,10 +1575,11 @@ pub mod test {
             let server_ip_addr = ip::v4::parse_addr(server_ip);
             let iotask = uv::global_loop::get();
             let connect_result = connect(move server_ip_addr, server_port,
-                                         iotask);
+                                         &iotask);
 
             let sock = result::unwrap(move connect_result);
 
+            debug!("testing peer address");
             // This is what we are actually testing!
             assert net::ip::format_addr(&sock.get_peer_addr()) ==
                 ~"127.0.0.1";
@@ -1561,12 +1588,14 @@ pub mod test {
             // Fulfill the protocol the test server expects
             let resp_bytes = str::to_bytes(~"ping");
             tcp_write_single(&sock, resp_bytes);
+            debug!("message sent");
             let read_result = sock.read(0u);
             client_ch.send(str::from_bytes(read_result.get()));
+            debug!("result read");
         };
     }
     pub fn impl_gl_tcp_ipv4_client_error_connection_refused() {
-        let hl_loop = uv::global_loop::get();
+        let hl_loop = &uv::global_loop::get();
         let server_ip = ~"127.0.0.1";
         let server_port = 8889u;
         let expected_req = ~"ping";
@@ -1586,7 +1615,7 @@ pub mod test {
         }
     }
     pub fn impl_gl_tcp_ipv4_server_address_in_use() {
-        let hl_loop = uv::global_loop::get();
+        let hl_loop = &uv::global_loop::get();
         let server_ip = ~"127.0.0.1";
         let server_port = 8890u;
         let expected_req = ~"ping";
@@ -1598,6 +1627,7 @@ pub mod test {
         let cont_po = oldcomm::Port::<()>();
         let cont_ch = oldcomm::Chan(&cont_po);
         // server
+        let hl_loop_clone = hl_loop.clone();
         do task::spawn_sched(task::ManualThreads(1u)) {
             let actual_req = do oldcomm::listen |server_ch| {
                 run_tcp_test_server(
@@ -1606,7 +1636,7 @@ pub mod test {
                     expected_resp,
                     server_ch,
                     cont_ch,
-                    hl_loop)
+                    &hl_loop_clone)
             };
             server_result_ch.send(actual_req);
         };
@@ -1637,7 +1667,7 @@ pub mod test {
         }
     }
     pub fn impl_gl_tcp_ipv4_server_access_denied() {
-        let hl_loop = uv::global_loop::get();
+        let hl_loop = &uv::global_loop::get();
         let server_ip = ~"127.0.0.1";
         let server_port = 80u;
         // this one should fail..
@@ -1657,7 +1687,7 @@ pub mod test {
     }
     pub fn impl_gl_tcp_ipv4_server_client_reader_writer() {
 
-        let iotask = uv::global_loop::get();
+        let iotask = &uv::global_loop::get();
         let server_ip = ~"127.0.0.1";
         let server_port = 8891u;
         let expected_req = ~"ping";
@@ -1669,6 +1699,7 @@ pub mod test {
         let cont_po = oldcomm::Port::<()>();
         let cont_ch = oldcomm::Chan(&cont_po);
         // server
+        let iotask_clone = iotask.clone();
         do task::spawn_sched(task::ManualThreads(1u)) {
             let actual_req = do oldcomm::listen |server_ch| {
                 run_tcp_test_server(
@@ -1677,7 +1708,7 @@ pub mod test {
                     expected_resp,
                     server_ch,
                     cont_ch,
-                    iotask)
+                    &iotask_clone)
             };
             server_result_ch.send(actual_req);
         };
@@ -1708,7 +1739,7 @@ pub mod test {
     pub fn impl_tcp_socket_impl_reader_handles_eof() {
         use core::io::{Reader,ReaderUtil};
 
-        let hl_loop = uv::global_loop::get();
+        let hl_loop = &uv::global_loop::get();
         let server_ip = ~"127.0.0.1";
         let server_port = 10041u;
         let expected_req = ~"GET /";
@@ -1720,6 +1751,7 @@ pub mod test {
         let cont_po = oldcomm::Port::<()>();
         let cont_ch = oldcomm::Chan(&cont_po);
         // server
+        let hl_loop_clone = hl_loop.clone();
         do task::spawn_sched(task::ManualThreads(1u)) {
             let actual_req = do oldcomm::listen |server_ch| {
                 run_tcp_test_server(
@@ -1728,7 +1760,7 @@ pub mod test {
                     expected_resp,
                     server_ch,
                     cont_ch,
-                    hl_loop)
+                    &hl_loop_clone)
             };
             server_result_ch.send(actual_req);
         };
@@ -1768,7 +1800,7 @@ pub mod test {
     fn run_tcp_test_server(server_ip: &str, server_port: uint, resp: ~str,
                           server_ch: oldcomm::Chan<~str>,
                           cont_ch: oldcomm::Chan<()>,
-                          iotask: IoTask) -> ~str {
+                          iotask: &IoTask) -> ~str {
         let server_ip_addr = ip::v4::parse_addr(server_ip);
         let listen_result = listen(move server_ip_addr, server_port, 128,
                                    iotask,
@@ -1855,7 +1887,7 @@ pub mod test {
     }
 
     fn run_tcp_test_server_fail(server_ip: &str, server_port: uint,
-                          iotask: IoTask) -> TcpListenErrData {
+                                iotask: &IoTask) -> TcpListenErrData {
         let server_ip_addr = ip::v4::parse_addr(server_ip);
         let listen_result = listen(move server_ip_addr, server_port, 128,
                                    iotask,
@@ -1879,7 +1911,7 @@ pub mod test {
 
     fn run_tcp_test_client(server_ip: &str, server_port: uint, resp: &str,
                           client_ch: oldcomm::Chan<~str>,
-                          iotask: IoTask) -> result::Result<~str,
+                          iotask: &IoTask) -> result::Result<~str,
                                                     TcpConnectErrData> {
         let server_ip_addr = ip::v4::parse_addr(server_ip);
 
diff --git a/src/libstd/timer.rs b/src/libstd/timer.rs
index f8147c532e6..a3e39bc7bbc 100644
--- a/src/libstd/timer.rs
+++ b/src/libstd/timer.rs
@@ -39,7 +39,7 @@ use core;
  * * ch - a channel of type T to send a `val` on
  * * val - a value of type T to send over the provided `ch`
  */
-pub fn delayed_send<T: Owned>(iotask: IoTask,
+pub fn delayed_send<T: Owned>(iotask: &IoTask,
                               msecs: uint,
                               ch: oldcomm::Chan<T>,
                               val: T) {
@@ -92,7 +92,7 @@ pub fn delayed_send<T: Owned>(iotask: IoTask,
  * * `iotask` - a `uv::iotask` that the tcp request will run on
  * * msecs - an amount of time, in milliseconds, for the current task to block
  */
-pub fn sleep(iotask: IoTask, msecs: uint) {
+pub fn sleep(iotask: &IoTask, msecs: uint) {
     let exit_po = oldcomm::Port::<()>();
     let exit_ch = oldcomm::Chan(&exit_po);
     delayed_send(iotask, msecs, exit_ch, ());
@@ -119,7 +119,7 @@ pub fn sleep(iotask: IoTask, msecs: uint) {
  * on the provided port in the allotted timeout period, then the result will
  * be a `some(T)`. If not, then `none` will be returned.
  */
-pub fn recv_timeout<T: Copy Owned>(iotask: IoTask,
+pub fn recv_timeout<T: Copy Owned>(iotask: &IoTask,
                                    msecs: uint,
                                    wait_po: oldcomm::Port<T>)
                                 -> Option<T> {
@@ -183,13 +183,13 @@ mod test {
 
     #[test]
     fn test_gl_timer_simple_sleep_test() {
-        let hl_loop = uv::global_loop::get();
+        let hl_loop = &uv::global_loop::get();
         sleep(hl_loop, 1u);
     }
 
     #[test]
     fn test_gl_timer_sleep_stress1() {
-        let hl_loop = uv::global_loop::get();
+        let hl_loop = &uv::global_loop::get();
         for iter::repeat(50u) {
             sleep(hl_loop, 1u);
         }
@@ -199,7 +199,7 @@ mod test {
     fn test_gl_timer_sleep_stress2() {
         let po = oldcomm::Port();
         let ch = oldcomm::Chan(&po);
-        let hl_loop = uv::global_loop::get();
+        let hl_loop = &uv::global_loop::get();
 
         let repeat = 20u;
         let spec = {
@@ -214,11 +214,12 @@ mod test {
 
             for spec.each |spec| {
                 let (times, maxms) = *spec;
+                let hl_loop_clone = hl_loop.clone();
                 do task::spawn {
                     use rand::*;
                     let rng = Rng();
                     for iter::repeat(times) {
-                        sleep(hl_loop, rng.next() as uint % maxms);
+                        sleep(&hl_loop_clone, rng.next() as uint % maxms);
                     }
                     oldcomm::send(ch, ());
                 }
@@ -277,12 +278,12 @@ mod test {
             let expected = rand::Rng().gen_str(16u);
             let test_po = oldcomm::Port::<~str>();
             let test_ch = oldcomm::Chan(&test_po);
-
+            let hl_loop_clone = hl_loop.clone();
             do task::spawn() {
-                delayed_send(hl_loop, 50u, test_ch, expected);
+                delayed_send(&hl_loop_clone, 50u, test_ch, expected);
             };
 
-            match recv_timeout(hl_loop, 1u, test_po) {
+            match recv_timeout(&hl_loop, 1u, test_po) {
               None => successes += 1,
               _ => failures += 1
             };
diff --git a/src/libstd/uv_global_loop.rs b/src/libstd/uv_global_loop.rs
index 3a2c3b7c135..8ae3e24abee 100644
--- a/src/libstd/uv_global_loop.rs
+++ b/src/libstd/uv_global_loop.rs
@@ -19,16 +19,16 @@ use uv_iotask::{IoTask, spawn_iotask};
 
 use core::either::{Left, Right};
 use core::libc;
-use core::oldcomm::{Port, Chan, select2, listen};
-use core::private::{chan_from_global_ptr, weaken_task};
+use core::pipes::{Port, Chan, SharedChan, select2i};
+use core::private::global::{global_data_clone_create,
+                            global_data_clone};
+use core::private::weak_task::weaken_task;
 use core::str;
-use core::task::TaskBuilder;
+use core::task::{task, SingleThreaded, spawn};
 use core::task;
 use core::vec;
-
-extern mod rustrt {
-    unsafe fn rust_uv_get_kernel_global_chan_ptr() -> *libc::uintptr_t;
-}
+use core::clone::Clone;
+use core::option::{Some, None};
 
 /**
  * Race-free helper to get access to a global task where a libuv
@@ -48,69 +48,64 @@ pub fn get() -> IoTask {
 
 #[doc(hidden)]
 fn get_monitor_task_gl() -> IoTask {
-    unsafe {
-        let monitor_loop_chan_ptr =
-            rustrt::rust_uv_get_kernel_global_chan_ptr();
-
-        debug!("ENTERING global_loop::get() loop chan: %?",
-               monitor_loop_chan_ptr);
-
-        debug!("before priv::chan_from_global_ptr");
-        type MonChan = Chan<IoTask>;
-
-        let monitor_ch =
-            do chan_from_global_ptr::<MonChan>(monitor_loop_chan_ptr,
-                                               || {
-                                                    task::task().sched_mode
-                                                    (task::SingleThreaded)
-                                                    .unlinked()
-                                               }) |msg_po| {
-            unsafe {
-                debug!("global monitor task starting");
-
-                // As a weak task the runtime will notify us when to exit
-                do weaken_task() |weak_exit_po| {
-                    debug!("global monitor task is now weak");
-                    let hl_loop = spawn_loop();
-                    loop {
-                        debug!("in outer_loop...");
-                        match select2(weak_exit_po, msg_po) {
-                          Left(weak_exit) => {
-                            // all normal tasks have ended, tell the
-                            // libuv loop to tear_down, then exit
-                            debug!("weak_exit_po recv'd msg: %?", weak_exit);
-                            iotask::exit(hl_loop);
-                            break;
-                          }
-                          Right(fetch_ch) => {
-                            debug!("hl_loop req recv'd: %?", fetch_ch);
-                            fetch_ch.send(hl_loop);
-                          }
-                        }
+
+    type MonChan = Chan<IoTask>;
+
+    struct GlobalIoTask(IoTask);
+
+    impl GlobalIoTask: Clone {
+        fn clone(&self) -> GlobalIoTask {
+            GlobalIoTask((**self).clone())
+        }
+    }
+
+    fn key(_: GlobalIoTask) { }
+
+    match unsafe { global_data_clone(key) } {
+        Some(GlobalIoTask(iotask)) => iotask,
+        None => {
+            let iotask: IoTask = spawn_loop();
+            let mut installed = false;
+            let final_iotask = unsafe {
+                do global_data_clone_create(key) {
+                    installed = true;
+                    ~GlobalIoTask(iotask.clone())
+                }
+            };
+            if installed {
+                do task().unlinked().spawn() {
+                    unsafe {
+                        debug!("global monitor task starting");
+                        // As a weak task the runtime will notify us
+                        // when to exit
+                        do weaken_task |weak_exit_po| {
+                            debug!("global monitor task is weak");
+                            weak_exit_po.recv();
+                            iotask::exit(&iotask);
+                            debug!("global monitor task is unweak");
+                        };
+                        debug!("global monitor task exiting");
                     }
-                    debug!("global monitor task is leaving weakend state");
-                };
-                debug!("global monitor task exiting");
+                }
+            } else {
+                iotask::exit(&iotask);
             }
-        };
 
-        // once we have a chan to the monitor loop, we ask it for
-        // the libuv loop's async handle
-        do listen |fetch_ch| {
-            monitor_ch.send(fetch_ch);
-            fetch_ch.recv()
+            match final_iotask {
+                GlobalIoTask(iotask) => iotask
+            }
         }
     }
 }
 
 fn spawn_loop() -> IoTask {
-    let builder = do task::task().add_wrapper |task_body| {
+    let builder = do task().add_wrapper |task_body| {
         fn~(move task_body) {
             // The I/O loop task also needs to be weak so it doesn't keep
             // the runtime alive
             unsafe {
-                do weaken_task |weak_exit_po| {
-                    debug!("global libuv task is now weak %?", weak_exit_po);
+                do weaken_task |_| {
+                    debug!("global libuv task is now weak");
                     task_body();
 
                     // We don't wait for the exit message on weak_exit_po
@@ -122,6 +117,7 @@ fn spawn_loop() -> IoTask {
             }
         }
     };
+    let builder = builder.unlinked();
     spawn_iotask(move builder)
 }
 
@@ -135,16 +131,18 @@ mod test {
 
     use core::iter;
     use core::libc;
-    use core::oldcomm;
     use core::ptr;
     use core::task;
+    use core::cast::transmute;
+    use core::libc::c_void;
+    use core::pipes::{stream, SharedChan, Chan};
 
     extern fn simple_timer_close_cb(timer_ptr: *ll::uv_timer_t) {
         unsafe {
             let exit_ch_ptr = ll::get_data_for_uv_handle(
-                timer_ptr as *libc::c_void) as *oldcomm::Chan<bool>;
-            let exit_ch = *exit_ch_ptr;
-            oldcomm::send(exit_ch, true);
+                timer_ptr as *libc::c_void);
+            let exit_ch = transmute::<*c_void, ~Chan<bool>>(exit_ch_ptr);
+            exit_ch.send(true);
             log(debug,
                 fmt!("EXIT_CH_PTR simple_timer_close_cb exit_ch_ptr: %?",
                      exit_ch_ptr));
@@ -155,26 +153,25 @@ mod test {
         unsafe {
             log(debug, ~"in simple timer cb");
             ll::timer_stop(timer_ptr);
-            let hl_loop = get_gl();
+            let hl_loop = &get_gl();
             do iotask::interact(hl_loop) |_loop_ptr| {
+                log(debug, ~"closing timer");
                 unsafe {
-                    log(debug, ~"closing timer");
                     ll::close(timer_ptr, simple_timer_close_cb);
-                    log(debug, ~"about to deref exit_ch_ptr");
-                    log(debug, ~"after msg sent on deref'd exit_ch");
                 }
+                log(debug, ~"about to deref exit_ch_ptr");
+                log(debug, ~"after msg sent on deref'd exit_ch");
             };
             log(debug, ~"exiting simple timer cb");
         }
     }
 
-    fn impl_uv_hl_simple_timer(iotask: IoTask) {
+    fn impl_uv_hl_simple_timer(iotask: &IoTask) {
         unsafe {
-            let exit_po = oldcomm::Port::<bool>();
-            let exit_ch = oldcomm::Chan(&exit_po);
-            let exit_ch_ptr = ptr::addr_of(&exit_ch);
+            let (exit_po, exit_ch) = stream::<bool>();
+            let exit_ch_ptr: *libc::c_void = transmute(~exit_ch);
             log(debug, fmt!("EXIT_CH_PTR newly created exit_ch_ptr: %?",
-                           exit_ch_ptr));
+                            exit_ch_ptr));
             let timer_handle = ll::timer_t();
             let timer_ptr = ptr::addr_of(&timer_handle);
             do iotask::interact(iotask) |loop_ptr| {
@@ -184,20 +181,22 @@ mod test {
                     if(init_status == 0i32) {
                         ll::set_data_for_uv_handle(
                             timer_ptr as *libc::c_void,
-                            exit_ch_ptr as *libc::c_void);
+                            exit_ch_ptr);
                         let start_status = ll::timer_start(timer_ptr,
                                                            simple_timer_cb,
-                                                           1u,
-                                                           0u);
-                        if start_status != 0 {
+                                                           1u, 0u);
+                        if(start_status == 0i32) {
+                        }
+                        else {
                             fail ~"failure on ll::timer_start()";
                         }
-                    } else {
+                    }
+                    else {
                         fail ~"failure on ll::timer_init()";
                     }
                 }
             };
-            oldcomm::recv(exit_po);
+            exit_po.recv();
             log(debug,
                 ~"global_loop timer test: msg recv on exit_po, done..");
         }
@@ -205,17 +204,15 @@ mod test {
 
     #[test]
     fn test_gl_uv_global_loop_high_level_global_timer() {
-        unsafe {
-            let hl_loop = get_gl();
-            let exit_po = oldcomm::Port::<()>();
-            let exit_ch = oldcomm::Chan(&exit_po);
-            task::spawn_sched(task::ManualThreads(1u), || {
-                impl_uv_hl_simple_timer(hl_loop);
-                oldcomm::send(exit_ch, ());
-            });
+        let hl_loop = &get_gl();
+        let (exit_po, exit_ch) = stream::<()>();
+        task::spawn_sched(task::ManualThreads(1u), || {
+            let hl_loop = &get_gl();
             impl_uv_hl_simple_timer(hl_loop);
-            oldcomm::recv(exit_po);
-        }
+            exit_ch.send(());
+        });
+        impl_uv_hl_simple_timer(hl_loop);
+        exit_po.recv();
     }
 
     // keeping this test ignored until some kind of stress-test-harness
@@ -223,23 +220,21 @@ mod test {
     #[test]
     #[ignore]
     fn test_stress_gl_uv_global_loop_high_level_global_timer() {
-        unsafe {
-            let hl_loop = get_gl();
-            let exit_po = oldcomm::Port::<()>();
-            let exit_ch = oldcomm::Chan(&exit_po);
-            let cycles = 5000u;
-            for iter::repeat(cycles) {
-                task::spawn_sched(task::ManualThreads(1u), || {
-                    impl_uv_hl_simple_timer(hl_loop);
-                    oldcomm::send(exit_ch, ());
-                });
-            };
-            for iter::repeat(cycles) {
-                oldcomm::recv(exit_po);
-            };
-            log(debug,
-                ~"test_stress_gl_uv_global_loop_high_level_global_timer"+
-                ~" exiting sucessfully!");
-        }
+        let (exit_po, exit_ch) = stream::<()>();
+        let exit_ch = SharedChan(exit_ch);
+        let cycles = 5000u;
+        for iter::repeat(cycles) {
+            let exit_ch_clone = exit_ch.clone();
+            task::spawn_sched(task::ManualThreads(1u), || {
+                let hl_loop = &get_gl();
+                impl_uv_hl_simple_timer(hl_loop);
+                exit_ch_clone.send(());
+            });
+        };
+        for iter::repeat(cycles) {
+            exit_po.recv();
+        };
+        log(debug, ~"test_stress_gl_uv_global_loop_high_level_global_timer"+
+            ~" exiting sucessfully!");
     }
 }
diff --git a/src/libstd/uv_iotask.rs b/src/libstd/uv_iotask.rs
index 0a3d64a02a4..dc0092aadfa 100644
--- a/src/libstd/uv_iotask.rs
+++ b/src/libstd/uv_iotask.rs
@@ -20,7 +20,7 @@ use ll = uv_ll;
 
 use core::libc::c_void;
 use core::libc;
-use core::oldcomm::{Port, Chan, listen};
+use core::pipes::{stream, Port, Chan, SharedChan};
 use core::prelude::*;
 use core::ptr::addr_of;
 use core::task::TaskBuilder;
@@ -30,22 +30,30 @@ use core::task;
 pub enum IoTask {
     IoTask_({
         async_handle: *ll::uv_async_t,
-        op_chan: Chan<IoTaskMsg>
+        op_chan: SharedChan<IoTaskMsg>
     })
 }
 
+impl IoTask: Clone {
+    fn clone(&self) -> IoTask {
+        IoTask_({
+            async_handle: self.async_handle,
+            op_chan: self.op_chan.clone()
+        })
+    }
+}
+
 pub fn spawn_iotask(task: task::TaskBuilder) -> IoTask {
 
-    do listen |iotask_ch| {
+    let (iotask_port, iotask_chan) = stream();
 
-        do task.sched_mode(task::SingleThreaded).spawn {
-            debug!("entering libuv task");
-            run_loop(iotask_ch);
-            debug!("libuv task exiting");
-        };
+    do task.sched_mode(task::SingleThreaded).spawn {
+        debug!("entering libuv task");
+        run_loop(&iotask_chan);
+        debug!("libuv task exiting");
+    };
 
-        iotask_ch.recv()
-    }
+    iotask_port.recv()
 }
 
 
@@ -71,7 +79,7 @@ pub fn spawn_iotask(task: task::TaskBuilder) -> IoTask {
  * module. It is not safe to send the `loop_ptr` param to this callback out
  * via ports/chans.
  */
-pub unsafe fn interact(iotask: IoTask,
+pub unsafe fn interact(iotask: &IoTask,
                    cb: fn~(*c_void)) {
     send_msg(iotask, Interaction(move cb));
 }
@@ -83,7 +91,7 @@ pub unsafe fn interact(iotask: IoTask,
  * async handle and do a sanity check to make sure that all other handles are
  * closed, causing a failure otherwise.
  */
-pub fn exit(iotask: IoTask) {
+pub fn exit(iotask: &IoTask) {
     unsafe {
         send_msg(iotask, TeardownLoop);
     }
@@ -98,8 +106,10 @@ enum IoTaskMsg {
 }
 
 /// Run the loop and begin handling messages
-fn run_loop(iotask_ch: Chan<IoTask>) {
+fn run_loop(iotask_ch: &Chan<IoTask>) {
+
     unsafe {
+        debug!("creating loop");
         let loop_ptr = ll::loop_new();
 
         // set up the special async handle we'll use to allow multi-task
@@ -110,10 +120,12 @@ fn run_loop(iotask_ch: Chan<IoTask>) {
         // associate the async handle with the loop
         ll::async_init(loop_ptr, async_handle, wake_up_cb);
 
+        let (msg_po, msg_ch) = stream::<IoTaskMsg>();
+
         // initialize our loop data and store it in the loop
-        let data = IoTaskLoopData {
+        let data: IoTaskLoopData = IoTaskLoopData {
             async_handle: async_handle,
-            msg_po: Port()
+            msg_po: msg_po
         };
         ll::set_data_for_uv_handle(async_handle, addr_of(&data));
 
@@ -121,7 +133,7 @@ fn run_loop(iotask_ch: Chan<IoTask>) {
         // while we dwell in the I/O loop
         let iotask = IoTask_({
             async_handle: async_handle,
-            op_chan: data.msg_po.chan()
+            op_chan: SharedChan(msg_ch)
         });
         iotask_ch.send(iotask);
 
@@ -139,9 +151,10 @@ struct IoTaskLoopData {
     msg_po: Port<IoTaskMsg>,
 }
 
-fn send_msg(iotask: IoTask, msg: IoTaskMsg) {
+fn send_msg(iotask: &IoTask,
+            msg: IoTaskMsg) {
+    iotask.op_chan.send(move msg);
     unsafe {
-        iotask.op_chan.send(move msg);
         ll::async_send(iotask.async_handle);
     }
 }
@@ -149,19 +162,20 @@ fn send_msg(iotask: IoTask, msg: IoTaskMsg) {
 /// Dispatch all pending messages
 extern fn wake_up_cb(async_handle: *ll::uv_async_t,
                     status: int) {
-    unsafe {
-        log(debug, fmt!("wake_up_cb extern.. handle: %? status: %?",
-                         async_handle, status));
 
+    log(debug, fmt!("wake_up_cb extern.. handle: %? status: %?",
+                     async_handle, status));
+
+    unsafe {
         let loop_ptr = ll::get_loop_for_uv_handle(async_handle);
-        let data = ll::get_data_for_uv_handle(async_handle)
-            as *IoTaskLoopData;
-        let msg_po = (*data).msg_po;
+        let data =
+            ll::get_data_for_uv_handle(async_handle) as *IoTaskLoopData;
+        let msg_po = &(*data).msg_po;
 
         while msg_po.peek() {
             match msg_po.recv() {
-              Interaction(ref cb) => (*cb)(loop_ptr),
-              TeardownLoop => begin_teardown(data)
+                Interaction(ref cb) => (*cb)(loop_ptr),
+                TeardownLoop => begin_teardown(data)
             }
         }
     }
@@ -216,27 +230,32 @@ mod test {
     }
     struct AhData {
         iotask: IoTask,
-        exit_ch: oldcomm::Chan<()>,
+        exit_ch: oldcomm::Chan<()>
     }
-    fn impl_uv_iotask_async(iotask: IoTask) {
+    fn impl_uv_iotask_async(iotask: &IoTask) {
         unsafe {
             let async_handle = ll::async_t();
             let ah_ptr = ptr::addr_of(&async_handle);
             let exit_po = oldcomm::Port::<()>();
             let exit_ch = oldcomm::Chan(&exit_po);
-            let ah_data = {
-                iotask: iotask,
+            let ah_data = AhData {
+                iotask: iotask.clone(),
                 exit_ch: exit_ch
             };
-            let ah_data_ptr = ptr::addr_of(&ah_data);
+            let ah_data_ptr: *AhData = unsafe {
+                ptr::to_unsafe_ptr(&ah_data)
+            };
+            debug!("about to interact");
             do interact(iotask) |loop_ptr| {
                 unsafe {
+                    debug!("interacting");
                     ll::async_init(loop_ptr, ah_ptr, async_handle_cb);
-                    ll::set_data_for_uv_handle(ah_ptr,
-                                               ah_data_ptr as *libc::c_void);
+                    ll::set_data_for_uv_handle(
+                        ah_ptr, ah_data_ptr as *libc::c_void);
                     ll::async_send(ah_ptr);
                 }
             };
+            debug!("waiting for async close");
             oldcomm::recv(exit_po);
         }
     }
@@ -244,13 +263,13 @@ mod test {
     // this fn documents the bear minimum neccesary to roll your own
     // high_level_loop
     unsafe fn spawn_test_loop(exit_ch: oldcomm::Chan<()>) -> IoTask {
-        let iotask_port = oldcomm::Port::<IoTask>();
-        let iotask_ch = oldcomm::Chan(&iotask_port);
+        let (iotask_port, iotask_ch) = stream::<IoTask>();
         do task::spawn_sched(task::ManualThreads(1u)) {
-            run_loop(iotask_ch);
+            debug!("about to run a test loop");
+            run_loop(&iotask_ch);
             exit_ch.send(());
         };
-        return oldcomm::recv(iotask_port);
+        return iotask_port.recv();
     }
 
     extern fn lifetime_handle_close(handle: *libc::c_void) {
@@ -270,23 +289,30 @@ mod test {
         unsafe {
             let exit_po = oldcomm::Port::<()>();
             let exit_ch = oldcomm::Chan(&exit_po);
-            let iotask = spawn_test_loop(exit_ch);
+            let iotask = &spawn_test_loop(exit_ch);
+
+            debug!("spawned iotask");
 
             // using this handle to manage the lifetime of the
-            // high_level_loop, as it will exit the first time one of the
-            // impl_uv_hl_async() is cleaned up with no one ref'd handles on
-            // the loop (Which can happen under race-condition type
-            // situations.. this ensures that the loop lives until, at least,
-            // all of the impl_uv_hl_async() runs have been called, at least.
+            // high_level_loop, as it will exit the first time one of
+            // the impl_uv_hl_async() is cleaned up with no one ref'd
+            // handles on the loop (Which can happen under
+            // race-condition type situations.. this ensures that the
+            // loop lives until, at least, all of the
+            // impl_uv_hl_async() runs have been called, at least.
             let work_exit_po = oldcomm::Port::<()>();
             let work_exit_ch = oldcomm::Chan(&work_exit_po);
             for iter::repeat(7u) {
+                let iotask_clone = iotask.clone();
                 do task::spawn_sched(task::ManualThreads(1u)) {
-                    impl_uv_iotask_async(iotask);
+                    debug!("async");
+                    impl_uv_iotask_async(&iotask_clone);
+                    debug!("done async");
                     oldcomm::send(work_exit_ch, ());
                 };
             };
             for iter::repeat(7u) {
+                debug!("waiting");
                 oldcomm::recv(work_exit_po);
             };
             log(debug, ~"sending teardown_loop msg..");
diff --git a/src/rt/rust.cpp b/src/rt/rust.cpp
index f21a7441640..803da32cbc8 100644
--- a/src/rt/rust.cpp
+++ b/src/rt/rust.cpp
@@ -43,8 +43,8 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) {
 
     rust_kernel *kernel = new rust_kernel(env);
 
-    // Create the main scheduler and the main task
-    rust_sched_id sched_id = kernel->create_scheduler(env->num_sched_threads);
+    // Create the main task
+    rust_sched_id sched_id = kernel->main_sched_id();
     rust_scheduler *sched = kernel->get_scheduler_by_id(sched_id);
     assert(sched != NULL);
     rust_task *root_task = sched->create_task(NULL, "main");
diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp
index de69272aca1..4fcfc11b325 100644
--- a/src/rt/rust_builtin.cpp
+++ b/src/rt/rust_builtin.cpp
@@ -652,7 +652,10 @@ new_task_common(rust_scheduler *sched, rust_task *parent) {
 extern "C" CDECL rust_task*
 new_task() {
     rust_task *task = rust_get_current_task();
-    return new_task_common(task->sched, task);
+    rust_sched_id sched_id = task->kernel->main_sched_id();
+    rust_scheduler *sched = task->kernel->get_scheduler_by_id(sched_id);
+    assert(sched != NULL && "should always have a main scheduler");
+    return new_task_common(sched, task);
 }
 
 extern "C" CDECL rust_task*
@@ -855,24 +858,6 @@ rust_compare_and_swap_ptr(intptr_t *address,
     return sync::compare_and_swap(address, oldval, newval);
 }
 
-extern "C" CDECL void
-rust_task_weaken(rust_port_id chan) {
-    rust_task *task = rust_get_current_task();
-    task->kernel->weaken_task(chan);
-}
-
-extern "C" CDECL void
-rust_task_unweaken(rust_port_id chan) {
-    rust_task *task = rust_get_current_task();
-    task->kernel->unweaken_task(chan);
-}
-
-extern "C" CDECL uintptr_t*
-rust_global_env_chan_ptr() {
-    rust_task *task = rust_get_current_task();
-    return task->kernel->get_global_env_chan();
-}
-
 extern "C" void
 rust_task_inhibit_kill(rust_task *task) {
     task->inhibit_kill();
@@ -1023,6 +1008,29 @@ rust_raw_thread_join_delete(raw_thread *thread) {
     delete thread;
 }
 
+extern "C" void
+rust_register_exit_function(spawn_fn runner, fn_env_pair *f) {
+    rust_task *task = rust_get_current_task();
+    task->kernel->register_exit_function(runner, f);
+}
+
+extern "C" void *
+rust_get_global_data_ptr() {
+    rust_task *task = rust_get_current_task();
+    return &task->kernel->global_data;
+}
+
+extern "C" void
+rust_inc_weak_task_count() {
+    rust_task *task = rust_get_current_task();
+    task->kernel->inc_weak_task_count();
+}
+
+extern "C" void
+rust_dec_weak_task_count() {
+    rust_task *task = rust_get_current_task();
+    task->kernel->dec_weak_task_count();
+}
 
 //
 // Local Variables:
diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp
index 8871d133ea1..c365f3cca1e 100644
--- a/src/rt/rust_kernel.cpp
+++ b/src/rt/rust_kernel.cpp
@@ -30,21 +30,29 @@ rust_kernel::rust_kernel(rust_env *env) :
     rval(0),
     max_sched_id(1),
     killed(false),
+    already_exiting(false),
     sched_reaper(this),
     osmain_driver(NULL),
     non_weak_tasks(0),
-    global_loop_chan(0),
-    global_env_chan(0),
-    env(env)
-
+    at_exit_runner(NULL),
+    at_exit_started(false),
+    env(env),
+    global_data(0)
 {
-
     // Create the single threaded scheduler that will run on the platform's
     // main thread
-    rust_manual_sched_launcher_factory *launchfac =
+    rust_manual_sched_launcher_factory *osmain_launchfac =
         new rust_manual_sched_launcher_factory();
-    osmain_scheduler = create_scheduler(launchfac, 1, false);
-    osmain_driver = launchfac->get_driver();
+    osmain_scheduler = create_scheduler(osmain_launchfac, 1, false);
+    osmain_driver = osmain_launchfac->get_driver();
+
+    // Create the primary scheduler
+    rust_thread_sched_launcher_factory *main_launchfac =
+        new rust_thread_sched_launcher_factory();
+    main_scheduler = create_scheduler(main_launchfac,
+                                      env->num_sched_threads,
+                                      false);
+
     sched_reaper.start();
 }
 
@@ -103,15 +111,22 @@ rust_kernel::create_scheduler(rust_sched_launcher_factory *launchfac,
     {
         scoped_lock with(sched_lock);
 
+        /*if (sched_table.size() == 2) {
+            // The main and OS main schedulers may not exit while there are
+            // other schedulers
+            KLOG_("Disallowing main scheduler to exit");
+            rust_scheduler *main_sched =
+                get_scheduler_by_id_nolock(main_scheduler);
+            assert(main_sched != NULL);
+            main_sched->disallow_exit();
+        }
         if (sched_table.size() == 1) {
-            // The OS main scheduler may not exit while there are other
-            // schedulers
             KLOG_("Disallowing osmain scheduler to exit");
-            rust_scheduler *sched =
+            rust_scheduler *osmain_sched =
                 get_scheduler_by_id_nolock(osmain_scheduler);
-            assert(sched != NULL);
-            sched->disallow_exit();
-        }
+            assert(osmain_sched != NULL);
+            osmain_sched->disallow_exit();
+            }*/
 
         id = max_sched_id++;
         assert(id != INTPTR_MAX && "Hit the maximum scheduler id");
@@ -175,14 +190,21 @@ rust_kernel::wait_for_schedulers()
             sched_table.erase(iter);
             sched->join_task_threads();
             sched->deref();
+            /*if (sched_table.size() == 2) {
+                KLOG_("Allowing main scheduler to exit");
+                // It's only the main schedulers left. Tell them to exit
+                rust_scheduler *main_sched =
+                    get_scheduler_by_id_nolock(main_scheduler);
+                assert(main_sched != NULL);
+                main_sched->allow_exit();
+            }
             if (sched_table.size() == 1) {
                 KLOG_("Allowing osmain scheduler to exit");
-                // It's only the osmain scheduler left. Tell it to exit
-                rust_scheduler *sched =
+                rust_scheduler *osmain_sched =
                     get_scheduler_by_id_nolock(osmain_scheduler);
-                assert(sched != NULL);
-                sched->allow_exit();
-            }
+                assert(osmain_sched != NULL);
+                osmain_sched->allow_exit();
+            }*/
         }
         if (!sched_table.empty()) {
             sched_lock.wait();
@@ -319,59 +341,63 @@ rust_kernel::register_task() {
 }
 
 void
+rust_kernel::allow_scheduler_exit() {
+    scoped_lock with(sched_lock);
+
+    KLOG_("Allowing main scheduler to exit");
+    // It's only the main schedulers left. Tell them to exit
+    rust_scheduler *main_sched =
+        get_scheduler_by_id_nolock(main_scheduler);
+    assert(main_sched != NULL);
+    main_sched->allow_exit();
+
+    KLOG_("Allowing osmain scheduler to exit");
+    rust_scheduler *osmain_sched =
+        get_scheduler_by_id_nolock(osmain_scheduler);
+    assert(osmain_sched != NULL);
+    osmain_sched->allow_exit();
+}
+
+void
 rust_kernel::unregister_task() {
     KLOG_("Unregistering task");
     uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks);
     KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks);
     if (new_non_weak_tasks == 0) {
-        end_weak_tasks();
+        begin_shutdown();
     }
 }
 
 void
-rust_kernel::weaken_task(rust_port_id chan) {
-    {
-        scoped_lock with(weak_task_lock);
-        KLOG_("Weakening task with channel %" PRIdPTR, chan);
-        weak_task_chans.push_back(chan);
-    }
+rust_kernel::inc_weak_task_count() {
     uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks);
     KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks);
     if (new_non_weak_tasks == 0) {
-        end_weak_tasks();
+        begin_shutdown();
     }
 }
 
 void
-rust_kernel::unweaken_task(rust_port_id chan) {
+rust_kernel::dec_weak_task_count() {
     uintptr_t new_non_weak_tasks = sync::increment(non_weak_tasks);
     KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks);
-    {
-        scoped_lock with(weak_task_lock);
-        KLOG_("Unweakening task with channel %" PRIdPTR, chan);
-        std::vector<rust_port_id>::iterator iter =
-            std::find(weak_task_chans.begin(), weak_task_chans.end(), chan);
-        if (iter != weak_task_chans.end()) {
-            weak_task_chans.erase(iter);
-        }
-    }
 }
 
 void
-rust_kernel::end_weak_tasks() {
-    std::vector<rust_port_id> chancopies;
+rust_kernel::begin_shutdown() {
     {
-        scoped_lock with(weak_task_lock);
-        chancopies = weak_task_chans;
-        weak_task_chans.clear();
-    }
-    while (!chancopies.empty()) {
-        rust_port_id chan = chancopies.back();
-        chancopies.pop_back();
-        KLOG_("Notifying weak task " PRIdPTR, chan);
-        uintptr_t token = 0;
-        send_to_port(chan, &token);
+        scoped_lock with(sched_lock);
+        // FIXME #4410: This shouldn't be necessary, but because of
+        // unweaken_task this may end up getting called multiple times.
+        if (already_exiting) {
+            return;
+        } else {
+            already_exiting = true;
+        }
     }
+
+    run_exit_functions();
+    allow_scheduler_exit();
 }
 
 bool
@@ -389,6 +415,47 @@ rust_kernel::send_to_port(rust_port_id chan, void *sptr) {
     }
 }
 
+void
+rust_kernel::register_exit_function(spawn_fn runner, fn_env_pair *f) {
+    scoped_lock with(at_exit_lock);
+
+    assert(!at_exit_started && "registering at_exit function after exit");
+
+    if (at_exit_runner) {
+        assert(runner == at_exit_runner
+               && "there can be only one at_exit_runner");
+    }
+
+    at_exit_runner = runner;
+    at_exit_fns.push_back(f);
+}
+
+void
+rust_kernel::run_exit_functions() {
+    rust_task *task;
+
+    {
+        scoped_lock with(at_exit_lock);
+
+        assert(!at_exit_started && "running exit functions twice?");
+
+        at_exit_started = true;
+
+        if (at_exit_runner == NULL) {
+            return;
+        }
+
+        rust_scheduler *sched = get_scheduler_by_id(main_sched_id());
+        assert(sched);
+        task = sched->create_task(NULL, "at_exit");
+
+        final_exit_fns.count = at_exit_fns.size();
+        final_exit_fns.start = at_exit_fns.data();
+    }
+
+    task->start(at_exit_runner, NULL, &final_exit_fns);
+}
+
 //
 // Local Variables:
 // mode: C++
diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h
index 48522b57d5c..a7c6249e3db 100644
--- a/src/rt/rust_kernel.h
+++ b/src/rt/rust_kernel.h
@@ -49,6 +49,7 @@
 #include "memory_region.h"
 #include "rust_log.h"
 #include "rust_sched_reaper.h"
+#include "rust_type.h"
 #include "util/hash_map.h"
 
 class rust_scheduler;
@@ -65,6 +66,13 @@ typedef intptr_t rust_port_id;
 
 typedef std::map<rust_sched_id, rust_scheduler*> sched_map;
 
+// This is defined as a struct only because we need a single pointer to pass
+// to the Rust function that runs the at_exit functions
+struct exit_functions {
+    size_t count;
+    fn_env_pair **start;
+};
+
 class rust_kernel {
     memory_region _region;
     rust_log _log;
@@ -81,7 +89,8 @@ class rust_kernel {
     lock_and_signal rval_lock;
     int rval;
 
-    // Protects max_sched_id and sched_table, join_list, killed
+    // Protects max_sched_id and sched_table, join_list, killed,
+    // already_exiting
     lock_and_signal sched_lock;
     // The next scheduler id
     rust_sched_id max_sched_id;
@@ -94,8 +103,13 @@ class rust_kernel {
     // task group fails). This propagates to all new schedulers and tasks
     // created after it is set.
     bool killed;
+    bool already_exiting;
+
 
     rust_sched_reaper sched_reaper;
+
+    // The primary scheduler
+    rust_sched_id main_scheduler;
     // The single-threaded scheduler that uses the main thread
     rust_sched_id osmain_scheduler;
     // Runs the single-threaded scheduler that executes tasks
@@ -104,21 +118,22 @@ class rust_kernel {
 
     // An atomically updated count of the live, 'non-weak' tasks
     uintptr_t non_weak_tasks;
-    // Protects weak_task_chans
-    lock_and_signal weak_task_lock;
-    // A list of weak tasks that need to be told when to exit
-    std::vector<rust_port_id> weak_task_chans;
 
     rust_scheduler* get_scheduler_by_id_nolock(rust_sched_id id);
-    void end_weak_tasks();
+    void allow_scheduler_exit();
+    void begin_shutdown();
+
+    lock_and_signal at_exit_lock;
+    spawn_fn at_exit_runner;
+    bool at_exit_started;
+    std::vector<fn_env_pair*> at_exit_fns;
+    exit_functions final_exit_fns;
 
-    // Used to communicate with the process-side, global libuv loop
-    uintptr_t global_loop_chan;
-    // Used to serialize access to getenv/setenv
-    uintptr_t global_env_chan;
+    void run_exit_functions();
 
 public:
     struct rust_env *env;
+    uintptr_t global_data;
 
     rust_kernel(rust_env *env);
 
@@ -154,17 +169,17 @@ public:
 
     void set_exit_status(int code);
 
+    rust_sched_id main_sched_id() { return main_scheduler; }
     rust_sched_id osmain_sched_id() { return osmain_scheduler; }
 
     void register_task();
     void unregister_task();
-    void weaken_task(rust_port_id chan);
-    void unweaken_task(rust_port_id chan);
+    void inc_weak_task_count();
+    void dec_weak_task_count();
 
     bool send_to_port(rust_port_id chan, void *sptr);
 
-    uintptr_t* get_global_loop() { return &global_loop_chan; }
-    uintptr_t* get_global_env_chan() { return &global_env_chan; }
+    void register_exit_function(spawn_fn runner, fn_env_pair *f);
 };
 
 template <typename T> struct kernel_owned {
diff --git a/src/rt/rust_uv.cpp b/src/rt/rust_uv.cpp
index 53d8177bcf8..2dc70088628 100644
--- a/src/rt/rust_uv.cpp
+++ b/src/rt/rust_uv.cpp
@@ -513,15 +513,6 @@ rust_uv_ip6_port(struct sockaddr_in6* src) {
     return ntohs(src->sin6_port);
 }
 
-extern "C" uintptr_t*
-rust_uv_get_kernel_global_chan_ptr() {
-    uintptr_t* result = rust_get_current_task()->kernel->get_global_loop();
-    rust_task* task = rust_get_current_task();
-    LOG(task, stdlib, "global loop: %lu", (unsigned long int)result);
-    LOG(task, stdlib,"global loop val: %lu", (unsigned long int)*result);
-    return result;
-}
-
 extern "C" void*
 rust_uv_current_kernel_malloc(size_t size) {
     return current_kernel_malloc(size, "rust_uv_current_kernel_malloc");
diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in
index cce4e411e02..eb9db6c1d57 100644
--- a/src/rt/rustrt.def.in
+++ b/src/rt/rustrt.def.in
@@ -61,8 +61,6 @@ rust_task_yield
 rust_task_is_unwinding
 rust_get_task
 rust_get_stack_segment
-rust_task_weaken
-rust_task_unweaken
 rust_log_str
 start_task
 vec_reserve_shared_actual
@@ -158,7 +156,6 @@ rust_uv_get_data_for_req
 rust_uv_set_data_for_req
 rust_uv_get_base_from_buf
 rust_uv_get_len_from_buf
-rust_uv_get_kernel_global_chan_ptr
 rust_uv_current_kernel_malloc
 rust_uv_current_kernel_free
 rust_uv_getaddrinfo
@@ -174,7 +171,6 @@ rust_dbg_do_nothing
 rust_dbg_breakpoint
 rust_osmain_sched_id
 rust_compare_and_swap_ptr
-rust_global_env_chan_ptr
 rust_port_take
 rust_port_drop
 rust_port_task
@@ -210,3 +206,7 @@ linenoiseHistorySave
 linenoiseHistoryLoad
 rust_raw_thread_start
 rust_raw_thread_join_delete
+rust_register_exit_function
+rust_get_global_data_ptr
+rust_inc_weak_task_count
+rust_dec_weak_task_count
\ No newline at end of file
diff --git a/src/test/run-pass/pipe-detect-term.rs b/src/test/run-pass/pipe-detect-term.rs
index c2d4be04191..10b13d8757f 100644
--- a/src/test/run-pass/pipe-detect-term.rs
+++ b/src/test/run-pass/pipe-detect-term.rs
@@ -27,7 +27,7 @@ proto! oneshot (
 )
 
 fn main() {
-    let iotask = uv::global_loop::get();
+    let iotask = &uv::global_loop::get();
     
     pipes::spawn_service(oneshot::init, |p| { 
         match try_recv(move p) {
diff --git a/src/test/run-pass/pipe-select.rs b/src/test/run-pass/pipe-select.rs
index e71d0c4931d..e138f2562aa 100644
--- a/src/test/run-pass/pipe-select.rs
+++ b/src/test/run-pass/pipe-select.rs
@@ -35,7 +35,7 @@ fn main() {
     use oneshot::client::*;
     use stream::client::*;
 
-    let iotask = uv::global_loop::get();
+    let iotask = &uv::global_loop::get();
     
     let c = pipes::spawn_service(stream::init, |p| { 
         error!("waiting for pipes");
diff --git a/src/test/run-pass/pipe-sleep.rs b/src/test/run-pass/pipe-sleep.rs
index 4a6e7b4ce36..ae7e4e7fb0c 100644
--- a/src/test/run-pass/pipe-sleep.rs
+++ b/src/test/run-pass/pipe-sleep.rs
@@ -27,7 +27,7 @@ fn main() {
 
     let c = pipes::spawn_service(oneshot::init, |p| { recv(move p); });
 
-    let iotask = uv::global_loop::get();
+    let iotask = &uv::global_loop::get();
     sleep(iotask, 500);
     
     signal(move c);