about summary refs log tree commit diff
path: root/src/libstd/rt
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstd/rt')
-rw-r--r--src/libstd/rt/io/net/ip.rs4
-rw-r--r--src/libstd/rt/io/net/tcp.rs8
-rw-r--r--src/libstd/rt/io/timer.rs10
-rw-r--r--src/libstd/rt/rtio.rs2
-rw-r--r--src/libstd/rt/uv/net.rs6
-rw-r--r--src/libstd/rt/uv/uvio.rs994
-rw-r--r--src/libstd/rt/uv/uvll.rs3
7 files changed, 679 insertions, 348 deletions
diff --git a/src/libstd/rt/io/net/ip.rs b/src/libstd/rt/io/net/ip.rs
index 77176088801..3b3ea80eafa 100644
--- a/src/libstd/rt/io/net/ip.rs
+++ b/src/libstd/rt/io/net/ip.rs
@@ -17,7 +17,7 @@ use option::{Option, None, Some};
 
 type Port = u16;
 
-#[deriving(Eq, TotalEq)]
+#[deriving(Eq, TotalEq, Clone)]
 pub enum IpAddr {
     Ipv4Addr(u8, u8, u8, u8),
     Ipv6Addr(u16, u16, u16, u16, u16, u16, u16, u16)
@@ -62,7 +62,7 @@ impl ToStr for IpAddr {
     }
 }
 
-#[deriving(Eq, TotalEq)]
+#[deriving(Eq, TotalEq, Clone)]
 pub struct SocketAddr {
     ip: IpAddr,
     port: Port,
diff --git a/src/libstd/rt/io/net/tcp.rs b/src/libstd/rt/io/net/tcp.rs
index 27222542e08..746fa5668a5 100644
--- a/src/libstd/rt/io/net/tcp.rs
+++ b/src/libstd/rt/io/net/tcp.rs
@@ -88,9 +88,7 @@ impl Writer for TcpStream {
     fn write(&mut self, buf: &[u8]) {
         match (**self).write(buf) {
             Ok(_) => (),
-            Err(ioerr) => {
-                io_error::cond.raise(ioerr);
-            }
+            Err(ioerr) => io_error::cond.raise(ioerr),
         }
     }
 
@@ -129,9 +127,7 @@ impl TcpListener {
 impl Listener<TcpStream> for TcpListener {
     fn accept(&mut self) -> Option<TcpStream> {
         match (**self).accept() {
-            Ok(s) => {
-                Some(TcpStream::new(s))
-            }
+            Ok(s) => Some(TcpStream::new(s)),
             Err(ioerr) => {
                 io_error::cond.raise(ioerr);
                 return None;
diff --git a/src/libstd/rt/io/timer.rs b/src/libstd/rt/io/timer.rs
index c7820ebf623..bfd1ed48ac1 100644
--- a/src/libstd/rt/io/timer.rs
+++ b/src/libstd/rt/io/timer.rs
@@ -41,7 +41,7 @@ impl Timer {
 }
 
 impl RtioTimer for Timer {
-    fn sleep(&self, msecs: u64) {
+    fn sleep(&mut self, msecs: u64) {
         (**self).sleep(msecs);
     }
 }
@@ -50,15 +50,11 @@ impl RtioTimer for Timer {
 mod test {
     use super::*;
     use rt::test::*;
-    use option::{Some, None};
     #[test]
     fn test_io_timer_sleep_simple() {
         do run_in_newsched_task {
             let timer = Timer::new();
-            match timer {
-                Some(t) => t.sleep(1),
-                None => assert!(false)
-            }
+            do timer.map_move |mut t| { t.sleep(1) };
         }
     }
-}
\ No newline at end of file
+}
diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs
index e29c30ba033..f36c96706e5 100644
--- a/src/libstd/rt/rtio.rs
+++ b/src/libstd/rt/rtio.rs
@@ -91,5 +91,5 @@ pub trait RtioUdpSocket : RtioSocket {
 }
 
 pub trait RtioTimer {
-    fn sleep(&self, msecs: u64);
+    fn sleep(&mut self, msecs: u64);
 }
diff --git a/src/libstd/rt/uv/net.rs b/src/libstd/rt/uv/net.rs
index c8b3d41a78d..e8d0296e543 100644
--- a/src/libstd/rt/uv/net.rs
+++ b/src/libstd/rt/uv/net.rs
@@ -190,9 +190,10 @@ impl StreamWatcher {
 
         extern fn close_cb(handle: *uvll::uv_stream_t) {
             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
-            stream_watcher.get_watcher_data().close_cb.take_unwrap()();
+            let cb = stream_watcher.get_watcher_data().close_cb.take_unwrap();
             stream_watcher.drop_watcher_data();
             unsafe { free_handle(handle as *c_void) }
+            cb();
         }
     }
 }
@@ -411,9 +412,10 @@ impl UdpWatcher {
 
         extern fn close_cb(handle: *uvll::uv_udp_t) {
             let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
-            udp_watcher.get_watcher_data().close_cb.take_unwrap()();
+            let cb = udp_watcher.get_watcher_data().close_cb.take_unwrap();
             udp_watcher.drop_watcher_data();
             unsafe { free_handle(handle as *c_void) }
+            cb();
         }
     }
 }
diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs
index 078dce4f0c8..f6a2d1be512 100644
--- a/src/libstd/rt/uv/uvio.rs
+++ b/src/libstd/rt/uv/uvio.rs
@@ -23,7 +23,7 @@ use rt::io::net::ip::{SocketAddr, IpAddr};
 use rt::io::{standard_error, OtherIoError};
 use rt::local::Local;
 use rt::rtio::*;
-use rt::sched::Scheduler;
+use rt::sched::{Scheduler, SchedHandle};
 use rt::tube::Tube;
 use rt::uv::*;
 use rt::uv::idle::IdleWatcher;
@@ -37,6 +37,49 @@ use unstable::sync::Exclusive;
                             run_in_newsched_task};
 #[cfg(test)] use iterator::{Iterator, range};
 
+// XXX we should not be calling uvll functions in here.
+
+trait HomingIO {
+    fn home<'r>(&'r mut self) -> &'r mut SchedHandle;
+    /* XXX This will move pinned tasks to do IO on the proper scheduler
+     * and then move them back to their home.
+     */
+    fn home_for_io<A>(&mut self, io: &fn(&mut Self) -> A) -> A {
+        use rt::sched::{PinnedTask, TaskFromFriend};
+        // go home
+        let old_home = Cell::new_empty();
+        let old_home_ptr = &old_home;
+        let scheduler = Local::take::<Scheduler>();
+        do scheduler.deschedule_running_task_and_then |_, task| {
+            // get the old home first
+            do task.wake().map_move |mut task| {
+                old_home_ptr.put_back(task.take_unwrap_home());
+                self.home().send(PinnedTask(task));
+            };
+        }
+
+        // do IO
+        let a = io(self);
+
+        // unhome home
+        let scheduler = Local::take::<Scheduler>();
+        do scheduler.deschedule_running_task_and_then |scheduler, task| {
+            do task.wake().map_move |mut task| {
+                task.give_home(old_home.take());
+                scheduler.make_handle().send(TaskFromFriend(task));
+            };
+        }
+
+        // return the result of the IO
+        a
+    }
+}
+
+// get a handle for the current scheduler
+macro_rules! get_handle_to_current_scheduler(
+    () => (do Local::borrow::<Scheduler, SchedHandle> |sched| { sched.make_handle() })
+)
+
 enum SocketNameKind {
     TcpPeer,
     Tcp,
@@ -45,12 +88,10 @@ enum SocketNameKind {
 
 fn socket_name<T, U: Watcher + NativeHandle<*T>>(sk: SocketNameKind,
                                                  handle: U) -> Result<SocketAddr, IoError> {
-    #[fixed_stack_segment]; #[inline(never)];
-
     let getsockname = match sk {
-        TcpPeer => uvll::rust_uv_tcp_getpeername,
-        Tcp     => uvll::rust_uv_tcp_getsockname,
-        Udp     => uvll::rust_uv_udp_getsockname
+        TcpPeer => uvll::tcp_getpeername,
+        Tcp     => uvll::tcp_getsockname,
+        Udp     => uvll::udp_getsockname,
     };
 
     // Allocate a sockaddr_storage
@@ -80,6 +121,7 @@ fn socket_name<T, U: Watcher + NativeHandle<*T>>(sk: SocketNameKind,
 
 }
 
+// Obviously an Event Loop is always home.
 pub struct UvEventLoop {
     uvio: UvIoFactory
 }
@@ -196,6 +238,7 @@ fn test_callback_run_once() {
     }
 }
 
+// The entire point of async is to call into a loop from other threads so it does not need to home.
 pub struct UvRemoteCallback {
     // The uv async handle for triggering the callback
     async: AsyncWatcher,
@@ -326,40 +369,38 @@ impl IoFactory for UvIoFactory {
         let result_cell = Cell::new_empty();
         let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
 
-        let scheduler = Local::take::<Scheduler>();
-
         // Block this task and take ownership, switch to scheduler context
+        let scheduler = Local::take::<Scheduler>();
         do scheduler.deschedule_running_task_and_then |_, task| {
 
-            rtdebug!("connect: entered scheduler context");
-            let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
+            let mut tcp = TcpWatcher::new(self.uv_loop());
             let task_cell = Cell::new(task);
 
             // Wait for a connection
-            do tcp_watcher.connect(addr) |stream_watcher, status| {
-                rtdebug!("connect: in connect callback");
-                if status.is_none() {
-                    rtdebug!("status is none");
-                    let tcp_watcher =
-                        NativeHandle::from_native_handle(stream_watcher.native_handle());
-                    let res = Ok(~UvTcpStream(tcp_watcher));
-
-                    // Store the stream in the task's stack
-                    unsafe { (*result_cell_ptr).put_back(res); }
-
-                    // Context switch
-                    let scheduler = Local::take::<Scheduler>();
-                    scheduler.resume_blocked_task_immediately(task_cell.take());
-                } else {
-                    rtdebug!("status is some");
-                    let task_cell = Cell::new(task_cell.take());
-                    do stream_watcher.close {
-                        let res = Err(uv_error_to_io_error(status.unwrap()));
+            do tcp.connect(addr) |stream, status| {
+                match status {
+                    None => {
+                        let tcp = NativeHandle::from_native_handle(stream.native_handle());
+                        let home = get_handle_to_current_scheduler!();
+                        let res = Ok(~UvTcpStream { watcher: tcp, home: home });
+
+                        // Store the stream in the task's stack
                         unsafe { (*result_cell_ptr).put_back(res); }
+
+                        // Context switch
                         let scheduler = Local::take::<Scheduler>();
                         scheduler.resume_blocked_task_immediately(task_cell.take());
                     }
-                };
+                    Some(_) => {
+                        let task_cell = Cell::new(task_cell.take());
+                        do stream.close {
+                            let res = Err(uv_error_to_io_error(status.unwrap()));
+                            unsafe { (*result_cell_ptr).put_back(res); }
+                            let scheduler = Local::take::<Scheduler>();
+                            scheduler.resume_blocked_task_immediately(task_cell.take());
+                        }
+                    }
+                }
             }
         }
 
@@ -370,7 +411,10 @@ impl IoFactory for UvIoFactory {
     fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListenerObject, IoError> {
         let mut watcher = TcpWatcher::new(self.uv_loop());
         match watcher.bind(addr) {
-            Ok(_) => Ok(~UvTcpListener::new(watcher)),
+            Ok(_) => {
+                let home = get_handle_to_current_scheduler!();
+                Ok(~UvTcpListener::new(watcher, home))
+            }
             Err(uverr) => {
                 let scheduler = Local::take::<Scheduler>();
                 do scheduler.deschedule_running_task_and_then |_, task| {
@@ -388,7 +432,10 @@ impl IoFactory for UvIoFactory {
     fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError> {
         let mut watcher = UdpWatcher::new(self.uv_loop());
         match watcher.bind(addr) {
-            Ok(_) => Ok(~UvUdpSocket(watcher)),
+            Ok(_) => {
+                let home = get_handle_to_current_scheduler!();
+                Ok(~UvUdpSocket { watcher: watcher, home: home })
+            }
             Err(uverr) => {
                 let scheduler = Local::take::<Scheduler>();
                 do scheduler.deschedule_running_task_and_then |_, task| {
@@ -404,22 +451,30 @@ impl IoFactory for UvIoFactory {
     }
 
     fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> {
-        Ok(~UvTimer(TimerWatcher::new(self.uv_loop())))
+        let watcher = TimerWatcher::new(self.uv_loop());
+        let home = get_handle_to_current_scheduler!();
+        Ok(~UvTimer::new(watcher, home))
     }
 }
 
 pub struct UvTcpListener {
     watcher: TcpWatcher,
     listening: bool,
-    incoming_streams: Tube<Result<~RtioTcpStreamObject, IoError>>
+    incoming_streams: Tube<Result<~RtioTcpStreamObject, IoError>>,
+    home: SchedHandle,
+}
+
+impl HomingIO for UvTcpListener {
+    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
 }
 
 impl UvTcpListener {
-    fn new(watcher: TcpWatcher) -> UvTcpListener {
+    fn new(watcher: TcpWatcher, home: SchedHandle) -> UvTcpListener {
         UvTcpListener {
             watcher: watcher,
             listening: false,
-            incoming_streams: Tube::new()
+            incoming_streams: Tube::new(),
+            home: home,
         }
     }
 
@@ -428,13 +483,16 @@ impl UvTcpListener {
 
 impl Drop for UvTcpListener {
     fn drop(&self) {
-        let watcher = self.watcher();
-        let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |_, task| {
-            let task_cell = Cell::new(task);
-            do watcher.as_stream().close {
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
+        // XXX need mutable finalizer
+        let self_ = unsafe { transmute::<&UvTcpListener, &mut UvTcpListener>(self) };
+        do self_.home_for_io |self_| {
+            let scheduler = Local::take::<Scheduler>();
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                let task_cell = Cell::new(task);
+                do self_.watcher().as_stream().close {
+                    let scheduler = Local::take::<Scheduler>();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
+                }
             }
         }
     }
@@ -442,83 +500,92 @@ impl Drop for UvTcpListener {
 
 impl RtioSocket for UvTcpListener {
     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
-        socket_name(Tcp, self.watcher)
+        do self.home_for_io |self_| {
+          socket_name(Tcp, self_.watcher)
+        }
     }
 }
 
 impl RtioTcpListener for UvTcpListener {
 
     fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
-        rtdebug!("entering listen");
-
-        if self.listening {
-            return self.incoming_streams.recv();
-        }
-
-        self.listening = true;
-
-        let server_tcp_watcher = self.watcher();
-        let incoming_streams_cell = Cell::new(self.incoming_streams.clone());
-
-        let incoming_streams_cell = Cell::new(incoming_streams_cell.take());
-        let mut server_tcp_watcher = server_tcp_watcher;
-        do server_tcp_watcher.listen |mut server_stream_watcher, status| {
-            let maybe_stream = if status.is_none() {
-                let mut loop_ = server_stream_watcher.event_loop();
-                let client_tcp_watcher = TcpWatcher::new(&mut loop_);
-                // XXX: Need's to be surfaced in interface
-                server_stream_watcher.accept(client_tcp_watcher.as_stream());
-                Ok(~UvTcpStream(client_tcp_watcher))
-            } else {
-                Err(standard_error(OtherIoError))
-            };
+        do self.home_for_io |self_| {
+
+            if !self_.listening {
+                self_.listening = true;
+
+                let incoming_streams_cell = Cell::new(self_.incoming_streams.clone());
+
+                do self_.watcher().listen |mut server, status| {
+                    let stream = match status {
+                        Some(_) => Err(standard_error(OtherIoError)),
+                        None => {
+                            let client = TcpWatcher::new(&server.event_loop());
+                            // XXX: needs to be surfaced in interface
+                            server.accept(client.as_stream());
+                            let home = get_handle_to_current_scheduler!();
+                            Ok(~UvTcpStream { watcher: client, home: home })
+                        }
+                    };
+
+                    let mut incoming_streams = incoming_streams_cell.take();
+                    incoming_streams.send(stream);
+                    incoming_streams_cell.put_back(incoming_streams);
+                }
 
-            let mut incoming_streams = incoming_streams_cell.take();
-            incoming_streams.send(maybe_stream);
-            incoming_streams_cell.put_back(incoming_streams);
+            }
+            self_.incoming_streams.recv()
         }
-
-        return self.incoming_streams.recv();
     }
 
     fn accept_simultaneously(&mut self) -> Result<(), IoError> {
-        #[fixed_stack_segment]; #[inline(never)];
-
-        let r = unsafe {
-            uvll::rust_uv_tcp_simultaneous_accepts(self.watcher.native_handle(), 1 as c_int)
-        };
+        do self.home_for_io |self_| {
+            let r = unsafe {
+                uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 1 as c_int)
+            };
 
-        match status_to_maybe_uv_error(self.watcher, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher(), r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
-        #[fixed_stack_segment]; #[inline(never)];
-
-        let r = unsafe {
-            uvll::rust_uv_tcp_simultaneous_accepts(self.watcher.native_handle(), 0 as c_int)
-        };
+        do self.home_for_io |self_| {
+            let r = unsafe {
+                uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 0 as c_int)
+            };
 
-        match status_to_maybe_uv_error(self.watcher, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher(), r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 }
 
-pub struct UvTcpStream(TcpWatcher);
+pub struct UvTcpStream {
+    watcher: TcpWatcher,
+    home: SchedHandle,
+}
+
+impl HomingIO for UvTcpStream {
+    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+}
 
 impl Drop for UvTcpStream {
     fn drop(&self) {
-        rtdebug!("closing tcp stream");
-        let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |_, task| {
-            let task_cell = Cell::new(task);
-            do self.as_stream().close {
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
+        // XXX need mutable finalizer
+        let this = unsafe { transmute::<&UvTcpStream, &mut UvTcpStream>(self) };
+        do this.home_for_io |self_| {
+            let scheduler = Local::take::<Scheduler>();
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                let task_cell = Cell::new(task);
+                do self_.watcher.as_stream().close {
+                    let scheduler = Local::take::<Scheduler>();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
+                }
             }
         }
     }
@@ -526,148 +593,161 @@ impl Drop for UvTcpStream {
 
 impl RtioSocket for UvTcpStream {
     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
-        socket_name(Tcp, **self)
+        do self.home_for_io |self_| {
+            socket_name(Tcp, self_.watcher)
+        }
     }
 }
 
 impl RtioTcpStream for UvTcpStream {
     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
-        let result_cell = Cell::new_empty();
-        let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
-
-        let scheduler = Local::take::<Scheduler>();
-        let buf_ptr: *&mut [u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |_sched, task| {
-            rtdebug!("read: entered scheduler context");
-            let task_cell = Cell::new(task);
-            // XXX: We shouldn't reallocate these callbacks every
-            // call to read
-            let alloc: AllocCallback = |_| unsafe {
-                slice_to_uv_buf(*buf_ptr)
-            };
-            let mut watcher = self.as_stream();
-            do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {
-
-                // Stop reading so that no read callbacks are
-                // triggered before the user calls `read` again.
-                // XXX: Is there a performance impact to calling
-                // stop here?
-                watcher.read_stop();
-
-                let result = if status.is_none() {
-                    assert!(nread >= 0);
-                    Ok(nread as uint)
-                } else {
-                    Err(uv_error_to_io_error(status.unwrap()))
+        do self.home_for_io |self_| {
+            let result_cell = Cell::new_empty();
+            let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
+
+            let scheduler = Local::take::<Scheduler>();
+            let buf_ptr: *&mut [u8] = &buf;
+            do scheduler.deschedule_running_task_and_then |_sched, task| {
+                let task_cell = Cell::new(task);
+                // XXX: We shouldn't reallocate these callbacks every
+                // call to read
+                let alloc: AllocCallback = |_| unsafe {
+                    slice_to_uv_buf(*buf_ptr)
                 };
+                let mut watcher = self_.watcher.as_stream();
+                do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {
 
-                unsafe { (*result_cell_ptr).put_back(result); }
+                    // Stop reading so that no read callbacks are
+                    // triggered before the user calls `read` again.
+                    // XXX: Is there a performance impact to calling
+                    // stop here?
+                    watcher.read_stop();
 
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
+                    let result = if status.is_none() {
+                        assert!(nread >= 0);
+                        Ok(nread as uint)
+                    } else {
+                        Err(uv_error_to_io_error(status.unwrap()))
+                    };
+
+                    unsafe { (*result_cell_ptr).put_back(result); }
+
+                    let scheduler = Local::take::<Scheduler>();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
+                }
             }
-        }
 
-        assert!(!result_cell.is_empty());
-        return result_cell.take();
+            assert!(!result_cell.is_empty());
+            result_cell.take()
+        }
     }
 
     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
-        let result_cell = Cell::new_empty();
-        let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
-        let scheduler = Local::take::<Scheduler>();
-        let buf_ptr: *&[u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |_, task| {
-            let task_cell = Cell::new(task);
-            let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
-            let mut watcher = self.as_stream();
-            do watcher.write(buf) |_watcher, status| {
-                let result = if status.is_none() {
-                    Ok(())
-                } else {
-                    Err(uv_error_to_io_error(status.unwrap()))
-                };
-
-                unsafe { (*result_cell_ptr).put_back(result); }
+        do self.home_for_io |self_| {
+            let result_cell = Cell::new_empty();
+            let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
+            let scheduler = Local::take::<Scheduler>();
+            let buf_ptr: *&[u8] = &buf;
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                let task_cell = Cell::new(task);
+                let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
+                let mut watcher = self_.watcher.as_stream();
+                do watcher.write(buf) |_watcher, status| {
+                    let result = if status.is_none() {
+                        Ok(())
+                    } else {
+                        Err(uv_error_to_io_error(status.unwrap()))
+                    };
+
+                    unsafe { (*result_cell_ptr).put_back(result); }
 
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
+                    let scheduler = Local::take::<Scheduler>();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
+                }
             }
-        }
 
-        assert!(!result_cell.is_empty());
-        return result_cell.take();
+            assert!(!result_cell.is_empty());
+            result_cell.take()
+        }
     }
 
     fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
-        socket_name(TcpPeer, **self)
+        do self.home_for_io |self_| {
+            socket_name(TcpPeer, self_.watcher)
+        }
     }
 
     fn control_congestion(&mut self) -> Result<(), IoError> {
-        #[fixed_stack_segment]; #[inline(never)];
+        do self.home_for_io |self_| {
+            let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 0 as c_int) };
 
-        let r = unsafe {
-            uvll::rust_uv_tcp_nodelay(self.native_handle(), 0 as c_int)
-        };
-
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn nodelay(&mut self) -> Result<(), IoError> {
-        #[fixed_stack_segment]; #[inline(never)];
-
-        let r = unsafe {
-            uvll::rust_uv_tcp_nodelay(self.native_handle(), 1 as c_int)
-        };
+        do self.home_for_io |self_| {
+            let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 1 as c_int) };
 
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
-        #[fixed_stack_segment]; #[inline(never)];
-
-        let r = unsafe {
-            uvll::rust_uv_tcp_keepalive(self.native_handle(), 1 as c_int,
-                                        delay_in_seconds as c_uint)
-        };
+        do self.home_for_io |self_| {
+            let r = unsafe {
+                uvll::tcp_keepalive(self_.watcher.native_handle(), 1 as c_int,
+                                    delay_in_seconds as c_uint)
+            };
 
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn letdie(&mut self) -> Result<(), IoError> {
-        #[fixed_stack_segment]; #[inline(never)];
-
-        let r = unsafe {
-            uvll::rust_uv_tcp_keepalive(self.native_handle(), 0 as c_int, 0 as c_uint)
-        };
+        do self.home_for_io |self_| {
+            let r = unsafe {
+                uvll::tcp_keepalive(self_.watcher.native_handle(), 0 as c_int, 0 as c_uint)
+            };
 
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 }
 
-pub struct UvUdpSocket(UdpWatcher);
+pub struct UvUdpSocket {
+    watcher: UdpWatcher,
+    home: SchedHandle,
+}
+
+impl HomingIO for UvUdpSocket {
+    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+}
 
 impl Drop for UvUdpSocket {
     fn drop(&self) {
-        rtdebug!("closing udp socket");
-        let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |_, task| {
-            let task_cell = Cell::new(task);
-            do self.close {
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
+        // XXX need mutable finalizer
+        let this = unsafe { transmute::<&UvUdpSocket, &mut UvUdpSocket>(self) };
+        do this.home_for_io |_| {
+            let scheduler = Local::take::<Scheduler>();
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                let task_cell = Cell::new(task);
+                do this.watcher.close {
+                    let scheduler = Local::take::<Scheduler>();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
+                }
             }
         }
     }
@@ -675,203 +755,240 @@ impl Drop for UvUdpSocket {
 
 impl RtioSocket for UvUdpSocket {
     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
-        socket_name(Udp, **self)
+        do self.home_for_io |self_| {
+            socket_name(Udp, self_.watcher)
+        }
     }
 }
 
 impl RtioUdpSocket for UvUdpSocket {
     fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
-        let result_cell = Cell::new_empty();
-        let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
-
-        let scheduler = Local::take::<Scheduler>();
-        let buf_ptr: *&mut [u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |_sched, task| {
-            rtdebug!("recvfrom: entered scheduler context");
-            let task_cell = Cell::new(task);
-            let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
-            do self.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
-                let _ = flags; // XXX add handling for partials?
-
-                watcher.recv_stop();
-
-                let result = match status {
-                    None => {
-                        assert!(nread >= 0);
-                        Ok((nread as uint, addr))
-                    }
-                    Some(err) => Err(uv_error_to_io_error(err))
-                };
-
-                unsafe { (*result_cell_ptr).put_back(result); }
+        do self.home_for_io |self_| {
+            let result_cell = Cell::new_empty();
+            let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
+
+            let scheduler = Local::take::<Scheduler>();
+            let buf_ptr: *&mut [u8] = &buf;
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                let task_cell = Cell::new(task);
+                let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
+                do self_.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
+                    let _ = flags; // /XXX add handling for partials?
+
+                    watcher.recv_stop();
+
+                    let result = match status {
+                        None => {
+                            assert!(nread >= 0);
+                            Ok((nread as uint, addr))
+                        }
+                        Some(err) => Err(uv_error_to_io_error(err)),
+                    };
+
+                    unsafe { (*result_cell_ptr).put_back(result); }
 
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
+                    let scheduler = Local::take::<Scheduler>();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
+                }
             }
-        }
 
-        assert!(!result_cell.is_empty());
-        return result_cell.take();
+            assert!(!result_cell.is_empty());
+            result_cell.take()
+        }
     }
 
     fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
-        let result_cell = Cell::new_empty();
-        let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
-        let scheduler = Local::take::<Scheduler>();
-        let buf_ptr: *&[u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |_, task| {
-            let task_cell = Cell::new(task);
-            let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
-            do self.send(buf, dst) |_watcher, status| {
+        do self.home_for_io |self_| {
+            let result_cell = Cell::new_empty();
+            let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
+            let scheduler = Local::take::<Scheduler>();
+            let buf_ptr: *&[u8] = &buf;
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                let task_cell = Cell::new(task);
+                let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
+                do self_.watcher.send(buf, dst) |_watcher, status| {
+
+                    let result = match status {
+                        None => Ok(()),
+                        Some(err) => Err(uv_error_to_io_error(err)),
+                    };
+
+                    unsafe { (*result_cell_ptr).put_back(result); }
 
-                let result = match status {
-                    None => Ok(()),
-                    Some(err) => Err(uv_error_to_io_error(err)),
-                };
-
-                unsafe { (*result_cell_ptr).put_back(result); }
-
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
+                    let scheduler = Local::take::<Scheduler>();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
+                }
             }
-        }
 
-        assert!(!result_cell.is_empty());
-        return result_cell.take();
+            assert!(!result_cell.is_empty());
+            result_cell.take()
+        }
     }
 
     fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
-        let r = unsafe {
-            do multi.to_str().with_c_str |m_addr| {
-                uvll::udp_set_membership(self.native_handle(), m_addr,
-                                         ptr::null(), uvll::UV_JOIN_GROUP)
-            }
-        };
+        do self.home_for_io |self_| {
+            let r = unsafe {
+                do multi.to_str().with_c_str |m_addr| {
+                    uvll::udp_set_membership(self_.watcher.native_handle(), m_addr,
+                                             ptr::null(), uvll::UV_JOIN_GROUP)
+                }
+            };
 
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
-        let r = unsafe {
-            do multi.to_str().with_c_str |m_addr| {
-                uvll::udp_set_membership(self.native_handle(), m_addr,
-                                         ptr::null(), uvll::UV_LEAVE_GROUP)
-            }
-        };
+        do self.home_for_io |self_| {
+            let r = unsafe {
+                do multi.to_str().with_c_str |m_addr| {
+                    uvll::udp_set_membership(self_.watcher.native_handle(), m_addr,
+                                             ptr::null(), uvll::UV_LEAVE_GROUP)
+                }
+            };
 
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
-        let r = unsafe {
-            uvll::udp_set_multicast_loop(self.native_handle(), 1 as c_int)
-        };
+        do self.home_for_io |self_| {
+
+            let r = unsafe {
+                uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 1 as c_int)
+            };
 
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
-        let r = unsafe {
-            uvll::udp_set_multicast_loop(self.native_handle(), 0 as c_int)
-        };
+        do self.home_for_io |self_| {
+
+            let r = unsafe {
+                uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 0 as c_int)
+            };
 
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
-        let r = unsafe {
-            uvll::udp_set_multicast_ttl(self.native_handle(), ttl as c_int)
-        };
+        do self.home_for_io |self_| {
 
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            let r = unsafe {
+                uvll::udp_set_multicast_ttl(self_.watcher.native_handle(), ttl as c_int)
+            };
+
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
-        let r = unsafe {
-            uvll::udp_set_ttl(self.native_handle(), ttl as c_int)
-        };
+        do self.home_for_io |self_| {
+
+            let r = unsafe {
+                uvll::udp_set_ttl(self_.watcher.native_handle(), ttl as c_int)
+            };
 
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn hear_broadcasts(&mut self) -> Result<(), IoError> {
-        let r = unsafe {
-            uvll::udp_set_broadcast(self.native_handle(), 1 as c_int)
-        };
+        do self.home_for_io |self_| {
+
+            let r = unsafe {
+                uvll::udp_set_broadcast(self_.watcher.native_handle(), 1 as c_int)
+            };
 
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
-        let r = unsafe {
-            uvll::udp_set_broadcast(self.native_handle(), 0 as c_int)
-        };
+        do self.home_for_io |self_| {
 
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            let r = unsafe {
+                uvll::udp_set_broadcast(self_.watcher.native_handle(), 0 as c_int)
+            };
+
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 }
 
-pub struct UvTimer(timer::TimerWatcher);
+pub struct UvTimer {
+    watcher: timer::TimerWatcher,
+    home: SchedHandle,
+}
+
+impl HomingIO for UvTimer {
+    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+}
 
 impl UvTimer {
-    fn new(w: timer::TimerWatcher) -> UvTimer {
-        UvTimer(w)
+    fn new(w: timer::TimerWatcher, home: SchedHandle) -> UvTimer {
+        UvTimer { watcher: w, home: home }
     }
 }
 
 impl Drop for UvTimer {
     fn drop(&self) {
-        rtdebug!("closing UvTimer");
-        let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |_, task| {
-            let task_cell = Cell::new(task);
-            do self.close {
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
+        let self_ = unsafe { transmute::<&UvTimer, &mut UvTimer>(self) };
+        do self_.home_for_io |self_| {
+            rtdebug!("closing UvTimer");
+            let scheduler = Local::take::<Scheduler>();
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                let task_cell = Cell::new(task);
+                do self_.watcher.close {
+                    let scheduler = Local::take::<Scheduler>();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
+                }
             }
         }
     }
 }
 
 impl RtioTimer for UvTimer {
-    fn sleep(&self, msecs: u64) {
-        let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |_sched, task| {
-            rtdebug!("sleep: entered scheduler context");
-            let task_cell = Cell::new(task);
-            let mut watcher = **self;
-            do watcher.start(msecs, 0) |_, status| {
-                assert!(status.is_none());
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
+    fn sleep(&mut self, msecs: u64) {
+        do self.home_for_io |self_| {
+            let scheduler = Local::take::<Scheduler>();
+            do scheduler.deschedule_running_task_and_then |_sched, task| {
+                rtdebug!("sleep: entered scheduler context");
+                let task_cell = Cell::new(task);
+                do self_.watcher.start(msecs, 0) |_, status| {
+                    assert!(status.is_none());
+                    let scheduler = Local::take::<Scheduler>();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
+                }
             }
+            self_.watcher.stop();
         }
-        let mut w = **self;
-        w.stop();
     }
 }
 
@@ -900,6 +1017,152 @@ fn test_simple_udp_io_bind_only() {
 }
 
 #[test]
+fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
+    use rt::sleeper_list::SleeperList;
+    use rt::work_queue::WorkQueue;
+    use rt::thread::Thread;
+    use rt::task::Task;
+    use rt::sched::{Shutdown, TaskFromFriend};
+    do run_in_bare_thread {
+        let sleepers = SleeperList::new();
+        let work_queue1 = WorkQueue::new();
+        let work_queue2 = WorkQueue::new();
+        let queues = ~[work_queue1.clone(), work_queue2.clone()];
+
+        let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
+                                         sleepers.clone());
+        let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),
+                                         sleepers.clone());
+
+        let handle1 = Cell::new(sched1.make_handle());
+        let handle2 = Cell::new(sched2.make_handle());
+        let tasksFriendHandle = Cell::new(sched2.make_handle());
+
+        let on_exit: ~fn(bool) = |exit_status| {
+            handle1.take().send(Shutdown);
+            handle2.take().send(Shutdown);
+            rtassert!(exit_status);
+        };
+
+        let test_function: ~fn() = || {
+            let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
+            let addr = next_test_ip4();
+            let maybe_socket = unsafe { (*io).udp_bind(addr) };
+            // this socket is bound to this event loop
+            assert!(maybe_socket.is_ok());
+
+            // block self on sched1
+            let scheduler = Local::take::<Scheduler>();
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                // unblock task
+                do task.wake().map_move |task| {
+                  // send self to sched2
+                  tasksFriendHandle.take().send(TaskFromFriend(task));
+                };
+                // sched1 should now sleep since it has nothing else to do
+            }
+            // sched2 will wake up and get the task
+            // as we do nothing else, the function ends and the socket goes out of scope
+            // sched2 will start to run the destructor
+            // the destructor will first block the task, set it's home as sched1, then enqueue it
+            // sched2 will dequeue the task, see that it has a home, and send it to sched1
+            // sched1 will wake up, exec the close function on the correct loop, and then we're done
+        };
+
+        let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None, test_function);
+        main_task.death.on_exit = Some(on_exit);
+        let main_task = Cell::new(main_task);
+
+        let null_task = Cell::new(~do Task::new_root(&mut sched2.stack_pool, None) || {});
+
+        let sched1 = Cell::new(sched1);
+        let sched2 = Cell::new(sched2);
+
+        let thread1 = do Thread::start {
+            sched1.take().bootstrap(main_task.take());
+        };
+        let thread2 = do Thread::start {
+            sched2.take().bootstrap(null_task.take());
+        };
+
+        thread1.join();
+        thread2.join();
+    }
+}
+
+#[test]
+fn test_simple_homed_udp_io_bind_then_move_handle_then_home_and_close() {
+    use rt::sleeper_list::SleeperList;
+    use rt::work_queue::WorkQueue;
+    use rt::thread::Thread;
+    use rt::task::Task;
+    use rt::comm::oneshot;
+    use rt::sched::Shutdown;
+    do run_in_bare_thread {
+        let sleepers = SleeperList::new();
+        let work_queue1 = WorkQueue::new();
+        let work_queue2 = WorkQueue::new();
+        let queues = ~[work_queue1.clone(), work_queue2.clone()];
+
+        let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
+                                         sleepers.clone());
+        let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),
+                                         sleepers.clone());
+
+        let handle1 = Cell::new(sched1.make_handle());
+        let handle2 = Cell::new(sched2.make_handle());
+
+        let (port, chan) = oneshot();
+        let port = Cell::new(port);
+        let chan = Cell::new(chan);
+
+        let body1: ~fn() = || {
+            let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
+            let addr = next_test_ip4();
+            let socket = unsafe { (*io).udp_bind(addr) };
+            assert!(socket.is_ok());
+            chan.take().send(socket);
+        };
+
+        let body2: ~fn() = || {
+            let socket = port.take().recv();
+            assert!(socket.is_ok());
+            /* The socket goes out of scope and the destructor is called.
+             * The destructor:
+             *  - sends itself back to sched1
+             *  - frees the socket
+             *  - resets the home of the task to whatever it was previously
+             */
+        };
+
+        let on_exit: ~fn(bool) = |exit| {
+            handle1.take().send(Shutdown);
+            handle2.take().send(Shutdown);
+            rtassert!(exit);
+        };
+
+        let task1 = Cell::new(~Task::new_root(&mut sched1.stack_pool, None, body1));
+
+        let mut task2 = ~Task::new_root(&mut sched2.stack_pool, None, body2);
+        task2.death.on_exit = Some(on_exit);
+        let task2 = Cell::new(task2);
+
+        let sched1 = Cell::new(sched1);
+        let sched2 = Cell::new(sched2);
+
+        let thread1 = do Thread::start {
+            sched1.take().bootstrap(task1.take());
+        };
+        let thread2 = do Thread::start {
+            sched2.take().bootstrap(task2.take());
+        };
+
+        thread1.join();
+        thread2.join();
+    }
+}
+
+#[test]
 fn test_simple_tcp_server_and_client() {
     do run_in_newsched_task {
         let addr = next_test_ip4();
@@ -931,6 +1194,85 @@ fn test_simple_tcp_server_and_client() {
 }
 
 #[test]
+fn test_simple_tcp_server_and_client_on_diff_threads() {
+    use rt::sleeper_list::SleeperList;
+    use rt::work_queue::WorkQueue;
+    use rt::thread::Thread;
+    use rt::task::Task;
+    use rt::sched::{Shutdown};
+    do run_in_bare_thread {
+        let sleepers = SleeperList::new();
+
+        let server_addr = next_test_ip4();
+        let client_addr = server_addr.clone();
+
+        let server_work_queue = WorkQueue::new();
+        let client_work_queue = WorkQueue::new();
+        let queues = ~[server_work_queue.clone(), client_work_queue.clone()];
+
+        let mut server_sched = ~Scheduler::new(~UvEventLoop::new(), server_work_queue,
+                                               queues.clone(), sleepers.clone());
+        let mut client_sched = ~Scheduler::new(~UvEventLoop::new(), client_work_queue,
+                                               queues.clone(), sleepers.clone());
+
+        let server_handle = Cell::new(server_sched.make_handle());
+        let client_handle = Cell::new(client_sched.make_handle());
+
+        let server_on_exit: ~fn(bool) = |exit_status| {
+            server_handle.take().send(Shutdown);
+            rtassert!(exit_status);
+        };
+
+        let client_on_exit: ~fn(bool) = |exit_status| {
+            client_handle.take().send(Shutdown);
+            rtassert!(exit_status);
+        };
+
+        let server_fn: ~fn() = || {
+            let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
+            let mut listener = unsafe { (*io).tcp_bind(server_addr).unwrap() };
+            let mut stream = listener.accept().unwrap();
+            let mut buf = [0, .. 2048];
+            let nread = stream.read(buf).unwrap();
+            assert_eq!(nread, 8);
+            for i in range(0u, nread) {
+                assert_eq!(buf[i], i as u8);
+            }
+        };
+
+        let client_fn: ~fn() = || {
+            let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
+            let mut stream = unsafe { (*io).tcp_connect(client_addr) };
+            while stream.is_err() {
+                stream = unsafe { (*io).tcp_connect(client_addr) };
+            }
+            stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]);
+        };
+
+        let mut server_task = ~Task::new_root(&mut server_sched.stack_pool, None, server_fn);
+        server_task.death.on_exit = Some(server_on_exit);
+        let server_task = Cell::new(server_task);
+
+        let mut client_task = ~Task::new_root(&mut client_sched.stack_pool, None, client_fn);
+        client_task.death.on_exit = Some(client_on_exit);
+        let client_task = Cell::new(client_task);
+
+        let server_sched = Cell::new(server_sched);
+        let client_sched = Cell::new(client_sched);
+
+        let server_thread = do Thread::start {
+            server_sched.take().bootstrap(server_task.take());
+        };
+        let client_thread = do Thread::start {
+            client_sched.take().bootstrap(client_task.take());
+        };
+
+        server_thread.join();
+        client_thread.join();
+    }
+}
+
+#[test]
 fn test_simple_udp_server_and_client() {
     do run_in_newsched_task {
         let server_addr = next_test_ip4();
@@ -1146,19 +1488,13 @@ fn test_udp_many_read() {
     }
 }
 
-fn test_timer_sleep_simple_impl() {
-    unsafe {
-        let io = Local::unsafe_borrow::<IoFactoryObject>();
-        let timer = (*io).timer_init();
-        match timer {
-            Ok(t) => t.sleep(1),
-            Err(_) => assert!(false)
-        }
-    }
-}
 #[test]
 fn test_timer_sleep_simple() {
     do run_in_newsched_task {
-        test_timer_sleep_simple_impl();
+        unsafe {
+            let io = Local::unsafe_borrow::<IoFactoryObject>();
+            let timer = (*io).timer_init();
+            do timer.map_move |mut t| { t.sleep(1) };
+        }
     }
 }
diff --git a/src/libstd/rt/uv/uvll.rs b/src/libstd/rt/uv/uvll.rs
index 65c0cffe5a0..0ea2175336a 100644
--- a/src/libstd/rt/uv/uvll.rs
+++ b/src/libstd/rt/uv/uvll.rs
@@ -172,6 +172,7 @@ fn request_sanity_check() {
     }
 }
 
+// XXX Event loops ignore SIGPIPE by default.
 pub unsafe fn loop_new() -> *c_void {
     #[fixed_stack_segment]; #[inline(never)];
 
@@ -287,7 +288,7 @@ pub unsafe fn get_udp_handle_from_send_req(send_req: *uv_udp_send_t) -> *uv_udp_
     return rust_uv_get_udp_handle_from_send_req(send_req);
 }
 
-pub unsafe fn udp_get_sockname(handle: *uv_udp_t, name: *sockaddr_storage) -> c_int {
+pub unsafe fn udp_getsockname(handle: *uv_udp_t, name: *sockaddr_storage) -> c_int {
     #[fixed_stack_segment]; #[inline(never)];
 
     return rust_uv_udp_getsockname(handle, name);