about summary refs log tree commit diff
path: root/src/libstd/io
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2013-12-05 18:19:06 -0800
committerAlex Crichton <alex@alexcrichton.com>2013-12-16 17:47:11 -0800
commit529e268ab900f1b6e731af64ce2aeecda3555f4e (patch)
tree7ebb9ed2a7f36455b9550749a442522d45f0dc30 /src/libstd/io
parentbfa9064ba2687eb1d95708f72f41ddd9729a6ba1 (diff)
downloadrust-529e268ab900f1b6e731af64ce2aeecda3555f4e.tar.gz
rust-529e268ab900f1b6e731af64ce2aeecda3555f4e.zip
Fallout of rewriting std::comm
Diffstat (limited to 'src/libstd/io')
-rw-r--r--src/libstd/io/comm_adapters.rs48
-rw-r--r--src/libstd/io/mod.rs3
-rw-r--r--src/libstd/io/net/tcp.rs141
-rw-r--r--src/libstd/io/net/udp.rs67
-rw-r--r--src/libstd/io/net/unix.rs25
-rw-r--r--src/libstd/io/signal.rs11
-rw-r--r--src/libstd/io/timer.rs4
7 files changed, 117 insertions, 182 deletions
diff --git a/src/libstd/io/comm_adapters.rs b/src/libstd/io/comm_adapters.rs
index b3e5a9a0c86..7f94af8307e 100644
--- a/src/libstd/io/comm_adapters.rs
+++ b/src/libstd/io/comm_adapters.rs
@@ -10,7 +10,7 @@
 
 use prelude::*;
 
-use comm::{GenericPort, GenericChan, GenericSmartChan};
+use comm::{Port, Chan};
 use cmp;
 use io;
 use option::{None, Option, Some};
@@ -30,15 +30,15 @@ use vec::{bytes, CopyableVector, MutableVector, ImmutableVector};
 ///     None => println!("At the end of the stream!")
 /// }
 /// ```
-pub struct PortReader<P> {
+pub struct PortReader {
     priv buf: Option<~[u8]>,  // A buffer of bytes received but not consumed.
     priv pos: uint,           // How many of the buffered bytes have already be consumed.
-    priv port: P,             // The port to pull data from.
+    priv port: Port<~[u8]>,   // The port to pull data from.
     priv closed: bool,        // Whether the pipe this port connects to has been closed.
 }
 
-impl<P: GenericPort<~[u8]>> PortReader<P> {
-    pub fn new(port: P) -> PortReader<P> {
+impl PortReader {
+    pub fn new(port: Port<~[u8]>) -> PortReader<P> {
         PortReader {
             buf: None,
             pos: 0,
@@ -48,7 +48,7 @@ impl<P: GenericPort<~[u8]>> PortReader<P> {
     }
 }
 
-impl<P: GenericPort<~[u8]>> Reader for PortReader<P> {
+impl Reader for PortReader {
     fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
         let mut num_read = 0;
         loop {
@@ -67,7 +67,7 @@ impl<P: GenericPort<~[u8]>> Reader for PortReader<P> {
                 break;
             }
             self.pos = 0;
-            self.buf = self.port.try_recv();
+            self.buf = self.port.recv_opt();
             self.closed = self.buf.is_none();
         }
         if self.closed && num_read == 0 {
@@ -89,17 +89,17 @@ impl<P: GenericPort<~[u8]>> Reader for PortReader<P> {
 /// let writer = ChanWriter::new(chan);
 /// writer.write("hello, world".as_bytes());
 /// ```
-pub struct ChanWriter<C> {
-    chan: C,
+pub struct ChanWriter {
+    chan: Chan<~[u8]>,
 }
 
-impl<C: GenericSmartChan<~[u8]>> ChanWriter<C> {
+impl ChanWriter {
     pub fn new(chan: C) -> ChanWriter<C> {
         ChanWriter { chan: chan }
     }
 }
 
-impl<C: GenericSmartChan<~[u8]>> Writer for ChanWriter<C> {
+impl Writer for ChanWriter {
     fn write(&mut self, buf: &[u8]) {
         if !self.chan.try_send(buf.to_owned()) {
             io::io_error::cond.raise(io::IoError {
@@ -111,28 +111,6 @@ impl<C: GenericSmartChan<~[u8]>> Writer for ChanWriter<C> {
     }
 }
 
-pub struct ReaderPort<R>;
-
-impl<R: Reader> ReaderPort<R> {
-    pub fn new(_reader: R) -> ReaderPort<R> { fail!() }
-}
-
-impl<R: Reader> GenericPort<~[u8]> for ReaderPort<R> {
-    fn recv(&self) -> ~[u8] { fail!() }
-
-    fn try_recv(&self) -> Option<~[u8]> { fail!() }
-}
-
-pub struct WriterChan<W>;
-
-impl<W: Writer> WriterChan<W> {
-    pub fn new(_writer: W) -> WriterChan<W> { fail!() }
-}
-
-impl<W: Writer> GenericChan<~[u8]> for WriterChan<W> {
-    fn send(&self, _x: ~[u8]) { fail!() }
-}
-
 
 #[cfg(test)]
 mod test {
@@ -144,7 +122,7 @@ mod test {
 
     #[test]
     fn test_port_reader() {
-        let (port, chan) = comm::stream();
+        let (port, chan) = Chan::new();
         do task::spawn {
           chan.send(~[1u8, 2u8]);
           chan.send(~[]);
@@ -199,7 +177,7 @@ mod test {
 
     #[test]
     fn test_chan_writer() {
-        let (port, chan) = comm::stream();
+        let (port, chan) = Chan::new();
         let mut writer = ChanWriter::new(chan);
         writer.write_be_u32(42);
 
diff --git a/src/libstd/io/mod.rs b/src/libstd/io/mod.rs
index c0bdc2a2014..2e9056a6aee 100644
--- a/src/libstd/io/mod.rs
+++ b/src/libstd/io/mod.rs
@@ -318,9 +318,6 @@ mod option;
 /// Basic stream compression. XXX: Belongs with other flate code
 pub mod flate;
 
-/// Interop between byte streams and pipes. Not sure where it belongs
-pub mod comm_adapters;
-
 /// Extension traits
 pub mod extensions;
 
diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs
index 3c7582db7b8..a6230ede7e3 100644
--- a/src/libstd/io/net/tcp.rs
+++ b/src/libstd/io/net/tcp.rs
@@ -151,7 +151,6 @@ mod test {
     use io::net::ip::{Ipv4Addr, SocketAddr};
     use io::*;
     use prelude::*;
-    use rt::comm::oneshot;
 
     #[test] #[ignore]
     fn bind_error() {
@@ -195,7 +194,7 @@ mod test {
     fn smoke_test_ip4() {
         do run_in_mt_newsched_task {
             let addr = next_test_ip4();
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = TcpListener::bind(addr).listen();
@@ -206,11 +205,9 @@ mod test {
                 assert!(buf[0] == 99);
             }
 
-            do spawntask {
-                port.recv();
-                let mut stream = TcpStream::connect(addr);
-                stream.write([99]);
-            }
+            port.recv();
+            let mut stream = TcpStream::connect(addr);
+            stream.write([99]);
         }
     }
 
@@ -218,7 +215,7 @@ mod test {
     fn smoke_test_ip6() {
         do run_in_mt_newsched_task {
             let addr = next_test_ip6();
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = TcpListener::bind(addr).listen();
@@ -229,11 +226,9 @@ mod test {
                 assert!(buf[0] == 99);
             }
 
-            do spawntask {
-                port.recv();
-                let mut stream = TcpStream::connect(addr);
-                stream.write([99]);
-            }
+            port.recv();
+            let mut stream = TcpStream::connect(addr);
+            stream.write([99]);
         }
     }
 
@@ -241,7 +236,7 @@ mod test {
     fn read_eof_ip4() {
         do run_in_mt_newsched_task {
             let addr = next_test_ip4();
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = TcpListener::bind(addr).listen();
@@ -252,11 +247,9 @@ mod test {
                 assert!(nread.is_none());
             }
 
-            do spawntask {
-                port.recv();
-                let _stream = TcpStream::connect(addr);
-                // Close
-            }
+            port.recv();
+            let _stream = TcpStream::connect(addr);
+            // Close
         }
     }
 
@@ -264,7 +257,7 @@ mod test {
     fn read_eof_ip6() {
         do run_in_mt_newsched_task {
             let addr = next_test_ip6();
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = TcpListener::bind(addr).listen();
@@ -275,11 +268,9 @@ mod test {
                 assert!(nread.is_none());
             }
 
-            do spawntask {
-                port.recv();
-                let _stream = TcpStream::connect(addr);
-                // Close
-            }
+            port.recv();
+            let _stream = TcpStream::connect(addr);
+            // Close
         }
     }
 
@@ -287,7 +278,7 @@ mod test {
     fn read_eof_twice_ip4() {
         do run_in_mt_newsched_task {
             let addr = next_test_ip4();
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = TcpListener::bind(addr).listen();
@@ -308,11 +299,9 @@ mod test {
                 })
             }
 
-            do spawntask {
-                port.recv();
-                let _stream = TcpStream::connect(addr);
-                // Close
-            }
+            port.recv();
+            let _stream = TcpStream::connect(addr);
+            // Close
         }
     }
 
@@ -320,7 +309,7 @@ mod test {
     fn read_eof_twice_ip6() {
         do run_in_mt_newsched_task {
             let addr = next_test_ip6();
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = TcpListener::bind(addr).listen();
@@ -341,11 +330,9 @@ mod test {
                 })
             }
 
-            do spawntask {
-                port.recv();
-                let _stream = TcpStream::connect(addr);
-                // Close
-            }
+            port.recv();
+            let _stream = TcpStream::connect(addr);
+            // Close
         }
     }
 
@@ -353,7 +340,7 @@ mod test {
     fn write_close_ip4() {
         do run_in_mt_newsched_task {
             let addr = next_test_ip4();
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = TcpListener::bind(addr).listen();
@@ -377,11 +364,9 @@ mod test {
                 }
             }
 
-            do spawntask {
-                port.recv();
-                let _stream = TcpStream::connect(addr);
-                // Close
-            }
+            port.recv();
+            let _stream = TcpStream::connect(addr);
+            // Close
         }
     }
 
@@ -389,7 +374,7 @@ mod test {
     fn write_close_ip6() {
         do run_in_mt_newsched_task {
             let addr = next_test_ip6();
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = TcpListener::bind(addr).listen();
@@ -413,11 +398,9 @@ mod test {
                 }
             }
 
-            do spawntask {
-                port.recv();
-                let _stream = TcpStream::connect(addr);
-                // Close
-            }
+            port.recv();
+            let _stream = TcpStream::connect(addr);
+            // Close
         }
     }
 
@@ -426,7 +409,7 @@ mod test {
         do run_in_mt_newsched_task {
             let addr = next_test_ip4();
             let max = 10;
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = TcpListener::bind(addr).listen();
@@ -438,13 +421,11 @@ mod test {
                 }
             }
 
-            do spawntask {
-                port.recv();
-                max.times(|| {
-                    let mut stream = TcpStream::connect(addr);
-                    stream.write([99]);
-                });
-            }
+            port.recv();
+            max.times(|| {
+                let mut stream = TcpStream::connect(addr);
+                stream.write([99]);
+            });
         }
     }
 
@@ -453,7 +434,7 @@ mod test {
         do run_in_mt_newsched_task {
             let addr = next_test_ip6();
             let max = 10;
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = TcpListener::bind(addr).listen();
@@ -465,13 +446,11 @@ mod test {
                 }
             }
 
-            do spawntask {
-                port.recv();
-                max.times(|| {
-                    let mut stream = TcpStream::connect(addr);
-                    stream.write([99]);
-                });
-            }
+            port.recv();
+            max.times(|| {
+                let mut stream = TcpStream::connect(addr);
+                stream.write([99]);
+            });
         }
     }
 
@@ -480,7 +459,7 @@ mod test {
         do run_in_mt_newsched_task {
             let addr = next_test_ip4();
             static MAX: int = 10;
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = TcpListener::bind(addr).listen();
@@ -520,7 +499,7 @@ mod test {
         do run_in_mt_newsched_task {
             let addr = next_test_ip6();
             static MAX: int = 10;
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = TcpListener::bind(addr).listen();
@@ -560,7 +539,7 @@ mod test {
         do run_in_mt_newsched_task {
             let addr = next_test_ip4();
             static MAX: int = 10;
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = TcpListener::bind(addr).listen();
@@ -599,7 +578,7 @@ mod test {
         do run_in_mt_newsched_task {
             let addr = next_test_ip6();
             static MAX: int = 10;
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = TcpListener::bind(addr).listen();
@@ -653,7 +632,7 @@ mod test {
     #[cfg(test)]
     fn peer_name(addr: SocketAddr) {
         do run_in_mt_newsched_task {
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = TcpListener::bind(addr).listen();
@@ -662,20 +641,18 @@ mod test {
                 acceptor.accept();
             }
 
-            do spawntask {
-                port.recv();
-                let stream = TcpStream::connect(addr);
+            port.recv();
+            let stream = TcpStream::connect(addr);
 
-                assert!(stream.is_some());
-                let mut stream = stream.unwrap();
+            assert!(stream.is_some());
+            let mut stream = stream.unwrap();
 
-                // Make sure peer_name gives us the
-                // address/port of the peer we've
-                // connected to.
-                let peer_name = stream.peer_name();
-                assert!(peer_name.is_some());
-                assert_eq!(addr, peer_name.unwrap());
-            }
+            // Make sure peer_name gives us the
+            // address/port of the peer we've
+            // connected to.
+            let peer_name = stream.peer_name();
+            assert!(peer_name.is_some());
+            assert_eq!(addr, peer_name.unwrap());
         }
     }
 
diff --git a/src/libstd/io/net/udp.rs b/src/libstd/io/net/udp.rs
index 87cf59aba3b..1e56f964bea 100644
--- a/src/libstd/io/net/udp.rs
+++ b/src/libstd/io/net/udp.rs
@@ -107,8 +107,7 @@ mod test {
     use rt::test::*;
     use io::net::ip::{Ipv4Addr, SocketAddr};
     use io::*;
-    use option::{Some, None};
-    use rt::comm::oneshot;
+    use prelude::*;
 
     #[test]  #[ignore]
     fn bind_error() {
@@ -131,7 +130,7 @@ mod test {
         do run_in_mt_newsched_task {
             let server_ip = next_test_ip4();
             let client_ip = next_test_ip4();
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 match UdpSocket::bind(server_ip) {
@@ -151,14 +150,12 @@ mod test {
                 }
             }
 
-            do spawntask {
-                match UdpSocket::bind(client_ip) {
-                    Some(ref mut client) => {
-                        port.recv();
-                        client.sendto([99], server_ip)
-                    }
-                    None => fail!()
+            match UdpSocket::bind(client_ip) {
+                Some(ref mut client) => {
+                    port.recv();
+                    client.sendto([99], server_ip)
                 }
+                None => fail!()
             }
         }
     }
@@ -168,7 +165,7 @@ mod test {
         do run_in_mt_newsched_task {
             let server_ip = next_test_ip6();
             let client_ip = next_test_ip6();
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 match UdpSocket::bind(server_ip) {
@@ -188,14 +185,12 @@ mod test {
                 }
             }
 
-            do spawntask {
-                match UdpSocket::bind(client_ip) {
-                    Some(ref mut client) => {
-                        port.recv();
-                        client.sendto([99], server_ip)
-                    }
-                    None => fail!()
+            match UdpSocket::bind(client_ip) {
+                Some(ref mut client) => {
+                    port.recv();
+                    client.sendto([99], server_ip)
                 }
+                None => fail!()
             }
         }
     }
@@ -205,7 +200,7 @@ mod test {
         do run_in_mt_newsched_task {
             let server_ip = next_test_ip4();
             let client_ip = next_test_ip4();
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 match UdpSocket::bind(server_ip) {
@@ -226,16 +221,14 @@ mod test {
                 }
             }
 
-            do spawntask {
-                match UdpSocket::bind(client_ip) {
-                    Some(client) => {
-                        let client = ~client;
-                        let mut stream = client.connect(server_ip);
-                        port.recv();
-                        stream.write([99]);
-                    }
-                    None => fail!()
+            match UdpSocket::bind(client_ip) {
+                Some(client) => {
+                    let client = ~client;
+                    let mut stream = client.connect(server_ip);
+                    port.recv();
+                    stream.write([99]);
                 }
+                None => fail!()
             }
         }
     }
@@ -245,7 +238,7 @@ mod test {
         do run_in_mt_newsched_task {
             let server_ip = next_test_ip6();
             let client_ip = next_test_ip6();
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 match UdpSocket::bind(server_ip) {
@@ -266,16 +259,14 @@ mod test {
                 }
             }
 
-            do spawntask {
-                match UdpSocket::bind(client_ip) {
-                    Some(client) => {
-                        let client = ~client;
-                        let mut stream = client.connect(server_ip);
-                        port.recv();
-                        stream.write([99]);
-                    }
-                    None => fail!()
+            match UdpSocket::bind(client_ip) {
+                Some(client) => {
+                    let client = ~client;
+                    let mut stream = client.connect(server_ip);
+                    port.recv();
+                    stream.write([99]);
                 }
+                None => fail!()
             }
         }
     }
diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs
index c1f75465d9c..d3fc265cf2a 100644
--- a/src/libstd/io/net/unix.rs
+++ b/src/libstd/io/net/unix.rs
@@ -152,25 +152,22 @@ mod tests {
     use super::*;
     use rt::test::*;
     use io::*;
-    use rt::comm::oneshot;
 
     fn smalltest(server: proc(UnixStream), client: proc(UnixStream)) {
         do run_in_mt_newsched_task {
             let path1 = next_test_unix();
             let path2 = path1.clone();
-            let (port, chan) = oneshot();
             let (client, server) = (client, server);
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = UnixListener::bind(&path1).listen();
                 chan.send(());
-                server(acceptor.accept().unwrap());
+                server.take()(acceptor.accept().unwrap());
             }
 
-            do spawntask {
-                port.recv();
-                client(UnixStream::connect(&path2).unwrap());
-            }
+            port.recv();
+            client.take()(UnixStream::connect(&path2).unwrap());
         }
     }
 
@@ -251,7 +248,7 @@ mod tests {
             let times = 10;
             let path1 = next_test_unix();
             let path2 = path1.clone();
-            let (port, chan) = oneshot();
+            let (port, chan) = Chan::new();
 
             do spawntask {
                 let mut acceptor = UnixListener::bind(&path1).listen();
@@ -264,13 +261,11 @@ mod tests {
                 })
             }
 
-            do spawntask {
-                port.recv();
-                times.times(|| {
-                    let mut stream = UnixStream::connect(&path2);
-                    stream.write([100]);
-                })
-            }
+            port.recv();
+            times.times(|| {
+                let mut stream = UnixStream::connect(&path2);
+                stream.write([100]);
+            })
         }
     }
 
diff --git a/src/libstd/io/signal.rs b/src/libstd/io/signal.rs
index 3b6c6013dd2..c568a19dfa2 100644
--- a/src/libstd/io/signal.rs
+++ b/src/libstd/io/signal.rs
@@ -20,7 +20,7 @@ definitions for a number of signals.
 */
 
 use clone::Clone;
-use comm::{Port, SharedChan, stream};
+use comm::{Port, SharedChan};
 use container::{Map, MutableMap};
 use hashmap;
 use io::io_error;
@@ -93,9 +93,9 @@ impl Listener {
     /// Creates a new listener for signals. Once created, signals are bound via
     /// the `register` method (otherwise nothing will ever be received)
     pub fn new() -> Listener {
-        let (port, chan) = stream();
+        let (port, chan) = SharedChan::new();
         Listener {
-            chan: SharedChan::new(chan),
+            chan: chan,
             port: port,
             handles: hashmap::HashMap::new(),
         }
@@ -149,7 +149,6 @@ mod test {
     use libc;
     use io::timer;
     use super::{Listener, Interrupt};
-    use comm::{GenericPort, Peekable};
 
     // kill is only available on Unixes
     #[cfg(unix)]
@@ -198,9 +197,7 @@ mod test {
         s2.unregister(Interrupt);
         sigint();
         timer::sleep(10);
-        if s2.port.peek() {
-            fail!("Unexpected {:?}", s2.port.recv());
-        }
+        assert!(s2.port.try_recv().is_none());
     }
 
     #[cfg(windows)]
diff --git a/src/libstd/io/timer.rs b/src/libstd/io/timer.rs
index 202e02d55d0..5fb64ab3d09 100644
--- a/src/libstd/io/timer.rs
+++ b/src/libstd/io/timer.rs
@@ -38,7 +38,7 @@ loop {
 
 */
 
-use comm::{Port, PortOne};
+use comm::Port;
 use option::{Option, Some, None};
 use result::{Ok, Err};
 use io::io_error;
@@ -86,7 +86,7 @@ impl Timer {
     /// Note that this invalidates any previous port which has been created by
     /// this timer, and that the returned port will be invalidated once the
     /// timer is destroyed (when it falls out of scope).
-    pub fn oneshot(&mut self, msecs: u64) -> PortOne<()> {
+    pub fn oneshot(&mut self, msecs: u64) -> Port<()> {
         self.obj.oneshot(msecs)
     }