about summary refs log tree commit diff
path: root/src/libstd/io/net/tcp.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstd/io/net/tcp.rs')
-rw-r--r--src/libstd/io/net/tcp.rs725
1 files changed, 725 insertions, 0 deletions
diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs
new file mode 100644
index 00000000000..a59d652d739
--- /dev/null
+++ b/src/libstd/io/net/tcp.rs
@@ -0,0 +1,725 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use option::{Option, Some, None};
+use result::{Ok, Err};
+use io::net::ip::SocketAddr;
+use io::{Reader, Writer, Listener, Acceptor};
+use io::{io_error, EndOfFile};
+use rt::rtio::{IoFactory, with_local_io,
+               RtioSocket, RtioTcpListener, RtioTcpAcceptor, RtioTcpStream};
+
+pub struct TcpStream {
+    priv obj: ~RtioTcpStream
+}
+
+impl TcpStream {
+    fn new(s: ~RtioTcpStream) -> TcpStream {
+        TcpStream { obj: s }
+    }
+
+    pub fn connect(addr: SocketAddr) -> Option<TcpStream> {
+        do with_local_io |io| {
+            match io.tcp_connect(addr) {
+                Ok(s) => Some(TcpStream::new(s)),
+                Err(ioerr) => {
+                    io_error::cond.raise(ioerr);
+                    None
+                }
+            }
+        }
+    }
+
+    pub fn peer_name(&mut self) -> Option<SocketAddr> {
+        match self.obj.peer_name() {
+            Ok(pn) => Some(pn),
+            Err(ioerr) => {
+                debug!("failed to get peer name: {:?}", ioerr);
+                io_error::cond.raise(ioerr);
+                None
+            }
+        }
+    }
+
+    pub fn socket_name(&mut self) -> Option<SocketAddr> {
+        match self.obj.socket_name() {
+            Ok(sn) => Some(sn),
+            Err(ioerr) => {
+                debug!("failed to get socket name: {:?}", ioerr);
+                io_error::cond.raise(ioerr);
+                None
+            }
+        }
+    }
+}
+
+impl Reader for TcpStream {
+    fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
+        match self.obj.read(buf) {
+            Ok(read) => Some(read),
+            Err(ioerr) => {
+                // EOF is indicated by returning None
+                if ioerr.kind != EndOfFile {
+                    io_error::cond.raise(ioerr);
+                }
+                return None;
+            }
+        }
+    }
+
+    fn eof(&mut self) -> bool { fail!() }
+}
+
+impl Writer for TcpStream {
+    fn write(&mut self, buf: &[u8]) {
+        match self.obj.write(buf) {
+            Ok(_) => (),
+            Err(ioerr) => io_error::cond.raise(ioerr),
+        }
+    }
+}
+
+pub struct TcpListener {
+    priv obj: ~RtioTcpListener
+}
+
+impl TcpListener {
+    pub fn bind(addr: SocketAddr) -> Option<TcpListener> {
+        do with_local_io |io| {
+            match io.tcp_bind(addr) {
+                Ok(l) => Some(TcpListener { obj: l }),
+                Err(ioerr) => {
+                    io_error::cond.raise(ioerr);
+                    None
+                }
+            }
+        }
+    }
+
+    pub fn socket_name(&mut self) -> Option<SocketAddr> {
+        match self.obj.socket_name() {
+            Ok(sn) => Some(sn),
+            Err(ioerr) => {
+                debug!("failed to get socket name: {:?}", ioerr);
+                io_error::cond.raise(ioerr);
+                None
+            }
+        }
+    }
+}
+
+impl Listener<TcpStream, TcpAcceptor> for TcpListener {
+    fn listen(self) -> Option<TcpAcceptor> {
+        match self.obj.listen() {
+            Ok(acceptor) => Some(TcpAcceptor { obj: acceptor }),
+            Err(ioerr) => {
+                io_error::cond.raise(ioerr);
+                None
+            }
+        }
+    }
+}
+
+pub struct TcpAcceptor {
+    priv obj: ~RtioTcpAcceptor
+}
+
+impl Acceptor<TcpStream> for TcpAcceptor {
+    fn accept(&mut self) -> Option<TcpStream> {
+        match self.obj.accept() {
+            Ok(s) => Some(TcpStream::new(s)),
+            Err(ioerr) => {
+                io_error::cond.raise(ioerr);
+                None
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use cell::Cell;
+    use rt::test::*;
+    use io::net::ip::{Ipv4Addr, SocketAddr};
+    use io::*;
+    use prelude::*;
+    use rt::comm::oneshot;
+
+    #[test] #[ignore]
+    fn bind_error() {
+        do run_in_mt_newsched_task {
+            let mut called = false;
+            do io_error::cond.trap(|e| {
+                assert!(e.kind == PermissionDenied);
+                called = true;
+            }).inside {
+                let addr = SocketAddr { ip: Ipv4Addr(0, 0, 0, 0), port: 1 };
+                let listener = TcpListener::bind(addr);
+                assert!(listener.is_none());
+            }
+            assert!(called);
+        }
+    }
+
+    #[test]
+    fn connect_error() {
+        do run_in_mt_newsched_task {
+            let mut called = false;
+            do io_error::cond.trap(|e| {
+                let expected_error = if cfg!(unix) {
+                    ConnectionRefused
+                } else {
+                    // On Win32, opening port 1 gives WSAEADDRNOTAVAIL error.
+                    OtherIoError
+                };
+                assert_eq!(e.kind, expected_error);
+                called = true;
+            }).inside {
+                let addr = SocketAddr { ip: Ipv4Addr(0, 0, 0, 0), port: 1 };
+                let stream = TcpStream::connect(addr);
+                assert!(stream.is_none());
+            }
+            assert!(called);
+        }
+    }
+
+    #[test]
+    fn smoke_test_ip4() {
+        do run_in_mt_newsched_task {
+            let addr = next_test_ip4();
+            let (port, chan) = oneshot();
+            let port = Cell::new(port);
+            let chan = Cell::new(chan);
+
+            do spawntask {
+                let mut acceptor = TcpListener::bind(addr).listen();
+                chan.take().send(());
+                let mut stream = acceptor.accept();
+                let mut buf = [0];
+                stream.read(buf);
+                assert!(buf[0] == 99);
+            }
+
+            do spawntask {
+                port.take().recv();
+                let mut stream = TcpStream::connect(addr);
+                stream.write([99]);
+            }
+        }
+    }
+
+    #[test]
+    fn smoke_test_ip6() {
+        do run_in_mt_newsched_task {
+            let addr = next_test_ip6();
+            let (port, chan) = oneshot();
+            let port = Cell::new(port);
+            let chan = Cell::new(chan);
+
+            do spawntask {
+                let mut acceptor = TcpListener::bind(addr).listen();
+                chan.take().send(());
+                let mut stream = acceptor.accept();
+                let mut buf = [0];
+                stream.read(buf);
+                assert!(buf[0] == 99);
+            }
+
+            do spawntask {
+                port.take().recv();
+                let mut stream = TcpStream::connect(addr);
+                stream.write([99]);
+            }
+        }
+    }
+
+    #[test]
+    fn read_eof_ip4() {
+        do run_in_mt_newsched_task {
+            let addr = next_test_ip4();
+            let (port, chan) = oneshot();
+            let port = Cell::new(port);
+            let chan = Cell::new(chan);
+
+            do spawntask {
+                let mut acceptor = TcpListener::bind(addr).listen();
+                chan.take().send(());
+                let mut stream = acceptor.accept();
+                let mut buf = [0];
+                let nread = stream.read(buf);
+                assert!(nread.is_none());
+            }
+
+            do spawntask {
+                port.take().recv();
+                let _stream = TcpStream::connect(addr);
+                // Close
+            }
+        }
+    }
+
+    #[test]
+    fn read_eof_ip6() {
+        do run_in_mt_newsched_task {
+            let addr = next_test_ip6();
+            let (port, chan) = oneshot();
+            let port = Cell::new(port);
+            let chan = Cell::new(chan);
+
+            do spawntask {
+                let mut acceptor = TcpListener::bind(addr).listen();
+                chan.take().send(());
+                let mut stream = acceptor.accept();
+                let mut buf = [0];
+                let nread = stream.read(buf);
+                assert!(nread.is_none());
+            }
+
+            do spawntask {
+                port.take().recv();
+                let _stream = TcpStream::connect(addr);
+                // Close
+            }
+        }
+    }
+
+    #[test]
+    fn read_eof_twice_ip4() {
+        do run_in_mt_newsched_task {
+            let addr = next_test_ip4();
+            let (port, chan) = oneshot();
+            let port = Cell::new(port);
+            let chan = Cell::new(chan);
+
+            do spawntask {
+                let mut acceptor = TcpListener::bind(addr).listen();
+                chan.take().send(());
+                let mut stream = acceptor.accept();
+                let mut buf = [0];
+                let nread = stream.read(buf);
+                assert!(nread.is_none());
+                do io_error::cond.trap(|e| {
+                    if cfg!(windows) {
+                        assert_eq!(e.kind, NotConnected);
+                    } else {
+                        fail!();
+                    }
+                }).inside {
+                    let nread = stream.read(buf);
+                    assert!(nread.is_none());
+                }
+            }
+
+            do spawntask {
+                port.take().recv();
+                let _stream = TcpStream::connect(addr);
+                // Close
+            }
+        }
+    }
+
+    #[test]
+    fn read_eof_twice_ip6() {
+        do run_in_mt_newsched_task {
+            let addr = next_test_ip6();
+            let (port, chan) = oneshot();
+            let port = Cell::new(port);
+            let chan = Cell::new(chan);
+
+            do spawntask {
+                let mut acceptor = TcpListener::bind(addr).listen();
+                chan.take().send(());
+                let mut stream = acceptor.accept();
+                let mut buf = [0];
+                let nread = stream.read(buf);
+                assert!(nread.is_none());
+                do io_error::cond.trap(|e| {
+                    if cfg!(windows) {
+                        assert_eq!(e.kind, NotConnected);
+                    } else {
+                        fail!();
+                    }
+                }).inside {
+                    let nread = stream.read(buf);
+                    assert!(nread.is_none());
+                }
+            }
+
+            do spawntask {
+                port.take().recv();
+                let _stream = TcpStream::connect(addr);
+                // Close
+            }
+        }
+    }
+
+    #[test]
+    fn write_close_ip4() {
+        do run_in_mt_newsched_task {
+            let addr = next_test_ip4();
+            let (port, chan) = oneshot();
+            let port = Cell::new(port);
+            let chan = Cell::new(chan);
+
+            do spawntask {
+                let mut acceptor = TcpListener::bind(addr).listen();
+                chan.take().send(());
+                let mut stream = acceptor.accept();
+                let buf = [0];
+                loop {
+                    let mut stop = false;
+                    do io_error::cond.trap(|e| {
+                        // NB: ECONNRESET on linux, EPIPE on mac, ECONNABORTED
+                        //     on windows
+                        assert!(e.kind == ConnectionReset ||
+                                e.kind == BrokenPipe ||
+                                e.kind == ConnectionAborted,
+                                "unknown error: {:?}", e);
+                        stop = true;
+                    }).inside {
+                        stream.write(buf);
+                    }
+                    if stop { break }
+                }
+            }
+
+            do spawntask {
+                port.take().recv();
+                let _stream = TcpStream::connect(addr);
+                // Close
+            }
+        }
+    }
+
+    #[test]
+    fn write_close_ip6() {
+        do run_in_mt_newsched_task {
+            let addr = next_test_ip6();
+            let (port, chan) = oneshot();
+            let port = Cell::new(port);
+            let chan = Cell::new(chan);
+
+            do spawntask {
+                let mut acceptor = TcpListener::bind(addr).listen();
+                chan.take().send(());
+                let mut stream = acceptor.accept();
+                let buf = [0];
+                loop {
+                    let mut stop = false;
+                    do io_error::cond.trap(|e| {
+                        // NB: ECONNRESET on linux, EPIPE on mac, ECONNABORTED
+                        //     on windows
+                        assert!(e.kind == ConnectionReset ||
+                                e.kind == BrokenPipe ||
+                                e.kind == ConnectionAborted,
+                                "unknown error: {:?}", e);
+                        stop = true;
+                    }).inside {
+                        stream.write(buf);
+                    }
+                    if stop { break }
+                }
+            }
+
+            do spawntask {
+                port.take().recv();
+                let _stream = TcpStream::connect(addr);
+                // Close
+            }
+        }
+    }
+
+    #[test]
+    fn multiple_connect_serial_ip4() {
+        do run_in_mt_newsched_task {
+            let addr = next_test_ip4();
+            let max = 10;
+            let (port, chan) = oneshot();
+            let port = Cell::new(port);
+            let chan = Cell::new(chan);
+
+            do spawntask {
+                let mut acceptor = TcpListener::bind(addr).listen();
+                chan.take().send(());
+                for ref mut stream in acceptor.incoming().take(max) {
+                    let mut buf = [0];
+                    stream.read(buf);
+                    assert_eq!(buf[0], 99);
+                }
+            }
+
+            do spawntask {
+                port.take().recv();
+                do max.times {
+                    let mut stream = TcpStream::connect(addr);
+                    stream.write([99]);
+                }
+            }
+        }
+    }
+
+    #[test]
+    fn multiple_connect_serial_ip6() {
+        do run_in_mt_newsched_task {
+            let addr = next_test_ip6();
+            let max = 10;
+            let (port, chan) = oneshot();
+            let port = Cell::new(port);
+            let chan = Cell::new(chan);
+
+            do spawntask {
+                let mut acceptor = TcpListener::bind(addr).listen();
+                chan.take().send(());
+                for ref mut stream in acceptor.incoming().take(max) {
+                    let mut buf = [0];
+                    stream.read(buf);
+                    assert_eq!(buf[0], 99);
+                }
+            }
+
+            do spawntask {
+                port.take().recv();
+                do max.times {
+                    let mut stream = TcpStream::connect(addr);
+                    stream.write([99]);
+                }
+            }
+        }
+    }
+
+    #[test]
+    fn multiple_connect_interleaved_greedy_schedule_ip4() {
+        do run_in_mt_newsched_task {
+            let addr = next_test_ip4();
+            static MAX: int = 10;
+            let (port, chan) = oneshot();
+            let chan = Cell::new(chan);
+
+            do spawntask {
+                let mut acceptor = TcpListener::bind(addr).listen();
+                chan.take().send(());
+                for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
+                    let stream = Cell::new(stream);
+                    // Start another task to handle the connection
+                    do spawntask {
+                        let mut stream = stream.take();
+                        let mut buf = [0];
+                        stream.read(buf);
+                        assert!(buf[0] == i as u8);
+                        debug!("read");
+                    }
+                }
+            }
+
+            port.recv();
+            connect(0, addr);
+
+            fn connect(i: int, addr: SocketAddr) {
+                if i == MAX { return }
+
+                do spawntask {
+                    debug!("connecting");
+                    let mut stream = TcpStream::connect(addr);
+                    // Connect again before writing
+                    connect(i + 1, addr);
+                    debug!("writing");
+                    stream.write([i as u8]);
+                }
+            }
+        }
+    }
+
+    #[test]
+    fn multiple_connect_interleaved_greedy_schedule_ip6() {
+        do run_in_mt_newsched_task {
+            let addr = next_test_ip6();
+            static MAX: int = 10;
+            let (port, chan) = oneshot();
+            let chan = Cell::new(chan);
+
+            do spawntask {
+                let mut acceptor = TcpListener::bind(addr).listen();
+                chan.take().send(());
+                for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
+                    let stream = Cell::new(stream);
+                    // Start another task to handle the connection
+                    do spawntask {
+                        let mut stream = stream.take();
+                        let mut buf = [0];
+                        stream.read(buf);
+                        assert!(buf[0] == i as u8);
+                        debug!("read");
+                    }
+                }
+            }
+
+            port.recv();
+            connect(0, addr);
+
+            fn connect(i: int, addr: SocketAddr) {
+                if i == MAX { return }
+
+                do spawntask {
+                    debug!("connecting");
+                    let mut stream = TcpStream::connect(addr);
+                    // Connect again before writing
+                    connect(i + 1, addr);
+                    debug!("writing");
+                    stream.write([i as u8]);
+                }
+            }
+        }
+    }
+
+    #[test]
+    fn multiple_connect_interleaved_lazy_schedule_ip4() {
+        do run_in_mt_newsched_task {
+            let addr = next_test_ip4();
+            static MAX: int = 10;
+            let (port, chan) = oneshot();
+            let chan = Cell::new(chan);
+
+            do spawntask {
+                let mut acceptor = TcpListener::bind(addr).listen();
+                chan.take().send(());
+                for stream in acceptor.incoming().take(MAX as uint) {
+                    let stream = Cell::new(stream);
+                    // Start another task to handle the connection
+                    do spawntask_later {
+                        let mut stream = stream.take();
+                        let mut buf = [0];
+                        stream.read(buf);
+                        assert!(buf[0] == 99);
+                        debug!("read");
+                    }
+                }
+            }
+
+            port.recv();
+            connect(0, addr);
+
+            fn connect(i: int, addr: SocketAddr) {
+                if i == MAX { return }
+
+                do spawntask_later {
+                    debug!("connecting");
+                    let mut stream = TcpStream::connect(addr);
+                    // Connect again before writing
+                    connect(i + 1, addr);
+                    debug!("writing");
+                    stream.write([99]);
+                }
+            }
+        }
+    }
+    #[test]
+    fn multiple_connect_interleaved_lazy_schedule_ip6() {
+        do run_in_mt_newsched_task {
+            let addr = next_test_ip6();
+            static MAX: int = 10;
+            let (port, chan) = oneshot();
+            let chan = Cell::new(chan);
+
+            do spawntask {
+                let mut acceptor = TcpListener::bind(addr).listen();
+                chan.take().send(());
+                for stream in acceptor.incoming().take(MAX as uint) {
+                    let stream = Cell::new(stream);
+                    // Start another task to handle the connection
+                    do spawntask_later {
+                        let mut stream = stream.take();
+                        let mut buf = [0];
+                        stream.read(buf);
+                        assert!(buf[0] == 99);
+                        debug!("read");
+                    }
+                }
+            }
+
+            port.recv();
+            connect(0, addr);
+
+            fn connect(i: int, addr: SocketAddr) {
+                if i == MAX { return }
+
+                do spawntask_later {
+                    debug!("connecting");
+                    let mut stream = TcpStream::connect(addr);
+                    // Connect again before writing
+                    connect(i + 1, addr);
+                    debug!("writing");
+                    stream.write([99]);
+                }
+            }
+        }
+    }
+
+    #[cfg(test)]
+    fn socket_name(addr: SocketAddr) {
+        do run_in_mt_newsched_task {
+            do spawntask {
+                let mut listener = TcpListener::bind(addr).unwrap();
+
+                // Make sure socket_name gives
+                // us the socket we binded to.
+                let so_name = listener.socket_name();
+                assert!(so_name.is_some());
+                assert_eq!(addr, so_name.unwrap());
+
+            }
+        }
+    }
+
+    #[cfg(test)]
+    fn peer_name(addr: SocketAddr) {
+        do run_in_mt_newsched_task {
+            let (port, chan) = oneshot();
+            let port = Cell::new(port);
+            let chan = Cell::new(chan);
+
+            do spawntask {
+                let mut acceptor = TcpListener::bind(addr).listen();
+                chan.take().send(());
+
+                acceptor.accept();
+            }
+
+            do spawntask {
+                port.take().recv();
+                let stream = TcpStream::connect(addr);
+
+                assert!(stream.is_some());
+                let mut stream = stream.unwrap();
+
+                // Make sure peer_name gives us the
+                // address/port of the peer we've
+                // connected to.
+                let peer_name = stream.peer_name();
+                assert!(peer_name.is_some());
+                assert_eq!(addr, peer_name.unwrap());
+            }
+        }
+    }
+
+    #[test]
+    fn socket_and_peer_name_ip4() {
+        peer_name(next_test_ip4());
+        socket_name(next_test_ip4());
+    }
+
+    #[test]
+    fn socket_and_peer_name_ip6() {
+        // XXX: peer name is not consistent
+        //peer_name(next_test_ip6());
+        socket_name(next_test_ip6());
+    }
+
+}