about summary refs log tree commit diff
diff options
context:
space:
mode:
authorBrian Anderson <banderson@mozilla.com>2013-05-17 17:53:50 -0700
committerBrian Anderson <banderson@mozilla.com>2013-05-17 17:53:50 -0700
commit03a8e59615f7ced4def8adaad41cfcb0fd0f9d29 (patch)
tree560956d27a7945a464d8d8ec742be4ea5fbecb69
parent2d28d645422c1617be58c8ca7ad9a457264ca850 (diff)
parent018dfaf9a6a25f5dba0ac642ff6c426c549bc4d7 (diff)
downloadrust-03a8e59615f7ced4def8adaad41cfcb0fd0f9d29.tar.gz
rust-03a8e59615f7ced4def8adaad41cfcb0fd0f9d29.zip
Merge remote-tracking branch 'brson/io' into incoming
-rw-r--r--src/libcore/core.rc5
-rw-r--r--src/libcore/logging.rs63
-rw-r--r--src/libcore/macros.rs16
-rw-r--r--src/libcore/os.rs28
-rw-r--r--src/libcore/rt/context.rs7
-rw-r--r--src/libcore/rt/global_heap.rs (renamed from src/libcore/unstable/exchange_alloc.rs)14
-rw-r--r--src/libcore/rt/io/extensions.rs900
-rw-r--r--src/libcore/rt/io/file.rs6
-rw-r--r--src/libcore/rt/io/mock.rs50
-rw-r--r--src/libcore/rt/io/mod.rs73
-rw-r--r--src/libcore/rt/io/native/file.rs8
-rw-r--r--src/libcore/rt/io/net/tcp.rs330
-rw-r--r--src/libcore/rt/io/net/udp.rs4
-rw-r--r--src/libcore/rt/io/net/unix.rs4
-rw-r--r--src/libcore/rt/io/option.rs8
-rw-r--r--src/libcore/rt/io/stdio.rs9
-rw-r--r--src/libcore/rt/local_sched.rs (renamed from src/libcore/rt/sched/local_sched.rs)64
-rw-r--r--src/libcore/rt/local_services.rs36
-rw-r--r--src/libcore/rt/logging.rs68
-rw-r--r--src/libcore/rt/mod.rs168
-rw-r--r--src/libcore/rt/rc.rs142
-rw-r--r--src/libcore/rt/rtio.rs25
-rw-r--r--src/libcore/rt/sched.rs (renamed from src/libcore/rt/sched/mod.rs)370
-rw-r--r--src/libcore/rt/stack.rs39
-rw-r--r--src/libcore/rt/test.rs70
-rw-r--r--src/libcore/rt/thread_local_storage.rs3
-rw-r--r--src/libcore/rt/tube.rs184
-rw-r--r--src/libcore/rt/uv/file.rs10
-rw-r--r--src/libcore/rt/uv/idle.rs91
-rw-r--r--src/libcore/rt/uv/mod.rs353
-rw-r--r--src/libcore/rt/uv/net.rs301
-rw-r--r--src/libcore/rt/uv/timer.rs183
-rw-r--r--src/libcore/rt/uv/uvio.rs (renamed from src/libcore/rt/uvio.rs)291
-rw-r--r--src/libcore/rt/uv/uvll.rs (renamed from src/libcore/rt/uvll.rs)19
-rw-r--r--src/libcore/rt/work_queue.rs4
-rw-r--r--src/libcore/sys.rs29
-rw-r--r--src/libcore/task/local_data_priv.rs2
-rw-r--r--src/libcore/task/mod.rs50
-rw-r--r--src/libcore/task/spawn.rs2
-rw-r--r--src/libcore/unstable/lang.rs31
-rw-r--r--src/libcore/unstable/mod.rs1
-rw-r--r--src/libstd/uv_ll.rs8
-rw-r--r--src/rt/rust_builtin.cpp14
-rw-r--r--src/rt/rust_env.cpp16
-rw-r--r--src/rt/rust_exchange_alloc.cpp16
-rw-r--r--src/rt/rust_log.cpp4
-rw-r--r--src/rt/rust_stack.cpp11
-rw-r--r--src/rt/rust_uv.cpp2
-rw-r--r--src/rt/rustrt.def.in9
-rw-r--r--src/test/run-pass/core-rt-smoke.rs18
50 files changed, 3003 insertions, 1156 deletions
diff --git a/src/libcore/core.rc b/src/libcore/core.rc
index 96b5e1b781d..eb94e9ca028 100644
--- a/src/libcore/core.rc
+++ b/src/libcore/core.rc
@@ -205,8 +205,11 @@ mod unicode;
 #[path = "num/cmath.rs"]
 mod cmath;
 mod stackwalk;
+
+// XXX: This shouldn't be pub, and it should be reexported under 'unstable'
+// but name resolution doesn't work without it being pub.
 #[path = "rt/mod.rs"]
-mod rt;
+pub mod rt;
 
 // A curious inner-module that's not exported that contains the binding
 // 'core' so that macro-expanded references to core::error and such
diff --git a/src/libcore/logging.rs b/src/libcore/logging.rs
index cea827298af..b192333999a 100644
--- a/src/libcore/logging.rs
+++ b/src/libcore/logging.rs
@@ -10,17 +10,16 @@
 
 //! Logging
 
-pub mod rustrt {
-    use libc;
-
-    pub extern {
-        unsafe fn rust_log_console_on();
-        unsafe fn rust_log_console_off();
-        unsafe fn rust_log_str(level: u32,
-                               string: *libc::c_char,
-                               size: libc::size_t);
-    }
-}
+use option::*;
+use either::*;
+use rt;
+use rt::logging::{Logger, StdErrLogger};
+use io;
+use libc;
+use repr;
+use vec;
+use cast;
+use str;
 
 /// Turns on logging to stdout globally
 pub fn console_on() {
@@ -55,8 +54,46 @@ pub fn log_type<T>(level: u32, object: &T) {
     let bytes = do io::with_bytes_writer |writer| {
         repr::write_repr(writer, object);
     };
+
+    match rt::context() {
+        rt::OldTaskContext => {
+            unsafe {
+                let len = bytes.len() as libc::size_t;
+                rustrt::rust_log_str(level, cast::transmute(vec::raw::to_ptr(bytes)), len);
+            }
+        }
+        _ => {
+            // XXX: Bad allocation
+            let msg = str::from_bytes(bytes);
+            newsched_log_str(msg);
+        }
+    }
+}
+
+fn newsched_log_str(msg: ~str) {
     unsafe {
-        let len = bytes.len() as libc::size_t;
-        rustrt::rust_log_str(level, transmute(vec::raw::to_ptr(bytes)), len);
+        match rt::local_services::unsafe_try_borrow_local_services() {
+            Some(local) => {
+                // Use the available logger
+                (*local).logger.log(Left(msg));
+            }
+            None => {
+                // There is no logger anywhere, just write to stderr
+                let mut logger = StdErrLogger;
+                logger.log(Left(msg));
+            }
+        }
+    }
+}
+
+pub mod rustrt {
+    use libc;
+
+    pub extern {
+        unsafe fn rust_log_console_on();
+        unsafe fn rust_log_console_off();
+        unsafe fn rust_log_str(level: u32,
+                               string: *libc::c_char,
+                               size: libc::size_t);
     }
 }
diff --git a/src/libcore/macros.rs b/src/libcore/macros.rs
index b19a753b715..fda48b6ffb7 100644
--- a/src/libcore/macros.rs
+++ b/src/libcore/macros.rs
@@ -30,10 +30,24 @@ macro_rules! rtdebug (
     ($( $arg:expr),+) => ( $(let _ = $arg)*; )
 )
 
+macro_rules! rtassert (
+    ( $arg:expr ) => ( {
+        if !$arg {
+            abort!("assertion failed: %s", stringify!($arg));
+        }
+    } )
+)
+
 macro_rules! abort(
     ($( $msg:expr),+) => ( {
         rtdebug!($($msg),+);
 
-        unsafe { ::libc::abort(); }
+        do_abort();
+
+        // NB: This is in a fn to avoid putting the `unsafe` block in a macro,
+        // which causes spurious 'unnecessary unsafe block' warnings.
+        fn do_abort() -> ! {
+            unsafe { ::libc::abort(); }
+        }
     } )
 )
diff --git a/src/libcore/os.rs b/src/libcore/os.rs
index b97b32330de..72e62c80392 100644
--- a/src/libcore/os.rs
+++ b/src/libcore/os.rs
@@ -147,23 +147,25 @@ pub mod win32 {
 
 /*
 Accessing environment variables is not generally threadsafe.
-This uses a per-runtime lock to serialize access.
-FIXME #4726: It would probably be appropriate to make this a real global
+Serialize access through a global lock.
 */
 fn with_env_lock<T>(f: &fn() -> T) -> T {
-    use unstable::global::global_data_clone_create;
-    use unstable::sync::{Exclusive, exclusive};
-
-    struct SharedValue(());
-    type ValueMutex = Exclusive<SharedValue>;
-    fn key(_: ValueMutex) { }
+    use unstable::finally::Finally;
 
     unsafe {
-        let lock: ValueMutex = global_data_clone_create(key, || {
-            ~exclusive(SharedValue(()))
-        });
+        return do (|| {
+            rust_take_env_lock();
+            f()
+        }).finally {
+            rust_drop_env_lock();
+        };
+    }
 
-        lock.with_imm(|_| f() )
+    extern {
+        #[fast_ffi]
+        fn rust_take_env_lock();
+        #[fast_ffi]
+        fn rust_drop_env_lock();
     }
 }
 
@@ -749,7 +751,7 @@ pub fn list_dir(p: &Path) -> ~[~str] {
             use os::win32::{
                 as_utf16_p
             };
-            use unstable::exchange_alloc::{malloc_raw, free_raw};
+            use rt::global_heap::{malloc_raw, free_raw};
             #[nolink]
             extern {
                 unsafe fn rust_list_dir_wfd_size() -> libc::size_t;
diff --git a/src/libcore/rt/context.rs b/src/libcore/rt/context.rs
index 9c1e566f218..2add314fd11 100644
--- a/src/libcore/rt/context.rs
+++ b/src/libcore/rt/context.rs
@@ -84,6 +84,7 @@ pub impl Context {
 }
 
 extern {
+    #[rust_stack]
     fn swap_registers(out_regs: *mut Registers, in_regs: *Registers);
 }
 
@@ -111,9 +112,9 @@ fn initialize_call_frame(regs: &mut Registers, fptr: *c_void, arg: *c_void, sp:
     let sp = align_down(sp);
     let sp = mut_offset(sp, -4);
 
-    unsafe { *sp = arg as uint; }
+    unsafe { *sp = arg as uint };
     let sp = mut_offset(sp, -1);
-    unsafe { *sp = 0; } // The final return address
+    unsafe { *sp = 0 }; // The final return address
 
     regs.esp = sp as u32;
     regs.eip = fptr as u32;
@@ -195,7 +196,7 @@ fn initialize_call_frame(regs: &mut Registers, fptr: *c_void, arg: *c_void, sp:
 
 fn align_down(sp: *mut uint) -> *mut uint {
     unsafe {
-        let sp = transmute::<*mut uint, uint>(sp);
+        let sp: uint = transmute(sp);
         let sp = sp & !(16 - 1);
         transmute::<uint, *mut uint>(sp)
     }
diff --git a/src/libcore/unstable/exchange_alloc.rs b/src/libcore/rt/global_heap.rs
index 3b35c2fb804..ce7ff87b445 100644
--- a/src/libcore/unstable/exchange_alloc.rs
+++ b/src/libcore/rt/global_heap.rs
@@ -9,7 +9,7 @@
 // except according to those terms.
 
 use sys::{TypeDesc, size_of};
-use libc::{c_void, size_t};
+use libc::{c_void, size_t, uintptr_t};
 use c_malloc = libc::malloc;
 use c_free = libc::free;
 use managed::raw::{BoxHeaderRepr, BoxRepr};
@@ -34,7 +34,7 @@ pub unsafe fn malloc(td: *TypeDesc, size: uint) -> *c_void {
     box.header.prev = null();
     box.header.next = null();
 
-    let exchange_count = &mut *rust_get_exchange_count_ptr();
+    let exchange_count = &mut *exchange_count_ptr();
     atomic_xadd(exchange_count, 1);
 
     return transmute(box);
@@ -52,7 +52,7 @@ pub unsafe fn malloc_raw(size: uint) -> *c_void {
 }
 
 pub unsafe fn free(ptr: *c_void) {
-    let exchange_count = &mut *rust_get_exchange_count_ptr();
+    let exchange_count = &mut *exchange_count_ptr();
     atomic_xsub(exchange_count, 1);
 
     assert!(ptr.is_not_null());
@@ -77,7 +77,11 @@ fn align_to(size: uint, align: uint) -> uint {
     (size + align - 1) & !(align - 1)
 }
 
+fn exchange_count_ptr() -> *mut int {
+    // XXX: Need mutable globals
+    unsafe { transmute(&rust_exchange_count) }
+}
+
 extern {
-    #[rust_stack]
-    fn rust_get_exchange_count_ptr() -> *mut int;
+    static rust_exchange_count: uintptr_t;
 }
diff --git a/src/libcore/rt/io/extensions.rs b/src/libcore/rt/io/extensions.rs
index bb025b0ccb6..a3804d2d6ef 100644
--- a/src/libcore/rt/io/extensions.rs
+++ b/src/libcore/rt/io/extensions.rs
@@ -13,22 +13,53 @@
 // XXX: Not sure how this should be structured
 // XXX: Iteration should probably be considered separately
 
+use uint;
+use int;
+use vec;
+use rt::io::{Reader, Writer};
+use rt::io::{read_error, standard_error, EndOfFile, DEFAULT_BUF_SIZE};
+use option::{Option, Some, None};
+use unstable::finally::Finally;
+use util;
+use cast;
+use io::{u64_to_le_bytes, u64_to_be_bytes};
+
 pub trait ReaderUtil {
 
-    /// Reads `len` bytes and gives you back a new vector
+    /// Reads a single byte. Returns `None` on EOF.
+    ///
+    /// # Failure
+    ///
+    /// Raises the same conditions as the `read` method. Returns
+    /// `None` if the condition is handled.
+    fn read_byte(&mut self) -> Option<u8>;
+
+    /// Reads `len` bytes and appends them to a vector.
+    ///
+    /// May push fewer than the requested number of bytes on error
+    /// or EOF. Returns true on success, false on EOF or error.
     ///
     /// # Failure
     ///
-    /// Raises the `io_error` condition on error. Returns an empty
-    /// vector if the condition is handled.
+    /// Raises the same conditions as `read`. Additionally raises `read_error`
+    /// on EOF. If `read_error` is handled then `push_bytes` may push less
+    /// than the requested number of bytes.
+    fn push_bytes(&mut self, buf: &mut ~[u8], len: uint);
+
+    /// Reads `len` bytes and gives you back a new vector of length `len`
+    ///
+    /// # Failure
+    ///
+    /// Raises the same conditions as `read`. Additionally raises `read_error`
+    /// on EOF. If `read_error` is handled then the returned vector may
+    /// contain less than the requested number of bytes.
     fn read_bytes(&mut self, len: uint) -> ~[u8];
 
     /// Reads all remaining bytes from the stream.
     ///
     /// # Failure
     ///
-    /// Raises the `io_error` condition on error. Returns an empty
-    /// vector if the condition is handled.
+    /// Raises the same conditions as the `read` method.
     fn read_to_end(&mut self) -> ~[u8];
 
 }
@@ -37,433 +68,836 @@ pub trait ReaderByteConversions {
     /// Reads `n` little-endian unsigned integer bytes.
     ///
     /// `n` must be between 1 and 8, inclusive.
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error. Returns `0` if
-    /// the condition is handled.
     fn read_le_uint_n(&mut self, nbytes: uint) -> u64;
 
     /// Reads `n` little-endian signed integer bytes.
     ///
     /// `n` must be between 1 and 8, inclusive.
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error. Returns `0` if
-    /// the condition is handled.
     fn read_le_int_n(&mut self, nbytes: uint) -> i64;
 
     /// Reads `n` big-endian unsigned integer bytes.
     ///
     /// `n` must be between 1 and 8, inclusive.
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error. Returns `0` if
-    /// the condition is handled.
     fn read_be_uint_n(&mut self, nbytes: uint) -> u64;
 
     /// Reads `n` big-endian signed integer bytes.
     ///
     /// `n` must be between 1 and 8, inclusive.
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error. Returns `0` if
-    /// the condition is handled.
     fn read_be_int_n(&mut self, nbytes: uint) -> i64;
 
     /// Reads a little-endian unsigned integer.
     ///
     /// The number of bytes returned is system-dependant.
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error. Returns `0` if
-    /// the condition is handled.
     fn read_le_uint(&mut self) -> uint;
 
     /// Reads a little-endian integer.
     ///
     /// The number of bytes returned is system-dependant.
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error. Returns `0` if
-    /// the condition is handled.
     fn read_le_int(&mut self) -> int;
 
     /// Reads a big-endian unsigned integer.
     ///
     /// The number of bytes returned is system-dependant.
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error. Returns `0` if
-    /// the condition is handled.
     fn read_be_uint(&mut self) -> uint;
 
     /// Reads a big-endian integer.
     ///
     /// The number of bytes returned is system-dependant.
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error. Returns `0` if
-    /// the condition is handled.
     fn read_be_int(&mut self) -> int;
 
     /// Reads a big-endian `u64`.
     ///
     /// `u64`s are 8 bytes long.
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error. Returns `0` if
-    /// the condition is handled.
     fn read_be_u64(&mut self) -> u64;
 
     /// Reads a big-endian `u32`.
     ///
     /// `u32`s are 4 bytes long.
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error. Returns `0` if
-    /// the condition is handled.
     fn read_be_u32(&mut self) -> u32;
 
     /// Reads a big-endian `u16`.
     ///
     /// `u16`s are 2 bytes long.
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error. Returns `0` if
-    /// the condition is handled.
     fn read_be_u16(&mut self) -> u16;
 
     /// Reads a big-endian `i64`.
     ///
     /// `i64`s are 8 bytes long.
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error. Returns `0` if
-    /// the condition is handled.
     fn read_be_i64(&mut self) -> i64;
 
     /// Reads a big-endian `i32`.
     ///
     /// `i32`s are 4 bytes long.
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error. Returns `0` if
-    /// the condition is handled.
     fn read_be_i32(&mut self) -> i32;
 
     /// Reads a big-endian `i16`.
     ///
     /// `i16`s are 2 bytes long.
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error. Returns `0` if
-    /// the condition is handled.
     fn read_be_i16(&mut self) -> i16;
 
     /// Reads a big-endian `f64`.
     ///
     /// `f64`s are 8 byte, IEEE754 double-precision floating point numbers.
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error. Returns `0` if
-    /// the condition is handled.
     fn read_be_f64(&mut self) -> f64;
 
     /// Reads a big-endian `f32`.
     ///
     /// `f32`s are 4 byte, IEEE754 single-precision floating point numbers.
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error. Returns `0` if
-    /// the condition is handled.
     fn read_be_f32(&mut self) -> f32;
 
     /// Reads a little-endian `u64`.
     ///
     /// `u64`s are 8 bytes long.
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error. Returns `0` if
-    /// the condition is handled.
     fn read_le_u64(&mut self) -> u64;
 
     /// Reads a little-endian `u32`.
     ///
     /// `u32`s are 4 bytes long.
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error. Returns `0` if
-    /// the condition is handled.
     fn read_le_u32(&mut self) -> u32;
 
     /// Reads a little-endian `u16`.
     ///
     /// `u16`s are 2 bytes long.
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error. Returns `0` if
-    /// the condition is handled.
     fn read_le_u16(&mut self) -> u16;
 
     /// Reads a little-endian `i64`.
     ///
     /// `i64`s are 8 bytes long.
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error. Returns `0` if
-    /// the condition is handled.
     fn read_le_i64(&mut self) -> i64;
 
     /// Reads a little-endian `i32`.
     ///
     /// `i32`s are 4 bytes long.
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error. Returns `0` if
-    /// the condition is handled.
     fn read_le_i32(&mut self) -> i32;
 
     /// Reads a little-endian `i16`.
     ///
     /// `i16`s are 2 bytes long.
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error. Returns `0` if
-    /// the condition is handled.
     fn read_le_i16(&mut self) -> i16;
 
     /// Reads a little-endian `f64`.
     ///
     /// `f64`s are 8 byte, IEEE754 double-precision floating point numbers.
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error. Returns `0` if
-    /// the condition is handled.
     fn read_le_f64(&mut self) -> f64;
 
     /// Reads a little-endian `f32`.
     ///
     /// `f32`s are 4 byte, IEEE754 single-precision floating point numbers.
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error. Returns `0` if
-    /// the condition is handled.
     fn read_le_f32(&mut self) -> f32;
 
     /// Read a u8.
     ///
     /// `u8`s are 1 byte.
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error. Returns `0` if
-    /// the condition is handled.
     fn read_u8(&mut self) -> u8;
 
     /// Read an i8.
     ///
     /// `i8`s are 1 byte.
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error. Returns `0` if
-    /// the condition is handled.
     fn read_i8(&mut self) -> i8;
 
 }
 
 pub trait WriterByteConversions {
     /// Write the result of passing n through `int::to_str_bytes`.
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error.
     fn write_int(&mut self, n: int);
 
     /// Write the result of passing n through `uint::to_str_bytes`.
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error.
     fn write_uint(&mut self, n: uint);
 
     /// Write a little-endian uint (number of bytes depends on system).
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error.
     fn write_le_uint(&mut self, n: uint);
 
     /// Write a little-endian int (number of bytes depends on system).
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error.
     fn write_le_int(&mut self, n: int);
 
     /// Write a big-endian uint (number of bytes depends on system).
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error.
     fn write_be_uint(&mut self, n: uint);
 
     /// Write a big-endian int (number of bytes depends on system).
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error.
     fn write_be_int(&mut self, n: int);
 
     /// Write a big-endian u64 (8 bytes).
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error.
-    fn write_be_u64(&mut self, n: u64);
+    fn write_be_u64_(&mut self, n: u64);
 
     /// Write a big-endian u32 (4 bytes).
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error.
     fn write_be_u32(&mut self, n: u32);
 
     /// Write a big-endian u16 (2 bytes).
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error.
     fn write_be_u16(&mut self, n: u16);
 
     /// Write a big-endian i64 (8 bytes).
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error.
     fn write_be_i64(&mut self, n: i64);
 
     /// Write a big-endian i32 (4 bytes).
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error.
     fn write_be_i32(&mut self, n: i32);
 
     /// Write a big-endian i16 (2 bytes).
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error.
     fn write_be_i16(&mut self, n: i16);
 
     /// Write a big-endian IEEE754 double-precision floating-point (8 bytes).
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error.
     fn write_be_f64(&mut self, f: f64);
 
     /// Write a big-endian IEEE754 single-precision floating-point (4 bytes).
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error.
     fn write_be_f32(&mut self, f: f32);
 
     /// Write a little-endian u64 (8 bytes).
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error.
-    fn write_le_u64(&mut self, n: u64);
+    fn write_le_u64_(&mut self, n: u64);
 
     /// Write a little-endian u32 (4 bytes).
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error.
     fn write_le_u32(&mut self, n: u32);
 
     /// Write a little-endian u16 (2 bytes).
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error.
     fn write_le_u16(&mut self, n: u16);
 
     /// Write a little-endian i64 (8 bytes).
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error.
     fn write_le_i64(&mut self, n: i64);
 
     /// Write a little-endian i32 (4 bytes).
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error.
     fn write_le_i32(&mut self, n: i32);
 
     /// Write a little-endian i16 (2 bytes).
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error.
     fn write_le_i16(&mut self, n: i16);
 
     /// Write a little-endian IEEE754 double-precision floating-point
     /// (8 bytes).
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error.
     fn write_le_f64(&mut self, f: f64);
 
     /// Write a litten-endian IEEE754 single-precision floating-point
     /// (4 bytes).
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error.
     fn write_le_f32(&mut self, f: f32);
 
     /// Write a u8 (1 byte).
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error.
     fn write_u8(&mut self, n: u8);
 
     /// Write a i8 (1 byte).
-    ///
-    /// # Failure
-    ///
-    /// Raises the `io_error` condition on error.
     fn write_i8(&mut self, n: i8);
 }
+
+impl<T: Reader> ReaderUtil for T {
+    fn read_byte(&mut self) -> Option<u8> {
+        let mut buf = [0];
+        match self.read(buf) {
+            Some(0) => {
+                debug!("read 0 bytes. trying again");
+                self.read_byte()
+            }
+            Some(1) => Some(buf[0]),
+            Some(_) => util::unreachable(),
+            None => None
+        }
+    }
+
+    fn push_bytes(&mut self, buf: &mut ~[u8], len: uint) {
+        unsafe {
+            let start_len = buf.len();
+            let mut total_read = 0;
+
+            vec::reserve_at_least(buf, start_len + len);
+            vec::raw::set_len(buf, start_len + len);
+
+            do (|| {
+                while total_read < len {
+                    let slice = vec::mut_slice(*buf, start_len + total_read, buf.len());
+                    match self.read(slice) {
+                        Some(nread) => {
+                            total_read += nread;
+                        }
+                        None => {
+                            read_error::cond.raise(standard_error(EndOfFile));
+                            break;
+                        }
+                    }
+                }
+            }).finally {
+                vec::raw::set_len(buf, start_len + total_read);
+            }
+        }
+    }
+
+    fn read_bytes(&mut self, len: uint) -> ~[u8] {
+        let mut buf = vec::with_capacity(len);
+        self.push_bytes(&mut buf, len);
+        return buf;
+    }
+
+    fn read_to_end(&mut self) -> ~[u8] {
+        let mut buf = vec::with_capacity(DEFAULT_BUF_SIZE);
+        let mut keep_reading = true;
+        do read_error::cond.trap(|e| {
+            if e.kind == EndOfFile {
+                keep_reading = false;
+            } else {
+                read_error::cond.raise(e)
+            }
+        }).in {
+            while keep_reading {
+                self.push_bytes(&mut buf, DEFAULT_BUF_SIZE)
+            }
+        }
+        return buf;
+    }
+}
+
+impl<T: Reader> ReaderByteConversions for T {
+    fn read_le_uint_n(&mut self, nbytes: uint) -> u64 {
+        assert!(nbytes > 0 && nbytes <= 8);
+
+        let mut val = 0u64, pos = 0, i = nbytes;
+        while i > 0 {
+            val += (self.read_u8() as u64) << pos;
+            pos += 8;
+            i -= 1;
+        }
+        val
+    }
+
+    fn read_le_int_n(&mut self, nbytes: uint) -> i64 {
+        extend_sign(self.read_le_uint_n(nbytes), nbytes)
+    }
+
+    fn read_be_uint_n(&mut self, nbytes: uint) -> u64 {
+        assert!(nbytes > 0 && nbytes <= 8);
+
+        let mut val = 0u64, i = nbytes;
+        while i > 0 {
+            i -= 1;
+            val += (self.read_u8() as u64) << i * 8;
+        }
+        val
+    }
+
+    fn read_be_int_n(&mut self, nbytes: uint) -> i64 {
+        extend_sign(self.read_be_uint_n(nbytes), nbytes)
+    }
+
+    fn read_le_uint(&mut self) -> uint {
+        self.read_le_uint_n(uint::bytes) as uint
+    }
+
+    fn read_le_int(&mut self) -> int {
+        self.read_le_int_n(int::bytes) as int
+    }
+
+    fn read_be_uint(&mut self) -> uint {
+        self.read_be_uint_n(uint::bytes) as uint
+    }
+
+    fn read_be_int(&mut self) -> int {
+        self.read_be_int_n(int::bytes) as int
+    }
+
+    fn read_be_u64(&mut self) -> u64 {
+        self.read_be_uint_n(8) as u64
+    }
+
+    fn read_be_u32(&mut self) -> u32 {
+        self.read_be_uint_n(4) as u32
+    }
+
+    fn read_be_u16(&mut self) -> u16 {
+        self.read_be_uint_n(2) as u16
+    }
+
+    fn read_be_i64(&mut self) -> i64 {
+        self.read_be_int_n(8) as i64
+    }
+
+    fn read_be_i32(&mut self) -> i32 {
+        self.read_be_int_n(4) as i32
+    }
+
+    fn read_be_i16(&mut self) -> i16 {
+        self.read_be_int_n(2) as i16
+    }
+
+    fn read_be_f64(&mut self) -> f64 {
+        unsafe {
+            cast::transmute::<u64, f64>(self.read_be_u64())
+        }
+    }
+
+    fn read_be_f32(&mut self) -> f32 {
+        unsafe {
+            cast::transmute::<u32, f32>(self.read_be_u32())
+        }
+    }
+
+    fn read_le_u64(&mut self) -> u64 {
+        self.read_le_uint_n(8) as u64
+    }
+
+    fn read_le_u32(&mut self) -> u32 {
+        self.read_le_uint_n(4) as u32
+    }
+
+    fn read_le_u16(&mut self) -> u16 {
+        self.read_le_uint_n(2) as u16
+    }
+
+    fn read_le_i64(&mut self) -> i64 {
+        self.read_le_int_n(8) as i64
+    }
+
+    fn read_le_i32(&mut self) -> i32 {
+        self.read_le_int_n(4) as i32
+    }
+
+    fn read_le_i16(&mut self) -> i16 {
+        self.read_le_int_n(2) as i16
+    }
+
+    fn read_le_f64(&mut self) -> f64 {
+        unsafe {
+            cast::transmute::<u64, f64>(self.read_le_u64())
+        }
+    }
+
+    fn read_le_f32(&mut self) -> f32 {
+        unsafe {
+            cast::transmute::<u32, f32>(self.read_le_u32())
+        }
+    }
+
+    fn read_u8(&mut self) -> u8 {
+        match self.read_byte() {
+            Some(b) => b as u8,
+            None => 0
+        }
+    }
+
+    fn read_i8(&mut self) -> i8 {
+        match self.read_byte() {
+            Some(b) => b as i8,
+            None => 0
+        }
+    }
+
+}
+
+impl<T: Writer> WriterByteConversions for T {
+    fn write_int(&mut self, n: int) {
+        int::to_str_bytes(n, 10u, |bytes| self.write(bytes))
+    }
+
+    fn write_uint(&mut self, n: uint) {
+        uint::to_str_bytes(n, 10u, |bytes| self.write(bytes))
+    }
+
+    fn write_le_uint(&mut self, n: uint) {
+        u64_to_le_bytes(n as u64, uint::bytes, |v| self.write(v))
+    }
+
+    fn write_le_int(&mut self, n: int) {
+        u64_to_le_bytes(n as u64, int::bytes, |v| self.write(v))
+    }
+
+    fn write_be_uint(&mut self, n: uint) {
+        u64_to_be_bytes(n as u64, uint::bytes, |v| self.write(v))
+    }
+
+    fn write_be_int(&mut self, n: int) {
+        u64_to_be_bytes(n as u64, int::bytes, |v| self.write(v))
+    }
+
+    fn write_be_u64_(&mut self, n: u64) {
+        u64_to_be_bytes(n, 8u, |v| self.write(v))
+    }
+
+    fn write_be_u32(&mut self, n: u32) {
+        u64_to_be_bytes(n as u64, 4u, |v| self.write(v))
+    }
+
+    fn write_be_u16(&mut self, n: u16) {
+        u64_to_be_bytes(n as u64, 2u, |v| self.write(v))
+    }
+
+    fn write_be_i64(&mut self, n: i64) {
+        u64_to_be_bytes(n as u64, 8u, |v| self.write(v))
+    }
+
+    fn write_be_i32(&mut self, n: i32) {
+        u64_to_be_bytes(n as u64, 4u, |v| self.write(v))
+    }
+
+    fn write_be_i16(&mut self, n: i16) {
+        u64_to_be_bytes(n as u64, 2u, |v| self.write(v))
+    }
+
+    fn write_be_f64(&mut self, f: f64) {
+        unsafe {
+            self.write_be_u64_(cast::transmute(f))
+        }
+    }
+
+    fn write_be_f32(&mut self, f: f32) {
+        unsafe {
+            self.write_be_u32(cast::transmute(f))
+        }
+    }
+
+    fn write_le_u64_(&mut self, n: u64) {
+        u64_to_le_bytes(n, 8u, |v| self.write(v))
+    }
+
+    fn write_le_u32(&mut self, n: u32) {
+        u64_to_le_bytes(n as u64, 4u, |v| self.write(v))
+    }
+
+    fn write_le_u16(&mut self, n: u16) {
+        u64_to_le_bytes(n as u64, 2u, |v| self.write(v))
+    }
+
+    fn write_le_i64(&mut self, n: i64) {
+        u64_to_le_bytes(n as u64, 8u, |v| self.write(v))
+    }
+
+    fn write_le_i32(&mut self, n: i32) {
+        u64_to_le_bytes(n as u64, 4u, |v| self.write(v))
+    }
+
+    fn write_le_i16(&mut self, n: i16) {
+        u64_to_le_bytes(n as u64, 2u, |v| self.write(v))
+    }
+
+    fn write_le_f64(&mut self, f: f64) {
+        unsafe {
+            self.write_le_u64_(cast::transmute(f))
+        }
+    }
+
+    fn write_le_f32(&mut self, f: f32) {
+        unsafe {
+            self.write_le_u32(cast::transmute(f))
+        }
+    }
+
+    fn write_u8(&mut self, n: u8) {
+        self.write([n]) 
+    }
+
+    fn write_i8(&mut self, n: i8) {
+        self.write([n as u8]) 
+    }
+}
+
+fn extend_sign(val: u64, nbytes: uint) -> i64 {
+    let shift = (8 - nbytes) * 8;
+    (val << shift) as i64 >> shift
+}
+
+#[cfg(test)]
+mod test {
+    use super::{ReaderUtil, ReaderByteConversions, WriterByteConversions};
+    use u64;
+    use i32;
+    use option::{Some, None};
+    use cell::Cell;
+    use rt::io::mem::{MemReader, MemWriter};
+    use rt::io::mock::MockReader;
+    use rt::io::{read_error, placeholder_error};
+
+    #[test]
+    fn read_byte() {
+        let mut reader = MemReader::new(~[10]);
+        let byte = reader.read_byte();
+        assert!(byte == Some(10));
+    }
+
+    #[test]
+    fn read_byte_0_bytes() {
+        let mut reader = MockReader::new();
+        let count = Cell(0);
+        reader.read = |buf| {
+            do count.with_mut_ref |count| {
+                if *count == 0 {
+                    *count = 1;
+                    Some(0)
+                } else {
+                    buf[0] = 10;
+                    Some(1)
+                }
+            }
+        };
+        let byte = reader.read_byte();
+        assert!(byte == Some(10));
+    }
+
+    #[test]
+    fn read_byte_eof() {
+        let mut reader = MockReader::new();
+        reader.read = |_| None;
+        let byte = reader.read_byte();
+        assert!(byte == None);
+    }
+
+    #[test]
+    fn read_byte_error() {
+        let mut reader = MockReader::new();
+        reader.read = |_| {
+            read_error::cond.raise(placeholder_error());
+            None
+        };
+        do read_error::cond.trap(|_| {
+        }).in {
+            let byte = reader.read_byte();
+            assert!(byte == None);
+        }
+    }
+
+    #[test]
+    fn read_bytes() {
+        let mut reader = MemReader::new(~[10, 11, 12, 13]);
+        let bytes = reader.read_bytes(4);
+        assert!(bytes == ~[10, 11, 12, 13]);
+    }
+
+    #[test]
+    fn read_bytes_partial() {
+        let mut reader = MockReader::new();
+        let count = Cell(0);
+        reader.read = |buf| {
+            do count.with_mut_ref |count| {
+                if *count == 0 {
+                    *count = 1;
+                    buf[0] = 10;
+                    buf[1] = 11;
+                    Some(2)
+                } else {
+                    buf[0] = 12;
+                    buf[1] = 13;
+                    Some(2)
+                }
+            }
+        };
+        let bytes = reader.read_bytes(4);
+        assert!(bytes == ~[10, 11, 12, 13]);
+    }
+
+    #[test]
+    fn read_bytes_eof() {
+        let mut reader = MemReader::new(~[10, 11]);
+        do read_error::cond.trap(|_| {
+        }).in {
+            assert!(reader.read_bytes(4) == ~[10, 11]);
+        }
+    }
+
+    #[test]
+    fn push_bytes() {
+        let mut reader = MemReader::new(~[10, 11, 12, 13]);
+        let mut buf = ~[8, 9];
+        reader.push_bytes(&mut buf, 4);
+        assert!(buf == ~[8, 9, 10, 11, 12, 13]);
+    }
+
+    #[test]
+    fn push_bytes_partial() {
+        let mut reader = MockReader::new();
+        let count = Cell(0);
+        reader.read = |buf| {
+            do count.with_mut_ref |count| {
+                if *count == 0 {
+                    *count = 1;
+                    buf[0] = 10;
+                    buf[1] = 11;
+                    Some(2)
+                } else {
+                    buf[0] = 12;
+                    buf[1] = 13;
+                    Some(2)
+                }
+            }
+        };
+        let mut buf = ~[8, 9];
+        reader.push_bytes(&mut buf, 4);
+        assert!(buf == ~[8, 9, 10, 11, 12, 13]);
+    }
+
+    #[test]
+    fn push_bytes_eof() {
+        let mut reader = MemReader::new(~[10, 11]);
+        let mut buf = ~[8, 9];
+        do read_error::cond.trap(|_| {
+        }).in {
+            reader.push_bytes(&mut buf, 4);
+            assert!(buf == ~[8, 9, 10, 11]);
+        }
+    }
+
+    #[test]
+    fn push_bytes_error() {
+        let mut reader = MockReader::new();
+        let count = Cell(0);
+        reader.read = |buf| {
+            do count.with_mut_ref |count| {
+                if *count == 0 {
+                    *count = 1;
+                    buf[0] = 10;
+                    Some(1)
+                } else {
+                    read_error::cond.raise(placeholder_error());
+                    None
+                }
+            }
+        };
+        let mut buf = ~[8, 9];
+        do read_error::cond.trap(|_| { } ).in {
+            reader.push_bytes(&mut buf, 4);
+        }
+        assert!(buf == ~[8, 9, 10]);
+    }
+
+    #[test]
+    #[should_fail]
+    #[ignore(cfg(windows))]
+    fn push_bytes_fail_reset_len() {
+        use unstable::finally::Finally;
+
+        // push_bytes unsafely sets the vector length. This is testing that
+        // upon failure the length is reset correctly.
+        let mut reader = MockReader::new();
+        let count = Cell(0);
+        reader.read = |buf| {
+            do count.with_mut_ref |count| {
+                if *count == 0 {
+                    *count = 1;
+                    buf[0] = 10;
+                    Some(1)
+                } else {
+                    read_error::cond.raise(placeholder_error());
+                    None
+                }
+            }
+        };
+        let buf = @mut ~[8, 9];
+        do (|| {
+            reader.push_bytes(&mut *buf, 4);
+        }).finally {
+            // NB: Using rtassert here to trigger abort on failure since this is a should_fail test
+            rtassert!(*buf == ~[8, 9, 10]);
+        }
+    }
+
+    #[test]
+    fn read_to_end() {
+        let mut reader = MockReader::new();
+        let count = Cell(0);
+        reader.read = |buf| {
+            do count.with_mut_ref |count| {
+                if *count == 0 {
+                    *count = 1;
+                    buf[0] = 10;
+                    buf[1] = 11;
+                    Some(2)
+                } else if *count == 1 {
+                    *count = 2;
+                    buf[0] = 12;
+                    buf[1] = 13;
+                    Some(2)
+                } else {
+                    None
+                }
+            }
+        };
+        let buf = reader.read_to_end();
+        assert!(buf == ~[10, 11, 12, 13]);
+    }
+
+    #[test]
+    #[should_fail]
+    #[ignore(cfg(windows))]
+    fn read_to_end_error() {
+        let mut reader = MockReader::new();
+        let count = Cell(0);
+        reader.read = |buf| {
+            do count.with_mut_ref |count| {
+                if *count == 0 {
+                    *count = 1;
+                    buf[0] = 10;
+                    buf[1] = 11;
+                    Some(2)
+                } else {
+                    read_error::cond.raise(placeholder_error());
+                    None
+                }
+            }
+        };
+        let buf = reader.read_to_end();
+        assert!(buf == ~[10, 11]);
+    }
+
+    // XXX: Some problem with resolve here
+    /*#[test]
+    fn test_read_write_le() {
+        let uints = [0, 1, 2, 42, 10_123, 100_123_456, u64::max_value];
+
+        let mut writer = MemWriter::new();
+        for uints.each |i| {
+            writer.write_le_u64(*i);
+        }
+
+        let mut reader = MemReader::new(writer.inner());
+        for uints.each |i| {
+            assert!(reader.read_le_u64() == *i);
+        }
+    }
+
+    #[test]
+    fn test_read_write_be() {
+        let uints = [0, 1, 2, 42, 10_123, 100_123_456, u64::max_value];
+
+        let mut writer = MemWriter::new();
+        for uints.each |i| {
+            writer.write_be_u64(*i);
+        }
+
+        let mut reader = MemReader::new(writer.inner());
+        for uints.each |i| {
+            assert!(reader.read_be_u64() == *i);
+        }
+    }
+
+    #[test]
+    fn test_read_be_int_n() {
+        let ints = [i32::min_value, -123456, -42, -5, 0, 1, i32::max_value];
+
+        let mut writer = MemWriter::new();
+        for ints.each |i| {
+            writer.write_be_i32(*i);
+        }
+
+        let mut reader = MemReader::new(writer.inner());
+        for ints.each |i| {
+            // this tests that the sign extension is working
+            // (comparing the values as i32 would not test this)
+            assert!(reader.read_be_int_n(4) == *i as i64);
+        }
+    }
+
+    #[test]
+    fn test_read_f32() {
+        //big-endian floating-point 8.1250
+        let buf = ~[0x41, 0x02, 0x00, 0x00];
+
+        let mut writer = MemWriter::new();
+        writer.write(buf);
+
+        let mut reader = MemReader::new(writer.inner());
+        let f = reader.read_be_f32();
+        assert!(f == 8.1250);
+    }
+
+    #[test]
+    fn test_read_write_f32() {
+        let f:f32 = 8.1250;
+
+        let mut writer = MemWriter::new();
+        writer.write_be_f32(f);
+        writer.write_le_f32(f);
+
+        let mut reader = MemReader::new(writer.inner());
+        assert!(reader.read_be_f32() == 8.1250);
+        assert!(reader.read_le_f32() == 8.1250);
+    }*/
+
+}
diff --git a/src/libcore/rt/io/file.rs b/src/libcore/rt/io/file.rs
index 85dc180452f..1f61cf25fbd 100644
--- a/src/libcore/rt/io/file.rs
+++ b/src/libcore/rt/io/file.rs
@@ -10,7 +10,7 @@
 
 use prelude::*;
 use super::support::PathLike;
-use super::{Reader, Writer, Seek, Close};
+use super::{Reader, Writer, Seek};
 use super::SeekStyle;
 
 /// # XXX
@@ -69,10 +69,6 @@ impl Seek for FileStream {
     fn seek(&mut self, _pos: i64, _style: SeekStyle) { fail!() }
 }
 
-impl Close for FileStream {
-    fn close(&mut self) { fail!() }
-}
-
 #[test]
 #[ignore]
 fn super_simple_smoke_test_lets_go_read_some_files_and_have_a_good_time() {
diff --git a/src/libcore/rt/io/mock.rs b/src/libcore/rt/io/mock.rs
new file mode 100644
index 00000000000..b580b752bd9
--- /dev/null
+++ b/src/libcore/rt/io/mock.rs
@@ -0,0 +1,50 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use option::{Option, None};
+use rt::io::{Reader, Writer};
+
+pub struct MockReader {
+    read: ~fn(buf: &mut [u8]) -> Option<uint>,
+    eof: ~fn() -> bool
+}
+
+impl MockReader {
+    pub fn new() -> MockReader {
+        MockReader {
+            read: |_| None,
+            eof: || false
+        }
+    }
+}
+
+impl Reader for MockReader {
+    fn read(&mut self, buf: &mut [u8]) -> Option<uint> { (self.read)(buf) }
+    fn eof(&mut self) -> bool { (self.eof)() }
+}
+
+pub struct MockWriter {
+    write: ~fn(buf: &[u8]),
+    flush: ~fn()
+}
+
+impl MockWriter {
+    pub fn new() -> MockWriter {
+        MockWriter {
+            write: |_| (),
+            flush: || ()
+        }
+    }
+}
+
+impl Writer for MockWriter {
+    fn write(&mut self, buf: &[u8]) { (self.write)(buf) }
+    fn flush(&mut self) { (self.flush)() }
+}
\ No newline at end of file
diff --git a/src/libcore/rt/io/mod.rs b/src/libcore/rt/io/mod.rs
index fea32bc5b75..802e069a738 100644
--- a/src/libcore/rt/io/mod.rs
+++ b/src/libcore/rt/io/mod.rs
@@ -187,7 +187,7 @@ In particular code written to ignore errors and expect conditions to be unhandle
 will start passing around null or zero objects when wrapped in a condition handler.
 
 * XXX: How should we use condition handlers that return values?
-
+* XXX: Should EOF raise default conditions when EOF is not an error?
 
 # Issues withi/o scheduler affinity, work stealing, task pinning
 
@@ -238,6 +238,7 @@ Out of scope
 * How does I/O relate to the Iterator trait?
 * std::base64 filters
 * Using conditions is a big unknown since we don't have much experience with them
+* Too many uses of OtherIoError
 
 */
 
@@ -252,13 +253,18 @@ pub use self::stdio::println;
 
 pub use self::file::FileStream;
 pub use self::net::ip::IpAddr;
+#[cfg(not(stage0))]
 pub use self::net::tcp::TcpListener;
+#[cfg(not(stage0))]
 pub use self::net::tcp::TcpStream;
 pub use self::net::udp::UdpStream;
 
 // Some extension traits that all Readers and Writers get.
+#[cfg(not(stage0))] // Requires condition! fixes
 pub use self::extensions::ReaderUtil;
+#[cfg(not(stage0))] // Requires condition! fixes
 pub use self::extensions::ReaderByteConversions;
+#[cfg(not(stage0))] // Requires condition! fixes
 pub use self::extensions::WriterByteConversions;
 
 /// Synchronous, non-blocking file I/O.
@@ -266,6 +272,7 @@ pub mod file;
 
 /// Synchronous, non-blocking network I/O.
 pub mod net {
+    #[cfg(not(stage0))]
     pub mod tcp;
     pub mod udp;
     pub mod ip;
@@ -291,6 +298,7 @@ pub mod flate;
 pub mod comm_adapters;
 
 /// Extension traits
+#[cfg(not(stage0))] // Requires condition! fixes
 mod extensions;
 
 /// Non-I/O things needed by the I/O module
@@ -312,6 +320,12 @@ pub mod native {
     }
 }
 
+/// Mock implementations for testing
+mod mock;
+
+/// The default buffer size for various I/O operations
+/// XXX: Not pub
+pub static DEFAULT_BUF_SIZE: uint = 1024 * 64;
 
 /// The type passed to I/O condition handlers to indicate error
 ///
@@ -326,12 +340,16 @@ pub struct IoError {
 
 #[deriving(Eq)]
 pub enum IoErrorKind {
+    PreviousIoError,
+    OtherIoError,
+    EndOfFile,
     FileNotFound,
-    FilePermission,
+    PermissionDenied,
     ConnectionFailed,
     Closed,
-    OtherIoError,
-    PreviousIoError
+    ConnectionRefused,
+    ConnectionReset,
+    BrokenPipe
 }
 
 // XXX: Can't put doc comments on macros
@@ -341,19 +359,36 @@ condition! {
     /*pub*/ io_error: super::IoError -> ();
 }
 
+// XXX: Can't put doc comments on macros
+// Raised by `read` on error
+condition! {
+    // FIXME (#6009): uncomment `pub` after expansion support lands.
+    /*pub*/ read_error: super::IoError -> ();
+}
+
 pub trait Reader {
     /// Read bytes, up to the length of `buf` and place them in `buf`.
-    /// Returns the number of bytes read, or `None` on EOF.
+    /// Returns the number of bytes read. The number of bytes read my
+    /// be less than the number requested, even 0. Returns `None` on EOF.
     ///
     /// # Failure
     ///
-    /// Raises the `io_error` condition on error, then returns `None`.
+    /// Raises the `read_error` condition on error. If the condition
+    /// is handled then no guarantee is made about the number of bytes
+    /// read and the contents of `buf`. If the condition is handled
+    /// returns `None` (XXX see below).
     ///
     /// # XXX
     ///
+    /// * Should raise_default error on eof?
+    /// * If the condition is handled it should still return the bytes read,
+    ///   in which case there's no need to return Option - but then you *have*
+    ///   to install a handler to detect eof.
+    ///
     /// This doesn't take a `len` argument like the old `read`.
     /// Will people often need to slice their vectors to call this
     /// and will that be annoying?
+    /// Is it actually possible for 0 bytes to be read successfully?
     fn read(&mut self, buf: &mut [u8]) -> Option<uint>;
 
     /// Return whether the Reader has reached the end of the stream.
@@ -383,16 +418,7 @@ pub trait Writer {
     fn flush(&mut self);
 }
 
-/// I/O types that may be closed
-///
-/// Any further operations performed on a closed resource will raise
-/// on `io_error`
-pub trait Close {
-    /// Close the I/O resource
-    fn close(&mut self);
-}
-
-pub trait Stream: Reader + Writer + Close { }
+pub trait Stream: Reader + Writer { }
 
 pub enum SeekStyle {
     /// Seek from the beginning of the stream
@@ -466,6 +492,21 @@ pub fn standard_error(kind: IoErrorKind) -> IoError {
                 detail: None
             }
         }
+        EndOfFile => {
+            IoError {
+                kind: EndOfFile,
+                desc: "End of file",
+                detail: None
+            }
+        }
         _ => fail!()
     }
 }
+
+pub fn placeholder_error() -> IoError {
+    IoError {
+        kind: OtherIoError,
+        desc: "Placeholder error. You shouldn't be seeing this",
+        detail: None
+    }
+}
\ No newline at end of file
diff --git a/src/libcore/rt/io/native/file.rs b/src/libcore/rt/io/native/file.rs
index e203df815f2..31c90336a24 100644
--- a/src/libcore/rt/io/native/file.rs
+++ b/src/libcore/rt/io/native/file.rs
@@ -40,10 +40,6 @@ impl Writer for FileDesc {
     fn flush(&mut self) { fail!() }
 }
 
-impl Close for FileDesc {
-    fn close(&mut self) { fail!() }
-}
-
 impl Seek for FileDesc {
     fn tell(&self) -> u64 { fail!() }
 
@@ -72,10 +68,6 @@ impl Writer for CFile {
     fn flush(&mut self) { fail!() }
 }
 
-impl Close for CFile {
-    fn close(&mut self) { fail!() }
-}
-
 impl Seek for CFile {
     fn tell(&self) -> u64 { fail!() }
     fn seek(&mut self, _pos: i64, _style: SeekStyle) { fail!() }
diff --git a/src/libcore/rt/io/net/tcp.rs b/src/libcore/rt/io/net/tcp.rs
index c95b4344fe7..a833e92fc10 100644
--- a/src/libcore/rt/io/net/tcp.rs
+++ b/src/libcore/rt/io/net/tcp.rs
@@ -8,67 +8,349 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use prelude::*;
-use super::super::*;
-use super::ip::IpAddr;
+use option::{Option, Some, None};
+use result::{Ok, Err};
+use rt::sched::local_sched::unsafe_borrow_io;
+use rt::io::net::ip::IpAddr;
+use rt::io::{Reader, Writer, Listener};
+use rt::io::{io_error, read_error, EndOfFile};
+use rt::rtio::{IoFactory,
+               RtioTcpListener, RtioTcpListenerObject,
+               RtioTcpStream, RtioTcpStreamObject};
 
-pub struct TcpStream;
+pub struct TcpStream {
+    rtstream: ~RtioTcpStreamObject
+}
 
 impl TcpStream {
-    pub fn connect(_addr: IpAddr) -> Option<TcpStream> {
-        fail!()
+    fn new(s: ~RtioTcpStreamObject) -> TcpStream {
+        TcpStream {
+            rtstream: s
+        }
+    }
+
+    pub fn connect(addr: IpAddr) -> Option<TcpStream> {
+        let stream = unsafe {
+            rtdebug!("borrowing io to connect");
+            let io = unsafe_borrow_io();
+            rtdebug!("about to connect");
+            (*io).tcp_connect(addr)
+        };
+
+        match stream {
+            Ok(s) => {
+                Some(TcpStream::new(s))
+            }
+            Err(ioerr) => {
+                rtdebug!("failed to connect: %?", ioerr);
+                io_error::cond.raise(ioerr);
+                return None;
+            }
+        }
     }
 }
 
 impl Reader for TcpStream {
-    fn read(&mut self, _buf: &mut [u8]) -> Option<uint> { fail!() }
+    fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
+        let bytes_read = self.rtstream.read(buf);
+        match bytes_read {
+            Ok(read) => Some(read),
+            Err(ioerr) => {
+                // EOF is indicated by returning None
+                if ioerr.kind != EndOfFile {
+                    read_error::cond.raise(ioerr);
+                }
+                return None;
+            }
+        }
+    }
 
     fn eof(&mut self) -> bool { fail!() }
 }
 
 impl Writer for TcpStream {
-    fn write(&mut self, _buf: &[u8]) { fail!() }
+    fn write(&mut self, buf: &[u8]) {
+        let res = self.rtstream.write(buf);
+        match res {
+            Ok(_) => (),
+            Err(ioerr) => {
+                io_error::cond.raise(ioerr);
+            }
+        }
+    }
 
     fn flush(&mut self) { fail!() }
 }
 
-impl Close for TcpStream {
-    fn close(&mut self) { fail!() }
+pub struct TcpListener {
+    rtlistener: ~RtioTcpListenerObject,
 }
 
-pub struct TcpListener;
-
 impl TcpListener {
-    pub fn bind(_addr: IpAddr) -> Option<TcpListener> {
-        fail!()
+    pub fn bind(addr: IpAddr) -> Option<TcpListener> {
+        let listener = unsafe { (*unsafe_borrow_io()).tcp_bind(addr) };
+        match listener {
+            Ok(l) => {
+                Some(TcpListener {
+                    rtlistener: l
+                })
+            }
+            Err(ioerr) => {
+                io_error::cond.raise(ioerr);
+                return None;
+            }
+        }
     }
 }
 
 impl Listener<TcpStream> for TcpListener {
-    fn accept(&mut self) -> Option<TcpStream> { fail!() }
+    fn accept(&mut self) -> Option<TcpStream> {
+        let rtstream = self.rtlistener.accept();
+        match rtstream {
+            Ok(s) => {
+                Some(TcpStream::new(s))
+            }
+            Err(ioerr) => {
+                io_error::cond.raise(ioerr);
+                return None;
+            }
+        }
+    }
 }
 
 #[cfg(test)]
 mod test {
+    use super::*;
+    use int;
+    use cell::Cell;
+    use rt::test::*;
+    use rt::io::net::ip::Ipv4;
+    use rt::io::*;
 
     #[test] #[ignore]
+    fn bind_error() {
+        do run_in_newsched_task {
+            let mut called = false;
+            do io_error::cond.trap(|e| {
+                assert!(e.kind == PermissionDenied);
+                called = true;
+            }).in {
+                let addr = Ipv4(0, 0, 0, 0, 1);
+                let listener = TcpListener::bind(addr);
+                assert!(listener.is_none());
+            }
+            assert!(called);
+        }
+    }
+
+    #[test]
+    fn connect_error() {
+        do run_in_newsched_task {
+            let mut called = false;
+            do io_error::cond.trap(|e| {
+                assert!(e.kind == ConnectionRefused);
+                called = true;
+            }).in {
+                let addr = Ipv4(0, 0, 0, 0, 1);
+                let stream = TcpStream::connect(addr);
+                assert!(stream.is_none());
+            }
+            assert!(called);
+        }
+    }
+
+    #[test]
     fn smoke_test() {
-        /*do run_in_newsched_task {
+        do run_in_newsched_task {
             let addr = next_test_ip4();
 
-            do spawn_immediately {
-                let listener = TcpListener::bind(addr);
-                do listener.accept() {
+            do spawntask_immediately {
+                let mut listener = TcpListener::bind(addr);
+                let mut stream = listener.accept();
+                let mut buf = [0];
+                stream.read(buf);
+                assert!(buf[0] == 99);
+            }
+
+            do spawntask_immediately {
+                let mut stream = TcpStream::connect(addr);
+                stream.write([99]);
+            }
+        }
+    }
+
+    #[test]
+    fn read_eof() {
+        do run_in_newsched_task {
+            let addr = next_test_ip4();
+
+            do spawntask_immediately {
+                let mut listener = TcpListener::bind(addr);
+                let mut stream = listener.accept();
+                let mut buf = [0];
+                let nread = stream.read(buf);
+                assert!(nread.is_none());
+            }
+
+            do spawntask_immediately {
+                let _stream = TcpStream::connect(addr);
+                // Close
+            }
+        }
+    }
+
+    #[test]
+    fn read_eof_twice() {
+        do run_in_newsched_task {
+            let addr = next_test_ip4();
+
+            do spawntask_immediately {
+                let mut listener = TcpListener::bind(addr);
+                let mut stream = listener.accept();
+                let mut buf = [0];
+                let nread = stream.read(buf);
+                assert!(nread.is_none());
+                let nread = stream.read(buf);
+                assert!(nread.is_none());
+            }
+
+            do spawntask_immediately {
+                let _stream = TcpStream::connect(addr);
+                // Close
+            }
+        }
+    }
+
+    #[test]
+    fn write_close() {
+        do run_in_newsched_task {
+            let addr = next_test_ip4();
+
+            do spawntask_immediately {
+                let mut listener = TcpListener::bind(addr);
+                let mut stream = listener.accept();
+                let buf = [0];
+                loop {
+                    let mut stop = false;
+                    do io_error::cond.trap(|e| {
+                        // NB: ECONNRESET on linux, EPIPE on mac
+                        assert!(e.kind == ConnectionReset || e.kind == BrokenPipe);
+                        stop = true;
+                    }).in {
+                        stream.write(buf);
+                    }
+                    if stop { break }
+                }
+            }
+
+            do spawntask_immediately {
+                let _stream = TcpStream::connect(addr);
+                // Close
+            }
+        }
+    }
+
+    #[test]
+    fn multiple_connect_serial() {
+        do run_in_newsched_task {
+            let addr = next_test_ip4();
+            let max = 10;
+
+            do spawntask_immediately {
+                let mut listener = TcpListener::bind(addr);
+                for max.times {
+                    let mut stream = listener.accept();
                     let mut buf = [0];
-                    listener.read(buf);
+                    stream.read(buf);
                     assert!(buf[0] == 99);
                 }
             }
 
-            do spawn_immediately {
-                let stream = TcpStream::connect(addr);
-                stream.write([99]);
+            do spawntask_immediately {
+                for max.times {
+                    let mut stream = TcpStream::connect(addr);
+                    stream.write([99]);
+                }
             }
-        }*/
+        }
     }
+
+    #[test]
+    fn multiple_connect_interleaved_greedy_schedule() {
+        do run_in_newsched_task {
+            let addr = next_test_ip4();
+            static MAX: int = 10;
+
+            do spawntask_immediately {
+                let mut listener = TcpListener::bind(addr);
+                for int::range(0, MAX) |i| {
+                    let stream = Cell(listener.accept());
+                    rtdebug!("accepted");
+                    // Start another task to handle the connection
+                    do spawntask_immediately {
+                        let mut stream = stream.take();
+                        let mut buf = [0];
+                        stream.read(buf);
+                        assert!(buf[0] == i as u8);
+                        rtdebug!("read");
+                    }
+                }
+            }
+
+            connect(0, addr);
+
+            fn connect(i: int, addr: IpAddr) {
+                if i == MAX { return }
+
+                do spawntask_immediately {
+                    rtdebug!("connecting");
+                    let mut stream = TcpStream::connect(addr);
+                    // Connect again before writing
+                    connect(i + 1, addr);
+                    rtdebug!("writing");
+                    stream.write([i as u8]);
+                }
+            }
+        }
+    }
+
+    #[test]
+    fn multiple_connect_interleaved_lazy_schedule() {
+        do run_in_newsched_task {
+            let addr = next_test_ip4();
+            static MAX: int = 10;
+
+            do spawntask_immediately {
+                let mut listener = TcpListener::bind(addr);
+                for int::range(0, MAX) |_| {
+                    let stream = Cell(listener.accept());
+                    rtdebug!("accepted");
+                    // Start another task to handle the connection
+                    do spawntask_later {
+                        let mut stream = stream.take();
+                        let mut buf = [0];
+                        stream.read(buf);
+                        assert!(buf[0] == 99);
+                        rtdebug!("read");
+                    }
+                }
+            }
+
+            connect(0, addr);
+
+            fn connect(i: int, addr: IpAddr) {
+                if i == MAX { return }
+
+                do spawntask_later {
+                    rtdebug!("connecting");
+                    let mut stream = TcpStream::connect(addr);
+                    // Connect again before writing
+                    connect(i + 1, addr);
+                    rtdebug!("writing");
+                    stream.write([99]);
+                }
+            }
+        }
+    }
+
 }
diff --git a/src/libcore/rt/io/net/udp.rs b/src/libcore/rt/io/net/udp.rs
index 1f1254a7029..bb5457e334d 100644
--- a/src/libcore/rt/io/net/udp.rs
+++ b/src/libcore/rt/io/net/udp.rs
@@ -32,10 +32,6 @@ impl Writer for UdpStream {
     fn flush(&mut self) { fail!() }
 }
 
-impl Close for UdpStream {
-    fn close(&mut self) { fail!() }
-}
-
 pub struct UdpListener;
 
 impl UdpListener {
diff --git a/src/libcore/rt/io/net/unix.rs b/src/libcore/rt/io/net/unix.rs
index f449a857467..b85b7dd059d 100644
--- a/src/libcore/rt/io/net/unix.rs
+++ b/src/libcore/rt/io/net/unix.rs
@@ -32,10 +32,6 @@ impl Writer for UnixStream {
     fn flush(&mut self) { fail!() }
 }
 
-impl Close for UnixStream {
-    fn close(&mut self) { fail!() }
-}
-
 pub struct UnixListener;
 
 impl UnixListener {
diff --git a/src/libcore/rt/io/option.rs b/src/libcore/rt/io/option.rs
index 95f8711cb5b..6ae747f8b4b 100644
--- a/src/libcore/rt/io/option.rs
+++ b/src/libcore/rt/io/option.rs
@@ -18,7 +18,7 @@
 
 use option::*;
 use super::{Reader, Writer, Listener};
-use super::{standard_error, PreviousIoError, io_error, IoError};
+use super::{standard_error, PreviousIoError, io_error, read_error, IoError};
 
 fn prev_io_error() -> IoError {
     standard_error(PreviousIoError)
@@ -45,7 +45,7 @@ impl<R: Reader> Reader for Option<R> {
         match *self {
             Some(ref mut reader) => reader.read(buf),
             None => {
-                io_error::cond.raise(prev_io_error());
+                read_error::cond.raise(prev_io_error());
                 None
             }
         }
@@ -79,7 +79,7 @@ mod test {
     use option::*;
     use super::super::mem::*;
     use rt::test::*;
-    use super::super::{PreviousIoError, io_error};
+    use super::super::{PreviousIoError, io_error, read_error};
 
     #[test]
     fn test_option_writer() {
@@ -133,7 +133,7 @@ mod test {
         let mut buf = [];
 
         let mut called = false;
-        do io_error::cond.trap(|err| {
+        do read_error::cond.trap(|err| {
             assert!(err.kind == PreviousIoError);
             called = true;
         }).in {
diff --git a/src/libcore/rt/io/stdio.rs b/src/libcore/rt/io/stdio.rs
index 26950986f7a..247fe954408 100644
--- a/src/libcore/rt/io/stdio.rs
+++ b/src/libcore/rt/io/stdio.rs
@@ -9,7 +9,7 @@
 // except according to those terms.
 
 use prelude::*;
-use super::{Reader, Writer, Close};
+use super::{Reader, Writer};
 
 pub fn stdin() -> StdReader { fail!() }
 
@@ -39,10 +39,6 @@ impl Reader for StdReader {
     fn eof(&mut self) -> bool { fail!() }
 }
 
-impl Close for StdReader {
-    fn close(&mut self) { fail!() }
-}
-
 pub struct StdWriter;
 
 impl StdWriter {
@@ -55,6 +51,3 @@ impl Writer for StdWriter {
     fn flush(&mut self) { fail!() }
 }
 
-impl Close for StdWriter {
-    fn close(&mut self) { fail!() }
-}
diff --git a/src/libcore/rt/sched/local_sched.rs b/src/libcore/rt/local_sched.rs
index a7e02f30e01..895354d2218 100644
--- a/src/libcore/rt/sched/local_sched.rs
+++ b/src/libcore/rt/local_sched.rs
@@ -13,18 +13,21 @@
 use prelude::*;
 use ptr::mut_null;
 use libc::c_void;
-use cast::transmute;
+use cast;
+use cell::Cell;
 
-use super::Scheduler;
-use super::super::rtio::IoFactoryObject;
-use tls = super::super::thread_local_storage;
-#[cfg(test)] use super::super::uvio::UvEventLoop;
+use rt::sched::Scheduler;
+use rt::rtio::{EventLoop, IoFactoryObject};
+use tls = rt::thread_local_storage;
+use unstable::finally::Finally;
+
+#[cfg(test)] use rt::uv::uvio::UvEventLoop;
 
 /// Give the Scheduler to thread-local storage
 pub fn put(sched: ~Scheduler) {
     unsafe {
         let key = tls_key();
-        let void_sched: *mut c_void = transmute::<~Scheduler, *mut c_void>(sched);
+        let void_sched: *mut c_void = cast::transmute(sched);
         tls::set(key, void_sched);
     }
 }
@@ -34,8 +37,8 @@ pub fn take() -> ~Scheduler {
     unsafe {
         let key = tls_key();
         let void_sched: *mut c_void = tls::get(key);
-        assert!(void_sched.is_not_null());
-        let sched = transmute::<*mut c_void, ~Scheduler>(void_sched);
+        rtassert!(void_sched.is_not_null());
+        let sched: ~Scheduler = cast::transmute(void_sched);
         tls::set(key, mut_null());
         return sched;
     }
@@ -55,8 +58,18 @@ pub fn exists() -> bool {
 /// While the scheduler is borrowed it is not available in TLS.
 pub fn borrow(f: &fn(&mut Scheduler)) {
     let mut sched = take();
-    f(sched);
-    put(sched);
+
+    // XXX: Need a different abstraction from 'finally' here to avoid unsafety
+    unsafe {
+        let unsafe_sched = cast::transmute_mut_region(&mut *sched);
+        let sched = Cell(sched);
+
+        do (|| {
+            f(unsafe_sched);
+        }).finally {
+            put(sched.take());
+        }
+    }
 }
 
 /// Borrow a mutable reference to the thread-local Scheduler
@@ -65,33 +78,35 @@ pub fn borrow(f: &fn(&mut Scheduler)) {
 ///
 /// Because this leaves the Scheduler in thread-local storage it is possible
 /// For the Scheduler pointer to be aliased
-pub unsafe fn unsafe_borrow() -> &mut Scheduler {
+pub unsafe fn unsafe_borrow() -> *mut Scheduler {
     let key = tls_key();
     let mut void_sched: *mut c_void = tls::get(key);
-    assert!(void_sched.is_not_null());
+    rtassert!(void_sched.is_not_null());
     {
-        let void_sched_ptr = &mut void_sched;
-        let sched: &mut ~Scheduler = {
-            transmute::<&mut *mut c_void, &mut ~Scheduler>(void_sched_ptr)
-        };
-        let sched: &mut Scheduler = &mut **sched;
+        let sched: *mut *mut c_void = &mut void_sched;
+        let sched: *mut ~Scheduler = sched as *mut ~Scheduler;
+        let sched: *mut Scheduler = &mut **sched;
         return sched;
     }
 }
 
-pub unsafe fn unsafe_borrow_io() -> &mut IoFactoryObject {
+pub unsafe fn unsafe_borrow_io() -> *mut IoFactoryObject {
     let sched = unsafe_borrow();
-    return sched.event_loop.io().unwrap();
+    let io: *mut IoFactoryObject = (*sched).event_loop.io().unwrap();
+    return io;
 }
 
 fn tls_key() -> tls::Key {
-    maybe_tls_key().get()
+    match maybe_tls_key() {
+        Some(key) => key,
+        None => abort!("runtime tls key not initialized")
+    }
 }
 
 fn maybe_tls_key() -> Option<tls::Key> {
     unsafe {
-        let key: *mut c_void = rust_get_sched_tls_key();
-        let key: &mut tls::Key = transmute(key);
+        let key: *mut c_void = rust_get_rt_tls_key();
+        let key: &mut tls::Key = cast::transmute(key);
         let key = *key;
         // Check that the key has been initialized.
 
@@ -105,7 +120,7 @@ fn maybe_tls_key() -> Option<tls::Key> {
         // another thread. I think this is fine since the only action
         // they could take if it was initialized would be to check the
         // thread-local value and see that it's not set.
-        if key != 0 {
+        if key != -1 {
             return Some(key);
         } else {
             return None;
@@ -114,7 +129,8 @@ fn maybe_tls_key() -> Option<tls::Key> {
 }
 
 extern {
-    fn rust_get_sched_tls_key() -> *mut c_void;
+    #[fast_ffi]
+    fn rust_get_rt_tls_key() -> *mut c_void;
 }
 
 #[test]
diff --git a/src/libcore/rt/local_services.rs b/src/libcore/rt/local_services.rs
index bc945707e62..98bfc2fa168 100644
--- a/src/libcore/rt/local_services.rs
+++ b/src/libcore/rt/local_services.rs
@@ -23,19 +23,19 @@ use libc::{c_void, uintptr_t};
 use cast::transmute;
 use super::sched::local_sched;
 use super::local_heap::LocalHeap;
+use rt::logging::StdErrLogger;
 
 pub struct LocalServices {
     heap: LocalHeap,
     gc: GarbageCollector,
     storage: LocalStorage,
-    logger: Logger,
+    logger: StdErrLogger,
     unwinder: Option<Unwinder>,
     destroyed: bool
 }
 
 pub struct GarbageCollector;
 pub struct LocalStorage(*c_void, Option<~fn(*c_void)>);
-pub struct Logger;
 
 pub struct Unwinder {
     unwinding: bool,
@@ -47,7 +47,7 @@ impl LocalServices {
             heap: LocalHeap::new(),
             gc: GarbageCollector,
             storage: LocalStorage(ptr::null(), None),
-            logger: Logger,
+            logger: StdErrLogger,
             unwinder: Some(Unwinder { unwinding: false }),
             destroyed: false
         }
@@ -58,7 +58,7 @@ impl LocalServices {
             heap: LocalHeap::new(),
             gc: GarbageCollector,
             storage: LocalStorage(ptr::null(), None),
-            logger: Logger,
+            logger: StdErrLogger,
             unwinder: None,
             destroyed: false
         }
@@ -169,19 +169,27 @@ pub fn borrow_local_services(f: &fn(&mut LocalServices)) {
     }
 }
 
-pub unsafe fn unsafe_borrow_local_services() -> &mut LocalServices {
-    use cast::transmute_mut_region;
-
-    match local_sched::unsafe_borrow().current_task {
+pub unsafe fn unsafe_borrow_local_services() -> *mut LocalServices {
+    match (*local_sched::unsafe_borrow()).current_task {
         Some(~ref mut task) => {
-            transmute_mut_region(&mut task.local_services)
+            let s: *mut LocalServices = &mut task.local_services;
+            return s;
         }
         None => {
-            fail!("no local services for schedulers yet")
+            // Don't fail. Infinite recursion
+            abort!("no local services for schedulers yet")
         }
     }
 }
 
+pub unsafe fn unsafe_try_borrow_local_services() -> Option<*mut LocalServices> {
+    if local_sched::exists() {
+        Some(unsafe_borrow_local_services())
+    } else {
+        None
+    }
+}
+
 #[cfg(test)]
 mod test {
     use rt::test::*;
@@ -229,4 +237,12 @@ mod test {
             let _ = r.next();
         }
     }
+
+    #[test]
+    fn logging() {
+        do run_in_newsched_task() {
+            info!("here i am. logging in a newsched task");
+        }
+    }
 }
+
diff --git a/src/libcore/rt/logging.rs b/src/libcore/rt/logging.rs
new file mode 100644
index 00000000000..a0d05397689
--- /dev/null
+++ b/src/libcore/rt/logging.rs
@@ -0,0 +1,68 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use either::*;
+
+pub trait Logger {
+    fn log(&mut self, msg: Either<~str, &'static str>);
+}
+
+pub struct StdErrLogger;
+
+impl Logger for StdErrLogger {
+    fn log(&mut self, msg: Either<~str, &'static str>) {
+        use io::{Writer, WriterUtil};
+
+        let s: &str = match msg {
+            Left(ref s) => {
+                let s: &str = *s;
+                s
+            }
+            Right(ref s) => {
+                let s: &str = *s;
+                s
+            }
+        };
+        let dbg = ::libc::STDERR_FILENO as ::io::fd_t;
+        dbg.write_str(s);
+        dbg.write_str("\n");
+        dbg.flush();
+    }
+}
+
+/// Configure logging by traversing the crate map and setting the
+/// per-module global logging flags based on the logging spec
+pub fn init(crate_map: *u8) {
+    use os;
+    use str;
+    use ptr;
+    use option::{Some, None};
+    use libc::c_char;
+
+    let log_spec = os::getenv("RUST_LOG");
+    match log_spec {
+        Some(spec) => {
+            do str::as_c_str(spec) |s| {
+                unsafe {
+                    rust_update_log_settings(crate_map, s);
+                }
+            }
+        }
+        None => {
+            unsafe {
+                rust_update_log_settings(crate_map, ptr::null());
+            }
+        }
+    }
+
+    extern {
+        fn rust_update_log_settings(crate_map: *u8, settings: *c_char);
+    }
+}
diff --git a/src/libcore/rt/mod.rs b/src/libcore/rt/mod.rs
index fbbc8274340..7a772ff0f3b 100644
--- a/src/libcore/rt/mod.rs
+++ b/src/libcore/rt/mod.rs
@@ -8,70 +8,161 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-/*! The Rust runtime, including the scheduler and I/O interface */
+/*! The Rust Runtime, including the task scheduler and I/O
+
+The `rt` module provides the private runtime infrastructure necessary
+to support core language features like the exchange and local heap,
+the garbage collector, logging, local data and unwinding. It also
+implements the default task scheduler and task model. Initialization
+routines are provided for setting up runtime resources in common
+configurations, including that used by `rustc` when generating
+executables.
+
+It is intended that the features provided by `rt` can be factored in a
+way such that the core library can be built with different 'profiles'
+for different use cases, e.g. excluding the task scheduler. A number
+of runtime features though are critical to the functioning of the
+language and an implementation must be provided regardless of the
+execution environment.
+
+Of foremost importance is the global exchange heap, in the module
+`global_heap`. Very little practical Rust code can be written without
+access to the global heap. Unlike most of `rt` the global heap is
+truly a global resource and generally operates independently of the
+rest of the runtime.
+
+All other runtime features are 'local', either thread-local or
+task-local.  Those critical to the functioning of the language are
+defined in the module `local_services`. Local services are those which
+are expected to be available to Rust code generally but rely on
+thread- or task-local state. These currently include the local heap,
+the garbage collector, local storage, logging and the stack unwinder.
+Local services are primarily implemented for tasks, but may also
+be implemented for use outside of tasks.
+
+The relationship between `rt` and the rest of the core library is
+not entirely clear yet and some modules will be moving into or
+out of `rt` as development proceeds.
+
+Several modules in `core` are clients of `rt`:
+
+* `core::task` - The user-facing interface to the Rust task model.
+* `core::task::local_data` - The interface to local data.
+* `core::gc` - The garbage collector.
+* `core::unstable::lang` - Miscellaneous lang items, some of which rely on `core::rt`.
+* `core::condition` - Uses local data.
+* `core::cleanup` - Local heap destruction.
+* `core::io` - In the future `core::io` will use an `rt` implementation.
+* `core::logging`
+* `core::pipes`
+* `core::comm`
+* `core::stackwalk`
+
+*/
 
 #[doc(hidden)];
 
-use libc::c_char;
 use ptr::Ptr;
 
-#[path = "sched/mod.rs"]
+/// The global (exchange) heap.
+pub mod global_heap;
+
+/// The Scheduler and Coroutine types.
 mod sched;
+
+/// Thread-local access to the current Scheduler.
+pub mod local_sched;
+
+/// Synchronous I/O.
+#[path = "io/mod.rs"]
+pub mod io;
+
+/// Thread-local implementations of language-critical runtime features like @.
+pub mod local_services;
+
+/// The EventLoop and internal synchronous I/O interface.
 mod rtio;
-pub mod uvll;
-mod uvio;
+
+/// libuv and default rtio implementation.
 #[path = "uv/mod.rs"]
-mod uv;
-#[path = "io/mod.rs"]
-mod io;
+pub mod uv;
+
 // FIXME #5248: The import in `sched` doesn't resolve unless this is pub!
+/// Bindings to pthread/windows thread-local storage.
 pub mod thread_local_storage;
+
+/// A parallel work-stealing dequeue.
 mod work_queue;
+
+/// Stack segments and caching.
 mod stack;
+
+/// CPU context swapping.
 mod context;
+
+/// Bindings to system threading libraries.
 mod thread;
+
+/// The runtime configuration, read from environment variables
 pub mod env;
-pub mod local_services;
+
+/// The local, managed heap
 mod local_heap;
 
+/// The Logger trait and implementations
+pub mod logging;
+
 /// Tools for testing the runtime
-#[cfg(test)]
 pub mod test;
 
-pub fn start(main: *u8, _argc: int, _argv: **c_char, _crate_map: *u8) -> int {
-
-    use self::sched::{Scheduler, Task};
-    use self::uvio::UvEventLoop;
-    use sys::Closure;
-    use ptr;
-    use cast;
+/// Reference counting
+pub mod rc;
+
+/// A simple single-threaded channel type for passing buffered data between
+/// scheduler and task context
+pub mod tube;
+
+/// Set up a default runtime configuration, given compiler-supplied arguments.
+///
+/// This is invoked by the `start` _language item_ (unstable::lang) to
+/// run a Rust executable.
+///
+/// # Arguments
+///
+/// * `argc` & `argv` - The argument vector. On Unix this information is used
+///   by os::args.
+/// * `crate_map` - Runtime information about the executing crate, mostly for logging
+///
+/// # Return value
+///
+/// The return value is used as the process return code. 0 on success, 101 on error.
+pub fn start(_argc: int, _argv: **u8, crate_map: *u8, main: ~fn()) -> int {
+
+    use self::sched::{Scheduler, Coroutine};
+    use self::uv::uvio::UvEventLoop;
+
+    init(crate_map);
 
     let loop_ = ~UvEventLoop::new();
     let mut sched = ~Scheduler::new(loop_);
+    let main_task = ~Coroutine::new(&mut sched.stack_pool, main);
 
-    let main_task = ~do Task::new(&mut sched.stack_pool) {
-
-        unsafe {
-            // `main` is an `fn() -> ()` that doesn't take an environment
-            // XXX: Could also call this as an `extern "Rust" fn` once they work
-            let main = Closure {
-                code: main as *(),
-                env: ptr::null(),
-            };
-            let mainfn: &fn() = cast::transmute(main);
-
-            mainfn();
-        }
-    };
-
-    sched.task_queue.push_back(main_task);
+    sched.enqueue_task(main_task);
     sched.run();
 
     return 0;
 }
 
+/// One-time runtime initialization. Currently all this does is set up logging
+/// based on the RUST_LOG environment variable.
+pub fn init(crate_map: *u8) {
+    logging::init(crate_map);
+}
+
 /// Possible contexts in which Rust code may be executing.
 /// Different runtime services are available depending on context.
+/// Mostly used for determining if we're using the new scheduler
+/// or the old scheduler.
 #[deriving(Eq)]
 pub enum RuntimeContext {
     // Only the exchange heap is available
@@ -84,6 +175,7 @@ pub enum RuntimeContext {
     OldTaskContext
 }
 
+/// Determine the current RuntimeContext
 pub fn context() -> RuntimeContext {
 
     use task::rt::rust_task;
@@ -118,26 +210,26 @@ pub fn context() -> RuntimeContext {
 #[test]
 fn test_context() {
     use unstable::run_in_bare_thread;
-    use self::sched::{local_sched, Task};
-    use self::uvio::UvEventLoop;
+    use self::sched::{local_sched, Coroutine};
+    use rt::uv::uvio::UvEventLoop;
     use cell::Cell;
 
     assert!(context() == OldTaskContext);
     do run_in_bare_thread {
         assert!(context() == GlobalContext);
         let mut sched = ~UvEventLoop::new_scheduler();
-        let task = ~do Task::new(&mut sched.stack_pool) {
+        let task = ~do Coroutine::new(&mut sched.stack_pool) {
             assert!(context() == TaskContext);
             let sched = local_sched::take();
             do sched.deschedule_running_task_and_then() |task| {
                 assert!(context() == SchedulerContext);
                 let task = Cell(task);
                 do local_sched::borrow |sched| {
-                    sched.task_queue.push_back(task.take());
+                    sched.enqueue_task(task.take());
                 }
             }
         };
-        sched.task_queue.push_back(task);
+        sched.enqueue_task(task);
         sched.run();
     }
 }
diff --git a/src/libcore/rt/rc.rs b/src/libcore/rt/rc.rs
new file mode 100644
index 00000000000..1c0c8c14fdf
--- /dev/null
+++ b/src/libcore/rt/rc.rs
@@ -0,0 +1,142 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! An owned, task-local, reference counted type
+//!
+//! # Safety note
+//!
+//! XXX There is currently no type-system mechanism for enforcing that
+//! reference counted types are both allocated on the exchange heap
+//! and also non-sendable
+//!
+//! This doesn't prevent borrowing multiple aliasable mutable pointers
+
+use ops::Drop;
+use clone::Clone;
+use libc::c_void;
+use cast;
+
+pub struct RC<T> {
+    p: *c_void // ~(uint, T)
+}
+
+impl<T> RC<T> {
+    pub fn new(val: T) -> RC<T> {
+        unsafe {
+            let v = ~(1, val);
+            let p: *c_void = cast::transmute(v);
+            RC { p: p }
+        }
+    }
+
+    fn get_mut_state(&mut self) -> *mut (uint, T) {
+        unsafe {
+            let p: &mut ~(uint, T) = cast::transmute(&mut self.p);
+            let p: *mut (uint, T) = &mut **p;
+            return p;
+        }
+    }
+
+    fn get_state(&self) -> *(uint, T) {
+        unsafe {
+            let p: &~(uint, T) = cast::transmute(&self.p);
+            let p: *(uint, T) = &**p;
+            return p;
+        }
+    }
+
+    pub fn unsafe_borrow_mut(&mut self) -> *mut T {
+        unsafe {
+            match *self.get_mut_state() {
+                (_, ref mut p) => {
+                    let p: *mut T = p;
+                    return p;
+                }
+            }
+        }
+    }
+
+    pub fn refcount(&self) -> uint {
+        unsafe {
+            match *self.get_state() {
+                (count, _) => count
+            }
+        }
+    }
+}
+
+#[unsafe_destructor]
+impl<T> Drop for RC<T> {
+    fn finalize(&self) {
+        assert!(self.refcount() > 0);
+
+        unsafe {
+            // XXX: Mutable finalizer
+            let this: &mut RC<T> = cast::transmute_mut(self);
+
+            match *this.get_mut_state() {
+                (ref mut count, _) => {
+                    *count = *count - 1
+                }
+            }
+
+            if this.refcount() == 0 {
+                let _: ~(uint, T) = cast::transmute(this.p);
+            }
+        }
+    }
+}
+
+impl<T> Clone for RC<T> {
+    fn clone(&self) -> RC<T> {
+        unsafe {
+            // XXX: Mutable clone
+            let this: &mut RC<T> = cast::transmute_mut(self);
+
+            match *this.get_mut_state() {
+                (ref mut count, _) => {
+                    *count = *count + 1;
+                }
+            }
+        }
+
+        RC { p: self.p }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::RC;
+
+    #[test]
+    fn smoke_test() {
+        unsafe {
+            let mut v1 = RC::new(100);
+            assert!(*v1.unsafe_borrow_mut() == 100);
+            assert!(v1.refcount() == 1);
+
+            let mut v2 = v1.clone();
+            assert!(*v2.unsafe_borrow_mut() == 100);
+            assert!(v2.refcount() == 2);
+
+            *v2.unsafe_borrow_mut() = 200;
+            assert!(*v2.unsafe_borrow_mut() == 200);
+            assert!(*v1.unsafe_borrow_mut() == 200);
+
+            let v3 = v2.clone();
+            assert!(v3.refcount() == 3);
+            {
+                let _v1 = v1;
+                let _v2 = v2;
+            }
+            assert!(v3.refcount() == 1);
+        }
+    }
+}
diff --git a/src/libcore/rt/rtio.rs b/src/libcore/rt/rtio.rs
index fd64438c61b..4b5eda22ff5 100644
--- a/src/libcore/rt/rtio.rs
+++ b/src/libcore/rt/rtio.rs
@@ -11,32 +11,35 @@
 use option::*;
 use result::*;
 
+use rt::io::IoError;
 use super::io::net::ip::IpAddr;
+use rt::uv::uvio;
 
 // XXX: ~object doesn't work currently so these are some placeholder
 // types to use instead
-pub type EventLoopObject = super::uvio::UvEventLoop;
-pub type IoFactoryObject = super::uvio::UvIoFactory;
-pub type StreamObject = super::uvio::UvStream;
-pub type TcpListenerObject = super::uvio::UvTcpListener;
+pub type EventLoopObject = uvio::UvEventLoop;
+pub type IoFactoryObject = uvio::UvIoFactory;
+pub type RtioTcpStreamObject = uvio::UvTcpStream;
+pub type RtioTcpListenerObject = uvio::UvTcpListener;
 
 pub trait EventLoop {
     fn run(&mut self);
     fn callback(&mut self, ~fn());
+    fn callback_ms(&mut self, ms: u64, ~fn());
     /// The asynchronous I/O services. Not all event loops may provide one
     fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject>;
 }
 
 pub trait IoFactory {
-    fn connect(&mut self, addr: IpAddr) -> Option<~StreamObject>;
-    fn bind(&mut self, addr: IpAddr) -> Option<~TcpListenerObject>;
+    fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError>;
+    fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError>;
 }
 
-pub trait TcpListener {
-    fn listen(&mut self) -> Option<~StreamObject>;
+pub trait RtioTcpListener {
+    fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError>;
 }
 
-pub trait Stream {
-    fn read(&mut self, buf: &mut [u8]) -> Result<uint, ()>;
-    fn write(&mut self, buf: &[u8]) -> Result<(), ()>;
+pub trait RtioTcpStream {
+    fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError>;
+    fn write(&mut self, buf: &[u8]) -> Result<(), IoError>;
 }
diff --git a/src/libcore/rt/sched/mod.rs b/src/libcore/rt/sched.rs
index dda1f27550f..5c1a3410087 100644
--- a/src/libcore/rt/sched/mod.rs
+++ b/src/libcore/rt/sched.rs
@@ -19,19 +19,15 @@ use super::context::Context;
 use super::local_services::LocalServices;
 use cell::Cell;
 
-#[cfg(test)] use super::uvio::UvEventLoop;
-#[cfg(test)] use unstable::run_in_bare_thread;
-#[cfg(test)] use int;
-
 // A more convenient name for external callers, e.g. `local_sched::take()`
 pub mod local_sched;
 
-/// The Scheduler is responsible for coordinating execution of Tasks
+/// The Scheduler is responsible for coordinating execution of Coroutines
 /// on a single thread. When the scheduler is running it is owned by
 /// thread local storage and the running task is owned by the
 /// scheduler.
 pub struct Scheduler {
-    task_queue: WorkQueue<~Task>,
+    priv work_queue: WorkQueue<~Coroutine>,
     stack_pool: StackPool,
     /// The event loop used to drive the scheduler and perform I/O
     event_loop: ~EventLoopObject,
@@ -39,7 +35,7 @@ pub struct Scheduler {
     /// Always valid when a task is executing, otherwise not
     priv saved_context: Context,
     /// The currently executing task
-    current_task: Option<~Task>,
+    current_task: Option<~Coroutine>,
     /// An action performed after a context switch on behalf of the
     /// code running before the context switch
     priv cleanup_job: Option<CleanupJob>
@@ -49,17 +45,17 @@ pub struct Scheduler {
 // complaining
 type UnsafeTaskReceiver = sys::Closure;
 trait ClosureConverter {
-    fn from_fn(&fn(~Task)) -> Self;
-    fn to_fn(self) -> &fn(~Task);
+    fn from_fn(&fn(~Coroutine)) -> Self;
+    fn to_fn(self) -> &fn(~Coroutine);
 }
 impl ClosureConverter for UnsafeTaskReceiver {
-    fn from_fn(f: &fn(~Task)) -> UnsafeTaskReceiver { unsafe { transmute(f) } }
-    fn to_fn(self) -> &fn(~Task) { unsafe { transmute(self) } }
+    fn from_fn(f: &fn(~Coroutine)) -> UnsafeTaskReceiver { unsafe { transmute(f) } }
+    fn to_fn(self) -> &fn(~Coroutine) { unsafe { transmute(self) } }
 }
 
 enum CleanupJob {
     DoNothing,
-    GiveTask(~Task, UnsafeTaskReceiver)
+    GiveTask(~Coroutine, UnsafeTaskReceiver)
 }
 
 pub impl Scheduler {
@@ -76,7 +72,7 @@ pub impl Scheduler {
 
         Scheduler {
             event_loop: event_loop,
-            task_queue: WorkQueue::new(),
+            work_queue: WorkQueue::new(),
             stack_pool: StackPool::new(),
             saved_context: Context::empty(),
             current_task: None,
@@ -91,43 +87,56 @@ pub impl Scheduler {
     fn run(~self) -> ~Scheduler {
         assert!(!self.in_task_context());
 
-        // Give ownership of the scheduler (self) to the thread
-        local_sched::put(self);
+        let mut self_sched = self;
 
         unsafe {
-            let scheduler = local_sched::unsafe_borrow();
-            fn run_scheduler_once() {
-                let scheduler = local_sched::take();
-                if scheduler.resume_task_from_queue() {
-                    // Ok, a task ran. Nice! We'll do it again later
-                    do local_sched::borrow |scheduler| {
-                        scheduler.event_loop.callback(run_scheduler_once);
-                    }
-                }
-            }
+            let event_loop: *mut ~EventLoopObject = {
+                let event_loop: *mut ~EventLoopObject = &mut self_sched.event_loop;
+                event_loop
+            };
+
+            // Give ownership of the scheduler (self) to the thread
+            local_sched::put(self_sched);
 
-            scheduler.event_loop.callback(run_scheduler_once);
-            scheduler.event_loop.run();
+            (*event_loop).run();
         }
 
-        return local_sched::take();
+        let sched = local_sched::take();
+        assert!(sched.work_queue.is_empty());
+        return sched;
+    }
+
+    /// Schedule a task to be executed later.
+    ///
+    /// Pushes the task onto the work stealing queue and tells the event loop
+    /// to run it later. Always use this instead of pushing to the work queue
+    /// directly.
+    fn enqueue_task(&mut self, task: ~Coroutine) {
+        self.work_queue.push_front(task);
+        self.event_loop.callback(resume_task_from_queue);
+
+        fn resume_task_from_queue() {
+            let scheduler = local_sched::take();
+            scheduler.resume_task_from_queue();
+        }
     }
 
     // * Scheduler-context operations
 
-    fn resume_task_from_queue(~self) -> bool {
+    fn resume_task_from_queue(~self) {
         assert!(!self.in_task_context());
 
+        rtdebug!("looking in work queue for task to schedule");
+
         let mut this = self;
-        match this.task_queue.pop_front() {
+        match this.work_queue.pop_front() {
             Some(task) => {
+                rtdebug!("resuming task from work queue");
                 this.resume_task_immediately(task);
-                return true;
             }
             None => {
                 rtdebug!("no tasks in queue");
                 local_sched::put(this);
-                return false;
             }
         }
     }
@@ -151,20 +160,20 @@ pub impl Scheduler {
         abort!("control reached end of task");
     }
 
-    fn schedule_new_task(~self, task: ~Task) {
+    fn schedule_new_task(~self, task: ~Coroutine) {
         assert!(self.in_task_context());
 
         do self.switch_running_tasks_and_then(task) |last_task| {
             let last_task = Cell(last_task);
             do local_sched::borrow |sched| {
-                sched.task_queue.push_front(last_task.take());
+                sched.enqueue_task(last_task.take());
             }
         }
     }
 
     // Core scheduling ops
 
-    fn resume_task_immediately(~self, task: ~Task) {
+    fn resume_task_immediately(~self, task: ~Coroutine) {
         let mut this = self;
         assert!(!this.in_task_context());
 
@@ -179,7 +188,7 @@ pub impl Scheduler {
         // Take pointers to both the task and scheduler's saved registers.
         unsafe {
             let sched = local_sched::unsafe_borrow();
-            let (sched_context, _, next_task_context) = sched.get_contexts();
+            let (sched_context, _, next_task_context) = (*sched).get_contexts();
             let next_task_context = next_task_context.unwrap();
             // Context switch to the task, restoring it's registers
             // and saving the scheduler's
@@ -187,10 +196,10 @@ pub impl Scheduler {
 
             let sched = local_sched::unsafe_borrow();
             // The running task should have passed ownership elsewhere
-            assert!(sched.current_task.is_none());
+            assert!((*sched).current_task.is_none());
 
             // Running tasks may have asked us to do some cleanup
-            sched.run_cleanup_job();
+            (*sched).run_cleanup_job();
         }
     }
 
@@ -202,40 +211,44 @@ pub impl Scheduler {
     /// The closure here is a *stack* closure that lives in the
     /// running task.  It gets transmuted to the scheduler's lifetime
     /// and called while the task is blocked.
-    fn deschedule_running_task_and_then(~self, f: &fn(~Task)) {
+    fn deschedule_running_task_and_then(~self, f: &fn(~Coroutine)) {
         let mut this = self;
         assert!(this.in_task_context());
 
         rtdebug!("blocking task");
 
-        let blocked_task = this.current_task.swap_unwrap();
-        let f_fake_region = unsafe { transmute::<&fn(~Task), &fn(~Task)>(f) };
-        let f_opaque = ClosureConverter::from_fn(f_fake_region);
-        this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
+        unsafe {
+            let blocked_task = this.current_task.swap_unwrap();
+            let f_fake_region = transmute::<&fn(~Coroutine), &fn(~Coroutine)>(f);
+            let f_opaque = ClosureConverter::from_fn(f_fake_region);
+            this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
+        }
 
         local_sched::put(this);
 
-        let sched = unsafe { local_sched::unsafe_borrow() };
-        let (sched_context, last_task_context, _) = sched.get_contexts();
-        let last_task_context = last_task_context.unwrap();
-        Context::swap(last_task_context, sched_context);
+        unsafe {
+            let sched = local_sched::unsafe_borrow();
+            let (sched_context, last_task_context, _) = (*sched).get_contexts();
+            let last_task_context = last_task_context.unwrap();
+            Context::swap(last_task_context, sched_context);
 
-        // We could be executing in a different thread now
-        let sched = unsafe { local_sched::unsafe_borrow() };
-        sched.run_cleanup_job();
+            // We could be executing in a different thread now
+            let sched = local_sched::unsafe_borrow();
+            (*sched).run_cleanup_job();
+        }
     }
 
     /// Switch directly to another task, without going through the scheduler.
     /// You would want to think hard about doing this, e.g. if there are
     /// pending I/O events it would be a bad idea.
-    fn switch_running_tasks_and_then(~self, next_task: ~Task, f: &fn(~Task)) {
+    fn switch_running_tasks_and_then(~self, next_task: ~Coroutine, f: &fn(~Coroutine)) {
         let mut this = self;
         assert!(this.in_task_context());
 
         rtdebug!("switching tasks");
 
         let old_running_task = this.current_task.swap_unwrap();
-        let f_fake_region = unsafe { transmute::<&fn(~Task), &fn(~Task)>(f) };
+        let f_fake_region = unsafe { transmute::<&fn(~Coroutine), &fn(~Coroutine)>(f) };
         let f_opaque = ClosureConverter::from_fn(f_fake_region);
         this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque));
         this.current_task = Some(next_task);
@@ -244,14 +257,14 @@ pub impl Scheduler {
 
         unsafe {
             let sched = local_sched::unsafe_borrow();
-            let (_, last_task_context, next_task_context) = sched.get_contexts();
+            let (_, last_task_context, next_task_context) = (*sched).get_contexts();
             let last_task_context = last_task_context.unwrap();
             let next_task_context = next_task_context.unwrap();
             Context::swap(last_task_context, next_task_context);
 
             // We could be executing in a different thread now
             let sched = local_sched::unsafe_borrow();
-            sched.run_cleanup_job();
+            (*sched).run_cleanup_job();
         }
     }
 
@@ -301,7 +314,7 @@ pub impl Scheduler {
         // because borrowck thinks the three patterns are conflicting
         // borrows
         unsafe {
-            let last_task = transmute::<Option<&Task>, Option<&mut Task>>(last_task);
+            let last_task = transmute::<Option<&Coroutine>, Option<&mut Coroutine>>(last_task);
             let last_task_context = match last_task {
                 Some(t) => Some(&mut t.saved_context), None => None
             };
@@ -316,9 +329,9 @@ pub impl Scheduler {
     }
 }
 
-static TASK_MIN_STACK_SIZE: uint = 10000000; // XXX: Too much stack
+static MIN_STACK_SIZE: uint = 10000000; // XXX: Too much stack
 
-pub struct Task {
+pub struct Coroutine {
     /// The segment of stack on which the task is currently running or,
     /// if the task is blocked, on which the task will resume execution
     priv current_stack_segment: StackSegment,
@@ -329,19 +342,19 @@ pub struct Task {
     local_services: LocalServices
 }
 
-pub impl Task {
-    fn new(stack_pool: &mut StackPool, start: ~fn()) -> Task {
-        Task::with_local(stack_pool, LocalServices::new(), start)
+pub impl Coroutine {
+    fn new(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine {
+        Coroutine::with_local(stack_pool, LocalServices::new(), start)
     }
 
     fn with_local(stack_pool: &mut StackPool,
                   local_services: LocalServices,
-                  start: ~fn()) -> Task {
-        let start = Task::build_start_wrapper(start);
-        let mut stack = stack_pool.take_segment(TASK_MIN_STACK_SIZE);
+                  start: ~fn()) -> Coroutine {
+        let start = Coroutine::build_start_wrapper(start);
+        let mut stack = stack_pool.take_segment(MIN_STACK_SIZE);
         // NB: Context holds a pointer to that ~fn
         let initial_context = Context::new(start, &mut stack);
-        return Task {
+        return Coroutine {
             current_stack_segment: stack,
             saved_context: initial_context,
             local_services: local_services
@@ -356,10 +369,10 @@ pub impl Task {
             // have asked us to do some cleanup.
             unsafe {
                 let sched = local_sched::unsafe_borrow();
-                sched.run_cleanup_job();
+                (*sched).run_cleanup_job();
 
                 let sched = local_sched::unsafe_borrow();
-                let task = sched.current_task.get_mut_ref();
+                let task = (*sched).current_task.get_mut_ref();
                 // FIXME #6141: shouldn't neet to put `start()` in another closure
                 task.local_services.run(||start());
             }
@@ -373,125 +386,160 @@ pub impl Task {
     /// Destroy the task and try to reuse its components
     fn recycle(~self, stack_pool: &mut StackPool) {
         match self {
-            ~Task {current_stack_segment, _} => {
+            ~Coroutine {current_stack_segment, _} => {
                 stack_pool.give_segment(current_stack_segment);
             }
         }
     }
 }
 
-#[test]
-fn test_simple_scheduling() {
-    do run_in_bare_thread {
-        let mut task_ran = false;
-        let task_ran_ptr: *mut bool = &mut task_ran;
-
-        let mut sched = ~UvEventLoop::new_scheduler();
-        let task = ~do Task::new(&mut sched.stack_pool) {
-            unsafe { *task_ran_ptr = true; }
-        };
-        sched.task_queue.push_back(task);
-        sched.run();
-        assert!(task_ran);
+#[cfg(test)]
+mod test {
+    use int;
+    use cell::Cell;
+    use rt::uv::uvio::UvEventLoop;
+    use unstable::run_in_bare_thread;
+    use task::spawn;
+    use rt::test::*;
+    use super::*;
+
+    #[test]
+    fn test_simple_scheduling() {
+        do run_in_bare_thread {
+            let mut task_ran = false;
+            let task_ran_ptr: *mut bool = &mut task_ran;
+
+            let mut sched = ~UvEventLoop::new_scheduler();
+            let task = ~do Coroutine::new(&mut sched.stack_pool) {
+                unsafe { *task_ran_ptr = true; }
+            };
+            sched.enqueue_task(task);
+            sched.run();
+            assert!(task_ran);
+        }
     }
-}
 
-#[test]
-fn test_several_tasks() {
-    do run_in_bare_thread {
-        let total = 10;
-        let mut task_count = 0;
-        let task_count_ptr: *mut int = &mut task_count;
-
-        let mut sched = ~UvEventLoop::new_scheduler();
-        for int::range(0, total) |_| {
-            let task = ~do Task::new(&mut sched.stack_pool) {
-                unsafe { *task_count_ptr = *task_count_ptr + 1; }
-            };
-            sched.task_queue.push_back(task);
+    #[test]
+    fn test_several_tasks() {
+        do run_in_bare_thread {
+            let total = 10;
+            let mut task_count = 0;
+            let task_count_ptr: *mut int = &mut task_count;
+
+            let mut sched = ~UvEventLoop::new_scheduler();
+            for int::range(0, total) |_| {
+                let task = ~do Coroutine::new(&mut sched.stack_pool) {
+                    unsafe { *task_count_ptr = *task_count_ptr + 1; }
+                };
+                sched.enqueue_task(task);
+            }
+            sched.run();
+            assert!(task_count == total);
         }
-        sched.run();
-        assert!(task_count == total);
     }
-}
 
-#[test]
-fn test_swap_tasks_then() {
-    do run_in_bare_thread {
-        let mut count = 0;
-        let count_ptr: *mut int = &mut count;
-
-        let mut sched = ~UvEventLoop::new_scheduler();
-        let task1 = ~do Task::new(&mut sched.stack_pool) {
-            unsafe { *count_ptr = *count_ptr + 1; }
-            let mut sched = local_sched::take();
-            let task2 = ~do Task::new(&mut sched.stack_pool) {
+    #[test]
+    fn test_swap_tasks_then() {
+        do run_in_bare_thread {
+            let mut count = 0;
+            let count_ptr: *mut int = &mut count;
+
+            let mut sched = ~UvEventLoop::new_scheduler();
+            let task1 = ~do Coroutine::new(&mut sched.stack_pool) {
                 unsafe { *count_ptr = *count_ptr + 1; }
-            };
-            // Context switch directly to the new task
-            do sched.switch_running_tasks_and_then(task2) |task1| {
-                let task1 = Cell(task1);
-                do local_sched::borrow |sched| {
-                    sched.task_queue.push_front(task1.take());
+                let mut sched = local_sched::take();
+                let task2 = ~do Coroutine::new(&mut sched.stack_pool) {
+                    unsafe { *count_ptr = *count_ptr + 1; }
+                };
+                // Context switch directly to the new task
+                do sched.switch_running_tasks_and_then(task2) |task1| {
+                    let task1 = Cell(task1);
+                    do local_sched::borrow |sched| {
+                        sched.enqueue_task(task1.take());
+                    }
                 }
-            }
-            unsafe { *count_ptr = *count_ptr + 1; }
-        };
-        sched.task_queue.push_back(task1);
-        sched.run();
-        assert!(count == 3);
+                unsafe { *count_ptr = *count_ptr + 1; }
+            };
+            sched.enqueue_task(task1);
+            sched.run();
+            assert!(count == 3);
+        }
     }
-}
 
-#[bench] #[test] #[ignore(reason = "long test")]
-fn test_run_a_lot_of_tasks_queued() {
-    do run_in_bare_thread {
-        static MAX: int = 1000000;
-        let mut count = 0;
-        let count_ptr: *mut int = &mut count;
+    #[bench] #[test] #[ignore(reason = "long test")]
+    fn test_run_a_lot_of_tasks_queued() {
+        do run_in_bare_thread {
+            static MAX: int = 1000000;
+            let mut count = 0;
+            let count_ptr: *mut int = &mut count;
 
-        let mut sched = ~UvEventLoop::new_scheduler();
+            let mut sched = ~UvEventLoop::new_scheduler();
 
-        let start_task = ~do Task::new(&mut sched.stack_pool) {
-            run_task(count_ptr);
-        };
-        sched.task_queue.push_back(start_task);
-        sched.run();
+            let start_task = ~do Coroutine::new(&mut sched.stack_pool) {
+                run_task(count_ptr);
+            };
+            sched.enqueue_task(start_task);
+            sched.run();
 
-        assert!(count == MAX);
+            assert!(count == MAX);
 
-        fn run_task(count_ptr: *mut int) {
-            do local_sched::borrow |sched| {
-                let task = ~do Task::new(&mut sched.stack_pool) {
-                    unsafe {
-                        *count_ptr = *count_ptr + 1;
-                        if *count_ptr != MAX {
-                            run_task(count_ptr);
+            fn run_task(count_ptr: *mut int) {
+                do local_sched::borrow |sched| {
+                    let task = ~do Coroutine::new(&mut sched.stack_pool) {
+                        unsafe {
+                            *count_ptr = *count_ptr + 1;
+                            if *count_ptr != MAX {
+                                run_task(count_ptr);
+                            }
                         }
+                    };
+                    sched.enqueue_task(task);
+                }
+            };
+        }
+    }
+
+    #[test]
+    fn test_block_task() {
+        do run_in_bare_thread {
+            let mut sched = ~UvEventLoop::new_scheduler();
+            let task = ~do Coroutine::new(&mut sched.stack_pool) {
+                let sched = local_sched::take();
+                assert!(sched.in_task_context());
+                do sched.deschedule_running_task_and_then() |task| {
+                    let task = Cell(task);
+                    do local_sched::borrow |sched| {
+                        assert!(!sched.in_task_context());
+                        sched.enqueue_task(task.take());
                     }
-                };
-                sched.task_queue.push_back(task);
-            }
-        };
+                }
+            };
+            sched.enqueue_task(task);
+            sched.run();
+        }
     }
-}
 
-#[test]
-fn test_block_task() {
-    do run_in_bare_thread {
-        let mut sched = ~UvEventLoop::new_scheduler();
-        let task = ~do Task::new(&mut sched.stack_pool) {
-            let sched = local_sched::take();
-            assert!(sched.in_task_context());
-            do sched.deschedule_running_task_and_then() |task| {
-                let task = Cell(task);
-                do local_sched::borrow |sched| {
-                    assert!(!sched.in_task_context());
-                    sched.task_queue.push_back(task.take());
+    #[test]
+    fn test_io_callback() {
+        // This is a regression test that when there are no schedulable tasks
+        // in the work queue, but we are performing I/O, that once we do put
+        // something in the work queue again the scheduler picks it up and doesn't
+        // exit before emptying the work queue
+        do run_in_newsched_task {
+            do spawn {
+                let sched = local_sched::take();
+                do sched.deschedule_running_task_and_then |task| {
+                    let mut sched = local_sched::take();
+                    let task = Cell(task);
+                    do sched.event_loop.callback_ms(10) {
+                        rtdebug!("in callback");
+                        let mut sched = local_sched::take();
+                        sched.enqueue_task(task.take());
+                        local_sched::put(sched);
+                    }
+                    local_sched::put(sched);
                 }
             }
-        };
-        sched.task_queue.push_back(task);
-        sched.run();
+        }
     }
 }
diff --git a/src/libcore/rt/stack.rs b/src/libcore/rt/stack.rs
index 061d7dab9a1..cab9c3390b2 100644
--- a/src/libcore/rt/stack.rs
+++ b/src/libcore/rt/stack.rs
@@ -11,21 +11,36 @@
 use container::Container;
 use ptr::Ptr;
 use vec;
+use ops::Drop;
+use libc::{c_uint, uintptr_t};
 
 pub struct StackSegment {
-    buf: ~[u8]
+    buf: ~[u8],
+    valgrind_id: c_uint
 }
 
 pub impl StackSegment {
     fn new(size: uint) -> StackSegment {
-        // Crate a block of uninitialized values
-        let mut stack = vec::with_capacity(size);
         unsafe {
+            // Crate a block of uninitialized values
+            let mut stack = vec::with_capacity(size);
             vec::raw::set_len(&mut stack, size);
+
+            let mut stk = StackSegment {
+                buf: stack,
+                valgrind_id: 0
+            };
+
+            // XXX: Using the FFI to call a C macro. Slow
+            stk.valgrind_id = rust_valgrind_stack_register(stk.start(), stk.end());
+            return stk;
         }
+    }
 
-        StackSegment {
-            buf: stack
+    /// Point to the low end of the allocated stack
+    fn start(&self) -> *uint {
+        unsafe {
+            vec::raw::to_ptr(self.buf) as *uint
         }
     }
 
@@ -35,6 +50,15 @@ pub impl StackSegment {
     }
 }
 
+impl Drop for StackSegment {
+    fn finalize(&self) {
+        unsafe {
+            // XXX: Using the FFI to call a C macro. Slow
+            rust_valgrind_stack_deregister(self.valgrind_id);
+        }
+    }
+}
+
 pub struct StackPool(());
 
 impl StackPool {
@@ -47,3 +71,8 @@ impl StackPool {
     fn give_segment(&self, _stack: StackSegment) {
     }
 }
+
+extern {
+    fn rust_valgrind_stack_register(start: *uintptr_t, end: *uintptr_t) -> c_uint;
+    fn rust_valgrind_stack_deregister(id: c_uint);
+}
diff --git a/src/libcore/rt/test.rs b/src/libcore/rt/test.rs
index 0c6843c605d..1294b9bcf47 100644
--- a/src/libcore/rt/test.rs
+++ b/src/libcore/rt/test.rs
@@ -18,17 +18,17 @@ use rt::local_services::LocalServices;
 /// will abort the process.
 pub fn run_in_newsched_task(f: ~fn()) {
     use unstable::run_in_bare_thread;
-    use super::sched::Task;
-    use super::uvio::UvEventLoop;
+    use super::sched::Coroutine;
+    use rt::uv::uvio::UvEventLoop;
 
     let f = Cell(f);
 
     do run_in_bare_thread {
         let mut sched = ~UvEventLoop::new_scheduler();
-        let task = ~Task::with_local(&mut sched.stack_pool,
-                                     LocalServices::without_unwinding(),
-                                     f.take());
-        sched.task_queue.push_back(task);
+        let task = ~Coroutine::with_local(&mut sched.stack_pool,
+                                          LocalServices::without_unwinding(),
+                                          f.take());
+        sched.enqueue_task(task);
         sched.run();
     }
 }
@@ -38,9 +38,9 @@ pub fn spawntask(f: ~fn()) {
     use super::sched::*;
 
     let mut sched = local_sched::take();
-    let task = ~Task::with_local(&mut sched.stack_pool,
-                                 LocalServices::without_unwinding(),
-                                 f);
+    let task = ~Coroutine::with_local(&mut sched.stack_pool,
+                                      LocalServices::without_unwinding(),
+                                      f);
     do sched.switch_running_tasks_and_then(task) |task| {
         let task = Cell(task);
         let sched = local_sched::take();
@@ -53,17 +53,57 @@ pub fn spawntask_immediately(f: ~fn()) {
     use super::sched::*;
 
     let mut sched = local_sched::take();
-    let task = ~Task::with_local(&mut sched.stack_pool,
-                                 LocalServices::without_unwinding(),
-                                 f);
+    let task = ~Coroutine::with_local(&mut sched.stack_pool,
+                                      LocalServices::without_unwinding(),
+                                      f);
     do sched.switch_running_tasks_and_then(task) |task| {
         let task = Cell(task);
         do local_sched::borrow |sched| {
-            sched.task_queue.push_front(task.take());
+            sched.enqueue_task(task.take());
         }
     }
 }
 
+/// Create a new task and run it right now. Aborts on failure
+pub fn spawntask_later(f: ~fn()) {
+    use super::sched::*;
+
+    let mut sched = local_sched::take();
+    let task = ~Coroutine::with_local(&mut sched.stack_pool,
+                                      LocalServices::without_unwinding(),
+                                      f);
+
+    sched.enqueue_task(task);
+    local_sched::put(sched);
+}
+
+/// Spawn a task and either run it immediately or run it later
+pub fn spawntask_random(f: ~fn()) {
+    use super::sched::*;
+    use rand::{Rand, rng};
+
+    let mut rng = rng();
+    let run_now: bool = Rand::rand(&mut rng);
+
+    let mut sched = local_sched::take();
+    let task = ~Coroutine::with_local(&mut sched.stack_pool,
+                                      LocalServices::without_unwinding(),
+                                      f);
+
+    if run_now {
+        do sched.switch_running_tasks_and_then(task) |task| {
+            let task = Cell(task);
+            do local_sched::borrow |sched| {
+                sched.enqueue_task(task.take());
+            }
+        }
+    } else {
+        sched.enqueue_task(task);
+        local_sched::put(sched);
+    }
+}
+
+
 /// Spawn a task and wait for it to finish, returning whether it completed successfully or failed
 pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
     use cell::Cell;
@@ -82,7 +122,7 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
         let old_task = Cell(old_task);
         let f = f.take();
         let mut sched = local_sched::take();
-        let new_task = ~do Task::new(&mut sched.stack_pool) {
+        let new_task = ~do Coroutine::new(&mut sched.stack_pool) {
             do (|| {
                 (f.take())()
             }).finally {
@@ -92,7 +132,7 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
                 do sched.switch_running_tasks_and_then(old_task.take()) |new_task| {
                     let new_task = Cell(new_task);
                     do local_sched::borrow |sched| {
-                        sched.task_queue.push_front(new_task.take());
+                        sched.enqueue_task(new_task.take());
                     }
                 }
             }
diff --git a/src/libcore/rt/thread_local_storage.rs b/src/libcore/rt/thread_local_storage.rs
index 366996fb935..6a08c0f59b1 100644
--- a/src/libcore/rt/thread_local_storage.rs
+++ b/src/libcore/rt/thread_local_storage.rs
@@ -46,8 +46,11 @@ type pthread_key_t = ::libc::c_uint;
 
 #[cfg(unix)]
 extern {
+    #[fast_ffi]
     fn pthread_key_create(key: *mut pthread_key_t, dtor: *u8) -> c_int;
+    #[fast_ffi]
     fn pthread_setspecific(key: pthread_key_t, value: *mut c_void) -> c_int;
+    #[fast_ffi]
     fn pthread_getspecific(key: pthread_key_t) -> *mut c_void;
 }
 
diff --git a/src/libcore/rt/tube.rs b/src/libcore/rt/tube.rs
new file mode 100644
index 00000000000..bc9269f08fa
--- /dev/null
+++ b/src/libcore/rt/tube.rs
@@ -0,0 +1,184 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! A very simple unsynchronized channel type for sending buffered data from
+//! scheduler context to task context.
+//!
+//! XXX: This would be safer to use if split into two types like Port/Chan
+
+use option::*;
+use clone::Clone;
+use super::rc::RC;
+use rt::sched::Coroutine;
+use rt::{context, TaskContext, SchedulerContext};
+use rt::local_sched;
+use vec::OwnedVector;
+use container::Container;
+
+struct TubeState<T> {
+    blocked_task: Option<~Coroutine>,
+    buf: ~[T]
+}
+
+pub struct Tube<T> {
+    p: RC<TubeState<T>>
+}
+
+impl<T> Tube<T> {
+    pub fn new() -> Tube<T> {
+        Tube {
+            p: RC::new(TubeState {
+                blocked_task: None,
+                buf: ~[]
+            })
+        }
+    }
+
+    pub fn send(&mut self, val: T) {
+        rtdebug!("tube send");
+        assert!(context() == SchedulerContext);
+
+        unsafe {
+            let state = self.p.unsafe_borrow_mut();
+            (*state).buf.push(val);
+
+            if (*state).blocked_task.is_some() {
+                // There's a waiting task. Wake it up
+                rtdebug!("waking blocked tube");
+                let task = (*state).blocked_task.swap_unwrap();
+                let sched = local_sched::take();
+                sched.resume_task_immediately(task);
+            }
+        }
+    }
+
+    pub fn recv(&mut self) -> T {
+        assert!(context() == TaskContext);
+
+        unsafe {
+            let state = self.p.unsafe_borrow_mut();
+            if !(*state).buf.is_empty() {
+                return (*state).buf.shift();
+            } else {
+                // Block and wait for the next message
+                rtdebug!("blocking on tube recv");
+                assert!(self.p.refcount() > 1); // There better be somebody to wake us up
+                assert!((*state).blocked_task.is_none());
+                let sched = local_sched::take();
+                do sched.deschedule_running_task_and_then |task| {
+                    (*state).blocked_task = Some(task);
+                }
+                rtdebug!("waking after tube recv");
+                let buf = &mut (*state).buf;
+                assert!(!buf.is_empty());
+                return buf.shift();
+            }
+        }
+    }
+}
+
+impl<T> Clone for Tube<T> {
+    fn clone(&self) -> Tube<T> {
+        Tube { p: self.p.clone() }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use int;
+    use cell::Cell;
+    use rt::local_sched;
+    use rt::test::*;
+    use rt::rtio::EventLoop;
+    use super::*;
+
+    #[test]
+    fn simple_test() {
+        do run_in_newsched_task {
+            let mut tube: Tube<int> = Tube::new();
+            let tube_clone = tube.clone();
+            let tube_clone_cell = Cell(tube_clone);
+            let sched = local_sched::take();
+            do sched.deschedule_running_task_and_then |task| {
+                let mut tube_clone = tube_clone_cell.take();
+                tube_clone.send(1);
+                let sched = local_sched::take();
+                sched.resume_task_immediately(task);
+            }
+
+            assert!(tube.recv() == 1);
+        }
+    }
+
+    #[test]
+    fn blocking_test() {
+        do run_in_newsched_task {
+            let mut tube: Tube<int> = Tube::new();
+            let tube_clone = tube.clone();
+            let tube_clone = Cell(Cell(Cell(tube_clone)));
+            let sched = local_sched::take();
+            do sched.deschedule_running_task_and_then |task| {
+                let tube_clone = tube_clone.take();
+                do local_sched::borrow |sched| {
+                    let tube_clone = tube_clone.take();
+                    do sched.event_loop.callback {
+                        let mut tube_clone = tube_clone.take();
+                        // The task should be blocked on this now and
+                        // sending will wake it up.
+                        tube_clone.send(1);
+                    }
+                }
+                let sched = local_sched::take();
+                sched.resume_task_immediately(task);
+            }
+
+            assert!(tube.recv() == 1);
+        }
+    }
+
+    #[test]
+    fn many_blocking_test() {
+        static MAX: int = 100;
+
+        do run_in_newsched_task {
+            let mut tube: Tube<int> = Tube::new();
+            let tube_clone = tube.clone();
+            let tube_clone = Cell(tube_clone);
+            let sched = local_sched::take();
+            do sched.deschedule_running_task_and_then |task| {
+                callback_send(tube_clone.take(), 0);
+
+                fn callback_send(tube: Tube<int>, i: int) {
+                    if i == 100 { return; }
+
+                    let tube = Cell(Cell(tube));
+                    do local_sched::borrow |sched| {
+                        let tube = tube.take();
+                        do sched.event_loop.callback {
+                            let mut tube = tube.take();
+                            // The task should be blocked on this now and
+                            // sending will wake it up.
+                            tube.send(i);
+                            callback_send(tube, i + 1);
+                        }
+                    }
+                }
+
+                let sched = local_sched::take();
+                sched.resume_task_immediately(task);
+            }
+
+            for int::range(0, MAX) |i| {
+                let j = tube.recv();
+                assert!(j == i);
+            }
+        }
+    }
+}
diff --git a/src/libcore/rt/uv/file.rs b/src/libcore/rt/uv/file.rs
index a4aef7485d7..2d145055097 100644
--- a/src/libcore/rt/uv/file.rs
+++ b/src/libcore/rt/uv/file.rs
@@ -11,15 +11,11 @@
 use prelude::*;
 use ptr::null;
 use libc::c_void;
-use super::{UvError, Callback, Request, NativeHandle, Loop};
-use super::super::uvll;
-use super::super::uvll::*;
-
-pub type FsCallback = ~fn(FsRequest, Option<UvError>);
-impl Callback for FsCallback { }
+use rt::uv::{Request, NativeHandle, Loop, FsCallback};
+use rt::uv::uvll;
+use rt::uv::uvll::*;
 
 pub struct FsRequest(*uvll::uv_fs_t);
-
 impl Request for FsRequest;
 
 impl FsRequest {
diff --git a/src/libcore/rt/uv/idle.rs b/src/libcore/rt/uv/idle.rs
new file mode 100644
index 00000000000..2cf0b5c4872
--- /dev/null
+++ b/src/libcore/rt/uv/idle.rs
@@ -0,0 +1,91 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use libc::c_int;
+use option::Some;
+use rt::uv::uvll;
+use rt::uv::{Watcher, Loop, NativeHandle, IdleCallback, NullCallback};
+use rt::uv::status_to_maybe_uv_error;
+
+pub struct IdleWatcher(*uvll::uv_idle_t);
+impl Watcher for IdleWatcher { }
+
+pub impl IdleWatcher {
+    fn new(loop_: &mut Loop) -> IdleWatcher {
+        unsafe {
+            let handle = uvll::idle_new();
+            assert!(handle.is_not_null());
+            assert!(0 == uvll::idle_init(loop_.native_handle(), handle));
+            let mut watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
+            watcher.install_watcher_data();
+            return watcher
+        }
+    }
+
+    fn start(&mut self, cb: IdleCallback) {
+        {
+            let data = self.get_watcher_data();
+            data.idle_cb = Some(cb);
+        }
+
+        unsafe {
+            assert!(0 == uvll::idle_start(self.native_handle(), idle_cb))
+        };
+
+        extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) {
+            let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
+            let data = idle_watcher.get_watcher_data();
+            let cb: &IdleCallback = data.idle_cb.get_ref();
+            let status = status_to_maybe_uv_error(handle, status);
+            (*cb)(idle_watcher, status);
+        }
+    }
+
+    fn stop(&mut self) {
+        // NB: Not resetting the Rust idle_cb to None here because `stop` is likely
+        // called from *within* the idle callback, causing a use after free
+
+        unsafe {
+            assert!(0 == uvll::idle_stop(self.native_handle()));
+        }
+    }
+
+    fn close(self, cb: NullCallback) {
+        {
+            let mut this = self;
+            let data = this.get_watcher_data();
+            assert!(data.close_cb.is_none());
+            data.close_cb = Some(cb);
+        }
+
+        unsafe { uvll::close(self.native_handle(), close_cb) };
+
+        extern fn close_cb(handle: *uvll::uv_idle_t) {
+            unsafe {
+                let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
+                {
+                    let data = idle_watcher.get_watcher_data();
+                    data.close_cb.swap_unwrap()();
+                }
+                idle_watcher.drop_watcher_data();
+                uvll::idle_delete(handle);
+            }
+        }
+    }
+}
+
+impl NativeHandle<*uvll::uv_idle_t> for IdleWatcher {
+    fn from_native_handle(handle: *uvll::uv_idle_t) -> IdleWatcher {
+        IdleWatcher(handle)
+    }
+    fn native_handle(&self) -> *uvll::uv_idle_t {
+        match self { &IdleWatcher(ptr) => ptr }
+    }
+}
diff --git a/src/libcore/rt/uv/mod.rs b/src/libcore/rt/uv/mod.rs
index 1d93b327395..99a5252c88a 100644
--- a/src/libcore/rt/uv/mod.rs
+++ b/src/libcore/rt/uv/mod.rs
@@ -10,7 +10,7 @@
 
 /*!
 
-Bindings to libuv.
+Bindings to libuv, along with the default implementation of `core::rt::rtio`.
 
 UV types consist of the event loop (Loop), Watchers, Requests and
 Callbacks.
@@ -38,29 +38,46 @@ use container::Container;
 use option::*;
 use str::raw::from_c_str;
 use to_str::ToStr;
+use ptr::Ptr;
+use libc;
 use vec;
 use ptr;
-use ptr::Ptr;
+use cast;
+use str;
+use option::*;
+use str::raw::from_c_str;
+use to_str::ToStr;
 use libc::{c_void, c_int, size_t, malloc, free};
 use cast::transmute;
 use ptr::null;
-use super::uvll;
 use unstable::finally::Finally;
 
+use rt::io::IoError;
+
 #[cfg(test)] use unstable::run_in_bare_thread;
 
-pub use self::file::{FsRequest, FsCallback};
+pub use self::file::FsRequest;
 pub use self::net::{StreamWatcher, TcpWatcher};
-pub use self::net::{ReadCallback, AllocCallback, ConnectionCallback, ConnectCallback};
+pub use self::idle::IdleWatcher;
+pub use self::timer::TimerWatcher;
+
+/// The implementation of `rtio` for libuv
+pub mod uvio;
+
+/// C bindings to libuv
+pub mod uvll;
 
 pub mod file;
 pub mod net;
+pub mod idle;
+pub mod timer;
 
-/// A trait for callbacks to implement. Provides a little extra type safety
-/// for generic, unsafe interop functions like `set_watcher_callback`.
-pub trait Callback { }
-
-pub trait Request { }
+/// XXX: Loop(*handle) is buggy with destructors. Normal structs
+/// with dtors may not be destructured, but tuple structs can,
+/// but the results are not correct.
+pub struct Loop {
+    handle: *uvll::uv_loop_t
+}
 
 /// The trait implemented by uv 'watchers' (handles). Watchers are
 /// non-owning wrappers around the uv handles and are not completely
@@ -68,12 +85,9 @@ pub trait Request { }
 /// handle.  Watchers are generally created, then `start`ed, `stop`ed
 /// and `close`ed, but due to their complex life cycle may not be
 /// entirely memory safe if used in unanticipated patterns.
-pub trait Watcher {
-    fn event_loop(&self) -> Loop;
-}
+pub trait Watcher { }
 
-pub type NullCallback = ~fn();
-impl Callback for NullCallback { }
+pub trait Request { }
 
 /// A type that wraps a native handle
 pub trait NativeHandle<T> {
@@ -81,13 +95,6 @@ pub trait NativeHandle<T> {
     pub fn native_handle(&self) -> T;
 }
 
-/// XXX: Loop(*handle) is buggy with destructors. Normal structs
-/// with dtors may not be destructured, but tuple structs can,
-/// but the results are not correct.
-pub struct Loop {
-    handle: *uvll::uv_loop_t
-}
-
 pub impl Loop {
     fn new() -> Loop {
         let handle = unsafe { uvll::loop_new() };
@@ -113,64 +120,74 @@ impl NativeHandle<*uvll::uv_loop_t> for Loop {
     }
 }
 
-pub struct IdleWatcher(*uvll::uv_idle_t);
+// XXX: The uv alloc callback also has a *uv_handle_t arg
+pub type AllocCallback = ~fn(uint) -> Buf;
+pub type ReadCallback = ~fn(StreamWatcher, int, Buf, Option<UvError>);
+pub type NullCallback = ~fn();
+pub type IdleCallback = ~fn(IdleWatcher, Option<UvError>);
+pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
+pub type FsCallback = ~fn(FsRequest, Option<UvError>);
+pub type TimerCallback = ~fn(TimerWatcher, Option<UvError>);
 
-impl Watcher for IdleWatcher {
-    fn event_loop(&self) -> Loop {
-        loop_from_watcher(self)
-    }
+
+/// Callbacks used by StreamWatchers, set as custom data on the foreign handle
+struct WatcherData {
+    read_cb: Option<ReadCallback>,
+    write_cb: Option<ConnectionCallback>,
+    connect_cb: Option<ConnectionCallback>,
+    close_cb: Option<NullCallback>,
+    alloc_cb: Option<AllocCallback>,
+    idle_cb: Option<IdleCallback>,
+    timer_cb: Option<TimerCallback>
 }
 
-pub type IdleCallback = ~fn(IdleWatcher, Option<UvError>);
-impl Callback for IdleCallback { }
+pub trait WatcherInterop {
+    fn event_loop(&self) -> Loop;
+    fn install_watcher_data(&mut self);
+    fn get_watcher_data<'r>(&'r mut self) -> &'r mut WatcherData;
+    fn drop_watcher_data(&mut self);
+}
 
-pub impl IdleWatcher {
-    fn new(loop_: &mut Loop) -> IdleWatcher {
+impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W {
+    /// Get the uv event loop from a Watcher
+    pub fn event_loop(&self) -> Loop {
         unsafe {
-            let handle = uvll::idle_new();
-            assert!(handle.is_not_null());
-            assert!(0 == uvll::idle_init(loop_.native_handle(), handle));
-            uvll::set_data_for_uv_handle(handle, null::<()>());
-            NativeHandle::from_native_handle(handle)
+            let handle = self.native_handle();
+            let loop_ = uvll::get_loop_for_uv_handle(handle);
+            NativeHandle::from_native_handle(loop_)
         }
     }
 
-    fn start(&mut self, cb: IdleCallback) {
-
-        set_watcher_callback(self, cb);
+    pub fn install_watcher_data(&mut self) {
         unsafe {
-            assert!(0 == uvll::idle_start(self.native_handle(), idle_cb))
-        };
-
-        extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) {
-            let idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
-            let cb: &IdleCallback = borrow_callback_from_watcher(&idle_watcher);
-            let status = status_to_maybe_uv_error(handle, status);
-            (*cb)(idle_watcher, status);
+            let data = ~WatcherData {
+                read_cb: None,
+                write_cb: None,
+                connect_cb: None,
+                close_cb: None,
+                alloc_cb: None,
+                idle_cb: None,
+                timer_cb: None
+            };
+            let data = transmute::<~WatcherData, *c_void>(data);
+            uvll::set_data_for_uv_handle(self.native_handle(), data);
         }
     }
 
-    fn stop(&mut self) {
-        unsafe { assert!(0 == uvll::idle_stop(self.native_handle())); }
-    }
-
-    fn close(self) {
-        unsafe { uvll::close(self.native_handle(), close_cb) };
-
-        extern fn close_cb(handle: *uvll::uv_idle_t) {
-            let mut idle_watcher = NativeHandle::from_native_handle(handle);
-            drop_watcher_callback::<uvll::uv_idle_t, IdleWatcher, IdleCallback>(&mut idle_watcher);
-            unsafe { uvll::idle_delete(handle) };
+    pub fn get_watcher_data<'r>(&'r mut self) -> &'r mut WatcherData {
+        unsafe {
+            let data = uvll::get_data_for_uv_handle(self.native_handle());
+            let data = transmute::<&*c_void, &mut ~WatcherData>(&data);
+            return &mut **data;
         }
     }
-}
 
-impl NativeHandle<*uvll::uv_idle_t> for IdleWatcher {
-    fn from_native_handle(handle: *uvll::uv_idle_t) -> IdleWatcher {
-        IdleWatcher(handle)
-    }
-    fn native_handle(&self) -> *uvll::uv_idle_t {
-        match self { &IdleWatcher(ptr) => ptr }
+    pub fn drop_watcher_data(&mut self) {
+        unsafe {
+            let data = uvll::get_data_for_uv_handle(self.native_handle());
+            let _data = transmute::<*c_void, ~WatcherData>(data);
+            uvll::set_data_for_uv_handle(self.native_handle(), null::<()>());
+        }
     }
 }
 
@@ -198,6 +215,10 @@ pub impl UvError {
             from_c_str(desc_str)
         }
     }
+
+    fn is_eof(&self) -> bool {
+        self.code == uvll::EOF
+    }
 }
 
 impl ToStr for UvError {
@@ -213,148 +234,74 @@ fn error_smoke_test() {
     assert!(err.to_str() == ~"EOF: end of file");
 }
 
-
-/// Given a uv handle, convert a callback status to a UvError
-// XXX: Follow the pattern below by parameterizing over T: Watcher, not T
-pub fn status_to_maybe_uv_error<T>(handle: *T, status: c_int) -> Option<UvError> {
-    if status != -1 {
-        None
-    } else {
-        unsafe {
-            rtdebug!("handle: %x", handle as uint);
-            let loop_ = uvll::get_loop_for_uv_handle(handle);
-            rtdebug!("loop: %x", loop_ as uint);
-            let err = uvll::last_error(loop_);
-            Some(UvError(err))
-        }
-    }
-}
-
-/// Get the uv event loop from a Watcher
-pub fn loop_from_watcher<H, W: Watcher + NativeHandle<*H>>(
-    watcher: &W) -> Loop {
-
-    let handle = watcher.native_handle();
-    let loop_ = unsafe { uvll::get_loop_for_uv_handle(handle) };
-    NativeHandle::from_native_handle(loop_)
-}
-
-/// Set the custom data on a handle to a callback Note: This is only
-/// suitable for watchers that make just one type of callback.  For
-/// others use WatcherData
-pub fn set_watcher_callback<H, W: Watcher + NativeHandle<*H>, CB: Callback>(
-    watcher: &mut W, cb: CB) {
-
-    drop_watcher_callback::<H, W, CB>(watcher);
-    // XXX: Boxing the callback so it fits into a
-    // pointer. Unfortunate extra allocation
-    let boxed_cb = ~cb;
-    let data = unsafe { transmute::<~CB, *c_void>(boxed_cb) };
-    unsafe { uvll::set_data_for_uv_handle(watcher.native_handle(), data) };
-}
-
-/// Delete a callback from a handle's custom data
-pub fn drop_watcher_callback<H, W: Watcher + NativeHandle<*H>, CB: Callback>(
-    watcher: &mut W) {
-
+pub fn last_uv_error<H, W: Watcher + NativeHandle<*H>>(watcher: &W) -> UvError {
     unsafe {
-        let handle = watcher.native_handle();
-        let handle_data: *c_void = uvll::get_data_for_uv_handle(handle);
-        if handle_data.is_not_null() {
-            // Take ownership of the callback and drop it
-            let _cb = transmute::<*c_void, ~CB>(handle_data);
-            // Make sure the pointer is zeroed
-            uvll::set_data_for_uv_handle(watcher.native_handle(), null::<()>());
-        }
+        let loop_ = watcher.event_loop();
+        UvError(uvll::last_error(loop_.native_handle()))
     }
 }
 
-/// Take a pointer to the callback installed as custom data
-pub fn borrow_callback_from_watcher<H, W: Watcher + NativeHandle<*H>,
-                                CB: Callback>(watcher: &W) -> &CB {
-
-    unsafe {
-        let handle = watcher.native_handle();
-        let handle_data: *c_void = uvll::get_data_for_uv_handle(handle);
-        assert!(handle_data.is_not_null());
-        let cb = transmute::<&*c_void, &~CB>(&handle_data);
-        return &**cb;
-    }
-}
+pub fn uv_error_to_io_error(uverr: UvError) -> IoError {
 
-/// Take ownership of the callback installed as custom data
-pub fn take_callback_from_watcher<H, W: Watcher + NativeHandle<*H>, CB: Callback>(
-    watcher: &mut W) -> CB {
+    // XXX: Could go in str::raw
+    unsafe fn c_str_to_static_slice(s: *libc::c_char) -> &'static str {
+        let s = s as *u8;
+        let mut curr = s, len = 0u;
+        while *curr != 0u8 {
+            len += 1u;
+            curr = ptr::offset(s, len);
+        }
 
-    unsafe {
-        let handle = watcher.native_handle();
-        let handle_data: *c_void = uvll::get_data_for_uv_handle(handle);
-        assert!(handle_data.is_not_null());
-        uvll::set_data_for_uv_handle(handle, null::<()>());
-        let cb: ~CB = transmute::<*c_void, ~CB>(handle_data);
-        let cb = match cb { ~cb => cb };
-        return cb;
+        str::raw::buf_as_slice(s, len, |d| cast::transmute(d))
     }
-}
 
-/// Callbacks used by StreamWatchers, set as custom data on the foreign handle
-struct WatcherData {
-    read_cb: Option<ReadCallback>,
-    write_cb: Option<ConnectionCallback>,
-    connect_cb: Option<ConnectionCallback>,
-    close_cb: Option<NullCallback>,
-    alloc_cb: Option<AllocCallback>,
-    buf: Option<Buf>
-}
 
-pub fn install_watcher_data<H, W: Watcher + NativeHandle<*H>>(watcher: &mut W) {
     unsafe {
-        let data = ~WatcherData {
-            read_cb: None,
-            write_cb: None,
-            connect_cb: None,
-            close_cb: None,
-            alloc_cb: None,
-            buf: None
+        // Importing error constants
+        use rt::uv::uvll::*;
+        use rt::io::*;
+
+        // uv error descriptions are static
+        let c_desc = uvll::strerror(&*uverr);
+        let desc = c_str_to_static_slice(c_desc);
+
+        let kind = match uverr.code {
+            UNKNOWN => OtherIoError,
+            OK => OtherIoError,
+            EOF => EndOfFile,
+            EACCES => PermissionDenied,
+            ECONNREFUSED => ConnectionRefused,
+            ECONNRESET => ConnectionReset,
+            EPIPE => BrokenPipe,
+            e => {
+                rtdebug!("e %u", e as uint);
+                // XXX: Need to map remaining uv error types
+                OtherIoError
+            }
         };
-        let data = transmute::<~WatcherData, *c_void>(data);
-        uvll::set_data_for_uv_handle(watcher.native_handle(), data);
-    }
-}
 
-pub fn get_watcher_data<'r, H, W: Watcher + NativeHandle<*H>>(
-    watcher: &'r mut W) -> &'r mut WatcherData {
-
-    unsafe {
-        let data = uvll::get_data_for_uv_handle(watcher.native_handle());
-        let data = transmute::<&*c_void, &mut ~WatcherData>(&data);
-        return &mut **data;
-    }
-}
-
-pub fn drop_watcher_data<H, W: Watcher + NativeHandle<*H>>(watcher: &mut W) {
-    unsafe {
-        let data = uvll::get_data_for_uv_handle(watcher.native_handle());
-        let _data = transmute::<*c_void, ~WatcherData>(data);
-        uvll::set_data_for_uv_handle(watcher.native_handle(), null::<()>());
+        IoError {
+            kind: kind,
+            desc: desc,
+            detail: None
+        }
     }
 }
 
-#[test]
-fn test_slice_to_uv_buf() {
-    let slice = [0, .. 20];
-    let buf = slice_to_uv_buf(slice);
-
-    assert!(buf.len == 20);
-
-    unsafe {
-        let base = transmute::<*u8, *mut u8>(buf.base);
-        (*base) = 1;
-        (*ptr::mut_offset(base, 1)) = 2;
+/// Given a uv handle, convert a callback status to a UvError
+// XXX: Follow the pattern below by parameterizing over T: Watcher, not T
+pub fn status_to_maybe_uv_error<T>(handle: *T, status: c_int) -> Option<UvError> {
+    if status != -1 {
+        None
+    } else {
+        unsafe {
+            rtdebug!("handle: %x", handle as uint);
+            let loop_ = uvll::get_loop_for_uv_handle(handle);
+            rtdebug!("loop: %x", loop_ as uint);
+            let err = uvll::last_error(loop_);
+            Some(UvError(err))
+        }
     }
-
-    assert!(slice[0] == 1);
-    assert!(slice[1] == 2);
 }
 
 /// The uv buffer type
@@ -395,6 +342,24 @@ pub fn vec_from_uv_buf(buf: Buf) -> Option<~[u8]> {
 }
 
 #[test]
+fn test_slice_to_uv_buf() {
+    let slice = [0, .. 20];
+    let buf = slice_to_uv_buf(slice);
+
+    assert!(buf.len == 20);
+
+    unsafe {
+        let base = transmute::<*u8, *mut u8>(buf.base);
+        (*base) = 1;
+        (*ptr::mut_offset(base, 1)) = 2;
+    }
+
+    assert!(slice[0] == 1);
+    assert!(slice[1] == 2);
+}
+
+
+#[test]
 fn loop_smoke_test() {
     do run_in_bare_thread {
         let mut loop_ = Loop::new();
@@ -409,7 +374,7 @@ fn idle_new_then_close() {
     do run_in_bare_thread {
         let mut loop_ = Loop::new();
         let idle_watcher = { IdleWatcher::new(&mut loop_) };
-        idle_watcher.close();
+        idle_watcher.close(||());
     }
 }
 
@@ -425,7 +390,7 @@ fn idle_smoke_test() {
             assert!(status.is_none());
             if unsafe { *count_ptr == 10 } {
                 idle_watcher.stop();
-                idle_watcher.close();
+                idle_watcher.close(||());
             } else {
                 unsafe { *count_ptr = *count_ptr + 1; }
             }
@@ -449,7 +414,7 @@ fn idle_start_stop_start() {
                 assert!(status.is_none());
                 let mut idle_watcher = idle_watcher;
                 idle_watcher.stop();
-                idle_watcher.close();
+                idle_watcher.close(||());
             }
         }
         loop_.run();
diff --git a/src/libcore/rt/uv/net.rs b/src/libcore/rt/uv/net.rs
index 3e6aa657c57..bdd5588014c 100644
--- a/src/libcore/rt/uv/net.rs
+++ b/src/libcore/rt/uv/net.rs
@@ -10,21 +10,15 @@
 
 use prelude::*;
 use libc::{size_t, ssize_t, c_int, c_void};
-use cast::transmute_mut_region;
-use super::super::uvll;
-use super::super::uvll::*;
-use super::{Loop, Watcher, Request, UvError, Buf, Callback, NativeHandle, NullCallback,
-            loop_from_watcher, status_to_maybe_uv_error,
-            install_watcher_data, get_watcher_data, drop_watcher_data,
-            vec_to_uv_buf, vec_from_uv_buf};
-use super::super::io::net::ip::{IpAddr, Ipv4, Ipv6};
-
-#[cfg(test)] use cell::Cell;
-#[cfg(test)] use unstable::run_in_bare_thread;
-#[cfg(test)] use super::super::thread::Thread;
-#[cfg(test)] use super::super::test::*;
-
-fn ip4_as_uv_ip4(addr: IpAddr, f: &fn(*sockaddr_in)) {
+use rt::uv::uvll;
+use rt::uv::uvll::*;
+use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback};
+use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle, NullCallback,
+             status_to_maybe_uv_error};
+use rt::io::net::ip::{IpAddr, Ipv4, Ipv6};
+use rt::uv::last_uv_error;
+
+fn ip4_as_uv_ip4<T>(addr: IpAddr, f: &fn(*sockaddr_in) -> T) -> T {
     match addr {
         Ipv4(a, b, c, d, p) => {
             unsafe {
@@ -34,7 +28,7 @@ fn ip4_as_uv_ip4(addr: IpAddr, f: &fn(*sockaddr_in)) {
                                                 c as uint,
                                                 d as uint), p as int);
                 do (|| {
-                    f(addr);
+                    f(addr)
                 }).finally {
                     free_ip4_addr(addr);
                 }
@@ -47,34 +41,23 @@ fn ip4_as_uv_ip4(addr: IpAddr, f: &fn(*sockaddr_in)) {
 // uv_stream t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t
 // and uv_file_t
 pub struct StreamWatcher(*uvll::uv_stream_t);
-
-impl Watcher for StreamWatcher {
-    fn event_loop(&self) -> Loop {
-        loop_from_watcher(self)
-    }
-}
-
-pub type ReadCallback = ~fn(StreamWatcher, int, Buf, Option<UvError>);
-impl Callback for ReadCallback { }
-
-// XXX: The uv alloc callback also has a *uv_handle_t arg
-pub type AllocCallback = ~fn(uint) -> Buf;
-impl Callback for AllocCallback { }
+impl Watcher for StreamWatcher { }
 
 pub impl StreamWatcher {
 
     fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) {
-        // XXX: Borrowchk problems
-        let data = get_watcher_data(unsafe { transmute_mut_region(self) });
-        data.alloc_cb = Some(alloc);
-        data.read_cb = Some(cb);
+        {
+            let data = self.get_watcher_data();
+            data.alloc_cb = Some(alloc);
+            data.read_cb = Some(cb);
+        }
 
         let handle = self.native_handle();
         unsafe { uvll::read_start(handle, alloc_cb, read_cb); }
 
         extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf {
             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
-            let data = get_watcher_data(&mut stream_watcher);
+            let data = stream_watcher.get_watcher_data();
             let alloc_cb = data.alloc_cb.get_ref();
             return (*alloc_cb)(suggested_size as uint);
         }
@@ -83,7 +66,7 @@ pub impl StreamWatcher {
             rtdebug!("buf addr: %x", buf.base as uint);
             rtdebug!("buf len: %d", buf.len as int);
             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
-            let data = get_watcher_data(&mut stream_watcher);
+            let data = stream_watcher.get_watcher_data();
             let cb = data.read_cb.get_ref();
             let status = status_to_maybe_uv_error(stream, nread as c_int);
             (*cb)(stream_watcher, nread as int, buf, status);
@@ -98,22 +81,19 @@ pub impl StreamWatcher {
         unsafe { uvll::read_stop(handle); }
     }
 
-    // XXX: Needs to take &[u8], not ~[u8]
-    fn write(&mut self, msg: ~[u8], cb: ConnectionCallback) {
-        // XXX: Borrowck
-        let data = get_watcher_data(unsafe { transmute_mut_region(self) });
-        assert!(data.write_cb.is_none());
-        data.write_cb = Some(cb);
+    fn write(&mut self, buf: Buf, cb: ConnectionCallback) {
+        {
+            let data = self.get_watcher_data();
+            assert!(data.write_cb.is_none());
+            data.write_cb = Some(cb);
+        }
 
         let req = WriteRequest::new();
-        let buf = vec_to_uv_buf(msg);
-        assert!(data.buf.is_none());
-        data.buf = Some(buf);
         let bufs = [buf];
         unsafe {
             assert!(0 == uvll::write(req.native_handle(),
-                                          self.native_handle(),
-                                          bufs, write_cb));
+                                     self.native_handle(),
+                                     bufs, write_cb));
         }
 
         extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
@@ -121,8 +101,7 @@ pub impl StreamWatcher {
             let mut stream_watcher = write_request.stream();
             write_request.delete();
             let cb = {
-                let data = get_watcher_data(&mut stream_watcher);
-                let _vec = vec_from_uv_buf(data.buf.swap_unwrap());
+                let data = stream_watcher.get_watcher_data();
                 let cb = data.write_cb.swap_unwrap();
                 cb
             };
@@ -142,7 +121,7 @@ pub impl StreamWatcher {
     fn close(self, cb: NullCallback) {
         {
             let mut this = self;
-            let data = get_watcher_data(&mut this);
+            let data = this.get_watcher_data();
             assert!(data.close_cb.is_none());
             data.close_cb = Some(cb);
         }
@@ -152,9 +131,10 @@ pub impl StreamWatcher {
         extern fn close_cb(handle: *uvll::uv_stream_t) {
             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
             {
-                get_watcher_data(&mut stream_watcher).close_cb.swap_unwrap()();
+                let data = stream_watcher.get_watcher_data();
+                data.close_cb.swap_unwrap()();
             }
-            drop_watcher_data(&mut stream_watcher);
+            stream_watcher.drop_watcher_data();
             unsafe { free_handle(handle as *c_void) }
         }
     }
@@ -171,15 +151,7 @@ impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher {
 }
 
 pub struct TcpWatcher(*uvll::uv_tcp_t);
-
-impl Watcher for TcpWatcher {
-    fn event_loop(&self) -> Loop {
-        loop_from_watcher(self)
-    }
-}
-
-pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
-impl Callback for ConnectionCallback { }
+impl Watcher for TcpWatcher { }
 
 pub impl TcpWatcher {
     fn new(loop_: &mut Loop) -> TcpWatcher {
@@ -187,21 +159,24 @@ pub impl TcpWatcher {
             let handle = malloc_handle(UV_TCP);
             assert!(handle.is_not_null());
             assert!(0 == uvll::tcp_init(loop_.native_handle(), handle));
-            let mut watcher = NativeHandle::from_native_handle(handle);
-            install_watcher_data(&mut watcher);
+            let mut watcher: TcpWatcher = NativeHandle::from_native_handle(handle);
+            watcher.install_watcher_data();
             return watcher;
         }
     }
 
-    fn bind(&mut self, address: IpAddr) {
+    fn bind(&mut self, address: IpAddr) -> Result<(), UvError> {
         match address {
             Ipv4(*) => {
                 do ip4_as_uv_ip4(address) |addr| {
                     let result = unsafe {
                         uvll::tcp_bind(self.native_handle(), addr)
                     };
-                    // XXX: bind is likely to fail. need real error handling
-                    assert!(result == 0);
+                    if result == 0 {
+                        Ok(())
+                    } else {
+                        Err(last_uv_error(self))
+                    }
                 }
             }
             _ => fail!()
@@ -210,8 +185,8 @@ pub impl TcpWatcher {
 
     fn connect(&mut self, address: IpAddr, cb: ConnectionCallback) {
         unsafe {
-            assert!(get_watcher_data(self).connect_cb.is_none());
-            get_watcher_data(self).connect_cb = Some(cb);
+            assert!(self.get_watcher_data().connect_cb.is_none());
+            self.get_watcher_data().connect_cb = Some(cb);
 
             let connect_handle = ConnectRequest::new().native_handle();
             match address {
@@ -232,7 +207,7 @@ pub impl TcpWatcher {
                 let mut stream_watcher = connect_request.stream();
                 connect_request.delete();
                 let cb: ConnectionCallback = {
-                    let data = get_watcher_data(&mut stream_watcher);
+                    let data = stream_watcher.get_watcher_data();
                     data.connect_cb.swap_unwrap()
                 };
                 let status = status_to_maybe_uv_error(stream_watcher.native_handle(), status);
@@ -242,10 +217,11 @@ pub impl TcpWatcher {
     }
 
     fn listen(&mut self, cb: ConnectionCallback) {
-        // XXX: Borrowck
-        let data = get_watcher_data(unsafe { transmute_mut_region(self) });
-        assert!(data.connect_cb.is_none());
-        data.connect_cb = Some(cb);
+        {
+            let data = self.get_watcher_data();
+            assert!(data.connect_cb.is_none());
+            data.connect_cb = Some(cb);
+        }
 
         unsafe {
             static BACKLOG: c_int = 128; // XXX should be configurable
@@ -257,9 +233,10 @@ pub impl TcpWatcher {
         extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
             rtdebug!("connection_cb");
             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
-            let cb = get_watcher_data(&mut stream_watcher).connect_cb.swap_unwrap();
-            let status = status_to_maybe_uv_error(stream_watcher.native_handle(), status);
-            cb(stream_watcher, status);
+            let data = stream_watcher.get_watcher_data();
+            let cb = data.connect_cb.get_ref();
+            let status = status_to_maybe_uv_error(handle, status);
+            (*cb)(stream_watcher, status);
         }
     }
 
@@ -277,12 +254,8 @@ impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher {
     }
 }
 
-pub type ConnectCallback = ~fn(ConnectRequest, Option<UvError>);
-impl Callback for ConnectCallback { }
-
 // uv_connect_t is a subclass of uv_req_t
 struct ConnectRequest(*uvll::uv_connect_t);
-
 impl Request for ConnectRequest { }
 
 impl ConnectRequest {
@@ -355,93 +328,109 @@ impl NativeHandle<*uvll::uv_write_t> for WriteRequest {
 }
 
 
-#[test]
-fn connect_close() {
-    do run_in_bare_thread() {
-        let mut loop_ = Loop::new();
-        let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
-        // Connect to a port where nobody is listening
-        let addr = next_test_ip4();
-        do tcp_watcher.connect(addr) |stream_watcher, status| {
-            rtdebug!("tcp_watcher.connect!");
-            assert!(status.is_some());
-            assert!(status.get().name() == ~"ECONNREFUSED");
-            stream_watcher.close(||());
+#[cfg(test)]
+mod test {
+    use super::*;
+    use util::ignore;
+    use cell::Cell;
+    use vec;
+    use unstable::run_in_bare_thread;
+    use rt::thread::Thread;
+    use rt::test::*;
+    use rt::uv::{Loop, AllocCallback};
+    use rt::uv::{vec_from_uv_buf, vec_to_uv_buf, slice_to_uv_buf};
+
+    #[test]
+    fn connect_close() {
+        do run_in_bare_thread() {
+            let mut loop_ = Loop::new();
+            let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
+            // Connect to a port where nobody is listening
+            let addr = next_test_ip4();
+            do tcp_watcher.connect(addr) |stream_watcher, status| {
+                rtdebug!("tcp_watcher.connect!");
+                assert!(status.is_some());
+                assert!(status.get().name() == ~"ECONNREFUSED");
+                stream_watcher.close(||());
+            }
+            loop_.run();
+            loop_.close();
         }
-        loop_.run();
-        loop_.close();
     }
-}
 
-#[test]
-fn listen() {
-    do run_in_bare_thread() {
-        static MAX: int = 10;
-        let mut loop_ = Loop::new();
-        let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
-        let addr = next_test_ip4();
-        server_tcp_watcher.bind(addr);
-        let loop_ = loop_;
-        rtdebug!("listening");
-        do server_tcp_watcher.listen |server_stream_watcher, status| {
-            rtdebug!("listened!");
-            assert!(status.is_none());
-            let mut server_stream_watcher = server_stream_watcher;
-            let mut loop_ = loop_;
-            let client_tcp_watcher = TcpWatcher::new(&mut loop_);
-            let mut client_tcp_watcher = client_tcp_watcher.as_stream();
-            server_stream_watcher.accept(client_tcp_watcher);
-            let count_cell = Cell(0);
-            let server_stream_watcher = server_stream_watcher;
-            rtdebug!("starting read");
-            let alloc: AllocCallback = |size| {
-                vec_to_uv_buf(vec::from_elem(size, 0))
-            };
-            do client_tcp_watcher.read_start(alloc)
-                |stream_watcher, nread, buf, status| {
-
-                rtdebug!("i'm reading!");
-                let buf = vec_from_uv_buf(buf);
-                let mut count = count_cell.take();
-                if status.is_none() {
-                    rtdebug!("got %d bytes", nread);
-                    let buf = buf.unwrap();
-                    for buf.slice(0, nread as uint).each |byte| {
-                        assert!(*byte == count as u8);
-                        rtdebug!("%u", *byte as uint);
-                        count += 1;
-                    }
-                } else {
-                    assert!(count == MAX);
-                    do stream_watcher.close {
-                        server_stream_watcher.close(||());
+    #[test]
+    fn listen() {
+        do run_in_bare_thread() {
+            static MAX: int = 10;
+            let mut loop_ = Loop::new();
+            let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
+            let addr = next_test_ip4();
+            server_tcp_watcher.bind(addr);
+            let loop_ = loop_;
+            rtdebug!("listening");
+            do server_tcp_watcher.listen |server_stream_watcher, status| {
+                rtdebug!("listened!");
+                assert!(status.is_none());
+                let mut server_stream_watcher = server_stream_watcher;
+                let mut loop_ = loop_;
+                let client_tcp_watcher = TcpWatcher::new(&mut loop_);
+                let mut client_tcp_watcher = client_tcp_watcher.as_stream();
+                server_stream_watcher.accept(client_tcp_watcher);
+                let count_cell = Cell(0);
+                let server_stream_watcher = server_stream_watcher;
+                rtdebug!("starting read");
+                let alloc: AllocCallback = |size| {
+                    vec_to_uv_buf(vec::from_elem(size, 0))
+                };
+                do client_tcp_watcher.read_start(alloc)
+                    |stream_watcher, nread, buf, status| {
+
+                    rtdebug!("i'm reading!");
+                    let buf = vec_from_uv_buf(buf);
+                    let mut count = count_cell.take();
+                    if status.is_none() {
+                        rtdebug!("got %d bytes", nread);
+                        let buf = buf.unwrap();
+                        for buf.slice(0, nread as uint).each |byte| {
+                            assert!(*byte == count as u8);
+                            rtdebug!("%u", *byte as uint);
+                            count += 1;
+                        }
+                    } else {
+                        assert!(count == MAX);
+                        do stream_watcher.close {
+                            server_stream_watcher.close(||());
+                        }
                     }
+                    count_cell.put_back(count);
                 }
-                count_cell.put_back(count);
             }
-        }
 
-        let _client_thread = do Thread::start {
-            rtdebug!("starting client thread");
-            let mut loop_ = Loop::new();
-            let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
-            do tcp_watcher.connect(addr) |stream_watcher, status| {
-                rtdebug!("connecting");
-                assert!(status.is_none());
-                let mut stream_watcher = stream_watcher;
-                let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
-                do stream_watcher.write(msg) |stream_watcher, status| {
-                    rtdebug!("writing");
+            let _client_thread = do Thread::start {
+                rtdebug!("starting client thread");
+                let mut loop_ = Loop::new();
+                let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
+                do tcp_watcher.connect(addr) |stream_watcher, status| {
+                    rtdebug!("connecting");
                     assert!(status.is_none());
-                    stream_watcher.close(||());
+                    let mut stream_watcher = stream_watcher;
+                    let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
+                    let buf = slice_to_uv_buf(msg);
+                    let msg_cell = Cell(msg);
+                    do stream_watcher.write(buf) |stream_watcher, status| {
+                        rtdebug!("writing");
+                        assert!(status.is_none());
+                        let msg_cell = Cell(msg_cell.take());
+                        stream_watcher.close(||ignore(msg_cell.take()));
+                    }
                 }
-            }
+                loop_.run();
+                loop_.close();
+            };
+
+            let mut loop_ = loop_;
             loop_.run();
             loop_.close();
-        };
-
-        let mut loop_ = loop_;
-        loop_.run();
-        loop_.close();
+        }
     }
 }
diff --git a/src/libcore/rt/uv/timer.rs b/src/libcore/rt/uv/timer.rs
new file mode 100644
index 00000000000..5557a580987
--- /dev/null
+++ b/src/libcore/rt/uv/timer.rs
@@ -0,0 +1,183 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use libc::{c_void, c_int};
+use option::Some;
+use rt::uv::uvll;
+use rt::uv::{Watcher, Loop, NativeHandle, TimerCallback, NullCallback};
+use rt::uv::status_to_maybe_uv_error;
+
+pub struct TimerWatcher(*uvll::uv_timer_t);
+impl Watcher for TimerWatcher { }
+
+impl TimerWatcher {
+    pub fn new(loop_: &mut Loop) -> TimerWatcher {
+        unsafe {
+            let handle = uvll::malloc_handle(uvll::UV_TIMER);
+            assert!(handle.is_not_null());
+            assert!(0 == uvll::timer_init(loop_.native_handle(), handle));
+            let mut watcher: TimerWatcher = NativeHandle::from_native_handle(handle);
+            watcher.install_watcher_data();
+            return watcher;
+        }
+    }
+
+    pub fn start(&mut self, timeout: u64, repeat: u64, cb: TimerCallback) {
+        {
+            let data = self.get_watcher_data();
+            data.timer_cb = Some(cb);
+        }
+
+        unsafe {
+            uvll::timer_start(self.native_handle(), timer_cb, timeout, repeat);
+        }
+
+        extern fn timer_cb(handle: *uvll::uv_timer_t, status: c_int) {
+            let mut watcher: TimerWatcher = NativeHandle::from_native_handle(handle);
+            let data = watcher.get_watcher_data();
+            let cb = data.timer_cb.get_ref();
+            let status = status_to_maybe_uv_error(handle, status);
+            (*cb)(watcher, status);
+        }
+    }
+
+    pub fn stop(&mut self) {
+        unsafe {
+            uvll::timer_stop(self.native_handle());
+        }
+    }
+
+    pub fn close(self, cb: NullCallback) {
+        let mut watcher = self;
+        {
+            let data = watcher.get_watcher_data();
+            assert!(data.close_cb.is_none());
+            data.close_cb = Some(cb);
+        }
+
+        unsafe {
+            uvll::close(watcher.native_handle(), close_cb);
+        }
+
+        extern fn close_cb(handle: *uvll::uv_timer_t) {
+            let mut watcher: TimerWatcher = NativeHandle::from_native_handle(handle);
+            {
+                let data = watcher.get_watcher_data();
+                data.close_cb.swap_unwrap()();
+            }
+            watcher.drop_watcher_data();
+            unsafe {
+                uvll::free_handle(handle as *c_void);
+            }
+        }
+    }
+}
+
+impl NativeHandle<*uvll::uv_timer_t> for TimerWatcher {
+    fn from_native_handle(handle: *uvll::uv_timer_t) -> TimerWatcher {
+        TimerWatcher(handle)
+    }
+    fn native_handle(&self) -> *uvll::uv_idle_t {
+        match self { &TimerWatcher(ptr) => ptr }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use rt::uv::Loop;
+    use unstable::run_in_bare_thread;
+
+    #[test]
+    fn smoke_test() {
+        do run_in_bare_thread {
+            let mut count = 0;
+            let count_ptr: *mut int = &mut count;
+            let mut loop_ = Loop::new();
+            let mut timer = TimerWatcher::new(&mut loop_);
+            do timer.start(10, 0) |timer, status| {
+                assert!(status.is_none());
+                unsafe { *count_ptr += 1 };
+                timer.close(||());
+            }
+            loop_.run();
+            loop_.close();
+            assert!(count == 1);
+        }
+    }
+
+    #[test]
+    fn start_twice() {
+        do run_in_bare_thread {
+            let mut count = 0;
+            let count_ptr: *mut int = &mut count;
+            let mut loop_ = Loop::new();
+            let mut timer = TimerWatcher::new(&mut loop_);
+            do timer.start(10, 0) |timer, status| {
+                let mut timer = timer;
+                assert!(status.is_none());
+                unsafe { *count_ptr += 1 };
+                do timer.start(10, 0) |timer, status| {
+                    assert!(status.is_none());
+                    unsafe { *count_ptr += 1 };
+                    timer.close(||());
+                }
+            }
+            loop_.run();
+            loop_.close();
+            assert!(count == 2);
+        }
+    }
+
+    #[test]
+    fn repeat_stop() {
+        do run_in_bare_thread {
+            let mut count = 0;
+            let count_ptr: *mut int = &mut count;
+            let mut loop_ = Loop::new();
+            let mut timer = TimerWatcher::new(&mut loop_);
+            do timer.start(10, 20) |timer, status| {
+                assert!(status.is_none());
+                unsafe {
+                    *count_ptr += 1;
+
+                    if *count_ptr == 10 {
+
+                        // Stop the timer and do something else
+                        let mut timer = timer;
+                        timer.stop();
+                        // Freeze timer so it can be captured
+                        let timer = timer;
+
+                        let mut loop_ = timer.event_loop();
+                        let mut timer2 = TimerWatcher::new(&mut loop_);
+                        do timer2.start(10, 0) |timer2, _| {
+
+                            unsafe { *count_ptr += 1; }
+
+                            timer2.close(||());
+
+                            // Restart the original timer
+                            let mut timer = timer;
+                            do timer.start(10, 0) |timer, _| {
+                                unsafe { *count_ptr += 1; }
+                                timer.close(||());
+                            }
+                        }
+                    }
+                };
+            }
+            loop_.run();
+            loop_.close();
+            assert!(count == 12);
+        }
+    }
+
+}
diff --git a/src/libcore/rt/uvio.rs b/src/libcore/rt/uv/uvio.rs
index 24bffd8d1cd..ce4eb6aff87 100644
--- a/src/libcore/rt/uvio.rs
+++ b/src/libcore/rt/uv/uvio.rs
@@ -10,20 +10,24 @@
 
 use option::*;
 use result::*;
-
-use super::io::net::ip::IpAddr;
-use super::uv::*;
-use super::rtio::*;
 use ops::Drop;
 use old_iter::CopyableIter;
 use cell::{Cell, empty_cell};
 use cast::transmute;
-use super::sched::{Scheduler, local_sched};
+use clone::Clone;
+use rt::io::IoError;
+use rt::io::net::ip::IpAddr;
+use rt::uv::*;
+use rt::uv::idle::IdleWatcher;
+use rt::rtio::*;
+use rt::sched::{Scheduler, local_sched};
+use rt::io::{standard_error, OtherIoError};
+use rt::tube::Tube;
 
 #[cfg(test)] use container::Container;
 #[cfg(test)] use uint;
 #[cfg(test)] use unstable::run_in_bare_thread;
-#[cfg(test)] use super::test::*;
+#[cfg(test)] use rt::test::*;
 
 pub struct UvEventLoop {
     uvio: UvIoFactory
@@ -64,7 +68,16 @@ impl EventLoop for UvEventLoop {
             assert!(status.is_none());
             let mut idle_watcher = idle_watcher;
             idle_watcher.stop();
-            idle_watcher.close();
+            idle_watcher.close(||());
+            f();
+        }
+    }
+
+    fn callback_ms(&mut self, ms: u64, f: ~fn()) {
+        let mut timer =  TimerWatcher::new(self.uvio.uv_loop());
+        do timer.start(ms, 0) |timer, status| {
+            assert!(status.is_none());
+            timer.close(||());
             f();
         }
     }
@@ -100,11 +113,11 @@ impl IoFactory for UvIoFactory {
     // Connect to an address and return a new stream
     // NB: This blocks the task waiting on the connection.
     // It would probably be better to return a future
-    fn connect(&mut self, addr: IpAddr) -> Option<~StreamObject> {
+    fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError> {
         // Create a cell in the task to hold the result. We will fill
         // the cell before resuming the task.
         let result_cell = empty_cell();
-        let result_cell_ptr: *Cell<Option<~StreamObject>> = &result_cell;
+        let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
 
         let scheduler = local_sched::take();
         assert!(scheduler.in_task_context());
@@ -122,21 +135,26 @@ impl IoFactory for UvIoFactory {
             // Wait for a connection
             do tcp_watcher.connect(addr) |stream_watcher, status| {
                 rtdebug!("connect: in connect callback");
-                let maybe_stream = if status.is_none() {
+                if status.is_none() {
                     rtdebug!("status is none");
-                    Some(~UvStream(stream_watcher))
+                    let res = Ok(~UvTcpStream { watcher: stream_watcher });
+
+                    // Store the stream in the task's stack
+                    unsafe { (*result_cell_ptr).put_back(res); }
+
+                    // Context switch
+                    let scheduler = local_sched::take();
+                    scheduler.resume_task_immediately(task_cell.take());
                 } else {
                     rtdebug!("status is some");
-                    stream_watcher.close(||());
-                    None
+                    let task_cell = Cell(task_cell.take());
+                    do stream_watcher.close {
+                        let res = Err(uv_error_to_io_error(status.get()));
+                        unsafe { (*result_cell_ptr).put_back(res); }
+                        let scheduler = local_sched::take();
+                        scheduler.resume_task_immediately(task_cell.take());
+                    }
                 };
-
-                // Store the stream in the task's stack
-                unsafe { (*result_cell_ptr).put_back(maybe_stream); }
-
-                // Context switch
-                let scheduler = local_sched::take();
-                scheduler.resume_task_immediately(task_cell.take());
             }
         }
 
@@ -144,103 +162,124 @@ impl IoFactory for UvIoFactory {
         return result_cell.take();
     }
 
-    fn bind(&mut self, addr: IpAddr) -> Option<~TcpListenerObject> {
+    fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError> {
         let mut watcher = TcpWatcher::new(self.uv_loop());
-        watcher.bind(addr);
-        return Some(~UvTcpListener(watcher));
+        match watcher.bind(addr) {
+            Ok(_) => Ok(~UvTcpListener::new(watcher)),
+            Err(uverr) => {
+                let scheduler = local_sched::take();
+                do scheduler.deschedule_running_task_and_then |task| {
+                    let task_cell = Cell(task);
+                    do watcher.as_stream().close {
+                        let scheduler = local_sched::take();
+                        scheduler.resume_task_immediately(task_cell.take());
+                    }
+                }
+                Err(uv_error_to_io_error(uverr))
+            }
+        }
     }
 }
 
-pub struct UvTcpListener(TcpWatcher);
+// FIXME #6090: Prefer newtype structs but Drop doesn't work
+pub struct UvTcpListener {
+    watcher: TcpWatcher,
+    listening: bool,
+    incoming_streams: Tube<Result<~RtioTcpStreamObject, IoError>>
+}
 
 impl UvTcpListener {
-    fn watcher(&self) -> TcpWatcher {
-        match self { &UvTcpListener(w) => w }
+    fn new(watcher: TcpWatcher) -> UvTcpListener {
+        UvTcpListener {
+            watcher: watcher,
+            listening: false,
+            incoming_streams: Tube::new()
+        }
     }
 
-    fn close(&self) {
-        // XXX: Need to wait until close finishes before returning
-        self.watcher().as_stream().close(||());
-    }
+    fn watcher(&self) -> TcpWatcher { self.watcher }
 }
 
 impl Drop for UvTcpListener {
     fn finalize(&self) {
-        // XXX: Again, this never gets called. Use .close() instead
-        //self.watcher().as_stream().close(||());
+        let watcher = self.watcher();
+        let scheduler = local_sched::take();
+        do scheduler.deschedule_running_task_and_then |task| {
+            let task_cell = Cell(task);
+            do watcher.as_stream().close {
+                let scheduler = local_sched::take();
+                scheduler.resume_task_immediately(task_cell.take());
+            }
+        }
     }
 }
 
-impl TcpListener for UvTcpListener {
+impl RtioTcpListener for UvTcpListener {
 
-    fn listen(&mut self) -> Option<~StreamObject> {
+    fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
         rtdebug!("entering listen");
-        let result_cell = empty_cell();
-        let result_cell_ptr: *Cell<Option<~StreamObject>> = &result_cell;
-
-        let server_tcp_watcher = self.watcher();
 
-        let scheduler = local_sched::take();
-        assert!(scheduler.in_task_context());
+        if self.listening {
+            return self.incoming_streams.recv();
+        }
 
-        do scheduler.deschedule_running_task_and_then |task| {
-            let task_cell = Cell(task);
-            let mut server_tcp_watcher = server_tcp_watcher;
-            do server_tcp_watcher.listen |server_stream_watcher, status| {
-                let maybe_stream = if status.is_none() {
-                    let mut server_stream_watcher = server_stream_watcher;
-                    let mut loop_ = loop_from_watcher(&server_stream_watcher);
-                    let client_tcp_watcher = TcpWatcher::new(&mut loop_).as_stream();
-                    // XXX: Needs to be surfaced in interface
-                    server_stream_watcher.accept(client_tcp_watcher);
-                    Some(~UvStream::new(client_tcp_watcher))
-                } else {
-                    None
-                };
+        self.listening = true;
 
-                unsafe { (*result_cell_ptr).put_back(maybe_stream); }
+        let server_tcp_watcher = self.watcher();
+        let incoming_streams_cell = Cell(self.incoming_streams.clone());
+
+        let incoming_streams_cell = Cell(incoming_streams_cell.take());
+        let mut server_tcp_watcher = server_tcp_watcher;
+        do server_tcp_watcher.listen |server_stream_watcher, status| {
+            let maybe_stream = if status.is_none() {
+                let mut server_stream_watcher = server_stream_watcher;
+                let mut loop_ = server_stream_watcher.event_loop();
+                let client_tcp_watcher = TcpWatcher::new(&mut loop_);
+                let client_tcp_watcher = client_tcp_watcher.as_stream();
+                // XXX: Need's to be surfaced in interface
+                server_stream_watcher.accept(client_tcp_watcher);
+                Ok(~UvTcpStream { watcher: client_tcp_watcher })
+            } else {
+                Err(standard_error(OtherIoError))
+            };
 
-                rtdebug!("resuming task from listen");
-                // Context switch
-                let scheduler = local_sched::take();
-                scheduler.resume_task_immediately(task_cell.take());
-            }
+            let mut incoming_streams = incoming_streams_cell.take();
+            incoming_streams.send(maybe_stream);
+            incoming_streams_cell.put_back(incoming_streams);
         }
 
-        assert!(!result_cell.is_empty());
-        return result_cell.take();
+        return self.incoming_streams.recv();
     }
 }
 
-pub struct UvStream(StreamWatcher);
-
-impl UvStream {
-    fn new(watcher: StreamWatcher) -> UvStream {
-        UvStream(watcher)
-    }
-
-    fn watcher(&self) -> StreamWatcher {
-        match self { &UvStream(w) => w }
-    }
+// FIXME #6090: Prefer newtype structs but Drop doesn't work
+pub struct UvTcpStream {
+    watcher: StreamWatcher
+}
 
-    // XXX: finalize isn't working for ~UvStream???
-    fn close(&self) {
-        // XXX: Need to wait until this finishes before returning
-        self.watcher().close(||());
-    }
+impl UvTcpStream {
+    fn watcher(&self) -> StreamWatcher { self.watcher }
 }
 
-impl Drop for UvStream {
+impl Drop for UvTcpStream {
     fn finalize(&self) {
-        rtdebug!("closing stream");
-        //self.watcher().close(||());
+        rtdebug!("closing tcp stream");
+        let watcher = self.watcher();
+        let scheduler = local_sched::take();
+        do scheduler.deschedule_running_task_and_then |task| {
+            let task_cell = Cell(task);
+            do watcher.close {
+                let scheduler = local_sched::take();
+                scheduler.resume_task_immediately(task_cell.take());
+            }
+        }
     }
 }
 
-impl Stream for UvStream {
-    fn read(&mut self, buf: &mut [u8]) -> Result<uint, ()> {
+impl RtioTcpStream for UvTcpStream {
+    fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
         let result_cell = empty_cell();
-        let result_cell_ptr: *Cell<Result<uint, ()>> = &result_cell;
+        let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
 
         let scheduler = local_sched::take();
         assert!(scheduler.in_task_context());
@@ -271,7 +310,7 @@ impl Stream for UvStream {
                     assert!(nread >= 0);
                     Ok(nread as uint)
                 } else {
-                    Err(())
+                    Err(uv_error_to_io_error(status.unwrap()))
                 };
 
                 unsafe { (*result_cell_ptr).put_back(result); }
@@ -285,9 +324,9 @@ impl Stream for UvStream {
         return result_cell.take();
     }
 
-    fn write(&mut self, buf: &[u8]) -> Result<(), ()> {
+    fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
         let result_cell = empty_cell();
-        let result_cell_ptr: *Cell<Result<(), ()>> = &result_cell;
+        let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
         let scheduler = local_sched::take();
         assert!(scheduler.in_task_context());
         let watcher = self.watcher();
@@ -295,14 +334,12 @@ impl Stream for UvStream {
         do scheduler.deschedule_running_task_and_then |task| {
             let mut watcher = watcher;
             let task_cell = Cell(task);
-            let buf = unsafe { &*buf_ptr };
-            // XXX: OMGCOPIES
-            let buf = buf.to_vec();
+            let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
             do watcher.write(buf) |_watcher, status| {
                 let result = if status.is_none() {
                     Ok(())
                 } else {
-                    Err(())
+                    Err(uv_error_to_io_error(status.unwrap()))
                 };
 
                 unsafe { (*result_cell_ptr).put_back(result); }
@@ -320,10 +357,12 @@ impl Stream for UvStream {
 #[test]
 fn test_simple_io_no_connect() {
     do run_in_newsched_task {
-        let io = unsafe { local_sched::unsafe_borrow_io() };
-        let addr = next_test_ip4();
-        let maybe_chan = io.connect(addr);
-        assert!(maybe_chan.is_none());
+        unsafe {
+            let io = local_sched::unsafe_borrow_io();
+            let addr = next_test_ip4();
+            let maybe_chan = (*io).tcp_connect(addr);
+            assert!(maybe_chan.is_err());
+        }
     }
 }
 
@@ -336,8 +375,8 @@ fn test_simple_tcp_server_and_client() {
         do spawntask_immediately {
             unsafe {
                 let io = local_sched::unsafe_borrow_io();
-                let mut listener = io.bind(addr).unwrap();
-                let mut stream = listener.listen().unwrap();
+                let mut listener = (*io).tcp_bind(addr).unwrap();
+                let mut stream = listener.accept().unwrap();
                 let mut buf = [0, .. 2048];
                 let nread = stream.read(buf).unwrap();
                 assert!(nread == 8);
@@ -345,17 +384,14 @@ fn test_simple_tcp_server_and_client() {
                     rtdebug!("%u", buf[i] as uint);
                     assert!(buf[i] == i as u8);
                 }
-                stream.close();
-                listener.close();
             }
         }
 
         do spawntask_immediately {
             unsafe {
                 let io = local_sched::unsafe_borrow_io();
-                let mut stream = io.connect(addr).unwrap();
+                let mut stream = (*io).tcp_connect(addr).unwrap();
                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-                stream.close();
             }
         }
     }
@@ -368,8 +404,8 @@ fn test_read_and_block() {
 
         do spawntask_immediately {
             let io = unsafe { local_sched::unsafe_borrow_io() };
-            let mut listener = io.bind(addr).unwrap();
-            let mut stream = listener.listen().unwrap();
+            let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() };
+            let mut stream = listener.accept().unwrap();
             let mut buf = [0, .. 2048];
 
             let expected = 32;
@@ -392,26 +428,24 @@ fn test_read_and_block() {
                 do scheduler.deschedule_running_task_and_then |task| {
                     let task = Cell(task);
                     do local_sched::borrow |scheduler| {
-                        scheduler.task_queue.push_back(task.take());
+                        scheduler.enqueue_task(task.take());
                     }
                 }
             }
 
             // Make sure we had multiple reads
             assert!(reads > 1);
-
-            stream.close();
-            listener.close();
         }
 
         do spawntask_immediately {
-            let io = unsafe { local_sched::unsafe_borrow_io() };
-            let mut stream = io.connect(addr).unwrap();
-            stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-            stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-            stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-            stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-            stream.close();
+            unsafe {
+                let io = local_sched::unsafe_borrow_io();
+                let mut stream = (*io).tcp_connect(addr).unwrap();
+                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+            }
         }
 
     }
@@ -426,34 +460,33 @@ fn test_read_read_read() {
         do spawntask_immediately {
             unsafe {
                 let io = local_sched::unsafe_borrow_io();
-                let mut listener = io.bind(addr).unwrap();
-                let mut stream = listener.listen().unwrap();
+                let mut listener = (*io).tcp_bind(addr).unwrap();
+                let mut stream = listener.accept().unwrap();
                 let buf = [1, .. 2048];
                 let mut total_bytes_written = 0;
                 while total_bytes_written < MAX {
                     stream.write(buf);
                     total_bytes_written += buf.len();
                 }
-                stream.close();
-                listener.close();
             }
         }
 
         do spawntask_immediately {
-            let io = unsafe { local_sched::unsafe_borrow_io() };
-            let mut stream = io.connect(addr).unwrap();
-            let mut buf = [0, .. 2048];
-            let mut total_bytes_read = 0;
-            while total_bytes_read < MAX {
-                let nread = stream.read(buf).unwrap();
-                rtdebug!("read %u bytes", nread as uint);
-                total_bytes_read += nread;
-                for uint::range(0, nread) |i| {
-                    assert!(buf[i] == 1);
+            unsafe {
+                let io = local_sched::unsafe_borrow_io();
+                let mut stream = (*io).tcp_connect(addr).unwrap();
+                let mut buf = [0, .. 2048];
+                let mut total_bytes_read = 0;
+                while total_bytes_read < MAX {
+                    let nread = stream.read(buf).unwrap();
+                    rtdebug!("read %u bytes", nread as uint);
+                    total_bytes_read += nread;
+                    for uint::range(0, nread) |i| {
+                        assert!(buf[i] == 1);
+                    }
                 }
+                rtdebug!("read %u bytes total", total_bytes_read as uint);
             }
-            rtdebug!("read %u bytes total", total_bytes_read as uint);
-            stream.close();
         }
     }
 }
diff --git a/src/libcore/rt/uvll.rs b/src/libcore/rt/uv/uvll.rs
index 0d298bde6b5..94e1703b263 100644
--- a/src/libcore/rt/uvll.rs
+++ b/src/libcore/rt/uv/uvll.rs
@@ -33,6 +33,15 @@ use libc::{size_t, c_int, c_uint, c_void, c_char, uintptr_t};
 use libc::{malloc, free};
 use prelude::*;
 
+pub static UNKNOWN: c_int = -1;
+pub static OK: c_int = 0;
+pub static EOF: c_int = 1;
+pub static EADDRINFO: c_int = 2;
+pub static EACCES: c_int = 3;
+pub static ECONNREFUSED: c_int = 12;
+pub static ECONNRESET: c_int = 13;
+pub static EPIPE: c_int = 36;
+
 pub struct uv_err_t {
     code: c_int,
     sys_errno_: c_int
@@ -260,9 +269,9 @@ pub unsafe fn buf_init(input: *u8, len: uint) -> uv_buf_t {
 pub unsafe fn timer_init(loop_ptr: *c_void, timer_ptr: *uv_timer_t) -> c_int {
     return rust_uv_timer_init(loop_ptr, timer_ptr);
 }
-pub unsafe fn timer_start(timer_ptr: *uv_timer_t, cb: *u8, timeout: uint,
-                          repeat: uint) -> c_int {
-    return rust_uv_timer_start(timer_ptr, cb, timeout as c_uint, repeat as c_uint);
+pub unsafe fn timer_start(timer_ptr: *uv_timer_t, cb: *u8, timeout: u64,
+                          repeat: u64) -> c_int {
+    return rust_uv_timer_start(timer_ptr, cb, timeout, repeat);
 }
 pub unsafe fn timer_stop(timer_ptr: *uv_timer_t) -> c_int {
     return rust_uv_timer_stop(timer_ptr);
@@ -423,8 +432,8 @@ extern {
                           timer_handle: *uv_timer_t) -> c_int;
     fn rust_uv_timer_start(timer_handle: *uv_timer_t,
                            cb: *u8,
-                           timeout: c_uint,
-                           repeat: c_uint) -> c_int;
+                           timeout: libc::uint64_t,
+                           repeat: libc::uint64_t) -> c_int;
     fn rust_uv_timer_stop(handle: *uv_timer_t) -> c_int;
 
     fn rust_uv_malloc_buf_base_of(sug_size: size_t) -> *u8;
diff --git a/src/libcore/rt/work_queue.rs b/src/libcore/rt/work_queue.rs
index 495cd75a0bf..f82b5847ef2 100644
--- a/src/libcore/rt/work_queue.rs
+++ b/src/libcore/rt/work_queue.rs
@@ -46,4 +46,8 @@ pub impl<T> WorkQueue<T> {
             None
         }
     }
+
+    fn is_empty(&self) -> bool {
+        return self.queue.is_empty();
+    }
 }
diff --git a/src/libcore/sys.rs b/src/libcore/sys.rs
index 4eca7ebbb37..50a739ec67d 100644
--- a/src/libcore/sys.rs
+++ b/src/libcore/sys.rs
@@ -202,10 +202,12 @@ impl FailWithCause for &'static str {
 
 // FIXME #4427: Temporary until rt::rt_fail_ goes away
 pub fn begin_unwind_(msg: *c_char, file: *c_char, line: size_t) -> ! {
-    use rt::{context, OldTaskContext};
-    use rt::local_services::unsafe_borrow_local_services;
+    use option::Option;
+    use rt::{context, OldTaskContext, TaskContext};
+    use rt::local_services::{unsafe_borrow_local_services, Unwinder};
 
-    match context() {
+    let context = context();
+    match context {
         OldTaskContext => {
             unsafe {
                 gc::cleanup_stack_for_failure();
@@ -214,11 +216,26 @@ pub fn begin_unwind_(msg: *c_char, file: *c_char, line: size_t) -> ! {
             }
         }
         _ => {
-            // XXX: Need to print the failure message
-            gc::cleanup_stack_for_failure();
             unsafe {
+                // XXX: Bad re-allocations. fail! needs some refactoring
+                let msg = str::raw::from_c_str(msg);
+                let file = str::raw::from_c_str(file);
+
+                let outmsg = fmt!("%s at line %i of file %s", msg, line as int, file);
+
+                // XXX: Logging doesn't work correctly in non-task context because it
+                // invokes the local heap
+                if context == TaskContext {
+                    error!(outmsg);
+                } else {
+                    rtdebug!("%s", outmsg);
+                }
+
+                gc::cleanup_stack_for_failure();
+
                 let local_services = unsafe_borrow_local_services();
-                match local_services.unwinder {
+                let unwinder: &mut Option<Unwinder> = &mut (*local_services).unwinder;
+                match *unwinder {
                     Some(ref mut unwinder) => unwinder.begin_unwind(),
                     None => abort!("failure without unwinder. aborting process")
                 }
diff --git a/src/libcore/task/local_data_priv.rs b/src/libcore/task/local_data_priv.rs
index 766815a5e90..be4e639e94c 100644
--- a/src/libcore/task/local_data_priv.rs
+++ b/src/libcore/task/local_data_priv.rs
@@ -36,7 +36,7 @@ impl Handle {
                 }
                 _ => {
                     let local_services = unsafe_borrow_local_services();
-                    NewHandle(&mut local_services.storage)
+                    NewHandle(&mut (*local_services).storage)
                 }
             }
         }
diff --git a/src/libcore/task/mod.rs b/src/libcore/task/mod.rs
index d57bd5528bc..a6edee38e18 100644
--- a/src/libcore/task/mod.rs
+++ b/src/libcore/task/mod.rs
@@ -43,6 +43,7 @@ use task::rt::{task_id, sched_id};
 use util;
 use util::replace;
 use unstable::finally::Finally;
+use rt::{context, OldTaskContext};
 
 #[cfg(test)] use comm::SharedChan;
 
@@ -558,23 +559,33 @@ pub fn get_scheduler() -> Scheduler {
  * ~~~
  */
 pub unsafe fn unkillable<U>(f: &fn() -> U) -> U {
-    let t = rt::rust_get_task();
-    do (|| {
-        rt::rust_task_inhibit_kill(t);
+    if context() == OldTaskContext {
+        let t = rt::rust_get_task();
+        do (|| {
+            rt::rust_task_inhibit_kill(t);
+            f()
+        }).finally {
+            rt::rust_task_allow_kill(t);
+        }
+    } else {
+        // FIXME #6377
         f()
-    }).finally {
-        rt::rust_task_allow_kill(t);
     }
 }
 
 /// The inverse of unkillable. Only ever to be used nested in unkillable().
 pub unsafe fn rekillable<U>(f: &fn() -> U) -> U {
-    let t = rt::rust_get_task();
-    do (|| {
-        rt::rust_task_allow_kill(t);
+    if context() == OldTaskContext {
+        let t = rt::rust_get_task();
+        do (|| {
+            rt::rust_task_allow_kill(t);
+            f()
+        }).finally {
+            rt::rust_task_inhibit_kill(t);
+        }
+    } else {
+        // FIXME #6377
         f()
-    }).finally {
-        rt::rust_task_inhibit_kill(t);
     }
 }
 
@@ -583,14 +594,19 @@ pub unsafe fn rekillable<U>(f: &fn() -> U) -> U {
  * For use with exclusive ARCs, which use pthread mutexes directly.
  */
 pub unsafe fn atomically<U>(f: &fn() -> U) -> U {
-    let t = rt::rust_get_task();
-    do (|| {
-        rt::rust_task_inhibit_kill(t);
-        rt::rust_task_inhibit_yield(t);
+    if context() == OldTaskContext {
+        let t = rt::rust_get_task();
+        do (|| {
+            rt::rust_task_inhibit_kill(t);
+            rt::rust_task_inhibit_yield(t);
+            f()
+        }).finally {
+            rt::rust_task_allow_yield(t);
+            rt::rust_task_allow_kill(t);
+        }
+    } else {
+        // FIXME #6377
         f()
-    }).finally {
-        rt::rust_task_allow_yield(t);
-        rt::rust_task_allow_kill(t);
     }
 }
 
diff --git a/src/libcore/task/spawn.rs b/src/libcore/task/spawn.rs
index fc38702bc16..5f9642604d0 100644
--- a/src/libcore/task/spawn.rs
+++ b/src/libcore/task/spawn.rs
@@ -581,7 +581,7 @@ fn spawn_raw_newsched(_opts: TaskOpts, f: ~fn()) {
     use rt::sched::*;
 
     let mut sched = local_sched::take();
-    let task = ~Task::new(&mut sched.stack_pool, f);
+    let task = ~Coroutine::new(&mut sched.stack_pool, f);
     sched.schedule_new_task(task);
 }
 
diff --git a/src/libcore/unstable/lang.rs b/src/libcore/unstable/lang.rs
index 8153c2d43d9..1249392484d 100644
--- a/src/libcore/unstable/lang.rs
+++ b/src/libcore/unstable/lang.rs
@@ -16,12 +16,12 @@ use libc::{c_char, c_uchar, c_void, size_t, uintptr_t, c_int, STDERR_FILENO};
 use managed::raw::BoxRepr;
 use str;
 use sys;
-use unstable::exchange_alloc;
 use cast::transmute;
 use rt::{context, OldTaskContext};
 use rt::local_services::borrow_local_services;
 use option::{Option, Some, None};
 use io;
+use rt::global_heap;
 
 #[allow(non_camel_case_types)]
 pub type rust_task = c_void;
@@ -153,7 +153,7 @@ unsafe fn fail_borrowed(box: *mut BoxRepr, file: *c_char, line: size_t) {
 #[lang="exchange_malloc"]
 #[inline(always)]
 pub unsafe fn exchange_malloc(td: *c_char, size: uintptr_t) -> *c_char {
-    transmute(exchange_alloc::malloc(transmute(td), transmute(size)))
+    transmute(global_heap::malloc(transmute(td), transmute(size)))
 }
 
 /// Because this code is so perf. sensitive, use a static constant so that
@@ -233,7 +233,7 @@ impl DebugPrints for io::fd_t {
 #[lang="exchange_free"]
 #[inline(always)]
 pub unsafe fn exchange_free(ptr: *c_char) {
-    exchange_alloc::free(transmute(ptr))
+    global_heap::free(transmute(ptr))
 }
 
 #[lang="malloc"]
@@ -423,18 +423,31 @@ pub unsafe fn strdup_uniq(ptr: *c_uchar, len: uint) -> ~str {
 #[lang="start"]
 pub fn start(main: *u8, argc: int, argv: **c_char,
              crate_map: *u8) -> int {
-    use libc::getenv;
-    use rt::start;
+    use rt;
+    use sys::Closure;
+    use ptr;
+    use cast;
+    use os;
 
     unsafe {
-        let use_old_rt = do str::as_c_str("RUST_NEWRT") |s| {
-            getenv(s).is_null()
-        };
+        let use_old_rt = os::getenv("RUST_NEWRT").is_none();
         if use_old_rt {
             return rust_start(main as *c_void, argc as c_int, argv,
                               crate_map as *c_void) as int;
         } else {
-            return start(main, argc, argv, crate_map);
+            return do rt::start(argc, argv as **u8, crate_map) {
+                unsafe {
+                    // `main` is an `fn() -> ()` that doesn't take an environment
+                    // XXX: Could also call this as an `extern "Rust" fn` once they work
+                    let main = Closure {
+                        code: main as *(),
+                        env: ptr::null(),
+                    };
+                    let mainfn: &fn() = cast::transmute(main);
+
+                    mainfn();
+                }
+            };
         }
     }
 
diff --git a/src/libcore/unstable/mod.rs b/src/libcore/unstable/mod.rs
index bef7a7f87d3..18a6262f17d 100644
--- a/src/libcore/unstable/mod.rs
+++ b/src/libcore/unstable/mod.rs
@@ -19,7 +19,6 @@ pub mod at_exit;
 pub mod global;
 pub mod finally;
 pub mod weak_task;
-pub mod exchange_alloc;
 pub mod intrinsics;
 pub mod simd;
 pub mod extfmt;
diff --git a/src/libstd/uv_ll.rs b/src/libstd/uv_ll.rs
index 37052f7d1b7..bc7703ec30a 100644
--- a/src/libstd/uv_ll.rs
+++ b/src/libstd/uv_ll.rs
@@ -819,8 +819,8 @@ extern {
     unsafe fn rust_uv_timer_start(
         timer_handle: *uv_timer_t,
         cb: *u8,
-        timeout: libc::c_uint,
-        repeat: libc::c_uint) -> libc::c_int;
+        timeout: libc::uint64_t,
+        repeat: libc::uint64_t) -> libc::c_int;
     unsafe fn rust_uv_timer_stop(handle: *uv_timer_t) -> libc::c_int;
 
     unsafe fn rust_uv_getaddrinfo(loop_ptr: *libc::c_void,
@@ -1084,8 +1084,8 @@ pub unsafe fn timer_init(loop_ptr: *libc::c_void,
 }
 pub unsafe fn timer_start(timer_ptr: *uv_timer_t, cb: *u8, timeout: uint,
                       repeat: uint) -> libc::c_int {
-    return rust_uv_timer_start(timer_ptr, cb, timeout as libc::c_uint,
-                                    repeat as libc::c_uint);
+    return rust_uv_timer_start(timer_ptr, cb, timeout as libc::uint64_t,
+                               repeat as libc::uint64_t);
 }
 pub unsafe fn timer_stop(timer_ptr: *uv_timer_t) -> libc::c_int {
     return rust_uv_timer_stop(timer_ptr);
diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp
index 90328928122..1a64066b5a9 100644
--- a/src/rt/rust_builtin.cpp
+++ b/src/rt/rust_builtin.cpp
@@ -830,14 +830,14 @@ rust_get_rt_env() {
 }
 
 #ifndef _WIN32
-pthread_key_t sched_key;
+pthread_key_t rt_key = -1;
 #else
-DWORD sched_key;
+DWORD rt_key = -1;
 #endif
 
 extern "C" void*
-rust_get_sched_tls_key() {
-    return &sched_key;
+rust_get_rt_tls_key() {
+    return &rt_key;
 }
 
 // Initialize the global state required by the new scheduler
@@ -852,10 +852,10 @@ rust_initialize_global_state() {
     if (!initialized) {
 
 #ifndef _WIN32
-        assert(!pthread_key_create(&sched_key, NULL));
+        assert(!pthread_key_create(&rt_key, NULL));
 #else
-        sched_key = TlsAlloc();
-        assert(sched_key != TLS_OUT_OF_INDEXES);
+        rt_key = TlsAlloc();
+        assert(rt_key != TLS_OUT_OF_INDEXES);
 #endif
 
         initialized = true;
diff --git a/src/rt/rust_env.cpp b/src/rt/rust_env.cpp
index 360d6114928..ed38be3550f 100644
--- a/src/rt/rust_env.cpp
+++ b/src/rt/rust_env.cpp
@@ -13,6 +13,7 @@
 // that might come from the environment is loaded here, once, during
 // init.
 
+#include "sync/lock_and_signal.h"
 #include "rust_env.h"
 
 // The environment variables that the runtime knows about
@@ -26,6 +27,18 @@
 #define RUST_DEBUG_MEM "RUST_DEBUG_MEM"
 #define RUST_DEBUG_BORROW "RUST_DEBUG_BORROW"
 
+static lock_and_signal env_lock;
+
+extern "C" CDECL void
+rust_take_env_lock() {
+    env_lock.lock();
+}
+
+extern "C" CDECL void
+rust_drop_env_lock() {
+    env_lock.unlock();
+}
+
 #if defined(__WIN32__)
 static int
 get_num_cpus() {
@@ -119,6 +132,8 @@ copyenv(const char* name) {
 
 rust_env*
 load_env(int argc, char **argv) {
+    scoped_lock with(env_lock);
+
     rust_env *env = (rust_env*)malloc(sizeof(rust_env));
 
     env->num_sched_threads = (size_t)get_num_threads();
@@ -141,3 +156,4 @@ free_env(rust_env *env) {
     free(env->rust_seed);
     free(env);
 }
+
diff --git a/src/rt/rust_exchange_alloc.cpp b/src/rt/rust_exchange_alloc.cpp
index 5958c68f3e7..89257dc9f6e 100644
--- a/src/rt/rust_exchange_alloc.cpp
+++ b/src/rt/rust_exchange_alloc.cpp
@@ -15,14 +15,15 @@
 #include <string.h>
 #include <stdio.h>
 
-uintptr_t exchange_count = 0;
+extern uintptr_t rust_exchange_count;
+uintptr_t rust_exchange_count = 0;
 
 void *
 rust_exchange_alloc::malloc(size_t size) {
   void *value = ::malloc(size);
   assert(value);
 
-  sync::increment(exchange_count);
+  sync::increment(rust_exchange_count);
 
   return value;
 }
@@ -36,20 +37,15 @@ rust_exchange_alloc::realloc(void *ptr, size_t size) {
 
 void
 rust_exchange_alloc::free(void *ptr) {
-  sync::decrement(exchange_count);
+  sync::decrement(rust_exchange_count);
   ::free(ptr);
 }
 
-extern "C" uintptr_t *
-rust_get_exchange_count_ptr() {
-  return &exchange_count;
-}
-
 void
 rust_check_exchange_count_on_exit() {
-  if (exchange_count != 0) {
+  if (rust_exchange_count != 0) {
     printf("exchange heap not empty on exit\n");
-    printf("%d dangling allocations\n", (int)exchange_count);
+    printf("%d dangling allocations\n", (int)rust_exchange_count);
     abort();
   }
 }
diff --git a/src/rt/rust_log.cpp b/src/rt/rust_log.cpp
index c2b58c9fda7..df24f569495 100644
--- a/src/rt/rust_log.cpp
+++ b/src/rt/rust_log.cpp
@@ -324,6 +324,10 @@ void update_log_settings(void* crate_map, char* settings) {
     free(buffer);
 }
 
+extern "C" CDECL void
+rust_update_log_settings(void* crate_map, char* settings) {
+    update_log_settings(crate_map, settings);
+}
 
 //
 // Local Variables:
diff --git a/src/rt/rust_stack.cpp b/src/rt/rust_stack.cpp
index f07690a955e..a609ac57324 100644
--- a/src/rt/rust_stack.cpp
+++ b/src/rt/rust_stack.cpp
@@ -92,3 +92,14 @@ destroy_exchange_stack(rust_exchange_alloc *exchange, stk_seg *stk) {
     deregister_valgrind_stack(stk);
     exchange->free(stk);
 }
+
+
+extern "C" CDECL unsigned int
+rust_valgrind_stack_register(void *start, void *end) {
+  return VALGRIND_STACK_REGISTER(start, end);
+}
+
+extern "C" CDECL void
+rust_valgrind_stack_deregister(unsigned int id) {
+  VALGRIND_STACK_DEREGISTER(id);
+}
diff --git a/src/rt/rust_uv.cpp b/src/rt/rust_uv.cpp
index 8cf2bd4b4ac..fefcbbcacf7 100644
--- a/src/rt/rust_uv.cpp
+++ b/src/rt/rust_uv.cpp
@@ -229,7 +229,7 @@ rust_uv_timer_init(uv_loop_t* loop, uv_timer_t* timer) {
 
 extern "C" int
 rust_uv_timer_start(uv_timer_t* the_timer, uv_timer_cb cb,
-                        uint32_t timeout, uint32_t repeat) {
+                    int64_t timeout, int64_t repeat) {
     return uv_timer_start(the_timer, cb, timeout, repeat);
 }
 
diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in
index 6be41251f1b..cdc282440b8 100644
--- a/src/rt/rustrt.def.in
+++ b/src/rt/rustrt.def.in
@@ -195,8 +195,8 @@ rust_register_exit_function
 rust_get_global_data_ptr
 rust_inc_kernel_live_count
 rust_dec_kernel_live_count
-rust_get_exchange_count_ptr
-rust_get_sched_tls_key
+rust_exchange_count
+rust_get_rt_tls_key
 swap_registers
 rust_readdir
 rust_opendir
@@ -234,3 +234,8 @@ rust_try
 rust_begin_unwind
 rust_take_task_borrow_list
 rust_set_task_borrow_list
+rust_valgrind_stack_register
+rust_valgrind_stack_deregister
+rust_take_env_lock
+rust_drop_env_lock
+rust_update_log_settings
diff --git a/src/test/run-pass/core-rt-smoke.rs b/src/test/run-pass/core-rt-smoke.rs
new file mode 100644
index 00000000000..fb08cda3b25
--- /dev/null
+++ b/src/test/run-pass/core-rt-smoke.rs
@@ -0,0 +1,18 @@
+// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+// A simple test of starting the runtime manually
+
+#[start]
+fn start(argc: int, argv: **u8, crate_map: *u8) -> int {
+    do core::rt::start(argc, argv, crate_map) {
+        debug!("creating my own runtime is joy");
+    }
+}
\ No newline at end of file