about summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2013-12-13 11:30:59 -0800
committerAlex Crichton <alex@alexcrichton.com>2013-12-24 19:59:53 -0800
commitafd4e2ad8dc4112b99c8d30996ff0bb5b0516b53 (patch)
tree9e4bd858bbb5e3c0b3526c0976d0ca032217c16a /src
parentf5d9b2ca6d9a360112f06b3044897c22736c52b8 (diff)
downloadrust-afd4e2ad8dc4112b99c8d30996ff0bb5b0516b53.tar.gz
rust-afd4e2ad8dc4112b99c8d30996ff0bb5b0516b53.zip
rustuv: Get all tests passing again after refactor
All tests except for the homing tests are now working again with the
librustuv/libgreen refactoring. The homing-related tests are currently commented
out and now placed in the rustuv::homing module.

I plan on refactoring scheduler pool spawning in order to enable more homing
tests in a future commit.
Diffstat (limited to 'src')
-rw-r--r--src/librustuv/addrinfo.rs7
-rw-r--r--src/librustuv/async.rs13
-rw-r--r--src/librustuv/file.rs12
-rw-r--r--src/librustuv/homing.rs121
-rw-r--r--src/librustuv/idle.rs87
-rw-r--r--src/librustuv/lib.rs12
-rw-r--r--src/librustuv/net.rs334
-rw-r--r--src/librustuv/pipe.rs3
-rw-r--r--src/librustuv/signal.rs3
-rw-r--r--src/librustuv/timer.rs1
-rw-r--r--src/librustuv/uvio.rs1
-rw-r--r--src/libstd/io/mod.rs5
12 files changed, 286 insertions, 313 deletions
diff --git a/src/librustuv/addrinfo.rs b/src/librustuv/addrinfo.rs
index f4c12c6f2a3..f6fad524b5c 100644
--- a/src/librustuv/addrinfo.rs
+++ b/src/librustuv/addrinfo.rs
@@ -186,10 +186,12 @@ pub fn accum_addrinfo(addr: &Addrinfo) -> ~[ai::Info] {
 mod test {
     use std::io::net::ip::{SocketAddr, Ipv4Addr};
     use super::super::local_loop;
+    use super::GetAddrInfoRequest;
 
     #[test]
     fn getaddrinfo_test() {
-        match GetAddrInfoRequest::run(local_loop(), Some("localhost"), None, None) {
+        let loop_ = &mut local_loop().loop_;
+        match GetAddrInfoRequest::run(loop_, Some("localhost"), None, None) {
             Ok(infos) => {
                 let mut found_local = false;
                 let local_addr = &SocketAddr {
@@ -207,9 +209,10 @@ mod test {
 
     #[test]
     fn issue_10663() {
+        let loop_ = &mut local_loop().loop_;
         // Something should happen here, but this certainly shouldn't cause
         // everything to die. The actual outcome we don't care too much about.
-        GetAddrInfoRequest::run(local_loop(), Some("irc.n0v4.com"), None,
+        GetAddrInfoRequest::run(loop_, Some("irc.n0v4.com"), None,
                                 None);
     }
 }
diff --git a/src/librustuv/async.rs b/src/librustuv/async.rs
index 2d770ff6be1..0c353785982 100644
--- a/src/librustuv/async.rs
+++ b/src/librustuv/async.rs
@@ -127,15 +127,15 @@ impl Drop for AsyncWatcher {
 mod test_remote {
     use std::rt::rtio::Callback;
     use std::rt::thread::Thread;
-    use std::rt::tube::Tube;
 
+    use super::AsyncWatcher;
     use super::super::local_loop;
 
     // Make sure that we can fire watchers in remote threads and that they
     // actually trigger what they say they will.
     #[test]
     fn smoke_test() {
-        struct MyCallback(Option<Tube<int>>);
+        struct MyCallback(Option<Chan<int>>);
         impl Callback for MyCallback {
             fn call(&mut self) {
                 // this can get called more than once, but we only want to send
@@ -146,16 +146,17 @@ mod test_remote {
             }
         }
 
-        let mut tube = Tube::new();
-        let cb = ~MyCallback(Some(tube.clone()));
-        let watcher = AsyncWatcher::new(local_loop(), cb as ~Callback);
+        let (port, chan) = Chan::new();
+        let cb = ~MyCallback(Some(chan));
+        let watcher = AsyncWatcher::new(&mut local_loop().loop_,
+                                        cb as ~Callback);
 
         let thread = do Thread::start {
             let mut watcher = watcher;
             watcher.fire();
         };
 
-        assert_eq!(tube.recv(), 1);
+        assert_eq!(port.recv(), 1);
         thread.join();
     }
 }
diff --git a/src/librustuv/file.rs b/src/librustuv/file.rs
index cebf4f199e4..059bf072a1a 100644
--- a/src/librustuv/file.rs
+++ b/src/librustuv/file.rs
@@ -448,7 +448,11 @@ mod test {
     use std::io;
     use std::str;
     use std::vec;
-    use l = super::super::local_loop;
+    use super::FsRequest;
+    use super::super::Loop;
+    use super::super::local_loop;
+
+    fn l() -> &mut Loop { &mut local_loop().loop_ }
 
     #[test]
     fn file_test_full_simple_sync() {
@@ -459,7 +463,7 @@ mod test {
 
         {
             // open/create
-            let result = FsRequest::open(l(), &path_str.to_c_str(),
+            let result = FsRequest::open(local_loop(), &path_str.to_c_str(),
                                          create_flags as int, mode as int);
             assert!(result.is_ok());
             let result = result.unwrap();
@@ -472,7 +476,7 @@ mod test {
 
         {
             // re-open
-            let result = FsRequest::open(l(), &path_str.to_c_str(),
+            let result = FsRequest::open(local_loop(), &path_str.to_c_str(),
                                          read_flags as int, 0);
             assert!(result.is_ok());
             let result = result.unwrap();
@@ -499,7 +503,7 @@ mod test {
         let create_flags = (O_RDWR | O_CREAT) as int;
         let mode = (S_IWUSR | S_IRUSR) as int;
 
-        let result = FsRequest::open(l(), path, create_flags, mode);
+        let result = FsRequest::open(local_loop(), path, create_flags, mode);
         assert!(result.is_ok());
         let file = result.unwrap();
 
diff --git a/src/librustuv/homing.rs b/src/librustuv/homing.rs
index 7a12c4ad06d..d09dfae0b29 100644
--- a/src/librustuv/homing.rs
+++ b/src/librustuv/homing.rs
@@ -142,3 +142,124 @@ impl Drop for HomingMissile {
         self.check("task moved away from the home scheduler");
     }
 }
+
+#[cfg(test)]
+mod test {
+    // On one thread, create a udp socket. Then send that socket to another
+    // thread and destroy the socket on the remote thread. This should make sure
+    // that homing kicks in for the socket to go back home to the original
+    // thread, close itself, and then come back to the last thread.
+    //#[test]
+    //fn test_homing_closes_correctly() {
+    //    let (port, chan) = Chan::new();
+
+    //    do task::spawn_sched(task::SingleThreaded) {
+    //        let listener = UdpWatcher::bind(local_loop(), next_test_ip4()).unwrap();
+    //        chan.send(listener);
+    //    }
+
+    //    do task::spawn_sched(task::SingleThreaded) {
+    //        port.recv();
+    //    }
+    //}
+
+    // This is a bit of a crufty old test, but it has its uses.
+    //#[test]
+    //fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
+    //    use std::cast;
+    //    use std::rt::local::Local;
+    //    use std::rt::rtio::{EventLoop, IoFactory};
+    //    use std::rt::sched::Scheduler;
+    //    use std::rt::sched::{Shutdown, TaskFromFriend};
+    //    use std::rt::sleeper_list::SleeperList;
+    //    use std::rt::task::Task;
+    //    use std::rt::task::UnwindResult;
+    //    use std::rt::thread::Thread;
+    //    use std::rt::deque::BufferPool;
+    //    use std::unstable::run_in_bare_thread;
+    //    use uvio::UvEventLoop;
+
+    //    do run_in_bare_thread {
+    //        let sleepers = SleeperList::new();
+    //        let mut pool = BufferPool::new();
+    //        let (worker1, stealer1) = pool.deque();
+    //        let (worker2, stealer2) = pool.deque();
+    //        let queues = ~[stealer1, stealer2];
+
+    //        let loop1 = ~UvEventLoop::new() as ~EventLoop;
+    //        let mut sched1 = ~Scheduler::new(loop1, worker1, queues.clone(),
+    //                                         sleepers.clone());
+    //        let loop2 = ~UvEventLoop::new() as ~EventLoop;
+    //        let mut sched2 = ~Scheduler::new(loop2, worker2, queues.clone(),
+    //                                         sleepers.clone());
+
+    //        let handle1 = sched1.make_handle();
+    //        let handle2 = sched2.make_handle();
+    //        let tasksFriendHandle = sched2.make_handle();
+
+    //        let on_exit: proc(UnwindResult) = proc(exit_status) {
+    //            let mut handle1 = handle1;
+    //            let mut handle2 = handle2;
+    //            handle1.send(Shutdown);
+    //            handle2.send(Shutdown);
+    //            assert!(exit_status.is_success());
+    //        };
+
+    //        unsafe fn local_io() -> &'static mut IoFactory {
+    //            let mut sched = Local::borrow(None::<Scheduler>);
+    //            let io = sched.get().event_loop.io();
+    //            cast::transmute(io.unwrap())
+    //        }
+
+    //        let test_function: proc() = proc() {
+    //            let io = unsafe { local_io() };
+    //            let addr = next_test_ip4();
+    //            let maybe_socket = io.udp_bind(addr);
+    //            // this socket is bound to this event loop
+    //            assert!(maybe_socket.is_ok());
+
+    //            // block self on sched1
+    //            let scheduler: ~Scheduler = Local::take();
+    //            let mut tasksFriendHandle = Some(tasksFriendHandle);
+    //            scheduler.deschedule_running_task_and_then(|_, task| {
+    //                // unblock task
+    //                task.wake().map(|task| {
+    //                    // send self to sched2
+    //                    tasksFriendHandle.take_unwrap()
+    //                                     .send(TaskFromFriend(task));
+    //                });
+    //                // sched1 should now sleep since it has nothing else to do
+    //            })
+    //            // sched2 will wake up and get the task as we do nothing else,
+    //            // the function ends and the socket goes out of scope sched2
+    //            // will start to run the destructor the destructor will first
+    //            // block the task, set it's home as sched1, then enqueue it
+    //            // sched2 will dequeue the task, see that it has a home, and
+    //            // send it to sched1 sched1 will wake up, exec the close
+    //            // function on the correct loop, and then we're done
+    //        };
+
+    //        let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None,
+    //                                            test_function);
+    //        main_task.death.on_exit = Some(on_exit);
+
+    //        let null_task = ~do Task::new_root(&mut sched2.stack_pool, None) {
+    //            // nothing
+    //        };
+
+    //        let main_task = main_task;
+    //        let sched1 = sched1;
+    //        let thread1 = do Thread::start {
+    //            sched1.bootstrap(main_task);
+    //        };
+
+    //        let sched2 = sched2;
+    //        let thread2 = do Thread::start {
+    //            sched2.bootstrap(null_task);
+    //        };
+
+    //        thread1.join();
+    //        thread2.join();
+    //    }
+    //}
+}
diff --git a/src/librustuv/idle.rs b/src/librustuv/idle.rs
index 2445932c026..44b74d05096 100644
--- a/src/librustuv/idle.rs
+++ b/src/librustuv/idle.rs
@@ -97,71 +97,102 @@ impl Drop for IdleWatcher {
 
 #[cfg(test)]
 mod test {
-    use std::rt::tube::Tube;
-    use std::rt::rtio::{Callback, PausableIdleCallback};
+    use std::cast;
+    use std::cell::RefCell;
+    use std::rc::Rc;
+    use std::rt::rtio::{Callback, PausibleIdleCallback};
+    use std::rt::task::{BlockedTask, Task};
+    use std::rt::local::Local;
+    use super::IdleWatcher;
     use super::super::local_loop;
 
-    struct MyCallback(Tube<int>, int);
+    type Chan = Rc<RefCell<(Option<BlockedTask>, uint)>>;
+
+    struct MyCallback(Rc<RefCell<(Option<BlockedTask>, uint)>>, uint);
     impl Callback for MyCallback {
         fn call(&mut self) {
-            match *self {
-                MyCallback(ref mut tube, val) => tube.send(val)
-            }
+            let task = match *self {
+                MyCallback(ref rc, n) => {
+                    let mut slot = rc.borrow().borrow_mut();
+                    match *slot.get() {
+                        (ref mut task, ref mut val) => {
+                            *val = n;
+                            task.take_unwrap()
+                        }
+                    }
+                }
+            };
+            task.wake().map(|t| t.reawaken(true));
         }
     }
 
+    fn mk(v: uint) -> (~IdleWatcher, Chan) {
+        let rc = Rc::from_send(RefCell::new((None, 0)));
+        let cb = ~MyCallback(rc.clone(), v);
+        let cb = cb as ~Callback:;
+        let cb = unsafe { cast::transmute(cb) };
+        (IdleWatcher::new(&mut local_loop().loop_, cb), rc)
+    }
+
+    fn sleep(chan: &Chan) -> uint {
+        let task: ~Task = Local::take();
+        task.deschedule(1, |task| {
+            let mut slot = chan.borrow().borrow_mut();
+            match *slot.get() {
+                (ref mut slot, _) => {
+                    assert!(slot.is_none());
+                    *slot = Some(task);
+                }
+            }
+            Ok(())
+        });
+
+        let slot = chan.borrow().borrow();
+        match *slot.get() { (_, n) => n }
+    }
+
     #[test]
     fn not_used() {
-        let cb = ~MyCallback(Tube::new(), 1);
-        let _idle = IdleWatcher::new(local_loop(), cb as ~Callback);
+        let (_idle, _chan) = mk(1);
     }
 
     #[test]
     fn smoke_test() {
-        let mut tube = Tube::new();
-        let cb = ~MyCallback(tube.clone(), 1);
-        let mut idle = IdleWatcher::new(local_loop(), cb as ~Callback);
+        let (mut idle, chan) = mk(1);
         idle.resume();
-        tube.recv();
+        assert_eq!(sleep(&chan), 1);
     }
 
     #[test] #[should_fail]
     fn smoke_fail() {
-        let tube = Tube::new();
-        let cb = ~MyCallback(tube.clone(), 1);
-        let mut idle = IdleWatcher::new(local_loop(), cb as ~Callback);
+        let (mut idle, _chan) = mk(1);
         idle.resume();
         fail!();
     }
 
     #[test]
     fn fun_combinations_of_methods() {
-        let mut tube = Tube::new();
-        let cb = ~MyCallback(tube.clone(), 1);
-        let mut idle = IdleWatcher::new(local_loop(), cb as ~Callback);
+        let (mut idle, chan) = mk(1);
         idle.resume();
-        tube.recv();
+        assert_eq!(sleep(&chan), 1);
         idle.pause();
         idle.resume();
         idle.resume();
-        tube.recv();
+        assert_eq!(sleep(&chan), 1);
         idle.pause();
         idle.pause();
         idle.resume();
-        tube.recv();
+        assert_eq!(sleep(&chan), 1);
     }
 
     #[test]
     fn pause_pauses() {
-        let mut tube = Tube::new();
-        let cb = ~MyCallback(tube.clone(), 1);
-        let mut idle1 = IdleWatcher::new(local_loop(), cb as ~Callback);
-        let cb = ~MyCallback(tube.clone(), 2);
-        let mut idle2 = IdleWatcher::new(local_loop(), cb as ~Callback);
+        let (mut idle1, chan1) = mk(1);
+        let (mut idle2, chan2) = mk(2);
         idle2.resume();
-        assert_eq!(tube.recv(), 2);
+        assert_eq!(sleep(&chan2), 2);
         idle2.pause();
         idle1.resume();
-        assert_eq!(tube.recv(), 1);
+        assert_eq!(sleep(&chan1), 1);
     }
 }
diff --git a/src/librustuv/lib.rs b/src/librustuv/lib.rs
index 2715f0bd02a..49d695ea3fb 100644
--- a/src/librustuv/lib.rs
+++ b/src/librustuv/lib.rs
@@ -43,6 +43,8 @@ via `close` and `delete` methods.
 
 #[feature(macro_rules)];
 
+#[cfg(test)] extern mod green;
+
 use std::cast;
 use std::io;
 use std::io::IoError;
@@ -392,15 +394,17 @@ pub fn slice_to_uv_buf(v: &[u8]) -> Buf {
     uvll::uv_buf_t { base: data, len: v.len() as uvll::uv_buf_len_t }
 }
 
+// This function is full of lies!
 #[cfg(test)]
-fn local_loop() -> &'static mut Loop {
+fn local_loop() -> &'static mut uvio::UvIoFactory {
     unsafe {
         cast::transmute({
-            let mut sched = Local::borrow(None::<Scheduler>);
+            let mut task = Local::borrow(None::<Task>);
+            let mut io = task.get().local_io().unwrap();
             let (_vtable, uvio): (uint, &'static mut uvio::UvIoFactory) =
-                cast::transmute(sched.get().event_loop.io().unwrap());
+                cast::transmute(io.get());
             uvio
-        }.uv_loop())
+        })
     }
 }
 
diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs
index caf9ee0aac6..85e9202c1fa 100644
--- a/src/librustuv/net.rs
+++ b/src/librustuv/net.rs
@@ -86,21 +86,19 @@ pub fn sockaddr_to_socket_addr(addr: *sockaddr) -> SocketAddr {
     }
 }
 
-#[cfg(test)]
 #[test]
 fn test_ip4_conversion() {
-    use std::rt;
-    let ip4 = rt::test::next_test_ip4();
+    use std::io::net::ip::{SocketAddr, Ipv4Addr};
+    let ip4 = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 4824 };
     socket_addr_as_sockaddr(ip4, |addr| {
         assert_eq!(ip4, sockaddr_to_socket_addr(addr));
     })
 }
 
-#[cfg(test)]
 #[test]
 fn test_ip6_conversion() {
-    use std::rt;
-    let ip6 = rt::test::next_test_ip6();
+    use std::io::net::ip::{SocketAddr, Ipv6Addr};
+    let ip6 = SocketAddr { ip: Ipv6Addr(0, 0, 0, 0, 0, 0, 0, 1), port: 4824 };
     socket_addr_as_sockaddr(ip6, |addr| {
         assert_eq!(ip6, sockaddr_to_socket_addr(addr));
     })
@@ -634,16 +632,13 @@ impl Drop for UdpWatcher {
     }
 }
 
-////////////////////////////////////////////////////////////////////////////////
-/// UV request support
-////////////////////////////////////////////////////////////////////////////////
-
 #[cfg(test)]
 mod test {
     use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
                         RtioUdpSocket};
-    use std::task;
+    use std::io::test::{next_test_ip4, next_test_ip6};
 
+    use super::{UdpWatcher, TcpWatcher, TcpListener};
     use super::super::local_loop;
 
     #[test]
@@ -834,20 +829,18 @@ mod test {
             }
         }
 
-        do spawn {
-            port.recv();
-            let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap();
-            let mut buf = [0, .. 2048];
-            let mut total_bytes_read = 0;
-            while total_bytes_read < MAX {
-                let nread = stream.read(buf).unwrap();
-                total_bytes_read += nread;
-                for i in range(0u, nread) {
-                    assert_eq!(buf[i], 1);
-                }
+        port.recv();
+        let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap();
+        let mut buf = [0, .. 2048];
+        let mut total_bytes_read = 0;
+        while total_bytes_read < MAX {
+            let nread = stream.read(buf).unwrap();
+            total_bytes_read += nread;
+            for i in range(0u, nread) {
+                assert_eq!(buf[i], 1);
             }
-            uvdebug!("read {} bytes total", total_bytes_read);
         }
+        uvdebug!("read {} bytes total", total_bytes_read);
     }
 
     #[test]
@@ -913,65 +906,35 @@ mod test {
             assert!(total_bytes_sent >= MAX);
         }
 
-        do spawn {
-            let l = local_loop();
-            let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
-            let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
-            let (port, chan) = (p2, c1);
-            port.recv();
-            chan.send(());
-            let mut total_bytes_recv = 0;
-            let mut buf = [0, .. 2048];
-            while total_bytes_recv < MAX {
-                // ask for more
-                assert!(client_out.sendto([1], server_in_addr).is_ok());
-                // wait for data
-                let res = client_in.recvfrom(buf);
-                assert!(res.is_ok());
-                let (nread, src) = res.unwrap();
-                assert_eq!(src, server_out_addr);
-                total_bytes_recv += nread;
-                for i in range(0u, nread) {
-                    assert_eq!(buf[i], 1);
-                }
+        let l = local_loop();
+        let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
+        let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
+        let (port, chan) = (p2, c1);
+        port.recv();
+        chan.send(());
+        let mut total_bytes_recv = 0;
+        let mut buf = [0, .. 2048];
+        while total_bytes_recv < MAX {
+            // ask for more
+            assert!(client_out.sendto([1], server_in_addr).is_ok());
+            // wait for data
+            let res = client_in.recvfrom(buf);
+            assert!(res.is_ok());
+            let (nread, src) = res.unwrap();
+            assert_eq!(src, server_out_addr);
+            total_bytes_recv += nread;
+            for i in range(0u, nread) {
+                assert_eq!(buf[i], 1);
             }
-            // tell the server we're done
-            assert!(client_out.sendto([0], server_in_addr).is_ok());
         }
+        // tell the server we're done
+        assert!(client_out.sendto([0], server_in_addr).is_ok());
     }
 
     #[test]
     fn test_read_and_block() {
         let addr = next_test_ip4();
-        let (port, chan) = Chan::new();
-
-        do spawn {
-            let listener = TcpListener::bind(local_loop(), addr).unwrap();
-            let mut acceptor = listener.listen().unwrap();
-            let (port2, chan2) = Chan::new();
-            chan.send(port2);
-            let mut stream = acceptor.accept().unwrap();
-            let mut buf = [0, .. 2048];
-
-            let expected = 32;
-            let mut current = 0;
-            let mut reads = 0;
-
-            while current < expected {
-                let nread = stream.read(buf).unwrap();
-                for i in range(0u, nread) {
-                    let val = buf[i] as uint;
-                    assert_eq!(val, current % 8);
-                    current += 1;
-                }
-                reads += 1;
-
-                chan2.send(());
-            }
-
-            // Make sure we had multiple reads
-            assert!(reads > 1);
-        }
+        let (port, chan) = Chan::<Port<()>>::new();
 
         do spawn {
             let port2 = port.recv();
@@ -983,13 +946,39 @@ mod test {
             stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
             port2.recv();
         }
+
+        let listener = TcpListener::bind(local_loop(), addr).unwrap();
+        let mut acceptor = listener.listen().unwrap();
+        let (port2, chan2) = Chan::new();
+        chan.send(port2);
+        let mut stream = acceptor.accept().unwrap();
+        let mut buf = [0, .. 2048];
+
+        let expected = 32;
+        let mut current = 0;
+        let mut reads = 0;
+
+        while current < expected {
+            let nread = stream.read(buf).unwrap();
+            for i in range(0u, nread) {
+                let val = buf[i] as uint;
+                assert_eq!(val, current % 8);
+                current += 1;
+            }
+            reads += 1;
+
+            chan2.try_send(());
+        }
+
+        // Make sure we had multiple reads
+        assert!(reads > 1);
     }
 
     #[test]
     fn test_simple_tcp_server_and_client_on_diff_threads() {
         let addr = next_test_ip4();
 
-        do task::spawn_sched(task::SingleThreaded) {
+        do spawn {
             let listener = TcpListener::bind(local_loop(), addr).unwrap();
             let mut acceptor = listener.listen().unwrap();
             let mut stream = acceptor.accept().unwrap();
@@ -1001,131 +990,11 @@ mod test {
             }
         }
 
-        do task::spawn_sched(task::SingleThreaded) {
-            let mut stream = TcpWatcher::connect(local_loop(), addr);
-            while stream.is_err() {
-                stream = TcpWatcher::connect(local_loop(), addr);
-            }
-            stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]);
-        }
-    }
-
-    // On one thread, create a udp socket. Then send that socket to another
-    // thread and destroy the socket on the remote thread. This should make sure
-    // that homing kicks in for the socket to go back home to the original
-    // thread, close itself, and then come back to the last thread.
-    #[test]
-    fn test_homing_closes_correctly() {
-        let (port, chan) = Chan::new();
-
-        do task::spawn_sched(task::SingleThreaded) {
-            let listener = UdpWatcher::bind(local_loop(), next_test_ip4()).unwrap();
-            chan.send(listener);
-        }
-
-        do task::spawn_sched(task::SingleThreaded) {
-            port.recv();
-        }
-    }
-
-    // This is a bit of a crufty old test, but it has its uses.
-    #[test]
-    fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
-        use std::cast;
-        use std::rt::local::Local;
-        use std::rt::rtio::{EventLoop, IoFactory};
-        use std::rt::sched::Scheduler;
-        use std::rt::sched::{Shutdown, TaskFromFriend};
-        use std::rt::sleeper_list::SleeperList;
-        use std::rt::task::Task;
-        use std::rt::thread::Thread;
-        use std::rt::deque::BufferPool;
-        use std::task::TaskResult;
-        use std::unstable::run_in_bare_thread;
-        use uvio::UvEventLoop;
-
-        do run_in_bare_thread {
-            let sleepers = SleeperList::new();
-            let mut pool = BufferPool::new();
-            let (worker1, stealer1) = pool.deque();
-            let (worker2, stealer2) = pool.deque();
-            let queues = ~[stealer1, stealer2];
-
-            let loop1 = ~UvEventLoop::new() as ~EventLoop;
-            let mut sched1 = ~Scheduler::new(loop1, worker1, queues.clone(),
-                                             sleepers.clone());
-            let loop2 = ~UvEventLoop::new() as ~EventLoop;
-            let mut sched2 = ~Scheduler::new(loop2, worker2, queues.clone(),
-                                             sleepers.clone());
-
-            let handle1 = sched1.make_handle();
-            let handle2 = sched2.make_handle();
-            let tasksFriendHandle = sched2.make_handle();
-
-            let on_exit: proc(TaskResult) = proc(exit_status) {
-                let mut handle1 = handle1;
-                let mut handle2 = handle2;
-                handle1.send(Shutdown);
-                handle2.send(Shutdown);
-                assert!(exit_status.is_ok());
-            };
-
-            unsafe fn local_io() -> &'static mut IoFactory {
-                let mut sched = Local::borrow(None::<Scheduler>);
-                let io = sched.get().event_loop.io();
-                cast::transmute(io.unwrap())
-            }
-
-            let test_function: proc() = proc() {
-                let io = unsafe { local_io() };
-                let addr = next_test_ip4();
-                let maybe_socket = io.udp_bind(addr);
-                // this socket is bound to this event loop
-                assert!(maybe_socket.is_ok());
-
-                // block self on sched1
-                let scheduler: ~Scheduler = Local::take();
-                let mut tasksFriendHandle = Some(tasksFriendHandle);
-                scheduler.deschedule_running_task_and_then(|_, task| {
-                    // unblock task
-                    task.wake().map(|task| {
-                        // send self to sched2
-                        tasksFriendHandle.take_unwrap()
-                                         .send(TaskFromFriend(task));
-                    });
-                    // sched1 should now sleep since it has nothing else to do
-                })
-                // sched2 will wake up and get the task as we do nothing else,
-                // the function ends and the socket goes out of scope sched2
-                // will start to run the destructor the destructor will first
-                // block the task, set it's home as sched1, then enqueue it
-                // sched2 will dequeue the task, see that it has a home, and
-                // send it to sched1 sched1 will wake up, exec the close
-                // function on the correct loop, and then we're done
-            };
-
-            let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None,
-                                                test_function);
-            main_task.death.on_exit = Some(on_exit);
-
-            let null_task = ~do Task::new_root(&mut sched2.stack_pool, None) {
-                // nothing
-            };
-
-            let main_task = main_task;
-            let sched1 = sched1;
-            let thread1 = do Thread::start {
-                sched1.bootstrap(main_task);
-            };
-
-            let sched2 = sched2;
-            let thread2 = do Thread::start {
-                sched2.bootstrap(null_task);
-            };
-
-            thread1.join();
-            thread2.join();
+        let mut stream = TcpWatcher::connect(local_loop(), addr);
+        while stream.is_err() {
+            stream = TcpWatcher::connect(local_loop(), addr);
         }
+        stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]);
     }
 
     #[should_fail] #[test]
@@ -1167,7 +1036,7 @@ mod test {
         // force the handle to be created on a different scheduler, failure in
         // the original task will force a homing operation back to this
         // scheduler.
-        do task::spawn_sched(task::SingleThreaded) {
+        do spawn {
             let w = UdpWatcher::bind(local_loop(), addr).unwrap();
             chan.send(w);
         }
@@ -1175,67 +1044,4 @@ mod test {
         let _w = port.recv();
         fail!();
     }
-
-    #[should_fail]
-    #[test]
-    #[ignore(reason = "linked failure")]
-    fn linked_failure1() {
-        let (port, chan) = Chan::new();
-        let addr = next_test_ip4();
-
-        do spawn {
-            let w = TcpListener::bind(local_loop(), addr).unwrap();
-            let mut w = w.listen().unwrap();
-            chan.send(());
-            w.accept();
-        }
-
-        port.recv();
-        fail!();
-    }
-
-    #[should_fail]
-    #[test]
-    #[ignore(reason = "linked failure")]
-    fn linked_failure2() {
-        let (port, chan) = Chan::new();
-        let addr = next_test_ip4();
-
-        do spawn {
-            let w = TcpListener::bind(local_loop(), addr).unwrap();
-            let mut w = w.listen().unwrap();
-            chan.send(());
-            let mut buf = [0];
-            w.accept().unwrap().read(buf);
-        }
-
-        port.recv();
-        let _w = TcpWatcher::connect(local_loop(), addr).unwrap();
-
-        fail!();
-    }
-
-    #[should_fail]
-    #[test]
-    #[ignore(reason = "linked failure")]
-    fn linked_failure3() {
-        let (port, chan) = Chan::new();
-        let addr = next_test_ip4();
-
-        do spawn {
-            let chan = chan;
-            let w = TcpListener::bind(local_loop(), addr).unwrap();
-            let mut w = w.listen().unwrap();
-            chan.send(());
-            let mut conn = w.accept().unwrap();
-            chan.send(());
-            let buf = [0, ..65536];
-            conn.write(buf);
-        }
-
-        port.recv();
-        let _w = TcpWatcher::connect(local_loop(), addr).unwrap();
-        port.recv();
-        fail!();
-    }
 }
diff --git a/src/librustuv/pipe.rs b/src/librustuv/pipe.rs
index 6975ef26bd7..0bc198a4a3f 100644
--- a/src/librustuv/pipe.rs
+++ b/src/librustuv/pipe.rs
@@ -235,8 +235,9 @@ impl HomingIO for PipeAcceptor {
 #[cfg(test)]
 mod tests {
     use std::rt::rtio::{RtioUnixListener, RtioUnixAcceptor, RtioPipe};
-    use std::rt::test::next_test_unix;
+    use std::io::test::next_test_unix;
 
+    use super::{PipeWatcher, PipeListener};
     use super::super::local_loop;
 
     #[test]
diff --git a/src/librustuv/signal.rs b/src/librustuv/signal.rs
index 27dbc0fe3bb..b53acd4ebd6 100644
--- a/src/librustuv/signal.rs
+++ b/src/librustuv/signal.rs
@@ -52,7 +52,7 @@ impl SignalWatcher {
 extern fn signal_cb(handle: *uvll::uv_signal_t, signum: c_int) {
     let s: &mut SignalWatcher = unsafe { UvHandle::from_uv_handle(&handle) };
     assert_eq!(signum as int, s.signal as int);
-    s.channel.send_deferred(s.signal);
+    s.channel.try_send_deferred(s.signal);
 }
 
 impl HomingIO for SignalWatcher {
@@ -76,6 +76,7 @@ impl Drop for SignalWatcher {
 mod test {
     use super::super::local_loop;
     use std::io::signal;
+    use super::SignalWatcher;
 
     #[test]
     fn closing_channel_during_drop_doesnt_kill_everything() {
diff --git a/src/librustuv/timer.rs b/src/librustuv/timer.rs
index 1e70c5c55e0..d3a190df8be 100644
--- a/src/librustuv/timer.rs
+++ b/src/librustuv/timer.rs
@@ -179,6 +179,7 @@ impl Drop for TimerWatcher {
 mod test {
     use std::rt::rtio::RtioTimer;
     use super::super::local_loop;
+    use super::TimerWatcher;
 
     #[test]
     fn oneshot() {
diff --git a/src/librustuv/uvio.rs b/src/librustuv/uvio.rs
index 52e0b5ed77b..57bb0cfdc7a 100644
--- a/src/librustuv/uvio.rs
+++ b/src/librustuv/uvio.rs
@@ -102,6 +102,7 @@ pub extern "C" fn new_loop() -> ~rtio::EventLoop {
 
 #[test]
 fn test_callback_run_once() {
+    use std::rt::rtio::EventLoop;
     do run_in_bare_thread {
         let mut event_loop = UvEventLoop::new();
         let mut count = 0;
diff --git a/src/libstd/io/mod.rs b/src/libstd/io/mod.rs
index 8481de73c7f..2d52986294d 100644
--- a/src/libstd/io/mod.rs
+++ b/src/libstd/io/mod.rs
@@ -313,9 +313,8 @@ pub use self::net::udp::UdpStream;
 pub use self::pipe::PipeStream;
 pub use self::process::Process;
 
-/// Testing helpers
-#[cfg(test)]
-mod test;
+/// Various utility functions useful for writing I/O tests
+pub mod test;
 
 /// Synchronous, non-blocking filesystem operations.
 pub mod fs;