about summary refs log tree commit diff
path: root/src/libcore
diff options
context:
space:
mode:
authorBrian Anderson <banderson@mozilla.com>2013-05-14 15:30:01 -0700
committerBrian Anderson <banderson@mozilla.com>2013-05-14 15:30:01 -0700
commitb04fce6a901f490a9df378c64166dda26e0297a3 (patch)
treed4c9a3489ee38b1cd9c4318b54b32663aedf8c17 /src/libcore
parent043d02213e19c5a5cffb781e5a11accbe28bf0de (diff)
parentee0ce64d9db10aebc491454b6595d6edf69fe513 (diff)
downloadrust-b04fce6a901f490a9df378c64166dda26e0297a3.tar.gz
rust-b04fce6a901f490a9df378c64166dda26e0297a3.zip
Merge remote-tracking branch 'brson/io-upstream' into incoming
Conflicts:
	src/libcore/logging.rs
	src/libcore/rt/local_services.rs
	src/libcore/rt/uv/mod.rs
	src/libcore/rt/uv/net.rs
	src/libcore/rt/uv/uvio.rs
	src/libcore/unstable.rs
Diffstat (limited to 'src/libcore')
-rw-r--r--src/libcore/core.rc5
-rw-r--r--src/libcore/logging.rs63
-rw-r--r--src/libcore/macros.rs16
-rw-r--r--src/libcore/os.rs2
-rw-r--r--src/libcore/rt/context.rs6
-rw-r--r--src/libcore/rt/global_heap.rs (renamed from src/libcore/unstable/exchange_alloc.rs)0
-rw-r--r--src/libcore/rt/io/file.rs6
-rw-r--r--src/libcore/rt/io/mod.rs23
-rw-r--r--src/libcore/rt/io/native/file.rs8
-rw-r--r--src/libcore/rt/io/net/tcp.rs254
-rw-r--r--src/libcore/rt/io/net/udp.rs4
-rw-r--r--src/libcore/rt/io/net/unix.rs4
-rw-r--r--src/libcore/rt/io/stdio.rs9
-rw-r--r--src/libcore/rt/local_sched.rs (renamed from src/libcore/rt/sched/local_sched.rs)54
-rw-r--r--src/libcore/rt/local_services.rs36
-rw-r--r--src/libcore/rt/logging.rs38
-rw-r--r--src/libcore/rt/mod.rs126
-rw-r--r--src/libcore/rt/rc.rs142
-rw-r--r--src/libcore/rt/rtio.rs24
-rw-r--r--src/libcore/rt/sched.rs (renamed from src/libcore/rt/sched/mod.rs)43
-rw-r--r--src/libcore/rt/stack.rs39
-rw-r--r--src/libcore/rt/test.rs42
-rw-r--r--src/libcore/rt/tube.rs184
-rw-r--r--src/libcore/rt/uv/file.rs10
-rw-r--r--src/libcore/rt/uv/idle.rs91
-rw-r--r--src/libcore/rt/uv/mod.rs340
-rw-r--r--src/libcore/rt/uv/net.rs301
-rw-r--r--src/libcore/rt/uv/uvio.rs (renamed from src/libcore/rt/uvio.rs)280
-rw-r--r--src/libcore/rt/uv/uvll.rs (renamed from src/libcore/rt/uvll.rs)7
-rw-r--r--src/libcore/sys.rs29
-rw-r--r--src/libcore/task/local_data_priv.rs2
-rw-r--r--src/libcore/unstable/lang.rs6
-rw-r--r--src/libcore/unstable/mod.rs1
33 files changed, 1537 insertions, 658 deletions
diff --git a/src/libcore/core.rc b/src/libcore/core.rc
index 96b5e1b781d..eb94e9ca028 100644
--- a/src/libcore/core.rc
+++ b/src/libcore/core.rc
@@ -205,8 +205,11 @@ mod unicode;
 #[path = "num/cmath.rs"]
 mod cmath;
 mod stackwalk;
+
+// XXX: This shouldn't be pub, and it should be reexported under 'unstable'
+// but name resolution doesn't work without it being pub.
 #[path = "rt/mod.rs"]
-mod rt;
+pub mod rt;
 
 // A curious inner-module that's not exported that contains the binding
 // 'core' so that macro-expanded references to core::error and such
diff --git a/src/libcore/logging.rs b/src/libcore/logging.rs
index cea827298af..b192333999a 100644
--- a/src/libcore/logging.rs
+++ b/src/libcore/logging.rs
@@ -10,17 +10,16 @@
 
 //! Logging
 
-pub mod rustrt {
-    use libc;
-
-    pub extern {
-        unsafe fn rust_log_console_on();
-        unsafe fn rust_log_console_off();
-        unsafe fn rust_log_str(level: u32,
-                               string: *libc::c_char,
-                               size: libc::size_t);
-    }
-}
+use option::*;
+use either::*;
+use rt;
+use rt::logging::{Logger, StdErrLogger};
+use io;
+use libc;
+use repr;
+use vec;
+use cast;
+use str;
 
 /// Turns on logging to stdout globally
 pub fn console_on() {
@@ -55,8 +54,46 @@ pub fn log_type<T>(level: u32, object: &T) {
     let bytes = do io::with_bytes_writer |writer| {
         repr::write_repr(writer, object);
     };
+
+    match rt::context() {
+        rt::OldTaskContext => {
+            unsafe {
+                let len = bytes.len() as libc::size_t;
+                rustrt::rust_log_str(level, cast::transmute(vec::raw::to_ptr(bytes)), len);
+            }
+        }
+        _ => {
+            // XXX: Bad allocation
+            let msg = str::from_bytes(bytes);
+            newsched_log_str(msg);
+        }
+    }
+}
+
+fn newsched_log_str(msg: ~str) {
     unsafe {
-        let len = bytes.len() as libc::size_t;
-        rustrt::rust_log_str(level, transmute(vec::raw::to_ptr(bytes)), len);
+        match rt::local_services::unsafe_try_borrow_local_services() {
+            Some(local) => {
+                // Use the available logger
+                (*local).logger.log(Left(msg));
+            }
+            None => {
+                // There is no logger anywhere, just write to stderr
+                let mut logger = StdErrLogger;
+                logger.log(Left(msg));
+            }
+        }
+    }
+}
+
+pub mod rustrt {
+    use libc;
+
+    pub extern {
+        unsafe fn rust_log_console_on();
+        unsafe fn rust_log_console_off();
+        unsafe fn rust_log_str(level: u32,
+                               string: *libc::c_char,
+                               size: libc::size_t);
     }
 }
diff --git a/src/libcore/macros.rs b/src/libcore/macros.rs
index b19a753b715..fda48b6ffb7 100644
--- a/src/libcore/macros.rs
+++ b/src/libcore/macros.rs
@@ -30,10 +30,24 @@ macro_rules! rtdebug (
     ($( $arg:expr),+) => ( $(let _ = $arg)*; )
 )
 
+macro_rules! rtassert (
+    ( $arg:expr ) => ( {
+        if !$arg {
+            abort!("assertion failed: %s", stringify!($arg));
+        }
+    } )
+)
+
 macro_rules! abort(
     ($( $msg:expr),+) => ( {
         rtdebug!($($msg),+);
 
-        unsafe { ::libc::abort(); }
+        do_abort();
+
+        // NB: This is in a fn to avoid putting the `unsafe` block in a macro,
+        // which causes spurious 'unnecessary unsafe block' warnings.
+        fn do_abort() -> ! {
+            unsafe { ::libc::abort(); }
+        }
     } )
 )
diff --git a/src/libcore/os.rs b/src/libcore/os.rs
index 9129b33fff5..93319efa3b7 100644
--- a/src/libcore/os.rs
+++ b/src/libcore/os.rs
@@ -722,7 +722,7 @@ pub fn list_dir(p: &Path) -> ~[~str] {
             use os::win32::{
                 as_utf16_p
             };
-            use unstable::exchange_alloc::{malloc_raw, free_raw};
+            use rt::global_heap::{malloc_raw, free_raw};
             #[nolink]
             extern {
                 unsafe fn rust_list_dir_wfd_size() -> libc::size_t;
diff --git a/src/libcore/rt/context.rs b/src/libcore/rt/context.rs
index 9c1e566f218..9c1612884f0 100644
--- a/src/libcore/rt/context.rs
+++ b/src/libcore/rt/context.rs
@@ -111,9 +111,9 @@ fn initialize_call_frame(regs: &mut Registers, fptr: *c_void, arg: *c_void, sp:
     let sp = align_down(sp);
     let sp = mut_offset(sp, -4);
 
-    unsafe { *sp = arg as uint; }
+    unsafe { *sp = arg as uint };
     let sp = mut_offset(sp, -1);
-    unsafe { *sp = 0; } // The final return address
+    unsafe { *sp = 0 }; // The final return address
 
     regs.esp = sp as u32;
     regs.eip = fptr as u32;
@@ -195,7 +195,7 @@ fn initialize_call_frame(regs: &mut Registers, fptr: *c_void, arg: *c_void, sp:
 
 fn align_down(sp: *mut uint) -> *mut uint {
     unsafe {
-        let sp = transmute::<*mut uint, uint>(sp);
+        let sp: uint = transmute(sp);
         let sp = sp & !(16 - 1);
         transmute::<uint, *mut uint>(sp)
     }
diff --git a/src/libcore/unstable/exchange_alloc.rs b/src/libcore/rt/global_heap.rs
index 3b35c2fb804..3b35c2fb804 100644
--- a/src/libcore/unstable/exchange_alloc.rs
+++ b/src/libcore/rt/global_heap.rs
diff --git a/src/libcore/rt/io/file.rs b/src/libcore/rt/io/file.rs
index 85dc180452f..1f61cf25fbd 100644
--- a/src/libcore/rt/io/file.rs
+++ b/src/libcore/rt/io/file.rs
@@ -10,7 +10,7 @@
 
 use prelude::*;
 use super::support::PathLike;
-use super::{Reader, Writer, Seek, Close};
+use super::{Reader, Writer, Seek};
 use super::SeekStyle;
 
 /// # XXX
@@ -69,10 +69,6 @@ impl Seek for FileStream {
     fn seek(&mut self, _pos: i64, _style: SeekStyle) { fail!() }
 }
 
-impl Close for FileStream {
-    fn close(&mut self) { fail!() }
-}
-
 #[test]
 #[ignore]
 fn super_simple_smoke_test_lets_go_read_some_files_and_have_a_good_time() {
diff --git a/src/libcore/rt/io/mod.rs b/src/libcore/rt/io/mod.rs
index fea32bc5b75..8f56005d0a4 100644
--- a/src/libcore/rt/io/mod.rs
+++ b/src/libcore/rt/io/mod.rs
@@ -238,6 +238,7 @@ Out of scope
 * How does I/O relate to the Iterator trait?
 * std::base64 filters
 * Using conditions is a big unknown since we don't have much experience with them
+* Too many uses of OtherIoError
 
 */
 
@@ -252,7 +253,9 @@ pub use self::stdio::println;
 
 pub use self::file::FileStream;
 pub use self::net::ip::IpAddr;
+#[cfg(not(stage0))]
 pub use self::net::tcp::TcpListener;
+#[cfg(not(stage0))]
 pub use self::net::tcp::TcpStream;
 pub use self::net::udp::UdpStream;
 
@@ -266,6 +269,7 @@ pub mod file;
 
 /// Synchronous, non-blocking network I/O.
 pub mod net {
+    #[cfg(not(stage0))]
     pub mod tcp;
     pub mod udp;
     pub mod ip;
@@ -326,12 +330,14 @@ pub struct IoError {
 
 #[deriving(Eq)]
 pub enum IoErrorKind {
+    PreviousIoError,
+    OtherIoError,
+    EndOfFile,
     FileNotFound,
-    FilePermission,
+    PermissionDenied,
     ConnectionFailed,
     Closed,
-    OtherIoError,
-    PreviousIoError
+    ConnectionRefused,
 }
 
 // XXX: Can't put doc comments on macros
@@ -383,16 +389,7 @@ pub trait Writer {
     fn flush(&mut self);
 }
 
-/// I/O types that may be closed
-///
-/// Any further operations performed on a closed resource will raise
-/// on `io_error`
-pub trait Close {
-    /// Close the I/O resource
-    fn close(&mut self);
-}
-
-pub trait Stream: Reader + Writer + Close { }
+pub trait Stream: Reader + Writer { }
 
 pub enum SeekStyle {
     /// Seek from the beginning of the stream
diff --git a/src/libcore/rt/io/native/file.rs b/src/libcore/rt/io/native/file.rs
index e203df815f2..31c90336a24 100644
--- a/src/libcore/rt/io/native/file.rs
+++ b/src/libcore/rt/io/native/file.rs
@@ -40,10 +40,6 @@ impl Writer for FileDesc {
     fn flush(&mut self) { fail!() }
 }
 
-impl Close for FileDesc {
-    fn close(&mut self) { fail!() }
-}
-
 impl Seek for FileDesc {
     fn tell(&self) -> u64 { fail!() }
 
@@ -72,10 +68,6 @@ impl Writer for CFile {
     fn flush(&mut self) { fail!() }
 }
 
-impl Close for CFile {
-    fn close(&mut self) { fail!() }
-}
-
 impl Seek for CFile {
     fn tell(&self) -> u64 { fail!() }
     fn seek(&mut self, _pos: i64, _style: SeekStyle) { fail!() }
diff --git a/src/libcore/rt/io/net/tcp.rs b/src/libcore/rt/io/net/tcp.rs
index c95b4344fe7..b4c021ed28f 100644
--- a/src/libcore/rt/io/net/tcp.rs
+++ b/src/libcore/rt/io/net/tcp.rs
@@ -8,67 +8,273 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use prelude::*;
-use super::super::*;
-use super::ip::IpAddr;
+use option::{Option, Some, None};
+use result::{Ok, Err};
+use rt::sched::local_sched::unsafe_borrow_io;
+use rt::io::net::ip::IpAddr;
+use rt::io::{Reader, Writer, Listener};
+use rt::io::io_error;
+use rt::rtio::{IoFactory,
+               RtioTcpListener, RtioTcpListenerObject,
+               RtioTcpStream, RtioTcpStreamObject};
 
-pub struct TcpStream;
+pub struct TcpStream {
+    rtstream: ~RtioTcpStreamObject
+}
 
 impl TcpStream {
-    pub fn connect(_addr: IpAddr) -> Option<TcpStream> {
-        fail!()
+    fn new(s: ~RtioTcpStreamObject) -> TcpStream {
+        TcpStream {
+            rtstream: s
+        }
+    }
+
+    pub fn connect(addr: IpAddr) -> Option<TcpStream> {
+        let stream = unsafe {
+            rtdebug!("borrowing io to connect");
+            let io = unsafe_borrow_io();
+            rtdebug!("about to connect");
+            (*io).tcp_connect(addr)
+        };
+
+        match stream {
+            Ok(s) => {
+                Some(TcpStream::new(s))
+            }
+            Err(ioerr) => {
+                rtdebug!("failed to connect: %?", ioerr);
+                io_error::cond.raise(ioerr);
+                return None;
+            }
+        }
     }
 }
 
 impl Reader for TcpStream {
-    fn read(&mut self, _buf: &mut [u8]) -> Option<uint> { fail!() }
+    fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
+        let bytes_read = self.rtstream.read(buf);
+        match bytes_read {
+            Ok(read) => Some(read),
+            Err(_) => {
+                abort!("XXX");
+            }
+        }
+    }
 
     fn eof(&mut self) -> bool { fail!() }
 }
 
 impl Writer for TcpStream {
-    fn write(&mut self, _buf: &[u8]) { fail!() }
+    fn write(&mut self, buf: &[u8]) {
+        let res = self.rtstream.write(buf);
+        match res {
+            Ok(_) => (),
+            Err(_) => {
+                abort!("XXX");
+            }
+        }
+    }
 
     fn flush(&mut self) { fail!() }
 }
 
-impl Close for TcpStream {
-    fn close(&mut self) { fail!() }
+pub struct TcpListener {
+    rtlistener: ~RtioTcpListenerObject,
 }
 
-pub struct TcpListener;
-
 impl TcpListener {
-    pub fn bind(_addr: IpAddr) -> Option<TcpListener> {
-        fail!()
+    pub fn bind(addr: IpAddr) -> Option<TcpListener> {
+        let listener = unsafe { (*unsafe_borrow_io()).tcp_bind(addr) };
+        match listener {
+            Ok(l) => {
+                Some(TcpListener {
+                    rtlistener: l
+                })
+            }
+            Err(ioerr) => {
+                io_error::cond.raise(ioerr);
+                return None;
+            }
+        }
     }
 }
 
 impl Listener<TcpStream> for TcpListener {
-    fn accept(&mut self) -> Option<TcpStream> { fail!() }
+    fn accept(&mut self) -> Option<TcpStream> {
+        let rtstream = self.rtlistener.accept();
+        match rtstream {
+            Ok(s) => {
+                Some(TcpStream::new(s))
+            }
+            Err(_) => {
+                abort!("XXX");
+            }
+        }
+    }
 }
 
 #[cfg(test)]
 mod test {
+    use super::*;
+    use int;
+    use cell::Cell;
+    use rt::test::*;
+    use rt::io::net::ip::Ipv4;
+    use rt::io::*;
 
     #[test] #[ignore]
+    fn bind_error() {
+        do run_in_newsched_task {
+            let mut called = false;
+            do io_error::cond.trap(|e| {
+                assert!(e.kind == PermissionDenied);
+                called = true;
+            }).in {
+                let addr = Ipv4(0, 0, 0, 0, 1);
+                let listener = TcpListener::bind(addr);
+                assert!(listener.is_none());
+            }
+            assert!(called);
+        }
+    }
+
+    #[test]
+    fn connect_error() {
+        do run_in_newsched_task {
+            let mut called = false;
+            do io_error::cond.trap(|e| {
+                assert!(e.kind == ConnectionRefused);
+                called = true;
+            }).in {
+                let addr = Ipv4(0, 0, 0, 0, 1);
+                let stream = TcpStream::connect(addr);
+                assert!(stream.is_none());
+            }
+            assert!(called);
+        }
+    }
+
+    #[test]
     fn smoke_test() {
-        /*do run_in_newsched_task {
+        do run_in_newsched_task {
             let addr = next_test_ip4();
 
-            do spawn_immediately {
-                let listener = TcpListener::bind(addr);
-                do listener.accept() {
+            do spawntask_immediately {
+                let mut listener = TcpListener::bind(addr);
+                let mut stream = listener.accept();
+                let mut buf = [0];
+                stream.read(buf);
+                assert!(buf[0] == 99);
+            }
+
+            do spawntask_immediately {
+                let mut stream = TcpStream::connect(addr);
+                stream.write([99]);
+            }
+        }
+    }
+
+    #[test]
+    fn multiple_connect_serial() {
+        do run_in_newsched_task {
+            let addr = next_test_ip4();
+            let max = 10;
+
+            do spawntask_immediately {
+                let mut listener = TcpListener::bind(addr);
+                for max.times {
+                    let mut stream = listener.accept();
                     let mut buf = [0];
-                    listener.read(buf);
+                    stream.read(buf);
                     assert!(buf[0] == 99);
                 }
             }
 
-            do spawn_immediately {
-                let stream = TcpStream::connect(addr);
-                stream.write([99]);
+            do spawntask_immediately {
+                for max.times {
+                    let mut stream = TcpStream::connect(addr);
+                    stream.write([99]);
+                }
             }
-        }*/
+        }
     }
+
+    #[test]
+    fn multiple_connect_interleaved_greedy_schedule() {
+        do run_in_newsched_task {
+            let addr = next_test_ip4();
+            static MAX: int = 10;
+
+            do spawntask_immediately {
+                let mut listener = TcpListener::bind(addr);
+                for int::range(0, MAX) |i| {
+                    let stream = Cell(listener.accept());
+                    rtdebug!("accepted");
+                    // Start another task to handle the connection
+                    do spawntask_immediately {
+                        let mut stream = stream.take();
+                        let mut buf = [0];
+                        stream.read(buf);
+                        assert!(buf[0] == i as u8);
+                        rtdebug!("read");
+                    }
+                }
+            }
+
+            connect(0, addr);
+
+            fn connect(i: int, addr: IpAddr) {
+                if i == MAX { return }
+
+                do spawntask_immediately {
+                    rtdebug!("connecting");
+                    let mut stream = TcpStream::connect(addr);
+                    // Connect again before writing
+                    connect(i + 1, addr);
+                    rtdebug!("writing");
+                    stream.write([i as u8]);
+                }
+            }
+        }
+    }
+
+    #[test]
+    fn multiple_connect_interleaved_lazy_schedule() {
+        do run_in_newsched_task {
+            let addr = next_test_ip4();
+            static MAX: int = 10;
+
+            do spawntask_immediately {
+                let mut listener = TcpListener::bind(addr);
+                for int::range(0, MAX) |_| {
+                    let stream = Cell(listener.accept());
+                    rtdebug!("accepted");
+                    // Start another task to handle the connection
+                    do spawntask_later {
+                        let mut stream = stream.take();
+                        let mut buf = [0];
+                        stream.read(buf);
+                        assert!(buf[0] == 99);
+                        rtdebug!("read");
+                    }
+                }
+            }
+
+            connect(0, addr);
+
+            fn connect(i: int, addr: IpAddr) {
+                if i == MAX { return }
+
+                do spawntask_later {
+                    rtdebug!("connecting");
+                    let mut stream = TcpStream::connect(addr);
+                    // Connect again before writing
+                    connect(i + 1, addr);
+                    rtdebug!("writing");
+                    stream.write([99]);
+                }
+            }
+        }
+    }
+
 }
diff --git a/src/libcore/rt/io/net/udp.rs b/src/libcore/rt/io/net/udp.rs
index 1f1254a7029..bb5457e334d 100644
--- a/src/libcore/rt/io/net/udp.rs
+++ b/src/libcore/rt/io/net/udp.rs
@@ -32,10 +32,6 @@ impl Writer for UdpStream {
     fn flush(&mut self) { fail!() }
 }
 
-impl Close for UdpStream {
-    fn close(&mut self) { fail!() }
-}
-
 pub struct UdpListener;
 
 impl UdpListener {
diff --git a/src/libcore/rt/io/net/unix.rs b/src/libcore/rt/io/net/unix.rs
index f449a857467..b85b7dd059d 100644
--- a/src/libcore/rt/io/net/unix.rs
+++ b/src/libcore/rt/io/net/unix.rs
@@ -32,10 +32,6 @@ impl Writer for UnixStream {
     fn flush(&mut self) { fail!() }
 }
 
-impl Close for UnixStream {
-    fn close(&mut self) { fail!() }
-}
-
 pub struct UnixListener;
 
 impl UnixListener {
diff --git a/src/libcore/rt/io/stdio.rs b/src/libcore/rt/io/stdio.rs
index 26950986f7a..247fe954408 100644
--- a/src/libcore/rt/io/stdio.rs
+++ b/src/libcore/rt/io/stdio.rs
@@ -9,7 +9,7 @@
 // except according to those terms.
 
 use prelude::*;
-use super::{Reader, Writer, Close};
+use super::{Reader, Writer};
 
 pub fn stdin() -> StdReader { fail!() }
 
@@ -39,10 +39,6 @@ impl Reader for StdReader {
     fn eof(&mut self) -> bool { fail!() }
 }
 
-impl Close for StdReader {
-    fn close(&mut self) { fail!() }
-}
-
 pub struct StdWriter;
 
 impl StdWriter {
@@ -55,6 +51,3 @@ impl Writer for StdWriter {
     fn flush(&mut self) { fail!() }
 }
 
-impl Close for StdWriter {
-    fn close(&mut self) { fail!() }
-}
diff --git a/src/libcore/rt/sched/local_sched.rs b/src/libcore/rt/local_sched.rs
index a7e02f30e01..eb35eb7881d 100644
--- a/src/libcore/rt/sched/local_sched.rs
+++ b/src/libcore/rt/local_sched.rs
@@ -13,18 +13,21 @@
 use prelude::*;
 use ptr::mut_null;
 use libc::c_void;
-use cast::transmute;
+use cast;
+use cell::Cell;
 
-use super::Scheduler;
-use super::super::rtio::IoFactoryObject;
-use tls = super::super::thread_local_storage;
-#[cfg(test)] use super::super::uvio::UvEventLoop;
+use rt::sched::Scheduler;
+use rt::rtio::{EventLoop, IoFactoryObject};
+use tls = rt::thread_local_storage;
+use unstable::finally::Finally;
+
+#[cfg(test)] use rt::uv::uvio::UvEventLoop;
 
 /// Give the Scheduler to thread-local storage
 pub fn put(sched: ~Scheduler) {
     unsafe {
         let key = tls_key();
-        let void_sched: *mut c_void = transmute::<~Scheduler, *mut c_void>(sched);
+        let void_sched: *mut c_void = cast::transmute(sched);
         tls::set(key, void_sched);
     }
 }
@@ -34,8 +37,8 @@ pub fn take() -> ~Scheduler {
     unsafe {
         let key = tls_key();
         let void_sched: *mut c_void = tls::get(key);
-        assert!(void_sched.is_not_null());
-        let sched = transmute::<*mut c_void, ~Scheduler>(void_sched);
+        rtassert!(void_sched.is_not_null());
+        let sched: ~Scheduler = cast::transmute(void_sched);
         tls::set(key, mut_null());
         return sched;
     }
@@ -55,8 +58,18 @@ pub fn exists() -> bool {
 /// While the scheduler is borrowed it is not available in TLS.
 pub fn borrow(f: &fn(&mut Scheduler)) {
     let mut sched = take();
-    f(sched);
-    put(sched);
+
+    // XXX: Need a different abstraction from 'finally' here to avoid unsafety
+    unsafe {
+        let unsafe_sched = cast::transmute_mut_region(&mut *sched);
+        let sched = Cell(sched);
+
+        do (|| {
+            f(unsafe_sched);
+        }).finally {
+            put(sched.take());
+        }
+    }
 }
 
 /// Borrow a mutable reference to the thread-local Scheduler
@@ -65,23 +78,22 @@ pub fn borrow(f: &fn(&mut Scheduler)) {
 ///
 /// Because this leaves the Scheduler in thread-local storage it is possible
 /// For the Scheduler pointer to be aliased
-pub unsafe fn unsafe_borrow() -> &mut Scheduler {
+pub unsafe fn unsafe_borrow() -> *mut Scheduler {
     let key = tls_key();
     let mut void_sched: *mut c_void = tls::get(key);
-    assert!(void_sched.is_not_null());
+    rtassert!(void_sched.is_not_null());
     {
-        let void_sched_ptr = &mut void_sched;
-        let sched: &mut ~Scheduler = {
-            transmute::<&mut *mut c_void, &mut ~Scheduler>(void_sched_ptr)
-        };
-        let sched: &mut Scheduler = &mut **sched;
+        let sched: *mut *mut c_void = &mut void_sched;
+        let sched: *mut ~Scheduler = sched as *mut ~Scheduler;
+        let sched: *mut Scheduler = &mut **sched;
         return sched;
     }
 }
 
-pub unsafe fn unsafe_borrow_io() -> &mut IoFactoryObject {
+pub unsafe fn unsafe_borrow_io() -> *mut IoFactoryObject {
     let sched = unsafe_borrow();
-    return sched.event_loop.io().unwrap();
+    let io: *mut IoFactoryObject = (*sched).event_loop.io().unwrap();
+    return io;
 }
 
 fn tls_key() -> tls::Key {
@@ -91,7 +103,7 @@ fn tls_key() -> tls::Key {
 fn maybe_tls_key() -> Option<tls::Key> {
     unsafe {
         let key: *mut c_void = rust_get_sched_tls_key();
-        let key: &mut tls::Key = transmute(key);
+        let key: &mut tls::Key = cast::transmute(key);
         let key = *key;
         // Check that the key has been initialized.
 
@@ -105,7 +117,7 @@ fn maybe_tls_key() -> Option<tls::Key> {
         // another thread. I think this is fine since the only action
         // they could take if it was initialized would be to check the
         // thread-local value and see that it's not set.
-        if key != 0 {
+        if key != -1 {
             return Some(key);
         } else {
             return None;
diff --git a/src/libcore/rt/local_services.rs b/src/libcore/rt/local_services.rs
index bc945707e62..98bfc2fa168 100644
--- a/src/libcore/rt/local_services.rs
+++ b/src/libcore/rt/local_services.rs
@@ -23,19 +23,19 @@ use libc::{c_void, uintptr_t};
 use cast::transmute;
 use super::sched::local_sched;
 use super::local_heap::LocalHeap;
+use rt::logging::StdErrLogger;
 
 pub struct LocalServices {
     heap: LocalHeap,
     gc: GarbageCollector,
     storage: LocalStorage,
-    logger: Logger,
+    logger: StdErrLogger,
     unwinder: Option<Unwinder>,
     destroyed: bool
 }
 
 pub struct GarbageCollector;
 pub struct LocalStorage(*c_void, Option<~fn(*c_void)>);
-pub struct Logger;
 
 pub struct Unwinder {
     unwinding: bool,
@@ -47,7 +47,7 @@ impl LocalServices {
             heap: LocalHeap::new(),
             gc: GarbageCollector,
             storage: LocalStorage(ptr::null(), None),
-            logger: Logger,
+            logger: StdErrLogger,
             unwinder: Some(Unwinder { unwinding: false }),
             destroyed: false
         }
@@ -58,7 +58,7 @@ impl LocalServices {
             heap: LocalHeap::new(),
             gc: GarbageCollector,
             storage: LocalStorage(ptr::null(), None),
-            logger: Logger,
+            logger: StdErrLogger,
             unwinder: None,
             destroyed: false
         }
@@ -169,19 +169,27 @@ pub fn borrow_local_services(f: &fn(&mut LocalServices)) {
     }
 }
 
-pub unsafe fn unsafe_borrow_local_services() -> &mut LocalServices {
-    use cast::transmute_mut_region;
-
-    match local_sched::unsafe_borrow().current_task {
+pub unsafe fn unsafe_borrow_local_services() -> *mut LocalServices {
+    match (*local_sched::unsafe_borrow()).current_task {
         Some(~ref mut task) => {
-            transmute_mut_region(&mut task.local_services)
+            let s: *mut LocalServices = &mut task.local_services;
+            return s;
         }
         None => {
-            fail!("no local services for schedulers yet")
+            // Don't fail. Infinite recursion
+            abort!("no local services for schedulers yet")
         }
     }
 }
 
+pub unsafe fn unsafe_try_borrow_local_services() -> Option<*mut LocalServices> {
+    if local_sched::exists() {
+        Some(unsafe_borrow_local_services())
+    } else {
+        None
+    }
+}
+
 #[cfg(test)]
 mod test {
     use rt::test::*;
@@ -229,4 +237,12 @@ mod test {
             let _ = r.next();
         }
     }
+
+    #[test]
+    fn logging() {
+        do run_in_newsched_task() {
+            info!("here i am. logging in a newsched task");
+        }
+    }
 }
+
diff --git a/src/libcore/rt/logging.rs b/src/libcore/rt/logging.rs
new file mode 100644
index 00000000000..4ed09fd829f
--- /dev/null
+++ b/src/libcore/rt/logging.rs
@@ -0,0 +1,38 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use either::*;
+
+pub trait Logger {
+    fn log(&mut self, msg: Either<~str, &'static str>);
+}
+
+pub struct StdErrLogger;
+
+impl Logger for StdErrLogger {
+    fn log(&mut self, msg: Either<~str, &'static str>) {
+        use io::{Writer, WriterUtil};
+
+        let s: &str = match msg {
+            Left(ref s) => {
+                let s: &str = *s;
+                s
+            }
+            Right(ref s) => {
+                let s: &str = *s;
+                s
+            }
+        };
+        let dbg = ::libc::STDERR_FILENO as ::io::fd_t;
+        dbg.write_str(s);
+        dbg.write_str("\n");
+        dbg.flush();
+    }
+}
\ No newline at end of file
diff --git a/src/libcore/rt/mod.rs b/src/libcore/rt/mod.rs
index fbbc8274340..f04c38f79e8 100644
--- a/src/libcore/rt/mod.rs
+++ b/src/libcore/rt/mod.rs
@@ -8,40 +8,143 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-/*! The Rust runtime, including the scheduler and I/O interface */
+/*! The Rust Runtime, including the task scheduler and I/O
+
+The `rt` module provides the private runtime infrastructure necessary
+to support core language features like the exchange and local heap,
+the garbage collector, logging, local data and unwinding. It also
+implements the default task scheduler and task model. Initialization
+routines are provided for setting up runtime resources in common
+configurations, including that used by `rustc` when generating
+executables.
+
+It is intended that the features provided by `rt` can be factored in a
+way such that the core library can be built with different 'profiles'
+for different use cases, e.g. excluding the task scheduler. A number
+of runtime features though are critical to the functioning of the
+language and an implementation must be provided regardless of the
+execution environment.
+
+Of foremost importance is the global exchange heap, in the module
+`global_heap`. Very little practical Rust code can be written without
+access to the global heap. Unlike most of `rt` the global heap is
+truly a global resource and generally operates independently of the
+rest of the runtime.
+
+All other runtime features are 'local', either thread-local or
+task-local.  Those critical to the functioning of the language are
+defined in the module `local_services`. Local services are those which
+are expected to be available to Rust code generally but rely on
+thread- or task-local state. These currently include the local heap,
+the garbage collector, local storage, logging and the stack unwinder.
+Local services are primarily implemented for tasks, but may also
+be implemented for use outside of tasks.
+
+The relationship between `rt` and the rest of the core library is
+not entirely clear yet and some modules will be moving into or
+out of `rt` as development proceeds.
+
+Several modules in `core` are clients of `rt`:
+
+* `core::task` - The user-facing interface to the Rust task model.
+* `core::task::local_data` - The interface to local data.
+* `core::gc` - The garbage collector.
+* `core::unstable::lang` - Miscellaneous lang items, some of which rely on `core::rt`.
+* `core::condition` - Uses local data.
+* `core::cleanup` - Local heap destruction.
+* `core::io` - In the future `core::io` will use an `rt` implementation.
+* `core::logging`
+* `core::pipes`
+* `core::comm`
+* `core::stackwalk`
+
+*/
 
 #[doc(hidden)];
 
 use libc::c_char;
 use ptr::Ptr;
 
-#[path = "sched/mod.rs"]
+/// The global (exchange) heap.
+pub mod global_heap;
+
+/// The Scheduler and Task types.
 mod sched;
+
+/// Thread-local access to the current Scheduler.
+pub mod local_sched;
+
+/// Synchronous I/O.
+#[path = "io/mod.rs"]
+pub mod io;
+
+/// Thread-local implementations of language-critical runtime features like @.
+pub mod local_services;
+
+/// The EventLoop and internal synchronous I/O interface.
 mod rtio;
-pub mod uvll;
-mod uvio;
+
+/// libuv and default rtio implementation.
 #[path = "uv/mod.rs"]
-mod uv;
-#[path = "io/mod.rs"]
-mod io;
+pub mod uv;
+
 // FIXME #5248: The import in `sched` doesn't resolve unless this is pub!
+/// Bindings to pthread/windows thread-local storage.
 pub mod thread_local_storage;
+
+/// A parallel work-stealing dequeue.
 mod work_queue;
+
+/// Stack segments and caching.
 mod stack;
+
+/// CPU context swapping.
 mod context;
+
+/// Bindings to system threading libraries.
 mod thread;
+
+/// The runtime configuration, read from environment variables
 pub mod env;
-pub mod local_services;
+
+/// The local, managed heap
 mod local_heap;
 
+/// The Logger trait and implementations
+pub mod logging;
+
 /// Tools for testing the runtime
 #[cfg(test)]
 pub mod test;
 
+/// Reference counting
+pub mod rc;
+
+/// A simple single-threaded channel type for passing buffered data between
+/// scheduler and task context
+pub mod tube;
+
+/// Set up a default runtime configuration, given compiler-supplied arguments.
+///
+/// This is invoked by the `start` _language item_ (unstable::lang) to
+/// run a Rust executable.
+///
+/// # Arguments
+///
+/// * `main` - A C-abi function that takes no arguments and returns `c_void`.
+///   It is a wrapper around the user-defined `main` function, and will be run
+///   in a task.
+/// * `argc` & `argv` - The argument vector. On Unix this information is used
+///   by os::args.
+/// * `crate_map` - Runtime information about the executing crate, mostly for logging
+///
+/// # Return value
+///
+/// The return value is used as the process return code. 0 on success, 101 on error.
 pub fn start(main: *u8, _argc: int, _argv: **c_char, _crate_map: *u8) -> int {
 
     use self::sched::{Scheduler, Task};
-    use self::uvio::UvEventLoop;
+    use self::uv::uvio::UvEventLoop;
     use sys::Closure;
     use ptr;
     use cast;
@@ -72,6 +175,8 @@ pub fn start(main: *u8, _argc: int, _argv: **c_char, _crate_map: *u8) -> int {
 
 /// Possible contexts in which Rust code may be executing.
 /// Different runtime services are available depending on context.
+/// Mostly used for determining if we're using the new scheduler
+/// or the old scheduler.
 #[deriving(Eq)]
 pub enum RuntimeContext {
     // Only the exchange heap is available
@@ -84,6 +189,7 @@ pub enum RuntimeContext {
     OldTaskContext
 }
 
+/// Determine the current RuntimeContext
 pub fn context() -> RuntimeContext {
 
     use task::rt::rust_task;
@@ -119,7 +225,7 @@ pub fn context() -> RuntimeContext {
 fn test_context() {
     use unstable::run_in_bare_thread;
     use self::sched::{local_sched, Task};
-    use self::uvio::UvEventLoop;
+    use rt::uv::uvio::UvEventLoop;
     use cell::Cell;
 
     assert!(context() == OldTaskContext);
diff --git a/src/libcore/rt/rc.rs b/src/libcore/rt/rc.rs
new file mode 100644
index 00000000000..1c0c8c14fdf
--- /dev/null
+++ b/src/libcore/rt/rc.rs
@@ -0,0 +1,142 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! An owned, task-local, reference counted type
+//!
+//! # Safety note
+//!
+//! XXX There is currently no type-system mechanism for enforcing that
+//! reference counted types are both allocated on the exchange heap
+//! and also non-sendable
+//!
+//! This doesn't prevent borrowing multiple aliasable mutable pointers
+
+use ops::Drop;
+use clone::Clone;
+use libc::c_void;
+use cast;
+
+pub struct RC<T> {
+    p: *c_void // ~(uint, T)
+}
+
+impl<T> RC<T> {
+    pub fn new(val: T) -> RC<T> {
+        unsafe {
+            let v = ~(1, val);
+            let p: *c_void = cast::transmute(v);
+            RC { p: p }
+        }
+    }
+
+    fn get_mut_state(&mut self) -> *mut (uint, T) {
+        unsafe {
+            let p: &mut ~(uint, T) = cast::transmute(&mut self.p);
+            let p: *mut (uint, T) = &mut **p;
+            return p;
+        }
+    }
+
+    fn get_state(&self) -> *(uint, T) {
+        unsafe {
+            let p: &~(uint, T) = cast::transmute(&self.p);
+            let p: *(uint, T) = &**p;
+            return p;
+        }
+    }
+
+    pub fn unsafe_borrow_mut(&mut self) -> *mut T {
+        unsafe {
+            match *self.get_mut_state() {
+                (_, ref mut p) => {
+                    let p: *mut T = p;
+                    return p;
+                }
+            }
+        }
+    }
+
+    pub fn refcount(&self) -> uint {
+        unsafe {
+            match *self.get_state() {
+                (count, _) => count
+            }
+        }
+    }
+}
+
+#[unsafe_destructor]
+impl<T> Drop for RC<T> {
+    fn finalize(&self) {
+        assert!(self.refcount() > 0);
+
+        unsafe {
+            // XXX: Mutable finalizer
+            let this: &mut RC<T> = cast::transmute_mut(self);
+
+            match *this.get_mut_state() {
+                (ref mut count, _) => {
+                    *count = *count - 1
+                }
+            }
+
+            if this.refcount() == 0 {
+                let _: ~(uint, T) = cast::transmute(this.p);
+            }
+        }
+    }
+}
+
+impl<T> Clone for RC<T> {
+    fn clone(&self) -> RC<T> {
+        unsafe {
+            // XXX: Mutable clone
+            let this: &mut RC<T> = cast::transmute_mut(self);
+
+            match *this.get_mut_state() {
+                (ref mut count, _) => {
+                    *count = *count + 1;
+                }
+            }
+        }
+
+        RC { p: self.p }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::RC;
+
+    #[test]
+    fn smoke_test() {
+        unsafe {
+            let mut v1 = RC::new(100);
+            assert!(*v1.unsafe_borrow_mut() == 100);
+            assert!(v1.refcount() == 1);
+
+            let mut v2 = v1.clone();
+            assert!(*v2.unsafe_borrow_mut() == 100);
+            assert!(v2.refcount() == 2);
+
+            *v2.unsafe_borrow_mut() = 200;
+            assert!(*v2.unsafe_borrow_mut() == 200);
+            assert!(*v1.unsafe_borrow_mut() == 200);
+
+            let v3 = v2.clone();
+            assert!(v3.refcount() == 3);
+            {
+                let _v1 = v1;
+                let _v2 = v2;
+            }
+            assert!(v3.refcount() == 1);
+        }
+    }
+}
diff --git a/src/libcore/rt/rtio.rs b/src/libcore/rt/rtio.rs
index fd64438c61b..497ff8841b6 100644
--- a/src/libcore/rt/rtio.rs
+++ b/src/libcore/rt/rtio.rs
@@ -11,14 +11,16 @@
 use option::*;
 use result::*;
 
+use rt::io::IoError;
 use super::io::net::ip::IpAddr;
+use rt::uv::uvio;
 
 // XXX: ~object doesn't work currently so these are some placeholder
 // types to use instead
-pub type EventLoopObject = super::uvio::UvEventLoop;
-pub type IoFactoryObject = super::uvio::UvIoFactory;
-pub type StreamObject = super::uvio::UvStream;
-pub type TcpListenerObject = super::uvio::UvTcpListener;
+pub type EventLoopObject = uvio::UvEventLoop;
+pub type IoFactoryObject = uvio::UvIoFactory;
+pub type RtioTcpStreamObject = uvio::UvTcpStream;
+pub type RtioTcpListenerObject = uvio::UvTcpListener;
 
 pub trait EventLoop {
     fn run(&mut self);
@@ -28,15 +30,15 @@ pub trait EventLoop {
 }
 
 pub trait IoFactory {
-    fn connect(&mut self, addr: IpAddr) -> Option<~StreamObject>;
-    fn bind(&mut self, addr: IpAddr) -> Option<~TcpListenerObject>;
+    fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError>;
+    fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError>;
 }
 
-pub trait TcpListener {
-    fn listen(&mut self) -> Option<~StreamObject>;
+pub trait RtioTcpListener {
+    fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError>;
 }
 
-pub trait Stream {
-    fn read(&mut self, buf: &mut [u8]) -> Result<uint, ()>;
-    fn write(&mut self, buf: &[u8]) -> Result<(), ()>;
+pub trait RtioTcpStream {
+    fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError>;
+    fn write(&mut self, buf: &[u8]) -> Result<(), IoError>;
 }
diff --git a/src/libcore/rt/sched/mod.rs b/src/libcore/rt/sched.rs
index dda1f27550f..395f9099571 100644
--- a/src/libcore/rt/sched/mod.rs
+++ b/src/libcore/rt/sched.rs
@@ -19,7 +19,7 @@ use super::context::Context;
 use super::local_services::LocalServices;
 use cell::Cell;
 
-#[cfg(test)] use super::uvio::UvEventLoop;
+#[cfg(test)] use rt::uv::uvio::UvEventLoop;
 #[cfg(test)] use unstable::run_in_bare_thread;
 #[cfg(test)] use int;
 
@@ -106,6 +106,7 @@ pub impl Scheduler {
                 }
             }
 
+            let scheduler = &mut *scheduler;
             scheduler.event_loop.callback(run_scheduler_once);
             scheduler.event_loop.run();
         }
@@ -179,7 +180,7 @@ pub impl Scheduler {
         // Take pointers to both the task and scheduler's saved registers.
         unsafe {
             let sched = local_sched::unsafe_borrow();
-            let (sched_context, _, next_task_context) = sched.get_contexts();
+            let (sched_context, _, next_task_context) = (*sched).get_contexts();
             let next_task_context = next_task_context.unwrap();
             // Context switch to the task, restoring it's registers
             // and saving the scheduler's
@@ -187,10 +188,10 @@ pub impl Scheduler {
 
             let sched = local_sched::unsafe_borrow();
             // The running task should have passed ownership elsewhere
-            assert!(sched.current_task.is_none());
+            assert!((*sched).current_task.is_none());
 
             // Running tasks may have asked us to do some cleanup
-            sched.run_cleanup_job();
+            (*sched).run_cleanup_job();
         }
     }
 
@@ -208,21 +209,25 @@ pub impl Scheduler {
 
         rtdebug!("blocking task");
 
-        let blocked_task = this.current_task.swap_unwrap();
-        let f_fake_region = unsafe { transmute::<&fn(~Task), &fn(~Task)>(f) };
-        let f_opaque = ClosureConverter::from_fn(f_fake_region);
-        this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
+        unsafe {
+            let blocked_task = this.current_task.swap_unwrap();
+            let f_fake_region = transmute::<&fn(~Task), &fn(~Task)>(f);
+            let f_opaque = ClosureConverter::from_fn(f_fake_region);
+            this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
+        }
 
         local_sched::put(this);
 
-        let sched = unsafe { local_sched::unsafe_borrow() };
-        let (sched_context, last_task_context, _) = sched.get_contexts();
-        let last_task_context = last_task_context.unwrap();
-        Context::swap(last_task_context, sched_context);
+        unsafe {
+            let sched = local_sched::unsafe_borrow();
+            let (sched_context, last_task_context, _) = (*sched).get_contexts();
+            let last_task_context = last_task_context.unwrap();
+            Context::swap(last_task_context, sched_context);
 
-        // We could be executing in a different thread now
-        let sched = unsafe { local_sched::unsafe_borrow() };
-        sched.run_cleanup_job();
+            // We could be executing in a different thread now
+            let sched = local_sched::unsafe_borrow();
+            (*sched).run_cleanup_job();
+        }
     }
 
     /// Switch directly to another task, without going through the scheduler.
@@ -244,14 +249,14 @@ pub impl Scheduler {
 
         unsafe {
             let sched = local_sched::unsafe_borrow();
-            let (_, last_task_context, next_task_context) = sched.get_contexts();
+            let (_, last_task_context, next_task_context) = (*sched).get_contexts();
             let last_task_context = last_task_context.unwrap();
             let next_task_context = next_task_context.unwrap();
             Context::swap(last_task_context, next_task_context);
 
             // We could be executing in a different thread now
             let sched = local_sched::unsafe_borrow();
-            sched.run_cleanup_job();
+            (*sched).run_cleanup_job();
         }
     }
 
@@ -356,10 +361,10 @@ pub impl Task {
             // have asked us to do some cleanup.
             unsafe {
                 let sched = local_sched::unsafe_borrow();
-                sched.run_cleanup_job();
+                (*sched).run_cleanup_job();
 
                 let sched = local_sched::unsafe_borrow();
-                let task = sched.current_task.get_mut_ref();
+                let task = (*sched).current_task.get_mut_ref();
                 // FIXME #6141: shouldn't neet to put `start()` in another closure
                 task.local_services.run(||start());
             }
diff --git a/src/libcore/rt/stack.rs b/src/libcore/rt/stack.rs
index 3a4e9307d3b..019540ce76b 100644
--- a/src/libcore/rt/stack.rs
+++ b/src/libcore/rt/stack.rs
@@ -11,21 +11,36 @@
 use container::Container;
 use ptr::Ptr;
 use vec;
+use ops::Drop;
+use libc::{c_uint, uintptr_t};
 
 pub struct StackSegment {
-    buf: ~[u8]
+    buf: ~[u8],
+    valgrind_id: c_uint
 }
 
 pub impl StackSegment {
     fn new(size: uint) -> StackSegment {
-        // Crate a block of uninitialized values
-        let mut stack = vec::with_capacity(size);
         unsafe {
+            // Crate a block of uninitialized values
+            let mut stack = vec::with_capacity(size);
             vec::raw::set_len(&mut stack, size);
+
+            let mut stk = StackSegment {
+                buf: stack,
+                valgrind_id: 0
+            };
+
+            // XXX: Using the FFI to call a C macro. Slow
+            stk.valgrind_id = rust_valgrind_stack_register(stk.start(), stk.end());
+            return stk;
         }
+    }
 
-        StackSegment {
-            buf: stack
+    /// Point to the low end of the allocated stack
+    fn start(&self) -> *uint {
+        unsafe {
+            vec::raw::to_ptr(self.buf) as *uint
         }
     }
 
@@ -37,6 +52,15 @@ pub impl StackSegment {
     }
 }
 
+impl Drop for StackSegment {
+    fn finalize(&self) {
+        unsafe {
+            // XXX: Using the FFI to call a C macro. Slow
+            rust_valgrind_stack_deregister(self.valgrind_id);
+        }
+    }
+}
+
 pub struct StackPool(());
 
 impl StackPool {
@@ -49,3 +73,8 @@ impl StackPool {
     fn give_segment(&self, _stack: StackSegment) {
     }
 }
+
+extern {
+    fn rust_valgrind_stack_register(start: *uintptr_t, end: *uintptr_t) -> c_uint;
+    fn rust_valgrind_stack_deregister(id: c_uint);
+}
diff --git a/src/libcore/rt/test.rs b/src/libcore/rt/test.rs
index 0c6843c605d..8d0ae0caf4d 100644
--- a/src/libcore/rt/test.rs
+++ b/src/libcore/rt/test.rs
@@ -19,7 +19,7 @@ use rt::local_services::LocalServices;
 pub fn run_in_newsched_task(f: ~fn()) {
     use unstable::run_in_bare_thread;
     use super::sched::Task;
-    use super::uvio::UvEventLoop;
+    use rt::uv::uvio::UvEventLoop;
 
     let f = Cell(f);
 
@@ -64,6 +64,46 @@ pub fn spawntask_immediately(f: ~fn()) {
     }
 }
 
+/// Create a new task and run it right now. Aborts on failure
+pub fn spawntask_later(f: ~fn()) {
+    use super::sched::*;
+
+    let mut sched = local_sched::take();
+    let task = ~Task::with_local(&mut sched.stack_pool,
+                                 LocalServices::without_unwinding(),
+                                 f);
+
+    sched.task_queue.push_front(task);
+    local_sched::put(sched);
+}
+
+/// Spawn a task and either run it immediately or run it later
+pub fn spawntask_random(f: ~fn()) {
+    use super::sched::*;
+    use rand::{Rand, rng};
+
+    let mut rng = rng();
+    let run_now: bool = Rand::rand(&mut rng);
+
+    let mut sched = local_sched::take();
+    let task = ~Task::with_local(&mut sched.stack_pool,
+                                 LocalServices::without_unwinding(),
+                                 f);
+
+    if run_now {
+        do sched.switch_running_tasks_and_then(task) |task| {
+            let task = Cell(task);
+            do local_sched::borrow |sched| {
+                sched.task_queue.push_front(task.take());
+            }
+        }
+    } else {
+        sched.task_queue.push_front(task);
+        local_sched::put(sched);
+    }
+}
+
+
 /// Spawn a task and wait for it to finish, returning whether it completed successfully or failed
 pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
     use cell::Cell;
diff --git a/src/libcore/rt/tube.rs b/src/libcore/rt/tube.rs
new file mode 100644
index 00000000000..8e7bf72fa63
--- /dev/null
+++ b/src/libcore/rt/tube.rs
@@ -0,0 +1,184 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! A very simple unsynchronized channel type for sending buffered data from
+//! scheduler context to task context.
+//!
+//! XXX: This would be safer to use if split into two types like Port/Chan
+
+use option::*;
+use clone::Clone;
+use super::rc::RC;
+use rt::sched::Task;
+use rt::{context, TaskContext, SchedulerContext};
+use rt::local_sched;
+use vec::OwnedVector;
+use container::Container;
+
+struct TubeState<T> {
+    blocked_task: Option<~Task>,
+    buf: ~[T]
+}
+
+pub struct Tube<T> {
+    p: RC<TubeState<T>>
+}
+
+impl<T> Tube<T> {
+    pub fn new() -> Tube<T> {
+        Tube {
+            p: RC::new(TubeState {
+                blocked_task: None,
+                buf: ~[]
+            })
+        }
+    }
+
+    pub fn send(&mut self, val: T) {
+        rtdebug!("tube send");
+        assert!(context() == SchedulerContext);
+
+        unsafe {
+            let state = self.p.unsafe_borrow_mut();
+            (*state).buf.push(val);
+
+            if (*state).blocked_task.is_some() {
+                // There's a waiting task. Wake it up
+                rtdebug!("waking blocked tube");
+                let task = (*state).blocked_task.swap_unwrap();
+                let sched = local_sched::take();
+                sched.resume_task_immediately(task);
+            }
+        }
+    }
+
+    pub fn recv(&mut self) -> T {
+        assert!(context() == TaskContext);
+
+        unsafe {
+            let state = self.p.unsafe_borrow_mut();
+            if !(*state).buf.is_empty() {
+                return (*state).buf.shift();
+            } else {
+                // Block and wait for the next message
+                rtdebug!("blocking on tube recv");
+                assert!(self.p.refcount() > 1); // There better be somebody to wake us up
+                assert!((*state).blocked_task.is_none());
+                let sched = local_sched::take();
+                do sched.deschedule_running_task_and_then |task| {
+                    (*state).blocked_task = Some(task);
+                }
+                rtdebug!("waking after tube recv");
+                let buf = &mut (*state).buf;
+                assert!(!buf.is_empty());
+                return buf.shift();
+            }
+        }
+    }
+}
+
+impl<T> Clone for Tube<T> {
+    fn clone(&self) -> Tube<T> {
+        Tube { p: self.p.clone() }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use int;
+    use cell::Cell;
+    use rt::local_sched;
+    use rt::test::*;
+    use rt::rtio::EventLoop;
+    use super::*;
+
+    #[test]
+    fn simple_test() {
+        do run_in_newsched_task {
+            let mut tube: Tube<int> = Tube::new();
+            let tube_clone = tube.clone();
+            let tube_clone_cell = Cell(tube_clone);
+            let sched = local_sched::take();
+            do sched.deschedule_running_task_and_then |task| {
+                let mut tube_clone = tube_clone_cell.take();
+                tube_clone.send(1);
+                let sched = local_sched::take();
+                sched.resume_task_immediately(task);
+            }
+
+            assert!(tube.recv() == 1);
+        }
+    }
+
+    #[test]
+    fn blocking_test() {
+        do run_in_newsched_task {
+            let mut tube: Tube<int> = Tube::new();
+            let tube_clone = tube.clone();
+            let tube_clone = Cell(Cell(Cell(tube_clone)));
+            let sched = local_sched::take();
+            do sched.deschedule_running_task_and_then |task| {
+                let tube_clone = tube_clone.take();
+                do local_sched::borrow |sched| {
+                    let tube_clone = tube_clone.take();
+                    do sched.event_loop.callback {
+                        let mut tube_clone = tube_clone.take();
+                        // The task should be blocked on this now and
+                        // sending will wake it up.
+                        tube_clone.send(1);
+                    }
+                }
+                let sched = local_sched::take();
+                sched.resume_task_immediately(task);
+            }
+
+            assert!(tube.recv() == 1);
+        }
+    }
+
+    #[test]
+    fn many_blocking_test() {
+        static MAX: int = 100;
+
+        do run_in_newsched_task {
+            let mut tube: Tube<int> = Tube::new();
+            let tube_clone = tube.clone();
+            let tube_clone = Cell(tube_clone);
+            let sched = local_sched::take();
+            do sched.deschedule_running_task_and_then |task| {
+                callback_send(tube_clone.take(), 0);
+
+                fn callback_send(tube: Tube<int>, i: int) {
+                    if i == 100 { return; }
+
+                    let tube = Cell(Cell(tube));
+                    do local_sched::borrow |sched| {
+                        let tube = tube.take();
+                        do sched.event_loop.callback {
+                            let mut tube = tube.take();
+                            // The task should be blocked on this now and
+                            // sending will wake it up.
+                            tube.send(i);
+                            callback_send(tube, i + 1);
+                        }
+                    }
+                }
+
+                let sched = local_sched::take();
+                sched.resume_task_immediately(task);
+            }
+
+            for int::range(0, MAX) |i| {
+                let j = tube.recv();
+                assert!(j == i);
+            }
+        }
+    }
+}
diff --git a/src/libcore/rt/uv/file.rs b/src/libcore/rt/uv/file.rs
index a4aef7485d7..2d145055097 100644
--- a/src/libcore/rt/uv/file.rs
+++ b/src/libcore/rt/uv/file.rs
@@ -11,15 +11,11 @@
 use prelude::*;
 use ptr::null;
 use libc::c_void;
-use super::{UvError, Callback, Request, NativeHandle, Loop};
-use super::super::uvll;
-use super::super::uvll::*;
-
-pub type FsCallback = ~fn(FsRequest, Option<UvError>);
-impl Callback for FsCallback { }
+use rt::uv::{Request, NativeHandle, Loop, FsCallback};
+use rt::uv::uvll;
+use rt::uv::uvll::*;
 
 pub struct FsRequest(*uvll::uv_fs_t);
-
 impl Request for FsRequest;
 
 impl FsRequest {
diff --git a/src/libcore/rt/uv/idle.rs b/src/libcore/rt/uv/idle.rs
new file mode 100644
index 00000000000..fecb9391caa
--- /dev/null
+++ b/src/libcore/rt/uv/idle.rs
@@ -0,0 +1,91 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use libc::c_int;
+use option::Some;
+use rt::uv::uvll;
+use rt::uv::{Watcher, Loop, NativeHandle, IdleCallback, NullCallback};
+use rt::uv::status_to_maybe_uv_error;
+
+pub struct IdleWatcher(*uvll::uv_idle_t);
+impl Watcher for IdleWatcher { }
+
+pub impl IdleWatcher {
+    fn new(loop_: &mut Loop) -> IdleWatcher {
+        unsafe {
+            let handle = uvll::idle_new();
+            assert!(handle.is_not_null());
+            assert!(0 == uvll::idle_init(loop_.native_handle(), handle));
+            let mut watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
+            watcher.install_watcher_data();
+            return watcher
+        }
+    }
+
+    fn start(&mut self, cb: IdleCallback) {
+        {
+            let data = self.get_watcher_data();
+            data.idle_cb = Some(cb);
+        }
+
+        unsafe {
+            assert!(0 == uvll::idle_start(self.native_handle(), idle_cb))
+        };
+
+        extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) {
+            let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
+            let data = idle_watcher.get_watcher_data();
+            let cb: &IdleCallback = data.idle_cb.get_ref();
+            let status = status_to_maybe_uv_error(handle, status);
+            (*cb)(idle_watcher, status);
+        }
+    }
+
+    fn stop(&mut self) {
+        // NB: Not resetting the Rust idle_cb to None here because `stop` is likely
+        // called from *within* the idle callback, causing a use after free
+
+        unsafe {
+            assert!(0 == uvll::idle_stop(self.native_handle()));
+        }
+    }
+
+    fn close(self, cb: NullCallback) {
+        {
+            let mut this = self;
+            let data = this.get_watcher_data();
+            assert!(data.close_cb.is_none());
+            data.close_cb = Some(cb);
+        }
+
+        unsafe { uvll::close(self.native_handle(), close_cb) };
+
+        extern fn close_cb(handle: *uvll::uv_idle_t) {
+            unsafe {
+                let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
+                {
+                    let mut data = idle_watcher.get_watcher_data();
+                    data.close_cb.swap_unwrap()();
+                }
+                idle_watcher.drop_watcher_data();
+                uvll::idle_delete(handle);
+            }
+        }
+    }
+}
+
+impl NativeHandle<*uvll::uv_idle_t> for IdleWatcher {
+    fn from_native_handle(handle: *uvll::uv_idle_t) -> IdleWatcher {
+        IdleWatcher(handle)
+    }
+    fn native_handle(&self) -> *uvll::uv_idle_t {
+        match self { &IdleWatcher(ptr) => ptr }
+    }
+}
diff --git a/src/libcore/rt/uv/mod.rs b/src/libcore/rt/uv/mod.rs
index 6499f0a3efd..e7194491397 100644
--- a/src/libcore/rt/uv/mod.rs
+++ b/src/libcore/rt/uv/mod.rs
@@ -10,7 +10,7 @@
 
 /*!
 
-Bindings to libuv.
+Bindings to libuv, along with the default implementation of `core::rt::rtio`.
 
 UV types consist of the event loop (Loop), Watchers, Requests and
 Callbacks.
@@ -38,29 +38,44 @@ use container::Container;
 use option::*;
 use str::raw::from_c_str;
 use to_str::ToStr;
+use ptr::Ptr;
+use libc;
 use vec;
 use ptr;
-use ptr::Ptr;
+use cast;
+use str;
+use option::*;
+use str::raw::from_c_str;
+use to_str::ToStr;
 use libc::{c_void, c_int, size_t, malloc, free};
 use cast::transmute;
 use ptr::null;
-use super::uvll;
 use unstable::finally::Finally;
 
+use rt::io::IoError;
+
 #[cfg(test)] use unstable::run_in_bare_thread;
 
-pub use self::file::{FsRequest, FsCallback};
+pub use self::file::FsRequest;
 pub use self::net::{StreamWatcher, TcpWatcher};
-pub use self::net::{ReadCallback, AllocCallback, ConnectionCallback, ConnectCallback};
+pub use self::idle::IdleWatcher;
+
+/// The implementation of `rtio` for libuv
+pub mod uvio;
+
+/// C bindings to libuv
+pub mod uvll;
 
 pub mod file;
 pub mod net;
+pub mod idle;
 
-/// A trait for callbacks to implement. Provides a little extra type safety
-/// for generic, unsafe interop functions like `set_watcher_callback`.
-pub trait Callback { }
-
-pub trait Request { }
+/// XXX: Loop(*handle) is buggy with destructors. Normal structs
+/// with dtors may not be destructured, but tuple structs can,
+/// but the results are not correct.
+pub struct Loop {
+    handle: *uvll::uv_loop_t
+}
 
 /// The trait implemented by uv 'watchers' (handles). Watchers are
 /// non-owning wrappers around the uv handles and are not completely
@@ -68,12 +83,9 @@ pub trait Request { }
 /// handle.  Watchers are generally created, then `start`ed, `stop`ed
 /// and `close`ed, but due to their complex life cycle may not be
 /// entirely memory safe if used in unanticipated patterns.
-pub trait Watcher {
-    fn event_loop(&self) -> Loop;
-}
+pub trait Watcher { }
 
-pub type NullCallback = ~fn();
-impl Callback for NullCallback { }
+pub trait Request { }
 
 /// A type that wraps a native handle
 pub trait NativeHandle<T> {
@@ -81,13 +93,6 @@ pub trait NativeHandle<T> {
     pub fn native_handle(&self) -> T;
 }
 
-/// XXX: Loop(*handle) is buggy with destructors. Normal structs
-/// with dtors may not be destructured, but tuple structs can,
-/// but the results are not correct.
-pub struct Loop {
-    handle: *uvll::uv_loop_t
-}
-
 pub impl Loop {
     fn new() -> Loop {
         let handle = unsafe { uvll::loop_new() };
@@ -113,64 +118,71 @@ impl NativeHandle<*uvll::uv_loop_t> for Loop {
     }
 }
 
-pub struct IdleWatcher(*uvll::uv_idle_t);
+// XXX: The uv alloc callback also has a *uv_handle_t arg
+pub type AllocCallback = ~fn(uint) -> Buf;
+pub type ReadCallback = ~fn(StreamWatcher, int, Buf, Option<UvError>);
+pub type NullCallback = ~fn();
+pub type IdleCallback = ~fn(IdleWatcher, Option<UvError>);
+pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
+pub type FsCallback = ~fn(FsRequest, Option<UvError>);
+
 
-impl Watcher for IdleWatcher {
-    fn event_loop(&self) -> Loop {
-        loop_from_watcher(self)
-    }
+/// Callbacks used by StreamWatchers, set as custom data on the foreign handle
+struct WatcherData {
+    read_cb: Option<ReadCallback>,
+    write_cb: Option<ConnectionCallback>,
+    connect_cb: Option<ConnectionCallback>,
+    close_cb: Option<NullCallback>,
+    alloc_cb: Option<AllocCallback>,
+    idle_cb: Option<IdleCallback>
 }
 
-pub type IdleCallback = ~fn(IdleWatcher, Option<UvError>);
-impl Callback for IdleCallback { }
+pub trait WatcherInterop {
+    fn event_loop(&self) -> Loop;
+    fn install_watcher_data(&mut self);
+    fn get_watcher_data<'r>(&'r mut self) -> &'r mut WatcherData;
+    fn drop_watcher_data(&mut self);
+}
 
-pub impl IdleWatcher {
-    fn new(loop_: &mut Loop) -> IdleWatcher {
+impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W {
+    /// Get the uv event loop from a Watcher
+    pub fn event_loop(&self) -> Loop {
         unsafe {
-            let handle = uvll::idle_new();
-            assert!(handle.is_not_null());
-            assert!(0 == uvll::idle_init(loop_.native_handle(), handle));
-            uvll::set_data_for_uv_handle(handle, null::<()>());
-            NativeHandle::from_native_handle(handle)
+            let handle = self.native_handle();
+            let loop_ = uvll::get_loop_for_uv_handle(handle);
+            NativeHandle::from_native_handle(loop_)
         }
     }
 
-    fn start(&mut self, cb: IdleCallback) {
-
-        set_watcher_callback(self, cb);
+    pub fn install_watcher_data(&mut self) {
         unsafe {
-            assert!(0 == uvll::idle_start(self.native_handle(), idle_cb))
-        };
-
-        extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) {
-            let idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
-            let cb: &IdleCallback = borrow_callback_from_watcher(&idle_watcher);
-            let status = status_to_maybe_uv_error(handle, status);
-            (*cb)(idle_watcher, status);
+            let data = ~WatcherData {
+                read_cb: None,
+                write_cb: None,
+                connect_cb: None,
+                close_cb: None,
+                alloc_cb: None,
+                idle_cb: None
+            };
+            let data = transmute::<~WatcherData, *c_void>(data);
+            uvll::set_data_for_uv_handle(self.native_handle(), data);
         }
     }
 
-    fn stop(&mut self) {
-        unsafe { assert!(0 == uvll::idle_stop(self.native_handle())); }
-    }
-
-    fn close(self) {
-        unsafe { uvll::close(self.native_handle(), close_cb) };
-
-        extern fn close_cb(handle: *uvll::uv_idle_t) {
-            let mut idle_watcher = NativeHandle::from_native_handle(handle);
-            drop_watcher_callback::<uvll::uv_idle_t, IdleWatcher, IdleCallback>(&mut idle_watcher);
-            unsafe { uvll::idle_delete(handle) };
+    pub fn get_watcher_data<'r>(&'r mut self) -> &'r mut WatcherData {
+        unsafe {
+            let data = uvll::get_data_for_uv_handle(self.native_handle());
+            let data = transmute::<&*c_void, &mut ~WatcherData>(&data);
+            return &mut **data;
         }
     }
-}
 
-impl NativeHandle<*uvll::uv_idle_t> for IdleWatcher {
-    fn from_native_handle(handle: *uvll::uv_idle_t) -> IdleWatcher {
-        IdleWatcher(handle)
-    }
-    fn native_handle(&self) -> *uvll::uv_idle_t {
-        match self { &IdleWatcher(ptr) => ptr }
+    pub fn drop_watcher_data(&mut self) {
+        unsafe {
+            let data = uvll::get_data_for_uv_handle(self.native_handle());
+            let _data = transmute::<*c_void, ~WatcherData>(data);
+            uvll::set_data_for_uv_handle(self.native_handle(), null::<()>());
+        }
     }
 }
 
@@ -213,148 +225,70 @@ fn error_smoke_test() {
     assert!(err.to_str() == ~"EOF: end of file");
 }
 
-
-/// Given a uv handle, convert a callback status to a UvError
-// XXX: Follow the pattern below by parameterizing over T: Watcher, not T
-pub fn status_to_maybe_uv_error<T>(handle: *T, status: c_int) -> Option<UvError> {
-    if status != -1 {
-        None
-    } else {
-        unsafe {
-            rtdebug!("handle: %x", handle as uint);
-            let loop_ = uvll::get_loop_for_uv_handle(handle);
-            rtdebug!("loop: %x", loop_ as uint);
-            let err = uvll::last_error(loop_);
-            Some(UvError(err))
-        }
-    }
-}
-
-/// Get the uv event loop from a Watcher
-pub fn loop_from_watcher<H, W: Watcher + NativeHandle<*H>>(
-    watcher: &W) -> Loop {
-
-    let handle = watcher.native_handle();
-    let loop_ = unsafe { uvll::get_loop_for_uv_handle(handle) };
-    NativeHandle::from_native_handle(loop_)
-}
-
-/// Set the custom data on a handle to a callback Note: This is only
-/// suitable for watchers that make just one type of callback.  For
-/// others use WatcherData
-pub fn set_watcher_callback<H, W: Watcher + NativeHandle<*H>, CB: Callback>(
-    watcher: &mut W, cb: CB) {
-
-    drop_watcher_callback::<H, W, CB>(watcher);
-    // XXX: Boxing the callback so it fits into a
-    // pointer. Unfortunate extra allocation
-    let boxed_cb = ~cb;
-    let data = unsafe { transmute::<~CB, *c_void>(boxed_cb) };
-    unsafe { uvll::set_data_for_uv_handle(watcher.native_handle(), data) };
-}
-
-/// Delete a callback from a handle's custom data
-pub fn drop_watcher_callback<H, W: Watcher + NativeHandle<*H>, CB: Callback>(
-    watcher: &mut W) {
-
+pub fn last_uv_error<H, W: Watcher + NativeHandle<*H>>(watcher: &W) -> UvError {
     unsafe {
-        let handle = watcher.native_handle();
-        let handle_data: *c_void = uvll::get_data_for_uv_handle(handle);
-        if handle_data.is_not_null() {
-            // Take ownership of the callback and drop it
-            let _cb = transmute::<*c_void, ~CB>(handle_data);
-            // Make sure the pointer is zeroed
-            uvll::set_data_for_uv_handle(watcher.native_handle(), null::<()>());
-        }
+        let loop_ = watcher.event_loop();
+        UvError(uvll::last_error(loop_.native_handle()))
     }
 }
 
-/// Take a pointer to the callback installed as custom data
-pub fn borrow_callback_from_watcher<H, W: Watcher + NativeHandle<*H>,
-                                CB: Callback>(watcher: &W) -> &CB {
-
-    unsafe {
-        let handle = watcher.native_handle();
-        let handle_data: *c_void = uvll::get_data_for_uv_handle(handle);
-        assert!(handle_data.is_not_null());
-        let cb = transmute::<&*c_void, &~CB>(&handle_data);
-        return &**cb;
-    }
-}
+pub fn uv_error_to_io_error(uverr: UvError) -> IoError {
 
-/// Take ownership of the callback installed as custom data
-pub fn take_callback_from_watcher<H, W: Watcher + NativeHandle<*H>, CB: Callback>(
-    watcher: &mut W) -> CB {
+    // XXX: Could go in str::raw
+    unsafe fn c_str_to_static_slice(s: *libc::c_char) -> &'static str {
+        let s = s as *u8;
+        let mut curr = s, len = 0u;
+        while *curr != 0u8 {
+            len += 1u;
+            curr = ptr::offset(s, len);
+        }
 
-    unsafe {
-        let handle = watcher.native_handle();
-        let handle_data: *c_void = uvll::get_data_for_uv_handle(handle);
-        assert!(handle_data.is_not_null());
-        uvll::set_data_for_uv_handle(handle, null::<()>());
-        let cb: ~CB = transmute::<*c_void, ~CB>(handle_data);
-        let cb = match cb { ~cb => cb };
-        return cb;
+        str::raw::buf_as_slice(s, len, |d| cast::transmute(d))
     }
-}
 
-/// Callbacks used by StreamWatchers, set as custom data on the foreign handle
-struct WatcherData {
-    read_cb: Option<ReadCallback>,
-    write_cb: Option<ConnectionCallback>,
-    connect_cb: Option<ConnectionCallback>,
-    close_cb: Option<NullCallback>,
-    alloc_cb: Option<AllocCallback>,
-    buf: Option<Buf>
-}
 
-pub fn install_watcher_data<H, W: Watcher + NativeHandle<*H>>(watcher: &mut W) {
     unsafe {
-        let data = ~WatcherData {
-            read_cb: None,
-            write_cb: None,
-            connect_cb: None,
-            close_cb: None,
-            alloc_cb: None,
-            buf: None
+        // Importing error constants
+        use rt::uv::uvll::*;
+        use rt::io::*;
+
+        // uv error descriptions are static
+        let c_desc = uvll::strerror(&*uverr);
+        let desc = c_str_to_static_slice(c_desc);
+
+        let kind = match uverr.code {
+            UNKNOWN => OtherIoError,
+            OK => OtherIoError,
+            EOF => EndOfFile,
+            EACCES => PermissionDenied,
+            ECONNREFUSED => ConnectionRefused,
+            e => {
+                abort!("unknown uv error code: %u", e as uint);
+            }
         };
-        let data = transmute::<~WatcherData, *c_void>(data);
-        uvll::set_data_for_uv_handle(watcher.native_handle(), data);
-    }
-}
-
-pub fn get_watcher_data<'r, H, W: Watcher + NativeHandle<*H>>(
-    watcher: &'r mut W) -> &'r mut WatcherData {
 
-    unsafe {
-        let data = uvll::get_data_for_uv_handle(watcher.native_handle());
-        let data = transmute::<&*c_void, &mut ~WatcherData>(&data);
-        return &mut **data;
-    }
-}
-
-pub fn drop_watcher_data<H, W: Watcher + NativeHandle<*H>>(watcher: &mut W) {
-    unsafe {
-        let data = uvll::get_data_for_uv_handle(watcher.native_handle());
-        let _data = transmute::<*c_void, ~WatcherData>(data);
-        uvll::set_data_for_uv_handle(watcher.native_handle(), null::<()>());
+        IoError {
+            kind: kind,
+            desc: desc,
+            detail: None
+        }
     }
 }
 
-#[test]
-fn test_slice_to_uv_buf() {
-    let slice = [0, .. 20];
-    let buf = slice_to_uv_buf(slice);
-
-    assert!(buf.len == 20);
-
-    unsafe {
-        let base = transmute::<*u8, *mut u8>(buf.base);
-        (*base) = 1;
-        (*ptr::mut_offset(base, 1)) = 2;
+/// Given a uv handle, convert a callback status to a UvError
+// XXX: Follow the pattern below by parameterizing over T: Watcher, not T
+pub fn status_to_maybe_uv_error<T>(handle: *T, status: c_int) -> Option<UvError> {
+    if status != -1 {
+        None
+    } else {
+        unsafe {
+            rtdebug!("handle: %x", handle as uint);
+            let loop_ = uvll::get_loop_for_uv_handle(handle);
+            rtdebug!("loop: %x", loop_ as uint);
+            let err = uvll::last_error(loop_);
+            Some(UvError(err))
+        }
     }
-
-    assert!(slice[0] == 1);
-    assert!(slice[1] == 2);
 }
 
 /// The uv buffer type
@@ -395,6 +329,24 @@ pub fn vec_from_uv_buf(buf: Buf) -> Option<~[u8]> {
 }
 
 #[test]
+fn test_slice_to_uv_buf() {
+    let slice = [0, .. 20];
+    let buf = slice_to_uv_buf(slice);
+
+    assert!(buf.len == 20);
+
+    unsafe {
+        let base = transmute::<*u8, *mut u8>(buf.base);
+        (*base) = 1;
+        (*ptr::mut_offset(base, 1)) = 2;
+    }
+
+    assert!(slice[0] == 1);
+    assert!(slice[1] == 2);
+}
+
+
+#[test]
 fn loop_smoke_test() {
     do run_in_bare_thread {
         let mut loop_ = Loop::new();
@@ -409,7 +361,7 @@ fn idle_new_then_close() {
     do run_in_bare_thread {
         let mut loop_ = Loop::new();
         let idle_watcher = { IdleWatcher::new(&mut loop_) };
-        idle_watcher.close();
+        idle_watcher.close(||());
     }
 }
 
@@ -425,7 +377,7 @@ fn idle_smoke_test() {
             assert!(status.is_none());
             if unsafe { *count_ptr == 10 } {
                 idle_watcher.stop();
-                idle_watcher.close();
+                idle_watcher.close(||());
             } else {
                 unsafe { *count_ptr = *count_ptr + 1; }
             }
@@ -449,7 +401,7 @@ fn idle_start_stop_start() {
                 assert!(status.is_none());
                 let mut idle_watcher = idle_watcher;
                 idle_watcher.stop();
-                idle_watcher.close();
+                idle_watcher.close(||());
             }
         }
         loop_.run();
diff --git a/src/libcore/rt/uv/net.rs b/src/libcore/rt/uv/net.rs
index 3e6aa657c57..fd78b552119 100644
--- a/src/libcore/rt/uv/net.rs
+++ b/src/libcore/rt/uv/net.rs
@@ -10,21 +10,15 @@
 
 use prelude::*;
 use libc::{size_t, ssize_t, c_int, c_void};
-use cast::transmute_mut_region;
-use super::super::uvll;
-use super::super::uvll::*;
-use super::{Loop, Watcher, Request, UvError, Buf, Callback, NativeHandle, NullCallback,
-            loop_from_watcher, status_to_maybe_uv_error,
-            install_watcher_data, get_watcher_data, drop_watcher_data,
-            vec_to_uv_buf, vec_from_uv_buf};
-use super::super::io::net::ip::{IpAddr, Ipv4, Ipv6};
-
-#[cfg(test)] use cell::Cell;
-#[cfg(test)] use unstable::run_in_bare_thread;
-#[cfg(test)] use super::super::thread::Thread;
-#[cfg(test)] use super::super::test::*;
-
-fn ip4_as_uv_ip4(addr: IpAddr, f: &fn(*sockaddr_in)) {
+use rt::uv::uvll;
+use rt::uv::uvll::*;
+use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback};
+use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle, NullCallback,
+             status_to_maybe_uv_error};
+use rt::io::net::ip::{IpAddr, Ipv4, Ipv6};
+use rt::uv::last_uv_error;
+
+fn ip4_as_uv_ip4<T>(addr: IpAddr, f: &fn(*sockaddr_in) -> T) -> T {
     match addr {
         Ipv4(a, b, c, d, p) => {
             unsafe {
@@ -34,7 +28,7 @@ fn ip4_as_uv_ip4(addr: IpAddr, f: &fn(*sockaddr_in)) {
                                                 c as uint,
                                                 d as uint), p as int);
                 do (|| {
-                    f(addr);
+                    f(addr)
                 }).finally {
                     free_ip4_addr(addr);
                 }
@@ -47,34 +41,23 @@ fn ip4_as_uv_ip4(addr: IpAddr, f: &fn(*sockaddr_in)) {
 // uv_stream t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t
 // and uv_file_t
 pub struct StreamWatcher(*uvll::uv_stream_t);
-
-impl Watcher for StreamWatcher {
-    fn event_loop(&self) -> Loop {
-        loop_from_watcher(self)
-    }
-}
-
-pub type ReadCallback = ~fn(StreamWatcher, int, Buf, Option<UvError>);
-impl Callback for ReadCallback { }
-
-// XXX: The uv alloc callback also has a *uv_handle_t arg
-pub type AllocCallback = ~fn(uint) -> Buf;
-impl Callback for AllocCallback { }
+impl Watcher for StreamWatcher { }
 
 pub impl StreamWatcher {
 
     fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) {
-        // XXX: Borrowchk problems
-        let data = get_watcher_data(unsafe { transmute_mut_region(self) });
-        data.alloc_cb = Some(alloc);
-        data.read_cb = Some(cb);
+        {
+            let data = self.get_watcher_data();
+            data.alloc_cb = Some(alloc);
+            data.read_cb = Some(cb);
+        }
 
         let handle = self.native_handle();
         unsafe { uvll::read_start(handle, alloc_cb, read_cb); }
 
         extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf {
             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
-            let data = get_watcher_data(&mut stream_watcher);
+            let data = stream_watcher.get_watcher_data();
             let alloc_cb = data.alloc_cb.get_ref();
             return (*alloc_cb)(suggested_size as uint);
         }
@@ -83,7 +66,7 @@ pub impl StreamWatcher {
             rtdebug!("buf addr: %x", buf.base as uint);
             rtdebug!("buf len: %d", buf.len as int);
             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
-            let data = get_watcher_data(&mut stream_watcher);
+            let data = stream_watcher.get_watcher_data();
             let cb = data.read_cb.get_ref();
             let status = status_to_maybe_uv_error(stream, nread as c_int);
             (*cb)(stream_watcher, nread as int, buf, status);
@@ -98,22 +81,19 @@ pub impl StreamWatcher {
         unsafe { uvll::read_stop(handle); }
     }
 
-    // XXX: Needs to take &[u8], not ~[u8]
-    fn write(&mut self, msg: ~[u8], cb: ConnectionCallback) {
-        // XXX: Borrowck
-        let data = get_watcher_data(unsafe { transmute_mut_region(self) });
-        assert!(data.write_cb.is_none());
-        data.write_cb = Some(cb);
+    fn write(&mut self, buf: Buf, cb: ConnectionCallback) {
+        {
+            let data = self.get_watcher_data();
+            assert!(data.write_cb.is_none());
+            data.write_cb = Some(cb);
+        }
 
         let req = WriteRequest::new();
-        let buf = vec_to_uv_buf(msg);
-        assert!(data.buf.is_none());
-        data.buf = Some(buf);
         let bufs = [buf];
         unsafe {
             assert!(0 == uvll::write(req.native_handle(),
-                                          self.native_handle(),
-                                          bufs, write_cb));
+                                     self.native_handle(),
+                                     bufs, write_cb));
         }
 
         extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
@@ -121,8 +101,7 @@ pub impl StreamWatcher {
             let mut stream_watcher = write_request.stream();
             write_request.delete();
             let cb = {
-                let data = get_watcher_data(&mut stream_watcher);
-                let _vec = vec_from_uv_buf(data.buf.swap_unwrap());
+                let data = stream_watcher.get_watcher_data();
                 let cb = data.write_cb.swap_unwrap();
                 cb
             };
@@ -142,7 +121,7 @@ pub impl StreamWatcher {
     fn close(self, cb: NullCallback) {
         {
             let mut this = self;
-            let data = get_watcher_data(&mut this);
+            let data = this.get_watcher_data();
             assert!(data.close_cb.is_none());
             data.close_cb = Some(cb);
         }
@@ -152,9 +131,10 @@ pub impl StreamWatcher {
         extern fn close_cb(handle: *uvll::uv_stream_t) {
             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
             {
-                get_watcher_data(&mut stream_watcher).close_cb.swap_unwrap()();
+                let mut data = stream_watcher.get_watcher_data();
+                data.close_cb.swap_unwrap()();
             }
-            drop_watcher_data(&mut stream_watcher);
+            stream_watcher.drop_watcher_data();
             unsafe { free_handle(handle as *c_void) }
         }
     }
@@ -171,15 +151,7 @@ impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher {
 }
 
 pub struct TcpWatcher(*uvll::uv_tcp_t);
-
-impl Watcher for TcpWatcher {
-    fn event_loop(&self) -> Loop {
-        loop_from_watcher(self)
-    }
-}
-
-pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
-impl Callback for ConnectionCallback { }
+impl Watcher for TcpWatcher { }
 
 pub impl TcpWatcher {
     fn new(loop_: &mut Loop) -> TcpWatcher {
@@ -187,21 +159,24 @@ pub impl TcpWatcher {
             let handle = malloc_handle(UV_TCP);
             assert!(handle.is_not_null());
             assert!(0 == uvll::tcp_init(loop_.native_handle(), handle));
-            let mut watcher = NativeHandle::from_native_handle(handle);
-            install_watcher_data(&mut watcher);
+            let mut watcher: TcpWatcher = NativeHandle::from_native_handle(handle);
+            watcher.install_watcher_data();
             return watcher;
         }
     }
 
-    fn bind(&mut self, address: IpAddr) {
+    fn bind(&mut self, address: IpAddr) -> Result<(), UvError> {
         match address {
             Ipv4(*) => {
                 do ip4_as_uv_ip4(address) |addr| {
                     let result = unsafe {
                         uvll::tcp_bind(self.native_handle(), addr)
                     };
-                    // XXX: bind is likely to fail. need real error handling
-                    assert!(result == 0);
+                    if result == 0 {
+                        Ok(())
+                    } else {
+                        Err(last_uv_error(self))
+                    }
                 }
             }
             _ => fail!()
@@ -210,8 +185,8 @@ pub impl TcpWatcher {
 
     fn connect(&mut self, address: IpAddr, cb: ConnectionCallback) {
         unsafe {
-            assert!(get_watcher_data(self).connect_cb.is_none());
-            get_watcher_data(self).connect_cb = Some(cb);
+            assert!(self.get_watcher_data().connect_cb.is_none());
+            self.get_watcher_data().connect_cb = Some(cb);
 
             let connect_handle = ConnectRequest::new().native_handle();
             match address {
@@ -232,7 +207,7 @@ pub impl TcpWatcher {
                 let mut stream_watcher = connect_request.stream();
                 connect_request.delete();
                 let cb: ConnectionCallback = {
-                    let data = get_watcher_data(&mut stream_watcher);
+                    let data = stream_watcher.get_watcher_data();
                     data.connect_cb.swap_unwrap()
                 };
                 let status = status_to_maybe_uv_error(stream_watcher.native_handle(), status);
@@ -242,10 +217,11 @@ pub impl TcpWatcher {
     }
 
     fn listen(&mut self, cb: ConnectionCallback) {
-        // XXX: Borrowck
-        let data = get_watcher_data(unsafe { transmute_mut_region(self) });
-        assert!(data.connect_cb.is_none());
-        data.connect_cb = Some(cb);
+        {
+            let data = self.get_watcher_data();
+            assert!(data.connect_cb.is_none());
+            data.connect_cb = Some(cb);
+        }
 
         unsafe {
             static BACKLOG: c_int = 128; // XXX should be configurable
@@ -257,9 +233,10 @@ pub impl TcpWatcher {
         extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
             rtdebug!("connection_cb");
             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
-            let cb = get_watcher_data(&mut stream_watcher).connect_cb.swap_unwrap();
-            let status = status_to_maybe_uv_error(stream_watcher.native_handle(), status);
-            cb(stream_watcher, status);
+            let data = stream_watcher.get_watcher_data();
+            let cb = data.connect_cb.get_ref();
+            let status = status_to_maybe_uv_error(handle, status);
+            (*cb)(stream_watcher, status);
         }
     }
 
@@ -277,12 +254,8 @@ impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher {
     }
 }
 
-pub type ConnectCallback = ~fn(ConnectRequest, Option<UvError>);
-impl Callback for ConnectCallback { }
-
 // uv_connect_t is a subclass of uv_req_t
 struct ConnectRequest(*uvll::uv_connect_t);
-
 impl Request for ConnectRequest { }
 
 impl ConnectRequest {
@@ -355,93 +328,109 @@ impl NativeHandle<*uvll::uv_write_t> for WriteRequest {
 }
 
 
-#[test]
-fn connect_close() {
-    do run_in_bare_thread() {
-        let mut loop_ = Loop::new();
-        let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
-        // Connect to a port where nobody is listening
-        let addr = next_test_ip4();
-        do tcp_watcher.connect(addr) |stream_watcher, status| {
-            rtdebug!("tcp_watcher.connect!");
-            assert!(status.is_some());
-            assert!(status.get().name() == ~"ECONNREFUSED");
-            stream_watcher.close(||());
+#[cfg(test)]
+mod test {
+    use super::*;
+    use util::ignore;
+    use cell::Cell;
+    use vec;
+    use unstable::run_in_bare_thread;
+    use rt::thread::Thread;
+    use rt::test::*;
+    use rt::uv::{Loop, AllocCallback};
+    use rt::uv::{vec_from_uv_buf, vec_to_uv_buf, slice_to_uv_buf};
+
+    #[test]
+    fn connect_close() {
+        do run_in_bare_thread() {
+            let mut loop_ = Loop::new();
+            let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
+            // Connect to a port where nobody is listening
+            let addr = next_test_ip4();
+            do tcp_watcher.connect(addr) |stream_watcher, status| {
+                rtdebug!("tcp_watcher.connect!");
+                assert!(status.is_some());
+                assert!(status.get().name() == ~"ECONNREFUSED");
+                stream_watcher.close(||());
+            }
+            loop_.run();
+            loop_.close();
         }
-        loop_.run();
-        loop_.close();
     }
-}
 
-#[test]
-fn listen() {
-    do run_in_bare_thread() {
-        static MAX: int = 10;
-        let mut loop_ = Loop::new();
-        let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
-        let addr = next_test_ip4();
-        server_tcp_watcher.bind(addr);
-        let loop_ = loop_;
-        rtdebug!("listening");
-        do server_tcp_watcher.listen |server_stream_watcher, status| {
-            rtdebug!("listened!");
-            assert!(status.is_none());
-            let mut server_stream_watcher = server_stream_watcher;
-            let mut loop_ = loop_;
-            let client_tcp_watcher = TcpWatcher::new(&mut loop_);
-            let mut client_tcp_watcher = client_tcp_watcher.as_stream();
-            server_stream_watcher.accept(client_tcp_watcher);
-            let count_cell = Cell(0);
-            let server_stream_watcher = server_stream_watcher;
-            rtdebug!("starting read");
-            let alloc: AllocCallback = |size| {
-                vec_to_uv_buf(vec::from_elem(size, 0))
-            };
-            do client_tcp_watcher.read_start(alloc)
-                |stream_watcher, nread, buf, status| {
-
-                rtdebug!("i'm reading!");
-                let buf = vec_from_uv_buf(buf);
-                let mut count = count_cell.take();
-                if status.is_none() {
-                    rtdebug!("got %d bytes", nread);
-                    let buf = buf.unwrap();
-                    for buf.slice(0, nread as uint).each |byte| {
-                        assert!(*byte == count as u8);
-                        rtdebug!("%u", *byte as uint);
-                        count += 1;
-                    }
-                } else {
-                    assert!(count == MAX);
-                    do stream_watcher.close {
-                        server_stream_watcher.close(||());
+    #[test]
+    fn listen() {
+        do run_in_bare_thread() {
+            static MAX: int = 10;
+            let mut loop_ = Loop::new();
+            let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
+            let addr = next_test_ip4();
+            server_tcp_watcher.bind(addr);
+            let loop_ = loop_;
+            rtdebug!("listening");
+            do server_tcp_watcher.listen |server_stream_watcher, status| {
+                rtdebug!("listened!");
+                assert!(status.is_none());
+                let mut server_stream_watcher = server_stream_watcher;
+                let mut loop_ = loop_;
+                let mut client_tcp_watcher = TcpWatcher::new(&mut loop_);
+                let mut client_tcp_watcher = client_tcp_watcher.as_stream();
+                server_stream_watcher.accept(client_tcp_watcher);
+                let count_cell = Cell(0);
+                let server_stream_watcher = server_stream_watcher;
+                rtdebug!("starting read");
+                let alloc: AllocCallback = |size| {
+                    vec_to_uv_buf(vec::from_elem(size, 0))
+                };
+                do client_tcp_watcher.read_start(alloc)
+                    |stream_watcher, nread, buf, status| {
+
+                    rtdebug!("i'm reading!");
+                    let buf = vec_from_uv_buf(buf);
+                    let mut count = count_cell.take();
+                    if status.is_none() {
+                        rtdebug!("got %d bytes", nread);
+                        let buf = buf.unwrap();
+                        for buf.slice(0, nread as uint).each |byte| {
+                            assert!(*byte == count as u8);
+                            rtdebug!("%u", *byte as uint);
+                            count += 1;
+                        }
+                    } else {
+                        assert!(count == MAX);
+                        do stream_watcher.close {
+                            server_stream_watcher.close(||());
+                        }
                     }
+                    count_cell.put_back(count);
                 }
-                count_cell.put_back(count);
             }
-        }
 
-        let _client_thread = do Thread::start {
-            rtdebug!("starting client thread");
-            let mut loop_ = Loop::new();
-            let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
-            do tcp_watcher.connect(addr) |stream_watcher, status| {
-                rtdebug!("connecting");
-                assert!(status.is_none());
-                let mut stream_watcher = stream_watcher;
-                let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
-                do stream_watcher.write(msg) |stream_watcher, status| {
-                    rtdebug!("writing");
+            let _client_thread = do Thread::start {
+                rtdebug!("starting client thread");
+                let mut loop_ = Loop::new();
+                let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
+                do tcp_watcher.connect(addr) |stream_watcher, status| {
+                    rtdebug!("connecting");
                     assert!(status.is_none());
-                    stream_watcher.close(||());
+                    let mut stream_watcher = stream_watcher;
+                    let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
+                    let buf = slice_to_uv_buf(msg);
+                    let msg_cell = Cell(msg);
+                    do stream_watcher.write(buf) |stream_watcher, status| {
+                        rtdebug!("writing");
+                        assert!(status.is_none());
+                        let msg_cell = Cell(msg_cell.take());
+                        stream_watcher.close(||ignore(msg_cell.take()));
+                    }
                 }
-            }
+                loop_.run();
+                loop_.close();
+            };
+
+            let mut loop_ = loop_;
             loop_.run();
             loop_.close();
-        };
-
-        let mut loop_ = loop_;
-        loop_.run();
-        loop_.close();
+        }
     }
 }
diff --git a/src/libcore/rt/uvio.rs b/src/libcore/rt/uv/uvio.rs
index 24bffd8d1cd..cc9eb2ada4d 100644
--- a/src/libcore/rt/uvio.rs
+++ b/src/libcore/rt/uv/uvio.rs
@@ -10,20 +10,24 @@
 
 use option::*;
 use result::*;
-
-use super::io::net::ip::IpAddr;
-use super::uv::*;
-use super::rtio::*;
 use ops::Drop;
 use old_iter::CopyableIter;
 use cell::{Cell, empty_cell};
 use cast::transmute;
-use super::sched::{Scheduler, local_sched};
+use clone::Clone;
+use rt::io::IoError;
+use rt::io::net::ip::IpAddr;
+use rt::uv::*;
+use rt::uv::idle::IdleWatcher;
+use rt::rtio::*;
+use rt::sched::{Scheduler, local_sched};
+use rt::io::{standard_error, OtherIoError};
+use rt::tube::Tube;
 
 #[cfg(test)] use container::Container;
 #[cfg(test)] use uint;
 #[cfg(test)] use unstable::run_in_bare_thread;
-#[cfg(test)] use super::test::*;
+#[cfg(test)] use rt::test::*;
 
 pub struct UvEventLoop {
     uvio: UvIoFactory
@@ -64,7 +68,7 @@ impl EventLoop for UvEventLoop {
             assert!(status.is_none());
             let mut idle_watcher = idle_watcher;
             idle_watcher.stop();
-            idle_watcher.close();
+            idle_watcher.close(||());
             f();
         }
     }
@@ -100,11 +104,11 @@ impl IoFactory for UvIoFactory {
     // Connect to an address and return a new stream
     // NB: This blocks the task waiting on the connection.
     // It would probably be better to return a future
-    fn connect(&mut self, addr: IpAddr) -> Option<~StreamObject> {
+    fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError> {
         // Create a cell in the task to hold the result. We will fill
         // the cell before resuming the task.
         let result_cell = empty_cell();
-        let result_cell_ptr: *Cell<Option<~StreamObject>> = &result_cell;
+        let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
 
         let scheduler = local_sched::take();
         assert!(scheduler.in_task_context());
@@ -122,21 +126,26 @@ impl IoFactory for UvIoFactory {
             // Wait for a connection
             do tcp_watcher.connect(addr) |stream_watcher, status| {
                 rtdebug!("connect: in connect callback");
-                let maybe_stream = if status.is_none() {
+                if status.is_none() {
                     rtdebug!("status is none");
-                    Some(~UvStream(stream_watcher))
+                    let res = Ok(~UvTcpStream { watcher: stream_watcher });
+
+                    // Store the stream in the task's stack
+                    unsafe { (*result_cell_ptr).put_back(res); }
+
+                    // Context switch
+                    let scheduler = local_sched::take();
+                    scheduler.resume_task_immediately(task_cell.take());
                 } else {
                     rtdebug!("status is some");
-                    stream_watcher.close(||());
-                    None
+                    let task_cell = Cell(task_cell.take());
+                    do stream_watcher.close {
+                        let res = Err(uv_error_to_io_error(status.get()));
+                        unsafe { (*result_cell_ptr).put_back(res); }
+                        let scheduler = local_sched::take();
+                        scheduler.resume_task_immediately(task_cell.take());
+                    }
                 };
-
-                // Store the stream in the task's stack
-                unsafe { (*result_cell_ptr).put_back(maybe_stream); }
-
-                // Context switch
-                let scheduler = local_sched::take();
-                scheduler.resume_task_immediately(task_cell.take());
             }
         }
 
@@ -144,103 +153,124 @@ impl IoFactory for UvIoFactory {
         return result_cell.take();
     }
 
-    fn bind(&mut self, addr: IpAddr) -> Option<~TcpListenerObject> {
+    fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError> {
         let mut watcher = TcpWatcher::new(self.uv_loop());
-        watcher.bind(addr);
-        return Some(~UvTcpListener(watcher));
+        match watcher.bind(addr) {
+            Ok(_) => Ok(~UvTcpListener::new(watcher)),
+            Err(uverr) => {
+                let scheduler = local_sched::take();
+                do scheduler.deschedule_running_task_and_then |task| {
+                    let task_cell = Cell(task);
+                    do watcher.as_stream().close {
+                        let scheduler = local_sched::take();
+                        scheduler.resume_task_immediately(task_cell.take());
+                    }
+                }
+                Err(uv_error_to_io_error(uverr))
+            }
+        }
     }
 }
 
-pub struct UvTcpListener(TcpWatcher);
+// FIXME #6090: Prefer newtype structs but Drop doesn't work
+pub struct UvTcpListener {
+    watcher: TcpWatcher,
+    listening: bool,
+    incoming_streams: Tube<Result<~RtioTcpStreamObject, IoError>>
+}
 
 impl UvTcpListener {
-    fn watcher(&self) -> TcpWatcher {
-        match self { &UvTcpListener(w) => w }
+    fn new(watcher: TcpWatcher) -> UvTcpListener {
+        UvTcpListener {
+            watcher: watcher,
+            listening: false,
+            incoming_streams: Tube::new()
+        }
     }
 
-    fn close(&self) {
-        // XXX: Need to wait until close finishes before returning
-        self.watcher().as_stream().close(||());
-    }
+    fn watcher(&self) -> TcpWatcher { self.watcher }
 }
 
 impl Drop for UvTcpListener {
     fn finalize(&self) {
-        // XXX: Again, this never gets called. Use .close() instead
-        //self.watcher().as_stream().close(||());
+        let watcher = self.watcher();
+        let scheduler = local_sched::take();
+        do scheduler.deschedule_running_task_and_then |task| {
+            let task_cell = Cell(task);
+            do watcher.as_stream().close {
+                let scheduler = local_sched::take();
+                scheduler.resume_task_immediately(task_cell.take());
+            }
+        }
     }
 }
 
-impl TcpListener for UvTcpListener {
+impl RtioTcpListener for UvTcpListener {
 
-    fn listen(&mut self) -> Option<~StreamObject> {
+    fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
         rtdebug!("entering listen");
-        let result_cell = empty_cell();
-        let result_cell_ptr: *Cell<Option<~StreamObject>> = &result_cell;
 
-        let server_tcp_watcher = self.watcher();
-
-        let scheduler = local_sched::take();
-        assert!(scheduler.in_task_context());
+        if self.listening {
+            return self.incoming_streams.recv();
+        }
 
-        do scheduler.deschedule_running_task_and_then |task| {
-            let task_cell = Cell(task);
-            let mut server_tcp_watcher = server_tcp_watcher;
-            do server_tcp_watcher.listen |server_stream_watcher, status| {
-                let maybe_stream = if status.is_none() {
-                    let mut server_stream_watcher = server_stream_watcher;
-                    let mut loop_ = loop_from_watcher(&server_stream_watcher);
-                    let client_tcp_watcher = TcpWatcher::new(&mut loop_).as_stream();
-                    // XXX: Needs to be surfaced in interface
-                    server_stream_watcher.accept(client_tcp_watcher);
-                    Some(~UvStream::new(client_tcp_watcher))
-                } else {
-                    None
-                };
+        self.listening = true;
 
-                unsafe { (*result_cell_ptr).put_back(maybe_stream); }
+        let server_tcp_watcher = self.watcher();
+        let incoming_streams_cell = Cell(self.incoming_streams.clone());
+
+        let incoming_streams_cell = Cell(incoming_streams_cell.take());
+        let mut server_tcp_watcher = server_tcp_watcher;
+        do server_tcp_watcher.listen |server_stream_watcher, status| {
+            let maybe_stream = if status.is_none() {
+                let mut server_stream_watcher = server_stream_watcher;
+                let mut loop_ = server_stream_watcher.event_loop();
+                let mut client_tcp_watcher = TcpWatcher::new(&mut loop_);
+                let client_tcp_watcher = client_tcp_watcher.as_stream();
+                // XXX: Need's to be surfaced in interface
+                server_stream_watcher.accept(client_tcp_watcher);
+                Ok(~UvTcpStream { watcher: client_tcp_watcher })
+            } else {
+                Err(standard_error(OtherIoError))
+            };
 
-                rtdebug!("resuming task from listen");
-                // Context switch
-                let scheduler = local_sched::take();
-                scheduler.resume_task_immediately(task_cell.take());
-            }
+            let mut incoming_streams = incoming_streams_cell.take();
+            incoming_streams.send(maybe_stream);
+            incoming_streams_cell.put_back(incoming_streams);
         }
 
-        assert!(!result_cell.is_empty());
-        return result_cell.take();
+        return self.incoming_streams.recv();
     }
 }
 
-pub struct UvStream(StreamWatcher);
-
-impl UvStream {
-    fn new(watcher: StreamWatcher) -> UvStream {
-        UvStream(watcher)
-    }
-
-    fn watcher(&self) -> StreamWatcher {
-        match self { &UvStream(w) => w }
-    }
+// FIXME #6090: Prefer newtype structs but Drop doesn't work
+pub struct UvTcpStream {
+    watcher: StreamWatcher
+}
 
-    // XXX: finalize isn't working for ~UvStream???
-    fn close(&self) {
-        // XXX: Need to wait until this finishes before returning
-        self.watcher().close(||());
-    }
+impl UvTcpStream {
+    fn watcher(&self) -> StreamWatcher { self.watcher }
 }
 
-impl Drop for UvStream {
+impl Drop for UvTcpStream {
     fn finalize(&self) {
-        rtdebug!("closing stream");
-        //self.watcher().close(||());
+        rtdebug!("closing tcp stream");
+        let watcher = self.watcher();
+        let scheduler = local_sched::take();
+        do scheduler.deschedule_running_task_and_then |task| {
+            let task_cell = Cell(task);
+            do watcher.close {
+                let scheduler = local_sched::take();
+                scheduler.resume_task_immediately(task_cell.take());
+            }
+        }
     }
 }
 
-impl Stream for UvStream {
-    fn read(&mut self, buf: &mut [u8]) -> Result<uint, ()> {
+impl RtioTcpStream for UvTcpStream {
+    fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
         let result_cell = empty_cell();
-        let result_cell_ptr: *Cell<Result<uint, ()>> = &result_cell;
+        let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
 
         let scheduler = local_sched::take();
         assert!(scheduler.in_task_context());
@@ -271,7 +301,7 @@ impl Stream for UvStream {
                     assert!(nread >= 0);
                     Ok(nread as uint)
                 } else {
-                    Err(())
+                    Err(standard_error(OtherIoError))
                 };
 
                 unsafe { (*result_cell_ptr).put_back(result); }
@@ -285,9 +315,9 @@ impl Stream for UvStream {
         return result_cell.take();
     }
 
-    fn write(&mut self, buf: &[u8]) -> Result<(), ()> {
+    fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
         let result_cell = empty_cell();
-        let result_cell_ptr: *Cell<Result<(), ()>> = &result_cell;
+        let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
         let scheduler = local_sched::take();
         assert!(scheduler.in_task_context());
         let watcher = self.watcher();
@@ -295,14 +325,12 @@ impl Stream for UvStream {
         do scheduler.deschedule_running_task_and_then |task| {
             let mut watcher = watcher;
             let task_cell = Cell(task);
-            let buf = unsafe { &*buf_ptr };
-            // XXX: OMGCOPIES
-            let buf = buf.to_vec();
+            let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
             do watcher.write(buf) |_watcher, status| {
                 let result = if status.is_none() {
                     Ok(())
                 } else {
-                    Err(())
+                    Err(standard_error(OtherIoError))
                 };
 
                 unsafe { (*result_cell_ptr).put_back(result); }
@@ -320,10 +348,12 @@ impl Stream for UvStream {
 #[test]
 fn test_simple_io_no_connect() {
     do run_in_newsched_task {
-        let io = unsafe { local_sched::unsafe_borrow_io() };
-        let addr = next_test_ip4();
-        let maybe_chan = io.connect(addr);
-        assert!(maybe_chan.is_none());
+        unsafe {
+            let io = local_sched::unsafe_borrow_io();
+            let addr = next_test_ip4();
+            let maybe_chan = (*io).tcp_connect(addr);
+            assert!(maybe_chan.is_err());
+        }
     }
 }
 
@@ -336,8 +366,8 @@ fn test_simple_tcp_server_and_client() {
         do spawntask_immediately {
             unsafe {
                 let io = local_sched::unsafe_borrow_io();
-                let mut listener = io.bind(addr).unwrap();
-                let mut stream = listener.listen().unwrap();
+                let mut listener = (*io).tcp_bind(addr).unwrap();
+                let mut stream = listener.accept().unwrap();
                 let mut buf = [0, .. 2048];
                 let nread = stream.read(buf).unwrap();
                 assert!(nread == 8);
@@ -345,17 +375,14 @@ fn test_simple_tcp_server_and_client() {
                     rtdebug!("%u", buf[i] as uint);
                     assert!(buf[i] == i as u8);
                 }
-                stream.close();
-                listener.close();
             }
         }
 
         do spawntask_immediately {
             unsafe {
                 let io = local_sched::unsafe_borrow_io();
-                let mut stream = io.connect(addr).unwrap();
+                let mut stream = (*io).tcp_connect(addr).unwrap();
                 stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-                stream.close();
             }
         }
     }
@@ -368,8 +395,8 @@ fn test_read_and_block() {
 
         do spawntask_immediately {
             let io = unsafe { local_sched::unsafe_borrow_io() };
-            let mut listener = io.bind(addr).unwrap();
-            let mut stream = listener.listen().unwrap();
+            let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() };
+            let mut stream = listener.accept().unwrap();
             let mut buf = [0, .. 2048];
 
             let expected = 32;
@@ -399,19 +426,17 @@ fn test_read_and_block() {
 
             // Make sure we had multiple reads
             assert!(reads > 1);
-
-            stream.close();
-            listener.close();
         }
 
         do spawntask_immediately {
-            let io = unsafe { local_sched::unsafe_borrow_io() };
-            let mut stream = io.connect(addr).unwrap();
-            stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-            stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-            stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-            stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-            stream.close();
+            unsafe {
+                let io = local_sched::unsafe_borrow_io();
+                let mut stream = (*io).tcp_connect(addr).unwrap();
+                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+            }
         }
 
     }
@@ -426,34 +451,33 @@ fn test_read_read_read() {
         do spawntask_immediately {
             unsafe {
                 let io = local_sched::unsafe_borrow_io();
-                let mut listener = io.bind(addr).unwrap();
-                let mut stream = listener.listen().unwrap();
+                let mut listener = (*io).tcp_bind(addr).unwrap();
+                let mut stream = listener.accept().unwrap();
                 let buf = [1, .. 2048];
                 let mut total_bytes_written = 0;
                 while total_bytes_written < MAX {
                     stream.write(buf);
                     total_bytes_written += buf.len();
                 }
-                stream.close();
-                listener.close();
             }
         }
 
         do spawntask_immediately {
-            let io = unsafe { local_sched::unsafe_borrow_io() };
-            let mut stream = io.connect(addr).unwrap();
-            let mut buf = [0, .. 2048];
-            let mut total_bytes_read = 0;
-            while total_bytes_read < MAX {
-                let nread = stream.read(buf).unwrap();
-                rtdebug!("read %u bytes", nread as uint);
-                total_bytes_read += nread;
-                for uint::range(0, nread) |i| {
-                    assert!(buf[i] == 1);
+            unsafe {
+                let io = local_sched::unsafe_borrow_io();
+                let mut stream = (*io).tcp_connect(addr).unwrap();
+                let mut buf = [0, .. 2048];
+                let mut total_bytes_read = 0;
+                while total_bytes_read < MAX {
+                    let nread = stream.read(buf).unwrap();
+                    rtdebug!("read %u bytes", nread as uint);
+                    total_bytes_read += nread;
+                    for uint::range(0, nread) |i| {
+                        assert!(buf[i] == 1);
+                    }
                 }
+                rtdebug!("read %u bytes total", total_bytes_read as uint);
             }
-            rtdebug!("read %u bytes total", total_bytes_read as uint);
-            stream.close();
         }
     }
 }
diff --git a/src/libcore/rt/uvll.rs b/src/libcore/rt/uv/uvll.rs
index 4bff3bff7d3..2a2812c6718 100644
--- a/src/libcore/rt/uvll.rs
+++ b/src/libcore/rt/uv/uvll.rs
@@ -33,6 +33,13 @@ use libc::{size_t, c_int, c_uint, c_void, c_char, uintptr_t};
 use libc::{malloc, free};
 use prelude::*;
 
+pub static UNKNOWN: c_int = -1;
+pub static OK: c_int = 0;
+pub static EOF: c_int = 1;
+pub static EADDRINFO: c_int = 2;
+pub static EACCES: c_int = 3;
+pub static ECONNREFUSED: c_int = 12;
+
 pub struct uv_err_t {
     code: c_int,
     sys_errno_: c_int
diff --git a/src/libcore/sys.rs b/src/libcore/sys.rs
index 4eca7ebbb37..50a739ec67d 100644
--- a/src/libcore/sys.rs
+++ b/src/libcore/sys.rs
@@ -202,10 +202,12 @@ impl FailWithCause for &'static str {
 
 // FIXME #4427: Temporary until rt::rt_fail_ goes away
 pub fn begin_unwind_(msg: *c_char, file: *c_char, line: size_t) -> ! {
-    use rt::{context, OldTaskContext};
-    use rt::local_services::unsafe_borrow_local_services;
+    use option::Option;
+    use rt::{context, OldTaskContext, TaskContext};
+    use rt::local_services::{unsafe_borrow_local_services, Unwinder};
 
-    match context() {
+    let context = context();
+    match context {
         OldTaskContext => {
             unsafe {
                 gc::cleanup_stack_for_failure();
@@ -214,11 +216,26 @@ pub fn begin_unwind_(msg: *c_char, file: *c_char, line: size_t) -> ! {
             }
         }
         _ => {
-            // XXX: Need to print the failure message
-            gc::cleanup_stack_for_failure();
             unsafe {
+                // XXX: Bad re-allocations. fail! needs some refactoring
+                let msg = str::raw::from_c_str(msg);
+                let file = str::raw::from_c_str(file);
+
+                let outmsg = fmt!("%s at line %i of file %s", msg, line as int, file);
+
+                // XXX: Logging doesn't work correctly in non-task context because it
+                // invokes the local heap
+                if context == TaskContext {
+                    error!(outmsg);
+                } else {
+                    rtdebug!("%s", outmsg);
+                }
+
+                gc::cleanup_stack_for_failure();
+
                 let local_services = unsafe_borrow_local_services();
-                match local_services.unwinder {
+                let unwinder: &mut Option<Unwinder> = &mut (*local_services).unwinder;
+                match *unwinder {
                     Some(ref mut unwinder) => unwinder.begin_unwind(),
                     None => abort!("failure without unwinder. aborting process")
                 }
diff --git a/src/libcore/task/local_data_priv.rs b/src/libcore/task/local_data_priv.rs
index a30db039f30..27f58057e2f 100644
--- a/src/libcore/task/local_data_priv.rs
+++ b/src/libcore/task/local_data_priv.rs
@@ -36,7 +36,7 @@ impl Handle {
                 }
                 _ => {
                     let local_services = unsafe_borrow_local_services();
-                    NewHandle(&mut local_services.storage)
+                    NewHandle(&mut (*local_services).storage)
                 }
             }
         }
diff --git a/src/libcore/unstable/lang.rs b/src/libcore/unstable/lang.rs
index 8153c2d43d9..e521fb59fbe 100644
--- a/src/libcore/unstable/lang.rs
+++ b/src/libcore/unstable/lang.rs
@@ -16,12 +16,12 @@ use libc::{c_char, c_uchar, c_void, size_t, uintptr_t, c_int, STDERR_FILENO};
 use managed::raw::BoxRepr;
 use str;
 use sys;
-use unstable::exchange_alloc;
 use cast::transmute;
 use rt::{context, OldTaskContext};
 use rt::local_services::borrow_local_services;
 use option::{Option, Some, None};
 use io;
+use rt::global_heap;
 
 #[allow(non_camel_case_types)]
 pub type rust_task = c_void;
@@ -153,7 +153,7 @@ unsafe fn fail_borrowed(box: *mut BoxRepr, file: *c_char, line: size_t) {
 #[lang="exchange_malloc"]
 #[inline(always)]
 pub unsafe fn exchange_malloc(td: *c_char, size: uintptr_t) -> *c_char {
-    transmute(exchange_alloc::malloc(transmute(td), transmute(size)))
+    transmute(global_heap::malloc(transmute(td), transmute(size)))
 }
 
 /// Because this code is so perf. sensitive, use a static constant so that
@@ -233,7 +233,7 @@ impl DebugPrints for io::fd_t {
 #[lang="exchange_free"]
 #[inline(always)]
 pub unsafe fn exchange_free(ptr: *c_char) {
-    exchange_alloc::free(transmute(ptr))
+    global_heap::free(transmute(ptr))
 }
 
 #[lang="malloc"]
diff --git a/src/libcore/unstable/mod.rs b/src/libcore/unstable/mod.rs
index bef7a7f87d3..18a6262f17d 100644
--- a/src/libcore/unstable/mod.rs
+++ b/src/libcore/unstable/mod.rs
@@ -19,7 +19,6 @@ pub mod at_exit;
 pub mod global;
 pub mod finally;
 pub mod weak_task;
-pub mod exchange_alloc;
 pub mod intrinsics;
 pub mod simd;
 pub mod extfmt;