diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2013-12-13 11:30:59 -0800 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2013-12-24 19:59:53 -0800 |
| commit | afd4e2ad8dc4112b99c8d30996ff0bb5b0516b53 (patch) | |
| tree | 9e4bd858bbb5e3c0b3526c0976d0ca032217c16a /src | |
| parent | f5d9b2ca6d9a360112f06b3044897c22736c52b8 (diff) | |
| download | rust-afd4e2ad8dc4112b99c8d30996ff0bb5b0516b53.tar.gz rust-afd4e2ad8dc4112b99c8d30996ff0bb5b0516b53.zip | |
rustuv: Get all tests passing again after refactor
All tests except for the homing tests are now working again with the librustuv/libgreen refactoring. The homing-related tests are currently commented out and now placed in the rustuv::homing module. I plan on refactoring scheduler pool spawning in order to enable more homing tests in a future commit.
Diffstat (limited to 'src')
| -rw-r--r-- | src/librustuv/addrinfo.rs | 7 | ||||
| -rw-r--r-- | src/librustuv/async.rs | 13 | ||||
| -rw-r--r-- | src/librustuv/file.rs | 12 | ||||
| -rw-r--r-- | src/librustuv/homing.rs | 121 | ||||
| -rw-r--r-- | src/librustuv/idle.rs | 87 | ||||
| -rw-r--r-- | src/librustuv/lib.rs | 12 | ||||
| -rw-r--r-- | src/librustuv/net.rs | 334 | ||||
| -rw-r--r-- | src/librustuv/pipe.rs | 3 | ||||
| -rw-r--r-- | src/librustuv/signal.rs | 3 | ||||
| -rw-r--r-- | src/librustuv/timer.rs | 1 | ||||
| -rw-r--r-- | src/librustuv/uvio.rs | 1 | ||||
| -rw-r--r-- | src/libstd/io/mod.rs | 5 |
12 files changed, 286 insertions, 313 deletions
diff --git a/src/librustuv/addrinfo.rs b/src/librustuv/addrinfo.rs index f4c12c6f2a3..f6fad524b5c 100644 --- a/src/librustuv/addrinfo.rs +++ b/src/librustuv/addrinfo.rs @@ -186,10 +186,12 @@ pub fn accum_addrinfo(addr: &Addrinfo) -> ~[ai::Info] { mod test { use std::io::net::ip::{SocketAddr, Ipv4Addr}; use super::super::local_loop; + use super::GetAddrInfoRequest; #[test] fn getaddrinfo_test() { - match GetAddrInfoRequest::run(local_loop(), Some("localhost"), None, None) { + let loop_ = &mut local_loop().loop_; + match GetAddrInfoRequest::run(loop_, Some("localhost"), None, None) { Ok(infos) => { let mut found_local = false; let local_addr = &SocketAddr { @@ -207,9 +209,10 @@ mod test { #[test] fn issue_10663() { + let loop_ = &mut local_loop().loop_; // Something should happen here, but this certainly shouldn't cause // everything to die. The actual outcome we don't care too much about. - GetAddrInfoRequest::run(local_loop(), Some("irc.n0v4.com"), None, + GetAddrInfoRequest::run(loop_, Some("irc.n0v4.com"), None, None); } } diff --git a/src/librustuv/async.rs b/src/librustuv/async.rs index 2d770ff6be1..0c353785982 100644 --- a/src/librustuv/async.rs +++ b/src/librustuv/async.rs @@ -127,15 +127,15 @@ impl Drop for AsyncWatcher { mod test_remote { use std::rt::rtio::Callback; use std::rt::thread::Thread; - use std::rt::tube::Tube; + use super::AsyncWatcher; use super::super::local_loop; // Make sure that we can fire watchers in remote threads and that they // actually trigger what they say they will. #[test] fn smoke_test() { - struct MyCallback(Option<Tube<int>>); + struct MyCallback(Option<Chan<int>>); impl Callback for MyCallback { fn call(&mut self) { // this can get called more than once, but we only want to send @@ -146,16 +146,17 @@ mod test_remote { } } - let mut tube = Tube::new(); - let cb = ~MyCallback(Some(tube.clone())); - let watcher = AsyncWatcher::new(local_loop(), cb as ~Callback); + let (port, chan) = Chan::new(); + let cb = ~MyCallback(Some(chan)); + let watcher = AsyncWatcher::new(&mut local_loop().loop_, + cb as ~Callback); let thread = do Thread::start { let mut watcher = watcher; watcher.fire(); }; - assert_eq!(tube.recv(), 1); + assert_eq!(port.recv(), 1); thread.join(); } } diff --git a/src/librustuv/file.rs b/src/librustuv/file.rs index cebf4f199e4..059bf072a1a 100644 --- a/src/librustuv/file.rs +++ b/src/librustuv/file.rs @@ -448,7 +448,11 @@ mod test { use std::io; use std::str; use std::vec; - use l = super::super::local_loop; + use super::FsRequest; + use super::super::Loop; + use super::super::local_loop; + + fn l() -> &mut Loop { &mut local_loop().loop_ } #[test] fn file_test_full_simple_sync() { @@ -459,7 +463,7 @@ mod test { { // open/create - let result = FsRequest::open(l(), &path_str.to_c_str(), + let result = FsRequest::open(local_loop(), &path_str.to_c_str(), create_flags as int, mode as int); assert!(result.is_ok()); let result = result.unwrap(); @@ -472,7 +476,7 @@ mod test { { // re-open - let result = FsRequest::open(l(), &path_str.to_c_str(), + let result = FsRequest::open(local_loop(), &path_str.to_c_str(), read_flags as int, 0); assert!(result.is_ok()); let result = result.unwrap(); @@ -499,7 +503,7 @@ mod test { let create_flags = (O_RDWR | O_CREAT) as int; let mode = (S_IWUSR | S_IRUSR) as int; - let result = FsRequest::open(l(), path, create_flags, mode); + let result = FsRequest::open(local_loop(), path, create_flags, mode); assert!(result.is_ok()); let file = result.unwrap(); diff --git a/src/librustuv/homing.rs b/src/librustuv/homing.rs index 7a12c4ad06d..d09dfae0b29 100644 --- a/src/librustuv/homing.rs +++ b/src/librustuv/homing.rs @@ -142,3 +142,124 @@ impl Drop for HomingMissile { self.check("task moved away from the home scheduler"); } } + +#[cfg(test)] +mod test { + // On one thread, create a udp socket. Then send that socket to another + // thread and destroy the socket on the remote thread. This should make sure + // that homing kicks in for the socket to go back home to the original + // thread, close itself, and then come back to the last thread. + //#[test] + //fn test_homing_closes_correctly() { + // let (port, chan) = Chan::new(); + + // do task::spawn_sched(task::SingleThreaded) { + // let listener = UdpWatcher::bind(local_loop(), next_test_ip4()).unwrap(); + // chan.send(listener); + // } + + // do task::spawn_sched(task::SingleThreaded) { + // port.recv(); + // } + //} + + // This is a bit of a crufty old test, but it has its uses. + //#[test] + //fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() { + // use std::cast; + // use std::rt::local::Local; + // use std::rt::rtio::{EventLoop, IoFactory}; + // use std::rt::sched::Scheduler; + // use std::rt::sched::{Shutdown, TaskFromFriend}; + // use std::rt::sleeper_list::SleeperList; + // use std::rt::task::Task; + // use std::rt::task::UnwindResult; + // use std::rt::thread::Thread; + // use std::rt::deque::BufferPool; + // use std::unstable::run_in_bare_thread; + // use uvio::UvEventLoop; + + // do run_in_bare_thread { + // let sleepers = SleeperList::new(); + // let mut pool = BufferPool::new(); + // let (worker1, stealer1) = pool.deque(); + // let (worker2, stealer2) = pool.deque(); + // let queues = ~[stealer1, stealer2]; + + // let loop1 = ~UvEventLoop::new() as ~EventLoop; + // let mut sched1 = ~Scheduler::new(loop1, worker1, queues.clone(), + // sleepers.clone()); + // let loop2 = ~UvEventLoop::new() as ~EventLoop; + // let mut sched2 = ~Scheduler::new(loop2, worker2, queues.clone(), + // sleepers.clone()); + + // let handle1 = sched1.make_handle(); + // let handle2 = sched2.make_handle(); + // let tasksFriendHandle = sched2.make_handle(); + + // let on_exit: proc(UnwindResult) = proc(exit_status) { + // let mut handle1 = handle1; + // let mut handle2 = handle2; + // handle1.send(Shutdown); + // handle2.send(Shutdown); + // assert!(exit_status.is_success()); + // }; + + // unsafe fn local_io() -> &'static mut IoFactory { + // let mut sched = Local::borrow(None::<Scheduler>); + // let io = sched.get().event_loop.io(); + // cast::transmute(io.unwrap()) + // } + + // let test_function: proc() = proc() { + // let io = unsafe { local_io() }; + // let addr = next_test_ip4(); + // let maybe_socket = io.udp_bind(addr); + // // this socket is bound to this event loop + // assert!(maybe_socket.is_ok()); + + // // block self on sched1 + // let scheduler: ~Scheduler = Local::take(); + // let mut tasksFriendHandle = Some(tasksFriendHandle); + // scheduler.deschedule_running_task_and_then(|_, task| { + // // unblock task + // task.wake().map(|task| { + // // send self to sched2 + // tasksFriendHandle.take_unwrap() + // .send(TaskFromFriend(task)); + // }); + // // sched1 should now sleep since it has nothing else to do + // }) + // // sched2 will wake up and get the task as we do nothing else, + // // the function ends and the socket goes out of scope sched2 + // // will start to run the destructor the destructor will first + // // block the task, set it's home as sched1, then enqueue it + // // sched2 will dequeue the task, see that it has a home, and + // // send it to sched1 sched1 will wake up, exec the close + // // function on the correct loop, and then we're done + // }; + + // let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None, + // test_function); + // main_task.death.on_exit = Some(on_exit); + + // let null_task = ~do Task::new_root(&mut sched2.stack_pool, None) { + // // nothing + // }; + + // let main_task = main_task; + // let sched1 = sched1; + // let thread1 = do Thread::start { + // sched1.bootstrap(main_task); + // }; + + // let sched2 = sched2; + // let thread2 = do Thread::start { + // sched2.bootstrap(null_task); + // }; + + // thread1.join(); + // thread2.join(); + // } + //} +} diff --git a/src/librustuv/idle.rs b/src/librustuv/idle.rs index 2445932c026..44b74d05096 100644 --- a/src/librustuv/idle.rs +++ b/src/librustuv/idle.rs @@ -97,71 +97,102 @@ impl Drop for IdleWatcher { #[cfg(test)] mod test { - use std::rt::tube::Tube; - use std::rt::rtio::{Callback, PausableIdleCallback}; + use std::cast; + use std::cell::RefCell; + use std::rc::Rc; + use std::rt::rtio::{Callback, PausibleIdleCallback}; + use std::rt::task::{BlockedTask, Task}; + use std::rt::local::Local; + use super::IdleWatcher; use super::super::local_loop; - struct MyCallback(Tube<int>, int); + type Chan = Rc<RefCell<(Option<BlockedTask>, uint)>>; + + struct MyCallback(Rc<RefCell<(Option<BlockedTask>, uint)>>, uint); impl Callback for MyCallback { fn call(&mut self) { - match *self { - MyCallback(ref mut tube, val) => tube.send(val) - } + let task = match *self { + MyCallback(ref rc, n) => { + let mut slot = rc.borrow().borrow_mut(); + match *slot.get() { + (ref mut task, ref mut val) => { + *val = n; + task.take_unwrap() + } + } + } + }; + task.wake().map(|t| t.reawaken(true)); } } + fn mk(v: uint) -> (~IdleWatcher, Chan) { + let rc = Rc::from_send(RefCell::new((None, 0))); + let cb = ~MyCallback(rc.clone(), v); + let cb = cb as ~Callback:; + let cb = unsafe { cast::transmute(cb) }; + (IdleWatcher::new(&mut local_loop().loop_, cb), rc) + } + + fn sleep(chan: &Chan) -> uint { + let task: ~Task = Local::take(); + task.deschedule(1, |task| { + let mut slot = chan.borrow().borrow_mut(); + match *slot.get() { + (ref mut slot, _) => { + assert!(slot.is_none()); + *slot = Some(task); + } + } + Ok(()) + }); + + let slot = chan.borrow().borrow(); + match *slot.get() { (_, n) => n } + } + #[test] fn not_used() { - let cb = ~MyCallback(Tube::new(), 1); - let _idle = IdleWatcher::new(local_loop(), cb as ~Callback); + let (_idle, _chan) = mk(1); } #[test] fn smoke_test() { - let mut tube = Tube::new(); - let cb = ~MyCallback(tube.clone(), 1); - let mut idle = IdleWatcher::new(local_loop(), cb as ~Callback); + let (mut idle, chan) = mk(1); idle.resume(); - tube.recv(); + assert_eq!(sleep(&chan), 1); } #[test] #[should_fail] fn smoke_fail() { - let tube = Tube::new(); - let cb = ~MyCallback(tube.clone(), 1); - let mut idle = IdleWatcher::new(local_loop(), cb as ~Callback); + let (mut idle, _chan) = mk(1); idle.resume(); fail!(); } #[test] fn fun_combinations_of_methods() { - let mut tube = Tube::new(); - let cb = ~MyCallback(tube.clone(), 1); - let mut idle = IdleWatcher::new(local_loop(), cb as ~Callback); + let (mut idle, chan) = mk(1); idle.resume(); - tube.recv(); + assert_eq!(sleep(&chan), 1); idle.pause(); idle.resume(); idle.resume(); - tube.recv(); + assert_eq!(sleep(&chan), 1); idle.pause(); idle.pause(); idle.resume(); - tube.recv(); + assert_eq!(sleep(&chan), 1); } #[test] fn pause_pauses() { - let mut tube = Tube::new(); - let cb = ~MyCallback(tube.clone(), 1); - let mut idle1 = IdleWatcher::new(local_loop(), cb as ~Callback); - let cb = ~MyCallback(tube.clone(), 2); - let mut idle2 = IdleWatcher::new(local_loop(), cb as ~Callback); + let (mut idle1, chan1) = mk(1); + let (mut idle2, chan2) = mk(2); idle2.resume(); - assert_eq!(tube.recv(), 2); + assert_eq!(sleep(&chan2), 2); idle2.pause(); idle1.resume(); - assert_eq!(tube.recv(), 1); + assert_eq!(sleep(&chan1), 1); } } diff --git a/src/librustuv/lib.rs b/src/librustuv/lib.rs index 2715f0bd02a..49d695ea3fb 100644 --- a/src/librustuv/lib.rs +++ b/src/librustuv/lib.rs @@ -43,6 +43,8 @@ via `close` and `delete` methods. #[feature(macro_rules)]; +#[cfg(test)] extern mod green; + use std::cast; use std::io; use std::io::IoError; @@ -392,15 +394,17 @@ pub fn slice_to_uv_buf(v: &[u8]) -> Buf { uvll::uv_buf_t { base: data, len: v.len() as uvll::uv_buf_len_t } } +// This function is full of lies! #[cfg(test)] -fn local_loop() -> &'static mut Loop { +fn local_loop() -> &'static mut uvio::UvIoFactory { unsafe { cast::transmute({ - let mut sched = Local::borrow(None::<Scheduler>); + let mut task = Local::borrow(None::<Task>); + let mut io = task.get().local_io().unwrap(); let (_vtable, uvio): (uint, &'static mut uvio::UvIoFactory) = - cast::transmute(sched.get().event_loop.io().unwrap()); + cast::transmute(io.get()); uvio - }.uv_loop()) + }) } } diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs index caf9ee0aac6..85e9202c1fa 100644 --- a/src/librustuv/net.rs +++ b/src/librustuv/net.rs @@ -86,21 +86,19 @@ pub fn sockaddr_to_socket_addr(addr: *sockaddr) -> SocketAddr { } } -#[cfg(test)] #[test] fn test_ip4_conversion() { - use std::rt; - let ip4 = rt::test::next_test_ip4(); + use std::io::net::ip::{SocketAddr, Ipv4Addr}; + let ip4 = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 4824 }; socket_addr_as_sockaddr(ip4, |addr| { assert_eq!(ip4, sockaddr_to_socket_addr(addr)); }) } -#[cfg(test)] #[test] fn test_ip6_conversion() { - use std::rt; - let ip6 = rt::test::next_test_ip6(); + use std::io::net::ip::{SocketAddr, Ipv6Addr}; + let ip6 = SocketAddr { ip: Ipv6Addr(0, 0, 0, 0, 0, 0, 0, 1), port: 4824 }; socket_addr_as_sockaddr(ip6, |addr| { assert_eq!(ip6, sockaddr_to_socket_addr(addr)); }) @@ -634,16 +632,13 @@ impl Drop for UdpWatcher { } } -//////////////////////////////////////////////////////////////////////////////// -/// UV request support -//////////////////////////////////////////////////////////////////////////////// - #[cfg(test)] mod test { use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor, RtioUdpSocket}; - use std::task; + use std::io::test::{next_test_ip4, next_test_ip6}; + use super::{UdpWatcher, TcpWatcher, TcpListener}; use super::super::local_loop; #[test] @@ -834,20 +829,18 @@ mod test { } } - do spawn { - port.recv(); - let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap(); - let mut buf = [0, .. 2048]; - let mut total_bytes_read = 0; - while total_bytes_read < MAX { - let nread = stream.read(buf).unwrap(); - total_bytes_read += nread; - for i in range(0u, nread) { - assert_eq!(buf[i], 1); - } + port.recv(); + let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap(); + let mut buf = [0, .. 2048]; + let mut total_bytes_read = 0; + while total_bytes_read < MAX { + let nread = stream.read(buf).unwrap(); + total_bytes_read += nread; + for i in range(0u, nread) { + assert_eq!(buf[i], 1); } - uvdebug!("read {} bytes total", total_bytes_read); } + uvdebug!("read {} bytes total", total_bytes_read); } #[test] @@ -913,65 +906,35 @@ mod test { assert!(total_bytes_sent >= MAX); } - do spawn { - let l = local_loop(); - let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap(); - let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap(); - let (port, chan) = (p2, c1); - port.recv(); - chan.send(()); - let mut total_bytes_recv = 0; - let mut buf = [0, .. 2048]; - while total_bytes_recv < MAX { - // ask for more - assert!(client_out.sendto([1], server_in_addr).is_ok()); - // wait for data - let res = client_in.recvfrom(buf); - assert!(res.is_ok()); - let (nread, src) = res.unwrap(); - assert_eq!(src, server_out_addr); - total_bytes_recv += nread; - for i in range(0u, nread) { - assert_eq!(buf[i], 1); - } + let l = local_loop(); + let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap(); + let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap(); + let (port, chan) = (p2, c1); + port.recv(); + chan.send(()); + let mut total_bytes_recv = 0; + let mut buf = [0, .. 2048]; + while total_bytes_recv < MAX { + // ask for more + assert!(client_out.sendto([1], server_in_addr).is_ok()); + // wait for data + let res = client_in.recvfrom(buf); + assert!(res.is_ok()); + let (nread, src) = res.unwrap(); + assert_eq!(src, server_out_addr); + total_bytes_recv += nread; + for i in range(0u, nread) { + assert_eq!(buf[i], 1); } - // tell the server we're done - assert!(client_out.sendto([0], server_in_addr).is_ok()); } + // tell the server we're done + assert!(client_out.sendto([0], server_in_addr).is_ok()); } #[test] fn test_read_and_block() { let addr = next_test_ip4(); - let (port, chan) = Chan::new(); - - do spawn { - let listener = TcpListener::bind(local_loop(), addr).unwrap(); - let mut acceptor = listener.listen().unwrap(); - let (port2, chan2) = Chan::new(); - chan.send(port2); - let mut stream = acceptor.accept().unwrap(); - let mut buf = [0, .. 2048]; - - let expected = 32; - let mut current = 0; - let mut reads = 0; - - while current < expected { - let nread = stream.read(buf).unwrap(); - for i in range(0u, nread) { - let val = buf[i] as uint; - assert_eq!(val, current % 8); - current += 1; - } - reads += 1; - - chan2.send(()); - } - - // Make sure we had multiple reads - assert!(reads > 1); - } + let (port, chan) = Chan::<Port<()>>::new(); do spawn { let port2 = port.recv(); @@ -983,13 +946,39 @@ mod test { stream.write([0, 1, 2, 3, 4, 5, 6, 7]); port2.recv(); } + + let listener = TcpListener::bind(local_loop(), addr).unwrap(); + let mut acceptor = listener.listen().unwrap(); + let (port2, chan2) = Chan::new(); + chan.send(port2); + let mut stream = acceptor.accept().unwrap(); + let mut buf = [0, .. 2048]; + + let expected = 32; + let mut current = 0; + let mut reads = 0; + + while current < expected { + let nread = stream.read(buf).unwrap(); + for i in range(0u, nread) { + let val = buf[i] as uint; + assert_eq!(val, current % 8); + current += 1; + } + reads += 1; + + chan2.try_send(()); + } + + // Make sure we had multiple reads + assert!(reads > 1); } #[test] fn test_simple_tcp_server_and_client_on_diff_threads() { let addr = next_test_ip4(); - do task::spawn_sched(task::SingleThreaded) { + do spawn { let listener = TcpListener::bind(local_loop(), addr).unwrap(); let mut acceptor = listener.listen().unwrap(); let mut stream = acceptor.accept().unwrap(); @@ -1001,131 +990,11 @@ mod test { } } - do task::spawn_sched(task::SingleThreaded) { - let mut stream = TcpWatcher::connect(local_loop(), addr); - while stream.is_err() { - stream = TcpWatcher::connect(local_loop(), addr); - } - stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]); - } - } - - // On one thread, create a udp socket. Then send that socket to another - // thread and destroy the socket on the remote thread. This should make sure - // that homing kicks in for the socket to go back home to the original - // thread, close itself, and then come back to the last thread. - #[test] - fn test_homing_closes_correctly() { - let (port, chan) = Chan::new(); - - do task::spawn_sched(task::SingleThreaded) { - let listener = UdpWatcher::bind(local_loop(), next_test_ip4()).unwrap(); - chan.send(listener); - } - - do task::spawn_sched(task::SingleThreaded) { - port.recv(); - } - } - - // This is a bit of a crufty old test, but it has its uses. - #[test] - fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() { - use std::cast; - use std::rt::local::Local; - use std::rt::rtio::{EventLoop, IoFactory}; - use std::rt::sched::Scheduler; - use std::rt::sched::{Shutdown, TaskFromFriend}; - use std::rt::sleeper_list::SleeperList; - use std::rt::task::Task; - use std::rt::thread::Thread; - use std::rt::deque::BufferPool; - use std::task::TaskResult; - use std::unstable::run_in_bare_thread; - use uvio::UvEventLoop; - - do run_in_bare_thread { - let sleepers = SleeperList::new(); - let mut pool = BufferPool::new(); - let (worker1, stealer1) = pool.deque(); - let (worker2, stealer2) = pool.deque(); - let queues = ~[stealer1, stealer2]; - - let loop1 = ~UvEventLoop::new() as ~EventLoop; - let mut sched1 = ~Scheduler::new(loop1, worker1, queues.clone(), - sleepers.clone()); - let loop2 = ~UvEventLoop::new() as ~EventLoop; - let mut sched2 = ~Scheduler::new(loop2, worker2, queues.clone(), - sleepers.clone()); - - let handle1 = sched1.make_handle(); - let handle2 = sched2.make_handle(); - let tasksFriendHandle = sched2.make_handle(); - - let on_exit: proc(TaskResult) = proc(exit_status) { - let mut handle1 = handle1; - let mut handle2 = handle2; - handle1.send(Shutdown); - handle2.send(Shutdown); - assert!(exit_status.is_ok()); - }; - - unsafe fn local_io() -> &'static mut IoFactory { - let mut sched = Local::borrow(None::<Scheduler>); - let io = sched.get().event_loop.io(); - cast::transmute(io.unwrap()) - } - - let test_function: proc() = proc() { - let io = unsafe { local_io() }; - let addr = next_test_ip4(); - let maybe_socket = io.udp_bind(addr); - // this socket is bound to this event loop - assert!(maybe_socket.is_ok()); - - // block self on sched1 - let scheduler: ~Scheduler = Local::take(); - let mut tasksFriendHandle = Some(tasksFriendHandle); - scheduler.deschedule_running_task_and_then(|_, task| { - // unblock task - task.wake().map(|task| { - // send self to sched2 - tasksFriendHandle.take_unwrap() - .send(TaskFromFriend(task)); - }); - // sched1 should now sleep since it has nothing else to do - }) - // sched2 will wake up and get the task as we do nothing else, - // the function ends and the socket goes out of scope sched2 - // will start to run the destructor the destructor will first - // block the task, set it's home as sched1, then enqueue it - // sched2 will dequeue the task, see that it has a home, and - // send it to sched1 sched1 will wake up, exec the close - // function on the correct loop, and then we're done - }; - - let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None, - test_function); - main_task.death.on_exit = Some(on_exit); - - let null_task = ~do Task::new_root(&mut sched2.stack_pool, None) { - // nothing - }; - - let main_task = main_task; - let sched1 = sched1; - let thread1 = do Thread::start { - sched1.bootstrap(main_task); - }; - - let sched2 = sched2; - let thread2 = do Thread::start { - sched2.bootstrap(null_task); - }; - - thread1.join(); - thread2.join(); + let mut stream = TcpWatcher::connect(local_loop(), addr); + while stream.is_err() { + stream = TcpWatcher::connect(local_loop(), addr); } + stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]); } #[should_fail] #[test] @@ -1167,7 +1036,7 @@ mod test { // force the handle to be created on a different scheduler, failure in // the original task will force a homing operation back to this // scheduler. - do task::spawn_sched(task::SingleThreaded) { + do spawn { let w = UdpWatcher::bind(local_loop(), addr).unwrap(); chan.send(w); } @@ -1175,67 +1044,4 @@ mod test { let _w = port.recv(); fail!(); } - - #[should_fail] - #[test] - #[ignore(reason = "linked failure")] - fn linked_failure1() { - let (port, chan) = Chan::new(); - let addr = next_test_ip4(); - - do spawn { - let w = TcpListener::bind(local_loop(), addr).unwrap(); - let mut w = w.listen().unwrap(); - chan.send(()); - w.accept(); - } - - port.recv(); - fail!(); - } - - #[should_fail] - #[test] - #[ignore(reason = "linked failure")] - fn linked_failure2() { - let (port, chan) = Chan::new(); - let addr = next_test_ip4(); - - do spawn { - let w = TcpListener::bind(local_loop(), addr).unwrap(); - let mut w = w.listen().unwrap(); - chan.send(()); - let mut buf = [0]; - w.accept().unwrap().read(buf); - } - - port.recv(); - let _w = TcpWatcher::connect(local_loop(), addr).unwrap(); - - fail!(); - } - - #[should_fail] - #[test] - #[ignore(reason = "linked failure")] - fn linked_failure3() { - let (port, chan) = Chan::new(); - let addr = next_test_ip4(); - - do spawn { - let chan = chan; - let w = TcpListener::bind(local_loop(), addr).unwrap(); - let mut w = w.listen().unwrap(); - chan.send(()); - let mut conn = w.accept().unwrap(); - chan.send(()); - let buf = [0, ..65536]; - conn.write(buf); - } - - port.recv(); - let _w = TcpWatcher::connect(local_loop(), addr).unwrap(); - port.recv(); - fail!(); - } } diff --git a/src/librustuv/pipe.rs b/src/librustuv/pipe.rs index 6975ef26bd7..0bc198a4a3f 100644 --- a/src/librustuv/pipe.rs +++ b/src/librustuv/pipe.rs @@ -235,8 +235,9 @@ impl HomingIO for PipeAcceptor { #[cfg(test)] mod tests { use std::rt::rtio::{RtioUnixListener, RtioUnixAcceptor, RtioPipe}; - use std::rt::test::next_test_unix; + use std::io::test::next_test_unix; + use super::{PipeWatcher, PipeListener}; use super::super::local_loop; #[test] diff --git a/src/librustuv/signal.rs b/src/librustuv/signal.rs index 27dbc0fe3bb..b53acd4ebd6 100644 --- a/src/librustuv/signal.rs +++ b/src/librustuv/signal.rs @@ -52,7 +52,7 @@ impl SignalWatcher { extern fn signal_cb(handle: *uvll::uv_signal_t, signum: c_int) { let s: &mut SignalWatcher = unsafe { UvHandle::from_uv_handle(&handle) }; assert_eq!(signum as int, s.signal as int); - s.channel.send_deferred(s.signal); + s.channel.try_send_deferred(s.signal); } impl HomingIO for SignalWatcher { @@ -76,6 +76,7 @@ impl Drop for SignalWatcher { mod test { use super::super::local_loop; use std::io::signal; + use super::SignalWatcher; #[test] fn closing_channel_during_drop_doesnt_kill_everything() { diff --git a/src/librustuv/timer.rs b/src/librustuv/timer.rs index 1e70c5c55e0..d3a190df8be 100644 --- a/src/librustuv/timer.rs +++ b/src/librustuv/timer.rs @@ -179,6 +179,7 @@ impl Drop for TimerWatcher { mod test { use std::rt::rtio::RtioTimer; use super::super::local_loop; + use super::TimerWatcher; #[test] fn oneshot() { diff --git a/src/librustuv/uvio.rs b/src/librustuv/uvio.rs index 52e0b5ed77b..57bb0cfdc7a 100644 --- a/src/librustuv/uvio.rs +++ b/src/librustuv/uvio.rs @@ -102,6 +102,7 @@ pub extern "C" fn new_loop() -> ~rtio::EventLoop { #[test] fn test_callback_run_once() { + use std::rt::rtio::EventLoop; do run_in_bare_thread { let mut event_loop = UvEventLoop::new(); let mut count = 0; diff --git a/src/libstd/io/mod.rs b/src/libstd/io/mod.rs index 8481de73c7f..2d52986294d 100644 --- a/src/libstd/io/mod.rs +++ b/src/libstd/io/mod.rs @@ -313,9 +313,8 @@ pub use self::net::udp::UdpStream; pub use self::pipe::PipeStream; pub use self::process::Process; -/// Testing helpers -#[cfg(test)] -mod test; +/// Various utility functions useful for writing I/O tests +pub mod test; /// Synchronous, non-blocking filesystem operations. pub mod fs; |
