diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2013-10-15 19:44:08 -0700 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2013-10-24 14:21:56 -0700 |
| commit | bac96818580a97c049532e50702c2a8204e11754 (patch) | |
| tree | cb8fc611cf345d6f6c539bd62fd778b7b214d1a6 /src/libstd/rt | |
| parent | 61f8c059c4c6082683d78b2ee3d963f65fa1eb98 (diff) | |
| download | rust-bac96818580a97c049532e50702c2a8204e11754.tar.gz rust-bac96818580a97c049532e50702c2a8204e11754.zip | |
Implement io::net::unix
Diffstat (limited to 'src/libstd/rt')
| -rw-r--r-- | src/libstd/rt/io/net/unix.rs | 284 | ||||
| -rw-r--r-- | src/libstd/rt/io/pipe.rs | 4 | ||||
| -rw-r--r-- | src/libstd/rt/io/process.rs | 2 | ||||
| -rw-r--r-- | src/libstd/rt/rtio.rs | 19 | ||||
| -rw-r--r-- | src/libstd/rt/test.rs | 10 | ||||
| -rw-r--r-- | src/libstd/rt/uv/net.rs | 73 | ||||
| -rw-r--r-- | src/libstd/rt/uv/pipe.rs | 51 | ||||
| -rw-r--r-- | src/libstd/rt/uv/process.rs | 6 | ||||
| -rw-r--r-- | src/libstd/rt/uv/uvio.rs | 205 | ||||
| -rw-r--r-- | src/libstd/rt/uv/uvll.rs | 19 |
10 files changed, 587 insertions, 86 deletions
diff --git a/src/libstd/rt/io/net/unix.rs b/src/libstd/rt/io/net/unix.rs index 1771a963ba7..9428c1f800d 100644 --- a/src/libstd/rt/io/net/unix.rs +++ b/src/libstd/rt/io/net/unix.rs @@ -8,44 +8,296 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +/*! + +Named pipes + +This module contains the ability to communicate over named pipes with +synchronous I/O. On windows, this corresponds to talking over a Named Pipe, +while on Unix it corresponds to UNIX domain sockets. + +These pipes are similar to TCP in the sense that you can have both a stream to a +server and a server itself. The server provided accepts other `UnixStream` +instances as clients. + +*/ + use prelude::*; -use super::super::*; + use super::super::support::PathLike; +use rt::rtio::{IoFactory, IoFactoryObject, RtioUnixListenerObject}; +use rt::rtio::{RtioUnixAcceptorObject, RtioPipeObject, RtioUnixListener}; +use rt::rtio::RtioUnixAcceptor; +use rt::io::pipe::PipeStream; +use rt::io::{io_error, Listener, Acceptor, Reader, Writer}; +use rt::local::Local; -pub struct UnixStream; +/// A stream which communicates over a named pipe. +pub struct UnixStream { + priv obj: PipeStream, +} impl UnixStream { - pub fn connect<P: PathLike>(_path: &P) -> Option<UnixStream> { - fail!() + fn new(obj: ~RtioPipeObject) -> UnixStream { + UnixStream { obj: PipeStream::new_bound(obj) } + } + + /// Connect to a pipe named by `path`. This will attempt to open a + /// connection to the underlying socket. + /// + /// The returned stream will be closed when the object falls out of scope. + /// + /// # Failure + /// + /// This function will raise on the `io_error` condition if the connection + /// could not be made. + /// + /// # Example + /// + /// use std::rt::io::net::unix::UnixStream; + /// + /// let server = Path("path/to/my/socket"); + /// let mut stream = UnixStream::connect(&server); + /// stream.write([1, 2, 3]); + /// + pub fn connect<P: PathLike>(path: &P) -> Option<UnixStream> { + let pipe = unsafe { + let io: *mut IoFactoryObject = Local::unsafe_borrow(); + (*io).unix_connect(path) + }; + + match pipe { + Ok(s) => Some(UnixStream::new(s)), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } + } } } impl Reader for UnixStream { - fn read(&mut self, _buf: &mut [u8]) -> Option<uint> { fail!() } - - fn eof(&mut self) -> bool { fail!() } + fn read(&mut self, buf: &mut [u8]) -> Option<uint> { self.obj.read(buf) } + fn eof(&mut self) -> bool { self.obj.eof() } } impl Writer for UnixStream { - fn write(&mut self, _v: &[u8]) { fail!() } - - fn flush(&mut self) { fail!() } + fn write(&mut self, buf: &[u8]) { self.obj.write(buf) } + fn flush(&mut self) { self.obj.flush() } } -pub struct UnixListener; +pub struct UnixListener { + priv obj: ~RtioUnixListenerObject, +} impl UnixListener { - pub fn bind<P: PathLike>(_path: &P) -> Option<UnixListener> { - fail!() + + /// Creates a new listener, ready to receive incoming connections on the + /// specified socket. The server will be named by `path`. + /// + /// This listener will be closed when it falls out of scope. + /// + /// # Failure + /// + /// This function will raise on the `io_error` condition if the specified + /// path could not be bound. + /// + /// # Example + /// + /// use std::rt::io::net::unix::UnixListener; + /// + /// let server = Path("path/to/my/socket"); + /// let mut stream = UnixListener::bind(&server); + /// for client in stream.incoming() { + /// let mut client = client; + /// client.write([1, 2, 3, 4]); + /// } + /// + pub fn bind<P: PathLike>(path: &P) -> Option<UnixListener> { + let listener = unsafe { + let io: *mut IoFactoryObject = Local::unsafe_borrow(); + (*io).unix_bind(path) + }; + match listener { + Ok(s) => Some(UnixListener{ obj: s }), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } + } } } impl Listener<UnixStream, UnixAcceptor> for UnixListener { - fn listen(self) -> Option<UnixAcceptor> { fail!() } + fn listen(self) -> Option<UnixAcceptor> { + match self.obj.listen() { + Ok(acceptor) => Some(UnixAcceptor { obj: acceptor }), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } + } + } } -pub struct UnixAcceptor; +pub struct UnixAcceptor { + priv obj: ~RtioUnixAcceptorObject, +} impl Acceptor<UnixStream> for UnixAcceptor { - fn accept(&mut self) -> Option<UnixStream> { fail!() } + fn accept(&mut self) -> Option<UnixStream> { + match self.obj.accept() { + Ok(s) => Some(UnixStream::new(s)), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } + } + } +} + +#[cfg(test)] +mod tests { + use prelude::*; + use super::*; + use cell::Cell; + use rt::test::*; + use rt::io::*; + use rt::comm::oneshot; + use os; + + fn smalltest(server: ~fn(UnixStream), client: ~fn(UnixStream)) { + let server = Cell::new(server); + let client = Cell::new(client); + do run_in_mt_newsched_task { + let server = Cell::new(server.take()); + let client = Cell::new(client.take()); + let path1 = next_test_unix(); + let path2 = path1.clone(); + let (port, chan) = oneshot(); + let port = Cell::new(port); + let chan = Cell::new(chan); + + do spawntask { + let mut acceptor = UnixListener::bind(&path1).listen(); + chan.take().send(()); + server.take()(acceptor.accept().unwrap()); + } + + do spawntask { + port.take().recv(); + client.take()(UnixStream::connect(&path2).unwrap()); + } + } + } + + #[test] + fn bind_error() { + do run_in_mt_newsched_task { + let mut called = false; + do io_error::cond.trap(|e| { + assert!(e.kind == PermissionDenied); + called = true; + }).inside { + let listener = UnixListener::bind(&("path/to/nowhere")); + assert!(listener.is_none()); + } + assert!(called); + } + } + + #[test] + fn connect_error() { + do run_in_mt_newsched_task { + let mut called = false; + do io_error::cond.trap(|e| { + assert_eq!(e.kind, OtherIoError); + called = true; + }).inside { + let stream = UnixStream::connect(&("path/to/nowhere")); + assert!(stream.is_none()); + } + assert!(called); + } + } + + #[test] + fn smoke() { + smalltest(|mut server| { + let mut buf = [0]; + server.read(buf); + assert!(buf[0] == 99); + }, |mut client| { + client.write([99]); + }) + } + + #[test] + fn read_eof() { + smalltest(|mut server| { + let mut buf = [0]; + assert!(server.read(buf).is_none()); + assert!(server.read(buf).is_none()); + }, |_client| { + // drop the client + }) + } + + #[test] + fn write_begone() { + smalltest(|mut server| { + let buf = [0]; + let mut stop = false; + while !stop{ + do io_error::cond.trap(|e| { + assert_eq!(e.kind, BrokenPipe); + stop = true; + }).inside { + server.write(buf); + } + } + }, |_client| { + // drop the client + }) + } + + #[test] + fn accept_lots() { + do run_in_mt_newsched_task { + let times = 10; + let path1 = next_test_unix(); + let path2 = path1.clone(); + let (port, chan) = oneshot(); + let port = Cell::new(port); + let chan = Cell::new(chan); + + do spawntask { + let mut acceptor = UnixListener::bind(&path1).listen(); + chan.take().send(()); + do times.times { + let mut client = acceptor.accept(); + let mut buf = [0]; + client.read(buf); + assert_eq!(buf[0], 100); + } + } + + do spawntask { + port.take().recv(); + do times.times { + let mut stream = UnixStream::connect(&path2); + stream.write([100]); + } + } + } + } + + #[test] + fn path_exists() { + do run_in_mt_newsched_task { + let path = next_test_unix(); + let _acceptor = UnixListener::bind(&path).listen(); + assert!(os::path_exists(&path)); + } + } } diff --git a/src/libstd/rt/io/pipe.rs b/src/libstd/rt/io/pipe.rs index d2cd531ed26..ff1bd55d594 100644 --- a/src/libstd/rt/io/pipe.rs +++ b/src/libstd/rt/io/pipe.rs @@ -21,7 +21,7 @@ use rt::rtio::{RtioPipe, RtioPipeObject, IoFactoryObject, IoFactory}; use rt::rtio::RtioUnboundPipeObject; pub struct PipeStream { - priv obj: RtioPipeObject + priv obj: ~RtioPipeObject } // This should not be a newtype, but rt::uv::process::set_stdio needs to reach @@ -45,7 +45,7 @@ impl PipeStream { } } - pub fn bind(inner: RtioPipeObject) -> PipeStream { + pub fn new_bound(inner: ~RtioPipeObject) -> PipeStream { PipeStream { obj: inner } } } diff --git a/src/libstd/rt/io/process.rs b/src/libstd/rt/io/process.rs index 5f2453852ee..e0ffa82b59f 100644 --- a/src/libstd/rt/io/process.rs +++ b/src/libstd/rt/io/process.rs @@ -100,7 +100,7 @@ impl Process { Ok((p, io)) => Some(Process{ handle: p, io: io.move_iter().map(|p| - p.map(|p| io::PipeStream::bind(p)) + p.map(|p| io::PipeStream::new_bound(p)) ).collect() }), Err(ioerr) => { diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 501def8b060..0964f94d6d5 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -36,6 +36,8 @@ pub type PausibleIdleCallback = uvio::UvPausibleIdleCallback; pub type RtioPipeObject = uvio::UvPipeStream; pub type RtioUnboundPipeObject = uvio::UvUnboundPipe; pub type RtioProcessObject = uvio::UvProcess; +pub type RtioUnixListenerObject = uvio::UvUnixListener; +pub type RtioUnixAcceptorObject = uvio::UvUnixAcceptor; pub trait EventLoop { fn run(&mut self); @@ -86,7 +88,12 @@ pub trait IoFactory { Result<~[Path], IoError>; fn pipe_init(&mut self, ipc: bool) -> Result<~RtioUnboundPipeObject, IoError>; fn spawn(&mut self, config: ProcessConfig) - -> Result<(~RtioProcessObject, ~[Option<RtioPipeObject>]), IoError>; + -> Result<(~RtioProcessObject, ~[Option<~RtioPipeObject>]), IoError>; + + fn unix_bind<P: PathLike>(&mut self, path: &P) -> + Result<~RtioUnixListenerObject, IoError>; + fn unix_connect<P: PathLike>(&mut self, path: &P) -> + Result<~RtioPipeObject, IoError>; } pub trait RtioTcpListener : RtioSocket { @@ -154,3 +161,13 @@ pub trait RtioPipe { fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError>; fn write(&mut self, buf: &[u8]) -> Result<(), IoError>; } + +pub trait RtioUnixListener { + fn listen(self) -> Result<~RtioUnixAcceptorObject, IoError>; +} + +pub trait RtioUnixAcceptor { + fn accept(&mut self) -> Result<~RtioPipeObject, IoError>; + fn accept_simultaneously(&mut self) -> Result<(), IoError>; + fn dont_accept_simultaneously(&mut self) -> Result<(), IoError>; +} diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index 4f7ebb4a721..759550e5cbd 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -8,8 +8,12 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use rand; +use rand::Rng; +use os; use libc; use option::{Some, None}; +use path::{Path, GenericPath}; use cell::Cell; use clone::Clone; use container::Container; @@ -327,6 +331,12 @@ pub fn next_test_port() -> u16 { } } +/// Get a temporary path which could be the location of a unix socket +#[fixed_stack_segment] #[inline(never)] +pub fn next_test_unix() -> Path { + os::tmpdir().push(rand::task_rng().gen_ascii_str(20)) +} + /// Get a unique IPv4 localhost:port pair starting at 9600 pub fn next_test_ip4() -> SocketAddr { SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: next_test_port() } diff --git a/src/libstd/rt/uv/net.rs b/src/libstd/rt/uv/net.rs index a2608bf6b24..2e85900a3f2 100644 --- a/src/libstd/rt/uv/net.rs +++ b/src/libstd/rt/uv/net.rs @@ -206,12 +206,6 @@ impl StreamWatcher { } } - pub fn accept(&mut self, stream: StreamWatcher) { - let self_handle = self.native_handle() as *c_void; - let stream_handle = stream.native_handle() as *c_void; - assert_eq!(0, unsafe { uvll::accept(self_handle, stream_handle) } ); - } - pub fn close(self, cb: NullCallback) { { let mut this = self; @@ -230,6 +224,36 @@ impl StreamWatcher { cb(); } } + + pub fn listen(&mut self, cb: ConnectionCallback) -> Result<(), UvError> { + { + let data = self.get_watcher_data(); + assert!(data.connect_cb.is_none()); + data.connect_cb = Some(cb); + } + + unsafe { + static BACKLOG: c_int = 128; // XXX should be configurable + match uvll::listen(self.native_handle(), BACKLOG, connection_cb) { + 0 => Ok(()), + n => Err(UvError(n)) + } + } + + extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) { + rtdebug!("connection_cb"); + let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle); + let cb = stream_watcher.get_watcher_data().connect_cb.get_ref(); + let status = status_to_maybe_uv_error(status); + (*cb)(stream_watcher, status); + } + } + + pub fn accept(&mut self, stream: StreamWatcher) { + let self_handle = self.native_handle() as *c_void; + let stream_handle = stream.native_handle() as *c_void; + assert_eq!(0, unsafe { uvll::accept(self_handle, stream_handle) } ); + } } impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher { @@ -300,28 +324,6 @@ impl TcpWatcher { } } - pub fn listen(&mut self, cb: ConnectionCallback) { - { - let data = self.get_watcher_data(); - assert!(data.connect_cb.is_none()); - data.connect_cb = Some(cb); - } - - unsafe { - static BACKLOG: c_int = 128; // XXX should be configurable - // XXX: This can probably fail - assert_eq!(0, uvll::listen(self.native_handle(), BACKLOG, connection_cb)); - } - - extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) { - rtdebug!("connection_cb"); - let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle); - let cb = stream_watcher.get_watcher_data().connect_cb.get_ref(); - let status = status_to_maybe_uv_error(status); - (*cb)(stream_watcher, status); - } - } - pub fn as_stream(&self) -> StreamWatcher { NativeHandle::from_native_handle(self.native_handle() as *uvll::uv_stream_t) } @@ -644,7 +646,8 @@ mod test { server_tcp_watcher.bind(addr); let loop_ = loop_; rtdebug!("listening"); - do server_tcp_watcher.listen |mut server_stream_watcher, status| { + let mut stream = server_tcp_watcher.as_stream(); + let res = do stream.listen |mut server_stream_watcher, status| { rtdebug!("listened!"); assert!(status.is_none()); let mut loop_ = loop_; @@ -678,7 +681,9 @@ mod test { } count_cell.put_back(count); } - } + }; + + assert!(res.is_ok()); let client_thread = do Thread::start { rtdebug!("starting client thread"); @@ -705,7 +710,7 @@ mod test { loop_.run(); loop_.close(); client_thread.join(); - } + }; } #[test] @@ -718,7 +723,8 @@ mod test { server_tcp_watcher.bind(addr); let loop_ = loop_; rtdebug!("listening"); - do server_tcp_watcher.listen |mut server_stream_watcher, status| { + let mut stream = server_tcp_watcher.as_stream(); + let res = do stream.listen |mut server_stream_watcher, status| { rtdebug!("listened!"); assert!(status.is_none()); let mut loop_ = loop_; @@ -754,7 +760,8 @@ mod test { } count_cell.put_back(count); } - } + }; + assert!(res.is_ok()); let client_thread = do Thread::start { rtdebug!("starting client thread"); diff --git a/src/libstd/rt/uv/pipe.rs b/src/libstd/rt/uv/pipe.rs index 1147c731a60..1cb86d4df2c 100644 --- a/src/libstd/rt/uv/pipe.rs +++ b/src/libstd/rt/uv/pipe.rs @@ -10,6 +10,7 @@ use prelude::*; use libc; +use c_str::CString; use rt::uv; use rt::uv::net; @@ -37,6 +38,54 @@ impl Pipe { net::StreamWatcher(**self as *uvll::uv_stream_t) } + #[fixed_stack_segment] #[inline(never)] + pub fn open(&mut self, file: libc::c_int) -> Result<(), uv::UvError> { + match unsafe { uvll::uv_pipe_open(self.native_handle(), file) } { + 0 => Ok(()), + n => Err(uv::UvError(n)) + } + } + + #[fixed_stack_segment] #[inline(never)] + pub fn bind(&mut self, name: &CString) -> Result<(), uv::UvError> { + do name.with_ref |name| { + match unsafe { uvll::uv_pipe_bind(self.native_handle(), name) } { + 0 => Ok(()), + n => Err(uv::UvError(n)) + } + } + } + + #[fixed_stack_segment] #[inline(never)] + pub fn connect(&mut self, name: &CString, cb: uv::ConnectionCallback) { + { + let data = self.get_watcher_data(); + assert!(data.connect_cb.is_none()); + data.connect_cb = Some(cb); + } + + let connect = net::ConnectRequest::new(); + let name = do name.with_ref |p| { p }; + + unsafe { + uvll::uv_pipe_connect(connect.native_handle(), + self.native_handle(), + name, + connect_cb) + } + + extern "C" fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) { + let connect_request: net::ConnectRequest = + uv::NativeHandle::from_native_handle(req); + let mut stream_watcher = connect_request.stream(); + connect_request.delete(); + + let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap(); + let status = uv::status_to_maybe_uv_error(status); + cb(stream_watcher, status); + } + } + pub fn close(self, cb: uv::NullCallback) { { let mut this = self; @@ -47,7 +96,7 @@ impl Pipe { unsafe { uvll::close(self.native_handle(), close_cb); } - extern fn close_cb(handle: *uvll::uv_pipe_t) { + extern "C" fn close_cb(handle: *uvll::uv_pipe_t) { let mut process: Pipe = uv::NativeHandle::from_native_handle(handle); process.get_watcher_data().close_cb.take_unwrap()(); process.drop_watcher_data(); diff --git a/src/libstd/rt/uv/process.rs b/src/libstd/rt/uv/process.rs index 176754de8f7..3c629a783cf 100644 --- a/src/libstd/rt/uv/process.rs +++ b/src/libstd/rt/uv/process.rs @@ -44,7 +44,7 @@ impl Process { /// occurred. pub fn spawn(&mut self, loop_: &uv::Loop, mut config: ProcessConfig, exit_cb: uv::ExitCallback) - -> Result<~[Option<UvPipeStream>], uv::UvError> + -> Result<~[Option<~UvPipeStream>], uv::UvError> { let cwd = config.cwd.map(|s| s.to_c_str()); @@ -144,7 +144,7 @@ impl Process { } unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t, - io: StdioContainer) -> Option<UvPipeStream> { + io: StdioContainer) -> Option<~UvPipeStream> { match io { Ignored => { uvll::set_stdio_container_flags(dst, uvll::STDIO_IGNORE); @@ -166,7 +166,7 @@ unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t, let handle = pipe.pipe.as_stream().native_handle(); uvll::set_stdio_container_flags(dst, flags); uvll::set_stdio_container_stream(dst, handle); - Some(pipe.bind()) + Some(~UvPipeStream::new(**pipe)) } } } diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index 8dd0f8a6b10..6888aa23e99 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -746,11 +746,11 @@ impl IoFactory for UvIoFactory { fn pipe_init(&mut self, ipc: bool) -> Result<~RtioUnboundPipeObject, IoError> { let home = get_handle_to_current_scheduler!(); - Ok(~UvUnboundPipe { pipe: Pipe::new(self.uv_loop(), ipc), home: home }) + Ok(~UvUnboundPipe::new(Pipe::new(self.uv_loop(), ipc), home)) } fn spawn(&mut self, config: ProcessConfig) - -> Result<(~RtioProcessObject, ~[Option<RtioPipeObject>]), IoError> + -> Result<(~RtioProcessObject, ~[Option<~RtioPipeObject>]), IoError> { // Sadly, we must create the UvProcess before we actually call uv_spawn // so that the exit_cb can close over it and notify it when the process @@ -801,6 +801,74 @@ impl IoFactory for UvIoFactory { } } } + + fn unix_bind<P: PathLike>(&mut self, path: &P) -> + Result<~RtioUnixListenerObject, IoError> { + let mut pipe = Pipe::new(self.uv_loop(), false); + match pipe.bind(&path.path_as_str(|s| s.to_c_str())) { + Ok(()) => { + let handle = get_handle_to_current_scheduler!(); + let pipe = UvUnboundPipe::new(pipe, handle); + Ok(~UvUnixListener::new(pipe)) + } + Err(e) => { + let scheduler: ~Scheduler = Local::take(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + do pipe.close { + let scheduler: ~Scheduler = Local::take(); + scheduler.resume_blocked_task_immediately( + task_cell.take()); + } + } + Err(uv_error_to_io_error(e)) + } + } + } + + fn unix_connect<P: PathLike>(&mut self, path: &P) -> + Result<~RtioPipeObject, IoError> + { + let scheduler: ~Scheduler = Local::take(); + let mut pipe = Pipe::new(self.uv_loop(), false); + let result_cell = Cell::new_empty(); + let result_cell_ptr: *Cell<Result<~RtioPipeObject, IoError>> = &result_cell; + + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + let cstr = do path.path_as_str |s| { s.to_c_str() }; + do pipe.connect(&cstr) |stream, err| { + let res = match err { + None => { + let handle = stream.native_handle(); + let pipe = NativeHandle::from_native_handle( + handle as *uvll::uv_pipe_t); + let home = get_handle_to_current_scheduler!(); + let pipe = UvUnboundPipe::new(pipe, home); + Ok(~UvPipeStream::new(pipe)) + } + Some(e) => { Err(uv_error_to_io_error(e)) } + }; + unsafe { (*result_cell_ptr).put_back(res); } + let scheduler: ~Scheduler = Local::take(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + } + + assert!(!result_cell.is_empty()); + let ret = result_cell.take(); + if ret.is_err() { + let scheduler: ~Scheduler = Local::take(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + do pipe.close { + let scheduler: ~Scheduler = Local::take(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + } + } + return ret; + } } pub struct UvTcpListener { @@ -843,9 +911,10 @@ impl RtioSocket for UvTcpListener { impl RtioTcpListener for UvTcpListener { fn listen(self) -> Result<~RtioTcpAcceptorObject, IoError> { do self.home_for_io_consume |self_| { - let mut acceptor = ~UvTcpAcceptor::new(self_); + let acceptor = ~UvTcpAcceptor::new(self_); let incoming = Cell::new(acceptor.incoming.clone()); - do acceptor.listener.watcher.listen |mut server, status| { + let mut stream = acceptor.listener.watcher.as_stream(); + let res = do stream.listen |mut server, status| { do incoming.with_mut_ref |incoming| { let inc = match status { Some(_) => Err(standard_error(OtherIoError)), @@ -860,7 +929,10 @@ impl RtioTcpListener for UvTcpListener { incoming.send(inc); } }; - Ok(acceptor) + match res { + Ok(()) => Ok(acceptor), + Err(e) => Err(uv_error_to_io_error(e)), + } } } } @@ -888,6 +960,17 @@ impl RtioSocket for UvTcpAcceptor { } } +fn accept_simultaneously(stream: StreamWatcher, a: int) -> Result<(), IoError> { + let r = unsafe { + uvll::tcp_simultaneous_accepts(stream.native_handle(), a as c_int) + }; + + match status_to_maybe_uv_error(r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } +} + impl RtioTcpAcceptor for UvTcpAcceptor { fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> { do self.home_for_io |self_| { @@ -897,27 +980,13 @@ impl RtioTcpAcceptor for UvTcpAcceptor { fn accept_simultaneously(&mut self) -> Result<(), IoError> { do self.home_for_io |self_| { - let r = unsafe { - uvll::tcp_simultaneous_accepts(self_.listener.watcher.native_handle(), 1 as c_int) - }; - - match status_to_maybe_uv_error(r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) - } + accept_simultaneously(self_.listener.watcher.as_stream(), 1) } } fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> { do self.home_for_io |self_| { - let r = unsafe { - uvll::tcp_simultaneous_accepts(self_.listener.watcher.native_handle(), 0 as c_int) - }; - - match status_to_maybe_uv_error(r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) - } + accept_simultaneously(self_.listener.watcher.as_stream(), 0) } } } @@ -994,6 +1063,12 @@ pub struct UvUnboundPipe { priv home: SchedHandle, } +impl UvUnboundPipe { + fn new(pipe: Pipe, home: SchedHandle) -> UvUnboundPipe { + UvUnboundPipe { pipe: pipe, home: home } + } +} + impl HomingIO for UvUnboundPipe { fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } } @@ -1013,18 +1088,12 @@ impl Drop for UvUnboundPipe { } } -impl UvUnboundPipe { - pub unsafe fn bind(~self) -> UvPipeStream { - UvPipeStream { inner: self } - } -} - pub struct UvPipeStream { - priv inner: ~UvUnboundPipe, + priv inner: UvUnboundPipe, } impl UvPipeStream { - pub fn new(inner: ~UvUnboundPipe) -> UvPipeStream { + pub fn new(inner: UvUnboundPipe) -> UvPipeStream { UvPipeStream { inner: inner } } } @@ -1612,6 +1681,84 @@ impl RtioProcess for UvProcess { } } +pub struct UvUnixListener { + priv inner: UvUnboundPipe +} + +impl HomingIO for UvUnixListener { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.inner.home() } +} + +impl UvUnixListener { + fn new(pipe: UvUnboundPipe) -> UvUnixListener { + UvUnixListener { inner: pipe } + } +} + +impl RtioUnixListener for UvUnixListener { + fn listen(self) -> Result<~RtioUnixAcceptorObject, IoError> { + do self.home_for_io_consume |self_| { + let acceptor = ~UvUnixAcceptor::new(self_); + let incoming = Cell::new(acceptor.incoming.clone()); + let mut stream = acceptor.listener.inner.pipe.as_stream(); + let res = do stream.listen |mut server, status| { + do incoming.with_mut_ref |incoming| { + let inc = match status { + Some(e) => Err(uv_error_to_io_error(e)), + None => { + let inc = Pipe::new(&server.event_loop(), false); + server.accept(inc.as_stream()); + let home = get_handle_to_current_scheduler!(); + let pipe = UvUnboundPipe::new(inc, home); + Ok(~UvPipeStream::new(pipe)) + } + }; + incoming.send(inc); + } + }; + match res { + Ok(()) => Ok(acceptor), + Err(e) => Err(uv_error_to_io_error(e)), + } + } + } +} + +pub struct UvUnixAcceptor { + listener: UvUnixListener, + incoming: Tube<Result<~RtioPipeObject, IoError>>, +} + +impl HomingIO for UvUnixAcceptor { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() } +} + +impl UvUnixAcceptor { + fn new(listener: UvUnixListener) -> UvUnixAcceptor { + UvUnixAcceptor { listener: listener, incoming: Tube::new() } + } +} + +impl RtioUnixAcceptor for UvUnixAcceptor { + fn accept(&mut self) -> Result<~RtioPipeObject, IoError> { + do self.home_for_io |self_| { + self_.incoming.recv() + } + } + + fn accept_simultaneously(&mut self) -> Result<(), IoError> { + do self.home_for_io |self_| { + accept_simultaneously(self_.listener.inner.pipe.as_stream(), 1) + } + } + + fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> { + do self.home_for_io |self_| { + accept_simultaneously(self_.listener.inner.pipe.as_stream(), 0) + } + } +} + #[test] fn test_simple_io_no_connect() { do run_in_mt_newsched_task { diff --git a/src/libstd/rt/uv/uvll.rs b/src/libstd/rt/uv/uvll.rs index 367585b0f0e..eb770d08070 100644 --- a/src/libstd/rt/uv/uvll.rs +++ b/src/libstd/rt/uv/uvll.rs @@ -1102,4 +1102,23 @@ extern { fn rust_set_stdio_container_stream(c: *uv_stdio_container_t, stream: *uv_stream_t); fn rust_uv_pipe_init(loop_ptr: *c_void, p: *uv_pipe_t, ipc: c_int) -> c_int; + + pub fn uv_pipe_open(pipe: *uv_pipe_t, file: c_int) -> c_int; + pub fn uv_pipe_bind(pipe: *uv_pipe_t, name: *c_char) -> c_int; + pub fn uv_pipe_connect(req: *uv_connect_t, handle: *uv_pipe_t, + name: *c_char, cb: uv_connect_cb); + + // These should all really be constants... + #[rust_stack] pub fn rust_SOCK_STREAM() -> c_int; + #[rust_stack] pub fn rust_SOCK_DGRAM() -> c_int; + #[rust_stack] pub fn rust_SOCK_RAW() -> c_int; + #[rust_stack] pub fn rust_IPPROTO_UDP() -> c_int; + #[rust_stack] pub fn rust_IPPROTO_TCP() -> c_int; + #[rust_stack] pub fn rust_AI_ADDRCONFIG() -> c_int; + #[rust_stack] pub fn rust_AI_ALL() -> c_int; + #[rust_stack] pub fn rust_AI_CANONNAME() -> c_int; + #[rust_stack] pub fn rust_AI_NUMERICHOST() -> c_int; + #[rust_stack] pub fn rust_AI_NUMERICSERV() -> c_int; + #[rust_stack] pub fn rust_AI_PASSIVE() -> c_int; + #[rust_stack] pub fn rust_AI_V4MAPPED() -> c_int; } |
