about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--src/libnative/io/net.rs93
-rw-r--r--src/libnative/io/pipe_unix.rs88
-rw-r--r--src/libnative/io/pipe_windows.rs6
-rw-r--r--src/libnative/io/process.rs2
-rw-r--r--src/libnative/io/util.rs13
-rw-r--r--src/librustrt/rtio.rs4
-rw-r--r--src/libstd/io/net/tcp.rs132
-rw-r--r--src/libstd/io/net/unix.rs95
-rw-r--r--src/test/run-pass/tcp-accept-stress.rs94
9 files changed, 481 insertions, 46 deletions
diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs
index 2255578ba80..7a8a363a0a3 100644
--- a/src/libnative/io/net.rs
+++ b/src/libnative/io/net.rs
@@ -14,10 +14,13 @@ use std::mem;
 use std::rt::mutex;
 use std::rt::rtio;
 use std::rt::rtio::{IoResult, IoError};
+use std::sync::atomics;
 
 use super::{retry, keep_going};
 use super::c;
 use super::util;
+use super::file::FileDesc;
+use super::process;
 
 ////////////////////////////////////////////////////////////////////////////////
 // sockaddr and misc bindings
@@ -479,9 +482,26 @@ impl TcpListener {
     pub fn fd(&self) -> sock_t { self.inner.fd }
 
     pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> {
+        try!(util::set_nonblocking(self.fd(), true));
         match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
             -1 => Err(last_error()),
-            _ => Ok(TcpAcceptor { listener: self, deadline: 0 })
+
+            #[cfg(unix)]
+            _ => {
+                let (reader, writer) = try!(process::pipe());
+                try!(util::set_nonblocking(reader.fd(), true));
+                try!(util::set_nonblocking(writer.fd(), true));
+                try!(util::set_nonblocking(self.fd(), true));
+                Ok(TcpAcceptor {
+                    inner: Arc::new(AcceptorInner {
+                        listener: self,
+                        reader: reader,
+                        writer: writer,
+                        closed: atomics::AtomicBool::new(false),
+                    }),
+                    deadline: 0,
+                })
+            }
         }
     }
 }
@@ -502,31 +522,46 @@ impl rtio::RtioSocket for TcpListener {
 }
 
 pub struct TcpAcceptor {
-    listener: TcpListener,
+    inner: Arc<AcceptorInner>,
     deadline: u64,
 }
 
+#[cfg(unix)]
+struct AcceptorInner {
+    listener: TcpListener,
+    reader: FileDesc,
+    writer: FileDesc,
+    closed: atomics::AtomicBool,
+}
+
 impl TcpAcceptor {
-    pub fn fd(&self) -> sock_t { self.listener.fd() }
+    pub fn fd(&self) -> sock_t { self.inner.listener.fd() }
 
+    #[cfg(unix)]
     pub fn native_accept(&mut self) -> IoResult<TcpStream> {
-        if self.deadline != 0 {
-            try!(util::await(self.fd(), Some(self.deadline), util::Readable));
-        }
-        unsafe {
-            let mut storage: libc::sockaddr_storage = mem::zeroed();
-            let storagep = &mut storage as *mut libc::sockaddr_storage;
-            let size = mem::size_of::<libc::sockaddr_storage>();
-            let mut size = size as libc::socklen_t;
-            match retry(|| {
-                libc::accept(self.fd(),
-                             storagep as *mut libc::sockaddr,
-                             &mut size as *mut libc::socklen_t) as libc::c_int
-            }) as sock_t {
-                -1 => Err(last_error()),
-                fd => Ok(TcpStream::new(Inner::new(fd))),
+        let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
+
+        while !self.inner.closed.load(atomics::SeqCst) {
+            unsafe {
+                let mut storage: libc::sockaddr_storage = mem::zeroed();
+                let storagep = &mut storage as *mut libc::sockaddr_storage;
+                let size = mem::size_of::<libc::sockaddr_storage>();
+                let mut size = size as libc::socklen_t;
+                match retry(|| {
+                    libc::accept(self.fd(),
+                                 storagep as *mut libc::sockaddr,
+                                 &mut size as *mut libc::socklen_t) as libc::c_int
+                }) as sock_t {
+                    -1 if util::wouldblock() => {}
+                    -1 => return Err(last_error()),
+                    fd => return Ok(TcpStream::new(Inner::new(fd))),
+                }
             }
+            try!(util::await([self.fd(), self.inner.reader.fd()],
+                             deadline, util::Readable));
         }
+
+        Err(util::eof())
     }
 }
 
@@ -546,6 +581,24 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
     fn set_timeout(&mut self, timeout: Option<u64>) {
         self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
     }
+
+    fn clone(&self) -> Box<rtio::RtioTcpAcceptor + Send> {
+        box TcpAcceptor {
+            inner: self.inner.clone(),
+            deadline: 0,
+        } as Box<rtio::RtioTcpAcceptor + Send>
+    }
+
+    #[cfg(unix)]
+    fn close_accept(&mut self) -> IoResult<()> {
+        self.inner.closed.store(true, atomics::SeqCst);
+        let mut fd = FileDesc::new(self.inner.writer.fd(), false);
+        match fd.inner_write([0]) {
+            Ok(..) => Ok(()),
+            Err(..) if util::wouldblock() => Ok(()),
+            Err(e) => Err(e),
+        }
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -817,7 +870,7 @@ pub fn read<T>(fd: sock_t,
             // With a timeout, first we wait for the socket to become
             // readable using select(), specifying the relevant timeout for
             // our previously set deadline.
-            try!(util::await(fd, deadline, util::Readable));
+            try!(util::await([fd], deadline, util::Readable));
 
             // At this point, we're still within the timeout, and we've
             // determined that the socket is readable (as returned by
@@ -871,7 +924,7 @@ pub fn write<T>(fd: sock_t,
         while written < buf.len() && (write_everything || written == 0) {
             // As with read(), first wait for the socket to be ready for
             // the I/O operation.
-            match util::await(fd, deadline, util::Writable) {
+            match util::await([fd], deadline, util::Writable) {
                 Err(ref e) if e.code == libc::EOF as uint && written > 0 => {
                     assert!(deadline.is_some());
                     return Err(util::short_write(written, "short write"))
diff --git a/src/libnative/io/pipe_unix.rs b/src/libnative/io/pipe_unix.rs
index 895b8b5929c..4ad8383e6f8 100644
--- a/src/libnative/io/pipe_unix.rs
+++ b/src/libnative/io/pipe_unix.rs
@@ -15,12 +15,14 @@ use std::mem;
 use std::rt::mutex;
 use std::rt::rtio;
 use std::rt::rtio::{IoResult, IoError};
+use std::sync::atomics;
 
 use super::retry;
 use super::net;
 use super::util;
 use super::c;
-use super::file::fd_t;
+use super::process;
+use super::file::{fd_t, FileDesc};
 
 fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> {
     match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } {
@@ -225,7 +227,23 @@ impl UnixListener {
     pub fn native_listen(self, backlog: int) -> IoResult<UnixAcceptor> {
         match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
             -1 => Err(super::last_error()),
-            _ => Ok(UnixAcceptor { listener: self, deadline: 0 })
+
+            #[cfg(unix)]
+            _ => {
+                let (reader, writer) = try!(process::pipe());
+                try!(util::set_nonblocking(reader.fd(), true));
+                try!(util::set_nonblocking(writer.fd(), true));
+                try!(util::set_nonblocking(self.fd(), true));
+                Ok(UnixAcceptor {
+                    inner: Arc::new(AcceptorInner {
+                        listener: self,
+                        reader: reader,
+                        writer: writer,
+                        closed: atomics::AtomicBool::new(false),
+                    }),
+                    deadline: 0,
+                })
+            }
         }
     }
 }
@@ -240,29 +258,45 @@ impl rtio::RtioUnixListener for UnixListener {
 }
 
 pub struct UnixAcceptor {
-    listener: UnixListener,
+    inner: Arc<AcceptorInner>,
     deadline: u64,
 }
 
+#[cfg(unix)]
+struct AcceptorInner {
+    listener: UnixListener,
+    reader: FileDesc,
+    writer: FileDesc,
+    closed: atomics::AtomicBool,
+}
+
 impl UnixAcceptor {
-    fn fd(&self) -> fd_t { self.listener.fd() }
+    fn fd(&self) -> fd_t { self.inner.listener.fd() }
 
     pub fn native_accept(&mut self) -> IoResult<UnixStream> {
-        if self.deadline != 0 {
-            try!(util::await(self.fd(), Some(self.deadline), util::Readable));
-        }
-        let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
-        let storagep = &mut storage as *mut libc::sockaddr_storage;
-        let size = mem::size_of::<libc::sockaddr_storage>();
-        let mut size = size as libc::socklen_t;
-        match retry(|| unsafe {
-            libc::accept(self.fd(),
-                         storagep as *mut libc::sockaddr,
-                         &mut size as *mut libc::socklen_t) as libc::c_int
-        }) {
-            -1 => Err(super::last_error()),
-            fd => Ok(UnixStream::new(Arc::new(Inner::new(fd))))
+        let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
+
+        while !self.inner.closed.load(atomics::SeqCst) {
+            unsafe {
+                let mut storage: libc::sockaddr_storage = mem::zeroed();
+                let storagep = &mut storage as *mut libc::sockaddr_storage;
+                let size = mem::size_of::<libc::sockaddr_storage>();
+                let mut size = size as libc::socklen_t;
+                match retry(|| {
+                    libc::accept(self.fd(),
+                                 storagep as *mut libc::sockaddr,
+                                 &mut size as *mut libc::socklen_t) as libc::c_int
+                }) {
+                    -1 if util::wouldblock() => {}
+                    -1 => return Err(super::last_error()),
+                    fd => return Ok(UnixStream::new(Arc::new(Inner::new(fd)))),
+                }
+            }
+            try!(util::await([self.fd(), self.inner.reader.fd()],
+                             deadline, util::Readable));
         }
+
+        Err(util::eof())
     }
 }
 
@@ -273,6 +307,24 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor {
     fn set_timeout(&mut self, timeout: Option<u64>) {
         self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
     }
+
+    fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> {
+        box UnixAcceptor {
+            inner: self.inner.clone(),
+            deadline: 0,
+        } as Box<rtio::RtioUnixAcceptor + Send>
+    }
+
+    #[cfg(unix)]
+    fn close_accept(&mut self) -> IoResult<()> {
+        self.inner.closed.store(true, atomics::SeqCst);
+        let mut fd = FileDesc::new(self.inner.writer.fd(), false);
+        match fd.inner_write([0]) {
+            Ok(..) => Ok(()),
+            Err(..) if util::wouldblock() => Ok(()),
+            Err(e) => Err(e),
+        }
+    }
 }
 
 impl Drop for UnixListener {
diff --git a/src/libnative/io/pipe_windows.rs b/src/libnative/io/pipe_windows.rs
index 717915e5d23..6ad51ee586f 100644
--- a/src/libnative/io/pipe_windows.rs
+++ b/src/libnative/io/pipe_windows.rs
@@ -99,10 +99,10 @@ use super::c;
 use super::util;
 use super::file::to_utf16;
 
-struct Event(libc::HANDLE);
+pub struct Event(libc::HANDLE);
 
 impl Event {
-    fn new(manual_reset: bool, initial_state: bool) -> IoResult<Event> {
+    pub fn new(manual_reset: bool, initial_state: bool) -> IoResult<Event> {
         let event = unsafe {
             libc::CreateEventW(ptr::mut_null(),
                                manual_reset as libc::BOOL,
@@ -116,7 +116,7 @@ impl Event {
         }
     }
 
-    fn handle(&self) -> libc::HANDLE { let Event(handle) = *self; handle }
+    pub fn handle(&self) -> libc::HANDLE { let Event(handle) = *self; handle }
 }
 
 impl Drop for Event {
diff --git a/src/libnative/io/process.rs b/src/libnative/io/process.rs
index d1b28854157..b8ec0cd5496 100644
--- a/src/libnative/io/process.rs
+++ b/src/libnative/io/process.rs
@@ -191,7 +191,7 @@ impl Drop for Process {
     }
 }
 
-fn pipe() -> IoResult<(file::FileDesc, file::FileDesc)> {
+pub fn pipe() -> IoResult<(file::FileDesc, file::FileDesc)> {
     #[cfg(unix)] use libc::EMFILE as ERROR;
     #[cfg(windows)] use libc::WSAEMFILE as ERROR;
     struct Closer { fd: libc::c_int }
diff --git a/src/libnative/io/util.rs b/src/libnative/io/util.rs
index 356805d91de..aec29bc2d03 100644
--- a/src/libnative/io/util.rs
+++ b/src/libnative/io/util.rs
@@ -9,6 +9,7 @@
 // except according to those terms.
 
 use libc;
+use std::cmp;
 use std::mem;
 use std::os;
 use std::ptr;
@@ -166,10 +167,15 @@ pub fn connect_timeout(fd: net::sock_t,
     }
 }
 
-pub fn await(fd: net::sock_t, deadline: Option<u64>,
+pub fn await(fds: &[net::sock_t], deadline: Option<u64>,
              status: SocketStatus) -> IoResult<()> {
     let mut set: c::fd_set = unsafe { mem::zeroed() };
-    c::fd_set(&mut set, fd);
+    let mut max = 0;
+    for &fd in fds.iter() {
+        c::fd_set(&mut set, fd);
+        max = cmp::max(max, fd + 1);
+    }
+
     let (read, write) = match status {
         Readable => (&mut set as *mut _, ptr::mut_null()),
         Writable => (ptr::mut_null(), &mut set as *mut _),
@@ -188,8 +194,7 @@ pub fn await(fd: net::sock_t, deadline: Option<u64>,
                 &mut tv as *mut _
             }
         };
-        let n = if cfg!(windows) {1} else {fd as libc::c_int + 1};
-        let r = unsafe { c::select(n, read, write, ptr::mut_null(), tvp) };
+        let r = unsafe { c::select(max, read, write, ptr::mut_null(), tvp) };
         r
     }) {
         -1 => Err(last_error()),
diff --git a/src/librustrt/rtio.rs b/src/librustrt/rtio.rs
index 6525adf07f7..261d544a241 100644
--- a/src/librustrt/rtio.rs
+++ b/src/librustrt/rtio.rs
@@ -246,6 +246,8 @@ pub trait RtioTcpAcceptor : RtioSocket {
     fn accept_simultaneously(&mut self) -> IoResult<()>;
     fn dont_accept_simultaneously(&mut self) -> IoResult<()>;
     fn set_timeout(&mut self, timeout: Option<u64>);
+    fn clone(&self) -> Box<RtioTcpAcceptor + Send>;
+    fn close_accept(&mut self) -> IoResult<()>;
 }
 
 pub trait RtioTcpStream : RtioSocket {
@@ -335,6 +337,8 @@ pub trait RtioUnixListener {
 pub trait RtioUnixAcceptor {
     fn accept(&mut self) -> IoResult<Box<RtioPipe + Send>>;
     fn set_timeout(&mut self, timeout: Option<u64>);
+    fn clone(&self) -> Box<RtioUnixAcceptor + Send>;
+    fn close_accept(&mut self) -> IoResult<()>;
 }
 
 pub trait RtioTTY {
diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs
index 7055b9d7a47..ebc3940c16f 100644
--- a/src/libstd/io/net/tcp.rs
+++ b/src/libstd/io/net/tcp.rs
@@ -442,6 +442,54 @@ impl TcpAcceptor {
     #[experimental = "the type of the argument and name of this function are \
                       subject to change"]
     pub fn set_timeout(&mut self, ms: Option<u64>) { self.obj.set_timeout(ms); }
+
+    /// Closes the accepting capabilities of this acceptor.
+    ///
+    /// This function is similar to `TcpStream`'s `close_{read,write}` methods
+    /// in that it will affect *all* cloned handles of this acceptor's original
+    /// handle.
+    ///
+    /// Once this function succeeds, all future calls to `accept` will return
+    /// immediately with an error, preventing all future calls to accept. The
+    /// underlying socket will not be relinquished back to the OS until all
+    /// acceptors have been deallocated.
+    ///
+    /// This is useful for waking up a thread in an accept loop to indicate that
+    /// it should exit.
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// # #![allow(experimental)]
+    /// use std::io::TcpListener;
+    /// use std::io::{Listener, Acceptor, TimedOut};
+    ///
+    /// let mut a = TcpListener::bind("127.0.0.1", 8482).listen().unwrap();
+    /// let a2 = a.clone();
+    ///
+    /// spawn(proc() {
+    ///     let mut a2 = a2;
+    ///     for socket in a2.incoming() {
+    ///         match socket {
+    ///             Ok(s) => { /* handle s */ }
+    ///             Err(ref e) if e.kind == EndOfFile => break, // closed
+    ///             Err(e) => fail!("unexpected error: {}", e),
+    ///         }
+    ///     }
+    /// });
+    ///
+    /// # fn wait_for_sigint() {}
+    /// // Now that our accept loop is running, wait for the program to be
+    /// // requested to exit.
+    /// wait_for_sigint();
+    ///
+    /// // Signal our accept loop to exit
+    /// assert!(a.close_accept().is_ok());
+    /// ```
+    #[experimental]
+    pub fn close_accept(&mut self) -> IoResult<()> {
+        self.obj.close_accept().map_err(IoError::from_rtio_error)
+    }
 }
 
 impl Acceptor<TcpStream> for TcpAcceptor {
@@ -453,6 +501,25 @@ impl Acceptor<TcpStream> for TcpAcceptor {
     }
 }
 
+impl Clone for TcpAcceptor {
+    /// Creates a new handle to this TCP acceptor, allowing for simultaneous
+    /// accepts.
+    ///
+    /// The underlying TCP acceptor will not be closed until all handles to the
+    /// acceptor have been deallocated. Incoming connections will be received on
+    /// at most once acceptor, the same connection will not be accepted twice.
+    ///
+    /// The `close_accept` method will shut down *all* acceptors cloned from the
+    /// same original acceptor, whereas the `set_timeout` method only affects
+    /// the selector that it is called on.
+    ///
+    /// This function is useful for creating a handle to invoke `close_accept`
+    /// on to wake up any other task blocked in `accept`.
+    fn clone(&self) -> TcpAcceptor {
+        TcpAcceptor { obj: self.obj.clone() }
+    }
+}
+
 #[cfg(test)]
 #[allow(experimental)]
 mod test {
@@ -1411,4 +1478,69 @@ mod test {
         rxdone.recv();
         rxdone.recv();
     })
+
+    iotest!(fn clone_accept_smoke() {
+        let addr = next_test_ip4();
+        let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port);
+        let mut a = l.listen().unwrap();
+        let mut a2 = a.clone();
+
+        spawn(proc() {
+            let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port);
+        });
+        spawn(proc() {
+            let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port);
+        });
+
+        assert!(a.accept().is_ok());
+        assert!(a2.accept().is_ok());
+    })
+
+    iotest!(fn clone_accept_concurrent() {
+        let addr = next_test_ip4();
+        let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port);
+        let a = l.listen().unwrap();
+        let a2 = a.clone();
+
+        let (tx, rx) = channel();
+        let tx2 = tx.clone();
+
+        spawn(proc() { let mut a = a; tx.send(a.accept()) });
+        spawn(proc() { let mut a = a2; tx2.send(a.accept()) });
+
+        spawn(proc() {
+            let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port);
+        });
+        spawn(proc() {
+            let _ = TcpStream::connect(addr.ip.to_string().as_slice(), addr.port);
+        });
+
+        assert!(rx.recv().is_ok());
+        assert!(rx.recv().is_ok());
+    })
+
+    iotest!(fn close_accept_smoke() {
+        let addr = next_test_ip4();
+        let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port);
+        let mut a = l.listen().unwrap();
+
+        a.close_accept().unwrap();
+        assert_eq!(a.accept().err().unwrap().kind, EndOfFile);
+    })
+
+    iotest!(fn close_accept_concurrent() {
+        let addr = next_test_ip4();
+        let l = TcpListener::bind(addr.ip.to_string().as_slice(), addr.port);
+        let a = l.listen().unwrap();
+        let mut a2 = a.clone();
+
+        let (tx, rx) = channel();
+        spawn(proc() {
+            let mut a = a;
+            tx.send(a.accept());
+        });
+        a2.close_accept().unwrap();
+
+        assert_eq!(rx.recv().err().unwrap().kind, EndOfFile);
+    })
 }
diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs
index eb251075418..74f024a844e 100644
--- a/src/libstd/io/net/unix.rs
+++ b/src/libstd/io/net/unix.rs
@@ -212,6 +212,15 @@ impl UnixAcceptor {
     pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
         self.obj.set_timeout(timeout_ms)
     }
+
+    /// Closes the accepting capabilities of this acceptor.
+    ///
+    /// This function has the same semantics as `TcpAcceptor::close_accept`, and
+    /// more information can be found in that documentation.
+    #[experimental]
+    pub fn close_accept(&mut self) -> IoResult<()> {
+        self.obj.close_accept().map_err(IoError::from_rtio_error)
+    }
 }
 
 impl Acceptor<UnixStream> for UnixAcceptor {
@@ -222,6 +231,25 @@ impl Acceptor<UnixStream> for UnixAcceptor {
     }
 }
 
+impl Clone for UnixAcceptor {
+    /// Creates a new handle to this unix acceptor, allowing for simultaneous
+    /// accepts.
+    ///
+    /// The underlying unix acceptor will not be closed until all handles to the
+    /// acceptor have been deallocated. Incoming connections will be received on
+    /// at most once acceptor, the same connection will not be accepted twice.
+    ///
+    /// The `close_accept` method will shut down *all* acceptors cloned from the
+    /// same original acceptor, whereas the `set_timeout` method only affects
+    /// the selector that it is called on.
+    ///
+    /// This function is useful for creating a handle to invoke `close_accept`
+    /// on to wake up any other task blocked in `accept`.
+    fn clone(&self) -> UnixAcceptor {
+        UnixAcceptor { obj: self.obj.clone() }
+    }
+}
+
 #[cfg(test)]
 #[allow(experimental)]
 mod tests {
@@ -702,4 +730,71 @@ mod tests {
 
         rx2.recv();
     })
+
+    iotest!(fn clone_accept_smoke() {
+        let addr = next_test_unix();
+        let l = UnixListener::bind(&addr);
+        let mut a = l.listen().unwrap();
+        let mut a2 = a.clone();
+
+        let addr2 = addr.clone();
+        spawn(proc() {
+            let _ = UnixStream::connect(&addr2);
+        });
+        spawn(proc() {
+            let _ = UnixStream::connect(&addr);
+        });
+
+        assert!(a.accept().is_ok());
+        assert!(a2.accept().is_ok());
+    })
+
+    iotest!(fn clone_accept_concurrent() {
+        let addr = next_test_unix();
+        let l = UnixListener::bind(&addr);
+        let a = l.listen().unwrap();
+        let a2 = a.clone();
+
+        let (tx, rx) = channel();
+        let tx2 = tx.clone();
+
+        spawn(proc() { let mut a = a; tx.send(a.accept()) });
+        spawn(proc() { let mut a = a2; tx2.send(a.accept()) });
+
+        let addr2 = addr.clone();
+        spawn(proc() {
+            let _ = UnixStream::connect(&addr2);
+        });
+        spawn(proc() {
+            let _ = UnixStream::connect(&addr);
+        });
+
+        assert!(rx.recv().is_ok());
+        assert!(rx.recv().is_ok());
+    })
+
+    iotest!(fn close_accept_smoke() {
+        let addr = next_test_unix();
+        let l = UnixListener::bind(&addr);
+        let mut a = l.listen().unwrap();
+
+        a.close_accept().unwrap();
+        assert_eq!(a.accept().err().unwrap().kind, EndOfFile);
+    })
+
+    iotest!(fn close_accept_concurrent() {
+        let addr = next_test_unix();
+        let l = UnixListener::bind(&addr);
+        let a = l.listen().unwrap();
+        let mut a2 = a.clone();
+
+        let (tx, rx) = channel();
+        spawn(proc() {
+            let mut a = a;
+            tx.send(a.accept());
+        });
+        a2.close_accept().unwrap();
+
+        assert_eq!(rx.recv().err().unwrap().kind, EndOfFile);
+    })
 }
diff --git a/src/test/run-pass/tcp-accept-stress.rs b/src/test/run-pass/tcp-accept-stress.rs
new file mode 100644
index 00000000000..3e420e45cfc
--- /dev/null
+++ b/src/test/run-pass/tcp-accept-stress.rs
@@ -0,0 +1,94 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+#![feature(phase)]
+
+#[phase(plugin)]
+extern crate green;
+extern crate native;
+
+use std::io::{TcpListener, Listener, Acceptor, EndOfFile, TcpStream};
+use std::sync::{atomics, Arc};
+use std::task::TaskBuilder;
+use native::NativeTaskBuilder;
+
+static N: uint = 8;
+static M: uint = 100;
+
+green_start!(main)
+
+fn main() {
+    test();
+
+    let (tx, rx) = channel();
+    TaskBuilder::new().native().spawn(proc() {
+        tx.send(test());
+    });
+    rx.recv();
+}
+
+fn test() {
+    let mut l = TcpListener::bind("127.0.0.1", 0).unwrap();
+    let addr = l.socket_name().unwrap();
+    let mut a = l.listen().unwrap();
+    let cnt = Arc::new(atomics::AtomicUint::new(0));
+
+    let (tx, rx) = channel();
+    for _ in range(0, N) {
+        let a = a.clone();
+        let cnt = cnt.clone();
+        let tx = tx.clone();
+        spawn(proc() {
+            let mut a = a;
+            let mut mycnt = 0u;
+            loop {
+                match a.accept() {
+                    Ok(..) => {
+                        mycnt += 1;
+                        if cnt.fetch_add(1, atomics::SeqCst) == N * M - 1 {
+                            break
+                        }
+                    }
+                    Err(ref e) if e.kind == EndOfFile => break,
+                    Err(e) => fail!("{}", e),
+                }
+            }
+            assert!(mycnt > 0);
+            tx.send(());
+        });
+    }
+
+    for _ in range(0, N) {
+        let tx = tx.clone();
+        spawn(proc() {
+            for _ in range(0, M) {
+                let _s = TcpStream::connect(addr.ip.to_string().as_slice(),
+                                            addr.port).unwrap();
+            }
+            tx.send(());
+        });
+    }
+
+    // wait for senders
+    assert_eq!(rx.iter().take(N).count(), N);
+
+    // wait for one acceptor to die
+    let _ = rx.recv();
+
+    // Notify other receivers should die
+    a.close_accept().unwrap();
+
+    // wait for receivers
+    assert_eq!(rx.iter().take(N - 1).count(), N - 1);
+
+    // Everything should have been accepted.
+    assert_eq!(cnt.load(atomics::SeqCst), N * M);
+}
+