diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2013-10-22 15:13:18 -0700 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2013-10-29 08:39:22 -0700 |
| commit | 201cab84e8f12ec73131ac4908e6779b277449a2 (patch) | |
| tree | 2312f91c2280a1463c179753fe6123b27a77ae9e /src/librustuv/net.rs | |
| parent | 5dd1583c57fbee9a07ac1111858871c241a24c50 (diff) | |
| download | rust-201cab84e8f12ec73131ac4908e6779b277449a2.tar.gz rust-201cab84e8f12ec73131ac4908e6779b277449a2.zip | |
Move rust's uv implementation to its own crate
There are a few reasons that this is a desirable move to take: 1. Proof of concept that a third party event loop is possible 2. Clear separation of responsibility between rt::io and the uv-backend 3. Enforce in the future that the event loop is "pluggable" and replacable Here's a quick summary of the points of this pull request which make this possible: * Two new lang items were introduced: event_loop, and event_loop_factory. The idea of a "factory" is to define a function which can be called with no arguments and will return the new event loop as a trait object. This factory is emitted to the crate map when building an executable. The factory doesn't have to exist, and when it doesn't then an empty slot is in the crate map and a basic event loop with no I/O support is provided to the runtime. * When building an executable, then the rustuv crate will be linked by default (providing a default implementation of the event loop) via a similar method to injecting a dependency on libstd. This is currently the only location where the rustuv crate is ever linked. * There is a new #[no_uv] attribute (implied by #[no_std]) which denies implicitly linking to rustuv by default Closes #5019
Diffstat (limited to 'src/librustuv/net.rs')
| -rw-r--r-- | src/librustuv/net.rs | 851 |
1 files changed, 851 insertions, 0 deletions
diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs new file mode 100644 index 00000000000..0aaa931c947 --- /dev/null +++ b/src/librustuv/net.rs @@ -0,0 +1,851 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::libc::{size_t, ssize_t, c_int, c_void, c_uint}; +use std::vec; +use std::str; +use std::rt::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr}; + +use uvll; +use uvll::*; +use super::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback, + UdpSendCallback, Loop, Watcher, Request, UvError, Buf, NativeHandle, + status_to_maybe_uv_error, empty_buf}; + +pub struct UvAddrInfo(*uvll::addrinfo); + +pub enum UvSocketAddr { + UvIpv4SocketAddr(*sockaddr_in), + UvIpv6SocketAddr(*sockaddr_in6), +} + +pub fn sockaddr_to_UvSocketAddr(addr: *uvll::sockaddr) -> UvSocketAddr { + unsafe { + assert!((is_ip4_addr(addr) || is_ip6_addr(addr))); + assert!(!(is_ip4_addr(addr) && is_ip6_addr(addr))); + match addr { + _ if is_ip4_addr(addr) => UvIpv4SocketAddr(addr as *uvll::sockaddr_in), + _ if is_ip6_addr(addr) => UvIpv6SocketAddr(addr as *uvll::sockaddr_in6), + _ => fail!(), + } + } +} + +fn socket_addr_as_uv_socket_addr<T>(addr: SocketAddr, f: &fn(UvSocketAddr) -> T) -> T { + let malloc = match addr.ip { + Ipv4Addr(*) => malloc_ip4_addr, + Ipv6Addr(*) => malloc_ip6_addr, + }; + let wrap = match addr.ip { + Ipv4Addr(*) => UvIpv4SocketAddr, + Ipv6Addr(*) => UvIpv6SocketAddr, + }; + let free = match addr.ip { + Ipv4Addr(*) => free_ip4_addr, + Ipv6Addr(*) => free_ip6_addr, + }; + + let addr = unsafe { malloc(addr.ip.to_str(), addr.port as int) }; + do (|| { + f(wrap(addr)) + }).finally { + unsafe { free(addr) }; + } +} + +fn uv_socket_addr_as_socket_addr<T>(addr: UvSocketAddr, f: &fn(SocketAddr) -> T) -> T { + let ip_size = match addr { + UvIpv4SocketAddr(*) => 4/*groups of*/ * 3/*digits separated by*/ + 3/*periods*/, + UvIpv6SocketAddr(*) => 8/*groups of*/ * 4/*hex digits separated by*/ + 7 /*colons*/, + }; + let ip_name = { + let buf = vec::from_elem(ip_size + 1 /*null terminated*/, 0u8); + unsafe { + let buf_ptr = vec::raw::to_ptr(buf); + match addr { + UvIpv4SocketAddr(addr) => uvll::ip4_name(addr, buf_ptr, ip_size as size_t), + UvIpv6SocketAddr(addr) => uvll::ip6_name(addr, buf_ptr, ip_size as size_t), + } + }; + buf + }; + let ip_port = unsafe { + let port = match addr { + UvIpv4SocketAddr(addr) => uvll::ip4_port(addr), + UvIpv6SocketAddr(addr) => uvll::ip6_port(addr), + }; + port as u16 + }; + let ip_str = str::from_utf8_slice(ip_name).trim_right_chars(&'\x00'); + let ip_addr = FromStr::from_str(ip_str).unwrap(); + + // finally run the closure + f(SocketAddr { ip: ip_addr, port: ip_port }) +} + +pub fn uv_socket_addr_to_socket_addr(addr: UvSocketAddr) -> SocketAddr { + use std::util; + uv_socket_addr_as_socket_addr(addr, util::id) +} + +#[cfg(test)] +#[test] +fn test_ip4_conversion() { + use std::rt; + let ip4 = rt::test::next_test_ip4(); + assert_eq!(ip4, socket_addr_as_uv_socket_addr(ip4, uv_socket_addr_to_socket_addr)); +} + +#[cfg(test)] +#[test] +fn test_ip6_conversion() { + use std::rt; + let ip6 = rt::test::next_test_ip6(); + assert_eq!(ip6, socket_addr_as_uv_socket_addr(ip6, uv_socket_addr_to_socket_addr)); +} + +// uv_stream_t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t +// and uv_file_t +pub struct StreamWatcher(*uvll::uv_stream_t); +impl Watcher for StreamWatcher { } + +impl StreamWatcher { + pub fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) { + unsafe { + match uvll::read_start(self.native_handle(), alloc_cb, read_cb) { + 0 => { + let data = self.get_watcher_data(); + data.alloc_cb = Some(alloc); + data.read_cb = Some(cb); + } + n => { + cb(*self, 0, empty_buf(), Some(UvError(n))) + } + } + } + + extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf { + let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream); + let alloc_cb = stream_watcher.get_watcher_data().alloc_cb.get_ref(); + return (*alloc_cb)(suggested_size as uint); + } + + extern fn read_cb(stream: *uvll::uv_stream_t, nread: ssize_t, buf: Buf) { + uvdebug!("buf addr: {}", buf.base); + uvdebug!("buf len: {}", buf.len); + let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream); + let cb = stream_watcher.get_watcher_data().read_cb.get_ref(); + let status = status_to_maybe_uv_error(nread as c_int); + (*cb)(stream_watcher, nread as int, buf, status); + } + } + + pub fn read_stop(&mut self) { + // It would be nice to drop the alloc and read callbacks here, + // but read_stop may be called from inside one of them and we + // would end up freeing the in-use environment + let handle = self.native_handle(); + unsafe { assert_eq!(uvll::read_stop(handle), 0); } + } + + pub fn write(&mut self, buf: Buf, cb: ConnectionCallback) { + let req = WriteRequest::new(); + return unsafe { + match uvll::write(req.native_handle(), self.native_handle(), + [buf], write_cb) { + 0 => { + let data = self.get_watcher_data(); + assert!(data.write_cb.is_none()); + data.write_cb = Some(cb); + } + n => { + req.delete(); + cb(*self, Some(UvError(n))) + } + } + }; + + extern fn write_cb(req: *uvll::uv_write_t, status: c_int) { + let write_request: WriteRequest = NativeHandle::from_native_handle(req); + let mut stream_watcher = write_request.stream(); + write_request.delete(); + let cb = stream_watcher.get_watcher_data().write_cb.take_unwrap(); + let status = status_to_maybe_uv_error(status); + cb(stream_watcher, status); + } + } + + + pub fn listen(&mut self, cb: ConnectionCallback) -> Result<(), UvError> { + { + let data = self.get_watcher_data(); + assert!(data.connect_cb.is_none()); + data.connect_cb = Some(cb); + } + + return unsafe { + static BACKLOG: c_int = 128; // XXX should be configurable + match uvll::listen(self.native_handle(), BACKLOG, connection_cb) { + 0 => Ok(()), + n => Err(UvError(n)) + } + }; + + extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) { + uvdebug!("connection_cb"); + let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle); + let cb = stream_watcher.get_watcher_data().connect_cb.get_ref(); + let status = status_to_maybe_uv_error(status); + (*cb)(stream_watcher, status); + } + } + + pub fn accept(&mut self, stream: StreamWatcher) { + let self_handle = self.native_handle() as *c_void; + let stream_handle = stream.native_handle() as *c_void; + assert_eq!(0, unsafe { uvll::accept(self_handle, stream_handle) } ); + } +} + +impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher { + fn from_native_handle(handle: *uvll::uv_stream_t) -> StreamWatcher { + StreamWatcher(handle) + } + fn native_handle(&self) -> *uvll::uv_stream_t { + match self { &StreamWatcher(ptr) => ptr } + } +} + +pub struct TcpWatcher(*uvll::uv_tcp_t); +impl Watcher for TcpWatcher { } + +impl TcpWatcher { + pub fn new(loop_: &Loop) -> TcpWatcher { + unsafe { + let handle = malloc_handle(UV_TCP); + assert!(handle.is_not_null()); + assert_eq!(0, uvll::tcp_init(loop_.native_handle(), handle)); + let mut watcher: TcpWatcher = NativeHandle::from_native_handle(handle); + watcher.install_watcher_data(); + return watcher; + } + } + + pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> { + do socket_addr_as_uv_socket_addr(address) |addr| { + let result = unsafe { + match addr { + UvIpv4SocketAddr(addr) => uvll::tcp_bind(self.native_handle(), addr), + UvIpv6SocketAddr(addr) => uvll::tcp_bind6(self.native_handle(), addr), + } + }; + match result { + 0 => Ok(()), + _ => Err(UvError(result)), + } + } + } + + pub fn connect(&mut self, address: SocketAddr, cb: ConnectionCallback) { + unsafe { + assert!(self.get_watcher_data().connect_cb.is_none()); + self.get_watcher_data().connect_cb = Some(cb); + + let connect_handle = ConnectRequest::new().native_handle(); + uvdebug!("connect_t: {}", connect_handle); + do socket_addr_as_uv_socket_addr(address) |addr| { + let result = match addr { + UvIpv4SocketAddr(addr) => uvll::tcp_connect(connect_handle, + self.native_handle(), addr, connect_cb), + UvIpv6SocketAddr(addr) => uvll::tcp_connect6(connect_handle, + self.native_handle(), addr, connect_cb), + }; + assert_eq!(0, result); + } + + extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) { + uvdebug!("connect_t: {}", req); + let connect_request: ConnectRequest = NativeHandle::from_native_handle(req); + let mut stream_watcher = connect_request.stream(); + connect_request.delete(); + let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap(); + let status = status_to_maybe_uv_error(status); + cb(stream_watcher, status); + } + } + } + + pub fn as_stream(&self) -> StreamWatcher { + NativeHandle::from_native_handle(self.native_handle() as *uvll::uv_stream_t) + } +} + +impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher { + fn from_native_handle(handle: *uvll::uv_tcp_t) -> TcpWatcher { + TcpWatcher(handle) + } + fn native_handle(&self) -> *uvll::uv_tcp_t { + match self { &TcpWatcher(ptr) => ptr } + } +} + +pub struct UdpWatcher(*uvll::uv_udp_t); +impl Watcher for UdpWatcher { } + +impl UdpWatcher { + pub fn new(loop_: &Loop) -> UdpWatcher { + unsafe { + let handle = malloc_handle(UV_UDP); + assert!(handle.is_not_null()); + assert_eq!(0, uvll::udp_init(loop_.native_handle(), handle)); + let mut watcher: UdpWatcher = NativeHandle::from_native_handle(handle); + watcher.install_watcher_data(); + return watcher; + } + } + + pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> { + do socket_addr_as_uv_socket_addr(address) |addr| { + let result = unsafe { + match addr { + UvIpv4SocketAddr(addr) => uvll::udp_bind(self.native_handle(), addr, 0u32), + UvIpv6SocketAddr(addr) => uvll::udp_bind6(self.native_handle(), addr, 0u32), + } + }; + match result { + 0 => Ok(()), + _ => Err(UvError(result)), + } + } + } + + pub fn recv_start(&mut self, alloc: AllocCallback, cb: UdpReceiveCallback) { + { + let data = self.get_watcher_data(); + data.alloc_cb = Some(alloc); + data.udp_recv_cb = Some(cb); + } + + unsafe { uvll::udp_recv_start(self.native_handle(), alloc_cb, recv_cb); } + + extern fn alloc_cb(handle: *uvll::uv_udp_t, suggested_size: size_t) -> Buf { + let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle); + let alloc_cb = udp_watcher.get_watcher_data().alloc_cb.get_ref(); + return (*alloc_cb)(suggested_size as uint); + } + + extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: Buf, + addr: *uvll::sockaddr, flags: c_uint) { + // When there's no data to read the recv callback can be a no-op. + // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring + // this we just drop back to kqueue and wait for the next callback. + if nread == 0 { + return; + } + + uvdebug!("buf addr: {}", buf.base); + uvdebug!("buf len: {}", buf.len); + let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle); + let cb = udp_watcher.get_watcher_data().udp_recv_cb.get_ref(); + let status = status_to_maybe_uv_error(nread as c_int); + let addr = uv_socket_addr_to_socket_addr(sockaddr_to_UvSocketAddr(addr)); + (*cb)(udp_watcher, nread as int, buf, addr, flags as uint, status); + } + } + + pub fn recv_stop(&mut self) { + unsafe { uvll::udp_recv_stop(self.native_handle()); } + } + + pub fn send(&mut self, buf: Buf, address: SocketAddr, cb: UdpSendCallback) { + { + let data = self.get_watcher_data(); + assert!(data.udp_send_cb.is_none()); + data.udp_send_cb = Some(cb); + } + + let req = UdpSendRequest::new(); + do socket_addr_as_uv_socket_addr(address) |addr| { + let result = unsafe { + match addr { + UvIpv4SocketAddr(addr) => uvll::udp_send(req.native_handle(), + self.native_handle(), [buf], addr, send_cb), + UvIpv6SocketAddr(addr) => uvll::udp_send6(req.native_handle(), + self.native_handle(), [buf], addr, send_cb), + } + }; + assert_eq!(0, result); + } + + extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) { + let send_request: UdpSendRequest = NativeHandle::from_native_handle(req); + let mut udp_watcher = send_request.handle(); + send_request.delete(); + let cb = udp_watcher.get_watcher_data().udp_send_cb.take_unwrap(); + let status = status_to_maybe_uv_error(status); + cb(udp_watcher, status); + } + } +} + +impl NativeHandle<*uvll::uv_udp_t> for UdpWatcher { + fn from_native_handle(handle: *uvll::uv_udp_t) -> UdpWatcher { + UdpWatcher(handle) + } + fn native_handle(&self) -> *uvll::uv_udp_t { + match self { &UdpWatcher(ptr) => ptr } + } +} + +// uv_connect_t is a subclass of uv_req_t +pub struct ConnectRequest(*uvll::uv_connect_t); +impl Request for ConnectRequest { } + +impl ConnectRequest { + + pub fn new() -> ConnectRequest { + let connect_handle = unsafe { malloc_req(UV_CONNECT) }; + assert!(connect_handle.is_not_null()); + ConnectRequest(connect_handle as *uvll::uv_connect_t) + } + + fn stream(&self) -> StreamWatcher { + unsafe { + let stream_handle = uvll::get_stream_handle_from_connect_req(self.native_handle()); + NativeHandle::from_native_handle(stream_handle) + } + } + + fn delete(self) { + unsafe { free_req(self.native_handle() as *c_void) } + } +} + +impl NativeHandle<*uvll::uv_connect_t> for ConnectRequest { + fn from_native_handle(handle: *uvll:: uv_connect_t) -> ConnectRequest { + ConnectRequest(handle) + } + fn native_handle(&self) -> *uvll::uv_connect_t { + match self { &ConnectRequest(ptr) => ptr } + } +} + +pub struct WriteRequest(*uvll::uv_write_t); + +impl Request for WriteRequest { } + +impl WriteRequest { + pub fn new() -> WriteRequest { + let write_handle = unsafe { malloc_req(UV_WRITE) }; + assert!(write_handle.is_not_null()); + WriteRequest(write_handle as *uvll::uv_write_t) + } + + pub fn stream(&self) -> StreamWatcher { + unsafe { + let stream_handle = uvll::get_stream_handle_from_write_req(self.native_handle()); + NativeHandle::from_native_handle(stream_handle) + } + } + + pub fn delete(self) { + unsafe { free_req(self.native_handle() as *c_void) } + } +} + +impl NativeHandle<*uvll::uv_write_t> for WriteRequest { + fn from_native_handle(handle: *uvll:: uv_write_t) -> WriteRequest { + WriteRequest(handle) + } + fn native_handle(&self) -> *uvll::uv_write_t { + match self { &WriteRequest(ptr) => ptr } + } +} + +pub struct UdpSendRequest(*uvll::uv_udp_send_t); +impl Request for UdpSendRequest { } + +impl UdpSendRequest { + pub fn new() -> UdpSendRequest { + let send_handle = unsafe { malloc_req(UV_UDP_SEND) }; + assert!(send_handle.is_not_null()); + UdpSendRequest(send_handle as *uvll::uv_udp_send_t) + } + + pub fn handle(&self) -> UdpWatcher { + let send_request_handle = unsafe { + uvll::get_udp_handle_from_send_req(self.native_handle()) + }; + NativeHandle::from_native_handle(send_request_handle) + } + + pub fn delete(self) { + unsafe { free_req(self.native_handle() as *c_void) } + } +} + +impl NativeHandle<*uvll::uv_udp_send_t> for UdpSendRequest { + fn from_native_handle(handle: *uvll::uv_udp_send_t) -> UdpSendRequest { + UdpSendRequest(handle) + } + fn native_handle(&self) -> *uvll::uv_udp_send_t { + match self { &UdpSendRequest(ptr) => ptr } + } +} + +#[cfg(test)] +mod test { + use super::*; + use std::util::ignore; + use std::cell::Cell; + use std::vec; + use std::unstable::run_in_bare_thread; + use std::rt::thread::Thread; + use std::rt::test::*; + use super::super::{Loop, AllocCallback}; + use super::super::{vec_from_uv_buf, vec_to_uv_buf, slice_to_uv_buf}; + + #[test] + fn connect_close_ip4() { + do run_in_bare_thread() { + let mut loop_ = Loop::new(); + let mut tcp_watcher = { TcpWatcher::new(&mut loop_) }; + // Connect to a port where nobody is listening + let addr = next_test_ip4(); + do tcp_watcher.connect(addr) |stream_watcher, status| { + uvdebug!("tcp_watcher.connect!"); + assert!(status.is_some()); + assert_eq!(status.unwrap().name(), ~"ECONNREFUSED"); + stream_watcher.close(||()); + } + loop_.run(); + loop_.close(); + } + } + + #[test] + fn connect_close_ip6() { + do run_in_bare_thread() { + let mut loop_ = Loop::new(); + let mut tcp_watcher = { TcpWatcher::new(&mut loop_) }; + // Connect to a port where nobody is listening + let addr = next_test_ip6(); + do tcp_watcher.connect(addr) |stream_watcher, status| { + uvdebug!("tcp_watcher.connect!"); + assert!(status.is_some()); + assert_eq!(status.unwrap().name(), ~"ECONNREFUSED"); + stream_watcher.close(||()); + } + loop_.run(); + loop_.close(); + } + } + + #[test] + fn udp_bind_close_ip4() { + do run_in_bare_thread() { + let mut loop_ = Loop::new(); + let mut udp_watcher = { UdpWatcher::new(&mut loop_) }; + let addr = next_test_ip4(); + udp_watcher.bind(addr); + udp_watcher.close(||()); + loop_.run(); + loop_.close(); + } + } + + #[test] + fn udp_bind_close_ip6() { + do run_in_bare_thread() { + let mut loop_ = Loop::new(); + let mut udp_watcher = { UdpWatcher::new(&mut loop_) }; + let addr = next_test_ip6(); + udp_watcher.bind(addr); + udp_watcher.close(||()); + loop_.run(); + loop_.close(); + } + } + + #[test] + fn listen_ip4() { + do run_in_bare_thread() { + static MAX: int = 10; + let mut loop_ = Loop::new(); + let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) }; + let addr = next_test_ip4(); + server_tcp_watcher.bind(addr); + let loop_ = loop_; + uvdebug!("listening"); + let mut stream = server_tcp_watcher.as_stream(); + let res = do stream.listen |mut server_stream_watcher, status| { + uvdebug!("listened!"); + assert!(status.is_none()); + let mut loop_ = loop_; + let client_tcp_watcher = TcpWatcher::new(&mut loop_); + let mut client_tcp_watcher = client_tcp_watcher.as_stream(); + server_stream_watcher.accept(client_tcp_watcher); + let count_cell = Cell::new(0); + let server_stream_watcher = server_stream_watcher; + uvdebug!("starting read"); + let alloc: AllocCallback = |size| { + vec_to_uv_buf(vec::from_elem(size, 0u8)) + }; + do client_tcp_watcher.read_start(alloc) |stream_watcher, nread, buf, status| { + + uvdebug!("i'm reading!"); + let buf = vec_from_uv_buf(buf); + let mut count = count_cell.take(); + if status.is_none() { + uvdebug!("got {} bytes", nread); + let buf = buf.unwrap(); + for byte in buf.slice(0, nread as uint).iter() { + assert!(*byte == count as u8); + uvdebug!("{}", *byte as uint); + count += 1; + } + } else { + assert_eq!(count, MAX); + do stream_watcher.close { + server_stream_watcher.close(||()); + } + } + count_cell.put_back(count); + } + }; + + assert!(res.is_ok()); + + let client_thread = do Thread::start { + uvdebug!("starting client thread"); + let mut loop_ = Loop::new(); + let mut tcp_watcher = { TcpWatcher::new(&mut loop_) }; + do tcp_watcher.connect(addr) |mut stream_watcher, status| { + uvdebug!("connecting"); + assert!(status.is_none()); + let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9]; + let buf = slice_to_uv_buf(msg); + let msg_cell = Cell::new(msg); + do stream_watcher.write(buf) |stream_watcher, status| { + uvdebug!("writing"); + assert!(status.is_none()); + let msg_cell = Cell::new(msg_cell.take()); + stream_watcher.close(||ignore(msg_cell.take())); + } + } + loop_.run(); + loop_.close(); + }; + + let mut loop_ = loop_; + loop_.run(); + loop_.close(); + client_thread.join(); + }; + } + + #[test] + fn listen_ip6() { + do run_in_bare_thread() { + static MAX: int = 10; + let mut loop_ = Loop::new(); + let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) }; + let addr = next_test_ip6(); + server_tcp_watcher.bind(addr); + let loop_ = loop_; + uvdebug!("listening"); + let mut stream = server_tcp_watcher.as_stream(); + let res = do stream.listen |mut server_stream_watcher, status| { + uvdebug!("listened!"); + assert!(status.is_none()); + let mut loop_ = loop_; + let client_tcp_watcher = TcpWatcher::new(&mut loop_); + let mut client_tcp_watcher = client_tcp_watcher.as_stream(); + server_stream_watcher.accept(client_tcp_watcher); + let count_cell = Cell::new(0); + let server_stream_watcher = server_stream_watcher; + uvdebug!("starting read"); + let alloc: AllocCallback = |size| { + vec_to_uv_buf(vec::from_elem(size, 0u8)) + }; + do client_tcp_watcher.read_start(alloc) + |stream_watcher, nread, buf, status| { + + uvdebug!("i'm reading!"); + let buf = vec_from_uv_buf(buf); + let mut count = count_cell.take(); + if status.is_none() { + uvdebug!("got {} bytes", nread); + let buf = buf.unwrap(); + let r = buf.slice(0, nread as uint); + for byte in r.iter() { + assert!(*byte == count as u8); + uvdebug!("{}", *byte as uint); + count += 1; + } + } else { + assert_eq!(count, MAX); + do stream_watcher.close { + server_stream_watcher.close(||()); + } + } + count_cell.put_back(count); + } + }; + assert!(res.is_ok()); + + let client_thread = do Thread::start { + uvdebug!("starting client thread"); + let mut loop_ = Loop::new(); + let mut tcp_watcher = { TcpWatcher::new(&mut loop_) }; + do tcp_watcher.connect(addr) |mut stream_watcher, status| { + uvdebug!("connecting"); + assert!(status.is_none()); + let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9]; + let buf = slice_to_uv_buf(msg); + let msg_cell = Cell::new(msg); + do stream_watcher.write(buf) |stream_watcher, status| { + uvdebug!("writing"); + assert!(status.is_none()); + let msg_cell = Cell::new(msg_cell.take()); + stream_watcher.close(||ignore(msg_cell.take())); + } + } + loop_.run(); + loop_.close(); + }; + + let mut loop_ = loop_; + loop_.run(); + loop_.close(); + client_thread.join(); + } + } + + #[test] + fn udp_recv_ip4() { + do run_in_bare_thread() { + static MAX: int = 10; + let mut loop_ = Loop::new(); + let server_addr = next_test_ip4(); + let client_addr = next_test_ip4(); + + let mut server = UdpWatcher::new(&loop_); + assert!(server.bind(server_addr).is_ok()); + + uvdebug!("starting read"); + let alloc: AllocCallback = |size| { + vec_to_uv_buf(vec::from_elem(size, 0u8)) + }; + + do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| { + server.recv_stop(); + uvdebug!("i'm reading!"); + assert!(status.is_none()); + assert_eq!(flags, 0); + assert_eq!(src, client_addr); + + let buf = vec_from_uv_buf(buf); + let mut count = 0; + uvdebug!("got {} bytes", nread); + + let buf = buf.unwrap(); + for &byte in buf.slice(0, nread as uint).iter() { + assert!(byte == count as u8); + uvdebug!("{}", byte as uint); + count += 1; + } + assert_eq!(count, MAX); + + server.close(||{}); + } + + let thread = do Thread::start { + let mut loop_ = Loop::new(); + let mut client = UdpWatcher::new(&loop_); + assert!(client.bind(client_addr).is_ok()); + let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]; + let buf = slice_to_uv_buf(msg); + do client.send(buf, server_addr) |client, status| { + uvdebug!("writing"); + assert!(status.is_none()); + client.close(||{}); + } + + loop_.run(); + loop_.close(); + }; + + loop_.run(); + loop_.close(); + thread.join(); + } + } + + #[test] + fn udp_recv_ip6() { + do run_in_bare_thread() { + static MAX: int = 10; + let mut loop_ = Loop::new(); + let server_addr = next_test_ip6(); + let client_addr = next_test_ip6(); + + let mut server = UdpWatcher::new(&loop_); + assert!(server.bind(server_addr).is_ok()); + + uvdebug!("starting read"); + let alloc: AllocCallback = |size| { + vec_to_uv_buf(vec::from_elem(size, 0u8)) + }; + + do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| { + server.recv_stop(); + uvdebug!("i'm reading!"); + assert!(status.is_none()); + assert_eq!(flags, 0); + assert_eq!(src, client_addr); + + let buf = vec_from_uv_buf(buf); + let mut count = 0; + uvdebug!("got {} bytes", nread); + + let buf = buf.unwrap(); + for &byte in buf.slice(0, nread as uint).iter() { + assert!(byte == count as u8); + uvdebug!("{}", byte as uint); + count += 1; + } + assert_eq!(count, MAX); + + server.close(||{}); + } + + let thread = do Thread::start { + let mut loop_ = Loop::new(); + let mut client = UdpWatcher::new(&loop_); + assert!(client.bind(client_addr).is_ok()); + let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]; + let buf = slice_to_uv_buf(msg); + do client.send(buf, server_addr) |client, status| { + uvdebug!("writing"); + assert!(status.is_none()); + client.close(||{}); + } + + loop_.run(); + loop_.close(); + }; + + loop_.run(); + loop_.close(); + thread.join(); + } + } +} |
