diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2013-12-05 18:19:06 -0800 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2013-12-16 17:47:11 -0800 |
| commit | 529e268ab900f1b6e731af64ce2aeecda3555f4e (patch) | |
| tree | 7ebb9ed2a7f36455b9550749a442522d45f0dc30 /src/libstd/io | |
| parent | bfa9064ba2687eb1d95708f72f41ddd9729a6ba1 (diff) | |
| download | rust-529e268ab900f1b6e731af64ce2aeecda3555f4e.tar.gz rust-529e268ab900f1b6e731af64ce2aeecda3555f4e.zip | |
Fallout of rewriting std::comm
Diffstat (limited to 'src/libstd/io')
| -rw-r--r-- | src/libstd/io/comm_adapters.rs | 48 | ||||
| -rw-r--r-- | src/libstd/io/mod.rs | 3 | ||||
| -rw-r--r-- | src/libstd/io/net/tcp.rs | 141 | ||||
| -rw-r--r-- | src/libstd/io/net/udp.rs | 67 | ||||
| -rw-r--r-- | src/libstd/io/net/unix.rs | 25 | ||||
| -rw-r--r-- | src/libstd/io/signal.rs | 11 | ||||
| -rw-r--r-- | src/libstd/io/timer.rs | 4 |
7 files changed, 117 insertions, 182 deletions
diff --git a/src/libstd/io/comm_adapters.rs b/src/libstd/io/comm_adapters.rs index b3e5a9a0c86..7f94af8307e 100644 --- a/src/libstd/io/comm_adapters.rs +++ b/src/libstd/io/comm_adapters.rs @@ -10,7 +10,7 @@ use prelude::*; -use comm::{GenericPort, GenericChan, GenericSmartChan}; +use comm::{Port, Chan}; use cmp; use io; use option::{None, Option, Some}; @@ -30,15 +30,15 @@ use vec::{bytes, CopyableVector, MutableVector, ImmutableVector}; /// None => println!("At the end of the stream!") /// } /// ``` -pub struct PortReader<P> { +pub struct PortReader { priv buf: Option<~[u8]>, // A buffer of bytes received but not consumed. priv pos: uint, // How many of the buffered bytes have already be consumed. - priv port: P, // The port to pull data from. + priv port: Port<~[u8]>, // The port to pull data from. priv closed: bool, // Whether the pipe this port connects to has been closed. } -impl<P: GenericPort<~[u8]>> PortReader<P> { - pub fn new(port: P) -> PortReader<P> { +impl PortReader { + pub fn new(port: Port<~[u8]>) -> PortReader<P> { PortReader { buf: None, pos: 0, @@ -48,7 +48,7 @@ impl<P: GenericPort<~[u8]>> PortReader<P> { } } -impl<P: GenericPort<~[u8]>> Reader for PortReader<P> { +impl Reader for PortReader { fn read(&mut self, buf: &mut [u8]) -> Option<uint> { let mut num_read = 0; loop { @@ -67,7 +67,7 @@ impl<P: GenericPort<~[u8]>> Reader for PortReader<P> { break; } self.pos = 0; - self.buf = self.port.try_recv(); + self.buf = self.port.recv_opt(); self.closed = self.buf.is_none(); } if self.closed && num_read == 0 { @@ -89,17 +89,17 @@ impl<P: GenericPort<~[u8]>> Reader for PortReader<P> { /// let writer = ChanWriter::new(chan); /// writer.write("hello, world".as_bytes()); /// ``` -pub struct ChanWriter<C> { - chan: C, +pub struct ChanWriter { + chan: Chan<~[u8]>, } -impl<C: GenericSmartChan<~[u8]>> ChanWriter<C> { +impl ChanWriter { pub fn new(chan: C) -> ChanWriter<C> { ChanWriter { chan: chan } } } -impl<C: GenericSmartChan<~[u8]>> Writer for ChanWriter<C> { +impl Writer for ChanWriter { fn write(&mut self, buf: &[u8]) { if !self.chan.try_send(buf.to_owned()) { io::io_error::cond.raise(io::IoError { @@ -111,28 +111,6 @@ impl<C: GenericSmartChan<~[u8]>> Writer for ChanWriter<C> { } } -pub struct ReaderPort<R>; - -impl<R: Reader> ReaderPort<R> { - pub fn new(_reader: R) -> ReaderPort<R> { fail!() } -} - -impl<R: Reader> GenericPort<~[u8]> for ReaderPort<R> { - fn recv(&self) -> ~[u8] { fail!() } - - fn try_recv(&self) -> Option<~[u8]> { fail!() } -} - -pub struct WriterChan<W>; - -impl<W: Writer> WriterChan<W> { - pub fn new(_writer: W) -> WriterChan<W> { fail!() } -} - -impl<W: Writer> GenericChan<~[u8]> for WriterChan<W> { - fn send(&self, _x: ~[u8]) { fail!() } -} - #[cfg(test)] mod test { @@ -144,7 +122,7 @@ mod test { #[test] fn test_port_reader() { - let (port, chan) = comm::stream(); + let (port, chan) = Chan::new(); do task::spawn { chan.send(~[1u8, 2u8]); chan.send(~[]); @@ -199,7 +177,7 @@ mod test { #[test] fn test_chan_writer() { - let (port, chan) = comm::stream(); + let (port, chan) = Chan::new(); let mut writer = ChanWriter::new(chan); writer.write_be_u32(42); diff --git a/src/libstd/io/mod.rs b/src/libstd/io/mod.rs index c0bdc2a2014..2e9056a6aee 100644 --- a/src/libstd/io/mod.rs +++ b/src/libstd/io/mod.rs @@ -318,9 +318,6 @@ mod option; /// Basic stream compression. XXX: Belongs with other flate code pub mod flate; -/// Interop between byte streams and pipes. Not sure where it belongs -pub mod comm_adapters; - /// Extension traits pub mod extensions; diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs index 3c7582db7b8..a6230ede7e3 100644 --- a/src/libstd/io/net/tcp.rs +++ b/src/libstd/io/net/tcp.rs @@ -151,7 +151,6 @@ mod test { use io::net::ip::{Ipv4Addr, SocketAddr}; use io::*; use prelude::*; - use rt::comm::oneshot; #[test] #[ignore] fn bind_error() { @@ -195,7 +194,7 @@ mod test { fn smoke_test_ip4() { do run_in_mt_newsched_task { let addr = next_test_ip4(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = TcpListener::bind(addr).listen(); @@ -206,11 +205,9 @@ mod test { assert!(buf[0] == 99); } - do spawntask { - port.recv(); - let mut stream = TcpStream::connect(addr); - stream.write([99]); - } + port.recv(); + let mut stream = TcpStream::connect(addr); + stream.write([99]); } } @@ -218,7 +215,7 @@ mod test { fn smoke_test_ip6() { do run_in_mt_newsched_task { let addr = next_test_ip6(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = TcpListener::bind(addr).listen(); @@ -229,11 +226,9 @@ mod test { assert!(buf[0] == 99); } - do spawntask { - port.recv(); - let mut stream = TcpStream::connect(addr); - stream.write([99]); - } + port.recv(); + let mut stream = TcpStream::connect(addr); + stream.write([99]); } } @@ -241,7 +236,7 @@ mod test { fn read_eof_ip4() { do run_in_mt_newsched_task { let addr = next_test_ip4(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = TcpListener::bind(addr).listen(); @@ -252,11 +247,9 @@ mod test { assert!(nread.is_none()); } - do spawntask { - port.recv(); - let _stream = TcpStream::connect(addr); - // Close - } + port.recv(); + let _stream = TcpStream::connect(addr); + // Close } } @@ -264,7 +257,7 @@ mod test { fn read_eof_ip6() { do run_in_mt_newsched_task { let addr = next_test_ip6(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = TcpListener::bind(addr).listen(); @@ -275,11 +268,9 @@ mod test { assert!(nread.is_none()); } - do spawntask { - port.recv(); - let _stream = TcpStream::connect(addr); - // Close - } + port.recv(); + let _stream = TcpStream::connect(addr); + // Close } } @@ -287,7 +278,7 @@ mod test { fn read_eof_twice_ip4() { do run_in_mt_newsched_task { let addr = next_test_ip4(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = TcpListener::bind(addr).listen(); @@ -308,11 +299,9 @@ mod test { }) } - do spawntask { - port.recv(); - let _stream = TcpStream::connect(addr); - // Close - } + port.recv(); + let _stream = TcpStream::connect(addr); + // Close } } @@ -320,7 +309,7 @@ mod test { fn read_eof_twice_ip6() { do run_in_mt_newsched_task { let addr = next_test_ip6(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = TcpListener::bind(addr).listen(); @@ -341,11 +330,9 @@ mod test { }) } - do spawntask { - port.recv(); - let _stream = TcpStream::connect(addr); - // Close - } + port.recv(); + let _stream = TcpStream::connect(addr); + // Close } } @@ -353,7 +340,7 @@ mod test { fn write_close_ip4() { do run_in_mt_newsched_task { let addr = next_test_ip4(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = TcpListener::bind(addr).listen(); @@ -377,11 +364,9 @@ mod test { } } - do spawntask { - port.recv(); - let _stream = TcpStream::connect(addr); - // Close - } + port.recv(); + let _stream = TcpStream::connect(addr); + // Close } } @@ -389,7 +374,7 @@ mod test { fn write_close_ip6() { do run_in_mt_newsched_task { let addr = next_test_ip6(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = TcpListener::bind(addr).listen(); @@ -413,11 +398,9 @@ mod test { } } - do spawntask { - port.recv(); - let _stream = TcpStream::connect(addr); - // Close - } + port.recv(); + let _stream = TcpStream::connect(addr); + // Close } } @@ -426,7 +409,7 @@ mod test { do run_in_mt_newsched_task { let addr = next_test_ip4(); let max = 10; - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = TcpListener::bind(addr).listen(); @@ -438,13 +421,11 @@ mod test { } } - do spawntask { - port.recv(); - max.times(|| { - let mut stream = TcpStream::connect(addr); - stream.write([99]); - }); - } + port.recv(); + max.times(|| { + let mut stream = TcpStream::connect(addr); + stream.write([99]); + }); } } @@ -453,7 +434,7 @@ mod test { do run_in_mt_newsched_task { let addr = next_test_ip6(); let max = 10; - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = TcpListener::bind(addr).listen(); @@ -465,13 +446,11 @@ mod test { } } - do spawntask { - port.recv(); - max.times(|| { - let mut stream = TcpStream::connect(addr); - stream.write([99]); - }); - } + port.recv(); + max.times(|| { + let mut stream = TcpStream::connect(addr); + stream.write([99]); + }); } } @@ -480,7 +459,7 @@ mod test { do run_in_mt_newsched_task { let addr = next_test_ip4(); static MAX: int = 10; - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = TcpListener::bind(addr).listen(); @@ -520,7 +499,7 @@ mod test { do run_in_mt_newsched_task { let addr = next_test_ip6(); static MAX: int = 10; - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = TcpListener::bind(addr).listen(); @@ -560,7 +539,7 @@ mod test { do run_in_mt_newsched_task { let addr = next_test_ip4(); static MAX: int = 10; - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = TcpListener::bind(addr).listen(); @@ -599,7 +578,7 @@ mod test { do run_in_mt_newsched_task { let addr = next_test_ip6(); static MAX: int = 10; - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = TcpListener::bind(addr).listen(); @@ -653,7 +632,7 @@ mod test { #[cfg(test)] fn peer_name(addr: SocketAddr) { do run_in_mt_newsched_task { - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = TcpListener::bind(addr).listen(); @@ -662,20 +641,18 @@ mod test { acceptor.accept(); } - do spawntask { - port.recv(); - let stream = TcpStream::connect(addr); + port.recv(); + let stream = TcpStream::connect(addr); - assert!(stream.is_some()); - let mut stream = stream.unwrap(); + assert!(stream.is_some()); + let mut stream = stream.unwrap(); - // Make sure peer_name gives us the - // address/port of the peer we've - // connected to. - let peer_name = stream.peer_name(); - assert!(peer_name.is_some()); - assert_eq!(addr, peer_name.unwrap()); - } + // Make sure peer_name gives us the + // address/port of the peer we've + // connected to. + let peer_name = stream.peer_name(); + assert!(peer_name.is_some()); + assert_eq!(addr, peer_name.unwrap()); } } diff --git a/src/libstd/io/net/udp.rs b/src/libstd/io/net/udp.rs index 87cf59aba3b..1e56f964bea 100644 --- a/src/libstd/io/net/udp.rs +++ b/src/libstd/io/net/udp.rs @@ -107,8 +107,7 @@ mod test { use rt::test::*; use io::net::ip::{Ipv4Addr, SocketAddr}; use io::*; - use option::{Some, None}; - use rt::comm::oneshot; + use prelude::*; #[test] #[ignore] fn bind_error() { @@ -131,7 +130,7 @@ mod test { do run_in_mt_newsched_task { let server_ip = next_test_ip4(); let client_ip = next_test_ip4(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { match UdpSocket::bind(server_ip) { @@ -151,14 +150,12 @@ mod test { } } - do spawntask { - match UdpSocket::bind(client_ip) { - Some(ref mut client) => { - port.recv(); - client.sendto([99], server_ip) - } - None => fail!() + match UdpSocket::bind(client_ip) { + Some(ref mut client) => { + port.recv(); + client.sendto([99], server_ip) } + None => fail!() } } } @@ -168,7 +165,7 @@ mod test { do run_in_mt_newsched_task { let server_ip = next_test_ip6(); let client_ip = next_test_ip6(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { match UdpSocket::bind(server_ip) { @@ -188,14 +185,12 @@ mod test { } } - do spawntask { - match UdpSocket::bind(client_ip) { - Some(ref mut client) => { - port.recv(); - client.sendto([99], server_ip) - } - None => fail!() + match UdpSocket::bind(client_ip) { + Some(ref mut client) => { + port.recv(); + client.sendto([99], server_ip) } + None => fail!() } } } @@ -205,7 +200,7 @@ mod test { do run_in_mt_newsched_task { let server_ip = next_test_ip4(); let client_ip = next_test_ip4(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { match UdpSocket::bind(server_ip) { @@ -226,16 +221,14 @@ mod test { } } - do spawntask { - match UdpSocket::bind(client_ip) { - Some(client) => { - let client = ~client; - let mut stream = client.connect(server_ip); - port.recv(); - stream.write([99]); - } - None => fail!() + match UdpSocket::bind(client_ip) { + Some(client) => { + let client = ~client; + let mut stream = client.connect(server_ip); + port.recv(); + stream.write([99]); } + None => fail!() } } } @@ -245,7 +238,7 @@ mod test { do run_in_mt_newsched_task { let server_ip = next_test_ip6(); let client_ip = next_test_ip6(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { match UdpSocket::bind(server_ip) { @@ -266,16 +259,14 @@ mod test { } } - do spawntask { - match UdpSocket::bind(client_ip) { - Some(client) => { - let client = ~client; - let mut stream = client.connect(server_ip); - port.recv(); - stream.write([99]); - } - None => fail!() + match UdpSocket::bind(client_ip) { + Some(client) => { + let client = ~client; + let mut stream = client.connect(server_ip); + port.recv(); + stream.write([99]); } + None => fail!() } } } diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs index c1f75465d9c..d3fc265cf2a 100644 --- a/src/libstd/io/net/unix.rs +++ b/src/libstd/io/net/unix.rs @@ -152,25 +152,22 @@ mod tests { use super::*; use rt::test::*; use io::*; - use rt::comm::oneshot; fn smalltest(server: proc(UnixStream), client: proc(UnixStream)) { do run_in_mt_newsched_task { let path1 = next_test_unix(); let path2 = path1.clone(); - let (port, chan) = oneshot(); let (client, server) = (client, server); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = UnixListener::bind(&path1).listen(); chan.send(()); - server(acceptor.accept().unwrap()); + server.take()(acceptor.accept().unwrap()); } - do spawntask { - port.recv(); - client(UnixStream::connect(&path2).unwrap()); - } + port.recv(); + client.take()(UnixStream::connect(&path2).unwrap()); } } @@ -251,7 +248,7 @@ mod tests { let times = 10; let path1 = next_test_unix(); let path2 = path1.clone(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = UnixListener::bind(&path1).listen(); @@ -264,13 +261,11 @@ mod tests { }) } - do spawntask { - port.recv(); - times.times(|| { - let mut stream = UnixStream::connect(&path2); - stream.write([100]); - }) - } + port.recv(); + times.times(|| { + let mut stream = UnixStream::connect(&path2); + stream.write([100]); + }) } } diff --git a/src/libstd/io/signal.rs b/src/libstd/io/signal.rs index 3b6c6013dd2..c568a19dfa2 100644 --- a/src/libstd/io/signal.rs +++ b/src/libstd/io/signal.rs @@ -20,7 +20,7 @@ definitions for a number of signals. */ use clone::Clone; -use comm::{Port, SharedChan, stream}; +use comm::{Port, SharedChan}; use container::{Map, MutableMap}; use hashmap; use io::io_error; @@ -93,9 +93,9 @@ impl Listener { /// Creates a new listener for signals. Once created, signals are bound via /// the `register` method (otherwise nothing will ever be received) pub fn new() -> Listener { - let (port, chan) = stream(); + let (port, chan) = SharedChan::new(); Listener { - chan: SharedChan::new(chan), + chan: chan, port: port, handles: hashmap::HashMap::new(), } @@ -149,7 +149,6 @@ mod test { use libc; use io::timer; use super::{Listener, Interrupt}; - use comm::{GenericPort, Peekable}; // kill is only available on Unixes #[cfg(unix)] @@ -198,9 +197,7 @@ mod test { s2.unregister(Interrupt); sigint(); timer::sleep(10); - if s2.port.peek() { - fail!("Unexpected {:?}", s2.port.recv()); - } + assert!(s2.port.try_recv().is_none()); } #[cfg(windows)] diff --git a/src/libstd/io/timer.rs b/src/libstd/io/timer.rs index 202e02d55d0..5fb64ab3d09 100644 --- a/src/libstd/io/timer.rs +++ b/src/libstd/io/timer.rs @@ -38,7 +38,7 @@ loop { */ -use comm::{Port, PortOne}; +use comm::Port; use option::{Option, Some, None}; use result::{Ok, Err}; use io::io_error; @@ -86,7 +86,7 @@ impl Timer { /// Note that this invalidates any previous port which has been created by /// this timer, and that the returned port will be invalidated once the /// timer is destroyed (when it falls out of scope). - pub fn oneshot(&mut self, msecs: u64) -> PortOne<()> { + pub fn oneshot(&mut self, msecs: u64) -> Port<()> { self.obj.oneshot(msecs) } |
