diff options
Diffstat (limited to 'src/libnative/io/pipe_unix.rs')
| -rw-r--r-- | src/libnative/io/pipe_unix.rs | 88 | 
1 files changed, 70 insertions, 18 deletions
diff --git a/src/libnative/io/pipe_unix.rs b/src/libnative/io/pipe_unix.rs index 895b8b5929c..a3564dfe2cc 100644 --- a/src/libnative/io/pipe_unix.rs +++ b/src/libnative/io/pipe_unix.rs @@ -15,12 +15,14 @@ use std::mem; use std::rt::mutex; use std::rt::rtio; use std::rt::rtio::{IoResult, IoError}; +use std::sync::atomic; use super::retry; use super::net; use super::util; use super::c; -use super::file::fd_t; +use super::process; +use super::file::{fd_t, FileDesc}; fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> { match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } { @@ -225,7 +227,23 @@ impl UnixListener { pub fn native_listen(self, backlog: int) -> IoResult<UnixAcceptor> { match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } { -1 => Err(super::last_error()), - _ => Ok(UnixAcceptor { listener: self, deadline: 0 }) + + #[cfg(unix)] + _ => { + let (reader, writer) = try!(process::pipe()); + try!(util::set_nonblocking(reader.fd(), true)); + try!(util::set_nonblocking(writer.fd(), true)); + try!(util::set_nonblocking(self.fd(), true)); + Ok(UnixAcceptor { + inner: Arc::new(AcceptorInner { + listener: self, + reader: reader, + writer: writer, + closed: atomic::AtomicBool::new(false), + }), + deadline: 0, + }) + } } } } @@ -240,29 +258,45 @@ impl rtio::RtioUnixListener for UnixListener { } pub struct UnixAcceptor { - listener: UnixListener, + inner: Arc<AcceptorInner>, deadline: u64, } +#[cfg(unix)] +struct AcceptorInner { + listener: UnixListener, + reader: FileDesc, + writer: FileDesc, + closed: atomic::AtomicBool, +} + impl UnixAcceptor { - fn fd(&self) -> fd_t { self.listener.fd() } + fn fd(&self) -> fd_t { self.inner.listener.fd() } pub fn native_accept(&mut self) -> IoResult<UnixStream> { - if self.deadline != 0 { - try!(util::await(self.fd(), Some(self.deadline), util::Readable)); - } - let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() }; - let storagep = &mut storage as *mut libc::sockaddr_storage; - let size = mem::size_of::<libc::sockaddr_storage>(); - let mut size = size as libc::socklen_t; - match retry(|| unsafe { - libc::accept(self.fd(), - storagep as *mut libc::sockaddr, - &mut size as *mut libc::socklen_t) as libc::c_int - }) { - -1 => Err(super::last_error()), - fd => Ok(UnixStream::new(Arc::new(Inner::new(fd)))) + let deadline = if self.deadline == 0 {None} else {Some(self.deadline)}; + + while !self.inner.closed.load(atomic::SeqCst) { + unsafe { + let mut storage: libc::sockaddr_storage = mem::zeroed(); + let storagep = &mut storage as *mut libc::sockaddr_storage; + let size = mem::size_of::<libc::sockaddr_storage>(); + let mut size = size as libc::socklen_t; + match retry(|| { + libc::accept(self.fd(), + storagep as *mut libc::sockaddr, + &mut size as *mut libc::socklen_t) as libc::c_int + }) { + -1 if util::wouldblock() => {} + -1 => return Err(super::last_error()), + fd => return Ok(UnixStream::new(Arc::new(Inner::new(fd)))), + } + } + try!(util::await([self.fd(), self.inner.reader.fd()], + deadline, util::Readable)); } + + Err(util::eof()) } } @@ -273,6 +307,24 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor { fn set_timeout(&mut self, timeout: Option<u64>) { self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); } + + fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> { + box UnixAcceptor { + inner: self.inner.clone(), + deadline: 0, + } as Box<rtio::RtioUnixAcceptor + Send> + } + + #[cfg(unix)] + fn close_accept(&mut self) -> IoResult<()> { + self.inner.closed.store(true, atomic::SeqCst); + let mut fd = FileDesc::new(self.inner.writer.fd(), false); + match fd.inner_write([0]) { + Ok(..) => Ok(()), + Err(..) if util::wouldblock() => Ok(()), + Err(e) => Err(e), + } + } } impl Drop for UnixListener {  | 
