author    Patrick Walton <pcwalton@mimiga.net>  2013-05-21 18:24:42 -0700
committer Patrick Walton <pcwalton@mimiga.net>  2013-05-22 21:57:11 -0700
commit    18df18c817b5e109710c58f512a2cc5ad14fa8b2 (patch)
tree      09cb14a7fa03754cc978d4824a47979acc6d836e /src/libstd/rt
parent    ee52865c8848657e737e3c2071728b062ec9c8de (diff)
libstd: Fix merge fallout.
Diffstat (limited to 'src/libstd/rt')
-rw-r--r--  src/libstd/rt/comm.rs            618
-rw-r--r--  src/libstd/rt/global_heap.rs      87
-rw-r--r--  src/libstd/rt/io/mock.rs          50
-rw-r--r--  src/libstd/rt/local.rs           118
-rw-r--r--  src/libstd/rt/local_ptr.rs       145
-rw-r--r--  src/libstd/rt/logging.rs          68
-rw-r--r--  src/libstd/rt/message_queue.rs    53
-rw-r--r--  src/libstd/rt/rc.rs              142
-rw-r--r--  src/libstd/rt/sched.rs           554
-rw-r--r--  src/libstd/rt/tube.rs            185
-rw-r--r--  src/libstd/rt/uv/idle.rs          91
-rw-r--r--  src/libstd/rt/uv/timer.rs        183
-rw-r--r--  src/libstd/rt/uv/uvio.rs         492
-rw-r--r--  src/libstd/rt/uv/uvll.rs         452
14 files changed, 3238 insertions, 0 deletions
diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs
new file mode 100644
index 00000000000..576a402b709
--- /dev/null
+++ b/src/libstd/rt/comm.rs
@@ -0,0 +1,618 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Ports and channels.
+//!
+//! XXX: Carefully consider whether the sequentially consistent
+//! atomics here can be converted to acq/rel. I'm not sure they can,
+//! because there is data being transferred in both directions (the payload
+//! goes from sender to receiver and the task pointer goes the other way).
+
+use option::*;
+use cast;
+use util;
+use ops::Drop;
+use kinds::Owned;
+use rt::sched::{Scheduler, Coroutine};
+use rt::local::Local;
+use unstable::intrinsics::{atomic_xchg, atomic_load};
+use util::Void;
+use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
+use cell::Cell;
+
+/// A combined refcount / ~Task pointer.
+///
+/// Can be equal to the following values:
+///
+/// * 2 - both endpoints are alive
+/// * 1 - either the sender or the receiver is dead, determined by context
+/// * <ptr> - A pointer to a blocked Task that can be transmuted to ~Task
+type State = int;
+
+static STATE_BOTH: State = 2;
+static STATE_ONE: State = 1;
+
+/// The heap-allocated structure shared between two endpoints.
+struct Packet<T> {
+    state: State,
+    payload: Option<T>,
+}
+
+/// A one-shot channel.
+pub struct ChanOne<T> {
+    // XXX: Hack extra allocation to make by-val self work
+    inner: ~ChanOneHack<T>
+}
+
+
+/// A one-shot port.
+pub struct PortOne<T> {
+    // XXX: Hack extra allocation to make by-val self work
+    inner: ~PortOneHack<T>
+}
+
+pub struct ChanOneHack<T> {
+    void_packet: *mut Void,
+    suppress_finalize: bool
+}
+
+pub struct PortOneHack<T> {
+    void_packet: *mut Void,
+    suppress_finalize: bool
+}
+
+pub fn oneshot<T: Owned>() -> (PortOne<T>, ChanOne<T>) {
+    let packet: ~Packet<T> = ~Packet {
+        state: STATE_BOTH,
+        payload: None
+    };
+
+    unsafe {
+        let packet: *mut Void = cast::transmute(packet);
+        let port = PortOne {
+            inner: ~PortOneHack {
+                void_packet: packet,
+                suppress_finalize: false
+            }
+        };
+        let chan = ChanOne {
+            inner: ~ChanOneHack {
+                void_packet: packet,
+                suppress_finalize: false
+            }
+        };
+        return (port, chan);
+    }
+}
+
+impl<T> ChanOne<T> {
+
+    pub fn send(self, val: T) {
+        self.try_send(val);
+    }
+
+    pub fn try_send(self, val: T) -> bool {
+        let mut this = self;
+        let mut recvr_active = true;
+        let packet = this.inner.packet();
+
+        unsafe {
+
+            // Install the payload
+            assert!((*packet).payload.is_none());
+            (*packet).payload = Some(val);
+
+            // Atomically swap out the old state to figure out what
+            // the port's up to, issuing a release barrier to prevent
+            // reordering of the payload write. This also issues an
+            // acquire barrier that keeps the subsequent access of the
+            // ~Task pointer from being reordered.
+            let oldstate = atomic_xchg(&mut (*packet).state, STATE_ONE);
+            match oldstate {
+                STATE_BOTH => {
+                    // Port is not waiting yet. Nothing to do
+                }
+                STATE_ONE => {
+                    // Port has closed. Need to clean up.
+                    let _packet: ~Packet<T> = cast::transmute(this.inner.void_packet);
+                    recvr_active = false;
+                }
+                task_as_state => {
+                    // Port is blocked. Wake it up.
+                    let recvr: ~Coroutine = cast::transmute(task_as_state);
+                    let sched = Local::take::<Scheduler>();
+                    sched.schedule_task(recvr);
+                }
+            }
+        }
+
+        // Suppress the synchronizing actions in the finalizer. We're done with the packet.
+        this.inner.suppress_finalize = true;
+        return recvr_active;
+    }
+}
+
+
+impl<T> PortOne<T> {
+    pub fn recv(self) -> T {
+        match self.try_recv() {
+            Some(val) => val,
+            None => {
+                fail!("receiving on closed channel");
+            }
+        }
+    }
+
+    pub fn try_recv(self) -> Option<T> {
+        let mut this = self;
+        let packet = this.inner.packet();
+
+        // XXX: Optimize this to not require the two context switches when data is available
+
+        // Switch to the scheduler to put the ~Task into the Packet state.
+        let sched = Local::take::<Scheduler>();
+        do sched.deschedule_running_task_and_then |task| {
+            unsafe {
+                // Atomically swap the task pointer into the Packet state, issuing
+                // an acquire barrier to prevent reordering of the subsequent read
+                // of the payload. Also issues a release barrier to prevent reordering
+                // of any previous writes to the task structure.
+                let task_as_state: State = cast::transmute(task);
+                let oldstate = atomic_xchg(&mut (*packet).state, task_as_state);
+                match oldstate {
+                    STATE_BOTH => {
+                        // Data has not been sent. Now we're blocked.
+                    }
+                    STATE_ONE => {
+                        // Channel is closed. Switch back and check the data.
+                        let task: ~Coroutine = cast::transmute(task_as_state);
+                        let sched = Local::take::<Scheduler>();
+                        sched.resume_task_immediately(task);
+                    }
+                    _ => util::unreachable()
+                }
+            }
+        }
+
+        // Task resumes.
+
+        // No further memory barrier is needed here to access the
+        // payload. Some scenarios:
+        //
+        // 1) We encountered STATE_ONE above - the atomic_xchg was the acq barrier. We're fine.
+        // 2) We encountered STATE_BOTH above and blocked. The sending task then resumed
+        //    us, and we ran on its thread. The sending task issued a read barrier when taking the
+        //    pointer to the receiving task.
+        // 3) We encountered STATE_BOTH above and blocked, but the receiving task (this task)
+        //    is pinned to some other scheduler, so the sending task had to give us to
+        //    a different scheduler for resuming. That send synchronized memory.
+
+        unsafe {
+            let payload = util::replace(&mut (*packet).payload, None);
+
+            // The sender has closed up shop. Drop the packet.
+            let _packet: ~Packet<T> = cast::transmute(this.inner.void_packet);
+            // Suppress the synchronizing actions in the finalizer. We're done with the packet.
+            this.inner.suppress_finalize = true;
+            return payload;
+        }
+    }
+}
+
+impl<T> Peekable<T> for PortOne<T> {
+    fn peek(&self) -> bool {
+        unsafe {
+            let packet: *mut Packet<T> = self.inner.packet();
+            let oldstate = atomic_load(&mut (*packet).state);
+            match oldstate {
+                STATE_BOTH => false,
+                STATE_ONE => (*packet).payload.is_some(),
+                _ => util::unreachable()
+            }
+        }
+    }
+}
+
+#[unsafe_destructor]
+impl<T> Drop for ChanOneHack<T> {
+    fn finalize(&self) {
+        if self.suppress_finalize { return }
+
+        unsafe {
+            let this = cast::transmute_mut(self);
+            let oldstate = atomic_xchg(&mut (*this.packet()).state, STATE_ONE);
+            match oldstate {
+                STATE_BOTH => {
+                    // Port still active. It will destroy the Packet.
+                },
+                STATE_ONE => {
+                    let _packet: ~Packet<T> = cast::transmute(this.void_packet);
+                },
+                task_as_state => {
+                    // The port is blocked waiting for a message we will never send. Wake it.
+                    assert!((*this.packet()).payload.is_none());
+                    let recvr: ~Coroutine = cast::transmute(task_as_state);
+                    let sched = Local::take::<Scheduler>();
+                    sched.schedule_task(recvr);
+                }
+            }
+        }
+    }
+}
+
+#[unsafe_destructor]
+impl<T> Drop for PortOneHack<T> {
+    fn finalize(&self) {
+        if self.suppress_finalize { return }
+
+        unsafe {
+            let this = cast::transmute_mut(self);
+            let oldstate = atomic_xchg(&mut (*this.packet()).state, STATE_ONE);
+            match oldstate {
+                STATE_BOTH => {
+                    // Chan still active. It will destroy the packet.
+                },
+                STATE_ONE => {
+                    let _packet: ~Packet<T> = cast::transmute(this.void_packet);
+                }
+                _ => {
+                    util::unreachable()
+                }
+            }
+        }
+    }
+}
+
+impl<T> ChanOneHack<T> {
+    fn packet(&self) -> *mut Packet<T> {
+        unsafe {
+            let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
+            let p: *mut Packet<T> = &mut **p;
+            return p;
+        }
+    }
+}
+
+impl<T> PortOneHack<T> {
+    fn packet(&self) -> *mut Packet<T> {
+        unsafe {
+            let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
+            let p: *mut Packet<T> = &mut **p;
+            return p;
+        }
+    }
+}
+
+struct StreamPayload<T> {
+    val: T,
+    next: PortOne<StreamPayload<T>>
+}
+
+/// A channel with unbounded size.
+pub struct Chan<T> {
+    // FIXME #5372. Using Cell because we don't take &mut self
+    next: Cell<ChanOne<StreamPayload<T>>>
+}
+
+/// A port with unbounded size.
+pub struct Port<T> {
+    // FIXME #5372. Using Cell because we don't take &mut self
+    next: Cell<PortOne<StreamPayload<T>>>
+}
+
+pub fn stream<T: Owned>() -> (Port<T>, Chan<T>) {
+    let (pone, cone) = oneshot();
+    let port = Port { next: Cell(pone) };
+    let chan = Chan { next: Cell(cone) };
+    return (port, chan);
+}
+
+impl<T: Owned> GenericChan<T> for Chan<T> {
+    fn send(&self, val: T) {
+        self.try_send(val);
+    }
+}
+
+impl<T: Owned> GenericSmartChan<T> for Chan<T> {
+    fn try_send(&self, val: T) -> bool {
+        let (next_pone, next_cone) = oneshot();
+        let cone = self.next.take();
+        self.next.put_back(next_cone);
+        cone.try_send(StreamPayload { val: val, next: next_pone })
+    }
+}
+
+impl<T> GenericPort<T> for Port<T> {
+    fn recv(&self) -> T {
+        match self.try_recv() {
+            Some(val) => val,
+            None => {
+                fail!("receiving on closed channel");
+            }
+        }
+    }
+
+    fn try_recv(&self) -> Option<T> {
+        let pone = self.next.take();
+        match pone.try_recv() {
+            Some(StreamPayload { val, next }) => {
+                self.next.put_back(next);
+                Some(val)
+            }
+            None => None
+        }
+    }
+}
+
+impl<T> Peekable<T> for Port<T> {
+    fn peek(&self) -> bool {
+        self.next.with_mut_ref(|p| p.peek())
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use option::*;
+    use rt::test::*;
+    use cell::Cell;
+    use iter::Times;
+
+    #[test]
+    fn oneshot_single_thread_close_port_first() {
+        // Simple test of closing without sending
+        do run_in_newsched_task {
+            let (port, _chan) = oneshot::<int>();
+            { let _p = port; }
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_close_chan_first() {
+        // Simple test of closing without sending
+        do run_in_newsched_task {
+            let (_port, chan) = oneshot::<int>();
+            { let _c = chan; }
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_send_port_close() {
+        // Testing that the sender cleans up the payload if receiver is closed
+        do run_in_newsched_task {
+            let (port, chan) = oneshot::<~int>();
+            { let _p = port; }
+            chan.send(~0);
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_recv_chan_close() {
+        // Receiving on a closed chan will fail
+        do run_in_newsched_task {
+            let res = do spawntask_try {
+                let (port, chan) = oneshot::<~int>();
+                { let _c = chan; }
+                port.recv();
+            };
+            assert!(res.is_err());
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_send_then_recv() {
+        do run_in_newsched_task {
+            let (port, chan) = oneshot::<~int>();
+            chan.send(~10);
+            assert!(port.recv() == ~10);
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_try_send_open() {
+        do run_in_newsched_task {
+            let (port, chan) = oneshot::<int>();
+            assert!(chan.try_send(10));
+            assert!(port.recv() == 10);
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_try_send_closed() {
+        do run_in_newsched_task {
+            let (port, chan) = oneshot::<int>();
+            { let _p = port; }
+            assert!(!chan.try_send(10));
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_try_recv_open() {
+        do run_in_newsched_task {
+            let (port, chan) = oneshot::<int>();
+            chan.send(10);
+            assert!(port.try_recv() == Some(10));
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_try_recv_closed() {
+        do run_in_newsched_task {
+            let (port, chan) = oneshot::<int>();
+            { let _c = chan; }
+            assert!(port.try_recv() == None);
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_peek_data() {
+        do run_in_newsched_task {
+            let (port, chan) = oneshot::<int>();
+            assert!(!port.peek());
+            chan.send(10);
+            assert!(port.peek());
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_peek_close() {
+        do run_in_newsched_task {
+            let (port, chan) = oneshot::<int>();
+            { let _c = chan; }
+            assert!(!port.peek());
+            assert!(!port.peek());
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_peek_open() {
+        do run_in_newsched_task {
+            let (port, chan) = oneshot::<int>();
+            assert!(!port.peek());
+        }
+    }
+
+    #[test]
+    fn oneshot_multi_task_recv_then_send() {
+        do run_in_newsched_task {
+            let (port, chan) = oneshot::<~int>();
+            let port_cell = Cell(port);
+            do spawntask_immediately {
+                assert!(port_cell.take().recv() == ~10);
+            }
+
+            chan.send(~10);
+        }
+    }
+
+    #[test]
+    fn oneshot_multi_task_recv_then_close() {
+        do run_in_newsched_task {
+            let (port, chan) = oneshot::<~int>();
+            let port_cell = Cell(port);
+            let chan_cell = Cell(chan);
+            do spawntask_later {
+                let _cell = chan_cell.take();
+            }
+            let res = do spawntask_try {
+                assert!(port_cell.take().recv() == ~10);
+            };
+            assert!(res.is_err());
+        }
+    }
+
+    #[test]
+    fn oneshot_multi_thread_close_stress() {
+        for stress_factor().times {
+            do run_in_newsched_task {
+                let (port, chan) = oneshot::<int>();
+                let port_cell = Cell(port);
+                let _thread = do spawntask_thread {
+                    let _p = port_cell.take();
+                };
+                let _chan = chan;
+            }
+        }
+    }
+
+    #[test]
+    fn oneshot_multi_thread_send_close_stress() {
+        for stress_factor().times {
+            do run_in_newsched_task {
+                let (port, chan) = oneshot::<int>();
+                let chan_cell = Cell(chan);
+                let port_cell = Cell(port);
+                let _thread1 = do spawntask_thread {
+                    let _p = port_cell.take();
+                };
+                let _thread2 = do spawntask_thread {
+                    let c = chan_cell.take();
+                    c.send(1);
+                };
+            }
+        }
+    }
+
+    #[test]
+    fn oneshot_multi_thread_recv_close_stress() {
+        for stress_factor().times {
+            do run_in_newsched_task {
+                let (port, chan) = oneshot::<int>();
+                let chan_cell = Cell(chan);
+                let port_cell = Cell(port);
+                let _thread1 = do spawntask_thread {
+                    let port_cell = Cell(port_cell.take());
+                    let res = do spawntask_try {
+                        port_cell.take().recv();
+                    };
+                    assert!(res.is_err());
+                };
+                let _thread2 = do spawntask_thread {
+                    let chan_cell = Cell(chan_cell.take());
+                    do spawntask {
+                        chan_cell.take();
+                    }
+                };
+            }
+        }
+    }
+
+    #[test]
+    fn oneshot_multi_thread_send_recv_stress() {
+        for stress_factor().times {
+            do run_in_newsched_task {
+                let (port, chan) = oneshot::<~int>();
+                let chan_cell = Cell(chan);
+                let port_cell = Cell(port);
+                let _thread1 = do spawntask_thread {
+                    chan_cell.take().send(~10);
+                };
+                let _thread2 = do spawntask_thread {
+                    assert!(port_cell.take().recv() == ~10);
+                };
+            }
+        }
+    }
+
+    #[test]
+    fn stream_send_recv_stress() {
+        for stress_factor().times {
+            do run_in_newsched_task {
+                let (port, chan) = stream::<~int>();
+
+                send(chan, 0);
+                recv(port, 0);
+
+                fn send(chan: Chan<~int>, i: int) {
+                    if i == 10 { return }
+
+                    let chan_cell = Cell(chan);
+                    let _thread = do spawntask_thread {
+                        let chan = chan_cell.take();
+                        chan.send(~i);
+                        send(chan, i + 1);
+                    };
+                }
+
+                fn recv(port: Port<~int>, i: int) {
+                    if i == 10 { return }
+
+                    let port_cell = Cell(port);
+                    let _thread = do spawntask_thread {
+                        let port = port_cell.take();
+                        assert!(port.recv() == ~i);
+                        recv(port, i + 1);
+                    };
+                }
+            }
+        }
+    }
+}
+
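For orientation before moving to the next file: a minimal usage sketch of the one-shot and stream APIs above, in the same 2013-era dialect. This is hypothetical driver code mirroring the tests (it assumes it runs inside a newsched task, as `run_in_newsched_task` arranges), not part of the patch:

    // Hedged sketch: one-shot endpoints are consumed by use;
    // a stream is a chain of one-shots under the hood.
    fn oneshot_and_stream_example() {
        let (port, chan) = oneshot::<int>();
        chan.send(10);                 // consumes the ChanOne
        assert!(port.recv() == 10);    // consumes the PortOne

        let (port, chan) = stream::<int>();
        chan.send(1);
        chan.send(2);                  // each send rebuilds `next` internally
        assert!(port.recv() == 1);
        assert!(port.recv() == 2);
    }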
diff --git a/src/libstd/rt/global_heap.rs b/src/libstd/rt/global_heap.rs
new file mode 100644
index 00000000000..ce7ff87b445
--- /dev/null
+++ b/src/libstd/rt/global_heap.rs
@@ -0,0 +1,87 @@
+// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use sys::{TypeDesc, size_of};
+use libc::{c_void, size_t, uintptr_t};
+use c_malloc = libc::malloc;
+use c_free = libc::free;
+use managed::raw::{BoxHeaderRepr, BoxRepr};
+use cast::transmute;
+use unstable::intrinsics::{atomic_xadd,atomic_xsub};
+use ptr::null;
+use intrinsic::TyDesc;
+
+pub unsafe fn malloc(td: *TypeDesc, size: uint) -> *c_void {
+    assert!(td.is_not_null());
+
+    let total_size = get_box_size(size, (*td).align);
+    let p = c_malloc(total_size as size_t);
+    assert!(p.is_not_null());
+
+    // FIXME #3475: Converting between our two different tydesc types
+    let td: *TyDesc = transmute(td);
+
+    let box: &mut BoxRepr = transmute(p);
+    box.header.ref_count = -1; // Exchange values not ref counted
+    box.header.type_desc = td;
+    box.header.prev = null();
+    box.header.next = null();
+
+    let exchange_count = &mut *exchange_count_ptr();
+    atomic_xadd(exchange_count, 1);
+
+    return transmute(box);
+}
+/**
+Thin wrapper around libc::malloc; performs none of the box-header
+setup done in exchange_alloc::malloc
+*/
+pub unsafe fn malloc_raw(size: uint) -> *c_void {
+    let p = c_malloc(size as size_t);
+    if p.is_null() {
+        fail!("Failure in malloc_raw: result ptr is null");
+    }
+    p
+}
+
+pub unsafe fn free(ptr: *c_void) {
+    let exchange_count = &mut *exchange_count_ptr();
+    atomic_xsub(exchange_count, 1);
+
+    assert!(ptr.is_not_null());
+    c_free(ptr);
+}
+///Thin wrapper around libc::free, as with exchange_alloc::malloc_raw
+pub unsafe fn free_raw(ptr: *c_void) {
+    c_free(ptr);
+}
+
+fn get_box_size(body_size: uint, body_align: uint) -> uint {
+    let header_size = size_of::<BoxHeaderRepr>();
+    // FIXME (#2699): This alignment calculation is suspicious. Is it right?
+    let total_size = align_to(header_size, body_align) + body_size;
+    return total_size;
+}
+
+// Rounds |size| up to the next multiple of |align|. Invariant: |align| is a power
+// of two.
+fn align_to(size: uint, align: uint) -> uint {
+    assert!(align != 0);
+    (size + align - 1) & !(align - 1)
+}
+
+fn exchange_count_ptr() -> *mut int {
+    // XXX: Need mutable globals
+    unsafe { transmute(&rust_exchange_count) }
+}
+
+extern {
+    static rust_exchange_count: uintptr_t;
+}
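A quick worked example of the `align_to` bit trick, as a standalone sketch (not part of the patch):

    // align_to rounds `size` up to a multiple of a power-of-two `align`:
    //   align_to(13, 8) == (13 + 7) & !7 == 20 & !7 == 16
    //   align_to(16, 8) == (16 + 7) & !7 == 23 & !7 == 16  (already aligned)
    // get_box_size therefore places the body at the first offset past the
    // BoxHeaderRepr that satisfies the body's alignment.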
diff --git a/src/libstd/rt/io/mock.rs b/src/libstd/rt/io/mock.rs
new file mode 100644
index 00000000000..b580b752bd9
--- /dev/null
+++ b/src/libstd/rt/io/mock.rs
@@ -0,0 +1,50 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use option::{Option, None};
+use rt::io::{Reader, Writer};
+
+pub struct MockReader {
+    read: ~fn(buf: &mut [u8]) -> Option<uint>,
+    eof: ~fn() -> bool
+}
+
+impl MockReader {
+    pub fn new() -> MockReader {
+        MockReader {
+            read: |_| None,
+            eof: || false
+        }
+    }
+}
+
+impl Reader for MockReader {
+    fn read(&mut self, buf: &mut [u8]) -> Option<uint> { (self.read)(buf) }
+    fn eof(&mut self) -> bool { (self.eof)() }
+}
+
+pub struct MockWriter {
+    write: ~fn(buf: &[u8]),
+    flush: ~fn()
+}
+
+impl MockWriter {
+    pub fn new() -> MockWriter {
+        MockWriter {
+            write: |_| (),
+            flush: || ()
+        }
+    }
+}
+
+impl Writer for MockWriter {
+    fn write(&mut self, buf: &[u8]) { (self.write)(buf) }
+    fn flush(&mut self) { (self.flush)() }
+}
\ No newline at end of file
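The mock types are plain structs of owned closures, so a test can replace behavior field by field. A hedged usage sketch (hypothetical test code, not part of the patch):

    let mut reader = MockReader::new();
    reader.eof = || true;                    // stub: always at end-of-file
    assert!(reader.eof());

    let mut writer = MockWriter::new();
    writer.write = |buf| assert!(buf.len() == 3);
    writer.write([1u8, 2, 3]);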
diff --git a/src/libstd/rt/local.rs b/src/libstd/rt/local.rs
new file mode 100644
index 00000000000..64a384ddff0
--- /dev/null
+++ b/src/libstd/rt/local.rs
@@ -0,0 +1,118 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use option::{Option, Some, None};
+use rt::sched::Scheduler;
+use rt::task::Task;
+use rt::local_ptr;
+use rt::rtio::{EventLoop, IoFactoryObject};
+
+pub trait Local {
+    fn put(value: ~Self);
+    fn take() -> ~Self;
+    fn exists() -> bool;
+    fn borrow(f: &fn(&mut Self));
+    unsafe fn unsafe_borrow() -> *mut Self;
+    unsafe fn try_unsafe_borrow() -> Option<*mut Self>;
+}
+
+impl Local for Scheduler {
+    fn put(value: ~Scheduler) { unsafe { local_ptr::put(value) }}
+    fn take() -> ~Scheduler { unsafe { local_ptr::take() } }
+    fn exists() -> bool { local_ptr::exists() }
+    fn borrow(f: &fn(&mut Scheduler)) { unsafe { local_ptr::borrow(f) } }
+    unsafe fn unsafe_borrow() -> *mut Scheduler { local_ptr::unsafe_borrow() }
+    unsafe fn try_unsafe_borrow() -> Option<*mut Scheduler> { abort!("unimpl") }
+}
+
+impl Local for Task {
+    fn put(value: ~Task) { abort!("unimpl") }
+    fn take() -> ~Task { abort!("unimpl") }
+    fn exists() -> bool { abort!("unimpl") }
+    fn borrow(f: &fn(&mut Task)) {
+        do Local::borrow::<Scheduler> |sched| {
+            match sched.current_task {
+                Some(~ref mut task) => {
+                    f(&mut *task.task)
+                }
+                None => {
+                    abort!("no scheduler")
+                }
+            }
+        }
+    }
+    unsafe fn unsafe_borrow() -> *mut Task {
+        match (*Local::unsafe_borrow::<Scheduler>()).current_task {
+            Some(~ref mut task) => {
+                let s: *mut Task = &mut *task.task;
+                return s;
+            }
+            None => {
+                // Don't fail!() here: failure would try to borrow the
+                // local Task again and recurse infinitely
+                abort!("no scheduler")
+            }
+        }
+    }
+    unsafe fn try_unsafe_borrow() -> Option<*mut Task> {
+        if Local::exists::<Scheduler>() {
+            Some(Local::unsafe_borrow())
+        } else {
+            None
+        }
+    }
+}
+
+// XXX: This formulation won't work once ~IoFactoryObject is a real trait pointer
+impl Local for IoFactoryObject {
+    fn put(value: ~IoFactoryObject) { abort!("unimpl") }
+    fn take() -> ~IoFactoryObject { abort!("unimpl") }
+    fn exists() -> bool { abort!("unimpl") }
+    fn borrow(f: &fn(&mut IoFactoryObject)) { abort!("unimpl") }
+    unsafe fn unsafe_borrow() -> *mut IoFactoryObject {
+        let sched = Local::unsafe_borrow::<Scheduler>();
+        let io: *mut IoFactoryObject = (*sched).event_loop.io().unwrap();
+        return io;
+    }
+    unsafe fn try_unsafe_borrow() -> Option<*mut IoFactoryObject> { abort!("unimpl") }
+}
+
+#[cfg(test)]
+mod test {
+    use rt::sched::Scheduler;
+    use rt::uv::uvio::UvEventLoop;
+    use super::*;
+
+    #[test]
+    fn thread_local_scheduler_smoke_test() {
+        let scheduler = ~UvEventLoop::new_scheduler();
+        Local::put(scheduler);
+        let _scheduler: ~Scheduler = Local::take();
+    }
+
+    #[test]
+    fn thread_local_scheduler_two_instances() {
+        let scheduler = ~UvEventLoop::new_scheduler();
+        Local::put(scheduler);
+        let _scheduler: ~Scheduler = Local::take();
+        let scheduler = ~UvEventLoop::new_scheduler();
+        Local::put(scheduler);
+        let _scheduler: ~Scheduler = Local::take();
+    }
+
+    #[test]
+    fn borrow_smoke_test() {
+        let scheduler = ~UvEventLoop::new_scheduler();
+        Local::put(scheduler);
+        unsafe {
+            let _scheduler: *mut Scheduler = Local::unsafe_borrow();
+        }
+        let _scheduler: ~Scheduler = Local::take();
+    }
+}
\ No newline at end of file
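Note that a Task has no TLS slot of its own here: it lives in the Scheduler's current_task slot, so borrowing a Task routes through the Scheduler. A hedged sketch of typical access (hypothetical, inside a running newsched task):

    do Local::borrow::<Task> |_task| {
        // _task is the ~Task held by the current Coroutine; per
        // local_ptr::borrow, the Scheduler itself is out of TLS for
        // the duration of this closure
    }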
diff --git a/src/libstd/rt/local_ptr.rs b/src/libstd/rt/local_ptr.rs
new file mode 100644
index 00000000000..80d797e8c65
--- /dev/null
+++ b/src/libstd/rt/local_ptr.rs
@@ -0,0 +1,145 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Access to a single thread-local pointer.
+//!
+//! The runtime will use this for storing ~Task.
+//!
+//! XXX: Add runtime checks for usage of inconsistent pointer types
+//! and for overwriting an existing pointer.
+
+use libc::c_void;
+use cast;
+use ptr;
+use cell::Cell;
+use option::{Option, Some, None};
+use unstable::finally::Finally;
+use tls = rt::thread_local_storage;
+
+/// Initialize the TLS key. Other ops will fail if this isn't executed first.
+pub fn init_tls_key() {
+    unsafe {
+        rust_initialize_rt_tls_key();
+        extern {
+            fn rust_initialize_rt_tls_key();
+        }
+    }
+}
+
+/// Give ownership of a pointer to thread-local storage.
+///
+/// # Safety note
+///
+/// Does not validate the pointer type.
+pub unsafe fn put<T>(sched: ~T) {
+    let key = tls_key();
+    let void_ptr: *mut c_void = cast::transmute(sched);
+    tls::set(key, void_ptr);
+}
+
+/// Take ownership of a pointer from thread-local storage.
+///
+/// # Safety note
+///
+/// Does not validate the pointer type.
+pub unsafe fn take<T>() -> ~T {
+    let key = tls_key();
+    let void_ptr: *mut c_void = tls::get(key);
+    rtassert!(void_ptr.is_not_null());
+    let ptr: ~T = cast::transmute(void_ptr);
+    tls::set(key, ptr::mut_null());
+    return ptr;
+}
+
+/// Check whether there is a thread-local pointer installed.
+pub fn exists() -> bool {
+    unsafe {
+        match maybe_tls_key() {
+            Some(key) => tls::get(key).is_not_null(),
+            None => false
+        }
+    }
+}
+
+/// Borrow the thread-local scheduler from thread-local storage.
+/// While the scheduler is borrowed it is not available in TLS.
+///
+/// # Safety note
+///
+/// Does not validate the pointer type.
+pub unsafe fn borrow<T>(f: &fn(&mut T)) {
+    let mut value = take();
+
+    // XXX: Need a different abstraction from 'finally' here to avoid unsafety
+    let unsafe_ptr = cast::transmute_mut_region(&mut *value);
+    let value_cell = Cell(value);
+
+    do (|| {
+        f(unsafe_ptr);
+    }).finally {
+        put(value_cell.take());
+    }
+}
+
+/// Borrow a mutable reference to the thread-local Scheduler
+///
+/// # Safety Note
+///
+/// Because this leaves the Scheduler in thread-local storage it is possible
+/// for the Scheduler pointer to be aliased.
+pub unsafe fn unsafe_borrow<T>() -> *mut T {
+    let key = tls_key();
+    let mut void_sched: *mut c_void = tls::get(key);
+    rtassert!(void_sched.is_not_null());
+    {
+        let sched: *mut *mut c_void = &mut void_sched;
+        let sched: *mut ~T = sched as *mut ~T;
+        let sched: *mut T = &mut **sched;
+        return sched;
+    }
+}
+
+fn tls_key() -> tls::Key {
+    match maybe_tls_key() {
+        Some(key) => key,
+        None => abort!("runtime tls key not initialized")
+    }
+}
+
+fn maybe_tls_key() -> Option<tls::Key> {
+    unsafe {
+        let key: *mut c_void = rust_get_rt_tls_key();
+        let key: &mut tls::Key = cast::transmute(key);
+        let key = *key;
+        // Check that the key has been initialized.
+
+        // NB: This is a little racy because, while the key is
+        // initialized under a mutex and it's assumed to be initialized
+        // in the Scheduler ctor by any thread that needs to use it,
+        // we are not accessing the key under a mutex.  Threads that
+        // are not using the new Scheduler but still *want to check*
+        // whether they are running under a new Scheduler may see a 0
+        // value here that is in the process of being initialized in
+        // another thread. I think this is fine since the only action
+        // they could take if it was initialized would be to check the
+        // thread-local value and see that it's not set.
+        if key != -1 {
+            return Some(key);
+        } else {
+            return None;
+        }
+    }
+
+    extern {
+        #[fast_ffi]
+        fn rust_get_rt_tls_key() -> *mut c_void;
+    }
+
+}
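A hedged sketch of the put/take/borrow discipline this module enforces (hypothetical; assumes `init_tls_key` has already run and the slot is empty):

    unsafe {
        put(~10);                    // install a ~int in the TLS slot
        assert!(exists());
        do borrow::<int> |val| {     // slot is empty while borrowed
            assert!(*val == 10);
        }
        let v: ~int = take();        // slot is null again afterwards
        assert!(*v == 10);
        assert!(!exists());
    }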
diff --git a/src/libstd/rt/logging.rs b/src/libstd/rt/logging.rs
new file mode 100644
index 00000000000..a0d05397689
--- /dev/null
+++ b/src/libstd/rt/logging.rs
@@ -0,0 +1,68 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use either::*;
+
+pub trait Logger {
+    fn log(&mut self, msg: Either<~str, &'static str>);
+}
+
+pub struct StdErrLogger;
+
+impl Logger for StdErrLogger {
+    fn log(&mut self, msg: Either<~str, &'static str>) {
+        use io::{Writer, WriterUtil};
+
+        let s: &str = match msg {
+            Left(ref s) => {
+                let s: &str = *s;
+                s
+            }
+            Right(ref s) => {
+                let s: &str = *s;
+                s
+            }
+        };
+        let dbg = ::libc::STDERR_FILENO as ::io::fd_t;
+        dbg.write_str(s);
+        dbg.write_str("\n");
+        dbg.flush();
+    }
+}
+
+/// Configure logging by traversing the crate map and setting the
+/// per-module global logging flags based on the logging spec
+pub fn init(crate_map: *u8) {
+    use os;
+    use str;
+    use ptr;
+    use option::{Some, None};
+    use libc::c_char;
+
+    let log_spec = os::getenv("RUST_LOG");
+    match log_spec {
+        Some(spec) => {
+            do str::as_c_str(spec) |s| {
+                unsafe {
+                    rust_update_log_settings(crate_map, s);
+                }
+            }
+        }
+        None => {
+            unsafe {
+                rust_update_log_settings(crate_map, ptr::null());
+            }
+        }
+    }
+
+    extern {
+        fn rust_update_log_settings(crate_map: *u8, settings: *c_char);
+    }
+}
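A hedged sketch of driving the Logger trait directly (hypothetical; Left carries an owned string, Right a static one):

    use either::{Left, Right};

    let mut logger = StdErrLogger;
    logger.log(Right("static message"));   // &'static str
    logger.log(Left(~"owned message"));    // ~str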
diff --git a/src/libstd/rt/message_queue.rs b/src/libstd/rt/message_queue.rs
new file mode 100644
index 00000000000..eaab9288ac8
--- /dev/null
+++ b/src/libstd/rt/message_queue.rs
@@ -0,0 +1,53 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use container::Container;
+use kinds::Owned;
+use vec::OwnedVector;
+use cell::Cell;
+use option::*;
+use unstable::sync::{Exclusive, exclusive};
+use clone::Clone;
+
+pub struct MessageQueue<T> {
+    // XXX: Another mystery bug fixed by boxing this lock
+    priv queue: ~Exclusive<~[T]>
+}
+
+impl<T: Owned> MessageQueue<T> {
+    pub fn new() -> MessageQueue<T> {
+        MessageQueue {
+            queue: ~exclusive(~[])
+        }
+    }
+
+    pub fn push(&mut self, value: T) {
+        let value = Cell(value);
+        self.queue.with(|q| q.push(value.take()) );
+    }
+
+    pub fn pop(&mut self) -> Option<T> {
+        do self.queue.with |q| {
+            if !q.is_empty() {
+                Some(q.shift())
+            } else {
+                None
+            }
+        }
+    }
+}
+
+impl<T> Clone for MessageQueue<T> {
+    fn clone(&self) -> MessageQueue<T> {
+        MessageQueue {
+            queue: self.queue.clone()
+        }
+    }
+}
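Because clones share the same boxed Exclusive, a push through one handle is visible to a pop through another, in FIFO order (pop uses `shift`, which removes from the front). A hedged sketch (hypothetical):

    let mut q = MessageQueue::new();
    let mut q2 = q.clone();          // shares the underlying queue
    q.push(1);
    q.push(2);
    assert!(q2.pop() == Some(1));
    assert!(q2.pop() == Some(2));
    assert!(q.pop().is_none());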
diff --git a/src/libstd/rt/rc.rs b/src/libstd/rt/rc.rs
new file mode 100644
index 00000000000..1c0c8c14fdf
--- /dev/null
+++ b/src/libstd/rt/rc.rs
@@ -0,0 +1,142 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! An owned, task-local, reference counted type
+//!
+//! # Safety note
+//!
+//! XXX There is currently no type-system mechanism for enforcing that
+//! reference counted types are both allocated on the exchange heap
+//! and also non-sendable
+//!
+//! This doesn't prevent borrowing multiple aliasable mutable pointers
+
+use ops::Drop;
+use clone::Clone;
+use libc::c_void;
+use cast;
+
+pub struct RC<T> {
+    p: *c_void // ~(uint, T)
+}
+
+impl<T> RC<T> {
+    pub fn new(val: T) -> RC<T> {
+        unsafe {
+            let v = ~(1, val);
+            let p: *c_void = cast::transmute(v);
+            RC { p: p }
+        }
+    }
+
+    fn get_mut_state(&mut self) -> *mut (uint, T) {
+        unsafe {
+            let p: &mut ~(uint, T) = cast::transmute(&mut self.p);
+            let p: *mut (uint, T) = &mut **p;
+            return p;
+        }
+    }
+
+    fn get_state(&self) -> *(uint, T) {
+        unsafe {
+            let p: &~(uint, T) = cast::transmute(&self.p);
+            let p: *(uint, T) = &**p;
+            return p;
+        }
+    }
+
+    pub fn unsafe_borrow_mut(&mut self) -> *mut T {
+        unsafe {
+            match *self.get_mut_state() {
+                (_, ref mut p) => {
+                    let p: *mut T = p;
+                    return p;
+                }
+            }
+        }
+    }
+
+    pub fn refcount(&self) -> uint {
+        unsafe {
+            match *self.get_state() {
+                (count, _) => count
+            }
+        }
+    }
+}
+
+#[unsafe_destructor]
+impl<T> Drop for RC<T> {
+    fn finalize(&self) {
+        assert!(self.refcount() > 0);
+
+        unsafe {
+            // XXX: Mutable finalizer
+            let this: &mut RC<T> = cast::transmute_mut(self);
+
+            match *this.get_mut_state() {
+                (ref mut count, _) => {
+                    *count = *count - 1
+                }
+            }
+
+            if this.refcount() == 0 {
+                let _: ~(uint, T) = cast::transmute(this.p);
+            }
+        }
+    }
+}
+
+impl<T> Clone for RC<T> {
+    fn clone(&self) -> RC<T> {
+        unsafe {
+            // XXX: Mutable clone
+            let this: &mut RC<T> = cast::transmute_mut(self);
+
+            match *this.get_mut_state() {
+                (ref mut count, _) => {
+                    *count = *count + 1;
+                }
+            }
+        }
+
+        RC { p: self.p }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::RC;
+
+    #[test]
+    fn smoke_test() {
+        unsafe {
+            let mut v1 = RC::new(100);
+            assert!(*v1.unsafe_borrow_mut() == 100);
+            assert!(v1.refcount() == 1);
+
+            let mut v2 = v1.clone();
+            assert!(*v2.unsafe_borrow_mut() == 100);
+            assert!(v2.refcount() == 2);
+
+            *v2.unsafe_borrow_mut() = 200;
+            assert!(*v2.unsafe_borrow_mut() == 200);
+            assert!(*v1.unsafe_borrow_mut() == 200);
+
+            let v3 = v2.clone();
+            assert!(v3.refcount() == 3);
+            {
+                let _v1 = v1;
+                let _v2 = v2;
+            }
+            assert!(v3.refcount() == 1);
+        }
+    }
+}
diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs
new file mode 100644
index 00000000000..50c6a894093
--- /dev/null
+++ b/src/libstd/rt/sched.rs
@@ -0,0 +1,554 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use option::*;
+use sys;
+use cast::transmute;
+use cell::Cell;
+
+use super::work_queue::WorkQueue;
+use super::stack::{StackPool, StackSegment};
+use super::rtio::{EventLoop, EventLoopObject};
+use super::context::Context;
+use super::task::Task;
+use rt::local_ptr;
+use rt::local::Local;
+use rt::rtio::IoFactoryObject;
+
+/// The Scheduler is responsible for coordinating execution of Coroutines
+/// on a single thread. When the scheduler is running, it is owned by
+/// thread-local storage, and the running task is owned by the
+/// scheduler.
+pub struct Scheduler {
+    priv work_queue: WorkQueue<~Coroutine>,
+    stack_pool: StackPool,
+    /// The event loop used to drive the scheduler and perform I/O
+    event_loop: ~EventLoopObject,
+    /// The scheduler's saved context.
+    /// Always valid when a task is executing, otherwise not
+    priv saved_context: Context,
+    /// The currently executing task
+    current_task: Option<~Coroutine>,
+    /// An action performed after a context switch on behalf of the
+    /// code running before the context switch
+    priv cleanup_job: Option<CleanupJob>
+}
+
+// XXX: Some hacks to put a &fn in Scheduler without borrowck
+// complaining
+type UnsafeTaskReceiver = sys::Closure;
+trait ClosureConverter {
+    fn from_fn(&fn(~Coroutine)) -> Self;
+    fn to_fn(self) -> &fn(~Coroutine);
+}
+impl ClosureConverter for UnsafeTaskReceiver {
+    fn from_fn(f: &fn(~Coroutine)) -> UnsafeTaskReceiver { unsafe { transmute(f) } }
+    fn to_fn(self) -> &fn(~Coroutine) { unsafe { transmute(self) } }
+}
+
+enum CleanupJob {
+    DoNothing,
+    GiveTask(~Coroutine, UnsafeTaskReceiver)
+}
+
+pub impl Scheduler {
+
+    fn in_task_context(&self) -> bool { self.current_task.is_some() }
+
+    fn new(event_loop: ~EventLoopObject) -> Scheduler {
+
+        // Lazily initialize the runtime TLS key
+        local_ptr::init_tls_key();
+
+        Scheduler {
+            event_loop: event_loop,
+            work_queue: WorkQueue::new(),
+            stack_pool: StackPool::new(),
+            saved_context: Context::empty(),
+            current_task: None,
+            cleanup_job: None
+        }
+    }
+
+    // XXX: This may eventually need to be refactored so that
+    // the scheduler itself doesn't have to call event_loop.run.
+    // That will be important for embedding the runtime into external
+    // event loops.
+    fn run(~self) -> ~Scheduler {
+        assert!(!self.in_task_context());
+
+        let mut self_sched = self;
+
+        unsafe {
+            let event_loop: *mut ~EventLoopObject = {
+                let event_loop: *mut ~EventLoopObject = &mut self_sched.event_loop;
+                event_loop
+            };
+
+            // Give ownership of the scheduler (self) to the thread
+            Local::put(self_sched);
+
+            (*event_loop).run();
+        }
+
+        let sched = Local::take::<Scheduler>();
+        assert!(sched.work_queue.is_empty());
+        return sched;
+    }
+
+    /// Schedule a task to be executed later.
+    ///
+    /// Pushes the task onto the work stealing queue and tells the event loop
+    /// to run it later. Always use this instead of pushing to the work queue
+    /// directly.
+    fn enqueue_task(&mut self, task: ~Coroutine) {
+        self.work_queue.push(task);
+        self.event_loop.callback(resume_task_from_queue);
+
+        fn resume_task_from_queue() {
+            let scheduler = Local::take::<Scheduler>();
+            scheduler.resume_task_from_queue();
+        }
+    }
+
+    // * Scheduler-context operations
+
+    fn resume_task_from_queue(~self) {
+        assert!(!self.in_task_context());
+
+        rtdebug!("looking in work queue for task to schedule");
+
+        let mut this = self;
+        match this.work_queue.pop() {
+            Some(task) => {
+                rtdebug!("resuming task from work queue");
+                this.resume_task_immediately(task);
+            }
+            None => {
+                rtdebug!("no tasks in queue");
+                Local::put(this);
+            }
+        }
+    }
+
+    // * Task-context operations
+
+    /// Called by a running task to end execution, after which it will
+    /// be recycled by the scheduler for reuse in a new task.
+    fn terminate_current_task(~self) {
+        assert!(self.in_task_context());
+
+        rtdebug!("ending running task");
+
+        do self.deschedule_running_task_and_then |dead_task| {
+            let dead_task = Cell(dead_task);
+            do Local::borrow::<Scheduler> |sched| {
+                dead_task.take().recycle(&mut sched.stack_pool);
+            }
+        }
+
+        abort!("control reached end of task");
+    }
+
+    fn schedule_new_task(~self, task: ~Coroutine) {
+        assert!(self.in_task_context());
+
+        do self.switch_running_tasks_and_then(task) |last_task| {
+            let last_task = Cell(last_task);
+            do Local::borrow::<Scheduler> |sched| {
+                sched.enqueue_task(last_task.take());
+            }
+        }
+    }
+
+    fn schedule_task(~self, task: ~Coroutine) {
+        assert!(self.in_task_context());
+
+        do self.switch_running_tasks_and_then(task) |last_task| {
+            let last_task = Cell(last_task);
+            do Local::borrow::<Scheduler> |sched| {
+                sched.enqueue_task(last_task.take());
+            }
+        }
+    }
+
+    // Core scheduling ops
+
+    fn resume_task_immediately(~self, task: ~Coroutine) {
+        let mut this = self;
+        assert!(!this.in_task_context());
+
+        rtdebug!("scheduling a task");
+
+        // Store the task in the scheduler so it can be grabbed later
+        this.current_task = Some(task);
+        this.enqueue_cleanup_job(DoNothing);
+
+        Local::put(this);
+
+        // Take pointers to both the task and scheduler's saved registers.
+        unsafe {
+            let sched = Local::unsafe_borrow::<Scheduler>();
+            let (sched_context, _, next_task_context) = (*sched).get_contexts();
+            let next_task_context = next_task_context.unwrap();
+            // Context switch to the task, restoring its registers
+            // and saving the scheduler's
+            Context::swap(sched_context, next_task_context);
+
+            let sched = Local::unsafe_borrow::<Scheduler>();
+            // The running task should have passed ownership elsewhere
+            assert!((*sched).current_task.is_none());
+
+            // Running tasks may have asked us to do some cleanup
+            (*sched).run_cleanup_job();
+        }
+    }
+
+    /// Block a running task, context switch to the scheduler, then pass the
+    /// blocked task to a closure.
+    ///
+    /// # Safety note
+    ///
+    /// The closure here is a *stack* closure that lives in the
+    /// running task.  It gets transmuted to the scheduler's lifetime
+    /// and called while the task is blocked.
+    fn deschedule_running_task_and_then(~self, f: &fn(~Coroutine)) {
+        let mut this = self;
+        assert!(this.in_task_context());
+
+        rtdebug!("blocking task");
+
+        unsafe {
+            let blocked_task = this.current_task.swap_unwrap();
+            let f_fake_region = transmute::<&fn(~Coroutine), &fn(~Coroutine)>(f);
+            let f_opaque = ClosureConverter::from_fn(f_fake_region);
+            this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
+        }
+
+        Local::put(this);
+
+        unsafe {
+            let sched = Local::unsafe_borrow::<Scheduler>();
+            let (sched_context, last_task_context, _) = (*sched).get_contexts();
+            let last_task_context = last_task_context.unwrap();
+            Context::swap(last_task_context, sched_context);
+
+            // We could be executing in a different thread now
+            let sched = Local::unsafe_borrow::<Scheduler>();
+            (*sched).run_cleanup_job();
+        }
+    }
+
+    /// Switch directly to another task, without going through the scheduler.
+    /// Think hard before doing this: if there are
+    /// pending I/O events, for example, it is probably a bad idea.
+    fn switch_running_tasks_and_then(~self, next_task: ~Coroutine, f: &fn(~Coroutine)) {
+        let mut this = self;
+        assert!(this.in_task_context());
+
+        rtdebug!("switching tasks");
+
+        let old_running_task = this.current_task.swap_unwrap();
+        let f_fake_region = unsafe { transmute::<&fn(~Coroutine), &fn(~Coroutine)>(f) };
+        let f_opaque = ClosureConverter::from_fn(f_fake_region);
+        this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque));
+        this.current_task = Some(next_task);
+
+        Local::put(this);
+
+        unsafe {
+            let sched = Local::unsafe_borrow::<Scheduler>();
+            let (_, last_task_context, next_task_context) = (*sched).get_contexts();
+            let last_task_context = last_task_context.unwrap();
+            let next_task_context = next_task_context.unwrap();
+            Context::swap(last_task_context, next_task_context);
+
+            // We could be executing in a different thread now
+            let sched = Local::unsafe_borrow::<Scheduler>();
+            (*sched).run_cleanup_job();
+        }
+    }
+
+
+
+    // * Other stuff
+
+    fn enqueue_cleanup_job(&mut self, job: CleanupJob) {
+        assert!(self.cleanup_job.is_none());
+        self.cleanup_job = Some(job);
+    }
+
+    fn run_cleanup_job(&mut self) {
+        rtdebug!("running cleanup job");
+
+        assert!(self.cleanup_job.is_some());
+
+        let cleanup_job = self.cleanup_job.swap_unwrap();
+        match cleanup_job {
+            DoNothing => { }
+            GiveTask(task, f) => (f.to_fn())(task)
+        }
+    }
+
+    /// Get mutable references to all the contexts that may be involved in a
+    /// context switch.
+    ///
+    /// Returns (the scheduler context, the optional context of the
+    /// task in the cleanup list, the optional context of the task in
+    /// the current task slot).  When context switching to a task,
+    /// callers should first arrange for that task to be located in the
+    /// Scheduler's current_task slot and set up the
+    /// post-context-switch cleanup job.
+    fn get_contexts<'a>(&'a mut self) -> (&'a mut Context,
+                                          Option<&'a mut Context>,
+                                          Option<&'a mut Context>) {
+        let last_task = match self.cleanup_job {
+            Some(GiveTask(~ref task, _)) => {
+                Some(task)
+            }
+            Some(DoNothing) => {
+                None
+            }
+            None => fail!("all context switches should have a cleanup job")
+        };
+        // XXX: Pattern matching mutable pointers above doesn't work
+        // because borrowck thinks the three patterns are conflicting
+        // borrows
+        unsafe {
+            let last_task = transmute::<Option<&Coroutine>, Option<&mut Coroutine>>(last_task);
+            let last_task_context = match last_task {
+                Some(t) => Some(&mut t.saved_context), None => None
+            };
+            let next_task_context = match self.current_task {
+                Some(ref mut t) => Some(&mut t.saved_context), None => None
+            };
+            // XXX: These transmutes can be removed after snapshot
+            return (transmute(&mut self.saved_context),
+                    last_task_context,
+                    transmute(next_task_context));
+        }
+    }
+}
+
+static MIN_STACK_SIZE: uint = 10000000; // XXX: Too much stack
+
+pub struct Coroutine {
+    /// The segment of stack on which the task is currently running or,
+    /// if the task is blocked, on which the task will resume execution
+    priv current_stack_segment: StackSegment,
+    /// These are always valid when the task is not running, unless
+    /// the task is dead
+    priv saved_context: Context,
+    /// The heap, GC, unwinding, local storage, logging
+    task: ~Task
+}
+
+pub impl Coroutine {
+    fn new(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine {
+        Coroutine::with_task(stack_pool, ~Task::new(), start)
+    }
+
+    fn with_task(stack_pool: &mut StackPool,
+                  task: ~Task,
+                  start: ~fn()) -> Coroutine {
+        let start = Coroutine::build_start_wrapper(start);
+        let mut stack = stack_pool.take_segment(MIN_STACK_SIZE);
+        // NB: Context holds a pointer to that ~fn
+        let initial_context = Context::new(start, &mut stack);
+        return Coroutine {
+            current_stack_segment: stack,
+            saved_context: initial_context,
+            task: task
+        };
+    }
+
+    priv fn build_start_wrapper(start: ~fn()) -> ~fn() {
+        // XXX: The old code didn't have this extra allocation
+        let wrapper: ~fn() = || {
+            // This is the first code to execute after the initial
+            // context switch to the task. The previous context may
+            // have asked us to do some cleanup.
+            unsafe {
+                let sched = Local::unsafe_borrow::<Scheduler>();
+                (*sched).run_cleanup_job();
+
+                let sched = Local::unsafe_borrow::<Scheduler>();
+                let task = (*sched).current_task.get_mut_ref();
+                // FIXME #6141: shouldn't need to put `start()` in another closure
+                task.task.run(||start());
+            }
+
+            let sched = Local::take::<Scheduler>();
+            sched.terminate_current_task();
+        };
+        return wrapper;
+    }
+
+    /// Destroy the task and try to reuse its components
+    fn recycle(~self, stack_pool: &mut StackPool) {
+        match self {
+            ~Coroutine {current_stack_segment, _} => {
+                stack_pool.give_segment(current_stack_segment);
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use int;
+    use cell::Cell;
+    use rt::uv::uvio::UvEventLoop;
+    use unstable::run_in_bare_thread;
+    use task::spawn;
+    use rt::local::Local;
+    use rt::test::*;
+    use super::*;
+
+    #[test]
+    fn test_simple_scheduling() {
+        do run_in_bare_thread {
+            let mut task_ran = false;
+            let task_ran_ptr: *mut bool = &mut task_ran;
+
+            let mut sched = ~UvEventLoop::new_scheduler();
+            let task = ~do Coroutine::new(&mut sched.stack_pool) {
+                unsafe { *task_ran_ptr = true; }
+            };
+            sched.enqueue_task(task);
+            sched.run();
+            assert!(task_ran);
+        }
+    }
+
+    #[test]
+    fn test_several_tasks() {
+        do run_in_bare_thread {
+            let total = 10;
+            let mut task_count = 0;
+            let task_count_ptr: *mut int = &mut task_count;
+
+            let mut sched = ~UvEventLoop::new_scheduler();
+            for int::range(0, total) |_| {
+                let task = ~do Coroutine::new(&mut sched.stack_pool) {
+                    unsafe { *task_count_ptr = *task_count_ptr + 1; }
+                };
+                sched.enqueue_task(task);
+            }
+            sched.run();
+            assert_eq!(task_count, total);
+        }
+    }
+
+    #[test]
+    fn test_swap_tasks_then() {
+        do run_in_bare_thread {
+            let mut count = 0;
+            let count_ptr: *mut int = &mut count;
+
+            let mut sched = ~UvEventLoop::new_scheduler();
+            let task1 = ~do Coroutine::new(&mut sched.stack_pool) {
+                unsafe { *count_ptr = *count_ptr + 1; }
+                let mut sched = Local::take::<Scheduler>();
+                let task2 = ~do Coroutine::new(&mut sched.stack_pool) {
+                    unsafe { *count_ptr = *count_ptr + 1; }
+                };
+                // Context switch directly to the new task
+                do sched.switch_running_tasks_and_then(task2) |task1| {
+                    let task1 = Cell(task1);
+                    do Local::borrow::<Scheduler> |sched| {
+                        sched.enqueue_task(task1.take());
+                    }
+                }
+                unsafe { *count_ptr = *count_ptr + 1; }
+            };
+            sched.enqueue_task(task1);
+            sched.run();
+            assert_eq!(count, 3);
+        }
+    }
+
+    #[bench] #[test] #[ignore(reason = "long test")]
+    fn test_run_a_lot_of_tasks_queued() {
+        do run_in_bare_thread {
+            static MAX: int = 1000000;
+            let mut count = 0;
+            let count_ptr: *mut int = &mut count;
+
+            let mut sched = ~UvEventLoop::new_scheduler();
+
+            let start_task = ~do Coroutine::new(&mut sched.stack_pool) {
+                run_task(count_ptr);
+            };
+            sched.enqueue_task(start_task);
+            sched.run();
+
+            assert_eq!(count, MAX);
+
+            fn run_task(count_ptr: *mut int) {
+                do Local::borrow::<Scheduler> |sched| {
+                    let task = ~do Coroutine::new(&mut sched.stack_pool) {
+                        unsafe {
+                            *count_ptr = *count_ptr + 1;
+                            if *count_ptr != MAX {
+                                run_task(count_ptr);
+                            }
+                        }
+                    };
+                    sched.enqueue_task(task);
+                }
+            }
+        }
+    }
+
+    #[test]
+    fn test_block_task() {
+        do run_in_bare_thread {
+            let mut sched = ~UvEventLoop::new_scheduler();
+            let task = ~do Coroutine::new(&mut sched.stack_pool) {
+                let sched = Local::take::<Scheduler>();
+                assert!(sched.in_task_context());
+                do sched.deschedule_running_task_and_then() |task| {
+                    let task = Cell(task);
+                    do Local::borrow::<Scheduler> |sched| {
+                        assert!(!sched.in_task_context());
+                        sched.enqueue_task(task.take());
+                    }
+                }
+            };
+            sched.enqueue_task(task);
+            sched.run();
+        }
+    }
+
+    #[test]
+    fn test_io_callback() {
+        // This is a regression test: when there are no schedulable tasks in
+        // the work queue but we are still performing I/O, the scheduler must
+        // not exit early; once something is put back in the work queue it
+        // should be picked up and the queue emptied before exiting.
+        do run_in_newsched_task {
+            do spawn {
+                let sched = Local::take::<Scheduler>();
+                do sched.deschedule_running_task_and_then |task| {
+                    let mut sched = Local::take::<Scheduler>();
+                    let task = Cell(task);
+                    do sched.event_loop.callback_ms(10) {
+                        rtdebug!("in callback");
+                        let mut sched = Local::take::<Scheduler>();
+                        sched.enqueue_task(task.take());
+                        Local::put(sched);
+                    }
+                    Local::put(sched);
+                }
+            }
+        }
+    }
+}
diff --git a/src/libstd/rt/tube.rs b/src/libstd/rt/tube.rs
new file mode 100644
index 00000000000..b2f475a6966
--- /dev/null
+++ b/src/libstd/rt/tube.rs
@@ -0,0 +1,185 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! A very simple unsynchronized channel type for sending buffered data from
+//! scheduler context to task context.
+//!
+//! XXX: This would be safer to use if split into two types like Port/Chan
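+//!
+//! A minimal usage sketch, assuming some scheduler-context code has been
+//! arranged to perform the send (the tests below are runnable versions):
+//!
+//!     let mut tube: Tube<int> = Tube::new();
+//!     let mut tube_clone = tube.clone();
+//!     // ... have scheduler context call tube_clone.send(1) ...
+//!     assert!(tube.recv() == 1); // blocks this task until the value arrives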
+
+use option::*;
+use clone::Clone;
+use super::rc::RC;
+use rt::sched::{Scheduler, Coroutine};
+use rt::{context, TaskContext, SchedulerContext};
+use rt::local::Local;
+use vec::OwnedVector;
+use container::Container;
+
+struct TubeState<T> {
+    blocked_task: Option<~Coroutine>,
+    buf: ~[T]
+}
+
+pub struct Tube<T> {
+    p: RC<TubeState<T>>
+}
+
+impl<T> Tube<T> {
+    pub fn new() -> Tube<T> {
+        Tube {
+            p: RC::new(TubeState {
+                blocked_task: None,
+                buf: ~[]
+            })
+        }
+    }
+
+    pub fn send(&mut self, val: T) {
+        rtdebug!("tube send");
+        assert!(context() == SchedulerContext);
+
+        unsafe {
+            let state = self.p.unsafe_borrow_mut();
+            (*state).buf.push(val);
+
+            if (*state).blocked_task.is_some() {
+                // There's a waiting task. Wake it up
+                rtdebug!("waking blocked tube");
+                let task = (*state).blocked_task.swap_unwrap();
+                let sched = Local::take::<Scheduler>();
+                sched.resume_task_immediately(task);
+            }
+        }
+    }
+
+    pub fn recv(&mut self) -> T {
+        assert!(context() == TaskContext);
+
+        unsafe {
+            let state = self.p.unsafe_borrow_mut();
+            if !(*state).buf.is_empty() {
+                return (*state).buf.shift();
+            } else {
+                // Block and wait for the next message
+                rtdebug!("blocking on tube recv");
+                assert!(self.p.refcount() > 1); // There had better be somebody to wake us up
+                assert!((*state).blocked_task.is_none());
+                let sched = Local::take::<Scheduler>();
+                do sched.deschedule_running_task_and_then |task| {
+                    (*state).blocked_task = Some(task);
+                }
+                rtdebug!("waking after tube recv");
+                let buf = &mut (*state).buf;
+                assert!(!buf.is_empty());
+                return buf.shift();
+            }
+        }
+    }
+}
+
+impl<T> Clone for Tube<T> {
+    fn clone(&self) -> Tube<T> {
+        Tube { p: self.p.clone() }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use int;
+    use cell::Cell;
+    use rt::test::*;
+    use rt::rtio::EventLoop;
+    use rt::sched::Scheduler;
+    use rt::local::Local;
+    use super::*;
+
+    #[test]
+    fn simple_test() {
+        do run_in_newsched_task {
+            let mut tube: Tube<int> = Tube::new();
+            let tube_clone = tube.clone();
+            let tube_clone_cell = Cell(tube_clone);
+            let sched = Local::take::<Scheduler>();
+            do sched.deschedule_running_task_and_then |task| {
+                let mut tube_clone = tube_clone_cell.take();
+                tube_clone.send(1);
+                let sched = Local::take::<Scheduler>();
+                sched.resume_task_immediately(task);
+            }
+
+            assert!(tube.recv() == 1);
+        }
+    }
+
+    #[test]
+    fn blocking_test() {
+        do run_in_newsched_task {
+            let mut tube: Tube<int> = Tube::new();
+            let tube_clone = tube.clone();
+            let tube_clone = Cell(Cell(Cell(tube_clone)));
+            let sched = Local::take::<Scheduler>();
+            do sched.deschedule_running_task_and_then |task| {
+                let tube_clone = tube_clone.take();
+                do Local::borrow::<Scheduler> |sched| {
+                    let tube_clone = tube_clone.take();
+                    do sched.event_loop.callback {
+                        let mut tube_clone = tube_clone.take();
+                        // The task should be blocked on this now and
+                        // sending will wake it up.
+                        tube_clone.send(1);
+                    }
+                }
+                let sched = Local::take::<Scheduler>();
+                sched.resume_task_immediately(task);
+            }
+
+            assert!(tube.recv() == 1);
+        }
+    }
+
+    #[test]
+    fn many_blocking_test() {
+        static MAX: int = 100;
+
+        do run_in_newsched_task {
+            let mut tube: Tube<int> = Tube::new();
+            let tube_clone = tube.clone();
+            let tube_clone = Cell(tube_clone);
+            let sched = Local::take::<Scheduler>();
+            do sched.deschedule_running_task_and_then |task| {
+                callback_send(tube_clone.take(), 0);
+
+                fn callback_send(tube: Tube<int>, i: int) {
+                    if i == MAX { return; }
+
+                    let tube = Cell(Cell(tube));
+                    do Local::borrow::<Scheduler> |sched| {
+                        let tube = tube.take();
+                        do sched.event_loop.callback {
+                            let mut tube = tube.take();
+                            // The task should be blocked on this now and
+                            // sending will wake it up.
+                            tube.send(i);
+                            callback_send(tube, i + 1);
+                        }
+                    }
+                }
+
+                let sched = Local::take::<Scheduler>();
+                sched.resume_task_immediately(task);
+            }
+
+            for int::range(0, MAX) |i| {
+                let j = tube.recv();
+                assert!(j == i);
+            }
+        }
+    }
+}
diff --git a/src/libstd/rt/uv/idle.rs b/src/libstd/rt/uv/idle.rs
new file mode 100644
index 00000000000..2cf0b5c4872
--- /dev/null
+++ b/src/libstd/rt/uv/idle.rs
@@ -0,0 +1,91 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use libc::c_int;
+use option::Some;
+use rt::uv::uvll;
+use rt::uv::{Watcher, Loop, NativeHandle, IdleCallback, NullCallback};
+use rt::uv::status_to_maybe_uv_error;
+
+pub struct IdleWatcher(*uvll::uv_idle_t);
+impl Watcher for IdleWatcher { }
+
+pub impl IdleWatcher {
+    fn new(loop_: &mut Loop) -> IdleWatcher {
+        unsafe {
+            let handle = uvll::idle_new();
+            assert!(handle.is_not_null());
+            assert!(0 == uvll::idle_init(loop_.native_handle(), handle));
+            let mut watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
+            watcher.install_watcher_data();
+            return watcher
+        }
+    }
+
+    fn start(&mut self, cb: IdleCallback) {
+        {
+            let data = self.get_watcher_data();
+            data.idle_cb = Some(cb);
+        }
+
+        unsafe {
+            assert!(0 == uvll::idle_start(self.native_handle(), idle_cb))
+        };
+
+        extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) {
+            let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
+            let data = idle_watcher.get_watcher_data();
+            let cb: &IdleCallback = data.idle_cb.get_ref();
+            let status = status_to_maybe_uv_error(handle, status);
+            (*cb)(idle_watcher, status);
+        }
+    }
+
+    fn stop(&mut self) {
+        // NB: Not resetting the Rust idle_cb to None here, because `stop` is
+        // likely called from *within* the idle callback; clearing the callback
+        // while it is executing would cause a use-after-free
+
+        unsafe {
+            assert!(0 == uvll::idle_stop(self.native_handle()));
+        }
+    }
+
+    fn close(self, cb: NullCallback) {
+        {
+            let mut this = self;
+            let data = this.get_watcher_data();
+            assert!(data.close_cb.is_none());
+            data.close_cb = Some(cb);
+        }
+
+        unsafe { uvll::close(self.native_handle(), close_cb) };
+
+        extern fn close_cb(handle: *uvll::uv_idle_t) {
+            unsafe {
+                let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
+                {
+                    let data = idle_watcher.get_watcher_data();
+                    data.close_cb.swap_unwrap()();
+                }
+                idle_watcher.drop_watcher_data();
+                uvll::idle_delete(handle);
+            }
+        }
+    }
+}
+
+impl NativeHandle<*uvll::uv_idle_t> for IdleWatcher {
+    fn from_native_handle(handle: *uvll::uv_idle_t) -> IdleWatcher {
+        IdleWatcher(handle)
+    }
+    fn native_handle(&self) -> *uvll::uv_idle_t {
+        match self { &IdleWatcher(ptr) => ptr }
+    }
+}
diff --git a/src/libstd/rt/uv/timer.rs b/src/libstd/rt/uv/timer.rs
new file mode 100644
index 00000000000..5557a580987
--- /dev/null
+++ b/src/libstd/rt/uv/timer.rs
@@ -0,0 +1,183 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use libc::{c_void, c_int};
+use option::Some;
+use rt::uv::uvll;
+use rt::uv::{Watcher, Loop, NativeHandle, TimerCallback, NullCallback};
+use rt::uv::status_to_maybe_uv_error;
+
+pub struct TimerWatcher(*uvll::uv_timer_t);
+impl Watcher for TimerWatcher { }
+
+impl TimerWatcher {
+    pub fn new(loop_: &mut Loop) -> TimerWatcher {
+        unsafe {
+            let handle = uvll::malloc_handle(uvll::UV_TIMER);
+            assert!(handle.is_not_null());
+            assert!(0 == uvll::timer_init(loop_.native_handle(), handle));
+            let mut watcher: TimerWatcher = NativeHandle::from_native_handle(handle);
+            watcher.install_watcher_data();
+            return watcher;
+        }
+    }
+
+    pub fn start(&mut self, timeout: u64, repeat: u64, cb: TimerCallback) {
+        {
+            let data = self.get_watcher_data();
+            data.timer_cb = Some(cb);
+        }
+
+        unsafe {
+            uvll::timer_start(self.native_handle(), timer_cb, timeout, repeat);
+        }
+
+        extern fn timer_cb(handle: *uvll::uv_timer_t, status: c_int) {
+            let mut watcher: TimerWatcher = NativeHandle::from_native_handle(handle);
+            let data = watcher.get_watcher_data();
+            let cb = data.timer_cb.get_ref();
+            let status = status_to_maybe_uv_error(handle, status);
+            (*cb)(watcher, status);
+        }
+    }
+
+    pub fn stop(&mut self) {
+        unsafe {
+            uvll::timer_stop(self.native_handle());
+        }
+    }
+
+    pub fn close(self, cb: NullCallback) {
+        let mut watcher = self;
+        {
+            let data = watcher.get_watcher_data();
+            assert!(data.close_cb.is_none());
+            data.close_cb = Some(cb);
+        }
+
+        unsafe {
+            uvll::close(watcher.native_handle(), close_cb);
+        }
+
+        extern fn close_cb(handle: *uvll::uv_timer_t) {
+            let mut watcher: TimerWatcher = NativeHandle::from_native_handle(handle);
+            {
+                let data = watcher.get_watcher_data();
+                data.close_cb.swap_unwrap()();
+            }
+            watcher.drop_watcher_data();
+            unsafe {
+                uvll::free_handle(handle as *c_void);
+            }
+        }
+    }
+}
+
+impl NativeHandle<*uvll::uv_timer_t> for TimerWatcher {
+    fn from_native_handle(handle: *uvll::uv_timer_t) -> TimerWatcher {
+        TimerWatcher(handle)
+    }
+    fn native_handle(&self) -> *uvll::uv_timer_t {
+        match self { &TimerWatcher(ptr) => ptr }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use rt::uv::Loop;
+    use unstable::run_in_bare_thread;
+
+    #[test]
+    fn smoke_test() {
+        do run_in_bare_thread {
+            let mut count = 0;
+            let count_ptr: *mut int = &mut count;
+            let mut loop_ = Loop::new();
+            let mut timer = TimerWatcher::new(&mut loop_);
+            do timer.start(10, 0) |timer, status| {
+                assert!(status.is_none());
+                unsafe { *count_ptr += 1 };
+                timer.close(||());
+            }
+            loop_.run();
+            loop_.close();
+            assert!(count == 1);
+        }
+    }
+
+    #[test]
+    fn start_twice() {
+        do run_in_bare_thread {
+            let mut count = 0;
+            let count_ptr: *mut int = &mut count;
+            let mut loop_ = Loop::new();
+            let mut timer = TimerWatcher::new(&mut loop_);
+            do timer.start(10, 0) |timer, status| {
+                let mut timer = timer;
+                assert!(status.is_none());
+                unsafe { *count_ptr += 1 };
+                do timer.start(10, 0) |timer, status| {
+                    assert!(status.is_none());
+                    unsafe { *count_ptr += 1 };
+                    timer.close(||());
+                }
+            }
+            loop_.run();
+            loop_.close();
+            assert!(count == 2);
+        }
+    }
+
+    #[test]
+    fn repeat_stop() {
+        do run_in_bare_thread {
+            let mut count = 0;
+            let count_ptr: *mut int = &mut count;
+            let mut loop_ = Loop::new();
+            let mut timer = TimerWatcher::new(&mut loop_);
+            do timer.start(10, 20) |timer, status| {
+                assert!(status.is_none());
+                unsafe {
+                    *count_ptr += 1;
+
+                    if *count_ptr == 10 {
+
+                        // Stop the timer and do something else
+                        let mut timer = timer;
+                        timer.stop();
+                        // Freeze timer so it can be captured
+                        let timer = timer;
+
+                        let mut loop_ = timer.event_loop();
+                        let mut timer2 = TimerWatcher::new(&mut loop_);
+                        do timer2.start(10, 0) |timer2, _| {
+
+                            unsafe { *count_ptr += 1; }
+
+                            timer2.close(||());
+
+                            // Restart the original timer
+                            let mut timer = timer;
+                            do timer.start(10, 0) |timer, _| {
+                                unsafe { *count_ptr += 1; }
+                                timer.close(||());
+                            }
+                        }
+                    }
+                };
+            }
+            loop_.run();
+            loop_.close();
+            assert!(count == 12);
+        }
+    }
+
+}
diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs
new file mode 100644
index 00000000000..cacd67314eb
--- /dev/null
+++ b/src/libstd/rt/uv/uvio.rs
@@ -0,0 +1,492 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use option::*;
+use result::*;
+use ops::Drop;
+use cell::{Cell, empty_cell};
+use cast::transmute;
+use clone::Clone;
+use rt::io::IoError;
+use rt::io::net::ip::IpAddr;
+use rt::uv::*;
+use rt::uv::idle::IdleWatcher;
+use rt::rtio::*;
+use rt::sched::Scheduler;
+use rt::io::{standard_error, OtherIoError};
+use rt::tube::Tube;
+use rt::local::Local;
+
+#[cfg(test)] use container::Container;
+#[cfg(test)] use uint;
+#[cfg(test)] use unstable::run_in_bare_thread;
+#[cfg(test)] use rt::test::*;
+
+pub struct UvEventLoop {
+    uvio: UvIoFactory
+}
+
+pub impl UvEventLoop {
+    fn new() -> UvEventLoop {
+        UvEventLoop {
+            uvio: UvIoFactory(Loop::new())
+        }
+    }
+
+    /// A convenience constructor
+    fn new_scheduler() -> Scheduler {
+        Scheduler::new(~UvEventLoop::new())
+    }
+}
+
+impl Drop for UvEventLoop {
+    fn finalize(&self) {
+        // XXX: Need mutable finalizer
+        let this = unsafe {
+            transmute::<&UvEventLoop, &mut UvEventLoop>(self)
+        };
+        this.uvio.uv_loop().close();
+    }
+}
+
+impl EventLoop for UvEventLoop {
+
+    fn run(&mut self) {
+        self.uvio.uv_loop().run();
+    }
+
+    fn callback(&mut self, f: ~fn()) {
+        let mut idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
+        do idle_watcher.start |idle_watcher, status| {
+            assert!(status.is_none());
+            let mut idle_watcher = idle_watcher;
+            idle_watcher.stop();
+            idle_watcher.close(||());
+            f();
+        }
+    }
+
+    fn callback_ms(&mut self, ms: u64, f: ~fn()) {
+        let mut timer = TimerWatcher::new(self.uvio.uv_loop());
+        do timer.start(ms, 0) |timer, status| {
+            assert!(status.is_none());
+            timer.close(||());
+            f();
+        }
+    }
+
+    fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> {
+        Some(&mut self.uvio)
+    }
+}
+
+#[test]
+fn test_callback_run_once() {
+    do run_in_bare_thread {
+        let mut event_loop = UvEventLoop::new();
+        let mut count = 0;
+        let count_ptr: *mut int = &mut count;
+        do event_loop.callback {
+            unsafe { *count_ptr += 1 }
+        }
+        event_loop.run();
+        assert_eq!(count, 1);
+    }
+}
+
+pub struct UvIoFactory(Loop);
+
+pub impl UvIoFactory {
+    fn uv_loop<'a>(&'a mut self) -> &'a mut Loop {
+        match self { &UvIoFactory(ref mut ptr) => ptr }
+    }
+}
+
+impl IoFactory for UvIoFactory {
+    // Connect to an address and return a new stream
+    // NB: This blocks the task waiting on the connection.
+    // It would probably be better to return a future
+    fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError> {
+        // Create a cell in the task to hold the result. We will fill
+        // the cell before resuming the task.
+        let result_cell = empty_cell();
+        let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
+
+        let scheduler = Local::take::<Scheduler>();
+        assert!(scheduler.in_task_context());
+
+        // Block this task and take ownership, switch to scheduler context
+        do scheduler.deschedule_running_task_and_then |task| {
+
+            rtdebug!("connect: entered scheduler context");
+            do Local::borrow::<Scheduler> |scheduler| {
+                assert!(!scheduler.in_task_context());
+            }
+            let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
+            let task_cell = Cell(task);
+
+            // Wait for a connection
+            do tcp_watcher.connect(addr) |stream_watcher, status| {
+                rtdebug!("connect: in connect callback");
+                if status.is_none() {
+                    rtdebug!("status is none");
+                    let res = Ok(~UvTcpStream { watcher: stream_watcher });
+
+                    // Store the stream in the task's stack
+                    unsafe { (*result_cell_ptr).put_back(res); }
+
+                    // Context switch
+                    let scheduler = Local::take::<Scheduler>();
+                    scheduler.resume_task_immediately(task_cell.take());
+                } else {
+                    rtdebug!("status is some");
+                    let task_cell = Cell(task_cell.take());
+                    do stream_watcher.close {
+                        let res = Err(uv_error_to_io_error(status.get()));
+                        unsafe { (*result_cell_ptr).put_back(res); }
+                        let scheduler = Local::take::<Scheduler>();
+                        scheduler.resume_task_immediately(task_cell.take());
+                    }
+                };
+            }
+        }
+
+        assert!(!result_cell.is_empty());
+        return result_cell.take();
+    }
+
+    fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError> {
+        let mut watcher = TcpWatcher::new(self.uv_loop());
+        match watcher.bind(addr) {
+            Ok(_) => Ok(~UvTcpListener::new(watcher)),
+            Err(uverr) => {
+                let scheduler = Local::take::<Scheduler>();
+                do scheduler.deschedule_running_task_and_then |task| {
+                    let task_cell = Cell(task);
+                    do watcher.as_stream().close {
+                        let scheduler = Local::take::<Scheduler>();
+                        scheduler.resume_task_immediately(task_cell.take());
+                    }
+                }
+                Err(uv_error_to_io_error(uverr))
+            }
+        }
+    }
+}
+
+// FIXME #6090: Prefer newtype structs but Drop doesn't work
+pub struct UvTcpListener {
+    watcher: TcpWatcher,
+    listening: bool,
+    incoming_streams: Tube<Result<~RtioTcpStreamObject, IoError>>
+}
+
+impl UvTcpListener {
+    fn new(watcher: TcpWatcher) -> UvTcpListener {
+        UvTcpListener {
+            watcher: watcher,
+            listening: false,
+            incoming_streams: Tube::new()
+        }
+    }
+
+    fn watcher(&self) -> TcpWatcher { self.watcher }
+}
+
+impl Drop for UvTcpListener {
+    fn finalize(&self) {
+        let watcher = self.watcher();
+        let scheduler = Local::take::<Scheduler>();
+        do scheduler.deschedule_running_task_and_then |task| {
+            let task_cell = Cell(task);
+            do watcher.as_stream().close {
+                let scheduler = Local::take::<Scheduler>();
+                scheduler.resume_task_immediately(task_cell.take());
+            }
+        }
+    }
+}
+
+impl RtioTcpListener for UvTcpListener {
+
+    fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
+        rtdebug!("entering listen");
+
+        if self.listening {
+            return self.incoming_streams.recv();
+        }
+
+        self.listening = true;
+
+        let server_tcp_watcher = self.watcher();
+        let incoming_streams_cell = Cell(self.incoming_streams.clone());
+
+        let incoming_streams_cell = Cell(incoming_streams_cell.take());
+        let mut server_tcp_watcher = server_tcp_watcher;
+        do server_tcp_watcher.listen |server_stream_watcher, status| {
+            let maybe_stream = if status.is_none() {
+                let mut server_stream_watcher = server_stream_watcher;
+                let mut loop_ = server_stream_watcher.event_loop();
+                let client_tcp_watcher = TcpWatcher::new(&mut loop_);
+                let client_tcp_watcher = client_tcp_watcher.as_stream();
+                // XXX: Needs to be surfaced in the interface
+                server_stream_watcher.accept(client_tcp_watcher);
+                Ok(~UvTcpStream { watcher: client_tcp_watcher })
+            } else {
+                Err(standard_error(OtherIoError))
+            };
+
+            let mut incoming_streams = incoming_streams_cell.take();
+            incoming_streams.send(maybe_stream);
+            incoming_streams_cell.put_back(incoming_streams);
+        }
+
+        return self.incoming_streams.recv();
+    }
+}
+
+// FIXME #6090: Prefer newtype structs but Drop doesn't work
+pub struct UvTcpStream {
+    watcher: StreamWatcher
+}
+
+impl UvTcpStream {
+    fn watcher(&self) -> StreamWatcher { self.watcher }
+}
+
+impl Drop for UvTcpStream {
+    fn finalize(&self) {
+        rtdebug!("closing tcp stream");
+        let watcher = self.watcher();
+        let scheduler = Local::take::<Scheduler>();
+        do scheduler.deschedule_running_task_and_then |task| {
+            let task_cell = Cell(task);
+            do watcher.close {
+                let scheduler = Local::take::<Scheduler>();
+                scheduler.resume_task_immediately(task_cell.take());
+            }
+        }
+    }
+}
+
+impl RtioTcpStream for UvTcpStream {
+    fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
+        let result_cell = empty_cell();
+        let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
+
+        let scheduler = Local::take::<Scheduler>();
+        assert!(scheduler.in_task_context());
+        let watcher = self.watcher();
+        let buf_ptr: *&mut [u8] = &buf;
+        do scheduler.deschedule_running_task_and_then |task| {
+            rtdebug!("read: entered scheduler context");
+            do Local::borrow::<Scheduler> |scheduler| {
+                assert!(!scheduler.in_task_context());
+            }
+            let mut watcher = watcher;
+            let task_cell = Cell(task);
+            // XXX: We shouldn't reallocate these callbacks every
+            // call to read
+            let alloc: AllocCallback = |_| unsafe {
+                slice_to_uv_buf(*buf_ptr)
+            };
+            do watcher.read_start(alloc) |watcher, nread, _buf, status| {
+
+                // Stop reading so that no read callbacks are
+                // triggered before the user calls `read` again.
+                // XXX: Is there a performance impact to calling
+                // stop here?
+                let mut watcher = watcher;
+                watcher.read_stop();
+
+                let result = if status.is_none() {
+                    assert!(nread >= 0);
+                    Ok(nread as uint)
+                } else {
+                    Err(uv_error_to_io_error(status.unwrap()))
+                };
+
+                unsafe { (*result_cell_ptr).put_back(result); }
+
+                let scheduler = Local::take::<Scheduler>();
+                scheduler.resume_task_immediately(task_cell.take());
+            }
+        }
+
+        assert!(!result_cell.is_empty());
+        return result_cell.take();
+    }
+
+    fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
+        let result_cell = empty_cell();
+        let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
+        let scheduler = Local::take::<Scheduler>();
+        assert!(scheduler.in_task_context());
+        let watcher = self.watcher();
+        let buf_ptr: *&[u8] = &buf;
+        do scheduler.deschedule_running_task_and_then |task| {
+            let mut watcher = watcher;
+            let task_cell = Cell(task);
+            let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
+            do watcher.write(buf) |_watcher, status| {
+                let result = if status.is_none() {
+                    Ok(())
+                } else {
+                    Err(uv_error_to_io_error(status.unwrap()))
+                };
+
+                unsafe { (*result_cell_ptr).put_back(result); }
+
+                let scheduler = Local::take::<Scheduler>();
+                scheduler.resume_task_immediately(task_cell.take());
+            }
+        }
+
+        assert!(!result_cell.is_empty());
+        return result_cell.take();
+    }
+}
+
+#[test]
+fn test_simple_io_no_connect() {
+    do run_in_newsched_task {
+        unsafe {
+            let io = Local::unsafe_borrow::<IoFactoryObject>();
+            let addr = next_test_ip4();
+            let maybe_chan = (*io).tcp_connect(addr);
+            assert!(maybe_chan.is_err());
+        }
+    }
+}
+
+#[test]
+fn test_simple_tcp_server_and_client() {
+    do run_in_newsched_task {
+        let addr = next_test_ip4();
+
+        // Start the server first so it's listening when we connect
+        do spawntask_immediately {
+            unsafe {
+                let io = Local::unsafe_borrow::<IoFactoryObject>();
+                let mut listener = (*io).tcp_bind(addr).unwrap();
+                let mut stream = listener.accept().unwrap();
+                let mut buf = [0, .. 2048];
+                let nread = stream.read(buf).unwrap();
+                assert_eq!(nread, 8);
+                for uint::range(0, nread) |i| {
+                    rtdebug!("%u", buf[i] as uint);
+                    assert_eq!(buf[i], i as u8);
+                }
+            }
+        }
+
+        do spawntask_immediately {
+            unsafe {
+                let io = Local::unsafe_borrow::<IoFactoryObject>();
+                let mut stream = (*io).tcp_connect(addr).unwrap();
+                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+            }
+        }
+    }
+}
+
+#[test] #[ignore(reason = "busted")]
+fn test_read_and_block() {
+    do run_in_newsched_task {
+        let addr = next_test_ip4();
+
+        do spawntask_immediately {
+            let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
+            let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() };
+            let mut stream = listener.accept().unwrap();
+            let mut buf = [0, .. 2048];
+
+            let expected = 32;
+            let mut current = 0;
+            let mut reads = 0;
+
+            while current < expected {
+                let nread = stream.read(buf).unwrap();
+                for uint::range(0, nread) |i| {
+                    let val = buf[i] as uint;
+                    assert_eq!(val, current % 8);
+                    current += 1;
+                }
+                reads += 1;
+
+                let scheduler = Local::take::<Scheduler>();
+                // Yield to the other task in hopes that it
+                // will trigger a read callback while we are
+                // not ready for it
+                do scheduler.deschedule_running_task_and_then |task| {
+                    let task = Cell(task);
+                    do Local::borrow::<Scheduler> |scheduler| {
+                        scheduler.enqueue_task(task.take());
+                    }
+                }
+            }
+
+            // Make sure we had multiple reads
+            assert!(reads > 1);
+        }
+
+        do spawntask_immediately {
+            unsafe {
+                let io = Local::unsafe_borrow::<IoFactoryObject>();
+                let mut stream = (*io).tcp_connect(addr).unwrap();
+                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+            }
+        }
+
+    }
+}
+
+#[test]
+fn test_read_read_read() {
+    do run_in_newsched_task {
+        let addr = next_test_ip4();
+        static MAX: uint = 500000;
+
+        do spawntask_immediately {
+            unsafe {
+                let io = Local::unsafe_borrow::<IoFactoryObject>();
+                let mut listener = (*io).tcp_bind(addr).unwrap();
+                let mut stream = listener.accept().unwrap();
+                let buf = [1, .. 2048];
+                let mut total_bytes_written = 0;
+                while total_bytes_written < MAX {
+                    stream.write(buf);
+                    total_bytes_written += buf.len();
+                }
+            }
+        }
+
+        do spawntask_immediately {
+            unsafe {
+                let io = Local::unsafe_borrow::<IoFactoryObject>();
+                let mut stream = (*io).tcp_connect(addr).unwrap();
+                let mut buf = [0, .. 2048];
+                let mut total_bytes_read = 0;
+                while total_bytes_read < MAX {
+                    let nread = stream.read(buf).unwrap();
+                    rtdebug!("read %u bytes", nread as uint);
+                    total_bytes_read += nread;
+                    for uint::range(0, nread) |i| {
+                        assert_eq!(buf[i], 1);
+                    }
+                }
+                rtdebug!("read %u bytes total", total_bytes_read as uint);
+            }
+        }
+    }
+}
diff --git a/src/libstd/rt/uv/uvll.rs b/src/libstd/rt/uv/uvll.rs
new file mode 100644
index 00000000000..ddc9040d730
--- /dev/null
+++ b/src/libstd/rt/uv/uvll.rs
@@ -0,0 +1,452 @@
+// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+/*!
+ * Low-level bindings to the libuv library.
+ *
+ * This module contains a set of direct, 'bare-metal' wrappers around
+ * the libuv C-API.
+ *
+ * We're not bothering yet to redefine uv's structs as Rust structs
+ * because they are quite large and change often between versions.
+ * The maintenance burden is just too high. Instead we use uv's
+ * `uv_handle_size` and `uv_req_size` to find the correct size of the
+ * structs and allocate them on the heap. This can be revisited later.
+ *
+ * There is also a collection of helper functions to ease interacting
+ * with the low-level API.
+ *
+ * As new functionality from uv.h is added to the Rust stdlib, the
+ * corresponding mappings should be added to this module.
+ */
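+
+// A minimal sketch, in terms of the wrappers below, of the allocate/init/free
+// pattern described above (hypothetical caller; the real watchers in rt::uv
+// follow this shape, and must close() a handle before freeing it):
+//
+//     unsafe {
+//         let loop_handle = loop_new();
+//         let timer_handle = malloc_handle(UV_TIMER);
+//         assert!(0 == timer_init(loop_handle, timer_handle));
+//         // ... timer_start / run(loop_handle) / timer_stop ...
+//         free_handle(timer_handle);
+//         loop_delete(loop_handle);
+//     }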
+
+#[allow(non_camel_case_types)]; // C types
+
+use libc::{size_t, c_int, c_uint, c_void, c_char, uintptr_t};
+use libc::{malloc, free};
+use prelude::*;
+
+pub static UNKNOWN: c_int = -1;
+pub static OK: c_int = 0;
+pub static EOF: c_int = 1;
+pub static EADDRINFO: c_int = 2;
+pub static EACCES: c_int = 3;
+pub static ECONNREFUSED: c_int = 12;
+pub static ECONNRESET: c_int = 13;
+pub static EPIPE: c_int = 36;
+
+pub struct uv_err_t {
+    code: c_int,
+    sys_errno_: c_int
+}
+
+pub struct uv_buf_t {
+    base: *u8,
+    len: libc::size_t,
+}
+
+pub type uv_handle_t = c_void;
+pub type uv_loop_t = c_void;
+pub type uv_idle_t = c_void;
+pub type uv_tcp_t = c_void;
+pub type uv_connect_t = c_void;
+pub type uv_write_t = c_void;
+pub type uv_async_t = c_void;
+pub type uv_timer_t = c_void;
+pub type uv_stream_t = c_void;
+pub type uv_fs_t = c_void;
+
+pub type uv_idle_cb = *u8;
+
+pub type sockaddr_in = c_void;
+pub type sockaddr_in6 = c_void;
+
+#[deriving(Eq)]
+pub enum uv_handle_type {
+    UV_UNKNOWN_HANDLE,
+    UV_ASYNC,
+    UV_CHECK,
+    UV_FS_EVENT,
+    UV_FS_POLL,
+    UV_HANDLE,
+    UV_IDLE,
+    UV_NAMED_PIPE,
+    UV_POLL,
+    UV_PREPARE,
+    UV_PROCESS,
+    UV_STREAM,
+    UV_TCP,
+    UV_TIMER,
+    UV_TTY,
+    UV_UDP,
+    UV_SIGNAL,
+    UV_FILE,
+    UV_HANDLE_TYPE_MAX
+}
+
+#[deriving(Eq)]
+pub enum uv_req_type {
+    UV_UNKNOWN_REQ,
+    UV_REQ,
+    UV_CONNECT,
+    UV_WRITE,
+    UV_SHUTDOWN,
+    UV_UDP_SEND,
+    UV_FS,
+    UV_WORK,
+    UV_GETADDRINFO,
+    UV_REQ_TYPE_MAX
+}
+
+pub unsafe fn malloc_handle(handle: uv_handle_type) -> *c_void {
+    assert!(handle != UV_UNKNOWN_HANDLE && handle != UV_HANDLE_TYPE_MAX);
+    let size = rust_uv_handle_size(handle as uint);
+    let p = malloc(size);
+    assert!(p.is_not_null());
+    return p;
+}
+
+pub unsafe fn free_handle(v: *c_void) {
+    free(v)
+}
+
+pub unsafe fn malloc_req(req: uv_req_type) -> *c_void {
+    assert!(req != UV_UNKNOWN_REQ && req != UV_REQ_TYPE_MAX);
+    let size = rust_uv_req_size(req as uint);
+    let p = malloc(size);
+    assert!(p.is_not_null());
+    return p;
+}
+
+pub unsafe fn free_req(v: *c_void) {
+    free(v)
+}
+
+#[test]
+fn handle_sanity_check() {
+    unsafe {
+        assert_eq!(UV_HANDLE_TYPE_MAX as uint, rust_uv_handle_type_max());
+    }
+}
+
+#[test]
+fn request_sanity_check() {
+    unsafe {
+        assert_eq!(UV_REQ_TYPE_MAX as uint, rust_uv_req_type_max());
+    }
+}
+
+pub unsafe fn loop_new() -> *c_void {
+    return rust_uv_loop_new();
+}
+
+pub unsafe fn loop_delete(loop_handle: *c_void) {
+    rust_uv_loop_delete(loop_handle);
+}
+
+pub unsafe fn run(loop_handle: *c_void) {
+    rust_uv_run(loop_handle);
+}
+
+pub unsafe fn close<T>(handle: *T, cb: *u8) {
+    rust_uv_close(handle as *c_void, cb);
+}
+
+pub unsafe fn walk(loop_handle: *c_void, cb: *u8, arg: *c_void) {
+    rust_uv_walk(loop_handle, cb, arg);
+}
+
+pub unsafe fn idle_new() -> *uv_idle_t {
+    rust_uv_idle_new()
+}
+
+pub unsafe fn idle_delete(handle: *uv_idle_t) {
+    rust_uv_idle_delete(handle)
+}
+
+pub unsafe fn idle_init(loop_handle: *uv_loop_t, handle: *uv_idle_t) -> c_int {
+    rust_uv_idle_init(loop_handle, handle)
+}
+
+pub unsafe fn idle_start(handle: *uv_idle_t, cb: uv_idle_cb) -> c_int {
+    rust_uv_idle_start(handle, cb)
+}
+
+pub unsafe fn idle_stop(handle: *uv_idle_t) -> c_int {
+    rust_uv_idle_stop(handle)
+}
+
+pub unsafe fn tcp_init(loop_handle: *c_void, handle: *uv_tcp_t) -> c_int {
+    return rust_uv_tcp_init(loop_handle, handle);
+}
+
+// FIXME ref #2064
+pub unsafe fn tcp_connect(connect_ptr: *uv_connect_t,
+                          tcp_handle_ptr: *uv_tcp_t,
+                          addr_ptr: *sockaddr_in,
+                          after_connect_cb: *u8) -> c_int {
+    return rust_uv_tcp_connect(connect_ptr, tcp_handle_ptr,
+                                       after_connect_cb, addr_ptr);
+}
+// FIXME ref #2064
+pub unsafe fn tcp_connect6(connect_ptr: *uv_connect_t,
+                           tcp_handle_ptr: *uv_tcp_t,
+                           addr_ptr: *sockaddr_in6,
+                           after_connect_cb: *u8) -> c_int {
+    return rust_uv_tcp_connect6(connect_ptr, tcp_handle_ptr,
+                                        after_connect_cb, addr_ptr);
+}
+// FIXME ref #2064
+pub unsafe fn tcp_bind(tcp_server_ptr: *uv_tcp_t, addr_ptr: *sockaddr_in) -> c_int {
+    return rust_uv_tcp_bind(tcp_server_ptr, addr_ptr);
+}
+// FIXME ref #2064
+pub unsafe fn tcp_bind6(tcp_server_ptr: *uv_tcp_t, addr_ptr: *sockaddr_in6) -> c_int {
+    return rust_uv_tcp_bind6(tcp_server_ptr, addr_ptr);
+}
+
+pub unsafe fn tcp_getpeername(tcp_handle_ptr: *uv_tcp_t, name: *sockaddr_in) -> c_int {
+    return rust_uv_tcp_getpeername(tcp_handle_ptr, name);
+}
+
+pub unsafe fn tcp_getpeername6(tcp_handle_ptr: *uv_tcp_t, name: *sockaddr_in6) ->c_int {
+    return rust_uv_tcp_getpeername6(tcp_handle_ptr, name);
+}
+
+pub unsafe fn listen<T>(stream: *T, backlog: c_int, cb: *u8) -> c_int {
+    return rust_uv_listen(stream as *c_void, backlog, cb);
+}
+
+pub unsafe fn accept(server: *c_void, client: *c_void) -> c_int {
+    return rust_uv_accept(server as *c_void, client as *c_void);
+}
+
+pub unsafe fn write<T>(req: *uv_write_t, stream: *T, buf_in: &[uv_buf_t], cb: *u8) -> c_int {
+    let buf_ptr = vec::raw::to_ptr(buf_in);
+    let buf_cnt = buf_in.len() as i32;
+    return rust_uv_write(req as *c_void, stream as *c_void, buf_ptr, buf_cnt, cb);
+}
+pub unsafe fn read_start(stream: *uv_stream_t, on_alloc: *u8, on_read: *u8) -> c_int {
+    return rust_uv_read_start(stream as *c_void, on_alloc, on_read);
+}
+
+pub unsafe fn read_stop(stream: *uv_stream_t) -> c_int {
+    return rust_uv_read_stop(stream as *c_void);
+}
+
+pub unsafe fn last_error(loop_handle: *c_void) -> uv_err_t {
+    return rust_uv_last_error(loop_handle);
+}
+
+pub unsafe fn strerror(err: *uv_err_t) -> *c_char {
+    return rust_uv_strerror(err);
+}
+pub unsafe fn err_name(err: *uv_err_t) -> *c_char {
+    return rust_uv_err_name(err);
+}
+
+pub unsafe fn async_init(loop_handle: *c_void, async_handle: *uv_async_t, cb: *u8) -> c_int {
+    return rust_uv_async_init(loop_handle, async_handle, cb);
+}
+
+pub unsafe fn async_send(async_handle: *uv_async_t) {
+    return rust_uv_async_send(async_handle);
+}
+pub unsafe fn buf_init(input: *u8, len: uint) -> uv_buf_t {
+    let out_buf = uv_buf_t { base: ptr::null(), len: 0 as size_t };
+    let out_buf_ptr = ptr::to_unsafe_ptr(&out_buf);
+    rust_uv_buf_init(out_buf_ptr, input, len as size_t);
+    return out_buf;
+}
+
+pub unsafe fn timer_init(loop_ptr: *c_void, timer_ptr: *uv_timer_t) -> c_int {
+    return rust_uv_timer_init(loop_ptr, timer_ptr);
+}
+pub unsafe fn timer_start(timer_ptr: *uv_timer_t, cb: *u8, timeout: u64,
+                          repeat: u64) -> c_int {
+    return rust_uv_timer_start(timer_ptr, cb, timeout, repeat);
+}
+pub unsafe fn timer_stop(timer_ptr: *uv_timer_t) -> c_int {
+    return rust_uv_timer_stop(timer_ptr);
+}
+
+pub unsafe fn malloc_ip4_addr(ip: &str, port: int) -> *sockaddr_in {
+    do str::as_c_str(ip) |ip_buf| {
+        rust_uv_ip4_addrp(ip_buf as *u8, port as libc::c_int)
+    }
+}
+pub unsafe fn malloc_ip6_addr(ip: &str, port: int) -> *sockaddr_in6 {
+    do str::as_c_str(ip) |ip_buf| {
+        rust_uv_ip6_addrp(ip_buf as *u8, port as libc::c_int)
+    }
+}
+
+pub unsafe fn free_ip4_addr(addr: *sockaddr_in) {
+    rust_uv_free_ip4_addr(addr);
+}
+
+pub unsafe fn free_ip6_addr(addr: *sockaddr_in6) {
+    rust_uv_free_ip6_addr(addr);
+}
+
+// data access helpers
+pub unsafe fn get_loop_for_uv_handle<T>(handle: *T) -> *c_void {
+    return rust_uv_get_loop_for_uv_handle(handle as *c_void);
+}
+pub unsafe fn get_stream_handle_from_connect_req(connect: *uv_connect_t) -> *uv_stream_t {
+    return rust_uv_get_stream_handle_from_connect_req(connect);
+}
+pub unsafe fn get_stream_handle_from_write_req(write_req: *uv_write_t) -> *uv_stream_t {
+    return rust_uv_get_stream_handle_from_write_req(write_req);
+}
+pub unsafe fn get_data_for_uv_loop(loop_ptr: *c_void) -> *c_void {
+    rust_uv_get_data_for_uv_loop(loop_ptr)
+}
+pub unsafe fn set_data_for_uv_loop(loop_ptr: *c_void, data: *c_void) {
+    rust_uv_set_data_for_uv_loop(loop_ptr, data);
+}
+pub unsafe fn get_data_for_uv_handle<T>(handle: *T) -> *c_void {
+    return rust_uv_get_data_for_uv_handle(handle as *c_void);
+}
+pub unsafe fn set_data_for_uv_handle<T, U>(handle: *T, data: *U) {
+    rust_uv_set_data_for_uv_handle(handle as *c_void, data as *c_void);
+}
+pub unsafe fn get_data_for_req<T>(req: *T) -> *c_void {
+    return rust_uv_get_data_for_req(req as *c_void);
+}
+pub unsafe fn set_data_for_req<T, U>(req: *T, data: *U) {
+    rust_uv_set_data_for_req(req as *c_void, data as *c_void);
+}
+pub unsafe fn get_base_from_buf(buf: uv_buf_t) -> *u8 {
+    return rust_uv_get_base_from_buf(buf);
+}
+pub unsafe fn get_len_from_buf(buf: uv_buf_t) -> size_t {
+    return rust_uv_get_len_from_buf(buf);
+}
+pub unsafe fn malloc_buf_base_of(suggested_size: size_t) -> *u8 {
+    return rust_uv_malloc_buf_base_of(suggested_size);
+}
+pub unsafe fn free_base_of_buf(buf: uv_buf_t) {
+    rust_uv_free_base_of_buf(buf);
+}
+
+pub unsafe fn get_last_err_info(uv_loop: *c_void) -> ~str {
+    let err = last_error(uv_loop);
+    let err_ptr = ptr::to_unsafe_ptr(&err);
+    let err_name = str::raw::from_c_str(err_name(err_ptr));
+    let err_msg = str::raw::from_c_str(strerror(err_ptr));
+    return fmt!("LIBUV ERROR: name: %s msg: %s",
+                    err_name, err_msg);
+}
+
+pub unsafe fn get_last_err_data(uv_loop: *c_void) -> uv_err_data {
+    let err = last_error(uv_loop);
+    let err_ptr = ptr::to_unsafe_ptr(&err);
+    let err_name = str::raw::from_c_str(err_name(err_ptr));
+    let err_msg = str::raw::from_c_str(strerror(err_ptr));
+    uv_err_data { err_name: err_name, err_msg: err_msg }
+}
+
+pub struct uv_err_data {
+    err_name: ~str,
+    err_msg: ~str,
+}
+
+extern {
+
+    fn rust_uv_handle_size(type_: uintptr_t) -> size_t;
+    fn rust_uv_req_size(type_: uintptr_t) -> size_t;
+    fn rust_uv_handle_type_max() -> uintptr_t;
+    fn rust_uv_req_type_max() -> uintptr_t;
+
+    // libuv public API
+    fn rust_uv_loop_new() -> *c_void;
+    fn rust_uv_loop_delete(lp: *c_void);
+    fn rust_uv_run(loop_handle: *c_void);
+    fn rust_uv_close(handle: *c_void, cb: *u8);
+    fn rust_uv_walk(loop_handle: *c_void, cb: *u8, arg: *c_void);
+
+    fn rust_uv_idle_new() -> *uv_idle_t;
+    fn rust_uv_idle_delete(handle: *uv_idle_t);
+    fn rust_uv_idle_init(loop_handle: *uv_loop_t, handle: *uv_idle_t) -> c_int;
+    fn rust_uv_idle_start(handle: *uv_idle_t, cb: uv_idle_cb) -> c_int;
+    fn rust_uv_idle_stop(handle: *uv_idle_t) -> c_int;
+
+    fn rust_uv_async_send(handle: *uv_async_t);
+    fn rust_uv_async_init(loop_handle: *c_void,
+                          async_handle: *uv_async_t,
+                          cb: *u8) -> c_int;
+    fn rust_uv_tcp_init(loop_handle: *c_void, handle_ptr: *uv_tcp_t) -> c_int;
+    // FIXME ref #2604 .. ?
+    fn rust_uv_buf_init(out_buf: *uv_buf_t, base: *u8, len: size_t);
+    fn rust_uv_last_error(loop_handle: *c_void) -> uv_err_t;
+    // FIXME ref #2064
+    fn rust_uv_strerror(err: *uv_err_t) -> *c_char;
+    // FIXME ref #2064
+    fn rust_uv_err_name(err: *uv_err_t) -> *c_char;
+    fn rust_uv_ip4_addrp(ip: *u8, port: c_int) -> *sockaddr_in;
+    fn rust_uv_ip6_addrp(ip: *u8, port: c_int) -> *sockaddr_in6;
+    fn rust_uv_free_ip4_addr(addr: *sockaddr_in);
+    fn rust_uv_free_ip6_addr(addr: *sockaddr_in6);
+    fn rust_uv_ip4_name(src: *sockaddr_in, dst: *u8, size: size_t) -> c_int;
+    fn rust_uv_ip6_name(src: *sockaddr_in6, dst: *u8, size: size_t) -> c_int;
+    fn rust_uv_ip4_port(src: *sockaddr_in) -> c_uint;
+    fn rust_uv_ip6_port(src: *sockaddr_in6) -> c_uint;
+    // FIXME ref #2064
+    fn rust_uv_tcp_connect(connect_ptr: *uv_connect_t,
+                           tcp_handle_ptr: *uv_tcp_t,
+                           after_cb: *u8,
+                           addr: *sockaddr_in) -> c_int;
+    // FIXME ref #2064
+    fn rust_uv_tcp_bind(tcp_server: *uv_tcp_t, addr: *sockaddr_in) -> c_int;
+    // FIXME ref #2064
+    fn rust_uv_tcp_connect6(connect_ptr: *uv_connect_t,
+                            tcp_handle_ptr: *uv_tcp_t,
+                            after_cb: *u8,
+                            addr: *sockaddr_in6) -> c_int;
+    // FIXME ref #2064
+    fn rust_uv_tcp_bind6(tcp_server: *uv_tcp_t, addr: *sockaddr_in6) -> c_int;
+    fn rust_uv_tcp_getpeername(tcp_handle_ptr: *uv_tcp_t,
+                               name: *sockaddr_in) -> c_int;
+    fn rust_uv_tcp_getpeername6(tcp_handle_ptr: *uv_tcp_t,
+                                name: *sockaddr_in6) ->c_int;
+    fn rust_uv_listen(stream: *c_void, backlog: c_int, cb: *u8) -> c_int;
+    fn rust_uv_accept(server: *c_void, client: *c_void) -> c_int;
+    fn rust_uv_write(req: *c_void,
+                     stream: *c_void,
+                     buf_in: *uv_buf_t,
+                     buf_cnt: c_int,
+                     cb: *u8) -> c_int;
+    fn rust_uv_read_start(stream: *c_void,
+                          on_alloc: *u8,
+                          on_read: *u8) -> c_int;
+    fn rust_uv_read_stop(stream: *c_void) -> c_int;
+    fn rust_uv_timer_init(loop_handle: *c_void,
+                          timer_handle: *uv_timer_t) -> c_int;
+    fn rust_uv_timer_start(timer_handle: *uv_timer_t,
+                           cb: *u8,
+                           timeout: libc::uint64_t,
+                           repeat: libc::uint64_t) -> c_int;
+    fn rust_uv_timer_stop(handle: *uv_timer_t) -> c_int;
+
+    fn rust_uv_malloc_buf_base_of(sug_size: size_t) -> *u8;
+    fn rust_uv_free_base_of_buf(buf: uv_buf_t);
+    fn rust_uv_get_stream_handle_from_connect_req(connect_req: *uv_connect_t) -> *uv_stream_t;
+    fn rust_uv_get_stream_handle_from_write_req(write_req: *uv_write_t) -> *uv_stream_t;
+    fn rust_uv_get_loop_for_uv_handle(handle: *c_void) -> *c_void;
+    fn rust_uv_get_data_for_uv_loop(loop_ptr: *c_void) -> *c_void;
+    fn rust_uv_set_data_for_uv_loop(loop_ptr: *c_void, data: *c_void);
+    fn rust_uv_get_data_for_uv_handle(handle: *c_void) -> *c_void;
+    fn rust_uv_set_data_for_uv_handle(handle: *c_void, data: *c_void);
+    fn rust_uv_get_data_for_req(req: *c_void) -> *c_void;
+    fn rust_uv_set_data_for_req(req: *c_void, data: *c_void);
+    fn rust_uv_get_base_from_buf(buf: uv_buf_t) -> *u8;
+    fn rust_uv_get_len_from_buf(buf: uv_buf_t) -> size_t;
+}