about summary refs log tree commit diff
path: root/src/libstd
diff options
context:
space:
mode:
authorbors <bors@rust-lang.org>2013-08-20 17:12:09 -0700
committerbors <bors@rust-lang.org>2013-08-20 17:12:09 -0700
commit0bc1ca404539102065ee0757944ce2288db4cb32 (patch)
treeb52c5d124c4321244e781047080edb4153381fca /src/libstd
parent3cd978fbc75a691a144dcafe99a08531a424fcd0 (diff)
parent35e844ffc1e3c022e868817ad1c548b900db800a (diff)
downloadrust-0bc1ca404539102065ee0757944ce2288db4cb32.tar.gz
rust-0bc1ca404539102065ee0757944ce2288db4cb32.zip
auto merge of #8631 : anasazi/rust/homing-io, r=brson
libuv handles are tied to the event loop that created them. In order to perform IO, the handle must be on the thread with its home event loop. Thus, when a task wants to do IO it must first go to the IO handle's home event loop and pin itself to the corresponding scheduler while the IO action is in flight. Once the IO action completes, the task is unpinned and either returns to its home scheduler if it is a pinned task, or otherwise stays on the current scheduler.

Making new blocking IO implementations (e.g. files) thread safe is rather simple. Add a home field to the IO handle's struct in uvio and implement the HomingIO trait. Wrap every IO call in the HomingIO.home_for_io method, which will take care of the scheduling.

I'm not sure if this remains thread safe in the presence of asynchronous IO at the libuv level. If we decide to do that, then this set up should be revisited.
Diffstat (limited to 'src/libstd')
-rw-r--r--src/libstd/rt/io/net/ip.rs4
-rw-r--r--src/libstd/rt/io/net/tcp.rs8
-rw-r--r--src/libstd/rt/io/timer.rs10
-rw-r--r--src/libstd/rt/rtio.rs2
-rw-r--r--src/libstd/rt/uv/net.rs6
-rw-r--r--src/libstd/rt/uv/uvio.rs994
-rw-r--r--src/libstd/rt/uv/uvll.rs3
7 files changed, 679 insertions, 348 deletions
diff --git a/src/libstd/rt/io/net/ip.rs b/src/libstd/rt/io/net/ip.rs
index 77176088801..3b3ea80eafa 100644
--- a/src/libstd/rt/io/net/ip.rs
+++ b/src/libstd/rt/io/net/ip.rs
@@ -17,7 +17,7 @@ use option::{Option, None, Some};
 
 type Port = u16;
 
-#[deriving(Eq, TotalEq)]
+#[deriving(Eq, TotalEq, Clone)]
 pub enum IpAddr {
     Ipv4Addr(u8, u8, u8, u8),
     Ipv6Addr(u16, u16, u16, u16, u16, u16, u16, u16)
@@ -62,7 +62,7 @@ impl ToStr for IpAddr {
     }
 }
 
-#[deriving(Eq, TotalEq)]
+#[deriving(Eq, TotalEq, Clone)]
 pub struct SocketAddr {
     ip: IpAddr,
     port: Port,
diff --git a/src/libstd/rt/io/net/tcp.rs b/src/libstd/rt/io/net/tcp.rs
index 27222542e08..746fa5668a5 100644
--- a/src/libstd/rt/io/net/tcp.rs
+++ b/src/libstd/rt/io/net/tcp.rs
@@ -88,9 +88,7 @@ impl Writer for TcpStream {
     fn write(&mut self, buf: &[u8]) {
         match (**self).write(buf) {
             Ok(_) => (),
-            Err(ioerr) => {
-                io_error::cond.raise(ioerr);
-            }
+            Err(ioerr) => io_error::cond.raise(ioerr),
         }
     }
 
@@ -129,9 +127,7 @@ impl TcpListener {
 impl Listener<TcpStream> for TcpListener {
     fn accept(&mut self) -> Option<TcpStream> {
         match (**self).accept() {
-            Ok(s) => {
-                Some(TcpStream::new(s))
-            }
+            Ok(s) => Some(TcpStream::new(s)),
             Err(ioerr) => {
                 io_error::cond.raise(ioerr);
                 return None;
diff --git a/src/libstd/rt/io/timer.rs b/src/libstd/rt/io/timer.rs
index c7820ebf623..bfd1ed48ac1 100644
--- a/src/libstd/rt/io/timer.rs
+++ b/src/libstd/rt/io/timer.rs
@@ -41,7 +41,7 @@ impl Timer {
 }
 
 impl RtioTimer for Timer {
-    fn sleep(&self, msecs: u64) {
+    fn sleep(&mut self, msecs: u64) {
         (**self).sleep(msecs);
     }
 }
@@ -50,15 +50,11 @@ impl RtioTimer for Timer {
 mod test {
     use super::*;
     use rt::test::*;
-    use option::{Some, None};
     #[test]
     fn test_io_timer_sleep_simple() {
         do run_in_newsched_task {
             let timer = Timer::new();
-            match timer {
-                Some(t) => t.sleep(1),
-                None => assert!(false)
-            }
+            do timer.map_move |mut t| { t.sleep(1) };
         }
     }
-}
\ No newline at end of file
+}
diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs
index e29c30ba033..f36c96706e5 100644
--- a/src/libstd/rt/rtio.rs
+++ b/src/libstd/rt/rtio.rs
@@ -91,5 +91,5 @@ pub trait RtioUdpSocket : RtioSocket {
 }
 
 pub trait RtioTimer {
-    fn sleep(&self, msecs: u64);
+    fn sleep(&mut self, msecs: u64);
 }
diff --git a/src/libstd/rt/uv/net.rs b/src/libstd/rt/uv/net.rs
index c8b3d41a78d..e8d0296e543 100644
--- a/src/libstd/rt/uv/net.rs
+++ b/src/libstd/rt/uv/net.rs
@@ -190,9 +190,10 @@ impl StreamWatcher {
 
         extern fn close_cb(handle: *uvll::uv_stream_t) {
             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
-            stream_watcher.get_watcher_data().close_cb.take_unwrap()();
+            let cb = stream_watcher.get_watcher_data().close_cb.take_unwrap();
             stream_watcher.drop_watcher_data();
             unsafe { free_handle(handle as *c_void) }
+            cb();
         }
     }
 }
@@ -411,9 +412,10 @@ impl UdpWatcher {
 
         extern fn close_cb(handle: *uvll::uv_udp_t) {
             let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
-            udp_watcher.get_watcher_data().close_cb.take_unwrap()();
+            let cb = udp_watcher.get_watcher_data().close_cb.take_unwrap();
             udp_watcher.drop_watcher_data();
             unsafe { free_handle(handle as *c_void) }
+            cb();
         }
     }
 }
diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs
index 078dce4f0c8..f6a2d1be512 100644
--- a/src/libstd/rt/uv/uvio.rs
+++ b/src/libstd/rt/uv/uvio.rs
@@ -23,7 +23,7 @@ use rt::io::net::ip::{SocketAddr, IpAddr};
 use rt::io::{standard_error, OtherIoError};
 use rt::local::Local;
 use rt::rtio::*;
-use rt::sched::Scheduler;
+use rt::sched::{Scheduler, SchedHandle};
 use rt::tube::Tube;
 use rt::uv::*;
 use rt::uv::idle::IdleWatcher;
@@ -37,6 +37,49 @@ use unstable::sync::Exclusive;
                             run_in_newsched_task};
 #[cfg(test)] use iterator::{Iterator, range};
 
+// XXX we should not be calling uvll functions in here.
+
+trait HomingIO {
+    fn home<'r>(&'r mut self) -> &'r mut SchedHandle;
+    /* XXX This will move pinned tasks to do IO on the proper scheduler
+     * and then move them back to their home.
+     */
+    fn home_for_io<A>(&mut self, io: &fn(&mut Self) -> A) -> A {
+        use rt::sched::{PinnedTask, TaskFromFriend};
+        // go home
+        let old_home = Cell::new_empty();
+        let old_home_ptr = &old_home;
+        let scheduler = Local::take::<Scheduler>();
+        do scheduler.deschedule_running_task_and_then |_, task| {
+            // get the old home first
+            do task.wake().map_move |mut task| {
+                old_home_ptr.put_back(task.take_unwrap_home());
+                self.home().send(PinnedTask(task));
+            };
+        }
+
+        // do IO
+        let a = io(self);
+
+        // unhome home
+        let scheduler = Local::take::<Scheduler>();
+        do scheduler.deschedule_running_task_and_then |scheduler, task| {
+            do task.wake().map_move |mut task| {
+                task.give_home(old_home.take());
+                scheduler.make_handle().send(TaskFromFriend(task));
+            };
+        }
+
+        // return the result of the IO
+        a
+    }
+}
+
+// get a handle for the current scheduler
+macro_rules! get_handle_to_current_scheduler(
+    () => (do Local::borrow::<Scheduler, SchedHandle> |sched| { sched.make_handle() })
+)
+
 enum SocketNameKind {
     TcpPeer,
     Tcp,
@@ -45,12 +88,10 @@ enum SocketNameKind {
 
 fn socket_name<T, U: Watcher + NativeHandle<*T>>(sk: SocketNameKind,
                                                  handle: U) -> Result<SocketAddr, IoError> {
-    #[fixed_stack_segment]; #[inline(never)];
-
     let getsockname = match sk {
-        TcpPeer => uvll::rust_uv_tcp_getpeername,
-        Tcp     => uvll::rust_uv_tcp_getsockname,
-        Udp     => uvll::rust_uv_udp_getsockname
+        TcpPeer => uvll::tcp_getpeername,
+        Tcp     => uvll::tcp_getsockname,
+        Udp     => uvll::udp_getsockname,
     };
 
     // Allocate a sockaddr_storage
@@ -80,6 +121,7 @@ fn socket_name<T, U: Watcher + NativeHandle<*T>>(sk: SocketNameKind,
 
 }
 
+// Obviously an Event Loop is always home.
 pub struct UvEventLoop {
     uvio: UvIoFactory
 }
@@ -196,6 +238,7 @@ fn test_callback_run_once() {
     }
 }
 
+// The entire point of async is to call into a loop from other threads so it does not need to home.
 pub struct UvRemoteCallback {
     // The uv async handle for triggering the callback
     async: AsyncWatcher,
@@ -326,40 +369,38 @@ impl IoFactory for UvIoFactory {
         let result_cell = Cell::new_empty();
         let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
 
-        let scheduler = Local::take::<Scheduler>();
-
         // Block this task and take ownership, switch to scheduler context
+        let scheduler = Local::take::<Scheduler>();
         do scheduler.deschedule_running_task_and_then |_, task| {
 
-            rtdebug!("connect: entered scheduler context");
-            let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
+            let mut tcp = TcpWatcher::new(self.uv_loop());
             let task_cell = Cell::new(task);
 
             // Wait for a connection
-            do tcp_watcher.connect(addr) |stream_watcher, status| {
-                rtdebug!("connect: in connect callback");
-                if status.is_none() {
-                    rtdebug!("status is none");
-                    let tcp_watcher =
-                        NativeHandle::from_native_handle(stream_watcher.native_handle());
-                    let res = Ok(~UvTcpStream(tcp_watcher));
-
-                    // Store the stream in the task's stack
-                    unsafe { (*result_cell_ptr).put_back(res); }
-
-                    // Context switch
-                    let scheduler = Local::take::<Scheduler>();
-                    scheduler.resume_blocked_task_immediately(task_cell.take());
-                } else {
-                    rtdebug!("status is some");
-                    let task_cell = Cell::new(task_cell.take());
-                    do stream_watcher.close {
-                        let res = Err(uv_error_to_io_error(status.unwrap()));
+            do tcp.connect(addr) |stream, status| {
+                match status {
+                    None => {
+                        let tcp = NativeHandle::from_native_handle(stream.native_handle());
+                        let home = get_handle_to_current_scheduler!();
+                        let res = Ok(~UvTcpStream { watcher: tcp, home: home });
+
+                        // Store the stream in the task's stack
                         unsafe { (*result_cell_ptr).put_back(res); }
+
+                        // Context switch
                         let scheduler = Local::take::<Scheduler>();
                         scheduler.resume_blocked_task_immediately(task_cell.take());
                     }
-                };
+                    Some(_) => {
+                        let task_cell = Cell::new(task_cell.take());
+                        do stream.close {
+                            let res = Err(uv_error_to_io_error(status.unwrap()));
+                            unsafe { (*result_cell_ptr).put_back(res); }
+                            let scheduler = Local::take::<Scheduler>();
+                            scheduler.resume_blocked_task_immediately(task_cell.take());
+                        }
+                    }
+                }
             }
         }
 
@@ -370,7 +411,10 @@ impl IoFactory for UvIoFactory {
     fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListenerObject, IoError> {
         let mut watcher = TcpWatcher::new(self.uv_loop());
         match watcher.bind(addr) {
-            Ok(_) => Ok(~UvTcpListener::new(watcher)),
+            Ok(_) => {
+                let home = get_handle_to_current_scheduler!();
+                Ok(~UvTcpListener::new(watcher, home))
+            }
             Err(uverr) => {
                 let scheduler = Local::take::<Scheduler>();
                 do scheduler.deschedule_running_task_and_then |_, task| {
@@ -388,7 +432,10 @@ impl IoFactory for UvIoFactory {
     fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError> {
         let mut watcher = UdpWatcher::new(self.uv_loop());
         match watcher.bind(addr) {
-            Ok(_) => Ok(~UvUdpSocket(watcher)),
+            Ok(_) => {
+                let home = get_handle_to_current_scheduler!();
+                Ok(~UvUdpSocket { watcher: watcher, home: home })
+            }
             Err(uverr) => {
                 let scheduler = Local::take::<Scheduler>();
                 do scheduler.deschedule_running_task_and_then |_, task| {
@@ -404,22 +451,30 @@ impl IoFactory for UvIoFactory {
     }
 
     fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> {
-        Ok(~UvTimer(TimerWatcher::new(self.uv_loop())))
+        let watcher = TimerWatcher::new(self.uv_loop());
+        let home = get_handle_to_current_scheduler!();
+        Ok(~UvTimer::new(watcher, home))
     }
 }
 
 pub struct UvTcpListener {
     watcher: TcpWatcher,
     listening: bool,
-    incoming_streams: Tube<Result<~RtioTcpStreamObject, IoError>>
+    incoming_streams: Tube<Result<~RtioTcpStreamObject, IoError>>,
+    home: SchedHandle,
+}
+
+impl HomingIO for UvTcpListener {
+    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
 }
 
 impl UvTcpListener {
-    fn new(watcher: TcpWatcher) -> UvTcpListener {
+    fn new(watcher: TcpWatcher, home: SchedHandle) -> UvTcpListener {
         UvTcpListener {
             watcher: watcher,
             listening: false,
-            incoming_streams: Tube::new()
+            incoming_streams: Tube::new(),
+            home: home,
         }
     }
 
@@ -428,13 +483,16 @@ impl UvTcpListener {
 
 impl Drop for UvTcpListener {
     fn drop(&self) {
-        let watcher = self.watcher();
-        let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |_, task| {
-            let task_cell = Cell::new(task);
-            do watcher.as_stream().close {
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
+        // XXX need mutable finalizer
+        let self_ = unsafe { transmute::<&UvTcpListener, &mut UvTcpListener>(self) };
+        do self_.home_for_io |self_| {
+            let scheduler = Local::take::<Scheduler>();
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                let task_cell = Cell::new(task);
+                do self_.watcher().as_stream().close {
+                    let scheduler = Local::take::<Scheduler>();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
+                }
             }
         }
     }
@@ -442,83 +500,92 @@ impl Drop for UvTcpListener {
 
 impl RtioSocket for UvTcpListener {
     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
-        socket_name(Tcp, self.watcher)
+        do self.home_for_io |self_| {
+          socket_name(Tcp, self_.watcher)
+        }
     }
 }
 
 impl RtioTcpListener for UvTcpListener {
 
     fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
-        rtdebug!("entering listen");
-
-        if self.listening {
-            return self.incoming_streams.recv();
-        }
-
-        self.listening = true;
-
-        let server_tcp_watcher = self.watcher();
-        let incoming_streams_cell = Cell::new(self.incoming_streams.clone());
-
-        let incoming_streams_cell = Cell::new(incoming_streams_cell.take());
-        let mut server_tcp_watcher = server_tcp_watcher;
-        do server_tcp_watcher.listen |mut server_stream_watcher, status| {
-            let maybe_stream = if status.is_none() {
-                let mut loop_ = server_stream_watcher.event_loop();
-                let client_tcp_watcher = TcpWatcher::new(&mut loop_);
-                // XXX: Need's to be surfaced in interface
-                server_stream_watcher.accept(client_tcp_watcher.as_stream());
-                Ok(~UvTcpStream(client_tcp_watcher))
-            } else {
-                Err(standard_error(OtherIoError))
-            };
+        do self.home_for_io |self_| {
+
+            if !self_.listening {
+                self_.listening = true;
+
+                let incoming_streams_cell = Cell::new(self_.incoming_streams.clone());
+
+                do self_.watcher().listen |mut server, status| {
+                    let stream = match status {
+                        Some(_) => Err(standard_error(OtherIoError)),
+                        None => {
+                            let client = TcpWatcher::new(&server.event_loop());
+                            // XXX: needs to be surfaced in interface
+                            server.accept(client.as_stream());
+                            let home = get_handle_to_current_scheduler!();
+                            Ok(~UvTcpStream { watcher: client, home: home })
+                        }
+                    };
+
+                    let mut incoming_streams = incoming_streams_cell.take();
+                    incoming_streams.send(stream);
+                    incoming_streams_cell.put_back(incoming_streams);
+                }
 
-            let mut incoming_streams = incoming_streams_cell.take();
-            incoming_streams.send(maybe_stream);
-            incoming_streams_cell.put_back(incoming_streams);
+            }
+            self_.incoming_streams.recv()
         }
-
-        return self.incoming_streams.recv();
     }
 
     fn accept_simultaneously(&mut self) -> Result<(), IoError> {
-        #[fixed_stack_segment]; #[inline(never)];
-
-        let r = unsafe {
-            uvll::rust_uv_tcp_simultaneous_accepts(self.watcher.native_handle(), 1 as c_int)
-        };
+        do self.home_for_io |self_| {
+            let r = unsafe {
+                uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 1 as c_int)
+            };
 
-        match status_to_maybe_uv_error(self.watcher, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher(), r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
-        #[fixed_stack_segment]; #[inline(never)];
-
-        let r = unsafe {
-            uvll::rust_uv_tcp_simultaneous_accepts(self.watcher.native_handle(), 0 as c_int)
-        };
+        do self.home_for_io |self_| {
+            let r = unsafe {
+                uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 0 as c_int)
+            };
 
-        match status_to_maybe_uv_error(self.watcher, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher(), r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 }
 
-pub struct UvTcpStream(TcpWatcher);
+pub struct UvTcpStream {
+    watcher: TcpWatcher,
+    home: SchedHandle,
+}
+
+impl HomingIO for UvTcpStream {
+    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+}
 
 impl Drop for UvTcpStream {
     fn drop(&self) {
-        rtdebug!("closing tcp stream");
-        let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |_, task| {
-            let task_cell = Cell::new(task);
-            do self.as_stream().close {
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
+        // XXX need mutable finalizer
+        let this = unsafe { transmute::<&UvTcpStream, &mut UvTcpStream>(self) };
+        do this.home_for_io |self_| {
+            let scheduler = Local::take::<Scheduler>();
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                let task_cell = Cell::new(task);
+                do self_.watcher.as_stream().close {
+                    let scheduler = Local::take::<Scheduler>();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
+                }
             }
         }
     }
@@ -526,148 +593,161 @@ impl Drop for UvTcpStream {
 
 impl RtioSocket for UvTcpStream {
     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
-        socket_name(Tcp, **self)
+        do self.home_for_io |self_| {
+            socket_name(Tcp, self_.watcher)
+        }
     }
 }
 
 impl RtioTcpStream for UvTcpStream {
     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
-        let result_cell = Cell::new_empty();
-        let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
-
-        let scheduler = Local::take::<Scheduler>();
-        let buf_ptr: *&mut [u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |_sched, task| {
-            rtdebug!("read: entered scheduler context");
-            let task_cell = Cell::new(task);
-            // XXX: We shouldn't reallocate these callbacks every
-            // call to read
-            let alloc: AllocCallback = |_| unsafe {
-                slice_to_uv_buf(*buf_ptr)
-            };
-            let mut watcher = self.as_stream();
-            do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {
-
-                // Stop reading so that no read callbacks are
-                // triggered before the user calls `read` again.
-                // XXX: Is there a performance impact to calling
-                // stop here?
-                watcher.read_stop();
-
-                let result = if status.is_none() {
-                    assert!(nread >= 0);
-                    Ok(nread as uint)
-                } else {
-                    Err(uv_error_to_io_error(status.unwrap()))
+        do self.home_for_io |self_| {
+            let result_cell = Cell::new_empty();
+            let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
+
+            let scheduler = Local::take::<Scheduler>();
+            let buf_ptr: *&mut [u8] = &buf;
+            do scheduler.deschedule_running_task_and_then |_sched, task| {
+                let task_cell = Cell::new(task);
+                // XXX: We shouldn't reallocate these callbacks every
+                // call to read
+                let alloc: AllocCallback = |_| unsafe {
+                    slice_to_uv_buf(*buf_ptr)
                 };
+                let mut watcher = self_.watcher.as_stream();
+                do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {
 
-                unsafe { (*result_cell_ptr).put_back(result); }
+                    // Stop reading so that no read callbacks are
+                    // triggered before the user calls `read` again.
+                    // XXX: Is there a performance impact to calling
+                    // stop here?
+                    watcher.read_stop();
 
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
+                    let result = if status.is_none() {
+                        assert!(nread >= 0);
+                        Ok(nread as uint)
+                    } else {
+                        Err(uv_error_to_io_error(status.unwrap()))
+                    };
+
+                    unsafe { (*result_cell_ptr).put_back(result); }
+
+                    let scheduler = Local::take::<Scheduler>();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
+                }
             }
-        }
 
-        assert!(!result_cell.is_empty());
-        return result_cell.take();
+            assert!(!result_cell.is_empty());
+            result_cell.take()
+        }
     }
 
     fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
-        let result_cell = Cell::new_empty();
-        let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
-        let scheduler = Local::take::<Scheduler>();
-        let buf_ptr: *&[u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |_, task| {
-            let task_cell = Cell::new(task);
-            let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
-            let mut watcher = self.as_stream();
-            do watcher.write(buf) |_watcher, status| {
-                let result = if status.is_none() {
-                    Ok(())
-                } else {
-                    Err(uv_error_to_io_error(status.unwrap()))
-                };
-
-                unsafe { (*result_cell_ptr).put_back(result); }
+        do self.home_for_io |self_| {
+            let result_cell = Cell::new_empty();
+            let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
+            let scheduler = Local::take::<Scheduler>();
+            let buf_ptr: *&[u8] = &buf;
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                let task_cell = Cell::new(task);
+                let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
+                let mut watcher = self_.watcher.as_stream();
+                do watcher.write(buf) |_watcher, status| {
+                    let result = if status.is_none() {
+                        Ok(())
+                    } else {
+                        Err(uv_error_to_io_error(status.unwrap()))
+                    };
+
+                    unsafe { (*result_cell_ptr).put_back(result); }
 
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
+                    let scheduler = Local::take::<Scheduler>();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
+                }
             }
-        }
 
-        assert!(!result_cell.is_empty());
-        return result_cell.take();
+            assert!(!result_cell.is_empty());
+            result_cell.take()
+        }
     }
 
     fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
-        socket_name(TcpPeer, **self)
+        do self.home_for_io |self_| {
+            socket_name(TcpPeer, self_.watcher)
+        }
     }
 
     fn control_congestion(&mut self) -> Result<(), IoError> {
-        #[fixed_stack_segment]; #[inline(never)];
+        do self.home_for_io |self_| {
+            let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 0 as c_int) };
 
-        let r = unsafe {
-            uvll::rust_uv_tcp_nodelay(self.native_handle(), 0 as c_int)
-        };
-
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn nodelay(&mut self) -> Result<(), IoError> {
-        #[fixed_stack_segment]; #[inline(never)];
-
-        let r = unsafe {
-            uvll::rust_uv_tcp_nodelay(self.native_handle(), 1 as c_int)
-        };
+        do self.home_for_io |self_| {
+            let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 1 as c_int) };
 
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
-        #[fixed_stack_segment]; #[inline(never)];
-
-        let r = unsafe {
-            uvll::rust_uv_tcp_keepalive(self.native_handle(), 1 as c_int,
-                                        delay_in_seconds as c_uint)
-        };
+        do self.home_for_io |self_| {
+            let r = unsafe {
+                uvll::tcp_keepalive(self_.watcher.native_handle(), 1 as c_int,
+                                    delay_in_seconds as c_uint)
+            };
 
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn letdie(&mut self) -> Result<(), IoError> {
-        #[fixed_stack_segment]; #[inline(never)];
-
-        let r = unsafe {
-            uvll::rust_uv_tcp_keepalive(self.native_handle(), 0 as c_int, 0 as c_uint)
-        };
+        do self.home_for_io |self_| {
+            let r = unsafe {
+                uvll::tcp_keepalive(self_.watcher.native_handle(), 0 as c_int, 0 as c_uint)
+            };
 
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 }
 
-pub struct UvUdpSocket(UdpWatcher);
+pub struct UvUdpSocket {
+    watcher: UdpWatcher,
+    home: SchedHandle,
+}
+
+impl HomingIO for UvUdpSocket {
+    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+}
 
 impl Drop for UvUdpSocket {
     fn drop(&self) {
-        rtdebug!("closing udp socket");
-        let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |_, task| {
-            let task_cell = Cell::new(task);
-            do self.close {
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
+        // XXX need mutable finalizer
+        let this = unsafe { transmute::<&UvUdpSocket, &mut UvUdpSocket>(self) };
+        do this.home_for_io |_| {
+            let scheduler = Local::take::<Scheduler>();
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                let task_cell = Cell::new(task);
+                do this.watcher.close {
+                    let scheduler = Local::take::<Scheduler>();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
+                }
             }
         }
     }
@@ -675,203 +755,240 @@ impl Drop for UvUdpSocket {
 
 impl RtioSocket for UvUdpSocket {
     fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
-        socket_name(Udp, **self)
+        do self.home_for_io |self_| {
+            socket_name(Udp, self_.watcher)
+        }
     }
 }
 
 impl RtioUdpSocket for UvUdpSocket {
     fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
-        let result_cell = Cell::new_empty();
-        let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
-
-        let scheduler = Local::take::<Scheduler>();
-        let buf_ptr: *&mut [u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |_sched, task| {
-            rtdebug!("recvfrom: entered scheduler context");
-            let task_cell = Cell::new(task);
-            let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
-            do self.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
-                let _ = flags; // XXX add handling for partials?
-
-                watcher.recv_stop();
-
-                let result = match status {
-                    None => {
-                        assert!(nread >= 0);
-                        Ok((nread as uint, addr))
-                    }
-                    Some(err) => Err(uv_error_to_io_error(err))
-                };
-
-                unsafe { (*result_cell_ptr).put_back(result); }
+        do self.home_for_io |self_| {
+            let result_cell = Cell::new_empty();
+            let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
+
+            let scheduler = Local::take::<Scheduler>();
+            let buf_ptr: *&mut [u8] = &buf;
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                let task_cell = Cell::new(task);
+                let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
+                do self_.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
+                    let _ = flags; // XXX add handling for partials?
+
+                    watcher.recv_stop();
+
+                    let result = match status {
+                        None => {
+                            assert!(nread >= 0);
+                            Ok((nread as uint, addr))
+                        }
+                        Some(err) => Err(uv_error_to_io_error(err)),
+                    };
+
+                    unsafe { (*result_cell_ptr).put_back(result); }
 
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
+                    let scheduler = Local::take::<Scheduler>();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
+                }
             }
-        }
 
-        assert!(!result_cell.is_empty());
-        return result_cell.take();
+            assert!(!result_cell.is_empty());
+            result_cell.take()
+        }
     }
 
     fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
-        let result_cell = Cell::new_empty();
-        let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
-        let scheduler = Local::take::<Scheduler>();
-        let buf_ptr: *&[u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |_, task| {
-            let task_cell = Cell::new(task);
-            let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
-            do self.send(buf, dst) |_watcher, status| {
+        do self.home_for_io |self_| {
+            let result_cell = Cell::new_empty();
+            let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
+            let scheduler = Local::take::<Scheduler>();
+            let buf_ptr: *&[u8] = &buf;
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                let task_cell = Cell::new(task);
+                let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
+                do self_.watcher.send(buf, dst) |_watcher, status| {
+
+                    let result = match status {
+                        None => Ok(()),
+                        Some(err) => Err(uv_error_to_io_error(err)),
+                    };
+
+                    unsafe { (*result_cell_ptr).put_back(result); }
 
-                let result = match status {
-                    None => Ok(()),
-                    Some(err) => Err(uv_error_to_io_error(err)),
-                };
-
-                unsafe { (*result_cell_ptr).put_back(result); }
-
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
+                    let scheduler = Local::take::<Scheduler>();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
+                }
             }
-        }
 
-        assert!(!result_cell.is_empty());
-        return result_cell.take();
+            assert!(!result_cell.is_empty());
+            result_cell.take()
+        }
     }
 
     fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
-        let r = unsafe {
-            do multi.to_str().with_c_str |m_addr| {
-                uvll::udp_set_membership(self.native_handle(), m_addr,
-                                         ptr::null(), uvll::UV_JOIN_GROUP)
-            }
-        };
+        do self.home_for_io |self_| {
+            let r = unsafe {
+                do multi.to_str().with_c_str |m_addr| {
+                    uvll::udp_set_membership(self_.watcher.native_handle(), m_addr,
+                                             ptr::null(), uvll::UV_JOIN_GROUP)
+                }
+            };
 
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
-        let r = unsafe {
-            do multi.to_str().with_c_str |m_addr| {
-                uvll::udp_set_membership(self.native_handle(), m_addr,
-                                         ptr::null(), uvll::UV_LEAVE_GROUP)
-            }
-        };
+        do self.home_for_io |self_| {
+            let r = unsafe {
+                do multi.to_str().with_c_str |m_addr| {
+                    uvll::udp_set_membership(self_.watcher.native_handle(), m_addr,
+                                             ptr::null(), uvll::UV_LEAVE_GROUP)
+                }
+            };
 
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
-        let r = unsafe {
-            uvll::udp_set_multicast_loop(self.native_handle(), 1 as c_int)
-        };
+        do self.home_for_io |self_| {
+
+            let r = unsafe {
+                uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 1 as c_int)
+            };
 
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
-        let r = unsafe {
-            uvll::udp_set_multicast_loop(self.native_handle(), 0 as c_int)
-        };
+        do self.home_for_io |self_| {
+
+            let r = unsafe {
+                uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 0 as c_int)
+            };
 
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
-        let r = unsafe {
-            uvll::udp_set_multicast_ttl(self.native_handle(), ttl as c_int)
-        };
+        do self.home_for_io |self_| {
 
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            let r = unsafe {
+                uvll::udp_set_multicast_ttl(self_.watcher.native_handle(), ttl as c_int)
+            };
+
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
-        let r = unsafe {
-            uvll::udp_set_ttl(self.native_handle(), ttl as c_int)
-        };
+        do self.home_for_io |self_| {
+
+            let r = unsafe {
+                uvll::udp_set_ttl(self_.watcher.native_handle(), ttl as c_int)
+            };
 
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn hear_broadcasts(&mut self) -> Result<(), IoError> {
-        let r = unsafe {
-            uvll::udp_set_broadcast(self.native_handle(), 1 as c_int)
-        };
+        do self.home_for_io |self_| {
+
+            let r = unsafe {
+                uvll::udp_set_broadcast(self_.watcher.native_handle(), 1 as c_int)
+            };
 
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 
     fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
-        let r = unsafe {
-            uvll::udp_set_broadcast(self.native_handle(), 0 as c_int)
-        };
+        do self.home_for_io |self_| {
 
-        match status_to_maybe_uv_error(**self, r) {
-            Some(err) => Err(uv_error_to_io_error(err)),
-            None => Ok(())
+            let r = unsafe {
+                uvll::udp_set_broadcast(self_.watcher.native_handle(), 0 as c_int)
+            };
+
+            match status_to_maybe_uv_error(self_.watcher, r) {
+                Some(err) => Err(uv_error_to_io_error(err)),
+                None => Ok(())
+            }
         }
     }
 }
 
-pub struct UvTimer(timer::TimerWatcher);
+pub struct UvTimer {
+    watcher: timer::TimerWatcher,
+    home: SchedHandle,
+}
+
+impl HomingIO for UvTimer {
+    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+}
 
 impl UvTimer {
-    fn new(w: timer::TimerWatcher) -> UvTimer {
-        UvTimer(w)
+    fn new(w: timer::TimerWatcher, home: SchedHandle) -> UvTimer {
+        UvTimer { watcher: w, home: home }
     }
 }
 
 impl Drop for UvTimer {
     fn drop(&self) {
-        rtdebug!("closing UvTimer");
-        let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |_, task| {
-            let task_cell = Cell::new(task);
-            do self.close {
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
+        let self_ = unsafe { transmute::<&UvTimer, &mut UvTimer>(self) };
+        do self_.home_for_io |self_| {
+            rtdebug!("closing UvTimer");
+            let scheduler = Local::take::<Scheduler>();
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                let task_cell = Cell::new(task);
+                do self_.watcher.close {
+                    let scheduler = Local::take::<Scheduler>();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
+                }
             }
         }
     }
 }
 
 impl RtioTimer for UvTimer {
-    fn sleep(&self, msecs: u64) {
-        let scheduler = Local::take::<Scheduler>();
-        do scheduler.deschedule_running_task_and_then |_sched, task| {
-            rtdebug!("sleep: entered scheduler context");
-            let task_cell = Cell::new(task);
-            let mut watcher = **self;
-            do watcher.start(msecs, 0) |_, status| {
-                assert!(status.is_none());
-                let scheduler = Local::take::<Scheduler>();
-                scheduler.resume_blocked_task_immediately(task_cell.take());
+    fn sleep(&mut self, msecs: u64) {
+        do self.home_for_io |self_| {
+            let scheduler = Local::take::<Scheduler>();
+            do scheduler.deschedule_running_task_and_then |_sched, task| {
+                rtdebug!("sleep: entered scheduler context");
+                let task_cell = Cell::new(task);
+                do self_.watcher.start(msecs, 0) |_, status| {
+                    assert!(status.is_none());
+                    let scheduler = Local::take::<Scheduler>();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
+                }
             }
+            self_.watcher.stop();
         }
-        let mut w = **self;
-        w.stop();
     }
 }
 
@@ -900,6 +1017,152 @@ fn test_simple_udp_io_bind_only() {
 }
 
 #[test]
+fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
+    use rt::sleeper_list::SleeperList;
+    use rt::work_queue::WorkQueue;
+    use rt::thread::Thread;
+    use rt::task::Task;
+    use rt::sched::{Shutdown, TaskFromFriend};
+    do run_in_bare_thread {
+        let sleepers = SleeperList::new();
+        let work_queue1 = WorkQueue::new();
+        let work_queue2 = WorkQueue::new();
+        let queues = ~[work_queue1.clone(), work_queue2.clone()];
+
+        let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
+                                         sleepers.clone());
+        let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),
+                                         sleepers.clone());
+
+        let handle1 = Cell::new(sched1.make_handle());
+        let handle2 = Cell::new(sched2.make_handle());
+        let tasksFriendHandle = Cell::new(sched2.make_handle());
+
+        let on_exit: ~fn(bool) = |exit_status| {
+            handle1.take().send(Shutdown);
+            handle2.take().send(Shutdown);
+            rtassert!(exit_status);
+        };
+
+        let test_function: ~fn() = || {
+            let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
+            let addr = next_test_ip4();
+            let maybe_socket = unsafe { (*io).udp_bind(addr) };
+            // this socket is bound to this event loop
+            assert!(maybe_socket.is_ok());
+
+            // block self on sched1
+            let scheduler = Local::take::<Scheduler>();
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                // unblock task
+                do task.wake().map_move |task| {
+                  // send self to sched2
+                  tasksFriendHandle.take().send(TaskFromFriend(task));
+                };
+                // sched1 should now sleep since it has nothing else to do
+            }
+            // sched2 will wake up and get the task
+            // as we do nothing else, the function ends and the socket goes out of scope
+            // sched2 will start to run the destructor
+            // the destructor will first block the task, set its home as sched1, then enqueue it
+            // sched2 will dequeue the task, see that it has a home, and send it to sched1
+            // sched1 will wake up, exec the close function on the correct loop, and then we're done
+        };
+
+        let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None, test_function);
+        main_task.death.on_exit = Some(on_exit);
+        let main_task = Cell::new(main_task);
+
+        let null_task = Cell::new(~do Task::new_root(&mut sched2.stack_pool, None) || {});
+
+        let sched1 = Cell::new(sched1);
+        let sched2 = Cell::new(sched2);
+
+        let thread1 = do Thread::start {
+            sched1.take().bootstrap(main_task.take());
+        };
+        let thread2 = do Thread::start {
+            sched2.take().bootstrap(null_task.take());
+        };
+
+        thread1.join();
+        thread2.join();
+    }
+}
+
+#[test]
+fn test_simple_homed_udp_io_bind_then_move_handle_then_home_and_close() {
+    use rt::sleeper_list::SleeperList;
+    use rt::work_queue::WorkQueue;
+    use rt::thread::Thread;
+    use rt::task::Task;
+    use rt::comm::oneshot;
+    use rt::sched::Shutdown;
+    do run_in_bare_thread {
+        let sleepers = SleeperList::new();
+        let work_queue1 = WorkQueue::new();
+        let work_queue2 = WorkQueue::new();
+        let queues = ~[work_queue1.clone(), work_queue2.clone()];
+
+        let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
+                                         sleepers.clone());
+        let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),
+                                         sleepers.clone());
+
+        let handle1 = Cell::new(sched1.make_handle());
+        let handle2 = Cell::new(sched2.make_handle());
+
+        let (port, chan) = oneshot();
+        let port = Cell::new(port);
+        let chan = Cell::new(chan);
+
+        let body1: ~fn() = || {
+            let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
+            let addr = next_test_ip4();
+            let socket = unsafe { (*io).udp_bind(addr) };
+            assert!(socket.is_ok());
+            chan.take().send(socket);
+        };
+
+        let body2: ~fn() = || {
+            let socket = port.take().recv();
+            assert!(socket.is_ok());
+            /* The socket goes out of scope and the destructor is called.
+             * The destructor:
+             *  - sends itself back to sched1
+             *  - frees the socket
+             *  - resets the home of the task to whatever it was previously
+             */
+        };
+
+        let on_exit: ~fn(bool) = |exit| {
+            handle1.take().send(Shutdown);
+            handle2.take().send(Shutdown);
+            rtassert!(exit);
+        };
+
+        let task1 = Cell::new(~Task::new_root(&mut sched1.stack_pool, None, body1));
+
+        let mut task2 = ~Task::new_root(&mut sched2.stack_pool, None, body2);
+        task2.death.on_exit = Some(on_exit);
+        let task2 = Cell::new(task2);
+
+        let sched1 = Cell::new(sched1);
+        let sched2 = Cell::new(sched2);
+
+        let thread1 = do Thread::start {
+            sched1.take().bootstrap(task1.take());
+        };
+        let thread2 = do Thread::start {
+            sched2.take().bootstrap(task2.take());
+        };
+
+        thread1.join();
+        thread2.join();
+    }
+}
+
+#[test]
 fn test_simple_tcp_server_and_client() {
     do run_in_newsched_task {
         let addr = next_test_ip4();
@@ -931,6 +1194,85 @@ fn test_simple_tcp_server_and_client() {
 }
 
 #[test]
+fn test_simple_tcp_server_and_client_on_diff_threads() {
+    use rt::sleeper_list::SleeperList;
+    use rt::work_queue::WorkQueue;
+    use rt::thread::Thread;
+    use rt::task::Task;
+    use rt::sched::{Shutdown};
+    do run_in_bare_thread {
+        let sleepers = SleeperList::new();
+
+        let server_addr = next_test_ip4();
+        let client_addr = server_addr.clone();
+
+        let server_work_queue = WorkQueue::new();
+        let client_work_queue = WorkQueue::new();
+        let queues = ~[server_work_queue.clone(), client_work_queue.clone()];
+
+        let mut server_sched = ~Scheduler::new(~UvEventLoop::new(), server_work_queue,
+                                               queues.clone(), sleepers.clone());
+        let mut client_sched = ~Scheduler::new(~UvEventLoop::new(), client_work_queue,
+                                               queues.clone(), sleepers.clone());
+
+        let server_handle = Cell::new(server_sched.make_handle());
+        let client_handle = Cell::new(client_sched.make_handle());
+
+        let server_on_exit: ~fn(bool) = |exit_status| {
+            server_handle.take().send(Shutdown);
+            rtassert!(exit_status);
+        };
+
+        let client_on_exit: ~fn(bool) = |exit_status| {
+            client_handle.take().send(Shutdown);
+            rtassert!(exit_status);
+        };
+
+        let server_fn: ~fn() = || {
+            let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
+            let mut listener = unsafe { (*io).tcp_bind(server_addr).unwrap() };
+            let mut stream = listener.accept().unwrap();
+            let mut buf = [0, .. 2048];
+            let nread = stream.read(buf).unwrap();
+            assert_eq!(nread, 8);
+            for i in range(0u, nread) {
+                assert_eq!(buf[i], i as u8);
+            }
+        };
+
+        let client_fn: ~fn() = || {
+            let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
+            let mut stream = unsafe { (*io).tcp_connect(client_addr) };
+            while stream.is_err() {
+                stream = unsafe { (*io).tcp_connect(client_addr) };
+            }
+            stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]);
+        };
+
+        let mut server_task = ~Task::new_root(&mut server_sched.stack_pool, None, server_fn);
+        server_task.death.on_exit = Some(server_on_exit);
+        let server_task = Cell::new(server_task);
+
+        let mut client_task = ~Task::new_root(&mut client_sched.stack_pool, None, client_fn);
+        client_task.death.on_exit = Some(client_on_exit);
+        let client_task = Cell::new(client_task);
+
+        let server_sched = Cell::new(server_sched);
+        let client_sched = Cell::new(client_sched);
+
+        let server_thread = do Thread::start {
+            server_sched.take().bootstrap(server_task.take());
+        };
+        let client_thread = do Thread::start {
+            client_sched.take().bootstrap(client_task.take());
+        };
+
+        server_thread.join();
+        client_thread.join();
+    }
+}
+
+#[test]
 fn test_simple_udp_server_and_client() {
     do run_in_newsched_task {
         let server_addr = next_test_ip4();
@@ -1146,19 +1488,13 @@ fn test_udp_many_read() {
     }
 }
 
-fn test_timer_sleep_simple_impl() {
-    unsafe {
-        let io = Local::unsafe_borrow::<IoFactoryObject>();
-        let timer = (*io).timer_init();
-        match timer {
-            Ok(t) => t.sleep(1),
-            Err(_) => assert!(false)
-        }
-    }
-}
 #[test]
 fn test_timer_sleep_simple() {
     do run_in_newsched_task {
-        test_timer_sleep_simple_impl();
+        unsafe {
+            let io = Local::unsafe_borrow::<IoFactoryObject>();
+            let timer = (*io).timer_init();
+            do timer.map_move |mut t| { t.sleep(1) };
+        }
     }
 }
diff --git a/src/libstd/rt/uv/uvll.rs b/src/libstd/rt/uv/uvll.rs
index 65c0cffe5a0..0ea2175336a 100644
--- a/src/libstd/rt/uv/uvll.rs
+++ b/src/libstd/rt/uv/uvll.rs
@@ -172,6 +172,7 @@ fn request_sanity_check() {
     }
 }
 
+// XXX Event loops ignore SIGPIPE by default.
 pub unsafe fn loop_new() -> *c_void {
     #[fixed_stack_segment]; #[inline(never)];
 
@@ -287,7 +288,7 @@ pub unsafe fn get_udp_handle_from_send_req(send_req: *uv_udp_send_t) -> *uv_udp_
     return rust_uv_get_udp_handle_from_send_req(send_req);
 }
 
-pub unsafe fn udp_get_sockname(handle: *uv_udp_t, name: *sockaddr_storage) -> c_int {
+pub unsafe fn udp_getsockname(handle: *uv_udp_t, name: *sockaddr_storage) -> c_int {
     #[fixed_stack_segment]; #[inline(never)];
 
     return rust_uv_udp_getsockname(handle, name);